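RabbitMQ Consumer Rate Limiting

Overview

Consumer rate limiting controls how fast messages reach a consumer. RabbitMQ does this by capping the number of delivered but unacknowledged messages, so choosing the prefetch count and the acknowledgement mode together gives precise flow control and protects both the consumer and the overall system from overload.

Core Concepts

How Rate Limiting Works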

┌─────────────────────────────────────────────────────────────┐
│              How consumer rate limiting works               │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  RabbitMQ                       Consumer                    │
│    │                               │                        │
│    │  ┌─────────────────────┐      │                        │
│    │  │     Queue           │      │                        │
│    │  │  ┌───┬───┬───┬───┐ │      │                        │
│    │  │  │ M │ M │ M │ M │ │      │                        │
│    │  │  └───┴───┴───┴───┘ │      │                        │
│    │  └──────────┬──────────┘      │                        │
│    │             │                 │                        │
│    │    prefetch_count = 2         │                        │
│    │             │                 │                        │
│    │     ┌───────┴───────┐         │                        │
│    │     │               │         │                        │
│    │     ▼               ▼         │                        │
│    │  ┌─────┐        ┌─────┐      │                        │
│    │  │ M1  │───────▶│ M2  │─────▶│                        │
│    │  └─────┘        └─────┘      │                        │
│    │                               │                        │
│    │  No new delivery until an ACK │                        │
│    │                               │                        │
│    │◀────────── ACK ───────────────│                        │
│    │                               │                        │
│    │     ┌───────┐                 │                        │
│    │     │ M3    │────────────────▶│                        │
│    │     └───────┘                 │                        │
│                                                             │
└─────────────────────────────────────────────────────────────┘

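Prefetch Count

Setting prefetch_count = N means:

1. Each consumer holds at most N unacknowledged messages at any time
2. The broker delivers another message only after an earlier one is acknowledged
3. The amount of work queued up on each consumer stays bounded

A prefetch count of 0 means no limit.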
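
The window behaviour in this list can be sketched with a small in-memory model. `PrefetchWindow` is a hypothetical class written only for this illustration (it is not part of php-amqplib or RabbitMQ); it mimics how a broker stops delivering once N messages are unacknowledged.

```php
<?php

declare(strict_types=1);

/**
 * Toy in-memory model of a broker enforcing prefetch_count for one consumer.
 * Hypothetical class for illustration; not part of php-amqplib.
 */
final class PrefetchWindow
{
    /** @var list<string> messages still waiting in the queue */
    private array $queue;

    /** @var array<int, string> delivered but unacknowledged, keyed by delivery tag */
    private array $unacked = [];

    private int $prefetchCount;
    private int $nextTag = 1;

    public function __construct(array $messages, int $prefetchCount)
    {
        $this->queue = array_values($messages);
        $this->prefetchCount = $prefetchCount;
    }

    /** Deliver as many messages as the window allows; returns the new delivery tags. */
    public function deliver(): array
    {
        $tags = [];
        while ($this->queue !== [] && count($this->unacked) < $this->prefetchCount) {
            $tag = $this->nextTag++;
            $this->unacked[$tag] = array_shift($this->queue);
            $tags[] = $tag;
        }

        return $tags;
    }

    /** Acknowledge one delivery, freeing one slot in the window. */
    public function ack(int $tag): void
    {
        unset($this->unacked[$tag]);
    }

    public function unackedCount(): int
    {
        return count($this->unacked);
    }
}

$window = new PrefetchWindow(['M1', 'M2', 'M3', 'M4'], 2);

$first = $window->deliver();   // M1 and M2 go out; the window is now full
$blocked = $window->deliver(); // nothing is delivered until an ack arrives
$window->ack($first[0]);       // the consumer finishes M1
$second = $window->deliver();  // one slot is free, so only M3 is sent
```

The second `deliver()` call returns nothing because both slots are taken; acknowledging M1 frees exactly one slot, so only M3 follows.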

Recommended settings:
┌─────────────────────────────────────────────────────────────┐
│  Scenario                │  Prefetch   │  Notes             │
├─────────────────────────────────────────────────────────────┤
│  Low-latency requirement │  1-5        │  Fast response     │
│  High throughput         │  10-50      │  Batch processing  │
│  Long processing time    │  1-3        │  Avoid backlog     │
│  Multiple consumers      │  Smaller    │  Even distribution │
└─────────────────────────────────────────────────────────────┘

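Effect of Acknowledgement Mode

┌─────────────────────────────────────────────────────────────┐
│  Mode        │  Description            │  Limiting effect   │
├─────────────────────────────────────────────────────────────┤
│  Auto ack    │  Acked on delivery      │  Prefetch ignored  │
│  Manual ack  │  Acked after processing │  Limit enforced    │
│  Batch ack   │  One ack covers many    │  Higher throughput │
└─────────────────────────────────────────────────────────────┘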

Global vs. Per-Consumer Prefetch

┌─────────────────────────────────────────────────────────────┐
│                  Effect of the global flag                  │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  global = false (default):                                  │
│  ┌─────────────────────────────────────────────────────┐    │
│  │  Each consumer gets its own prefetch window         │    │
│  │  Consumer 1: prefetch = 10                          │    │
│  │  Consumer 2: prefetch = 10                          │    │
│  │  Total unacknowledged: at most 20                   │    │
│  └─────────────────────────────────────────────────────┘    │
│                                                             │
│  global = true:                                             │
│  ┌─────────────────────────────────────────────────────┐    │
│  │  All consumers on the channel share one window      │    │
│  │  Channel prefetch = 10                              │    │
│  │  Total unacknowledged: at most 10                   │    │
│  └─────────────────────────────────────────────────────┘    │
│                                                             │
└─────────────────────────────────────────────────────────────┘
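
The arithmetic in the diagram reduces to a one-line rule. `maxUnackedOnChannel` is a hypothetical helper for illustration; it assumes every consumer on the channel uses the same prefetch count and that the broker interprets the global flag the way RabbitMQ does.

```php
<?php

declare(strict_types=1);

/**
 * Upper bound on unacknowledged messages for all consumers on one channel.
 * Hypothetical helper for illustration; assumes a uniform prefetch count.
 */
function maxUnackedOnChannel(int $prefetchCount, int $consumerCount, bool $global): int
{
    if ($prefetchCount < 1 || $consumerCount < 1) {
        // A prefetch count of 0 means "no limit", so no finite bound exists.
        throw new InvalidArgumentException('Both counts must be at least 1.');
    }

    // global = true: one shared window; global = false: one window per consumer.
    return $global ? $prefetchCount : $prefetchCount * $consumerCount;
}

$perConsumer = maxUnackedOnChannel(10, 2, false); // two independent windows of 10
$shared = maxUnackedOnChannel(10, 2, true);       // one window of 10 for the channel
```

With two consumers and a prefetch of 10, the per-consumer setting allows up to 20 messages in flight, while the shared setting caps the whole channel at 10.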

Configuration Examples

Basic Rate Limiting Configuration

php
<?php

use PhpAmqpLib\Channel\AMQPChannel;

function setupConsumerWithLimit(AMQPChannel $channel, string $queueName, int $prefetchCount): void
{
    // Set the prefetch count
    // Arguments: prefetch_size, prefetch_count, global
    $channel->basic_qos(null, $prefetchCount, false);
    
    // Start consuming
    $channel->basic_consume(
        $queueName,
        '',
        false,  // no_local
        false,  // no_ack (false = manual acknowledgement)
        false,  // exclusive
        false,  // nowait
        function ($msg) {
            // processMessage() is the application's own handler, defined elsewhere
            processMessage($msg);
            $msg->ack();
        }
    );
    );
}

Rate Limiting with Multiple Consumers

php
<?php

use PhpAmqpLib\Channel\AMQPChannel;

function setupMultipleConsumers(AMQPChannel $channel, string $queueName, int $consumerCount): void
{
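    // Prefetch count for each consumer
    $prefetchPerConsumer = 5;
    
    // Optional: one shared limit for the whole channel
    // $channel->basic_qos(null, $prefetchPerConsumer * $consumerCount, true);
    
    // Per-consumer limit; it applies to consumers started after this call
    $channel->basic_qos(null, $prefetchPerConsumer, false);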
    
    for ($i = 0; $i < $consumerCount; $i++) {
        $channel->basic_consume(
            $queueName,
            "consumer_{$i}",
            false,
            false,
            false,
            false,
            function ($msg) use ($i) {
                echo "Consumer {$i} processing: " . $msg->body . "\n";
                $msg->ack();
            }
        );
    }
}

Dynamic Rate Limiting

php
<?php

use PhpAmqpLib\Channel\AMQPChannel;

class DynamicRateLimiter
{
    private AMQPChannel $channel;
    private int $currentPrefetch;
    private int $minPrefetch = 1;
    private int $maxPrefetch = 50;
    
    public function __construct(AMQPChannel $channel, int $initialPrefetch = 10)
    {
        $this->channel = $channel;
        $this->currentPrefetch = $initialPrefetch;
        $this->applyPrefetch();
    }
    
    public function adjustBasedOnLoad(float $processingTime): void
    {
        if ($processingTime < 0.1) {
            $this->increasePrefetch();
        } elseif ($processingTime > 0.5) {
            $this->decreasePrefetch();
        }
    }

    private function increasePrefetch(): void
    {
        // Round up so small values still grow; truncating 4 * 1.2 would stay at 4
        $newPrefetch = min((int) ceil($this->currentPrefetch * 1.2), $this->maxPrefetch);
        
        if ($newPrefetch !== $this->currentPrefetch) {
            $this->currentPrefetch = $newPrefetch;
            $this->applyPrefetch();
        }
    }

    private function decreasePrefetch(): void
    {
        $newPrefetch = max((int) ($this->currentPrefetch * 0.8), $this->minPrefetch);
        
        if ($newPrefetch !== $this->currentPrefetch) {
            $this->currentPrefetch = $newPrefetch;
            $this->applyPrefetch();
        }
    }

    private function applyPrefetch(): void
    {
        // Per the RabbitMQ documentation, a limit set with global = false applies to
        // consumers started after this call; running consumers keep their old limit.
        $this->channel->basic_qos(null, $this->currentPrefetch, false);
    }

    public function getCurrentPrefetch(): int
    {
        return $this->currentPrefetch;
    }
}

PHP Code Examples

Consumer Rate Limit Manager

php
<?php

namespace App\RabbitMQ\Consumer;

use PhpAmqpLib\Channel\AMQPChannel;

class ConsumerRateLimiter
{
    private AMQPChannel $channel;
    private array $consumers = [];
    private int $defaultPrefetch;
    
    public function __construct(AMQPChannel $channel, int $defaultPrefetch = 10)
    {
        $this->channel = $channel;
        $this->defaultPrefetch = $defaultPrefetch;
    }
    
    public function registerConsumer(
        string $queueName,
        string $consumerTag,
        callable $callback,
        ?int $prefetchCount = null
    ): void {
        $prefetch = $prefetchCount ?? $this->defaultPrefetch;
        
        // Per-consumer limit; it takes effect for the consumer registered just below
        $this->channel->basic_qos(null, $prefetch, false);
        
        $this->channel->basic_consume(
            $queueName,
            $consumerTag,
            false,
            false,
            false,
            false,
            function ($msg) use ($callback, $consumerTag) {
                $startTime = microtime(true);
                
                $callback($msg);
                
                $processingTime = microtime(true) - $startTime;
                $this->recordProcessingTime($consumerTag, $processingTime);
                
                $msg->ack();
            }
        );
        
        $this->consumers[$consumerTag] = [
            'queue' => $queueName,
            'prefetch' => $prefetch,
            'messages_processed' => 0,
            'total_processing_time' => 0,
        ];
    }

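    public function adjustPrefetch(string $consumerTag, int $newPrefetch): void
    {
        if (!isset($this->consumers[$consumerTag])) {
            return;
        }
        
        // Per the RabbitMQ documentation, a limit set with global = false applies to
        // consumers started after this call. To change the limit of a consumer that
        // is already running, cancel it and register it again.
        $this->consumers[$consumerTag]['prefetch'] = $newPrefetch;
        $this->channel->basic_qos(null, $newPrefetch, false);
    }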

    public function getConsumerStats(string $consumerTag): array
    {
        if (!isset($this->consumers[$consumerTag])) {
            return [];
        }
        
        $consumer = $this->consumers[$consumerTag];
        $avgProcessingTime = $consumer['messages_processed'] > 0
            ? $consumer['total_processing_time'] / $consumer['messages_processed']
            : 0;
        
        return [
            'consumer_tag' => $consumerTag,
            'queue' => $consumer['queue'],
            'prefetch' => $consumer['prefetch'],
            'messages_processed' => $consumer['messages_processed'],
            'avg_processing_time' => round($avgProcessingTime * 1000, 2) . ' ms',
        ];
    }

    public function getRecommendedPrefetch(string $consumerTag): int
    {
        if (!isset($this->consumers[$consumerTag])) {
            return $this->defaultPrefetch;
        }
        
        $consumer = $this->consumers[$consumerTag];
        $avgTime = $consumer['messages_processed'] > 0
            ? $consumer['total_processing_time'] / $consumer['messages_processed']
            : 0;
        
        if ($avgTime < 0.01) {
            return min($consumer['prefetch'] * 2, 100);
        } elseif ($avgTime > 0.1) {
            // intdiv() keeps the result an int, matching the declared return type
            return max(intdiv($consumer['prefetch'], 2), 1);
        }
        
        return $consumer['prefetch'];
    }

    private function recordProcessingTime(string $consumerTag, float $time): void
    {
        if (isset($this->consumers[$consumerTag])) {
            $this->consumers[$consumerTag]['messages_processed']++;
            $this->consumers[$consumerTag]['total_processing_time'] += $time;
        }
    }

    public function startConsuming(int $timeout = 0): void
    {
        // wait() throws AMQPTimeoutException if $timeout seconds pass without a delivery
        while ($this->channel->is_consuming()) {
            $this->channel->wait(null, false, $timeout);
        }
    }
}

Adaptive Rate Limiter

php
<?php

namespace App\RabbitMQ\Consumer;

use PhpAmqpLib\Channel\AMQPChannel;

class AdaptiveRateLimiter
{
    private AMQPChannel $channel;
    private int $minPrefetch;
    private int $maxPrefetch;
    private int $currentPrefetch;
    private array $processingTimes = [];
    private int $sampleSize = 100;
    
    public function __construct(
        AMQPChannel $channel,
        int $minPrefetch = 1,
        int $maxPrefetch = 100,
        int $initialPrefetch = 10
    ) {
        $this->channel = $channel;
        $this->minPrefetch = $minPrefetch;
        $this->maxPrefetch = $maxPrefetch;
        $this->currentPrefetch = $initialPrefetch;
        
        $this->applyPrefetch();
    }
    
    public function recordProcessingTime(float $time): void
    {
        $this->processingTimes[] = $time;
        
        if (count($this->processingTimes) >= $this->sampleSize) {
            $this->adjust();
            $this->processingTimes = [];
        }
    }

    public function getWrappedCallback(callable $originalCallback): callable
    {
        return function ($msg) use ($originalCallback) {
            $startTime = microtime(true);
            
            $originalCallback($msg);
            
            $processingTime = microtime(true) - $startTime;
            $this->recordProcessingTime($processingTime);
            
            $msg->ack();
        };
    }

    private function adjust(): void
    {
        if (empty($this->processingTimes)) {
            return;
        }
        
        $avgTime = array_sum($this->processingTimes) / count($this->processingTimes);
        $p99Time = $this->calculatePercentile($this->processingTimes, 99);
        
        if ($avgTime < 0.01 && $p99Time < 0.05) {
            $this->increasePrefetch();
        } elseif ($avgTime > 0.1 || $p99Time > 0.5) {
            $this->decreasePrefetch();
        }
    }

    private function increasePrefetch(): void
    {
        // Round up so a prefetch of 1 can still grow; (int) (1 * 1.5) would stay at 1
        $newPrefetch = min(
            (int) ceil($this->currentPrefetch * 1.5),
            $this->maxPrefetch
        );
        
        if ($newPrefetch !== $this->currentPrefetch) {
            $this->currentPrefetch = $newPrefetch;
            $this->applyPrefetch();
        }
    }

    private function decreasePrefetch(): void
    {
        $newPrefetch = max(
            (int) ($this->currentPrefetch * 0.7),
            $this->minPrefetch
        );
        
        if ($newPrefetch !== $this->currentPrefetch) {
            $this->currentPrefetch = $newPrefetch;
            $this->applyPrefetch();
        }
    }

    private function applyPrefetch(): void
    {
        // Per the RabbitMQ documentation, a limit set with global = false applies to
        // consumers started after this call; running consumers keep their old limit.
        $this->channel->basic_qos(null, $this->currentPrefetch, false);
    }

    private function calculatePercentile(array $data, float $percentile): float
    {
        sort($data);
        $index = (count($data) - 1) * ($percentile / 100);
        $lower = (int) floor($index);
        $upper = (int) ceil($index);
        
        if ($lower === $upper) {
            return $data[$lower];
        }
        
        return $data[$lower] + ($data[$upper] - $data[$lower]) * ($index - $lower);
    }

    public function getCurrentPrefetch(): int
    {
        return $this->currentPrefetch;
    }

    public function getStats(): array
    {
        return [
            'current_prefetch' => $this->currentPrefetch,
            'min_prefetch' => $this->minPrefetch,
            'max_prefetch' => $this->maxPrefetch,
            'sample_count' => count($this->processingTimes),
        ];
    }
}
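
To make the percentile step easier to follow, here is a standalone sketch of the same linear-interpolation rule that `calculatePercentile` uses. The function name `percentile` and the sample values are made up for illustration.

```php
<?php

declare(strict_types=1);

/**
 * Percentile by linear interpolation between the two nearest ranks.
 * Standalone copy of the rule used above; the name is hypothetical.
 *
 * @param list<float> $samples processing times in seconds
 */
function percentile(array $samples, float $percentile): float
{
    if ($samples === []) {
        throw new InvalidArgumentException('At least one sample is required.');
    }

    sort($samples);
    $index = (count($samples) - 1) * ($percentile / 100);
    $lower = (int) floor($index);
    $upper = (int) ceil($index);

    if ($lower === $upper) {
        return (float) $samples[$lower];
    }

    // Interpolate between the neighbouring samples
    return $samples[$lower] + ($samples[$upper] - $samples[$lower]) * ($index - $lower);
}

$samples = [0.02, 0.01, 0.04, 0.03, 0.10];

$median = percentile($samples, 50); // index 2.0 falls exactly on the middle sample
$p99 = percentile($samples, 99);    // index 3.96 lies between 0.04 and 0.10
```

With only five samples the 99th percentile sits almost at the maximum, which is one reason the class collects a full window of 100 samples before it adjusts.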

Rate Limiting with Batch Acknowledgement

php
<?php

namespace App\RabbitMQ\Consumer;

use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Message\AMQPMessage;

class BatchAckConsumer
{
    private AMQPChannel $channel;
    private int $batchSize;
    private int $currentBatch = 0;
    private ?int $lastDeliveryTag = null;
    
    public function __construct(AMQPChannel $channel, int $batchSize = 10)
    {
        $this->channel = $channel;
        $this->batchSize = $batchSize;
        
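        // Keep prefetch at least as large as the batch: if it were smaller, the broker
        // would stop delivering before a batch fills and the batch ack would never be sent.
        $this->channel->basic_qos(null, $batchSize * 2, false);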
    }
    
    public function consume(string $queueName, callable $processor): void
    {
        $this->channel->basic_consume(
            $queueName,
            '',
            false,
            false,
            false,
            false,
            function (AMQPMessage $msg) use ($processor) {
                $processor($msg);
                $this->handleAck($msg);
            }
        );
        
        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }
    }

    private function handleAck(AMQPMessage $msg): void
    {
        $this->currentBatch++;
        $this->lastDeliveryTag = $msg->getDeliveryTag();
        
        if ($this->currentBatch >= $this->batchSize) {
            $this->ackBatch();
        }
    }

    private function ackBatch(): void
    {
        if ($this->lastDeliveryTag !== null) {
            $this->channel->basic_ack($this->lastDeliveryTag, true);
            $this->currentBatch = 0;
            $this->lastDeliveryTag = null;
        }
    }

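    // Settles a partial batch, e.g. on shutdown or when the queue goes idle;
    // until then the remaining messages stay unacknowledged on the broker.
    public function forceAck(): void
    {
        $this->ackBatch();
    }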

    public function getPendingCount(): int
    {
        return $this->currentBatch;
    }
}
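
The batch acknowledgement above relies on the `multiple` flag of `basic_ack`: one ack settles every outstanding delivery up to and including the given tag. The sketch below models only that broker-side bookkeeping; `ackUpTo` is a hypothetical function, not a php-amqplib call.

```php
<?php

declare(strict_types=1);

/**
 * Toy model of what an ack removes from the set of outstanding deliveries.
 * Hypothetical function for illustration; not part of php-amqplib.
 *
 * @param list<int> $unackedTags delivery tags currently outstanding on the channel
 * @return list<int> tags that remain outstanding afterwards
 */
function ackUpTo(array $unackedTags, int $deliveryTag, bool $multiple): array
{
    return array_values(array_filter(
        $unackedTags,
        static fn (int $tag): bool => $multiple
            ? $tag > $deliveryTag    // multiple = true settles every tag <= $deliveryTag
            : $tag !== $deliveryTag  // multiple = false settles exactly one tag
    ));
}

$outstanding = [1, 2, 3, 4, 5];

$afterSingle = ackUpTo($outstanding, 3, false); // only tag 3 is settled
$afterBatch = ackUpTo($outstanding, 3, true);   // tags 1, 2 and 3 are settled together
```

The trade-off is that messages already processed but not yet covered by a batch ack are redelivered if the consumer's channel closes first, so handlers used with batch acknowledgement should tolerate duplicates.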

Practical Scenarios

Scenario 1: High-Throughput Consumption

php
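<?php

use App\RabbitMQ\Consumer\BatchAckConsumer;
use PhpAmqpLib\Channel\AMQPChannel;

class HighThroughputConsumer
{
    private AMQPChannel $channel;
    
    public function __construct(AMQPChannel $channel)
    {
        $this->channel = $channel;
    }
    
    public function start(string $queueName): void
    {
        // BatchAckConsumer sets the prefetch count itself (batch size × 2 = 40 here),
        // so a separate basic_qos() call in this method would simply be overwritten.
        // Acknowledge in batches of 20
        $batchConsumer = new BatchAckConsumer($this->channel, 20);
        $batchConsumer->consume($queueName, function ($msg) {
            $this->processMessage($msg);
        });
    }

    private function processMessage($msg): void
    {
        // Processing logic
    }
}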

Scenario 2: Low-Latency Consumption

php
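<?php

use PhpAmqpLib\Channel\AMQPChannel;

class LowLatencyConsumer
{
    private AMQPChannel $channel;
    
    public function __construct(AMQPChannel $channel)
    {
        $this->channel = $channel;
    }
    
    public function start(string $queueName): void
    {
        // A prefetch count of 1 keeps messages from waiting behind a busy consumer
        $this->channel->basic_qos(null, 1, false);
        
        $this->channel->basic_consume(
            $queueName,
            '',
            false,
            false,
            false,
            false,
            function ($msg) {
                $this->processMessage($msg);
                $msg->ack();
            }
        );
        
        // Block and dispatch deliveries until the consumer is cancelled
        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }
    }

    private function processMessage($msg): void
    {
        // Processing logic
    }
}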

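Common Problems and Solutions

Problem 1: Message Backlog

Solution

php
// Raise the prefetch count
$channel->basic_qos(null, 20, false);

// Add more consumers

Problem 2: Consumer Overload

Solution

php
// Lower the prefetch count
$channel->basic_qos(null, 1, false);

// Use the adaptive limiter (min 1, max 10, initial 3)
$limiter = new AdaptiveRateLimiter($channel, 1, 10, 3);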

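Best Practices

Choosing a Prefetch Count

┌─────────────────────────────────────────────────────────────┐
│  Scenario          │  Prefetch count  │  Notes              │
├─────────────────────────────────────────────────────────────┤
│  Fast processing   │  10-50           │  High throughput    │
│  Slow processing   │  1-3             │  Avoid backlog      │
│  Uncertain         │  Adaptive        │  Adjust dynamically │
└─────────────────────────────────────────────────────────────┘

Acknowledgement Strategy

┌─────────────────────────────────────────────────────────────┐
│  Strategy                   │  Best suited for              │
├─────────────────────────────────────────────────────────────┤
│  Per-message ack            │  Reliability first            │
│  Batch ack                  │  Throughput first             │
│  Asynchronous ack           │  Balanced workloads           │
└─────────────────────────────────────────────────────────────┘

Monitoring Metrics

┌─────────────────────────────────────────────────────────────┐
│  Metric                     │  Alert threshold              │
├─────────────────────────────────────────────────────────────┤
│  Unacknowledged messages    │  > prefetch × 2               │
│  Processing latency         │  > target latency × 2         │
│  Consumer count             │  < expected value             │
└─────────────────────────────────────────────────────────────┘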
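
The three monitoring thresholds can be checked against a metrics snapshot with a few comparisons. This is a minimal sketch: `checkConsumerAlerts`, the array keys, and the alert names are all hypothetical, and how the numbers are collected (for example from the management API) is left out.

```php
<?php

declare(strict_types=1);

/**
 * Compare one metrics snapshot with the three monitoring thresholds.
 * Hypothetical helper; the array keys are illustrative, not a RabbitMQ API.
 *
 * @param array{unacked:int, prefetch:int, latency:float, target_latency:float, consumers:int, expected_consumers:int} $m
 * @return list<string> names of the alerts that fired
 */
function checkConsumerAlerts(array $m): array
{
    $alerts = [];

    if ($m['unacked'] > $m['prefetch'] * 2) {
        $alerts[] = 'unacked_messages';
    }
    if ($m['latency'] > $m['target_latency'] * 2) {
        $alerts[] = 'processing_latency';
    }
    if ($m['consumers'] < $m['expected_consumers']) {
        $alerts[] = 'consumer_count';
    }

    return $alerts;
}

$alerts = checkConsumerAlerts([
    'unacked' => 25,            // above prefetch × 2 = 20
    'prefetch' => 10,
    'latency' => 0.3,           // below target latency × 2 = 0.4 seconds
    'target_latency' => 0.2,
    'consumers' => 2,           // one consumer short
    'expected_consumers' => 3,
]);
```

In this snapshot the unacknowledged-message and consumer-count alerts fire, while latency stays within its threshold.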

Related Links