C++ Concurrency
Table of Contents
1. C++ 并发简介
C++11 中对并发添加了语言级别的支持,如增加了 std::thread, std::future, std::promise 等设施。C++14/C++17 中对并发的支持进一步完善。
1.1. Hello Concurrent World
下面是一个简单的 C++ 并发程序:
#include <iostream> #include <thread> void hello() { std::cout << "Hello Concurrent World\n"; } int main() { std::thread t(hello); // 创建一个新线程 t,t 将执行函数 hello t.join(); // 等待线程 t,防止 main 比 t 先执行结束 }
2. 管理线程
2.1. 启动线程
构造 std::thread
对象可以启动新线程。如:
void do_some_work(); std::thread my_thread(do_some_work); // do_some_work 是普通函数
std::thread
也可以通过有函数操作符的类的实例进行构造,如:
class background_task { public: void operator()() const { do_something(); do_something_else(); } }; background_task f; std::thread my_thread(f); // f 是具有 operator() 的类的实例
上面代码中,提供的函数对象 f
会复制到新线程的存储空间中,函数对象的执行和调用都在线程的内存空间中进行。
需要注意的是,当把函数对象传入到线程构造函数中时,需要避免 C++’s most vexing parse 问题。如果你传递了一个临时变量,而不是一个命名的变量。C++ 编译器会将其解析为函数声明,而不是类型对象的定义。
std::thread my_thread(background_task()); // C++ 编译器认为这里声明了一个名为 my_thread 的函数
那怎么用临时变量构造 std::thread
对象呢?有两种方式,一是多加一对小括号;二是使用统一的初始化语法。如:
std::thread my_thread((background_task())); // 解决 most vexing parse 的方法一:多加一对小括号 std::thread my_thread{background_task()}; // 解决 most vexing parse 的方法二:使用统一的初始化语法
除此外,使用 Lambda 表达式也能避免这个问题:
std::thread my_thread([]{ // 使用Lambda 表达式构造 std::thread 对象 do_something(); do_something_else(); });
2.1.1. 不要用访问局部变量的函数去创建线程
考虑下面例子:
struct func { int& i; func(int& i_) : i(i_) {} void operator() () { for (unsigned j=0 ; j<1000000 ; ++j) { do_something(i); // 潜在访问隐患:空引用 } } }; void oops() { int some_local_state=0; // 局部变量 func my_func(some_local_state); std::thread my_thread(my_func); my_thread.detach(); // 不等待线程结束 } // 新线程可能还在运行
上面代码中,函数 oops()
中创建了线程 my_thread
,这个新线程执行 my_func
,但 my_func
使用了局部变量 some_local_state
。
如果函数 oops()
结束,那么局部变量 some_local_state
会被销毁;那么线程 my_thread
还访问这个局部变量就会出问题。要解决这个问题,可以 1、在 oops()
函数中通过调用 join()
等待线程 my_thread
;2、把局部变量 some_local_state
改为全局变量;3、修改 func
的定义,把 i
从引用变量改为普通变量。
2.1.2. 要么 detach,要么 join
线程启动后要决定“等待线程结束”(调用 join),还是“让其自主运行”(调用 detach)。 如果 std::thread
对象在销毁之前还没有做出决定,程序就会终止, std::thread
的析构函数会调用 std::terminate()
。
比如,下面程序由于线程 t 在其销毁之间,既没有调用 join,也没有调用 detach,会导致程序 terminate:
#include <iostream> #include <thread> void hello() { std::cout << "Hello Concurrent World\n"; } int main() { std::thread t(hello); // 线程 t 在销毁之前还没有调用 detach,也没有调用 join,程序会 terminate }
2.1.3. 确保线程的 join 方法被调用
假设我们想在函数 f
结束之间调用 t.join()
,可以这样做:
void f() { int some_local_state=0; func my_func(some_local_state); std::thread t(my_func); try { do_something_in_current_thread(); } catch(...) { t.join(); // 1 这一处容易忘记,确保 do_something_in_current_thread 有异常时也调用 t.join() throw; } t.join(); // 2 }
要写两个 t.join()
,这多少有些繁琐,下节将介绍使用 RAII 确保线程的 join 方法被调用。
2.1.4. 使用 RAII 确保线程的 join 方法被调用(thread_guard)
下面介绍使用“资源获取即初始化方式”(RAII,Resource Acquisition Is Initialization)来确保线程的 join 方法被调用。即,提供一个类,在析构函数中使用 join()。
下面 thread_guard
类可以帮忙我们调用线程的 join 方法:
class thread_guard { std::thread& t; public: explicit thread_guard(std::thread& t_): t(t_) {} ~thread_guard() { if(t.joinable()) { t.join(); } } thread_guard(thread_guard const&)=delete; thread_guard& operator=(thread_guard const&)=delete; };
类 thread_guard
中拷贝构造函数和拷贝赋值操作标记为 =delete
,是为了不让编译器自动生成。直接对 thread_guard
对象进行拷贝或赋值是很危险的,因为这可能会弄丢已汇入的线程。通过删除声明,任何尝试给 thread_guard
对象赋值的操作都会引发一个编译错误。
下面是 thread_guard
的使用例子:
struct func { int& i; func(int& i_) : i(i_) {} void operator() () { for (unsigned j=0 ; j<1000000 ; ++j) { do_something(i); } } }; void f() { int some_local_state=0; func my_func(some_local_state); std::thread t(my_func); thread_guard g(t); // 使用 thread_guard 确保 t 的 join 一定会被调用,防止程序 terminate do_something_in_current_thread(); };
2.2. 启动线程时传递参数
在启动线程时,可以指定线程所关联函数的参数,如:
void f(int i, std::string const& s); std::thread t(f, 3, "hello"); // 第一个参数为线程所关联函数,其余参数为函数的参数
在上面这个例子中,需要说明的是 f
的第 2 个参数是 std::string
类型,而我们传递进去的是 char const *
类型,这里有个类型转换, 这个类型转换是在“新线程的上下文”中进行的。 我们考虑一个危险的例子:
// 下面代码有问题! void f(int i, std::string const& s); // f 第二个参数是 std::string void oops(int some_param) { char buffer[1024]; sprintf(buffer, "%i", some_param); std::thread t(f, 3, buffer); // 传给 f 的第二参数是 char const * t.detach(); }
由于 buffer
转换为 std::string
这个操作是在“新线程的上下文”中进行的,这里就有问题了:函数 oops
可能会在 buffer
转换成 std::string
之前就结束,这时新线程再去访问 buffer
就有问题了( buffer
已经被释放)。这个问题的解决方案就是在传递到 std::thread
构造函数之前,就将字面值转化为 std::string
,如:
void f(int i, std::string const& s); void not_oops(int some_param) { char buffer[1024]; sprintf(buffer, "%i", some_param); std::thread t(f, 3, std::string(buffer)); // 提前用 std::string,避免在“新线程的上下文”进行类型转换时 buffer 可能失效 t.detach(); }
2.2.1. 启动线程时传递引用类型的参数
在启动线程时,如果所关联的函数的参数是引用类型,则传参时需要使用 std::ref
。
考虑下面例子:
#include <iostream> #include <thread> void func1(int& n) { ++n; } // func1 的参数是非常量“引用”类型 int main() { int i = 1; std::thread t(func1, i); // 这里编译时会出错 t.join(); std::cout << i; }
std::thread
的构造函数无视函数参数类型,盲目地拷贝已提供的变量。不过,内部代码会将拷贝的参数以右值的方式进行传递,这是为了那些只支持移动的类型,而后会尝试以右值为实参调用函数 func1
。但因为函数 func1
期望的是一个非常量引用作为参数(而非右值),所以会在编译时出错。这个问题解决办法很简单:使用 std::ref
将参数转换成引用的形式,即:
#include <iostream> #include <thread> void func1(int& n) { ++n; } // func1 的参数是非常量“引用” int main() { int i = 1; std::thread t(func1, std::ref(i)); // func1 参数是引用,使用 std::ref(i) t.join(); std::cout << i; // 输出 2 }
2.2.2. 启动线程时指定对象成员函数
在启动线程时,也可以指定对象的成员函数。指定方式如下:
class X { public: void do_lengthy_work(int); }; X my_x; int num(0); std::thread t(&X::do_lengthy_work, &my_x, num); // 第 1 个参数是对象成员函数,第 2 个参数是对象地址,第 3 个参数是成员函数参数
2.2.3. 参数是 move-only 对象
在启动线程时,如果所关联的函数的参数是 move-only 对象,则需要使用 std::move
。
#include <iostream> #include <thread> void func1(std::unique_ptr<int> p) { std::cout << *p; } int main() { std::unique_ptr<int> p(new int(100)); // int(100) 的所有权从 p 转移到线程里,再转移给 func1 std::thread t(func1, std::move(p)); t.join(); }
2.3. 转移所有权
C++ 标准库中有很多资源占有(resource-owning)类型,比如 std::ifstream,std::unique_ptr 还有 std::thread 都是可移动,但不可复制。
执行线程的所有权可以在 std::thread
实例中移动,下面将展示一个例子。例子中,创建了两个执行线程,并在 std::thread
实例之间(t1,t2 和 t3)转移所有权:
#include <iostream> #include <thread> #include <chrono> void some_function() {std::cout << "this is some_function\n";} void some_other_function() {std::cout << "this is some_other_function\n";} int main() { std::thread t1(some_function); std::thread t2=std::move(t1); // 把 t1 关联的执行线程的所有权转移给 t2 t1=std::thread(some_other_function); // 临时std::thread对象启动,转换给 t1。执行线程所有者是临时对象,不用显式的调用 std::move() std::thread t3; // 启动 t3,但没有与任何线程进行关联 t3=std::move(t2); // 将 t2 的执行线程的所有权转移到 t3 中,t2 是一个命名对象,需要显式的调用 std::move() // t1=std::move(t3); // 这行会报错。因为 t1 已经有了一个关联的线程。 t1.detach(); // t2.detach(); // 这行会报错。因为 t2 已经没有关联的线程了。 t3.detach(); std::this_thread::sleep_for(std::chrono::seconds(2)); }
2.3.1. 函数返回 std::thread 对象
std::thread 支持移动,线程的所有权可以在函数外进行转移,就如下面程序一样。
void func1(int i) { std::cout << i; } std::thread g() // 函数 g 返回 std::thread 对象 { return std::thread(func1, 100); } int main() { std::thread t{g()}; t.join(); }
2.3.2. 参数为 std::thread 对象
下面介绍函数参数为 std::thread 对象的情况:
void f(std::thread t); // f 的参数为 std::thread void g() { f(std::thread(some_function)); // 调用 f,由于是临时对象,可以省略 std::move std::thread t(some_function); f(std::move(t)); // 调用 f,必须用 std::move }
2.3.3. scoped_thread(thread_guard 的改进)
当某个对象转移了线程的所有权,就不能对线程进行 join 或 detach 了(参见节 2.3)。这导致在节 2.1.4 中介绍的类 thread_guard
中存在一个问题:代码 thread_guard g(t);
中,如果 t
对象所持有的线程被转移给了其它对象,那么 thread_guard
的析构函数就会出问题。
利用 std::thread
支持移动的特性可以解决上面的问题。我们定义另外一个类 scoped_thread
,它和节 2.1.4 中介绍的类 thread_guard
的一个重要区别在于: scoped_thread
类拥有线程的所有权:
class scoped_thread // 确保 t 调用 join(拥有 t 的所有权,不会出现调用 join 时 t 所有权已转移的情况) { std::thread t; public: explicit scoped_thread(std::thread t_): t(std::move(t_)) { if(!t.joinable()) throw std::logic_error("No thread"); } ~scoped_thread() { t.join(); } scoped_thread(scoped_thread const&)=delete; scoped_thread& operator=(scoped_thread const&)=delete; }; struct func; void f() { int some_local_state; scoped_thread t(std::thread(func(some_local_state))); do_something_in_current_thread(); }
C++17 标准给出一个建议,就是添加一个 joining_thread
的类型,这个类型与 std::thread
类似,不同是的添加了析构函数,就类似于 scoped_thread
。委员会成员们对此并没有达成统一共识,所以这个类没有添加入 C++17 标准中(C++20 仍旧对这种方式进行探讨,不过名称为 std::jthread
)。
2.4. 确定线程数量
std::thread::hardware_concurrency()
会返回并发线程的数量。例如,多核系统中,返回值可以是 CPU 核芯的数量;当无法获取时,函数返回 0。如:
#include <iostream> #include <thread> int main() { std::cout << std::thread::hardware_concurrency(); }
2.5. 标识一个线程
在当前线程中调用 std::this_thread::get_id()
可以得到线程的标识,这个标识是 std::thread::id 类型。
#include <iostream> #include <thread> int main() { std::cout << std::this_thread::get_id() << std::endl; // 输出实例 0x11b703d40 std::thread t; std::cout << t.get_id() << std::endl; // 输出 0x0,t 还没有关联真正线程 }
3. 线程之间共享数据
3.1. 使用互斥量(std::lock_guard,std::scoped_lock)
通过实例化 std::mutex
创建互斥量实例,成员函数 lock()
可对互斥量上锁, unlock()
为解锁。不过,不推荐直接去调用成员函数,调用成员函数就意味着,必须在每个函数出口都要去调用 unlock()
。 C++ 标准库为互斥量提供了 RAII 模板类 std::lock_guard
,在构造时就能提供已锁的互斥量,并在析构时进行解锁,从而保证了互斥量能被正确解锁。
下面的代码中,展示了如何在多线程应用中,使用 std::mutex
构造的 std::lock_guard
实例,对列表进行访问保护:
#include <list> #include <mutex> #include <algorithm> std::list<int> some_list; std::mutex some_mutex; // some_mutex 将保护对 some_list 的访问 void add_to_list(int new_value) { std::lock_guard<std::mutex> guard(some_mutex); // 构造 guard 时,对 some_mutex 加锁;析构时对 some_mutex 解锁 some_list.push_back(new_value); } bool list_contains(int value_to_find) { std::lock_guard<std::mutex> guard(some_mutex); // 构造 guard 时,对 some_mutex 加锁;析构时对 some_mutex 解锁 return std::find(some_list.begin(),some_list.end(),value_to_find) != some_list.end(); }
C++17 中添加了一个新特性,称为“模板类参数推导”,类似 std::lock_guard
这样简单的模板类型,其模板参数列表可以省略。所有下面两个等价:
std::lock_guard<std::mutex> guard(some_mutex); std::lock_guard guard(some_mutex); // 上一行的简写形式,C++17 支持“模板类参数”推导
C++17 提供了加强版的 std::scoped_lock
,它可以接受多个 std::mutex 为参数,可完全取代 std::lock_guard
。
所以,在 C++17 的环境下,上面的代码也可以写成:
std::scoped_lock guard(some_mutex); // C++17 中引入了 std::scoped_lock
3.1.1. 接口间的竞争条件
我们知道栈 std::stack
有下面五个方法:
push() 让一个新元素进栈 pop() 让一个元素出栈,但不会返回栈顶元素 top() 查看栈顶元素 empty() 判断栈是否是空栈 size() 返回栈中有多少个元素
假设上面的每个方法,我们在内部使用了互斥量进行保护;比如,多个线程同时调用 push() 也不会出错。那么这是否意味着对它们的使用就是线程安全的呢?答案是否定的。
考虑下面场景,使用下面代码可以从栈中取出一个元素:
stack<int> s; //...... int const value = s.top(); s.pop(); do_something(value);
这个代码在单线程情况下可以正确工作;但在多线程的情况下,则有竞争条件。假设栈里有两个元素 1,2(元素 1 在栈底,元素 2 在栈顶),考虑表 1 所示场景:
Thread A | Thread B |
---|---|
int const value = s.top(); // 取栈顶元素 2 | |
int const value = s.top(); // 取栈顶元素 2 | |
s.pop(); // 弹出 2 | |
do_something(value); // 处理元素 2 | s.pop(); // 弹出 1 |
do_something(value); // 处理元素 2 |
这导致一个不期望的情况:栈顶的元素 2 分别被线程 A 和线程 B 处理了,也就是处理了两次!而元素 1 没有被处理,但也从出栈了。
你可能会问:为什么不直接让 pop 返回栈顶元素。原因在于,假设有一个 stack<vector<int>>,拷贝 vector 时需要在堆上分配内存,当这个系统处在重度负荷,或有严重的资源限制的情况下,这种内存分配就会失败,所以 vector 的拷贝构造函数可能会抛出一个 std::bad_alloc 异常。如果 pop 可以返回栈顶元素值,返回一定是最后执行的语句,stack 在返回前已经弹出了元素,但如果拷贝返回值时抛出异常,就会导致弹出的数据丢失(从栈上移除了,但拷贝失败了)。 这是 std::stack 的设计者将这个操作分解为 top 和 pop 两个方法的原因,但这却造成了多线程环境下的竞争条件。
下面介绍几种方式可以解决这个问题。
方式 1:pop() 传入一个参数(往往是引用)获取“弹出值”。
std::vector<int> result; some_stack.pop(result);
方式 1 是有局限的:需要构造出一个栈中类型的实例,用于接收目标值。对于一些类型,这样做是不现实的,因为临时构造一个实例,从时间和资源的角度上来看都不划算。对于其他的类型,这样也不总行得通,因为构造函数需要的参数,在这个阶段不一定可用。最后,需要可赋值的存储类型,这是一个重大限制:即使支持移动构造,甚至是拷贝构造(从而允许返回一个值),很多用户自定义类型可能都不支持赋值操作。
方式 2:pop() 内部使用不抛出异常的拷贝构造函数或移动构造函数。这种方式过于局限,抛异常的构造函数还是更常见的,这些类型也希望能存入到栈中。
方式 3:pop() 返回指向“弹出值”的指针。
返回一个指向弹出元素的指针,而不是直接返回值。指针的优势是自由拷贝,并且不会产生异常。对于这个方案,返回值使用 std::shared_ptr 是个不错的选择,不仅能避免内存泄露(因为当对象中指针销毁时,对象也会被销毁),而且标准库能够完全控制内存分配方案。这种方式的缺点就是返回指针需要对对象的内存分配进行管理,对于简单数据类型(比如 int),内存管理的开销要远大于直接返回值。
3.1.1.1. 实现线程安全的栈
下面实现一个线程安全的栈,采用了前面介绍的方式 1 和方式 3:
#include <exception> #include <memory> #include <mutex> #include <stack> struct empty_stack: std::exception { const char* what() const throw() { return "empty stack!"; }; }; template<typename T> class threadsafe_stack { private: std::stack<T> data; mutable std::mutex m; public: threadsafe_stack(): data(std::stack<T>()){} threadsafe_stack(const threadsafe_stack& other) { std::lock_guard<std::mutex> lock(other.m); data = other.data; } threadsafe_stack& operator=(const threadsafe_stack&) = delete; void push(T new_value) { std::lock_guard<std::mutex> lock(m); data.push(new_value); } std::shared_ptr<T> pop() // 方式3,返回指向“弹出值”的指针 { std::lock_guard<std::mutex> lock(m); if(data.empty()) throw empty_stack(); std::shared_ptr<T> const res(std::make_shared<T>(data.top())); data.pop(); return res; } void pop(T& value) // 方式 1,传入一个引用,接收“弹出值” { std::lock_guard<std::mutex> lock(m); if(data.empty()) throw empty_stack(); value=data.top(); data.pop(); } bool empty() const { std::lock_guard<std::mutex> lock(m); return data.empty(); } };
3.1.2. 死锁(std::lock,std::scoped_lock)
避免死锁的一般建议,就是让两个互斥量以相同的顺序上锁:比如,总在互斥量 B 之前锁住互斥量 A,就永远不会死锁。
不过,有些情况下,“对多个互斥量以相同的顺序上锁”,这操作起来很困难。
考虑一个操作对同一个类的两个不同实例进行数据的 swap() 操作,为了保证数据交换操作的正确性,就要避免并发修改数据,并确保每个实例上的互斥量都能锁住自己要保护的区域。这种情况,应该按什么顺序加锁呢?
1: class some_big_object; 2: void swap(some_big_object& lhs,some_big_object& rhs); 3: class X 4: { 5: private: 6: some_big_object some_detail; 7: std::mutex m; 8: public: 9: X(some_big_object const& sd):some_detail(sd){} 10: 11: friend void swap(X& lhs, X& rhs) 12: { 13: if(&lhs==&rhs) 14: return; 15: 16: // 我们需要对 lhs.m 和 rhs.m 进行加锁,但应该以什么顺序加锁呢? 17: // 下面是先对 lhs.m 加锁,再对 rhs.m 加锁 18: // 可能死锁! 19: std::lock_guard guard1(lhs.m); 20: std::lock_guard<std::mutex> guard2(rhs.m); 21: 22: swap(lhs.some_detail,rhs.some_detail); 23: } 24: };
上面代码是有问题的。两个线程试图在相同的两个实例间进行数据交换时,程序会死锁!
很幸运,C++ 标准库有办法解决这个问题, std::lock
,它可以一次性锁住多个(两个以上)的互斥量,并且不会有死锁风险。也就是改为下面代码:
class some_big_object; void swap(some_big_object& lhs,some_big_object& rhs); class X { private: some_big_object some_detail; std::mutex m; public: X(some_big_object const& sd):some_detail(sd){} friend void swap(X& lhs, X& rhs) { if(&lhs==&rhs) return; std::lock(lhs.m,rhs.m); // std::lock() 会锁住两个互斥量,且不会死锁 std::lock_guard<std::mutex> lock_a(lhs.m, std::adopt_lock); std::lock_guard<std::mutex> lock_b(rhs.m, std::adopt_lock); swap(lhs.some_detail,rhs.some_detail); } };
其中, std::adopt_lock
参数除了表示 std::lock_guard
可获取锁之外,还表示将锁交由 std::lock_guard
管理。
C++17 提供了 std::scoped_lock<>
这种 RAII 模板类型,它与 std::lock_guard<>
的功能相同,但它可以接受多个互斥量,而且不会死锁。所以上面的 swap() 函数还可以写为:
void swap(X& lhs, X& rhs) { if(&lhs==&rhs) return; std::scoped_lock guard(lhs.m,rhs.m); // 使用 std::scoped_lock 更简单 swap(lhs.some_detail,rhs.some_detail); }
3.1.3. 灵活的 std::unique_lock
std::unique_lock
是个类模板,它支持很多使用场景。
3.1.3.1. lock
下面是使用 std::unique_lock
对互斥量上锁的例子:
#include <mutex> #include <thread> #include <iostream> #include <vector> #include <chrono> int main() { int counter = 0; std::mutex counter_mutex; std::vector<std::thread> threads; auto worker_task = [&](int id) { // don't hold the lock while we simulate an expensive operation std::this_thread::sleep_for(std::chrono::seconds(1)); std::unique_lock<std::mutex> lock(counter_mutex); // 对互斥量上锁 ++counter; std::cout << id << ", initial counter: " << counter << '\n'; lock.unlock(); // 对互斥量解锁 // don't hold the lock while we simulate an expensive operation std::this_thread::sleep_for(std::chrono::seconds(1)); lock.lock(); // 对互斥量上锁 ++counter; std::cout << id << ", final counter: " << counter << '\n'; }; // 超过 scope,会对互斥量解锁 for (int i = 0; i < 10; ++i) threads.emplace_back(worker_task, i); for (auto &thread : threads) thread.join(); }
如果在创建 std::unique_lock
对象时指定 std::defer_lock
,则表示暂时不上锁,如上面代码也可以写为:
#include <mutex> #include <thread> #include <iostream> #include <vector> #include <chrono> int main() { int counter = 0; std::mutex counter_mutex; std::vector<std::thread> threads; auto worker_task = [&](int id) { std::unique_lock<std::mutex> lock(counter_mutex, std::defer_lock); // std::defer_lock 表示暂时不上锁 // don't hold the lock while we simulate an expensive operation std::this_thread::sleep_for(std::chrono::seconds(1)); lock.lock(); // 对互斥量上锁 ++counter; std::cout << id << ", initial counter: " << counter << '\n'; lock.unlock(); // 对互斥量解锁 // don't hold the lock while we simulate an expensive operation std::this_thread::sleep_for(std::chrono::seconds(1)); lock.lock(); // 对互斥量上锁 ++counter; std::cout << id << ", final counter: " << counter << '\n'; }; // 超过 scope,会对互斥量解锁 for (int i = 0; i < 10; ++i) threads.emplace_back(worker_task, i); for (auto &thread : threads) thread.join(); }
3.1.3.2. try lock
std::unique_lock 的 try lock 场景,有两种使用方式。
方式 1:创建 std::unique_lock
对象时,指定 std::try_to_lock
,如:
std::unique_lock<std::mutex> lock(_mutex, std::try_to_lock); // 创建 std::unique_lock 对象时,指定 std::try_to_lock if(lock.owns_lock()){ // owns_lock() 测试 try_to_lock 是否成功 // mutex was locked. } else { // mutex wasn't locked. }
方式 2:创建 std::unique_lock
对象时不加锁,调用 try_lock()
方法尝试加锁,如:
std::unique_lock<std::mutex> lock(mtx, std::defer_lock); // 创建 std::unique_lock 对象时,先不上锁 if(lock.try_lock()) { // 使用 try_lock() 尝试上锁 // mutex was locked. } else { // mutex wasn't locked. }
3.1.3.3. 带有超时的 try lock
try_lock_for
和 try_lock_until
都是带有超时的 try lock,它们的区别在于 try_lock_for
需要指定“相对时间”,而 try_lock_until
需要指定“绝对时间”。下面是 try_lock_for
的使用例子:
// unique_lock::try_lock_for example #include <iostream> // std::cout #include <chrono> // std::chrono::milliseconds #include <thread> // std::thread #include <mutex> // std::timed_mutex, std::unique_lock, std::defer_lock std::timed_mutex mtx; void fireworks () { std::unique_lock<std::timed_mutex> lck(mtx,std::defer_lock); // waiting to get a lock: each thread prints "-" every 200ms: while (!lck.try_lock_for(std::chrono::milliseconds(200))) { std::cout << "-"; } // got a lock! - wait for 1s, then this thread prints "*" std::this_thread::sleep_for(std::chrono::milliseconds(1000)); std::cout << "*\n"; } int main () { std::thread threads[10]; // spawn 10 threads: for (int i=0; i<10; ++i) threads[i] = std::thread(fireworks); for (auto& th : threads) th.join(); return 0; }
3.1.3.4. 转移互斥量所有权
std::unique_lock
实例中的互斥量的所有权可以通过移动操作,在不同的实例中进行传递。
std::unique_lock<std::mutex> get_lock() { extern std::mutex some_mutex; std::unique_lock<std::mutex> lk(some_mutex); // 加锁 prepare_data(); return lk; // lk是自动变量,不需要调用std::move(),编译器负责调用移动构造函数 } void process_data() { std::unique_lock<std::mutex> lk(get_lock()); // 得到了 get_lock() 中的互斥量的所有权 do_something(); } // 超过 scope,解锁
3.2. 保护共享数据的其它设施
互斥量是一种通用的机制,但其并非保护共享数据的唯一方式。有很多方式可以在特定情况下,对共享数据提供合适的保护。
3.2.1. 在初始化过程中保护数据
假设有一个共享源,其初始化的代价很昂贵,它可能会打开一个数据库连接或分配出很多的内存。
下面是单线程代码:
std::shared_ptr<some_resource> resource_ptr; void foo() { if(!resource_ptr) // 检测没有初始化,就初始化 { resource_ptr.reset(new some_resource); // 初始化 } resource_ptr->do_something(); }
如果要改为多线程安全的代码,则可以这样:
std::shared_ptr<some_resource> resource_ptr; std::mutex resource_mutex; void foo() { std::unique_lock<std::mutex> lk(resource_mutex); // 所有线程在此序列化 if(!resource_ptr) { resource_ptr.reset(new some_resource); } lk.unlock(); resource_ptr->do_something(); }
有人可能会用“双重检查锁模式”来避免每次获得锁:
// 下面是错误的实现,有竞争条件 void undefined_behaviour_with_double_checked_locking() { if(!resource_ptr) // 一次检查 { std::lock_guard<std::mutex> lk(resource_mutex); if(!resource_ptr) // 二次检查 { resource_ptr.reset(new some_resource); } } resource_ptr->do_something(); }
不过,上面的实现是存在竞争条件的。
假设有两个线程 A 和 B。线程 A 发现 resource_ptr 指针已经不是 NULL,并不意味着可以安全地调用 do_something()。因为可能另外一个线程 B 刚刚完成对指针写入,线程 A 可能没有看到新创建的 some_resource 实例,然后调用 do_something() 后,得到不正确的结果。关于这个问题,可以参考 https://www.aristeia.com/Papers/DDJ_Jul_Aug_2004_revised.pdf
3.2.1.1. std::once_flag,std::call_once
C++ 标准库提供了 std::once_flag
和 std::call_once
来处理“只需调用一次的情况”。比起锁住互斥量并显式的检查指针,每个线程只需要使用 std::call_once
就可以,在 std::call_once
的结束时,就能安全的知晓指针已经被其他的线程初始化了。如:
std::shared_ptr<some_resource> resource_ptr; std::once_flag resource_flag; // 定义 std::once_flag void init_resource() { resource_ptr.reset(new some_resource); } void foo() { std::call_once(resource_flag,init_resource); // 可以完整的进行一次初始化 resource_ptr->do_something(); }
std::call_once
比使用 mutex 的开销更小。
3.2.1.2. static 变量
在 C++11 之前,static 变量的初始化存在潜在的竞争条件:变量声明为 static 时,声明后就完成了初始化,一个线程完成了初始化,其他线程仍会抢着定义这个变量(每个线程都认为他们是第一个初始化这个变量的线程)。为此, C++11 规定 static 变量的初始化只完全发生在一个线程中,直到初始化完成前其他线程都不会做处理,从而避免了竞争条件。
在只需要一个全局实例情况下,这里提供一个 std::call_once 的替代方案:
class my_class; my_class& get_my_class_instance() { static my_class instance; // 线程安全的初始化过程 return instance; }
3.2.2. 读写锁(std::shared_lock)
从 C++14 开始有了读写锁。C++14 中提供了 std::shared_timed_mutex
,C++17 中提供了 std::shared_mutex~,这两者的区别在于 ~std::shared_timed_mutex
支持更多的操作(如带有超时的加锁),而 std::shared_mutex
性能更好。
使用 std::shared_lock
可以获得 std::shared_timed_mutex/std::shared_mutex 上的“读锁”;而使用 std::unique_lock
(也可以是 std::lock_guard
)可以获得 std::shared_timed_mutex/std::shared_mutex 上的“写锁”。下面是使用例子:
class A { private: mutable std::shared_mutex m; int n = 0; public: int read() { std::shared_lock<std::shared_mutex> lck(m); return n; } void write() { std::unique_lock<std::shared_mutex> lck(m); // 也可以 std::lock_guard<std::shared_mutex> lck(m); ++n; } };
3.2.3. 嵌套锁(std::recursive_mutex)
线程对已经上锁的 std::mutex 再次上锁是错误的,尝试这样做会导致未定义行为。如:
// 错误的例子 #include <thread> #include <mutex> std::mutex m; // 这个例子中改为 std::recursive_mutex m; 可消除未定义行为 void f() { m.lock(); m.unlock(); } void g() { m.lock(); f(); // std::mutex m 已经加锁了,不能再加锁。这导致未定义行为(可能死锁) m.unlock(); } int main() { std::thread t(g); t.join(); }
C++ 提供了 std::recursive_mutex
,它可以在一个线程上多次获取锁,当然每获得一次锁都要对应一次解锁。
4. 同步操作
4.1. 等待事件或条件
一个线程如何等待另一个线程完成任务呢?一种办法是每隔一段时间就检查某个完成标志。如:
bool flag; // 如果另一线程任意,则设置 flag 为 true std::mutex m; // 对 flag 的访问用 m 保护 void wait_for_flag() { std::unique_lock<std::mutex> lk(m); while(!flag) { lk.unlock(); std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 休眠100ms,再检查 lk.lock(); } }
这种方法不好,因为很难确定合适的休眠时间。
C++ 中提供了“条件变量”来通知某事件的发生,下节将介绍它们。
4.1.1. 使用条件变量“等待条件达成”
C++ 标准库对条件变量有两套实现: std::condition_variable
和 std::condition_variable_any
,前者仅能与 std::mutex
一起工作,而后者可以和合适的互斥量一起工作,从而加上了 _any
的后缀。 std::condition_variable_any
更加通用,不过在性能和系统资源的使用方面会有更多的开销,所以通常会将 std::condition_variable
作为首选类型。当对灵活性有要求时,才会考虑 std::condition_variable_any
。
下面介绍 std::condition_variable
的使用:
#include <iostream> #include <condition_variable> #include <thread> #include <chrono> std::condition_variable cv; std::mutex cv_m; // This mutex is used for three purposes: // 1) to synchronize accesses to i // 2) to synchronize accesses to std::cerr // 3) for the condition variable cv int i = 0; void waits() { std::unique_lock<std::mutex> lk(cv_m); std::cerr << "Waiting... \n"; // cv.wait 功能: // 1、解锁 lk 关联的 cv_m(调用 cv.wait 前,cv_m 必须是上锁的状态); // 2、阻塞线程直到被其它线程通过条件变量 cv 唤醒它; // 3、再次对 cv_m 加锁; // 4、检测条件 i == 1 是否满足,如果满足就往下走;如果不满足则重新回到步骤 1。 cv.wait(lk, []{return i == 1;}); std::cerr << "...finished waiting. i == 1\n"; } void signals() { std::this_thread::sleep_for(std::chrono::seconds(1)); { std::lock_guard<std::mutex> lk(cv_m); std::cerr << "Notifying...\n"; } cv.notify_all(); // 第一次 notify_all(); std::this_thread::sleep_for(std::chrono::seconds(1)); { std::lock_guard<std::mutex> lk(cv_m); i = 1; std::cerr << "Notifying again...\n"; } cv.notify_all(); // 第二次 notify_all(); } int main() { std::thread t1(waits), t2(waits), t3(waits), t4(signals); t1.join(); t2.join(); t3.join(); t4.join(); }
编译并执行上面代码,会输出:
Waiting... Waiting... Waiting... Notifying... Notifying again... ...finished waiting. i == 1 ...finished waiting. i == 1 ...finished waiting. i == 1
从输出中可以知道,第一次调用 cv.notify_all();
并没有唤醒另外 3 个线程,这是因为第一次调用 cv.notify_all();
时, i == 1
这个条件并没有满足;第二次调用 cv.notify_all();
时唤醒了另外 3 个线程。
4.1.2. 使用条件变量实现线程安全的队列
下面是使用条件变量实现线程安全的队列的例子:
#include <queue> #include <memory> #include <mutex> #include <condition_variable> template<typename T> class threadsafe_queue { private: mutable std::mutex mut; // 互斥量成员必须为mutable,才能在empty() 和拷贝构造函数中进行上锁 std::queue<T> data_queue; std::condition_variable data_cond; public: threadsafe_queue() {} threadsafe_queue(threadsafe_queue const& other) // other 是 const 的,mut 必须是 mutable 才能上锁 { std::lock_guard<std::mutex> lk(other.mut); data_queue=other.data_queue; } void push(T new_value) { std::lock_guard<std::mutex> lk(mut); data_queue.push(new_value); data_cond.notify_one(); } void wait_and_pop(T& value) // 等待有值时才返回 { std::unique_lock<std::mutex> lk(mut); data_cond.wait(lk,[this]{return !data_queue.empty();}); value=data_queue.front(); data_queue.pop(); } std::shared_ptr<T> wait_and_pop() // 等待有值时才返回 { std::unique_lock<std::mutex> lk(mut); data_cond.wait(lk,[this]{return !data_queue.empty();}); std::shared_ptr<T> res(std::make_shared<T>(data_queue.front())); data_queue.pop(); return res; } bool try_pop(T& value) // 尝试从队列中弹出数据,如要没有值,也会直接返回 { std::lock_guard<std::mutex> lk(mut); if(data_queue.empty()) return false; value=data_queue.front(); data_queue.pop(); return true; } std::shared_ptr<T> try_pop() // 尝试从队列中弹出数据,如要没有值,也会直接返回 { std::lock_guard<std::mutex> lk(mut); if(data_queue.empty()) return std::shared_ptr<T>(); std::shared_ptr<T> res(std::make_shared<T>(data_queue.front())); data_queue.pop(); return res; } bool empty() const // empty() 标记成了 const,所以 mut 必须是 mutable 才能上锁 { std::lock_guard<std::mutex> lk(mut); return data_queue.empty(); } };
4.2. 使用 future
4.2.1. 后台任务的返回值
当不着急让任务结果时,可以使用 std::async
启动一个异步任务。与 std::thread
对象等待的方式不同, std::async
会返回一个 std::future
对象,这个对象持有最终计算出来的结果。当需要这个值时,只需要调用这个对象的 get()
成员函数,就会阻塞线程直到 future 为就绪为止,并返回计算结果。
下面是 future 的使用例子:
#include <future> #include <thread> #include <iostream> int find_the_answer() { std::this_thread::sleep_for(std::chrono::seconds(3)); return 100; } void do_other_stuff() { std::this_thread::sleep_for(std::chrono::seconds(1)); } int main() { std::future<int> the_answer = std::async(find_the_answer); // std::async 会返回 std::future do_other_stuff(); std::cout<<"do_other_stuff finished."<<std::endl; std::cout<<"The answer is "<<the_answer.get()<<std::endl; }
与 std::thread
方式一样, std::async
允许通过添加额外的调用参数,向函数传递额外的参数。如:
#include <string> #include <future> struct X { void foo(int,std::string const&); std::string bar(std::string const&); }; X x; auto f1=std::async(&X::foo,&x,42,"hello"); // 调用 p->foo(42, "hello"),p 是指向 x 的指针 auto f2=std::async(&X::bar,x,"goodbye"); // 调用 tmpx.bar("goodbye"), tmpx 是 x 的拷贝副本 struct Y { double operator()(double); }; Y y; auto f3=std::async(Y(),3.141); // 调用 tmpy(3.141),tmpy 通过 Y 的移动构造函数得到 auto f4=std::async(std::ref(y),2.718); // 调用 y(2.718) X baz(X&); std::async(baz,std::ref(x)); // 调用 baz(x)
4.2.2. 指定执行后台任务的方式(std::launch::deferred,std::launch::async)
在使用 std::async
启动后台任务时,可以在第 1 个参数中指定后台任务的执行方式。如:
int f(); auto ft1 = std::async(std::launch::async, f); // 函数 f 在不同的线程上异步执行 auto ft2 = std::async(std::launch::deferred, f); // 函数 f 在当前线程中延迟执行,在返回的 future 上调用get() 或wait() 时才运行 auto ft3 = std::async(std::launch::async | std::launch::deferred, f); // 由实现选择执行方式 auto ft4 = std::async(f) // 同上,则实现选择执行方式
4.2.3. std::packaged_task
std::async
并不是得到 std::future
的唯一方式,我们还可以使用 std::packaged_task
让 std::future
和某个任务进行关联。如:
#include <iostream> #include <cmath> #include <thread> #include <future> #include <functional> // unique function to avoid disambiguating the std::pow overload set int f(int x, int y) { return std::pow(x,y); } void task_lambda() { std::packaged_task<int(int,int)> task([](int a, int b) { return std::pow(a, b); }); std::future<int> result = task.get_future(); // 得到 future task(2, 9); // 调用 std::packaged_task 对象,future 会设置上对应值,变为“就绪状态” std::cout << "task_lambda:\t" << result.get() << '\n'; } void task_bind() { std::packaged_task<int()> task(std::bind(f, 2, 11)); std::future<int> result = task.get_future(); task(); // 调用 std::packaged_task 对象,future 会设置上对应值,变为“就绪状态” std::cout << "task_bind:\t" << result.get() << '\n'; } void task_thread() { std::packaged_task<int(int,int)> task(f); std::future<int> result = task.get_future(); std::thread task_td(std::move(task), 2, 10); task_td.join(); std::cout << "task_thread:\t" << result.get() << '\n'; } int main() { task_lambda(); task_bind(); task_thread(); }
4.2.4. std::promises
future 是“未来值”,而 promise 可以认为是“未来值的占位符”。下面是 promise 的例子:
#include <future> #include <thread> #include <iostream> int main() { std::promise<int> ps; std::future<int> ft = ps.get_future(); ps.set_value(100); // “未来值”确定后就不能再改变,set_value 只能调用一次 std::cout << ft.get(); // 输出 100 }
下面再介绍一个 promise 的例子:
#include <vector> #include <thread> #include <future> #include <numeric> #include <iostream> #include <chrono> void accumulate(std::vector<int>::iterator first, std::vector<int>::iterator last, std::promise<int> accumulate_promise) { int sum = std::accumulate(first, last, 0); accumulate_promise.set_value(sum); // Notify future } int main() { // Demonstrate using promise<int> to transmit a result between threads. std::vector<int> numbers = { 1, 2, 3, 4, 5, 6 }; std::promise<int> accumulate_promise; std::future<int> accumulate_future = accumulate_promise.get_future(); std::thread work_thread(accumulate, numbers.begin(), numbers.end(), std::move(accumulate_promise)); // future::get() will wait until the future has a valid result and retrieves it. std::cout << "result=" << accumulate_future.get() << '\n'; // 输出 result=21 work_thread.join(); }
4.2.5. future 和 promise 中的异常
先看一下 future 中的异常:
#include <thread> #include <future> #include <iostream> #include <math.h> double square_root(double x) { if(x<0) { throw std::out_of_range("x < 0"); // 抛出异常 } return sqrt(x); } int main() { std::future<double> f=std::async(square_root, -1); // square_root 抛出的异常会保存在 future 中,调用 get() 时会获得这个异常 double y; try { y=f.get(); // 如果 square_root 抛出异常,那么 get() 也会抛出异常 } catch (const std::exception& e) { std::cout << "Caught exception \"" << e.what() << "\"\n"; } }
在 promise 中,使用 set_exception()
可以设置异常,如:
extern std::promise<double> some_promise; try { some_promise.set_value(square_root(-1)); } catch(...) { some_promise.set_exception(std::current_exception()); }
4.2.6. 析构前需要设置值
如果 std::packaged_task
和 std::promise
直到析构都未设置值, std::future::get
会抛出异常。
下面是 std::packaged_task
直到析构都未设置值的例子:
#include <thread> #include <future> int f() {return 100;} int main() { std::future<int> ft; { std::packaged_task<int()> pt(f); ft = pt.get_future(); // pt(); // 把这行的注释取消,后面的 ft.get() 就不会抛出异常了 } // pt 超过 scope,会析构 ft.get(); // 会抛出异常,因为 pt 析构了,future ft 也没有设置值 }
下面是 std::promise
直到析构都未设置值的例子:
#include <thread> #include <future> int main() { std::future<int> ft; { std::promise<int> ps; ft = ps.get_future(); // ps.set_value(200); // 把这行的注释取消,后面的 ft.get() 就不会抛出异常了 } // ps 超过 scope,会析构 ft.get(); // 会抛出异常,因为 ps 析构了,future ft 也没有设置值 }
4.2.7. std::shared_future(多次调用 get)
std::future
调用 get()
后就无法再次调用 get()
,也就是说只能获取一次数据。如:
#include <future> #include <thread> #include <iostream> int main() { std::promise<int> ps; std::future<int> ft = ps.get_future(); ps.set_value(100); std::cout << ft.get(); // 输出 100 std::cout << ft.get(); // 这里会出错,因为 future 的 get() 只能调用一次 }
如果使用 std::shared_future
,则可以多次调用 get()
,如:
#include <future> #include <thread> #include <iostream> int main() { std::promise<int> ps; std::shared_future<int> ft(ps.get_future()); // 这行也可以改为: auto ft = ps.get_future().share(); ps.set_value(100); std::cout << ft.get(); // 输出 100 std::cout << ft.get(); // 输出 100 (ft 是 std::shared_future ,可以调用多次 get) }
5. 并行算法
C++17 为标准库添加了并行算法。它是对之前已存在的一些标准算法的重载,例如: std::find
, std::transform
和 std::reduce
等。并行版本的签名除了和单线程版本的函数签名相同之外,在参数列表的中新添加了一个参数(第一个参数)——指定要使用的执行策略。例如:
std::vector<int> my_data; std::sort(std::execution::par, my_data.begin(), my_data.end());
std::execution::par
是执行策略,表示使用多个线程调用此并行算法。
5.1. 执行策略
C++17 中定义了 3 个执行策略(它们是 C++ 类):
- std::execution::sequenced_policy
- std::execution::parallel_policy
- std::execution::parallel_unsequenced_policy
它们定义在头文件 <execution>
中,该头文件还分别为每个执行策略类定义了相应的对象:
- std::execution::seq
- std::execution::par
- std::execution::par_unseq
5.1.1. std::execution::sequenced_policy
std::execution::sequenced_policy 策略要求相关操作“顺序地”在当前调用线程上执行,它不是一个“并行”策略。注意,它只是要求“顺序地”(一个一个地)执行,但并没有规定具体的顺序。下面是一个例子:
std::vector<int> v(1000); int n = 0; std::for_each(std::execution::seq, v.begin(), v.end(), [&](int& x){ x = ++n; }); // 1 到 1000 存在 v 中,但顺序是未指定的
5.1.2. std::execution::parallel_policy
std::execution::parallel_policy 策略提供了基本的跨多个线程的并行执行,操作可以执行在调用算法的线程上,或执行在由库创建的线程上。
下面该策略是使用例子:
std::for_each(std::execution::par, v.begin(), v.end(), [](auto& x){ ++x; }); // 对 v 中每个元素加 1,并行执行
由于是并行执行,所有有共享数据访问时可能有竞争条件,如:
// 这是有问题的代码,对 count 的访问有竞争! std::vector<int> v(1000); int count = 0; std::for_each(std::execution::par, v.begin(), v.end(), [&](int& x){ x=++count; });
要解决上面问题,可以在操作 count 前加锁,如:
std::vector<int> v(1000); int count = 0; std::mutex m; std::for_each(std::execution::par, v.begin(), v.end(), [&](int& x){ std::lock_guard<std::mutex> guard(m); x=++count; });
此外,还可以把 count 修改为原子变量。
6. 参考
本文主要摘自:
C++ Concurrency in Action, 2ed
文中的一些代码实例摘自:
https://en.cppreference.com/w/
http://www.cplusplus.com/reference/