JavaScript高级并发模式—信号量、读写锁与生产级任务调度器|新宇宙博客const
await
Promise
all
map
url =>
fetch
并发控制的目标
同时运行中的任务数 ≤ concurrency 限制
↑
任务队列(等待中)
限速并发调度器(Scheduler)
核心实现
class AsyncScheduler {
#queue = [];
#running = 0;
#concurrency;
#paused = false;
#stats = { completed: 0, failed: 0, queued: 0 };
constructor(concurrency = 5) {
this.#concurrency = concurrency;
}
add(taskFn, { priority = 0 } = {}) {
return new Promise((resolve, reject) => {
const task = { taskFn, resolve, reject, priority, addedAt: Date.now() };
const insertIdx = this.#queue.findIndex(t => t.priority < priority);
if (insertIdx === -1) {
this.#queue.push(task);
} else {
this.#queue.splice(insertIdx, 0, task);
}
this.#stats.queued++;
this.#dispatch();
});
}
#dispatch() {
if (this.#paused) return;
while (this.#running < this.#concurrency && this.#queue.length > 0) {
const task = this.#queue.shift();
this.#stats.queued--;
this.#run(task);
}
}
async #run(task) {
this.#running++;
try {
const result = await task.taskFn();
task.resolve(result);
this.#stats.completed++;
} catch (err) {
task.reject(err);
this.#stats.failed++;
} finally {
this.#running--;
this.#dispatch();
}
}
pause() { this.#paused = true; }
resume() { this.#paused = false; this.#dispatch(); }
clear() {
const count = this.#queue.length;
this.#queue.forEach(t => t.reject(new Error('队列已清空')));
this.#queue = [];
this.#stats.queued = 0;
return count;
}
get stats() {
return {
...this.#stats,
running: this.#running,
queued: this.#queue.length,
};
}
async drain() {
if (this.#running === 0 && this.#queue.length === 0) return;
await new Promise(resolve => {
const check = () => {
if (this.#running === 0 && this.#queue.length === 0) {
resolve();
} else {
setTimeout(check, 10);
}
};
check();
});
}
}
const scheduler = new AsyncScheduler(3);
const urls = Array.from({ length: 20 }, (_, i) => `/api/item/${i}`);
const results = await Promise.allSettled(
urls.map(url => scheduler.add(() => fetch(url).then(r => r.json())))
);
console.log(scheduler.stats);
批量处理工具函数
async function batchProcess(items, handler, concurrency = 5) {
const results = new Array(items.length);
const errors = [];
let index = 0;
async function worker() {
while (index < items.length) {
const i = index++;
try {
results[i] = await handler(items[i], i);
} catch (e) {
errors.push({ index: i, item: items[i], error: e });
results[i] = null;
}
}
}
await Promise.all(
Array.from({ length: Math.min(concurrency, items.length) }, worker)
);
return { results, errors };
}
const { results, errors } = await batchProcess(
Array.from({ length: 100 }, (_, i) => i),
async (id) => {
const resp = await fetch(`/api/users/${id}`);
if (!resp.ok) throw new Error(`HTTP ${resp.status}`);
return resp.json();
},
10
);
console.log(`成功: ${results.filter(Boolean).length}, 失败: ${errors.length}`);
任务队列与优先级调度
优先级堆队列
class PriorityQueue {
#heap = [];
#compare;
constructor(compare = (a, b) => a.priority - b.priority) {
this.#compare = compare;
}
push(item) {
this.#heap.push(item);
this.#siftUp(this.#heap.length - 1);
}
pop() {
const top = this.#heap[0];
const last = this.#heap.pop();
if (this.#heap.length > 0) {
this.#heap[0] = last;
this.#siftDown(0);
}
return top;
}
peek() { return this.#heap[0]; }
get size() { return this.#heap.length; }
#siftUp(i) {
while (i > 0) {
const parent = (i - 1) >> 1;
if (this.#compare(this.#heap[i], this.#heap[parent]) < 0) {
[this.#heap[i], this.#heap[parent]] = [this.#heap[parent], this.#heap[i]];
i = parent;
} else break;
}
}
#siftDown(i) {
const n = this.#heap.length;
while (true) {
let smallest = i;
const l = 2 * i + 1, r = 2 * i + 2;
if (l < n && this.#compare(this.#heap[l], this.#heap[smallest]) < 0) smallest = l;
if (r < n && this.#compare(this.#heap[r], this.#heap[smallest]) < 0) smallest = r;
if (smallest !== i) {
[this.#heap[i], this.#heap[smallest]] = [this.#heap[smallest], this.#heap[i]];
i = smallest;
} else break;
}
}
}
class ReactStyleScheduler {
static Priority = {
IMMEDIATE: 1,
USER_BLOCK: 2,
NORMAL: 3,
LOW: 4,
IDLE: 5,
};
static #timeouts = {
1: -1,
2: 250,
3: 5000,
4: 10000,
5: Infinity,
};
#queue = new PriorityQueue((a, b) => a.expirationTime - b.expirationTime);
schedule(callback, priority = ReactStyleScheduler.Priority.NORMAL) {
const timeout = ReactStyleScheduler.#timeouts[priority];
const expirationTime = performance.now() + timeout;
const task = { callback, priority, expirationTime };
this.#queue.push(task);
this.#requestFlush();
return task;
}
#flushRequested = false;
#requestFlush() {
if (this.#flushRequested) return;
this.#flushRequested = true;
MessageChannel && new MessageChannel().port1.postMessage(null);
}
flush(deadline = Infinity) {
const start = performance.now();
this.#flushRequested = false;
while (this.#queue.size > 0) {
const now = performance.now();
const task = this.#queue.peek();
if (task.expirationTime <= now || now - start < deadline) {
this.#queue.pop();
task.callback();
} else {
this.#requestFlush();
break;
}
}
}
}
取消机制(AbortController)
AbortController 基础
const controller = new AbortController();
const { signal } = controller;
try {
const response = await fetch('/api/data', { signal });
const data = await response.json();
} catch (e) {
if (e.name === 'AbortError') {
console.log('请求被取消');
} else {
throw e;
}
}
controller.abort();
controller.abort(new Error('用户取消操作'));
超时自动取消
const signal = AbortSignal.timeout(5000);
try {
const resp = await fetch('/api/slow', { signal });
} catch (e) {
if (e.name === 'TimeoutError') {
console.log('请求超时');
}
}
const userCancel = new AbortController();
const timeout = AbortSignal.timeout(5000);
const combinedSignal = AbortSignal.any([userCancel.signal, timeout]);
链式取消传播
class CancellableTask {
#controller = new AbortController();
get signal() { return this.#controller.signal; }
cancel(reason) {
this.#controller.abort(reason);
}
async run(asyncFn) {
if (this.#controller.signal.aborted) {
throw this.#controller.signal.reason ?? new Error('已取消');
}
return new Promise((resolve, reject) => {
this.#controller.signal.addEventListener('abort', () => {
reject(this.#controller.signal.reason ?? new Error('操作被取消'));
}, { once: true });
asyncFn(this.#controller.signal).then(resolve).catch(reject);
});
}
}
const task = new CancellableTask();
const resultPromise = task.run(async (signal) => {
const resp = await fetch('/api/data', { signal });
if (signal.aborted) throw new Error('计算被取消');
return resp.json();
});
setTimeout(() => task.cancel(new Error('用户点击取消')), 2000);
try {
const result = await resultPromise;
} catch (e) {
console.log('任务失败:', e.message);
}
指数退避重试器
class RetryFetcher {
#options;
constructor({
maxRetries = 3,
baseDelay = 1000, // 初始延迟 1s
maxDelay = 30000, // 最大延迟 30s
factor = 2, // 退避因子(每次乘以 2)
jitter = true, // 是否添加随机抖动(防止雷同重试)
retryOn = (e) => true, // 哪些错误需要重试
} = {}) {
this.#options = { maxRetries, baseDelay, maxDelay, factor, jitter, retryOn };
}
async fetch(url, fetchOptions = {}) {
const { maxRetries, baseDelay, maxDelay, factor, jitter, retryOn } = this.#options;
let attempt = 0;
let lastError;
while (attempt <= maxRetries) {
if (fetchOptions.signal?.aborted) {
throw fetchOptions.signal.reason ?? new Error('请求被取消');
}
try {
const response = await fetch(url, fetchOptions);
if (!response.ok) {
if (response.status < 500 && response.status !== 429) {
throw new Error(`HTTP ${response.status}`);
}
throw new Error(`HTTP ${response.status} (重试中)`);
}
return response;
} catch (e) {
lastError = e;
if (attempt >= maxRetries || !retryOn(e)) {
break;
}
const delay = this.#calcDelay(attempt, { baseDelay, maxDelay, factor, jitter });
console.log(`第 ${attempt + 1} 次重试,等待 ${delay}ms...`);
await this.#sleep(delay, fetchOptions.signal);
attempt++;
}
}
throw lastError;
}
#calcDelay(attempt, { baseDelay, maxDelay, factor, jitter }) {
let delay = Math.min(baseDelay * Math.pow(factor, attempt), maxDelay);
if (jitter) {
delay = delay * (0.75 + Math.random() * 0.5);
}
return Math.floor(delay);
}
#sleep(ms, signal) {
return new Promise((resolve, reject) => {
if (signal?.aborted) return reject(signal.reason);
const id = setTimeout(resolve, ms);
signal?.addEventListener('abort', () => {
clearTimeout(id);
reject(signal.reason ?? new Error('等待被取消'));
}, { once: true });
});
}
}
const fetcher = new RetryFetcher({
maxRetries: 3,
baseDelay: 500,
retryOn: (e) => !e.message.startsWith('HTTP 4'),
});
const controller = new AbortController();
const resp = await fetcher.fetch('/api/data', { signal: controller.signal });
可取消 Promise
function createCancellablePromise(executor) {
let cancel;
const innerPromise = new Promise(executor);
const cancellablePromise = new Promise((resolve, reject) => {
cancel = (reason = new Error('Promise 被取消')) => {
reject(reason);
};
innerPromise.then(resolve, reject);
});
cancellablePromise.cancel = cancel;
return cancellablePromise;
}
function makeCancellable(promise) {
let isCancelled = false;
const wrappedPromise = new Promise((resolve, reject) => {
promise.then(
value => isCancelled ? reject({ isCancelled: true }) : resolve(value),
error => isCancelled ? reject({ isCancelled: true }) : reject(error)
);
});
return {
promise: wrappedPromise,
cancel() { isCancelled = true; },
};
}
function fetchData(id, signal) {
const { promise, cancel } = makeCancellable(
fetch(`/api/data/${id}`, { signal }).then(r => r.json())
);
promise.then(data => {
setData(data);
}).catch(e => {
if (!e.isCancelled) setError(e);
});
return cancel;
}
手写验证:完整调度系统
class FullScheduler {
#concurrency;
#running = new Set();
#queue = [];
#abortMap = new Map();
#taskId = 0;
constructor(concurrency = 5) {
this.#concurrency = concurrency;
}
submit(taskFn, {
priority = 0,
maxRetries = 0,
timeout = 0,
onProgress = null,
} = {}) {
const id = this.#taskId++;
const outerController = new AbortController();
this.#abortMap.set(id, outerController);
const promise = new Promise((resolve, reject) => {
const task = {
id,
priority,
retries: 0,
maxRetries,
timeout,
onProgress,
execute: async () => {
const signals = [outerController.signal];
if (timeout > 0) signals.push(AbortSignal.timeout(timeout));
const signal = AbortSignal.any(signals);
let attempt = 0;
while (true) {
try {
const result = await taskFn(signal, (progress) => onProgress?.(id, progress));
resolve(result);
break;
} catch (e) {
if (e.name === 'AbortError' || e.name === 'TimeoutError') {
reject(e);
break;
}
if (attempt >= maxRetries) {
reject(e);
break;
}
const delay = 500 * Math.pow(2, attempt++) * (0.5 + Math.random() * 0.5);
await new Promise(r => setTimeout(r, delay));
}
}
this.#abortMap.delete(id);
},
};
const idx = this.#queue.findIndex(t => t.priority < priority);
this.#queue.splice(idx === -1 ? this.#queue.length : idx, 0, task);
this.#tick();
});
promise.taskId = id;
promise.cancel = (reason) => this.cancel(id, reason);
return promise;
}
cancel(taskId, reason = new Error('任务被取消')) {
const ctrl = this.#abortMap.get(taskId);
if (ctrl) {
ctrl.abort(reason);
this.#abortMap.delete(taskId);
}
this.#queue = this.#queue.filter(t => t.id !== taskId);
}
cancelAll() {
for (const [id] of this.#abortMap) this.cancel(id);
}
#tick() {
while (this.#running.size < this.#concurrency && this.#queue.length > 0) {
const task = this.#queue.shift();
this.#running.add(task.id);
task.execute().finally(() => {
this.#running.delete(task.id);
this.#tick();
});
}
}
get status() {
return {
running: this.#running.size,
queued: this.#queue.length,
};
}
}
const scheduler = new FullScheduler(3);
const tasks = Array.from({ length: 10 }, (_, i) => {
const isHigh = i < 2;
return scheduler.submit(
async (signal, reportProgress) => {
for (let step = 0; step < 5; step++) {
if (signal.aborted) throw new Error('cancelled');
await new Promise(r => setTimeout(r, 100));
reportProgress((step + 1) / 5);
}
return `任务 ${i} 完成`;
},
{
priority: isHigh ? 10 : 0,
maxRetries: 2,
timeout: 3000,
onProgress: (id, p) => console.log(`任务 ${i} 进度: ${(p * 100).toFixed(0)}%`),
}
);
});
setTimeout(() => {
tasks[5].cancel(new Error('用户取消'));
tasks[6].cancel(new Error('用户取消'));
}, 2000);
const results = await Promise.allSettled(tasks);
results.forEach((r, i) => {
if (r.status === 'fulfilled') console.log(r.value);
else console.log(`任务 ${i} 失败:`, r.reason.message);
});
深度追问
Q1:Promise.all 和限速并发调度器各自适用什么场景?
Promise.all:任务数量少(< 10)且服务器能承受时,代码简洁直接。
限速调度器:任务数量大、需要保护服务器、需要进度上报、需要优先级控制、需要支持取消时使用。本质上是生产者-消费者模式:add() 是生产,Worker 池是消费,队列是缓冲。
Q2:AbortController 如何实现"取消已完成的 Promise"?
无法取消。AbortController.abort() 发出信号,但 Promise 状态一旦 settled(fulfilled/rejected)就不可变。已完成的 fetch 无法"撤销"。取消的本质是:在 Promise resolve/reject 之前,通过 signal 通知 fetch 中止网络请求,让 Promise 以 AbortError 被 reject。
Q3:为什么需要在退避延迟中加入随机抖动(Jitter)?
多个客户端同时失败(如服务器重启),若都用固定退避时间(1s, 2s, 4s...),它们会同步重试,每次退避后同时冲击服务器,形成周期性峰值(惊群效应)。
加入随机抖动后,重试时间分散,服务器收到的请求更平滑。AWS 文档推荐"Full Jitter"策略:random(0, min(maxDelay, base * 2^attempt))。
Q4:如何判断一个任务是"IO 密集"还是"CPU 密集",对调度策略有何影响?
- IO 密集(fetch、IndexedDB、文件读写):大部分时间等待,调度器 concurrency 可以设高(10~50),因为等待期间不占 CPU。
- CPU 密集(加密、排序、图像处理):持续占用 CPU,调度器 concurrency 应等于核心数(
navigator.hardwareConcurrency),超出则产生上下文切换开销;或直接用 Web Worker 池,彻底移出主线程。