本文是阅读新版tokio文档,顺便翻译之作,啥时候读完啥时候写完咯。
Intruduction
本教程将一步步带你构建一个简化的Redis客户端和服务器。我们首先介绍Rust中异步编程的基本概念,而后实现Redis命令的一个子集。
Mini-Redis
安装完成Rust以后,请安装Mini-Redis服务器程序,这可以用来测试我们的程序:
cargo install mini-redis
你可以键入如下命令开始程序:
mini-redis-server
而后你可以另起一个命令行,用自带的客户端程序尝试获取键foo
:
mini-redis-cli get foo
你将看到输出为nil
。
Hello Tokio
引入
我们首先编写非常基础的Tokio应用,它将连接到我们的Mini-Redis服务器,插入键值对<hello, world>
,而后我们用cli
包来获取它,以验证正确性。
首先我们用carge创建新的Rust应用目录:
cargo new my-redis
cd my-redis
而后编辑Cargo.toml
,添加依赖:
tokio = { version = "0.3", features = ["full"] }
mini-redis = "0.3"
现在我们可以编辑main.rs
,代码如下:
use mini_redis::{client, Result};
#[tokio::main]
pub async fn main() -> Result<()> {
// Open a connection to the mini-redis address.
let mut client = client::connect("127.0.0.1:6379").await?;
// Set the key "hello" with value "world"
client.set("hello", "world".into()).await?;
// Get key "hello"
let result = client.get("hello").await?;
println!("got value from the server; result={:?}", result);
Ok(())
}
现在请在另一个终端运行server,然后运行main,你将得到:
cargo run
got value from the server; result=Some(b"world")
分析
我们来看看我们刚刚的程序做了什么,实际上没有几行代码,但是确实发生了很多事情。首先,client::connect
函数由mini-redis包提供,它异步地创建了一个TCP连接。一旦连接建立完成,一个client
句柄将被返回。即使整个过程是异步的,但是我们看到的代码似乎是同步的,唯一的暗示在于我们用到的await
操作符。
什么是异步编程
大多是程序的运行顺序和编写顺序是一致的,首先执行第一行,然后第二行,以此类推。在同步编程中,如果程序遇到一条无法立即返回的指令,它将阻塞在这里,直到指令完成。例如,创建一个TCP连接需要三次握手,这可能花费很长时间,在同步编程中,当前线程将会阻塞。
在异步编程中,无法立刻返回的指令将被暂停挂在后台,而当前线程可以继续执行其他操作。一旦该指令完成,任务将重新在暂停的地方开始被恢复执行。我们的例子中只有一个任务,所以在挂起的时候程序没干别的事情,但是通常异步程序会拥有很多异步任务需要调度。
尽管异步编程可以使程序运行更高效,但它也使得程序逻辑变得复杂。程序员需要考虑所有必要的状态来恢复暂停的任务。历史上,这是一件很冗杂无聊和易出错的事情。
编译时绿色线程
Rust使用async/await
特性来实现异步编程。所有包含异步操作的函数使用async
来标记,在我们上面的例子中,connect
函数的定义如下:
use mini_redis::Result;
use mini_redis::client::Client;
use tokio::net::ToSocketAddrs;
pub async fn connect<T: ToSocketAddrs>(addr: T) -> Result<Client> {
// ...
}
这里的async fn
的定义看起来和普通的同步函数一样,但是执行的时候是异步的。Rust在编译时就将异步函数转换成一个异步的任务。在异步函数内调用await
时,调度器会把当前任务挂起,转而在此线程上执行其他任务,此异步任务将在别处处理(操作系统接受网络数据,或者读取文件等等)。
其他一些语言也实现了
async/await
特性,但是Rust的实现方式是独特的。Rust的async操作是lazy的,这使得其运行时语义不同于其他语言。
如果目前为止的内容你还难以理解,别担心,我们后续会有更详细的解释。
使用async/await
异步函数的调用和普通函数无异。然而,单单调用并不会执行这些函数的函数体。调用一个async函数仅仅是返回一个代表这一异步操作的值,这在概念上类似于一个无参数的闭包。我们必须显式调用.await
操作符才能获得函数真正的返回值。看下面的代码:
async fn say_world() {
println!("world");
}
#[tokio::main]
async fn main() {
// Calling `say_world()` does not execute the body of `say_world()`.
let op = say_world();
// This println! comes first
println!("hello");
// Calling `.await` on `op` starts executing `say_world`.
op.await;
}
这段代码输出:
hello
world
实际上,异步函数的返回值是一个实现了Future
的匿名类型值。
异步的main函数
上面的程序中的main函数和通常的Rust应用不同,它是异步的,并且使用#[tokio::main]
作了注解。当我们在函数中用到异步操作符的时候,该函数需要定义为async的。而async函数必须要用一个运行时去执行。该运行时需要包含异步任务调度器,并且提供内部的定时器,事件驱动型的IO框架等。这一运行时也需要main函数来启动。
#[tokio::main]
是一个宏,它将async fn main()
转化为一个同步的main()
函数,并在其中初始化运行时并执行异步的main函数。例如,下面一段程序:
#[tokio::main]
async fn main() {
println!("hello");
}
将被转化成:
fn main() {
let mut rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
println!("hello");
})
}
Tokio运行时的细节将在后面谈到。
Cargo features
当用到tokio的时候,我们设置了features为full:
tokio = { version = "0.3", features = ["full"] }
Tokio本身有很多功能(TCP, UDP, Unix sockets, timers, sync utilities, multple scheduler types, etc)。并不是所有应用都会用到全部的功能。当我们尝试优化应用的编译时长或最终生成的软件大小时,可以砍掉一些没有用到的feature。
但是目前为止,我们直接使用full就好了。
Spawning
我们现在真正开始实现我们的Redis服务。首先我们把上一节用到的代码移到example包里,我们可以用它来测试我们自己的Redis服务器。
mkdir -p examples
mv src/main.rs examples/hello-redis.rs
然后我们新建一个空的src/main.rs
文件。
接受新的socket连接
Redis服务器第一件要干的事情就是监听固定端口上的TCP连接请求,这可以用tokio::net::TcpListener
完成。
许多Tokio的内部类型都和Rust标准库里的同步等价物命名相同。Tokio合理地将标准库里地一些API以异步的方式重新暴露给用户使用。
一个TcpListener绑定带6379号端口,而后我们在一个循环中接收连接,每一个连接都被处理一次,然后关闭。目前,我们只读取接受到的命令然后输出到stdout,然后向客户端响应一个error。
use tokio::net::{TcpListener, TcpStream};
use mini_redis::{Connection, Frame};
#[tokio::main]
async fn main() {
// Bind the listener to the address
let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
loop {
// The second item contains the IP and port of the new connection.
let (socket, _) = listener.accept().await.unwrap();
process(socket).await;
}
}
async fn process(socket: TcpStream) {
// The `Connection` lets us read/write redis **frames** instead of
// byte streams. The `Connection` type is defined by mini-redis.
let mut connection = Connection::new(socket);
if let Some(frame) = connection.read_frame().await.unwrap() {
println!("GOT: {:?}", frame);
// Respond with an error
let response = Frame::Error("unimplemented".to_string());
connection.write_frame(&response).await.unwrap();
}
}
现在我们运行一下:
cargo run
在另一个终端我们运行之前的example的hello-redis:
cargo run --example hello-redis
输出应该是:
Error: "unimplemented"
在服务器的终端上应该显示如下:
GOT: Array([Bulk(b"set"), Bulk(b"hello"), Bulk(b"world")])
并发执行
除了只返回error以外,上面的程序有一个小问题,它只能一次处理一个连接。当一个连接被接受以后,服务器程序将执行process程序,处理完成后才开始接受新的请求。
我们希望我们的Redis服务器能够同时处理大量请求,这需要一些并发。
并发和并行不是一个东西。如果你在两个任务之间来回切换,你是在并发执行两个任务,但并非并行。并行指的是你必须有两个不同的个体各自同时执行一个任务。在计算机中,达到并行必须使用多个线程在多核处理器上运行。
使用Tokio的好处在于异步编程能够让许多任务并发执行,而不需要手动开启多个原生线程。事实上,Tokio调度器可以让许多任务在单线程上并发执行。
为了并发地处理连接请求,我们需要在每一次连接到来的时候发布(spawn)一个新的任务,此任务包含连接的处理逻辑,我们把main函数改成这样:
use tokio::net::TcpListener;
#[tokio::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
loop {
let (socket, _) = listener.accept().await.unwrap();
// A new task is spawned for each inbound socket. The socket is
// moved to the new task and processed there.
tokio::spawn(async move {
process(socket).await;
});
}
}
任务
一个Tokio任务就是一个异步的绿色线程(或者说协程也不为过)。它们通过一个async块来创建,然后传入tokio::spawn
函数来发布,该函数返回一个JoinHandle
,调用者可以通过这一句柄和该任务交互。该async块有时会有返回值,可以通过在JoinHandle
上调用.await
来获得它。例如:
#[tokio::main]
async fn main() {
let handle = tokio::spawn(async {
// Do some async work
"return value"
});
// Do some other work
let out = handle.await.unwrap();
println!("GOT {}", out);
}
在该句柄上await将得到一个Result
,如果任务执行过程中产生错误,会返回Err
,这通常发生在任务发生panic,或者运行时异常宕机导致任务被取消的情况下。
Tokio任务是Tokio调度器管理的执行单元。当我们将任务发布后,该任务被提交给调度器,由调度器负责其执行。每一个任务可能就在当前线程执行,也可能被调度到不同的线程,也有可能在不同线程间切换。
Tokio中的任务是非常轻量级的,实际上创建它们仅消耗一次内存分配以及64字节的内存。你可以随意发布任务,当然别弄个几百万个就好。
‘static限制
当你发布一个任务时,它的类型必须是静态`static的,这意味着任务不能包含任何任务外部数据的引用。
‘static并不总是意味着永生,一个值是’static的并不意味着内存泄漏:你可以在Common Rust Lifetime Misconceptions上阅读更多。
例如,下述代码无法编译通过:
use tokio::task;
#[tokio::main]
async fn main() {
let v = vec![1, 2, 3];
task::spawn(async {
println!("Here's a vec: {:?}", v);
});
}
如果你尝试编译,将会出现如下的报错:
error[E0373]: async block may outlive the current function, but
it borrows `v`, which is owned by the current function
--> src/main.rs:7:23
|
7 | task::spawn(async {
| _______________________^
8 | | println!("Here's a vec: {:?}", v);
| | - `v` is borrowed here
9 | | });
| |_____^ may outlive borrowed value `v`
|
note: function requires argument type to outlive `'static`
--> src/main.rs:7:17
|
7 | task::spawn(async {
| _________________^
8 | | println!("Here's a vector: {:?}", v);
9 | | });
| |_____^
help: to force the async block to take ownership of `v` (and any other
referenced variables), use the `move` keyword
|
7 | task::spawn(async move {
8 | println!("Here's a vec: {:?}", v);
9 | });
|
这是因为,默认情况下,变量不会被移动到async块内,这里的v
的所有权仍然归main函数,而println!
借用了v
。编译器解释了这一错误,并建议使用move
关键字把v
移动进去。修改后,任务就拥有了所有内部数据的所有权,成为’static的了。如果你一定要让多个任务共享一个变量,则它必须用一些同步原语来包装,如线程安全的多重所有权指针Arc
。
注意到错误信息提示:参数类型需要比’static活得长。这听起来有点令人困惑,因为’static变量的生命期等同整个程序,如果它比’static活得长,岂不是内存泄漏了吗?解释如下:是类型需要活得长,而不是变量本身,变量在类型无效前就可以被销毁。
当我们说一个值是’static的时候,它仅仅意味着:任意传递这个变量都不会出错。这很重要,因为编译器无法推断一个新发布的任务会存活多长时间,所以唯一的方法就是假定它可能会永生。
Send限制
被发布的任务必须实现了Send
,这允许Tokio运行时将任务在不同线程间切换。当所有生命期跨过.await
语句的变量是Send
的时候,该任务是Send
的。这有点微妙。当.await
被调用时,任务挂起,然后调度器调度其他任务。下次这个任务被唤醒时,将在挂起的时候重新恢复执行,也就是说在.await
之后用到的所有变量都会被保存。而恢复执行的线程可能不同于原来的线程,因此这些变量需要是Send
的。
例如,下面这段代码是正确的,因为变量rc
提前释放了,它没有跨越await
:
use tokio::task::yield_now;
use std::rc::Rc;
#[tokio::main]
async fn main() {
tokio::spawn(async {
// The scope forces `rc` to drop before `.await`.
{
let rc = Rc::new("hello");
println!("{}", rc);
}
// `rc` is no longer used. It is **not** persisted when
// the task yields to the scheduler
yield_now().await;
});
}
而下面这段代码无法编译通过:
use tokio::task::yield_now;
use std::rc::Rc;
#[tokio::main]
async fn main() {
tokio::spawn(async {
let rc = Rc::new("hello");
// `rc` is used after `.await`. It must be persisted to
// the task's state.
yield_now().await;
println!("{}", rc);
});
}
编译错误如下:
error: future cannot be sent between threads safely
--> src/main.rs:6:5
|
6 | tokio::spawn(async {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
::: [..]spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ---- required by this bound in
| `tokio::task::spawn::spawn`
|
= help: within `impl std::future::Future`, the trait
| `std::marker::Send` is not implemented for
| `std::rc::Rc<&str>`
note: future is not `Send` as this value is used across an await
--> src/main.rs:10:9
|
7 | let rc = Rc::new("hello");
| -- has type `std::rc::Rc<&str>` which is not `Send`
...
10 | yield_now().await;
| ^^^^^^^^^^^^^^^^^ await occurs here, with `rc` maybe
| used later
11 | println!("{}", rc);
12 | });
| - `rc` is later dropped here
我们将在下一节深入讨论这个错误的一个特殊情况。
保存数据
我们现在实现新的process
函数,来处理新到达的请求。我们使用HashMap
来保存数据。SET
命令把数据存到哈希表中,而GET
命令读取它们。同时我们循环读取一次连接中的多个命令。
use tokio::net::TcpStream;
use mini_redis::{Connection, Frame};
async fn process(socket: TcpStream) {
use mini_redis::Command::{self, Get, Set};
use std::collections::HashMap;
// A hashmap is used to store data
let mut db = HashMap::new();
// Connection, provided by `mini-redis`, handles parsing frames from
// the socket
let mut connection = Connection::new(socket);
// Use `read_frame` to receive a command from the connection.
while let Some(frame) = connection.read_frame().await.unwrap() {
let response = match Command::from_frame(frame).unwrap() {
Set(cmd) => {
// The value is stored as `Vec<u8>`
db.insert(cmd.key().to_string(), cmd.value().to_vec());
Frame::Simple("OK".to_string())
}
Get(cmd) => {
if let Some(value) = db.get(cmd.key()) {
// `Frame::Bulk` expects data to be of type `Bytes`. This
// type will be covered later in the tutorial. For now,
// `&Vec<u8>` is converted to `Bytes` using `into()`.
Frame::Bulk(value.clone().into())
} else {
Frame::Null
}
}
cmd => panic!("unimplemented {:?}", cmd),
};
// Write the response to the client
connection.write_frame(&response).await.unwrap();
}
}
现在我们运行这个server:
cargo run
在另一个终端运行hello-redis
程序:
cargo run --example hello-redis
我们可以看到输出为:
got value from the server; success=Some(b"world")
我们现在可以插入和读取数据了,但是这里有个问题:数据并没有在连接之间共享,如果另一个客户端想要读取相同的键值,它什么都读不到。在下一节我们来看看如何把数据跨连接保存并共享。
共享状态
策略
有很多策略都能够在Tokio中共享状态:
- 用互斥锁来保护状态变量。
- 发布一个专门用于管理状态的任务,使用消息传递模式来操作状态。
一般来说第一种方法用来管理简单的数据,第二种用于需要异步操作的工作,如IO操作。在本节,我们需要共享的状态是一个哈希表,对应的操作是插入和查询(insert和get),这些操作都是同步的,因此我们使用互斥锁即可。第二种方法将在下一节讲述。
添加bytes包
我们的Mini-Redis使用Bytes
数据结构,而不是Vec<u8>
来表示字节流,前者来自bytes
包。Bytes
的设计目标是为网络编程提供健壮的字节数组。它与Vec<u8>
的最大区别是浅拷贝。也就是说,当我们在Bytes
实例上调用clone()
方法时,并不会拷贝内部的数据,实际上它仅仅增加引用计数。在实现上,你可以认为近似于Arc<Vec<u8>>
,只不过提供了更丰富的封装。
我们在Cargo.toml里添加:
bytes = "0.6"
初始化HashMap
我们的哈希表会在许多任务间共享,因此用Arc<Mutex<_>>
来封装即可。
首先我们为这一类型起个别名:
use bytes::Bytes;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
type Db = Arc<Mutex<HashMap<String, Bytes>>>;
而后我们修改main函数进行初始化,将智能指针Arc
传递给process
函数,Arc
是线程安全的引用计数指针。
use tokio::net::TcpListener;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
#[tokio::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
println!("Listening");
let db = Arc::new(Mutex::new(HashMap::new()));
loop {
let (socket, _) = listener.accept().await.unwrap();
// Clone the handle to the hash map.
let db = db.clone();
println!("Accepted");
tokio::spawn(async move {
process(socket, db).await;
});
}
}
std::sync::Mutex
注意我们使用的是std::sync::Mutex
而不是tokio::sync::Mutex
。在同步的代码中使用tokio::sync::Mutex
是错误的。异步的互斥锁是指其锁的范围跨越了.await
操作符。
一个同步的互斥锁会使当前线程阻塞,等待获取锁,因此会使得当前线程无法再同时处理别的任务。使用tokio::sync::Mutex
也并不会改变这种情况,因为异步的互斥锁在内部也是使用了原始的同步互斥锁。
通常的经验是,在异步代码块中使用同步互斥锁没啥问题,只要临界区很小并且锁不跨越.await
。此外,你可以考虑使用parking_lot::Mutex
,它比标准库的互斥锁更快一些。
更新process函数
process函数不再需要初始化哈希表了,它接收其实例的一个智能指针作为参数。在插入数据时,我们需要先加锁。
use tokio::net::TcpStream;
use mini_redis::{Connection, Frame};
async fn process(socket: TcpStream, db: Db) {
use mini_redis::Command::{self, Get, Set};
// Connection, provided by `mini-redis`, handles parsing frames from
// the socket
let mut connection = Connection::new(socket);
while let Some(frame) = connection.read_frame().await.unwrap() {
let response = match Command::from_frame(frame).unwrap() {
Set(cmd) => {
let mut db = db.lock().unwrap();
db.insert(cmd.key().to_string(), cmd.value().clone());
Frame::Simple("OK".to_string())
}
Get(cmd) => {
let db = db.lock().unwrap();
if let Some(value) = db.get(cmd.key()) {
Frame::Bulk(value.clone())
} else {
Frame::Null
}
}
cmd => panic!("unimplemented {:?}", cmd),
};
// Write the response to the client
connection.write_frame(&response).await.unwrap();
}
}
任务,线程以及竞争
使用阻塞的互斥锁来保护短的临界区是很好的策略,尤其是竞争较小的情况。当锁被争用的时候,线程必须阻塞并且等待锁释放,这同时也导致当前线程被分配的其他任务也被阻塞。
默认情况下,Tokio运行时使用多线程的调度器。任务将在多个线程组成的线程池中调度执行。如果大量任务被发布并执行在不同线程中,且都需要获得哈希表的锁,那么就产生了竞争。换句话说,如果配置了current_thread
,锁就不会被竞争。
current_thread
运行时是一个轻量级,单线程的运行时。当我们发布的任务不是很多,且连接数也不多的情况下,单线程是很好的思路。
如果锁的争用很剧烈,影响程序的性能,可以考虑下面的一些选项:
- 使用专门的Task来管理状态,用消息传递模式。
- 把锁分片。
- 重构代码,不用互斥锁。
在我们的例子中,因为哈希表中每个key都是独立的,锁分片是很好的策略,我们不使用单独的Mutex<HashMap<_, _>>
,而是使用N
个哈希表实例:
type ShardedDb = Arc<Vec<Mutex<HashMap<String, Vec<u8>>>>>;
而后,寻找相应key
所在的位置分两步:首先,利用哈希值确定key所在的哈希表,然后再查该哈希表。
let shard = db[hash(key) % db.len()].lock().unwrap();
shard.insert(key, value);
dashmap包基于这一思想实现了分片的哈希表,提供更好的并发性能。
跨.await的锁
你可能会写出如下的代码:
use std::sync::Mutex;
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
let mut lock = mutex.lock().unwrap();
*lock += 1;
do_something_async().await;
} // lock goes out of scope here
当你在Tokio任务中调用这一函数时,编译会报错:
error: future cannot be sent between threads safely
--> src/lib.rs:13:5
|
13 | tokio::spawn(async move {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.21/src/task/spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ---- required by this bound in `tokio::task::spawn::spawn`
|
= help: within `impl std::future::Future`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, i32>`
note: future is not `Send` as this value is used across an await
--> src/lib.rs:7:5
|
4 | let mut lock = mutex.lock().unwrap();
| -------- has type `std::sync::MutexGuard<'_, i32>` which is not `Send`
...
7 | do_something_async().await;
| ^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here, with `mut lock` maybe used later
8 | }
| - `mut lock` is later dropped here
错误原因是,std::sync:MutexGuard
不是Send
的。这意味着你不能让一个互斥锁跨线程传递。前面说过,Tokio调度器可能会让一个任务在多个线程上执行。为了解决这个问题,你必须要在.await
之前释放锁:
// This works!
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
{
let mut lock = mutex.lock().unwrap();
*lock += 1;
} // lock goes out of scope here
do_something_async().await;
}
注意,虽然看起来很对,但是下面这段代码无法通过编译:
use std::sync::Mutex;
// This fails too.
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
let mut lock = mutex.lock().unwrap();
*lock += 1;
drop(lock);
do_something_async().await;
}
这是因为编译器目前仅根据作用域来判断一个future
是否是Send
的。将来可能会支持显式的drop
,不过现在还是老老实实用大括号吧。
不要想着能不能在不要求Send
的情况下发布任务来躲避这一问题(单线程调度器)。因为如果Tokio在await时挂起了你的任务,但锁还未释放,另一个任务可能会在同一线程执行,并请求同一个锁。此时就产生了死锁,因为当前线程阻塞,但是锁永远无法释放。
其他办法
在await前通过作用域释放锁是一个办法,实际上还有更鲁棒的写法。例如,你可以把互斥锁包装在一个结构体中,然后通过一个方法来执行锁的逻辑。
use std::sync::Mutex;
struct CanIncrement {
mutex: Mutex<i32>,
}
impl CanIncrement {
// This function is not marked async.
fn increment(&self) {
let mut lock = self.mutex.lock().unwrap();
*lock += 1;
}
}
async fn increment_and_do_stuff(can_incr: &CanIncrement) {
can_incr.increment();
do_something_async().await;
}
这一编程模式保证了上面的Send
错误永远不会出现,因为锁根本就没有出现在异步块中。
另一个办法就是使用消息传递策略,我们将在下一节提到。
Tokio的异步锁
我们前面提到的tokio::sync::Mutex
终于派上用场了。它最主要的特性就是能够跨越.await
,而不产生问题。当然这也意味着异步锁的性能损失更大,如果非必要,请用上面的两种策略,而不是用异步锁:
use tokio::sync::Mutex; // note! This uses the Tokio mutex
// This compiles!
// (but restructuring the code would be better in this case)
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
let mut lock = mutex.lock().await;
*lock += 1;
do_something_async().await;
} // lock goes out of scope here
Channels
现在我们已经学到了一些Tokio的异步知识了,本节我们来写写客户端。首先我们尝试发布Tokio任务来发送命令:
use mini_redis::client;
#[tokio::main]
async fn main() {
// Establish a connection to the server
let mut client = client::connect("127.0.0.1:6379").await.unwrap();
// Spawn two tasks, one gets a key, the other sets a key
let t1 = tokio::spawn(async {
let res = client.get("hello").await;
});
let t2 = tokio::spawn(async {
client.set("foo", "bar".into()).await;
});
t1.await.unwrap();
t2.await.unwrap();
}
上面这段代码并不能通过编译,因为两个任务都需要用到client变量,但是Client
并没有实现Copy
,因此这一共享是错误的。当然我们可以为每条命令都新建一个连接,但显然这很慢。
消息传递
答案是使用消息传递模型,即我们使用专门的任务来管理client
连接。任何想要发送新的命令的任务都把指令作为一条消息传递给该client任务,client任务负责把命令发送给服务端,然后再把消息传回原任务。
使用这一策略,我们只需要维护一个与服务端的连接即可。client任务能够很好地处理收到的命令,同时channel本身也起到缓冲的作用。此模型很好地提高吞吐量,同时也可以扩展成连接池模型。
Tokio的channel原语
Tokio提供了许多类型的channel,每一个都提供不同的功能和设计目标:
- mpsc:多生产者,单消费者。可以一次传递多个值。
- oneshot:单生产者,单消费者。一次只能传递一个值。
- broadcast:多生产者,多消费者。一次传递多值,值能够被每个接收器看见。
- watch:单生产者,多消费者。一次传递多值,不过不记录消息历史。接收器只能看见最近的值。
如果你需要多生产者,多消费者的channel,并且只有一个接收器能看见全部值,你可以使用async-channel包。在同步编程中也有一些channel,如std::sync::mpsc和crossbeam::channel,这些channel在等待数据的时候会阻塞当前线程,而在Tokio异步编程中,当前线程不会阻塞。
在本节,我们会使用mpsc
和oneshot
,另外几种channel将在后续讲解。
定义消息类型
在大多数情况下,当我们使用消息传递时,接收消息的任务常常需要响应多个累积的消息。在我们的例子中,该任务需要处理GET
和SET
消息,我们首先定义Command
类型:
use bytes::Bytes;
#[derive(Debug)]
enum Command {
Get {
key: String,
},
Set {
key: String,
val: Bytes,
}
}
创建channel
在main函数中,我们以如下方式创建mpsc
通道:
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
// Create a new channel with a capacity of at most 32.
let (mut tx, mut rx) = mpsc::channel(32);
// ... Rest comes here
}
这个channel是为了向管理连接的任务发送命令的,多生产者属性使得我们可以在多个任务中发送消息。tx和rx分别是channel的发送器和接收器句柄,它们可以被分配给不同的任务。
当前channel以容量32创建,如果消息的发送速度大于接受速度,则通道会保存未读取的数据。一旦通道中保存的数据超过32,调用send(...).await
会导致任务挂起,直到channel有多余空间。
从多个任务发送数据可以通过克隆Sender来实现,例如:
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (mut tx, mut rx) = mpsc::channel(32);
let mut tx2 = tx.clone();
tokio::spawn(async move {
tx.send("sending from first handle").await;
});
tokio::spawn(async move {
tx2.send("sending from second handle").await;
});
while let Some(message) = rx.recv().await {
println!("GOT = {}", message);
}
}
在上面的代码中,两个数据都被发送到了Reciever,注意mpsc的接收器是无法复制的。
当每个Sender实例都结束生命期后,这就意味着我们无法再向对应的接收器发送任何数据。此时,如果对rx调用recv会得到None,这代表着所有Sender都已经释放,因此channel关闭。
在mini-redis项目中,管理客户端连接的任务将在channel关闭的同时关闭对应的TCP连接,因为不会再有新的命令了,此redis连接作废。
创建manager
现在我们创建一个新的任务,用以管理channel的数据接收和发送。首先,我们建立一个新的与Redis服务的TCP连接,之后我们把从channel中接收到的数据转发到Redis即可。
use mini_redis::client;
// The `move` keyword is used to **move** ownership of `rx` into the task.
let manager = tokio::spawn(async move {
// Establish a connection to the server
let mut client = client::connect("127.0.0.1:6379").await.unwrap();
// Start receiving messages
while let Some(cmd) = rx.recv().await {
use Command::*;
match cmd {
Get { key } => {
client.get(&key).await;
}
Set { key, val } => {
client.set(&key, val).await;
}
}
}
});
现在我们把上一小节的代码修改一下,把数据发送给manager,而不是Redis服务器:
// The `Sender` handles are moved into the tasks. As there are two
// tasks, we need a second `Sender`.
let mut tx2 = tx.clone();
// Spawn two tasks, one gets a key, the other sets a key
let t1 = tokio::spawn(async move {
let cmd = Command::Get {
key: "hello".to_string(),
};
tx.send(cmd).await.unwrap();
});
let t2 = tokio::spawn(async move {
let cmd = Command::Set {
key: "foo".to_string(),
val: "bar".into(),
};
tx2.send(cmd).await.unwrap();
});
在main函数的结尾我们await三个任务的JoinHandle
,以保证它们都执行完成:
t1.await.unwrap();
t2.await.unwrap();
manager.await.unwrap();
接收响应
最后一步,我们还要把Redis服务器返回的响应内容转发到发送命令的task。GET
命令需要获得具体的值,而SET
命令需要返回操作结果(失败or成功)。
为了把response传回去,我们使用前面介绍的oneshot
通道。它是单生产者,单消费者,并且为单值的消息传递作了优化。在我们的例子中,单值指的是Response。
类似于mpsc
,oneshot::channel
返回发送器和接收器句柄:
use tokio::sync::oneshot;
let (tx, rx) = oneshot::channel();
不过和mpsc不同,我们不需要设置channel的大小,并且任何一个句柄都是不可复制的。
为了从manager协程接收到响应,在发送命令之前,我们需要先创建一个oneshot通道,然后把发送器一起传给manager即可。为此我们更新一下之前定义的Command
类型:
use tokio::sync::oneshot;
use bytes::Bytes;
/// Multiple different commands are multiplexed over a single channel.
#[derive(Debug)]
enum Command {
Get {
key: String,
resp: Responder<Option<Bytes>>,
},
Set {
key: String,
val: Vec<u8>,
resp: Responder<()>,
},
}
/// Provided by the requester and used by the manager task to send
/// the command response back to the requester.
type Responder<T> = oneshot::Sender<mini_redis::Result<T>>;
现在我们更新之前的两个发送任务:
let t1 = tokio::spawn(async move {
let (resp_tx, resp_rx) = oneshot::channel();
let cmd = Command::Get {
key: "hello".to_string(),
resp: resp_tx,
};
// Send the GET request
tx.send(cmd).await.unwrap();
// Await the response
let res = resp_rx.await;
println!("GOT = {:?}", res);
});
let t2 = tokio::spawn(async move {
let (resp_tx, resp_rx) = oneshot::channel();
let cmd = Command::Set {
key: "foo".to_string(),
val: b"bar".to_vec(),
resp: resp_tx,
};
// Send the SET request
tx2.send(cmd).await.unwrap();
// Await the response
let res = resp_rx.await;
println!("GOT = {:?}", res);
});
最后再修改一下manager任务,让它把接收到的响应通过Sender发送回去:
while let Some(cmd) = rx.recv().await {
match cmd {
Command::Get { key, resp } => {
let res = client.get(&key).await;
// Ignore errors
let _ = resp.send(res);
}
Command::Set { key, val, resp } => {
let res = client.set(&key, val.into()).await;
// Ignore errors
let _ = resp.send(res);
}
}
}
注意,调用oneshot通道的send是一个同步方法,不需要await。这是因为oneshot的send会立刻知道是否发送成功。当接收器实例已经释放,则send会返回Err
,在上面的代码中,我们忽略这一错误,因为这是允许的情况。
背压以及有界channel
当并发和队列同时出现,我们需要保证队列是有界的,并且系统能够优雅地处理负载。无界地队列最终有可能会占满整个内存,导致系统出现不可预知的错误。
Tokio防止隐式的队列出现。很大一部分原因是异步操作是惰性的,例如下面的代码:
loop {
async_op();
}
如果异步操作是立刻执行的,那么上面的循环会不断地创建新的异步操作,而不会等待上一个操作执行完成,这就产生了隐式的无界队列。基于回调的系统,以及基于eager future的系统特别容易出现这样的错误。
然而,异步Rust和Tokio很好地规避了这一问题,上面的代码在Rust中不会直接执行,async_op本身只是一个Future变量,只有调用await才能让它真正执行,否则就只是一个单纯的死循环,不会占用更多内存。
并发和队列必须显式调用,比如通过如下形式:
tokio::spawn
select!
join!
mpsc::channel
当我们做这些调用的时候,请保证所有的并发任务数目是有界的。例如,当编写异步接收TCP请求的循环时,请保证socket的数量是有界的;当使用mpsc通道时,设置一个适当的channel容量,容量大小取决于具体应用的特性。
小心地选取适当的bound是编写可靠的Tokio应用程序的重要一步。
I/O
Tokio里面的I/O操作和标准库里的很像,只不过全部进行了异步化。我们使用两个trait来实现:AsyncRead
和AsyncWrite
。一些常用的类型都实现了这两种trait,例如TcpStream
,File
,Stdout
。一些常用的数据结构也实现了,如Vec<u8>
和&[u8]
。
本节将介绍基本的Tokio I/O编程,下一节将介绍更高级的用法。
AsyncRead和AsyncWrite
这两个trait提供了异步读写字节流的功能,其内部方法实际上不会手动调用,而是通过Tokio提供的工具包AsyncReadExt
和AsyncWriteExt
来调用。
我们简单看看这些工具包里的方法,注意所有的方法都是异步的,需要使用await才能实际执行。
async fn read()
AsyncReadExt::read()
提供了异步读取数据到一个buffer的能力,返回读取到的字节数。
注意:当read()
返回Ok(0)
时,说明流已经关闭了,任何后续的读取都会返回0。因此如果读写的是TcpStream,我们可以直接关闭连接了(实际上此时说明TCP连接的对方已经调用close关闭连接,发送了FIN报文)。
use tokio::fs::File;
use tokio::io::{self, AsyncReadExt};
#[tokio::main]
async fn main() -> io::Result<()> {
let mut f = File::open("foo.txt").await?;
let mut buffer = [0; 10];
// read up to 10 bytes
let n = f.read(&mut buffer[..]).await?;
println!("The bytes: {:?}", &buffer[..n]);
Ok(())
}
async fn read_to_end()
AsyncReadExt::read_to_end
将从流中读取所有字节,直到遇到EOF。
use tokio::io::{self, AsyncReadExt};
use tokio::fs::File;
#[tokio::main]
async fn main() -> io::Result<()> {
let mut f = File::open("foo.txt").await?;
let mut buffer = Vec::new();
// read the whole file
f.read_to_end(&mut buffer).await?;
Ok(())
}
async fn write()
AsyncWriteExt::write
将一个buffer里面的所有字节都写到流中(不一定能全部写进去),返回实际写入的字节数。
use tokio::io::{self, AsyncWriteExt};
use tokio::fs::File;
#[tokio::main]
async fn main() -> io::Result<()> {
let mut file = File::create("foo.txt").await?;
// Writes some prefix of the byte string, but not necessarily all of it.
let n = file.write(b"some bytes").await?;
println!("Wrote the first {} bytes of 'some bytes'.", n);
Ok(())
}
async fn write_all()
AsyncWriteExt::write_all
把整个buffer中的数据全部写入,保证全部写入完成才会返回。
use tokio::io::{self, AsyncWriteExt};
use tokio::fs::File;
#[tokio::main]
async fn main() -> io::Result<()> {
let mut buffer = File::create("foo.txt").await?;
buffer.write_all(b"some bytes").await?;
Ok(())
}
除此之外还有很多其他实用的函数可以查看API文档。
Helper函数
和标准库std一样,tokio::io
模块包含很多实用的工具函数,例如tokio::io::copy
异步地将一个reader的数据拷贝到一个writer发送出去。
use tokio::fs::File;
use tokio::io;
#[tokio::main]
async fn main() -> io::Result<()> {
let mut reader: &[u8] = b"hello";
let mut file = File::create("foo.txt").await?;
io::copy(&mut reader, &mut file).await?;
Ok(())
}
当然这利用了一个事实:字节数组&[u8]
实现了AsyncRead
。
Echo服务器
回响服务器是常见的网络编程例子,我们来实现一下。首先需要创建一个TcpListener
,接收到来的客户端请求,每个连接的处理都是一致的:从socket中读取数据,然后直接将读取到的数据写回。因此客户端看到的响应就是自己之前发送的数据。
我们将用两种不太一样的策略来实现echo服务器。
使用io::copy
实现
我们首先利用前面介绍的io::copy
方法来实现,大致的框架先搭好:
use tokio::io;
use tokio::net::TcpListener;
#[tokio::main]
async fn main() -> io::Result<()> {
let mut listener = TcpListener::bind("127.0.0.1:6142").await.unwrap();
loop {
let (mut socket, _) = listener.accept().await?;
tokio::spawn(async move {
// Copy data here
});
}
}
显然我们只需要在空出来的异步块中填写copy的逻辑即可,注意到TcpStream
同时实现了AsyncRead
和AsyncWrite
,是不是可以直接作为reader和writer调用copy呢?
// This fails to compile
io::copy(&mut socket, &mut socket).await
很遗憾上面的代码无法编译,因为我们同时可变引用了两次socket
变量(&mut
),这在Rust中是不允许的。
拆分成reader和writer
如何处理这个问题呢,我们把一个socket拆分成用于reader的句柄和用于writer的句柄。任何同时具有reader和writer属性的类型都可以使用io::aplit
拆分,此函数返回reader和writer句柄,而后这两个句柄可以单独用于不同的任务。
例如我们的echo客户端可以如下编写:
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
#[tokio::main]
async fn main() -> io::Result<()> {
let socket = TcpStream::connect("127.0.0.1:6142").await?;
let (mut rd, mut wr) = io::split(socket);
// Write data in the background
let write_task = tokio::spawn(async move {
wr.write_all(b"hello\r\n").await?;
wr.write_all(b"world\r\n").await?;
// Sometimes, the rust type inferencer needs
// a little help
Ok::<_, io::Error>(())
});
let mut buf = vec![0; 128];
loop {
let n = rd.read(&mut buf).await?;
if n == 0 {
break;
}
println!("GOT {:?}", &buf[..n]);
}
Ok(())
}
io::split
支持任何实现了AsyncWrite+AsyncRead
的类型,在内部它使用Arc
和Mutex
,这会产生一定的性能损失。对于TcpStream
,我们提供了独特的split
机制。
TcpStream::split
接收一个stream的引用作为参数,然会读写句柄。由于我们使用的是引用,因此得到的两个读写句柄不能再用在别的task上了,只能用于当前任务。这个特殊的split
函数是零开销的,内部没有使用Arc
和Mutex
。TcpStream
还提供了into_split
方法,其提供了可以传递给其他任务的handle,里面用到了Arc
。
由于io::copy
是在同一task中调用的,我们可以直接使用TcpStream::split
来实现拷贝的逻辑:
tokio::spawn(async move {
let (mut rd, mut wr) = socket.split();
if io::copy(&mut rd, &mut wr).await.is_err() {
eprintln!("failed to copy");
}
});
手动copy
现在我们考虑不用Tokio提供的工具,手动使用read
和write_all
来实现echo服务。代码如下:
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
#[tokio::main]
async fn main() -> io::Result<()> {
let mut listener = TcpListener::bind("127.0.0.1:6142").await.unwrap();
loop {
let (mut socket, _) = listener.accept().await?;
tokio::spawn(async move {
let mut buf = vec![0; 1024];
loop {
match socket.read(&mut buf).await {
// Return value of `Ok(0)` signifies that the remote has
// closed
Ok(0) => return,
Ok(n) => {
// Copy the data back to socket
if socket.write_all(&buf[..n]).await.is_err() {
// Unexpected socket error. There isn't much we can
// do here so just stop processing.
return;
}
}
Err(_) => {
// Unexpected socket error. There isn't much we can do
// here so just stop processing.
return;
}
}
}
});
}
}
我们一步步分析一下上面的代码。首先,创建一个大小为1024的buffer。注意我们使用的是Vec
,堆上的动态数组,而不是栈上的基础数组[u8]
。因为buffer的使用跨越了await,如果用栈数组来表示buffer,理论上也没问题,但由于它是栈上空间,我们每次处理一个请求都会开启新的一个task,每个task具有一个数据结构,看起来可能像这样:
struct Task {
// internal task fields here
task: enum {
AwaitingRead {
socket: TcpStream,
buf: [BufferType],
},
AwaitingWriteAll {
socket: TcpStream,
buf: [BufferType],
}
}
}
如果使用栈数组,它将直接存放在task结构体中,这会导致结构体比较大,而且由于buffer的大小通常设为页的整数倍,这导致task结构的大小有点尴尬。因此,我们推荐使用堆上内存去分配buffer。
当TCP数据流关闭后,我们将从read中读到Ok(0)
,此时必须跳出循环,因为后续的读取都会是这个,不跳出的话程序就一定会陷入死循环。
Framing
我们现在可以应用前面的IO知识实现Mini-Redis的framing层(也叫解码层),指的是将字节流中的数据转化为数据帧的流,一个帧是客户端和服务端数据传输的一个单位。我们使用的Redis通信协议的数据帧如下:
use bytes::Bytes;
enum Frame {
Simple(String),
Error(String),
Integer(u64),
Bulk(Bytes),
Null,
Array(Vec<Frame>),
}
一个HTTP协议的帧可能会长这样:
enum HttpFrame {
RequestHead {
method: Method,
uri: Uri,
version: Version,
headers: HeaderMap,
},
ResponseHead {
status: StatusCode,
version: Version,
headers: HeaderMap,
},
BodyChunk {
chunk: Bytes,
},
}
为了实现Mini-Redis,我们实现一个Connection
结构体,它包装一个TcpStream
并且读写mini_redis::Frame
值。
use tokio::net::TcpStream;
use mini_redis::{Frame, Result};
struct Connection {
stream: TcpStream,
// ... other fields here
}
impl Connection {
/// Read a frame from the connection.
///
/// Returns `None` if EOF is reached
pub async fn read_frame(&mut self)
-> Result<Option<Frame>>
{
// implementation here
}
/// Write a frame to the connection.
pub async fn write_frame(&mut self, frame: &Frame)
-> Result<()>
{
// implementation here
}
}
完整的代码在这里。
缓冲读
read_frame
方法等待接收到一个完整的帧,这是因为一次read
读到的数据是不确定的(流式传输的特点),可能有多个帧,也可能只传输了部分帧。因此如果读到了不完整的一个帧,我们将数据存放在缓冲区等待新的数据,如果读到了多个帧,我们一次只返回一个帧,剩下的也放在缓冲区,用于下次read_frame
调用。
为了实现这样的想法,Connection
需要一个读缓冲区,数据将从socket读到缓冲区,当一个完整的帧解析成功,我们就把对应的数据移除。
我们使用BytesMut
作为缓冲区的类型,这是Bytes
类型的可变版本。
use bytes::BytesMut;
use tokio::net::TcpStream;
pub struct Connection {
stream: TcpStream,
buffer: BytesMut,
}
impl Connection {
pub fn new(stream: TcpStream) -> Connection {
Connection {
stream,
// Allocate the buffer with 4kb of capacity.
buffer: BytesMut::with_capacity(4096),
}
}
}
然后我们再来实现read_frame
:
use tokio::io::AsyncReadExt;
use bytes::Buf;
use mini_redis::Result;
pub async fn read_frame(&mut self)
-> Result<Option<Frame>>
{
loop {
// Attempt to parse a frame from the buffered data. If
// enough data has been buffered, the frame is
// returned.
if let Some(frame) = self.parse_frame()? {
return Ok(Some(frame));
}
// There is not enough buffered data to read a frame.
// Attempt to read more data from the socket.
//
// On success, the number of bytes is returned. `0`
// indicates "end of stream".
if 0 == self.stream.read_buf(&mut self.buffer).await? {
// The remote closed the connection. For this to be
// a clean shutdown, there should be no data in the
// read buffer. If there is, this means that the
// peer closed the socket while sending a frame.
if self.buffer.is_empty() {
return Ok(None);
} else {
return Err("connection reset by peer".into());
}
}
}
}
我们分析一下,函数的主体在一个循环中,我们首先尝试从现有的缓冲区中解析出一个帧,如果解析成功,我们就可以将帧返回,如果解析不出,说明缓冲区内没有足够的数据。此时我们尝试把socket中接收到的数据读到buffer中,如果读到的数据量为0,要么帧已经完整解析完,socket里面没有数据了,这是正常的连接关闭,如果帧没有解析完,但是对方已经关闭连接,则属于不正常关闭,需要返回Err。
Buf
trait
注意到我们这次从Tcp流中读取数据时用的方法是read_buf
,此方法接收的参数必须实现bytes
包中的BufMut
。
如果我们用Vec<u8>
来实现,则可以直接使用前面用到的read()
方法,此时需要一个cursor
来表示当前缓存的数据的尾部,Connection
的定义需要修改为如下:
use tokio::net::TcpStream;
pub struct Connection {
stream: TcpStream,
buffer: Vec<u8>,
cursor: usize,
}
impl Connection {
pub fn new(stream: TcpStream) -> Connection {
Connection {
stream,
// Allocate the buffer with 4kb of capacity.
buffer: vec![0; 4096],
cursor: 0,
}
}
}
此时read_frame
函数也要修改:
use mini_redis::{Frame, Result};
pub async fn read_frame(&mut self)
-> Result<Option<Frame>>
{
loop {
if let Some(frame) = self.parse_frame()? {
return Ok(Some(frame));
}
// Ensure the buffer has capacity
if self.buffer.len() == self.cursor {
// Grow the buffer
self.buffer.resize(self.cursor * 2, 0);
}
// Read into the buffer, tracking the number
// of bytes read
let n = self.stream.read(
&mut self.buffer[self.cursor..]).await?;
if 0 == n {
if self.cursor == 0 {
return Ok(None);
} else {
return Err("connection reset by peer".into());
}
} else {
// Update our cursor
self.cursor += n;
}
}
}
可以看到我们必须要使用cursor
来保证新读取的数据不会覆盖还未处理完成的数据,同时如果缓冲区的容量用完了,我们必须手动把缓冲区扩容,在parse_frame
中,我们需要处理的部分是self.buffer[..self.cursor]
。
因为这样的字节数组的处理非常常见,bytes
包提供了高级的抽象:Buf
和BufMut
。Buf
指的是可读的数据,BufMut
指可写数据类型。当我们将实现了BufMut
的实例传入read_buf
时,read_buf
会帮你处理内部的cursor
,这使得外部代码更简洁。
此外,当我们使用Vec<u8>
时,buffer必须要初始化,上面的代码就初始化成4KB的大小,每个位置都是0。当我们扩容的时候,新的内容也是初始化为零。初始化过程是非必须的,因为反正后面读的时候也是要覆盖掉原来的数据的。我们的BytesMut
的实现保证了数据不会初始化,省去了这一开销,同时它也在API层面禁止用户访问未初始化的数据。
Parsing
现在我们看如何实现parse_frame
函数,解析帧需要两步:
- 读完整的一个帧,定位到尾部。
- 解析这个帧。
mini_redis
包提供了两个函数处理这个事:Frame::check
和Frame::parse
。
同时我们也会用到Buf
,Buf
类型的数据将被传入check
,check会检查一个帧,同时移动内部的cursor
,当其返回时,内部的cursor
就会指向帧的尾部。std::io::Cursor
就实现了Buf
。
use mini_redis::{Frame, Result};
use mini_redis::frame::Error::Incomplete;
use bytes::Buf;
use std::io::Cursor;
fn parse_frame(&mut self)
-> Result<Option<Frame>>
{
// Create the `T: Buf` type.
let mut buf = Cursor::new(&self.buffer[..]);
// Check whether a full frame is available
match Frame::check(&mut buf) {
Ok(_) => {
// Get the byte length of the frame
let len = buf.position() as usize;
// Reset the internal cursor for the
// call to `parse`.
buf.set_position(0);
// Parse the frame
let frame = Frame::parse(&mut buf)?;
// Discard the frame from the buffer
self.buffer.advance(len);
// Return the frame to the caller.
Ok(Some(frame))
}
// Not enough data has been buffered
Err(Incomplete) => Ok(None),
// An error was encountered
Err(e) => Err(e.into()),
}
}
check
函数的实现在这里。我们不会完整讲解这个函数。
注意到,Buf
的API是字节迭代器风格的,例如我们需要查看第一个字节来判断帧的类型,用到的方法是Buf::get_u8
,它读取当前cursor
位置的1个字节,并将cursor后移1位。还有很多好用的方法,请阅读API文档 。
缓冲写
framing的另一个API是write_frame(frame)
。