综合应用:Futures、Tasks 和 Threads
正如我们在第16章中所见,线程提供了一种并发的方法。在本章中,我们看到了另一种方法:使用 async
与 futures
和 streams
。如果你在犹豫何时选择哪种方法,答案是:视情况而定!在许多情况下,选择并不是线程 或 async
,而是线程 和 async
。
许多操作系统已经提供了基于线程的并发模型数十年,因此许多编程语言也支持它们。然而,这些模型并非没有权衡。在许多操作系统上,每个线程都会占用相当一部分内存,并且启动和关闭时会有一些开销。线程也只有在你的操作系统和硬件支持它们时才是一个选项。与主流的桌面和移动计算机不同,一些嵌入式系统根本没有操作系统,因此它们也没有线程。
async
模型提供了一组不同的——最终是互补的——权衡。在 async
模型中,并发操作不需要自己的线程。相反,它们可以在任务上运行,就像我们在 streams
部分中使用 trpl::spawn_task
从同步函数中启动工作一样。任务类似于线程,但它不是由操作系统管理,而是由库级代码管理:运行时。
在上一节中,我们看到可以通过使用 async
通道并生成一个可以从同步代码调用的 async
任务来构建一个流。我们可以用线程做完全相同的事情。在 Listing 17-40 中,我们使用了 trpl::spawn_task
和 trpl::sleep
。在 Listing 17-41 中,我们在 get_intervals
函数中用标准库中的 thread::spawn
和 thread::sleep
API 替换了它们。
extern crate trpl; // required for mdbook test use std::{pin::pin, thread, time::Duration}; use trpl::{ReceiverStream, Stream, StreamExt}; fn main() { trpl::run(async { let messages = get_messages().timeout(Duration::from_millis(200)); let intervals = get_intervals() .map(|count| format!("Interval #{count}")) .throttle(Duration::from_millis(500)) .timeout(Duration::from_secs(10)); let merged = messages.merge(intervals).take(20); let mut stream = pin!(merged); while let Some(result) = stream.next().await { match result { Ok(item) => println!("{item}"), Err(reason) => eprintln!("Problem: {reason:?}"), } } }); } fn get_messages() -> impl Stream<Item = String> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]; for (index, message) in messages.into_iter().enumerate() { let time_to_sleep = if index % 2 == 0 { 100 } else { 300 }; trpl::sleep(Duration::from_millis(time_to_sleep)).await; if let Err(send_error) = tx.send(format!("Message: '{message}'")) { eprintln!("Cannot send message '{message}': {send_error}"); break; } } }); ReceiverStream::new(rx) } fn get_intervals() -> impl Stream<Item = u32> { let (tx, rx) = trpl::channel(); // This is *not* `trpl::spawn` but `std::thread::spawn`! thread::spawn(move || { let mut count = 0; loop { // Likewise, this is *not* `trpl::sleep` but `std::thread::sleep`! thread::sleep(Duration::from_millis(1)); count += 1; if let Err(send_error) = tx.send(count) { eprintln!("Could not send interval {count}: {send_error}"); break; }; } }); ReceiverStream::new(rx) }
如果你运行这段代码,输出与 Listing 17-40 完全相同。并且注意从调用代码的角度来看,这里的变化是多么小。更重要的是,即使我们的一个函数在运行时生成了一个 async
任务,而另一个函数生成了一个操作系统线程,生成的流并没有受到这些差异的影响。
尽管它们有相似之处,但这两种方法的行为非常不同,尽管在这个非常简单的示例中我们可能很难测量出来。我们可以在任何现代个人计算机上生成数百万个 async
任务。如果我们尝试用线程来做同样的事情,我们实际上会耗尽内存!
然而,这些 API 如此相似是有原因的。线程作为一组同步操作的边界;并发可以在线程之间进行。任务作为一组异步操作的边界;并发可以在任务之间和任务内部进行,因为任务可以在其主体中的 futures
之间切换。最后,futures
是 Rust 中最细粒度的并发单元,每个 future
可能代表其他 futures
的树。运行时——特别是它的执行器——管理任务,任务管理 futures
。在这方面,任务类似于轻量级的、由运行时管理的线程,具有由运行时而不是操作系统管理的额外功能。
这并不意味着 async
任务总是比线程更好(反之亦然)。在某些方面,使用线程进行并发比使用 async
进行并发更简单。这可以是一个优势,也可以是一个劣势。线程有点“发射后不管”;它们没有原生的 future
等价物,因此它们只是运行到完成,除了操作系统本身之外不会被中断。也就是说,它们没有像 futures
那样内置的 任务内并发 支持。Rust 中的线程也没有取消机制——这个话题我们在本章中没有明确讨论,但隐含在我们每次结束一个 future
时,它的状态都会被正确清理的事实中。
这些限制也使线程比 futures
更难组合。例如,使用线程构建像我们在本章前面构建的 timeout
和 throttle
方法这样的辅助工具要困难得多。futures
是更丰富的数据结构,这意味着它们可以更自然地组合在一起,正如我们所看到的。
因此,任务为我们提供了对 futures
的额外控制,允许我们选择在哪里以及如何将它们分组。事实证明,线程和任务通常可以很好地协同工作,因为任务可以(至少在某些运行时中)在线程之间移动。事实上,我们一直在使用的运行时——包括 spawn_blocking
和 spawn_task
函数——默认是多线程的!许多运行时使用一种称为 工作窃取 的方法,根据线程当前的利用率在线程之间透明地移动任务,以提高系统的整体性能。这种方法实际上需要线程 和 任务,因此也需要 futures
。
在考虑何时使用哪种方法时,请考虑以下经验法则:
- 如果工作是非常可并行的,例如处理一堆可以分别处理的数据,线程是更好的选择。
- 如果工作是非常并发的,例如处理来自一堆不同来源的消息,这些消息可能以不同的间隔或不同的速率到达,
async
是更好的选择。
如果你需要并行性和并发性,你不必在线程和 async
之间做出选择。你可以自由地同时使用它们,让每种方法发挥其最佳作用。例如,Listing 17-42 展示了在现实世界的 Rust 代码中这种混合的常见示例。
extern crate trpl; // for mdbook test use std::{thread, time::Duration}; fn main() { let (tx, mut rx) = trpl::channel(); thread::spawn(move || { for i in 1..11 { tx.send(i).unwrap(); thread::sleep(Duration::from_secs(1)); } }); trpl::run(async { while let Some(message) = rx.recv().await { println!("{message}"); } }); }
我们首先创建一个 async
通道,然后生成一个线程,该线程拥有通道的发送端。在线程中,我们发送数字 1 到 10,每次发送之间休眠一秒钟。最后,我们运行一个使用 async
块创建的 future
,并将其传递给 trpl::run
,就像我们在本章中所做的那样。在该 future
中,我们等待这些消息,就像我们在其他消息传递示例中所看到的那样。
回到本章开头的场景,想象一下使用专用线程运行一组视频编码任务(因为视频编码是计算密集型的),但使用 async
通道通知 UI 这些操作已完成。在现实世界的用例中,有无数这样的组合示例。
总结
这并不是你在本书中最后一次看到并发。第21章中的项目将在比这里讨论的更现实的情况下应用这些概念,并更直接地比较使用线程与任务解决问题的方法。
无论你选择哪种方法,Rust 都为你提供了编写安全、快速、并发代码所需的工具——无论是用于高吞吐量的 Web 服务器还是嵌入式操作系统。
接下来,我们将讨论在 Rust 程序变大时,如何以惯用的方式建模问题和构建解决方案。此外,我们还将讨论 Rust 的惯用法与你可能熟悉的面向对象编程中的惯用法之间的关系。