
RabbitTemplate

1. Overview

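RabbitTemplate is the core class that Spring AMQP provides to simplify sending and receiving messages. Its API covers synchronous send and receive, request/reply (RPC), message conversion, and publisher confirm and return callbacks.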

1.1 RabbitTemplate Architecture

┌─────────────────────────────────────────────────────────────────────┐
│                     RabbitTemplate Architecture                     │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                    Application Layer                         │   │
│  │                     (business code)                          │   │
│  └──────────────────────────────┬──────────────────────────────┘   │
│                                 │                                   │
│                                 ▼                                   │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                     RabbitTemplate                           │   │
│  │  ┌─────────────┐ ┌─────────────┐ ┌─────────────┐           │   │
│  │  │ send()      │ │convertAndS- │ │receive()    │           │   │
│  │  │             │ │end()        │ │             │           │   │
│  │  └─────────────┘ └─────────────┘ └─────────────┘           │   │
│  │  ┌─────────────┐ ┌─────────────┐ ┌─────────────┐           │   │
│  │  │ MessageConv-│ │ ConfirmCall-│ │ ReturnsCall-│           │   │
│  │  │ erter       │ │ back        │ │ back        │           │   │
│  │  └─────────────┘ └─────────────┘ └─────────────┘           │   │
│  └──────────────────────────────┬──────────────────────────────┘   │
│                                 │                                   │
│                                 ▼                                   │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                   ConnectionFactory                          │   │
│  └──────────────────────────────┬──────────────────────────────┘   │
│                                 │                                   │
│                                 ▼                                   │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                     RabbitMQ Server                          │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

1.2 Core Features

text
Core features:
├── Sending: send, convertAndSend
├── Receiving: receive, receiveAndConvert
├── RPC: convertSendAndReceive
├── Message conversion: MessageConverter
├── Confirm callback: ConfirmCallback
├── Return callback: ReturnsCallback
└── Retry: RetryTemplate

2. Core Concepts

2.1 Sending Messages

2.1.1 Basic Send Methods

java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Component
public class MessageSender {
    
    private final RabbitTemplate rabbitTemplate;
    
    public MessageSender(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }
    
    public void sendRawMessage(String exchange, String routingKey, byte[] body) {
        Message message = MessageBuilder.withBody(body).build();
        rabbitTemplate.send(exchange, routingKey, message);
    }
    
    public void sendMessageWithProperties(String exchange, String routingKey, 
                                          String body, String contentType) {
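        // Encode explicitly as UTF-8 rather than relying on the platform default charset
        Message message = MessageBuilder.withBody(body.getBytes(java.nio.charset.StandardCharsets.UTF_8))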
            .setContentType(contentType)
            .setMessageId(java.util.UUID.randomUUID().toString())
            .setPriority(5)
            .build();
        
        rabbitTemplate.send(exchange, routingKey, message);
    }
    
    public void convertAndSend(String exchange, String routingKey, Object message) {
        rabbitTemplate.convertAndSend(exchange, routingKey, message);
    }
    
    public void convertAndSendWithProcessor(String exchange, String routingKey, 
                                            Object message) {
        rabbitTemplate.convertAndSend(exchange, routingKey, message, msg -> {
            msg.getMessageProperties().setMessageId(java.util.UUID.randomUUID().toString());
            msg.getMessageProperties().setPriority(5);
            msg.getMessageProperties().setHeader("X-Custom-Header", "value");
            return msg;
        });
    }
}

2.1.2 Send Method Comparison

text
┌─────────────────────────────────────────────────────────────────────┐
│                       Send Method Comparison                        │
├─────────────────────────┬───────────────────────────────────────────┤
│ Method                  │ Description                               │
├─────────────────────────┼───────────────────────────────────────────┤
│ send()                  │ Sends a raw Message object;               │
│                         │ you build the Message yourself            │
├─────────────────────────┼───────────────────────────────────────────┤
│ convertAndSend()        │ Sends any object, converting it           │
│                         │ with the configured MessageConverter      │
├─────────────────────────┼───────────────────────────────────────────┤
│ convertSendAndReceive() │ Sends and waits for a reply               │
│                         │ (RPC pattern)                             │
├─────────────────────────┼───────────────────────────────────────────┤
│ sendAndReceive()        │ Sends a raw Message and waits for a reply │
└─────────────────────────┴───────────────────────────────────────────┘

2.2 Receiving Messages

2.2.1 Receive Methods

java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Component
public class MessageReceiver {
    
    private final RabbitTemplate rabbitTemplate;
    
    public MessageReceiver(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }
    
    public Message receiveRaw(String queueName) {
        return rabbitTemplate.receive(queueName);
    }
    
    public Message receiveWithTimeout(String queueName, long timeoutMillis) {
        return rabbitTemplate.receive(queueName, timeoutMillis);
    }
    
    public Object receiveAndConvert(String queueName) {
        return rabbitTemplate.receiveAndConvert(queueName);
    }
    
    public <T> T receiveAndConvert(String queueName, Class<T> type) {
        Object result = rabbitTemplate.receiveAndConvert(queueName);
        return result != null ? type.cast(result) : null;
    }
}

2.3 RPC Pattern

java
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;

@Service
public class RpcClient {
    
    private final RabbitTemplate rabbitTemplate;
    
    public RpcClient(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }
    
    public Object sendAndReceive(String exchange, String routingKey, Object request) {
        return rabbitTemplate.convertSendAndReceive(exchange, routingKey, request);
    }
    
    public <T> T sendAndReceive(String exchange, String routingKey, 
                                Object request, Class<T> responseType) {
        Object response = rabbitTemplate.convertSendAndReceive(exchange, routingKey, request);
        return response != null ? responseType.cast(response) : null;
    }
    
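    public Object sendAndReceiveWithTimeout(String exchange, String routingKey,
                                            Object request, long timeout, TimeUnit unit) {
        // Caution: setReplyTimeout mutates the shared RabbitTemplate, so the new timeout
        // also applies to every other caller. Prefer a dedicated template per timeout.
        rabbitTemplate.setReplyTimeout(unit.toMillis(timeout));
        return rabbitTemplate.convertSendAndReceive(exchange, routingKey, request);
    }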
}

@Service
class RpcServer {
    
    // The return value is sent back to the caller's reply-to address as the RPC response
    @org.springframework.amqp.rabbit.annotation.RabbitListener(queues = "rpc.queue")
    public Object handleRequest(Object request) {
        System.out.println("Received RPC request: " + request);
        
        Object response = processRequest(request);
        
        return response;
    }
    
    private Object processRequest(Object request) {
        return "Result: " + request;
    }
}

2.4 Confirm Callback

2.4.1 Configuring the Confirm Callback

java
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct; // on Spring Boot 3+ use jakarta.annotation.PostConstruct
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

@Component
public class ConfirmCallbackHandler implements RabbitTemplate.ConfirmCallback {
    
    private final RabbitTemplate rabbitTemplate;
    private final ConcurrentMap<String, PendingMessage> pendingMessages = new ConcurrentHashMap<>();
    
    public ConfirmCallbackHandler(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }
    
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
    }
    
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (correlationData == null) {
            return;
        }
        
        String messageId = correlationData.getId();
        PendingMessage pending = pendingMessages.remove(messageId);
        
        if (ack) {
            System.out.println("Message confirmed: " + messageId);
            if (pending != null) {
                pending.onSuccess();
            }
        } else {
            System.err.println("Message not confirmed: " + messageId + ", cause: " + cause);
            if (pending != null) {
                pending.onFailure(cause);
            }
        }
    }
    
    public void sendWithConfirm(String exchange, String routingKey, Object message) {
        String messageId = UUID.randomUUID().toString();
        CorrelationData correlationData = new CorrelationData(messageId);
        
        pendingMessages.put(messageId, new PendingMessage(messageId, message));
        
        rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
    }
    
    public void sendWithConfirm(String exchange, String routingKey, Object message,
                                Runnable onSuccess, java.util.function.Consumer<String> onFailure) {
        String messageId = UUID.randomUUID().toString();
        CorrelationData correlationData = new CorrelationData(messageId);
        
        PendingMessage pending = new PendingMessage(messageId, message);
        pending.setOnSuccess(onSuccess);
        pending.setOnFailure(onFailure);
        
        pendingMessages.put(messageId, pending);
        
        rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
    }
}

class PendingMessage {
    private final String messageId;
    private final Object message;
    private Runnable onSuccess;
    private java.util.function.Consumer<String> onFailure;
    
    public PendingMessage(String messageId, Object message) {
        this.messageId = messageId;
        this.message = message;
    }
    
    public void onSuccess() {
        if (onSuccess != null) {
            onSuccess.run();
        }
    }
    
    public void onFailure(String cause) {
        if (onFailure != null) {
            onFailure.accept(cause);
        }
    }
    
    public String getMessageId() { return messageId; }
    public Object getMessage() { return message; }
    public void setOnSuccess(Runnable onSuccess) { this.onSuccess = onSuccess; }
    public void setOnFailure(java.util.function.Consumer<String> onFailure) { 
        this.onFailure = onFailure; 
    }
}
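
The bookkeeping in ConfirmCallbackHandler (register a pending entry under a correlation ID, then resolve it when the broker acks or nacks) can be exercised without a broker. The following is a minimal plain-Java sketch of that pattern only. `ConfirmTracker`, `register`, `onConfirm`, and `pendingCount` are hypothetical names, not Spring AMQP APIs, and `onConfirm` merely stands in for the `confirm(CorrelationData, boolean, String)` callback.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: models the correlation-ID bookkeeping only, no broker involved.
class ConfirmTracker {

    private final Map<String, CompletableFuture<Boolean>> pending = new ConcurrentHashMap<>();

    // Call before publishing; the returned ID would be passed to new CorrelationData(id).
    String register(CompletableFuture<Boolean> future) {
        String id = UUID.randomUUID().toString();
        pending.put(id, future);
        return id;
    }

    // Stand-in for ConfirmCallback.confirm(...): resolves and forgets the pending entry.
    void onConfirm(String id, boolean ack) {
        CompletableFuture<Boolean> future = pending.remove(id);
        if (future != null) {
            future.complete(ack);
        }
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        ConfirmTracker tracker = new ConfirmTracker();
        CompletableFuture<Boolean> first = new CompletableFuture<>();
        CompletableFuture<Boolean> second = new CompletableFuture<>();
        String firstId = tracker.register(first);
        String secondId = tracker.register(second);

        tracker.onConfirm(firstId, true);   // simulated ack
        tracker.onConfirm(secondId, false); // simulated nack

        System.out.println(first.join());           // true
        System.out.println(second.join());          // false
        System.out.println(tracker.pendingCount()); // 0
    }
}
```

Removing the entry inside `onConfirm` keeps the map from growing. A production version would probably also evict entries whose confirm never arrives.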

2.5 Return Callback

java
import org.springframework.amqp.core.Returned;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct; // on Spring Boot 3+ use jakarta.annotation.PostConstruct

@Component
public class ReturnsCallbackHandler implements RabbitTemplate.ReturnsCallback {
    
    private final RabbitTemplate rabbitTemplate;
    
    public ReturnsCallbackHandler(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }
    
    @PostConstruct
    public void init() {
        rabbitTemplate.setReturnsCallback(this);
        rabbitTemplate.setMandatory(true);
    }
    
    @Override
    public void returnedMessage(Returned returned) {
        System.err.println("=== Message returned ===");
        System.err.println("Exchange: " + returned.getExchange());
        System.err.println("Routing key: " + returned.getRoutingKey());
        System.err.println("Reply code: " + returned.getReplyCode());
        System.err.println("Reply text: " + returned.getReplyText());
        System.err.println("Body: " + new String(returned.getMessage().getBody(),
            java.nio.charset.StandardCharsets.UTF_8));
        
        handleReturnedMessage(returned);
    }
    
    private void handleReturnedMessage(Returned returned) {
        if (returned.getReplyCode() == 312) {
            // 312 = NO_ROUTE
            System.err.println("Message could not be routed; check the bindings");
        } else if (returned.getReplyCode() == 313) {
            // 313 = NO_CONSUMERS
            System.err.println("No consumers");
        }
        
        storeFailedMessage(returned);
    }
    
    private void storeFailedMessage(Returned returned) {
        System.out.println("Storing the failed message for later processing");
    }
}
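
The two reply codes that handleReturnedMessage checks are AMQP 0-9-1 reply codes, and giving them names makes the branching easier to read. A small sketch, assuming only those two codes matter here; the class `ReturnReason` and its `describe` method are hypothetical and not part of Spring AMQP.

```java
// Hypothetical helper; the numeric values are the AMQP 0-9-1 reply codes
// checked in handleReturnedMessage.
class ReturnReason {

    static final int NO_ROUTE = 312;     // no queue matched the routing key
    static final int NO_CONSUMERS = 313; // an "immediate" publish found no consumer

    static String describe(int replyCode) {
        switch (replyCode) {
            case NO_ROUTE:
                return "NO_ROUTE: check the exchange-to-queue bindings";
            case NO_CONSUMERS:
                return "NO_CONSUMERS: no consumer was available";
            default:
                return "UNKNOWN(" + replyCode + ")";
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(312));
        System.out.println(describe(313));
        System.out.println(describe(404));
    }
}
```

In practice, a mandatory publish that matches no queue is returned with 312, so that is the branch most applications need to handle.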

2.6 Batch Operations

java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import java.util.List;

@Component
public class BatchOperations {
    
    private final RabbitTemplate rabbitTemplate;
    
    public BatchOperations(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }
    
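    public void sendBatch(String exchange, String routingKey, List<Object> messages) {
        // Publish every message on a single channel instead of checking one out per message
        rabbitTemplate.execute(channel -> {
            for (Object message : messages) {
                try {
                    // Pass real MessageProperties; converters populate them and may fail on null
                    Message converted = rabbitTemplate.getMessageConverter()
                        .toMessage(message, new org.springframework.amqp.core.MessageProperties());
                    
                    // Note: null AMQP properties drop the content type set by the converter
                    channel.basicPublish(exchange, routingKey, null, converted.getBody());
                } catch (Exception e) {
                    System.err.println("Send failed: " + e.getMessage());
                }
            }
            return null;
        });
    }
    
    public void sendBatchWithConfirm(String exchange, String routingKey, 
                                     List<Object> messages) {
        // invoke() runs all operations on one dedicated channel
        rabbitTemplate.invoke(operations -> {
            for (Object message : messages) {
                operations.convertAndSend(exchange, routingKey, message);
            }
            // Block until the broker confirms the batch; requires publisher confirms to be
            // enabled on the connection factory. The 10 s timeout is an example value.
            operations.waitForConfirmsOrDie(10_000);
            return null;
        });
    }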
}

3. Code Examples

3.1 A Complete RabbitTemplate Service

java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;

@Service
public class RabbitTemplateService {
    
    private final RabbitTemplate rabbitTemplate;
    private final ConcurrentMap<String, CompletableFuture<Boolean>> confirmFutures = 
        new ConcurrentHashMap<>();
    
    public RabbitTemplateService(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        
        setupCallbacks();
    }
    
    private void setupCallbacks() {
        // A RabbitTemplate accepts only one ConfirmCallback and one ReturnsCallback;
        // registering a different one on the same template throws IllegalStateException.
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (correlationData != null) {
                CompletableFuture<Boolean> future = confirmFutures.remove(correlationData.getId());
                if (future != null) {
                    future.complete(ack);
                }
            }
        });
        
        rabbitTemplate.setReturnsCallback(returned -> {
            System.err.println("Message returned: " + returned.getReplyText());
        });
        
        rabbitTemplate.setMandatory(true);
    }
    
    public void send(String exchange, String routingKey, Object message) {
        rabbitTemplate.convertAndSend(exchange, routingKey, message);
    }
    
    public void sendWithHeaders(String exchange, String routingKey, Object message,
                                java.util.Map<String, Object> headers) {
        rabbitTemplate.convertAndSend(exchange, routingKey, message, msg -> {
            headers.forEach((key, value) -> 
                msg.getMessageProperties().setHeader(key, value));
            return msg;
        });
    }
    
    public CompletableFuture<Boolean> sendAsync(String exchange, String routingKey, 
                                                Object message) {
        String correlationId = UUID.randomUUID().toString();
        CorrelationData correlationData = new CorrelationData(correlationId);
        
        CompletableFuture<Boolean> future = new CompletableFuture<>();
        confirmFutures.put(correlationId, future);
        
        rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
        
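        // Drop the pending entry once the future settles (confirm or timeout) to avoid a leak
        return future.orTimeout(30, TimeUnit.SECONDS)
            .whenComplete((result, error) -> confirmFutures.remove(correlationId));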
    }
    
    public <T> T sendAndReceive(String exchange, String routingKey, 
                                Object request, Class<T> responseType) {
        Object response = rabbitTemplate.convertSendAndReceive(exchange, routingKey, request);
        return response != null ? responseType.cast(response) : null;
    }
    
    public <T> T receive(String queueName, Class<T> type) {
        Object message = rabbitTemplate.receiveAndConvert(queueName);
        return message != null ? type.cast(message) : null;
    }
    
    public <T> T receiveWithTimeout(String queueName, long timeoutMillis, Class<T> type) {
        Object message = rabbitTemplate.receiveAndConvert(queueName, timeoutMillis);
        return message != null ? type.cast(message) : null;
    }
    
    public void executeInChannel(ChannelOperation operation) {
        rabbitTemplate.execute(channel -> {
            operation.execute(channel);
            return null;
        });
    }
    
    @FunctionalInterface
    public interface ChannelOperation {
        void execute(com.rabbitmq.client.Channel channel) throws Exception;
    }
}
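
The confirm and return callbacks registered in setupCallbacks fire only when the connection factory has publisher confirms and publisher returns enabled. A minimal configuration sketch, assuming a `CachingConnectionFactory` and a broker on `localhost`; the class name `PublisherConfirmConfig` and the host are illustrative assumptions. In a Spring Boot application the same switches are normally set through the `spring.rabbitmq.publisher-confirm-type=correlated` and `spring.rabbitmq.publisher-returns=true` properties instead.

```java
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class; adjust host and credentials for your environment.
@Configuration
public class PublisherConfirmConfig {

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory("localhost"); // assumed host
        // CORRELATED confirms deliver the CorrelationData to the ConfirmCallback
        factory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        // Required for the ReturnsCallback, together with mandatory = true on the template
        factory.setPublisherReturns(true);
        return factory;
    }
}
```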

3.2 Message Builder

java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.UUID;

@Component
public class MessageBuilderService {
    
    public Message buildTextMessage(String content) {
        return MessageBuilder.withBody(content.getBytes())
            .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
            .setMessageId(UUID.randomUUID().toString())
            .build();
    }
    
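    public Message buildJsonMessage(Object object) {
        try {
            // Serialize with Jackson; object.toString() does not produce JSON
            byte[] body = new com.fasterxml.jackson.databind.ObjectMapper().writeValueAsBytes(object);
            return MessageBuilder.withBody(body)
                .setContentType(MessageProperties.CONTENT_TYPE_JSON)
                .setMessageId(UUID.randomUUID().toString())
                .build();
        } catch (com.fasterxml.jackson.core.JsonProcessingException e) {
            throw new IllegalArgumentException("Cannot serialize payload to JSON", e);
        }
    }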
    
    public Message buildMessageWithHeaders(String content, Map<String, Object> headers) {
        MessageBuilder builder = MessageBuilder.withBody(content.getBytes())
            .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
            .setMessageId(UUID.randomUUID().toString());
        
        headers.forEach((key, value) -> builder.setHeader(key, value));
        
        return builder.build();
    }
    
    public Message buildPersistentMessage(String content) {
        return MessageBuilder.withBody(content.getBytes())
            .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
            .setDeliveryMode(MessageProperties.DELIVERY_MODE_PERSISTENT)
            .setMessageId(UUID.randomUUID().toString())
            .build();
    }
    
    public Message buildPriorityMessage(String content, int priority) {
        return MessageBuilder.withBody(content.getBytes())
            .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
            .setPriority(priority)
            .setMessageId(UUID.randomUUID().toString())
            .build();
    }
    
    public Message buildTTLMessage(String content, long ttlMillis) {
        return MessageBuilder.withBody(content.getBytes())
            .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
            .setExpiration(String.valueOf(ttlMillis))
            .setMessageId(UUID.randomUUID().toString())
            .build();
    }
}

3.3 Retry Mechanism

java
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.listener.RetryListenerSupport;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.stereotype.Component;
import java.util.Collections;

@Component
public class RetryRabbitTemplate {
    
    private final RabbitTemplate rabbitTemplate;
    private final RetryTemplate retryTemplate;
    
    public RetryRabbitTemplate(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        this.retryTemplate = createRetryTemplate();
    }
    
    private RetryTemplate createRetryTemplate() {
        RetryTemplate template = new RetryTemplate();
        
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(1000);
        backOffPolicy.setMultiplier(2.0);
        backOffPolicy.setMaxInterval(10000);
        template.setBackOffPolicy(backOffPolicy);
        
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(3, 
            Collections.singletonMap(Exception.class, true));
        template.setRetryPolicy(retryPolicy);
        
        template.registerListener(new RetryListenerSupport() {
            @Override
            public <T, E extends Throwable> void onError(RetryContext context, 
                    RetryCallback<T, E> callback, Throwable throwable) {
                System.err.println("Retry " + context.getRetryCount() + ": " + throwable.getMessage());
            }
        });
        
        return template;
    }
    
    public void sendWithRetry(String exchange, String routingKey, Object message) {
        retryTemplate.execute(context -> {
            rabbitTemplate.convertAndSend(exchange, routingKey, message);
            return null;
        });
    }
    
    public <T> T sendAndReceiveWithRetry(String exchange, String routingKey, 
                                         Object request, Class<T> responseType) {
        return retryTemplate.execute(context -> {
            Object response = rabbitTemplate.convertSendAndReceive(exchange, routingKey, request);
            return response != null ? responseType.cast(response) : null;
        });
    }
}
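
To see what the back-off settings in createRetryTemplate amount to, the waits between attempts can be worked out by hand: each wait is `min(initialInterval * multiplier^k, maxInterval)`. The sketch below reproduces only that arithmetic in plain Java, assuming no wait follows the final failed attempt. `BackoffSchedule` and `waits` are hypothetical names, not spring-retry APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: reproduces the arithmetic of an exponential back-off,
// not spring-retry itself.
class BackoffSchedule {

    // Waits between attempts: min(initial * multiplier^k, max) for k = 0 .. maxAttempts - 2.
    static List<Long> waits(long initialMillis, double multiplier, long maxMillis, int maxAttempts) {
        List<Long> result = new ArrayList<>();
        double next = initialMillis;
        for (int i = 0; i < maxAttempts - 1; i++) {
            result.add((long) Math.min(next, maxMillis));
            next *= multiplier;
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(waits(1000, 2.0, 10000, 3)); // [1000, 2000]
        System.out.println(waits(1000, 2.0, 10000, 6)); // [1000, 2000, 4000, 8000, 10000]
    }
}
```

With the values used in this section (1000 ms initial interval, multiplier 2.0, 3 attempts), a send that keeps failing waits roughly 3 seconds in total before the exception propagates. The 10000 ms cap only comes into play from the fifth wait onward.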

4. Real-World Scenarios

4.1 Sending Order Messages

java
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

@Service
public class OrderMessageService {
    
    private final RabbitTemplate rabbitTemplate;
    
    public OrderMessageService(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }
    
    public void sendOrderCreated(Order order) {
        Map<String, Object> event = new HashMap<>();
        event.put("eventId", UUID.randomUUID().toString());
        event.put("eventType", "order.created");
        event.put("data", order);
        event.put("timestamp", LocalDateTime.now().toString());
        
        rabbitTemplate.convertAndSend("order.exchange", "order.created", event, message -> {
            message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
            message.getMessageProperties().setContentType("application/json");
            message.getMessageProperties().setHeader("X-Event-Type", "order.created");
            message.getMessageProperties().setHeader("X-Source", "order-service");
            return message;
        });
    }
    
    public void sendOrderPaid(Order order) {
        Map<String, Object> event = new HashMap<>();
        event.put("eventId", UUID.randomUUID().toString());
        event.put("eventType", "order.paid");
        event.put("data", order);
        event.put("timestamp", LocalDateTime.now().toString());
        
        rabbitTemplate.convertAndSend("order.exchange", "order.paid", event);
    }
    
    public void sendOrderCancelled(Order order, String reason) {
        Map<String, Object> event = new HashMap<>();
        event.put("eventId", UUID.randomUUID().toString());
        event.put("eventType", "order.cancelled");
        event.put("data", order);
        event.put("reason", reason);
        event.put("timestamp", LocalDateTime.now().toString());
        
        rabbitTemplate.convertAndSend("order.exchange", "order.cancelled", event);
    }
}

class Order {
    private String orderId;
    private String userId;
    private double amount;
    
    public String getOrderId() { return orderId; }
    public void setOrderId(String orderId) { this.orderId = orderId; }
    public String getUserId() { return userId; }
    public void setUserId(String userId) { this.userId = userId; }
    public double getAmount() { return amount; }
    public void setAmount(double amount) { this.amount = amount; }
}

4.2 Sending Delayed Messages

java
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

@Service
public class DelayMessageService {
    
    private final RabbitTemplate rabbitTemplate;
    
    public DelayMessageService(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }
    
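    public void sendDelayMessage(String exchange, String routingKey, 
                                  Object message, long delayMillis) {
        // setDelay sets the x-delay header, which takes effect only on an exchange
        // provided by the rabbitmq_delayed_message_exchange plugin.
        rabbitTemplate.convertAndSend(exchange, routingKey, message, msg -> {
            // Math.toIntExact fails fast instead of silently overflowing like an (int) cast
            msg.getMessageProperties().setDelay(Math.toIntExact(delayMillis));
            return msg;
        });
    }
    
    public void sendDelayMessageWithTTL(String exchange, String routingKey,
                                        Object message, long ttlMillis) {
        // A per-message TTL delays delivery only if the target queue has no consumers
        // and dead-letters expired messages to the queue that is actually consumed.
        rabbitTemplate.convertAndSend(exchange, routingKey, message, msg -> {
            msg.getMessageProperties().setExpiration(String.valueOf(ttlMillis));
            return msg;
        });
    }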
}
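
Both delay styles take their duration in an awkward form: `setDelay` expects an int number of milliseconds and `setExpiration` expects a decimal string. The following plain-Java sketch shows one way to prepare those values from a `Duration` while rejecting values that do not fit. `DelayValues`, `toDelayMillis`, and `toExpiration` are hypothetical helper names, not part of Spring AMQP.

```java
import java.time.Duration;

// Hypothetical helpers for preparing the values passed to setDelay(...) and setExpiration(...).
class DelayValues {

    // setDelay takes an int; fail loudly instead of letting an (int) cast wrap around.
    static int toDelayMillis(Duration delay) {
        if (delay.isNegative()) {
            throw new IllegalArgumentException("delay must not be negative: " + delay);
        }
        return Math.toIntExact(delay.toMillis()); // throws ArithmeticException on overflow
    }

    // setExpiration takes the TTL as a decimal string of milliseconds.
    static String toExpiration(Duration ttl) {
        if (ttl.isNegative()) {
            throw new IllegalArgumentException("ttl must not be negative: " + ttl);
        }
        return Long.toString(ttl.toMillis());
    }

    public static void main(String[] args) {
        System.out.println(toDelayMillis(Duration.ofSeconds(30))); // 30000
        System.out.println(toExpiration(Duration.ofMinutes(5)));   // 300000
        System.out.println((int) Duration.ofDays(30).toMillis());  // wraps around: -1702967296
    }
}
```

The last line shows why a plain cast is risky: 30 days is 2,592,000,000 ms, which exceeds `Integer.MAX_VALUE` and silently becomes a negative delay.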

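5. Common Problems and Solutions

5.1 Message Send Failure

Problem: Sending a message throws an exception.

Solution: Catch the exception, log it, and store the message for a later retry.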

java
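// Fragment: assumes the enclosing class defines `log` and `storeMessageForRetry`
try {
    rabbitTemplate.convertAndSend(exchange, routingKey, message);
} catch (org.springframework.amqp.AmqpException e) {
    log.error("Failed to send message", e);
    storeMessageForRetry(message);
}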

5.2 RPC Timeout

Problem: An RPC call times out.

Solution: Raise the reply timeout.

java
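// The default reply timeout is 5000 ms; raise it for slow responders
rabbitTemplate.setReplyTimeout(30000);

// Returns null if no reply arrives within the timeout
Object response = rabbitTemplate.convertSendAndReceive(exchange, routingKey, request);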

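5.3 Message Conversion Failure

Problem: Message serialization or deserialization fails.

Solution: Register a JSON message converter backed by an ObjectMapper that has the required modules.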

java
@Bean
public MessageConverter jsonMessageConverter() {
    ObjectMapper mapper = new ObjectMapper();
    mapper.registerModule(new JavaTimeModule());
    return new Jackson2JsonMessageConverter(mapper);
}

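6. Best Practices

6.1 Sending

text
Sending:
├── Use convertAndSend to simplify code
├── Enable confirm callbacks for reliability
├── Set mandatory to handle routing failures
├── Use message properties judiciously
└── Implement a retry mechanism

6.2 Performance

text
Performance:
├── Reuse the RabbitTemplate
├── Send messages in batches
├── Use asynchronous confirms
├── Set sensible timeouts
└── Use connection caching

6.3 Reliability

text
Reliability:
├── Enable publisher confirms
├── Implement a return callback
├── Persist messages
├── Handle errors
└── Monitor send status

7. Related Links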