c++ 阻塞队列

阻塞队列是后台开发中多线程异步架构的基本数据结构,像python, java 都提供线程安全的阻塞队列,c++ 可能需要自己实现一个模板。

从性能考虑,自己没有使用STL的queue作为基本数据结构,而是使用循环数组作为基本数据结构,性能应该比queue高,省去了动态内存分配和回收。

缺点就是,队列大小不可动态扩展,但是实际开发中,可以通过压力测试,配置合适的队列大小。


/********************************************
function: thread safe blocking queue.
author: liuyi
date: 2014.11.13
version: 2.0
********************************************/

#ifndef BLOCK_QUEUE_H
#define BLOCK_QUEUE_H

#include <pthread.h>
#include <sys/time.h>

#include <cstdlib>
#include <iostream>
using namespace std;

/**
 * Fixed-capacity, thread-safe blocking queue backed by a circular array.
 *
 * T must be default-constructible and copy-assignable (the storage is a
 * plain `new T[max_size]`). Producers never block: push() fails when the
 * queue is full. Consumers may block indefinitely (pop(item)) or up to a
 * timeout (pop(item, ms_timeout)).
 *
 * Index convention: m_back is the slot of the newest element and m_front is
 * the slot of the most recently removed element, so the oldest element
 * lives at (m_front + 1) % m_max_size.
 *
 * The queue is not copyable: it owns a raw array, a mutex and a condition
 * variable.
 */
template<class T>
class block_queue
{
	public:
		/**
		 * Creates a queue holding at most max_size elements.
		 * Terminates the process with exit(-1) when max_size <= 0.
		 */
		block_queue(int max_size = 1000)
			: m_array(NULL), m_size(0), m_max_size(max_size), m_front(-1), m_back(-1)
		{
			if(max_size <= 0)
			{
				exit(-1);
			}

			m_array = new T[max_size];
			pthread_mutex_init(&m_mutex, NULL);
			pthread_cond_init(&m_cond, NULL);
		}

		/**
		 * Releases the storage and the synchronization primitives.
		 * No other thread may be using the queue at this point.
		 */
		~block_queue()
		{
			// Allocated with new[], so it must be released with delete[].
			delete[] m_array;

			pthread_mutex_destroy(&m_mutex);
			pthread_cond_destroy(&m_cond);
		}

		/** Returns true when the queue holds max_size() elements. */
		bool full()const
		{
			mutex_holder guard(&m_mutex);
			return m_size >= m_max_size;
		}

		/** Returns true when the queue holds no elements. */
		bool empty()const
		{
			mutex_holder guard(&m_mutex);
			return 0 == m_size;
		}

		/**
		 * Returns a copy of the oldest element (the one the next pop() yields)
		 * without removing it, or a value-initialized T when the queue is empty.
		 */
		T front()const
		{
			mutex_holder guard(&m_mutex);
			if(m_size <= 0)
			{
				return T();
			}
			// m_front is the last removed slot; the oldest element follows it.
			return m_array[(m_front + 1) % m_max_size];
		}

		/**
		 * Returns a copy of the newest element without removing it, or a
		 * value-initialized T when the queue is empty.
		 */
		T back()const
		{
			mutex_holder guard(&m_mutex);
			if(m_size <= 0)
			{
				return T();
			}
			return m_array[m_back];
		}

		/** Returns the number of elements currently stored. */
		int size()const
		{
			mutex_holder guard(&m_mutex);
			return m_size;
		}

		/** Returns the fixed capacity chosen at construction. */
		int max_size()const
		{
			mutex_holder guard(&m_mutex);
			return m_max_size;
		}

		/**
		 * Appends a copy of item. Never blocks.
		 * @return true on success, false when the queue is full.
		 * Waiting consumers are woken in both cases.
		 */
		bool push(const T& item)
		{
			mutex_holder guard(&m_mutex);
			if(m_size >= m_max_size)
			{
				pthread_cond_broadcast(&m_cond);
				return false;
			}

			// Commit the new index only after the copy succeeded, so a throwing
			// assignment leaves the queue unchanged.
			const int slot = (m_back + 1) % m_max_size;
			m_array[slot] = item;
			m_back = slot;
			m_size++;

			pthread_cond_broadcast(&m_cond);
			return true;
		}

		/**
		 * Removes the oldest element into item, blocking while the queue is empty.
		 * @return true on success, false if waiting on the condition variable failed.
		 */
		bool pop(T& item)
		{
			mutex_holder guard(&m_mutex);
			// Loop: the wait may wake spuriously or another consumer may win the race.
			while(m_size <= 0)
			{
				if(0 != pthread_cond_wait(&m_cond, &m_mutex))
				{
					return false;
				}
			}

			take_front(item);
			return true;
		}

		/**
		 * Removes the oldest element into item, waiting at most ms_timeout
		 * milliseconds for one to arrive (negative values are treated as 0).
		 * @return true on success, false on timeout or wait failure; item is
		 *         left untouched on failure.
		 */
		bool pop(T& item, int ms_timeout)
		{
			if(ms_timeout < 0)
			{
				ms_timeout = 0;
			}

			// Build an absolute deadline: now + timeout, carrying nanoseconds
			// into seconds so tv_nsec stays within [0, 1e9).
			struct timeval now = {0,0};
			gettimeofday(&now, NULL);
			const long nsec_per_sec = 1000000000L;
			const long nsec = now.tv_usec * 1000L + (ms_timeout % 1000) * 1000000L;
			struct timespec deadline = {0,0};
			deadline.tv_sec = now.tv_sec + ms_timeout / 1000 + nsec / nsec_per_sec;
			deadline.tv_nsec = nsec % nsec_per_sec;

			mutex_holder guard(&m_mutex);
			while(m_size <= 0)
			{
				const int rc = pthread_cond_timedwait(&m_cond, &m_mutex, &deadline);
				// Give up on timeout/error only if nothing arrived in the meantime.
				if(0 != rc && m_size <= 0)
				{
					return false;
				}
			}

			take_front(item);
			return true;
		}

	private:
		/** Locks a mutex for the lifetime of the object (exception-safe unlock). */
		class mutex_holder
		{
			public:
				explicit mutex_holder(pthread_mutex_t *mutex) : m_held(mutex)
				{
					pthread_mutex_lock(m_held);
				}

				~mutex_holder()
				{
					pthread_mutex_unlock(m_held);
				}

			private:
				mutex_holder(const mutex_holder&);
				mutex_holder& operator=(const mutex_holder&);

				pthread_mutex_t *m_held;
		};

		// Copying would double-free the array and duplicate the primitives;
		// declared but intentionally not defined.
		block_queue(const block_queue&);
		block_queue& operator=(const block_queue&);

		/** Moves the oldest element into item. Caller holds m_mutex and m_size > 0. */
		void take_front(T& item)
		{
			const int slot = (m_front + 1) % m_max_size;
			item = m_array[slot];
			m_front = slot;
			m_size--;
		}

		mutable pthread_mutex_t m_mutex;  // guards every field below
		mutable pthread_cond_t m_cond;    // signalled by push()
		T *m_array;                       // circular storage of m_max_size slots
		int m_size;                       // number of stored elements
		int m_max_size;                   // capacity, fixed at construction
		int m_front;                      // slot of the last removed element (-1 initially)
		int m_back;                       // slot of the newest element (-1 initially)
};

#endif

//测试程序

#include<iostream>
#include"block_queue.h"
using namespace std;

// Queue of ints with capacity 100, shared by the producer thread p and the
// consumer thread c.
block_queue<int> g_queue(100);
// Producer thread entry point: sleeps one second, then pushes the values
// 0..99 into g_queue. The result of each push is ignored. args is unused.
void *p(void *args)
{
	sleep(1);
	for(int value = 0; value < 100; ++value)
	{
		g_queue.push(value);
	}
	return NULL;
}

// Consumer thread entry point: loops forever, alternating between a timed pop
// (1000 ms) and a blocking pop, printing what each one produced. args is unused.
void *c(void* args)
{
	for(;;)
	{
		int value = 0;

		// Timed pop: report a timeout and start the iteration over.
		const bool got = g_queue.pop(value, 1000);
		if(!got)
		{
			cout<<"timeout"<<endl;
			continue;
		}
		cout<<value<<endl;

		// Blocking pop: waits until the producer delivers another element.
		g_queue.pop(value);
		cout<<"block="<<value<<endl;
	}

	return NULL;
}

// Starts one producer thread (p) and one consumer thread (c), then keeps the
// process alive forever; the threads are never joined. Further producers or
// consumers can be started with additional pthread_create calls.
int main()
{
	pthread_t producer;
	pthread_t consumer;
	pthread_create(&producer, NULL, p, NULL);
	pthread_create(&consumer, NULL, c, NULL);

	while(true)
	{
		sleep(1);
	}
	return 0;
}


郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。