Skip to content

RabbitMQ 与 Kafka 集成

概述

Apache Kafka 是一个分布式流处理平台,具有高吞吐量、持久化存储、分布式处理等特点。将 RabbitMQ 与 Kafka 集成,可以构建强大的混合消息架构,利用 RabbitMQ 的灵活路由和 Kafka 的高吞吐量流处理能力。

本教程将详细介绍 RabbitMQ 与 Kafka 的集成方案,包括消息桥接、数据管道、流处理等场景。

集成架构设计

架构图

┌─────────────────────────────────────────────────────────────────────┐
│                    RabbitMQ + Kafka 混合架构                         │
│                                                                      │
│  ┌─────────────────────────────────────────────────────────────┐    │
│  │                    数据源层                                  │    │
│  │  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐     │    │
│  │  │   应用服务   │    │   IoT 设备   │    │   日志系统   │     │    │
│  │  └─────────────┘    └─────────────┘    └─────────────┘     │    │
│  └─────────────────────────────────────────────────────────────┘    │
│                              │                                       │
│              ┌───────────────┴───────────────┐                      │
│              ▼                               ▼                      │
│  ┌─────────────────────┐        ┌─────────────────────┐            │
│  │      RabbitMQ       │        │        Kafka        │            │
│  │                     │        │                     │            │
│  │  • 灵活路由         │        │  • 高吞吐量         │            │
│  │  • 即时消息         │        │  • 流处理           │            │
│  │  • 请求/回复        │        │  • 事件溯源         │            │
│  │  • RPC 通信         │        │  • 日志聚合         │            │
│  └─────────────────────┘        └─────────────────────┘            │
│              │                               │                      │
│              └───────────────┬───────────────┘                      │
│                              ▼                                       │
│  ┌─────────────────────────────────────────────────────────────┐    │
│  │                    桥接层                                    │    │
│  │  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐     │    │
│  │  │  RabbitMQ   │    │   Kafka     │    │   双向      │     │    │
│  │  │  → Kafka    │    │ → RabbitMQ  │    │   同步      │     │    │
│  │  └─────────────┘    └─────────────┘    └─────────────┘     │    │
│  └─────────────────────────────────────────────────────────────┘    │
│                              │                                       │
│                              ▼                                       │
│  ┌─────────────────────────────────────────────────────────────┐    │
│  │                    消费者层                                  │    │
│  │  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐     │    │
│  │  │  实时服务   │    │  批处理服务  │    │  分析服务   │     │    │
│  │  └─────────────┘    └─────────────┘    └─────────────┘     │    │
│  └─────────────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────────────┘

集成模式

模式说明
RabbitMQ to Kafka将 RabbitMQ 消息转发到 Kafka 进行流处理
Kafka to RabbitMQ将 Kafka 消息转发到 RabbitMQ 进行灵活路由
Bidirectional Sync双向同步,实现两个系统的消息互通
Event Bridge事件桥接,统一事件格式

配置示例

Kafka PHP 客户端配置

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use RdKafka\Conf;
use RdKafka\Producer;
use RdKafka\KafkaConsumer;
use RdKafka\TopicConf;

class KafkaConfig
{
    public static function getProducer(): Producer
    {
        $conf = new Conf();
        
        $conf->set('bootstrap.servers', getenv('KAFKA_BROKERS') ?: 'localhost:9092');
        $conf->set('client.id', getenv('KAFKA_CLIENT_ID') ?: 'php-producer');
        $conf->set('queue.buffering.max.messages', '100000');
        $conf->set('queue.buffering.max.kbytes', '1048576');
        $conf->set('batch.num.messages', '1000');
        $conf->set('compression.codec', 'snappy');
        $conf->set('message.timeout.ms', '30000');
        $conf->set('request.timeout.ms', '5000');
        $conf->set('metadata.request.timeout.ms', '5000');
        
        $conf->set('acks', 'all');
        $conf->set('retries', '3');
        $conf->set('retry.backoff.ms', '100');
        
        $conf->setDrMsgCb(function ($kafka, $message) {
            if ($message->err) {
                error_log("消息发送失败: " . $message->errstr());
            }
        });
        
        return new Producer($conf);
    }
    
    public static function getConsumer(string $groupId): KafkaConsumer
    {
        $conf = new Conf();
        
        $conf->set('bootstrap.servers', getenv('KAFKA_BROKERS') ?: 'localhost:9092');
        $conf->set('group.id', $groupId);
        $conf->set('client.id', getenv('KAFKA_CLIENT_ID') ?: 'php-consumer');
        $conf->set('auto.offset.reset', 'earliest');
        $conf->set('enable.auto.commit', 'false');
        $conf->set('session.timeout.ms', '30000');
        $conf->set('heartbeat.interval.ms', '10000');
        $conf->set('max.poll.interval.ms', '300000');
        $conf->set('max.poll.records', '500');
        
        $conf->setRebalanceCb(function ($kafka, $err, $partitions = null) {
            switch ($err) {
                case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                    echo "分区分配: " . count($partitions) . "\n";
                    $kafka->assign($partitions);
                    break;
                case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                    echo "分区撤销\n";
                    $kafka->assign(null);
                    break;
                default:
                    echo "重平衡错误: " . $err . "\n";
            }
        });
        
        return new KafkaConsumer($conf);
    }
}

RabbitMQ 连接配置

php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;

class RabbitMQConfig
{
    public static function getConnection(): AMQPStreamConnection
    {
        return new AMQPStreamConnection(
            getenv('RABBITMQ_HOST') ?: 'localhost',
            getenv('RABBITMQ_PORT') ?: 5672,
            getenv('RABBITMQ_USER') ?: 'guest',
            getenv('RABBITMQ_PASSWORD') ?: 'guest',
            getenv('RABBITMQ_VHOST') ?: '/'
        );
    }
}

PHP 代码示例

RabbitMQ 到 Kafka 桥接

php
<?php

class RabbitToKafkaBridge
{
    private $rabbitChannel;
    private $rabbitConnection;
    private $kafkaProducer;
    
    public function __construct(
        AMQPStreamConnection $rabbitConnection,
        Producer $kafkaProducer
    ) {
        $this->rabbitConnection = $rabbitConnection;
        $this->rabbitChannel = $rabbitConnection->channel();
        $this->kafkaProducer = $kafkaProducer;
        $this->setupInfrastructure();
    }
    
    private function setupInfrastructure(): void
    {
        $this->rabbitChannel->exchange_declare(
            'bridge.to.kafka',
            'topic',
            false,
            true,
            false
        );
        
        $this->rabbitChannel->queue_declare(
            'bridge.rabbit.to.kafka',
            false,
            true,
            false,
            false
        );
        
        $this->rabbitChannel->queue_bind(
            'bridge.rabbit.to.kafka',
            'bridge.to.kafka',
            '#'
        );
    }
    
    public function bridge(string $queue, string $kafkaTopic): void
    {
        $callback = function ($message) use ($kafkaTopic) {
            $this->forwardToKafka($message, $kafkaTopic);
        };
        
        $this->rabbitChannel->basic_qos(0, 100, false);
        $this->rabbitChannel->basic_consume(
            $queue,
            '',
            false,
            false,
            false,
            false,
            $callback
        );
        
        while ($this->rabbitChannel->is_consuming()) {
            $this->rabbitChannel->wait();
        }
    }
    
    private function forwardToKafka($message, string $topic): void
    {
        $body = $message->getBody();
        $headers = $this->extractHeaders($message);
        
        $topic = $this->kafkaProducer->newTopic($topic);
        
        $kafkaHeaders = [];
        foreach ($headers as $key => $value) {
            $kafkaHeaders[] = $key . ':' . $value;
        }
        
        $topic->producev(
            RD_KAFKA_PARTITION_UA,
            0,
            $body,
            $headers['routing_key'] ?? null,
            $kafkaHeaders
        );
        
        $this->kafkaProducer->poll(0);
        
        $message->ack();
    }
    
    private function extractHeaders($message): array
    {
        $headers = [
            'message_id' => $message->get('message_id'),
            'timestamp' => $message->get('timestamp'),
            'content_type' => $message->get('content_type'),
            'routing_key' => $message->get('routing_key'),
        ];
        
        if ($message->has('application_headers')) {
            $appHeaders = $message->get('application_headers')->getNativeData();
            $headers = array_merge($headers, $appHeaders);
        }
        
        return $headers;
    }
    
    public function flush(): void
    {
        $this->kafkaProducer->flush(10000);
    }
    
    public function close(): void
    {
        $this->flush();
        $this->rabbitChannel->close();
        $this->rabbitConnection->close();
    }
}

// 使用示例
$rabbit = RabbitMQConfig::getConnection();
$kafka = KafkaConfig::getProducer();

$bridge = new RabbitToKafkaBridge($rabbit, $kafka);

echo "开始桥接 RabbitMQ 到 Kafka...\n";

$bridge->bridge('bridge.rabbit.to.kafka', 'events');

$bridge->close();

Kafka 到 RabbitMQ 桥接

php
<?php

class KafkaToRabbitBridge
{
    private $rabbitChannel;
    private $rabbitConnection;
    private $kafkaConsumer;
    
    public function __construct(
        AMQPStreamConnection $rabbitConnection,
        KafkaConsumer $kafkaConsumer
    ) {
        $this->rabbitConnection = $rabbitConnection;
        $this->rabbitChannel = $rabbitConnection->channel();
        $this->kafkaConsumer = $kafkaConsumer;
        $this->setupInfrastructure();
    }
    
    private function setupInfrastructure(): void
    {
        $this->rabbitChannel->exchange_declare(
            'bridge.from.kafka',
            'topic',
            false,
            true,
            false
        );
    }
    
    public function bridge(array $topics, string $rabbitExchange): void
    {
        $this->kafkaConsumer->subscribe($topics);
        
        echo "开始桥接 Kafka 到 RabbitMQ...\n";
        
        while (true) {
            $message = $this->kafkaConsumer->consume(1000);
            
            if ($message === null) {
                continue;
            }
            
            switch ($message->err) {
                case RD_KAFKA_RESP_ERR_NO_ERROR:
                    $this->forwardToRabbit($message, $rabbitExchange);
                    break;
                    
                case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                    echo "到达分区末尾\n";
                    break;
                    
                case RD_KAFKA_RESP_ERR__TIMED_OUT:
                    break;
                    
                default:
                    echo "消费错误: " . $message->errstr() . "\n";
                    break;
            }
        }
    }
    
    private function forwardToRabbit($message, string $exchange): void
    {
        $routingKey = $this->buildRoutingKey($message);
        
        $headers = [];
        if ($message->headers) {
            foreach ($message->headers as $key => $value) {
                $headers[$key] = $value;
            }
        }
        
        $amqpMessage = new \PhpAmqpLib\Message\AMQPMessage(
            $message->payload,
            [
                'content_type' => 'application/json',
                'delivery_mode' => \PhpAmqpLib\Message\AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'message_id' => $message->key ?? uniqid(),
                'timestamp' => time(),
                'application_headers' => new \PhpAmqpLib\Wire\AMQPTable(array_merge($headers, [
                    'kafka_topic' => $message->topic_name,
                    'kafka_partition' => $message->partition,
                    'kafka_offset' => $message->offset,
                ]))
            ]
        );
        
        $this->rabbitChannel->basic_publish(
            $amqpMessage,
            $exchange,
            $routingKey
        );
        
        $this->kafkaConsumer->commit($message);
    }
    
    private function buildRoutingKey($message): string
    {
        $topic = str_replace('_', '.', $message->topic_name);
        
        if (isset($message->headers['event_type'])) {
            return $topic . '.' . $message->headers['event_type'];
        }
        
        return $topic;
    }
    
    public function close(): void
    {
        $this->kafkaConsumer->close();
        $this->rabbitChannel->close();
        $this->rabbitConnection->close();
    }
}

// 使用示例
$rabbit = RabbitMQConfig::getConnection();
$kafka = KafkaConfig::getConsumer('rabbit-bridge-group');

$bridge = new KafkaToRabbitBridge($rabbit, $kafka);

$bridge->bridge(['events', 'orders'], 'bridge.from.kafka');

$bridge->close();

双向同步服务

php
<?php

class BidirectionalSync
{
    private $rabbitChannel;
    private $rabbitConnection;
    private $kafkaProducer;
    private $kafkaConsumer;
    
    private const SYNC_QUEUE = 'sync.bidirectional';
    private const SYNC_TOPIC = 'sync.events';
    
    public function __construct(
        AMQPStreamConnection $rabbitConnection,
        Producer $kafkaProducer,
        KafkaConsumer $kafkaConsumer
    ) {
        $this->rabbitConnection = $rabbitConnection;
        $this->rabbitChannel = $rabbitConnection->channel();
        $this->kafkaProducer = $kafkaProducer;
        $this->kafkaConsumer = $kafkaConsumer;
        $this->setupInfrastructure();
    }
    
    private function setupInfrastructure(): void
    {
        $this->rabbitChannel->exchange_declare(
            'sync.exchange',
            'topic',
            false,
            true,
            false
        );
        
        $this->rabbitChannel->queue_declare(
            self::SYNC_QUEUE,
            false,
            true,
            false,
            false
        );
        
        $this->rabbitChannel->queue_bind(
            self::SYNC_QUEUE,
            'sync.exchange',
            'sync.#'
        );
    }
    
    public function startSync(): void
    {
        $this->kafkaConsumer->subscribe([self::SYNC_TOPIC]);
        
        $rabbitCallback = function ($message) {
            $this->handleRabbitMessage($message);
        };
        
        $this->rabbitChannel->basic_qos(0, 50, false);
        $this->rabbitChannel->basic_consume(
            self::SYNC_QUEUE,
            '',
            false,
            false,
            false,
            false,
            $rabbitCallback
        );
        
        while (true) {
            $read = [$this->rabbitConnection->getSocket()];
            $write = null;
            $except = null;
            
            if (stream_select($read, $write, $except, 0, 100000) > 0) {
                $this->rabbitChannel->wait(null, true);
            }
            
            $kafkaMessage = $this->kafkaConsumer->consume(0);
            
            if ($kafkaMessage && $kafkaMessage->err === RD_KAFKA_RESP_ERR_NO_ERROR) {
                $this->handleKafkaMessage($kafkaMessage);
            }
        }
    }
    
    private function handleRabbitMessage($message): void
    {
        $source = $message->get('application_headers')['source'] ?? 'unknown';
        
        if ($source === 'kafka') {
            $message->ack();
            return;
        }
        
        $topic = $this->kafkaProducer->newTopic(self::SYNC_TOPIC);
        
        $payload = json_encode([
            'source' => 'rabbitmq',
            'body' => $message->getBody(),
            'headers' => $this->extractHeaders($message),
            'timestamp' => date('c'),
        ]);
        
        $topic->produce(
            RD_KAFKA_PARTITION_UA,
            0,
            $payload,
            $message->get('message_id')
        );
        
        $this->kafkaProducer->poll(0);
        
        $message->ack();
    }
    
    private function handleKafkaMessage($message): void
    {
        $data = json_decode($message->payload, true);
        
        if ($data['source'] === 'rabbitmq') {
            $this->kafkaConsumer->commit($message);
            return;
        }
        
        $amqpMessage = new \PhpAmqpLib\Message\AMQPMessage(
            $data['body'],
            [
                'content_type' => 'application/json',
                'delivery_mode' => \PhpAmqpLib\Message\AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'application_headers' => new \PhpAmqpLib\Wire\AMQPTable([
                    'source' => 'kafka',
                    'kafka_topic' => $message->topic_name,
                    'kafka_partition' => $message->partition,
                    'kafka_offset' => $message->offset,
                ])
            ]
        );
        
        $routingKey = 'sync.from.kafka';
        
        $this->rabbitChannel->basic_publish(
            $amqpMessage,
            'sync.exchange',
            $routingKey
        );
        
        $this->kafkaConsumer->commit($message);
    }
    
    private function extractHeaders($message): array
    {
        $headers = [];
        
        if ($message->has('application_headers')) {
            $headers = $message->get('application_headers')->getNativeData();
        }
        
        return $headers;
    }
    
    public function close(): void
    {
        $this->kafkaProducer->flush(10000);
        $this->kafkaConsumer->close();
        $this->rabbitChannel->close();
        $this->rabbitConnection->close();
    }
}

消息格式转换器

php
<?php

class MessageFormatConverter
{
    public static function rabbitToKafka($rabbitMessage): array
    {
        $headers = [];
        
        if ($rabbitMessage->has('application_headers')) {
            $appHeaders = $rabbitMessage->get('application_headers')->getNativeData();
            foreach ($appHeaders as $key => $value) {
                $headers[] = $key . '=' . $value;
            }
        }
        
        return [
            'key' => $rabbitMessage->get('message_id'),
            'value' => $rabbitMessage->getBody(),
            'headers' => $headers,
            'timestamp' => $rabbitMessage->get('timestamp') * 1000,
        ];
    }
    
    public static function kafkaToRabbit($kafkaMessage): \PhpAmqpLib\Message\AMQPMessage
    {
        $headers = [];
        
        if ($kafkaMessage->headers) {
            foreach ($kafkaMessage->headers as $key => $value) {
                $headers[$key] = $value;
            }
        }
        
        $headers['kafka_topic'] = $kafkaMessage->topic_name;
        $headers['kafka_partition'] = $kafkaMessage->partition;
        $headers['kafka_offset'] = $kafkaMessage->offset;
        
        return new \PhpAmqpLib\Message\AMQPMessage(
            $kafkaMessage->payload,
            [
                'content_type' => 'application/json',
                'delivery_mode' => \PhpAmqpLib\Message\AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'message_id' => $kafkaMessage->key ?? uniqid(),
                'timestamp' => (int) ($kafkaMessage->timestamp / 1000),
                'application_headers' => new \PhpAmqpLib\Wire\AMQPTable($headers),
            ]
        );
    }
    
    public static function normalizeEvent(array $rawEvent): array
    {
        return [
            'eventId' => $rawEvent['eventId'] ?? uniqid(),
            'eventType' => $rawEvent['eventType'] ?? 'unknown',
            'aggregateType' => $rawEvent['aggregateType'] ?? 'unknown',
            'aggregateId' => $rawEvent['aggregateId'] ?? null,
            'payload' => $rawEvent['payload'] ?? [],
            'metadata' => [
                'timestamp' => $rawEvent['timestamp'] ?? date('c'),
                'correlationId' => $rawEvent['correlationId'] ?? null,
                'causationId' => $rawEvent['causationId'] ?? null,
                'source' => $rawEvent['source'] ?? 'unknown',
            ],
        ];
    }
}

实际应用场景

场景一:事件流处理

php
<?php

class EventStreamProcessor
{
    private $rabbitChannel;
    private $kafkaProducer;
    private $kafkaConsumer;
    
    public function __construct(
        AMQPStreamConnection $rabbit,
        Producer $kafkaProducer,
        KafkaConsumer $kafkaConsumer
    ) {
        $this->rabbitChannel = $rabbit->channel();
        $this->kafkaProducer = $kafkaProducer;
        $this->kafkaConsumer = $kafkaConsumer;
    }
    
    public function processEventStream(): void
    {
        $this->kafkaConsumer->subscribe(['events.raw']);
        
        while (true) {
            $message = $this->kafkaConsumer->consume(1000);
            
            if (!$message || $message->err !== RD_KAFKA_RESP_ERR_NO_ERROR) {
                continue;
            }
            
            $event = json_decode($message->payload, true);
            
            $processedEvent = $this->processEvent($event);
            
            $this->sendToKafka('events.processed', $processedEvent);
            
            $this->sendToRabbit($processedEvent);
            
            $this->kafkaConsumer->commit($message);
        }
    }
    
    private function processEvent(array $event): array
    {
        $event['processedAt'] = date('c');
        $event['processorId'] = gethostname();
        
        switch ($event['eventType']) {
            case 'OrderCreated':
                $event['payload']['orderNumber'] = 'ORD-' . time();
                break;
            case 'PaymentProcessed':
                $event['payload']['transactionId'] = 'TXN-' . uniqid();
                break;
        }
        
        return $event;
    }
    
    private function sendToKafka(string $topic, array $event): void
    {
        $kafkaTopic = $this->kafkaProducer->newTopic($topic);
        
        $kafkaTopic->produce(
            RD_KAFKA_PARTITION_UA,
            0,
            json_encode($event),
            $event['eventId']
        );
        
        $this->kafkaProducer->poll(0);
    }
    
    private function sendToRabbit(array $event): void
    {
        $routingKey = sprintf(
            '%s.%s',
            strtolower($event['aggregateType']),
            strtolower($event['eventType'])
        );
        
        $message = new \PhpAmqpLib\Message\AMQPMessage(
            json_encode($event),
            [
                'content_type' => 'application/json',
                'delivery_mode' => \PhpAmqpLib\Message\AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'message_id' => $event['eventId'],
            ]
        );
        
        $this->rabbitChannel->basic_publish(
            $message,
            'events.processed',
            $routingKey
        );
    }
}

场景二:日志聚合管道

php
<?php

class LogAggregationPipeline
{
    private $rabbitChannel;
    private $kafkaProducer;
    
    public function __construct(
        AMQPStreamConnection $rabbit,
        Producer $kafkaProducer
    ) {
        $this->rabbitChannel = $rabbit->channel();
        $this->kafkaProducer = $kafkaProducer;
    }
    
    public function startPipeline(): void
    {
        $this->rabbitChannel->queue_declare('logs.ingestion', false, true, false, false);
        
        $callback = function ($message) {
            $this->processLog($message);
        };
        
        $this->rabbitChannel->basic_qos(0, 100, false);
        $this->rabbitChannel->basic_consume(
            'logs.ingestion',
            '',
            false,
            false,
            false,
            false,
            $callback
        );
        
        while ($this->rabbitChannel->is_consuming()) {
            $this->rabbitChannel->wait();
        }
    }
    
    private function processLog($message): void
    {
        $log = json_decode($message->getBody(), true);
        
        $enrichedLog = $this->enrichLog($log);
        
        $this->sendToKafka($enrichedLog);
        
        $message->ack();
    }
    
    private function enrichLog(array $log): array
    {
        return [
            'timestamp' => $log['timestamp'] ?? date('c'),
            'level' => strtoupper($log['level'] ?? 'INFO'),
            'message' => $log['message'] ?? '',
            'service' => $log['service'] ?? 'unknown',
            'host' => $log['host'] ?? gethostname(),
            'trace_id' => $log['trace_id'] ?? null,
            'span_id' => $log['span_id'] ?? null,
            'context' => $log['context'] ?? [],
            'extra' => $log['extra'] ?? [],
            'enriched_at' => date('c'),
            'pipeline_version' => '1.0',
        ];
    }
    
    private function sendToKafka(array $log): void
    {
        $topic = $this->kafkaProducer->newTopic('logs.enriched');
        
        $partitionKey = $log['service'] ?? null;
        
        $topic->produce(
            RD_KAFKA_PARTITION_UA,
            0,
            json_encode($log),
            $partitionKey
        );
        
        $this->kafkaProducer->poll(0);
    }
}

常见问题与解决方案

问题一:消息顺序保证

症状: 消息顺序不一致

解决方案: 使用分区键保证顺序

php
$topic->produce(
    RD_KAFKA_PARTITION_UA,
    0,
    $payload,
    $aggregateId
);

问题二:消息丢失

症状: 部分消息未成功传递

解决方案: 配置确认机制

php
$conf->set('acks', 'all');
$conf->set('enable.idempotence', 'true');

$conf->setDrMsgCb(function ($kafka, $message) {
    if ($message->err) {
        $this->handleSendFailure($message);
    }
});

问题三:消费者重平衡

症状: 消费者频繁重平衡导致处理中断

解决方案: 优化会话配置

php
$conf->set('session.timeout.ms', '30000');
$conf->set('heartbeat.interval.ms', '10000');
$conf->set('max.poll.interval.ms', '300000');
$conf->set('max.poll.records', '100');

最佳实践建议

1. 错误处理

php
class BridgeErrorHandler
{
    public static function handle($message, Exception $e): void
    {
        $error = [
            'original_message' => $message->payload ?? $message->getBody(),
            'error' => $e->getMessage(),
            'timestamp' => date('c'),
        ];
        
        error_log(json_encode($error));
    }
}

2. 监控指标

php
class BridgeMetrics
{
    public static function recordThroughput(string $direction, int $count): void
    {
        // 记录吞吐量
    }
    
    public static function recordLatency(string $direction, float $ms): void
    {
        // 记录延迟
    }
}

3. 健康检查

php
class BridgeHealthCheck
{
    public static function check(): array
    {
        return [
            'rabbitmq' => $this->checkRabbitMQ(),
            'kafka' => $this->checkKafka(),
            'status' => 'healthy',
        ];
    }
}

版本兼容性

PHPlibrdkafkaphp-rdkafkaKafkaRabbitMQ Server
8.2+2.x6.x3.x3.11+
8.1+2.x6.x3.x3.10+
8.0+1.x/2.x5.x/6.x2.x/3.x3.9+
7.4+1.x4.x/5.x2.x3.8+

相关链接