//push.php
header('Content-type:text/html;charset=utf-8;');

use Workerman\Worker;

require_once './Workerman/Autoloader.php';
require_once "./CPdo.class.php";

$App = new CPdo();

/**
 * Mark the user that owns $token as logged out (app_user.islogin = 0).
 *
 * The token originates from the first packet a WebSocket client sends, so it
 * is untrusted input. This function only relies on CPdo::exec($sql), which
 * takes a finished SQL string, so the value is escaped before it is embedded
 * in the quoted literal.
 * NOTE(review): switch to a bound parameter once the DB wrapper offers one.
 *
 * @param CPdo   $App   Database wrapper used to run the update.
 * @param string $token Token identifying the user row.
 */
function logout($App, $token)
{
    $safeToken = addslashes((string) $token);
    $sql = "update app_user set islogin = 0 where token = '$safeToken'";
    $App->exec($sql);
}

// Create the worker container that accepts WebSocket clients on port 1234.
$worker = new Worker('websocket://0.0.0.0:1234');

// The process count must be 1: the uid => connection map below is kept on
// this worker object.
$worker->count = 1;

// Once the worker process has started, open an internal push port.
$worker->onWorkerStart = function ($worker) {
    // Internal port for other parts of the system to push data through.
    // Text protocol: one JSON document followed by a newline.
    // NOTE(review): bound to 0.0.0.0, so any host that can reach port 5678
    // can push messages - confirm whether 127.0.0.1 would be sufficient.
    $inner_text_worker = new Worker('Text://0.0.0.0:5678');

    $inner_text_worker->onMessage = function ($connection, $buffer) {
        // Expected payload: {"token": ..., "type": ..., "msg": ..., "event": ...}
        // where "token" is the uid of the page that should receive the push.
        $data = json_decode($buffer, true);

        // Malformed JSON or a missing/non-scalar token cannot address anyone.
        if (!is_array($data) || !isset($data['token']) || !is_scalar($data['token'])) {
            $connection->send('fail');
            return;
        }

        $uid = $data['token'];
        $msgData = json_encode(array(
            'type'  => isset($data['type']) ? $data['type'] : null,
            'msg'   => isset($data['msg']) ? $data['msg'] : null,
            'event' => isset($data['event']) ? $data['event'] : null,
        ));

        // Forward the message to the page registered under this uid.
        $ret = sendMessageByUid($uid, $msgData);

        // Report the push result back to the internal caller.
        $connection->send($ret ? 'ok' : 'fail');
    };

    $inner_text_worker->listen();
};

// Extra property holding the uid => connection map.
$worker->uidConnections = array();

// Called whenever a WebSocket client sends a message.
$worker->onMessage = function ($connection, $data) use ($worker) {
    // Only the first packet of a connection is used; it identifies the client.
    if (isset($connection->uid)) {
        return;
    }

    // The first packet is taken as the uid without real verification.
    // Surrounding double quotes (e.g. from a JSON-encoded string) are removed.
    $connection->uid = trim($data, '"');

    // Remember which connection belongs to the uid so pushes can be routed.
    $worker->uidConnections[$connection->uid] = $connection;
};

// Called whenever a client connection is closed.
$worker->onClose = function ($connection) use ($worker, $App) {
    if (!isset($connection->uid)) {
        return;
    }

    $uid = $connection->uid;

    // If a newer connection has registered under the same uid in the
    // meantime, this stale connection must not remove that mapping or mark
    // the still-connected user as logged out.
    if (isset($worker->uidConnections[$uid]) && $worker->uidConnections[$uid] !== $connection) {
        return;
    }

    unset($worker->uidConnections[$uid]);
    logout($App, $uid);
};

/**
 * Send $message to every identified client.
 *
 * @param string $message Payload to send.
 */
function broadcast($message)
{
    global $worker;

    foreach ($worker->uidConnections as $connection) {
        $connection->send($message);
    }
}

/**
 * Send $message to the client registered under $uid.
 *
 * @param string $uid     Identifier the client sent as its first packet.
 * @param string $message Payload to send.
 * @return bool True if a connection for $uid exists and send() was called,
 *              false if no such connection is registered.
 */
function sendMessageByUid($uid, $message)
{
    global $worker;

    if (!isset($worker->uidConnections[$uid])) {
        return false;
    }

    $worker->uidConnections[$uid]->send($message);
    return true;
}

// Run all workers.
Worker::runAll();
/**
 * Hand a message to the internal push port (tcp://127.0.0.1:5678) so it can
 * be forwarded to a WebSocket client.
 *
 * The payload is a JSON object with the keys type, token, msg and event,
 * terminated by a newline because the push port speaks the Text protocol.
 * The reply from the push port is not read.
 *
 * @param string $type  Message type.
 * @param string $token Identifier of the client that should receive the push.
 * @param string $msg   Message body.
 * @param string $event Event name.
 * @return bool True if the complete payload was written to the push port,
 *              false if encoding, connecting or writing failed.
 */
public function sendToWs($type = '', $token = '', $msg = '', $event = '')
{
    $payload = json_encode(array('type' => $type, 'token' => $token, 'msg' => $msg, 'event' => $event));
    if ($payload === false) {
        // Nothing meaningful to send (e.g. invalid UTF-8 in one of the fields).
        return false;
    }

    // Connect to the internal push port; give up after one second.
    $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
    if ($client === false) {
        return false;
    }

    // The Text protocol requires a trailing newline after each message.
    $line = $payload . "\n";
    $written = fwrite($client, $line);
    fclose($client);

    return $written === strlen($line);
}
/**
 * Small convenience wrapper around a PDO connection.
 *
 * Offers raw-SQL helpers (getValueBySelfCreateSql, rowCount, exec, prepare,
 * transaction) plus a simple SELECT builder (query). Every read helper issues
 * "SET NAMES 'UTF8'" before running its statement.
 *
 * NOTE(review): the default DSN and credentials are hard-coded below; pass
 * them to the constructor or move them to configuration where possible.
 */
class CPdo
{
    protected $_dsn = "mysql:host=192.168.1.228;dbname=app";
    protected $_name = "dbuser";
    protected $_pass = "123456";
    protected $_condition = array();
    protected $pdo;
    protected $fetchAll;
    protected $query;
    protected $result;
    protected $num;
    protected $mode;
    protected $prepare;
    protected $row;
    protected $fetchAction;
    protected $beginTransaction;
    protected $rollback;
    protected $commit;
    protected $char;
    // Rows of the most recent getValueBySelfCreateSql() call. It used to be
    // created dynamically (and therefore public), so it stays public.
    public $AllValue;
    private static $get_mode;
    private static $get_fetch_action;

    /**
     * Open the database connection.
     *
     * @param bool        $pconnect Whether to request a persistent connection.
     * @param string|null $dsn      Optional DSN overriding the built-in default.
     * @param string|null $name     Optional user name overriding the default.
     * @param string|null $pass     Optional password overriding the default.
     */
    public function __construct($pconnect = false, $dsn = null, $name = null, $pass = null)
    {
        if ($dsn !== null) {
            $this->_dsn = $dsn;
        }
        if ($name !== null) {
            $this->_name = $name;
        }
        if ($pass !== null) {
            $this->_pass = $pass;
        }

        $this->_condition = array(PDO::ATTR_PERSISTENT => $pconnect);
        $this->pdo_connect();
    }

    /**
     * Create the PDO handle. On failure the error is printed and the script
     * terminates (see setExceptionError).
     */
    private function pdo_connect()
    {
        try {
            $this->pdo = new PDO($this->_dsn, $this->_name, $this->_pass, $this->_condition);
        } catch (Exception $e) {
            // getLine()/getFile() are methods; reading them as properties
            // yielded null and hid where the failure happened.
            return $this->setExceptionError($e->getMessage(), $e->getLine(), $e->getFile());
        }
    }

    /**
     * Run a hand-written SELECT and return all rows.
     *
     * @param string    $sql         Complete SQL statement (not escaped here).
     * @param string    $fetchAction "assoc", "num", "object" or "both";
     *                               anything else falls back to "assoc".
     * @param bool|null $mode        Column-name case: null natural, true upper,
     *                               false lower.
     * @return array All rows of the result set.
     */
    public function getValueBySelfCreateSql($sql, $fetchAction = "assoc", $mode = null)
    {
        // Pass the name through untouched: setAttribute() resolves it to the
        // PDO constant. Resolving it here as well turned every choice into
        // FETCH_ASSOC, because the constant is not a recognised name.
        $this->result = $this->setAttribute($sql, $fetchAction, $mode);
        $this->AllValue = $this->result->fetchAll();

        return $this->AllValue;
    }

    /**
     * Apply the column-case mode, run the query and set its fetch mode.
     *
     * @param string    $sql         SQL statement to run.
     * @param string    $fetchAction Fetch style name (see fetchAction()).
     * @param bool|null $mode        Column-name case (see getMode()).
     * @return PDOStatement The executed statement.
     */
    private function setAttribute($sql, $fetchAction, $mode)
    {
        $this->mode = self::getMode($mode);
        $this->fetchAction = self::fetchAction($fetchAction);
        $this->pdo->setAttribute(PDO::ATTR_CASE, $this->mode);
        $this->query = $this->base_query($sql);
        $this->query->setFetchMode($this->fetchAction);

        return $this->query;
    }

    /**
     * Map the tri-state mode flag to a PDO::CASE_* constant.
     *
     * Strict false selects lower case, any truthy value selects upper case,
     * and everything else (null, 0, '') keeps the natural case. The previous
     * loose switch matched false against "case null", so lower case could
     * never be selected.
     *
     * @param bool|null $get_style Mode flag.
     * @return int One of PDO::CASE_NATURAL, PDO::CASE_UPPER, PDO::CASE_LOWER.
     */
    private static function getMode($get_style)
    {
        if ($get_style === false) {
            self::$get_mode = PDO::CASE_LOWER;
        } elseif ($get_style) {
            self::$get_mode = PDO::CASE_UPPER;
        } else {
            self::$get_mode = PDO::CASE_NATURAL;
        }

        return self::$get_mode;
    }

    /**
     * Map a fetch style name to a PDO::FETCH_* constant.
     *
     * @param string $fetchAction "assoc", "num", "object" or "both".
     * @return int Matching PDO::FETCH_* constant; PDO::FETCH_ASSOC for
     *             unknown names.
     */
    private static function fetchAction($fetchAction)
    {
        $styles = array(
            "assoc"  => PDO::FETCH_ASSOC, // associative array
            "num"    => PDO::FETCH_NUM,   // numerically indexed array
            "object" => PDO::FETCH_OBJ,   // anonymous object
            "both"   => PDO::FETCH_BOTH,  // associative and numeric keys
        );

        if (is_string($fetchAction) && isset($styles[$fetchAction])) {
            self::$get_fetch_action = $styles[$fetchAction];
        } else {
            self::$get_fetch_action = PDO::FETCH_ASSOC;
        }

        return self::$get_fetch_action;
    }

    /**
     * Run a statement and return the row count reported by the driver.
     *
     * @param string $sql Complete SQL statement (not escaped here).
     * @return int Row count as reported by PDOStatement::rowCount().
     */
    public function rowCount($sql)
    {
        $this->result = $this->base_query($sql);
        $this->num = $this->result->rowCount();

        return $this->num;
    }

    /**
     * Build and run a simple SELECT.
     *
     * Values in $condition are quoted through PDO::quote(). $table, $column,
     * the keys of $condition, $group, $having and $order are inserted as SQL
     * fragments and must therefore come from trusted code.
     *
     * @param string    $table       Table name.
     * @param string    $column      Column list.
     * @param array     $condition   column => value pairs combined with AND.
     * @param string    $group       GROUP BY expression, "" for none.
     * @param string    $order       ORDER BY expression, "" for none.
     * @param string    $having      HAVING expression, "" for none.
     * @param int|string $startSet   LIMIT offset; applied only together with
     *                               $endSet and only when both are numeric.
     * @param int|string $endSet     LIMIT row count.
     * @param string    $fetchAction Fetch style name (see fetchAction()).
     * @param bool|null $params      Column-name case mode (see getMode()).
     * @return array All matching rows.
     */
    public function query($table, $column = "*", $condition = array(), $group = "", $order = "", $having = "", $startSet = "", $endSet = "", $fetchAction = "assoc", $params = null)
    {
        $sql = "select " . $column . " from `" . $table . "` ";

        if (is_array($condition) && $condition) {
            $clauses = array();
            foreach ($condition as $key => $value) {
                $clauses[] = $key . " = " . $this->pdo->quote((string) $value);
            }
            $sql .= "where " . implode(" and ", $clauses) . " ";
        }

        if ($group != "") {
            $sql .= "group by " . $group . " ";
        }

        // HAVING must precede ORDER BY, and it is an expression rather than
        // a quoted string literal.
        if ($having != "") {
            $sql .= "having " . $having . " ";
        }

        if ($order != "") {
            $sql .= "order by " . $order . " ";
        }

        // is_numeric() already rejects "", and unlike a loose != "" check it
        // accepts an offset of 0 on every PHP version.
        if (is_numeric($startSet) && is_numeric($endSet)) {
            $sql .= "limit " . (int) $startSet . "," . (int) $endSet;
        }

        $this->result = $this->getValueBySelfCreateSql($sql, $fetchAction, $params);

        return $this->result;
    }

    /**
     * Execute a write statement (insert / update / delete and the like).
     *
     * When $params is given, the statement is prepared and the values are
     * bound instead of being part of the SQL text.
     *
     * The outcome is reported as a message built from the first six
     * characters of $sql. As before, "successful" is reported only when at
     * least one row was affected; zero affected rows and a failed statement
     * both report "fail".
     *
     * @param string $sql    SQL statement, optionally with placeholders.
     * @param array  $params Values for the placeholders, empty for none.
     * @return string Success or failure message.
     */
    public function exec($sql, $params = array())
    {
        if ($params) {
            $statement = $this->pdo->prepare($sql);
            if ($statement !== false && $statement->execute($params)) {
                $this->result = $statement->rowCount();
            } else {
                $this->result = false;
            }
        } else {
            $this->result = $this->pdo->exec($sql);
        }

        $substr = substr($sql, 0, 6);

        return $this->result ? $this->successful($substr) : $this->fail($substr);
    }

    /**
     * Prepare and execute a statement and return its first row.
     *
     * Previously the fetched row was stored under a misspelled property and
     * the method always returned null.
     *
     * @param string $sql    SQL statement, optionally with placeholders.
     * @param array  $params Values for the placeholders, empty for none.
     * @return mixed The first row in PDO's default fetch style, or null when
     *               the statement produced no row.
     */
    public function prepare($sql, $params = array())
    {
        $this->prepare = $this->pdo->prepare($sql);
        $this->setChars();

        if ($params) {
            $this->prepare->execute($params);
        } else {
            $this->prepare->execute();
        }

        $first = $this->prepare->fetch();
        $this->row = ($first === false) ? null : $first;

        return $this->row;
    }

    /**
     * Run one statement inside a transaction.
     *
     * Commits unless the statement fails. If the statement throws, the
     * transaction is rolled back before the exception is re-thrown, so the
     * connection is not left inside an open transaction.
     *
     * @param string $sql Complete SQL statement (not escaped here).
     * @return bool True if the transaction was committed, false if it was
     *              rolled back.
     */
    public function transaction($sql)
    {
        $this->begin();

        try {
            $this->result = $this->pdo->exec($sql);
        } catch (Exception $e) {
            if ($this->pdo->inTransaction()) {
                $this->rollback();
            }
            throw $e;
        }

        // exec() returns false on failure; 0 only means no row was affected.
        if ($this->result === false) {
            $this->rollback();
            return false;
        }

        $this->commit();
        return true;
    }

    /**
     * Start a transaction.
     *
     * @return bool Result of PDO::beginTransaction().
     */
    private function begin()
    {
        $this->beginTransaction = $this->pdo->beginTransaction();

        return $this->beginTransaction;
    }

    /**
     * Commit the current transaction.
     *
     * @return bool Result of PDO::commit().
     */
    private function commit()
    {
        $this->commit = $this->pdo->commit();

        return $this->commit;
    }

    /**
     * Roll back the current transaction.
     *
     * @return bool Result of PDO::rollBack().
     */
    private function rollback()
    {
        $this->rollback = $this->pdo->rollBack();

        return $this->rollback;
    }

    /**
     * Set the connection character set and run a query.
     *
     * @param string $sql SQL statement to run.
     * @return PDOStatement The executed statement.
     * @throws RuntimeException If the query fails without PDO throwing
     *                          itself; callers would otherwise hit a member
     *                          call on false.
     */
    private function base_query($sql)
    {
        $this->setChars();
        $this->query = $this->pdo->query($sql);

        if ($this->query === false) {
            $info = $this->pdo->errorInfo();
            throw new RuntimeException("Query failed: " . (isset($info[2]) ? $info[2] : "unknown error"));
        }

        return $this->query;
    }

    /**
     * Switch the connection character set to UTF8.
     *
     * @return PDOStatement|false Result of the SET NAMES query.
     */
    private function setChars()
    {
        $this->char = $this->pdo->query("SET NAMES 'UTF8'");

        return $this->char;
    }

    /**
     * Build the success message for an executed statement.
     *
     * @param string $params Leading characters of the statement.
     * @return string
     */
    private function successful($params)
    {
        return "The " . $params . " action is successful";
    }

    /**
     * Build the failure message for an executed statement.
     *
     * @param string $params Leading characters of the statement.
     * @return string
     */
    private function fail($params)
    {
        return "The " . $params . " action is fail";
    }

    /**
     * Print the connection error details and terminate the script.
     *
     * @param string $getMessage Exception message.
     * @param int    $getLine    Line on which the exception was raised.
     * @param string $getFile    File in which the exception was raised.
     */
    private function setExceptionError($getMessage, $getLine, $getFile)
    {
        echo "Error message is " . $getMessage . "<br /> The Error in " . $getLine . " line <br /> This file dir on " . $getFile;
        exit();
    }
}