异步系列 2-asio 源码分析
自圆其说是本 blog 的宗旨,有的时候理论真实可能会更加复杂,但是我愿意使用自己能理解的方式去理解的,并且达到自圆其说的地步;目前处于分布书存储领域,这个领域大神 太多但我不是那种很牛逼的大神。来这个领域只是为了追求自己喜欢做的事情,所以会持续几年吧。
个人喜欢非常 cool 的工具和膜拜各类大神,前提是你真的比我厉害,不喜欢吹嘘的人。算不上很喜欢写程序,但是挺喜欢现在的状态,最好 35 岁别把我裁了就更加好了。梦想着 自己能实现收支平衡,做一个开心的废物;最近一段时间正在学习 Rust,这个语言我超级喜欢,因为我非常喜欢 C++,但是不得不说 C++ 的包袱太重了;下一份工作可能会尝试 remote work,不知道这是不是我喜欢的生活; 家中有一个小宝贝,有点任性,不知道我能不能教育好她,希望她能健健康康的成长,我努力让自己不焦虑去鸡娃;我努力让你成为富 二代吧。
继续之前的异步系列,这次主要将 asio 的源码分析,分析的目的也是为了能更好的了解目前在 Linux/Macos 是如何去做异步的,这样会更加的去了解后面的 goroutine、Rust async 或者 Scylla seastar 框架,为什么没有聊到 window iocp 呢,因为实现方式上有点区别,但是理论上应该也差不多太多,不过这部分是 windows os 帮实现的,用整体的完成度来说会更加好一些。
这边文档大概的内容分布为:
- asio 的整体异步的架构;
- 如何阅读 asio 的源码
- scheduler,asio 的核心代码
- 找 timer、tcp 的异步发送数据的源码细节分析
- 总结和下期预告
1. asio 的整体的异步架构

这幅图是上一篇讲序言中提高的关于异步框架的本质,如果你能很好的了解图中所画你就知道基本上目前市面上的所有的异步框架都是这个类型,差别在于主要体现在性能优化和用户体验上;大致分为几种:
- callback: asio、seastar、js 或者 node.js 等
- coroutine: golang、大部分 stackful 的协程框架
- SM(state machine): rust async await 其他的
- other
基本上原理是不变的,都是给予上面这幅图所展示的,让 worker 的线程 non-blocking,让io-thread 做所有的 blocking 的操作,因为有了多路复用的技术,可以让单个或者少数的线程就能完全所有的 blocking 异步通知的功能。这边不要纠结与多用复用是否是异步这个概念,放到宏观来讲它提供上层异步的能力就行。
1.1 asio 的关键点

- Io-Type: 表示一些异步 io 的类型,比如 asio 中有定时器、socket、串行端口、signal 之类的,当然 asio 还有其他的一些,比如
basic_descriptor这种,这种主要是基础类,为了上面这些服务的;通常用户使用的都是定时器、socket 等等;- io_object_impl: io_object_impl 是 asio 使用的编程技巧,所有的 io-type 与某一个 service 进行关联都是通过 io_object_impl 来操作的,这是一个泛型,所以可以统一所有的异步类型 +service 的关系管理
- service: 这个最为重要的异步核心类,就类似于第一个部分讲的,所以大部分异步架构的本身都是这个模型,但是如何用这个模型来封装各种异步的操作呢,这就是 service 需要做的事情
1.2 asio 整体的架构

目前 asio 的整体任务流向基本上是这样的,主要分为几个步骤:
- io_type/io_object 的初始化,初始化的过程会伴随着 service 的创建或者注册,service 主要是用于管理相同类型的不同 io_object;
- service 根据实现来不断的往任务队列中 push task,对应的 task 可以认为是 cpu 不会被 blocking 任务,也可以认为是 complete task;
- worker 表示不断会被运行的线程;在代码中就是运行
service::run()的线程; 它会不断的从 scheduler's task queue 取出任务然后运行的;一定要记住:worker 运行的 task 是不能被阻塞的,如果阻塞造成的影响就是异步流程会 delay,比如 timer 不准时之类的;
是一个经典的 MPMC 的模型的;其实如果在学习其他的异步框架的时候甚至于其他类似的场景的时候,都会发现异步的本身其实就是 producer + consumer 模型,我在 twitter 上还戏言说:
难道异步的本质就是:queue + execute 吗?
你不断的去学习你会发现 queue 这个数据结构真心非常重要,在操作系统的各个环节、在各种业务程序的各个环节都起到了非常大的作用;
2. 关于如何阅读 asio 的代码
搭建可运行的环境; asio 有一个 standalone 版本,以头文件的方式提供出来,所以搭建一个可运行的环境还是比较简单的; learn_asio 这个是我自己的环境,上传到了 github 上了,可以运行使用的。后期我会慢慢补全里面的注释方便以后回顾;
找个极端简单的例子开始尝试的研究 asio;很多人上来就准备用 asio 写一个 tcp 服务端和客户端进行通信,个人觉得这样增加了学习 asio 的复杂度,原因在于 asio 的代码实现使用了不少 template metaprograming,可读性并不佳,很多的地方你跟进去之后就一脸懵逼了,完全找不到后面到底是什么的,所以个人觉得别上来就 socket 异步了,建议可以先 timer 这个 io_object,因为行为可控,并且涉及到的东西比较少; 个人自己分析的过程差不多是这样的,这个路径可以让你先了解架构和代码的实现的方式,后面分析 tcp 的时候靠猜你都知道它是怎么样完成的;
讲一些 asio 中有用的技巧
io_object与service之间的关系
// 这个是 basic_waitable_timer 这个类的数据成员,构造函数通常看不出来什么,但是直接看这个数据成员你就知道这个类关联的 service 是`deadline_timer_service`
detail::io_object_impl<
detail::deadline_timer_service<
detail::chrono_time_traits<Clock, WaitTraits> >,
executor_type > impl_;
async_wait到底做了什么?
// 基本上所有的 async_xxx 进来都只能看到这个,所以我不断的吐槽 asio 写的真的可读性太差了; 不过你只要注意到`initiate_async_wait`就可以,因为它是真正在做事情的
template <
ASIO_COMPLETION_TOKEN_FOR(void (asio::error_code))
WaitHandler ASIO_DEFAULT_COMPLETION_TOKEN_TYPE(executor_type)>
ASIO_INITFN_AUTO_RESULT_TYPE(WaitHandler,
void (asio::error_code))
async_wait(
ASIO_MOVE_ARG(WaitHandler) handler
ASIO_DEFAULT_COMPLETION_TOKEN(executor_type))
{
return async_initiate<WaitHandler, void (asio::error_code)>(
initiate_async_wait(this), handler);
}
// `initiate_async_wait`的 () 重载函数,这个函数就做了将这个 io-object 与底层的 service 进行交流,这边主要就是将定时器添加到底层的 service 中
template <typename WaitHandler>
void operator()(ASIO_MOVE_ARG(WaitHandler) handler) const
{
// If you get an error on the following line it means that your handler
// does not meet the documented type requirements for a WaitHandler.
ASIO_WAIT_HANDLER_CHECK(WaitHandler, handler) type_check;
detail::non_const_lvalue<WaitHandler> handler2(handler);
self_->impl_.get_service().async_wait(
self_->impl_.get_implementation(),
handler2.value, self_->impl_.get_executor());
}
上面两个技巧,主要完成了 io_object 到 service 之间的鸿沟,后面我们重点会将 asio 的 scheduler + 几个常见的 service;
基础类型
scheduler_operation: 这个类是所有 task 的基类,用来描述 worker thread 调用 task 的时候应该怎么做;目录地址为:
include/asio/detail/scheduler_operation.hpp- 核心数据成员为: func_,这个函数是子类构造函数来确定的,表示当前任务被执行的时候到底应该执行一些什么东西;
直接继承于
scheduler_operation的子类:挑几个重要的descriptor_state: 用来保存每一个句柄对应的 io 操作;这个很核心,理解网络的异步很重要的;reactor_op: 封装网络层面的 io 操作,比如有 async_read, async_write, async_resolve 等等;task_operation: 这个比较重要,原因在于它其实表示 epoll_wait 的任务;大概看这几个,因为比较关键,reactor_op 有很多继承类,用来描述不同的 op 需要的事情;
3. asio's io_service service 管理机制
io_service 是 asio 的核心类,所以的都是从初始化这个开始,上面我们提到不同的 io_object 都会有自己的 service,那 asio 是如何管理这些 service 的呢;这主要是由service_registry来进行管理的,大概的内容有:
文件:
include/asio/detail/impl/service_registry.hpp代码基本分析:
原理介绍 : 本质上是一个 map 数据结构 (具体实现是一个侵入式的 list,这边我不清楚为什么要这么做,map 其实也很合适呢,反正都有 lock),在单个的 io_service 中每一个 service 的实例只会存在一个,所以你可以理解为是一个管理 singleton 的模块,每一个 service 的构造和析构都在这个里面管理;当一个新的 io_object 被初始化的时候,会判断当前 service 是否存在,如果存在就返回已存在的,如果不存在就构造一个返回给用户;在单 io_service 是通过唯一 id 存在的; 当然除了 io_object 对应的 service,还有其他的基本 service 也是统一管理的,最典型的有 scheduler 这个也作为 service 在这个类中管理;
主要的代码分析
// 第一个 service,这就是我说的侵入式 list; 通过遍历的方式来找到对应的 service 是否存在,当然这边我觉得 service 本身量级都很小,所以用什么实现问题都不大 execution_context::service* first_service_; // 主要的几个函数 // 1. 获得某一个 service,如果没有就创建; template <typename Service> Service& use_service(io_context& owner); // 2. 将一个 service 添加进来,如果已经存在会报异常 template <typename Service> void add_service(Service* new_service); //判断某一个 service 是否已经存在于管理体系内部 template <typename Service> bool has_service() const;经典出现场景
io_object_impl's construct function: 按照本文上面所讲,每次 io_object 的初始化都会调用io_object_impl,所以在它构造函数中就会去获得对应的 service 实例
// use_service 就是调用上面的 service_registry::use_service explicit io_object_impl(const executor_type& ex) : service_(&asio::use_service<IoObjectService>( io_object_impl::get_context(ex))), executor_(ex) { service_->construct(implementation_); }- io_service 初始化的过程中也会调用对应的函数,因为需要把 scheduler 给注册进去
4. scheduler asio 的核心调度类
异步框架都会存在调度模块,这个模块作用是让各个 worker 能执行到任务,其实有点类似于平时我们用的线程池的 module,而 asio 的调度模块可能做了更多的事情,其中很大的一点在于性能上的权衡,但是这边的重点还是在于如何让各个 worker 都有 task 可以被运行,这样才能运用多核 cpu 的能力;
文件:
include/asio/detail/scheduler.hpp初始化流程:
伴随着 asio 中核心类
io_service初始化的时候会这个 scheduluer, impl 本质就是 scheduler 这个类; 也就是每一个io_service会都有一个 scheduler,不同的 io_service 是相互隔离的;io_context::io_context() : impl_(add_impl(new impl_type(*this, ASIO_CONCURRENCY_HINT_DEFAULT, false))){}
核心函数和数据成员的分析
数据成员:
reactor* task_: 对多路复用 IO 的封装,当然主要的功能op_queue<operation> op_queue_: 已经完成的 task 队列,或者说是可以被 worker thread 直接运行 task 的队列;mutable mutex mutex_: 保护 scheduler 内部对象的 lock; 因为 scheduler 可能被多个 thread 的调用
核心方法 (代码分析倒时候提供注释的方式提供在 github 上,否则文章会有点长):
scheduler::run: 对应到用户的代码中就是io_service::run,载体通常是新起来的 thread,当然也不一定scheduler::do_run_one:run内部会调用这个函数,这个函数是非常重要的函数, 它主要描述了 worker thread 如何获得任务,然后执行;这个函数名字主要的意思是执行一个可运行的任务,估计是为了让各个 worker 负载均衡一些吧;task_::run: 这个是各个操作系统对多路复用 IO 的封装; 主要通过将网络 io 或者 timer 一些时间注册上去,worker thread 会不断的执行类似与epoll_wait的函数来知道目前有哪些 io 可以被读写之类,如果这些异步操作完成就会将回调任务放回到队列中,让 worker thread 去执行;- linux: epoll; 文件为:
include/asio/detail/epoll_reactor.hpp - macos: kqueue; 文件为:
include/asio/detail/kqueue_reactor.hpp - windows: windows 有点不一样,没分析
- linux: epoll; 文件为:
聊一下在 queue 中的任务类型;
scheduler's op_queue_中存放了两种任务类型;分别为- 异步任务完成之后的 callback
- 仅仅只是为了当作线程池模型来使用的 task
多路复用的任务;
这三种任务都可以被理解为可运行的任务,置于第三种 (多路复用的任务) 在大部分场景下面使用的方式是
epoll_wait长期阻塞,但是 asio 希望能复用当前 thread,也就是即使不开辟新线程,在当前主线程也能完成异步的操作,所以这个类型的任务就被封装了,epoll_wait不是长期阻塞,而是如果没有可操作的 io 的话就瞬间返回,这样这种类型的任务就不会被阻塞;worker thread 可以保证高性能的运行;
下图对 scheduler 类进行了描述,能更加好的知道 scheduler 本身的一些作用;

5. 几个 service 分析
5.1 strand service
strand 的英文:链; 我个人对它在 asio 中的语义应该是保证 op 之间的链式调度或者顺序调用,比如对 socket 的读写可能有多个异步任务,当你的 worker thread > 1 之后就可能会触发多线程安全问题,但是用 strand 之后,它能保证的点在于:
- 相同 strand 的 op 之间是有明确的顺序关系的,必须我之前的 op 调用完之后才能调用它之后的 op
- 不同 op 可能运行在不同的 thread 上,但是能保证每一个时刻所有的 op 只会被一个 thread 操作;并不是说 strand 一定是运行在单线程上的;只不过单线程天生就保证了顺序性;
不过 strand 有一个函数叫做dispatch函数,这个函数会判断当前这个 op 是否是前一个 op 产生的,如果是的话就直接调用吧,不用排队了;

上图主要把 strand 的大概给描述了一下;其实 strand 的有序也是通过 lock 来保证了的,只不过它 lock 在临界区比较小,主要是用来保护内部的变量线程安全,其他的时候 worker thread 就无 lock 跑,效率应该不会差; 它的基本思路是这样的:
- 内部维护两个 queue,一个 queue 是 ready queue,可能已经被某一个线程在运行中; 另外一个 queue 是 wait queue,这个 queue 是用来存放 user 放进来的 task;
- 会有一个 flag 来标识,当前的 task 是存到哪个 queue 中,如果是存放到 ready queue 中的话,就会让底层 worker 去消费
- 消费的过程其实就是将 ready queue 中一个一个去运行,运行完之后会将 wait queue 中的队列放到 ready queue 中,并且往底层队列中放一个 task 来消费,通过这种方式来循环执行;
所以 strand 的机制是每一次 ready queue 只会被某一个线程消费,从而保证整体的有序性;
5.2 tcp 的异步过程

上图基本上涵盖了用户构建一个异步操作到真正 callback 被执行的过程;
- 用户构建 io_object 的对象,比如 async_read
- aiso 会找到对应的 service,找到对应的句柄对象,这个对象就是
descriptor_state - 将自己关注的操作初始化好,然后就注册到 reactor 对象中,本质就是
epoll_wait的操作 后面的操作就是会不断的调用
perform来接受数据或者 xxx- 4.1 当前 op 要求的数据已经达到要求,比如获得 100 byte 的数据,这个时候任务就完成,把这个完成任务放到私有队列中,后期会被合并到全局队列中
- 4.2 如果没有达到目前,比如就获得了 50byte,那么就继续等 epoll_wait
- 对于完成的 op 操作会存放到对应线程的私有队列,这个队列会在本次 op 完成之后合并到全局队列中,这么做的原因主要是为了性能影响;
6. 总结
哎..这文章慢慢写了两周,今天再来写的时候有点心累了...不过先写到这里,后面我慢慢在补充;源码分析的文章其实还蛮难写的,文字多了不好,代码也不能贴太多,不贴代码讲代码又有点空,所以就多挂了点图..就这样吧,后面会慢慢会补充 asio 的例子和代码的注释吧; 后面文章会继续,不过可能下次异步系列要等一段时间了;主要包含:
- rust 的 async/await
- golang 的 goroutine
- seastar 的分析
- tokio 分析,今天去参加一个 rust 的大会,有个人讲代码分析,我简直吐了...话讲不清楚,ppt 整段的代码..哎,希望我不会....