Appearance
Spring Boot 集成配置
一、概述
Spring Boot 提供了对 RabbitMQ 的自动配置支持,通过简单的配置即可快速集成 RabbitMQ。本文档详细介绍 Spring Boot 与 RabbitMQ 的集成配置方式。
1.1 集成架构
┌─────────────────────────────────────────────────────────────────────┐
│ Spring Boot RabbitMQ 集成架构 │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ Spring Boot Application │ │
│ │ │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ application │ │ @RabbitList-│ │ @Service │ │ │
│ │ │ .yml │ │ ener │ │ @Component │ │ │
│ │ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │ │
│ │ │ │ │ │ │
│ │ ▼ ▼ ▼ │ │
│ │ ┌─────────────────────────────────────────────────────┐ │ │
│ │ │ Spring Boot Auto-Configuration │ │ │
│ │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │
│ │ │ │RabbitAuto- │ │RabbitTempla-│ │RabbitListen-│ │ │ │
│ │ │ │Configuration│ │te │ │erContainer │ │ │ │
│ │ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │ │
│ │ └─────────────────────────────────────────────────────┘ │ │
│ └──────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ RabbitMQ Server │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
1.2 依赖配置
xml
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
二、核心知识点
2.1 基础配置
2.1.1 application.yml 配置
yaml
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
connection-timeout: 30000
cache:
channel:
size: 25
checkout-timeout: 5000
connection:
mode: channel
size: 5
listener:
simple:
acknowledge-mode: manual
concurrency: 3
max-concurrency: 10
prefetch: 10
auto-startup: true
default-requeue-rejected: false
template:
mandatory: true
receive-timeout: 5000
reply-timeout: 5000
publisher-confirm-type: correlated
publisher-returns: true
2.1.2 配置属性详解
text
┌─────────────────────────────────────────────────────────────────────┐
│ Spring Boot RabbitMQ 配置属性 │
├──────────────────────────┬──────────────────────────────────────────┤
│ 属性 │ 说明 │
├──────────────────────────┼──────────────────────────────────────────┤
│ spring.rabbitmq.host │ RabbitMQ 服务器地址 │
│ spring.rabbitmq.port │ RabbitMQ 服务器端口 │
│ spring.rabbitmq.username│ 用户名 │
│ spring.rabbitmq.password│ 密码 │
│ spring.rabbitmq.virtual-host│ 虚拟主机 │
├──────────────────────────┼──────────────────────────────────────────┤
│ connection-timeout │ 连接超时时间 │
│ requested-heartbeat │ 心跳间隔 │
├──────────────────────────┼──────────────────────────────────────────┤
│ cache.channel.size │ 通道缓存大小 │
│ cache.connection.size │ 连接缓存大小 │
├──────────────────────────┼──────────────────────────────────────────┤
│ listener.simple.acknowledge-mode│ 确认模式 │
│ listener.simple.concurrency│ 最小并发消费者数量 │
│ listener.simple.max-concurrency│ 最大并发消费者数量 │
│ listener.simple.prefetch│ 预取数量 │
├──────────────────────────┼──────────────────────────────────────────┤
│ publisher-confirm-type │ 发布确认类型 │
│ publisher-returns │ 是否启用发布返回 │
└──────────────────────────┴──────────────────────────────────────────┘
2.2 多环境配置
2.2.1 Profile 配置
yaml
spring:
profiles:
active: dev
---
spring:
config:
activate:
on-profile: dev
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /dev
listener:
simple:
concurrency: 2
max-concurrency: 5
---
spring:
config:
activate:
on-profile: test
rabbitmq:
host: test.rabbitmq.local
port: 5672
username: test_user
password: test_password
virtual-host: /test
listener:
simple:
concurrency: 3
max-concurrency: 10
---
spring:
config:
activate:
on-profile: prod
rabbitmq:
host: prod.rabbitmq.local
port: 5672
username: prod_user
password: ${RABBITMQ_PASSWORD}
virtual-host: /prod
listener:
simple:
concurrency: 5
max-concurrency: 20
prefetch: 20
publisher-confirm-type: correlated
publisher-returns: true
2.2.2 环境变量配置
yaml
spring:
rabbitmq:
host: ${RABBITMQ_HOST:localhost}
port: ${RABBITMQ_PORT:5672}
username: ${RABBITMQ_USERNAME:guest}
password: ${RABBITMQ_PASSWORD:guest}
virtual-host: ${RABBITMQ_VHOST:/}
2.3 集群配置
yaml
spring:
rabbitmq:
addresses: rabbitmq1:5672,rabbitmq2:5672,rabbitmq3:5672
username: guest
password: guest
cache:
channel:
size: 50
connection:
mode: connection
size: 10
2.4 SSL 配置
yaml
spring:
rabbitmq:
host: localhost
port: 5671
username: guest
password: guest
ssl:
enabled: true
algorithm: TLSv1.2
key-store: classpath:client-key.jks
key-store-password: ${SSL_KEYSTORE_PASSWORD}
key-store-type: JKS
trust-store: classpath:trust-store.jks
trust-store-password: ${SSL_TRUSTSTORE_PASSWORD}
trust-store-type: JKS
validate-server-certificate: true
verify-hostname: true
2.5 Java 配置类
java
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
// Exchange name, injected from the app.rabbitmq.exchange.order property.
@Value("${app.rabbitmq.exchange.order}")
private String orderExchange;
// Queue name, injected from the app.rabbitmq.queue.order property.
@Value("${app.rabbitmq.queue.order}")
private String orderQueue;
// Routing key used for the binding, injected from the app.rabbitmq.routing-key.order property.
@Value("${app.rabbitmq.routing-key.order}")
private String orderRoutingKey;
/**
 * Registers the message converter bean shared by the template and the listener factory.
 *
 * @return a new {@link Jackson2JsonMessageConverter}
 */
@Bean
public MessageConverter jsonMessageConverter() {
    MessageConverter converter = new Jackson2JsonMessageConverter();
    return converter;
}
/**
 * Builds the {@link RabbitTemplate} used for publishing.
 *
 * <p>The template uses the supplied converter, has the mandatory flag switched on,
 * and prints the outcome of publisher confirms and returned messages to the console.
 *
 * @param connectionFactory    connection factory the template publishes through
 * @param jsonMessageConverter converter applied to message payloads
 * @return the configured template
 */
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
                                     MessageConverter jsonMessageConverter) {
    RabbitTemplate amqpTemplate = new RabbitTemplate(connectionFactory);
    amqpTemplate.setMandatory(true);
    amqpTemplate.setMessageConverter(jsonMessageConverter);

    // Returned-message callback: report the broker's reply text.
    amqpTemplate.setReturnsCallback(returned ->
            System.err.println("消息被退回: " + returned.getReplyText()));

    // Confirm callback: handle the negative acknowledgement first, then the success path.
    amqpTemplate.setConfirmCallback((correlationData, ack, cause) -> {
        if (!ack) {
            System.err.println("消息确认失败: " + cause);
            return;
        }
        System.out.println("消息确认成功");
    });

    return amqpTemplate;
}
/**
 * Builds the listener container factory with manual acknowledgement.
 *
 * @param connectionFactory    connection factory used by the listener containers
 * @param jsonMessageConverter converter applied to incoming payloads
 * @return factory configured with 3 to 10 concurrent consumers and a prefetch count of 10
 */
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
        ConnectionFactory connectionFactory,
        MessageConverter jsonMessageConverter) {
    SimpleRabbitListenerContainerFactory containerFactory =
            new SimpleRabbitListenerContainerFactory();

    // Wiring
    containerFactory.setConnectionFactory(connectionFactory);
    containerFactory.setMessageConverter(jsonMessageConverter);

    // Acknowledgement and lifecycle
    containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    containerFactory.setAutoStartup(true);

    // Throughput
    containerFactory.setPrefetchCount(10);
    containerFactory.setConcurrentConsumers(3);
    containerFactory.setMaxConcurrentConsumers(10);

    return containerFactory;
}
/**
 * Declares the durable direct exchange whose name comes from the {@code orderExchange} field.
 *
 * @return the order exchange bean
 */
@Bean
public DirectExchange orderExchange() {
    ExchangeBuilder exchangeBuilder = ExchangeBuilder.directExchange(orderExchange);
    exchangeBuilder.durable(true);
    return exchangeBuilder.build();
}
/**
 * Declares the durable order queue with a message TTL and a dead-letter exchange.
 *
 * @return the order queue bean
 */
@Bean
public Queue orderQueue() {
    QueueBuilder queueBuilder = QueueBuilder.durable(orderQueue);
    // 86400000 ms = 24 hours.
    queueBuilder.withArgument("x-message-ttl", 86400000);
    queueBuilder.withArgument("x-dead-letter-exchange", "dlx.exchange");
    return queueBuilder.build();
}
/**
 * Binds the order queue to the order exchange using the configured routing key.
 *
 * @return the binding bean
 */
@Bean
public Binding orderBinding() {
    Queue boundQueue = orderQueue();
    DirectExchange targetExchange = orderExchange();
    return BindingBuilder.bind(boundQueue).to(targetExchange).with(orderRoutingKey);
}
}
三、代码示例
3.1 完整配置示例
java
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.support.RetryTemplate;
@Configuration
@EnableRabbit
public class CompleteRabbitMQConfig {
// Broker host, injected from spring.rabbitmq.host.
@Value("${spring.rabbitmq.host}")
private String host;
// Broker port, injected from spring.rabbitmq.port.
@Value("${spring.rabbitmq.port}")
private int port;
// Login user, injected from spring.rabbitmq.username.
@Value("${spring.rabbitmq.username}")
private String username;
// Login password, injected from spring.rabbitmq.password.
@Value("${spring.rabbitmq.password}")
private String password;
// Virtual host, injected from spring.rabbitmq.virtual-host.
@Value("${spring.rabbitmq.virtual-host}")
private String virtualHost;
/**
 * Builds a {@link CachingConnectionFactory} from the injected connection settings,
 * with correlated publisher confirms and publisher returns enabled.
 *
 * <p>NOTE(review): the connection cache size is presumably only consulted in
 * CONNECTION cache mode, which is not selected here. Confirm whether it is intended.
 *
 * @return the connection factory bean
 */
@Bean
public ConnectionFactory connectionFactory() {
    CachingConnectionFactory caching = new CachingConnectionFactory();

    // Publisher feedback
    caching.setPublisherReturns(true);
    caching.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);

    // Cache sizing
    caching.setChannelCacheSize(50);
    caching.setConnectionCacheSize(10);

    // Endpoint and credentials
    caching.setHost(host);
    caching.setPort(port);
    caching.setVirtualHost(virtualHost);
    caching.setUsername(username);
    caching.setPassword(password);

    return caching;
}
/**
 * Registers the message converter bean shared by the template and the listener factory.
 *
 * @return a new {@link Jackson2JsonMessageConverter}
 */
@Bean
public MessageConverter jsonMessageConverter() {
    MessageConverter converter = new Jackson2JsonMessageConverter();
    return converter;
}
/**
 * Builds the publishing {@link RabbitTemplate} with the mandatory flag switched on
 * and a retry template that backs off exponentially.
 *
 * @param connectionFactory    connection factory the template publishes through
 * @param jsonMessageConverter converter applied to message payloads
 * @return the configured template
 */
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
                                     MessageConverter jsonMessageConverter) {
    // Back-off: start at 1000, multiply by 2.0 each attempt, cap at 10000.
    ExponentialBackOffPolicy exponentialBackOff = new ExponentialBackOffPolicy();
    exponentialBackOff.setMaxInterval(10000);
    exponentialBackOff.setMultiplier(2.0);
    exponentialBackOff.setInitialInterval(1000);

    RetryTemplate retry = new RetryTemplate();
    retry.setBackOffPolicy(exponentialBackOff);

    RabbitTemplate amqpTemplate = new RabbitTemplate(connectionFactory);
    amqpTemplate.setRetryTemplate(retry);
    amqpTemplate.setMandatory(true);
    amqpTemplate.setMessageConverter(jsonMessageConverter);
    return amqpTemplate;
}
/**
 * Exposes a {@link RabbitAdmin} built on the given connection factory.
 *
 * @param connectionFactory connection factory the admin operates through
 * @return the admin bean
 */
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
    RabbitAdmin admin = new RabbitAdmin(connectionFactory);
    return admin;
}
/**
 * Builds the listener container factory with manual acknowledgement and
 * requeue-on-rejection switched off.
 *
 * @param connectionFactory    connection factory used by the listener containers
 * @param jsonMessageConverter converter applied to incoming payloads
 * @return factory configured with 3 to 10 concurrent consumers and a prefetch count of 10
 */
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
        ConnectionFactory connectionFactory,
        MessageConverter jsonMessageConverter) {
    SimpleRabbitListenerContainerFactory containerFactory =
            new SimpleRabbitListenerContainerFactory();

    // Wiring
    containerFactory.setConnectionFactory(connectionFactory);
    containerFactory.setMessageConverter(jsonMessageConverter);

    // Acknowledgement and lifecycle
    containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    containerFactory.setDefaultRequeueRejected(false);
    containerFactory.setAutoStartup(true);

    // Throughput
    containerFactory.setPrefetchCount(10);
    containerFactory.setConcurrentConsumers(3);
    containerFactory.setMaxConcurrentConsumers(10);

    return containerFactory;
}
}
3.2 配置属性类
java
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
@Component
@ConfigurationProperties(prefix = "app.rabbitmq")
public class RabbitMQProperties {
/** Exchange definitions keyed by an arbitrary logical name. */
private Map<String, ExchangeConfig> exchanges = new HashMap<>();

/** Queue definitions keyed by an arbitrary logical name. */
private Map<String, QueueConfig> queues = new HashMap<>();

/** Binding definitions keyed by an arbitrary logical name. */
private Map<String, BindingConfig> bindings = new HashMap<>();

/** @return the exchange definitions (the live map, not a copy) */
public Map<String, ExchangeConfig> getExchanges() {
    return this.exchanges;
}

/** @param exchanges replacement exchange definitions */
public void setExchanges(Map<String, ExchangeConfig> exchanges) {
    this.exchanges = exchanges;
}

/** @return the queue definitions (the live map, not a copy) */
public Map<String, QueueConfig> getQueues() {
    return this.queues;
}

/** @param queues replacement queue definitions */
public void setQueues(Map<String, QueueConfig> queues) {
    this.queues = queues;
}

/** @return the binding definitions (the live map, not a copy) */
public Map<String, BindingConfig> getBindings() {
    return this.bindings;
}

/** @param bindings replacement binding definitions */
public void setBindings(Map<String, BindingConfig> bindings) {
    this.bindings = bindings;
}
/**
 * Settings for one exchange. Defaults: type {@code "direct"}, durable, not auto-delete.
 */
public static class ExchangeConfig {

    private String name;
    private String type = "direct";
    private boolean durable = true;
    private boolean autoDelete = false;

    /** @return the exchange name, or {@code null} if never set */
    public String getName() {
        return this.name;
    }

    /** @param name the exchange name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the exchange type string */
    public String getType() {
        return this.type;
    }

    /** @param type the exchange type string */
    public void setType(String type) {
        this.type = type;
    }

    /** @return the durable flag */
    public boolean isDurable() {
        return this.durable;
    }

    /** @param durable the durable flag */
    public void setDurable(boolean durable) {
        this.durable = durable;
    }

    /** @return the auto-delete flag */
    public boolean isAutoDelete() {
        return this.autoDelete;
    }

    /** @param autoDelete the auto-delete flag */
    public void setAutoDelete(boolean autoDelete) {
        this.autoDelete = autoDelete;
    }
}
/**
 * Settings for one queue. Defaults: durable, not exclusive, not auto-delete,
 * with an empty argument map.
 */
public static class QueueConfig {

    private String name;
    private boolean durable = true;
    private boolean exclusive = false;
    private boolean autoDelete = false;
    private Map<String, Object> arguments = new HashMap<>();

    /** @return the queue name, or {@code null} if never set */
    public String getName() {
        return this.name;
    }

    /** @param name the queue name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the durable flag */
    public boolean isDurable() {
        return this.durable;
    }

    /** @param durable the durable flag */
    public void setDurable(boolean durable) {
        this.durable = durable;
    }

    /** @return the exclusive flag */
    public boolean isExclusive() {
        return this.exclusive;
    }

    /** @param exclusive the exclusive flag */
    public void setExclusive(boolean exclusive) {
        this.exclusive = exclusive;
    }

    /** @return the auto-delete flag */
    public boolean isAutoDelete() {
        return this.autoDelete;
    }

    /** @param autoDelete the auto-delete flag */
    public void setAutoDelete(boolean autoDelete) {
        this.autoDelete = autoDelete;
    }

    /** @return the queue arguments (the live map, not a copy) */
    public Map<String, Object> getArguments() {
        return this.arguments;
    }

    /** @param arguments replacement queue arguments */
    public void setArguments(Map<String, Object> arguments) {
        this.arguments = arguments;
    }
}
/**
 * Settings for one binding: the queue name, the exchange name and the routing key.
 * All three are {@code null} until set.
 */
public static class BindingConfig {

    private String queue;
    private String exchange;
    private String routingKey;

    /** @return the queue name */
    public String getQueue() {
        return this.queue;
    }

    /** @param queue the queue name */
    public void setQueue(String queue) {
        this.queue = queue;
    }

    /** @return the exchange name */
    public String getExchange() {
        return this.exchange;
    }

    /** @param exchange the exchange name */
    public void setExchange(String exchange) {
        this.exchange = exchange;
    }

    /** @return the routing key */
    public String getRoutingKey() {
        return this.routingKey;
    }

    /** @param routingKey the routing key */
    public void setRoutingKey(String routingKey) {
        this.routingKey = routingKey;
    }
}
}
3.3 动态配置
java
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
public class DynamicRabbitMQConfig {
// Admin used for the declareExchange / declareQueue / declareBinding calls.
private final RabbitAdmin rabbitAdmin;
// Source of the exchange, queue and binding definitions to declare.
private final RabbitMQProperties properties;
/**
 * Stores both collaborators, then immediately declares every configured resource.
 *
 * @param rabbitAdmin admin used for the declare calls
 * @param properties  definitions read by initializeResources()
 */
public DynamicRabbitMQConfig(RabbitAdmin rabbitAdmin, RabbitMQProperties properties) {
this.rabbitAdmin = rabbitAdmin;
this.properties = properties;
// Must stay last: initializeResources() reads both fields assigned above.
initializeResources();
}
/**
 * Declares every configured exchange, then every queue, then every binding,
 * through the {@code RabbitAdmin}.
 */
private void initializeResources() {
    // Exchanges first.
    for (RabbitMQProperties.ExchangeConfig exchangeConfig : properties.getExchanges().values()) {
        rabbitAdmin.declareExchange(createExchange(exchangeConfig));
    }

    // Then queues.
    for (RabbitMQProperties.QueueConfig queueConfig : properties.getQueues().values()) {
        rabbitAdmin.declareQueue(createQueue(queueConfig));
    }

    // Bindings last.
    for (RabbitMQProperties.BindingConfig bindingConfig : properties.getBindings().values()) {
        rabbitAdmin.declareBinding(createBinding(bindingConfig));
    }
}
/**
 * Maps an exchange definition to the matching exchange object.
 *
 * <p>The type is matched case-insensitively against {@code direct}, {@code topic},
 * {@code fanout} and {@code headers}.
 *
 * @param config exchange definition supplying name, type, durable and auto-delete flags
 * @return a new exchange of the requested type
 * @throws IllegalArgumentException if the type is {@code null} or not one of the four above
 */
private Exchange createExchange(RabbitMQProperties.ExchangeConfig config) {
    String type = config.getType();
    if (type == null) {
        // Fail with the same descriptive error as an unknown type, not a bare NPE.
        throw new IllegalArgumentException("不支持的交换器类型: " + type);
    }
    // Locale.ROOT keeps the match independent of the JVM default locale
    // (a Turkish default would lower-case "DIRECT" to "dırect").
    return switch (type.toLowerCase(java.util.Locale.ROOT)) {
        case "direct" -> new DirectExchange(config.getName(), config.isDurable(), config.isAutoDelete());
        case "topic" -> new TopicExchange(config.getName(), config.isDurable(), config.isAutoDelete());
        case "fanout" -> new FanoutExchange(config.getName(), config.isDurable(), config.isAutoDelete());
        case "headers" -> new HeadersExchange(config.getName(), config.isDurable(), config.isAutoDelete());
        default -> throw new IllegalArgumentException("不支持的交换器类型: " + type);
    };
}
/**
 * Maps a queue definition to a {@code Queue} carrying the same name, flags and arguments.
 *
 * @param config queue definition
 * @return a new queue object
 */
private Queue createQueue(RabbitMQProperties.QueueConfig config) {
    String queueName = config.getName();
    boolean durable = config.isDurable();
    boolean exclusive = config.isExclusive();
    boolean autoDelete = config.isAutoDelete();
    return new Queue(queueName, durable, exclusive, autoDelete, config.getArguments());
}
/**
 * Maps a binding definition to a queue-destination {@code Binding} without extra arguments.
 *
 * @param config binding definition supplying queue, exchange and routing key
 * @return a new binding object
 */
private Binding createBinding(RabbitMQProperties.BindingConfig config) {
    String destination = config.getQueue();
    String sourceExchange = config.getExchange();
    String routingKey = config.getRoutingKey();
    return new Binding(destination, Binding.DestinationType.QUEUE, sourceExchange, routingKey, null);
}
}
四、实际应用场景
4.1 微服务配置
yaml
spring:
application:
name: order-service
rabbitmq:
host: ${RABBITMQ_HOST:rabbitmq}
port: ${RABBITMQ_PORT:5672}
username: ${RABBITMQ_USERNAME:order_service}
password: ${RABBITMQ_PASSWORD}
virtual-host: /production
publisher-confirm-type: correlated
publisher-returns: true
listener:
simple:
acknowledge-mode: manual
concurrency: 5
max-concurrency: 15
prefetch: 20
app:
rabbitmq:
exchanges:
order:
name: order.exchange
type: topic
durable: true
payment:
name: payment.exchange
type: direct
durable: true
queues:
order-created:
name: order.created.queue
durable: true
arguments:
x-message-ttl: 86400000
x-dead-letter-exchange: dlx.exchange
payment-completed:
name: payment.completed.queue
durable: true
bindings:
order-binding:
queue: order.created.queue
exchange: order.exchange
routing-key: order.#
4.2 健康检查配置
java
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.stereotype.Component;
@Component
public class RabbitMQHealthIndicator implements HealthIndicator {
// Template through which the health probe reaches the broker.
private final RabbitTemplate rabbitTemplate;
/**
 * Creates the indicator.
 *
 * @param rabbitTemplate template used by health() to run the probe
 */
public RabbitMQHealthIndicator(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
/**
 * Probes the broker by reading the server version over a channel obtained from the
 * injected {@link RabbitTemplate}.
 *
 * @return an up status with {@code status} and {@code version} details when the probe
 *         succeeds, otherwise a down status built from the thrown exception
 */
@Override
public Health health() {
    try {
        // Channel exposes getConnection(), not getConnectionFactory(). Reading the server
        // properties exercises the live connection without opening an extra one.
        String version = rabbitTemplate.execute(channel -> {
            Object reported = channel.getConnection().getServerProperties().get("version");
            return reported != null ? reported.toString() : "unknown";
        });
        return Health.up()
                .withDetail("status", "connected")
                .withDetail("version", version)
                .build();
    } catch (Exception e) {
        // Pass the exception itself: withDetail("error", e.getMessage()) would throw
        // when the message is null, because detail values must be non-null.
        return Health.down(e).build();
    }
}
}
五、常见问题与解决方案
5.1 连接失败
问题描述: 应用启动时无法连接到 RabbitMQ。
解决方案:
yaml
spring:
rabbitmq:
connection-timeout: 60000
listener:
simple:
auto-startup: false
5.2 消息丢失
问题描述: 消息发送后丢失。
解决方案:
yaml
spring:
rabbitmq:
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true
5.3 消费者阻塞
问题描述: 消费者处理缓慢导致消息堆积。
解决方案:
yaml
spring:
rabbitmq:
listener:
simple:
prefetch: 10
concurrency: 5
max-concurrency: 20
六、最佳实践建议
6.1 配置建议
text
配置建议:
├── 使用环境变量管理敏感信息
├── 配置合理的连接和通道缓存
├── 启用发布确认机制
├── 配置手动确认模式
└── 设置合理的并发消费者数量
6.2 安全配置
text
安全建议:
├── 生产环境使用 SSL 加密
├── 使用专用用户账号
├── 配置虚拟主机隔离
├── 定期轮换密码
└── 限制用户权限
6.3 性能配置
text
性能建议:
├── 合理设置预取数量
├── 配置连接和通道缓存
├── 使用批量操作
├── 异步确认模式
└── 合理的并发消费者数量