
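Distributed Transaction Processing

Overview

In a microservice architecture, distributed transactions are a core challenge. RabbitMQ can carry the coordination of a distributed transaction: a transaction manager drives the participants through messages and reaches eventual consistency, avoiding the long-held locks and the complexity of traditional two-phase commit.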

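Business Background and Requirements

Scenario

The order flow of an e-commerce platform spans several services:

| Service | Operation | Database | Characteristics |
|---|---|---|---|
| Order service | Create order | MySQL | Main business entry point |
| Inventory service | Deduct stock | MySQL | Requires strong consistency |
| Points service | Add points | MySQL | Can be processed asynchronously |
| Coupon service | Redeem coupon | MySQL | Can be processed asynchronously |
| Payment service | Create payment order | MySQL | Requires transactional guarantees |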

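Pain Points

Problems with traditional distributed transactions:
1. Two-phase commit (2PC) performs poorly and holds resource locks for a long time
2. The Saga pattern is complex to implement and requires compensation logic
3. Cross-service transactions are hard to coordinate
4. Data can become inconsistent during a network partition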

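Goals

| Goal | Target |
|---|---|
| Consistency | Eventual consistency, no data loss |
| Availability | 99.9% service availability |
| Performance | Transaction processing time < 500 ms |
| Reliability | 99.99% message reliability |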

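Architecture Design

Overall Architecture

mermaid
graph TB
    subgraph "Client"
        A[User request]
    end
    
    subgraph "Transaction coordination layer"
        B[Transaction manager]
        C[Transaction log]
        D[Compensation manager]
    end
    
    subgraph "RabbitMQ"
        E[Transaction exchange<br/>tx.exchange]
        
        subgraph "Transaction queues"
            F[Command queue<br/>tx.command]
            G[Confirm queue<br/>tx.confirm]
            H[Rollback queue<br/>tx.rollback]
        end
        
        I[Dead-letter queue<br/>tx.dlq]
    end
    
    subgraph "Participant services"
        J[Order service]
        K[Inventory service]
        L[Points service]
        M[Coupon service]
    end
    
    subgraph "State storage"
        N[Transaction status table]
        O[Message log table]
    end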
    
    A --> B
    B --> C
    B --> E
    E --> F
    E --> G
    E --> H
    
    F --> J
    F --> K
    F --> L
    F --> M
    
    G --> B
    H --> D
    
    D --> E
    
    J --> N
    K --> N
    L --> N
    M --> N
    
    B --> O
    
    F -.-> I
    G -.-> I
    H -.-> I

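Transaction Flow

mermaid
sequenceDiagram
    participant Client as Client
    participant TM as Transaction manager
    participant MQ as RabbitMQ
    participant Order as Order service
    participant Inventory as Inventory service
    participant Points as Points service
    
    Client->>TM: Start distributed transaction
    TM->>TM: Generate transaction ID
    TM->>TM: Write transaction log (pending)
    
    par Send transaction commands in parallel
        TM->>MQ: Send create-order command
        TM->>MQ: Send deduct-inventory command
        TM->>MQ: Send add-points command
    end
    
    MQ->>Order: Deliver order command
    Order->>Order: Run local transaction
    alt Success
        Order-->>MQ: Send confirmation
    else Failure
        Order-->>MQ: Send rollback request
    end
    
    MQ->>Inventory: Deliver inventory command
    Inventory->>Inventory: Run local transaction
    alt Success
        Inventory-->>MQ: Send confirmation
    else Failure
        Inventory-->>MQ: Send rollback request
    end
    
    MQ->>Points: Deliver points command
    Points->>Points: Run local transaction
    Points-->>MQ: Send confirmation
    
    MQ-->>TM: Aggregate transaction results
    
    alt All succeeded
        TM->>TM: Update transaction status (committed)
        TM-->>Client: Return success
    else Any failed
        TM->>TM: Update transaction status (rollback)
        TM->>MQ: Send rollback command
        TM-->>Client: Return failure
    end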

Transaction State Machine

mermaid
stateDiagram-v2
    [*] --> pending: Transaction created
    pending --> executing: Execution starts
    executing --> committed: All succeeded
    executing --> rollback: Any failed
    executing --> timeout: Timed out
    rollback --> rolledback: Rollback finished
    timeout --> compensating: Compensation runs
    compensating --> compensated: Compensation finished
    committed --> [*]
    rolledback --> [*]
    compensated --> [*]
    
    note right of pending: Waiting to execute
    note right of executing: Executing
    note right of committed: Committed
    note right of rollback: Needs rollback
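The state machine above can be enforced in code so that a status update never skips or reverses a step. The following is a minimal sketch; the function names `allowedTransitions`, `canTransition` and `transition` are hypothetical and do not appear in the listings below. Only the states and edges are taken from the diagram.

```php
<?php

// Transition table copied from the state diagram; terminal states map to [].
function allowedTransitions(): array
{
    return [
        'pending'      => ['executing'],
        'executing'    => ['committed', 'rollback', 'timeout'],
        'rollback'     => ['rolledback'],
        'timeout'      => ['compensating'],
        'compensating' => ['compensated'],
        'committed'    => [],
        'rolledback'   => [],
        'compensated'  => [],
    ];
}

// Hypothetical helper: true when moving from $from to $to is an edge in the diagram.
function canTransition(string $from, string $to): bool
{
    return in_array($to, allowedTransitions()[$from] ?? [], true);
}

// Hypothetical helper: returns the new status, or throws on an illegal move.
function transition(string $from, string $to): string
{
    if (!canTransition($from, $to)) {
        throw new LogicException("Illegal transition: {$from} -> {$to}");
    }
    return $to;
}
```

A repository could call `transition($current, $next)` before each status update, so that a late confirmation cannot, for example, move a `committed` transaction back to `rollback`.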

PHP Implementation

Distributed Transaction Manager

php
<?php

namespace App\DistributedTx;

class TransactionManager
{
    private TransactionLogRepository $logRepository;
    private MessagePublisher $publisher;
    private CompensationManager $compensationManager;
    private array $config;

    public function __construct(
        TransactionLogRepository $logRepository,
        MessagePublisher $publisher,
        CompensationManager $compensationManager,
        array $config = []
    ) {
        $this->logRepository = $logRepository;
        $this->publisher = $publisher;
        $this->compensationManager = $compensationManager;
        $this->config = array_merge([
            'timeout' => 30,
            'max_retry' => 3,
        ], $config);
    }

    public function begin(array $participants, array $payload): Transaction
    {
        $transaction = new Transaction(
            $this->generateTransactionId(),
            $participants,
            $payload
        );

        $this->logRepository->create([
            'transaction_id' => $transaction->id,
            'status' => 'pending',
            'participants' => json_encode($participants),
            'payload' => json_encode($payload),
            'created_at' => date('Y-m-d H:i:s'),
        ]);

        return $transaction;
    }

    public function commit(Transaction $transaction): TransactionResult
    {
        $this->logRepository->update($transaction->id, [
            'status' => 'executing',
            'started_at' => date('Y-m-d H:i:s'),
        ]);

        $commands = $this->buildCommands($transaction);
        
        foreach ($commands as $command) {
            $this->publisher->publishTransactionCommand(
                $transaction->id,
                $command
            );
        }

        $result = $this->waitForCompletion($transaction);

        if ($result->isSuccess()) {
            $this->logRepository->update($transaction->id, [
                'status' => 'committed',
                'completed_at' => date('Y-m-d H:i:s'),
            ]);
        } else {
            $this->rollback($transaction, $result);
        }

        return $result;
    }

    public function rollback(Transaction $transaction, TransactionResult $result): void
    {
        $this->logRepository->update($transaction->id, [
            'status' => 'rollback',
            'error' => $result->getError(),
        ]);

        $this->compensationManager->compensate($transaction, $result);
    }

    private function buildCommands(Transaction $transaction): array
    {
        $commands = [];
        
        foreach ($transaction->participants as $participant) {
            $commands[] = new TransactionCommand(
                $transaction->id,
                $participant['service'],
                $participant['action'],
                $participant['payload'] ?? $transaction->payload,
                $participant['compensate_action'] ?? null
            );
        }

        return $commands;
    }

    private function waitForCompletion(Transaction $transaction): TransactionResult
    {
        $startTime = time();
        $timeout = $this->config['timeout'];

        while (true) {
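            $status = $this->logRepository->getParticipantStatus($transaction->id);

            // Participants record a status only after they receive their command,
            // so an empty or partial status map must not count as "all completed".
            $allReported = count($status) >= count($transaction->participants);

            if ($allReported && $this->allCompleted($status)) {
                return $this->buildResult($status);
            }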

            if (time() - $startTime > $timeout) {
                return TransactionResult::timeout($status);
            }

            usleep(100000);
        }
    }

    private function allCompleted(array $status): bool
    {
        foreach ($status as $participantStatus) {
            if (!in_array($participantStatus, ['committed', 'failed'])) {
                return false;
            }
        }
        return true;
    }

    private function buildResult(array $status): TransactionResult
    {
        $allSuccess = true;
        $errors = [];

        foreach ($status as $service => $participantStatus) {
            if ($participantStatus === 'failed') {
                $allSuccess = false;
                $errors[$service] = 'Transaction failed';
            }
        }

        return $allSuccess
            ? TransactionResult::success()
            : TransactionResult::failed($errors);
    }

    private function generateTransactionId(): string
    {
        return sprintf('tx_%s_%s', date('YmdHis'), bin2hex(random_bytes(8)));
    }
}
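The manager relies on a `TransactionResult` class that the listing does not show. The following is a minimal sketch of what such a value object could look like. Only the method names (`success`, `failed`, `timeout`, `isSuccess`, `getErrors`, `getError`) come from the listings; the internal fields and the error message format are assumptions.

```php
<?php

// Hypothetical value object; only the method names are taken from the listings.
final class TransactionResult
{
    private string $status;
    private array $errors;

    private function __construct(string $status, array $errors = [])
    {
        $this->status = $status;
        $this->errors = $errors;
    }

    public static function success(): self
    {
        return new self('success');
    }

    // $errors maps service name => error message.
    public static function failed(array $errors): self
    {
        return new self('failed', $errors);
    }

    // $participantStatus maps service name => last recorded status.
    public static function timeout(array $participantStatus): self
    {
        $errors = [];
        foreach ($participantStatus as $service => $status) {
            if ($status !== 'committed') {
                $errors[$service] = "Timed out in status: {$status}";
            }
        }
        return new self('timeout', $errors);
    }

    public function isSuccess(): bool
    {
        return $this->status === 'success';
    }

    public function getErrors(): array
    {
        return $this->errors;
    }

    // Single-string summary, as used when writing the transaction log.
    public function getError(): string
    {
        return json_encode($this->errors);
    }
}
```

Keeping the constructor private forces callers through the three named factories, so a result cannot be built in an inconsistent state.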

Transaction Message Publisher

php
<?php

namespace App\DistributedTx;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;

class MessagePublisher
{
    private AMQPStreamConnection $connection;
    private $channel;
    private string $exchangeName = 'tx.exchange';
    private string $confirmExchange = 'tx.confirm.exchange';
    private string $rollbackExchange = 'tx.rollback.exchange';

    public function __construct(AMQPStreamConnection $connection)
    {
        $this->connection = $connection;
        $this->channel = $connection->channel();
        $this->setupInfrastructure();
    }

    private function setupInfrastructure(): void
    {
        $this->channel->exchange_declare(
            $this->exchangeName,
            AMQPExchangeType::TOPIC,
            false,
            true,
            false
        );

        $this->channel->exchange_declare(
            $this->confirmExchange,
            AMQPExchangeType::DIRECT,
            false,
            true,
            false
        );

        $this->channel->exchange_declare(
            $this->rollbackExchange,
            AMQPExchangeType::DIRECT,
            false,
            true,
            false
        );

        $services = ['order', 'inventory', 'points', 'coupon', 'payment'];
        
        foreach ($services as $service) {
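            $args = [
                'x-dead-letter-exchange' => ['S', 'tx.dlx'],
                // tx.dlx is a direct exchange and tx.dlq is bound with 'tx.failed'.
                // Without this key, dead-lettered messages keep their original
                // routing key and would be dropped as unroutable.
                'x-dead-letter-routing-key' => ['S', 'tx.failed'],
                'x-message-ttl' => ['I', 300000],
            ];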

            $this->channel->queue_declare(
                "tx.{$service}.command",
                false,
                true,
                false,
                false,
                false,
                $args
            );

            $this->channel->queue_bind(
                "tx.{$service}.command",
                $this->exchangeName,
                "tx.command.{$service}"
            );
        }

        $this->channel->queue_declare('tx.confirm', false, true, false, false);
        $this->channel->queue_bind('tx.confirm', $this->confirmExchange, 'tx.confirm');

        $this->channel->queue_declare('tx.rollback', false, true, false, false);
        $this->channel->queue_bind('tx.rollback', $this->rollbackExchange, 'tx.rollback');

        $this->channel->queue_declare('tx.dlq', false, true, false, false);
        $this->channel->exchange_declare('tx.dlx', AMQPExchangeType::DIRECT, false, true, false);
        $this->channel->queue_bind('tx.dlq', 'tx.dlx', 'tx.failed');
    }

    public function publishTransactionCommand(string $transactionId, TransactionCommand $command): void
    {
        $routingKey = "tx.command.{$command->service}";

        $message = new AMQPMessage(
            json_encode([
                'transaction_id' => $transactionId,
                'command_id' => $command->id,
                'service' => $command->service,
                'action' => $command->action,
                'payload' => $command->payload,
                'compensate_action' => $command->compensateAction,
                'timestamp' => time(),
            ]),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'message_id' => $command->id,
                'correlation_id' => $transactionId,
            ]
        );

        $this->channel->basic_publish(
            $message,
            $this->exchangeName,
            $routingKey
        );
    }

    public function publishConfirm(string $transactionId, string $service, string $commandId, bool $success, array $result = []): void
    {
        $message = new AMQPMessage(
            json_encode([
                'transaction_id' => $transactionId,
                'command_id' => $commandId,
                'service' => $service,
                'success' => $success,
                'result' => $result,
                'timestamp' => time(),
            ]),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'correlation_id' => $transactionId,
            ]
        );

        $this->channel->basic_publish(
            $message,
            $this->confirmExchange,
            'tx.confirm'
        );
    }

    public function publishRollback(string $transactionId, array $commands): void
    {
        $message = new AMQPMessage(
            json_encode([
                'transaction_id' => $transactionId,
                'commands' => $commands,
                'timestamp' => time(),
            ]),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'correlation_id' => $transactionId,
            ]
        );

        $this->channel->basic_publish(
            $message,
            $this->rollbackExchange,
            'tx.rollback'
        );
    }

    public function close(): void
    {
        if ($this->channel) {
            $this->channel->close();
        }
    }
}
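The publisher above marks messages as persistent but does not wait for the broker to confirm that it accepted them. One way to close that gap is RabbitMQ publisher confirms. The following is a minimal sketch, assuming `$channel` is a `PhpAmqpLib\Channel\AMQPChannel` and `$message` a `PhpAmqpLib\Message\AMQPMessage`; the function name `publishWithConfirm` is hypothetical, and the parameters are left untyped so the sketch carries no hard dependency.

```php
<?php

/**
 * Publishes one message and blocks until the broker acks or nacks it.
 * Returns true only when the broker acknowledged the message.
 *
 * Assumption: $channel behaves like php-amqplib's AMQPChannel.
 */
function publishWithConfirm($channel, $message, string $exchange, string $routingKey, float $timeout = 5.0): bool
{
    $acked = false;

    // Put the channel into confirm mode; a real publisher would typically
    // do this once in its constructor rather than per message.
    $channel->confirm_select();
    $channel->set_ack_handler(function () use (&$acked) {
        $acked = true;
    });
    $channel->set_nack_handler(function () use (&$acked) {
        $acked = false;
    });

    $channel->basic_publish($message, $exchange, $routingKey);

    // Blocks until outstanding confirms arrive or the timeout elapses.
    $channel->wait_for_pending_acks($timeout);

    return $acked;
}
```

When the function returns false, the caller can leave the command in a local outbox for a later resend instead of assuming it reached the queue.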

Transaction Participant Implementation

php
<?php

namespace App\DistributedTx;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

abstract class TransactionParticipant
{
    protected AMQPStreamConnection $connection;
    protected $channel;
    protected MessagePublisher $publisher;
    protected TransactionLogRepository $logRepository;
    protected string $serviceName;
    protected string $queueName;
    protected bool $running = true;

    public function __construct(
        AMQPStreamConnection $connection,
        MessagePublisher $publisher,
        TransactionLogRepository $logRepository
    ) {
        $this->connection = $connection;
        $this->channel = $connection->channel();
        $this->publisher = $publisher;
        $this->logRepository = $logRepository;
        $this->queueName = "tx.{$this->serviceName}.command";
    }

    public function listen(): void
    {
        $this->channel->basic_qos(null, 1, null);

        $this->channel->basic_consume(
            $this->queueName,
            '',
            false,
            false,
            false,
            false,
            [$this, 'handleCommand']
        );

        while ($this->running && count($this->channel->callbacks)) {
            $this->channel->wait(null, true);
            if (!$this->running) break;
            usleep(100000);
        }
    }

    public function handleCommand(AMQPMessage $message): void
    {
        $data = json_decode($message->body, true);
        $transactionId = $data['transaction_id'];
        $commandId = $data['command_id'];
        $action = $data['action'];
        $payload = $data['payload'];

        $this->logRepository->recordParticipantStatus(
            $transactionId,
            $this->serviceName,
            'executing'
        );

        try {
            $result = $this->execute($action, $payload, $transactionId);

            $this->logRepository->recordParticipantStatus(
                $transactionId,
                $this->serviceName,
                'committed',
                $result
            );

            $this->publisher->publishConfirm(
                $transactionId,
                $this->serviceName,
                $commandId,
                true,
                $result
            );

            $message->ack();

        } catch (\Exception $e) {
            $this->logRepository->recordParticipantStatus(
                $transactionId,
                $this->serviceName,
                'failed',
                ['error' => $e->getMessage()]
            );

            $this->publisher->publishConfirm(
                $transactionId,
                $this->serviceName,
                $commandId,
                false,
                ['error' => $e->getMessage()]
            );

            $message->ack();
        }
    }

    abstract protected function execute(string $action, array $payload, string $transactionId): array;

    abstract protected function compensate(string $action, array $payload, string $transactionId): array;

    public function stop(): void
    {
        $this->running = false;
    }
}

class OrderParticipant extends TransactionParticipant
{
    protected string $serviceName = 'order';
    private OrderRepository $orderRepository;

    public function __construct(
        AMQPStreamConnection $connection,
        MessagePublisher $publisher,
        TransactionLogRepository $logRepository,
        OrderRepository $orderRepository
    ) {
        parent::__construct($connection, $publisher, $logRepository);
        $this->orderRepository = $orderRepository;
    }

    protected function execute(string $action, array $payload, string $transactionId): array
    {
        switch ($action) {
            case 'create':
                return $this->createOrder($payload, $transactionId);
            case 'update_status':
                return $this->updateStatus($payload, $transactionId);
            default:
                throw new \InvalidArgumentException("Unknown action: {$action}");
        }
    }

    protected function compensate(string $action, array $payload, string $transactionId): array
    {
        switch ($action) {
            case 'create':
                return $this->cancelOrder($payload, $transactionId);
            default:
                return [];
        }
    }

    private function createOrder(array $payload, string $transactionId): array
    {
        $orderData = [
            'order_id' => $payload['order_id'],
            'user_id' => $payload['user_id'],
            'items' => $payload['items'],
            'total_amount' => $payload['total_amount'],
            'status' => 'pending',
            'transaction_id' => $transactionId,
            'created_at' => date('Y-m-d H:i:s'),
        ];

        $this->orderRepository->create($orderData);

        return ['order_id' => $payload['order_id']];
    }

    private function updateStatus(array $payload, string $transactionId): array
    {
        $this->orderRepository->update($payload['order_id'], [
            'status' => $payload['status'],
            'updated_at' => date('Y-m-d H:i:s'),
        ]);

        return ['status' => $payload['status']];
    }

    private function cancelOrder(array $payload, string $transactionId): array
    {
        $this->orderRepository->update($payload['order_id'], [
            'status' => 'cancelled',
            'cancel_reason' => 'Transaction rollback',
            'updated_at' => date('Y-m-d H:i:s'),
        ]);

        return ['status' => 'cancelled'];
    }
}

class InventoryParticipant extends TransactionParticipant
{
    protected string $serviceName = 'inventory';
    private InventoryRepository $inventoryRepository;

    public function __construct(
        AMQPStreamConnection $connection,
        MessagePublisher $publisher,
        TransactionLogRepository $logRepository,
        InventoryRepository $inventoryRepository
    ) {
        parent::__construct($connection, $publisher, $logRepository);
        $this->inventoryRepository = $inventoryRepository;
    }

    protected function execute(string $action, array $payload, string $transactionId): array
    {
        switch ($action) {
            case 'deduct':
                return $this->deductInventory($payload, $transactionId);
            case 'reserve':
                return $this->reserveInventory($payload, $transactionId);
            default:
                throw new \InvalidArgumentException("Unknown action: {$action}");
        }
    }

    protected function compensate(string $action, array $payload, string $transactionId): array
    {
        switch ($action) {
            case 'deduct':
                return $this->restoreInventory($payload, $transactionId);
            case 'reserve':
                return $this->releaseReservation($payload, $transactionId);
            default:
                return [];
        }
    }

    private function deductInventory(array $payload, string $transactionId): array
    {
        $productId = $payload['product_id'];
        $skuId = $payload['sku_id'];
        $quantity = $payload['quantity'];

        $stock = $this->inventoryRepository->getStock($productId, $skuId);
        
        if ($stock < $quantity) {
            throw new \RuntimeException("Insufficient inventory: {$productId}");
        }

        $this->inventoryRepository->deduct($productId, $skuId, $quantity, $transactionId);

        return ['remaining_stock' => $stock - $quantity];
    }

    private function reserveInventory(array $payload, string $transactionId): array
    {
        $productId = $payload['product_id'];
        $skuId = $payload['sku_id'];
        $quantity = $payload['quantity'];

        $reserved = $this->inventoryRepository->reserve(
            $productId,
            $skuId,
            $quantity,
            $transactionId
        );

        if (!$reserved) {
            throw new \RuntimeException("Failed to reserve inventory: {$productId}");
        }

        return ['reserved' => $quantity];
    }

    private function restoreInventory(array $payload, string $transactionId): array
    {
        $this->inventoryRepository->restore(
            $payload['product_id'],
            $payload['sku_id'],
            $payload['quantity'],
            $transactionId
        );

        return ['restored' => $payload['quantity']];
    }

    private function releaseReservation(array $payload, string $transactionId): array
    {
        $this->inventoryRepository->releaseReservation(
            $payload['product_id'],
            $payload['sku_id'],
            $payload['quantity'],
            $transactionId
        );

        return ['released' => $payload['quantity']];
    }
}

Compensation Manager Implementation

php
<?php

namespace App\DistributedTx;

class CompensationManager
{
    private TransactionLogRepository $logRepository;
    private MessagePublisher $publisher;
    private array $participantRegistry;

    public function __construct(
        TransactionLogRepository $logRepository,
        MessagePublisher $publisher,
        array $participantRegistry = []
    ) {
        $this->logRepository = $logRepository;
        $this->publisher = $publisher;
        $this->participantRegistry = $participantRegistry;
    }

    public function compensate(Transaction $transaction, TransactionResult $result): void
    {
        $executedCommands = $this->logRepository->getExecutedCommands($transaction->id);

        $compensateCommands = [];
        
        foreach (array_reverse($executedCommands) as $command) {
            if ($command['status'] === 'committed') {
                $compensateCommands[] = [
                    'service' => $command['service'],
                    'action' => $command['compensate_action'] ?? $this->getCompensateAction($command['action']),
                    'payload' => $command['payload'],
                    'original_command_id' => $command['command_id'],
                ];
            }
        }

        if (!empty($compensateCommands)) {
            $this->publisher->publishRollback($transaction->id, $compensateCommands);
        }

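        // Note: the status becomes 'rolledback' as soon as the rollback message
        // is published, not when the participants have finished compensating.
        $this->logRepository->update($transaction->id, [
            'status' => 'rolledback',
            'completed_at' => date('Y-m-d H:i:s'),
        ]);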
    }

    private function getCompensateAction(string $action): string
    {
        $compensateMap = [
            'create' => 'cancel',
            'deduct' => 'restore',
            'reserve' => 'release',
            'add' => 'subtract',
            'use' => 'release',
        ];

        return $compensateMap[$action] ?? 'compensate_' . $action;
    }
}
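The listings publish a rollback message to `tx.rollback` but do not show the consumer that turns it into compensating calls. The following is a minimal sketch of that dispatch step. The function name `dispatchRollback` and the `"service.action"` handler keys are assumptions; the message shape (`transaction_id`, `commands` with `service`, `action`, `payload`) follows `publishRollback()`. Note that `CompensationManager` puts the compensating action name (for example `cancel`) into the message, while the participants' `compensate()` methods switch on the original action name (for example `create`), so one side would need adapting. This sketch keys handlers by the compensating action.

```php
<?php

/**
 * Runs the compensating handlers named in one tx.rollback message.
 *
 * $handlers maps "service.action" (e.g. "order.cancel") to a callable that
 * receives (array $payload, string $transactionId). Returns the commands
 * that could not be compensated so the caller can retry or escalate them.
 */
function dispatchRollback(array $rollbackMessage, array $handlers): array
{
    $failed = [];

    foreach ($rollbackMessage['commands'] as $command) {
        $key = $command['service'] . '.' . $command['action'];
        try {
            if (!isset($handlers[$key])) {
                throw new RuntimeException("No compensation handler for {$key}");
            }
            $handlers[$key]($command['payload'], $rollbackMessage['transaction_id']);
        } catch (Throwable $e) {
            // Keep going: one failed compensation must not block the others.
            $failed[] = $command + ['error' => $e->getMessage()];
        }
    }

    return $failed;
}
```

A consumer callback on `tx.rollback` could decode the message body, call `dispatchRollback()`, and only mark the transaction as `rolledback` when the returned list is empty.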

Local Message Table (Outbox) Implementation

php
<?php

namespace App\DistributedTx;

class LocalMessageTable
{
    private $pdo;

    public function __construct(\PDO $pdo)
    {
        $this->pdo = $pdo;
        $this->initTable();
    }

    private function initTable(): void
    {
        $sql = "
            CREATE TABLE IF NOT EXISTS transaction_outbox (
                id BIGINT AUTO_INCREMENT PRIMARY KEY,
                message_id VARCHAR(64) NOT NULL UNIQUE,
                transaction_id VARCHAR(64) NOT NULL,
                target_service VARCHAR(64) NOT NULL,
                message_type VARCHAR(64) NOT NULL,
                payload JSON NOT NULL,
                status ENUM('pending', 'sent', 'failed') DEFAULT 'pending',
                retry_count INT DEFAULT 0,
                created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                sent_at DATETIME NULL,
                INDEX idx_transaction_id (transaction_id),
                INDEX idx_status (status)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
        ";
        $this->pdo->exec($sql);
    }

    public function insert(string $transactionId, string $targetService, string $messageType, array $payload): string
    {
        $messageId = $this->generateMessageId();

        $sql = "
            INSERT INTO transaction_outbox 
            (message_id, transaction_id, target_service, message_type, payload)
            VALUES (?, ?, ?, ?, ?)
        ";

        $stmt = $this->pdo->prepare($sql);
        $stmt->execute([
            $messageId,
            $transactionId,
            $targetService,
            $messageType,
            json_encode($payload),
        ]);

        return $messageId;
    }

    public function markSent(string $messageId): void
    {
        $sql = "
            UPDATE transaction_outbox 
            SET status = 'sent', sent_at = NOW()
            WHERE message_id = ?
        ";
        $stmt = $this->pdo->prepare($sql);
        $stmt->execute([$messageId]);
    }

    public function markFailed(string $messageId): void
    {
        $sql = "
            UPDATE transaction_outbox 
            SET status = 'failed', retry_count = retry_count + 1
            WHERE message_id = ?
        ";
        $stmt = $this->pdo->prepare($sql);
        $stmt->execute([$messageId]);
    }

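    public function getPendingMessages(int $limit = 100): array
    {
        // Include 'failed' rows so that markFailed() leads to a retry
        // until retry_count reaches the limit.
        $sql = "
            SELECT * FROM transaction_outbox 
            WHERE status IN ('pending', 'failed') AND retry_count < 3
            ORDER BY created_at ASC
            LIMIT ?
        ";
        $stmt = $this->pdo->prepare($sql);
        // Bind as an integer: with emulated prepares, a string-bound LIMIT
        // is a syntax error in MySQL.
        $stmt->bindValue(1, $limit, \PDO::PARAM_INT);
        $stmt->execute();
        return $stmt->fetchAll(\PDO::FETCH_ASSOC);
    }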

    public function executeInTransaction(callable $businessLogic, callable $messageInsert): bool
    {
        try {
            $this->pdo->beginTransaction();

            $result = $businessLogic();

            $messageInsert($result);

            $this->pdo->commit();
            return true;

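        } catch (\Throwable $e) {
            // Catch \Throwable so that PHP errors also trigger a rollback.
            if ($this->pdo->inTransaction()) {
                $this->pdo->rollBack();
            }
            throw $e;
        }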
    }

    private function generateMessageId(): string
    {
        return sprintf('msg_%s_%s', date('YmdHis'), bin2hex(random_bytes(8)));
    }
}
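The outbox table only helps if something scans it and forwards the rows to RabbitMQ. The following is a minimal sketch of one relay pass. The function name `relayOutbox` and its three callables are hypothetical stand-ins: `$fetchPending` returns rows shaped like `transaction_outbox`, `$publish` sends one row to the broker and throws on failure, and `$mark` receives the message ID and the new status.

```php
<?php

/**
 * One relay pass: publish each pending outbox row, then record the outcome.
 * Returns a summary with the number of rows sent and failed.
 */
function relayOutbox(callable $fetchPending, callable $publish, callable $mark): array
{
    $summary = ['sent' => 0, 'failed' => 0];

    foreach ($fetchPending() as $row) {
        try {
            $publish($row);
            $status = 'sent';
        } catch (Throwable $e) {
            // Leave the row for a later pass; the retry counter bounds the attempts.
            $status = 'failed';
        }

        $mark($row['message_id'], $status);
        $summary[$status]++;
    }

    return $summary;
}
```

With the class above, `$fetchPending` could be `[$table, 'getPendingMessages']` and `$mark` a small closure that calls `markSent()` or `markFailed()`. Because a row may be published and then fail to be marked as sent, delivery is at-least-once and consumers still need to be idempotent.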

Complete Usage Example

php
<?php

require_once 'vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
// The participant classes are declared in the App\DistributedTx namespace above.
use App\DistributedTx\{
    TransactionManager,
    MessagePublisher,
    CompensationManager,
    TransactionLogRepository,
    Transaction,
    OrderParticipant,
    InventoryParticipant
};

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

$pdo = new PDO('mysql:host=localhost;dbname=ecommerce', 'root', 'password');
$redis = new Redis();
$redis->connect('localhost', 6379);

$logRepository = new TransactionLogRepository($pdo);
$publisher = new MessagePublisher($connection);
$compensationManager = new CompensationManager($logRepository, $publisher);

$transactionManager = new TransactionManager(
    $logRepository,
    $publisher,
    $compensationManager,
    ['timeout' => 30]
);

$participants = [
    [
        'service' => 'order',
        'action' => 'create',
        'payload' => [
            'order_id' => 'ORD123456',
            'user_id' => 10001,
            'items' => [
                ['product_id' => 'P001', 'sku_id' => 'SKU001', 'quantity' => 2, 'price' => 99.00],
            ],
            'total_amount' => 198.00,
        ],
        'compensate_action' => 'cancel',
    ],
    [
        'service' => 'inventory',
        'action' => 'deduct',
        'payload' => [
            'product_id' => 'P001',
            'sku_id' => 'SKU001',
            'quantity' => 2,
        ],
        'compensate_action' => 'restore',
    ],
    [
        'service' => 'points',
        'action' => 'add',
        'payload' => [
            'user_id' => 10001,
            'points' => 198,
            'reason' => 'Order reward',
        ],
        'compensate_action' => 'subtract',
    ],
];

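$transaction = $transactionManager->begin($participants, []);

echo "Transaction created: {$transaction->id}\n";

// commit() blocks until every participant reports or the timeout expires,
// so the participant workers must already be running in separate processes.
$result = $transactionManager->commit($transaction);

if ($result->isSuccess()) {
    echo "Transaction committed\n";
} else {
    echo "Transaction failed: " . json_encode($result->getErrors()) . "\n";
}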

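// Participant worker. Shown in the same script for brevity; in practice each
// participant runs as its own long-lived process.
$orderParticipant = new OrderParticipant(
    $connection,
    $publisher,
    $logRepository,
    new OrderRepository($pdo)
);

echo "Starting order service participant...\n";

// Stop the consume loop cleanly on SIGTERM / SIGINT.
pcntl_async_signals(true);
pcntl_signal(SIGTERM, function () use ($orderParticipant) {
    echo "Received SIGTERM\n";
    $orderParticipant->stop();
});
pcntl_signal(SIGINT, function () use ($orderParticipant) {
    echo "Received SIGINT\n";
    $orderParticipant->stop();
});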

$orderParticipant->listen();

$publisher->close();
$connection->close();

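Key Technical Points

1. Eventual Consistency

Consistency is eventual rather than strong, and is reached through message-driven coordination:

  • Each participant maintains its own local transaction state
  • Message acknowledgements ensure reliable delivery
  • Compensation handles failure scenarios

2. Idempotency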

php
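public function handleCommand(AMQPMessage $message): void
{
    $data = json_decode($message->body, true);
    $commandId = $data['command_id'];

    // Skip commands that were already processed (redelivery or duplicate publish).
    if ($this->isProcessed($commandId)) {
        $message->ack();
        return;
    }

    $this->execute($data['action'], $data['payload'], $data['transaction_id']);
    $this->markProcessed($commandId);
    $message->ack();
}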
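A separate "check, then mark" sequence leaves a window in which two workers can both pass the check. A common alternative is to claim the command ID atomically before doing the work. The following is a minimal in-memory sketch; `ProcessedCommandStore` and `handleOnce` are hypothetical names, and a real store would rely on a unique index or an atomic "set if absent" operation rather than a PHP array.

```php
<?php

// Hypothetical in-memory store, for illustration only.
final class ProcessedCommandStore
{
    private array $seen = [];

    // Returns true for the first claim of a command ID, false afterwards.
    public function claim(string $commandId): bool
    {
        if (isset($this->seen[$commandId])) {
            return false;
        }
        $this->seen[$commandId] = true;
        return true;
    }

    // Gives the claim back so that a redelivery can retry the work.
    public function release(string $commandId): void
    {
        unset($this->seen[$commandId]);
    }
}

// Runs $work at most once per command ID; returns false for duplicates.
function handleOnce(ProcessedCommandStore $store, string $commandId, callable $work): bool
{
    if (!$store->claim($commandId)) {
        return false; // duplicate delivery: skip the work
    }

    try {
        $work();
    } catch (Throwable $e) {
        $store->release($commandId);
        throw $e;
    }

    return true;
}
```

Claiming first and releasing on failure keeps a failed command retryable while still rejecting concurrent duplicates.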

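3. Local Message Table

The business operation and the outbox insert share one local transaction, so the message is recorded if and only if the business change commits: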

php
$this->pdo->beginTransaction();
$businessResult = $this->doBusiness();
$this->insertOutboxMessage($businessResult);
$this->pdo->commit();

4. Compensating Transactions

When a transaction fails, each completed step is undone by its inverse operation:

php
$compensateMap = [
    'create' => 'cancel',
    'deduct' => 'restore',
    'reserve' => 'release',
];

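Performance Recommendations

| Optimization | Recommendation | Effect |
|---|---|---|
| Parallel execution | Let participants process in parallel | Reduces total latency |
| Asynchronous confirmation | Wait for confirmations asynchronously | Increases throughput |
| Batching | Merge small transactions | Reduces network overhead |
| State caching | Cache transaction state in Redis | Reduces database load |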

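Common Problems and Solutions

1. Transaction Timeout

Problem: slow participant responses cause the transaction to time out.

Solutions:

  • Set a reasonable timeout
  • Run compensation after a timeout
  • Monitor participant response times

2. Message Loss

Problem: a message is lost in transit.

Solutions:

  • Use persistent messages
  • Use a local message table
  • Scan periodically and resend

3. Compensation Failure

Problem: a compensating operation fails.

Solutions:

  • Record a compensation log
  • Implement a retry mechanism
  • Escalate to manual handling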
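The retry step for failed compensations can be sketched as a bounded retry with exponential backoff. This is a minimal illustration under stated assumptions: the function name `retryWithBackoff`, the default of three attempts, and the 200 ms base delay are hypothetical choices, not values from the listings.

```php
<?php

/**
 * Calls $operation until it succeeds or $maxAttempts is reached.
 * Waits $baseDelayMs * 2^(attempt - 1) milliseconds between attempts.
 * $sleep is injectable so that the delay can be skipped in tests.
 */
function retryWithBackoff(callable $operation, int $maxAttempts = 3, int $baseDelayMs = 200, ?callable $sleep = null)
{
    $sleep = $sleep ?? function (int $ms): void {
        usleep($ms * 1000);
    };

    $attempt = 0;
    while (true) {
        $attempt++;
        try {
            return $operation($attempt);
        } catch (Throwable $e) {
            if ($attempt >= $maxAttempts) {
                // Retries exhausted: let the caller log it and escalate.
                throw $e;
            }
            $sleep($baseDelayMs * (2 ** ($attempt - 1)));
        }
    }
}
```

The exception thrown after the last attempt is the point at which the compensation log entry would be flagged for manual handling.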

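4. Duplicate Consumption

Problem: a message is consumed more than once.

Solutions:

  • Make handlers idempotent
  • Use a unique transaction ID
  • Record processing state in Redis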
