设置学习环境
mini-redis
安装mini-redis
服务器
cargo install mini-redis
通过启动服务器来确保它已经安装
mini-redis-server
在单独的终端窗口中,尝试使用以下命令获取密钥mini-redis-cli
mini-redis-cli set foo 1
mini-redis-cli get foo
Hello world
use mini_redis::{client, Result};
#[tokio::main]
async fn main() -> Result<()> {
// 建立与mini-redis服务器的连接
let mut client = client::connect("127.0.0.1:6379").await?;
// 设置 key: "hello" 和 值: "world"
client.set("hello", "world".into()).await?;
// 获取"key=hello"的值
let result = client.get("hello").await?;
println!("从服务器端获取到结果={:?}", result);
Ok(())
}
client::connect
函数由mini-redis
包提供,它使用异步的方式跟指定的远程IP
地址建立 TCP 长连接,一旦连接建立成功,那 client 的赋值初始化也将完成
特别值得注意的是:虽然该连接是异步建立的,但是从代码本身来看,完全是同步的代码编写方式,唯一能说明异步的点就是.await
在上例中,redis
的连接函数 connect
实现如上,它看上去很像是一个同步函数,但是 async fn
出卖了它。 async fn
异步函数并不会直接返回值,而是返回一个 Future
,顾名思义,该 Future
会在未来某个时间点被执行,然后最终获取到真实的返回值 Result<Client>
由于 async
会返回一个 Future
,因此我们还需要配合使用 .await
来让该 Future
运行起来,最终获得返回值:
async fn say_to_world() -> String {
String::from("world")
}
#[tokio::main]
async fn main() {
// 此处的函数调用是惰性的,并不会执行 `say_to_world()` 函数体中的代码
let op = say_to_world();
// 首先打印出 "hello"
println!("hello");
// 使用 `.await` 让 `say_to_world` 开始运行起来
println!("{}", op.await);
}
async fn
到底返回什么?它实际上返回的是一个实现了 Future
特征的匿名类型: impl Future<Output = String>
async main
在代码中,使用了一个与众不同的main
函数:async fn main
,而且是用#[tokio::main]
属性进行了标记。异步main
函数有以下意义:
.await
只能在async
函数中使用,如果是以前的fn main
,那它内部是无法直接使用async
函数的。这个会极大地限制了我们的使用场景- 异步运行时本身需要初始化
因此#[tokio::main]
宏将async fn main()
隐式的转换为fn main()
的同时还对整个异步运行时进行了初始化:
#[tokio::main]
async fn main() {
println!("hello");
}
// 将被转换成:
fn main() {
let mut rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
println!("hello");
})
}
cargo feature
在引入 tokio
包时,我们在 Cargo.toml
文件中添加了这么一行:
tokio = { version = "1", features = ["full"] }
里面有个 features = ["full"]
可能大家会比较迷惑,当然,关于它的具体解释在本书的 Cargo详解专题 有介绍,这里就简单进行说明
Tokio
有很多功能和特性,例如 TCP
,UDP
,Unix sockets
,同步工具,多调度类型等等,不是每个应用都需要所有的这些特性。为了优化编译时间和最终生成可执行文件大小、内存占用大小,应用可以对这些特性进行可选引入
而这里为了演示的方便,我们使用 full
,表示直接引入所有的特性
任务
一个 Tokio
任务是一个异步的绿色线程,它们通过 tokio::spawn
进行创建,该函数会返回一个 JoinHandle
类型的句柄,调用者可以使用该句柄跟创建的任务进行交互
#[tokio::main]
async fn main() {
let handle = tokio::spawn(async {
// Do some async work
"return value"
});
// Do some other work
let out = handle.await.unwrap();
println!("GOT {}", out);
}
等待JoinHandle
返回一个Result
。当任务在执行过程中遇到错误时,JoinHandle
将会返回一个Err
。当任务发生紧急情况或者任务因运行时关闭而强制取消时,就会发生这种情况
任务是调度程序管理的执行单元。spawn
生成任务会将其提交给Tokio 调度程序,然后由它负责调度执行。需要注意的是,执行任务的线程未必是创建任务的线程,任务完全有可能运行在另一个不同的线程上,而且任务在生成后,它还可能会在线程间被移动
‘static 约束
当在Tokio 运行时生成任务时,其类型的生命周期必须是'static
。意味着,在任务中不能使用外部数据的引用
这是一个常见的误解,
static
总是意味着“永远活着”,但事实并非如此。仅仅因为某个值是static
并不意味着存在内存泄漏。常见的Rust生命周期误解
例如,以下内容将无法编译:
use tokio::task;
#[tokio::main]
async fn main() {
let v = vec![1, 2, 3];
task::spawn(async {
println!("Here's a vec: {:?}", v);
});
}
发生这种情况是因为,默认情况下,变量不会移动到异步块中。该v变量仍然属于该main函数。线程println借用v。将第7行更改为task::spawn(async move {
将指示编译器将v移动到线程内部。现在,该任务拥有其所有数据,使其成为'static
我们说某个值是'static
时,这意味着永远保留该值不会无效。这很重要,因为编译器无法推断新生成的任务会保留多长时间。我们必须确保任务能够永远保存,以便Tokio可以让任务运行到需要的时间
如果必须同时从多个任务访问单个数据,则必须使用同步原语(例如Arc)
Send 约束
tokio::spawn
生成的任务必须实现 Send
特征,因为当这些任务在 .await
执行过程中发生阻塞时,Tokio 调度器会将任务在线程间移动
一个任务要实现 Send
特征,那它在 .await
调用的过程中所持有的全部数据都必须实现 Send
特征。当 .await
调用发生阻塞时,任务会让出当前线程所有权给调度器,然后当任务准备好后,调度器会从上一次暂停的位置继续执行该任务。该流程能正确的工作,任务必须将.await
之后使用的所有状态保存起来,这样才能在中断后恢复现场并继续执行。若这些状态实现了 Send
特征(可以在线程间安全地移动),那任务自然也就可以在线程间安全地移动
例如这段代码是可以编译运行的:
use tokio::task::yield_now;
use std::rc::Rc;
#[tokio::main]
async fn main() {
tokio::spawn(async {
// 语句块的使用强制了 `rc` 会在 `.await` 被调用前就被释放,
// 因此 `rc` 并不会影响 `.await`的安全性
{
let rc = Rc::new("hello");
println!("{}", rc);
}
// `rc` 的作用范围已经失效,因此当任务让出所有权给当前线程时,它无需作为状态被保存起来
yield_now().await;
});
}
而这段则不行:
use tokio::task::yield_now;
use std::rc::Rc;
#[tokio::main]
async fn main() {
tokio::spawn(async {
let rc = Rc::new("hello");
// `rc` 在 `.await` 后还被继续使用,因此它必须被作为任务的状态保存起来
yield_now().await;
// 事实上,注释掉下面一行代码,依然会报错
// 原因是:是否保存,不取决于 `rc` 是否被使用,而是取决于 `.await`在调用时是否仍然处于 `rc` 的作用域中
println!("{}", rc);
// rc 作用域在这里结束
});
}
这里有一个很重要的点,代码注释里面有讲到,rc
是否保存到任务状态中,取决于.await
的调用是否处于它的作用域中,上面代码中,就算注释掉println!
,该报错依然会报错,因此rc
的作用域知道async
的末尾才结束
共享状态
在使用 Tokio
编写异步代码时,一个常见的错误无条件地使用 tokio::sync::Mutex
,而真相是:Tokio
提供的异步锁只应该在跨多个 .await
调用时使用,而且 Tokio
的 Mutex
实际上内部使用的也是 std::sync::Mutex
- 锁如果在多个
.await
过程中持有,应该使用Tokio
提供的锁,原因是.await
的过程中锁可能在线程间转移,若使用标准库的同步锁存在死锁的可能性,例如某个任务刚获取完锁,还没使用完就因为.await
让出了当前线程的所有权,结果下个任务又去获取了锁,造成死锁 - 锁竞争不多的情况下,使用
std::sync::Mutex
- 锁竞争多,可以考虑使用三方库提供的性能更高的锁,例如parking_lot::Mutex
当同步锁的竞争变成一个问题时,使用Tokio提供的异步锁几乎并不能帮你解决问题,此时可以考虑如下选项:
- 创建专门的任务并使用消息传递的方式来管理状态
- 将锁进行分片
- 重构代码以避免锁
尝试编译下面的代码, 编译报错
rustc 1.81.0 (eeb90cda1 2024-09-04)
use std::sync::{Mutex, MutexGuard};
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
*lock += 1;
do_something_async().await;
} // lock goes out of scope here
std::sync::MutexGuard
的类型不是 Send
. 也就是说不能将互斥锁发送到另一个线程, 并且发生错误是因为 tokio 运行时可以在每个 .await
的线程之间移动任务.
为了避免这种情况, 应该重新构建代码:
// This works!
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
{
let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
*lock += 1;
} // lock goes out of scope here
do_something_async().await;
}
如果有三方库为
MutexGuard
实现了Send
. 这种情况下, 即使.await
持有MutexGuard
, 也不会出现编译期错误. 但是会发生死锁
推荐的一些写法:
use std::sync::Mutex;
struct CanIncrement {
mutex: Mutex<i32>,
}
impl CanIncrement {
// This function is not marked async.
fn increment(&self) {
let mut lock = self.mutex.lock().unwrap();
*lock += 1;
}
}
async fn increment_and_do_stuff(can_incr: &CanIncrement) {
can_incr.increment();
do_something_async().await;
}
Channels
场景: 多个用户访问同一个资源服务器
这个时候就不能使用 异步互斥锁
来实现了, 理论上还是可行的, 但是只允许一个正在进行的请求
解决方案: message passing
tokio’s channel primitives
- mpsc: 多生产者, 单消费者 channel
- oneshot: 单生产者, 单消费者 channel
- broadcast: 多生产者, 多消费者 channel
- watch: 多生产者, 多消费者 channel; 但是不保留历史记录, 接收方只能看到最新消息
思路
mpsc
和 oneshot
都是可以产生 生产者
和 消费者
的
使用 mpsc
+ oneshot
解决上述问题, 步骤:
mpsc消费者
负责处理mpsc生产者
的指令请求, 实际的数据处理是mpsc消费者
执行的mpsc生产者
负责组装数据, 组装数据的时候把oneshot生产者
一并发送给mpsc消费者
mpsc消费者
处理完mpsc生产者
的指令之后, 把结果通过oneshot生产者
返回给mpsc生产者
mpsc生产者
通过oneshot消费者
接受自mpsc消费者
返回的结果
I'm so cute. Please give me money.