版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/u012750702/article/details/63731793
针对多线程中常见的生产者消费者问题,封装了一个类,和测试代码,方便日后直接套用。
具体来说就是多线程使用时候经常遇到的 “一个线程取数据,一个处理数据,一个保存或传递数据 的情况” 。
简单说一下程序功能,很简单的一个应用,就是从某指定文件夹路径下读取文件名为 0-19.txt 的文件内容(此处一个线程),输出到控制台(此处一个线程),并向 out 文件夹中再写入这些文件内容(此处一个线程)。其中读取线程为生产者,显示保存线程为消费者。一个共享链表作为临界区,读取线程向里面 push 数据,读取线程与保存线程均处理完后erase 该数据。
其中主要用到了C++11中提供的几个类:
std::thread 类用于创建线程,
std::mutex 是C++11中提供的互斥锁,调用方法try_lock()加锁,unlock()解锁,对象构造初始是处于未上锁状态。
上锁解锁操作,除了直接调用上述两个方法,还可以使用std::lock_guard std::unique_lock这两个类,这两个对比没仔细看过,下面的代码中仅用了lock_guard,感觉这种使用起来方便一点。
使用过程中,用 std::lock_guard 类创建对象,并添加互斥锁实例,构造时给互斥锁加锁,等到 lock_guard 对象析构时解锁。
std::mutex 是C++11中提供的互斥锁,调用方法try_lock()加锁,unlock()解锁,对象构造初始是处于未上锁状态。
上锁解锁操作,除了直接调用上述两个方法,还可以使用std::lock_guard std::unique_lock这两个类,这两个对比没仔细看过,下面的代码中仅用了lock_guard,感觉这种使用起来方便一点。
使用过程中,用 std::lock_guard 类创建对象,并添加互斥锁实例,构造时给互斥锁加锁,等到 lock_guard 对象析构时解锁。
结构体 File 定义的是本次要操作的主要内容的存储结构:
constent ---- 保存文件内容;
id ---- 文件编号;
haveShown ---- 该文件是否被显示;
haveWritten ---- 该文件是否被保存;
(上述两个标志均为 true 时可从临界区中删除此数据)。
MutliThread类中包含与多线程有关的操作方法、存储区和控制用变量:
static bool showFinish ---- 退出标志,此标志为 true 时表示显示线程执行结束;
static bool writeFinish ---- 退出标志,此标志为 true 时表示保存线程执行结束;
(上述两个标志均为 true 时才可跳出循环);
static std::list<File> fileList ---- 文件链表,用于存储待操作的文件,是三个线程间的临界区;
static std::mutex mutex ---- 用来给临界区上锁;
int nowId ---- 用于计数,供读取线程做控制用(每次从输入文件名列表中的读取哪个文件);
std::string outPath ---- 输出文件夹路径,提供给保存线程用(还有别的写法,这里是想做个参数传递的示例);
void initialization(std::string _outPath) ---- 对整个过程进行初始化,实例化此类后需要调用,主要是初始化文件输出路径,
首文件 id,退出标志;
static void readFile(std::vector<std::string>* path, voif* mThread) ---- 读取线程中操作的具体实现,参数列表中path为待操作的所有文件的全名, mThread为MutliThread类的一个实例(这个参数列表也有别的写法,这里也是想做个参数传递的示例);
static void showFile() ---- 显示线程中操作的具体实现;
static void writeFile(void* mThread) ---- 保存线程中操作的具体实现,mThread为MutliThread类的实例。
接下来我们来看看各个文件:
main.cpp文件
#include <iostream>
#include <sstream>
#include <fstream>
#include <string>
#include <thread>
#include "cmdLine.h"
#include "mutlithread.h"
int main(int argc, char *argv[])
{
std::string inputPath = "/media/add7/E/testData/testThreads/";
std::vector<std::string> fileNameList;
for(std::size_t i = 0; i < 20; ++i)
{
std::stringstream ss;
ss << i;
std::string s;
ss >> s;
std::string tempPath = inputPath + "/" + s + ".txt";
fileNameList.push_back(tempPath);
}
MutliThread myThread;
myThread.initialization("/media/add7/E/testData/testThreads/out/");
while(true)
{
if(MutliThread::writeFinish && MutliThread::showFinish)
break;
std::thread th1(MutliThread::readFile, &fileNameList, &myThread);
std::thread th2(MutliThread::showFile);
std::thread th3(MutliThread::writeFile, &myThread);
std::cout << "join th1" << std::endl;
th1.join();
std::cout << "join th2" << std::endl;
th2.join();
std::cout << "join th3" << std::endl;
th3.join();
}
std::cout << "threads exit !" << std::endl;
return 1;
}
测试过程为了省事,直接将文件命名为 0-19.txt,代码中首先生成fileNameList保存所有文件名,然后送去读取线程。
之后实例化了一个类MutliThread的实例myThread,主要是为了保存非临界区的变量,需要在线程操作时将此实例数据传递。
由于我这里使用的是join(),即主线程需要等待子线程执行完后方可执行,而且对于生产者消费者的问题来说,需要有循环操作存在,因此使用while(true)循环来进行循环读取显示,同时由于这个循环的存在,就需要标志位 writeFinish 和 showFinish 来判断是否可以跳出,判断方法就是根据文件的id号。
mutlithread.h文件
#ifndef MUTLITHREAD_H
#define MUTLITHREAD_H
#include <string>
#include <vector>
#include <list>
#include <mutex>
struct File{
std::string constent;
int id;
bool haveShown = false;
bool haveWritten = false;
};
class MutliThread
{
public:
MutliThread();
void initialization(std::string _outPath);
static void readFile(std::vector<std::string> *path, void* mThread);
static void showFile();
static void writeFile(void* mThread);
public:
static bool showFinish;
static bool writeFinish;
private:
static std::list<File> fileList;
static std::mutex mutex;
int nowId;
std::string outPath;
};
#endif // MUTLITHREAD_H
mutlithread.cpp文件
#include "mutlithread.h"
#include <iostream>
#include <fstream>
#include <sstream>
MutliThread::MutliThread()
{
}
void
MutliThread::initialization(std::string _outPath)
{
this->outPath = _outPath;
this->nowId = 0;
this->writeFinish = false;
this->showFinish = false;
}
void
MutliThread::readFile(std::vector<std::string>* path, void* mThread)
{
std::cout << "read file." << std::endl;
MutliThread* thisThread = (MutliThread*)mThread;
if(thisThread->nowId < path->size())
{
std::ifstream in;
in.open(path->at(thisThread->nowId));
if(!in)
{
std::cout << "open " << path->at(thisThread->nowId) << " failed!" << std::endl;
return;
}
File inF;
in >> inF.constent;
inF.id = thisThread->nowId;
{
std::lock_guard<std::mutex> guard(mutex);
fileList.push_back(inF);
}
in.close();
++thisThread->nowId;
}
}
void
MutliThread::showFile()
{
std::cout << "show file." << std::endl;
if(!fileList.begin()->haveShown)
{
std::lock_guard<std::mutex> guard(mutex);
std::cout << fileList.begin()->constent << std::endl;
fileList.begin()->haveShown = true;
if(fileList.begin()->id == 19)
showFinish = true;
}
}
void
MutliThread::writeFile(void* mThread)
{
std::cout << "write file." << std::endl;
if(!fileList.begin()->haveWritten)
{
MutliThread* thisThread = (MutliThread*)mThread;
std::ofstream out;
std::stringstream ss;
ss << fileList.begin()->id;
std::string s;
ss >> s;
std::string outFile = thisThread->outPath + "/" + s + ".txt";
out.open(outFile);
if(!out)
{
std::cout << "create " << outFile << " failed!" << std::endl;
return;
}
{
std::lock_guard<std::mutex> guard(mutex);
out << fileList.begin()->constent;
fileList.begin()->haveWritten = true;
}
out.close();
}
if(fileList.begin()->haveShown && fileList.begin()->haveWritten)
{
if(fileList.begin()->id == 19)
writeFinish = true;
std::lock_guard<std::mutex> guard(mutex);
fileList.erase(fileList.begin());
}
}
bool MutliThread::showFinish;
bool MutliThread::writeFinish;
std::list<File> MutliThread::fileList;
std::mutex MutliThread::mutex;
main函数中实例化的myThread以void类型传递进函数,函数中要先做类型转换,然后使用非临界区的数据。
----------------------------------------------------
如果只是单纯的多线程循环并行的问题,请看我关于OpenMP使用的文章
http://blog.csdn.net/u012750702/article/details/51476390
http://blog.csdn.net/u012750702/article/details/51769576