Spring AMQP

1. Overview

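Spring AMQP is part of the Spring ecosystem. It provides a high-level abstraction over the AMQP protocol and simplifies working with RabbitMQ. Built on core Spring concepts, it offers message listener containers, message converters, transaction management, and related features.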

1.1 Spring AMQP Architecture

┌─────────────────────────────────────────────────────────────────────┐
│                     Spring AMQP Architecture                        │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                    Spring Application                        │   │
│  └──────────────────────────────┬──────────────────────────────┘   │
│                                 │                                   │
│                                 ▼                                   │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                     Spring AMQP                              │   │
│  │  ┌─────────────┐ ┌─────────────┐ ┌─────────────┐           │   │
│  │  │Rabbit-      │ │MessageList- │ │  SimpleMess-│           │   │
│  │  │Template     │ │enerContainer│ │ageListener  │           │   │
│  │  └─────────────┘ └─────────────┘ └─────────────┘           │   │
│  │  ┌─────────────┐ ┌─────────────┐ ┌─────────────┐           │   │
│  │  │MessageConv- │ │ RabbitAdmin │ │CachingConne-│           │   │
│  │  │erter        │ │             │ │ctionFactory │           │   │
│  │  └─────────────┘ └─────────────┘ └─────────────┘           │   │
│  └──────────────────────────────┬──────────────────────────────┘   │
│                                 │                                   │
│                                 ▼                                   │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                   RabbitMQ Java Client                       │   │
│  └──────────────────────────────┬──────────────────────────────┘   │
│                                 │                                   │
│                                 ▼                                   │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                     RabbitMQ Server                          │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

1.2 Core Components

text
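Core components:
├── CachingConnectionFactory: connection factory that caches channels (and, optionally, connections)
├── RabbitAdmin: declares and manages exchanges, queues, and bindings
├── RabbitTemplate: template for sending and receiving messages
├── SimpleMessageListenerContainer: message listener container
├── MessageConverter: converts between Java objects and messages
└── @RabbitListener: annotation-driven consumer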

1.3 Dependency Configuration

xml
<dependencies>
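    <!-- Spring Boot starter: pulls in spring-amqp and spring-rabbit transitively -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
    <!-- Only needed without the starter; the version is assumed to be managed
         by Spring Boot's dependency management -->
    <dependency>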
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
    </dependency>
</dependencies>

2. Core Concepts

2.1 CachingConnectionFactory

2.1.1 Basic Configuration

java
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
    
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        
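        // Broker address and credentials (defaults of a local RabbitMQ install)
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        
        // connectionCacheSize only takes effect when cacheMode is CONNECTION;
        // in the default CHANNEL mode one connection is shared and channels are cached.
        factory.setConnectionCacheSize(5);
        factory.setChannelCacheSize(25);
        
        // Enable correlated publisher confirms and returns for unroutable messages
        factory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        factory.setPublisherReturns(true);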
        
        return factory;
    }
}

2.1.2 Configuration Properties

text
┌─────────────────────────────────────────────────────────────────────┐
│          CachingConnectionFactory Configuration Properties          │
├────────────────────────┬────────────────────────────────────────────┤
│        Property        │                Description                 │
├────────────────────────┼────────────────────────────────────────────┤
│  host                  │ RabbitMQ server host                       │
│  port                  │ RabbitMQ server port                       │
│  username              │ User name                                  │
│  password              │ Password                                   │
│  virtualHost           │ Virtual host                               │
├────────────────────────┼────────────────────────────────────────────┤
│  connectionCacheSize   │ Connection cache size (CONNECTION mode)    │
│  channelCacheSize      │ Channel cache size                         │
├────────────────────────┼────────────────────────────────────────────┤
│  publisherConfirmType  │ Publisher confirm type                     │
│  publisherReturns      │ Whether publisher returns are enabled      │
├────────────────────────┼────────────────────────────────────────────┤
│  connectionTimeout     │ Connection timeout (ms)                    │
│  requestedHeartBeat    │ Requested heartbeat interval (s)           │
└────────────────────────┴────────────────────────────────────────────┘

2.2 RabbitAdmin

java
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitAdminConfig {
    
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin admin = new RabbitAdmin(connectionFactory);
        admin.setAutoStartup(true);
        return admin;
    }
    
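    // Durable queue with a 24-hour message TTL (in ms). Expired or rejected
    // messages are routed to dlx.exchange, which must be declared separately.
    @Bean
    public Queue orderQueue() {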
        return QueueBuilder.durable("order.queue")
            .withArgument("x-message-ttl", 86400000)
            .withArgument("x-dead-letter-exchange", "dlx.exchange")
            .build();
    }
    
    @Bean
    public DirectExchange orderExchange() {
        return ExchangeBuilder.directExchange("order.exchange")
            .durable(true)
            .build();
    }
    
    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(orderQueue())
            .to(orderExchange())
            .with("order.created");
    }
}

2.3 RabbitTemplate

2.3.1 Basic Usage

java
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Component
public class MessageProducer {
    
    private final RabbitTemplate rabbitTemplate;
    
    public MessageProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }
    
    // Send a raw Message; the body is encoded explicitly as UTF-8
    public void sendSimpleMessage(String exchange, String routingKey, String message) {
        rabbitTemplate.send(exchange, routingKey, 
            MessageBuilder.withBody(
                message.getBytes(java.nio.charset.StandardCharsets.UTF_8)).build());
    }
    
    // Convert the payload with the configured MessageConverter, then send it
    public void convertAndSend(String exchange, String routingKey, Object message) {
        rabbitTemplate.convertAndSend(exchange, routingKey, message);
    }
    
    // Convert and send, adjusting message properties in a MessagePostProcessor
    public void convertAndSendWithCallback(String exchange, String routingKey, 
                                           Object message) {
        rabbitTemplate.convertAndSend(exchange, routingKey, message, msg -> {
            msg.getMessageProperties().setMessageId(java.util.UUID.randomUUID().toString());
            msg.getMessageProperties().setContentType("application/json");
            msg.getMessageProperties().setPriority(5);
            return msg;
        });
    }
    
    // Poll the queue once; returns null if no message is available
    public Object receiveAndConvert(String queueName) {
        return rabbitTemplate.receiveAndConvert(queueName);
    }
    
    // Request/reply (RPC): blocks for a reply and returns null on timeout
    public Object sendAndReceive(String exchange, String routingKey, Object message) {
        return rabbitTemplate.convertSendAndReceive(exchange, routingKey, message);
    }
}

2.3.2 Publisher Confirms

java
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import java.util.UUID;

@Component
public class ConfirmProducer implements RabbitTemplate.ConfirmCallback {
    
    private final RabbitTemplate rabbitTemplate;
    
    public ConfirmProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setConfirmCallback(this);
    }
    
    public void sendWithConfirm(String exchange, String routingKey, Object message) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        
        rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
    }
    
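    // Invoked asynchronously once the broker acks or nacks a published message.
    // Note: a RabbitTemplate accepts only one ConfirmCallback.
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        // correlationData is null for messages sent without CorrelationData
        String id = correlationData != null ? correlationData.getId() : "<none>";
        if (ack) {
            System.out.println("Message confirmed: " + id);
        } else {
            System.err.println("Message not confirmed: " + id + ", cause: " + cause);
        }
    }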
}
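
Because the confirm arrives asynchronously, a caller that needs the outcome has to correlate it with the message by ID. The following is a minimal, broker-free sketch of that bookkeeping. `PendingConfirms` is a hypothetical helper, not a Spring AMQP class; the assumption is that `onConfirm` would be called from `ConfirmCallback.confirm` with `correlationData.getId()`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical helper (not a Spring AMQP class): tracks in-flight messages by
// correlation ID and completes a future when the matching confirm arrives.
final class PendingConfirms {

    private final ConcurrentMap<String, CompletableFuture<Boolean>> pending =
            new ConcurrentHashMap<>();

    // Call before publishing. The future completes with true on ack, false on nack.
    CompletableFuture<Boolean> register(String correlationId) {
        CompletableFuture<Boolean> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        return future;
    }

    // Call from the confirm callback. Null or unknown IDs are ignored.
    void onConfirm(String correlationId, boolean ack) {
        if (correlationId == null) {
            return;
        }
        CompletableFuture<Boolean> future = pending.remove(correlationId);
        if (future != null) {
            future.complete(ack);
        }
    }

    // Number of messages still waiting for a confirm
    int inFlight() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingConfirms confirms = new PendingConfirms();
        CompletableFuture<Boolean> result = confirms.register("msg-1");
        confirms.onConfirm("msg-1", true); // simulates a broker ack
        System.out.println("acked = " + result.join()
                + ", in flight = " + confirms.inFlight());
    }
}
```

Spring AMQP's own `CorrelationData` also exposes a future through `getFuture()`, which may make a separate map unnecessary in simple cases.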

2.3.3 Returns Callback

java
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;

@Component
public class ReturnProducer implements RabbitTemplate.ReturnsCallback {
    
    private final RabbitTemplate rabbitTemplate;
    
    public ReturnProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setReturnsCallback(this);
        // mandatory = true makes the broker return unroutable messages
        rabbitTemplate.setMandatory(true);
    }
    
    // Called when the broker could not route a message to any queue
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        System.err.println("Message returned:");
        System.err.println("  Exchange: " + returned.getExchange());
        System.err.println("  Routing key: " + returned.getRoutingKey());
        System.err.println("  Reply code: " + returned.getReplyCode());
        System.err.println("  Reply text: " + returned.getReplyText());
        System.err.println("  Message: "
            + new String(returned.getMessage().getBody(), StandardCharsets.UTF_8));
    }
}

2.4 SimpleMessageListenerContainer

java
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.nio.charset.StandardCharsets;

@Configuration
public class ListenerContainerConfig {
    
    @Bean
    public SimpleMessageListenerContainer listenerContainer(
            ConnectionFactory connectionFactory) {
        
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames("order.queue", "payment.queue");
        
        // MANUAL mode needs access to the Channel, so use a
        // ChannelAwareMessageListener and ack each delivery explicitly.
        container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {
            System.out.println("Received message: "
                + new String(message.getBody(), StandardCharsets.UTF_8));
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        });
        
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setPrefetchCount(10);          // max unacked messages per consumer
        container.setConcurrentConsumers(3);     // initial number of consumers
        container.setMaxConcurrentConsumers(10); // upper bound when scaling up
        
        container.setAutoStartup(true);
        
        return container;
    }
}

2.5 Message Converters

2.5.1 Jackson2JsonMessageConverter

java
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MessageConverterConfig {
    
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
    
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
                                         MessageConverter jsonMessageConverter) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(jsonMessageConverter);
        return template;
    }
}

2.5.2 Custom Message Converter

java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import java.nio.charset.StandardCharsets;

public class CustomMessageConverter implements MessageConverter {
    
    // Java object -> AMQP message; only String and byte[] payloads are supported
    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) 
            throws MessageConversionException {
        
        if (object instanceof String) {
            messageProperties.setContentType("text/plain");
            return new Message(((String) object).getBytes(StandardCharsets.UTF_8),
                messageProperties);
        }
        
        if (object instanceof byte[]) {
            messageProperties.setContentType("application/octet-stream");
            return new Message((byte[]) object, messageProperties);
        }
        
        throw new MessageConversionException("Unsupported object type: "
            + (object == null ? "null" : object.getClass().getName()));
    }
    
    // AMQP message -> Java object, selected by content type
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        String contentType = message.getMessageProperties().getContentType();
        
        if ("text/plain".equals(contentType)) {
            return new String(message.getBody(), StandardCharsets.UTF_8);
        }
        
        // application/octet-stream and unknown content types: return the raw bytes
        return message.getBody();
    }
}

3. Code Examples

3.1 Complete Configuration Example

java
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class CompleteRabbitMQConfig {
    
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        
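        // connectionCacheSize only takes effect when cacheMode is CONNECTION
        factory.setConnectionCacheSize(5);
        factory.setChannelCacheSize(25);
        // Required for the confirm and returns callbacks on RabbitTemplate
        factory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        factory.setPublisherReturns(true);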
        
        return factory;
    }
    
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
    
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
                                         MessageConverter jsonMessageConverter) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(jsonMessageConverter);
        template.setMandatory(true);
        return template;
    }
    
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }
    
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory) {
        
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(jsonMessageConverter());
        factory.setConcurrentConsumers(3);
        factory.setMaxConcurrentConsumers(10);
        factory.setPrefetchCount(10);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        
        return factory;
    }
    
    @Bean
    public Queue orderQueue() {
        return QueueBuilder.durable("order.queue")
            .withArgument("x-message-ttl", 86400000)
            .withArgument("x-dead-letter-exchange", "dlx.exchange")
            .build();
    }
    
    @Bean
    public DirectExchange orderExchange() {
        return new DirectExchange("order.exchange", true, false);
    }
    
    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(orderQueue())
            .to(orderExchange())
            .with("order.created");
    }
}

3.2 Message Producer Service

java
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

@Service
public class MessageProducerService {
    
    private final RabbitTemplate rabbitTemplate;
    private final ConcurrentMap<String, MessageStatus> pendingMessages = new ConcurrentHashMap<>();
    
    public MessageProducerService(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        
        // Confirm callback: runs when the broker acks or nacks a published message
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (correlationData != null) {
                String id = correlationData.getId();
                // The message is no longer pending once its confirm arrives
                MessageStatus status = pendingMessages.remove(id);
                
                if (ack) {
                    System.out.println("Message confirmed: " + id);
                    if (status != null) {
                        status.setConfirmed(true);
                    }
                } else {
                    System.err.println("Message not confirmed: " + id + ", cause: " + cause);
                    if (status != null) {
                        status.setError(cause);
                    }
                }
            }
        });
        
        // Returns callback: runs when a mandatory message could not be routed
        rabbitTemplate.setReturnsCallback(returned -> {
            System.err.println("Message returned: " + returned.getReplyText());
        });
        
        rabbitTemplate.setMandatory(true);
    }
    
    public void sendOrderMessage(Order order) {
        String messageId = UUID.randomUUID().toString();
        CorrelationData correlationData = new CorrelationData(messageId);
        
        pendingMessages.put(messageId, new MessageStatus(messageId));
        
        rabbitTemplate.convertAndSend(
            "order.exchange",
            "order.created",
            order,
            message -> {
                message.getMessageProperties().setMessageId(messageId);
                message.getMessageProperties().setContentType("application/json");
                message.getMessageProperties().setPriority(5);
                return message;
            },
            correlationData
        );
        
        System.out.println("Order message sent: " + order.getOrderId());
    }
    
    // Retry failed sends with a linearly growing delay (1 s, 2 s, ...)
    public void sendWithRetry(String exchange, String routingKey, Object message, 
                              int maxRetries) {
        
        int attempt = 0;
        Exception lastException = null;
        
        while (attempt < maxRetries) {
            try {
                rabbitTemplate.convertAndSend(exchange, routingKey, message);
                return;
            } catch (Exception e) {
                lastException = e;
                attempt++;
                
                if (attempt < maxRetries) {
                    try {
                        Thread.sleep(1000 * attempt);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
        
        throw new RuntimeException("Send failed: retries exhausted", lastException);
    }
}

class MessageStatus {
    private final String messageId;
    private volatile boolean confirmed = false;
    private volatile String error;
    
    public MessageStatus(String messageId) {
        this.messageId = messageId;
    }
    
    public String getMessageId() { return messageId; }
    public boolean isConfirmed() { return confirmed; }
    public void setConfirmed(boolean confirmed) { this.confirmed = confirmed; }
    public String getError() { return error; }
    public void setError(String error) { this.error = error; }
}

class Order {
    private String orderId;
    private String userId;
    private double amount;
    
    public String getOrderId() { return orderId; }
    public void setOrderId(String orderId) { this.orderId = orderId; }
    public String getUserId() { return userId; }
    public void setUserId(String userId) { this.userId = userId; }
    public double getAmount() { return amount; }
    public void setAmount(double amount) { this.amount = amount; }
}
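
The retry loop in `sendWithRetry` sleeps `1000 * attempt` milliseconds after each failed attempt, which is a linear backoff. The same idea can be sketched without a broker so the schedule is easy to inspect. `LinearBackoffRetry` and its parameters are hypothetical names introduced for illustration; the action stands in for `rabbitTemplate.convertAndSend(...)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical helper (not part of Spring AMQP): retries an action with a
// linearly growing delay, mirroring the 1000 * attempt sleep in sendWithRetry.
final class LinearBackoffRetry {

    // Delays (ms) slept after each failed attempt; none follows the last attempt.
    static List<Long> delays(int maxAttempts, long baseDelayMs) {
        List<Long> result = new ArrayList<>();
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            result.add(baseDelayMs * attempt);
        }
        return result;
    }

    // Runs the action up to maxAttempts times, sleeping between failures.
    static <T> T call(Supplier<T> action, int maxAttempts, long baseDelayMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                last = e;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(baseDelayMs * attempt);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // keep the interrupt status
                    break;
                }
            }
        }
        throw new IllegalStateException("Send failed: retries exhausted or interrupted", last);
    }

    public static void main(String[] args) {
        System.out.println("delays = " + delays(4, 1000));
        int[] calls = {0};
        String result = call(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("broker unavailable");
            }
            return "sent";
        }, 5, 10);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

Note that this kind of retry only covers exceptions thrown synchronously by the send call; a broker nack arrives later through the confirm callback and needs separate handling.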

3.3 Message Consumer Service

java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import com.rabbitmq.client.Channel;
import java.io.IOException;

@Service
public class MessageConsumerService {
    
    // Requires AcknowledgeMode.MANUAL on the listener container factory
    @RabbitListener(queues = "order.queue")
    public void handleOrderMessage(Order order, Message message, Channel channel) 
            throws IOException {
        
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        
        try {
            System.out.println("Received order message: " + order.getOrderId());
            
            processOrder(order);
            
            // Success: acknowledge this delivery only (multiple = false)
            channel.basicAck(deliveryTag, false);
            System.out.println("Order processed: " + order.getOrderId());
            
        } catch (RetryableException e) {
            // Transient failure: requeue for redelivery (loops if it keeps failing)
            System.err.println("Retryable error: " + e.getMessage());
            channel.basicNack(deliveryTag, false, true);
            
        } catch (Exception e) {
            // Permanent failure: reject without requeue (dead-lettered if a DLX is set)
            System.err.println("Processing failed: " + e.getMessage());
            channel.basicNack(deliveryTag, false, false);
        }
    }
    
    private void processOrder(Order order) throws Exception {
        System.out.println("Processing order: " + order.getOrderId());
    }
}

class RetryableException extends Exception {
    public RetryableException(String message) {
        super(message);
    }
}
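
Requeueing on every retryable failure can redeliver the same message indefinitely. One way to bound this is to cap the number of attempts and dead-letter the message afterwards. The sketch below only models the decision; `AckDecision` is a hypothetical helper, and the attempt count is assumed to be tracked by the caller (for example in a message header), which the consumer above does not do.

```java
// Hypothetical helper (not part of Spring AMQP): decides how a delivery
// should be settled, with a cap on requeue attempts.
final class AckDecision {

    enum Action {
        ACK,              // channel.basicAck(tag, false)
        NACK_REQUEUE,     // channel.basicNack(tag, false, true)
        NACK_DEAD_LETTER  // channel.basicNack(tag, false, false)
    }

    // attempt is 1 for the first delivery; maxAttempts caps retryable failures.
    static Action decide(boolean succeeded, boolean retryable,
                         int attempt, int maxAttempts) {
        if (succeeded) {
            return Action.ACK;
        }
        if (retryable && attempt < maxAttempts) {
            return Action.NACK_REQUEUE;
        }
        return Action.NACK_DEAD_LETTER;
    }

    public static void main(String[] args) {
        System.out.println(decide(true, false, 1, 3));
        System.out.println(decide(false, true, 1, 3));
        System.out.println(decide(false, true, 3, 3));
        System.out.println(decide(false, false, 1, 3));
    }
}
```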

4. Practical Use Cases

4.1 Order Processing System

java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.rabbitmq.client.Channel;
import java.io.IOException;

@Service
public class OrderProcessingService {
    
    private final RabbitTemplate rabbitTemplate;
    
    public OrderProcessingService(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }
    
    public void publishOrderCreated(Order order) {
        publish("order.created", order);
    }
    
    public void publishOrderPaid(Order order) {
        publish("order.paid", order);
    }
    
    public void publishOrderShipped(Order order) {
        publish("order.shipped", order);
    }
    
    // Wrap the order in an OrderEvent so the payload matches the listener's
    // parameter type. order.queue must be bound to all three routing keys.
    private void publish(String eventType, Order order) {
        OrderEvent event = new OrderEvent();
        event.setEventType(eventType);
        event.setOrder(order);
        rabbitTemplate.convertAndSend("order.exchange", eventType, event);
    }
    
    // Exceptions are caught below, so @Transactional does not roll back automatically
    @RabbitListener(queues = "order.queue")
    @Transactional
    public void handleOrderEvent(OrderEvent event, Message message, 
                                 Channel channel) throws IOException {
        
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        
        try {
            switch (event.getEventType()) {
                case "order.created":
                    handleOrderCreated(event.getOrder());
                    break;
                case "order.paid":
                    handleOrderPaid(event.getOrder());
                    break;
                case "order.shipped":
                    handleOrderShipped(event.getOrder());
                    break;
                default:
                    System.err.println("Unknown event type: " + event.getEventType());
            }
            
            channel.basicAck(deliveryTag, false);
            
        } catch (Exception e) {
            // Reject without requeue; the message is dead-lettered if a DLX is set
            channel.basicNack(deliveryTag, false, false);
        }
    }
    
    private void handleOrderCreated(Order order) {
        System.out.println("Handling order created: " + order.getOrderId());
    }
    
    private void handleOrderPaid(Order order) {
        System.out.println("Handling order paid: " + order.getOrderId());
    }
    
    private void handleOrderShipped(Order order) {
        System.out.println("Handling order shipped: " + order.getOrderId());
    }
}

class OrderEvent {
    private String eventType;
    private Order order;
    
    public String getEventType() { return eventType; }
    public void setEventType(String eventType) { this.eventType = eventType; }
    public Order getOrder() { return order; }
    public void setOrder(Order order) { this.order = order; }
}

4.2 Log Collection System

java
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;

@Service
public class LogCollectionService {
    
    private final RabbitTemplate rabbitTemplate;
    
    public LogCollectionService(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }
    
    public void log(String service, String level, String message) {
        LogEntry entry = new LogEntry();
        entry.setService(service);
        entry.setLevel(level);
        entry.setMessage(message);
        entry.setTimestamp(LocalDateTime.now());
        
        // Routing key format: <service>.<level>, for example "order-service.info"
        String routingKey = service + "." + level.toLowerCase();
        rabbitTemplate.convertAndSend("logs.exchange", routingKey, entry);
    }
    
    public void logError(String service, String message, Throwable throwable) {
        LogEntry entry = new LogEntry();
        entry.setService(service);
        entry.setLevel("ERROR");
        entry.setMessage(message + "\n" + getStackTrace(throwable));
        entry.setTimestamp(LocalDateTime.now());
        
        rabbitTemplate.convertAndSend("logs.exchange", service + ".error", entry);
    }
    
    // Assumes logs.error.queue is bound to logs.exchange with a pattern such as "*.error"
    @RabbitListener(queues = "logs.error.queue")
    public void handleErrorLogs(LogEntry entry) {
        System.err.println("[ERROR] " + entry.getService() + ": " + entry.getMessage());
        sendAlert(entry);
    }
    
    // Assumes logs.all.queue is bound with "#" so that it receives every log entry
    @RabbitListener(queues = "logs.all.queue")
    public void handleAllLogs(LogEntry entry) {
        System.out.println("[" + entry.getLevel() + "] " + 
            entry.getService() + ": " + entry.getMessage());
    }
    
    private void sendAlert(LogEntry entry) {
        System.err.println("Sending alert: " + entry.getMessage());
    }
    
    private String getStackTrace(Throwable throwable) {
        StringBuilder sb = new StringBuilder();
        for (StackTraceElement element : throwable.getStackTrace()) {
            sb.append(element).append("\n");
        }
        return sb.toString();
    }
}

class LogEntry {
    private String service;
    private String level;
    private String message;
    private LocalDateTime timestamp;
    
    public String getService() { return service; }
    public void setService(String service) { this.service = service; }
    public String getLevel() { return level; }
    public void setLevel(String level) { this.level = level; }
    public String getMessage() { return message; }
    public void setMessage(String message) { this.message = message; }
    public LocalDateTime getTimestamp() { return timestamp; }
    public void setTimestamp(LocalDateTime timestamp) { this.timestamp = timestamp; }
}
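
The service above publishes with routing keys of the form `<service>.<level>`, but the bindings that feed `logs.error.queue` and `logs.all.queue` are not shown. Assuming `logs.exchange` is a topic exchange with the hypothetical binding patterns `*.error` and `#`, the sketch below mimics topic matching in plain Java (`*` matches exactly one word, `#` matches zero or more words). `TopicPattern` is an illustrative helper, not a Spring AMQP or RabbitMQ API.

```java
// Hypothetical helper that mimics how a topic exchange matches routing keys
// against binding patterns. Words are separated by dots.
final class TopicPattern {

    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern consumed: key must be consumed too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible split
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still expects a word, but the key has ended
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        String[] keys = {"order-service.error", "order-service.info"};
        for (String key : keys) {
            System.out.println(key
                    + " -> error queue: " + matches("*.error", key)
                    + ", all queue: " + matches("#", key));
        }
    }
}
```

With these assumed bindings, an error entry reaches both queues, while entries at other levels reach only `logs.all.queue`.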

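5. Common Problems and Solutions

5.1 Message Serialization Failure

Problem: Serializing a message throws an exception, for example when the payload contains java.time types such as LocalDateTime that a default ObjectMapper does not support.

Solution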

java
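import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;

@Bean
public MessageConverter jsonMessageConverter() {
    ObjectMapper mapper = new ObjectMapper();
    // Support java.time types (needs jackson-datatype-jsr310 on the classpath)
    mapper.registerModule(new JavaTimeModule());
    // Write dates as ISO-8601 strings instead of numeric timestamps
    mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
    return new Jackson2JsonMessageConverter(mapper);
}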

5.2 Consumer Concurrency Issues

Problem: Multiple consumers process messages concurrently, which can cause data races.

Solution

java
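// concurrency = "3-10" runs between 3 and 10 consumer threads, so the handler
// itself must be thread-safe; changing concurrency alone does not prevent data races.
@RabbitListener(queues = "order.queue", concurrency = "3-10")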
public void handleMessage(Order order) {
    processOrder(order);
}
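
With several consumer threads, any state the handler shares needs thread-safe data structures, and redelivered messages should not be applied twice. The sketch below shows one possible approach: an atomic set of processed message IDs. `IdempotentHandler` is a hypothetical class, and the message ID is assumed to be supplied by the producer (as in `setMessageId` earlier).

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

// Hypothetical sketch: thread-safe, idempotent handling of messages that may
// be delivered more than once or handled by several consumer threads.
final class IdempotentHandler {

    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();
    private final AtomicInteger processedCount = new AtomicInteger();

    // Returns true if the message was processed, false if it was a duplicate.
    boolean handle(String messageId) {
        // add() is atomic: only one thread wins for a given ID
        if (!processedIds.add(messageId)) {
            return false;
        }
        processedCount.incrementAndGet(); // stand-in for the real business logic
        return true;
    }

    int processedCount() {
        return processedCount.get();
    }

    public static void main(String[] args) {
        IdempotentHandler handler = new IdempotentHandler();
        // 1000 deliveries carrying only 100 distinct IDs, handled in parallel
        IntStream.range(0, 1000).parallel()
                .forEach(i -> handler.handle("msg-" + (i % 100)));
        System.out.println("processed = " + handler.processedCount());
    }
}
```

An in-memory set grows without bound and is not shared between application instances, so a production system would more likely keep processed IDs in a database or cache with expiry.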

5.3 Message Acknowledgement Timeout

Problem: A message that is not acknowledged in time is redelivered.

Solution

properties
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.prefetch=10
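
Prefetch matters here because prefetched messages count as delivered while they wait in the consumer's local buffer. As a rough sizing aid, the sketch below estimates the worst-case time before the last prefetched message is acknowledged by a single consumer that works sequentially. `PrefetchBudget`, the per-message processing time, and the timeout value are all assumptions to replace with measured numbers and the broker's actual acknowledgement timeout.

```java
// Hypothetical sizing helper: relates prefetch to the time a delivery may
// stay unacknowledged when one consumer processes messages one at a time.
final class PrefetchBudget {

    // Worst-case delay (ms) until the last prefetched message is acked.
    static long worstCaseAckDelayMs(int prefetch, long perMessageMs) {
        return (long) prefetch * perMessageMs;
    }

    // Largest prefetch that keeps the worst-case delay within the timeout
    // (never less than 1, since a consumer needs at least one message).
    static int maxSafePrefetch(long ackTimeoutMs, long perMessageMs) {
        if (perMessageMs <= 0) {
            throw new IllegalArgumentException("perMessageMs must be positive");
        }
        long n = ackTimeoutMs / perMessageMs;
        return (int) Math.max(1L, Math.min(Integer.MAX_VALUE, n));
    }

    public static void main(String[] args) {
        // Assumed figures: prefetch 10, 5 s per message, 60 s timeout
        System.out.println("worst case (ms) = " + worstCaseAckDelayMs(10, 5_000));
        System.out.println("max safe prefetch = " + maxSafePrefetch(60_000, 5_000));
    }
}
```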

6. Best Practices

6.1 Configuration Recommendations

text
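Configuration recommendations:
├── Enable publisher confirms for reliable delivery
├── Set a reasonable prefetch count
├── Configure an appropriate number of concurrent consumers
├── Use a JSON message converter
└── Enable connection caching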

6.2 Performance Optimization

text
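Performance recommendations:
├── Send messages in batches
├── Use asynchronous confirms
├── Size the number of concurrent consumers sensibly
├── Use the caching connection factory
└── Keep messages small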

6.3 Reliability

text
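Reliability recommendations:
├── Use manual acknowledgement mode
├── Implement a message retry mechanism
├── Configure a dead-letter queue
├── Enable message persistence
└── Monitor message processing status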

7. Related Links