教程 > redis教程 > 阅读:43

redis 流(stream)——迹忆客-ag捕鱼王app官网

redis 流


redis stream 是 redis 5.0 版本新增加的数据结构。

redis stream 主要用于消息队列(mq,message queue),redis 本身是有一个 redis 发布订阅 (pub/sub) 来实现消息队列的功能,但它有个缺点就是消息无法持久化,如果出现网络断开、redis 宕机等,消息就会被丢弃。

简单来说发布订阅 (pub/sub) 可以分发消息,但无法记录历史消息。

而 redis stream 提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。

redis stream 的结构如下所示,它有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 id 和对应的内容:

redis流结构图

每个 stream 都有唯一的名称,它就是 redis 的 key,在我们首次使用 xadd 指令追加消息时自动创建。

上图解析:

  • consumer group :消费组,使用 xgroup create 命令创建,一个消费组有多个消费者(consumer)。
  • last_delivered_id :游标,每个消费组会有个游标 last_delivered_id,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。
  • pending_ids :消费者(consumer)的状态变量,作用是维护消费者的未确认的 id。 pending_ids 记录了当前已经被客户端读取的消息,但是还没有 ack (acknowledge character:确认字符)。

消息队列相关命令: xadd - 添加消息到末尾 xtrim - 对流进行修剪,限制长度 xdel - 删除消息 xlen - 获取流包含的元素数量,即消息长度 xrange - 获取消息列表,会自动过滤已经删除的消息 xrevrange - 反向获取消息列表,id 从大到小 xread - 以阻塞或非阻塞方式获取消息列表

xadd

使用 xadd 向队列添加消息,如果指定的队列不存在,则创建一个队列,xadd 语法格式:

xadd key id field value [field value ...]
  • key :队列名称,如果不存在就创建
  • id :消息 id,我们使用 * 表示由 redis 生成,可以自定义,但是要自己保证递增性。
  • field value : 记录。

示例

redis 127.0.0.1:6379> xadd mystream * name sara surname oconnor
"1601372323627-0"
redis 127.0.0.1:6379> xadd mystream * field1 value1 field2 value2 field3 value3
"1601372323627-1"
redis 127.0.0.1:6379> xlen mystream
(integer) 2
redis 127.0.0.1:6379> xrange mystream -  
1) 1) "1601372323627-0"
   2) 1) "name"
      2) "sara"
      3) "surname"
      4) "oconnor"
2) 1) "1601372323627-1"
   2) 1) "field1"
      2) "value1"
      3) "field2"
      4) "value2"
      5) "field3"
      6) "value3"

xtrim

使用 xtrim 对流进行修剪,限制长度, 语法格式:

xtrim key maxlen [~] count
  • key :队列名称
  • maxlen :长度
  • count :数量

示例

redis 127.0.0.1:6379> xadd mystream * field1 a field2 b field3 c field4 d
"1601372434568-0"
redis 127.0.0.1:6379> xtrim mystream maxlen 2
(integer) 0
redis 127.0.0.1:6379> xrange mystream -  
1) 1) "1601372434568-0"
   2) 1) "field1"
      2) "a"
      3) "field2"
      4) "b"
      5) "field3"
      6) "c"
      7) "field4"
      8) "d"
redis>

xdel

使用 xdel 删除消息,语法格式:

xdel key id [id ...]
  • key:队列名称
  • id :消息 id

示例

redis 127.0.0.1:6379> xadd mystream * a 1
1538561698944-0
redis 127.0.0.1:6379> xadd mystream * b 2
1538561700640-0
redis 127.0.0.1:6379> xadd mystream * c 3
1538561701744-0
redis 127.0.0.1:6379> xdel mystream 1538561700640-0
(integer) 1
redis 127.0.0.1:6379> xrange mystream -  
1) 1) 1538561698944-0
   2) 1) "a"
      2) "1"
2) 1) 1538561701744-0
   2) 1) "c"
      2) "3"

xlen

使用 xlen 获取流包含的元素数量,即消息长度,语法格式:

xlen key
  • key:队列名称

示例

redis 127.0.0.1:6379> xadd mystream * item 1
"1601372563177-0"
redis 127.0.0.1:6379> xadd mystream * item 2
"1601372563178-0"
redis 127.0.0.1:6379> xadd mystream * item 3
"1601372563178-1"
redis 127.0.0.1:6379> xlen mystream
(integer) 3

xrange

使用 xrange 获取消息列表,会自动过滤已经删除的消息 ,语法格式:

xrange key start end [count count]
  • key :队列名
  • start :开始值, - 表示最小值
  • end :结束值, 表示最大值
  • count :数量

示例

redis 127.0.0.1:6379> xadd writers * name virginia surname woolf
"1601372577811-0"
redis 127.0.0.1:6379> xadd writers * name jane surname austen
"1601372577811-1"
redis 127.0.0.1:6379> xadd writers * name toni surname morrison
"1601372577811-2"
redis 127.0.0.1:6379> xadd writers * name agatha surname christie
"1601372577812-0"
redis 127.0.0.1:6379> xadd writers * name ngozi surname adichie
"1601372577812-1"
redis 127.0.0.1:6379> xlen writers
(integer) 5
redis 127.0.0.1:6379> xrange writers -   count 2
1) 1) "1601372577811-0"
   2) 1) "name"
      2) "virginia"
      3) "surname"
      4) "woolf"
2) 1) "1601372577811-1"
   2) 1) "name"
      2) "jane"
      3) "surname"
      4) "austen"

xrevrange

使用 xrevrange 获取消息列表,会自动过滤已经删除的消息 ,语法格式:

xrevrange key end start [count count]
  • key :队列名
  • end :结束值, 表示最大值
  • start :开始值, - 表示最小值
  • count :数量

示例

redis 127.0.0.1:6379> xadd writers * name virginia surname woolf
"1601372731458-0"
redis 127.0.0.1:6379> xadd writers * name jane surname austen
"1601372731459-0"
redis 127.0.0.1:6379> xadd writers * name toni surname morrison
"1601372731459-1"
redis 127.0.0.1:6379> xadd writers * name agatha surname christie
"1601372731459-2"
redis 127.0.0.1:6379> xadd writers * name ngozi surname adichie
"1601372731459-3"
redis 127.0.0.1:6379> xlen writers
(integer) 5
redis 127.0.0.1:6379> xrevrange writers   - count 1
1) 1) "1601372731459-3"
   2) 1) "name"
      2) "ngozi"
      3) "surname"
      4) "adichie"

xread

使用 xread 以阻塞或非阻塞方式获取消息列表 ,语法格式:

xread [count count] [block milliseconds] streams key [key ...] id [id ...]
  • count :数量
  • milliseconds :可选,阻塞毫秒数,没有设置就是非阻塞模式
  • key :队列名
  • id :消息 id

示例

# 从 stream 头部读取两条消息
redis 127.0.0.1:6379> xread count 2 streams mystream writers 0-0 0-0
1) 1) "mystream"
   2) 1) 1) 1526984818136-0
         2) 1) "duration"
            2) "1532"
            3) "event-id"
            4) "5"
            5) "user-id"
            6) "7782813"
      2) 1) 1526999352406-0
         2) 1) "duration"
            2) "812"
            3) "event-id"
            4) "9"
            5) "user-id"
            6) "388234"
2) 1) "writers"
   2) 1) 1) 1526985676425-0
         2) 1) "name"
            2) "virginia"
            3) "surname"
            4) "woolf"
      2) 1) 1526985685298-0
         2) 1) "name"
            2) "jane"
            3) "surname"
            4) "austen"

xgroup create

使用 xgroup create 创建消费者组,语法格式:

xgroup [create key groupname id-or-$] [setid key groupname id-or-$] [destroy key groupname] [delconsumer key groupname consumername]
  • key :队列名称,如果不存在就创建
  • groupname :组名。
  • $ : 表示从尾部开始消费,只接受新消息,当前 stream 消息会全部忽略。

从头开始消费:

xgroup create mystream consumer-group-name 0-0

从尾部开始消费:

xgroup create mystream consumer-group-name $

xreadgroup group

使用 xreadgroup group 读取消费组中的消息,语法格式:

xreadgroup group group consumer [count count] [block milliseconds] [noack] streams key [key ...] id [id ...]
  • group :消费组名;
  • consumer :消费者名;
  • count : 读取数量;
  • milliseconds : 阻塞毫秒数;
  • key : 队列名;
  • id : 消息 id。
xreadgroup group consumer-group-name consumer-name count 1 streams mystream >

查看笔记

扫码一下
查看教程更方便
网站地图