分布式缓存系统Memcached(九)——状态机之socket连接与派发
上节已经分析到了主线程中监听socket注册事件和工作线程中连接socket注册事件的回调函数都是event_handler,且event_handler的核心部分都是一个有限状态机:drive_machine。因此接下来将对该状态机具体的业务处理进行深入的剖析。
memcached将每个socket都封装为一个conn结构体,该结构体包含了比如socket的文件描述符sfd、注册事件event、连接状态结构体conn_states,等等诸多信息字段,其中的状态结构:conn_states中包含了该socket的各种状态。 而状态机drive_machine正是通过该状态结构来判断该socket当前所处的具体状态,从而进行业务逻辑处理的。
其中连接状态结构体如下:
<span style="font-size:18px;">//socket的可能状态组成的结构体 enum conn_states { conn_listening, //监听状态/**< the socket which listens for connections */ conn_new_cmd, //为下一个连接做准备/**< Prepare connection for next command */ conn_waiting, //等待读取一个数据包/**< waiting for a readable socket */ conn_read, //读取网络数据/**< reading in a command line */ conn_parse_cmd, //解析缓冲区数据/**< try to parse a command from the input buffer */ conn_write, //简单的回复数据/**< writing out a simple response */ conn_nread, //读取固定字节的网络数据/**< reading in a fixed number of bytes */ conn_swallow, //处理不需要的写缓冲区的数据/**< swallowing unnecessary bytes w/o storing */ conn_closing, //关闭连接/**< closing this connection */ conn_mwrite, //顺序写入多个item数据 /**< writing out many items sequentially */ conn_closed, //连接已关闭/**< connection is closed */ conn_max_state //最大状态,断言使用/**< Max state value (used for assertion) */ };</span>
接下来看下drive_machine的概貌吧,其中主要就是一个while循环以处理各状态的业务逻辑:
//监听套接字和 连接套接字 事件回调函数的核心部分: //有限状态机:根据套接字的状态conn_sattes执行对应的操作 static void drive_machine(conn *c) { bool stop = false; int sfd; socklen_t addrlen; struct sockaddr_storage addr; int nreqs = settings.reqs_per_event; int res; const char *str; assert(c != NULL); //因为状态间存在转化或跳变等,因此需要循环,直到确定stop为止 while (!stop) { //对套接字的各种状态,进行对应业务处理 switch(c->state) { case conn_listening://监听状态 addrlen = sizeof(addr); // // // //主线程进入状态机之后执行accept操作,这个操作也是非阻塞的。 sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen); #endif //连接失败 if (sfd == -1) { // // } //连接成功,则将连接socket设为非阻塞 if (!use_accept4) { if (fcntl(sfd, F_SETFL, fcntl(sfd, F_GETFL) | O_NONBLOCK) < 0) { perror("setting O_NONBLOCK"); close(sfd); break; } } //如果超过最大连接数(根据全局状态结构的记录判断),则需要关闭连接 if (settings.maxconns_fast && stats.curr_conns + stats.reserved_fds >= settings.maxconns - 1) { // // } else {//如果没有超载,则直接分发(UDP,不需要建立连接,直接分发)工作线程 dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST, DATA_BUFFER_SIZE, tcp_transport); } stop = true; break; case conn_waiting: case conn_read: case conn_parse_cmd : case conn_nread: //以及其他各种状态 return; } }
本小节要着重分析的是第一个状态 conn_listening:
该状态是主线程监听socket的业务处理:监听套接字,接受,并将得到的连接socket分发给选中的某个工作线程。
switch(c->state) { case conn_listening://监听状态 addrlen = sizeof(addr); #ifdef HAVE_ACCEPT4 if (use_accept4) { sfd = accept4(c->sfd, (struct sockaddr *)&addr, &addrlen, SOCK_NONBLOCK); } else { sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen); } #else //主线程进入状态机之后执行accept操作,这个操作也是非阻塞的。 sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen); #endif if (sfd == -1) { if (use_accept4 && errno == ENOSYS) { use_accept4 = 0; continue; } perror(use_accept4 ? "accept4()" : "accept()"); if (errno == EAGAIN || errno == EWOULDBLOCK) { /* these are transient, so don't log anything */ stop = true; } else if (errno == EMFILE) {//连接超载 if (settings.verbose > 0) fprintf(stderr, "Too many open connections\n"); accept_new_conns(false); stop = true; } else { perror("accept()"); stop = true; } break; } //连接成功,则将连接socket设为非阻塞 if (!use_accept4) { if (fcntl(sfd, F_SETFL, fcntl(sfd, F_GETFL) | O_NONBLOCK) < 0) { perror("setting O_NONBLOCK"); close(sfd); break; } } //如果超过设置的同时在线最大连接数(默认为1024)(根据全局状态结构的记录判断),则需要关闭连接 if (settings.maxconns_fast && stats.curr_conns + stats.reserved_fds >= settings.maxconns - 1) { str = "ERROR Too many open connections\r\n"; res = write(sfd, str, strlen(str)); close(sfd); STATS_LOCK(); stats.rejected_conns++; STATS_UNLOCK(); } else {//如果没有超载,则直接分发(UDP,不需要建立连接,直接分发)工作线程 dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST, DATA_BUFFER_SIZE, tcp_transport); } stop = true; break; }
//主线程在监听套接字的回调函数中,当有新连接到来时, 调用该函数将接受到的新连接socket分发给工作线程 //注意:由于UDP不需要建立连接,所以直接分发给Worker线程 void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, int read_buffer_size, enum network_transport transport) { CQ_ITEM *item = cqi_new();//从CQ_ITEM资源池中取得一个空闲ITEM char buf[1]; if (item == NULL) { close(sfd); /* given that malloc failed this may also fail, but let's try */ fprintf(stderr, "Failed to allocate memory for connection object\n"); return ; } int tid = (last_thread + 1) % settings.num_threads;//通过round-robin算法选择一个线程 LIBEVENT_THREAD *thread = threads + tid;//缓存这次选中的线程 last_thread = tid;//更新最近一次选中的线程编号 //设置CQ_ITEM的各字段 item->sfd = sfd;//sfd是连接socket item->init_state = init_state; item->event_flags = event_flags; item->read_buffer_size = read_buffer_size; item->transport = transport; //主线程将item投递到选中的工作线程的ITEM连接队列中 cq_push(thread->new_conn_queue, item); MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id); buf[0] = 'c'; //管道通知:在Worker线程的notify_send_fd写入字符c,表示有连接 if (write(thread->notify_send_fd, buf, 1) != 1) { perror("Writing to thread notify pipe"); } }可以看到,在该派发函数中首先从CQ_ITEM资源池(空闲链表)中提取一个ITEM,并设置为该连接socket的各字段信息,然后以采用轮询方式选择一个工作线程,再将该ITEM放入该工作线程的连接任务队列CQ中,最后通过通知管道的写端,写入通知信息。 接下来就是前面已经分析过的工作线程来负责处理该连接socket的所有业务了。
郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。