Appearance
RabbitTemplate
一、概述
RabbitTemplate 是 Spring AMQP 提供的核心类,用于简化消息的发送和接收操作。它提供了丰富的 API 来处理各种消息场景,包括同步发送、异步发送、消息转换、确认回调等功能。
1.1 RabbitTemplate 架构
┌─────────────────────────────────────────────────────────────────────┐
│ RabbitTemplate 架构 │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ Application Layer │ │
│ │ (业务代码) │ │
│ └──────────────────────────────┬──────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ RabbitTemplate │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ send() │ │convertAndS- │ │receive() │ │ │
│ │ │ │ │end() │ │ │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ MessageConv-│ │ ConfirmCall-│ │ ReturnsCall-│ │ │
│ │ │ erter │ │ back │ │ back │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ └──────────────────────────────┬──────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ ConnectionFactory │ │
│ └──────────────────────────────┬──────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ RabbitMQ Server │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
1.2 核心功能
text
核心功能:
├── 消息发送:send、convertAndSend
├── 消息接收:receive、receiveAndConvert
├── RPC 通信:convertSendAndReceive
├── 消息转换:MessageConverter
├── 确认回调:ConfirmCallback
├── 返回回调:ReturnsCallback
└── 重试机制:RetryTemplate
二、核心知识点
2.1 消息发送
2.1.1 基础发送方法
java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
@Component
public class MessageSender {
/** Template that every send method in this class delegates to. */
private final RabbitTemplate rabbitTemplate;
/**
 * Stores the supplied template for later publish calls.
 *
 * @param rabbitTemplate template used by all send methods of this class
 */
public MessageSender(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
/**
 * Publishes an already-encoded payload without setting any extra message properties.
 *
 * @param exchange   exchange to publish to
 * @param routingKey routing key used for the publish
 * @param body       raw payload bytes wrapped into the outgoing message
 */
public void sendRawMessage(String exchange, String routingKey, byte[] body) {
    rabbitTemplate.send(exchange, routingKey, MessageBuilder.withBody(body).build());
}
/**
 * Publishes a text payload with an explicit content type, a random message id and priority 5.
 *
 * <p>The text is always encoded as UTF-8 so the bytes on the wire do not depend on the
 * JVM's default charset.
 *
 * @param exchange    exchange to publish to
 * @param routingKey  routing key used for the publish
 * @param body        text payload
 * @param contentType value stored in the message's content-type property
 */
public void sendMessageWithProperties(String exchange, String routingKey,
        String body, String contentType) {
    byte[] payload = body.getBytes(java.nio.charset.StandardCharsets.UTF_8);
    Message message = MessageBuilder.withBody(payload)
            .setContentType(contentType)
            .setMessageId(java.util.UUID.randomUUID().toString())
            .setPriority(5)
            .build();
    rabbitTemplate.send(exchange, routingKey, message);
}
/**
 * Forwards the object unchanged to {@code rabbitTemplate.convertAndSend}.
 *
 * @param exchange   exchange to publish to
 * @param routingKey routing key used for the publish
 * @param message    object handed to the template for conversion and sending
 */
public void convertAndSend(String exchange, String routingKey, Object message) {
rabbitTemplate.convertAndSend(exchange, routingKey, message);
}
/**
 * Converts and sends the object, decorating the converted message before it is published.
 *
 * <p>The post-processor sets a random message id, priority 5 and the header
 * {@code X-Custom-Header=value}.
 *
 * @param exchange   exchange to publish to
 * @param routingKey routing key used for the publish
 * @param message    object handed to the template for conversion
 */
public void convertAndSendWithProcessor(String exchange, String routingKey,
        Object message) {
    rabbitTemplate.convertAndSend(exchange, routingKey, message, outgoing -> {
        MessageProperties props = outgoing.getMessageProperties();
        props.setMessageId(java.util.UUID.randomUUID().toString());
        props.setPriority(5);
        props.setHeader("X-Custom-Header", "value");
        return outgoing;
    });
}
}
2.1.2 发送方法对比
text
┌─────────────────────────────────────────────────────────────────────┐
│ 发送方法对比 │
├──────────────────────┬──────────────────────────────────────────────┤
│ 方法 │ 说明 │
├──────────────────────┼──────────────────────────────────────────────┤
│ send() │ 发送原始 Message 对象 │
│ │ 需要手动构建消息 │
├──────────────────────┼──────────────────────────────────────────────┤
│ convertAndSend() │ 发送对象,自动转换 │
│ │ 使用 MessageConverter 转换 │
├──────────────────────┼──────────────────────────────────────────────┤
│ convertSendAndRecei-│ 发送并等待响应 │
│ ve() │ RPC 模式 │
├──────────────────────┼──────────────────────────────────────────────┤
│ sendAndReceive() │ 发送原始消息并等待响应 │
└──────────────────────┴──────────────────────────────────────────────┘
2.2 消息接收
2.2.1 接收方法
java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
@Component
public class MessageReceiver {
/** Template that every receive method in this class delegates to. */
private final RabbitTemplate rabbitTemplate;
/**
 * Stores the supplied template for later receive calls.
 *
 * @param rabbitTemplate template used by all receive methods of this class
 */
public MessageReceiver(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
/**
 * Delegates to {@code rabbitTemplate.receive(queueName)}.
 *
 * @param queueName queue to read from
 * @return whatever the template returns; presumably {@code null} when nothing is
 *         available (confirm against the RabbitTemplate documentation)
 */
public Message receiveRaw(String queueName) {
return rabbitTemplate.receive(queueName);
}
/**
 * Delegates to {@code rabbitTemplate.receive(queueName, timeoutMillis)}.
 *
 * @param queueName     queue to read from
 * @param timeoutMillis timeout value passed straight through to the template
 * @return whatever the template returns
 */
public Message receiveWithTimeout(String queueName, long timeoutMillis) {
return rabbitTemplate.receive(queueName, timeoutMillis);
}
/**
 * Delegates to {@code rabbitTemplate.receiveAndConvert(queueName)}.
 *
 * @param queueName queue to read from
 * @return the converted payload as returned by the template
 */
public Object receiveAndConvert(String queueName) {
return rabbitTemplate.receiveAndConvert(queueName);
}
/**
 * Receives and converts one message, then casts the payload to the requested type.
 *
 * @param queueName queue to read from
 * @param type      class the converted payload is cast to
 * @param <T>       expected payload type
 * @return the payload cast to {@code type}, or {@code null} when the template returned {@code null}
 * @throws ClassCastException if the converted payload is not an instance of {@code type}
 */
public <T> T receiveAndConvert(String queueName, Class<T> type) {
    Object converted = rabbitTemplate.receiveAndConvert(queueName);
    if (converted == null) {
        return null;
    }
    return type.cast(converted);
}
}
2.3 RPC 模式
java
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
/**
 * Client-side helper for request/reply messaging; every call goes through
 * {@code RabbitTemplate.convertSendAndReceive}.
 */
@Service
public class RpcClient {

    private final RabbitTemplate rabbitTemplate;

    /**
     * @param rabbitTemplate template used for all request/reply calls
     */
    public RpcClient(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    /**
     * Sends the request and returns the reply exactly as the template delivers it.
     *
     * @param exchange   exchange to publish the request to
     * @param routingKey routing key for the request
     * @param request    request object handed to the template for conversion
     * @return the reply object returned by the template
     */
    public Object sendAndReceive(String exchange, String routingKey, Object request) {
        Object reply = rabbitTemplate.convertSendAndReceive(exchange, routingKey, request);
        return reply;
    }

    /**
     * Sends the request and casts the reply to the requested type.
     *
     * @param exchange     exchange to publish the request to
     * @param routingKey   routing key for the request
     * @param request      request object handed to the template for conversion
     * @param responseType class the reply is cast to
     * @param <T>          expected reply type
     * @return the reply cast to {@code responseType}, or {@code null} when the template returned {@code null}
     * @throws ClassCastException if the reply is not an instance of {@code responseType}
     */
    public <T> T sendAndReceive(String exchange, String routingKey,
            Object request, Class<T> responseType) {
        Object reply = rabbitTemplate.convertSendAndReceive(exchange, routingKey, request);
        if (reply == null) {
            return null;
        }
        return responseType.cast(reply);
    }

    /**
     * Sets the template's reply timeout and then performs a request/reply call.
     *
     * <p>NOTE(review): the timeout is written onto the template instance itself, so it
     * remains in effect for later calls made through the same instance. If that instance
     * is shared with other callers, they are affected too; confirm this is intended.
     *
     * @param exchange   exchange to publish the request to
     * @param routingKey routing key for the request
     * @param request    request object handed to the template for conversion
     * @param timeout    reply timeout, expressed in {@code unit}
     * @param unit       unit of {@code timeout}
     * @return the reply object returned by the template
     */
    public Object sendAndReceiveWithTimeout(String exchange, String routingKey,
            Object request, long timeout, TimeUnit unit) {
        long replyTimeoutMillis = unit.toMillis(timeout);
        rabbitTemplate.setReplyTimeout(replyTimeoutMillis);
        return rabbitTemplate.convertSendAndReceive(exchange, routingKey, request);
    }
}
@Service
class RpcServer {
private final RabbitTemplate rabbitTemplate;
/**
 * Listener for the {@code rpc.queue} queue: logs the request to stdout and returns the
 * value produced by {@code processRequest}.
 *
 * @param request incoming request payload
 * @return the processing result
 */
@org.springframework.amqp.rabbit.annotation.RabbitListener(queues = "rpc.queue")
public Object handleRequest(Object request) {
    System.out.println("收到 RPC 请求: " + request);
    return processRequest(request);
}
/**
 * Builds the reply by prefixing the request's string form with a fixed label.
 *
 * @param request incoming request payload
 * @return the label concatenated with {@code request}
 */
private Object processRequest(Object request) {
return "处理结果: " + request;
}
}
2.4 确认回调
2.4.1 配置确认回调
java
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
 * Registers itself as the template's confirm callback and tracks each published message
 * by correlation id until its confirm arrives.
 */
@Component
public class ConfirmCallbackHandler implements RabbitTemplate.ConfirmCallback {

    private final RabbitTemplate rabbitTemplate;

    /** Messages that were published and are still waiting for a confirm, keyed by correlation id. */
    private final ConcurrentMap<String, PendingMessage> pendingMessages = new ConcurrentHashMap<>();

    /**
     * @param rabbitTemplate template used for publishing and on which this callback is registered
     */
    public ConfirmCallbackHandler(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    /** Installs this object as the template's confirm callback. */
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
    }

    /**
     * Handles one confirm: removes the matching pending entry, logs the outcome and runs
     * the entry's success or failure callback if one was registered.
     *
     * @param correlationData correlation data of the confirmed publish; ignored when {@code null}
     * @param ack             {@code true} for an ack, {@code false} for a nack
     * @param cause           failure description passed on to the failure callback
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (correlationData == null) {
            return;
        }
        String messageId = correlationData.getId();
        PendingMessage pending = pendingMessages.remove(messageId);
        if (ack) {
            System.out.println("消息确认成功: " + messageId);
            if (pending != null) {
                pending.onSuccess();
            }
        } else {
            System.err.println("消息确认失败: " + messageId + ", 原因: " + cause);
            if (pending != null) {
                pending.onFailure(cause);
            }
        }
    }

    /**
     * Publishes with a fresh correlation id and no user callbacks.
     *
     * @param exchange   exchange to publish to
     * @param routingKey routing key used for the publish
     * @param message    object handed to the template for conversion
     */
    public void sendWithConfirm(String exchange, String routingKey, Object message) {
        sendWithConfirm(exchange, routingKey, message, null, null);
    }

    /**
     * Publishes with a fresh correlation id and remembers the optional callbacks until the
     * confirm arrives.
     *
     * <p>If the publish itself throws, the pending entry is removed again before the
     * exception is rethrown; otherwise it would stay in the map indefinitely because no
     * confirm is expected for a publish that failed locally.
     *
     * @param exchange   exchange to publish to
     * @param routingKey routing key used for the publish
     * @param message    object handed to the template for conversion
     * @param onSuccess  run when the publish is acked; may be {@code null}
     * @param onFailure  receives the cause when the publish is nacked; may be {@code null}
     */
    public void sendWithConfirm(String exchange, String routingKey, Object message,
            Runnable onSuccess, java.util.function.Consumer<String> onFailure) {
        String messageId = UUID.randomUUID().toString();
        PendingMessage pending = new PendingMessage(messageId, message);
        pending.setOnSuccess(onSuccess);
        pending.setOnFailure(onFailure);
        pendingMessages.put(messageId, pending);
        try {
            rabbitTemplate.convertAndSend(exchange, routingKey, message,
                    new CorrelationData(messageId));
        } catch (RuntimeException e) {
            pendingMessages.remove(messageId);
            throw e;
        }
    }
}
class PendingMessage {
/** Identifier under which this entry is tracked. */
private final String messageId;
/** The object that was handed to the publish call. */
private final Object message;
/** Optional callback run by {@code onSuccess()}; {@code null} until set. */
private Runnable onSuccess;
/** Optional callback run by {@code onFailure(String)}; {@code null} until set. */
private java.util.function.Consumer<String> onFailure;
/**
 * Creates an entry with no callbacks attached.
 *
 * @param messageId identifier of the tracked message
 * @param message   the tracked message object
 */
public PendingMessage(String messageId, Object message) {
this.messageId = messageId;
this.message = message;
}
/** Runs the success callback when one has been set; otherwise does nothing. */
public void onSuccess() {
    if (onSuccess == null) {
        return;
    }
    onSuccess.run();
}
/**
 * Passes the cause to the failure callback when one has been set; otherwise does nothing.
 *
 * @param cause failure description forwarded to the callback
 */
public void onFailure(String cause) {
    if (onFailure == null) {
        return;
    }
    onFailure.accept(cause);
}
/** @return the identifier given at construction */
public String getMessageId() { return messageId; }
/** @return the message object given at construction */
public Object getMessage() { return message; }
/** @param onSuccess callback to run on success; {@code null} clears it */
public void setOnSuccess(Runnable onSuccess) { this.onSuccess = onSuccess; }
/** @param onFailure callback that receives the failure cause; {@code null} clears it */
public void setOnFailure(java.util.function.Consumer<String> onFailure) {
this.onFailure = onFailure;
}
}
2.5 返回回调
java
import org.springframework.amqp.core.Returned;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Component
public class ReturnsCallbackHandler implements RabbitTemplate.ReturnsCallback {
/** Template on which this handler registers itself. */
private final RabbitTemplate rabbitTemplate;
/**
 * Stores the template; registration happens later in {@code init()}.
 *
 * @param rabbitTemplate template to register the returns callback on
 */
public ReturnsCallbackHandler(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
/**
 * Registers this object as the template's returns callback and switches the template's
 * mandatory flag on.
 */
@PostConstruct
public void init() {
rabbitTemplate.setReturnsCallback(this);
rabbitTemplate.setMandatory(true);
}
/**
 * Logs the details of a returned message to stderr and passes it on for handling.
 *
 * <p>The body is decoded as UTF-8 for logging so the output does not depend on the JVM's
 * default charset. This assumes the payload is text; binary bodies will log as garbage.
 *
 * @param returned the returned message together with exchange, routing key and reply info
 */
@Override
public void returnedMessage(Returned returned) {
    String bodyText = new String(returned.getMessage().getBody(),
            java.nio.charset.StandardCharsets.UTF_8);
    System.err.println("=== 消息被退回 ===");
    System.err.println("交换器: " + returned.getExchange());
    System.err.println("路由键: " + returned.getRoutingKey());
    System.err.println("回复码: " + returned.getReplyCode());
    System.err.println("回复文本: " + returned.getReplyText());
    System.err.println("消息内容: " + bodyText);
    handleReturnedMessage(returned);
}
/**
 * Prints a hint for reply codes 312 and 313 and then stores the returned message.
 *
 * @param returned the returned message and its reply information
 */
private void handleReturnedMessage(Returned returned) {
    int replyCode = returned.getReplyCode();
    if (replyCode == 312) {
        // 312: the message could not be routed to any queue.
        System.err.println("消息无法路由,检查绑定关系");
    } else if (replyCode == 313) {
        // 313: no consumers.
        System.err.println("没有消费者");
    }
    storeFailedMessage(returned);
}
/**
 * Placeholder: only prints a line to stdout; the returned message is not persisted anywhere.
 *
 * @param returned the returned message (currently unused)
 */
private void storeFailedMessage(Returned returned) {
System.out.println("存储失败消息以供后续处理");
}
}
2.6 批量操作
java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
public class BatchOperations {
/** Template that both batch methods run their work through. */
private final RabbitTemplate rabbitTemplate;
/**
 * Stores the supplied template.
 *
 * @param rabbitTemplate template used by the batch methods
 */
public BatchOperations(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
/**
 * Publishes every element of {@code messages} on the channel supplied by
 * {@code rabbitTemplate.execute}.
 *
 * <p>Each element is converted with the template's message converter, but only the
 * resulting body bytes are published.
 *
 * @param exchange   exchange to publish to
 * @param routingKey routing key used for every publish
 * @param messages   objects to convert and publish, in iteration order
 */
public void sendBatch(String exchange, String routingKey, List<Object> messages) {
rabbitTemplate.execute(channel -> {
for (Object message : messages) {
try {
// NOTE(review): the converter is called with null message properties and only the body
// is kept, so any properties it would produce are discarded.
byte[] body = rabbitTemplate.getMessageConverter()
.toMessage(message, null).getBody();
// NOTE(review): published with null properties; confirm consumers can handle that.
channel.basicPublish(exchange, routingKey, null, body);
} catch (Exception e) {
// A failure on one element is reported to stderr (message text only) and the loop
// continues with the next element; the caller is not informed.
System.err.println("发送失败: " + e.getMessage());
}
}
return null;
});
}
/**
 * Publishes all messages inside one {@code invoke} scope and then blocks until the broker
 * has confirmed them.
 *
 * <p>The wait is what makes this a "with confirm" send: without it the method returns as
 * soon as the publishes are issued. It presumably requires publisher confirms to be
 * enabled on the connection factory; verify the configuration before relying on it.
 *
 * @param exchange   exchange to publish to
 * @param routingKey routing key used for every publish
 * @param messages   objects to convert and publish; {@code null} or empty means nothing to do
 */
public void sendBatchWithConfirm(String exchange, String routingKey,
        List<Object> messages) {
    if (messages == null || messages.isEmpty()) {
        return;
    }
    // Upper bound for waiting on the broker's confirms for this batch.
    final long confirmTimeoutMillis = 10_000L;
    rabbitTemplate.invoke(operations -> {
        for (Object message : messages) {
            operations.convertAndSend(exchange, routingKey, message);
        }
        // Throws if any publish is nacked or the timeout elapses.
        operations.waitForConfirmsOrDie(confirmTimeoutMillis);
        return null;
    });
}
}
三、代码示例
3.1 完整 RabbitTemplate 服务
java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
@Service
public class RabbitTemplateService {
/** Template used for every operation in this service. */
private final RabbitTemplate rabbitTemplate;
/** Futures returned by {@code sendAsync} that are still waiting for a confirm, keyed by correlation id. */
private final ConcurrentMap<String, CompletableFuture<Boolean>> confirmFutures =
new ConcurrentHashMap<>();
/**
 * Stores the template and immediately registers this service's callbacks on it.
 *
 * @param rabbitTemplate template to use; its confirm/returns callbacks and mandatory flag
 *                       are set by {@code setupCallbacks()}
 */
public RabbitTemplateService(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
setupCallbacks();
}
/**
 * Registers the confirm callback (completes the matching future with the ack flag), the
 * returns callback (logs the reply text to stderr) and enables the mandatory flag.
 */
private void setupCallbacks() {
    rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
        if (correlationData == null) {
            return;
        }
        CompletableFuture<Boolean> pending = confirmFutures.remove(correlationData.getId());
        if (pending == null) {
            return;
        }
        pending.complete(ack);
    });
    rabbitTemplate.setReturnsCallback(returned ->
            System.err.println("消息被退回: " + returned.getReplyText()));
    rabbitTemplate.setMandatory(true);
}
/**
 * Forwards the object unchanged to {@code rabbitTemplate.convertAndSend}.
 *
 * @param exchange   exchange to publish to
 * @param routingKey routing key used for the publish
 * @param message    object handed to the template for conversion and sending
 */
public void send(String exchange, String routingKey, Object message) {
rabbitTemplate.convertAndSend(exchange, routingKey, message);
}
/**
 * Converts and sends the object, copying every entry of {@code headers} onto the
 * converted message before it is published.
 *
 * @param exchange   exchange to publish to
 * @param routingKey routing key used for the publish
 * @param message    object handed to the template for conversion
 * @param headers    header names and values to set on the outgoing message
 */
public void sendWithHeaders(String exchange, String routingKey, Object message,
        java.util.Map<String, Object> headers) {
    rabbitTemplate.convertAndSend(exchange, routingKey, message, outgoing -> {
        for (java.util.Map.Entry<String, Object> header : headers.entrySet()) {
            outgoing.getMessageProperties().setHeader(header.getKey(), header.getValue());
        }
        return outgoing;
    });
}
/**
 * Publishes the message and returns a future that completes with the broker's ack flag.
 *
 * <p>The future is registered under a fresh correlation id and fails with a timeout after
 * 30 seconds. Its bookkeeping entry is removed however the future ends (confirm, timeout,
 * cancellation) and also when the publish itself throws, so unconfirmed sends do not
 * accumulate in the map.
 *
 * @param exchange   exchange to publish to
 * @param routingKey routing key used for the publish
 * @param message    object handed to the template for conversion
 * @return future completed with {@code true} on ack and {@code false} on nack
 */
public CompletableFuture<Boolean> sendAsync(String exchange, String routingKey,
        Object message) {
    String correlationId = UUID.randomUUID().toString();
    CompletableFuture<Boolean> future = new CompletableFuture<>();
    confirmFutures.put(correlationId, future);
    // Two-argument remove: only drops the entry if it still maps to this very future.
    future.whenComplete((ack, error) -> confirmFutures.remove(correlationId, future));
    try {
        rabbitTemplate.convertAndSend(exchange, routingKey, message,
                new CorrelationData(correlationId));
    } catch (RuntimeException e) {
        confirmFutures.remove(correlationId, future);
        throw e;
    }
    return future.orTimeout(30, TimeUnit.SECONDS);
}
/**
 * Performs a request/reply call and casts the reply to the requested type.
 *
 * @param exchange     exchange to publish the request to
 * @param routingKey   routing key for the request
 * @param request      request object handed to the template for conversion
 * @param responseType class the reply is cast to
 * @param <T>          expected reply type
 * @return the reply cast to {@code responseType}, or {@code null} when the template returned {@code null}
 * @throws ClassCastException if the reply is not an instance of {@code responseType}
 */
public <T> T sendAndReceive(String exchange, String routingKey,
        Object request, Class<T> responseType) {
    Object reply = rabbitTemplate.convertSendAndReceive(exchange, routingKey, request);
    if (reply == null) {
        return null;
    }
    return responseType.cast(reply);
}
/**
 * Receives and converts one message, then casts the payload to the requested type.
 *
 * @param queueName queue to read from
 * @param type      class the converted payload is cast to
 * @param <T>       expected payload type
 * @return the payload cast to {@code type}, or {@code null} when the template returned {@code null}
 * @throws ClassCastException if the payload is not an instance of {@code type}
 */
public <T> T receive(String queueName, Class<T> type) {
    Object payload = rabbitTemplate.receiveAndConvert(queueName);
    if (payload == null) {
        return null;
    }
    return type.cast(payload);
}
/**
 * Receives and converts one message with the given timeout, then casts the payload to
 * the requested type.
 *
 * @param queueName     queue to read from
 * @param timeoutMillis timeout value passed straight through to the template
 * @param type          class the converted payload is cast to
 * @param <T>           expected payload type
 * @return the payload cast to {@code type}, or {@code null} when the template returned {@code null}
 * @throws ClassCastException if the payload is not an instance of {@code type}
 */
public <T> T receiveWithTimeout(String queueName, long timeoutMillis, Class<T> type) {
    Object payload = rabbitTemplate.receiveAndConvert(queueName, timeoutMillis);
    if (payload == null) {
        return null;
    }
    return type.cast(payload);
}
/**
 * Runs the given operation with the channel supplied by {@code rabbitTemplate.execute}.
 * The operation's result is discarded; the callback always returns {@code null}.
 *
 * @param operation work to perform on the channel
 */
public void executeInChannel(ChannelOperation operation) {
rabbitTemplate.execute(channel -> {
operation.execute(channel);
return null;
});
}
/** A unit of work that operates directly on a channel and returns nothing. */
@FunctionalInterface
public interface ChannelOperation {
/**
 * Performs the work on the supplied channel.
 *
 * @param channel channel to operate on
 * @throws Exception if the operation fails
 */
void execute(com.rabbitmq.client.Channel channel) throws Exception;
}
}
3.2 消息构建器
java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.UUID;
@Component
public class MessageBuilderService {
/**
 * Builds a plain-text message with a random message id.
 *
 * @param content text payload, encoded as UTF-8
 * @return the built message
 */
public Message buildTextMessage(String content) {
    return MessageBuilder.withBody(utf8(content))
            .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
            .setMessageId(UUID.randomUUID().toString())
            .build();
}

/**
 * Builds a message labelled {@code application/json} with a random message id.
 *
 * <p>NOTE(review): the body is {@code object.toString()} encoded as UTF-8; no JSON
 * serialisation happens here. Unless the object's {@code toString()} already yields JSON,
 * the content type will not match the payload.
 *
 * @param object object whose string form becomes the body
 * @return the built message
 */
public Message buildJsonMessage(Object object) {
    return MessageBuilder.withBody(utf8(object.toString()))
            .setContentType(MessageProperties.CONTENT_TYPE_JSON)
            .setMessageId(UUID.randomUUID().toString())
            .build();
}

/**
 * Builds a plain-text message carrying the given headers and a random message id.
 *
 * @param content text payload, encoded as UTF-8
 * @param headers header names and values to copy onto the message; {@code null} means no headers
 * @return the built message
 */
public Message buildMessageWithHeaders(String content, Map<String, Object> headers) {
    MessageBuilder builder = MessageBuilder.withBody(utf8(content))
            .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
            .setMessageId(UUID.randomUUID().toString());
    if (headers != null) {
        headers.forEach(builder::setHeader);
    }
    return builder.build();
}

/**
 * Builds a plain-text message with persistent delivery mode and a random message id.
 *
 * @param content text payload, encoded as UTF-8
 * @return the built message
 */
public Message buildPersistentMessage(String content) {
    return MessageBuilder.withBody(utf8(content))
            .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
            .setDeliveryMode(MessageProperties.DELIVERY_MODE_PERSISTENT)
            .setMessageId(UUID.randomUUID().toString())
            .build();
}

/**
 * Builds a plain-text message with the given priority and a random message id.
 *
 * @param content  text payload, encoded as UTF-8
 * @param priority value stored in the message's priority property
 * @return the built message
 */
public Message buildPriorityMessage(String content, int priority) {
    return MessageBuilder.withBody(utf8(content))
            .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
            .setPriority(priority)
            .setMessageId(UUID.randomUUID().toString())
            .build();
}

/**
 * Builds a plain-text message whose expiration property is the decimal string form of
 * {@code ttlMillis}, with a random message id.
 *
 * @param content   text payload, encoded as UTF-8
 * @param ttlMillis value written to the expiration property
 * @return the built message
 */
public Message buildTTLMessage(String content, long ttlMillis) {
    return MessageBuilder.withBody(utf8(content))
            .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
            .setExpiration(String.valueOf(ttlMillis))
            .setMessageId(UUID.randomUUID().toString())
            .build();
}

/** Encodes text as UTF-8 so message bodies do not depend on the JVM's default charset. */
private static byte[] utf8(String content) {
    return content.getBytes(java.nio.charset.StandardCharsets.UTF_8);
}
}
3.3 重试机制
java
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.listener.RetryListenerSupport;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.stereotype.Component;
import java.util.Collections;
@Component
public class RetryRabbitTemplate {
/** Template that performs the actual messaging calls. */
private final RabbitTemplate rabbitTemplate;
/** Retry wrapper built once by {@code createRetryTemplate()} and used by every method here. */
private final RetryTemplate retryTemplate;
/**
 * Stores the template and builds the retry configuration.
 *
 * @param rabbitTemplate template whose calls are wrapped in retries
 */
public RetryRabbitTemplate(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
this.retryTemplate = createRetryTemplate();
}
/**
 * Builds the retry template: exponential back-off, a simple retry policy keyed on
 * {@code Exception}, and a listener that logs each error to stderr.
 *
 * @return the configured retry template
 */
private RetryTemplate createRetryTemplate() {
RetryTemplate template = new RetryTemplate();
// Back-off grows from 1000 by a factor of 2.0 up to a cap of 10000
// (presumably milliseconds; confirm against ExponentialBackOffPolicy).
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(1000);
backOffPolicy.setMultiplier(2.0);
backOffPolicy.setMaxInterval(10000);
template.setBackOffPolicy(backOffPolicy);
// Limit of 3 with Exception mapped to true, i.e. every Exception is treated as retryable.
// NOTE(review): this also retries failures that cannot succeed on a second attempt.
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(3,
Collections.singletonMap(Exception.class, true));
template.setRetryPolicy(retryPolicy);
template.registerListener(new RetryListenerSupport() {
@Override
public <T, E extends Throwable> void onError(RetryContext context,
RetryCallback<T, E> callback, Throwable throwable) {
// Log the retry count and the failure's message text only.
System.err.println("重试 " + context.getRetryCount() + ": " + throwable.getMessage());
}
});
return template;
}
/**
 * Runs {@code rabbitTemplate.convertAndSend} inside the retry template.
 *
 * @param exchange   exchange to publish to
 * @param routingKey routing key used for the publish
 * @param message    object handed to the template for conversion and sending
 */
public void sendWithRetry(String exchange, String routingKey, Object message) {
retryTemplate.execute(context -> {
rabbitTemplate.convertAndSend(exchange, routingKey, message);
return null;
});
}
/**
 * Runs a request/reply call inside the retry template and casts the reply to the
 * requested type.
 *
 * @param exchange     exchange to publish the request to
 * @param routingKey   routing key for the request
 * @param request      request object handed to the template for conversion
 * @param responseType class the reply is cast to
 * @param <T>          expected reply type
 * @return the reply cast to {@code responseType}, or {@code null} when the template returned {@code null}
 */
public <T> T sendAndReceiveWithRetry(String exchange, String routingKey,
        Object request, Class<T> responseType) {
    return retryTemplate.execute(context -> {
        Object reply = rabbitTemplate.convertSendAndReceive(exchange, routingKey, request);
        if (reply == null) {
            return null;
        }
        return responseType.cast(reply);
    });
}
}
四、实际应用场景
4.1 订单消息发送
java
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
 * Publishes order lifecycle events to the {@code order.exchange} exchange.
 *
 * <p>Every event is a map with {@code eventId}, {@code eventType}, {@code data} and
 * {@code timestamp}; the routing key equals the event type.
 */
@Service
public class OrderMessageService {

    private static final String ORDER_EXCHANGE = "order.exchange";
    private static final String EVENT_CREATED = "order.created";
    private static final String EVENT_PAID = "order.paid";
    private static final String EVENT_CANCELLED = "order.cancelled";

    private final RabbitTemplate rabbitTemplate;

    /**
     * @param rabbitTemplate template used to publish the events
     */
    public OrderMessageService(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    /**
     * Publishes an {@code order.created} event and decorates the message with a random
     * message id, JSON content type and the {@code X-Event-Type} / {@code X-Source} headers.
     *
     * @param order order placed under the event's {@code data} key
     */
    public void sendOrderCreated(Order order) {
        Map<String, Object> event = newEvent(EVENT_CREATED, order);
        rabbitTemplate.convertAndSend(ORDER_EXCHANGE, EVENT_CREATED, event, message -> {
            message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
            message.getMessageProperties().setContentType("application/json");
            message.getMessageProperties().setHeader("X-Event-Type", EVENT_CREATED);
            message.getMessageProperties().setHeader("X-Source", "order-service");
            return message;
        });
    }

    /**
     * Publishes an {@code order.paid} event.
     *
     * @param order order placed under the event's {@code data} key
     */
    public void sendOrderPaid(Order order) {
        rabbitTemplate.convertAndSend(ORDER_EXCHANGE, EVENT_PAID, newEvent(EVENT_PAID, order));
    }

    /**
     * Publishes an {@code order.cancelled} event that additionally carries the reason.
     *
     * @param order  order placed under the event's {@code data} key
     * @param reason cancellation reason stored under the {@code reason} key
     */
    public void sendOrderCancelled(Order order, String reason) {
        Map<String, Object> event = newEvent(EVENT_CANCELLED, order);
        event.put("reason", reason);
        rabbitTemplate.convertAndSend(ORDER_EXCHANGE, EVENT_CANCELLED, event);
    }

    /** Builds the fields shared by every order event: id, type, payload and timestamp. */
    private static Map<String, Object> newEvent(String eventType, Order order) {
        Map<String, Object> event = new HashMap<>();
        event.put("eventId", UUID.randomUUID().toString());
        event.put("eventType", eventType);
        event.put("data", order);
        event.put("timestamp", LocalDateTime.now().toString());
        return event;
    }
}
class Order {
/** Identifier of the order. */
private String orderId;
/** Identifier of the user the order belongs to. */
private String userId;
// NOTE(review): stored as a double; if this is a monetary amount, confirm that
// floating-point rounding is acceptable.
private double amount;
/** @return the order identifier */
public String getOrderId() { return orderId; }
/** @param orderId the order identifier */
public void setOrderId(String orderId) { this.orderId = orderId; }
/** @return the user identifier */
public String getUserId() { return userId; }
/** @param userId the user identifier */
public void setUserId(String userId) { this.userId = userId; }
/** @return the order amount */
public double getAmount() { return amount; }
/** @param amount the order amount */
public void setAmount(double amount) { this.amount = amount; }
}
4.2 延迟消息发送
java
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
@Service
public class DelayMessageService {
/** Template that both delay methods publish through. */
private final RabbitTemplate rabbitTemplate;
/**
 * Stores the supplied template.
 *
 * @param rabbitTemplate template used to publish delayed messages
 */
public DelayMessageService(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
/**
 * Converts and sends the object with the message's delay property set to {@code delayMillis}.
 *
 * <p>The delay property is an {@code int}. A value outside the {@code int} range is
 * rejected up front instead of being narrowed silently, which would otherwise publish
 * the message with an unrelated (possibly negative) delay.
 *
 * @param exchange    exchange to publish to
 * @param routingKey  routing key used for the publish
 * @param message     object handed to the template for conversion
 * @param delayMillis delay to set on the message
 * @throws IllegalArgumentException if {@code delayMillis} does not fit in an {@code int}
 */
public void sendDelayMessage(String exchange, String routingKey,
        Object message, long delayMillis) {
    if (delayMillis > Integer.MAX_VALUE || delayMillis < Integer.MIN_VALUE) {
        throw new IllegalArgumentException(
                "delayMillis does not fit in an int: " + delayMillis);
    }
    int delay = (int) delayMillis;
    rabbitTemplate.convertAndSend(exchange, routingKey, message, msg -> {
        msg.getMessageProperties().setDelay(delay);
        return msg;
    });
}
/**
 * Converts and sends the object with the message's expiration property set to the
 * decimal string form of {@code ttlMillis}.
 *
 * @param exchange   exchange to publish to
 * @param routingKey routing key used for the publish
 * @param message    object handed to the template for conversion
 * @param ttlMillis  value written to the expiration property
 */
public void sendDelayMessageWithTTL(String exchange, String routingKey,
        Object message, long ttlMillis) {
    rabbitTemplate.convertAndSend(exchange, routingKey, message, outgoing -> {
        String expiration = Long.toString(ttlMillis);
        outgoing.getMessageProperties().setExpiration(expiration);
        return outgoing;
    });
}
}
五、常见问题与解决方案
5.1 消息发送失败
问题描述: 消息发送时抛出异常。
解决方案:
java
try {
rabbitTemplate.convertAndSend(exchange, routingKey, message);
} catch (Exception e) {
log.error("消息发送失败", e);
storeMessageForRetry(message);
}
5.2 RPC 超时
问题描述: RPC 调用超时。
解决方案:
java
rabbitTemplate.setReplyTimeout(30000);
Object response = rabbitTemplate.convertSendAndReceive(exchange, routingKey, request);
5.3 消息转换失败
问题描述: 消息序列化/反序列化失败。
解决方案:
java
@Bean
public MessageConverter jsonMessageConverter() {
ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new JavaTimeModule());
return new Jackson2JsonMessageConverter(mapper);
}
六、最佳实践建议
6.1 发送建议
text
发送建议:
├── 使用 convertAndSend 简化代码
├── 启用确认回调确保可靠性
├── 设置 mandatory 处理路由失败
├── 合理使用消息属性
└── 实现重试机制
6.2 性能建议
text
性能建议:
├── 复用 RabbitTemplate
├── 批量发送消息
├── 异步确认模式
├── 合理设置超时
└── 使用连接缓存
6.3 可靠性建议
text
可靠性建议:
├── 启用发布确认
├── 实现返回回调
├── 消息持久化
├── 错误处理
└── 监控发送状态