Linux组件封装(七)——线程池的简单封装
线程池的封装,基础思想与生产者消费者的封装一样,只不过我们是将线程池封装为自动获取任务、执行任务,让用户调用相应的接口来添加任务。
在线程池的封装中,我们同样需要用到的是MutexLock、Condition、Thread这些基本的封装。
基础封装如下:
MutexLock:
1 #ifndef MUTEXLOCK_H 2 #define MUTEXLOCK_H 3 4 #include "NonCopyable.h" 5 #include <pthread.h> 6 #include <stdlib.h> 7 #include <stdio.h> 8 #define TINY_CHECK(exp) 9 if(!exp)10 { 11 fprintf(stderr, "File : %s, Line : %d Exp : ["#exp"] is true, abort.\n", __FILE__, __LINE__); abort();12 } 13 14 15 16 class MutexLock : NonCopyable 17 { 18 friend class Condition; 19 public: 20 MutexLock(); 21 ~MutexLock(); 22 void lock(); 23 void unlock(); 24 25 bool isLocked() const { return _isLock; } 26 pthread_mutex_t *getMutexPtr() { return &_mutex; } 27 28 private: 29 void restoreMutexStatus() 30 { _isLock = true; } 31 32 pthread_mutex_t _mutex; 33 bool _isLock; 34 }; 35 36 37 class MutexLockGuard : NonCopyable //将锁封装到MutexLockGuard中, 38 { //这样只需定义一个对象,便可 39 public: //便可自动上锁,对象销毁时自动解锁 40 MutexLockGuard(MutexLock &mutex) 41 :_mutex(mutex) 42 { _mutex.lock(); } 43 44 ~MutexLockGuard() 45 { _mutex.unlock(); } 46 47 private: 48 MutexLock &_mutex; 49 }; 50 #define MutexLockGuard(m) "ERROR" 51 52 #endif
1 #include "MutexLock.h" 2 #include <assert.h> 3 4 MutexLock::MutexLock() 5 :_isLock(false) 6 { 7 TINY_CHECK(!pthread_mutex_init(&_mutex, NULL)); 8 } 9 10 MutexLock::~MutexLock() 11 { 12 assert(!isLocked()); 13 TINY_CHECK(!pthread_mutex_destroy(&_mutex)); 14 } 15 16 void MutexLock::lock() 17 { 18 TINY_CHECK(!pthread_mutex_lock(&_mutex)); 19 _isLock = true; 20 } 21 22 void MutexLock::unlock() 23 { 24 _isLock = false; 25 TINY_CHECK(!pthread_mutex_unlock(&_mutex)); 26 }
Condition:
1 #ifndef CONDITION_H 2 #define CONDITION_H 3 4 #include <pthread.h> 5 #include "NonCopyable.h" 6 7 class MutexLock; 8 9 10 class Condition : NonCopyable 11 { 12 public: 13 Condition(MutexLock &mutex); 14 ~Condition(); 15 16 void wait(); 17 void notify(); 18 void notifyAll(); 19 private: 20 pthread_cond_t _cond; 21 MutexLock &_mutex; 22 }; 23 24 #endif
1 #include "Condition.h" 2 #include "MutexLock.h" 3 #include <assert.h> 4 5 Condition::Condition(MutexLock &mutex) 6 :_mutex(mutex) 7 { 8 TINY_CHECK(!pthread_cond_init(&_cond, NULL)); 9 } 10 11 Condition::~Condition() 12 { 13 TINY_CHECK(!pthread_cond_destroy(&_cond)); 14 } 15 16 void Condition::wait() 17 { 18 assert(_mutex.isLocked()); 19 TINY_CHECK(!pthread_cond_wait(&_cond, _mutex.getMutexPtr())); 20 _mutex.restoreMutexStatus(); 21 } 22 23 void Condition::notify() 24 { 25 TINY_CHECK(!pthread_cond_signal(&_cond)); 26 } 27 28 void Condition::notifyAll() 29 { 30 TINY_CHECK(!pthread_cond_broadcast(&_cond)); 31 }
Thread:
1 #ifndef THREAD_H 2 #define THREAD_H 3 #include <boost/noncopyable.hpp> 4 #include <functional> 5 #include <pthread.h> 6 class Thread : boost::noncopyable 7 { 8 public: 9 10 typedef std::function<void()> ThreadCallback; 11 12 Thread(ThreadCallback cb); 13 ~Thread(); 14 15 void start(); 16 void join(); 17 18 19 20 21 private: 22 23 static void *runInThread(void *); 24 pthread_t _threadId; 25 bool _isRun; 26 ThreadCallback _callback; 27 }; 28 29 30 #endif /*THREAD_H*/
1 #include "Thread.h" 2 #include <assert.h> 3 4 Thread::Thread(ThreadCallback cb) 5 :_threadId(0), 6 _isRun(false), 7 _callback(cb) 8 { 9 10 } 11 12 Thread::~Thread() 13 { 14 if(_isRun) 15 pthread_detach(_threadId); 16 } 17 18 19 void Thread::start() 20 { 21 pthread_create(&_threadId, NULL, runInThread, this); 22 _isRun = true; 23 } 24 25 void Thread::join() 26 { 27 assert(_isRun); 28 pthread_join(_threadId, NULL); 29 _isRun = false; 30 } 31 32 void *Thread::runInThread(void *arg) 33 { 34 Thread *p = static_cast<Thread *>(arg); 35 p->_callback(); 36 return NULL; 37 }
NonCopyable:
1 #ifndef NONCOPYABLE_H 2 #define NONCOPYABLE_H 3 4 class NonCopyable //禁用值语意 5 { 6 public: 7 NonCopyable() { } 8 ~NonCopyable() { } 9 private: 10 NonCopyable(const NonCopyable &); 11 void operator= (const NonCopyable &); 12 }; 13 14 #endif /*NON_COPYABLE_H*/
在线程池的封装中,我们需要的数据结构有一个互斥锁,两个条件变量,任务队列以及线程池的队列。
然后,我们需要提供给用户添加任务的接口addTask,在线程池中,我们需要相应的获取任务函数getTask,执行任务的函数runInThread。
头文件代码如下:
1 #ifndef THREAD_POOL_H 2 #define THREAD_POOL_H 3 4 #include <boost/noncopyable.hpp> 5 #include "MutexLock.h" 6 #include "Condition.h" 7 #include <queue> 8 #include <memory> 9 #include <functional> 10 11 class Thread; 12 13 class ThreadPool : boost::noncopyable 14 { 15 public: 16 typedef std::function<void()> Task; 17 18 ThreadPool(size_t queueSize, size_t poolSize); 19 ~ThreadPool(); 20 21 void start(); 22 void stop(); 23 24 void addTask(Task task); 25 bool isRunning() const 26 { return _isStart; } 27 28 private: 29 Task getTask(); 30 void runInThread(); 31 32 mutable MutexLock _mutex; 33 Condition _full; 34 Condition _empty; 35 size_t _queueSize; 36 std::queue<Task> _queue; 37 const size_t _poolSize; 38 std::vector<std::unique_ptr<Thread> > _threads; 39 bool _isStart; 40 }; 41 42 43 #endif /*THREAD_POOL_H*/
在构造函数中,我们用一把锁去初始化两个条件变量,用相应的长度来初始化任务队列的长度与线程池中线程的个数:
ThreadPool::ThreadPool(size_t queueSize, size_t poolSize) :_full(_mutex), _empty(_mutex), _queueSize(queueSize), _poolSize(poolSize), _isStart(false) { }
addTask函数中,我们首先要判断线程池是否开启,然后加锁,判断任务队列是否已满,进行等待。等待后将相应的任务加入到任务队列,通知getTask来获取任务:
void ThreadPool::addTask(Task task) { if(!_isStart) return; MutexLockGuard lock(_mutex); while(_queue.size() >= _queueSize) _empty.wait(); _queue.push(std::move(task)); _full.notify(); }
getTask函数中,我们首先判断线程池是否开启,然后对应加锁,判断任务队列是否为空,进行等待。等待后,取出任务队列中的第一个任务,通知addTask可以继续添加任务:
ThreadPool::Task ThreadPool::getTask() { if(!_isStart) return Task(); MutexLockGuard lock(_mutex); while(_queue.empty() && _isStart) _full.wait(); assert(!_queue.empty()); Task task = _queue.front(); _queue.pop(); _empty.notify(); return task; }
runInThread函数相对简单,只是获取相应任务,执行该任务即可:
void ThreadPool::runInThread() { while(_isStart) { Task task(getTask()); if(task) task(); } }
start函数,相对来说就比较复杂,我们首先需要new出线程,将线程添加到线程队列,然后将线程队列中的线程开启。
void ThreadPool::start() { if(_isStart) return ; _isStart = true; for(size_t i = 0; i != _poolSize; ++ i) _threads.push_back(std::unique_ptr<Thread>(new Thread(std::bind(&ThreadPool::runInThread, this)))); for(size_t i = 0; i != _poolSize; ++ i) _threads[i]->start(); }
这里需要注意:由于线程不可复制和赋值,我们将Thread相应的unique_ptr添加到线程队列,才可以达到相应的效果。而每个线程的回调函数,我们运用C++11的特性——function和bind来实现,我们将Thread的回调函数设置为一个function模板,通过bind,将ThreaPool::runInThread函数中的一个隐式参数转化为无参数,即将this指针绑定给ThreaPool::runInThread,这样,ThreaPool::runInThread就不再需要this指针了。
stop函数,我们需要考虑的因素较多,由于线程池已经开启,但不是每个线程都在运行,有些还在沉睡中,所以我们需要通过notifyAll来通知所有的线程获取任务,这样线程池中的线程已经唤醒,然后我们对每个线程进行join,而任务队列中未执行的任务,我们需要将其清空,通过pop函数来弹出任务队列中的任务:
void ThreadPool::stop() { if(!_isStart) return ; { MutexLockGuard lock(_mutex); _isStart = false; _full.notifyAll(); } for(size_t i = 0; i != _poolSize; ++ i) _threads[i]->join(); while(!_queue.empty()) _queue.pop(); _threads.clear(); }
具体实现代码如下:
1 #include "Thread.h" 2 #include "ThreadPool.h" 3 #include <assert.h> 4 using namespace std; 5 6 ThreadPool::ThreadPool(size_t queueSize, size_t poolSize) 7 :_full(_mutex), 8 _empty(_mutex), 9 _queueSize(queueSize), 10 _poolSize(poolSize), 11 _isStart(false) 12 { 13 14 } 15 ThreadPool::~ThreadPool() 16 { 17 if(_isStart) 18 stop(); 19 } 20 21 void ThreadPool::addTask(Task task) 22 { 23 if(!_isStart) 24 return; 25 MutexLockGuard lock(_mutex); 26 while(_queue.size() >= _queueSize) 27 _empty.wait(); 28 29 _queue.push(std::move(task)); 30 _full.notify(); 31 } 32 33 ThreadPool::Task ThreadPool::getTask() 34 { 35 MutexLockGuard lock(_mutex); 36 while(_queue.empty() && _isStart) 37 _full.wait(); 38 39 if(!_isStart) 40 return Task(); 41 42 assert(!_queue.empty()); 43 Task task = _queue.front(); 44 _queue.pop(); 45 _empty.notify(); 46 47 return task; 48 } 49 50 void ThreadPool::runInThread() 51 { 52 while(_isStart) 53 { 54 Task task(getTask()); 55 if(task) 56 task(); 57 } 58 } 59 void ThreadPool::start() 60 { 61 if(_isStart) 62 return ; 63 _isStart = true; 64 65 for(size_t i = 0; i != _poolSize; ++ i) 66 _threads.push_back(std::unique_ptr<Thread>(new Thread(std::bind(&ThreadPool::runInThread, this)))); 67 68 for(size_t i = 0; i != _poolSize; ++ i) 69 _threads[i]->start(); 70 } 71 72 void ThreadPool::stop() 73 { 74 if(!_isStart) 75 return ; 76 { 77 MutexLockGuard lock(_mutex); 78 _isStart = false; 79 _full.notifyAll(); 80 } 81 82 for(size_t i = 0; i != _poolSize; ++ i) 83 _threads[i]->join(); 84 85 while(!_queue.empty()) 86 _queue.pop(); 87 88 _threads.clear(); 89 }
在测试中,我们可以通过前面的定时器来测试,生成一个定时器线程,定时器到期时,对线程池执行stop。
测试代码如下:
#include "ThreadPool.h" #include "nano_sleep.hpp" #include <iostream> #include <time.h> #include "TimerThread.h" using namespace std; void foo() { cout << rand() % 100 << endl; } void stopPool(ThreadPool *a) { a->stop(); } void nano_sleep(double val); int main(int argc, const char *argv[]) { ThreadPool b(120, 4); TimerThread a(4, 0, std::bind(&stopPool, &b)); b.start(); a.start(); while(b.isRunning()) { b.addTask(&foo); nano_sleep(0.5); } a.stop(); return 0; } void nano_sleep(double val) { struct timespec tv; tv.tv_sec = val; //取整 tv.tv_nsec = (val - tv.tv_sec) * 1000 * 1000 * 1000; int ret; do { ret = nanosleep(&tv, &tv); }while(ret == -1 && errno == EINTR); }
在测试代码中,nano_sleep是一个相对精确地睡眠函数,我们可以将睡眠的精度限制到double。
当定时器到时后,会显示timeout,程序会自动退出。
注意:由于运用了C++11的一些特性,如function、bind和右值引用等,编译时需加上-std=c++0x
郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。