使用Redis做消息队列(一)

消息队列简介

在一个解决方案中,引入消息队列的原因一般有下列几种:

  1. 解耦。在生产者-消费者关系的系统中,任一方经常变动而另一方不想受到影响,则可以使用消息队列进行解耦。
  2. 异步。在生产者-消费者关系的系统中,若生产速度或消费速度可能会经常变化,需要动态调节,则可以使用消息队列将流程拆成异步的关系;在一条流程中,若某些逻辑本不需要以同步顺序执行,则用消息队列进行异步处理可以提高效率。
  3. 削峰。在生产者-消费者关系的系统中,若生产者突然暴增,而消费者跟不上,则可以起到缓冲作用,消费者按自己能力去处理,避免消费者系统(例如数据库连接数过大)卡死宕机。

引入消息队列,从而引入的问题有下列几种:

  1. 降低系统可用性
  2. 增加系统复杂性

Redis 如何做消息队列

1. BRPOP / BRPOPLPUSH

  • 使用普通队列,任一消息只能被一个消费者接收到
  • 优化的一点就是利用 Redis 的阻塞 POP1,以避免在空队列的时候空跑轮询
  • 增加可用性的一点,就是利用 RPOPLPUSH 这种原子操作做两个连续动作2
  • 需要注意 Redis 空连接自动断开的问题

2. 发布订阅(SUBSCRIBE / PUBLISH)

  • 一个消息可以被所有订阅者收到
  • 如果没有订阅者,消息将丢失
  • 消息积压会导致错误断开

3. 基于 Stream 类型

  • Redis 5.0 以后支持
  • 借鉴 kafka 实现的一个比较专业的消息队列类型
    • 支持消息分组
    • 每个消息组支持多个消费者
    • 消息自动持久化
    • 支持定长消息容量

Redis Stream 的消费者组

一个消费者组,像一个消费者一样从 stream 获取消息,然后提供给任意个真正的消费者,并保证:

  1. 一条消息不会给不同的消费者处理
  2. 消费者用名字区分,这样即使消费者宕掉重启,原来给他处理的消息,还能继续让他处理
  3. 每个消费者组,都有一个未处理的下一条消息概念,当消费者问要消息的时候就提供这一条
  4. 消费者处理消息后需要显式ack,以便消费者组可以去掉这条消息
  5. 消费者组会追踪所有待处理的消息,包括已经提供给消费者,但还没有收到ack的消息。这样消费者在查询历史消息的时候,可以只看到发送给了自己的消息列表

消费者组大约就是保存如下信息的一个状态集合:

+----------------------------------------+
| consumer_group_name: mygroup           |
| consumer_group_stream: somekey         |
| last_delivered_id: 1292309234234-92    |
|                                        |
| consumers:                             |
|    "consumer-1" with pending messages  |
|       1292309234234-4                  |
|       1292309234232-8                  |
|    "consumer-42" with pending messages |
|       ... (and so forth)               |
+----------------------------------------+

简单试用 Redis Stream

参考官方的说明文档3,简单试用一下 Stream。

XREAD has no other options than COUNT and BLOCK, so it’s a pretty basic command with a specific purpose to attach consumers to one or multiple streams. More powerful features to consume streams are available using the consumer groups API, however reading via consumer groups is implemented by a different command called XREADGROUP, covered in the next section of this guide.

我们使用 Python 3 语言制作一个生产者,每隔 0.5 秒生成一个消息,然后消费者每隔2秒处理5个消息。

生产者代码:

import redis
import time

def main():
    cnt = 0
    r = redis.Redis(host='localhost', port=6379, db=0)
    while True:
        time.sleep(1)
        data = {
            'name': 'name-%d' % cnt,
            'cnt': cnt,
        }
        r.xadd('mystream', data)
        cnt += 1
        print('pushed %d to mystream' % cnt)

if __name__ == '__main__':
    main()

首先创建一个消费者组grp-1,然后是消费者代码:

import redis
import time

consumer_name = 'consumer_b'
stream_name = 'mystream'
group_name = 'grp-1'

def process_msg(id, msg):
    print('process [%s]: %s' % (id, msg))

def main():
    r = redis.Redis(host='localhost', port=6379, db=0)

    print('[%s] start!' % consumer_name)

    check_backlog = True
    last_id = 0
    while True:
        my_id = None
        if check_backlog:
            my_id = last_id
        else:
            my_id = '>'
        
        items = r.xreadgroup(group_name, consumer_name, 
                {stream_name: my_id}, block=2000, count=5)

        if len(items) == 0:
            print('timeout!', my_id)
            continue
        
        if len(items[0][1]) == 0:
            check_backlog = False
            print('----------------- old msg done! -----------------')

        for item in items[0][1]:
            process_msg(item[0], item[1])
            r.xack(stream_name, group_name, item[0])
            last_id = id

        time.sleep(2)

if __name__ == '__main__':
    main()

这里消费者代码有个 my_id 值得注意,他的作用就是先把 pending 的全部处理完,然后再处理新的消息,因为当 my_id == 0 的时候会读取历史记录中所有给此消费者的 pending 记录,因为限定了每次只处理 5 个,所以要用 last_id 来记录处理的最后一个,并设置 my_id = last_id,直到全部处理完,len(items) == 0 的时候,设置 my_id = '>'

运行输出节选:

生产者 消费者
pushed 9 to mystream process [b'1593262289451-0’]: {b’name’: b’name-6’, b’cnt’: b'6'}
pushed 10 to mystream process [b'1593262290452-0’]: {b’name’: b’name-7’, b’cnt’: b'7'}
pushed 11 to mystream process [b'1593262291454-0’]: {b’name’: b’name-8’, b’cnt’: b'8'}
pushed 12 to mystream process [b'1593262292456-0’]: {b’name’: b’name-9’, b’cnt’: b'9'}
pushed 13 to mystream process [b'1593262293457-0’]: {b’name’: b’name-10’, b’cnt’: b'10'}
pushed 14 to mystream process [b'1593262294459-0’]: {b’name’: b’name-11’, b’cnt’: b'11'}
pushed 15 to mystream process [b'1593262295460-0’]: {b’name’: b’name-12’, b’cnt’: b'12'}
pushed 16 to mystream process [b'1593262296461-0’]: {b’name’: b’name-13’, b’cnt’: b'13'}