本文是阅读新版tokio文档,顺便翻译之作,啥时候读完啥时候写完咯。

Intruduction

本教程将一步步带你构建一个简化的Redis客户端和服务器。我们首先介绍Rust中异步编程的基本概念,而后实现Redis命令的一个子集。

Mini-Redis

安装完成Rust以后,请安装Mini-Redis服务器程序,这可以用来测试我们的程序:

cargo install mini-redis

你可以键入如下命令开始程序:

mini-redis-server

而后你可以另起一个命令行,用自带的客户端程序尝试获取键foo

mini-redis-cli get foo

你将看到输出为nil

Hello Tokio

引入

我们首先编写非常基础的Tokio应用,它将连接到我们的Mini-Redis服务器,插入键值对<hello, world>,而后我们用cli包来获取它,以验证正确性。

首先我们用carge创建新的Rust应用目录:

cargo new my-redis
cd my-redis

而后编辑Cargo.toml,添加依赖:

tokio = { version = "0.3", features = ["full"] }
mini-redis = "0.3"

现在我们可以编辑main.rs,代码如下:

use mini_redis::{client, Result};

#[tokio::main]
pub async fn main() -> Result<()> {
    // Open a connection to the mini-redis address.
    let mut client = client::connect("127.0.0.1:6379").await?;

    // Set the key "hello" with value "world"
    client.set("hello", "world".into()).await?;

    // Get key "hello"
    let result = client.get("hello").await?;

    println!("got value from the server; result={:?}", result);

    Ok(())
}

现在请在另一个终端运行server,然后运行main,你将得到:

cargo run
got value from the server; result=Some(b"world")

分析

我们来看看我们刚刚的程序做了什么,实际上没有几行代码,但是确实发生了很多事情。首先,client::connect函数由mini-redis包提供,它异步地创建了一个TCP连接。一旦连接建立完成,一个client句柄将被返回。即使整个过程是异步的,但是我们看到的代码似乎是同步的,唯一的暗示在于我们用到的await操作符。

什么是异步编程

大多是程序的运行顺序和编写顺序是一致的,首先执行第一行,然后第二行,以此类推。在同步编程中,如果程序遇到一条无法立即返回的指令,它将阻塞在这里,直到指令完成。例如,创建一个TCP连接需要三次握手,这可能花费很长时间,在同步编程中,当前线程将会阻塞。

在异步编程中,无法立刻返回的指令将被暂停挂在后台,而当前线程可以继续执行其他操作。一旦该指令完成,任务将重新在暂停的地方开始被恢复执行。我们的例子中只有一个任务,所以在挂起的时候程序没干别的事情,但是通常异步程序会拥有很多异步任务需要调度。

尽管异步编程可以使程序运行更高效,但它也使得程序逻辑变得复杂。程序员需要考虑所有必要的状态来恢复暂停的任务。历史上,这是一件很冗杂无聊和易出错的事情。

编译时绿色线程

Rust使用async/await特性来实现异步编程。所有包含异步操作的函数使用async来标记,在我们上面的例子中,connect函数的定义如下:

use mini_redis::Result;
use mini_redis::client::Client;
use tokio::net::ToSocketAddrs;

pub async fn connect<T: ToSocketAddrs>(addr: T) -> Result<Client> {
    // ...
}

这里的async fn的定义看起来和普通的同步函数一样,但是执行的时候是异步的。Rust在编译时就将异步函数转换成一个异步的任务。在异步函数内调用await时,调度器会把当前任务挂起,转而在此线程上执行其他任务,此异步任务将在别处处理(操作系统接受网络数据,或者读取文件等等)。

其他一些语言也实现了async/await特性,但是Rust的实现方式是独特的。Rust的async操作是lazy的,这使得其运行时语义不同于其他语言。

如果目前为止的内容你还难以理解,别担心,我们后续会有更详细的解释。

使用async/await

异步函数的调用和普通函数无异。然而,单单调用并不会执行这些函数的函数体。调用一个async函数仅仅是返回一个代表这一异步操作的值,这在概念上类似于一个无参数的闭包。我们必须显式调用.await操作符才能获得函数真正的返回值。看下面的代码:

async fn say_world() {
    println!("world");
}

#[tokio::main]
async fn main() {
    // Calling `say_world()` does not execute the body of `say_world()`.
    let op = say_world();

    // This println! comes first
    println!("hello");

    // Calling `.await` on `op` starts executing `say_world`.
    op.await;
}

这段代码输出:

hello
world

实际上,异步函数的返回值是一个实现了Future的匿名类型值。

异步的main函数

上面的程序中的main函数和通常的Rust应用不同,它是异步的,并且使用#[tokio::main]作了注解。当我们在函数中用到异步操作符的时候,该函数需要定义为async的。而async函数必须要用一个运行时去执行。该运行时需要包含异步任务调度器,并且提供内部的定时器,事件驱动型的IO框架等。这一运行时也需要main函数来启动。

#[tokio::main]是一个宏,它将async fn main()转化为一个同步的main()函数,并在其中初始化运行时并执行异步的main函数。例如,下面一段程序:

#[tokio::main]
async fn main() {
    println!("hello");
}

将被转化成:

fn main() {
    let mut rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        println!("hello");
    })
}

Tokio运行时的细节将在后面谈到。

Cargo features

当用到tokio的时候,我们设置了features为full:

tokio = { version = "0.3", features = ["full"] }

Tokio本身有很多功能(TCP, UDP, Unix sockets, timers, sync utilities, multple scheduler types, etc)。并不是所有应用都会用到全部的功能。当我们尝试优化应用的编译时长或最终生成的软件大小时,可以砍掉一些没有用到的feature。

但是目前为止,我们直接使用full就好了。

Spawning

我们现在真正开始实现我们的Redis服务。首先我们把上一节用到的代码移到example包里,我们可以用它来测试我们自己的Redis服务器。

mkdir -p examples
mv src/main.rs examples/hello-redis.rs

然后我们新建一个空的src/main.rs文件。

接受新的socket连接

Redis服务器第一件要干的事情就是监听固定端口上的TCP连接请求,这可以用tokio::net::TcpListener完成。

许多Tokio的内部类型都和Rust标准库里的同步等价物命名相同。Tokio合理地将标准库里地一些API以异步的方式重新暴露给用户使用。

一个TcpListener绑定带6379号端口,而后我们在一个循环中接收连接,每一个连接都被处理一次,然后关闭。目前,我们只读取接受到的命令然后输出到stdout,然后向客户端响应一个error。

use tokio::net::{TcpListener, TcpStream};
use mini_redis::{Connection, Frame};

#[tokio::main]
async fn main() {
    // Bind the listener to the address
    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
    loop {
        // The second item contains the IP and port of the new connection.
        let (socket, _) = listener.accept().await.unwrap();
        process(socket).await;
    }
}

async fn process(socket: TcpStream) {
    // The `Connection` lets us read/write redis **frames** instead of
    // byte streams. The `Connection` type is defined by mini-redis.
    let mut connection = Connection::new(socket);

    if let Some(frame) = connection.read_frame().await.unwrap() {
        println!("GOT: {:?}", frame);

        // Respond with an error
        let response = Frame::Error("unimplemented".to_string());
        connection.write_frame(&response).await.unwrap();
    }
}

现在我们运行一下:

cargo run

在另一个终端我们运行之前的example的hello-redis:

cargo run --example hello-redis

输出应该是:

Error: "unimplemented"

在服务器的终端上应该显示如下:

GOT: Array([Bulk(b"set"), Bulk(b"hello"), Bulk(b"world")])

并发执行

除了只返回error以外,上面的程序有一个小问题,它只能一次处理一个连接。当一个连接被接受以后,服务器程序将执行process程序,处理完成后才开始接受新的请求。

我们希望我们的Redis服务器能够同时处理大量请求,这需要一些并发。

并发和并行不是一个东西。如果你在两个任务之间来回切换,你是在并发执行两个任务,但并非并行。并行指的是你必须有两个不同的个体各自同时执行一个任务。在计算机中,达到并行必须使用多个线程在多核处理器上运行。

使用Tokio的好处在于异步编程能够让许多任务并发执行,而不需要手动开启多个原生线程。事实上,Tokio调度器可以让许多任务在单线程上并发执行。

为了并发地处理连接请求,我们需要在每一次连接到来的时候发布(spawn)一个新的任务,此任务包含连接的处理逻辑,我们把main函数改成这样:

use tokio::net::TcpListener;

#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
    loop {
        let (socket, _) = listener.accept().await.unwrap();
        // A new task is spawned for each inbound socket. The socket is
        // moved to the new task and processed there.
        tokio::spawn(async move {
            process(socket).await;
        });
    }
}

任务

一个Tokio任务就是一个异步的绿色线程(或者说协程也不为过)。它们通过一个async块来创建,然后传入tokio::spawn函数来发布,该函数返回一个JoinHandle,调用者可以通过这一句柄和该任务交互。该async块有时会有返回值,可以通过在JoinHandle上调用.await来获得它。例如:

#[tokio::main]
async fn main() {
    let handle = tokio::spawn(async {
        // Do some async work
        "return value"
    });
    // Do some other work
    let out = handle.await.unwrap();
    println!("GOT {}", out);
}

在该句柄上await将得到一个Result,如果任务执行过程中产生错误,会返回Err,这通常发生在任务发生panic,或者运行时异常宕机导致任务被取消的情况下。

Tokio任务是Tokio调度器管理的执行单元。当我们将任务发布后,该任务被提交给调度器,由调度器负责其执行。每一个任务可能就在当前线程执行,也可能被调度到不同的线程,也有可能在不同线程间切换。

Tokio中的任务是非常轻量级的,实际上创建它们仅消耗一次内存分配以及64字节的内存。你可以随意发布任务,当然别弄个几百万个就好。

‘static限制

当你发布一个任务时,它的类型必须是静态`static的,这意味着任务不能包含任何任务外部数据的引用。

‘static并不总是意味着永生,一个值是’static的并不意味着内存泄漏:你可以在Common Rust Lifetime Misconceptions上阅读更多。

例如,下述代码无法编译通过:

use tokio::task;

#[tokio::main]
async fn main() {
    let v = vec![1, 2, 3];
    task::spawn(async {
        println!("Here's a vec: {:?}", v);
    });
}

如果你尝试编译,将会出现如下的报错:

error[E0373]: async block may outlive the current function, but
              it borrows `v`, which is owned by the current function
 --> src/main.rs:7:23
  |
7 |       task::spawn(async {
  |  _______________________^
8 | |         println!("Here's a vec: {:?}", v);
  | |                                        - `v` is borrowed here
9 | |     });
  | |_____^ may outlive borrowed value `v`
  |
note: function requires argument type to outlive `'static`
 --> src/main.rs:7:17
  |
7 |       task::spawn(async {
  |  _________________^
8 | |         println!("Here's a vector: {:?}", v);
9 | |     });
  | |_____^
help: to force the async block to take ownership of `v` (and any other
      referenced variables), use the `move` keyword
  |
7 |     task::spawn(async move {
8 |         println!("Here's a vec: {:?}", v);
9 |     });
  |

这是因为,默认情况下,变量不会被移动到async块内,这里的v的所有权仍然归main函数,而println!借用了v。编译器解释了这一错误,并建议使用move关键字把v移动进去。修改后,任务就拥有了所有内部数据的所有权,成为’static的了。如果你一定要让多个任务共享一个变量,则它必须用一些同步原语来包装,如线程安全的多重所有权指针Arc

注意到错误信息提示:参数类型需要比’static活得长。这听起来有点令人困惑,因为’static变量的生命期等同整个程序,如果它比’static活得长,岂不是内存泄漏了吗?解释如下:是类型需要活得长,而不是变量本身,变量在类型无效前就可以被销毁。

当我们说一个值是’static的时候,它仅仅意味着:任意传递这个变量都不会出错。这很重要,因为编译器无法推断一个新发布的任务会存活多长时间,所以唯一的方法就是假定它可能会永生。

Send限制

被发布的任务必须实现了Send,这允许Tokio运行时将任务在不同线程间切换。当所有生命期跨过.await语句的变量是Send的时候,该任务是Send的。这有点微妙。当.await被调用时,任务挂起,然后调度器调度其他任务。下次这个任务被唤醒时,将在挂起的时候重新恢复执行,也就是说在.await之后用到的所有变量都会被保存。而恢复执行的线程可能不同于原来的线程,因此这些变量需要是Send的。

例如,下面这段代码是正确的,因为变量rc提前释放了,它没有跨越await

use tokio::task::yield_now;
use std::rc::Rc;

#[tokio::main]
async fn main() {
    tokio::spawn(async {
        // The scope forces `rc` to drop before `.await`.
        {
            let rc = Rc::new("hello");
            println!("{}", rc);
        }
        // `rc` is no longer used. It is **not** persisted when
        // the task yields to the scheduler
        yield_now().await;
    });
}

而下面这段代码无法编译通过:

use tokio::task::yield_now;
use std::rc::Rc;

#[tokio::main]
async fn main() {
    tokio::spawn(async {
        let rc = Rc::new("hello");
        // `rc` is used after `.await`. It must be persisted to
        // the task's state.
        yield_now().await;
        println!("{}", rc);
    });
}

编译错误如下:

error: future cannot be sent between threads safely
   --> src/main.rs:6:5
    |
6   |     tokio::spawn(async {
    |     ^^^^^^^^^^^^ future created by async block is not `Send`
    | 
   ::: [..]spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ---- required by this bound in
    |                          `tokio::task::spawn::spawn`
    |
    = help: within `impl std::future::Future`, the trait
    |       `std::marker::Send` is not  implemented for
    |       `std::rc::Rc<&str>`
note: future is not `Send` as this value is used across an await
   --> src/main.rs:10:9
    |
7   |         let rc = Rc::new("hello");
    |             -- has type `std::rc::Rc<&str>` which is not `Send`
...
10  |         yield_now().await;
    |         ^^^^^^^^^^^^^^^^^ await occurs here, with `rc` maybe
    |                           used later
11  |         println!("{}", rc);
12  |     });
    |     - `rc` is later dropped here

我们将在下一节深入讨论这个错误的一个特殊情况。

保存数据

我们现在实现新的process函数,来处理新到达的请求。我们使用HashMap来保存数据。SET命令把数据存到哈希表中,而GET命令读取它们。同时我们循环读取一次连接中的多个命令。

use tokio::net::TcpStream;
use mini_redis::{Connection, Frame};

async fn process(socket: TcpStream) {
    use mini_redis::Command::{self, Get, Set};
    use std::collections::HashMap;
    // A hashmap is used to store data
    let mut db = HashMap::new();
    // Connection, provided by `mini-redis`, handles parsing frames from
    // the socket
    let mut connection = Connection::new(socket);
    // Use `read_frame` to receive a command from the connection.
    while let Some(frame) = connection.read_frame().await.unwrap() {
        let response = match Command::from_frame(frame).unwrap() {
            Set(cmd) => {
                // The value is stored as `Vec<u8>`
                db.insert(cmd.key().to_string(), cmd.value().to_vec());
                Frame::Simple("OK".to_string())
            }
            Get(cmd) => {
                if let Some(value) = db.get(cmd.key()) {
                    // `Frame::Bulk` expects data to be of type `Bytes`. This
                    // type will be covered later in the tutorial. For now,
                    // `&Vec<u8>` is converted to `Bytes` using `into()`.
                    Frame::Bulk(value.clone().into())
                } else {
                    Frame::Null
                }
            }
            cmd => panic!("unimplemented {:?}", cmd),
        };
        // Write the response to the client
        connection.write_frame(&response).await.unwrap();
    }
}

现在我们运行这个server:

cargo run

在另一个终端运行hello-redis程序:

cargo run --example hello-redis

我们可以看到输出为:

got value from the server; success=Some(b"world")

我们现在可以插入和读取数据了,但是这里有个问题:数据并没有在连接之间共享,如果另一个客户端想要读取相同的键值,它什么都读不到。在下一节我们来看看如何把数据跨连接保存并共享。

共享状态

策略

有很多策略都能够在Tokio中共享状态:

  1. 用互斥锁来保护状态变量。
  2. 发布一个专门用于管理状态的任务,使用消息传递模式来操作状态。

一般来说第一种方法用来管理简单的数据,第二种用于需要异步操作的工作,如IO操作。在本节,我们需要共享的状态是一个哈希表,对应的操作是插入和查询(insert和get),这些操作都是同步的,因此我们使用互斥锁即可。第二种方法将在下一节讲述。

添加bytes包

我们的Mini-Redis使用Bytes数据结构,而不是Vec<u8>来表示字节流,前者来自bytes包。Bytes的设计目标是为网络编程提供健壮的字节数组。它与Vec<u8>的最大区别是浅拷贝。也就是说,当我们在Bytes实例上调用clone()方法时,并不会拷贝内部的数据,实际上它仅仅增加引用计数。在实现上,你可以认为近似于Arc<Vec<u8>>,只不过提供了更丰富的封装。

我们在Cargo.toml里添加:

bytes = "0.6"

初始化HashMap

我们的哈希表会在许多任务间共享,因此用Arc<Mutex<_>>来封装即可。

首先我们为这一类型起个别名:

use bytes::Bytes;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
type Db = Arc<Mutex<HashMap<String, Bytes>>>;

而后我们修改main函数进行初始化,将智能指针Arc传递给process函数,Arc是线程安全的引用计数指针。

use tokio::net::TcpListener;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
    println!("Listening");
    let db = Arc::new(Mutex::new(HashMap::new()));
    loop {
        let (socket, _) = listener.accept().await.unwrap();
        // Clone the handle to the hash map.
        let db = db.clone();
        println!("Accepted");
        tokio::spawn(async move {
            process(socket, db).await;
        });
    }
}

std::sync::Mutex

注意我们使用的是std::sync::Mutex而不是tokio::sync::Mutex。在同步的代码中使用tokio::sync::Mutex是错误的。异步的互斥锁是指其锁的范围跨越了.await操作符。

一个同步的互斥锁会使当前线程阻塞,等待获取锁,因此会使得当前线程无法再同时处理别的任务。使用tokio::sync::Mutex也并不会改变这种情况,因为异步的互斥锁在内部也是使用了原始的同步互斥锁。

通常的经验是,在异步代码块中使用同步互斥锁没啥问题,只要临界区很小并且锁不跨越.await。此外,你可以考虑使用parking_lot::Mutex,它比标准库的互斥锁更快一些。

更新process函数

process函数不再需要初始化哈希表了,它接收其实例的一个智能指针作为参数。在插入数据时,我们需要先加锁。

use tokio::net::TcpStream;
use mini_redis::{Connection, Frame};

async fn process(socket: TcpStream, db: Db) {
    use mini_redis::Command::{self, Get, Set};
    // Connection, provided by `mini-redis`, handles parsing frames from
    // the socket
    let mut connection = Connection::new(socket);
    while let Some(frame) = connection.read_frame().await.unwrap() {
        let response = match Command::from_frame(frame).unwrap() {
            Set(cmd) => {
                let mut db = db.lock().unwrap();
                db.insert(cmd.key().to_string(), cmd.value().clone());
                Frame::Simple("OK".to_string())
            }           
            Get(cmd) => {
                let db = db.lock().unwrap();
                if let Some(value) = db.get(cmd.key()) {
                    Frame::Bulk(value.clone())
                } else {
                    Frame::Null
                }
            }
            cmd => panic!("unimplemented {:?}", cmd),
        };
        // Write the response to the client
        connection.write_frame(&response).await.unwrap();
    }
}

任务,线程以及竞争

使用阻塞的互斥锁来保护短的临界区是很好的策略,尤其是竞争较小的情况。当锁被争用的时候,线程必须阻塞并且等待锁释放,这同时也导致当前线程被分配的其他任务也被阻塞。

默认情况下,Tokio运行时使用多线程的调度器。任务将在多个线程组成的线程池中调度执行。如果大量任务被发布并执行在不同线程中,且都需要获得哈希表的锁,那么就产生了竞争。换句话说,如果配置了current_thread,锁就不会被竞争。

current_thread运行时是一个轻量级,单线程的运行时。当我们发布的任务不是很多,且连接数也不多的情况下,单线程是很好的思路。

如果锁的争用很剧烈,影响程序的性能,可以考虑下面的一些选项:

  • 使用专门的Task来管理状态,用消息传递模式。
  • 把锁分片。
  • 重构代码,不用互斥锁。

在我们的例子中,因为哈希表中每个key都是独立的,锁分片是很好的策略,我们不使用单独的Mutex<HashMap<_, _>>,而是使用N个哈希表实例:

type ShardedDb = Arc<Vec<Mutex<HashMap<String, Vec<u8>>>>>;

而后,寻找相应key所在的位置分两步:首先,利用哈希值确定key所在的哈希表,然后再查该哈希表。

let shard = db[hash(key) % db.len()].lock().unwrap();
shard.insert(key, value);

dashmap包基于这一思想实现了分片的哈希表,提供更好的并发性能。

跨.await的锁

你可能会写出如下的代码:

use std::sync::Mutex;
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    let mut lock = mutex.lock().unwrap();
    *lock += 1;
    do_something_async().await;
} // lock goes out of scope here

当你在Tokio任务中调用这一函数时,编译会报错:

error: future cannot be sent between threads safely
   --> src/lib.rs:13:5
    |
13  |     tokio::spawn(async move {
    |     ^^^^^^^^^^^^ future created by async block is not `Send`
    |
   ::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.21/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ---- required by this bound in `tokio::task::spawn::spawn`
    |
    = help: within `impl std::future::Future`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, i32>`
note: future is not `Send` as this value is used across an await
   --> src/lib.rs:7:5
    |
4   |     let mut lock = mutex.lock().unwrap();
    |         -------- has type `std::sync::MutexGuard<'_, i32>` which is not `Send`
...
7   |     do_something_async().await;
    |     ^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here, with `mut lock` maybe used later
8   | }
    | - `mut lock` is later dropped here

错误原因是,std::sync:MutexGuard不是Send的。这意味着你不能让一个互斥锁跨线程传递。前面说过,Tokio调度器可能会让一个任务在多个线程上执行。为了解决这个问题,你必须要在.await之前释放锁:

// This works!
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    {
        let mut lock = mutex.lock().unwrap();
        *lock += 1;
    } // lock goes out of scope here
    do_something_async().await;
}

注意,虽然看起来很对,但是下面这段代码无法通过编译:

use std::sync::Mutex;

// This fails too.
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    let mut lock = mutex.lock().unwrap();
    *lock += 1;
    drop(lock);
    do_something_async().await;
}

这是因为编译器目前仅根据作用域来判断一个future是否是Send的。将来可能会支持显式的drop,不过现在还是老老实实用大括号吧。

不要想着能不能在不要求Send的情况下发布任务来躲避这一问题(单线程调度器)。因为如果Tokio在await时挂起了你的任务,但锁还未释放,另一个任务可能会在同一线程执行,并请求同一个锁。此时就产生了死锁,因为当前线程阻塞,但是锁永远无法释放。

其他办法

在await前通过作用域释放锁是一个办法,实际上还有更鲁棒的写法。例如,你可以把互斥锁包装在一个结构体中,然后通过一个方法来执行锁的逻辑。

use std::sync::Mutex;

struct CanIncrement {
    mutex: Mutex<i32>,
}
impl CanIncrement {
    // This function is not marked async.
    fn increment(&self) {
        let mut lock = self.mutex.lock().unwrap();
        *lock += 1;
    }
}
async fn increment_and_do_stuff(can_incr: &CanIncrement) {
    can_incr.increment();
    do_something_async().await;
}

这一编程模式保证了上面的Send错误永远不会出现,因为锁根本就没有出现在异步块中。

另一个办法就是使用消息传递策略,我们将在下一节提到。

Tokio的异步锁

我们前面提到的tokio::sync::Mutex终于派上用场了。它最主要的特性就是能够跨越.await,而不产生问题。当然这也意味着异步锁的性能损失更大,如果非必要,请用上面的两种策略,而不是用异步锁:

use tokio::sync::Mutex; // note! This uses the Tokio mutex
// This compiles!
// (but restructuring the code would be better in this case)
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    let mut lock = mutex.lock().await;
    *lock += 1;
    do_something_async().await;
} // lock goes out of scope here

Channels

现在我们已经学到了一些Tokio的异步知识了,本节我们来写写客户端。首先我们尝试发布Tokio任务来发送命令:

use mini_redis::client;
#[tokio::main]
async fn main() {
    // Establish a connection to the server
    let mut client = client::connect("127.0.0.1:6379").await.unwrap();
    // Spawn two tasks, one gets a key, the other sets a key
    let t1 = tokio::spawn(async {
        let res = client.get("hello").await;
    });
    let t2 = tokio::spawn(async {
        client.set("foo", "bar".into()).await;
    });
    t1.await.unwrap();
    t2.await.unwrap();
}

上面这段代码并不能通过编译,因为两个任务都需要用到client变量,但是Client并没有实现Copy,因此这一共享是错误的。当然我们可以为每条命令都新建一个连接,但显然这很慢。

消息传递

答案是使用消息传递模型,即我们使用专门的任务来管理client连接。任何想要发送新的命令的任务都把指令作为一条消息传递给该client任务,client任务负责把命令发送给服务端,然后再把消息传回原任务。

使用这一策略,我们只需要维护一个与服务端的连接即可。client任务能够很好地处理收到的命令,同时channel本身也起到缓冲的作用。此模型很好地提高吞吐量,同时也可以扩展成连接池模型。

Tokio的channel原语

Tokio提供了许多类型的channel,每一个都提供不同的功能和设计目标:

  • mpsc:多生产者,单消费者。可以一次传递多个值。
  • oneshot:单生产者,单消费者。一次只能传递一个值。
  • broadcast:多生产者,多消费者。一次传递多值,值能够被每个接收器看见。
  • watch:单生产者,多消费者。一次传递多值,不过不记录消息历史。接收器只能看见最近的值。

如果你需要多生产者,多消费者的channel,并且只有一个接收器能看见全部值,你可以使用async-channel包。在同步编程中也有一些channel,如std::sync::mpsc和crossbeam::channel,这些channel在等待数据的时候会阻塞当前线程,而在Tokio异步编程中,当前线程不会阻塞。

在本节,我们会使用mpsconeshot,另外几种channel将在后续讲解。

定义消息类型

在大多数情况下,当我们使用消息传递时,接收消息的任务常常需要响应多个累积的消息。在我们的例子中,该任务需要处理GETSET消息,我们首先定义Command类型:

use bytes::Bytes;

#[derive(Debug)]
enum Command {
    Get {
        key: String,
    },
    Set {
        key: String,
        val: Bytes,
    }
}

创建channel

在main函数中,我们以如下方式创建mpsc通道:

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Create a new channel with a capacity of at most 32.
    let (mut tx, mut rx) = mpsc::channel(32);
    // ... Rest comes here
}

这个channel是为了向管理连接的任务发送命令的,多生产者属性使得我们可以在多个任务中发送消息。tx和rx分别是channel的发送器和接收器句柄,它们可以被分配给不同的任务。

当前channel以容量32创建,如果消息的发送速度大于接受速度,则通道会保存未读取的数据。一旦通道中保存的数据超过32,调用send(...).await会导致任务挂起,直到channel有多余空间。

从多个任务发送数据可以通过克隆Sender来实现,例如:

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (mut tx, mut rx) = mpsc::channel(32);
    let mut tx2 = tx.clone();

    tokio::spawn(async move {
        tx.send("sending from first handle").await;
    });

    tokio::spawn(async move {
        tx2.send("sending from second handle").await;
    });

    while let Some(message) = rx.recv().await {
        println!("GOT = {}", message);
    }
}

在上面的代码中,两个数据都被发送到了Reciever,注意mpsc的接收器是无法复制的。

当每个Sender实例都结束生命期后,这就意味着我们无法再向对应的接收器发送任何数据。此时,如果对rx调用recv会得到None,这代表着所有Sender都已经释放,因此channel关闭。

在mini-redis项目中,管理客户端连接的任务将在channel关闭的同时关闭对应的TCP连接,因为不会再有新的命令了,此redis连接作废。

创建manager

现在我们创建一个新的任务,用以管理channel的数据接收和发送。首先,我们建立一个新的与Redis服务的TCP连接,之后我们把从channel中接收到的数据转发到Redis即可。

use mini_redis::client;
// The `move` keyword is used to **move** ownership of `rx` into the task.
let manager = tokio::spawn(async move {
    // Establish a connection to the server
    let mut client = client::connect("127.0.0.1:6379").await.unwrap();

    // Start receiving messages
    while let Some(cmd) = rx.recv().await {
        use Command::*;

        match cmd {
            Get { key } => {
                client.get(&key).await;
            }
            Set { key, val } => {
                client.set(&key, val).await;
            }
        }
    }
});

现在我们把上一小节的代码修改一下,把数据发送给manager,而不是Redis服务器:

// The `Sender` handles are moved into the tasks. As there are two
// tasks, we need a second `Sender`.
let mut tx2 = tx.clone();

// Spawn two tasks, one gets a key, the other sets a key
let t1 = tokio::spawn(async move {
    let cmd = Command::Get {
        key: "hello".to_string(),
    };

    tx.send(cmd).await.unwrap();
});

let t2 = tokio::spawn(async move {
    let cmd = Command::Set {
        key: "foo".to_string(),
        val: "bar".into(),
    };

    tx2.send(cmd).await.unwrap();
});

在main函数的结尾我们await三个任务的JoinHandle,以保证它们都执行完成:

t1.await.unwrap();
t2.await.unwrap();
manager.await.unwrap();

接收响应

最后一步,我们还要把Redis服务器返回的响应内容转发到发送命令的task。GET命令需要获得具体的值,而SET命令需要返回操作结果(失败or成功)。

为了把response传回去,我们使用前面介绍的oneshot通道。它是单生产者,单消费者,并且为单值的消息传递作了优化。在我们的例子中,单值指的是Response。

类似于mpsconeshot::channel返回发送器和接收器句柄:

use tokio::sync::oneshot;
let (tx, rx) = oneshot::channel();

不过和mpsc不同,我们不需要设置channel的大小,并且任何一个句柄都是不可复制的。

为了从manager协程接收到响应,在发送命令之前,我们需要先创建一个oneshot通道,然后把发送器一起传给manager即可。为此我们更新一下之前定义的Command类型:

use tokio::sync::oneshot;
use bytes::Bytes;

/// Multiple different commands are multiplexed over a single channel.
#[derive(Debug)]
enum Command {
    Get {
        key: String,
        resp: Responder<Option<Bytes>>,
    },
    Set {
        key: String,
        val: Vec<u8>,
        resp: Responder<()>,
    },
}
/// Provided by the requester and used by the manager task to send
/// the command response back to the requester.
type Responder<T> = oneshot::Sender<mini_redis::Result<T>>;

现在我们更新之前的两个发送任务:

let t1 = tokio::spawn(async move {
    let (resp_tx, resp_rx) = oneshot::channel();
    let cmd = Command::Get {
        key: "hello".to_string(),
        resp: resp_tx,
    };
    // Send the GET request
    tx.send(cmd).await.unwrap();
    // Await the response
    let res = resp_rx.await;
    println!("GOT = {:?}", res);
});

let t2 = tokio::spawn(async move {
    let (resp_tx, resp_rx) = oneshot::channel();
    let cmd = Command::Set {
        key: "foo".to_string(),
        val: b"bar".to_vec(),
        resp: resp_tx,
    };
    // Send the SET request
    tx2.send(cmd).await.unwrap();
    // Await the response
    let res = resp_rx.await;
    println!("GOT = {:?}", res);
});

最后再修改一下manager任务,让它把接收到的响应通过Sender发送回去:

while let Some(cmd) = rx.recv().await {
    match cmd {
        Command::Get { key, resp } => {
            let res = client.get(&key).await;
            // Ignore errors
            let _ = resp.send(res);
        }
        Command::Set { key, val, resp } => {
            let res = client.set(&key, val.into()).await;
            // Ignore errors
            let _ = resp.send(res);
        }
    }
}

注意,调用oneshot通道的send是一个同步方法,不需要await。这是因为oneshot的send会立刻知道是否发送成功。当接收器实例已经释放,则send会返回Err,在上面的代码中,我们忽略这一错误,因为这是允许的情况。

背压以及有界channel

当并发和队列同时出现,我们需要保证队列是有界的,并且系统能够优雅地处理负载。无界地队列最终有可能会占满整个内存,导致系统出现不可预知的错误。

Tokio防止隐式的队列出现。很大一部分原因是异步操作是惰性的,例如下面的代码:

loop {
    async_op();
}

如果异步操作是立刻执行的,那么上面的循环会不断地创建新的异步操作,而不会等待上一个操作执行完成,这就产生了隐式的无界队列。基于回调的系统,以及基于eager future的系统特别容易出现这样的错误。

然而,异步Rust和Tokio很好地规避了这一问题,上面的代码在Rust中不会直接执行,async_op本身只是一个Future变量,只有调用await才能让它真正执行,否则就只是一个单纯的死循环,不会占用更多内存。

并发和队列必须显式调用,比如通过如下形式:

  • tokio::spawn
  • select!
  • join!
  • mpsc::channel

当我们做这些调用的时候,请保证所有的并发任务数目是有界的。例如,当编写异步接收TCP请求的循环时,请保证socket的数量是有界的;当使用mpsc通道时,设置一个适当的channel容量,容量大小取决于具体应用的特性。

小心地选取适当的bound是编写可靠的Tokio应用程序的重要一步。

I/O

Tokio里面的I/O操作和标准库里的很像,只不过全部进行了异步化。我们使用两个trait来实现:AsyncReadAsyncWrite。一些常用的类型都实现了这两种trait,例如TcpStreamFileStdout。一些常用的数据结构也实现了,如Vec<u8>&[u8]

本节将介绍基本的Tokio I/O编程,下一节将介绍更高级的用法。

AsyncRead和AsyncWrite

这两个trait提供了异步读写字节流的功能,其内部方法实际上不会手动调用,而是通过Tokio提供的工具包AsyncReadExtAsyncWriteExt来调用。

我们简单看看这些工具包里的方法,注意所有的方法都是异步的,需要使用await才能实际执行。

async fn read()

AsyncReadExt::read()提供了异步读取数据到一个buffer的能力,返回读取到的字节数。

注意:当read()返回Ok(0)时,说明流已经关闭了,任何后续的读取都会返回0。因此如果读写的是TcpStream,我们可以直接关闭连接了(实际上此时说明TCP连接的对方已经调用close关闭连接,发送了FIN报文)。

use tokio::fs::File;
use tokio::io::{self, AsyncReadExt};

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut f = File::open("foo.txt").await?;
    let mut buffer = [0; 10];

    // read up to 10 bytes
    let n = f.read(&mut buffer[..]).await?;

    println!("The bytes: {:?}", &buffer[..n]);
    Ok(())
}

async fn read_to_end()

AsyncReadExt::read_to_end将从流中读取所有字节,直到遇到EOF。

use tokio::io::{self, AsyncReadExt};
use tokio::fs::File;

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut f = File::open("foo.txt").await?;
    let mut buffer = Vec::new();

    // read the whole file
    f.read_to_end(&mut buffer).await?;
    Ok(())
}

async fn write()

AsyncWriteExt::write将一个buffer里面的所有字节都写到流中(不一定能全部写进去),返回实际写入的字节数。

use tokio::io::{self, AsyncWriteExt};
use tokio::fs::File;

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut file = File::create("foo.txt").await?;

    // Writes some prefix of the byte string, but not necessarily all of it.
    let n = file.write(b"some bytes").await?;

    println!("Wrote the first {} bytes of 'some bytes'.", n);
    Ok(())
}

async fn write_all()

AsyncWriteExt::write_all把整个buffer中的数据全部写入,保证全部写入完成才会返回。

use tokio::io::{self, AsyncWriteExt};
use tokio::fs::File;

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut buffer = File::create("foo.txt").await?;

    buffer.write_all(b"some bytes").await?;
    Ok(())
}

除此之外还有很多其他实用的函数可以查看API文档。

Helper函数

和标准库std一样,tokio::io模块包含很多实用的工具函数,例如tokio::io::copy异步地将一个reader的数据拷贝到一个writer发送出去。

use tokio::fs::File;
use tokio::io;

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut reader: &[u8] = b"hello";
    let mut file = File::create("foo.txt").await?;

    io::copy(&mut reader, &mut file).await?;
    Ok(())
}

当然这利用了一个事实:字节数组&[u8]实现了AsyncRead

Echo服务器

回响服务器是常见的网络编程例子,我们来实现一下。首先需要创建一个TcpListener,接收到来的客户端请求,每个连接的处理都是一致的:从socket中读取数据,然后直接将读取到的数据写回。因此客户端看到的响应就是自己之前发送的数据。

我们将用两种不太一样的策略来实现echo服务器。

使用io::copy实现

我们首先利用前面介绍的io::copy方法来实现,大致的框架先搭好:

use tokio::io;
use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut listener = TcpListener::bind("127.0.0.1:6142").await.unwrap();

    loop {
        let (mut socket, _) = listener.accept().await?;

        tokio::spawn(async move {
            // Copy data here
        });
    }
}

显然我们只需要在空出来的异步块中填写copy的逻辑即可,注意到TcpStream同时实现了AsyncReadAsyncWrite,是不是可以直接作为reader和writer调用copy呢?

// This fails to compile
io::copy(&mut socket, &mut socket).await

很遗憾上面的代码无法编译,因为我们同时可变引用了两次socket变量(&mut),这在Rust中是不允许的。

拆分成reader和writer

如何处理这个问题呢,我们把一个socket拆分成用于reader的句柄和用于writer的句柄。任何同时具有reader和writer属性的类型都可以使用io::aplit拆分,此函数返回reader和writer句柄,而后这两个句柄可以单独用于不同的任务。

例如我们的echo客户端可以如下编写:

use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
#[tokio::main]
async fn main() -> io::Result<()> {
    let socket = TcpStream::connect("127.0.0.1:6142").await?;
    let (mut rd, mut wr) = io::split(socket);
    // Write data in the background
    let write_task = tokio::spawn(async move {
        wr.write_all(b"hello\r\n").await?;
        wr.write_all(b"world\r\n").await?;
        // Sometimes, the rust type inferencer needs
        // a little help
        Ok::<_, io::Error>(())
    });
    let mut buf = vec![0; 128];
    loop {
        let n = rd.read(&mut buf).await?;
        if n == 0 {
            break;
        }
        println!("GOT {:?}", &buf[..n]);
    }
    Ok(())
}

io::split支持任何实现了AsyncWrite+AsyncRead的类型,在内部它使用ArcMutex,这会产生一定的性能损失。对于TcpStream,我们提供了独特的split机制。

TcpStream::split接收一个stream的引用作为参数,然会读写句柄。由于我们使用的是引用,因此得到的两个读写句柄不能再用在别的task上了,只能用于当前任务。这个特殊的split函数是零开销的,内部没有使用ArcMutexTcpStream还提供了into_split方法,其提供了可以传递给其他任务的handle,里面用到了Arc

由于io::copy是在同一task中调用的,我们可以直接使用TcpStream::split来实现拷贝的逻辑:

tokio::spawn(async move {
    let (mut rd, mut wr) = socket.split();
    if io::copy(&mut rd, &mut wr).await.is_err() {
        eprintln!("failed to copy");
    }
});

手动copy

现在我们考虑不用Tokio提供的工具,手动使用readwrite_all来实现echo服务。代码如下:

use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut listener = TcpListener::bind("127.0.0.1:6142").await.unwrap();
    loop {
        let (mut socket, _) = listener.accept().await?;
        tokio::spawn(async move {
            let mut buf = vec![0; 1024];
            loop {
                match socket.read(&mut buf).await {
                    // Return value of `Ok(0)` signifies that the remote has
                    // closed
                    Ok(0) => return,
                    Ok(n) => {
                        // Copy the data back to socket
                        if socket.write_all(&buf[..n]).await.is_err() {
                            // Unexpected socket error. There isn't much we can
                            // do here so just stop processing.
                            return;
                        }
                    }
                    Err(_) => {
                        // Unexpected socket error. There isn't much we can do
                        // here so just stop processing.
                        return;
                    }
                }
            }
        });
    }
}

我们一步步分析一下上面的代码。首先,创建一个大小为1024的buffer。注意我们使用的是Vec,堆上的动态数组,而不是栈上的基础数组[u8]。因为buffer的使用跨越了await,如果用栈数组来表示buffer,理论上也没问题,但由于它是栈上空间,我们每次处理一个请求都会开启新的一个task,每个task具有一个数据结构,看起来可能像这样:

struct Task {
    // internal task fields here
    task: enum {
        AwaitingRead {
            socket: TcpStream,
            buf: [BufferType],
        },
        AwaitingWriteAll {
            socket: TcpStream,
            buf: [BufferType],
        }

    }
}

如果使用栈数组,它将直接存放在task结构体中,这会导致结构体比较大,而且由于buffer的大小通常设为页的整数倍,这导致task结构的大小有点尴尬。因此,我们推荐使用堆上内存去分配buffer。

当TCP数据流关闭后,我们将从read中读到Ok(0),此时必须跳出循环,因为后续的读取都会是这个,不跳出的话程序就一定会陷入死循环。

Framing

我们现在可以应用前面的IO知识实现Mini-Redis的framing层(也叫解码层),指的是将字节流中的数据转化为数据帧的流,一个帧是客户端和服务端数据传输的一个单位。我们使用的Redis通信协议的数据帧如下:

use bytes::Bytes;

enum Frame {
    Simple(String),
    Error(String),
    Integer(u64),
    Bulk(Bytes),
    Null,
    Array(Vec<Frame>),
}

一个HTTP协议的帧可能会长这样:

enum HttpFrame {
    RequestHead {
        method: Method,
        uri: Uri,
        version: Version,
        headers: HeaderMap,
    },
    ResponseHead {
        status: StatusCode,
        version: Version,
        headers: HeaderMap,
    },
    BodyChunk {
        chunk: Bytes,
    },
}

为了实现Mini-Redis,我们实现一个Connection结构体,它包装一个TcpStream并且读写mini_redis::Frame值。

use tokio::net::TcpStream;
use mini_redis::{Frame, Result};

struct Connection {
    stream: TcpStream,
    // ... other fields here
}

impl Connection {
    /// Read a frame from the connection.
    /// 
    /// Returns `None` if EOF is reached
    pub async fn read_frame(&mut self)
        -> Result<Option<Frame>>
    {
        // implementation here
    }

    /// Write a frame to the connection.
    pub async fn write_frame(&mut self, frame: &Frame)
        -> Result<()>
    {
        // implementation here
    }
}

完整的代码在这里

缓冲读

read_frame方法等待接收到一个完整的帧,这是因为一次read读到的数据是不确定的(流式传输的特点),可能有多个帧,也可能只传输了部分帧。因此如果读到了不完整的一个帧,我们将数据存放在缓冲区等待新的数据,如果读到了多个帧,我们一次只返回一个帧,剩下的也放在缓冲区,用于下次read_frame调用。

为了实现这样的想法,Connection需要一个读缓冲区,数据将从socket读到缓冲区,当一个完整的帧解析成功,我们就把对应的数据移除。

我们使用BytesMut作为缓冲区的类型,这是Bytes类型的可变版本。

use bytes::BytesMut;
use tokio::net::TcpStream;

pub struct Connection {
    stream: TcpStream,
    buffer: BytesMut,
}

impl Connection {
    pub fn new(stream: TcpStream) -> Connection {
        Connection {
            stream,
            // Allocate the buffer with 4kb of capacity.
            buffer: BytesMut::with_capacity(4096),
        }
    }
}

然后我们再来实现read_frame

use tokio::io::AsyncReadExt;
use bytes::Buf;
use mini_redis::Result;

pub async fn read_frame(&mut self)
    -> Result<Option<Frame>>
{
    loop {
        // Attempt to parse a frame from the buffered data. If
        // enough data has been buffered, the frame is
        // returned.
        if let Some(frame) = self.parse_frame()? {
            return Ok(Some(frame));
        }

        // There is not enough buffered data to read a frame.
        // Attempt to read more data from the socket.
        //
        // On success, the number of bytes is returned. `0`
        // indicates "end of stream".
        if 0 == self.stream.read_buf(&mut self.buffer).await? {
            // The remote closed the connection. For this to be
            // a clean shutdown, there should be no data in the
            // read buffer. If there is, this means that the
            // peer closed the socket while sending a frame.
            if self.buffer.is_empty() {
                return Ok(None);
            } else {
                return Err("connection reset by peer".into());
            }
        }
    }
}

我们分析一下,函数的主体在一个循环中,我们首先尝试从现有的缓冲区中解析出一个帧,如果解析成功,我们就可以将帧返回,如果解析不出,说明缓冲区内没有足够的数据。此时我们尝试把socket中接收到的数据读到buffer中,如果读到的数据量为0,要么帧已经完整解析完,socket里面没有数据了,这是正常的连接关闭,如果帧没有解析完,但是对方已经关闭连接,则属于不正常关闭,需要返回Err。

Buf trait

注意到我们这次从Tcp流中读取数据时用的方法是read_buf,此方法接收的参数必须实现bytes包中的BufMut

如果我们用Vec<u8>来实现,则可以直接使用前面用到的read()方法,此时需要一个cursor来表示当前缓存的数据的尾部,Connection的定义需要修改为如下:

use tokio::net::TcpStream;

pub struct Connection {
    stream: TcpStream,
    buffer: Vec<u8>,
    cursor: usize,
}

impl Connection {
    pub fn new(stream: TcpStream) -> Connection {
        Connection {
            stream,
            // Allocate the buffer with 4kb of capacity.
            buffer: vec![0; 4096],
            cursor: 0,
        }
    }
}

此时read_frame函数也要修改:

use mini_redis::{Frame, Result};

pub async fn read_frame(&mut self)
    -> Result<Option<Frame>>
{
    loop {
        if let Some(frame) = self.parse_frame()? {
            return Ok(Some(frame));
        }
        // Ensure the buffer has capacity
        if self.buffer.len() == self.cursor {
            // Grow the buffer
            self.buffer.resize(self.cursor * 2, 0);
        }
        // Read into the buffer, tracking the number
        // of bytes read
        let n = self.stream.read(
            &mut self.buffer[self.cursor..]).await?;
        if 0 == n {
            if self.cursor == 0 {
                return Ok(None);
            } else {
                return Err("connection reset by peer".into());
            }
        } else {
            // Update our cursor
            self.cursor += n;
        }
    }
}

可以看到我们必须要使用cursor来保证新读取的数据不会覆盖还未处理完成的数据,同时如果缓冲区的容量用完了,我们必须手动把缓冲区扩容,在parse_frame中,我们需要处理的部分是self.buffer[..self.cursor]

因为这样的字节数组的处理非常常见,bytes包提供了高级的抽象:BufBufMutBuf指的是可读的数据,BufMut指可写数据类型。当我们将实现了BufMut的实例传入read_buf时,read_buf会帮你处理内部的cursor,这使得外部代码更简洁。

此外,当我们使用Vec<u8>时,buffer必须要初始化,上面的代码就初始化成4KB的大小,每个位置都是0。当我们扩容的时候,新的内容也是初始化为零。初始化过程是非必须的,因为反正后面读的时候也是要覆盖掉原来的数据的。我们的BytesMut 的实现保证了数据不会初始化,省去了这一开销,同时它也在API层面禁止用户访问未初始化的数据。

Parsing

现在我们看如何实现parse_frame函数,解析帧需要两步:

  1. 读完整的一个帧,定位到尾部。
  2. 解析这个帧。

mini_redis包提供了两个函数处理这个事:Frame::checkFrame::parse

同时我们也会用到BufBuf类型的数据将被传入check,check会检查一个帧,同时移动内部的cursor,当其返回时,内部的cursor就会指向帧的尾部。std::io::Cursor就实现了Buf

use mini_redis::{Frame, Result};
use mini_redis::frame::Error::Incomplete;
use bytes::Buf;
use std::io::Cursor;

fn parse_frame(&mut self)
    -> Result<Option<Frame>>
{
    // Create the `T: Buf` type.
    let mut buf = Cursor::new(&self.buffer[..]);
    // Check whether a full frame is available
    match Frame::check(&mut buf) {
        Ok(_) => {
            // Get the byte length of the frame
            let len = buf.position() as usize;
            // Reset the internal cursor for the
            // call to `parse`.
            buf.set_position(0);
            // Parse the frame
            let frame = Frame::parse(&mut buf)?;
            // Discard the frame from the buffer
            self.buffer.advance(len);
            // Return the frame to the caller.
            Ok(Some(frame))
        }
        // Not enough data has been buffered
        Err(Incomplete) => Ok(None),
        // An error was encountered
        Err(e) => Err(e.into()),
    }
}

check函数的实现在这里。我们不会完整讲解这个函数。

注意到,Buf的API是字节迭代器风格的,例如我们需要查看第一个字节来判断帧的类型,用到的方法是Buf::get_u8,它读取当前cursor位置的1个字节,并将cursor后移1位。还有很多好用的方法,请阅读API文档

缓冲写

framing的另一个API是write_frame(frame)