memcached 集群c++客户端

memcached 是一个高性能内存缓存,在作为缓存,不需要持久化的场性能稳定,由于现在服务器内存较大,很多应用场景单台memcached就能满足业务需求,普通的官方c API

就能满足需求。


而大型的应用数据量很大,也应该考虑单点故障,集群化可以分散压力,单点故障影响较小。集群的管理通常有两种方式:1.普通hash 2.一致性hash

1.普通hash实现相对简单,效率更高,但是不能动态扩展,这种能满足业务数据不是经常扩展比较固定的场景,单点故障影响不大,这种长期其实很多。通常的mysql + memcached 架构很适合这种方式

2.一致性哈希的优点就是可以动态扩展,适合业务数据持续增长的场景,实现相对复杂,通常需要代理服务器管理


下面是自己实现的基于hash的memcached 集群c++客户端代码,经过线上测试,性能和稳定性没有太大的问题。

/************************************************
function: c++ mcached api 
author:liuyi
date:2012.12.31
version:3.0
modify:2014.10.15
*************************************************/

#ifndef MULI_MEMCACHE_H
#define MULI_MEMCACHE_H

#include <iostream>
#include <string>
#include <vector>
#include <map>
#include <time.h>
#include "libmemcached/memcached.h"
using namespace std;

typedef unsigned int(*hash_fun)(const char*);
unsigned int rs_hash(const char *str)  
{  
	unsigned int b = 378551;  
	unsigned int a = 63689;  
	unsigned int hash = 0;  							     
	while (*str)  
	{  	
		hash = hash * a + (*str++);  
		a *= b;  
	}
	return (hash & 0x7FFFFFFF);  
}

struct memcache_info
{
	string host;
	int port;
};

class single_memcached
{
	public:
		single_memcached()
		{
			servers = NULL;
			memc = NULL;
			rc = MEMCACHED_SUCCESS;
			result_buff = NULL;
		}

		virtual ~single_memcached()
		{
			memcached_free(memc);
			delete []result_buff;
		}
		
		enum {MAX_VALUE_LEN = 1024*1024};

		bool init(const char*host, const int& port)
		{
			result_buff = new char[MAX_VALUE_LEN];
			if(result_buff == NULL)
				return false;

			memc = memcached_create(NULL);
			servers = memcached_server_list_append(NULL, host, port, &rc);
			if(servers == NULL)
				return false;
			rc = memcached_server_push(memc, servers);
			if(rc == MEMCACHED_SUCCESS)
			{
				memcached_server_free(servers);
				return true;
			}
			else
			{
				return false;
			}
		}

		//插入或者覆盖原有数据 expiration为有效时间,默认0为一直有效
		bool set(const char* key, const char* value, time_t expiration = 0)
		{
			rc = memcached_set(memc, key, strlen(key), value, strlen(value), expiration, 0);
			return rc == MEMCACHED_SUCCESS;
		}
		
		//删除数据 expiration为有效时间,默认0为立即生效
		bool delete_k_v(const char* key, time_t expiration = 0)
		{
			rc = memcached_delete(memc, key, strlen(key), expiration);
			return  rc == MEMCACHED_SUCCESS;
		}

		char *get(const char* key)
		{
			size_t value_len = 0;
			uint32_t flag = 0;
			char *ret = memcached_get(memc, key, strlen(key), &value_len, &flag, &rc);
			if(ret == NULL)
			{
				return NULL;
			}
			memset(result_buff, 0, MAX_VALUE_LEN);
			if(value_len < MAX_VALUE_LEN)
			{
				memcpy(result_buff, ret, value_len);
				free(ret);
			}

			return rc == MEMCACHED_SUCCESS && value_len < MAX_VALUE_LEN  ? result_buff : NULL;
		}

	private:
		memcached_server_st *servers;
		memcached_st *memc;
		memcached_return rc;
		char *result_buff;
};

class mulit_memcached
{
	public:
		mulit_memcached()
		{
			memcache_servers.clear();
			memcache_servers_id.clear();
		}

		virtual ~mulit_memcached()
		{
		}
	
		bool init(const vector<memcache_info>& memcache_info_vec, hash_fun hash = rs_hash)
		{
			m_hash_fun = hash;
			memcache_servers.resize(memcache_info_vec.size());
			for(size_t i = 0; i < memcache_info_vec.size(); i++)
			{
				char value[1024] = {0};
				snprintf(value, 1023, "%s#%d", memcache_info_vec[i].host.c_str(), memcache_info_vec[i].port);
				memcache_servers_id.insert(pair<unsigned int, string>(i, value));
				if(!memcache_servers[i].init(memcache_info_vec[i].host.c_str(), memcache_info_vec[i].port))
				{
					return false;
				}
			}
			return true;
		}

		bool set(const char* key, const char* value, time_t expiration = 0)
		{
			unsigned int index = m_hash_fun(key) % memcache_servers.size();
			return memcache_servers[index].set(key, value, expiration);
		}

		bool delete_k_v(const char* key, time_t expiration = 0)
		{
			unsigned int index = m_hash_fun(key) % memcache_servers.size();
			return memcache_servers[index].delete_k_v(key, expiration);
		}

		char *get(const char* key)
		{
			unsigned int index = m_hash_fun(key) % memcache_servers.size();
			return memcache_servers[index].get(key);
		}

		string get_memcache_info(const char* key)
		{
			unsigned int index = m_hash_fun(key) % memcache_servers.size();
			return memcache_servers_id[index];
		}

	private:
		vector<single_memcached> memcache_servers;
		map<unsigned int, string> memcache_servers_id;
		hash_fun m_hash_fun;
};

#endif


//test 

#include <iostream>
#include <string>
#include "memcache_api.h"
using namespace std;

unsigned int user_define_hash(const char *str)  
{  
	unsigned int b = 378551;  
	unsigned int a = 63689;  
	unsigned int hash = 0;  							     
	while (*str)  
	{  	
		hash = hash * a + (*str++);  
		a *= b;  
	}
	hash += 1;
	hash -= 1;
	return (hash & 0x7FFFFFFF);  
}

int main(int argc, char *argv[])
{
	single_memcached mc;
	if(!mc.init("10.101.88.244", 11212))
	{
		cout<<"mc init error"<<endl;
	}

	char key[1024] = {0};
	char value[1024] = {"1"};
	if(!mc.set(key, value))
	{
	//	cout<<"set error"<<endl;
	}
	for(int i = 0; i < 1000; i++)
	{
		sprintf(key, "%d", i);
		if(!mc.set(key, key)){cout<<"error"<<endl;}
		if(mc.get(key))cout<<mc.get(key)<<endl;
		mc.delete_k_v(key);
	}

	memcache_info host_port;
	host_port.host = "10.101.88.244";
	host_port.port = 11211;
	vector<memcache_info> servers;
	servers.push_back(host_port);
	host_port.port = 11212;
	servers.push_back(host_port);
	mulit_memcached mulit_memcached_servers;
	if(!mulit_memcached_servers.init(servers))
	//if(!mulit_memcached_servers.init(servers, user_define_hash))
	{
		cout<<"init error"<<endl;
	}
	for(int i = 0; i < 1000; i++)
	{
		char k[16] = {0};
		sprintf(k, "%d", i);
		//cout<<k<<endl;
		cout<<mulit_memcached_servers.set(k, k)<<endl;;
		cout<<mulit_memcached_servers.set(k, k, 100)<<endl;;
		if(mulit_memcached_servers.get(k))cout<<mulit_memcached_servers.get(k)<<endl;
		cout<<mulit_memcached_servers.delete_k_v(k)<<endl;
		cout<<mulit_memcached_servers.get_memcache_info(key)<<endl;//user for log
	}
	return 0;
}





郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。