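Spring Boot Integration Configuration

1. Overview

Spring Boot auto-configures RabbitMQ support through the spring-boot-starter-amqp starter, so a handful of properties is enough to connect an application to a broker. This document describes how to configure the integration between Spring Boot and RabbitMQ.

1.1 Integration Architecture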

┌─────────────────────────────────────────────────────────────────────┐
│            Spring Boot RabbitMQ Integration Architecture            │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                   Spring Boot Application                    │   │
│  │                                                              │   │
│  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐         │   │
│  │  │ application │  │ @RabbitList-│  │ @Service    │         │   │
│  │  │  .yml       │  │ ener        │  │ @Component  │         │   │
│  │  └──────┬──────┘  └──────┬──────┘  └──────┬──────┘         │   │
│  │         │                │                │                 │   │
│  │         ▼                ▼                ▼                 │   │
│  │  ┌─────────────────────────────────────────────────────┐   │   │
│  │  │              Spring Boot Auto-Configuration          │   │   │
│  │  │  ┌─────────────┐ ┌─────────────┐ ┌─────────────┐   │   │   │
│  │  │  │RabbitAuto-  │ │RabbitTempla-│ │RabbitListen-│   │   │   │
│  │  │  │Configuration│ │te           │ │erContainer  │   │   │   │
│  │  │  └─────────────┘ └─────────────┘ └─────────────┘   │   │   │
│  │  └─────────────────────────────────────────────────────┘   │   │
│  └──────────────────────────────────────────────────────────────┘   │
│                                 │                                   │
│                                 ▼                                   │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                     RabbitMQ Server                          │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

1.2 Dependencies

xml
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
    
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>

2. Core Concepts

2.1 Basic Configuration

2.1.1 application.yml Configuration

yaml
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    
    connection-timeout: 30000
    
    cache:
      channel:
        size: 25
        checkout-timeout: 5000
      connection:
        mode: channel
        # size is only honored when mode is "connection"; it is ignored in "channel" mode
        size: 5
    
    listener:
      simple:
        acknowledge-mode: manual
        concurrency: 3
        max-concurrency: 10
        prefetch: 10
        auto-startup: true
        default-requeue-rejected: false
        
    template:
      mandatory: true
      receive-timeout: 5000
      reply-timeout: 5000
      
    publisher-confirm-type: correlated
    publisher-returns: true

2.1.2 Configuration Properties in Detail

text
┌───────────────────────────────────────────────────────────────────────────────┐
│                 Spring Boot RabbitMQ Configuration Properties                 │
├────────────────────────────────────┬──────────────────────────────────────────┤
│              Property              │               Description                │
├────────────────────────────────────┼──────────────────────────────────────────┤
│  spring.rabbitmq.host              │ RabbitMQ server host                     │
│  spring.rabbitmq.port              │ RabbitMQ server port                     │
│  spring.rabbitmq.username          │ Username                                 │
│  spring.rabbitmq.password          │ Password                                 │
│  spring.rabbitmq.virtual-host      │ Virtual host                             │
├────────────────────────────────────┼──────────────────────────────────────────┤
│  connection-timeout                │ Connection timeout                       │
│  requested-heartbeat               │ Requested heartbeat interval             │
├────────────────────────────────────┼──────────────────────────────────────────┤
│  cache.channel.size                │ Channel cache size                       │
│  cache.connection.size             │ Connection cache size                    │
├────────────────────────────────────┼──────────────────────────────────────────┤
│  listener.simple.acknowledge-mode  │ Acknowledgement mode                     │
│  listener.simple.concurrency       │ Minimum number of concurrent consumers   │
│  listener.simple.max-concurrency   │ Maximum number of concurrent consumers   │
│  listener.simple.prefetch          │ Prefetch count                           │
├────────────────────────────────────┼──────────────────────────────────────────┤
│  publisher-confirm-type            │ Publisher confirm type                   │
│  publisher-returns                 │ Whether publisher returns are enabled    │
└────────────────────────────────────┴──────────────────────────────────────────┘

2.2 Multi-Environment Configuration

2.2.1 Profile Configuration

yaml
spring:
  profiles:
    active: dev

---
spring:
  config:
    activate:
      on-profile: dev
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /dev
    listener:
      simple:
        concurrency: 2
        max-concurrency: 5

---
spring:
  config:
    activate:
      on-profile: test
  rabbitmq:
    host: test.rabbitmq.local
    port: 5672
    username: test_user
    password: test_password
    virtual-host: /test
    listener:
      simple:
        concurrency: 3
        max-concurrency: 10

---
spring:
  config:
    activate:
      on-profile: prod
  rabbitmq:
    host: prod.rabbitmq.local
    port: 5672
    username: prod_user
    password: ${RABBITMQ_PASSWORD}
    virtual-host: /prod
    listener:
      simple:
        concurrency: 5
        max-concurrency: 20
        prefetch: 20
    publisher-confirm-type: correlated
    publisher-returns: true

2.2.2 Environment Variable Configuration

yaml
spring:
  rabbitmq:
    host: ${RABBITMQ_HOST:localhost}
    port: ${RABBITMQ_PORT:5672}
    username: ${RABBITMQ_USERNAME:guest}
    password: ${RABBITMQ_PASSWORD:guest}
    virtual-host: ${RABBITMQ_VHOST:/}
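
The `${NAME:default}` placeholders above fall back to the value after the colon when the variable is not set, and a placeholder without a default (such as `${RABBITMQ_PASSWORD}` in the prod profile) causes a startup failure when the variable is missing. The following is a minimal sketch of that fallback rule in plain Java. The class `PlaceholderSketch` and its `resolve` method are hypothetical names used for illustration; this is not Spring's actual resolver, which supports more features.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Illustrative only: mimics the "${NAME:default}" fallback rule, not Spring's resolver. */
final class PlaceholderSketch {

    // Matches ${NAME} or ${NAME:default}; group 1 = name, group 2 = optional default.
    private static final Pattern PLACEHOLDER =
            Pattern.compile("\\$\\{([^:}]+)(?::([^}]*))?\\}");

    static String resolve(String value, Map<String, String> env) {
        Matcher m = PLACEHOLDER.matcher(value);
        StringBuilder out = new StringBuilder();
        while (m.find()) {
            String fromEnv = env.get(m.group(1));
            // Prefer the environment value, otherwise use the declared default.
            String resolved = fromEnv != null ? fromEnv : m.group(2);
            if (resolved == null) {
                throw new IllegalArgumentException(
                        "No value and no default for " + m.group(1));
            }
            m.appendReplacement(out, Matcher.quoteReplacement(resolved));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> env = Map.of("RABBITMQ_HOST", "mq.internal");
        System.out.println(resolve("${RABBITMQ_HOST:localhost}", env)); // mq.internal
        System.out.println(resolve("${RABBITMQ_PORT:5672}", env));      // 5672
    }
}
```

Leaving the default off for secrets is a deliberate choice in this style of configuration: a missing password then fails fast instead of silently falling back to `guest`.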

2.3 Cluster Configuration

yaml
spring:
  rabbitmq:
    addresses: rabbitmq1:5672,rabbitmq2:5672,rabbitmq3:5672
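    # The default "guest" account can only connect from localhost by default,
    # so a dedicated account is assumed here for the remote cluster nodes.
    username: ${RABBITMQ_USERNAME}
    password: ${RABBITMQ_PASSWORD}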
    
    cache:
      channel:
        size: 50
      connection:
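        # Note: in "connection" cache mode Spring AMQP does not support automatic
        # declaration of exchanges, queues, and bindings through RabbitAdmin.
        mode: connection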
        size: 10
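
When `addresses` is set, it takes the place of the single `host` and `port` pair and lists every node the client may connect to. As a rough illustration of how such a comma-separated list can be read, the sketch below normalizes each entry to `host:port`. The class name `AddressListSketch` is hypothetical, and falling back to port 5672 for entries without a port is an assumption made here for the non-TLS case; the real parsing in Spring Boot handles more forms (for example URI-style entries).

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: splits an "addresses" value into normalized host:port entries. */
final class AddressListSketch {

    // Assumed default for plain AMQP connections when an entry has no port.
    static final int DEFAULT_PORT = 5672;

    static List<String> parse(String addresses) {
        List<String> result = new ArrayList<>();
        for (String entry : addresses.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate trailing commas and stray whitespace
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon < 0) {
                result.add(trimmed + ":" + DEFAULT_PORT);
            } else {
                // Validate the port so that typos fail early.
                int port = Integer.parseInt(trimmed.substring(colon + 1));
                result.add(trimmed.substring(0, colon) + ":" + port);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(parse("rabbitmq1:5672, rabbitmq2:5672,rabbitmq3"));
    }
}
```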

2.4 SSL Configuration

yaml
spring:
  rabbitmq:
    host: localhost
    port: 5671
    username: guest
    password: guest
    ssl:
      enabled: true
      algorithm: TLSv1.2
      key-store: classpath:client-key.jks
      key-store-password: ${SSL_KEYSTORE_PASSWORD}
      key-store-type: JKS
      trust-store: classpath:trust-store.jks
      trust-store-password: ${SSL_TRUSTSTORE_PASSWORD}
      trust-store-type: JKS
      validate-server-certificate: true
      verify-hostname: true

2.5 Java Configuration Class

java
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
    
    @Value("${app.rabbitmq.exchange.order}")
    private String orderExchange;
    
    @Value("${app.rabbitmq.queue.order}")
    private String orderQueue;
    
    @Value("${app.rabbitmq.routing-key.order}")
    private String orderRoutingKey;
    
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
    
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
                                         MessageConverter jsonMessageConverter) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(jsonMessageConverter);
        template.setMandatory(true);
        
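        // Invoked when the broker acks or nacks a published message
        // (requires publisher-confirm-type: correlated).
        template.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                System.out.println("Message confirmed");
            } else {
                System.err.println("Message not confirmed: " + cause);
            }
        });
        
        // Invoked when a mandatory message cannot be routed to any queue
        // (requires publisher-returns: true).
        template.setReturnsCallback(returned -> {
            System.err.println("Message returned: " + returned.getReplyText());
        });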
        
        return template;
    }
    
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory,
            MessageConverter jsonMessageConverter) {
        
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(jsonMessageConverter);
        factory.setConcurrentConsumers(3);
        factory.setMaxConcurrentConsumers(10);
        factory.setPrefetchCount(10);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        factory.setAutoStartup(true);
        
        return factory;
    }
    
    @Bean
    public DirectExchange orderExchange() {
        return ExchangeBuilder
            .directExchange(orderExchange)
            .durable(true)
            .build();
    }
    
    @Bean
    public Queue orderQueue() {
        return QueueBuilder
            .durable(orderQueue)
            .withArgument("x-message-ttl", 86400000)
            .withArgument("x-dead-letter-exchange", "dlx.exchange")
            .build();
    }
    
    @Bean
    public Binding orderBinding() {
        return BindingBuilder
            .bind(orderQueue())
            .to(orderExchange())
            .with(orderRoutingKey);
    }
}

3. Code Examples

3.1 Complete Configuration Example

java
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.support.RetryTemplate;

@Configuration
@EnableRabbit
public class CompleteRabbitMQConfig {
    
    @Value("${spring.rabbitmq.host}")
    private String host;
    
    @Value("${spring.rabbitmq.port}")
    private int port;
    
    @Value("${spring.rabbitmq.username}")
    private String username;
    
    @Value("${spring.rabbitmq.password}")
    private String password;
    
    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;
    
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);
        factory.setVirtualHost(virtualHost);
        
        // Only relevant in CacheMode.CONNECTION; the default cache mode is CHANNEL.
        factory.setConnectionCacheSize(10);
        factory.setChannelCacheSize(50);
        factory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        factory.setPublisherReturns(true);
        
        return factory;
    }
    
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
    
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
                                         MessageConverter jsonMessageConverter) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(jsonMessageConverter);
        template.setMandatory(true);
        
        RetryTemplate retryTemplate = new RetryTemplate();
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(1000);
        backOffPolicy.setMultiplier(2.0);
        backOffPolicy.setMaxInterval(10000);
        retryTemplate.setBackOffPolicy(backOffPolicy);
        template.setRetryTemplate(retryTemplate);
        
        return template;
    }
    
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }
    
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory,
            MessageConverter jsonMessageConverter) {
        
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(jsonMessageConverter);
        factory.setConcurrentConsumers(3);
        factory.setMaxConcurrentConsumers(10);
        factory.setPrefetchCount(10);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        factory.setDefaultRequeueRejected(false);
        factory.setAutoStartup(true);
        
        return factory;
    }
}
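
The `ExponentialBackOffPolicy` settings in `rabbitTemplate` above (initial interval 1000 ms, multiplier 2.0, cap 10000 ms) determine how long each retry waits. The sketch below reproduces that arithmetic in plain Java so the resulting schedule is easy to inspect. `BackoffScheduleSketch` is a hypothetical helper, not part of Spring Retry, and it assumes the usual rule of multiplying the previous interval and capping it at the maximum. How many of these waits actually occur depends on the retry policy in use, which the example above leaves at its default.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: computes capped exponential backoff intervals in milliseconds. */
final class BackoffScheduleSketch {

    static List<Long> intervals(long initialMs, double multiplier, long maxMs, int retries) {
        List<Long> result = new ArrayList<>();
        long current = initialMs;
        for (int i = 0; i < retries; i++) {
            // Wait for the current interval, never longer than the cap.
            result.add(Math.min(current, maxMs));
            // Grow the interval for the next retry, again respecting the cap.
            current = (long) Math.min(current * multiplier, maxMs);
        }
        return result;
    }

    public static void main(String[] args) {
        // Same values as the configuration class: 1000 ms, x2.0, capped at 10000 ms.
        System.out.println(intervals(1000, 2.0, 10000, 6));
        // prints [1000, 2000, 4000, 8000, 10000, 10000]
    }
}
```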

3.2 Configuration Properties Class

java
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;

@Component
@ConfigurationProperties(prefix = "app.rabbitmq")
public class RabbitMQProperties {
    
    private Map<String, ExchangeConfig> exchanges = new HashMap<>();
    private Map<String, QueueConfig> queues = new HashMap<>();
    private Map<String, BindingConfig> bindings = new HashMap<>();
    
    public Map<String, ExchangeConfig> getExchanges() { return exchanges; }
    public void setExchanges(Map<String, ExchangeConfig> exchanges) { 
        this.exchanges = exchanges; 
    }
    
    public Map<String, QueueConfig> getQueues() { return queues; }
    public void setQueues(Map<String, QueueConfig> queues) { 
        this.queues = queues; 
    }
    
    public Map<String, BindingConfig> getBindings() { return bindings; }
    public void setBindings(Map<String, BindingConfig> bindings) { 
        this.bindings = bindings; 
    }
    
    public static class ExchangeConfig {
        private String name;
        private String type = "direct";
        private boolean durable = true;
        private boolean autoDelete = false;
        
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public String getType() { return type; }
        public void setType(String type) { this.type = type; }
        public boolean isDurable() { return durable; }
        public void setDurable(boolean durable) { this.durable = durable; }
        public boolean isAutoDelete() { return autoDelete; }
        public void setAutoDelete(boolean autoDelete) { this.autoDelete = autoDelete; }
    }
    
    public static class QueueConfig {
        private String name;
        private boolean durable = true;
        private boolean exclusive = false;
        private boolean autoDelete = false;
        private Map<String, Object> arguments = new HashMap<>();
        
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public boolean isDurable() { return durable; }
        public void setDurable(boolean durable) { this.durable = durable; }
        public boolean isExclusive() { return exclusive; }
        public void setExclusive(boolean exclusive) { this.exclusive = exclusive; }
        public boolean isAutoDelete() { return autoDelete; }
        public void setAutoDelete(boolean autoDelete) { this.autoDelete = autoDelete; }
        public Map<String, Object> getArguments() { return arguments; }
        public void setArguments(Map<String, Object> arguments) { this.arguments = arguments; }
    }
    
    public static class BindingConfig {
        private String queue;
        private String exchange;
        private String routingKey;
        
        public String getQueue() { return queue; }
        public void setQueue(String queue) { this.queue = queue; }
        public String getExchange() { return exchange; }
        public void setExchange(String exchange) { this.exchange = exchange; }
        public String getRoutingKey() { return routingKey; }
        public void setRoutingKey(String routingKey) { this.routingKey = routingKey; }
    }
}

3.3 Dynamic Configuration

java
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.stereotype.Component;
import java.util.Map;

@Component
public class DynamicRabbitMQConfig {
    
    private final RabbitAdmin rabbitAdmin;
    private final RabbitMQProperties properties;
    
    public DynamicRabbitMQConfig(RabbitAdmin rabbitAdmin, RabbitMQProperties properties) {
        this.rabbitAdmin = rabbitAdmin;
        this.properties = properties;
        
        initializeResources();
    }
    
    private void initializeResources() {
        properties.getExchanges().forEach((key, config) -> {
            Exchange exchange = createExchange(config);
            rabbitAdmin.declareExchange(exchange);
        });
        
        properties.getQueues().forEach((key, config) -> {
            Queue queue = createQueue(config);
            rabbitAdmin.declareQueue(queue);
        });
        
        properties.getBindings().forEach((key, config) -> {
            Binding binding = createBinding(config);
            rabbitAdmin.declareBinding(binding);
        });
    }
    
    private Exchange createExchange(RabbitMQProperties.ExchangeConfig config) {
        return switch (config.getType().toLowerCase()) {
            case "direct" -> new DirectExchange(config.getName(), config.isDurable(), config.isAutoDelete());
            case "topic" -> new TopicExchange(config.getName(), config.isDurable(), config.isAutoDelete());
            case "fanout" -> new FanoutExchange(config.getName(), config.isDurable(), config.isAutoDelete());
            case "headers" -> new HeadersExchange(config.getName(), config.isDurable(), config.isAutoDelete());
            default -> throw new IllegalArgumentException("Unsupported exchange type: " + config.getType());
        };
    }
    
    private Queue createQueue(RabbitMQProperties.QueueConfig config) {
        return new Queue(config.getName(), config.isDurable(), 
            config.isExclusive(), config.isAutoDelete(), config.getArguments());
    }
    
    private Binding createBinding(RabbitMQProperties.BindingConfig config) {
        return new Binding(config.getQueue(), Binding.DestinationType.QUEUE,
            config.getExchange(), config.getRoutingKey(), null);
    }
}

4. Practical Application Scenarios

4.1 Microservice Configuration

yaml
spring:
  application:
    name: order-service
  rabbitmq:
    host: ${RABBITMQ_HOST:rabbitmq}
    port: ${RABBITMQ_PORT:5672}
    username: ${RABBITMQ_USERNAME:order_service}
    password: ${RABBITMQ_PASSWORD}
    virtual-host: /production
    
    publisher-confirm-type: correlated
    publisher-returns: true
    
    listener:
      simple:
        acknowledge-mode: manual
        concurrency: 5
        max-concurrency: 15
        prefetch: 20
        
app:
  rabbitmq:
    exchanges:
      order:
        name: order.exchange
        type: topic
        durable: true
      payment:
        name: payment.exchange
        type: direct
        durable: true
    queues:
      order-created:
        name: order.created.queue
        durable: true
        arguments:
          x-message-ttl: 86400000
          x-dead-letter-exchange: dlx.exchange
      payment-completed:
        name: payment.completed.queue
        durable: true
    bindings:
      order-binding:
        queue: order.created.queue
        exchange: order.exchange
        routing-key: order.#
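
The binding above attaches `order.created.queue` to a topic exchange with the pattern `order.#`. In topic routing, keys are split into dot-separated words, `*` stands for exactly one word, and `#` stands for zero or more words. The following plain-Java sketch shows that matching rule so it is clear which routing keys such a binding would accept. `TopicMatchSketch` is a hypothetical class written for illustration; the broker's own implementation is different and this sketch is not a substitute for testing against a real RabbitMQ instance.

```java
/** Illustrative only: word-wise topic pattern matching with '*' and '#'. */
final class TopicMatchSketch {

    static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.split("\\.");
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return match(p, 0, k, 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern consumed: key must be consumed too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible split point.
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still expects a word but the key has none left
        }
        // '*' accepts any single word; otherwise the words must be equal.
        return (p[i].equals("*") || p[i].equals(k[j])) && match(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("order.#", "order.created"));     // true
        System.out.println(matches("order.#", "order.created.v2"));  // true
        System.out.println(matches("order.#", "payment.completed")); // false
    }
}
```

If only single-level keys such as `order.created` should match, `order.*` would be the narrower pattern.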

4.2 Health Check Configuration

java
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.stereotype.Component;

// Requires spring-boot-starter-actuator on the classpath.
@Component
public class RabbitMQHealthIndicator implements HealthIndicator {
    
    private final RabbitTemplate rabbitTemplate;
    
    public RabbitMQHealthIndicator(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }
    
    @Override
    public Health health() {
        try {
            // Borrow a channel and read the broker version from the server
            // properties; this fails if no connection can be established.
            String version = rabbitTemplate.execute(channel ->
                channel.getConnection().getServerProperties().get("version").toString());
            
            return Health.up()
                .withDetail("status", "connected")
                .withDetail("version", version)
                .build();
        } catch (Exception e) {
            // Health.down(e) records the exception without requiring a non-null message.
            return Health.down(e).build();
        }
    }
}

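5. Common Problems and Solutions

5.1 Connection Failure

Problem: The application cannot connect to RabbitMQ at startup.

Solution: Increase the connection timeout and, if listeners should not connect during startup, disable listener auto-startup. The listener containers must then be started explicitly once the broker is reachable.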

yaml
spring:
  rabbitmq:
    connection-timeout: 60000
    listener:
      simple:
        auto-startup: false

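5.2 Message Loss

Problem: Messages are lost after being sent.

Solution: Enable publisher confirms and publisher returns, and set mandatory so that unroutable messages are returned to the publisher instead of being dropped silently.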

yaml
spring:
  rabbitmq:
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
      mandatory: true
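
With `publisher-confirm-type: correlated`, each confirm can be tied back to the message that triggered it, which is what makes resending or alerting on a nack possible. The sketch below shows one way to keep that bookkeeping in plain Java. `ConfirmTrackerSketch` and its method names are hypothetical and not part of Spring AMQP; in a real application the correlation id would typically come from the `CorrelationData` passed when sending and received again in the confirm callback.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

/** Illustrative bookkeeping for correlated confirms; not part of Spring AMQP. */
final class ConfirmTrackerSketch {

    // Messages published but not yet confirmed, keyed by correlation id.
    private final Map<String, String> pending = new ConcurrentHashMap<>();
    // Payloads the broker nacked; a real application would republish or alert.
    private final List<String> rejected = new CopyOnWriteArrayList<>();

    void published(String correlationId, String payload) {
        pending.put(correlationId, payload);
    }

    /** Mirrors the (correlationData, ack, cause) shape of a confirm callback. */
    void confirmed(String correlationId, boolean ack) {
        String payload = pending.remove(correlationId);
        if (payload != null && !ack) {
            rejected.add(payload);
        }
    }

    int pendingCount() {
        return pending.size();
    }

    List<String> rejectedPayloads() {
        return List.copyOf(rejected);
    }

    public static void main(String[] args) {
        ConfirmTrackerSketch tracker = new ConfirmTrackerSketch();
        tracker.published("order-1", "{\"id\":1}");
        tracker.published("order-2", "{\"id\":2}");
        tracker.confirmed("order-1", true);   // broker accepted the message
        tracker.confirmed("order-2", false);  // broker rejected the message
        System.out.println(tracker.pendingCount());     // 0
        System.out.println(tracker.rejectedPayloads()); // [{"id":2}]
    }
}
```

Entries that stay in the pending map for a long time are candidates for a timeout check, since a confirm may never arrive if the connection drops.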

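5.3 Slow Consumers

Problem: Consumers process messages slowly, so messages pile up in the queue.

Solution: Limit prefetch so that a slow consumer does not hoard messages, and let the container scale between concurrency and max-concurrency consumers.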

yaml
spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 10
        concurrency: 5
        max-concurrency: 20
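
To judge whether values like these are reasonable, two back-of-the-envelope numbers help: how many unacknowledged messages one application instance may hold (prefetch multiplied by the number of consumers), and how many consumers are needed to keep up with the arrival rate. The sketch below computes both. `ConsumerSizingSketch` is a hypothetical helper, and the sizing rule (arrival rate multiplied by average processing time) is a simplifying assumption that ignores bursts and variance.

```java
/** Illustrative only: rough sizing arithmetic for prefetch and consumer counts. */
final class ConsumerSizingSketch {

    /** Upper bound on unacknowledged messages held by one application instance. */
    static int maxUnacked(int prefetch, int consumers) {
        return Math.multiplyExact(prefetch, consumers);
    }

    /** Consumers needed to keep up, assuming a steady arrival rate. */
    static int consumersNeeded(double messagesPerSecond, double avgProcessingMillis) {
        return (int) Math.ceil(messagesPerSecond * avgProcessingMillis / 1000.0);
    }

    public static void main(String[] args) {
        // With prefetch 10, the container holds between 50 and 200 messages in flight.
        System.out.println(maxUnacked(10, 5));  // 50
        System.out.println(maxUnacked(10, 20)); // 200
        // 200 msg/s at 50 ms each needs about 10 busy consumers.
        System.out.println(consumersNeeded(200, 50)); // 10
    }
}
```

If the required number of consumers exceeds `max-concurrency`, the backlog will keep growing no matter how prefetch is tuned, and either processing time or the number of instances has to change.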

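6. Best Practices

6.1 Configuration

text
Configuration recommendations:
├── Manage sensitive values through environment variables
├── Size the connection and channel caches sensibly
├── Enable publisher confirms
├── Use manual acknowledgement mode
└── Set a reasonable number of concurrent consumers

6.2 Security

text
Security recommendations:
├── Encrypt production traffic with SSL/TLS
├── Use dedicated user accounts
├── Isolate workloads with virtual hosts
├── Rotate passwords regularly
└── Restrict user permissions

6.3 Performance

text
Performance recommendations:
├── Choose the prefetch count deliberately
├── Configure the connection and channel caches
├── Use batch operations
├── Use asynchronous publisher confirms
└── Set a reasonable number of concurrent consumers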

7. Related Links