Appearance
注解驱动消费者
一、概述
Spring AMQP 提供了基于注解的消息消费方式,通过 @RabbitListener 注解可以简化消费者的开发,实现声明式的消息监听。
1.1 注解架构
┌─────────────────────────────────────────────────────────────────────┐
│ 注解驱动消费者架构 │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ Spring Application │ │
│ │ │ │
│ │ ┌─────────────────────────────────────────────────────┐ │ │
│ │ │ @RabbitListener 注解 │ │ │
│ │ │ ├── queues: 监听队列 │ │ │
│ │ │ ├── bindings: 绑定声明 │ │ │
│ │ │ ├── containerFactory: 容器工厂 │ │ │
│ │ │ ├── ackMode: 确认模式 │ │ │
│ │ │ └── concurrency: 并发数 │ │ │
│ │ └─────────────────────────────────────────────────────┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌─────────────────────────────────────────────────────┐ │ │
│ │ │ RabbitListenerAnnotationBeanPostProcessor │ │ │
│ │ │ (注解处理器) │ │ │
│ │ └─────────────────────────────────────────────────────┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌─────────────────────────────────────────────────────┐ │ │
│ │ │ SimpleMessageListenerContainer │ │ │
│ │ │ (监听容器) │ │ │
│ │ └─────────────────────────────────────────────────────┘ │ │
│ └──────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ RabbitMQ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘1.2 核心注解
text
┌─────────────────────────────────────────────────────────────────────┐
│ 核心注解说明 │
├───────────────────────┬─────────────────────────────────────────────┤
│ 注解 │ 说明 │
├───────────────────────┼─────────────────────────────────────────────┤
│ @RabbitListener │ 声明消息监听方法 │
│ │ 可配置队列、绑定、容器工厂等 │
├───────────────────────┼─────────────────────────────────────────────┤
│ @QueueBinding │ 声明队列绑定 │
│ │ 包含队列、交换器、路由键 │
├───────────────────────┼─────────────────────────────────────────────┤
│ @Queue │ 声明队列 │
│ │ 可配置队列属性 │
├───────────────────────┼─────────────────────────────────────────────┤
│ @Exchange │ 声明交换器 │
│ │ 可配置交换器类型和属性 │
├───────────────────────┼─────────────────────────────────────────────┤
│ @RabbitHandler │ 多方法消息处理器 │
│ │ 根据消息类型路由到不同方法 │
└───────────────────────┴─────────────────────────────────────────────┘二、核心知识点
2.1 @RabbitListener 基础
2.1.1 监听单个队列
java
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class SimpleConsumer {
@RabbitListener(queues = "order.queue")
public void handleOrder(Order order) {
System.out.println("收到订单: " + order.getOrderId());
}
@RabbitListener(queues = "payment.queue")
public void handlePayment(String message) {
System.out.println("收到支付消息: " + message);
}
@RabbitListener(queues = "notification.queue")
public void handleNotification(byte[] message) {
System.out.println("收到通知: " + new String(message));
}
}2.1.2 监听多个队列
java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MultiQueueConsumer {
@RabbitListener(queues = {"order.queue", "order.priority.queue"})
public void handleMultipleQueues(Message message) {
String queue = message.getMessageProperties().getConsumerQueue();
String body = new String(message.getBody());
System.out.println("从队列 " + queue + " 收到消息: " + body);
}
@RabbitListener(queues = {"#{orderQueue.name}", "#{paymentQueue.name}"})
public void handleWithSpEL(Message message) {
System.out.println("使用 SpEL 引用队列: " +
new String(message.getBody()));
}
}2.2 声明式绑定
2.2.1 自动声明队列和绑定
java
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class AutoBindingConsumer {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "order.created.queue", durable = "true"),
exchange = @Exchange(name = "order.exchange", type = "topic"),
key = "order.created"
))
public void handleOrderCreated(Order order) {
System.out.println("处理订单创建: " + order.getOrderId());
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(
name = "payment.completed.queue",
durable = "true",
arguments = {
@Argument(name = "x-message-ttl", value = "86400000"),
@Argument(name = "x-dead-letter-exchange", value = "dlx.exchange")
}
),
exchange = @Exchange(name = "payment.exchange", type = "direct"),
key = "payment.completed"
))
public void handlePaymentCompleted(Payment payment) {
System.out.println("处理支付完成: " + payment.getPaymentId());
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(name = "log.all.queue"),
exchange = @Exchange(name = "log.exchange", type = "topic"),
key = "#"
)
})
public void handleAllLogs(LogEntry log) {
System.out.println("处理日志: " + log);
}
}2.2.2 多绑定配置
java
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MultiBindingConsumer {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(name = "order.created.queue"),
exchange = @Exchange(name = "order.exchange"),
key = "order.created"
),
@QueueBinding(
value = @Queue(name = "order.updated.queue"),
exchange = @Exchange(name = "order.exchange"),
key = "order.updated"
)
})
public void handleOrderEvents(OrderEvent event) {
System.out.println("处理订单事件: " + event.getEventType());
}
}2.3 手动确认模式
java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
import java.io.IOException;
@Component
public class ManualAckConsumer {
@RabbitListener(queues = "order.queue", ackMode = "MANUAL")
public void handleOrderWithManualAck(Message message, Channel channel)
throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
String body = new String(message.getBody());
System.out.println("处理消息: " + body);
processMessage(body);
channel.basicAck(deliveryTag, false);
System.out.println("消息已确认");
} catch (RetryableException e) {
System.err.println("可重试异常: " + e.getMessage());
channel.basicNack(deliveryTag, false, true);
} catch (Exception e) {
System.err.println("处理失败: " + e.getMessage());
channel.basicNack(deliveryTag, false, false);
}
}
private void processMessage(String message) throws Exception {
System.out.println("处理: " + message);
}
}
class RetryableException extends Exception {
public RetryableException(String message) {
super(message);
}
}2.4 @RabbitHandler 多方法处理
java
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "multi.type.queue")
public class MultiTypeConsumer {
@RabbitHandler
public void handleOrder(Order order) {
System.out.println("处理订单: " + order.getOrderId());
}
@RabbitHandler
public void handlePayment(Payment payment) {
System.out.println("处理支付: " + payment.getPaymentId());
}
@RabbitHandler
public void handleUser(User user) {
System.out.println("处理用户: " + user.getUserId());
}
@RabbitHandler(isDefault = true)
public void handleDefault(Object message) {
System.out.println("处理默认消息: " + message);
}
}2.5 并发配置
java
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class ConcurrencyConsumer {
@RabbitListener(queues = "high.throughput.queue", concurrency = "10-50")
public void handleHighThroughput(Message message) {
System.out.println("高并发处理: " + new String(message.getBody()) +
", 线程: " + Thread.currentThread().getName());
}
@RabbitListener(
queues = "ordered.queue",
concurrency = "1",
containerFactory = "singleThreadFactory"
)
public void handleOrdered(Message message) {
System.out.println("顺序处理: " + new String(message.getBody()));
}
@RabbitListener(
queues = "custom.concurrency.queue",
concurrency = "#{@concurrencyConfig.min}-#{@concurrencyConfig.max}"
)
public void handleWithSpEL(Message message) {
System.out.println("SpEL 配置并发: " + new String(message.getBody()));
}
}2.6 异常处理
java
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException;
import org.springframework.stereotype.Component;
@Component
public class ExceptionHandlingConsumer {
@RabbitListener(queues = "error.handling.queue")
public void handleWithExceptionHandling(Message message) {
try {
processMessage(message);
} catch (BusinessException e) {
System.err.println("业务异常,不重试: " + e.getMessage());
throw new AmqpRejectAndDontRequeueException(e);
} catch (TemporaryException e) {
System.err.println("临时异常,重试: " + e.getMessage());
throw new RuntimeException(e);
}
}
private void processMessage(Message message) throws BusinessException, TemporaryException {
String body = new String(message.getBody());
if (body.contains("business_error")) {
throw new BusinessException("业务错误");
}
if (body.contains("temporary_error")) {
throw new TemporaryException("临时错误");
}
System.out.println("处理成功: " + body);
}
}
class BusinessException extends Exception {
public BusinessException(String message) { super(message); }
}
class TemporaryException extends Exception {
public TemporaryException(String message) { super(message); }
}三、代码示例
3.1 完整注解消费者
java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
import java.io.IOException;
@Component
public class CompleteAnnotationConsumer {
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(
name = "order.events.queue",
durable = "true",
arguments = {
@Argument(name = "x-message-ttl", value = "86400000"),
@Argument(name = "x-dead-letter-exchange", value = "dlx.exchange"),
@Argument(name = "x-dead-letter-routing-key", value = "dead.letter")
}
),
exchange = @Exchange(
name = "order.events.exchange",
type = "topic",
durable = "true"
),
key = "order.#"
),
containerFactory = "orderListenerFactory",
ackMode = "MANUAL",
concurrency = "3-10",
id = "orderEventListener",
autoStartup = "true"
)
public void handleOrderEvent(Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
String routingKey = message.getMessageProperties().getReceivedRoutingKey();
try {
String body = new String(message.getBody());
System.out.println("收到订单事件 [" + routingKey + "]: " + body);
switch (routingKey) {
case "order.created":
handleOrderCreated(body);
break;
case "order.paid":
handleOrderPaid(body);
break;
case "order.shipped":
handleOrderShipped(body);
break;
case "order.cancelled":
handleOrderCancelled(body);
break;
default:
System.out.println("未知事件类型: " + routingKey);
}
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
System.err.println("处理失败: " + e.getMessage());
channel.basicNack(deliveryTag, false, false);
}
}
private void handleOrderCreated(String body) {
System.out.println("处理订单创建: " + body);
}
private void handleOrderPaid(String body) {
System.out.println("处理订单支付: " + body);
}
private void handleOrderShipped(String body) {
System.out.println("处理订单发货: " + body);
}
private void handleOrderCancelled(String body) {
System.out.println("处理订单取消: " + body);
}
}3.2 动态队列监听
java
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import java.util.List;
@Service
public class DynamicQueueConsumer {
@RabbitListener(queues = "#{queueManager.getQueueNames()}")
public void handleDynamicQueues(Message message) {
String queue = message.getMessageProperties().getConsumerQueue();
System.out.println("从动态队列 " + queue + " 收到消息");
}
}
@Component
class QueueManager {
public List<String> getQueueNames() {
return List.of("dynamic.queue.1", "dynamic.queue.2", "dynamic.queue.3");
}
}3.3 消费者生命周期管理
java
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.stereotype.Service;
import java.util.Set;
@Service
public class ListenerManagementService {
private final RabbitListenerEndpointRegistry registry;
public ListenerManagementService(RabbitListenerEndpointRegistry registry) {
this.registry = registry;
}
public void startListener(String id) {
var container = registry.getListenerContainer(id);
if (container != null && !container.isRunning()) {
container.start();
System.out.println("监听器已启动: " + id);
}
}
public void stopListener(String id) {
var container = registry.getListenerContainer(id);
if (container != null && container.isRunning()) {
container.stop();
System.out.println("监听器已停止: " + id);
}
}
public Set<String> getListenerIds() {
return registry.getListenerContainerIds();
}
public ListenerStatus getListenerStatus(String id) {
var container = registry.getListenerContainer(id);
if (container == null) {
return null;
}
return new ListenerStatus(
id,
container.isRunning(),
container.isAutoStartup()
);
}
}
record ListenerStatus(String id, boolean running, boolean autoStartup) {}四、实际应用场景
4.1 订单事件处理
java
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class OrderEventConsumer {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "inventory.order.queue"),
exchange = @Exchange(name = "order.exchange", type = "topic"),
key = "order.created"
))
public void handleOrderCreatedForInventory(OrderEvent event) {
System.out.println("库存服务处理订单创建: " + event.getOrderId());
deductInventory(event);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "notification.order.queue"),
exchange = @Exchange(name = "order.exchange", type = "topic"),
key = "order.#"
))
public void handleOrderEventForNotification(OrderEvent event) {
System.out.println("通知服务处理订单事件: " + event.getEventType());
sendNotification(event);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "analytics.order.queue"),
exchange = @Exchange(name = "order.exchange", type = "topic"),
key = "order.#"
))
public void handleOrderEventForAnalytics(OrderEvent event) {
System.out.println("分析服务处理订单事件: " + event.getEventType());
recordAnalytics(event);
}
private void deductInventory(OrderEvent event) {
System.out.println("扣减库存");
}
private void sendNotification(OrderEvent event) {
System.out.println("发送通知");
}
private void recordAnalytics(OrderEvent event) {
System.out.println("记录分析数据");
}
}4.2 日志收集
java
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class LogCollectionConsumer {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "log.error.queue"),
exchange = @Exchange(name = "log.exchange", type = "topic"),
key = "*.error"
))
public void handleErrorLogs(LogEntry log) {
System.err.println("[ERROR] " + log.getService() + ": " + log.getMessage());
sendAlert(log);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "log.all.queue"),
exchange = @Exchange(name = "log.exchange", type = "topic"),
key = "#"
))
public void handleAllLogs(LogEntry log) {
System.out.println("[" + log.getLevel() + "] " +
log.getService() + ": " + log.getMessage());
storeLog(log);
}
private void sendAlert(LogEntry log) {
System.out.println("发送告警");
}
private void storeLog(LogEntry log) {
System.out.println("存储日志");
}
}
class LogEntry {
private String service;
private String level;
private String message;
public String getService() { return service; }
public void setService(String service) { this.service = service; }
public String getLevel() { return level; }
public void setLevel(String level) { this.level = level; }
public String getMessage() { return message; }
public void setMessage(String message) { this.message = message; }
}五、常见问题与解决方案
5.1 队列不存在
问题描述: 监听的队列不存在导致启动失败。
解决方案:
java
@RabbitListener(queues = "maybe.not.exist.queue",
missingQueuesFatal = "false")
public void handle(Message message) {
System.out.println("收到消息");
}5.2 消息类型不匹配
问题描述: 消息类型与方法参数类型不匹配。
解决方案:
java
@RabbitHandler
public void handleOrder(Order order) {
System.out.println("处理订单");
}
@RabbitHandler(isDefault = true)
public void handleUnknown(Object message) {
System.out.println("未知类型: " + message.getClass());
}5.3 消费者启动顺序
问题描述: 需要控制消费者启动顺序。
解决方案:
java
@RabbitListener(queues = "first.queue", autoStartup = "true")
public void handleFirst(Message message) {
System.out.println("第一个消费者");
}
@RabbitListener(queues = "second.queue", autoStartup = "false")
public void handleSecond(Message message) {
System.out.println("第二个消费者");
}六、最佳实践建议
6.1 注解使用建议
text
使用建议:
├── 使用声明式绑定简化配置
├── 合理设置并发数
├── 使用手动确认模式
├── 处理异常情况
└── 为消费者设置 ID6.2 性能建议
text
性能建议:
├── 根据业务设置并发数
├── 使用合适的容器工厂
├── 批量处理消息
├── 异步处理耗时操作
└── 控制预取数量6.3 可靠性建议
text
可靠性建议:
├── 使用手动确认模式
├── 实现幂等性处理
├── 配置死信队列
├── 处理异常情况
└── 监控消费者状态