Appearance
RabbitMQ 与数据库集成
概述
在现代分布式系统中,消息队列与数据库的集成是构建可靠数据管道的关键。RabbitMQ 可以与各种数据库系统集成,实现数据同步、事件溯源、变更数据捕获(CDC)等功能。本教程将详细介绍 RabbitMQ 与关系型数据库(MySQL、PostgreSQL)的集成方案。
集成架构设计
架构图
┌─────────────────────────────────────────────────────────────────────┐
│ 数据库集成架构 │
│ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ 应用层 │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Producer │ │ Consumer │ │ Sync │ │ │
│ │ │ Service │ │ Service │ │ Service │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ RabbitMQ │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Event │ │ Command │ │ Sync │ │ │
│ │ │ Queue │ │ Queue │ │ Queue │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ 数据库层 │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ MySQL │ │ PostgreSQL │ │ Redis │ │ │
│ │ │ Primary │ │ Primary │ │ Cache │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
集成模式
| 模式 | 说明 |
|---|---|
| Outbox Pattern | 发件箱模式,确保消息与数据库事务一致性 |
| CDC (Change Data Capture) | 变更数据捕获,监听数据库变更并发送消息 |
| Event Sourcing | 事件溯源,将所有变更作为事件存储 |
| Saga Pattern | Saga 模式,将分布式事务拆分为一系列本地事务,失败时通过补偿操作撤销已完成的步骤 |
| CQRS | 命令查询职责分离,将写模型(命令)与读模型(查询)分开设计 |
配置示例
PHP 数据库连接配置
php
<?php
class DatabaseConfig
{
/**
 * Opens a MySQL PDO connection configured from environment variables.
 *
 * Reads MYSQL_HOST, MYSQL_PORT, MYSQL_DATABASE, MYSQL_USER and
 * MYSQL_PASSWORD. A variable that is unset or empty falls back to
 * localhost, 3306, app_db, root and an empty password respectively.
 *
 * @return PDO Connection that throws on errors, fetches associative arrays,
 *             disables emulated prepares and sets the utf8mb4 collation.
 */
public static function getMySQLConnection(): PDO
{
    $host = getenv('MYSQL_HOST') ?: 'localhost';
    $port = getenv('MYSQL_PORT') ?: '3306';
    $database = getenv('MYSQL_DATABASE') ?: 'app_db';
    $username = getenv('MYSQL_USER') ?: 'root';
    $password = getenv('MYSQL_PASSWORD') ?: '';

    $dsn = sprintf('mysql:host=%s;port=%s;dbname=%s;charset=utf8mb4', $host, $port, $database);

    return new PDO($dsn, $username, $password, [
        PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
        PDO::ATTR_DEFAULT_FETCH_MODE => PDO::FETCH_ASSOC,
        PDO::ATTR_EMULATE_PREPARES => false,
        PDO::MYSQL_ATTR_INIT_COMMAND => "SET NAMES utf8mb4 COLLATE utf8mb4_unicode_ci",
    ]);
}
/**
 * Opens a PostgreSQL PDO connection configured from environment variables.
 *
 * Reads PG_HOST, PG_PORT, PG_DATABASE, PG_USER and PG_PASSWORD. A variable
 * that is unset or empty falls back to localhost, 5432, app_db, postgres
 * and an empty password respectively.
 *
 * @return PDO Connection that throws on errors and fetches associative arrays.
 */
public static function getPostgreSQLConnection(): PDO
{
    $host = getenv('PG_HOST') ?: 'localhost';
    $port = getenv('PG_PORT') ?: '5432';
    $database = getenv('PG_DATABASE') ?: 'app_db';
    $username = getenv('PG_USER') ?: 'postgres';
    $password = getenv('PG_PASSWORD') ?: '';

    $dsn = sprintf('pgsql:host=%s;port=%s;dbname=%s', $host, $port, $database);

    return new PDO($dsn, $username, $password, [
        PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
        PDO::ATTR_DEFAULT_FETCH_MODE => PDO::FETCH_ASSOC,
    ]);
}
}
RabbitMQ 连接配置
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
class RabbitMQConfig
{
/**
 * Opens an AMQP stream connection configured from environment variables.
 *
 * Reads RABBITMQ_HOST, RABBITMQ_PORT, RABBITMQ_USER, RABBITMQ_PASSWORD and
 * RABBITMQ_VHOST. A variable that is unset or empty falls back to
 * localhost, 5672, guest, guest and "/" respectively.
 *
 * @return AMQPStreamConnection
 */
public static function getConnection(): AMQPStreamConnection
{
    $host = getenv('RABBITMQ_HOST') ?: 'localhost';
    $port = getenv('RABBITMQ_PORT') ?: 5672;
    $user = getenv('RABBITMQ_USER') ?: 'guest';
    $password = getenv('RABBITMQ_PASSWORD') ?: 'guest';
    $vhost = getenv('RABBITMQ_VHOST') ?: '/';

    return new AMQPStreamConnection($host, $port, $user, $password, $vhost);
}
}
PHP 代码示例
Outbox Pattern 实现
php
<?php
/**
 * Transactional outbox backed by a MySQL table and relayed to RabbitMQ.
 *
 * Business writes and their events are stored in one database transaction
 * (table `outbox_messages`). A relay step then publishes pending rows to the
 * `outbox.events` topic exchange and records the outcome on each row.
 *
 * Delivery is at-least-once: if the relay transaction is rolled back after a
 * message was published, the row stays pending and is published again on the
 * next relay run, so consumers must tolerate duplicates.
 */
class OutboxPattern
{
    /** Connection used for the business transaction and the outbox table. */
    private PDO $db;

    /** Channel opened on the RabbitMQ connection in the constructor. */
    private $rabbitChannel;

    /** RabbitMQ connection; closed by close(). */
    private $rabbitConnection;

    /**
     * Opens a channel and makes sure the outbox table, exchange, queue and
     * binding exist.
     *
     * @param PDO                  $db               MySQL connection (the DDL uses MySQL syntax).
     * @param AMQPStreamConnection $rabbitConnection Open RabbitMQ connection.
     */
    public function __construct(PDO $db, AMQPStreamConnection $rabbitConnection)
    {
        $this->db = $db;
        $this->rabbitConnection = $rabbitConnection;
        $this->rabbitChannel = $rabbitConnection->channel();
        $this->setupOutboxTable();
        $this->setupRabbitMQ();
    }

    /**
     * Creates the `outbox_messages` table if it does not exist yet.
     */
    private function setupOutboxTable(): void
    {
        $sql = "
CREATE TABLE IF NOT EXISTS outbox_messages (
id VARCHAR(36) PRIMARY KEY,
aggregate_type VARCHAR(255) NOT NULL,
aggregate_id VARCHAR(255) NOT NULL,
event_type VARCHAR(255) NOT NULL,
payload JSON NOT NULL,
status ENUM('pending', 'sent', 'failed') DEFAULT 'pending',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
sent_at TIMESTAMP NULL,
retry_count INT DEFAULT 0,
error_message TEXT NULL,
INDEX idx_status (status),
INDEX idx_created_at (created_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
";
        $this->db->exec($sql);
    }

    /**
     * Declares the durable `outbox.events` topic exchange and the durable
     * `outbox.processing` queue, bound with `#` so it receives every event.
     */
    private function setupRabbitMQ(): void
    {
        // Arguments after the type: passive = false, durable = true, auto_delete = false.
        $this->rabbitChannel->exchange_declare('outbox.events', 'topic', false, true, false);

        // Arguments after the name: passive = false, durable = true, exclusive = false, auto_delete = false.
        $this->rabbitChannel->queue_declare('outbox.processing', false, true, false, false);

        $this->rabbitChannel->queue_bind('outbox.processing', 'outbox.events', '#');
    }

    /**
     * Runs a business operation and stores its events atomically, then
     * relays pending outbox rows.
     *
     * The relay runs only after the transaction has committed and sits
     * outside the rollback handling: a relay failure propagates as-is, the
     * committed data is kept, and the rows stay pending for a later
     * processOutbox() run.
     *
     * @param callable $operation Receives the PDO connection; performs the business writes.
     * @param array    $events    List of events, each with the keys aggregate_type,
     *                            aggregate_id, event_type and payload.
     *
     * @return bool Always true when no exception is thrown.
     *
     * @throws Throwable Whatever the operation, the outbox insert or the relay throws.
     */
    public function executeInTransaction(callable $operation, array $events): bool
    {
        $this->db->beginTransaction();

        try {
            $operation($this->db);

            foreach ($events as $event) {
                $this->saveToOutbox($event);
            }

            $this->db->commit();
        } catch (Throwable $e) {
            // Throwable also covers TypeError and friends raised by the callback.
            // The guard avoids a second exception when the transaction is already gone.
            if ($this->db->inTransaction()) {
                $this->db->rollBack();
            }

            throw $e;
        }

        $this->processOutbox();

        return true;
    }

    /**
     * Inserts one event into the outbox table with a fresh UUID.
     *
     * @param array $event Keys: aggregate_type, aggregate_id, event_type, payload.
     *
     * @throws JsonException When the payload cannot be encoded as JSON.
     */
    private function saveToOutbox(array $event): void
    {
        $sql = "
INSERT INTO outbox_messages
(id, aggregate_type, aggregate_id, event_type, payload)
VALUES (?, ?, ?, ?, ?)
";
        $stmt = $this->db->prepare($sql);
        $stmt->execute([
            $this->generateUUID(),
            $event['aggregate_type'],
            $event['aggregate_id'],
            $event['event_type'],
            // Fail loudly instead of binding `false` into the JSON column.
            json_encode($event['payload'], JSON_THROW_ON_ERROR),
        ]);
    }

    /**
     * Publishes up to $batchSize pending outbox rows and records the result.
     *
     * The rows are selected with FOR UPDATE SKIP LOCKED inside a transaction,
     * so concurrent relays do not pick the same rows. When the caller already
     * has a transaction open, that transaction is reused and left open.
     *
     * A publish error marks the row `failed`; this method does not select
     * failed rows again. A database error aborts the batch and rolls back,
     * leaving the rows pending.
     *
     * @param int $batchSize Maximum number of rows to relay.
     *
     * @return int Number of rows published and marked as sent.
     */
    public function processOutbox(int $batchSize = 100): int
    {
        $sql = "
SELECT * FROM outbox_messages
WHERE status = 'pending'
ORDER BY created_at ASC
LIMIT ?
FOR UPDATE SKIP LOCKED
";
        $ownsTransaction = !$this->db->inTransaction();
        if ($ownsTransaction) {
            $this->db->beginTransaction();
        }

        try {
            $stmt = $this->db->prepare($sql);
            // Bind as integer so LIMIT also works with emulated prepares.
            $stmt->bindValue(1, $batchSize, PDO::PARAM_INT);
            $stmt->execute();

            $processed = 0;
            foreach ($stmt->fetchAll() as $message) {
                try {
                    $this->sendMessage($message);
                } catch (Exception $e) {
                    $this->markAsFailed($message['id'], $e->getMessage());
                    continue;
                }

                // Kept outside the try: a failing status update must not
                // relabel an already published message as failed.
                $this->markAsSent($message['id']);
                $processed++;
            }

            if ($ownsTransaction) {
                $this->db->commit();
            }

            return $processed;
        } catch (Throwable $e) {
            if ($ownsTransaction && $this->db->inTransaction()) {
                $this->db->rollBack();
            }

            throw $e;
        }
    }

    /**
     * Publishes one outbox row as a persistent JSON message.
     *
     * The routing key is "<aggregate_type>.<event_type>" in lower case.
     *
     * @param array $message Outbox row as fetched from the table.
     */
    private function sendMessage(array $message): void
    {
        $routingKey = sprintf(
            '%s.%s',
            strtolower($message['aggregate_type']),
            strtolower($message['event_type'])
        );

        $amqpMessage = new AMQPMessage(
            $message['payload'],
            [
                'content_type' => 'application/json',
                'message_id' => $message['id'],
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'timestamp' => time(),
                'headers' => new AMQPTable([
                    'event_type' => $message['event_type'],
                    'aggregate_type' => $message['aggregate_type'],
                    'aggregate_id' => $message['aggregate_id'],
                ]),
            ]
        );

        $this->rabbitChannel->basic_publish($amqpMessage, 'outbox.events', $routingKey);
    }

    /**
     * Marks an outbox row as sent and stamps sent_at.
     */
    private function markAsSent(string $id): void
    {
        $sql = "
UPDATE outbox_messages
SET status = 'sent', sent_at = NOW()
WHERE id = ?
";
        $stmt = $this->db->prepare($sql);
        $stmt->execute([$id]);
    }

    /**
     * Marks an outbox row as failed, stores the error and bumps retry_count.
     */
    private function markAsFailed(string $id, string $error): void
    {
        $sql = "
UPDATE outbox_messages
SET status = 'failed',
error_message = ?,
retry_count = retry_count + 1
WHERE id = ?
";
        $stmt = $this->db->prepare($sql);
        $stmt->execute([$error, $id]);
    }

    /**
     * Generates a random (version 4) UUID string.
     *
     * Uses random_bytes() rather than mt_rand(): the value is the primary
     * key of the outbox row and the AMQP message_id, so collisions matter.
     *
     * @return string 36-character lower-case UUID.
     */
    private function generateUUID(): string
    {
        $bytes = random_bytes(16);
        $bytes[6] = chr((ord($bytes[6]) & 0x0f) | 0x40); // version 4
        $bytes[8] = chr((ord($bytes[8]) & 0x3f) | 0x80); // RFC 4122 variant

        return vsprintf('%s%s-%s-%s-%s-%s%s%s', str_split(bin2hex($bytes), 4));
    }

    /**
     * Closes the channel, then the connection.
     */
    public function close(): void
    {
        $this->rabbitChannel->close();
        $this->rabbitConnection->close();
    }
}
// Usage example: create an order and its OrderCreated event in one transaction.
$db = DatabaseConfig::getMySQLConnection();
$rabbit = RabbitMQConfig::getConnection();
$outbox = new OutboxPattern($db, $rabbit);

// Build the identifier once: two separate time() calls can straddle a second
// boundary and give the event two different order ids.
$orderId = 'ORD-' . time();

$outbox->executeInTransaction(
    function (PDO $db) {
        $sql = "INSERT INTO orders (customer_id, amount, status) VALUES (?, ?, ?)";
        $stmt = $db->prepare($sql);
        $stmt->execute(['CUST-001', 1500.00, 'PENDING']);
        return $db->lastInsertId();
    },
    [
        [
            'aggregate_type' => 'Order',
            'aggregate_id' => $orderId,
            'event_type' => 'OrderCreated',
            'payload' => [
                'orderId' => $orderId,
                'customerId' => 'CUST-001',
                'amount' => 1500.00,
                'status' => 'PENDING',
                'createdAt' => date('c'),
            ]
        ]
    ]
);
$outbox->close();
CDC (变更数据捕获) 实现
php
<?php
/**
 * Trigger-based change data capture for MySQL, relayed to RabbitMQ.
 *
 * For every tracked table a `cdc_<table>` change log and three triggers
 * (insert, update, delete) are created. processCDCTable() publishes
 * unprocessed change-log rows to the `cdc.events` topic exchange.
 *
 * As written, the triggers record only the `id` column of the changed row.
 */
class CDCProcessor
{
    /** Tables that get a change log and capture triggers. */
    private const TRACKED_TABLES = ['orders', 'customers', 'products'];

    /** MySQL connection holding the tracked tables and change logs. */
    private PDO $db;

    /** Channel opened on the RabbitMQ connection in the constructor. */
    private $rabbitChannel;

    /** RabbitMQ connection; closed by close(). */
    private $rabbitConnection;

    /**
     * Opens a channel, declares the exchange and installs the change logs
     * and triggers.
     *
     * @param PDO                  $db               MySQL connection.
     * @param AMQPStreamConnection $rabbitConnection Open RabbitMQ connection.
     */
    public function __construct(PDO $db, AMQPStreamConnection $rabbitConnection)
    {
        $this->db = $db;
        $this->rabbitConnection = $rabbitConnection;
        $this->rabbitChannel = $rabbitConnection->channel();
        $this->setupCDC();
    }

    /**
     * Declares the durable `cdc.events` topic exchange and sets up MySQL.
     */
    private function setupCDC(): void
    {
        // Arguments after the type: passive = false, durable = true, auto_delete = false.
        $this->rabbitChannel->exchange_declare('cdc.events', 'topic', false, true, false);
        $this->setupMySQLTriggers();
    }

    /**
     * Creates the change log and the three triggers for each tracked table.
     */
    private function setupMySQLTriggers(): void
    {
        foreach (self::TRACKED_TABLES as $table) {
            $this->createCDCTable($table);
            $this->createInsertTrigger($table);
            $this->createUpdateTrigger($table);
            $this->createDeleteTrigger($table);
        }
    }

    /**
     * Creates the `cdc_<table>` change log if it does not exist.
     *
     * The table includes the `error_message` column that markCDCError()
     * writes to. NOTE(review): CREATE TABLE IF NOT EXISTS does not alter a
     * change log created by an earlier version without that column; such
     * tables need a manual ALTER TABLE.
     */
    private function createCDCTable(string $table): void
    {
        $cdcTable = "cdc_{$table}";
        $sql = "
CREATE TABLE IF NOT EXISTS {$cdcTable} (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
operation ENUM('INSERT', 'UPDATE', 'DELETE') NOT NULL,
record_id VARCHAR(255) NOT NULL,
old_data JSON NULL,
new_data JSON NULL,
changed_columns JSON NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
processed TINYINT(1) DEFAULT 0,
error_message TEXT NULL,
INDEX idx_processed (processed),
INDEX idx_created_at (created_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
";
        $this->db->exec($sql);
    }

    /**
     * Drops and recreates a trigger using two separate statements.
     *
     * Sending DROP and CREATE in one multi-statement exec() only surfaces
     * errors from the first statement, so a failing CREATE TRIGGER would go
     * unnoticed. Changes made between the DROP and the CREATE are not captured.
     *
     * @param string $triggerName Trigger to replace.
     * @param string $createSql   Complete CREATE TRIGGER statement.
     */
    private function replaceTrigger(string $triggerName, string $createSql): void
    {
        $this->db->exec("DROP TRIGGER IF EXISTS {$triggerName}");
        $this->db->exec($createSql);
    }

    /**
     * Installs the AFTER INSERT trigger that logs the new row's id.
     */
    private function createInsertTrigger(string $table): void
    {
        $triggerName = "trg_{$table}_insert";
        $cdcTable = "cdc_{$table}";
        $createSql = "
CREATE TRIGGER {$triggerName}
AFTER INSERT ON {$table}
FOR EACH ROW
BEGIN
INSERT INTO {$cdcTable} (operation, record_id, new_data)
VALUES ('INSERT', NEW.id, JSON_OBJECT(
'id', NEW.id,
'data', JSON_OBJECT(
'id', NEW.id
)
));
END
";
        $this->replaceTrigger($triggerName, $createSql);
    }

    /**
     * Installs the AFTER UPDATE trigger that logs the old and new row ids.
     */
    private function createUpdateTrigger(string $table): void
    {
        $triggerName = "trg_{$table}_update";
        $cdcTable = "cdc_{$table}";
        $createSql = "
CREATE TRIGGER {$triggerName}
AFTER UPDATE ON {$table}
FOR EACH ROW
BEGIN
INSERT INTO {$cdcTable} (operation, record_id, old_data, new_data, changed_columns)
VALUES ('UPDATE', NEW.id,
JSON_OBJECT('id', OLD.id),
JSON_OBJECT('id', NEW.id),
JSON_ARRAY()
);
END
";
        $this->replaceTrigger($triggerName, $createSql);
    }

    /**
     * Installs the AFTER DELETE trigger that logs the deleted row's id.
     */
    private function createDeleteTrigger(string $table): void
    {
        $triggerName = "trg_{$table}_delete";
        $cdcTable = "cdc_{$table}";
        $createSql = "
CREATE TRIGGER {$triggerName}
AFTER DELETE ON {$table}
FOR EACH ROW
BEGIN
INSERT INTO {$cdcTable} (operation, record_id, old_data)
VALUES ('DELETE', OLD.id, JSON_OBJECT('id', OLD.id));
END
";
        $this->replaceTrigger($triggerName, $createSql);
    }

    /**
     * Publishes up to $batchSize unprocessed change-log rows of one table.
     *
     * Rows are read in `id` order (created_at only has one-second resolution)
     * with FOR UPDATE SKIP LOCKED inside a transaction, so concurrent
     * processors do not publish the same rows. When the caller already has a
     * transaction open, it is reused and left open.
     *
     * A row whose publish fails gets its error recorded and stays
     * unprocessed; later rows in the batch are still attempted.
     *
     * @param string $table     One of the tracked tables.
     * @param int    $batchSize Maximum number of rows to publish.
     *
     * @return int Number of rows published and marked processed.
     *
     * @throws InvalidArgumentException When $table is not a tracked table.
     */
    public function processCDCTable(string $table, int $batchSize = 100): int
    {
        // The name is interpolated into SQL, so only known tables are accepted.
        if (!in_array($table, self::TRACKED_TABLES, true)) {
            throw new InvalidArgumentException("不支持的 CDC 表: {$table}");
        }

        $cdcTable = "cdc_{$table}";
        $sql = "
SELECT * FROM {$cdcTable}
WHERE processed = 0
ORDER BY id ASC
LIMIT ?
FOR UPDATE SKIP LOCKED
";
        $ownsTransaction = !$this->db->inTransaction();
        if ($ownsTransaction) {
            $this->db->beginTransaction();
        }

        try {
            $stmt = $this->db->prepare($sql);
            // Bind as integer so LIMIT also works with emulated prepares.
            $stmt->bindValue(1, $batchSize, PDO::PARAM_INT);
            $stmt->execute();

            $processed = 0;
            foreach ($stmt->fetchAll() as $record) {
                try {
                    $this->publishCDCEvent($table, $record);
                } catch (Exception $e) {
                    $this->markCDCError($cdcTable, $record['id'], $e->getMessage());
                    continue;
                }

                $this->markCDCProcessed($cdcTable, $record['id']);
                $processed++;
            }

            if ($ownsTransaction) {
                $this->db->commit();
            }

            return $processed;
        } catch (Throwable $e) {
            if ($ownsTransaction && $this->db->inTransaction()) {
                $this->db->rollBack();
            }

            throw $e;
        }
    }

    /**
     * Decodes a nullable JSON column without passing null to json_decode(),
     * which is deprecated since PHP 8.1.
     *
     * @return mixed Decoded value, or null when the column is NULL.
     */
    private function decodeJsonColumn(?string $json)
    {
        return $json === null ? null : json_decode($json, true);
    }

    /**
     * Publishes one change-log row as a persistent JSON message.
     *
     * The routing key is "cdc.<table>.<operation>" in lower case.
     *
     * @param string $table  Source table name.
     * @param array  $record Change-log row as fetched from the table.
     */
    private function publishCDCEvent(string $table, array $record): void
    {
        $routingKey = sprintf(
            'cdc.%s.%s',
            strtolower($table),
            strtolower($record['operation'])
        );

        $event = [
            'table' => $table,
            'operation' => $record['operation'],
            'recordId' => $record['record_id'],
            'oldData' => $this->decodeJsonColumn($record['old_data']),
            'newData' => $this->decodeJsonColumn($record['new_data']),
            'changedColumns' => $this->decodeJsonColumn($record['changed_columns']),
            'timestamp' => $record['created_at'],
        ];

        $message = new AMQPMessage(
            json_encode($event),
            [
                'content_type' => 'application/json',
                // message_id is a string property; the column is a BIGINT.
                'message_id' => (string) $record['id'],
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'timestamp' => time(),
            ]
        );

        $this->rabbitChannel->basic_publish($message, 'cdc.events', $routingKey);
    }

    /**
     * Flags a change-log row as processed.
     */
    private function markCDCProcessed(string $cdcTable, int $id): void
    {
        $sql = "UPDATE {$cdcTable} SET processed = 1 WHERE id = ?";
        $stmt = $this->db->prepare($sql);
        $stmt->execute([$id]);
    }

    /**
     * Stores the publish error on a change-log row; the row stays unprocessed.
     */
    private function markCDCError(string $cdcTable, int $id, string $error): void
    {
        $sql = "UPDATE {$cdcTable} SET error_message = ? WHERE id = ?";
        $stmt = $this->db->prepare($sql);
        $stmt->execute([$error, $id]);
    }

    /**
     * Closes the channel, then the connection.
     */
    public function close(): void
    {
        $this->rabbitChannel->close();
        $this->rabbitConnection->close();
    }
}
// Usage example: poll the orders change log once per second.
// The loop has no exit condition, so it only stops when an exception is
// thrown or the process is terminated.
$db = DatabaseConfig::getMySQLConnection();
$rabbit = RabbitMQConfig::getConnection();
$cdc = new CDCProcessor($db, $rabbit);

for (;;) {
    $processed = $cdc->processCDCTable('orders');
    echo "处理了 {$processed} 条 CDC 记录\n";
    sleep(1);
}
$cdc->close();
数据库消费者实现
php
<?php
/**
 * Consumes order events from RabbitMQ and applies them to MySQL.
 *
 * Each message is handled in its own database transaction and acknowledged
 * only after the commit. A message whose body is not a JSON object is
 * rejected without requeue; a message whose processing fails is requeued.
 */
class DatabaseConsumer
{
    /** MySQL connection the events are applied to. */
    private PDO $db;

    /** Channel opened on the RabbitMQ connection in the constructor. */
    private $rabbitChannel;

    /** RabbitMQ connection; closed by close(). */
    private $rabbitConnection;

    /**
     * @param PDO                  $db               MySQL connection.
     * @param AMQPStreamConnection $rabbitConnection Open RabbitMQ connection.
     */
    public function __construct(PDO $db, AMQPStreamConnection $rabbitConnection)
    {
        $this->db = $db;
        $this->rabbitConnection = $rabbitConnection;
        $this->rabbitChannel = $rabbitConnection->channel();
    }

    /**
     * Consumes the given queue with manual acknowledgements and a prefetch
     * of 10, blocking for as long as the channel is consuming.
     *
     * @param string $queue Name of an existing queue.
     */
    public function consumeOrderEvents(string $queue): void
    {
        $this->rabbitChannel->basic_qos(0, 10, false);

        // Arguments after the consumer tag: no_local, no_ack, exclusive, nowait.
        $this->rabbitChannel->basic_consume(
            $queue,
            '',
            false,
            false,
            false,
            false,
            function ($message) {
                $this->handleOrderEvent($message);
            }
        );

        while ($this->rabbitChannel->is_consuming()) {
            $this->rabbitChannel->wait();
        }
    }

    /**
     * Reads the `event_type` application header of a message.
     *
     * The headers arrive as a table object rather than an array, so they are
     * converted with getNativeData() before indexing.
     *
     * @return string Event type, or '' when the header is missing.
     */
    private function extractEventType($message): string
    {
        if (!$message->has('application_headers')) {
            return '';
        }

        $headers = $message->get('application_headers');
        if (is_object($headers) && method_exists($headers, 'getNativeData')) {
            $headers = $headers->getNativeData();
        }

        return is_array($headers) ? (string) ($headers['event_type'] ?? '') : '';
    }

    /**
     * Applies one order event inside a transaction and settles the message.
     *
     * - Body is not a JSON object: rejected without requeue, because a
     *   redelivery can never succeed.
     * - Processing fails: the transaction is rolled back and only this
     *   message is requeued.
     * - Success: acknowledged after the commit.
     */
    private function handleOrderEvent($message): void
    {
        $data = json_decode($message->getBody(), true);
        if (!is_array($data)) {
            error_log("处理订单事件失败: 消息体不是有效的 JSON 对象");
            $message->reject(false);

            return;
        }

        try {
            $eventType = $this->extractEventType($message);

            $this->db->beginTransaction();

            switch ($eventType) {
                case 'OrderCreated':
                    $this->handleOrderCreated($data);
                    break;
                case 'OrderUpdated':
                    $this->handleOrderUpdated($data);
                    break;
                case 'OrderDeleted':
                    $this->handleOrderDeleted($data);
                    break;
                default:
                    $this->handleUnknownEvent($eventType, $data);
            }

            $this->db->commit();
        } catch (Throwable $e) {
            if ($this->db->inTransaction()) {
                $this->db->rollBack();
            }

            // The message-level signature is nack($requeue, $multiple):
            // requeue this single delivery and leave other in-flight
            // deliveries untouched.
            $message->nack(true);
            error_log("处理订单事件失败: " . $e->getMessage());

            return;
        }

        $message->ack();
    }

    /**
     * Upserts the order and refreshes the customer's order summary.
     *
     * @param array $data Keys: orderId, customerId, amount, status, createdAt.
     */
    private function handleOrderCreated(array $data): void
    {
        $sql = "
INSERT INTO orders (id, customer_id, amount, status, created_at)
VALUES (?, ?, ?, ?, ?)
ON DUPLICATE KEY UPDATE
customer_id = VALUES(customer_id),
amount = VALUES(amount),
status = VALUES(status)
";
        $stmt = $this->db->prepare($sql);
        $stmt->execute([
            $data['orderId'],
            $data['customerId'],
            $data['amount'],
            $data['status'],
            $data['createdAt'],
        ]);

        $this->updateOrderSummary($data['customerId']);
    }

    /**
     * Updates an existing order.
     *
     * @param array $data Keys: orderId, customerId, amount, status, updatedAt.
     */
    private function handleOrderUpdated(array $data): void
    {
        $sql = "
UPDATE orders
SET customer_id = ?, amount = ?, status = ?, updated_at = ?
WHERE id = ?
";
        $stmt = $this->db->prepare($sql);
        $stmt->execute([
            $data['customerId'],
            $data['amount'],
            $data['status'],
            $data['updatedAt'],
            $data['orderId'],
        ]);
    }

    /**
     * Deletes an order.
     *
     * @param array $data Keys: orderId.
     */
    private function handleOrderDeleted(array $data): void
    {
        $sql = "DELETE FROM orders WHERE id = ?";
        $stmt = $this->db->prepare($sql);
        $stmt->execute([$data['orderId']]);
    }

    /**
     * Logs an event type this consumer does not handle. The caller still
     * commits and acknowledges the message.
     */
    private function handleUnknownEvent(string $eventType, array $data): void
    {
        error_log("未知事件类型: {$eventType}");
    }

    /**
     * Recomputes order count and total amount for one customer.
     */
    private function updateOrderSummary(string $customerId): void
    {
        $sql = "
INSERT INTO customer_order_summary (customer_id, total_orders, total_amount)
SELECT customer_id, COUNT(*), SUM(amount)
FROM orders
WHERE customer_id = ?
ON DUPLICATE KEY UPDATE
total_orders = VALUES(total_orders),
total_amount = VALUES(total_amount)
";
        $stmt = $this->db->prepare($sql);
        $stmt->execute([$customerId]);
    }

    /**
     * Closes the channel, then the connection.
     */
    public function close(): void
    {
        $this->rabbitChannel->close();
        $this->rabbitConnection->close();
    }
}
// Usage example: apply events from the order.events queue to MySQL.
// consumeOrderEvents() blocks for as long as the channel is consuming.
$db = DatabaseConfig::getMySQLConnection();
$rabbit = RabbitMQConfig::getConnection();
$consumer = new DatabaseConsumer($db, $rabbit);
$consumer->consumeOrderEvents('order.events');
$consumer->close();
批量数据同步实现
php
<?php
/**
 * Copies a table from a source database to a target database in batches
 * and publishes one progress event per batch to RabbitMQ.
 *
 * The upsert statement is chosen by the target's PDO driver: ON CONFLICT
 * for pgsql and sqlite, ON DUPLICATE KEY UPDATE otherwise (MySQL).
 */
class BatchDataSync
{
    /** Database rows are read from. */
    private PDO $sourceDb;

    /** Database rows are upserted into. */
    private PDO $targetDb;

    /** Channel opened on the RabbitMQ connection in the constructor. */
    private $rabbitChannel;

    /** RabbitMQ connection; closed by close(). */
    private $rabbitConnection;

    /**
     * Opens a channel and declares the `sync.events` exchange used for
     * progress events.
     *
     * NOTE(review): the exchange is declared as a durable topic exchange to
     * match the other exchanges in this tutorial. If it already exists with
     * different settings, the declaration fails — confirm against the broker.
     *
     * @param PDO                  $sourceDb         Connection to read from.
     * @param PDO                  $targetDb         Connection to write to.
     * @param AMQPStreamConnection $rabbitConnection Open RabbitMQ connection.
     */
    public function __construct(
        PDO $sourceDb,
        PDO $targetDb,
        AMQPStreamConnection $rabbitConnection
    ) {
        $this->sourceDb = $sourceDb;
        $this->targetDb = $targetDb;
        $this->rabbitConnection = $rabbitConnection;
        $this->rabbitChannel = $rabbitConnection->channel();

        // Publishing to an undeclared exchange closes the channel.
        // Arguments after the type: passive = false, durable = true, auto_delete = false.
        $this->rabbitChannel->exchange_declare('sync.events', 'topic', false, true, false);
    }

    /**
     * Copies every row of $table to the target database.
     *
     * Pages by primary key (WHERE key > last seen key) instead of OFFSET, so
     * each page costs the same and concurrent inserts or deletes in the
     * source do not shift the window.
     *
     * @param string $table      Table name, identical on both sides.
     * @param string $primaryKey Unique, orderable key column; also the conflict
     *                           target of the upsert.
     * @param int    $batchSize  Rows per batch and per target transaction.
     *
     * @return int Total number of rows written.
     *
     * @throws InvalidArgumentException When a table or column name is not a plain identifier.
     * @throws RuntimeException         When fetched rows do not contain $primaryKey.
     */
    public function syncTable(
        string $table,
        string $primaryKey = 'id',
        int $batchSize = 1000
    ): int {
        $this->assertIdentifier($table);
        $this->assertIdentifier($primaryKey);

        $firstPage = $this->sourceDb->prepare(
            "SELECT * FROM {$table} ORDER BY {$primaryKey} LIMIT ?"
        );
        $nextPage = $this->sourceDb->prepare(
            "SELECT * FROM {$table} WHERE {$primaryKey} > ? ORDER BY {$primaryKey} LIMIT ?"
        );

        $lastKey = null;
        $offset = 0;
        $totalSynced = 0;

        while (true) {
            if ($lastKey === null) {
                $stmt = $firstPage;
                $stmt->bindValue(1, $batchSize, PDO::PARAM_INT);
            } else {
                $stmt = $nextPage;
                $stmt->bindValue(1, $lastKey, is_int($lastKey) ? PDO::PARAM_INT : PDO::PARAM_STR);
                $stmt->bindValue(2, $batchSize, PDO::PARAM_INT);
            }

            $stmt->execute();
            $rows = $stmt->fetchAll();
            if (empty($rows)) {
                break;
            }

            $synced = $this->syncBatch($table, $rows, $primaryKey);
            $totalSynced += $synced;
            $this->publishSyncEvent($table, $synced, $offset);
            $offset += count($rows);

            $lastRow = $rows[count($rows) - 1];
            if (!array_key_exists($primaryKey, $lastRow)) {
                throw new RuntimeException("查询结果中缺少主键列: {$primaryKey}");
            }
            $lastKey = $lastRow[$primaryKey];

            // A short page is the last page; skip the extra empty query.
            if (count($rows) < $batchSize) {
                break;
            }
        }

        return $totalSynced;
    }

    /**
     * Upserts one batch in a single target transaction.
     *
     * The statement is prepared once per batch from the first row's columns.
     *
     * @param string $table      Target table.
     * @param array  $rows       Non-empty list of associative rows with identical keys.
     * @param string $primaryKey Conflict target for the upsert.
     *
     * @return int Number of rows written.
     */
    private function syncBatch(string $table, array $rows, string $primaryKey = 'id'): int
    {
        if ($rows === []) {
            return 0;
        }

        $sql = $this->buildUpsertSql(
            (string) $this->targetDb->getAttribute(PDO::ATTR_DRIVER_NAME),
            $table,
            array_keys(reset($rows)),
            $primaryKey
        );

        $synced = 0;
        $this->targetDb->beginTransaction();

        try {
            $stmt = $this->targetDb->prepare($sql);
            foreach ($rows as $row) {
                $stmt->execute(array_values($row));
                $synced++;
            }
            $this->targetDb->commit();
        } catch (Throwable $e) {
            if ($this->targetDb->inTransaction()) {
                $this->targetDb->rollBack();
            }

            throw $e;
        }

        return $synced;
    }

    /**
     * Builds the upsert statement for the target driver.
     *
     * @param string $driver     PDO driver name of the target connection.
     * @param string $table      Target table.
     * @param array  $columns    Column names, in the order values are bound.
     * @param string $primaryKey Conflict target (used by pgsql and sqlite).
     *
     * @return string SQL with one positional placeholder per column.
     */
    private function buildUpsertSql(string $driver, string $table, array $columns, string $primaryKey): string
    {
        $this->assertIdentifier($table);
        $this->assertIdentifier($primaryKey);
        foreach ($columns as $column) {
            $this->assertIdentifier((string) $column);
        }

        $columnList = implode(',', $columns);
        $placeholders = implode(',', array_fill(0, count($columns), '?'));
        $insert = "INSERT INTO {$table} ({$columnList}) VALUES ({$placeholders})";

        if ($driver === 'pgsql' || $driver === 'sqlite') {
            $assignments = [];
            foreach ($columns as $column) {
                if ($column !== $primaryKey) {
                    $assignments[] = "{$column} = EXCLUDED.{$column}";
                }
            }

            if ($assignments === []) {
                return "{$insert} ON CONFLICT ({$primaryKey}) DO NOTHING";
            }

            return "{$insert} ON CONFLICT ({$primaryKey}) DO UPDATE SET " . implode(',', $assignments);
        }

        $updateClauses = [];
        foreach ($columns as $column) {
            $updateClauses[] = "{$column} = VALUES({$column})";
        }

        return "{$insert} ON DUPLICATE KEY UPDATE " . implode(',', $updateClauses);
    }

    /**
     * Rejects names that are not plain SQL identifiers, because table and
     * column names are interpolated into statements.
     *
     * @throws InvalidArgumentException
     */
    private function assertIdentifier(string $name): void
    {
        if (preg_match('/^[A-Za-z_][A-Za-z0-9_]*$/', $name) !== 1) {
            throw new InvalidArgumentException("非法的 SQL 标识符: {$name}");
        }
    }

    /**
     * Publishes a persistent progress event for one batch with routing key
     * `sync.completed`.
     *
     * @param string $table  Synced table.
     * @param int    $count  Rows written in this batch.
     * @param int    $offset Number of rows synced before this batch.
     */
    private function publishSyncEvent(string $table, int $count, int $offset): void
    {
        $event = [
            'table' => $table,
            'recordsSynced' => $count,
            'offset' => $offset,
            'timestamp' => date('c'),
        ];

        $message = new AMQPMessage(
            json_encode($event),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            ]
        );

        $this->rabbitChannel->basic_publish($message, 'sync.events', 'sync.completed');
    }

    /**
     * Closes the channel, then the connection.
     */
    public function close(): void
    {
        $this->rabbitChannel->close();
        $this->rabbitConnection->close();
    }
}
// Usage example: copy the orders table from MySQL to PostgreSQL
// in batches of 1000 rows, keyed by the id column.
$sourceDb = DatabaseConfig::getMySQLConnection();
$targetDb = DatabaseConfig::getPostgreSQLConnection();
$rabbit = RabbitMQConfig::getConnection();
$sync = new BatchDataSync($sourceDb, $targetDb, $rabbit);
$totalSynced = $sync->syncTable('orders', 'id', 1000);
echo "同步完成: {$totalSynced} 条记录\n";
$sync->close();
实际应用场景
场景一:订单状态同步
php
<?php
class OrderStatusSync
{
private PDO $db;
private $rabbitChannel;
/**
 * Opens a channel and declares the durable `order.status` direct exchange.
 *
 * @param PDO                  $db     Connection holding the orders table.
 * @param AMQPStreamConnection $rabbit Open RabbitMQ connection.
 */
public function __construct(PDO $db, AMQPStreamConnection $rabbit)
{
    $this->db = $db;

    $channel = $rabbit->channel();
    $this->rabbitChannel = $channel;

    // Arguments after the type: passive = false, durable = true, auto_delete = false.
    $channel->exchange_declare('order.status', 'direct', false, true, false);
}
/**
 * Moves an order to a new status and announces the change.
 *
 * The order row is locked, the transition is validated and the update is
 * committed first. The status event is published only after the commit, so
 * a rolled-back update can never leave a phantom event behind. If the
 * publish itself fails, the exception propagates while the database change
 * stays committed; use an outbox table when the event must not be lost.
 *
 * @param string $orderId   Order to update.
 * @param string $newStatus Target status.
 *
 * @return bool Always true when no exception is thrown.
 *
 * @throws Exception When the order does not exist or the transition is invalid.
 */
public function updateStatus(string $orderId, string $newStatus): bool
{
    $this->db->beginTransaction();

    try {
        $sql = "SELECT status FROM orders WHERE id = ? FOR UPDATE";
        $stmt = $this->db->prepare($sql);
        $stmt->execute([$orderId]);
        $order = $stmt->fetch();

        if (!$order) {
            throw new Exception("订单不存在: {$orderId}");
        }

        $oldStatus = $order['status'];
        if (!$this->isValidTransition($oldStatus, $newStatus)) {
            throw new Exception(
                "无效的状态转换: {$oldStatus} -> {$newStatus}"
            );
        }

        $sql = "UPDATE orders SET status = ?, updated_at = NOW() WHERE id = ?";
        $stmt = $this->db->prepare($sql);
        $stmt->execute([$newStatus, $orderId]);

        $this->db->commit();
    } catch (Throwable $e) {
        // Guarded so a failed commit does not raise a second exception.
        if ($this->db->inTransaction()) {
            $this->db->rollBack();
        }

        throw $e;
    }

    $this->publishStatusEvent($orderId, $oldStatus, $newStatus);

    return true;
}
/**
 * Tells whether an order may move from one status to another.
 *
 * Unknown source statuses allow no transition. The lookup is strict, so
 * only an exact string match counts.
 *
 * @param string $from Current status.
 * @param string $to   Requested status.
 *
 * @return bool True when the transition is listed for $from.
 */
private function isValidTransition(string $from, string $to): bool
{
    $transitions = [
        'PENDING' => ['CONFIRMED', 'CANCELLED'],
        'CONFIRMED' => ['PROCESSING', 'CANCELLED'],
        'PROCESSING' => ['SHIPPED', 'FAILED'],
        'SHIPPED' => ['DELIVERED', 'RETURNED'],
        'DELIVERED' => ['RETURNED'],
        'RETURNED' => ['REFUNDED'],
        'CANCELLED' => [],
        'FAILED' => ['PENDING'],
        'REFUNDED' => [],
    ];

    return in_array($to, $transitions[$from] ?? [], true);
}
/**
 * Publishes a persistent status-change event to the `order.status`
 * exchange with routing key "order.status.<newStatus>".
 *
 * @param string $orderId   Order that changed.
 * @param string $oldStatus Status before the change.
 * @param string $newStatus Status after the change.
 */
private function publishStatusEvent(
    string $orderId,
    string $oldStatus,
    string $newStatus
): void {
    $body = json_encode([
        'orderId' => $orderId,
        'oldStatus' => $oldStatus,
        'newStatus' => $newStatus,
        'timestamp' => date('c'),
    ]);

    $properties = [
        'content_type' => 'application/json',
        'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
    ];

    $routingKey = "order.status.{$newStatus}";

    $this->rabbitChannel->basic_publish(
        new AMQPMessage($body, $properties),
        'order.status',
        $routingKey
    );
}
}
场景二:库存同步
php
<?php
class InventorySync
{
private PDO $db;
private $rabbitChannel;
/**
 * Opens a channel and declares the `inventory.events` exchange that
 * inventory events are published to.
 *
 * NOTE(review): the exchange is declared as a durable topic exchange to fit
 * the "inventory.<action>" routing keys. If it already exists with different
 * settings, the declaration fails — confirm against the broker.
 *
 * @param PDO                  $db     Connection holding the inventory tables.
 * @param AMQPStreamConnection $rabbit Open RabbitMQ connection.
 */
public function __construct(PDO $db, AMQPStreamConnection $rabbit)
{
    $this->db = $db;
    $this->rabbitChannel = $rabbit->channel();

    // Publishing to an undeclared exchange closes the channel.
    // Arguments after the type: passive = false, durable = true, auto_delete = false.
    $this->rabbitChannel->exchange_declare('inventory.events', 'topic', false, true, false);
}
/**
 * Reserves stock for every item of an order in one transaction.
 *
 * Items are validated before anything is written: a non-positive quantity
 * would otherwise pass the `quantity >= ?` check and increase stock. The
 * `reserved` event is published only after the commit, so a rolled-back
 * reservation never produces an event. If the publish fails, the exception
 * propagates while the reservation stays committed.
 *
 * @param string $orderId Order the stock is reserved for.
 * @param array  $items   List of items, each with productId and a positive quantity.
 *
 * @return bool Always true when no exception is thrown.
 *
 * @throws InvalidArgumentException When an item lacks productId or a positive quantity.
 * @throws Exception                When stock is insufficient for an item.
 */
public function reserveInventory(string $orderId, array $items): bool
{
    foreach ($items as $item) {
        $quantity = is_array($item) ? ($item['quantity'] ?? null) : null;
        if (!is_array($item) || !isset($item['productId']) || !is_numeric($quantity) || $quantity <= 0) {
            throw new InvalidArgumentException("无效的库存预留项: 需要 productId 和大于 0 的 quantity");
        }
    }

    $this->db->beginTransaction();

    try {
        $sql = "
UPDATE inventory
SET quantity = quantity - ?,
reserved = reserved + ?,
updated_at = NOW()
WHERE product_id = ? AND quantity >= ?
";
        // Prepared once and reused for every item.
        $stmt = $this->db->prepare($sql);

        foreach ($items as $item) {
            $stmt->execute([
                $item['quantity'],
                $item['quantity'],
                $item['productId'],
                $item['quantity']
            ]);

            // No affected row means the product is unknown or stock is too low.
            if ($stmt->rowCount() === 0) {
                throw new Exception(
                    "库存不足: {$item['productId']}"
                );
            }

            $this->recordReservation($orderId, $item);
        }

        $this->db->commit();
    } catch (Throwable $e) {
        if ($this->db->inTransaction()) {
            $this->db->rollBack();
        }

        throw $e;
    }

    $this->publishInventoryEvent($orderId, $items, 'reserved');

    return true;
}
/**
 * Returns all reserved stock of an order and marks its reservations released.
 *
 * The reservations are read with FOR UPDATE so two concurrent releases of
 * the same order cannot both credit the stock back. The `released` event is
 * published only after the commit. If the publish fails, the exception
 * propagates while the release stays committed.
 *
 * @param string $orderId Order whose reservations are released.
 *
 * @return bool Always true when no exception is thrown.
 */
public function releaseInventory(string $orderId): bool
{
    $this->db->beginTransaction();

    try {
        $sql = "
SELECT product_id, quantity
FROM inventory_reservations
WHERE order_id = ? AND status = 'reserved'
FOR UPDATE
";
        $stmt = $this->db->prepare($sql);
        $stmt->execute([$orderId]);
        $reservations = $stmt->fetchAll();

        $sql = "
UPDATE inventory
SET quantity = quantity + ?,
reserved = reserved - ?,
updated_at = NOW()
WHERE product_id = ?
";
        // Prepared once and reused for every reservation.
        $restore = $this->db->prepare($sql);
        foreach ($reservations as $reservation) {
            $restore->execute([
                $reservation['quantity'],
                $reservation['quantity'],
                $reservation['product_id']
            ]);
        }

        $sql = "
UPDATE inventory_reservations
SET status = 'released', released_at = NOW()
WHERE order_id = ? AND status = 'reserved'
";
        $stmt = $this->db->prepare($sql);
        $stmt->execute([$orderId]);

        $this->db->commit();
    } catch (Throwable $e) {
        if ($this->db->inTransaction()) {
            $this->db->rollBack();
        }

        throw $e;
    }

    $this->publishInventoryEvent($orderId, $reservations, 'released');

    return true;
}
/**
 * Inserts a `reserved` row for one order item into inventory_reservations.
 *
 * @param string $orderId Order the reservation belongs to.
 * @param array  $item    Keys: productId, quantity.
 */
private function recordReservation(string $orderId, array $item): void
{
    $insert = $this->db->prepare("
INSERT INTO inventory_reservations
(order_id, product_id, quantity, status, created_at)
VALUES (?, ?, ?, 'reserved', NOW())
");

    $params = [$orderId, $item['productId'], $item['quantity']];
    $insert->execute($params);
}
/**
 * Publishes a persistent inventory event to the `inventory.events`
 * exchange with routing key "inventory.<action>".
 *
 * @param string $orderId Order the event refers to.
 * @param array  $items   Items or reservation rows to include in the event.
 * @param string $action  Action name, used in the payload and routing key.
 */
private function publishInventoryEvent(
    string $orderId,
    array $items,
    string $action
): void {
    $body = json_encode([
        'orderId' => $orderId,
        'action' => $action,
        'items' => $items,
        'timestamp' => date('c'),
    ]);

    $properties = [
        'content_type' => 'application/json',
        'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
    ];

    $routingKey = "inventory.{$action}";

    $this->rabbitChannel->basic_publish(
        new AMQPMessage($body, $properties),
        'inventory.events',
        $routingKey
    );
}
}
常见问题与解决方案
问题一:消息与数据库事务不一致
症状: 消息发送成功但数据库事务回滚,或数据库事务成功但消息发送失败
解决方案: 使用 Outbox Pattern
php
$outbox->executeInTransaction(
function (PDO $db) {
// 数据库操作
},
[
// 事件列表
]
);
问题二:消息重复消费
症状: 同一条消息被多次处理
解决方案: 实现幂等性
php
/**
 * Tells whether a message id is already recorded in processed_messages.
 *
 * @param string $messageId Message identifier to look up.
 *
 * @return bool True when at least one matching row exists.
 */
private function isProcessed(string $messageId): bool
{
    $lookup = $this->db->prepare("SELECT COUNT(*) FROM processed_messages WHERE message_id = ?");
    $lookup->execute([$messageId]);
    $matches = $lookup->fetchColumn();

    return $matches > 0;
}
private function markAsProcessed(string $messageId): void
{
$sql = "INSERT INTO processed_messages (message_id, processed_at) VALUES (?, NOW())";
$stmt = $this->db->prepare($sql);
$stmt->execute([$messageId]);
}
问题三:批量操作性能问题
症状: 大批量数据同步时性能下降
解决方案: 使用批量插入和事务优化
php
private function batchInsert(string $table, array $rows): void
{
$columns = array_keys($rows[0]);
$placeholders = '(' . implode(',', array_fill(0, count($columns), '?')) . ')';
$allPlaceholders = implode(',', array_fill(0, count($rows), $placeholders));
$sql = "INSERT INTO {$table} (" . implode(',', $columns) . ") VALUES {$allPlaceholders}";
$values = [];
foreach ($rows as $row) {
$values = array_merge($values, array_values($row));
}
$stmt = $this->db->prepare($sql);
$stmt->execute($values);
}
最佳实践建议
1. 连接池管理
php
class ConnectionPool
{
private static ?PDO $dbConnection = null;
private static ?AMQPStreamConnection $rabbitConnection = null;
/**
 * Returns the shared MySQL connection, creating it on first use.
 *
 * @return PDO The same instance on every call within the process.
 */
public static function getDbConnection(): PDO
{
    return self::$dbConnection ??= DatabaseConfig::getMySQLConnection();
}
/**
 * Returns the shared RabbitMQ connection, creating it on first use.
 *
 * @return AMQPStreamConnection The same instance on every call within the process.
 */
public static function getRabbitConnection(): AMQPStreamConnection
{
    return self::$rabbitConnection ??= RabbitMQConfig::getConnection();
}
}
2. 错误处理
php
class ErrorHandler
{
/**
 * Writes an exception and its context to the PHP error log.
 *
 * The entry holds the current time, the exception message, file and line,
 * followed by the JSON-encoded context on a second line.
 *
 * @param Exception $e       Exception to log.
 * @param array     $context Extra data to include in the entry.
 */
public static function handle(Exception $e, array $context = []): void
{
    $timestamp = date('Y-m-d H:i:s');
    $encodedContext = json_encode($context);

    $entry = sprintf(
        "[%s] %s in %s:%d\nContext: %s",
        $timestamp,
        $e->getMessage(),
        $e->getFile(),
        $e->getLine(),
        $encodedContext
    );

    error_log($entry);
}
}
3. 监控指标
php
class Metrics
{
/**
 * Records a numeric measurement.
 *
 * Placeholder: the body is empty and is meant to forward the value to a
 * monitoring system.
 *
 * @param string $metric Metric name.
 * @param float  $value  Measured value.
 * @param array  $tags   Optional tags for the measurement.
 */
public static function record(string $metric, float $value, array $tags = []): void
{
// Send to the monitoring system
}
/**
 * Increments a counter.
 *
 * Placeholder: the body is empty.
 *
 * @param string $counter Counter name.
 * @param array  $tags    Optional tags for the counter.
 */
public static function increment(string $counter, array $tags = []): void
{
// Increment the counter
}
}
版本兼容性
| PHP | PDO MySQL | PDO PostgreSQL | php-amqplib | RabbitMQ Server |
|---|---|---|---|---|
| 8.2+ | 8.2+ | 8.2+ | 3.x | 3.11+ |
| 8.1+ | 8.1+ | 8.1+ | 3.x | 3.10+ |
| 8.0+ | 8.0+ | 8.0+ | 2.x/3.x | 3.9+ |
| 7.4+ | 7.4+ | 7.4+ | 2.x | 3.8+ |
