Skip to content

注解驱动消费者

一、概述

Spring AMQP 提供了基于注解的消息消费方式,通过 @RabbitListener 注解可以简化消费者的开发,实现声明式的消息监听。

1.1 注解架构

┌─────────────────────────────────────────────────────────────────────┐
│                    注解驱动消费者架构                                 │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                    Spring Application                        │   │
│  │                                                              │   │
│  │  ┌─────────────────────────────────────────────────────┐   │   │
│  │  │              @RabbitListener 注解                     │   │   │
│  │  │  ├── queues: 监听队列                                │   │   │
│  │  │  ├── bindings: 绑定声明                              │   │   │
│  │  │  ├── containerFactory: 容器工厂                      │   │   │
│  │  │  ├── ackMode: 确认模式                               │   │   │
│  │  │  └── concurrency: 并发数                             │   │   │
│  │  └─────────────────────────────────────────────────────┘   │   │
│  │                          │                                  │   │
│  │                          ▼                                  │   │
│  │  ┌─────────────────────────────────────────────────────┐   │   │
│  │  │           RabbitListenerAnnotationBeanPostProcessor  │   │   │
│  │  │                    (注解处理器)                       │   │   │
│  │  └─────────────────────────────────────────────────────┘   │   │
│  │                          │                                  │   │
│  │                          ▼                                  │   │
│  │  ┌─────────────────────────────────────────────────────┐   │   │
│  │  │           SimpleMessageListenerContainer             │   │   │
│  │  │                    (监听容器)                         │   │   │
│  │  └─────────────────────────────────────────────────────┘   │   │
│  └──────────────────────────────────────────────────────────────┘   │
│                                 │                                   │
│                                 ▼                                   │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                        RabbitMQ                              │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

1.2 核心注解

text
┌─────────────────────────────────────────────────────────────────────┐
│                     核心注解说明                                     │
├───────────────────────┬─────────────────────────────────────────────┤
│         注解          │                    说明                      │
├───────────────────────┼─────────────────────────────────────────────┤
│ @RabbitListener       │ 声明消息监听方法                            │
│                       │ 可配置队列、绑定、容器工厂等                 │
├───────────────────────┼─────────────────────────────────────────────┤
│ @QueueBinding         │ 声明队列绑定                                │
│                       │ 包含队列、交换器、路由键                     │
├───────────────────────┼─────────────────────────────────────────────┤
│ @Queue                │ 声明队列                                    │
│                       │ 可配置队列属性                              │
├───────────────────────┼─────────────────────────────────────────────┤
│ @Exchange             │ 声明交换器                                  │
│                       │ 可配置交换器类型和属性                       │
├───────────────────────┼─────────────────────────────────────────────┤
│ @RabbitHandler        │ 多方法消息处理器                            │
│                       │ 根据消息类型路由到不同方法                   │
└───────────────────────┴─────────────────────────────────────────────┘

二、核心知识点

2.1 @RabbitListener 基础

2.1.1 监听单个队列

java
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class SimpleConsumer {
    
    @RabbitListener(queues = "order.queue")
    public void handleOrder(Order order) {
        System.out.println("收到订单: " + order.getOrderId());
    }
    
    @RabbitListener(queues = "payment.queue")
    public void handlePayment(String message) {
        System.out.println("收到支付消息: " + message);
    }
    
    @RabbitListener(queues = "notification.queue")
    public void handleNotification(byte[] message) {
        System.out.println("收到通知: " + new String(message));
    }
}

2.1.2 监听多个队列

java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MultiQueueConsumer {
    
    @RabbitListener(queues = {"order.queue", "order.priority.queue"})
    public void handleMultipleQueues(Message message) {
        String queue = message.getMessageProperties().getConsumerQueue();
        String body = new String(message.getBody());
        
        System.out.println("从队列 " + queue + " 收到消息: " + body);
    }
    
    @RabbitListener(queues = {"#{orderQueue.name}", "#{paymentQueue.name}"})
    public void handleWithSpEL(Message message) {
        System.out.println("使用 SpEL 引用队列: " + 
            new String(message.getBody()));
    }
}

2.2 声明式绑定

2.2.1 自动声明队列和绑定

java
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class AutoBindingConsumer {
    
    @RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "order.created.queue", durable = "true"),
        exchange = @Exchange(name = "order.exchange", type = "topic"),
        key = "order.created"
    ))
    public void handleOrderCreated(Order order) {
        System.out.println("处理订单创建: " + order.getOrderId());
    }
    
    @RabbitListener(bindings = @QueueBinding(
        value = @Queue(
            name = "payment.completed.queue",
            durable = "true",
            arguments = {
                @Argument(name = "x-message-ttl", value = "86400000"),
                @Argument(name = "x-dead-letter-exchange", value = "dlx.exchange")
            }
        ),
        exchange = @Exchange(name = "payment.exchange", type = "direct"),
        key = "payment.completed"
    ))
    public void handlePaymentCompleted(Payment payment) {
        System.out.println("处理支付完成: " + payment.getPaymentId());
    }
    
    @RabbitListener(bindings = {
        @QueueBinding(
            value = @Queue(name = "log.all.queue"),
            exchange = @Exchange(name = "log.exchange", type = "topic"),
            key = "#"
        )
    })
    public void handleAllLogs(LogEntry log) {
        System.out.println("处理日志: " + log);
    }
}

2.2.2 多绑定配置

java
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MultiBindingConsumer {
    
    @RabbitListener(bindings = {
        @QueueBinding(
            value = @Queue(name = "order.created.queue"),
            exchange = @Exchange(name = "order.exchange"),
            key = "order.created"
        ),
        @QueueBinding(
            value = @Queue(name = "order.updated.queue"),
            exchange = @Exchange(name = "order.exchange"),
            key = "order.updated"
        )
    })
    public void handleOrderEvents(OrderEvent event) {
        System.out.println("处理订单事件: " + event.getEventType());
    }
}

2.3 手动确认模式

java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
import java.io.IOException;

@Component
public class ManualAckConsumer {
    
    @RabbitListener(queues = "order.queue", ackMode = "MANUAL")
    public void handleOrderWithManualAck(Message message, Channel channel) 
            throws IOException {
        
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        
        try {
            String body = new String(message.getBody());
            System.out.println("处理消息: " + body);
            
            processMessage(body);
            
            channel.basicAck(deliveryTag, false);
            System.out.println("消息已确认");
            
        } catch (RetryableException e) {
            System.err.println("可重试异常: " + e.getMessage());
            channel.basicNack(deliveryTag, false, true);
            
        } catch (Exception e) {
            System.err.println("处理失败: " + e.getMessage());
            channel.basicNack(deliveryTag, false, false);
        }
    }
    
    private void processMessage(String message) throws Exception {
        System.out.println("处理: " + message);
    }
}

class RetryableException extends Exception {
    public RetryableException(String message) {
        super(message);
    }
}

2.4 @RabbitHandler 多方法处理

java
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "multi.type.queue")
public class MultiTypeConsumer {
    
    @RabbitHandler
    public void handleOrder(Order order) {
        System.out.println("处理订单: " + order.getOrderId());
    }
    
    @RabbitHandler
    public void handlePayment(Payment payment) {
        System.out.println("处理支付: " + payment.getPaymentId());
    }
    
    @RabbitHandler
    public void handleUser(User user) {
        System.out.println("处理用户: " + user.getUserId());
    }
    
    @RabbitHandler(isDefault = true)
    public void handleDefault(Object message) {
        System.out.println("处理默认消息: " + message);
    }
}

2.5 并发配置

java
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class ConcurrencyConsumer {
    
    @RabbitListener(queues = "high.throughput.queue", concurrency = "10-50")
    public void handleHighThroughput(Message message) {
        System.out.println("高并发处理: " + new String(message.getBody()) +
            ", 线程: " + Thread.currentThread().getName());
    }
    
    @RabbitListener(
        queues = "ordered.queue",
        concurrency = "1",
        containerFactory = "singleThreadFactory"
    )
    public void handleOrdered(Message message) {
        System.out.println("顺序处理: " + new String(message.getBody()));
    }
    
    @RabbitListener(
        queues = "custom.concurrency.queue",
        concurrency = "#{@concurrencyConfig.min}-#{@concurrencyConfig.max}"
    )
    public void handleWithSpEL(Message message) {
        System.out.println("SpEL 配置并发: " + new String(message.getBody()));
    }
}

2.6 异常处理

java
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException;
import org.springframework.stereotype.Component;

@Component
public class ExceptionHandlingConsumer {
    
    @RabbitListener(queues = "error.handling.queue")
    public void handleWithExceptionHandling(Message message) {
        try {
            processMessage(message);
        } catch (BusinessException e) {
            System.err.println("业务异常,不重试: " + e.getMessage());
            throw new AmqpRejectAndDontRequeueException(e);
        } catch (TemporaryException e) {
            System.err.println("临时异常,重试: " + e.getMessage());
            throw new RuntimeException(e);
        }
    }
    
    private void processMessage(Message message) throws BusinessException, TemporaryException {
        String body = new String(message.getBody());
        
        if (body.contains("business_error")) {
            throw new BusinessException("业务错误");
        }
        if (body.contains("temporary_error")) {
            throw new TemporaryException("临时错误");
        }
        
        System.out.println("处理成功: " + body);
    }
}

class BusinessException extends Exception {
    public BusinessException(String message) { super(message); }
}

class TemporaryException extends Exception {
    public TemporaryException(String message) { super(message); }
}

三、代码示例

3.1 完整注解消费者

java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
import java.io.IOException;

@Component
public class CompleteAnnotationConsumer {
    
    @RabbitListener(
        bindings = @QueueBinding(
            value = @Queue(
                name = "order.events.queue",
                durable = "true",
                arguments = {
                    @Argument(name = "x-message-ttl", value = "86400000"),
                    @Argument(name = "x-dead-letter-exchange", value = "dlx.exchange"),
                    @Argument(name = "x-dead-letter-routing-key", value = "dead.letter")
                }
            ),
            exchange = @Exchange(
                name = "order.events.exchange",
                type = "topic",
                durable = "true"
            ),
            key = "order.#"
        ),
        containerFactory = "orderListenerFactory",
        ackMode = "MANUAL",
        concurrency = "3-10",
        id = "orderEventListener",
        autoStartup = "true"
    )
    public void handleOrderEvent(Message message, Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        String routingKey = message.getMessageProperties().getReceivedRoutingKey();
        
        try {
            String body = new String(message.getBody());
            System.out.println("收到订单事件 [" + routingKey + "]: " + body);
            
            switch (routingKey) {
                case "order.created":
                    handleOrderCreated(body);
                    break;
                case "order.paid":
                    handleOrderPaid(body);
                    break;
                case "order.shipped":
                    handleOrderShipped(body);
                    break;
                case "order.cancelled":
                    handleOrderCancelled(body);
                    break;
                default:
                    System.out.println("未知事件类型: " + routingKey);
            }
            
            channel.basicAck(deliveryTag, false);
            
        } catch (Exception e) {
            System.err.println("处理失败: " + e.getMessage());
            channel.basicNack(deliveryTag, false, false);
        }
    }
    
    private void handleOrderCreated(String body) {
        System.out.println("处理订单创建: " + body);
    }
    
    private void handleOrderPaid(String body) {
        System.out.println("处理订单支付: " + body);
    }
    
    private void handleOrderShipped(String body) {
        System.out.println("处理订单发货: " + body);
    }
    
    private void handleOrderCancelled(String body) {
        System.out.println("处理订单取消: " + body);
    }
}

3.2 动态队列监听

java
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import java.util.List;

@Service
public class DynamicQueueConsumer {
    
    @RabbitListener(queues = "#{queueManager.getQueueNames()}")
    public void handleDynamicQueues(Message message) {
        String queue = message.getMessageProperties().getConsumerQueue();
        System.out.println("从动态队列 " + queue + " 收到消息");
    }
}

@Component
class QueueManager {
    
    public List<String> getQueueNames() {
        return List.of("dynamic.queue.1", "dynamic.queue.2", "dynamic.queue.3");
    }
}

3.3 消费者生命周期管理

java
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.stereotype.Service;
import java.util.Set;

@Service
public class ListenerManagementService {
    
    private final RabbitListenerEndpointRegistry registry;
    
    public ListenerManagementService(RabbitListenerEndpointRegistry registry) {
        this.registry = registry;
    }
    
    public void startListener(String id) {
        var container = registry.getListenerContainer(id);
        if (container != null && !container.isRunning()) {
            container.start();
            System.out.println("监听器已启动: " + id);
        }
    }
    
    public void stopListener(String id) {
        var container = registry.getListenerContainer(id);
        if (container != null && container.isRunning()) {
            container.stop();
            System.out.println("监听器已停止: " + id);
        }
    }
    
    public Set<String> getListenerIds() {
        return registry.getListenerContainerIds();
    }
    
    public ListenerStatus getListenerStatus(String id) {
        var container = registry.getListenerContainer(id);
        if (container == null) {
            return null;
        }
        
        return new ListenerStatus(
            id,
            container.isRunning(),
            container.isAutoStartup()
        );
    }
}

record ListenerStatus(String id, boolean running, boolean autoStartup) {}

四、实际应用场景

4.1 订单事件处理

java
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class OrderEventConsumer {
    
    @RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "inventory.order.queue"),
        exchange = @Exchange(name = "order.exchange", type = "topic"),
        key = "order.created"
    ))
    public void handleOrderCreatedForInventory(OrderEvent event) {
        System.out.println("库存服务处理订单创建: " + event.getOrderId());
        deductInventory(event);
    }
    
    @RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "notification.order.queue"),
        exchange = @Exchange(name = "order.exchange", type = "topic"),
        key = "order.#"
    ))
    public void handleOrderEventForNotification(OrderEvent event) {
        System.out.println("通知服务处理订单事件: " + event.getEventType());
        sendNotification(event);
    }
    
    @RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "analytics.order.queue"),
        exchange = @Exchange(name = "order.exchange", type = "topic"),
        key = "order.#"
    ))
    public void handleOrderEventForAnalytics(OrderEvent event) {
        System.out.println("分析服务处理订单事件: " + event.getEventType());
        recordAnalytics(event);
    }
    
    private void deductInventory(OrderEvent event) {
        System.out.println("扣减库存");
    }
    
    private void sendNotification(OrderEvent event) {
        System.out.println("发送通知");
    }
    
    private void recordAnalytics(OrderEvent event) {
        System.out.println("记录分析数据");
    }
}

4.2 日志收集

java
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class LogCollectionConsumer {
    
    @RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "log.error.queue"),
        exchange = @Exchange(name = "log.exchange", type = "topic"),
        key = "*.error"
    ))
    public void handleErrorLogs(LogEntry log) {
        System.err.println("[ERROR] " + log.getService() + ": " + log.getMessage());
        sendAlert(log);
    }
    
    @RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "log.all.queue"),
        exchange = @Exchange(name = "log.exchange", type = "topic"),
        key = "#"
    ))
    public void handleAllLogs(LogEntry log) {
        System.out.println("[" + log.getLevel() + "] " + 
            log.getService() + ": " + log.getMessage());
        storeLog(log);
    }
    
    private void sendAlert(LogEntry log) {
        System.out.println("发送告警");
    }
    
    private void storeLog(LogEntry log) {
        System.out.println("存储日志");
    }
}

class LogEntry {
    private String service;
    private String level;
    private String message;
    
    public String getService() { return service; }
    public void setService(String service) { this.service = service; }
    public String getLevel() { return level; }
    public void setLevel(String level) { this.level = level; }
    public String getMessage() { return message; }
    public void setMessage(String message) { this.message = message; }
}

五、常见问题与解决方案

5.1 队列不存在

问题描述: 监听的队列不存在导致启动失败。

解决方案

java
@RabbitListener(queues = "maybe.not.exist.queue", 
                missingQueuesFatal = "false")
public void handle(Message message) {
    System.out.println("收到消息");
}

5.2 消息类型不匹配

问题描述: 消息类型与方法参数类型不匹配。

解决方案

java
@RabbitHandler
public void handleOrder(Order order) {
    System.out.println("处理订单");
}

@RabbitHandler(isDefault = true)
public void handleUnknown(Object message) {
    System.out.println("未知类型: " + message.getClass());
}

5.3 消费者启动顺序

问题描述: 需要控制消费者启动顺序。

解决方案

java
@RabbitListener(queues = "first.queue", autoStartup = "true")
public void handleFirst(Message message) {
    System.out.println("第一个消费者");
}

@RabbitListener(queues = "second.queue", autoStartup = "false")
public void handleSecond(Message message) {
    System.out.println("第二个消费者");
}

六、最佳实践建议

6.1 注解使用建议

text
使用建议:
├── 使用声明式绑定简化配置
├── 合理设置并发数
├── 使用手动确认模式
├── 处理异常情况
└── 为消费者设置 ID

6.2 性能建议

text
性能建议:
├── 根据业务设置并发数
├── 使用合适的容器工厂
├── 批量处理消息
├── 异步处理耗时操作
└── 控制预取数量

6.3 可靠性建议

text
可靠性建议:
├── 使用手动确认模式
├── 实现幂等性处理
├── 配置死信队列
├── 处理异常情况
└── 监控消费者状态

七、相关链接