redis源码分析(7)——rdb
RDB是redis的另一种持久化方式,相当于是定时快照,也用于主从同步中快照+redo log。redis在进行RDB时,不需要加锁,这是通过利用父子进程共享同一份内存完成的。在父进程fork子进程之后,父子以copy-on-write方式共享同一份物理内存,当两个进程写内存时,才会按照内存页复制内存。这就需要保证在RDB时,最坏情况下需要保证有2倍的内存空间用于父子进程使用(redis使用时占用1G,那么就要保证系统有2G的内存,否则可能会出现使用swap的情况)。因为copy-on-write,所以需要避免不必要的内存拷贝。子进程中基本上只需要读内存,而父进程响应客户端请求,就需要修改内存,为了减少内存修改,父进程会暂停keyspace对应的hash表的rehash(rehash会有大量拷贝,要在不同的桶之间拷贝数据)。下面看一下RDB相关内容。
1. RDB文件格式
RDB文件格式比较简单,可以看做是一条条指令序列,每条指令的组成:
|-----------------------|----------------------------------|
| OP code: 1Byte | Instruction: nBytes |
|-----------------------|----------------------------------|
在加载RDB时,就是对这个指令序列进行解析。所有的OP code包括:
REDIS_RDB_OPCODE_EXPIRETIME_MS: ms级的过期时间
REDIS_RDB_OPCODE_EXPIRETIME:秒级的过期时间
REDIS_RDB_OPCODE_SELECTDB:用于select db命令
REDIS_RDB_OPCODE_EOF:RDB文件结尾
OP code还包括所有的数据类型(REDIS_RDB_TYPE_LIST, REDIS_RDB_TYPE_SET等),用于指定后续kv对中,value的类型。
RDB文件的前5个字节是magic number,用于表示文件是RDB文件。接下来的4个字节是版本号,在加载RDB时,会根据RDB的版本号和redis的版本号比较,查看是否可以处理该版本的RDB。然后,是一条条指令序列,最后以EOF结尾。
2. RDB dump
首先看一下dump的时机,主要分为3块:
1)save命令:客户端发送save命令,redis实例阻塞执行dump。在saveCommand函数中。
2)bgsave命令:dump任务由子进程完成,主进程可以继续服务请求。在bgsaveCommand函数中。
3)被动触发:redis变更次数或者dump的间隔超过阈值。在serverCron中,检测并触发。
4)主从同步触发:在不能实现partial sync时,master需要将rdb传输给slave。在syncCommand函数中。
下面看一下rdb dump的具体过程,这是由rdbSave函数完成的。
// <MM> // 创建并打开临时rdb文件 // </MM> snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid()); fp = fopen(tmpfile,"w"); if (!fp) { redisLog(REDIS_WARNING, "Failed opening .rdb for saving: %s", strerror(errno)); return REDIS_ERR; } rioInitWithFile(&rdb,fp); if (server.rdb_checksum) rdb.update_cksum = rioGenericUpdateChecksum;创建并打开临时文件,这是为了保证rdb的数据完整性,只有在dump成功后,才会替换原文件。然后是初始化rio,用于输出。
// <MM> // 写入magic number,format: // 9bit: REDIS[RDB_VERSION] // </MM> snprintf(magic,sizeof(magic),"REDIS%04d",REDIS_RDB_VERSION); if (rdbWriteRaw(&rdb,magic,9) == -1) goto werr;写入magic number以及版本号。
接下来是一个循环,用于对每个redis DB遍历,并生成对应的内容。
for (j = 0; j < server.dbnum; j++) { // dump该DB }看下每个DB的dump过程,实际上就是遍历并输出每个Key-Value对。
redisDb *db = server.db+j; dict *d = db->dict; if (dictSize(d) == 0) continue; di = dictGetSafeIterator(d); if (!di) { fclose(fp); return REDIS_ERR; }先判断DB是否为空,如果为空就跳过。然后获取DB的迭代器。
/* Write the SELECT DB opcode */ if (rdbSaveType(&rdb,REDIS_RDB_OPCODE_SELECTDB) == -1) goto werr; if (rdbSaveLen(&rdb,j) == -1) goto werr;输出select db的opcode,然后是对应的DB号。具体格式是,1字节的OPcode,加上1,2或4字节的DB号。
/* Iterate this DB writing every entry */ while((de = dictNext(di)) != NULL) { sds keystr = dictGetKey(de); robj key, *o = dictGetVal(de); long long expire; initStaticStringObject(key,keystr); expire = getExpire(db,&key); if (rdbSaveKeyValuePair(&rdb,&key,o,expire,now) == -1) goto werr; } dictReleaseIterator(di);然后是一个while循环,遍历所有K-V对,并进行dump。对于每个KV,获取key,value和expire time。然后调用rdbSaveKeyValuePair函数进行dump,下面就看一下这个函数。
/* Save a key-value pair, with expire time, type, key, value. * On error -1 is returned. * On success if the key was actually saved 1 is returned, otherwise 0 * is returned (the key was already expired). */ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime, long long now) { /* Save the expire time */ if (expiretime != -1) { /* If this key is already expired skip it */ if (expiretime < now) return 0; if (rdbSaveType(rdb,REDIS_RDB_OPCODE_EXPIRETIME_MS) == -1) return -1; if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1; } /* Save type, key, value */ if (rdbSaveObjectType(rdb,val) == -1) return -1; if (rdbSaveStringObject(rdb,key) == -1) return -1; if (rdbSaveObject(rdb,val) == -1) return -1; return 1; }如果expire不为空,则输出该信息。先写出OPCode表示expire,然后是具体的超时时间。接下来是具体的KV对,首先也类似OPCode,表示value的类型,然后是字符串类型的key,最后是value对象。具体对象的dump内容比较多,这里暂时不展开。
上面完成所有DB的dump后,接下来看一下收尾工作。
di = NULL; /* So that we don‘t release it again on error. */ /* EOF opcode */ if (rdbSaveType(&rdb,REDIS_RDB_OPCODE_EOF) == -1) goto werr;输出EOF对应的OPCode,表示rdb结束。
/* CRC64 checksum. It will be zero if checksum computation is disabled, the * loading code skips the check in this case. */ cksum = rdb.cksum; memrev64ifbe(&cksum); if (rioWrite(&rdb,&cksum,8) == 0) goto werr;计算并输出CRC。
/* Make sure data will not remain on the OS‘s output buffers */ if (fflush(fp) == EOF) goto werr; if (fsync(fileno(fp)) == -1) goto werr; if (fclose(fp) == EOF) goto werr;首先是调用fflush将输出缓冲区刷新到page cache,然后调用fsync将cache中的内容写盘,最后关闭文件。
/* Use RENAME to make sure the DB file is changed atomically only * if the generate DB file is ok. */ if (rename(tmpfile,filename) == -1) { redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno)); unlink(tmpfile); return REDIS_ERR; }将临时文件重命名为指定的文件名。
redisLog(REDIS_NOTICE,"DB saved on disk"); server.dirty = 0; server.lastsave = time(NULL); server.lastbgsave_status = REDIS_OK; return REDIS_OK;最后,打印日志,重置dirty和lastsave,这两个值会影响被动触发rdb dump的时机。
werr: fclose(fp); unlink(tmpfile); redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno)); if (di) dictReleaseIterator(di); return REDIS_ERR;上述出错的错误处理,主要就是删除临时文件,销毁迭代器并打印日志。
上面就是整个rdb dump的过程,在后台进行rdb dump时,上述是在子进程中完成的,主进程还需要进行最后的一些清理工作,下面看一下这个部分。在serverCron中,如果server.rdb_child_pid不为-1(存在rdb dump的子进程),会调用wait3对子进程收割,如果是rdb子进程完成,会调用backgroundSaveDoneHandler函数做最后处理。
/* A background saving child (BGSAVE) terminated its work. Handle this. */ void backgroundSaveDoneHandler(int exitcode, int bysignal) { if (!bysignal && exitcode == 0) { redisLog(REDIS_NOTICE, "Background saving terminated with success"); server.dirty = server.dirty - server.dirty_before_bgsave; server.lastsave = time(NULL); server.lastbgsave_status = REDIS_OK; } else if (!bysignal && exitcode != 0) { redisLog(REDIS_WARNING, "Background saving error"); server.lastbgsave_status = REDIS_ERR; } else { mstime_t latency; redisLog(REDIS_WARNING, "Background saving terminated by signal %d", bysignal); latencyStartMonitor(latency); rdbRemoveTempFile(server.rdb_child_pid); latencyEndMonitor(latency); latencyAddSampleIfNeeded("rdb-unlink-temp-file",latency); /* SIGUSR1 is whitelisted, so we have a way to kill a child without * tirggering an error conditon. */ if (bysignal != SIGUSR1) server.lastbgsave_status = REDIS_ERR; } server.rdb_child_pid = -1; server.rdb_save_time_last = time(NULL)-server.rdb_save_time_start; server.rdb_save_time_start = -1; /* Possibly there are slaves waiting for a BGSAVE in order to be served * (the first stage of SYNC is a bulk transfer of dump.rdb) */ updateSlavesWaitingBgsave((!bysignal && exitcode == 0) ? REDIS_OK : REDIS_ERR); }相对于aof rewrite,这块工作要简单一些,主要是根据子进程的退出状态以及是否被信号kill进行处理。最后一个函数updateSlavesWaitingBgsave是用于在主从同步中,完成rdb dump,通知向slave传输rdb。
3. RDB Load
RDB加载主要是两个地方会用到:
1)redis启动时,加载RDB
2)主从同步时,master向从发送RDB
redis启动时加载RDB,和AOF一样,是在loadDataFromDisk函数:
/* Function called at startup to load RDB or AOF file in memory. */ void loadDataFromDisk(void) { long long start = ustime(); if (server.aof_state == REDIS_AOF_ON) { if (loadAppendOnlyFile(server.aof_filename) == REDIS_OK) redisLog(REDIS_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000); } else { if (rdbLoad(server.rdb_filename) == REDIS_OK) { redisLog(REDIS_NOTICE,"DB loaded from disk: %.3f seconds", (float)(ustime()-start)/1000000); } else if (errno != ENOENT) { redisLog(REDIS_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno)); exit(1); } } }如果没有开启AOF配置,那么就会尝试加载RDB。由rdbLoad完成RDB的加载,下面看一下这个函数。
uint32_t dbid; int type, rdbver; redisDb *db = server.db+0; char buf[1024]; long long expiretime, now = mstime(); FILE *fp; rio rdb; if ((fp = fopen(filename,"r")) == NULL) return REDIS_ERR;首先是打开RDB文件。
rioInitWithFile(&rdb,fp); rdb.update_cksum = rdbLoadProgressCallback; rdb.max_processing_chunk = server.loading_process_events_interval_bytes;初始化rio,设置update_cksum回调函数,以及read的块大小(默认配置的是2M)。这里使用update_cksum还完成一些功能:
1)更新加载进度
2)如果是主从同步过程中,加载RDB,因为整个加载过程可能会很漫长,所以需要不停的想master发送心跳,避免master认为这个slave已经timeout,主动断开连接。
3)处理一些io事件
if (rioRead(&rdb,buf,9) == 0) goto eoferr; buf[9] = ‘\0‘; if (memcmp(buf,"REDIS",5) != 0) { fclose(fp); redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file"); errno = EINVAL; return REDIS_ERR; } rdbver = atoi(buf+5); if (rdbver < 1 || rdbver > REDIS_RDB_VERSION) { fclose(fp); redisLog(REDIS_WARNING,"Can‘t handle RDB format version %d",rdbver); errno = EINVAL; return REDIS_ERR; }读取RDB前9个字节,校验magic number以及版本号。
startLoading(fp);准备开始加载,记录加载开始时间,以及需要加载的字节总数,用于更新加载进度。
接下来是一个解释循环,不停的读取一条条指令。
while (1) { // 解释一条条指令 }下面看一下一条指令的解释过程:
robj *key, *val; expiretime = -1; /* Read type. */ if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;首先读取type(对应OPCode)。
if (type == REDIS_RDB_OPCODE_EXPIRETIME) { if ((expiretime = rdbLoadTime(&rdb)) == -1) goto eoferr; /* We read the time so we need to read the object type again. */ if ((type = rdbLoadType(&rdb)) == -1) goto eoferr; /* the EXPIRETIME opcode specifies time in seconds, so convert * into milliseconds. */ expiretime *= 1000; } else if (type == REDIS_RDB_OPCODE_EXPIRETIME_MS) { /* Milliseconds precision expire times introduced with RDB * version 3. */ if ((expiretime = rdbLoadMillisecondTime(&rdb)) == -1) goto eoferr; /* We read the time so we need to read the object type again. */ if ((type = rdbLoadType(&rdb)) == -1) goto eoferr; }如果OPCode对应的expire指令,需要解析出对应的expire time,然后再次读取type(对应随后的kv对中value的类型)。
if (type == REDIS_RDB_OPCODE_EOF) break;如果OPCode对应的是EOF指令,则RDB加载完成,跳出循环。
/* Handle SELECT DB opcode as a special case */ if (type == REDIS_RDB_OPCODE_SELECTDB) { if ((dbid = rdbLoadLen(&rdb,NULL)) == REDIS_RDB_LENERR) goto eoferr; if (dbid >= (unsigned)server.dbnum) { redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum); exit(1); } db = server.db+dbid; continue; }如果OPCode是selectDB指令,读取DB号,然后切换到对应的DB。
上面把特殊的指令执行完毕,接下来要解析KV对。
/* Read key */ if ((key = rdbLoadStringObject(&rdb)) == NULL) goto eoferr;读取字符串类型的key。
/* Read value */ if ((val = rdbLoadObject(type,&rdb)) == NULL) goto eoferr;读取value,函数rdbLoadObject会根据type的不同,执行不同类型对象的加载,这里不对该函数展开。
/* Check if the key already expired. This function is used when loading * an RDB file from disk, either at startup, or when an RDB was * received from the master. In the latter case, the master is * responsible for key expiry. If we would expire keys here, the * snapshot taken by the master may not be reflected on the slave. */ if (server.masterhost == NULL && expiretime != -1 && expiretime < now) { decrRefCount(key); decrRefCount(val); continue; }检测是否过期,如果过期就不会将KV对添加到DB中。
/* Add the new object in the hash table */ dbAdd(db,key,val);将KV对添加到DB。
/* Set the expire time if needed */ if (expiretime != -1) setExpire(db,key,expiretime); decrRefCount(key);如果设置了expire time,则添加到expire dict中。
/* Verify the checksum if RDB version is >= 5 */ if (rdbver >= 5 && server.rdb_checksum) { uint64_t cksum, expected = rdb.cksum; if (rioRead(&rdb,&cksum,8) == 0) goto eoferr; memrev64ifbe(&cksum); if (cksum == 0) { redisLog(REDIS_WARNING,"RDB file was saved with checksum disabled: no check performed."); } else if (cksum != expected) { redisLog(REDIS_WARNING,"Wrong RDB checksum. Aborting now."); exit(1); } }检查check sum。
fclose(fp); stopLoading(); return REDIS_OK;最后,关闭文件,并将server.load置为0,表示加载不在进行。
eoferr: /* unexpected end of file is handled here with a fatal exit */ redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now."); exit(1); return REDIS_ERR; /* Just to avoid warning */在上述加载过程中出错,会跳到eoferr分支。加载出错时,打印日志并退出进程。
郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。