Appearance
Java 异常处理
一、概述
在使用 RabbitMQ Java 客户端时,正确处理异常对于构建健壮的消息系统至关重要。本文档详细介绍各种异常类型、处理策略和最佳实践。
1.1 异常层次结构
┌─────────────────────────────────────────────────────────────────────┐
│ RabbitMQ 异常层次结构 │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ java.lang.Exception │ │
│ └──────────────────────────────┬──────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ java.io.IOException │ │
│ │ (IO 异常基类) │ │
│ └──────────────────────────────┬──────────────────────────────┘ │
│ │ │
│ ┌───────────────────────┼───────────────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ AlreadyCl- │ │ ShutdownSig-│ │ Protocol │ │
│ │ osedException│ │ nalException│ │ Exception │ │
│ │ (已关闭异常) │ │ (关闭信号) │ │ (协议异常) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
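│ 注意: 上图为简化示意。在 RabbitMQ Java 客户端中,ShutdownSignalException 及其子类 AlreadyClosedException 实际继承自 RuntimeException(非受检异常),而不是 IOException;通道操作失败时,ShutdownSignalException 通常作为 IOException 的 cause 出现。com.rabbitmq.client 包中也没有名为 ProtocolException 的类,协议错误码需要从 ShutdownSignalException.getReason() 中读取。 │
│ │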
│ 其他异常: │
│ ├── ConnectException - 连接失败 │
│ ├── TimeoutException - 超时 │
│ ├── InterruptedException - 线程中断 │
│ └── RuntimeException - 运行时异常 │
│ │
└─────────────────────────────────────────────────────────────────────┘
1.2 异常分类
text
┌─────────────────────────────────────────────────────────────────────┐
│ 异常分类 │
├───────────────────┬─────────────────────────────────────────────────┤
│ 类型 │ 说明 │
├───────────────────┼─────────────────────────────────────────────────┤
│ 连接异常 │ 连接建立、断开、恢复相关异常 │
├───────────────────┼─────────────────────────────────────────────────┤
│ 通道异常 │ 通道操作相关异常 │
├───────────────────┼─────────────────────────────────────────────────┤
│ 协议异常 │ AMQP 协议相关异常 │
├───────────────────┼─────────────────────────────────────────────────┤
│ 业务异常 │ 消息处理业务逻辑异常 │
├───────────────────┼─────────────────────────────────────────────────┤
│ 资源异常 │ 资源不存在、权限不足等 │
└───────────────────┴─────────────────────────────────────────────────┘
二、核心知识点
2.1 常见异常类型
2.1.1 IOException
java
import com.rabbitmq.client.*;
import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
public class IOExceptionDemo {
/**
 * Opens a connection to the broker on {@code localhost}, publishes a single
 * message and reports the most specific failure that can be identified.
 *
 * <p>The connection and the channel are opened with try-with-resources so that
 * both are released whether or not the publish succeeds.
 */
public void handleIOException() {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        // Encode explicitly: getBytes() without a charset depends on the platform default.
        channel.basicPublish("exchange", "key", null,
                "message".getBytes(java.nio.charset.StandardCharsets.UTF_8));
    } catch (ConnectException e) {
        System.err.println("连接被拒绝: " + e.getMessage());
        System.err.println("请检查 RabbitMQ 服务是否运行");
    } catch (SocketTimeoutException e) {
        System.err.println("连接超时: " + e.getMessage());
        System.err.println("请检查网络连接和防火墙设置");
    } catch (IOException e) {
        // Must come after the two subclasses above, otherwise they would be unreachable.
        System.err.println("IO 异常: " + e.getMessage());
        if (e.getCause() != null) {
            System.err.println("原因: " + e.getCause().getMessage());
        }
    } catch (java.util.concurrent.TimeoutException e) {
        // newConnection() and Channel.close() declare this checked exception; without
        // this clause the method does not compile.
        System.err.println("超时异常: " + e.getMessage());
    }
}
}
2.1.2 AlreadyClosedException
java
import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class AlreadyClosedExceptionDemo {
/**
 * Publishes one message on the given channel and reports the shutdown details
 * when the channel (or its connection) turns out to be closed.
 *
 * @param channel channel to publish on
 */
public void handleAlreadyClosedException(Channel channel) {
    try {
        // isOpen() is only a hint: the channel may still close between this check
        // and the publish, which is why AlreadyClosedException is handled below.
        if (channel.isOpen()) {
            channel.basicPublish("exchange", "key", null,
                    "message".getBytes(java.nio.charset.StandardCharsets.UTF_8));
        }
    } catch (AlreadyClosedException e) {
        System.err.println("连接或通道已关闭");
        // getReason() is the AMQP close method (it may be null); the shutdown flags
        // live on the exception itself, not on the returned method object.
        System.err.println("关闭原因: " + e.getReason());
        System.err.println("是否由应用发起: " + e.isInitiatedByApplication());
        handleReconnection();
    } catch (java.io.IOException e) {
        // Fully qualified because this snippet's import list does not include IOException.
        System.err.println("发布失败: " + e.getMessage());
    }
}
/** Placeholder reconnect hook: only announces on stdout that a reconnect is being attempted. */
private void handleReconnection() {
    final String notice = "尝试重新连接...";
    System.out.println(notice);
}
}
2.1.3 ShutdownSignalException
java
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class ShutdownSignalExceptionDemo {
/**
 * Registers a shutdown listener on the channel that prints the details of the
 * shutdown signal and delegates unexpected shutdowns to
 * {@code handleUnexpectedShutdown}.
 *
 * @param channel channel to observe
 */
public void handleShutdownSignal(Channel channel) {
    try {
        channel.addShutdownListener(cause -> {
            System.err.println("=== 关闭信号 ===");
            System.err.println("消息: " + cause.getMessage());
            System.err.println("原因: " + cause.getReason());
            System.err.println("是否为硬错误: " + cause.isHardError());
            System.err.println("是否由应用发起: " + cause.isInitiatedByApplication());
            if (cause.isHardError()) {
                System.err.println("连接级别错误");
                System.err.println("参考类: " + cause.getReference());
            } else {
                System.err.println("通道级别错误");
                // The signal has no channel-number accessor; the affected channel is
                // exposed through getReference().
                Object reference = cause.getReference();
                if (reference instanceof Channel) {
                    System.err.println("通道编号: " + ((Channel) reference).getChannelNumber());
                }
            }
            if (!cause.isInitiatedByApplication()) {
                System.err.println("异常关闭,需要处理");
                handleUnexpectedShutdown(cause);
            }
        });
    } catch (RuntimeException e) {
        // Registration declares no checked exceptions; keep the original best-effort
        // behaviour but report the failure on stderr instead of dumping a stack trace.
        System.err.println("注册关闭监听器失败: " + e.getMessage());
    }
}
/**
 * Classifies an unexpected shutdown by looking for known AMQP reply texts in the
 * signal's message and prints a matching hint.
 *
 * @param cause shutdown signal that was not initiated by the application
 */
private void handleUnexpectedShutdown(ShutdownSignalException cause) {
    System.err.println("处理异常关闭...");
    final String detail = cause.getMessage();
    if (detail.contains("CONNECTION_FORCED")) {
        System.err.println("连接被强制关闭");
        return;
    }
    if (detail.contains("ACCESS_REFUSED")) {
        System.err.println("访问被拒绝,检查权限");
    }
}
}
2.1.4 ProtocolException
java
import com.rabbitmq.client.ProtocolException;
import com.rabbitmq.client.Channel;
public class ProtocolExceptionDemo {
/**
 * Provokes a protocol error by declaring the same queue twice with a different
 * {@code durable} flag, then reports the broker's reply.
 *
 * <p>The client reports such errors as an {@code IOException} whose cause is a
 * {@code ShutdownSignalException}; the AMQP reply code is carried by the close
 * method returned from {@code getReason()}. Types are fully qualified because this
 * snippet's import list does not bring them into scope.
 *
 * @param channel channel used for the declarations
 */
public void demonstrateProtocolErrors(Channel channel) {
    try {
        channel.queueDeclare("existing.queue", false, false, false, null);
        channel.queueDeclare("existing.queue", true, false, false, null);
    } catch (java.io.IOException e) {
        if (e.getCause() instanceof com.rabbitmq.client.ShutdownSignalException) {
            com.rabbitmq.client.ShutdownSignalException signal =
                    (com.rabbitmq.client.ShutdownSignalException) e.getCause();
            System.err.println("协议异常: " + signal.getMessage());
            System.err.println("关闭原因: " + signal.getReason());
            handleProtocolError(signal);
        } else {
            // Do not drop I/O failures that are not protocol errors.
            System.err.println("IO 异常: " + e.getMessage());
        }
    }
}

/**
 * Prints a description for the AMQP reply code carried by the shutdown signal.
 *
 * @param e shutdown signal received as the cause of a failed channel operation
 */
private void handleProtocolError(com.rabbitmq.client.ShutdownSignalException e) {
    int replyCode = extractReplyCode(e.getReason());
    switch (replyCode) {
        case 403:
            System.err.println("访问被拒绝");
            break;
        case 404:
            System.err.println("资源不存在");
            break;
        case 405:
            System.err.println("资源被锁定");
            break;
        case 406:
            System.err.println("前置条件失败");
            break;
        case 504:
            System.err.println("通道错误");
            break;
        default:
            System.err.println("未知协议错误: " + replyCode);
    }
}

/**
 * Reads the reply code from a channel- or connection-level close method.
 *
 * @return the reply code, or 0 when the reason is absent or not a close method
 */
private static int extractReplyCode(com.rabbitmq.client.Method reason) {
    if (reason instanceof com.rabbitmq.client.AMQP.Channel.Close) {
        return ((com.rabbitmq.client.AMQP.Channel.Close) reason).getReplyCode();
    }
    if (reason instanceof com.rabbitmq.client.AMQP.Connection.Close) {
        return ((com.rabbitmq.client.AMQP.Connection.Close) reason).getReplyCode();
    }
    return 0;
}
}
2.2 错误码详解
text
┌─────────────────────────────────────────────────────────────────────┐
│ AMQP 错误码 │
├──────────────┬──────────────────────────────────────────────────────┤
│ 错误码 │ 说明 │
├──────────────┼──────────────────────────────────────────────────────┤
│ 311 │ CONTENT_TOO_LARGE - 消息内容过大 │
│ 312 │ NO_ROUTE - 消息无法路由 │
│ 313 │ NO_CONSUMERS - 没有消费者 │
├──────────────┼──────────────────────────────────────────────────────┤
│ 403 │ ACCESS_REFUSED - 访问被拒绝 │
│ 404 │ NOT_FOUND - 资源不存在 │
│ 405 │ RESOURCE_LOCKED - 资源被锁定 │
│ 406 │ PRECONDITION_FAILED - 前置条件失败 │
├──────────────┼──────────────────────────────────────────────────────┤
│ 501 │ FRAME_ERROR - 帧错误 │
│ 502 │ SYNTAX_ERROR - 语法错误 │
│ 503 │ COMMAND_INVALID - 命令无效 │
│ 504 │ CHANNEL_ERROR - 通道错误 │
│ 505 │ UNEXPECTED_FRAME - 意外的帧 │
│ 506 │ RESOURCE_ERROR - 资源错误 │
├──────────────┼──────────────────────────────────────────────────────┤
│ 530 │ NOT_ALLOWED - 不允许的操作 │
│ 540 │ NOT_IMPLEMENTED - 未实现 │
│ 541 │ INTERNAL_ERROR - 内部错误 │
└──────────────┴──────────────────────────────────────────────────────┘
2.3 异常处理器
java
import com.rabbitmq.client.*;
import java.io.IOException;
public class CustomExceptionHandler implements ExceptionHandler {
/**
 * Reports an unexpected connection-driver exception: prints the connection's
 * client-provided name, the exception type and message, then logs it and
 * raises a monitoring alert.
 */
@Override
public void handleUnexpectedConnectionDriverException(Connection conn, Throwable exception) {
    System.err.println("=== 连接驱动异常 ===");
    final String connectionName = conn.getClientProvidedName();
    System.err.println("连接: " + connectionName);
    final String exceptionType = exception.getClass().getName();
    System.err.println("异常: " + exceptionType);
    final String detail = exception.getMessage();
    System.err.println("消息: " + detail);
    logException("connection_driver", exception);
    notifyMonitoring("connection_error", exception);
}
/** Reports an exception raised by a return listener: prints the channel number and message, then logs it. */
@Override
public void handleReturnListenerException(Channel channel, Throwable exception) {
    System.err.println("=== Return 监听器异常 ===");
    final int channelNumber = channel.getChannelNumber();
    System.err.println("通道: " + channelNumber);
    final String detail = exception.getMessage();
    System.err.println("异常: " + detail);
    logException("return_listener", exception);
}
/**
 * Reports an exception raised by a flow listener: prints the channel number and
 * message, then logs it.
 *
 * <p>Deliberately not annotated with {@code @Override}: flow listeners were removed
 * from the client in amqp-client 4.0, so {@code ExceptionHandler} no longer declares
 * this callback and the annotation would be a compile error on the 5.x client the
 * rest of this document targets. The method is kept so existing callers still compile.
 */
public void handleFlowListenerException(Channel channel, Throwable exception) {
    System.err.println("=== Flow 监听器异常 ===");
    System.err.println("通道: " + channel.getChannelNumber());
    System.err.println("异常: " + exception.getMessage());
    logException("flow_listener", exception);
}
/** Reports an exception raised by a confirm listener: prints the channel number and message, then logs it. */
@Override
public void handleConfirmListenerException(Channel channel, Throwable exception) {
    System.err.println("=== Confirm 监听器异常 ===");
    final int channelNumber = channel.getChannelNumber();
    System.err.println("通道: " + channelNumber);
    final String detail = exception.getMessage();
    System.err.println("异常: " + detail);
    logException("confirm_listener", exception);
}
/** Reports an exception raised by a blocked listener: prints the message, then logs it. */
@Override
public void handleBlockedListenerException(Connection connection, Throwable exception) {
    System.err.println("=== Blocked 监听器异常 ===");
    final String detail = exception.getMessage();
    System.err.println("异常: " + detail);
    logException("blocked_listener", exception);
}
/**
 * Reports an exception raised by a consumer callback: prints the channel number,
 * the consumer tag, the name of the failing method and the message, then logs it.
 */
@Override
public void handleConsumerException(Channel channel, Throwable exception,
                                    Consumer consumer, String consumerTag,
                                    String methodName) {
    System.err.println("=== 消费者异常 ===");
    final int channelNumber = channel.getChannelNumber();
    System.err.println("通道: " + channelNumber);
    System.err.println("消费者标签: " + consumerTag);
    System.err.println("方法: " + methodName);
    final String detail = exception.getMessage();
    System.err.println("异常: " + detail);
    logException("consumer", exception);
}
/** Reports an exception raised during connection recovery: prints the message, then logs it. */
@Override
public void handleConnectionRecoveryException(Connection conn, Throwable exception) {
    System.err.println("=== 连接恢复异常 ===");
    final String detail = exception.getMessage();
    System.err.println("异常: " + detail);
    logException("connection_recovery", exception);
}
/** Reports an exception raised during channel recovery: prints the channel number and message, then logs it. */
@Override
public void handleChannelRecoveryException(Channel ch, Throwable exception) {
    System.err.println("=== 通道恢复异常 ===");
    final int channelNumber = ch.getChannelNumber();
    System.err.println("通道: " + channelNumber);
    final String detail = exception.getMessage();
    System.err.println("异常: " + detail);
    logException("channel_recovery", exception);
}
/**
 * Reports an exception raised during topology recovery: prints the message and
 * the recorded entity whose recovery failed, then logs it.
 */
@Override
public void handleTopologyRecoveryException(Connection conn, Channel ch,
                                            TopologyRecoveryException exception) {
    System.err.println("=== 拓扑恢复异常 ===");
    System.err.println("异常: " + exception.getMessage());
    // The failed item is exposed as getRecordedEntity() (may be null);
    // TopologyRecoveryException has no getRecoveryAttemptedItem() accessor.
    System.err.println("恢复项: " + exception.getRecordedEntity());
    logException("topology_recovery", exception);
}
/** Writes a {@code [LOG]} line with the given category and the exception's message to stderr. */
private void logException(String type, Throwable exception) {
    final String line = String.format("[LOG] %s: %s", type, exception.getMessage());
    System.err.println(line);
}
/** Writes an {@code [ALERT]} line with the given category and the exception's message to stderr. */
private void notifyMonitoring(String type, Throwable exception) {
    final String line = String.format("[ALERT] %s: %s", type, exception.getMessage());
    System.err.println(line);
}
}
2.4 重试机制
java
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/**
 * Runs an operation up to a fixed number of times, sleeping between attempts
 * with an exponentially growing, capped delay.
 *
 * <p>An attempt is retried when it fails with {@code AlreadyClosedException} or with
 * an {@code IOException} that is, or is caused by, a connect or socket-timeout
 * failure. Any other exception is propagated immediately.
 */
public class RetryHandler {
    private final int maxRetries;
    private final long initialDelay;
    private final long maxDelay;
    private final double multiplier;

    /**
     * @param maxRetries   maximum number of attempts, at least 1
     * @param initialDelay delay before the second attempt in milliseconds, not negative
     * @param maxDelay     upper bound for any delay in milliseconds, not negative
     * @param multiplier   factor applied to the delay after each failed attempt, at least 1.0
     * @throws IllegalArgumentException if an argument is outside its documented range
     */
    public RetryHandler(int maxRetries, long initialDelay,
                        long maxDelay, double multiplier) {
        if (maxRetries < 1) {
            throw new IllegalArgumentException("maxRetries must be >= 1: " + maxRetries);
        }
        if (initialDelay < 0 || maxDelay < 0) {
            throw new IllegalArgumentException(
                    "delays must be >= 0: initialDelay=" + initialDelay + ", maxDelay=" + maxDelay);
        }
        // Written as a negated >= so that NaN is rejected as well.
        if (!(multiplier >= 1.0)) {
            throw new IllegalArgumentException("multiplier must be >= 1.0: " + multiplier);
        }
        this.maxRetries = maxRetries;
        this.initialDelay = initialDelay;
        this.maxDelay = maxDelay;
        this.multiplier = multiplier;
    }

    /**
     * Executes the operation, retrying on retryable failures.
     *
     * @param operation work to execute
     * @return the operation's result from the first successful attempt
     * @throws RetryExhaustedException if every attempt failed with a retryable exception;
     *                                 the last failure is attached as the cause
     * @throws InterruptedException    if the thread is interrupted while waiting between attempts
     * @throws Exception               any non-retryable exception thrown by the operation
     */
    public <T> T executeWithRetry(RetryableOperation<T> operation) throws Exception {
        // Clamp up front so the very first wait also respects maxDelay.
        long delay = Math.min(initialDelay, maxDelay);
        Exception lastException = null;
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            try {
                return operation.execute();
            } catch (AlreadyClosedException e) {
                lastException = e;
            } catch (IOException e) {
                if (!isRetryable(e)) {
                    throw e;
                }
                lastException = e;
            }
            System.err.println("尝试 " + attempt + "/" + maxRetries +
                    " 失败: " + lastException.getMessage());
            if (attempt < maxRetries) {
                Thread.sleep(delay);
                delay = Math.min((long) (delay * multiplier), maxDelay);
            }
        }
        throw new RetryExhaustedException("重试次数耗尽", lastException);
    }

    /**
     * Tells whether the failure is a transient connectivity problem. The exception
     * itself is inspected as well as its causes, because a connect or socket-timeout
     * failure can be thrown directly rather than wrapped.
     */
    private boolean isRetryable(IOException e) {
        for (Throwable t = e; t != null; t = t.getCause()) {
            if (t instanceof java.net.ConnectException
                    || t instanceof java.net.SocketTimeoutException) {
                return true;
            }
        }
        return false;
    }

    /** Unit of work that may fail and be retried. */
    @FunctionalInterface
    public interface RetryableOperation<T> {
        T execute() throws Exception;
    }
}
class RetryExhaustedException extends Exception {
/**
 * Creates the exception.
 *
 * @param message detail message
 * @param cause   failure passed on to the superclass as the cause
 */
public RetryExhaustedException(String message, Throwable cause) {
super(message, cause);
}
}
三、代码示例
3.1 完整异常处理示例
java
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class CompleteExceptionHandlingDemo {
private Connection connection;
private Channel channel;
/**
 * Connects to the broker with automatic recovery enabled, installs shutdown
 * listeners on the connection and on a newly created channel, and prints
 * troubleshooting hints when the connection cannot be established.
 *
 * @param host     broker host
 * @param port     broker port
 * @param username user name
 * @param password password
 */
public void connect(String host, int port, String username, String password) {
    final ConnectionFactory factory = new ConnectionFactory();
    factory.setUsername(username);
    factory.setPassword(password);
    factory.setHost(host);
    factory.setPort(port);
    factory.setNetworkRecoveryInterval(5000);
    factory.setAutomaticRecoveryEnabled(true);
    factory.setExceptionHandler(new CustomExceptionHandler());
    try {
        connection = factory.newConnection("exception-demo");
        connection.addShutdownListener(signal -> {
            System.err.println("连接关闭: " + signal.getMessage());
            if (signal.isInitiatedByApplication()) {
                return;
            }
            System.err.println("异常关闭,检查服务状态");
        });
        channel = connection.createChannel();
        channel.addShutdownListener(
                signal -> System.err.println("通道关闭: " + signal.getMessage()));
        System.out.println("连接建立成功");
    } catch (java.net.ConnectException refused) {
        System.err.println("无法连接到 RabbitMQ: " + refused.getMessage());
        final String[] hints = {
            "请检查:",
            " 1. RabbitMQ 服务是否运行",
            " 2. 主机和端口是否正确",
            " 3. 防火墙是否允许连接"
        };
        for (String hint : hints) {
            System.err.println(hint);
        }
    } catch (java.net.SocketTimeoutException timedOut) {
        System.err.println("连接超时: " + timedOut.getMessage());
        System.err.println("请检查网络连接");
    } catch (IOException ioFailure) {
        System.err.println("IO 异常: " + ioFailure.getMessage());
        handleIOException(ioFailure);
    } catch (TimeoutException timeout) {
        System.err.println("超时异常: " + timeout.getMessage());
    }
}
/**
 * Publishes a persistent text message and routes failures to the matching handler.
 *
 * @param exchange   target exchange
 * @param routingKey routing key
 * @param body       message payload
 * @throws IllegalStateException if no open channel is available
 */
public void publishWithExceptionHandling(String exchange, String routingKey,
                                         byte[] body) {
    try {
        final boolean usable = channel != null && channel.isOpen();
        if (!usable) {
            throw new IllegalStateException("通道未打开");
        }
        final AMQP.BasicProperties props = MessageProperties.PERSISTENT_TEXT_PLAIN;
        channel.basicPublish(exchange, routingKey, props, body);
    } catch (AlreadyClosedException closed) {
        System.err.println("通道已关闭: " + closed.getMessage());
        handleChannelClosed(closed);
    } catch (IOException ioFailure) {
        System.err.println("发布失败: " + ioFailure.getMessage());
        handlePublishError(ioFailure);
    }
}
/**
 * Prints the protocol-level reason behind an I/O failure, if there is one.
 *
 * <p>The client reports protocol errors as an {@code IOException} whose cause is a
 * {@code ShutdownSignalException}; the client package has no {@code ProtocolException}
 * type to test for.
 *
 * @param e I/O failure to inspect
 */
private void handleIOException(IOException e) {
    Throwable cause = e.getCause();
    if (cause instanceof ShutdownSignalException) {
        System.err.println("协议错误: " + cause.getMessage());
    }
}
/**
 * Reports whether the shutdown behind the exception was connection-level or
 * channel-level, then starts recovery.
 *
 * @param e exception raised by an operation on a closed channel
 */
private void handleChannelClosed(AlreadyClosedException e) {
    // The hard-error flag lives on the exception itself; getReason() returns the
    // AMQP close method, not a ShutdownSignalException.
    if (e.isHardError()) {
        System.err.println("连接级别错误");
    } else {
        System.err.println("通道级别错误");
    }
    attemptRecovery();
}
/** Placeholder publish-failure hook: only prints a notice to stderr; the exception is not inspected. */
private void handlePublishError(IOException e) {
    final String notice = "处理发布错误...";
    System.err.println(notice);
}
/** Placeholder recovery hook: only announces on stdout that recovery is being attempted. */
private void attemptRecovery() {
    final String notice = "尝试恢复连接...";
    System.out.println(notice);
}
/**
 * Closes the channel and then the connection, best effort.
 *
 * <p>Each resource is closed in its own try block so that a failure while closing
 * the channel cannot prevent the connection from being closed.
 */
public void close() {
    try {
        if (channel != null && channel.isOpen()) {
            channel.close();
        }
    } catch (Exception e) {
        System.err.println("关闭时出错: " + e.getMessage());
    }
    try {
        if (connection != null && connection.isOpen()) {
            connection.close();
        }
    } catch (Exception e) {
        System.err.println("关闭时出错: " + e.getMessage());
    }
}
}
3.2 消费者异常处理
java
import com.rabbitmq.client.*;
import java.io.IOException;
/**
 * Manual-acknowledgement consumer that maps processing failures to ack, requeue
 * or reject decisions.
 */
public class ConsumerExceptionHandling {
    private final Channel channel;

    /** @param channel channel used for consuming and for acknowledgements */
    public ConsumerExceptionHandling(Channel channel) {
        this.channel = channel;
    }

    /**
     * Starts consuming from the queue with manual acknowledgements.
     *
     * @param queueName queue to consume from
     * @throws Exception if the consumer cannot be registered
     */
    public void startConsumer(String queueName) throws Exception {
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            long deliveryTag = delivery.getEnvelope().getDeliveryTag();
            try {
                // Charset constant instead of the "UTF-8" name: no lookup by name and
                // no UnsupportedEncodingException path.
                String message = new String(delivery.getBody(),
                        java.nio.charset.StandardCharsets.UTF_8);
                processMessage(message);
                channel.basicAck(deliveryTag, false);
            } catch (RetryableBusinessException e) {
                System.err.println("可重试业务异常: " + e.getMessage());
                // requeue=true puts the message back on the queue for another delivery.
                channel.basicNack(deliveryTag, false, true);
            } catch (NonRetryableBusinessException e) {
                System.err.println("不可重试业务异常: " + e.getMessage());
                // Hand the message over before rejecting it: once rejected without
                // requeue it is no longer held for this consumer.
                sendToDeadLetterQueue(delivery, e);
                channel.basicReject(deliveryTag, false);
            } catch (Exception e) {
                System.err.println("未知异常: " + e.getMessage());
                channel.basicNack(deliveryTag, false, false);
            }
        };
        CancelCallback cancelCallback = consumerTag -> {
            System.err.println("消费者被取消: " + consumerTag);
        };
        channel.basicConsume(queueName, false, deliverCallback, cancelCallback);
    }

    /**
     * Demo processing step: fails with a retryable exception when the text contains
     * "retry", with a non-retryable one when it contains "error", and prints the
     * message otherwise.
     */
    private void processMessage(String message) throws Exception {
        if (message.contains("retry")) {
            throw new RetryableBusinessException("需要重试");
        }
        if (message.contains("error")) {
            throw new NonRetryableBusinessException("无法处理");
        }
        System.out.println("处理消息: " + message);
    }

    /** Placeholder dead-letter hook: only prints the failure message to stderr. */
    private void sendToDeadLetterQueue(Delivery delivery, Exception e) {
        System.err.println("发送到死信队列: " + e.getMessage());
    }
}
/** Checked exception that carries only a detail message. */
class RetryableBusinessException extends Exception {
/** @param message detail message passed to the superclass */
public RetryableBusinessException(String message) {
super(message);
}
}
class NonRetryableBusinessException extends Exception {
/** @param message detail message passed to the superclass */
public NonRetryableBusinessException(String message) {
super(message);
}
}
3.3 熔断器模式
java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Minimal circuit breaker with three states.
 *
 * <ul>
 *   <li>{@code CLOSED}: operations run; consecutive failures are counted.</li>
 *   <li>{@code OPEN}: operations are rejected until the timeout has elapsed since
 *       the last failure.</li>
 *   <li>{@code HALF_OPEN}: operations run as probes; a success closes the circuit,
 *       a failure opens it again.</li>
 * </ul>
 *
 * <p>State is kept in atomics and a volatile field, but state transitions are not
 * performed atomically as a whole, so several callers may probe concurrently.
 */
public class CircuitBreaker {
    private final int failureThreshold;
    private final long timeout;
    private final AtomicInteger failureCount = new AtomicInteger(0);
    // Timestamp of the most recent failure, taken from System.nanoTime().
    private final AtomicLong lastFailureTime = new AtomicLong(0);
    private volatile State state = State.CLOSED;

    /**
     * @param failureThreshold number of consecutive failures that opens the circuit, at least 1
     * @param timeout          time in milliseconds the circuit stays open, not negative
     * @throws IllegalArgumentException if an argument is outside its documented range
     */
    public CircuitBreaker(int failureThreshold, long timeout) {
        if (failureThreshold < 1) {
            throw new IllegalArgumentException(
                    "failureThreshold must be >= 1: " + failureThreshold);
        }
        if (timeout < 0) {
            throw new IllegalArgumentException("timeout must be >= 0: " + timeout);
        }
        this.failureThreshold = failureThreshold;
        this.timeout = timeout;
    }

    /**
     * Runs the operation unless the circuit is open.
     *
     * @param operation work to run
     * @throws NullPointerException        if {@code operation} is null
     * @throws CircuitBreakerOpenException if the circuit is open and the timeout has not elapsed
     * @throws Exception                   whatever the operation throws, after it was counted as a failure
     */
    public void execute(Runnable operation) throws Exception {
        // Checked before the try block so that a caller bug is not counted as a
        // failure of the protected resource.
        java.util.Objects.requireNonNull(operation, "operation");
        if (state == State.OPEN) {
            // nanoTime is monotonic; wall-clock adjustments cannot shorten or extend the wait.
            long elapsedNanos = System.nanoTime() - lastFailureTime.get();
            if (elapsedNanos > java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(timeout)) {
                state = State.HALF_OPEN;
                System.out.println("熔断器进入半开状态");
            } else {
                throw new CircuitBreakerOpenException("熔断器打开,拒绝请求");
            }
        }
        try {
            operation.run();
            onSuccess();
        } catch (Exception e) {
            onFailure();
            throw e;
        }
    }

    /** Resets the failure counter and closes the circuit after a successful probe. */
    private void onSuccess() {
        failureCount.set(0);
        if (state == State.HALF_OPEN) {
            state = State.CLOSED;
            System.out.println("熔断器关闭");
        }
    }

    /** Records a failure and opens the circuit when the threshold is reached or a probe failed. */
    private void onFailure() {
        int count = failureCount.incrementAndGet();
        lastFailureTime.set(System.nanoTime());
        // A failed probe re-opens the circuit even if the counter was reset in the
        // meantime by a slow call that succeeded while the circuit was open.
        if (state == State.HALF_OPEN || count >= failureThreshold) {
            state = State.OPEN;
            System.out.println("熔断器打开,失败次数: " + count);
        }
    }

    /** @return the current state */
    public State getState() {
        return state;
    }

    /** Circuit states. */
    public enum State {
        CLOSED,
        OPEN,
        HALF_OPEN
    }
}

/** Thrown by {@code CircuitBreaker.execute} when the circuit is open and the call is rejected. */
class CircuitBreakerOpenException extends Exception {
    /** @param message detail message passed to the superclass */
    public CircuitBreakerOpenException(String message) {
        super(message);
    }
}
class CircuitBreakerDemo {
/**
 * Demo driver: runs ten randomly failing operations through a circuit breaker
 * (threshold 3, timeout 10000 ms), pausing one second between attempts.
 */
public static void main(String[] args) {
    CircuitBreaker breaker = new CircuitBreaker(3, 10000);
    for (int i = 0; i < 10; i++) {
        try {
            breaker.execute(() -> {
                if (Math.random() > 0.5) {
                    throw new RuntimeException("模拟失败");
                }
                System.out.println("操作成功");
            });
        } catch (CircuitBreakerOpenException e) {
            System.err.println("熔断器打开: " + e.getMessage());
        } catch (Exception e) {
            System.err.println("操作失败: " + e.getMessage());
        }
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Restore the flag and stop: with the flag set, every later sleep would
            // throw immediately and the loop would spin through its remaining iterations.
            Thread.currentThread().interrupt();
            return;
        }
    }
}
}
四、实际应用场景
4.1 生产者异常处理
java
import com.rabbitmq.client.*;
import java.io.IOException;
public class ProducerExceptionHandling {
private final Channel channel;
private final CircuitBreaker circuitBreaker;
public ProducerExceptionHandling(Channel channel) {
this.channel = channel;
this.circuitBreaker = new CircuitBreaker(5, 30000);
}
/**
 * Publishes a persistent text message through the circuit breaker.
 *
 * <p>When the breaker rejects the call the message is stored for a later retry;
 * any other failure is passed to {@code handlePublishFailure}.
 *
 * @param exchange   target exchange
 * @param routingKey routing key
 * @param body       message payload
 */
public void publish(String exchange, String routingKey, byte[] body) {
    try {
        circuitBreaker.execute(() -> {
            try {
                channel.basicPublish(exchange, routingKey,
                        MessageProperties.PERSISTENT_TEXT_PLAIN, body);
            } catch (IOException e) {
                // Runnable cannot throw checked exceptions; wrap in the dedicated
                // unchecked type so the original IOException can be recovered below.
                throw new java.io.UncheckedIOException(e);
            }
        });
    } catch (CircuitBreakerOpenException e) {
        // Decode explicitly: new String(byte[]) without a charset depends on the platform default.
        System.err.println("熔断器打开,消息暂存: "
                + new String(body, java.nio.charset.StandardCharsets.UTF_8));
        storeMessageForRetry(body);
    } catch (java.io.UncheckedIOException e) {
        // Report the real I/O failure rather than the wrapper.
        IOException cause = e.getCause();
        System.err.println("发布失败: " + cause.getMessage());
        handlePublishFailure(exchange, routingKey, body, cause);
    } catch (Exception e) {
        System.err.println("发布失败: " + e.getMessage());
        handlePublishFailure(exchange, routingKey, body, e);
    }
}
/** Placeholder retry-store hook: only prints a notice to stdout; the payload is not persisted here. */
private void storeMessageForRetry(byte[] body) {
    final String notice = "存储消息等待重试";
    System.out.println(notice);
}
/** Placeholder publish-failure hook: only prints a notice to stderr; none of the arguments is used. */
private void handlePublishFailure(String exchange, String routingKey,
                                  byte[] body, Exception e) {
    final String notice = "处理发布失败";
    System.err.println(notice);
}
}
4.2 消费者异常处理
java
import com.rabbitmq.client.*;
import java.util.Map;
/**
 * Manual-acknowledgement consumer that classifies processing failures into
 * validation, temporary, permanent and unknown errors.
 *
 * <p>{@code IOException} is fully qualified throughout because this snippet's import
 * list does not include it.
 */
public class ConsumerExceptionHandlingDemo {
    private final Channel channel;

    /** @param channel channel used for consuming and for acknowledgements */
    public ConsumerExceptionHandlingDemo(Channel channel) {
        this.channel = channel;
    }

    /**
     * Sets a prefetch count of 10 and starts consuming with manual acknowledgements.
     *
     * @param queueName queue to consume from
     * @throws Exception if the prefetch setting or the consumer registration fails
     */
    public void start(String queueName) throws Exception {
        channel.basicQos(10);
        DeliverCallback callback = (tag, delivery) -> {
            MessageContext context = new MessageContext(delivery);
            try {
                process(context);
                ack(context);
            } catch (ValidationException e) {
                System.err.println("验证失败: " + e.getMessage());
                reject(context, false);
            } catch (TemporaryException e) {
                System.err.println("临时错误,重试: " + e.getMessage());
                nack(context, true);
            } catch (PermanentException e) {
                System.err.println("永久错误,拒绝: " + e.getMessage());
                // Hand the message over before rejecting it: once rejected without
                // requeue it is no longer held for this consumer.
                sendToDlq(context, e);
                reject(context, false);
            } catch (Exception e) {
                System.err.println("未知错误: " + e.getMessage());
                nack(context, false);
            }
        };
        channel.basicConsume(queueName, false, callback, tag -> {});
    }

    /** Demo processing step: prints the message body. */
    private void process(MessageContext context) throws Exception {
        System.out.println("处理消息: " + context.getBody());
    }

    /** Acknowledges the single delivery held by the context. */
    private void ack(MessageContext context) throws java.io.IOException {
        channel.basicAck(context.getDeliveryTag(), false);
    }

    /** Negatively acknowledges the single delivery, optionally requeueing it. */
    private void nack(MessageContext context, boolean requeue) throws java.io.IOException {
        channel.basicNack(context.getDeliveryTag(), false, requeue);
    }

    /** Rejects the delivery, optionally requeueing it. */
    private void reject(MessageContext context, boolean requeue) throws java.io.IOException {
        channel.basicReject(context.getDeliveryTag(), requeue);
    }

    /** Placeholder dead-letter hook: only prints a notice to stderr. */
    private void sendToDlq(MessageContext context, Exception e) {
        System.err.println("发送到死信队列");
    }
}
/** Read-only view of a delivery exposing its delivery tag and its body as text. */
class MessageContext {
    private final Delivery delivery;

    /** @param delivery delivery to wrap */
    public MessageContext(Delivery delivery) {
        this.delivery = delivery;
    }

    /** @return the delivery tag taken from the delivery's envelope */
    public long getDeliveryTag() {
        return delivery.getEnvelope().getDeliveryTag();
    }

    /**
     * @return the body decoded as UTF-8; decoding without an explicit charset would
     *         depend on the platform default
     */
    public String getBody() {
        return new String(delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8);
    }
}
/** Checked exception that carries only a detail message. */
class ValidationException extends Exception {
public ValidationException(String message) { super(message); }
}
/** Checked exception that carries only a detail message. */
class TemporaryException extends Exception {
public TemporaryException(String message) { super(message); }
}
class PermanentException extends Exception {
/** @param message detail message passed to the superclass */
public PermanentException(String message) { super(message); }
}
五、常见问题与解决方案
5.1 连接频繁断开
问题描述: 连接频繁断开重连。
解决方案:
java
// Create a factory and turn on automatic connection recovery.
ConnectionFactory factory = new ConnectionFactory();
factory.setAutomaticRecoveryEnabled(true);
// Recovery interval; the client API documents this value in milliseconds.
factory.setNetworkRecoveryInterval(10000);
factory.setRequestedHeartbeat(60);
5.2 通道意外关闭
问题描述: 通道在使用过程中意外关闭。
解决方案:
java
// Register a listener that reacts only to shutdowns not initiated by the application.
// NOTE(review): `log` and `notifyAdmin` are not defined in this snippet; they are
// presumably a logger and an alerting helper supplied by the surrounding class.
channel.addShutdownListener(cause -> {
if (!cause.isInitiatedByApplication()) {
log.error("通道异常关闭: {}", cause.getMessage());
notifyAdmin("通道异常关闭");
}
});
5.3 消息处理异常
问题描述: 消息处理过程中抛出异常。
解决方案:
java
// Acknowledge only after processing succeeded; map failures to requeue or reject.
// NOTE(review): RetryableException, NonRetryableException and sendToDlq are not
// defined in this snippet; confirm against the surrounding code.
try {
processMessage(message);
channel.basicAck(deliveryTag, false);
} catch (RetryableException e) {
// Third argument true: negative acknowledgement with requeue.
channel.basicNack(deliveryTag, false, true);
} catch (NonRetryableException e) {
// Second argument false: reject without requeue, then hand over to the DLQ helper.
channel.basicReject(deliveryTag, false);
sendToDlq(message, e);
}
六、最佳实践建议
6.1 异常处理原则
text
处理原则:
├── 区分可恢复和不可恢复异常
├── 实现适当的重试机制
├── 记录详细的错误日志
├── 监控异常发生频率
└── 实现优雅降级
6.2 日志记录
text
日志建议:
├── 记录异常类型和消息
├── 记录异常发生时间
├── 记录相关上下文信息
├── 记录堆栈跟踪
└── 使用合适的日志级别
6.3 监控告警
text
监控建议:
├── 监控连接状态
├── 监控异常频率
├── 监控消息处理失败率
├── 设置合适的告警阈值
└── 实现自动恢复机制