redis源码分析(7)——rdb

RDB是redis的另一种持久化方式,相当于是定时快照,也用于主从同步中快照+redo log。redis在进行RDB时,不需要加锁,这是通过利用父子进程共享同一份内存完成的。在父进程fork子进程之后,父子以copy-on-write方式共享同一份物理内存,当两个进程写内存时,才会按照内存页复制内存。这就需要保证在RDB时,最坏情况下需要保证有2倍的内存空间用于父子进程使用(redis使用时占用1G,那么就要保证系统有2G的内存,否则可能会出现使用swap的情况)。因为copy-on-write,所以需要避免不必要的内存拷贝。子进程中基本上只需要读内存,而父进程响应客户端请求,就需要修改内存,为了减少内存修改,父进程会暂停keyspace对应的hash表的rehash(rehash会有大量拷贝,要在不同的桶之间拷贝数据)。下面看一下RDB相关内容。

1. RDB文件格式

RDB文件格式比较简单,可以看做是一条条指令序列,每条指令的组成:

|-----------------------|----------------------------------|

| OP code: 1Byte  |       Instruction: nBytes    |

|-----------------------|----------------------------------|

在加载RDB时,就是对这个指令序列进行解析。所有的OP code包括:

REDIS_RDB_OPCODE_EXPIRETIME_MS: ms级的过期时间

REDIS_RDB_OPCODE_EXPIRETIME:秒级的过期时间

REDIS_RDB_OPCODE_SELECTDB:用于select db命令

REDIS_RDB_OPCODE_EOF:RDB文件结尾

OP code还包括所有的数据类型(REDIS_RDB_TYPE_LIST, REDIS_RDB_TYPE_SET等),用于指定后续kv对中,value的类型。

RDB文件的前5个字节是magic number,用于表示文件是RDB文件。接下来的4个字节是版本号,在加载RDB时,会根据RDB的版本号和redis的版本号比较,查看是否可以处理该版本的RDB。然后,是一条条指令序列,最后以EOF结尾。

2. RDB dump

首先看一下dump的时机,主要分为3块:

1)save命令:客户端发送save命令,redis实例阻塞执行dump。在saveCommand函数中。

2)bgsave命令:dump任务由子进程完成,主进程可以继续服务请求。在bgsaveCommand函数中。

3)被动触发:redis变更次数或者dump的间隔超过阈值。在serverCron中,检测并触发。

4)主从同步触发:在不能实现partial sync时,master需要将rdb传输给slave。在syncCommand函数中。

下面看一下rdb dump的具体过程,这是由rdbSave函数完成的。

    // <MM>
    // 创建并打开临时rdb文件
    // </MM>
    snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
    fp = fopen(tmpfile,"w");
    if (!fp) {
        redisLog(REDIS_WARNING, "Failed opening .rdb for saving: %s",
            strerror(errno));
        return REDIS_ERR;
    }

    rioInitWithFile(&rdb,fp);
    if (server.rdb_checksum)
        rdb.update_cksum = rioGenericUpdateChecksum;
创建并打开临时文件,这是为了保证rdb的数据完整性,只有在dump成功后,才会替换原文件。然后是初始化rio,用于输出。

    // <MM>
    // 写入magic number,format:
    // 9bit: REDIS[RDB_VERSION]
    // </MM>
    snprintf(magic,sizeof(magic),"REDIS%04d",REDIS_RDB_VERSION);
    if (rdbWriteRaw(&rdb,magic,9) == -1) goto werr;
写入magic number以及版本号。

接下来是一个循环,用于对每个redis DB遍历,并生成对应的内容。

    for (j = 0; j < server.dbnum; j++) {
        // dump该DB
    }
看下每个DB的dump过程,实际上就是遍历并输出每个Key-Value对。

        redisDb *db = server.db+j;
        dict *d = db->dict;
        if (dictSize(d) == 0) continue;
        di = dictGetSafeIterator(d);
        if (!di) {
            fclose(fp);
            return REDIS_ERR;
        }
先判断DB是否为空,如果为空就跳过。然后获取DB的迭代器。

        /* Write the SELECT DB opcode */
        if (rdbSaveType(&rdb,REDIS_RDB_OPCODE_SELECTDB) == -1) goto werr;
        if (rdbSaveLen(&rdb,j) == -1) goto werr;
输出select db的opcode,然后是对应的DB号。具体格式是,1字节的OPcode,加上1,2或4字节的DB号。

        /* Iterate this DB writing every entry */
        while((de = dictNext(di)) != NULL) {
            sds keystr = dictGetKey(de);
            robj key, *o = dictGetVal(de);
            long long expire;

            initStaticStringObject(key,keystr);
            expire = getExpire(db,&key);
            if (rdbSaveKeyValuePair(&rdb,&key,o,expire,now) == -1) goto werr;
        }
        dictReleaseIterator(di);
然后是一个while循环,遍历所有K-V对,并进行dump。对于每个KV,获取key,value和expire time。然后调用rdbSaveKeyValuePair函数进行dump,下面就看一下这个函数。

/* Save a key-value pair, with expire time, type, key, value.
 * On error -1 is returned.
 * On success if the key was actually saved 1 is returned, otherwise 0
 * is returned (the key was already expired). */
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val,
                        long long expiretime, long long now)
{
    /* Save the expire time */
    if (expiretime != -1) {
        /* If this key is already expired skip it */
        if (expiretime < now) return 0;
        if (rdbSaveType(rdb,REDIS_RDB_OPCODE_EXPIRETIME_MS) == -1) return -1;
        if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1;
    }

    /* Save type, key, value */
    if (rdbSaveObjectType(rdb,val) == -1) return -1;
    if (rdbSaveStringObject(rdb,key) == -1) return -1;
    if (rdbSaveObject(rdb,val) == -1) return -1;
    return 1;
}
如果expire不为空,则输出该信息。先写出OPCode表示expire,然后是具体的超时时间。接下来是具体的KV对,首先也类似OPCode,表示value的类型,然后是字符串类型的key,最后是value对象。具体对象的dump内容比较多,这里暂时不展开。

上面完成所有DB的dump后,接下来看一下收尾工作。

    di = NULL; /* So that we don‘t release it again on error. */

    /* EOF opcode */
    if (rdbSaveType(&rdb,REDIS_RDB_OPCODE_EOF) == -1) goto werr;
输出EOF对应的OPCode,表示rdb结束。

    /* CRC64 checksum. It will be zero if checksum computation is disabled, the
     * loading code skips the check in this case. */
    cksum = rdb.cksum;
    memrev64ifbe(&cksum);
    if (rioWrite(&rdb,&cksum,8) == 0) goto werr;
计算并输出CRC。

    /* Make sure data will not remain on the OS‘s output buffers */
    if (fflush(fp) == EOF) goto werr;
    if (fsync(fileno(fp)) == -1) goto werr;
    if (fclose(fp) == EOF) goto werr;
首先是调用fflush将输出缓冲区刷新到page cache,然后调用fsync将cache中的内容写盘,最后关闭文件。

    /* Use RENAME to make sure the DB file is changed atomically only
     * if the generate DB file is ok. */
    if (rename(tmpfile,filename) == -1) {
        redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
        unlink(tmpfile);
        return REDIS_ERR;
    }
将临时文件重命名为指定的文件名。

    redisLog(REDIS_NOTICE,"DB saved on disk");
    server.dirty = 0;
    server.lastsave = time(NULL);
    server.lastbgsave_status = REDIS_OK;
    return REDIS_OK;
最后,打印日志,重置dirty和lastsave,这两个值会影响被动触发rdb dump的时机。

werr:
    fclose(fp);
    unlink(tmpfile);
    redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
    if (di) dictReleaseIterator(di);
    return REDIS_ERR;
上述出错的错误处理,主要就是删除临时文件,销毁迭代器并打印日志。

上面就是整个rdb dump的过程,在后台进行rdb dump时,上述是在子进程中完成的,主进程还需要进行最后的一些清理工作,下面看一下这个部分。在serverCron中,如果server.rdb_child_pid不为-1(存在rdb dump的子进程),会调用wait3对子进程收割,如果是rdb子进程完成,会调用backgroundSaveDoneHandler函数做最后处理。

/* A background saving child (BGSAVE) terminated its work. Handle this. */
void backgroundSaveDoneHandler(int exitcode, int bysignal) {
    if (!bysignal && exitcode == 0) {
        redisLog(REDIS_NOTICE,
            "Background saving terminated with success");
        server.dirty = server.dirty - server.dirty_before_bgsave;
        server.lastsave = time(NULL);
        server.lastbgsave_status = REDIS_OK;
    } else if (!bysignal && exitcode != 0) {
        redisLog(REDIS_WARNING, "Background saving error");
        server.lastbgsave_status = REDIS_ERR;
    } else {
        mstime_t latency;

        redisLog(REDIS_WARNING,
            "Background saving terminated by signal %d", bysignal);
        latencyStartMonitor(latency);
        rdbRemoveTempFile(server.rdb_child_pid);
        latencyEndMonitor(latency);
        latencyAddSampleIfNeeded("rdb-unlink-temp-file",latency);
        /* SIGUSR1 is whitelisted, so we have a way to kill a child without
         * tirggering an error conditon. */
        if (bysignal != SIGUSR1)
            server.lastbgsave_status = REDIS_ERR;
    }
    server.rdb_child_pid = -1;
    server.rdb_save_time_last = time(NULL)-server.rdb_save_time_start;
    server.rdb_save_time_start = -1;
    /* Possibly there are slaves waiting for a BGSAVE in order to be served
     * (the first stage of SYNC is a bulk transfer of dump.rdb) */
    updateSlavesWaitingBgsave((!bysignal && exitcode == 0) ? REDIS_OK : REDIS_ERR);
}
相对于aof rewrite,这块工作要简单一些,主要是根据子进程的退出状态以及是否被信号kill进行处理。最后一个函数updateSlavesWaitingBgsave是用于在主从同步中,完成rdb dump,通知向slave传输rdb。

3. RDB Load

RDB加载主要是两个地方会用到:

1)redis启动时,加载RDB

2)主从同步时,master向从发送RDB

redis启动时加载RDB,和AOF一样,是在loadDataFromDisk函数:

/* Function called at startup to load RDB or AOF file in memory. */
void loadDataFromDisk(void) {
    long long start = ustime();
    if (server.aof_state == REDIS_AOF_ON) {
        if (loadAppendOnlyFile(server.aof_filename) == REDIS_OK)
            redisLog(REDIS_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
    } else {
        if (rdbLoad(server.rdb_filename) == REDIS_OK) {
            redisLog(REDIS_NOTICE,"DB loaded from disk: %.3f seconds",
                (float)(ustime()-start)/1000000);
        } else if (errno != ENOENT) {
            redisLog(REDIS_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno));
            exit(1);
        }
    }
}
如果没有开启AOF配置,那么就会尝试加载RDB。由rdbLoad完成RDB的加载,下面看一下这个函数。

    uint32_t dbid;
    int type, rdbver;
    redisDb *db = server.db+0;
    char buf[1024];
    long long expiretime, now = mstime();
    FILE *fp;
    rio rdb;

    if ((fp = fopen(filename,"r")) == NULL) return REDIS_ERR;
首先是打开RDB文件。

    rioInitWithFile(&rdb,fp);
    rdb.update_cksum = rdbLoadProgressCallback;
    rdb.max_processing_chunk = server.loading_process_events_interval_bytes;
初始化rio,设置update_cksum回调函数,以及read的块大小(默认配置的是2M)。这里使用update_cksum还完成一些功能:

1)更新加载进度

2)如果是主从同步过程中,加载RDB,因为整个加载过程可能会很漫长,所以需要不停的想master发送心跳,避免master认为这个slave已经timeout,主动断开连接。

3)处理一些io事件

    if (rioRead(&rdb,buf,9) == 0) goto eoferr;
    buf[9] = ‘\0‘;
    if (memcmp(buf,"REDIS",5) != 0) {
        fclose(fp);
        redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
        errno = EINVAL;
        return REDIS_ERR;
    }
    rdbver = atoi(buf+5);
    if (rdbver < 1 || rdbver > REDIS_RDB_VERSION) {
        fclose(fp);
        redisLog(REDIS_WARNING,"Can‘t handle RDB format version %d",rdbver);
        errno = EINVAL;
        return REDIS_ERR;
    }
读取RDB前9个字节,校验magic number以及版本号。

    startLoading(fp);
准备开始加载,记录加载开始时间,以及需要加载的字节总数,用于更新加载进度。

接下来是一个解释循环,不停的读取一条条指令。

    while (1) {
        // 解释一条条指令
    }
下面看一下一条指令的解释过程:

        robj *key, *val;
        expiretime = -1;

        /* Read type. */
        if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
首先读取type(对应OPCode)。

        if (type == REDIS_RDB_OPCODE_EXPIRETIME) {
            if ((expiretime = rdbLoadTime(&rdb)) == -1) goto eoferr;
            /* We read the time so we need to read the object type again. */
            if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
            /* the EXPIRETIME opcode specifies time in seconds, so convert
             * into milliseconds. */
            expiretime *= 1000;
        } else if (type == REDIS_RDB_OPCODE_EXPIRETIME_MS) {
            /* Milliseconds precision expire times introduced with RDB
             * version 3. */
            if ((expiretime = rdbLoadMillisecondTime(&rdb)) == -1) goto eoferr;
            /* We read the time so we need to read the object type again. */
            if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
        }
如果OPCode对应的expire指令,需要解析出对应的expire time,然后再次读取type(对应随后的kv对中value的类型)。

        if (type == REDIS_RDB_OPCODE_EOF)
            break;
如果OPCode对应的是EOF指令,则RDB加载完成,跳出循环。

        /* Handle SELECT DB opcode as a special case */
        if (type == REDIS_RDB_OPCODE_SELECTDB) {
            if ((dbid = rdbLoadLen(&rdb,NULL)) == REDIS_RDB_LENERR)
                goto eoferr;
            if (dbid >= (unsigned)server.dbnum) {
                redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
                exit(1);
            }
            db = server.db+dbid;
            continue;
        }
如果OPCode是selectDB指令,读取DB号,然后切换到对应的DB。

上面把特殊的指令执行完毕,接下来要解析KV对。

        /* Read key */
        if ((key = rdbLoadStringObject(&rdb)) == NULL) goto eoferr;
读取字符串类型的key。

        /* Read value */
        if ((val = rdbLoadObject(type,&rdb)) == NULL) goto eoferr;
读取value,函数rdbLoadObject会根据type的不同,执行不同类型对象的加载,这里不对该函数展开。

        /* Check if the key already expired. This function is used when loading
         * an RDB file from disk, either at startup, or when an RDB was
         * received from the master. In the latter case, the master is
         * responsible for key expiry. If we would expire keys here, the
         * snapshot taken by the master may not be reflected on the slave. */
        if (server.masterhost == NULL && expiretime != -1 && expiretime < now) {
            decrRefCount(key);
            decrRefCount(val);
            continue;
        }
检测是否过期,如果过期就不会将KV对添加到DB中。

        /* Add the new object in the hash table */
        dbAdd(db,key,val);
将KV对添加到DB。

        /* Set the expire time if needed */
        if (expiretime != -1) setExpire(db,key,expiretime);

        decrRefCount(key);
如果设置了expire time,则添加到expire dict中。

    /* Verify the checksum if RDB version is >= 5 */
    if (rdbver >= 5 && server.rdb_checksum) {
        uint64_t cksum, expected = rdb.cksum;

        if (rioRead(&rdb,&cksum,8) == 0) goto eoferr;
        memrev64ifbe(&cksum);
        if (cksum == 0) {
            redisLog(REDIS_WARNING,"RDB file was saved with checksum disabled: no check performed.");
        } else if (cksum != expected) {
            redisLog(REDIS_WARNING,"Wrong RDB checksum. Aborting now.");
            exit(1);
        }
    }
检查check sum。

    fclose(fp);
    stopLoading();
    return REDIS_OK;
最后,关闭文件,并将server.load置为0,表示加载不在进行。

eoferr: /* unexpected end of file is handled here with a fatal exit */
    redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
    exit(1);
    return REDIS_ERR; /* Just to avoid warning */
在上述加载过程中出错,会跳到eoferr分支。加载出错时,打印日志并退出进程。


郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。