Appearance
Apache Camel 与 RabbitMQ 集成
概述
Apache Camel 是一个强大的开源集成框架,基于企业集成模式(EIP),提供了丰富的组件和路由 DSL,使得集成各种系统变得简单。RabbitMQ 组件是 Camel 中最常用的消息组件之一,支持生产者、消费者、请求-回复等多种消息模式。
本教程将详细介绍如何在 Apache Camel 中集成 RabbitMQ,包括路由配置、消息转换、错误处理等核心功能。
集成架构设计
架构图
┌─────────────────────────────────────────────────────────────────────┐
│ Apache Camel Application │
│ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ Camel Context │ │
│ │ │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Route 1 │ │ Route 2 │ │ Route 3 │ │ │
│ │ │ │ │ │ │ │ │ │
│ │ │ from() │ │ from() │ │ from() │ │ │
│ │ │ process() │ │ choice() │ │ split() │ │ │
│ │ │ to() │ │ when() │ │ aggregate()│ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ │ │ │
│ │ ┌─────────────────────────────────────────────────────┐ │ │
│ │ │ RabbitMQ Component │ │ │
│ │ │ • Producer Endpoint (to rabbitmq:) │ │ │
│ │ │ • Consumer Endpoint (from rabbitmq:) │ │ │
│ │ │ • Message Converter │ │ │
│ │ │ • Connection Pool │ │ │
│ │ └─────────────────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────┐
│ RabbitMQ Broker │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ Exchanges │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ Direct │ │ Topic │ │ Fanout │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ Queues │ │
│ │ ┌───────────┐ ┌───────────┐ ┌───────────┐ │ │
│ │ │ Queue 1 │ │ Queue 2 │ │ Queue 3 │ │ │
│ │ └───────────┘ └───────────┘ └───────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘核心概念
| 概念 | 说明 |
|---|---|
| Camel Context | Camel 运行时容器,管理所有路由、组件和端点 |
| Route | 消息路由定义,定义消息从源头到目的地的流程 |
| Endpoint | 消息端点,可以是消息源或目的地 |
| Component | 组件,提供端点工厂,如 RabbitMQ 组件 |
| Exchange | 消息交换对象,包含消息和元数据 |
| Processor | 消息处理器,处理消息的业务逻辑 |
| EIP | 企业集成模式,如路由、过滤、转换等 |
配置示例
Maven 依赖
xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>camel-rabbitmq-demo</artifactId>
<version>1.0.0</version>
<properties>
<camel.version>4.4.0</camel.version>
</properties>
<dependencies>
<!-- Camel Core -->
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core</artifactId>
<version>${camel.version}</version>
</dependency>
<!-- Camel RabbitMQ -->
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-rabbitmq</artifactId>
<version>${camel.version}</version>
</dependency>
<!-- Camel Jackson for JSON -->
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-jackson</artifactId>
<version>${camel.version}</version>
</dependency>
<!-- Camel Bean -->
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-bean</artifactId>
<version>${camel.version}</version>
</dependency>
<!-- Camel Log -->
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-log</artifactId>
<version>${camel.version}</version>
</dependency>
<!-- SLF4J -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.9</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.4.11</version>
</dependency>
<!-- Test -->
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-test</artifactId>
<version>${camel.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-test-junit5</artifactId>
<version>${camel.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.camel</groupId>
<artifactId>camel-maven-plugin</artifactId>
<version>${camel.version}</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<source>17</source>
<target>17</target>
</configuration>
</plugin>
</plugins>
</build>
</project>application.properties 配置
properties
# RabbitMQ 连接配置
rabbitmq.host=localhost
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.vhost=/
# Exchange 配置
rabbitmq.exchange.order=exchange.order
rabbitmq.exchange.notification=exchange.notification
rabbitmq.exchange.dlq=exchange.dlq
# Queue 配置
rabbitmq.queue.order=queue.order
rabbitmq.queue.notification=queue.notification
rabbitmq.queue.dlq=queue.dlq
# Camel 配置
camel.context.name=camel-rabbitmq-demo
camel.threadpool.maxpoolsize=20
camel.threadpool.poolsize=5Java DSL 路由示例
基础路由配置
java
package com.example.camel.routes;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.dataformat.JsonLibrary;
import org.springframework.stereotype.Component;
@Component
public class BasicRabbitMQRoute extends RouteBuilder {
@Override
public void configure() throws Exception {
onException(Exception.class)
.log("处理异常: ${exception.message}")
.handled(true)
.to("rabbitmq:exchange.dlq?routingKey=dlq.error")
.stop();
from("rabbitmq:exchange.order?queue=queue.order" +
"&exchangeType=direct" +
"&routingKey=order.created" +
"&autoDelete=false" +
"&durable=true" +
"&concurrentConsumers=5" +
"&maxConcurrentConsumers=10" +
"&prefetchCount=10" +
"&prefetchEnabled=true")
.routeId("order-consumer-route")
.log("收到订单消息: ${body}")
.unmarshal().json(JsonLibrary.Jackson, OrderMessage.class)
.process(exchange -> {
OrderMessage order = exchange.getIn().getBody(OrderMessage.class);
log.info("处理订单: {}", order.getOrderId());
exchange.getIn().setHeader("orderId", order.getOrderId());
})
.choice()
.when(simple("${body.amount} > 10000"))
.log("高价值订单")
.to("direct:highValueOrder")
.when(simple("${body.amount} > 1000"))
.log("普通订单")
.to("direct:normalOrder")
.otherwise()
.log("低价值订单")
.to("direct:lowValueOrder")
.end();
from("direct:highValueOrder")
.routeId("high-value-order-route")
.log("处理高价值订单: ${body}")
.process(exchange -> {
OrderMessage order = exchange.getIn().getBody(OrderMessage.class);
order.setStatus("HIGH_VALUE_PROCESSED");
})
.marshal().json(JsonLibrary.Jackson)
.to("rabbitmq:exchange.order.result?routingKey=order.completed");
from("direct:normalOrder")
.routeId("normal-order-route")
.log("处理普通订单: ${body}")
.process(exchange -> {
OrderMessage order = exchange.getIn().getBody(OrderMessage.class);
order.setStatus("NORMAL_PROCESSED");
})
.marshal().json(JsonLibrary.Jackson)
.to("rabbitmq:exchange.order.result?routingKey=order.completed");
from("direct:lowValueOrder")
.routeId("low-value-order-route")
.log("处理低价值订单: ${body}")
.process(exchange -> {
OrderMessage order = exchange.getIn().getBody(OrderMessage.class);
order.setStatus("LOW_VALUE_PROCESSED");
})
.marshal().json(JsonLibrary.Jackson)
.to("rabbitmq:exchange.order.result?routingKey=order.completed");
}
}消息生产者路由
java
package com.example.camel.routes;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.dataformat.JsonLibrary;
import org.springframework.stereotype.Component;
@Component
public class ProducerRoute extends RouteBuilder {
@Override
public void configure() throws Exception {
from("timer:orderGenerator?period=5000&repeatCount=10")
.routeId("order-generator-route")
.process(exchange -> {
OrderMessage order = new OrderMessage();
order.setOrderId("ORD-" + System.currentTimeMillis());
order.setAmount(Math.random() * 15000);
order.setCustomerId("CUST-" + (int)(Math.random() * 100));
order.setStatus("PENDING");
exchange.getIn().setBody(order);
})
.log("生成订单: ${body}")
.marshal().json(JsonLibrary.Jackson)
.setHeader("rabbitmq.CONTENT_TYPE", constant("application/json"))
.setHeader("rabbitmq.DELIVERY_MODE", constant(2))
.setHeader("rabbitmq.PRIORITY", constant(5))
.to("rabbitmq:exchange.order?" +
"exchangeType=direct&" +
"routingKey=order.created&" +
"autoDelete=false&" +
"durable=true")
.log("订单已发送: ${header.orderId}");
from("direct:sendOrder")
.routeId("send-order-route")
.log("发送订单消息")
.marshal().json(JsonLibrary.Jackson)
.to("rabbitmq:exchange.order?routingKey=order.created")
.log("订单发送完成");
from("direct:sendNotification")
.routeId("send-notification-route")
.log("发送通知消息")
.marshal().json(JsonLibrary.Jackson)
.to("rabbitmq:exchange.notification?exchangeType=topic&routingKey=notification.email")
.log("通知发送完成");
}
}消息转换与路由
java
package com.example.camel.routes;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.dataformat.JsonLibrary;
import org.apache.camel.model.rest.RestBindingMode;
import org.springframework.stereotype.Component;
@Component
public class TransformationRoute extends RouteBuilder {
@Override
public void configure() throws Exception {
from("rabbitmq:exchange.input?queue=queue.input")
.routeId("transformation-route")
.log("原始消息: ${body}")
.unmarshal().json(JsonLibrary.Jackson)
.transform(simple("${body.toUpperCase()}"))
.setHeader("originalType", simple("${body.class.name}"))
.process(exchange -> {
Object body = exchange.getIn().getBody();
if (body instanceof Map) {
Map<String, Object> map = (Map<String, Object>) body;
map.put("processedAt", System.currentTimeMillis());
map.put("processedBy", "camel-transformation");
}
})
.choice()
.when(header("destination").isEqualTo("database"))
.to("direct:toDatabase")
.when(header("destination").isEqualTo("notification"))
.to("direct:toNotification")
.otherwise()
.to("direct:defaultDestination")
.end();
from("direct:toDatabase")
.routeId("to-database-route")
.log("发送到数据库队列")
.marshal().json(JsonLibrary.Jackson)
.to("rabbitmq:exchange.database?routingKey=db.insert");
from("direct:toNotification")
.routeId("to-notification-route")
.log("发送到通知队列")
.marshal().json(JsonLibrary.Jackson)
.to("rabbitmq:exchange.notification?routingKey=notification.send");
from("direct:defaultDestination")
.routeId("default-destination-route")
.log("发送到默认队列")
.marshal().json(JsonLibrary.Jackson)
.to("rabbitmq:exchange.output?routingKey=output.default");
}
}消息分割与聚合
java
package com.example.camel.routes;
import org.apache.camel.AggregationStrategy;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.dataformat.JsonLibrary;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
@Component
public class SplitterAggregatorRoute extends RouteBuilder {
@Override
public void configure() throws Exception {
from("rabbitmq:exchange.batch?queue=queue.batch")
.routeId("batch-processing-route")
.log("收到批量订单: ${body}")
.unmarshal().json(JsonLibrary.Jackson, OrderBatch.class)
.split(simple("${body.orders}"), new OrderAggregationStrategy())
.parallelProcessing()
.log("处理单个订单: ${body}")
.process(exchange -> {
OrderMessage order = exchange.getIn().getBody(OrderMessage.class);
order.setStatus("PROCESSED");
})
.to("direct:processSingleOrder")
.end()
.log("所有订单处理完成: ${body}")
.marshal().json(JsonLibrary.Jackson)
.to("rabbitmq:exchange.batch.result?routingKey=batch.completed");
from("direct:processSingleOrder")
.routeId("process-single-order-route")
.log("处理订单: ${body.orderId}")
.delay(simple("${random(100,500)}"))
.setHeader("processed", constant(true));
from("rabbitmq:exchange.aggregate?queue=queue.aggregate")
.routeId("aggregate-route")
.unmarshal().json(JsonLibrary.Jackson)
.setHeader("correlationId", simple("${body.customerId}"))
.aggregate(header("correlationId"), new OrderListAggregationStrategy())
.completionSize(5)
.completionTimeout(5000)
.log("聚合完成: ${body}")
.marshal().json(JsonLibrary.Jackson)
.to("rabbitmq:exchange.aggregate.result?routingKey=aggregate.completed");
}
}
class OrderAggregationStrategy implements AggregationStrategy {
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
if (oldExchange == null) {
List<OrderMessage> orders = new ArrayList<>();
orders.add(newExchange.getIn().getBody(OrderMessage.class));
newExchange.getIn().setBody(orders);
return newExchange;
}
List<OrderMessage> orders = oldExchange.getIn().getBody(List.class);
orders.add(newExchange.getIn().getBody(OrderMessage.class));
oldExchange.getIn().setBody(orders);
return oldExchange;
}
}
class OrderListAggregationStrategy implements AggregationStrategy {
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
if (oldExchange == null) {
List<OrderMessage> orders = new ArrayList<>();
orders.add(newExchange.getIn().getBody(OrderMessage.class));
newExchange.getIn().setBody(orders);
return newExchange;
}
List<OrderMessage> orders = oldExchange.getIn().getBody(List.class);
orders.add(newExchange.getIn().getBody(OrderMessage.class));
oldExchange.getIn().setBody(orders);
return oldExchange;
}
}错误处理路由
java
package com.example.camel.routes;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.dataformat.JsonLibrary;
import org.apache.camel.model.errorhandler.DeadLetterChannelDefinition;
import org.springframework.stereotype.Component;
@Component
public class ErrorHandlingRoute extends RouteBuilder {
@Override
public void configure() throws Exception {
errorHandler(deadLetterChannel("rabbitmq:exchange.dlq?routingKey=dlq.error")
.maximumRedeliveries(3)
.redeliveryDelay(1000)
.retryAttemptedLogLevel(org.apache.camel.LoggingLevel.WARN)
.logExhausted(true)
.logExhaustedMessageHistory(true));
onException(OrderValidationException.class)
.handled(true)
.log("订单验证失败: ${exception.message}")
.setHeader("errorType", constant("VALIDATION_ERROR"))
.to("direct:handleValidationError");
onException(PaymentException.class)
.maximumRedeliveries(5)
.redeliveryDelay(2000)
.retryAttemptedLogLevel(org.apache.camel.LoggingLevel.WARN)
.log("支付处理失败,重试中...")
.handled(false);
onException(Exception.class)
.handled(true)
.log("未知异常: ${exception.message}")
.setHeader("errorType", constant("UNKNOWN_ERROR"))
.to("rabbitmq:exchange.dlq?routingKey=dlq.unknown");
from("rabbitmq:exchange.order.process?queue=queue.order.process")
.routeId("order-processing-route")
.log("开始处理订单: ${body}")
.unmarshal().json(JsonLibrary.Jackson, OrderMessage.class)
.process(exchange -> {
OrderMessage order = exchange.getIn().getBody(OrderMessage.class);
if (order.getAmount() <= 0) {
throw new OrderValidationException("订单金额必须大于0");
}
if (order.getCustomerId() == null || order.getCustomerId().isEmpty()) {
throw new OrderValidationException("客户ID不能为空");
}
})
.log("订单验证通过")
.process(exchange -> {
OrderMessage order = exchange.getIn().getBody(OrderMessage.class);
if (Math.random() < 0.1) {
throw new PaymentException("支付处理失败");
}
order.setStatus("PAID");
})
.log("支付处理完成")
.marshal().json(JsonLibrary.Jackson)
.to("rabbitmq:exchange.order.result?routingKey=order.completed");
from("direct:handleValidationError")
.routeId("validation-error-route")
.log("处理验证错误")
.setBody(simple("{\"error\": \"${exception.message}\", \"orderId\": \"${header.orderId}\"}"))
.to("rabbitmq:exchange.error?routingKey=error.validation");
}
}
class OrderValidationException extends Exception {
public OrderValidationException(String message) {
super(message);
}
}
class PaymentException extends Exception {
public PaymentException(String message) {
super(message);
}
}请求-回复模式
java
package com.example.camel.routes;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.dataformat.JsonLibrary;
import org.springframework.stereotype.Component;
@Component
public class RequestReplyRoute extends RouteBuilder {
@Override
public void configure() throws Exception {
from("direct:requestOrderStatus")
.routeId("request-order-status-route")
.log("请求订单状态: ${body}")
.setHeader("orderId", simple("${body}"))
.to("rabbitmq:exchange.order.status?" +
"routingKey=order.status.request&" +
"replyTo=queue.order.status.reply&" +
"requestTimeout=5000")
.log("收到订单状态响应: ${body}")
.unmarshal().json(JsonLibrary.Jackson);
from("rabbitmq:exchange.order.status?queue=queue.order.status.request")
.routeId("order-status-responder-route")
.log("收到订单状态请求: ${body}")
.unmarshal().json(JsonLibrary.Jackson)
.process(exchange -> {
String orderId = exchange.getIn().getHeader("orderId", String.class);
OrderStatus status = new OrderStatus();
status.setOrderId(orderId);
status.setStatus("COMPLETED");
status.setUpdatedAt(System.currentTimeMillis());
exchange.getIn().setBody(status);
})
.marshal().json(JsonLibrary.Jackson)
.log("返回订单状态: ${body}");
}
}
class OrderStatus {
private String orderId;
private String status;
private Long updatedAt;
public String getOrderId() { return orderId; }
public void setOrderId(String orderId) { this.orderId = orderId; }
public String getStatus() { return status; }
public void setStatus(String status) { this.status = status; }
public Long getUpdatedAt() { return updatedAt; }
public void setUpdatedAt(Long updatedAt) { this.updatedAt = updatedAt; }
}PHP 与 Apache Camel 集成
PHP 消息生产者
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
class CamelIntegrationProducer
{
private $connection;
private $channel;
public function __construct(array $config)
{
$this->connection = new AMQPStreamConnection(
$config['host'] ?? 'localhost',
$config['port'] ?? 5672,
$config['user'] ?? 'guest',
$config['password'] ?? 'guest',
$config['vhost'] ?? '/'
);
$this->channel = $this->connection->channel();
$this->setupExchanges();
}
private function setupExchanges(): void
{
$this->channel->exchange_declare('exchange.order', 'direct', false, true, false);
$this->channel->exchange_declare('exchange.notification', 'topic', false, true, false);
$this->channel->exchange_declare('exchange.batch', 'direct', false, true, false);
}
public function sendOrderMessage(array $orderData): bool
{
$headers = new AMQPTable([
'CamelFileName' => 'order-' . time() . '.json',
'CamelMessageType' => 'OrderMessage',
'contentType' => 'application/json',
]);
$message = new AMQPMessage(
json_encode($orderData),
[
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'message_id' => uniqid('camel-', true),
'timestamp' => time(),
'application_headers' => $headers,
]
);
$this->channel->basic_publish(
$message,
'exchange.order',
'order.created'
);
return true;
}
public function sendBatchOrder(array $orders): bool
{
$batchData = [
'batchId' => 'BATCH-' . time(),
'orders' => $orders,
'createdAt' => date('c'),
];
$message = new AMQPMessage(
json_encode($batchData),
[
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
]
);
$this->channel->basic_publish(
$message,
'exchange.batch',
'batch.process'
);
return true;
}
public function sendWithRouting(array $data, string $routingKey): bool
{
$message = new AMQPMessage(
json_encode($data),
[
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
]
);
$this->channel->basic_publish(
$message,
'exchange.notification',
$routingKey
);
return true;
}
public function close(): void
{
$this->channel->close();
$this->connection->close();
}
}
$producer = new CamelIntegrationProducer([
'host' => 'localhost',
'port' => 5672,
'user' => 'guest',
'password' => 'guest',
]);
$orderData = [
'orderId' => 'ORD-' . time(),
'amount' => 2500.00,
'customerId' => 'CUST-001',
'status' => 'PENDING',
'items' => [
['productId' => 'PROD-001', 'quantity' => 2, 'price' => 1000],
['productId' => 'PROD-002', 'quantity' => 1, 'price' => 500],
],
];
$producer->sendOrderMessage($orderData);
echo "订单消息已发送到 Camel 路由\n";
$batchOrders = [
['orderId' => 'ORD-1', 'amount' => 100],
['orderId' => 'ORD-2', 'amount' => 200],
['orderId' => 'ORD-3', 'amount' => 300],
];
$producer->sendBatchOrder($batchOrders);
echo "批量订单已发送\n";
$producer->close();PHP 消息消费者
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class CamelIntegrationConsumer
{
private $connection;
private $channel;
public function __construct(array $config)
{
$this->connection = new AMQPStreamConnection(
$config['host'] ?? 'localhost',
$config['port'] ?? 5672,
$config['user'] ?? 'guest',
$config['password'] ?? 'guest',
$config['vhost'] ?? '/'
);
$this->channel = $this->connection->channel();
}
public function consumeFromCamel(string $queue, callable $processor): void
{
$this->channel->basic_qos(0, 10, false);
$this->channel->basic_consume(
$queue,
'',
false,
false,
false,
false,
function (AMQPMessage $message) use ($processor) {
$this->processCamelMessage($message, $processor);
}
);
echo "等待 Camel 路由消息...\n";
while ($this->channel->is_consuming()) {
$this->channel->wait();
}
}
private function processCamelMessage(AMQPMessage $message, callable $processor): void
{
try {
$body = json_decode($message->getBody(), true);
$headers = $this->extractCamelHeaders($message);
echo "收到 Camel 消息:\n";
echo " Message ID: " . ($headers['message_id'] ?? 'N/A') . "\n";
echo " Content Type: " . ($headers['content_type'] ?? 'N/A') . "\n";
echo " Camel Headers: " . json_encode($headers['camel'] ?? []) . "\n";
$result = $processor($body, $headers);
if ($result) {
$message->ack();
echo "消息处理成功\n";
} else {
$message->nack(false, true);
echo "消息处理失败,重新入队\n";
}
} catch (Exception $e) {
echo "处理异常: " . $e->getMessage() . "\n";
$message->nack(false, false);
}
}
private function extractCamelHeaders(AMQPMessage $message): array
{
$headers = [];
if ($message->has('application_headers')) {
$appHeaders = $message->get('application_headers')->getNativeData();
$camelHeaders = [];
foreach ($appHeaders as $key => $value) {
if (strpos($key, 'Camel') === 0) {
$camelHeaders[$key] = $value;
}
}
$headers['camel'] = $camelHeaders;
$headers = array_merge($headers, $appHeaders);
}
$headers['message_id'] = $message->get('message_id');
$headers['timestamp'] = $message->get('timestamp');
$headers['content_type'] = $message->get('content_type');
return $headers;
}
public function close(): void
{
$this->channel->close();
$this->connection->close();
}
}
class CamelMessageProcessor
{
public function processOrder(array $data, array $headers): bool
{
echo "处理订单:\n";
echo " 订单ID: " . ($data['orderId'] ?? 'N/A') . "\n";
echo " 金额: " . ($data['amount'] ?? 0) . "\n";
echo " 状态: " . ($data['status'] ?? 'N/A') . "\n";
return true;
}
public function processBatch(array $data, array $headers): bool
{
echo "处理批量订单:\n";
echo " 批次ID: " . ($data['batchId'] ?? 'N/A') . "\n";
echo " 订单数量: " . count($data['orders'] ?? []) . "\n";
foreach ($data['orders'] ?? [] as $order) {
echo " - 订单: " . ($order['orderId'] ?? 'N/A') . "\n";
}
return true;
}
}
$consumer = new CamelIntegrationConsumer([
'host' => 'localhost',
'port' => 5672,
'user' => 'guest',
'password' => 'guest',
]);
$processor = new CamelMessageProcessor();
$queue = $argv[1] ?? 'queue.order.processed';
$consumer->consumeFromCamel($queue, [$processor, 'processOrder']);
$consumer->close();实际应用场景
场景一:订单处理流水线
java
package com.example.camel.scenario;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.dataformat.JsonLibrary;
import org.springframework.stereotype.Component;
@Component
public class OrderPipelineRoute extends RouteBuilder {
@Override
public void configure() throws Exception {
from("rabbitmq:exchange.order.input?queue=queue.order.input")
.routeId("order-pipeline-route")
.log("开始订单处理流水线")
.unmarshal().json(JsonLibrary.Jackson, OrderMessage.class)
.process(exchange -> {
OrderMessage order = exchange.getIn().getBody(OrderMessage.class);
exchange.setProperty("originalOrder", order);
})
.to("direct:validateOrder")
.to("direct:processPayment")
.to("direct:reserveInventory")
.to("direct:confirmOrder")
.log("订单处理流水线完成: ${body}")
.marshal().json(JsonLibrary.Jackson)
.to("rabbitmq:exchange.order.output?routingKey=order.completed");
from("direct:validateOrder")
.routeId("validate-order-route")
.log("验证订单")
.process(exchange -> {
OrderMessage order = exchange.getIn().getBody(OrderMessage.class);
if (order.getAmount() <= 0) {
throw new IllegalArgumentException("订单金额无效");
}
order.setStatus("VALIDATED");
});
from("direct:processPayment")
.routeId("process-payment-route")
.log("处理支付")
.process(exchange -> {
OrderMessage order = exchange.getIn().getBody(OrderMessage.class);
order.setStatus("PAID");
})
.to("rabbitmq:exchange.payment?routingKey=payment.processed");
from("direct:reserveInventory")
.routeId("reserve-inventory-route")
.log("预留库存")
.process(exchange -> {
OrderMessage order = exchange.getIn().getBody(OrderMessage.class);
order.setStatus("INVENTORY_RESERVED");
})
.to("rabbitmq:exchange.inventory?routingKey=inventory.reserved");
from("direct:confirmOrder")
.routeId("confirm-order-route")
.log("确认订单")
.process(exchange -> {
OrderMessage order = exchange.getIn().getBody(OrderMessage.class);
order.setStatus("CONFIRMED");
});
}
}场景二:多系统集成
java
package com.example.camel.scenario;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.dataformat.JsonLibrary;
import org.springframework.stereotype.Component;
@Component
public class MultiSystemIntegrationRoute extends RouteBuilder {
@Override
public void configure() throws Exception {
from("rabbitmq:exchange.crm?queue=queue.crm.events")
.routeId("crm-integration-route")
.log("CRM 事件: ${body}")
.unmarshal().json(JsonLibrary.Jackson)
.multicast()
.parallelProcessing()
.to("direct:toERP")
.to("direct:toBilling")
.to("direct:toNotification")
.end();
from("direct:toERP")
.routeId("to-erp-route")
.log("发送到 ERP 系统")
.marshal().json(JsonLibrary.Jackson)
.to("rabbitmq:exchange.erp?routingKey=erp.sync")
.to("rabbitmq:exchange.audit?routingKey=audit.erp");
from("direct:toBilling")
.routeId("to-billing-route")
.log("发送到计费系统")
.marshal().json(JsonLibrary.Jackson)
.to("rabbitmq:exchange.billing?routingKey=billing.charge")
.to("rabbitmq:exchange.audit?routingKey=audit.billing");
from("direct:toNotification")
.routeId("to-notification-route")
.log("发送通知")
.marshal().json(JsonLibrary.Jackson)
.to("rabbitmq:exchange.notification?routingKey=notification.send")
.to("rabbitmq:exchange.audit?routingKey=audit.notification");
from("rabbitmq:exchange.audit?queue=queue.audit")
.routeId("audit-route")
.log("审计日志: ${body}")
.to("file:/var/log/camel/audit?fileName=audit-${date:now:yyyyMMdd}.log");
}
}常见问题与解决方案
问题一:连接断开
症状: Camel 与 RabbitMQ 连接频繁断开
解决方案:
java
from("rabbitmq:exchange.order?queue=queue.order" +
"&automaticRecoveryEnabled=true" +
"&networkRecoveryInterval=5000" +
"&topologyRecoveryEnabled=true")
.routeId("recovery-route")
.log("处理消息: ${body}");问题二:消息堆积
症状: 队列中消息堆积,消费速度慢
解决方案:
java
from("rabbitmq:exchange.high.throughput?queue=queue.high.throughput" +
"&concurrentConsumers=10" +
"&maxConcurrentConsumers=50" +
"&prefetchCount=20" +
"&prefetchEnabled=true" +
"&threadPoolSize=50")
.routeId("high-throughput-route")
.threads(20)
.log("高吞吐量处理: ${body}");问题三:消息顺序
症状: 消息处理顺序与发送顺序不一致
解决方案:
java
from("rabbitmq:exchange.ordered?queue=queue.ordered" +
"&concurrentConsumers=1" +
"&prefetchCount=1")
.routeId("ordered-route")
.log("顺序处理: ${body}");最佳实践建议
1. 路由组织
java
@Component
public class OrganizedRoutes extends RouteBuilder {
@Override
public void configure() throws Exception {
errorHandler(defaultErrorHandler()
.maximumRedeliveries(3)
.redeliveryDelay(1000)
.retryAttemptedLogLevel(LoggingLevel.WARN));
configureOrderRoutes();
configureNotificationRoutes();
configureAuditRoutes();
}
private void configureOrderRoutes() {
from("rabbitmq:exchange.order?queue=queue.order")
.routeId("order-route")
.routeGroup("order")
.log("处理订单");
}
private void configureNotificationRoutes() {
from("rabbitmq:exchange.notification?queue=queue.notification")
.routeId("notification-route")
.routeGroup("notification")
.log("发送通知");
}
private void configureAuditRoutes() {
from("rabbitmq:exchange.audit?queue=queue.audit")
.routeId("audit-route")
.routeGroup("audit")
.log("审计日志");
}
}2. 监控配置
java
from("rabbitmq:exchange.monitored?queue=queue.monitored")
.routeId("monitored-route")
.metrics().timer("rabbitmq.processing.time")
.log("处理消息: ${body}")
.to("metrics:counter:rabbitmq.messages.processed");3. 测试支持
java
@CamelSpringBootTest
class RabbitMQRouteTest {
@Autowired
private CamelContext camelContext;
@Autowired
private ProducerTemplate producerTemplate;
@Test
void testOrderRoute() {
OrderMessage order = new OrderMessage();
order.setOrderId("TEST-001");
order.setAmount(1000.0);
String result = producerTemplate.requestBody(
"direct:sendOrder",
order,
String.class
);
assertNotNull(result);
}
}版本兼容性
| Apache Camel | Java | RabbitMQ Client | RabbitMQ Server |
|---|---|---|---|
| 4.4.x | 17+ | 5.x | 3.11+ |
| 4.3.x | 17+ | 5.x | 3.10+ |
| 4.2.x | 17+ | 5.x | 3.9+ |
| 3.21.x | 11+ | 5.x | 3.9+ |
| 3.20.x | 11+ | 5.x | 3.8+ |
