Skip to content

生产者确认

概述

生产者确认(Publisher Confirms)是 RabbitMQ 提供的一种机制,用于确认生产者发送的消息已被 Broker 成功处理。这是保证消息可靠投递的重要手段。

核心原理

确认流程

mermaid
sequenceDiagram
    participant P as 生产者
    participant B as Broker
    
    P->>B: confirm_select()
    B-->>P: 确认进入确认模式
    
    P->>B: 发送消息 1
    P->>B: 发送消息 2
    P->>B: 发送消息 3
    
    B->>B: 处理消息
    
    B-->>P: ACK (消息 1)
    B-->>P: ACK (消息 2)
    B-->>P: NACK (消息 3)
    
    Note over P: 根据确认结果处理

确认模式对比

| 模式 | 性能 | 可靠性 | 复杂度 |
| --- | --- | --- | --- |
| 自动确认 | 最高 | 最低 | 最低 |
| 事务 | 最低 | 最高 | 中 |
| 同步确认 | 低 | 高 | 低 |
| 异步确认 | 高 | 高 | 高 |

确认结果

  • ACK: 消息已被成功处理
  • NACK: 消息处理失败(如内部错误)

PHP 代码示例

启用发布确认模式

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// Minimal publisher-confirm example: publish one persistent message and report
// whether the broker acknowledged it.
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// Declare the target queue (third argument true = durable).
$queueName = 'confirm-queue';
$channel->queue_declare($queueName, false, true, false, false);

// Switch the channel into publisher-confirm mode.
$channel->confirm_select();

// wait_for_pending_acks() returns normally for both ACK and NACK, so a NACK can
// only be observed through a handler. php-amqplib passes the affected
// AMQPMessage to the handler as its single argument.
$nacked = false;
$channel->set_nack_handler(function (AMQPMessage $nackedMessage) use (&$nacked) {
    $nacked = true;
});

echo "已进入发布确认模式\n";

// Publish a persistent JSON message through the default exchange.
$message = new AMQPMessage(
    json_encode(['data' => 'test']),
    ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
);

$channel->basic_publish($message, '', $queueName);

// Block until the broker answers, but never longer than 30 seconds; without a
// timeout an unresponsive broker would hang the script forever.
try {
    $channel->wait_for_pending_acks(30);

    if ($nacked) {
        echo "消息确认失败: Broker 返回 NACK\n";
    } else {
        echo "消息已确认\n";
    }
} catch (Exception $e) {
    echo "消息确认失败: " . $e->getMessage() . "\n";
}

$channel->close();
$connection->close();

同步确认

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// Synchronous confirms: publish messages one at a time and wait for the broker's
// answer to each before sending the next.
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$queueName = 'sync-confirm-queue';
$channel->queue_declare($queueName, false, true, false, false);
$channel->confirm_select();

// wait_for_pending_acks() does not throw on NACK; record it through the handler.
// php-amqplib invokes the handler with the affected AMQPMessage.
$nacked = false;
$channel->set_nack_handler(function (AMQPMessage $nackedMessage) use (&$nacked) {
    $nacked = true;
});

// Publish several messages.
$messageCount = 10;
for ($i = 1; $i <= $messageCount; $i++) {
    $message = new AMQPMessage(
        json_encode(['id' => $i, 'data' => 'message ' . $i]),
        ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
    );

    // Reset the flag so it reflects only the message published in this iteration.
    $nacked = false;

    $channel->basic_publish($message, '', $queueName);
    echo "消息 {$i} 已发送\n";

    // Confirm each message individually, waiting at most 30 seconds.
    try {
        $channel->wait_for_pending_acks(30);

        if ($nacked) {
            echo "消息 {$i} 确认失败: Broker 返回 NACK\n";
        } else {
            echo "消息 {$i} 已确认\n";
        }
    } catch (Exception $e) {
        echo "消息 {$i} 确认失败: " . $e->getMessage() . "\n";
    }
}

$channel->close();
$connection->close();

批量确认

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// Batch confirms: publish messages in groups of $batchSize and wait for the
// broker's confirms once per group.
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$queueName = 'batch-confirm-queue';
$channel->queue_declare($queueName, false, true, false, false);
$channel->confirm_select();

// Count NACKs per batch. wait_for_pending_acks() returns normally when the
// broker NACKs, so without this counter a rejected batch would be reported as
// confirmed. php-amqplib invokes the handler once per rejected AMQPMessage.
$nackedInBatch = 0;
$channel->set_nack_handler(function (AMQPMessage $nackedMessage) use (&$nackedInBatch) {
    $nackedInBatch++;
});

$batchSize = 100;
$totalMessages = 1000;

for ($i = 1; $i <= $totalMessages; $i++) {
    $message = new AMQPMessage(
        json_encode(['id' => $i]),
        ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
    );

    $channel->basic_publish($message, '', $queueName);

    // Wait for confirms after every full batch.
    if ($i % $batchSize === 0) {
        $batchNumber = intdiv($i, $batchSize);

        try {
            $channel->wait_for_pending_acks(30);

            if ($nackedInBatch > 0) {
                echo "批次 {$batchNumber} 确认失败: {$nackedInBatch} 条消息被 NACK\n";
            } else {
                echo "批次 {$batchNumber} 已确认 ({$batchSize} 条消息)\n";
            }
        } catch (Exception $e) {
            echo "批次确认失败: " . $e->getMessage() . "\n";
        }

        // Start the next batch with a clean counter.
        $nackedInBatch = 0;
    }
}

// Confirm the trailing partial batch, with the same error handling as above.
if ($totalMessages % $batchSize !== 0) {
    try {
        $channel->wait_for_pending_acks(30);

        if ($nackedInBatch > 0) {
            echo "剩余消息确认失败: {$nackedInBatch} 条消息被 NACK\n";
        } else {
            echo "剩余消息已确认\n";
        }
    } catch (Exception $e) {
        echo "剩余消息确认失败: " . $e->getMessage() . "\n";
    }
}

$channel->close();
$connection->close();

异步确认

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// Asynchronous confirms: publish everything first, let the ack/nack handlers
// tally the broker's answers, then wait once for all outstanding confirms.
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$queueName = 'async-confirm-queue';
$channel->queue_declare($queueName, false, true, false, false);
$channel->confirm_select();

// Confirmation counters.
$confirmedCount = 0;
$nackedCount = 0;
$publishedCount = 0;

// php-amqplib calls the handlers with a single argument: the AMQPMessage that
// was published. A "multiple" confirm from the broker is expanded by the library
// into one call per affected message, so counting calls counts messages.
// Declaring ($deliveryTag, $multiple) parameters here would raise an
// ArgumentCountError on the first confirm.
$channel->set_ack_handler(function (AMQPMessage $message) use (&$confirmedCount) {
    $confirmedCount++;
    echo "消息确认: tag {$message->getDeliveryTag()}\n";
});

// NACK handler: same single-argument signature as the ACK handler.
$channel->set_nack_handler(function (AMQPMessage $message) use (&$nackedCount) {
    $nackedCount++;
    echo "消息拒绝: tag {$message->getDeliveryTag()}\n";
});

// Publish the messages without waiting in between.
$messageCount = 100;
for ($i = 1; $i <= $messageCount; $i++) {
    $message = new AMQPMessage(
        json_encode(['id' => $i]),
        ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
    );

    $channel->basic_publish($message, '', $queueName);
    $publishedCount++;
}

echo "已发送 {$publishedCount} 条消息\n";

// Drain all outstanding confirms; the handlers above run during this call.
$channel->wait_for_pending_acks();

echo "确认统计:\n";
echo "  已确认: {$confirmedCount}\n";
echo "  已拒绝: {$nackedCount}\n";

$channel->close();
$connection->close();

发布确认管理类

php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * Publishes JSON messages on a confirm-mode channel and tracks every message
 * until the broker acknowledges (ACK) or rejects (NACK) it.
 *
 * Messages are tracked by their `message_id` property rather than by delivery
 * tag: php-amqplib hands the originally published AMQPMessage to the ack/nack
 * handlers, so the id can be read straight from the message.
 *
 * Note: confirmed and failed entries are kept in memory for the lifetime of the
 * object; long-running publishers should create a fresh manager periodically.
 */
class PublisherConfirmManager
{
    /** @var object Channel used for publishing (presumably a php-amqplib AMQPChannel). */
    private $channel;

    /** @var array<string, array> Published but not yet confirmed, keyed by message_id. */
    private $pendingMessages = [];

    /** @var array<string, array> ACKed by the broker, keyed by message_id. */
    private $confirmedMessages = [];

    /** @var array<string, array> NACKed by the broker, keyed by message_id. */
    private $failedMessages = [];

    /** @var int Number of pending messages that triggers an automatic wait. */
    private $batchSize;

    /**
     * @param object $channel   Channel to publish on; it is switched to confirm mode here.
     * @param int    $batchSize Pending-message threshold at which publish() waits for confirms.
     */
    public function __construct($channel, $batchSize = 100)
    {
        $this->channel = $channel;
        $this->batchSize = $batchSize;
        $this->enableConfirmMode();
    }

    /**
     * Puts the channel into confirm mode and registers the ack/nack handlers.
     *
     * php-amqplib invokes each handler with exactly one argument, the
     * AMQPMessage being confirmed, and expands "multiple" confirms into one
     * call per message.
     */
    private function enableConfirmMode()
    {
        $this->channel->confirm_select();

        $this->channel->set_ack_handler(function (AMQPMessage $message) {
            $this->handleAck($message);
        });

        $this->channel->set_nack_handler(function (AMQPMessage $message) {
            $this->handleNack($message);
        });
    }

    /**
     * Publishes $data as a persistent JSON message and records it as pending.
     *
     * @param string      $exchange   Target exchange.
     * @param string      $routingKey Routing key.
     * @param array       $data       Payload; `message_id` is merged into it before encoding.
     * @param string|null $messageId  Id to use; a unique one is generated when null.
     *
     * @return string The message id that was used.
     */
    public function publish($exchange, $routingKey, $data, $messageId = null)
    {
        $messageId = $messageId ?? uniqid('msg-');

        $message = new AMQPMessage(
            json_encode(array_merge($data, ['message_id' => $messageId])),
            [
                'content_type' => 'application/json',
                'message_id' => $messageId,
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
            ]
        );

        $this->channel->basic_publish($message, $exchange, $routingKey);

        // Keep the route with the payload so a retry can go to the same place.
        $this->pendingMessages[$messageId] = [
            'message_id' => $messageId,
            'exchange' => $exchange,
            'routing_key' => $routingKey,
            'data' => $data,
            'published_at' => time()
        ];

        // Bound the number of in-flight messages.
        if (count($this->pendingMessages) >= $this->batchSize) {
            $this->waitForConfirms();
        }

        return $messageId;
    }

    /**
     * Blocks until every outstanding confirm (and return) has been received.
     *
     * @param int|float $timeout Maximum wait in seconds, passed to the channel.
     */
    public function waitForConfirms($timeout = 30)
    {
        $this->channel->wait_for_pending_acks_returns($timeout);
    }

    /**
     * Moves an ACKed message from pending to confirmed.
     */
    private function handleAck(AMQPMessage $message)
    {
        $this->settle($message, $this->confirmedMessages);
    }

    /**
     * Moves a NACKed message from pending to failed.
     */
    private function handleNack(AMQPMessage $message)
    {
        $this->settle($message, $this->failedMessages);
    }

    /**
     * Moves the pending entry for $message into $outcome.
     *
     * Messages without a message_id, or ones this manager is not tracking
     * (e.g. published directly on the channel), are ignored.
     *
     * @param AMQPMessage $message Message the broker confirmed.
     * @param array       $outcome Destination map (confirmed or failed), by reference.
     */
    private function settle(AMQPMessage $message, array &$outcome)
    {
        if (!$message->has('message_id')) {
            return;
        }

        $messageId = $message->get('message_id');
        if (!isset($this->pendingMessages[$messageId])) {
            return;
        }

        $outcome[$messageId] = $this->pendingMessages[$messageId];
        unset($this->pendingMessages[$messageId]);
    }

    /**
     * @return array{pending:int, confirmed:int, failed:int} Current counters.
     */
    public function getStats()
    {
        return [
            'pending' => count($this->pendingMessages),
            'confirmed' => count($this->confirmedMessages),
            'failed' => count($this->failedMessages)
        ];
    }

    /**
     * @return array<string, array> NACKed messages keyed by message_id.
     */
    public function getFailedMessages()
    {
        return $this->failedMessages;
    }

    /**
     * Republishes every failed message to its original exchange and routing key.
     *
     * The failed list is snapshotted and cleared first: publish() may wait for
     * confirms internally, and any NACK recorded during the retry must not be
     * wiped afterwards.
     */
    public function retryFailed()
    {
        $toRetry = $this->failedMessages;
        $this->failedMessages = [];

        foreach ($toRetry as $messageId => $info) {
            $this->publish($info['exchange'], $info['routing_key'], $info['data'], $messageId);
        }
    }
}

实际应用场景

1. 可靠消息发送

php
<?php

/**
 * Sends a message and retries a bounded number of times when the broker
 * rejects it.
 */
class ReliablePublisher
{
    /** @var PublisherConfirmManager Tracks confirms for this publisher. */
    private $confirmManager;

    /** @var int Maximum number of publish attempts per message. */
    private $maxRetries = 3;

    /**
     * @param object $channel Channel handed to the confirm manager.
     */
    public function __construct($channel)
    {
        $this->confirmManager = new PublisherConfirmManager($channel);
    }

    /**
     * Publishes $data and waits for the broker's confirm, retrying on NACK.
     *
     * Each attempt publishes exactly once. Success is decided for this
     * attempt's own message id, so earlier failures of unrelated messages do
     * not influence the result. Exceptions raised while waiting (e.g. a
     * timeout) propagate to the caller.
     *
     * @param string $exchange   Target exchange.
     * @param string $routingKey Routing key.
     * @param array  $data       Payload.
     *
     * @return string Id of the message that was confirmed.
     *
     * @throws RuntimeException When every attempt was rejected.
     */
    public function sendWithRetry($exchange, $routingKey, $data)
    {
        for ($attempt = 1; $attempt <= $this->maxRetries; $attempt++) {
            $messageId = $this->confirmManager->publish($exchange, $routingKey, $data);
            $this->confirmManager->waitForConfirms();

            // Failed messages are keyed by message id; absence means this one was ACKed.
            $failed = $this->confirmManager->getFailedMessages();
            if (!isset($failed[$messageId])) {
                echo "消息发送成功: {$messageId}\n";
                return $messageId;
            }

            // The next loop iteration is the retry. Calling retryFailed() here
            // as well would send the message twice per attempt.
            echo "发送失败,重试 {$attempt}/{$this->maxRetries}\n";
        }

        throw new RuntimeException("消息发送失败,已重试 {$this->maxRetries} 次");
    }
}

2. 批量消息发送

php
<?php

/**
 * Buffers messages and publishes them in batches, waiting for the broker's
 * confirms once per batch.
 *
 * Delivery is at-least-once: when a batch cannot be confirmed it stays buffered
 * and the whole batch is published again by the next flush(), so consumers may
 * see duplicates.
 */
class BatchPublisher
{
    /** @var object Confirm-mode channel (presumably a php-amqplib AMQPChannel). */
    private $channel;

    /** @var int Number of buffered messages that triggers an automatic flush. */
    private $batchSize;

    /** @var array<int, array> Buffered messages awaiting publication/confirmation. */
    private $batch = [];

    /** @var int NACKs received since the current flush started. */
    private $nackedCount = 0;

    /**
     * @param object $channel   Channel to publish on; it is switched to confirm mode here.
     * @param int    $batchSize Buffer size at which add() flushes automatically.
     */
    public function __construct($channel, $batchSize = 100)
    {
        $this->channel = $channel;
        $this->batchSize = $batchSize;
        $this->channel->confirm_select();

        // wait_for_pending_acks() returns normally on NACK, so rejections have
        // to be counted through the handler (called once per rejected message).
        $this->channel->set_nack_handler(function ($nackedMessage) {
            $this->nackedCount++;
        });
    }

    /**
     * Buffers one message and flushes when the batch is full.
     *
     * @param string $exchange   Target exchange.
     * @param string $routingKey Routing key.
     * @param mixed  $data       JSON-encodable payload.
     *
     * @return int Number of messages confirmed by an automatic flush, 0 if none happened.
     */
    public function add($exchange, $routingKey, $data)
    {
        $this->batch[] = [
            'exchange' => $exchange,
            'routing_key' => $routingKey,
            'data' => $data
        ];

        if (count($this->batch) >= $this->batchSize) {
            return $this->flush();
        }

        return 0;
    }

    /**
     * Publishes all buffered messages and waits for their confirms.
     *
     * The buffer is emptied only after the broker has confirmed the whole
     * batch; if publishing or waiting throws, or any message is NACKed, the
     * buffer is left intact so the caller can flush again.
     *
     * @return int Number of messages confirmed.
     *
     * @throws RuntimeException When the broker NACKed at least one message.
     */
    public function flush()
    {
        if (empty($this->batch)) {
            return 0;
        }

        $this->nackedCount = 0;

        foreach ($this->batch as $item) {
            $message = new AMQPMessage(
                json_encode($item['data']),
                ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
            );

            $this->channel->basic_publish(
                $message,
                $item['exchange'],
                $item['routing_key']
            );
        }

        $this->channel->wait_for_pending_acks();

        if ($this->nackedCount > 0) {
            throw new RuntimeException("批量发送失败: {$this->nackedCount} 条消息被 Broker 拒绝");
        }

        // Only now is it safe to drop the buffered messages.
        $count = count($this->batch);
        $this->batch = [];

        echo "批量发送 {$count} 条消息已确认\n";

        return $count;
    }
}

3. 事务性消息发送

php
<?php

/**
 * Records a message in the database and publishes it to RabbitMQ, committing
 * the database transaction only after the broker has acknowledged the message.
 *
 * Caveat: the two systems are not atomic. If commit() fails after the broker
 * has ACKed, the message is already in the queue while its log row is rolled
 * back; consumers should tolerate that.
 */
class TransactionalPublisher
{
    /** @var object Confirm-mode channel (presumably a php-amqplib AMQPChannel). */
    private $channel;

    /** @var object Database handle; used like a PDO instance — TODO confirm. */
    private $db;

    /** @var int|float Seconds to wait for the broker's confirm. */
    private $confirmTimeout;

    /** @var bool Set by the nack handler when the broker rejects a message. */
    private $nacked = false;

    /**
     * @param object    $channel        Channel to publish on; switched to confirm mode here.
     * @param object    $db             Database handle offering beginTransaction/commit/rollBack/prepare.
     * @param int|float $confirmTimeout Maximum seconds to wait for a confirm; the wait happens
     *                                  inside an open DB transaction, so it must be bounded.
     */
    public function __construct($channel, $db, $confirmTimeout = 30)
    {
        $this->channel = $channel;
        $this->db = $db;
        $this->confirmTimeout = $confirmTimeout;
        $this->channel->confirm_select();

        // wait_for_pending_acks() returns normally on NACK; remember it here so
        // sendInTransaction() can refuse to commit.
        $this->channel->set_nack_handler(function ($nackedMessage) {
            $this->nacked = true;
        });
    }

    /**
     * Logs and publishes $data, committing only when the broker ACKs.
     *
     * @param string $queueName Queue to publish to via the default exchange.
     * @param array  $data      Payload; `message_id` is merged into it before encoding.
     *
     * @return string Id of the message that was sent.
     *
     * @throws Exception Any failure is rethrown after the transaction is rolled back.
     */
    public function sendInTransaction($queueName, $data)
    {
        // Begin outside the try block: if this throws there is no transaction
        // to roll back, and a failing rollBack() would hide the real error.
        $this->db->beginTransaction();

        try {
            // Record the message in the database.
            $messageId = $this->recordMessage($data);

            $message = new AMQPMessage(
                json_encode(array_merge($data, ['message_id' => $messageId])),
                [
                    'message_id' => $messageId,
                    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
                ]
            );

            $this->nacked = false;
            $this->channel->basic_publish($message, '', $queueName);

            // Wait for the broker's answer, bounded by the configured timeout.
            $this->channel->wait_for_pending_acks($this->confirmTimeout);

            if ($this->nacked) {
                throw new RuntimeException("Broker 拒绝了消息 (NACK): {$messageId}");
            }

            $this->db->commit();

            echo "消息发送成功: {$messageId}\n";

            return $messageId;

        } catch (Exception $e) {
            $this->db->rollBack();

            echo "消息发送失败: " . $e->getMessage() . "\n";

            throw $e;
        }
    }

    /**
     * Inserts a 'pending' row for the message into message_log.
     *
     * @param array $data Payload stored as JSON.
     *
     * @return string Newly generated message id.
     */
    private function recordMessage($data)
    {
        $messageId = uniqid('msg-');

        $sql = "INSERT INTO message_log (message_id, data, status, created_at) 
                VALUES (?, ?, 'pending', NOW())";

        $stmt = $this->db->prepare($sql);
        $stmt->execute([$messageId, json_encode($data)]);

        return $messageId;
    }
}

常见问题与解决方案

问题 1: 确认超时

症状: wait_for_pending_acks 超时

解决方案:

php
<?php

// Bound the wait so an unresponsive broker cannot block the publisher forever.
$confirmTimeoutSeconds = 30;

try {
    $channel->wait_for_pending_acks($confirmTimeoutSeconds);
} catch (PhpAmqpLib\Exception\AMQPTimeoutException $timeout) {
    // Timeout handling (retry, alerting, ...) goes here.
    echo "确认超时,可能需要重试\n";
}

问题 2: 内存占用过高

症状: 大量消息未确认导致内存增长

解决方案:

php
<?php

// Wait for confirms at regular intervals so unconfirmed messages do not pile up
// in memory.
$batchSize = 100;
$sentCount = 0;

foreach ($messages as $data) {
    // Build a message from the current payload; publishing an unset or stale
    // $message would send the wrong content.
    // Assumes $data is a JSON-encodable payload, as in the other examples — TODO confirm.
    $message = new \PhpAmqpLib\Message\AMQPMessage(
        json_encode($data),
        ['delivery_mode' => \PhpAmqpLib\Message\AMQPMessage::DELIVERY_MODE_PERSISTENT]
    );

    $channel->basic_publish($message, '', $queueName);
    $sentCount++;

    if ($sentCount % $batchSize === 0) {
        $channel->wait_for_pending_acks();
    }
}

// Confirm whatever is left over from the last partial batch.
if ($sentCount % $batchSize !== 0) {
    $channel->wait_for_pending_acks();
}

问题 3: NACK 处理

症状: 收到 NACK 但不知道如何处理

解决方案:

php
<?php

// php-amqplib calls the nack handler with one argument: the AMQPMessage that
// the broker rejected. Declaring extra required parameters would raise an
// ArgumentCountError when the handler is invoked.
$channel->set_nack_handler(function (\PhpAmqpLib\Message\AMQPMessage $message) {
    // Log the rejected message.
    error_log("消息被拒绝: delivery_tag={$message->getDeliveryTag()}");

    // Decide whether to retry based on business requirements.
    // Note: a NACK usually signals an internal broker error, so an immediate
    // retry may not help.
});

// For critical messages, persist them in a database and retry from there.

最佳实践建议

  1. 启用确认模式: 生产环境务必启用发布确认
  2. 批量确认: 平衡性能和可靠性
  3. 异步确认: 高吞吐场景使用异步确认
  4. 处理 NACK: 记录并告警 NACK 消息
  5. 合理超时: 设置合理的确认超时时间

相关链接