Promise 并发控制

Promise 并发,往往都直接 Promise.all 但如果我想控制并发为 5 怎么破?

问题分析

因为 Promise 在插件实例的时候,已经开发执行并发任务了。
所以我们没办法在 Promise 实例上做手脚。
那么只能在 Promise 实例之前,限制创建 Promise 实例的速度。

再有,采取什么形式限制,也是个问题。
我一开始想的是队列(queue) 模式。
创建一个异步队列类,统一管理异步队列,这样在添加异步任务的方法上做限制即可。

1
2
3
4
5
6
7
const q = new Queue(2);
q.push(() => promiseTask());
q.push(() => promiseTask());
q.push(() => promiseTask());
q.all()
.then(arr => console.log(arr))
.catch(err => console.log(err));

这个形式是比较容易实现的。
因为我们 push 到队列中的只是普通函数,异步任务并没有执行。
所以我只要控制 普通函数 执行速度即可。
当 all 的时候执行异步任务,当一个任务执行完成后,看看列队是否还有任务,如果有就执行,没有就结束。

列队控制实现

下面是雏形:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class Queue {
constructor(concurrency = Infinity) {
this.concurrency = concurrency;
this.queue = [];
this.tasks = [];
this.activeCount = 0;
}

push(fn) {
this.tasks.push(new Promise((resolve, reject) => {
const task = () => {
this.activeCount++;
fn()
.then(data => resolve(data))
.catch(err => reject(err))
.then(() => this.next());
};
if (this.activeCount < this.concurrency) {
task();
} else {
this.queue.push(task);
}
}));
}

all() {
return Promise.all(this.tasks);
}

next() {
this.activeCount--;
if (this.queue.length > 0) {
this.queue.shift()();
}
}
}

这里利用 Promise.all 执行所有异步任务,而异步任务是经过处理的,在 tasks 中的任务都是一层代理异步层。
这一层负责限制并发,以及结果处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
function fn(t, err) {
return new Promise((resolve, reject) => {
setTimeout(() => {
err ? reject(err) : resolve(`time: ${t}s.`);
}, t);
});
}

console.time('Queue');
const q = new Queue(2);
q.push(() => fn(1000));
q.push(() => fn(1000));
q.push(() => fn(1000));
q.push(() => fn(1000));
q.all()
.then(console.log)
.then(() => console.timeEnd('Queue'));

这段代码执行耗时是 2秒 完全符合我们异步控制结果。

错误处理

但由于是 Promise.all 处理的,所以错误捕获的问题上,依然存在之前提到的 Promise.all 丢弃全部结果问题。

1
2
3
4
5
6
7
8
9
10
11
console.time('Queue');
const q = new Queue(2);
q.push(() => fn(1000));
q.push(() => fn(1000));
q.push(() => fn(1000));
q.push(() => fn(1000));
q.push(() => fn(200, 'error'));
q.all()
.then(console.log)
.catch(err => console.log('err', err))
.then(() => console.timeEnd('Queue'));

在 2.2s 的时候执行了错误,导致异步结果全部被丢弃。

1
2
3
4
5
6
7
8
9
10
11
console.time('Queue');
const q = new Queue(2);
q.push(() => fn(1000));
q.push(() => fn(1000));
q.push(() => fn(1000));
q.push(() => fn(1000));
q.push(() => fn(200, 'error').catch(e => e));
q.all()
.then(console.log)
.catch(err => console.log('err', err))
.then(() => console.timeEnd('Queue'));

需要通过 catch 去和谐掉这错误。
虽然想过在内部加上这个题处理,但其实还是用户自己处理会比较合适,否则不方便统一错误上报等事情。

这里还有一个很大的隐患,就是没有用 Promise.try 包裹用户方法,这会导致一些问题。
不过为了简单起见,demo 代码这样比较容易阅读。

小结

如果你是 sindresorhus 大神的粉丝,你会发现这其实就是 p-queue 的核心概念。
其实 sindresorhus 大神还有另一个模块,更是颠覆了我对异步控制的局限认知 p-limit

p-limit 原理其实非常简单,跟 Queue 做了一样的事情,只是函数化的维护了这个列队。
但调用方式却做了很大突破。

(流下了没有技术的泪水。。)