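Integrating Spring Integration with RabbitMQ
Overview
Spring Integration is the Spring ecosystem's framework for implementing enterprise integration patterns. It provides core building blocks such as message channels, message endpoints, and message routers, and it can connect to many messaging middleware products. RabbitMQ is one of the brokers it supports through dedicated channel adapters (the spring-integration-amqp module), and it provides reliable message delivery.
This tutorial shows how to integrate RabbitMQ with Spring Integration. It covers inbound and outbound channel adapters, messaging gateways, message routing, and related features.
Integration architecture
Architecture diagram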
┌─────────────────────────────────────────────────────────────────────┐
│                   Spring Integration Application                    │
│                                                                     │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐           │
│  │   Message    │    │   Message    │    │   Message    │           │
│  │   Channel    │───►│   Handler    │───►│   Channel    │           │
│  │   (Input)    │    │ (Processor)  │    │   (Output)   │           │
│  └──────────────┘    └──────────────┘    └──────────────┘           │
│         ▲                                       │                   │
│         │                                       ▼                   │
│  ┌──────────────┐                        ┌──────────────┐           │
│  │   Inbound    │                        │   Outbound   │           │
│  │   Adapter    │                        │   Adapter    │           │
│  │  (RabbitMQ)  │                        │  (RabbitMQ)  │           │
│  └──────────────┘                        └──────────────┘           │
│         ▲                                       │                   │
└─────────│───────────────────────────────────────│───────────────────┘
          │                                       │
          │                                       ▼
┌─────────────────────────────────────────────────────────────────────┐
│                           RabbitMQ Broker                           │
│  ┌───────────────────────────────────────────────────────────────┐  │
│  │                           Exchange                            │  │
│  │      ┌─────────┐         ┌─────────┐         ┌─────────┐      │  │
│  │      │ Direct  │         │  Topic  │         │ Fanout  │      │  │
│  │      └─────────┘         └─────────┘         └─────────┘      │  │
│  └───────────────────────────────────────────────────────────────┘  │
│                                  │                                  │
│                                  ▼                                  │
│  ┌───────────────────────────────────────────────────────────────┐  │
│  │                            Queues                             │  │
│  │     ┌───────────┐       ┌───────────┐       ┌───────────┐     │  │
│  │     │  Queue 1  │       │  Queue 2  │       │  Queue 3  │     │  │
│  │     └───────────┘       └───────────┘       └───────────┘     │  │
│  └───────────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────────┘
Core components
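| Component | Description |
|---|---|
| Message Channel | Carries messages between components |
| Inbound Channel Adapter | Receives messages from RabbitMQ |
| Outbound Channel Adapter | Sends messages to RabbitMQ |
| Message Gateway | Exposes a simplified interface for sending messages |
| Service Activator | Invokes the business logic that processes a message |
| Message Router | Routes messages to different channels based on conditions |
| Message Transformer | Converts messages from one format to another |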
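To make the roles in the table concrete, here is a minimal plain-JDK sketch of a message passing through a transformer, a router, and a service activator. It deliberately uses no Spring Integration classes: `ComponentRolesSketch`, `Order`, the `id:amount` text format, and the channel names are all hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Plain-JDK illustration only; none of these types are Spring Integration classes.
class ComponentRolesSketch {

    // Hypothetical payload standing in for an order message.
    record Order(String id, double amount) {}

    // Transformer role: convert the raw text "id:amount" into an Order.
    static final Function<String, Order> TRANSFORMER = raw -> {
        String[] parts = raw.split(":");
        return new Order(parts[0], Double.parseDouble(parts[1]));
    };

    // Router role: choose a destination name from the payload.
    static final Function<Order, String> ROUTER =
            order -> order.amount() > 10000 ? "highPriority" : "normal";

    // Runs one raw message through transformer -> router -> service activator
    // and returns the destination the router selected.
    static String process(String raw, Consumer<Order> serviceActivator) {
        Order order = TRANSFORMER.apply(raw);
        String destination = ROUTER.apply(order);
        serviceActivator.accept(order);
        return destination;
    }

    public static void main(String[] args) {
        List<Order> handled = new ArrayList<>();
        System.out.println(process("ORD-1:15000", handled::add)); // highPriority
        System.out.println(process("ORD-2:200", handled::add));   // normal
        System.out.println(handled.size());                       // 2
    }
}
```

In the real framework each of these roles is a separate endpoint connected by channels, which is what lets an adapter, router, or handler be swapped without touching the others.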
Configuration examples
Maven dependencies
xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>spring-integration-rabbitmq</artifactId>
<version>1.0.0</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.0</version>
<relativePath/>
</parent>
<dependencies>
<!-- Spring Boot Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- Spring Integration -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
</dependency>
<!-- Spring Integration AMQP (RabbitMQ) -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-amqp</artifactId>
</dependency>
<!-- Spring AMQP -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
</dependency>
<!-- Spring Boot AMQP -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- JSON support -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- Test -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.yml configuration
yaml
server:
  port: 8080

spring:
  application:
    name: spring-integration-rabbitmq-demo
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    connection-timeout: 10000
    cache:
      channel:
        size: 25
        checkout-timeout: 5000
    listener:
      simple:
        acknowledge-mode: auto
        concurrency: 3
        max-concurrency: 10
        prefetch: 10
        retry:
          enabled: true
          initial-interval: 1000
          max-attempts: 3
          max-interval: 10000
          multiplier: 2.0
        default-requeue-rejected: false
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
      mandatory: true
  integration:
    poller:
      fixed-delay: 1000
      max-messages-per-poll: 10
    error:
      ignore-failures: true

logging:
  level:
    org.springframework.integration: DEBUG
    org.springframework.amqp: DEBUG
Java code examples
Base configuration class
java
package com.example.integration.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.amqp.dsl.Amqp;
import org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.messaging.MessageChannel;
@Configuration
public class RabbitMQIntegrationConfig {
public static final String ORDER_EXCHANGE = "exchange.order";
public static final String ORDER_QUEUE = "queue.order";
public static final String ORDER_ROUTING_KEY = "order.created";
public static final String NOTIFICATION_EXCHANGE = "exchange.notification";
public static final String NOTIFICATION_QUEUE = "queue.notification";
public static final String NOTIFICATION_ROUTING_KEY = "notification.send";
public static final String DLQ_EXCHANGE = "exchange.dlq";
public static final String DLQ_QUEUE = "queue.dlq";
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(jsonMessageConverter());
template.setMandatory(true);
return template;
}
@Bean
public DirectExchange orderExchange() {
return new DirectExchange(ORDER_EXCHANGE, true, false);
}
@Bean
public Queue orderQueue() {
return QueueBuilder.durable(ORDER_QUEUE)
.withArgument("x-dead-letter-exchange", DLQ_EXCHANGE)
.withArgument("x-dead-letter-routing-key", "dlq.order")
.build();
}
@Bean
public Binding orderBinding() {
return BindingBuilder.bind(orderQueue())
.to(orderExchange())
.with(ORDER_ROUTING_KEY);
}
@Bean
public TopicExchange notificationExchange() {
return new TopicExchange(NOTIFICATION_EXCHANGE, true, false);
}
@Bean
public Queue notificationQueue() {
return QueueBuilder.durable(NOTIFICATION_QUEUE).build();
}
@Bean
public Binding notificationBinding() {
return BindingBuilder.bind(notificationQueue())
.to(notificationExchange())
.with("notification.#");
}
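// The dead-letter exchange is a topic exchange so that the "dlq.#" binding
// pattern matches the "dlq.order" dead-letter routing key. A direct exchange
// compares routing keys literally and would not route it to this queue.
@Bean
public TopicExchange dlqExchange() {
return new TopicExchange(DLQ_EXCHANGE, true, false);
}
@Bean
public Queue dlqQueue() {
return QueueBuilder.durable(DLQ_QUEUE).build();
}
@Bean
public Binding dlqBinding() {
return BindingBuilder.bind(dlqQueue())
.to(dlqExchange())
.with("dlq.#");
}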
@Bean
public MessageChannel orderInputChannel() {
return MessageChannels.direct().getObject();
}
@Bean
public MessageChannel orderOutputChannel() {
return MessageChannels.direct().getObject();
}
@Bean
public MessageChannel notificationChannel() {
return MessageChannels.direct().getObject();
}
@Bean
public QueueChannel errorChannel() {
return MessageChannels.queue(100).getObject();
}
}
Inbound channel adapter
java
package com.example.integration.inbound;

import com.example.integration.config.RabbitMQIntegrationConfig;
import com.example.integration.model.OrderMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.amqp.dsl.Amqp;
import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.handler.annotation.Payload;

@Configuration
@Slf4j
public class InboundChannelAdapterConfig {

    @Bean
    public MessageChannel amqpInputChannel() {
        return new DirectChannel();
    }

    // Style 1: an adapter bean that publishes to a channel consumed by a @ServiceActivator.
    @Bean
    public AmqpInboundChannelAdapter inboundAdapter(
            ConnectionFactory connectionFactory,
            MessageChannel amqpInputChannel) {
        AmqpInboundChannelAdapter adapter = Amqp.inboundAdapter(
                        connectionFactory,
                        RabbitMQIntegrationConfig.ORDER_QUEUE)
                .messageConverter(new Jackson2JsonMessageConverter())
                .outputChannel(amqpInputChannel)
                .getObject();
        adapter.setAutoStartup(true);
        return adapter;
    }

    @ServiceActivator(inputChannel = "amqpInputChannel")
    public void handleOrderMessage(@Payload OrderMessage order) {
        log.info("Received order message: {}", order);
        processOrder(order);
    }

    private void processOrder(OrderMessage order) {
        log.info("Processing order: ID={}, amount={}", order.getOrderId(), order.getAmount());
    }

    // Style 2: the same kind of wiring expressed with the Java DSL.
    // The queue name is passed to the Amqp.inboundAdapter factory method.
    @Bean
    public IntegrationFlow amqpInboundFlow(ConnectionFactory connectionFactory) {
        return IntegrationFlow
                .from(Amqp.inboundAdapter(connectionFactory,
                                RabbitMQIntegrationConfig.NOTIFICATION_QUEUE)
                        .messageConverter(new Jackson2JsonMessageConverter()))
                .log(LoggingHandler.Level.INFO, "notification received")
                .handle(message -> {
                    log.info("Processing notification: {}", message.getPayload());
                })
                .get();
    }
}
Outbound channel adapter
java
package com.example.integration.outbound;

import com.example.integration.config.RabbitMQIntegrationConfig;
import com.example.integration.model.OrderMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.amqp.dsl.Amqp;
import org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.messaging.MessageChannel;

@Configuration
@Slf4j
public class OutboundChannelAdapterConfig {

    @Bean
    public MessageChannel outboundChannel() {
        return MessageChannels.direct().getObject();
    }

    // Style 1: an endpoint bean subscribed to outboundChannel.
    @Bean
    @ServiceActivator(inputChannel = "outboundChannel")
    public AmqpOutboundEndpoint amqpOutboundEndpoint(RabbitTemplate rabbitTemplate) {
        AmqpOutboundEndpoint endpoint = new AmqpOutboundEndpoint(rabbitTemplate);
        endpoint.setExchangeName(RabbitMQIntegrationConfig.ORDER_EXCHANGE);
        endpoint.setRoutingKey(RabbitMQIntegrationConfig.ORDER_ROUTING_KEY);
        endpoint.setExpectReply(false);
        return endpoint;
    }

    @MessagingGateway(defaultRequestChannel = "outboundChannel")
    public interface OrderGateway {
        void sendOrder(OrderMessage order);
    }

    // Style 2: Java DSL. Amqp.outboundAdapter takes an AmqpTemplate, so JSON
    // conversion comes from the converter configured on the RabbitTemplate bean.
    @Bean
    public IntegrationFlow orderOutboundFlow(RabbitTemplate rabbitTemplate) {
        return IntegrationFlow
                .from("orderInputChannel")
                .log(LoggingHandler.Level.INFO, "sending order message")
                .handle(Amqp.outboundAdapter(rabbitTemplate)
                        .exchangeName(RabbitMQIntegrationConfig.ORDER_EXCHANGE)
                        .routingKey(RabbitMQIntegrationConfig.ORDER_ROUTING_KEY))
                .get();
    }
}
Messaging gateway
java
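package com.example.integration.gateway;

import com.example.integration.model.OrderMessage;
import com.example.integration.model.OrderResult;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;

import java.util.concurrent.CompletableFuture;

// Methods without their own @Gateway annotation use this default request channel
// (assumed here to be orderInputChannel, as in the last method).
@MessagingGateway(defaultRequestChannel = "orderInputChannel")
public interface OrderProcessingGateway {

    // Fire-and-forget: no reply is awaited.
    void sendOrder(OrderMessage order);

    // Request/reply: blocks until a reply arrives.
    OrderResult processOrder(OrderMessage order);

    // Asynchronous request/reply.
    CompletableFuture<OrderResult> processOrderAsync(OrderMessage order);

    // Extra arguments become message headers.
    void sendWithHeaders(@Payload OrderMessage order,
                         @Header("priority") int priority,
                         @Header("source") String source);

    // Per-method settings use @Gateway; @MessagingGateway is only valid on the interface.
    @Gateway(requestChannel = "orderInputChannel",
            replyChannel = "orderOutputChannel",
            requestTimeout = 5000,
            replyTimeout = 10000)
    OrderResult processWithTimeout(OrderMessage order);
}
Message routing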
java
package com.example.integration.router;
import com.example.integration.model.OrderMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.Router;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.router.AbstractMessageRouter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import java.util.Collection;
import java.util.Collections;
@Configuration
@Slf4j
public class MessageRouterConfig {
@Bean
public MessageChannel highPriorityChannel() {
return MessageChannels.direct().getObject();
}
@Bean
public MessageChannel normalPriorityChannel() {
return MessageChannels.direct().getObject();
}
@Bean
public MessageChannel lowPriorityChannel() {
return MessageChannels.direct().getObject();
}
@Router(inputChannel = "orderRoutingChannel")
public String routeOrder(OrderMessage order) {
if (order.getAmount() > 10000) {
return "highPriorityChannel";
} else if (order.getAmount() > 1000) {
return "normalPriorityChannel";
} else {
return "lowPriorityChannel";
}
}
@Bean
public AbstractMessageRouter priorityRouter() {
return new AbstractMessageRouter() {
@Override
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
OrderMessage order = (OrderMessage) message.getPayload();
if (order.getAmount() > 10000) {
return Collections.singletonList(highPriorityChannel());
} else if (order.getAmount() > 1000) {
return Collections.singletonList(normalPriorityChannel());
} else {
return Collections.singletonList(lowPriorityChannel());
}
}
};
}
@Bean
public IntegrationFlow routingFlow() {
return IntegrationFlow
.from("orderRoutingChannel")
.route(OrderMessage.class, order -> {
if (order.getAmount() > 10000) {
return "highPriorityChannel";
} else if (order.getAmount() > 1000) {
return "normalPriorityChannel";
} else {
return "lowPriorityChannel";
}
})
.get();
}
}
Message transformer
java
package com.example.integration.transformer;
import com.example.integration.model.OrderMessage;
import com.example.integration.model.OrderResult;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.json.JsonToObjectTransformer;
import org.springframework.integration.json.ObjectToJsonTransformer;
import org.springframework.integration.transformer.PayloadDeserializingTransformer;
import org.springframework.integration.transformer.PayloadSerializingTransformer;
import org.springframework.messaging.Message;
import java.time.LocalDateTime;
@Configuration
@Slf4j
public class MessageTransformerConfig {
@Transformer(inputChannel = "jsonInputChannel",
outputChannel = "objectOutputChannel")
@Bean
public JsonToObjectTransformer jsonToObjectTransformer() {
return new JsonToObjectTransformer(OrderMessage.class);
}
@Transformer(inputChannel = "objectInputChannel",
outputChannel = "jsonOutputChannel")
@Bean
public ObjectToJsonTransformer objectToJsonTransformer() {
return new ObjectToJsonTransformer();
}
@Transformer(inputChannel = "orderInputChannel",
outputChannel = "orderResultChannel")
public OrderResult transformOrderToResult(OrderMessage order) {
log.info("Transforming order into result: {}", order.getOrderId());
OrderResult result = new OrderResult();
result.setOrderId(order.getOrderId());
result.setStatus("PROCESSED");
result.setProcessedAt(LocalDateTime.now());
result.setMessage("Order processed successfully");
return result;
}
@Transformer(inputChannel = "enrichInputChannel",
outputChannel = "enrichedOutputChannel")
public OrderMessage enrichOrder(OrderMessage order) {
if (order.getCreatedAt() == null) {
order.setCreatedAt(LocalDateTime.now());
}
if (order.getStatus() == null) {
order.setStatus("PENDING");
}
return order;
}
@Bean
public IntegrationFlow transformationFlow() {
return IntegrationFlow
.from("transformationInputChannel")
.transform(new ObjectToJsonTransformer())
// log(category) logs at INFO level under the given category
.log("JSON conversion done")
.transform(new JsonToObjectTransformer(OrderMessage.class))
.log("object conversion done")
.channel("transformationOutputChannel")
.get();
}
}
Message filter
java
package com.example.integration.filter;

import com.example.integration.model.OrderMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.Filter;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.filter.MessageFilter;
import org.springframework.integration.handler.LoggingHandler;

@Configuration
@Slf4j
public class MessageFilterConfig {

    // Style 1: annotated method. Returning false sends the message to discardChannel.
    @Filter(inputChannel = "filterInputChannel",
            outputChannel = "filterOutputChannel",
            discardChannel = "discardChannel")
    public boolean filterOrder(OrderMessage order) {
        boolean valid = order.getAmount() > 0
                && order.getOrderId() != null
                && !order.getOrderId().isEmpty();
        if (!valid) {
            log.warn("Order message filtered out: {}", order);
        }
        return valid;
    }

    // Style 2: a MessageFilter bean built from a selector.
    @Bean
    public MessageFilter amountFilter() {
        MessageFilter filter = new MessageFilter(message -> {
            OrderMessage order = (OrderMessage) message.getPayload();
            return order.getAmount() >= 100;
        });
        filter.setDiscardChannelName("discardChannel");
        return filter;
    }

    // Style 3: Java DSL. It subscribes to the same input channel as the annotated
    // method above, so an application should use one style or the other, not both.
    @Bean
    public IntegrationFlow filterFlow() {
        return IntegrationFlow
                .from("filterInputChannel")
                .filter(OrderMessage.class,
                        order -> order.getAmount() > 0 && order.getOrderId() != null,
                        f -> f.discardChannel("discardChannel"))
                .channel("filterOutputChannel")
                .get();
    }

    @Bean
    public IntegrationFlow discardFlow() {
        return IntegrationFlow
                .from("discardChannel")
                .log(LoggingHandler.Level.WARN, "message discarded")
                .handle(message -> {
                    log.warn("Handling discarded message: {}", message.getPayload());
                })
                .get();
    }
}
Message splitting and aggregation
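Before the configuration, it may help to see the two decisions an aggregator makes for every incoming item: which group the item belongs to (correlation) and whether that group is complete (release). The plain-JDK sketch below illustrates the idea under the assumption that each item carries the expected group size, much like the sequence-size header a splitter sets. `AggregationSketch` and `Item` are hypothetical names, and this is not how Spring Integration's aggregator is implemented.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Plain-JDK illustration of correlation and release; not Spring Integration code.
class AggregationSketch {

    // Hypothetical item; sequenceSize is the number of items the order was split into.
    record Item(String orderId, int sequenceSize, double price) {}

    // Open groups keyed by the correlation key (the order ID).
    private final Map<String, List<Item>> groups = new HashMap<>();

    // Adds the item to its group and returns the whole group once it is complete.
    Optional<List<Item>> accept(Item item) {
        List<Item> group = groups.computeIfAbsent(item.orderId(), key -> new ArrayList<>());
        group.add(item);
        // Release only when every expected item has arrived.
        if (group.size() >= item.sequenceSize()) {
            groups.remove(item.orderId());
            return Optional.of(group);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        AggregationSketch aggregator = new AggregationSketch();
        System.out.println(aggregator.accept(new Item("ORD-1", 2, 500.0)).isPresent()); // false
        System.out.println(aggregator.accept(new Item("ORD-1", 2, 500.0)).isPresent()); // true
    }
}
```

A release rule such as "group size is at least 1" would release after the first item and never produce a complete order, which is why the rule compares against the expected size.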
java
package com.example.integration.splitter;

import com.example.integration.model.OrderItem;
import com.example.integration.model.OrderMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.annotation.Aggregator;
import org.springframework.integration.annotation.CorrelationStrategy;
import org.springframework.integration.annotation.ReleaseStrategy;
import org.springframework.integration.annotation.Splitter;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.messaging.Message;

import java.util.List;

@Configuration
@Slf4j
public class SplitterAggregatorConfig {

    // Style 1: annotated methods.
    @Splitter(inputChannel = "orderSplitChannel",
            outputChannel = "itemProcessChannel")
    public List<OrderItem> splitOrder(OrderMessage order) {
        log.info("Splitting order: {}", order.getOrderId());
        return order.getItems();
    }

    @Aggregator(inputChannel = "itemAggregateChannel",
            outputChannel = "orderCompleteChannel")
    public OrderMessage aggregateItems(List<OrderItem> items) {
        log.info("Aggregating {} order items", items.size());
        OrderMessage order = new OrderMessage();
        order.setOrderId(items.get(0).getOrderId());
        order.setItems(items);
        order.setAmount(items.stream()
                .mapToDouble(OrderItem::getPrice)
                .sum());
        return order;
    }

    @CorrelationStrategy
    public String correlateByOrderId(OrderItem item) {
        return item.getOrderId();
    }

    // Release only when every item produced by the splitter has arrived; the
    // splitter records the expected count in the sequence-size header.
    @ReleaseStrategy
    public boolean canRelease(List<Message<?>> messages) {
        int expected = new IntegrationMessageHeaderAccessor(messages.get(0)).getSequenceSize();
        return expected > 0 && messages.size() >= expected;
    }

    // Style 2: Java DSL. It subscribes to the same channels as the annotated
    // methods above, so an application should use one style or the other.
    @Bean
    public IntegrationFlow splitAggregateFlow() {
        return IntegrationFlow
                .from("orderSplitChannel")
                .split(OrderMessage.class, OrderMessage::getItems)
                .channel("itemProcessChannel")
                // Return the item so the flow continues to the aggregator.
                .handle(OrderItem.class, (item, headers) -> {
                    log.info("Processing order item: {}", item);
                    return item;
                })
                .channel("itemAggregateChannel")
                .aggregate(a -> a
                        .correlationStrategy(m -> ((OrderItem) m.getPayload()).getOrderId())
                        .releaseStrategy(g -> g.getSequenceSize() > 0
                                && g.size() >= g.getSequenceSize()))
                .channel("orderCompleteChannel")
                .get();
    }
}
Integrating PHP with Spring Integration
PHP message producer
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
class SpringIntegrationProducer
{
private $connection;
private $channel;
public function __construct(array $config)
{
$this->connection = new AMQPStreamConnection(
$config['host'] ?? 'localhost',
$config['port'] ?? 5672,
$config['user'] ?? 'guest',
$config['password'] ?? 'guest',
$config['vhost'] ?? '/'
);
$this->channel = $this->connection->channel();
$this->setupExchangesAndQueues();
}
private function setupExchangesAndQueues(): void
{
$this->channel->exchange_declare('exchange.order', 'direct', false, true, false);
$this->channel->exchange_declare('exchange.notification', 'topic', false, true, false);
$this->channel->queue_declare('queue.order', false, true, false, false, false,
new AMQPTable([
'x-dead-letter-exchange' => 'exchange.dlq',
'x-dead-letter-routing-key' => 'dlq.order',
])
);
$this->channel->queue_bind('queue.order', 'exchange.order', 'order.created');
}
public function sendOrderMessage(array $orderData): bool
{
$message = new AMQPMessage(
json_encode($orderData),
[
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'message_id' => uniqid('msg-', true),
'timestamp' => time(),
'headers' => new AMQPTable([
// Spring AMQP's Jackson2JsonMessageConverter reads the target type from __TypeId__
'__TypeId__' => 'com.example.integration.model.OrderMessage',
'priority' => $orderData['priority'] ?? 5,
])
]
);
$this->channel->basic_publish(
$message,
'exchange.order',
'order.created'
);
return true;
}
public function sendNotification(string $type, array $data): bool
{
$routingKey = "notification.{$type}";
$message = new AMQPMessage(
json_encode($data),
[
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
]
);
$this->channel->basic_publish(
$message,
'exchange.notification',
$routingKey
);
return true;
}
public function close(): void
{
$this->channel->close();
$this->connection->close();
}
}
$producer = new SpringIntegrationProducer([
'host' => 'localhost',
'port' => 5672,
'user' => 'guest',
'password' => 'guest',
]);
$orderData = [
'orderId' => 'ORD-' . time(),
'amount' => 1500.00,
'customerId' => 'CUST-001',
'status' => 'PENDING',
'items' => [
['productId' => 'PROD-001', 'quantity' => 2, 'price' => 500],
['productId' => 'PROD-002', 'quantity' => 1, 'price' => 500],
],
'priority' => 8,
];
$producer->sendOrderMessage($orderData);
echo "Order message sent\n";
$producer->sendNotification('email', [
'to' => 'customer@example.com',
'subject' => 'Order confirmation',
'body' => 'Your order has been created successfully',
]);
echo "Notification message sent\n";
$producer->close();
PHP message consumer
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class SpringIntegrationConsumer
{
private $connection;
private $channel;
public function __construct(array $config)
{
$this->connection = new AMQPStreamConnection(
$config['host'] ?? 'localhost',
$config['port'] ?? 5672,
$config['user'] ?? 'guest',
$config['password'] ?? 'guest',
$config['vhost'] ?? '/'
);
$this->channel = $this->connection->channel();
}
public function consumeOrders(string $queue, callable $processor): void
{
$this->channel->basic_qos(0, 10, false);
$this->channel->basic_consume(
$queue,
'',
false,
false,
false,
false,
function (AMQPMessage $message) use ($processor) {
$this->processMessage($message, $processor);
}
);
echo "Waiting for messages...\n";
while ($this->channel->is_consuming()) {
$this->channel->wait();
}
}
private function processMessage(AMQPMessage $message, callable $processor): void
{
try {
$body = json_decode($message->getBody(), true);
$headers = $this->extractHeaders($message);
$result = $processor($body, $headers);
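if ($result) {
$message->ack();
echo "Message processed successfully\n";
} else {
// nack($requeue, $multiple): requeue this one message for another attempt
$message->nack(true);
echo "Message processing failed, requeued\n";
}
} catch (Exception $e) {
echo "Processing error: " . $e->getMessage() . "\n";
// Reject without requeueing so the broker can dead-letter the message
$message->nack(false, false);
}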
}
private function extractHeaders(AMQPMessage $message): array
{
$headers = [];
if ($message->has('application_headers')) {
$appHeaders = $message->get('application_headers')->getNativeData();
$headers = array_merge($headers, $appHeaders);
}
$headers['message_id'] = $message->get('message_id');
$headers['timestamp'] = $message->get('timestamp');
$headers['content_type'] = $message->get('content_type');
return $headers;
}
public function close(): void
{
$this->channel->close();
$this->connection->close();
}
}
class OrderProcessor
{
public function process(array $orderData, array $headers): bool
{
echo "Processing order:\n";
echo " Order ID: " . ($orderData['orderId'] ?? 'N/A') . "\n";
echo " Amount: " . ($orderData['amount'] ?? 0) . "\n";
echo " Customer ID: " . ($orderData['customerId'] ?? 'N/A') . "\n";
echo " Message ID: " . ($headers['message_id'] ?? 'N/A') . "\n";
if (isset($orderData['items'])) {
echo " Items:\n";
foreach ($orderData['items'] as $item) {
echo " - Product: " . ($item['productId'] ?? 'N/A');
echo ", quantity: " . ($item['quantity'] ?? 0);
echo ", price: " . ($item['price'] ?? 0) . "\n";
}
}
return true;
}
}
$consumer = new SpringIntegrationConsumer([
'host' => 'localhost',
'port' => 5672,
'user' => 'guest',
'password' => 'guest',
]);
$processor = new OrderProcessor();
$consumer->consumeOrders('queue.order', [$processor, 'process']);
$consumer->close();
Practical scenarios
Scenario 1: order processing pipeline
java
package com.example.scenario.pipeline;

import com.example.integration.model.OrderMessage;
import com.example.integration.model.OrderResult;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.amqp.dsl.Amqp;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.handler.LoggingHandler;

import java.time.LocalDateTime;

@Configuration
@Slf4j
public class OrderPipelineConfig {

    // Consume from RabbitMQ, drop invalid orders, stamp the processing time,
    // then route by order amount.
    @Bean
    public IntegrationFlow orderProcessingPipeline(ConnectionFactory connectionFactory) {
        return IntegrationFlow
                .from(Amqp.inboundAdapter(connectionFactory, "queue.order.pipeline")
                        .messageConverter(new Jackson2JsonMessageConverter()))
                .log(LoggingHandler.Level.INFO, "order processing started")
                .filter(OrderMessage.class,
                        order -> order.getAmount() > 0,
                        f -> f.discardChannel("invalidOrderChannel"))
                .transform(OrderMessage.class, order -> {
                    order.setProcessedAt(LocalDateTime.now());
                    return order;
                })
                .route(OrderMessage.class, order -> {
                    if (order.getAmount() > 10000) {
                        return "highValueOrderChannel";
                    } else if (order.getAmount() > 1000) {
                        return "normalOrderChannel";
                    } else {
                        return "lowValueOrderChannel";
                    }
                })
                .get();
    }

    // Each handler returns the payload so the message continues to orderResultChannel;
    // a handler that returns nothing would end the flow.
    @Bean
    public IntegrationFlow highValueOrderFlow() {
        return IntegrationFlow
                .from("highValueOrderChannel")
                .log(LoggingHandler.Level.INFO, "processing high-value order")
                .handle((payload, headers) -> {
                    log.info("High-value order: {}", payload);
                    return payload;
                })
                .channel("orderResultChannel")
                .get();
    }

    @Bean
    public IntegrationFlow normalOrderFlow() {
        return IntegrationFlow
                .from("normalOrderChannel")
                .log(LoggingHandler.Level.INFO, "processing normal order")
                .handle((payload, headers) -> {
                    log.info("Normal order: {}", payload);
                    return payload;
                })
                .channel("orderResultChannel")
                .get();
    }

    @Bean
    public IntegrationFlow lowValueOrderFlow() {
        return IntegrationFlow
                .from("lowValueOrderChannel")
                .log(LoggingHandler.Level.INFO, "processing low-value order")
                .handle((payload, headers) -> {
                    log.info("Low-value order: {}", payload);
                    return payload;
                })
                .channel("orderResultChannel")
                .get();
    }

    // Amqp.outboundAdapter takes an AmqpTemplate such as the RabbitTemplate bean.
    @Bean
    public IntegrationFlow resultFlow(RabbitTemplate rabbitTemplate) {
        return IntegrationFlow
                .from("orderResultChannel")
                .transform(OrderMessage.class, order -> {
                    OrderResult result = new OrderResult();
                    result.setOrderId(order.getOrderId());
                    result.setStatus("COMPLETED");
                    result.setProcessedAt(LocalDateTime.now());
                    return result;
                })
                .handle(Amqp.outboundAdapter(rabbitTemplate)
                        .exchangeName("exchange.order.result")
                        .routingKey("order.completed"))
                .get();
    }
}
Scenario 2: broadcast and subscribe
java
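package com.example.scenario.pubsub;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.amqp.dsl.Amqp;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.messaging.MessageChannel;

@Configuration
@Slf4j
public class PubSubConfig {

    @Bean
    public FanoutExchange broadcastExchange() {
        return new FanoutExchange("exchange.broadcast", true, false);
    }

    // One queue per subscriber, each bound to the fanout exchange. These
    // declarations are added so the subscriber flows below have queues to consume.
    @Bean
    public Queue subscriber1Queue() {
        return new Queue("queue.subscriber1");
    }

    @Bean
    public Queue subscriber2Queue() {
        return new Queue("queue.subscriber2");
    }

    @Bean
    public Binding subscriber1Binding() {
        return BindingBuilder.bind(subscriber1Queue()).to(broadcastExchange());
    }

    @Bean
    public Binding subscriber2Binding() {
        return BindingBuilder.bind(subscriber2Queue()).to(broadcastExchange());
    }

    @Bean
    public MessageChannel broadcastInputChannel() {
        return MessageChannels.direct().getObject();
    }

    // A fanout exchange ignores the routing key, so none is set.
    @Bean
    public IntegrationFlow broadcastPublisher(RabbitTemplate rabbitTemplate) {
        return IntegrationFlow
                .from("broadcastInputChannel")
                .log(LoggingHandler.Level.INFO, "broadcasting message")
                .handle(Amqp.outboundAdapter(rabbitTemplate)
                        .exchangeName("exchange.broadcast"))
                .get();
    }

    @Bean
    public IntegrationFlow subscriber1(ConnectionFactory connectionFactory) {
        return IntegrationFlow
                .from(Amqp.inboundAdapter(connectionFactory, "queue.subscriber1"))
                .handle(message -> {
                    log.info("Subscriber 1 received: {}", message.getPayload());
                })
                .get();
    }

    @Bean
    public IntegrationFlow subscriber2(ConnectionFactory connectionFactory) {
        return IntegrationFlow
                .from(Amqp.inboundAdapter(connectionFactory, "queue.subscriber2"))
                .handle(message -> {
                    log.info("Subscriber 2 received: {}", message.getPayload());
                })
                .get();
    }
}
Common problems and solutions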
Problem 1: message serialization failures
Symptom: messages cannot be serialized or deserialized correctly.
Solution:
java
@Configuration
public class SerializationConfig {
@Bean
public MessageConverter jsonMessageConverter() {
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
converter.setAlwaysConvertToInferredType(true);
return converter;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(jsonMessageConverter());
return template;
}
}
Problem 2: message backlog
Symptom: messages pile up in the queue because they are consumed too slowly.
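A backlog grows whenever messages arrive faster than the consumers can finish them, so a rough capacity estimate is a useful first step before tuning concurrency. The plain-JDK sketch below does that arithmetic. It assumes a steady arrival rate and a fixed processing time per message, which real workloads only approximate; `ConsumerSizingSketch` and its methods are hypothetical names.

```java
// Back-of-the-envelope consumer sizing; plain arithmetic, not a Spring API.
class ConsumerSizingSketch {

    // Smallest number of consumers whose combined throughput matches the arrival rate:
    // ceil(arrivalsPerSecond * millisPerMessage / 1000), computed in integer math.
    static int requiredConsumers(int arrivalsPerSecond, int millisPerMessage) {
        return (arrivalsPerSecond * millisPerMessage + 999) / 1000;
    }

    // Seconds needed to drain an existing backlog, or infinity if the
    // consumers cannot even keep up with new arrivals.
    static double drainSeconds(long backlog, int consumers,
                               int arrivalsPerSecond, int millisPerMessage) {
        double capacityPerSecond = consumers * 1000.0 / millisPerMessage;
        double surplusPerSecond = capacityPerSecond - arrivalsPerSecond;
        return surplusPerSecond <= 0
                ? Double.POSITIVE_INFINITY
                : backlog / surplusPerSecond;
    }

    public static void main(String[] args) {
        // 200 messages/s at 50 ms each needs 10 consumers just to keep pace.
        System.out.println(requiredConsumers(200, 50));
        // With 20 consumers, a backlog of 60,000 messages drains in 300 seconds.
        System.out.println(drainSeconds(60_000, 20, 200, 50));
    }
}
```

Numbers like these only bound the consumer count; prefetch and downstream limits (database, external APIs) still decide what the application can sustain.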
Solution:
java
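@Bean
public IntegrationFlow highThroughputFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlow
            .from(Amqp.inboundAdapter(connectionFactory, "queue.high.throughput")
                    // Listener-container settings are applied through configureContainer
                    .configureContainer(c -> c
                            .concurrentConsumers(10)
                            .maxConcurrentConsumers(50)
                            .prefetchCount(20)))
            // Hand-off to a thread pool: the listener thread returns (and, with automatic
            // acknowledgement, the message is acked) before processing has finished.
            .channel(MessageChannels.executor(Executors.newFixedThreadPool(20)))
            .handle(message -> {
                processMessage(message);
            })
            .get();
}
Problem 3: preserving message order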
Symptom: messages are processed in a different order than they were sent.
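The usual cause is concurrency: once several threads take messages from the same queue, whichever thread finishes first wins. The plain-JDK sketch below illustrates this with an executor standing in for the consumers. With one worker the processing order always equals the submission order; with several workers it may not. `OrderingSketch` is a hypothetical name and nothing here is RabbitMQ or Spring code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustrates why a single consumer preserves order; not RabbitMQ or Spring code.
class OrderingSketch {

    // Submits messages 0..count-1 to a pool and returns them in the order processed.
    static List<Integer> processAll(int count, int workers) {
        List<Integer> processed = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int i = 0; i < count; i++) {
            final int sequence = i;
            pool.execute(() -> processed.add(sequence));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return processed;
    }

    public static void main(String[] args) {
        // One worker: always 0, 1, 2, ... in submission order.
        System.out.println(processAll(10, 1));
        // Four workers: every message is processed, but the order may vary between runs.
        System.out.println(processAll(10, 4));
    }
}
```

The trade-off is throughput: a single consumer serializes all work, so ordering is often enforced per key (for example per order ID) rather than globally.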
Solution:
java
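@Bean
public IntegrationFlow orderedProcessingFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlow
            .from(Amqp.inboundAdapter(connectionFactory, "queue.ordered")
                    // A single consumer reads the queue in delivery order
                    .configureContainer(c -> c.concurrentConsumers(1)))
            // The queue channel is drained by the default poller configured under
            // spring.integration.poller, which processes messages one thread at a time
            .channel(MessageChannels.queue())
            .handle(message -> {
                processInOrder(message);
            })
            .get();
}
Best practices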
1. Error handling
java
@Bean
public IntegrationFlow errorHandlingFlow() {
    return IntegrationFlow
            .from("errorChannel")
            .log(LoggingHandler.Level.ERROR, "error handling")
            .handle(message -> {
                // Messages on errorChannel are ErrorMessage instances; the payload is the Throwable
                ErrorMessage errorMessage = (ErrorMessage) message;
                log.error("Processing failed: {}", errorMessage.getPayload());
                storeErrorForRetry(errorMessage);
            })
            .get();
}
2. Monitoring
java
@Bean
public IntegrationFlow monitoringFlow() {
    return IntegrationFlow
            .from("monitoredChannel")
            // wireTap sends a copy of each message to monitoringChannel
            .wireTap("monitoringChannel")
            .handle(message -> {
                processMessage(message);
            })
            .get();
}

@Bean
public IntegrationFlow monitoringChannelFlow() {
    return IntegrationFlow
            .from("monitoringChannel")
            .handle(message -> {
                metricsService.recordMessageProcessed(message);
            })
            .get();
}
3. Retry mechanism
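To make the back-off parameters used in this tutorial concrete (initial interval, multiplier, maximum interval, maximum attempts), the plain-JDK sketch below computes how long an exponential policy waits before each retry. It is illustrative arithmetic only, not Spring Retry's implementation. It assumes that the maximum attempt count includes the first try and that no random jitter is applied; `BackoffSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative exponential back-off arithmetic; not Spring Retry's implementation.
class BackoffSketch {

    // Returns the wait (in milliseconds) before each retry. maxAttempts is assumed
    // to include the first try, so there are maxAttempts - 1 waits.
    static List<Long> delays(long initialMillis, double multiplier,
                             long maxMillis, int maxAttempts) {
        List<Long> result = new ArrayList<>();
        double next = initialMillis;
        for (int retry = 1; retry < maxAttempts; retry++) {
            // Cap every wait at the configured maximum interval.
            result.add(Math.min((long) next, maxMillis));
            next *= multiplier;
        }
        return result;
    }

    public static void main(String[] args) {
        // 1 s initial interval, multiplier 2, 30 s cap, 3 attempts
        System.out.println(delays(1000, 2.0, 30_000, 3)); // [1000, 2000]
        // With more attempts the cap takes effect from the sixth wait onward
        System.out.println(delays(1000, 2.0, 30_000, 8));
    }
}
```

With three attempts the worst case adds about three seconds of waiting per message, which is worth comparing against consumer timeouts before raising the attempt count.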
java
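// Retries are attached to the handler as advice backed by a Spring Retry RetryTemplate.
@Bean
public RequestHandlerRetryAdvice retryAdvice() {
    RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
    advice.setRetryTemplate(RetryTemplate.builder()
            .maxAttempts(3)
            // initial 1 s, multiplier 2.0, capped at 30 s, with random jitter
            .exponentialBackoff(1000, 2.0, 30000, true)
            .retryOn(RetryableException.class)
            // Also match when the exception arrives wrapped in another exception
            .traversingCauses()
            .build());
    return advice;
}

@Bean
public IntegrationFlow retryFlow(RequestHandlerRetryAdvice retryAdvice) {
    return IntegrationFlow
            .from("retryChannel")
            .handle(message -> {
                processWithRetry(message);
            }, e -> e.advice(retryAdvice))
            .get();
}
Version compatibility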
| Spring Integration | Spring Boot | Spring AMQP | RabbitMQ Server |
|---|---|---|---|
| 6.2.x | 3.2.x | 3.1.x | 3.11+ |
| 6.1.x | 3.1.x | 3.0.x | 3.10+ |
| 6.0.x | 3.0.x | 3.0.x | 3.9+ |
| 5.5.x | 2.5.x to 2.7.x | 2.3.x / 2.4.x | 3.9+ |
| 5.4.x | 2.4.x | 2.3.x | 3.8+ |
