RocketMQ 最佳实践
原文地址:http://hscarb.github.io/rocketmq/20221104-rocketmq-best-practice.html
RocketMQ 最佳实践
生产者
发送重试
默认的消息发送超时时间为 3s,重试次数为 2 次。
在生产环境中建议将超时时间合重试次数设大一点,以便有足够的重试次数来应对发送失败的场景。
配置建议:
使用异步发送可以避免上游调用超时,可以将超时时间设为 10s,重试次数设为 16次。
producer.setSendMsgTimeout(10000);
producer.setRetryTimesWhenSendFailed(16);
producer.setRetryTimesWhenSendAsyncFailed(16);
延迟故障规避
RocketMQ 引入了延迟故障规避机制,当消息发送失败后,不再会发送到失败的 Broker,而是换一个 Broker 发送。该机制默认不开启。
该配置项为 sendLatencyFaultEnable
- false:默认值,规避策略只在当前消息发送失败重试时失效。
- true:一旦消息发送失败,在接下来的一段时间内所有的客户端都不会向对应的 Broker 发送消息。
配置建议:
根据集群的负载来选择,一般无需开启。
- 如果集群负载较高,不建议开启。因为某个 Broker 发送失败后的 5 分钟不会接收消息,会造成其他 Broker 负载过高。
同一进程中多个生产者发送消息到多个集群
RocketMQ 客户端 SDK 中的 MQClientInstance
表示对应到一个 RocketMQ 集群的客户端,在一个进程中可以有多个。
在 MQClientInstance
中可以注册多个生产者和消费者,这些生产者和消费者的元数据配置是相同的。
为了实现多个生产者分别发送消息到多个集群,需要将生产者分别注册到多个 MQClientInstance
下,具体的方法为:为生产者设置不同的 ClientId
。
配置建议:
将生产者的 UnitName
设置成集群名称,ClientId
生成时会拼接 UnitName
,进而产生不同的 ClientId
。
DefaultMQProducer producer1 = new DefaultMQProducer("producer_group1");
producer.setUnitName("Cluster1")
producer.setNamesrvAddr("1.1.1.1:9876");
producer.start();
DefaultMQProducer producer2 = new DefaultMQProducer("producer_group2");
producer.setUnitName("Cluster2")
producer.setNamesrvAddr("2.2.2.2:9876");
producer.start();
附:生成 ClientId
的源码,
public String buildMQClientId() {
StringBuilder sb = new StringBuilder();
sb.append(this.getClientIP());
sb.append("@");
sb.append(this.getInstanceName());
if (!UtilAll.isBlank(this.unitName)) {
sb.append("@");
sb.append(this.unitName);
}
return sb.toString();
}
消费者
消费幂等
RocketMQ无法避免消息重复(Exactly-Once),所以如果业务对消费重复非常敏感,有必要根据业务上的唯一Key对消息做幂等处理。
因为不同的Message ID对应的消息内容可能相同,有可能出现冲突(重复)的情况,所以真正安全的幂等处理,不建议以Message ID作为处理依据。最好的方式是以业务唯一标识作为幂等处理的关键依据,而业务的唯一标识可以通过消息Key设置。
消费组线程数
RocketMQ 消费者提供 consumeThreadMin
、consumeThreadMax
两个参数来设置线程池中的线程个数,但是由于线程池内部为无界队列,所以 consumeThreadMax
参数无效。
在实践中这两个值往往会设置成相同的。
避免订阅关系不一致导致消息丢失
RocketMQ 的一个消费组可以订阅多个 Topic,订阅多个 Tag。到那时同一个消费组中的订阅关系必须一致。
如果订阅关系不一致会造成消息丢失(部分消息未被消费)。
避免 ClientId 相同
消费者的 ClientId 生成规则与生产者一样。如果一个消费组内两个消费者的 ClientId 相同,会出现有的队列重复消费、有的队列无法消费的情况。
配置建议:
由于 ClientId 生成时会拼接消费者的 clientIP
属性,同一 IP 下不同消费者的 clientIP
相同会导致 ClientId 相同,所以建议手动设置 clientIP
。
consumer.setClientIP('192.168.3.10' + System.currentTimeMillis());
消费重试次数
普通消息默认重试 16 次,重试实践按照延迟等级每次重试会递增,到达 16 次后,之后每次重试按照最大延迟等级对应的时间间隔。
// 重试的时间从 10s 开始
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
顺序消费模式下重试次数配置无效,如果一条消息消费不成功会一直重试,重试次数为 Integer.MAX_VALUE
。重试时间间隔可以用 suspendCurrentQueueTimeMillis
设置,默认为 1s。
Broker
欢迎关注公众号【消息中间件】(middleware-mq),更新消息中间件的源码解析和最新动态!
