本文将会从源码层面解读 Redis 的各项机制。我们会从最基础的内容开始,尽量只看 Redis 最核心的部分,而剥离掉一些不太重要的内容,所有的源码都给出了 GitHub 的链接。
写作本文时,Redis CE 还在 7.4 版本;Redis 8.0 仍然在 Pre-Release。因此,本文主要基于 Redis 7.4,其余版本可能有所不同。本文使用的环境为 Debian 12。
数据结构
以上我们介绍了 Redis 的几种 Redis 对象,用来实现它们的数据结构包括了 int
、embstr
、sds
、ziplist
、hashtable
、quicklist
、intset
和 skiplist
。我们接下来将会逐个介绍这些数据结构。
int
存储结构
我们在前文中看到:
robj *createStringObjectFromLongLongWithOptions(long long value, int flag) {
robj *o;
/* ... */
o = createObject(OBJ_STRING, NULL);
o->encoding = OBJ_ENCODING_INT;
o->ptr = (void*)((long)value);
/* ... */
return o;
}
可见,int
类型的数据结构是一个 robj
结构体,ptr
指向一个 long
类型的数据。这个数据结构是一个整数,可以存储在 long
范围内的整数。
embstr
存储结构
embstr
也是创建了一个 robj
结构体,只不过它的 ptr
指向的是一个 sdshdr8
结构体,这个结构体我们会在后文介绍:
robj *createEmbeddedStringObject(const char *ptr, size_t len) {
// 为 embstr 分配空间
robj *o = zmalloc(sizeof(robj)+sizeof(struct sdshdr8)+len+1);
struct sdshdr8 *sh = (void*)(o+1);
// 设置类型、编码、指针、引用计数和 LRU
o->type = OBJ_STRING;
o->encoding = OBJ_ENCODING_EMBSTR;
o->ptr = sh+1;
o->refcount = 1;
o->lru = 0;
sh->len = len;
sh->alloc = len;
sh->flags = SDS_TYPE_8;
// 拷贝字符串内容并添加 '\0'
if (ptr == SDS_NOINIT)
sh->buf[len] = '\0';
else if (ptr) {
memcpy(sh->buf,ptr,len);
sh->buf[len] = '\0';
} else {
memset(sh->buf,0,len+1);
}
return o;
}
这使得 embstr
成为了一个只读的字符串,不支持修改。这样做的好处是可以节省一些内存,因为 sds
会多分配一些空间。
如果要修改 embstr
,Redis 会将其转换为 sds
。
sds
存储结构
sds
结构体可以动态调整字符串的大小以节省空间:
robj *createRawStringObject(const char *ptr, size_t len) {
return createObject(OBJ_STRING, sdsnewlen(ptr,len));
}
按照大小不同,共定义了 4 种(sdshdr5
不会被使用到):
-
sdshdr8
:长度为 1 字节 -
sdshdr16
:长度为 2 字节 -
sdshdr32
:长度为 4 字节 -
sdshdr64
:长度为 8 字节
展开代码
struct __attribute__ ((__packed__)) sdshdr8 {
uint8_t len;
uint8_t alloc;
unsigned char flags;
char buf[];
};
struct __attribute__ ((__packed__)) sdshdr16 {
uint16_t len;
uint16_t alloc;
unsigned char flags;
char buf[];
};
struct __attribute__ ((__packed__)) sdshdr32 {
uint32_t len;
uint32_t alloc;
unsigned char flags;
char buf[];
};
struct __attribute__ ((__packed__)) sdshdr64 {
uint64_t len;
uint64_t alloc;
unsigned char flags;
char buf[];
};
这 4 种结构体分别用于存储长度为 1 字节、2 字节、4 字节和 8 字节的字符串。这里的 len
表示字符串的实际长度,alloc
表示分配的空间大小,flags
表示类型,buf
表示字符串的内容。
这里的
__attribute__ ((__packed__))
是告诉编译器不要对结构体进行字节对齐,而是按照实际的状态。
在初始化字符串时,Redis 会首先调用 sdsReqType
函数,根据字符串长度决定字符串类型,然后分配空间并初始化字符串。
展开代码
static inline char sdsReqType(size_t string_size) {
if (string_size < 1<<5)
return SDS_TYPE_5;
if (string_size < 1<<8)
return SDS_TYPE_8;
if (string_size < 1<<16)
return SDS_TYPE_16;
#if (LONG_MAX == LLONG_MAX)
if (string_size < 1ll<<32)
return SDS_TYPE_32;
return SDS_TYPE_64;
#else
return SDS_TYPE_32;
#endif
}
扩容机制
对于 sds
,Redis 会根据字符串长度调整空间大小。这个过程在 _sdsMakeRoomFor
函数中实现。
展开代码
sds _sdsMakeRoomFor(sds s, size_t addlen, int greedy) {
void *sh, *newsh;
size_t avail = sdsavail(s);
size_t len, newlen, reqlen;
char type, oldtype = s[-1] & SDS_TYPE_MASK;
int hdrlen;
size_t usable;
// 如果剩余空间足够,直接返回
if (avail >= addlen) return s;
// 获取字符串长度
len = sdslen(s);
// 获取字符串头指针
sh = (char*)s-sdsHdrSize(oldtype);
// 计算新字符串长度
reqlen = newlen = (len+addlen);
assert(newlen > len);
// 根据 greedy 参数调整空间大小
if (greedy == 1) {
if (newlen < SDS_MAX_PREALLOC)
newlen *= 2;
else
newlen += SDS_MAX_PREALLOC;
}
// 计算新字符串类型
type = sdsReqType(newlen);
if (type == SDS_TYPE_5) type = SDS_TYPE_8;
// 计算头部长度
hdrlen = sdsHdrSize(type);
assert(hdrlen + newlen + 1 > reqlen);
// 如果类型相同,直接调整空间大小
if (oldtype==type) {
// 重新分配空间大小为:头部长度+新字符串长度+1
newsh = s_realloc_usable(sh, hdrlen+newlen+1, &usable);
if (newsh == NULL) return NULL;
s = (char*)newsh+hdrlen;
} else {
// 类型不同的情况下,头部长度会发生变化,需要重新分配空间
newsh = s_malloc_usable(hdrlen+newlen+1, &usable);
if (newsh == NULL) return NULL;
// 将原字符串内容拷贝到新空间中
memcpy((char*)newsh+hdrlen, s, len+1);
s_free(sh);
// 更新头部指针、类型和长度
s = (char*)newsh+hdrlen;
s[-1] = type;
sdssetlen(s, len);
}
// 计算剩余空间
usable = usable-hdrlen-1;
if (usable > sdsTypeMaxSize(type))
usable = sdsTypeMaxSize(type);
sdssetalloc(s, usable);
return s;
}
空间分配的策略取决于 greddy
参数:
- 如果
greddy
为 0,Redis 会根据字符串长度调整空间大小为刚刚好; - 如果
greddy
为 1- 如果原大小小于 \(1024 \times 1024\),Redis 会让空间大小增加一倍
- 否则增加 \(1024 \times 1024\)。
Redis 还设置了 cron job 来释放未使用的空间。当字符串浪费掉的空间大于 \(1024 \times 4\) 时,就会调用 sdsResize
函数来缩减大小。此过程和上面扩容类似,也是计算字符串长度,然后重新分配空间。
总结
Redis 还实现了很多常见的字符串操作,如拼接、复制、比较等。这些操作都是基于 sds
实现的。众所周知,每个 C 语言项目都会实现一个自己的字符串库,经常写 C 的大伙肯定不但看腻了,也写腻了。Redis 的实现也没什么很特别的,这里就不再赘述了。
sds
这种设计带来了几个好处:
- \(O(1)\) 时间复杂度的字符串长度计算;
- 操作全部在字节级别,可以存储任意数据,二进制安全;
- 可以动态调整空间大小,节省内存。
- 通过记录已分配大小和实际大小,可以避免缓冲区溢出。
- 通过比需要的空间多分配一些,可以有效减少内存分配次数。
hashtable
存储结构
Redis 使用存储了 hashtable
的字典结构实现了 HASH
、SET
和本身的键值对数据库。SET
和 HASH
本质上是一样的,只是 SET
的值为空。
字典类型被定义在 dict.h
中:
struct dict {
// 字典的一些基本属性
dictType *type;
// 两个哈希表,一个用于存储数据,一个用于扩容
dictEntry **ht_table[2];
// 两个哈希表的大小
unsigned long ht_used[2];
// 扩容用的索引
long rehashidx;
// 一些标志位
unsigned pauserehash : 15;
unsigned useStoredKeyApi : 1;
signed char ht_size_exp[2];
int16_t pauseAutoResize;
void *metadata[];
};
其中的 dictEntry
结构体定义在 dict.c
中:
struct dictEntry {
// 键
void *key;
// 值
union {
void *val;
uint64_t u64;
int64_t s64;
double d;
} v;
// 下一个条目的指针
struct dictEntry *next;
};
Redis 使用的是链地址法来解决冲突。从上面的代码可以看出,每个元素都会有一个指针,指向下一个元素。这样,当出现哈希冲突时,即将冲突的元素放在同一个桶中,通过链表连接。
扩容机制
Redis 的 hashtable
初始大小为 \(4\):
#define DICT_HT_INITIAL_SIZE 4
在运行过程中,Redis 为了避免挤爆 hashtable
,会判断是否需要扩容。
展开代码
int dictExpandIfNeeded(dict *d) {
// 如果正在扩容,则别重复扩容了
if (dictIsRehashing(d)) return DICT_OK;
// 如果哈希表为空,则扩容到初始大小(4)
if (DICTHT_SIZE(d->ht_size_exp[0]) == 0) {
dictExpand(d, DICT_HT_INITIAL_SIZE);
return DICT_OK;
}
// 如果满足扩容条件,则扩容
if ((dict_can_resize == DICT_RESIZE_ENABLE &&
d->ht_used[0] >= DICTHT_SIZE(d->ht_size_exp[0])) ||
(dict_can_resize != DICT_RESIZE_FORBID &&
d->ht_used[0] >= dict_force_resize_ratio * DICTHT_SIZE(d->ht_size_exp[0])))
{
// 由于有时候扩容需要分配大量内存,所以需要检查是否允许扩容
if (dictTypeResizeAllowed(d, d->ht_used[0] + 1))
// 扩容
dictExpand(d, d->ht_used[0] + 1);
return DICT_OK;
}
return DICT_ERR;
}
hashtable
扩容的条件有三种:
- 如果
hashtable
为空,扩容到初始大小 \(4\)。 - 如果
hashtable
的负载因子超过了 \(1:1\)(即平均每个桶中有超过一个元素),且允许扩容,则进行扩容。 - 如果
hashtable
的负载因子超过了dict_force_resize_ratio
(默认为 \(4\)),不管是否允许扩容,都进行扩容。
扩容后的大小通过函数计算得到。具体来讲,如果扩容后大小不超过 long
类型的最大值,那么 hashtable
的大小会以 \(2\) 的幂次方增长。
展开代码
static signed char _dictNextExp(unsigned long size)
{
if (size <= DICT_HT_INITIAL_SIZE) return DICT_HT_INITIAL_EXP;
if (size >= LONG_MAX) return (8*sizeof(long)-1);
// __builtin_clzl 返回二进制中从最高位开始连续的 0 的个数
// 8*sizeof(long) 返回 long 类型的二进制位数
// 因此这个表达式返回的是 size-1 的有效位数
return 8*sizeof(long) - __builtin_clzl(size-1);
}
如果满足了扩容条件,Redis 会最终调用到 dictRehash
函数来扩容 hashtable
。
我们前面看到,hahstable
有两个 ht_table
,一个用于存储数据,一个用于扩容。扩容时,Redis 会将数据从 ht_table[0]
迁移到 ht_table[1]
。
由于一次性移动所有数据可能会消耗大量计算资源,Redis 采用了渐进式扩容。具体来说,它使用 rehashidx
来记录当前迁移到的位置,每当对字典进行一次 CRUD 操作后,就同时迁移 n
个桶的数据。完成一次的迁移后,rehashidx
会自增至下一个要迁移的桶。
每次迁移多少个桶呢?Redis 规定每次迁移 \(100\) 个桶。但如果迁移花费的时间超过了一个阈值,Redis 会中止迁移。
另外需要注意的是,如果迁移的桶为空,Redis 会跳过这个桶,不把它计算在迁移的桶数中。
展开代码
int dictRehash(dict *d, int n) {
/* n 为每次迁移的桶数 */
// 空桶访问次数限制
int empty_visits = n*10;
// 两张哈希表的大小
unsigned long s0 = DICTHT_SIZE(d->ht_size_exp[0]);
unsigned long s1 = DICTHT_SIZE(d->ht_size_exp[1]);
/* ... */
// 执行迁移,直至迁移 n 个桶或者没东西可迁移了
while(n-- && d->ht_used[0] != 0) {
assert(DICTHT_SIZE(d->ht_size_exp[0]) > (unsigned long)d->rehashidx);
// 跳过空桶
while(d->ht_table[0][d->rehashidx] == NULL) {
d->rehashidx++;
if (--empty_visits == 0) return 1;
}
// 迁移桶中的数据
rehashEntriesInBucketAtIndex(d, d->rehashidx);
// 迁移完成后,将 rehashidx 加一,指向下一个桶
d->rehashidx++;
}
// 检查是否迁移完成
return !dictCheckRehashingCompleted(d);
}
当全部数据迁移完成后,Redis 会将扩容后的 hashtable
转正。也就是说,将 ht_table[0]
和 ht_table[1]
的指针互换,释放 ht_table[0]
的内存,并将 rehashidx
重置为 \(-1\)。
展开代码
static int dictCheckRehashingCompleted(dict *d) {
// 如果字典里还有元素,说明迁移还没完成
if (d->ht_used[0] != 0) return 0;
// 释放旧哈希表
if (d->type->rehashingCompleted) d->type->rehashingCompleted(d);
zfree(d->ht_table[0]);
// 将扩容后的哈希表转正
d->ht_table[0] = d->ht_table[1];
d->ht_used[0] = d->ht_used[1];
d->ht_size_exp[0] = d->ht_size_exp[1];
// 重置新的用来扩容的哈希表
_dictReset(d, 1);
// 重置 rehashidx,-1 表示没有在迁移
d->rehashidx = -1;
return 1;
}
如果哈希表太空了,Redis 也会缩容。
展开代码
int dictShrinkIfNeeded(dict *d) {
// 如果正在扩容/缩容,则别缩容了
if (dictIsRehashing(d)) return DICT_OK;
// 如果哈希表大小小于等于初始大小,别缩容了
if (DICTHT_SIZE(d->ht_size_exp[0]) <= DICT_HT_INITIAL_SIZE) return DICT_OK;
// 如果负载因子小于 1:8 且允许缩容,或者小于 1:32 但是不允许缩容
if ((dict_can_resize == DICT_RESIZE_ENABLE &&
d->ht_used[0] * HASHTABLE_MIN_FILL <= DICTHT_SIZE(d->ht_size_exp[0])) ||
(dict_can_resize != DICT_RESIZE_FORBID &&
d->ht_used[0] * HASHTABLE_MIN_FILL * dict_force_resize_ratio <= DICTHT_SIZE(d->ht_size_exp[0])))
{
if (dictTypeResizeAllowed(d, d->ht_used[0]))
dictShrink(d, d->ht_used[0]);
return DICT_OK;
}
return DICT_ERR;
}
hashtable
缩容的条件有两种:
- 如果
hashtable
的负载因子小于 \(1:8\),且允许缩容,则进行缩容。 - 如果
hashtable
的负载因子小于 \(1:32\),但是不允许缩容,则进行缩容。
后面这个 \(1:32\) 可以通过 dict_force_resize_ratio
来调整,默认为 \(4\),在实际运行的时候会乘上 HASHTABLE_MIN_FILL
(默认为 \(8\))。
总结
最后再提一嘴无关紧要的,Redis 的哈希函数使用的是 siphash
,这是一种安全的哈希函数。
总体来讲,Redis 中 dict
的亮点在于:
- 使用链地址法解决冲突,避免了哈希冲突;
- 使用渐进式扩容,避免了一次性分配大量内存。
intset
存储结构
如果 SET
中的元素都是整数,Redis 会使用有序且不重复的 intset
来存储:
typedef struct intset {
// 编码类型
uint32_t encoding;
// 长度,即 contents 中元素的个数
uint32_t length;
// 内容,这里的 int8_t 不会产生任何实际效果
int8_t contents[];
} intset;
Redis 会根据元素值的大小选择合适的编码。intset
的编码有三种:
-
INTSET_ENC_INT16
:16 位整数 -
INTSET_ENC_INT32
:32 位整数 -
INTSET_ENC_INT64
:64 位整数
展开代码
#define INTSET_ENC_INT16 (sizeof(int16_t))
#define INTSET_ENC_INT32 (sizeof(int32_t))
#define INTSET_ENC_INT64 (sizeof(int64_t))
插入和升级
当需要添加新元素时,Redis 先在 intset
中搜索该元素:
- 如果找到了,就不再添加
- 如果没有找到,就将元素添加到
intset
中
添加的过程是通过将大于要添加的值的元素后移一个位置,然后将新元素插入到空出的位置,类似插入排序。这个过程在 intsetAdd
函数中实现。
展开代码
intset *intsetAdd(intset *is, int64_t value, uint8_t *success) {
// 获取要添加的值的编码类型
uint8_t valenc = _intsetValueEncoding(value);
uint32_t pos;
if (success) *success = 1;
// 如果要添加的值的编码类型大于当前集合的编码类型
if (valenc > intrev32ifbe(is->encoding)) {
// 升级集合的编码类型并添加值
return intsetUpgradeAndAdd(is,value);
} else {
// 在集合中搜索该值,如果找到则原样返回并通知不成功
if (intsetSearch(is,value,&pos)) {
if (success) *success = 0;
return is;
}
// 调整集合长度为:原长度+1
is = intsetResize(is,intrev32ifbe(is->length)+1);
// 如果新值不是最大的那个,则移动尾部元素给它腾位置
if (pos < intrev32ifbe(is->length)) intsetMoveTail(is,pos,pos+1);
}
// 在腾出的位置放入新值
_intsetSet(is,pos,value);
// 更新集合长度为:原长度+1
is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
return is;
}
intrev32ifbe
被用来将内容转换为小端存储。
如果元素的大小超过了当前编码的范围,Redis 会将 intset
的编码升级为更大的编码。intset
的编码升级是通过 intsetUpgradeAndAdd
函数实现的。它分为三步操作:
- 先设置新的
intset
大小为(原长度+1)*新编码类型的大小
- 然后将现有元素重新编码并复制到新集合
- 最后设置新值并更新集合长度
展开代码
static intset *intsetUpgradeAndAdd(intset *is, int64_t value) {
// 获取当前集合的编码类型
uint8_t curenc = intrev32ifbe(is->encoding);
// 获取要添加的值的编码类型
uint8_t newenc = _intsetValueEncoding(value);
// 获取当前集合的长度
int length = intrev32ifbe(is->length);
// 获取要添加的值的位置(如果元素<0,则在头部添加;否则在尾部添加)
int prepend = value < 0 ? 1 : 0;
// 设置新的编码类型
is->encoding = intrev32ifbe(newenc);
// 重新调整集合大小为:(原长度+1)*新编码类型的大小
is = intsetResize(is,intrev32ifbe(is->length)+1);
// 将现有元素重新编码并复制到新集合
while(length--)
// 三个参数分别为集合、位置和值
_intsetSet(is,length+prepend,_intsetGetEncoded(is,length,curenc));
// 设置新值
if (prepend)
_intsetSet(is,0,value);
else
_intsetSet(is,intrev32ifbe(is->length),value);
// 更新集合长度
is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
return is;
}
由以上内容可以发现,intset
维护有序的方法实际上就是最简单的插入排序。插入的时间复杂度是 \(O(n)\),而查找的时间复杂度是 \(O(\log{n})\)。为了节省空间,数组大小始终处于刚刚能放下所有元素的状态。
然而,intset
只会升级编码,不会降级编码,如果删除了元素,intset
的编码只会保持不变。
总结
intset
的特点主要有以下几点:
- 以最小的空间存储整数集合;
- 插入时的时间复杂度较高;
- 通过升级编码来适应更大的整数。
- 只会升级编码,不会降级编码。
skiplist
存储结构
我们先讲 skiplist
。skiplist
是一种有序数据结构,它通过多级索引来加速查找。
具体来讲,skiplist
由多个层级组成,每个层级都是一个有序链表。每个节点都包含了一个指向下一个节点的指针,以及一个指向下一层的指针。这样,我们可以通过上层的指针快速定位到下层的节点。
skiplist
的最底层是一个普通的有序链表。每个较高层都充当下面列表的快速通道。插入时,层 \(i\) 中的元素以某个固定概率 \(p\) 出现在层 \(i+1\) 中( \(p\) 的两个常用值是 \(\frac{1}{2}\) 或 \(\frac{1}{4}\))。平均而言,每个元素出现在 \(\frac{1}{1-p}\) 个列表中,并且最高元素(通常是跳跃列表前面的特殊头元素)出现在所有列表中。跳跃列表包含 \(\log_{1/p}{n}\) 个列表。
skiplist
的搜索从最高层列表的第一个元素开始,横向前进,直到当前元素大于或等于目标值:
- 如果当前元素等于目标值,则已找到
- 如果当前元素大于目标值,或者搜索到达链接列表的末尾,则返回前一个元素并垂直下降到下一层列表后重复该过程
每个链接列表中的预期步数最多为 \(\frac{1}{p}\),因此,搜索的总预期成本为 \(\frac{1}{p}\log_{1/p}{n}\),即 \(O\left(\log{n}\right)\),其中 \(p\) 为常数。通过选择不同的 \(p\) 值,可以实现搜索成本与存储成本的平衡。
在 Redis 中,skiplist
最多有 32 层,可以存储约 \(2^{32}\) 个节点。同时,它取了 \(p = \frac{1}{4}\),这是一个经验值。
展开代码
其实现在 zslRandomLevel
:
int zslRandomLevel(void) {
static const int threshold = ZSKIPLIST_P*RAND_MAX;
int level = 1;
while (random() < threshold)
level += 1;
return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}
skiplist
被定义在 server.h
中:
typedef struct zskiplistNode {
// 成员对象
sds ele;
// 分值
double score;
// 后退指针
struct zskiplistNode *backward;
// 层级数组
struct zskiplistLevel {
// 前进指针
struct zskiplistNode *forward;
// 跨度
unsigned long span;
} level[];
} zskiplistNode;
typedef struct zskiplist {
// 头尾指针
struct zskiplistNode *header, *tail;
// 节点数量
unsigned long length;
// 最大层数
int level;
} zskiplist;
typedef struct zset {
// 字典用于存储元素和分数的映射
dict *dict;
// 跳跃表用于排序
zskiplist *zsl;
} zset;
后退指针用于快速定位到前一个节点,这在我们之前讲到的遍历的过程中很有用。在层级数组中,每个元素包含了前进指针以从后向前逐个访问节点,以及这两个节点之间的跨度。跨度被用于计算某个节点的排名,这样只需要像正常遍历一样即可得到。
插入
skiplist
的插入操作通过 zslInsert
函数实现。
由于 skiplist
存储了跨度这一信息,所以插入操作可能需要更新多个节点的跨度。在插入节点时:
- 首先从上至下搜索插入的位置,当需要往下走时,记录下这个节点,后面需要更新它的跨度
- 接着,随机生成一个层数
- 如果这个层数大于当前表的最高层数,我们就需要更新整个表
- 否则直接插入节点
- 最后,更新之前记录的节点的跨度,并更新后退指针
展开代码
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
// update 数组用于存储每一层需要更新的节点
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
// rank 数组用于存储每一层跨越的节点数
unsigned long rank[ZSKIPLIST_MAXLEVEL];
int i, level;
serverAssert(!isnan(score));
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
// 新到第 i 层,当前跨度为上一层的跨度
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
// 如果还没找到要向下走的节点,就更新跨度并继续向前走
rank[i] += x->level[i].span;
x = x->level[i].forward;
}
// 走到要向下时,存储当前节点,后面需要更新它的跨度
update[i] = x;
}
// 获取一个随机层数
level = zslRandomLevel();
if (level > zsl->level) {
// 如果随机层数大于当前总层数,需要更新整张表
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
zsl->level = level;
}
x = zslCreateNode(level,score,ele);
for (i = 0; i < level; i++) {
// 插入节点
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;
// 更新跨度
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}
// 有些层没有访问到,页需要更新跨度
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}
// 设置后退指针
x->backward = (update[0] == zsl->header) ? NULL : update[0];
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;
// 节点数加一
zsl->length++;
return x;
}
总结
显然,skiplist
的功能平衡树也能完成。相比之下,skiplist
的特点主要有以下几点:
- 指针比平衡树少,节省内存;
- 算法实现难度低,便于维护;
- 但是
skiplist
的缓存命中率不如平衡树高。
我个人看下来,对于 Redis 这种对速度和内存要求都很高的工具来讲,
skiplist
应该是不如平衡树的,但是 Redis 选择了skiplist
,我觉得最大的可能性还是社区的惯性。
ziplist
存储结构
ziplist
是一种紧凑的数据结构,可以存储字符串和整数。
在普通链表的实现下,每个节点都会有一个向前的指针和一个向后的指针,这样会浪费大量的空间。ziplist
通过则省去了这些指针,把所有数据放在一个紧凑的结构中,通过记录前一个节点的长度和当前节点的长度来实现链表上的移动。
其单个条目的定义如下:
typedef struct {
// 当存储的是字符串时,sval 指向字符串
unsigned char *sval;
// slen 为上一个条目的长度
unsigned int slen;
// 当存储的是整数时,lval 为整数值,sval 为 NULL
long long lval;
} ziplistEntry;
zlentry
的定义如下:
typedef struct zlentry {
// 编码前一个条目长度所使用的字节数
unsigned int prevrawlensize;
// 前一个条目的长度
unsigned int prevrawlen;
// 编码当前条目长度所使用的字节数
unsigned int lensize;
// 当前条目的长度
unsigned int len;
// prevrawlensize + lensize
unsigned int headersize;
// 条目的编码方式,ZIP_STR_* 或 ZIP_INT_*
unsigned char encoding;
// 指向条目的指针
unsigned char *p;
} zlentry;
为了将 zlentry
合并到一起,ziplist
整体在内存中按照以下方式存储。
bytes | tail offset | len | entry | entry | entry | end |
---|---|---|---|---|---|---|
32 bits | 32 bits | 16 bits | 8 bits |
-
bytes
:ziplist
的总字节数 -
tail offset
:到达最后一个entry
的偏移量 -
len
:ziplist
中的元素个数 -
entry
:ziplist
中的具体元素 -
end
:结束标志
展开代码
unsigned char *ziplistNew(void) {
// 分配所需的初始字节数,包括头部和结尾标志的大小
unsigned int bytes = ZIPLIST_HEADER_SIZE+ZIPLIST_END_SIZE;
unsigned char *zl = zmalloc(bytes);
// 设置总字节数
ZIPLIST_BYTES(zl) = intrev32ifbe(bytes);
// 设置表尾偏移量,初始值为 32*2+16
ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(ZIPLIST_HEADER_SIZE);
// 设置长度为 0
ZIPLIST_LENGTH(zl) = 0;
// 设置结束标志,固定为 255
zl[bytes-1] = ZIP_END;
return zl;
}
每个 entry
在内存中形如:
prevlen | encode & len | content |
---|---|---|
8/40 bits | 8/16/40 bits |
-
prevlen
:前一个条目的长度,用于向前移动 -
encode & len
:编码和长度,用于向后移动 -
content
:条目的内容
encode & len
很容易得到,它们两个确定了 content
的长度。Redis 将这两者编码在了一起。根据元素类型和长度的不同,有不同的计算方法,来确定放在内存中的值:
- 如果是字符串
- 如果长度 \(\leq 63\),为
00
+ 6 位长度,共占 1 个字节 - 如果长度 \(\leq 16383\),为
01
+ 14 位长度,共占 2 个字节 - 否则为
10xxxxxx
+ 32 位长度,共占 5 个字节
- 如果长度 \(\leq 63\),为
- 如果是整数
- 如果是
int8_t
,为0xfe
+ 8 位整数,共占 2 个字节 - 如果是
int16_t
,为0xc0
+ 16 位整数,共占 3 个字节 - 如果是
int24_t
,为0xf0
+ 24 位整数,共占 4 个字节 - 如果是
int32_t
,为0xd0
+ 32 位整数,共占 5 个字节 - 如果是
int64_t
,为0xe0
+ 64 位整数,共占 9 个字节
- 如果是
展开代码
unsigned int zipStoreEntryEncoding(unsigned char *p, unsigned char encoding, unsigned int rawlen) {
unsigned char len = 1, buf[5];
if (ZIP_IS_STR(encoding)) {
// 如果是字符串
if (rawlen <= 0x3f) {
// 如果长度 <= 63,存储长度需要 6 位,前面再加上 00
if (!p) return len;
buf[0] = ZIP_STR_06B | rawlen;
} else if (rawlen <= 0x3fff) {
// 如果长度 <= 16383,存储长度需要 14 位,前面再加上 01
len += 1;
if (!p) return len;
buf[0] = ZIP_STR_14B | ((rawlen >> 8) & 0x3f);
buf[1] = rawlen & 0xff;
} else {
// 否则存储长度需要 40 位,其中前 8 位为 10000000
len += 4;
if (!p) return len;
buf[0] = ZIP_STR_32B;
buf[1] = (rawlen >> 24) & 0xff;
buf[2] = (rawlen >> 16) & 0xff;
buf[3] = (rawlen >> 8) & 0xff;
buf[4] = rawlen & 0xff;
}
} else {
// 如果是整数
if (!p) return len;
// 预先定义好的 8 位不同类型的编码,其中前 2 位为 11
buf[0] = encoding;
}
memcpy(p,buf,len);
return len;
}
下文如无特殊说明,我们说的“当前条目长度”指的是
encode
和len
这个整体。
而 prevrawlen
则是在添加元素时更新的。其计算方法如下:
- 如果前一个条目的长度 \(< 254\),直接存储长度,共占 1 个字节
- 否则,第一个字节为
0xfe
,后面 4 个字节存储长度,共占 5 个字节
展开代码
unsigned int zipStorePrevEntryLength(unsigned char *p, unsigned int len) {
if (p == NULL) {
// 如果 p 为 NULL,只返回存储长度所需的字节数(1 或 5)
return (len < ZIP_BIG_PREVLEN) ? 1 : sizeof(uint32_t) + 1;
} else {
// 如果 p 不为 NULL,返回存储长度所需的字节数,并将长度存储到 p 中
if (len < ZIP_BIG_PREVLEN) {
// 如果长度小于 254,直接存储
p[0] = len;
return 1;
} else {
// 否则调用 zipStorePrevEntryLengthLarge
return zipStorePrevEntryLengthLarge(p,len);
}
}
}
int zipStorePrevEntryLengthLarge(unsigned char *p, unsigned int len) {
uint32_t u32;
if (p != NULL) {
// 第一个字节设置为 254
p[0] = ZIP_BIG_PREVLEN;
// 剩余 4 个字节存储长度
u32 = len;
memcpy(p+1,&u32,sizeof(u32));
memrev32ifbe(p+1);
}
// 存储长度所需的字节数为 5
return 1 + sizeof(uint32_t);
}
索引
ziplist
记录上一个条目的长度好处是正向反向遍历都很方便。正向遍历时,只需要根据当前条目长度向后移动即可。反向遍历时,只需要根据上一个条目的长度向前移动即可。
利用这个特性,我们可以看看索引的实现:
- 如果索引为正数,从前往后遍历,指针首先指向第一个条目
- 读取当前条目的长度
- 将指针向后移动当前条目的长度
- 如果索引为负数,从后往前遍历,指针首先指向最后一个条目
- 读取前一个条目的长度
- 将指针向前移动前一个条目的长度
展开代码
unsigned char *ziplistIndex(unsigned char *zl, int index) {
unsigned char *p;
unsigned int prevlensize, prevlen = 0;
// 总字节数
size_t zlbytes = intrev32ifbe(ZIPLIST_BYTES(zl));
if (index < 0) {
// 如果索引为负数,从表尾开始向前遍历
// 由于 -1 是最后一个条目,因此需要向前移动 -index-1 个条目
index = (-index)-1;
// 获取表尾条目的地址
p = ZIPLIST_ENTRY_TAIL(zl);
if (p[0] != ZIP_END) {
// 解码前一个条目的长度字节数
ZIP_DECODE_PREVLENSIZE(p, prevlensize);
// 确保前一个条目的长度字节数在范围内
assert(p + prevlensize < zl + zlbytes - ZIPLIST_END_SIZE);
// 解码前一个条目的长度
ZIP_DECODE_PREVLEN(p, prevlensize, prevlen);
while (prevlen > 0 && index--) {
// 移动到前一个条目
p -= prevlen;
assert(p >= zl + ZIPLIST_HEADER_SIZE && p < zl + zlbytes - ZIPLIST_END_SIZE);
// 解码前一个条目的长度
ZIP_DECODE_PREVLEN(p, prevlensize, prevlen);
}
}
} else {
// 如果索引为正数,从表头开始向后遍历
p = ZIPLIST_ENTRY_HEAD(zl);
while (index--) {
// 移动到下一个条目
p += zipRawEntryLengthSafe(zl, zlbytes, p);
if (p[0] == ZIP_END)
break;
}
}
if (p[0] == ZIP_END || index > 0)
return NULL;
zipAssertValidEntry(zl, zlbytes, p);
return p;
}
可以看到,通过记录上一个条目的长度,ziplist
在实现了双向遍历的功能。
插入和连锁更新
我们来看插入操作的实现。
插入操作需要根据插入位置,将后续的条目向后移动。然而,由于下一个条目的 prevlen
此时要记录当前条目的长度,其所占据的字节数可能会发生变化。因此,插入操作的流程是这样的:
- 指针指向插入位置的前一个条目
- 得到前一个条目的长度,由此可以确定要插入条目的
prevlen
- 根据条目的内容计算当前条目的长度,至此,要插入的条目内容和长度都确定了
- 得到插入位置的后一个条目原本的
prevlen
,检查一下更新它的值后,所需的字节数是否发生变化 - 结合要插入条目的长度和后一个条目的
prevlen
所需字节数的变化,计算出整体需要移动的字节数 - 将插入位置后的条目向后移动
- 更新后一个条目的
prevlen
- 更新
tail offset
- 插入当前条目
展开代码
unsigned char *__ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) {
/* 在 zl 中的 p 位置插入一个新的条目,内容为 s,长度为 slen */
// curlen 为当前 zl 的总字节数
size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), reqlen, newlen;
unsigned int prevlensize, prevlen = 0;
size_t offset;
int nextdiff = 0;
unsigned char encoding = 0;
long long value = 123456789;
zlentry tail;
if (p[0] != ZIP_END) {
// 如果插入位置不是表尾,获取前一个条目的长度
ZIP_DECODE_PREVLEN(p, prevlensize, prevlen);
} else {
// 如果插入位置是表尾,获取尾部元素的实际地址
unsigned char *ptail = ZIPLIST_ENTRY_TAIL(zl);
if (ptail[0] != ZIP_END) {
// 获取尾部元素的长度
prevlen = zipRawEntryLengthSafe(zl, curlen, ptail);
}
}
if (zipTryEncoding(s,slen,&value,&encoding)) {
// 如果是整数,转换为整数编码
reqlen = zipIntSize(encoding);
} else {
// 如果是整数,计算所需的字节数
reqlen = slen;
}
// 计算当前条目开头所需的总字节数
// 计算存储前一个条目长度所需的字节数
reqlen += zipStorePrevEntryLength(NULL,prevlen);
// 计算存储当前条目长度所需的字节数
reqlen += zipStoreEntryEncoding(NULL,encoding,slen);
int forcelarge = 0;
// 如果插入位置不是表尾,计算当前条目长度 - 下一个条目 prevlen 占据空间,可能的值为 -4、0、4
nextdiff = (p[0] != ZIP_END) ? zipPrevLenByteDiff(p,reqlen) : 0;
if (nextdiff == -4 && reqlen < 4) {
// 如果下一个条目的 prevlen 占据了 5 字节,但当前条目长度为 1 字节,需要强制使用 5 字节
nextdiff = 0;
forcelarge = 1;
}
// 所以,就是要么不变,要么加 4
// 存储偏移量,因为 realloc 可能会改变 zl 的地址
offset = p-zl;
// 计算新的总字节数
newlen = curlen+reqlen+nextdiff;
// 重新分配空间
zl = ziplistResize(zl,newlen);
// 重新获取插入位置
p = zl+offset;
if (p[0] != ZIP_END) {
// 如果插入位置不是表尾,需要移动后续条目
// 移动的源地址为插入位置 - nextdiff
// 移动的目标地址为插入位置 + 当前条目所需的总字节数
// 移动的内容长度为当前 zl 的总字节数 - 插入位置的偏移量 - 1 + nextdiff
memmove(p+reqlen,p-nextdiff,curlen-offset-1+nextdiff);
if (forcelarge)
// 如果下一个条目记录的 prevlen 所需的字节数从 1 变为了 5,需要重新计算并存储
zipStorePrevEntryLengthLarge(p+reqlen,reqlen);
else
// 否则按照正常流程计算并存储
zipStorePrevEntryLength(p+reqlen,reqlen);
// 更新 tail offset 为原来的值 + 当前条目所需的总字节数
ZIPLIST_TAIL_OFFSET(zl) =
intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+reqlen);
assert(zipEntrySafe(zl, newlen, p+reqlen, &tail, 1));
if (p[reqlen+tail.headersize+tail.len] != ZIP_END) {
// 如果当前表尾位置不是 ZIP_END(即 forcelarge 了),需要再加上 nextdiff
ZIPLIST_TAIL_OFFSET(zl) =
intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff);
}
} else {
// 如果插入位置是表尾,只需要更新 tail offset 为新的位置
ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(p-zl);
}
// 如果 nextdiff 不为 0,需要进行连锁更新
if (nextdiff != 0) {
offset = p-zl;
zl = __ziplistCascadeUpdate(zl,p+reqlen);
p = zl+offset;
}
// 插入新条目
p += zipStorePrevEntryLength(p,prevlen);
p += zipStoreEntryEncoding(p,encoding,slen);
if (ZIP_IS_STR(encoding)) {
memcpy(p,s,slen);
} else {
zipSaveInteger(p,value,encoding);
}
ZIPLIST_INCR_LENGTH(zl,1);
return zl;
}
然而,后一个条目 prevlen
所占空间的变化可能导致连锁更新。
现在考虑这样一种情况:本来条目 B 的总长度为 253,现在在它前面插入了一个长度为 10086 的条目 A,导致 B 的 prevlen
占据的空间由 1 变为 5,B 占据的总空间由 253 变为了 257。
这导致 B 的下个条目 C 的 prevlen
占据的空间也要从 1 变为 5,它也要进行更新,连锁更新就发生了。
在最坏情况下,这种更新会一直传递下去,直到表尾,导致插入操作的时间复杂度会变为 \(O(n^2)\)。
展开代码
连锁更新代码:
unsigned char *__ziplistCascadeUpdate(unsigned char *zl, unsigned char *p) {
zlentry cur;
size_t prevlen, prevlensize, prevoffset;
size_t firstentrylen;
size_t rawlen, curlen = intrev32ifbe(ZIPLIST_BYTES(zl));
size_t extra = 0, cnt = 0, offset;
size_t delta = 4;
unsigned char *tail = zl + intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl));
// 空表直接返回
if (p[0] == ZIP_END) return zl;
// 获取当前条目信息
zipEntry(p, &cur);
firstentrylen = prevlen = cur.headersize + cur.len;
prevlensize = zipStorePrevEntryLength(NULL, prevlen);
prevoffset = p - zl;
p += prevlen;
// 遍历后续条目,检查是否需要连锁更新
while (p[0] != ZIP_END) {
assert(zipEntrySafe(zl, curlen, p, &cur, 0));
// 如果当前条目的 prevlen 已经正确,则无需更新
if (cur.prevrawlen == prevlen) break;
// 如果当前条目的 prevlen 大小足够存储新的 prevlen
if (cur.prevrawlensize >= prevlensize) {
if (cur.prevrawlensize == prevlensize) {
zipStorePrevEntryLength(p, prevlen);
} else {
zipStorePrevEntryLengthLarge(p, prevlen);
}
break;
}
// 确保 prevlen 的一致性
assert(cur.prevrawlen == 0 || cur.prevrawlen + delta == prevlen);
// 更新 prevlen 和 prevlensize
rawlen = cur.headersize + cur.len;
prevlen = rawlen + delta;
prevlensize = zipStorePrevEntryLength(NULL, prevlen);
prevoffset = p - zl;
p += rawlen;
extra += delta;
cnt++;
}
// 如果没有额外空间需求,直接返回
if (extra == 0) return zl;
// 更新表尾偏移量
if (tail == zl + prevoffset) {
if (extra - delta != 0) {
ZIPLIST_TAIL_OFFSET(zl) =
intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+extra-delta);
}
} else {
ZIPLIST_TAIL_OFFSET(zl) =
intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+extra);
}
// 调整 ziplist 大小
offset = p - zl;
zl = ziplistResize(zl, curlen + extra);
p = zl + offset;
memmove(p + extra, p, curlen - offset - 1);
p += extra;
// 处理连锁更新
while (cnt) {
zipEntry(zl + prevoffset, &cur);
rawlen = cur.headersize + cur.len;
memmove(p - (rawlen - cur.prevrawlensize),
zl + prevoffset + cur.prevrawlensize,
rawlen - cur.prevrawlensize);
p -= (rawlen + delta);
if (cur.prevrawlen == 0) {
zipStorePrevEntryLength(p, firstentrylen);
} else {
zipStorePrevEntryLength(p, cur.prevrawlen+delta);
}
prevoffset -= cur.prevrawlen;
cnt--;
}
return zl;
}
尽管连锁更新发生的概率很小,但是一旦发生,就会导致大量的数据移动,从而影响性能。
总结
其它的操作,如删除、查找等,都是类似的。这里不再赘述。
ziplist
的优缺点也很明显:
- 存储结构紧凑,节省空间;
- 通过记录前一个节点的长度,实现了双向遍历;
- 插入、删除操作可能会导致连锁更新,影响性能。
listpack
存储结构
我们反思一下 ziplist
的问题。ziplist
的连锁更新问题是由于每个节点都包含了前一个节点的长度,而前一个节点长度所占据的空间大小可能会发生变化。
要想彻底解决这个问题,就不能记录前一个节点的长度————于是 listpack
诞生了。
listapck
单个条目的定义和 ziplist
略有区别:
typedef struct {
// 当存储的是字符串时,sval 指向字符串
unsigned char *sval;
// slen 为当前字符串长度
uint32_t slen;
// 当存储的是整数时,lval 为整数值,sval 为 NULL
long long lval;
} listpackEntry;
一个 listpack
在内存中的结构如下:
bytes | len | entry | entry | entry | end |
---|---|---|---|---|---|
32 bits | 16 bits | 8 bits |
-
bytes
:listpack
的总字节数 -
len
:listpack
中的元素个数 -
entry
:listpack
中的具体元素 -
end
:结束标志
展开代码
我们直接来看 listpack
的初始化:
unsigned char *lpNew(size_t capacity) {
// 预先分配至少 6+1 个字节的空间
unsigned char *lp = lp_malloc(capacity > LP_HDR_SIZE+1 ? capacity : LP_HDR_SIZE+1);
if (lp == NULL) return NULL;
// 将 32 位整数 6+1 存储到 lp 中,表示当前的总字节数
lpSetTotalBytes(lp,LP_HDR_SIZE+1);
// 将 16 位整数 0 接着存储到后面,表示当前的条目数
lpSetNumElements(lp,0);
// 后面再填充 8 位的 0xFF
lp[LP_HDR_SIZE] = LP_EOF;
return lp;
}
在内存中,每个 entry
的结构如下:
encode & len | content |
---|---|
8/16/40 bits |
-
encode & len
:编码和长度,用于向后移动 -
content
:条目的内容
尽管 Redis 代码中定义了
listpackEntry
,但是实际上并没有使用。
可以看到,listpack
没有记录前一个节点的长度,因此不会出现连锁更新的问题。
索引
对于正向遍历,listpack
和 ziplist
没有太大区别。
但是问题来了,对于反向遍历,listpack
由于不记录前一个节点的长度,怎么办呢?解决这个事情也不难:
- 将指针指向前一个节点的最后一个字节
- 计算得到前一个节点的长度
- 将指针移动到前一个节点的第一个字节
展开代码
unsigned char *lpPrev(unsigned char *lp, unsigned char *p) {
assert(p);
if (p-lp == LP_HDR_SIZE) return NULL;
// 将 p 指向前一个条目的最后一个字节
p--;
// 计算前一个条目的长度
uint64_t prevlen = lpDecodeBacklen(p);
prevlen += lpEncodeBacklenBytes(prevlen);
// 移动到前一个条目的第一个字节
p -= prevlen-1;
lpAssertValidEntry(lp, lpBytes(lp), p);
return p;
}
这里的关键在于如何计算前一个条目长度。listpack
采用了一种变长编码的方式:
- 每个字节低 7 位存储数据
- 每个字节高 1 位指示是否为最后一个字节
- 如果为 0,说明是最后一个字节
- 如果为 1,说明后面还有字节
也就是说,listpack
每个节点在内存中类似 0xxxxxxx 1xxxxxxx 1xxxxxxx ...
。
展开代码
static inline uint64_t lpDecodeBacklen(unsigned char *p) {
uint64_t val = 0;
uint64_t shift = 0;
do {
// 获取当前字节的低 7 位,存入 val
val |= (uint64_t)(p[0] & 127) << shift;
// 如果当前字节的高 1 位为 0,说明是最后一个字节
if (!(p[0] & 128)) break;
// 否则,继续读取下一个字节
shift += 7;
p--;
// 如果 shift 大于 28,说明超过了 5 个字节,报错
if (shift > 28) return UINT64_MAX;
} while(1);
return val;
}
插入、删除和替换
listpack
的插入、删除和替换操作都写在了同一个函数中。
例如插入,只需要:
- 计算插入位置的偏移量
- 计算新的总字节数
- 重新分配空间
- 将插入位置后的条目向后移动
- 插入当前条目
这个过程和插入排序几乎一模一样,不再发生连锁更新。
展开代码
unsigned char *lpInsert(unsigned char *lp, unsigned char *elestr, unsigned char *eleint,
uint32_t size, unsigned char *p, int where, unsigned char **newp)
{
/* elestr/eleint 为要插入或替换的内容,size 为编码后的长度
* where 指示要新元素的位置,包括 LP_BEFORE / LP_AFTER / LP_REPLACE
* newp 返回操作后元素的位置 */
// 用于存储编码后的整数
unsigned char intenc[LP_MAX_INT_ENCODING_LEN];
// 用于存储前一个元素的长度
unsigned char backlen[LP_MAX_BACKLEN_SIZE];
// 编码所需的字节数
uint64_t enclen;
// 判断是否为删除操作
int delete = (elestr == NULL && eleint == NULL);
// 如果是删除操作,强制将 where 设置为 LP_REPLACE
if (delete) where = LP_REPLACE;
// 如果需要在当前元素之后插入,跳到下一个元素并将位置参数设置为在之前插入
if (where == LP_AFTER) {
p = lpSkip(p);
where = LP_BEFORE;
ASSERT_INTEGRITY(lp, p);
}
// 计算插入位置的偏移量,方便在重新分配空间后定位
unsigned long poff = p-lp;
int enctype;
if (elestr) {
// 如果要插入的是字符串,计算编码方式,并将 enclen 设置为字符串长度 + 编码长度
enctype = lpEncodeGetType(elestr,size,intenc,&enclen);
// 发现有大聪明把整数传给了 elestr
if (enctype == LP_ENCODING_INT) eleint = intenc;
} else if (eleint) {
// 如果要插入的是整数,直接设置编码方式
enctype = LP_ENCODING_INT;
enclen = size;
} else {
// 如果什么也不是,说明是删除操作
enctype = -1;
enclen = 0;
}
// 将 enclen 存入 backlen,并返回后退步数的编码所需字节数
unsigned long backlen_size = (!delete) ? lpEncodeBacklen(backlen,enclen) : 0;
// 计算总字节数
uint64_t old_listpack_bytes = lpGetTotalBytes(lp);
uint32_t replaced_len = 0;
if (where == LP_REPLACE) {
// 需要替换的长度 + 该长度的编码所需字节数
replaced_len = lpCurrentEncodedSizeUnsafe(p);
replaced_len += lpEncodeBacklenBytes(replaced_len);
ASSERT_INTEGRITY_LEN(lp, p, replaced_len);
}
// 计算新的总字节数:原总字节数 + 新值长度 + 编码长度 + 后退步数的编码长度 - 被替换元素长度
uint64_t new_listpack_bytes = old_listpack_bytes + enclen + backlen_size
- replaced_len;
if (new_listpack_bytes > UINT32_MAX) return NULL;
unsigned char *dst = lp + poff;
if (new_listpack_bytes > old_listpack_bytes &&
new_listpack_bytes > lp_malloc_size(lp)) {
// 如果需要扩容,重新分配空间
if ((lp = lp_realloc(lp,new_listpack_bytes)) == NULL) return NULL;
dst = lp + poff;
}
if (where == LP_BEFORE) {
// 如果是在当前元素之前插入,需要将后续元素向后移动
// 移动的源地址为当前元素的地址
// 移动的目标地址为当前元素的地址 + 新值长度 + 编码长度 + 后退步数的编码长度
// 移动的内容长度为原总字节数 - 插入位置的偏移量
memmove(dst+enclen+backlen_size,dst,old_listpack_bytes-poff);
} else {
// 如果是替换操作
// 移动的源地址为当前元素的地址 + 需要替换的长度 + 该长度的编码所需字节数
// 移动的目标地址为当前元素的地址 + 新值长度 + 编码长度 + 后退步数的编码长度
// 移动的内容长度为原总字节数 - 插入位置的偏移量 - 需要替换的长度 - 该长度的编码所需字节数
memmove(dst+enclen+backlen_size,
dst+replaced_len,
old_listpack_bytes-poff-replaced_len);
}
// 如果新的总字节数小于原总字节数,需要释放多余的空间
if (new_listpack_bytes < old_listpack_bytes) {
if ((lp = lp_realloc(lp,new_listpack_bytes)) == NULL) return NULL;
dst = lp + poff;
}
// 更新 newp,如果是删除操作,将 newp 设置为 NULL
if (newp) {
*newp = dst;
if (delete && dst[0] == LP_EOF) *newp = NULL;
}
if (!delete) {
// 如果不是删除操作,插入新值
if (enctype == LP_ENCODING_INT) {
memcpy(dst,eleint,enclen);
} else if (elestr) {
lpEncodeString(dst,elestr,size);
} else {
redis_unreachable();
}
dst += enclen;
memcpy(dst,backlen,backlen_size);
dst += backlen_size;
}
// 更新头部信息
if (where != LP_REPLACE || delete) {
uint32_t num_elements = lpGetNumElements(lp);
if (num_elements != LP_HDR_NUMELE_UNKNOWN) {
if (!delete)
lpSetNumElements(lp,num_elements+1);
else
lpSetNumElements(lp,num_elements-1);
}
}
lpSetTotalBytes(lp,new_listpack_bytes);
return lp;
}
可以看到,listpack
的插入、删除和替换操作都是 \(O(n)\) 的,但是由于不需要记录前一个节点的长度,不会发生连锁更新,因此性能会更好。
总结
listpack
这样把插入、删除和替换写进同一个函数的做法还是挺有意思的,不过 Redis 在 ziplist
实现时并没有这样做,导致还得重读代码才能理解。
listpack
的特点如下:
- 插入、删除和替换操作不会发生连锁更新,性能更好;
- 由于不记录前一个节点的长度,反向遍历没有那么直接,需要额外的计算;
- 去掉了一个字段,节省了空间;但每个字节都有 1 位用于标记是否为最后一个字节,又消耗掉了一点空间。
quicklist
存储结构
为了缓解 ziplist
连锁更新的问题,Redis 引入了 quicklist
。
quicklist
是一个双向链表,每个节点都包含了一个 ziplist
。这样一来,单个 ziplist
发生连锁更新时,影响范围将会大大减小。
不过,从 Redis 7.0 开始,quicklist
变成了一个 listpack
的双向链表。这样一来,quicklist
的优点就更加明显了,它索引的速度将会更快。
我们先来看双向链表。双向链表的定义为:
typedef struct listNode {
// 前指针
struct listNode *prev;
// 后指针
struct listNode *next;
// 值
void *value;
} listNode;
typedef struct list {
// 头指针
listNode *head;
// 尾指针
listNode *tail;
// 操作函数
void *(*dup)(void *ptr);
void (*free)(void *ptr);
int (*match)(void *ptr, void *key);
// 长度
unsigned long len;
} list;
总体看下来就是最常规的链表实现,没有什么特别的地方。我们就不详细讨论了。
接下来来看 quicklist
。quicklist
的定义为:
typedef struct quicklist {
// 指向第一个 quicklistNode 节点的指针
quicklistNode *head;
// 指向最后一个 quicklistNode 节点的指针
quicklistNode *tail;
// 所有 listpack 中的条目总数
unsigned long count;
// quicklist 节点总数
unsigned long len;
// 填充因子,用于控制每个 quicklistNode 中 listpack 的大小
signed int fill : QL_FILL_BITS;
// 压缩深度,表示从两端开始有多少个 quicklistNode 节点不压缩
unsigned int compress : QL_COMP_BITS;
// 书签数量,用于快速定位 quicklistNode 节点
unsigned int bookmark_count: QL_BM_BITS;
// 书签数组,用于存储书签信息
quicklistBookmark bookmarks[];
} quicklist;
quicklist
由多个 quicklistNode
组成,其定义为:
typedef struct quicklistNode {
// 指向前一个 quicklistNode 节点
struct quicklistNode *prev;
// 指向后一个 quicklistNode 节点
struct quicklistNode *next;
// 指向 listpack 的指针
unsigned char *entry;
// listpack 的总字节数
size_t sz;
// listpack 中的条目数
unsigned int count : 16;
// 是否压缩:0 为不压缩,1 为压缩
unsigned int encoding : 2;
// 容器类型:1 表示普通节点,2 表示压缩节点
unsigned int container : 2;
// 是否重新压缩:1 表示该节点之前被压缩过
unsigned int recompress : 1;
// 是否尝试压缩:1 表示该节点无法压缩,因为数据太小
unsigned int attempted_compress : 1;
// 是否禁止压缩:1 表示防止压缩将来会使用的条目
unsigned int dont_compress : 1;
// 额外的 9 位,用于将来扩展使用
unsigned int extra : 9;
} quicklistNode;
我们注意到 quicklist
中有一个 fill
字段,这个字段用于控制每个 quicklistNode
中 listpack
的大小。
每当插入一个新元素时,都会检查对应位置 quicklistNode
的 listpack
是否已经满了,如果满了就新建一个 quicklistNode
。这样一来,既不至于 listpack
过大,也不至于 quicklistNode
过多。
插入
quicklist
的插入代码支持了在指定 listpack 节点的前面或者后面插入元素。但是这代码又臭又长,这里就不粘进来写注释了。它大概做了这些事情:
- 如果没有可供插入的 listpack 节点,新建一个,并将新元素插入其中
- 然后判断了几个条件:
- 当前 listpack 节点是否已满
- 如果在后方插入,判断一下下一个 listpack 节点是否已满
- 如果在前方插入,判断一下上一个 listpack 节点是否已满
- 如果元素太大:
- 如果当前节点的
container
使用了QUICKLIST_NODE_CONTAINER_PLAIN
,直接插入为新节点 - 否则,调用拆分函数,将当前节点拆分为两个节点,然后插入新节点
- 如果当前节点的
- 接下来就是真正的插入操作了
- 如果当前节点没满且在尾部插入,则在当前节点的当前位置之后插入
- 如果当前节点没满且在头部插入,则在当前节点的当前位置之前插入
- 如果当前节点满了且在尾部插入且下一个节点没满,则在下一个节点的头部插入
- 如果当前节点满了且在头部插入且上一个节点没满,则在上一个节点的尾部插入
- 如果当前节点满了且相邻节点都满了,则新建一个节点,然后插入
- 如果当前节点已满且需要拆分节点,则拆分当前节点并插入新的节点
我觉得这个函数从逻辑上来讲是很顺畅的,但是代码量太大了,不高兴粘贴过来写注释了 :P
总结
quicklist
的优点如下:
- 通过双向链表的方式,解决了
ziplist
连锁更新的问题; - Redis 7.0 开始,用
listpack
替换ziplist
,进一步减少空间浪费,加快操作速度。
HYPERLOGLOG
基数估计
Cardinality,中文基数,表示的是一个集合中不重复元素的个数。在 Redis 中,HYPERLOGLOG
就是用来估计基数的。
按照常规想法,我们可能会用一个集合来存储所有的元素;每当出现一个新元素,先查找是否已经存在,如果不存在则插入。这样一来,集合中的元素个数就是基数。但是这种方法的空间复杂度是 \(O(n)\) 的,当元素个数很大时,会占用大量内存。
HYPERLOGLOG
使用的算法来源于 Heule 等人1和 P. Flajolet 等人2,它只需要占用固定大小的内存(12 KB + 16 B),就可以在 \(O(1)\) 的时间复杂度内估计基数,且误差率在 \(0.81\%\)。
它的原理可以化简为抛硬币问题。我只需要问你:最多有多少次是连续正面朝上的?就可以估计出抛硬币的总次数。这个估计值也许存在偶然性,但是在大量数据下,这个估计值是相对准确的。
我们还可以改进一下,给你 10 枚硬币去抛,让你分别记录这 10 枚硬币中连续正面朝上的最大次数。这样一来,我们估计得到的结果会更加准确。
HYPERLOGLOG
的原理是这样的:它将每个新元素通过哈希函数映射到一个固定长度的二进制串,其可用于索引到对应的寄存器(对应了分别记录不同的硬币),然后统计这些二进制串中前导 0 的最大长度(对应了硬币连续正面朝上的次数)。
具体来讲,HYPERLOGLOG
有 \(16384\)(\(2^{14}\)) 个寄存器,每个寄存器存储着 \(0-51\),共计 12 KB。
Redis 使用了 64 位的 MurmurHash2 哈希算法,将元素哈希到一个 64 位的整数。
展开查看 MurmurHash2 算法实现
uint64_t MurmurHash64A (const void * key, int len, unsigned int seed) {
const uint64_t m = 0xc6a4a7935bd1e995;
const int r = 47;
uint64_t h = seed ^ (len * m);
const uint8_t *data = (const uint8_t *)key;
const uint8_t *end = data + (len-(len&7));
while(data != end) {
uint64_t k;
#if (BYTE_ORDER == LITTLE_ENDIAN)
#ifdef USE_ALIGNED_ACCESS
memcpy(&k,data,sizeof(uint64_t));
#else
k = *((uint64_t*)data);
#endif
#else
k = (uint64_t) data[0];
k |= (uint64_t) data[1] << 8;
k |= (uint64_t) data[2] << 16;
k |= (uint64_t) data[3] << 24;
k |= (uint64_t) data[4] << 32;
k |= (uint64_t) data[5] << 40;
k |= (uint64_t) data[6] << 48;
k |= (uint64_t) data[7] << 56;
#endif
k *= m;
k ^= k >> r;
k *= m;
h ^= k;
h *= m;
data += 8;
}
switch(len & 7) {
case 7: h ^= (uint64_t)data[6] << 48; /* fall-thru */
case 6: h ^= (uint64_t)data[5] << 40; /* fall-thru */
case 5: h ^= (uint64_t)data[4] << 32; /* fall-thru */
case 4: h ^= (uint64_t)data[3] << 24; /* fall-thru */
case 3: h ^= (uint64_t)data[2] << 16; /* fall-thru */
case 2: h ^= (uint64_t)data[1] << 8; /* fall-thru */
case 1: h ^= (uint64_t)data[0];
h *= m; /* fall-thru */
};
h ^= h >> r;
h *= m;
h ^= h >> r;
return h;
}
然后取这个整数的低 14 位作为寄存器的索引,取剩下的 50 位中前导 0 的长度 + 1 作为该元素的值。如果这个值大于寄存器中的值,则更新寄存器中的值。换句话说,寄存器中存储的是落到其中的所有元素的最大前导 0 的长度。
那么,如何根据这些寄存器中的值来估计基数呢?这里用到了一个算法。假设寄存器中的值位 \(C=\left\{c_0,c_1,\cdots,c_{q+1}\right\}\),那么估计的基数为:
\[\begin{aligned} &z \leftarrow m \cdot \tau\left(1 - C_{q+1} / m\right)\\ &\text{for } k \leftarrow q \text{ to } 1 \text{ do} \\ &\quad\quad z \leftarrow 0.5 \times \left(z + C_k\right)\\ &z \leftarrow z + m \cdot \sigma\left(C_0 / m\right)\\ &\text{return } \alpha_{\infty} \cdot m^2 / z \end{aligned}\]展开查看算法实现
其实现为:
uint64_t hllCount(struct hllhdr *hdr, int *invalid) {
double m = HLL_REGISTERS;
double E;
int j;
int reghisto[64] = {0};
if (hdr->encoding == HLL_DENSE) {
hllDenseRegHisto(hdr->registers,reghisto);
} else if (hdr->encoding == HLL_SPARSE) {
hllSparseRegHisto(hdr->registers,
sdslen((sds)hdr)-HLL_HDR_SIZE,invalid,reghisto);
} else if (hdr->encoding == HLL_RAW) {
hllRawRegHisto(hdr->registers,reghisto);
} else {
serverPanic("Unknown HyperLogLog encoding in hllCount()");
}
double z = m * hllTau((m-reghisto[HLL_Q+1])/(double)m);
for (j = HLL_Q; j >= 1; --j) {
z += reghisto[j];
z *= 0.5;
}
z += m * hllSigma(reghisto[0]/(double)m);
E = llroundl(HLL_ALPHA_INF*m*m/z);
return (uint64_t) E;
}
其中:
- \(m = 2^{14}\),即寄存器的个数
#define HLL_P 14 #define HLL_REGISTERS (1<<HLL_P)
- \(\tau(x)\) 是估计函数,定义为 \(\tau(x) = 1 - x - \frac{(1-\sqrt[2]{x})^2}{2} - \frac{(1-\sqrt[4]{x})^2}{4} - \frac{(1-\sqrt[8]{x})^2}{8} - \cdots\)
double hllTau(double x) { if (x == 0. || x == 1.) return 0.; double zPrime; double y = 1.0; double z = 1 - x; do { x = sqrt(x); zPrime = z; y *= 0.5; z -= pow(1 - x, 2)*y; } while(zPrime != z); return z / 3; }
- \(\sigma(x)\) 是估计函数,定义为 \(\sigma(x) = x + x^2 + 2x^4 + 4x^8 + 8x^{16} + \cdots\)
double hllSigma(double x) { if (x == 1.) return INFINITY; double zPrime; double y = 1; double z = x; do { x *= x; zPrime = z; z += x * y; y += y; } while(zPrime != z); return z; }
- \(\alpha_{\infty}=0.5\ln2\) 是一个常数,用于调整估计值
#define HLL_ALPHA_INF 0.721347520444481703680
由于寄存器数量固定,因此这个算法是 \(O(1)\) 的。
存储结构
HYPERLOGLOG
本质上也是一个字符串,但它有着自己的一套存储结构。它分为 dense
和 sparse
两种存储结构,分别对应于密集型和稀疏型的情况。
我们先来看 dense
结构。
它有一个 16 字节的头部,用于存储一些元信息:
+------+---+-----+----------+
| HYLL | E | N/U | Cardin. |
+------+---+-----+----------+
- 前 4 个字节是
HYLL
,这是个 Magic Number,用于标识这是一个HYPERLOGLOG
结构; - 接下来 1 个字节用于标识是
dense
还是sparse
结构; - 再接下来 3 个字节暂未使用;
- 最后 8 个字节是以小端存储的 64 位整数,用于存储最后一次计算的基数。基数最高字节的最高位指示了是否被修改过,如果没有修改,则可以重复使用。
它的每个寄存器条目为 6 位,其格式如下:
+--------+--------+--------+------//
|11000000|22221111|33333322|554444//
+--------+--------+--------+------//
稀疏表示
由于存储的寄存器内容可能包含大量的 0,因此可以使用稀疏表示来存储。稀疏表示使用游程长度编码来编码寄存器的内容。游程长度编码是一种压缩算法,它将连续的重复数据压缩为一个数据和一个长度:
-
00xxxxxx
表示长度为 \(1-64\) 个连续的值为 0 的寄存器。注意,xxxxxx
为实际长度减一; -
01xxxxxx yyyyyyyy
表示长度为 \(0-16384\) 个连续的值为 0 的寄存器。注意,xxxxxxyyyyyyyy
为实际长度加一; -
1vvvvvxx
表示 \(1-32\) 的值重复了 \(1-4\) 次。注意,vvvvv
和xx
均为实际值减一。
我们注意到,我们无法表示一个大于 32 的重复值,这时因为这种情况出现的概率很小,因此可以忽略。
密集表示占用内存更大,但处理速度更快;稀疏表示占用内存更小,但处理速度更慢。因此,我们可以通过修改 hll-sparse-max-bytes
来决定何时使用稀疏表示。它的默认值是 3000,即计算得到的基数大于 3000 时使用稀疏表示。
为了加快速度,Redis 对 HYPERLOGLOG
的实现非常的炫技。它大量使用了宏定义和位运算技巧,并且几乎没有使用任何的分支语句。
stream
我们有这一项只是占个坑,但我不太想看它的代码了。
我们前面讲到,Redis 的 stream
相比于 Kafka、RabbitMQ 等消息队列,功能上有所欠缺。我们以后将这些专业消息队列时,再看看它们的实现。
Comments