您的位置:首页 > 编程语言

异步代码的几种写法 | Rust学习笔记

2020-09-28 14:18 1031 查看

作者:谢敬伟,江湖人称“刀哥”,20年IT老兵,数据通信网络专家,电信网络架构师,目前任Netwarps开发总监。刀哥在操作系统、网络编程、高并发、高吞吐、高可用性等领域有多年的实践经验,并对网络及编程等方面的新技术有浓厚的兴趣。

Rust
历史不长,仍然处于快速发展的历程中。关于异步编程的模式,现在已经发展到
async/await
协程的高级阶段。大概是因为
async/await
出现的时间还不长,所以现有大多数的开源项目并不是或不是纯粹使用
async/await
来书写的,而是前前后后有多种的写法。这样的状况给
Rust
的学习带来了一些的难度。在这里,我们来捋一捋异步代码的几种写法。

mio

最原始的方式是使用

mio
进行开发。
mio
是一个底层异步
I/O
库,提供非阻塞方式的
API
,具有很高的性能。实际上
mio
是对于操作系统
epoll/kqueue/IOCP
的封装。在
C/C++
中我们使用
libevent
之类的库,
mio
可以理解为对应的
Rust
版本。基于
mio
的代码大致如下:

loop {
// Poll Mio for events, blocking until we get an event.
poll.poll(&mut events, None)?;

// Process each event.
for event in events.iter() {
if event.is_writable() {
// socket可写,开始发送数据
}

if event.is_readable() {
// socket可读,开始接收数据
}
// socket 关闭,退出循环
return Ok(());
}
}

总的来说,这是完全基于异步事件通知的写法,和

C/C++
区别不是很大,异步代码对于程序员是一个挑战,当代码逻辑越来越复杂,添加新功能或是解决已有问题的难度也越来越大。

另外,

mio
实现的是一个单线程事件循环,虽然可以处理成千上万路的
I/O
操作,但没有多线程的能力,需要自己扩充。

Future Poll

为了更好地规范异步的逻辑,

Rust
抽象出
Future
表示尚未发生的事物。这些
Future
可以用很多方式組合成一个更复杂的复合
Future
来代表一系列的事件。
Future
需要程序主动去
poll
(轮询)才能获取到最终的结果,每一次轮询的结果可能是
Ready
或者
Pending

运行库提供

Executor
Reactor
来执行
Future
,也就是调用
Future
poll
方法循环执行一系列就绪的
Future
,当
Future
返回 71f0
Pending
的时候,会将
Future
转移到
Reactor
上等待唤醒。
Reactor
被用来负责唤醒之前无法完成的
Future
。事实上,
tokio
Reactor
是基于
mio
实现的,而
async-std/smol
则是封装了
epoll/kqueue/IOCP
,提供类似的功能。

手动实现

Future
是一件相对繁琐的工作,主要的问题在于异步模式本身的特性。例如,接收网络数据,无法臆测每次轮询会收到多少字节的数据,往往需要开辟一段接收缓冲区容纳数据,协议解码也需要一个状态机拼包向上层提交;发送网络数据存在相似问题,发送数据时底层未就绪,则缓冲发送数据,待下次轮询时,需要首先检查并处理发送缓冲区。另外还有一些值得注意的地方,如果手动实现的
Future
返回
Pending
,则必须自己实现唤醒机制,也就是需要将
cx
克隆一份记下来,然后在适当的时侯调用
cx.wake()
。因为网络相关的功能往往是分层的,因此手动的
Poll
循环也会是层层堆叠的,这时候,返回值
Poll::Ready(T)
就有学问了。泛型T可能包裹各种不同的数据,
Option<T>
Result<T,E>
,或者两者的组合。因为最外层还有一个
Poll<T>
,所有这时候的
match
语句写起来会非常臃肿,粘贴复制写很多代码,完成的功能却非常有限,而且由于这些代码很相似,大大增加了出错的可能性。

标准库中仅仅定义了

Future
,更多的相关功能需要引用
futures-rs
类库,里面定义了一系列有关异步的操作,包括
Stream
Sink
AsyncRead
AsyncWrite
等基础
Trait
,以及对应实现了大量方便操作的组合子的
Ext Trait
,特别用途的
fused
Box
Try
系列的扩展,诸如
join!
select!
pin_mut!
等一系列的宏。理论上,不使用这些扩展也能写出代码,只不过那样的代码很可能篇幅会长的可怕。值得一提的是,除了一些可以简化代码的过程宏之外,扩展
Trait
提供的组合子也会让代码精简不少。比如
Future::and_then
可以让代码写成链式调用的方式;
Sink::send
包装了
Sink
发送三步骤
poll_ready/start_send/poll_flush
,使用
.await
一行代码直接就可以完成发送。因此,很多
poll
方式的代码实际上是准确地说是混合式的,其中也使用了不少
async
代码块。

总之,搞清楚

Future
相关的这些内容是需要花费不少时间,更不用说用它们来写代码了。不过,即便是使用
async/await
这种更高级原语,也是有必要了解底层的工作原理和实现机制,所谓知其然知其所以然。

async/await

使用

async/await
可以将异步的代码写得类似同步的过程,更加符合人体工程学。因为
async
被翻译为一个
Future
状态机,原先在
poll
方式中需要处理的与
Pending
相关的状态现在都由
async
生成的状态机自动完成,因此大大减轻了程序员的心智负担。

如前所述,底层的

Futures
提供了很多方便的组合子扩展
Future
,使用起来很简洁,可以极大地简化代码。例如,上文提到过的
Sink::send
包装了发送缓冲区的实现和异步发送的三个步骤;
AsyncRead::read_exact
实现了读取指定字节数的功能,在处理网络协议解析时可以避免手写一个拼包状态机;
AsyncWrite::write_all
实现了发送全部数据以及发送缓冲,等等。正是在这些底层功能的支持下,
async/await
成为了更高级的书写异步代码的方式。也许会有少许担心,这样所谓“高级”会不会在性能上有很大损失?笔者个人不这么认为。自动实现的状态机也许未必比程序员手动完成的性能更差。状态机编程对于任何人,即便是一个有经验的程序员都是不小挑战。蹩脚的状态机实现不仅可能有性能问题,更大的风险来自于实现上的漏洞,以及维护上的困难。代码写出来更多是给别人看的,完成同样的功能,简洁的代码更有可能是更高质量的代码。

以下例子是固定长度分割的报文接收过程,使用

async/await
是很简单的。如果实现为一个
Stream/poll_next
,代码会复杂很多。

/// convenient method for reading a whole frame
pub async fn recv_frame(&mut self) -> io::Result<Vec<u8>> {
let mut len = [0; 4];
let _ = self.inner.read_exact(&mut len).await?;  // inner socket, 支持 AsyncRead

let n = u32::from_be_bytes(len) as usize;
if n > self.max_frame_len {
let msg = format!(
"data length {} exceeds allowed maximum {}",
n, self.max_frame_len
);
return Err(io::Error::new(io::ErrorKind::PermissionDenied, msg));
}

let mut frame = vec![0; n];
self.inner.read_exact(&mut frame).await?;

Ok(frame)
}

最后,完全使用

async/await
写代码目前还有几个问题:

  • async trait

    当前

    Trait
    不支持
    async fn
    ,无法直接用
    Trait
    来抽象异步方法。暂时解决办法是使用三方库
    async-trait
    。如下:

use async_trait::async_trait;

#[async_trait]
trait Advertisement {
async fn run(&self);
}

async_trait
将代码转换为一个返回
Pin&lt;Box&lt;dyn Future + Send + 'async&gt;&gt;
的同步方法。因为装箱和动态派发的原因,性能上会有少许损失。

  • 异步析构

    当前

    drop
    方法必须是同步调用,不能使用
    await
    语法。当一个
    I/O
    对象越过生命周期被析构,往往在关闭底层句柄之前,还需要完成某些
    I/O
    操作。比如,通知网络对端连接已经关闭。在同步代码中,我们只需要在
    drop()
    中置入这些操作,但是在异步代码中,无法在
    drop()
    中做类似的事情。

解决办法,总是在异步

I/O
对象越过生命周期之前显式地执行关闭动作,或是,实现一个类似
GC
的功能,专门负责清理工作。

展望

笔者在学习

Rust
过程中,主要关注网络相关的并发编程。因为之前有在
Go
版本的
ipfs/libp2p
上的开发经验,故而学习研究了
rust-libp2p
以及
nervos tentacle
rust-libp2p
Parity
实现的准官方版本,但是这个项目的代码及其难懂,过于强调使用泛型参数的抽象,导致代码可读性非常差。请教了代码作者,他承认代码可能有些复杂,但也强调都是有原因的...
nervos tentacle
的实现在协议上不够完整,特别是与标准
libp2p
并不兼容。两个项目共有的特点是主要用
poll
的方式写代码,逻辑上都是状态机的嵌套。

因此,笔者试图完全使用

async/await
方式重构
libp2p
,参考
rust-libp2p
的实现,代码协程化,向上层提供纯粹的异步接口,争取在
API
层面的体验接近
go-libp2p
,这是推广
Rust
协程机制的一个尝试,同时也是个人的一个学习的过程。目前刚刚起步,仅完成了
secio
yamux
部分,待合适时机开源,期望更多
Rust
爱好者共同来开发完善。

参考:
Asynchronous Destructors

深圳星链网科科技有限公司(Netwarps),专注于互联网安全存储领域技术的研发与应用,是先进的安全存储基础设施提供商,主要产品有去中心化文件系统(DFS)、企业联盟链平台(EAC)、区块链操作系统(BOS)。
微信公众号:Netwarps

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: