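Integrating Apache Camel with RabbitMQ

Overview

Apache Camel is an open-source integration framework built on Enterprise Integration Patterns (EIP). Its components and routing DSL simplify connecting disparate systems. The RabbitMQ component is one of Camel's messaging components and supports the producer, consumer, and request-reply messaging patterns.

This tutorial shows how to integrate RabbitMQ with Apache Camel, covering route configuration, message transformation, and error handling.

Integration Architecture

Architecture Diagram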

┌─────────────────────────────────────────────────────────────────────┐
│                      Apache Camel Application                        │
│                                                                      │
│  ┌─────────────────────────────────────────────────────────────┐    │
│  │                     Camel Context                            │    │
│  │                                                              │    │
│  │  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐     │    │
│  │  │   Route 1   │    │   Route 2   │    │   Route 3   │     │    │
│  │  │             │    │             │    │             │     │    │
│  │  │  from()     │    │  from()     │    │  from()     │     │    │
│  │  │  process()  │    │  choice()   │    │  split()    │     │    │
│  │  │  to()       │    │  when()     │    │  aggregate()│     │    │
│  │  └─────────────┘    └─────────────┘    └─────────────┘     │    │
│  │                                                              │    │
│  │  ┌─────────────────────────────────────────────────────┐    │    │
│  │  │              RabbitMQ Component                      │    │    │
│  │  │  • Producer Endpoint (to rabbitmq:)                 │    │    │
│  │  │  • Consumer Endpoint (from rabbitmq:)               │    │    │
│  │  │  • Message Converter                                │    │    │
│  │  │  • Connection Pool                                  │    │    │
│  │  └─────────────────────────────────────────────────────┘    │    │
│  └─────────────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────────────┐
│                         RabbitMQ Broker                              │
│  ┌─────────────────────────────────────────────────────────────┐    │
│  │                        Exchanges                             │    │
│  │    ┌─────────┐    ┌─────────┐    ┌─────────┐               │    │
│  │    │ Direct  │    │  Topic  │    │ Fanout  │               │    │
│  │    └─────────┘    └─────────┘    └─────────┘               │    │
│  └─────────────────────────────────────────────────────────────┘    │
│  ┌─────────────────────────────────────────────────────────────┐    │
│  │                          Queues                              │    │
│  │    ┌───────────┐    ┌───────────┐    ┌───────────┐         │    │
│  │    │  Queue 1  │    │  Queue 2  │    │  Queue 3  │         │    │
│  │    └───────────┘    └───────────┘    └───────────┘         │    │
│  └─────────────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────────────┘

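Core Concepts

Concept          Description
Camel Context    Camel runtime container that manages all routes, components, and endpoints
Route            Message route definition describing how a message flows from source to destination
Endpoint         Message endpoint; can act as a message source or a destination
Component        Endpoint factory, such as the RabbitMQ component
Exchange         Message exchange object holding the message and its metadata
Processor        Message processor that applies business logic to a message
EIP              Enterprise Integration Patterns, such as routing, filtering, and transformation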

Configuration Examples

Maven Dependencies

xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
         http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    
    <groupId>com.example</groupId>
    <artifactId>camel-rabbitmq-demo</artifactId>
    <version>1.0.0</version>
    
    <properties>
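        <!-- camel-rabbitmq is published for Camel 3.x only; Camel 4 replaced it with camel-spring-rabbitmq -->
        <camel.version>3.22.0</camel.version>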
    </properties>
    
    <dependencies>
        <!-- Camel Core -->
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-core</artifactId>
            <version>${camel.version}</version>
        </dependency>
        
        <!-- Camel RabbitMQ -->
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-rabbitmq</artifactId>
            <version>${camel.version}</version>
        </dependency>
        
        <!-- Camel Jackson for JSON -->
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-jackson</artifactId>
            <version>${camel.version}</version>
        </dependency>
        
        <!-- Camel Bean -->
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-bean</artifactId>
            <version>${camel.version}</version>
        </dependency>
        
        <!-- Camel Log -->
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-log</artifactId>
            <version>${camel.version}</version>
        </dependency>
        
        <!-- SLF4J -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>2.0.9</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.4.11</version>
        </dependency>
        
        <!-- Test -->
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-test</artifactId>
            <version>${camel.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-test-junit5</artifactId>
            <version>${camel.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.camel</groupId>
                <artifactId>camel-maven-plugin</artifactId>
                <version>${camel.version}</version>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.11.0</version>
                <configuration>
                    <source>17</source>
                    <target>17</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

application.properties Configuration

properties
# RabbitMQ connection settings
rabbitmq.host=localhost
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.vhost=/

# Exchange settings
rabbitmq.exchange.order=exchange.order
rabbitmq.exchange.notification=exchange.notification
rabbitmq.exchange.dlq=exchange.dlq

# Queue settings
rabbitmq.queue.order=queue.order
rabbitmq.queue.notification=queue.notification
rabbitmq.queue.dlq=queue.dlq

# Camel settings
camel.context.name=camel-rabbitmq-demo
camel.threadpool.maxpoolsize=20
camel.threadpool.poolsize=5
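
The routes in this tutorial hard-code their exchange and queue names, so the properties above are not picked up on their own. The following is a minimal sketch, using only the JDK, of one way to assemble a `rabbitmq:` endpoint URI from them. The class `RabbitEndpointUris` and its method `consumerUri` are hypothetical helpers, not part of Camel, and the option names `hostname`, `portNumber`, and `vhost` are assumed to match the camel-rabbitmq version in use. Credentials are left out on purpose; they are better configured on the component than embedded in a URI.

```java
import java.util.Properties;

// Hypothetical helper (not part of Camel): builds a rabbitmq: consumer URI from properties.
class RabbitEndpointUris {

    static String consumerUri(Properties p, String exchangeKey, String queueKey, String routingKey) {
        String exchange = p.getProperty(exchangeKey);
        String queue = p.getProperty(queueKey);
        if (exchange == null || queue == null) {
            throw new IllegalArgumentException("Missing property: " + exchangeKey + " or " + queueKey);
        }
        // Connection settings fall back to the defaults used in application.properties.
        return "rabbitmq:" + exchange
            + "?hostname=" + p.getProperty("rabbitmq.host", "localhost")
            + "&portNumber=" + p.getProperty("rabbitmq.port", "5672")
            + "&vhost=" + p.getProperty("rabbitmq.vhost", "/")
            + "&queue=" + queue
            + "&routingKey=" + routingKey
            + "&autoDelete=false&durable=true";
    }

    public static void main(String[] args) {
        Properties p = new Properties();
        p.setProperty("rabbitmq.exchange.order", "exchange.order");
        p.setProperty("rabbitmq.queue.order", "queue.order");
        System.out.println(consumerUri(p, "rabbitmq.exchange.order", "rabbitmq.queue.order", "order.created"));
    }
}
```

Camel's own property placeholders (the `{{key}}` syntax) are the built-in alternative once the properties file is registered with the Camel context.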

Java DSL Route Examples

Basic Route Configuration

java
package com.example.camel.routes;

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.dataformat.JsonLibrary;
import org.springframework.stereotype.Component;

@Component
public class BasicRabbitMQRoute extends RouteBuilder {
    
    @Override
    public void configure() throws Exception {
        
        onException(Exception.class)
            .log("Handling exception: ${exception.message}")
            .handled(true)
            .to("rabbitmq:exchange.dlq?routingKey=dlq.error")
            .stop();
        
        from("rabbitmq:exchange.order?queue=queue.order" +
             "&exchangeType=direct" +
             "&routingKey=order.created" +
             "&autoDelete=false" +
             "&durable=true" +
             "&concurrentConsumers=5" +
             "&prefetchCount=10" +
             "&prefetchEnabled=true")
            
            .routeId("order-consumer-route")
            .log("Received order message: ${body}")
            .unmarshal().json(JsonLibrary.Jackson, OrderMessage.class)
            .process(exchange -> {
                OrderMessage order = exchange.getIn().getBody(OrderMessage.class);
                log.info("Processing order: {}", order.getOrderId());
                exchange.getIn().setHeader("orderId", order.getOrderId());
            })
            .choice()
                .when(simple("${body.amount} > 10000"))
                    .log("High-value order")
                    .to("direct:highValueOrder")
                .when(simple("${body.amount} > 1000"))
                    .log("Standard order")
                    .to("direct:normalOrder")
                .otherwise()
                    .log("Low-value order")
                    .to("direct:lowValueOrder")
            .end();
        
        from("direct:highValueOrder")
            .routeId("high-value-order-route")
            .log("Processing high-value order: ${body}")
            .process(exchange -> {
                OrderMessage order = exchange.getIn().getBody(OrderMessage.class);
                order.setStatus("HIGH_VALUE_PROCESSED");
            })
            .marshal().json(JsonLibrary.Jackson)
            .to("rabbitmq:exchange.order.result?routingKey=order.completed");
        
        from("direct:normalOrder")
            .routeId("normal-order-route")
            .log("Processing standard order: ${body}")
            .process(exchange -> {
                OrderMessage order = exchange.getIn().getBody(OrderMessage.class);
                order.setStatus("NORMAL_PROCESSED");
            })
            .marshal().json(JsonLibrary.Jackson)
            .to("rabbitmq:exchange.order.result?routingKey=order.completed");
        
        from("direct:lowValueOrder")
            .routeId("low-value-order-route")
            .log("Processing low-value order: ${body}")
            .process(exchange -> {
                OrderMessage order = exchange.getIn().getBody(OrderMessage.class);
                order.setStatus("LOW_VALUE_PROCESSED");
            })
            .marshal().json(JsonLibrary.Jackson)
            .to("rabbitmq:exchange.order.result?routingKey=order.completed");
    }
}
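
The routes refer to an `OrderMessage` class that the tutorial never shows. Below is a minimal sketch of what it might look like, with fields inferred only from the getters and setters the routes call; the real class may differ. The `OrderTierRouter` helper is hypothetical and simply restates the `choice()` thresholds in plain Java so the boundary behaviour is easy to check.

```java
// Assumed shape of the order payload; fields are inferred from the accessor
// calls in the routes, not taken from the original project.
class OrderMessage {
    private String orderId;
    private double amount;
    private String customerId;
    private String status;

    public String getOrderId() { return orderId; }
    public void setOrderId(String orderId) { this.orderId = orderId; }
    public double getAmount() { return amount; }
    public void setAmount(double amount) { this.amount = amount; }
    public String getCustomerId() { return customerId; }
    public void setCustomerId(String customerId) { this.customerId = customerId; }
    public String getStatus() { return status; }
    public void setStatus(String status) { this.status = status; }

    @Override
    public String toString() {
        return "OrderMessage{orderId=" + orderId + ", amount=" + amount + ", status=" + status + "}";
    }
}

// Hypothetical helper mirroring the choice() block: strictly greater-than comparisons,
// evaluated top to bottom, with a fallback branch.
class OrderTierRouter {
    static String targetFor(OrderMessage order) {
        if (order.getAmount() > 10000) {
            return "direct:highValueOrder";
        }
        if (order.getAmount() > 1000) {
            return "direct:normalOrder";
        }
        return "direct:lowValueOrder";
    }

    public static void main(String[] args) {
        OrderMessage order = new OrderMessage();
        order.setOrderId("ORD-1");
        order.setAmount(2500.0);
        System.out.println(order + " -> " + targetFor(order));
    }
}
```

Because both comparisons are strict, an order of exactly 10000 takes the standard branch and an order of exactly 1000 takes the low-value branch. Note also that the PHP producer later in this tutorial sends an extra `items` array; with Jackson's default settings an unknown property fails deserialization, so the class would need a matching field or `@JsonIgnoreProperties(ignoreUnknown = true)`.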

Message Producer Routes

java
package com.example.camel.routes;

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.rabbitmq.RabbitMQConstants;
import org.apache.camel.model.dataformat.JsonLibrary;
import org.springframework.stereotype.Component;

@Component
public class ProducerRoute extends RouteBuilder {
    
    @Override
    public void configure() throws Exception {
        
        from("timer:orderGenerator?period=5000&repeatCount=10")
            .routeId("order-generator-route")
            .process(exchange -> {
                OrderMessage order = new OrderMessage();
                order.setOrderId("ORD-" + System.currentTimeMillis());
                order.setAmount(Math.random() * 15000);
                order.setCustomerId("CUST-" + (int)(Math.random() * 100));
                order.setStatus("PENDING");
                exchange.getIn().setBody(order);
                // Keep the ID in a header so it can still be logged after marshalling
                exchange.getIn().setHeader("orderId", order.getOrderId());
            })
            .log("Generated order: ${body}")
            .marshal().json(JsonLibrary.Jackson)
            // Use the component's constants rather than hard-coded, version-specific header names
            .setHeader(RabbitMQConstants.CONTENT_TYPE, constant("application/json"))
            .setHeader(RabbitMQConstants.DELIVERY_MODE, constant(2))
            .setHeader(RabbitMQConstants.PRIORITY, constant(5))
            .to("rabbitmq:exchange.order?" +
                "exchangeType=direct&" +
                "routingKey=order.created&" +
                "autoDelete=false&" +
                "durable=true")
            .log("Order sent: ${header.orderId}");
        
        from("direct:sendOrder")
            .routeId("send-order-route")
            .log("Sending order message")
            .marshal().json(JsonLibrary.Jackson)
            .to("rabbitmq:exchange.order?routingKey=order.created")
            .log("Order sent");
        
        from("direct:sendNotification")
            .routeId("send-notification-route")
            .log("Sending notification message")
            .marshal().json(JsonLibrary.Jackson)
            .to("rabbitmq:exchange.notification?exchangeType=topic&routingKey=notification.email")
            .log("Notification sent");
    }
}

Message Transformation and Routing

java
package com.example.camel.routes;

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.dataformat.JsonLibrary;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
public class TransformationRoute extends RouteBuilder {
    
    @Override
    public void configure() throws Exception {
        
        from("rabbitmq:exchange.input?queue=queue.input")
            .routeId("transformation-route")
            .log("Original message: ${body}")
            
            // Without a target type, Jackson unmarshals a JSON object into a java.util.Map
            .unmarshal().json(JsonLibrary.Jackson)
            
            .setHeader("originalType", simple("${body.class.name}"))
            
            .process(exchange -> {
                Object body = exchange.getIn().getBody();
                if (body instanceof Map) {
                    @SuppressWarnings("unchecked")
                    Map<String, Object> map = (Map<String, Object>) body;
                    map.put("processedAt", System.currentTimeMillis());
                    map.put("processedBy", "camel-transformation");
                }
            })
            
            .choice()
                .when(header("destination").isEqualTo("database"))
                    .to("direct:toDatabase")
                .when(header("destination").isEqualTo("notification"))
                    .to("direct:toNotification")
                .otherwise()
                    .to("direct:defaultDestination")
            .end();
        
        from("direct:toDatabase")
            .routeId("to-database-route")
            .log("Sending to database queue")
            .marshal().json(JsonLibrary.Jackson)
            .to("rabbitmq:exchange.database?routingKey=db.insert");
        
        from("direct:toNotification")
            .routeId("to-notification-route")
            .log("Sending to notification queue")
            .marshal().json(JsonLibrary.Jackson)
            .to("rabbitmq:exchange.notification?routingKey=notification.send");
        
        from("direct:defaultDestination")
            .routeId("default-destination-route")
            .log("Sending to default queue")
            .marshal().json(JsonLibrary.Jackson)
            .to("rabbitmq:exchange.output?routingKey=output.default");
    }
}

Message Splitting and Aggregation

java
package com.example.camel.routes;

import org.apache.camel.AggregationStrategy;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.dataformat.JsonLibrary;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;

@Component
public class SplitterAggregatorRoute extends RouteBuilder {
    
    @Override
    public void configure() throws Exception {
        
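        // Bind with the routing key that publishers use (batch.process) and keep the
        // exchange non-auto-delete so it matches a durable exchange declared elsewhere
        from("rabbitmq:exchange.batch?queue=queue.batch" +
             "&routingKey=batch.process" +
             "&autoDelete=false")
            .routeId("batch-processing-route")
            .log("Received batch of orders: ${body}")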
            .unmarshal().json(JsonLibrary.Jackson, OrderBatch.class)
            
            .split(simple("${body.orders}"), new OrderAggregationStrategy())
                .parallelProcessing()
                .log("Processing single order: ${body}")
                .process(exchange -> {
                    OrderMessage order = exchange.getIn().getBody(OrderMessage.class);
                    order.setStatus("PROCESSED");
                })
                .to("direct:processSingleOrder")
            .end()
            
            .log("All orders processed: ${body}")
            .marshal().json(JsonLibrary.Jackson)
            .to("rabbitmq:exchange.batch.result?routingKey=batch.completed");
        
        from("direct:processSingleOrder")
            .routeId("process-single-order-route")
            .log("Processing order: ${body.orderId}")
            .delay(simple("${random(100,500)}"))
            .setHeader("processed", constant(true));
        
        from("rabbitmq:exchange.aggregate?queue=queue.aggregate")
            .routeId("aggregate-route")
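            // Unmarshal to OrderMessage so ${body.customerId} and the aggregation strategy see a typed object
            .unmarshal().json(JsonLibrary.Jackson, OrderMessage.class)
            .setHeader("correlationId", simple("${body.customerId}"))
            .aggregate(header("correlationId"), new OrderListAggregationStrategy())
                .completionSize(5)
                .completionTimeout(5000)
                .log("Aggregation complete: ${body}")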
                .marshal().json(JsonLibrary.Jackson)
                .to("rabbitmq:exchange.aggregate.result?routingKey=aggregate.completed");
    }
}

class OrderAggregationStrategy implements AggregationStrategy {
    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        if (oldExchange == null) {
            List<OrderMessage> orders = new ArrayList<>();
            orders.add(newExchange.getIn().getBody(OrderMessage.class));
            newExchange.getIn().setBody(orders);
            return newExchange;
        }
        
        List<OrderMessage> orders = oldExchange.getIn().getBody(List.class);
        orders.add(newExchange.getIn().getBody(OrderMessage.class));
        oldExchange.getIn().setBody(orders);
        return oldExchange;
    }
}

class OrderListAggregationStrategy implements AggregationStrategy {
    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        if (oldExchange == null) {
            List<OrderMessage> orders = new ArrayList<>();
            orders.add(newExchange.getIn().getBody(OrderMessage.class));
            newExchange.getIn().setBody(orders);
            return newExchange;
        }
        
        List<OrderMessage> orders = oldExchange.getIn().getBody(List.class);
        orders.add(newExchange.getIn().getBody(OrderMessage.class));
        oldExchange.getIn().setBody(orders);
        return oldExchange;
    }
}
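
To make the grouping behaviour of `aggregate(header("correlationId")).completionSize(5)` concrete, here is a plain-Java sketch of the same idea: items are collected per correlation key and a group is released once it reaches the configured size. `CorrelationBatcher` is a hypothetical illustration, not Camel code, and it deliberately leaves out the timeout-based completion that `completionTimeout` adds.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of size-based aggregation by correlation key.
// Not Camel code: timeout-based completion is intentionally omitted.
class CorrelationBatcher<T> {
    private final int completionSize;
    private final Map<String, List<T>> pending = new HashMap<>();

    CorrelationBatcher(int completionSize) {
        if (completionSize < 1) {
            throw new IllegalArgumentException("completionSize must be at least 1");
        }
        this.completionSize = completionSize;
    }

    // Adds an item to its group; returns the completed group, or null while it is still filling.
    synchronized List<T> add(String correlationKey, T item) {
        List<T> group = pending.computeIfAbsent(correlationKey, k -> new ArrayList<>());
        group.add(item);
        if (group.size() < completionSize) {
            return null;
        }
        pending.remove(correlationKey);
        return group;
    }

    // Number of groups that have not reached the completion size yet.
    synchronized int pendingGroups() {
        return pending.size();
    }

    public static void main(String[] args) {
        CorrelationBatcher<String> batcher = new CorrelationBatcher<>(2);
        batcher.add("CUST-1", "ORD-1");
        System.out.println(batcher.add("CUST-1", "ORD-2"));
    }
}
```

Without a timeout, a group that never fills stays pending indefinitely, which is the gap `completionTimeout(5000)` closes in the route above.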

Error-Handling Routes

java
package com.example.camel.routes;

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.dataformat.JsonLibrary;
import org.springframework.stereotype.Component;

@Component
public class ErrorHandlingRoute extends RouteBuilder {
    
    @Override
    public void configure() throws Exception {
        
        errorHandler(deadLetterChannel("rabbitmq:exchange.dlq?routingKey=dlq.error")
            .maximumRedeliveries(3)
            .redeliveryDelay(1000)
            .retryAttemptedLogLevel(org.apache.camel.LoggingLevel.WARN)
            .logExhausted(true)
            .logExhaustedMessageHistory(true));
        
        onException(OrderValidationException.class)
            .handled(true)
            .log("Order validation failed: ${exception.message}")
            .setHeader("errorType", constant("VALIDATION_ERROR"))
            .to("direct:handleValidationError");
        
        onException(PaymentException.class)
            .maximumRedeliveries(5)
            .redeliveryDelay(2000)
            .retryAttemptedLogLevel(org.apache.camel.LoggingLevel.WARN)
            // This step runs only after all redeliveries are exhausted
            .log("Payment processing failed after retries")
            .handled(false);
        
        onException(Exception.class)
            .handled(true)
            .log("Unexpected exception: ${exception.message}")
            .setHeader("errorType", constant("UNKNOWN_ERROR"))
            .to("rabbitmq:exchange.dlq?routingKey=dlq.unknown");
        
        from("rabbitmq:exchange.order.process?queue=queue.order.process")
            .routeId("order-processing-route")
            .log("Starting order processing: ${body}")
            .unmarshal().json(JsonLibrary.Jackson, OrderMessage.class)
            // Expose the order ID as a header so the error routes can reference it
            .setHeader("orderId", simple("${body.orderId}"))
            
            .process(exchange -> {
                OrderMessage order = exchange.getIn().getBody(OrderMessage.class);
                if (order.getAmount() <= 0) {
                    throw new OrderValidationException("Order amount must be greater than 0");
                }
                if (order.getCustomerId() == null || order.getCustomerId().isEmpty()) {
                    throw new OrderValidationException("Customer ID must not be empty");
                }
            })
            
            .log("Order validation passed")
            
            .process(exchange -> {
                OrderMessage order = exchange.getIn().getBody(OrderMessage.class);
                if (Math.random() < 0.1) {
                    throw new PaymentException("Payment processing failed");
                }
                order.setStatus("PAID");
            })
            
            .log("Payment processing complete")
            .marshal().json(JsonLibrary.Jackson)
            .to("rabbitmq:exchange.order.result?routingKey=order.completed");
        
        from("direct:handleValidationError")
            .routeId("validation-error-route")
            .log("Handling validation error")
            .setBody(simple("{\"error\": \"${exception.message}\", \"orderId\": \"${header.orderId}\"}"))
            .to("rabbitmq:exchange.error?routingKey=error.validation");
    }
}

class OrderValidationException extends Exception {
    public OrderValidationException(String message) {
        super(message);
    }
}

class PaymentException extends Exception {
    public PaymentException(String message) {
        super(message);
    }
}
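
The redelivery settings above are easier to reason about with the attempt count spelled out: `maximumRedeliveries(3)` means one initial attempt plus up to three redeliveries, after which the message goes to the dead letter endpoint. The sketch below models that loop in plain Java. `RedeliverySketch` and `Outcome` are hypothetical names for illustration; this is not how Camel implements its error handler internally.

```java
import java.util.concurrent.Callable;

// Plain-Java model of "try once, redeliver up to N times, then dead-letter".
class RedeliverySketch {

    static final class Outcome {
        final int attempts;          // total attempts made, including the first one
        final boolean deadLettered;  // true when every attempt failed

        Outcome(int attempts, boolean deadLettered) {
            this.attempts = attempts;
            this.deadLettered = deadLettered;
        }
    }

    static Outcome run(Callable<Object> task, int maximumRedeliveries, long redeliveryDelayMillis) {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                task.call();
                return new Outcome(attempts, false);
            } catch (Exception e) {
                // The first attempt is not a redelivery, so allow 1 + maximumRedeliveries attempts.
                if (attempts > maximumRedeliveries) {
                    return new Outcome(attempts, true);
                }
                try {
                    Thread.sleep(redeliveryDelayMillis);
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                    return new Outcome(attempts, true);
                }
            }
        }
    }

    public static void main(String[] args) {
        Outcome outcome = run(() -> { throw new IllegalStateException("always fails"); }, 3, 0);
        System.out.println("attempts=" + outcome.attempts + ", deadLettered=" + outcome.deadLettered);
    }
}
```

With the route's defaults this gives at most four attempts for general failures and six for `PaymentException`, which overrides the limit with `maximumRedeliveries(5)`.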

Request-Reply Pattern

java
package com.example.camel.routes;

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.dataformat.JsonLibrary;
import org.springframework.stereotype.Component;

@Component
public class RequestReplyRoute extends RouteBuilder {
    
    @Override
    public void configure() throws Exception {
        
        from("direct:requestOrderStatus")
            .routeId("request-order-status-route")
            .log("Requesting order status: ${body}")
            .setHeader("orderId", simple("${body}"))
            // exchangePattern=InOut makes the producer wait for a reply until requestTimeout expires
            .to("rabbitmq:exchange.order.status?" +
                "routingKey=order.status.request&" +
                "exchangePattern=InOut&" +
                "requestTimeout=5000")
            .log("Received order status response: ${body}")
            .unmarshal().json(JsonLibrary.Jackson);
        
        // The routing key must match the one the requester publishes with
        from("rabbitmq:exchange.order.status?queue=queue.order.status.request" +
             "&routingKey=order.status.request")
            .routeId("order-status-responder-route")
            .log("Received order status request: ${body}")
            .process(exchange -> {
                // The request body is the plain order ID, not JSON, so it is read as a String
                String orderId = exchange.getIn().getBody(String.class);
                OrderStatus status = new OrderStatus();
                status.setOrderId(orderId);
                status.setStatus("COMPLETED");
                status.setUpdatedAt(System.currentTimeMillis());
                exchange.getIn().setBody(status);
            })
            .marshal().json(JsonLibrary.Jackson)
            .log("Returning order status: ${body}");
    }
}

class OrderStatus {
    private String orderId;
    private String status;
    private Long updatedAt;
    
    public String getOrderId() { return orderId; }
    public void setOrderId(String orderId) { this.orderId = orderId; }
    public String getStatus() { return status; }
    public void setStatus(String status) { this.status = status; }
    public Long getUpdatedAt() { return updatedAt; }
    public void setUpdatedAt(Long updatedAt) { this.updatedAt = updatedAt; }
}
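
Request-reply over a message broker rests on one mechanism: the requester tags each request with a correlation ID and waits until a reply carrying the same ID arrives or a timeout expires. The sketch below shows that mechanism in plain Java with no broker involved. `ReplyCorrelator` and its methods are hypothetical names for illustration and do not reflect the Camel component's internals.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

// Matches replies to pending requests by correlation ID.
class ReplyCorrelator {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Sends a request via the given sender and waits for the matching reply.
    // Returns the reply body, or null if no reply arrives within the timeout.
    String requestReply(Consumer<String> sender, long timeoutMillis) {
        String correlationId = UUID.randomUUID().toString();
        CompletableFuture<String> reply = new CompletableFuture<>();
        pending.put(correlationId, reply);
        try {
            sender.accept(correlationId); // publish the request tagged with the correlation ID
            return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            pending.remove(correlationId); // never leak entries for abandoned requests
        }
    }

    // Called when a reply arrives; returns false if no request is waiting for this ID.
    boolean onReply(String correlationId, String body) {
        CompletableFuture<String> reply = pending.get(correlationId);
        return reply != null && reply.complete(body);
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        ReplyCorrelator correlator = new ReplyCorrelator();
        // The "responder" here answers synchronously; a real one would reply from another thread.
        String status = correlator.requestReply(id -> correlator.onReply(id, "COMPLETED"), 1000);
        System.out.println(status);
    }
}
```

Late replies are simply dropped: once the timeout fires the entry is removed, so `onReply` returns false for that ID.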

Integrating PHP with Apache Camel

PHP Message Producer

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

class CamelIntegrationProducer
{
    private $connection;
    private $channel;
    
    public function __construct(array $config)
    {
        $this->connection = new AMQPStreamConnection(
            $config['host'] ?? 'localhost',
            $config['port'] ?? 5672,
            $config['user'] ?? 'guest',
            $config['password'] ?? 'guest',
            $config['vhost'] ?? '/'
        );
        
        $this->channel = $this->connection->channel();
        $this->setupExchanges();
    }
    
    private function setupExchanges(): void
    {
        $this->channel->exchange_declare('exchange.order', 'direct', false, true, false);
        $this->channel->exchange_declare('exchange.notification', 'topic', false, true, false);
        $this->channel->exchange_declare('exchange.batch', 'direct', false, true, false);
    }
    
    public function sendOrderMessage(array $orderData): bool
    {
        $headers = new AMQPTable([
            'CamelFileName' => 'order-' . time() . '.json',
            'CamelMessageType' => 'OrderMessage',
            'contentType' => 'application/json',
        ]);
        
        $message = new AMQPMessage(
            json_encode($orderData),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'message_id' => uniqid('camel-', true),
                'timestamp' => time(),
                'application_headers' => $headers,
            ]
        );
        
        $this->channel->basic_publish(
            $message,
            'exchange.order',
            'order.created'
        );
        
        return true;
    }
    
    public function sendBatchOrder(array $orders): bool
    {
        $batchData = [
            'batchId' => 'BATCH-' . time(),
            'orders' => $orders,
            'createdAt' => date('c'),
        ];
        
        $message = new AMQPMessage(
            json_encode($batchData),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            ]
        );
        
        $this->channel->basic_publish(
            $message,
            'exchange.batch',
            'batch.process'
        );
        
        return true;
    }
    
    public function sendWithRouting(array $data, string $routingKey): bool
    {
        $message = new AMQPMessage(
            json_encode($data),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            ]
        );
        
        $this->channel->basic_publish(
            $message,
            'exchange.notification',
            $routingKey
        );
        
        return true;
    }
    
    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
    }
}

$producer = new CamelIntegrationProducer([
    'host' => 'localhost',
    'port' => 5672,
    'user' => 'guest',
    'password' => 'guest',
]);

$orderData = [
    'orderId' => 'ORD-' . time(),
    'amount' => 2500.00,
    'customerId' => 'CUST-001',
    'status' => 'PENDING',
    'items' => [
        ['productId' => 'PROD-001', 'quantity' => 2, 'price' => 1000],
        ['productId' => 'PROD-002', 'quantity' => 1, 'price' => 500],
    ],
];

$producer->sendOrderMessage($orderData);
echo "Order message sent to the Camel route\n";

$batchOrders = [
    ['orderId' => 'ORD-1', 'amount' => 100],
    ['orderId' => 'ORD-2', 'amount' => 200],
    ['orderId' => 'ORD-3', 'amount' => 300],
];
$producer->sendBatchOrder($batchOrders);
echo "Batch of orders sent\n";

$producer->close();

PHP Message Consumer

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class CamelIntegrationConsumer
{
    private $connection;
    private $channel;
    
    public function __construct(array $config)
    {
        $this->connection = new AMQPStreamConnection(
            $config['host'] ?? 'localhost',
            $config['port'] ?? 5672,
            $config['user'] ?? 'guest',
            $config['password'] ?? 'guest',
            $config['vhost'] ?? '/'
        );
        
        $this->channel = $this->connection->channel();
    }
    
    public function consumeFromCamel(string $queue, callable $processor): void
    {
        $this->channel->basic_qos(0, 10, false);
        
        $this->channel->basic_consume(
            $queue,
            '',
            false,
            false,
            false,
            false,
            function (AMQPMessage $message) use ($processor) {
                $this->processCamelMessage($message, $processor);
            }
        );
        
        echo "Waiting for messages from Camel routes...\n";
        
        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }
    }
    
    private function processCamelMessage(AMQPMessage $message, callable $processor): void
    {
        try {
            $body = json_decode($message->getBody(), true);
            $headers = $this->extractCamelHeaders($message);
            
            echo "Received Camel message:\n";
            echo "  Message ID: " . ($headers['message_id'] ?? 'N/A') . "\n";
            echo "  Content Type: " . ($headers['content_type'] ?? 'N/A') . "\n";
            echo "  Camel Headers: " . json_encode($headers['camel'] ?? []) . "\n";
            
            $result = $processor($body, $headers);
            
            if ($result) {
                $message->ack();
                echo "Message processed successfully\n";
            } else {
                // nack($requeue, $multiple): requeue only this message
                $message->nack(true);
                echo "Message processing failed, requeued\n";
            }
        } catch (Exception $e) {
            echo "Processing error: " . $e->getMessage() . "\n";
            // Reject without requeueing so a poison message is not redelivered forever
            $message->nack(false, false);
        }
    }
    
    private function extractCamelHeaders(AMQPMessage $message): array
    {
        $headers = [];
        
        if ($message->has('application_headers')) {
            $appHeaders = $message->get('application_headers')->getNativeData();
            
            $camelHeaders = [];
            foreach ($appHeaders as $key => $value) {
                if (strpos($key, 'Camel') === 0) {
                    $camelHeaders[$key] = $value;
                }
            }
            $headers['camel'] = $camelHeaders;
            $headers = array_merge($headers, $appHeaders);
        }
        
        // get() throws for properties that were never set, so check with has() first
        foreach (['message_id', 'timestamp', 'content_type'] as $property) {
            $headers[$property] = $message->has($property) ? $message->get($property) : null;
        }
        
        return $headers;
    }
    
    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
    }
}

class CamelMessageProcessor
{
    public function processOrder(array $data, array $headers): bool
    {
        echo "Processing order:\n";
        echo "  Order ID: " . ($data['orderId'] ?? 'N/A') . "\n";
        echo "  Amount: " . ($data['amount'] ?? 0) . "\n";
        echo "  Status: " . ($data['status'] ?? 'N/A') . "\n";
        
        return true;
    }
    
    public function processBatch(array $data, array $headers): bool
    {
        echo "Processing batch of orders:\n";
        echo "  Batch ID: " . ($data['batchId'] ?? 'N/A') . "\n";
        echo "  Order count: " . count($data['orders'] ?? []) . "\n";
        
        foreach ($data['orders'] ?? [] as $order) {
            echo "    - Order: " . ($order['orderId'] ?? 'N/A') . "\n";
        }
        
        return true;
    }
}

$consumer = new CamelIntegrationConsumer([
    'host' => 'localhost',
    'port' => 5672,
    'user' => 'guest',
    'password' => 'guest',
]);

$processor = new CamelMessageProcessor();

$queue = $argv[1] ?? 'queue.order.processed';

$consumer->consumeFromCamel($queue, [$processor, 'processOrder']);

$consumer->close();

Real-World Scenarios

Scenario 1: Order Processing Pipeline

java
package com.example.camel.scenario;

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.dataformat.JsonLibrary;
import org.springframework.stereotype.Component;

@Component
public class OrderPipelineRoute extends RouteBuilder {
    
    @Override
    public void configure() throws Exception {
        
        from("rabbitmq:exchange.order.input?queue=queue.order.input")
            .routeId("order-pipeline-route")
            .log("Starting order processing pipeline")
            
            .unmarshal().json(JsonLibrary.Jackson, OrderMessage.class)
            
            .process(exchange -> {
                OrderMessage order = exchange.getIn().getBody(OrderMessage.class);
                exchange.setProperty("originalOrder", order);
            })
            
            .to("direct:validateOrder")
            .to("direct:processPayment")
            .to("direct:reserveInventory")
            .to("direct:confirmOrder")
            
            .log("Order processing pipeline finished: ${body}")
            .marshal().json(JsonLibrary.Jackson)
            .to("rabbitmq:exchange.order.output?routingKey=order.completed");
        
        from("direct:validateOrder")
            .routeId("validate-order-route")
            .log("Validating order")
            .process(exchange -> {
                OrderMessage order = exchange.getIn().getBody(OrderMessage.class);
                if (order.getAmount() <= 0) {
                    throw new IllegalArgumentException("Invalid order amount");
                }
                order.setStatus("VALIDATED");
            });
        
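        from("direct:processPayment")
            .routeId("process-payment-route")
            .log("Processing payment")
            .process(exchange -> {
                OrderMessage order = exchange.getIn().getBody(OrderMessage.class);
                order.setStatus("PAID");
            })
            // Publish the event as JSON, then restore the OrderMessage body for the next pipeline step
            .marshal().json(JsonLibrary.Jackson)
            .to("rabbitmq:exchange.payment?routingKey=payment.processed")
            .unmarshal().json(JsonLibrary.Jackson, OrderMessage.class);
        
        from("direct:reserveInventory")
            .routeId("reserve-inventory-route")
            .log("Reserving inventory")
            .process(exchange -> {
                OrderMessage order = exchange.getIn().getBody(OrderMessage.class);
                order.setStatus("INVENTORY_RESERVED");
            })
            // Publish the event as JSON, then restore the OrderMessage body for the next pipeline step
            .marshal().json(JsonLibrary.Jackson)
            .to("rabbitmq:exchange.inventory?routingKey=inventory.reserved")
            .unmarshal().json(JsonLibrary.Jackson, OrderMessage.class);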
        
        from("direct:confirmOrder")
            .routeId("confirm-order-route")
            .log("Confirming order")
            .process(exchange -> {
                OrderMessage order = exchange.getIn().getBody(OrderMessage.class);
                order.setStatus("CONFIRMED");
            });
    }
}

Scenario 2: Multi-System Integration

java
package com.example.camel.scenario;

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.dataformat.JsonLibrary;
import org.springframework.stereotype.Component;

@Component
public class MultiSystemIntegrationRoute extends RouteBuilder {
    
    @Override
    public void configure() throws Exception {
        
        from("rabbitmq:exchange.crm?queue=queue.crm.events")
            .routeId("crm-integration-route")
            .log("CRM event: ${body}")
            .unmarshal().json(JsonLibrary.Jackson)
            
            .multicast()
                .parallelProcessing()
                .to("direct:toERP")
                .to("direct:toBilling")
                .to("direct:toNotification")
            .end();
        
        from("direct:toERP")
            .routeId("to-erp-route")
            .log("Sending to ERP system")
            .marshal().json(JsonLibrary.Jackson)
            .to("rabbitmq:exchange.erp?routingKey=erp.sync")
            .to("rabbitmq:exchange.audit?exchangeType=topic&routingKey=audit.erp");
        
        from("direct:toBilling")
            .routeId("to-billing-route")
            .log("Sending to billing system")
            .marshal().json(JsonLibrary.Jackson)
            .to("rabbitmq:exchange.billing?routingKey=billing.charge")
            .to("rabbitmq:exchange.audit?exchangeType=topic&routingKey=audit.billing");
        
        from("direct:toNotification")
            .routeId("to-notification-route")
            .log("Sending notification")
            .marshal().json(JsonLibrary.Jackson)
            .to("rabbitmq:exchange.notification?routingKey=notification.send")
            .to("rabbitmq:exchange.audit?exchangeType=topic&routingKey=audit.notification");
        
        // The audit exchange is declared as a topic exchange so that one binding (audit.*)
        // receives audit.erp, audit.billing and audit.notification
        from("rabbitmq:exchange.audit?exchangeType=topic&queue=queue.audit&routingKey=audit.*")
            .routeId("audit-route")
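            .log("Audit log: ${body}")
            // Append to the daily file instead of overwriting it with each message
            .to("file:/var/log/camel/audit?fileName=audit-${date:now:yyyyMMdd}.log&fileExist=Append&appendChars=\\n");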
    }
}

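Common Problems and Solutions

Problem 1: Dropped Connections

Symptom: The connection between Camel and RabbitMQ drops frequently.

Solution: Enable automatic connection and topology recovery on the endpoint; here the client retries every 5 seconds.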

java
from("rabbitmq:exchange.order?queue=queue.order" +
     "&automaticRecoveryEnabled=true" +
     "&networkRecoveryInterval=5000" +
     "&topologyRecoveryEnabled=true")
    .routeId("recovery-route")
    .log("Processing message: ${body}");
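
Endpoint URIs assembled by string concatenation, like the one above, are easy to break with a missing `&` or `?`. The following is a minimal sketch of a helper that builds the same URI from an ordered set of options. `RabbitUriBuilder` and its methods are hypothetical names introduced for illustration and are not part of Camel; option values are assumed to be URI-safe.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper, not part of Camel: assembles "rabbitmq:<exchange>?name=value&..."
final class RabbitUriBuilder {
    private final String exchange;
    // LinkedHashMap keeps options in insertion order so the result is predictable
    private final Map<String, String> options = new LinkedHashMap<>();

    RabbitUriBuilder(String exchange) {
        this.exchange = exchange;
    }

    RabbitUriBuilder option(String name, Object value) {
        options.put(name, String.valueOf(value));
        return this;
    }

    // Values are assumed to be URI-safe; no percent-encoding is applied here
    String build() {
        if (options.isEmpty()) {
            return "rabbitmq:" + exchange;
        }
        StringJoiner query = new StringJoiner("&", "?", "");
        options.forEach((name, value) -> query.add(name + "=" + value));
        return "rabbitmq:" + exchange + query;
    }

    public static void main(String[] args) {
        String uri = new RabbitUriBuilder("exchange.order")
            .option("queue", "queue.order")
            .option("automaticRecoveryEnabled", true)
            .option("networkRecoveryInterval", 5000)
            .option("topologyRecoveryEnabled", true)
            .build();
        System.out.println(uri);
    }
}
```

The resulting string can be passed to `from(...)` or `to(...)` exactly like a hand-written URI.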

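Problem 2: Message Backlog

Symptom: Messages pile up in the queue because consumers cannot keep up.

Solution: Raise consumer concurrency and enable prefetch so that each consumer keeps a batch of messages in flight.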

java
from("rabbitmq:exchange.high.throughput?queue=queue.high.throughput" +
     "&concurrentConsumers=10" +   // ten consumers read from the queue in parallel
     "&prefetchEnabled=true" +     // prefetchCount only takes effect when prefetch is enabled
     "&prefetchCount=20" +
     "&threadPoolSize=50")
    .routeId("high-throughput-route")
    .threads(20)
    .log("High-throughput processing: ${body}");

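Problem 3: Message Ordering

Symptom: Messages are processed in a different order than they were sent.

Solution: Use a single consumer with a prefetch count of 1 so that messages are handled one at a time, in queue order.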

java
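from("rabbitmq:exchange.ordered?queue=queue.ordered" +
     "&concurrentConsumers=1" +
     "&prefetchEnabled=true" +   // prefetchCount only takes effect when prefetch is enabled
     "&prefetchCount=1")
    .routeId("ordered-route")
    .log("Ordered processing: ${body}");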
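
Why a single consumer preserves order can be illustrated without a broker. The sketch below is plain Java, not Camel: worker threads stand in for concurrent consumers, and `OrderingSketch` is a hypothetical name. With one worker the messages are handled in the order they were submitted; with several workers every message is still handled, but the order is no longer guaranteed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Broker-free illustration: worker threads stand in for concurrent consumers
final class OrderingSketch {

    // Hands each message to a pool of `workers` threads and returns the order in which they were handled
    static List<String> consume(List<String> messages, int workers) {
        List<String> handled = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (String message : messages) {
            pool.execute(() -> handled.add(message));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return new ArrayList<>(handled);
    }

    public static void main(String[] args) {
        List<String> sent = List.of("order-1", "order-2", "order-3", "order-4");
        // One worker: handled strictly in the order sent
        System.out.println(consume(sent, 1));
        // Several workers: every message is handled, but the order may differ between runs
        System.out.println(consume(sent, 4));
    }
}
```

The trade-off is throughput: a single consumer cannot be scaled out, so strict ordering is best reserved for queues that genuinely need it.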

Best Practices

1. Route Organization

java
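import org.apache.camel.LoggingLevel;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;

@Component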
public class OrganizedRoutes extends RouteBuilder {
    
    @Override
    public void configure() throws Exception {
        errorHandler(defaultErrorHandler()
            .maximumRedeliveries(3)
            .redeliveryDelay(1000)
            .retryAttemptedLogLevel(LoggingLevel.WARN));
        
        configureOrderRoutes();
        configureNotificationRoutes();
        configureAuditRoutes();
    }
    
    private void configureOrderRoutes() {
        from("rabbitmq:exchange.order?queue=queue.order")
            .routeId("order-route")
            .routeGroup("order")
            .log("Processing order");
    }
    
    private void configureNotificationRoutes() {
        from("rabbitmq:exchange.notification?queue=queue.notification")
            .routeId("notification-route")
            .routeGroup("notification")
            .log("Sending notification");
    }
    
    private void configureAuditRoutes() {
        from("rabbitmq:exchange.audit?queue=queue.audit")
            .routeId("audit-route")
            .routeGroup("audit")
            .log("Audit log");
    }
}
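
The error handler above allows up to three redeliveries, so a failing message is attempted at most four times: the initial delivery plus three retries, with a one-second pause before each retry. The plain-Java sketch below mimics that counting to make it explicit. It is an illustration only, not how Camel implements redelivery, and `RedeliverySketch` is a hypothetical name.

```java
import java.util.function.Supplier;

// Plain-Java illustration of the counting behind maximumRedeliveries; not Camel's implementation
final class RedeliverySketch {

    // Runs the task once, then retries up to maximumRedeliveries times, pausing before each retry
    static <T> T callWithRedelivery(Supplier<T> task, int maximumRedeliveries, long redeliveryDelayMillis) {
        if (maximumRedeliveries < 0) {
            throw new IllegalArgumentException("maximumRedeliveries must be >= 0");
        }
        RuntimeException lastFailure = null;
        for (int attempt = 0; attempt <= maximumRedeliveries; attempt++) {
            if (attempt > 0) {
                pause(redeliveryDelayMillis);
            }
            try {
                return task.get();
            } catch (RuntimeException e) {
                lastFailure = e; // remember the failure and retry if attempts remain
            }
        }
        throw lastFailure; // all attempts exhausted
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during redelivery delay", e);
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Fails twice, then succeeds on the third attempt
        String result = callWithRedelivery(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("transient failure");
            }
            return "processed";
        }, 3, 1000);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

When every attempt fails, the last exception is propagated to the caller; in a Camel route this is the point where a dead letter channel or an `onException` clause would typically take over.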

2. Monitoring

java
from("rabbitmq:exchange.monitored?queue=queue.monitored")
    .routeId("monitored-route")
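    // Time the processing with the camel-metrics component (start and stop around the work)
    .to("metrics:timer:rabbitmq.processing.time?action=start")
    .log("Processing message: ${body}")
    .to("metrics:timer:rabbitmq.processing.time?action=stop")
    // Count every processed message
    .to("metrics:counter:rabbitmq.messages.processed");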

3. Testing

java
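import static org.junit.jupiter.api.Assertions.assertNotNull;

import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

// @SpringBootTest starts the application context; @CamelSpringBootTest adds Camel test support
@CamelSpringBootTest
@SpringBootTest
class RabbitMQRouteTest {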
    
    @Autowired
    private CamelContext camelContext;
    
    @Autowired
    private ProducerTemplate producerTemplate;
    
    @Test
    void testOrderRoute() {
        OrderMessage order = new OrderMessage();
        order.setOrderId("TEST-001");
        order.setAmount(1000.0);
        
        String result = producerTemplate.requestBody(
            "direct:sendOrder", 
            order, 
            String.class
        );
        
        assertNotNull(result);
    }
}

Version Compatibility

Apache Camel    Java    RabbitMQ Client    RabbitMQ Server
4.4.x           17+     5.x                3.11+
4.3.x           17+     5.x                3.10+
4.2.x           17+     5.x                3.9+
3.21.x          11+     5.x                3.9+
3.20.x          11+     5.x                3.8+

Related Links