RabbitMQ and Database Integration

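Overview

In modern distributed systems, integrating a message queue with a database is key to building reliable data pipelines. RabbitMQ can be integrated with a variety of database systems to implement data synchronization, event sourcing, change data capture (CDC), and similar patterns. This tutorial covers integration approaches for RabbitMQ with relational databases (MySQL and PostgreSQL).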

Integration Architecture Design

Architecture Diagram

┌─────────────────────────────────────────────────────────────────────┐
│                    Database Integration Architecture                 │
│                                                                      │
│  ┌─────────────────────────────────────────────────────────────┐    │
│  │                    Application Layer                         │    │
│  │  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐     │    │
│  │  │   Producer  │    │   Consumer  │    │   Sync      │     │    │
│  │  │   Service   │    │   Service   │    │   Service   │     │    │
│  │  └─────────────┘    └─────────────┘    └─────────────┘     │    │
│  └─────────────────────────────────────────────────────────────┘    │
│                              │                                       │
│                              ▼                                       │
│  ┌─────────────────────────────────────────────────────────────┐    │
│  │                    RabbitMQ                                  │    │
│  │  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐     │    │
│  │  │  Event      │    │  Command    │    │  Sync       │     │    │
│  │  │  Queue      │    │  Queue      │    │  Queue      │     │    │
│  │  └─────────────┘    └─────────────┘    └─────────────┘     │    │
│  └─────────────────────────────────────────────────────────────┘    │
│                              │                                       │
│                              ▼                                       │
│  ┌─────────────────────────────────────────────────────────────┐    │
│  │                    Database Layer                            │    │
│  │  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐     │    │
│  │  │   MySQL     │    │ PostgreSQL  │    │   Redis     │     │    │
│  │  │   Primary   │    │   Primary   │    │   Cache     │     │    │
│  │  └─────────────┘    └─────────────┘    └─────────────┘     │    │
│  └─────────────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────────────┘

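Integration Patterns

Pattern                      Description
Outbox Pattern               Writes events to an outbox table in the same database transaction as the business data, so messages stay consistent with committed state
CDC (Change Data Capture)    Captures database changes and publishes them as messages
Event Sourcing               Stores every state change as an event
Saga Pattern                 Coordinates a distributed transaction across services as a sequence of local transactions
CQRS                         Command Query Responsibility Segregation; separates the write model from the read model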

Configuration Examples

PHP Database Connection Configuration

php
<?php

class DatabaseConfig
{
    public static function getMySQLConnection(): PDO
    {
        $dsn = sprintf(
            'mysql:host=%s;port=%s;dbname=%s;charset=utf8mb4',
            getenv('MYSQL_HOST') ?: 'localhost',
            getenv('MYSQL_PORT') ?: '3306',
            getenv('MYSQL_DATABASE') ?: 'app_db'
        );
        
        $options = [
            PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
            PDO::ATTR_DEFAULT_FETCH_MODE => PDO::FETCH_ASSOC,
            PDO::ATTR_EMULATE_PREPARES => false,
            PDO::MYSQL_ATTR_INIT_COMMAND => "SET NAMES utf8mb4 COLLATE utf8mb4_unicode_ci",
        ];
        
        return new PDO(
            $dsn,
            getenv('MYSQL_USER') ?: 'root',
            getenv('MYSQL_PASSWORD') ?: '',
            $options
        );
    }
    
    public static function getPostgreSQLConnection(): PDO
    {
        $dsn = sprintf(
            'pgsql:host=%s;port=%s;dbname=%s',
            getenv('PG_HOST') ?: 'localhost',
            getenv('PG_PORT') ?: '5432',
            getenv('PG_DATABASE') ?: 'app_db'
        );
        
        $options = [
            PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
            PDO::ATTR_DEFAULT_FETCH_MODE => PDO::FETCH_ASSOC,
        ];
        
        return new PDO(
            $dsn,
            getenv('PG_USER') ?: 'postgres',
            getenv('PG_PASSWORD') ?: '',
            $options
        );
    }
}

RabbitMQ Connection Configuration

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

class RabbitMQConfig
{
    public static function getConnection(): AMQPStreamConnection
    {
        return new AMQPStreamConnection(
            getenv('RABBITMQ_HOST') ?: 'localhost',
            // getenv() returns a string; cast so the port is always an integer.
            (int) (getenv('RABBITMQ_PORT') ?: 5672),
            getenv('RABBITMQ_USER') ?: 'guest',
            getenv('RABBITMQ_PASSWORD') ?: 'guest',
            getenv('RABBITMQ_VHOST') ?: '/'
        );
    }
}

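PHP Code Examples

Outbox Pattern Implementation

php
<?php

// Assumes Composer's autoloader is already loaded, as in RabbitMQConfig above.
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
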
class OutboxPattern
{
    private PDO $db;
    private $rabbitChannel;
    private $rabbitConnection;
    
    public function __construct(PDO $db, AMQPStreamConnection $rabbitConnection)
    {
        $this->db = $db;
        $this->rabbitConnection = $rabbitConnection;
        $this->rabbitChannel = $rabbitConnection->channel();
        $this->setupOutboxTable();
        $this->setupRabbitMQ();
    }
    
    private function setupOutboxTable(): void
    {
        $sql = "
            CREATE TABLE IF NOT EXISTS outbox_messages (
                id VARCHAR(36) PRIMARY KEY,
                aggregate_type VARCHAR(255) NOT NULL,
                aggregate_id VARCHAR(255) NOT NULL,
                event_type VARCHAR(255) NOT NULL,
                payload JSON NOT NULL,
                status ENUM('pending', 'sent', 'failed') DEFAULT 'pending',
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                sent_at TIMESTAMP NULL,
                retry_count INT DEFAULT 0,
                error_message TEXT NULL,
                INDEX idx_status (status),
                INDEX idx_created_at (created_at)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
        ";
        
        $this->db->exec($sql);
    }
    
    private function setupRabbitMQ(): void
    {
        $this->rabbitChannel->exchange_declare(
            'outbox.events',
            'topic',
            false,
            true,
            false
        );
        
        $this->rabbitChannel->queue_declare(
            'outbox.processing',
            false,
            true,
            false,
            false
        );
        
        $this->rabbitChannel->queue_bind(
            'outbox.processing',
            'outbox.events',
            '#'
        );
    }
    
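    public function executeInTransaction(callable $operation, array $events): bool
    {
        try {
            $this->db->beginTransaction();
            
            // Business write and outbox rows share one transaction.
            $operation($this->db);
            
            foreach ($events as $event) {
                $this->saveToOutbox($event);
            }
            
            $this->db->commit();
        } catch (Throwable $e) {
            // Roll back only if the transaction is still open.
            if ($this->db->inTransaction()) {
                $this->db->rollBack();
            }
            throw $e;
        }
        
        // Relay after the commit. A failure here must not undo committed data;
        // rows left as 'pending' are picked up by a later processOutbox() run.
        try {
            $this->processOutbox();
        } catch (Throwable $e) {
            error_log('Outbox relay failed: ' . $e->getMessage());
        }
        
        return true;
    }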
    
    private function saveToOutbox(array $event): void
    {
        $id = $this->generateUUID();
        
        $sql = "
            INSERT INTO outbox_messages 
            (id, aggregate_type, aggregate_id, event_type, payload)
            VALUES (?, ?, ?, ?, ?)
        ";
        
        $stmt = $this->db->prepare($sql);
        $stmt->execute([
            $id,
            $event['aggregate_type'],
            $event['aggregate_id'],
            $event['event_type'],
            json_encode($event['payload'])
        ]);
    }
    
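    public function processOutbox(int $batchSize = 100): int
    {
        // SKIP LOCKED requires MySQL 8.0 or later.
        $sql = "
            SELECT * FROM outbox_messages 
            WHERE status = 'pending' 
            ORDER BY created_at ASC 
            LIMIT ?
            FOR UPDATE SKIP LOCKED
        ";
        
        // Row locks are only held inside a transaction, so the batch is
        // claimed, published, and marked within a single transaction.
        $this->db->beginTransaction();
        
        try {
            $stmt = $this->db->prepare($sql);
            $stmt->bindValue(1, $batchSize, PDO::PARAM_INT);
            $stmt->execute();
            $messages = $stmt->fetchAll();
            
            $processed = 0;
            
            foreach ($messages as $message) {
                try {
                    $this->sendMessage($message);
                    
                    $this->markAsSent($message['id']);
                    $processed++;
                } catch (Exception $e) {
                    $this->markAsFailed($message['id'], $e->getMessage());
                }
            }
            
            $this->db->commit();
        } catch (Throwable $e) {
            if ($this->db->inTransaction()) {
                $this->db->rollBack();
            }
            throw $e;
        }
        
        return $processed;
    }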
    
    private function sendMessage(array $message): void
    {
        $routingKey = sprintf(
            '%s.%s',
            strtolower($message['aggregate_type']),
            strtolower($message['event_type'])
        );
        
        $amqpMessage = new AMQPMessage(
            $message['payload'],
            [
                'content_type' => 'application/json',
                'message_id' => $message['id'],
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'timestamp' => time(),
                'headers' => new AMQPTable([
                    'event_type' => $message['event_type'],
                    'aggregate_type' => $message['aggregate_type'],
                    'aggregate_id' => $message['aggregate_id'],
                ])
            ]
        );
        
        $this->rabbitChannel->basic_publish(
            $amqpMessage,
            'outbox.events',
            $routingKey
        );
    }
    
    private function markAsSent(string $id): void
    {
        $sql = "
            UPDATE outbox_messages 
            SET status = 'sent', sent_at = NOW() 
            WHERE id = ?
        ";
        
        $stmt = $this->db->prepare($sql);
        $stmt->execute([$id]);
    }
    
    private function markAsFailed(string $id, string $error): void
    {
        $sql = "
            UPDATE outbox_messages 
            SET status = 'failed', 
                error_message = ?, 
                retry_count = retry_count + 1 
            WHERE id = ?
        ";
        
        $stmt = $this->db->prepare($sql);
        $stmt->execute([$error, $id]);
    }
    
    private function generateUUID(): string
    {
        // Version 4 UUID built from a cryptographically secure random source.
        $bytes = random_bytes(16);
        $bytes[6] = chr((ord($bytes[6]) & 0x0f) | 0x40); // version 4
        $bytes[8] = chr((ord($bytes[8]) & 0x3f) | 0x80); // RFC 4122 variant
        
        return vsprintf(
            '%s%s-%s-%s-%s-%s%s%s',
            str_split(bin2hex($bytes), 4)
        );
    }
    
    public function close(): void
    {
        $this->rabbitChannel->close();
        $this->rabbitConnection->close();
    }
}

// Usage example
$db = DatabaseConfig::getMySQLConnection();
$rabbit = RabbitMQConfig::getConnection();
$outbox = new OutboxPattern($db, $rabbit);

$outbox->executeInTransaction(
    function (PDO $db) {
        $sql = "INSERT INTO orders (customer_id, amount, status) VALUES (?, ?, ?)";
        $stmt = $db->prepare($sql);
        $stmt->execute(['CUST-001', 1500.00, 'PENDING']);
        
        return $db->lastInsertId();
    },
    [
        [
            'aggregate_type' => 'Order',
            'aggregate_id' => 'ORD-' . time(),
            'event_type' => 'OrderCreated',
            'payload' => [
                'orderId' => 'ORD-' . time(),
                'customerId' => 'CUST-001',
                'amount' => 1500.00,
                'status' => 'PENDING',
                'createdAt' => date('c'),
            ]
        ]
    ]
);

$outbox->close();
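
The class above marks a row as failed and increments retry_count, but it never retries failed rows. A minimal sketch of one possible retry policy follows; the function name `outboxRetryDelay` and its defaults (5 attempts, 2-second base, 300-second cap) are assumptions for illustration, not part of the original implementation.

```php
<?php

/**
 * Hypothetical helper: returns how many seconds to wait before retrying a
 * failed outbox row, or null when the row should be left as 'failed'.
 */
function outboxRetryDelay(
    int $retryCount,
    int $maxRetries = 5,
    int $baseSeconds = 2,
    int $capSeconds = 300
): ?int {
    if ($retryCount >= $maxRetries) {
        return null; // give up; keep the row for manual inspection
    }

    // Exponential backoff: base * 2^retryCount, limited by the cap.
    return min($capSeconds, $baseSeconds * (2 ** $retryCount));
}

foreach ([0, 1, 3, 5] as $count) {
    $delay = outboxRetryDelay($count);
    echo $count, ' => ', $delay === null ? 'give up' : "{$delay}s", PHP_EOL;
}
```

A relay could use the returned delay to decide when a failed row becomes eligible again, for example by comparing it with the time of the last attempt.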

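CDC (Change Data Capture) Implementation

php
<?php

// Assumes Composer's autoloader is already loaded, as in RabbitMQConfig above.
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
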
class CDCProcessor
{
    private PDO $db;
    private $rabbitChannel;
    private $rabbitConnection;
    
    public function __construct(PDO $db, AMQPStreamConnection $rabbitConnection)
    {
        $this->db = $db;
        $this->rabbitConnection = $rabbitConnection;
        $this->rabbitChannel = $rabbitConnection->channel();
        $this->setupCDC();
    }
    
    private function setupCDC(): void
    {
        $this->rabbitChannel->exchange_declare(
            'cdc.events',
            'topic',
            false,
            true,
            false
        );
        
        $this->setupMySQLTriggers();
    }
    
    private function setupMySQLTriggers(): void
    {
        $tables = ['orders', 'customers', 'products'];
        
        foreach ($tables as $table) {
            $this->createCDCTable($table);
            $this->createInsertTrigger($table);
            $this->createUpdateTrigger($table);
            $this->createDeleteTrigger($table);
        }
    }
    
    private function createCDCTable(string $table): void
    {
        $cdcTable = "cdc_{$table}";
        
        $sql = "
            CREATE TABLE IF NOT EXISTS {$cdcTable} (
                id BIGINT AUTO_INCREMENT PRIMARY KEY,
                operation ENUM('INSERT', 'UPDATE', 'DELETE') NOT NULL,
                record_id VARCHAR(255) NOT NULL,
                old_data JSON NULL,
                new_data JSON NULL,
                changed_columns JSON NULL,
                error_message TEXT NULL, -- written by markCDCError()
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                processed TINYINT(1) DEFAULT 0,
                INDEX idx_processed (processed),
                INDEX idx_created_at (created_at)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
        ";
        
        $this->db->exec($sql);
    }
    
    private function createInsertTrigger(string $table): void
    {
        $triggerName = "trg_{$table}_insert";
        $cdcTable = "cdc_{$table}";
        
        // Run DROP and CREATE separately: errors in later statements of a
        // multi-statement exec() can go unreported.
        $this->db->exec("DROP TRIGGER IF EXISTS {$triggerName}");
        
        $sql = "
            CREATE TRIGGER {$triggerName}
            AFTER INSERT ON {$table}
            FOR EACH ROW
            BEGIN
                INSERT INTO {$cdcTable} (operation, record_id, new_data)
                VALUES ('INSERT', NEW.id, JSON_OBJECT(
                    'id', NEW.id,
                    'data', JSON_OBJECT(
                        'id', NEW.id
                    )
                ));
            END
        ";
        
        $this->db->exec($sql);
    }
    
    private function createUpdateTrigger(string $table): void
    {
        $triggerName = "trg_{$table}_update";
        $cdcTable = "cdc_{$table}";
        
        // Run DROP and CREATE separately: errors in later statements of a
        // multi-statement exec() can go unreported.
        $this->db->exec("DROP TRIGGER IF EXISTS {$triggerName}");
        
        $sql = "
            CREATE TRIGGER {$triggerName}
            AFTER UPDATE ON {$table}
            FOR EACH ROW
            BEGIN
                INSERT INTO {$cdcTable} (operation, record_id, old_data, new_data, changed_columns)
                VALUES ('UPDATE', NEW.id, 
                    JSON_OBJECT('id', OLD.id),
                    JSON_OBJECT('id', NEW.id),
                    JSON_ARRAY()
                );
            END
        ";
        
        $this->db->exec($sql);
    }
    
    private function createDeleteTrigger(string $table): void
    {
        $triggerName = "trg_{$table}_delete";
        $cdcTable = "cdc_{$table}";
        
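        // Run DROP and CREATE separately: errors in later statements of a
        // multi-statement exec() can go unreported.
        $this->db->exec("DROP TRIGGER IF EXISTS {$triggerName}");
        
        $sql = "
            CREATE TRIGGER {$triggerName}
            AFTER DELETE ON {$table}
            FOR EACH ROW
            BEGIN
                INSERT INTO {$cdcTable} (operation, record_id, old_data)
                VALUES ('DELETE', OLD.id, JSON_OBJECT('id', OLD.id));
            END
        ";
        
        $this->db->exec($sql);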
    }
    
    public function processCDCTable(string $table, int $batchSize = 100): int
    {
        $cdcTable = "cdc_{$table}";
        
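        // Order by the auto-increment id: created_at has one-second resolution
        // and cannot order changes made within the same second.
        $sql = "
            SELECT * FROM {$cdcTable}
            WHERE processed = 0
            ORDER BY id ASC
            LIMIT ?
            FOR UPDATE SKIP LOCKED
        ";
        
        // Row locks are only held inside a transaction.
        $this->db->beginTransaction();
        
        try {
            $stmt = $this->db->prepare($sql);
            $stmt->bindValue(1, $batchSize, PDO::PARAM_INT);
            $stmt->execute();
            $records = $stmt->fetchAll();
            
            $processed = 0;
            
            foreach ($records as $record) {
                try {
                    $this->publishCDCEvent($table, $record);
                    
                    $this->markCDCProcessed($cdcTable, $record['id']);
                    $processed++;
                } catch (Exception $e) {
                    $this->markCDCError($cdcTable, $record['id'], $e->getMessage());
                }
            }
            
            $this->db->commit();
        } catch (Throwable $e) {
            if ($this->db->inTransaction()) {
                $this->db->rollBack();
            }
            throw $e;
        }
        
        return $processed;
    }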
    
    private function publishCDCEvent(string $table, array $record): void
    {
        $routingKey = sprintf(
            'cdc.%s.%s',
            strtolower($table),
            strtolower($record['operation'])
        );
        
        // The JSON columns are nullable, and passing null to json_decode()
        // is deprecated since PHP 8.1.
        $decode = static fn (?string $json) => $json === null ? null : json_decode($json, true);
        
        $event = [
            'table' => $table,
            'operation' => $record['operation'],
            'recordId' => $record['record_id'],
            'oldData' => $decode($record['old_data']),
            'newData' => $decode($record['new_data']),
            'changedColumns' => $decode($record['changed_columns']),
            'timestamp' => $record['created_at'],
        ];
        
        $message = new AMQPMessage(
            json_encode($event),
            [
                'content_type' => 'application/json',
                // message_id is a string property in AMQP.
                'message_id' => (string) $record['id'],
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'timestamp' => time(),
            ]
        );
        
        $this->rabbitChannel->basic_publish(
            $message,
            'cdc.events',
            $routingKey
        );
    }
    
    private function markCDCProcessed(string $cdcTable, int $id): void
    {
        $sql = "UPDATE {$cdcTable} SET processed = 1 WHERE id = ?";
        $stmt = $this->db->prepare($sql);
        $stmt->execute([$id]);
    }
    
    private function markCDCError(string $cdcTable, int $id, string $error): void
    {
        $sql = "UPDATE {$cdcTable} SET error_message = ? WHERE id = ?";
        $stmt = $this->db->prepare($sql);
        $stmt->execute([$error, $id]);
    }
    
    public function close(): void
    {
        $this->rabbitChannel->close();
        $this->rabbitConnection->close();
    }
}

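// Usage example
$db = DatabaseConfig::getMySQLConnection();
$rabbit = RabbitMQConfig::getConnection();
$cdc = new CDCProcessor($db, $rabbit);

try {
    // Poll until the process is stopped.
    while (true) {
        $processed = $cdc->processCDCTable('orders');
        echo "Processed {$processed} CDC records\n";
        sleep(1);
    }
} finally {
    // Reached when the loop exits through an exception.
    $cdc->close();
}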
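
The update trigger above always stores an empty JSON_ARRAY() in changed_columns. If old_data and new_data carried full row snapshots, the changed columns could be derived on the PHP side before publishing. The sketch below assumes that situation; `changedColumns` is a hypothetical helper, not part of the CDCProcessor class.

```php
<?php

/**
 * Hypothetical helper: lists the columns whose values differ between two
 * decoded row snapshots. A column missing from one side is treated as null.
 */
function changedColumns(array $old, array $new): array
{
    $changed = [];

    // The array union yields every key present in either snapshot.
    foreach (array_keys($old + $new) as $column) {
        $before = $old[$column] ?? null;
        $after = $new[$column] ?? null;

        // Strict comparison: '10' and 10 count as different values.
        if ($before !== $after) {
            $changed[] = $column;
        }
    }

    return $changed;
}

$old = ['id' => 1, 'status' => 'PENDING', 'amount' => 10];
$new = ['id' => 1, 'status' => 'CONFIRMED', 'amount' => 10];

echo json_encode(changedColumns($old, $new)), PHP_EOL;
```

Because the comparison is strict, values should be normalized to the same types first if the two snapshots come from different sources.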

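Database Consumer Implementation

php
<?php

// Assumes Composer's autoloader is already loaded, as in RabbitMQConfig above.
use PhpAmqpLib\Connection\AMQPStreamConnection;
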
class DatabaseConsumer
{
    private PDO $db;
    private $rabbitChannel;
    private $rabbitConnection;
    
    public function __construct(PDO $db, AMQPStreamConnection $rabbitConnection)
    {
        $this->db = $db;
        $this->rabbitConnection = $rabbitConnection;
        $this->rabbitChannel = $rabbitConnection->channel();
    }
    
    public function consumeOrderEvents(string $queue): void
    {
        $this->rabbitChannel->basic_qos(0, 10, false);
        
        $this->rabbitChannel->basic_consume(
            $queue,
            '',
            false,
            false,
            false,
            false,
            function ($message) {
                $this->handleOrderEvent($message);
            }
        );
        
        while ($this->rabbitChannel->is_consuming()) {
            $this->rabbitChannel->wait();
        }
    }
    
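    private function handleOrderEvent($message): void
    {
        try {
            $data = json_decode($message->getBody(), true, 512, JSON_THROW_ON_ERROR);
            
            // application_headers is an AMQPTable; convert it to a plain array.
            $headers = $message->has('application_headers')
                ? $message->get('application_headers')->getNativeData()
                : [];
            $eventType = $headers['event_type'] ?? '';
            
            $this->db->beginTransaction();
            
            switch ($eventType) {
                case 'OrderCreated':
                    $this->handleOrderCreated($data);
                    break;
                case 'OrderUpdated':
                    $this->handleOrderUpdated($data);
                    break;
                case 'OrderDeleted':
                    $this->handleOrderDeleted($data);
                    break;
                default:
                    $this->handleUnknownEvent($eventType, $data);
            }
            
            $this->db->commit();
            $message->ack();
            
        } catch (Throwable $e) {
            // Roll back only if a transaction was actually started.
            if ($this->db->inTransaction()) {
                $this->db->rollBack();
            }
            // nack($requeue, $multiple): requeue this message only.
            // A message that always fails will be redelivered indefinitely.
            $message->nack(true);
            
            error_log("Failed to process order event: " . $e->getMessage());
        }
    }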
    
    private function handleOrderCreated(array $data): void
    {
        $sql = "
            INSERT INTO orders (id, customer_id, amount, status, created_at)
            VALUES (?, ?, ?, ?, ?)
            ON DUPLICATE KEY UPDATE
                customer_id = VALUES(customer_id),
                amount = VALUES(amount),
                status = VALUES(status)
        ";
        
        $stmt = $this->db->prepare($sql);
        $stmt->execute([
            $data['orderId'],
            $data['customerId'],
            $data['amount'],
            $data['status'],
            $data['createdAt'],
        ]);
        
        $this->updateOrderSummary($data['customerId']);
    }
    
    private function handleOrderUpdated(array $data): void
    {
        $sql = "
            UPDATE orders 
            SET customer_id = ?, amount = ?, status = ?, updated_at = ?
            WHERE id = ?
        ";
        
        $stmt = $this->db->prepare($sql);
        $stmt->execute([
            $data['customerId'],
            $data['amount'],
            $data['status'],
            $data['updatedAt'],
            $data['orderId'],
        ]);
    }
    
    private function handleOrderDeleted(array $data): void
    {
        $sql = "DELETE FROM orders WHERE id = ?";
        $stmt = $this->db->prepare($sql);
        $stmt->execute([$data['orderId']]);
    }
    
    private function handleUnknownEvent(string $eventType, array $data): void
    {
        error_log("Unknown event type: {$eventType}");
    }
    
    private function updateOrderSummary(string $customerId): void
    {
        $sql = "
            INSERT INTO customer_order_summary (customer_id, total_orders, total_amount)
            SELECT customer_id, COUNT(*), SUM(amount)
            FROM orders
            WHERE customer_id = ?
            ON DUPLICATE KEY UPDATE
                total_orders = VALUES(total_orders),
                total_amount = VALUES(total_amount)
        ";
        
        $stmt = $this->db->prepare($sql);
        $stmt->execute([$customerId]);
    }
    
    public function close(): void
    {
        $this->rabbitChannel->close();
        $this->rabbitConnection->close();
    }
}

// Usage example
$db = DatabaseConfig::getMySQLConnection();
$rabbit = RabbitMQConfig::getConnection();
$consumer = new DatabaseConsumer($db, $rabbit);

$consumer->consumeOrderEvents('order.events');

$consumer->close();
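
The consumer above requeues every failed message, so a message that can never be processed keeps coming back. One common alternative is to reject without requeue and let a dead-letter exchange route the message through a retry queue; RabbitMQ then records each dead-lettering in the `x-death` header. The sketch below assumes that topology and that the headers were already converted to a plain array; `deathCount` and `shouldRetry` are hypothetical helper names.

```php
<?php

/**
 * Hypothetical helper: sums how often a message was dead-lettered from the
 * given queue because a consumer rejected it, based on the x-death header.
 */
function deathCount(array $headers, string $queue): int
{
    $total = 0;

    foreach ($headers['x-death'] ?? [] as $entry) {
        $sameQueue = ($entry['queue'] ?? null) === $queue;
        $rejected = ($entry['reason'] ?? null) === 'rejected';

        if ($sameQueue && $rejected) {
            $total += (int) ($entry['count'] ?? 0);
        }
    }

    return $total;
}

/** Returns true while the message is still below the attempt limit. */
function shouldRetry(array $headers, string $queue, int $maxAttempts = 3): bool
{
    return deathCount($headers, $queue) < $maxAttempts;
}

$headers = [
    'x-death' => [
        ['queue' => 'order.events', 'reason' => 'rejected', 'count' => 2],
    ],
];

echo shouldRetry($headers, 'order.events') ? 'retry' : 'park', PHP_EOL;
```

When `shouldRetry()` returns false, the consumer could acknowledge the message and store it elsewhere for inspection instead of rejecting it again.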

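Batch Data Synchronization Implementation

php
<?php

// Assumes Composer's autoloader is already loaded, as in RabbitMQConfig above.
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
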
class BatchDataSync
{
    private PDO $sourceDb;
    private PDO $targetDb;
    private $rabbitChannel;
    private $rabbitConnection;
    
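    public function __construct(
        PDO $sourceDb,
        PDO $targetDb,
        AMQPStreamConnection $rabbitConnection
    ) {
        $this->sourceDb = $sourceDb;
        $this->targetDb = $targetDb;
        $this->rabbitConnection = $rabbitConnection;
        $this->rabbitChannel = $rabbitConnection->channel();
        
        // publishSyncEvent() publishes to this exchange, so it must exist.
        // The 'topic' type is an assumption; the original does not declare it.
        $this->rabbitChannel->exchange_declare(
            'sync.events',
            'topic',
            false,
            true,
            false
        );
    }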
    
    public function syncTable(
        string $table,
        string $primaryKey = 'id',
        int $batchSize = 1000
    ): int {
        $offset = 0;
        $totalSynced = 0;
        
        while (true) {
            // $table and $primaryKey are interpolated into SQL and must come
            // from trusted code, never from user input.
            $sql = "SELECT * FROM {$table} ORDER BY {$primaryKey} LIMIT ? OFFSET ?";
            $stmt = $this->sourceDb->prepare($sql);
            $stmt->bindValue(1, $batchSize, PDO::PARAM_INT);
            $stmt->bindValue(2, $offset, PDO::PARAM_INT);
            $stmt->execute();
            
            $rows = $stmt->fetchAll();
            
            if (empty($rows)) {
                break;
            }
            
            $synced = $this->syncBatch($table, $rows, $primaryKey);
            $totalSynced += $synced;
            
            $this->publishSyncEvent($table, $synced, $offset);
            
            $offset += $batchSize;
        }
        
        return $totalSynced;
    }
    
    private function syncBatch(string $table, array $rows, string $primaryKey): int
    {
        $synced = 0;
        
        $this->targetDb->beginTransaction();
        
        try {
            foreach ($rows as $row) {
                $this->upsertRow($table, $row, $primaryKey);
                $synced++;
            }
            
            $this->targetDb->commit();
        } catch (Exception $e) {
            $this->targetDb->rollBack();
            throw $e;
        }
        
        return $synced;
    }
    
    private function upsertRow(string $table, array $row, string $primaryKey): void
    {
        $columns = array_keys($row);
        $placeholders = implode(',', array_fill(0, count($columns), '?'));
        $columnList = implode(',', $columns);
        
        // Upsert syntax depends on the target driver. The usage example below
        // writes to PostgreSQL, which does not support ON DUPLICATE KEY UPDATE.
        $isPostgres = $this->targetDb->getAttribute(PDO::ATTR_DRIVER_NAME) === 'pgsql';
        
        $updateClauses = [];
        foreach ($columns as $column) {
            $updateClauses[] = $isPostgres
                ? "{$column} = EXCLUDED.{$column}"
                : "{$column} = VALUES({$column})";
        }
        $updateClause = implode(',', $updateClauses);
        
        $conflictClause = $isPostgres
            ? "ON CONFLICT ({$primaryKey}) DO UPDATE SET {$updateClause}"
            : "ON DUPLICATE KEY UPDATE {$updateClause}";
        
        $sql = "
            INSERT INTO {$table} ({$columnList})
            VALUES ({$placeholders})
            {$conflictClause}
        ";
        
        $stmt = $this->targetDb->prepare($sql);
        $stmt->execute(array_values($row));
    }
    
    private function publishSyncEvent(string $table, int $count, int $offset): void
    {
        $event = [
            'table' => $table,
            'recordsSynced' => $count,
            'offset' => $offset,
            'timestamp' => date('c'),
        ];
        
        $message = new AMQPMessage(
            json_encode($event),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            ]
        );
        
        $this->rabbitChannel->basic_publish(
            $message,
            'sync.events',
            'sync.completed'
        );
    }
    
    public function close(): void
    {
        $this->rabbitChannel->close();
        $this->rabbitConnection->close();
    }
}

// Usage example
$sourceDb = DatabaseConfig::getMySQLConnection();
$targetDb = DatabaseConfig::getPostgreSQLConnection();
$rabbit = RabbitMQConfig::getConnection();

$sync = new BatchDataSync($sourceDb, $targetDb, $rabbit);

$totalSynced = $sync->syncTable('orders', 'id', 1000);
echo "Sync complete: {$totalSynced} records\n";

$sync->close();
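
syncTable() pages through the source with LIMIT and OFFSET, which rescans skipped rows on every batch and can miss or repeat rows if the table changes during the sync. Keyset pagination is a common alternative: each batch continues after the last primary key seen. The sketch below only builds the query; `buildKeysetQuery` is a hypothetical helper and the identifier check is a conservative assumption about valid table and column names.

```php
<?php

/**
 * Hypothetical helper: builds a keyset-paginated SELECT.
 * Returns [sql, params]; pass null as $lastKey for the first batch.
 */
function buildKeysetQuery(string $table, string $primaryKey, $lastKey, int $batchSize): array
{
    // Identifiers cannot be bound as parameters, so validate them strictly.
    foreach ([$table, $primaryKey] as $identifier) {
        if (!preg_match('/^[A-Za-z_][A-Za-z0-9_]*$/', $identifier)) {
            throw new InvalidArgumentException("Unsafe identifier: {$identifier}");
        }
    }

    if ($batchSize < 1) {
        throw new InvalidArgumentException('Batch size must be positive');
    }

    // $batchSize is a validated integer, so interpolating it is safe.
    if ($lastKey === null) {
        return ["SELECT * FROM {$table} ORDER BY {$primaryKey} LIMIT {$batchSize}", []];
    }

    return [
        "SELECT * FROM {$table} WHERE {$primaryKey} > ? ORDER BY {$primaryKey} LIMIT {$batchSize}",
        [$lastKey],
    ];
}

[$sql, $params] = buildKeysetQuery('orders', 'id', 2000, 1000);
echo $sql, PHP_EOL;
```

A sync loop would run the first query with a null key, then pass the primary key of the last row of each batch into the next call until a batch comes back empty.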

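Real-World Scenarios

Scenario 1: Order Status Synchronization

php
<?php

// Assumes Composer's autoloader is already loaded, as in RabbitMQConfig above.
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
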
class OrderStatusSync
{
    private PDO $db;
    private $rabbitChannel;
    
    public function __construct(PDO $db, AMQPStreamConnection $rabbit)
    {
        $this->db = $db;
        $this->rabbitChannel = $rabbit->channel();
        
        $this->rabbitChannel->exchange_declare(
            'order.status',
            'direct',
            false,
            true,
            false
        );
    }
    
    public function updateStatus(string $orderId, string $newStatus): bool
    {
        try {
            $this->db->beginTransaction();
            
            $sql = "SELECT status FROM orders WHERE id = ? FOR UPDATE";
            $stmt = $this->db->prepare($sql);
            $stmt->execute([$orderId]);
            $order = $stmt->fetch();
            
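            if (!$order) {
                throw new Exception("Order not found: {$orderId}");
            }
            
            $oldStatus = $order['status'];
            
            if (!$this->isValidTransition($oldStatus, $newStatus)) {
                throw new Exception(
                    "Invalid status transition: {$oldStatus} -> {$newStatus}"
                );
            }
            
            $sql = "UPDATE orders SET status = ?, updated_at = NOW() WHERE id = ?";
            $stmt = $this->db->prepare($sql);
            $stmt->execute([$newStatus, $orderId]);
            
            $this->db->commit();
        } catch (Exception $e) {
            if ($this->db->inTransaction()) {
                $this->db->rollBack();
            }
            throw $e;
        }
        
        // Publish only after the commit so consumers never see a status change
        // that was rolled back. If publishing fails, the change is already
        // committed; use the Outbox pattern above when the event must not be lost.
        $this->publishStatusEvent($orderId, $oldStatus, $newStatus);
        
        return true;
    }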
    
    private function isValidTransition(string $from, string $to): bool
    {
        $transitions = [
            'PENDING' => ['CONFIRMED', 'CANCELLED'],
            'CONFIRMED' => ['PROCESSING', 'CANCELLED'],
            'PROCESSING' => ['SHIPPED', 'FAILED'],
            'SHIPPED' => ['DELIVERED', 'RETURNED'],
            'DELIVERED' => ['RETURNED'],
            'RETURNED' => ['REFUNDED'],
            'CANCELLED' => [],
            'FAILED' => ['PENDING'],
            'REFUNDED' => [],
        ];
        
        return in_array($to, $transitions[$from] ?? []);
    }
    
    private function publishStatusEvent(
        string $orderId,
        string $oldStatus,
        string $newStatus
    ): void {
        $event = [
            'orderId' => $orderId,
            'oldStatus' => $oldStatus,
            'newStatus' => $newStatus,
            'timestamp' => date('c'),
        ];
        
        $message = new AMQPMessage(
            json_encode($event),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            ]
        );
        
        $this->rabbitChannel->basic_publish(
            $message,
            'order.status',
            "order.status.{$newStatus}"
        );
    }
}

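Scenario 2: Inventory Synchronization

php
<?php

// Assumes Composer's autoloader is already loaded, as in RabbitMQConfig above.
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
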
class InventorySync
{
    private PDO $db;
    private $rabbitChannel;
    
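    public function __construct(PDO $db, AMQPStreamConnection $rabbit)
    {
        $this->db = $db;
        $this->rabbitChannel = $rabbit->channel();
        
        // publishInventoryEvent() publishes to this exchange, so it must exist.
        // The 'topic' type is an assumption; the original does not declare it.
        $this->rabbitChannel->exchange_declare(
            'inventory.events',
            'topic',
            false,
            true,
            false
        );
    }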
    
    public function reserveInventory(string $orderId, array $items): bool
    {
        try {
            $this->db->beginTransaction();
            
            foreach ($items as $item) {
                $sql = "
                    UPDATE inventory 
                    SET quantity = quantity - ?, 
                        reserved = reserved + ?,
                        updated_at = NOW()
                    WHERE product_id = ? AND quantity >= ?
                ";
                
                $stmt = $this->db->prepare($sql);
                $stmt->execute([
                    $item['quantity'],
                    $item['quantity'],
                    $item['productId'],
                    $item['quantity']
                ]);
                
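                if ($stmt->rowCount() === 0) {
                    throw new Exception(
                        "Insufficient stock: {$item['productId']}"
                    );
                }
                
                $this->recordReservation($orderId, $item);
            }
            
            $this->db->commit();
        } catch (Exception $e) {
            if ($this->db->inTransaction()) {
                $this->db->rollBack();
            }
            throw $e;
        }
        
        // Publish only after the commit so consumers never see a reservation
        // that was rolled back.
        $this->publishInventoryEvent($orderId, $items, 'reserved');
        
        return true;
    }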
    
    public function releaseInventory(string $orderId): bool
    {
        try {
            $this->db->beginTransaction();
            
            $sql = "
                SELECT product_id, quantity 
                FROM inventory_reservations 
                WHERE order_id = ? AND status = 'reserved'
                FOR UPDATE
            ";
            
            $stmt = $this->db->prepare($sql);
            $stmt->execute([$orderId]);
            $reservations = $stmt->fetchAll();
            
            foreach ($reservations as $reservation) {
                $sql = "
                    UPDATE inventory 
                    SET quantity = quantity + ?, 
                        reserved = reserved - ?,
                        updated_at = NOW()
                    WHERE product_id = ?
                ";
                
                $stmt = $this->db->prepare($sql);
                $stmt->execute([
                    $reservation['quantity'],
                    $reservation['quantity'],
                    $reservation['product_id']
                ]);
            }
            
            $sql = "
                UPDATE inventory_reservations 
                SET status = 'released', released_at = NOW()
                WHERE order_id = ? AND status = 'reserved'
            ";
            
            $stmt = $this->db->prepare($sql);
            $stmt->execute([$orderId]);
            
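            $this->db->commit();
        } catch (Exception $e) {
            if ($this->db->inTransaction()) {
                $this->db->rollBack();
            }
            throw $e;
        }
        
        // Publish only after the commit so consumers never see a release
        // that was rolled back.
        $this->publishInventoryEvent($orderId, $reservations, 'released');
        
        return true;
    }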
    
    private function recordReservation(string $orderId, array $item): void
    {
        $sql = "
            INSERT INTO inventory_reservations 
            (order_id, product_id, quantity, status, created_at)
            VALUES (?, ?, ?, 'reserved', NOW())
        ";
        
        $stmt = $this->db->prepare($sql);
        $stmt->execute([
            $orderId,
            $item['productId'],
            $item['quantity']
        ]);
    }
    
    private function publishInventoryEvent(
        string $orderId,
        array $items,
        string $action
    ): void {
        $event = [
            'orderId' => $orderId,
            'action' => $action,
            'items' => $items,
            'timestamp' => date('c'),
        ];
        
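        $message = new AMQPMessage(
            // JSON_THROW_ON_ERROR (PHP 7.3+) raises an exception on encoding
            // failure instead of silently publishing the string "false"
            json_encode($event, JSON_THROW_ON_ERROR),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            ]
        );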
        
        $this->rabbitChannel->basic_publish(
            $message,
            'inventory.events',
            "inventory.{$action}"
        );
    }
}

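Common Problems and Solutions

Problem 1: Messages and database transactions are inconsistent

Symptom: The message is sent successfully but the database transaction rolls back, or the database transaction commits but sending the message fails.

Solution: Use the Outbox Pattern, which stores the events in the same database transaction as the business data and publishes them afterwards.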

php
$outbox->executeInTransaction(
    function (PDO $db) {
        // Database operations
    },
    [
        // List of events
    ]
);
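
The helper behind `$outbox->executeInTransaction()` is not shown in this section, so the following is only a minimal sketch of how such a helper might work. The class name `OutboxSketch`, the `outbox_messages` table, and the `routingKey`/`payload` event shape are all assumptions made for illustration, and in-memory SQLite stands in for MySQL or PostgreSQL so the example runs on its own (it needs the pdo_sqlite extension).

```php
<?php
declare(strict_types=1);

// Hypothetical sketch: class, table, and event field names are assumptions.
final class OutboxSketch
{
    private PDO $db;

    public function __construct(PDO $db)
    {
        $this->db = $db;
    }

    /**
     * Runs $work and stores $events in the same transaction, so both
     * commit or both roll back. A separate relay process would later
     * read the pending rows and publish them to RabbitMQ.
     */
    public function executeInTransaction(callable $work, array $events): void
    {
        $this->db->beginTransaction();
        try {
            $work($this->db);

            $stmt = $this->db->prepare(
                "INSERT INTO outbox_messages (routing_key, payload, status)
                 VALUES (?, ?, 'pending')"
            );
            foreach ($events as $event) {
                $stmt->execute([
                    $event['routingKey'],
                    json_encode($event['payload'], JSON_THROW_ON_ERROR),
                ]);
            }

            $this->db->commit();
        } catch (Throwable $e) {
            if ($this->db->inTransaction()) {
                $this->db->rollBack();
            }
            throw $e;
        }
    }
}

// Demo schema in in-memory SQLite
$db = new PDO('sqlite::memory:');
$db->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$db->exec('CREATE TABLE orders (id TEXT PRIMARY KEY)');
$db->exec('CREATE TABLE outbox_messages (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    routing_key TEXT NOT NULL,
    payload TEXT NOT NULL,
    status TEXT NOT NULL
)');

$outbox = new OutboxSketch($db);
$outbox->executeInTransaction(
    function (PDO $db): void {
        $db->prepare('INSERT INTO orders (id) VALUES (?)')->execute(['order-1']);
    },
    [['routingKey' => 'order.created', 'payload' => ['orderId' => 'order-1']]]
);

// Number of events waiting for the relay
$pending = (int) $db->query(
    "SELECT COUNT(*) FROM outbox_messages WHERE status = 'pending'"
)->fetchColumn();
```

Because the order row and the outbox row share one transaction, a failure in either write leaves neither behind. Publishing is deferred to a relay, which means consumers may still receive a message more than once and should be idempotent.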

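Problem 2: Duplicate message consumption

Symptom: The same message is processed more than once.

Solution: Make message handling idempotent.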

php
private function isProcessed(string $messageId): bool
{
    $sql = "SELECT COUNT(*) FROM processed_messages WHERE message_id = ?";
    $stmt = $this->db->prepare($sql);
    $stmt->execute([$messageId]);
    return $stmt->fetchColumn() > 0;
}

private function markAsProcessed(string $messageId): void
{
    $sql = "INSERT INTO processed_messages (message_id, processed_at) VALUES (?, NOW())";
    $stmt = $this->db->prepare($sql);
    $stmt->execute([$messageId]);
}
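
Calling `isProcessed()` and then `markAsProcessed()` as two separate steps leaves a window in which two consumers can both see "not processed" for the same message. One way to close that window, sketched here under the assumption that `processed_messages.message_id` carries a PRIMARY KEY or UNIQUE constraint, is to let the insert itself act as the check. The function name `claimMessage` is hypothetical, and in-memory SQLite is used only so the example runs standalone.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: claims a message ID by inserting it.
 * Returns true for the first caller and false for a duplicate.
 * Assumes a unique key on message_id and PDO::ERRMODE_EXCEPTION.
 */
function claimMessage(PDO $db, string $messageId): bool
{
    try {
        $stmt = $db->prepare(
            'INSERT INTO processed_messages (message_id, processed_at)
             VALUES (?, CURRENT_TIMESTAMP)'
        );
        $stmt->execute([$messageId]);
        return true;
    } catch (PDOException $e) {
        // SQLSTATE class 23 is "integrity constraint violation"; in this
        // table the unique key is the only constraint expected to fail
        if (substr((string) $e->getCode(), 0, 2) === '23') {
            return false;
        }
        throw $e;
    }
}

$db = new PDO('sqlite::memory:');
$db->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$db->exec('CREATE TABLE processed_messages (
    message_id TEXT PRIMARY KEY,
    processed_at TEXT NOT NULL
)');

$first = claimMessage($db, 'msg-1');   // first delivery
$second = claimMessage($db, 'msg-1');  // redelivery of the same message
```

In a real consumer the claim and the business write would normally share one transaction, so that a failed handler also rolls back the claim and the message can be retried.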

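Problem 3: Poor performance of bulk operations

Symptom: Performance degrades when synchronizing large batches of data.

Solution: Use multi-row inserts and group the work into transactions.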

php
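private function batchInsert(string $table, array $rows): void
{
    // Nothing to insert; also avoids reading $rows[0] on an empty array
    if ($rows === []) {
        return;
    }
    
    // $table and the column names are interpolated into the SQL,
    // so they must come from trusted code, never from user input
    $columns = array_keys($rows[0]);
    $placeholders = '(' . implode(',', array_fill(0, count($columns), '?')) . ')';
    $allPlaceholders = implode(',', array_fill(0, count($rows), $placeholders));
    
    $sql = "INSERT INTO {$table} (" . implode(',', $columns) . ") VALUES {$allPlaceholders}";
    
    // Flatten the values in column order (avoids copying the whole
    // array on every iteration, as array_merge in a loop would)
    $values = [];
    foreach ($rows as $row) {
        foreach ($columns as $column) {
            $values[] = $row[$column];
        }
    }
    
    $stmt = $this->db->prepare($sql);
    $stmt->execute($values);
}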
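
A single INSERT that carries every row can run into limits on the number of placeholders or on statement size, and the exact limits depend on the database and driver. The sketch below splits the rows with `array_chunk` and wraps all chunks in one transaction. The function name `insertInChunks`, the default chunk size of 500, and the `products` table are assumptions for illustration; in-memory SQLite is used so the example runs on its own.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: inserts $rows in chunks inside one transaction
 * and returns the number of rows sent to the database.
 * $table and the column names are interpolated into the SQL, so they
 * must come from trusted code, never from user input.
 */
function insertInChunks(PDO $db, string $table, array $rows, int $chunkSize = 500): int
{
    if ($rows === []) {
        return 0;
    }

    $columns = array_keys($rows[0]);
    $rowPlaceholder = '(' . implode(',', array_fill(0, count($columns), '?')) . ')';
    $inserted = 0;

    $db->beginTransaction();
    try {
        foreach (array_chunk($rows, $chunkSize) as $chunk) {
            $sql = "INSERT INTO {$table} (" . implode(',', $columns) . ') VALUES '
                . implode(',', array_fill(0, count($chunk), $rowPlaceholder));

            // Flatten this chunk's values in column order
            $values = [];
            foreach ($chunk as $row) {
                foreach ($columns as $column) {
                    $values[] = $row[$column];
                }
            }

            $db->prepare($sql)->execute($values);
            $inserted += count($chunk);
        }
        $db->commit();
    } catch (Throwable $e) {
        if ($db->inTransaction()) {
            $db->rollBack();
        }
        throw $e;
    }

    return $inserted;
}

$db = new PDO('sqlite::memory:');
$db->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$db->exec('CREATE TABLE products (id INTEGER PRIMARY KEY, name TEXT NOT NULL)');

$rows = [];
for ($i = 1; $i <= 7; $i++) {
    $rows[] = ['id' => $i, 'name' => "product-{$i}"];
}

// Seven rows with a chunk size of 3 produce three INSERT statements
$inserted = insertInChunks($db, 'products', $rows, 3);
```

Keeping all chunks in one transaction means a failure in a later chunk undoes the earlier ones. For very large imports, committing per chunk may be preferable, at the cost of partial results on failure.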

Best Practice Recommendations

1. Connection pool management

php
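/**
 * Despite the name, this keeps one lazily created, shared connection of
 * each kind per PHP process; it does not manage a pool of connections.
 */
class ConnectionPool
{
    private static ?PDO $dbConnection = null;
    private static ?AMQPStreamConnection $rabbitConnection = null;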
    
    public static function getDbConnection(): PDO
    {
        if (self::$dbConnection === null) {
            self::$dbConnection = DatabaseConfig::getMySQLConnection();
        }
        return self::$dbConnection;
    }
    
    public static function getRabbitConnection(): AMQPStreamConnection
    {
        if (self::$rabbitConnection === null) {
            self::$rabbitConnection = RabbitMQConfig::getConnection();
        }
        return self::$rabbitConnection;
    }
}

2. Error handling

php
class ErrorHandler
{
    // Throwable also covers PHP Error types such as TypeError, not only Exception
    public static function handle(Throwable $e, array $context = []): void
    {
        error_log(sprintf(
            "[%s] %s in %s:%d\nContext: %s",
            date('Y-m-d H:i:s'),
            $e->getMessage(),
            $e->getFile(),
            $e->getLine(),
            json_encode($context)
        ));
    }
}

3. Monitoring metrics

php
class Metrics
{
    public static function record(string $metric, float $value, array $tags = []): void
    {
        // Send the value to the monitoring system
    }
    
    public static function increment(string $counter, array $tags = []): void
    {
        // Increment the counter
    }
}
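
The `Metrics` methods above are left as stubs. For local development or tests, an in-memory stand-in with the same two method signatures might look like the sketch below. The class name `InMemoryMetrics`, the `counter()` and `values()` accessors, and the metric names are hypothetical; a production setup would forward the data to a real monitoring backend instead.

```php
<?php
declare(strict_types=1);

// Hypothetical in-memory stand-in for the Metrics stub; not a real backend.
final class InMemoryMetrics
{
    /** @var array<string, float[]> recorded values per series */
    private static array $values = [];

    /** @var array<string, int> counter totals per series */
    private static array $counters = [];

    public static function record(string $metric, float $value, array $tags = []): void
    {
        self::$values[self::key($metric, $tags)][] = $value;
    }

    public static function increment(string $counter, array $tags = []): void
    {
        $key = self::key($counter, $tags);
        self::$counters[$key] = (self::$counters[$key] ?? 0) + 1;
    }

    public static function counter(string $counter, array $tags = []): int
    {
        return self::$counters[self::key($counter, $tags)] ?? 0;
    }

    /** @return float[] */
    public static function values(string $metric, array $tags = []): array
    {
        return self::$values[self::key($metric, $tags)] ?? [];
    }

    // Builds a series key; tags are sorted so their order does not matter
    private static function key(string $name, array $tags): string
    {
        ksort($tags);
        return $name . '|' . http_build_query($tags);
    }
}

// Time one (simulated) message handler and count it as processed
$start = microtime(true);
usleep(1000); // stands in for real message handling
InMemoryMetrics::record(
    'consumer.handle_seconds',
    microtime(true) - $start,
    ['queue' => 'orders']
);
InMemoryMetrics::increment('consumer.processed', ['queue' => 'orders']);
InMemoryMetrics::increment('consumer.processed', ['queue' => 'orders']);
```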

Version Compatibility

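| PHP  | PDO MySQL | PDO PostgreSQL | php-amqplib | RabbitMQ Server |
|------|-----------|----------------|-------------|-----------------|
| 8.2+ | 8.2+      | 8.2+           | 3.x         | 3.11+           |
| 8.1+ | 8.1+      | 8.1+           | 3.x         | 3.10+           |
| 8.0+ | 8.0+      | 8.0+           | 2.x/3.x     | 3.9+            |
| 7.4+ | 7.4+      | 7.4+           | 2.x         | 3.8+            |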

Related Links