Skip to content

事务机制

概述

RabbitMQ 事务机制允许生产者将多个操作组合成一个原子操作,确保消息发送的可靠性。但事务机制会带来较大的性能开销,在大多数场景下推荐使用发布确认机制。

核心原理

事务流程

mermaid
sequenceDiagram
    participant P as 生产者
    participant B as Broker
    
    P->>B: tx.select() 开启事务
    B-->>P: +OK
    
    P->>B: 发送消息 1
    P->>B: 发送消息 2
    P->>B: 发送消息 3
    
    alt 提交事务
        P->>B: tx.commit()
        B-->>P: +OK
        B->>B: 消息已处理
    else 回滚事务
        P->>B: tx.rollback()
        B-->>P: +OK
        B->>B: 消息已丢弃
    end

事务 vs 发布确认

特性事务发布确认
可靠性最高
性能最低
批量操作
回滚支持
AMQP 支持

事务原理

mermaid
graph TD
    subgraph 事务机制
        S[tx.select] --> P[发送消息]
        P --> C[tx.commit]
        
        S2[tx.select] --> P2[发送消息]
        P2 --> R[tx.rollback]
    end
    
    subgraph 事务内消息处理
        C --> W1[等待所有消息写入]
        W1 --> W2[批量 ACK]
        W2 --> E[结束]
    end

PHP 代码示例

基本事务操作

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$queueName = 'transaction-queue';
$channel->queue_declare($queueName, false, true, false, false);

// 开启事务模式
$channel->tx_select();

echo "事务已开启\n";

try {
    // 发送多条消息
    for ($i = 1; $i <= 5; $i++) {
        $message = new AMQPMessage(
            json_encode(['id' => $i, 'message' => 'test message ' . $i]),
            ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
        );
        
        $channel->basic_publish($message, '', $queueName);
        echo "消息 {$i} 已发送\n";
    }
    
    // 提交事务
    $channel->tx_commit();
    echo "事务已提交\n";
    
} catch (Exception $e) {
    // 回滚事务
    $channel->tx_rollback();
    echo "事务已回滚: " . $e->getMessage() . "\n";
}

$channel->close();
$connection->close();

事务回滚示例

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$queueName = 'transaction-queue';
$channel->queue_declare($queueName, false, true, false, false);

$shouldFail = true;  // 模拟失败条件

// 开启事务
$channel->tx_select();

try {
    // 发送消息 1
    $msg1 = new AMQPMessage(
        json_encode(['id' => 1, 'data' => 'message 1']),
        ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
    );
    $channel->basic_publish($msg1, '', $queueName);
    echo "消息 1 已发送\n";
    
    // 模拟业务逻辑失败
    if ($shouldFail) {
        throw new Exception("业务逻辑处理失败");
    }
    
    // 发送消息 2
    $msg2 = new AMQPMessage(
        json_encode(['id' => 2, 'data' => 'message 2']),
        ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
    );
    $channel->basic_publish($msg2, '', $queueName);
    echo "消息 2 已发送\n";
    
    // 提交事务
    $channel->tx_commit();
    echo "事务提交成功\n";
    
} catch (Exception $e) {
    // 回滚事务
    $channel->tx_rollback();
    echo "事务回滚: " . $e->getMessage() . "\n";
}

$channel->close();
$connection->close();

事务工具类

php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class TransactionalPublisher
{
    private $channel;
    private $inTransaction = false;
    
    public function __construct($channel)
    {
        $this->channel = $channel;
    }
    
    public function begin()
    {
        if ($this->inTransaction) {
            throw new RuntimeException('事务已开启');
        }
        
        $this->channel->tx_select();
        $this->inTransaction = true;
    }
    
    public function commit()
    {
        if (!$this->inTransaction) {
            throw new RuntimeException('没有开启的事务');
        }
        
        $this->channel->tx_commit();
        $this->inTransaction = false;
    }
    
    public function rollback()
    {
        if (!$this->inTransaction) {
            throw new RuntimeException('没有开启的事务');
        }
        
        $this->channel->tx_rollback();
        $this->inTransaction = false;
    }
    
    public function publish($exchange, $routingKey, $data)
    {
        $message = new AMQPMessage(
            json_encode($data),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
            ]
        );
        
        $this->channel->basic_publish($message, $exchange, $routingKey);
    }
    
    public function isInTransaction()
    {
        return $this->inTransaction;
    }
    
    public function transactional(callable $callback)
    {
        $this->begin();
        
        try {
            $result = $callback($this);
            $this->commit();
            return $result;
            
        } catch (Exception $e) {
            $this->rollback();
            throw $e;
        }
    }
}

// 使用示例
$publisher = new TransactionalPublisher($channel);

$publisher->transactional(function ($pub) use ($channel) {
    // 在事务中发送多条消息
    $pub->publish('', 'queue1', ['msg' => 'message 1']);
    $pub->publish('', 'queue2', ['msg' => 'message 2']);
    $pub->publish('', 'queue3', ['msg' => 'message 3']);
    
    echo "事务内的所有消息已发送\n";
});

实际应用场景

1. 批量消息发送

php
<?php

class BatchTransactionPublisher
{
    private $channel;
    private $batchSize = 100;
    
    public function __construct($channel)
    {
        $this->channel = $channel;
    }
    
    public function sendBatch(array $messages)
    {
        $this->channel->tx_select();
        
        $successCount = 0;
        $failCount = 0;
        
        try {
            foreach ($messages as $msgData) {
                $message = new AMQPMessage(
                    json_encode($msgData),
                    ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
                );
                
                $this->channel->basic_publish(
                    $message,
                    $msgData['exchange'] ?? '',
                    $msgData['routing_key'] ?? ''
                );
                
                $successCount++;
                
                // 达到批次大小时提交
                if ($successCount % $this->batchSize === 0) {
                    $this->channel->tx_commit();
                    echo "批次提交成功: {$this->batchSize} 条消息\n";
                    
                    $this->channel->tx_select();
                }
            }
            
            // 提交剩余消息
            $this->channel->tx_commit();
            echo "全部提交成功: {$successCount} 条消息\n";
            
            return $successCount;
            
        } catch (Exception $e) {
            $this->channel->tx_rollback();
            echo "批次回滚: " . $e->getMessage() . "\n";
            
            return $successCount - $failCount;
        }
    }
}

2. 订单与库存一致性

php
<?php

class OrderTransactionPublisher
{
    private $channel;
    
    public function __construct($channel)
    {
        $this->channel = $channel;
    }
    
    public function createOrder($orderData, $inventoryItems)
    {
        $this->channel->tx_select();
        
        try {
            // 发送订单消息
            $orderMessage = new AMQPMessage(
                json_encode([
                    'type' => 'order_created',
                    'data' => $orderData,
                    'timestamp' => time()
                ]),
                [
                    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                    'message_id' => 'order-' . $orderData['order_id']
                ]
            );
            
            $this->channel->basic_publish($orderMessage, 'orders', 'order.created');
            
            // 发送库存扣减消息
            foreach ($inventoryItems as $item) {
                $inventoryMessage = new AMQPMessage(
                    json_encode([
                        'type' => 'inventory_deducted',
                        'data' => $item,
                        'timestamp' => time()
                    ]),
                    [
                        'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                        'correlation_id' => 'order-' . $orderData['order_id']
                    ]
                );
                
                $this->channel->basic_publish($inventoryMessage, 'inventory', 'inventory.deduct');
            }
            
            // 提交事务
            $this->channel->tx_commit();
            
            echo "订单创建成功\n";
            return true;
            
        } catch (Exception $e) {
            $this->channel->tx_rollback();
            echo "订单创建失败,已回滚: " . $e->getMessage() . "\n";
            return false;
        }
    }
}

3. 日志批量提交

php
<?php

class TransactionalLogPublisher
{
    private $channel;
    private $logs = [];
    private $flushSize = 50;
    
    public function __construct($channel)
    {
        $this->channel = $channel;
    }
    
    public function log($level, $service, $message, $context = [])
    {
        $this->logs[] = [
            'level' => $level,
            'service' => $service,
            'message' => $message,
            'context' => $context,
            'timestamp' => time()
        ];
        
        if (count($this->logs) >= $this->flushSize) {
            $this->flush();
        }
    }
    
    public function flush()
    {
        if (empty($this->logs)) {
            return;
        }
        
        $this->channel->tx_select();
        
        try {
            foreach ($this->logs as $log) {
                $message = new AMQPMessage(
                    json_encode($log),
                    ['content_type' => 'application/json']
                );
                
                $this->channel->basic_publish(
                    $message,
                    'logs',
                    $log['service'] . '.' . $log['level']
                );
            }
            
            $this->channel->tx_commit();
            echo "日志批量提交成功: " . count($this->logs) . " 条\n";
            
            $this->logs = [];
            
        } catch (Exception $e) {
            $this->channel->tx_rollback();
            echo "日志批量提交失败: " . $e->getMessage() . "\n";
        }
    }
    
    public function __destruct()
    {
        $this->flush();
    }
}

常见问题与解决方案

问题 1: 事务性能差

症状: 事务模式发送消息非常慢

原因: 事务模式下每条消息都需要同步等待确认

解决方案:

php
<?php

// 优先使用发布确认机制代替事务
$channel->confirm_select();

// 批量确认
for ($i = 0; $i < 100; $i++) {
    $channel->basic_publish($message, '', $queueName);
}

$channel->wait_for_pending_acks();

问题 2: 事务超时

症状: 事务提交超时

解决方案:

php
<?php

// 设置心跳保持连接
$connection = new AMQPStreamConnection(
    'localhost',
    5672,
    'guest',
    'guest',
    '/',
    false,
    'AMQPLAIN',
    null,
    'en_US',
    3.0,  // 心跳间隔
    60     // 超时时间
);

问题 3: 事务与发布确认混用

症状: 报错

原因: 事务模式和发布确认模式不能同时使用

解决方案:

php
<?php

// 只能选择一种模式

// 事务模式
$channel->tx_select();
// ... 发送消息
$channel->tx_commit();

// 发布确认模式
$channel->confirm_select();
// ... 发送消息
$channel->wait_for_pending_acks();

最佳实践建议

  1. 优先使用发布确认: 性能更好,可靠性足够
  2. 小批次使用事务: 避免长事务
  3. 异常处理: 务必捕获异常并回滚事务
  4. 监控性能: 监控事务模式下的吞吐量
  5. 考虑替代方案: 使用发布确认+补偿机制

相关链接