RabbitMQ Queues

Overview

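A queue is the core RabbitMQ component that stores messages. It acts as a buffer and, by default, delivers messages in first-in, first-out (FIFO) order. Producers publish messages to an exchange, the exchange routes them to queues according to its routing rules, and consumers receive messages from the queues for processing.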
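
The flow described above can be sketched with a small in-memory model. This is only an analogy, not how the broker is implemented: `InMemoryBroker` and its methods are hypothetical names, and routing is reduced to an exact match on the routing key (roughly what a direct exchange does).

```php
<?php

// In-memory model of publish -> exchange -> queue -> consume.
// Hypothetical class for illustration; it is not part of php-amqplib.
final class InMemoryBroker
{
    /** @var array<string, string[]> routing key => names of bound queues */
    private array $bindings = [];

    /** @var array<string, SplQueue> queue name => FIFO buffer */
    private array $queues = [];

    public function bind(string $queue, string $routingKey): void
    {
        $this->queues[$queue] ??= new SplQueue();
        $this->bindings[$routingKey][] = $queue;
    }

    public function publish(string $routingKey, string $body): void
    {
        // The "exchange" copies the message into every queue bound to the key.
        foreach ($this->bindings[$routingKey] ?? [] as $queue) {
            $this->queues[$queue]->enqueue($body);
        }
    }

    public function consume(string $queue): ?string
    {
        $buffer = $this->queues[$queue] ?? null;
        if ($buffer === null || $buffer->isEmpty()) {
            return null;
        }

        // Oldest message first (FIFO).
        return $buffer->dequeue();
    }
}

$broker = new InMemoryBroker();
$broker->bind('orders_queue', 'order.new');
$broker->publish('order.new', 'order #1');
$broker->publish('order.new', 'order #2');

echo $broker->consume('orders_queue'), "\n"; // order #1
echo $broker->consume('orders_queue'), "\n"; // order #2
```

Messages published with a routing key that has no binding are simply discarded in this model.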

Core Roles of a Queue

mermaid
graph LR
    subgraph ProducerSide [Producer side]
        P[Producer]
    end
    
    subgraph RabbitMQ
        E[Exchange] -->|route| Q[Queue]
        Q -->|store| M1[Message 1]
        Q -->|store| M2[Message 2]
        Q -->|store| M3[Message 3]
    end
    
    subgraph ConsumerSide [Consumer side]
        C[Consumer]
    end
    
    P -->|publish| E
    Q -->|deliver| C

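A queue serves four main purposes:

  • Message storage: holds messages temporarily until they are processed
  • Traffic buffering: absorbs bursts and smooths out load
  • Decoupling: sits between producers and consumers so that neither depends on the other
  • Reliable delivery: durability and acknowledgements help prevent message loss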

Core Concepts

1. Queue Types

mermaid
graph TB
    A[Queue Types] --> B[Classic Queue]
    A --> C[Quorum Queue]
    A --> D[Stream]
    
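    B --> B1[Traditional queue]
    B --> B2[Single node or mirrored]
    B --> B3[Fits most scenarios]
    
    C --> C1[Replicated queue]
    C --> C2[Based on Raft consensus]
    C --> C3[High availability guarantees]
    
    D --> D1[Append-only log]
    D --> D2[Supports repeated consumption]
    D --> D3[Large-scale logging scenarios]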
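
| Type | Characteristics | Use cases |
| --- | --- | --- |
| Classic Queue | Traditional queue; mirroring was supported in older releases (removed in RabbitMQ 4.0) | General-purpose |
| Quorum Queue | Raft-based replication, strong data safety | High reliability requirements |
| Stream | Persistent log that can be consumed repeatedly | Logs, event streams |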
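
The queue type is chosen at declaration time through the `x-queue-type` argument. A minimal sketch of building that argument follows; the function name `queueTypeArguments` is an assumption made for illustration.

```php
<?php

/**
 * Build the declaration argument that selects a queue type.
 * `x-queue-type` is the broker argument; the function itself is hypothetical.
 */
function queueTypeArguments(string $type): array
{
    $allowed = ['classic', 'quorum', 'stream'];

    if (!in_array($type, $allowed, true)) {
        throw new InvalidArgumentException("Unknown queue type: {$type}");
    }

    return ['x-queue-type' => $type];
}

print_r(queueTypeArguments('quorum'));
```

With php-amqplib, the returned array would typically be wrapped as `new AMQPTable(queueTypeArguments('quorum'))` and passed as the arguments parameter of `queue_declare`. Quorum queues must also be declared durable and non-exclusive.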

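2. Queue Properties

Basic properties

| Property | Parameter | Description |
| --- | --- | --- |
| Durable | durable | Whether the queue survives a broker restart |
| Exclusive | exclusive | Whether the queue can be used by only one connection (it is deleted when that connection closes) |
| Auto-delete | auto_delete | Whether the queue is deleted automatically after its last consumer disconnects |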
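
In php-amqplib these properties are passed to `queue_declare` as positional booleans, which are easy to mix up. The sketch below names each position; `queueDeclareParams` is a hypothetical helper. Note that the library's own defaults are `durable = false` and `auto_delete = true`, so it is safer to pass these values explicitly.

```php
<?php

/**
 * Return the first six queue_declare() parameters in positional order,
 * keyed by name. Hypothetical helper for illustration.
 */
function queueDeclareParams(
    string $name,
    bool $durable = true,
    bool $exclusive = false,
    bool $autoDelete = false
): array {
    return [
        'queue' => $name,
        'passive' => false,   // false: create the queue if it does not exist
        'durable' => $durable,
        'exclusive' => $exclusive,
        'auto_delete' => $autoDelete,
        'nowait' => false,    // false: wait for the broker's reply
    ];
}

$params = queueDeclareParams('orders_queue');

// With a live channel: $channel->queue_declare(...array_values($params));
echo json_encode($params), "\n";
```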

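Extended properties (x-arguments)

| Property | Parameter | Description |
| --- | --- | --- |
| Message TTL | x-message-ttl | How long a message may stay in the queue (milliseconds) |
| Queue TTL | x-expires | How long the queue may stay unused before it is deleted (milliseconds) |
| Max length | x-max-length | Maximum number of messages in the queue |
| Max bytes | x-max-length-bytes | Maximum total size of the queue in bytes |
| Overflow behavior | x-overflow | What happens when a limit is exceeded (for example drop-head or reject-publish) |
| Dead letter exchange | x-dead-letter-exchange | Target exchange for messages that are rejected or expire |
| Dead letter routing key | x-dead-letter-routing-key | Routing key used for dead-lettered messages |
| Max priority | x-max-priority | Highest priority the queue supports (1-255; small values are recommended) |
| Queue mode | x-queue-mode | lazy mode stores messages on disk |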
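
A mistyped or out-of-range x-argument is only reported by the broker at declaration time, so validating values first can help. The following is a minimal sketch, assuming hypothetical option keys (`message_ttl`, `max_length`, `overflow`, `max_priority`) that map onto the broker arguments in the table.

```php
<?php

/**
 * Translate friendly option names into broker x-arguments.
 * The option keys are hypothetical; the x-* names are the broker's.
 */
function buildQueueArguments(array $options): array
{
    $args = [];

    $nonNegative = ['message_ttl' => 'x-message-ttl', 'max_length' => 'x-max-length'];
    foreach ($nonNegative as $key => $argument) {
        if (!isset($options[$key])) {
            continue;
        }
        if (!is_int($options[$key]) || $options[$key] < 0) {
            throw new InvalidArgumentException("{$key} must be a non-negative integer");
        }
        $args[$argument] = $options[$key];
    }

    if (isset($options['overflow'])) {
        $allowed = ['drop-head', 'reject-publish', 'reject-publish-dlx'];
        if (!in_array($options['overflow'], $allowed, true)) {
            throw new InvalidArgumentException("Unsupported overflow mode");
        }
        $args['x-overflow'] = $options['overflow'];
    }

    if (isset($options['max_priority'])) {
        $priority = $options['max_priority'];
        if (!is_int($priority) || $priority < 1 || $priority > 255) {
            throw new InvalidArgumentException("max_priority must be between 1 and 255");
        }
        $args['x-max-priority'] = $priority;
    }

    return $args;
}

echo json_encode(buildQueueArguments([
    'message_ttl' => 60000,
    'max_length' => 10000,
    'overflow' => 'reject-publish',
])), "\n";
```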

3. Message Lifecycle

mermaid
stateDiagram-v2
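    [*] --> Ready: message enqueued
    Ready --> Unacked: delivered to consumer
    Unacked --> Ready: NACK(requeue=true)
    Unacked --> Acked: ACK
    Unacked --> Ready: consumer disconnected
    Acked --> [*]: removed from queue
    Ready --> DLQ: expired
    Unacked --> DLQ: rejected (requeue=false)
    Ready --> Dropped: queue full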
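
The lifecycle can also be written as a transition table. This is a sketch with hypothetical state and event names; rejection is modeled from the unacked state, since a consumer can only reject a message it has already received.

```php
<?php

// Allowed transitions: state => [event => next state].
// State and event names are illustrative, not broker terminology.
const MESSAGE_TRANSITIONS = [
    'ready' => [
        'deliver' => 'unacked',
        'expire' => 'dead-lettered',
        'overflow' => 'dropped',
    ],
    'unacked' => [
        'ack' => 'acked',
        'nack_requeue' => 'ready',
        'consumer_lost' => 'ready',
        'reject' => 'dead-lettered',
    ],
];

function nextState(string $state, string $event): string
{
    if (!isset(MESSAGE_TRANSITIONS[$state][$event])) {
        throw new LogicException("Event '{$event}' is not valid in state '{$state}'");
    }

    return MESSAGE_TRANSITIONS[$state][$event];
}

// A message that is delivered, requeued once, then acknowledged.
$state = 'ready';
foreach (['deliver', 'nack_requeue', 'deliver', 'ack'] as $event) {
    $state = nextState($state, $event);
    echo "{$event} -> {$state}\n";
}
```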

4. Queue Modes

Lazy Queues

mermaid
graph TB
    subgraph DefaultMode [Default mode]
        A1[Message] --> B1[Memory]
        B1 --> C1[Disk]
    end
    
    subgraph LazyMode [Lazy mode]
        A2[Message] --> B2[Disk]
        B2 --> C2[Memory]
        C2 --> D2[Consumer]
    end

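Characteristics of lazy queues:

  • Messages are written to disk as early as possible
  • Memory usage is reduced
  • Suited to large backlogs and slow consumers
  • Trades throughput for stability

Since RabbitMQ 3.12, classic queues always behave this way and the x-queue-mode argument is ignored.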

5. Queue Length Limits

mermaid
graph LR
    A[New message] --> B{Queue full?}
    B -->|No| C[Enqueue]
    B -->|Yes| D{Overflow policy}
    D -->|drop-head| E[Drop oldest message]
    D -->|reject-publish| F[Reject new message]
    E --> G[Enqueue new message]
    F --> H[Return error]
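
The two overflow policies in the flowchart can be compared with a small in-memory simulation. `BoundedQueue` is a hypothetical class used only to illustrate the decision logic; it does not talk to a broker.

```php
<?php

// In-memory illustration of x-max-length with drop-head / reject-publish.
final class BoundedQueue
{
    private array $messages = [];
    private int $maxLength;
    private string $overflow;

    public function __construct(int $maxLength, string $overflow = 'drop-head')
    {
        if ($maxLength < 1) {
            throw new InvalidArgumentException('maxLength must be at least 1');
        }
        $this->maxLength = $maxLength;
        $this->overflow = $overflow;
    }

    /** Returns false when the new message is refused. */
    public function publish(string $body): bool
    {
        if (count($this->messages) >= $this->maxLength) {
            if ($this->overflow === 'reject-publish') {
                return false;
            }
            // drop-head: discard the oldest message to make room.
            array_shift($this->messages);
        }

        $this->messages[] = $body;

        return true;
    }

    public function contents(): array
    {
        return $this->messages;
    }
}

$dropHead = new BoundedQueue(2, 'drop-head');
$reject = new BoundedQueue(2, 'reject-publish');

foreach (['a', 'b', 'c'] as $body) {
    $dropHead->publish($body);
    $reject->publish($body);
}

echo implode(',', $dropHead->contents()), "\n"; // b,c
echo implode(',', $reject->contents()), "\n";   // a,b
```

With drop-head the publisher is never told that a message was discarded; with reject-publish the publisher learns about the refusal only if it uses publisher confirms.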

Code Examples

Basic Queue Operations

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class QueueManager
{
    private $connection;
    private $channel;
    
    public function __construct(array $config = [])
    {
        $this->connection = new AMQPStreamConnection(
            $config['host'] ?? 'localhost',
            $config['port'] ?? 5672,
            $config['user'] ?? 'guest',
            $config['password'] ?? 'guest',
            $config['vhost'] ?? '/'
        );
        
        $this->channel = $this->connection->channel();
    }
    
    public function declareQueue(
        string $name,
        bool $durable = true,
        bool $exclusive = false,
        bool $autoDelete = false
    ): void {
        $this->channel->queue_declare(
            $name,
            false,
            $durable,
            $exclusive,
            $autoDelete
        );
        
        echo "Queue '{$name}' declared\n";
    }
    
    public function deleteQueue(string $name): void
    {
        $this->channel->queue_delete($name);
        echo "Queue '{$name}' deleted\n";
    }
    
    public function purgeQueue(string $name): void
    {
        $this->channel->queue_purge($name);
        echo "Queue '{$name}' purged\n";
    }
    
    public function getQueueInfo(string $name): array
    {
        try {
            [$queue, $messageCount, $consumerCount] = $this->channel->queue_declare(
                $name,
                true
            );
            
            return [
                'name' => $queue,
                'message_count' => $messageCount,
                'consumer_count' => $consumerCount
            ];
        } catch (Exception $e) {
            return [
                'error' => $e->getMessage()
            ];
        }
    }
    
    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
    }
}

$manager = new QueueManager();

$manager->declareQueue('orders_queue', true);

$info = $manager->getQueueInfo('orders_queue');
print_r($info);

$manager->close();

Queues with TTL

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;

class TTLQueueManager
{
    private $connection;
    private $channel;
    
    public function __construct(array $config = [])
    {
        $this->connection = new AMQPStreamConnection(
            $config['host'] ?? 'localhost',
            $config['port'] ?? 5672,
            $config['user'] ?? 'guest',
            $config['password'] ?? 'guest',
            $config['vhost'] ?? '/'
        );
        
        $this->channel = $this->connection->channel();
    }
    
    public function declareQueueWithMessageTTL(string $name, int $ttlMs): void
    {
        $this->channel->queue_declare(
            $name,
            false,
            true,
            false,
            false,
            false,
            new AMQPTable([
                'x-message-ttl' => $ttlMs
            ])
        );
        
        echo "Queue '{$name}' declared with message TTL: {$ttlMs}ms\n";
    }
    
    public function declareQueueWithExpiry(string $name, int $expiryMs): void
    {
        $this->channel->queue_declare(
            $name,
            false,
            true,
            false,
            false,
            false,
            new AMQPTable([
                'x-expires' => $expiryMs
            ])
        );
        
        echo "Queue '{$name}' declared with expiry: {$expiryMs}ms\n";
    }
    
    public function declareQueueWithBothTTL(string $name, int $messageTtl, int $queueExpiry): void
    {
        $this->channel->queue_declare(
            $name,
            false,
            true,
            false,
            false,
            false,
            new AMQPTable([
                'x-message-ttl' => $messageTtl,
                'x-expires' => $queueExpiry
            ])
        );
        
        echo "Queue '{$name}' declared with message TTL: {$messageTtl}ms, queue expiry: {$queueExpiry}ms\n";
    }
    
    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
    }
}

$manager = new TTLQueueManager();

$manager->declareQueueWithMessageTTL('temp_queue', 60000);

$manager->declareQueueWithExpiry('auto_delete_queue', 3600000);

$manager->declareQueueWithBothTTL('mixed_queue', 30000, 86400000);

$manager->close();

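Queues with Length Limits

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;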

class LimitedQueueManager
{
    private $connection;
    private $channel;
    
    public function __construct(array $config = [])
    {
        $this->connection = new AMQPStreamConnection(
            $config['host'] ?? 'localhost',
            $config['port'] ?? 5672,
            $config['user'] ?? 'guest',
            $config['password'] ?? 'guest',
            $config['vhost'] ?? '/'
        );
        
        $this->channel = $this->connection->channel();
    }
    
    public function declareQueueWithMaxLength(
        string $name,
        int $maxLength,
        string $overflow = 'drop-head'
    ): void {
        $this->channel->queue_declare(
            $name,
            false,
            true,
            false,
            false,
            false,
            new AMQPTable([
                'x-max-length' => $maxLength,
                'x-overflow' => $overflow
            ])
        );
        
        echo "Queue '{$name}' declared with max length: {$maxLength}, overflow: {$overflow}\n";
    }
    
    public function declareQueueWithMaxBytes(
        string $name,
        int $maxBytes,
        string $overflow = 'drop-head'
    ): void {
        $this->channel->queue_declare(
            $name,
            false,
            true,
            false,
            false,
            false,
            new AMQPTable([
                'x-max-length-bytes' => $maxBytes,
                'x-overflow' => $overflow
            ])
        );
        
        echo "Queue '{$name}' declared with max bytes: {$maxBytes}, overflow: {$overflow}\n";
    }
    
    public function declareQueueWithBothLimits(
        string $name,
        int $maxLength,
        int $maxBytes,
        string $overflow = 'drop-head'
    ): void {
        $this->channel->queue_declare(
            $name,
            false,
            true,
            false,
            false,
            false,
            new AMQPTable([
                'x-max-length' => $maxLength,
                'x-max-length-bytes' => $maxBytes,
                'x-overflow' => $overflow
            ])
        );
        
        echo "Queue '{$name}' declared with max length: {$maxLength}, max bytes: {$maxBytes}\n";
    }
    
    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
    }
}

$manager = new LimitedQueueManager();

$manager->declareQueueWithMaxLength('limited_queue', 10000, 'drop-head');

$manager->declareQueueWithMaxLength('reject_queue', 5000, 'reject-publish');

$manager->declareQueueWithMaxBytes('byte_limited_queue', 1024 * 1024 * 100);

$manager->close();

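Dead Letter Queue Configuration

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;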

class DeadLetterQueueManager
{
    private $connection;
    private $channel;
    
    public function __construct(array $config = [])
    {
        $this->connection = new AMQPStreamConnection(
            $config['host'] ?? 'localhost',
            $config['port'] ?? 5672,
            $config['user'] ?? 'guest',
            $config['password'] ?? 'guest',
            $config['vhost'] ?? '/'
        );
        
        $this->channel = $this->connection->channel();
    }
    
    public function setupDeadLetterConfiguration(
        string $queueName,
        string $dlxName,
        string $dlqName,
        ?string $dlRoutingKey = null
    ): void {
        $this->channel->exchange_declare($dlxName, 'direct', false, true, false);
        
        $this->channel->queue_declare($dlqName, false, true, false, false);
        
        $this->channel->queue_bind($dlqName, $dlxName, $dlRoutingKey ?? $queueName);
        
        $arguments = [
            'x-dead-letter-exchange' => $dlxName,
            'x-dead-letter-routing-key' => $dlRoutingKey ?? $queueName
        ];
        
        $this->channel->queue_declare(
            $queueName,
            false,
            true,
            false,
            false,
            false,
            new AMQPTable($arguments)
        );
        
        echo "Dead letter configuration set up:\n";
        echo "  - Queue: {$queueName}\n";
        echo "  - DLX: {$dlxName}\n";
        echo "  - DLQ: {$dlqName}\n";
    }
    
    public function setupDeadLetterWithTTL(
        string $queueName,
        string $dlxName,
        string $dlqName,
        int $ttlMs
    ): void {
        $this->channel->exchange_declare($dlxName, 'direct', false, true, false);
        
        $this->channel->queue_declare($dlqName, false, true, false, false);
        
        $this->channel->queue_bind($dlqName, $dlxName, $queueName);
        
        $this->channel->queue_declare(
            $queueName,
            false,
            true,
            false,
            false,
            false,
            new AMQPTable([
                'x-dead-letter-exchange' => $dlxName,
                'x-dead-letter-routing-key' => $queueName,
                'x-message-ttl' => $ttlMs
            ])
        );
        
        echo "Queue '{$queueName}' with TTL {$ttlMs}ms configured with DLQ '{$dlqName}'\n";
    }
    
    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
    }
}

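$manager = new DeadLetterQueueManager();

// Note: if 'orders_queue' already exists with different arguments (for example
// from the basic example earlier), the broker rejects this redeclaration with
// PRECONDITION_FAILED. Delete the queue first or use a different name.
$manager->setupDeadLetterConfiguration(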
    'orders_queue',
    'orders_dlx',
    'orders_dlq'
);

$manager->setupDeadLetterWithTTL(
    'temp_orders_queue',
    'temp_orders_dlx',
    'temp_orders_dlq',
    60000
);

$manager->close();
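
When a message is dead-lettered, RabbitMQ records the event in an `x-death` header on the message, which a consumer of the dead letter queue can inspect. The sketch below reads that header from a plain PHP array. The function name `deathCount` and the sample header are assumptions for illustration; with php-amqplib the headers would first need to be converted to a native array (for example via the message's `application_headers` table).

```php
<?php

/**
 * Return how many times a message was dead-lettered from $queue for $reason.
 * $headers is a plain array holding an 'x-death' list of entries, each with
 * 'queue', 'reason' and 'count' keys. Hypothetical helper.
 */
function deathCount(array $headers, string $queue, string $reason): int
{
    foreach ($headers['x-death'] ?? [] as $entry) {
        $sameQueue = ($entry['queue'] ?? null) === $queue;
        $sameReason = ($entry['reason'] ?? null) === $reason;

        if ($sameQueue && $sameReason) {
            return (int) ($entry['count'] ?? 0);
        }
    }

    return 0;
}

// Hand-written sample header, not captured from a real broker.
$headers = [
    'x-death' => [
        ['queue' => 'orders_queue', 'reason' => 'rejected', 'count' => 2],
        ['queue' => 'temp_orders_queue', 'reason' => 'expired', 'count' => 1],
    ],
];

echo deathCount($headers, 'orders_queue', 'rejected'), "\n"; // 2
```

A count like this is often used to stop retrying a message after a fixed number of failures.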

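Priority Queues

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;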

class PriorityQueueManager
{
    private $connection;
    private $channel;
    
    public function __construct(array $config = [])
    {
        $this->connection = new AMQPStreamConnection(
            $config['host'] ?? 'localhost',
            $config['port'] ?? 5672,
            $config['user'] ?? 'guest',
            $config['password'] ?? 'guest',
            $config['vhost'] ?? '/'
        );
        
        $this->channel = $this->connection->channel();
    }
    
    public function declarePriorityQueue(string $name, int $maxPriority = 10): void
    {
        $this->channel->queue_declare(
            $name,
            false,
            true,
            false,
            false,
            false,
            new AMQPTable([
                'x-max-priority' => $maxPriority
            ])
        );
        
        echo "Priority queue '{$name}' declared with max priority: {$maxPriority}\n";
    }
    
    public function publishWithPriority(
        string $queue,
        string $body,
        int $priority
    ): void {
        $message = new AMQPMessage($body, [
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            'priority' => $priority
        ]);
        
        $this->channel->basic_publish($message, '', $queue);
        
        echo "Message published to '{$queue}' with priority: {$priority}\n";
    }
    
    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
    }
}

$manager = new PriorityQueueManager();

$manager->declarePriorityQueue('priority_tasks', 10);

$manager->publishWithPriority('priority_tasks', json_encode(['task' => 'low']), 1);
$manager->publishWithPriority('priority_tasks', json_encode(['task' => 'high']), 10);
$manager->publishWithPriority('priority_tasks', json_encode(['task' => 'medium']), 5);

$manager->close();
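
To see the order in which waiting messages would be handed out, the ordering rule can be simulated in memory: higher priority first, publish order within the same priority, and priorities above the queue maximum treated as the maximum. `deliveryOrder` is a hypothetical function, and a real consumer may observe a different order when messages are delivered before higher-priority ones arrive.

```php
<?php

/**
 * Order messages the way a priority queue would hand them out when all of
 * them are already waiting. Hypothetical helper for illustration.
 *
 * @param array<int, array{body: string, priority?: int}> $messages
 * @return string[] message bodies in delivery order
 */
function deliveryOrder(array $messages, int $maxPriority): array
{
    $indexed = [];
    foreach (array_values($messages) as $seq => $message) {
        $indexed[] = [
            'body' => $message['body'],
            // Missing priority counts as 0; values above the maximum are capped.
            'priority' => min($message['priority'] ?? 0, $maxPriority),
            'seq' => $seq,
        ];
    }

    // Priority descending, then publish order ascending.
    usort($indexed, fn($a, $b) => [$b['priority'], $a['seq']] <=> [$a['priority'], $b['seq']]);

    return array_column($indexed, 'body');
}

$order = deliveryOrder([
    ['body' => 'low', 'priority' => 1],
    ['body' => 'high', 'priority' => 10],
    ['body' => 'medium', 'priority' => 5],
], 10);

echo implode(', ', $order), "\n"; // high, medium, low
```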

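Lazy Queues

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;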

class LazyQueueManager
{
    private $connection;
    private $channel;
    
    public function __construct(array $config = [])
    {
        $this->connection = new AMQPStreamConnection(
            $config['host'] ?? 'localhost',
            $config['port'] ?? 5672,
            $config['user'] ?? 'guest',
            $config['password'] ?? 'guest',
            $config['vhost'] ?? '/'
        );
        
        $this->channel = $this->connection->channel();
    }
    
    public function declareLazyQueue(string $name): void
    {
        $this->channel->queue_declare(
            $name,
            false,
            true,
            false,
            false,
            false,
            new AMQPTable([
                'x-queue-mode' => 'lazy'
            ])
        );
        
        echo "Lazy queue '{$name}' declared\n";
    }
    
    public function declareDefaultQueue(string $name): void
    {
        $this->channel->queue_declare(
            $name,
            false,
            true,
            false,
            false,
            false,
            new AMQPTable([
                'x-queue-mode' => 'default'
            ])
        );
        
        echo "Default queue '{$name}' declared\n";
    }
    
    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
    }
}

$manager = new LazyQueueManager();

$manager->declareLazyQueue('lazy_logs_queue');

$manager->declareDefaultQueue('default_queue');

$manager->close();

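Practical Use Cases

1. Order Queue Configuration

php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;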

class OrderQueueSetup
{
    private $channel;
    
    public function __construct(AMQPStreamConnection $connection)
    {
        $this->channel = $connection->channel();
    }
    
    public function setup(): void
    {
        $this->channel->exchange_declare('orders_exchange', 'direct', false, true, false);
        
        $this->channel->exchange_declare('orders_dlx', 'direct', false, true, false);
        
        $this->channel->queue_declare(
            'orders_new',
            false,
            true,
            false,
            false,
            false,
            new AMQPTable([
                'x-dead-letter-exchange' => 'orders_dlx',
                'x-dead-letter-routing-key' => 'orders_failed',
                'x-message-ttl' => 86400000,
                'x-max-priority' => 5
            ])
        );
        
        $this->channel->queue_declare(
            'orders_processing',
            false,
            true,
            false,
            false,
            false,
            new AMQPTable([
                'x-dead-letter-exchange' => 'orders_dlx',
                'x-dead-letter-routing-key' => 'orders_failed'
            ])
        );
        
        $this->channel->queue_declare(
            'orders_completed',
            false,
            true,
            false,
            false,
            false,
            new AMQPTable([
                'x-message-ttl' => 604800000,
                'x-max-length' => 100000
            ])
        );
        
        $this->channel->queue_declare('orders_failed', false, true, false, false);
        
        $this->channel->queue_bind('orders_new', 'orders_exchange', 'order.new');
        $this->channel->queue_bind('orders_processing', 'orders_exchange', 'order.processing');
        $this->channel->queue_bind('orders_completed', 'orders_exchange', 'order.completed');
        $this->channel->queue_bind('orders_failed', 'orders_dlx', 'orders_failed');
        
        echo "Order queues setup completed\n";
    }
}

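2. Implementing a Delay Queue

php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;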

class DelayQueueManager
{
    private $channel;
    
    public function __construct(AMQPStreamConnection $connection)
    {
        $this->channel = $connection->channel();
    }
    
    public function setupDelayQueue(string $targetQueue, int $delayMs): string
    {
        $delayQueue = "{$targetQueue}.delay.{$delayMs}";
        
        $this->channel->queue_declare(
            $delayQueue,
            false,
            true,
            false,
            false,
            false,
            new AMQPTable([
                'x-dead-letter-exchange' => '',
                'x-dead-letter-routing-key' => $targetQueue,
                'x-message-ttl' => $delayMs
            ])
        );
        
        echo "Delay queue '{$delayQueue}' created for target '{$targetQueue}'\n";
        
        return $delayQueue;
    }
    
    public function publishWithDelay(string $targetQueue, string $body, int $delayMs): void
    {
        $delayQueues = [
            1000 => 'delay.1s',
            5000 => 'delay.5s',
            10000 => 'delay.10s',
            30000 => 'delay.30s',
            60000 => 'delay.1m',
            300000 => 'delay.5m',
            600000 => 'delay.10m',
            3600000 => 'delay.1h'
        ];
        
        $closestDelay = null;
        foreach ($delayQueues as $delay => $name) {
            if ($delay >= $delayMs) {
                $closestDelay = $delay;
                break;
            }
        }
        
        if ($closestDelay === null) {
            $closestDelay = 3600000;
        }
        
        $delayQueue = $this->setupDelayQueue($targetQueue, $closestDelay);
        
        $message = new AMQPMessage($body, [
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
        ]);
        
        $this->channel->basic_publish($message, '', $delayQueue);
        
        echo "Message published to delay queue, will arrive in {$closestDelay}ms\n";
    }
}

3. Work Queue Configuration

php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;

class WorkQueueSetup
{
    private $channel;
    
    public function __construct(AMQPStreamConnection $connection)
    {
        $this->channel = $connection->channel();
    }
    
    public function setupWorkQueue(string $name, array $options = []): void
    {
        $options = array_merge([
            'durable' => true,
            'max_length' => null,
            'message_ttl' => null,
            'dead_letter_exchange' => null,
            'priority' => false,
            'lazy' => false
        ], $options);
        
        $arguments = [];
        
        if ($options['max_length']) {
            $arguments['x-max-length'] = $options['max_length'];
            $arguments['x-overflow'] = 'reject-publish';
        }
        
        if ($options['message_ttl']) {
            $arguments['x-message-ttl'] = $options['message_ttl'];
        }
        
        if ($options['dead_letter_exchange']) {
            $arguments['x-dead-letter-exchange'] = $options['dead_letter_exchange'];
        }
        
        if ($options['priority']) {
            $arguments['x-max-priority'] = is_int($options['priority']) ? $options['priority'] : 10;
        }
        
        if ($options['lazy']) {
            $arguments['x-queue-mode'] = 'lazy';
        }
        
        $this->channel->queue_declare(
            $name,
            false,
            $options['durable'],
            false,
            false,
            false,
            empty($arguments) ? null : new AMQPTable($arguments)
        );
        
        echo "Work queue '{$name}' created with options: " . json_encode($options) . "\n";
    }
}

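Common Problems and Solutions

1. Queue Backlog

Causes

  • Consumers process messages too slowly
  • Too few consumers
  • Consumers are failing

Solution

php
<?php

use PhpAmqpLib\Channel\AMQPChannel;

class QueueMonitor
{
    private $channel;

    // The channel must be injected; without it every method below would fail.
    public function __construct(AMQPChannel $channel)
    {
        $this->channel = $channel;
    }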
    
    public function checkQueueHealth(string $queueName, int $warningThreshold = 10000, int $criticalThreshold = 50000): array
    {
        try {
            [$queue, $messageCount, $consumerCount] = $this->channel->queue_declare($queueName, true);
            
            $status = 'healthy';
            $alerts = [];
            
            if ($messageCount > $criticalThreshold) {
                $status = 'critical';
                $alerts[] = "Queue '{$queueName}' has {$messageCount} messages (critical)";
            } elseif ($messageCount > $warningThreshold) {
                $status = 'warning';
                $alerts[] = "Queue '{$queueName}' has {$messageCount} messages (warning)";
            }
            
            if ($consumerCount === 0 && $messageCount > 0) {
                $status = 'critical';
                $alerts[] = "Queue '{$queueName}' has no consumers but {$messageCount} messages";
            }
            
            return [
                'queue' => $queueName,
                'message_count' => $messageCount,
                'consumer_count' => $consumerCount,
                'status' => $status,
                'alerts' => $alerts
            ];
        } catch (Exception $e) {
            return [
                'queue' => $queueName,
                'status' => 'error',
                'error' => $e->getMessage()
            ];
        }
    }
    
    public function getBacklogMetrics(string $queueName): array
    {
        [$queue, $messageCount, $consumerCount] = $this->channel->queue_declare($queueName, true);
        
        $avgProcessingTime = 0.5;
        
        $estimatedClearTime = $consumerCount > 0 
            ? ($messageCount / $consumerCount) * $avgProcessingTime 
            : -1;
        
        return [
            'queue' => $queueName,
            'message_count' => $messageCount,
            'consumer_count' => $consumerCount,
            'estimated_clear_seconds' => $estimatedClearTime,
            'estimated_clear_human' => $this->formatTime($estimatedClearTime)
        ];
    }
    
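    private function formatTime(float $seconds): string
    {
        if ($seconds < 0) {
            return 'N/A (no consumers)';
        }
        
        // Round to whole seconds first: applying % to a float with a
        // fractional part is deprecated since PHP 8.1.
        $total = (int) round($seconds);
        $hours = intdiv($total, 3600);
        $minutes = intdiv($total % 3600, 60);
        $secs = $total % 60;
        
        return sprintf('%02d:%02d:%02d', $hours, $minutes, $secs);
    }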
}
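
The backlog estimate used above is simple arithmetic: messages divided by consumers, multiplied by the average processing time per message. A worked sketch follows, assuming no new messages arrive and that every consumer handles one message at a time; both function names are hypothetical.

```php
<?php

/**
 * Estimate how long the current backlog takes to drain, in seconds.
 * Returns null when there are no consumers.
 */
function estimateClearSeconds(int $messageCount, int $consumerCount, float $avgSecondsPerMessage): ?float
{
    if ($consumerCount <= 0) {
        return null;
    }

    return ($messageCount / $consumerCount) * $avgSecondsPerMessage;
}

/** Format a duration in seconds as HH:MM:SS. */
function formatDuration(float $seconds): string
{
    $total = (int) round($seconds);

    return sprintf('%02d:%02d:%02d', intdiv($total, 3600), intdiv($total % 3600, 60), $total % 60);
}

// 50,000 messages, 4 consumers, 0.5 s per message:
// (50000 / 4) * 0.5 = 6250 seconds.
$seconds = estimateClearSeconds(50000, 4, 0.5);

echo formatDuration($seconds), "\n"; // 01:44:10
```

In practice the average processing time should be measured rather than hard-coded, and the publish rate has to be subtracted from the consume rate for a realistic figure.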

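2. Queue Does Not Exist

Causes

  • The queue was never declared
  • The queue was deleted automatically
  • The queue name is wrong

Solution

php
<?php

use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

class SafeQueueProducer
{
    private $channel;
    private $declaredQueues = [];

    // The channel must be injected; without it every method below would fail.
    public function __construct(AMQPChannel $channel)
    {
        $this->channel = $channel;
    }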
    
    public function ensureQueueExists(string $queueName, array $options = []): void
    {
        if (isset($this->declaredQueues[$queueName])) {
            return;
        }
        
        $options = array_merge([
            'durable' => true,
            'arguments' => []
        ], $options);
        
        try {
            $this->channel->queue_declare(
                $queueName,
                false,
                $options['durable'],
                false,
                false,
                false,
                empty($options['arguments']) ? null : new AMQPTable($options['arguments'])
            );
            
            $this->declaredQueues[$queueName] = true;
            
            echo "Queue '{$queueName}' ensured to exist\n";
        } catch (Exception $e) {
            throw new RuntimeException("Failed to ensure queue '{$queueName}': " . $e->getMessage());
        }
    }
    
    public function publish(string $queueName, string $body, array $options = []): void
    {
        $this->ensureQueueExists($queueName, $options['queue_options'] ?? []);
        
        $message = new AMQPMessage($body, [
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
        ]);
        
        $this->channel->basic_publish($message, '', $queueName);
    }
}

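3. Memory Exhaustion

Causes

  • Too many messages in the queue
  • Message bodies are too large
  • The queue is not lazy

Solution

php
<?php

use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

class MemorySafeQueue
{
    private $channel;
    private $confirmsEnabled = false;
    
    public function __construct(AMQPChannel $channel)
    {
        $this->channel = $channel;
    }
    
    public function declareMemorySafeQueue(
        string $name,
        int $maxLength = 100000,
        int $maxBytes = 1073741824
    ): void {
        $this->channel->queue_declare(
            $name,
            false,
            true,
            false,
            false,
            false,
            new AMQPTable([
                'x-max-length' => $maxLength,
                'x-max-length-bytes' => $maxBytes,
                'x-overflow' => 'reject-publish',
                'x-queue-mode' => 'lazy'
            ])
        );
        
        echo "Memory-safe queue '{$name}' declared\n";
    }
    
    public function publishWithCheck(string $queue, string $body): bool
    {
        $maxMessageSize = 16 * 1024 * 1024;
        
        if (strlen($body) > $maxMessageSize) {
            throw new RuntimeException("Message size exceeds limit");
        }
        
        [$q, $messageCount, $consumerCount] = $this->channel->queue_declare($queue, true);
        
        if ($messageCount > 50000) {
            echo "Warning: Queue '{$queue}' has high message count: {$messageCount}\n";
        }
        
        // A full queue with x-overflow=reject-publish does not raise an
        // exception. The broker answers with basic.nack, which is only sent
        // when publisher confirms are enabled on the channel.
        if (!$this->confirmsEnabled) {
            $this->channel->confirm_select();
            $this->confirmsEnabled = true;
        }
        
        $accepted = true;
        $this->channel->set_nack_handler(function () use (&$accepted) {
            $accepted = false;
        });
        
        $message = new AMQPMessage($body, [
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
        ]);
        
        $this->channel->basic_publish($message, '', $queue);
        
        // Block until the broker has confirmed or rejected the publish.
        $this->channel->wait_for_pending_acks(5.0);
        
        if (!$accepted) {
            echo "Queue is full, message rejected\n";
        }
        
        return $accepted;
    }
}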

Best Practices

1. Queue Naming Conventions

php
<?php

class QueueNamingConvention
{
    public static function generateQueueName(
        string $domain,
        string $entity,
        string $action,
        ?string $suffix = null
    ): string {
        $parts = [$domain, $entity, $action];
        
        if ($suffix) {
            $parts[] = $suffix;
        }
        
        return implode('.', $parts);
    }
    
    public static function generateDLQName(string $originalQueue): string
    {
        return "{$originalQueue}.dlq";
    }
    
    public static function generateDelayQueueName(string $originalQueue, int $delayMs): string
    {
        return "{$originalQueue}.delay.{$delayMs}";
    }
}

$orderQueue = QueueNamingConvention::generateQueueName('order', 'payment', 'pending');
$orderDLQ = QueueNamingConvention::generateDLQName($orderQueue);

2. Queue Configuration Templates

php
<?php

class QueueTemplates
{
    public static function standardQueue(string $name): array
    {
        return [
            'name' => $name,
            'durable' => true,
            'arguments' => []
        ];
    }
    
    public static function criticalQueue(string $name): array
    {
        return [
            'name' => $name,
            'durable' => true,
            'arguments' => [
                'x-dead-letter-exchange' => "{$name}.dlx",
                'x-dead-letter-routing-key' => "{$name}.failed"
            ]
        ];
    }
    
    public static function temporaryQueue(string $name, int $ttlMs): array
    {
        return [
            'name' => $name,
            'durable' => false,
            'arguments' => [
                'x-message-ttl' => $ttlMs,
                'x-expires' => $ttlMs * 2
            ]
        ];
    }
    
    public static function boundedQueue(string $name, int $maxLength): array
    {
        return [
            'name' => $name,
            'durable' => true,
            'arguments' => [
                'x-max-length' => $maxLength,
                'x-overflow' => 'reject-publish'
            ]
        ];
    }
    
    public static function priorityQueue(string $name, int $maxPriority = 10): array
    {
        return [
            'name' => $name,
            'durable' => true,
            'arguments' => [
                'x-max-priority' => $maxPriority
            ]
        ];
    }
}

3. Queue Monitoring

php
<?php

class QueueHealthChecker
{
    private $apiUrl;
    private $credentials;
    
    public function __construct(string $host, string $user, string $password)
    {
        $this->apiUrl = "http://{$host}:15672/api";
        $this->credentials = base64_encode("{$user}:{$password}");
    }
    
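    public function getQueueMetrics(string $vhost, string $queue): array
    {
        // rawurlencode() encodes "/" as %2F and spaces as %20, which is what
        // URL path segments need (urlencode() would turn spaces into "+").
        $url = "{$this->apiUrl}/queues/" . rawurlencode($vhost) . "/" . rawurlencode($queue);
        
        $ch = curl_init();
        curl_setopt($ch, CURLOPT_URL, $url);
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
        curl_setopt($ch, CURLOPT_HTTPHEADER, [
            "Authorization: Basic {$this->credentials}"
        ]);
        
        $response = curl_exec($ch);
        curl_close($ch);
        
        // curl_exec() returns false on failure and json_decode() returns null
        // on invalid JSON. Fail loudly so that an unreachable API is never
        // reported as a healthy queue.
        $decoded = is_string($response) ? json_decode($response, true) : null;
        
        if (!is_array($decoded)) {
            throw new RuntimeException("Could not fetch metrics for queue '{$queue}'");
        }
        
        return $decoded;
    }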
    
    public function checkHealth(string $vhost, array $queues): array
    {
        $results = [];
        
        foreach ($queues as $queue) {
            $metrics = $this->getQueueMetrics($vhost, $queue);
            
            $results[$queue] = [
                'messages' => $metrics['messages'] ?? 0,
                'messages_ready' => $metrics['messages_ready'] ?? 0,
                'messages_unacked' => $metrics['messages_unacked'] ?? 0,
                'consumers' => $metrics['consumers'] ?? 0,
                'memory' => $metrics['memory'] ?? 0,
                'status' => $this->determineStatus($metrics)
            ];
        }
        
        return $results;
    }
    
    private function determineStatus(array $metrics): string
    {
        $messages = $metrics['messages'] ?? 0;
        $consumers = $metrics['consumers'] ?? 0;
        
        if ($consumers === 0 && $messages > 0) {
            return 'no_consumers';
        }
        
        if ($messages > 100000) {
            return 'backlog_critical';
        }
        
        if ($messages > 10000) {
            return 'backlog_warning';
        }
        
        return 'healthy';
    }
}

Related Links