Linux组件封装之五:生产者消费者问题

生产者,消费者问题是有关互斥锁(MutexLock)、条件变量(Condition)、线程(Thread)的经典案例;

描述的问题可以叙述为 生产者往buffer中投放产品,而消费者则从buffer中消费产品。

生产着消费者问题的难点在于:

为了缓冲区数据的安全性,一次只允许一个线程进入缓冲区投放或者消费产品,这个buffer就是所谓的临界资源。
生产者往缓冲区中投放产品时,如果缓冲区已满,那么该线程需要等待,即进入阻塞状态,一直到消费者取走产品为止。
相应的,消费者欲取走产品,如果此时缓冲区为空,即没有产品,那么消费者则需要等待,一直到有生产者投放产品为止。

第一个问题属于互斥问题,我们需要一把互斥锁实现互斥访问(MutexLock), 以确保实现缓冲区的安全访问。
后两个问题则属于同步问题,两类线程相互协作,我们需要两个条件变量,一个用于通知消费者从缓冲区取走产品,另一个通知生产者往缓冲区投放产品。

生产者的大概流程为:

1、加锁;
2、若缓冲区已满,则进入等待状态;否则执行 33、生产产品;
4、解锁;
5、通知消费者取走产品

消费者的大概流程为:

1、加锁;
2、若缓冲区已空,则进入等待状态;否则执行 33、取走产品;
4、解锁;
5、通知生产者生产产品

为此,我们设计出一个缓冲区类,把互斥锁和条件变量作为其成员变量;

 1 #ifndef BUFFER_H_
 2 #define BUFFER_H_
 3 
 4 #include "NonCopyable.h"
 5 #include "MutexLock.h"
 6 #include "Condition.h"
 7 #include <queue> 
 8 
 9 class Buffer:NonCopyable
10 {
11     public:
12         Buffer(size_t size);//attention
13 
14         void push(int val);//投放产品
15         int pop();//取走产品
16 
17         bool isEmpty()const;
18         size_t size()const;
19     private:
20          mutable MutexLock mutex_;//注意声明次序,不能改变
21         Condition full_;
22         Condition empty_;
23         
24         size_t size_;//缓冲区大小
25         std::queue<int> q_;
26 };
27 
28 #endif

这里注意, 我们把同步与互斥的操作都放入Buffer中,是得生产者和消费者线程不必考虑其中的细节,这符合软件设计的“高内聚,低耦合”原则;

还有一点, mutex被声明为mutable类型,意味着mutex的状态在const函数中仍然可以被改变,是符合程序逻辑的,把mutex声明为mutable,是一种标准实现

Buffer的具体实现代码如下:

 1 #include "Buffer.h"
 2 #include <iostream>
 3 Buffer::Buffer(size_t size)
 4     :size_(size),
 5     full_(mutex_), //用mutex初始化Condition的一个对象
 6     empty_(mutex_)//用mutex初始化Condition的另一个对象
 7 {} 
 8 
 9 void Buffer::push(int val)
10 {
11     { //attention 作用域问题
12     MutexGuard lock(mutex_);
13     while(q_.size()>= size_)
14         empty_.wait();
15     q_.push(val);
16     }
17     full_.signal();
18 }   
19 
20 int Buffer::pop()//attention
21 {
22     int tmp= 0;
23     {
24         MutexGuard lock(mutex_);
25         while(q_.empty())
26             full_.wait();
27         tmp = q_.front();
28         q_.pop();
29     }
30     empty_.signal();
31     return tmp;
32 }
33 
34 
35 bool Buffer::isEmpty()const
36 {
37 //after    
38     MutexGuard lock(mutex_);//作用域仅限于花括号内
39     return q_.empty();
40 }
41 
42 size_t Buffer::size()const
43 {
44     MutexGuard lock(mutex_);
45     return q_.size();
46 }

注意:
1、条件变量的等待必须使用While, 这是一种最佳实践,原因可见Condition的封装 Linux组件封装之二:Condition

2、可以先notify,也可以先解锁,不过推荐先解锁,原因是如果线程A先notify,唤醒一个线程B,但是A还未解锁,此时如果线程切换至刚唤醒的线程B,B马上尝试lock,但是肯定失败,然后阻塞,这增加了一次线程切换的开销

 

这里还有一个问题,就是我们在main函数中,必须一个一个的声明生产者,消费者,一个一个的去start、join,那么为了防止这种麻烦,我们可以怎么做呢?

我们可以将缓冲区与多个生产者、消费者封装成一个 车间类。代码如下:

 1 #ifndef WORKSHOP_H_
 2 #define WORKSHOP_H_
 3 
 4 #include "NonCopyable.h"
 5 #include "Buffer.h"
 6 #include <vector>
 7 
 8 class ProducerThread;
 9 class ConsumerThread;
10 class Buffer;
11 class WorkShop:NonCopyable
12 {
13     public:
14         WorkShop(size_t bufferSize,
15                  size_t producerSize,
16                  size_t consumerSize);
17 
18         ~WorkShop();
19         void startWorking();
20   
21     private:
22         size_t bufferSize_;
23         Buffer buffer_;
24 
25         size_t producerSize_;
26         size_t consumerSize_;
27         std::vector<ProducerThread*> producers_;
28         std::vector<ConsumerThread*> consumers_;
29 };
30 
31 #endif

实现如下(注意之处放在cpp中);

 1 #include "WorkShop.h"
 2 #include "ProducerThread.h"
 3 #include "ConsumerThread.h"
 4 
 5 //version 1
 6 WorkShop::WorkShop(size_t buffersize,
 7                    size_t producerSize,
 8                    size_t consumerSize)
 9     :bufferSize_(buffersize),
10      buffer_(bufferSize_),
11      producerSize_(producerSize),
12      consumerSize_(consumerSize),
13      producers_(producerSize_, new ProducerThread(buffer_)),  
14      consumers_(consumerSize_, new ConsumerThread(buffer_)) 
15 {
16 
17 }
18 
19 //version 2
20 /*
21 WorkShop::WorkShop(size_t buffersize,
22                    size_t producerSize,
23                    size_t consumerSize)
24     :bufferSize_(buffersize),
25      buffer_(bufferSize_),
26      producerSize_(producerSize),
27      consumerSize_(consumerSize),
28      producers_(producerSize_,NULL),//vector 的初始化   
29      consumers_(consumerSize_,NULL) 
30 {
31     for(std::vector<ProducerThread*>::iterator it = producers_.begin();
32         it != producers_.end();
33         ++it)
34     {
35         *it = new ProducerThread(buffer_);
36     }
37 
38     for(std::vector<ConsumerThread*>::iterator it = consumers_.begin();
39         it != consumers_.end();
40         ++it)
41     {
42         *it = new ConsumerThread(buffer_);
43     }
44 }
45 */
46 
47 WorkShop::~WorkShop()
48 {
49     for(std::vector<ProducerThread*>::iterator it = producers_.begin();
50         it != producers_.end();
51         ++it)
52     {
53          delete *it ;
54     }
55 
56     for(std::vector<ConsumerThread*>::iterator it = consumers_.begin();
57         it != consumers_.end();
58         ++it)
59     {
60         delete *it ;
61     }
62 }
63 
64 void WorkShop::startWorking()
65 {
66     for(std::vector<ProducerThread*>::iterator it = producers_.begin();
67         it != producers_.end();
68         ++it)
69     {
70         //注意,此循环不能同时调用start,join->发生阻塞(只能产生一个 ProducerThread)
71         (*it)->start() ;
72     }
73     for(std::vector<ConsumerThread*>::iterator it = consumers_.begin();
74         it != consumers_.end();
75         ++it)
76     {
77         (*it)->start() ;
78     }
79 
80     for(std::vector<ProducerThread*>::iterator it = producers_.begin();
81         it != producers_.end();
82         ++it)
83     {
84          (*it)->join() ;
85     }
86     for(std::vector<ConsumerThread*>::iterator it = consumers_.begin();
87         it != consumers_.end();
88         ++it)
89     {
90         (*it)->join() ;
91     }
92 }

这样我们就可以同时指定 buffer的大小,生产者的数目,消费者的数目

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。