ssdb源码初探之libnet库

关于ssdb的介绍就不多说了,自行百度去了解下。因为工作上用到了ssdb,正好看看源码也可以学习下作者的设计思路。ssdb的作者在自己的博客上也写了不少关于它的文章,但说的很浅不够深入,借这个机会看看它的内在究竟是怎样的。我下的版本是1.8.0,直接从github上clone过来的。由于本人道行很浅,说的不对的地方还请大家不吝指正。

下载完后可以看到两个比较关键的目录src和deps。deps下包含了ssdb所依赖的一些项目(cpy、jemalloc、leveldb和snappy),cpy是作者造的一个编程语言,jemalloc是一个内存管理库,leveldb是一个kv数据库,ssdb底部的数据库引擎采用的是leveldb,snappy用来压缩和解压缩的开发包。src目录包含了ssdb核心的代码,这是我要分析的主要区域。src目录下有三个子目录:net、ssdb和util,编译这三个子目录会生成三个.a文件:libnet.a、libssdb.a和libutil.a,ssdb-server会依赖这三个静态链接库,不妨先从它们三个入手。本节先看看libnet库~

网络连接

相关文件:link.h、link.cpp

不妨先看看libutil库中的Buffer类,一个简单的缓冲区类,对SSDB协议格式有兴趣的可以看这篇博文,这对于看懂read_record和append_record方法有很大帮助:
技术分享

class Buffer{
    private:
        char *buf;          /* Buffer起始地址 */
        char *data_;        /* 数据区起始地址 */
        int size_;          /* 数据区大小 *
        int total_;         /* 缓冲区大小 */
        int origin_total;   /* 首次申请大小 */
    public:
        Buffer(int total);
        ~Buffer();

        int total() const{ // 缓冲区大小
            return total_;
        }

        bool empty() const{
            return size_ == 0;
        }

        // 数据
        char* data() const{
            return data_;
        }

        int size() const{
            return size_;
        }

        char* slot() const{
            return data_ + size_;
        }

        // 剩余空间大小
        int space() const{
            // 注:(data_ - buf)表示无效数据空间大小
            return total_ - (data_ - buf) - size_;
        }

        // 数据区扩大
        void incr(int num){
            size_ += num;
        }

        // 数据区缩小
        void decr(int num){
            size_ -= num;
            data_ += num;
        }

        // 保证不改变后半段的数据, 以便使已生成的 Bytes 不失效.
        // 注:当前段无效数据空间超过缓冲区一半时,把数据区拷贝到缓冲区起始地址
        void nice();
        // 扩大缓冲区
        int grow();

        std::string stats() const;
        int read_record(Bytes *s);

        int append(char c);
        int append(const char *p);
        int append(const void *p, int size);
        int append(const Bytes &s);

        int append_record(const Bytes &s);
};

Link类表示一个TCP连接,它封装好了常用的操作:建立连接、数据收发等。

class Link{
    private:
        int sock;                           /* socket fd */
        bool noblock_;                      /* socket是否阻塞 */
        bool error_;                        /* 是否出错 */
        std::vector<Bytes> recv_data;       /* 接收数据 */

        RedisLink *redis;                   /* redis连接 */
    public:
        char remote_ip[INET_ADDRSTRLEN];    /* 远端IP */
        int remote_port;                    /* 远端端口 */
        bool auth;                          /* 是否已验证身份 */

        static int min_recv_buf;            /* useless? */
        static int min_send_buf;            /* useless? */

        Buffer *input;                      /* 输入缓冲 */
        Buffer *output;                     /* 输出缓冲 */

        double create_time;                 /* 创建时间(秒) */
        double active_time;                 /* 活跃时间(秒) */

        Link(bool is_server=false);
        ~Link();
        void close();
        void nodelay(bool enable=true);
        // noblock(true) is supposed to corperate with IO Multiplex,
        // otherwise, flush() may cause a lot unneccessary write calls.
        void noblock(bool enable=true);
        void keepalive(bool enable=true);

        int fd() const{
            return sock;
        }
        bool error() const{
            return error_;
        }
        void mark_error(){
            error_ = true;
        }

        static Link* connect(const char *ip, int port);
        static Link* listen(const char *ip, int port);
        Link* accept();

        // read network data info buffer
        int read();
        int write();
        // flush buffered data to network
        // REQUIRES: nonblock
        int flush();

        /**
         * parse received data, and return -
         * NULL: error
         * empty vector: recv not ready
         * vector<Bytes>: recv ready
         */
        const std::vector<Bytes>* recv();
        // wait until a response received.
        const std::vector<Bytes>* response();

        // need to call flush to ensure all data has flush into network
        int send(const std::vector<std::string> &packet);
        int send(const std::vector<Bytes> &packet);
        int send(const Bytes &s1);
        int send(const Bytes &s1, const Bytes &s2);
        int send(const Bytes &s1, const Bytes &s2, const Bytes &s3);
        int send(const Bytes &s1, const Bytes &s2, const Bytes &s3, const Bytes &s4);
        int send(const Bytes &s1, const Bytes &s2, const Bytes &s3, const Bytes &s4, const Bytes &s5);

        const std::vector<Bytes>* last_recv(){
            return &recv_data;
        }

        /** these methods will send a request to the server, and wait until a response received.
         * @return
         * NULL: error
         * vector<Bytes>: response ready
         */
        const std::vector<Bytes>* request(const Bytes &s1);
        const std::vector<Bytes>* request(const Bytes &s1, const Bytes &s2);
        const std::vector<Bytes>* request(const Bytes &s1, const Bytes &s2, const Bytes &s3);
        const std::vector<Bytes>* request(const Bytes &s1, const Bytes &s2, const Bytes &s3, const Bytes &s4);
        const std::vector<Bytes>* request(const Bytes &s1, const Bytes &s2, const Bytes &s3, const Bytes &s4, const Bytes &s5);
};

fd事件

相关文件:fde.h、fde.cpp、fde_select.cpp和fde_epoll.cpp

事件相关的类有Fdevent和Fdevents,根据机器是否支持epoll来选择用select或epoll来管理fd事件。Fdevent为基本fd事件类,Fdevents为Fdevent的管理类。

struct Fdevent{
    int fd;
    int s_flags; // subscribed events
    int events;  // ready events
    struct{
        int num;
        void *ptr;
    }data;
};

class Fdevents{
    public:
        typedef std::vector<struct Fdevent *> events_t;
    private:
#ifdef HAVE_EPOLL
        static const int MAX_FDS = 8 * 1024;
        int ep_fd;
        struct epoll_event ep_events[MAX_FDS];
#else
        int maxfd;
        fd_set readset;
        fd_set writeset;
#endif
        events_t events;        // all fd events, please check get_fde method
        events_t ready_events;  // read events set

        struct Fdevent *get_fde(int fd);
    public:
        Fdevents();
        ~Fdevents();

        bool isset(int fd, int flag); // 判断是否侦听fd读/写事件
        int set(int fd, int flags, int data_num, void *data_ptr); // 添加fd事件
        int del(int fd); // 取消监听fd
        int clr(int fd, int flags);  // 清除fd读/写事件
        const events_t* wait(int timeout_ms=-1); // 监听fd事件,放在ready_events中
};

工作线程池WorkerPool

相关文件:proc.h、proc.cpp、worker.h、worker.cpp、util\thread.h

这里面有几个概念:Command、ProcMap、ProcJob、ProcWorker、WorkerPool、ProcWorkerPool。Command表示处理任务的指令,它有一个关键参数为proc函数指针,定义如下,不难发现它将请求req处理完后返回响应resp,具体实现可以自行定义。

typedef int (*proc_t)(NetworkServer *net, Link *link, const Request &req, Response *resp);

ProcMap可以理解为Command的一个全局集合,以Command名称-Command作为映射保存在map中。NetworkServer类里有个成员变量proc_map,初始化的时候会将一些基本的命令保存在proc_map。ProJob表示一个任务,它将连接和待执行的命令关联在一起。ProcWorker有点像工人,交给他一个ProcJob,他对ProcJob先进行解析,然后执行任务,最后将任务的结果保存在连接的输出缓冲里。WorkerPool是一个模板类,它是一个工作线程池,将Worker和Job关联在一起,它有几个关键的方法:start(开启num_workers个worker线程,每个线程执行_run_worker函数,实际上就是一个从jobs队列中取任务,执行任务,最终将结果push到results队列中)、stop(停止所有工作线程)、push(向jobs队列压进一个待完成的任务)和pop(从results队列中弹出一个已完成的任务)。

NetworkServer类

NetworkServer类就是libnet库里Server的实现类,主要可以看看init和serve两个方法,init完成了Server初始化的内容,serve方法是Server的主要逻辑实现,里面有很多状态维护的代码,用select/epoll写过代码的都清楚是什么样子,不得不承认这种异步的代码给程序员的心智带来了多大的负担。

怎样使用libnet库

C++语言重复造轮子的风气很严重,工作中要是用到的大件都是用开源的,容易遭到大公司一些人的歧视,尽管很多人造出来的轮子没什么利用价值。如果你对自己造轮子的能力没什么信心,比如像我这样的,可以试试用用别人写的。SSDB的作者给了一个demo教你怎么用这个库:

#include "server.h"

void usage(int argc, char **argv){
    printf("Usage:\n");
    printf("    %s [-d] /path/to/server.conf\n", argv[0]);
    printf("Options:\n");
    printf("    -d    run as daemon\n");
}

DEF_PROC(hello);

int main(int argc, char **argv){
    bool is_daemon = false; // 其实没用到,你可以自己实现daemon的机制或调用util/daemon.h下的daemonize函数
    const char *conf_file = NULL;
    for(int i=1; i<argc; i++){
        if(strcmp(argv[i], "-d") == 0){
            is_daemon = true;
        }else{
            conf_file = argv[i];
        }
    }
    if(conf_file == NULL){
        usage(argc, argv);
        exit(1);
    }

    NetworkServer *serv = NetworkServer::init(conf_file);
    if(!serv){
        exit(1);
    }
    // register command procedure
    serv->proc_map.set_proc("hello", proc_hello);
    serv->serve();
    delete serv;
    return 0;
}

int proc_hello(NetworkServer *net, Link *link, const Request &req, Response *resp){
    resp->push_back("ok");
    resp->push_back("world!");
    if(req.size() > 1){
        // The first argument start at index 1, just like argv.
        resp->push_back(req[1].String());
    }
    return 0;
}

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。