Appearance
消息可靠性投递
一、概述
消息可靠性投递是消息系统中至关重要的环节,确保消息从生产者到消费者的整个链路中不丢失。本文档详细介绍 Spring AMQP 中实现消息可靠性投递的各种机制。
1.1 可靠性链路
┌─────────────────────────────────────────────────────────────────────┐
│ 消息可靠性链路 │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 生产者 │ ──► │ Broker │ ──► │ 队列 │ ──► │ 消费者 │ │
│ │ Producer │ │ Exchange │ │ Queue │ │ Consumer │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ │ │ │ │ │
│ ▼ ▼ ▼ ▼ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │发布者确认 │ │消息持久化│ │队列持久化│ │消费确认 │ │
│ │Publisher │ │Message │ │Queue │ │Consumer │ │
│ │Confirm │ │Persist │ │Persist │ │Ack │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
│ │
│ 可靠性保障: │
│ ├── 生产者:发布确认、返回回调 │
│ ├── Broker:消息持久化、队列持久化 │
│ └── 消费者:手动确认、重试机制 │
│ │
└─────────────────────────────────────────────────────────────────────┘
1.2 可靠性等级
text
┌─────────────────────────────────────────────────────────────────────┐
│ 消息可靠性等级 │
├───────────────┬─────────────────────────────────────────────────────┤
│ 等级 │ 保障措施 │
├───────────────┼─────────────────────────────────────────────────────┤
│ 基础可靠 │ 消息持久化 + 手动确认 │
├───────────────┼─────────────────────────────────────────────────────┤
│ 标准可靠 │ 消息持久化 + 发布确认 + 手动确认 │
├───────────────┼─────────────────────────────────────────────────────┤
│ 高可靠 │ 消息持久化 + 发布确认 + 手动确认 + 死信队列 │
├───────────────┼─────────────────────────────────────────────────────┤
│ 极高可靠 │ 高可靠 + 消息追踪 + 监控告警 + 备份交换器 │
└───────────────┴─────────────────────────────────────────────────────┘
二、核心知识点
2.1 发布者确认
2.1.1 配置发布确认
java
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class PublisherConfirmConfig {
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setPublisherConfirmType(
CachingConnectionFactory.ConfirmType.CORRELATED);
return factory;
}
/**
 * Creates the {@link RabbitTemplate} for the given connection factory and registers a
 * confirm callback that prints the outcome of every publisher confirm.
 *
 * @param connectionFactory the connection factory the template publishes through
 * @return the configured template
 */
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate confirmingTemplate = new RabbitTemplate(connectionFactory);
    confirmingTemplate.setConfirmCallback((correlation, acked, reason) -> {
        // Negative confirm first: report the broker-supplied reason and stop.
        if (!acked) {
            System.err.println("消息确认失败: " + reason);
            return;
        }
        System.out.println("消息确认成功");
    });
    return confirmingTemplate;
}
}
2.1.2 确认类型
text
┌─────────────────────────────────────────────────────────────────────┐
│ 发布确认类型 │
├───────────────────────┬─────────────────────────────────────────────┤
│ 类型 │ 说明 │
├───────────────────────┼─────────────────────────────────────────────┤
│ NONE │ 禁用发布确认 │
├───────────────────────┼─────────────────────────────────────────────┤
│ SIMPLE │ 同步确认,使用 waitForConfirms() │
├───────────────────────┼─────────────────────────────────────────────┤
│ CORRELATED │ 异步确认,使用回调函数 │
│ │ 推荐使用 │
└───────────────────────┴─────────────────────────────────────────────┘
2.1.3 完整确认示例
java
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CompletableFuture;
/**
 * Publishes persistent messages and exposes the broker's publisher confirm for each one
 * as a {@link CompletableFuture}.
 *
 * <p>Every send is registered in {@code pendingMessages} under a random UUID that is also
 * used as the {@link CorrelationData} id and the AMQP message id. The confirm callback
 * removes the entry and completes its future.
 *
 * <p>NOTE(review): the callback is installed on the injected template in the constructor.
 * A {@code RabbitTemplate} is understood to accept only one confirm callback, so this will
 * presumably fail if the template bean already has one registered — confirm against the
 * template configuration in use.
 */
@Component
public class ReliablePublisher {

    private final RabbitTemplate rabbitTemplate;

    /** Messages that were sent but not yet confirmed, keyed by correlation id. */
    private final ConcurrentMap<String, PendingMessage> pendingMessages =
            new ConcurrentHashMap<>();

    /**
     * @param rabbitTemplate template used for publishing; its confirm callback is set here
     */
    public ReliablePublisher(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            // Confirms without correlation data cannot be matched to a pending send.
            if (correlationData == null) {
                return;
            }
            String messageId = correlationData.getId();
            PendingMessage pending = pendingMessages.remove(messageId);
            if (ack) {
                System.out.println("消息确认成功: " + messageId);
                if (pending != null) {
                    pending.getFuture().complete(true);
                }
            } else {
                System.err.println("消息确认失败: " + messageId + ", 原因: " + cause);
                if (pending != null) {
                    pending.getFuture().completeExceptionally(
                            new RuntimeException(cause));
                    handleConfirmFailure(pending, cause);
                }
            }
        });
    }

    /**
     * Sends {@code message} as a persistent message and returns a future for its confirm.
     *
     * @param exchange   target exchange
     * @param routingKey routing key
     * @param message    payload handed to the template's message converter
     * @return a future completed with {@code true} on a positive confirm and completed
     *         exceptionally on a negative confirm or when the send itself throws
     * @throws RuntimeException whatever {@code convertAndSend} throws; it is rethrown after
     *         the pending entry has been cleaned up
     */
    public CompletableFuture<Boolean> sendReliable(String exchange,
                                                   String routingKey,
                                                   Object message) {
        String messageId = UUID.randomUUID().toString();
        CorrelationData correlationData = new CorrelationData(messageId);
        CompletableFuture<Boolean> future = new CompletableFuture<>();
        // Register before sending so a fast confirm always finds the entry.
        pendingMessages.put(messageId, new PendingMessage(messageId, message, future));
        try {
            rabbitTemplate.convertAndSend(exchange, routingKey, message, msg -> {
                msg.getMessageProperties().setMessageId(messageId);
                msg.getMessageProperties().setDeliveryMode(
                        org.springframework.amqp.core.MessageDeliveryMode.PERSISTENT);
                return msg;
            }, correlationData);
        } catch (RuntimeException e) {
            // The send failed, so no confirm is expected for this id: drop the entry and
            // fail the future instead of leaving both dangling forever.
            pendingMessages.remove(messageId);
            future.completeExceptionally(e);
            throw e;
        }
        return future;
    }

    /** Reports a negatively confirmed message and passes it on for storage. */
    private void handleConfirmFailure(PendingMessage pending, String cause) {
        System.err.println("处理确认失败: " + pending.getMessageId());
        storeFailedMessage(pending);
    }

    /** Placeholder: only prints a line; no message is actually stored here. */
    private void storeFailedMessage(PendingMessage pending) {
        System.out.println("存储失败消息以供重试");
    }
}
class PendingMessage {
private final String messageId;
private final Object message;
private final CompletableFuture<Boolean> future;
public PendingMessage(String messageId, Object message,
CompletableFuture<Boolean> future) {
this.messageId = messageId;
this.message = message;
this.future = future;
}
public String getMessageId() { return messageId; }
public Object getMessage() { return message; }
public CompletableFuture<Boolean> getFuture() { return future; }
}
2.2 返回回调
java
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Component
public class ReturnCallbackHandler {
private final RabbitTemplate rabbitTemplate;
public ReturnCallbackHandler(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
@PostConstruct
public void init() {
rabbitTemplate.setReturnsCallback(returned -> {
System.err.println("=== 消息被退回 ===");
System.err.println("交换器: " + returned.getExchange());
System.err.println("路由键: " + returned.getRoutingKey());
System.err.println("回复码: " + returned.getReplyCode());
System.err.println("回复文本: " + returned.getReplyText());
handleReturnedMessage(returned);
});
rabbitTemplate.setMandatory(true);
}
/**
 * Reports why the broker returned a message, based on its reply code, then hands the
 * returned message to {@link #storeReturnedMessage}.
 *
 * <p>The parameter type is {@code org.springframework.amqp.core.ReturnedMessage}, which is
 * what {@code RabbitTemplate.ReturnsCallback} delivers. It is written fully qualified so
 * that no additional import is needed.
 *
 * @param returned the returned message together with exchange, routing key and reply data
 */
private void handleReturnedMessage(
        org.springframework.amqp.core.ReturnedMessage returned) {
    switch (returned.getReplyCode()) {
        case 312:
            System.err.println("消息无法路由,检查绑定关系");
            break;
        case 313:
            System.err.println("没有消费者");
            break;
        default:
            System.err.println("未知错误: " + returned.getReplyText());
            break;
    }
    storeReturnedMessage(returned);
}

/** Placeholder: only prints a line; the returned message is not actually persisted here. */
private void storeReturnedMessage(org.springframework.amqp.core.ReturnedMessage returned) {
    System.out.println("存储退回消息以供后续处理");
}
}
2.3 消息持久化
2.3.1 队列持久化
java
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DurableQueueConfig {
/**
 * Declares the durable queue {@code durable.queue} with a message TTL of 86400000 ms and
 * {@code dlx.exchange} as its dead-letter exchange.
 *
 * @return the queue definition
 */
@Bean
public Queue durableQueue() {
    final int messageTtlMillis = 86400000;
    return QueueBuilder.durable("durable.queue")
            .ttl(messageTtlMillis)               // sets the x-message-ttl argument
            .deadLetterExchange("dlx.exchange")  // sets the x-dead-letter-exchange argument
            .build();
}
@Bean
public DirectExchange durableExchange() {
return ExchangeBuilder.directExchange("durable.exchange")
.durable(true)
.build();
}
@Bean
public Binding durableBinding() {
return BindingBuilder.bind(durableQueue())
.to(durableExchange())
.with("durable.key");
}
}
2.3.2 消息持久化
java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
@Component
public class PersistentMessageSender {
private final RabbitTemplate rabbitTemplate;
public PersistentMessageSender(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void sendPersistent(String exchange, String routingKey, Object message) {
rabbitTemplate.convertAndSend(exchange, routingKey, message, msg -> {
msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return msg;
});
}
/**
 * Builds a persistent AMQP message by hand and sends it without going through the
 * template's message converter.
 *
 * <p>The body is {@code message.toString()} encoded as UTF-8. The charset is given
 * explicitly (and declared via the content-encoding property) so that the bytes do not
 * depend on the JVM's platform default.
 *
 * <p>NOTE(review): the content type is set to {@code application/json}, but the body is
 * whatever {@code toString()} returns — that is only accurate if the caller passes an
 * object whose string form is JSON.
 *
 * @param exchange   target exchange
 * @param routingKey routing key
 * @param message    object whose {@code toString()} becomes the body; must not be null
 * @param messageId  value for the AMQP message-id property
 */
public void sendWithAllProperties(String exchange, String routingKey,
                                  Object message, String messageId) {
    byte[] body = message.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);
    Message amqpMessage = MessageBuilder.withBody(body)
            .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
            .setMessageId(messageId)
            .setContentType("application/json")
            .setContentEncoding("UTF-8")
            .setPriority(5)
            .build();
    rabbitTemplate.send(exchange, routingKey, amqpMessage);
}
}
2.4 消费者确认
java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
import java.io.IOException;
/**
 * Listener for {@code reliable.queue} that acknowledges deliveries manually.
 *
 * <p>A delivery is acked only after {@code processMessage} returns normally. Processing
 * failures are nacked: {@link RetryableException} with requeue, everything else without.
 */
@Component
public class ReliableConsumer {

    /**
     * Processes one delivery and acks or nacks it on the given channel.
     *
     * <p>The ack is sent outside the processing {@code try} block, so an {@link IOException}
     * from {@code basicAck} propagates to the caller instead of being mistaken for a
     * processing failure and nacked without requeue.
     *
     * <p>NOTE(review): a nack with requeue=true puts the message back on the queue right
     * away; nothing visible here limits how often that can repeat.
     *
     * @param message the raw AMQP message
     * @param channel the channel the delivery arrived on; used for ack/nack
     * @throws IOException if the ack or nack cannot be sent
     */
    @RabbitListener(queues = "reliable.queue", ackMode = "MANUAL")
    public void handleReliableMessage(Message message, Channel channel)
            throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        String messageId = message.getMessageProperties().getMessageId();
        try {
            // Decode with an explicit charset rather than the platform default.
            String body = new String(message.getBody(),
                    java.nio.charset.StandardCharsets.UTF_8);
            System.out.println("处理消息: " + messageId);
            processMessage(body);
        } catch (RetryableException e) {
            System.err.println("可重试异常: " + e.getMessage());
            channel.basicNack(deliveryTag, false, true);
            return;
        } catch (NonRetryableException e) {
            System.err.println("不可重试异常: " + e.getMessage());
            channel.basicNack(deliveryTag, false, false);
            return;
        } catch (Exception e) {
            System.err.println("未知异常: " + e.getMessage());
            channel.basicNack(deliveryTag, false, false);
            return;
        }
        channel.basicAck(deliveryTag, false);
        System.out.println("消息确认成功: " + messageId);
    }

    /** Placeholder for business logic: only prints the body. */
    private void processMessage(String body) throws Exception {
        System.out.println("处理: " + body);
    }
}
/**
 * Checked exception for a processing failure that may succeed on a later attempt.
 */
class RetryableException extends Exception {

    /**
     * @param message detail text, later available through {@link #getMessage()}
     */
    public RetryableException(String message) {
        super(message);
    }
}
class NonRetryableException extends Exception {
public NonRetryableException(String message) { super(message); }
}2.5 死信队列
java
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DeadLetterQueueConfig {
@Bean
public DirectExchange dlxExchange() {
return new DirectExchange("dlx.exchange", true, false);
}
@Bean
public Queue dlqQueue() {
return QueueBuilder.durable("dlq.queue")
.withArgument("x-message-ttl", 604800000)
.build();
}
@Bean
public Binding dlqBinding() {
return BindingBuilder.bind(dlqQueue())
.to(dlxExchange())
.with("dead.letter");
}
@Bean
public Queue businessQueue() {
return QueueBuilder.durable("business.queue")
.withArgument("x-dead-letter-exchange", "dlx.exchange")
.withArgument("x-dead-letter-routing-key", "dead.letter")
.withArgument("x-message-ttl", 300000)
.build();
}
@Bean
public DirectExchange businessExchange() {
return new DirectExchange("business.exchange", true, false);
}
@Bean
public Binding businessBinding() {
return BindingBuilder.bind(businessQueue())
.to(businessExchange())
.with("business.key");
}
}
2.6 备份交换器
java
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class AlternateExchangeConfig {
@Bean
public FanoutExchange backupExchange() {
return new FanoutExchange("backup.exchange", true, false);
}
@Bean
public Queue backupQueue() {
return new Queue("backup.queue", true);
}
@Bean
public Binding backupBinding() {
return BindingBuilder.bind(backupQueue())
.to(backupExchange());
}
@Bean
public DirectExchange mainExchange() {
return ExchangeBuilder.directExchange("main.exchange")
.durable(true)
.withArgument("alternate-exchange", "backup.exchange")
.build();
}
@Bean
public Queue mainQueue() {
return new Queue("main.queue", true);
}
@Bean
public Binding mainBinding() {
return BindingBuilder.bind(mainQueue())
.to(mainExchange())
.with("main.key");
}
}
三、代码示例
3.1 完整可靠性配置
java
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class CompleteReliableConfig {
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setPublisherConfirmType(
CachingConnectionFactory.ConfirmType.CORRELATED);
factory.setPublisherReturns(true);
return factory;
}
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
MessageConverter jsonMessageConverter) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(jsonMessageConverter);
template.setMandatory(true);
template.setConfirmCallback((correlationData, ack, cause) -> {
if (!ack) {
System.err.println("消息确认失败: " + cause);
}
});
template.setReturnsCallback(returned -> {
System.err.println("消息被退回: " + returned.getReplyText());
});
return template;
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory,
MessageConverter jsonMessageConverter) {
SimpleRabbitListenerContainerFactory factory =
new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(jsonMessageConverter);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
factory.setPrefetchCount(10);
factory.setDefaultRequeueRejected(false);
return factory;
}
@Bean
public DirectExchange dlxExchange() {
return new DirectExchange("dlx.exchange", true, false);
}
@Bean
public Queue dlqQueue() {
return new Queue("dlq.queue", true);
}
@Bean
public Binding dlqBinding() {
return BindingBuilder.bind(dlqQueue())
.to(dlxExchange())
.with("dead.letter");
}
@Bean
public Queue reliableQueue() {
return QueueBuilder.durable("reliable.queue")
.withArgument("x-dead-letter-exchange", "dlx.exchange")
.withArgument("x-dead-letter-routing-key", "dead.letter")
.build();
}
@Bean
public DirectExchange reliableExchange() {
return new DirectExchange("reliable.exchange", true, false);
}
@Bean
public Binding reliableBinding() {
return BindingBuilder.bind(reliableQueue())
.to(reliableExchange())
.with("reliable.key");
}
}
3.2 可靠消息服务
java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
/**
 * Sends persistent messages through a {@link RabbitTemplate} and tracks each one in an
 * in-memory {@link MessageRecord}, updated from the template's confirm and returns
 * callbacks.
 *
 * <p>Records are kept in {@code messageRecords} keyed by message id. A record is removed
 * only when it is retried by {@link #retryFailedMessages()}; confirmed and returned
 * records stay in the map, so it grows with the number of messages sent.
 *
 * <p>NOTE(review): both callbacks are installed on the injected template in the
 * constructor. A {@code RabbitTemplate} is understood to accept only one callback of each
 * kind, so this will presumably fail if the template bean already has callbacks
 * registered — confirm against the template configuration in use.
 */
@Service
public class ReliableMessageService {

    private final RabbitTemplate rabbitTemplate;

    /** Tracking records keyed by message id. */
    private final ConcurrentMap<String, MessageRecord> messageRecords =
            new ConcurrentHashMap<>();

    /**
     * @param rabbitTemplate template used for publishing; its callbacks are set here
     */
    public ReliableMessageService(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        setupCallbacks();
    }

    /** Installs the confirm and returns callbacks that update the tracking records. */
    private void setupCallbacks() {
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (correlationData == null) {
                return;
            }
            MessageRecord record = messageRecords.get(correlationData.getId());
            if (record == null) {
                return;
            }
            if (!ack) {
                record.setStatus(MessageStatus.FAILED);
                record.setError(cause);
                record.getFuture().complete(false);
                return;
            }
            // An unroutable mandatory message is returned and then still acked by the
            // broker. Keep RETURNED in that case instead of overwriting it with CONFIRMED.
            if (record.getStatus() != MessageStatus.RETURNED) {
                record.setStatus(MessageStatus.CONFIRMED);
            }
            // No effect if the returns callback already completed the future with false.
            record.getFuture().complete(true);
        });
        rabbitTemplate.setReturnsCallback(returned -> {
            String messageId = returned.getMessage()
                    .getMessageProperties().getMessageId();
            // Messages published elsewhere through the same template may carry no id;
            // ConcurrentHashMap.get(null) would throw.
            if (messageId == null) {
                return;
            }
            MessageRecord record = messageRecords.get(messageId);
            if (record != null) {
                record.setStatus(MessageStatus.RETURNED);
                record.setError(returned.getReplyText());
                record.getFuture().complete(false);
            }
        });
    }

    /**
     * Sends {@code payload} as a persistent message and starts tracking it.
     *
     * @param exchange   target exchange
     * @param routingKey routing key
     * @param payload    object handed to the template's message converter
     * @return a future completed with {@code true} on a positive confirm, {@code false} on
     *         a negative confirm or a return, and completed exceptionally after 30 seconds
     *         without an outcome or when the send itself throws
     * @throws RuntimeException whatever {@code convertAndSend} throws; the record is marked
     *         {@code FAILED} before the exception is rethrown
     */
    public CompletableFuture<Boolean> sendReliable(String exchange,
                                                   String routingKey,
                                                   Object payload) {
        String messageId = UUID.randomUUID().toString();
        CorrelationData correlationData = new CorrelationData(messageId);
        CompletableFuture<Boolean> future = new CompletableFuture<>();
        MessageRecord record = new MessageRecord();
        record.setMessageId(messageId);
        record.setPayload(payload);
        record.setExchange(exchange);
        record.setRoutingKey(routingKey);
        record.setStatus(MessageStatus.PENDING);
        record.setFuture(future);
        record.setCreateTime(System.currentTimeMillis());
        // Register before sending so a fast confirm always finds the record.
        messageRecords.put(messageId, record);
        try {
            rabbitTemplate.convertAndSend(exchange, routingKey, payload, message -> {
                message.getMessageProperties().setMessageId(messageId);
                message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                return message;
            }, correlationData);
        } catch (RuntimeException e) {
            // The send failed, so no confirm is expected. Mark the record FAILED (which
            // makes it eligible for retryFailedMessages) and fail the future instead of
            // leaving the record PENDING forever.
            record.setStatus(MessageStatus.FAILED);
            record.setError(e.getMessage());
            future.completeExceptionally(e);
            throw e;
        }
        return future.orTimeout(30, TimeUnit.SECONDS);
    }

    /**
     * @param messageId id returned in the record of an earlier send
     * @return the tracking record, or {@code null} if none is stored under that id
     */
    public MessageRecord getMessageRecord(String messageId) {
        return messageRecords.get(messageId);
    }

    /**
     * Re-sends every record currently in state {@code FAILED}.
     *
     * <p>Each retried record is removed from the map before it is re-sent; the resend is
     * tracked under a new message id. Without the removal the old record would stay
     * {@code FAILED} and be sent again on every call.
     */
    public void retryFailedMessages() {
        // Snapshot first: sendReliable adds records to the map while we iterate.
        java.util.List<MessageRecord> failedRecords = new java.util.ArrayList<>();
        for (MessageRecord candidate : messageRecords.values()) {
            if (candidate.getStatus() == MessageStatus.FAILED) {
                failedRecords.add(candidate);
            }
        }
        for (MessageRecord failed : failedRecords) {
            // remove(key, value) succeeds for one caller only, so concurrent retry runs
            // cannot both re-send the same record.
            if (messageRecords.remove(failed.getMessageId(), failed)) {
                sendReliable(failed.getExchange(), failed.getRoutingKey(), failed.getPayload());
            }
        }
    }
}
/**
 * States a tracked message record can be in.
 */
enum MessageStatus {
    /** Initial state assigned when the record is created for a send. */
    PENDING,
    /** Set when the confirm callback reports a positive acknowledgement. */
    CONFIRMED,
    /** Set when the confirm callback reports a negative acknowledgement. */
    FAILED,
    /** Set when the returns callback receives the message back from the broker. */
    RETURNED
}
class MessageRecord {
private String messageId;
private Object payload;
private String exchange;
private String routingKey;
private MessageStatus status;
private String error;
private CompletableFuture<Boolean> future;
private long createTime;
public String getMessageId() { return messageId; }
public void setMessageId(String messageId) { this.messageId = messageId; }
public Object getPayload() { return payload; }
public void setPayload(Object payload) { this.payload = payload; }
public String getExchange() { return exchange; }
public void setExchange(String exchange) { this.exchange = exchange; }
public String getRoutingKey() { return routingKey; }
public void setRoutingKey(String routingKey) { this.routingKey = routingKey; }
public MessageStatus getStatus() { return status; }
public void setStatus(MessageStatus status) { this.status = status; }
public String getError() { return error; }
public void setError(String error) { this.error = error; }
public CompletableFuture<Boolean> getFuture() { return future; }
public void setFuture(CompletableFuture<Boolean> future) { this.future = future; }
public long getCreateTime() { return createTime; }
public void setCreateTime(long createTime) { this.createTime = createTime; }
}
四、实际应用场景
4.1 订单系统可靠性投递
java
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import java.util.UUID;
@Service
public class OrderReliableService {
private final RabbitTemplate rabbitTemplate;
private final ReliableMessageService reliableMessageService;
public OrderReliableService(RabbitTemplate rabbitTemplate,
ReliableMessageService reliableMessageService) {
this.rabbitTemplate = rabbitTemplate;
this.reliableMessageService = reliableMessageService;
}
/**
 * Publishes the "order created" event for {@code order} and reacts to the delivery result.
 *
 * <p>The result is handled with {@code whenComplete} so that an exceptionally completed
 * future (for example a timeout) also reaches the failure branch; {@code thenAccept} would
 * skip it silently.
 *
 * @param order the order whose creation is announced
 */
public void publishOrderCreated(Order order) {
    reliableMessageService.sendReliable(
            "order.exchange",
            "order.created",
            order
    ).whenComplete((success, error) -> {
        if (error == null && Boolean.TRUE.equals(success)) {
            System.out.println("订单创建消息发送成功: " + order.getOrderId());
        } else {
            System.err.println("订单创建消息发送失败: " + order.getOrderId());
            handleSendFailure(order);
        }
    });
}
private void handleSendFailure(Order order) {
System.out.println("处理发送失败: " + order.getOrderId());
}
}
4.2 支付系统可靠性投递
java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import com.rabbitmq.client.Channel;
import java.io.IOException;
@Service
public class PaymentReliableService {
/**
 * Consumes payment messages from {@code payment.queue} with manual acknowledgement.
 *
 * <p>On success the delivery is acked. On any exception the delivery is nacked: with
 * requeue while {@code getRetryCount(message)} is below 3, otherwise without requeue,
 * followed by a call to {@code sendToDlq}.
 *
 * @param payment the converted payment payload
 * @param message the raw AMQP message; supplies the delivery tag, message id and headers
 * @param channel the channel the delivery arrived on; used for ack/nack
 * @throws IOException if a nack issued from the catch block cannot be sent
 */
@RabbitListener(queues = "payment.queue", ackMode = "MANUAL")
public void handlePayment(Payment payment, Message message, Channel channel)
throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
String messageId = message.getMessageProperties().getMessageId();
try {
System.out.println("处理支付: " + payment.getPaymentId());
processPayment(payment);
channel.basicAck(deliveryTag, false);
System.out.println("支付处理成功: " + messageId);
} catch (Exception e) {
System.err.println("支付处理失败: " + e.getMessage());
// NOTE(review): nothing visible in this class writes the "x-retry-count" header, and a
// requeued message presumably comes back with unchanged headers — so this count may
// never reach 3. Confirm that something increments it.
int retryCount = getRetryCount(message);
if (retryCount < 3) {
// Requeue for another attempt.
channel.basicNack(deliveryTag, false, true);
} else {
// Reject without requeue, then hand the payment to sendToDlq.
// NOTE(review): if payment.queue has a dead-letter exchange configured, the nack alone
// would already dead-letter the message — verify this does not produce two copies.
channel.basicNack(deliveryTag, false, false);
sendToDlq(payment, e);
}
}
}
private void processPayment(Payment payment) throws Exception {
System.out.println("处理支付逻辑");
}
/**
 * Reads the retry counter from the {@code x-retry-count} message header.
 *
 * <p>Any numeric header value is accepted (a blind {@code (int)} cast fails with a
 * {@link ClassCastException} for e.g. a {@code Long}). A missing or non-numeric header
 * counts as zero retries.
 *
 * @param message the message whose headers are inspected
 * @return the header value as an int, or 0 if the header is absent or not a number
 */
private int getRetryCount(Message message) {
    Object retryCount = message.getMessageProperties()
            .getHeaders().get("x-retry-count");
    if (retryCount instanceof Number) {
        return ((Number) retryCount).intValue();
    }
    return 0;
}
private void sendToDlq(Payment payment, Exception e) {
System.out.println("发送到死信队列: " + payment.getPaymentId());
}
}
五、常见问题与解决方案
5.1 消息丢失
问题描述: 消息在传输过程中丢失。
解决方案:
java
factory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
factory.setPublisherReturns(true);
template.setMandatory(true);
5.2 消息重复
问题描述: 消息被重复消费。
解决方案:
java
@RabbitListener(queues = "order.queue")
public void handleOrder(Order order) {
if (isProcessed(order.getOrderId())) {
return;
}
processOrder(order);
markAsProcessed(order.getOrderId());
}
5.3 消息顺序
问题描述: 消息消费顺序错乱。
解决方案:
java
@RabbitListener(queues = "ordered.queue", concurrency = "1")
public void handleOrdered(Message message) {
processMessage(message);
}
六、最佳实践建议
6.1 生产者建议
text
生产者建议:
├── 启用发布确认
├── 设置 mandatory 标志
├── 消息持久化
├── 实现重试机制
└── 记录消息发送状态
6.2 消费者建议
text
消费者建议:
├── 使用手动确认模式
├── 实现幂等性处理
├── 配置合理的重试次数
├── 使用死信队列
└── 监控消费状态
6.3 架构建议
text
架构建议:
├── 使用备份交换器
├── 配置死信队列
├── 实现消息追踪
├── 监控告警
└── 定期检查失败消息