来源:C++ 11 多线程
课程介绍
并发:同一时间间隔
并行:同一时刻
多进程:进程间通信:文件、管道、消息队列
多进程:共享内存
C++多线程库<thread>
创建一个线程
thread t(callable);
其中callable
为可调用对象
void greeting() {
cout << "hello multithread" << endl;
}
int main() {
thread t(greeting);
t.join();
return 0;
}
t.join()
表示主线程等待子线程结束后运行
如果主线程不等待子线程结束,使用detach
void greeting() {
cout << "hello multithread" << endl;
}
int main() {
thread t(greeting);
t.detach();
return 0;
}
由于主线程执行太快,子线程还没有执行,主线程就已经退出
不能同时对一个线程调用detach()
和join()
void greeting() {
cout << "hello multithread" << endl;
}
int main() {
thread t(greeting);
t.join();
t.detach();
return 0;
}
可以使用函数判断能否join
int main() {
thread t(greeting);
if (t.joinable())
t.join();
return 0;
}
线程管理
// 考虑下面的例子
void greeting() {
cout << "hello multithread" << endl;
}
int main() {
thread t(greeting);
for(int i=0;i<100;++i){
cout<<"from main: "<< i <<endl;
}
t.join();
return 0;
}
上面的代码,如果for循环中抛出异常,在join
前t
就会被销毁,可以修改为
void greeting() {
cout << "hello multithread" << endl;
}
int main() {
thread t(greeting);
try {
for (int i = 0; i<100; ++i) {
cout << "from main: " << i << endl;
}
}
catch (...) {
t.join();
throw;
}
t.join();
return 0;
}
这样,即使抛出了异常,也会调用join
可以通过可调用对象创建线程
public:
void operator()(string& msg) {
for (int i = 0; i < 1; ++i)
cout << "from t1: " << msg << endl;
msg = "I love HouZongliang";
}
};
int main() {
string s = "I love Houzi";
thread t((Fator()), s);
t.join();
cout << s << endl;
return 0;
}
上面的形参是引用,但我们发现,尽管修改了msg,但是实参并没有被修改。如果要修改参数的话,需要这样传参。
class Fator {
public:
void operator()(string& msg) {
for (int i = 0; i < 1; ++i)
cout << "from t1: " << msg << endl;
msg = "I love Cpp";
}
};
int main() {
string s = "I love Houzi";
thread t((Fator()), ref(s));
t.join();
cout << s << endl;
return 0;
}
如果主线程中不需要再使用s,可以用move把s变成右值
class Fator {
public:
void operator()(string& msg) {
for (int i = 0; i < 1; ++i)
cout << "from t1: " << msg << endl;
msg = "I love Cpp";
}
};
int main() {
string s = "I love Houzi";
thread t((Fator()), move(s));
t.join();
cout << s << endl;
return 0;
}
可以发现s已经是空值了。
标准库中有很多不能复制,只能移动的类型,比如thread
//下面的会报错
//thread t1 = t;
thread t1 = move(t);
下面的函数给出了最大可创建的线程数
int main() {
//string s = "I love Houzi";
//thread t((Fator()), move(s));
//t.join();
cout << std::thread::hardware_concurrency() << endl;
return 0;
}
数据竞争与互斥对象
下面的代码,打印出来的结果往往是乱的
void greeting() {
for(int i=100;i>0;--i)
std::cout << "from t1: "<< i << std::endl;
return;
}
int main() {
thread t(greeting);
for (int i = 100; i>0; --i)
std::cout << "from main" << i << std::endl;
t.join();
return 0;
}
原因是不同的线程竞争资源cout。
可以使用互斥对象来同步资源的访问。
#include <mutex>
std::mutex mu;
void shared_print(string msg, int id) {
mu.lock();
cout << msg << " " << id << endl;
mu.unlock();
}
void greeting() {
for(int i=100;i>0;--i)
shared_print("from t1: ", i);
return;
}
int main() {
thread t(greeting);
for (int i = 100; i>0; --i)
shared_print("from main", i);
t.join();
return 0;
}
但是cout << msg << " " << id << endl;
可能会抛出异常。使用类lock_guard<mutex>
来管理互斥锁
void shared_print(string msg, int id) {
//mu.lock();
lock_guard<mutex> guard(mu);
cout << msg << " " << id << endl;
//mu.unlock();
}
当guard
析构的时候,锁会被释放。
这个线程还有一个问题,cout是一个全局的变量。其他函数可以在不加锁的情况下使用cout。
class LofFile {
mutex m_mutex;
ofstream f;
public:
LofFile() {
f.open("log.txt");
}
void shared_print(string id, int val) {
lock_guard<mutex> guard(m_mutex);
f << "From " << id << ": " << val << endl;
}
};
void greeting(LofFile& log) {
for(int i=100;i>0;--i)
log.shared_print("from t1: ", i);
return;
}
int main() {
LofFile log;
thread t(greeting,ref(log));
for (int i = 100; i>0; --i)
log.shared_print("from main", i);
t.join();
return 0;
}
上面的例子中,f始终在mutex的保护之内,但是我们不能把f返回给类外,或者把f传给其他参数,比如
ofstream& GetStream() { return f; }
或者
void processf(void fun(ofstream&)){
fun(f);
}
死锁
假设有下面的代码
class LofFile {
mutex m_mutex;
mutex m_mutex2;
ofstream f;
public:
LofFile() {
f.open("log.txt");
}
void shared_print(string id, int val) {
lock_guard<mutex> guard(m_mutex);
lock_guard<mutex> guard2(m_mutex2);
cout << "From " << id << ": " << val << endl;
}
void shared_print2(string id, int val) {
lock_guard<mutex> guard2(m_mutex2);
lock_guard<mutex> guard(m_mutex);
cout << "From " << id << ": " << val << endl;
}
};
void greeting(LofFile& log) {
for(int i=100;i>0;--i)
log.shared_print("t1: ", i);
return;
}
int main() {
LofFile log;
thread t(greeting,ref(log));
for (int i = 100; i>0; --i)
log.shared_print2("main", i);
t.join();
return 0;
}
运行结果:
避免方式:
- 申请资源顺序相同
class LofFile {
mutex m_mutex;
mutex m_mutex2;
ofstream f;
public:
LofFile() {
f.open("log.txt");
}
void shared_print(string id, int val) {
lock_guard<mutex> guard(m_mutex);
lock_guard<mutex> guard2(m_mutex2);
cout << "From " << id << ": " << val << endl;
}
void shared_print2(string id, int val) {
lock_guard<mutex> guard(m_mutex);
lock_guard<mutex> guard2(m_mutex2);
cout << "From " << id << ": " << val << endl;
}
};
void greeting(LofFile& log) {
for(int i=100;i>0;--i)
log.shared_print("t1: ", i);
return;
}
int main() {
LofFile log;
thread t(greeting,ref(log));
for (int i = 100; i>0; --i)
log.shared_print2("main", i);
t.join();
return 0;
}
- 使用C++提供的lock
此时adopt_lock是为了告诉lock_guard已经锁了,只需要获取试用权,并释放
class LofFile {
mutex m_mutex;
mutex m_mutex2;
ofstream f;
public:
LofFile() {
f.open("log.txt");
}
void shared_print(string id, int val) {
lock(m_mutex, m_mutex2);
lock_guard<mutex> guard(m_mutex,std::adopt_lock);
lock_guard<mutex> guard2(m_mutex2, std::adopt_lock);
cout << "From " << id << ": " << val << endl;
}
void shared_print2(string id, int val) {
lock(m_mutex, m_mutex2);
lock_guard<mutex> guard2(m_mutex2, std::adopt_lock);
lock_guard<mutex> guard(m_mutex, std::adopt_lock);
cout << "From " << id << ": " << val << endl;
}
};
void greeting(LofFile& log) {
for(int i=100;i>0;--i)
log.shared_print("t1: ", i);
return;
}
int main() {
LofFile log;
thread t(greeting,ref(log));
for (int i = 100; i>0; --i)
log.shared_print2("main", i);
t.join();
return 0;
}
- 避免使用多个锁
Unique Lock和Lazy Initialization
另一种加锁的方式
class LofFile {
mutex m_mutex;
ofstream f;
public:
LofFile() {
f.open("log.txt");
}
void shared_print(string id, int val) {
std::unique_lock<mutex> locker(m_mutex);
cout << "From " << id << ": " << val << endl;
locker.unlock();
//后面的操作不需要加锁
}
};
void greeting(LofFile& log) {
for(int i=100;i>0;--i)
log.shared_print("t1: ", i);
return;
}
int main() {
LofFile log;
thread t(greeting,ref(log));
for (int i = 100; i>0; --i)
log.shared_print("main", i);
t.join();
return 0;
}
unique_lock还有其他灵活的操作
class LofFile {
mutex m_mutex;
ofstream f;
public:
LofFile() {
f.open("log.txt");
}
void shared_print(string id, int val) {
std::unique_lock<mutex> locker(m_mutex,std::defer_lock);
//...目前还没有加锁
locker.lock();
cout << "From " << id << ": " << val << endl;
locker.unlock();
//后面的操作不需要加锁
}
};
void greeting(LofFile& log) {
for(int i=100;i>0;--i)
log.shared_print("t1: ", i);
return;
}
int main() {
LofFile log;
thread t(greeting,ref(log));
for (int i = 100; i>0; --i)
log.shared_print("main", i);
t.join();
return 0;
}
lazy initialization
如果需要保证,在shared_print
函数调用的时候才打开文件,需要怎么做?
下面的代码不是线程安全的,线程1发现文件没有打开,加锁,线程2也发现没有打开,加锁并阻塞,线程1打开后释放锁,线程2重新打开了一次。
class LofFile {
mutex m_mutex;
mutex m_mutex_open;
ofstream f;
public:
LofFile() {
//f.open("log.txt");
}
void shared_print(string id, int val) {
if (!f.is_open()) {
std::unique_lock<mutex> locker(m_mutex_open, std::defer_lock);
f.open("log.txt");
}
std::unique_lock<mutex> locker(m_mutex,std::defer_lock);
cout << "From " << id << ": " << val << endl;
}
};
void greeting(LofFile& log) {
for(int i=100;i>0;--i)
log.shared_print("t1: ", i);
return;
}
int main() {
LofFile log;
thread t(greeting,ref(log));
for (int i = 100; i>0; --i)
log.shared_print("main", i);
t.join();
return 0;
}
因此需要对is_open
函数也加锁。
class LofFile {
mutex m_mutex;
mutex m_mutex_open;
ofstream f;
public:
LofFile() {
//f.open("log.txt");
}
void shared_print(string id, int val) {
{
std::unique_lock<mutex> locker(m_mutex_open, std::defer_lock);
if(!f.is_open()) {
f.open("log.txt");
}
}
std::unique_lock<mutex> locker(m_mutex,std::defer_lock);
cout << "From " << id << ": " << val << endl;
}
};
void greeting(LofFile& log) {
for(int i=100;i>0;--i)
log.shared_print("t1: ", i);
return;
}
int main() {
LofFile log;
thread t(greeting,ref(log));
for (int i = 100; i>0; --i)
log.shared_print("main", i);
t.join();
return 0;
}
但是这样会导致每次调用都创建一个unique_lock然后判断文件是否打开,占用资源。
C++标准库提供了方案。
class LofFile {
mutex m_mutex;
std::once_flag m_flag;
ofstream f;
public:
LofFile() {
//f.open("log.txt");
}
void shared_print(string id, int val) {
std::call_once(m_flag, [&]() {f.open("log.txt"); });
std::unique_lock<mutex> locker(m_mutex);
cout << "From " << id << ": " << val << endl;
}
};
void greeting(LofFile& log) {
for(int i=100;i>0;--i)
log.shared_print("t1: ", i);
return;
}
int main() {
LofFile log;
thread t(greeting,ref(log));
for (int i = 100; i>0; --i)
log.shared_print("main", i);
t.join();
return 0;
}
条件变量
生产者-消费者模式中,下面的代码里,消费者会持续的循环,直到生产者sleep结束。
std::deque<int> q;
std::mutex mu;
void fun1() {
int cnt = 10;
while (cnt) {
std::unique_lock<std::mutex> locker(mu);
q.push_front(cnt);
locker.unlock();
std::this_thread::sleep_for(std::chrono::seconds(1));
--cnt;
}
}
void fun2() {
int data = 0;
while (data != 1) {
std::unique_lock<std::mutex> locker(mu);
if (!q.empty()) {
data = q.back();
q.pop_back();
locker.unlock();
std::cout << "t2 got a value from t1: " << data << std::endl;
}
else {
locker.unlock();
}
}
}
int main() {
std::thread t1(fun1);
std::thread t2(fun2);
t1.join();
t2.join();
return 0;
}
可以在队列为空的时候让消费者也sleep,但是不能确定其睡眠时间
使用标准库conditional_variable
std::deque<int> q;
std::mutex mu;
std::condition_variable cond;
void fun1() {
int cnt = 10;
while (cnt) {
std::unique_lock<std::mutex> locker(mu);
q.push_front(cnt);
locker.unlock();
cond.notify_one();
std::this_thread::sleep_for(std::chrono::seconds(1));
--cnt;
}
}
void fun2() {
int data = 0;
while (data != 1) {
std::unique_lock<std::mutex> locker(mu);
cond.wait(locker);
data = q.back();
q.pop_back();
locker.unlock();
std::cout << "t2 got a value from t1: " << data << std::endl;
}
}
int main() {
std::thread t1(fun1);
std::thread t2(fun2);
t1.join();
t2.join();
return 0;
}
cond.wait(locker)
需要先把锁住的mu解锁,然后休眠,然后重新加锁。由于需要重复加解锁,只能使用unique_lock
由于线程2可能被自己激活,需要判断伪激活的情况。wait里多加入一个参数
std::deque<int> q;
std::mutex mu;
std::condition_variable cond;
void fun1() {
int cnt = 10;
while (cnt) {
std::unique_lock<std::mutex> locker(mu);
q.push_front(cnt);
locker.unlock();
cond.notify_one();
std::this_thread::sleep_for(std::chrono::seconds(1));
--cnt;
}
}
void fun2() {
int data = 0;
while (data != 1) {
std::unique_lock<std::mutex> locker(mu);
cond.wait(locker, []() { return !q.empty(); });
data = q.back();
q.pop_back();
locker.unlock();
std::cout << "t2 got a value from t1: " << data << std::endl;
}
}
int main() {
std::thread t1(fun1);
std::thread t2(fun2);
t1.join();
t2.join();
return 0;
}