
Message Backlog

Overview

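A message backlog occurs when messages are produced faster than they are consumed, so the number of messages in a queue keeps growing. A small, short-lived backlog is normal. A severe one consumes large amounts of broker memory and disk and can destabilize the whole system.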
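
The definition above reduces to simple arithmetic. The backlog grows at the publish rate minus the consume rate, and it only drains when consumers are faster than producers. The sketch below estimates both quantities, assuming steady rates. The function names are illustrative and do not come from any library.

```php
<?php

/**
 * Net change in queue depth per second (positive = backlog growing).
 * Rates are in messages per second.
 */
function backlogGrowthRate(float $publishRate, float $consumeRate): float
{
    return $publishRate - $consumeRate;
}

/**
 * Seconds needed to drain $backlog messages, or null when consumers are
 * not faster than producers (the backlog never drains).
 */
function secondsToDrain(int $backlog, float $publishRate, float $consumeRate): ?float
{
    $netDrainRate = $consumeRate - $publishRate;
    if ($netDrainRate <= 0) {
        return null;
    }
    return $backlog / $netDrainRate;
}

// 1,200 msg/s in, 1,000 msg/s out: the queue grows by 200 msg/s (720,000 per hour)
echo backlogGrowthRate(1200, 1000), "\n";
// 500,000 queued, 800 msg/s in, 1,300 msg/s out: drains at 500 msg/s, i.e. 1,000 seconds
echo secondsToDrain(500000, 800, 1300), "\n";
```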

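Symptoms

Common symptoms

┌─────────────────────────────────────────────────────────────┐
│            Typical Symptoms of a Message Backlog            │
├─────────────────────────────────────────────────────────────┤
│  1. Queue depth keeps growing and does not come down        │
│  2. Consumer processing latency rises noticeably            │
│  3. Memory usage keeps climbing                             │
│  4. Client connections respond more slowly                  │
│  5. Flow control alarms are triggered                       │
│  6. Management UI shows red warnings on the queue           │
└─────────────────────────────────────────────────────────────┘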

Severity levels

┌──────────────────────────────────────────────────────────────────────────────┐
│  Level     │  Messages      │  Memory   │  Recommended action                │
├──────────────────────────────────────────────────────────────────────────────┤
│  Normal    │  < 10k         │  < 50%    │  No action needed                  │
│  Mild      │  10k - 100k    │  50%-70%  │  Watch closely; prepare a plan     │
│  Moderate  │  100k - 1M     │  70%-85%  │  Scale out: add consumers          │
│  Severe    │  > 1M          │  > 85%    │  Urgent: divert to temp queues     │
│  Critical  │  Flow control  │  > 95%    │  Break circuit, degrade, clean up  │
└──────────────────────────────────────────────────────────────────────────────┘
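
A script or alert router can use the grading above if it is encoded as a function. This is a sketch: the thresholds are copied from the table, and `classifyBacklog` is a hypothetical name. The rule that the worse of the two indicators decides the level is an assumption; the table does not specify it.

```php
<?php

/**
 * Map queue depth and memory usage to a level from the severity table.
 * Assumption: the more severe of the two indicators wins.
 * $memoryRatio is a fraction (0.0 - 1.0).
 */
function classifyBacklog(int $messages, float $memoryRatio, bool $flowControl = false): string
{
    $levels = ['normal', 'mild', 'moderate', 'severe', 'critical'];

    if ($flowControl || $memoryRatio > 0.95) {
        return 'critical';
    }

    // Level index suggested by queue depth
    $byCount = match (true) {
        $messages > 1_000_000 => 3,
        $messages >= 100_000  => 2,
        $messages >= 10_000   => 1,
        default               => 0,
    };

    // Level index suggested by memory usage
    $byMemory = match (true) {
        $memoryRatio > 0.85  => 3,
        $memoryRatio >= 0.70 => 2,
        $memoryRatio >= 0.50 => 1,
        default              => 0,
    };

    return $levels[max($byCount, $byMemory)];
}

echo classifyBacklog(5_000, 0.30), "\n";          // normal
echo classifyBacklog(250_000, 0.40), "\n";        // moderate
echo classifyBacklog(2_000, 0.90), "\n";          // severe
echo classifyBacklog(50_000, 0.60, true), "\n";   // critical
```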

Troubleshooting flowchart

                    ┌─────────────────┐
                    │  Backlog found  │
                    └────────┬────────┘
                             │
              ┌──────────────┼──────────────┐
              ▼              ▼              ▼
       ┌────────────┐ ┌────────────┐ ┌────────────┐
       │ Producer   │ │ Consumer   │ │ Broker     │
       │ side       │ │ side       │ │ side       │
       └─────┬──────┘ └─────┬──────┘ └─────┬──────┘
             │              │              │
             ▼              ▼              ▼
       ┌────────────┐ ┌────────────┐ ┌────────────┐
       │ Publish    │ │ Too few    │ │ Resource   │
       │ rate       │ │ consumers  │ │ limits or  │
       │ spike      │ │            │ │ misconfig  │
       └────────────┘ └────────────┘ └────────────┘

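Root Cause Analysis

1. Producer-side causes

┌───────────────────────────────────────────────────────────────────────────────────────┐
│ Cause                  │ Description                         │ How to check           │
├───────────────────────────────────────────────────────────────────────────────────────┤
│ Traffic surge          │ Business peak or promotion          │ Check business metrics │
│ Batch jobs             │ Scheduled jobs publish in bulk      │ Check job scheduler    │
│ Publish loop           │ Code bug causes repeated sends      │ Check application logs │
│ Retry storm            │ Failed messages retried en masse    │ Check error logs       │
└───────────────────────────────────────────────────────────────────────────────────────┘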
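
A common way to stop failed sends from turning into a retry storm is to space retries with capped exponential backoff plus random jitter. With jitter, clients that failed together do not all retry at the same moment. The sketch below is generic and is not part of php-amqplib. `retryWithBackoff` and `backoffDelayMs` are illustrative names, and `$send` is a stand-in for whatever publish call the producer makes.

```php
<?php

/**
 * Delay in milliseconds before retry $attempt (0-based), using "full jitter":
 * a random value between 0 and min(cap, base * 2^attempt).
 */
function backoffDelayMs(int $attempt, int $baseMs = 100, int $capMs = 10_000): int
{
    $ceiling = min($capMs, $baseMs * (2 ** $attempt));
    return random_int(0, $ceiling);
}

/**
 * Call $send until it succeeds or $maxAttempts is reached.
 * Rethrows the last exception so the caller can park the message elsewhere.
 */
function retryWithBackoff(callable $send, int $maxAttempts = 5)
{
    for ($attempt = 0; ; $attempt++) {
        try {
            return $send();
        } catch (\Throwable $e) {
            if ($attempt + 1 >= $maxAttempts) {
                throw $e;
            }
            usleep(backoffDelayMs($attempt) * 1000);
        }
    }
}

// Usage: a send that fails twice, then succeeds on the third attempt
$calls = 0;
$result = retryWithBackoff(function () use (&$calls) {
    $calls++;
    if ($calls < 3) {
        throw new \RuntimeException('broker unavailable');
    }
    return 'sent';
});
echo "{$result} after {$calls} attempts\n";
```

Capping the attempt count matters as much as the delay: unbounded retries against a struggling broker are what turn a transient failure into a storm.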

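2. Consumer-side causes

┌───────────────────────────────────────────────────────────────────────────────────────┐
│ Cause                  │ Description                         │ How to check           │
├───────────────────────────────────────────────────────────────────────────────────────┤
│ Too few consumers      │ Consumption can't keep up           │ Check consumer count   │
│ Slow consume logic     │ Slow DB queries or external calls   │ Profile the handler    │
│ Consumer failures      │ Consumers restart or crash often    │ Check error logs       │
│ Bad prefetch setting   │ prefetch set too high or too low    │ Check configuration    │
└───────────────────────────────────────────────────────────────────────────────────────┘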

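3. Broker-side causes

┌───────────────────────────────────────────────────────────────────────────────────────┐
│ Cause                  │ Description                         │ How to check           │
├───────────────────────────────────────────────────────────────────────────────────────┤
│ Insufficient resources │ CPU, memory, or disk I/O bottleneck │ System monitoring      │
│ Queue misconfigured    │ Lazy vs default queue mode          │ Check queue type       │
│ Low disk space         │ Disk alarm triggered                │ Check free disk space  │
│ Network limits         │ Insufficient network throughput     │ Network monitoring     │
└───────────────────────────────────────────────────────────────────────────────────────┘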

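Diagnostic Steps

Step 1: Quickly assess the backlog

bash
# Message counts for all queues (messages = ready + unacknowledged)
rabbitmqctl list_queues name messages messages_ready messages_unacknowledged

# Top 20 queues by message count (headers suppressed so sort sees only data rows)
rabbitmqctl list_queues --quiet --no-table-headers name messages | sort -k2,2nr | head -20

# Memory used by each queue (bytes)
rabbitmqctl list_queues name memory

# Number of consumers per queue
rabbitmqctl list_queues name consumers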
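
When these checks run from a script, the output of `rabbitmqctl list_queues` can be parsed directly. The sketch below assumes the command was run with `--quiet --no-table-headers name messages consumers`, so each line is tab-separated as `name<TAB>messages<TAB>consumers`. The function name and the sample data are hypothetical.

```php
<?php

/**
 * Parse `rabbitmqctl list_queues --quiet --no-table-headers name messages consumers`
 * output and return the $limit deepest queues, deepest first.
 */
function topBackloggedQueues(string $output, int $limit = 20): array
{
    $rows = [];
    foreach (preg_split('/\R/', trim($output)) as $line) {
        $fields = preg_split('/\t+/', trim($line));
        if (count($fields) < 3 || !is_numeric($fields[1])) {
            continue; // skip blank or malformed lines
        }
        $rows[] = [
            'name'      => $fields[0],
            'messages'  => (int) $fields[1],
            'consumers' => (int) $fields[2],
        ];
    }
    usort($rows, fn ($a, $b) => $b['messages'] <=> $a['messages']);
    return array_slice($rows, 0, $limit);
}

// Hypothetical sample output
$sample = "orders.queue\t250000\t2\nemails.queue\t120\t4\naudit.queue\t48000\t0\n";
foreach (topBackloggedQueues($sample, 2) as $q) {
    printf("%s: %d messages, %d consumers\n", $q['name'], $q['messages'], $q['consumers']);
}
```

A deep queue with zero consumers, such as `audit.queue` in the sample, usually points to a stopped or crashed consumer rather than a capacity problem.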

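Step 2: Compare inflow and outflow rates

bash
# rabbitmqctl has no rate columns; per-queue rates (message_stats) come from the management HTTP API
curl -s -u guest:guest http://localhost:15672/api/queues \
  | jq '.[] | {name, messages, publish_rate: .message_stats.publish_details.rate, deliver_rate: .message_stats.deliver_get_details.rate}'

# Watch queue depth change over time
watch -n 5 'rabbitmqctl list_queues name messages messages_ready'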
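
The `message_stats` object returned by `/api/queues` carries per-second rates: `publish_details.rate` is inflow and `deliver_get_details.rate` is outflow to consumers. The sketch below computes the net growth per queue. It assumes the JSON has already been decoded into an array, and `queueFlowRates` is a hypothetical name. Queues that have not seen traffic yet may lack `message_stats`, so missing values default to 0.

```php
<?php

/**
 * Compute inflow, outflow, and net growth (msg/s) per queue
 * from a decoded /api/queues response.
 */
function queueFlowRates(array $queues): array
{
    $result = [];
    foreach ($queues as $q) {
        $stats = $q['message_stats'] ?? [];
        $in  = (float) ($stats['publish_details']['rate'] ?? 0);
        $out = (float) ($stats['deliver_get_details']['rate'] ?? 0);
        $result[$q['name']] = [
            'messages' => (int) ($q['messages'] ?? 0),
            'in'       => $in,
            'out'      => $out,
            'net'      => $in - $out, // > 0 means the backlog is growing
        ];
    }
    return $result;
}

// Usage with a trimmed-down, hypothetical response body
$json = '[{"name":"orders.queue","messages":90000,
          "message_stats":{"publish_details":{"rate":850.0},
                           "deliver_get_details":{"rate":600.0}}},
         {"name":"idle.queue","messages":0}]';
foreach (queueFlowRates(json_decode($json, true)) as $name => $r) {
    printf("%s: in %.1f/s, out %.1f/s, net %+.1f/s\n", $name, $r['in'], $r['out'], $r['net']);
}
```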

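Step 3: Check consumer state

bash
# List consumers (queue, channel, consumer tag, ack mode, prefetch)
rabbitmqctl list_consumers

# Channel state: consumers, unacknowledged messages, and prefetch limit
rabbitmqctl list_channels name consumer_count messages_unacknowledged prefetch_count

# Connection state
rabbitmqctl list_connections user channels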
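
A channel whose unacknowledged count has reached its prefetch limit gets no new deliveries until it acks. That usually means the handler, not the broker, is the bottleneck. The sketch below flags such channels. It assumes tab-separated output from `rabbitmqctl list_channels --quiet --no-table-headers name consumer_count messages_unacknowledged prefetch_count` and one consumer per channel. `saturatedChannels` is a hypothetical helper.

```php
<?php

/**
 * Return names of consuming channels whose unacked count has hit prefetch_count.
 * A prefetch_count of 0 means "unlimited", so such channels are never flagged.
 */
function saturatedChannels(string $output): array
{
    $saturated = [];
    foreach (preg_split('/\R/', trim($output)) as $line) {
        $f = explode("\t", trim($line));
        if (count($f) < 4) {
            continue; // skip blank or malformed lines
        }
        [$name, $consumers, $unacked, $prefetch] = [$f[0], (int) $f[1], (int) $f[2], (int) $f[3]];
        if ($consumers > 0 && $prefetch > 0 && $unacked >= $prefetch) {
            $saturated[] = $name;
        }
    }
    return $saturated;
}

// Hypothetical sample: only the first channel is saturated
$sample = "10.0.0.5:40112 -> 10.0.0.1:5672 (1)\t1\t50\t50\n"
        . "10.0.0.6:40220 -> 10.0.0.1:5672 (1)\t1\t3\t50\n"
        . "10.0.0.7:40330 -> 10.0.0.1:5672 (1)\t0\t0\t0\n";
print_r(saturatedChannels($sample));
```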

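Step 4: Check system resources

bash
# Memory usage (node status)
rabbitmqctl status | grep -i -A 20 memory

# Disk space
df -h /var/lib/rabbitmq

# Disk I/O (10 samples, 1 second apart)
iostat -x 1 10

# Established AMQP connections on port 5672 (':5672 ' avoids matching 15672/25672)
netstat -an | grep ':5672 ' | grep -c ESTABLISHED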

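Solutions

1. Emergency Handling

Diverting to temporary shard queues. Move the backlog out of the hot queue into several shard queues so that extra consumers can drain them in parallel. Stop or redirect producers first; otherwise the diverter keeps draining a queue that is still being filled.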

php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exception\AMQPTimeoutException;
use PhpAmqpLib\Message\AMQPMessage;

class MessageDiversion
{
    private $connection;
    private $channel;

    public function __construct()
    {
        $this->connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $this->channel = $this->connection->channel();
    }

    public function createOverflowQueues(string $originalQueue, int $shardCount = 5): array
    {
        $shardQueues = [];
        for ($i = 1; $i <= $shardCount; $i++) {
            $shardQueueName = "{$originalQueue}_shard_{$i}";

            // passive=false, durable=true, exclusive=false, auto_delete=false
            $this->channel->queue_declare($shardQueueName, false, true, false, false);

            $shardQueues[] = $shardQueueName;
            echo "Created shard queue: {$shardQueueName}\n";
        }
        return $shardQueues;
    }

    public function divertMessages(string $sourceQueue, array $targetQueues, int $idleTimeout = 10)
    {
        $this->channel->basic_qos(null, 1000, null);

        $divertedCount = 0;
        $targetCount = count($targetQueues);

        $callback = function (AMQPMessage $message) use (&$divertedCount, $targetCount, $targetQueues) {
            // Round-robin across the shard queues
            $targetQueue = $targetQueues[$divertedCount % $targetCount];

            // Keep the original properties (headers, content_type, ...) and force persistence
            $properties = $message->get_properties();
            $properties['delivery_mode'] = AMQPMessage::DELIVERY_MODE_PERSISTENT;
            $newMessage = new AMQPMessage($message->getBody(), $properties);

            // Publish via the default exchange (routing key = queue name), then ack the original.
            // For stronger guarantees, enable publisher confirms before acking.
            $this->channel->basic_publish($newMessage, '', $targetQueue);
            $message->ack();

            $divertedCount++;
            if ($divertedCount % 1000 === 0) {
                echo "Diverted {$divertedCount} messages\n";
            }
        };

        $this->channel->basic_consume($sourceQueue, '', false, false, false, false, $callback);

        try {
            while ($this->channel->is_consuming()) {
                $this->channel->wait(null, false, $idleTimeout);
            }
        } catch (AMQPTimeoutException $e) {
            // No message for $idleTimeout seconds: treat the source queue as drained
        }

        echo "Diversion complete: {$divertedCount} messages processed\n";
    }

    public function purgeQueue(string $queueName)
    {
        // Irreversible: deletes every ready message in the queue
        $this->channel->queue_purge($queueName);
        echo "Queue {$queueName} purged\n";
    }

    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}

// Emergency diversion example
$diversion = new MessageDiversion();

$originalQueue = 'orders.queue';
$shardQueues = $diversion->createOverflowQueues($originalQueue, 5);
$diversion->divertMessages($originalQueue, $shardQueues);
$diversion->close();

2. Scaling Out Consumers

Dynamic consumer management

php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// Forks worker processes with pcntl: run under the PHP CLI with the pcntl extension enabled.
class ScalableConsumerPool
{
    private $config;
    private $consumers = [];
    private $maxConsumers = 10;
    private $minConsumers = 2;
    private $messagesPerConsumer = 1000; // used both as prefetch and as the scaling unit

    public function __construct(array $config)
    {
        $this->config = $config;
    }

    public function start()
    {
        for ($i = 0; $i < $this->minConsumers; $i++) {
            $this->spawnConsumer();
        }

        $this->monitorAndScale();
    }

    private function spawnConsumer()
    {
        $pid = pcntl_fork();

        if ($pid === -1) {
            throw new \RuntimeException('Failed to fork a consumer process');
        } elseif ($pid === 0) {
            // Child: opens its own connection (AMQP connections must not be shared across forks)
            $this->runConsumer();
            exit(0);
        } else {
            // Parent: remember the child so it can be reaped later
            $this->consumers[$pid] = [
                'started_at' => time(),
                'messages_processed' => 0,
            ];
            echo "Started consumer process: {$pid}\n";
        }
    }

    private function runConsumer()
    {
        $connection = new AMQPStreamConnection(
            $this->config['host'],
            $this->config['port'],
            $this->config['user'],
            $this->config['password']
        );

        $channel = $connection->channel();
        $channel->basic_qos(null, $this->messagesPerConsumer, null);

        $callback = function (AMQPMessage $message) {
            $this->processMessage($message);
            $message->ack();
        };

        $channel->basic_consume(
            $this->config['queue'],
            '',
            false,  // no_local
            false,  // no_ack: manual acks
            false,  // exclusive
            false,  // nowait
            $callback
        );

        while ($channel->is_consuming()) {
            $channel->wait();
        }

        $channel->close();
        $connection->close();
    }

    private function processMessage(AMQPMessage $message)
    {
        $body = json_decode($message->getBody(), true);

        // Business logic goes here
        usleep(10000); // simulate 10 ms of processing

        echo sprintf(
            "[%d] Processed message: %s\n",
            getmypid(),
            $body['id'] ?? 'unknown'
        );
    }

    private function monitorAndScale()
    {
        while (true) {
            $this->reapDeadConsumers();

            // Replace crashed consumers so the pool never drops below the minimum
            while (count($this->consumers) < $this->minConsumers) {
                $this->spawnConsumer();
            }

            try {
                $queueStats = $this->getQueueStats();
            } catch (\Throwable $e) {
                echo "Failed to read queue stats: {$e->getMessage()}\n";
                sleep(10);
                continue;
            }
            $currentConsumers = count($this->consumers);

            // Scale out when the ready backlog exceeds twice what current consumers can prefetch
            if ($queueStats['messages'] > $currentConsumers * $this->messagesPerConsumer * 2) {
                if ($currentConsumers < $this->maxConsumers) {
                    echo "Backlog detected, adding a consumer...\n";
                    $this->spawnConsumer();
                }
            } elseif ($queueStats['messages'] < $currentConsumers * $this->messagesPerConsumer * 0.5) {
                if ($currentConsumers > $this->minConsumers) {
                    echo "Message volume has dropped; consider retiring consumers\n";
                }
            }

            sleep(10);
        }
    }

    private function getQueueStats(): array
    {
        $connection = new AMQPStreamConnection(
            $this->config['host'],
            $this->config['port'],
            $this->config['user'],
            $this->config['password']
        );

        $channel = $connection->channel();

        // Passive declare: returns [name, ready message count, consumer count] without creating the queue
        [$queue, $messages, $consumers] = $channel->queue_declare(
            $this->config['queue'],
            true
        );

        $channel->close();
        $connection->close();

        return [
            'messages' => $messages,
            'consumers' => $consumers,
        ];
    }

    private function reapDeadConsumers()
    {
        foreach ($this->consumers as $pid => $info) {
            // Non-blocking check whether the child has exited
            $res = pcntl_waitpid($pid, $status, WNOHANG);

            if ($res === $pid) {
                unset($this->consumers[$pid]);
                echo "Consumer process {$pid} exited\n";
            }
        }
    }
}

// Usage example
$pool = new ScalableConsumerPool([
    'host' => 'localhost',
    'port' => 5672,
    'user' => 'guest',
    'password' => 'guest',
    'queue' => 'orders.queue',
]);

$pool->start();
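
The scale-up and scale-down thresholds inside `monitorAndScale()` can be pulled out into a pure function, which makes them testable without RabbitMQ or `pcntl`. This sketch mirrors the rules in the class above. It scales up when depth exceeds twice the prefetch capacity of the current consumers and suggests scaling down below half of it. The `scaleDecision` name and its +1/0/-1 return convention are illustrative.

```php
<?php

/**
 * Decide how the consumer pool should change.
 * Returns +1 (spawn one consumer), -1 (one can be retired), or 0 (no change).
 */
function scaleDecision(
    int $queueMessages,
    int $currentConsumers,
    int $messagesPerConsumer = 1000,
    int $minConsumers = 2,
    int $maxConsumers = 10
): int {
    $capacity = $currentConsumers * $messagesPerConsumer;

    if ($queueMessages > $capacity * 2 && $currentConsumers < $maxConsumers) {
        return 1;
    }
    if ($queueMessages < $capacity * 0.5 && $currentConsumers > $minConsumers) {
        return -1;
    }
    return 0;
}

echo scaleDecision(5000, 2), "\n";   // 1: 5000 > 2 * 2000
echo scaleDecision(50000, 10), "\n"; // 0: backlog is high but the pool is at its maximum
echo scaleDecision(500, 4), "\n";    // -1: 500 < 0.5 * 4000 and 4 > minimum
```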

3. Consumer Performance Tuning

Batch consumption. Grouping messages into batches amortizes per-message costs such as database round trips and individual acks.

php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exception\AMQPTimeoutException;
use PhpAmqpLib\Message\AMQPMessage;

class BatchConsumer
{
    private $connection;
    private $channel;
    private $batchSize = 100;
    private $batchTimeout = 5;      // seconds before a partial batch is flushed
    private $batch = [];
    private $batchStartedAt = 0;

    public function __construct()
    {
        $this->connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $this->channel = $this->connection->channel();
        // prefetch must be >= batchSize, otherwise a full batch can never accumulate
        $this->channel->basic_qos(null, $this->batchSize, null);
    }

    public function consume(string $queue)
    {
        $callback = function (AMQPMessage $message) {
            if (empty($this->batch)) {
                $this->batchStartedAt = time();
            }
            $this->batch[] = $message;

            if (count($this->batch) >= $this->batchSize) {
                $this->processBatch();
            }
        };

        $this->channel->basic_consume($queue, '', false, false, false, false, $callback);

        while ($this->channel->is_consuming()) {
            try {
                // Wake up at least once per second so partial batches are flushed on time
                $this->channel->wait(null, false, 1);
            } catch (AMQPTimeoutException $e) {
                // No message within 1 second; fall through to the timeout check
            }

            if (!empty($this->batch) && (time() - $this->batchStartedAt) >= $this->batchTimeout) {
                $this->processBatch();
            }
        }
    }

    private function processBatch()
    {
        if (empty($this->batch)) {
            return;
        }

        $messages = array_map(function (AMQPMessage $msg) {
            return json_decode($msg->getBody(), true);
        }, $this->batch);

        echo sprintf(
            "[%s] Processing a batch of %d messages\n",
            date('Y-m-d H:i:s'),
            count($messages)
        );

        $last = end($this->batch);
        try {
            $this->batchInsertToDatabase($messages);
            // One ack with multiple=true covers every delivery up to the last one
            $last->ack(true);
        } catch (\Throwable $e) {
            // Requeue the whole batch; consider a dead-letter exchange for batches that keep failing
            $last->nack(true, true);
        }

        $this->batch = [];
    }

    private function batchInsertToDatabase(array $messages)
    {
        // Bulk database insert
        // INSERT INTO ... VALUES (...), (...), (...)
    }

    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}

// Usage example
$consumer = new BatchConsumer();
$consumer->consume('orders.queue');
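
`batchInsertToDatabase()` is left as a stub above. One way to fill it is a single multi-row prepared `INSERT`. The sketch below builds the SQL and the flat parameter list for PDO. It assumes a hypothetical table `orders(id, payload)` and message bodies that carry an `id` field, so adjust the columns to the real schema. Keep batch size × column count under the database's placeholder limit.

```php
<?php

/**
 * Build one multi-row INSERT with positional placeholders, e.g.
 * INSERT INTO orders (id, payload) VALUES (?, ?), (?, ?)
 * $table and $columns must be trusted identifiers (they are not escaped).
 */
function buildBatchInsert(string $table, array $columns, array $rows): array
{
    if ($rows === [] || $columns === []) {
        throw new InvalidArgumentException('rows and columns must not be empty');
    }

    $rowPlaceholder = '(' . implode(', ', array_fill(0, count($columns), '?')) . ')';
    $sql = sprintf(
        'INSERT INTO %s (%s) VALUES %s',
        $table,
        implode(', ', $columns),
        implode(', ', array_fill(0, count($rows), $rowPlaceholder))
    );

    // Flatten row values in column order to match the placeholders
    $params = [];
    foreach ($rows as $row) {
        foreach ($columns as $column) {
            $params[] = $row[$column] ?? null;
        }
    }
    return [$sql, $params];
}

// Hypothetical mapping from decoded messages to orders(id, payload) rows
$messages = [['id' => 1, 'total' => 10], ['id' => 2, 'total' => 25]];
$rows = array_map(fn ($m) => ['id' => $m['id'], 'payload' => json_encode($m)], $messages);

[$sql, $params] = buildBatchInsert('orders', ['id', 'payload'], $rows);
echo $sql, "\n";
// With a PDO connection: $pdo->prepare($sql)->execute($params);
```

If the insert throws, `processBatch()` above requeues the whole batch, so the insert should be idempotent (for example, keyed on the message `id`) to tolerate redelivery.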

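4. Lazy Queue Configuration

A lazy queue moves messages to disk as early as possible and keeps few of them in RAM. This bounds memory during a large backlog at the cost of extra disk I/O. RabbitMQ 3.12 and later ignore the lazy mode setting, because classic queues already behave this way by default.

php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;

class LazyQueueSetup
{
    private $connection;
    private $channel;

    public function __construct()
    {
        $this->connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $this->channel = $this->connection->channel();
    }

    public function createLazyQueue(string $queueName)
    {
        // Lazy mode is set with x-queue-mode (x-queue-type selects classic/quorum/stream)
        $args = new AMQPTable([
            'x-queue-mode' => 'lazy',
        ]);

        $this->channel->queue_declare(
            $queueName,
            false,  // passive
            true,   // durable
            false,  // exclusive
            false,  // auto_delete
            false,  // nowait
            $args
        );

        echo "Created lazy queue: {$queueName}\n";
    }

    public function convertToLazyQueue(string $queueName): bool
    {
        // Redeclaring an existing queue with different arguments fails (PRECONDITION_FAILED),
        // so switch the mode of an existing queue with a policy instead.
        $command = sprintf(
            'rabbitmqctl set_policy --priority 1 --apply-to queues %s %s %s',
            escapeshellarg("lazy-{$queueName}"),
            escapeshellarg('^' . preg_quote($queueName) . '$'),
            escapeshellarg(json_encode(['queue-mode' => 'lazy']))
        );
        // For orders.queue this runs:
        // rabbitmqctl set_policy --priority 1 --apply-to queues 'lazy-orders.queue' '^orders\.queue$' '{"queue-mode":"lazy"}'

        exec($command, $output, $exitCode);
        return $exitCode === 0;
    }

    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}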

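Prevention

1. Monitoring and Alerting

yaml
# Prometheus alerting rules
# Assumes per-queue metrics labelled with `queue`
# (e.g. the rabbitmq_prometheus plugin with per-object metrics enabled)
groups:
  - name: rabbitmq_backlog
    rules:
      - alert: QueueBacklogWarning
        expr: rabbitmq_queue_messages_ready > 10000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Queue backlog warning"
          description: "Queue {{ $labels.queue }} has more than 10000 ready messages"

      - alert: QueueBacklogCritical
        expr: rabbitmq_queue_messages_ready > 100000
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Severe queue backlog"
          description: "Queue {{ $labels.queue }} has more than 100000 ready messages; act immediately"

      - alert: ConsumerLag
        # messages_ready is a gauge, and rate() is meant for counters, so use deriv().
        # A positive slope sustained for 10 minutes means inflow exceeds outflow.
        expr: deriv(rabbitmq_queue_messages_ready[10m]) > 0
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Consumption is slower than production"
          description: "Queue {{ $labels.queue }} has kept growing for 10 minutes"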

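2. Automated Scaling Script

bash
#!/bin/bash
# auto_scale_consumers.sh: start extra consumer processes when the backlog grows.
# Run it (e.g. from cron) at an interval longer than consumer startup time;
# consumers that have not subscribed yet are not counted and would be started twice.

QUEUE_NAME="orders.queue"
MAX_MESSAGES_PER_CONSUMER=5000
MAX_CONSUMERS=20
CONSUMER_SCRIPT="/path/to/consumer.php"

# Exact name match (a grep prefix match would also hit orders.queue_shard_1, etc.)
get_queue_messages() {
    rabbitmqctl list_queues --quiet --no-table-headers name messages \
        | awk -F'\t' -v q="$QUEUE_NAME" '$1 == q {print $2}'
}

get_active_consumers() {
    rabbitmqctl list_queues --quiet --no-table-headers name consumers \
        | awk -F'\t' -v q="$QUEUE_NAME" '$1 == q {print $2}'
}

messages=$(get_queue_messages)
consumers=$(get_active_consumers)
messages=${messages:-0}
consumers=${consumers:-0}

required_consumers=$((messages / MAX_MESSAGES_PER_CONSUMER + 1))

if [ "$required_consumers" -gt "$MAX_CONSUMERS" ]; then
    required_consumers=$MAX_CONSUMERS
fi

if [ "$required_consumers" -gt "$consumers" ]; then
    to_start=$((required_consumers - consumers))
    echo "Starting ${to_start} consumer(s)"

    for ((i = 1; i <= to_start; i++)); do
        nohup php "$CONSUMER_SCRIPT" > /dev/null 2>&1 &
    done
fi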

3. Capacity Planning

┌─────────────────────────────────────────────────────────────┐
│                  Consumer Capacity Formula                  │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Consumers needed = production rate / per-consumer rate     │
│                                                             │
│  Example:                                                   │
│  - Production rate: 10,000 messages/s                       │
│  - Per-consumer processing rate: 500 messages/s             │
│  - Consumers needed: 10,000 / 500 = 20                      │
│  - With 1.5x headroom: 20 * 1.5 = 30                        │
│                                                             │
└─────────────────────────────────────────────────────────────┘
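
The formula and the 1.5x headroom from the example can be written as a function. The optional drain term goes beyond the original formula. To also clear an existing backlog of B messages within T seconds, consumers must absorb an extra B / T messages per second on top of the production rate. The function name and parameters are illustrative.

```php
<?php

/**
 * Consumers needed = (production rate + backlog / drainSeconds) / per-consumer rate,
 * multiplied by a headroom factor and rounded up.
 */
function requiredConsumers(
    float $publishRate,
    float $perConsumerRate,
    float $redundancy = 1.5,
    int $backlog = 0,
    int $drainSeconds = 0
): int {
    if ($perConsumerRate <= 0) {
        throw new InvalidArgumentException('per-consumer rate must be positive');
    }
    $targetRate = $publishRate;
    if ($backlog > 0 && $drainSeconds > 0) {
        $targetRate += $backlog / $drainSeconds; // extra throughput to clear the backlog
    }
    return (int) ceil($targetRate / $perConsumerRate * $redundancy);
}

echo requiredConsumers(10000, 500, 1.0), "\n"; // 20, the base calculation
echo requiredConsumers(10000, 500), "\n";      // 30, with 1.5x headroom
// Also drain 1.8M queued messages in 10 minutes: (10000 + 3000) / 500 * 1.5 = 39
echo requiredConsumers(10000, 500, 1.5, 1_800_000, 600), "\n";
```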

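Key Considerations

  1. Do not purge queues blindly: assess the business impact before purging
  2. Cap scale-out: an upper bound keeps scaling from exhausting resources
  3. Lazy queues have a cost: they increase disk I/O
  4. Monitor end to end: cover production, consumption, and backlog
  5. Prepare runbooks in advance: rehearse backlog handling regularly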

Related Links