前言
Promise.all
可以用来并行处理一组 Promise,在 Promise 数量不大的时候是一个很好的方案。
但当 Promise 的数量非常庞大的时候,Promise.all
很容易出现磁盘/网络的 IO 激增,而导致其他 Promise 无法处理,最后整个 Promise.all
一直无法返回。
而解决这钟问题的关键是需要一个限流的 Promise 队列,也就是本文需要讨论的对象。
问题
最近遇到一个问题,用 Promise.all
从 CDN 中下载几百个文件到本地的时候,系统的监控显示已经没有网络 IO 了,但 Promise.all
一直没返回。
代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
var fs = require('fs');
var path = require('path');
var axios = require('axios');
var resources = [ 'url1', 'url2', 'url3' ]; // length is 400
var folder = path.resolve(__dirname, '.');
Promise.all(
resources.map(url => new Promise((resolve, reject) => {
axios({
method: 'GET',
url: url,
responseType: 'stream'
}).then(res => {
res.data
.on('end', resolve)
.on('error', reject)
.pipe(fs.createWriteStream(
path.resolve(folder, path.basename(url))
));
});
}))
).then(console.log).catch(console.error);
|
很明显,这是因为并发量太大,某些 Promise 无法被处理,最后才导致整个 Promise.all
无法返回的。
解决方案
既然不能同时下载所有资源,那就一个一个下载吧。这就是我们要实现的 Promise 队列了。
1
|
type PromiseQueue = Array<() => Promise<any>>;
|
需要注意的是,Promise 队列的类型不能是 Array<Promise<any>>
,这是因为当我们通过 new Promise
来创建 Promise 的时候,Promise 就会马上执行。
因此需要通过调用函数来创建 Promise,这样就能延迟创建 Promise 了。
有了队列之后,我们还需要一个 loadNext
的方法,当执行完上一个 Promise 的时候就去执行下一个 Promise。这个实现起来很简单:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
// ...
var queue = [];
var MAX_PARALLEL_COUNT = 10;
var loadingCount = 0;
var loadedCount = 0;
function loadNext() {
if (loadingCount === MAX_PARALLEL_COUNT || queue.length === 0) return;
loadingCount++;
var promiseFn = queue.shift()
promiseFn().then(onLoaded).catch(onLoaded);
}
function onLoaded() {
loadingCount--;
loadedCount++;
console.log('Queue Length: ', queue.length, ' Loaded Count: ', loadedCount, ' Loading Count: ', loadingCount);
if (loadingCount === 0) {
console.log('Done.');
} else {
loadNext();
}
}
for (var i = 0; i < resources.length; i++) {
var url = resources[i];
queue.push(() => new Promise((resolve, reject) => {
axios({
method: 'GET',
url: url,
responseType: 'stream'
}).then(res => {
res.data
.on('end', resolve)
.on('error', reject)
.pipe(fs.createWriteStream(
path.resolve(folder, path.basename(url))
));
});
}));
loadNext()
}
|
通过这种限流的 Promise 队列,我们就可以解决 Promise.all
并发太高而导致的奇怪问题了。
封装
最后封装一下上面的例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
class PromiseQueue {
constructor(options = {}) {
this.concurrency = options.concurrency || 1;
this._current = 0;
this._list = [];
}
add(promiseFn) {
this._list.push(promiseFn);
this.loadNext();
}
loadNext() {
if (this._list.length === 0 || this.concurrency === this._current) return;
this._current++;
const fn = this._list.shift();
const promise = fn();
promise.then(this.onLoaded.bind(this)).catch(this.onLoaded.bind(this));
}
onLoaded() {
this._current--;
this.loadNext();
}
}
const q = new PromiseQueue();
[1, 2, 3, 4, 5].forEach(v => {
q.add(() => new Promise((resolve) => {
setTimeout(() => {
console.log(v);
resolve();
}, 1000);
}));
});
|
这样,我们就能实现了一个简单限流的 Promise 队列了。
(完)。
文章作者
scarletsky
上次更新
2019-11-08
(9f2fb7f)