Ch3nyang's blog collections_bookmark

post

person

about

category

category

local_offer

tag

rss_feed

rss

深入 Redis 源码 | (4)
Redis 持久化

calendar_month 2025-01
archive 中间件
tag redis

There are 6 posts in series 深入 Redis 源码.

本文将会从源码层面解读 Redis 的各项机制。我们会从最基础的内容开始,尽量只看 Redis 最核心的部分,而剥离掉一些不太重要的内容,所有的源码都给出了 GitHub 的链接。

写作本文时,Redis CE 还在 7.4 版本;Redis 8.0 仍然在 Pre-Release。因此,本文主要基于 Redis 7.4,其余版本可能有所不同。本文使用的环境为 Debian 12。

持久化

Redis 的持久化有两种方式:RDB(Redis DataBase)和 AOF(Append Only File)。RDB 是快照的方式,AOF 是日志的方式。

RDB

RDB 是通过快照的方式来持久化数据的。Redis 会将内存中的数据保存到磁盘上,以便在重启时可以快速加载数据。

RDB 提供了两种保存方式:savebgsavesave 是阻塞的,bgsave 则会 fork 一个子进程来异步地进行持久化操作。

save 做的是全量快照,因此不可能每做一次修改都进行一次快照。

save 命令也可以通过配置文件来实现每隔一段时间自动保存快照。例如:

save 900 1
save 300 10
save 60 10000

这个配置表示,如果 900 秒内有至少 1 个 key 被修改,就会保存快照;如果 300 秒内有至少 10 个 key 被修改,就会保存快照;如果 60 秒内有至少 10000 个 key 被修改,就会保存快照。

save

我们先来看较简单的 save

在这个过程中,RDB 的保存操作为了防止数据损坏:

  • 先将数据保存到 rio 结构体中,写入的内容包括
    • RDB 版本号
    • 辅助信息
    • 模块辅助信息(前半部分)
    • 函数信息
    • 数据
      • 遍历并序列化写入
    • 模块辅助信息(后半部分)
    • 结束符
    • 校验和
  • 然后再将 rio 结构体写入临时文件 temp-pid.rdb
  • 然后再重命名为正式文件

临时文件的作用是,即使保存过程中出现了问题,也不会影响到原有的备份文件。而这里的 rio 相当于做了一个序列化的工作。

点击查看源码

save 的实现如下:

int rdbSave(int req, char *filename, rdbSaveInfo *rsi, int rdbflags) {
    /* ... */

    startSaving(rdbflags);
    // 生成临时文件名为 temp-进程号.rdb
    snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());

    // 保存数据到临时文件
    if (rdbSaveInternal(req,tmpfile,rsi,rdbflags) != C_OK) {
        /* ... */
    }
    
    // 将临时文件重命名为正式文件
    if (rename(tmpfile,filename) == -1) {
        /* ... */
    }
    // 同步目录,确保文件已经写入磁盘
    if (fsyncFileDir(filename) != 0) {
        /* ... */
    }

    /* ... */
    // 重置脏数据
    server.dirty = 0;
    // 更新最后保存时间
    server.lastsave = time(NULL);
    // 更新最后保存状态
    server.lastbgsave_status = C_OK;
    stopSaving(1);
    return C_OK;
}

具体的保存操作在 rdbSaveInternal 中:

static int rdbSaveInternal(int req, const char *filename, rdbSaveInfo *rsi, int rdbflags) {
    /* ... */

    // 打开文件进行写操作
    FILE *fp = fopen(filename,"w");
    
    /* ... */

    // 初始化 rio 结构体
    rioInitWithFile(&rdb,fp);

    // 如果使用了增量同步,设置自动同步和回收缓存
    if (server.rdb_save_incremental_fsync) {
        rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES);
        if (!(rdbflags & RDBFLAGS_KEEP_CACHE)) rioSetReclaimCache(&rdb,1);
    }

    // 保存数据到 rio 结构体
    if (rdbSaveRio(req,&rdb,&error,rdbflags,rsi) == C_ERR) {
        errno = error;
        err_op = "rdbSaveRio";
        goto werr;
    }

    // 刷新文件缓冲区
    if (fflush(fp)) { err_op = "fflush"; goto werr; }
    // 同步文件到磁盘
    if (fsync(fileno(fp))) { err_op = "fsync"; goto werr; }
    // 如果没有设置保留缓存,回收缓存
    if (!(rdbflags & RDBFLAGS_KEEP_CACHE) && reclaimFilePageCache(fileno(fp), 0, 0) == -1) {
        /* ... */
    }
    // 关闭文件
    if (fclose(fp)) { fp = NULL; err_op = "fclose"; goto werr; }

    return C_OK;
    
    /* ... */
}

具体写入的内容在 rdbSaveRio 将其分为了几个部分:

int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
    char magic[10];
    uint64_t cksum;
    long key_counter = 0;
    int j;

    // 如果启用了校验和,设置校验和函数
    if (server.rdb_checksum)
        rdb->update_cksum = rioGenericUpdateChecksum;
    
    // 写入 RDB 版本号
    snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
    if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;

    // 写入辅助信息
    if (rdbSaveInfoAuxFields(rdb,rdbflags,rsi) == -1) goto werr;
    // 写入模块辅助信息1
    if (!(req & SLAVE_REQ_RDB_EXCLUDE_DATA) && rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr;
    // 写入函数信息
    if (!(req & SLAVE_REQ_RDB_EXCLUDE_FUNCTIONS) && rdbSaveFunctions(rdb) == -1) goto werr;
    // 写入具体数据
    if (!(req & SLAVE_REQ_RDB_EXCLUDE_DATA)) {
        for (j = 0; j < server.dbnum; j++) {
            if (rdbSaveDb(rdb, j, rdbflags, &key_counter) == -1) goto werr;
        }
    }
    // 写入模块辅助信息2
    if (!(req & SLAVE_REQ_RDB_EXCLUDE_DATA) && rdbSaveModulesAux(rdb, REDISMODULE_AUX_AFTER_RDB) == -1) goto werr;
    // 写入结束符
    if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;
    // 写入校验和
    cksum = rdb->cksum;
    memrev64ifbe(&cksum);
    if (rioWrite(rdb,&cksum,8) == 0) goto werr;
    return C_OK;

werr:
    if (error) *error = errno;
    return C_ERR;
}

接下来,我们来重点关注写入具体数据的部分

ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter) {
    dictEntry *de;
    ssize_t written = 0;
    ssize_t res;
    kvstoreIterator *kvs_it = NULL;
    static long long info_updated_time = 0;
    // pname 用于记录当前是 RDB 还是 AOF
    char *pname = (rdbflags & RDBFLAGS_AOF_PREAMBLE) ? "AOF rewrite" :  "RDB";

    // 获取指定数据库
    redisDb *db = server.db + dbid;
    unsigned long long int db_size = kvstoreSize(db->keys);
    if (db_size == 0) return 0;

    // 写入选择数据库操作码和数据库 ID
    if ((res = rdbSaveType(rdb,RDB_OPCODE_SELECTDB)) < 0) goto werr;
    written += res;
    if ((res = rdbSaveLen(rdb, dbid)) < 0) goto werr;
    written += res;

    // 写入数据库大小和过期键大小
    unsigned long long expires_size = kvstoreSize(db->expires);
    if ((res = rdbSaveType(rdb,RDB_OPCODE_RESIZEDB)) < 0) goto werr;
    written += res;
    if ((res = rdbSaveLen(rdb,db_size)) < 0) goto werr;
    written += res;
    if ((res = rdbSaveLen(rdb,expires_size)) < 0) goto werr;
    written += res;

    // 迭代访问数据库字典中的所有键值对
    kvs_it = kvstoreIteratorInit(db->keys);
    int last_slot = -1;
    while ((de = kvstoreIteratorNext(kvs_it)) != NULL) {
        int curr_slot = kvstoreIteratorGetCurrentDictIndex(kvs_it);
        if (server.cluster_enabled && curr_slot != last_slot) {
            // 写入槽信息
            if ((res = rdbSaveType(rdb, RDB_OPCODE_SLOT_INFO)) < 0) goto werr;
            written += res;
            if ((res = rdbSaveLen(rdb, curr_slot)) < 0) goto werr;
            written += res;
            if ((res = rdbSaveLen(rdb, kvstoreDictSize(db->keys, curr_slot))) < 0) goto werr;
            written += res;
            if ((res = rdbSaveLen(rdb, kvstoreDictSize(db->expires, curr_slot))) < 0) goto werr;
            written += res;
            last_slot = curr_slot;
        }
        // 获取键和值
        sds keystr = dictGetKey(de);
        robj key, *o = dictGetVal(de);
        long long expire;
        size_t rdb_bytes_before_key = rdb->processed_bytes;

        initStaticStringObject(key,keystr);
        expire = getExpire(db,&key);
        // 写入键值对
        if ((res = rdbSaveKeyValuePair(rdb, &key, o, expire, dbid)) < 0) goto werr;
        written += res;

        // 计算总字节数
        size_t dump_size = rdb->processed_bytes - rdb_bytes_before_key;
        // 如果是在子进程中,释放对象
        if (server.in_fork_child) dismissObject(o, dump_size);

        // 更新子进程信息
        if (((*key_counter)++ & 1023) == 0) {
            long long now = mstime();
            if (now - info_updated_time >= 1000) {
                sendChildInfo(CHILD_INFO_TYPE_CURRENT_INFO, *key_counter, pname);
                info_updated_time = now;
            }
        }
    }
    kvstoreIteratorRelease(kvs_it);
    return written;

werr:
    if (kvs_it) kvstoreIteratorRelease(kvs_it);
    return -1;
}

可以看到,这部分不但写入了数据,还写入了和数据库及字典相关的信息,以确保能够正确重建。

我们进一步看 rdbSaveKeyValuePair

int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime, int dbid) {
    int savelru = server.maxmemory_policy & MAXMEMORY_FLAG_LRU;
    int savelfu = server.maxmemory_policy & MAXMEMORY_FLAG_LFU;

    // 写入过期时间
    if (expiretime != -1) {
        if (rdbSaveType(rdb,RDB_OPCODE_EXPIRETIME_MS) == -1) return -1;
        if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1;
    }

    // 写入 LRU 信息
    if (savelru) {
        uint64_t idletime = estimateObjectIdleTime(val);
        idletime /= 1000;
        if (rdbSaveType(rdb,RDB_OPCODE_IDLE) == -1) return -1;
        if (rdbSaveLen(rdb,idletime) == -1) return -1;
    }

    // 写入 LFU 信息
    if (savelfu) {
        uint8_t buf[1];
        buf[0] = LFUDecrAndReturn(val);
        if (rdbSaveType(rdb,RDB_OPCODE_FREQ) == -1) return -1;
        if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
    }

    // 写入对象类型
    if (rdbSaveObjectType(rdb,val) == -1) return -1;
    // 写入键
    if (rdbSaveStringObject(rdb,key) == -1) return -1;
    // 写入值
    if (rdbSaveObject(rdb,val,key,dbid) == -1) return -1;

    // 如果设置了保存延迟,进行延迟
    if (server.rdb_key_save_delay)
        debugDelay(server.rdb_key_save_delay);

    return 1;
}

上面的代码 rdb_key_save_delay 是用于测试的,可以设置一个延迟,用于模拟保存数据时的延迟。

我们继续关注最主要的问题:如何写入具体数据。上面的代码中,无论是 rdbSaveObjectTyperdbSaveStringObject 还是 rdbSaveObject,它们无非就是对结构化数据做了序列化,使其变为字符串。考虑到 Redis 目前支持的数据结构非常多,这些代码写得非常长,但逻辑很简单,就不再仔细讲解了。

它们最终都是调用了 rdbWriteRaw 函数。这个函数封装了 rioWrite 的函数,用于写入数据:

static inline size_t rioWrite(rio *r, const void *buf, size_t len) {
    if (r->flags & (RIO_FLAG_WRITE_ERROR | RIO_FLAG_ABORT)) return 0;

    // 循环写入数据,直到 len 为 0
    while (len) {
        // 计算本次写入的字节数
        size_t bytes_to_write = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
        // 如果设置了校验和更新函数,更新校验和
        if (r->update_cksum) r->update_cksum(r,buf,bytes_to_write);
        // 调用 rio 结构体的写入函数写入数据
        if (r->write(r,buf,bytes_to_write) == 0) {
            // 如果写入失败,设置写入错误标志并返回 0
            r->flags |= RIO_FLAG_WRITE_ERROR;
            return 0;
        }
        // 更新缓冲区指针和剩余字节数
        buf = (char*)buf + bytes_to_write;
        len -= bytes_to_write;
        // 更新已处理的字节数
        r->processed_bytes += bytes_to_write;
    }
    return 1;
}

这里的 r->write 是一个函数指针,指向了 rioFileWrite 函数。这个函数归根结底就是使用 fwrite 函数写入数据,不过加入了一些错误处理的逻辑。

到这里,我们就搞明白了 save 背后的运行逻辑。可以看到,它就是给整个 Redis 数据库做了一个序列化的操作,将内存中的数据保存到磁盘文件中。说实话,这个过程边边角角的处理挺多,不过逻辑还是很清晰的。

bgsave

其实刚刚在看 save 的时候,我们已经看到了很多跟 bgsave 相关的代码。bgsave 其实就是 Fork 一个子进程,然后在子进程中原封不动地调用我们最先看到的 rdbSave 函数。

点击查看源码

代码如下:

int rdbSaveBackground(int req, char *filename, rdbSaveInfo *rsi, int rdbflags) {
    pid_t childpid;
    
    // 如果已经有一个活跃的子进程在运行,返回错误
    if (hasActiveChildProcess()) return C_ERR;

    // 统计 RDB 保存次数
    server.stat_rdb_saves++;
    // 记录脏数据
    server.dirty_before_bgsave = server.dirty;
    // 记录最后一次尝试保存时间
    server.lastbgsave_try = time(NULL);

    // fork 一个子进程
    if ((childpid = redisFork(CHILD_TYPE_RDB)) == 0) {
        int retval;

        // 设置子进程的名字
        redisSetProcTitle("redis-rdb-bgsave");
        // 设置 CPU 亲和性
        redisSetCpuAffinity(server.bgsave_cpulist);

        // 调用 rdbSave 函数保存数据
        retval = rdbSave(req, filename,rsi,rdbflags);

        if (retval == C_OK) {
            // 如果保存成功,发送子进程信息
            sendChildCowInfo(CHILD_INFO_TYPE_RDB_COW_SIZE, "RDB");
        }

        // 退出子进程
        exitFromChild((retval == C_OK) ? 0 : 1);
    } else {
        // 如果 fork 失败,返回错误
        if (childpid == -1) {
            server.lastbgsave_status = C_ERR;
            /* ... */
            return C_ERR;
        }
        /* ... */
        server.rdb_save_time_start = time(NULL);
        server.rdb_child_type = RDB_CHILD_TYPE_DISK;
        return C_OK;
    }
    return C_OK;
}

实际上,你会发现这就是个简简单单的 Fork 操作。

不过这里存在一个问题:在 bgsave 时,如果原先的数据被修改了该怎么办?答案是操作系统提供的 Copy On Write(COW)。当父进程对共享内存修改时,会将物理内存复制一份,并在这个副本上进行操作。这样,子进程就不会受到父进程的影响。

自动保存

当然,RDB 不可能要求运维没事就去手动保存一下。它在事件循环中会自动保存

RDB 自动保存需要满足以下所有条件:

  • 上次保存以来的更改次数大于等于 sp->changes
  • 触发保存的时间间隔大于 sp->seconds
  • 如果上次保存失败,重试保存的时间间隔大于 CONFIG_BGSAVE_RETRY_DELAY(默认为 5)

这里的 sp->changessp->seconds 就是我们之前在配置文件中设置的内容。

如果满足了条件,就会调用 rdbSaveBackground 在后台偷偷保存。

点击查看源码
if (server.dirty >= sp->changes &&
    server.unixtime-server.lastsave > sp->seconds &&
    (server.unixtime-server.lastbgsave_try >
      CONFIG_BGSAVE_RETRY_DELAY ||
      server.lastbgsave_status == C_OK))
{
    /* ... */
    rdbSaveInfo rsi, *rsiptr;
    rsiptr = rdbPopulateSaveInfo(&rsi);
    rdbSaveBackground(SLAVE_REQ_NONE,server.rdb_filename,rsiptr,RDBFLAGS_NONE);
    break;
}

AOF

AOF 是通过日志的方式来持久化数据的。Redis 会将每次的写操作都记录到日志文件中,以便在重启时可以重新执行这些操作。

默认情况下,AOF 是关闭的。你可以通过配置文件(/etc/redis/redis.conf)来开启 AOF:

appendonly     yes
appendfilename "appendonly.aof"

AOF 文件的内容遵循一定的规则。例如,在执行 SET name ch3nyang 时,AOF 文件会记录:

*3
$3
SET
$4
name
$8
ch3nyang
  • *3 表示后面有 3 个参数;
  • $3 表示后面的参数长度为 3;
  • SET 表示第一个参数;
  • $4 表示第二个参数长度为 4;
  • name 表示第二个参数;
  • $8 表示第三个参数长度为 8;
  • ch3nyang 表示第三个参数。
点击查看源码

它通过 feedAppendOnlyFile 函数来实现:

void feedAppendOnlyFile(int dictid, robj **argv, int argc) {
    sds buf = sdsempty();

    serverAssert(dictid == -1 || (dictid >= 0 && dictid < server.dbnum));

    if (server.aof_timestamp_enabled) {
        // 生成AOF时间戳注释
        sds ts = genAofTimestampAnnotationIfNeeded(0);
        if (ts != NULL) {
            buf = sdscatsds(buf, ts);
            sdsfree(ts);
        }
    }

    if (dictid != -1 && dictid != server.aof_selected_db) {
        // 如果当前数据库ID与选择的数据库ID不同,生成SELECT命令
        char seldb[64];

        snprintf(seldb,sizeof(seldb),"%d",dictid);
        buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
            (unsigned long)strlen(seldb),seldb);
        server.aof_selected_db = dictid;
    }

    // 将命令写入 sds 字符串
    buf = catAppendOnlyGenericCommand(buf,argc,argv);

    if (server.aof_state == AOF_ON ||
        (server.aof_state == AOF_WAIT_REWRITE && server.child_type == CHILD_TYPE_AOF))
    {
        // 如果 AOF 处于打开状态,或者正在等待重写,将命令写入 AOF buffer
        server.aof_buf = sdscatlen(server.aof_buf, buf, sdslen(buf));
    }

    sdsfree(buf);
}

其中最核心功能的代码实现非常简单:

sds catAppendOnlyGenericCommand(sds dst, int argc, robj **argv) {
    char buf[32];
    int len, j;
    robj *o;

    // 参数数量
    buf[0] = '*';
    len = 1+ll2string(buf+1,sizeof(buf)-1,argc);
    buf[len++] = '\r';
    buf[len++] = '\n';
    dst = sdscatlen(dst,buf,len);

    // 遍历所有参数
    for (j = 0; j < argc; j++) {
        o = getDecodedObject(argv[j]);
        // 参数长度
        buf[0] = '$';
        len = 1+ll2string(buf+1,sizeof(buf)-1,sdslen(o->ptr));
        buf[len++] = '\r';
        buf[len++] = '\n';
        dst = sdscatlen(dst,buf,len);
        // 参数值
        dst = sdscatlen(dst,o->ptr,sdslen(o->ptr));
        dst = sdscatlen(dst,"\r\n",2);
        decrRefCount(o);
    }
    return dst;
}

在 AOF 过程中,Redis 会首先执行命令,然后将命令写入 AOF 文件,最后将命令执行的结果返回给客户端。这两件事情同步执行,且不是原子操作。这导致了两个问题:

  • 如果命令执行成功,但写入 AOF 文件失败,那么重启后数据会丢失;
  • 如果磁盘 I/O 繁忙,写入较慢,那么整个 Redis 会被阻塞。

写回策略

为了解决这个问题,Redis 提供了三种写回策略

  • always:每次写操作都会同步写入 AOF 文件,这样可以保证数据不会丢失,但会降低性能;
  • everysec:每秒写入一次 AOF 文件,这样可以保证数据不会丢失,且性能较好;
  • no:不写入 AOF 文件,这样性能最好,但数据容易丢失。
点击查看源码
#define AOF_FSYNC_NO 0
#define AOF_FSYNC_ALWAYS 1
#define AOF_FSYNC_EVERYSEC 2

写回策略对于 AOF 的控制体现在 flushAppendOnlyFile 函数中:

void flushAppendOnlyFile(int force) {
    /* ... */

    // 在内存中修改 AOF 文件,使其内容为 AOF buffer 的内容
    nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));

    /* ... */

    // 只有在满足一定条件时才会同步进物理磁盘
try_fsync:
    // 如果设置了 no-appendfsync-on-rewrite 且有子进程在后台进行 I/O 操作,则不进行 fsync
    if (server.aof_no_fsync_on_rewrite && hasActiveChildProcess())
        return;

    if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
        // 如果设置了 always,每次都进行 fsync
        
        latencyStartMonitor(latency);
        // 尝试 fsync
        if (redis_fsync(server.aof_fd) == -1) {
            /* ... */
            exit(1);
        }
        latencyEndMonitor(latency);
        latencyAddSampleIfNeeded("aof-fsync-always",latency);
        server.aof_last_incr_fsync_offset = server.aof_last_incr_size;
        server.aof_last_fsync = server.mstime;
        atomicSet(server.fsynced_reploff_pending, server.master_repl_offset);
    } else if (server.aof_fsync == AOF_FSYNC_EVERYSEC &&
               server.mstime - server.aof_last_fsync >= 1000) {
        // 如果设置了 everysec,且距离上次 fsync 已经超过 1 秒

        if (!sync_in_progress) {
            // 如果没有正在进行的 sync,进行 fsync
            aof_background_fsync(server.aof_fd);
            server.aof_last_incr_fsync_offset = server.aof_last_incr_size;
        }
        server.aof_last_fsync = server.mstime;
    }
}

这个函数每次会在内存中修改 AOF 文件的内容,但只有满足一定条件时才会同步进物理磁盘。

这里的 redis_fsync 函数在不同平台上使用了不同的系统调用:

#if defined(__linux__)
#define redis_fsync(fd) fdatasync(fd)
#elif defined(__APPLE__)
#define redis_fsync(fd) fcntl(fd, F_FULLFSYNC)
#else
#define redis_fsync(fd) fsync(fd)
#endif

这三种策略的配置如下:

appendfsync always
appendfsync everysec # 默认配置
appendfsync no

以上三种写回策略并没有彻底解决前文提到的两个问题:

  • 程序依然是要先执行命令、再写入 AOF 文件、最后返回结果,并没有实现原子化;
  • 依然可能产生阻塞,只是在设置为 everysec 时,不会不停阻塞。

所以,这些机制充其量只是在性能和数据一致性之间给了用户选择的权力,用户还是需要根据实际需求来选择。

重写机制

如果我们对同一个键值对反复修改,那么 AOF 文件会像 git 一样,记录下每次的修改历史。这样一来,AOF 文件会记录太多我们不需要的东西,导致文件过大。为了解决这个问题,Redis 提供了重写机制。

当满足一定条件时,Redis 会读取数据库中所有的键值对,然后将这些键值对只按照最终状态写入到新的 AOF 文件中。这样一来,新的 AOF 文件就不会包含多余的历史记录了。

AOF 重写有两种触发方式:

  • 可以通过 BGREWRITEAOF 命令来手动触发:

    点击查看源码
    void bgrewriteaofCommand(client *c) {
        if (server.child_type == CHILD_TYPE_AOF) {
            /* ... */
        } else if (hasActiveChildProcess() || server.in_exec) {
            server.aof_rewrite_scheduled = 1;
            server.stat_aofrw_consecutive_failures = 0;
            /* ... */
        } else if (rewriteAppendOnlyFileBackground() == C_OK) {
            /* ... */
        } else {
            /* ... */
        }
    }
    
  • 也可以在事件循环中自动触发,其触发条件如下:

    • AOF 处于打开状态;
    • 没有活跃的子进程;
    • AOF 文件大小超过了 aof_rewrite_min_size(默认 64M);
    • AOF 文件大小增长超过了 aof_rewrite_perc(默认 100%,即 AOF 文件大小翻倍);
    • 没有限制 AOF 重写的条件(如果上次重写失败,会依次限制 1min、2min、4min 直到 1h)。
    点击查看源码
    if (server.aof_state == AOF_ON &&
        !hasActiveChildProcess() &&
        server.aof_rewrite_perc &&
        server.aof_current_size > server.aof_rewrite_min_size)
    {
        long long base = server.aof_rewrite_base_size ?
            server.aof_rewrite_base_size : 1;
        long long growth = (server.aof_current_size*100/base) - 100;
        if (growth >= server.aof_rewrite_perc && !aofRewriteLimited()) {
            /* ... */
            rewriteAppendOnlyFileBackground();
        }
    }
    

这两种方式都调用了 rewriteAppendOnlyFileBackground 函数来在进行重写。

AOF 重写需要消耗一定的时间和资源,因此 Redis 会在后台异步地进行重写操作。和 RDB 一样,AOF 重写会 fork 一个名为 redis-aof-rewrite 的子进程,然后在子进程中进行重写操作。

重写时,依然是:

  • 将 AOF buffer 中的内容写入到结构体中;
  • 将结构体中的内容写入到 temp-rewriteaof-bg-pid.aof 临时文件
  • 然后再重命名

这样一来,即使重写失败,也不会影响原来的 AOF 文件。

如果在此期间有新的写操作,Redis 会利用 COW 机制来保证数据的一致性。当然,这些新写入的数据将来肯定也要放入 AOF 文件中,因此此时 Redis 会暂时将它们写入 AOF buffer。

点击查看源码
int rewriteAppendOnlyFileBackground(void) {
    pid_t childpid;

    // 如果有活跃的子进程在运行,返回错误
    if (hasActiveChildProcess()) return C_ERR;
    
    // 创建或打开 AOF 文件目录
    if (dirCreateIfMissing(server.aof_dirname) == -1) {
        /* ... */
        server.aof_lastbgrewrite_status = C_ERR;
        return C_ERR;
    }

    // 重置选中的数据库
    server.aof_selected_db = -1;
    // 将 buffer 写入磁盘
    flushAppendOnlyFile(1);
    // 打开新的增量 AOF 文件进行追加
    if (openNewIncrAofForAppend() != C_OK) {
        server.aof_lastbgrewrite_status = C_ERR;
        return C_ERR;
    }

    if (server.aof_state == AOF_WAIT_REWRITE) {
        // 如果 AOF 状态为等待重写,排空 AOF 的后台任务并设置同步的复制偏移量
        bioDrainWorker(BIO_AOF_FSYNC);
        atomicSet(server.fsynced_reploff_pending, server.master_repl_offset);
        server.fsynced_reploff = 0;
    }

    // 统计重写次数
    server.stat_aof_rewrites++;

    // fork 一个子进程
    if ((childpid = redisFork(CHILD_TYPE_AOF)) == 0) {
        char tmpfile[256];

        // 设置子进程名字
        redisSetProcTitle("redis-aof-rewrite");
        // 设置 CPU 亲和性
        redisSetCpuAffinity(server.aof_rewrite_cpulist);
        // 生成临时文件名
        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
        // 重写 AOF 文件
        if (rewriteAppendOnlyFile(tmpfile) == C_OK) {
            /* ... */
            sendChildCowInfo(CHILD_INFO_TYPE_AOF_COW_SIZE, "AOF rewrite");
            exitFromChild(0);
        } else {
            exitFromChild(1);
        }
    } else {
        // 如果 fork 失败,返回错误
        if (childpid == -1) {
            server.aof_lastbgrewrite_status = C_ERR;
            /* ... */
            return C_ERR;
        }
        /* ... */
        server.aof_rewrite_scheduled = 0;
        server.aof_rewrite_time_start = time(NULL);
        return C_OK;
    }
    return C_OK;
}

这个函数首先将 AOF buffer 中的内容全部写入到当前的 AOF 文件中,然后创建了一个子进程,在子进程中调用了 rewriteAppendOnlyFile 函数

int rewriteAppendOnlyFile(char *filename) {
    rio aof;
    FILE *fp = NULL;
    char tmpfile[256];
    
    // 生成临时文件名
    snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
    // 打开临时文件
    fp = fopen(tmpfile,"w");
    if (!fp) {
        /* ... */
        return C_ERR;
    }
    
    // 初始化 rio 结构体
    rioInitWithFile(&aof,fp);

    // 如果设置了增量 fsync,设置自动同步和回收缓存
    if (server.aof_rewrite_incremental_fsync) {
        rioSetAutoSync(&aof,REDIS_AUTOSYNC_BYTES);
        rioSetReclaimCache(&aof,1);
    }

    // 开始保存
    startSaving(RDBFLAGS_AOF_PREAMBLE);

    if (server.aof_use_rdb_preamble) {
        // 如果使用 RDB 头部,保存 RDB 头部
        int error;
        if (rdbSaveRio(SLAVE_REQ_NONE,&aof,&error,RDBFLAGS_AOF_PREAMBLE,NULL) == C_ERR) {
            errno = error;
            goto werr;
        }
    } else {
        // 否则,重写 AOF 文件
        if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr;
    }

    // 刷新缓冲区、同步文件和回收缓存
    if (fflush(fp)) goto werr;
    if (fsync(fileno(fp))) goto werr;
    if (reclaimFilePageCache(fileno(fp), 0, 0) == -1) {
        /* ... */
    }
    if (fclose(fp)) { fp = NULL; goto werr; }
    fp = NULL;

    // 重命名临时文件
    if (rename(tmpfile,filename) == -1) {
        /* ... */
        unlink(tmpfile);
        stopSaving(0);
        return C_ERR;
    }
    stopSaving(1);

    return C_OK;

werr:
    /* ... */
    if (fp) fclose(fp);
    unlink(tmpfile);
    stopSaving(0);
    return C_ERR;
}

和之前的 RDB 一样,为了防止写入过程中出现错误,Redis 会先写入到一个临时文件中,然后再重命名为正式的 AOF 文件。

更具体的重写操作在 rewriteAppendOnlyFileRio 函数中:

int rewriteAppendOnlyFileRio(rio *aof) {
    dictEntry *de;
    int j;
    long key_count = 0;
    long long updated_time = 0;
    kvstoreIterator *kvs_it = NULL;

    // 记录时间戳
    if (server.aof_timestamp_enabled) {
        sds ts = genAofTimestampAnnotationIfNeeded(1);
        if (rioWrite(aof,ts,sdslen(ts)) == 0) { sdsfree(ts); goto werr; }
        sdsfree(ts);
    }

    // 重写函数
    if (rewriteFunctions(aof) == 0) goto werr;
    
    // 遍历所有数据库
    for (j = 0; j < server.dbnum; j++) {
        char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
        redisDb *db = server.db + j;
        if (kvstoreSize(db->keys) == 0) continue;

        // 选择数据库
        if (rioWrite(aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
        if (rioWriteBulkLongLong(aof,j) == 0) goto werr;

        kvs_it = kvstoreIteratorInit(db->keys);
        // 遍历所有键值对
        while((de = kvstoreIteratorNext(kvs_it)) != NULL) {
            sds keystr;
            robj key, *o;
            long long expiretime;
            size_t aof_bytes_before_key = aof->processed_bytes;

            keystr = dictGetKey(de);
            o = dictGetVal(de);
            initStaticStringObject(key,keystr);

            expiretime = getExpire(db,&key);

            // 保存键值对
            if (o->type == OBJ_STRING) {
                char cmd[]="*3\r\n$3\r\nSET\r\n";
                if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr;
                if (rioWriteBulkObject(aof,&key) == 0) goto werr;
                if (rioWriteBulkObject(aof,o) == 0) goto werr;
            } else if (o->type == OBJ_LIST) {
                if (rewriteListObject(aof,&key,o) == 0) goto werr;
            } else if (o->type == OBJ_SET) {
                if (rewriteSetObject(aof,&key,o) == 0) goto werr;
            } else if (o->type == OBJ_ZSET) {
                if (rewriteSortedSetObject(aof,&key,o) == 0) goto werr;
            } else if (o->type == OBJ_HASH) {
                if (rewriteHashObject(aof,&key,o) == 0) goto werr;
            } else if (o->type == OBJ_STREAM) {
                if (rewriteStreamObject(aof,&key,o) == 0) goto werr;
            } else if (o->type == OBJ_MODULE) {
                if (rewriteModuleObject(aof,&key,o,j) == 0) goto werr;
            } else {
                serverPanic("Unknown object type");
            }

            // 写完后将对象释放,减少 COW
            size_t dump_size = aof->processed_bytes - aof_bytes_before_key;
            if (server.in_fork_child) dismissObject(o, dump_size);

            // 保存过期时间
            if (expiretime != -1) {
                char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n";
                if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr;
                if (rioWriteBulkObject(aof,&key) == 0) goto werr;
                if (rioWriteBulkLongLong(aof,expiretime) == 0) goto werr;
            }

            // 每写入 1024 个键值对,更新子进程信息
            if ((key_count++ & 1023) == 0) {
                long long now = mstime();
                if (now - updated_time >= 1000) {
                    sendChildInfo(CHILD_INFO_TYPE_CURRENT_INFO, key_count, "AOF rewrite");
                    updated_time = now;
                }
            }

            // 测试用的延迟
            if (server.rdb_key_save_delay)
                debugDelay(server.rdb_key_save_delay);
        }
        kvstoreIteratorRelease(kvs_it);
    }
    return C_OK;

werr:
    if (kvs_it) kvstoreIteratorRelease(kvs_it);
    return C_ERR;
}

这个函数逻辑上很好理解,就是遍历所有数据库中的所有键值对,然后按照类型调用对应的函数,将它们写入到 AOF 文件中。

值得注意的是,为了减少 COW,Redis 每写入一个键值对后,都会立即释放对象。

混合持久化

  • AOF 作为增量持久化,占用系统资源少,数据不易丢失,但是文件体积大,恢复速度慢。
  • RDB 作为全量持久化,占用系统资源多,数据易丢失,但是文件体积小,恢复速度快。

那么,有没有一种持久化方式,既占用系统资源少,数据不易丢失,又文件体积小,恢复速度快呢?

答案是混合持久化。Redis 4.0 之后,提供了混合持久化的功能。它需要使用 aof-use-rdb-preamble yes 配置。

混合持久化时,Redis 会在 AOF 文件的开头写入一个 RDB 文件的内容,然后再将缓冲区中的内容以 AOF 的格式写入文件。

Comments

Share This Post