memcache的线程模型
-b backlog队列长度,默认1024
- typedef struct {
- pthread_t thread_id; /* unique ID of this thread */
- struct event_base *base; /* libevent handle this thread uses */
- struct event notify_event; /* listen event for notify pipe */
- int notify_receive_fd; /* receiving end of notify pipe */
- int notify_send_fd; /* sending end of notify pipe */
- CQ new_conn_queue; /* queue of new connections to handle */
- } LIBEVENT_THREAD;
- void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
- int read_buffer_size, enum network_transport transport) {
- CQ_ITEM *item = cqi_new();//创建一个conn_item
- char buf[1];
- int tid = (last_thread + 1) % settings.num_threads;//通过round-robin算法选择一个线程
-
- LIBEVENT_THREAD *thread = threads + tid;//thread数组存储了所有的工作线程
-
- cq_push(thread->new_conn_queue, item);//投递item信息到Worker线程的工作队列中
-
- buf[0] = ‘c‘;
- //在Worker线程的notify_send_fd写入字符c,表示有连接
- if (write(thread->notify_send_fd, buf, 1) != 1) {
- perror("Writing to thread notify pipe");
- }
- static void thread_libevent_process(int fd, short which, void *arg) {
- LIBEVENT_THREAD *me = arg;
- CQ_ITEM *item;
- char buf[1];
- if (read(fd, buf, 1) != 1)//PIPE管道读取一个字节的数据
- if (settings.verbose > 0)
- fprintf(stderr, "Can‘t read from libevent pipe\n");
- switch (buf[0]) {
- case ‘c‘://如果是c,则处理网络连接
- item = cq_pop(me->new_conn_queue);//从连接队列读出Master线程投递的消息 \
-
- if (NULL != item) {
- conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
- item->read_buffer_size, item->transport, me->base);//创建连接
- conn *conn_new(const int sfd, enum conn_states init_state, const int event_flags,
- const int read_buffer_size, enum network_transport transport,
- struct event_base *base)
- {
- conn *c = conn_from_freelist();//获取一个空闲连接,conn是Memcached内部对网络连接的一个封装
- if (NULL == c)//如果没有空闲的连接
- {
- if (!(c = (conn *) calloc(1, sizeof(conn))))//申请空间
- {
- fprintf(stderr, "calloc()\n");
- return NULL;
- }MEMCACHED_CONN_CREATE(c);
- //每个conn都自带读入和输出缓冲区,在进行网络收发数据时,特别方便
- c->rbuf = (char *) malloc((size_t) c->rsize);
- c->wbuf = (char *) malloc((size_t) c->wsize);
- c->ilist = (item **) malloc(sizeof(item *) * c->isize);
- //建立sfd描述符上面的event事件,事件回调函数为event_handler
- event_set(&c->event, sfd, event_flags, event_handler, (void *) c);
- event_base_set(base, &c->event);
- c->ev_flags = event_flags;
- if (event_add(&c->event, 0) == -1)
- {
- //如果建立libevent事件失败,将创建的conn添加到空闲列表中
- if (conn_add_to_freelist(c))
- {
- conn_free(c);
- }
- perror("event_add");
- return NULL;
- }
- //libevent事件回调函数的处理,回调函数被调用时,表明Memcached监听的端口号有网络事件到了
- void event_handler(const int fd, const short which, void *arg)
- {
- conn *c;
- //进入业务处理状态机
- drive_machine(c);
线程通信
- struct conn_queue_item {
- int sfd;//accept之后的描述符
- enum conn_states init_state;//连接的初始状态
- int event_flags;//libevent标志
- int read_buffer_size;//读取数据缓冲区大小
- enum network_transport transport;//内部通信所用的协议
- CQ_ITEM *next;//用于实现链表的指针
- };
-
- struct conn_queue {
- CQ_ITEM *head;//头指针,注意这里是单链表,不是双向链表
- CQ_ITEM *tail;//尾部指针,
- pthread_mutex_t lock;//锁
- pthread_cond_t cond;//条件变量
- };
-
- //获取一个连接
- static CQ_ITEM *cq_pop(CQ *cq) {
- CQ_ITEM *item;
- pthread_mutex_lock(&cq->lock);//执行加锁操作
- item = cq->head;//获得头部指针指向的数据
- if (NULL != item) {
- cq->head = item->next;//更新头指针信息
- if (NULL == cq->head)//这里为空的话,则尾指针也为空,链表此时为空
- cq->tail = NULL;
- }
- pthread_mutex_unlock(&cq->lock);//释放锁操作
- return item;
- }
参考链接 http://blog.csdn.net/column/details/lc-memcached.html
郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。