跳至主要內容

RocketMQ 实现高性能定时消息

Scarb大约 20 分钟

原文地址:http://hscarb.github.io/rocketmq/20220412-rocketmq-flexable-scheduled-message.htmlopen in new window

RocketMQ 实现高性能定时消息

背景

RocketMQ 是阿里孵化的 Apache 顶级开源分布式高可用消息队列。在开源版本中支持延迟消息的功能,但是仅支持几个固定的延迟时间(1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h)。本文介绍如何在 RocketMQ 基础上实现高性能的、任意时间的定时消息。

定时消息和延迟消息是什么?

定时消息和延迟消息是消息队列中对于消息的基本概念。

  • 定时消息:生产者将消息发送到消息队列服务端,但不期望这条消息马上被投递,而是在当前时间之后的某个时间投递,此时消费者才可以消费到这条消息。
  • 延迟消息:生产者将消息发送到消息队列服务端,但不期望这条消息马上被投递,而是延迟一定时间后投递。

这两个概念虽然感觉起来不同,但是在实际使用中效果是完全相同的:消息到达消息队列服务端后不会马上投递,而是到达某个时间才投递给消费者。也就是说,实现其中一个,就可以达到另一个的效果。

使用定时消息,将消息定时设置为当前时间往后的 X 时间,可以实现延迟消息的效果。
使用延迟消息,如果想要固定时间投递,可以计算投递时间到当前时间的时间差 X,然后设置这条消息延迟 X 时间。

所以本文中所实现的定时消息效果也可以用作延迟消息使用。

定时消息的需求和应用场景

定时消息在当前的互联网环境中有非常大的需求。

例如电商/网约车等业务中都会出现的订单场景,客户下单后并不会马上付款,但是这个订单也不可能一直开着,因为订单会占用商品/网约车资源。这时候就需要一个机制,在比如 5 分钟后进行一次回调,回调关闭订单的方法。
这个回调的触发可以用分布式定时任务来处理,,但是更好的方法可以是使用消息队列发送一个延迟消息,因为一条消息会比一个分布式定时任务轻量得多。
开启一个消费者消费订单取消 Topic 的消息,收到消息后关闭订单,简单高效。

当用户支付了订单,那么这个订单不再需要被取消,刚才发的延迟消息也不再需要被投递。当然,你可以在消费消息时判断一下订单的状态以确定是否需要关闭,但是这样做会有一次额外的数据库操作。如果可以取消定时消息,那么只要发送一条定时消息取消的命令就可以取消之前发送的定时消息投递。

除此之外,定时消息还能用于更多其他场景,如定时任务触发、等待重试、事件预订等等。

各大消息队列对定时消息支持的现状

当前各大消息队列和云厂商都对定时消息和延迟消息有一定程度上的支持,但是往往在精度、延迟时间、性能等方面无法都达到完美。

消息队列 / 功能延迟时间精度性能是否支持取消
Kafka××××
RabbitMQ一个队列只支持一个延迟时间低于 RocketMQ×
Pulser支持跨度很大的延迟消息1s无法支持大规模使用×
RocketMQ仅支持固定等级的延迟消息,最大 2 h1s接近于 RocketMQ 普通消息性能×
Amazon SQS15 分钟内×
阿里云 RocketMQ40 天1s~2s的延迟误差接近于 RocketMQ 普通消息性能×
腾讯云 CMQ1 小时内1s单队列处于飞行状态的消息数限制为2万条×
华为云 RocketMQ1 年0.1s接近于 RocketMQ 普通消息性能

可以看到,4 大主流开源消息队列对定时消息的实现都有局限性,无法达到任意时间定时。

各大云厂商将该功能作为一个竞争力项,支持比较灵活的延迟消息。其中华为云 RocketMQ 支持最长 1 年的延迟消息,且延迟精度能够达到 0.1s 内,同时还具备基本等同于普通消息的性能表现。此外,还支持延迟消息的取消,功能领先所有开源消息队列和云化消息队列。

下面我们将揭开华为云 RocketMQ 任意时间定时消息的面纱,看一看它究竟是怎么实现的。

设计和实现

在设计定时消息的实现方案前,我的设计目标是——构建定时消息底座,不仅能让业务使用,也能在其之上构建其他服务,如定时任务调度服务;甚至作为 RocketMQ 一些特性的基础,基于定时消息实现如消息重投等功能。作为一个底座,其必须有如下的特点:支持任意时间的定时、高精度、高性能、高可靠。

难点和取舍

各大开源消息队列和云厂商没有实现完美的定时消息,因为在每个指标上要达到完美,都涉及到其他方面的一些取舍。

  • 延迟时间:一般来说,保存的消息数据都有一个过期时间,如 3 天过期清除,也就是说定时消息延迟时间最大也不能超过这个清除时间。支持更大的延迟时间意味着延迟消息更长的保存时间,也意味着需要单独对定时消息进行存储。
  • 精度:定时消息如何投递?势必会用到类似定时任务地机制。比如每 1s 执行一次定时任务投递这 1s 内的定时消息。定时消息的精度越高就意味着定时任务执行越频繁,计算压力越大,I/O 越频繁。
  • 性能和可靠性:这两个指标往往不能兼得。更高的可靠性意味着消息同步投递、主从同步,那么消息的 TPS 就不可避免地变低。

对于这些取舍,我想说的是:我全都要!

站在巨人的肩膀上

在实现“全都要” 的定时消息之前,我们先来看一下开源版本 RocketMQ 定时消息的实现,从中可以学习和借鉴需多东西。

开源版本 RocketMQ 的定时消息也是取舍之后的产物。对于单队列支持任意时间定时消息的难点是消息的顺序问题。比如用户先发了一条延迟 1 分钟的消息,随后马上发一条延迟 3 秒的消息,显然延迟 3 秒的消息需要先被投递出去,那么服务端在收到消息后需要对消息进行排序再投递。在 RocketMQ 中,为了保证可靠性,消息是需要落盘的,且对性能和延迟的要求,决定了在服务端对消息进行排序是完全不可接受的。

如何解决排序问题?开源版本的做法是:通过固定几个延迟等级的方式,绕过排序。开源 RocketMQ 设定了 18 个固定延迟时间的延迟等级:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。这样,对于每个延迟等级,他们之中的消息顺序就是固定的:先来先投递,后来后投递,从而绕过了排序这个问题。

下面是开源 RocketMQ 延迟消息的流程图

RocketMQ 延迟消息的流程图
RocketMQ 延迟消息的流程图

RocketMQ 为延迟消息创建了一个内部 Topic,下有 18 个 Queue,对应 18 个延迟等级。收到延迟消息后,不会立即投递到它本该去的 Topic 中,而是投递到延迟消息 Topic 中的对应 Queue。然后的实现十分简单粗暴:为每个 Queue 创建一个线程,循环扫描这个 Queue 里面的消息是否到达投递时间,如果到达则投递到它本该去的 Topic 中。由于每个 Queue 里面消息的延迟等级相同,那么他们的投递就是按顺序的,避免了对延迟消息重新排序。

开源的延迟消息实现经过 4.9.3 版本的优化open in new window,已经支持了异步投递,拥有了不错的性能。

但是它仍存在一个致命的问题:不支持 18 个等级之外的延迟时间。日益增长的客户诉求促使我们探究任意时间定时消息的实现。站在开源 RocketMQ 实现的肩膀上,只要能将 18 个等级改为定时任意时间,就可以实现高性能高可靠的定时消息,因为高可靠和高性能都可以依赖 RocketMQ 本身的机制达到。

存储设计

想要实现任意时间的定时消息,存储设计是最重要的。存储设计面临的两个最大的问题就是

  1. 定时消息的顺序:先发送的不一定先投递。
  2. 消息存储过期:如果定时消息和普通消息存在一起,那么最大延迟时间还是会受到普通消息文件过期时间的限制。

这两个问题可以通过精心的设计存储方案来解决。

使用索引文件解决定时消息顺序问题

回想 RocketMQ 的索引文件 IndexFileopen in new window,它提供了按照消息 Key 查找消息的能力。具体的做法是:它用类似 HashMap 的形式存储了每个消息 Key 下的消息的位置信息,当查询某个 Key 的消息时,可以马上定位到这个 Key 下存储的消息位置信息链表,然后通过位置信息从消息存储文件 CommitLog 中将消息全部信息查出来。

对于定时消息,也可以构建这样一个索引文件,用来快速查找某一时刻需要投递的消息。这样一来,投递消息时只需要借助索引文件就可以查找所有该时刻需要投递的消息,免去了排序的步骤,解决了定时消息顺序问题。

参照 IndexFile,定时消息索引的存储方案设计就变得很简单。索引文件的 Key 即投递的时间段,Value 即该时间段内要投递的所有消息位置信息。

特别提需要确定的几个关键值

  1. 索引每个 Key 的时间跨度:1s 的精度我认为太差,一些秒杀场景可能慢 1 秒就没货了,于是决定降低一个数量级—— 0.1s
  2. 容量和时间跨度:RocketMQ 中文件存储为内存映射文件,最大不超过 2G,所以每个索引文件不应超过 2G。如果按 TPS 1w 来算,1 小时的索引文件大小就会达到 700M 左右。所以时间跨度定为 1 小时,容量为 3600w。如果超过 3600w 怎么办?把这个文件做成可扩展的文件队列即可。

消息存储的方案取舍

想要摆脱消息默认过期时间的限制,达到更长的延迟时间,那么只能把定时消息单独存储,投递前永不删除。

如果要单独存储,也有几个选择:使用第三方 K-V 数据库如 RocksDB,或者自研存储。下面来分析一下这几个方案的优缺点

优点缺点
与普通消息共用存储无需额外开发延迟时间受限
RocksDB延迟时间不限。性能高,开发量较小引入第三方组件,增加维护成本
自研存储延迟时间不限。性能高,易于管理,侵入小开发量大

由前文所说,我们希望实现一个定时消息的底座,不希望将其他第三方组件引入开源 RocketMQ,于是毅然选择自研存储。

自研的存储的目的是为了能够长期保存定时消息,但是一直存着不删也不行。很明显,已经投递的定时消息文件可以被删除,但是如何删除已经投递的定时消息成为一个问题:因为定时消息定时的时间不固定,一个消息文件中可能包含延迟 1s 的和延迟 1 年的消息,那这个文件的删除时间需要在延迟最大的消息投递之后,这显然不是一个好办法。

借鉴之前索引文件的涉及,我们把定时消息存储文件也按投递时间段拆分,例如每一天需要投递的消息保存为一个文件队列,这样就可以在投递完后的第二天把整个文件队列删除。

存储最终方案

经过上面的分析,最终的存储方案就很明确了:需要新增两种存储,分别是定时消息数据和定时消息索引,如图所示。

定时消息投递

定时消息的处理逻辑主要分为两个部分:存储和投递。存储已经搞定了,下面我们看看投递的设计。

投递即在定时消息到期后把消息重新投递到 RocketMQ 的普通消息存储 CommitLog 中,让消费者可以消费。为了追求更高的性能,定时消息的投递默认使用异步刷盘的方式。

容易想到使用一个单独的线程来处理投递逻辑,整个流程如下图所示,其中 Scheduled Replay 即定时消息投递线程。

其实在最初还实现了另一种方案:将定时消息直接投递到 ConsumeQueue 中,让其保存消息在定时消息存储中的位点。这样的好处是免去了一次 CommitLog 的存储,减少磁盘占用、性能也更好。但是这种方案在主从同步时会有消息顺序不同的问题:RocketMQ 主从同步只按顺序同步 CommitLog 中的消息,然后依靠 CommitLog 生成的索引顺序就会与 CommitLog 中消息顺序一致。如果直接投递到 ConsumeQueue,从节点上想要同步 ConsumeQueue 就需要在从节点也启动一个 Scheduled Replay 投递线程,在异步投递的情况下顺序可能与主节点不一致,这样就可能造成主从切换后丢失消息。

投递一条消息的逻辑很简单:拿到一个索引项后从中获取定时消息存储位置,从定时消息存储中查出消息,然后保存到 CommitLog 中。但是要实现高性能的定时消息投递,如何处理索引的加载和投递的触发就成为需要深思熟虑的问题。

预加载到时间轮

对于定时消息的投递,网络上的很多资料都指向一个方案——时间轮。

箭头按顺时针方向以固定频率移动(我们要达到的定时精度为 0.1s,所以频率就是 0.1s 一次),每一次移动称为一个 tick。每个 tick 保存了一个链表,里面是该 tick 需要触发的所有任务。

时间轮常被用做定时器的实现,它最大的优势就是,任务的新增和取消都是 O(1) 时间复杂度,而且只需要一个线程就可以驱动时间轮进行工作。

有了时间轮,我们就可以把每个定时消息的投递作为一个任务,将该 tick 需要投递的消息都查出来放到时间轮里面,随着时间轮跳动就不断地触发投递任务。

时间轮保存在内存里面,这么多消息要投递存不下怎么办?每小时加载下一个小时要投递的消息索引就好了!

0.1s 一个 tick,需要的格子数太多怎么办?用分层时间轮或者哈希时间轮就好了!

这样看来,时间轮看似一个完美的解决方案。那它真的完美吗?它在高性能和大量定时消息的情况下可能引发灾难。

让我们来设想这两个场景

  1. 发送了 1000w 某一时刻的定时消息,要同时投递
  2. 定时消息和普通消息同时大量投递,导致 Page Cache 繁忙,定时消息投递失败。

第一个场景下,如果按照上述方案,会将 1000w 个索引项加载进内存,内存无法承受会导致程序崩溃。

第二个场景就需要对定时消息的投递做流控和失败重试。

流控和重试意味着任务暂时等待,过一 tick 再执行。也就是说,要把这一 tick 中的任务拿出来放到下一 tick,如果此时流控还没有解除,那要继续进行任务的转移,这样就会造成很多额外的出入队操作。

如果继续优化时间轮的方案未尝不可,但是会有很多工作量。有没有其他可取的方案?

周期性启动定时任务

开源 RocketMQ 的定时消息实现为我们提供了很好的参照:在 4.9.3 版本的优化后不仅支持高性能异步投递,且实现了流控和投递失败的重投。

在扫描每个延迟等级的队列时具体的逻辑是:

  • 有一个表记录每个队列当前的投递位移
  • 每次启动一个定时任务,从投递位移之后开始扫描消息,如果消息到期则直接投递,如果还未到期则该任务结束。在 0.1s 后会启动一个新的定时任务进行下一次扫描

借鉴这个方法,对于任意时间定时消息的投递,也可以每 0.1s 启动一个新的定时任务,投递这 0.1s 内要投递的消息。这样就可以免去将投递任务加入到时间轮这个操作。对于流控也不需要重新投递这么多任务,而是只需要从上一次投递的时间和偏移量开始一个新的投递任务即可。

如何解决某一时刻投递消息量太大导致的内存问题?限制每个投递任务投递的消息量即可。投递不完则到下个任务继续投。

于是投递的逻辑变成这样:

  • 用[已投递时间]和[已投递位移]两个参数来记录当前投递状态
  • 如果投递速度能跟上,那么下任务会在上个任务 0.1s 后执行
  • 如果投递速度跟不上(某一时刻要投递的定时消息太多,0.1s 内投不完),则不等待直接进行第二个任务

至此,还剩下最后的问题是流控和消息的重投。

流控

这里需要引入第二个线程:投递任务状态更新线程。对于异步投递,实际上投递方法执行后可以抽象成一个投递任务,在内存中异步投递。那么这里就还需要一个投递任务状态更新线程,扫描投递任务的状态,当投递任务结束时进行一些状态更新。消息投递线程和任务状态更新线程是生产-消费模式的一个实现。

异步投递任务由一个阻塞队列来承载,这个阻塞队列的长度就代表着同时可以的进行异步投递任务的数量。我们可以人为地为阻塞队列设置一个长度上限,当达到上限时说明有过多的异步投递任务还在执行,需要流控。此时,跳出该投递任务,等待一会再开始新一个投递任务。

重投

如果消息投递失败了且不做任何处理,这条定时消息就会丢失。在 RocketMQ 这样高可靠的消息队列中,消息的丢失是不能被接收的,所以异常消息的重投就十分重要。

当前的做法是在投递任务状态更新线程扫描到状态为失败的任务时就开始重新投递该消息,如果多次投递失败会阻塞其他消息的投递,不断重试重投,直到该消息投递成功。

至此,我们完成了任意时间定时消息的设计旅程。在遇见和解决一个一个坑之后实现了高性能高可靠的任意时间定时消息。

定时消息取消

支持定时也要支持取消,这样才是一个完整的功能。定时消息由于其延迟投递的特性,是有机会在其投递之前“反悔”的。

对比定时消息的实现,取消功能的难度没有那么大,但也要考虑大消息量和高 TPS 下的影响。

实现定时消息取消的实现要点是保存一个需要取消的定时消息 ID 的信息集,每次投递时查询该集来判断是否要投递该消息。这个集合保存在内存中可能会造成内存泄漏,所以保存在磁盘中为妙。但是为取消消息专门创建一个文件来存储又太多余,能不能在现有的消息存储中存储?

当然可以!在每条消息中有一个字段来存储消息的 Flag,可以把取消消息的状态更新到该字段中存储。

最后是取消的触发方式。

在消息队列中,万物皆消息,任何功能都可以用消息来触发。

于是最终决定采用发送一条“取消消息”的方式来触发定时消息的取消。

小结

本文第一节从定时消息的定义和需求出发,说明了当今企业对消息队列定时消息能力的强烈需求和当下主流消息队列对定时消息能力支持不完美现状。

在第二节中,首先分析了任意时间定时消息实现的难点,然后站在开源 RocketMQ 延迟消息实现的基础上,从存储和投递两个方面详解了华为云 RocketMQ 任意时间定时消息的设计和实现。最后讲解了定时消息取消的设计和实现。

华为云 RocketMQ 定时消息已经上线

华为云已经上线分布式消息服务 RocketMQ 版任意时间定时消息特性。

任意时间定时消息特性提供领先业界的定时消息体验:

  • 任意时间的定时消息或延迟消息,最长延迟可达 1 年

  • 支持取消已经发送的定时消息

  • 消息投递毫秒级精度

点击使用open in new window

参考资料


欢迎关注公众号【消息中间件】(middleware-mq),更新消息中间件的源码解析和最新动态!