banner

Tokio随记

Scroll down

设置学习环境

mini-redis

安装mini-redis 服务器

cargo install mini-redis

通过启动服务器来确保它已经安装

mini-redis-server

在单独的终端窗口中,尝试使用以下命令获取密钥mini-redis-cli

mini-redis-cli set foo 1
mini-redis-cli get foo

Hello world

use mini_redis::{client, Result};

#[tokio::main]
async fn main() -> Result<()> {
    // 建立与mini-redis服务器的连接
    let mut client = client::connect("127.0.0.1:6379").await?;

    // 设置 key: "hello" 和 值: "world"
    client.set("hello", "world".into()).await?;

    // 获取"key=hello"的值
    let result = client.get("hello").await?;

    println!("从服务器端获取到结果={:?}", result);

    Ok(())
}

client::connect 函数由mini-redis 包提供,它使用异步的方式跟指定的远程IP 地址建立 TCP 长连接,一旦连接建立成功,那 client 的赋值初始化也将完成

特别值得注意的是:虽然该连接是异步建立的,但是从代码本身来看,完全是同步的代码编写方式,唯一能说明异步的点就是.await

在上例中,redis 的连接函数 connect 实现如上,它看上去很像是一个同步函数,但是 async fn 出卖了它。 async fn 异步函数并不会直接返回值,而是返回一个 Future,顾名思义,该 Future 会在未来某个时间点被执行,然后最终获取到真实的返回值 Result<Client>

由于 async 会返回一个 Future,因此我们还需要配合使用 .await 来让该 Future 运行起来,最终获得返回值:

async fn say_to_world() -> String {
    String::from("world")
}

#[tokio::main]
async fn main() {
    // 此处的函数调用是惰性的,并不会执行 `say_to_world()` 函数体中的代码
    let op = say_to_world();

    // 首先打印出 "hello"
    println!("hello");

    // 使用 `.await` 让 `say_to_world` 开始运行起来
    println!("{}", op.await);
}

async fn 到底返回什么?它实际上返回的是一个实现了 Future 特征的匿名类型: impl Future<Output = String>

async main

在代码中,使用了一个与众不同的main 函数:async fn main,而且是用#[tokio::main] 属性进行了标记。异步main 函数有以下意义:

  • .await 只能在async 函数中使用,如果是以前的fn main,那它内部是无法直接使用async 函数的。这个会极大地限制了我们的使用场景
  • 异步运行时本身需要初始化

因此#[tokio::main] 宏将async fn main() 隐式的转换为fn main() 的同时还对整个异步运行时进行了初始化:

#[tokio::main]
async fn main() {
    println!("hello");
}
// 将被转换成:
fn main() {
    let mut rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        println!("hello");
    })
}

cargo feature

在引入 tokio 包时,我们在 Cargo.toml 文件中添加了这么一行:

tokio = { version = "1", features = ["full"] }

里面有个 features = ["full"] 可能大家会比较迷惑,当然,关于它的具体解释在本书的 Cargo详解专题 有介绍,这里就简单进行说明

Tokio 有很多功能和特性,例如 TCPUDPUnix sockets,同步工具,多调度类型等等,不是每个应用都需要所有的这些特性。为了优化编译时间和最终生成可执行文件大小、内存占用大小,应用可以对这些特性进行可选引入

而这里为了演示的方便,我们使用 full ,表示直接引入所有的特性

任务

一个 Tokio 任务是一个异步的绿色线程,它们通过 tokio::spawn 进行创建,该函数会返回一个 JoinHandle 类型的句柄,调用者可以使用该句柄跟创建的任务进行交互

#[tokio::main]
async fn main() {
    let handle = tokio::spawn(async {
        // Do some async work
        "return value"
    });

    // Do some other work

    let out = handle.await.unwrap();
    println!("GOT {}", out);
}

等待JoinHandle 返回一个Result。当任务在执行过程中遇到错误时,JoinHandle 将会返回一个Err。当任务发生紧急情况或者任务因运行时关闭而强制取消时,就会发生这种情况

任务是调度程序管理的执行单元。spawn 生成任务会将其提交给Tokio 调度程序,然后由它负责调度执行。需要注意的是,执行任务的线程未必是创建任务的线程,任务完全有可能运行在另一个不同的线程上,而且任务在生成后,它还可能会在线程间被移动

‘static 约束

当在Tokio 运行时生成任务时,其类型的生命周期必须是'static。意味着,在任务中不能使用外部数据的引用

这是一个常见的误解,static 总是意味着“永远活着”,但事实并非如此。仅仅因为某个值是static 并不意味着存在内存泄漏。常见的Rust生命周期误解

例如,以下内容将无法编译:

use tokio::task;

#[tokio::main]
async fn main() {
    let v = vec![1, 2, 3];

    task::spawn(async {
        println!("Here's a vec: {:?}", v);
    });
}

发生这种情况是因为,默认情况下,变量不会移动到异步块中。该v变量仍然属于该main函数。线程println借用v。将第7行更改为task::spawn(async move { 将指示编译器将v移动到线程内部。现在,该任务拥有其所有数据,使其成为'static

我们说某个值是'static 时,这意味着永远保留该值不会无效。这很重要,因为编译器无法推断新生成的任务会保留多长时间。我们必须确保任务能够永远保存,以便Tokio可以让任务运行到需要的时间

如果必须同时从多个任务访问单个数据,则必须使用同步原语(例如Arc)

Send 约束

tokio::spawn 生成的任务必须实现 Send 特征,因为当这些任务在 .await 执行过程中发生阻塞时,Tokio 调度器会将任务在线程间移动

一个任务要实现 Send 特征,那它在 .await 调用的过程中所持有的全部数据都必须实现 Send 特征。当 .await 调用发生阻塞时,任务会让出当前线程所有权给调度器,然后当任务准备好后,调度器会从上一次暂停的位置继续执行该任务。该流程能正确的工作,任务必须将.await之后使用的所有状态保存起来,这样才能在中断后恢复现场并继续执行。若这些状态实现了 Send 特征(可以在线程间安全地移动),那任务自然也就可以在线程间安全地移动

例如这段代码是可以编译运行的:

use tokio::task::yield_now;
use std::rc::Rc;

#[tokio::main]
async fn main() {
    tokio::spawn(async {
        // 语句块的使用强制了 `rc` 会在 `.await` 被调用前就被释放,
        // 因此 `rc` 并不会影响 `.await`的安全性
        {
            let rc = Rc::new("hello");
            println!("{}", rc);
        }

        // `rc` 的作用范围已经失效,因此当任务让出所有权给当前线程时,它无需作为状态被保存起来
        yield_now().await;
    });
}

而这段则不行:

use tokio::task::yield_now;
use std::rc::Rc;

#[tokio::main]
async fn main() {
    tokio::spawn(async {
        let rc = Rc::new("hello");

        // `rc` 在 `.await` 后还被继续使用,因此它必须被作为任务的状态保存起来
        yield_now().await;

        // 事实上,注释掉下面一行代码,依然会报错
        // 原因是:是否保存,不取决于 `rc` 是否被使用,而是取决于 `.await`在调用时是否仍然处于 `rc` 的作用域中
        println!("{}", rc);

        // rc 作用域在这里结束
    });
}

这里有一个很重要的点,代码注释里面有讲到,rc 是否保存到任务状态中,取决于.await 的调用是否处于它的作用域中,上面代码中,就算注释掉println!,该报错依然会报错,因此rc 的作用域知道async 的末尾才结束

共享状态

在使用 Tokio 编写异步代码时,一个常见的错误无条件地使用 tokio::sync::Mutex ,而真相是:Tokio 提供的异步锁只应该在跨多个 .await调用时使用,而且 TokioMutex 实际上内部使用的也是 std::sync::Mutex

  • 锁如果在多个 .await 过程中持有,应该使用 Tokio 提供的锁,原因是 .await的过程中锁可能在线程间转移,若使用标准库的同步锁存在死锁的可能性,例如某个任务刚获取完锁,还没使用完就因为 .await 让出了当前线程的所有权,结果下个任务又去获取了锁,造成死锁
  • 锁竞争不多的情况下,使用 std::sync::Mutex
  • 锁竞争多,可以考虑使用三方库提供的性能更高的锁,例如parking_lot::Mutex

当同步锁的竞争变成一个问题时,使用Tokio提供的异步锁几乎并不能帮你解决问题,此时可以考虑如下选项:

  • 创建专门的任务并使用消息传递的方式来管理状态
  • 将锁进行分片
  • 重构代码以避免锁

尝试编译下面的代码, 编译报错

rustc 1.81.0 (eeb90cda1 2024-09-04)

use std::sync::{Mutex, MutexGuard};

async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
    *lock += 1;

    do_something_async().await;
} // lock goes out of scope here

std::sync::MutexGuard 的类型不是 Send. 也就是说不能将互斥锁发送到另一个线程, 并且发生错误是因为 tokio 运行时可以在每个 .await 的线程之间移动任务.
为了避免这种情况, 应该重新构建代码:

// This works!
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    {
        let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
        *lock += 1;
    } // lock goes out of scope here

    do_something_async().await;
}

如果有三方库为 MutexGuard 实现了 Send. 这种情况下, 即使 .await 持有 MutexGuard, 也不会出现编译期错误. 但是会发生死锁

推荐的一些写法:

use std::sync::Mutex;

struct CanIncrement {
    mutex: Mutex<i32>,
}
impl CanIncrement {
    // This function is not marked async.
    fn increment(&self) {
        let mut lock = self.mutex.lock().unwrap();
        *lock += 1;
    }
}

async fn increment_and_do_stuff(can_incr: &CanIncrement) {
    can_incr.increment();
    do_something_async().await;
}

Channels

场景: 多个用户访问同一个资源服务器

这个时候就不能使用 异步互斥锁 来实现了, 理论上还是可行的, 但是只允许一个正在进行的请求

解决方案: message passing

tokio’s channel primitives

  • mpsc: 多生产者, 单消费者 channel
  • oneshot: 单生产者, 单消费者 channel
  • broadcast: 多生产者, 多消费者 channel
  • watch: 多生产者, 多消费者 channel; 但是不保留历史记录, 接收方只能看到最新消息

思路

mpsconeshot 都是可以产生 生产者消费者

使用 mpsc + oneshot 解决上述问题, 步骤:

  • mpsc消费者 负责处理 mpsc生产者 的指令请求, 实际的数据处理是 mpsc消费者 执行的
  • mpsc生产者 负责组装数据, 组装数据的时候把 oneshot生产者 一并发送给 mpsc消费者
  • mpsc消费者 处理完 mpsc生产者 的指令之后, 把结果通过 oneshot生产者 返回给 mpsc生产者
  • mpsc生产者 通过 oneshot消费者 接受自 mpsc消费者 返回的结果

I'm so cute. Please give me money.

其他文章
cover
Learn WebGPU
  • 2024-09-25
  • 17:03:30
  • 笔记
cover
Rust学习笔记
  • 2023-08-24
  • 14:05:39
  • 笔记
目录导航 置顶
  1. 1. 设置学习环境
    1. 1.1. mini-redis
  2. 2. Hello world
    1. 2.1. async main
    2. 2.2. cargo feature
    3. 2.3. 任务
    4. 2.4. ‘static 约束
    5. 2.5. Send 约束
  3. 3. 共享状态
  4. 4. Channels
    1. 4.1. tokio’s channel primitives
    2. 4.2. 思路
请输入关键词进行搜索