Skip to content

Java 异常处理

一、概述

在使用 RabbitMQ Java 客户端时,正确处理异常对于构建健壮的消息系统至关重要。本文档详细介绍各种异常类型、处理策略和最佳实践。

1.1 异常层次结构

┌─────────────────────────────────────────────────────────────────────┐
│                     RabbitMQ 异常层次结构                            │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                    java.lang.Exception                       │   │
│  └──────────────────────────────┬──────────────────────────────┘   │
│                                 │                                   │
│                                 ▼                                   │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │              java.io.IOException                             │   │
│  │                 (IO 异常基类)                                 │   │
│  └──────────────────────────────┬──────────────────────────────┘   │
│                                 │                                   │
│         ┌───────────────────────┼───────────────────────┐          │
│         │                       │                       │          │
│         ▼                       ▼                       ▼          │
│  ┌─────────────┐        ┌─────────────┐        ┌─────────────┐    │
│  │ AlreadyCl-  │        │ ShutdownSig-│        │  Protocol   │    │
│  │ osedException│        │ nalException│        │ Exception   │    │
│  │ (已关闭异常) │        │ (关闭信号)   │        │ (协议异常)   │    │
│  └─────────────┘        └─────────────┘        └─────────────┘    │
│                                                                     │
│  其他异常:                                                         │
│  ├── ConnectException - 连接失败                                   │
│  ├── TimeoutException - 超时                                       │
│  ├── InterruptedException - 线程中断                               │
│  └── RuntimeException - 运行时异常                                  │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

1.2 异常分类

text
┌─────────────────────────────────────────────────────────────────────┐
│                       异常分类                                       │
├───────────────────┬─────────────────────────────────────────────────┤
│       类型        │                    说明                          │
├───────────────────┼─────────────────────────────────────────────────┤
│  连接异常         │ 连接建立、断开、恢复相关异常                     │
├───────────────────┼─────────────────────────────────────────────────┤
│  通道异常         │ 通道操作相关异常                                │
├───────────────────┼─────────────────────────────────────────────────┤
│  协议异常         │ AMQP 协议相关异常                               │
├───────────────────┼─────────────────────────────────────────────────┤
│  业务异常         │ 消息处理业务逻辑异常                            │
├───────────────────┼─────────────────────────────────────────────────┤
│  资源异常         │ 资源不存在、权限不足等                          │
└───────────────────┴─────────────────────────────────────────────────┘

二、核心知识点

2.1 常见异常类型

2.1.1 IOException

java
import com.rabbitmq.client.*;
import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;

public class IOExceptionDemo {
    
    public void handleIOException() {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        
        try {
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            
            channel.basicPublish("exchange", "key", null, "message".getBytes());
            
        } catch (ConnectException e) {
            System.err.println("连接被拒绝: " + e.getMessage());
            System.err.println("请检查 RabbitMQ 服务是否运行");
            
        } catch (SocketTimeoutException e) {
            System.err.println("连接超时: " + e.getMessage());
            System.err.println("请检查网络连接和防火墙设置");
            
        } catch (IOException e) {
            System.err.println("IO 异常: " + e.getMessage());
            if (e.getCause() != null) {
                System.err.println("原因: " + e.getCause().getMessage());
            }
        }
    }
}

2.1.2 AlreadyClosedException

java
import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class AlreadyClosedExceptionDemo {
    
    public void handleAlreadyClosedException(Channel channel) {
        try {
            if (channel.isOpen()) {
                channel.basicPublish("exchange", "key", null, "message".getBytes());
            }
            
        } catch (AlreadyClosedException e) {
            System.err.println("连接或通道已关闭");
            System.err.println("关闭原因: " + e.getReason());
            
            if (e.getReason() != null) {
                System.err.println("是否由应用发起: " + e.getReason().isInitiatedByApplication());
            }
            
            handleReconnection();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    
    private void handleReconnection() {
        System.out.println("尝试重新连接...");
    }
}

2.1.3 ShutdownSignalException

java
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class ShutdownSignalExceptionDemo {
    
    public void handleShutdownSignal(Channel channel) {
        try {
            channel.addShutdownListener(cause -> {
                System.err.println("=== 关闭信号 ===");
                System.err.println("消息: " + cause.getMessage());
                System.err.println("原因: " + cause.getReason());
                
                System.err.println("是否为硬错误: " + cause.isHardError());
                System.err.println("是否由应用发起: " + cause.isInitiatedByApplication());
                
                if (cause.isHardError()) {
                    System.err.println("连接级别错误");
                    System.err.println("参考类: " + cause.getReference());
                } else {
                    System.err.println("通道级别错误");
                    System.err.println("通道编号: " + cause.getChannelNumber());
                }
                
                if (!cause.isInitiatedByApplication()) {
                    System.err.println("异常关闭,需要处理");
                    handleUnexpectedShutdown(cause);
                }
            });
            
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    private void handleUnexpectedShutdown(ShutdownSignalException cause) {
        System.err.println("处理异常关闭...");
        
        if (cause.getMessage().contains("CONNECTION_FORCED")) {
            System.err.println("连接被强制关闭");
        } else if (cause.getMessage().contains("ACCESS_REFUSED")) {
            System.err.println("访问被拒绝,检查权限");
        }
    }
}

2.1.4 ProtocolException

java
import com.rabbitmq.client.ProtocolException;
import com.rabbitmq.client.Channel;

public class ProtocolExceptionDemo {
    
    public void demonstrateProtocolErrors(Channel channel) {
        try {
            channel.queueDeclare("existing.queue", false, false, false, null);
            
            channel.queueDeclare("existing.queue", true, false, false, null);
            
        } catch (IOException e) {
            if (e.getCause() instanceof ProtocolException) {
                ProtocolException pe = (ProtocolException) e.getCause();
                
                System.err.println("协议异常: " + pe.getMessage());
                System.err.println("关闭原因: " + pe.getCloseReason());
                
                handleProtocolError(pe);
            }
        }
    }
    
    private void handleProtocolError(ProtocolException e) {
        int replyCode = e.getCloseReason() != null ? 
            e.getCloseReason().getReplyCode() : 0;
        
        switch (replyCode) {
            case 403:
                System.err.println("访问被拒绝");
                break;
            case 404:
                System.err.println("资源不存在");
                break;
            case 405:
                System.err.println("资源被锁定");
                break;
            case 406:
                System.err.println("前置条件失败");
                break;
            case 504:
                System.err.println("通道错误");
                break;
            default:
                System.err.println("未知协议错误: " + replyCode);
        }
    }
}

2.2 错误码详解

text
┌─────────────────────────────────────────────────────────────────────┐
│                       AMQP 错误码                                    │
├──────────────┬──────────────────────────────────────────────────────┤
│   错误码     │                    说明                               │
├──────────────┼──────────────────────────────────────────────────────┤
│    311       │ CONTENT_TOO_LARGE - 消息内容过大                     │
│    312       │ NO_ROUTE - 消息无法路由                              │
│    313       │ NO_CONSUMERS - 没有消费者                           │
├──────────────┼──────────────────────────────────────────────────────┤
│    403       │ ACCESS_REFUSED - 访问被拒绝                          │
│    404       │ NOT_FOUND - 资源不存在                               │
│    405       │ RESOURCE_LOCKED - 资源被锁定                         │
│    406       │ PRECONDITION_FAILED - 前置条件失败                   │
├──────────────┼──────────────────────────────────────────────────────┤
│    501       │ FRAME_ERROR - 帧错误                                 │
│    502       │ SYNTAX_ERROR - 语法错误                              │
│    503       │ COMMAND_INVALID - 命令无效                           │
│    504       │ CHANNEL_ERROR - 通道错误                             │
│    505       │ UNEXPECTED_FRAME - 意外的帧                          │
│    506       │ RESOURCE_ERROR - 资源错误                            │
├──────────────┼──────────────────────────────────────────────────────┤
│    530       │ NOT_ALLOWED - 不允许的操作                           │
│    540       │ NOT_IMPLEMENTED - 未实现                             │
│    541       │ INTERNAL_ERROR - 内部错误                            │
└──────────────┴──────────────────────────────────────────────────────┘

2.3 异常处理器

java
import com.rabbitmq.client.*;
import java.io.IOException;

public class CustomExceptionHandler implements ExceptionHandler {
    
    @Override
    public void handleUnexpectedConnectionDriverException(Connection conn, Throwable exception) {
        System.err.println("=== 连接驱动异常 ===");
        System.err.println("连接: " + conn.getClientProvidedName());
        System.err.println("异常: " + exception.getClass().getName());
        System.err.println("消息: " + exception.getMessage());
        
        logException("connection_driver", exception);
        
        notifyMonitoring("connection_error", exception);
    }
    
    @Override
    public void handleReturnListenerException(Channel channel, Throwable exception) {
        System.err.println("=== Return 监听器异常 ===");
        System.err.println("通道: " + channel.getChannelNumber());
        System.err.println("异常: " + exception.getMessage());
        
        logException("return_listener", exception);
    }
    
    @Override
    public void handleFlowListenerException(Channel channel, Throwable exception) {
        System.err.println("=== Flow 监听器异常 ===");
        System.err.println("通道: " + channel.getChannelNumber());
        System.err.println("异常: " + exception.getMessage());
        
        logException("flow_listener", exception);
    }
    
    @Override
    public void handleConfirmListenerException(Channel channel, Throwable exception) {
        System.err.println("=== Confirm 监听器异常 ===");
        System.err.println("通道: " + channel.getChannelNumber());
        System.err.println("异常: " + exception.getMessage());
        
        logException("confirm_listener", exception);
    }
    
    @Override
    public void handleBlockedListenerException(Connection connection, Throwable exception) {
        System.err.println("=== Blocked 监听器异常 ===");
        System.err.println("异常: " + exception.getMessage());
        
        logException("blocked_listener", exception);
    }
    
    @Override
    public void handleConsumerException(Channel channel, Throwable exception,
                                        Consumer consumer, String consumerTag,
                                        String methodName) {
        System.err.println("=== 消费者异常 ===");
        System.err.println("通道: " + channel.getChannelNumber());
        System.err.println("消费者标签: " + consumerTag);
        System.err.println("方法: " + methodName);
        System.err.println("异常: " + exception.getMessage());
        
        logException("consumer", exception);
    }
    
    @Override
    public void handleConnectionRecoveryException(Connection conn, Throwable exception) {
        System.err.println("=== 连接恢复异常 ===");
        System.err.println("异常: " + exception.getMessage());
        
        logException("connection_recovery", exception);
    }
    
    @Override
    public void handleChannelRecoveryException(Channel ch, Throwable exception) {
        System.err.println("=== 通道恢复异常 ===");
        System.err.println("通道: " + ch.getChannelNumber());
        System.err.println("异常: " + exception.getMessage());
        
        logException("channel_recovery", exception);
    }
    
    @Override
    public void handleTopologyRecoveryException(Connection conn, Channel ch, 
                                                TopologyRecoveryException exception) {
        System.err.println("=== 拓扑恢复异常 ===");
        System.err.println("异常: " + exception.getMessage());
        System.err.println("恢复项: " + exception.getRecoveryAttemptedItem());
        
        logException("topology_recovery", exception);
    }
    
    private void logException(String type, Throwable exception) {
        System.err.println("[LOG] " + type + ": " + exception.getMessage());
    }
    
    private void notifyMonitoring(String type, Throwable exception) {
        System.err.println("[ALERT] " + type + ": " + exception.getMessage());
    }
}

2.4 重试机制

java
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class RetryHandler {
    
    private final int maxRetries;
    private final long initialDelay;
    private final long maxDelay;
    private final double multiplier;
    
    public RetryHandler(int maxRetries, long initialDelay, 
                       long maxDelay, double multiplier) {
        this.maxRetries = maxRetries;
        this.initialDelay = initialDelay;
        this.maxDelay = maxDelay;
        this.multiplier = multiplier;
    }
    
    public <T> T executeWithRetry(RetryableOperation<T> operation) throws Exception {
        int attempt = 0;
        long delay = initialDelay;
        Exception lastException = null;
        
        while (attempt < maxRetries) {
            try {
                return operation.execute();
                
            } catch (AlreadyClosedException e) {
                lastException = e;
                attempt++;
                
                System.err.println("尝试 " + attempt + "/" + maxRetries + 
                    " 失败: " + e.getMessage());
                
                if (attempt < maxRetries) {
                    Thread.sleep(delay);
                    delay = Math.min((long)(delay * multiplier), maxDelay);
                }
                
            } catch (IOException e) {
                if (isRetryable(e)) {
                    lastException = e;
                    attempt++;
                    
                    if (attempt < maxRetries) {
                        Thread.sleep(delay);
                        delay = Math.min((long)(delay * multiplier), maxDelay);
                    }
                } else {
                    throw e;
                }
            }
        }
        
        throw new RetryExhaustedException("重试次数耗尽", lastException);
    }
    
    private boolean isRetryable(IOException e) {
        if (e.getCause() instanceof java.net.ConnectException) {
            return true;
        }
        if (e.getCause() instanceof java.net.SocketTimeoutException) {
            return true;
        }
        return false;
    }
    
    @FunctionalInterface
    public interface RetryableOperation<T> {
        T execute() throws Exception;
    }
}

class RetryExhaustedException extends Exception {
    public RetryExhaustedException(String message, Throwable cause) {
        super(message, cause);
    }
}

三、代码示例

3.1 完整异常处理示例

java
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class CompleteExceptionHandlingDemo {
    
    private Connection connection;
    private Channel channel;
    
    public void connect(String host, int port, String username, String password) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);
        
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        factory.setExceptionHandler(new CustomExceptionHandler());
        
        try {
            connection = factory.newConnection("exception-demo");
            
            connection.addShutdownListener(cause -> {
                System.err.println("连接关闭: " + cause.getMessage());
                
                if (!cause.isInitiatedByApplication()) {
                    System.err.println("异常关闭,检查服务状态");
                }
            });
            
            channel = connection.createChannel();
            
            channel.addShutdownListener(cause -> {
                System.err.println("通道关闭: " + cause.getMessage());
            });
            
            System.out.println("连接建立成功");
            
        } catch (java.net.ConnectException e) {
            System.err.println("无法连接到 RabbitMQ: " + e.getMessage());
            System.err.println("请检查:");
            System.err.println("  1. RabbitMQ 服务是否运行");
            System.err.println("  2. 主机和端口是否正确");
            System.err.println("  3. 防火墙是否允许连接");
            
        } catch (java.net.SocketTimeoutException e) {
            System.err.println("连接超时: " + e.getMessage());
            System.err.println("请检查网络连接");
            
        } catch (IOException e) {
            System.err.println("IO 异常: " + e.getMessage());
            handleIOException(e);
            
        } catch (TimeoutException e) {
            System.err.println("超时异常: " + e.getMessage());
        }
    }
    
    public void publishWithExceptionHandling(String exchange, String routingKey, 
                                             byte[] body) {
        try {
            if (channel == null || !channel.isOpen()) {
                throw new IllegalStateException("通道未打开");
            }
            
            channel.basicPublish(exchange, routingKey, 
                MessageProperties.PERSISTENT_TEXT_PLAIN, body);
            
        } catch (AlreadyClosedException e) {
            System.err.println("通道已关闭: " + e.getMessage());
            handleChannelClosed(e);
            
        } catch (IOException e) {
            System.err.println("发布失败: " + e.getMessage());
            handlePublishError(e);
        }
    }
    
    private void handleIOException(IOException e) {
        if (e.getCause() != null) {
            Throwable cause = e.getCause();
            
            if (cause instanceof ProtocolException) {
                ProtocolException pe = (ProtocolException) cause;
                System.err.println("协议错误: " + pe.getMessage());
            }
        }
    }
    
    private void handleChannelClosed(AlreadyClosedException e) {
        if (e.getReason() != null) {
            ShutdownSignalException reason = e.getReason();
            
            if (reason.isHardError()) {
                System.err.println("连接级别错误");
            } else {
                System.err.println("通道级别错误");
            }
        }
        
        attemptRecovery();
    }
    
    private void handlePublishError(IOException e) {
        System.err.println("处理发布错误...");
    }
    
    private void attemptRecovery() {
        System.out.println("尝试恢复连接...");
    }
    
    public void close() {
        try {
            if (channel != null && channel.isOpen()) {
                channel.close();
            }
            if (connection != null && connection.isOpen()) {
                connection.close();
            }
        } catch (Exception e) {
            System.err.println("关闭时出错: " + e.getMessage());
        }
    }
}

3.2 消费者异常处理

java
import com.rabbitmq.client.*;
import java.io.IOException;

public class ConsumerExceptionHandling {
    
    private final Channel channel;
    
    public ConsumerExceptionHandling(Channel channel) {
        this.channel = channel;
    }
    
    public void startConsumer(String queueName) throws Exception {
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            long deliveryTag = delivery.getEnvelope().getDeliveryTag();
            
            try {
                String message = new String(delivery.getBody(), "UTF-8");
                
                processMessage(message);
                
                channel.basicAck(deliveryTag, false);
                
            } catch (RetryableBusinessException e) {
                System.err.println("可重试业务异常: " + e.getMessage());
                channel.basicNack(deliveryTag, false, true);
                
            } catch (NonRetryableBusinessException e) {
                System.err.println("不可重试业务异常: " + e.getMessage());
                channel.basicReject(deliveryTag, false);
                sendToDeadLetterQueue(delivery, e);
                
            } catch (Exception e) {
                System.err.println("未知异常: " + e.getMessage());
                channel.basicNack(deliveryTag, false, false);
            }
        };
        
        CancelCallback cancelCallback = consumerTag -> {
            System.err.println("消费者被取消: " + consumerTag);
        };
        
        channel.basicConsume(queueName, false, deliverCallback, cancelCallback);
    }
    
    private void processMessage(String message) throws Exception {
        if (message.contains("retry")) {
            throw new RetryableBusinessException("需要重试");
        }
        if (message.contains("error")) {
            throw new NonRetryableBusinessException("无法处理");
        }
        
        System.out.println("处理消息: " + message);
    }
    
    private void sendToDeadLetterQueue(Delivery delivery, Exception e) {
        System.err.println("发送到死信队列: " + e.getMessage());
    }
}

class RetryableBusinessException extends Exception {
    public RetryableBusinessException(String message) {
        super(message);
    }
}

class NonRetryableBusinessException extends Exception {
    public NonRetryableBusinessException(String message) {
        super(message);
    }
}

3.3 熔断器模式

java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class CircuitBreaker {
    
    private final int failureThreshold;
    private final long timeout;
    private final AtomicInteger failureCount = new AtomicInteger(0);
    private final AtomicLong lastFailureTime = new AtomicLong(0);
    private volatile State state = State.CLOSED;
    
    public CircuitBreaker(int failureThreshold, long timeout) {
        this.failureThreshold = failureThreshold;
        this.timeout = timeout;
    }
    
    public void execute(Runnable operation) throws Exception {
        if (state == State.OPEN) {
            if (System.currentTimeMillis() - lastFailureTime.get() > timeout) {
                state = State.HALF_OPEN;
                System.out.println("熔断器进入半开状态");
            } else {
                throw new CircuitBreakerOpenException("熔断器打开,拒绝请求");
            }
        }
        
        try {
            operation.run();
            onSuccess();
        } catch (Exception e) {
            onFailure();
            throw e;
        }
    }
    
    private void onSuccess() {
        failureCount.set(0);
        if (state == State.HALF_OPEN) {
            state = State.CLOSED;
            System.out.println("熔断器关闭");
        }
    }
    
    private void onFailure() {
        int count = failureCount.incrementAndGet();
        lastFailureTime.set(System.currentTimeMillis());
        
        if (count >= failureThreshold) {
            state = State.OPEN;
            System.out.println("熔断器打开,失败次数: " + count);
        }
    }
    
    public State getState() {
        return state;
    }
    
    public enum State {
        CLOSED,
        OPEN,
        HALF_OPEN
    }
}

class CircuitBreakerOpenException extends Exception {
    public CircuitBreakerOpenException(String message) {
        super(message);
    }
}

class CircuitBreakerDemo {
    
    public static void main(String[] args) {
        CircuitBreaker breaker = new CircuitBreaker(3, 10000);
        
        for (int i = 0; i < 10; i++) {
            try {
                breaker.execute(() -> {
                    if (Math.random() > 0.5) {
                        throw new RuntimeException("模拟失败");
                    }
                    System.out.println("操作成功");
                });
            } catch (CircuitBreakerOpenException e) {
                System.err.println("熔断器打开: " + e.getMessage());
            } catch (Exception e) {
                System.err.println("操作失败: " + e.getMessage());
            }
            
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

四、实际应用场景

4.1 生产者异常处理

java
import com.rabbitmq.client.*;
import java.io.IOException;

public class ProducerExceptionHandling {
    
    private final Channel channel;
    private final CircuitBreaker circuitBreaker;
    
    public ProducerExceptionHandling(Channel channel) {
        this.channel = channel;
        this.circuitBreaker = new CircuitBreaker(5, 30000);
    }
    
    public void publish(String exchange, String routingKey, byte[] body) {
        try {
            circuitBreaker.execute(() -> {
                try {
                    channel.basicPublish(exchange, routingKey, 
                        MessageProperties.PERSISTENT_TEXT_PLAIN, body);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });
            
        } catch (CircuitBreakerOpenException e) {
            System.err.println("熔断器打开,消息暂存: " + new String(body));
            storeMessageForRetry(body);
            
        } catch (Exception e) {
            System.err.println("发布失败: " + e.getMessage());
            handlePublishFailure(exchange, routingKey, body, e);
        }
    }
    
    private void storeMessageForRetry(byte[] body) {
        System.out.println("存储消息等待重试");
    }
    
    private void handlePublishFailure(String exchange, String routingKey, 
                                      byte[] body, Exception e) {
        System.err.println("处理发布失败");
    }
}

4.2 消费者异常处理

java
import com.rabbitmq.client.*;
import java.util.Map;

public class ConsumerExceptionHandlingDemo {
    
    private final Channel channel;
    
    public ConsumerExceptionHandlingDemo(Channel channel) {
        this.channel = channel;
    }
    
    public void start(String queueName) throws Exception {
        channel.basicQos(10);
        
        DeliverCallback callback = (tag, delivery) -> {
            MessageContext context = new MessageContext(delivery);
            
            try {
                process(context);
                ack(context);
                
            } catch (ValidationException e) {
                System.err.println("验证失败: " + e.getMessage());
                reject(context, false);
                
            } catch (TemporaryException e) {
                System.err.println("临时错误,重试: " + e.getMessage());
                nack(context, true);
                
            } catch (PermanentException e) {
                System.err.println("永久错误,拒绝: " + e.getMessage());
                reject(context, false);
                sendToDlq(context, e);
                
            } catch (Exception e) {
                System.err.println("未知错误: " + e.getMessage());
                nack(context, false);
            }
        };
        
        channel.basicConsume(queueName, false, callback, tag -> {});
    }
    
    private void process(MessageContext context) throws Exception {
        System.out.println("处理消息: " + context.getBody());
    }
    
    private void ack(MessageContext context) throws IOException {
        channel.basicAck(context.getDeliveryTag(), false);
    }
    
    private void nack(MessageContext context, boolean requeue) throws IOException {
        channel.basicNack(context.getDeliveryTag(), false, requeue);
    }
    
    private void reject(MessageContext context, boolean requeue) throws IOException {
        channel.basicReject(context.getDeliveryTag(), requeue);
    }
    
    private void sendToDlq(MessageContext context, Exception e) {
        System.err.println("发送到死信队列");
    }
}

class MessageContext {
    private final Delivery delivery;
    
    public MessageContext(Delivery delivery) {
        this.delivery = delivery;
    }
    
    public long getDeliveryTag() {
        return delivery.getEnvelope().getDeliveryTag();
    }
    
    public String getBody() {
        return new String(delivery.getBody());
    }
}

class ValidationException extends Exception {
    public ValidationException(String message) { super(message); }
}

class TemporaryException extends Exception {
    public TemporaryException(String message) { super(message); }
}

class PermanentException extends Exception {
    public PermanentException(String message) { super(message); }
}

五、常见问题与解决方案

5.1 连接频繁断开

问题描述: 连接频繁断开重连。

解决方案

java
ConnectionFactory factory = new ConnectionFactory();
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(10000);
factory.setRequestedHeartbeat(60);

5.2 通道意外关闭

问题描述: 通道在使用过程中意外关闭。

解决方案

java
channel.addShutdownListener(cause -> {
    if (!cause.isInitiatedByApplication()) {
        log.error("通道异常关闭: {}", cause.getMessage());
        notifyAdmin("通道异常关闭");
    }
});

5.3 消息处理异常

问题描述: 消息处理过程中抛出异常。

解决方案

java
try {
    processMessage(message);
    channel.basicAck(deliveryTag, false);
} catch (RetryableException e) {
    channel.basicNack(deliveryTag, false, true);
} catch (NonRetryableException e) {
    channel.basicReject(deliveryTag, false);
    sendToDlq(message, e);
}

六、最佳实践建议

6.1 异常处理原则

text
处理原则:
├── 区分可恢复和不可恢复异常
├── 实现适当的重试机制
├── 记录详细的错误日志
├── 监控异常发生频率
└── 实现优雅降级

6.2 日志记录

text
日志建议:
├── 记录异常类型和消息
├── 记录异常发生时间
├── 记录相关上下文信息
├── 记录堆栈跟踪
└── 使用合适的日志级别

6.3 监控告警

text
监控建议:
├── 监控连接状态
├── 监控异常频率
├── 监控消息处理失败率
├── 设置合适的告警阈值
└── 实现自动恢复机制

七、相关链接