Ch3nyang's blog collections_bookmark

post

person

about

category

category

local_offer

tag

rss_feed

rss

深入 Redis 源码 | (3)
Redis 数据库

calendar_month 2025-01
archive 中间件
tag redis tag db

There are 6 posts in series 深入 Redis 源码.

本文将会从源码层面解读 Redis 的各项机制。我们会从最基础的内容开始,尽量只看 Redis 最核心的部分,而剥离掉一些不太重要的内容,所有的源码都给出了 GitHub 的链接。

写作本文时,Redis CE 还在 7.4 版本;Redis 8.0 仍然在 Pre-Release。因此,本文主要基于 Redis 7.4,其余版本可能有所不同。本文使用的环境为 Debian 12。

KV 数据库

以上讲了很多种内部的数据结构,那么 Redis 是如何将这些数据结构和对外暴露的对象接口联系起来的呢?

数据库结构

整个 Redis 服务器最顶层的数据结构是 redisServer,其中包含了大量的成员变量,包括服务器配置信息、数据库信息、客户端信息等等。其中,最重要的莫过于 redisDb *db 了。

一个 Redis 服务器可以有多个数据库,每个数据库都是一个 redisDb 结构体。redisDb定义为:

typedef struct redisDb {
    // 数据库中的所有键及其对应的值
    kvstore *keys;
    // 设置了超时时间的键及其对应的超时时间
    kvstore *expires;
    // 哈希表中每个哈希的单个TTL(下一个要过期的字段)
    ebuckets hexpires;
    // 有客户端在阻塞等待数据的键(用于BLPOP命令)
    dict *blocking_keys;
    // 有客户端在等待数据且如果键被删除则应解除阻塞的键(用于XREADEDGROUP命令)
    dict *blocking_keys_unblock_on_nokey;
    // 已经收到PUSH命令的阻塞键,即将通知客户端
    dict *ready_keys;
    // 在 MULTI/EXEC 事务中被 WATCH 命令监视的键。
    dict *watched_keys;
    // 用于区分不同的数据库实例
    int id;
    // 平均 TTL,仅用于统计
    long long avg_ttl;
    // 活动过期周期的游标,用于遍历和处理过期键,以实现过期键的定期清理机制
    unsigned long expires_cursor;
    // 需要逐个尝试碎片整理的键名列表
    list *defrag_later;
} redisDb;

id

id 是数据库的编号。[默认情况]下,Redis 有 16 个数据库,编号从 0 到 15。可以通过 SELECT 命令来切换数据库

$ SELECT 0
OK
点击查看源码
int selectDb(client *c, int id) {
    if (id < 0 || id >= server.dbnum)
        return C_ERR;
    c->db = &server.db[id];
    return C_OK;
}

keys

其中,键值对数据库使用了一个自定义的数据结构 kvstore。其中使用了几个 dict 来存储数据:

struct _kvstore {
    int flags;
    dictType dtype;
    // 字典数组,每个字典都是一个哈希表
    dict **dicts;
    // 字典数组的数量
    long long num_dicts;
    // 字典数组的数量的二进制位数
    long long num_dicts_bits;
    // 正在进行 rehash 的字典列表
    list *rehashing;
    // 用于逐步调整字典大小的游标(仅在num_dicts > 1时使用)
    int resize_cursor;
    // 已分配的字典数量
    int allocated_dicts;
    // 非空字典的数量
    int non_empty_dicts;
    // kvstore 中总的键数量
    unsigned long long key_count;
    // kvstore 中所有字典的总桶数量
    unsigned long long bucket_count;
    // 描述了累积键频率的二进制索引树
    unsigned long long *dict_size_index;
    // 所有字典的查找表的开销
    size_t overhead_hashtable_lut;
    // 正在 rehash 的字典的开销
    size_t overhead_hashtable_rehashing;
    void *metadata[];
};

Redis 的这个 KV 数据库设计还是很有意思的,和内部的 hashtable 共用了同一个数据结构,当然也共享了渐进式扩容的功能。

robj

这个哈希表的键依然是字符串,值是一个 robj 结构体。robj 是一个结构体。这个结构体包含了编码方式和一个指针,指针指向实际的 Redis 对象。这样一来,Redis 就可以存储不同类型的数据了。

struct redisObject {
    // 类型
    unsigned type:4;
    // 编码
    unsigned encoding:4;
    // LRU/LFU 时间戳和计数
    unsigned lru:LRU_BITS; // 24
    // 引用计数
    int refcount;
    // 指向实际数据的指针
    void *ptr;
};

这个 robj 结构体使用的引用计数,这样可以有效地避免内存泄漏。当引用计数为 0 时,robj 就会被释放。引用计数主要针对的是前文提到的 shared 对象。

有很多命令可以从全局观察 Redis 数据库的状态,比如:

  • INFO [section]:查看服务器的信息;
  • KEYS pattern:阻塞地匹配键名。例如 KEYS * 可以列出所有的键名;
  • SCAN cursor [MATCH pattern] [COUNT count] [TYPE type]:异步地迭代数据库中的键。例如 SCAN 0 可以列出所有的键名;
  • FLUSHDB [ASYNC]:清空当前数据库中的所有键;
  • RANDOMKEY:随机返回一个键名;
  • DBSIZE:返回当前数据库中的键数量;
  • EXISTS key [key ...]:判断键是否存在;
  • RENAME key newkey:重命名键。

我们以 kvstoreDictFind 为例来看看 keys 的使用:

  • 首先根据字典编号找到对应的字典
  • 然后在字典中查找键
    • 根据哈希找到对应的哈希表
    • 遍历哈希表,找到对应的键
点击查看源码
dictEntry *kvstoreDictFind(kvstore *kvs, int didx, void *key) {
    // 根据字典编号找到对应字典
    dict *d = kvstoreGetDict(kvs, didx);
    if (!d)
        return NULL;
    // 在字典中查找键
    return dictFind(d, key);
}

其中,dictFind 函数为:

static dictEntry *dictFind(dict *ht, const void *key) {
    dictEntry *he;
    unsigned int h;

    if (ht->size == 0) return NULL;
    // 根据哈希找到对应的哈希表
    h = dictHashKey(ht, key) & ht->sizemask;
    he = ht->table[h];
    while(he) {
        // 遍历哈希表,找到对应的键
        if (dictCompareHashKeys(ht, key, he->key))
            return he;
        he = he->next;
    }
    return NULL;
}

内存淘汰

Redis 在内存中运行,而内存毕竟不像硬盘那样大。当 Redis 的内存占用过高时,就需要释放一些内存。Redis 采用了 LRU(Least Recently Used)和 LFU(Least Frequently Used)两种算法来释放内存。

在配置文件中,可以通过 maxmemory 来设置 Redis 的最大内存占用,通过 maxmemory-policy 来设置内存占用超过最大内存时的策略。maxmemory-policy可以是:

  • noeviction:不清理内存,但是会拒绝写入(默认);
  • volatile-lru:清理设置了过期时间的键,使用 LRU 算法;
  • volatile-lfu:清理设置了过期时间的键,使用 LFU 算法;
  • volatile-random:清理设置了过期时间的键,随机清理;
  • volatile-ttl:清理设置了过期时间的键,根据 TTL 清理;
  • allkeys-lru:清理所有键,使用 LRU 算法;
  • allkeys-lfu:清理所有键,使用 LFU 算法;
  • allkeys-random:清理所有键,随机清理。
点击查看源码
#define MAXMEMORY_VOLATILE_LRU ((0<<8)|MAXMEMORY_FLAG_LRU)
#define MAXMEMORY_VOLATILE_LFU ((1<<8)|MAXMEMORY_FLAG_LFU)
#define MAXMEMORY_VOLATILE_TTL (2<<8)
#define MAXMEMORY_VOLATILE_RANDOM (3<<8)
#define MAXMEMORY_ALLKEYS_LRU ((4<<8)|MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_ALLKEYS)
#define MAXMEMORY_ALLKEYS_LFU ((5<<8)|MAXMEMORY_FLAG_LFU|MAXMEMORY_FLAG_ALLKEYS)
#define MAXMEMORY_ALLKEYS_RANDOM ((6<<8)|MAXMEMORY_FLAG_ALLKEYS)
#define MAXMEMORY_NO_EVICTION (7<<8)

要查看当前使用的策略,可以使用 CONFIG GET maxmemory-policy 命令:

$ CONFIG GET maxmemory-policy
1) "maxmemory-policy"
2) "noeviction"

LRU

LRU 算法是一种最近最少使用算法。Redis 使用 LRU 算法来释放内存。Redis 会记录每个键的最后一次访问时间,当内存占用过高时,Redis 会释放最近最少使用的键。

通常,LRU 会使用哈希表 + 双向链表来实现:

  • 哈希表用于快速查找键值对;
  • 双向链表用于记录键值对的访问顺序,每当访问一个键值对时,就将其移动到链表的头部。

然而,Redis 并没有使用这种方式。它使用了一种近似的 LRU 算法,牺牲了一部分准确性的同时,提高了性能。这种方法的好处是只需要占据常量大小的内存。

正如前文在 robj 处看到的那样,robj 中有一个 lru 字段,用于记录最后一次访问时间。Redis 会定期地更新这个字段,以实现 LRU 算法。我们再次看一下 robj 的定义:

struct redisObject {
    // 类型
    unsigned type:4;
    // 编码
    unsigned encoding:4;
    // LRU/LFU 时间戳和计数
    unsigned lru:LRU_BITS; // 24
    // 引用计数
    int refcount;
    // 指向实际数据的指针
    void *ptr;
};

对于 LRU,lru 字段全部用来存储时间,单位为秒。由于 24 位最多只能存储 194 天,因此在数据存储了 194 天后,所有的键的 lru 都一模一样,导致淘汰策略失效。

淘汰数据的方式如下:

  • 建立一个大小为 16 的淘汰池,池中的键按照新老顺序排序
  • 每当有一个键过期时,采样 5 个键(该值由 maxmemory-samples 参数决定),保持新老顺序插入淘汰池
  • 淘汰池已满,则扔掉最新的
点击查看源码

LRU 算法中,加入淘汰池的实现如下:

/**
 * 根据采样的键,更新淘汰池
 * @param db 数据库
 * @param samplekvs 采样的键值对
 * @param pool 淘汰池
 */
int evictionPoolPopulate(redisDb *db, kvstore *samplekvs, struct evictionPoolEntry *pool) {
    int j, k, count;
    dictEntry *samples[server.maxmemory_samples];

    // 从给定的数据库中采样 maxmemory_samples 个键值对
    int slot = kvstoreGetFairRandomDictIndex(samplekvs);
    count = kvstoreDictGetSomeKeys(samplekvs,slot,samples,server.maxmemory_samples);

    for (j = 0; j < count; j++) {
        unsigned long long idle;
        sds key;
        robj *o;
        dictEntry *de;

        de = samples[j];
        key = dictGetKey(de);

        // 如果不是 volatile-ttl,还需要查找键
        if (server.maxmemory_policy != MAXMEMORY_VOLATILE_TTL) {
            if (samplekvs != db->keys)
                de = kvstoreDictFind(db->keys, slot, key);
            o = dictGetVal(de);
        }

        if (server.maxmemory_policy & MAXMEMORY_FLAG_LRU) {
            idle = estimateObjectIdleTime(o);
        } else if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
            idle = 255-LFUDecrAndReturn(o);
        } else if (server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL) {
            idle = ULLONG_MAX - dictGetSignedIntegerVal(de);
        } else {
            /* ... */
        }

        // 首先,找到第一个空的位置或者第一个比当前键更老的键
        k = 0;
        while (k < EVPOOL_SIZE &&
               pool[k].key &&
               pool[k].idle < idle) k++;
        // 然后插入排序,如果池中已满,则删除掉最新的键
        if (k == 0 && pool[EVPOOL_SIZE-1].key != NULL) {
            continue;
        } else if (k < EVPOOL_SIZE && pool[k].key == NULL) {
        } else {
            if (pool[EVPOOL_SIZE-1].key == NULL) {
                sds cached = pool[EVPOOL_SIZE-1].cached;
                memmove(pool+k+1,pool+k,
                    sizeof(pool[0])*(EVPOOL_SIZE-k-1));
                pool[k].cached = cached;
            } else {
                k--;
                sds cached = pool[0].cached;
                if (pool[0].key != pool[0].cached) sdsfree(pool[0].key);
                memmove(pool,pool+1,sizeof(pool[0])*k);
                pool[k].cached = cached;
            }
        }

        int klen = sdslen(key);
        if (klen > EVPOOL_CACHED_SDS_SIZE) {
            pool[k].key = sdsdup(key);
        } else {
            memcpy(pool[k].cached,key,klen+1);
            sdssetlen(pool[k].cached,klen);
            pool[k].key = pool[k].cached;
        }
        pool[k].idle = idle;
        pool[k].dbid = db->id;
        pool[k].slot = slot;
    }

    return count;
}

淘汰池中的键即使被删除了也不会立即释放,而是等到淘汰池满了之后再释放。因此,在淘汰池中内容的时候,需要检查键是否仍然存在。

点击查看源码

执行淘汰操作的实现如下:

int performEvictions(void) {
    if (!isSafeToPerformEvictions()) return EVICT_OK;

    int keys_freed = 0; // 已经释放的键的数量
    size_t mem_reported, mem_tofree; // 已经释放的内存和需要释放的内存
    long long mem_freed; // 已经释放的内存(可能为负)
    mstime_t latency, eviction_latency; // 用于记录延迟
    long long delta; // 内存变化量
    int slaves = listLength(server.slaves); // 从节点数量
    int result = EVICT_FAIL; // 淘汰结果

    // 获取内存状态,如果内存使用正常直接返回成功 
    if (getMaxmemoryState(&mem_reported,NULL,&mem_tofree,NULL) == C_OK) {
        result = EVICT_OK;
        goto update_metrics;
    }

    // 如果策略是不淘汰,则直接返回失败
    if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION) {
        result = EVICT_FAIL;  /* We need to free memory, but policy forbids. */
        goto update_metrics;
    }

    unsigned long eviction_time_limit_us = evictionTimeLimitUs(); // 淘汰操作的时间限制

    mem_freed = 0;

    latencyStartMonitor(latency);

    monotime evictionTimer;
    elapsedStart(&evictionTimer);

    serverAssert(server.also_propagate.numops == 0);

    // 一直释放内存直到达到目标
    while (mem_freed < (long long)mem_tofree) {
        int j, k, i;
        static unsigned int next_db = 0;
        sds bestkey = NULL; // 选中要删除的键
        int bestdbid;  // 选中键所在的数据库ID
        redisDb *db;  // 当前操作的数据库
        dictEntry *de; // 字典项

        // 如果使用LRU、LFU或者TTL策略
        if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) ||
            server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL)
        {
            struct evictionPoolEntry *pool = EvictionPoolLRU;

            // 尝试从淘汰池中找出最佳删除键
            while (bestkey == NULL) {
                unsigned long total_keys = 0;

                // 遍历所有数据库填充淘汰池
                for (i = 0; i < server.dbnum; i++) {
                    db = server.db+i;
                    kvstore *kvs;

                    // 根据策略选择要处理的键集合
                    if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
                        kvs = db->keys; // 所有键
                    } else {
                        kvs = db->expires; // 过期键
                    }

                    unsigned long sampled_keys = 0;
                    unsigned long current_db_keys = kvstoreSize(kvs);
                    if (current_db_keys == 0) continue;

                    total_keys += current_db_keys;
                    int l = kvstoreNumNonEmptyDicts(kvs);
                    
                    // 采样填充淘汰池
                    while (l--) {
                        sampled_keys += evictionPoolPopulate(db, kvs, pool);
                        // 采样数达到上限
                        if (sampled_keys >= (unsigned long) server.maxmemory_samples)
                            break;
                        // 键数量太少
                        if (current_db_keys < (unsigned long) server.maxmemory_samples*10)
                            break;
                    }
                }

                // 没有可供淘汰的键
                if (!total_keys) break;

                // 从后向前遍历淘汰池,选择最佳淘汰对象
                for (k = EVPOOL_SIZE-1; k >= 0; k--) {
                    if (pool[k].key == NULL) continue;
                    bestdbid = pool[k].dbid;

                    kvstore *kvs;
                    if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
                        kvs = server.db[bestdbid].keys;
                    } else {
                        kvs = server.db[bestdbid].expires;
                    }

                    // 检查键是否仍然存在
                    de = kvstoreDictFind(kvs, pool[k].slot, pool[k].key);

                    // 清空池中该条目
                    if (pool[k].key != pool[k].cached)
                        sdsfree(pool[k].key);
                    pool[k].key = NULL;
                    pool[k].idle = 0;

                    // 如果找到可用键就选中它
                    if (de) {
                        bestkey = dictGetKey(de);
                        break;
                    } else {
                        /* Ghost... Iterate again. */
                    }
                }
            }
        }

        // 如果是随机策略
        else if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM ||
                 server.maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM)
        {
            // 轮流访问每个数据库
            for (i = 0; i < server.dbnum; i++) {
                j = (++next_db) % server.dbnum;
                db = server.db+j;
                kvstore *kvs;

                // 选择要处理的键集合
                if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) {
                    kvs = db->keys;
                } else {
                    kvs = db->expires;
                }

                // 随机选择一个键
                int slot = kvstoreGetFairRandomDictIndex(kvs);
                de = kvstoreDictGetRandomKey(kvs, slot);
                if (de) {
                    bestkey = dictGetKey(de);
                    bestdbid = j;
                    break;
                }
            }
        }

        // 删除选中的键
        if (bestkey) {
            db = server.db+bestdbid;
            robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
            
            enterExecutionUnit(1, 0);
            delta = (long long) zmalloc_used_memory();
            latencyStartMonitor(eviction_latency);

            // 删除键
            dbGenericDelete(db,keyobj,server.lazyfree_lazy_eviction,DB_FLAG_KEY_EVICTED);

            latencyEndMonitor(eviction_latency);
            latencyAddSampleIfNeeded("eviction-del",eviction_latency);
            delta -= (long long) zmalloc_used_memory();
            mem_freed += delta;
            server.stat_evictedkeys++;
            signalModifiedKey(NULL,db,keyobj);
            notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted",
                keyobj, db->id);
            propagateDeletion(db,keyobj,server.lazyfree_lazy_eviction);
            exitExecutionUnit();
            postExecutionUnitOperations();
            decrRefCount(keyobj);
            keys_freed++;

            // 每删除16个键检查一次
            if (keys_freed % 16 == 0) {
                if (slaves) flushSlavesOutputBuffers();

                // 检查内存是否已经达标
                if (server.lazyfree_lazy_eviction) {
                    if (getMaxmemoryState(NULL,NULL,NULL,NULL) == C_OK) {
                        break;
                    }
                }

                // 检查是否超时 
                if (elapsedUs(evictionTimer) > eviction_time_limit_us) {
                    startEvictionTimeProc();
                    break;
                }
            }
        } else {
            // 没有可释放的键
            goto cant_free;
        }
    }
    
    // 更新淘汰结果
    result = (isEvictionProcRunning) ? EVICT_RUNNING : EVICT_OK;

cant_free:
    if (result == EVICT_FAIL) {
        // 如果淘汰失败但有后台任务在释放内存,稍等一会
        mstime_t lazyfree_latency;
        latencyStartMonitor(lazyfree_latency);
        while (bioPendingJobsOfType(BIO_LAZY_FREE) &&
              elapsedUs(evictionTimer) < eviction_time_limit_us) {
            if (getMaxmemoryState(NULL,NULL,NULL,NULL) == C_OK) {
                result = EVICT_OK;
                break;
            }
            usleep(eviction_time_limit_us < 1000 ? eviction_time_limit_us : 1000);
        }
        latencyEndMonitor(lazyfree_latency);
        latencyAddSampleIfNeeded("eviction-lazyfree",lazyfree_latency);
    }

    latencyEndMonitor(latency);
    latencyAddSampleIfNeeded("eviction-cycle",latency);

update_metrics:
    // 更新统计指标
    if (result == EVICT_RUNNING || result == EVICT_FAIL) {
        if (server.stat_last_eviction_exceeded_time == 0)
            elapsedStart(&server.stat_last_eviction_exceeded_time);
    } else if (result == EVICT_OK) {
        if (server.stat_last_eviction_exceeded_time != 0) {
            server.stat_total_eviction_exceeded_time += elapsedUs(server.stat_last_eviction_exceeded_time);
            server.stat_last_eviction_exceeded_time = 0;
        }
    }
    return result;
}

LRU 仅仅关注了数据的访问时间,而没有关注数据的访问频率。这样一来,一些热点数据可能会被错误地淘汰。

除了 Redis,还有很多地方使用了 LRU 算法,比如 Linux 内核页表交换、MySQL 缓存池的缓存页替换等。

LFU

LFU 算法是一种最少使用频率算法。Redis 会记录每个键的访问频率,当内存占用过高时,Redis 会释放访问频率最低的键。

LFU 复用了之前 LRU 的 lru 字段,但它被分为了两部分:

  • 前 16 位存储上次访问时间,单位为分钟
  • 后 8 位存储访问频率。这里不是访问次数,而是一个随着时间减少的值,初始为 5

每当有键被访问时:

  • 首先计算和前一次访问直接的时间差,然后据此减少访问频率。减少的速率可由 lfu-decay-time 参数调整,每 lfu-decay-time 分钟减少 1,默认为 1,也就是默认情况下一个键最多有 5 分钟不访问
  • 然后按照概率增加访问频率。访问频率越高,增加的概率越低。增加的概率可由 lfu-log-factor 参数调整,默认为 10,以 \(\frac{1}{(\text{访问频率} - 5) \times 10 + 1}\) 的概率增加 1
点击查看源码

LFU 算法中,更新访问频率的实现如下:

uint8_t LFULogIncr(uint8_t counter) {
    if (counter == 255) return 255;
    double r = (double)rand()/RAND_MAX;
    double baseval = counter - LFU_INIT_VAL;
    if (baseval < 0) baseval = 0;
    double p = 1.0/(baseval*server.lfu_log_factor+1);
    if (r < p) counter++;
    return counter;
}

unsigned long LFUDecrAndReturn(robj *o) {
    unsigned long ldt = o->lru >> 8;
    unsigned long counter = o->lru & 255;
    unsigned long num_periods = server.lfu_decay_time ? LFUTimeElapsed(ldt) / server.lfu_decay_time : 0;
    if (num_periods)
        counter = (num_periods > counter) ? 0 : counter - num_periods;
    return counter;
}

过期删除

Redis 作为一个内存数据库,肯定不可能像很多 SQL 数据库那样无限制地扩张。当 Redis 中存储的数据量变多后,一是会导致内存占用过高,二是会导致 Redis 的性能下降。为了解决这个问题,Redis 提供了过期删除机制。

可以使用 SETEXEXPIREEXPIREAT 等命令为键设置过期时间。当键过期后,Redis 会将其删除:

$ SETEX name 5 ch3nyang
OK

$ TTL name
(integer) 3

$ EXPIRE name 10
(integer) 1

$ TTL name
(integer) 8

// 等待 10 秒后

$ TTL name
(integer) -2

$ GET name
(nil)

NXXXGTLT 等参数可以用于控制过期时间的设置:

  • NX:只在键不存在时设置过期时间;
  • XX:只在键存在时设置过期时间;
  • GT:只在键的过期时间大于给定时间时设置过期时间;
  • LT:只在键的过期时间小于给定时间时设置过期时间。

过期键的保存

Redis 为了实现过期删除机制,需要保存过期键的信息。这个信息保存在上文所述 redisDbexpires 字典中。expires 字典的键是过期键的键名,值是过期时间戳。

注意,这里是具体的时间戳,而不是剩余时间。这样一来,Redis 就可以通过比较时间戳来判断键是否过期了。

点击查看源码

设置过期时间的代码如下:

void expireGenericCommand(client *c, long long basetime, int unit) {
    /* unit 参数用于指定时间单位,可以是 UNIT_SECONDS 或 UNIT_MILLISECONDS */

    robj *key = c->argv[1], *param = c->argv[2];
    long long when; /* unix time in milliseconds when the key will expire. */
    long long current_expire = -1;
    int flag = 0;

    // 解析参数
    if (parseExtendedExpireArgumentsOrReply(c, &flag) != C_OK) {
        return;
    }
    if (getLongLongFromObjectOrReply(c, param, &when, NULL) != C_OK)
        return;

    // 防止时间溢出
    if (unit == UNIT_SECONDS) {
        if (when > LLONG_MAX / 1000 || when < LLONG_MIN / 1000) {
            addReplyErrorExpireTime(c);
            return;
        }
        when *= 1000;
    }
    if (when > LLONG_MAX - basetime) {
        addReplyErrorExpireTime(c);
        return;
    }
    // 计算过期时间
    when += basetime;

    // 如果找不到键,直接返回
    if (lookupKeyWrite(c->db,key) == NULL) {
        addReply(c,shared.czero);
        return;
    }

    // 处理参数
    if (flag) {
        // 获取当前过期时间
        current_expire = getExpire(c->db, key);

        // 如果 NX 选项被设置,需要已经过期
        if (flag & EXPIRE_NX) {
            if (current_expire != -1) {
                addReply(c,shared.czero);
                return;
            }
        }

        // 如果 XX 选项被设置,需要未过期
        if (flag & EXPIRE_XX) {
            if (current_expire == -1) {
                addReply(c,shared.czero);
                return;
            }
        }

        // 如果 GT 选项被设置,需要大于当前过期时间
        if (flag & EXPIRE_GT) {
            if (when <= current_expire || current_expire == -1) {
                addReply(c,shared.czero);
                return;
            }
        }

        // 如果 LT 选项被设置,需要小于当前过期时间
        if (flag & EXPIRE_LT) {
            if (current_expire != -1 && when >= current_expire) {
                addReply(c,shared.czero);
                return;
            }
        }
    }

    if (checkAlreadyExpired(when)) {
        // 如果过期时间已经过期,直接删除键
        robj *aux;

        int deleted = dbGenericDelete(c->db,key,server.lazyfree_lazy_expire,DB_FLAG_KEY_EXPIRED);
        serverAssertWithInfo(c,key,deleted);
        server.dirty++;

        aux = server.lazyfree_lazy_expire ? shared.unlink : shared.del;
        rewriteClientCommandVector(c,2,aux,key);
        signalModifiedKey(c,c->db,key);
        notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
        addReply(c, shared.cone);
        return;
    } else {
        // 设置新的过期时间
        setExpire(c,c->db,key,when);
        addReply(c,shared.cone);
        if (c->cmd->proc != pexpireatCommand) {
            rewriteClientCommandArgument(c,0,shared.pexpireat);
        }

        if (basetime != 0 || unit == UNIT_SECONDS) {
            robj *when_obj = createStringObjectFromLongLong(when);
            rewriteClientCommandArgument(c,2,when_obj);
            decrRefCount(when_obj);
        }

        signalModifiedKey(c,c->db,key);
        notifyKeyspaceEvent(NOTIFY_GENERIC,"expire",key,c->db->id);
        server.dirty++;
        return;
    }
}

其中,最关键的是 setExpire 函数

void setExpire(client *c, redisDb *db, robj *key, long long when) {
    dictEntry *kde, *de, *existing;

    // 获取键的槽位
    int slot = getKeySlot(key->ptr);
    // 在数据库的 keys 字典中查找键对应的条目
    kde = kvstoreDictFind(db->keys, slot, key->ptr);
    // 确保找到的键不为空
    serverAssertWithInfo(NULL,key,kde != NULL);
    // 在数据库的 expires 字典中添加键
    de = kvstoreDictAddRaw(db->expires, slot, dictGetKey(kde), &existing);
    if (existing) {
        // 如果键存在,更新过期时间
        dictSetSignedIntegerVal(existing, when);
    } else {
        // 如果键不存在,设置过期时间
        dictSetSignedIntegerVal(de, when);
    }

    // 主从节点相关
    int writable_slave = server.masterhost && server.repl_slave_ro == 0;
    if (c && writable_slave && !(c->flags & CLIENT_MASTER))
        rememberSlaveKeyWithExpire(db,key);
}

过期删除策略

现在我们知道了 expires 字典中保存了过期键的信息,那么 Redis 是如何删除过期键的呢?常见的过期删除策略有三种:

  • 定时删除:在设置过期时间时,创建一个定时器,到时间后触发删除操作。

    这一方法在 Redis 上表现尤其的烂,因为 Redis 对时间事件使用链表实现,遍历的时间复杂度很高;同时,CPU 忙着处理高消耗任务时,插进来一个无关紧要的删除过期键任务,会导致 Redis 的性能下降。

  • 惰性删除:在获取键时,检查键是否过期,如果过期就删除。

    这一方法在占用的系统资源是最少的。然而,把一堆过期的键留在内存中,会导致内存占用过高。

  • 定期删除:每隔一段时间,随机检查一批键是否过期,如果过期就删除。

    这一方法属于是定时删除和惰性删除的折中,兼顾了性能和资源占用。然而,如何确定定期删除的频率是一个问题。

对于 Redis 来说,它采用的是惰性删除+定期删除的策略。

惰性删除

惰性删除是在获取键时,检查键是否过期,如果过期就删除。这一策略在 lookupKeydbRandomKeydelGenericCommandscanGenericCommand 这四个函数中被使用,几乎覆盖了 Redis 的读写键的操作。

点击查看源码

这一策略的代码如下:

keyStatus expireIfNeeded(redisDb *db, robj *key, int flags) {
    // 如果 lazy_expire 被禁用,直接返回  KEY_VALID
    if (server.lazy_expire_disabled) return KEY_VALID;
    // 如果键未过期,直接返回 KEY_VALID
    if (!keyIsExpired(db,key)) return KEY_VALID;

    if (server.masterhost != NULL) {
        // 如果是从节点,如果当前客户端是主节点,返回 KEY_VALID
        if (server.current_client && (server.current_client->flags & CLIENT_MASTER)) return KEY_VALID;
        // 如果没有 EXPIRE_FORCE_DELETE_EXPIRED 标志,返回 KEY_EXPIRED
        if (!(flags & EXPIRE_FORCE_DELETE_EXPIRED)) return KEY_EXPIRED;
    }

    // 如果有 EXPIRE_AVOID_DELETE_EXPIRED 标志,返回 KEY_EXPIRED
    if (flags & EXPIRE_AVOID_DELETE_EXPIRED)
        return KEY_EXPIRED;

    // 如果有暂停的更新操作,返回 KEY_EXPIRED
    if (isPausedActionsWithUpdate(PAUSE_ACTION_EXPIRE)) return KEY_EXPIRED;

    int static_key = key->refcount == OBJ_STATIC_REFCOUNT;
    if (static_key) {
        // 如果是静态键,新建一个字符串对象以便删除
        key = createStringObject(key->ptr, sdslen(key->ptr));
    }

    // 删除键并传播
    deleteExpiredKeyAndPropagate(db,key);
    if (static_key) {
        // 如果是静态键,减少引用计数
        decrRefCount(key);
    }
    return KEY_DELETED;
}

定期删除

定期删除是每隔一段时间,随机检查一批键是否过期,如果过期就删除。这一行为需要权衡删除的频率,Redis 提供了 active_expire_effort 参数来调整这一频率,它可以设置为 1 到 10 之间的整数:

  • \(\text{每次循环处理的键数量} = 20 + 5 \times \text{effort}\),也就是每次可以处理的过期键数量,而每次最多扫描 20 倍这个数值的桶;
  • \(\text{快速周期的持续时间} = 1000 + 250 \times \text{effort}\),也就是快速周期的时间限制;
  • \(\text{慢速周期的比例} = 25 + 2 \times \text{effort}\),也就是慢速周期可以占据 CPU 时间的比例;
  • \(\text{可接受的过期键数量} = 10 - \text{effort}\),也就是如果过期键数量不多,就不开始新的快速周期。

定期删除分为快速周期和慢速周期,它们的区别只在于时间限制和处理的键数量不同。它们按照如下策略进行:

  • 如果指定要快速周期:
    • 如果上一个快速周期没有因为时间限制而退出,但过期键数量不够多,就不开始新的快速周期
    • 如果上一个快速周期距离现在不到快速周期持续时间的两倍,就不开始新的快速周期
  • 如果上一次触发了时间限制,这次就处理所有的数据库
  • 遍历要处理的数据库并根据限制条件处理过期键
点击查看源码

这一策略在 activeExpireCycle 函数中被使用:

void activeExpireCycle(int type) {
    unsigned long
    effort = server.active_expire_effort-1, // 0-9
    // 每次循环处理的键数量 = 20 + 5 * effort
    config_keys_per_loop = ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP +
                           ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP/4*effort,
    // 快速周期的持续时间 = 1000 + 250 * effort
    config_cycle_fast_duration = ACTIVE_EXPIRE_CYCLE_FAST_DURATION +
                                 ACTIVE_EXPIRE_CYCLE_FAST_DURATION/4*effort,
    // 慢速周期的比例 = 25 + 2 * effort
    config_cycle_slow_time_perc = ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC +
                                  2*effort,
    // 可接受的过期键数量 = 10 - effort
    config_cycle_acceptable_stale = ACTIVE_EXPIRE_CYCLE_ACCEPTABLE_STALE-
                                    effort;

    // 几个静态的全局变量
    static unsigned int current_db = 0;
    static int timelimit_exit = 0;
    static long long last_fast_cycle = 0;

    int j, iteration = 0;
    int dbs_per_call = CRON_DBS_PER_CALL; // 16
    int dbs_performed = 0;
    long long start = ustime(), timelimit, elapsed;

    // 如果有暂停的更新操作,直接返回
    if (isPausedActionsWithUpdate(PAUSE_ACTION_EXPIRE)) return;

    // 如果类型是快速周期
    if (type == ACTIVE_EXPIRE_CYCLE_FAST) {
        // 如果上一个快速周期没有因为时间限制而退出,且过期键数量不多,就不开始新的快速周期
        if (!timelimit_exit &&
            server.stat_expired_stale_perc < config_cycle_acceptable_stale)
            return;

        // 如果上一个快速周期距离现在不到 快速周期的持续时间 * 2,就不开始新的快速周期
        if (start < last_fast_cycle + (long long)config_cycle_fast_duration*2)
            return;

        // 设置新的快速周期的开始时间
        last_fast_cycle = start;
    }

    // 防止 db 数量不足 16
    // 如果上一次触发了时间限制,这次就处理所有的 db
    if (dbs_per_call > server.dbnum || timelimit_exit)
        dbs_per_call = server.dbnum;

    // 每个 CPU 周期的时间限制(微秒)
    timelimit = config_cycle_slow_time_perc*1000000/server.hz/100;
    timelimit_exit = 0;
    if (timelimit <= 0) timelimit = 1;

    if (type == ACTIVE_EXPIRE_CYCLE_FAST)
        // 快速周期有自己的时间限制方案
        timelimit = config_cycle_fast_duration;

    long total_sampled = 0;
    long total_expired = 0;

    serverAssert(server.also_propagate.numops == 0);

    // 当时间限制没到,且还有 db 没处理完时,循环
    for (j = 0; dbs_performed < dbs_per_call && timelimit_exit == 0 && j < server.dbnum; j++) {
        expireScanData data;
        data.ttl_sum = 0;
        data.ttl_samples = 0;

        // 获取当前 db
        redisDb *db = server.db+(current_db % server.dbnum);
        data.db = db;

        int db_done = 0;
        int update_avg_ttl_times = 0, repeat = 0;

        // 指示下次定期删除时的起始 db
        current_db++;

        // 将哈希字段过期与密钥过期交错
        activeExpireHashFieldCycle(type);

        if (kvstoreSize(db->expires))
            // 如果有过期键,处理过的 db 数量加 1
            dbs_performed++;

        // 由 repeat 控制是否继续循环
        do {
            // 最大需要检查的键数量
            unsigned long num;
            iteration++;

            // 如果没有待检查的过期键,直接跳出循环
            if ((num = kvstoreSize(db->expires)) == 0) {
                db->avg_ttl = 0;
                break;
            }
            data.now = mstime();

            // 已经检查的键数量
            data.sampled = 0;
            data.expired = 0;

            // 如果可以检查的键数量大于规定每次检查的键数量,就设置为每次检查的键数量
            if (num > config_keys_per_loop)
                num = config_keys_per_loop;

            // 最多扫描的桶数量 = 最大需要检查的键数量 * 20
            long max_buckets = num*20;
            long checked_buckets = 0;

            int origin_ttl_samples = data.ttl_samples;

            // 如果已经检查的键数量大于最大需要检查的键数量,或者已经检查的桶数量大于最多扫描的桶数量,就跳出循环
            while (data.sampled < num && checked_buckets < max_buckets) {
                // 扫描过期键
                db->expires_cursor = kvstoreScan(db->expires, db->expires_cursor, -1, expireScanCallback, isExpiryDictValidForSamplingCb, &data);
                if (db->expires_cursor == 0) {
                    db_done = 1;
                    break;
                }
                checked_buckets++;
            }
            total_expired += data.expired;
            total_sampled += data.sampled;

            // 更新平均 TTL
            if (data.ttl_samples - origin_ttl_samples > 0) update_avg_ttl_times++;

            // 如果数据库扫描完成或有足够多需要处理的过期键,则不重复当前数据库的周期
            repeat = db_done ? 0 : (data.sampled == 0 || (data.expired * 100 / data.sampled) > config_cycle_acceptable_stale);

            // 为了防止阻塞,每 16 次迭代检查一次时间限制
            if ((iteration & 0xf) == 0 || !repeat) {
                // 更新平均 TTL,该操作较为耗时
                if (data.ttl_samples) {
                    long long avg_ttl = data.ttl_sum / data.ttl_samples;

                    // 当前的估值占 2% 的权重,之前的估值占 98% 的权重
                    if (db->avg_ttl == 0) {
                        db->avg_ttl = avg_ttl;
                    } else {
                        // 使用常量表加速计算
                        db->avg_ttl = avg_ttl + (db->avg_ttl - avg_ttl) * avg_ttl_factor[update_avg_ttl_times - 1] ;
                    }
                    update_avg_ttl_times = 0;
                    data.ttl_sum = 0;
                    data.ttl_samples = 0;
                }
                // 每 16 次迭代检查一次时间限制
                if ((iteration & 0xf) == 0) {
                    elapsed = ustime()-start;
                    if (elapsed > timelimit) {
                        timelimit_exit = 1;
                        server.stat_expired_time_cap_reached_count++;
                        break;
                    }
                }
            }
        } while (repeat);
    }

    elapsed = ustime()-start;
    server.stat_expire_cycle_time_used += elapsed;
    latencyAddSampleIfNeeded("expire-cycle",elapsed/1000);

    // 更新未过期键的统计信息,当前的占比为 5%,之前的占比为 95%
    double current_perc;
    if (total_sampled) {
        current_perc = (double)total_expired/total_sampled;
    } else
        current_perc = 0;
    server.stat_expired_stale_perc = (current_perc*0.05)+
                                     (server.stat_expired_stale_perc*0.95);
}

可以看到,为了均衡性能和资源占用,Redis 可谓是处心积虑,设计了大量的参数和策略。

注意到,Redis 还会同时记录评价 TTL 和过期键的比例,以便在下一次定期删除时能够更好地调整参数。它们都使用了加权平均值的方法,之前的数据占 \(95%\),当前的数据占 \(5%\)。

Comments

Share This Post