Linux客户/服务器程序设计范式2——并发服务器(进程池)
引言
让服务器在启动阶段调用fork创建一个子进程池,通过子进程来处理客户端请求。子进程与父进程之间使用socketpair进行通信(为了方便使用sendmsg与recvmsg,如果使用匿名管道,则无法使用以上两个函数)。以下针对TCP进行分析。
server端使用select轮询用于监听客户端请求的被动套接字fd_listen以及用于父子之间通信的socketpair。每当客户端有请求时,server端会将由accept返回的用于与客户端通信的socket描述符通过socketpair发送给一个空闲的子进程,由子进程与客户端进行通信(处理请求)。因此服务器端需要维护一个子进程队列,队列中的每个元素存放着与子进程通信的socketpair以及标记子进程是否空闲的标志位,如下:
1 typedef struct tag_chd 2 { 3 int s_sfd ; //与子进程通信的socketpair描述符 4 int s_state ; //标记子进程是否空闲 5 }NODE, *pNODE;
每当子进程处理完客户端请求时,会通过socketpair向server端发送消息,server端select到该socketpair后,会将对应子进程标志位设置为空闲。
注意
1. 由于父进程是先创建子进程,之后才accept用于与客户端通信的socket描述符fd_client,因此子进程的pcb中并没有fd_client的信息。server端需要将fd_client发送子进程。如果只是用send来发送fd_client信息的话,子进程只会将其当成一个整型数。我们需要用sendmsg将fd_client连同其辅助(控制)信息一并发送,这样子进程才会将其当成一个socket描述符。
2. 父进程预先创建子进程池,该子进程如同server端一样是永远不会退出的。子进程中使用while死循环,如下:
1 while(1) 2 { 3 readn = read(sfd, &flag, 4); // 服务器分配的子进程在子进程队列中的下标 4 printf("readn: %d \n", readn); 5 printf("read from father: %d \n", flag); 6 recv_fd(sfd, &fd_client); // recv_fd中封装了recvmsg,接收与客户端通信的socket描述符 7 handle_request(fd_client); // 处理客户端请求 8 write(sfd, &pid, sizeof(pid)); // 处理完请求后通过socketpair通知服务器,服务器将该子进程状态设置为空闲 9 }
每当子进程处理完一个客户端请求后(也就是客户端退出了),子进程会阻塞在 read 处,等待接收下一个客户端请求。
由于是while死循环,且死循环中没有break语句,因此子进程不可能跳出这个while循环,也就不会执行while循环以下的内容了,这样可以保证子进程结尾没有exit也不会执行之后的内容。
函数原型
1 #include <sys/types.h> 2 #include <sys/socket.h> 3 4 ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags); 5 ssize_t sendmsg(int sockfd, const struct msghdr *msg, int flags); 6 7 struct msghdr { 8 void *msg_name; /* optional address */ 9 socklen_t msg_namelen; /* size of address */ 10 struct iovec *msg_iov; /* scatter/gather array */ 11 size_t msg_iovlen; /* # elements in msg_iov */ 12 void *msg_control; /* ancillary data, see below */ 13 socklen_t msg_controllen; /* ancillary data buffer len */ 14 int msg_flags; /* flags on received message */ 15 }; 16 17 18 19 fields: 20 21 struct iovec { /* Scatter/gather array items */ 22 void *iov_base; /* Starting address */ 23 size_t iov_len; /* Number of bytes to transfer */ 24 }; 25 26 struct cmsghdr { 27 socklen_t cmsg_len; /* data byte count, including header */ 28 int cmsg_level; /* originating protocol */ /* 如果是文件描述符,填SOL_SOCKET */ 29 int cmsg_type; /* protocol-specific type */ /* 如果是文件描述符,填SCM_RIGHTS */ 30 /* followed by unsigned char cmsg_data[]; */ 31 }; 32 33 /* 返回cmsghdr结构的cmsg_len成员的值,考虑到对齐,使用数据部分的长度作为参数。*/ 34 size_t CMSG_LEN(size_t length); 35 /* 返回cmsghdr的数据部分指针。*/ 36 unsigned char *CMSG_DATA(struct cmsghdr *cmsg); 37 38 CMSG_DATA() returns a pointer to the data portion of a cmsghdr. 39 CMSG_LEN() returns the value to store in the cmsg_len member of the cmsghdr structure, taking into 40 account any necessary alignment. It takes the data length as an argument. 41 This is a constant expression.
1 NAME 2 socketpair - create a pair of connected sockets 3 4 SYNOPSIS 5 #include <sys/types.h> /* See NOTES */ 6 #include <sys/socket.h> 7 8 int socketpair(int domain, int type, int protocol, int sv[2]); 9 RETURN VALUE 10 On success, zero is returned. On error, -1 is returned, and errno is 11 set appropriately.
代码
server.h
1 #ifndef __SERVER_H__ 2 #define __SERVER_H__ 3 #include "my_socket.h" 4 #include <sys/stat.h> 5 #include <sys/types.h> 6 #include <fcntl.h> 7 #include <sys/time.h> 8 #include <sys/select.h> 9 #include <sys/uio.h> 10 #include <sys/wait.h> 11 #include <errno.h> 12 #define SER_IP "127.0.0.1" 13 #define SER_PORT 8888 14 #define ST_BUSY 1 15 #define ST_IDLE 2 16 #define SIZE 8192 17 #define MSG_SIZE (SIZE - 4) 18 19 typedef struct tag_mag 20 { 21 int msg_len ; 22 char msg_buf[MSG_SIZE];//8188 23 }MSG, *pMSG; 24 25 typedef struct tag_chd 26 { 27 int s_sfd ; 28 int s_state ; 29 }NODE, *pNODE; 30 31 extern int errno ; 32 void make_child(pNODE arr, int cnt); 33 void child_main(int sfd) ; 34 void handle_request(int sfd); 35 void send_fd(int sfd, int fd_file) ; 36 void recv_fd(int sfd, int* fd_file) ; 37 void dispatch(pNODE arr, int cnt, int fd_client); 38 #endif
main.c
1 #include "server.h" 2 int main(int argc, char* argv[])//exe chld_cnt 3 { 4 if(argc != 2) 5 { 6 printf("Usage: exe , child_cnt! \n"); 7 exit(1); 8 } 9 int child_cnt = atoi(argv[1]); 10 pNODE arr_child = (pNODE)calloc(child_cnt, sizeof(NODE)) ; /* 动态数组维护子进程池 */ 11 make_child(arr_child, child_cnt); 12 13 int fd_listen, fd_client ; 14 my_socket(&fd_listen, MY_TCP, SER_IP, SER_PORT); 15 my_listen(fd_listen, 10); 16 17 fd_set readset, readyset ; 18 FD_ZERO(&readset); 19 FD_ZERO(&readyset); 20 FD_SET(fd_listen, &readset); 21 int index ; 22 for(index = 0; index < child_cnt; index ++) 23 { 24 FD_SET(arr_child[index].s_sfd, &readset); 25 } 26 27 int select_ret ; 28 struct timeval tm ; 29 while(1) 30 { 31 tm.tv_sec = 0 ; 32 tm.tv_usec = 1000 ; 33 readyset = readset ; 34 select_ret = select(1024, &readyset, NULL, NULL, &tm); 35 if(select_ret == 0) /* 轮询时间内,所有描述符均没有活动,返回0,继续轮询 */ 36 { 37 continue ; 38 }else if(select_ret == -1) /* 信号 */ 39 { 40 if(errno == EINTR) 41 { 42 continue ; 43 }else 44 { 45 exit(1); 46 } 47 }else 48 { 49 if(FD_ISSET(fd_listen, &readyset)) 50 { 51 fd_client = accept(fd_listen, NULL, NULL) ; 52 dispatch(arr_child, child_cnt ,fd_client); 53 close(fd_client); 54 } 55 for(index = 0; index < child_cnt; index ++) 56 { 57 if(FD_ISSET(arr_child[index].s_sfd, &readyset)) 58 { 59 int val ; 60 read(arr_child[index].s_sfd, &val, 4); 61 arr_child[index].s_state = ST_IDLE ; 62 } 63 } 64 65 } 66 67 } 68 }
server.c
1 #include "server.h" 2 void make_child(pNODE arr, int cnt) 3 { 4 int index ; 5 for(index = 0; index < cnt; index ++) 6 { 7 pid_t pid ; 8 int fds[2] ;//fds[0] - c fds[1] - p 9 socketpair(AF_LOCAL, SOCK_STREAM, 0, fds); 10 pid = fork() ; 11 if(pid == 0)// child 12 { 13 close(fds[1]); /* 子进程用fds[0],关闭fds[1] */ 14 child_main(fds[0]) ; /* 每创建一个子进程,子进程就进入该函数中(死循环),接收请求,处理请求,如此循环。*/ 15 16 }else 17 { 18 /* 初始化进程池队列中的每一个子进程 */ 19 arr[index].s_sfd = fds[1] ; 20 arr[index].s_state = ST_IDLE ; 21 close(fds[0]); /* 父进程用fds[1], 关闭fds[0] */ 22 } 23 24 } 25 26 } 27 void child_main(int sfd) 28 { 29 int fd_client ; 30 int flag ; 31 int readn ; 32 pid_t pid = getpid(); 33 while(1) 34 { 35 readn = read(sfd, &flag, 4); 36 printf("readn: %d \n", readn); 37 printf("read from father: %d \n", flag); 38 recv_fd(sfd, &fd_client); 39 handle_request(fd_client); 40 write(sfd, &pid, sizeof(pid)); 41 } 42 } 43 void handle_request(int sfd) 44 { 45 46 MSG my_msg ; 47 int recvn ; 48 while(1) 49 { 50 memset(&my_msg, 0, sizeof(MSG)); 51 my_recv(&recvn, sfd, &my_msg, 4); 52 if(my_msg.msg_len == 0) 53 { 54 break ; 55 } 56 my_recv(NULL, sfd, my_msg.msg_buf, my_msg.msg_len); 57 my_send(NULL, sfd, &my_msg, my_msg.msg_len + 4); 58 59 } 60 61 } 62 void send_fd(int sfd, int fd_file) 63 { 64 struct msghdr my_msg ; 65 memset(&my_msg, 0, sizeof(my_msg)); 66 67 struct iovec bufs[1] ; 68 char buf[32] = "hello world ! \n"; 69 bufs[0].iov_base = buf ; 70 bufs[0].iov_len = strlen(buf) ; 71 72 my_msg.msg_name = NULL ; 73 my_msg.msg_namelen = 0 ; 74 my_msg.msg_iov = bufs ; 75 my_msg.msg_iovlen = 1 ; 76 my_msg.msg_flags = 0 ; 77 78 struct cmsghdr *p ; 79 int cmsg_len = CMSG_LEN(sizeof(int)) ; /* 所传为文件描述符,因此sizeof(int) */ 80 p = (struct cmsghdr*)calloc(1, cmsg_len) ; 81 p -> cmsg_len = cmsg_len ; 82 p -> cmsg_level = SOL_SOCKET ; 83 p -> cmsg_type = SCM_RIGHTS ; 84 *(int*)CMSG_DATA(p) = fd_file ; 85 86 my_msg.msg_control = p ; 87 my_msg.msg_controllen = cmsg_len ; 88 89 int sendn ; 90 sendn = sendmsg(sfd, &my_msg, 0); 91 printf("send masg len : %d \n", sendn); 92 } 93 void recv_fd(int sfd, int* fd_file) 94 { 95 struct msghdr my_msg ; 96 97 struct iovec bufs[1] ; 98 char buf1[32]="" ; 99 bufs[0].iov_base = buf1 ; 100 bufs[0].iov_len = 31 ; 101 102 my_msg.msg_name = NULL ; 103 my_msg.msg_namelen = 0 ; 104 my_msg.msg_iov = bufs ; 105 my_msg.msg_iovlen = 2 ; 106 my_msg.msg_flags = 0 ; 107 108 struct cmsghdr *p ; 109 int cmsg_len = CMSG_LEN(sizeof(int)) ; 110 p = (struct cmsghdr*)calloc(1, cmsg_len) ; 111 my_msg.msg_control = p ; 112 my_msg.msg_controllen = cmsg_len ; 113 114 int recvn ; 115 recvn = recvmsg(sfd, &my_msg, 0); 116 117 *fd_file = *(int*)CMSG_DATA((struct cmsghdr*)my_msg.msg_control); //写成*(int*)CMSG_DATA(P)也可 118 119 printf("buf1: %s, recv msg len : %d \n", buf1, recvn); 120 121 } 122 void dispatch(pNODE arr, int cnt, int fd_client) 123 { 124 int index ; 125 for(index = 0 ; index < cnt; index ++) 126 { 127 if(arr[index].s_state == ST_IDLE) 128 { 129 write(arr[index].s_sfd, &index, 4); 130 send_fd(arr[index].s_sfd, fd_client); /* 向空闲的子进程分配任务,将服务器accept返回的socket描述符发送给子进程*/ 131 arr[index].s_state = ST_BUSY ; 132 break ; 133 } 134 } 135 }
client.c
1 #include "my_socket.h" 2 #define MY_IP "127.0.0.1" 3 #define MY_PORT 6666 4 #define SER_IP "127.0.0.1" 5 #define SER_PORT 8888 6 #define SIZE 8192 7 #define MSG_SIZE (SIZE - 4) 8 typedef struct tag_mag// 9 { 10 int msg_len ; 11 char msg_buf[MSG_SIZE];//8188 12 }MSG, *pMSG; 13 int main(int argc, char* argv[]) 14 { 15 int sfd ; 16 my_socket(&sfd, MY_TCP, MY_IP, atoi(argv[1])); 17 my_connect(sfd, SER_IP, SER_PORT); 18 MSG my_msg ; 19 while(memset(&my_msg, 0, sizeof(MSG)), fgets(my_msg.msg_buf, MSG_SIZE, stdin)!= NULL) 20 { 21 my_msg.msg_len = strlen(my_msg.msg_buf); 22 my_send(NULL, sfd, &my_msg, 4 + my_msg.msg_len ); 23 memset(&my_msg, 0, sizeof(MSG)); 24 my_recv(NULL, sfd, &my_msg, 4); 25 my_recv(NULL, sfd, &my_msg.msg_buf, my_msg.msg_len); 26 printf("recv from server : %s \n", my_msg.msg_buf); 27 28 } 29 /* 客户端退出时,向服务器发送一个长度为0的消息 ,用于通知服务器退出 */ 30 memset(&my_msg, 0, sizeof(MSG)); 31 my_send(NULL, sfd, &my_msg, 4 + my_msg.msg_len); 32 close(sfd); 33 34 }
郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。