什么是发布订阅
在软件架构中,发布订阅是一种消息范式,消息的发送者(称为发布者)不会将消息直接发送给特定的接收者(称为订阅者)。而是将发布的消息分为不同的类别,无需了解哪些订阅者(如果有的话)可能存在。同样的,订阅者可以表达对一个或多个类别的兴趣,只接收感兴趣的消息,无需了解哪些发布者(如果有的话)存在。
发布订阅模式可以理解为观察者模式
的扩展,个人理解区别在于发布订阅是由中介控制的
意图:定义对象间的一种一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得到通知并被自动更新。
node中的发布订阅 EventEmitter
先从文档上看下使用方式
const myEE = new EventEmitter();
myEE.on('foo', () => console.log('a'));
myEE.emit('foo');
订阅on
EventEmitter.prototype.on = EventEmitter.prototype.addListener;
EventEmitter.prototype.addListener = function addListener(type, listener) {
return _addListener(this, type, listener, false);
};
EventEmitter在原型上定义了on
方法 实际内部执行的是_addListener
这个函数
function _addListener(target, type, listener, prepend) {
// (●'◡'●) 最多几个监听器
let m;
// (●'◡'●) 监听事件
let events;
// (●'◡'●) 存档监听器的容器
let existing;
// (●'◡'●) 用于检查listener是不是function
checkListener(listener);
// (●'◡'●) 同步对象上已经有的监听事件
events = target._events;
// (●'◡'●) 如果不存在 就先初始化
if (events === undefined) {
events = target._events = ObjectCreate(null);
target._eventsCount = 0;
} else {
//(●'◡'●) !!!!防止循环调用
//EventEmitter 实例在新的监听器被添加到其内部监听器数组之前,会触发自身的 'newListener' 事件
if (events.newListener !== undefined) {
target.emit('newListener', type,
listener.listener ? listener.listener : listener);
// Re-assign `events` because a newListener handler could have caused the
// this._events to be assigned to a new object
events = target._events;
}
existing = events[type];
}
/**
*(●'◡'●) 这一段就是如果还没有就直接设置 如果有就变成数组(我们自己写的时候偷懒可以直接写成一个数组)
*/
if (existing === undefined) {
// Optimize the case of one listener. Don't need the extra array object.
events[type] = listener;
++target._eventsCount;
} else {
if (typeof existing === 'function') {
// Adding the second element, need to change to array.
existing = events[type] =
prepend ? [listener, existing] : [existing, listener];
// If we've already got an array, just append.
} else if (prepend) {
existing.unshift(listener);
} else {
existing.push(listener);
}
/**
*(●'◡'●) 这一段是检查最大监听器数量的 如果溢出就抛出一个错误
*/
m = _getMaxListeners(target);
if (m > 0 && existing.length > m && !existing.warned) {
existing.warned = true;
// No error code for this since it is a Warning
// eslint-disable-next-line no-restricted-syntax
const w = new Error('Possible EventEmitter memory leak detected. ' +
`${
existing.length} ${
String(type)} listeners ` +
`added to ${
inspect(target, {
depth: -1 })}. Use ` +
'emitter.setMaxListeners() to increase limit');
w.name = 'MaxListenersExceededWarning';
w.emitter = target;
w.type = type;
w.count = existing.length;
process.emitWarning(w);
}
}
return target;
}
从上面的代码可以知道实现订阅 需要一个事件map对象 key为事件 value存放监听器
发布emit
EventEmitter.prototype.emit = function emit(type, ...args) {
//(●'◡'●) 是不是'error'找个特殊类型
let doError = (type === 'error');
const events = this._events;
/**
*(●'◡'●) 这一段处理了一下 有错误事件的情况
*/
if (events !== undefined) {
// kErrorMonitor的定义是这样的 const kErrorMonitor = Symbol('events.errorMonitor');
if (doError && events[kErrorMonitor] !== undefined)
this.emit(kErrorMonitor, ...args);
doError = (doError && events.error === undefined);
} else if (!doError)
return false;
// If there is no 'error' event listener then throw.
/**
*(●'◡'●) 这一段 如果没有监听事件 但是发射了一个error 抛错
*/
if (doError) {
let er;
if (args.length > 0)
er = args[0];
if (er instanceof Error) {
try {
const capture = {
};
// eslint-disable-next-line no-restricted-syntax
Error.captureStackTrace(capture, EventEmitter.prototype.emit);
ObjectDefineProperty(er, kEnhanceStackBeforeInspector, {
value: enhanceStackTrace.bind(this, er, capture),
configurable: true
});
} catch {
}
// Note: The comments on the `throw` lines are intentional, they show
// up in Node's output if this results in an unhandled exception.
throw er; // Unhandled 'error' event
}
let stringifiedEr;
const {
inspect } = require('internal/util/inspect');
try {
stringifiedEr = inspect(er);
} catch {
stringifiedEr = er;
}
// At least give some kind of context to the user
const err = new ERR_UNHANDLED_ERROR(stringifiedEr);
err.context = er;
throw err; // Unhandled 'error' event
}
/**
*(●'◡'●) 这里开始进入正题
*/
const handler = events[type];
//(●'◡'●) 没处理函数直接return
if (handler === undefined)
return false;
if (typeof handler === 'function') {
//(●'◡'●) 这里没法直接读到源码 原因 https://stackoverflow.com/questions/59750976/what-are-primordials-in-node-js
//这里应该猜测是调用handler方法 类似于https://developer.mozilla.org/zh-CN/docs/Web/JavaScript/Reference/Global_Objects/Reflect/apply
const result = ReflectApply(handler, this, args);
// We check if result is undefined first because that
// is the most common case so we do not pay any perf
// penalty
//(●'◡'●) 我们首先检查结果是否未定义,因为最常见的就是这种情况
if (result !== undefined && result !== null) {
addCatch(this, result, type, args); //如果有结果 就捕获
}
} else {
//区别只在于传入的处理函数是以单个的还是数组的形式
const len = handler.length;
const listeners = arrayClone(handler);
for (let i = 0; i < len; ++i) {
const result = ReflectApply(listeners[i], this, args);
// We check if result is undefined first because that
// is the most common case so we do not pay any perf
// penalty.
// This code is duplicated because extracting it away
// would make it non-inlineable.
if (result !== undefined && result !== null) {
addCatch(this, result, type, args);
}
}
}
return true;
};
关于addCatch这个函数 是用来捕获错误的 这个写法也是相当的骚。。。
function addCatch(that, promise, type, args) {
if (!that[kCapture]) {
return;
}
// Handle Promises/A+ spec, then could be a getter
// that throws on second use.
try {
const then = promise.then;
if (typeof then === 'function') {
then.call(promise, undefined, function(err) {
// The callback is called with nextTick to avoid a follow-up
// rejection from this promise.
process.nextTick(emitUnhandledRejectionOrErr, that, err, type, args);
});
}
} catch (err) {
that.emit('error', err);
}
}
once
once的主要实现方式就是绑定一个外层对象 身上有一个属性fired true的时候不执行 并删除监听函数
EventEmitter.prototype.once = function once(type, listener) {
checkListener(listener);
this.on(type, _onceWrap(this, type, listener));
return this;
};
function onceWrapper() {
if (!this.fired) {
this.target.removeListener(this.type, this.wrapFn);
this.fired = true;
if (arguments.length === 0)
return this.listener.call(this.target);
return this.listener.apply(this.target, arguments);
}
}
function _onceWrap(target, type, listener) {
const state = {
fired: false, wrapFn: undefined, target, type, listener };
const wrapped = onceWrapper.bind(state);
wrapped.listener = listener;
state.wrapFn = wrapped;
return wrapped;
}