Boost Asio
Table of Contents
1. Boost.Asio 简介
Boost.Asio 是一个 C++实现的跨平台的 I/O 操作库,它可用于 socket 等 I/O 对象的同步或异步操作。 Boost.Asio 基于 Proactor 模式封装了操作系统提供的 select、kqueue、poll/epool 等机制,实现了异步 I/O 模型。
Boost.Asio 的核心类是 boost::asio::io_context
(以前叫 io_service),每个 Boost.Asio 程序都至少有一个 boost::asio::io_context
类型的对象,它代表程序到操作系统 I/O 服务之间的“连接”。
在同步模式下,程序发起一个 I/O 操作,向 io_context
提交请求, io_context
把操作转交给操作系统,同步地等待。当 I/O 操作完成时,操作系统通知 io_context
,然后 io_context
再把结果发回给程序,完成整个同步流程。
在异步模式下,程序除了要发起 I/O 操作,还要定义一个用于完成处理后的回调函数。 io_context
同样把 I/O 操作转交给操作系统执行,但它不同步等待,而是立即返回。调用 io_context
的 run()
成员函数可以等待异步操作完成,当异步操作完成时 io_context
从操作系统获取结果,再调用之前注册的回调函数。
本文主要参考:https://www.boost.org/doc/libs/1_67_0/doc/html/boost_asio.html
2. Boost.Asio 实例
2.1. 定时器
由于定时器类 boost::asio::steady_timer
比较简单,我们从它开始介绍 Boost.Asio 库的使用。
2.1.1. 同步定时器
下面是同步定时器实例:
#include <iostream> #include <boost/asio.hpp> int main() { boost::asio::io_context io; boost::asio::steady_timer t(io, std::chrono::seconds(5)); // 5秒过期的定时器 t.wait(); // t.wait() 一直等待,直到定时器过期 std::cout << "Hello, world!" << std::endl; return 0; }
编译运行如下:
$ c++ -std=c++11 -lboost_system timer1.cpp -o timer1 $ ./timer1 # 5秒后输出下面内容 Hello, world!
2.1.2. 异步定时器
下面是异步定时器实例:
#include <iostream> #include <boost/asio.hpp> void print(const boost::system::error_code& /*e*/) { std::cout << "Hello, world!" << std::endl; } int main() { boost::asio::io_context io; boost::asio::steady_timer t(io, std::chrono::seconds(5)); t.async_wait(&print); // 设置定时器过期后的回调函数 std::cout << "Run here." << std::endl; // 上一步是async_wait(不是wait),会马上输出这行 io.run(); // 要使回调函数被执行,必需调用io_context::run(),且Asio库保证仅在调用了 // io_context::run()的那个线程中,才会执行回调函数。 // io_context::run()当“没有事情可做”时返回。在这个例子中,等待定时器过期, // 且回调函数执行完成后,run()才会返回。 return 0; }
编译运行如下:
$ c++ -std=c++11 -lboost_system timer2.cpp -o timer2 $ ./timer2 # 首先输出Run here.等待5秒后再输出Hello, world! Run here. Hello, world!
注 1: io_context::run()
意味着开始“阻塞执行事件循环”,它会阻塞等待直到所有注册到 io_context
上的事件完成。只有调用了它,回调函数才可能执行;且执行回调函数的线程就是调用 io_context::run()
那个线程。
注 2:由于函数名会隐式地转换为函数的指针,所以上面代码中 t.async_wait(&print);
也可以写为 t.async_wait(print);
。
注 3:在 Boost.Asio 库中回调函数不一定要是函数或者函数指针,函数对象、bind/lambda 表达式等可调用物也都可以作为回调函数。
2.1.3. 传参给回调函数
本节的例子和上节例子类似,只是增加了回调函数传递参数的功能。
通过 steady_timer::async_wait() 的参数可以设置回调函数, steady_timer::async_wait() 期待参数是一个函数对象(function object),该函数对象的签名为 void(const boost::system::error_code&)
。下面例子中,print 函数还有两个额外的参数,通过 boost::bind
可以把 print 函数转换为匹配签名的函数对象。
#include <iostream> #include <boost/asio.hpp> #include <boost/bind.hpp> void print(const boost::system::error_code& /*e*/, boost::asio::steady_timer* pt, int* pcount) { if (*pcount < 5) { std::cout << *pcount << std::endl; ++(*pcount); pt->expires_at(pt->expiry() + std::chrono::seconds(1)); // 重置过期时间 pt->async_wait(boost::bind(print, // 设置回调,并传参 boost::asio::placeholders::error, pt, pcount)); } } int main() { boost::asio::io_context io; int count = 0; boost::asio::steady_timer t(io, std::chrono::seconds(1)); // 定时器,1秒过期 t.async_wait(boost::bind(print, // 设置回调,并传参 boost::asio::placeholders::error, &t, &count)); io.run(); std::cout << "Final count is " << count << std::endl; return 0; }
注: boost::bind
的参数 boost::asio::placeholders::error
是一个占位符,类似于 std::bind
中常使用的 std::placeholders::_1
。
编译运行如下:
$ c++ -std=c++11 -lboost_system timer3.cpp -o timer3 $ ./timer3 # 隔1秒输出一个数字 0 1 2 3 4 Final count is 5
2.1.4. 回调函数为类非静态成员函数(需要绑定隐含的 this 参数)
本节的例子和上节例子类似,只是回调函数为类成员函数。由于下面例子中 print 是非静态成员函数,所以需要使用 boost::bind
显式地把 this
参数绑定到 print 函数上。
#include <iostream> #include <boost/asio.hpp> #include <boost/bind.hpp> class printer { public: printer(boost::asio::io_context& io) : timer_(io, std::chrono::seconds(1)), count_(0) { timer_.async_wait(boost::bind(&printer::print, this)); // 需要绑定隐含的this参数 } ~printer() { std::cout << "Final count is " << count_ << std::endl; } void print() // print为非静态成员函数 { if (count_ < 5) { std::cout << count_ << std::endl; ++count_; timer_.expires_at(timer_.expiry() + std::chrono::seconds(1)); timer_.async_wait(boost::bind(&printer::print, this)); // 需要绑定隐含的this参数 } } private: boost::asio::steady_timer timer_; int count_; }; int main() { boost::asio::io_context io; printer p(io); io.run(); return 0; }
编译运行如下:
$ c++ -lboost_system -lboost_chrono timer4.cpp -o timer4 $ ./timer4 0 1 2 3 4 Final count is 5
2.1.5. 同步回调函数(io_context::strand 使用实例)
前面说过,回调函数会在执行 io_context::run()
的那个线程中执行。如果某个回调函数的执行需要较长的时间,则可能会导致其它回调函数得不到及时执行。比如:
#include <iostream> #include <chrono> #include <thread> #include <boost/asio.hpp> void print1(const boost::system::error_code& /*e*/) { std::cout << "print1 start!" << std::endl; std::this_thread::sleep_for(std::chrono::seconds(5)); // sleep 5秒,模拟长时间任务 std::cout << "print1 done!" << std::endl; } void print2(const boost::system::error_code& /*e*/) { std::cout << "print2 start!" << std::endl; std::this_thread::sleep_for(std::chrono::seconds(5)); // sleep 5秒,模拟长时间任务 std::cout << "print2 done!" << std::endl; } int main() { boost::asio::io_context io; boost::asio::steady_timer t1(io, std::chrono::seconds(5)); boost::asio::steady_timer t2(io, std::chrono::seconds(5)); t1.async_wait(&print1); t2.async_wait(&print2); io.run(); return 0; }
在上面程序中,定义了两个定时器 t1 和 t2,分别在超时后执行 print1 和 print2,它们都需要执行较长时间(5秒)。由于仅在 main 线程中执行 io.run(),所以两个回调函数都会在 main 线程中执行,由于只有一个线程,其中一个回调函数不得不等待另一个执行完后再执行。
有一个办法可以让回调函数并发地执行:启多个线程执行 io_context::run()
。 对于上面的例子,我们可以把 io.run();
替换为:
// 方式一: // 启动另一个单独线程,执行 io.run(); // main线程依然执行 io.run() std::thread thread1{[&io](){ io.run(); }}; io.run(); thread1.join();
也可以把 io.run();
替换为:
// 方式二: // 启动另两个线程,执行 io.run(); // main线程无需执行 io.run() std::thread thread1{[&io](){ io.run(); }}; std::thread thread2{[&io](){ io.run(); }}; thread1.join(); thread2.join();
上面两种方式中,两个回调函数会在不同的线程中执行,不用相互等待了。但是,如果两个回调函数访问共享资源,这将导致竞争条件。上面例子中就存在竞争条件,因为 print1 和 print2 会“同时”访问 std::out,下面是采用方式二时的输出实例:
print1 start!print2 start! print2 done!print1 done!
或者:
print1 start!print2 start! print1 done! print2 done!
可见,多次运行得到的输出不一样,这是由于存在竞争条件(竞争 std::out 的使用)。
我们可以通过加锁来避免竞争条件,这里介绍另外一种方法: 通过 io_context::strand
对象来实现回调函数的串行化,从而避免竞争条件。
The boost::asio::bind_executor()
function returns a new handler that automatically dispatches its contained handler through the io_context::strand
object. By binding the handlers to the same io_context::strand
, we are ensuring that they cannot execute concurrently.
下面是使用 boost::asio::io_context::strand
对象让回调函数串行化的实例:
#include <iostream> #include <chrono> #include <thread> #include <boost/asio.hpp> void print1(const boost::system::error_code& /*e*/) { std::cout << "print1 start!" << std::endl; std::this_thread::sleep_for(std::chrono::seconds(5)); // sleep 5秒,模拟长时间任务 std::cout << "print1 done!" << std::endl; } void print2(const boost::system::error_code& /*e*/) { std::cout << "print2 start!" << std::endl; std::this_thread::sleep_for(std::chrono::seconds(5)); // sleep 5秒,模拟长时间任务 std::cout << "print2 done!" << std::endl; } int main() { boost::asio::io_context io; boost::asio::steady_timer t1(io, std::chrono::seconds(5)); boost::asio::steady_timer t2(io, std::chrono::seconds(5)); boost::asio::io_context::strand strand1(io); t1.async_wait(boost::asio::bind_executor(strand1, &print1)); t2.async_wait(boost::asio::bind_executor(strand1, &print2)); std::thread thread1{[&io](){ io.run(); }}; std::thread thread2{[&io](){ io.run(); }}; thread1.join(); thread2.join(); return 0; }
上面例子中,由于 print1 和 print2 关联到了同一个 boost::asio::io_context::strand
对象(即 strand1),所以 print1 和 print2 不会并行执行。
上面程序的输出:
print1 start! print1 done! print2 start! print2 done!
2.2. 信号处理
Boost.Asio 提供了类 signal_set
,利用异步 I/O 的方式处理 UNIX 信号。下面是一个实例程序:
1: #include <iostream> 2: #include <boost/asio.hpp> 3: 4: using namespace boost::asio; 5: using namespace boost::system; 6: 7: int main() { 8: io_context io; 9: signal_set sig(io, SIGINT); // 捕获信号SIGINT(Ctrl+C可发送该信号) 10: 11: typedef void (handler_type) (const boost::system::error_code& ec, int n); // 定义回调函数类型 12: std::function<handler_type> handler1 = [] (const boost::system::error_code& ec, int n) { 13: std::cout << "handler1 called" << std::endl; 14: if (ec) { // 检查是否有错误发生 15: std::cout << ec.message() << std::endl; 16: return; 17: } 18: 19: if (n != SIGINT) { // 检查信号是否是要处理的信号 20: return; 21: } 22: 23: std::cout << "handler1 do something" << std::endl; 24: }; 25: 26: sig.async_wait(handler1); 27: 28: io.run(); 29: std::cout << "program finished" << std::endl; 30: return 0; 31: }
编译上面程序,运行后按 Ctrl+C 发送 SIGINT 信号:
$ c++ -std=c++11 -lboost_system sigtest.cpp -o sigtest $ ./sigtest # 按 Ctrl + C ^Chandler1 called handler1 do something program finished
注:上面代码中第 11 行和第 12 行可简写为:
auto handler1 = [] (const boost::system::error_code& ec, int n) {
2.3. 网络 I/O
Boost.Asio 抽象了本地文件、网络等等 I/O 操作,提供了下面函数进行同步或异步读写:
read(stream, buffer [, extra options]) async_read(stream, buffer [, extra options], handler) write(stream, buffer [, extra options]) async_write(stream, buffer [, extra options], handler)
2.3.1. 实例:同步 Daytime 服务器
下面是使用 Boost.Asio 实现同步 Daytime 服务器(迭代服务器,一次只处理一个连接)的例子:
#include <ctime> #include <iostream> #include <string> #include <boost/asio.hpp> using boost::asio::ip::tcp; std::string make_daytime_string() { using namespace std; // For time_t, time and ctime; time_t now = time(0); return ctime(&now); } int main() { try { boost::asio::io_context io_context; tcp::acceptor acceptor(io_context, tcp::endpoint(tcp::v4(), 11113)); // 绑定到tcp端口11113 for (;;) // 属于迭代服务器(一次只处理一个连接)。一旦有客户端连接服务器就发送当前时间 { tcp::socket socket(io_context); // 创建socket对象 acceptor.accept(socket); // 阻塞等待socket连接 std::string message = make_daytime_string(); boost::system::error_code ignored_error; boost::asio::write(socket, boost::asio::buffer(message), ignored_error); // 同步往socket写数据 } } catch (std::exception& e) { std::cerr << e.what() << std::endl; } return 0; }
上面代码和传统的 socket 程序类似,只不过 tcp::acceptor, tcp::socket 等类包装了底层 API,代码更新简单。
运行上面服务器后,可以使用 nc
进行测试:
$ nc localhost 11113 Mon Aug 6 20:53:29 2018
2.3.2. 实例:同步 Daytime 客户端
下面是使用 Boost.Asio 实现同步 Daytime 客户端(功能是连接服务器,输出服务器返回的数据)的例子:
// // client.cpp // ~~~~~~~~~~ // // Copyright (c) 2003-2018 Christopher M. Kohlhoff (chris at kohlhoff dot com) // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #include <iostream> #include <boost/array.hpp> #include <boost/asio.hpp> using boost::asio::ip::tcp; int main(int argc, char* argv[]) { try { if (argc != 3) { std::cerr << "Usage: client <host> <port>" << std::endl; return 1; } boost::asio::io_context io_context; tcp::resolver resolver(io_context); tcp::resolver::results_type endpoints = resolver.resolve(argv[1], argv[2]); tcp::socket socket(io_context); boost::asio::connect(socket, endpoints); for (;;) { boost::array<char, 128> buf; boost::system::error_code error; size_t len = socket.read_some(boost::asio::buffer(buf), error); if (error == boost::asio::error::eof) // 服务器发送完毕后退出for循环 break; // Connection closed cleanly by peer. else if (error) throw boost::system::system_error(error); // Some other error. std::cout.write(buf.data(), len); } } catch (std::exception& e) { std::cerr << e.what() << std::endl; } return 0; }
运行上一节介绍的服务器后,编译运行本节的客户端:
$ c++ -std=c++11 -lboost_system client.cpp -o client $ ./client localhost 11113 Mon Aug 6 21:13:29 2018
2.3.3. 实例:异步 Daytime 服务器
和同步服务器使用 accept
和 write
不同,异步服务器需要使用 async_accept
和 async_write
等函数。完整的例子可参考:Daytime.3 - An asynchronous TCP daytime server