简介
EventEmitter 模块是node中经常用到模块,主要是为了实现监听事件。像在koa、express中就经常看到这样的监听事件。
app.on('3000', () => {
console.log('the server is running')
})
因为大多数 Node.js 核心 API 都采用惯用的异步事件驱动架构,所以需要监听某个阶段来告知当前的进程,或者触发一些逻辑事件。
// 普通的触发事件
const EventEmitter = require('events');
class Test extends EventEmitter {}
const test = new Test();
test.on('get', val => {
console.log('触发了get事件!', val);
});
test.emit('get', '123');
// 输出 '触发了get事件! 123'
源码分析
首先先从61行看起,这一行首先定义了3个关键词
EventEmitter.prototype._events = undefined;
EventEmitter.prototype._eventsCount = 0;
EventEmitter.prototype._maxListeners = undefined;
_events:顾名思义是存放的是事件对象组。至于内部结构后面再分析。
_eventsCount:记录存放事件组的个数
_maxListeners:一个监听中的最大监听数
接下来是一些初始化操作
// By default EventEmitters will print a warning if more than 10 listeners are
// added to it. This is a useful default which helps finding memory leaks.
let defaultMaxListeners = 10;
// 初始化:定义defaultMaxListeners参数
Object.defineProperty(EventEmitter, 'defaultMaxListeners', {
enumerable: true, // 可被枚举的
get() {
// 得到最大监听数
return defaultMaxListeners;
},
set(arg) {
// 设置set校验
if (typeof arg !== 'number' || arg < 0 || NumberIsNaN(arg)) {
throw new RangeError(
'The value of "defaultMaxListeners" is out of range. It must be a non-negative number. Received ' +
arg +
'.'
);
}
defaultMaxListeners = arg;
},
});
EventEmitter.init = function() {
// prototype 没有events, 或者 getPrototype 上面没有events(es5)
if (
this._events === undefined ||
this._events === Object.getPrototypeOf(this)._events
) {
// 没有则创建一个空对象
this._events = Object.create(null);
// events数量置为0
this._eventsCount = 0;
}
this._maxListeners = this._maxListeners || undefined;
};
这里主要就是为了初始化上面三项基础数据。这样一个新的实例上的几项属性即为:空的监听对象组、0个监听事件个数、10个最大同时监听数(未初始化前是undefined)。
// Obviously not all Emitters should be limited to 10. This function allows
// that to be increased. Set to zero for unlimited.
EventEmitter.prototype.setMaxListeners = function setMaxListeners(n) {
if (typeof n !== 'number' || n < 0 || NumberIsNaN(n)) {
throw new RangeError(
'The value of "n" is out of range. It must be a non-negative number. Received ' +
n +
'.'
);
}
this._maxListeners = n;
return this;
};
这里即是设置实例中的最大同时监听数,如果参数非数字的话则会抛出错误。
function $getMaxListeners(that) {
if (that._maxListeners === undefined) {
return EventEmitter.defaultMaxListeners;
}
return that._maxListeners;
}
EventEmitter.prototype.getMaxListeners = function getMaxListeners() {
return $getMaxListeners(this);
};
返回当前最大监听数,如果未实例化则返回的是构造函数上的 defaultMaxListeners ,即是 10。如果实例后则返回的是实例上的defaultMaxListeners。
以上都是对Emitter的一些初始化和暴露出实例的初始化操作,这些都是在实例那一步完成的。光靠这些我们是无法断定events是如何实现事件监听的,以及存放的数据结构的,那就接下来继续往下看。
接下来便是暴露的操作方法,其中便有on、emit、once等。首先这里先从on开始看起。
EventEmitter.prototype.on = EventEmitter.prototype.addListener;
这里on指向的是addListener
EventEmitter.prototype.addListener = function addListener(type, listener) {
return _addListener(this, type, listener, false);
};
调用的是_addListener这个方法。这里传入了两个参数,分别是type、listener。
test.on('get', val => {
console.log('触发了get事件!', val);
});
即之前在外面调用的。然后分析这两个参数即是我们传的‘事件名’、‘触发事件’。
而后又调用了_addListener这个方法,将this,‘事件名’,‘触发事件’,一个布尔值传了进去。现在还不知道这个布尔值拿来干嘛,那就接下来往下看_addListener这个方法。
function _addListener(target, type, listener, prepend) {
var m;
var events;
var existing;
if (typeof listener !== 'function') {
throw new TypeError('The "listener" argument must be of type Function. Received type ' + typeof listener);
}
events = target._events;
if (events === undefined) {
events = target._events = Object.create(null);
target._eventsCount = 0;
} else {
// To avoid recursion in the case that type === "newListener"! Before
// adding it to the listeners, first emit "newListener".
if (events.newListener !== undefined) {
target.emit('newListener', type,
listener.listener ? listener.listener : listener);
// Re-assign `events` because a newListener handler could have caused the
// this._events to be assigned to a new object
events = target._events;
}
existing = events[type];
}
if (existing === undefined) {
// Optimize the case of one listener. Don't need the extra array object.
existing = events[type] = listener;
++target._eventsCount;
} else {
if (typeof existing === 'function') {
// Adding the second element, need to change to array.
existing = events[type] =
prepend ? [listener, existing] : [existing, listener];
// If we've already got an array, just append.
} else if (prepend) {
existing.unshift(listener);
} else {
existing.push(listener);
}
// Check for listener leak
m = $getMaxListeners(target);
if (m > 0 && existing.length > m && !existing.warned) {
existing.warned = true;
// No error code for this since it is a Warning
// eslint-disable-next-line no-restricted-syntax
var w = new Error('Possible EventEmitter memory leak detected. ' +
existing.length + ' ' + String(type) + ' listeners ' +
'added. Use emitter.setMaxListeners() to ' +
'increase limit');
w.name = 'MaxListenersExceededWarning';
w.emitter = target;
w.type = type;
w.count = existing.length;
ProcessEmitWarning(w);
}
}
return target;
}
这里先初始化m、events、existing这三个参数。
然后判断触发事件是否为一个function类型,因此在on这一步中第二个参数就必须传一个方法类型,不然就会抛出错误。
下面初始化事件组:
events = target._events
这里即会得到一个对象,对象中存放了触发事件组或空对象(即没有触发值)。
而后又对events作了一系列判断
if (events === undefined) {
events = target._events = Object.create(null);
target._eventsCount = 0;
} else {
// newListener为监听字段,当创建好newListener后会执行newListener监听事件
if (events.newListener !== undefined) {
target.emit(
'newListener',
type,
listener.listener ? listener.listener : listener
);
// 重新注册events 因为 newListener 钩子可能导致this._events 去重新注册个新对象。详情请看EventEmitter.prototype.emit
events = target._events;
}
existing = events[type];
}
从这里即可看出当有时间被挂载上,即触发on事件后, 会触发一个 newListener 事件。因此 newListener 可以用来检测挂载的事件是否正确触发。
这里将上一次这个触发事件赋值给existing这个变量,当然也存在上一次不存在这个事件key值。
if (existing === undefined) {
existing = events[type] = listener;
// 递增调用次数
++target._eventsCount;
} else {
if (typeof existing === 'function') {
// 如果有prepend参数,则现在插入参数放在最前面调用
existing = events[type] = prepend
? [ listener, existing ]
: [ existing, listener ];
// If we've already got an array, just append.
} else if (prepend) {
// 如果是个数组,并且设了prepend,则unshift至数组
existing.unshift(listener);
} else {
existing.push(listener);
}
// 3,校验当前监听数量
m = $getMaxListeners(target);
if (m > 0 && existing.length > m && !existing.warned) {
existing.warned = true;
const w = new Error(
'Possible EventEmitter memory leak detected. ' +
existing.length +
' ' +
String(type) +
' listeners ' +
'added. Use emitter.setMaxListeners() to ' +
'increase limit'
);
w.name = 'MaxListenersExceededWarning';
w.emitter = target;
w.type = type;
w.count = existing.length;
ProcessEmitWarning(w);
}
}
这里则对上面的 existing 做了判断,这里共有两种情况
如果该触发事件没有,则在 this._events 上面进行添加,并且自增一个触发事件对。
如果该触发事件存在,则把这个 之前Function( 触发事件 ) 和现在 Function , 放在一个数组中,这里还用到prepend参数,之前给的布尔值会在这边做出判断到底是push 还是 unshift,不同的放置,后面的触发顺序也会不同。那这里 addListener 方法对 prepend 已经固定是 false 了,也就是说以 on 方法来添加触发事件默认是放在数组后面。那后面也有用来前置的方式:
EventEmitter.prototype.prependListener =
// prepend 调用
function prependListener(type, listener) {
return _addListener(this, type, listener, true);
};
prependListener即可以将同名的触发事件前置。这也是为什么之前将prependListener这个方法提取出来。
从以上可以即可分析出 this._events 对象中的格式:
_events = {
type: Function,
type2: [ Function, Function, ... ],
}
继续看下面:
m = $getMaxListeners(target);
if (m > 0 && existing.length > m && !existing.warned) {
const w = new Error(
'Possible EventEmitter memory leak detected. ' +
existing.length +
' ' +
String(type) +
' listeners ' +
'added. Use emitter.setMaxListeners() to ' +
'increase limit'
);
w.name = 'MaxListenersExceededWarning';
w.emitter = target;
w.type = type;
w.count = existing.length;
ProcessEmitWarning(w);
}
function $getMaxListeners(that) {
// 还未实例化的话则返回构造函数的默认最大监听数
if (that._maxListeners === undefined) {
return EventEmitter.defaultMaxListeners;
}
return that._maxListeners;
}
function ProcessEmitWarning(warning) {
if (console && console.warn) console.warn(warning);
}
在这里检测了当前连接数,如果超过则会抛出警告,但并不会进行拦截触发事件。
最后返回该实例。
接着来看emit触发事件。
EventEmitter.prototype.emit = function emit(type) {
const args = [];
for (var i = 1; i < arguments.length; i++) args.push(arguments[i]);
let doError = (type === 'error');
const events = this._events;
if (events !== undefined) { doError = (doError && events.error === undefined); } else if (!doError) { return false; }
// If there is no 'error' event listener then throw.
if (doError) {
let er;
if (args.length > 0) { er = args[0]; }
if (er instanceof Error) {
// Note: The comments on the `throw` lines are intentional, they show
// up in Node's output if this results in an unhandled exception.
throw er; // Unhandled 'error' event
}
// At least give some kind of context to the user
const err = new Error('Unhandled error.' + (er ? ' (' + er.message + ')' : ''));
err.context = er;
throw err; // Unhandled 'error' event
}
const handler = events[type];
if (handler === undefined) { return false; }
if (typeof handler === 'function') {
ReflectApply(handler, this, args);
} else {
const len = handler.length;
const listeners = arrayClone(handler, len);
for (var i = 0; i < len; ++i) { ReflectApply(listeners[i], this, args); }
}
return true;
};
从这边可以想到之前emit是如何传参的。
test.emit('get', '123');
get即为监听名,123为触发事件的传的参数,其实这里后面可以写多个参数,类似test.emit('get', '123', '456', '789'); 这样除第一个参数都可以在触发器中接收到。
这里首先将所有的参数放置在一个数组中,然后接下来对触发名称为' error ' 的事件做了单独的判断,如果之前自己没有定义' error ' 监听事件的话,EventEmitter 会把这个监听处理成一个异常触发事件,当error被触发时,nodejs就会退出程序并输出错误信息,因此一般吧error事件设置成监听器,避免遇到错误后整个程序崩溃。
然后查看之前的事件对象组是否有这个触发事件,由于之前添加触发事件只会存在Function或数组Function,因此如果存在Function则直接调用ReflectApply方法,如果是个数组的话则去循环执行ReflectApply。
ReflectApply方法:
const R = typeof Reflect === 'object' ? Reflect : null;
const ReflectApply =
R && typeof R.apply === 'function'
?
R.apply
: function ReflectApply(target, receiver, args) {
return Function.prototype.apply.call(target, receiver, args);
};
这里可以看到有个Reflect.apply,其方式就类似于Function.prototype.apply.call,这里可以理解为[ target ].apply(receiver, arg)。
因此这里就是触发实例上的触发事件。
再看移除监听 off 方法:
EventEmitter.prototype.off = EventEmitter.prototype.removeListener;
EventEmitter.prototype.removeListener =
function removeListener(type, listener) {
let list, events, position, i, originalListener;
if (typeof listener !== 'function') {
throw new TypeError('The "listener" argument must be of type Function. Received type ' + typeof listener);
}
events = this._events;
if (events === undefined)
{return this;}
list = events[type];
if (list === undefined)
{return this;}
// 如果匹配到监听事件key: Function
if (list === listener || list.listener === listener) {
if (--this._eventsCount === 0) // 监听事件组的个数减一
// 如果为0 则已经移除所有监听,这里重置监听对象组
{this._events = Object.create(null);}
else {
// 常规删除
delete events[type];
if (events.removeListener) // 触发reomoveListener,可以在外部对其进行监听
{this.emit('removeListener', type, list.listener || listener);}
}
} else if (typeof list !== 'function') {
/* 如果匹配到的是个数组
下面的操作即是删除数组中对应的触发事件
*/
position = -1;
for (i = list.length - 1; i >= 0; i--) {
if (list[i] === listener || list[i].listener === listener) {
originalListener = list[i].listener;
position = i;
break;
}
}
if (position < 0)
{return this;}
if (position === 0)
{list.shift();}
else {
spliceOne(list, position);
}
// 如果还剩唯一一个,则把数组Function 再次转化为 Function
if (list.length === 1)
{events[type] = list[0];}
if (events.removeListener !== undefined)
{this.emit('removeListener', type, originalListener || listener);}
}
return this;
};
在删除中,必须传入监听名和触发函数,这样主要目的就是要精确在一些数组Function中删除指定的触发事件。
once 方法:
EventEmitter.prototype.once = function once(type, listener) {
if (typeof listener !== 'function') {
throw new TypeError('The "listener" argument must be of type Function. Received type ' + typeof listener);
}
this.on(type, _onceWrap(this, type, listener));
return this;
};
function _onceWrap(target, type, listener) {
let state = { fired: false, wrapFn: undefined, target, type, listener };
let wrapped = onceWrapper.bind(state);
wrapped.listener = listener;
state.wrapFn = wrapped;
return wrapped;
}
function onceWrapper() {
let args = [];
for (let i = 0; i < arguments.length; i++) args.push(arguments[i]);
if (!this.fired) {
this.target.removeListener(this.type, this.wrapFn);
this.fired = true;
ReflectApply(this.listener, this.target, args);
}
}
这里用到了闭包,通过onceWrapper.bind(state)挂载了一个返回值为{ [Function: bound onceWrapper] listener: [Function] } 的可执行的对象,如果直接执行该对象可触发 onceWrapper 函数,而 onceWrapper 函数上的this绑定了该触发事件的的一些属性,其中 fired 便这个属性通过第一次执行后将置为true,并且移除了该触发事件。而后面listener则是记录的触发事件,由于这个函数并非常规的Function 或 数组Function ,因此需要一个listener来进行存储,当新增或删除的时候,就需要这个listener来代替。
// 新增
if (events.newListener !== undefined) {
target.emit('newListener', type,
listener.listener ? listener.listener : listener);
}
// 删除
if (events.removeListener)
this.emit('removeListener', type, list.listener || listener);
}
关键的几个就是这些啦,其他的例如removeAllListeners、listenerCount、eventNames都是相对比较简单的,可以从上面的内容进行推导,就不多做分析了。
总结
EventEmitter是属于比较元老级的模块,所以在代码中很多功能都是用原生js实现的,但兼容性是非常好。适合用在异步IO中。对于普通的回调比起来,EventEmitter表现更为独立。源码并不复杂,但是其中的很多思想值得去学习和借鉴。