Redis Stream
Table of Contents
1. Stream 简介
Redis 5.0 中引入了新数据类型:stream,它借鉴了 Kafka 很多思想。操作 stream 的相关命令都以 X
开头,如 XADD
, XLEN
等等。
2. 写入数据(XADD)
使用命令 XADD 可以往 stream 中增加消息。比如:
> XADD mystream * sensor-id 1234 temperature 19.8 1518951480106-0
上面例子中:
- 第一个参数 mystream 是 stream 的名字。
- 第二个参数 * 表示由 Redis 服务器生成一个消息 ID,也可以自己指定消息 ID(这种用法很少),这时你需要保证消息 ID 的递增性。
- 后面的参数 sensor-id 1234 temperature 19.8 是所增加消息的内容。这个例子中只指定了两个“field-value”对,你可以指定很多对,它的一般形式是 field1 value1 field2 value2 ... fieldN vauleN。
- 返回的信息 1518951480106-0 表示所增加消息的 ID,这里返回的是 Redis 服务器所生成的消息 ID。如果第二个参数是自己指定消息 ID,则这里会直接返回之前指定的消息 ID。
注:目前没有一个单独的命令来创建一个“空 stream”,当 stream 不存在时, XADD
会自动创建它。
2.1. 消息 ID 的格式(millisecondsTime-sequenceNumber)
前面例子中的消息 ID 为“1518951480106-0”。消息 ID 由两部分组成,中间用连字符隔开,它的格式如下:
millisecondsTime-sequenceNumber
前半部分是“时间戳(毫秒)”,后半部分是“序号”。这样可以保证同一毫秒内的消息也有不同的序号。
3. 读取 Stream 数据的三种方式
从 stream 中读取数据有三种方式,下面将分别介绍它。
3.1. 方式一:查询范围(XRANGE or XREVRANGE)
使用命令 XRANGE 可以查询 stream 中指定范围内消息,用法为:
XRANGE key start end [COUNT count]
下面是一个实例:
> XRANGE mystream 1518951480106 1518951480107 # 等同于 XRANGE mystream 1518951480106-0 1518951480107-0 1) 1) 1518951480106-0 2) 1) "sensor-id" 2) "1234" 3) "temperature" 4) "19.8"
上面命令会返回消息 ID 满足条件“1518951480106-0 <= id <= 1518951480107-0”(注:是闭区间)的所有消息。
要查询 stream 中的所有消息可以指定范围为从 -
到 +
,如:
> XRANGE mystream - + 1) 1) 1518951480106-0 2) 1) "sensor-id" 2) "1234" 3) "temperature" 4) "19.8" 2) 1) 1518951482479-0 2) 1) "sensor-id" 2) "9999" 3) "temperature" 4) "18.2"
XREVRANGE 和 XRANGE 类似,也是查询范围,它的用法为:
XREVRANGE key end start [COUNT count]
和 XRANGE 不同的地方在于,先指定“end”参数,然后再指定“start”参数。
3.2. 方式二:监听模式(XREAD)
使用命令 XREAD 也可以从 stream 中读取消息,它的用法为:
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
下面是一个实例:
XREAD BLOCK 5000 STREAMS mystream $
上面例子中,消息 ID $
表示“新消息”,也就是如果有新消息,则读取它,如果没有新消息,则等待 5 秒,还没有就超时退出。
特别说明:要在循环中读取所有的“新消息”,我们只能指定一次 $
,第一次读取得到消息 ID 后,后续读取时需要指定前面得到的消息 ID。下面这样的写法(循环中多次使用 $
)是不对的:
// 下面是错误写法! for { XREAD BLOCK 5000 STREAMS mystream $ // process msg }
为什么上面写法错误呢?因为在两次执行命令 XREAD BLOCK 5000 STREAMS mystream $
的时间窗口内可能有新消息加入。
正确的写法(伪代码)是:
XREAD BLOCK 5000 STREAMS mystream $ get id (e.g. xxx-xxx) // process msg id = xxx-xxx for { XREAD BLOCK 5000 STREAMS mystream id // process msg get id (e.g. yyy-yyy) id = yyy-yyy // update id }
注: 当有多个客户端都使用 XRAED 命令读取消息时,消息会到达每一个客户端,这可以实现“广播”场景。
3.3. 方式三:消费组(XREADGROUP)
我们知道使用命令 XREAD 读取 stream 中消息时,消息会到达每一个客户端。但有时,我们希望消息仅被一个客户端处理即可。比如有 7 个消息,3 个消费者,我们希望像下面描述场景来处理消息:
1 -> C1 2 -> C2 3 -> C3 4 -> C1 5 -> C2 6 -> C3 7 -> C1
这可以通过“消费组”(如图 1 所示,摘自:New Features of DCS Redis 5.0)来实现,消费组有下面行为:
- 一个 stream 可以有多个“消费组”,“消费组”可以包含多个消费者;
- 每个“消费组”都有一个 last_delivered_id,表示最后一个已经投递给消费者的 ID 号;
- 一个消费组内,某条消息只会投递个某个消费者(不会投递给每个消费者);
- 每个消费者都有一个 pending_ids,记录着“已经投递给消费者,但还没有收到消费者确认消息(Ack 消息)”的消息 ID。
Figure 1: Consumer Group
3.3.1. 管理消费组(XGROUP)
使用命令 XGROUP 可以管理消息组,如创建消费组、删除消费组等等。其语法为:
XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
下面例子将为名为 newstream 的 stream 创建消费组 mygroup:
> XGROUP CREATE newstream mygroup $ OK
上面命令中, $
表示仅投递 Redis 中的新消息到消费组内的消费者。当指定为 0 时表示投递 stream 中所有消息(包含历史消息)到消费组内的消费者,如:
> XGROUP CREATE newstream mygroup 0 OK
默认,创建消费组时如果指定的 stream 不存在会报错。增加参数 MKSTREAM ,可以在 stream 不存在时自动创建它:
> XGROUP CREATE newstream mygroup $ MKSTREAM OK
3.3.2. 消费组内的消费者读取消息(XREADGROUP)
使用命令 XREADGROUP 可以使用指定消费者读取消息。其语法为:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
下面例子将以消费者 Alice(属于消费组 mygroup)的身份从 mystream 中读取消息:
> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream > 1) 1) "mystream" 2) 1) 1) 1526569495631-0 2) 1) "message" 2) "apple"
XREADGROUP 命令中的 ID,可以指定为下面两种情况:
>
表示消费者希望接收“还没有投递给其它消费者的消息”。- 其它 ID(往往写为
0
,也可以是其它有效的 ID),表示从 pending_ids(未确认的消息列表)中读取消息。
3.3.3. 确认消息(XACK)
使用命令 XACK 可以对消息进行确认。其语法为:
XACK key group ID [ID ...]
下面例子将确认消息 1526569495631-0:
> XACK mystream mygroup 1526569495631-0 (integer) 1
3.3.4. 错误恢复情况一:消费者重启
如果消费者重启,可能有些消息之前收到后没有处理完(当然也没有 Ack)。这时我们可以 先使用 XREADGROUP 命令时指定 ID 为 0
(表示读取 pending 的消息),处理它;然后再指定 ID 为 >
读取新消息。
4. Stream 其它命令
stream 的其它命令如表 1 所示。
命令 | 作用 |
---|---|
XINFO | 查看 stream 相关信息 |
XLEN | 查看 stream 长度(消息个数) |
XDEL | 根据 ID 删除消息 |
XTRIM | 只保存指定个数的消息(可能丢弃 ID 小的消息) |
注:stream 本身就是一个 key 值,使用 DEL
可以删除整个 stream。