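Integrating Spring Cloud Stream with RabbitMQ
Overview
Spring Cloud Stream is a framework for building message-driven microservices. It abstracts away the details of the underlying message broker and provides a unified programming model. RabbitMQ is one of the most commonly used brokers with Spring Cloud Stream, offering reliable delivery, flexible routing, and clustering.
This tutorial covers how to integrate RabbitMQ with Spring Cloud Stream, including configuration details, code examples, and best practices.
Integration architecture
Architecture diagram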
┌─────────────────────────────────────────────────────────────────┐
│ Spring Cloud Stream │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Binder Layer │ │
│ │ ┌─────────────────────────────────────────────────────┐ │ │
│ │ │ RabbitMQ Binder │ │ │
│ │ │ • Channel → Exchange/Queue mapping │ │ │
│ │ │ • Message serialization/deserialization │ │ │
│ │ │ • Consumer group management │ │ │
│ │ └─────────────────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Application Layer │ │
│ │ Supplier / Function / Consumer beans, StreamBridge │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ RabbitMQ Cluster │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Node 1 │◄──►│ Node 2 │◄──►│ Node 3 │ │
│ │ Exchange │ │ Exchange │ │ Exchange │ │
│ │ Queue │ │ Queue │ │ Queue │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────────┘
Core concepts
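| Concept | Description |
|---|---|
| Binder | Abstraction over the message broker; Spring Cloud Stream talks to RabbitMQ through the binder |
| Channel | A Spring Integration message channel; the binder connects it to a RabbitMQ exchange (output) or to a queue bound to that exchange (input) |
| Destination | The target address of a binding; with the RabbitMQ binder it names the exchange, and the consumer queue name is derived from it |
| Consumer Group | Instances in the same group share one queue and compete for its messages, which gives load balancing |
| Message | The message carrier, made up of headers and a payload |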
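To make the Destination and Consumer Group rows concrete: with the RabbitMQ binder, a consumer binding that has a group conventionally gets a durable queue named `<destination>.<group>`, bound to the exchange named by the destination. The sketch below only mirrors that naming convention in plain Java. The class and method names are hypothetical, and nothing here calls a binder API.

```java
public final class QueueNames {
    private QueueNames() {}

    /** Queue name conventionally derived for a consumer binding that has a group. */
    public static String queueName(String destination, String group) {
        if (group == null || group.isBlank()) {
            // Anonymous consumers get a generated, auto-delete queue instead.
            throw new IllegalArgumentException("a consumer group is required");
        }
        return destination + "." + group;
    }

    public static void main(String[] args) {
        // prints "exchange.example.group.example"
        System.out.println(queueName("exchange.example", "group.example"));
    }
}
```

This is why two application instances configured with the same destination and group read from the same queue, while instances in different groups each receive their own copy of every message.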
Configuration examples
Maven dependencies
xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>rabbitmq-spring-cloud-stream</artifactId>
<version>1.0.0</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.0</version>
<relativePath/>
</parent>
<properties>
<spring-cloud.version>2023.0.0</spring-cloud.version>
</properties>
<dependencies>
<!-- Spring Cloud Stream RabbitMQ Binder -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
<!-- Spring Boot Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- JSON serialization -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-json</artifactId>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- Test -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-binder</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.yml configuration
yaml
server:
  port: 8080
spring:
  application:
    name: rabbitmq-integration-demo
  # Broker connection: the RabbitMQ binder reuses Spring Boot's spring.rabbitmq.* settings
  rabbitmq:
    # Cluster node addresses
    addresses: rabbitmq1:5672,rabbitmq2:5672,rabbitmq3:5672
    username: guest
    password: guest
    virtual-host: /
    # Connection timeout
    connection-timeout: 10s
    ssl:
      enabled: false
    # Publisher confirms and returns
    publisher-confirm-type: correlated
    publisher-returns: true
    # Channel cache
    cache:
      channel:
        size: 25
        checkout-timeout: 5s
  cloud:
    stream:
      # Map the functional bean bindings onto the short names used below
      function:
        bindings:
          orderSupplier-out-0: output
          handleOrderMessage-in-0: input
      # Broker-independent binding settings
      bindings:
        # Output binding
        output:
          destination: exchange.example
          content-type: application/json
          binder: rabbit
          producer:
            # Provision the consumer group's queue up front so early messages are not dropped
            required-groups: group.example
            # Route send failures to the error channel
            error-channel-enabled: true
        # Input binding
        input:
          destination: exchange.example
          group: group.example
          content-type: application/json
          binder: rabbit
          consumer:
            # Number of consumer threads
            concurrency: 3
            # Retry failed messages with exponential back-off (intervals in milliseconds)
            max-attempts: 3
            back-off-initial-interval: 1000
            back-off-max-interval: 10000
            back-off-multiplier: 2.0
      # RabbitMQ-specific binding settings
      rabbit:
        bindings:
          output:
            producer:
              # Persistent messages
              delivery-mode: persistent
              # Compress payloads before sending
              compress: true
              # Batched sending
              batching-enabled: false
          input:
            consumer:
              # The container acknowledges after the handler returns
              acknowledge-mode: auto
              max-concurrency: 10
              prefetch: 10
              # Durable queue
              durable-subscription: true
              # Dead-letter instead of requeueing once retries are exhausted
              requeue-rejected: false
              auto-bind-dlq: true
              dead-letter-exchange: dlx
              dead-letter-queue-name: dlq
              dead-letter-routing-key: dlq.routing.key
Java code examples
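The retry settings in the configuration above (initial interval 1000 ms, multiplier 2.0, maximum interval 10000 ms, 3 attempts) describe an exponential back-off. As a rough illustration of the waits that result, the sketch below computes the delay before each retry. `BackoffSchedule` is a hypothetical helper written for this tutorial, not part of Spring, and it assumes the usual convention that the attempt count includes the first try.

```java
import java.util.ArrayList;
import java.util.List;

public final class BackoffSchedule {
    private BackoffSchedule() {}

    /** Delays in milliseconds between consecutive attempts; maxAttempts includes the first try. */
    public static List<Long> delays(long initialMs, double multiplier, long maxMs, int maxAttempts) {
        List<Long> result = new ArrayList<>();
        double next = initialMs;
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            result.add(Math.min((long) next, maxMs)); // cap each wait at the maximum interval
            next *= multiplier;
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(delays(1000, 2.0, 10000, 3)); // [1000, 2000]
        System.out.println(delays(1000, 2.0, 10000, 6)); // [1000, 2000, 4000, 8000, 10000]
    }
}
```

With three attempts a failing message is therefore retried after about one second and again after about two more seconds before it is rejected.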
Producer service
java
package com.example.rabbitmq.producer;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import java.time.LocalDateTime;
import java.util.function.Supplier;
// Spring Cloud Stream 4.x (Spring Boot 3.x) no longer ships @EnableBinding,
// so the producer is declared as a plain Supplier bean.
@SpringBootApplication
public class ProducerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }
    // Polled by the framework (once per second by default); each result is
    // published through the orderSupplier-out-0 binding.
    @Bean
    public Supplier<Message<OrderEvent>> orderSupplier() {
        return () -> {
            OrderEvent order = new OrderEvent();
            order.setOrderId("ORD-" + System.currentTimeMillis());
            order.setAmount(Math.random() * 1000);
            order.setStatus("PENDING");
            order.setCreatedAt(LocalDateTime.now());
            // The id and timestamp headers are added by the framework.
            return MessageBuilder.withPayload(order)
                    .setHeader(MessageHeaders.CONTENT_TYPE, "application/json")
                    .setHeader("priority", 5)
                    .build();
        };
    }
}
@Data
@NoArgsConstructor
@AllArgsConstructor
class OrderEvent {
    private String orderId;
    private Double amount;
    private String status;
    private LocalDateTime createdAt;
}
Consumer service
java
package com.example.rabbitmq.consumer;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.ErrorMessage;
import java.time.LocalDateTime;
import java.util.function.Consumer;
@SpringBootApplication
@Slf4j
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
    // Bound as handleOrderMessage-in-0. An exception thrown here triggers the
    // binder's retry and, once attempts are exhausted, rejection of the message.
    @Bean
    public Consumer<Message<OrderEvent>> handleOrderMessage() {
        return message -> {
            OrderEvent order = message.getPayload();
            log.info("Received order message: {}", order);
            log.info("Message headers: {}", message.getHeaders());
            log.info("Delivery tag: {}, queue: {}",
                    message.getHeaders().get(AmqpHeaders.DELIVERY_TAG),
                    message.getHeaders().get(AmqpHeaders.CONSUMER_QUEUE));
            if (!acceptOrder(order)) {
                log.info("Skipping order below the amount threshold: {}", order.getOrderId());
                return;
            }
            try {
                processOrder(order);
                log.info("Order processed successfully: {}", order.getOrderId());
            } catch (RuntimeException e) {
                log.error("Order processing failed: {}", order.getOrderId(), e);
                throw e;
            }
        };
    }
    // Filter: only orders with an amount above 100 are processed
    private boolean acceptOrder(OrderEvent order) {
        return order.getAmount() != null && order.getAmount() > 100;
    }
    private void processOrder(OrderEvent order) {
        log.info("Processing order: ID={}, amount={}", order.getOrderId(), order.getAmount());
    }
    // Subscribes to the global error channel
    @ServiceActivator(inputChannel = "errorChannel")
    public void handleError(ErrorMessage errorMessage) {
        log.error("Handling error message: {}", errorMessage.getPayload().getMessage());
        log.error("Original message: {}", errorMessage.getOriginalMessage());
    }
}
@Data
@NoArgsConstructor
@AllArgsConstructor
class OrderEvent {
    private String orderId;
    private Double amount;
    private String status;
    private LocalDateTime createdAt;
}
Reactive programming example
java
package com.example.rabbitmq.reactive;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.http.MediaType;
import org.springframework.messaging.Message;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
@SpringBootApplication
@Slf4j
public class ReactiveApplication {
public static void main(String[] args) {
SpringApplication.run(ReactiveApplication.class, args);
}
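// Two functional beans in one application must be listed explicitly, e.g.
// spring.cloud.function.definition=orderStream;processOrder
@Bean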
public Supplier<Flux<OrderEvent>> orderStream() {
return () -> Flux.interval(Duration.ofSeconds(1))
.map(tick -> {
OrderEvent order = new OrderEvent();
order.setOrderId("ORD-" + System.currentTimeMillis());
order.setAmount(Math.random() * 1000);
order.setStatus("PENDING");
order.setCreatedAt(LocalDateTime.now());
return order;
});
}
@Bean
public Function<Flux<Message<OrderEvent>>, Mono<Void>> processOrder() {
return flux -> flux
.doOnNext(message -> log.info("Received message: {}", message.getPayload()))
.flatMap(message -> processOrderAsync(message.getPayload())
.doOnSuccess(result -> log.info("Processed successfully: {}", result))
.onErrorResume(e -> {
log.error("Processing failed: {}", e.getMessage());
return Mono.empty();
}))
.then();
}
private Mono<String> processOrderAsync(OrderEvent order) {
return Mono.just("Processed: " + order.getOrderId())
.delayElement(Duration.ofMillis(100));
}
}
@Data
@NoArgsConstructor
@AllArgsConstructor
class OrderEvent {
private String orderId;
private Double amount;
private String status;
private LocalDateTime createdAt;
}
Partitioning example
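Partitioning rests on one idea: a key extracted from each message is mapped deterministically to a partition index, so messages with the same key always reach the same consumer instance. A minimal sketch of that mapping follows, assuming a hash-modulo scheme. `PartitionSelector` is a hypothetical name, and the framework's own default selector may treat negative hash codes differently from the `floorMod` used here.

```java
public final class PartitionSelector {
    private PartitionSelector() {}

    /** Maps a partition key to an index in the range [0, partitionCount). */
    public static int selectPartition(Object key, int partitionCount) {
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive");
        }
        // floorMod keeps the result non-negative even for negative hash codes
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        int first = selectPartition("ORD-1", 3);
        int second = selectPartition("ORD-1", 3);
        System.out.println(first == second); // true: the same key always maps to the same partition
    }
}
```

Because the mapping depends on the partition count, changing the number of partitions later moves keys to different consumers.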
java
package com.example.rabbitmq.partition;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import java.time.LocalDateTime;
import java.util.function.Consumer;
@SpringBootApplication
@Slf4j
public class PartitionApplication {
    public static void main(String[] args) {
        SpringApplication.run(PartitionApplication.class, args);
    }
    // Binding names: partitionedInput-in-0 and partitionedInput2-in-0.
    // Each consumer binding needs consumer.partitioned=true plus
    // spring.cloud.stream.instance-count and instance-index; the producer binding
    // needs producer.partition-key-expression and producer.partition-count.
    // With two functional beans, also set
    // spring.cloud.function.definition=partitionedInput;partitionedInput2
    @Bean
    public Consumer<OrderEvent> partitionedInput() {
        return order -> log.info("Partition consumer 1 [{}] received order: {}",
                Thread.currentThread().getName(), order);
    }
    @Bean
    public Consumer<OrderEvent> partitionedInput2() {
        return order -> log.info("Partition consumer 2 [{}] received order: {}",
                Thread.currentThread().getName(), order);
    }
}
@Data
@NoArgsConstructor
@AllArgsConstructor
class OrderEvent {
    private String orderId;
    private Double amount;
    private String status;
    private LocalDateTime createdAt;
}
Dynamic routing example
java
package com.example.rabbitmq.dynamic;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.stereotype.Component;
import jakarta.annotation.PostConstruct;
@SpringBootApplication
@Slf4j
public class DynamicRoutingApplication {
    public static void main(String[] args) {
        SpringApplication.run(DynamicRoutingApplication.class, args);
    }
    @PostConstruct
    public void init() {
        log.info("Dynamic routing example application started");
    }
}
// Sends to destinations chosen at runtime. StreamBridge creates and caches an
// output binding the first time a destination name is used.
@Component
class DynamicDestinationBinder {
    private final StreamBridge streamBridge;
    DynamicDestinationBinder(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }
    // Returns true when the message was handed to the binding successfully
    public boolean send(String destination, OrderEvent order) {
        return streamBridge.send(destination, order);
    }
}
@Data
@NoArgsConstructor
@AllArgsConstructor
class OrderEvent {
    private String orderId;
    private Double amount;
    private String status;
}
Integrating PHP with Spring Cloud Stream
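Although Spring Cloud Stream is primarily a framework for the Java ecosystem, PHP applications can interoperate with it through the shared RabbitMQ broker, as follows: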
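One interoperability detail is worth checking on the Java side. The PHP examples in this section emit `createdAt` with `date('c')`, which produces an ISO-8601 string carrying a UTC offset, while the Java examples model `createdAt` as `LocalDateTime`, which has no offset. Whether a JSON mapper accepts the offset form depends on its configuration, so the sketch below shows the underlying `java.time` behavior and one way to normalize the value. `PhpTimestamp` is a hypothetical helper name.

```java
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeParseException;

public final class PhpTimestamp {
    private PhpTimestamp() {}

    /** Parses an ISO-8601 timestamp with an offset and normalizes it to UTC. */
    public static LocalDateTime toUtcLocalDateTime(String iso) {
        return OffsetDateTime.parse(iso)
                .withOffsetSameInstant(ZoneOffset.UTC)
                .toLocalDateTime();
    }

    /** Reports whether the plain LocalDateTime parser accepts the given text. */
    public static boolean parsesAsLocalDateTime(String iso) {
        try {
            LocalDateTime.parse(iso);
            return true;
        } catch (DateTimeParseException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String fromPhp = "2024-01-01T14:00:00+02:00";
        System.out.println(parsesAsLocalDateTime(fromPhp)); // false
        System.out.println(toUtcLocalDateTime(fromPhp));    // 2024-01-01T12:00
    }
}
```

An alternative is to declare the field as `OffsetDateTime` on the Java side, or to have PHP send an offset-free timestamp in an agreed time zone.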
PHP message producer
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
class SpringCloudStreamProducer
{
private $connection;
private $channel;
private $exchange;
public function __construct(array $config)
{
$this->connection = new AMQPStreamConnection(
$config['host'] ?? 'localhost',
$config['port'] ?? 5672,
$config['user'] ?? 'guest',
$config['password'] ?? 'guest',
$config['vhost'] ?? '/'
);
$this->channel = $this->connection->channel();
$this->exchange = $config['exchange'] ?? 'exchange.example';
$this->declareExchange();
}
private function declareExchange()
{
$this->channel->exchange_declare(
$this->exchange,
'topic',
false,
true,
false
);
}
public function sendOrderMessage(array $orderData): void
{
$message = new AMQPMessage(
json_encode($orderData),
[
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'timestamp' => time(),
'headers' => new AMQPTable([
'contentType' => 'application/json',
'spring_json_header_type' => 'order',
'timestamp' => time() * 1000,
])
]
);
$routingKey = 'order.created';
$this->channel->basic_publish($message, $this->exchange, $routingKey);
echo "Message sent: {$routingKey}\n";
}
public function close(): void
{
$this->channel->close();
$this->connection->close();
}
}
$config = [
'host' => getenv('RABBITMQ_HOST') ?: 'localhost',
'port' => (int)(getenv('RABBITMQ_PORT') ?: 5672),
'user' => getenv('RABBITMQ_USER') ?: 'guest',
'password' => getenv('RABBITMQ_PASSWORD') ?: 'guest',
'vhost' => getenv('RABBITMQ_VHOST') ?: '/',
'exchange' => 'exchange.example',
];
$producer = new SpringCloudStreamProducer($config);
$orderData = [
'orderId' => 'ORD-' . time(),
'amount' => rand(100, 1000),
'status' => 'PENDING',
'createdAt' => date('c'),
];
$producer->sendOrderMessage($orderData);
$producer->close();
PHP message consumer
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
class SpringCloudStreamConsumer
{
private $connection;
private $channel;
private $queue;
private $exchange;
public function __construct(array $config)
{
$this->connection = new AMQPStreamConnection(
$config['host'] ?? 'localhost',
$config['port'] ?? 5672,
$config['user'] ?? 'guest',
$config['password'] ?? 'guest',
$config['vhost'] ?? '/'
);
$this->channel = $this->connection->channel();
$this->queue = $config['queue'] ?? 'queue.example';
$this->exchange = $config['exchange'] ?? 'exchange.example';
$this->setupQueue();
}
private function setupQueue(): void
{
$this->channel->exchange_declare(
$this->exchange,
'topic',
false,
true,
false
);
$this->channel->queue_declare(
$this->queue,
false,
true,
false,
false,
false,
new AMQPTable([
'x-dead-letter-exchange' => 'dlx',
'x-dead-letter-routing-key' => 'dlq.routing.key',
])
);
$this->channel->queue_bind($this->queue, $this->exchange, 'order.#');
}
public function consume(callable $callback): void
{
$this->channel->basic_qos(0, 10, false);
$this->channel->basic_consume(
$this->queue,
'',
false,
false,
false,
false,
function (AMQPMessage $message) use ($callback) {
$data = json_decode($message->getBody(), true);
$headers = $message->has('application_headers')
? $message->get('application_headers')->getNativeData()
: [];
try {
$callback($data, $headers);
$message->ack();
} catch (Exception $e) {
echo "Processing failed: " . $e->getMessage() . "\n";
$message->nack(false, false);
}
}
);
while ($this->channel->is_consuming()) {
$this->channel->wait();
}
}
public function close(): void
{
$this->channel->close();
$this->connection->close();
}
}
$config = [
'host' => getenv('RABBITMQ_HOST') ?: 'localhost',
'port' => (int)(getenv('RABBITMQ_PORT') ?: 5672),
'user' => getenv('RABBITMQ_USER') ?: 'guest',
'password' => getenv('RABBITMQ_PASSWORD') ?: 'guest',
'vhost' => getenv('RABBITMQ_VHOST') ?: '/',
'exchange' => 'exchange.example',
'queue' => 'group.example',
];
$consumer = new SpringCloudStreamConsumer($config);
echo "Starting to consume messages...\n";
$consumer->consume(function (array $data, array $headers) {
    echo "Received order message:\n";
    echo " Order ID: " . ($data['orderId'] ?? 'N/A') . "\n";
    echo " Amount: " . ($data['amount'] ?? 0) . "\n";
    echo " Status: " . ($data['status'] ?? 'N/A') . "\n";
    echo " Timestamp: " . ($headers['timestamp'] ?? 'N/A') . "\n";
    echo "\n";
});
$consumer->close();
Real-world scenarios
Scenario 1: order processing system
java
package com.example.scenario.order;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import java.util.function.Consumer;
import java.util.function.Function;
@SpringBootApplication
@Slf4j
public class OrderProcessingApplication {
    public static void main(String[] args) {
        SpringApplication.run(OrderProcessingApplication.class, args);
    }
    // Requires spring.cloud.function.definition=processOrder;handleOrderResult.
    // processOrder-out-0 and handleOrderResult-in-0 should share a destination
    // so that results flow from the function to the result handler.
    @Bean
    public Function<OrderMessage, OrderResult> processOrder() {
        return order -> {
            log.info("Processing order: {}", order.getOrderId());
            try {
                validateOrder(order);
                processPayment(order);
                reserveInventory(order);
                confirmOrder(order);
                return new OrderResult(order.getOrderId(), "SUCCESS", "Order processed successfully");
            } catch (Exception e) {
                log.error("Order processing failed: {}", order.getOrderId(), e);
                return new OrderResult(order.getOrderId(), "FAILED", e.getMessage());
            }
        };
    }
    @Bean
    public Consumer<OrderResult> handleOrderResult() {
        return result -> log.info("Order result: {}", result);
    }
    private void validateOrder(OrderMessage order) {
        if (order.getAmount() == null || order.getAmount() <= 0) {
            throw new IllegalArgumentException("Order amount must be greater than 0");
        }
    }
    private void processPayment(OrderMessage order) {
        log.info("Processing payment: {}", order.getOrderId());
    }
    private void reserveInventory(OrderMessage order) {
        log.info("Reserving inventory: {}", order.getOrderId());
    }
    private void confirmOrder(OrderMessage order) {
        log.info("Confirming order: {}", order.getOrderId());
    }
}
@Data
@NoArgsConstructor
@AllArgsConstructor
class OrderMessage {
    private String orderId;
    private Double amount;
    private String customerId;
    private String status;
}
@Data
@NoArgsConstructor
@AllArgsConstructor
class OrderResult {
    private String orderId;
    private String status;
    private String message;
}
Scenario 2: event-driven architecture
java
package com.example.scenario.event;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import java.time.LocalDateTime;
import java.util.function.Consumer;
@SpringBootApplication
@Slf4j
public class EventDrivenApplication {
    public static void main(String[] args) {
        SpringApplication.run(EventDrivenApplication.class, args);
    }
    // Bound as userEvents-in-0; order and notification events would get
    // their own Consumer beans in the same way.
    @Bean
    public Consumer<UserEvent> userEvents() {
        return event -> {
            log.info("Received user event: type={}, userId={}",
                    event.getEventType(), event.getUserId());
            switch (event.getEventType()) {
                case "USER_CREATED":
                    handleUserCreated(event);
                    break;
                case "USER_UPDATED":
                    handleUserUpdated(event);
                    break;
                case "USER_DELETED":
                    handleUserDeleted(event);
                    break;
                default:
                    log.warn("Unknown event type: {}", event.getEventType());
            }
        };
    }
    private void handleUserCreated(UserEvent event) {
        log.info("Creating user record: {}", event.getUserId());
    }
    private void handleUserUpdated(UserEvent event) {
        log.info("Updating user record: {}", event.getUserId());
    }
    private void handleUserDeleted(UserEvent event) {
        log.info("Deleting user record: {}", event.getUserId());
    }
}
@Data
@NoArgsConstructor
@AllArgsConstructor
class UserEvent {
    private String eventType;
    private String userId;
    private String username;
    private String email;
    private LocalDateTime timestamp;
    private Object payload;
}
Common problems and solutions
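Problem 1: message loss
Symptom: messages are sent, but the consumer never receives them
Solution:
yaml
spring:
  rabbitmq:
    # Enable publisher confirms and returns
    publisher-confirm-type: correlated
    publisher-returns: true
  cloud:
    stream:
      bindings:
        output:
          producer:
            # Provision the consumer queue before the first message is published
            required-groups: group.example
      rabbit:
        bindings:
          output:
            producer:
              # Enable message persistence
              delivery-mode: persistent
Problem 2: duplicate message consumption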
Symptom: the same message is consumed more than once
Solution:
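The core of an idempotent consumer is a single atomic "check and mark" step: whichever delivery marks the key first does the work, and every later delivery of the same key is skipped. The in-memory sketch below illustrates only that step. `IdempotencyGuard` is a hypothetical class, and a real deployment would need a store shared by all consumer instances, with expiry, rather than a local set.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public final class IdempotencyGuard {
    private final Set<String> processed = ConcurrentHashMap.newKeySet();

    /** Returns true only for the first caller that presents a given key. */
    public boolean firstDelivery(String key) {
        return processed.add(key); // add is atomic: exactly one caller sees true
    }

    public static void main(String[] args) {
        IdempotencyGuard guard = new IdempotencyGuard();
        System.out.println(guard.firstDelivery("ORD-1")); // true
        System.out.println(guard.firstDelivery("ORD-1")); // false
    }
}
```

The important property is that checking and marking happen in one operation. Marking first and then testing for existence in a second call reports every message, including the first, as a duplicate.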
java
@Bean
public Consumer<OrderEvent> handleMessage(StringRedisTemplate redisTemplate) {
    return order -> {
        // Use a unique key for idempotent processing
        String key = "processed:" + order.getOrderId();
        // setIfAbsent is atomic: only the first delivery gets TRUE back
        Boolean firstDelivery = redisTemplate.opsForValue()
                .setIfAbsent(key, "1", Duration.ofDays(1));
        if (!Boolean.TRUE.equals(firstDelivery)) {
            log.warn("Duplicate message, skipping: {}", order.getOrderId());
            return;
        }
        processOrder(order);
    };
}
Problem 3: uneven load across a consumer group
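Symptom: some consumers handle most of the messages while others sit idle
Solution:
yaml
spring:
  cloud:
    stream:
      bindings:
        input:
          # All instances must share the same group to compete on one queue
          group: group.example
          consumer:
            # Number of concurrent consumers
            concurrency: 5
      rabbit:
        bindings:
          input:
            consumer:
              # A small prefetch keeps one busy consumer from hoarding messages
              prefetch: 1
Problem 4: dead-letter queue handling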
Symptom: messages land in the dead-letter queue but are never processed
Solution:
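When RabbitMQ dead-letters a message it records the history in the `x-death` header: a list of entries, each a map with fields such as `count`, `reason`, and `queue`. A handler can read the counts to decide whether a message is worth retrying or should be parked for manual review. The sketch below extracts the total from a header value that has already been decoded into Java collections. `DeathCount` is a hypothetical helper name.

```java
import java.util.List;
import java.util.Map;

public final class DeathCount {
    private DeathCount() {}

    /** Sums the "count" fields of an x-death header; returns 0 if the header is absent or malformed. */
    public static long totalDeaths(Object xDeath) {
        if (!(xDeath instanceof List)) {
            return 0;
        }
        long total = 0;
        for (Object entry : (List<?>) xDeath) {
            if (entry instanceof Map) {
                Object count = ((Map<?, ?>) entry).get("count");
                if (count instanceof Number) {
                    total += ((Number) count).longValue();
                }
            }
        }
        return total;
    }

    public static void main(String[] args) {
        Object header = List.of(Map.of("count", 2L, "reason", "rejected", "queue", "queue.example"));
        System.out.println(totalDeaths(header)); // 2
    }
}
```

A typical policy compares this total against a threshold and stops retrying once the message has exceeded it.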
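java
// Dead-lettered messages sit in a plain RabbitMQ queue rather than a stream
// binding, so a Spring AMQP listener reads them. The queue name is the DLQ
// configured for the binding ("dlq" in the configuration above).
@RabbitListener(queues = "dlq")
public void handleDeadLetter(org.springframework.amqp.core.Message message) {
    log.error("Dead-lettered message: {}", new String(message.getBody(), StandardCharsets.UTF_8));
    log.error("Death history: {}", message.getMessageProperties().getHeaders().get("x-death"));
    // Log it or hand it off for manual handling
    deadLetterService.process(message);
}
Best practices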
1. Message design
java
// Use strongly typed messages
@Data
@NoArgsConstructor
@AllArgsConstructor
class OrderEvent {
    private String orderId;
    private Double amount;
    private String status;
    private LocalDateTime createdAt;
    // Derived from the other fields, so it is not stored separately
    public String getIdempotencyKey() {
        return orderId + "_" + createdAt;
    }
}
2. Error handling
java
// Global error channel; a handler that returns void needs no output channel
@ServiceActivator(inputChannel = "errorChannel")
public void errorHandler(ErrorMessage errorMessage) {
    log.error("Processing failed: {}", errorMessage.getPayload().getMessage());
    // Forward to a dedicated error-handling queue
    errorQueue.send(errorMessage);
}
3. Monitoring configuration
yaml
# The management endpoints require spring-boot-starter-actuator on the classpath
spring:
  cloud:
    stream:
      bindings:
        output:
          producer:
            error-channel-enabled: true
management:
  endpoints:
    web:
      exposure:
        include: health,info,metrics,bindings
  endpoint:
    health:
      show-details: always
  metrics:
    tags:
      application: ${spring.application.name}
4. Connection pool configuration
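yaml
spring:
  rabbitmq:
    # Cluster node addresses
    addresses: rabbitmq1:5672,rabbitmq2:5672
    connection-timeout: 10s
    cache:
      channel:
        # Number of channels kept in the cache
        size: 25
        # How long to wait for a channel once the cache limit is reached
        checkout-timeout: 5s
Version compatibility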
| Spring Cloud Stream | Spring Boot | RabbitMQ Client | RabbitMQ Server |
|---|---|---|---|
| 4.1.x | 3.2.x | 5.x | 3.11+ |
| 3.2.x | 2.7.x | 5.x | 3.10+ |
| 3.2.x | 2.6.x | 5.x | 3.9+ |
| 3.1.x | 2.5.x | 5.x | 3.9+ |
| 3.1.x | 2.4.x | 5.x | 3.8+ |
