laravel广播 - laravel-echo-server服务端 - laravel-echo客户端的交互过程及原理
交互图
!!! 事先说明 :
①本文章是以redis作为驱动来解读的。
② 文章中的代码注释一定看,介绍都在注释中!!!
此图如有侵权,通知我删除 ;
通过上图可以看得出来,laravel广播事件与前端是没有直接交互的。
大概流程就是 :laravel广播事件通过redis的发布订阅功能发布频道,laravel-echo-server链接redis订阅频道获取发布的数据,通过socket主动推送给laravel-echo,前端渲染。
现在我们就跟着这个流程读一下源码吧!
第一步:laravel广播发布数据到redis中。
(1)进入广播服务提供者类所在文件夹中 :\vendor\laravel\framework\src\Illuminate\Broadcasting
(2) 写一个发送广播事件的demo
use Illuminate\Broadcasting\Channel;
use Illuminate\Queue\SerializesModels;
use Illuminate\Broadcasting\PrivateChannel;
use Illuminate\Broadcasting\PresenceChannel;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Broadcasting\InteractsWithSockets;
use Illuminate\Contracts\Broadcasting\ShouldBroadcast;
class PrivateMessageEvent implements ShouldBroadcast
{
use Dispatchable, InteractsWithSockets, SerializesModels;
public function __construct(){}
/**
* 获取广播事件对应的频道。
*/
public function broadcastOn()
{
return new PrivateChannel('PrivateMessage.' . $this->admin_id );
}
/**
* 自定义广播数据
* @return mixed
*/
public function broadcastWith()
{
return $this->data;
}
}
// 发布广播事件
broadcast(new PrivateMessageEvent();
(3) 进入 BroadcastEvent.php 广播事件类找 handle 方法
/**
* 创建一个新的作业处理程序实例。
* @param mixed $event 广播事件类实例
*/
public function __construct($event)
{
$this->event = $event;
}
/**
* 处理排队的作业。
* @param \Illuminate\Contracts\Broadcasting\Broadcaster $broadcaster
*/
public function handle(Broadcaster $broadcaster)
{
// 主要看是否自定义广播事件名称,没有则使用类名
$name = method_exists($this->event, 'broadcastAs')
? $this->event->broadcastAs() : get_class($this->event);
// 发布广播数据到redis中
$broadcaster->broadcast(
// 参1: 获取广播的频道名
Arr::wrap($this->event->broadcastOn()),
// 参2:事件名
$name,
// 参3:获取自定义广播数据 / 广播类的公共属性 / socket
$this->getPayloadFromEvent($this->event)
);
}
(4)追一下这个方法:getPayloadFromEvent($this->event) 发现它会返回 (自定义广播数据 / 广播类的公共属性 / socket)
/**
* 获取给定事件的有效负载。
*
* @param mixed $event 广播事件类对象
* @return array
*/
protected function getPayloadFromEvent($event)
{
// 判断事件类中是否有broadcastWith方法
if (method_exists($event, 'broadcastWith')) {
// 返回自定义载核 以及 对象中叫socket的属性值
return array_merge(
$event->broadcastWith(), ['socket' => data_get($event, 'socket')]
);
}
$payload = [];
foreach ((new ReflectionClass($event))->getProperties(ReflectionProperty::IS_PUBLIC) as $property) {
$payload[$property->getName()] = $this->formatProperty($property->getValue($event));
}
unset($payload['broadcastQueue']);
// 返回所有公共属性
return $payload;
}
(5)返回刚才的地方再追一下 $broadcaster->broadcast() 方法,然后向下找到我们要使用的redis驱动类。进入!
/**
* 广播给定事件。
*
* @param array $channels 广播对应频道名称
* @param string $event 广播事件名称
* @param array $payload 广播数据
* @return void
*/
public function broadcast(array $channels, $event, array $payload = [])
{
// 建立redis链接
$connection = $this->redis->connection($this->connection);
// 写入redis的数据 : 事件名 、 载核 、 socket
$payload = json_encode([
'event' => $event,
'data' => $payload,
'socket' => Arr::pull($payload, 'socket'),
]);
foreach ($this->formatChannels($channels) as $channel) {
// 发布广播数据到redis频道中
$connection->publish($channel, $payload);
}
}
至此我们的广播数据已经发布到redis中!
第二步 : laravel-echo-server 在redis中订阅。
(1) 进入laravel-echo-server包
里面,可以在 github 中下载看看。建议要看一下。
|____api
| |____http-api.ts
| |____index.ts
|____channels
| |____channel.ts
| |____index.ts
| |____presence-channel.ts
| |____private-channel.ts
|____cli
| |____cli.ts
| |____index.ts
|____database
| |____database-driver.ts
| |____database.ts
| |____index.ts
| |____redis.ts
| |____sqlite.ts
|____echo-server.ts
|____index.ts
|____log.ts
|____server.ts
|____subscribers
| |____http-subscriber.ts
| |____index.ts
| |____redis-subscriber.ts
| |____subscriber.ts
laravel-echo-server 目录中主要包含:接口 (api)、频道 (channels)、 数据库 (database)、订阅 (subscribers) 等
(2) 当执行laravel-echo-server start
的时候就是从 echo-server.js
中的run()
方法开始的
/*
* L A R A V E L E C H O S E R V E R
version 1.6.2
⚠ Starting server in DEV mode...
✔ Running at localhost on port 6001
✔ Channels are ready.
✔ Listening for http events...
✔ Listening for redis events...
Server ready!
* !!!!!!!!!!! 上面控制台输出就是run方法执行过程中输出的。
*/
EchoServer.prototype.run = function (options) {
var _this = this;
return new Promise(function (resolve, reject) {
// 判断是否自定义配置,没有则用默认的
_this.options = Object.assign(_this.defaultOptions, options);
// 输出到控制台
_this.startup();
// 意思是在host:port上面运行服务,准备通道,监听http事件,监听redis事件
_this.server = new server_1.Server(_this.options);
// 服务器初始化完毕。
_this.server.init().then(function (io) {
_this.init(io).then(function () {
log_1.Log.info('\nServer ready!\n');
resolve(_this);
}, function (error) { return log_1.Log.error(error); });
}, function (error) { return log_1.Log.error(error); });
});
};
(3) 启动过后就处于监听状态了,我们进入监听函数listen
中看一下
EchoServer.prototype.listen = function () {
var _this = this;
return new Promise(function (resolve, reject) {
var subscribePromises = _this.subscribers.map(function (subscriber) {
// 这里就是订阅redis ,在回调函数中发送到socket上
return subscriber.subscribe(function (channel, message) {
return _this.broadcast(channel, message);
});
});
Promise.all(subscribePromises).then(function () { return resolve(); });
});
};
(4) 在这里我们发现循环了_this.subscribers
属性我们追一下,看看这个里面都存了什么东西
- 我们追到了
init
初始化方法中,发现在往_this.subscribers
这个属性中push实例
EchoServer.prototype.init = function (io) {
var _this = this;
return new Promise(function (resolve, reject) {
_this.channel = new channels_1.Channel(io, _this.options);
_this.subscribers = [];
if (_this.options.subscribers.http)
_this.subscribers.push(new subscribers_1.HttpSubscriber(_this.server.express, _this.options));
// 因为我们用的redis驱动,直接看这里
if (_this.options.subscribers.redis)
_this.subscribers.push(new subscribers_1.RedisSubscriber(_this.options));
_this.httpApi = new api_1.HttpApi(io, _this.channel, _this.server.express, _this.options.apiOriginAllow);
_this.httpApi.init();
_this.onConnect();
_this.listen().then(function () { return resolve(); }, function (err) { return log_1.Log.error(err); });
});
};
- 我们继续追代码追过去。来到了
\dist\subscribers\redis-subscriber.js
中。因为(3)使用了这个实例的subscribe
方法,我们看一下这个方法做了什么!
/*
通过订阅以 _this._keyPrefix 为前缀的所有频道获取广播数据,
然后调用外层的回调函数发布到 socket 中。
*/
RedisSubscriber.prototype.subscribe = function (callback) {
var _this = this;
return new Promise(function (resolve, reject) {
// 监听pmessage , 回调函数 参2: 订阅的广播频道名称 ; 参3 : 广播数据
_this._redis.on('pmessage', function (subscribed, channel, message) {
try {
message = JSON.parse(message);
if (_this.options.devMode) {
log_1.Log.info("Channel: " + channel); // 将广播名 输出到控制台
log_1.Log.info("Event: " + message.event); // 将事件名 输出到控制台
}
/*
调用外层回调
function (channel, message) {
return _this.broadcast(channel, message);
}
*/
callback(channel.substring(_this._keyPrefix.length), message);
}
catch (e) {
if (_this.options.devMode) {
log_1.Log.info("No JSON message");
}
}
});
// 订阅以 _this._keyPrefix 为前缀的所有频道
_this._redis.psubscribe(_this._keyPrefix + "*", function (err, count) {
if (err) {
reject('Redis could not subscribe.');
}
log_1.Log.success('Listening for redis events...');
resolve();
});
});
};
(5)我们看一下 (3)的回调函数中_this.broadcast 方法
的作用
EchoServer.prototype.broadcast = function (channel, message) {
// 判断是否私有
if (message.socket && this.find(message.socket)) {
// 发送给已经授权的人
return this.toOthers(this.find(message.socket), channel, message);
}
else {
// 发送给所有
return this.toAll(channel, message);
}
};
至此laravel-echo-server 订阅redis中的广播数据,并发布到socket就完成了。
第三步 : 公共 – 前端laravel-echo 与 laravel-echo-server 交互
(1) 实例化laravel-echo
传入配置参数,接收socket服务器的公开频道和监听事件
window.Echo = new Echo({
broadcaster: 'socket.io',
host: window.location.hostname + ':6001',
auth:
{
headers:
{
'authorization': 'Bearer ' + store.getters.token
}
}
});
window.Echo.channel('public_channel')
.listen('RssPublicEvent', (e) => {
that.names.push(e.name)
});
(2) 根据new Echo()
—> 进入到laravel-echo包中的echo.ts,找到 constructor 构造方法
/**
* 创建一个新的类实例。
*/
constructor(options: any) {
this.options = options; // 初始化配置参数
this.connect();
if (!this.options.withoutInterceptors) {
this.registerInterceptors();
}
/*
this.connect() 对应代码
/**
* 创建一个新的连接。
*/
connect(): void {
if (this.options.broadcaster == 'pusher') {
this.connector = new PusherConnector(this.options);
} else if (this.options.broadcaster == 'socket.io') {
// 此类创建到Socket.io服务器的连接器
this.connector = new SocketIoConnector(this.options);
} else if (this.options.broadcaster == 'null') {
this.connector = new NullConnector(this.options);
} else if (typeof this.options.broadcaster == 'function') {
this.connector = new this.options.broadcaster(this.options);
}
}
*/
}
(3) 追一下 window.Echo.channel('public_channel')
,进入 echo.ts中channel()
方法
/**
* 通过名称获取通道实例。
*/
channel(channel: string): Channel {
// 根据广播名称获取通道实例
return this.connector.channel(channel);
}
/**
* 通过名称获取通道实例。
*/
channel(name: string): SocketIoChannel {
// 判断属性中是否保存了对应的通道实例
if (!this.channels[name]) {
// 获取通道实例并保存在属性中 SocketIoChannel(socket链接,通道名称,配置参数)
this.channels[name] = new SocketIoChannel(this.socket, name, this.options);
}
// 返回通道实例
return this.channels[name];
}
(4)追一下 new SocketIoChannel()
,进入到 SocketIoChannel类中看一下构造函数中做了什么事
/**
* 此类表示Socket.io通道。
*/
export class SocketIoChannel extends Channel {
/**
* 创建一个新的类实例。 构造函数
*/
constructor(socket: any, name: string, options: any) {
super();
this.name = name;
this.socket = socket;
this.options = options;
this.eventFormatter = new EventFormatter(this.options.namespace);
// 订阅Socket.io频道
this.subscribe();
/*
追 subscribe() 查看以下代码
//订阅Socket.io频道。
subscribe(): void {
// 发送事件
this.socket.emit('subscribe', {
channel: this.name,
auth: this.options.auth || {},
});
}
*/
this.configureReconnector();
}
}
(6)在(5)中的实例化SocketIoChannel类时向socket发送了事件名叫subscribe
的,我们看看laravel-echo-server怎么接收它的。并做了什么事?
① 在 init()
中有 _this.onConnect();
进入!
EchoServer.prototype.onConnect = function () {
var _this = this;
this.server.io.on('connection', function (socket) {
_this.onSubscribe(socket); //监听客户端
_this.onUnsubscribe(socket);
_this.onDisconnecting(socket);
_this.onClientEvent(socket);
});
};
② 进入 onSubscribe
里面发现有在监听 subscribe
EchoServer.prototype.onSubscribe = function (socket) {
var _this = this;
socket.on('subscribe', function (data) {
_this.channel.join(socket, data);
});
};
③ 进入 join
,因为这里是公共的 ,所以会走else
,从而建立通道
Channel.prototype.join = function (socket, data) {
if (data.channel) {
// 如果是私有的
if (this.isPrivate(data.channel)) {
this.joinPrivate(socket, data);
}
else {
// 建立通道
socket.join(data.channel);
// 将通道名输出到控制台中
this.onJoin(socket, data.channel);
}
}
};
从.channel()返回了SocketIoChannel类的实例,而在在实例化SocketIoChannel类的构造函数中发送一个事件到socket里,laravel-echo-server接收然后建立通道(所以这个通道的建立实在这个地方)。
(7)继续向下走我们追一下 .listen()
我们进入 SocketIoChannel
类中找到 listen
方法
// socketio-connector.ts 在通道实例上监听事件。
listen(event: string, callback: Function): SocketIoChannel {
this.on(this.eventFormatter.format(event), callback);
return this;
}
/**
* 将通道的套接字绑定到事件并存储回调。
*/
on(event: string, callback: Function): void {
let listener = (channel, data) => {
if (this.name == channel) {
callback(data);
}
};
// 监听通道上的事件
this.socket.on(event, listener);
this.bind(event, listener);
}
至此 laravel-echo 完成了与laravel-echo-server的交互。
第四步 私有–前端laravel-echo 与 laravel-echo-server 交互
window.Echo = new Echo({
broadcaster: 'socket.io',
host: window.location.hostname + ':6001',
auth:
{
headers:
{
'authorization': 'Bearer ' + store.getters.token
}
}
});
window.Echo.private('private_channel.1')
.listen('RssoPrivateEvent', (e) => {
that.names.push(e.name)
});
(2)实例化Echo因为上面有说,这里直接追private()进入方法中看一下
/**
* 通过名称获取私有通道实例.
*/
private(channel: string): Channel {
return this.connector.privateChannel(channel);
}
/**
* 通过名称获取私有通道实例。
*/
privateChannel(name: string): SocketIoPrivateChannel {
// 这里拼接private-然后判断私有通道是否存在
if (!this.channels['private-' + name]) {
// 不存在就实例化一个
this.channels['private-' + name] = new SocketIoPrivateChannel(this.socket, 'private-' + name, this.options);
}
// 返回私有通道
return this.channels['private-' + name];
}
(3)最终也追到了 SocketIoChannel类
,进入到 SocketIoChannel类中,依旧发送事件
/**
* 此类表示Socket.io通道。
*/
export class SocketIoChannel extends Channel {
/**
* 创建一个新的类实例。 构造函数
*/
constructor(socket: any, name: string, options: any) {
super();
this.name = name;
this.socket = socket;
this.options = options;
this.eventFormatter = new EventFormatter(this.options.namespace);
// 订阅Socket.io频道
this.subscribe();
/*
追 subscribe() 查看以下代码
//订阅Socket.io频道。
subscribe(): void {
// 发送事件
this.socket.emit('subscribe', {
channel: this.name,// 拼接了private-
auth: this.options.auth || {},
});
}
*/
this.configureReconnector();
}
}
(4)在(3)中的实例化SocketIoChannel类时向socket发送了事件名叫subscribe
的,我们看看laravel-echo-server怎么接收它的。并做了什么事?
① 在 init()
中有 _this.onConnect();
进入!
EchoServer.prototype.onConnect = function () {
var _this = this;
this.server.io.on('connection', function (socket) {
_this.onSubscribe(socket); //监听客户端
_this.onUnsubscribe(socket);
_this.onDisconnecting(socket);
_this.onClientEvent(socket);
});
};
② 进入 onSubscribe
里面发现有在监听 subscribe
EchoServer.prototype.onSubscribe = function (socket) {
var _this = this;
socket.on('subscribe', function (data) {
_this.channel.join(socket, data);
});
};
③ 进入 join
,因为这里是私有的 ,所以会走if
Channel.prototype.join = function (socket, data) {
if (data.channel) {
// 如果是私有的
if (this.isPrivate(data.channel)) {
this.joinPrivate(socket, data);
}
else {
// 建立通道
socket.join(data.channel);
// 将通道名输出到控制台中
this.onJoin(socket, data.channel);
}
}
};
④ 进入 joinPrivate();
,这里发送了鉴权请求到laravel项目,鉴权成功则建立通道,相比公共的,这里多了个鉴权操作。
Channel.prototype.joinPrivate = function (socket, data) {
var _this = this;
// 直接向laravel项目发送鉴权请求
this.private.authenticate(socket, data).then(function (res) {
// 鉴权成功则 建立通道
socket.join(data.channel);
if (_this.isPresence(data.channel)) {
var member = res.channel_data;
try {
member = JSON.parse(res.channel_data);
}
catch (e) { }
_this.presence.join(socket, data.channel, member);
}
// 输出到控制台
_this.onJoin(socket, data.channel);
}, function (error) {
if (_this.options.devMode) {
log_1.Log.error(error.reason);
}
_this.io.sockets.to(socket.id)
.emit('subscription_error', data.channel, error.status);
});
};