memcached源码分析-----get命令处理流程



        转载请注明出处:http://blog.csdn.net/luotuo44/article/details/44217383



        本文以get命令为例子,探讨memcached是如何处理命令的。本文只是探讨memcached处理命令的工作流程,具体的代码细节在不影响阅读的前提下能省略的就省略、能取默认值就取默认值、内存是足够的(不需要动态申请空间就够用了)。涉及到数组、缓存区的就假设已经分配好了。


        现在假定memcached里面有了一个键值为”tk”的item,此时我们使用命令”get tk”获取对应item的内容。


       《半同步半异步网络模型》展示了当memcached进程accept一个新客户端连接时,会把该连接的一些信息封装成一个conn结构体,并且把新连接的初始状态设置成conn_new_cmd。此时,worker线程等待客户端命令的到来。conn结构体有很多成员变量,后文只会列出使用到的成员。



读取命令:

等待有数据可读:


        当客户端发送get命令后,memcached的event_base就会监听到客户端对应的socket fd变成可读了,接着就会调用回调函数event_handler处理这个可读事件。实际上回调函数event_handler只是一个傀儡函数,它会调用drive_machine函数进行处理。drive_machine是一个有限状态机,在真正读数据之前它会在几个状态中跳转。

void event_handler(const int fd, const short which, void *arg) {
    conn *c;

    c = (conn *)arg;

    c->which = which;

    /* sanity */
    if (fd != c->sfd) {
        conn_close(c);
        return;
    }

    drive_machine(c);

    /* wait for next event */
    return;
}


struct conn {
    int    sfd;//该conn对应的socket fd
    enum conn_states  state;//当前状态
    struct event event;//该conn对应的event
    short  ev_flags;//event当前监听的事件类型
    short  which;   /** which events were just triggered */ //触发event回调函数的原因

	//读缓冲区
    char   *rbuf;   /** buffer to read commands into */ 
	//有效数据的开始位置。从rbuf到rcurr之间的数据是已经处理的了,变成无效数据了
    char   *rcurr;  /** but if we parsed some already, this is where we stopped */
	//读缓冲区的总长度
    int    rsize;   /** total allocated size of rbuf */
	//有效数据的长度。初始值为0
    int    rbytes;  /** how much data, starting from rcur, do we have unparsed */


	...
   
    LIBEVENT_THREAD *thread;//这个conn属于哪个worker线程
};


static void drive_machine(conn *c) {
    bool stop = false;
    int sfd;
    int nreqs = settings.reqs_per_event;//20
    int res;
    const char *str;

	//drive_machine被调用会进行状态判断,并进行一些处理。但也可能发生状态的转换
	//此时就需要一个循环,当进行状态转换时,也能处理
    while (!stop) {

        switch(c->state) {
			...

        case conn_waiting://等待socket变成可读的
            if (!update_event(c, EV_READ | EV_PERSIST)) {//更新监听事件失败
                conn_set_state(c, conn_closing);
                break;
            }

            conn_set_state(c, conn_read);
			//居然stop循环,不过没关系,因为event的可读事件是水平触发的。
			//马上又会再次进入有限状态机,并且进入下面的conn_read case中。
            stop = true;
            break;

        case conn_new_cmd:

            --nreqs;
            if (nreqs >= 0) {//简单起见,不考虑nreqs小于0的情况
				//如果该conn的读缓冲区没有数据,那么将状态改成conn_waiting
				//如果该conn的读缓冲区有数据,  那么将状态改成conn_pase_cmd
                reset_cmd_handler(c);
            }
            break;

			...
        }
    }

    return;
}


static void reset_cmd_handler(conn *c) {
    c->cmd = -1;
    
	...
	//为了简单,这里假设没有数据
    if (c->rbytes > 0) {//读缓冲区里面有数据 
        conn_set_state(c, conn_parse_cmd);//解析读到的数据
    } else { 
        conn_set_state(c, conn_waiting);//否则等待数据的到来
    }
}

//设置conn的状态
static void conn_set_state(conn *c, enum conn_states state) {
	...

    if (state != c->state) {
        c->state = state;
    }
}


读取数据:

        在前面,conn的状态跳转到了conn_read。在case conn_read中,worker线程会调用try_read_network函数读取客户端发送的数据。try_read_network函数会尽可能地把所有的数据都读进conn的读缓存区中(当然也是有一个最大限度的)。

static void drive_machine(conn *c) {
    bool stop = false;
    int sfd;
    int nreqs = settings.reqs_per_event;//20
    int res;
    const char *str;

	//drive_machine被调用会进行状态判断,并进行一些处理。但也可能发生状态的转换
	//此时就需要一个循环,当进行状态转换时,也能处理
    while (!stop) {

        switch(c->state) {
			...

        case conn_read:
			//这里假定为TCP
            res = IS_UDP(c->transport) ? try_read_udp(c) : try_read_network(c);

            switch (res) {
			…
            case READ_DATA_RECEIVED://读取到了数据,接着就去解析数据
                conn_set_state(c, conn_parse_cmd);
                break;
			…
            }
            break;

			...
        }
    }

    return;
}


 //尽可能把socket的所有数据都读进c指向的一个缓冲区里面
static enum try_read_result try_read_network(conn *c) {
    enum try_read_result gotdata = READ_NO_DATA_RECEIVED;
    int res;
	...

    while (1) {
		...
        int avail = c->rsize - c->rbytes;
        res = read(c->sfd, c->rbuf + c->rbytes, avail);
        if (res > 0) {
			...
            gotdata = READ_DATA_RECEIVED;
            c->rbytes += res;
            if (res == avail) {//可能还有数据没有读出来
                continue;
            } else {
                break;//socket暂时还没数据了(即已经读取完)
            }
        }
		...
    }
    return gotdata;
}


解析命令:

        前面已经展示了,worker线程怎么读取数据(命令),并且在读取完毕后会把conn的状态设置为conn_parse_cmd。为了简单起见,我们假设经过一次读取就已经成功读取了一条完整的get命令。

static void drive_machine(conn *c) {
    int res;

    while (!stop) {

        switch(c->state) {
        case conn_parse_cmd :
			//返回1表示正在处理读取的一条命令
			//返回0表示需要继续读取socket的数据才能解析命令
			//如果读取到了一条完整的命令,那么函数内部会去解析,
			//并进行调用process_command函数进行一些处理.
			//像set、add、replace、get这些命令,会在处理的时候调用
			//conn_set_state(c, conn_nread)
            if (try_read_command(c) == 0) {
                /* wee need more data! */
                conn_set_state(c, conn_waiting);
            }

            break;
        }
    }

    return;
}



/*
 * if we have a complete line in the buffer, process it.
 */
static int try_read_command(conn *c) {

	...
 
	char *el, *cont;

	el = memchr(c->rcurr, '\n', c->rbytes);
	if (!el) {//没有读取到一条完整的命令
		...//为了简单,不考虑这种情况。
		return 0;//返回0表示需要继续读取socket的数据才能解析命令
	}


	//来到这里,说明已经读取到至少一条完整的命令

	
	cont = el + 1;//用cont指向下一行的开始,无论行尾是\n还是\r\n

	//不同的平台对于行尾有不同的处理,有的为\r\n有的则是\n。所以memcached
	//还要判断一下\n前面的一个字符是否为\r
	if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {
		el--;//指向行尾的开始字符
	}

	//'\0',C语言字符串结尾符号。结合c->rcurr这个开始位置,就可以确定
	//这个命令(现在被看作一个字符串)的开始和结束位置。rcurr指向了一个字符串
	//注意,下一条命令的开始位置由前面的cont指明了
	*el = '\0';
	

	c->last_cmd_time = current_time;
	//处理这个命令
	process_command(c, c->rcurr);//命令字符串由c->rcurr指向

	...

    return 1;//返回1表示正在处理读取的一条命令
}

        上面的try_read_command函数,以\n或者\n\r为作为一条数据的结尾。并且会把数据的结尾赋值为’\0’,这样conn的rcurr指针就相当于指向一个以’\0’结尾的字符串。接着就会调用process_command函数处理这个字符串,在处理之前还要解析出这个字符串具体是什么命令。


符号化命令内容:

        在执行命令之前,必须要知道接收到的字符串是什么命令以及参数是什么。为此,memcached会调用tokenize_command函数处理命令字符串,将字符串符号化。比如命令字符串"set tt 3 0 10",将符号化为”set”、”tt”、”3”、”0”和”10”(后面会将这些称为token)。此外tokenize_command还会清除命令字符串里面的多余空白符。Memcached定义了一个token_t结构体(如下面代码所示)。memcached还为每一条字符串命令定义一个token_t数组,数组每一个元素的value成员指向对应token的开始位置,length成员则记录该token的长度。

#define COMMAND_TOKEN 0
#define SUBCOMMAND_TOKEN 1
#define KEY_TOKEN 1

#define MAX_TOKENS 8

typedef struct token_s {
    char *value;
    size_t length;
} token_t;


//command指向这条命令(该命令以字符串的形式表示)
static void process_command(conn *c, char *command) {

    token_t tokens[MAX_TOKENS];
    size_t ntokens;
    int comm;

	...

	//将一条命令分割成一个个的token,并用tokens数组一一对应的指向
	//比如命令"set tt 3 0 10",将被分割成"set"、"tt"、"3"、"0"、"10"
	//并用tokens数组的5个元素对应指向。token_t类型的value成员指向对应token
	//在command字符串中的位置,length则指明该token的长度。
	//该函数返回token的数量,数量是用户敲入的命令token数 + 1.
	//上面的set命令例子,tokenize_command会返回6。  最后一个token是无意义的
    ntokens = tokenize_command(command, tokens, MAX_TOKENS);//将命令记号化

	//对于命令"get tk",那么tokens[0].value 等于指向"get"的开始位置
	//tokens[1].value 则指向"tk"的开始位置
    if (ntokens >= 3 &&
        ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
         (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {

        process_get_command(c, tokens, ntokens, false);

    }
	else
	{
		...//根据tokens判断是否为其他命令,并进行对应的处理
	}
   	
}


执行命令:

回应信息的存储:


        process_get_command函数在处理get命令时,并不是直接拷贝一份item的数据(考虑一下效率和内存),所以memcached是直接使用item本身的数据,用iovec结构体的成员变量指向item里面的数据。这样能省去拷贝数据内存,也能提高效率。但memcached里面的item可能随时被删除(归还给slab内存分配器),可以通过占用这个item,防止item被删除。在《item引用计数》中说到,只要增加item的引用计数就能防止这个item被删除。于是在process_get_command函数中会占有item,并用一个item指针数组记录其占用了哪些item(这个数组在conn结构体中)。当memcached将item的数据返回给客户端后,就会释放对item的占用。

        前面说到memcached使用iovec结构体的成员变量指向item的数据,但memcached并不是使用writev函数向客户端写数据的,而是使用sendmsg函数。sendmsg函数使用msghdr结构体指针作为参数。因为sendmsg函数中msghdr结构体中的iovec数组长度是有限制的,所以conn结构体中有一个msghdr数组。数组中每一个msghdr结构体带有IOV_MAX个iovec结构体。通过动态申请msghdr数组,可以使得有很多个iovec结构体,不再受IOV_MAX的限制。当然前面说到的iovec结构体个数也是要有足够多,所以conn结构体里面还是有一个iovec指针用来动态申请iovec结构体。现在来看一下conn结构体对应的成员。

struct conn {

    struct iovec *iov;//iovec数组指针
	//数组大小
    int    iovsize;   /* number of elements allocated in iov[] */
	//已经使用的数组元素个数
    int    iovused;   /* number of elements used in iov[] */


	//因为msghdr结构体里面的iovec结构体数组长度是有限制的。所以为了能
	//传输更多的数据,只能增加msghdr结构体的个数.add_msghdr函数负责增加
    struct msghdr *msglist;//指向msghdr数组
	//数组大小
    int    msgsize;   /* number of elements allocated in msglist[] */
	//已经使用了的msghdr元素个数
    int    msgused;   /* number of elements used in msglist[] */
	//正在用sendmsg函数传输msghdr数组中的哪一个元素
    int    msgcurr;   /* element in msglist[] being transmitted now */
    //msgcurr指向的msghdr总共有多少个字节
	int    msgbytes;  /* number of bytes in current msg */


	//worker线程需要占有这个item,直至把item的数据都写回给客户端了
	//故需要一个item指针数组记录本conn占有的item
    item   **ilist;   /* list of items to write out */
    int    isize;//数组的大小
    item   **icurr;//当前使用到的item(在释放占用item时会用到)
    int    ileft;//ilist数组中有多少个item需要释放
};


        在process_command函数中,memcached会增加msglist数组的大小。

static void process_command(conn *c, char *command) {

    c->msgcurr = 0;
    c->msgused = 0;
    c->iovused = 0;
    if (add_msghdr(c) != 0) {
        out_of_memory(c, "SERVER_ERROR out of memory preparing response");
        return;
    }

	...
}


/*
 * Adds a message header to a connection.
 *
 * Returns 0 on success, -1 on out-of-memory.
 */
static int add_msghdr(conn *c)
{
    struct msghdr *msg;

    assert(c != NULL);

    if (c->msgsize == c->msgused) {//已经用完了
        msg = realloc(c->msglist, c->msgsize * 2 * sizeof(struct msghdr));
        if (! msg) {
            return -1;
        }
        c->msglist = msg;
        c->msgsize *= 2;
    }

    msg = c->msglist + c->msgused;//msg指向空闲的节点

    /* this wipes msg_iovlen, msg_control, msg_controllen, and
       msg_flags, the last 3 of which aren't defined on solaris: */
    memset(msg, 0, sizeof(struct msghdr));

    msg->msg_iov = &c->iov[c->iovused];//指向空闲的iovec


    c->msgbytes = 0;
    c->msgused++;

    return 0;
}


        前面说到memcached使用iovec结构体的成员变量指向item的数据,实际上除了item数据,所有回应客户端的数据(包括错误信息)都是通过iovec结构体指向的。memcached通过add_iov函数把要回应的字符串加入到iovec中。

static int add_iov(conn *c, const void *buf, int len) {
    struct msghdr *m;
    int leftover;
    bool limit_to_mtu;

    assert(c != NULL);

	//在process_command函数中,一开始会调用add_msghdr函数,而add_msghdr会把
	//msgused++,所以msgused会等于1,即使在conn_new函数中它被赋值为0
    do {
        m = &c->msglist[c->msgused - 1];

        /*
         * Limit UDP packets, and the first payloads of TCP replies, to
         * UDP_MAX_PAYLOAD_SIZE bytes.
         */
        limit_to_mtu = IS_UDP(c->transport) || (1 == c->msgused);

        /* We may need to start a new msghdr if this one is full. */
        if (m->msg_iovlen == IOV_MAX ||//一个msghdr最多只能有IOV_MAX个iovec结构体
            (limit_to_mtu && c->msgbytes >= UDP_MAX_PAYLOAD_SIZE)) {
            add_msghdr(c);
            m = &c->msglist[c->msgused - 1];
        }

		//保证iovec数组是足够用的。调用add_iov函数一次会消耗一个iovec结构体
		//所以可以在插入数据之前保证iovec数组是足够用的
        if (ensure_iov_space(c) != 0)
            return -1;

        /* If the fragment is too big to fit in the datagram, split it up */
        if (limit_to_mtu && len + c->msgbytes > UDP_MAX_PAYLOAD_SIZE) {
            leftover = len + c->msgbytes - UDP_MAX_PAYLOAD_SIZE;
            len -= leftover;
        } else {
            leftover = 0;
        }

        m = &c->msglist[c->msgused - 1];

		//用一个iovec结构体指向要回应的数据
        m->msg_iov[m->msg_iovlen].iov_base = (void *)buf;
        m->msg_iov[m->msg_iovlen].iov_len = len;

        c->msgbytes += len;
        c->iovused++;
        m->msg_iovlen++;

        buf = ((char *)buf) + len;
        len = leftover;
    } while (leftover > 0);

    return 0;
}



/*
 * Ensures that there is room for another struct iovec in a connection's
 * iov list.
 *
 * Returns 0 on success, -1 on out-of-memory.
 */
static int ensure_iov_space(conn *c) {
    assert(c != NULL);

	//已经使用完了之前申请的
    if (c->iovused >= c->iovsize) {
        int i, iovnum;
        struct iovec *new_iov = (struct iovec *)realloc(c->iov,
                                (c->iovsize * 2) * sizeof(struct iovec));
        if (! new_iov) {
            return -1;
        }
        c->iov = new_iov;
        c->iovsize *= 2;

        /* Point all the msghdr structures at the new list. */
		//因为iovec数组已经重新分配在别的空间了,而msglist数组元素指向这个iovec
		//数组,所以需要修改msglist数组元素的值
        for (i = 0, iovnum = 0; i < c->msgused; i++) {
            c->msglist[i].msg_iov = &c->iov[iovnum];
            iovnum += c->msglist[i].msg_iovlen;
        }
    }

    return 0;
}

        看了上面的代码,可能读者还不是很明白前面列出的conn结构体成员的关联。不懂的,可以参考下图:

        技术分享



处理get命令:

        有了上面的说明和代码,现在来看一下process_get_command函数。当然我们这里也是假设上面三个数组都是分配了内存。在process_get_command函数中会涉及到item的哈希表查找和删除(超时懒惰删除),关于这两点可以分别参考《哈希表查找item》和《删除item》。

item *item_get(const char *key, const size_t nkey) {
    item *it;
    uint32_t hv;
    hv = hash(key, nkey);
    item_lock(hv);
    it = do_item_get(key, nkey, hv);
    item_unlock(hv);
    return it;
}


/** wrapper around assoc_find which does the lazy expiration logic */
//调用do_item_get的函数都已经加上了item_lock(hv)段级别锁或者全局锁
item *do_item_get(const char *key, const size_t nkey, const uint32_t hv) {
    //mutex_lock(&cache_lock);
    item *it = assoc_find(key, nkey, hv);//assoc_find函数内部没有加锁
    
	...
    //mutex_unlock(&cache_lock);

    if (it != NULL) {
        if (...) {
			...
        } else if (it->exptime != 0 && it->exptime <= current_time) {//该item已经过期失效了
            do_item_unlink(it, hv);//引用计数会减一
            do_item_remove(it);//引用计数减一,如果引用计数等于0,就删除
            it = NULL;
        } else {
        	//把这个item标志为被访问过的
            it->it_flags |= ITEM_FETCHED;
        }
    }

    return it;
}




/* ntokens is overwritten here... shrug.. */
static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens, bool return_cas) {
    char *key;
    size_t nkey;
    int i = 0;
    item *it;
    token_t *key_token = &tokens[KEY_TOKEN];
    char *suffix;
    assert(c != NULL);

    do {
		//因为一个get命令可以同时获取多条记录的内容
		//比如get key1 key2 key3
        while(key_token->length != 0) {

            key = key_token->value;
            nkey = key_token->length;

            it = item_get(key, nkey);
  
            if (it) {

                /*
                 * Construct the response. Each hit adds three elements to the
                 * outgoing data list:
                 *   "VALUE "
                 *   key
                 *   " " + flags + " " + data length + "\r\n" + data (with \r\n)
                 */

                if (return_cas)
                {
					...//不是cas
                }
                else
                {
				  //填充要返回的信息
                  if (add_iov(c, "VALUE ", 6) != 0 ||//如果add_iov成功,则返回0
                      add_iov(c, ITEM_key(it), it->nkey) != 0 ||
                      add_iov(c, ITEM_suffix(it), it->nsuffix + it->nbytes) != 0)
                      {
                          item_remove(it);//引用计数减一
                          break;
                      }
                }

				//刷新这个item的访问时间以及在LRU队列中的位置
                item_update(it);

				//并不会马上放弃对这个item的占用。因为在add_iov函数中,memcached并不为
				//复制一份item,而是直接使用item结构体本身的数据。故不能马上解除对
				//item的引用,不然其他worker线程就有机会把这个item释放,导致野指针
                *(c->ilist + i) = it;//把这个item放到ilist数组中,日后会进行释放的
                i++;

            } 

            key_token++;
        }


        //因为调用一次tokenize_command最多只可以解析MAX_TOKENS-1个token,但
        //get命令的键值key个数可以有很多个,所以此时就会出现后面的键值
        //不在第一次tokenize的tokens数组中,此时需要多次调用tokenize_command
        //函数,把所有的键值都tokenize出来。注意,此时还是在get命令中。
		//当然在看这里的代码时直接忽略这种情况,我们只考虑"get tk"命令
        if(key_token->value != NULL) {
            ntokens = tokenize_command(key_token->value, tokens, MAX_TOKENS);
            key_token = tokens;
        }

    } while(key_token->value != NULL);

    c->icurr = c->ilist;
    c->ileft = i;

    /*
        If the loop was terminated because of out-of-memory, it is not
        reliable to add END\r\n to the buffer, because it might not end
        in \r\n. So we send SERVER_ERROR instead.
    */
    if (key_token->value != NULL || add_iov(c, "END\r\n", 5) != 0
        || (IS_UDP(c->transport) && build_udp_headers(c) != 0)) {
        out_of_memory(c, "SERVER_ERROR out of memory writing get response");
    }
    else {
        conn_set_state(c, conn_mwrite);//更改conn的状态
        c->msgcurr = 0;
    }
}


回应命令:

        前面的process_get_command函数已经把要写的数据都通过iovec结构体指明了,并且把conn的状态设置为conn_mwrite。现在来看一下memcached具体是怎么写数据的。

static void drive_machine(conn *c) {

	bool stop = false;

    while (!stop) {

        switch(c->state) {
		
		...
        case conn_mwrite:
			...

            switch (transmit(c)) {//发送数据给c->sfd指明的客户端
            case TRANSMIT_COMPLETE://发送数据完毕
                if (c->state == conn_mwrite) {
                    conn_release_items(c);//释放对item的占用
                    /* XXX:  I don't know why this wasn't the general case */
                    if(c->protocol == binary_prot) {
                        conn_set_state(c, c->write_and_go);
                    } else {//我们只考虑文本协议
                        conn_set_state(c, conn_new_cmd);//又回到了一开始的conn_new_cmd状态
                    }
                } 
                break;

			case TRANSMIT_INCOMPLETE://还没发送完毕
				break;

            }
            break;
        }
    }

    return;
}




 //通过s->sfd把数据写到对端
static enum transmit_result transmit(conn *c) {

    if (c->msgcurr < c->msgused && 
            c->msglist[c->msgcurr].msg_iovlen == 0) {//msgcurr指向的msghdr已经发送完毕
        /* Finished writing the current msg; advance to the next. */
        c->msgcurr++;
    }


    if (c->msgcurr < c->msgused) {//所有的数据都已经发送完毕
        ssize_t res;
        struct msghdr *m = &c->msglist[c->msgcurr];

        res = sendmsg(c->sfd, m, 0);
        if (res > 0) {

			//通过sendmsg返回值确定已经写了多少个iovec数组。循环减去每一个iovec数组的每一个
			//元素的数据长度即可
            while (m->msg_iovlen > 0 && res >= m->msg_iov->iov_len) {
                res -= m->msg_iov->iov_len;
                m->msg_iovlen--;
                m->msg_iov++;
            }

			//只写了iovec结构体的部分数据
            if (res > 0) {
                m->msg_iov->iov_base = (caddr_t)m->msg_iov->iov_base + res;
                m->msg_iov->iov_len -= res;
            }
            return TRANSMIT_INCOMPLETE;
        }

    } else {
        return TRANSMIT_COMPLETE;
    }
}


        可以看到,即使transmit函数一次把所有的数据都写到了客户端,还是会调用transmit函数两次才能返回TRANSMIT_COMPLETE。当memcached把所有的数据都写回客户端后,就会调用conn_release_items函数释放对item的占用。

static void conn_release_items(conn *c) {

	...
    while (c->ileft > 0) {
        item *it = *(c->icurr);
        assert((it->it_flags & ITEM_SLABBED) == 0);
        item_remove(it);
        c->icurr++;
        c->ileft--;
    }

	...
    c->icurr = c->ilist;
}




郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。