Skip to content

消息监听容器

一、概述

消息监听容器(Message Listener Container)是 Spring AMQP 提供的核心组件,用于管理消息消费者的生命周期,处理消息的并发消费、错误处理和重试机制。

1.1 容器架构

┌─────────────────────────────────────────────────────────────────────┐
│                    消息监听容器架构                                   │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │              SimpleMessageListenerContainer                  │   │
│  │                                                              │   │
│  │  ┌─────────────────────────────────────────────────────┐   │   │
│  │  │                 消费者线程池                          │   │   │
│  │  │  ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐   │   │   │
│  │  │  │Consumer1│ │Consumer2│ │Consumer3│ │ConsumerN│   │   │   │
│  │  │  │ Thread  │ │ Thread  │ │ Thread  │ │ Thread  │   │   │   │
│  │  │  └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘   │   │   │
│  │  │       │           │           │           │         │   │   │
│  │  └───────┼───────────┼───────────┼───────────┼─────────┘   │   │
│  │          │           │           │           │             │   │
│  │          ▼           ▼           ▼           ▼             │   │
│  │  ┌─────────────────────────────────────────────────────┐   │   │
│  │  │              MessageListener / @RabbitListener       │   │   │
│  │  └─────────────────────────────────────────────────────┘   │   │
│  │                                                              │   │
│  │  配置项:                                                    │   │
│  │  ├── concurrency: 并发消费者数量                            │   │
│  │  ├── prefetchCount: 预取数量                                │   │
│  │  ├── acknowledgeMode: 确认模式                              │   │
│  │  └── errorHandler: 错误处理器                               │   │
│  └──────────────────────────────────────────────────────────────┘   │
│                                 │                                   │
│                                 ▼                                   │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                        Queue                                 │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

1.2 容器类型

text
┌─────────────────────────────────────────────────────────────────────┐
│                     监听容器类型                                     │
├───────────────────────┬─────────────────────────────────────────────┤
│         类型          │                    说明                      │
├───────────────────────┼─────────────────────────────────────────────┤
│ SimpleMessageListener │ 简单消息监听容器                            │
│ Container             │ 支持并发消费、事务、重试                    │
├───────────────────────┼─────────────────────────────────────────────┤
│ DirectMessageListener │ 直接消息监听容器                            │
│ Container             │ 更底层的控制,适合高级场景                  │
└───────────────────────┴─────────────────────────────────────────────┘

二、核心知识点

2.1 基础配置

2.1.1 Java 配置

java
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ListenerContainerConfig {
    
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory) {
        
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        
        factory.setConcurrentConsumers(3);
        factory.setMaxConcurrentConsumers(10);
        factory.setPrefetchCount(10);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        
        factory.setAutoStartup(true);
        factory.setDefaultRequeueRejected(false);
        
        return factory;
    }
    
    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(
            ConnectionFactory connectionFactory) {
        
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames("order.queue", "payment.queue");
        
        container.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                System.out.println("收到消息: " + new String(message.getBody()));
            }
        });
        
        container.setConcurrentConsumers(3);
        container.setMaxConcurrentConsumers(10);
        container.setPrefetchCount(10);
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        
        return container;
    }
}

2.1.2 YAML 配置

yaml
spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual
        concurrency: 3
        max-concurrency: 10
        prefetch: 10
        auto-startup: true
        default-requeue-rejected: false
        missing-queues-fatal: false
        idle-event-interval: 60000

2.2 并发消费

java
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class ConcurrentConsumer {
    
    @RabbitListener(queues = "order.queue", concurrency = "3-10")
    public void handleOrder(Order order) {
        System.out.println("处理订单: " + order.getOrderId() + 
            ", 线程: " + Thread.currentThread().getName());
    }
    
    @RabbitListener(queues = "payment.queue", containerFactory = "highConcurrencyFactory")
    public void handlePayment(Payment payment) {
        System.out.println("处理支付: " + payment.getPaymentId());
    }
}

@Configuration
class HighConcurrencyConfig {
    
    @Bean
    public SimpleRabbitListenerContainerFactory highConcurrencyFactory(
            ConnectionFactory connectionFactory) {
        
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrentConsumers(10);
        factory.setMaxConcurrentConsumers(50);
        factory.setPrefetchCount(20);
        
        return factory;
    }
}

2.3 确认模式

java
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
import java.io.IOException;

@Component
public class AcknowledgeModeConsumer {
    
    @RabbitListener(queues = "auto.ack.queue", ackMode = "AUTO")
    public void handleAutoAck(Message message) {
        System.out.println("自动确认模式: " + new String(message.getBody()));
    }
    
    @RabbitListener(queues = "manual.ack.queue", ackMode = "MANUAL")
    public void handleManualAck(Message message, Channel channel) throws IOException {
        try {
            String body = new String(message.getBody());
            System.out.println("手动确认模式: " + body);
            
            processMessage(body);
            
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            channel.basicAck(deliveryTag, false);
            
        } catch (Exception e) {
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            channel.basicNack(deliveryTag, false, false);
        }
    }
    
    private void processMessage(String message) throws Exception {
        System.out.println("处理消息: " + message);
    }
}

2.4 错误处理

java
import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler;
import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException;
import org.springframework.stereotype.Component;
import org.springframework.util.ErrorHandler;

@Component
public class CustomErrorHandler implements ErrorHandler {
    
    @Override
    public void handleError(Throwable t) {
        if (t instanceof ListenerExecutionFailedException) {
            ListenerExecutionFailedException ex = (ListenerExecutionFailedException) t;
            
            System.err.println("消息处理失败:");
            System.err.println("  队列: " + ex.getFailedMessage().getMessageProperties().getConsumerQueue());
            System.err.println("  消息: " + new String(ex.getFailedMessage().getBody()));
            System.err.println("  异常: " + ex.getCause().getMessage());
            
            sendToDeadLetterQueue(ex.getFailedMessage());
        }
    }
    
    private void sendToDeadLetterQueue(org.springframework.amqp.core.Message message) {
        System.out.println("发送到死信队列");
    }
}

@Configuration
class ErrorHandlerConfig {
    
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory,
            CustomErrorHandler errorHandler) {
        
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setErrorHandler(errorHandler);
        
        return factory;
    }
}

2.5 重试机制

java
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

@Configuration
public class RetryConfig {
    
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory) {
        
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        
        RetryTemplate retryTemplate = new RetryTemplate();
        
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(1000);
        backOffPolicy.setMultiplier(2.0);
        backOffPolicy.setMaxInterval(10000);
        retryTemplate.setBackOffPolicy(backOffPolicy);
        
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(3);
        retryTemplate.setRetryPolicy(retryPolicy);
        
        factory.setRetryTemplate(retryTemplate);
        factory.setRecoveryCallback(context -> {
            System.err.println("重试失败,执行恢复操作");
            return null;
        });
        
        return factory;
    }
}

2.6 消费者标签

java
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class ConsumerTagDemo {
    
    @RabbitListener(queues = "order.queue", id = "orderConsumer")
    public void handleOrder(Order order) {
        System.out.println("处理订单: " + order.getOrderId());
    }
    
    @RabbitListener(queues = "payment.queue", id = "paymentConsumer", 
                    autoStartup = "false")
    public void handlePayment(Payment payment) {
        System.out.println("处理支付: " + payment.getPaymentId());
    }
}

三、代码示例

3.1 完整监听容器配置

java
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

@Configuration
@EnableRabbit
public class CompleteListenerContainerConfig {
    
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
    
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory,
            MessageConverter jsonMessageConverter) {
        
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(jsonMessageConverter);
        
        factory.setConcurrentConsumers(3);
        factory.setMaxConcurrentConsumers(10);
        factory.setPrefetchCount(10);
        
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        factory.setDefaultRequeueRejected(false);
        
        factory.setAutoStartup(true);
        factory.setMissingQueuesFatal(false);
        factory.setIdleEventInterval(60000L);
        
        RetryTemplate retryTemplate = createRetryTemplate();
        factory.setRetryTemplate(retryTemplate);
        
        factory.setErrorHandler(t -> {
            System.err.println("消息处理错误: " + t.getMessage());
        });
        
        return factory;
    }
    
    private RetryTemplate createRetryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();
        
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(1000);
        backOffPolicy.setMultiplier(2.0);
        backOffPolicy.setMaxInterval(30000);
        retryTemplate.setBackOffPolicy(backOffPolicy);
        
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(3);
        retryTemplate.setRetryPolicy(retryPolicy);
        
        return retryTemplate;
    }
    
    @Bean
    public SimpleMessageListenerContainer orderListenerContainer(
            ConnectionFactory connectionFactory,
            OrderMessageListener orderMessageListener) {
        
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames("order.queue");
        
        MessageListenerAdapter adapter = new MessageListenerAdapter(orderMessageListener, "handleMessage");
        adapter.setMessageConverter(jsonMessageConverter());
        container.setMessageListener(adapter);
        
        container.setConcurrentConsumers(5);
        container.setMaxConcurrentConsumers(15);
        container.setPrefetchCount(20);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        
        return container;
    }
}

@Component
class OrderMessageListener {
    
    public void handleMessage(Order order, Message message, 
                             com.rabbitmq.client.Channel channel) throws Exception {
        try {
            System.out.println("处理订单: " + order.getOrderId());
            
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            channel.basicAck(deliveryTag, false);
            
        } catch (Exception e) {
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            channel.basicNack(deliveryTag, false, true);
        }
    }
}

3.2 动态容器管理

java
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.stereotype.Service;
import java.util.Set;

@Service
public class ContainerManagementService {
    
    private final RabbitListenerEndpointRegistry registry;
    
    public ContainerManagementService(RabbitListenerEndpointRegistry registry) {
        this.registry = registry;
    }
    
    public void startContainer(String id) {
        SimpleMessageListenerContainer container = 
            (SimpleMessageListenerContainer) registry.getListenerContainer(id);
        if (container != null && !container.isRunning()) {
            container.start();
            System.out.println("容器已启动: " + id);
        }
    }
    
    public void stopContainer(String id) {
        SimpleMessageListenerContainer container = 
            (SimpleMessageListenerContainer) registry.getListenerContainer(id);
        if (container != null && container.isRunning()) {
            container.stop();
            System.out.println("容器已停止: " + id);
        }
    }
    
    public void pauseContainer(String id) {
        SimpleMessageListenerContainer container = 
            (SimpleMessageListenerContainer) registry.getListenerContainer(id);
        if (container != null) {
            container.stop(() -> System.out.println("容器已暂停: " + id));
        }
    }
    
    public void resumeContainer(String id) {
        SimpleMessageListenerContainer container = 
            (SimpleMessageListenerContainer) registry.getListenerContainer(id);
        if (container != null) {
            container.start();
            System.out.println("容器已恢复: " + id);
        }
    }
    
    public Set<String> getContainerIds() {
        return registry.getListenerContainerIds();
    }
    
    public ContainerStatus getContainerStatus(String id) {
        SimpleMessageListenerContainer container = 
            (SimpleMessageListenerContainer) registry.getListenerContainer(id);
        
        if (container == null) {
            return null;
        }
        
        return new ContainerStatus(
            id,
            container.isRunning(),
            container.getActiveConsumerCount(),
            container.getConcurrentConsumers()
        );
    }
}

class ContainerStatus {
    private final String id;
    private final boolean running;
    private final int activeConsumerCount;
    private final int concurrentConsumers;
    
    public ContainerStatus(String id, boolean running, int activeConsumerCount, 
                          int concurrentConsumers) {
        this.id = id;
        this.running = running;
        this.activeConsumerCount = activeConsumerCount;
        this.concurrentConsumers = concurrentConsumers;
    }
    
    public String getId() { return id; }
    public boolean isRunning() { return running; }
    public int getActiveConsumerCount() { return activeConsumerCount; }
    public int getConcurrentConsumers() { return concurrentConsumers; }
}

3.3 多队列监听

java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
import java.io.IOException;

@Component
public class MultiQueueConsumer {
    
    @RabbitListener(queues = {"order.queue", "order.priority.queue"})
    public void handleMultipleQueues(Message message, Channel channel) throws IOException {
        String queue = message.getMessageProperties().getConsumerQueue();
        String body = new String(message.getBody());
        
        System.out.println("从队列 " + queue + " 收到消息: " + body);
        
        try {
            processMessage(queue, body);
            
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            channel.basicAck(deliveryTag, false);
            
        } catch (Exception e) {
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            channel.basicNack(deliveryTag, false, false);
        }
    }
    
    private void processMessage(String queue, String body) throws Exception {
        System.out.println("处理来自 " + queue + " 的消息: " + body);
    }
}

四、实际应用场景

4.1 高并发消费

java
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Component
public class HighConcurrencyConsumer {
    
    private final ExecutorService executorService = Executors.newFixedThreadPool(20);
    
    @RabbitListener(queues = "high.throughput.queue", 
                    concurrency = "10-50", 
                    containerFactory = "highConcurrencyFactory")
    public void handleHighThroughput(Message message) {
        executorService.submit(() -> {
            try {
                processMessage(message);
            } catch (Exception e) {
                System.err.println("处理失败: " + e.getMessage());
            }
        });
    }
    
    private void processMessage(Message message) {
        System.out.println("处理消息: " + new String(message.getBody()));
    }
}

4.2 顺序消费

java
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;

@Component
public class OrderedConsumer {
    
    private final ConcurrentMap<String, ReentrantLock> locks = new ConcurrentHashMap<>();
    
    @RabbitListener(queues = "ordered.queue", concurrency = "1")
    public void handleOrderedMessage(OrderEvent event) {
        String orderId = event.getOrderId();
        
        ReentrantLock lock = locks.computeIfAbsent(orderId, k -> new ReentrantLock());
        
        lock.lock();
        try {
            System.out.println("顺序处理订单事件: " + event);
            processOrderEvent(event);
        } finally {
            lock.unlock();
        }
    }
    
    private void processOrderEvent(OrderEvent event) {
        System.out.println("处理事件: " + event.getEventType());
    }
}

class OrderEvent {
    private String orderId;
    private String eventType;
    
    public String getOrderId() { return orderId; }
    public void setOrderId(String orderId) { this.orderId = orderId; }
    public String getEventType() { return eventType; }
    public void setEventType(String eventType) { this.eventType = eventType; }
}

五、常见问题与解决方案

5.1 消费者阻塞

问题描述: 消费者处理缓慢导致消息堆积。

解决方案

yaml
spring:
  rabbitmq:
    listener:
      simple:
        concurrency: 5
        max-concurrency: 20
        prefetch: 10

5.2 消息重复消费

问题描述: 消息被重复处理。

解决方案

java
@RabbitListener(queues = "order.queue")
public void handleOrder(Order order) {
    if (isProcessed(order.getOrderId())) {
        return;
    }
    
    processOrder(order);
    markAsProcessed(order.getOrderId());
}

5.3 队列不存在

问题描述: 监听的队列不存在导致启动失败。

解决方案

yaml
spring:
  rabbitmq:
    listener:
      simple:
        missing-queues-fatal: false

六、最佳实践建议

6.1 配置建议

text
配置建议:
├── 合理设置并发消费者数量
├── 配置适当的预取数量
├── 使用手动确认模式
├── 配置错误处理器
└── 启用重试机制

6.2 性能建议

text
性能建议:
├── 根据业务设置并发数
├── 控制预取数量避免堆积
├── 异步处理耗时操作
├── 使用批量确认
└── 监控消费速率

6.3 可靠性建议

text
可靠性建议:
├── 使用手动确认模式
├── 实现幂等性处理
├── 配置死信队列
├── 处理异常情况
└── 监控消费者状态

七、相关链接