返回列表高级并发与调度流控制
系统讲解异步任务并发边界控制、队列缓冲机制、动态任务挂起与自动出队、取消机制及生产级调度系统架构
高级并发与调度流控制
当异步操作的数量超出系统承载能力时,需要精确的并发控制和调度策略。本文深入探讨信号量、背压(backpressure)、优先级队列、令牌桶等并发原语在 JavaScript 中的实现,帮助构建高性能、不崩溃的异步系统。
目录
- 并发模型基础
- 信号量与互斥锁
- 背压控制
- 优先级任务调度
- 令牌桶限流
- 并发模式:扇入扇出
- 错误隔离与重试策略
- 实战案例
- 深度追问
- 总结表格
1. 并发模型基础
JavaScript 虽然是单线程,但通过异步 I/O 可以发起大量并发操作:
async function naiveFetchAll(urls) {
const results = await Promise.all(
urls.map( (url).( r.()))
);
results;
}
JavaScript高级并发控制与任务调度—从队列缓冲到优先级调度|新宇宙博客url =>
fetch
then
r =>
json
return
2. 信号量与互斥锁
class Semaphore {
#permits;
#queue = [];
constructor(permits) {
this.#permits = permits;
}
async acquire() {
if (this.#permits > 0) {
this.#permits--;
return;
}
await new Promise(resolve => this.#queue.push(resolve));
}
release() {
if (this.#queue.length > 0) {
const resolve = this.#queue.shift();
resolve();
} else {
this.#permits++;
}
}
async withPermit(fn) {
await this.acquire();
try {
return await fn();
} finally {
this.release();
}
}
get availablePermits() { return this.#permits; }
get waitingCount() { return this.#queue.length; }
}
class Mutex extends Semaphore {
constructor() { super(1); }
async lock() { return this.acquire(); }
unlock() { this.release(); }
}
const semaphore = new Semaphore(5);
const urls = Array.from({ length: 100 }, (_, i) => `/api/${i}`);
const results = await Promise.all(
urls.map(url => semaphore.withPermit(() => fetch(url)))
);
读写锁
class ReadWriteLock {
#readers = 0;
#writer = false;
#readQueue = [];
#writeQueue = [];
async acquireRead() {
if (!this.#writer && this.#writeQueue.length === 0) {
this.#readers++;
return;
}
await new Promise(resolve => this.#readQueue.push(resolve));
}
releaseRead() {
this.#readers--;
if (this.#readers === 0 && this.#writeQueue.length > 0) {
this.#writer = true;
this.#writeQueue.shift()();
}
}
async acquireWrite() {
if (!this.#writer && this.#readers === 0) {
this.#writer = true;
return;
}
await new Promise(resolve => this.#writeQueue.push(resolve));
}
releaseWrite() {
this.#writer = false;
if (this.#readQueue.length > 0) {
const readers = this.#readQueue.splice(0);
this.#readers = readers.length;
readers.forEach(resolve => resolve());
} else if (this.#writeQueue.length > 0) {
this.#writer = true;
this.#writeQueue.shift()();
}
}
}
3. 背压控制
class BackpressureChannel {
#buffer = [];
#maxSize;
#producers = [];
#consumers = [];
constructor(maxSize = 10) {
this.#maxSize = maxSize;
}
async send(value) {
if (this.#consumers.length > 0) {
const consumer = this.#consumers.shift();
consumer(value);
return;
}
if (this.#buffer.length >= this.#maxSize) {
await new Promise(resolve => this.#producers.push(resolve));
}
this.#buffer.push(value);
}
async receive() {
if (this.#buffer.length > 0) {
const value = this.#buffer.shift();
if (this.#producers.length > 0) {
const producer = this.#producers.shift();
producer();
}
return value;
}
return new Promise(resolve => this.#consumers.push(resolve));
}
get size() { return this.#buffer.length; }
get isFull() { return this.#buffer.length >= this.#maxSize; }
}
const channel = new BackpressureChannel(5);
async function producer() {
for (let i = 0; i < 100; i++) {
await channel.send(i);
console.log(`Produced: ${i}, buffer: ${channel.size}`);
}
}
async function consumer() {
for (let i = 0; i < 100; i++) {
const value = await channel.receive();
await new Promise(r => setTimeout(r, 100));
console.log(`Consumed: ${value}`);
}
}
producer();
consumer();
4. 优先级任务调度
class PriorityScheduler {
#queues = new Map();
#running = 0;
#concurrency;
constructor(concurrency = 3) {
this.#concurrency = concurrency;
}
schedule(taskFn, priority = 'normal') {
const priorities = ['critical', 'high', 'normal', 'low', 'idle'];
if (!priorities.includes(priority)) {
throw new Error(`Invalid priority: ${priority}`);
}
return new Promise((resolve, reject) => {
if (!this.#queues.has(priority)) {
this.#queues.set(priority, []);
}
this.#queues.get(priority).push({ taskFn, resolve, reject });
this.#processNext();
});
}
#getNextTask() {
const priorities = ['critical', 'high', 'normal', 'low', 'idle'];
for (const priority of priorities) {
const queue = this.#queues.get(priority);
if (queue && queue.length > 0) {
return queue.shift();
}
}
return null;
}
async #processNext() {
if (this.#running >= this.#concurrency) return;
const task = this.#getNextTask();
if (!task) return;
this.#running++;
try {
const result = await task.taskFn();
task.resolve(result);
} catch (err) {
task.reject(err);
} finally {
this.#running--;
this.#processNext();
}
}
}
const scheduler = new PriorityScheduler(2);
scheduler.schedule(() => fetch('/analytics'), 'low');
scheduler.schedule(() => fetch('/user-data'), 'high');
scheduler.schedule(() => fetch('/critical-update'), 'critical');
5. 令牌桶限流
class TokenBucket {
#tokens;
#maxTokens;
#refillRate;
#lastRefill;
#waitQueue = [];
constructor(maxTokens, refillRate) {
this.#tokens = maxTokens;
this.#maxTokens = maxTokens;
this.#refillRate = refillRate;
this.#lastRefill = Date.now();
}
#refill() {
const now = Date.now();
const elapsed = (now - this.#lastRefill) / 1000;
const newTokens = elapsed * this.#refillRate;
this.#tokens = Math.min(this.#maxTokens, this.#tokens + newTokens);
this.#lastRefill = now;
}
async consume(tokens = 1) {
this.#refill();
if (this.#tokens >= tokens) {
this.#tokens -= tokens;
return;
}
const deficit = tokens - this.#tokens;
const waitTime = (deficit / this.#refillRate) * 1000;
await new Promise(resolve => {
setTimeout(() => {
this.#refill();
this.#tokens -= tokens;
resolve();
}, waitTime);
});
}
get availableTokens() {
this.#refill();
return Math.floor(this.#tokens);
}
}
const rateLimiter = new TokenBucket(10, 10);
async function rateLimitedFetch(url) {
await rateLimiter.consume(1);
return fetch(url);
}
6. 并发模式:扇入扇出
async function fanOutFanIn(items, workerFn, concurrency = 5) {
const results = new Array(items.length);
let nextIndex = 0;
async function worker() {
while (nextIndex < items.length) {
const index = nextIndex++;
results[index] = await workerFn(items[index], index);
}
}
const workers = Array.from({ length: Math.min(concurrency, items.length) }, () => worker());
await Promise.all(workers);
return results;
}
const files = await fanOutFanIn(
fileUrls,
async (url) => {
const response = await fetch(url);
const data = await response.arrayBuffer();
return processFile(data);
},
10
);
流水线并发
class Pipeline {
#stages = [];
addStage(fn, concurrency = 1) {
this.#stages.push({ fn, concurrency });
return this;
}
async process(items) {
let current = items;
for (const stage of this.#stages) {
current = await fanOutFanIn(current, stage.fn, stage.concurrency);
}
return current;
}
}
const pipeline = new Pipeline()
.addStage(async (url) => fetch(url), 10)
.addStage(async (res) => res.json(), 5)
.addStage(async (data) => transform(data), 3)
.addStage(async (result) => saveToDb(result), 2);
await pipeline.process(urls);
7. 错误隔离与重试策略
class RetryPolicy {
constructor(options = {}) {
this.maxRetries = options.maxRetries ?? 3;
this.baseDelay = options.baseDelay ?? 1000;
this.maxDelay = options.maxDelay ?? 30000;
this.jitter = options.jitter ?? true;
this.retryOn = options.retryOn ?? (() => true);
}
async execute(fn) {
let lastError;
for (let attempt = 0; attempt <= this.maxRetries; attempt++) {
try {
return await fn(attempt);
} catch (err) {
lastError = err;
if (attempt === this.maxRetries || !this.retryOn(err, attempt)) {
throw err;
}
const delay = this.#calculateDelay(attempt);
await new Promise(r => setTimeout(r, delay));
}
}
throw lastError;
}
#calculateDelay(attempt) {
let delay = this.baseDelay * Math.pow(2, attempt);
delay = Math.min(delay, this.maxDelay);
if (this.jitter) {
delay = delay * (0.5 + Math.random() * 0.5);
}
return delay;
}
}
class CircuitBreaker {
#state = 'closed';
#failureCount = 0;
#threshold;
#resetTimeout;
#lastFailure = 0;
constructor(threshold = 5, resetTimeout = 60000) {
this.#threshold = threshold;
this.#resetTimeout = resetTimeout;
}
async execute(fn) {
if (this.#state === 'open') {
if (Date.now() - this.#lastFailure > this.#resetTimeout) {
this.#state = 'half-open';
} else {
throw new Error('Circuit breaker is open');
}
}
try {
const result = await fn();
this.#onSuccess();
return result;
} catch (err) {
this.#onFailure();
throw err;
}
}
#onSuccess() {
this.#failureCount = 0;
this.#state = 'closed';
}
#onFailure() {
this.#failureCount++;
this.#lastFailure = Date.now();
if (this.#failureCount >= this.#threshold) {
this.#state = 'open';
}
}
get state() { return this.#state; }
}
8. 实战案例
实战案例 1:图片预加载器
class ImagePreloader {
#semaphore;
#cache = new Map();
constructor(concurrency = 4) {
this.#semaphore = new Semaphore(concurrency);
}
async preload(urls) {
return Promise.allSettled(
urls.map(url => this.#loadOne(url))
);
}
async #loadOne(url) {
if (this.#cache.has(url)) return this.#cache.get(url);
return this.#semaphore.withPermit(async () => {
const img = new Image();
const promise = new Promise((resolve, reject) => {
img.onload = () => resolve(img);
img.onerror = () => reject(new Error(`Failed: ${url}`));
});
img.src = url;
const result = await promise;
this.#cache.set(url, result);
return result;
});
}
}
实战案例 2:数据库连接池
class ConnectionPool {
#pool = [];
#semaphore;
#factory;
#validator;
constructor({ maxSize = 10, factory, validator }) {
this.#semaphore = new Semaphore(maxSize);
this.#factory = factory;
this.#validator = validator ?? (() => true);
}
async query(sql, params) {
await this.#semaphore.acquire();
let conn;
try {
conn = await this.#getConnection();
return await conn.execute(sql, params);
} finally {
if (conn) this.#returnConnection(conn);
this.#semaphore.release();
}
}
async #getConnection() {
while (this.#pool.length > 0) {
const conn = this.#pool.pop();
if (await this.#validator(conn)) return conn;
}
return await this.#factory();
}
#returnConnection(conn) {
this.#pool.push(conn);
}
}
实战案例 3:批量 API 请求编排
async function batchApiRequests(requests, options = {}) {
const {
concurrency = 5,
rateLimit = 100,
retries = 3,
onProgress
} = options;
const bucket = new TokenBucket(rateLimit, rateLimit);
const semaphore = new Semaphore(concurrency);
const retry = new RetryPolicy({ maxRetries: retries });
let completed = 0;
const total = requests.length;
const results = await Promise.allSettled(
requests.map(async (req) => {
await semaphore.acquire();
try {
await bucket.consume(1);
const result = await retry.execute(() =>
fetch(req.url, req.options).then(r => {
if (!r.ok) throw new Error(`HTTP ${r.status}`);
return r.json();
})
);
completed++;
onProgress?.({ completed, total, success: true });
return result;
} catch (err) {
completed++;
onProgress?.({ completed, total, success: false, error: err });
throw err;
} finally {
semaphore.release();
}
})
);
return results;
}
9. 深度追问
Q1:JavaScript 单线程如何实现"并发"?
JavaScript 的并发是 I/O 并发而非计算并发。当 fetch 发起网络请求后,实际的 I/O 操作由操作系统内核处理(epoll/kqueue/IOCP),JavaScript 线程可以继续执行其他代码。只有回调/Promise 的执行是串行的。
Q2:Semaphore 在 JavaScript 中有意义吗?
虽然 JS 是单线程,但异步操作有并发性。Semaphore 限制的是同时"进行中"的异步操作数量(如网络请求、文件操作),而非同时执行的 CPU 线程。它是资源管理工具,不是线程同步工具。
Q3:如何检测和处理内存压力下的调度?
使用 performance.measureUserAgentSpecificMemory()(Chrome)或 process.memoryUsage()(Node.js)监控内存,动态调整并发度。当接近限制时减少并发,避免 OOM。
10. 总结表格
| 模式 | 适用场景 | 核心机制 |
|---|
| 信号量 | 限制并发数 | 计数器 + 等待队列 |
| 背压通道 | 生产者-消费者 | 有界缓冲区 |
| 令牌桶 | API 限流 | 令牌生成速率 |
| 优先级调度 | 差异化处理 | 多级队列 |
| 断路器 | 故障隔离 | 状态机 |
| 重试策略 | 瞬时故障恢复 | 指数退避 |
| 调度策略 | 延迟 | 吞吐量 | 公平性 |
|---|
| FIFO | 一般 | 高 | 高 |
| 优先级 | 低(高优先) | 中 | 低 |
| 轮询 | 均匀 | 中 | 高 |
| 加权 | 按权重 | 高 | 中 |