Redis Stream

Table of Contents

1. Stream 简介

Redis 5.0 中引入了新数据类型:stream,它借鉴了 Kafka 很多思想。操作 stream 的相关命令都以 X 开头,如 XADD, XLEN 等等。

参考:
Introduction to Redis Streams

2. 写入数据(XADD)

使用命令 XADD 可以往 stream 中增加消息。比如:

> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0

上面例子中:

  • 第一个参数 mystream 是 stream 的名字。
  • 第二个参数 * 表示由 Redis 服务器生成一个消息 ID,也可以自己指定消息 ID(这种用法很少),这时你需要保证消息 ID 的递增性。
  • 后面的参数 sensor-id 1234 temperature 19.8 是所增加消息的内容。这个例子中只指定了两个“field-value”对,你可以指定很多对,它的一般形式是 field1 value1 field2 value2 ... fieldN vauleN。
  • 返回的信息 1518951480106-0 表示所增加消息的 ID,这里返回的是 Redis 服务器所生成的消息 ID。如果第二个参数是自己指定消息 ID,则这里会直接返回之前指定的消息 ID。

注:目前没有一个单独的命令来创建一个“空 stream”,当 stream 不存在时, XADD 会自动创建它。

2.1. 消息 ID 的格式(millisecondsTime-sequenceNumber)

前面例子中的消息 ID 为“1518951480106-0”。消息 ID 由两部分组成,中间用连字符隔开,它的格式如下:

millisecondsTime-sequenceNumber

前半部分是“时间戳(毫秒)”,后半部分是“序号”。这样可以保证同一毫秒内的消息也有不同的序号。

3. 读取 Stream 数据的三种方式

从 stream 中读取数据有三种方式,下面将分别介绍它。

3.1. 方式一:查询范围(XRANGE or XREVRANGE)

使用命令 XRANGE 可以查询 stream 中指定范围内消息,用法为:

XRANGE key start end [COUNT count]

下面是一个实例:

> XRANGE mystream 1518951480106 1518951480107   # 等同于 XRANGE mystream 1518951480106-0 1518951480107-0
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"

上面命令会返回消息 ID 满足条件“1518951480106-0 <= id <= 1518951480107-0”(注:是闭区间)的所有消息。

要查询 stream 中的所有消息可以指定范围为从 -+ ,如:

> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"

XREVRANGE 和 XRANGE 类似,也是查询范围,它的用法为:

XREVRANGE key end start [COUNT count]

和 XRANGE 不同的地方在于,先指定“end”参数,然后再指定“start”参数。

3.2. 方式二:监听模式(XREAD)

使用命令 XREAD 也可以从 stream 中读取消息,它的用法为:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]

下面是一个实例:

XREAD BLOCK 5000 STREAMS mystream $

上面例子中,消息 ID $ 表示“新消息”,也就是如果有新消息,则读取它,如果没有新消息,则等待 5 秒,还没有就超时退出。

特别说明:要在循环中读取所有的“新消息”,我们只能指定一次 $ ,第一次读取得到消息 ID 后,后续读取时需要指定前面得到的消息 ID。下面这样的写法(循环中多次使用 $ )是不对的:

// 下面是错误写法!

for {
    XREAD BLOCK 5000 STREAMS mystream $
    // process msg
}

为什么上面写法错误呢?因为在两次执行命令 XREAD BLOCK 5000 STREAMS mystream $ 的时间窗口内可能有新消息加入。

正确的写法(伪代码)是:

XREAD BLOCK 5000 STREAMS mystream $
get id (e.g. xxx-xxx)
// process msg

id = xxx-xxx
for {
    XREAD BLOCK 5000 STREAMS mystream id
    // process msg
    get id (e.g. yyy-yyy)
    id = yyy-yyy  // update id
}

注: 当有多个客户端都使用 XRAED 命令读取消息时,消息会到达每一个客户端,这可以实现“广播”场景。

3.3. 方式三:消费组(XREADGROUP)

我们知道使用命令 XREAD 读取 stream 中消息时,消息会到达每一个客户端。但有时,我们希望消息仅被一个客户端处理即可。比如有 7 个消息,3 个消费者,我们希望像下面描述场景来处理消息:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

这可以通过“消费组”(如图 1 所示,摘自:New Features of DCS Redis 5.0)来实现,消费组有下面行为:

  • 一个 stream 可以有多个“消费组”,“消费组”可以包含多个消费者;
  • 每个“消费组”都有一个 last_delivered_id,表示最后一个已经投递给消费者的 ID 号;
  • 一个消费组内,某条消息只会投递个某个消费者(不会投递给每个消费者);
  • 每个消费者都有一个 pending_ids,记录着“已经投递给消费者,但还没有收到消费者确认消息(Ack 消息)”的消息 ID。

redis_stream.png

Figure 1: Consumer Group

3.3.1. 管理消费组(XGROUP)

使用命令 XGROUP 可以管理消息组,如创建消费组、删除消费组等等。其语法为:

XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]

下面例子将为名为 newstream 的 stream 创建消费组 mygroup:

> XGROUP CREATE newstream mygroup $
OK

上面命令中, $ 表示仅投递 Redis 中的新消息到消费组内的消费者。当指定为 0 时表示投递 stream 中所有消息(包含历史消息)到消费组内的消费者,如:

> XGROUP CREATE newstream mygroup 0
OK

默认,创建消费组时如果指定的 stream 不存在会报错。增加参数 MKSTREAM ,可以在 stream 不存在时自动创建它:

> XGROUP CREATE newstream mygroup $ MKSTREAM
OK

3.3.2. 消费组内的消费者读取消息(XREADGROUP)

使用命令 XREADGROUP 可以使用指定消费者读取消息。其语法为:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]

下面例子将以消费者 Alice(属于消费组 mygroup)的身份从 mystream 中读取消息:

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

XREADGROUP 命令中的 ID,可以指定为下面两种情况:

  • > 表示消费者希望接收“还没有投递给其它消费者的消息”。
  • 其它 ID(往往写为 0 ,也可以是其它有效的 ID),表示从 pending_ids(未确认的消息列表)中读取消息。

3.3.3. 确认消息(XACK)

使用命令 XACK 可以对消息进行确认。其语法为:

XACK key group ID [ID ...]

下面例子将确认消息 1526569495631-0:

> XACK mystream mygroup 1526569495631-0
(integer) 1

3.3.4. 错误恢复情况一:消费者重启

如果消费者重启,可能有些消息之前收到后没有处理完(当然也没有 Ack)。这时我们可以 先使用 XREADGROUP 命令时指定 ID 为 0 (表示读取 pending 的消息),处理它;然后再指定 ID 为 > 读取新消息。

3.3.5. 错误恢复情况二:消费者不再工作(XPENDING 和 XCLAIM)

假如某个消费者有 pending 的消息,然后 crash 了,不过它也不启动了,怎么办呢?针对这种情况,Redis Stream 提供了查询消息处理状态的命令 XPENDING 和转移消息所有者的命令 XCLAIM

4. Stream 其它命令

stream 的其它命令如表 1 所示。

Table 1: stream 其它命令
命令 作用
XINFO 查看 stream 相关信息
XLEN 查看 stream 长度(消息个数)
XDEL 根据 ID 删除消息
XTRIM 只保存指定个数的消息(可能丢弃 ID 小的消息)

注:stream 本身就是一个 key 值,使用 DEL 可以删除整个 stream。

Author: cig01

Created: <2018-12-15 Sat>

Last updated: <2019-12-31 Tue>

Creator: Emacs 27.1 (Org mode 9.4)