Dart 异步编程模型
Site Owner
Published on 2026-05-22
详解Dart Event Loop与Microtask Queue、Future状态机、Stream分类、StreamController背压处理及async*生成器语法

Dart 异步编程模型
核心摘要:Dart 运行在单线程事件循环(Event Loop)之上,通过
Future、async/await、Stream三层抽象实现高效异步。理解 Dart 的 Isolate 模型、微任务队列(Microtask Queue)与事件队列(Event Queue)的执行顺序,是写出高性能 Flutter 应用的基础。本文还对比 JavaScript 的 Promise/async-await、Java 的 CompletableFuture 和 Kotlin 的协程(Coroutines)。
目录
- 事件循环与队列模型
- Future 详解
- async / await 原理
- Stream:响应式数据流
- Isolate:真正的并行
- 与 JS / Java / Kotlin 的对比
- 实战案例:并发数据加载器
- 深度追问
- 总结表格
1. 事件循环与队列模型
Dart 的并发模型基于单线程 + 事件循环,与 JavaScript 的模型非常相似,但有一个重要区别:Dart 有两个队列:
┌─────────────────────────────────────┐
│ Dart Event Loop │
│ │
│ ┌─────────────────────────────┐ │
│ │ Microtask Queue(微任务) │ ◀── 优先级高
│ │ - Future.microtask() │ │
│ │ - scheduleMicrotask() │ │
│ └─────────────────────────────┘ │
│ ↓ 清空后 │
│ ┌─────────────────────────────┐ │
│ │ Event Queue(事件队列) │ ◀── 优先级低
│ │ - Timer callbacks │ │
│ │ - I/O events │ │
│ │ - UI events │ │
│ │ - Future(非 microtask) │ │
│ └─────────────────────────────┘ │
└─────────────────────────────────────┘
执行规则:
- 执行当前同步代码
- 清空 Microtask Queue(全部执行完才进行下一步)
- 取 Event Queue 中一个事件执行
- 回到步骤 2
import 'dart:async';
void main() {
print('1. 同步开始');
Future(() => print('4. Event Queue')); // 加入事件队列
Future.microtask(() => print('3. Microtask')); // 加入微任务队列
scheduleMicrotask(() => print('3b. scheduleMicrotask')); // 也是微任务
print('2. 同步结束');
// 输出顺序:
// 1. 同步开始
// 2. 同步结束
// 3. Microtask
// 3b. scheduleMicrotask
// 4. Event Queue
}
⚠️ 陷阱:无限微任务
// ❌ 危险!微任务不断添加微任务,事件循环永远无法处理 Event Queue
void dangerousMicrotask() {
void loop() {
scheduleMicrotask(() {
// 做一些事情...
loop(); // 再次调度微任务 → 饿死事件队列
});
}
loop();
}
2. Future 详解
Future<T> 代表一个未来某时刻可用的值,类似 JavaScript 的 Promise<T>。
2.1 Future 的三种状态
// State 1: Pending(未完成)
final future = Future<String>.delayed(
const Duration(seconds: 1),
() => 'done',
);
// State 2: Completed with value
Future.value('immediate'); // 立即完成的 Future
// State 3: Completed with error
Future.error(Exception('出错了'));
2.2 Future 的创建方式
// 方式1:构造函数
Future<int> compute() => Future(() {
// 这段代码在 Event Queue 中执行(不是微任务)
return heavyComputation();
});
// 方式2:async 函数(最常用)
Future<User> fetchUser(int id) async {
final response = await http.get('/users/$id');
return User.fromJson(response.body);
}
// 方式3:Completer(手动控制 Future 的完成)
Future<String> manualFuture() {
final completer = Completer<String>();
// 模拟某个回调式 API
someCallbackApi((result) {
if (result.isSuccess) {
completer.complete(result.value);
} else {
completer.completeError(result.error);
}
});
return completer.future;
}
2.3 Future 组合
import 'dart:async';
// 串行(链式)
Future<String> serial() {
return Future.value(1)
.then((v) => v * 2) // 2
.then((v) => v.toString()) // "2"
.then((v) => 'Result: $v'); // "Result: 2"
}
// 并行 —— Future.wait
Future<void> parallel() async {
final results = await Future.wait([
fetchUser(1), // 同时发起
fetchUser(2), // 同时发起
fetchUser(3), // 同时发起
]);
// 等所有完成后,results 是 [User, User, User]
print(results.map((u) => u.name).join(', '));
}
// 竞争 —— Future.any(第一个完成的获胜)
Future<String> race() async {
return Future.any([
fetchFromPrimaryServer(),
fetchFromBackupServer(),
]);
}
// 错误处理
Future<User?> safeUser(int id) {
return fetchUser(id)
.catchError(
(e) => null,
test: (e) => e is NetworkException, // 只捕获特定错误
);
}
// whenComplete(finally 语义)
Future<void> withCleanup() async {
LoadingIndicator.show();
try {
await fetchData();
} finally {
LoadingIndicator.hide(); // 无论成功失败都执行
}
}
3. async / await 原理
async/await 是语法糖,编译器将其转换为 Future 链。理解底层有助于避免常见陷阱。
3.1 编译器转换原理
// 你写的代码
Future<String> fetchData() async {
final a = await step1();
final b = await step2(a);
return b;
}
// 编译器大致转换为(简化):
Future<String> fetchData() {
return step1().then((a) {
return step2(a).then((b) {
return b;
});
});
}
3.2 async 函数返回值规则
// async 函数总是返回 Future
String notAsync() => 'hello'; // 返回 String
Future<String> withAsync() async => 'hello'; // 返回 Future<String>
// 即使返回 void,也是 Future<void>
Future<void> doWork() async {
await someOperation();
// 没有 return 语句,等价于 return;
}
// async 函数中抛出异常 = Future.error
Future<int> divide(int a, int b) async {
if (b == 0) throw ArgumentError('除数不能为零');
return a ~/ b;
}
3.3 常见陷阱
// 陷阱1:忘记 await,导致 Future 被丢弃
Future<void> badExample() async {
saveToDatabase(data); // ❌ 没有 await,错误被吞掉,操作不确定完成
}
Future<void> goodExample() async {
await saveToDatabase(data); // ✅
}
// 陷阱2:在循环中串行 await(低效)
Future<void> slowLoop(List<int> ids) async {
for (final id in ids) {
await processItem(id); // ❌ 串行执行,n 个任务花 n 倍时间
}
}
// 改进:并行执行
Future<void> fastLoop(List<int> ids) async {
await Future.wait(ids.map(processItem)); // ✅ 并行,接近 1 倍时间
}
// 陷阱3:async 函数中的 try-catch 边界
Future<void> catchIssue() async {
try {
final future = fetchData(); // 注意:没有 await
// 这里 future 的错误不会被这个 try-catch 捕获!
await future; // ✅ await 放在 try 内才能捕获
} catch (e) {
print('捕获错误: $e');
}
}
// 陷阱4:FutureBuilder 中的重复构建
// ❌ 每次 build 都创建新 Future
Widget badWidget() {
return FutureBuilder(
future: fetchData(), // 每次重建都重新请求!
builder: (context, snapshot) => Text(snapshot.data ?? ''),
);
}
// ✅ 在 State 中缓存 Future
class GoodState extends State<MyWidget> {
late Future<String> _future;
@override
void initState() {
super.initState();
_future = fetchData(); // 只创建一次
}
@override
Widget build(BuildContext context) {
return FutureBuilder(
future: _future, // 复用已有 Future
builder: (context, snapshot) => Text(snapshot.data ?? ''),
);
}
}
4. Stream:响应式数据流
Stream<T> 是异步的可迭代序列,用于处理一系列异步事件。
4.1 Single-subscription vs Broadcast
// Single-subscription Stream:只能有一个监听者(默认)
final stream = Stream<int>.fromIterable([1, 2, 3, 4, 5]);
// Broadcast Stream:可以有多个监听者
final broadcastStream = stream.asBroadcastStream();
// StreamController:手动控制 Stream
final controller = StreamController<String>();
// 添加数据
controller.sink.add('hello');
controller.sink.add('world');
controller.sink.addError(Exception('出错'));
controller.sink.close(); // 关闭流
// 监听
controller.stream.listen(
(data) => print('收到: $data'),
onError: (e) => print('错误: $e'),
onDone: () => print('流结束'),
cancelOnError: false, // 出错后是否取消订阅
);
4.2 Stream 操作符
Stream<int> numbers = Stream.fromIterable(
List.generate(10, (i) => i + 1)
);
// 变换链(类比 RxJava / RxJS)
numbers
.where((n) => n.isEven) // [2, 4, 6, 8, 10]
.map((n) => n * n) // [4, 16, 36, 64, 100]
.take(3) // [4, 16, 36]
.listen(print);
// asyncMap:每个元素异步处理
Stream<User> userStream = Stream.fromIterable([1, 2, 3])
.asyncMap((id) => fetchUser(id));
// expand:一对多映射
Stream<String> words = Stream.fromIterable(['hello world', 'foo bar'])
.expand((s) => s.split(' ')); // ['hello', 'world', 'foo', 'bar']
// distinct:去重(连续重复)
Stream.fromIterable([1, 1, 2, 2, 3, 1])
.distinct() // [1, 2, 3, 1]
.listen(print);
// timeout:超时处理
stream.timeout(
const Duration(seconds: 5),
onTimeout: (sink) => sink.addError(TimeoutException('超时')),
);
4.3 async* / yield:生成器语法
// async* 函数返回 Stream
Stream<int> countDown(int from) async* {
for (int i = from; i >= 0; i--) {
yield i; // 每次 yield 发出一个值
await Future.delayed(const Duration(seconds: 1));
}
}
// yield*:委托给另一个 Stream
Stream<int> allNumbers() async* {
yield* countDown(5); // 先 5 到 0
yield* Stream.fromIterable([10, 20, 30]); // 然后 10, 20, 30
}
// 实际应用:文件逐行读取
Stream<String> readLines(String path) async* {
final file = File(path);
await for (final line in file.openRead().transform(utf8.decoder).transform(LineSplitter())) {
yield line;
}
}
4.4 StreamBuilder in Flutter
class LivePriceWidget extends StatelessWidget {
final Stream<double> priceStream;
const LivePriceWidget({super.key, required this.priceStream});
@override
Widget build(BuildContext context) {
return StreamBuilder<double>(
stream: priceStream,
builder: (context, snapshot) {
return switch (snapshot.connectionState) {
ConnectionState.waiting => const CircularProgressIndicator(),
ConnectionState.active when snapshot.hasData =>
Text('¥${snapshot.data!.toStringAsFixed(2)}',
style: const TextStyle(fontSize: 24)),
ConnectionState.active when snapshot.hasError =>
Text('错误: ${snapshot.error}'),
ConnectionState.done => const Text('已关闭'),
_ => const SizedBox(),
};
},
);
}
}
5. Isolate:真正的并行
Dart 的 Isolate 类似进程:独立内存,无共享状态,通过消息传递通信。
import 'dart:isolate';
// 方式1:compute(Flutter 封装,最简单)
// compute 在新 Isolate 中运行函数,自动处理消息传递
Future<List<int>> sortBigList(List<int> data) async {
return compute(_sortIsolate, data); // _sortIsolate 必须是顶层函数
}
List<int> _sortIsolate(List<int> data) {
data.sort(); // 在独立 Isolate 中执行,不阻塞 UI
return data;
}
// 方式2:Isolate.run(Dart 2.19+,更简洁)
Future<void> modernIsolate() async {
final result = await Isolate.run(() {
// 复杂计算
return heavyComputation();
});
print(result);
}
// 方式3:手动 Isolate + SendPort(长期通信)
Future<void> longRunningIsolate() async {
final receivePort = ReceivePort();
await Isolate.spawn(
_workerIsolate,
receivePort.sendPort, // 传入通信端口
);
SendPort? workerSendPort;
receivePort.listen((message) {
if (message is SendPort) {
// 握手:收到 worker 的 SendPort
workerSendPort = message;
} else if (message is String) {
print('Worker 返回: $message');
}
});
// 向 worker 发送任务
workerSendPort?.send({'task': 'process', 'data': [1, 2, 3]});
}
void _workerIsolate(SendPort mainSendPort) {
final workerReceivePort = ReceivePort();
// 发送自己的 SendPort(握手)
mainSendPort.send(workerReceivePort.sendPort);
workerReceivePort.listen((message) {
if (message is Map) {
// 处理任务并返回结果
final result = processTask(message['data']);
mainSendPort.send(result);
}
});
}
何时使用 Isolate:
- JSON 解析大文件(> 1MB)
- 图像处理、加密运算
- 复杂排序/搜索算法
- 任何导致帧率低于 60fps 的同步操作
6. 与 JS / Java / Kotlin 的对比
JavaScript 对比
// JS:Promise + async/await(概念上相同)
async function fetchUser(id) {
const response = await fetch(`/users/${id}`);
return response.json();
}
// JS:Promise.all(对应 Future.wait)
const [user1, user2] = await Promise.all([fetchUser(1), fetchUser(2)]);
// JS:没有 Isolate,用 Web Worker 实现并行
// 但 Web Worker 通信模型与 Dart Isolate 类似
关键差异:
- Dart Future 是微任务优先,JS Promise 回调也是微任务(
queueMicrotask) - Dart 有
Stream,JS 有ReadableStream/ RxJSObservable(概念对应) - Dart Isolate 内存隔离,JS Worker 通过
postMessage传递拷贝(structuredClone)
Java CompletableFuture 对比
// Java:CompletableFuture(冗长)
CompletableFuture<User> future = CompletableFuture
.supplyAsync(() -> fetchUser(1))
.thenApply(user -> enrichUser(user))
.exceptionally(e -> defaultUser());
// Java 21:Virtual Thread(更接近 Dart 的 async/await)
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
Future<User> f = executor.submit(() -> fetchUser(1));
}
Kotlin 协程对比
// Kotlin:suspend 函数(语义上与 Dart async 相同)
suspend fun fetchUser(id: Int): User {
return withContext(Dispatchers.IO) {
api.getUser(id)
}
}
// Kotlin:Flow(对应 Dart Stream)
fun priceFlow(): Flow<Double> = flow {
while (true) {
emit(fetchCurrentPrice())
delay(1000)
}
}
// Kotlin:launch + async(对应 Future.wait)
val result = coroutineScope {
val a = async { fetchA() }
val b = async { fetchB() }
a.await() + b.await()
}
Dart vs Kotlin 协程深度对比:
| 特性 | Dart | Kotlin |
|---|---|---|
| 基本单元 | Future<T> | Deferred<T>(async) |
| 挂起标记 | async 函数 + await | suspend 函数 |
| 流 | Stream<T> | Flow<T> |
| 并发范围 | 无 CoroutineScope | CoroutineScope(结构化并发) |
| 线程调度 | 单线程(Isolate 间切换) | Dispatchers.IO/Main/Default |
| 取消 | 无内置取消 | 协程支持协作取消 |
7. 实战案例:并发数据加载器
import 'dart:async';
// 带超时、重试、并发限制的请求管理器
class ConcurrentLoader<T> {
final int maxConcurrent;
final Duration timeout;
final int maxRetries;
int _active = 0;
final _queue = <_LoadTask<T>>[];
ConcurrentLoader({
this.maxConcurrent = 3,
this.timeout = const Duration(seconds: 10),
this.maxRetries = 2,
});
Future<T> load(Future<T> Function() task) {
final completer = Completer<T>();
_queue.add(_LoadTask(task, completer));
_processQueue();
return completer.future;
}
void _processQueue() {
while (_active < maxConcurrent && _queue.isNotEmpty) {
final task = _queue.removeAt(0);
_active++;
_executeWithRetry(task).then((_) {
_active--;
_processQueue();
});
}
}
Future<void> _executeWithRetry(_LoadTask<T> task, [int attempt = 0]) async {
try {
final result = await task.fn().timeout(timeout);
task.completer.complete(result);
} catch (e) {
if (attempt < maxRetries) {
await Future.delayed(Duration(milliseconds: 200 * (attempt + 1)));
return _executeWithRetry(task, attempt + 1);
}
task.completer.completeError(e);
}
}
}
class _LoadTask<T> {
final Future<T> Function() fn;
final Completer<T> completer;
_LoadTask(this.fn, this.completer);
}
// 使用示例
Future<void> loadDashboard() async {
final loader = ConcurrentLoader<dynamic>(maxConcurrent: 3);
final results = await Future.wait([
loader.load(() => fetchUserProfile()),
loader.load(() => fetchOrders()),
loader.load(() => fetchNotifications()),
loader.load(() => fetchRecommendations()),
loader.load(() => fetchStats()),
// 以上 5 个请求最多同时 3 个并发
]);
final [profile, orders, notifications, recs, stats] = results;
updateUI(profile, orders, notifications, recs, stats);
}
// 响应式数据管道示例
class DataPipeline {
final _inputController = StreamController<String>.broadcast();
late final Stream<SearchResult> results;
DataPipeline() {
results = _inputController.stream
.where((query) => query.length >= 2) // 至少2字符
.distinct() // 去重
.debounceTime(const Duration(milliseconds: 300)) // 防抖(需 rxdart)
.switchMap((query) => _search(query).asStream()) // 取消上次请求
.handleError((e) => print('搜索错误: $e'));
}
void search(String query) => _inputController.add(query);
Stream<SearchResult> _search(String query) async* {
yield SearchResult.loading();
try {
final data = await searchApi(query);
yield SearchResult.success(data);
} catch (e) {
yield SearchResult.error(e.toString());
}
}
void dispose() => _inputController.close();
}
8. 深度追问
Q1:Future 和 Stream 本质上有什么关系?
Future<T>是只发出一个值(或错误)的异步操作。Stream<T>是发出零个或多个值的异步序列。可以说Future是Stream的特例(只有一个元素的流)。Dart 提供了Stream.fromFuture(future)将 Future 转为 Stream,也可以stream.first将 Stream 转为 Future。在底层,两者都基于事件循环。
Q2:为什么 Dart 选择单线程 + Isolate,而不是像 Java 那样的多线程?
多线程编程的根本难点是共享可变状态(Shared Mutable State):需要锁(Mutex)、信号量、volatile,容易出现竞态条件(Race Condition)和死锁。Dart 用内存隔离从根源解决这个问题:Isolate 不共享任何内存,通信只能通过消息传递(类似 Erlang/Actor 模型)。代价是跨 Isolate 传递大数据需要序列化(拷贝),但 Dart 也提供了
TransferableTypedData来零拷贝传递 typed data。
Q3:async* / yield 和手动 StreamController 哪个更好?
async*:适合按顺序生成数据的场景,代码更简洁,编译器自动处理暂停/恢复StreamController:适合外部推送数据的场景(如 WebSocket、传感器、UI 事件) 原则:能用async*就用;需要外部代码向流中添加数据时用StreamController,但务必处理好close()和背压(backpressure)。
Q4:如何取消一个正在进行的 Future?
Dart 原生 Future 不支持取消(与 Kotlin 协程不同)。常用替代方案:
- 用
CancelToken模式(如 Dio 库)- 检查标志位:在 Future 内部定期检查
_isCancelled- 用
Stream替代:StreamSubscription.cancel()可以取消- 竞争一个
Completer.completeError(CancelledException())
9. 总结表格
| 概念 | Dart | JavaScript | Kotlin | Java |
|---|---|---|---|---|
| 单次异步 | Future<T> | Promise<T> | Deferred<T> | CompletableFuture<T> |
| 异步语法 | async/await | async/await | suspend | .thenApply() |
| 异步序列 | Stream<T> | ReadableStream / Observable | Flow<T> | Flux<T>(Reactor) |
| 并发方式 | Isolate(内存隔离) | Worker(隔离) | 协程+线程池 | 线程+虚拟线程 |
| 微任务 | scheduleMicrotask | queueMicrotask | Dispatchers.Unconfined | 无直接对应 |
| 并行等待 | Future.wait | Promise.all | awaitAll | CompletableFuture.allOf |
| 错误处理 | try/catch + catchError | try/catch + .catch() | try/catch | exceptionally() |
最佳实践速记
1. 永远 await Future,不要让它"飘"(丢失错误)
2. 并行任务用 Future.wait,不要顺序 await
3. 长时间计算放 Isolate,保持 UI 线程流畅(>16ms 就考虑)
4. Stream 用完记得 cancel 订阅,避免内存泄漏
5. FutureBuilder 的 future 在 State 中缓存,不要在 build 方法里创建
6. async* + yield 是处理分页加载的绝佳模式
参考资料: