Appearance
Spring AMQP
一、概述
Spring AMQP 是 Spring 生态系统的一部分,提供了对 AMQP 协议的高级抽象,简化了 RabbitMQ 的使用。它基于 Spring 的核心概念,提供了消息监听容器、消息转换器、事务管理等功能。
1.1 Spring AMQP 架构
┌─────────────────────────────────────────────────────────────────────┐
│ Spring AMQP 架构 │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ Spring Application │ │
│ └──────────────────────────────┬──────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ Spring AMQP │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │RabbitTemplate│ │MessageList- │ │ SimpleMess-│ │ │
│ │ │ │ │enerContainer│ │ageListener │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │MessageConv- │ │ RabbitAdmin │ │ CachingConn-│ │ │
│ │ │erter │ │ │ │ ectionFactory│ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ └──────────────────────────────┬──────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ RabbitMQ Java Client │ │
│ └──────────────────────────────┬──────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ RabbitMQ Server │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘1.2 核心组件
text
核心组件:
├── CachingConnectionFactory:连接工厂,支持连接缓存
├── RabbitAdmin:管理交换器、队列、绑定
├── RabbitTemplate:消息发送模板
├── SimpleMessageListenerContainer:消息监听容器
├── MessageConverter:消息转换器
└── RabbitListener:注解驱动的消费者1.3 依赖配置
xml
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
</dependency>
</dependencies>二、核心知识点
2.1 CachingConnectionFactory
2.1.1 基础配置
java
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
factory.setConnectionCacheSize(5);
factory.setChannelCacheSize(25);
factory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
factory.setPublisherReturns(true);
return factory;
}
}2.1.2 配置属性说明
text
┌─────────────────────────────────────────────────────────────────────┐
│ CachingConnectionFactory 配置属性 │
├────────────────────────┬────────────────────────────────────────────┤
│ 属性 │ 说明 │
├────────────────────────┼────────────────────────────────────────────┤
│ host │ RabbitMQ 服务器地址 │
│ port │ RabbitMQ 服务器端口 │
│ username │ 用户名 │
│ password │ 密码 │
│ virtualHost │ 虚拟主机 │
├────────────────────────┼────────────────────────────────────────────┤
│ connectionCacheSize │ 连接缓存大小 │
│ channelCacheSize │ 通道缓存大小 │
├────────────────────────┼────────────────────────────────────────────┤
│ publisherConfirmType │ 发布确认类型 │
│ publisherReturns │ 是否启用发布返回 │
├────────────────────────┼────────────────────────────────────────────┤
│ connectionTimeout │ 连接超时时间 │
│ requestedHeartBeat │ 心跳间隔 │
└────────────────────────┴────────────────────────────────────────────┘2.2 RabbitAdmin
java
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitAdminConfig {
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin admin = new RabbitAdmin(connectionFactory);
admin.setAutoStartup(true);
return admin;
}
@Bean
public Queue orderQueue() {
return QueueBuilder.durable("order.queue")
.withArgument("x-message-ttl", 86400000)
.withArgument("x-dead-letter-exchange", "dlx.exchange")
.build();
}
@Bean
public DirectExchange orderExchange() {
return ExchangeBuilder.directExchange("order.exchange")
.durable(true)
.build();
}
@Bean
public Binding orderBinding() {
return BindingBuilder.bind(orderQueue())
.to(orderExchange())
.with("order.created");
}
}2.3 RabbitTemplate
2.3.1 基础使用
java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
@Component
public class MessageProducer {
private final RabbitTemplate rabbitTemplate;
public MessageProducer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void sendSimpleMessage(String exchange, String routingKey, String message) {
rabbitTemplate.send(exchange, routingKey,
MessageBuilder.withBody(message.getBytes()).build());
}
public void convertAndSend(String exchange, String routingKey, Object message) {
rabbitTemplate.convertAndSend(exchange, routingKey, message);
}
public void convertAndSendWithCallback(String exchange, String routingKey,
Object message) {
rabbitTemplate.convertAndSend(exchange, routingKey, message, msg -> {
msg.getMessageProperties().setMessageId(java.util.UUID.randomUUID().toString());
msg.getMessageProperties().setContentType("application/json");
msg.getMessageProperties().setPriority(5);
return msg;
});
}
public Object receiveAndConvert(String queueName) {
return rabbitTemplate.receiveAndConvert(queueName);
}
public Object sendAndReceive(String exchange, String routingKey, Object message) {
return rabbitTemplate.convertSendAndReceive(exchange, routingKey, message);
}
}2.3.2 发布确认
java
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import java.util.UUID;
@Component
public class ConfirmProducer implements RabbitTemplate.ConfirmCallback {
private final RabbitTemplate rabbitTemplate;
public ConfirmProducer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
rabbitTemplate.setConfirmCallback(this);
}
public void sendWithConfirm(String exchange, String routingKey, Object message) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("消息确认成功: " + correlationData.getId());
} else {
System.err.println("消息确认失败: " + correlationData.getId() + ", 原因: " + cause);
}
}
}2.3.3 返回回调
java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
@Component
public class ReturnProducer implements RabbitTemplate.ReturnsCallback {
private final RabbitTemplate rabbitTemplate;
public ReturnProducer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
rabbitTemplate.setReturnsCallback(this);
rabbitTemplate.setMandatory(true);
}
@Override
public void returnedMessage(Returned returned) {
System.err.println("消息被退回:");
System.err.println(" 交换器: " + returned.getExchange());
System.err.println(" 路由键: " + returned.getRoutingKey());
System.err.println(" 回复码: " + returned.getReplyCode());
System.err.println(" 回复文本: " + returned.getReplyText());
System.err.println(" 消息: " + new String(returned.getMessage().getBody()));
}
}2.4 SimpleMessageListenerContainer
java
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ListenerContainerConfig {
@Bean
public SimpleMessageListenerContainer listenerContainer(
ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("order.queue", "payment.queue");
container.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
System.out.println("收到消息: " + new String(message.getBody()));
}
});
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setPrefetchCount(10);
container.setConcurrentConsumers(3);
container.setMaxConcurrentConsumers(10);
container.setAutoStartup(true);
return container;
}
}2.5 消息转换器
2.5.1 Jackson2JsonMessageConverter
java
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MessageConverterConfig {
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
MessageConverter jsonMessageConverter) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(jsonMessageConverter);
return template;
}
}2.5.2 自定义消息转换器
java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
public class CustomMessageConverter implements MessageConverter {
@Override
public Message toMessage(Object object, MessageProperties messageProperties)
throws MessageConversionException {
if (object instanceof String) {
messageProperties.setContentType("text/plain");
return new Message(((String) object).getBytes(), messageProperties);
}
if (object instanceof byte[]) {
messageProperties.setContentType("application/octet-stream");
return new Message((byte[]) object, messageProperties);
}
throw new MessageConversionException("不支持的对象类型: " + object.getClass());
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
String contentType = message.getMessageProperties().getContentType();
if ("text/plain".equals(contentType)) {
return new String(message.getBody());
}
if ("application/octet-stream".equals(contentType)) {
return message.getBody();
}
return message.getBody();
}
}三、代码示例
3.1 完整配置示例
java
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class CompleteRabbitMQConfig {
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
factory.setConnectionCacheSize(5);
factory.setChannelCacheSize(25);
factory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
factory.setPublisherReturns(true);
return factory;
}
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
MessageConverter jsonMessageConverter) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(jsonMessageConverter);
template.setMandatory(true);
return template;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(jsonMessageConverter());
factory.setConcurrentConsumers(3);
factory.setMaxConcurrentConsumers(10);
factory.setPrefetchCount(10);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
@Bean
public Queue orderQueue() {
return QueueBuilder.durable("order.queue")
.withArgument("x-message-ttl", 86400000)
.withArgument("x-dead-letter-exchange", "dlx.exchange")
.build();
}
@Bean
public DirectExchange orderExchange() {
return new DirectExchange("order.exchange", true, false);
}
@Bean
public Binding orderBinding() {
return BindingBuilder.bind(orderQueue())
.to(orderExchange())
.with("order.created");
}
}3.2 消息生产者服务
java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@Service
public class MessageProducerService {
private final RabbitTemplate rabbitTemplate;
private final ConcurrentMap<String, MessageStatus> pendingMessages = new ConcurrentHashMap<>();
public MessageProducerService(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (correlationData != null) {
String id = correlationData.getId();
MessageStatus status = pendingMessages.remove(id);
if (ack) {
System.out.println("消息确认成功: " + id);
if (status != null) {
status.setConfirmed(true);
}
} else {
System.err.println("消息确认失败: " + id + ", 原因: " + cause);
if (status != null) {
status.setError(cause);
}
}
}
});
rabbitTemplate.setReturnsCallback(returned -> {
System.err.println("消息被退回: " + returned.getReplyText());
});
rabbitTemplate.setMandatory(true);
}
public void sendOrderMessage(Order order) {
String messageId = UUID.randomUUID().toString();
CorrelationData correlationData = new CorrelationData(messageId);
pendingMessages.put(messageId, new MessageStatus(messageId));
rabbitTemplate.convertAndSend(
"order.exchange",
"order.created",
order,
message -> {
message.getMessageProperties().setMessageId(messageId);
message.getMessageProperties().setContentType("application/json");
message.getMessageProperties().setPriority(5);
return message;
},
correlationData
);
System.out.println("订单消息已发送: " + order.getOrderId());
}
public void sendWithRetry(String exchange, String routingKey, Object message,
int maxRetries) {
int attempt = 0;
Exception lastException = null;
while (attempt < maxRetries) {
try {
rabbitTemplate.convertAndSend(exchange, routingKey, message);
return;
} catch (Exception e) {
lastException = e;
attempt++;
if (attempt < maxRetries) {
try {
Thread.sleep(1000 * attempt);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}
}
throw new RuntimeException("发送失败,重试次数耗尽", lastException);
}
}
class MessageStatus {
private final String messageId;
private volatile boolean confirmed = false;
private volatile String error;
public MessageStatus(String messageId) {
this.messageId = messageId;
}
public String getMessageId() { return messageId; }
public boolean isConfirmed() { return confirmed; }
public void setConfirmed(boolean confirmed) { this.confirmed = confirmed; }
public String getError() { return error; }
public void setError(String error) { this.error = error; }
}
class Order {
private String orderId;
private String userId;
private double amount;
public String getOrderId() { return orderId; }
public void setOrderId(String orderId) { this.orderId = orderId; }
public String getUserId() { return userId; }
public void setUserId(String userId) { this.userId = userId; }
public double getAmount() { return amount; }
public void setAmount(double amount) { this.amount = amount; }
}3.3 消息消费者服务
java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import com.rabbitmq.client.Channel;
import java.io.IOException;
@Service
public class MessageConsumerService {
@RabbitListener(queues = "order.queue")
public void handleOrderMessage(Order order, Message message, Channel channel)
throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.println("收到订单消息: " + order.getOrderId());
processOrder(order);
channel.basicAck(deliveryTag, false);
System.out.println("订单处理完成: " + order.getOrderId());
} catch (RetryableException e) {
System.err.println("可重试异常: " + e.getMessage());
channel.basicNack(deliveryTag, false, true);
} catch (Exception e) {
System.err.println("处理失败: " + e.getMessage());
channel.basicNack(deliveryTag, false, false);
}
}
private void processOrder(Order order) throws Exception {
System.out.println("处理订单: " + order.getOrderId());
}
}
class RetryableException extends Exception {
public RetryableException(String message) {
super(message);
}
}四、实际应用场景
4.1 订单处理系统
java
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
public class OrderProcessingService {
private final RabbitTemplate rabbitTemplate;
public OrderProcessingService(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void publishOrderCreated(Order order) {
rabbitTemplate.convertAndSend("order.exchange", "order.created", order);
}
public void publishOrderPaid(Order order) {
rabbitTemplate.convertAndSend("order.exchange", "order.paid", order);
}
public void publishOrderShipped(Order order) {
rabbitTemplate.convertAndSend("order.exchange", "order.shipped", order);
}
@RabbitListener(queues = "order.queue")
@Transactional
public void handleOrderEvent(OrderEvent event, Message message,
Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
switch (event.getEventType()) {
case "order.created":
handleOrderCreated(event.getOrder());
break;
case "order.paid":
handleOrderPaid(event.getOrder());
break;
case "order.shipped":
handleOrderShipped(event.getOrder());
break;
}
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
channel.basicNack(deliveryTag, false, false);
}
}
private void handleOrderCreated(Order order) {
System.out.println("处理订单创建: " + order.getOrderId());
}
private void handleOrderPaid(Order order) {
System.out.println("处理订单支付: " + order.getOrderId());
}
private void handleOrderShipped(Order order) {
System.out.println("处理订单发货: " + order.getOrderId());
}
}
class OrderEvent {
private String eventType;
private Order order;
public String getEventType() { return eventType; }
public void setEventType(String eventType) { this.eventType = eventType; }
public Order getOrder() { return order; }
public void setOrder(Order order) { this.order = order; }
}4.2 日志收集系统
java
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
@Service
public class LogCollectionService {
private final RabbitTemplate rabbitTemplate;
public LogCollectionService(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void log(String service, String level, String message) {
LogEntry entry = new LogEntry();
entry.setService(service);
entry.setLevel(level);
entry.setMessage(message);
entry.setTimestamp(LocalDateTime.now());
String routingKey = service + "." + level.toLowerCase();
rabbitTemplate.convertAndSend("logs.exchange", routingKey, entry);
}
public void logError(String service, String message, Throwable throwable) {
LogEntry entry = new LogEntry();
entry.setService(service);
entry.setLevel("ERROR");
entry.setMessage(message + "\n" + getStackTrace(throwable));
entry.setTimestamp(LocalDateTime.now());
rabbitTemplate.convertAndSend("logs.exchange", service + ".error", entry);
}
@RabbitListener(queues = "logs.error.queue")
public void handleErrorLogs(LogEntry entry) {
System.err.println("[ERROR] " + entry.getService() + ": " + entry.getMessage());
sendAlert(entry);
}
@RabbitListener(queues = "logs.all.queue")
public void handleAllLogs(LogEntry entry) {
System.out.println("[" + entry.getLevel() + "] " +
entry.getService() + ": " + entry.getMessage());
}
private void sendAlert(LogEntry entry) {
System.err.println("发送告警: " + entry.getMessage());
}
private String getStackTrace(Throwable throwable) {
StringBuilder sb = new StringBuilder();
for (StackTraceElement element : throwable.getStackTrace()) {
sb.append(element).append("\n");
}
return sb.toString();
}
}
class LogEntry {
private String service;
private String level;
private String message;
private LocalDateTime timestamp;
public String getService() { return service; }
public void setService(String service) { this.service = service; }
public String getLevel() { return level; }
public void setLevel(String level) { this.level = level; }
public String getMessage() { return message; }
public void setMessage(String message) { this.message = message; }
public LocalDateTime getTimestamp() { return timestamp; }
public void setTimestamp(LocalDateTime timestamp) { this.timestamp = timestamp; }
}五、常见问题与解决方案
5.1 消息序列化失败
问题描述: 消息序列化时抛出异常。
解决方案:
java
@Bean
public MessageConverter jsonMessageConverter() {
ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new JavaTimeModule());
mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
return new Jackson2JsonMessageConverter(mapper);
}5.2 消费者并发问题
问题描述: 消费者并发处理导致数据竞争。
解决方案:
java
@RabbitListener(queues = "order.queue", concurrency = "3-10")
public void handleMessage(Order order) {
processOrder(order);
}5.3 消息确认超时
问题描述: 消息确认超时导致消息重新投递。
解决方案:
properties
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.prefetch=10六、最佳实践建议
6.1 配置建议
text
配置建议:
├── 启用发布确认确保消息可靠
├── 设置合理的预取数量
├── 配置适当的并发消费者数量
├── 使用 JSON 消息转换器
└── 启用连接缓存6.2 性能优化
text
性能建议:
├── 批量发送消息
├── 异步确认模式
├── 合理设置并发消费者
├── 使用缓存连接工厂
└── 优化消息大小6.3 可靠性保障
text
可靠性建议:
├── 使用手动确认模式
├── 实现消息重试机制
├── 配置死信队列
├── 启用消息持久化
└── 监控消息处理状态