Ch3nyang's blog collections_bookmark

post

person

about

category

category

local_offer

tag

rss_feed

rss

深入 Redis 源码 | (2)
Redis 数据结构

calendar_month 2025-01
archive 中间件
tag redis tag structure

There are 6 posts in series 深入 Redis 源码.

本文将会从源码层面解读 Redis 的各项机制。我们会从最基础的内容开始,尽量只看 Redis 最核心的部分,而剥离掉一些不太重要的内容,所有的源码都给出了 GitHub 的链接。

写作本文时,Redis CE 还在 7.4 版本;Redis 8.0 仍然在 Pre-Release。因此,本文主要基于 Redis 7.4,其余版本可能有所不同。本文使用的环境为 Debian 12。

数据结构

以上我们介绍了 Redis 的几种 Redis 对象,用来实现它们的数据结构包括了 intembstrsdsziplisthashtablequicklistintsetskiplist。我们接下来将会逐个介绍这些数据结构。

int

存储结构

我们在前文中看到

robj *createStringObjectFromLongLongWithOptions(long long value, int flag) {
    robj *o;

    /* ... */

    o = createObject(OBJ_STRING, NULL);
    o->encoding = OBJ_ENCODING_INT;
    o->ptr = (void*)((long)value);

    /* ... */

    return o;
}

可见,int 类型的数据结构是一个 robj 结构体,ptr 指向一个 long 类型的数据。这个数据结构是一个整数,可以存储在 long 范围内的整数。

embstr

存储结构

embstr 也是创建了一个 robj 结构体,只不过它的 ptr 指向的是一个 sdshdr8 结构体,这个结构体我们会在后文介绍:

robj *createEmbeddedStringObject(const char *ptr, size_t len) {
    // 为 embstr 分配空间
    robj *o = zmalloc(sizeof(robj)+sizeof(struct sdshdr8)+len+1);
    struct sdshdr8 *sh = (void*)(o+1);

    // 设置类型、编码、指针、引用计数和 LRU
    o->type = OBJ_STRING;
    o->encoding = OBJ_ENCODING_EMBSTR;
    o->ptr = sh+1;
    o->refcount = 1;
    o->lru = 0;

    sh->len = len;
    sh->alloc = len;
    sh->flags = SDS_TYPE_8;
    // 拷贝字符串内容并添加 '\0'
    if (ptr == SDS_NOINIT)
        sh->buf[len] = '\0';
    else if (ptr) {
        memcpy(sh->buf,ptr,len);
        sh->buf[len] = '\0';
    } else {
        memset(sh->buf,0,len+1);
    }
    return o;
}

这使得 embstr 成为了一个只读的字符串,不支持修改。这样做的好处是可以节省一些内存,因为 sds 会多分配一些空间。

如果要修改 embstr,Redis 会将其转换为 sds

sds

存储结构

sds 结构体可以动态调整字符串的大小以节省空间:

robj *createRawStringObject(const char *ptr, size_t len) {
    return createObject(OBJ_STRING, sdsnewlen(ptr,len));
}

按照大小不同,共定义了 4 种sdshdr5 不会被使用到):

  • sdshdr8:长度为 1 字节
  • sdshdr16:长度为 2 字节
  • sdshdr32:长度为 4 字节
  • sdshdr64:长度为 8 字节
展开代码
struct __attribute__ ((__packed__)) sdshdr8 {
    uint8_t len;
    uint8_t alloc;
    unsigned char flags;
    char buf[];
};
struct __attribute__ ((__packed__)) sdshdr16 {
    uint16_t len;
    uint16_t alloc;
    unsigned char flags;
    char buf[];
};
struct __attribute__ ((__packed__)) sdshdr32 {
    uint32_t len;
    uint32_t alloc;
    unsigned char flags;
    char buf[];
};
struct __attribute__ ((__packed__)) sdshdr64 {
    uint64_t len;
    uint64_t alloc;
    unsigned char flags;
    char buf[];
};

这 4 种结构体分别用于存储长度为 1 字节、2 字节、4 字节和 8 字节的字符串。这里的 len 表示字符串的实际长度,alloc 表示分配的空间大小,flags 表示类型,buf 表示字符串的内容。

这里的 __attribute__ ((__packed__)) 是告诉编译器不要对结构体进行字节对齐,而是按照实际的状态。

初始化字符串时,Redis 会首先调用 sdsReqType 函数,根据字符串长度决定字符串类型,然后分配空间并初始化字符串。

展开代码
static inline char sdsReqType(size_t string_size) {
    if (string_size < 1<<5)
        return SDS_TYPE_5;
    if (string_size < 1<<8)
        return SDS_TYPE_8;
    if (string_size < 1<<16)
        return SDS_TYPE_16;
#if (LONG_MAX == LLONG_MAX)
    if (string_size < 1ll<<32)
        return SDS_TYPE_32;
    return SDS_TYPE_64;
#else
    return SDS_TYPE_32;
#endif
}

扩容机制

对于 sds,Redis 会根据字符串长度调整空间大小。这个过程在 _sdsMakeRoomFor 函数中实现。

展开代码
sds _sdsMakeRoomFor(sds s, size_t addlen, int greedy) {
    void *sh, *newsh;
    size_t avail = sdsavail(s);
    size_t len, newlen, reqlen;
    char type, oldtype = s[-1] & SDS_TYPE_MASK;
    int hdrlen;
    size_t usable;

    // 如果剩余空间足够,直接返回
    if (avail >= addlen) return s;

    // 获取字符串长度
    len = sdslen(s);
    // 获取字符串头指针
    sh = (char*)s-sdsHdrSize(oldtype);
    // 计算新字符串长度
    reqlen = newlen = (len+addlen);
    assert(newlen > len);
    // 根据 greedy 参数调整空间大小
    if (greedy == 1) {
        if (newlen < SDS_MAX_PREALLOC)
            newlen *= 2;
        else
            newlen += SDS_MAX_PREALLOC;
    }

    // 计算新字符串类型
    type = sdsReqType(newlen);

    if (type == SDS_TYPE_5) type = SDS_TYPE_8;

    // 计算头部长度
    hdrlen = sdsHdrSize(type);
    assert(hdrlen + newlen + 1 > reqlen);
    // 如果类型相同,直接调整空间大小
    if (oldtype==type) {
        // 重新分配空间大小为:头部长度+新字符串长度+1
        newsh = s_realloc_usable(sh, hdrlen+newlen+1, &usable);
        if (newsh == NULL) return NULL;
        s = (char*)newsh+hdrlen;
    } else {
        // 类型不同的情况下,头部长度会发生变化,需要重新分配空间
        newsh = s_malloc_usable(hdrlen+newlen+1, &usable);
        if (newsh == NULL) return NULL;
        // 将原字符串内容拷贝到新空间中
        memcpy((char*)newsh+hdrlen, s, len+1);
        s_free(sh);
        // 更新头部指针、类型和长度
        s = (char*)newsh+hdrlen;
        s[-1] = type;
        sdssetlen(s, len);
    }
    // 计算剩余空间
    usable = usable-hdrlen-1;
    if (usable > sdsTypeMaxSize(type))
        usable = sdsTypeMaxSize(type);
    sdssetalloc(s, usable);
    return s;
}

空间分配的策略取决于 greddy 参数:

  • 如果 greddy 为 0,Redis 会根据字符串长度调整空间大小为刚刚好;
  • 如果 greddy 为 1
    • 如果原大小小于 \(1024 \times 1024\),Redis 会让空间大小增加一倍
    • 否则增加 \(1024 \times 1024\)。

Redis 还设置了 cron job 来释放未使用的空间。当字符串浪费掉的空间大于 \(1024 \times 4\) 时,就会调用 sdsResize 函数来缩减大小。此过程和上面扩容类似,也是计算字符串长度,然后重新分配空间。

总结

Redis 还实现了很多常见的字符串操作,如拼接、复制、比较等。这些操作都是基于 sds 实现的。众所周知,每个 C 语言项目都会实现一个自己的字符串库,经常写 C 的大伙肯定不但看腻了,也写腻了。Redis 的实现也没什么很特别的,这里就不再赘述了。

sds 这种设计带来了几个好处:

  1. \(O(1)\) 时间复杂度的字符串长度计算;
  2. 操作全部在字节级别,可以存储任意数据,二进制安全;
  3. 可以动态调整空间大小,节省内存。
  4. 通过记录已分配大小和实际大小,可以避免缓冲区溢出。
  5. 通过比需要的空间多分配一些,可以有效减少内存分配次数。

hashtable

存储结构

Redis 使用存储了 hashtable 的字典结构实现了 HASHSET 和本身的键值对数据库。SETHASH 本质上是一样的,只是 SET 的值为空。

字典类型被定义在 dict.h 中:

struct dict {
    // 字典的一些基本属性
    dictType *type;

    // 两个哈希表,一个用于存储数据,一个用于扩容
    dictEntry **ht_table[2];
    // 两个哈希表的大小
    unsigned long ht_used[2];

    // 扩容用的索引
    long rehashidx;

    // 一些标志位
    unsigned pauserehash : 15;
    unsigned useStoredKeyApi : 1;
    signed char ht_size_exp[2];
    int16_t pauseAutoResize;
    void *metadata[];
};

其中的 dictEntry 结构体定义在 dict.c 中:

struct dictEntry {
    // 键
    void *key;
    // 值
    union {
        void *val;
        uint64_t u64;
        int64_t s64;
        double d;
    } v;
    // 下一个条目的指针
    struct dictEntry *next;
};

Redis 使用的是链地址法来解决冲突。从上面的代码可以看出,每个元素都会有一个指针,指向下一个元素。这样,当出现哈希冲突时,即将冲突的元素放在同一个桶中,通过链表连接。

扩容机制

Redis 的 hashtable 初始大小为 \(4\):

#define DICT_HT_INITIAL_SIZE 4

在运行过程中,Redis 为了避免挤爆 hashtable,会判断是否需要扩容

展开代码
int dictExpandIfNeeded(dict *d) {
    // 如果正在扩容,则别重复扩容了
    if (dictIsRehashing(d)) return DICT_OK;

    // 如果哈希表为空,则扩容到初始大小(4)
    if (DICTHT_SIZE(d->ht_size_exp[0]) == 0) {
        dictExpand(d, DICT_HT_INITIAL_SIZE);
        return DICT_OK;
    }
    
    // 如果满足扩容条件,则扩容
    if ((dict_can_resize == DICT_RESIZE_ENABLE &&
         d->ht_used[0] >= DICTHT_SIZE(d->ht_size_exp[0])) ||
        (dict_can_resize != DICT_RESIZE_FORBID &&
         d->ht_used[0] >= dict_force_resize_ratio * DICTHT_SIZE(d->ht_size_exp[0])))
    {
        // 由于有时候扩容需要分配大量内存,所以需要检查是否允许扩容
        if (dictTypeResizeAllowed(d, d->ht_used[0] + 1))
            // 扩容
            dictExpand(d, d->ht_used[0] + 1);
        return DICT_OK;
    }
    return DICT_ERR;
}

hashtable 扩容的条件有三种:

  1. 如果 hashtable 为空,扩容到初始大小 \(4\)。
  2. 如果 hashtable 的负载因子超过了 \(1:1\)(即平均每个桶中有超过一个元素),且允许扩容,则进行扩容。
  3. 如果 hashtable 的负载因子超过了 dict_force_resize_ratio(默认为 \(4\)),不管是否允许扩容,都进行扩容。

扩容后的大小通过函数计算得到。具体来讲,如果扩容后大小不超过 long 类型的最大值,那么 hashtable 的大小会以 \(2\) 的幂次方增长。

展开代码
static signed char _dictNextExp(unsigned long size)
{
    if (size <= DICT_HT_INITIAL_SIZE) return DICT_HT_INITIAL_EXP;
    if (size >= LONG_MAX) return (8*sizeof(long)-1);

    // __builtin_clzl 返回二进制中从最高位开始连续的 0 的个数
    // 8*sizeof(long) 返回 long 类型的二进制位数
    // 因此这个表达式返回的是 size-1 的有效位数
    return 8*sizeof(long) - __builtin_clzl(size-1);
}

如果满足了扩容条件,Redis 会最终调用到 dictRehash 函数来扩容 hashtable

我们前面看到,hahstable 有两个 ht_table,一个用于存储数据,一个用于扩容。扩容时,Redis 会将数据从 ht_table[0] 迁移到 ht_table[1]

由于一次性移动所有数据可能会消耗大量计算资源,Redis 采用了渐进式扩容。具体来说,它使用 rehashidx 来记录当前迁移到的位置,每当对字典进行一次 CRUD 操作后,就同时迁移 n 个桶的数据。完成一次的迁移后,rehashidx 会自增至下一个要迁移的桶。

每次迁移多少个桶呢?Redis 规定每次迁移 \(100\) 个桶。但如果迁移花费的时间超过了一个阈值,Redis 会中止迁移。

另外需要注意的是,如果迁移的桶为空,Redis 会跳过这个桶,不把它计算在迁移的桶数中。

展开代码
int dictRehash(dict *d, int n) {
    /* n 为每次迁移的桶数 */
    // 空桶访问次数限制
    int empty_visits = n*10;
    // 两张哈希表的大小
    unsigned long s0 = DICTHT_SIZE(d->ht_size_exp[0]);
    unsigned long s1 = DICTHT_SIZE(d->ht_size_exp[1]);

    /* ... */

    // 执行迁移,直至迁移 n 个桶或者没东西可迁移了
    while(n-- && d->ht_used[0] != 0) {
        assert(DICTHT_SIZE(d->ht_size_exp[0]) > (unsigned long)d->rehashidx);

        // 跳过空桶
        while(d->ht_table[0][d->rehashidx] == NULL) {
            d->rehashidx++;
            if (--empty_visits == 0) return 1;
        }
        
        // 迁移桶中的数据
        rehashEntriesInBucketAtIndex(d, d->rehashidx);
        // 迁移完成后,将 rehashidx 加一,指向下一个桶
        d->rehashidx++;
    }

    // 检查是否迁移完成
    return !dictCheckRehashingCompleted(d);
}

当全部数据迁移完成后,Redis 会将扩容后的 hashtable 转正。也就是说,将 ht_table[0]ht_table[1] 的指针互换,释放 ht_table[0] 的内存,并将 rehashidx 重置为 \(-1\)。

展开代码
static int dictCheckRehashingCompleted(dict *d) {
    // 如果字典里还有元素,说明迁移还没完成
    if (d->ht_used[0] != 0) return 0;
    
    // 释放旧哈希表
    if (d->type->rehashingCompleted) d->type->rehashingCompleted(d);
    zfree(d->ht_table[0]);
    
    // 将扩容后的哈希表转正
    d->ht_table[0] = d->ht_table[1];
    d->ht_used[0] = d->ht_used[1];
    d->ht_size_exp[0] = d->ht_size_exp[1];
    // 重置新的用来扩容的哈希表
    _dictReset(d, 1);
    // 重置 rehashidx,-1 表示没有在迁移
    d->rehashidx = -1;
    return 1;
}

如果哈希表太空了,Redis 也会缩容

展开代码
int dictShrinkIfNeeded(dict *d) {
    // 如果正在扩容/缩容,则别缩容了
    if (dictIsRehashing(d)) return DICT_OK;
    
    // 如果哈希表大小小于等于初始大小,别缩容了
    if (DICTHT_SIZE(d->ht_size_exp[0]) <= DICT_HT_INITIAL_SIZE) return DICT_OK;

    // 如果负载因子小于 1:8 且允许缩容,或者小于 1:32 但是不允许缩容
    if ((dict_can_resize == DICT_RESIZE_ENABLE &&
         d->ht_used[0] * HASHTABLE_MIN_FILL <= DICTHT_SIZE(d->ht_size_exp[0])) ||
        (dict_can_resize != DICT_RESIZE_FORBID &&
         d->ht_used[0] * HASHTABLE_MIN_FILL * dict_force_resize_ratio <= DICTHT_SIZE(d->ht_size_exp[0])))
    {
        if (dictTypeResizeAllowed(d, d->ht_used[0]))
            dictShrink(d, d->ht_used[0]);
        return DICT_OK;
    }
    return DICT_ERR;
}

hashtable 缩容的条件有两种:

  1. 如果 hashtable 的负载因子小于 \(1:8\),且允许缩容,则进行缩容。
  2. 如果 hashtable 的负载因子小于 \(1:32\),但是不允许缩容,则进行缩容。

后面这个 \(1:32\) 可以通过 dict_force_resize_ratio 来调整,默认为 \(4\),在实际运行的时候会乘上 HASHTABLE_MIN_FILL(默认为 \(8\))。

总结

最后再提一嘴无关紧要的,Redis 的哈希函数使用的是 siphash,这是一种安全的哈希函数。

总体来讲,Redis 中 dict 的亮点在于:

  1. 使用链地址法解决冲突,避免了哈希冲突;
  2. 使用渐进式扩容,避免了一次性分配大量内存。

intset

存储结构

如果 SET 中的元素都是整数,Redis 会使用有序且不重复intset 来存储:

typedef struct intset {
    // 编码类型
    uint32_t encoding;
    // 长度,即 contents 中元素的个数
    uint32_t length;
    // 内容,这里的 int8_t 不会产生任何实际效果
    int8_t contents[];
} intset;

Redis 会根据元素值的大小选择合适的编码。intset编码有三种:

  • INTSET_ENC_INT16:16 位整数
  • INTSET_ENC_INT32:32 位整数
  • INTSET_ENC_INT64:64 位整数
展开代码
#define INTSET_ENC_INT16 (sizeof(int16_t))
#define INTSET_ENC_INT32 (sizeof(int32_t))
#define INTSET_ENC_INT64 (sizeof(int64_t))

插入和升级

当需要添加新元素时,Redis 先在 intset 中搜索该元素:

  • 如果找到了,就不再添加
  • 如果没有找到,就将元素添加到 intset

添加的过程是通过将大于要添加的值的元素后移一个位置,然后将新元素插入到空出的位置,类似插入排序。这个过程在 intsetAdd 函数中实现。

展开代码
intset *intsetAdd(intset *is, int64_t value, uint8_t *success) {
    // 获取要添加的值的编码类型
    uint8_t valenc = _intsetValueEncoding(value);

    uint32_t pos;
    if (success) *success = 1;

    // 如果要添加的值的编码类型大于当前集合的编码类型
    if (valenc > intrev32ifbe(is->encoding)) {
        // 升级集合的编码类型并添加值
        return intsetUpgradeAndAdd(is,value);
    } else {
        // 在集合中搜索该值,如果找到则原样返回并通知不成功
        if (intsetSearch(is,value,&pos)) {
            if (success) *success = 0;
            return is;
        }

        // 调整集合长度为:原长度+1
        is = intsetResize(is,intrev32ifbe(is->length)+1);
        // 如果新值不是最大的那个,则移动尾部元素给它腾位置
        if (pos < intrev32ifbe(is->length)) intsetMoveTail(is,pos,pos+1);
    }

    // 在腾出的位置放入新值
    _intsetSet(is,pos,value);
    // 更新集合长度为:原长度+1
    is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
    return is;
}

intrev32ifbe 被用来将内容转换为小端存储。

如果元素的大小超过了当前编码的范围,Redis 会将 intset 的编码升级为更大的编码。intset 的编码升级是通过 intsetUpgradeAndAdd 函数实现的。它分为三步操作:

  • 先设置新的 intset 大小为 (原长度+1)*新编码类型的大小
  • 然后将现有元素重新编码并复制到新集合
  • 最后设置新值并更新集合长度
展开代码
static intset *intsetUpgradeAndAdd(intset *is, int64_t value) {
    // 获取当前集合的编码类型
    uint8_t curenc = intrev32ifbe(is->encoding);
    // 获取要添加的值的编码类型
    uint8_t newenc = _intsetValueEncoding(value);
    // 获取当前集合的长度
    int length = intrev32ifbe(is->length);
    // 获取要添加的值的位置(如果元素<0,则在头部添加;否则在尾部添加)
    int prepend = value < 0 ? 1 : 0;

    // 设置新的编码类型
    is->encoding = intrev32ifbe(newenc);
    // 重新调整集合大小为:(原长度+1)*新编码类型的大小
    is = intsetResize(is,intrev32ifbe(is->length)+1);

    // 将现有元素重新编码并复制到新集合
    while(length--)
        // 三个参数分别为集合、位置和值
        _intsetSet(is,length+prepend,_intsetGetEncoded(is,length,curenc));

    // 设置新值
    if (prepend)
        _intsetSet(is,0,value);
    else
        _intsetSet(is,intrev32ifbe(is->length),value);
    // 更新集合长度
    is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
    return is;
}

由以上内容可以发现,intset 维护有序的方法实际上就是最简单的插入排序。插入的时间复杂度是 \(O(n)\),而查找的时间复杂度是 \(O(\log{n})\)。为了节省空间,数组大小始终处于刚刚能放下所有元素的状态。

然而,intset 只会升级编码,不会降级编码,如果删除了元素,intset 的编码只会保持不变。

总结

intset 的特点主要有以下几点:

  1. 以最小的空间存储整数集合;
  2. 插入时的时间复杂度较高;
  3. 通过升级编码来适应更大的整数。
  4. 只会升级编码,不会降级编码。

skiplist

存储结构

我们先讲 skiplistskiplist 是一种有序数据结构,它通过多级索引来加速查找。

跳跃表

具体来讲,skiplist 由多个层级组成,每个层级都是一个有序链表。每个节点都包含了一个指向下一个节点的指针,以及一个指向下一层的指针。这样,我们可以通过上层的指针快速定位到下层的节点。

skiplist 的最底层是一个普通的有序链表。每个较高层都充当下面列表的快速通道。插入时,层 \(i\) 中的元素以某个固定概率 \(p\) 出现在层 \(i+1\) 中( \(p\) 的两个常用值是 \(\frac{1}{2}\) 或 \(\frac{1}{4}\))。平均而言,每个元素出现在 \(\frac{1}{1-p}\) 个列表中,并且最高元素(通常是跳跃列表前面的特殊头元素)出现在所有列表中。跳跃列表包含 \(\log_{1/p}{n}\) 个列表。

skiplist 的搜索从最高层列表的第一个元素开始,横向前进,直到当前元素大于或等于目标值:

  • 如果当前元素等于目标值,则已找到
  • 如果当前元素大于目标值,或者搜索到达链接列表的末尾,则返回前一个元素并垂直下降到下一层列表后重复该过程

每个链接列表中的预期步数最多为 \(\frac{1}{p}\),因此,搜索的总预期成本为 \(\frac{1}{p}\log_{1/p}{n}\),即 \(O\left(\log{n}\right)\),其中 \(p\) 为常数。通过选择不同的 \(p\) 值,可以实现搜索成本与存储成本的平衡。

在 Redis 中,skiplist 最多有 32 层,可以存储约 \(2^{32}\) 个节点。同时,它取了 \(p = \frac{1}{4}\),这是一个经验值。

展开代码

其实现在 zslRandomLevel

int zslRandomLevel(void) {
    static const int threshold = ZSKIPLIST_P*RAND_MAX;
    int level = 1;
    while (random() < threshold)
        level += 1;
    return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}

skiplist 被定义在 server.h 中:

typedef struct zskiplistNode {
    // 成员对象
    sds ele;
    // 分值
    double score;
    // 后退指针
    struct zskiplistNode *backward;
    // 层级数组
    struct zskiplistLevel {
        // 前进指针
        struct zskiplistNode *forward;
        // 跨度
        unsigned long span;
    } level[];
} zskiplistNode;

typedef struct zskiplist {
    // 头尾指针
    struct zskiplistNode *header, *tail;
    // 节点数量
    unsigned long length;
    // 最大层数
    int level;
} zskiplist;

typedef struct zset {
    // 字典用于存储元素和分数的映射
    dict *dict;
    // 跳跃表用于排序
    zskiplist *zsl;
} zset;

后退指针用于快速定位到前一个节点,这在我们之前讲到的遍历的过程中很有用。在层级数组中,每个元素包含了前进指针以从后向前逐个访问节点,以及这两个节点之间的跨度。跨度被用于计算某个节点的排名,这样只需要像正常遍历一样即可得到。

插入

skiplist 的插入操作通过 zslInsert 函数实现。

由于 skiplist 存储了跨度这一信息,所以插入操作可能需要更新多个节点的跨度。在插入节点时:

  • 首先从上至下搜索插入的位置,当需要往下走时,记录下这个节点,后面需要更新它的跨度
  • 接着,随机生成一个层数
    • 如果这个层数大于当前表的最高层数,我们就需要更新整个表
    • 否则直接插入节点
  • 最后,更新之前记录的节点的跨度,并更新后退指针
展开代码
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
    // update 数组用于存储每一层需要更新的节点
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    // rank 数组用于存储每一层跨越的节点数
    unsigned long rank[ZSKIPLIST_MAXLEVEL];
    int i, level;

    serverAssert(!isnan(score));
    x = zsl->header;

    for (i = zsl->level-1; i >= 0; i--) {
        // 新到第 i 层,当前跨度为上一层的跨度
        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
        while (x->level[i].forward &&
                (x->level[i].forward->score < score ||
                    (x->level[i].forward->score == score &&
                    sdscmp(x->level[i].forward->ele,ele) < 0)))
        {
            // 如果还没找到要向下走的节点,就更新跨度并继续向前走
            rank[i] += x->level[i].span;
            x = x->level[i].forward;
        }
        // 走到要向下时,存储当前节点,后面需要更新它的跨度
        update[i] = x;
    }
    
    // 获取一个随机层数
    level = zslRandomLevel();

    if (level > zsl->level) {
        // 如果随机层数大于当前总层数,需要更新整张表
        for (i = zsl->level; i < level; i++) {
            rank[i] = 0;
            update[i] = zsl->header;
            update[i]->level[i].span = zsl->length;
        }
        zsl->level = level;
    }

    x = zslCreateNode(level,score,ele);
    for (i = 0; i < level; i++) {
        // 插入节点
        x->level[i].forward = update[i]->level[i].forward;
        update[i]->level[i].forward = x;

        // 更新跨度
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
    }

    // 有些层没有访问到,页需要更新跨度
    for (i = level; i < zsl->level; i++) {
        update[i]->level[i].span++;
    }

    // 设置后退指针
    x->backward = (update[0] == zsl->header) ? NULL : update[0];
    if (x->level[0].forward)
        x->level[0].forward->backward = x;
    else
        zsl->tail = x;
    // 节点数加一
    zsl->length++;
    return x;
}

总结

显然,skiplist 的功能平衡树也能完成。相比之下,skiplist 的特点主要有以下几点:

  1. 指针比平衡树少,节省内存;
  2. 算法实现难度低,便于维护;
  3. 但是 skiplist 的缓存命中率不如平衡树高。

我个人看下来,对于 Redis 这种对速度和内存要求都很高的工具来讲,skiplist 应该是不如平衡树的,但是 Redis 选择了 skiplist,我觉得最大的可能性还是社区的惯性。

ziplist

存储结构

ziplist 是一种紧凑的数据结构,可以存储字符串和整数。

在普通链表的实现下,每个节点都会有一个向前的指针和一个向后的指针,这样会浪费大量的空间。ziplist 通过则省去了这些指针,把所有数据放在一个紧凑的结构中,通过记录前一个节点的长度和当前节点的长度来实现链表上的移动。

其单个条目的定义如下:

typedef struct {
    // 当存储的是字符串时,sval 指向字符串
    unsigned char *sval;
    // slen 为上一个条目的长度
    unsigned int slen;
    // 当存储的是整数时,lval 为整数值,sval 为 NULL
    long long lval;
} ziplistEntry;

zlentry定义如下:

typedef struct zlentry {
    // 编码前一个条目长度所使用的字节数
    unsigned int prevrawlensize;
    // 前一个条目的长度
    unsigned int prevrawlen;
    // 编码当前条目长度所使用的字节数
    unsigned int lensize;
    // 当前条目的长度
    unsigned int len;
    // prevrawlensize + lensize
    unsigned int headersize;
    // 条目的编码方式,ZIP_STR_* 或 ZIP_INT_*
    unsigned char encoding;
    // 指向条目的指针
    unsigned char *p;
} zlentry;

为了将 zlentry 合并到一起,ziplist 整体在内存中按照以下方式存储。

bytes tail offset len entry entry entry end
32 bits 32 bits 16 bits       8 bits
  • bytesziplist 的总字节数
  • tail offset:到达最后一个 entry 的偏移量
  • lenziplist 中的元素个数
  • entryziplist 中的具体元素
  • end:结束标志
展开代码
unsigned char *ziplistNew(void) {
    // 分配所需的初始字节数,包括头部和结尾标志的大小
    unsigned int bytes = ZIPLIST_HEADER_SIZE+ZIPLIST_END_SIZE;
    unsigned char *zl = zmalloc(bytes);
    // 设置总字节数
    ZIPLIST_BYTES(zl) = intrev32ifbe(bytes);
    // 设置表尾偏移量,初始值为 32*2+16
    ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(ZIPLIST_HEADER_SIZE);
    // 设置长度为 0
    ZIPLIST_LENGTH(zl) = 0;
    // 设置结束标志,固定为 255
    zl[bytes-1] = ZIP_END;
    return zl;
}

每个 entry 在内存中形如:

prevlen encode & len content
8/40 bits 8/16/40 bits  
  • prevlen:前一个条目的长度,用于向前移动
  • encode & len:编码和长度,用于向后移动
  • content:条目的内容

encode & len 很容易得到,它们两个确定了 content 的长度。Redis 将这两者编码在了一起。根据元素类型和长度的不同,有不同的计算方法,来确定放在内存中的值:

  • 如果是字符串
    • 如果长度 \(\leq 63\),为 00 + 6 位长度,共占 1 个字节
    • 如果长度 \(\leq 16383\),为 01 + 14 位长度,共占 2 个字节
    • 否则为 10xxxxxx + 32 位长度,共占 5 个字节
  • 如果是整数
    • 如果是 int8_t,为 0xfe + 8 位整数,共占 2 个字节
    • 如果是 int16_t,为 0xc0 + 16 位整数,共占 3 个字节
    • 如果是 int24_t,为 0xf0 + 24 位整数,共占 4 个字节
    • 如果是 int32_t,为 0xd0 + 32 位整数,共占 5 个字节
    • 如果是 int64_t,为 0xe0 + 64 位整数,共占 9 个字节
展开代码
unsigned int zipStoreEntryEncoding(unsigned char *p, unsigned char encoding, unsigned int rawlen) {
    unsigned char len = 1, buf[5];

    if (ZIP_IS_STR(encoding)) {
        // 如果是字符串
        if (rawlen <= 0x3f) {
            // 如果长度 <= 63,存储长度需要 6 位,前面再加上 00
            if (!p) return len;
            buf[0] = ZIP_STR_06B | rawlen;
        } else if (rawlen <= 0x3fff) {
            // 如果长度 <= 16383,存储长度需要 14 位,前面再加上 01
            len += 1;
            if (!p) return len;
            buf[0] = ZIP_STR_14B | ((rawlen >> 8) & 0x3f);
            buf[1] = rawlen & 0xff;
        } else {
            // 否则存储长度需要 40 位,其中前 8 位为 10000000
            len += 4;
            if (!p) return len;
            buf[0] = ZIP_STR_32B;
            buf[1] = (rawlen >> 24) & 0xff;
            buf[2] = (rawlen >> 16) & 0xff;
            buf[3] = (rawlen >> 8) & 0xff;
            buf[4] = rawlen & 0xff;
        }
    } else {
        // 如果是整数
        if (!p) return len;
        // 预先定义好的 8 位不同类型的编码,其中前 2 位为 11
        buf[0] = encoding;
    }

    memcpy(p,buf,len);
    return len;
}

下文如无特殊说明,我们说的“当前条目长度”指的是 encodelen 这个整体。

prevrawlen 则是在添加元素时更新的。其计算方法如下:

  • 如果前一个条目的长度 \(< 254\),直接存储长度,共占 1 个字节
  • 否则,第一个字节为 0xfe,后面 4 个字节存储长度,共占 5 个字节
展开代码
unsigned int zipStorePrevEntryLength(unsigned char *p, unsigned int len) {
    if (p == NULL) {
        // 如果 p 为 NULL,只返回存储长度所需的字节数(1 或 5)
        return (len < ZIP_BIG_PREVLEN) ? 1 : sizeof(uint32_t) + 1;
    } else {
        // 如果 p 不为 NULL,返回存储长度所需的字节数,并将长度存储到 p 中
        if (len < ZIP_BIG_PREVLEN) {
            // 如果长度小于 254,直接存储
            p[0] = len;
            return 1;
        } else {
            // 否则调用 zipStorePrevEntryLengthLarge
            return zipStorePrevEntryLengthLarge(p,len);
        }
    }
}

int zipStorePrevEntryLengthLarge(unsigned char *p, unsigned int len) {
    uint32_t u32;
    if (p != NULL) {
        // 第一个字节设置为 254
        p[0] = ZIP_BIG_PREVLEN;
        // 剩余 4 个字节存储长度
        u32 = len;
        memcpy(p+1,&u32,sizeof(u32));
        memrev32ifbe(p+1);
    }
    // 存储长度所需的字节数为 5
    return 1 + sizeof(uint32_t);
}

索引

ziplist 记录上一个条目的长度好处是正向反向遍历都很方便。正向遍历时,只需要根据当前条目长度向后移动即可。反向遍历时,只需要根据上一个条目的长度向前移动即可。

利用这个特性,我们可以看看索引的实现

  • 如果索引为正数,从前往后遍历,指针首先指向第一个条目
    • 读取当前条目的长度
    • 将指针向后移动当前条目的长度
  • 如果索引为负数,从后往前遍历,指针首先指向最后一个条目
    • 读取前一个条目的长度
    • 将指针向前移动前一个条目的长度
展开代码
unsigned char *ziplistIndex(unsigned char *zl, int index) {
    unsigned char *p;
    unsigned int prevlensize, prevlen = 0;
    // 总字节数
    size_t zlbytes = intrev32ifbe(ZIPLIST_BYTES(zl));
    if (index < 0) {
        // 如果索引为负数,从表尾开始向前遍历
        // 由于 -1 是最后一个条目,因此需要向前移动 -index-1 个条目
        index = (-index)-1;
        // 获取表尾条目的地址
        p = ZIPLIST_ENTRY_TAIL(zl);
        if (p[0] != ZIP_END) {
            // 解码前一个条目的长度字节数
            ZIP_DECODE_PREVLENSIZE(p, prevlensize);
            // 确保前一个条目的长度字节数在范围内
            assert(p + prevlensize < zl + zlbytes - ZIPLIST_END_SIZE);
            // 解码前一个条目的长度
            ZIP_DECODE_PREVLEN(p, prevlensize, prevlen);
            while (prevlen > 0 && index--) {
                // 移动到前一个条目
                p -= prevlen;
                assert(p >= zl + ZIPLIST_HEADER_SIZE && p < zl + zlbytes - ZIPLIST_END_SIZE);
                // 解码前一个条目的长度
                ZIP_DECODE_PREVLEN(p, prevlensize, prevlen);
            }
        }
    } else {
        // 如果索引为正数,从表头开始向后遍历
        p = ZIPLIST_ENTRY_HEAD(zl);
        while (index--) {
            // 移动到下一个条目
            p += zipRawEntryLengthSafe(zl, zlbytes, p);
            if (p[0] == ZIP_END)
                break;
        }
    }
    if (p[0] == ZIP_END || index > 0)
        return NULL;
    zipAssertValidEntry(zl, zlbytes, p);
    return p;
}

可以看到,通过记录上一个条目的长度,ziplist 在实现了双向遍历的功能。

插入和连锁更新

我们来看插入操作的实现

插入操作需要根据插入位置,将后续的条目向后移动。然而,由于下一个条目的 prevlen 此时要记录当前条目的长度,其所占据的字节数可能会发生变化。因此,插入操作的流程是这样的:

  1. 指针指向插入位置的前一个条目
  2. 得到前一个条目的长度,由此可以确定要插入条目的 prevlen
  3. 根据条目的内容计算当前条目的长度,至此,要插入的条目内容和长度都确定了
  4. 得到插入位置的后一个条目原本的 prevlen,检查一下更新它的值后,所需的字节数是否发生变化
  5. 结合要插入条目的长度和后一个条目的 prevlen 所需字节数的变化,计算出整体需要移动的字节数
  6. 将插入位置后的条目向后移动
  7. 更新后一个条目的 prevlen
  8. 更新 tail offset
  9. 插入当前条目
展开代码
unsigned char *__ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) {
    /* 在 zl 中的 p 位置插入一个新的条目,内容为 s,长度为 slen */
    // curlen 为当前 zl 的总字节数
    size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), reqlen, newlen;
    unsigned int prevlensize, prevlen = 0;
    size_t offset;
    int nextdiff = 0;
    unsigned char encoding = 0;
    long long value = 123456789;
    zlentry tail;

    if (p[0] != ZIP_END) {
        // 如果插入位置不是表尾,获取前一个条目的长度
        ZIP_DECODE_PREVLEN(p, prevlensize, prevlen);
    } else {
        // 如果插入位置是表尾,获取尾部元素的实际地址
        unsigned char *ptail = ZIPLIST_ENTRY_TAIL(zl);
        if (ptail[0] != ZIP_END) {
            // 获取尾部元素的长度
            prevlen = zipRawEntryLengthSafe(zl, curlen, ptail);
        }
    }

    if (zipTryEncoding(s,slen,&value,&encoding)) {
        // 如果是整数,转换为整数编码
        reqlen = zipIntSize(encoding);
    } else {
        // 如果是整数,计算所需的字节数
        reqlen = slen;
    }

    // 计算当前条目开头所需的总字节数
    // 计算存储前一个条目长度所需的字节数
    reqlen += zipStorePrevEntryLength(NULL,prevlen);
    // 计算存储当前条目长度所需的字节数
    reqlen += zipStoreEntryEncoding(NULL,encoding,slen);

    int forcelarge = 0;
    // 如果插入位置不是表尾,计算当前条目长度 - 下一个条目 prevlen 占据空间,可能的值为 -4、0、4
    nextdiff = (p[0] != ZIP_END) ? zipPrevLenByteDiff(p,reqlen) : 0;
    if (nextdiff == -4 && reqlen < 4) {
        // 如果下一个条目的 prevlen 占据了 5 字节,但当前条目长度为 1 字节,需要强制使用 5 字节
        nextdiff = 0;
        forcelarge = 1;
    }
    // 所以,就是要么不变,要么加 4

    // 存储偏移量,因为 realloc 可能会改变 zl 的地址
    offset = p-zl;
    // 计算新的总字节数
    newlen = curlen+reqlen+nextdiff;
    // 重新分配空间
    zl = ziplistResize(zl,newlen);
    // 重新获取插入位置
    p = zl+offset;

    if (p[0] != ZIP_END) {
        // 如果插入位置不是表尾,需要移动后续条目
        // 移动的源地址为插入位置 - nextdiff
        // 移动的目标地址为插入位置 + 当前条目所需的总字节数
        // 移动的内容长度为当前 zl 的总字节数 - 插入位置的偏移量 - 1 + nextdiff
        memmove(p+reqlen,p-nextdiff,curlen-offset-1+nextdiff);

        if (forcelarge)
            // 如果下一个条目记录的 prevlen 所需的字节数从 1 变为了 5,需要重新计算并存储
            zipStorePrevEntryLengthLarge(p+reqlen,reqlen);
        else
            // 否则按照正常流程计算并存储
            zipStorePrevEntryLength(p+reqlen,reqlen);

        // 更新 tail offset 为原来的值 + 当前条目所需的总字节数
        ZIPLIST_TAIL_OFFSET(zl) =
            intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+reqlen);

        assert(zipEntrySafe(zl, newlen, p+reqlen, &tail, 1));
        if (p[reqlen+tail.headersize+tail.len] != ZIP_END) {
            // 如果当前表尾位置不是 ZIP_END(即 forcelarge 了),需要再加上 nextdiff
            ZIPLIST_TAIL_OFFSET(zl) =
                intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff);
        }
    } else {
        // 如果插入位置是表尾,只需要更新 tail offset 为新的位置
        ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(p-zl);
    }

    // 如果 nextdiff 不为 0,需要进行连锁更新
    if (nextdiff != 0) {
        offset = p-zl;
        zl = __ziplistCascadeUpdate(zl,p+reqlen);
        p = zl+offset;
    }

    // 插入新条目
    p += zipStorePrevEntryLength(p,prevlen);
    p += zipStoreEntryEncoding(p,encoding,slen);
    if (ZIP_IS_STR(encoding)) {
        memcpy(p,s,slen);
    } else {
        zipSaveInteger(p,value,encoding);
    }
    ZIPLIST_INCR_LENGTH(zl,1);
    return zl;
}

然而,后一个条目 prevlen 所占空间的变化可能导致连锁更新

现在考虑这样一种情况:本来条目 B 的总长度为 253,现在在它前面插入了一个长度为 10086 的条目 A,导致 B 的 prevlen 占据的空间由 1 变为 5,B 占据的总空间由 253 变为了 257。

这导致 B 的下个条目 C 的 prevlen 占据的空间也要从 1 变为 5,它也要进行更新,连锁更新就发生了。

在最坏情况下,这种更新会一直传递下去,直到表尾,导致插入操作的时间复杂度会变为 \(O(n^2)\)。

展开代码

连锁更新代码

unsigned char *__ziplistCascadeUpdate(unsigned char *zl, unsigned char *p) {
    zlentry cur;
    size_t prevlen, prevlensize, prevoffset;
    size_t firstentrylen;
    size_t rawlen, curlen = intrev32ifbe(ZIPLIST_BYTES(zl));
    size_t extra = 0, cnt = 0, offset;
    size_t delta = 4;
    unsigned char *tail = zl + intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl));

    // 空表直接返回
    if (p[0] == ZIP_END) return zl;

    // 获取当前条目信息
    zipEntry(p, &cur);
    firstentrylen = prevlen = cur.headersize + cur.len;
    prevlensize = zipStorePrevEntryLength(NULL, prevlen);
    prevoffset = p - zl;
    p += prevlen;

    // 遍历后续条目,检查是否需要连锁更新
    while (p[0] != ZIP_END) {
        assert(zipEntrySafe(zl, curlen, p, &cur, 0));

        // 如果当前条目的 prevlen 已经正确,则无需更新
        if (cur.prevrawlen == prevlen) break;

        // 如果当前条目的 prevlen 大小足够存储新的 prevlen
        if (cur.prevrawlensize >= prevlensize) {
            if (cur.prevrawlensize == prevlensize) {
                zipStorePrevEntryLength(p, prevlen);
            } else {
                zipStorePrevEntryLengthLarge(p, prevlen);
            }
            break;
        }

        // 确保 prevlen 的一致性
        assert(cur.prevrawlen == 0 || cur.prevrawlen + delta == prevlen);

        // 更新 prevlen 和 prevlensize
        rawlen = cur.headersize + cur.len;
        prevlen = rawlen + delta; 
        prevlensize = zipStorePrevEntryLength(NULL, prevlen);
        prevoffset = p - zl;
        p += rawlen;
        extra += delta;
        cnt++;
    }

    // 如果没有额外空间需求,直接返回
    if (extra == 0) return zl;

    // 更新表尾偏移量
    if (tail == zl + prevoffset) {
        if (extra - delta != 0) {
            ZIPLIST_TAIL_OFFSET(zl) =
                intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+extra-delta);
        }
    } else {
        ZIPLIST_TAIL_OFFSET(zl) =
            intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+extra);
    }

    // 调整 ziplist 大小
    offset = p - zl;
    zl = ziplistResize(zl, curlen + extra);
    p = zl + offset;
    memmove(p + extra, p, curlen - offset - 1);
    p += extra;

    // 处理连锁更新
    while (cnt) {
        zipEntry(zl + prevoffset, &cur);
        rawlen = cur.headersize + cur.len;
        memmove(p - (rawlen - cur.prevrawlensize), 
                zl + prevoffset + cur.prevrawlensize, 
                rawlen - cur.prevrawlensize);
        p -= (rawlen + delta);
        if (cur.prevrawlen == 0) {
            zipStorePrevEntryLength(p, firstentrylen);
        } else {
            zipStorePrevEntryLength(p, cur.prevrawlen+delta);
        }
        prevoffset -= cur.prevrawlen;
        cnt--;
    }
    return zl;
}

尽管连锁更新发生的概率很小,但是一旦发生,就会导致大量的数据移动,从而影响性能。

总结

其它的操作,如删除、查找等,都是类似的。这里不再赘述。

ziplist 的优缺点也很明显:

  • 存储结构紧凑,节省空间;
  • 通过记录前一个节点的长度,实现了双向遍历;
  • 插入、删除操作可能会导致连锁更新,影响性能。

listpack

存储结构

我们反思一下 ziplist 的问题。ziplist 的连锁更新问题是由于每个节点都包含了前一个节点的长度,而前一个节点长度所占据的空间大小可能会发生变化。

要想彻底解决这个问题,就不能记录前一个节点的长度————于是 listpack 诞生了。

listapck 单个条目的定义ziplist 略有区别:

typedef struct {
    // 当存储的是字符串时,sval 指向字符串
    unsigned char *sval;
    // slen 为当前字符串长度
    uint32_t slen;
    // 当存储的是整数时,lval 为整数值,sval 为 NULL
    long long lval;
} listpackEntry;

一个 listpack 在内存中的结构如下:

bytes len entry entry entry end
32 bits 16 bits       8 bits
  • byteslistpack 的总字节数
  • lenlistpack 中的元素个数
  • entrylistpack 中的具体元素
  • end:结束标志
展开代码

我们直接来看 listpack初始化

unsigned char *lpNew(size_t capacity) {
    // 预先分配至少 6+1 个字节的空间
    unsigned char *lp = lp_malloc(capacity > LP_HDR_SIZE+1 ? capacity : LP_HDR_SIZE+1);
    if (lp == NULL) return NULL;
    // 将 32 位整数 6+1 存储到 lp 中,表示当前的总字节数
    lpSetTotalBytes(lp,LP_HDR_SIZE+1);
    // 将 16 位整数 0 接着存储到后面,表示当前的条目数
    lpSetNumElements(lp,0);
    // 后面再填充 8 位的 0xFF
    lp[LP_HDR_SIZE] = LP_EOF;
    return lp;
}

在内存中,每个 entry 的结构如下:

encode & len content
8/16/40 bits  
  • encode & len:编码和长度,用于向后移动
  • content:条目的内容

尽管 Redis 代码中定义了 listpackEntry,但是实际上并没有使用。

可以看到,listpack 没有记录前一个节点的长度,因此不会出现连锁更新的问题。

索引

对于正向遍历,listpackziplist 没有太大区别。

但是问题来了,对于反向遍历,listpack 由于不记录前一个节点的长度,怎么办呢?解决这个事情也不难:

  • 将指针指向前一个节点的最后一个字节
  • 计算得到前一个节点的长度
  • 将指针移动到前一个节点的第一个字节
展开代码
unsigned char *lpPrev(unsigned char *lp, unsigned char *p) {
    assert(p);
    if (p-lp == LP_HDR_SIZE) return NULL;
    // 将 p 指向前一个条目的最后一个字节
    p--;
    // 计算前一个条目的长度
    uint64_t prevlen = lpDecodeBacklen(p);
    prevlen += lpEncodeBacklenBytes(prevlen);
    // 移动到前一个条目的第一个字节
    p -= prevlen-1;
    lpAssertValidEntry(lp, lpBytes(lp), p);
    return p;
}

这里的关键在于如何计算前一个条目长度listpack 采用了一种变长编码的方式:

  • 每个字节低 7 位存储数据
  • 每个字节高 1 位指示是否为最后一个字节
    • 如果为 0,说明是最后一个字节
    • 如果为 1,说明后面还有字节

也就是说,listpack 每个节点在内存中类似 0xxxxxxx 1xxxxxxx 1xxxxxxx ...

展开代码
static inline uint64_t lpDecodeBacklen(unsigned char *p) {
    uint64_t val = 0;
    uint64_t shift = 0;
    do {
        // 获取当前字节的低 7 位,存入 val
        val |= (uint64_t)(p[0] & 127) << shift;

        // 如果当前字节的高 1 位为 0,说明是最后一个字节
        if (!(p[0] & 128)) break;

        // 否则,继续读取下一个字节
        shift += 7;
        p--;

        // 如果 shift 大于 28,说明超过了 5 个字节,报错
        if (shift > 28) return UINT64_MAX;
    } while(1);
    return val;
}

插入、删除和替换

listpack 的插入、删除和替换操作都写在了同一个函数中。

例如插入,只需要:

  • 计算插入位置的偏移量
  • 计算新的总字节数
  • 重新分配空间
  • 将插入位置后的条目向后移动
  • 插入当前条目

这个过程和插入排序几乎一模一样,不再发生连锁更新。

展开代码
unsigned char *lpInsert(unsigned char *lp, unsigned char *elestr, unsigned char *eleint,
                        uint32_t size, unsigned char *p, int where, unsigned char **newp)
{
    /* elestr/eleint 为要插入或替换的内容,size 为编码后的长度
     * where 指示要新元素的位置,包括 LP_BEFORE / LP_AFTER / LP_REPLACE
     * newp 返回操作后元素的位置 */

    // 用于存储编码后的整数
    unsigned char intenc[LP_MAX_INT_ENCODING_LEN];
     // 用于存储前一个元素的长度
    unsigned char backlen[LP_MAX_BACKLEN_SIZE];

    // 编码所需的字节数
    uint64_t enclen;
    // 判断是否为删除操作
    int delete = (elestr == NULL && eleint == NULL);

    // 如果是删除操作,强制将 where 设置为 LP_REPLACE
    if (delete) where = LP_REPLACE;

    // 如果需要在当前元素之后插入,跳到下一个元素并将位置参数设置为在之前插入
    if (where == LP_AFTER) {
        p = lpSkip(p);
        where = LP_BEFORE;
        ASSERT_INTEGRITY(lp, p);
    }

    // 计算插入位置的偏移量,方便在重新分配空间后定位
    unsigned long poff = p-lp;

    int enctype;
    if (elestr) {
        // 如果要插入的是字符串,计算编码方式,并将 enclen 设置为字符串长度 + 编码长度
        enctype = lpEncodeGetType(elestr,size,intenc,&enclen);
        // 发现有大聪明把整数传给了 elestr
        if (enctype == LP_ENCODING_INT) eleint = intenc;
    } else if (eleint) {
        // 如果要插入的是整数,直接设置编码方式
        enctype = LP_ENCODING_INT;
        enclen = size;
    } else {
        // 如果什么也不是,说明是删除操作
        enctype = -1;
        enclen = 0;
    }

    // 将 enclen 存入 backlen,并返回后退步数的编码所需字节数
    unsigned long backlen_size = (!delete) ? lpEncodeBacklen(backlen,enclen) : 0;
    // 计算总字节数
    uint64_t old_listpack_bytes = lpGetTotalBytes(lp);
    uint32_t replaced_len  = 0;
    if (where == LP_REPLACE) {
        // 需要替换的长度 + 该长度的编码所需字节数
        replaced_len = lpCurrentEncodedSizeUnsafe(p);
        replaced_len += lpEncodeBacklenBytes(replaced_len);
        ASSERT_INTEGRITY_LEN(lp, p, replaced_len);
    }

    // 计算新的总字节数:原总字节数 + 新值长度 + 编码长度 + 后退步数的编码长度 - 被替换元素长度
    uint64_t new_listpack_bytes = old_listpack_bytes + enclen + backlen_size
                                  - replaced_len;
    if (new_listpack_bytes > UINT32_MAX) return NULL;

    unsigned char *dst = lp + poff;

    if (new_listpack_bytes > old_listpack_bytes &&
        new_listpack_bytes > lp_malloc_size(lp)) {
        // 如果需要扩容,重新分配空间
        if ((lp = lp_realloc(lp,new_listpack_bytes)) == NULL) return NULL;
        dst = lp + poff;
    }

    if (where == LP_BEFORE) {
        // 如果是在当前元素之前插入,需要将后续元素向后移动
        // 移动的源地址为当前元素的地址
        // 移动的目标地址为当前元素的地址 + 新值长度 + 编码长度 + 后退步数的编码长度
        // 移动的内容长度为原总字节数 - 插入位置的偏移量
        memmove(dst+enclen+backlen_size,dst,old_listpack_bytes-poff);
    } else {
        // 如果是替换操作
        // 移动的源地址为当前元素的地址 + 需要替换的长度 + 该长度的编码所需字节数
        // 移动的目标地址为当前元素的地址 + 新值长度 + 编码长度 + 后退步数的编码长度
        // 移动的内容长度为原总字节数 - 插入位置的偏移量 - 需要替换的长度 - 该长度的编码所需字节数
        memmove(dst+enclen+backlen_size,
                dst+replaced_len,
                old_listpack_bytes-poff-replaced_len);
    }

    // 如果新的总字节数小于原总字节数,需要释放多余的空间
    if (new_listpack_bytes < old_listpack_bytes) {
        if ((lp = lp_realloc(lp,new_listpack_bytes)) == NULL) return NULL;
        dst = lp + poff;
    }

    // 更新 newp,如果是删除操作,将 newp 设置为 NULL
    if (newp) {
        *newp = dst;
        if (delete && dst[0] == LP_EOF) *newp = NULL;
    }
    if (!delete) {
        // 如果不是删除操作,插入新值
        if (enctype == LP_ENCODING_INT) {
            memcpy(dst,eleint,enclen);
        } else if (elestr) {
            lpEncodeString(dst,elestr,size);
        } else {
            redis_unreachable();
        }
        dst += enclen;
        memcpy(dst,backlen,backlen_size);
        dst += backlen_size;
    }

    // 更新头部信息
    if (where != LP_REPLACE || delete) {
        uint32_t num_elements = lpGetNumElements(lp);
        if (num_elements != LP_HDR_NUMELE_UNKNOWN) {
            if (!delete)
                lpSetNumElements(lp,num_elements+1);
            else
                lpSetNumElements(lp,num_elements-1);
        }
    }
    lpSetTotalBytes(lp,new_listpack_bytes);

    return lp;
}

可以看到,listpack 的插入、删除和替换操作都是 \(O(n)\) 的,但是由于不需要记录前一个节点的长度,不会发生连锁更新,因此性能会更好。

总结

listpack 这样把插入、删除和替换写进同一个函数的做法还是挺有意思的,不过 Redis 在 ziplist 实现时并没有这样做,导致还得重读代码才能理解。

listpack 的特点如下:

  • 插入、删除和替换操作不会发生连锁更新,性能更好;
  • 由于不记录前一个节点的长度,反向遍历没有那么直接,需要额外的计算;
  • 去掉了一个字段,节省了空间;但每个字节都有 1 位用于标记是否为最后一个字节,又消耗掉了一点空间。

quicklist

存储结构

为了缓解 ziplist 连锁更新的问题,Redis 引入了 quicklist

quicklist 是一个双向链表,每个节点都包含了一个 ziplist。这样一来,单个 ziplist 发生连锁更新时,影响范围将会大大减小。

不过,从 Redis 7.0 开始,quicklist 变成了一个 listpack 的双向链表。这样一来,quicklist 的优点就更加明显了,它索引的速度将会更快。

我们先来看双向链表。双向链表的定义为:

typedef struct listNode {
    // 前指针
    struct listNode *prev;
    // 后指针
    struct listNode *next;
    // 值
    void *value;
} listNode;

typedef struct list {
    // 头指针
    listNode *head;
    // 尾指针
    listNode *tail;
    // 操作函数
    void *(*dup)(void *ptr);
    void (*free)(void *ptr);
    int (*match)(void *ptr, void *key);
    // 长度
    unsigned long len;
} list;

总体看下来就是最常规的链表实现,没有什么特别的地方。我们就不详细讨论了。

接下来来看 quicklistquicklist定义为:

typedef struct quicklist {
    // 指向第一个 quicklistNode 节点的指针
    quicklistNode *head;
    // 指向最后一个 quicklistNode 节点的指针
    quicklistNode *tail;
    // 所有 listpack 中的条目总数
    unsigned long count;
    // quicklist 节点总数
    unsigned long len;
    
    // 填充因子,用于控制每个 quicklistNode 中 listpack 的大小
    signed int fill : QL_FILL_BITS;
    // 压缩深度,表示从两端开始有多少个 quicklistNode 节点不压缩
    unsigned int compress : QL_COMP_BITS;
    // 书签数量,用于快速定位 quicklistNode 节点
    unsigned int bookmark_count: QL_BM_BITS;
    // 书签数组,用于存储书签信息
    quicklistBookmark bookmarks[];
} quicklist;

quicklist 由多个 quicklistNode 组成,其定义为:

typedef struct quicklistNode {
    // 指向前一个 quicklistNode 节点
    struct quicklistNode *prev;
    // 指向后一个 quicklistNode 节点
    struct quicklistNode *next;
    // 指向 listpack 的指针
    unsigned char *entry;
    // listpack 的总字节数
    size_t sz;
    // listpack 中的条目数
    unsigned int count : 16;
    // 是否压缩:0 为不压缩,1 为压缩
    unsigned int encoding : 2;
    // 容器类型:1 表示普通节点,2 表示压缩节点
    unsigned int container : 2;
    // 是否重新压缩:1 表示该节点之前被压缩过
    unsigned int recompress : 1;
    // 是否尝试压缩:1 表示该节点无法压缩,因为数据太小
    unsigned int attempted_compress : 1;
    // 是否禁止压缩:1 表示防止压缩将来会使用的条目
    unsigned int dont_compress : 1;
    // 额外的 9 位,用于将来扩展使用
    unsigned int extra : 9;
} quicklistNode;

我们注意到 quicklist 中有一个 fill 字段,这个字段用于控制每个 quicklistNodelistpack 的大小。

每当插入一个新元素时,都会检查对应位置 quicklistNodelistpack 是否已经满了,如果满了就新建一个 quicklistNode。这样一来,既不至于 listpack 过大,也不至于 quicklistNode 过多。

插入

quicklist插入代码支持了在指定 listpack 节点的前面或者后面插入元素。但是这代码又臭又长,这里就不粘进来写注释了。它大概做了这些事情:

  • 如果没有可供插入的 listpack 节点,新建一个,并将新元素插入其中
  • 然后判断了几个条件:
    • 当前 listpack 节点是否已满
    • 如果在后方插入,判断一下下一个 listpack 节点是否已满
    • 如果在前方插入,判断一下上一个 listpack 节点是否已满
    • 如果元素太大:
      • 如果当前节点的 container 使用了 QUICKLIST_NODE_CONTAINER_PLAIN,直接插入为新节点
      • 否则,调用拆分函数,将当前节点拆分为两个节点,然后插入新节点
  • 接下来就是真正的插入操作了
    • 如果当前节点没满且在尾部插入,则在当前节点的当前位置之后插入
    • 如果当前节点没满且在头部插入,则在当前节点的当前位置之前插入
    • 如果当前节点满了且在尾部插入且下一个节点没满,则在下一个节点的头部插入
    • 如果当前节点满了且在头部插入且上一个节点没满,则在上一个节点的尾部插入
    • 如果当前节点满了且相邻节点都满了,则新建一个节点,然后插入
    • 如果当前节点已满且需要拆分节点,则拆分当前节点并插入新的节点

我觉得这个函数从逻辑上来讲是很顺畅的,但是代码量太大了,不高兴粘贴过来写注释了 :P

总结

quicklist 的优点如下:

  • 通过双向链表的方式,解决了 ziplist 连锁更新的问题;
  • Redis 7.0 开始,用 listpack 替换 ziplist,进一步减少空间浪费,加快操作速度。

HYPERLOGLOG

基数估计

Cardinality,中文基数,表示的是一个集合中不重复元素的个数。在 Redis 中,HYPERLOGLOG 就是用来估计基数的。

按照常规想法,我们可能会用一个集合来存储所有的元素;每当出现一个新元素,先查找是否已经存在,如果不存在则插入。这样一来,集合中的元素个数就是基数。但是这种方法的空间复杂度是 \(O(n)\) 的,当元素个数很大时,会占用大量内存。

HYPERLOGLOG 使用的算法来源于 Heule 等人1和 P. Flajolet 等人2,它只需要占用固定大小的内存(12 KB + 16 B),就可以在 \(O(1)\) 的时间复杂度内估计基数,且误差率在 \(0.81\%\)。

它的原理可以化简为抛硬币问题。我只需要问你:最多有多少次是连续正面朝上的?就可以估计出抛硬币的总次数。这个估计值也许存在偶然性,但是在大量数据下,这个估计值是相对准确的。

我们还可以改进一下,给你 10 枚硬币去抛,让你分别记录这 10 枚硬币中连续正面朝上的最大次数。这样一来,我们估计得到的结果会更加准确。

HYPERLOGLOG 的原理是这样的:它将每个新元素通过哈希函数映射到一个固定长度的二进制串,其可用于索引到对应的寄存器(对应了分别记录不同的硬币),然后统计这些二进制串中前导 0 的最大长度(对应了硬币连续正面朝上的次数)。

具体来讲,HYPERLOGLOG 有 \(16384\)(\(2^{14}\)) 个寄存器,每个寄存器存储着 \(0-51\),共计 12 KB。

Redis 使用了 64 位的 MurmurHash2 哈希算法,将元素哈希到一个 64 位的整数。

展开查看 MurmurHash2 算法实现
uint64_t MurmurHash64A (const void * key, int len, unsigned int seed) {
    const uint64_t m = 0xc6a4a7935bd1e995;
    const int r = 47;
    uint64_t h = seed ^ (len * m);
    const uint8_t *data = (const uint8_t *)key;
    const uint8_t *end = data + (len-(len&7));

    while(data != end) {
        uint64_t k;

#if (BYTE_ORDER == LITTLE_ENDIAN)
    #ifdef USE_ALIGNED_ACCESS
        memcpy(&k,data,sizeof(uint64_t));
    #else
        k = *((uint64_t*)data);
    #endif
#else
        k = (uint64_t) data[0];
        k |= (uint64_t) data[1] << 8;
        k |= (uint64_t) data[2] << 16;
        k |= (uint64_t) data[3] << 24;
        k |= (uint64_t) data[4] << 32;
        k |= (uint64_t) data[5] << 40;
        k |= (uint64_t) data[6] << 48;
        k |= (uint64_t) data[7] << 56;
#endif

        k *= m;
        k ^= k >> r;
        k *= m;
        h ^= k;
        h *= m;
        data += 8;
    }

    switch(len & 7) {
    case 7: h ^= (uint64_t)data[6] << 48; /* fall-thru */
    case 6: h ^= (uint64_t)data[5] << 40; /* fall-thru */
    case 5: h ^= (uint64_t)data[4] << 32; /* fall-thru */
    case 4: h ^= (uint64_t)data[3] << 24; /* fall-thru */
    case 3: h ^= (uint64_t)data[2] << 16; /* fall-thru */
    case 2: h ^= (uint64_t)data[1] << 8; /* fall-thru */
    case 1: h ^= (uint64_t)data[0];
            h *= m; /* fall-thru */
    };

    h ^= h >> r;
    h *= m;
    h ^= h >> r;
    return h;
}

然后取这个整数的低 14 位作为寄存器的索引,取剩下的 50 位中前导 0 的长度 + 1 作为该元素的值。如果这个值大于寄存器中的值,则更新寄存器中的值。换句话说,寄存器中存储的是落到其中的所有元素的最大前导 0 的长度。

那么,如何根据这些寄存器中的值来估计基数呢?这里用到了一个算法。假设寄存器中的值位 \(C=\left\{c_0,c_1,\cdots,c_{q+1}\right\}\),那么估计的基数为:

\[\begin{aligned} &z \leftarrow m \cdot \tau\left(1 - C_{q+1} / m\right)\\ &\text{for } k \leftarrow q \text{ to } 1 \text{ do} \\ &\quad\quad z \leftarrow 0.5 \times \left(z + C_k\right)\\ &z \leftarrow z + m \cdot \sigma\left(C_0 / m\right)\\ &\text{return } \alpha_{\infty} \cdot m^2 / z \end{aligned}\]
展开查看算法实现

实现为:

uint64_t hllCount(struct hllhdr *hdr, int *invalid) {
    double m = HLL_REGISTERS;
    double E;
    int j;
    int reghisto[64] = {0};

    if (hdr->encoding == HLL_DENSE) {
        hllDenseRegHisto(hdr->registers,reghisto);
    } else if (hdr->encoding == HLL_SPARSE) {
        hllSparseRegHisto(hdr->registers,
                         sdslen((sds)hdr)-HLL_HDR_SIZE,invalid,reghisto);
    } else if (hdr->encoding == HLL_RAW) {
        hllRawRegHisto(hdr->registers,reghisto);
    } else {
        serverPanic("Unknown HyperLogLog encoding in hllCount()");
    }

    double z = m * hllTau((m-reghisto[HLL_Q+1])/(double)m);
    for (j = HLL_Q; j >= 1; --j) {
        z += reghisto[j];
        z *= 0.5;
    }
    z += m * hllSigma(reghisto[0]/(double)m);
    E = llroundl(HLL_ALPHA_INF*m*m/z);

    return (uint64_t) E;
}

其中:

  • \(m = 2^{14}\),即寄存器的个数
    #define HLL_P 14
    #define HLL_REGISTERS (1<<HLL_P)
    
  • \(\tau(x)\) 是估计函数,定义为 \(\tau(x) = 1 - x - \frac{(1-\sqrt[2]{x})^2}{2} - \frac{(1-\sqrt[4]{x})^2}{4} - \frac{(1-\sqrt[8]{x})^2}{8} - \cdots\)
    double hllTau(double x) {
        if (x == 0. || x == 1.) return 0.;
        double zPrime;
        double y = 1.0;
        double z = 1 - x;
        do {
            x = sqrt(x);
            zPrime = z;
            y *= 0.5;
            z -= pow(1 - x, 2)*y;
        } while(zPrime != z);
        return z / 3;
    }
    
  • \(\sigma(x)\) 是估计函数,定义为 \(\sigma(x) = x + x^2 + 2x^4 + 4x^8 + 8x^{16} + \cdots\)
    double hllSigma(double x) {
        if (x == 1.) return INFINITY;
        double zPrime;
        double y = 1;
        double z = x;
        do {
            x *= x;
            zPrime = z;
            z += x * y;
            y += y;
        } while(zPrime != z);
        return z;
    }
    
  • \(\alpha_{\infty}=0.5\ln2\) 是一个常数,用于调整估计值
    #define HLL_ALPHA_INF 0.721347520444481703680
    

由于寄存器数量固定,因此这个算法是 \(O(1)\) 的。

存储结构

HYPERLOGLOG 本质上也是一个字符串,但它有着自己的一套存储结构。它分为 densesparse 两种存储结构,分别对应于密集型和稀疏型的情况。

我们先来看 dense 结构。

它有一个 16 字节的头部,用于存储一些元信息:

+------+---+-----+----------+
| HYLL | E | N/U | Cardin.  |
+------+---+-----+----------+
  • 前 4 个字节是 HYLL,这是个 Magic Number,用于标识这是一个 HYPERLOGLOG 结构;
  • 接下来 1 个字节用于标识是 dense 还是 sparse 结构;
  • 再接下来 3 个字节暂未使用;
  • 最后 8 个字节是以小端存储的 64 位整数,用于存储最后一次计算的基数。基数最高字节的最高位指示了是否被修改过,如果没有修改,则可以重复使用。

它的每个寄存器条目为 6 位,其格式如下:

+--------+--------+--------+------//
|11000000|22221111|33333322|554444//
+--------+--------+--------+------//

稀疏表示

由于存储的寄存器内容可能包含大量的 0,因此可以使用稀疏表示来存储。稀疏表示使用游程长度编码来编码寄存器的内容。游程长度编码是一种压缩算法,它将连续的重复数据压缩为一个数据和一个长度:

  • 00xxxxxx 表示长度为 \(1-64\) 个连续的值为 0 的寄存器。注意,xxxxxx 为实际长度减一;
  • 01xxxxxx yyyyyyyy 表示长度为 \(0-16384\) 个连续的值为 0 的寄存器。注意,xxxxxxyyyyyyyy 为实际长度加一;
  • 1vvvvvxx 表示 \(1-32\) 的值重复了 \(1-4\) 次。注意,vvvvvxx 均为实际值减一。

我们注意到,我们无法表示一个大于 32 的重复值,这时因为这种情况出现的概率很小,因此可以忽略。

密集表示占用内存更大,但处理速度更快;稀疏表示占用内存更小,但处理速度更慢。因此,我们可以通过修改 hll-sparse-max-bytes 来决定何时使用稀疏表示。它的默认值是 3000,即计算得到的基数大于 3000 时使用稀疏表示。

为了加快速度,Redis 对 HYPERLOGLOG 的实现非常的炫技。它大量使用了宏定义和位运算技巧,并且几乎没有使用任何的分支语句。

stream

我们有这一项只是占个坑,但我不太想看它的代码了。

我们前面讲到,Redis 的 stream 相比于 Kafka、RabbitMQ 等消息队列,功能上有所欠缺。我们以后将这些专业消息队列时,再看看它们的实现。

  1. Heule, Nunkesser, Hall: HyperLogLog in Practice: Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm. 

  2. P. Flajolet, Éric Fusy, O. Gandouet, and F. Meunier. Hyperloglog: The analysis of a near-optimal cardinality estimation algorithm. 

Comments

Share This Post