细说linux IPC(十):system V 消息队列
- msgget()函数创建一个消息队列或打开一个消息队列。
#include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> int msgget(key_t key, int msgflg);
struct msqid_ds { struct ipc_perm msg_perm; /* Ownership and permissions */ time_t msg_stime; /* Time of last msgsnd(2) */ time_t msg_rtime; /* Time of last msgrcv(2) */ time_t msg_ctime; /* Time of last change */ unsigned long __msg_cbytes; /* Current number of bytes in queue (nonstandard) */ msgqnum_t msg_qnum; /* Current number of messages in queue */ msglen_t msg_qbytes; /* Maximum number of bytes allowed in queue */ pid_t msg_lspid; /* PID of last msgsnd(2) */ pid_t msg_lrpid; /* PID of last msgrcv(2) */ };
struct ipc_perm { key_t __key; /* Key supplied to msgget(2) */ uid_t uid; /* Effective UID of owner */ gid_t gid; /* Effective GID of owner */ uid_t cuid; /* Effective UID of creator */ gid_t cgid; /* Effective GID of creator */ unsigned short mode; /* Permissions */ unsigned short __seq; /* Sequence number */ };
- msgsnd()函数向消息队列中添加消息。
#include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
struct msgbuf { long mtype; /* message type, must be > 0 */ char mtext[1]; /* message data */ };
- msgrcv()函数从消息队列中读取消息。
#include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
IPC_STAT - 获取消息队列信息,从内核数据结构中拷贝消息队列为msqid的结构体到buf指针指向的msqid_ds 结构体中。
#include <stdio.h> #include <string.h> #include <sys/types.h> #include <sys/ipc.h> #include <errno.h> #include <unistd.h> #include <signal.h> #include <stdlib.h> #include "slnmq.h" static int recv_type, msgid; int sln_server_loop(void) { int rcvmsg_len; key_t key; struct sln_msgbuf slnmsg; key = ftok(SLN_IPC_MQ_NAME, 0); if (key < 0) { fprintf(stderr, "ftok: %s\n", strerror(errno)); return -1; } msgid = msgget(key, IPC_CREAT | IPC_EXCL); if ((msgid < 0) && (errno != EEXIST)) { fprintf(stderr, "msgget: %s\n", strerror(errno)); return -1; } if (msgid < 0) { printf("Open existing mq!\n"); msgid = msgget(key, IPC_CREAT); if (msgid < 0) { fprintf(stderr, "msgget: %s\n", strerror(errno)); return -1; } } sleep(10); //just for test for (;;) { rcvmsg_len = msgrcv(msgid, &slnmsg, SLN_IPC_MQ_MSGSIZE, recv_type, 0); if (rcvmsg_len < 0) { fprintf(stderr, "msgrcv: %s\n", strerror(errno)); sleep(1); continue; } fprintf(stdout, "receive - recv len: %d, msg type: %d, msg: %s\n", rcvmsg_len, slnmsg.msgtype, slnmsg.msg); } return 0; } static void sigint_func(int sig) { if (msgctl(msgid, IPC_RMID, NULL) < 0) { fprintf(stderr, "msgctl: %s\n", strerror(errno)); } exit(0); } int main(int argc, const char *argv[]) { FILE *fp = NULL; if (argc != 2) { fprintf(stderr, "Usage: %s <msg type>\n", argv[0]);//运行时,指定服务要进程接收的消息类型 return -1; } recv_type = atoi(argv[1]); if (access(SLN_IPC_MQ_NAME, F_OK) < 0) { fp = fopen(SLN_IPC_MQ_NAME, "w+"); if (NULL != fp) { fclose(fp); } } signal(SIGINT, sigint_func); //服务进程捕获中断信号,在中断处理函数里面删除消息队列并推出程序 sln_server_loop(); return 0; }
#include <stdio.h> #include <string.h> #include <sys/types.h> #include <sys/ipc.h> #include <errno.h> #include "slnmq.h" int sln_msgsnd(int msg_type, const void *sndmsg, int sndlen) { int msgid, rcvmsg_len; key_t key; struct sln_msgbuf slnmsg; key = ftok(SLN_IPC_MQ_NAME, 0); if (key < 0) { fprintf(stderr, "ftok: %s\n", strerror(errno)); return -1; } msgid = msgget(key, IPC_CREAT); if (msgid < 0) { fprintf(stderr, "msgget: %s\n", strerror(errno)); return -1; } slnmsg.msgtype = msg_type; memcpy(&slnmsg.msg, sndmsg, sndlen); msgsnd(msgid, (void *)&slnmsg, sndlen, 0); } int main(int argc, const char *argv[]) { int type; if (argc != 3) { fprintf(stderr, "Usage: %s <msg type> <msg content>\n", argv[0]); return -1; } type = atoi(argv[1]); sln_msgsnd(type, argv[2], strlen(argv[2]) + 1); return 0; }
./client 1 abcd ./client 2 efghi ./client 3 jkl ./client 4 mn ./client 5 opqrst ./client 6 uv ./client 7 w ./client 8 xyz
# ./server 0(读取所有消息) receive - recv len: 5, msg type: 1, msg: abcd receive - recv len: 6, msg type: 2, msg: efghi receive - recv len: 4, msg type: 3, msg: jkl receive - recv len: 3, msg type: 4, msg: mn receive - recv len: 7, msg type: 5, msg: opqrst receive - recv len: 3, msg type: 6, msg: uv receive - recv len: 2, msg type: 7, msg: w receive - recv len: 4, msg type: 8, msg: xyz
# ./server 4 (读取消息类型为4的消息) Open existing mq! receive - recv len: 3, msg type: 4, msg: mn
# ./server -5 (读取消息小于5的消息) Open existing mq! receive - recv len: 5, msg type: 1, msg: abcd receive - recv len: 6, msg type: 2, msg: efghi receive - recv len: 4, msg type: 3, msg: jkl receive - recv len: 3, msg type: 4, msg: mn receive - recv len: 7, msg type: 5, msg: opqrst
# ipcs ------ Message Queues -------- key msqid owner perms used-bytes messages 0x001fffde 229376 root 0 0 0
# ipcs ------ Message Queues -------- key msqid owner perms used-bytes messages
郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。