Appearance
事务机制
概述
RabbitMQ 事务机制允许生产者将多个操作组合成一个原子操作,确保消息发送的可靠性。但事务机制会带来较大的性能开销,在大多数场景下推荐使用发布确认机制。
核心原理
事务流程
mermaid
sequenceDiagram
participant P as 生产者
participant B as Broker
P->>B: tx.select() 开启事务
B-->>P: +OK
P->>B: 发送消息 1
P->>B: 发送消息 2
P->>B: 发送消息 3
alt 提交事务
P->>B: tx.commit()
B-->>P: +OK
B->>B: 消息已处理
else 回滚事务
P->>B: tx.rollback()
B-->>P: +OK
B->>B: 消息已丢弃
end事务 vs 发布确认
| 特性 | 事务 | 发布确认 |
|---|---|---|
| 可靠性 | 最高 | 高 |
| 性能 | 最低 | 高 |
| 批量操作 | 否 | 是 |
| 回滚支持 | 是 | 否 |
| AMQP 支持 | 是 | 是 |
事务原理
mermaid
graph TD
subgraph 事务机制
S[tx.select] --> P[发送消息]
P --> C[tx.commit]
S2[tx.select] --> P2[发送消息]
P2 --> R[tx.rollback]
end
subgraph 事务内消息处理
C --> W1[等待所有消息写入]
W1 --> W2[批量 ACK]
W2 --> E[结束]
endPHP 代码示例
基本事务操作
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$queueName = 'transaction-queue';
$channel->queue_declare($queueName, false, true, false, false);
// 开启事务模式
$channel->tx_select();
echo "事务已开启\n";
try {
// 发送多条消息
for ($i = 1; $i <= 5; $i++) {
$message = new AMQPMessage(
json_encode(['id' => $i, 'message' => 'test message ' . $i]),
['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
);
$channel->basic_publish($message, '', $queueName);
echo "消息 {$i} 已发送\n";
}
// 提交事务
$channel->tx_commit();
echo "事务已提交\n";
} catch (Exception $e) {
// 回滚事务
$channel->tx_rollback();
echo "事务已回滚: " . $e->getMessage() . "\n";
}
$channel->close();
$connection->close();事务回滚示例
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$queueName = 'transaction-queue';
$channel->queue_declare($queueName, false, true, false, false);
$shouldFail = true; // 模拟失败条件
// 开启事务
$channel->tx_select();
try {
// 发送消息 1
$msg1 = new AMQPMessage(
json_encode(['id' => 1, 'data' => 'message 1']),
['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
);
$channel->basic_publish($msg1, '', $queueName);
echo "消息 1 已发送\n";
// 模拟业务逻辑失败
if ($shouldFail) {
throw new Exception("业务逻辑处理失败");
}
// 发送消息 2
$msg2 = new AMQPMessage(
json_encode(['id' => 2, 'data' => 'message 2']),
['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
);
$channel->basic_publish($msg2, '', $queueName);
echo "消息 2 已发送\n";
// 提交事务
$channel->tx_commit();
echo "事务提交成功\n";
} catch (Exception $e) {
// 回滚事务
$channel->tx_rollback();
echo "事务回滚: " . $e->getMessage() . "\n";
}
$channel->close();
$connection->close();事务工具类
php
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class TransactionalPublisher
{
private $channel;
private $inTransaction = false;
public function __construct($channel)
{
$this->channel = $channel;
}
public function begin()
{
if ($this->inTransaction) {
throw new RuntimeException('事务已开启');
}
$this->channel->tx_select();
$this->inTransaction = true;
}
public function commit()
{
if (!$this->inTransaction) {
throw new RuntimeException('没有开启的事务');
}
$this->channel->tx_commit();
$this->inTransaction = false;
}
public function rollback()
{
if (!$this->inTransaction) {
throw new RuntimeException('没有开启的事务');
}
$this->channel->tx_rollback();
$this->inTransaction = false;
}
public function publish($exchange, $routingKey, $data)
{
$message = new AMQPMessage(
json_encode($data),
[
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
]
);
$this->channel->basic_publish($message, $exchange, $routingKey);
}
public function isInTransaction()
{
return $this->inTransaction;
}
public function transactional(callable $callback)
{
$this->begin();
try {
$result = $callback($this);
$this->commit();
return $result;
} catch (Exception $e) {
$this->rollback();
throw $e;
}
}
}
// 使用示例
$publisher = new TransactionalPublisher($channel);
$publisher->transactional(function ($pub) use ($channel) {
// 在事务中发送多条消息
$pub->publish('', 'queue1', ['msg' => 'message 1']);
$pub->publish('', 'queue2', ['msg' => 'message 2']);
$pub->publish('', 'queue3', ['msg' => 'message 3']);
echo "事务内的所有消息已发送\n";
});实际应用场景
1. 批量消息发送
php
<?php
class BatchTransactionPublisher
{
private $channel;
private $batchSize = 100;
public function __construct($channel)
{
$this->channel = $channel;
}
public function sendBatch(array $messages)
{
$this->channel->tx_select();
$successCount = 0;
$failCount = 0;
try {
foreach ($messages as $msgData) {
$message = new AMQPMessage(
json_encode($msgData),
['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
);
$this->channel->basic_publish(
$message,
$msgData['exchange'] ?? '',
$msgData['routing_key'] ?? ''
);
$successCount++;
// 达到批次大小时提交
if ($successCount % $this->batchSize === 0) {
$this->channel->tx_commit();
echo "批次提交成功: {$this->batchSize} 条消息\n";
$this->channel->tx_select();
}
}
// 提交剩余消息
$this->channel->tx_commit();
echo "全部提交成功: {$successCount} 条消息\n";
return $successCount;
} catch (Exception $e) {
$this->channel->tx_rollback();
echo "批次回滚: " . $e->getMessage() . "\n";
return $successCount - $failCount;
}
}
}2. 订单与库存一致性
php
<?php
class OrderTransactionPublisher
{
private $channel;
public function __construct($channel)
{
$this->channel = $channel;
}
public function createOrder($orderData, $inventoryItems)
{
$this->channel->tx_select();
try {
// 发送订单消息
$orderMessage = new AMQPMessage(
json_encode([
'type' => 'order_created',
'data' => $orderData,
'timestamp' => time()
]),
[
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'message_id' => 'order-' . $orderData['order_id']
]
);
$this->channel->basic_publish($orderMessage, 'orders', 'order.created');
// 发送库存扣减消息
foreach ($inventoryItems as $item) {
$inventoryMessage = new AMQPMessage(
json_encode([
'type' => 'inventory_deducted',
'data' => $item,
'timestamp' => time()
]),
[
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'correlation_id' => 'order-' . $orderData['order_id']
]
);
$this->channel->basic_publish($inventoryMessage, 'inventory', 'inventory.deduct');
}
// 提交事务
$this->channel->tx_commit();
echo "订单创建成功\n";
return true;
} catch (Exception $e) {
$this->channel->tx_rollback();
echo "订单创建失败,已回滚: " . $e->getMessage() . "\n";
return false;
}
}
}3. 日志批量提交
php
<?php
class TransactionalLogPublisher
{
private $channel;
private $logs = [];
private $flushSize = 50;
public function __construct($channel)
{
$this->channel = $channel;
}
public function log($level, $service, $message, $context = [])
{
$this->logs[] = [
'level' => $level,
'service' => $service,
'message' => $message,
'context' => $context,
'timestamp' => time()
];
if (count($this->logs) >= $this->flushSize) {
$this->flush();
}
}
public function flush()
{
if (empty($this->logs)) {
return;
}
$this->channel->tx_select();
try {
foreach ($this->logs as $log) {
$message = new AMQPMessage(
json_encode($log),
['content_type' => 'application/json']
);
$this->channel->basic_publish(
$message,
'logs',
$log['service'] . '.' . $log['level']
);
}
$this->channel->tx_commit();
echo "日志批量提交成功: " . count($this->logs) . " 条\n";
$this->logs = [];
} catch (Exception $e) {
$this->channel->tx_rollback();
echo "日志批量提交失败: " . $e->getMessage() . "\n";
}
}
public function __destruct()
{
$this->flush();
}
}常见问题与解决方案
问题 1: 事务性能差
症状: 事务模式发送消息非常慢
原因: 事务模式下每条消息都需要同步等待确认
解决方案:
php
<?php
// 优先使用发布确认机制代替事务
$channel->confirm_select();
// 批量确认
for ($i = 0; $i < 100; $i++) {
$channel->basic_publish($message, '', $queueName);
}
$channel->wait_for_pending_acks();问题 2: 事务超时
症状: 事务提交超时
解决方案:
php
<?php
// 设置心跳保持连接
$connection = new AMQPStreamConnection(
'localhost',
5672,
'guest',
'guest',
'/',
false,
'AMQPLAIN',
null,
'en_US',
3.0, // 心跳间隔
60 // 超时时间
);问题 3: 事务与发布确认混用
症状: 报错
原因: 事务模式和发布确认模式不能同时使用
解决方案:
php
<?php
// 只能选择一种模式
// 事务模式
$channel->tx_select();
// ... 发送消息
$channel->tx_commit();
// 发布确认模式
$channel->confirm_select();
// ... 发送消息
$channel->wait_for_pending_acks();最佳实践建议
- 优先使用发布确认: 性能更好,可靠性足够
- 小批次使用事务: 避免长事务
- 异常处理: 务必捕获异常并回滚事务
- 监控性能: 监控事务模式下的吞吐量
- 考虑替代方案: 使用发布确认+补偿机制
