进程间通信(6) - 消息队列posix

1.前言

本篇文章的所有例子,基于RHEL6.5平台(linux kernal: 2.6.32-431.el6.i686)。

2.介绍

消息队列是先进先出FIFO原则。

消息队列就是一个消息的链表。可以把消息看作一个记录,具有特定的格式以及特定的优先级。对消息队列有写权限的进程可以向其中按照一定的规则添加新消息;对消息队列有读权限的进程则可以从消息队列中读走消息。消息队列是随内核持续的。

目前主要有两种类型的消息队列:POSIX消息队列以及System V消息队列,System V消息队列目前被大量使用。考虑到程序的可移植性,新开发的应用程序应尽量使用POSIX消息队列。

消息队列和之前讨论过的管道(点此链接)和FIFO(点此链接)有很大的区别,主要有以下两点:
     · 一个进程向消息队列写入消息之前,并不需要某个进程在该队列上等待该消息的到达,而管道和FIFO是相反的,进程向其中写消息时,管道和FIFO必需已经打开来读,否则写进程就会阻塞(默认情况下)。
    ·  IPC的持续性不同。管道和FIFO随进程的持续性,当管道和FIFO最后一次关闭发生时,仍在管道和FIFO中的数据会被丢弃。消息队列是随内核的持续性,即一个进程向消息队列写入消息后,然后终止,另外一个进程可以在以后某个时刻打开该队列读取消息。只要内核没有重新自举,消息队列没有被删除。

消息队列中的每条消息通常具有以下属性:
    · 一个表示优先级的整数;
    
· 消息的数据部分的长度;
    ·  消息数据本身;

POSIX消息队列的一个可能的设计是一个如下图所示的消息链表,链表头部有消息队列的属性信息。
技术分享

3.创建与关闭

POSIX消息队列的创建,关闭和删除用到以下三个函数接口:

#include <mqueue.h>  
mqd_t mq_open(const char *name, int oflag, /* mode_t mode, struct mq_attr *attr */);  
                                    //成功返回消息队列描述符,失败返回-1  
mqd_t mq_close(mqd_t mqdes);  
mqd_t mq_unlink(const char *name);  //成功返回0,失败返回-1

mq_open用于打开或创建一个消息队列。
    name:表示消息队列的名字,它符合POSIX IPC的名字规则。
    oflag:表示打开的方式,和open函数的类似。有必须的选项:O_RDONLYO_WRONLYO_RDWR,还有可选的选项:O_NONBLOCKO_CREATO_EXCL
    mode:是一个可选参数,在oflag中含有O_CREAT标志且消息队列不存在时,才需要提供该参数。表示默认访问权限。可以参考open
    attr:也是一个可选参数,在oflag中含有O_CREAT标志且消息队列不存在时才需要。该参数用于给新队列设定某些属性,如果是空指针,那么就采用默认属性。
    mq_open返回值是mqd_t类型的值,被称为消息队列描述符。在Linux 2.6.32中该类型的定义为整型:    #include <bits/mqueue.h>      typedef int mqd_t; 

mq_close用于关闭一个消息队列,与对文件进行close类似,关闭后,消息队列并不从系统中删除。一个进程结束,会自动调用关闭打开着的消息队列。

mq_unlink用于删除一个消息队列。消息队列创建后只有通过调用该函数或者是内核自举才能进行删除。每个消息队列都有一个保存当前打开着描述符数的引用计数器,和文件一样,因此本函数能够实现类似于unlink函数删除一个文件的机制。

POSIX消息队列的名字所创建的真正路径名和具体的系统实现有关,关于具体POSIX IPC的名字规则可以参考《UNIX 网络编程 卷2:进程间通信》的P14。

经过测试,在Linux 2.6.32中,所创建的POSIX消息队列不会在文件系统中创建真正的路径名。且POSIX的名字只能以一个’/’开头,名字中不能包含其他的’/’。

4.消息队列的属性

POSIX标准规定消息队列属性mq_attr必须要含有以下四个内容:
long    mq_flags         //消息队列的标志:0或O_NONBLOCK,用来表示是否阻塞  
long    mq_maxmsg  //消息队列的最大消息数  
long    mq_msgsize  //消息队列中每个消息的最大字节数  
long    mq_curmsgs  //消息队列中当前的消息数目  

在Linux 2.6.32中mq_attr结构的定义如下:

#include <bits/mqueue.h>
struct mq_attr {
        long    mq_flags;       /* message queue flags                  */
        long    mq_maxmsg;      /* maximum number of messages           */
        long    mq_msgsize;     /* maximum message size                 */
        long    mq_curmsgs;     /* number of messages currently queued  */
        long    __reserved[4];  /* ignored for input, zeroed for output */
};

POSIX消息队列的属性设置和获取可以通过下面两个函数实现:

#include <mqueue.h>  
//成功返回0,失败返回-1
mqd_t mq_getattr(mqd_t mqdes, struct mq_attr *attr);  
mqd_t mq_setattr(mqd_t mqdes, struct mq_attr *newattr, struct mq_attr *oldattr);

mq_getattr用于获取当前消息队列的属性,mq_setattr用于设置当前消息队列的属性。其中mq_setattr中的oldattr用于保存修改前的消息队列的属性,可以为空。
mq_setattr可以设置的属性只有mq_flags,用来设置或清除消息队列的非阻塞标志。newattr结构的其他属性被忽略。mq_maxmsg和mq_msgsize属性只能在创建消息队列时通过
mq_open来设置。mq_open只会设置该两个属性,忽略另外两个属性。mq_curmsgs属性只能被获取而不能被设置。

下面是测试代码:

#include <iostream>  
#include <cstring>  

#include <errno.h>  
#include <unistd.h>  
#include <fcntl.h>  
#include <mqueue.h>  

int main()
{
	mqd_t mqID;
	mqID = mq_open("/shltsh_Queue", O_RDWR | O_CREAT, 0666, NULL);

	if (mqID < 0)
	{
		std::cout << "open message queue error..." << strerror(errno) << std::endl;
		return -1;
	}

	mq_attr mqAttr;
	if (mq_getattr(mqID, &mqAttr) < 0)
	{
		std::cout << "get the message queue attribute error" << std::endl;
		return -1;
	}

	std::cout << "mq_flags:" << mqAttr.mq_flags << std::endl;
	std::cout << "mq_maxmsg:" << mqAttr.mq_maxmsg << std::endl;
	std::cout << "mq_msgsize:" << mqAttr.mq_msgsize << std::endl;
	std::cout << "mq_curmsgs:" << mqAttr.mq_curmsgs << std::endl;
}
编译:
[root@MiWiFi-R1CM csdnblog]# g++ pipe.cpp  -lrt
输出:
mq_flags:0
mq_maxmsg:10
mq_msgsize:8192
mq_curmsgs:0

5.消息的发送接收

POSIX消息队列可以通过以下两个函数来进行发送和接收消息:

#include <mqueue.h>  
mqd_t mq_send(mqd_t mqdes, const char *msg_ptr,  
                      size_t msg_len, unsigned msg_prio);  
                     //成功返回0,出错返回-1  
  
mqd_t mq_receive(mqd_t mqdes, char *msg_ptr,  
                      size_t msg_len, unsigned *msg_prio);  
                     //成功返回接收到消息的字节数,出错返回-1  
  
#ifdef __USE_XOPEN2K  
mqd_t mq_timedsend(mqd_t mqdes, const char *msg_ptr,  
                      size_t msg_len, unsigned msg_prio,  
                      const struct timespec *abs_timeout);  
  
mqd_t mq_timedreceive(mqd_t mqdes, char *msg_ptr,  
                      size_t msg_len, unsigned *msg_prio,  
                      const struct timespec *abs_timeout);  
#endif

其中,mq_send向消息队列中写入一条消息,mq_receive从消息队列中读取一条消息。

mqdes:消息队列描述符;
msg_ptr:指向消息体缓冲区的指针;
msg_len:消息体的长度,其中mq_receive的该参数不能小于能写入队列中消息的最大大小,即一定要大于等于该队列的mq_attr结构中mq_msgsize的大小。如果mq_receive中的msg_len小于该值,就会返回EMSGSIZE错误。POXIS消息队列发送的消息长度可以为0。
msg_prio:消息的优先级;它是一个小于MQ_PRIO_MAX的数,数值越大,优先级越高。POSIX消息队列在调用mq_receive时总是返回队列中最高优先级的最早消息。如果消息不需要设定优先级,那么可以在mq_send是置msg_prio为0,mq_receive的msg_prio置为NULL。

还有两个XSI定义的扩展接口限时发送和接收消息的函数:mq_timedsend和mq_timedreceive函数。默认情况下mq_send和mq_receive是阻塞进行调用,可以通过mq_setattr来设置为O_NONBLOCK。

下面是消息队列使用的测试代码:

#include <iostream>  
#include <cstring>  
#include <errno.h>
#include <stdlib.h>

#include <unistd.h>  
#include <fcntl.h>  
#include <mqueue.h>  

using namespace std;

int main()
{
	mqd_t mqID;
	mqID = mq_open("/shltsh_queue", O_RDWR | O_CREAT | O_EXCL, 0666, NULL);

	if (mqID < 0)
	{
		if (errno == EEXIST)
		{
			mq_unlink("/shltsh_queue");
			mqID = mq_open("/shltsh_queue", O_RDWR | O_CREAT, 0666, NULL);
		}
		else
		{
			cout << "open message queue error." << strerror(errno) << endl;
			return -1;
		}
	}

	if (fork() == 0)
	{
		mq_attr mqAttr;
		mq_getattr(mqID, &mqAttr);

		char *buf = new char[mqAttr.mq_msgsize];

		for (int i = 1; i <= 5; ++i)
		{
			if (mq_receive(mqID, buf, mqAttr.mq_msgsize, NULL) < 0)
			{
				cout << "receive message  failed.";
				cout << "error info:" << strerror(errno) << endl;
				continue;
			}

			cout << "receive message " << i << ": " << buf << endl;
		}
		exit(0);
	}

	char msg[] = "hello world";
	for (int i = 1; i <= 5; ++i)
	{
		if (mq_send(mqID, msg, sizeof(msg), i) < 0)
		{
			cout << "send message " << i << " failed. ";
			cout << "error info:" << strerror(errno) << endl;
		}
		cout << "send message " << i << " success. " << endl;

		sleep(1);
	}
}
编译:

[root@MiWiFi-R1CM csdnblog]# g++ pipe.cpp  -lrt

输出:
send message 1 success. 
receive message 1: hello world
send message 2 success. 
receive message 2: hello world
send message 3 success. 
receive message 3: hello world
send message 4 success. 
receive message 4: hello world
send message 5 success. 
receive message 5: hello world

6.消息队列的限制

POSIX消息队列本身的限制就是mq_attr中的mq_maxmsg和mq_msgsize,分别用于限定消息队列中的最大消息数和每个消息的最大字节数。在前面已经说过了,这两个参数可以在调用mq_open创建一个消息队列的时候设定。当这个设定是受到系统内核限制的。

下面是在Linux 2.6.32下shell对启动进程的POSIX消息队列大小的限制:

[root@MiWiFi-R1CM csdnblog]# ulimit -a  | grep message
POSIX message queues     (bytes, -q) 819200

限制大小为800KB,该大小是整个消息队列的大小,不仅仅是最大消息数*消息的最大大小;还包括消息队列的额外开销。前面我们知道Linux 2.6.32下POSIX消息队列默认的最大消息数和消息的最大大小分别为:
mq_maxmsg = 10  
mq_msgsize = 8192  

为了说明上面的限制大小包括消息队列的额外开销,下面是测试代码:

#include <iostream>  
#include <cstring>  
#include <errno.h>  
#include <stdlib.h>

#include <unistd.h>  
#include <fcntl.h>  
#include <mqueue.h>  

int main(int argc, char **argv)
{
	mqd_t mqID;
	mq_attr attr;
	attr.mq_maxmsg = atoi(argv[1]);
	attr.mq_msgsize = atoi(argv[2]);

	mqID = mq_open("/root/shltsh_queue", O_RDWR | O_CREAT | O_EXCL, 0666, &attr);

	if (mqID < 0)
	{
		if (errno == EEXIST)
		{
			mq_unlink("/root/shltsh_queue");
			mqID = mq_open("/root/shltsh_queue", O_RDWR | O_CREAT, 0666, &attr);

			if (mqID < 0)
			{
				std::cout << "open message queue error." << strerror(errno) << std::endl;
				return -1;
			}
		}
		else
		{
			std::cout << "open message queue error." << strerror(errno) << std::endl;
			return -1;
		}
	}

	mq_attr mqAttr;
	if (mq_getattr(mqID, &mqAttr) < 0)
	{
		std::cout << "get the message queue attribute error." << std::endl;
		return -1;
	}

	std::cout << "mq_flags:" << mqAttr.mq_flags << std::endl;
	std::cout << "mq_maxmsg:" << mqAttr.mq_maxmsg << std::endl;
	std::cout << "mq_msgsize:" << mqAttr.mq_msgsize << std::endl;
	std::cout << "mq_curmsgs:" << mqAttr.mq_curmsgs << std::endl;
}

下面进行创建消息队列时设置最大消息数和消息的最大大小进行测试:

[root@MiWiFi-R1CM csdnblog]# ./a.out 10 81920
open message queue error.Cannot allocate memory
[root@MiWiFi-R1CM csdnblog]# ./a.out 10 80000
open message queue error.Cannot allocate memory
[root@MiWiFi-R1CM csdnblog]# ./a.out 10 70000
open message queue error.Cannot allocate memory
[root@MiWiFi-R1CM csdnblog]# ./a.out 10 60000
mq_flags:0
mq_maxmsg:10
mq_msgsize:60000
mq_curmsgs:0

从上面可以看出消息队列真正存放消息数据的大小是没有819200B的。可以通过修改该限制参数,来改变消息队列的所能容纳消息的数量。可以通过下面方式来修改限制,但这会在shell启动进程结束后失效,可以将设置写入开机启动的脚本中执行,例如.bashrc,rc.local。

[root@MiWiFi-R1CM csdnblog]# ulimit -q 1024000000 
[root@MiWiFi-R1CM csdnblog]# ulimit -a | grep message
POSIX message queues     (bytes, -q) 1024000000

下面再次测试可以设置的消息队列的属性:
[root@MiWiFi-R1CM csdnblog]# ./a.out 10 81920
mq_flags:0
mq_maxmsg:10
mq_msgsize:81920
mq_curmsgs:0
[root@MiWiFi-R1CM csdnblog]# ./a.out 10 819200
mq_flags:0
mq_maxmsg:10
mq_msgsize:819200
mq_curmsgs:0
[root@MiWiFi-R1CM csdnblog]# ./a.out 10 8192
mq_flags:0
mq_maxmsg:10
mq_msgsize:8192
mq_curmsgs:0

POSIX消息队列在实现上还有另外两个限制:
MQ_OPEN_MAX:一个进程能同时打开的消息队列的最大数目,POSIX要求至少为8;
MQ_PRIO_MAX:消息的最大优先级,POSIX要求至少为32;




















郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。