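Distributed Transaction Processing
Overview
In a microservice architecture, distributed transactions are a core challenge. RabbitMQ can serve as the messaging backbone for coordinating them: services exchange commands and confirmations as messages and converge on eventual consistency, which avoids the lock contention and complexity of traditional two-phase commit.
Business background and requirements
Scenario
The order flow of an e-commerce platform involves several services: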
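| Service | Operation | Database | Characteristics |
|---|---|---|---|
| Order service | Create order | MySQL | Main business entry point |
| Inventory service | Deduct inventory | MySQL | Requires strong consistency |
| Points service | Add points | MySQL | Can be processed asynchronously |
| Coupon service | Redeem coupon | MySQL | Can be processed asynchronously |
| Payment service | Create payment order | MySQL | Requires transactional guarantees |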
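Pain point analysis
Problems with traditional distributed transactions:
1. Two-phase commit (2PC) performs poorly and holds resource locks for a long time
2. The Saga pattern is complex to implement and requires compensation logic
3. Cross-service transactions are hard to coordinate
4. Data becomes inconsistent during network partitions
Requirements and goals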
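| Goal | Metric |
|---|---|
| Consistency | Eventual consistency, no data loss |
| Availability | Service availability of 99.9% |
| Performance | Transaction processing time < 500 ms |
| Reliability | Message reliability of 99.99% |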
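Architecture design
Overall architecture diagram
```mermaid
graph TB
    subgraph "Client"
        A[User request]
    end
    subgraph "Transaction coordination layer"
        B[Transaction manager]
        C[Transaction log]
        D[Compensation manager]
    end
    subgraph "RabbitMQ"
        E[Transaction exchange<br/>tx.exchange]
        subgraph "Transaction queues"
            F[Transaction command queue<br/>tx.command]
            G[Transaction confirm queue<br/>tx.confirm]
            H[Transaction rollback queue<br/>tx.rollback]
        end
        I[Dead-letter queue<br/>tx.dlq]
    end
    subgraph "Participating services"
        J[Order service]
        K[Inventory service]
        L[Points service]
        M[Coupon service]
    end
    subgraph "State storage"
        N[Transaction state table]
        O[Message log table]
    end
    A --> B
    B --> C
    B --> E
    E --> F
    E --> G
    E --> H
    F --> J
    F --> K
    F --> L
    F --> M
    G --> B
    H --> D
    D --> E
    J --> N
    K --> N
    L --> N
    M --> N
    B --> O
    F -.-> I
    G -.-> I
    H -.-> I
```
Transaction flow diagram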
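```mermaid
sequenceDiagram
    participant Client as Client
    participant TM as Transaction manager
    participant MQ as RabbitMQ
    participant Order as Order service
    participant Inventory as Inventory service
    participant Points as Points service
    Client->>TM: Start distributed transaction
    TM->>TM: Generate transaction ID
    TM->>TM: Record transaction log (pending)
    par Send transaction commands in parallel
        TM->>MQ: Send create-order command
        TM->>MQ: Send deduct-inventory command
        TM->>MQ: Send add-points command
    end
    MQ->>Order: Deliver order command
    Order->>Order: Execute local transaction
    alt Success
        Order-->>MQ: Send confirmation
    else Failure
        Order-->>MQ: Send rollback request
    end
    MQ->>Inventory: Deliver inventory command
    Inventory->>Inventory: Execute local transaction
    alt Success
        Inventory-->>MQ: Send confirmation
    else Failure
        Inventory-->>MQ: Send rollback request
    end
    MQ->>Points: Deliver points command
    Points->>Points: Execute local transaction
    Points-->>MQ: Send confirmation
    MQ-->>TM: Aggregate transaction results
    alt All succeeded
        TM->>TM: Update transaction status (committed)
        TM-->>Client: Return success
    else Any failed
        TM->>TM: Update transaction status (rollback)
        TM->>MQ: Send rollback commands
        TM-->>Client: Return failure
    end
```
Transaction state machine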
```mermaid
stateDiagram-v2
    [*] --> pending: Create transaction
    pending --> executing: Start execution
    executing --> committed: All succeeded
    executing --> rollback: Any failed
    executing --> timeout: Timed out
    rollback --> rolledback: Rollback completed
    timeout --> compensating: Run compensation
    compensating --> compensated: Compensation completed
    committed --> [*]
    rolledback --> [*]
    compensated --> [*]
    note right of pending: Waiting to execute
    note right of executing: Executing
    note right of committed: Committed
    note right of rollback: Rollback required
```
PHP code implementation
Distributed transaction manager
php
<?php
namespace App\DistributedTx;
class TransactionManager
{
private TransactionLogRepository $logRepository;
private MessagePublisher $publisher;
private CompensationManager $compensationManager;
private array $config;
public function __construct(
TransactionLogRepository $logRepository,
MessagePublisher $publisher,
CompensationManager $compensationManager,
array $config = []
) {
$this->logRepository = $logRepository;
$this->publisher = $publisher;
$this->compensationManager = $compensationManager;
$this->config = array_merge([
'timeout' => 30,
'max_retry' => 3,
], $config);
}
public function begin(array $participants, array $payload): Transaction
{
$transaction = new Transaction(
$this->generateTransactionId(),
$participants,
$payload
);
$this->logRepository->create([
'transaction_id' => $transaction->id,
'status' => 'pending',
'participants' => json_encode($participants),
'payload' => json_encode($payload),
'created_at' => date('Y-m-d H:i:s'),
]);
return $transaction;
}
public function commit(Transaction $transaction): TransactionResult
{
$this->logRepository->update($transaction->id, [
'status' => 'executing',
'started_at' => date('Y-m-d H:i:s'),
]);
$commands = $this->buildCommands($transaction);
foreach ($commands as $command) {
$this->publisher->publishTransactionCommand(
$transaction->id,
$command
);
}
$result = $this->waitForCompletion($transaction);
if ($result->isSuccess()) {
$this->logRepository->update($transaction->id, [
'status' => 'committed',
'completed_at' => date('Y-m-d H:i:s'),
]);
} else {
$this->rollback($transaction, $result);
}
return $result;
}
public function rollback(Transaction $transaction, TransactionResult $result): void
{
$this->logRepository->update($transaction->id, [
'status' => 'rollback',
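// buildResult() passes an array of per-service errors, and the usage example reads getErrors().
'error' => json_encode($result->getErrors()),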
]);
$this->compensationManager->compensate($transaction, $result);
}
private function buildCommands(Transaction $transaction): array
{
$commands = [];
foreach ($transaction->participants as $participant) {
$commands[] = new TransactionCommand(
$transaction->id,
$participant['service'],
$participant['action'],
$participant['payload'] ?? $transaction->payload,
$participant['compensate_action'] ?? null
);
}
return $commands;
}
private function waitForCompletion(Transaction $transaction): TransactionResult
{
$startTime = time();
$timeout = $this->config['timeout'];
while (true) {
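$status = $this->logRepository->getParticipantStatus($transaction->id);
// Wait until every participant has reported. An empty or partial status map
// must not count as "all completed", otherwise the first poll reports success.
if (count($status) >= count($transaction->participants) && $this->allCompleted($status)) {
return $this->buildResult($status);
}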
if (time() - $startTime > $timeout) {
return TransactionResult::timeout($status);
}
usleep(100000);
}
}
private function allCompleted(array $status): bool
{
foreach ($status as $participantStatus) {
if (!in_array($participantStatus, ['committed', 'failed'])) {
return false;
}
}
return true;
}
private function buildResult(array $status): TransactionResult
{
$allSuccess = true;
$errors = [];
foreach ($status as $service => $participantStatus) {
if ($participantStatus === 'failed') {
$allSuccess = false;
$errors[$service] = 'Transaction failed';
}
}
return $allSuccess
? TransactionResult::success()
: TransactionResult::failed($errors);
}
private function generateTransactionId(): string
{
return sprintf('tx_%s_%s', date('YmdHis'), bin2hex(random_bytes(8)));
}
}
Transaction message publisher
php
<?php
namespace App\DistributedTx;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;
class MessagePublisher
{
private AMQPStreamConnection $connection;
private $channel;
private string $exchangeName = 'tx.exchange';
private string $confirmExchange = 'tx.confirm.exchange';
private string $rollbackExchange = 'tx.rollback.exchange';
public function __construct(AMQPStreamConnection $connection)
{
$this->connection = $connection;
$this->channel = $connection->channel();
$this->setupInfrastructure();
}
private function setupInfrastructure(): void
{
$this->channel->exchange_declare(
$this->exchangeName,
AMQPExchangeType::TOPIC,
false,
true,
false
);
$this->channel->exchange_declare(
$this->confirmExchange,
AMQPExchangeType::DIRECT,
false,
true,
false
);
$this->channel->exchange_declare(
$this->rollbackExchange,
AMQPExchangeType::DIRECT,
false,
true,
false
);
$services = ['order', 'inventory', 'points', 'coupon', 'payment'];
foreach ($services as $service) {
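$args = [
'x-dead-letter-exchange' => ['S', 'tx.dlx'],
// tx.dlq is bound to tx.dlx with the key 'tx.failed'. Without this override,
// dead-lettered messages keep their original routing key and are dropped.
'x-dead-letter-routing-key' => ['S', 'tx.failed'],
'x-message-ttl' => ['I', 300000],
];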
$this->channel->queue_declare(
"tx.{$service}.command",
false,
true,
false,
false,
false,
$args
);
$this->channel->queue_bind(
"tx.{$service}.command",
$this->exchangeName,
"tx.command.{$service}"
);
}
$this->channel->queue_declare('tx.confirm', false, true, false, false);
$this->channel->queue_bind('tx.confirm', $this->confirmExchange, 'tx.confirm');
$this->channel->queue_declare('tx.rollback', false, true, false, false);
$this->channel->queue_bind('tx.rollback', $this->rollbackExchange, 'tx.rollback');
$this->channel->queue_declare('tx.dlq', false, true, false, false);
$this->channel->exchange_declare('tx.dlx', AMQPExchangeType::DIRECT, false, true, false);
$this->channel->queue_bind('tx.dlq', 'tx.dlx', 'tx.failed');
}
public function publishTransactionCommand(string $transactionId, TransactionCommand $command): void
{
$routingKey = "tx.command.{$command->service}";
$message = new AMQPMessage(
json_encode([
'transaction_id' => $transactionId,
'command_id' => $command->id,
'service' => $command->service,
'action' => $command->action,
'payload' => $command->payload,
'compensate_action' => $command->compensateAction,
'timestamp' => time(),
]),
[
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'message_id' => $command->id,
'correlation_id' => $transactionId,
]
);
$this->channel->basic_publish(
$message,
$this->exchangeName,
$routingKey
);
}
public function publishConfirm(string $transactionId, string $service, string $commandId, bool $success, array $result = []): void
{
$message = new AMQPMessage(
json_encode([
'transaction_id' => $transactionId,
'command_id' => $commandId,
'service' => $service,
'success' => $success,
'result' => $result,
'timestamp' => time(),
]),
[
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'correlation_id' => $transactionId,
]
);
$this->channel->basic_publish(
$message,
$this->confirmExchange,
'tx.confirm'
);
}
public function publishRollback(string $transactionId, array $commands): void
{
$message = new AMQPMessage(
json_encode([
'transaction_id' => $transactionId,
'commands' => $commands,
'timestamp' => time(),
]),
[
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'correlation_id' => $transactionId,
]
);
$this->channel->basic_publish(
$message,
$this->rollbackExchange,
'tx.rollback'
);
}
public function close(): void
{
if ($this->channel) {
$this->channel->close();
}
}
}
Transaction participant implementation
php
<?php
namespace App\DistributedTx;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
abstract class TransactionParticipant
{
protected AMQPStreamConnection $connection;
protected $channel;
protected MessagePublisher $publisher;
protected TransactionLogRepository $logRepository;
protected string $serviceName;
protected string $queueName;
protected bool $running = true;
public function __construct(
AMQPStreamConnection $connection,
MessagePublisher $publisher,
TransactionLogRepository $logRepository
) {
$this->connection = $connection;
$this->channel = $connection->channel();
$this->publisher = $publisher;
$this->logRepository = $logRepository;
$this->queueName = "tx.{$this->serviceName}.command";
}
public function listen(): void
{
$this->channel->basic_qos(null, 1, null);
$this->channel->basic_consume(
$this->queueName,
'',
false,
false,
false,
false,
[$this, 'handleCommand']
);
while ($this->running && count($this->channel->callbacks)) {
$this->channel->wait(null, true);
if (!$this->running) break;
usleep(100000);
}
}
public function handleCommand(AMQPMessage $message): void
{
$data = json_decode($message->body, true);
$transactionId = $data['transaction_id'];
$commandId = $data['command_id'];
$action = $data['action'];
$payload = $data['payload'];
$this->logRepository->recordParticipantStatus(
$transactionId,
$this->serviceName,
'executing'
);
try {
$result = $this->execute($action, $payload, $transactionId);
$this->logRepository->recordParticipantStatus(
$transactionId,
$this->serviceName,
'committed',
$result
);
$this->publisher->publishConfirm(
$transactionId,
$this->serviceName,
$commandId,
true,
$result
);
$message->ack();
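} catch (\Throwable $e) {
// Catch \Throwable so that \Error subclasses (for example a TypeError caused by
// a malformed payload) are also reported as failures instead of crashing the worker.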
$this->logRepository->recordParticipantStatus(
$transactionId,
$this->serviceName,
'failed',
['error' => $e->getMessage()]
);
$this->publisher->publishConfirm(
$transactionId,
$this->serviceName,
$commandId,
false,
['error' => $e->getMessage()]
);
$message->ack();
}
}
abstract protected function execute(string $action, array $payload, string $transactionId): array;
abstract protected function compensate(string $action, array $payload, string $transactionId): array;
public function stop(): void
{
$this->running = false;
}
}
class OrderParticipant extends TransactionParticipant
{
protected string $serviceName = 'order';
private OrderRepository $orderRepository;
public function __construct(
AMQPStreamConnection $connection,
MessagePublisher $publisher,
TransactionLogRepository $logRepository,
OrderRepository $orderRepository
) {
parent::__construct($connection, $publisher, $logRepository);
$this->orderRepository = $orderRepository;
}
protected function execute(string $action, array $payload, string $transactionId): array
{
switch ($action) {
case 'create':
return $this->createOrder($payload, $transactionId);
case 'update_status':
return $this->updateStatus($payload, $transactionId);
default:
throw new \InvalidArgumentException("Unknown action: {$action}");
}
}
protected function compensate(string $action, array $payload, string $transactionId): array
{
switch ($action) {
case 'create':
return $this->cancelOrder($payload, $transactionId);
default:
return [];
}
}
private function createOrder(array $payload, string $transactionId): array
{
$orderData = [
'order_id' => $payload['order_id'],
'user_id' => $payload['user_id'],
'items' => $payload['items'],
'total_amount' => $payload['total_amount'],
'status' => 'pending',
'transaction_id' => $transactionId,
'created_at' => date('Y-m-d H:i:s'),
];
$this->orderRepository->create($orderData);
return ['order_id' => $payload['order_id']];
}
private function updateStatus(array $payload, string $transactionId): array
{
$this->orderRepository->update($payload['order_id'], [
'status' => $payload['status'],
'updated_at' => date('Y-m-d H:i:s'),
]);
return ['status' => $payload['status']];
}
private function cancelOrder(array $payload, string $transactionId): array
{
$this->orderRepository->update($payload['order_id'], [
'status' => 'cancelled',
'cancel_reason' => 'Transaction rollback',
'updated_at' => date('Y-m-d H:i:s'),
]);
return ['status' => 'cancelled'];
}
}
class InventoryParticipant extends TransactionParticipant
{
protected string $serviceName = 'inventory';
private InventoryRepository $inventoryRepository;
public function __construct(
AMQPStreamConnection $connection,
MessagePublisher $publisher,
TransactionLogRepository $logRepository,
InventoryRepository $inventoryRepository
) {
parent::__construct($connection, $publisher, $logRepository);
$this->inventoryRepository = $inventoryRepository;
}
protected function execute(string $action, array $payload, string $transactionId): array
{
switch ($action) {
case 'deduct':
return $this->deductInventory($payload, $transactionId);
case 'reserve':
return $this->reserveInventory($payload, $transactionId);
default:
throw new \InvalidArgumentException("Unknown action: {$action}");
}
}
protected function compensate(string $action, array $payload, string $transactionId): array
{
switch ($action) {
case 'deduct':
return $this->restoreInventory($payload, $transactionId);
case 'reserve':
return $this->releaseReservation($payload, $transactionId);
default:
return [];
}
}
private function deductInventory(array $payload, string $transactionId): array
{
$productId = $payload['product_id'];
$skuId = $payload['sku_id'];
$quantity = $payload['quantity'];
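// NOTE: this read-then-deduct check is not atomic. Under concurrent commands,
// deduct() itself must enforce the limit (e.g. UPDATE ... WHERE stock >= :quantity).
$stock = $this->inventoryRepository->getStock($productId, $skuId);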
if ($stock < $quantity) {
throw new \RuntimeException("Insufficient inventory: {$productId}");
}
$this->inventoryRepository->deduct($productId, $skuId, $quantity, $transactionId);
return ['remaining_stock' => $stock - $quantity];
}
private function reserveInventory(array $payload, string $transactionId): array
{
$productId = $payload['product_id'];
$skuId = $payload['sku_id'];
$quantity = $payload['quantity'];
$reserved = $this->inventoryRepository->reserve(
$productId,
$skuId,
$quantity,
$transactionId
);
if (!$reserved) {
throw new \RuntimeException("Failed to reserve inventory: {$productId}");
}
return ['reserved' => $quantity];
}
private function restoreInventory(array $payload, string $transactionId): array
{
$this->inventoryRepository->restore(
$payload['product_id'],
$payload['sku_id'],
$payload['quantity'],
$transactionId
);
return ['restored' => $payload['quantity']];
}
private function releaseReservation(array $payload, string $transactionId): array
{
$this->inventoryRepository->releaseReservation(
$payload['product_id'],
$payload['sku_id'],
$payload['quantity'],
$transactionId
);
return ['released' => $payload['quantity']];
}
}
Compensation manager implementation
php
<?php
namespace App\DistributedTx;
class CompensationManager
{
private TransactionLogRepository $logRepository;
private MessagePublisher $publisher;
private array $participantRegistry;
public function __construct(
TransactionLogRepository $logRepository,
MessagePublisher $publisher,
array $participantRegistry = []
) {
$this->logRepository = $logRepository;
$this->publisher = $publisher;
$this->participantRegistry = $participantRegistry;
}
public function compensate(Transaction $transaction, TransactionResult $result): void
{
$executedCommands = $this->logRepository->getExecutedCommands($transaction->id);
$compensateCommands = [];
foreach (array_reverse($executedCommands) as $command) {
if ($command['status'] === 'committed') {
$compensateCommands[] = [
'service' => $command['service'],
'action' => $command['compensate_action'] ?? $this->getCompensateAction($command['action']),
'payload' => $command['payload'],
'original_command_id' => $command['command_id'],
];
}
}
if (!empty($compensateCommands)) {
$this->publisher->publishRollback($transaction->id, $compensateCommands);
}
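// NOTE: at this point the rollback message has only been published. A stricter
// implementation would set 'rolledback' after participants confirm their compensations.
$this->logRepository->update($transaction->id, [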
'status' => 'rolledback',
'completed_at' => date('Y-m-d H:i:s'),
]);
}
private function getCompensateAction(string $action): string
{
$compensateMap = [
'create' => 'cancel',
'deduct' => 'restore',
'reserve' => 'release',
'add' => 'subtract',
'use' => 'release',
];
return $compensateMap[$action] ?? 'compensate_' . $action;
}
}
Local message table implementation
php
<?php
namespace App\DistributedTx;
class LocalMessageTable
{
private $pdo;
public function __construct(\PDO $pdo)
{
$this->pdo = $pdo;
$this->initTable();
}
private function initTable(): void
{
$sql = "
CREATE TABLE IF NOT EXISTS transaction_outbox (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
message_id VARCHAR(64) NOT NULL UNIQUE,
transaction_id VARCHAR(64) NOT NULL,
target_service VARCHAR(64) NOT NULL,
message_type VARCHAR(64) NOT NULL,
payload JSON NOT NULL,
status ENUM('pending', 'sent', 'failed') DEFAULT 'pending',
retry_count INT DEFAULT 0,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
sent_at DATETIME NULL,
INDEX idx_transaction_id (transaction_id),
INDEX idx_status (status)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
";
$this->pdo->exec($sql);
}
public function insert(string $transactionId, string $targetService, string $messageType, array $payload): string
{
$messageId = $this->generateMessageId();
$sql = "
INSERT INTO transaction_outbox
(message_id, transaction_id, target_service, message_type, payload)
VALUES (?, ?, ?, ?, ?)
";
$stmt = $this->pdo->prepare($sql);
$stmt->execute([
$messageId,
$transactionId,
$targetService,
$messageType,
json_encode($payload),
]);
return $messageId;
}
public function markSent(string $messageId): void
{
$sql = "
UPDATE transaction_outbox
SET status = 'sent', sent_at = NOW()
WHERE message_id = ?
";
$stmt = $this->pdo->prepare($sql);
$stmt->execute([$messageId]);
}
public function markFailed(string $messageId): void
{
$sql = "
UPDATE transaction_outbox
SET status = 'failed', retry_count = retry_count + 1
WHERE message_id = ?
";
$stmt = $this->pdo->prepare($sql);
$stmt->execute([$messageId]);
}
public function getPendingMessages(int $limit = 100): array
{
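// Include 'failed' rows: markFailed() sets that status and increments retry_count,
// so filtering on 'pending' alone would never retry a failed message.
$sql = "
SELECT * FROM transaction_outbox
WHERE status IN ('pending', 'failed') AND retry_count < 3
ORDER BY created_at ASC
LIMIT :limit
";
$stmt = $this->pdo->prepare($sql);
// Bind LIMIT as an integer. With emulated prepares, a string-bound LIMIT is a MySQL syntax error.
$stmt->bindValue(':limit', $limit, \PDO::PARAM_INT);
$stmt->execute();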
return $stmt->fetchAll(\PDO::FETCH_ASSOC);
}
public function executeInTransaction(callable $businessLogic, callable $messageInsert): bool
{
try {
$this->pdo->beginTransaction();
$result = $businessLogic();
$messageInsert($result);
$this->pdo->commit();
return true;
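} catch (\Throwable $e) {
// Roll back on any error (including \Error), but only if the transaction is still open.
if ($this->pdo->inTransaction()) {
$this->pdo->rollBack();
}
throw $e;
}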
}
private function generateMessageId(): string
{
return sprintf('msg_%s_%s', date('YmdHis'), bin2hex(random_bytes(8)));
}
}
Complete usage example
```php
<?php
require_once 'vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use App\DistributedTx\{
    TransactionManager,
    MessagePublisher,
    CompensationManager,
    TransactionLogRepository,
    OrderParticipant
};

// OrderRepository is assumed to be autoloadable; its namespace is not shown in this article.

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$pdo = new PDO('mysql:host=localhost;dbname=ecommerce', 'root', 'password');

$logRepository = new TransactionLogRepository($pdo);
$publisher = new MessagePublisher($connection);
$compensationManager = new CompensationManager($logRepository, $publisher);
$transactionManager = new TransactionManager(
    $logRepository,
    $publisher,
    $compensationManager,
    ['timeout' => 30]
);

// Each participant declares its service, action, payload and compensating action.
$participants = [
    [
        'service' => 'order',
        'action' => 'create',
        'payload' => [
            'order_id' => 'ORD123456',
            'user_id' => 10001,
            'items' => [
                ['product_id' => 'P001', 'sku_id' => 'SKU001', 'quantity' => 2, 'price' => 99.00],
            ],
            'total_amount' => 198.00,
        ],
        'compensate_action' => 'cancel',
    ],
    [
        'service' => 'inventory',
        'action' => 'deduct',
        'payload' => [
            'product_id' => 'P001',
            'sku_id' => 'SKU001',
            'quantity' => 2,
        ],
        'compensate_action' => 'restore',
    ],
    [
        'service' => 'points',
        'action' => 'add',
        'payload' => [
            'user_id' => 10001,
            'points' => 198,
            'reason' => 'Order reward',
        ],
        'compensate_action' => 'subtract',
    ],
];

// --- Coordinator side ---
// commit() blocks until every participant has reported or the timeout elapses,
// so the participant workers must already be running in separate processes.
$transaction = $transactionManager->begin($participants, []);
echo "Transaction created: {$transaction->id}\n";

$result = $transactionManager->commit($transaction);
if ($result->isSuccess()) {
    echo "Transaction committed\n";
} else {
    echo "Transaction failed: " . json_encode($result->getErrors()) . "\n";
}

// --- Participant worker side ---
// Shown in the same listing for brevity; in practice this runs as its own process.
$orderParticipant = new OrderParticipant(
    $connection,
    $publisher,
    $logRepository,
    new OrderRepository($pdo)
);
echo "Starting order service participant...\n";

// Stop the consume loop gracefully on SIGTERM / SIGINT.
pcntl_async_signals(true);
pcntl_signal(SIGTERM, function () use ($orderParticipant) {
    echo "Received termination signal\n";
    $orderParticipant->stop();
});
pcntl_signal(SIGINT, function () use ($orderParticipant) {
    echo "Received interrupt signal\n";
    $orderParticipant->stop();
});

$orderParticipant->listen();

$publisher->close();
$connection->close();
```
Key technical points
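1. Eventual consistency
Message-driven processing provides eventual consistency rather than strong consistency:
- Each participant maintains its local transaction state
- Message acknowledgements ensure reliable delivery
- A compensation mechanism handles failure scenarios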
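The status strings written by the manager classes follow the transaction state machine shown earlier. As a minimal sketch, the transitions could be checked before each status update along these lines. `TransactionStateMachine` is a hypothetical helper, not a class from this article; its transition table is copied from the state diagram.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: encodes the transitions of the transaction state
 * machine and rejects any transition that the diagram does not allow.
 */
final class TransactionStateMachine
{
    /** Allowed transitions, copied from the state diagram. */
    private const TRANSITIONS = [
        'pending'      => ['executing'],
        'executing'    => ['committed', 'rollback', 'timeout'],
        'rollback'     => ['rolledback'],
        'timeout'      => ['compensating'],
        'compensating' => ['compensated'],
        'committed'    => [],
        'rolledback'   => [],
        'compensated'  => [],
    ];

    public static function canTransition(string $from, string $to): bool
    {
        return in_array($to, self::TRANSITIONS[$from] ?? [], true);
    }

    /** A state is terminal when it is known and has no outgoing transitions. */
    public static function isTerminal(string $state): bool
    {
        return isset(self::TRANSITIONS[$state]) && self::TRANSITIONS[$state] === [];
    }

    public static function assertTransition(string $from, string $to): void
    {
        if (!self::canTransition($from, $to)) {
            throw new LogicException("Illegal transition: {$from} -> {$to}");
        }
    }
}
```

Calling `assertTransition()` with the current and the new status before each log update would surface out-of-order confirmations early, for example a late "committed" arriving after a timeout.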
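2. Idempotency guarantee
```php
public function handleCommand(AMQPMessage $message): void
{
    $data = json_decode($message->body, true);
    $commandId = $data['command_id'];

    // Skip commands that were already processed (duplicate delivery).
    if ($this->isProcessed($commandId)) {
        $message->ack();
        return;
    }

    $this->execute($data['action'], $data['payload'], $data['transaction_id']);
    $this->markProcessed($commandId);
    $message->ack();
}
```
3. Local message table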
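Write the business change and the outgoing message record in the same local transaction, so that either both are persisted or neither is:
```php
$this->pdo->beginTransaction();
$businessResult = $this->doBusiness();
$this->insertOutboxMessage($businessResult);
$this->pdo->commit();
```
4. Compensating transactions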
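When a transaction fails, run the inverse operation of each completed step to restore the previous state:
```php
$compensateMap = [
    'create' => 'cancel',
    'deduct' => 'restore',
    'reserve' => 'release',
];
```
Performance optimization recommendations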
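| Optimization | Recommendation | Notes |
|---|---|---|
| Parallel execution | Participants process in parallel | Reduces total latency |
| Asynchronous confirmation | Wait for confirmations asynchronously | Improves throughput |
| Batch processing | Merge small transactions | Reduces network overhead |
| State caching | Cache transaction state in Redis | Reduces database load |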
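For the state-caching row, one cautious approach is to cache only terminal states, since they never change and therefore need no invalidation. The sketch below uses a plain array as a stand-in for Redis; `CachedStatusReader` and the `$loader` callable (assumed to read the status from the database) are illustrative names, not part of the code above.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical read-through cache for transaction status.
 * A plain array stands in for Redis in this sketch.
 */
final class CachedStatusReader
{
    private const TERMINAL = ['committed', 'rolledback', 'compensated'];

    /** @var array<string, string> */
    private array $cache = [];

    /** @var callable(string): string */
    private $loader;

    public function __construct(callable $loader)
    {
        $this->loader = $loader;
    }

    public function get(string $transactionId): string
    {
        if (isset($this->cache[$transactionId])) {
            return $this->cache[$transactionId];
        }
        $status = ($this->loader)($transactionId);
        // Only terminal states are cached: they never change, so no invalidation is needed.
        if (in_array($status, self::TERMINAL, true)) {
            $this->cache[$transactionId] = $status;
        }
        return $status;
    }
}
```

In-flight states such as `executing` still reach the database on every read, so this mainly helps status queries for transactions that have already finished.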
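Common problems and solutions
1. Transaction timeout
Problem: A slow participant causes the transaction to time out.
Solution:
- Set a reasonable timeout
- Implement compensation after a timeout
- Monitor participant response times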
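A deadline-based wait loop is one way to make the timeout explicit and testable. The sketch below is an assumption-laden illustration: `waitWithDeadline`, `$poll`, `$clock` and `$sleep` are hypothetical names, and the clock and sleep functions are injected so the loop can be exercised without real waiting.

```php
<?php
declare(strict_types=1);

/**
 * Polls $poll until it returns a non-null result or the deadline passes.
 * $clock returns the current time in seconds; $sleep pauses for the given seconds.
 *
 * @return array{status: string, result: mixed}
 */
function waitWithDeadline(
    callable $poll,
    float $timeoutSeconds,
    callable $clock,
    callable $sleep,
    float $intervalSeconds = 0.1
): array {
    $deadline = $clock() + $timeoutSeconds;
    while (true) {
        $result = $poll();
        if ($result !== null) {
            return ['status' => 'completed', 'result' => $result];
        }
        if ($clock() >= $deadline) {
            // The caller is expected to mark the transaction as "timeout" and start compensation.
            return ['status' => 'timeout', 'result' => null];
        }
        $sleep($intervalSeconds);
    }
}
```

In production the clock could be `fn () => microtime(true)` and the sleep function `fn (float $s) => usleep((int) ($s * 1000000))`. Recording how long each participant took to report would also feed the response-time monitoring mentioned above.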
2. Message loss
Problem: A message is lost in transit.
Solution:
- Use message persistence
- Implement a local message table
- Scan periodically and resend
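The periodic scan can be sketched as a single relay pass over pending outbox rows. `relayOutbox` and its callables are hypothetical; in this article's code they would roughly map to `LocalMessageTable::getPendingMessages()`, a publish call, `markSent()` and `markFailed()`. The `$publish` callable is assumed to throw when publishing fails.

```php
<?php
declare(strict_types=1);

/**
 * One pass of a hypothetical outbox relay: publish each pending row,
 * then record the outcome.
 *
 * @param array<int, array{message_id: string, payload: string}> $pendingRows
 * @return array{sent: int, failed: int}
 */
function relayOutbox(array $pendingRows, callable $publish, callable $markSent, callable $markFailed): array
{
    $stats = ['sent' => 0, 'failed' => 0];
    foreach ($pendingRows as $row) {
        try {
            $publish($row);
            // Mark as sent only after publishing succeeded. A crash between these two
            // steps causes a re-send on the next scan, so consumers must be idempotent.
            $markSent($row['message_id']);
            $stats['sent']++;
        } catch (\Throwable $e) {
            $markFailed($row['message_id']);
            $stats['failed']++;
        }
    }
    return $stats;
}
```

Running this pass from a cron job or a long-lived worker gives at-least-once delivery, which is why the duplicate-consumption handling below matters.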
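3. Compensation failure
Problem: A compensation operation fails.
Solution:
- Record a compensation log
- Implement a retry mechanism
- Escalate to manual intervention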
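These three measures can be combined in one small routine: retry with exponential backoff, log every attempt, and flag the transaction for manual handling once retries are exhausted. This is a sketch under assumptions; `compensateWithRetry`, the log field names and the default delay are illustrative, not taken from the code above.

```php
<?php
declare(strict_types=1);

/**
 * Runs a compensation callable with bounded retries and exponential backoff.
 * Returns a log of attempts; the last entry tells whether manual intervention is needed.
 *
 * @return array<int, array{attempt: int, outcome: string, error: ?string}>
 */
function compensateWithRetry(callable $compensate, int $maxRetry, callable $sleep, float $baseDelay = 0.5): array
{
    $log = [];
    for ($attempt = 1; $attempt <= $maxRetry; $attempt++) {
        try {
            $compensate();
            $log[] = ['attempt' => $attempt, 'outcome' => 'compensated', 'error' => null];
            return $log;
        } catch (\Throwable $e) {
            $last = $attempt === $maxRetry;
            $log[] = [
                'attempt' => $attempt,
                'outcome' => $last ? 'needs_manual_intervention' : 'retrying',
                'error'   => $e->getMessage(),
            ];
            if (!$last) {
                // Delay doubles on each retry: baseDelay, 2x, 4x, ...
                $sleep($baseDelay * (2 ** ($attempt - 1)));
            }
        }
    }
    return $log;
}
```

The returned log could be persisted alongside the transaction record. Compensation actions should themselves be idempotent, because a retry may repeat an operation that partially succeeded.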
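4. Duplicate consumption
Problem: A message is consumed more than once.
Solution:
- Implement idempotent handling
- Use a unique transaction ID
- Record processing state in Redis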
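As a minimal sketch of idempotent handling, the result of each command can be stored under its `command_id` and replayed on redelivery. `InMemoryProcessedStore` and `handleOnce` are hypothetical; the in-memory array stands in for Redis or a table with a unique index. Note that the find-then-save sequence here is not atomic across processes, so a real store would need an atomic claim (for example a unique-key insert).

```php
<?php
declare(strict_types=1);

/**
 * In-memory stand-in for a processed-command store (illustration only).
 */
final class InMemoryProcessedStore
{
    /** @var array<string, array> */
    private array $results = [];

    public function find(string $commandId): ?array
    {
        return $this->results[$commandId] ?? null;
    }

    public function save(string $commandId, array $result): void
    {
        $this->results[$commandId] = $result;
    }
}

/**
 * Executes $handler at most once per command_id and replays the stored
 * result when the same command is delivered again.
 */
function handleOnce(InMemoryProcessedStore $store, array $command, callable $handler): array
{
    $commandId = $command['command_id'];
    $previous = $store->find($commandId);
    if ($previous !== null) {
        return $previous; // duplicate delivery: skip the side effect
    }
    $result = $handler($command);
    $store->save($commandId, $result);
    return $result;
}
```

Keying on `command_id` rather than `transaction_id` matters because one transaction sends several commands, each of which must be deduplicated independently.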
