Linux组件封装(七)——线程池的简单封装

线程池的封装,基础思想与生产者消费者的封装一样,只不过我们是将线程池封装为自动获取任务、执行任务,让用户调用相应的接口来添加任务。

在线程池的封装中,我们同样需要用到的是MutexLock、Condition、Thread这些基本的封装。

基础封装如下:

MutexLock:

 1 #ifndef MUTEXLOCK_H
 2 #define MUTEXLOCK_H
 3 
 4 #include "NonCopyable.h"
 5 #include <pthread.h>
 6 #include <stdlib.h>
 7 #include <stdio.h>
 8 #define TINY_CHECK(exp) 9         if(!exp)10         {    11             fprintf(stderr, "File : %s, Line : %d Exp : ["#exp"] is true, abort.\n", __FILE__, __LINE__); abort();12         }
13 
14 
15 
16 class MutexLock :  NonCopyable
17 {
18     friend class Condition;
19 public:
20     MutexLock();
21     ~MutexLock();
22     void lock();
23     void unlock();
24 
25     bool isLocked() const { return _isLock; }
26     pthread_mutex_t *getMutexPtr() { return &_mutex; }
27 
28 private:
29     void restoreMutexStatus()
30     { _isLock = true; }
31 
32     pthread_mutex_t _mutex;
33     bool _isLock;
34 };
35 
36 
37 class MutexLockGuard : NonCopyable            //将锁封装到MutexLockGuard中,
38 {                                            //这样只需定义一个对象,便可
39 public:                                        //便可自动上锁,对象销毁时自动解锁
40     MutexLockGuard(MutexLock &mutex)
41         :_mutex(mutex)
42     { _mutex.lock(); }
43 
44     ~MutexLockGuard()
45     { _mutex.unlock(); }
46 
47 private:
48     MutexLock &_mutex;
49 };
50 #define MutexLockGuard(m) "ERROR"
51 
52 #endif
View Code
 1 #include "MutexLock.h"
 2 #include <assert.h>
 3 
 4 MutexLock::MutexLock()
 5     :_isLock(false)
 6 {
 7     TINY_CHECK(!pthread_mutex_init(&_mutex, NULL));
 8 }
 9 
10 MutexLock::~MutexLock()
11 {
12     assert(!isLocked());
13     TINY_CHECK(!pthread_mutex_destroy(&_mutex));
14 }
15 
16 void MutexLock::lock()
17 {
18     TINY_CHECK(!pthread_mutex_lock(&_mutex));
19     _isLock = true;
20 }
21 
22 void MutexLock::unlock()
23 {
24     _isLock = false;
25     TINY_CHECK(!pthread_mutex_unlock(&_mutex));
26 }
View Code

Condition:

 1 #ifndef CONDITION_H
 2 #define CONDITION_H
 3 
 4 #include <pthread.h>
 5 #include "NonCopyable.h"
 6 
 7 class MutexLock;
 8 
 9 
10 class Condition : NonCopyable
11 {
12 public:
13     Condition(MutexLock &mutex);
14     ~Condition();
15 
16     void wait();
17     void notify();
18     void notifyAll();
19 private:
20     pthread_cond_t _cond;
21     MutexLock &_mutex;
22 };
23 
24 #endif
View Code
 1 #include "Condition.h"
 2 #include "MutexLock.h"
 3 #include <assert.h>
 4 
 5 Condition::Condition(MutexLock &mutex)
 6     :_mutex(mutex)
 7 {
 8     TINY_CHECK(!pthread_cond_init(&_cond, NULL));
 9 }
10 
11 Condition::~Condition()
12 {
13     TINY_CHECK(!pthread_cond_destroy(&_cond));
14 }
15 
16 void Condition::wait()
17 {
18     assert(_mutex.isLocked());
19     TINY_CHECK(!pthread_cond_wait(&_cond, _mutex.getMutexPtr()));
20     _mutex.restoreMutexStatus();
21 }
22 
23 void Condition::notify()
24 {
25     TINY_CHECK(!pthread_cond_signal(&_cond));
26 }
27 
28 void Condition::notifyAll()
29 {
30     TINY_CHECK(!pthread_cond_broadcast(&_cond));
31 }
View Code

Thread:

 1 #ifndef THREAD_H
 2 #define THREAD_H
 3 #include <boost/noncopyable.hpp>
 4 #include <functional>
 5 #include <pthread.h>
 6 class Thread : boost::noncopyable
 7 {
 8 public:
 9 
10     typedef std::function<void()> ThreadCallback;
11 
12     Thread(ThreadCallback cb);
13     ~Thread();
14 
15     void start();
16     void join();
17 
18 
19 
20 
21 private:
22 
23     static void *runInThread(void *);
24     pthread_t _threadId;
25     bool _isRun;
26     ThreadCallback _callback;
27 };
28 
29 
30 #endif  /*THREAD_H*/
View Code
 1 #include "Thread.h"
 2 #include <assert.h>
 3 
 4 Thread::Thread(ThreadCallback cb)
 5     :_threadId(0),
 6      _isRun(false),
 7      _callback(cb)
 8 {
 9 
10 }
11 
12 Thread::~Thread()
13 {
14     if(_isRun)
15         pthread_detach(_threadId);
16 }
17 
18 
19 void Thread::start()
20 {
21     pthread_create(&_threadId, NULL, runInThread, this);
22     _isRun = true;
23 }
24 
25 void Thread::join()
26 {
27     assert(_isRun);
28     pthread_join(_threadId, NULL);
29     _isRun = false;
30 }
31 
32 void *Thread::runInThread(void *arg)
33 {
34     Thread *p = static_cast<Thread *>(arg);
35     p->_callback();
36     return NULL;
37 }
View Code

NonCopyable:

 1 #ifndef NONCOPYABLE_H
 2 #define NONCOPYABLE_H
 3 
 4 class NonCopyable             //禁用值语意
 5 {
 6     public:
 7         NonCopyable() { }
 8         ~NonCopyable() { }
 9     private:
10         NonCopyable(const NonCopyable &);
11         void operator= (const NonCopyable &);
12 };
13 
14 #endif  /*NON_COPYABLE_H*/
View Code

 

在线程池的封装中,我们需要的数据结构有一个互斥锁,两个条件变量,任务队列以及线程池的队列。

然后,我们需要提供给用户添加任务的接口addTask,在线程池中,我们需要相应的获取任务函数getTask,执行任务的函数runInThread。

头文件代码如下:

 1 #ifndef THREAD_POOL_H
 2 #define THREAD_POOL_H
 3 
 4 #include <boost/noncopyable.hpp>
 5 #include "MutexLock.h"
 6 #include "Condition.h"
 7 #include <queue>
 8 #include <memory>
 9 #include <functional>
10 
11 class Thread;
12 
13 class ThreadPool : boost::noncopyable
14 {
15 public:
16     typedef std::function<void()> Task;
17 
18     ThreadPool(size_t queueSize, size_t poolSize);
19     ~ThreadPool();
20 
21     void start();
22     void stop();
23 
24     void addTask(Task task);
25     bool isRunning() const
26     { return _isStart; }
27 
28 private:
29     Task getTask();
30     void runInThread();
31 
32     mutable MutexLock _mutex;
33     Condition _full;
34     Condition _empty;
35     size_t _queueSize;
36     std::queue<Task> _queue;
37     const size_t _poolSize;
38     std::vector<std::unique_ptr<Thread> > _threads;
39     bool _isStart;
40 };
41 
42 
43 #endif  /*THREAD_POOL_H*/
View Code

在构造函数中,我们用一把锁去初始化两个条件变量,用相应的长度来初始化任务队列的长度与线程池中线程的个数:

ThreadPool::ThreadPool(size_t queueSize, size_t poolSize)
    :_full(_mutex),
     _empty(_mutex),
     _queueSize(queueSize),
     _poolSize(poolSize),
     _isStart(false)
{

}

addTask函数中,我们首先要判断线程池是否开启,然后加锁,判断任务队列是否已满,进行等待。等待后将相应的任务加入到任务队列,通知getTask来获取任务:

void ThreadPool::addTask(Task task)
{
    if(!_isStart)
        return;
    MutexLockGuard lock(_mutex);
    while(_queue.size() >= _queueSize)
        _empty.wait();

    _queue.push(std::move(task));
    _full.notify();
}

getTask函数中,我们首先判断线程池是否开启,然后对应加锁,判断任务队列是否为空,进行等待。等待后,取出任务队列中的第一个任务,通知addTask可以继续添加任务:

ThreadPool::Task ThreadPool::getTask()
{
    if(!_isStart)
        return Task();
    
    MutexLockGuard lock(_mutex);
    while(_queue.empty() && _isStart)
        _full.wait();



    assert(!_queue.empty());
    Task task = _queue.front();
    _queue.pop();
    _empty.notify();

    return task;
}

runInThread函数相对简单,只是获取相应任务,执行该任务即可:

void ThreadPool::runInThread()
{
    while(_isStart)
    {
        Task task(getTask());
        if(task)
            task();
    }
}

start函数,相对来说就比较复杂,我们首先需要new出线程,将线程添加到线程队列,然后将线程队列中的线程开启。

void ThreadPool::start()
{
    if(_isStart)
        return ;
    _isStart = true;

    for(size_t i = 0; i != _poolSize; ++ i)
        _threads.push_back(std::unique_ptr<Thread>(new Thread(std::bind(&ThreadPool::runInThread, this))));

    for(size_t i = 0; i != _poolSize; ++ i)
        _threads[i]->start();
}

这里需要注意:由于线程不可复制和赋值,我们将Thread相应的unique_ptr添加到线程队列,才可以达到相应的效果。而每个线程的回调函数,我们运用C++11的特性——function和bind来实现,我们将Thread的回调函数设置为一个function模板,通过bind,将ThreaPool::runInThread函数中的一个隐式参数转化为无参数,即将this指针绑定给ThreaPool::runInThread,这样,ThreaPool::runInThread就不再需要this指针了。

stop函数,我们需要考虑的因素较多,由于线程池已经开启,但不是每个线程都在运行,有些还在沉睡中,所以我们需要通过notifyAll来通知所有的线程获取任务,这样线程池中的线程已经唤醒,然后我们对每个线程进行join,而任务队列中未执行的任务,我们需要将其清空,通过pop函数来弹出任务队列中的任务:

void ThreadPool::stop()
{
    if(!_isStart)
        return ;
    {
        MutexLockGuard lock(_mutex);
        _isStart = false;
        _full.notifyAll();
    }

    for(size_t i = 0; i != _poolSize; ++ i)
        _threads[i]->join();

    while(!_queue.empty())
        _queue.pop();

    _threads.clear();
}

 

 

具体实现代码如下:

 1 #include "Thread.h"
 2 #include "ThreadPool.h"
 3 #include <assert.h>
 4 using namespace std;
 5 
 6 ThreadPool::ThreadPool(size_t queueSize, size_t poolSize)
 7     :_full(_mutex),
 8      _empty(_mutex),
 9      _queueSize(queueSize),
10      _poolSize(poolSize),
11      _isStart(false)
12 {
13 
14 }
15 ThreadPool::~ThreadPool()
16 {
17     if(_isStart)
18         stop();
19 }
20 
21 void ThreadPool::addTask(Task task)
22 {
23     if(!_isStart)
24         return;
25     MutexLockGuard lock(_mutex);
26     while(_queue.size() >= _queueSize)
27         _empty.wait();
28 
29     _queue.push(std::move(task));
30     _full.notify();
31 }
32 
33 ThreadPool::Task ThreadPool::getTask()
34 {
35     MutexLockGuard lock(_mutex);
36     while(_queue.empty() && _isStart)
37         _full.wait();
38 
39     if(!_isStart)
40         return Task();
41 
42     assert(!_queue.empty());
43     Task task = _queue.front();
44     _queue.pop();
45     _empty.notify();
46 
47     return task;
48 }
49 
50 void ThreadPool::runInThread()
51 {
52     while(_isStart)
53     {
54         Task task(getTask());
55         if(task)
56             task();
57     }
58 }
59 void ThreadPool::start()
60 {
61     if(_isStart)
62         return ;
63     _isStart = true;
64 
65     for(size_t i = 0; i != _poolSize; ++ i)
66         _threads.push_back(std::unique_ptr<Thread>(new Thread(std::bind(&ThreadPool::runInThread, this))));
67 
68     for(size_t i = 0; i != _poolSize; ++ i)
69         _threads[i]->start();
70 }
71 
72 void ThreadPool::stop()
73 {
74     if(!_isStart)
75         return ;
76     {
77         MutexLockGuard lock(_mutex);
78         _isStart = false;
79         _full.notifyAll();
80     }
81 
82     for(size_t i = 0; i != _poolSize; ++ i)
83         _threads[i]->join();
84 
85     while(!_queue.empty())
86         _queue.pop();
87 
88     _threads.clear();
89 }
View Code

 

 

在测试中,我们可以通过前面的定时器来测试,生成一个定时器线程,定时器到期时,对线程池执行stop。

测试代码如下:

#include "ThreadPool.h"
#include "nano_sleep.hpp"
#include <iostream>
#include <time.h>
#include "TimerThread.h"
using namespace std;

void foo()
{
    cout << rand() % 100 << endl;
}
void stopPool(ThreadPool *a)
{
    a->stop();
}
void nano_sleep(double val);
int main(int argc, const char *argv[])
{
    ThreadPool b(120, 4);
    TimerThread a(4, 0, std::bind(&stopPool, &b));
    b.start();
    a.start();
    while(b.isRunning())
    {
        b.addTask(&foo);
        nano_sleep(0.5);
    }

    a.stop();

    return 0;
}

void nano_sleep(double val)
{
    struct timespec tv;
    tv.tv_sec = val; //取整
    tv.tv_nsec = (val - tv.tv_sec) * 1000 * 1000 * 1000;

    int ret;
    do
    {
        ret = nanosleep(&tv, &tv);
    }while(ret == -1 && errno == EINTR);
}

在测试代码中,nano_sleep是一个相对精确地睡眠函数,我们可以将睡眠的精度限制到double。

当定时器到时后,会显示timeout,程序会自动退出。

注意:由于运用了C++11的一些特性,如function、bind和右值引用等,编译时需加上-std=c++0x

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。