Ch3nyang's blog collections_bookmark

post

person

about

category

category

local_offer

tag

rss_feed

rss

深入 Redis 源码 | (1)
Redis 对象

calendar_month 2025-01
archive 中间件
tag redis

There are 6 posts in series 深入 Redis 源码.

本文将会从源码层面解读 Redis 的各项机制。我们会从最基础的内容开始,尽量只看 Redis 最核心的部分,而剥离掉一些不太重要的内容,所有的源码都给出了 GitHub 的链接。

写作本文时,Redis CE 还在 7.4 版本;Redis 8.0 仍然在 Pre-Release。因此,本文主要基于 Redis 7.4,其余版本可能有所不同。本文使用的环境为 Debian 12。

简单来讲,Redis 是一个内存优先的 KV NoSQL。它有这么几个优点:

  1. 速度快:Redis 是基于内存的,所以速度非常快。
  2. 支持多种 Redis 对象:Redis 支持多种 Redis 对象,包括字符串、列表、集合、有序集合、哈希表等。
  3. 原子化操作:Redis 支持原子化操作,可以保证多个操作的原子性。

安装与启动

安装 Redis 很简单,直接用包管理器安装即可。例如,在 Debian 上:

sudo apt install redis-server

Windows 可以使用 WSL,也可以直接下载 Windows 版本的 Redis。值得注意的是,微软维护的版本早就不更新了,我看到 Chocolatey 上的版本是最新的,可以尝试安装:

sudo choco install redis
redis-server

安装完成后,可以使用 redis-cli 进行连接:

redis-cli

并且可以看一下是否正常:

$ ping
PONG

如果出问题了,可以看一下 Redis 是否启动了:

sudo service redis-server status
sudo service redis-server start

Redis 支持的所有命令及文档可以在这里查看。

Redis 对象

数据在 Redis 内的存储方式多种多样,分为以下几种

char *strEncoding(int encoding) {
    switch(encoding) {
    case OBJ_ENCODING_RAW: return "raw";
    case OBJ_ENCODING_INT: return "int";
    case OBJ_ENCODING_HT: return "hashtable";
    case OBJ_ENCODING_QUICKLIST: return "quicklist";
    case OBJ_ENCODING_LISTPACK: return "listpack";
    case OBJ_ENCODING_LISTPACK_EX: return "listpackex";
    case OBJ_ENCODING_INTSET: return "intset";
    case OBJ_ENCODING_SKIPLIST: return "skiplist";
    case OBJ_ENCODING_EMBSTR: return "embstr";
    case OBJ_ENCODING_STREAM: return "stream";
    default: return "unknown";
    }
}

但是,根据存储内容和操作的不同,Redis 对外暴露了几种 Redis 对象:

Redis 对象 存储内容 结构读写能力 内部实现
STRING 字符串、整数、浮点数 对整个字符串或其部分进行操作
自增/减整数和浮点数
int
embstr
sds
LIST 链表 从两端压入/弹出键值对
根据偏移量修剪
读取一个或多个键值对
按值查找或删除键值对
quicklist
listpack
HASH 哈希表 增加/删除/查询键值对
获取所有键值对
listpack
SET 集合 增加/删除/查询键值对
检查元素是否存在
计算交集/并集/差集
随机获取元素
intset
listpack
hashtable
ZSET 有序的字符串-浮点数对集合 增加/删除/查询键值对
获取所有键值对
按分数范围查找键值对
listpack
skiplist

此外,还有一些后来加入的 Redis 对象:

Redis 对象 应用场景 加入版本 内部实现
BITMAP 位图 2.2 字符串
HYPERLOGLOG 基数统计 2.8 字符串
GEO 地理位置 3.2 ZSET
STREAM 消息队列 5.0 stream

此外,Redis 还支持 MODULE,可以自定义对象。

当然,在未发布的 Redis 8.0 中,增加了 JSONTIME_SERIES 等对象。它们基本都是从 MODULE 转正的。等到正式发布后,我们再来介绍。

STRING

基本操作

最基本的命令就是 SETGET,其用法为 SET key valueGET key

$ SET name ch3nyang
OK

$ GET name
"ch3nyang"

value 为字符串时,最大长度为 512MB。

可以使用 DEL 删除键值对:

$ DEL name
(integer) 1

$ GET name
(nil)

这里,返回的 (integer) 1 表示操作的键中的元素个数。

对于大 KEY,还可以使用 unlink 来异步删除,使其不会阻塞主线程。

在 Redis 中,STRING 也可以存储整数和浮点数。对于整数,可以使用 INCRDECR 自增和自减:

$ SET count 1
OK

$ INCR count
(integer) 2

$ DECR count
(integer) 1

$ INCRBY count 5
(integer) 6

对于浮点数,可以使用 INCRBYFLOAT 自增浮点数:

$ SET price 10.5
OK

$ INCRBYFLOAT price 1.2
"11.7"

内部实现

在 Redis 内部,根据值的不同,STRING 有四种编码方式:

  • shared.integers:用于存储 0-10000 之间的整数
    $ SET count 1
    OK
    
    $ OBJECT ENCODING count
    "int"
    

    这是为了节省内存而共享的一些整数。

  • int:用于存储 long 范围内的整数
    $ SET count 10001
    OK
    
    $ OBJECT ENCODING count
    "int"
    
  • embstr:用于存储长度小于 44 的字符串
    $ SET name ch3nyang
    OK
    
    $ OBJECT ENCODING name
    "embstr"
    
  • sds:用于存储长度大于 44 的字符串
    $ SET name ch3nyangch3nyangch3nyangch3nyangch3nyangch3nyangch3nyang
    OK
    
    $ OBJECT ENCODING name
    "raw"
    

有一点值得一提的是,编码方式是会跟着值的变化而变化的。比如,当一个整数超过了 long 范围,Redis 会将其转换为字符串。

展开实现代码

编码方式的判断代码

robj *createStringObjectFromLongLongWithOptions(long long value, int flag) {
    robj *o;

    if (value >= 0 && value < OBJ_SHARED_INTEGERS && flag == LL2STROBJ_AUTO) {
        // 如果在 0-10000 之间,使用共享整数
        o = shared.integers[value];
    } else {
        if ((value >= LONG_MIN && value <= LONG_MAX) && flag != LL2STROBJ_NO_INT_ENC) {
            // 如果在 long 范围内,使用 int 编码
            o = createObject(OBJ_STRING, NULL);
            o->encoding = OBJ_ENCODING_INT;
            o->ptr = (void*)((long)value);
        } else {
            // 否则使用字符串编码
            char buf[LONG_STR_SIZE];
            int len = ll2string(buf, sizeof(buf), value);
            o = createStringObject(buf, len);
        }
    }
    return o;
}

以及这段代码

robj *createStringObject(const char *ptr, size_t len) {
    if (len <= OBJ_ENCODING_EMBSTR_SIZE_LIMIT)
        // 如果长度小于 44,使用 embstr 编码
        return createEmbeddedStringObject(ptr,len);
    else
        // 否则使用 sds 编码
        return createRawStringObject(ptr,len);
}

其中,我们注意到了有个 shared 的变量,这是 Redis 为了节省内存而共享的一些内容,包括:

  • 0-10000 的整数
  • 各种命令返回值的字符串

应用

  • 缓存对象

    STRING 可以用来缓存对象。比如,我们可以将一个对象序列化为字符串,然后存储在 Redis 中。

    当然,对于 JSON 之类的数据,也可以使用 MGETMSET 来批量操作:

    $ MSET user:1:name ch3nyang user:1:passwd 123456
    OK
    
    $ MGET user:1:name user:1:passwd
    1) "ch3nyang"
    2) "123456
    
  • 计数器

    STRING 可以用来实现计数器,记录点赞数、浏览数等。例如:

    $ SET post:readcount:1 0
    OK
    
    $ INCR post:readcount:1
    (integer) 1
    
  • 分布式锁

    STRING 可以用来实现分布式锁。这里需要使用 SETNX 来实现,它会在键不存在时才设置键值对,否则会返回 0。我们话可以同时为它加上一个过期时间,这样即使锁没有被释放,也不会一直占用内存:

    $ SETNX lock 378236 NX PX 10000
    (integer) 1
    
    $ SETNX lock 242343 NX PX 10000
    (integer) 0
    

    解锁则比较麻烦,需要先获取值,然后判断是否相等,最后删除。这里需要使用 Lua 脚本来保证原子性:

    EVAL "if redis.call('GET', KEYS[1]) == ARGV[1] then return redis.call('DEL', KEYS[1]) else return 0 end" 1 lock 378236
    
  • 存储 Session

    STRING 可以用来存储 Session。这里需要使用 SETEX 来设置过期时间:

    $ SETEX session:378236 3600 "ch3nyang"
    OK
    

LIST

基本操作

对于列表:

  • 使用 LPUSHRPUSH 分别从左和右压入元素
  • 使用 LPOPRPOP 分别从左和右弹出元素
  • 使用 LRANGE 获取范围内的元素:
$ RPUSH user ch3nyang A B C
(integer) 4

$ LPUSH user D E
(integer) 6

$ LRANGE user 0 -1
1) "E"
2) "D"
3) "ch3nyang"
4) "A"
5) "B"
6) "C"

$ LRANGE user 0 2
1) "E"
2) "D"
3) "ch3nyang"

$ LINDEX user 2
"ch3nyang"

$ LINDEX user 8
(nil)

$ LPOP user
"E"

$ LRANGE user 0 -1
1) "D"
2) "ch3nyang"
3) "A"
4) "B"
5) "C"

这里有几个要注意的地方:

  1. 不管是 LPUSH 还是 RPUSH,都是将输入的内容从左到右挨个压入的
  2. LRANGE 的索引是从 0 开始的,且包含两个端点
  3. 如果索引超出范围,会返回 nil

内部实现

目前的 LIST 在 Redis 主要使用 quicklistlistpack 来存储。使用哪种数据结构由配置文件中的 list-max-listpack-size 决定:

  • 如果这个值为正数,则为 listpack 的最大元素个数
  • 如果这个值为负数,指代 listpack 的最大字节数;-1 表示 4096(64 Kb)、-2 表示 8192……-5 表示 65536

其默认值为 -2,也就是让 listpack 不超过 8192 字节。

我们这里为了方便演示,暂时将 list-max-listpack-size 改为 2:

试完记得该回去!

  • listpack
    $ RPUSH user ch3nyang
    (integer) 1
    
    $ OBJECT ENCODING user
    "listpack"
    
  • quicklist
    $ RPUSH user ch3nyang A B C
    (integer) 4
    
    $ OBJECT ENCODING user
    "quicklist"
    
展开实现代码

证据在PUSH 的代码

/**
 * @param where LIST_HEAD(0) or LIST_TAIL(1)
 * @param xx 如果设置为 1,则只有当 key 存在时才会进行操作
 */
void pushGenericCommand(client *c, int where, int xx) {
    int j;

    robj *lobj = lookupKeyWrite(c->db, c->argv[1]);
    if (checkType(c,lobj,OBJ_LIST)) return;
    if (!lobj) {
        if (xx) {
            // 如果键不存在且 xx 为 1,报错返回
            addReply(c, shared.czero);
            return;
        }

        // 如果之前没有这个键,则直接创建 listpack 对象
        lobj = createListListpackObject();
        dbAdd(c->db,c->argv[1],lobj);
    }

    // 在添加元素前,决定是否要转换 list 的编码方式
    listTypeTryConversionAppend(lobj,c->argv,2,c->argc-1,NULL,NULL);

    // 插入元素
    for (j = 2; j < c->argc; j++) {
        listTypePush(lobj,c->argv[j],where);
        server.dirty++;
    }

    addReplyLongLong(c, listTypeLength(lobj));

    char *event = (where == LIST_HEAD) ? "lpush" : "rpush";
    signalModifiedKey(c,c->db,c->argv[1]);
    notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
}

其中,listTypeTryConversionAppend 会在添加元素前决定是否要转换 list 的编码方式:

/**
 * 尝试将 listpack 转换为 quicklist
 * @param o 要转换的对象
 * @param argv 要添加的元素
 * @param start 开始索引
 * @param end 结束索引
 * @param fn 转换前的回调函数
 * @param data 回调函数的参数
 */
static void listTypeTryConvertListpack(robj *o, robj **argv, int start, int end,
                                       beforeConvertCB fn, void *data)
{
    serverAssert(o->encoding == OBJ_ENCODING_LISTPACK);

    size_t add_bytes = 0;
    size_t add_length = 0;

    // 计算要添加的字节数和长度
    if (argv) {
        for (int i = start; i <= end; i++) {
            if (!sdsEncodedObject(argv[i]))
                continue;
            add_bytes += sdslen(argv[i]->ptr);
        }
        add_length = end - start + 1;
    }

    // 检查是否超过了限制
    if (quicklistNodeExceedsLimit(server.list_max_listpack_size,
            lpBytes(o->ptr) + add_bytes, lpLength(o->ptr) + add_length))
    {
        // 如果超过了限制,转换为 quicklist
        if (fn) fn(data);

        quicklist *ql = quicklistNew(server.list_max_listpack_size, server.list_compress_depth);

        if (lpLength(o->ptr))
            quicklistAppendListpack(ql, o->ptr);
        else
            lpFree(o->ptr);
        o->ptr = ql;
        o->encoding = OBJ_ENCODING_QUICKLIST;
    }
}

/**
 * 根据对象本身的类型,将其转换为另一种类型
 * @param o 要转换的对象
 * @param lct 转换类型(LIST_CONV_GROWING、LIST_CONV_SHRINKING、LIST_CONV_AUTO)
 * @param argv 要添加的元素
 * @param start 开始索引
 * @param end 结束索引
 * @param fn 转换前的回调函数
 * @param data 回调函数的参数
 */
static void listTypeTryConversionRaw(robj *o, list_conv_type lct,
                                     robj **argv, int start, int end,
                                     beforeConvertCB fn, void *data)
{
    if (o->encoding == OBJ_ENCODING_QUICKLIST) {
        // 转换为 listpack
        if (lct == LIST_CONV_GROWING) return;
        listTypeTryConvertQuicklist(o, lct == LIST_CONV_SHRINKING, fn, data);
    } else if (o->encoding == OBJ_ENCODING_LISTPACK) {
        // 转换为 quicklist
        if (lct == LIST_CONV_SHRINKING) return;
        listTypeTryConvertListpack(o, argv, start, end, fn, data);
    } else {
        serverPanic("Unknown list encoding");
    }
}

void listTypeTryConversionAppend(robj *o, robj **argv, int start, int end,
                                 beforeConvertCB fn, void *data)
{
    listTypeTryConversionRaw(o, LIST_CONV_GROWING, argv, start, end, fn, data);
}

quicklistNodeExceedsLimit 被用于判断是否要转换为 quicklist:

// 优化等级,代表 listpack 的最大字节数
static const size_t optimization_level[] = {4096, 8192, 16384, 32768, 65536};

static size_t quicklistNodeNegFillLimit(int fill) {
    assert(fill < 0);
    size_t offset = (-fill) - 1;
    size_t max_level = sizeof(optimization_level) / sizeof(*optimization_level);
    if (offset >= max_level) offset = max_level - 1;
    return optimization_level[offset];
}

/**
 * 根据 fill 的值返回 listpack 的最大字节数或者最大元素个数
 */
void quicklistNodeLimit(int fill, size_t *size, unsigned int *count) {
    *size = SIZE_MAX; // _UI64_MAX / _UI32_MAX
    *count = UINT_MAX;

    if (fill >= 0) {
        // 如果 fill 为正数,表示每个节点的最大元素个数
        *count = (fill == 0) ? 1 : fill;
    } else {
        // 如果 fill 为负数,取反后表示优化等级。我们根据优化等级返回 listpack 的最大字节数
        *size = quicklistNodeNegFillLimit(fill);
    }
}

/**
 * 判断新了 list 的大小是否超过了限制
 * @param fill 设置中的 list-max-listpack-size。如果为正数,表示每个节点的最大元素个数;如果为负数,表示每个节点的最大字节数
 * @param new_sz 新元素的字节数
 * @param new_count 新元素的个数
 * @return 是否超过限制
 */
int quicklistNodeExceedsLimit(int fill, size_t new_sz, unsigned int new_count) {
    size_t sz_limit;
    unsigned int count_limit;
    quicklistNodeLimit(fill, &sz_limit, &count_limit);

    if (likely(sz_limit != SIZE_MAX)) {
        // 使用最大元素个数来判断
        return new_sz > sz_limit;
    } else if (count_limit != UINT_MAX) {
        // 使用最大字节数来判断
        if (!sizeMeetsSafetyLimit(new_sz)) return 1;
        return new_count > count_limit;
    }

    redis_unreachable();
}

而在 Redis 3.2 之前,LIST 则采用了 ziplistlinkedlist 两种方式。

应用

  • 消息队列

    LIST 可以用来实现消息队列。比如,我们可以使用 LPUSHRPOP 来实现一个简单的消息队列:

    $ LPUSH message ch3nyang
    (integer) 1
    
    $ LPUSH message A
    (integer) 2
    
    $ RPOP message
    "ch3nyang"
    
    $ RPOP message
    "A"
    

    不过,这样做的话,消费者需要不停地轮询。如果要实现一个更好的消息队列,可以使用 BRPOPBLPOP,这两个命令会阻塞直到有元素被压入:

    $ BRPOP message 0
    
    $ LPUSH message ch3nyang
    (integer) 1
    
    $ LPUSH message A
    (integer) 2
    
    1) "message"
    2) "ch3nyang"
    

    为了防止重复的消息,可以手动为每个消息添加一个 ID,然后使用 SET 来记录已经消费的消息。

    为了保证消息的可靠性,可以使用 RPOPLPUSH 来将消息额外存储在一个备份队列中,然后在消费者消费成功后再删除。

    LIST 作为消息队列的缺点很明显,它并不能支持多个消费者消费同一个消息。好在,现在的 Redis 已经支持了 STREAM

HASH

基本操作

对于哈希表:

  • 使用 HSETHGET,其用法为 HSET key field value [field value ...]HGET key field
  • 如果要获取所有键值对,可以使用 HGETALL。注意,这里不能使用 GET
$ HSET user name ch3nyang passwd 123456 socialcredit 10086
(integer) 3

$ HGETALL user
1) "name"
2) "ch3nyang"
3) "passwd"
4) "123456"
5) "socialcredit"
6) "10086"

$ HGET user name
"ch3nyang"

哈希表同样可以使用 DEL 整体删除。但如果只要删除一个键值对,可以使用 HDEL

$ HDEL user passwd
(integer) 1

$ HGETALL user
1) "name"
2) "ch3nyang"
3) "socialcredit"
4) "10086"

$ DEL user
(integer) 1

$ HGETALL user
(empty array)

内部实现

对于哈希表,Redis 在 7.0 之前采用 ziplisthashtable 两种方式存储。而目前则全部使用 listpack

$ HSET user name ch3nyang passwd 123456 socialcredit 10086
(integer) 3

$ OBJECT ENCODING user
"listpack"
展开实现代码

判断代码

static robj *hashTypeLookupWriteOrCreate(client *c, robj *key) {
    robj *o = lookupKeyWrite(c->db,key);
    if (checkType(c,o,OBJ_HASH)) return NULL;

    if (o == NULL) {
        o = createHashObject();
        dbAdd(c->db,key,o);
    }
    return o;
}

应用

  • 缓存对象

    这其实类似之前用 STRING 缓存对象的方法。不过,HASH 可以更好地组织对象,适合属性会频繁变化的对象。

SET

基本操作

对于集合:

  • 使用 SADD 添加元素
  • 使用 SREM 删除元素
  • 使用 SMEMBERS 获取所有元素
  • 使用 SISMEMBER 检查元素是否存在:
$ SADD user ch3nyang A B C D D E
(integer) 6

$ SMEMBERS user
1) "A"
2) "ch3nyang"
3) "B"
4) "D"
5) "E"
6) "C"

$ SISMEMBER user A
(integer) 1

$ SREM user A B C
(integer) 3

$ SISMEMBER user A
(integer) 0

$ SMEMBERS user
1) "ch3nyang"
2) "D"
3) "E"

SET 作为集合,不允许重复元素。如果添加了重复元素,只会添加一次。同时,我们注意到,它有时是无序的。

内部实现

在 Redis 内部,SET 有三种编码方式:

  • intset:元素都是整数且元素个数小于等于 set-max-intset-entries(默认为 512)
    $ SADD user 1 2 3 4 5
    (integer) 5
    
    $ OBJECT ENCODING user
    "intset"
    
  • listpack:元素个数小于等于 set-max-listpack-entries(默认为 512)
    $ SADD user ch3nyang A B C D E
    (integer) 5
    
    $ OBJECT ENCODING user
    "listpack"
    
  • hashtable:其它情况
    $ SADD user ch3nyang A B C D E ...
    (integer) 1000
    
    $ OBJECT ENCODING user
    "hashtable"
    

我们可以通过调整 set-max-intset-entriesset-max-listpack-entries 来改变 set 的编码方式。

展开实现代码

它的判断代码

robj *setTypeCreate(sds value, size_t size_hint) {
    if (isSdsRepresentableAsLongLong(value,NULL) == C_OK && size_hint <= server.set_max_intset_entries)
        // 如果元素都是整数且元素个数小于等于 set_max_intset_entries,使用 intset 编码
        return createIntsetObject();
    if (size_hint <= server.set_max_listpack_entries)
        // 如果元素个数小于等于 set_max_listpack_entries,使用 listpack 编码
        return createSetListpackObject();

    // 否则使用 hashtable 编码
    robj *o = createSetObject();
    dictExpand(o->ptr, size_hint);
    return o;
}

应用

  • 点赞统计

    SET 可以用来保证每个用户只能点赞一次。比如,我们可以使用 SADD 来添加点赞用户,使用 SCARD 来统计点赞数:

    $ SADD post:1:like ch3nyang A B C D E
    (integer) 5
    
    $ SREM post:1:like B
    
    $ SCARD post:1:like
    (integer) 4
    
  • 好友关系

    可以利用 SET 的交集、并集和差集等来求共同好友、共同关注等:

    $ SADD user:1:follow ch3nyang A B C D E
    (integer) 5
    
    $ SADD user:2:follow ch3nyang A B C D E F
    (integer) 6
    
    $ SINTER user:1:follow user:2:follow
    1) "ch3nyang"
    2) "A"
    3) "B"
    4) "C"
    5) "D"
    6) "E"
    

ZSET

基本操作

对于有序集合:

  • 使用 ZADD 添加元素
  • 使用 ZREM 删除元素
  • 使用 ZRANGE 获取范围内的元素
  • 使用 ZSCORE 获取元素的分数
  • 使用 ZRANGEBYSCORE 获取分数范围内的元素:
$ ZADD user 10086 ch3nyang 20 A 30 B 10 C
(integer) 4

$ ZRANGE user 0 -1
1) "C"
2) "A"
3) "B"
4) "ch3nyang"

$ ZRANGE user 0 -1 WITHSCORES
1) "C"
2) "10"
3) "A"
4) "20"
5) "B"
6) "30"
7) "ch3nyang"
8) "10086"

$ ZRANGEBYSCORE user 0 30
1) "C"
2) "A"
3) "B"

$ ZSCORE user ch3nyang
"10086"

$ ZREM user ch3nyang
(integer) 1

$ ZSCORE user ch3nyang
(nil)

有序集合是按照分数从小到大排序的。如果需要逆序输出,可以使用 ZREVRANGE

内部实现

在 Redis 内部,ZSET 有两种编码方式:

  • listpack:元素个数小于等于 zset-max-listpack-entries(默认为 128)且元素长度小于 zset-max-listpack-value(默认为 64)
    $ ZADD user 1 A 2 B 3 C 4 D
    (integer) 4
    
    $ OBJECT ENCODING user
    "listpack"
    
  • skiplist:其他情况
    $ ZADD user 1 ch3nyangch3nyangch3nyangch3nyangch3nyangch3nyangch3nyangch3nyangch3nyang 2 A 3 B 4 C
    (integer) 10
    
    $ OBJECT ENCODING user
    "skiplist"
    

我们可以通过调整 zset-max-listpack-entrieszset-max-listpack-value 来改变 ZSET 的编码方式。

值得注意的是,ZSET 在由 listpack 转换为 skiplist 时,只关心元素个数,不关心元素长度。

展开实现代码

它的判断代码

robj *zsetTypeCreate(size_t size_hint, size_t val_len_hint) {
    if (size_hint <= server.zset_max_listpack_entries &&
        val_len_hint <= server.zset_max_listpack_value)
    {
        // 如果元素个数小于等于 zset_max_listpack_entries 且元素长度小于等于 zset_max_listpack_value,使用 listpack 编码
        return createZsetListpackObject();
    }

    // 否则使用 skiplist 编码
    robj *zobj = createZsetObject();
    zset *zs = zobj->ptr;
    dictExpand(zs->dict, size_hint);
    return zobj;
}

void zsetTypeMaybeConvert(robj *zobj, size_t size_hint) {
    // 如果元素个数超过了 zset_max_listpack_entries,转换为 skiplist 编码,此时不关心元素长度
    if (zobj->encoding == OBJ_ENCODING_LISTPACK &&
        size_hint > server.zset_max_listpack_entries)
    {
        zsetConvertAndExpand(zobj, OBJ_ENCODING_SKIPLIST, size_hint);
    }
}

应用

  • 排行榜

    ZSET 可以用来实现排行榜。比如,我们可以使用 ZADD 来添加分数,使用 ZREVRANGE 来获取排行榜:

    $ ZADD post:like 10086 ch3nyang 20 A 30 B 10 C
    (integer) 4
    
    $ ZREVRANGE post:like 0 -1
    1) "ch3nyang"
    2) "B"
    3) "A"
    4) "C"
    

BITMAP

基本操作

BITMAP 就是一串二进制位,每一位都可以单独设置:

  • 使用 SETBIT 设置某一位
  • 使用 GETBIT 获取某一位
  • 使用 BITCOUNT 统计某一段的位数:
$ SETBIT user:1:like 0 1
(integer) 0

$ SETBIT user:1:like 1 0
(integer) 0

$ SETBIT user:1:like 2 1
(integer) 0

$ GETBIT user:1:like 0
(integer) 1

$ BITCOUNT user:1:like
(integer) 2

BITMAP 还支持一些位运算,比如 ANDORXORNOT

内部实现

在 Redis 内部,BITMAP 使用字符串来存储,然后单独操作每一位。

$ SETBIT user:1:like 0 1
(integer) 0

$ OBJECT ENCODING user:1:like
"raw"
展开实现代码

判断代码

robj *lookupStringForBitCommand(client *c, uint64_t maxbit, int *dirty) {
    size_t byte = maxbit >> 3;
    robj *o = lookupKeyWrite(c->db,c->argv[1]);
    if (checkType(c,o,OBJ_STRING)) return NULL;
    if (dirty) *dirty = 0;

    if (o == NULL) {
        o = createObject(OBJ_STRING,sdsnewlen(NULL, byte+1));
        dbAdd(c->db,c->argv[1],o);
        if (dirty) *dirty = 1;
    } else {
        o = dbUnshareStringValue(c->db,c->argv[1],o);
        size_t oldlen = sdslen(o->ptr);
        o->ptr = sdsgrowzero(o->ptr,byte+1);
        if (dirty && oldlen != sdslen(o->ptr)) *dirty = 1;
    }
    return o;
}

应用

  • 签到

    BITMAP 可以用来实现签到功能。比如,我们可以使用 SETBIT 来设置某一天是否签到:

    $ SETBIT user:1:signin 20250101 1
    (integer) 0
    
    $ SETBIT user:1:signin 20250102 1
    (integer) 0
    
    $ GETBIT user:1:signin 20250101
    (integer) 1
    
    $ BITCOUNT user:1:signin
    (integer) 2
    

HYPERLOGLOG

基本操作

HYPERLOGLOG 可以用来统计基数,即不重复元素的个数。要注意的是,它并不是精确的,而有 0.81% 的误差率。相比于其它精确统计的方法,HYPERLOGLOG 的优势在于它的空间复杂度是固定的,每个键只需要 12KB 的空间就可以统计 \(2^64\) 个元素。

使用 PFADD 添加元素,使用 PFCOUNT 统计元素个数,使用 PFMERGE 合并多个统计:

$ PFADD post:1:comment ch3nyang A B C D E A C C
(integer) 1

$ PFCOUNT post:1:comment
(integer) 6

$ PFADD post:2:comment ch3nyang A B C D E F G
(integer) 1

$ PFCOUNT post:2:comment
(integer) 8

$ PFMERGE post:comment post:1:comment post:2:comment
"OK"

$ PFCOUNT post:comment
(integer) 8

内部实现

在 Redis 内部,HYPERLOGLOG 使用 raw 字符串来存储:

$ PFADD post:1:comment ch3nyang A B C D E A C C
(integer) 1

$ OBJECT ENCODING post:1:comment
"raw"

这个字符串包含了 16 字节的头部,用于存储 HYPERLOGLOG 的元数据;以及 12KB 的空间,包含了 16384 个 6 位的桶,用于统计元素。我们将在后文详细介绍 HYPERLOGLOG 的实现。

应用

  • 网页 UV 统计

    HYPERLOGLOG 可以用来统计网页的 UV。比如,我们可以使用 PFADD 来统计用户访问网页的 IP:

    PFADD page:1:uv 192.168.0.1
    PFADD page:1:uv 192.168.0.2
    PFADD page:1:uv 192.168.0.3
    PFADD page:1:uv 192.168.0.1
    

    然后,使用 PFCOUNT 来统计 UV:

    PFCOUNT page:1:uv
    

GEO

基本操作

GEO 可以用来存储地理位置信息:

  • 使用 GEOADD 添加地理位置
  • 使用 GEOPOS 获取地理位置
  • 使用 GEOHASH 获取地理位置的哈希值
  • 使用 GEODIST 计算两个地理位置之间的距离
  • 使用 GEORADIUS 获取某一地理位置范围内的元素:
$ GEOADD user 116.404 39.915 ch3nyang 106.404 29.915 A 96.404 19.915 B

$ GEOPOS user ch3nyang
1) 1) "116.40400081872940063"
   2) "39.91500057149188763"

$ GEOHASH user ch3nyang
1) "wx4g0f6c9z0"

$ GEODIST user ch3nyang A
"1436341.5183"

$ GEORADIUS user 116.404 39.915 2000 km
1) "A"
2) "ch3nyang"

内部实现

在 Redis 内部,GEO 直接被视为 ZSET,作为有序集合存在。它的键为用户定义的名字,值为经纬度通过 GEOHASH 编码后的结果。

$ GEOADD user 116.404 39.915 ch3nyang 106.404 29.915 A 96.404 19.915 B
(integer) 3

$ OBJECT ENCODING user
"listpack"

具体到单个坐标,GEO 考虑了其精度问题,使用交错编码的方式来存储经纬度,同时还记录了精度。其定义如下:

typedef struct {
    uint64_t bits; // 编码后的结果
    uint8_t step; // 精度
} GeoHashBits;
展开实现代码

实现代码

int geohashEncode(const GeoHashRange *long_range, const GeoHashRange *lat_range,
                  double longitude, double latitude, uint8_t step,
                  GeoHashBits *hash) {
    // 输入验证与范围检查
    if (hash == NULL || step > 32 || step == 0 ||
        RANGEPISZERO(lat_range) || RANGEPISZERO(long_range)) return 0;
    if (longitude > GEO_LONG_MAX || longitude < GEO_LONG_MIN ||
        latitude > GEO_LAT_MAX || latitude < GEO_LAT_MIN) return 0;

    // bits 用于存储编码后的结果,step 用于存储精度
    hash->bits = 0;
    hash->step = step;

    if (latitude < lat_range->min || latitude > lat_range->max ||
        longitude < long_range->min || longitude > long_range->max) {
        return 0;
    }

    // 计算偏移量,将经纬度转换为 0-1 之间的相对值
    double lat_offset =
        (latitude - lat_range->min) / (lat_range->max - lat_range->min);
    double long_offset =
        (longitude - long_range->min) / (long_range->max - long_range->min);

    // 将经纬度编码
    lat_offset *= (1ULL << step);
    long_offset *= (1ULL << step);
    hash->bits = interleave64(lat_offset, long_offset);
    return 1;
}

应用

  • 附近的人

    GEO 可以用来查找附近的人。比如,我们可以使用 GEOADD 来添加用户的地理位置,使用 GEORADIUS 来查找附近的人:

    $ GEOADD user 116.404 39.915 ch3nyang 106.404 29.915 A 96.404 19.915 B
    
    $ GEORADIUS user 116.404 39.915 2000 km
    1) "A"
    2) "ch3nyang"
    

STREAM

消息队列

我们先来看普通的生产者-消费者模型。它就如同一个先进先出的队列,生产者不断地往队列中添加消息,而消费者则不断地从队列中读取消息。

然而,如果一个生产者对应多个消费者,那么这个模型就会变得复杂。这时,我们需要引入消费者组。消费者组可以让多个消费者共同消费一个队列,每个消息只会被消费一次。换句话说,对于一条消息,同一个消费者组中只能有一个消费者消费一次。如果有多个消费者组,消费者组可以让我们实现一个发布-订阅模型。

这个模型在开发中有着重要应用。例如,我们对外提供一个服务,用户可以向服务器发送请求服务的消息,而我们有数台后端服务器来消费这些消息。通过消费者组,我们可以做到:

  • 一致性保证:每个消息只会被消费一次
  • 负载均衡:每台服务器都有机会消费到消息
  • 容错能力:当一台服务器宕机时,本来归它消费的消息会被其他服务器消费
  • 弹性扩展:服务压力大时,可以随时增加消费者来处理更多的消息

基本操作

Redis 的 STREAM 完美支持了消息队列的功能。

有一些符号可用于表述消息队列中的消息:

  • > - 用于消费者组的特殊符号,表示只获取从未被消费过的消息
  • $ - 表示最新(最大)的消息 ID
  • 0 - 表示最旧(最小)的消息 ID
  • + - 表示最大的消息 ID,常用于范围查询
  • - - 表示最小的消息 ID,常用于范围查询
  • * - 用于 XADD 命令时让 Redis 自动生成消息ID

对于单个消费者-生产者模型,我们可以:

  • 使用 XADD 添加消息
  • 使用 XLEN 获取消息数量
  • 使用 XRANGE 获取范围内的消息
  • 使用 XREAD 读取消息
  • 使用 XDEL 删除消息
$ XADD message * name ch3nyang
"1631770000000-0"

$ XADD message * name A
"1631770000001-0"

$ XADD message * name B
"1631770000002-0"

$ XADD message * name C
"1631770000003-0"

$ XLEN message
(integer) 4

$ XREAD STREAMS message 0
1) 1) "message"
   2) 1) 1) "1631770000000-0"
         2) 1) "name"
            2) "ch3nyang"

$ XRANGE message - +
1) 1) "1631770000000-0"
   2) 1) "name"
      2) "ch3nyang"
2) 1) "1631770000001-0"
    2) 1) "name"
        2) "A"
3) 1) "1631770000002-0"
    2) 1) "name"
        2) "B"
4) 1) "1631770000003-0"
    2) 1) "name"
        2) "C"

$ XDEL message 1631770000000-0
(integer) 1

$ XLEN message
(integer) 3

需要注意的是,XREAD 可以读取同一个消息多次。

我们也可以让消费者阻塞,直到有新消息产生:

$ XREAD BLOCK 10000 STREAMS message $
(nil)
(10.00s)

它也支持消费者组:

  • 使用 XGROUP CREATE 创建消费者组
  • 使用 XGROUP SETID 设置消费者组的 ID
  • 使用 XREADGROUP 读取消息
  • 使用 XACK 确认消息
  • 使用 XPENDING 查看未确认的消息
$ XGROUP CREATE message consumer-group1 $
OK

$ XGROUP CREATE message consumer-group2 $
OK

$ XADD message * name ch3nyang
"1631770000000-0"

$ XADD message * name A
"1631770000001-0"

$ XADD message * name B
"1631770000002-0"

$ XADD message * name C
"1631770000003-0"

$ XREADGROUP GROUP consumer-group1 consumer1 STREAMS message >
1) 1) "message"
    2) 1) 1) "1631770000000-0"
          2) 1) "name"
              2) "ch3nyang"
       2) 1) 1) "1631770000001-0"
          2) 1) "name"
              2) "A"
        3) 1) 1) "1631770000002-0"
            2) 1) "name"
                2) "B"
        4) 1) 1) "1631770000003-0"
            2) 1) "name"
                2) "C"

$ XREADGROUP GROUP consumer-group1 consumer2 STREAMS message >
(nil)

$ XREADGROUP GROUP consumer-group2 consumer1 COUNT 1 STREAMS message >
1) 1) "message"
    2) 1) 1) "1631770000000-0"
          2) 1) "name"
              2) "ch3nyang"

$ XPENDING message consumer-group1
1) (integer) 4
2) "1631770000000-0"
3) "1631770000003-0"
4) 1) 1) "consumer1"
      2) "4"

$ XACK message consumer-group1 1631770000000-0
(integer) 1

$ XPENDING message consumer-group1
1) (integer) 3
2) "1631770000001-0"
3) "1631770000003-0"
4) 1) 1) "consumer1"
      2) "3"

为什么在 XREADGROUP 后还需要 XACK 呢?这是因为 XREADGROUP 只是读取消息,读取后消息会一直存在,直到被确认。这是为了在消费者处理消息中途出错或宕机时,消息不会丢失,可以在恢复后重新处理。消费者在处理完成后,发送 XACK 来确认消息,此时消息队列就可以放心地删除这条消息了。

内部实现

在 Redis 内部,STREAM 使用 stream 来存储:

$ XADD message * name ch3nyang
"1631770000000-0"

$ OBJECT ENCODING message
"stream"

stream 里的每个 entry 都使用 listpack 来存储,然后使用一个基数树来索引这些 entry。这样,STREAM 可以快速地插入、删除和查询消息。

展开实现代码

stream 的结构体

typedef struct stream {
    rax *rax; // 基数树
    uint64_t length; // 当前消息数量
    streamID last_id; // 最新消息 ID,如果没有则置 0
    streamID first_id; // 最旧消息 ID
    streamID max_deleted_entry_id; // 最大被删除的消息 ID
    uint64_t entries_added; // 添加过的总消息数量
    rax *cgroups; // 消费者组字典:name -> streamCG
} stream;

rax 是基数树,你可以把它理解为字典树,但它对其进行了压缩处理,也就是说,字典树的每个节点可以存储一个字符串而不是单个字符。rax 的结构体如下:

typedef struct rax {
    raxNode *head;
    uint64_t numele;
    uint64_t numnodes;
    void *metadata[];
} rax;

typedef struct raxNode {
    uint32_t iskey:1; // 该节点是否包含键
    uint32_t isnull:1; // 该节点是否为空
    uint32_t iscompr:1; // 该节点是否为压缩节点
    uint32_t size:29; // 子节点数量 / 数据长度
    unsigned char data[];
} raxNode;

回到 stream,其中的消费者组 cgroups 也是一个基数树,用于存储消费者组的信息。streamCG 的结构体如下:

typedef struct streamCG {
    streamID last_id; // 最后接收但未确认的消息 ID
    long long entries_read; // 已读取的消息数量
    rax *pel; // 待确认消息列表
    rax *consumers; // 消费者字典:name -> streamConsumer
} streamCG;

typedef struct streamConsumer {
    mstime_t seen_time; // 最后一次尝试读取/确认消息时间
    mstime_t active_time; // 最后一次成功读取/确认消息时间
    sds name; // 消费者名
    rax *pel; // 待确认消息列表
} streamConsumer;

pel 是待确认消息列表,用于存储消费者未确认的消息。它的结构如下:

typedef struct streamNACK {
    mstime_t delivery_time; // 该消息被投递的时间
    uint64_t delivery_count; // 该消息被投递的次数
    streamConsumer *consumer; // 最后一次投递给的消费者
} streamNACK;

应用

Redis 的 STREAM 可以用于实现消息队列,但它毕竟不是专业的,存在着两个问题:

  • 消息丢失

    STREAM 使用确认机制来保证消息生产者到中间件、中间件到消费者这两个过程不会发生消息丢失。但是,Redis 作为消息中间件本身是可能丢失消息的:

    • AOF 异步持久化时,如果 Redis 宕机,可能会丢失消息
    • 异步主从复制时,如果主节点宕机,可能会丢失消息

    相比之下,专业的消息中间件如 Kafka、RabbitMQ 等,使用了多副本的方式来保证消息不会丢失。当生产者发送消息时,消息会被发送到多个副本。即便某个副本宕机,消息也不会丢失。

  • 消息堆积

    STREAM 的消息存储在内存中,如果消息堆积过多,可能会导致 Redis 内存溢出。

    而专业的消息中间件则会将消息存储在磁盘中,即便消息堆积过多,也不会导致内存溢出。

综上,STREAM 适合用于一些简单的消息队列,但对于需求较高的场景,还是需要专业的消息中间件。

MODULE

Redis 还支持 MODULE,允许用户自定义命令、数据结构和事件。这使得 Redis 可以更好地适应不同的业务场景。

MODULE 的基本操作如下:

  • 使用 MODULE LOAD 加载模块
  • 使用 MODULE UNLOAD 卸载模块
  • 使用 MODULE LIST 查看已加载的模块
  • 使用 MODULE COMMAND 查看模块的命令
  • 使用 MODULE GETAPI 获取模块的 API

这其实相当于给 Redis 添加了一个插件系统,用户可以根据自己的需求来扩展 Redis 的功能。我们在此不再详细介绍。

Series 深入 Redis 源码

Comments

Share This Post