Appearance
消息监听容器
一、概述
消息监听容器(Message Listener Container)是 Spring AMQP 提供的核心组件,用于管理消息消费者的生命周期,处理消息的并发消费、错误处理和重试机制。
1.1 容器架构
┌─────────────────────────────────────────────────────────────────────┐
│ 消息监听容器架构 │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ SimpleMessageListenerContainer │ │
│ │ │ │
│ │ ┌─────────────────────────────────────────────────────┐ │ │
│ │ │ 消费者线程池 │ │ │
│ │ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │ │
│ │ │ │Consumer1│ │Consumer2│ │Consumer3│ │ConsumerN│ │ │ │
│ │ │ │ Thread │ │ Thread │ │ Thread │ │ Thread │ │ │ │
│ │ │ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ │ │ │
│ │ │ │ │ │ │ │ │ │
│ │ └───────┼───────────┼───────────┼───────────┼─────────┘ │ │
│ │ │ │ │ │ │ │
│ │ ▼ ▼ ▼ ▼ │ │
│ │ ┌─────────────────────────────────────────────────────┐ │ │
│ │ │ MessageListener / @RabbitListener │ │ │
│ │ └─────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ │ 配置项: │ │
│ │ ├── concurrency: 并发消费者数量 │ │
│ │ ├── prefetchCount: 预取数量 │ │
│ │ ├── acknowledgeMode: 确认模式 │ │
│ │ └── errorHandler: 错误处理器 │ │
│ └──────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ Queue │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘1.2 容器类型
text
┌─────────────────────────────────────────────────────────────────────┐
│ 监听容器类型 │
├───────────────────────┬─────────────────────────────────────────────┤
│ 类型 │ 说明 │
├───────────────────────┼─────────────────────────────────────────────┤
│ SimpleMessageListener │ 简单消息监听容器 │
│ Container │ 支持并发消费、事务、重试 │
├───────────────────────┼─────────────────────────────────────────────┤
│ DirectMessageListener │ 直接消息监听容器 │
│ Container │ 更底层的控制,适合高级场景 │
└───────────────────────┴─────────────────────────────────────────────┘二、核心知识点
2.1 基础配置
2.1.1 Java 配置
java
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ListenerContainerConfig {
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrentConsumers(3);
factory.setMaxConcurrentConsumers(10);
factory.setPrefetchCount(10);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
factory.setAutoStartup(true);
factory.setDefaultRequeueRejected(false);
return factory;
}
@Bean
public SimpleMessageListenerContainer messageListenerContainer(
ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("order.queue", "payment.queue");
container.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
System.out.println("收到消息: " + new String(message.getBody()));
}
});
container.setConcurrentConsumers(3);
container.setMaxConcurrentConsumers(10);
container.setPrefetchCount(10);
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
return container;
}
}2.1.2 YAML 配置
yaml
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
concurrency: 3
max-concurrency: 10
prefetch: 10
auto-startup: true
default-requeue-rejected: false
missing-queues-fatal: false
idle-event-interval: 600002.2 并发消费
java
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class ConcurrentConsumer {
@RabbitListener(queues = "order.queue", concurrency = "3-10")
public void handleOrder(Order order) {
System.out.println("处理订单: " + order.getOrderId() +
", 线程: " + Thread.currentThread().getName());
}
@RabbitListener(queues = "payment.queue", containerFactory = "highConcurrencyFactory")
public void handlePayment(Payment payment) {
System.out.println("处理支付: " + payment.getPaymentId());
}
}
@Configuration
class HighConcurrencyConfig {
@Bean
public SimpleRabbitListenerContainerFactory highConcurrencyFactory(
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrentConsumers(10);
factory.setMaxConcurrentConsumers(50);
factory.setPrefetchCount(20);
return factory;
}
}2.3 确认模式
java
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
import java.io.IOException;
@Component
public class AcknowledgeModeConsumer {
@RabbitListener(queues = "auto.ack.queue", ackMode = "AUTO")
public void handleAutoAck(Message message) {
System.out.println("自动确认模式: " + new String(message.getBody()));
}
@RabbitListener(queues = "manual.ack.queue", ackMode = "MANUAL")
public void handleManualAck(Message message, Channel channel) throws IOException {
try {
String body = new String(message.getBody());
System.out.println("手动确认模式: " + body);
processMessage(body);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
channel.basicNack(deliveryTag, false, false);
}
}
private void processMessage(String message) throws Exception {
System.out.println("处理消息: " + message);
}
}2.4 错误处理
java
import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler;
import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException;
import org.springframework.stereotype.Component;
import org.springframework.util.ErrorHandler;
@Component
public class CustomErrorHandler implements ErrorHandler {
@Override
public void handleError(Throwable t) {
if (t instanceof ListenerExecutionFailedException) {
ListenerExecutionFailedException ex = (ListenerExecutionFailedException) t;
System.err.println("消息处理失败:");
System.err.println(" 队列: " + ex.getFailedMessage().getMessageProperties().getConsumerQueue());
System.err.println(" 消息: " + new String(ex.getFailedMessage().getBody()));
System.err.println(" 异常: " + ex.getCause().getMessage());
sendToDeadLetterQueue(ex.getFailedMessage());
}
}
private void sendToDeadLetterQueue(org.springframework.amqp.core.Message message) {
System.out.println("发送到死信队列");
}
}
@Configuration
class ErrorHandlerConfig {
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory,
CustomErrorHandler errorHandler) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setErrorHandler(errorHandler);
return factory;
}
}2.5 重试机制
java
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
@Configuration
public class RetryConfig {
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
RetryTemplate retryTemplate = new RetryTemplate();
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(1000);
backOffPolicy.setMultiplier(2.0);
backOffPolicy.setMaxInterval(10000);
retryTemplate.setBackOffPolicy(backOffPolicy);
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(3);
retryTemplate.setRetryPolicy(retryPolicy);
factory.setRetryTemplate(retryTemplate);
factory.setRecoveryCallback(context -> {
System.err.println("重试失败,执行恢复操作");
return null;
});
return factory;
}
}2.6 消费者标签
java
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class ConsumerTagDemo {
@RabbitListener(queues = "order.queue", id = "orderConsumer")
public void handleOrder(Order order) {
System.out.println("处理订单: " + order.getOrderId());
}
@RabbitListener(queues = "payment.queue", id = "paymentConsumer",
autoStartup = "false")
public void handlePayment(Payment payment) {
System.out.println("处理支付: " + payment.getPaymentId());
}
}三、代码示例
3.1 完整监听容器配置
java
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
@Configuration
@EnableRabbit
public class CompleteListenerContainerConfig {
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory,
MessageConverter jsonMessageConverter) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(jsonMessageConverter);
factory.setConcurrentConsumers(3);
factory.setMaxConcurrentConsumers(10);
factory.setPrefetchCount(10);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
factory.setDefaultRequeueRejected(false);
factory.setAutoStartup(true);
factory.setMissingQueuesFatal(false);
factory.setIdleEventInterval(60000L);
RetryTemplate retryTemplate = createRetryTemplate();
factory.setRetryTemplate(retryTemplate);
factory.setErrorHandler(t -> {
System.err.println("消息处理错误: " + t.getMessage());
});
return factory;
}
private RetryTemplate createRetryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(1000);
backOffPolicy.setMultiplier(2.0);
backOffPolicy.setMaxInterval(30000);
retryTemplate.setBackOffPolicy(backOffPolicy);
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(3);
retryTemplate.setRetryPolicy(retryPolicy);
return retryTemplate;
}
@Bean
public SimpleMessageListenerContainer orderListenerContainer(
ConnectionFactory connectionFactory,
OrderMessageListener orderMessageListener) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("order.queue");
MessageListenerAdapter adapter = new MessageListenerAdapter(orderMessageListener, "handleMessage");
adapter.setMessageConverter(jsonMessageConverter());
container.setMessageListener(adapter);
container.setConcurrentConsumers(5);
container.setMaxConcurrentConsumers(15);
container.setPrefetchCount(20);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return container;
}
}
@Component
class OrderMessageListener {
public void handleMessage(Order order, Message message,
com.rabbitmq.client.Channel channel) throws Exception {
try {
System.out.println("处理订单: " + order.getOrderId());
long deliveryTag = message.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
channel.basicNack(deliveryTag, false, true);
}
}
}3.2 动态容器管理
java
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.stereotype.Service;
import java.util.Set;
@Service
public class ContainerManagementService {
private final RabbitListenerEndpointRegistry registry;
public ContainerManagementService(RabbitListenerEndpointRegistry registry) {
this.registry = registry;
}
public void startContainer(String id) {
SimpleMessageListenerContainer container =
(SimpleMessageListenerContainer) registry.getListenerContainer(id);
if (container != null && !container.isRunning()) {
container.start();
System.out.println("容器已启动: " + id);
}
}
public void stopContainer(String id) {
SimpleMessageListenerContainer container =
(SimpleMessageListenerContainer) registry.getListenerContainer(id);
if (container != null && container.isRunning()) {
container.stop();
System.out.println("容器已停止: " + id);
}
}
public void pauseContainer(String id) {
SimpleMessageListenerContainer container =
(SimpleMessageListenerContainer) registry.getListenerContainer(id);
if (container != null) {
container.stop(() -> System.out.println("容器已暂停: " + id));
}
}
public void resumeContainer(String id) {
SimpleMessageListenerContainer container =
(SimpleMessageListenerContainer) registry.getListenerContainer(id);
if (container != null) {
container.start();
System.out.println("容器已恢复: " + id);
}
}
public Set<String> getContainerIds() {
return registry.getListenerContainerIds();
}
public ContainerStatus getContainerStatus(String id) {
SimpleMessageListenerContainer container =
(SimpleMessageListenerContainer) registry.getListenerContainer(id);
if (container == null) {
return null;
}
return new ContainerStatus(
id,
container.isRunning(),
container.getActiveConsumerCount(),
container.getConcurrentConsumers()
);
}
}
class ContainerStatus {
private final String id;
private final boolean running;
private final int activeConsumerCount;
private final int concurrentConsumers;
public ContainerStatus(String id, boolean running, int activeConsumerCount,
int concurrentConsumers) {
this.id = id;
this.running = running;
this.activeConsumerCount = activeConsumerCount;
this.concurrentConsumers = concurrentConsumers;
}
public String getId() { return id; }
public boolean isRunning() { return running; }
public int getActiveConsumerCount() { return activeConsumerCount; }
public int getConcurrentConsumers() { return concurrentConsumers; }
}3.3 多队列监听
java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
import java.io.IOException;
@Component
public class MultiQueueConsumer {
@RabbitListener(queues = {"order.queue", "order.priority.queue"})
public void handleMultipleQueues(Message message, Channel channel) throws IOException {
String queue = message.getMessageProperties().getConsumerQueue();
String body = new String(message.getBody());
System.out.println("从队列 " + queue + " 收到消息: " + body);
try {
processMessage(queue, body);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
channel.basicNack(deliveryTag, false, false);
}
}
private void processMessage(String queue, String body) throws Exception {
System.out.println("处理来自 " + queue + " 的消息: " + body);
}
}四、实际应用场景
4.1 高并发消费
java
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Component
public class HighConcurrencyConsumer {
private final ExecutorService executorService = Executors.newFixedThreadPool(20);
@RabbitListener(queues = "high.throughput.queue",
concurrency = "10-50",
containerFactory = "highConcurrencyFactory")
public void handleHighThroughput(Message message) {
executorService.submit(() -> {
try {
processMessage(message);
} catch (Exception e) {
System.err.println("处理失败: " + e.getMessage());
}
});
}
private void processMessage(Message message) {
System.out.println("处理消息: " + new String(message.getBody()));
}
}4.2 顺序消费
java
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;
@Component
public class OrderedConsumer {
private final ConcurrentMap<String, ReentrantLock> locks = new ConcurrentHashMap<>();
@RabbitListener(queues = "ordered.queue", concurrency = "1")
public void handleOrderedMessage(OrderEvent event) {
String orderId = event.getOrderId();
ReentrantLock lock = locks.computeIfAbsent(orderId, k -> new ReentrantLock());
lock.lock();
try {
System.out.println("顺序处理订单事件: " + event);
processOrderEvent(event);
} finally {
lock.unlock();
}
}
private void processOrderEvent(OrderEvent event) {
System.out.println("处理事件: " + event.getEventType());
}
}
class OrderEvent {
private String orderId;
private String eventType;
public String getOrderId() { return orderId; }
public void setOrderId(String orderId) { this.orderId = orderId; }
public String getEventType() { return eventType; }
public void setEventType(String eventType) { this.eventType = eventType; }
}五、常见问题与解决方案
5.1 消费者阻塞
问题描述: 消费者处理缓慢导致消息堆积。
解决方案:
yaml
spring:
rabbitmq:
listener:
simple:
concurrency: 5
max-concurrency: 20
prefetch: 105.2 消息重复消费
问题描述: 消息被重复处理。
解决方案:
java
@RabbitListener(queues = "order.queue")
public void handleOrder(Order order) {
if (isProcessed(order.getOrderId())) {
return;
}
processOrder(order);
markAsProcessed(order.getOrderId());
}5.3 队列不存在
问题描述: 监听的队列不存在导致启动失败。
解决方案:
yaml
spring:
rabbitmq:
listener:
simple:
missing-queues-fatal: false六、最佳实践建议
6.1 配置建议
text
配置建议:
├── 合理设置并发消费者数量
├── 配置适当的预取数量
├── 使用手动确认模式
├── 配置错误处理器
└── 启用重试机制6.2 性能建议
text
性能建议:
├── 根据业务设置并发数
├── 控制预取数量避免堆积
├── 异步处理耗时操作
├── 使用批量确认
└── 监控消费速率6.3 可靠性建议
text
可靠性建议:
├── 使用手动确认模式
├── 实现幂等性处理
├── 配置死信队列
├── 处理异常情况
└── 监控消费者状态