本文将会从源码层面解读 Redis 的各项机制。我们会从最基础的内容开始,尽量只看 Redis 最核心的部分,而剥离掉一些不太重要的内容,所有的源码都给出了 GitHub 的链接。
写作本文时,Redis CE 还在 7.4 版本;Redis 8.0 仍然在 Pre-Release。因此,本文主要基于 Redis 7.4,其余版本可能有所不同。本文使用的环境为 Debian 12。
持久化
Redis 的持久化有两种方式:RDB(Redis DataBase)和 AOF(Append Only File)。RDB 是快照的方式,AOF 是日志的方式。
RDB
RDB 是通过快照的方式来持久化数据的。Redis 会将内存中的数据保存到磁盘上,以便在重启时可以快速加载数据。
RDB 提供了两种保存方式:save
和 bgsave
。save
是阻塞的,bgsave
则会 fork 一个子进程来异步地进行持久化操作。
save
做的是全量快照,因此不可能每做一次修改都进行一次快照。
save
命令也可以通过配置文件来实现每隔一段时间自动保存快照。例如:
save 900 1
save 300 10
save 60 10000
这个配置表示,如果 900 秒内有至少 1 个 key 被修改,就会保存快照;如果 300 秒内有至少 10 个 key 被修改,就会保存快照;如果 60 秒内有至少 10000 个 key 被修改,就会保存快照。
save
我们先来看较简单的 save
。
在这个过程中,RDB 的保存操作为了防止数据损坏:
- 先将数据保存到
rio
结构体中,写入的内容包括- RDB 版本号
- 辅助信息
- 模块辅助信息(前半部分)
- 函数信息
- 数据
- 遍历并序列化写入
- 模块辅助信息(后半部分)
- 结束符
- 校验和
- 然后再将
rio
结构体写入临时文件temp-pid.rdb
中 - 然后再重命名为正式文件
临时文件的作用是,即使保存过程中出现了问题,也不会影响到原有的备份文件。而这里的 rio
相当于做了一个序列化的工作。
点击查看源码
save
的实现如下:
int rdbSave(int req, char *filename, rdbSaveInfo *rsi, int rdbflags) {
/* ... */
startSaving(rdbflags);
// 生成临时文件名为 temp-进程号.rdb
snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
// 保存数据到临时文件
if (rdbSaveInternal(req,tmpfile,rsi,rdbflags) != C_OK) {
/* ... */
}
// 将临时文件重命名为正式文件
if (rename(tmpfile,filename) == -1) {
/* ... */
}
// 同步目录,确保文件已经写入磁盘
if (fsyncFileDir(filename) != 0) {
/* ... */
}
/* ... */
// 重置脏数据
server.dirty = 0;
// 更新最后保存时间
server.lastsave = time(NULL);
// 更新最后保存状态
server.lastbgsave_status = C_OK;
stopSaving(1);
return C_OK;
}
具体的保存操作在 rdbSaveInternal
中:
static int rdbSaveInternal(int req, const char *filename, rdbSaveInfo *rsi, int rdbflags) {
/* ... */
// 打开文件进行写操作
FILE *fp = fopen(filename,"w");
/* ... */
// 初始化 rio 结构体
rioInitWithFile(&rdb,fp);
// 如果使用了增量同步,设置自动同步和回收缓存
if (server.rdb_save_incremental_fsync) {
rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES);
if (!(rdbflags & RDBFLAGS_KEEP_CACHE)) rioSetReclaimCache(&rdb,1);
}
// 保存数据到 rio 结构体
if (rdbSaveRio(req,&rdb,&error,rdbflags,rsi) == C_ERR) {
errno = error;
err_op = "rdbSaveRio";
goto werr;
}
// 刷新文件缓冲区
if (fflush(fp)) { err_op = "fflush"; goto werr; }
// 同步文件到磁盘
if (fsync(fileno(fp))) { err_op = "fsync"; goto werr; }
// 如果没有设置保留缓存,回收缓存
if (!(rdbflags & RDBFLAGS_KEEP_CACHE) && reclaimFilePageCache(fileno(fp), 0, 0) == -1) {
/* ... */
}
// 关闭文件
if (fclose(fp)) { fp = NULL; err_op = "fclose"; goto werr; }
return C_OK;
/* ... */
}
具体写入的内容在 rdbSaveRio
将其分为了几个部分:
int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
char magic[10];
uint64_t cksum;
long key_counter = 0;
int j;
// 如果启用了校验和,设置校验和函数
if (server.rdb_checksum)
rdb->update_cksum = rioGenericUpdateChecksum;
// 写入 RDB 版本号
snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
// 写入辅助信息
if (rdbSaveInfoAuxFields(rdb,rdbflags,rsi) == -1) goto werr;
// 写入模块辅助信息1
if (!(req & SLAVE_REQ_RDB_EXCLUDE_DATA) && rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr;
// 写入函数信息
if (!(req & SLAVE_REQ_RDB_EXCLUDE_FUNCTIONS) && rdbSaveFunctions(rdb) == -1) goto werr;
// 写入具体数据
if (!(req & SLAVE_REQ_RDB_EXCLUDE_DATA)) {
for (j = 0; j < server.dbnum; j++) {
if (rdbSaveDb(rdb, j, rdbflags, &key_counter) == -1) goto werr;
}
}
// 写入模块辅助信息2
if (!(req & SLAVE_REQ_RDB_EXCLUDE_DATA) && rdbSaveModulesAux(rdb, REDISMODULE_AUX_AFTER_RDB) == -1) goto werr;
// 写入结束符
if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;
// 写入校验和
cksum = rdb->cksum;
memrev64ifbe(&cksum);
if (rioWrite(rdb,&cksum,8) == 0) goto werr;
return C_OK;
werr:
if (error) *error = errno;
return C_ERR;
}
接下来,我们来重点关注写入具体数据的部分:
ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter) {
dictEntry *de;
ssize_t written = 0;
ssize_t res;
kvstoreIterator *kvs_it = NULL;
static long long info_updated_time = 0;
// pname 用于记录当前是 RDB 还是 AOF
char *pname = (rdbflags & RDBFLAGS_AOF_PREAMBLE) ? "AOF rewrite" : "RDB";
// 获取指定数据库
redisDb *db = server.db + dbid;
unsigned long long int db_size = kvstoreSize(db->keys);
if (db_size == 0) return 0;
// 写入选择数据库操作码和数据库 ID
if ((res = rdbSaveType(rdb,RDB_OPCODE_SELECTDB)) < 0) goto werr;
written += res;
if ((res = rdbSaveLen(rdb, dbid)) < 0) goto werr;
written += res;
// 写入数据库大小和过期键大小
unsigned long long expires_size = kvstoreSize(db->expires);
if ((res = rdbSaveType(rdb,RDB_OPCODE_RESIZEDB)) < 0) goto werr;
written += res;
if ((res = rdbSaveLen(rdb,db_size)) < 0) goto werr;
written += res;
if ((res = rdbSaveLen(rdb,expires_size)) < 0) goto werr;
written += res;
// 迭代访问数据库字典中的所有键值对
kvs_it = kvstoreIteratorInit(db->keys);
int last_slot = -1;
while ((de = kvstoreIteratorNext(kvs_it)) != NULL) {
int curr_slot = kvstoreIteratorGetCurrentDictIndex(kvs_it);
if (server.cluster_enabled && curr_slot != last_slot) {
// 写入槽信息
if ((res = rdbSaveType(rdb, RDB_OPCODE_SLOT_INFO)) < 0) goto werr;
written += res;
if ((res = rdbSaveLen(rdb, curr_slot)) < 0) goto werr;
written += res;
if ((res = rdbSaveLen(rdb, kvstoreDictSize(db->keys, curr_slot))) < 0) goto werr;
written += res;
if ((res = rdbSaveLen(rdb, kvstoreDictSize(db->expires, curr_slot))) < 0) goto werr;
written += res;
last_slot = curr_slot;
}
// 获取键和值
sds keystr = dictGetKey(de);
robj key, *o = dictGetVal(de);
long long expire;
size_t rdb_bytes_before_key = rdb->processed_bytes;
initStaticStringObject(key,keystr);
expire = getExpire(db,&key);
// 写入键值对
if ((res = rdbSaveKeyValuePair(rdb, &key, o, expire, dbid)) < 0) goto werr;
written += res;
// 计算总字节数
size_t dump_size = rdb->processed_bytes - rdb_bytes_before_key;
// 如果是在子进程中,释放对象
if (server.in_fork_child) dismissObject(o, dump_size);
// 更新子进程信息
if (((*key_counter)++ & 1023) == 0) {
long long now = mstime();
if (now - info_updated_time >= 1000) {
sendChildInfo(CHILD_INFO_TYPE_CURRENT_INFO, *key_counter, pname);
info_updated_time = now;
}
}
}
kvstoreIteratorRelease(kvs_it);
return written;
werr:
if (kvs_it) kvstoreIteratorRelease(kvs_it);
return -1;
}
可以看到,这部分不但写入了数据,还写入了和数据库及字典相关的信息,以确保能够正确重建。
我们进一步看 rdbSaveKeyValuePair
:
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime, int dbid) {
int savelru = server.maxmemory_policy & MAXMEMORY_FLAG_LRU;
int savelfu = server.maxmemory_policy & MAXMEMORY_FLAG_LFU;
// 写入过期时间
if (expiretime != -1) {
if (rdbSaveType(rdb,RDB_OPCODE_EXPIRETIME_MS) == -1) return -1;
if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1;
}
// 写入 LRU 信息
if (savelru) {
uint64_t idletime = estimateObjectIdleTime(val);
idletime /= 1000;
if (rdbSaveType(rdb,RDB_OPCODE_IDLE) == -1) return -1;
if (rdbSaveLen(rdb,idletime) == -1) return -1;
}
// 写入 LFU 信息
if (savelfu) {
uint8_t buf[1];
buf[0] = LFUDecrAndReturn(val);
if (rdbSaveType(rdb,RDB_OPCODE_FREQ) == -1) return -1;
if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
}
// 写入对象类型
if (rdbSaveObjectType(rdb,val) == -1) return -1;
// 写入键
if (rdbSaveStringObject(rdb,key) == -1) return -1;
// 写入值
if (rdbSaveObject(rdb,val,key,dbid) == -1) return -1;
// 如果设置了保存延迟,进行延迟
if (server.rdb_key_save_delay)
debugDelay(server.rdb_key_save_delay);
return 1;
}
上面的代码 rdb_key_save_delay
是用于测试的,可以设置一个延迟,用于模拟保存数据时的延迟。
我们继续关注最主要的问题:如何写入具体数据。上面的代码中,无论是 rdbSaveObjectType
、rdbSaveStringObject
还是 rdbSaveObject
,它们无非就是对结构化数据做了序列化,使其变为字符串。考虑到 Redis 目前支持的数据结构非常多,这些代码写得非常长,但逻辑很简单,就不再仔细讲解了。
它们最终都是调用了 rdbWriteRaw
函数。这个函数封装了 rioWrite
的函数,用于写入数据:
static inline size_t rioWrite(rio *r, const void *buf, size_t len) {
if (r->flags & (RIO_FLAG_WRITE_ERROR | RIO_FLAG_ABORT)) return 0;
// 循环写入数据,直到 len 为 0
while (len) {
// 计算本次写入的字节数
size_t bytes_to_write = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
// 如果设置了校验和更新函数,更新校验和
if (r->update_cksum) r->update_cksum(r,buf,bytes_to_write);
// 调用 rio 结构体的写入函数写入数据
if (r->write(r,buf,bytes_to_write) == 0) {
// 如果写入失败,设置写入错误标志并返回 0
r->flags |= RIO_FLAG_WRITE_ERROR;
return 0;
}
// 更新缓冲区指针和剩余字节数
buf = (char*)buf + bytes_to_write;
len -= bytes_to_write;
// 更新已处理的字节数
r->processed_bytes += bytes_to_write;
}
return 1;
}
这里的 r->write
是一个函数指针,指向了 rioFileWrite
函数。这个函数归根结底就是使用 fwrite
函数写入数据,不过加入了一些错误处理的逻辑。
到这里,我们就搞明白了 save
背后的运行逻辑。可以看到,它就是给整个 Redis 数据库做了一个序列化的操作,将内存中的数据保存到磁盘文件中。说实话,这个过程边边角角的处理挺多,不过逻辑还是很清晰的。
bgsave
其实刚刚在看 save
的时候,我们已经看到了很多跟 bgsave
相关的代码。bgsave
其实就是 Fork 一个子进程,然后在子进程中原封不动地调用我们最先看到的 rdbSave
函数。
点击查看源码
其代码如下:
int rdbSaveBackground(int req, char *filename, rdbSaveInfo *rsi, int rdbflags) {
pid_t childpid;
// 如果已经有一个活跃的子进程在运行,返回错误
if (hasActiveChildProcess()) return C_ERR;
// 统计 RDB 保存次数
server.stat_rdb_saves++;
// 记录脏数据
server.dirty_before_bgsave = server.dirty;
// 记录最后一次尝试保存时间
server.lastbgsave_try = time(NULL);
// fork 一个子进程
if ((childpid = redisFork(CHILD_TYPE_RDB)) == 0) {
int retval;
// 设置子进程的名字
redisSetProcTitle("redis-rdb-bgsave");
// 设置 CPU 亲和性
redisSetCpuAffinity(server.bgsave_cpulist);
// 调用 rdbSave 函数保存数据
retval = rdbSave(req, filename,rsi,rdbflags);
if (retval == C_OK) {
// 如果保存成功,发送子进程信息
sendChildCowInfo(CHILD_INFO_TYPE_RDB_COW_SIZE, "RDB");
}
// 退出子进程
exitFromChild((retval == C_OK) ? 0 : 1);
} else {
// 如果 fork 失败,返回错误
if (childpid == -1) {
server.lastbgsave_status = C_ERR;
/* ... */
return C_ERR;
}
/* ... */
server.rdb_save_time_start = time(NULL);
server.rdb_child_type = RDB_CHILD_TYPE_DISK;
return C_OK;
}
return C_OK;
}
实际上,你会发现这就是个简简单单的 Fork 操作。
不过这里存在一个问题:在 bgsave
时,如果原先的数据被修改了该怎么办?答案是操作系统提供的 Copy On Write(COW)。当父进程对共享内存修改时,会将物理内存复制一份,并在这个副本上进行操作。这样,子进程就不会受到父进程的影响。
自动保存
当然,RDB 不可能要求运维没事就去手动保存一下。它在事件循环中会自动保存。
RDB 自动保存需要满足以下所有条件:
- 上次保存以来的更改次数大于等于
sp->changes
; - 触发保存的时间间隔大于
sp->seconds
; - 如果上次保存失败,重试保存的时间间隔大于
CONFIG_BGSAVE_RETRY_DELAY
(默认为 5)
这里的 sp->changes
和 sp->seconds
就是我们之前在配置文件中设置的内容。
如果满足了条件,就会调用 rdbSaveBackground
在后台偷偷保存。
点击查看源码
if (server.dirty >= sp->changes &&
server.unixtime-server.lastsave > sp->seconds &&
(server.unixtime-server.lastbgsave_try >
CONFIG_BGSAVE_RETRY_DELAY ||
server.lastbgsave_status == C_OK))
{
/* ... */
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
rdbSaveBackground(SLAVE_REQ_NONE,server.rdb_filename,rsiptr,RDBFLAGS_NONE);
break;
}
AOF
AOF 是通过日志的方式来持久化数据的。Redis 会将每次的写操作都记录到日志文件中,以便在重启时可以重新执行这些操作。
默认情况下,AOF 是关闭的。你可以通过配置文件(/etc/redis/redis.conf
)来开启 AOF:
appendonly yes
appendfilename "appendonly.aof"
AOF 文件的内容遵循一定的规则。例如,在执行 SET name ch3nyang
时,AOF 文件会记录:
*3
$3
SET
$4
name
$8
ch3nyang
-
*3
表示后面有 3 个参数; -
$3
表示后面的参数长度为 3; -
SET
表示第一个参数; -
$4
表示第二个参数长度为 4; -
name
表示第二个参数; -
$8
表示第三个参数长度为 8; -
ch3nyang
表示第三个参数。
点击查看源码
它通过 feedAppendOnlyFile
函数来实现:
void feedAppendOnlyFile(int dictid, robj **argv, int argc) {
sds buf = sdsempty();
serverAssert(dictid == -1 || (dictid >= 0 && dictid < server.dbnum));
if (server.aof_timestamp_enabled) {
// 生成AOF时间戳注释
sds ts = genAofTimestampAnnotationIfNeeded(0);
if (ts != NULL) {
buf = sdscatsds(buf, ts);
sdsfree(ts);
}
}
if (dictid != -1 && dictid != server.aof_selected_db) {
// 如果当前数据库ID与选择的数据库ID不同,生成SELECT命令
char seldb[64];
snprintf(seldb,sizeof(seldb),"%d",dictid);
buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
(unsigned long)strlen(seldb),seldb);
server.aof_selected_db = dictid;
}
// 将命令写入 sds 字符串
buf = catAppendOnlyGenericCommand(buf,argc,argv);
if (server.aof_state == AOF_ON ||
(server.aof_state == AOF_WAIT_REWRITE && server.child_type == CHILD_TYPE_AOF))
{
// 如果 AOF 处于打开状态,或者正在等待重写,将命令写入 AOF buffer
server.aof_buf = sdscatlen(server.aof_buf, buf, sdslen(buf));
}
sdsfree(buf);
}
其中最核心功能的代码实现非常简单:
sds catAppendOnlyGenericCommand(sds dst, int argc, robj **argv) {
char buf[32];
int len, j;
robj *o;
// 参数数量
buf[0] = '*';
len = 1+ll2string(buf+1,sizeof(buf)-1,argc);
buf[len++] = '\r';
buf[len++] = '\n';
dst = sdscatlen(dst,buf,len);
// 遍历所有参数
for (j = 0; j < argc; j++) {
o = getDecodedObject(argv[j]);
// 参数长度
buf[0] = '$';
len = 1+ll2string(buf+1,sizeof(buf)-1,sdslen(o->ptr));
buf[len++] = '\r';
buf[len++] = '\n';
dst = sdscatlen(dst,buf,len);
// 参数值
dst = sdscatlen(dst,o->ptr,sdslen(o->ptr));
dst = sdscatlen(dst,"\r\n",2);
decrRefCount(o);
}
return dst;
}
在 AOF 过程中,Redis 会首先执行命令,然后将命令写入 AOF 文件,最后将命令执行的结果返回给客户端。这两件事情同步执行,且不是原子操作。这导致了两个问题:
- 如果命令执行成功,但写入 AOF 文件失败,那么重启后数据会丢失;
- 如果磁盘 I/O 繁忙,写入较慢,那么整个 Redis 会被阻塞。
写回策略
为了解决这个问题,Redis 提供了三种写回策略:
-
always
:每次写操作都会同步写入 AOF 文件,这样可以保证数据不会丢失,但会降低性能; -
everysec
:每秒写入一次 AOF 文件,这样可以保证数据不会丢失,且性能较好; -
no
:不写入 AOF 文件,这样性能最好,但数据容易丢失。
点击查看源码
#define AOF_FSYNC_NO 0
#define AOF_FSYNC_ALWAYS 1
#define AOF_FSYNC_EVERYSEC 2
写回策略对于 AOF 的控制体现在 flushAppendOnlyFile
函数中:
void flushAppendOnlyFile(int force) {
/* ... */
// 在内存中修改 AOF 文件,使其内容为 AOF buffer 的内容
nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
/* ... */
// 只有在满足一定条件时才会同步进物理磁盘
try_fsync:
// 如果设置了 no-appendfsync-on-rewrite 且有子进程在后台进行 I/O 操作,则不进行 fsync
if (server.aof_no_fsync_on_rewrite && hasActiveChildProcess())
return;
if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
// 如果设置了 always,每次都进行 fsync
latencyStartMonitor(latency);
// 尝试 fsync
if (redis_fsync(server.aof_fd) == -1) {
/* ... */
exit(1);
}
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("aof-fsync-always",latency);
server.aof_last_incr_fsync_offset = server.aof_last_incr_size;
server.aof_last_fsync = server.mstime;
atomicSet(server.fsynced_reploff_pending, server.master_repl_offset);
} else if (server.aof_fsync == AOF_FSYNC_EVERYSEC &&
server.mstime - server.aof_last_fsync >= 1000) {
// 如果设置了 everysec,且距离上次 fsync 已经超过 1 秒
if (!sync_in_progress) {
// 如果没有正在进行的 sync,进行 fsync
aof_background_fsync(server.aof_fd);
server.aof_last_incr_fsync_offset = server.aof_last_incr_size;
}
server.aof_last_fsync = server.mstime;
}
}
这个函数每次会在内存中修改 AOF 文件的内容,但只有满足一定条件时才会同步进物理磁盘。
这里的 redis_fsync
函数在不同平台上使用了不同的系统调用:
#if defined(__linux__)
#define redis_fsync(fd) fdatasync(fd)
#elif defined(__APPLE__)
#define redis_fsync(fd) fcntl(fd, F_FULLFSYNC)
#else
#define redis_fsync(fd) fsync(fd)
#endif
这三种策略的配置如下:
appendfsync always
appendfsync everysec # 默认配置
appendfsync no
以上三种写回策略并没有彻底解决前文提到的两个问题:
- 程序依然是要先执行命令、再写入 AOF 文件、最后返回结果,并没有实现原子化;
- 依然可能产生阻塞,只是在设置为
everysec
时,不会不停阻塞。
所以,这些机制充其量只是在性能和数据一致性之间给了用户选择的权力,用户还是需要根据实际需求来选择。
重写机制
如果我们对同一个键值对反复修改,那么 AOF 文件会像 git 一样,记录下每次的修改历史。这样一来,AOF 文件会记录太多我们不需要的东西,导致文件过大。为了解决这个问题,Redis 提供了重写机制。
当满足一定条件时,Redis 会读取数据库中所有的键值对,然后将这些键值对只按照最终状态写入到新的 AOF 文件中。这样一来,新的 AOF 文件就不会包含多余的历史记录了。
AOF 重写有两种触发方式:
-
可以通过
BGREWRITEAOF
命令来手动触发:点击查看源码
void bgrewriteaofCommand(client *c) { if (server.child_type == CHILD_TYPE_AOF) { /* ... */ } else if (hasActiveChildProcess() || server.in_exec) { server.aof_rewrite_scheduled = 1; server.stat_aofrw_consecutive_failures = 0; /* ... */ } else if (rewriteAppendOnlyFileBackground() == C_OK) { /* ... */ } else { /* ... */ } }
-
也可以在事件循环中自动触发,其触发条件如下:
- AOF 处于打开状态;
- 没有活跃的子进程;
- AOF 文件大小超过了
aof_rewrite_min_size
(默认 64M); - AOF 文件大小增长超过了
aof_rewrite_perc
(默认 100%,即 AOF 文件大小翻倍); - 没有限制 AOF 重写的条件(如果上次重写失败,会依次限制 1min、2min、4min 直到 1h)。
点击查看源码
if (server.aof_state == AOF_ON && !hasActiveChildProcess() && server.aof_rewrite_perc && server.aof_current_size > server.aof_rewrite_min_size) { long long base = server.aof_rewrite_base_size ? server.aof_rewrite_base_size : 1; long long growth = (server.aof_current_size*100/base) - 100; if (growth >= server.aof_rewrite_perc && !aofRewriteLimited()) { /* ... */ rewriteAppendOnlyFileBackground(); } }
这两种方式都调用了 rewriteAppendOnlyFileBackground
函数来在进行重写。
AOF 重写需要消耗一定的时间和资源,因此 Redis 会在后台异步地进行重写操作。和 RDB 一样,AOF 重写会 fork 一个名为 redis-aof-rewrite
的子进程,然后在子进程中进行重写操作。
重写时,依然是:
- 将 AOF buffer 中的内容写入到结构体中;
- 将结构体中的内容写入到
temp-rewriteaof-bg-pid.aof
临时文件 - 然后再重命名
这样一来,即使重写失败,也不会影响原来的 AOF 文件。
如果在此期间有新的写操作,Redis 会利用 COW 机制来保证数据的一致性。当然,这些新写入的数据将来肯定也要放入 AOF 文件中,因此此时 Redis 会暂时将它们写入 AOF buffer。
点击查看源码
int rewriteAppendOnlyFileBackground(void) {
pid_t childpid;
// 如果有活跃的子进程在运行,返回错误
if (hasActiveChildProcess()) return C_ERR;
// 创建或打开 AOF 文件目录
if (dirCreateIfMissing(server.aof_dirname) == -1) {
/* ... */
server.aof_lastbgrewrite_status = C_ERR;
return C_ERR;
}
// 重置选中的数据库
server.aof_selected_db = -1;
// 将 buffer 写入磁盘
flushAppendOnlyFile(1);
// 打开新的增量 AOF 文件进行追加
if (openNewIncrAofForAppend() != C_OK) {
server.aof_lastbgrewrite_status = C_ERR;
return C_ERR;
}
if (server.aof_state == AOF_WAIT_REWRITE) {
// 如果 AOF 状态为等待重写,排空 AOF 的后台任务并设置同步的复制偏移量
bioDrainWorker(BIO_AOF_FSYNC);
atomicSet(server.fsynced_reploff_pending, server.master_repl_offset);
server.fsynced_reploff = 0;
}
// 统计重写次数
server.stat_aof_rewrites++;
// fork 一个子进程
if ((childpid = redisFork(CHILD_TYPE_AOF)) == 0) {
char tmpfile[256];
// 设置子进程名字
redisSetProcTitle("redis-aof-rewrite");
// 设置 CPU 亲和性
redisSetCpuAffinity(server.aof_rewrite_cpulist);
// 生成临时文件名
snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
// 重写 AOF 文件
if (rewriteAppendOnlyFile(tmpfile) == C_OK) {
/* ... */
sendChildCowInfo(CHILD_INFO_TYPE_AOF_COW_SIZE, "AOF rewrite");
exitFromChild(0);
} else {
exitFromChild(1);
}
} else {
// 如果 fork 失败,返回错误
if (childpid == -1) {
server.aof_lastbgrewrite_status = C_ERR;
/* ... */
return C_ERR;
}
/* ... */
server.aof_rewrite_scheduled = 0;
server.aof_rewrite_time_start = time(NULL);
return C_OK;
}
return C_OK;
}
这个函数首先将 AOF buffer 中的内容全部写入到当前的 AOF 文件中,然后创建了一个子进程,在子进程中调用了 rewriteAppendOnlyFile
函数:
int rewriteAppendOnlyFile(char *filename) {
rio aof;
FILE *fp = NULL;
char tmpfile[256];
// 生成临时文件名
snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
// 打开临时文件
fp = fopen(tmpfile,"w");
if (!fp) {
/* ... */
return C_ERR;
}
// 初始化 rio 结构体
rioInitWithFile(&aof,fp);
// 如果设置了增量 fsync,设置自动同步和回收缓存
if (server.aof_rewrite_incremental_fsync) {
rioSetAutoSync(&aof,REDIS_AUTOSYNC_BYTES);
rioSetReclaimCache(&aof,1);
}
// 开始保存
startSaving(RDBFLAGS_AOF_PREAMBLE);
if (server.aof_use_rdb_preamble) {
// 如果使用 RDB 头部,保存 RDB 头部
int error;
if (rdbSaveRio(SLAVE_REQ_NONE,&aof,&error,RDBFLAGS_AOF_PREAMBLE,NULL) == C_ERR) {
errno = error;
goto werr;
}
} else {
// 否则,重写 AOF 文件
if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr;
}
// 刷新缓冲区、同步文件和回收缓存
if (fflush(fp)) goto werr;
if (fsync(fileno(fp))) goto werr;
if (reclaimFilePageCache(fileno(fp), 0, 0) == -1) {
/* ... */
}
if (fclose(fp)) { fp = NULL; goto werr; }
fp = NULL;
// 重命名临时文件
if (rename(tmpfile,filename) == -1) {
/* ... */
unlink(tmpfile);
stopSaving(0);
return C_ERR;
}
stopSaving(1);
return C_OK;
werr:
/* ... */
if (fp) fclose(fp);
unlink(tmpfile);
stopSaving(0);
return C_ERR;
}
和之前的 RDB 一样,为了防止写入过程中出现错误,Redis 会先写入到一个临时文件中,然后再重命名为正式的 AOF 文件。
更具体的重写操作在 rewriteAppendOnlyFileRio
函数中:
int rewriteAppendOnlyFileRio(rio *aof) {
dictEntry *de;
int j;
long key_count = 0;
long long updated_time = 0;
kvstoreIterator *kvs_it = NULL;
// 记录时间戳
if (server.aof_timestamp_enabled) {
sds ts = genAofTimestampAnnotationIfNeeded(1);
if (rioWrite(aof,ts,sdslen(ts)) == 0) { sdsfree(ts); goto werr; }
sdsfree(ts);
}
// 重写函数
if (rewriteFunctions(aof) == 0) goto werr;
// 遍历所有数据库
for (j = 0; j < server.dbnum; j++) {
char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
redisDb *db = server.db + j;
if (kvstoreSize(db->keys) == 0) continue;
// 选择数据库
if (rioWrite(aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
if (rioWriteBulkLongLong(aof,j) == 0) goto werr;
kvs_it = kvstoreIteratorInit(db->keys);
// 遍历所有键值对
while((de = kvstoreIteratorNext(kvs_it)) != NULL) {
sds keystr;
robj key, *o;
long long expiretime;
size_t aof_bytes_before_key = aof->processed_bytes;
keystr = dictGetKey(de);
o = dictGetVal(de);
initStaticStringObject(key,keystr);
expiretime = getExpire(db,&key);
// 保存键值对
if (o->type == OBJ_STRING) {
char cmd[]="*3\r\n$3\r\nSET\r\n";
if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr;
if (rioWriteBulkObject(aof,&key) == 0) goto werr;
if (rioWriteBulkObject(aof,o) == 0) goto werr;
} else if (o->type == OBJ_LIST) {
if (rewriteListObject(aof,&key,o) == 0) goto werr;
} else if (o->type == OBJ_SET) {
if (rewriteSetObject(aof,&key,o) == 0) goto werr;
} else if (o->type == OBJ_ZSET) {
if (rewriteSortedSetObject(aof,&key,o) == 0) goto werr;
} else if (o->type == OBJ_HASH) {
if (rewriteHashObject(aof,&key,o) == 0) goto werr;
} else if (o->type == OBJ_STREAM) {
if (rewriteStreamObject(aof,&key,o) == 0) goto werr;
} else if (o->type == OBJ_MODULE) {
if (rewriteModuleObject(aof,&key,o,j) == 0) goto werr;
} else {
serverPanic("Unknown object type");
}
// 写完后将对象释放,减少 COW
size_t dump_size = aof->processed_bytes - aof_bytes_before_key;
if (server.in_fork_child) dismissObject(o, dump_size);
// 保存过期时间
if (expiretime != -1) {
char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n";
if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr;
if (rioWriteBulkObject(aof,&key) == 0) goto werr;
if (rioWriteBulkLongLong(aof,expiretime) == 0) goto werr;
}
// 每写入 1024 个键值对,更新子进程信息
if ((key_count++ & 1023) == 0) {
long long now = mstime();
if (now - updated_time >= 1000) {
sendChildInfo(CHILD_INFO_TYPE_CURRENT_INFO, key_count, "AOF rewrite");
updated_time = now;
}
}
// 测试用的延迟
if (server.rdb_key_save_delay)
debugDelay(server.rdb_key_save_delay);
}
kvstoreIteratorRelease(kvs_it);
}
return C_OK;
werr:
if (kvs_it) kvstoreIteratorRelease(kvs_it);
return C_ERR;
}
这个函数逻辑上很好理解,就是遍历所有数据库中的所有键值对,然后按照类型调用对应的函数,将它们写入到 AOF 文件中。
值得注意的是,为了减少 COW,Redis 每写入一个键值对后,都会立即释放对象。
混合持久化
- AOF 作为增量持久化,占用系统资源少,数据不易丢失,但是文件体积大,恢复速度慢。
- RDB 作为全量持久化,占用系统资源多,数据易丢失,但是文件体积小,恢复速度快。
那么,有没有一种持久化方式,既占用系统资源少,数据不易丢失,又文件体积小,恢复速度快呢?
答案是混合持久化。Redis 4.0 之后,提供了混合持久化的功能。它需要使用 aof-use-rdb-preamble yes
配置。
混合持久化时,Redis 会在 AOF 文件的开头写入一个 RDB 文件的内容,然后再将缓冲区中的内容以 AOF 的格式写入文件。
Comments