C++ Concurrency

Table of Contents

1. C++ 并发简介

C++11 中对并发添加了语言级别的支持,如增加了 std::thread, std::future, std::promise 等设施。C++14/C++17 中对并发的支持进一步完善。

1.1. Hello Concurrent World

下面是一个简单的 C++ 并发程序:

#include <iostream>
#include <thread>
void hello()
{
  std::cout << "Hello Concurrent World\n";
}

int main()
{
  std::thread t(hello);  // 创建一个新线程 t,t 将执行函数 hello
  t.join();              // 等待线程 t,防止 main 比 t 先执行结束
}

2. 管理线程

2.1. 启动线程

构造 std::thread 对象可以启动新线程。如:

void do_some_work();
std::thread my_thread(do_some_work);  // do_some_work 是普通函数

std::thread 也可以通过有函数操作符的类的实例进行构造,如:

class background_task
{
public:
  void operator()() const
  {
    do_something();
    do_something_else();
  }
};

background_task f;
std::thread my_thread(f);             // f 是具有 operator() 的类的实例

上面代码中,提供的函数对象 f 会复制到新线程的存储空间中,函数对象的执行和调用都在线程的内存空间中进行。

需要注意的是,当把函数对象传入到线程构造函数中时,需要避免 C++’s most vexing parse 问题。如果你传递了一个临时变量,而不是一个命名的变量。C++ 编译器会将其解析为函数声明,而不是类型对象的定义。

std::thread my_thread(background_task());     // C++ 编译器认为这里声明了一个名为 my_thread 的函数

那怎么用临时变量构造 std::thread 对象呢?有两种方式,一是多加一对小括号;二是使用统一的初始化语法。如:

std::thread my_thread((background_task()));   // 解决 most vexing parse 的方法一:多加一对小括号
std::thread my_thread{background_task()};     // 解决 most vexing parse 的方法二:使用统一的初始化语法

除此外,使用 Lambda 表达式也能避免这个问题:

std::thread my_thread([]{                     // 使用Lambda 表达式构造 std::thread 对象
  do_something();
  do_something_else();
});

2.1.1. 不要用访问局部变量的函数去创建线程

考虑下面例子:

struct func
{
  int& i;
  func(int& i_) : i(i_) {}
  void operator() ()
  {
    for (unsigned j=0 ; j<1000000 ; ++j)
    {
      do_something(i);            // 潜在访问隐患:空引用
    }
  }
};

void oops()
{
  int some_local_state=0;         // 局部变量
  func my_func(some_local_state);
  std::thread my_thread(my_func);
  my_thread.detach();             // 不等待线程结束
}                                 // 新线程可能还在运行

上面代码中,函数 oops() 中创建了线程 my_thread ,这个新线程执行 my_func ,但 my_func 使用了局部变量 some_local_state

如果函数 oops() 结束,那么局部变量 some_local_state 会被销毁;那么线程 my_thread 还访问这个局部变量就会出问题。要解决这个问题,可以 1、在 oops() 函数中通过调用 join() 等待线程 my_thread ;2、把局部变量 some_local_state 改为全局变量;3、修改 func 的定义,把 i 从引用变量改为普通变量。

2.1.2. 要么 detach,要么 join

线程启动后要决定“等待线程结束”(调用 join),还是“让其自主运行”(调用 detach)。 如果 std::thread 对象在销毁之前还没有做出决定,程序就会终止, std::thread 的析构函数会调用 std::terminate()

比如,下面程序由于线程 t 在其销毁之间,既没有调用 join,也没有调用 detach,会导致程序 terminate:

#include <iostream>
#include <thread>
void hello()
{
  std::cout << "Hello Concurrent World\n";
}

int main()
{
  std::thread t(hello);    // 线程 t 在销毁之前还没有调用 detach,也没有调用 join,程序会 terminate
}

2.1.3. 确保线程的 join 方法被调用

假设我们想在函数 f 结束之间调用 t.join() ,可以这样做:

void f()
{
  int some_local_state=0;
  func my_func(some_local_state);
  std::thread t(my_func);
  try
  {
    do_something_in_current_thread();
  }
  catch(...)
  {
    t.join();    // 1 这一处容易忘记,确保 do_something_in_current_thread 有异常时也调用 t.join()
    throw;
  }
  t.join();      // 2
}

要写两个 t.join() ,这多少有些繁琐,下节将介绍使用 RAII 确保线程的 join 方法被调用。

2.1.4. 使用 RAII 确保线程的 join 方法被调用(thread_guard)

下面介绍使用“资源获取即初始化方式”(RAII,Resource Acquisition Is Initialization)来确保线程的 join 方法被调用。即,提供一个类,在析构函数中使用 join()。

下面 thread_guard 类可以帮忙我们调用线程的 join 方法:

class thread_guard
{
  std::thread& t;
public:
  explicit thread_guard(std::thread& t_):
    t(t_)
  {}
  ~thread_guard()
  {
    if(t.joinable())
    {
      t.join();
    }
  }
  thread_guard(thread_guard const&)=delete;
  thread_guard& operator=(thread_guard const&)=delete;
};

thread_guard 中拷贝构造函数和拷贝赋值操作标记为 =delete ,是为了不让编译器自动生成。直接对 thread_guard 对象进行拷贝或赋值是很危险的,因为这可能会弄丢已汇入的线程。通过删除声明,任何尝试给 thread_guard 对象赋值的操作都会引发一个编译错误。

下面是 thread_guard 的使用例子:

struct func
{
  int& i;
  func(int& i_) : i(i_) {}
  void operator() ()
  {
    for (unsigned j=0 ; j<1000000 ; ++j)
    {
      do_something(i);
    }
  }
};

void f()
{
  int some_local_state=0;
  func my_func(some_local_state);
  std::thread t(my_func);
  thread_guard g(t);               // 使用 thread_guard 确保 t 的 join 一定会被调用,防止程序 terminate
  do_something_in_current_thread();
};

2.2. 启动线程时传递参数

在启动线程时,可以指定线程所关联函数的参数,如:

void f(int i, std::string const& s);

std::thread t(f, 3, "hello");         // 第一个参数为线程所关联函数,其余参数为函数的参数

在上面这个例子中,需要说明的是 f 的第 2 个参数是 std::string 类型,而我们传递进去的是 char const * 类型,这里有个类型转换, 这个类型转换是在“新线程的上下文”中进行的。 我们考虑一个危险的例子:

// 下面代码有问题!
void f(int i, std::string const& s);  // f 第二个参数是 std::string

void oops(int some_param)
{
  char buffer[1024];
  sprintf(buffer, "%i", some_param);
  std::thread t(f, 3, buffer);        // 传给 f 的第二参数是 char const *
  t.detach();
}

由于 buffer 转换为 std::string 这个操作是在“新线程的上下文”中进行的,这里就有问题了:函数 oops 可能会在 buffer 转换成 std::string 之前就结束,这时新线程再去访问 buffer 就有问题了( buffer 已经被释放)。这个问题的解决方案就是在传递到 std::thread 构造函数之前,就将字面值转化为 std::string ,如:

void f(int i, std::string const& s);

void not_oops(int some_param)
{
  char buffer[1024];
  sprintf(buffer, "%i", some_param);
  std::thread t(f, 3, std::string(buffer)); // 提前用 std::string,避免在“新线程的上下文”进行类型转换时 buffer 可能失效
  t.detach();
}

2.2.1. 启动线程时传递引用类型的参数

在启动线程时,如果所关联的函数的参数是引用类型,则传参时需要使用 std::ref

考虑下面例子:

#include <iostream>
#include <thread>

void func1(int& n) { ++n; }     // func1 的参数是非常量“引用”类型

int main()
{
  int i = 1;
  std::thread t(func1, i);      // 这里编译时会出错
  t.join();
  std::cout << i;
}

std::thread 的构造函数无视函数参数类型,盲目地拷贝已提供的变量。不过,内部代码会将拷贝的参数以右值的方式进行传递,这是为了那些只支持移动的类型,而后会尝试以右值为实参调用函数 func1 。但因为函数 func1 期望的是一个非常量引用作为参数(而非右值),所以会在编译时出错。这个问题解决办法很简单:使用 std::ref 将参数转换成引用的形式,即:

#include <iostream>
#include <thread>

void func1(int& n) { ++n; }           // func1 的参数是非常量“引用”

int main()
{
  int i = 1;
  std::thread t(func1, std::ref(i));  // func1 参数是引用,使用 std::ref(i)
  t.join();
  std::cout << i;                     // 输出 2
}

2.2.2. 启动线程时指定对象成员函数

在启动线程时,也可以指定对象的成员函数。指定方式如下:

class X
{
public:
  void do_lengthy_work(int);
};

X my_x;
int num(0);
std::thread t(&X::do_lengthy_work, &my_x, num);  // 第 1 个参数是对象成员函数,第 2 个参数是对象地址,第 3 个参数是成员函数参数

2.2.3. 参数是 move-only 对象

在启动线程时,如果所关联的函数的参数是 move-only 对象,则需要使用 std::move

#include <iostream>
#include <thread>

void func1(std::unique_ptr<int> p)
{
  std::cout << *p;
}

int main()
{
  std::unique_ptr<int> p(new int(100));   // int(100) 的所有权从 p 转移到线程里,再转移给 func1
  std::thread t(func1, std::move(p));
  t.join();
}

2.3. 转移所有权

C++ 标准库中有很多资源占有(resource-owning)类型,比如 std::ifstream,std::unique_ptr 还有 std::thread 都是可移动,但不可复制。

执行线程的所有权可以在 std::thread 实例中移动,下面将展示一个例子。例子中,创建了两个执行线程,并在 std::thread 实例之间(t1,t2 和 t3)转移所有权:

#include <iostream>
#include <thread>
#include <chrono>

void some_function() {std::cout << "this is some_function\n";}
void some_other_function() {std::cout << "this is some_other_function\n";}

int main() {
    std::thread t1(some_function);
    std::thread t2=std::move(t1);         // 把 t1 关联的执行线程的所有权转移给 t2
    t1=std::thread(some_other_function);  // 临时std::thread对象启动,转换给 t1。执行线程所有者是临时对象,不用显式的调用 std::move()
    std::thread t3;                       // 启动 t3,但没有与任何线程进行关联
    t3=std::move(t2);                     // 将 t2 的执行线程的所有权转移到 t3 中,t2 是一个命名对象,需要显式的调用 std::move()
    // t1=std::move(t3);                  // 这行会报错。因为 t1 已经有了一个关联的线程。

    t1.detach();
    // t2.detach();                       // 这行会报错。因为 t2 已经没有关联的线程了。
    t3.detach();

    std::this_thread::sleep_for(std::chrono::seconds(2));
}

2.3.1. 函数返回 std::thread 对象

std::thread 支持移动,线程的所有权可以在函数外进行转移,就如下面程序一样。

void func1(int i) { std::cout << i; }

std::thread g()                    // 函数 g 返回 std::thread 对象
{
  return std::thread(func1, 100);
}

int main()
{
  std::thread t{g()};
  t.join();
}

2.3.2. 参数为 std::thread 对象

下面介绍函数参数为 std::thread 对象的情况:

void f(std::thread t);            // f 的参数为 std::thread

void g()
{
  f(std::thread(some_function));  // 调用 f,由于是临时对象,可以省略 std::move

  std::thread t(some_function);
  f(std::move(t));                // 调用 f,必须用 std::move
}

2.3.3. scoped_thread(thread_guard 的改进)

当某个对象转移了线程的所有权,就不能对线程进行 join 或 detach 了(参见节 2.3)。这导致在节 2.1.4 中介绍的类 thread_guard 中存在一个问题:代码 thread_guard g(t); 中,如果 t 对象所持有的线程被转移给了其它对象,那么 thread_guard 的析构函数就会出问题。

利用 std::thread 支持移动的特性可以解决上面的问题。我们定义另外一个类 scoped_thread ,它和节 2.1.4 中介绍的类 thread_guard 的一个重要区别在于: scoped_thread 类拥有线程的所有权:

class scoped_thread               // 确保 t 调用 join(拥有 t 的所有权,不会出现调用 join 时 t 所有权已转移的情况)
{
  std::thread t;
public:
  explicit scoped_thread(std::thread t_):
    t(std::move(t_))
  {
    if(!t.joinable())
      throw std::logic_error("No thread");
  }
  ~scoped_thread()
  {
    t.join();
  }
  scoped_thread(scoped_thread const&)=delete;
  scoped_thread& operator=(scoped_thread const&)=delete;
};

struct func;

void f()
{
  int some_local_state;
  scoped_thread t(std::thread(func(some_local_state)));
  do_something_in_current_thread();
}

C++17 标准给出一个建议,就是添加一个 joining_thread 的类型,这个类型与 std::thread 类似,不同是的添加了析构函数,就类似于 scoped_thread 。委员会成员们对此并没有达成统一共识,所以这个类没有添加入 C++17 标准中(C++20 仍旧对这种方式进行探讨,不过名称为 std::jthread )。

2.4. 确定线程数量

std::thread::hardware_concurrency() 会返回并发线程的数量。例如,多核系统中,返回值可以是 CPU 核芯的数量;当无法获取时,函数返回 0。如:

#include <iostream>
#include <thread>

int main()
{
  std::cout << std::thread::hardware_concurrency();
}

2.5. 标识一个线程

在当前线程中调用 std::this_thread::get_id() 可以得到线程的标识,这个标识是 std::thread::id 类型。

#include <iostream>
#include <thread>

int main()
{
  std::cout << std::this_thread::get_id() << std::endl; // 输出实例 0x11b703d40

  std::thread t;
  std::cout << t.get_id() << std::endl;                 // 输出 0x0,t 还没有关联真正线程
}

3. 线程之间共享数据

3.1. 使用互斥量(std::lock_guard,std::scoped_lock)

通过实例化 std::mutex 创建互斥量实例,成员函数 lock() 可对互斥量上锁, unlock() 为解锁。不过,不推荐直接去调用成员函数,调用成员函数就意味着,必须在每个函数出口都要去调用 unlock()C++ 标准库为互斥量提供了 RAII 模板类 std::lock_guard ,在构造时就能提供已锁的互斥量,并在析构时进行解锁,从而保证了互斥量能被正确解锁。

下面的代码中,展示了如何在多线程应用中,使用 std::mutex 构造的 std::lock_guard 实例,对列表进行访问保护:

#include <list>
#include <mutex>
#include <algorithm>

std::list<int> some_list;
std::mutex some_mutex;       // some_mutex 将保护对 some_list 的访问

void add_to_list(int new_value)
{
  std::lock_guard<std::mutex> guard(some_mutex);  // 构造 guard 时,对 some_mutex 加锁;析构时对 some_mutex 解锁
  some_list.push_back(new_value);
}

bool list_contains(int value_to_find)
{
  std::lock_guard<std::mutex> guard(some_mutex);  // 构造 guard 时,对 some_mutex 加锁;析构时对 some_mutex 解锁
  return std::find(some_list.begin(),some_list.end(),value_to_find) != some_list.end();
}

C++17 中添加了一个新特性,称为“模板类参数推导”,类似 std::lock_guard 这样简单的模板类型,其模板参数列表可以省略。所有下面两个等价:

std::lock_guard<std::mutex> guard(some_mutex);
std::lock_guard guard(some_mutex);              // 上一行的简写形式,C++17 支持“模板类参数”推导

C++17 提供了加强版的 std::scoped_lock ,它可以接受多个 std::mutex 为参数,可完全取代 std::lock_guard

所以,在 C++17 的环境下,上面的代码也可以写成:

std::scoped_lock guard(some_mutex);             // C++17 中引入了 std::scoped_lock

3.1.1. 接口间的竞争条件

我们知道栈 std::stack 有下面五个方法:

push() 让一个新元素进栈
pop() 让一个元素出栈,但不会返回栈顶元素
top() 查看栈顶元素
empty() 判断栈是否是空栈
size() 返回栈中有多少个元素

假设上面的每个方法,我们在内部使用了互斥量进行保护;比如,多个线程同时调用 push() 也不会出错。那么这是否意味着对它们的使用就是线程安全的呢?答案是否定的。

考虑下面场景,使用下面代码可以从栈中取出一个元素:

stack<int> s;
//......

int const value = s.top();
s.pop();
do_something(value);

这个代码在单线程情况下可以正确工作;但在多线程的情况下,则有竞争条件。假设栈里有两个元素 1,2(元素 1 在栈底,元素 2 在栈顶),考虑表 1 所示场景:

Table 1: 竞争条件
Thread A Thread B
int const value = s.top(); // 取栈顶元素 2  
  int const value = s.top(); // 取栈顶元素 2
s.pop(); // 弹出 2  
do_something(value); // 处理元素 2 s.pop(); // 弹出 1
  do_something(value); // 处理元素 2

这导致一个不期望的情况:栈顶的元素 2 分别被线程 A 和线程 B 处理了,也就是处理了两次!而元素 1 没有被处理,但也从出栈了。

你可能会问:为什么不直接让 pop 返回栈顶元素。原因在于,假设有一个 stack<vector<int>>,拷贝 vector 时需要在堆上分配内存,当这个系统处在重度负荷,或有严重的资源限制的情况下,这种内存分配就会失败,所以 vector 的拷贝构造函数可能会抛出一个 std::bad_alloc 异常。如果 pop 可以返回栈顶元素值,返回一定是最后执行的语句,stack 在返回前已经弹出了元素,但如果拷贝返回值时抛出异常,就会导致弹出的数据丢失(从栈上移除了,但拷贝失败了)。 这是 std::stack 的设计者将这个操作分解为 top 和 pop 两个方法的原因,但这却造成了多线程环境下的竞争条件。

下面介绍几种方式可以解决这个问题。

方式 1:pop() 传入一个参数(往往是引用)获取“弹出值”。

std::vector<int> result;
some_stack.pop(result);

方式 1 是有局限的:需要构造出一个栈中类型的实例,用于接收目标值。对于一些类型,这样做是不现实的,因为临时构造一个实例,从时间和资源的角度上来看都不划算。对于其他的类型,这样也不总行得通,因为构造函数需要的参数,在这个阶段不一定可用。最后,需要可赋值的存储类型,这是一个重大限制:即使支持移动构造,甚至是拷贝构造(从而允许返回一个值),很多用户自定义类型可能都不支持赋值操作。

方式 2:pop() 内部使用不抛出异常的拷贝构造函数或移动构造函数。这种方式过于局限,抛异常的构造函数还是更常见的,这些类型也希望能存入到栈中。

方式 3:pop() 返回指向“弹出值”的指针。
返回一个指向弹出元素的指针,而不是直接返回值。指针的优势是自由拷贝,并且不会产生异常。对于这个方案,返回值使用 std::shared_ptr 是个不错的选择,不仅能避免内存泄露(因为当对象中指针销毁时,对象也会被销毁),而且标准库能够完全控制内存分配方案。这种方式的缺点就是返回指针需要对对象的内存分配进行管理,对于简单数据类型(比如 int),内存管理的开销要远大于直接返回值。

3.1.1.1. 实现线程安全的栈

下面实现一个线程安全的栈,采用了前面介绍的方式 1 和方式 3:

#include <exception>
#include <memory>
#include <mutex>
#include <stack>

struct empty_stack: std::exception
{
  const char* what() const throw() {
    return "empty stack!";
  };
};

template<typename T>
class threadsafe_stack
{
private:
  std::stack<T> data;
  mutable std::mutex m;

public:
  threadsafe_stack(): data(std::stack<T>()){}

  threadsafe_stack(const threadsafe_stack& other)
  {
    std::lock_guard<std::mutex> lock(other.m);
    data = other.data;
  }

  threadsafe_stack& operator=(const threadsafe_stack&) = delete;

  void push(T new_value)
  {
    std::lock_guard<std::mutex> lock(m);
    data.push(new_value);
  }

  std::shared_ptr<T> pop()              // 方式3,返回指向“弹出值”的指针
  {
    std::lock_guard<std::mutex> lock(m);
    if(data.empty()) throw empty_stack();

    std::shared_ptr<T> const res(std::make_shared<T>(data.top()));
    data.pop();
    return res;
  }

  void pop(T& value)                    // 方式 1,传入一个引用,接收“弹出值”
  {
    std::lock_guard<std::mutex> lock(m);
    if(data.empty()) throw empty_stack();

    value=data.top();
    data.pop();
  }

  bool empty() const
  {
    std::lock_guard<std::mutex> lock(m);
    return data.empty();
  }
};

3.1.2. 死锁(std::lock,std::scoped_lock)

避免死锁的一般建议,就是让两个互斥量以相同的顺序上锁:比如,总在互斥量 B 之前锁住互斥量 A,就永远不会死锁。

不过,有些情况下,“对多个互斥量以相同的顺序上锁”,这操作起来很困难。

考虑一个操作对同一个类的两个不同实例进行数据的 swap() 操作,为了保证数据交换操作的正确性,就要避免并发修改数据,并确保每个实例上的互斥量都能锁住自己要保护的区域。这种情况,应该按什么顺序加锁呢?

 1: class some_big_object;
 2: void swap(some_big_object& lhs,some_big_object& rhs);
 3: class X
 4: {
 5: private:
 6:   some_big_object some_detail;
 7:   std::mutex m;
 8: public:
 9:   X(some_big_object const& sd):some_detail(sd){}
10: 
11:   friend void swap(X& lhs, X& rhs)
12:   {
13:     if(&lhs==&rhs)
14:       return;
15: 
16:     // 我们需要对 lhs.m 和 rhs.m 进行加锁,但应该以什么顺序加锁呢?
17:     // 下面是先对 lhs.m 加锁,再对 rhs.m 加锁
18:     // 可能死锁!
19:     std::lock_guard guard1(lhs.m);
20:     std::lock_guard<std::mutex> guard2(rhs.m);
21: 
22:     swap(lhs.some_detail,rhs.some_detail);
23:   }
24: };

上面代码是有问题的。两个线程试图在相同的两个实例间进行数据交换时,程序会死锁!

很幸运,C++ 标准库有办法解决这个问题, std::lock ,它可以一次性锁住多个(两个以上)的互斥量,并且不会有死锁风险。也就是改为下面代码:

class some_big_object;
void swap(some_big_object& lhs,some_big_object& rhs);
class X
{
private:
  some_big_object some_detail;
  std::mutex m;
public:
  X(some_big_object const& sd):some_detail(sd){}

  friend void swap(X& lhs, X& rhs)
  {
    if(&lhs==&rhs)
      return;

    std::lock(lhs.m,rhs.m);     // std::lock() 会锁住两个互斥量,且不会死锁
    std::lock_guard<std::mutex> lock_a(lhs.m, std::adopt_lock);
    std::lock_guard<std::mutex> lock_b(rhs.m, std::adopt_lock);
    swap(lhs.some_detail,rhs.some_detail);
  }
};

其中, std::adopt_lock 参数除了表示 std::lock_guard 可获取锁之外,还表示将锁交由 std::lock_guard 管理。

C++17 提供了 std::scoped_lock<> 这种 RAII 模板类型,它与 std::lock_guard<> 的功能相同,但它可以接受多个互斥量,而且不会死锁。所以上面的 swap() 函数还可以写为:

void swap(X& lhs, X& rhs)
{
  if(&lhs==&rhs)
    return;
  std::scoped_lock guard(lhs.m,rhs.m);        // 使用 std::scoped_lock 更简单
  swap(lhs.some_detail,rhs.some_detail);
}

3.1.3. 灵活的 std::unique_lock

std::unique_lock 是个类模板,它支持很多使用场景。

3.1.3.1. lock

下面是使用 std::unique_lock 对互斥量上锁的例子:

#include <mutex>
#include <thread>
#include <iostream>
#include <vector>
#include <chrono>

int main()
{
    int counter = 0;
    std::mutex counter_mutex;
    std::vector<std::thread> threads;

    auto worker_task = [&](int id) {
        // don't hold the lock while we simulate an expensive operation
        std::this_thread::sleep_for(std::chrono::seconds(1));

        std::unique_lock<std::mutex> lock(counter_mutex);             // 对互斥量上锁
        ++counter;
        std::cout << id << ", initial counter: " << counter << '\n';
        lock.unlock();                                                // 对互斥量解锁

        // don't hold the lock while we simulate an expensive operation
        std::this_thread::sleep_for(std::chrono::seconds(1));

        lock.lock();                                                  // 对互斥量上锁
        ++counter;
        std::cout << id << ", final counter: " << counter << '\n';
    };                                                                // 超过 scope,会对互斥量解锁

    for (int i = 0; i < 10; ++i) threads.emplace_back(worker_task, i);

    for (auto &thread : threads) thread.join();
}

如果在创建 std::unique_lock 对象时指定 std::defer_lock ,则表示暂时不上锁,如上面代码也可以写为:

#include <mutex>
#include <thread>
#include <iostream>
#include <vector>
#include <chrono>

int main()
{
    int counter = 0;
    std::mutex counter_mutex;
    std::vector<std::thread> threads;

    auto worker_task = [&](int id) {
        std::unique_lock<std::mutex> lock(counter_mutex, std::defer_lock); // std::defer_lock 表示暂时不上锁

        // don't hold the lock while we simulate an expensive operation
        std::this_thread::sleep_for(std::chrono::seconds(1));

        lock.lock();                                                  // 对互斥量上锁
        ++counter;
        std::cout << id << ", initial counter: " << counter << '\n';
        lock.unlock();                                                // 对互斥量解锁

        // don't hold the lock while we simulate an expensive operation
        std::this_thread::sleep_for(std::chrono::seconds(1));

        lock.lock();                                                  // 对互斥量上锁
        ++counter;
        std::cout << id << ", final counter: " << counter << '\n';
    };                                                                // 超过 scope,会对互斥量解锁

    for (int i = 0; i < 10; ++i) threads.emplace_back(worker_task, i);

    for (auto &thread : threads) thread.join();
}
3.1.3.2. try lock

std::unique_lock 的 try lock 场景,有两种使用方式。

方式 1:创建 std::unique_lock 对象时,指定 std::try_to_lock ,如:

std::unique_lock<std::mutex> lock(_mutex, std::try_to_lock); // 创建 std::unique_lock 对象时,指定 std::try_to_lock
if(lock.owns_lock()){                                        // owns_lock() 测试 try_to_lock 是否成功
  // mutex was locked.
} else {
  // mutex wasn't locked.
}

方式 2:创建 std::unique_lock 对象时不加锁,调用 try_lock() 方法尝试加锁,如:

std::unique_lock<std::mutex> lock(mtx, std::defer_lock);     // 创建 std::unique_lock 对象时,先不上锁
if(lock.try_lock()) {                                        // 使用 try_lock() 尝试上锁
  // mutex was locked.
} else {
  // mutex wasn't locked.
}
3.1.3.3. 带有超时的 try lock

try_lock_fortry_lock_until 都是带有超时的 try lock,它们的区别在于 try_lock_for 需要指定“相对时间”,而 try_lock_until 需要指定“绝对时间”。下面是 try_lock_for 的使用例子:

// unique_lock::try_lock_for example
#include <iostream>       // std::cout
#include <chrono>         // std::chrono::milliseconds
#include <thread>         // std::thread
#include <mutex>          // std::timed_mutex, std::unique_lock, std::defer_lock

std::timed_mutex mtx;

void fireworks () {
  std::unique_lock<std::timed_mutex> lck(mtx,std::defer_lock);
  // waiting to get a lock: each thread prints "-" every 200ms:
  while (!lck.try_lock_for(std::chrono::milliseconds(200))) {
    std::cout << "-";
  }
  // got a lock! - wait for 1s, then this thread prints "*"
  std::this_thread::sleep_for(std::chrono::milliseconds(1000));
  std::cout << "*\n";
}

int main ()
{
  std::thread threads[10];
  // spawn 10 threads:
  for (int i=0; i<10; ++i)
    threads[i] = std::thread(fireworks);

  for (auto& th : threads) th.join();

  return 0;
}
3.1.3.4. 转移互斥量所有权

std::unique_lock 实例中的互斥量的所有权可以通过移动操作,在不同的实例中进行传递。

std::unique_lock<std::mutex> get_lock()
{
  extern std::mutex some_mutex;
  std::unique_lock<std::mutex> lk(some_mutex);    // 加锁
  prepare_data();
  return lk;                  // lk是自动变量,不需要调用std::move(),编译器负责调用移动构造函数
}

void process_data()
{
  std::unique_lock<std::mutex> lk(get_lock());    // 得到了 get_lock() 中的互斥量的所有权
  do_something();
}                             // 超过 scope,解锁

3.2. 保护共享数据的其它设施

互斥量是一种通用的机制,但其并非保护共享数据的唯一方式。有很多方式可以在特定情况下,对共享数据提供合适的保护。

3.2.1. 在初始化过程中保护数据

假设有一个共享源,其初始化的代价很昂贵,它可能会打开一个数据库连接或分配出很多的内存。

下面是单线程代码:

std::shared_ptr<some_resource> resource_ptr;

void foo()
{
  if(!resource_ptr)                        // 检测没有初始化,就初始化
  {
    resource_ptr.reset(new some_resource); // 初始化
  }
  resource_ptr->do_something();
}

如果要改为多线程安全的代码,则可以这样:

std::shared_ptr<some_resource> resource_ptr;
std::mutex resource_mutex;

void foo()
{
  std::unique_lock<std::mutex> lk(resource_mutex);  // 所有线程在此序列化
  if(!resource_ptr)
  {
    resource_ptr.reset(new some_resource);
  }
  lk.unlock();

  resource_ptr->do_something();
}

有人可能会用“双重检查锁模式”来避免每次获得锁:

// 下面是错误的实现,有竞争条件
void undefined_behaviour_with_double_checked_locking()
{
  if(!resource_ptr)           // 一次检查
  {
    std::lock_guard<std::mutex> lk(resource_mutex);
    if(!resource_ptr)         // 二次检查
    {
      resource_ptr.reset(new some_resource);
    }
  }
  resource_ptr->do_something();
}

不过,上面的实现是存在竞争条件的。

假设有两个线程 A 和 B。线程 A 发现 resource_ptr 指针已经不是 NULL,并不意味着可以安全地调用 do_something()。因为可能另外一个线程 B 刚刚完成对指针写入,线程 A 可能没有看到新创建的 some_resource 实例,然后调用 do_something() 后,得到不正确的结果。关于这个问题,可以参考 https://www.aristeia.com/Papers/DDJ_Jul_Aug_2004_revised.pdf

3.2.1.1. std::once_flag,std::call_once

C++ 标准库提供了 std::once_flagstd::call_once 来处理“只需调用一次的情况”。比起锁住互斥量并显式的检查指针,每个线程只需要使用 std::call_once 就可以,在 std::call_once 的结束时,就能安全的知晓指针已经被其他的线程初始化了。如:

std::shared_ptr<some_resource> resource_ptr;
std::once_flag resource_flag;                  // 定义 std::once_flag

void init_resource()
{
  resource_ptr.reset(new some_resource);
}

void foo()
{
  std::call_once(resource_flag,init_resource);  // 可以完整的进行一次初始化
  resource_ptr->do_something();
}

std::call_once 比使用 mutex 的开销更小。

3.2.1.2. static 变量

在 C++11 之前,static 变量的初始化存在潜在的竞争条件:变量声明为 static 时,声明后就完成了初始化,一个线程完成了初始化,其他线程仍会抢着定义这个变量(每个线程都认为他们是第一个初始化这个变量的线程)。为此, C++11 规定 static 变量的初始化只完全发生在一个线程中,直到初始化完成前其他线程都不会做处理,从而避免了竞争条件。

在只需要一个全局实例情况下,这里提供一个 std::call_once 的替代方案:

class my_class;
my_class& get_my_class_instance()
{
  static my_class instance;       // 线程安全的初始化过程
  return instance;
}

3.2.2. 读写锁(std::shared_lock)

从 C++14 开始有了读写锁。C++14 中提供了 std::shared_timed_mutex ,C++17 中提供了 std::shared_mutex~,这两者的区别在于 ~std::shared_timed_mutex 支持更多的操作(如带有超时的加锁),而 std::shared_mutex 性能更好。

使用 std::shared_lock 可以获得 std::shared_timed_mutex/std::shared_mutex 上的“读锁”;而使用 std::unique_lock (也可以是 std::lock_guard )可以获得 std::shared_timed_mutex/std::shared_mutex 上的“写锁”。下面是使用例子:

class A {
 private:
  mutable std::shared_mutex m;
  int n = 0;

 public:
  int read()
  {
    std::shared_lock<std::shared_mutex> lck(m);
    return n;
  }
  void write()
  {
    std::unique_lock<std::shared_mutex> lck(m);  // 也可以 std::lock_guard<std::shared_mutex> lck(m);
    ++n;
  }
};

3.2.3. 嵌套锁(std::recursive_mutex)

线程对已经上锁的 std::mutex 再次上锁是错误的,尝试这样做会导致未定义行为。如:

// 错误的例子
#include <thread>
#include <mutex>

std::mutex m;     // 这个例子中改为 std::recursive_mutex m; 可消除未定义行为

void f()
{
  m.lock();
  m.unlock();
}

void g()
{
  m.lock();
  f();             // std::mutex m 已经加锁了,不能再加锁。这导致未定义行为(可能死锁)
  m.unlock();
}

int main()
{
  std::thread t(g);
  t.join();
}

C++ 提供了 std::recursive_mutex ,它可以在一个线程上多次获取锁,当然每获得一次锁都要对应一次解锁。

4. 同步操作

4.1. 等待事件或条件

一个线程如何等待另一个线程完成任务呢?一种办法是每隔一段时间就检查某个完成标志。如:

bool flag;             // 如果另一线程任意,则设置 flag 为 true
std::mutex m;          // 对 flag 的访问用 m 保护

void wait_for_flag()
{
  std::unique_lock<std::mutex> lk(m);
  while(!flag)
  {
    lk.unlock();
    std::this_thread::sleep_for(std::chrono::milliseconds(100));  // 休眠100ms,再检查
    lk.lock();
  }
}

这种方法不好,因为很难确定合适的休眠时间。

C++ 中提供了“条件变量”来通知某事件的发生,下节将介绍它们。

4.1.1. 使用条件变量“等待条件达成”

C++ 标准库对条件变量有两套实现: std::condition_variablestd::condition_variable_any ,前者仅能与 std::mutex 一起工作,而后者可以和合适的互斥量一起工作,从而加上了 _any 的后缀。 std::condition_variable_any 更加通用,不过在性能和系统资源的使用方面会有更多的开销,所以通常会将 std::condition_variable 作为首选类型。当对灵活性有要求时,才会考虑 std::condition_variable_any

下面介绍 std::condition_variable 的使用:

#include <iostream>
#include <condition_variable>
#include <thread>
#include <chrono>

std::condition_variable cv;
std::mutex cv_m; // This mutex is used for three purposes:
                 // 1) to synchronize accesses to i
                 // 2) to synchronize accesses to std::cerr
                 // 3) for the condition variable cv
int i = 0;

void waits()
{
    std::unique_lock<std::mutex> lk(cv_m);
    std::cerr << "Waiting... \n";
    // cv.wait 功能:
    // 1、解锁 lk 关联的 cv_m(调用 cv.wait 前,cv_m 必须是上锁的状态);
    // 2、阻塞线程直到被其它线程通过条件变量 cv 唤醒它;
    // 3、再次对 cv_m 加锁;
    // 4、检测条件 i == 1 是否满足,如果满足就往下走;如果不满足则重新回到步骤 1。
    cv.wait(lk, []{return i == 1;});
    std::cerr << "...finished waiting. i == 1\n";
}

void signals()
{
    std::this_thread::sleep_for(std::chrono::seconds(1));
    {
        std::lock_guard<std::mutex> lk(cv_m);
        std::cerr << "Notifying...\n";
    }
    cv.notify_all();      // 第一次 notify_all();

    std::this_thread::sleep_for(std::chrono::seconds(1));

    {
        std::lock_guard<std::mutex> lk(cv_m);
        i = 1;
        std::cerr << "Notifying again...\n";
    }
    cv.notify_all();      // 第二次 notify_all();
}

int main()
{
    std::thread t1(waits), t2(waits), t3(waits), t4(signals);
    t1.join();
    t2.join();
    t3.join();
    t4.join();
}

编译并执行上面代码,会输出:

Waiting...
Waiting...
Waiting...
Notifying...
Notifying again...
...finished waiting. i == 1
...finished waiting. i == 1
...finished waiting. i == 1

从输出中可以知道,第一次调用 cv.notify_all(); 并没有唤醒另外 3 个线程,这是因为第一次调用 cv.notify_all(); 时, i == 1 这个条件并没有满足;第二次调用 cv.notify_all(); 时唤醒了另外 3 个线程。

4.1.2. 使用条件变量实现线程安全的队列

下面是使用条件变量实现线程安全的队列的例子:

#include <queue>
#include <memory>
#include <mutex>
#include <condition_variable>

template<typename T>
class threadsafe_queue
{
private:
  mutable std::mutex mut;               // 互斥量成员必须为mutable,才能在empty() 和拷贝构造函数中进行上锁
  std::queue<T> data_queue;
  std::condition_variable data_cond;
public:
  threadsafe_queue()
  {}
  threadsafe_queue(threadsafe_queue const& other) // other 是 const 的,mut 必须是 mutable 才能上锁
  {
    std::lock_guard<std::mutex> lk(other.mut);
    data_queue=other.data_queue;
  }

  void push(T new_value)
  {
    std::lock_guard<std::mutex> lk(mut);
    data_queue.push(new_value);
    data_cond.notify_one();
  }

  void wait_and_pop(T& value)           // 等待有值时才返回
  {
    std::unique_lock<std::mutex> lk(mut);
    data_cond.wait(lk,[this]{return !data_queue.empty();});
    value=data_queue.front();
    data_queue.pop();
  }

  std::shared_ptr<T> wait_and_pop()     // 等待有值时才返回
  {
    std::unique_lock<std::mutex> lk(mut);
    data_cond.wait(lk,[this]{return !data_queue.empty();});
    std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
    data_queue.pop();
    return res;
  }

  bool try_pop(T& value)                // 尝试从队列中弹出数据,如要没有值,也会直接返回
  {
    std::lock_guard<std::mutex> lk(mut);
    if(data_queue.empty())
      return false;
    value=data_queue.front();
    data_queue.pop();
    return true;
  }

  std::shared_ptr<T> try_pop()          // 尝试从队列中弹出数据,如要没有值,也会直接返回
  {
    std::lock_guard<std::mutex> lk(mut);
    if(data_queue.empty())
      return std::shared_ptr<T>();
    std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
    data_queue.pop();
    return res;
  }

  bool empty() const                    // empty() 标记成了 const,所以 mut 必须是 mutable 才能上锁
  {
    std::lock_guard<std::mutex> lk(mut);
    return data_queue.empty();
  }
};

4.2. 使用 future

4.2.1. 后台任务的返回值

当不着急让任务结果时,可以使用 std::async 启动一个异步任务。与 std::thread 对象等待的方式不同, std::async 会返回一个 std::future 对象,这个对象持有最终计算出来的结果。当需要这个值时,只需要调用这个对象的 get() 成员函数,就会阻塞线程直到 future 为就绪为止,并返回计算结果。

下面是 future 的使用例子:

#include <future>
#include <thread>
#include <iostream>

int find_the_answer()
{
  std::this_thread::sleep_for(std::chrono::seconds(3));
  return 100;
}

void do_other_stuff()
{
  std::this_thread::sleep_for(std::chrono::seconds(1));
}

int main()
{
  std::future<int> the_answer = std::async(find_the_answer);   // std::async 会返回 std::future

  do_other_stuff();
  std::cout<<"do_other_stuff finished."<<std::endl;

  std::cout<<"The answer is "<<the_answer.get()<<std::endl;
}

std::thread 方式一样, std::async 允许通过添加额外的调用参数,向函数传递额外的参数。如:

#include <string>
#include <future>
struct X
{
  void foo(int,std::string const&);
  std::string bar(std::string const&);
};
X x;
auto f1=std::async(&X::foo,&x,42,"hello");  // 调用 p->foo(42, "hello"),p 是指向 x 的指针
auto f2=std::async(&X::bar,x,"goodbye");    // 调用 tmpx.bar("goodbye"), tmpx 是 x 的拷贝副本

struct Y
{
  double operator()(double);
};
Y y;
auto f3=std::async(Y(),3.141);          // 调用 tmpy(3.141),tmpy 通过 Y 的移动构造函数得到
auto f4=std::async(std::ref(y),2.718);  // 调用 y(2.718)

X baz(X&);
std::async(baz,std::ref(x));            // 调用 baz(x)

4.2.2. 指定执行后台任务的方式(std::launch::deferred,std::launch::async)

在使用 std::async 启动后台任务时,可以在第 1 个参数中指定后台任务的执行方式。如:

int f();
auto ft1 = std::async(std::launch::async, f);     // 函数 f 在不同的线程上异步执行
auto ft2 = std::async(std::launch::deferred, f);  // 函数 f 在当前线程中延迟执行,在返回的 future 上调用get() 或wait() 时才运行
auto ft3 = std::async(std::launch::async | std::launch::deferred, f);  // 由实现选择执行方式
auto ft4 = std::async(f)                                               // 同上,则实现选择执行方式

4.2.3. std::packaged_task

std::async 并不是得到 std::future 的唯一方式,我们还可以使用 std::packaged_taskstd::future 和某个任务进行关联。如:

#include <iostream>
#include <cmath>
#include <thread>
#include <future>
#include <functional>

// unique function to avoid disambiguating the std::pow overload set
int f(int x, int y) { return std::pow(x,y); }

void task_lambda()
{
    std::packaged_task<int(int,int)> task([](int a, int b) {
        return std::pow(a, b);
    });
    std::future<int> result = task.get_future();  // 得到 future

    task(2, 9);  // 调用 std::packaged_task 对象,future 会设置上对应值,变为“就绪状态”

    std::cout << "task_lambda:\t" << result.get() << '\n';
}

void task_bind()
{
    std::packaged_task<int()> task(std::bind(f, 2, 11));
    std::future<int> result = task.get_future();

    task();      // 调用 std::packaged_task 对象,future 会设置上对应值,变为“就绪状态”

    std::cout << "task_bind:\t" << result.get() << '\n';
}

void task_thread()
{
    std::packaged_task<int(int,int)> task(f);
    std::future<int> result = task.get_future();

    std::thread task_td(std::move(task), 2, 10);
    task_td.join();

    std::cout << "task_thread:\t" << result.get() << '\n';
}

int main()
{
    task_lambda();
    task_bind();
    task_thread();
}

4.2.4. std::promises

future 是“未来值”,而 promise 可以认为是“未来值的占位符”。下面是 promise 的例子:

#include <future>
#include <thread>
#include <iostream>

int main()
{
  std::promise<int> ps;

  std::future<int> ft = ps.get_future();
  ps.set_value(100);          // “未来值”确定后就不能再改变,set_value 只能调用一次

  std::cout << ft.get();      // 输出 100
}

下面再介绍一个 promise 的例子:

#include <vector>
#include <thread>
#include <future>
#include <numeric>
#include <iostream>
#include <chrono>

void accumulate(std::vector<int>::iterator first,
                std::vector<int>::iterator last,
                std::promise<int> accumulate_promise)
{
  int sum = std::accumulate(first, last, 0);
  accumulate_promise.set_value(sum);         // Notify future
}

int main()
{
  // Demonstrate using promise<int> to transmit a result between threads.
  std::vector<int> numbers = { 1, 2, 3, 4, 5, 6 };

  std::promise<int> accumulate_promise;
  std::future<int> accumulate_future = accumulate_promise.get_future();
  std::thread work_thread(accumulate, numbers.begin(), numbers.end(),
                          std::move(accumulate_promise));

  // future::get() will wait until the future has a valid result and retrieves it.
  std::cout << "result=" << accumulate_future.get() << '\n';    // 输出 result=21

  work_thread.join();
}

4.2.5. future 和 promise 中的异常

先看一下 future 中的异常:

#include <thread>
#include <future>
#include <iostream>
#include <math.h>

double square_root(double x)
{
  if(x<0)
  {
    throw std::out_of_range("x < 0");       // 抛出异常
  }
  return sqrt(x);
}

int main()
{
  std::future<double> f=std::async(square_root, -1);  // square_root 抛出的异常会保存在 future 中,调用 get() 时会获得这个异常

  double y;
  try {
    y=f.get();                              // 如果 square_root 抛出异常,那么 get() 也会抛出异常
  } catch (const std::exception& e) {
    std::cout << "Caught exception \"" << e.what() << "\"\n";
  }
}

在 promise 中,使用 set_exception() 可以设置异常,如:

extern std::promise<double> some_promise;
try
{
  some_promise.set_value(square_root(-1));
}
catch(...)
{
  some_promise.set_exception(std::current_exception());
}

4.2.6. 析构前需要设置值

如果 std::packaged_taskstd::promise 直到析构都未设置值, std::future::get 会抛出异常。

下面是 std::packaged_task 直到析构都未设置值的例子:

#include <thread>
#include <future>

int f() {return 100;}

int main()
{
  std::future<int> ft;
  {
    std::packaged_task<int()> pt(f);
    ft = pt.get_future();

    // pt();                  // 把这行的注释取消,后面的 ft.get() 就不会抛出异常了
  } // pt 超过 scope,会析构
  ft.get();                   // 会抛出异常,因为 pt 析构了,future ft 也没有设置值
}

下面是 std::promise 直到析构都未设置值的例子:

#include <thread>
#include <future>

int main()
{
  std::future<int> ft;
  {
    std::promise<int> ps;
    ft = ps.get_future();

    // ps.set_value(200);     // 把这行的注释取消,后面的 ft.get() 就不会抛出异常了
  } // ps 超过 scope,会析构
  ft.get();                   // 会抛出异常,因为 ps 析构了,future ft 也没有设置值
}

4.2.7. std::shared_future(多次调用 get)

std::future 调用 get() 后就无法再次调用 get() ,也就是说只能获取一次数据。如:

#include <future>
#include <thread>
#include <iostream>

int main()
{
  std::promise<int> ps;

  std::future<int> ft = ps.get_future();
  ps.set_value(100);

  std::cout << ft.get();      // 输出 100
  std::cout << ft.get();      // 这里会出错,因为 future 的 get() 只能调用一次
}

如果使用 std::shared_future ,则可以多次调用 get() ,如:

#include <future>
#include <thread>
#include <iostream>

int main()
{
  std::promise<int> ps;

  std::shared_future<int> ft(ps.get_future());   // 这行也可以改为: auto ft = ps.get_future().share();
  ps.set_value(100);

  std::cout << ft.get();      // 输出 100
  std::cout << ft.get();      // 输出 100 (ft 是 std::shared_future ,可以调用多次 get)
}

5. 并行算法

C++17 为标准库添加了并行算法。它是对之前已存在的一些标准算法的重载,例如: std::findstd::transformstd::reduce 等。并行版本的签名除了和单线程版本的函数签名相同之外,在参数列表的中新添加了一个参数(第一个参数)——指定要使用的执行策略。例如:

std::vector<int> my_data;
std::sort(std::execution::par, my_data.begin(), my_data.end());

std::execution::par 是执行策略,表示使用多个线程调用此并行算法。

5.1. 执行策略

C++17 中定义了 3 个执行策略(它们是 C++ 类):

  • std::execution::sequenced_policy
  • std::execution::parallel_policy
  • std::execution::parallel_unsequenced_policy

它们定义在头文件 <execution> 中,该头文件还分别为每个执行策略类定义了相应的对象:

  • std::execution::seq
  • std::execution::par
  • std::execution::par_unseq

5.1.1. std::execution::sequenced_policy

std::execution::sequenced_policy 策略要求相关操作“顺序地”在当前调用线程上执行,它不是一个“并行”策略。注意,它只是要求“顺序地”(一个一个地)执行,但并没有规定具体的顺序。下面是一个例子:

std::vector<int> v(1000);
int n = 0;
std::for_each(std::execution::seq, v.begin(), v.end(), [&](int& x){ x = ++n; });  // 1 到 1000 存在 v 中,但顺序是未指定的

5.1.2. std::execution::parallel_policy

std::execution::parallel_policy 策略提供了基本的跨多个线程的并行执行,操作可以执行在调用算法的线程上,或执行在由库创建的线程上。

下面该策略是使用例子:

std::for_each(std::execution::par, v.begin(), v.end(), [](auto& x){ ++x; });  // 对 v 中每个元素加 1,并行执行

由于是并行执行,所有有共享数据访问时可能有竞争条件,如:

// 这是有问题的代码,对 count 的访问有竞争!
std::vector<int> v(1000);
int count = 0;
std::for_each(std::execution::par, v.begin(), v.end(), [&](int& x){ x=++count; });

要解决上面问题,可以在操作 count 前加锁,如:

std::vector<int> v(1000);
int count = 0;
std::mutex m;

std::for_each(std::execution::par, v.begin(), v.end(), [&](int& x){
  std::lock_guard<std::mutex> guard(m);
  x=++count;
});

此外,还可以把 count 修改为原子变量。

6. 参考

本文主要摘自:
C++ Concurrency in Action, 2ed

文中的一些代码实例摘自:
https://en.cppreference.com/w/
http://www.cplusplus.com/reference/

Author: cig01

Created: <2019-12-08 Sun>

Last updated: <2020-09-02 Wed>

Creator: Emacs 27.1 (Org mode 9.4)