ssdb源码初探之libnet库
关于ssdb的介绍就不多说了,自行百度去了解下。因为工作上用到了ssdb,正好看看源码也可以学习下作者的设计思路。ssdb的作者在自己的博客上也写了不少关于它的文章,但说的很浅不够深入,借这个机会看看它的内在究竟是怎样的。我下的版本是1.8.0,直接从github上clone过来的。由于本人道行很浅,说的不对的地方还请大家不吝指正。
下载完后可以看到两个比较关键的目录src和deps。deps下包含了ssdb所依赖的一些项目(cpy、jemalloc、leveldb和snappy),cpy是作者造的一个编程语言,jemalloc是一个内存管理库,leveldb是一个kv数据库,ssdb底部的数据库引擎采用的是leveldb,snappy用来压缩和解压缩的开发包。src目录包含了ssdb核心的代码,这是我要分析的主要区域。src目录下有三个子目录:net、ssdb和util,编译这三个子目录会生成三个.a文件:libnet.a、libssdb.a和libutil.a,ssdb-server会依赖这三个静态链接库,不妨先从它们三个入手。本节先看看libnet库~
网络连接
相关文件:link.h、link.cpp
不妨先看看libutil库中的Buffer类,一个简单的缓冲区类,对SSDB协议格式有兴趣的可以看这篇博文,这对于看懂read_record和append_record方法有很大帮助:
class Buffer{
private:
char *buf; /* Buffer起始地址 */
char *data_; /* 数据区起始地址 */
int size_; /* 数据区大小 *
int total_; /* 缓冲区大小 */
int origin_total; /* 首次申请大小 */
public:
Buffer(int total);
~Buffer();
int total() const{ // 缓冲区大小
return total_;
}
bool empty() const{
return size_ == 0;
}
// 数据
char* data() const{
return data_;
}
int size() const{
return size_;
}
char* slot() const{
return data_ + size_;
}
// 剩余空间大小
int space() const{
// 注:(data_ - buf)表示无效数据空间大小
return total_ - (data_ - buf) - size_;
}
// 数据区扩大
void incr(int num){
size_ += num;
}
// 数据区缩小
void decr(int num){
size_ -= num;
data_ += num;
}
// 保证不改变后半段的数据, 以便使已生成的 Bytes 不失效.
// 注:当前段无效数据空间超过缓冲区一半时,把数据区拷贝到缓冲区起始地址
void nice();
// 扩大缓冲区
int grow();
std::string stats() const;
int read_record(Bytes *s);
int append(char c);
int append(const char *p);
int append(const void *p, int size);
int append(const Bytes &s);
int append_record(const Bytes &s);
};
Link类表示一个TCP连接,它封装好了常用的操作:建立连接、数据收发等。
class Link{
private:
int sock; /* socket fd */
bool noblock_; /* socket是否阻塞 */
bool error_; /* 是否出错 */
std::vector<Bytes> recv_data; /* 接收数据 */
RedisLink *redis; /* redis连接 */
public:
char remote_ip[INET_ADDRSTRLEN]; /* 远端IP */
int remote_port; /* 远端端口 */
bool auth; /* 是否已验证身份 */
static int min_recv_buf; /* useless? */
static int min_send_buf; /* useless? */
Buffer *input; /* 输入缓冲 */
Buffer *output; /* 输出缓冲 */
double create_time; /* 创建时间(秒) */
double active_time; /* 活跃时间(秒) */
Link(bool is_server=false);
~Link();
void close();
void nodelay(bool enable=true);
// noblock(true) is supposed to corperate with IO Multiplex,
// otherwise, flush() may cause a lot unneccessary write calls.
void noblock(bool enable=true);
void keepalive(bool enable=true);
int fd() const{
return sock;
}
bool error() const{
return error_;
}
void mark_error(){
error_ = true;
}
static Link* connect(const char *ip, int port);
static Link* listen(const char *ip, int port);
Link* accept();
// read network data info buffer
int read();
int write();
// flush buffered data to network
// REQUIRES: nonblock
int flush();
/**
* parse received data, and return -
* NULL: error
* empty vector: recv not ready
* vector<Bytes>: recv ready
*/
const std::vector<Bytes>* recv();
// wait until a response received.
const std::vector<Bytes>* response();
// need to call flush to ensure all data has flush into network
int send(const std::vector<std::string> &packet);
int send(const std::vector<Bytes> &packet);
int send(const Bytes &s1);
int send(const Bytes &s1, const Bytes &s2);
int send(const Bytes &s1, const Bytes &s2, const Bytes &s3);
int send(const Bytes &s1, const Bytes &s2, const Bytes &s3, const Bytes &s4);
int send(const Bytes &s1, const Bytes &s2, const Bytes &s3, const Bytes &s4, const Bytes &s5);
const std::vector<Bytes>* last_recv(){
return &recv_data;
}
/** these methods will send a request to the server, and wait until a response received.
* @return
* NULL: error
* vector<Bytes>: response ready
*/
const std::vector<Bytes>* request(const Bytes &s1);
const std::vector<Bytes>* request(const Bytes &s1, const Bytes &s2);
const std::vector<Bytes>* request(const Bytes &s1, const Bytes &s2, const Bytes &s3);
const std::vector<Bytes>* request(const Bytes &s1, const Bytes &s2, const Bytes &s3, const Bytes &s4);
const std::vector<Bytes>* request(const Bytes &s1, const Bytes &s2, const Bytes &s3, const Bytes &s4, const Bytes &s5);
};
fd事件
相关文件:fde.h、fde.cpp、fde_select.cpp和fde_epoll.cpp
事件相关的类有Fdevent和Fdevents,根据机器是否支持epoll来选择用select或epoll来管理fd事件。Fdevent为基本fd事件类,Fdevents为Fdevent的管理类。
struct Fdevent{
int fd;
int s_flags; // subscribed events
int events; // ready events
struct{
int num;
void *ptr;
}data;
};
class Fdevents{
public:
typedef std::vector<struct Fdevent *> events_t;
private:
#ifdef HAVE_EPOLL
static const int MAX_FDS = 8 * 1024;
int ep_fd;
struct epoll_event ep_events[MAX_FDS];
#else
int maxfd;
fd_set readset;
fd_set writeset;
#endif
events_t events; // all fd events, please check get_fde method
events_t ready_events; // read events set
struct Fdevent *get_fde(int fd);
public:
Fdevents();
~Fdevents();
bool isset(int fd, int flag); // 判断是否侦听fd读/写事件
int set(int fd, int flags, int data_num, void *data_ptr); // 添加fd事件
int del(int fd); // 取消监听fd
int clr(int fd, int flags); // 清除fd读/写事件
const events_t* wait(int timeout_ms=-1); // 监听fd事件,放在ready_events中
};
工作线程池WorkerPool
相关文件:proc.h、proc.cpp、worker.h、worker.cpp、util\thread.h
这里面有几个概念:Command、ProcMap、ProcJob、ProcWorker、WorkerPool、ProcWorkerPool。Command表示处理任务的指令,它有一个关键参数为proc函数指针,定义如下,不难发现它将请求req处理完后返回响应resp,具体实现可以自行定义。
typedef int (*proc_t)(NetworkServer *net, Link *link, const Request &req, Response *resp);
ProcMap可以理解为Command的一个全局集合,以Command名称-Command作为映射保存在map中。NetworkServer类里有个成员变量proc_map,初始化的时候会将一些基本的命令保存在proc_map。ProJob表示一个任务,它将连接和待执行的命令关联在一起。ProcWorker有点像工人,交给他一个ProcJob,他对ProcJob先进行解析,然后执行任务,最后将任务的结果保存在连接的输出缓冲里。WorkerPool是一个模板类,它是一个工作线程池,将Worker和Job关联在一起,它有几个关键的方法:start(开启num_workers个worker线程,每个线程执行_run_worker函数,实际上就是一个从jobs队列中取任务,执行任务,最终将结果push到results队列中)、stop(停止所有工作线程)、push(向jobs队列压进一个待完成的任务)和pop(从results队列中弹出一个已完成的任务)。
NetworkServer类
NetworkServer类就是libnet库里Server的实现类,主要可以看看init和serve两个方法,init完成了Server初始化的内容,serve方法是Server的主要逻辑实现,里面有很多状态维护的代码,用select/epoll写过代码的都清楚是什么样子,不得不承认这种异步的代码给程序员的心智带来了多大的负担。
怎样使用libnet库
C++语言重复造轮子的风气很严重,工作中要是用到的大件都是用开源的,容易遭到大公司一些人的歧视,尽管很多人造出来的轮子没什么利用价值。如果你对自己造轮子的能力没什么信心,比如像我这样的,可以试试用用别人写的。SSDB的作者给了一个demo教你怎么用这个库:
#include "server.h"
void usage(int argc, char **argv){
printf("Usage:\n");
printf(" %s [-d] /path/to/server.conf\n", argv[0]);
printf("Options:\n");
printf(" -d run as daemon\n");
}
DEF_PROC(hello);
int main(int argc, char **argv){
bool is_daemon = false; // 其实没用到,你可以自己实现daemon的机制或调用util/daemon.h下的daemonize函数
const char *conf_file = NULL;
for(int i=1; i<argc; i++){
if(strcmp(argv[i], "-d") == 0){
is_daemon = true;
}else{
conf_file = argv[i];
}
}
if(conf_file == NULL){
usage(argc, argv);
exit(1);
}
NetworkServer *serv = NetworkServer::init(conf_file);
if(!serv){
exit(1);
}
// register command procedure
serv->proc_map.set_proc("hello", proc_hello);
serv->serve();
delete serv;
return 0;
}
int proc_hello(NetworkServer *net, Link *link, const Request &req, Response *resp){
resp->push_back("ok");
resp->push_back("world!");
if(req.size() > 1){
// The first argument start at index 1, just like argv.
resp->push_back(req[1].String());
}
return 0;
}
郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。