Skip to content

确认最佳实践

概述

消息确认机制是 RabbitMQ 保证消息可靠性的核心。本文档总结生产者和消费者确认的最佳实践,帮助构建可靠的消息系统。

核心原则

可靠性金字塔

mermaid
graph TD
    %% Reliability tiers: each edge below pairs one reliability level (L1-L4)
    %% with the confirmation mechanism (T1-T4) listed for it. The four edges are
    %% independent of each other; no edge connects one tier to another.
    subgraph 可靠性层级
        L1[最高可靠性] --> T1[事务]
        L2[高可靠性] --> T2[发布确认]
        L3[中等可靠性] --> T3[手动确认]
        L4[低可靠性] --> T4[自动确认]
    end
    
    %% Fill colours for the four level nodes only; the mechanism nodes keep the default style.
    style L1 fill:#90EE90
    style L2 fill:#87CEEB
    style L3 fill:#DDA0DD
    style L4 fill:#FFA07A

确认机制选择

| 场景 | 生产者确认 | 消费者确认 | 推荐方案 |
| --- | --- | --- | --- |
| 高可靠性金融交易 | 事务 | 手动+事务 | 最高可靠性 |
| 订单处理 | 发布确认 | 手动确认 | 高可靠性 |
| 日志收集 | 批量确认 | 手动确认 | 平衡 |
| 实时推送 | 异步确认 | 自动确认 | 高吞吐 |

生产者确认最佳实践

1. 启用发布确认

php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * Publishes persistent JSON messages in publisher-confirm mode and waits for
 * the broker to confirm each message before publish() returns.
 *
 * The channel is untyped; the calls made on it (confirm_select,
 * set_ack_handler, set_nack_handler, basic_publish, wait_for_pending_acks)
 * match php-amqplib's AMQPChannel.
 */
class ReliablePublisher
{
    private $channel;
    private $confirmedCount = 0;
    private $nackedCount = 0;

    /** @var int|float Seconds to wait for a confirm; 0 keeps the library default. */
    private $confirmTimeout;

    /** @var bool True once a nack has arrived during the current publish() call. */
    private $nackSeen = false;

    /**
     * @param mixed     $channel        Channel to publish on; switched to confirm mode here.
     * @param int|float $confirmTimeout Timeout passed to wait_for_pending_acks(); the
     *                                  default 0 reproduces the previous no-argument call.
     */
    public function __construct($channel, $confirmTimeout = 0)
    {
        $this->channel = $channel;
        $this->confirmTimeout = $confirmTimeout;
        $this->setupConfirmMode();
    }

    /**
     * Enables confirm mode and registers the ack/nack handlers.
     */
    private function setupConfirmMode()
    {
        $this->channel->confirm_select();

        // php-amqplib invokes confirm handlers with ONE argument: the published
        // message. Declaring ($deliveryTag, $multiple, ...) as required
        // parameters makes the call fail with an ArgumentCountError.
        $this->channel->set_ack_handler(function ($message) {
            $this->confirmedCount++;
        });

        $this->channel->set_nack_handler(function ($message) {
            $this->nackedCount++;
            $this->nackSeen = true;
            $this->handleNack($message->getDeliveryTag());
        });
    }

    /**
     * Publishes $data as a persistent JSON message and waits for its confirm.
     *
     * @param string $exchange
     * @param string $routingKey
     * @param mixed  $data       Anything json_encode() accepts.
     *
     * @return bool True when the broker acked the message, false when it nacked it.
     */
    public function publish($exchange, $routingKey, $data)
    {
        $message = new AMQPMessage(
            json_encode($data),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'message_id' => uniqid('msg-', true),
                'timestamp' => time()
            ]
        );

        // Reset before publishing so the result reflects only this message.
        $this->nackSeen = false;

        $this->channel->basic_publish($message, $exchange, $routingKey);

        // The handlers registered above run while this call is waiting.
        $this->channel->wait_for_pending_acks($this->confirmTimeout);

        return !$this->nackSeen;
    }

    /**
     * @return int Number of messages acked by the broker so far.
     */
    public function getConfirmedCount()
    {
        return $this->confirmedCount;
    }

    /**
     * @return int Number of messages nacked by the broker so far.
     */
    public function getNackedCount()
    {
        return $this->nackedCount;
    }

    /**
     * Logs a nacked message and raises an alert for it.
     */
    private function handleNack($deliveryTag)
    {
        error_log("消息被拒绝: delivery_tag={$deliveryTag}");

        $this->sendAlert("消息发送失败: tag={$deliveryTag}");
    }

    /**
     * Alert hook; intentionally left empty in this example.
     */
    private function sendAlert($message)
    {
        // Alerting logic goes here.
    }
}

2. 批量确认优化

php
<?php

/**
 * Publishes persistent JSON messages in confirm mode and waits for broker
 * confirms once per batch instead of once per message.
 *
 * The channel is untyped; the calls made on it match php-amqplib's AMQPChannel.
 */
class BatchConfirmPublisher
{
    private $channel;
    private $batchSize;
    private $sentCount = 0;

    /** @var int Messages published since the last successful wait. */
    private $unconfirmedCount = 0;

    /** @var int Messages the broker has nacked so far. */
    private $nackedCount = 0;

    /**
     * @param mixed $channel   Channel to publish on; switched to confirm mode here.
     * @param int   $batchSize Number of messages published between waits; must be >= 1.
     *
     * @throws \InvalidArgumentException When $batchSize is smaller than 1.
     */
    public function __construct($channel, $batchSize = 100)
    {
        $batchSize = (int) $batchSize;
        if ($batchSize < 1) {
            // A zero batch size previously failed later with a modulo-by-zero error.
            throw new \InvalidArgumentException('batchSize must be at least 1');
        }

        $this->channel = $channel;
        $this->batchSize = $batchSize;
        $this->channel->confirm_select();

        // Without a nack handler a broker-side rejection goes unnoticed: the
        // wait call returns normally once every message is acked OR nacked.
        // php-amqplib passes the published message as the only argument.
        $this->channel->set_nack_handler(function ($message) {
            $this->nackedCount++;
            error_log("消息被拒绝: delivery_tag={$message->getDeliveryTag()}");
        });
    }

    /**
     * Publishes $data and waits for confirms when a full batch is outstanding.
     */
    public function publish($exchange, $routingKey, $data)
    {
        // Fully qualified: this snippet has no `use` statement of its own.
        $message = new \PhpAmqpLib\Message\AMQPMessage(
            json_encode($data),
            ['delivery_mode' => \PhpAmqpLib\Message\AMQPMessage::DELIVERY_MODE_PERSISTENT]
        );

        $this->channel->basic_publish($message, $exchange, $routingKey);
        $this->sentCount++;
        $this->unconfirmedCount++;

        if ($this->unconfirmedCount >= $this->batchSize) {
            $this->waitForConfirms();
        }
    }

    /**
     * Waits for every message that has not been confirmed yet.
     *
     * @return bool True when nothing was pending or the whole remainder was
     *              acked; false when the wait failed or a message was nacked.
     */
    public function flush()
    {
        // Based on the outstanding count rather than sentCount % batchSize, so a
        // batch whose wait failed or timed out is waited on again here.
        if ($this->unconfirmedCount === 0) {
            return true;
        }

        return $this->waitForConfirms();
    }

    /**
     * @return int Number of messages published through this instance.
     */
    public function getSentCount()
    {
        return $this->sentCount;
    }

    /**
     * @return int Number of messages the broker has nacked so far.
     */
    public function getNackedCount()
    {
        return $this->nackedCount;
    }

    /**
     * Blocks until the outstanding messages are confirmed or the wait fails.
     *
     * @return bool False when the wait threw or a nack arrived during it.
     */
    private function waitForConfirms($timeout = 30)
    {
        $nackedBefore = $this->nackedCount;

        try {
            $this->channel->wait_for_pending_acks($timeout);
        } catch (\Exception $e) {
            // Leave unconfirmedCount untouched so the next publish()/flush()
            // waits for these messages again.
            $this->handleConfirmError($e);

            return false;
        }

        $this->unconfirmedCount = 0;

        return $this->nackedCount === $nackedBefore;
    }

    /**
     * Logs a failed wait (timeout or any other exception from the channel).
     */
    private function handleConfirmError($exception)
    {
        error_log("确认失败: " . $exception->getMessage());
    }
}

3. 异步确认模式

php
<?php

/**
 * Publishes persistent JSON messages in confirm mode without waiting for each
 * confirm, and keeps a record of every message until the broker confirms it.
 *
 * Pending messages are tracked by their message_id property. The channel is
 * untyped; the calls made on it match php-amqplib's AMQPChannel.
 */
class AsyncConfirmPublisher
{
    private $channel;

    /** @var array<string, array> Unconfirmed messages keyed by message_id. */
    private $pendingMessages = [];

    /** @var int publish() waits for confirms once this many messages are pending. */
    private $maxPending = 1000;

    public function __construct($channel)
    {
        $this->channel = $channel;
        $this->setupAsyncConfirm();
    }

    /**
     * Enables confirm mode and registers the ack/nack handlers.
     */
    private function setupAsyncConfirm()
    {
        $this->channel->confirm_select();

        // php-amqplib calls confirm handlers with ONE argument, the published
        // message, and already expands a "multiple" confirm into one call per
        // message. Handlers declaring ($deliveryTag, $multiple, ...) fail with
        // an ArgumentCountError.
        $this->channel->set_ack_handler(function ($message) {
            $this->handleAck($this->messageIdOf($message));
        });

        $this->channel->set_nack_handler(function ($message) {
            $this->handleNack($this->messageIdOf($message));
        });
    }

    /**
     * Publishes $data and records it as pending until the broker confirms it.
     *
     * @param string      $exchange
     * @param string      $routingKey
     * @param array       $data       Payload; message_id is merged into it before encoding.
     * @param string|null $messageId  Identifier to use; generated when null.
     *
     * @return string The message_id the message was published with.
     */
    public function publish($exchange, $routingKey, $data, $messageId = null)
    {
        // Back-pressure: drain confirms before letting the backlog grow further.
        if (count($this->pendingMessages) >= $this->maxPending) {
            $this->waitForConfirms();
        }

        // Extra entropy: the id is also the key of the pending map, so two
        // messages published in the same microsecond must not collide.
        $messageId = $messageId ?? uniqid('msg-', true);

        // Fully qualified: this snippet has no `use` statement of its own.
        $message = new \PhpAmqpLib\Message\AMQPMessage(
            json_encode(array_merge($data, ['message_id' => $messageId])),
            [
                'content_type' => 'application/json',
                'message_id' => $messageId,
                'delivery_mode' => \PhpAmqpLib\Message\AMQPMessage::DELIVERY_MODE_PERSISTENT
            ]
        );

        $this->channel->basic_publish($message, $exchange, $routingKey);

        // Keyed by message_id rather than by a delivery tag read back from the
        // channel: the handlers receive the message itself, so the id is the
        // one value available on both sides.
        $this->pendingMessages[$messageId] = [
            'message_id' => $messageId,
            'data' => $data,
            'published_at' => time()
        ];

        return $messageId;
    }

    /**
     * Reads the message_id property of a confirmed message, or null if absent.
     */
    private function messageIdOf($message)
    {
        return $message->has('message_id') ? $message->get('message_id') : null;
    }

    /**
     * Drops an acked message from the pending map.
     */
    private function handleAck($messageId)
    {
        if ($messageId !== null) {
            unset($this->pendingMessages[$messageId]);
        }
    }

    /**
     * Drops a nacked message from the pending map and reports it as failed.
     */
    private function handleNack($messageId)
    {
        if ($messageId === null || !isset($this->pendingMessages[$messageId])) {
            return;
        }

        $failed = $this->pendingMessages[$messageId];
        unset($this->pendingMessages[$messageId]);

        $this->handleFailedMessage($failed);
    }

    /**
     * Logs a message the broker refused; retry or dead-letter handling would go here.
     */
    private function handleFailedMessage($messageInfo)
    {
        error_log("消息发送失败: " . json_encode($messageInfo));
    }

    /**
     * Blocks until outstanding confirms (and returned messages) are processed.
     *
     * @param int|float $timeout Seconds, passed through to the channel.
     */
    public function waitForConfirms($timeout = 30)
    {
        $this->channel->wait_for_pending_acks_returns($timeout);
    }

    /**
     * @return int Number of messages still waiting for a confirm.
     */
    public function getPendingCount()
    {
        return count($this->pendingMessages);
    }
}

消费者确认最佳实践

1. 手动确认模式

php
<?php

/**
 * Consumes a queue in manual-acknowledgement mode and settles every delivery
 * explicitly: ack on success, requeue on a recoverable failure, reject on
 * anything else.
 *
 * The channel is untyped; the calls made on it match php-amqplib's AMQPChannel.
 */
class ReliableConsumer
{
    private $channel;
    private $queueName;

    public function __construct($channel, $queueName)
    {
        $this->channel = $channel;
        $this->queueName = $queueName;
    }

    /**
     * Registers the consumer and processes deliveries until the channel stops consuming.
     *
     * @param callable $handler Receives the JSON-decoded message body (as an array).
     */
    public function consume(callable $handler)
    {
        // Prefetch of 1: one unacknowledged message per consumer at a time.
        $this->channel->basic_qos(null, 1, null);

        $callback = function ($msg) use ($handler) {
            $data = json_decode($msg->getBody(), true);

            // A body that is not valid JSON can never succeed on redelivery, so
            // reject it without requeueing instead of handing null to the handler.
            if ($data === null && json_last_error() !== JSON_ERROR_NONE) {
                $msg->reject(false);
                $this->logError(
                    new \UnexpectedValueException(json_last_error_msg()),
                    $msg->getBody(),
                    'unrecoverable'
                );

                return;
            }

            try {
                $handler($data);

                $msg->ack();

            } catch (RecoverableException $e) {
                // Recoverable: put the message back on the queue.
                $msg->nack(true);
                $this->logError($e, $data, 'recoverable');

            } catch (UnrecoverableException $e) {
                // Unrecoverable: reject without requeueing.
                $msg->reject(false);
                $this->logError($e, $data, 'unrecoverable');

            } catch (\Throwable $e) {
                // Anything else used to escape the callback, leaving the message
                // unacknowledged and ending the consume loop. Treat it like an
                // unrecoverable failure so the delivery is always settled.
                $msg->reject(false);
                $this->logError($e, $data, 'unexpected');
            }
        };

        $this->channel->basic_consume(
            $this->queueName,
            '',
            false,
            false,  // no_ack = false: acknowledgements are sent manually above
            false,
            false,
            $callback
        );

        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }
    }

    /**
     * Writes one error-log line with the failure type, exception class and message, and payload.
     */
    private function logError($exception, $data, $type)
    {
        error_log(sprintf(
            "[%s] %s: %s, data: %s",
            $type,
            get_class($exception),
            $exception->getMessage(),
            json_encode($data)
        ));
    }
}

/** Thrown by a message handler for a failure worth retrying; ReliableConsumer requeues the message. */
class RecoverableException extends Exception {}
/** Thrown by a message handler for a failure that cannot succeed on retry; ReliableConsumer rejects the message without requeueing. */
class UnrecoverableException extends Exception {}

2. 幂等性处理

php
<?php

/**
 * Consumes a queue in manual-acknowledgement mode and skips messages whose
 * identifier has already been recorded in Redis.
 *
 * The channel and Redis client are untyped; the calls made on them match
 * php-amqplib's AMQPChannel and a client exposing exists()/setex().
 * The "already processed?" check and the later mark are two separate Redis
 * calls, so two consumers handling the same message at the same moment can
 * both pass the check.
 */
class IdempotentConsumer
{
    private $channel;
    private $redis;
    private $queueName;

    public function __construct($channel, $redis, $queueName)
    {
        $this->channel = $channel;
        $this->redis = $redis;
        $this->queueName = $queueName;
    }

    /**
     * Registers the consumer and processes deliveries until the channel stops consuming.
     *
     * @param callable $handler Receives the JSON-decoded message body (as an array).
     */
    public function consume(callable $handler)
    {
        $this->channel->basic_qos(null, 1, null);

        $callback = function ($msg) use ($handler) {
            $data = json_decode($msg->getBody(), true);
            $messageId = $this->extractMessageId($msg, $data);

            // Duplicate delivery: acknowledge it without running the handler again.
            if ($this->isProcessed($messageId)) {
                echo "消息已处理,跳过: {$messageId}\n";
                $msg->ack();
                return;
            }

            try {
                $handler($data);

                // Mark before acking: if the ack is lost the redelivery is
                // recognised as a duplicate.
                $this->markProcessed($messageId);

                $msg->ack();

            } catch (Exception $e) {
                // Record the failure before requeueing; it used to be dropped silently.
                error_log("消息处理失败,重新入队: {$messageId}, error: " . $e->getMessage());
                $msg->nack(true);
            }
        };

        $this->channel->basic_consume($this->queueName, '', false, false, false, false, $callback);

        // Same wait loop as ReliableConsumer: without it the callback is
        // registered but no delivery is ever dispatched to it.
        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }
    }

    /**
     * Picks the identifier used for de-duplication.
     *
     * Order of preference: the message_id property, message_id in the body,
     * the business key order_id, and finally a hash of the decoded body.
     */
    private function extractMessageId($msg, $data)
    {
        // has() first: get() throws OutOfBoundsException for a property that
        // is not set, which made the fallbacks below unreachable.
        if ($msg->has('message_id') && $msg->get('message_id')) {
            return $msg->get('message_id');
        }

        if (isset($data['message_id'])) {
            return $data['message_id'];
        }

        if (isset($data['order_id'])) {
            return 'order:' . $data['order_id'];
        }

        return md5(json_encode($data));
    }

    /**
     * @return mixed Truthy when a "processed:<id>" key exists in Redis.
     */
    private function isProcessed($messageId)
    {
        return $this->redis->exists("processed:{$messageId}");
    }

    /**
     * Records the identifier in Redis with an expiry of 86400 seconds (one day).
     */
    private function markProcessed($messageId)
    {
        $this->redis->setex("processed:{$messageId}", 86400, 1);
    }
}

3. 重试机制

php
<?php

/**
 * Consumes a queue in manual-acknowledgement mode and retries failed messages
 * through a TTL-based retry queue, up to a maximum number of attempts.
 *
 * A failed message is re-published to "<queue>.retry". That queue has a
 * message TTL and dead-letters expired messages to the default exchange with
 * the source queue name as routing key, which returns them to the source queue.
 *
 * The channel is untyped; the calls made on it match php-amqplib's AMQPChannel.
 */
class RetryableConsumer
{
    private $channel;
    private $maxRetries = 3;

    /** @var int Delay before a retried message returns to its queue, in milliseconds. */
    private $retryDelay = 5000;

    /**
     * @param mixed $channel
     * @param int   $maxRetries Retries allowed before the message is rejected.
     * @param int   $retryDelay Retry delay in milliseconds (x-message-ttl of the retry queue).
     */
    public function __construct($channel, $maxRetries = 3, $retryDelay = 5000)
    {
        $this->channel = $channel;
        $this->maxRetries = $maxRetries;
        $this->retryDelay = $retryDelay;
    }

    /**
     * Registers the consumer and processes deliveries until the channel stops consuming.
     *
     * @param string   $queueName Queue to consume; also the target retried messages return to.
     * @param callable $handler   Receives the JSON-decoded message body (as an array).
     */
    public function consume($queueName, callable $handler)
    {
        $this->channel->basic_qos(null, 1, null);

        $callback = function ($msg) use ($handler, $queueName) {
            $retryCount = $this->getRetryCount($msg);
            $data = json_decode($msg->getBody(), true);

            try {
                $handler($data);
                $msg->ack();

            } catch (\Throwable $e) {
                // \Throwable rather than Exception so that an Error raised by
                // the handler is retried too instead of escaping the callback.
                if ($retryCount < $this->maxRetries) {
                    $attempt = $retryCount + 1;

                    // Re-publish first, then ack the original delivery.
                    $this->sendToRetryQueue($msg, $attempt, $queueName);
                    $msg->ack();

                    // Reports the attempt being scheduled, not the previous count.
                    echo "消息重试: {$attempt}/{$this->maxRetries}\n";
                } else {
                    $this->handleMaxRetriesExceeded($msg, $data, $e);
                    $msg->reject(false);
                }
            }
        };

        $this->channel->basic_consume($queueName, '', false, false, false, false, $callback);

        // Without a wait loop the callback is registered but never invoked.
        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }
    }

    /**
     * Reads the x-retry-count header of a delivery; 0 when the header is absent.
     */
    private function getRetryCount($msg)
    {
        if (!$msg->has('application_headers')) {
            return 0;
        }

        $headers = $msg->get('application_headers')->getNativeData();

        return (int) ($headers['x-retry-count'] ?? 0);
    }

    /**
     * Re-publishes a failed message to the retry queue of $queueName.
     *
     * @param mixed  $msg        The failed delivery.
     * @param int    $retryCount Value stored in the x-retry-count header.
     * @param string $queueName  Source queue the message returns to after the delay.
     */
    private function sendToRetryQueue($msg, $retryCount, $queueName)
    {
        // One retry queue per source queue: the dead-letter routing key is a
        // queue argument, and re-declaring one shared queue with a different
        // value is refused by the broker.
        $retryQueue = $queueName . '.retry';

        $this->channel->queue_declare(
            $retryQueue,
            false,
            true,
            false,
            false,
            false,
            new \PhpAmqpLib\Wire\AMQPTable([
                'x-message-ttl' => $this->retryDelay,
                // The default exchange routes by queue name, so the key must be
                // the source queue, not the routing key the message arrived with.
                'x-dead-letter-exchange' => '',
                'x-dead-letter-routing-key' => $queueName
            ])
        );

        // Carry the original properties over (message_id, content_type, other
        // headers); only the retry counter and the delivery mode are overridden.
        $properties = $msg->get_properties();
        $headers = isset($properties['application_headers'])
            ? $properties['application_headers']
            : new \PhpAmqpLib\Wire\AMQPTable();
        $headers->set('x-retry-count', $retryCount);

        $properties['delivery_mode'] = \PhpAmqpLib\Message\AMQPMessage::DELIVERY_MODE_PERSISTENT;
        $properties['application_headers'] = $headers;

        $retryMessage = new \PhpAmqpLib\Message\AMQPMessage($msg->getBody(), $properties);

        $this->channel->basic_publish($retryMessage, '', $retryQueue);
    }

    /**
     * Logs a message that used up its retries and raises an alert for it.
     */
    private function handleMaxRetriesExceeded($msg, $data, $exception)
    {
        error_log("消息超过最大重试次数: " . json_encode($data));

        $this->sendAlert($data, $exception);
    }

    /**
     * Alert hook; intentionally left empty in this example.
     */
    private function sendAlert($data, $exception)
    {
        // Alerting logic goes here.
    }
}

综合最佳实践

1. 完整的可靠消息系统

php
<?php

/**
 * Facade that pairs an AsyncConfirmPublisher with an IdempotentConsumer.
 *
 * Both collaborators are built on the channel passed in, so publishing and
 * consuming share that single channel.
 */
class ReliableMessagingSystem
{
    private $channel;
    private $redis;
    private $publisher;
    private $consumer;

    /**
     * @param mixed  $channel   Channel handed to both the publisher and the consumer.
     * @param mixed  $redis     Redis client handed to the consumer.
     * @param string $queueName Queue the consumer reads from; defaults to the
     *                          previously hard-coded 'main_queue'.
     */
    public function __construct($channel, $redis, $queueName = 'main_queue')
    {
        $this->channel = $channel;
        $this->redis = $redis;
        $this->publisher = new AsyncConfirmPublisher($channel);
        $this->consumer = new IdempotentConsumer($channel, $redis, $queueName);
    }

    /**
     * Publishes $data through the publisher.
     *
     * @return mixed Whatever the publisher's publish() returns.
     */
    public function send($exchange, $routingKey, $data)
    {
        return $this->publisher->publish($exchange, $routingKey, $data);
    }

    /**
     * Starts consuming with $handler as the message handler.
     */
    public function receive(callable $handler)
    {
        $this->consumer->consume($handler);
    }

    /**
     * Waits for the publisher's outstanding confirms before shutting down.
     */
    public function shutdown()
    {
        $this->publisher->waitForConfirms();
    }
}

2. 监控指标

php
<?php

/**
 * Counts publish and acknowledgement outcomes in Redis.
 *
 * Every counter is stored under METRIC_PREFIX followed by "<kind>:<outcome>",
 * where kind is "publish" or "ack" and outcome is "success" or "failed".
 */
class ConfirmMetrics
{
    const METRIC_PREFIX = 'rabbitmq:confirms:';

    private $redis;

    public function __construct($redis)
    {
        $this->redis = $redis;
    }

    /**
     * Increments the publish counter for the given outcome.
     */
    public function recordPublish($success = true)
    {
        $this->bump('publish', $success);
    }

    /**
     * Increments the acknowledgement counter for the given outcome.
     */
    public function recordAck($success = true)
    {
        $this->bump('ack', $success);
    }

    /**
     * Reads the four counters back as integers.
     *
     * @return array Keys publish_success, publish_failed, ack_success, ack_failed.
     */
    public function getStats()
    {
        $stats = [];

        foreach (['publish', 'ack'] as $kind) {
            foreach (['success', 'failed'] as $outcome) {
                $value = $this->redis->get(self::METRIC_PREFIX . $kind . ':' . $outcome);
                $stats[$kind . '_' . $outcome] = (int) $value;
            }
        }

        return $stats;
    }

    /**
     * Increments the counter for one kind/outcome pair.
     */
    private function bump($kind, $success)
    {
        $outcome = $success ? 'success' : 'failed';

        $this->redis->incr(self::METRIC_PREFIX . $kind . ':' . $outcome);
    }
}

最佳实践清单

生产者确认

  • [ ] 启用发布确认模式
  • [ ] 设置确认回调处理 ACK/NACK
  • [ ] 使用批量确认提高性能
  • [ ] 处理确认超时
  • [ ] 记录失败消息
  • [ ] 实现重试机制

消费者确认

  • [ ] 使用手动确认模式
  • [ ] 设置合理的 prefetch
  • [ ] 实现幂等性处理
  • [ ] 区分可恢复和不可恢复错误
  • [ ] 实现重试机制
  • [ ] 处理死信消息

监控与告警

  • [ ] 监控确认成功率
  • [ ] 监控未确认消息数量
  • [ ] 监控消息积压
  • [ ] 设置告警阈值
  • [ ] 记录错误日志

相关链接