NingG +

RocketMQ:延迟队列

0.概要

目标:RocketMQ 的延迟队列的底层实现原理,潜在问题,以及业务使用过程中,如何补偿。

具体焦点:延迟队列,Scheduled Message,定时消息

1.延迟队列

几个方面:

1.1.底层实现原理

几个方面:

目前rockatmq支持的延迟时间有:18 个级别,在 Broker 启动前,可以在配置文件中设置

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

以上支持的延迟时间在 msg.setDelayTimeLevel 对应的级别依次是123

上图是 RocketMQ 的 Scheduled Message(定时消息/延迟消息)的实现原理,其主要分为 2 部分:

  1. 消息落盘:落在独立的延迟队列
  2. 消息调度:依靠定时任务,进行消息的消费,并在时间到达后,将消息,写入真正的目标 Topic

Tips:

核心:将 msg 暂存在「延迟队列」中,依赖定时任务,定期检查,将满足条件的 msg,送回「真正的目标队列」中;

特别说明:上述处理,都是在 RocketMQ 的「Server 端/服务器端」进行处理的;

消息落盘:详细过程

  1. 替换 Topic 和 queueId:在写入CommitLog之前,如果是延迟消息,替换掉消息的TopicqueueId(被替换为延迟消息特定的Topic,queueId则为延迟级别对应的id)
  2. 消息转存:消息写入CommitLog之后,提交 dispatchRequestDispatchService
  3. 落盘存储:根据替换后的 TopicqueueId,将 msg 写入 ScheduledConsumeQueue 中(特定Queue,不会被消费)

消息调度:详细过程

  1. 定时任务监听:给每个Level设置定时器,从ScheduledConsumeQueue中读取信息,msg 已经耗尽延时时间,则,从CommitLog中读取消息内容,恢复成正常的消息内容写入CommitLog
  2. 消息转存:写入CommitLog后,提交 dispatchRequestDispatchService
  3. 落盘存储:由于已恢复 Topic 等属性,所以,此时DispatchService会将消息投递到正确的ConsumeQueue

回顾一下这个方案,最大的优点就是没有排序

  1. 分级隔离:先发一条level是5s的消息,再发一条level是3s的消息,因为他们会属于不同的ScheduleQueue所以投递顺序能保持正确
  2. 同级有序:如果先后发两条level相同的消息,那么他们的处于同一个ConsumeQueue且保持发送顺序
  3. 固定数量:因为level数固定,每个level的有自己独立的定时器,开销也不会很大
  4. 系统可靠:ScheduledConsumeQueue其实是一个普通的ConsumeQueue,所以可靠性等都可以按照原系统的M-S结构等得到保障(多副本存储)

但是这个方案也有一些问题:

  1. 灵活性有限
    1. 固定了Level,不够灵活,最多只能支持18个Level
    2. 业务是会变的,但是Level需要提前划分,不支持修改
  2. 大数据量问题
    1. 如果要支持30天的延迟,CommitLog的量会很大,这块怎么处理没有看到

1.2.业务实践

焦点:业务角度,常见问题,以及解决方案

典型的应用场景

2.参考资料

原文地址:https://ningg.top/rocketmq-series-02-scheduled-message/
微信公众号 ningg, 联系我

同类文章:

微信搜索: 公众号 ningg, 联系我, 交个朋友.

Top