细说linux IPC(九):posix消息队列
- mq_open()函数打开或创建一个posix消息队列。
#include <fcntl.h> /* For O_* constants */ #include <sys/stat.h> /* For mode constants */ #include <mqueue.h> mqd_t mq_open(const char *name, int oflag); mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr); Link with -lrt.
- mq_close()函数关闭消息队列。
#include <mqueue.h> int mq_close(mqd_t mqdes); Link with -lrt.
- mq_unlink()函数从系统中删除某个消息队列。
#include <mqueue.h> int mq_unlink(const char *name); Link with -lrt.
- mq_setattr()函数和mq_getattr()函数分别用于设置和获取消息队列属性。
#include <mqueue.h> int mq_getattr(mqd_t mqdes, struct mq_attr *attr); int mq_setattr(mqd_t mqdes, struct mq_attr *newattr, struct mq_attr *oldattr); Link with -lrt.
struct mq_attr { long mq_flags; /* Flags: 0 or O_NONBLOCK */ long mq_maxmsg; /* Max. # of messages on queue */ long mq_msgsize; /* Max. message size (bytes) */ long mq_curmsgs; /* # of messages currently in queue */ };
- mq_send() 函数 和mq_receive()函数分别用于向消息队列放置和取走消息。
#include <mqueue.h> int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio); ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio); Link with -lrt.
/*
 * Server side: create the POSIX message queue SLN_IPC_MQ_NAME (or open it
 * if it already exists), print its attributes, then receive and print
 * messages in an endless loop.
 *
 * Returns -1 when the queue cannot be opened or queried, when the receive
 * buffer cannot be allocated, or when mq_receive() fails with anything
 * other than EINTR. The loop has no other exit, so the function only
 * returns on failure.
 */
int sln_ipc_mq_loop(void)
{
    struct mq_attr setattr = {0};   /* zero every field before filling in the two we set */
    struct mq_attr attr;
    char *recvbuf = NULL;
    unsigned int prio;
    ssize_t recvlen;
    mqd_t mqd;

    setattr.mq_maxmsg = SLN_IPC_MQ_MAXMSG;
    setattr.mq_msgsize = SLN_IPC_MQ_MSGSIZE;

    /* Try to create the queue exclusively with the requested attributes. */
    mqd = mq_open(SLN_IPC_MQ_NAME, O_RDWR | O_CREAT | O_EXCL, 0644, &setattr);
    if (mqd == (mqd_t)-1 && errno == EEXIST) {
        /* The queue already exists: just open it. */
        mqd = mq_open(SLN_IPC_MQ_NAME, O_RDWR);
    }
    if (mqd == (mqd_t)-1) {
        fprintf(stderr, "mq_open: %s\n", strerror(errno));
        return -1;
    }

    /* Read back the effective attributes of the queue. */
    if (mq_getattr(mqd, &attr) < 0) {
        fprintf(stderr, "mq_getattr: %s\n", strerror(errno));
        goto cleanup;
    }

    printf("flags: %ld, maxmsg: %ld, msgsize: %ld, curmsgs: %ld\n",
           attr.mq_flags, attr.mq_maxmsg, attr.mq_msgsize, attr.mq_curmsgs);

    /*
     * The buffer must hold the largest possible message. One extra byte is
     * reserved because mq_receive() does not NUL-terminate the payload and
     * the buffer is printed with %s below.
     */
    recvbuf = malloc((size_t)attr.mq_msgsize + 1);
    if (NULL == recvbuf) {
        goto cleanup;
    }

    for (;;) {
        recvlen = mq_receive(mqd, recvbuf, (size_t)attr.mq_msgsize, &prio);
        if (recvlen < 0) {
            fprintf(stderr, "mq_receive: %s\n", strerror(errno));
            if (errno == EINTR) {
                continue;   /* interrupted by a signal: retry */
            }
            break;          /* any other error is persistent: stop instead of spinning */
        }

        recvbuf[recvlen] = '\0';
        printf("recvive length: %d, prio: %d, recvbuf: %s\n",
               (int)recvlen, (int)prio, recvbuf);
    }

cleanup:
    /* Release the buffer and the descriptor on every exit path. */
    free(recvbuf);
    mq_close(mqd);
    return -1;
}
/*
 * Client side: open the existing queue SLN_IPC_MQ_NAME write-only and put
 * one message on it.
 *
 * sendbuf  message payload
 * sendlen  payload length in bytes
 * prio     message priority passed to mq_send()
 *
 * Returns 0 when the message was queued, -1 when mq_open() or mq_send()
 * failed. The queue descriptor is closed before returning in both cases.
 */
int sln_ipc_mq_send(const char *sendbuf, int sendlen, int prio)
{
    mqd_t mqd;
    int rc = 0;

    /* Open the queue that the server created. */
    mqd = mq_open(SLN_IPC_MQ_NAME, O_WRONLY);
    if (mqd == (mqd_t)-1) {
        fprintf(stderr, "mq_open: %s\n", strerror(errno));
        return -1;
    }

    /* Add one message to the queue. */
    if (mq_send(mqd, sendbuf, (size_t)sendlen, (unsigned int)prio) < 0) {
        fprintf(stderr, "mq_send: %s\n", strerror(errno));
        rc = -1;
    }

    /*
     * Close the descriptor on success and on failure; otherwise every call
     * leaks one descriptor. A close failure is reported but does not change
     * the result, since the message has already been queued or rejected.
     */
    if (mq_close(mqd) < 0) {
        fprintf(stderr, "mq_close: %s\n", strerror(errno));
    }

    return rc;
}
/*
 * Server side, priority-ordering demo: create the POSIX message queue
 * SLN_IPC_MQ_NAME (or open it if it already exists), print its attributes,
 * wait 10 seconds so a client can enqueue several messages, then receive
 * and print one message per second while reporting the queue depth.
 *
 * Returns -1 when the queue cannot be opened or queried, when the receive
 * buffer cannot be allocated, or when mq_receive() fails with anything
 * other than EINTR. The loop has no other exit, so the function only
 * returns on failure.
 */
int sln_ipc_mq_loop(void)
{
    struct mq_attr setattr, attr;
    char *recvbuf = NULL;
    unsigned int prio;
    ssize_t recvlen;
    long msgsize;
    mqd_t mqd;

    memset(&setattr, 0, sizeof(setattr));
    setattr.mq_maxmsg = SLN_IPC_MQ_MAXMSG;
    setattr.mq_msgsize = SLN_IPC_MQ_MSGSIZE;

    /* Try to create the queue exclusively with the requested attributes. */
    mqd = mq_open(SLN_IPC_MQ_NAME, O_RDWR | O_CREAT | O_EXCL, 0644, &setattr);
    if (mqd == (mqd_t)-1 && errno == EEXIST) {
        /* The name already exists: just open it. */
        mqd = mq_open(SLN_IPC_MQ_NAME, O_RDWR);
    }
    if (mqd == (mqd_t)-1) {
        fprintf(stderr, "mq_open: %s\n", strerror(errno));
        return -1;
    }

    if (mq_getattr(mqd, &attr) < 0) {
        fprintf(stderr, "mq_getattr: %s\n", strerror(errno));
        goto cleanup;
    }

    printf("flags: %ld, maxmsg: %ld, msgsize: %ld, curmsgs: %ld\n",
           attr.mq_flags, attr.mq_maxmsg, attr.mq_msgsize, attr.mq_curmsgs);

    /*
     * Remember the message size the buffer was allocated for; attr is
     * refreshed on every loop iteration below. One extra byte is reserved
     * because mq_receive() does not NUL-terminate the payload and the
     * buffer is printed with %s.
     */
    msgsize = attr.mq_msgsize;
    recvbuf = malloc((size_t)msgsize + 1);
    if (NULL == recvbuf) {
        goto cleanup;
    }

    /* Wait 10 seconds so the client can enqueue several messages at once. */
    sleep(10);

    for (;;) {
        /* Report the current queue depth before each receive. */
        if (mq_getattr(mqd, &attr) < 0) {
            fprintf(stderr, "mq_getattr: %s\n", strerror(errno));
            goto cleanup;
        }

        printf("msgsize: %ld, curmsgs: %ld\n", attr.mq_msgsize, attr.mq_curmsgs);

        recvlen = mq_receive(mqd, recvbuf, (size_t)msgsize, &prio);
        if (recvlen < 0) {
            fprintf(stderr, "mq_receive: %s\n", strerror(errno));
            if (errno == EINTR) {
                continue;   /* interrupted by a signal: retry */
            }
            break;          /* any other error is persistent: stop instead of spinning */
        }

        recvbuf[recvlen] = '\0';
        printf("recvive-> prio: %d, recvbuf: %s\n", (int)prio, recvbuf);

        sleep(1);   /* handle one message per second */
    }

cleanup:
    /* Release the buffer and the descriptor on every exit path. */
    free(recvbuf);
    mq_close(mqd);
    return -1;
}
# ./server flags: 0, maxmsg: 10, msgsize: 1024, curmsgs: 0 msgsize: 1024, curmsgs: 10 recvive-> prio: 10, recvbuf: asdf msgsize: 1024, curmsgs: 10 recvive-> prio: 11, recvbuf: 1234 msgsize: 1024, curmsgs: 10 recvive-> prio: 12, recvbuf: asdf msgsize: 1024, curmsgs: 9 recvive-> prio: 9, recvbuf: 1234 msgsize: 1024, curmsgs: 8 recvive-> prio: 8, recvbuf: asdf msgsize: 1024, curmsgs: 7 recvive-> prio: 7, recvbuf: 1234 msgsize: 1024, curmsgs: 6 recvive-> prio: 6, recvbuf: asdf msgsize: 1024, curmsgs: 5 recvive-> prio: 5, recvbuf: 1234 msgsize: 1024, curmsgs: 4 recvive-> prio: 4, recvbuf: asdf msgsize: 1024, curmsgs: 3 recvive-> prio: 3, recvbuf: 1234 msgsize: 1024, curmsgs: 2 recvive-> prio: 2, recvbuf: asdf msgsize: 1024, curmsgs: 1 recvive-> prio: 1, recvbuf: 1234 msgsize: 1024, curmsgs: 0
郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。