Redis Stream 是 Redis 5.0 版别中引入的一种新的数据结构,它用于完成简略但功能强大的音讯传递形式。

这篇文章,咱们聊聊 Redis Stream 基本用法 ,以及如安在 SpringBoot 项目中应用 Redis Stream 。

Redis stream 用做音讯行列完美吗 ?

1 基础知识

Redis Stream 的结构如下图所示,它是一个音讯链表,将一切参加的音讯都串起来,每个音讯都有一个唯一的 ID 和对应的内容。

Redis stream 用做音讯行列完美吗 ?

每个 Redis Stream 都有唯一的称号 ,对应唯一的 Redis Key 。

同一个 Stream 能够挂载多个消费组 ConsumerGroup , 消费组不能主动创立,需求运用 XGROUP CREATE 指令创立

每个消费组会有个游标 last_delivered_id,恣意一个顾客读取了音讯都会使游标 last_delivered_id 往前移动 ,标识当时消费组消费到哪条音讯了。

消费组 ConsumerGroup 相同能够挂载多个顾客 Consumer , 每个 Consumer 并行的读取音讯,恣意一个顾客读取了音讯都会使游标 last_delivered_id 往前移动。

顾客内部有一个属性 pending_ids , 记录了当时顾客读取但没有回复 ACK 的音讯 ID 列表 。

2 核心指令

01 XADD 向 Stream 结尾添加音讯

运用 XADD 向行列添加音讯,假如指定的行列不存在,则创立一个行列。基础语法格局:

XADD key ID field value [field value ...]
  • key :行列称号,假如不存在就创立
  • ID :音讯 id,咱们运用 * 表明由 redis 生成,能够自界说,可是要自己确保递加性。
  • field value : 记录。
127.0.0.1:6379> XADD mystream * name1 value1 name2 value2
"1712473185388-0"
127.0.0.1:6379> XLEN mystream
(integer) 1
127.0.0.1:6379> XADD mystream * name2 value2 name3 value3
"1712473231761-0"

音讯 ID 运用 * 表明由 redis 生成,同时也能够自界说,可是自界说时要确保递加性。

音讯 ID 的格局: 毫秒级时刻戳 + 序号 , 例如:1712473185388-5 , 它表明当时音讯在毫秒时刻戳 1712473185388 发生 ,而且该毫秒内发生到了第5条音讯。

在添加行列音讯时,也能够指定行列的长度

127.0.0.1:6379> XADD mystream MAXLEN 100 * name value1 age 30
"1713082205042-0"

运用 XADD 指令向 mystream 的 stream 中添加了一条音讯,而且指定了最大长度为 100。音讯的 ID 由 Redis 主动生成,音讯包括两个字段 nameage,别离对应的值是 value130

02 XRANGE 获取音讯列表

运用 XRANGE 获取音讯列表,会主动过滤现已删去的音讯。语法格局:

XRANGE key start end [COUNT count]
  • key :行列名
  • start :开端值, 表明最小值
  • end :完毕值, + 表明最大值
  • count :数量
127.0.0.1:6379> XRANGE mystream - + COUNT 2
1) 1) "1712473185388-0"
  2) 1) "name1"
   2) "value1"
   3) "name2"
   4) "value2"
2) 1) "1712473231761-0"
  2) 1) "name2"
   2) "value2"
   3) "name3"
   4) "value3"

咱们得到两条音讯,第一层是音讯 ID ,第二层是音讯内容 ,音讯内容是 Hash 数据结构 。

03 XREAD 以堵塞/非堵塞方式获取音讯列表

运用 XREAD 以堵塞或非堵塞方式获取音讯列表 ,语法格局:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
  • count :数量
  • milliseconds :可选,堵塞毫秒数,没有设置便是非堵塞形式
  • key :行列名
  • id :音讯 ID
127.0.0.1:6379> XREAD streams mystream 0-0
1) 1) "mystream"
  2) 1) 1) "1712473185388-0"
     2) 1) "name1"
      2) "value1"
      3) "name2"
      4) "value2"
   2) 1) "1712473231761-0"
     2) 1) "name2"
      2) "value2"
      3) "name3"
      4) "value3"

XRED 读音讯时分为堵塞非堵塞形式,运用 BLOCK 选项能够表明堵塞形式,需求设置堵塞时长。非堵塞形式下,读取完毕(即使没有任何音讯)当即回来,而在堵塞形式下,若读取不到内容,则堵塞等候。

127.0.0.1:6379> XREAD block 1000 streams mystream $
(nil)
(1.07s)

运用 Block 形式,配合 $ 作为 ID ,表明读取最新的音讯,若没有音讯,指令堵塞!等候过程中,其他客户端向行列追加音讯,则会当即读取到。

因而,典型的行列便是 XADD 配合 XREAD Block 完结。XADD 担任生成音讯,XREAD 担任消费音讯。

04 XGROUP CREATE 创立顾客组

运用 XGROUP CREATE 创立顾客组,分两种情况:

  • 从头开端消费:
XGROUP CREATE mystream consumer-group-name 0-0 
  • 从尾部开端消费:
XGROUP CREATE mystream consumer-group-name $

履行作用如下:

127.0.0.1:6379> XGROUP CREATE mystream mygroup 0-0
OK

05 XREADGROUP GROUP 读取消费组中的音讯

运用 XREADGROUP GROUP 读取消费组中的音讯,语法格局:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
  • group :消费组名
  • consumer :顾客名。
  • count : 读取数量。
  • milliseconds : 堵塞毫秒数。
  • key : 行列名。
  • ID : 音讯 ID。

示例:

127.0.0.1:6379>  XREADGROUP group mygroup consumerA count 1 streams mystream >
1) 1) "mystream"
  2) 1) 1) "1712473185388-0"
     2) 1) "name1"
      2) "value1"
      3) "name2"
      4) "value2"

顾客组 mygroup 中的顾客 consumerA ,从 名为 mystream 的 Stream 中读取音讯。

  • COUNT 1 表明一次最多读取一条音讯
  • > 表明音讯的起始位置是当时可用音讯的 ID,即从当时未读取的最早音讯开端读取。

06 XACK 音讯消费承认

接收到音讯之后,咱们要手动承认一下(ack),语法格局:

xack key group-key ID [ID ...]

示例:

127.0.0.1:6379> XACK mystream mygroup 1713089061658-0
(integer) 1

消费承认增加了音讯的可靠性,一般在事务处理完结之后,需求履行 ack 承认音讯现已被消费完结,整个流程的履行如下图所示:

Redis stream 用做音讯行列完美吗 ?

咱们能够运用 xpending 指令查看顾客未承认的音讯ID

127.0.0.1:6379> xpending mystream mygroup
1) (integer) 1
2) "1713091227595-0"
3) "1713091227595-0"
4) 1) 1) "consumerA"
   2) "1"

07 XTRIM 约束 Stream 长度

咱们运用 XTRIM 对流进行修剪,约束长度, 语法格局:

127.0.0.1:6379> XADD mystream * field1 A field2 B field3 C field4 D
"1712535017402-0"
127.0.0.1:6379> XTRIM mystream MAXLEN 2
(integer) 4
127.0.0.1:6379> XRANGE mystream - +
1) 1) "1712498239430-0"
  2) 1) "name"
   2) "zhangyogn"
2) 1) "1712535017402-0"
  2) 1) "field1"
   2) "A"
   3) "field2"
   4) "B"
   5) "field3"
   6) "C"
   7) "field4"
   8) "D"

3 SpringBoot Redis Stream 实战

1、添加 SpringBoot Redis 依靠

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

2、yaml 文件配置

Redis stream 用做音讯行列完美吗 ?

3、RedisTemplate 配置

Redis stream 用做音讯行列完美吗 ?

4、界说stream监听器

Redis stream 用做音讯行列完美吗 ?

5、界说streamcontainer 并启动

Redis stream 用做音讯行列完美吗 ?

6、发送音讯

Redis stream 用做音讯行列完美吗 ?

履行完结之后,顾客就能够打印如下日志:

Redis stream 用做音讯行列完美吗 ?

演示代码地址

github.com/makemyownli…

4 Redis stream 用做音讯行列完美吗

笔者以为 Redis stream 用于音讯行列最大的进步在于:完成了发布订阅模型

发布订阅模型具有如下特色:

  • 消费独立

    相比行列模型的匿名消费方式,发布订阅模型中消费方都会具有的身份,一般叫做订阅组(订阅关系),不同订阅组之间相互独立不会相互影响。

  • 一对多通讯

    根据独立身份的设计,同一个主题内的音讯能够被多个订阅组处理,每个订阅组都能够拿到全量音讯。因而发布订阅模型能够完成一对多通讯。

细品 Redis stream 的设计,咱们发现它和 Kafka 十分类似,比如说顾客组,消费进度偏移量等。

咱们从前诟病 Redis List 数据结构用做行列时,由于消费时没有 Ack 机制,应用反常挂掉导致音讯偶发丢掉的情况,Redis Stream 现已完美的解决了。

由于顾客内部有一个属性 pending_ids , 记录了当时顾客读取但没有回复 ACK 的音讯 ID 列表 。当顾客从头上线,这些音讯能够从头被消费。

但 Redis stream 用做音讯行列完美吗 ?

这个真没有!

1、Redis 本身定位内存数据库,它的设计之初都是为缓存预备的,并不具有音讯堆积的能力。而专业音讯行列一个十分重要的功能是数据中转枢纽,Redis 的定位很难满意,所以运用起来要十分小心。

2、Redis 的高可用计划可能丢掉音讯(AOF 持久化 和 主从复制都是异步 ),而专业音讯行列能够针对不同的场景挑选不同的高可用策略。

所以,笔者以为 Redis 十分合适轻量级音讯行列解决计划,轻量级意味着:数据量可控 + 事务模型简略 。


参考文章:

redis.io/docs/data-t…

www.runoob.com/redis/redis…

pdai.tech/md/db/nosql…


假如我的文章对你有所帮助,还请帮忙点赞、在看、转发一下,你的支持会鼓励我输出更高质量的文章,十分感谢!