【转】使用面向对象编程封装Thread虚函数和函数对象

C++11提供了thread,但是过于复杂,我们还是倾向于在项目中编写自己的Thread。
Posix Thread的使用这里不再赘述。
重点是这个函数:

#include <pthread.h>
int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
                          void *(*start_routine) (void *), void *arg);

第三个参数是一个回调函数,该函数必须返回值为void*,而且只有一个参数,类型也是void*。

POSIX的thread默认是joinable,需要手工调用pthread_join函数来回收,也可以调用pthread_detach将其变为detachable,此时不需要手工回收线程。

通过虚函数封装

下面介绍Thread的封装。
我们把Thread的声明先放在这里:

#ifndef THREAD_H_
#define THREAD_H_
#include "NonCopyable.h"
#include <pthread.h>
class Thread : NonCopyable
{
public:
    Thread();
    virtual ~Thread();
    void start();
    void join();
    virtual void run() = 0;
    pthread_t getThreadId() const
    { return threadId_; }

private:
      //提供给pthread_create的第三个参数使用
    static void *runInThread(void *arg);
    pthread_t threadId_;
    //pid_t tid_; //进程标示
    bool isRunning_;
};
#endif //THREAD_H_

这里需要说明的是:

  • 首先,为了获得最干净的语义,Thread应该是不可复制的,所以需要继承NonCopyable。
  • 其次,为了调用pthread_create创建线程,我们往里面注册的不能是一个成员函数,因为成员函数含有一个隐式参数,导致函数的指针类型并不是void *(*start_routine) (void *),所以我们采用了static函数。
  • static函数无法访问某一对象的成员,所以我们在调用pthread_create时,将this指针作为回调函数的参数。

这里相关代码如下:

//static
void *Thread::runInThread(void *arg)
{
    Thread *pt = static_cast<Thread*>(arg);
    //pt->tid_ = syscall(SYS_gettid);
    pt->run();
    return NULL;
}

用户将自己的逻辑注册在run中就可以了。

这个Thread不提供detach函数,因为我们在析构函数中做了如下的处理,如果Thread对象析构,线程还在运行,那么需要将Thread设置为detach状态

Thread::~Thread()
{
    if(isRunning_)
    {
        pthread_detach(threadId_);
    }
}

大部分逻辑都是固定的,用户只需要改变run里面的代码即可,于是我们将run设置为纯虚函数,让用户继承Thread类所以析构函数为virtual。

完整的CPP实现如下:

#include "Thread.h"
#include <assert.h>
#include <unistd.h>
#include "MutexLock.h" //TINY_CHECK

Thread::Thread()
    :threadId_(0),
     isRunning_(false)
{

}

Thread::~Thread()
{
    if(isRunning_)
    {
        TINY_CHECK(!pthread_detach(threadId_));
    }
}

//static
void *Thread::runInThread(void *arg)
{
    Thread *pt = static_cast<Thread*>(arg);
    //pt->tid_ = syscall(SYS_gettid);
    pt->run();
    return NULL;
}

void Thread::start()
{
    TINY_CHECK(!pthread_create(&threadId_, NULL, Thread::runInThread, this));
    isRunning_ = true;
}

void Thread::join()
{
    assert(isRunning_);
    TINY_CHECK(!pthread_join(threadId_, NULL));
    isRunning_ = false;
}

测试代码如下:采用继承的方式使用这个类。

扫描二维码关注公众号,回复: 4887874 查看本文章
#include "Thread.h"
#include <iostream>
#include <unistd.h>
using namespace std;

class MyThread : public Thread
{
public:
    void run()
    {
        cout << "foo" << endl;
    }
};

int main(int argc, char const *argv[])
{
    MyThread t;
    t.start();

    t.join();

    return 0;
}

NonCopyable类的定义如下:

#ifndef NONCOPYABLE_H
#define NONCOPYABLE_H


class NonCopyable //禁用值语义
{
public:
    NonCopyable() { }
    ~NonCopyable() { }
private:
    NonCopyable(const NonCopyable &);
    void operator=(const NonCopyable &);
};
#endif //NONCOPYABLE_H

通过函数对象借助function/bind封装

上面采用的是虚函数+继承的方式,用户通过重写Thread基类的run方法,传入自己的用户逻辑。

现在我们采用C++11的function,将函数作为Thread类的成员,用户只需要将function对象传入线程即可,所以Thread的声明中,应该含有一个function成员变量。\

C++11之function模板和bind函数适配器

类的声明如下:

#ifndef THREAD_H_
#define THREAD_H_
#include <boost/noncopyable.hpp>
#include <functional>
#include <pthread.h>
class Thread : boost::noncopyable
{
public:
    typedef std::function<void ()> ThreadCallback;
    Thread(ThreadCallback callback);
    ~Thread();
    void start();
    void join();
    static void *runInThread(void *);
private:
    pthread_t threadId_;
    bool isRunning_;
    ThreadCallback callback_; //回调函数
};
#endif //THREAD_H_

那么如何开启线程?思路与之前一致,写一个static函数,用户pthread_create的第三个参数,this作为最后一个参数即可。

void Thread::start()
{
    pthread_create(&threadId_, NULL, runInThread, this);
    isRunning_ = true;
}

回调函数

  注意在这种封装方式中,我们采用了回调函数。回调函数与普通函数的区别就是,普通函数写完由我们自己直接调用,函数调用是一种不断往上堆积的方式,而回调函数通常是我们把某一个函数传入一个“盒子”,由该盒子内的机制来调用它。
  在这个例子里面,我们将function传入Thread,当Thread启动的时候,由Thread去执行function对象。
  在win32编程中大量用到这种机制,我们为鼠标单击、双击等事件编写相应的函数,然后将其注册给windows系统,然后系统在我们触发各种事件的时候,根据事件的类型,调用相应的构造函数。

关于回调函数,可以参考:http://www.zhihu.com/question/19801131
以后有时间,再专门总结下回调函数。

完整的cpp如下:

#include "Thread.h"

Thread::Thread(ThreadCallback callback)
: threadId_(0),
  isRunning_(false),
  callback_(std::move(callback))
{

}
    
Thread::~Thread()
{
    if(isRunning_)
    {
        //detach
        pthread_detach(threadId_);
    }
}

void Thread::start()
{
    pthread_create(&threadId_, NULL, runInThread, this);
    isRunning_ = true;
}
void Thread::join()
{
    pthread_join(threadId_, NULL);
    isRunning_ = false;
}

void *Thread::runInThread(void *arg)
{
    Thread *pt = static_cast<Thread*>(arg);
    pt->callback_(); //调用回调函数

    return NULL;
}

这个线程的使用方式有三种:

  • 一是将普通函数作为回调函数
void foo()
{
    while(1)
    {
        printf("foo\n");
        sleep(1);
    }
}

int main(int argc, char const *argv[])
{
    Thread t(&foo);

    t.start();
    t.join();

    return 0;
}
  • 二是采用类的成员函数作为回调函数:
class Foo
{
public:
    void foo(int i)
    {
        while(1)
        {
            printf("foo %d\n", i++);
            sleep(1);
        }
    }
};
int main(int argc, char const *argv[])
{
    Foo f;
    int i = 34;
    Thread t(bind(&Foo::foo, &f, i));

    t.start();
    t.join();

    return 0;
}
  • 最后一种是组合一个新的线程类,注意这里采用的是类的组合:
class Foo
{
public:

    Foo()
    : thread_(bind(&Foo::foo, this))
    {
    }

    void start()
    {
        thread_.start();
        thread_.join();
    }

    void foo()
    {
        while(1)
        {
            printf("foo\n");
            sleep(1);
        }
    }
private:
    Thread thread_;
};

int main(int argc, char const *argv[])
{
    Foo f;
    f.start();

    return 0;
}

有些复杂的类,还需要将三种方式加以整合,例如后面要谈到的TimerThread,里面含有一个Thread和Timer,用户将逻辑注册给Timer,然后Timer的start函数注册给Thread。

这种方式的Thread,使用灵活性相对于面向对象的风格,提高了很多。

基于对象和面向对象

这里总结几点:

面向对象依靠的是虚函数+继承,用户通过重写基类的虚函数,实现自己的逻辑。

基于对象,依赖类的组合,使用function和bind实现委托机制,更加依赖于回调函数传入逻辑。

使用C++11封装线程池ThreadPool

https://www.cnblogs.com/inevermore/p/4038698.html

线程池本质上是一个生产者消费者模型,所以请熟悉这篇文章:Linux组件封装(五)一个生产者消费者问题示例
在ThreadPool中,物品为计算任务,消费者为pool内的线程,而生产者则是调用线程池的每个函数。

  • 搞清了这一点,我们很容易就需要得出,ThreadPool需要一把互斥锁和两个同步变量,实现同步与互斥。
  • 存储任务,当然需要一个任务队列。
  • 除此之外,我们还需要一系列的Thread,因为Thread无法复制,所以我们使用unique_ptr作为一个中间层。

所以Thread的数据变量如下:

class ThreadPool : boost::noncopyable
{
public:
    typedef std::function<void ()> Task;
    ThreadPool(size_t queueSize, size_t threadsNum);
    ThreadPool::~ThreadPool()
	{
    	if(isStarted_)
        	stop();
	}
    void start();
    void stop();
    void addTask(Task task); //C++11
    Task getTask();
    bool isStarted() const { return isStarted_; }
    void runInThread();
private:
    mutable MutexLock mutex_;
    Condition empty_;
    Condition full_;
    size_t queueSize_;
    std::queue<Task> queue_;
    const size_t threadsNum_;
    std::vector<std::unique_ptr<Thread> > threads_;
    bool isStarted_;
};

显然,我们使用了function,作为任务队列的任务元素。
构造函数的实现较简单,不过,之前务必注意元素的声明顺序与初始化列表的顺序相一致。

ThreadPool::ThreadPool(size_t queueSize, size_t threadsNum)
: empty_(mutex_),
  full_(mutex_),
  queueSize_(queueSize),
  threadsNum_(threadsNum),
  isStarted_(false)
{

}

添加和取走任务是生产者消费者模型最核心的部分,但是套路较为固定,如下:

void ThreadPool::addTask(Task task)
{
    MutexLockGuard lock(mutex_);
    while(queue_.size() >= queueSize_)
        empty_.wait();
    queue_.push(std::move(task));
    full_.notify();
}

ThreadPool::Task ThreadPool::getTask()
{
    MutexLockGuard lock(mutex_);
    while(queue_.empty())
        full_.wait();
    Task task = queue_.front();
    queue_.pop();
    empty_.notify();
    return task;
}

注意我们的addTask使用了C++11的move语义,在传入右值时,可以提高性能。
还有一些老生常谈的问题,例如:

  • wait前加锁
  • 使用while循环判断wait条件(为什么?)

要想启动线程,需要给Thread提供一个回调函数,编写如下:

void ThreadPool::runInThread()
{
    while(1)
    {
        Task task(getTask());
        if(task)
            task();
    }
}

就是不停的取走任务,然后执行。

OK,有了线程的回调函数,那么我们可以编写start函数。

void ThreadPool::start()
{
    isStarted_ = true;
    //std::vector<std::unique<Thread> >
    for(size_t ix = 0; ix != threadsNum_; ++ix)
    {
        threads_.push_back(
            std::unique_ptr<Thread>(
                new Thread(
                    std::bind(&ThreadPool::runInThread, this))));
    }
    for(size_t ix = 0; ix != threadsNum_; ++ix)
    {
        threads_[ix]->start();
    }

}

这里我们采用C++11的unique_ptr,成功实现vector无法存储Thread(为什么?)的问题。

我们的第一个版本已经编写完毕了。

添加stop功能

刚才的ThreadPool只能启动,无法stop,我们从几个方面着手,利用bool变量isStarted_,实现正确退出。
改动的有以下几点:

首先是Thread的回调函数不再是一个死循环,而是:

void ThreadPool::runInThread()
{
    while(isStarted_)
    {
        Task task(getTask());
        if(task)
            task();
    }
}

然后addTask和getTask,在while循环判断时,加入了bool变量:

void ThreadPool::addTask(Task task)
{
    MutexLockGuard lock(mutex_);
    while(queue_.size() >= queueSize_ && isStarted_)
        empty_.wait();

    if(!isStarted_)
        return;

    queue_.push(std::move(task));
    full_.notify();
}


ThreadPool::Task ThreadPool::getTask()
{
    MutexLockGuard lock(mutex_);
    while(queue_.empty() && isStarted_)
        full_.wait();

    if(!isStarted_) //线程池关闭
        return Task(); //空任务,存量任务扔了

    assert(!queue_.empty());
    Task task = queue_.front();
    queue_.pop();
    empty_.notify();
    return task;
}

这里注意,退出while循环后,需要再判断一次bool变量,因为未必是条件满足了,可能是线程池需要退出,调整了isStarted变量。

最后一个关键是我们的stop函数:

void ThreadPool::stop()
{
    if(isStarted_ == false)
        return;
    {
        MutexLockGuard lock(mutex_);
        isStarted_ = false;
        //清空任务
        while(!queue_.empty()) 
            queue_.pop();
    }
    full_.notifyAll(); //激活所有的线程
    empty_.notifyAll();//getTask在empty_.notify()前已经退出
    
    for(size_t ix = 0; ix != threadsNum_; ++ix)
    {
        threads_[ix]->join();
    }
    threads_.clear();
}

先将bool设置为false,然后调用notifyAll,激活所有等待的线程(为什么)。

最后我们总结下ThreadPool关闭的流程

  • 1.isStarted设置为false
  • 2.加锁,清空队列
  • 3.发信号激活所有线程
  • 4.正在运行的Thread,执行到下一次循环,退出
  • 5.正在等待的线程被激活,然后while判断为false,执行到下一句,检查bool值,然后退出。
  • 6.主线程依次join每个线程。
  • 7.退出。

猜你喜欢

转载自blog.csdn.net/u013870094/article/details/85888550