Skip to content

Spring Integration 与 RabbitMQ 集成

概述

Spring Integration 是 Spring 生态系统中用于构建企业集成模式的框架,它提供了消息通道、消息端点、消息路由等核心组件,支持与多种消息中间件集成。RabbitMQ 作为 Spring Integration 的主要消息通道适配器之一,提供了可靠的消息传递能力。

本教程将详细介绍如何在 Spring Integration 中集成 RabbitMQ,包括入站/出站通道适配器、消息网关、消息路由等核心功能。

集成架构设计

架构图

┌─────────────────────────────────────────────────────────────────────┐
│                     Spring Integration Application                   │
│                                                                      │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐          │
│  │   Message    │    │   Message    │    │   Message    │          │
│  │   Channel    │───►│   Handler    │───►│   Channel    │          │
│  │  (Input)     │    │  (Processor) │    │  (Output)    │          │
│  └──────────────┘    └──────────────┘    └──────────────┘          │
│         ▲                                        │                   │
│         │                                        ▼                   │
│  ┌──────────────┐                      ┌──────────────┐             │
│  │  Inbound     │                      │  Outbound    │             │
│  │  Adapter     │                      │  Adapter     │             │
│  │  (RabbitMQ)  │                      │  (RabbitMQ)  │             │
│  └──────────────┘                      └──────────────┘             │
│         ▲                                        │                   │
└─────────│────────────────────────────────────────│───────────────────┘
          │                                        │
          │                                        ▼
┌─────────────────────────────────────────────────────────────────────┐
│                         RabbitMQ Broker                              │
│  ┌─────────────────────────────────────────────────────────────┐    │
│  │                        Exchange                              │    │
│  │    ┌─────────┐    ┌─────────┐    ┌─────────┐               │    │
│  │    │ Direct  │    │  Topic  │    │ Fanout  │               │    │
│  │    └─────────┘    └─────────┘    └─────────┘               │    │
│  └─────────────────────────────────────────────────────────────┘    │
│                              │                                       │
│                              ▼                                       │
│  ┌─────────────────────────────────────────────────────────────┐    │
│  │                          Queues                              │    │
│  │    ┌───────────┐    ┌───────────┐    ┌───────────┐         │    │
│  │    │  Queue 1  │    │  Queue 2  │    │  Queue 3  │         │    │
│  │    └───────────┘    └───────────┘    └───────────┘         │    │
│  └─────────────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────────────┘

核心组件

组件说明
Message Channel消息通道,用于在组件之间传递消息
Inbound Channel Adapter入站通道适配器,从 RabbitMQ 接收消息
Outbound Channel Adapter出站通道适配器,向 RabbitMQ 发送消息
Message Gateway消息网关,提供简化的消息发送接口
Service Activator服务激活器,处理消息的业务逻辑
Message Router消息路由器,根据条件将消息路由到不同通道
Message Transformer消息转换器,转换消息格式

配置示例

Maven 依赖

xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
         http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    
    <groupId>com.example</groupId>
    <artifactId>spring-integration-rabbitmq</artifactId>
    <version>1.0.0</version>
    
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.2.0</version>
        <relativePath/>
    </parent>
    
    <dependencies>
        <!-- Spring Boot Starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        
        <!-- Spring Integration -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-core</artifactId>
        </dependency>
        
        <!-- Spring Integration AMQP (RabbitMQ) -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-amqp</artifactId>
        </dependency>
        
        <!-- Spring AMQP -->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
        </dependency>
        
        <!-- Spring Boot AMQP -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        
        <!-- JSON 支持 -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
        
        <!-- Lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        
        <!-- Test -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

application.yml 配置

yaml
server:
  port: 8080

spring:
  application:
    name: spring-integration-rabbitmq-demo
  
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    
    connection-timeout: 10000
    
    cache:
      channel:
        size: 25
        checkout-timeout: 5000
    
    listener:
      simple:
        acknowledge-mode: auto
        concurrency: 3
        max-concurrency: 10
        prefetch: 10
        retry:
          enabled: true
          initial-interval: 1000
          max-attempts: 3
          max-interval: 10000
          multiplier: 2.0
        default-requeue-rejected: false
    
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
      mandatory: true

spring.integration:
  poller:
    fixed-delay: 1000
    max-messages-per-poll: 10
  error:
    ignore-failures: true

logging:
  level:
    org.springframework.integration: DEBUG
    org.springframework.amqp: DEBUG

Java 代码示例

基础配置类

java
package com.example.integration.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.amqp.dsl.Amqp;
import org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.messaging.MessageChannel;

@Configuration
public class RabbitMQIntegrationConfig {
    
    public static final String ORDER_EXCHANGE = "exchange.order";
    public static final String ORDER_QUEUE = "queue.order";
    public static final String ORDER_ROUTING_KEY = "order.created";
    
    public static final String NOTIFICATION_EXCHANGE = "exchange.notification";
    public static final String NOTIFICATION_QUEUE = "queue.notification";
    public static final String NOTIFICATION_ROUTING_KEY = "notification.send";
    
    public static final String DLQ_EXCHANGE = "exchange.dlq";
    public static final String DLQ_QUEUE = "queue.dlq";
    
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
    
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(jsonMessageConverter());
        template.setMandatory(true);
        return template;
    }
    
    @Bean
    public DirectExchange orderExchange() {
        return new DirectExchange(ORDER_EXCHANGE, true, false);
    }
    
    @Bean
    public Queue orderQueue() {
        return QueueBuilder.durable(ORDER_QUEUE)
            .withArgument("x-dead-letter-exchange", DLQ_EXCHANGE)
            .withArgument("x-dead-letter-routing-key", "dlq.order")
            .build();
    }
    
    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(orderQueue())
            .to(orderExchange())
            .with(ORDER_ROUTING_KEY);
    }
    
    @Bean
    public TopicExchange notificationExchange() {
        return new TopicExchange(NOTIFICATION_EXCHANGE, true, false);
    }
    
    @Bean
    public Queue notificationQueue() {
        return QueueBuilder.durable(NOTIFICATION_QUEUE).build();
    }
    
    @Bean
    public Binding notificationBinding() {
        return BindingBuilder.bind(notificationQueue())
            .to(notificationExchange())
            .with("notification.#");
    }
    
    @Bean
    public DirectExchange dlqExchange() {
        return new DirectExchange(DLQ_EXCHANGE, true, false);
    }
    
    @Bean
    public Queue dlqQueue() {
        return QueueBuilder.durable(DLQ_QUEUE).build();
    }
    
    @Bean
    public Binding dlqBinding() {
        return BindingBuilder.bind(dlqQueue())
            .to(dlqExchange())
            .with("dlq.#");
    }
    
    @Bean
    public MessageChannel orderInputChannel() {
        return MessageChannels.direct().getObject();
    }
    
    @Bean
    public MessageChannel orderOutputChannel() {
        return MessageChannels.direct().getObject();
    }
    
    @Bean
    public MessageChannel notificationChannel() {
        return MessageChannels.direct().getObject();
    }
    
    @Bean
    public QueueChannel errorChannel() {
        return MessageChannels.queue(100).getObject();
    }
}

入站通道适配器

java
package com.example.integration.inbound;

import com.example.integration.config.RabbitMQIntegrationConfig;
import com.example.integration.model.OrderMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.amqp.dsl.Amqp;
import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.handler.annotation.Payload;

@Configuration
@Slf4j
public class InboundChannelAdapterConfig {
    
    @Bean
    public MessageChannel amqpInputChannel() {
        return new DirectChannel();
    }
    
    @Bean
    public AmqpInboundChannelAdapter inboundAdapter(
            ConnectionFactory connectionFactory,
            MessageChannel amqpInputChannel) {
        
        AmqpInboundChannelAdapter adapter = Amp.inboundAdapter(
                connectionFactory,
                RabbitMQIntegrationConfig.ORDER_QUEUE)
            .messageConverter(new Jackson2JsonMessageConverter())
            .outputChannel(amqpInputChannel)
            .get();
        
        adapter.setAutoStartup(true);
        return adapter;
    }
    
    @ServiceActivator(inputChannel = "amqpInputChannel")
    public void handleOrderMessage(@Payload OrderMessage order) {
        log.info("接收到订单消息: {}", order);
        processOrder(order);
    }
    
    private void processOrder(OrderMessage order) {
        log.info("处理订单: ID={}, 金额={}", order.getOrderId(), order.getAmount());
    }
    
    @Bean
    public IntegrationFlow amqpInboundFlow(ConnectionFactory connectionFactory) {
        return IntegrationFlows
            .from(Amqp.inboundAdapter(connectionFactory)
                .queueNames(RabbitMQIntegrationConfig.NOTIFICATION_QUEUE)
                .messageConverter(new Jackson2JsonMessageConverter()))
            .log(LoggingHandler.Level.INFO, "收到通知消息")
            .handle(message -> {
                log.info("处理通知: {}", message.getPayload());
            })
            .get();
    }
}

出站通道适配器

java
package com.example.integration.outbound;

import com.example.integration.config.RabbitMQIntegrationConfig;
import com.example.integration.model.OrderMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.amqp.dsl.Amqp;
import org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;

@Configuration
@Slf4j
public class OutboundChannelAdapterConfig {
    
    @Bean
    public MessageChannel outboundChannel() {
        return MessageChannels.direct().getObject();
    }
    
    @Bean
    @ServiceActivator(inputChannel = "outboundChannel")
    public AmqpOutboundEndpoint amqpOutboundEndpoint(RabbitTemplate rabbitTemplate) {
        AmqpOutboundEndpoint endpoint = new AmqpOutboundEndpoint(rabbitTemplate);
        endpoint.setExchangeName(RabbitMQIntegrationConfig.ORDER_EXCHANGE);
        endpoint.setRoutingKey(RabbitMQIntegrationConfig.ORDER_ROUTING_KEY);
        endpoint.setExpectReply(false);
        return endpoint;
    }
    
    @MessagingGateway(defaultRequestChannel = "outboundChannel")
    public interface OrderGateway {
        void sendOrder(OrderMessage order);
    }
    
    @Bean
    public IntegrationFlow orderOutboundFlow(ConnectionFactory connectionFactory) {
        return IntegrationFlows
            .from("orderInputChannel")
            .log(LoggingHandler.Level.INFO, "发送订单消息")
            .handle(Amqp.outboundAdapter(connectionFactory)
                .exchangeName(RabbitMQIntegrationConfig.ORDER_EXCHANGE)
                .routingKey(RabbitMQIntegrationConfig.ORDER_ROUTING_KEY)
                .messageConverter(new Jackson2JsonMessageConverter()))
            .get();
    }
}

消息网关

java
package com.example.integration.gateway;

import com.example.integration.model.OrderMessage;
import com.example.integration.model.OrderResult;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;

import java.util.concurrent.CompletableFuture;

@MessagingGateway
public interface OrderProcessingGateway {
    
    void sendOrder(OrderMessage order);
    
    OrderResult processOrder(OrderMessage order);
    
    CompletableFuture<OrderResult> processOrderAsync(OrderMessage order);
    
    void sendWithHeaders(OrderMessage order,
                        @Header("priority") int priority,
                        @Header("source") String source);
    
    @MessagingGateway(defaultRequestChannel = "orderInputChannel",
                     defaultReplyChannel = "orderOutputChannel",
                     defaultRequestTimeout = 5000,
                     defaultReplyTimeout = 10000)
    OrderResult processWithTimeout(OrderMessage order);
}

消息路由

java
package com.example.integration.router;

import com.example.integration.model.OrderMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.Router;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.router.AbstractMessageRouter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;

import java.util.Collection;
import java.util.Collections;

@Configuration
@Slf4j
public class MessageRouterConfig {
    
    @Bean
    public MessageChannel highPriorityChannel() {
        return MessageChannels.direct().getObject();
    }
    
    @Bean
    public MessageChannel normalPriorityChannel() {
        return MessageChannels.direct().getObject();
    }
    
    @Bean
    public MessageChannel lowPriorityChannel() {
        return MessageChannels.direct().getObject();
    }
    
    @Router(inputChannel = "orderRoutingChannel")
    public String routeOrder(OrderMessage order) {
        if (order.getAmount() > 10000) {
            return "highPriorityChannel";
        } else if (order.getAmount() > 1000) {
            return "normalPriorityChannel";
        } else {
            return "lowPriorityChannel";
        }
    }
    
    @Bean
    public AbstractMessageRouter priorityRouter() {
        return new AbstractMessageRouter() {
            @Override
            protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
                OrderMessage order = (OrderMessage) message.getPayload();
                
                if (order.getAmount() > 10000) {
                    return Collections.singletonList(highPriorityChannel());
                } else if (order.getAmount() > 1000) {
                    return Collections.singletonList(normalPriorityChannel());
                } else {
                    return Collections.singletonList(lowPriorityChannel());
                }
            }
        };
    }
    
    @Bean
    public IntegrationFlow routingFlow() {
        return IntegrationFlows
            .from("orderRoutingChannel")
            .route(OrderMessage.class, order -> {
                if (order.getAmount() > 10000) {
                    return "highPriorityChannel";
                } else if (order.getAmount() > 1000) {
                    return "normalPriorityChannel";
                } else {
                    return "lowPriorityChannel";
                }
            })
            .get();
    }
}

消息转换器

java
package com.example.integration.transformer;

import com.example.integration.model.OrderMessage;
import com.example.integration.model.OrderResult;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.json.JsonToObjectTransformer;
import org.springframework.integration.json.ObjectToJsonTransformer;
import org.springframework.integration.transformer.PayloadDeserializingTransformer;
import org.springframework.integration.transformer.PayloadSerializingTransformer;
import org.springframework.messaging.Message;

import java.time.LocalDateTime;

@Configuration
@Slf4j
public class MessageTransformerConfig {
    
    @Transformer(inputChannel = "jsonInputChannel", 
                 outputChannel = "objectOutputChannel")
    @Bean
    public JsonToObjectTransformer jsonToObjectTransformer() {
        return new JsonToObjectTransformer(OrderMessage.class);
    }
    
    @Transformer(inputChannel = "objectInputChannel", 
                 outputChannel = "jsonOutputChannel")
    @Bean
    public ObjectToJsonTransformer objectToJsonTransformer() {
        return new ObjectToJsonTransformer();
    }
    
    @Transformer(inputChannel = "orderInputChannel", 
                 outputChannel = "orderResultChannel")
    public OrderResult transformOrderToResult(OrderMessage order) {
        log.info("转换订单到结果: {}", order.getOrderId());
        
        OrderResult result = new OrderResult();
        result.setOrderId(order.getOrderId());
        result.setStatus("PROCESSED");
        result.setProcessedAt(LocalDateTime.now());
        result.setMessage("订单处理成功");
        
        return result;
    }
    
    @Transformer(inputChannel = "enrichInputChannel", 
                 outputChannel = "enrichedOutputChannel")
    public OrderMessage enrichOrder(OrderMessage order) {
        if (order.getCreatedAt() == null) {
            order.setCreatedAt(LocalDateTime.now());
        }
        if (order.getStatus() == null) {
            order.setStatus("PENDING");
        }
        return order;
    }
    
    @Bean
    public IntegrationFlow transformationFlow() {
        return IntegrationFlows
            .from("transformationInputChannel")
            .transform(new ObjectToJsonTransformer())
            .log(LoggingHandler.Level.INFO, "JSON转换完成")
            .transform(new JsonToObjectTransformer(OrderMessage.class))
            .log(LoggingHandler.Level.INFO, "对象转换完成")
            .channel("transformationOutputChannel")
            .get();
    }
}

消息过滤器

java
package com.example.integration.filter;

import com.example.integration.model.OrderMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.Filter;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.filter.MessageFilter;
import org.springframework.messaging.Message;

@Configuration
@Slf4j
public class MessageFilterConfig {
    
    @Filter(inputChannel = "filterInputChannel", 
            outputChannel = "filterOutputChannel",
            discardChannel = "discardChannel")
    public boolean filterOrder(OrderMessage order) {
        boolean valid = order.getAmount() > 0 
            && order.getOrderId() != null 
            && !order.getOrderId().isEmpty();
        
        if (!valid) {
            log.warn("订单消息被过滤: {}", order);
        }
        
        return valid;
    }
    
    @Bean
    public MessageFilter amountFilter() {
        MessageFilter filter = new MessageFilter(message -> {
            OrderMessage order = (OrderMessage) message.getPayload();
            return order.getAmount() >= 100;
        });
        filter.setDiscardChannelName("discardChannel");
        return filter;
    }
    
    @Bean
    public IntegrationFlow filterFlow() {
        return IntegrationFlows
            .from("filterInputChannel")
            .filter(OrderMessage.class, 
                order -> order.getAmount() > 0 && order.getOrderId() != null,
                f -> f.discardChannel("discardChannel"))
            .channel("filterOutputChannel")
            .get();
    }
    
    @Bean
    public IntegrationFlow discardFlow() {
        return IntegrationFlows
            .from("discardChannel")
            .log(LoggingHandler.Level.WARN, "消息被丢弃")
            .handle(message -> {
                log.warn("处理被丢弃的消息: {}", message.getPayload());
            })
            .get();
    }
}

消息分割与聚合

java
package com.example.integration.splitter;

import com.example.integration.model.OrderItem;
import com.example.integration.model.OrderMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.aggregator.MessageGroupProcessor;
import org.springframework.integration.aggregator.MethodInvokingMessageGroupProcessor;
import org.springframework.integration.annotation.Aggregator;
import org.springframework.integration.annotation.CorrelationStrategy;
import org.springframework.integration.annotation.ReleaseStrategy;
import org.springframework.integration.annotation.Splitter;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.store.MessageGroup;
import org.springframework.messaging.Message;

import java.util.List;
import java.util.stream.Collectors;

@Configuration
@Slf4j
public class SplitterAggregatorConfig {
    
    @Splitter(inputChannel = "orderSplitChannel", 
              outputChannel = "itemProcessChannel")
    public List<OrderItem> splitOrder(OrderMessage order) {
        log.info("分割订单: {}", order.getOrderId());
        return order.getItems();
    }
    
    @Aggregator(inputChannel = "itemAggregateChannel", 
                outputChannel = "orderCompleteChannel")
    public OrderMessage aggregateItems(List<OrderItem> items) {
        log.info("聚合订单项: {} 个", items.size());
        
        OrderMessage order = new OrderMessage();
        order.setOrderId(items.get(0).getOrderId());
        order.setItems(items);
        order.setAmount(items.stream()
            .mapToDouble(OrderItem::getPrice)
            .sum());
        
        return order;
    }
    
    @CorrelationStrategy
    public String correlateByOrderId(OrderItem item) {
        return item.getOrderId();
    }
    
    @ReleaseStrategy
    public boolean canRelease(MessageGroup group) {
        return group.size() >= 1;
    }
    
    @Bean
    public IntegrationFlow splitAggregateFlow() {
        return IntegrationFlows
            .from("orderSplitChannel")
            .split(OrderMessage.class, OrderMessage::getItems)
            .channel("itemProcessChannel")
            .handle(item -> {
                log.info("处理订单项: {}", item);
            })
            .channel("itemAggregateChannel")
            .aggregate(a -> a
                .correlationStrategy(m -> ((OrderItem) m.getPayload()).getOrderId())
                .releaseStrategy(g -> g.size() >= 1))
            .channel("orderCompleteChannel")
            .get();
    }
}

PHP 与 Spring Integration 集成

PHP 消息生产者

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

class SpringIntegrationProducer
{
    private $connection;
    private $channel;
    
    public function __construct(array $config)
    {
        $this->connection = new AMQPStreamConnection(
            $config['host'] ?? 'localhost',
            $config['port'] ?? 5672,
            $config['user'] ?? 'guest',
            $config['password'] ?? 'guest',
            $config['vhost'] ?? '/'
        );
        
        $this->channel = $this->connection->channel();
        $this->setupExchangesAndQueues();
    }
    
    private function setupExchangesAndQueues(): void
    {
        $this->channel->exchange_declare('exchange.order', 'direct', false, true, false);
        $this->channel->exchange_declare('exchange.notification', 'topic', false, true, false);
        
        $this->channel->queue_declare('queue.order', false, true, false, false, false,
            new AMQPTable([
                'x-dead-letter-exchange' => 'exchange.dlq',
                'x-dead-letter-routing-key' => 'dlq.order',
            ])
        );
        
        $this->channel->queue_bind('queue.order', 'exchange.order', 'order.created');
    }
    
    public function sendOrderMessage(array $orderData): bool
    {
        $message = new AMQPMessage(
            json_encode($orderData),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'message_id' => uniqid('msg-', true),
                'timestamp' => time(),
                'headers' => new AMQPTable([
                    'spring_json_header_type' => 'com.example.integration.model.OrderMessage',
                    'priority' => $orderData['priority'] ?? 5,
                ])
            ]
        );
        
        $this->channel->basic_publish(
            $message, 
            'exchange.order', 
            'order.created'
        );
        
        return true;
    }
    
    public function sendNotification(string $type, array $data): bool
    {
        $routingKey = "notification.{$type}";
        
        $message = new AMQPMessage(
            json_encode($data),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            ]
        );
        
        $this->channel->basic_publish(
            $message, 
            'exchange.notification', 
            $routingKey
        );
        
        return true;
    }
    
    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
    }
}

$producer = new SpringIntegrationProducer([
    'host' => 'localhost',
    'port' => 5672,
    'user' => 'guest',
    'password' => 'guest',
]);

$orderData = [
    'orderId' => 'ORD-' . time(),
    'amount' => 1500.00,
    'customerId' => 'CUST-001',
    'status' => 'PENDING',
    'items' => [
        ['productId' => 'PROD-001', 'quantity' => 2, 'price' => 500],
        ['productId' => 'PROD-002', 'quantity' => 1, 'price' => 500],
    ],
    'priority' => 8,
];

$producer->sendOrderMessage($orderData);
echo "订单消息已发送\n";

$producer->sendNotification('email', [
    'to' => 'customer@example.com',
    'subject' => '订单确认',
    'body' => '您的订单已创建成功',
]);
echo "通知消息已发送\n";

$producer->close();

PHP 消息消费者

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class SpringIntegrationConsumer
{
    private $connection;
    private $channel;
    
    public function __construct(array $config)
    {
        $this->connection = new AMQPStreamConnection(
            $config['host'] ?? 'localhost',
            $config['port'] ?? 5672,
            $config['user'] ?? 'guest',
            $config['password'] ?? 'guest',
            $config['vhost'] ?? '/'
        );
        
        $this->channel = $this->connection->channel();
    }
    
    public function consumeOrders(string $queue, callable $processor): void
    {
        $this->channel->basic_qos(0, 10, false);
        
        $this->channel->basic_consume(
            $queue,
            '',
            false,
            false,
            false,
            false,
            function (AMQPMessage $message) use ($processor) {
                $this->processMessage($message, $processor);
            }
        );
        
        echo "等待消息...\n";
        
        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }
    }
    
    private function processMessage(AMQPMessage $message, callable $processor): void
    {
        try {
            $body = json_decode($message->getBody(), true);
            $headers = $this->extractHeaders($message);
            
            $result = $processor($body, $headers);
            
            if ($result) {
                $message->ack();
                echo "消息处理成功\n";
            } else {
                $message->nack(false, true);
                echo "消息处理失败,重新入队\n";
            }
        } catch (Exception $e) {
            echo "处理异常: " . $e->getMessage() . "\n";
            $message->nack(false, false);
        }
    }
    
    private function extractHeaders(AMQPMessage $message): array
    {
        $headers = [];
        
        if ($message->has('application_headers')) {
            $appHeaders = $message->get('application_headers')->getNativeData();
            $headers = array_merge($headers, $appHeaders);
        }
        
        $headers['message_id'] = $message->get('message_id');
        $headers['timestamp'] = $message->get('timestamp');
        $headers['content_type'] = $message->get('content_type');
        
        return $headers;
    }
    
    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
    }
}

class OrderProcessor
{
    public function process(array $orderData, array $headers): bool
    {
        echo "处理订单:\n";
        echo "  订单ID: " . ($orderData['orderId'] ?? 'N/A') . "\n";
        echo "  金额: " . ($orderData['amount'] ?? 0) . "\n";
        echo "  客户ID: " . ($orderData['customerId'] ?? 'N/A') . "\n";
        echo "  消息ID: " . ($headers['message_id'] ?? 'N/A') . "\n";
        
        if (isset($orderData['items'])) {
            echo "  订单项:\n";
            foreach ($orderData['items'] as $item) {
                echo "    - 产品: " . ($item['productId'] ?? 'N/A');
                echo ", 数量: " . ($item['quantity'] ?? 0);
                echo ", 价格: " . ($item['price'] ?? 0) . "\n";
            }
        }
        
        return true;
    }
}

$consumer = new SpringIntegrationConsumer([
    'host' => 'localhost',
    'port' => 5672,
    'user' => 'guest',
    'password' => 'guest',
]);

$processor = new OrderProcessor();

$consumer->consumeOrders('queue.order', [$processor, 'process']);

$consumer->close();

实际应用场景

场景一:订单处理流水线

java
package com.example.scenario.pipeline;

import com.example.integration.model.OrderMessage;
import com.example.integration.model.OrderResult;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.amqp.dsl.Amqp;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.handler.LoggingHandler;

import java.time.LocalDateTime;

@Configuration
@Slf4j
public class OrderPipelineConfig {
    
    @Bean
    public IntegrationFlow orderProcessingPipeline(ConnectionFactory connectionFactory) {
        return IntegrationFlows
            .from(Amqp.inboundAdapter(connectionFactory)
                .queueNames("queue.order.pipeline")
                .messageConverter(new Jackson2JsonMessageConverter()))
            
            .log(LoggingHandler.Level.INFO, "开始处理订单")
            
            .filter(OrderMessage.class, 
                order -> order.getAmount() > 0,
                f -> f.discardChannel("invalidOrderChannel"))
            
            .transform(OrderMessage.class, order -> {
                order.setProcessedAt(LocalDateTime.now());
                return order;
            })
            
            .route(OrderMessage.class, order -> {
                if (order.getAmount() > 10000) {
                    return "highValueOrderChannel";
                } else if (order.getAmount() > 1000) {
                    return "normalOrderChannel";
                } else {
                    return "lowValueOrderChannel";
                }
            })
            
            .get();
    }
    
    @Bean
    public IntegrationFlow highValueOrderFlow() {
        return IntegrationFlows
            .from("highValueOrderChannel")
            .log(LoggingHandler.Level.INFO, "处理高价值订单")
            .handle(order -> {
                log.info("高价值订单处理: {}", order.getPayload());
            })
            .channel("orderResultChannel")
            .get();
    }
    
    @Bean
    public IntegrationFlow normalOrderFlow() {
        return IntegrationFlows
            .from("normalOrderChannel")
            .log(LoggingHandler.Level.INFO, "处理普通订单")
            .handle(order -> {
                log.info("普通订单处理: {}", order.getPayload());
            })
            .channel("orderResultChannel")
            .get();
    }
    
    @Bean
    public IntegrationFlow lowValueOrderFlow() {
        return IntegrationFlows
            .from("lowValueOrderChannel")
            .log(LoggingHandler.Level.INFO, "处理低价值订单")
            .handle(order -> {
                log.info("低价值订单处理: {}", order.getPayload());
            })
            .channel("orderResultChannel")
            .get();
    }
    
    @Bean
    public IntegrationFlow resultFlow(ConnectionFactory connectionFactory) {
        return IntegrationFlows
            .from("orderResultChannel")
            .transform(OrderMessage.class, order -> {
                OrderResult result = new OrderResult();
                result.setOrderId(order.getOrderId());
                result.setStatus("COMPLETED");
                result.setProcessedAt(LocalDateTime.now());
                return result;
            })
            .handle(Amqp.outboundAdapter(connectionFactory)
                .exchangeName("exchange.order.result")
                .routingKey("order.completed"))
            .get();
    }
}

场景二:消息广播与订阅

java
package com.example.scenario.pubsub;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.amqp.dsl.Amqp;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.messaging.MessageChannel;

@Configuration
@Slf4j
public class PubSubConfig {
    
    @Bean
    public FanoutExchange broadcastExchange() {
        return new FanoutExchange("exchange.broadcast", true, false);
    }
    
    @Bean
    public MessageChannel broadcastInputChannel() {
        return MessageChannels.direct().getObject();
    }
    
    @Bean
    public IntegrationFlow broadcastPublisher(ConnectionFactory connectionFactory) {
        return IntegrationFlows
            .from("broadcastInputChannel")
            .log(LoggingHandler.Level.INFO, "广播消息")
            .handle(Amqp.outboundAdapter(connectionFactory)
                .exchangeName("exchange.broadcast"))
            .get();
    }
    
    @Bean
    public IntegrationFlow subscriber1(ConnectionFactory connectionFactory) {
        return IntegrationFlows
            .from(Amqp.inboundAdapter(connectionFactory)
                .queueNames("queue.subscriber1"))
            .handle(message -> {
                log.info("订阅者1收到消息: {}", message.getPayload());
            })
            .get();
    }
    
    @Bean
    public IntegrationFlow subscriber2(ConnectionFactory connectionFactory) {
        return IntegrationFlows
            .from(Amqp.inboundAdapter(connectionFactory)
                .queueNames("queue.subscriber2"))
            .handle(message -> {
                log.info("订阅者2收到消息: {}", message.getPayload());
            })
            .get();
    }
}

常见问题与解决方案

问题一:消息序列化失败

症状: 消息无法正确序列化/反序列化

解决方案:

java
@Configuration
public class SerializationConfig {
    
    @Bean
    public MessageConverter jsonMessageConverter() {
        Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
        converter.setAlwaysConvertToInferredType(true);
        return converter;
    }
    
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(jsonMessageConverter());
        return template;
    }
}

问题二:消息堆积

症状: 队列中消息堆积,消费速度慢

解决方案:

java
@Bean
public IntegrationFlow highThroughputFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlows
        .from(Amqp.inboundAdapter(connectionFactory)
            .queueNames("queue.high.throughput")
            .concurrentConsumers(10)
            .maxConcurrentConsumers(50)
            .prefetchCount(20))
        .channel(MessageChannels.executor(Executors.newFixedThreadPool(20)))
        .handle(message -> {
            processMessage(message);
        })
        .get();
}

问题三:消息顺序保证

症状: 消息处理顺序与发送顺序不一致

解决方案:

java
@Bean
public IntegrationFlow orderedProcessingFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlows
        .from(Amqp.inboundAdapter(connectionFactory)
            .queueNames("queue.ordered")
            .concurrentConsumers(1))
        .channel(MessageChannels.queue())
        .handle(message -> {
            processInOrder(message);
        })
        .get();
}

最佳实践建议

1. 错误处理

java
@Bean
public IntegrationFlow errorHandlingFlow() {
    return IntegrationFlows
        .from("errorChannel")
        .log(LoggingHandler.Level.ERROR, "错误处理")
        .handle(message -> {
            ErrorMessage errorMessage = (ErrorMessage) message;
            log.error("处理失败: {}", errorMessage.getPayload());
            storeErrorForRetry(errorMessage);
        })
        .get();
}

2. 监控配置

java
@Bean
public IntegrationFlow monitoringFlow() {
    return IntegrationFlows
        .from("monitoredChannel")
        .wireTap("monitoringChannel")
        .handle(message -> {
            processMessage(message);
        })
        .get();
}

@Bean
public IntegrationFlow monitoringChannelFlow() {
    return IntegrationFlows
        .from("monitoringChannel")
        .handle(message -> {
            metricsService.recordMessageProcessed(message);
        })
        .get();
}

3. 重试机制

java
@Bean
public IntegrationFlow retryFlow() {
    return IntegrationFlows
        .from("retryChannel")
        .retryWhen(retrySpec -> retrySpec
            .maxAttempts(3)
            .filter(throwable -> throwable instanceof RetryableException)
            .backoff(Backoff.exponential(
                Duration.ofSeconds(1),
                Duration.ofSeconds(30),
                2.0,
                true)))
        .handle(message -> {
            processWithRetry(message);
        })
        .get();
}

版本兼容性

Spring IntegrationSpring BootSpring AMQPRabbitMQ Server
6.2.x3.2.x3.1.x3.11+
6.1.x3.1.x3.0.x3.10+
6.0.x3.0.x3.0.x3.9+
5.5.x2.7.x2.4.x3.9+
5.4.x2.6.x2.4.x3.8+

相关链接