RocketMQ Memory Store
大约 1 分钟
原文地址:http://hscarb.github.io/rocketmq/20220618-rocketmq-memory-store.html
RocketMQ Memory Store
背景
概要设计
整体流程

首先需要在 Broker 中创建 Topic,并指定为内存存储的 Topic。
- 生产者发送生产请求
SendMessageProcessor
处理生产请求- 根据请求的 Topic 获取
TopicConfig
- 判断该 Topic 是否是内存 Topic
- 如果不是,走原有
DefaultMessageStore
存储 - 如果是,走
MemoryMessageStore
存储
- 根据请求的 Topic 获取
- 消费者发送拉取消息请求
PullMessageProcessor
处理拉取消息请求- 根据请求的 Topic 获取
TopicConfig
- 判断该 Topic 是否是内存 Topic
- 如果不是,走原有
DefaultMessageStore
取消息 - 如果是,走
MemoryMessageStore
取消息
- 根据请求的 Topic 获取
详细设计
整体流程顺序图

类设计
MemoryMessageStore

为
MemoryMessageStore
设置一个存储阈值,为内存中存储的消息总大小,默认为可用内存的 75%messageQueueMap
:一个HashMap
,Key 是 Topic 名称,Value 是 Disruptor 存储的MessageExtBrokerInner
消息队列messageTotalSize
:记录保存的消息总大小putMessage
:将消息根据 Topic 放入指定队列,如果超过消息总大小阈值,则从该 Topic 中删除一条消息。如果该 Topic 为空,则删除一条保存最久的消息
TopicConfig
boolean memory
:是否是内存 Topic
BrokerController
memoryTopicEnable
:是否启动内存 Topic 功能,默认 false
欢迎关注公众号【消息中间件】(middleware-mq),更新消息中间件的源码解析和最新动态!
