Skip to content

Spring Cloud Stream 与 RabbitMQ 集成

概述

Spring Cloud Stream 是一个用于构建消息驱动微服务的框架,它抽象了底层消息中间件的细节,提供了统一的编程模型。在 Spring Cloud Stream 中,RabbitMQ 是最常用的消息中间件之一,它提供了可靠的消息传递、灵活的路由和强大的集群能力。

本教程将详细介绍如何在 Spring Cloud Stream 中集成 RabbitMQ,包括配置详解、代码示例和最佳实践。

集成架构设计

架构图

┌─────────────────────────────────────────────────────────────────┐
│                     Spring Cloud Stream                         │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │                    Binder Layer                          │    │
│  │  ┌─────────────────────────────────────────────────────┐ │    │
│  │  │              RabbitMQ Binder                         │ │    │
│  │  │  • Channel → Exchange/Queue 映射                    │ │    │
│  │  │  • 消息序列化/反序列化                                │ │    │
│  │  │  • 消费者组管理                                      │ │    │
│  │  └─────────────────────────────────────────────────────┘ │    │
│  └─────────────────────────────────────────────────────────┘    │
│                              │                                   │
│                              ▼                                   │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │                    Application Layer                      │    │
│  │  @EnableBinding / @StreamListener / @RabbitListener      │    │
│  └─────────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────────┐
│                      RabbitMQ Cluster                            │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐         │
│  │   Node 1    │◄──►│   Node 2    │◄──►│   Node 3    │         │
│  │  Exchange   │    │  Exchange   │    │  Exchange   │         │
│  │   Queue     │    │   Queue     │    │   Queue     │         │
│  └─────────────┘    └─────────────┘    └─────────────┘         │
└─────────────────────────────────────────────────────────────────┘

核心概念

概念说明
Binder消息中间件的抽象层,Spring Cloud Stream 通过 Binder 与 RabbitMQ 交互
ChannelSpring Integration 的 Channel,对应 RabbitMQ 的 Exchange 和 Queue
Destination消息的目标地址,可以是 Exchange 或 Queue
Consumer Group消费者组,实现消息的负载均衡
Message消息载体,包含 Header 和 Body

配置示例

Maven 依赖

xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
         http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    
    <groupId>com.example</groupId>
    <artifactId>rabbitmq-spring-cloud-stream</artifactId>
    <version>1.0.0</version>
    
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.2.0</version>
        <relativePath/>
    </parent>
    
    <properties>
        <spring-cloud.version>2023.0.0</spring-cloud.version>
    </properties>
    
    <dependencies>
        <!-- Spring Cloud Stream RabbitMQ Binder -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
        </dependency>
        
        <!-- Spring Boot Web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        
        <!-- JSON 序列化 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-json</artifactId>
        </dependency>
        
        <!-- Lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        
        <!-- Test -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-binder</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

application.yml 配置

yaml
server:
  port: 8080

spring:
  application:
    name: rabbitmq-integration-demo
  
  cloud:
    stream:
      # RabbitMQ Binder 配置
      rabbit:
        binder:
          # RabbitMQ 连接地址
          addresses: rabbitmq1:5672,rabbitmq2:5672,rabbitmq3:5672
          # 用户名
          username: guest
          # 密码
          password: guest
          # 虚拟主机
          virtual-host: /
          # 连接超时时间(毫秒)
          connection-timeout: 10000
          # 是否启用 SSL
          ssl:
            enabled: false
          # 性能优化
          performance:
            # 发布确认模式
            publisher-confirm-type: correlated
            # 发布返回模式
            publisher-returns: true
          # 集群配置
          cluster:
            check-for-orphans: true
      
      # 默认 Binding 配置
      default:
        # 默认生产者配置
        producer:
          # 消息持久化
          persistent: true
          # 分区键
          partition-key-expression: headers['partition']
          # 分区数量
          partition-count: 3
          # 必需属性
          required-properties: 
            - contentType
          # 压缩类型
          compression-type: gzip
          # 批量发送
          batch-mode: false
          # 发送超时(毫秒)
          send-timeout: 5000
          # 错误处理
          error-channel-enabled: true
        
        # 默认消费者配置
        consumer:
          # 手动确认
          acknowledge-mode: auto
          # 并发消费者数
          concurrency: 3
          # 最大并发数
          max-concurrency: 10
          # 预取数量
          prefetch: 10
          # 持久化
          durable: true
          # 失败消息重试
          retry:
            enabled: true
            initial-interval: 1000
            max-attempts: 3
            max-interval: 10000
            multiplier: 2.0
          # 死信队列
          dead-letter:
            queue: dlq
            exchange: dlx
            routing-key: dlq.routing.key
          # 分区消费
          partitioned: true
          # 消费者组
          group: ${spring.application.name}
      
      # 自定义 Binding 配置
      bindings:
        # 输出通道配置
        output:
          destination: exchange.example
          group: group.example
          content-type: application/json
          binder: rabbit
        
        # 输入通道配置  
        input:
          destination: exchange.example
          group: group.example
          content-type: application/json
          binder: rabbit
          consumer:
            max-attempts: 3
            retry-delay: 2000ms

# RabbitMQ 高级配置
rabbitmq:
  host: localhost
  port: 5672
  username: guest
  password: guest
  virtual-host: /
  
  # 连接池配置
  cache:
    channel:
      size: 25
      checkout-timeout: 5000
  
  # 监听器配置
  listener:
    simple:
      acknowledge-mode: auto
      concurrency: 3
      max-concurrency: 10
      prefetch: 10
      retry:
        enabled: true
        initial-interval: 1000
        max-attempts: 3
        max-interval: 10000
        multiplier: 2.0
      default-requeue-rejected: false
  
  # 发布确认
  publisher-confirm-type: correlated
  publisher-returns: true
  template:
    mandatory: true

Java 代码示例

生产者服务

java
package com.example.rabbitmq.producer;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.config.EnablePublisher;
import org.springframework.integration.core.MessageSelector;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.support.MessageBuilder;

import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

@SpringBootApplication
@EnableBinding(Source.class)
@EnablePublisher
public class ProducerApplication {
    
    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }
    
    @Bean
    public Supplier<Message<OrderEvent>> orderSupplier() {
        return () -> {
            OrderEvent order = new OrderEvent();
            order.setOrderId("ORD-" + System.currentTimeMillis());
            order.setAmount(Math.random() * 1000);
            order.setStatus("PENDING");
            order.setCreatedAt(LocalDateTime.now());
            
            Map<String, Object> headers = new HashMap<>();
            headers.put("contentType", "application/json");
            headers.put("priority", 5);
            headers.put("timestamp", System.currentTimeMillis());
            
            return MessageBuilder.createMessage(order, 
                new org.springframework.messaging.support.MessageHeaders(headers));
        };
    }
}

@Data
@NoArgsConstructor
@AllArgsConstructor
class OrderEvent {
    private String orderId;
    private Double amount;
    private String status;
    private LocalDateTime createdAt;
}

消费者服务

java
package com.example.rabbitmq.consumer;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.Filter;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.support.ErrorMessage;

import java.time.LocalDateTime;
import java.util.Map;

@SpringBootApplication
@EnableBinding(Sink.class)
@Slf4j
public class ConsumerApplication {
    
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
    
    @StreamListener(Sink.INPUT)
    public void handleOrderMessage(@Payload OrderEvent order,
                                   @Headers Map<String, Object> headers,
                                   @Header(name = "amqp_deliveryTag", defaultValue = "0") Long deliveryTag,
                                   @Header(name = "amqp_consumerQueue", defaultValue = "") String queue) {
        log.info("接收到订单消息: {}", order);
        log.info("消息头信息: {}", headers);
        log.info("投递标签: {}, 队列: {}", deliveryTag, queue);
        
        try {
            processOrder(order);
            log.info("订单处理成功: {}", order.getOrderId());
        } catch (Exception e) {
            log.error("订单处理失败: {}", order.getOrderId(), e);
            throw e;
        }
    }
    
    private void processOrder(OrderEvent order) {
        log.info("处理订单: ID={}, 金额={}", order.getOrderId(), order.getAmount());
    }
    
    @Bean
    public MessageSelector orderFilter() {
        return message -> {
            Object payload = message.getPayload();
            if (payload instanceof OrderEvent) {
                return ((OrderEvent) payload).getAmount() > 100;
            }
            return true;
        };
    }
    
    @ServiceActivator(inputChannel = "input.errors")
    public void handleError(ErrorMessage errorMessage) {
        log.error("处理错误消息: {}", errorMessage.getPayload().getMessage());
        log.error("原始消息: {}", errorMessage.getOriginalMessage());
    }
}

@Data
@NoArgsConstructor
@AllArgsConstructor
class OrderEvent {
    private String orderId;
    private Double amount;
    private String status;
    private LocalDateTime createdAt;
}

响应式编程示例

java
package com.example.rabbitmq.reactive;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.http.MediaType;
import org.springframework.messaging.Message;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

@SpringBootApplication
@Slf4j
public class ReactiveApplication {
    
    public static void main(String[] args) {
        SpringApplication.run(ReactiveApplication.class, args);
    }
    
    @Bean
    public Supplier<Flux<OrderEvent>> orderStream() {
        return () -> Flux.interval(Duration.ofSeconds(1))
            .map(tick -> {
                OrderEvent order = new OrderEvent();
                order.setOrderId("ORD-" + System.currentTimeMillis());
                order.setAmount(Math.random() * 1000);
                order.setStatus("PENDING");
                order.setCreatedAt(LocalDateTime.now());
                return order;
            });
    }
    
    @Bean
    public Function<Flux<Message<OrderEvent>>, Mono<Void>> processOrder() {
        return flux -> flux
            .doOnNext(message -> log.info("接收消息: {}", message.getPayload()))
            .flatMap(message -> processOrderAsync(message.getPayload())
                .doOnSuccess(result -> log.info("处理成功: {}", result))
                .onErrorResume(e -> {
                    log.error("处理失败: {}", e.getMessage());
                    return Mono.empty();
                }))
            .then();
    }
    
    private Mono<String> processOrderAsync(OrderEvent order) {
        return Mono.just("Processed: " + order.getOrderId())
            .delayElement(Duration.ofMillis(100));
    }
}

@Data
@NoArgsConstructor
@AllArgsConstructor
class OrderEvent {
    private String orderId;
    private Double amount;
    private String status;
    private LocalDateTime createdAt;
}

分区处理示例

java
package com.example.rabbitmq.partition;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

@SpringBootApplication
@EnableBinding(PartitionProcessor.class)
@Slf4j
public class PartitionApplication {
    
    public static void main(String[] args) {
        SpringApplication.run(PartitionApplication.class, args);
    }
    
    @StreamListener("partitioned-input")
    public void handlePartitionedMessage(OrderEvent order) {
        log.info("分区 [{}] 接收到订单: {}", 
            Thread.currentThread().getName(), 
            order);
    }
    
    @StreamListener("partitioned-input-2")
    public void handlePartitionedMessage2(OrderEvent order) {
        log.info("分区2 [{}] 接收到订单: {}", 
            Thread.currentThread().getName(), 
            order);
    }
}

interface PartitionProcessor {
    String PARTITIONED_INPUT = "partitioned-input";
    String PARTITIONED_INPUT_2 = "partitioned-input-2";
    String PARTITIONED_OUTPUT = "partitioned-output";
    
    @Input(PARTITIONED_INPUT)
    SubscribableChannel partitionedInput();
    
    @Input(PARTITIONED_INPUT_2)
    SubscribableChannel partitionedInput2();
    
    @Output(PARTITIONED_OUTPUT)
    MessageChannel partitionedOutput();
}

@Data
@NoArgsConstructor
@AllArgsConstructor
class OrderEvent {
    private String orderId;
    private Double amount;
    private String status;
    private LocalDateTime createdAt;
}

动态路由示例

java
package com.example.rabbitmq.dynamic;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.binding.DynamicDestinationBinding;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.support.MessageBuilder;

import jakarta.annotation.PostConstruct;
import java.util.HashMap;
import java.util.Map;

@SpringBootApplication
@Slf4j
public class DynamicRoutingApplication {
    
    public static void main(String[] args) {
        SpringApplication.run(DynamicRoutingApplication.class, args);
    }
    
    @Bean
    public DynamicDestinationBinder dynamicDestinationBinder(
            BindingServiceProperties bindingServiceProperties) {
        return new DynamicDestinationBinder(bindingServiceProperties);
    }
    
    @PostConstruct
    public void init() {
        log.info("动态路由示例应用启动");
    }
}

class DynamicDestinationBinder {
    private final BindingServiceProperties bindingServiceProperties;
    
    public DynamicDestinationBinder(BindingServiceProperties bindingServiceProperties) {
        this.bindingServiceProperties = bindingServiceProperties;
    }
    
    public void bindProducer(String channelName, String destination, String group) {
        Map<String, Object> props = new HashMap<>();
        props.put("destination", destination);
        if (group != null) {
            props.put("group", group);
        }
        bindingServiceProperties.getBindings().put(channelName, 
            new org.springframework.cloud.stream.config.BindingProperties(props));
    }
}

@Data
@NoArgsConstructor
@AllArgsConstructor
class OrderEvent {
    private String orderId;
    private Double amount;
    private String status;
}

PHP 与 Spring Cloud Stream 集成

虽然 Spring Cloud Stream 主要是 Java 生态的框架,但 PHP 应用可以通过以下方式与其集成:

PHP 消息生产者

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

class SpringCloudStreamProducer
{
    private $connection;
    private $channel;
    private $exchange;
    
    public function __construct(array $config)
    {
        $this->connection = new AMQPStreamConnection(
            $config['host'] ?? 'localhost',
            $config['port'] ?? 5672,
            $config['user'] ?? 'guest',
            $config['password'] ?? 'guest',
            $config['vhost'] ?? '/'
        );
        
        $this->channel = $this->connection->channel();
        $this->exchange = $config['exchange'] ?? 'exchange.example';
        
        $this->declareExchange();
    }
    
    private function declareExchange()
    {
        $this->channel->exchange_declare(
            $this->exchange,
            'topic',
            false,
            true,
            false
        );
    }
    
    public function sendOrderMessage(array $orderData): void
    {
        $message = new AMQPMessage(
            json_encode($orderData),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'timestamp' => time(),
                'headers' => new AMQPTable([
                    'contentType' => 'application/json',
                    'spring_json_header_type' => 'order',
                    'timestamp' => time() * 1000,
                ])
            ]
        );
        
        $routingKey = 'order.created';
        $this->channel->basic_publish($message, $this->exchange, $routingKey);
        
        echo "消息已发送: {$routingKey}\n";
    }
    
    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
    }
}

$config = [
    'host' => getenv('RABBITMQ_HOST') ?: 'localhost',
    'port' => (int)(getenv('RABBITMQ_PORT') ?: 5672),
    'user' => getenv('RABBITMQ_USER') ?: 'guest',
    'password' => getenv('RABBITMQ_PASSWORD') ?: 'guest',
    'vhost' => getenv('RABBITMQ_VHOST') ?: '/',
    'exchange' => 'exchange.example',
];

$producer = new SpringCloudStreamProducer($config);

$orderData = [
    'orderId' => 'ORD-' . time(),
    'amount' => rand(100, 1000),
    'status' => 'PENDING',
    'createdAt' => date('c'),
];

$producer->sendOrderMessage($orderData);
$producer->close();

PHP 消息消费者

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

class SpringCloudStreamConsumer
{
    private $connection;
    private $channel;
    private $queue;
    private $exchange;
    
    public function __construct(array $config)
    {
        $this->connection = new AMQPStreamConnection(
            $config['host'] ?? 'localhost',
            $config['port'] ?? 5672,
            $config['user'] ?? 'guest',
            $config['password'] ?? 'guest',
            $config['vhost'] ?? '/'
        );
        
        $this->channel = $this->connection->channel();
        $this->queue = $config['queue'] ?? 'queue.example';
        $this->exchange = $config['exchange'] ?? 'exchange.example';
        
        $this->setupQueue();
    }
    
    private function setupQueue(): void
    {
        $this->channel->exchange_declare(
            $this->exchange,
            'topic',
            false,
            true,
            false
        );
        
        $this->channel->queue_declare(
            $this->queue,
            false,
            true,
            false,
            false,
            false,
            new AMQPTable([
                'x-dead-letter-exchange' => 'dlx',
                'x-dead-letter-routing-key' => 'dlq.routing.key',
            ])
        );
        
        $this->channel->queue_bind($this->queue, $this->exchange, 'order.#');
    }
    
    public function consume(callable $callback): void
    {
        $this->channel->basic_qos(0, 10, false);
        
        $this->channel->basic_consume(
            $this->queue,
            '',
            false,
            false,
            false,
            false,
            function (AMQPMessage $message) use ($callback) {
                $data = json_decode($message->getBody(), true);
                $headers = $message->has('application_headers') 
                    ? $message->get('application_headers')->getNativeData() 
                    : [];
                
                try {
                    $callback($data, $headers);
                    $message->ack();
                } catch (Exception $e) {
                    echo "处理失败: " . $e->getMessage() . "\n";
                    $message->nack(false, false);
                }
            }
        );
        
        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }
    }
    
    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
    }
}

$config = [
    'host' => getenv('RABBITMQ_HOST') ?: 'localhost',
    'port' => (int)(getenv('RABBITMQ_PORT') ?: 5672),
    'user' => getenv('RABBITMQ_USER') ?: 'guest',
    'password' => getenv('RABBITMQ_PASSWORD') ?: 'guest',
    'vhost' => getenv('RABBITMQ_VHOST') ?: '/',
    'exchange' => 'exchange.example',
    'queue' => 'group.example',
];

$consumer = new SpringCloudStreamConsumer($config);

echo "开始消费消息...\n";

$consumer->consume(function (array $data, array $headers) {
    echo "收到订单消息:\n";
    echo "  订单ID: " . ($data['orderId'] ?? 'N/A') . "\n";
    echo "  金额: " . ($data['amount'] ?? 0) . "\n";
    echo "  状态: " . ($data['status'] ?? 'N/A') . "\n";
    echo "  时间戳: " . ($headers['timestamp'] ?? 'N/A') . "\n";
    echo "\n";
});

$consumer->close();

实际应用场景

场景一:订单处理系统

java
package com.example.scenario.order;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

@SpringBootApplication
@EnableBinding(OrderChannels.class)
@Slf4j
public class OrderProcessingApplication {
    
    public static void main(String[] args) {
        SpringApplication.run(OrderProcessingApplication.class, args);
    }
    
    @StreamListener(target = OrderChannels.ORDER_INPUT)
    @SendTo(OrderChannels.ORDER_OUTPUT)
    public OrderResult processOrder(OrderMessage order) {
        log.info("处理订单: {}", order.getOrderId());
        
        try {
            validateOrder(order);
            processPayment(order);
            reserveInventory(order);
            confirmOrder(order);
            
            return new OrderResult(order.getOrderId(), "SUCCESS", "订单处理成功");
        } catch (Exception e) {
            log.error("订单处理失败: {}", order.getOrderId(), e);
            return new OrderResult(order.getOrderId(), "FAILED", e.getMessage());
        }
    }
    
    @StreamListener(target = OrderChannels.ORDER_OUTPUT)
    public void handleOrderResult(OrderResult result) {
        log.info("订单结果: {}", result);
    }
    
    private void validateOrder(OrderMessage order) {
        if (order.getAmount() <= 0) {
            throw new IllegalArgumentException("订单金额必须大于0");
        }
    }
    
    private void processPayment(OrderMessage order) {
        log.info("处理支付: {}", order.getOrderId());
    }
    
    private void reserveInventory(OrderMessage order) {
        log.info("预留库存: {}", order.getOrderId());
    }
    
    private void confirmOrder(OrderMessage order) {
        log.info("确认订单: {}", order.getOrderId());
    }
}

interface OrderChannels {
    String ORDER_INPUT = "order-input";
    String ORDER_OUTPUT = "order-output";
    String PAYMENT_INPUT = "payment-input";
    String NOTIFICATION_OUTPUT = "notification-output";
    
    @Input(ORDER_INPUT)
    SubscribableChannel orderInput();
    
    @Output(ORDER_OUTPUT)
    MessageChannel orderOutput();
    
    @Input(PAYMENT_INPUT)
    SubscribableChannel paymentInput();
    
    @Output(NOTIFICATION_OUTPUT)
    MessageChannel notificationOutput();
}

@Data
@NoArgsConstructor
@AllArgsConstructor
class OrderMessage {
    private String orderId;
    private Double amount;
    private String customerId;
    private String status;
}

@Data
@NoArgsConstructor
@AllArgsConstructor
class OrderResult {
    private String orderId;
    private String status;
    private String message;
}

场景二:事件驱动架构

java
package com.example.scenario.event;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.SubscribableChannel;

import java.time.LocalDateTime;

@SpringBootApplication
@EnableBinding(EventChannels.class)
@Slf4j
public class EventDrivenApplication {
    
    public static void main(String[] args) {
        SpringApplication.run(EventDrivenApplication.class, args);
    }
    
    @StreamListener(target = EventChannels.USER_EVENTS)
    public void handleUserEvent(UserEvent event) {
        log.info("收到用户事件: type={}, userId={}", 
            event.getEventType(), event.getUserId());
        
        switch (event.getEventType()) {
            case "USER_CREATED":
                handleUserCreated(event);
                break;
            case "USER_UPDATED":
                handleUserUpdated(event);
                break;
            case "USER_DELETED":
                handleUserDeleted(event);
                break;
            default:
                log.warn("未知事件类型: {}", event.getEventType());
        }
    }
    
    private void handleUserCreated(UserEvent event) {
        log.info("创建用户记录: {}", event.getUserId());
    }
    
    private void handleUserUpdated(UserEvent event) {
        log.info("更新用户记录: {}", event.getUserId());
    }
    
    private void handleUserDeleted(UserEvent event) {
        log.info("删除用户记录: {}", event.getUserId());
    }
}

interface EventChannels {
    String USER_EVENTS = "user-events";
    String ORDER_EVENTS = "order-events";
    String NOTIFICATION_EVENTS = "notification-events";
    
    @Input(USER_EVENTS)
    SubscribableChannel userEvents();
    
    @Input(ORDER_EVENTS)
    SubscribableChannel orderEvents();
    
    @Input(NOTIFICATION_EVENTS)
    SubscribableChannel notificationEvents();
}

@Data
@NoArgsConstructor
@AllArgsConstructor
class UserEvent {
    private String eventType;
    private String userId;
    private String username;
    private String email;
    private LocalDateTime timestamp;
    private Object payload;
}

常见问题与解决方案

问题一:消息丢失

症状: 消息发送后,消费者无法接收到消息

解决方案:

yaml
spring:
  cloud:
    stream:
      rabbit:
        binder:
          # 启用发布确认
          performance:
            publisher-confirm-type: correlated
            publisher-returns: true
      bindings:
        output:
          producer:
            # 启用持久化
            persistent: true
            # 必需属性检查
            required-properties:
              - contentType

问题二:消息重复消费

症状: 同一条消息被消费多次

解决方案:

java
@StreamListener(Sink.INPUT)
public void handleMessage(@Payload OrderEvent order) {
    // 使用唯一键进行幂等处理
    String idempotencyKey = order.getOrderId();
    
    redisTemplate.opsForValue()
        .setIfAbsent("processed:" + idempotencyKey, "1", 
            Duration.ofDays(1));
    
    if (Boolean.TRUE.equals(redisTemplate.hasKey("processed:" + idempotencyKey))) {
        log.warn("重复消息,跳过处理: {}", idempotencyKey);
        return;
    }
    
    processOrder(order);
}

问题三:消费者组负载不均

症状: 某些消费者处理大量消息,其他消费者空闲

解决方案:

yaml
spring:
  cloud:
    stream:
      bindings:
        input:
          consumer:
            # 设置并发数
            concurrency: 5
            # 启用分区
            partitioned: true
      rabbit:
        binder:
          # 确保队列被分区
          queues:
            - name: queue.example
              declare: true

问题四:死信队列处理

症状: 消息进入死信队列,但未被处理

解决方案:

java
@StreamListener("input.DLQ")
public void handleDeadLetter(Message<?> message) {
    log.error("死信消息: {}", message.getPayload());
    log.error("原始错误: {}", message.getHeaders().get("x-death"));
    
    // 记录或人工处理
    deadLetterService.process(message);
}

最佳实践建议

1. 消息设计

java
// 使用强类型消息
@Data
@NoArgsConstructor
@AllArgsConstructor
class OrderEvent {
    private String orderId;
    private Double amount;
    private String status;
    private LocalDateTime createdAt;
    private String idempotencyKey;
    
    public String getIdempotencyKey() {
        return orderId + "_" + createdAt.toString();
    }
}

2. 错误处理

java
@ServiceActivator(inputChannel = "input.errors")
public ErrorMessage errorHandler(ErrorMessage errorMessage) {
    log.error("处理失败: {}", errorMessage.getPayload().getMessage());
    
    // 发送到专门的错误处理队列
    errorQueue.send(errorMessage);
    
    return errorMessage;
}

3. 监控配置

yaml
spring:
  cloud:
    stream:
      bindings:
        output:
          producer:
            error-channel-enabled: true

management:
  endpoints:
    web:
      exposure:
        include: health,info,metrics,bindings
  endpoint:
    health:
      show-details: always
  metrics:
    tags:
      application: ${spring.application.name}

4. 连接池配置

yaml
spring:
  cloud:
    stream:
      rabbit:
        binder:
          # 连接池大小
          addresses: rabbitmq1:5672,rabbitmq2:5672
          connection-timeout: 10000

rabbitmq:
  cache:
    channel:
      size: 25
      checkout-timeout: 5000

版本兼容性

Spring Cloud StreamSpring BootRabbitMQ ClientRabbitMQ Server
4.0.x3.2.x5.x3.11+
3.4.x2.7.x5.x3.10+
3.3.x2.6.x5.x3.9+
3.2.x2.5.x5.x3.9+
3.1.x2.4.x5.x3.8+

相关链接