本文将会从源码层面解读 Redis 的各项机制。我们会从最基础的内容开始,尽量只看 Redis 最核心的部分,而剥离掉一些不太重要的内容,所有的源码都给出了 GitHub 的链接。
写作本文时,Redis CE 还在 7.4 版本;Redis 8.0 仍然在 Pre-Release。因此,本文主要基于 Redis 7.4,其余版本可能有所不同。本文使用的环境为 Debian 12。
简单来讲,Redis 是一个内存优先的 KV NoSQL。它有这么几个优点:
- 速度快:Redis 是基于内存的,所以速度非常快。
- 支持多种 Redis 对象:Redis 支持多种 Redis 对象,包括字符串、列表、集合、有序集合、哈希表等。
- 原子化操作:Redis 支持原子化操作,可以保证多个操作的原子性。
安装与启动
安装 Redis 很简单,直接用包管理器安装即可。例如,在 Debian 上:
sudo apt install redis-server
Windows 可以使用 WSL,也可以直接下载 Windows 版本的 Redis。值得注意的是,微软维护的版本早就不更新了,我看到 Chocolatey 上的版本是最新的,可以尝试安装:
sudo choco install redis
redis-server
安装完成后,可以使用 redis-cli
进行连接:
redis-cli
并且可以看一下是否正常:
$ ping
PONG
如果出问题了,可以看一下 Redis 是否启动了:
sudo service redis-server status
sudo service redis-server start
Redis 支持的所有命令及文档可以在这里查看。
Redis 对象
数据在 Redis 内的存储方式多种多样,分为以下几种:
char *strEncoding(int encoding) {
switch(encoding) {
case OBJ_ENCODING_RAW: return "raw";
case OBJ_ENCODING_INT: return "int";
case OBJ_ENCODING_HT: return "hashtable";
case OBJ_ENCODING_QUICKLIST: return "quicklist";
case OBJ_ENCODING_LISTPACK: return "listpack";
case OBJ_ENCODING_LISTPACK_EX: return "listpackex";
case OBJ_ENCODING_INTSET: return "intset";
case OBJ_ENCODING_SKIPLIST: return "skiplist";
case OBJ_ENCODING_EMBSTR: return "embstr";
case OBJ_ENCODING_STREAM: return "stream";
default: return "unknown";
}
}
但是,根据存储内容和操作的不同,Redis 对外暴露了几种 Redis 对象:
Redis 对象 | 存储内容 | 结构读写能力 | 内部实现 |
---|---|---|---|
STRING | 字符串、整数、浮点数 | 对整个字符串或其部分进行操作 自增/减整数和浮点数 |
int embstr sds
|
LIST | 链表 | 从两端压入/弹出键值对 根据偏移量修剪 读取一个或多个键值对 按值查找或删除键值对 |
quicklist listpack
|
HASH | 哈希表 | 增加/删除/查询键值对 获取所有键值对 | listpack |
SET | 集合 | 增加/删除/查询键值对 检查元素是否存在 计算交集/并集/差集 随机获取元素 |
intset listpack hashtable
|
ZSET | 有序的字符串-浮点数对集合 | 增加/删除/查询键值对 获取所有键值对 按分数范围查找键值对 |
listpack skiplist
|
此外,还有一些后来加入的 Redis 对象:
Redis 对象 | 应用场景 | 加入版本 | 内部实现 |
---|---|---|---|
BITMAP | 位图 | 2.2 | 字符串 |
HYPERLOGLOG | 基数统计 | 2.8 | 字符串 |
GEO | 地理位置 | 3.2 | ZSET |
STREAM | 消息队列 | 5.0 | stream |
此外,Redis 还支持 MODULE
,可以自定义对象。
当然,在未发布的 Redis 8.0 中,增加了 JSON
、TIME_SERIES
等对象。它们基本都是从 MODULE
转正的。等到正式发布后,我们再来介绍。
STRING
基本操作
最基本的命令就是 SET
和 GET
,其用法为 SET key value
和 GET key
:
$ SET name ch3nyang
OK
$ GET name
"ch3nyang"
当 value
为字符串时,最大长度为 512MB。
可以使用 DEL
删除键值对:
$ DEL name
(integer) 1
$ GET name
(nil)
这里,返回的 (integer) 1
表示操作的键中的元素个数。
对于大 KEY,还可以使用 unlink
来异步删除,使其不会阻塞主线程。
在 Redis 中,STRING
也可以存储整数和浮点数。对于整数,可以使用 INCR
和 DECR
自增和自减:
$ SET count 1
OK
$ INCR count
(integer) 2
$ DECR count
(integer) 1
$ INCRBY count 5
(integer) 6
对于浮点数,可以使用 INCRBYFLOAT
自增浮点数:
$ SET price 10.5
OK
$ INCRBYFLOAT price 1.2
"11.7"
内部实现
在 Redis 内部,根据值的不同,STRING
有四种编码方式:
-
shared.integers
:用于存储 0-10000 之间的整数$ SET count 1 OK $ OBJECT ENCODING count "int"
这是为了节省内存而共享的一些整数。
-
int
:用于存储 long 范围内的整数$ SET count 10001 OK $ OBJECT ENCODING count "int"
-
embstr
:用于存储长度小于 44 的字符串$ SET name ch3nyang OK $ OBJECT ENCODING name "embstr"
-
sds
:用于存储长度大于 44 的字符串$ SET name ch3nyangch3nyangch3nyangch3nyangch3nyangch3nyangch3nyang OK $ OBJECT ENCODING name "raw"
有一点值得一提的是,编码方式是会跟着值的变化而变化的。比如,当一个整数超过了 long 范围,Redis 会将其转换为字符串。
展开实现代码
编码方式的判断代码:
robj *createStringObjectFromLongLongWithOptions(long long value, int flag) {
robj *o;
if (value >= 0 && value < OBJ_SHARED_INTEGERS && flag == LL2STROBJ_AUTO) {
// 如果在 0-10000 之间,使用共享整数
o = shared.integers[value];
} else {
if ((value >= LONG_MIN && value <= LONG_MAX) && flag != LL2STROBJ_NO_INT_ENC) {
// 如果在 long 范围内,使用 int 编码
o = createObject(OBJ_STRING, NULL);
o->encoding = OBJ_ENCODING_INT;
o->ptr = (void*)((long)value);
} else {
// 否则使用字符串编码
char buf[LONG_STR_SIZE];
int len = ll2string(buf, sizeof(buf), value);
o = createStringObject(buf, len);
}
}
return o;
}
以及这段代码:
robj *createStringObject(const char *ptr, size_t len) {
if (len <= OBJ_ENCODING_EMBSTR_SIZE_LIMIT)
// 如果长度小于 44,使用 embstr 编码
return createEmbeddedStringObject(ptr,len);
else
// 否则使用 sds 编码
return createRawStringObject(ptr,len);
}
其中,我们注意到了有个 shared
的变量,这是 Redis 为了节省内存而共享的一些内容,包括:
- 0-10000 的整数
- 各种命令返回值的字符串
应用
-
缓存对象
STRING
可以用来缓存对象。比如,我们可以将一个对象序列化为字符串,然后存储在 Redis 中。当然,对于 JSON 之类的数据,也可以使用
MGET
和MSET
来批量操作:$ MSET user:1:name ch3nyang user:1:passwd 123456 OK $ MGET user:1:name user:1:passwd 1) "ch3nyang" 2) "123456
-
计数器
STRING
可以用来实现计数器,记录点赞数、浏览数等。例如:$ SET post:readcount:1 0 OK $ INCR post:readcount:1 (integer) 1
-
分布式锁
STRING
可以用来实现分布式锁。这里需要使用SETNX
来实现,它会在键不存在时才设置键值对,否则会返回 0。我们话可以同时为它加上一个过期时间,这样即使锁没有被释放,也不会一直占用内存:$ SETNX lock 378236 NX PX 10000 (integer) 1 $ SETNX lock 242343 NX PX 10000 (integer) 0
解锁则比较麻烦,需要先获取值,然后判断是否相等,最后删除。这里需要使用 Lua 脚本来保证原子性:
EVAL "if redis.call('GET', KEYS[1]) == ARGV[1] then return redis.call('DEL', KEYS[1]) else return 0 end" 1 lock 378236
-
存储 Session
STRING
可以用来存储 Session。这里需要使用SETEX
来设置过期时间:$ SETEX session:378236 3600 "ch3nyang" OK
LIST
基本操作
对于列表:
- 使用
LPUSH
和RPUSH
分别从左和右压入元素 - 使用
LPOP
和RPOP
分别从左和右弹出元素 - 使用
LRANGE
获取范围内的元素:
$ RPUSH user ch3nyang A B C
(integer) 4
$ LPUSH user D E
(integer) 6
$ LRANGE user 0 -1
1) "E"
2) "D"
3) "ch3nyang"
4) "A"
5) "B"
6) "C"
$ LRANGE user 0 2
1) "E"
2) "D"
3) "ch3nyang"
$ LINDEX user 2
"ch3nyang"
$ LINDEX user 8
(nil)
$ LPOP user
"E"
$ LRANGE user 0 -1
1) "D"
2) "ch3nyang"
3) "A"
4) "B"
5) "C"
这里有几个要注意的地方:
- 不管是
LPUSH
还是RPUSH
,都是将输入的内容从左到右挨个压入的 -
LRANGE
的索引是从 0 开始的,且包含两个端点 - 如果索引超出范围,会返回
nil
。
内部实现
目前的 LIST
在 Redis 主要使用 quicklist
和 listpack
来存储。使用哪种数据结构由配置文件中的 list-max-listpack-size
决定:
- 如果这个值为正数,则为
listpack
的最大元素个数 - 如果这个值为负数,指代
listpack
的最大字节数;-1
表示 4096(64 Kb)、-2
表示 8192……-5
表示 65536
其默认值为 -2
,也就是让 listpack
不超过 8192 字节。
我们这里为了方便演示,暂时将 list-max-listpack-size
改为 2:
试完记得该回去!
-
listpack
:$ RPUSH user ch3nyang (integer) 1 $ OBJECT ENCODING user "listpack"
-
quicklist
:$ RPUSH user ch3nyang A B C (integer) 4 $ OBJECT ENCODING user "quicklist"
展开实现代码
证据在PUSH
的代码:
/**
* @param where LIST_HEAD(0) or LIST_TAIL(1)
* @param xx 如果设置为 1,则只有当 key 存在时才会进行操作
*/
void pushGenericCommand(client *c, int where, int xx) {
int j;
robj *lobj = lookupKeyWrite(c->db, c->argv[1]);
if (checkType(c,lobj,OBJ_LIST)) return;
if (!lobj) {
if (xx) {
// 如果键不存在且 xx 为 1,报错返回
addReply(c, shared.czero);
return;
}
// 如果之前没有这个键,则直接创建 listpack 对象
lobj = createListListpackObject();
dbAdd(c->db,c->argv[1],lobj);
}
// 在添加元素前,决定是否要转换 list 的编码方式
listTypeTryConversionAppend(lobj,c->argv,2,c->argc-1,NULL,NULL);
// 插入元素
for (j = 2; j < c->argc; j++) {
listTypePush(lobj,c->argv[j],where);
server.dirty++;
}
addReplyLongLong(c, listTypeLength(lobj));
char *event = (where == LIST_HEAD) ? "lpush" : "rpush";
signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
}
其中,listTypeTryConversionAppend
会在添加元素前决定是否要转换 list 的编码方式:
/**
* 尝试将 listpack 转换为 quicklist
* @param o 要转换的对象
* @param argv 要添加的元素
* @param start 开始索引
* @param end 结束索引
* @param fn 转换前的回调函数
* @param data 回调函数的参数
*/
static void listTypeTryConvertListpack(robj *o, robj **argv, int start, int end,
beforeConvertCB fn, void *data)
{
serverAssert(o->encoding == OBJ_ENCODING_LISTPACK);
size_t add_bytes = 0;
size_t add_length = 0;
// 计算要添加的字节数和长度
if (argv) {
for (int i = start; i <= end; i++) {
if (!sdsEncodedObject(argv[i]))
continue;
add_bytes += sdslen(argv[i]->ptr);
}
add_length = end - start + 1;
}
// 检查是否超过了限制
if (quicklistNodeExceedsLimit(server.list_max_listpack_size,
lpBytes(o->ptr) + add_bytes, lpLength(o->ptr) + add_length))
{
// 如果超过了限制,转换为 quicklist
if (fn) fn(data);
quicklist *ql = quicklistNew(server.list_max_listpack_size, server.list_compress_depth);
if (lpLength(o->ptr))
quicklistAppendListpack(ql, o->ptr);
else
lpFree(o->ptr);
o->ptr = ql;
o->encoding = OBJ_ENCODING_QUICKLIST;
}
}
/**
* 根据对象本身的类型,将其转换为另一种类型
* @param o 要转换的对象
* @param lct 转换类型(LIST_CONV_GROWING、LIST_CONV_SHRINKING、LIST_CONV_AUTO)
* @param argv 要添加的元素
* @param start 开始索引
* @param end 结束索引
* @param fn 转换前的回调函数
* @param data 回调函数的参数
*/
static void listTypeTryConversionRaw(robj *o, list_conv_type lct,
robj **argv, int start, int end,
beforeConvertCB fn, void *data)
{
if (o->encoding == OBJ_ENCODING_QUICKLIST) {
// 转换为 listpack
if (lct == LIST_CONV_GROWING) return;
listTypeTryConvertQuicklist(o, lct == LIST_CONV_SHRINKING, fn, data);
} else if (o->encoding == OBJ_ENCODING_LISTPACK) {
// 转换为 quicklist
if (lct == LIST_CONV_SHRINKING) return;
listTypeTryConvertListpack(o, argv, start, end, fn, data);
} else {
serverPanic("Unknown list encoding");
}
}
void listTypeTryConversionAppend(robj *o, robj **argv, int start, int end,
beforeConvertCB fn, void *data)
{
listTypeTryConversionRaw(o, LIST_CONV_GROWING, argv, start, end, fn, data);
}
quicklistNodeExceedsLimit
被用于判断是否要转换为 quicklist:
// 优化等级,代表 listpack 的最大字节数
static const size_t optimization_level[] = {4096, 8192, 16384, 32768, 65536};
static size_t quicklistNodeNegFillLimit(int fill) {
assert(fill < 0);
size_t offset = (-fill) - 1;
size_t max_level = sizeof(optimization_level) / sizeof(*optimization_level);
if (offset >= max_level) offset = max_level - 1;
return optimization_level[offset];
}
/**
* 根据 fill 的值返回 listpack 的最大字节数或者最大元素个数
*/
void quicklistNodeLimit(int fill, size_t *size, unsigned int *count) {
*size = SIZE_MAX; // _UI64_MAX / _UI32_MAX
*count = UINT_MAX;
if (fill >= 0) {
// 如果 fill 为正数,表示每个节点的最大元素个数
*count = (fill == 0) ? 1 : fill;
} else {
// 如果 fill 为负数,取反后表示优化等级。我们根据优化等级返回 listpack 的最大字节数
*size = quicklistNodeNegFillLimit(fill);
}
}
/**
* 判断新了 list 的大小是否超过了限制
* @param fill 设置中的 list-max-listpack-size。如果为正数,表示每个节点的最大元素个数;如果为负数,表示每个节点的最大字节数
* @param new_sz 新元素的字节数
* @param new_count 新元素的个数
* @return 是否超过限制
*/
int quicklistNodeExceedsLimit(int fill, size_t new_sz, unsigned int new_count) {
size_t sz_limit;
unsigned int count_limit;
quicklistNodeLimit(fill, &sz_limit, &count_limit);
if (likely(sz_limit != SIZE_MAX)) {
// 使用最大元素个数来判断
return new_sz > sz_limit;
} else if (count_limit != UINT_MAX) {
// 使用最大字节数来判断
if (!sizeMeetsSafetyLimit(new_sz)) return 1;
return new_count > count_limit;
}
redis_unreachable();
}
而在 Redis 3.2 之前,LIST
则采用了 ziplist
和 linkedlist
两种方式。
应用
-
消息队列
LIST
可以用来实现消息队列。比如,我们可以使用LPUSH
和RPOP
来实现一个简单的消息队列:$ LPUSH message ch3nyang (integer) 1 $ LPUSH message A (integer) 2 $ RPOP message "ch3nyang" $ RPOP message "A"
不过,这样做的话,消费者需要不停地轮询。如果要实现一个更好的消息队列,可以使用
BRPOP
和BLPOP
,这两个命令会阻塞直到有元素被压入:$ BRPOP message 0 $ LPUSH message ch3nyang (integer) 1 $ LPUSH message A (integer) 2 1) "message" 2) "ch3nyang"
为了防止重复的消息,可以手动为每个消息添加一个 ID,然后使用
SET
来记录已经消费的消息。为了保证消息的可靠性,可以使用
RPOPLPUSH
来将消息额外存储在一个备份队列中,然后在消费者消费成功后再删除。LIST
作为消息队列的缺点很明显,它并不能支持多个消费者消费同一个消息。好在,现在的 Redis 已经支持了STREAM
。
HASH
基本操作
对于哈希表:
- 使用
HSET
和HGET
,其用法为HSET key field value [field value ...]
和HGET key field
- 如果要获取所有键值对,可以使用
HGETALL
。注意,这里不能使用GET
。
$ HSET user name ch3nyang passwd 123456 socialcredit 10086
(integer) 3
$ HGETALL user
1) "name"
2) "ch3nyang"
3) "passwd"
4) "123456"
5) "socialcredit"
6) "10086"
$ HGET user name
"ch3nyang"
哈希表同样可以使用 DEL
整体删除。但如果只要删除一个键值对,可以使用 HDEL
:
$ HDEL user passwd
(integer) 1
$ HGETALL user
1) "name"
2) "ch3nyang"
3) "socialcredit"
4) "10086"
$ DEL user
(integer) 1
$ HGETALL user
(empty array)
内部实现
对于哈希表,Redis 在 7.0 之前采用 ziplist
和 hashtable
两种方式存储。而目前则全部使用 listpack
:
$ HSET user name ch3nyang passwd 123456 socialcredit 10086
(integer) 3
$ OBJECT ENCODING user
"listpack"
展开实现代码
判断代码:
static robj *hashTypeLookupWriteOrCreate(client *c, robj *key) {
robj *o = lookupKeyWrite(c->db,key);
if (checkType(c,o,OBJ_HASH)) return NULL;
if (o == NULL) {
o = createHashObject();
dbAdd(c->db,key,o);
}
return o;
}
应用
-
缓存对象
这其实类似之前用
STRING
缓存对象的方法。不过,HASH
可以更好地组织对象,适合属性会频繁变化的对象。
SET
基本操作
对于集合:
- 使用
SADD
添加元素 - 使用
SREM
删除元素 - 使用
SMEMBERS
获取所有元素 - 使用
SISMEMBER
检查元素是否存在:
$ SADD user ch3nyang A B C D D E
(integer) 6
$ SMEMBERS user
1) "A"
2) "ch3nyang"
3) "B"
4) "D"
5) "E"
6) "C"
$ SISMEMBER user A
(integer) 1
$ SREM user A B C
(integer) 3
$ SISMEMBER user A
(integer) 0
$ SMEMBERS user
1) "ch3nyang"
2) "D"
3) "E"
SET
作为集合,不允许重复元素。如果添加了重复元素,只会添加一次。同时,我们注意到,它有时是无序的。
内部实现
在 Redis 内部,SET
有三种编码方式:
-
intset
:元素都是整数且元素个数小于等于set-max-intset-entries
(默认为 512)$ SADD user 1 2 3 4 5 (integer) 5 $ OBJECT ENCODING user "intset"
-
listpack
:元素个数小于等于set-max-listpack-entries
(默认为 512)$ SADD user ch3nyang A B C D E (integer) 5 $ OBJECT ENCODING user "listpack"
-
hashtable
:其它情况$ SADD user ch3nyang A B C D E ... (integer) 1000 $ OBJECT ENCODING user "hashtable"
我们可以通过调整 set-max-intset-entries
和 set-max-listpack-entries
来改变 set
的编码方式。
展开实现代码
它的判断代码:
robj *setTypeCreate(sds value, size_t size_hint) {
if (isSdsRepresentableAsLongLong(value,NULL) == C_OK && size_hint <= server.set_max_intset_entries)
// 如果元素都是整数且元素个数小于等于 set_max_intset_entries,使用 intset 编码
return createIntsetObject();
if (size_hint <= server.set_max_listpack_entries)
// 如果元素个数小于等于 set_max_listpack_entries,使用 listpack 编码
return createSetListpackObject();
// 否则使用 hashtable 编码
robj *o = createSetObject();
dictExpand(o->ptr, size_hint);
return o;
}
应用
-
点赞统计
SET
可以用来保证每个用户只能点赞一次。比如,我们可以使用SADD
来添加点赞用户,使用SCARD
来统计点赞数:$ SADD post:1:like ch3nyang A B C D E (integer) 5 $ SREM post:1:like B $ SCARD post:1:like (integer) 4
-
好友关系
可以利用
SET
的交集、并集和差集等来求共同好友、共同关注等:$ SADD user:1:follow ch3nyang A B C D E (integer) 5 $ SADD user:2:follow ch3nyang A B C D E F (integer) 6 $ SINTER user:1:follow user:2:follow 1) "ch3nyang" 2) "A" 3) "B" 4) "C" 5) "D" 6) "E"
ZSET
基本操作
对于有序集合:
- 使用
ZADD
添加元素 - 使用
ZREM
删除元素 - 使用
ZRANGE
获取范围内的元素 - 使用
ZSCORE
获取元素的分数 - 使用
ZRANGEBYSCORE
获取分数范围内的元素:
$ ZADD user 10086 ch3nyang 20 A 30 B 10 C
(integer) 4
$ ZRANGE user 0 -1
1) "C"
2) "A"
3) "B"
4) "ch3nyang"
$ ZRANGE user 0 -1 WITHSCORES
1) "C"
2) "10"
3) "A"
4) "20"
5) "B"
6) "30"
7) "ch3nyang"
8) "10086"
$ ZRANGEBYSCORE user 0 30
1) "C"
2) "A"
3) "B"
$ ZSCORE user ch3nyang
"10086"
$ ZREM user ch3nyang
(integer) 1
$ ZSCORE user ch3nyang
(nil)
有序集合是按照分数从小到大排序的。如果需要逆序输出,可以使用 ZREVRANGE
。
内部实现
在 Redis 内部,ZSET
有两种编码方式:
-
listpack
:元素个数小于等于zset-max-listpack-entries
(默认为 128)且元素长度小于zset-max-listpack-value
(默认为 64)$ ZADD user 1 A 2 B 3 C 4 D (integer) 4 $ OBJECT ENCODING user "listpack"
-
skiplist
:其他情况$ ZADD user 1 ch3nyangch3nyangch3nyangch3nyangch3nyangch3nyangch3nyangch3nyangch3nyang 2 A 3 B 4 C (integer) 10 $ OBJECT ENCODING user "skiplist"
我们可以通过调整 zset-max-listpack-entries
和 zset-max-listpack-value
来改变 ZSET
的编码方式。
值得注意的是,ZSET
在由 listpack
转换为 skiplist
时,只关心元素个数,不关心元素长度。
展开实现代码
它的判断代码:
robj *zsetTypeCreate(size_t size_hint, size_t val_len_hint) {
if (size_hint <= server.zset_max_listpack_entries &&
val_len_hint <= server.zset_max_listpack_value)
{
// 如果元素个数小于等于 zset_max_listpack_entries 且元素长度小于等于 zset_max_listpack_value,使用 listpack 编码
return createZsetListpackObject();
}
// 否则使用 skiplist 编码
robj *zobj = createZsetObject();
zset *zs = zobj->ptr;
dictExpand(zs->dict, size_hint);
return zobj;
}
void zsetTypeMaybeConvert(robj *zobj, size_t size_hint) {
// 如果元素个数超过了 zset_max_listpack_entries,转换为 skiplist 编码,此时不关心元素长度
if (zobj->encoding == OBJ_ENCODING_LISTPACK &&
size_hint > server.zset_max_listpack_entries)
{
zsetConvertAndExpand(zobj, OBJ_ENCODING_SKIPLIST, size_hint);
}
}
应用
-
排行榜
ZSET
可以用来实现排行榜。比如,我们可以使用ZADD
来添加分数,使用ZREVRANGE
来获取排行榜:$ ZADD post:like 10086 ch3nyang 20 A 30 B 10 C (integer) 4 $ ZREVRANGE post:like 0 -1 1) "ch3nyang" 2) "B" 3) "A" 4) "C"
BITMAP
基本操作
BITMAP
就是一串二进制位,每一位都可以单独设置:
- 使用
SETBIT
设置某一位 - 使用
GETBIT
获取某一位 - 使用
BITCOUNT
统计某一段的位数:
$ SETBIT user:1:like 0 1
(integer) 0
$ SETBIT user:1:like 1 0
(integer) 0
$ SETBIT user:1:like 2 1
(integer) 0
$ GETBIT user:1:like 0
(integer) 1
$ BITCOUNT user:1:like
(integer) 2
BITMAP
还支持一些位运算,比如 AND
、OR
、XOR
和 NOT
。
内部实现
在 Redis 内部,BITMAP
使用字符串来存储,然后单独操作每一位。
$ SETBIT user:1:like 0 1
(integer) 0
$ OBJECT ENCODING user:1:like
"raw"
展开实现代码
判断代码:
robj *lookupStringForBitCommand(client *c, uint64_t maxbit, int *dirty) {
size_t byte = maxbit >> 3;
robj *o = lookupKeyWrite(c->db,c->argv[1]);
if (checkType(c,o,OBJ_STRING)) return NULL;
if (dirty) *dirty = 0;
if (o == NULL) {
o = createObject(OBJ_STRING,sdsnewlen(NULL, byte+1));
dbAdd(c->db,c->argv[1],o);
if (dirty) *dirty = 1;
} else {
o = dbUnshareStringValue(c->db,c->argv[1],o);
size_t oldlen = sdslen(o->ptr);
o->ptr = sdsgrowzero(o->ptr,byte+1);
if (dirty && oldlen != sdslen(o->ptr)) *dirty = 1;
}
return o;
}
应用
-
签到
BITMAP
可以用来实现签到功能。比如,我们可以使用SETBIT
来设置某一天是否签到:$ SETBIT user:1:signin 20250101 1 (integer) 0 $ SETBIT user:1:signin 20250102 1 (integer) 0 $ GETBIT user:1:signin 20250101 (integer) 1 $ BITCOUNT user:1:signin (integer) 2
HYPERLOGLOG
基本操作
HYPERLOGLOG
可以用来统计基数,即不重复元素的个数。要注意的是,它并不是精确的,而有 0.81% 的误差率。相比于其它精确统计的方法,HYPERLOGLOG
的优势在于它的空间复杂度是固定的,每个键只需要 12KB 的空间就可以统计 \(2^64\) 个元素。
使用 PFADD
添加元素,使用 PFCOUNT
统计元素个数,使用 PFMERGE
合并多个统计:
$ PFADD post:1:comment ch3nyang A B C D E A C C
(integer) 1
$ PFCOUNT post:1:comment
(integer) 6
$ PFADD post:2:comment ch3nyang A B C D E F G
(integer) 1
$ PFCOUNT post:2:comment
(integer) 8
$ PFMERGE post:comment post:1:comment post:2:comment
"OK"
$ PFCOUNT post:comment
(integer) 8
内部实现
在 Redis 内部,HYPERLOGLOG
使用 raw
字符串来存储:
$ PFADD post:1:comment ch3nyang A B C D E A C C
(integer) 1
$ OBJECT ENCODING post:1:comment
"raw"
这个字符串包含了 16 字节的头部,用于存储 HYPERLOGLOG
的元数据;以及 12KB 的空间,包含了 16384 个 6 位的桶,用于统计元素。我们将在后文详细介绍 HYPERLOGLOG
的实现。
应用
-
网页 UV 统计
HYPERLOGLOG
可以用来统计网页的 UV。比如,我们可以使用PFADD
来统计用户访问网页的 IP:PFADD page:1:uv 192.168.0.1 PFADD page:1:uv 192.168.0.2 PFADD page:1:uv 192.168.0.3 PFADD page:1:uv 192.168.0.1
然后,使用
PFCOUNT
来统计 UV:PFCOUNT page:1:uv
GEO
基本操作
GEO
可以用来存储地理位置信息:
- 使用
GEOADD
添加地理位置 - 使用
GEOPOS
获取地理位置 - 使用
GEOHASH
获取地理位置的哈希值 - 使用
GEODIST
计算两个地理位置之间的距离 - 使用
GEORADIUS
获取某一地理位置范围内的元素:
$ GEOADD user 116.404 39.915 ch3nyang 106.404 29.915 A 96.404 19.915 B
$ GEOPOS user ch3nyang
1) 1) "116.40400081872940063"
2) "39.91500057149188763"
$ GEOHASH user ch3nyang
1) "wx4g0f6c9z0"
$ GEODIST user ch3nyang A
"1436341.5183"
$ GEORADIUS user 116.404 39.915 2000 km
1) "A"
2) "ch3nyang"
内部实现
在 Redis 内部,GEO
直接被视为 ZSET
,作为有序集合存在。它的键为用户定义的名字,值为经纬度通过 GEOHASH
编码后的结果。
$ GEOADD user 116.404 39.915 ch3nyang 106.404 29.915 A 96.404 19.915 B
(integer) 3
$ OBJECT ENCODING user
"listpack"
具体到单个坐标,GEO
考虑了其精度问题,使用交错编码的方式来存储经纬度,同时还记录了精度。其定义如下:
typedef struct {
uint64_t bits; // 编码后的结果
uint8_t step; // 精度
} GeoHashBits;
展开实现代码
实现代码:
int geohashEncode(const GeoHashRange *long_range, const GeoHashRange *lat_range,
double longitude, double latitude, uint8_t step,
GeoHashBits *hash) {
// 输入验证与范围检查
if (hash == NULL || step > 32 || step == 0 ||
RANGEPISZERO(lat_range) || RANGEPISZERO(long_range)) return 0;
if (longitude > GEO_LONG_MAX || longitude < GEO_LONG_MIN ||
latitude > GEO_LAT_MAX || latitude < GEO_LAT_MIN) return 0;
// bits 用于存储编码后的结果,step 用于存储精度
hash->bits = 0;
hash->step = step;
if (latitude < lat_range->min || latitude > lat_range->max ||
longitude < long_range->min || longitude > long_range->max) {
return 0;
}
// 计算偏移量,将经纬度转换为 0-1 之间的相对值
double lat_offset =
(latitude - lat_range->min) / (lat_range->max - lat_range->min);
double long_offset =
(longitude - long_range->min) / (long_range->max - long_range->min);
// 将经纬度编码
lat_offset *= (1ULL << step);
long_offset *= (1ULL << step);
hash->bits = interleave64(lat_offset, long_offset);
return 1;
}
应用
-
附近的人
GEO
可以用来查找附近的人。比如,我们可以使用GEOADD
来添加用户的地理位置,使用GEORADIUS
来查找附近的人:$ GEOADD user 116.404 39.915 ch3nyang 106.404 29.915 A 96.404 19.915 B $ GEORADIUS user 116.404 39.915 2000 km 1) "A" 2) "ch3nyang"
STREAM
消息队列
我们先来看普通的生产者-消费者模型。它就如同一个先进先出的队列,生产者不断地往队列中添加消息,而消费者则不断地从队列中读取消息。
然而,如果一个生产者对应多个消费者,那么这个模型就会变得复杂。这时,我们需要引入消费者组。消费者组可以让多个消费者共同消费一个队列,每个消息只会被消费一次。换句话说,对于一条消息,同一个消费者组中只能有一个消费者消费一次。如果有多个消费者组,消费者组可以让我们实现一个发布-订阅模型。
这个模型在开发中有着重要应用。例如,我们对外提供一个服务,用户可以向服务器发送请求服务的消息,而我们有数台后端服务器来消费这些消息。通过消费者组,我们可以做到:
- 一致性保证:每个消息只会被消费一次
- 负载均衡:每台服务器都有机会消费到消息
- 容错能力:当一台服务器宕机时,本来归它消费的消息会被其他服务器消费
- 弹性扩展:服务压力大时,可以随时增加消费者来处理更多的消息
基本操作
Redis 的 STREAM
完美支持了消息队列的功能。
有一些符号可用于表述消息队列中的消息:
-
>
- 用于消费者组的特殊符号,表示只获取从未被消费过的消息 -
$
- 表示最新(最大)的消息 ID -
0
- 表示最旧(最小)的消息 ID -
+
- 表示最大的消息 ID,常用于范围查询 -
-
- 表示最小的消息 ID,常用于范围查询 -
*
- 用于 XADD 命令时让 Redis 自动生成消息ID
对于单个消费者-生产者模型,我们可以:
- 使用
XADD
添加消息 - 使用
XLEN
获取消息数量 - 使用
XRANGE
获取范围内的消息 - 使用
XREAD
读取消息 - 使用
XDEL
删除消息
$ XADD message * name ch3nyang
"1631770000000-0"
$ XADD message * name A
"1631770000001-0"
$ XADD message * name B
"1631770000002-0"
$ XADD message * name C
"1631770000003-0"
$ XLEN message
(integer) 4
$ XREAD STREAMS message 0
1) 1) "message"
2) 1) 1) "1631770000000-0"
2) 1) "name"
2) "ch3nyang"
$ XRANGE message - +
1) 1) "1631770000000-0"
2) 1) "name"
2) "ch3nyang"
2) 1) "1631770000001-0"
2) 1) "name"
2) "A"
3) 1) "1631770000002-0"
2) 1) "name"
2) "B"
4) 1) "1631770000003-0"
2) 1) "name"
2) "C"
$ XDEL message 1631770000000-0
(integer) 1
$ XLEN message
(integer) 3
需要注意的是,XREAD
可以读取同一个消息多次。
我们也可以让消费者阻塞,直到有新消息产生:
$ XREAD BLOCK 10000 STREAMS message $
(nil)
(10.00s)
它也支持消费者组:
- 使用
XGROUP CREATE
创建消费者组 - 使用
XGROUP SETID
设置消费者组的 ID - 使用
XREADGROUP
读取消息 - 使用
XACK
确认消息 - 使用
XPENDING
查看未确认的消息
$ XGROUP CREATE message consumer-group1 $
OK
$ XGROUP CREATE message consumer-group2 $
OK
$ XADD message * name ch3nyang
"1631770000000-0"
$ XADD message * name A
"1631770000001-0"
$ XADD message * name B
"1631770000002-0"
$ XADD message * name C
"1631770000003-0"
$ XREADGROUP GROUP consumer-group1 consumer1 STREAMS message >
1) 1) "message"
2) 1) 1) "1631770000000-0"
2) 1) "name"
2) "ch3nyang"
2) 1) 1) "1631770000001-0"
2) 1) "name"
2) "A"
3) 1) 1) "1631770000002-0"
2) 1) "name"
2) "B"
4) 1) 1) "1631770000003-0"
2) 1) "name"
2) "C"
$ XREADGROUP GROUP consumer-group1 consumer2 STREAMS message >
(nil)
$ XREADGROUP GROUP consumer-group2 consumer1 COUNT 1 STREAMS message >
1) 1) "message"
2) 1) 1) "1631770000000-0"
2) 1) "name"
2) "ch3nyang"
$ XPENDING message consumer-group1
1) (integer) 4
2) "1631770000000-0"
3) "1631770000003-0"
4) 1) 1) "consumer1"
2) "4"
$ XACK message consumer-group1 1631770000000-0
(integer) 1
$ XPENDING message consumer-group1
1) (integer) 3
2) "1631770000001-0"
3) "1631770000003-0"
4) 1) 1) "consumer1"
2) "3"
为什么在 XREADGROUP
后还需要 XACK
呢?这是因为 XREADGROUP
只是读取消息,读取后消息会一直存在,直到被确认。这是为了在消费者处理消息中途出错或宕机时,消息不会丢失,可以在恢复后重新处理。消费者在处理完成后,发送 XACK
来确认消息,此时消息队列就可以放心地删除这条消息了。
内部实现
在 Redis 内部,STREAM
使用 stream
来存储:
$ XADD message * name ch3nyang
"1631770000000-0"
$ OBJECT ENCODING message
"stream"
stream
里的每个 entry
都使用 listpack
来存储,然后使用一个基数树来索引这些 entry
。这样,STREAM
可以快速地插入、删除和查询消息。
展开实现代码
typedef struct stream {
rax *rax; // 基数树
uint64_t length; // 当前消息数量
streamID last_id; // 最新消息 ID,如果没有则置 0
streamID first_id; // 最旧消息 ID
streamID max_deleted_entry_id; // 最大被删除的消息 ID
uint64_t entries_added; // 添加过的总消息数量
rax *cgroups; // 消费者组字典:name -> streamCG
} stream;
rax
是基数树,你可以把它理解为字典树,但它对其进行了压缩处理,也就是说,字典树的每个节点可以存储一个字符串而不是单个字符。rax
的结构体如下:
typedef struct rax {
raxNode *head;
uint64_t numele;
uint64_t numnodes;
void *metadata[];
} rax;
typedef struct raxNode {
uint32_t iskey:1; // 该节点是否包含键
uint32_t isnull:1; // 该节点是否为空
uint32_t iscompr:1; // 该节点是否为压缩节点
uint32_t size:29; // 子节点数量 / 数据长度
unsigned char data[];
} raxNode;
回到 stream
,其中的消费者组 cgroups
也是一个基数树,用于存储消费者组的信息。streamCG
的结构体如下:
typedef struct streamCG {
streamID last_id; // 最后接收但未确认的消息 ID
long long entries_read; // 已读取的消息数量
rax *pel; // 待确认消息列表
rax *consumers; // 消费者字典:name -> streamConsumer
} streamCG;
typedef struct streamConsumer {
mstime_t seen_time; // 最后一次尝试读取/确认消息时间
mstime_t active_time; // 最后一次成功读取/确认消息时间
sds name; // 消费者名
rax *pel; // 待确认消息列表
} streamConsumer;
pel
是待确认消息列表,用于存储消费者未确认的消息。它的结构如下:
typedef struct streamNACK {
mstime_t delivery_time; // 该消息被投递的时间
uint64_t delivery_count; // 该消息被投递的次数
streamConsumer *consumer; // 最后一次投递给的消费者
} streamNACK;
应用
Redis 的 STREAM
可以用于实现消息队列,但它毕竟不是专业的,存在着两个问题:
-
消息丢失
STREAM
使用确认机制来保证消息生产者到中间件、中间件到消费者这两个过程不会发生消息丢失。但是,Redis 作为消息中间件本身是可能丢失消息的:- AOF 异步持久化时,如果 Redis 宕机,可能会丢失消息
- 异步主从复制时,如果主节点宕机,可能会丢失消息
相比之下,专业的消息中间件如 Kafka、RabbitMQ 等,使用了多副本的方式来保证消息不会丢失。当生产者发送消息时,消息会被发送到多个副本。即便某个副本宕机,消息也不会丢失。
-
消息堆积
STREAM
的消息存储在内存中,如果消息堆积过多,可能会导致 Redis 内存溢出。而专业的消息中间件则会将消息存储在磁盘中,即便消息堆积过多,也不会导致内存溢出。
综上,STREAM
适合用于一些简单的消息队列,但对于需求较高的场景,还是需要专业的消息中间件。
MODULE
Redis 还支持 MODULE
,允许用户自定义命令、数据结构和事件。这使得 Redis 可以更好地适应不同的业务场景。
MODULE
的基本操作如下:
- 使用
MODULE LOAD
加载模块 - 使用
MODULE UNLOAD
卸载模块 - 使用
MODULE LIST
查看已加载的模块 - 使用
MODULE COMMAND
查看模块的命令 - 使用
MODULE GETAPI
获取模块的 API
这其实相当于给 Redis 添加了一个插件系统,用户可以根据自己的需求来扩展 Redis 的功能。我们在此不再详细介绍。
Comments