Linux c 开发 - Memcached源码分析之HashTable(4)
前言
上一章我们讲解了Memcached的消息回应机制《Linux c 开发 - Memcached源码分析之消息回应(3)》。从这一章开始我们慢慢讲解Memcached是如何存储数据的。
讲解本章前,我们先看一个Memcached存储数据的基本结构。
//item的具体结构 typedef struct _stritem { struct _stritem *next; //下一个结构 struct _stritem *prev; //前一个结构 struct _stritem *h_next; //hashtable的list /* hash chain next */ rel_time_t time; /* least recent access */ rel_time_t exptime; /* expire time */ int nbytes; /* size of data */ unsigned short refcount; uint8_t nsuffix; /* length of flags-and-length string */ uint8_t it_flags; /* ITEM_* above */ uint8_t slabs_clsid;/* which slab class we're in */ uint8_t nkey; /* key length, w/terminating null and padding */ /* this odd type prevents type-punning issues when we do * the little shuffle to save space when not using CAS. */ union { uint64_t cas; char end; } data[]; /* if it_flags & ITEM_CAS we have 8 bytes CAS */ /* then null-terminated key */ /* then " flags length\r\n" (no terminating null) */ /* then data with terminating \r\n (no terminating null; it's binary!) */ } item;
说明:
1. Memcached上存储的每一个元素都会有一个Item的结构。
2. Item结构主要记录与HashTable之间的关系,以及存储数据的slabs_class(后面几章节会讲解)的关系以及key的信息,存储的数据长度等基本信息。
Memcached的HashTable源码分析
HashTable结构图
说明:
1. Memcached在启动的时候,会默认初始化一个HashTable,这个table的默认长度为65536。
2. 我们将这个HashTable中的每一个元素称为桶,每个桶就是一个item结构的单向链表。
3. Memcached会将key值hash成一个变量名称为hv的uint32_t类型的值。
4. 通过hv与桶的个数之间的按位与计算,hv & hashmask(hashpower),就可以得到当前的key会落在哪个桶上面。
item_get - 继续从get方法说起
结合第二章,我们继续看Memcached get的操作方法。这个方法中,将key 哈希成一个uint32_t类型的值。
//这边的item_*系列的方法,就是Memcached核心存储块的接口 item *item_get(const char *key, const size_t nkey) { item *it; uint32_t hv; hv = hash(key, nkey); //对key进行hash,返回一个uint32_t类型的值 item_lock(hv); //块锁,当取数据的时候,不允许其他的操作,保证取数据的原子性 it = do_item_get(key, nkey, hv); item_unlock(hv); return it; }继续看do_item_get这个方法,我们可以看到assoc_find这个方法就是查询HashTable上的Item。
/** wrapper around assoc_find which does the lazy expiration logic */ item *do_item_get(const char *key, const size_t nkey, const uint32_t hv) { //mutex_lock(&cache_lock); item *it = assoc_find(key, nkey, hv); //hashTable上存储Item //....more code }
assoc.c - assoc_init 初始化HashTable
当我们进入assoc_find这个方法后,发现HashTable的操作都在assoc.c这个文件中。
assoc_init这个方法为HashTable初始化,assoc_init这个方法是在入口函数main中初始化的。
HashTable默认设置为16,1 << 16后得到65536个桶。如果用户自定义设置,设置值在12-64之间。
//初始化HahsTable表 void assoc_init(const int hashtable_init) { //初始化的时候 hashtable_init值需要大于12 小于64 //如果hashtable_init的值没有设定,则hashpower使用默认值为16 if (hashtable_init) { hashpower = hashtable_init; } //primary_hashtable主要用来存储这个HashTable //hashsize方法是求桶的个数,默认如果hashpower=16的话,桶的个数为:65536 primary_hashtable = calloc(hashsize(hashpower), sizeof(void *)); if (! primary_hashtable) { fprintf(stderr, "Failed to init hashtable.\n"); exit(EXIT_FAILURE); } STATS_LOCK(); stats.hash_power_level = hashpower; stats.hash_bytes = hashsize(hashpower) * sizeof(void *); STATS_UNLOCK(); }
assoc.c - assoc_find 查找一个Item
//寻找一个Item item *assoc_find(const char *key, const size_t nkey, const uint32_t hv) { item *it; unsigned int oldbucket; //判断是否在扩容中... if (expanding && (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket) { it = old_hashtable[oldbucket]; } else { //获取得到具体的桶的地址 it = primary_hashtable[hv & hashmask(hashpower)]; } item *ret = NULL; int depth = 0; //循环的深度 while (it) { //循环查找桶的list中的Item if ((nkey == it->nkey) && (memcmp(key, ITEM_key(it), nkey) == 0)) { ret = it; break; } it = it->h_next; ++depth; } MEMCACHED_ASSOC_FIND(key, nkey, depth); return ret; }
assoc.c - assoc_insert 新增一个Item
//新增Item操作 int assoc_insert(item *it, const uint32_t hv) { unsigned int oldbucket; assert(assoc_find(ITEM_key(it), it->nkey) == 0); /* shouldn't have duplicately named things defined */ //判断是否在扩容,如果是扩容中,为保证程序继续可用,则需要使用旧的桶 if (expanding && (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket) { it->h_next = old_hashtable[oldbucket]; old_hashtable[oldbucket] = it; } else { //hv & hashmask(hashpower) 按位与计算是在哪个桶上面 //将当前的item->h_next 指向桶中首个Item的位置 it->h_next = primary_hashtable[hv & hashmask(hashpower)]; //然后将hashtable中的首页Item指向新的Item地址值 primary_hashtable[hv & hashmask(hashpower)] = it; } hash_items++; //因为是新增操作,则就会增加一个Item //如果hash_items的个数大于当前 (桶的个数(默认:65536) * 3) / 2的时候,就需要重新扩容 //因为初始化的桶本身就比较多了,所以扩容必须在单独的线程中处理,每次扩容估计耗时比较长 if (! expanding && hash_items > (hashsize(hashpower) * 3) / 2) { assoc_start_expand(); } MEMCACHED_ASSOC_INSERT(ITEM_key(it), it->nkey, hash_items); return 1; }
assoc.c - assoc_delete 删除item操作
//该方法主要用于寻找 static item** _hashitem_before (const char *key, const size_t nkey, const uint32_t hv) { item **pos; unsigned int oldbucket; //判断是否在扩容中 if (expanding && (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket) { pos = &old_hashtable[oldbucket]; } else { //返回具体桶的地址 pos = &primary_hashtable[hv & hashmask(hashpower)]; } //在桶的list中匹配key值是否相同,相同则找到Item while (*pos && ((nkey != (*pos)->nkey) || memcmp(key, ITEM_key(*pos), nkey))) { pos = &(*pos)->h_next; } return pos; } //删除一个桶上的Item void assoc_delete(const char *key, const size_t nkey, const uint32_t hv) { item **before = _hashitem_before(key, nkey, hv); //查询Item是否存在 //如果Item存在,则当前的Item值指向下一个Item的指针地址 if (*before) { item *nxt; hash_items--; //item个数减去1 /* The DTrace probe cannot be triggered as the last instruction * due to possible tail-optimization by the compiler */ MEMCACHED_ASSOC_DELETE(key, nkey, hash_items); nxt = (*before)->h_next; (*before)->h_next = 0; /* probably pointless, but whatever. */ *before = nxt; return; } /* Note: we never actually get here. the callers don't delete things they can't find. */ assert(*before != 0); }
assoc.c - 关于扩容
Memcached的扩容都是在单独线程中进行的。
(桶的个数(默认:65536) * 3) / 2的时候,就需要重新扩容。扩容需要的时间比较久,所以必须使用不同的线程进行自动扩容处理。
static void assoc_start_expand(void) { if (started_expanding) return; started_expanding = true; //唤醒线程 pthread_cond_signal(&maintenance_cond); } /* grows the hashtable to the next power of 2. */ //扩容方法 static void assoc_expand(void) { old_hashtable = primary_hashtable; primary_hashtable = calloc(hashsize(hashpower + 1), sizeof(void *)); if (primary_hashtable) { if (settings.verbose > 1) fprintf(stderr, "Hash table expansion starting\n"); hashpower++; expanding = true; expand_bucket = 0; STATS_LOCK(); stats.hash_power_level = hashpower; stats.hash_bytes += hashsize(hashpower) * sizeof(void *); stats.hash_is_expanding = 1; STATS_UNLOCK(); } else { primary_hashtable = old_hashtable; /* Bad news, but we can keep running. */ } } static volatile int do_run_maintenance_thread = 1; #define DEFAULT_HASH_BULK_MOVE 1 int hash_bulk_move = DEFAULT_HASH_BULK_MOVE; static void *assoc_maintenance_thread(void *arg) { while (do_run_maintenance_thread) { int ii = 0; /* Lock the cache, and bulk move multiple buckets to the new * hash table. */ item_lock_global(); mutex_lock(&cache_lock); for (ii = 0; ii < hash_bulk_move && expanding; ++ii) { item *it, *next; int bucket; for (it = old_hashtable[expand_bucket]; NULL != it; it = next) { next = it->h_next; bucket = hash(ITEM_key(it), it->nkey) & hashmask(hashpower); it->h_next = primary_hashtable[bucket]; primary_hashtable[bucket] = it; } old_hashtable[expand_bucket] = NULL; expand_bucket++; if (expand_bucket == hashsize(hashpower - 1)) { expanding = false; free(old_hashtable); STATS_LOCK(); stats.hash_bytes -= hashsize(hashpower - 1) * sizeof(void *); stats.hash_is_expanding = 0; STATS_UNLOCK(); if (settings.verbose > 1) fprintf(stderr, "Hash table expansion done\n"); } } mutex_unlock(&cache_lock); item_unlock_global(); if (!expanding) { /* finished expanding. tell all threads to use fine-grained locks */ switch_item_lock_type(ITEM_LOCK_GRANULAR); slabs_rebalancer_resume(); /* We are done expanding.. just wait for next invocation */ mutex_lock(&cache_lock); started_expanding = false; pthread_cond_wait(&maintenance_cond, &cache_lock); /* Before doing anything, tell threads to use a global lock */ mutex_unlock(&cache_lock); slabs_rebalancer_pause(); switch_item_lock_type(ITEM_LOCK_GLOBAL); mutex_lock(&cache_lock); assoc_expand(); mutex_unlock(&cache_lock); } } return NULL; } static pthread_t maintenance_tid; int start_assoc_maintenance_thread() { int ret; char *env = getenv("MEMCACHED_HASH_BULK_MOVE"); if (env != NULL) { hash_bulk_move = atoi(env); if (hash_bulk_move == 0) { hash_bulk_move = DEFAULT_HASH_BULK_MOVE; } } if ((ret = pthread_create(&maintenance_tid, NULL, assoc_maintenance_thread, NULL)) != 0) { fprintf(stderr, "Can't create thread: %s\n", strerror(ret)); return -1; } return 0; } void stop_assoc_maintenance_thread() { mutex_lock(&cache_lock); do_run_maintenance_thread = 0; pthread_cond_signal(&maintenance_cond); mutex_unlock(&cache_lock); /* Wait for the maintenance thread to stop */ pthread_join(maintenance_tid, NULL); }
郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。