Timer.5 在多线程中同步 Completion Handler

Timer.5——在多线程中同步completion handler #

原文链接 | 源码链接

本教程演示如何使用 strand 类模板在多线程程序中同步 completion handlers 。

前面的四个教程通过仅在一个线程中调用 io_context::run() 函数来避免 handler 同步问题。正如前面所说,asio 库保证仅在当前调用 io_context::run() 的线程中调用 completion handlers。因此,仅在一个线程中调用 io_context::run() 可确保 completion handlers 不会并发执行。

使用 asio 开发应用程序时,单线程方法通常是最好的起点。缺点是它对程序(尤其是服务器)施加了限制,包括:

  • 当 handlers 可能需要很长时间才能完成时,响应速度很差。
  • 无法在多处理器系统上扩展。

如果你遇到了这些限制,另一种方法是使用一个调用 io_context::run() 的线程池。然而,由于这允许 handlers 并发执行,当 handlers 可能访问共享的、线程不安全的资源时,我们需要一种同步方法。

#include <iostream>
#include <boost/asio.hpp>
#include <boost/thread/thread.hpp>
#include <boost/bind/bind.hpp>

我们首先定义一个名为 printer 的类,类似于上一个教程中的类。这个类通过并行运行两个定时器来扩展上一教程。

class printer
{
public:

除了初始化一对 boost::asio::steady_timer 成员之外,构造函数还初始化了 strand_ 成员,它是一个 boost::asio::strand<boost::asio::io_context::executor_type> 类型的对象。

strand 类模板是一个执行器适配器(executor adapter),它保证在分配 handlers 时,正在执行的 handler 将在下一个 handler 启动之前完成。无论调用 io_context::run() 的线程有多少个,都能保证这一点。当然,handlers 仍然可以和其它不是通过 strand 分配或通过不同 strand 对象分配的 handler 同时执行。

  printer(boost::asio::io_context& io)
    : strand_(boost::asio::make_strand(io)),
      timer1_(io, boost::asio::chrono::seconds(1)),
      timer2_(io, boost::asio::chrono::seconds(1)),
      count_(0)
  {

当启动异步操作时,每个 completion handler 都“绑定”到 boost::asio::strand<boost::asio::io_context::executor_type> 对象。 boost::asio::bind_executor() 函数返回一个新的 handler ,该 handler 通过 strand 对象自动分派其包含的 handler。通过将 handlers 绑定到同一个 strand,我们确保它们不能同时执行。

    timer1_.async_wait(boost::asio::bind_executor(strand_,
          boost::bind(&printer::print1, this)));

    timer2_.async_wait(boost::asio::bind_executor(strand_,
          boost::bind(&printer::print2, this)));
  }

  ~printer()
  {
    std::cout << "Final count is " << count_ << std::endl;
  }

在多线程程序中,如果异步操作的 handler 访问共享资源,则应同步它们。在本教程中,handlers(print1 和 print2)使用的共享资源是 std::cout 和 count_ 数据成员。

  void print1()
  {
    if (count_ < 10)
    {
      std::cout << "Timer 1: " << count_ << std::endl;
      ++count_;

      timer1_.expires_at(timer1_.expiry() + boost::asio::chrono::seconds(1));

      timer1_.async_wait(boost::asio::bind_executor(strand_,
            boost::bind(&printer::print1, this)));
    }
  }

  void print2()
  {
    if (count_ < 10)
    {
      std::cout << "Timer 2: " << count_ << std::endl;
      ++count_;

      timer2_.expires_at(timer2_.expiry() + boost::asio::chrono::seconds(1));

      timer2_.async_wait(boost::asio::bind_executor(strand_,
            boost::bind(&printer::print2, this)));
    }
  }

private:
  boost::asio::strand<boost::asio::io_context::executor_type> strand_;
  boost::asio::steady_timer timer1_;
  boost::asio::steady_timer timer2_;
  int count_;
};

main 函数现在导致从两个线程调用 io_context::run():主线程和一个附加线程。这是使用 boost::thread 对象来完成的。

就像从单个线程调用一样,对 io_context::run() 的并发调用将在还有“工作”要做时继续执行。在所有异步操作完成之前,后台线程不会退出。

main 函数现在使得 io_context::run() 被两个线程被调用:主线程和一个附加线程。这是使用 boost::thread 对象来实现的。

就像它在单个线程中的调用一样,当还有“工作”要做时,对 io_context::run() 的并发调用将会持续进行。在所有异步操作完成之前,后台线程不会退出。

int main()
{
  boost::asio::io_context io;
  printer p(io);
  boost::thread t(boost::bind(&boost::asio::io_context::run, &io));
  io.run();
  t.join();

  return 0;
}