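RabbitMQ Queues
Overview
A queue is the core RabbitMQ component that stores messages. It acts as a buffer and, by default, hands messages out in first-in, first-out (FIFO) order. Producers publish messages to an exchange, the exchange routes them to queues according to its routing rules, and consumers fetch messages from the queues for processing.
Core role of a queue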
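```mermaid
graph LR
    subgraph producerSide["Producer side"]
        P[Producer]
    end
    subgraph RabbitMQ
        E[Exchange] -->|route| Q[Queue]
        Q -->|store| M1[Message 1]
        Q -->|store| M2[Message 2]
        Q -->|store| M3[Message 3]
    end
    subgraph consumerSide["Consumer side"]
        C[Consumer]
    end
    P -->|publish| E
    Q -->|deliver| C
```

Main roles of a queue: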
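- Message storage: temporarily holds messages that are waiting to be processed
- Traffic buffering: absorbs bursts and smooths the load seen by consumers
- Decoupling: sits between producers and consumers so that neither depends on the other directly
- Reliable delivery: durability and acknowledgements help keep messages from being lost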
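
The buffering role can be illustrated without a broker. The sketch below is only a model: `simulateBuffer` is a hypothetical helper, `SplQueue` stands in for a RabbitMQ queue, and the burst sizes and consumer rate are made-up illustration values.

```php
<?php

// Hypothetical helper: models a queue absorbing bursty traffic. Not RabbitMQ code.
function simulateBuffer(array $arrivalsPerTick, int $consumeRate): array
{
    $queue = new SplQueue();
    $depths = [];
    $nextId = 1;

    foreach ($arrivalsPerTick as $arrivals) {
        // Producer side: enqueue everything that arrives in this tick.
        for ($i = 0; $i < $arrivals; $i++) {
            $queue->enqueue($nextId++);
        }
        // Consumer side: process at most $consumeRate messages, oldest first.
        for ($i = 0; $i < $consumeRate && !$queue->isEmpty(); $i++) {
            $queue->dequeue();
        }
        // Record how many messages are still waiting after this tick.
        $depths[] = $queue->count();
    }

    return $depths;
}

// Two bursts (10 and 8 messages) against a consumer that handles 3 per tick.
$depths = simulateBuffer([10, 0, 0, 8, 0, 0], 3);
print_r($depths);
```

The consumer never sees more than three messages per tick, while the queue depth rises after each burst and drains back to zero in the quiet ticks.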
Key concepts
1. Queue types
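```mermaid
graph TB
    A[Queue Types] --> B[Classic Queue]
    A --> C[Quorum Queue]
    A --> D[Stream]
    B --> B1[Traditional queue]
    B --> B2[Single node, or mirrored in older releases]
    B --> B3[Fits most scenarios]
    C --> C1[Replicated queue]
    C --> C2[Based on Raft consensus]
    C --> C3[High availability guarantees]
    D --> D1[Stream queue]
    D --> D2[Supports repeated consumption]
    D --> D3[Large-scale log scenarios]
```

| Type | Characteristics | Typical use |
|---|---|---|
| Classic Queue | Traditional queue; mirroring is available only in older RabbitMQ releases (it was removed in 4.0) | General purpose |
| Quorum Queue | Replicated with Raft, prioritises data safety | High reliability requirements |
| Stream | Persistent append-only log, can be consumed repeatedly | Logs, event streams |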
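
The queue type is selected at declaration time through the `x-queue-type` argument. The following is a minimal sketch of how the options for each type could be assembled; `queueTypeOptions` is a hypothetical helper, not part of php-amqplib, and it deliberately returns plain arrays so it runs without a broker.

```php
<?php

// Hypothetical helper: returns declaration flags and arguments for a queue type.
function queueTypeOptions(string $type): array
{
    $allowed = ['classic', 'quorum', 'stream'];
    if (!in_array($type, $allowed, true)) {
        throw new InvalidArgumentException("Unknown queue type: {$type}");
    }

    return [
        // Quorum queues and streams must be durable and non-exclusive;
        // the same flags are used for classic queues here for simplicity.
        'durable' => true,
        'exclusive' => false,
        'auto_delete' => false,
        'arguments' => ['x-queue-type' => $type],
    ];
}

$options = queueTypeOptions('quorum');
print_r($options);
```

With php-amqplib, the `arguments` array would be wrapped in `new AMQPTable(...)` and passed as the seventh parameter of `queue_declare`, in the same way the code examples in this document pass other `x-` arguments.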
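2. Queue properties
Basic properties
| Property | Parameter | Description |
|---|---|---|
| Durable | durable | Whether the queue survives a broker restart |
| Exclusive | exclusive | Whether the queue can be used only by the connection that declared it; an exclusive queue is deleted when that connection closes |
| Auto-delete | auto_delete | Whether the queue is deleted after its last consumer disconnects |
Extended properties (x-arguments)
| Property | Parameter | Description |
|---|---|---|
| Message TTL | x-message-ttl | How long a message may stay in the queue (milliseconds) |
| Queue TTL | x-expires | How long the queue may stay unused before it is deleted (milliseconds) |
| Max length | x-max-length | Maximum number of messages in the queue |
| Max bytes | x-max-length-bytes | Maximum total size of message bodies in the queue (bytes) |
| Overflow behaviour | x-overflow | What happens when a limit is exceeded (for example drop-head or reject-publish) |
| Dead letter exchange | x-dead-letter-exchange | Exchange that receives messages which are rejected, expire, or are dropped because of a length limit |
| Dead letter routing key | x-dead-letter-routing-key | Routing key used when a message is dead-lettered |
| Max priority | x-max-priority | Highest priority the queue supports (1-255; small values such as 1-5 are recommended) |
| Lazy queue | x-queue-mode | lazy mode keeps messages on disk as much as possible |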
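
Because a wrong argument value makes the broker refuse the declaration, it can help to check values on the client first. The sketch below is an assumption-laden illustration: `validateQueueArguments` is a hypothetical helper, and the accepted ranges reflect my reading of the RabbitMQ documentation rather than an exhaustive specification.

```php
<?php

// Hypothetical helper: returns a list of problems found in a set of x-arguments.
function validateQueueArguments(array $args): array
{
    $errors = [];

    // These limits are counts or durations and must not be negative.
    foreach (['x-message-ttl', 'x-max-length', 'x-max-length-bytes'] as $key) {
        if (isset($args[$key]) && (!is_int($args[$key]) || $args[$key] < 0)) {
            $errors[] = "{$key} must be a non-negative integer";
        }
    }

    // A queue TTL of zero makes no sense, so it must be strictly positive.
    if (isset($args['x-expires']) && (!is_int($args['x-expires']) || $args['x-expires'] <= 0)) {
        $errors[] = 'x-expires must be a positive integer';
    }

    if (isset($args['x-max-priority'])) {
        $priority = $args['x-max-priority'];
        if (!is_int($priority) || $priority < 1 || $priority > 255) {
            $errors[] = 'x-max-priority must be an integer between 1 and 255';
        }
    }

    $overflowModes = ['drop-head', 'reject-publish', 'reject-publish-dlx'];
    if (isset($args['x-overflow']) && !in_array($args['x-overflow'], $overflowModes, true)) {
        $errors[] = 'x-overflow must be one of: ' . implode(', ', $overflowModes);
    }

    if (isset($args['x-queue-mode']) && !in_array($args['x-queue-mode'], ['default', 'lazy'], true)) {
        $errors[] = 'x-queue-mode must be default or lazy';
    }

    return $errors;
}

$problems = validateQueueArguments(['x-max-priority' => 300, 'x-overflow' => 'drop-tail']);
print_r($problems);
```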
3. Message lifecycle
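
The lifecycle covered in this section can be read as a small state machine. The sketch below is a hypothetical in-memory model, not RabbitMQ or php-amqplib API; the state and event names are my own, and rejection is modelled from the unacked state on the assumption that a consumer can only reject a message it has already received.

```php
<?php

// Hypothetical model of the states a message passes through in a queue.
final class MessageLifecycle
{
    // state => [event => next state]
    private const TRANSITIONS = [
        'ready' => [
            'deliver' => 'unacked',
            'expire' => 'dead_lettered',
            'overflow' => 'dropped',
        ],
        'unacked' => [
            'ack' => 'acked',
            'nack_requeue' => 'ready',
            'consumer_disconnect' => 'ready',
            'reject' => 'dead_lettered',
        ],
    ];

    private string $state = 'ready';

    // Applies an event and returns the new state; invalid events throw.
    public function apply(string $event): string
    {
        $next = self::TRANSITIONS[$this->state][$event] ?? null;
        if ($next === null) {
            throw new LogicException("Event '{$event}' is not valid in state '{$this->state}'");
        }

        return $this->state = $next;
    }

    public function state(): string
    {
        return $this->state;
    }
}

$message = new MessageLifecycle();
$message->apply('deliver');       // ready -> unacked
$message->apply('nack_requeue');  // unacked -> ready
$message->apply('deliver');       // ready -> unacked
$message->apply('ack');           // unacked -> acked
echo $message->state(), "\n";
```

Terminal states (`acked`, `dropped`, `dead_lettered`) have no outgoing transitions, so any further event raises a `LogicException`.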
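```mermaid
stateDiagram-v2
    [*] --> Ready: Message enqueued
    Ready --> Unacked: Delivered to consumer
    Unacked --> Ready: NACK (requeue=true)
    Unacked --> Acked: ACK
    Unacked --> Ready: Consumer disconnected
    Acked --> [*]: Removed from queue
    Ready --> DLQ: Expired
    Unacked --> DLQ: Rejected (requeue=false)
    Ready --> Dropped: Queue full
```

4. Queue modes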
Lazy Queue
```mermaid
graph TB
    subgraph defaultMode["Default mode"]
        A1[Message] --> B1[Memory]
        B1 --> C1[Disk]
    end
    subgraph lazyMode["Lazy mode"]
        A2[Message] --> B2[Disk]
        B2 --> C2[Memory]
        C2 --> D2[Consumer]
    end
```

Characteristics of lazy queues:
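- Messages are kept on disk whenever possible
- Memory usage is reduced
- Well suited to large backlogs and slow consumers
- Trades some throughput for stability
- From RabbitMQ 3.12 onward, classic queues behave this way by default and the `x-queue-mode` argument is ignored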
5. Queue length limits
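
The two overflow strategies covered in this section differ in which message is sacrificed. The sketch below models that difference in memory only; `BoundedQueue` is a hypothetical class, not broker code, and it ignores byte limits and dead-lettering.

```php
<?php

// Hypothetical in-memory model of a length-limited queue. Not RabbitMQ API.
final class BoundedQueue
{
    private array $messages = [];
    private int $maxLength;
    private string $overflow;

    public function __construct(int $maxLength, string $overflow = 'drop-head')
    {
        if ($maxLength < 1) {
            throw new InvalidArgumentException('maxLength must be at least 1');
        }
        $this->maxLength = $maxLength;
        $this->overflow = $overflow;
    }

    // Returns true if the message was accepted, false if it was rejected.
    public function publish(string $message): bool
    {
        if (count($this->messages) >= $this->maxLength) {
            if ($this->overflow === 'reject-publish') {
                return false; // the new message is refused, the queue is unchanged
            }
            array_shift($this->messages); // drop-head: discard the oldest message
        }
        $this->messages[] = $message;

        return true;
    }

    public function messages(): array
    {
        return $this->messages;
    }
}

$dropHead = new BoundedQueue(2, 'drop-head');
$reject = new BoundedQueue(2, 'reject-publish');
foreach (['a', 'b', 'c'] as $body) {
    $dropHead->publish($body);
    $reject->publish($body);
}
print_r($dropHead->messages()); // oldest message 'a' was discarded
print_r($reject->messages());   // newest message 'c' was refused
```

With drop-head the producer is never slowed down but old data is lost; with reject-publish the queue keeps what it has and the producer has to handle the refusal.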
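```mermaid
graph LR
    A[New message] --> B{Queue full?}
    B -->|No| C[Enqueue]
    B -->|Yes| D{Overflow policy}
    D -->|drop-head| E[Drop oldest message]
    D -->|reject-publish| F[Reject new message]
    E --> G[Enqueue new message]
    F --> H[Publisher receives a nack]
```

Code examples
Basic queue operations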
```php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class QueueManager
{
    private $connection;
    private $channel;

    public function __construct(array $config = [])
    {
        $this->connection = new AMQPStreamConnection(
            $config['host'] ?? 'localhost',
            $config['port'] ?? 5672,
            $config['user'] ?? 'guest',
            $config['password'] ?? 'guest',
            $config['vhost'] ?? '/'
        );
        $this->channel = $this->connection->channel();
    }

    public function declareQueue(
        string $name,
        bool $durable = true,
        bool $exclusive = false,
        bool $autoDelete = false
    ): void {
        // Arguments: name, passive, durable, exclusive, auto_delete
        $this->channel->queue_declare(
            $name,
            false,
            $durable,
            $exclusive,
            $autoDelete
        );
        echo "Queue '{$name}' declared\n";
    }

    public function deleteQueue(string $name): void
    {
        $this->channel->queue_delete($name);
        echo "Queue '{$name}' deleted\n";
    }

    // Removes all ready messages but keeps the queue itself.
    public function purgeQueue(string $name): void
    {
        $this->channel->queue_purge($name);
        echo "Queue '{$name}' purged\n";
    }

    public function getQueueInfo(string $name): array
    {
        try {
            // passive=true only checks that the queue exists and returns its counters.
            // If the queue is missing, the broker closes the channel and an exception is thrown.
            [$queue, $messageCount, $consumerCount] = $this->channel->queue_declare(
                $name,
                true
            );

            return [
                'name' => $queue,
                'message_count' => $messageCount,
                'consumer_count' => $consumerCount
            ];
        } catch (Exception $e) {
            return [
                'error' => $e->getMessage()
            ];
        }
    }

    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
    }
}

$manager = new QueueManager();
$manager->declareQueue('orders_queue', true);
$info = $manager->getQueueInfo('orders_queue');
print_r($info);
$manager->close();
```

Queue with TTL
```php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;

class TTLQueueManager
{
    private $connection;
    private $channel;

    public function __construct(array $config = [])
    {
        $this->connection = new AMQPStreamConnection(
            $config['host'] ?? 'localhost',
            $config['port'] ?? 5672,
            $config['user'] ?? 'guest',
            $config['password'] ?? 'guest',
            $config['vhost'] ?? '/'
        );
        $this->channel = $this->connection->channel();
    }

    // Messages that stay in the queue longer than $ttlMs are expired.
    public function declareQueueWithMessageTTL(string $name, int $ttlMs): void
    {
        $this->channel->queue_declare(
            $name,
            false,
            true,
            false,
            false,
            false,
            new AMQPTable([
                'x-message-ttl' => $ttlMs
            ])
        );
        echo "Queue '{$name}' declared with message TTL: {$ttlMs}ms\n";
    }

    // The queue itself is deleted after being unused for $expiryMs.
    public function declareQueueWithExpiry(string $name, int $expiryMs): void
    {
        $this->channel->queue_declare(
            $name,
            false,
            true,
            false,
            false,
            false,
            new AMQPTable([
                'x-expires' => $expiryMs
            ])
        );
        echo "Queue '{$name}' declared with expiry: {$expiryMs}ms\n";
    }

    public function declareQueueWithBothTTL(string $name, int $messageTtl, int $queueExpiry): void
    {
        $this->channel->queue_declare(
            $name,
            false,
            true,
            false,
            false,
            false,
            new AMQPTable([
                'x-message-ttl' => $messageTtl,
                'x-expires' => $queueExpiry
            ])
        );
        echo "Queue '{$name}' declared with message TTL: {$messageTtl}ms, queue expiry: {$queueExpiry}ms\n";
    }

    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
    }
}

$manager = new TTLQueueManager();
$manager->declareQueueWithMessageTTL('temp_queue', 60000);            // 1 minute
$manager->declareQueueWithExpiry('auto_delete_queue', 3600000);       // 1 hour
$manager->declareQueueWithBothTTL('mixed_queue', 30000, 86400000);    // 30 seconds, 1 day
$manager->close();
```

Queue with length limits
```php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;

class LimitedQueueManager
{
    private $connection;
    private $channel;

    public function __construct(array $config = [])
    {
        $this->connection = new AMQPStreamConnection(
            $config['host'] ?? 'localhost',
            $config['port'] ?? 5672,
            $config['user'] ?? 'guest',
            $config['password'] ?? 'guest',
            $config['vhost'] ?? '/'
        );
        $this->channel = $this->connection->channel();
    }

    // Limits the number of messages held in the queue.
    public function declareQueueWithMaxLength(
        string $name,
        int $maxLength,
        string $overflow = 'drop-head'
    ): void {
        $this->channel->queue_declare(
            $name,
            false,
            true,
            false,
            false,
            false,
            new AMQPTable([
                'x-max-length' => $maxLength,
                'x-overflow' => $overflow
            ])
        );
        echo "Queue '{$name}' declared with max length: {$maxLength}, overflow: {$overflow}\n";
    }

    // Limits the total size of message bodies held in the queue.
    public function declareQueueWithMaxBytes(
        string $name,
        int $maxBytes,
        string $overflow = 'drop-head'
    ): void {
        $this->channel->queue_declare(
            $name,
            false,
            true,
            false,
            false,
            false,
            new AMQPTable([
                'x-max-length-bytes' => $maxBytes,
                'x-overflow' => $overflow
            ])
        );
        echo "Queue '{$name}' declared with max bytes: {$maxBytes}, overflow: {$overflow}\n";
    }

    // When both limits are set, whichever is reached first applies.
    public function declareQueueWithBothLimits(
        string $name,
        int $maxLength,
        int $maxBytes,
        string $overflow = 'drop-head'
    ): void {
        $this->channel->queue_declare(
            $name,
            false,
            true,
            false,
            false,
            false,
            new AMQPTable([
                'x-max-length' => $maxLength,
                'x-max-length-bytes' => $maxBytes,
                'x-overflow' => $overflow
            ])
        );
        echo "Queue '{$name}' declared with max length: {$maxLength}, max bytes: {$maxBytes}\n";
    }

    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
    }
}

$manager = new LimitedQueueManager();
$manager->declareQueueWithMaxLength('limited_queue', 10000, 'drop-head');
$manager->declareQueueWithMaxLength('reject_queue', 5000, 'reject-publish');
$manager->declareQueueWithMaxBytes('byte_limited_queue', 1024 * 1024 * 100); // 100 MiB
$manager->close();
```

Dead letter queue configuration
```php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;

class DeadLetterQueueManager
{
    private $connection;
    private $channel;

    public function __construct(array $config = [])
    {
        $this->connection = new AMQPStreamConnection(
            $config['host'] ?? 'localhost',
            $config['port'] ?? 5672,
            $config['user'] ?? 'guest',
            $config['password'] ?? 'guest',
            $config['vhost'] ?? '/'
        );
        $this->channel = $this->connection->channel();
    }

    public function setupDeadLetterConfiguration(
        string $queueName,
        string $dlxName,
        string $dlqName,
        ?string $dlRoutingKey = null
    ): void {
        // Dead letter exchange and the queue that collects dead-lettered messages.
        $this->channel->exchange_declare($dlxName, 'direct', false, true, false);
        $this->channel->queue_declare($dlqName, false, true, false, false);
        $this->channel->queue_bind($dlqName, $dlxName, $dlRoutingKey ?? $queueName);

        $arguments = [
            'x-dead-letter-exchange' => $dlxName,
            'x-dead-letter-routing-key' => $dlRoutingKey ?? $queueName
        ];

        // Note: redeclaring an existing queue with different arguments
        // fails with PRECONDITION_FAILED, so the queue must not already
        // exist with another configuration.
        $this->channel->queue_declare(
            $queueName,
            false,
            true,
            false,
            false,
            false,
            new AMQPTable($arguments)
        );

        echo "Dead letter configuration set up:\n";
        echo " - Queue: {$queueName}\n";
        echo " - DLX: {$dlxName}\n";
        echo " - DLQ: {$dlqName}\n";
    }

    // Messages that expire after $ttlMs are routed to the dead letter queue.
    public function setupDeadLetterWithTTL(
        string $queueName,
        string $dlxName,
        string $dlqName,
        int $ttlMs
    ): void {
        $this->channel->exchange_declare($dlxName, 'direct', false, true, false);
        $this->channel->queue_declare($dlqName, false, true, false, false);
        $this->channel->queue_bind($dlqName, $dlxName, $queueName);

        $this->channel->queue_declare(
            $queueName,
            false,
            true,
            false,
            false,
            false,
            new AMQPTable([
                'x-dead-letter-exchange' => $dlxName,
                'x-dead-letter-routing-key' => $queueName,
                'x-message-ttl' => $ttlMs
            ])
        );
        echo "Queue '{$queueName}' with TTL {$ttlMs}ms configured with DLQ '{$dlqName}'\n";
    }

    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
    }
}

$manager = new DeadLetterQueueManager();
$manager->setupDeadLetterConfiguration(
    'orders_queue',
    'orders_dlx',
    'orders_dlq'
);
$manager->setupDeadLetterWithTTL(
    'temp_orders_queue',
    'temp_orders_dlx',
    'temp_orders_dlq',
    60000
);
$manager->close();
```

Priority queue
```php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

class PriorityQueueManager
{
    private $connection;
    private $channel;

    public function __construct(array $config = [])
    {
        $this->connection = new AMQPStreamConnection(
            $config['host'] ?? 'localhost',
            $config['port'] ?? 5672,
            $config['user'] ?? 'guest',
            $config['password'] ?? 'guest',
            $config['vhost'] ?? '/'
        );
        $this->channel = $this->connection->channel();
    }

    public function declarePriorityQueue(string $name, int $maxPriority = 10): void
    {
        $this->channel->queue_declare(
            $name,
            false,
            true,
            false,
            false,
            false,
            new AMQPTable([
                'x-max-priority' => $maxPriority
            ])
        );
        echo "Priority queue '{$name}' declared with max priority: {$maxPriority}\n";
    }

    public function publishWithPriority(
        string $queue,
        string $body,
        int $priority
    ): void {
        $message = new AMQPMessage($body, [
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            'priority' => $priority
        ]);
        // Publish through the default exchange, which routes by queue name.
        $this->channel->basic_publish($message, '', $queue);
        echo "Message published to '{$queue}' with priority: {$priority}\n";
    }

    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
    }
}

$manager = new PriorityQueueManager();
$manager->declarePriorityQueue('priority_tasks', 10);
$manager->publishWithPriority('priority_tasks', json_encode(['task' => 'low']), 1);
$manager->publishWithPriority('priority_tasks', json_encode(['task' => 'high']), 10);
$manager->publishWithPriority('priority_tasks', json_encode(['task' => 'medium']), 5);
$manager->close();
```

Lazy queue
```php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;

class LazyQueueManager
{
    private $connection;
    private $channel;

    public function __construct(array $config = [])
    {
        $this->connection = new AMQPStreamConnection(
            $config['host'] ?? 'localhost',
            $config['port'] ?? 5672,
            $config['user'] ?? 'guest',
            $config['password'] ?? 'guest',
            $config['vhost'] ?? '/'
        );
        $this->channel = $this->connection->channel();
    }

    // Lazy mode: messages are written to disk as early as possible.
    public function declareLazyQueue(string $name): void
    {
        $this->channel->queue_declare(
            $name,
            false,
            true,
            false,
            false,
            false,
            new AMQPTable([
                'x-queue-mode' => 'lazy'
            ])
        );
        echo "Lazy queue '{$name}' declared\n";
    }

    // Default mode, stated explicitly for comparison.
    public function declareDefaultQueue(string $name): void
    {
        $this->channel->queue_declare(
            $name,
            false,
            true,
            false,
            false,
            false,
            new AMQPTable([
                'x-queue-mode' => 'default'
            ])
        );
        echo "Default queue '{$name}' declared\n";
    }

    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
    }
}

$manager = new LazyQueueManager();
$manager->declareLazyQueue('lazy_logs_queue');
$manager->declareDefaultQueue('default_queue');
$manager->close();
```

Real-world scenarios
1. Order queue configuration
```php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;

class OrderQueueSetup
{
    private $channel;

    public function __construct(AMQPStreamConnection $connection)
    {
        $this->channel = $connection->channel();
    }

    public function setup(): void
    {
        // Main exchange and dead letter exchange.
        $this->channel->exchange_declare('orders_exchange', 'direct', false, true, false);
        $this->channel->exchange_declare('orders_dlx', 'direct', false, true, false);

        // New orders: expire after 24 hours, support priorities, dead-letter on failure.
        $this->channel->queue_declare(
            'orders_new',
            false,
            true,
            false,
            false,
            false,
            new AMQPTable([
                'x-dead-letter-exchange' => 'orders_dlx',
                'x-dead-letter-routing-key' => 'orders_failed',
                'x-message-ttl' => 86400000,
                'x-max-priority' => 5
            ])
        );

        // Orders being processed: dead-letter on failure.
        $this->channel->queue_declare(
            'orders_processing',
            false,
            true,
            false,
            false,
            false,
            new AMQPTable([
                'x-dead-letter-exchange' => 'orders_dlx',
                'x-dead-letter-routing-key' => 'orders_failed'
            ])
        );

        // Completed orders: kept for 7 days, at most 100000 messages.
        $this->channel->queue_declare(
            'orders_completed',
            false,
            true,
            false,
            false,
            false,
            new AMQPTable([
                'x-message-ttl' => 604800000,
                'x-max-length' => 100000
            ])
        );

        // Queue that collects failed (dead-lettered) orders.
        $this->channel->queue_declare('orders_failed', false, true, false, false);

        $this->channel->queue_bind('orders_new', 'orders_exchange', 'order.new');
        $this->channel->queue_bind('orders_processing', 'orders_exchange', 'order.processing');
        $this->channel->queue_bind('orders_completed', 'orders_exchange', 'order.completed');
        $this->channel->queue_bind('orders_failed', 'orders_dlx', 'orders_failed');

        echo "Order queues setup completed\n";
    }
}
```

2. Delay queue implementation
```php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

class DelayQueueManager
{
    private $channel;

    public function __construct(AMQPStreamConnection $connection)
    {
        $this->channel = $connection->channel();
    }

    // Declares a consumer-less queue whose messages expire after $delayMs
    // and are then dead-lettered to $targetQueue via the default exchange.
    public function setupDelayQueue(string $targetQueue, int $delayMs): string
    {
        $delayQueue = "{$targetQueue}.delay.{$delayMs}";
        $this->channel->queue_declare(
            $delayQueue,
            false,
            true,
            false,
            false,
            false,
            new AMQPTable([
                'x-dead-letter-exchange' => '',
                'x-dead-letter-routing-key' => $targetQueue,
                'x-message-ttl' => $delayMs
            ])
        );
        echo "Delay queue '{$delayQueue}' created for target '{$targetQueue}'\n";

        return $delayQueue;
    }

    public function publishWithDelay(string $targetQueue, string $body, int $delayMs): void
    {
        // Supported delay buckets in milliseconds (1s to 1h), in ascending order.
        $supportedDelays = [1000, 5000, 10000, 30000, 60000, 300000, 600000, 3600000];

        // Round the requested delay up to the nearest bucket; cap at the largest one.
        $closestDelay = 3600000;
        foreach ($supportedDelays as $delay) {
            if ($delay >= $delayMs) {
                $closestDelay = $delay;
                break;
            }
        }

        $delayQueue = $this->setupDelayQueue($targetQueue, $closestDelay);
        $message = new AMQPMessage($body, [
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
        ]);
        $this->channel->basic_publish($message, '', $delayQueue);
        echo "Message published to delay queue, will arrive in {$closestDelay}ms\n";
    }
}
```

3. Work queue configuration
```php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;

class WorkQueueSetup
{
    private $channel;

    public function __construct(AMQPStreamConnection $connection)
    {
        $this->channel = $connection->channel();
    }

    public function setupWorkQueue(string $name, array $options = []): void
    {
        // Caller-supplied options override these defaults.
        $options = array_merge([
            'durable' => true,
            'max_length' => null,
            'message_ttl' => null,
            'dead_letter_exchange' => null,
            'priority' => false,
            'lazy' => false
        ], $options);

        // Translate the options into x-arguments.
        $arguments = [];
        if ($options['max_length']) {
            $arguments['x-max-length'] = $options['max_length'];
            $arguments['x-overflow'] = 'reject-publish';
        }
        if ($options['message_ttl']) {
            $arguments['x-message-ttl'] = $options['message_ttl'];
        }
        if ($options['dead_letter_exchange']) {
            $arguments['x-dead-letter-exchange'] = $options['dead_letter_exchange'];
        }
        if ($options['priority']) {
            // An integer sets the max priority; any other truthy value means 10.
            $arguments['x-max-priority'] = is_int($options['priority']) ? $options['priority'] : 10;
        }
        if ($options['lazy']) {
            $arguments['x-queue-mode'] = 'lazy';
        }

        $this->channel->queue_declare(
            $name,
            false,
            $options['durable'],
            false,
            false,
            false,
            empty($arguments) ? [] : new AMQPTable($arguments)
        );
        echo "Work queue '{$name}' created with options: " . json_encode($options) . "\n";
    }
}
```

Common problems and solutions
1. Queue backlog
Causes:
- Consumers process messages too slowly
- Too few consumers
- Consumers are failing
Solution:
```php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;

class QueueMonitor
{
    private $channel;

    public function __construct(AMQPStreamConnection $connection)
    {
        $this->channel = $connection->channel();
    }

    public function checkQueueHealth(string $queueName, int $warningThreshold = 10000, int $criticalThreshold = 50000): array
    {
        try {
            // Passive declare: reads the counters without changing the queue.
            [$queue, $messageCount, $consumerCount] = $this->channel->queue_declare($queueName, true);
            $status = 'healthy';
            $alerts = [];

            if ($messageCount > $criticalThreshold) {
                $status = 'critical';
                $alerts[] = "Queue '{$queueName}' has {$messageCount} messages (critical)";
            } elseif ($messageCount > $warningThreshold) {
                $status = 'warning';
                $alerts[] = "Queue '{$queueName}' has {$messageCount} messages (warning)";
            }

            // Messages with nobody to consume them are always critical.
            if ($consumerCount === 0 && $messageCount > 0) {
                $status = 'critical';
                $alerts[] = "Queue '{$queueName}' has no consumers but {$messageCount} messages";
            }

            return [
                'queue' => $queueName,
                'message_count' => $messageCount,
                'consumer_count' => $consumerCount,
                'status' => $status,
                'alerts' => $alerts
            ];
        } catch (Exception $e) {
            return [
                'queue' => $queueName,
                'status' => 'error',
                'error' => $e->getMessage()
            ];
        }
    }

    public function getBacklogMetrics(string $queueName): array
    {
        [$queue, $messageCount, $consumerCount] = $this->channel->queue_declare($queueName, true);

        // Assumed average processing time per message, in seconds.
        $avgProcessingTime = 0.5;
        $estimatedClearTime = $consumerCount > 0
            ? ($messageCount / $consumerCount) * $avgProcessingTime
            : -1;

        return [
            'queue' => $queueName,
            'message_count' => $messageCount,
            'consumer_count' => $consumerCount,
            'estimated_clear_seconds' => $estimatedClearTime,
            'estimated_clear_human' => $this->formatTime($estimatedClearTime)
        ];
    }

    private function formatTime(float $seconds): string
    {
        if ($seconds < 0) {
            return 'N/A (no consumers)';
        }

        // Work on whole seconds so the modulo operations stay on integers.
        $total = (int) round($seconds);
        $hours = intdiv($total, 3600);
        $minutes = intdiv($total % 3600, 60);
        $secs = $total % 60;

        return sprintf('%02d:%02d:%02d', $hours, $minutes, $secs);
    }
}
```

2. Queue does not exist
Causes:
- The queue was never declared
- The queue was deleted automatically
- The queue name is wrong
Solution:
```php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

class SafeQueueProducer
{
    private $channel;
    // Queues already declared through this producer, keyed by name.
    private $declaredQueues = [];

    public function __construct(AMQPStreamConnection $connection)
    {
        $this->channel = $connection->channel();
    }

    public function ensureQueueExists(string $queueName, array $options = []): void
    {
        // Declare each queue only once per producer instance.
        if (isset($this->declaredQueues[$queueName])) {
            return;
        }

        $options = array_merge([
            'durable' => true,
            'arguments' => []
        ], $options);

        try {
            // queue_declare is idempotent as long as the properties match.
            $this->channel->queue_declare(
                $queueName,
                false,
                $options['durable'],
                false,
                false,
                false,
                empty($options['arguments']) ? [] : new AMQPTable($options['arguments'])
            );
            $this->declaredQueues[$queueName] = true;
            echo "Queue '{$queueName}' ensured to exist\n";
        } catch (Exception $e) {
            throw new RuntimeException("Failed to ensure queue '{$queueName}': " . $e->getMessage());
        }
    }

    public function publish(string $queueName, string $body, array $options = []): void
    {
        $this->ensureQueueExists($queueName, $options['queue_options'] ?? []);
        $message = new AMQPMessage($body, [
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
        ]);
        $this->channel->basic_publish($message, '', $queueName);
    }
}
```

3. Memory overflow
Causes:
- Too many messages in the queue
- Messages are too large
- The queue is not lazy
Solution:
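```php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

class MemorySafeQueue
{
    private $channel;
    // Set by the nack handler when the broker refuses a published message.
    private $rejected = false;

    public function __construct(AMQPStreamConnection $connection)
    {
        $this->channel = $connection->channel();
        // With x-overflow=reject-publish the broker reports a full queue
        // through basic.nack, which requires publisher confirms.
        $this->channel->confirm_select();
        $this->channel->set_nack_handler(function (AMQPMessage $message): void {
            $this->rejected = true;
        });
    }

    public function declareMemorySafeQueue(
        string $name,
        int $maxLength = 100000,
        int $maxBytes = 1073741824
    ): void {
        $this->channel->queue_declare(
            $name,
            false,
            true,
            false,
            false,
            false,
            new AMQPTable([
                'x-max-length' => $maxLength,
                'x-max-length-bytes' => $maxBytes,
                'x-overflow' => 'reject-publish',
                'x-queue-mode' => 'lazy'
            ])
        );
        echo "Memory-safe queue '{$name}' declared\n";
    }

    public function publishWithCheck(string $queue, string $body): bool
    {
        $maxMessageSize = 16 * 1024 * 1024; // 16 MiB
        if (strlen($body) > $maxMessageSize) {
            throw new RuntimeException("Message size exceeds limit");
        }

        [$q, $messageCount, $consumerCount] = $this->channel->queue_declare($queue, true);
        if ($messageCount > 50000) {
            echo "Warning: Queue '{$queue}' has high message count: {$messageCount}\n";
        }

        $this->rejected = false;
        $message = new AMQPMessage($body, [
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
        ]);
        $this->channel->basic_publish($message, '', $queue);

        // Block until the broker has confirmed or refused the message.
        $this->channel->wait_for_pending_acks(5.0);
        if ($this->rejected) {
            echo "Queue is full, message rejected\n";
            return false;
        }

        return true;
    }
}
```

Best practices
1. Queue naming conventions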
```php
<?php

class QueueNamingConvention
{
    // Builds a dot-separated name: domain.entity.action[.suffix]
    public static function generateQueueName(
        string $domain,
        string $entity,
        string $action,
        ?string $suffix = null
    ): string {
        $parts = [$domain, $entity, $action];
        if ($suffix) {
            $parts[] = $suffix;
        }

        return implode('.', $parts);
    }

    public static function generateDLQName(string $originalQueue): string
    {
        return "{$originalQueue}.dlq";
    }

    public static function generateDelayQueueName(string $originalQueue, int $delayMs): string
    {
        return "{$originalQueue}.delay.{$delayMs}";
    }
}

$orderQueue = QueueNamingConvention::generateQueueName('order', 'payment', 'pending'); // order.payment.pending
$orderDLQ = QueueNamingConvention::generateDLQName($orderQueue);                       // order.payment.pending.dlq
```

2. Queue configuration templates
```php
<?php

class QueueTemplates
{
    // Durable queue without extra arguments.
    public static function standardQueue(string $name): array
    {
        return [
            'name' => $name,
            'durable' => true,
            'arguments' => []
        ];
    }

    // Durable queue whose failed messages go to a dead letter exchange.
    public static function criticalQueue(string $name): array
    {
        return [
            'name' => $name,
            'durable' => true,
            'arguments' => [
                'x-dead-letter-exchange' => "{$name}.dlx",
                'x-dead-letter-routing-key' => "{$name}.failed"
            ]
        ];
    }

    // Non-durable queue; messages expire after $ttlMs, the queue after twice that.
    public static function temporaryQueue(string $name, int $ttlMs): array
    {
        return [
            'name' => $name,
            'durable' => false,
            'arguments' => [
                'x-message-ttl' => $ttlMs,
                'x-expires' => $ttlMs * 2
            ]
        ];
    }

    // Durable queue that refuses new messages once it is full.
    public static function boundedQueue(string $name, int $maxLength): array
    {
        return [
            'name' => $name,
            'durable' => true,
            'arguments' => [
                'x-max-length' => $maxLength,
                'x-overflow' => 'reject-publish'
            ]
        ];
    }

    public static function priorityQueue(string $name, int $maxPriority = 10): array
    {
        return [
            'name' => $name,
            'durable' => true,
            'arguments' => [
                'x-max-priority' => $maxPriority
            ]
        ];
    }
}
```

3. Queue monitoring
```php
<?php

class QueueHealthChecker
{
    private $apiUrl;
    private $credentials;

    public function __construct(string $host, string $user, string $password)
    {
        // Management plugin HTTP API (default port 15672).
        $this->apiUrl = "http://{$host}:15672/api";
        $this->credentials = base64_encode("{$user}:{$password}");
    }

    public function getQueueMetrics(string $vhost, string $queue): array
    {
        // Path segments must be percent-encoded; the default vhost "/" becomes "%2F".
        $url = "{$this->apiUrl}/queues/" . rawurlencode($vhost) . "/" . rawurlencode($queue);

        $ch = curl_init();
        curl_setopt($ch, CURLOPT_URL, $url);
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
        curl_setopt($ch, CURLOPT_HTTPHEADER, [
            "Authorization: Basic {$this->credentials}"
        ]);
        $response = curl_exec($ch);
        curl_close($ch);

        // Treat transport errors and non-JSON responses as "no metrics".
        if ($response === false) {
            return [];
        }
        $decoded = json_decode($response, true);

        return is_array($decoded) ? $decoded : [];
    }

    public function checkHealth(string $vhost, array $queues): array
    {
        $results = [];
        foreach ($queues as $queue) {
            $metrics = $this->getQueueMetrics($vhost, $queue);
            $results[$queue] = [
                'messages' => $metrics['messages'] ?? 0,
                'messages_ready' => $metrics['messages_ready'] ?? 0,
                'messages_unacked' => $metrics['messages_unacked'] ?? 0,
                'consumers' => $metrics['consumers'] ?? 0,
                'memory' => $metrics['memory'] ?? 0,
                'status' => $this->determineStatus($metrics)
            ];
        }

        return $results;
    }

    private function determineStatus(array $metrics): string
    {
        $messages = $metrics['messages'] ?? 0;
        $consumers = $metrics['consumers'] ?? 0;

        if ($consumers === 0 && $messages > 0) {
            return 'no_consumers';
        }
        if ($messages > 100000) {
            return 'backlog_critical';
        }
        if ($messages > 10000) {
            return 'backlog_warning';
        }

        return 'healthy';
    }
}
```