Connection 类
每个客户端连接对应一个Connection对象,可以设置对象的onMessage、onClose等回调,同时提供了向客户端发送数据send接口与关闭连接close接口,以及其它一些必要的接口。
Connection 类 属性
id
连接的id。这是一个自增的整数。
protocol
设置当前连接的协议类
worker
此属性为只读属性,即当前connection对象所属的worker实例
maxSendBufferSize
每个连接都有一个单独的应用层发送缓冲区,如果客户端接收速度小于服务端发送速度,数据会在应用层缓冲区暂存等待发送 默认1M
defaultMaxSendBufferSize
此属性为全局静态属性,用来设置所有连接的默认应用层发送缓冲区大小。不设置默认为1MB。
maxPackageSize
此属性为全局静态属性,用来设置每个连接能够接收的最大包包长。不设置默认为10MB。
onMessage
作用与Worker::$onMessage回调相同,区别是只针对当前连接有效,也就是可以针对某个连接的设置onMessage回调。
onClose
此回调与Worker::$onClose回调作用相同,区别是只针对当前连接有效,也就是可以针对某个连接的设置onClose回调。
onBufferFull
作用与Worker::$onBufferFull回调相同,区别是只针对当前连接起作用,即可以单独设置某个连接的onBufferFull回调
onBufferDrain
作用与Worker::$onBufferDrain回调相同,区别是只针对当前连接起作用,即可以单独设置某个连接的onBufferDrain回调
onError
作用与Worker::$onError回调相同,区别是只针对当前连接起作用,即可以单独设置某个连接的onError回调
connection方法
send
向客户端发送数据
参数
$data
要发送的数据,如果在初始化Worker类时指定了协议,则会自动调用协议的encode方法,完成协议打包工作后发送给客户端
$raw
是否发送原始数据,即不调用协议的encode方法,默认是false,即自动调用协议的encode方法
返回值
true 表示数据已经成功写入到该连接的操作系统层的socket发送缓冲区
null 表示数据已经写入到该连接的应用层发送缓冲区,等待向系统层socket发送缓冲区写入
false 表示发送失败,失败原因可能是客户端连接已经关闭,或者该连接的应用层发送缓冲区已满
getRemoteIp
获得该连接的客户端ip
getRemotePort
获得该连接的客户端端口
close
安全的关闭连接.
调用close会等待发送缓冲区的数据发送完毕后才关闭连接,并触发连接的onClose回调。
destroy
立刻关闭连接。
与close不同之处是,调用destroy后即使该连接的发送缓冲区还有数据未发送到对端,连接也会立刻被关闭,并立刻触发该连接的onClose回调。
pauseRecv
使当前连接停止接收数据。该连接的onMessage回调将不会被触发。此方法对于上传流量控制非常有用
resumeRecv
使当前连接继续接收数据。此方法与Connection::pauseRecv配合使用,对于上传流量控制非常有用
pipe
将当前连接的数据流导入到目标连接。内置了流量控制。此方法做TCP代理非常有用
定时器 Timer
add
定时执行某个函数或者类方法。
注意:定时器是在当前进程中运行的,workerman中不会创建新的进程或者线程去运行定时器。
参数
time_interval
多长时间执行一次,单位秒,支持小数,可以精确到0.001
callback
回调函数注意:如果回调函数是类的方法,则方法必须是public属性
args
回调函数的参数,必须为数组,数组元素为参数值
persistent
是否是持久的,如果只想定时执行一次,则传递false(只执行一次的任务在执行完毕后会自动销毁,不必调用Timer::del())。默认是true,即一直定时执行。
del
删除某个定时器
参数
timer_id
定时器的id,即add接口返回的整型
WebServer
WorkerMan自带了一个简单的Web服务器,同样也是基于Worker实现的。文件位置在Workerman/WebServer.php。
定制通讯协议
协议的补充 ws 协议 以及 frame协议
workerman可以作为客户端通过ws协议发起websocket连接,连到远程websocket服务器,实现双向通讯
workerman定义了一种叫做frame的协议,协议格式为 总包长+包体,其中包长为4字节网络字节序的整数,包体可以是普通文本或者二进制数据。
如何定制协议
1 区分数据边界的标识
2 数据格式定义
协议格式
{"status":"success"}."\n"
实现步骤
1、协议文件放到项目的Protocols文件夹
2、以namespace Workerman\Protocols;为命名空间,必须实现三个静态方法分别为 input、encode、decode
注意:workerman会自动调用这三个静态方法,用来实现分包、解包、打包。具体流程参考下面执行流程说明。
协议交互流程
1 客户端发送一个数据包给服务端,服务端收到数据调用协议的input方法,返回长度值$length给workerman框架
2 得到这个$length值后、就会从缓冲区截取出$length长度的数据(即分包),并调用协议的decode方法解包,解包后的数据为$data。
3解包后workerman将数据$data以回调onMessage($connection, $data)的形式传递给业务
4 服务端给客户端发送数据时,workerman会自动利用协议的encode方法将$buffer打包后再发给客户端
Channel分布式通讯组件
基于订阅的多进程通讯组件,用于workerman进程间通讯或者服务器集群通讯,类似redis订阅发布机制。基于workerman开发。
GIT地址:https://github.com/walkor/Channel
1、基于订阅发布模型
2、非阻塞式IO
基于 workerman
原理
Channel包含Channel/Server服务端和Channel/Client客户端
Channel/Client通过connect接口连接Channel/Server并保持长连接
Channel/Client通过调用on接口告诉Channel/Server自己关注哪些事件,并注册事件回调函数(回调发生在Channel/Client所在进程中)
Channel/Client通过publish接口向Channel/Server发布某个事件及事件相关的数据
Channel/Server接收事件及数据后会分发给关注这个事件的Channel/Client
Channel/Client收到事件及数据后触发on接口设置的回调
Channel/Client只会收到自己关注事件并触发回调
服务端
__construct([string $listen_ip = '0.0.0.0', int $listen_port = 2206])
参数
listen_ip
监听的本机ip地址,不传默认是0.0.0.0
listen_port
监听的端口,不传默认是2206
require_once 'Workerman/Autoloader.php';
require_once 'Channel/src/Server.php';
use Workerman\Worker;
use Channel\Server;
$channel_server = new Server('0.0.0.0',3333);
Worker::runAll();
connect
Client::connect([string $listen_ip = '127.0.0.1', int $listen_port = 2206])
参数
listen_ip
Channel/Server 监听的ip地址,不传默认是127.0.0.1
listen_port
Channel/Server监听的端口,不传默认是2206
Client::connect('127.0.0.1',3333);
on
on(string $event_name, callback $callback_function)
订阅$event_name事件并注册事件发生时的回调$callback_function
回调函数的参数
$event_name
订阅的事件名称,可以是任意的字符串。
$callback_function
事件发生时触发的回调函数。函数原型为callback_function(mixed $event_data)。$event_data是事件发布(publish)时传递的事件数据。
注意:
如果同一个事件注册了两个回调函数,后一个回调函数将覆盖前一个回调函数。
publish
publish(string $event_name, mixed $event_data)
发布某个事件,所有这个事件的订阅者会收到这个事件并触发on($event_name, $callback)注册的回调$callback
参数
$event_name
发布的事件名称,可以是任意的字符串。如果事件没有任何订阅者,事件将被忽略。
$event_data
事件相关的数据,可以是数字、字符串或者数组
unsubscribe
unsubscribe(string $event_name)
取消订阅某个事件,这个事件发生时将不会再触发on($event_name, $callback)注册的回调$callback
参数
$event_name
事件名称
基于channel通讯组件实战消息推送
视频地址:https://ke.qq.com/course/398052?tuin=30289dc0
心跳
注意:长连接应用必须加心跳,否则连接可能由于长时间未通讯被路由节点强行断开。断电断网。
心跳实现:
1、客户端定时给服务端发送点数据,防止连接由于长时间没有通讯而被某些节点的防火墙关闭导致连接断开的情况。
2、服务端可以通过心跳来判断客户端是否在线,如果客户端在规定时间内没有发来任何数据,就认为客户端下线。这样可以检测到客户端由于极端情况(断电、断网等)下线的事件。
心跳间隔建议值:
建议客户端发送心跳间隔小于60秒,比如55秒。
workerman开发注意事项
1 长连接必须加心跳
2 协议对应 防火墙 端口放开
3 不要使用exit die sleep语句
4 业务代码里不要有死循环
5 改代码要重启
6 长连接应用建议用GatewayWorker框架
7 支持更高并发
如果业务并发连接数超过1000同时在线,请务必优化linux内核,并安装event扩展或者libevent扩展。
调优
http://doc.workerman.net/appendices/kernel-optimization.html
tp5使用workerman
安装 5.0.*
composer require topthink/think-worker=1.0.*
服务端 server.php 放在代码根目录
define('APP_PATH', __DIR__ . '/application/');
define(‘BIND_MODULE‘,’test/Index');
require __DIR__ . '/thinkphp/start.php';
namespace app\test\controller;
use think\worker\Server;
class index extends Server{
protected $socket = 'text://0.0.0.0:8888';
protected $processes = 6;
public function onMessage ($connection,$data){
echo $data;
$connection->send('hello think worker');
}
public function onWorkerStart ($worker){
echo 'worke start'.PHP_EOL;
}
public function onConnect($connection){
$connection->send('connect success');
}
}
安装 5.1.*
composer require topthink/think-worker
启动
php think worker:server
配置文件
config/worker_server.php
并且支持workerman所有的参数(包括全局静态参数)。
也支持使用闭包方式定义相关事件回调。
'worker_class' => ['app\test\http','app\test\text'], // 自定义Workerman服务类名 支持数组定义多个服务
namespace app\test;
use think\worker\Server;
class text extends Server{
protected $socket = 'text://0.0.0.0:5555';
protected $option = ['name'=>'text','count'=>4];
public function onMessage($connection,$data){
$connection->send('hello worker');
}
}
欢迎加入workerman/swool技术交流群:604438441
本文章对应的视频教程地址:https://ke.qq.com/course/398052?tuin=30289dc0