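Reliable Message Delivery

1. Overview

Reliable delivery is a critical part of any messaging system: it ensures that no message is lost anywhere on the path from producer to consumer. This document describes the mechanisms Spring AMQP provides for reliable delivery.

1.1 Reliability Chain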

┌───────────────────────────────────────────────────────────────────┐
│                     Message Reliability Chain                     │
├───────────────────────────────────────────────────────────────────┤
│                                                                   │
│  ┌──────────┐     ┌──────────┐     ┌──────────┐     ┌──────────┐  │
│  │ Producer │ ──► │  Broker  │ ──► │  Queue   │ ──► │ Consumer │  │
│  │          │     │ Exchange │     │          │     │          │  │
│  └────┬─────┘     └────┬─────┘     └────┬─────┘     └────┬─────┘  │
│       │                │                │                │        │
│       ▼                ▼                ▼                ▼        │
│  ┌──────────┐     ┌──────────┐     ┌──────────┐     ┌──────────┐  │
│  │Publisher │     │ Message  │     │  Queue   │     │ Consumer │  │
│  │ Confirm  │     │ Persist  │     │ Persist  │     │   Ack    │  │
│  └──────────┘     └──────────┘     └──────────┘     └──────────┘  │
│                                                                   │
│  Reliability guarantees:                                          │
│  ├── Producer: publisher confirms, return callbacks               │
│  ├── Broker: message persistence, queue persistence               │
│  └── Consumer: manual acknowledgement, retry                      │
│                                                                   │
└───────────────────────────────────────────────────────────────────┘

1.2 Reliability Levels

text
┌───────────────────────────────────────────────────────────────────────┐
│                      Message Reliability Levels                       │
├─────────────┬─────────────────────────────────────────────────────────┤
│ Level       │ Safeguards                                              │
├─────────────┼─────────────────────────────────────────────────────────┤
│ Basic       │ Message persistence + manual ack                        │
├─────────────┼─────────────────────────────────────────────────────────┤
│ Standard    │ Message persistence + publisher confirms + manual ack   │
├─────────────┼─────────────────────────────────────────────────────────┤
│ High        │ Message persistence + publisher confirms + manual ack   │
│             │ + dead-letter queue                                     │
├─────────────┼─────────────────────────────────────────────────────────┤
│ Very high   │ High + message tracing + monitoring and alerting        │
│             │ + alternate exchange                                    │
└─────────────┴─────────────────────────────────────────────────────────┘

2. Core Concepts

2.1 Publisher Confirms

2.1.1 Configuring Publisher Confirms

java
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class PublisherConfirmConfig {
    
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        
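        // Enable asynchronous publisher confirms correlated through CorrelationData
        factory.setPublisherConfirmType(
            CachingConnectionFactory.ConfirmType.CORRELATED);
        
        return factory;
    }
    
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        
        // A RabbitTemplate accepts only one ConfirmCallback; registering a second,
        // different one throws IllegalStateException
        template.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                System.out.println("Message confirmed by broker");
            } else {
                System.err.println("Message not confirmed: " + cause);
            }
        });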
        
        return template;
    }
}

2.1.2 Confirm Types

text
┌──────────────────────────────────────────────────────────┐
│                 Publisher Confirm Types                  │
├────────────┬─────────────────────────────────────────────┤
│ Type       │ Description                                 │
├────────────┼─────────────────────────────────────────────┤
│ NONE       │ Publisher confirms disabled                 │
├────────────┼─────────────────────────────────────────────┤
│ SIMPLE     │ Synchronous confirms via waitForConfirms()  │
├────────────┼─────────────────────────────────────────────┤
│ CORRELATED │ Asynchronous confirms via callback          │
│            │ (recommended)                               │
└────────────┴─────────────────────────────────────────────┘

2.1.3 Complete Confirm Example

java
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CompletableFuture;

@Component
public class ReliablePublisher {
    
    private final RabbitTemplate rabbitTemplate;
    private final ConcurrentMap<String, PendingMessage> pendingMessages = 
        new ConcurrentHashMap<>();
    
    public ReliablePublisher(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        
        // A RabbitTemplate accepts a single ConfirmCallback, so this registration must not
        // be combined with the one in PublisherConfirmConfig on the same template.
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (correlationData != null) {
                String messageId = correlationData.getId();
                PendingMessage pending = pendingMessages.remove(messageId);
                
                if (ack) {
                    System.out.println("Message confirmed: " + messageId);
                    if (pending != null) {
                        pending.getFuture().complete(true);
                    }
                } else {
                    System.err.println("Message not confirmed: " + messageId + ", cause: " + cause);
                    if (pending != null) {
                        pending.getFuture().completeExceptionally(
                            new RuntimeException(cause));
                        handleConfirmFailure(pending, cause);
                    }
                }
            }
        });
    }
    
    public CompletableFuture<Boolean> sendReliable(String exchange, 
                                                   String routingKey, 
                                                   Object message) {
        String messageId = UUID.randomUUID().toString();
        CorrelationData correlationData = new CorrelationData(messageId);
        
        CompletableFuture<Boolean> future = new CompletableFuture<>();
        pendingMessages.put(messageId, new PendingMessage(messageId, message, future));
        
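        try {
            rabbitTemplate.convertAndSend(exchange, routingKey, message, msg -> {
                msg.getMessageProperties().setMessageId(messageId);
                msg.getMessageProperties().setDeliveryMode(
                    org.springframework.amqp.core.MessageDeliveryMode.PERSISTENT);
                return msg;
            }, correlationData);
        } catch (RuntimeException e) {
            // The send itself failed (for example, the broker is unreachable), so no
            // confirm is expected for this ID: release the entry and fail the future.
            pendingMessages.remove(messageId);
            future.completeExceptionally(e);
        }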
        
        return future;
    }
    
    private void handleConfirmFailure(PendingMessage pending, String cause) {
        System.err.println("Handling confirm failure: " + pending.getMessageId());
        storeFailedMessage(pending);
    }
    
    private void storeFailedMessage(PendingMessage pending) {
        System.out.println("Storing failed message for retry");
    }
}

class PendingMessage {
    private final String messageId;
    private final Object message;
    private final CompletableFuture<Boolean> future;
    
    public PendingMessage(String messageId, Object message, 
                         CompletableFuture<Boolean> future) {
        this.messageId = messageId;
        this.message = message;
        this.future = future;
    }
    
    public String getMessageId() { return messageId; }
    public Object getMessage() { return message; }
    public CompletableFuture<Boolean> getFuture() { return future; }
}
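
The `pendingMessages` map in the example above shrinks only when a confirm reaches the callback. If a confirm is delayed or never delivered, entries could accumulate. The sketch below shows one possible way to bound that with a periodic sweep, using only the JDK. The class `PendingConfirmTracker` and its methods are hypothetical names for illustration and are not part of Spring AMQP.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper (not a Spring AMQP class): tracks message IDs awaiting a confirm.
class PendingConfirmTracker {
    // messageId -> time the message was handed to the template (epoch millis)
    private final Map<String, Long> sentAt = new ConcurrentHashMap<>();

    void register(String messageId, long nowMillis) {
        sentAt.put(messageId, nowMillis);
    }

    // Intended to be called from the confirm callback.
    // Returns false for unknown or already-settled IDs.
    boolean settle(String messageId) {
        return sentAt.remove(messageId) != null;
    }

    // Removes and returns the IDs that have waited longer than maxAgeMillis.
    List<String> expireOlderThan(long maxAgeMillis, long nowMillis) {
        List<String> expired = new ArrayList<>();
        for (Map.Entry<String, Long> e : sentAt.entrySet()) {
            // remove(key, value) only succeeds if the entry was not settled concurrently
            if (nowMillis - e.getValue() > maxAgeMillis
                    && sentAt.remove(e.getKey(), e.getValue())) {
                expired.add(e.getKey());
            }
        }
        return expired;
    }

    int size() {
        return sentAt.size();
    }
}
```

A caller might run `expireOlderThan` from a scheduled task and treat the returned IDs the same way as negatively confirmed messages.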

2.2 Return Callbacks

java
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
// On Spring Framework 6 / Spring Boot 3 this annotation lives in jakarta.annotation
import javax.annotation.PostConstruct;

@Component
public class ReturnCallbackHandler {
    
    private final RabbitTemplate rabbitTemplate;
    
    public ReturnCallbackHandler(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }
    
    @PostConstruct
    public void init() {
        // Returns are delivered only if the connection factory has publisherReturns
        // enabled and the message is published with the mandatory flag
        rabbitTemplate.setReturnsCallback(returned -> {
            System.err.println("=== Message returned ===");
            System.err.println("Exchange: " + returned.getExchange());
            System.err.println("Routing key: " + returned.getRoutingKey());
            System.err.println("Reply code: " + returned.getReplyCode());
            System.err.println("Reply text: " + returned.getReplyText());
            
            handleReturnedMessage(returned);
        });
        
        rabbitTemplate.setMandatory(true);
    }
    
    private void handleReturnedMessage(ReturnedMessage returned) {
        
        String messageId = returned.getMessage().getMessageProperties().getMessageId();
        
        switch (returned.getReplyCode()) {
            case 312: // NO_ROUTE
                System.err.println("Message " + messageId + " could not be routed; check the bindings");
                break;
            case 313: // NO_CONSUMERS: tied to the 'immediate' flag, which RabbitMQ 3.0+ no longer supports
                System.err.println("No consumers for message " + messageId);
                break;
            default:
                System.err.println("Unexpected return: " + returned.getReplyText());
        }
        
        storeReturnedMessage(returned);
    }
    
    private void storeReturnedMessage(ReturnedMessage returned) {
        System.out.println("Storing returned message for later handling");
    }
}

2.3 Message Persistence

2.3.1 Durable Queues

java
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DurableQueueConfig {
    
    @Bean
    public Queue durableQueue() {
        return QueueBuilder.durable("durable.queue")
            .withArgument("x-message-ttl", 86400000)
            .withArgument("x-dead-letter-exchange", "dlx.exchange")
            .build();
    }
    
    @Bean
    public DirectExchange durableExchange() {
        return ExchangeBuilder.directExchange("durable.exchange")
            .durable(true)
            .build();
    }
    
    @Bean
    public Binding durableBinding() {
        return BindingBuilder.bind(durableQueue())
            .to(durableExchange())
            .with("durable.key");
    }
}

2.3.2 Persistent Messages

java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Component
public class PersistentMessageSender {
    
    private final RabbitTemplate rabbitTemplate;
    
    public PersistentMessageSender(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }
    
    public void sendPersistent(String exchange, String routingKey, Object message) {
        rabbitTemplate.convertAndSend(exchange, routingKey, message, msg -> {
            msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            return msg;
        });
    }
    
    public void sendWithAllProperties(String exchange, String routingKey, 
                                      Object message, String messageId) {
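        // toString() does not produce JSON, so the body is sent as UTF-8 plain text here;
        // use a JSON MessageConverter if the payload should be application/json
        Message amqpMessage = MessageBuilder.withBody(
                message.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8))
            .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
            .setMessageId(messageId)
            .setContentType("text/plain")
            .setContentEncoding("UTF-8")
            .setPriority(5)
            .build();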
        
        rabbitTemplate.send(exchange, routingKey, amqpMessage);
    }
}

2.4 Consumer Acknowledgements

java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
import java.io.IOException;

@Component
public class ReliableConsumer {
    
    @RabbitListener(queues = "reliable.queue", ackMode = "MANUAL")
    public void handleReliableMessage(Message message, Channel channel) 
            throws IOException {
        
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        String messageId = message.getMessageProperties().getMessageId();
        
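        try {
            String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            System.out.println("Processing message: " + messageId);
            
            processMessage(body);
            
            // Acknowledge only this delivery (multiple = false)
            channel.basicAck(deliveryTag, false);
            System.out.println("Message acknowledged: " + messageId);
            
        } catch (RetryableException e) {
            System.err.println("Retryable failure: " + e.getMessage());
            // requeue = true: the broker requeues the message and may redeliver it at once;
            // nothing here limits the number of attempts
            channel.basicNack(deliveryTag, false, true);
            
        } catch (NonRetryableException e) {
            System.err.println("Non-retryable failure: " + e.getMessage());
            // requeue = false: discarded, or dead-lettered if the queue declares a DLX
            channel.basicNack(deliveryTag, false, false);
            
        } catch (Exception e) {
            System.err.println("Unexpected failure: " + e.getMessage());
            channel.basicNack(deliveryTag, false, false);
        }
    }
    
    private void processMessage(String body) throws Exception {
        System.out.println("Processing: " + body);
    }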
}

class RetryableException extends Exception {
    public RetryableException(String message) { super(message); }
}

class NonRetryableException extends Exception {
    public NonRetryableException(String message) { super(message); }
}
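
Negatively acknowledging with requeue, as in the retryable branch above, puts the message straight back on the queue, so a persistently failing message can be retried in a tight loop. A common way to space attempts out is capped exponential backoff. The following is a minimal JDK-only sketch of the delay calculation; `RetryBackoff` and its parameters are hypothetical, and how the delay is applied (for example through a delayed republish) is left open.

```java
// Hypothetical helper: capped exponential backoff for scheduling retries.
final class RetryBackoff {
    private final long baseMillis;
    private final long maxMillis;
    private final int maxAttempts;

    RetryBackoff(long baseMillis, long maxMillis, int maxAttempts) {
        if (baseMillis <= 0 || maxMillis < baseMillis || maxAttempts < 1) {
            throw new IllegalArgumentException("invalid backoff settings");
        }
        this.baseMillis = baseMillis;
        this.maxMillis = maxMillis;
        this.maxAttempts = maxAttempts;
    }

    // True while the number of attempts already made is below the limit.
    boolean shouldRetry(int attemptsSoFar) {
        return attemptsSoFar < maxAttempts;
    }

    // attempt is 1-based: attempt 1 waits baseMillis, each later attempt doubles
    // the previous delay until maxMillis is reached.
    long delayMillis(int attempt) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt is 1-based");
        }
        long delay = baseMillis;
        for (int i = 1; i < attempt && delay < maxMillis; i++) {
            // Compare against maxMillis / 2 to avoid overflow when doubling
            delay = (delay > maxMillis / 2) ? maxMillis : delay * 2;
        }
        return delay;
    }
}
```

With a base of 1 second and a cap of 30 seconds, the delays grow as 1 s, 2 s, 4 s, 8 s, 16 s and then stay at 30 s.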

2.5 Dead-Letter Queues

java
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DeadLetterQueueConfig {
    
    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange("dlx.exchange", true, false);
    }
    
    @Bean
    public Queue dlqQueue() {
        return QueueBuilder.durable("dlq.queue")
            .withArgument("x-message-ttl", 604800000)
            .build();
    }
    
    @Bean
    public Binding dlqBinding() {
        return BindingBuilder.bind(dlqQueue())
            .to(dlxExchange())
            .with("dead.letter");
    }
    
    @Bean
    public Queue businessQueue() {
        return QueueBuilder.durable("business.queue")
            .withArgument("x-dead-letter-exchange", "dlx.exchange")
            .withArgument("x-dead-letter-routing-key", "dead.letter")
            .withArgument("x-message-ttl", 300000)
            .build();
    }
    
    @Bean
    public DirectExchange businessExchange() {
        return new DirectExchange("business.exchange", true, false);
    }
    
    @Bean
    public Binding businessBinding() {
        return BindingBuilder.bind(businessQueue())
            .to(businessExchange())
            .with("business.key");
    }
}
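
The `x-message-ttl` values in this configuration and in 2.3.1 are raw millisecond counts, which are easy to misread. As a small sketch, they can be derived from `java.time.Duration` so that the intent is visible in the code. The class and constant names below are hypothetical.

```java
import java.time.Duration;

// Hypothetical constants holder: the TTLs used in this document, expressed as durations.
final class QueueTtl {
    // business.queue: 5 minutes (300000 ms)
    static final long BUSINESS_QUEUE_MS = Duration.ofMinutes(5).toMillis();
    // durable.queue: 1 day (86400000 ms)
    static final long DURABLE_QUEUE_MS = Duration.ofDays(1).toMillis();
    // dlq.queue: 7 days (604800000 ms)
    static final long DEAD_LETTER_QUEUE_MS = Duration.ofDays(7).toMillis();

    private QueueTtl() {
    }
}
```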

2.6 Alternate Exchange

java
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AlternateExchangeConfig {
    
    @Bean
    public FanoutExchange backupExchange() {
        return new FanoutExchange("backup.exchange", true, false);
    }
    
    @Bean
    public Queue backupQueue() {
        return new Queue("backup.queue", true);
    }
    
    @Bean
    public Binding backupBinding() {
        return BindingBuilder.bind(backupQueue())
            .to(backupExchange());
    }
    
    @Bean
    public DirectExchange mainExchange() {
        return ExchangeBuilder.directExchange("main.exchange")
            .durable(true)
            .withArgument("alternate-exchange", "backup.exchange")
            .build();
    }
    
    @Bean
    public Queue mainQueue() {
        return new Queue("main.queue", true);
    }
    
    @Bean
    public Binding mainBinding() {
        return BindingBuilder.bind(mainQueue())
            .to(mainExchange())
            .with("main.key");
    }
}

3. Code Examples

3.1 Complete Reliability Configuration

java
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class CompleteReliableConfig {
    
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        
        factory.setPublisherConfirmType(
            CachingConnectionFactory.ConfirmType.CORRELATED);
        factory.setPublisherReturns(true);
        
        return factory;
    }
    
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
    
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
                                         MessageConverter jsonMessageConverter) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(jsonMessageConverter);
        
        template.setMandatory(true);
        
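        // Each RabbitTemplate accepts only one ConfirmCallback and one ReturnsCallback.
        // Omit these two registrations if another component (such as the
        // ReliableMessageService in 3.2) registers its own callbacks on this template.
        template.setConfirmCallback((correlationData, ack, cause) -> {
            if (!ack) {
                System.err.println("Message not confirmed: " + cause);
            }
        });
        
        template.setReturnsCallback(returned -> {
            System.err.println("Message returned: " + returned.getReplyText());
        });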
        
        return template;
    }
    
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory,
            MessageConverter jsonMessageConverter) {
        
        SimpleRabbitListenerContainerFactory factory = 
            new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(jsonMessageConverter);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        factory.setPrefetchCount(10);
        factory.setDefaultRequeueRejected(false);
        
        return factory;
    }
    
    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange("dlx.exchange", true, false);
    }
    
    @Bean
    public Queue dlqQueue() {
        return new Queue("dlq.queue", true);
    }
    
    @Bean
    public Binding dlqBinding() {
        return BindingBuilder.bind(dlqQueue())
            .to(dlxExchange())
            .with("dead.letter");
    }
    
    @Bean
    public Queue reliableQueue() {
        return QueueBuilder.durable("reliable.queue")
            .withArgument("x-dead-letter-exchange", "dlx.exchange")
            .withArgument("x-dead-letter-routing-key", "dead.letter")
            .build();
    }
    
    @Bean
    public DirectExchange reliableExchange() {
        return new DirectExchange("reliable.exchange", true, false);
    }
    
    @Bean
    public Binding reliableBinding() {
        return BindingBuilder.bind(reliableQueue())
            .to(reliableExchange())
            .with("reliable.key");
    }
}

3.2 Reliable Message Service

java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;

@Service
public class ReliableMessageService {
    
    private final RabbitTemplate rabbitTemplate;
    private final ConcurrentMap<String, MessageRecord> messageRecords = 
        new ConcurrentHashMap<>();
    
    public ReliableMessageService(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        setupCallbacks();
    }
    
    private void setupCallbacks() {
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (correlationData != null) {
                String messageId = correlationData.getId();
                MessageRecord record = messageRecords.get(messageId);
                
                // An unroutable mandatory message is returned first and then still acked
                // by the broker, so a record already marked RETURNED is left untouched.
                if (record != null && record.getStatus() == MessageStatus.PENDING) {
                    if (ack) {
                        record.setStatus(MessageStatus.CONFIRMED);
                        record.getFuture().complete(true);
                    } else {
                        record.setStatus(MessageStatus.FAILED);
                        record.setError(cause);
                        record.getFuture().complete(false);
                    }
                }
            }
        });
        
        rabbitTemplate.setReturnsCallback(returned -> {
            String messageId = returned.getMessage()
                .getMessageProperties().getMessageId();
            MessageRecord record = messageRecords.get(messageId);
            
            if (record != null) {
                record.setStatus(MessageStatus.RETURNED);
                record.setError(returned.getReplyText());
                record.getFuture().complete(false);
            }
        });
    }
    
    public CompletableFuture<Boolean> sendReliable(String exchange, 
                                                   String routingKey, 
                                                   Object payload) {
        String messageId = UUID.randomUUID().toString();
        CorrelationData correlationData = new CorrelationData(messageId);
        
        CompletableFuture<Boolean> future = new CompletableFuture<>();
        
        MessageRecord record = new MessageRecord();
        record.setMessageId(messageId);
        record.setPayload(payload);
        record.setExchange(exchange);
        record.setRoutingKey(routingKey);
        record.setStatus(MessageStatus.PENDING);
        record.setFuture(future);
        record.setCreateTime(System.currentTimeMillis());
        
        messageRecords.put(messageId, record);
        
        rabbitTemplate.convertAndSend(exchange, routingKey, payload, message -> {
            message.getMessageProperties().setMessageId(messageId);
            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            return message;
        }, correlationData);
        
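        // orTimeout requires Java 9+; without a confirm, the future completes
        // exceptionally with a TimeoutException after 30 seconds
        return future.orTimeout(30, TimeUnit.SECONDS);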
    }
    
    public MessageRecord getMessageRecord(String messageId) {
        return messageRecords.get(messageId);
    }
    
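    public void retryFailedMessages() {
        messageRecords.values().stream()
            .filter(r -> r.getStatus() == MessageStatus.FAILED)
            // Remove the old record first: the resend registers a new ID, and a record
            // left in FAILED state would be picked up again on every pass.
            .filter(r -> messageRecords.remove(r.getMessageId(), r))
            .forEach(r -> {
                sendReliable(r.getExchange(), r.getRoutingKey(), r.getPayload());
            });
    }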
}

enum MessageStatus {
    PENDING, CONFIRMED, FAILED, RETURNED
}

class MessageRecord {
    private String messageId;
    private Object payload;
    private String exchange;
    private String routingKey;
    private MessageStatus status;
    private String error;
    private CompletableFuture<Boolean> future;
    private long createTime;
    
    public String getMessageId() { return messageId; }
    public void setMessageId(String messageId) { this.messageId = messageId; }
    public Object getPayload() { return payload; }
    public void setPayload(Object payload) { this.payload = payload; }
    public String getExchange() { return exchange; }
    public void setExchange(String exchange) { this.exchange = exchange; }
    public String getRoutingKey() { return routingKey; }
    public void setRoutingKey(String routingKey) { this.routingKey = routingKey; }
    public MessageStatus getStatus() { return status; }
    public void setStatus(MessageStatus status) { this.status = status; }
    public String getError() { return error; }
    public void setError(String error) { this.error = error; }
    public CompletableFuture<Boolean> getFuture() { return future; }
    public void setFuture(CompletableFuture<Boolean> future) { this.future = future; }
    public long getCreateTime() { return createTime; }
    public void setCreateTime(long createTime) { this.createTime = createTime; }
}
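
Because the return callback and the confirm callback can both fire for the same message (the broker sends the return before the positive confirm for an unroutable mandatory message), the record's status is best changed only once. The sketch below shows one way to enforce that with an atomic compare-and-set, using only the JDK. `DeliveryStatus` and `DeliveryState` are hypothetical names that mirror `MessageStatus` above for illustration.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical mirror of MessageStatus, kept separate so the sketch is self-contained.
enum DeliveryStatus { PENDING, CONFIRMED, FAILED, RETURNED }

// Hypothetical holder: only a PENDING record may move to a terminal state.
final class DeliveryState {
    private final AtomicReference<DeliveryStatus> status =
        new AtomicReference<>(DeliveryStatus.PENDING);

    // Returns true if this call performed the transition, false if the
    // record had already reached a terminal state.
    boolean transitionTo(DeliveryStatus terminal) {
        if (terminal == DeliveryStatus.PENDING) {
            throw new IllegalArgumentException("terminal state required");
        }
        return status.compareAndSet(DeliveryStatus.PENDING, terminal);
    }

    DeliveryStatus current() {
        return status.get();
    }
}
```

A callback would then complete the future only when `transitionTo` returns true, so a late confirm cannot overwrite an earlier RETURNED state.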

4. Practical Scenarios

4.1 Reliable Delivery in an Order System

java
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import java.util.UUID;

@Service
public class OrderReliableService {
    
    private final RabbitTemplate rabbitTemplate;
    private final ReliableMessageService reliableMessageService;
    
    public OrderReliableService(RabbitTemplate rabbitTemplate,
                                ReliableMessageService reliableMessageService) {
        this.rabbitTemplate = rabbitTemplate;
        this.reliableMessageService = reliableMessageService;
    }
    
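    public void publishOrderCreated(Order order) {
        // Order is the application's own domain class (not shown here)
        reliableMessageService.sendReliable(
            "order.exchange",
            "order.created",
            order
        ).thenAccept(success -> {
            if (success) {
                System.out.println("Order-created message sent: " + order.getOrderId());
            } else {
                System.err.println("Order-created message failed: " + order.getOrderId());
                handleSendFailure(order);
            }
        }).exceptionally(ex -> {
            // Reached when the future completes exceptionally, for example on the 30-second timeout
            System.err.println("Order-created message not confirmed: " + ex);
            handleSendFailure(order);
            return null;
        });
    }
    
    private void handleSendFailure(Order order) {
        System.out.println("Handling send failure: " + order.getOrderId());
    }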
}

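4.2 Reliable Delivery in a Payment System

java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import com.rabbitmq.client.Channel;
import java.io.IOException;

@Service
public class PaymentReliableService {
    
    private static final int MAX_RETRIES = 3;
    
    private final RabbitTemplate rabbitTemplate;
    
    public PaymentReliableService(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }
    
    // Payment is the application's own domain class (not shown here)
    @RabbitListener(queues = "payment.queue", ackMode = "MANUAL")
    public void handlePayment(Payment payment, Message message, Channel channel) 
            throws IOException {
        
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        String messageId = message.getMessageProperties().getMessageId();
        
        try {
            System.out.println("Processing payment: " + payment.getPaymentId());
            
            processPayment(payment);
            
            channel.basicAck(deliveryTag, false);
            
            System.out.println("Payment processed: " + messageId);
            
        } catch (Exception e) {
            System.err.println("Payment processing failed: " + e.getMessage());
            
            int retryCount = getRetryCount(message);
            
            if (retryCount < MAX_RETRIES) {
                // A plain requeue redelivers the message with unchanged headers, so the
                // counter would never advance. Republish a copy with the counter
                // incremented, then ack the original delivery.
                message.getMessageProperties().setHeader("x-retry-count", retryCount + 1);
                rabbitTemplate.send(
                    message.getMessageProperties().getReceivedExchange(),
                    message.getMessageProperties().getReceivedRoutingKey(),
                    message);
                channel.basicAck(deliveryTag, false);
            } else {
                // requeue = false: the broker dead-letters the message if the queue declares a DLX
                channel.basicNack(deliveryTag, false, false);
                sendToDlq(payment, e);
            }
        }
    }
    
    private void processPayment(Payment payment) throws Exception {
        System.out.println("Running payment logic");
    }
    
    private int getRetryCount(Message message) {
        Object retryCount = message.getMessageProperties()
            .getHeaders().get("x-retry-count");
        // x-retry-count is an application-defined header; accept any numeric type
        return retryCount instanceof Number ? ((Number) retryCount).intValue() : 0;
    }
    
    private void sendToDlq(Payment payment, Exception e) {
        // Placeholder: only logs; the actual routing is done by the broker's DLX
        System.out.println("Sent to dead-letter queue: " + payment.getPaymentId());
    }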
}
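
When retries go through a dead-letter exchange rather than an application-defined header, RabbitMQ itself records the history in the `x-death` header. According to the RabbitMQ documentation this is a list of entries with fields such as `queue`, `reason` and `count`. The sketch below reads that count from a plain header map shaped like the one returned by `MessageProperties.getHeaders()`. `DeathHeaders` is a hypothetical helper, and the exact header layout should be checked against the broker version in use.

```java
import java.util.List;
import java.util.Map;

// Hypothetical helper: reads the dead-letter count from an x-death header.
final class DeathHeaders {
    private DeathHeaders() {
    }

    // Sums the "count" of all x-death entries recorded for the given queue.
    // Returns 0 if the header is absent or not shaped as expected.
    static long deathCount(Map<String, Object> headers, String queue) {
        Object xDeath = headers.get("x-death");
        if (!(xDeath instanceof List<?>)) {
            return 0L;
        }
        long total = 0L;
        for (Object entry : (List<?>) xDeath) {
            if (!(entry instanceof Map<?, ?>)) {
                continue;
            }
            Map<?, ?> death = (Map<?, ?>) entry;
            Object count = death.get("count");
            // String.valueOf also covers non-String text types used by the client library
            if (queue.equals(String.valueOf(death.get("queue"))) && count instanceof Number) {
                total += ((Number) count).longValue();
            }
        }
        return total;
    }
}
```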

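5. Common Problems and Solutions

5.1 Message Loss

Problem: messages are lost in transit.

Solution: enable publisher confirms and returns and set the mandatory flag, so that the producer is told when the broker rejects a message or cannot route it.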

java
factory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
factory.setPublisherReturns(true);
template.setMandatory(true);

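5.2 Duplicate Messages

Problem: the same message is consumed more than once.

Solution: make the consumer idempotent by recording which messages have already been processed.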

java
@RabbitListener(queues = "order.queue")
public void handleOrder(Order order) {
    if (isProcessed(order.getOrderId())) {
        return;
    }
    processOrder(order);
    markAsProcessed(order.getOrderId());
}
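
The check in the listener above (`isProcessed` followed later by `markAsProcessed`) is not atomic, so two concurrent deliveries of the same order could both pass it. A minimal JDK-only sketch of an atomic claim follows. `IdempotentProcessor` is a hypothetical, in-memory illustration; a real deployment would more likely rely on a database unique key or a shared store so that the record survives restarts.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical in-memory guard that runs a handler at most once per ID.
final class IdempotentProcessor<T> {
    private final Set<String> processed = ConcurrentHashMap.newKeySet();
    private final Consumer<T> delegate;

    IdempotentProcessor(Consumer<T> delegate) {
        this.delegate = delegate;
    }

    // Returns true if the delegate ran, false if the ID had already been claimed.
    boolean process(String id, T payload) {
        // Set.add is atomic here: only one caller per ID gets 'true'
        if (!processed.add(id)) {
            return false;
        }
        try {
            delegate.accept(payload);
            return true;
        } catch (RuntimeException e) {
            // Release the claim so that a redelivery can try again
            processed.remove(id);
            throw e;
        }
    }

    boolean isProcessed(String id) {
        return processed.contains(id);
    }
}
```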

5.3 Message Ordering

Problem: messages are consumed out of order.

Solution: consume the queue with a single consumer thread.

java
@RabbitListener(queues = "ordered.queue", concurrency = "1")
public void handleOrdered(Message message) {
    processMessage(message);
}
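
A single consumer serializes every message on the queue. Where order only matters per entity (for example per order ID), one alternative that is not part of the source text is to spread messages over several queues by key, each still consumed by one thread, so that messages with the same key always land on the same queue. The sketch below computes a stable partition index with the JDK only. `KeyPartitioner` and the routing-key naming scheme are hypothetical.

```java
// Hypothetical helper: maps a business key to one of N single-consumer queues.
final class KeyPartitioner {
    private final int partitions;

    KeyPartitioner(int partitions) {
        if (partitions < 1) {
            throw new IllegalArgumentException("partitions must be >= 1");
        }
        this.partitions = partitions;
    }

    // floorMod keeps the result in [0, partitions) even for negative hash codes.
    int partitionFor(String key) {
        return Math.floorMod(key.hashCode(), partitions);
    }

    // Example naming scheme: "<prefix>.<partition>", such as "ordered.1".
    String routingKeyFor(String prefix, String key) {
        return prefix + "." + partitionFor(key);
    }
}
```

Changing the number of partitions remaps keys, so ordering is only preserved while the partition count stays fixed.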

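6. Best Practices

6.1 Producer Recommendations

text
Producer recommendations:
├── Enable publisher confirms
├── Set the mandatory flag
├── Publish persistent messages
├── Implement a retry mechanism
└── Record the send status of each message

6.2 Consumer Recommendations

text
Consumer recommendations:
├── Use manual acknowledgement mode
├── Make processing idempotent
├── Configure a sensible retry limit
├── Use a dead-letter queue
└── Monitor consumption status

6.3 Architecture Recommendations

text
Architecture recommendations:
├── Use an alternate exchange
├── Configure dead-letter queues
├── Implement message tracing
├── Set up monitoring and alerting
└── Review failed messages regularly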

7. Related Links