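Java Consumer Implementation

1. Overview

A consumer is the application-side component that receives and processes messages from RabbitMQ. This document describes how to implement message consumers with the RabbitMQ Java client, covering subscription-based consumption, message acknowledgement, and consumption modes.

1.1 Consumer Architecture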

┌─────────────────────────────────────────────────────────────────────┐
│                        Consumer Architecture                        │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌─────────────┐     ┌─────────────┐     ┌─────────────┐            │
│  │    Queue    │ ──► │   Channel   │ ──► │  Consumer   │            │
│  └─────────────┘     └─────────────┘     └──────┬──────┘            │
│                                                 │                   │
│                                                 ▼                   │
│                                          ┌─────────────┐            │
│                                          │ Application │            │
│                                          │ (processes) │            │
│                                          └─────────────┘            │
│                                                                     │
│  Consumption modes:                                                 │
│  ├── Push: the broker pushes messages to the consumer               │
│  └── Pull: the client fetches messages on demand                    │
│                                                                     │
│  Acknowledgement:                                                   │
│  ├── Automatic: autoAck = true                                      │
│  └── Manual: autoAck = false                                        │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

1.2 Consumption Modes Compared

text
┌─────────────────────────────────────────────────────────────────────┐
│                     Consumption Modes Compared                      │
├───────────────┬─────────────────────────────────────────────────────┤
│     Mode      │                     Description                     │
├───────────────┼─────────────────────────────────────────────────────┤
│  Push         │ The broker pushes messages to the consumer          │
│ (basicConsume)│ Low latency; the recommended approach               │
│               │ Requires registering a callback                     │
├───────────────┼─────────────────────────────────────────────────────┤
│  Pull         │ The client fetches messages from the queue          │
│  (basicGet)   │ Suited to low-frequency consumption                 │
│               │ Retrieves one message per call                      │
└───────────────┴─────────────────────────────────────────────────────┘

2. Core Concepts

2.1 Push Mode Consumption

2.1.1 Basic Consumer

java
import com.rabbitmq.client.*;
import java.nio.charset.StandardCharsets;

public class BasicConsumer {
    
    private final Channel channel;
    private String consumerTag;
    
    public BasicConsumer(Channel channel) {
        this.channel = channel;
    }
    
    public void startConsuming(String queueName) throws Exception {
        // Invoked by the client library for every message the broker pushes
        DeliverCallback deliverCallback = (tag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            
            System.out.println("Received message: " + message);
            System.out.println("Exchange: " + delivery.getEnvelope().getExchange());
            System.out.println("Routing key: " + delivery.getEnvelope().getRoutingKey());
            System.out.println("Delivery tag: " + delivery.getEnvelope().getDeliveryTag());
            
            // Acknowledge this single delivery (multiple = false)
            long deliveryTag = delivery.getEnvelope().getDeliveryTag();
            channel.basicAck(deliveryTag, false);
        };
        
        // Invoked when the broker cancels the consumer, e.g. because the queue was deleted
        CancelCallback cancelCallback = tag -> {
            System.out.println("Consumer cancelled: " + tag);
        };
        
        // Manual acknowledgement: the broker keeps the message until basicAck is sent
        boolean autoAck = false;
        consumerTag = channel.basicConsume(queueName, autoAck, deliverCallback, cancelCallback);
        
        System.out.println("Consumer started, tag: " + consumerTag);
    }
    
    public void stopConsuming() throws Exception {
        if (consumerTag != null) {
            channel.basicCancel(consumerTag);
            System.out.println("Consumer stopped");
        }
    }
}

2.1.2 Full-Featured Consumer

java
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Map;

public class CompleteConsumer {
    
    private final Channel channel;
    private String consumerTag;
    
    public CompleteConsumer(Channel channel) {
        this.channel = channel;
    }
    
    public void startCompleteConsumer(String queueName) throws Exception {
        DeliverCallback deliverCallback = (tag, delivery) -> {
            long deliveryTag = delivery.getEnvelope().getDeliveryTag();
            try {
                String message = new String(delivery.getBody(), "UTF-8");
                
                AMQP.BasicProperties props = delivery.getProperties();
                Map<String, Object> headers = props.getHeaders();
                
                System.out.println("=== Message received ===");
                System.out.println("Message ID: " + props.getMessageId());
                System.out.println("Message type: " + props.getType());
                System.out.println("Timestamp: " + props.getTimestamp());
                System.out.println("Content type: " + props.getContentType());
                System.out.println("Body: " + message);
                
                if (headers != null) {
                    System.out.println("Headers: " + headers);
                }
                
                processMessage(message);
                
                channel.basicAck(deliveryTag, false);
                
                System.out.println("Message processed and acknowledged");
                
            } catch (Exception e) {
                System.err.println("Message processing failed: " + e.getMessage());
                
                // Requeue only failures that may succeed on a later attempt
                boolean requeue = shouldRequeue(e);
                channel.basicNack(deliveryTag, false, requeue);
            }
        };
        
        // Invoked when the broker cancels the consumer (for example, the queue was deleted)
        CancelCallback cancelCallback = tag -> {
            System.out.println("Consumer cancelled: " + tag);
        };
        
        // Invoked when the channel or connection shuts down
        ConsumerShutdownSignalCallback shutdownCallback = (tag, sig) -> {
            System.out.println("Consumer shut down: " + sig.getMessage());
        };
        
        // Deliver at most 10 unacknowledged messages to this consumer at a time
        channel.basicQos(10);
        
        boolean autoAck = false;
        consumerTag = channel.basicConsume(
            queueName, 
            autoAck, 
            "my-consumer-tag",
            false,   // noLocal (not supported by the RabbitMQ server)
            false,   // exclusive
            null,    // arguments
            deliverCallback, 
            cancelCallback,
            shutdownCallback
        );
    }
    
    private void processMessage(String message) throws Exception {
        // Simulated work
        Thread.sleep(100);
        System.out.println("Processing message: " + message);
    }
    
    private boolean shouldRequeue(Exception e) {
        return !(e instanceof BusinessException);
    }
    
    public void stop() throws Exception {
        if (consumerTag != null) {
            channel.basicCancel(consumerTag);
        }
    }
}

class BusinessException extends Exception {
    public BusinessException(String message) {
        super(message);
    }
}
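
The catch block in `CompleteConsumer` picks between requeueing and discarding from the exception type alone, so a message that keeps failing with a retryable error can circulate indefinitely. A minimal sketch of a slightly richer decision that also looks at the redelivered flag follows. `Settlement` and `SettlementPolicy` are hypothetical names that are not part of the RabbitMQ client, and the code is plain Java so it runs without a broker.

```java
import java.util.List;

/** Ways a delivery can be settled; the names are illustrative. */
enum Settlement { ACK, REQUEUE, DISCARD }

/** Hypothetical helper, not part of the RabbitMQ client library. */
final class SettlementPolicy {
    private final List<Class<?>> nonRetryable;

    SettlementPolicy(Class<?>... nonRetryable) {
        this.nonRetryable = List.of(nonRetryable);
    }

    /**
     * @param error       exception thrown by the handler, or null on success
     * @param redelivered redelivered flag of the delivery (Envelope.isRedeliver())
     */
    Settlement decide(Exception error, boolean redelivered) {
        if (error == null) {
            return Settlement.ACK;          // would map to basicAck
        }
        for (Class<?> type : nonRetryable) {
            if (type.isInstance(error)) {
                return Settlement.DISCARD;  // would map to basicNack with requeue = false
            }
        }
        // Allow one requeue; a message that already came back once is discarded
        return redelivered ? Settlement.DISCARD : Settlement.REQUEUE;
    }
}
```

In a consumer, the result would select between `basicAck`, `basicNack(tag, false, true)` and `basicNack(tag, false, false)`. Allowing exactly one requeue is an arbitrary choice made for this sketch.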

2.2 Pull Mode Consumption

java
import com.rabbitmq.client.*;

public class PullConsumer {
    
    private final Channel channel;
    
    public PullConsumer(Channel channel) {
        this.channel = channel;
    }
    
    public void consumeWithGet(String queueName) throws Exception {
        while (true) {
            // autoAck = false: the message must be acknowledged explicitly below
            GetResponse response = channel.basicGet(queueName, false);
            
            // basicGet returns null when the queue has no ready messages
            if (response == null) {
                System.out.println("Queue is empty, waiting...");
                Thread.sleep(1000);
                continue;
            }
            
            String message = new String(response.getBody(), "UTF-8");
            
            System.out.println("Fetched message: " + message);
            System.out.println("Messages remaining in queue: " + response.getMessageCount());
            
            long deliveryTag = response.getEnvelope().getDeliveryTag();
            try {
                processMessage(message);
                
                channel.basicAck(deliveryTag, false);
                
                System.out.println("Message acknowledged");
                
            } catch (Exception e) {
                System.err.println("Processing failed: " + e.getMessage());
                
                // requeue = true returns the message to the queue for another attempt
                channel.basicNack(deliveryTag, false, true);
            }
        }
    }
    
    private void processMessage(String message) throws Exception {
        System.out.println("Processing: " + message);
    }
}
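
`PullConsumer` sleeps for a fixed second whenever the queue is empty. One common refinement is to lengthen the pause while the queue stays empty and reset it as soon as a message arrives. The sketch below shows only that delay calculation; `PollBackoff` is a hypothetical helper, not a RabbitMQ API, and the doubling factor is an assumption.

```java
/** Hypothetical helper that computes how long to wait between empty polls. */
final class PollBackoff {
    private final long initialMillis;
    private final long maxMillis;
    private long currentMillis;

    PollBackoff(long initialMillis, long maxMillis) {
        if (initialMillis <= 0 || maxMillis < initialMillis) {
            throw new IllegalArgumentException("need 0 < initialMillis <= maxMillis");
        }
        this.initialMillis = initialMillis;
        this.maxMillis = maxMillis;
        this.currentMillis = initialMillis;
    }

    /** Call when basicGet returned null; returns the pause to apply now. */
    long onEmpty() {
        long delay = currentMillis;
        // Double the next pause, capped at maxMillis (written to avoid overflow)
        currentMillis = currentMillis > maxMillis / 2 ? maxMillis : currentMillis * 2;
        return delay;
    }

    /** Call when a message was fetched; the next empty poll starts short again. */
    void onMessage() {
        currentMillis = initialMillis;
    }
}
```

In the polling loop, `Thread.sleep(1000)` would become `Thread.sleep(backoff.onEmpty())`, with `backoff.onMessage()` called after each successful `basicGet`.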

2.3 Message Acknowledgement

2.3.1 Acknowledgement Modes

java
import com.rabbitmq.client.*;

public class AckModes {
    
    private final Channel channel;
    
    public AckModes(Channel channel) {
        this.channel = channel;
    }
    
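    public void autoAck(String queueName) throws Exception {
        DeliverCallback callback = (tag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("Received message (auto-ack): " + message);
        };
        
        // autoAck = true: the broker treats the message as settled once it is sent
        channel.basicConsume(queueName, true, callback, tag -> {});
    }
    
    public void manualAck(String queueName) throws Exception {
        DeliverCallback callback = (tag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("Received message: " + message);
            
            long deliveryTag = delivery.getEnvelope().getDeliveryTag();
            channel.basicAck(deliveryTag, false);
        };
        
        // autoAck = false: the message stays unacknowledged until basicAck is called
        channel.basicConsume(queueName, false, callback, tag -> {});
    }
    
    // Reference list of the settlement calls. Each delivery must be settled exactly
    // once, so real code picks ONE of these per delivery tag; settling the same tag
    // twice makes the broker close the channel with PRECONDITION_FAILED.
    public void demonstrateAckTypes(long deliveryTag) throws Exception {
        // Acknowledge a single message
        channel.basicAck(deliveryTag, false);
        
        // Acknowledge this message and all earlier unacknowledged ones on the channel
        channel.basicAck(deliveryTag, true);
        
        // Negatively acknowledge a single message and discard it (or dead-letter it)
        channel.basicNack(deliveryTag, false, false);
        
        // Negatively acknowledge this and all earlier messages, returning them to the queue
        channel.basicNack(deliveryTag, true, true);
        
        // Reject a single message and discard it (or dead-letter it)
        channel.basicReject(deliveryTag, false);
        
        // Reject a single message and return it to the queue
        channel.basicReject(deliveryTag, true);
    }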
}

2.3.2 Acknowledgement Methods

text
┌─────────────────────────────────────────────────────────────────────┐
│                       Acknowledgement Methods                       │
├───────────────┬─────────────────────────────────────────────────────┤
│    Method     │                     Description                     │
├───────────────┼─────────────────────────────────────────────────────┤
│  basicAck     │ Acknowledges successful processing                  │
│               │ multiple=false: acks a single message               │
│               │ multiple=true: acks this tag and all earlier ones   │
├───────────────┼─────────────────────────────────────────────────────┤
│  basicNack    │ Negatively acknowledges (RabbitMQ extension)        │
│               │ multiple: whether to apply to a batch               │
│               │ requeue: whether to put the message back            │
├───────────────┼─────────────────────────────────────────────────────┤
│  basicReject  │ Rejects a single message (AMQP 0-9-1 method)        │
│               │ requeue: whether to put the message back            │
│               │ Same as basicNack with multiple=false               │
├───────────────┼─────────────────────────────────────────────────────┤
│  basicRecover │ Asks the broker to redeliver unacked messages       │
│               │ requeue=true: requeue, then redeliver               │
│               │ requeue=false: redeliver to the same consumer       │
└───────────────┴─────────────────────────────────────────────────────┘
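
The effect of the `multiple` flag is easiest to see on a small model of the unacknowledged delivery tags a channel tracks. The following is a simulation in plain Java, not broker or client code; `UnackedTags` is a hypothetical name. One deliberate difference: a real broker closes the channel when an unknown tag is acknowledged, whereas this model simply reports that nothing was settled.

```java
import java.util.NavigableSet;
import java.util.TreeSet;

/** Toy model of the unacknowledged delivery tags on one channel. */
final class UnackedTags {
    private final TreeSet<Long> unacked = new TreeSet<>();

    /** Records that a message with this tag was delivered. */
    void delivered(long deliveryTag) {
        unacked.add(deliveryTag);
    }

    /** Mirrors basicAck(deliveryTag, multiple); returns how many deliveries were settled. */
    int ack(long deliveryTag, boolean multiple) {
        if (!multiple) {
            return unacked.remove(deliveryTag) ? 1 : 0;
        }
        // multiple = true settles this tag and every lower outstanding tag
        NavigableSet<Long> upToTag = unacked.headSet(deliveryTag, true);
        int settled = upToTag.size();
        upToTag.clear();
        return settled;
    }

    int outstanding() {
        return unacked.size();
    }
}
```

With tags 1 to 5 outstanding, `ack(3, true)` settles three deliveries and leaves tags 4 and 5 unacknowledged.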

2.4 QoS Settings

java
import com.rabbitmq.client.*;

public class QosDemo {
    
    private final Channel channel;
    
    public QosDemo(Channel channel) {
        this.channel = channel;
    }
    
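    public void setQos() throws Exception {
        // At most 10 unacknowledged messages per consumer
        channel.basicQos(10);
        
        // global = true: in RabbitMQ the limit of 20 is shared by all consumers on this channel
        channel.basicQos(20, true);
        
        // Full form: prefetchSize, prefetchCount, global.
        // prefetchSize stays 0 (no byte limit) because RabbitMQ does not implement it.
        channel.basicQos(0, 10, false);
        
        System.out.println("QoS configured");
    }
    
    public void demonstrateQosEffect(String queueName) throws Exception {
        // The broker stops pushing once 5 messages are unacknowledged
        channel.basicQos(5);
        
        int[] processingCount = {0};
        
        DeliverCallback callback = (tag, delivery) -> {
            processingCount[0]++;
            System.out.println("Messages being processed: " + processingCount[0]);
            
            // Simulate slow processing so the prefetch window fills up
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            
            // Acknowledging frees one slot in the prefetch window
            long deliveryTag = delivery.getEnvelope().getDeliveryTag();
            channel.basicAck(deliveryTag, false);
            
            processingCount[0]--;
        };
        
        channel.basicConsume(queueName, false, callback, tag -> {});
    }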
}
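
The prefetch count can be thought of as a window of unacknowledged messages: the broker pushes while the window has room and pauses until an acknowledgement frees a slot. The sketch below models that bookkeeping in plain Java under that assumption. `PrefetchWindow` is a hypothetical name and the class only imitates the behaviour; it does not talk to a broker.

```java
/** Toy model of a per-consumer prefetch window. */
final class PrefetchWindow {
    private final int prefetchCount;
    private int unacked;

    /** @param prefetchCount maximum unacknowledged messages; 0 means no limit */
    PrefetchWindow(int prefetchCount) {
        if (prefetchCount < 0) {
            throw new IllegalArgumentException("prefetchCount must be >= 0");
        }
        this.prefetchCount = prefetchCount;
    }

    /** Broker side: returns true if another message may be pushed right now. */
    boolean tryDeliver() {
        if (prefetchCount != 0 && unacked >= prefetchCount) {
            return false; // window is full until the consumer acknowledges
        }
        unacked++;
        return true;
    }

    /** Consumer side: settling a message frees one slot. */
    void ack() {
        if (unacked == 0) {
            throw new IllegalStateException("nothing to acknowledge");
        }
        unacked--;
    }

    int unacked() {
        return unacked;
    }
}
```

With a window of 2, the third `tryDeliver()` is refused until `ack()` is called once.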

2.5 Consumer Tags

java
import com.rabbitmq.client.*;

public class ConsumerTagDemo {
    
    private final Channel channel;
    
    public ConsumerTagDemo(Channel channel) {
        this.channel = channel;
    }
    
    public void useCustomTag(String queueName) throws Exception {
        // A consumer tag identifies the consumer on its channel
        String customTag = "consumer-" + System.currentTimeMillis();
        
        DeliverCallback callback = (tag, delivery) -> {
            System.out.println("Consumer tag: " + tag);
            System.out.println("Message: " + new String(delivery.getBody(), "UTF-8"));
            
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        
        // basicConsume returns the tag in use (the custom one when it is supplied)
        String returnedTag = channel.basicConsume(
            queueName, 
            false, 
            customTag,
            callback, 
            tag -> System.out.println("Cancelled: " + tag)
        );
        
        System.out.println("Returned tag: " + returnedTag);
    }
    
    public void cancelConsumer(String consumerTag) throws Exception {
        // The tag is the handle used to cancel the consumer later
        channel.basicCancel(consumerTag);
        System.out.println("Consumer cancelled: " + consumerTag);
    }
}
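
`System.currentTimeMillis()` can produce the same tag twice when two consumers are registered within the same millisecond, and a tag has to be unique on its channel. A small sketch of an alternative that appends a random UUID is shown here; `ConsumerTags` is a hypothetical helper, and passing an empty tag so that the broker generates one is another option.

```java
import java.util.UUID;

/** Hypothetical helper for building readable, collision-resistant consumer tags. */
final class ConsumerTags {
    private ConsumerTags() {
    }

    /** Returns prefix + "-" + a random UUID, for example "orders-3f2b...". */
    static String unique(String prefix) {
        return prefix + "-" + UUID.randomUUID();
    }
}
```

The result would be passed as the `consumerTag` argument of `basicConsume`.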

2.6 Consumer Cancellation Notifications

java
import com.rabbitmq.client.*;

public class ConsumerCancelDemo {
    
    public void setupCancelNotification(Channel channel, String queueName) 
            throws Exception {
        
        DeliverCallback deliverCallback = (tag, delivery) -> {
            System.out.println("Received message: " + new String(delivery.getBody(), "UTF-8"));
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        
        // Called when the broker cancels the consumer, not when the application
        // itself calls basicCancel
        CancelCallback cancelCallback = tag -> {
            System.out.println("Consumer cancelled, tag: " + tag);
            System.out.println("Possible cause: the queue was deleted");
        };
        
        String consumerTag = channel.basicConsume(
            queueName, 
            false, 
            deliverCallback, 
            cancelCallback
        );
        System.out.println("Consumer registered, tag: " + consumerTag);
    }
}

3. Code Examples

3.1 Complete Consumer Example

java
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class CompleteConsumerDemo {
    
    private final Connection connection;
    private final ExecutorService executorService;
    private final AtomicInteger processedCount = new AtomicInteger(0);
    private volatile boolean running = true;
    
    public CompleteConsumerDemo(String host, int port, String username, 
                                String password, String vhost) throws Exception {
        
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);
        factory.setVirtualHost(vhost);
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        
        executorService = Executors.newFixedThreadPool(10);
        factory.setSharedExecutor(executorService);
        
        connection = factory.newConnection("consumer-app");
    }
    
    public void startConsumer(String queueName, int prefetchCount) throws Exception {
        Channel channel = connection.createChannel();
        
        channel.basicQos(prefetchCount);
        
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            // Hand the work to the pool so the consumer dispatch thread is freed quickly
            executorService.submit(() -> {
                long deliveryTag = delivery.getEnvelope().getDeliveryTag();
                try {
                    processMessage(delivery);
                    
                    // multiple = false keeps acks sent from pool threads independent
                    channel.basicAck(deliveryTag, false);
                    
                    int count = processedCount.incrementAndGet();
                    if (count % 100 == 0) {
                        System.out.println("Messages processed: " + count);
                    }
                    
                } catch (Exception e) {
                    System.err.println("Processing failed: " + e.getMessage());
                    
                    try {
                        channel.basicNack(deliveryTag, false, shouldRequeue(e));
                    } catch (IOException ex) {
                        ex.printStackTrace();
                    }
                }
            });
        };
        
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("Consumer cancelled: " + consumerTag);
        };
        
        ConsumerShutdownSignalCallback shutdownCallback = (consumerTag, sig) -> {
            System.err.println("Consumer shut down: " + sig.getMessage());
            if (!sig.isInitiatedByApplication()) {
                System.err.println("Abnormal shutdown, attempting recovery...");
            }
        };
        
        channel.basicConsume(
            queueName, 
            false, 
            "consumer-" + System.currentTimeMillis(),
            deliverCallback, 
            cancelCallback,
            shutdownCallback
        );
        
        System.out.println("Consumer started, queue: " + queueName);
    }
    
    private void processMessage(Delivery delivery) throws Exception {
        String body = new String(delivery.getBody(), "UTF-8");
        
        AMQP.BasicProperties props = delivery.getProperties();
        String messageId = props.getMessageId();
        String messageType = props.getType();
        Map<String, Object> headers = props.getHeaders();
        
        System.out.println("Processing message:");
        System.out.println("  ID: " + messageId);
        System.out.println("  Type: " + messageType);
        System.out.println("  Body: " + body);
        
        // Dispatch on the optional type property; null falls through to the default handler
        switch (messageType != null ? messageType : "default") {
            case "order.created":
                processOrderCreated(body);
                break;
            case "order.cancelled":
                processOrderCancelled(body);
                break;
            default:
                processDefault(body);
        }
    }
    
    private void processOrderCreated(String body) {
        System.out.println("Handling order created: " + body);
    }
    
    private void processOrderCancelled(String body) {
        System.out.println("Handling order cancelled: " + body);
    }
    
    private void processDefault(String body) {
        System.out.println("Handling default message: " + body);
    }
    
    private boolean shouldRequeue(Exception e) {
        // Only failures marked as retryable go back to the queue
        return e instanceof RetryableException;
    }
    
    public void shutdown() {
        running = false;
        
        // Once the connection is closed, tasks still in the pool can no longer ack;
        // their messages stay unacknowledged and the broker redelivers them later
        try {
            if (connection != null && connection.isOpen()) {
                connection.close(5000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
        
        System.out.println("Consumer shut down");
    }
    
    public static void main(String[] args) throws Exception {
        CompleteConsumerDemo consumer = new CompleteConsumerDemo(
            "localhost", 5672, "guest", "guest", "/"
        );
        
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            consumer.shutdown();
        }));
        
        consumer.startConsumer("orders.queue", 10);
        
        Thread.sleep(Long.MAX_VALUE);
    }
}

class RetryableException extends Exception {
    public RetryableException(String message) {
        super(message);
    }
}

3.2 Multi-Threaded Consumer

java
import com.rabbitmq.client.*;
import java.util.concurrent.*;

public class MultiThreadConsumer {
    
    private final Connection connection;
    private final ExecutorService executorService;
    
    public MultiThreadConsumer(Connection connection, int threadCount) {
        this.connection = connection;
        this.executorService = Executors.newFixedThreadPool(threadCount);
    }
    
    public void startConsumers(String queueName, int consumerCount, int prefetchCount) 
            throws Exception {
        
        for (int i = 0; i < consumerCount; i++) {
            final int consumerIndex = i;
            
            executorService.submit(() -> {
                try {
                    Channel channel = connection.createChannel();
                    channel.basicQos(prefetchCount);
                    
                    String consumerTag = "consumer-" + consumerIndex + "-" + 
                        Thread.currentThread().getId();
                    
                    DeliverCallback callback = (tag, delivery) -> {
                        String message = new String(delivery.getBody(), "UTF-8");
                        System.out.println("[Consumer-" + consumerIndex + 
                            "] Processing message: " + message);
                        
                        // Simulated work
                        try {
                            Thread.sleep(100);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                        
                        channel.basicAck(
                            delivery.getEnvelope().getDeliveryTag(), 
                            false
                        );
                    };
                    
                    // Each consumer has its own channel, so acks never cross channels
                    channel.basicConsume(queueName, false, consumerTag, 
                        callback, tag -> {});
                    
                    System.out.println("Consumer " + consumerIndex + " started");
                    
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
    }
    
    public void shutdown() {
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            executorService.shutdownNow();
        }
    }
}

3.3 Priority Consumer

java
import com.rabbitmq.client.*;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.*;

public class PriorityConsumer {
    
    private final Channel channel;
    private final PriorityBlockingQueue<PriorityMessage> messageQueue;
    private final ExecutorService executorService;
    private final int workerCount;
    
    public PriorityConsumer(Channel channel, int workerCount) {
        this.channel = channel;
        this.workerCount = workerCount;
        // Higher priority values are taken from the local buffer first
        this.messageQueue = new PriorityBlockingQueue<>(1000, 
            (a, b) -> Integer.compare(b.priority, a.priority));
        this.executorService = Executors.newFixedThreadPool(workerCount);
    }
    
    public void start(String queueName) throws Exception {
        // The prefetch window bounds how many messages can be reordered locally
        channel.basicQos(100);
        
        DeliverCallback callback = (tag, delivery) -> {
            // Priority is read from an application-defined "priority" header
            int priority = 0;
            Map<String, Object> headers = delivery.getProperties().getHeaders();
            if (headers != null && headers.get("priority") instanceof Number) {
                priority = ((Number) headers.get("priority")).intValue();
            }
            
            PriorityMessage msg = new PriorityMessage(
                delivery,
                priority,
                delivery.getEnvelope().getDeliveryTag()
            );
            
            messageQueue.offer(msg);
        };
        
        channel.basicConsume(queueName, false, callback, tag -> {});
        
        // One polling task per pool thread
        for (int i = 0; i < workerCount; i++) {
            executorService.submit(this::processMessages);
        }
        
        System.out.println("Priority consumer started");
    }
    
    private void processMessages() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                PriorityMessage msg = messageQueue.take();
                
                System.out.println("Processing message [priority: " + msg.priority + 
                    "]: " + new String(msg.delivery.getBody(), StandardCharsets.UTF_8));
                
                // Acking from a worker thread relies on multiple = false
                channel.basicAck(msg.deliveryTag, false);
                
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    public void shutdown() {
        // Workers block in take(); shutdownNow() interrupts them so the loops exit
        executorService.shutdownNow();
    }
    
    static class PriorityMessage {
        final Delivery delivery;
        final int priority;
        final long deliveryTag;
        
        PriorityMessage(Delivery delivery, int priority, long deliveryTag) {
            this.delivery = delivery;
            this.priority = priority;
            this.deliveryTag = deliveryTag;
        }
    }
}
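
`PriorityBlockingQueue` does not guarantee any order among elements of equal priority, so messages with the same priority may leave the local buffer in a different order than they arrived. If arrival order matters, a sequence number can serve as a tie-breaker. The sketch below illustrates the idea in plain Java; `OrderedPriorityBuffer` is a hypothetical name and is not part of the consumer shown in this section.

```java
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

/** Buffer that serves higher priorities first and equal priorities in arrival order. */
final class OrderedPriorityBuffer<T> {

    private static final class Entry<T> implements Comparable<Entry<T>> {
        final T payload;
        final int priority;
        final long sequence;

        Entry(T payload, int priority, long sequence) {
            this.payload = payload;
            this.priority = priority;
            this.sequence = sequence;
        }

        @Override
        public int compareTo(Entry<T> other) {
            // Higher priority first; on a tie, the lower (earlier) sequence first
            int byPriority = Integer.compare(other.priority, priority);
            return byPriority != 0 ? byPriority : Long.compare(sequence, other.sequence);
        }
    }

    private final PriorityBlockingQueue<Entry<T>> queue = new PriorityBlockingQueue<>();
    private final AtomicLong nextSequence = new AtomicLong();

    void put(T payload, int priority) {
        queue.put(new Entry<>(payload, priority, nextSequence.getAndIncrement()));
    }

    /** Returns the next payload, or null if the buffer is empty. */
    T poll() {
        Entry<T> head = queue.poll();
        return head == null ? null : head.payload;
    }
}
```

Worker threads would use a blocking `take()` variant in place of `poll()`; `poll()` is used here to keep the sketch short.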

4. Practical Use Cases

4.1 Order-Processing Consumer

java
import com.rabbitmq.client.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;

public class OrderConsumer {
    
    private final Channel channel;
    private final ObjectMapper objectMapper = new ObjectMapper();
    
    public OrderConsumer(Channel channel) {
        this.channel = channel;
    }
    
    public void start(String queueName) throws Exception {
        channel.basicQos(10);
        
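        DeliverCallback callback = (tag, delivery) -> {
            long deliveryTag = delivery.getEnvelope().getDeliveryTag();
            try {
                // The type property is optional; fall back so the switch cannot throw an NPE
                String type = delivery.getProperties().getType();
                String messageType = type != null ? type : "unknown";
                String body = new String(delivery.getBody(), "UTF-8");
                
                Map<String, Object> order = objectMapper.readValue(body, Map.class);
                
                switch (messageType) {
                    case "order.created":
                        handleOrderCreated(order);
                        break;
                    case "order.paid":
                        handleOrderPaid(order);
                        break;
                    case "order.shipped":
                        handleOrderShipped(order);
                        break;
                    case "order.cancelled":
                        handleOrderCancelled(order);
                        break;
                    default:
                        System.err.println("Unknown message type: " + messageType);
                }
                
                channel.basicAck(deliveryTag, false);
                
            } catch (Exception e) {
                System.err.println("Failed to process order: " + e.getMessage());
                // Requeue once. A message that fails again after redelivery is dropped
                // (or dead-lettered, if the queue is configured for it) so that a
                // malformed payload cannot loop forever.
                boolean requeue = !delivery.getEnvelope().isRedeliver();
                channel.basicNack(deliveryTag, false, requeue);
            }
        };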
        
        channel.basicConsume(queueName, false, callback, tag -> {});
    }
    
    private void handleOrderCreated(Map<String, Object> order) {
        System.out.println("Handling order created: " + order.get("orderId"));
    }
    
    private void handleOrderPaid(Map<String, Object> order) {
        System.out.println("Handling order paid: " + order.get("orderId"));
    }
    
    private void handleOrderShipped(Map<String, Object> order) {
        System.out.println("Handling order shipped: " + order.get("orderId"));
    }
    
    private void handleOrderCancelled(Map<String, Object> order) {
        System.out.println("Handling order cancelled: " + order.get("orderId"));
    }
}

4.2 Log-Processing Consumer

java
import com.rabbitmq.client.*;
import java.util.Map;

public class LogConsumer {
    
    private final Channel channel;
    
    public LogConsumer(Channel channel) {
        this.channel = channel;
    }
    
    public void start(String queueName) throws Exception {
        DeliverCallback callback = (tag, delivery) -> {
            String body = new String(delivery.getBody(), "UTF-8");
            String routingKey = delivery.getEnvelope().getRoutingKey();
            
            // Routing keys are expected to look like "service.level"
            String[] parts = routingKey.split("\\.");
            String service = parts.length > 0 ? parts[0] : "unknown";
            String level = parts.length > 1 ? parts[1] : "info";
            
            processLog(service, level, body);
            
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        
        // autoAck must be false here: acknowledging a delivery that was consumed with
        // autoAck = true makes the broker close the channel (unknown delivery tag)
        channel.basicConsume(queueName, false, callback, tag -> {});
    }
    
    private void processLog(String service, String level, String message) {
        String timestamp = java.time.LocalDateTime.now().toString();
        
        System.out.printf("[%s] [%s] [%s] %s%n", 
            timestamp, level.toUpperCase(), service, message);
        
        if ("error".equalsIgnoreCase(level)) {
            sendAlert(service, message);
        }
    }
    
    private void sendAlert(String service, String message) {
        System.err.println("Sending alert: " + service + " - " + message);
    }
}
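
The routing-key parsing in `LogConsumer` can be isolated and tested without a broker. One detail worth handling: `"".split("\\.")` returns a single empty string, so an empty routing key yields an empty service name instead of "unknown". The sketch below covers that case; `LogRoutingKey` is a hypothetical helper and the `service.level` key layout is the convention assumed by this section.

```java
import java.util.Locale;

/** Parsed form of a routing key that follows the "service.level" convention. */
final class LogRoutingKey {
    final String service;
    final String level;

    private LogRoutingKey(String service, String level) {
        this.service = service;
        this.level = level;
    }

    static LogRoutingKey parse(String routingKey) {
        if (routingKey == null || routingKey.isEmpty()) {
            return new LogRoutingKey("unknown", "info");
        }
        // Limit -1 keeps trailing empty segments, so "orders." still has two parts
        String[] parts = routingKey.split("\\.", -1);
        String service = parts[0].isEmpty() ? "unknown" : parts[0];
        String level = parts.length > 1 && !parts[1].isEmpty()
                ? parts[1].toLowerCase(Locale.ROOT)
                : "info";
        return new LogRoutingKey(service, level);
    }

    boolean isError() {
        return "error".equals(level);
    }
}
```

Segments after the second one are ignored, which matches the behaviour of the consumer in this section.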

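5. Common Problems and Solutions

5.1 Duplicate Message Consumption

Problem: The same message is processed more than once, for example when it is redelivered after a consumer fails before acknowledging it.

Solution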

java
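import com.rabbitmq.client.Delivery;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class DeduplicationConsumer {
    
    // In-memory and unbounded: IDs are lost on restart and are never evicted
    private final Set<String> processedMessages = ConcurrentHashMap.newKeySet();
    
    public void processWithDeduplication(Delivery delivery) throws Exception {
        String messageId = delivery.getProperties().getMessageId();
        
        // add() is atomic: it returns false if this ID has already been claimed
        if (messageId != null && !processedMessages.add(messageId)) {
            System.out.println("Duplicate message, skipping: " + messageId);
            return;
        }
        
        try {
            processMessage(delivery);
        } catch (Exception e) {
            // Release the ID so that a redelivery can be processed
            if (messageId != null) {
                processedMessages.remove(messageId);
            }
            throw e;
        }
    }
    
    private void processMessage(Delivery delivery) throws Exception {
        // Placeholder for the business logic
        System.out.println("Processing: " + new String(delivery.getBody(), "UTF-8"));
    }
}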
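
A set of message IDs that only grows will eventually exhaust memory in a long-running consumer. One way to bound it, sketched below, is a fixed-capacity cache that evicts the oldest ID. `BoundedIdCache` is a hypothetical helper; the trade-off is that a duplicate arriving after its ID has been evicted is not detected, so the capacity has to cover the expected redelivery window.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Remembers at most `capacity` message IDs, evicting the oldest one first. */
final class BoundedIdCache {
    private final Set<String> ids;

    BoundedIdCache(final int capacity) {
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be >= 1");
        }
        // Insertion-ordered map that drops its eldest entry once it grows past capacity
        Map<String, Boolean> backing = new LinkedHashMap<String, Boolean>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        };
        this.ids = Collections.synchronizedSet(Collections.newSetFromMap(backing));
    }

    /** Returns true the first time an ID is seen, false for a duplicate. */
    boolean markIfNew(String messageId) {
        return ids.add(messageId);
    }

    int size() {
        return ids.size();
    }
}
```

The synchronized wrapper makes single calls thread-safe, which is enough for the check-and-mark step because `markIfNew` is one call.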

5.2 Message Processing Timeouts

Problem: Processing a message takes too long.

Solution

java
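import com.rabbitmq.client.Delivery;
import java.util.concurrent.*;

public class TimeoutConsumer {
    
    private final ExecutorService executor = Executors.newCachedThreadPool();
    
    public void processWithTimeout(Delivery delivery, long timeoutSeconds) 
            throws Exception {
        
        // A Callable, unlike a Runnable, may throw checked exceptions
        Callable<Void> task = () -> {
            processMessage(delivery);
            return null;
        };
        Future<Void> future = executor.submit(task);
        
        try {
            future.get(timeoutSeconds, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            // cancel(true) interrupts the worker; the handler must respond to interruption
            future.cancel(true);
            throw new RuntimeException("Processing timed out after " + timeoutSeconds + "s", e);
        }
    }
    
    private void processMessage(Delivery delivery) throws Exception {
        // Placeholder for the business logic
        System.out.println("Processing: " + new String(delivery.getBody(), "UTF-8"));
    }
}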

5.3 Slow Consumers

Problem: Consumers process messages slowly, so messages pile up in the queue.

Solution

java
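// These methods are meant to live in a consumer class that defines processMessage(Delivery)
public void handleBackpressure(Channel channel, String queueName, Delivery delivery) 
        throws Exception {
    int messageCount = getQueueMessageCount(channel, queueName);
    
    // Pausing here would only slow consumption further, so surface the backlog instead
    if (messageCount > 10000) {
        System.err.println("Backlog of " + messageCount + " messages on " + queueName 
            + "; consider adding consumers or tuning the prefetch count");
    }
    
    processMessage(delivery);
}

// A passive declare fails if the queue does not exist; otherwise it returns current counts
private int getQueueMessageCount(Channel channel, String queueName) throws Exception {
    AMQP.Queue.DeclareOk declareOk = channel.queueDeclarePassive(queueName);
    return declareOk.getMessageCount();
}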

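6. Best Practices

6.1 Consumer Design

text
Best practices:
├── Use manual acknowledgement mode
├── Set a sensible QoS prefetch count
├── Make message handling idempotent
├── Handle failures explicitly
├── Monitor the consumption rate
└── Shut down gracefully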
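
Graceful shutdown usually means three steps: stop accepting new work, wait for in-flight messages to finish, then close the connection. The middle step can be sketched with a small counter, shown below in plain Java. `InFlightTracker` is a hypothetical helper, not a RabbitMQ API, and the timeout is whatever the application can tolerate.

```java
/** Tracks messages that are currently being processed so shutdown can wait for them. */
final class InFlightTracker {
    private int inFlight;
    private boolean closed;

    /** Call before processing a message; returns false once shutdown has begun. */
    synchronized boolean tryBegin() {
        if (closed) {
            return false;
        }
        inFlight++;
        return true;
    }

    /** Call after the message was acked or nacked. */
    synchronized void end() {
        inFlight--;
        notifyAll();
    }

    /** Stops new work and waits; returns true if everything finished in time. */
    synchronized boolean closeAndAwait(long timeoutMillis) {
        closed = true;
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        try {
            while (inFlight > 0) {
                long remainingMillis = (deadline - System.nanoTime()) / 1_000_000L;
                if (remainingMillis <= 0) {
                    return false;
                }
                wait(remainingMillis);
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

A shutdown hook would cancel the consumer with `basicCancel`, call `closeAndAwait(...)`, and only then close the channel and connection.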

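6.2 Performance Tuning

text
Performance recommendations:
├── Process messages on multiple threads
├── Acknowledge messages in batches
├── Process messages asynchronously
├── Keep the prefetch count bounded
└── Avoid blocking operations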
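
Batch acknowledgement relies on `basicAck(tag, true)` settling every outstanding delivery up to `tag` on the channel. The bookkeeping for deciding when to send such an ack can be sketched as follows. `BatchAcker` is a hypothetical helper, and the sketch assumes messages are processed in order on a single channel and that every message in the batch succeeded.

```java
/** Decides when a batch is full and which delivery tag to ack with multiple = true. */
final class BatchAcker {
    /** Returned when no acknowledgement is due yet. */
    static final long NONE = -1L;

    private final int batchSize;
    private int pending;
    private long highestTag = NONE;

    BatchAcker(int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1");
        }
        this.batchSize = batchSize;
    }

    /** Records a successfully processed delivery; returns the tag to ack, or NONE. */
    long processed(long deliveryTag) {
        pending++;
        highestTag = Math.max(highestTag, deliveryTag);
        return pending >= batchSize ? flush() : NONE;
    }

    /** Returns the tag covering all pending deliveries, or NONE if there are none. */
    long flush() {
        if (pending == 0) {
            return NONE;
        }
        pending = 0;
        return highestTag;
    }
}
```

A consumer would call `channel.basicAck(tag, true)` whenever `processed` or `flush` returns a value other than `NONE`, and would call `flush()` on a timer or at shutdown so a partial batch is not left unacknowledged.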

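6.3 Reliability

text
Reliability recommendations:
├── Acknowledge and reject messages correctly
├── Implement a retry mechanism
├── Use dead-letter queues
├── Monitor consumer state
└── Handle network interruptions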
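
A retry mechanism needs a way to know how often a message has already been attempted, so that it can stop retrying and let the message go to a dead-letter queue. One possible approach, sketched below, keeps a counter in a message header that the application increments each time it republishes the message. The header name `x-retry-count` and the `RetryBudget` class are assumptions made for this sketch; neither is defined by RabbitMQ.

```java
import java.util.HashMap;
import java.util.Map;

/** Tracks retries through an application-defined message header. */
final class RetryBudget {
    /** Hypothetical header name chosen for this sketch. */
    static final String HEADER = "x-retry-count";

    private final int maxRetries;

    RetryBudget(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    /** Reads the retry counter; a missing or malformed header counts as 0. */
    int retriesSoFar(Map<String, Object> headers) {
        if (headers == null) {
            return 0;
        }
        Object value = headers.get(HEADER);
        return value instanceof Number ? Math.max(0, ((Number) value).intValue()) : 0;
    }

    /** True when the message should be dead-lettered instead of retried. */
    boolean exhausted(Map<String, Object> headers) {
        return retriesSoFar(headers) >= maxRetries;
    }

    /** Returns a copy of the headers with the counter incremented, for republishing. */
    Map<String, Object> nextHeaders(Map<String, Object> headers) {
        Map<String, Object> copy = headers == null ? new HashMap<>() : new HashMap<>(headers);
        copy.put(HEADER, retriesSoFar(headers) + 1);
        return copy;
    }
}
```

When the budget is exhausted, the consumer would reject the message with `requeue = false` so that a dead-letter exchange configured on the queue can take it.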

7. Related Links