Boost Asio

Table of Contents

1. Boost.Asio 简介

Boost.Asio 是一个 C++实现的跨平台的 I/O 操作库,它可用于 socket 等 I/O 对象的同步或异步操作。 Boost.Asio 基于 Proactor 模式封装了操作系统提供的 select、kqueue、poll/epool 等机制,实现了异步 I/O 模型。

Boost.Asio 的核心类是 boost::asio::io_context (以前叫 io_service),每个 Boost.Asio 程序都至少有一个 boost::asio::io_context 类型的对象,它代表程序到操作系统 I/O 服务之间的“连接”。

在同步模式下,程序发起一个 I/O 操作,向 io_context 提交请求, io_context 把操作转交给操作系统,同步地等待。当 I/O 操作完成时,操作系统通知 io_context ,然后 io_context 再把结果发回给程序,完成整个同步流程。

在异步模式下,程序除了要发起 I/O 操作,还要定义一个用于完成处理后的回调函数。 io_context 同样把 I/O 操作转交给操作系统执行,但它不同步等待,而是立即返回。调用 io_contextrun() 成员函数可以等待异步操作完成,当异步操作完成时 io_context 从操作系统获取结果,再调用之前注册的回调函数。

本文主要参考:https://www.boost.org/doc/libs/1_67_0/doc/html/boost_asio.html

2. Boost.Asio 实例

2.1. 定时器

由于定时器类 boost::asio::steady_timer 比较简单,我们从它开始介绍 Boost.Asio 库的使用。

2.1.1. 同步定时器

下面是同步定时器实例:

#include <iostream>
#include <boost/asio.hpp>

int main()
{
  boost::asio::io_context io;

  boost::asio::steady_timer t(io, std::chrono::seconds(5));  // 5秒过期的定时器
  t.wait();                                // t.wait() 一直等待,直到定时器过期

  std::cout << "Hello, world!" << std::endl;

  return 0;
}

编译运行如下:

$ c++ -std=c++11 -lboost_system timer1.cpp -o timer1
$ ./timer1        # 5秒后输出下面内容
Hello, world!

2.1.2. 异步定时器

下面是异步定时器实例:

#include <iostream>
#include <boost/asio.hpp>

void print(const boost::system::error_code& /*e*/)
{
  std::cout << "Hello, world!" << std::endl;
}

int main()
{
  boost::asio::io_context io;

  boost::asio::steady_timer t(io, std::chrono::seconds(5));
  t.async_wait(&print);                    // 设置定时器过期后的回调函数

  std::cout << "Run here." << std::endl;   // 上一步是async_wait(不是wait),会马上输出这行

  io.run();    // 要使回调函数被执行,必需调用io_context::run(),且Asio库保证仅在调用了
               // io_context::run()的那个线程中,才会执行回调函数。
               // io_context::run()当“没有事情可做”时返回。在这个例子中,等待定时器过期,
               // 且回调函数执行完成后,run()才会返回。

  return 0;
}

编译运行如下:

$ c++ -std=c++11 -lboost_system timer2.cpp -o timer2
$ ./timer2           # 首先输出Run here.等待5秒后再输出Hello, world!
Run here.
Hello, world!

注 1: io_context::run() 意味着开始“阻塞执行事件循环”,它会阻塞等待直到所有注册到 io_context 上的事件完成。只有调用了它,回调函数才可能执行;且执行回调函数的线程就是调用 io_context::run() 那个线程。
注 2:由于函数名会隐式地转换为函数的指针,所以上面代码中 t.async_wait(&print); 也可以写为 t.async_wait(print);
注 3:在 Boost.Asio 库中回调函数不一定要是函数或者函数指针,函数对象、bind/lambda 表达式等可调用物也都可以作为回调函数。

2.1.3. 传参给回调函数

本节的例子和上节例子类似,只是增加了回调函数传递参数的功能。

通过 steady_timer::async_wait() 的参数可以设置回调函数, steady_timer::async_wait() 期待参数是一个函数对象(function object),该函数对象的签名为 void(const boost::system::error_code&) 。下面例子中,print 函数还有两个额外的参数,通过 boost::bind 可以把 print 函数转换为匹配签名的函数对象。

#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>

void print(const boost::system::error_code& /*e*/,
    boost::asio::steady_timer* pt, int* pcount)
{
  if (*pcount < 5)
  {
    std::cout << *pcount << std::endl;
    ++(*pcount);

    pt->expires_at(pt->expiry() + std::chrono::seconds(1));  // 重置过期时间
    pt->async_wait(boost::bind(print,                        // 设置回调,并传参
          boost::asio::placeholders::error, pt, pcount));
  }
}

int main()
{
  boost::asio::io_context io;

  int count = 0;
  boost::asio::steady_timer t(io, std::chrono::seconds(1));     // 定时器,1秒过期
  t.async_wait(boost::bind(print,                               // 设置回调,并传参
        boost::asio::placeholders::error, &t, &count));

  io.run();

  std::cout << "Final count is " << count << std::endl;

  return 0;
}

注: boost::bind 的参数 boost::asio::placeholders::error 是一个占位符,类似于 std::bind 中常使用的 std::placeholders::_1

编译运行如下:

$ c++ -std=c++11 -lboost_system timer3.cpp -o timer3
$ ./timer3            # 隔1秒输出一个数字
0
1
2
3
4
Final count is 5

2.1.4. 回调函数为类非静态成员函数(需要绑定隐含的 this 参数)

本节的例子和上节例子类似,只是回调函数为类成员函数。由于下面例子中 print 是非静态成员函数,所以需要使用 boost::bind 显式地把 this 参数绑定到 print 函数上。

#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>

class printer
{
public:
  printer(boost::asio::io_context& io)
    : timer_(io, std::chrono::seconds(1)),
      count_(0)
  {
    timer_.async_wait(boost::bind(&printer::print, this));    // 需要绑定隐含的this参数
  }

  ~printer()
  {
    std::cout << "Final count is " << count_ << std::endl;
  }

  void print()          // print为非静态成员函数
  {
    if (count_ < 5)
    {
      std::cout << count_ << std::endl;
      ++count_;

      timer_.expires_at(timer_.expiry() + std::chrono::seconds(1));
      timer_.async_wait(boost::bind(&printer::print, this));  // 需要绑定隐含的this参数
    }
  }

private:
  boost::asio::steady_timer timer_;
  int count_;
};

int main()
{
  boost::asio::io_context io;
  printer p(io);
  io.run();

  return 0;
}

编译运行如下:

$ c++ -lboost_system -lboost_chrono timer4.cpp -o timer4
$ ./timer4
0
1
2
3
4
Final count is 5

2.1.5. 同步回调函数(io_context::strand 使用实例)

前面说过,回调函数会在执行 io_context::run() 的那个线程中执行。如果某个回调函数的执行需要较长的时间,则可能会导致其它回调函数得不到及时执行。比如:

#include <iostream>
#include <chrono>
#include <thread>

#include <boost/asio.hpp>

void print1(const boost::system::error_code& /*e*/)
{
  std::cout << "print1 start!" << std::endl;
  std::this_thread::sleep_for(std::chrono::seconds(5));  // sleep 5秒,模拟长时间任务
  std::cout << "print1 done!" << std::endl;
}

void print2(const boost::system::error_code& /*e*/)
{
  std::cout << "print2 start!" << std::endl;
  std::this_thread::sleep_for(std::chrono::seconds(5));  // sleep 5秒,模拟长时间任务
  std::cout << "print2 done!" << std::endl;
}

int main()
{
  boost::asio::io_context io;

  boost::asio::steady_timer t1(io, std::chrono::seconds(5));
  boost::asio::steady_timer t2(io, std::chrono::seconds(5));

  t1.async_wait(&print1);
  t2.async_wait(&print2);

  io.run();

  return 0;
}

在上面程序中,定义了两个定时器 t1 和 t2,分别在超时后执行 print1 和 print2,它们都需要执行较长时间(5秒)。由于仅在 main 线程中执行 io.run(),所以两个回调函数都会在 main 线程中执行,由于只有一个线程,其中一个回调函数不得不等待另一个执行完后再执行。

有一个办法可以让回调函数并发地执行:启多个线程执行 io_context::run() 对于上面的例子,我们可以把 io.run(); 替换为:

// 方式一:
// 启动另一个单独线程,执行 io.run();
// main线程依然执行 io.run()
  std::thread thread1{[&io](){ io.run(); }};

  io.run();

  thread1.join();

也可以把 io.run(); 替换为:

// 方式二:
// 启动另两个线程,执行 io.run();
// main线程无需执行 io.run()
  std::thread thread1{[&io](){ io.run(); }};
  std::thread thread2{[&io](){ io.run(); }};

  thread1.join();
  thread2.join();

上面两种方式中,两个回调函数会在不同的线程中执行,不用相互等待了。但是,如果两个回调函数访问共享资源,这将导致竞争条件。上面例子中就存在竞争条件,因为 print1 和 print2 会“同时”访问 std::out,下面是采用方式二时的输出实例:

print1 start!print2 start!

print2 done!print1 done!

或者:

print1 start!print2 start!

print1 done!
print2 done!

可见,多次运行得到的输出不一样,这是由于存在竞争条件(竞争 std::out 的使用)。

我们可以通过加锁来避免竞争条件,这里介绍另外一种方法: 通过 io_context::strand 对象来实现回调函数的串行化,从而避免竞争条件。

The boost::asio::bind_executor() function returns a new handler that automatically dispatches its contained handler through the io_context::strand object. By binding the handlers to the same io_context::strand, we are ensuring that they cannot execute concurrently.

下面是使用 boost::asio::io_context::strand 对象让回调函数串行化的实例:

#include <iostream>
#include <chrono>
#include <thread>

#include <boost/asio.hpp>

void print1(const boost::system::error_code& /*e*/)
{
  std::cout << "print1 start!" << std::endl;
  std::this_thread::sleep_for(std::chrono::seconds(5));  // sleep 5秒,模拟长时间任务
  std::cout << "print1 done!" << std::endl;
}

void print2(const boost::system::error_code& /*e*/)
{
  std::cout << "print2 start!" << std::endl;
  std::this_thread::sleep_for(std::chrono::seconds(5));  // sleep 5秒,模拟长时间任务
  std::cout << "print2 done!" << std::endl;
}

int main()
{
  boost::asio::io_context io;

  boost::asio::steady_timer t1(io, std::chrono::seconds(5));
  boost::asio::steady_timer t2(io, std::chrono::seconds(5));

  boost::asio::io_context::strand strand1(io);
  t1.async_wait(boost::asio::bind_executor(strand1, &print1));
  t2.async_wait(boost::asio::bind_executor(strand1, &print2));

  std::thread thread1{[&io](){ io.run(); }};
  std::thread thread2{[&io](){ io.run(); }};

  thread1.join();
  thread2.join();

  return 0;
}

上面例子中,由于 print1 和 print2 关联到了同一个 boost::asio::io_context::strand 对象(即 strand1),所以 print1 和 print2 不会并行执行。

上面程序的输出:

print1 start!
print1 done!
print2 start!
print2 done!

2.2. 信号处理

Boost.Asio 提供了类 signal_set ,利用异步 I/O 的方式处理 UNIX 信号。下面是一个实例程序:

 1: #include <iostream>
 2: #include <boost/asio.hpp>
 3: 
 4: using namespace boost::asio;
 5: using namespace boost::system;
 6: 
 7: int main() {
 8:     io_context io;
 9:     signal_set sig(io, SIGINT);     // 捕获信号SIGINT(Ctrl+C可发送该信号)
10: 
11:     typedef void (handler_type) (const boost::system::error_code& ec, int n);  // 定义回调函数类型
12:     std::function<handler_type> handler1 = [] (const boost::system::error_code& ec, int n) {
13:         std::cout << "handler1 called" << std::endl;
14:         if (ec) {             // 检查是否有错误发生
15:             std::cout << ec.message() << std::endl;
16:             return;
17:         }
18: 
19:         if (n != SIGINT) {    // 检查信号是否是要处理的信号
20:             return;
21:         }
22: 
23:         std::cout << "handler1 do something" << std::endl;
24:     };
25: 
26:     sig.async_wait(handler1);
27: 
28:     io.run();
29:     std::cout << "program finished" << std::endl;
30:     return 0;
31: }

编译上面程序,运行后按 Ctrl+C 发送 SIGINT 信号:

$ c++ -std=c++11 -lboost_system sigtest.cpp -o sigtest
$ ./sigtest             # 按 Ctrl + C
^Chandler1 called
handler1 do something
program finished

注:上面代码中第 11 行和第 12 行可简写为:

    auto handler1 = [] (const boost::system::error_code& ec, int n) {

2.3. 网络 I/O

Boost.Asio 抽象了本地文件、网络等等 I/O 操作,提供了下面函数进行同步或异步读写:

read(stream, buffer [, extra options])
async_read(stream, buffer [, extra options], handler)
write(stream, buffer [, extra options])
async_write(stream, buffer [, extra options], handler)

2.3.1. 实例:同步 Daytime 服务器

下面是使用 Boost.Asio 实现同步 Daytime 服务器(迭代服务器,一次只处理一个连接)的例子:

#include <ctime>
#include <iostream>
#include <string>
#include <boost/asio.hpp>

using boost::asio::ip::tcp;

std::string make_daytime_string()
{
  using namespace std; // For time_t, time and ctime;
  time_t now = time(0);
  return ctime(&now);
}

int main()
{
  try
  {
    boost::asio::io_context io_context;

    tcp::acceptor acceptor(io_context, tcp::endpoint(tcp::v4(), 11113));  // 绑定到tcp端口11113

    for (;;)  // 属于迭代服务器(一次只处理一个连接)。一旦有客户端连接服务器就发送当前时间
    {
      tcp::socket socket(io_context);                    // 创建socket对象
      acceptor.accept(socket);                           // 阻塞等待socket连接

      std::string message = make_daytime_string();

      boost::system::error_code ignored_error;
      boost::asio::write(socket, boost::asio::buffer(message), ignored_error);  // 同步往socket写数据
    }
  }
  catch (std::exception& e)
  {
    std::cerr << e.what() << std::endl;
  }

  return 0;
}

上面代码和传统的 socket 程序类似,只不过 tcp::acceptor, tcp::socket 等类包装了底层 API,代码更新简单。

运行上面服务器后,可以使用 nc 进行测试:

$ nc localhost 11113
Mon Aug  6 20:53:29 2018

2.3.2. 实例:同步 Daytime 客户端

下面是使用 Boost.Asio 实现同步 Daytime 客户端(功能是连接服务器,输出服务器返回的数据)的例子:

//
// client.cpp
// ~~~~~~~~~~
//
// Copyright (c) 2003-2018 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#include <iostream>
#include <boost/array.hpp>
#include <boost/asio.hpp>

using boost::asio::ip::tcp;

int main(int argc, char* argv[])
{
  try
  {
    if (argc != 3)
    {
      std::cerr << "Usage: client <host> <port>" << std::endl;
      return 1;
    }

    boost::asio::io_context io_context;

    tcp::resolver resolver(io_context);
    tcp::resolver::results_type endpoints = resolver.resolve(argv[1], argv[2]);

    tcp::socket socket(io_context);
    boost::asio::connect(socket, endpoints);

    for (;;)
    {
      boost::array<char, 128> buf;
      boost::system::error_code error;

      size_t len = socket.read_some(boost::asio::buffer(buf), error);

      if (error == boost::asio::error::eof)  // 服务器发送完毕后退出for循环
        break; // Connection closed cleanly by peer.
      else if (error)
        throw boost::system::system_error(error); // Some other error.

      std::cout.write(buf.data(), len);
    }
  }
  catch (std::exception& e)
  {
    std::cerr << e.what() << std::endl;
  }

  return 0;
}

运行上一节介绍的服务器后,编译运行本节的客户端:

$ c++ -std=c++11 -lboost_system client.cpp -o client
$ ./client localhost 11113
Mon Aug  6 21:13:29 2018

2.3.3. 实例:异步 Daytime 服务器

和同步服务器使用 acceptwrite 不同,异步服务器需要使用 async_acceptasync_write 等函数。完整的例子可参考:Daytime.3 - An asynchronous TCP daytime server

Author: cig01

Created: <2018-08-04 Sat>

Last updated: <2018-08-17 Fri>

Creator: Emacs 27.1 (Org mode 9.4)