零、资料来源
【消息中间件】4天学会RocketMQ,企业真实场景,最新最细通俗易懂_哔哩哔哩_bilibili↗
一、mq概述
1.0 消息队列的选型
Kafka、ActiveMQ、RabbitMQ、RocketMQ来进行不同维度对比。
| 特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
|---|---|---|---|---|
| 单机吞吐量 | 万级 | 万级 | 10 万级 | 10 万级 |
| 时效性 | 毫秒级 | 微秒级 | 毫秒级 | 毫秒级 |
| 可用性 | 高(主从) | 高(主从) | 非常高(分布式) | 非常高(分布式) |
| 消息重复 | 至少一次 | 至少一次 | 至少一次 最多一次 | 至少一次最多一次 |
| 消息顺序性 | 有序 | 有序 | 有序 | 分区有序 |
| 支持主题数 | 千级 | 百万级 | 千级 | 百级,多了性能严重下滑 |
| 消息回溯 | 不支持 | 不支持 | 支持(按时间回溯) | 支持(按offset回溯) |
| 管理界面 | 普通 | 普通 | 完善 | 普通 |
选型的时候,我们需要根据业务场景,结合上述特性来进行选型。
1.1 消息队列使用场景
- 解耦:可以在多个系统(如库存系统、支付系统、物流系统…)之间进行解耦,将原本通过网络进行调用的方式改为使用MQ进行消息的异步通讯,只要该操作不是需要同步的,就可以改为使用MQ进行不同系统之间的通信,这样项目之间不会存在耦合,系统之间不会产生太大的影响,就算一个系统挂了,也只是消息挤压在MQ里面没人进行消费而已,不会对其他的系统产生影响。

- 异步:假如一个操作涉及到好几个步骤,这些步骤之间不需要同步完成,比如客户去创建了一个订单,还要去客户轨迹系统添加一条轨迹、去库存系统更新库存、去客户系统修改客户的状态等等。这样如果这个系统都直接进行调用,那么将会产生大量的时间,这样对于客户是无法接受的;并且像添加客户轨迹这种操作是不需要去同步操作的,如果使用MQ,在客户创建订单时,将后面的轨迹、库存、状态等信息的更新全都放到MQ里面然后去异步操作,这样就可以加快系统的访问速度,提供更好的客户体验。

- 削峰:一个系统访问流量有高峰时期,也有低峰时期,比如说,中午整点有一个抢购活动等等。比如系统平时流量并不高,一秒钟只有100多个并发请求,系统处理没有任何压力,一切风平浪静,到了某个抢购活动时间,系统并发访问量剧增,比如达到了每秒5000个并发请求,而我们的系统每秒只能处理1000个请求,那么由于流量太大,我们的系统、数据库可能就会崩溃。这时如果使用MQ进行流量削峰,将用户的大量消息直接放到MQ里面,然后我们的系统去按自己的最大消费能力去消费这些消息,就可以保证系统的稳定,只是可能要根据业务逻辑,给用户返回特定页面或者稍后通过其他方式通知其结果

1.2 mq的劣势

二、rocketmq

2.0 docker容器启动
[root@mq-vm ~]# docker network create rocketmq-net
931485285fd0b7e0fa2ddb588432bc431e859d1ba6f997162f5d0fe30f9e5f78
[root@mq-vm ~]# docker run -d --name rocketmq-nameserver --network rocketmq-net -p 9876:9876 apache/rocketmq:4.9.4 sh mqnamesrv
02f73715456be7005dd3204c076d15e3502289850a28cc1ca172991bcf264154
[root@mq-vm ~]# sudo mkdir -p /opt/rocketmq/broker/conf
[root@mq-vm ~]# cat > /opt/rocketmq/broker/conf/broker.conf <<'EOF'
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
brokerIP1=192.168.126.129
listenPort=10911
autoCreateTopicEnable=true
namesrvAddr=rocketmq-nameserver:9876
EOF
[root@mq-vm ~]# docker run -d --name rocketmq-broker --network rocketmq-net \
-p 10911:10911 -p 10909:10909 \
-v /opt/rocketmq/broker/store:/home/rocketmq/store:Z \
-v /opt/rocketmq/broker/logs:/home/rocketmq/logs:Z \
-v /opt/rocketmq/broker/conf/broker.conf:/home/rocketmq/rocketmq-4.9.4/conf/broker.conf:Z \
apache/rocketmq:4.9.4 sh mqbroker -c /home/rocketmq/rocketmq-4.9.4/conf/broker.conf
[root@mq-vm ~]# docker run -d --name rocketmq-console --network rocketmq-net -p 8080:8080 \
-e JAVA_OPTS="-Drocketmq.namesrv.addr=rocketmq-nameserver:9876" \
styletang/rocketmq-console-ng:latest
e387591ad5fcb25e803969f642b0422736a4ab7153731e3b9875e1180ce5cc86
[root@mq-vm ~]# docker logs -f rocketmq-nameserver
OpenJDK 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
OpenJDK 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The Name Server boot success. serializeType=JSON# docker-compose版本
version: '3.8'
services:
namesrv:
image: apache/rocketmq:4.9.4
container_name: rocketmq-nameserver
command: sh mqnamesrv
networks:
- rocketmq-net
ports:
- "9876:9876"
broker:
image: apache/rocketmq:4.9.4
container_name: rocketmq-broker
command: sh mqbroker -c /home/rocketmq/rocketmq-4.9.4/conf/broker.conf
networks:
- rocketmq-net
volumes:
- /opt/rocketmq/broker/store:/home/rocketmq/store:Z
- /opt/rocketmq/broker/logs:/home/rocketmq/logs:Z
- /opt/rocketmq/broker/conf/broker.conf:/home/rocketmq/rocketmq-4.9.4/conf/broker.conf:Z
ports:
- "10911:10911"
- "10909:10909"
depends_on:
- namesrv
console:
image: styletang/rocketmq-console-ng:latest
container_name: rocketmq-console
networks:
- rocketmq-net
ports:
- "8080:8080"
environment:
- JAVA_OPTS=-Drocketmq.namesrv.addr=rocketmq-nameserver:9876
networks:
rocketmq-net:
external: true
[root@opengauss-vm ~]# docker-compose up -d 2.1 生产者与消费者
<!-- RocketMQ Spring Boot starter (provides RocketMQTemplate, annotations) -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${rocketmq.spring.boot.starter.version}</version>
</dependency>
<!-- RocketMQ client library (explicit version match to broker) -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq.version}</version>
</dependency>默认情况下,对于同一个topic
- 不同
group之间(广播模式)会消费到同一批消息(总和一样)- 缓存同步,每个实例都同步最新到状态
- 同个
group(集群模式)内,每条消息只会被一个消费者消费- 订单处理,每条订单只会被处理一次,不会重复扣库存
// 默认使用,不写也没关系
consumer.setMessageModel(MessageModel.CLUSTERING);❓如何让同一个group内也能使用广播模式
可以修改消费模式为广播模式
// 设置消费模式为广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);
默认情况下,每个topic中有4个队列queue0、1、2、3
@Test
public void testCompleteFlow() throws Exception {
System.out.println("=== RocketMQ 完整流程测试开始 ===");
// 1. 先启动消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
consumer.setNamesrvAddr("192.168.126.129:9876");
consumer.subscribe("BenchmarkTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
AtomicInteger consumedCount = new AtomicInteger(0);
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
System.out.println("🎉 消费者收到 " + msgs.size() + " 条消息:");
for (MessageExt msg : msgs) {
System.out.println("----------------------------------------");
System.out.println("消息ID: " + msg.getMsgId());
System.out.println("主题: " + msg.getTopic());
System.out.println("标签: " + msg.getTags());
System.out.println("内容: " + new String(msg.getBody()));
System.out.println("队列: " + msg.getQueueId());
System.out.println("----------------------------------------");
consumedCount.incrementAndGet();
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
System.out.println("✅ 消费者启动成功");
// 2. 等待消费者注册
Thread.sleep(5000);
// 3. 启动生产者发送消息
DefaultMQProducer producer = new DefaultMQProducer("test-producer-group");
producer.setNamesrvAddr("192.168.126.129:9876");
producer.setVipChannelEnabled(false);
producer.start();
System.out.println("✅ 生产者启动成功");
// 发送多条测试消息
for (int i = 1; i <= 5; i++) {
String messageContent = "测试消息 " + i + " - 时间: " + System.currentTimeMillis();
Message message = new Message("BenchmarkTest", "TEST", messageContent.getBytes());
SendResult result = producer.send(message);
System.out.println("📤 发送消息 " + i + " 成功, MsgId: " + result.getMsgId());
Thread.sleep(1000); // 间隔1秒
}
producer.shutdown();
System.out.println("✅ 生产者已关闭");
// 4. 等待消息被消费
System.out.println("⏳ 等待消息消费...");
Thread.sleep(10000);
System.out.println("📊 总共消费消息数量: " + consumedCount.get());
consumer.shutdown();
System.out.println("✅ 消费者已关闭");
System.out.println("=== 测试完成 ===");
}2.2 消息类别
(1) 同步消息
特征:即时性较强,重要的消息,且必须有回执的消息,例如短信,通知(转账成功)
前文发送的消息都是同步消息
producer.send(Message msg);
(2) 异步消息
特征:即时性较弱,但需要有回执的消息,例如订单中的某些信息
producer.send(Message msg, SendCallback sendCallback);
(3) 单向消息
特征:不需要有回执的消息,例如日志类消息
producer.sendOneway(Message msg);
(4) 延时消息
特征:消息发送时并不直接发送到消息服务器,而是根据设定的等待时间到达,起到延时到达的缓冲作用
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;
message.setDelayTimeLevel(int level);
broker在接收到延时消息的时候,会将延时消息存入到延时Topic的队列中,然后ScheduleMessageService中,每个queue对应的定时任务会不停地被执行,检查queue中哪些消息已到设定时间,然后转发到消息的原始Topic,这些消息就会被各自的consumer消费了。

(5) 批量消息
producer.send(Collection<Message> msgs);
- 这些批量消息应该有相同的
topic - 相同的
waitStoreMsgOK - 不能是延时消息
- 消息内容总长度不超过
4M - 消息内容总长度包含如下:
- topic(字符串字节数)
- body(字节数组长度)
- 消息追加的属性(key与value对应字符串字节数)
- 日志(固定20字节)
2.3 消息过滤
2.3.1 tag过滤
// 发送的信息带 tag1 标签
Message message = new Message("topic8", "tag1", msg.getBytes());
// 只消费 vip 标签的消息
consumer.subscribe("topic8", "vip");
// 只消费 tag1 或 vip 标签的消息
consumer.subscribe("topic8", "tag1 || vip");2.3.2 属性过滤
// 给消息设置属性
message.putUserProperty(String name, String value);
message1.putUserProperty("age", "18");
message1.putUserProperty("name", "zhangsan");
message2.putUserProperty("age", "15");
message2.putUserProperty("name", "lisi");
// 只接收 message1
consumer1.subscribe("topic8", MessageSelector.bySql("age > 16"));
// 只接收 message2
consumer2.subscribe("topic8", MessageSelector.bySql("age < 16 and name = 'lisi'"));在conf文件夹下的broker.conf中添加如下配置后,重启brocker以开启sql过滤
enablePropertyFilter=true三、springboot整合
3.1 基础整合
3.1.1 yaml配置
spring:
application:
name: demo-service
rocketmq:
# NameServer 列表(必填),多个用 ; 或 , 分隔
name-server: 127.0.0.1:9876
# 如果是云上部署(例如阿里云),可设置 CLOUD;本地集群用 LOCAL
access-channel: LOCAL
producer:
# 必填:生产者 group
group: demo-producer-group
# 发送超时(ms),默认几秒,生产环境可根据场景调优
send-message-timeout: 3000
# 发送失败时重试次数(同步发送),生产环境可设置 1~3
retry-times-when-send-failed: 2
# 最大消息体(字节)
max-message-size: 4194304
# 如果开启 ACL(鉴权),在这里填环境变量或占位符
access-key: ${ROCKETMQ_ACCESS_KEY:}
secret-key: ${ROCKETMQ_SECRET_KEY:}
# 是否开启消息链路追踪(需要服务端/插件支持)
enable-msg-trace: true
consumer:
# 必填:消费者 group
group: demo-consumer-group
# 消息模式:CLUSTERING(集群消费)或 BROADCASTING(广播)
message-model: CLUSTERING
# 消费线程数(并发消费线程)
consume-thread-nums: 20
# 达到该重试次数后进入死信(或业务处理方式),视 broker 配置
max-reconsume-times: 16
# 批量拉取大小
pull-batch-size: 32
# ACL(与 producer 对应)
access-key: ${ROCKETMQ_ACCESS_KEY:}
secret-key: ${ROCKETMQ_SECRET_KEY:}3.1.2 生产者整合
实体类要在互联网传播,需要实现Serializable接口以虚拟化
import lombok.Data;
import java.io.Serializable;
@Data
@AllArgsConstructor
public class Order implements Serializable {
private String orderId;
private String userId;
private String productId;
private int quantity;
private double price;
}与redis等中间件相似,注入一个XxxTemplate以发送消息
@Autowired
RocketMQTemplate rocketMQTemplate;
public void test() {
Order order = new Order("orderId111", "userId222", "productId333", 6, 6.6);
rocketMQTemplate.convertAndSend("topic8", order);
}3.1.3 消费者整合
@Service
// 监听topic8主题,消费组group8,过滤tag1和tag2的消息,广播模式
@RocketMQMessageListener(topic = "topic8", consumerGroup = "group8",
selectorExpression = "tag1 || tag2",
messageModel = MessageModel.BROADCASTING)
public class DemoConsumer implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
System.out.println("接收到订单: " + order);
}
}
👇使用sql过滤
// 监听topic8主题,消费组group8,使用SQL92表达式过滤消息
@RocketMQMessageListener(topic = "topic8", consumerGroup = "group8",
selectorType = SelectorType.SQL92, selectorExpression = "quantity >= 2 AND price <= 100")3.1.4 消息类别设置
public void test() {
Order order = new Order("orderId111", "userId222", "productId333", 6, 6.6);
// 发送同步消息
SendResult syncSend = rocketMQTemplate.syncSend("topic8", order);
// 延时等级3的延时消息
SendResult delaySend = rocketMQTemplate.syncSend("topic8",
MessageBuilder.withPayload(order).build(), 3000, 3);
// 发送异步消息
rocketMQTemplate.asyncSend("topic8", order, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("消息发送成功: " + sendResult);
}
@Override
public void onException(Throwable e) {
System.out.println("消息发送失败: " + e.getMessage());
}
}, 3000); // 超时时间3秒
// 发送单向消息
rocketMQTemplate.sendOneWay("topic8", order);
}3.2 消息顺序错乱
| 原因分类 | 具体原因 | 典型场景 | 解决方法 |
|---|---|---|---|
| 生产者 | 消息发到不同队列 | 默认轮询发送 | 使用 MessageQueueSelector 按业务 key 路由 |
| 异步发送 + 网络波动 | sendAsync | 改用同步发送(send) | |
| 发送失败重试 | 网络抖动 | 关闭重试或用同步发送 | |
| Broker | 多队列并行写入 | Topic 有多个 Queue | 减少队列数或按 key 路由 |
| 主从同步延迟 | 主从切换 | 使用同步刷盘 + 同步复制 | |
| 消费者 | 并发消费(多线程) | MessageListenerConcurrently | 改用 MessageListenerOrderly |
| 订阅多个队列 | 消费者负载均衡 | 按队列粒度单线程消费 | |
| 消费失败重试 | 业务异常 | 顺序消费模式下重试会阻塞当前队列 | |
| Rebalance | 消费者上下线 | 避免频繁重启,使用有序消费 | |
| 网络/系统 | 网络延迟、丢包 | 跨机房部署 | 优化网络,使用同步模式 |
| 时钟不同步 | 分布式环境 | 使用逻辑序列号而非时间戳 |
RocketMQ采用了局部顺序一致性的机制,实现了单个队列中的消息严格有序。也就是说,如果想要保证顺序消费,必须将一组消息发送到同一个队列中,然后再由消费者进行顺序消费。
RocketMQ推荐的顺序消费解决方案是:按照业务划分不同的队列,然后将需要顺序消费的消息发往同一队列中即可,不同业务之间的消息仍采用并发消费。这种方式在满足顺序消费的同时提高了消息的处理速度,在一定程度上避免了消息堆积问题
RocketMQ顺序消息的原理是:
- 在Producer(生产者)把一批需要保证顺序的消息发送到同一个
MessageQueue - Consumer(消费者)则通过加锁的机制来保证消息消费的顺序性,Broker端通过对
MessageQueue进行加锁,保证同一个MessageQueue只能被同一个Consumer进行消费。
3.2.1 生产者侧的原因
(1) 消息发送到了不同的队列
- 原因:RocketMQ 的 Topic 由多个 MessageQueue 组成,默认情况下消息会轮询或随机发送到不同队列。
- 结果:同一业务流程的消息分散到多个队列,消费时无法保证顺序。
示例场景: 订单创建 → 订单付款 → 订单推送→ 订单完成,四条消息分别进入 Queue0、Queue1、Queue2,消费者可能先读到”订单完成”。

(2) 异步发送 + 网络波动
- 原因:使用
sendAsync或sendOneway时,消息发送顺序依赖网络和 Broker 处理速度。 - 结果:先发的消息可能后到达 Broker。
(3) 发送失败重试导致乱序
- 原因:消息 A 发送失败触发重试,期间消息 B 已成功发送并入队。
- 结果:最终 Broker 中顺序变成 B → A。
3.2.2 Broker(服务端)侧原因
(1) 多队列并行写入
- 原因:一个 Topic 默认有 4 个或更多 MessageQueue,Broker 并发写入不同队列。
- 结果:即使生产者按顺序发送,不同队列之间无全局顺序保证。
(2) 主从同步延迟(高可用场景)
- 原因:Master 写入成功但 Slave 同步延迟,主从切换时可能丢失部分消息或顺序。
- 结果:故障恢复后消息顺序可能错乱。
3.2.3 消费者侧原因
(1) 并发消费(最常见)
- 原因:默认使用
MessageListenerConcurrently,消费者开启多线程(默认 20 线程)并发处理消息。 - 结果:即使消息在队列中有序,多线程处理导致执行顺序随机。
代码示例(错误用法):
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
// 多线程并发执行,无法保证顺序
msgs.forEach(msg -> process(msg));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});(2) 消费者订阅了多个队列
- 原因:一个消费者实例可能分配到多个 MessageQueue,从多个队列拉取消息并行消费。
- 结果:不同队列的消息交错处理。
(3) 消费失败重试机制
- 原因:消息 A 消费失败进入重试队列(%RETRY%),消息 B 继续消费成功。
- 结果:A 延迟重新消费,顺序被打乱。
(4) Rebalance(队列重新分配)
- 原因:消费者组中实例增减触发 Rebalance,队列在消费者间重新分配。
- 结果:未消费完的队列转移到其他消费者,可能导致重复消费或乱序。
3.2.4 网络与系统原因
(1) 网络延迟与丢包
- 原因:生产者到 Broker、Broker 到消费者的网络不稳定。
- 结果:消息到达时间不确定,顺序可能错乱。
(2) 时钟不同步
- 原因:分布式系统中不同节点时钟偏差。
- 结果:依赖时间戳排序的逻辑可能失效。
3.2.5 解决方案
顺序发送与消费
使用 MessageQueueSelector 将相关消息发送到同一队列:
- 同一
orderId的所有消息进入同一队列。 - 使用 同步发送而非异步。
// 按订单 ID 哈希选择队列
producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
String orderId = (String) arg; // 订单ID作为参数
int index = Math.abs(orderId.hashCode()) % mqs.size();
return mqs.get(index);
}
}, orderId); // 传入订单ID
使用 MessageListenerOrderly 单线程顺序消费:
- 消费线程池是共享的,但通过锁机制保证同一队列消息的有序性。
MessageListenerOrderly对每个队列加锁,同一队列消息串行消费。- 不同队列之间仍然并行消费。
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
// RocketMQ 保证同一队列的消息串行处理
msgs.forEach(msg -> process(msg));
return ConsumeOrderlyStatus.SUCCESS;
});注意事项:
- 顺序消费会降低吞吐量(同一队列只能串行处理)。
- 消费失败时会阻塞当前队列:如果返回
SUSPEND_CURRENT_QUEUE_A_MOMENT,RocketMQ 会暂停消费该队列,稍后重试。 - 必须配合生产者使用 MessageQueueSelector,否则相关消息可能进入不同队列,仍然无法保证业务顺序。
减少队列数量
- 如果业务允许,将 Topic 的队列数设为 1(牺牲性能换取全局有序)。
- 适用于低并发、强顺序要求的场景。
消费者侧业务排序
- 在消费端根据业务字段(如时间戳、版本号)重新排序后再处理。
- 适用于对实时性要求不高的场景。
3.3 事务
提交状态:允许进入队列,此消息与非事务消息无区别
回滚状态:不允许进入队列,此消息等同于未发送过
中间状态:完成了half消息的发送,未对MQ进行二次状态确认
注意:事务消息仅与生产者有关,与消费者无关

3.3.1 正常事务过程
生产者集群 向 Broker 发送一条 Half 消息(半消息/预备消息)Half 消息对消费者不可见,暂时不会进入消费者队列;
Broker 成功接收 Half 消息后,返回 OK 状态 给生产者
生产者收到 OK 后,开始执行本地事务(如数据库操作、业务逻辑等)
- 本地事务成功→ 向 Broker 发送Commit(提交)指令
- Broker 将 Half 消息转为正常消息,进入消费者队列
- 消费者集群可以消费该消息
- 本地事务失败→ 向 Broker 发送Rollback(回滚)指令
- Broker 删除 Half 消息,消息不会被消费
3.3.2 事务补偿过程(网络异常或超时)
如果 Broker 在一定时间(默认60s)内没有收到 Commit/Rollback 指令(如网络故障、生产者宕机),Broker 会主动发起事务回查
检查本地事务状态,Broker 向生产者发起回查请求,询问本地事务的最终状态
根据事务状态提交或回滚
- 生产者检查本地事务状态(查数据库、日志等)
- 返回事务状态:
- COMMIT_MESSAGE → Broker 提交消息
- ROLLBACK_MESSAGE → Broker 删除消息
- UNKNOWN → Broker 稍后再次回查
3.3.3 代码示例
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
public class TransactionProducer {
public static void main(String[] args) throws Exception {
// 1. 创建事务消息生产者
TransactionMQProducer producer = new TransactionMQProducer("transaction-producer-group");
producer.setNamesrvAddr("192.168.126.129:9876");
// 2. 设置事务监听器(处理本地事务 + 事务回查)
producer. setTransactionListener(new TransactionListener() {
// 执行本地事务
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
System.out.println("执行本地事务: " + new String(msg.getBody()));
// 模拟本地事务(如数据库操作)
boolean success = doLocalTransaction();
if (success) {
return LocalTransactionState.COMMIT_MESSAGE; // 提交
} else {
return LocalTransactionState.ROLLBACK_MESSAGE; // 回滚
}
} catch (Exception e) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
// 事务回查(Broker 主动调用)
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
System.out.println("Broker 回查事务状态: " + msg.getTransactionId());
// 查询本地事务执行结果(如查数据库)
boolean committed = checkTransactionStatus(msg.getTransactionId());
if (committed) {
return LocalTransactionState.COMMIT_MESSAGE;
} else {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
});
producer.start();
// 3. 发送事务消息
Message message = new Message("TransactionTopic", "TagA", "订单创建成功". getBytes());
TransactionSendResult result = producer. sendMessageInTransaction(message, null);
System.out.println("发送结果: " + result. getSendStatus());
// 不关闭,等待回查
// producer.shutdown();
}
// 模拟本地事务
private static boolean doLocalTransaction() {
// 实际业务:操作数据库、调用其他服务等
return true;
}
// 模拟回查本地事务状态
private static boolean checkTransactionStatus(String transactionId) {
// 实际业务:查数据库判断事务是否已提交
return true;
}
}四、mq集群
┌──────────────────────────────────────────────────────────────┐
│ NameServer │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ NameServer 1 │ │ NameServer 2 │ │ NameServer 3 │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ ▲ ▲ ▲ │
└─────────┼─────────────────┼─────────────────┼────────────────┘
│ 心跳注册 │ │
│ │ │
┌─────────┴─────────────────┴─────────────────┴────────────────┐
│ Broker │
│ ┌─────────────────────┐ ┌─────────────────────┐ │
│ │ Broker Master A │ │ Broker Master B │ │
│ │ (可读可写) │ │ (可读可写) │ │
│ └──────────┬──────────┘ └──────────┬──────────┘ │
│ │ 同步/异步复制 │ │
│ ┌──────────┴──────────┐ ┌──────────┴──────────┐ │
│ │ Broker Slave A │ │ Broker Slave B │ │
│ │ (只读) │ │ (只读) │ │
│ └─────────────────────┘ └─────────────────────┘ │
└──────────────────────────────────────────────────────────────┘
▲ ▲
│ │
┌───────┴────────┐ ┌───────┴────────┐
│ Producer │ │ Consumer │
│ ┌──────────┐ │ │ ┌──────────┐ │
│ │Producer 1│ │ │ │Consumer 1│ │
│ ├──────────┤ │ │ ├──────────┤ │
│ │Producer 2│ │ │ │Consumer 2│ │
│ └──────────┘ │ │ └──────────┘ │
└────────────────┘ └────────────────┘4.1 集群架构
4.1.1 NameServer 集群
作用
- 路由注册中心:管理 Broker 的路由信息
- 无状态节点:NameServer 之间不通信,各自独立
- 轻量级:不持久化数据,全部存储在内存
工作机制
- Broker 注册:每个 Broker 启动后向所有 NameServer 注册自己的信息
- 心跳维持:Broker 每 30 秒向 NameServer 发送心跳
- 路由剔除:NameServer 超过 120 秒未收到心跳,认为 Broker 下线,删除路由
- 客户端获取路由:Producer/Consumer 从任意一个 NameServer 获取 Broker 地址
特点
- 无主从关系:所有 NameServer 地位平等
- 数据最终一致:通过 Broker 的定时注册实现
- 高可用:单个 NameServer 挂掉不影响其他节点
部署建议
- 生产环境:至少部署 2-3 个 NameServer 节点
- 部署位置:分布在不同机器/机房

4.1.2 Broker 集群
Broker 是消息存储和转发的核心,支持多种集群模式。
Broker 角色
| 角色 | 说明 | 读写权限 |
|---|---|---|
| Master | 主节点,负责消息写入和读取 | 可读可写 |
| Slave | 从节点,从 Master 同步数据,提供读服务 | 只读(默认) |
消息同步方式
| 方式 | 说明 | 优点 | 缺点 |
|---|---|---|---|
| 同步复制 | Master 等待 Slave 确认后才返回成功 | 数据可靠性高 | 性能较低 |
| 异步复制 | Master 写入成功立即返回,异步同步到 Slave | 性能高 | 可能丢失少量数据 |
单 Master 模式
┌─────────────┐
│ Master │
│ (单节点) │
└─────────────┘特点:
- 最简单的部署方式
- 风险:单点故障,Master 挂了整个服务不可用
适用场景:
- 开发/测试环境
- 不建议生产环境使用
多 Master 模式(无 Slave)
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Master A │ │ Master B │ │ Master C │
└──────────┘ └──────────┘ └──────────┘特点:
- 多个 Master 节点,每个 Master 管理不同的 Queue
- 无从节点,Master 之间不复制数据
- 优点:配置简单,性能高
- 缺点:单个 Master 宕机后,其上的消息无法消费(直到恢复)
适用场景:
- 对消息可靠性要求不高
- 追求高吞吐量
多 Master 多 Slave 模式(异步复制)
┌──────────┐ ┌──────────┐
│ Master A │───────▶│ Slave A │
└──────────┘ 异步 └──────────┘
┌──────────┐ ┌──────────┐
│ Master B │───────▶│ Slave B │
└──────────┘ 异步 └──────────┘特点:
- 每个 Master 配置至少 1 个 Slave
- Master 写入成功立即返回,异步同步到 Slave
- Master 宕机后,Consumer 可从 Slave 继续消费(只读)
- 优点:高性能 + 高可用
- 缺点:Master 宕机可能丢失极少量未同步的数据
配置示例(Master):
# broker-a.properties
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0 # 0 表示 Master
brokerRole=ASYNC_MASTER # 异步主节点
flushDiskType=ASYNC_FLUSH # 异步刷盘配置示例(Slave):
# broker-a-slave.properties
brokerClusterName=DefaultCluster
brokerName=broker-a # 与 Master 同名
brokerId=1 # 非 0 表示 Slave
brokerRole=SLAVE # 从节点适用场景:
- 生产环境首选
- 对性能和可用性都有要求
多 Master 多 Slave 模式(2主2从)

┌──────────┐ 同步 ┌──────────┐
│ Master A │───────▶│ Slave A │
└──────────┘ 等待 └──────────┘
┌──────────┐ 同步 ┌──────────┐
│ Master B │───────▶│ Slave B │
└──────────┘ 等待 └──────────┘特点:
- Master 写入后等待 Slave 同步成功才返回
- 数据零丢失(同步成功才算成功)
- 优点:数据可靠性最高
- 缺点:性能较低(RT 增加)
配置示例(Master):
brokerRole=SYNC_MASTER # 同步主节点
flushDiskType=SYNC_FLUSH # 同步刷盘(可选,进一步提高可靠性)适用场景:
- 金融、支付等对数据可靠性要求极高的场景
- 可以接受性能损失
五、高级特性
5.1 消息重复

5.1.1 重复消费怎么解决
生产端为了保证消息发送成功,可能会重复推送(直到收到成功 ACK),会产生重复消息。但是一个成熟的 MQ Server 框架一般会想办法解决,避免存储重复消息(比如:空间换时间,存储已处理过的 message_id),给生产端提供一个幂等性的发送消息接口。
但是消费端却无法根本解决这个问题,在高并发标准要求下,拉取消息+业务处理+提交消费位移需要做事务处理,另外消费端服务可能宕机,很可能会拉取到重复消息。
所以,只能业务端自己做控制,对于已经消费成功的消息,本地数据库表或 Redis 缓存业务标识,每次处理前先进行校验,保证幂等。

| 场景 | ACK | 方向 | 代码体现 |
|---|---|---|---|
| 生产者发送消息 | Broker 确认”消息已存储” | Broker → Producer | SendResult(返回值) |
| 消费者消费消息 | Consumer 确认”消息已处理” | Consumer → Broker | CONSUME_SUCCESS(返回值) |
❓RocketMQ怎么保证消息不被重复消费
在业务逻辑中实现幂等性,即使消息被重复消费,也不会影响业务状态。例如,对于支付或转账类操作,可以使用唯一订单号或事务ID作为幂等性的标识符,确保同样的操作只会被执行一次。
5.1.2 如何保证幂等写
幂等性是指同一操作的多次执行对系统状态的影响与一次执行结果一致。 例如,支付接口因网络重试被多次调用,最终应确保仅扣款一次。消费者在处理消息前,先检查消息 ID 是否已处理,若已处理则直接丢弃。

方案一:唯一标识(幂等键)
- 客户端为每个请求生成全局唯一 ID(如 UUID、业务主键)
- 服务端校验该 ID 是否已处理,适用场景:接口调用、消息消费等
方案二:数据库事务 + 乐观锁
- 通过版本号或状态字段控制并发更新,确保多次更新等同于一次操作
- 适用场景:数据库库存更新、订单状态变更
方案三:数据库唯一约束
- 利用数据库唯一索引防止重复数据写入
- 适用场景:数据库插入场景(如订单创建)
方案四:分布式锁
- 通过锁机制保证同一时刻仅有一个请求执行关键操作
- 适用场景:高并发下的资源抢夺(如秒杀)
方案五:消息去重
- 消息队列生产者为每条消息生成唯一的消息 ID
- 消费者在处理消息前,先检查该消息 ID 是否已经处理过
- 如果已处理则删除该消息
5.2 高效读写
5.2.1 顺序写
ActiveMQ先存储消息进数据库,再存入linux文件系统,吞吐量小
现代MQ直接存入文件系统,吞吐量大

文件初始顺序存储,删除部分文件后产生零散的存储碎片。当写入新文件时,文件系统会填充这些碎片空间,导致数据不连续存储,形成随机写操作,降低 I/O 性能。

RocketMQ在存储数据前,会先向linux申请一定的存储空间,存储消息时,即可实现顺序写

5.2.2 零拷贝

5.2.3 消息存储的物理地址
❓RocketMQ 如何优化随机写
- CommitLog 顺序写入(核心优化)
- 所有消息追加到同一个 CommitLog 文件末尾
- 不支持随机修改,只支持顺序追加
- 利用操作系统页缓存(Page Cache)批量刷盘
CommitLog 文件:
[消息1][消息2][消息3][消息4]... ← 顺序追加,磁头不回退
- 异步刷盘(默认)
待到数据存储到一定量后,再一次性写入硬盘,节约连接
| 刷盘方式 | 性能 | 安全性 | 说明 |
|---|---|---|---|
| 异步刷盘 | 高 | 中 | 消息先写入内存,异步批量刷盘(默认) |
| 同步刷盘 | 低 | 高 | 每条消息立即刷盘,确保不丢失 |
配置方式:
# broker.conf
flushDiskType=ASYNC_FLUSH # 异步刷盘(推荐)
# flushDiskType=SYNC_FLUSH # 同步刷盘(金融场景)
- 定期清理文件,减少碎片
- 定时删除过期文件(默认凌晨 4 点)
- 按整个文件删除,而非按消息删除
- 避免文件内部产生碎片
# broker.conf
deleteWhen=04 # 删除时间(凌晨 4 点)
fileReservedTime=48 # 文件保留时间(小时)5.3 高可用性
NameServer
- 无状态节点,Broker 向所有 NameServer 注册
消息服务器
- 主从架构(2M-2S)
消息生产
- Topic 的 MessageQueue 分布在多个 Broker Master 上,Producer 通过负载均衡发送消息,保障单个 Master 宕机后不影响可用性
消息消费
- RocketMQ 根据 Master 负载动态选择消息读取节点,Master 繁忙时自动切换到 Slave 读取
- 👇实现了读写分离

主从数据复制
| 维度 | 同步复制(SYNC_MASTER) | 异步复制(ASYNC_MASTER) |
|---|---|---|
| 数据安全性 | 高(零丢失) | 中(可能丢失秒级数据) |
| 性能(TPS) | 低(1-2 万) | 高(10 万+) |
| 延迟(RT) | 高(10-50ms) | 低(< 10ms) |
| 故障恢复 | 简单(Slave 数据完整) | 需人工介入(可能丢数据) |
| 适用场景 | 金融、支付、订单系统 | 日志、监控、推荐系统 |
| 配置 | brokerRole=SYNC_MASTER | brokerRole=ASYNC_MASTER |
5.4 消息重试
5.4.1 顺序消息重试
当消费者消费消息失败后,RocketMQ会自动进行消息重试(每次间隔时间为1秒) 注意:应用会出现消息消费被阻塞的情况,因此,要对顺序消息的消费情况进行监控,避免阻塞现象的发生
5.4.2 无序消息重试

5.4.3 死信
👆当消息消费重试到达了指定次数(默认16次)后,MQ将无法被正常消费的消息称为死信消息(Dead-Letter Message) 死信消息不会被直接抛弃,而是保存到了一个全新的队列中,该队列称为死信队列(Dead-Letter Queue)
死信队列特征
- 归属某一个组(GourpId),而不归属Topic,也不归属消费者
- 一个死信队列中可以包含同一个组下的多个Topic中的死信消息
- 死信队列不会进行默认初始化,当第一个死信出现后,此队列首次初始化
死信队列中消息特征
- 不会被再次重复消费
- 死信队列中的消息有效期为3天,达到时限后将被清除
