这章十分不好写,因为必要要结合实际应用场景,否则读者并不能深切感受到异步队列在业务中导致的一些问题,我见过太多不懂异步队列的同学,在debug过程中对数据流向不明所以,导致会问为什么我有什么可以获取到数据,有时候获取不到,再者更好使用 canvas 时不知道 new Image 是异步操作而发生不知道为什么画不上图片的疑问,虽说这一章是放在了 node 中,但是只要你是使用 js 来写代码就无法逃出异步队列的场景,原因很简单,有以下几点:
- js 为了保证其优秀的体验,使用了全异步编程,这就导致出现很多回调函数来保证异步完成后的操作,这是这门语言的特性,也是 node 适合处理并发的原因之一,依然如此我们就一定会产生数据依赖的问题;
- js 基于宿主环境会有不同的事件循环机制,但是目前主要以 浏览器 与 node 环境为主,两者的事件循环机制大体上是一样的,只是 node 比 浏览器多了如 nextTick、setImmediate 等用于当前线程相关的生命周期,而浏览器也因 H5 有了 Mutation Observer 这样的异步监听 dom 更新的 api,但两者的事件循环的核心是换汤不换药的,这就导致了编写过程中存在很多异步 api,如上述我说到的创建 image 对象、setTimeout 等;
- 多个异步请求导致数据依赖的问题,即下个请求依赖于上个请求的响应结果,我们必须建立异步队列以确保上个请求完成后才能处理下个请求;
- 在 node 中操纵数据库时,绝大多数都是异步的过程,此时我们常常需要进行
查找 => 更新 => 重查找
过程,比如点赞的业务过程,我们必须先确定用户是否已点过赞(点过一般肯定不能再点),其次再进行点赞的插入或更新,最后再次查询完成此次操作之后的总点赞数(重新查找是必须的,因为会存在并发),在这整个过程中,node对整个表的操作过程实际都是异步的,但每一步都依赖于上一步的完成,这就是一个最鲜活的异步队列的例子了,更不用说更复杂的如权限判断操作等业务了;
从上述的几个理论到实际业务的例子,大家现在应该明白异步队列的重要性了,在异步编程中异步队列是一个从古至今的大问题,但如今,我们已经有了很多解决方案,本章就是围绕着这个问题给大家介绍了多种解决方案,下面我会告诉各位有哪些解决方案,但是不会每个方案都详解,但是都会提到,有兴趣各位自己去看看对应的包即可。
一、node 自带的 events 模块实现订阅发布模式的异步调用
node 自身核心 API 本来就是基于事件的一套架构,所以自然提供了基于事件监听的方法,即使没使用过 events 模块,不过我相信各位在使用流形式去读取文件或原生 http 模块时一定也看到过 res.on('data', (chunk) => {...}) res.on('end', () => {...}
这样的监听方式,你猜的没错,fs.ReadStream 等 api 都在处理流对象时都会使用到事件模块,下面给大家上一个简单的 demo
// 这是简单的一个计和应用
// ES6 后我们不需要像书中一样使用 util.inherits 去实现继承,而转为使用 ES6 声明子类继承的方式
class MyEmitter extends EventEmitter {}
const myEmitter = new MyEmitter
// on 订阅事件及监听
myEmitter.on('count', (num) => {
console.log(`和为${num}`)
})
let arr = [1, 2, 3, 4, 5]
// 发布事件,并传入所需参数
myEmitter.emit('count', arr.reduce((lastVal, item) => {
return lastVal + item
}))
上面是一个最简单的使用 events 模块的过程,在实际场景中,我们会将触发放在异步函数内,比如 fs.readFile(path[, options], (err, data) => { myEmitter.emit(eventName) })
中,去实现异步过程中控制何时发布事件,继而在发布事件的回调去抽向出对应的业务代码,避免回调地狱的出现
在一般情况下,我们的事件应该是一对多的方式去反复触发,但是经常也有多对一的情况,比如渲染视图时,我们需要将模板、绑定数据、外部资源全部读取完毕后才去触发事件开始渲染,聪明的同学可能已经想到建立一个哨兵变量去在某个函数中去记录3者的完成即可
// count 为哨兵变量
var count = 0, results = {}
myEmitter.on('done', (key, val) => {
results[key] = val
count++
if (count >= 3) {
myEmitter.emit('render', results)
}
})
myEmitter.on('render', (results) => {
render(results)
})
// 伪代码
db.query(sql, (err, data) => {
myEmitter('done', 'data', data)
})
fs.readFile(templatePath, 'utf8', (err, template) => {
myEmitter('done', 'template', template)
})
request.get(sourcePath, (err, res, body) => {
myEmitter('done', 'resources', body)
})
但是敏感的同学可能早已发现,这里的哨兵变量是一个全局变量,在稍微复杂的业务中,它是有可能被污染的,所以我们最好能将它抽成局部变量,所以我们可以将 done 函数改造为
let after = (times, callback) => {
var count = 0, results = {}
return (key, value) => {
results[key] = val
count++
if (count >= times) {
callback(results)
}
}
}
let done = after(3, (results) => {
myEmitter.emit('render', results)
})
myEmitter.on('done', done)
这里我们不仅使用闭包实现了缓存,而且也实现了局部作用域,并将函数与业务分离,降低了函数耦合性。
书中还介绍了利用 once 来解决雪崩现象,本质就是利用一次性事件,来解决缓存失效时,多次重复查询静态数据或静态资源的情况。第一次查询成功后将被缓存,下次出现同一查询时直接返回数据,而不进行查询,除非缓存再次失效,需要注意的是要确定缓存失效才进行一次性事件的订阅,否则可能会引发侦听器过多的警告,为了避免警告,我们可以设置 setMaxListeners(0)
来移除警告。
如果觉得这个 done 函数比较烦,可以使用 朴灵 老师自己写的 EventProxy 模块,提供了一个 all 方法来实现 done 中的操作,即判断所有需要触发的函数都触发后才执行回调,例如
proxy.all('template', 'data', 'resources', (template, data, resources) => {...})
后续的 after、tail、any 还有错误处理等方法不做描述,有兴趣大家可以自己去研究下,这个模块在前端也可使用。
二、最经典的 Promise/Deferred 模式
首先,promise 的用法我想应该不用我多讲,如果这都不会的话,还是劝各位暂时别看这本书了。。。姑且放上一个学习链接吧
Promise初了解
书中是利用 events 模块实现了 Promises/A 规范的原理,但是我们知道 A 的then 方法是这样的 promise().then(onResovle, onReject)
也就是说成功和失败都在一个 thenable api 里,但是我们平常用的是符合链式职责链调用模式的,其中任何一链错误都会使其状态变为 reject 然后触发 catch 去做错误处理,这样的模式规范是另一个规范 Promise/A+ ,它不是由 CommonJs 规范提出的,在这里你可以看到并没有 CommonJs规范 ,所以其实它是由 Promises/A+ 这个组织提出的,对同名。。。你想的没错,npm 中有一个很出名的异步解决方案包 bluebird 就是完全基于此,它也更贴近我们前端使用的方式,所以接下里我会写个 Promise/A+ 的简单实现,不具有代表性,只是单纯满足规范的简单实现,如果需要查看完整规范,点这里
Promise/A+规范中文翻译,详细每一条地址在底部
A 与 A+ 的规范要求是一样的,其次 A 比 A+ 简单,大家尚可放心,首先基本实现需要满足下面几点规范
- 在3种状态 pending、fullfilled、rejected 之间互相转变
- 每次的返回必须是 thenable 即一个 Promise.resolve() 对象,可以通俗的理解为可 then 的对象
- 链式调用且可通过 exception 中断(throw 抛出的值),并且会给出 reason ,即失败的信息
为了让大家快速理解下面简单实现的轮子,我先跟大家说一下思路
- 要保证能链式调用 then,所以我们 then 函数返回的一定要为原型链中存在 then 的函数或对象
- 因为 promise 主函数中可能存在异步,所以我们后续 then 中的函数一定要放在一个队列中 延迟执行,以保证异步结束 resolve() 时,后续 then 的链式调用能够顺序执行且会延迟在初始的 resolve 调用后执行,为此我们必须加入 完成态 概念来判断,在异步未进行时将后续 then 中的东西存入队列中
- 为了保证下个 then 中函数的参数是上个的返回值,所以我们得引入一个 中间promise 的概念来保证既能维持 链式调用的功能 又能 保存上个的返回值注入下个 then 中
- 轮子中会有注释,看明白轮子的前提是明白 闭包 与 事件循环 的概念
- 如若一次性看不懂,可查看这篇美团早年的文章,是一步步解释的 剖析 Promise 之基础篇
- 扩展更多的功能,建议引入 bluebird ,遵从 Promise/A+ 规范
function Promise (fn) {
var state = 'pending', deferreds = [], value = null
this.then = function (onFulfilled, onRejected) {
// bridge promise(关键性闭包)
return new Promise((resolve, reject) => {
handler({
resolve: resolve,
reject: reject,
onFulfilled: onFulfilled || null,
onRejected: onRejected || null
})
})
}
function handler (deferred) {
// 要点:这里的 pending 状态是必须的,因为如果传入的 fn 中有异步 api 并在里面才调用 resolve,此时需要将主函数 then 的后续操作推入队列
// 并且只有第一次调用 resolve 时才为 pending 状态,此时推入 deferreds 队列,后续执行 bridge promise 时状态都已为 fulfilled 会直接
if (state === 'pending') {
deferreds.push(deferred)
return ;
}
var cb = state === 'fulfilled' ? deferred.onFulfilled : deferred.onRejected, ret;
// 当第一次既没有调用 resolve 也没有 reject 时,直接当错误情况处理
if (cb === null) {
cb = state === 'fulfilled' ? deferred.onFulfilled : deferred.onRejected;
cb(value)
return ;
}
// 此时 value 已经为用户上次调用 resolve 时赋值的 value 了
var ret = cb(value)
// 将用户上次返回的值执行一次 resolve 去改变 value 的值,使队列中下一链的 onFulfilled 被执行时的 value 为上次的返回值
deferred.resolve(ret)
}
// 点睛之笔
function resolve (newValue) {
// 要点一:用户第一次是因为 new Promise 的 resolve 依赖注入被调用时而进入,如果不被调用 resolve() 则不会执行下面步骤,链式调用断裂,若调用则会改变状态为完成态 fulfilled 并开始执行 deferreds 队列
// 要点二*:由于此时是因为 bridge promise 进入,那么用户在这个 then 中传入要被执行的 onFulfilled 那一瞬间,就产生了一个闭包,闭包就是这个 bridge Promise,这个闭包中执行了 handler 这个内部方法
// 注意这里是个闭包,内部每个 deferreds 都是独立的,也就是说每次执行 bridge promise 时都会有一个新的 Promise 产生,即会有新的 deferreds 队列,每次 deferred.resolve(ret) 都会去执行内部的 deferreds 队列
// 所以不出意外,一般 deferreds 的长度都为 1,但是每个都是新的 macro task,所以一定会顺序执行
// 要点三:我们有可能在决定执行 resolve 时传入普通函数或对象,函数此时不会被有效执行, 除非函数的原型链中含有 then,则调用一次 then 判定为 bridge promise,计入本链中
// 如果传入为对象,则有可能为 new Promise 对象,处理同上
if (newValue && (typeof newValue === 'object' || typeof newValue === 'function')) {
var then = newValue.then
if (typeof then === 'function') {
then.call(newValue, resolve)
return ;
}
}
// 无论此处是 bridge promise 还是用户第一次主动调用 resolve,都会重新赋值 value,如果用户没有返回值,则 value 为 undefined
value = newValue
state = 'fulfilled'
finale();
}
function reject (reason) {
state = 'rejected'
value = reason
finale()
}
function finale () {
// 延迟执行 deferreds 队列,先去执行主函数,即后面的 then,如果 fn 内有异步,则完成后才在这里执行最后的 macro task setTimeout,去顺序调用 handler(第一次 resolve 中为异步的话,此时 deferreds 中应都为 handler的参数)
// 每次 handler 都会执行 resolve 去改变 value 为上一链的返回值,以保证本次执行 onFulfilled 时的参数是上次的返回值
setTimeout(function () {
deferreds.forEach(function (deferred) {
handler(deferred)
})
}, 0)
}
// 像构造函数中注入 resolve 依赖
fn(resolve, reject)
}
三、async 模块
- 书中所指的是这个 async ,大家如果觉得 ES7 的 async await 不够用可以去看看这个;
- ES7 async await 需要注意的是异步函数必须返回的是 thunk 函数或者 promise 对象,所以一般封装都会封装成 promise,这需要对整体有一定的规范才行,若不经处理,直接
return
,则会直接相当于返回promise.resolve(arg)
的形式,但是如果想调用出 promise 的 catch 则直接在 async 函数中await Promise reject(err)
即可,不需要return
并且它会中断后续的操作,直接 catch,具体请看
阮一峰 - async 函数 - ES7 的 async await 是基于生成器 Generator 封装的,大家可以了解下,并且可以看下它的集大成者(因为算是吧,逃~) co
四、Step 模块
不多说了,上个地址,大家自己了解吧~
step
五、异步并发控制
简而言之就是在并发场景下控制异步过载的问题,作者简单介绍了利用一个队列存储并发的异步,用于限制后续进入的并发,从而控制并发量
然而我个人认为这是一个业务场景下才能真正实践的问题,我无法扩展开来详解,因为我也没有这样的经验,所以这一节我就不详解了,实在是怕误导大家。
本章详解了 event 模块与 Promise 的简单实现,以及推荐使用语法更简洁的 async await,但后者更需要了解 Promise 的使用方法才能更好的驾驭,所以本章重点我个人认为是 Promise 以及各类异步解决方案的异步队列思路
没错,异步队列的实现思路才是关键!