C++ Concurrency
Table of Contents
1. C++ 并发简介
C++11 中对并发添加了语言级别的支持,如增加了 std::thread, std::future, std::promise 等设施。C++14/C++17 中对并发的支持进一步完善。
1.1. Hello Concurrent World
下面是一个简单的 C++ 并发程序:
#include <iostream>
#include <thread>
void hello()
{
std::cout << "Hello Concurrent World\n";
}
int main()
{
std::thread t(hello); // 创建一个新线程 t,t 将执行函数 hello
t.join(); // 等待线程 t,防止 main 比 t 先执行结束
}
2. 管理线程
2.1. 启动线程
构造 std::thread 对象可以启动新线程。如:
void do_some_work(); std::thread my_thread(do_some_work); // do_some_work 是普通函数
std::thread 也可以通过有函数操作符的类的实例进行构造,如:
class background_task
{
public:
void operator()() const
{
do_something();
do_something_else();
}
};
background_task f;
std::thread my_thread(f); // f 是具有 operator() 的类的实例
上面代码中,提供的函数对象 f 会复制到新线程的存储空间中,函数对象的执行和调用都在线程的内存空间中进行。
需要注意的是,当把函数对象传入到线程构造函数中时,需要避免 C++’s most vexing parse 问题。如果你传递了一个临时变量,而不是一个命名的变量。C++ 编译器会将其解析为函数声明,而不是类型对象的定义。
std::thread my_thread(background_task()); // C++ 编译器认为这里声明了一个名为 my_thread 的函数
那怎么用临时变量构造 std::thread 对象呢?有两种方式,一是多加一对小括号;二是使用统一的初始化语法。如:
std::thread my_thread((background_task())); // 解决 most vexing parse 的方法一:多加一对小括号
std::thread my_thread{background_task()}; // 解决 most vexing parse 的方法二:使用统一的初始化语法
除此外,使用 Lambda 表达式也能避免这个问题:
std::thread my_thread([]{ // 使用Lambda 表达式构造 std::thread 对象
do_something();
do_something_else();
});
2.1.1. 不要用访问局部变量的函数去创建线程
考虑下面例子:
struct func
{
int& i;
func(int& i_) : i(i_) {}
void operator() ()
{
for (unsigned j=0 ; j<1000000 ; ++j)
{
do_something(i); // 潜在访问隐患:空引用
}
}
};
void oops()
{
int some_local_state=0; // 局部变量
func my_func(some_local_state);
std::thread my_thread(my_func);
my_thread.detach(); // 不等待线程结束
} // 新线程可能还在运行
上面代码中,函数 oops() 中创建了线程 my_thread ,这个新线程执行 my_func ,但 my_func 使用了局部变量 some_local_state 。
如果函数 oops() 结束,那么局部变量 some_local_state 会被销毁;那么线程 my_thread 还访问这个局部变量就会出问题。要解决这个问题,可以 1、在 oops() 函数中通过调用 join() 等待线程 my_thread ;2、把局部变量 some_local_state 改为全局变量;3、修改 func 的定义,把 i 从引用变量改为普通变量。
2.1.2. 要么 detach,要么 join
线程启动后要决定“等待线程结束”(调用 join),还是“让其自主运行”(调用 detach)。 如果 std::thread 对象在销毁之前还没有做出决定,程序就会终止, std::thread 的析构函数会调用 std::terminate() 。
比如,下面程序由于线程 t 在其销毁之间,既没有调用 join,也没有调用 detach,会导致程序 terminate:
#include <iostream>
#include <thread>
void hello()
{
std::cout << "Hello Concurrent World\n";
}
int main()
{
std::thread t(hello); // 线程 t 在销毁之前还没有调用 detach,也没有调用 join,程序会 terminate
}
2.1.3. 确保线程的 join 方法被调用
假设我们想在函数 f 结束之间调用 t.join() ,可以这样做:
void f()
{
int some_local_state=0;
func my_func(some_local_state);
std::thread t(my_func);
try
{
do_something_in_current_thread();
}
catch(...)
{
t.join(); // 1 这一处容易忘记,确保 do_something_in_current_thread 有异常时也调用 t.join()
throw;
}
t.join(); // 2
}
要写两个 t.join() ,这多少有些繁琐,下节将介绍使用 RAII 确保线程的 join 方法被调用。
2.1.4. 使用 RAII 确保线程的 join 方法被调用(thread_guard)
下面介绍使用“资源获取即初始化方式”(RAII,Resource Acquisition Is Initialization)来确保线程的 join 方法被调用。即,提供一个类,在析构函数中使用 join()。
下面 thread_guard 类可以帮忙我们调用线程的 join 方法:
class thread_guard
{
std::thread& t;
public:
explicit thread_guard(std::thread& t_):
t(t_)
{}
~thread_guard()
{
if(t.joinable())
{
t.join();
}
}
thread_guard(thread_guard const&)=delete;
thread_guard& operator=(thread_guard const&)=delete;
};
类 thread_guard 中拷贝构造函数和拷贝赋值操作标记为 =delete ,是为了不让编译器自动生成。直接对 thread_guard 对象进行拷贝或赋值是很危险的,因为这可能会弄丢已汇入的线程。通过删除声明,任何尝试给 thread_guard 对象赋值的操作都会引发一个编译错误。
下面是 thread_guard 的使用例子:
struct func
{
int& i;
func(int& i_) : i(i_) {}
void operator() ()
{
for (unsigned j=0 ; j<1000000 ; ++j)
{
do_something(i);
}
}
};
void f()
{
int some_local_state=0;
func my_func(some_local_state);
std::thread t(my_func);
thread_guard g(t); // 使用 thread_guard 确保 t 的 join 一定会被调用,防止程序 terminate
do_something_in_current_thread();
};
2.2. 启动线程时传递参数
在启动线程时,可以指定线程所关联函数的参数,如:
void f(int i, std::string const& s); std::thread t(f, 3, "hello"); // 第一个参数为线程所关联函数,其余参数为函数的参数
在上面这个例子中,需要说明的是 f 的第 2 个参数是 std::string 类型,而我们传递进去的是 char const * 类型,这里有个类型转换, 这个类型转换是在“新线程的上下文”中进行的。 我们考虑一个危险的例子:
// 下面代码有问题!
void f(int i, std::string const& s); // f 第二个参数是 std::string
void oops(int some_param)
{
char buffer[1024];
sprintf(buffer, "%i", some_param);
std::thread t(f, 3, buffer); // 传给 f 的第二参数是 char const *
t.detach();
}
由于 buffer 转换为 std::string 这个操作是在“新线程的上下文”中进行的,这里就有问题了:函数 oops 可能会在 buffer 转换成 std::string 之前就结束,这时新线程再去访问 buffer 就有问题了( buffer 已经被释放)。这个问题的解决方案就是在传递到 std::thread 构造函数之前,就将字面值转化为 std::string ,如:
void f(int i, std::string const& s);
void not_oops(int some_param)
{
char buffer[1024];
sprintf(buffer, "%i", some_param);
std::thread t(f, 3, std::string(buffer)); // 提前用 std::string,避免在“新线程的上下文”进行类型转换时 buffer 可能失效
t.detach();
}
2.2.1. 启动线程时传递引用类型的参数
在启动线程时,如果所关联的函数的参数是引用类型,则传参时需要使用 std::ref 。
考虑下面例子:
#include <iostream>
#include <thread>
void func1(int& n) { ++n; } // func1 的参数是非常量“引用”类型
int main()
{
int i = 1;
std::thread t(func1, i); // 这里编译时会出错
t.join();
std::cout << i;
}
std::thread 的构造函数无视函数参数类型,盲目地拷贝已提供的变量。不过,内部代码会将拷贝的参数以右值的方式进行传递,这是为了那些只支持移动的类型,而后会尝试以右值为实参调用函数 func1 。但因为函数 func1 期望的是一个非常量引用作为参数(而非右值),所以会在编译时出错。这个问题解决办法很简单:使用 std::ref 将参数转换成引用的形式,即:
#include <iostream>
#include <thread>
void func1(int& n) { ++n; } // func1 的参数是非常量“引用”
int main()
{
int i = 1;
std::thread t(func1, std::ref(i)); // func1 参数是引用,使用 std::ref(i)
t.join();
std::cout << i; // 输出 2
}
2.2.2. 启动线程时指定对象成员函数
在启动线程时,也可以指定对象的成员函数。指定方式如下:
class X
{
public:
void do_lengthy_work(int);
};
X my_x;
int num(0);
std::thread t(&X::do_lengthy_work, &my_x, num); // 第 1 个参数是对象成员函数,第 2 个参数是对象地址,第 3 个参数是成员函数参数
2.2.3. 参数是 move-only 对象
在启动线程时,如果所关联的函数的参数是 move-only 对象,则需要使用 std::move 。
#include <iostream>
#include <thread>
void func1(std::unique_ptr<int> p)
{
std::cout << *p;
}
int main()
{
std::unique_ptr<int> p(new int(100)); // int(100) 的所有权从 p 转移到线程里,再转移给 func1
std::thread t(func1, std::move(p));
t.join();
}
2.3. 转移所有权
C++ 标准库中有很多资源占有(resource-owning)类型,比如 std::ifstream,std::unique_ptr 还有 std::thread 都是可移动,但不可复制。
执行线程的所有权可以在 std::thread 实例中移动,下面将展示一个例子。例子中,创建了两个执行线程,并在 std::thread 实例之间(t1,t2 和 t3)转移所有权:
#include <iostream>
#include <thread>
#include <chrono>
void some_function() {std::cout << "this is some_function\n";}
void some_other_function() {std::cout << "this is some_other_function\n";}
int main() {
std::thread t1(some_function);
std::thread t2=std::move(t1); // 把 t1 关联的执行线程的所有权转移给 t2
t1=std::thread(some_other_function); // 临时std::thread对象启动,转换给 t1。执行线程所有者是临时对象,不用显式的调用 std::move()
std::thread t3; // 启动 t3,但没有与任何线程进行关联
t3=std::move(t2); // 将 t2 的执行线程的所有权转移到 t3 中,t2 是一个命名对象,需要显式的调用 std::move()
// t1=std::move(t3); // 这行会报错。因为 t1 已经有了一个关联的线程。
t1.detach();
// t2.detach(); // 这行会报错。因为 t2 已经没有关联的线程了。
t3.detach();
std::this_thread::sleep_for(std::chrono::seconds(2));
}
2.3.1. 函数返回 std::thread 对象
std::thread 支持移动,线程的所有权可以在函数外进行转移,就如下面程序一样。
void func1(int i) { std::cout << i; }
std::thread g() // 函数 g 返回 std::thread 对象
{
return std::thread(func1, 100);
}
int main()
{
std::thread t{g()};
t.join();
}
2.3.2. 参数为 std::thread 对象
下面介绍函数参数为 std::thread 对象的情况:
void f(std::thread t); // f 的参数为 std::thread
void g()
{
f(std::thread(some_function)); // 调用 f,由于是临时对象,可以省略 std::move
std::thread t(some_function);
f(std::move(t)); // 调用 f,必须用 std::move
}
2.3.3. scoped_thread(thread_guard 的改进)
当某个对象转移了线程的所有权,就不能对线程进行 join 或 detach 了(参见节 2.3)。这导致在节 2.1.4 中介绍的类 thread_guard 中存在一个问题:代码 thread_guard g(t); 中,如果 t 对象所持有的线程被转移给了其它对象,那么 thread_guard 的析构函数就会出问题。
利用 std::thread 支持移动的特性可以解决上面的问题。我们定义另外一个类 scoped_thread ,它和节 2.1.4 中介绍的类 thread_guard 的一个重要区别在于: scoped_thread 类拥有线程的所有权:
class scoped_thread // 确保 t 调用 join(拥有 t 的所有权,不会出现调用 join 时 t 所有权已转移的情况)
{
std::thread t;
public:
explicit scoped_thread(std::thread t_):
t(std::move(t_))
{
if(!t.joinable())
throw std::logic_error("No thread");
}
~scoped_thread()
{
t.join();
}
scoped_thread(scoped_thread const&)=delete;
scoped_thread& operator=(scoped_thread const&)=delete;
};
struct func;
void f()
{
int some_local_state;
scoped_thread t(std::thread(func(some_local_state)));
do_something_in_current_thread();
}
C++17 标准给出一个建议,就是添加一个 joining_thread 的类型,这个类型与 std::thread 类似,不同是的添加了析构函数,就类似于 scoped_thread 。委员会成员们对此并没有达成统一共识,所以这个类没有添加入 C++17 标准中(C++20 仍旧对这种方式进行探讨,不过名称为 std::jthread )。
2.4. 确定线程数量
std::thread::hardware_concurrency() 会返回并发线程的数量。例如,多核系统中,返回值可以是 CPU 核芯的数量;当无法获取时,函数返回 0。如:
#include <iostream>
#include <thread>
int main()
{
std::cout << std::thread::hardware_concurrency();
}
2.5. 标识一个线程
在当前线程中调用 std::this_thread::get_id() 可以得到线程的标识,这个标识是 std::thread::id 类型。
#include <iostream>
#include <thread>
int main()
{
std::cout << std::this_thread::get_id() << std::endl; // 输出实例 0x11b703d40
std::thread t;
std::cout << t.get_id() << std::endl; // 输出 0x0,t 还没有关联真正线程
}
3. 线程之间共享数据
3.1. 使用互斥量(std::lock_guard,std::scoped_lock)
通过实例化 std::mutex 创建互斥量实例,成员函数 lock() 可对互斥量上锁, unlock() 为解锁。不过,不推荐直接去调用成员函数,调用成员函数就意味着,必须在每个函数出口都要去调用 unlock() 。 C++ 标准库为互斥量提供了 RAII 模板类 std::lock_guard ,在构造时就能提供已锁的互斥量,并在析构时进行解锁,从而保证了互斥量能被正确解锁。
下面的代码中,展示了如何在多线程应用中,使用 std::mutex 构造的 std::lock_guard 实例,对列表进行访问保护:
#include <list>
#include <mutex>
#include <algorithm>
std::list<int> some_list;
std::mutex some_mutex; // some_mutex 将保护对 some_list 的访问
void add_to_list(int new_value)
{
std::lock_guard<std::mutex> guard(some_mutex); // 构造 guard 时,对 some_mutex 加锁;析构时对 some_mutex 解锁
some_list.push_back(new_value);
}
bool list_contains(int value_to_find)
{
std::lock_guard<std::mutex> guard(some_mutex); // 构造 guard 时,对 some_mutex 加锁;析构时对 some_mutex 解锁
return std::find(some_list.begin(),some_list.end(),value_to_find) != some_list.end();
}
C++17 中添加了一个新特性,称为“模板类参数推导”,类似 std::lock_guard 这样简单的模板类型,其模板参数列表可以省略。所有下面两个等价:
std::lock_guard<std::mutex> guard(some_mutex); std::lock_guard guard(some_mutex); // 上一行的简写形式,C++17 支持“模板类参数”推导
C++17 提供了加强版的 std::scoped_lock ,它可以接受多个 std::mutex 为参数,可完全取代 std::lock_guard 。
所以,在 C++17 的环境下,上面的代码也可以写成:
std::scoped_lock guard(some_mutex); // C++17 中引入了 std::scoped_lock
3.1.1. 接口间的竞争条件
我们知道栈 std::stack 有下面五个方法:
push() 让一个新元素进栈 pop() 让一个元素出栈,但不会返回栈顶元素 top() 查看栈顶元素 empty() 判断栈是否是空栈 size() 返回栈中有多少个元素
假设上面的每个方法,我们在内部使用了互斥量进行保护;比如,多个线程同时调用 push() 也不会出错。那么这是否意味着对它们的使用就是线程安全的呢?答案是否定的。
考虑下面场景,使用下面代码可以从栈中取出一个元素:
stack<int> s; //...... int const value = s.top(); s.pop(); do_something(value);
这个代码在单线程情况下可以正确工作;但在多线程的情况下,则有竞争条件。假设栈里有两个元素 1,2(元素 1 在栈底,元素 2 在栈顶),考虑表 1 所示场景:
| Thread A | Thread B |
|---|---|
| int const value = s.top(); // 取栈顶元素 2 | |
| int const value = s.top(); // 取栈顶元素 2 | |
| s.pop(); // 弹出 2 | |
| do_something(value); // 处理元素 2 | s.pop(); // 弹出 1 |
| do_something(value); // 处理元素 2 |
这导致一个不期望的情况:栈顶的元素 2 分别被线程 A 和线程 B 处理了,也就是处理了两次!而元素 1 没有被处理,但也从出栈了。
你可能会问:为什么不直接让 pop 返回栈顶元素。原因在于,假设有一个 stack<vector<int>>,拷贝 vector 时需要在堆上分配内存,当这个系统处在重度负荷,或有严重的资源限制的情况下,这种内存分配就会失败,所以 vector 的拷贝构造函数可能会抛出一个 std::bad_alloc 异常。如果 pop 可以返回栈顶元素值,返回一定是最后执行的语句,stack 在返回前已经弹出了元素,但如果拷贝返回值时抛出异常,就会导致弹出的数据丢失(从栈上移除了,但拷贝失败了)。 这是 std::stack 的设计者将这个操作分解为 top 和 pop 两个方法的原因,但这却造成了多线程环境下的竞争条件。
下面介绍几种方式可以解决这个问题。
方式 1:pop() 传入一个参数(往往是引用)获取“弹出值”。
std::vector<int> result; some_stack.pop(result);
方式 1 是有局限的:需要构造出一个栈中类型的实例,用于接收目标值。对于一些类型,这样做是不现实的,因为临时构造一个实例,从时间和资源的角度上来看都不划算。对于其他的类型,这样也不总行得通,因为构造函数需要的参数,在这个阶段不一定可用。最后,需要可赋值的存储类型,这是一个重大限制:即使支持移动构造,甚至是拷贝构造(从而允许返回一个值),很多用户自定义类型可能都不支持赋值操作。
方式 2:pop() 内部使用不抛出异常的拷贝构造函数或移动构造函数。这种方式过于局限,抛异常的构造函数还是更常见的,这些类型也希望能存入到栈中。
方式 3:pop() 返回指向“弹出值”的指针。
返回一个指向弹出元素的指针,而不是直接返回值。指针的优势是自由拷贝,并且不会产生异常。对于这个方案,返回值使用 std::shared_ptr 是个不错的选择,不仅能避免内存泄露(因为当对象中指针销毁时,对象也会被销毁),而且标准库能够完全控制内存分配方案。这种方式的缺点就是返回指针需要对对象的内存分配进行管理,对于简单数据类型(比如 int),内存管理的开销要远大于直接返回值。
3.1.1.1. 实现线程安全的栈
下面实现一个线程安全的栈,采用了前面介绍的方式 1 和方式 3:
#include <exception>
#include <memory>
#include <mutex>
#include <stack>
struct empty_stack: std::exception
{
const char* what() const throw() {
return "empty stack!";
};
};
template<typename T>
class threadsafe_stack
{
private:
std::stack<T> data;
mutable std::mutex m;
public:
threadsafe_stack(): data(std::stack<T>()){}
threadsafe_stack(const threadsafe_stack& other)
{
std::lock_guard<std::mutex> lock(other.m);
data = other.data;
}
threadsafe_stack& operator=(const threadsafe_stack&) = delete;
void push(T new_value)
{
std::lock_guard<std::mutex> lock(m);
data.push(new_value);
}
std::shared_ptr<T> pop() // 方式3,返回指向“弹出值”的指针
{
std::lock_guard<std::mutex> lock(m);
if(data.empty()) throw empty_stack();
std::shared_ptr<T> const res(std::make_shared<T>(data.top()));
data.pop();
return res;
}
void pop(T& value) // 方式 1,传入一个引用,接收“弹出值”
{
std::lock_guard<std::mutex> lock(m);
if(data.empty()) throw empty_stack();
value=data.top();
data.pop();
}
bool empty() const
{
std::lock_guard<std::mutex> lock(m);
return data.empty();
}
};
3.1.2. 死锁(std::lock,std::scoped_lock)
避免死锁的一般建议,就是让两个互斥量以相同的顺序上锁:比如,总在互斥量 B 之前锁住互斥量 A,就永远不会死锁。
不过,有些情况下,“对多个互斥量以相同的顺序上锁”,这操作起来很困难。
考虑一个操作对同一个类的两个不同实例进行数据的 swap() 操作,为了保证数据交换操作的正确性,就要避免并发修改数据,并确保每个实例上的互斥量都能锁住自己要保护的区域。这种情况,应该按什么顺序加锁呢?
1: class some_big_object; 2: void swap(some_big_object& lhs,some_big_object& rhs); 3: class X 4: { 5: private: 6: some_big_object some_detail; 7: std::mutex m; 8: public: 9: X(some_big_object const& sd):some_detail(sd){} 10: 11: friend void swap(X& lhs, X& rhs) 12: { 13: if(&lhs==&rhs) 14: return; 15: 16: // 我们需要对 lhs.m 和 rhs.m 进行加锁,但应该以什么顺序加锁呢? 17: // 下面是先对 lhs.m 加锁,再对 rhs.m 加锁 18: // 可能死锁! 19: std::lock_guard guard1(lhs.m); 20: std::lock_guard<std::mutex> guard2(rhs.m); 21: 22: swap(lhs.some_detail,rhs.some_detail); 23: } 24: };
上面代码是有问题的。两个线程试图在相同的两个实例间进行数据交换时,程序会死锁!
很幸运,C++ 标准库有办法解决这个问题, std::lock ,它可以一次性锁住多个(两个以上)的互斥量,并且不会有死锁风险。也就是改为下面代码:
class some_big_object;
void swap(some_big_object& lhs,some_big_object& rhs);
class X
{
private:
some_big_object some_detail;
std::mutex m;
public:
X(some_big_object const& sd):some_detail(sd){}
friend void swap(X& lhs, X& rhs)
{
if(&lhs==&rhs)
return;
std::lock(lhs.m,rhs.m); // std::lock() 会锁住两个互斥量,且不会死锁
std::lock_guard<std::mutex> lock_a(lhs.m, std::adopt_lock);
std::lock_guard<std::mutex> lock_b(rhs.m, std::adopt_lock);
swap(lhs.some_detail,rhs.some_detail);
}
};
其中, std::adopt_lock 参数除了表示 std::lock_guard 可获取锁之外,还表示将锁交由 std::lock_guard 管理。
C++17 提供了 std::scoped_lock<> 这种 RAII 模板类型,它与 std::lock_guard<> 的功能相同,但它可以接受多个互斥量,而且不会死锁。所以上面的 swap() 函数还可以写为:
void swap(X& lhs, X& rhs)
{
if(&lhs==&rhs)
return;
std::scoped_lock guard(lhs.m,rhs.m); // 使用 std::scoped_lock 更简单
swap(lhs.some_detail,rhs.some_detail);
}
3.1.3. 灵活的 std::unique_lock
std::unique_lock 是个类模板,它支持很多使用场景。
3.1.3.1. lock
下面是使用 std::unique_lock 对互斥量上锁的例子:
#include <mutex>
#include <thread>
#include <iostream>
#include <vector>
#include <chrono>
int main()
{
int counter = 0;
std::mutex counter_mutex;
std::vector<std::thread> threads;
auto worker_task = [&](int id) {
// don't hold the lock while we simulate an expensive operation
std::this_thread::sleep_for(std::chrono::seconds(1));
std::unique_lock<std::mutex> lock(counter_mutex); // 对互斥量上锁
++counter;
std::cout << id << ", initial counter: " << counter << '\n';
lock.unlock(); // 对互斥量解锁
// don't hold the lock while we simulate an expensive operation
std::this_thread::sleep_for(std::chrono::seconds(1));
lock.lock(); // 对互斥量上锁
++counter;
std::cout << id << ", final counter: " << counter << '\n';
}; // 超过 scope,会对互斥量解锁
for (int i = 0; i < 10; ++i) threads.emplace_back(worker_task, i);
for (auto &thread : threads) thread.join();
}
如果在创建 std::unique_lock 对象时指定 std::defer_lock ,则表示暂时不上锁,如上面代码也可以写为:
#include <mutex>
#include <thread>
#include <iostream>
#include <vector>
#include <chrono>
int main()
{
int counter = 0;
std::mutex counter_mutex;
std::vector<std::thread> threads;
auto worker_task = [&](int id) {
std::unique_lock<std::mutex> lock(counter_mutex, std::defer_lock); // std::defer_lock 表示暂时不上锁
// don't hold the lock while we simulate an expensive operation
std::this_thread::sleep_for(std::chrono::seconds(1));
lock.lock(); // 对互斥量上锁
++counter;
std::cout << id << ", initial counter: " << counter << '\n';
lock.unlock(); // 对互斥量解锁
// don't hold the lock while we simulate an expensive operation
std::this_thread::sleep_for(std::chrono::seconds(1));
lock.lock(); // 对互斥量上锁
++counter;
std::cout << id << ", final counter: " << counter << '\n';
}; // 超过 scope,会对互斥量解锁
for (int i = 0; i < 10; ++i) threads.emplace_back(worker_task, i);
for (auto &thread : threads) thread.join();
}
3.1.3.2. try lock
std::unique_lock 的 try lock 场景,有两种使用方式。
方式 1:创建 std::unique_lock 对象时,指定 std::try_to_lock ,如:
std::unique_lock<std::mutex> lock(_mutex, std::try_to_lock); // 创建 std::unique_lock 对象时,指定 std::try_to_lock
if(lock.owns_lock()){ // owns_lock() 测试 try_to_lock 是否成功
// mutex was locked.
} else {
// mutex wasn't locked.
}
方式 2:创建 std::unique_lock 对象时不加锁,调用 try_lock() 方法尝试加锁,如:
std::unique_lock<std::mutex> lock(mtx, std::defer_lock); // 创建 std::unique_lock 对象时,先不上锁
if(lock.try_lock()) { // 使用 try_lock() 尝试上锁
// mutex was locked.
} else {
// mutex wasn't locked.
}
3.1.3.3. 带有超时的 try lock
try_lock_for 和 try_lock_until 都是带有超时的 try lock,它们的区别在于 try_lock_for 需要指定“相对时间”,而 try_lock_until 需要指定“绝对时间”。下面是 try_lock_for 的使用例子:
// unique_lock::try_lock_for example
#include <iostream> // std::cout
#include <chrono> // std::chrono::milliseconds
#include <thread> // std::thread
#include <mutex> // std::timed_mutex, std::unique_lock, std::defer_lock
std::timed_mutex mtx;
void fireworks () {
std::unique_lock<std::timed_mutex> lck(mtx,std::defer_lock);
// waiting to get a lock: each thread prints "-" every 200ms:
while (!lck.try_lock_for(std::chrono::milliseconds(200))) {
std::cout << "-";
}
// got a lock! - wait for 1s, then this thread prints "*"
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
std::cout << "*\n";
}
int main ()
{
std::thread threads[10];
// spawn 10 threads:
for (int i=0; i<10; ++i)
threads[i] = std::thread(fireworks);
for (auto& th : threads) th.join();
return 0;
}
3.1.3.4. 转移互斥量所有权
std::unique_lock 实例中的互斥量的所有权可以通过移动操作,在不同的实例中进行传递。
std::unique_lock<std::mutex> get_lock()
{
extern std::mutex some_mutex;
std::unique_lock<std::mutex> lk(some_mutex); // 加锁
prepare_data();
return lk; // lk是自动变量,不需要调用std::move(),编译器负责调用移动构造函数
}
void process_data()
{
std::unique_lock<std::mutex> lk(get_lock()); // 得到了 get_lock() 中的互斥量的所有权
do_something();
} // 超过 scope,解锁
3.2. 保护共享数据的其它设施
互斥量是一种通用的机制,但其并非保护共享数据的唯一方式。有很多方式可以在特定情况下,对共享数据提供合适的保护。
3.2.1. 在初始化过程中保护数据
假设有一个共享源,其初始化的代价很昂贵,它可能会打开一个数据库连接或分配出很多的内存。
下面是单线程代码:
std::shared_ptr<some_resource> resource_ptr;
void foo()
{
if(!resource_ptr) // 检测没有初始化,就初始化
{
resource_ptr.reset(new some_resource); // 初始化
}
resource_ptr->do_something();
}
如果要改为多线程安全的代码,则可以这样:
std::shared_ptr<some_resource> resource_ptr;
std::mutex resource_mutex;
void foo()
{
std::unique_lock<std::mutex> lk(resource_mutex); // 所有线程在此序列化
if(!resource_ptr)
{
resource_ptr.reset(new some_resource);
}
lk.unlock();
resource_ptr->do_something();
}
有人可能会用“双重检查锁模式”来避免每次获得锁:
// 下面是错误的实现,有竞争条件
void undefined_behaviour_with_double_checked_locking()
{
if(!resource_ptr) // 一次检查
{
std::lock_guard<std::mutex> lk(resource_mutex);
if(!resource_ptr) // 二次检查
{
resource_ptr.reset(new some_resource);
}
}
resource_ptr->do_something();
}
不过,上面的实现是存在竞争条件的。
假设有两个线程 A 和 B。线程 A 发现 resource_ptr 指针已经不是 NULL,并不意味着可以安全地调用 do_something()。因为可能另外一个线程 B 刚刚完成对指针写入,线程 A 可能没有看到新创建的 some_resource 实例,然后调用 do_something() 后,得到不正确的结果。关于这个问题,可以参考 https://www.aristeia.com/Papers/DDJ_Jul_Aug_2004_revised.pdf
3.2.1.1. std::once_flag,std::call_once
C++ 标准库提供了 std::once_flag 和 std::call_once 来处理“只需调用一次的情况”。比起锁住互斥量并显式的检查指针,每个线程只需要使用 std::call_once 就可以,在 std::call_once 的结束时,就能安全的知晓指针已经被其他的线程初始化了。如:
std::shared_ptr<some_resource> resource_ptr;
std::once_flag resource_flag; // 定义 std::once_flag
void init_resource()
{
resource_ptr.reset(new some_resource);
}
void foo()
{
std::call_once(resource_flag,init_resource); // 可以完整的进行一次初始化
resource_ptr->do_something();
}
std::call_once 比使用 mutex 的开销更小。
3.2.1.2. static 变量
在 C++11 之前,static 变量的初始化存在潜在的竞争条件:变量声明为 static 时,声明后就完成了初始化,一个线程完成了初始化,其他线程仍会抢着定义这个变量(每个线程都认为他们是第一个初始化这个变量的线程)。为此, C++11 规定 static 变量的初始化只完全发生在一个线程中,直到初始化完成前其他线程都不会做处理,从而避免了竞争条件。
在只需要一个全局实例情况下,这里提供一个 std::call_once 的替代方案:
class my_class;
my_class& get_my_class_instance()
{
static my_class instance; // 线程安全的初始化过程
return instance;
}
3.2.2. 读写锁(std::shared_lock)
从 C++14 开始有了读写锁。C++14 中提供了 std::shared_timed_mutex ,C++17 中提供了 std::shared_mutex~,这两者的区别在于 ~std::shared_timed_mutex 支持更多的操作(如带有超时的加锁),而 std::shared_mutex 性能更好。
使用 std::shared_lock 可以获得 std::shared_timed_mutex/std::shared_mutex 上的“读锁”;而使用 std::unique_lock (也可以是 std::lock_guard )可以获得 std::shared_timed_mutex/std::shared_mutex 上的“写锁”。下面是使用例子:
class A {
private:
mutable std::shared_mutex m;
int n = 0;
public:
int read()
{
std::shared_lock<std::shared_mutex> lck(m);
return n;
}
void write()
{
std::unique_lock<std::shared_mutex> lck(m); // 也可以 std::lock_guard<std::shared_mutex> lck(m);
++n;
}
};
3.2.3. 嵌套锁(std::recursive_mutex)
线程对已经上锁的 std::mutex 再次上锁是错误的,尝试这样做会导致未定义行为。如:
// 错误的例子
#include <thread>
#include <mutex>
std::mutex m; // 这个例子中改为 std::recursive_mutex m; 可消除未定义行为
void f()
{
m.lock();
m.unlock();
}
void g()
{
m.lock();
f(); // std::mutex m 已经加锁了,不能再加锁。这导致未定义行为(可能死锁)
m.unlock();
}
int main()
{
std::thread t(g);
t.join();
}
C++ 提供了 std::recursive_mutex ,它可以在一个线程上多次获取锁,当然每获得一次锁都要对应一次解锁。
4. 同步操作
4.1. 等待事件或条件
一个线程如何等待另一个线程完成任务呢?一种办法是每隔一段时间就检查某个完成标志。如:
bool flag; // 如果另一线程任意,则设置 flag 为 true
std::mutex m; // 对 flag 的访问用 m 保护
void wait_for_flag()
{
std::unique_lock<std::mutex> lk(m);
while(!flag)
{
lk.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 休眠100ms,再检查
lk.lock();
}
}
这种方法不好,因为很难确定合适的休眠时间。
C++ 中提供了“条件变量”来通知某事件的发生,下节将介绍它们。
4.1.1. 使用条件变量“等待条件达成”
C++ 标准库对条件变量有两套实现: std::condition_variable 和 std::condition_variable_any ,前者仅能与 std::mutex 一起工作,而后者可以和合适的互斥量一起工作,从而加上了 _any 的后缀。 std::condition_variable_any 更加通用,不过在性能和系统资源的使用方面会有更多的开销,所以通常会将 std::condition_variable 作为首选类型。当对灵活性有要求时,才会考虑 std::condition_variable_any 。
下面介绍 std::condition_variable 的使用:
#include <iostream>
#include <condition_variable>
#include <thread>
#include <chrono>
std::condition_variable cv;
std::mutex cv_m; // This mutex is used for three purposes:
// 1) to synchronize accesses to i
// 2) to synchronize accesses to std::cerr
// 3) for the condition variable cv
int i = 0;
void waits()
{
std::unique_lock<std::mutex> lk(cv_m);
std::cerr << "Waiting... \n";
// cv.wait 功能:
// 1、解锁 lk 关联的 cv_m(调用 cv.wait 前,cv_m 必须是上锁的状态);
// 2、阻塞线程直到被其它线程通过条件变量 cv 唤醒它;
// 3、再次对 cv_m 加锁;
// 4、检测条件 i == 1 是否满足,如果满足就往下走;如果不满足则重新回到步骤 1。
cv.wait(lk, []{return i == 1;});
std::cerr << "...finished waiting. i == 1\n";
}
void signals()
{
std::this_thread::sleep_for(std::chrono::seconds(1));
{
std::lock_guard<std::mutex> lk(cv_m);
std::cerr << "Notifying...\n";
}
cv.notify_all(); // 第一次 notify_all();
std::this_thread::sleep_for(std::chrono::seconds(1));
{
std::lock_guard<std::mutex> lk(cv_m);
i = 1;
std::cerr << "Notifying again...\n";
}
cv.notify_all(); // 第二次 notify_all();
}
int main()
{
std::thread t1(waits), t2(waits), t3(waits), t4(signals);
t1.join();
t2.join();
t3.join();
t4.join();
}
编译并执行上面代码,会输出:
Waiting... Waiting... Waiting... Notifying... Notifying again... ...finished waiting. i == 1 ...finished waiting. i == 1 ...finished waiting. i == 1
从输出中可以知道,第一次调用 cv.notify_all(); 并没有唤醒另外 3 个线程,这是因为第一次调用 cv.notify_all(); 时, i == 1 这个条件并没有满足;第二次调用 cv.notify_all(); 时唤醒了另外 3 个线程。
4.1.2. 使用条件变量实现线程安全的队列
下面是使用条件变量实现线程安全的队列的例子:
#include <queue>
#include <memory>
#include <mutex>
#include <condition_variable>
template<typename T>
class threadsafe_queue
{
private:
mutable std::mutex mut; // 互斥量成员必须为mutable,才能在empty() 和拷贝构造函数中进行上锁
std::queue<T> data_queue;
std::condition_variable data_cond;
public:
threadsafe_queue()
{}
threadsafe_queue(threadsafe_queue const& other) // other 是 const 的,mut 必须是 mutable 才能上锁
{
std::lock_guard<std::mutex> lk(other.mut);
data_queue=other.data_queue;
}
void push(T new_value)
{
std::lock_guard<std::mutex> lk(mut);
data_queue.push(new_value);
data_cond.notify_one();
}
void wait_and_pop(T& value) // 等待有值时才返回
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk,[this]{return !data_queue.empty();});
value=data_queue.front();
data_queue.pop();
}
std::shared_ptr<T> wait_and_pop() // 等待有值时才返回
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk,[this]{return !data_queue.empty();});
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}
bool try_pop(T& value) // 尝试从队列中弹出数据,如要没有值,也会直接返回
{
std::lock_guard<std::mutex> lk(mut);
if(data_queue.empty())
return false;
value=data_queue.front();
data_queue.pop();
return true;
}
std::shared_ptr<T> try_pop() // 尝试从队列中弹出数据,如要没有值,也会直接返回
{
std::lock_guard<std::mutex> lk(mut);
if(data_queue.empty())
return std::shared_ptr<T>();
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}
bool empty() const // empty() 标记成了 const,所以 mut 必须是 mutable 才能上锁
{
std::lock_guard<std::mutex> lk(mut);
return data_queue.empty();
}
};
4.2. 使用 future
4.2.1. 后台任务的返回值
当不着急让任务结果时,可以使用 std::async 启动一个异步任务。与 std::thread 对象等待的方式不同, std::async 会返回一个 std::future 对象,这个对象持有最终计算出来的结果。当需要这个值时,只需要调用这个对象的 get() 成员函数,就会阻塞线程直到 future 为就绪为止,并返回计算结果。
下面是 future 的使用例子:
#include <future>
#include <thread>
#include <iostream>
int find_the_answer()
{
std::this_thread::sleep_for(std::chrono::seconds(3));
return 100;
}
void do_other_stuff()
{
std::this_thread::sleep_for(std::chrono::seconds(1));
}
int main()
{
std::future<int> the_answer = std::async(find_the_answer); // std::async 会返回 std::future
do_other_stuff();
std::cout<<"do_other_stuff finished."<<std::endl;
std::cout<<"The answer is "<<the_answer.get()<<std::endl;
}
与 std::thread 方式一样, std::async 允许通过添加额外的调用参数,向函数传递额外的参数。如:
#include <string>
#include <future>
struct X
{
void foo(int,std::string const&);
std::string bar(std::string const&);
};
X x;
auto f1=std::async(&X::foo,&x,42,"hello"); // 调用 p->foo(42, "hello"),p 是指向 x 的指针
auto f2=std::async(&X::bar,x,"goodbye"); // 调用 tmpx.bar("goodbye"), tmpx 是 x 的拷贝副本
struct Y
{
double operator()(double);
};
Y y;
auto f3=std::async(Y(),3.141); // 调用 tmpy(3.141),tmpy 通过 Y 的移动构造函数得到
auto f4=std::async(std::ref(y),2.718); // 调用 y(2.718)
X baz(X&);
std::async(baz,std::ref(x)); // 调用 baz(x)
4.2.2. 指定执行后台任务的方式(std::launch::deferred,std::launch::async)
在使用 std::async 启动后台任务时,可以在第 1 个参数中指定后台任务的执行方式。如:
int f(); auto ft1 = std::async(std::launch::async, f); // 函数 f 在不同的线程上异步执行 auto ft2 = std::async(std::launch::deferred, f); // 函数 f 在当前线程中延迟执行,在返回的 future 上调用get() 或wait() 时才运行 auto ft3 = std::async(std::launch::async | std::launch::deferred, f); // 由实现选择执行方式 auto ft4 = std::async(f) // 同上,则实现选择执行方式
4.2.3. std::packaged_task
std::async 并不是得到 std::future 的唯一方式,我们还可以使用 std::packaged_task 让 std::future 和某个任务进行关联。如:
#include <iostream>
#include <cmath>
#include <thread>
#include <future>
#include <functional>
// unique function to avoid disambiguating the std::pow overload set
int f(int x, int y) { return std::pow(x,y); }
void task_lambda()
{
std::packaged_task<int(int,int)> task([](int a, int b) {
return std::pow(a, b);
});
std::future<int> result = task.get_future(); // 得到 future
task(2, 9); // 调用 std::packaged_task 对象,future 会设置上对应值,变为“就绪状态”
std::cout << "task_lambda:\t" << result.get() << '\n';
}
void task_bind()
{
std::packaged_task<int()> task(std::bind(f, 2, 11));
std::future<int> result = task.get_future();
task(); // 调用 std::packaged_task 对象,future 会设置上对应值,变为“就绪状态”
std::cout << "task_bind:\t" << result.get() << '\n';
}
void task_thread()
{
std::packaged_task<int(int,int)> task(f);
std::future<int> result = task.get_future();
std::thread task_td(std::move(task), 2, 10);
task_td.join();
std::cout << "task_thread:\t" << result.get() << '\n';
}
int main()
{
task_lambda();
task_bind();
task_thread();
}
4.2.4. std::promises
future 是“未来值”,而 promise 可以认为是“未来值的占位符”。下面是 promise 的例子:
#include <future>
#include <thread>
#include <iostream>
int main()
{
std::promise<int> ps;
std::future<int> ft = ps.get_future();
ps.set_value(100); // “未来值”确定后就不能再改变,set_value 只能调用一次
std::cout << ft.get(); // 输出 100
}
下面再介绍一个 promise 的例子:
#include <vector>
#include <thread>
#include <future>
#include <numeric>
#include <iostream>
#include <chrono>
void accumulate(std::vector<int>::iterator first,
std::vector<int>::iterator last,
std::promise<int> accumulate_promise)
{
int sum = std::accumulate(first, last, 0);
accumulate_promise.set_value(sum); // Notify future
}
int main()
{
// Demonstrate using promise<int> to transmit a result between threads.
std::vector<int> numbers = { 1, 2, 3, 4, 5, 6 };
std::promise<int> accumulate_promise;
std::future<int> accumulate_future = accumulate_promise.get_future();
std::thread work_thread(accumulate, numbers.begin(), numbers.end(),
std::move(accumulate_promise));
// future::get() will wait until the future has a valid result and retrieves it.
std::cout << "result=" << accumulate_future.get() << '\n'; // 输出 result=21
work_thread.join();
}
4.2.5. future 和 promise 中的异常
先看一下 future 中的异常:
#include <thread>
#include <future>
#include <iostream>
#include <math.h>
double square_root(double x)
{
if(x<0)
{
throw std::out_of_range("x < 0"); // 抛出异常
}
return sqrt(x);
}
int main()
{
std::future<double> f=std::async(square_root, -1); // square_root 抛出的异常会保存在 future 中,调用 get() 时会获得这个异常
double y;
try {
y=f.get(); // 如果 square_root 抛出异常,那么 get() 也会抛出异常
} catch (const std::exception& e) {
std::cout << "Caught exception \"" << e.what() << "\"\n";
}
}
在 promise 中,使用 set_exception() 可以设置异常,如:
extern std::promise<double> some_promise;
try
{
some_promise.set_value(square_root(-1));
}
catch(...)
{
some_promise.set_exception(std::current_exception());
}
4.2.6. 析构前需要设置值
如果 std::packaged_task 和 std::promise 直到析构都未设置值, std::future::get 会抛出异常。
下面是 std::packaged_task 直到析构都未设置值的例子:
#include <thread>
#include <future>
int f() {return 100;}
int main()
{
std::future<int> ft;
{
std::packaged_task<int()> pt(f);
ft = pt.get_future();
// pt(); // 把这行的注释取消,后面的 ft.get() 就不会抛出异常了
} // pt 超过 scope,会析构
ft.get(); // 会抛出异常,因为 pt 析构了,future ft 也没有设置值
}
下面是 std::promise 直到析构都未设置值的例子:
#include <thread>
#include <future>
int main()
{
std::future<int> ft;
{
std::promise<int> ps;
ft = ps.get_future();
// ps.set_value(200); // 把这行的注释取消,后面的 ft.get() 就不会抛出异常了
} // ps 超过 scope,会析构
ft.get(); // 会抛出异常,因为 ps 析构了,future ft 也没有设置值
}
4.2.7. std::shared_future(多次调用 get)
std::future 调用 get() 后就无法再次调用 get() ,也就是说只能获取一次数据。如:
#include <future>
#include <thread>
#include <iostream>
int main()
{
std::promise<int> ps;
std::future<int> ft = ps.get_future();
ps.set_value(100);
std::cout << ft.get(); // 输出 100
std::cout << ft.get(); // 这里会出错,因为 future 的 get() 只能调用一次
}
如果使用 std::shared_future ,则可以多次调用 get() ,如:
#include <future>
#include <thread>
#include <iostream>
int main()
{
std::promise<int> ps;
std::shared_future<int> ft(ps.get_future()); // 这行也可以改为: auto ft = ps.get_future().share();
ps.set_value(100);
std::cout << ft.get(); // 输出 100
std::cout << ft.get(); // 输出 100 (ft 是 std::shared_future ,可以调用多次 get)
}
5. 并行算法
C++17 为标准库添加了并行算法。它是对之前已存在的一些标准算法的重载,例如: std::find , std::transform 和 std::reduce 等。并行版本的签名除了和单线程版本的函数签名相同之外,在参数列表的中新添加了一个参数(第一个参数)——指定要使用的执行策略。例如:
std::vector<int> my_data; std::sort(std::execution::par, my_data.begin(), my_data.end());
std::execution::par 是执行策略,表示使用多个线程调用此并行算法。
5.1. 执行策略
C++17 中定义了 3 个执行策略(它们是 C++ 类):
- std::execution::sequenced_policy
- std::execution::parallel_policy
- std::execution::parallel_unsequenced_policy
它们定义在头文件 <execution> 中,该头文件还分别为每个执行策略类定义了相应的对象:
- std::execution::seq
- std::execution::par
- std::execution::par_unseq
5.1.1. std::execution::sequenced_policy
std::execution::sequenced_policy 策略要求相关操作“顺序地”在当前调用线程上执行,它不是一个“并行”策略。注意,它只是要求“顺序地”(一个一个地)执行,但并没有规定具体的顺序。下面是一个例子:
std::vector<int> v(1000);
int n = 0;
std::for_each(std::execution::seq, v.begin(), v.end(), [&](int& x){ x = ++n; }); // 1 到 1000 存在 v 中,但顺序是未指定的
5.1.2. std::execution::parallel_policy
std::execution::parallel_policy 策略提供了基本的跨多个线程的并行执行,操作可以执行在调用算法的线程上,或执行在由库创建的线程上。
下面该策略是使用例子:
std::for_each(std::execution::par, v.begin(), v.end(), [](auto& x){ ++x; }); // 对 v 中每个元素加 1,并行执行
由于是并行执行,所有有共享数据访问时可能有竞争条件,如:
// 这是有问题的代码,对 count 的访问有竞争!
std::vector<int> v(1000);
int count = 0;
std::for_each(std::execution::par, v.begin(), v.end(), [&](int& x){ x=++count; });
要解决上面问题,可以在操作 count 前加锁,如:
std::vector<int> v(1000);
int count = 0;
std::mutex m;
std::for_each(std::execution::par, v.begin(), v.end(), [&](int& x){
std::lock_guard<std::mutex> guard(m);
x=++count;
});
此外,还可以把 count 修改为原子变量。
6. 参考
本文主要摘自:
C++ Concurrency in Action, 2ed
文中的一些代码实例摘自:
https://en.cppreference.com/w/
http://www.cplusplus.com/reference/