Appearance
RabbitMQ 消费者(Consumer)
概述
消费者(Consumer)是 RabbitMQ 消息模型中的消息接收者和处理者。它从队列中获取消息并执行相应的业务逻辑。消费者与生产者完全解耦,不需要知道消息的来源,也不需要知道消息是否被其他消费者处理。
消费者的核心职责
mermaid
graph LR
A[订阅队列] --> B[接收消息]
B --> C[解析消息]
C --> D[业务处理]
D --> E{处理成功?}
E -->|是| F[发送 ACK]
E -->|否| G[发送 NACK/Reject]
F --> H[继续消费]
G --> H消费者的主要职责包括:
- 队列订阅:订阅一个或多个队列
- 消息接收:从队列获取消息
- 消息处理:执行业务逻辑
- 结果确认:发送 ACK 或 NACK
- 错误处理:处理异常情况
核心知识点
1. 消费模式
RabbitMQ 支持两种消费模式:
mermaid
graph TB
subgraph 推模式 - basic.consume
A1[Broker] -->|主动推送| B1[Consumer]
B1 -->|实时性好| C1[低延迟]
end
subgraph 拉模式 - basic.get
A2[Broker] -->|被动等待| B2[Consumer]
B2 -->|主动拉取| C2[轮询获取]
end推模式(Push Mode)
- 使用
basic.consume订阅队列 - Broker 主动推送消息给消费者
- 实时性好,延迟低
- 适合持续消费场景
拉模式(Pull Mode)
- 使用
basic.get主动获取消息 - 消费者主动从队列拉取消息
- 需要轮询,效率较低
- 适合批量处理或低频消费场景
2. 消息确认机制
消息确认是保证消息可靠传递的关键机制:
mermaid
sequenceDiagram
participant B as Broker
participant C as Consumer
B->>C: 投递消息
Note over C: 消息状态: Unacked
C->>C: 处理消息
alt 处理成功
C->>B: basic.ack
Note over B: 删除消息
else 处理失败-重试
C->>B: basic.nack(requeue=true)
Note over B: 消息重新入队
else 处理失败-丢弃
C->>B: basic.reject(requeue=false)
Note over B: 删除或转死信队列
end确认类型
| 确认类型 | 说明 | 消息处理 |
|---|---|---|
| ACK | 确认成功 | 消息从队列删除 |
| NACK | 否认(批量) | 可选择重新入队 |
| Reject | 拒绝(单条) | 可选择重新入队 |
自动确认 vs 手动确认
mermaid
graph TB
subgraph 自动确认 - auto_ack=true
A1[消息投递] --> B1[立即确认]
B1 --> C1[消息删除]
C1 --> D1[风险: 消息可能丢失]
end
subgraph 手动确认 - auto_ack=false
A2[消息投递] --> B2[等待处理]
B2 --> C2[手动确认]
C2 --> D2[安全: 确保消息处理]
end3. 消费者标签
消费者标签(Consumer Tag)用于标识消费者:
php
$consumerTag = $channel->basic_consume(
$queue,
$consumerTag,
$noLocal,
$noAck,
$exclusive,
$nowait,
$callback
);- 用于取消订阅:
$channel->basic_cancel($consumerTag) - 用于区分同一队列的多个消费者
4. 预取数量(Prefetch Count)
预取数量控制消费者同时处理的消息数量:
mermaid
graph LR
subgraph Prefetch = 1
A1[Queue] -->|1条| B1[Consumer]
B1 -->|ACK后| C1[再获取1条]
end
subgraph Prefetch = 10
A2[Queue] -->|10条| B2[Consumer]
B2 -->|并行处理| C2[公平分发]
end设置预取数量的作用:
- 控制消费者负载
- 实现公平分发
- 防止消息积压
5. QoS(服务质量)
QoS 参数控制消息投递行为:
php
$channel->basic_qos(
$prefetch_size,
$prefetch_count,
$global
);| 参数 | 说明 |
|---|---|
| prefetch_size | 预取字节数(0=不限制) |
| prefetch_count | 预取消息数量 |
| global | true=整个 Channel,false=每个消费者 |
代码示例
基础消费者实现
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
class BasicConsumer
{
private $connection;
private $channel;
public function __construct(array $config = [])
{
$this->connection = new AMQPStreamConnection(
$config['host'] ?? 'localhost',
$config['port'] ?? 5672,
$config['user'] ?? 'guest',
$config['password'] ?? 'guest',
$config['vhost'] ?? '/'
);
$this->channel = $this->connection->channel();
}
public function consume(string $queue, callable $callback): void
{
$this->channel->basic_consume(
$queue,
'',
false,
false,
false,
false,
function ($message) use ($callback) {
$callback($message->body);
$message->ack();
}
);
while ($this->channel->is_consuming()) {
$this->channel->wait();
}
}
public function close(): void
{
$this->channel->close();
$this->connection->close();
}
}
$consumer = new BasicConsumer();
$consumer->consume('orders_queue', function ($body) {
$data = json_decode($body, true);
echo "Processing: " . json_encode($data) . "\n";
});
$consumer->close();手动确认消费者
php
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class ManualAckConsumer
{
private $connection;
private $channel;
public function __construct(array $config = [])
{
$this->connection = new AMQPStreamConnection(
$config['host'] ?? 'localhost',
$config['port'] ?? 5672,
$config['user'] ?? 'guest',
$config['password'] ?? 'guest',
$config['vhost'] ?? '/'
);
$this->channel = $this->connection->channel();
}
public function consumeWithAck(
string $queue,
callable $processor,
int $prefetchCount = 1
): void {
$this->channel->basic_qos(null, $prefetchCount, null);
$this->channel->basic_consume(
$queue,
'',
false,
false,
false,
false,
function ($message) use ($processor) {
try {
$result = $processor($message->body);
if ($result) {
$message->ack();
echo " [x] Message acknowledged\n";
} else {
$message->nack(true);
echo " [!] Message nacked, requeuing\n";
}
} catch (Exception $e) {
echo " [!] Error: " . $e->getMessage() . "\n";
$message->nack(true);
}
}
);
echo " [*] Waiting for messages. Press CTRL+C to exit\n";
while ($this->channel->is_consuming()) {
$this->channel->wait();
}
}
public function close(): void
{
$this->channel->close();
$this->connection->close();
}
}
$consumer = new ManualAckConsumer();
$consumer->consumeWithAck('orders_queue', function ($body) {
$data = json_decode($body, true);
echo "Processing order: {$data['order_id']}\n";
$success = processOrder($data);
return $success;
}, 1);
function processOrder(array $data): bool
{
return true;
}带重试机制的消费者
php
<?php
class RetryConsumer
{
private $connection;
private $channel;
private $maxRetries = 3;
public function __construct(array $config = [])
{
$this->connection = new AMQPStreamConnection(
$config['host'] ?? 'localhost',
$config['port'] ?? 5672,
$config['user'] ?? 'guest',
$config['password'] ?? 'guest',
$config['vhost'] ?? '/'
);
$this->channel = $this->connection->channel();
}
public function consumeWithRetry(string $queue, callable $processor): void
{
$this->channel->basic_qos(null, 1, null);
$this->channel->basic_consume(
$queue,
'',
false,
false,
false,
false,
function ($message) use ($processor, $queue) {
$body = json_decode($message->body, true);
$retryCount = $body['retry_count'] ?? 0;
try {
$processor($body);
$message->ack();
echo " [x] Message processed successfully\n";
} catch (Exception $e) {
echo " [!] Processing failed (attempt " . ($retryCount + 1) . "): " . $e->getMessage() . "\n";
if ($retryCount < $this->maxRetries) {
$body['retry_count'] = $retryCount + 1;
$body['last_error'] = $e->getMessage();
$body['last_retry_at'] = date('Y-m-d H:i:s');
$newMessage = new AMQPMessage(
json_encode($body),
[
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'headers' => [
'retry_count' => $retryCount + 1
]
]
);
$delayMs = $this->calculateDelay($retryCount);
$this->publishWithDelay($queue, $newMessage, $delayMs);
$message->ack();
} else {
echo " [!] Max retries exceeded, sending to DLQ\n";
$message->nack(false);
}
}
}
);
while ($this->channel->is_consuming()) {
$this->channel->wait();
}
}
private function calculateDelay(int $retryCount): int
{
$delays = [1000, 5000, 15000, 60000, 300000];
return $delays[min($retryCount, count($delays) - 1)];
}
private function publishWithDelay(string $queue, AMQPMessage $message, int $delayMs): void
{
$delayQueue = "{$queue}.delay";
$this->channel->queue_declare(
$delayQueue,
false,
true,
false,
false,
false,
new \PhpAmqpLib\Wire\AMQPTable([
'x-dead-letter-exchange' => '',
'x-dead-letter-routing-key' => $queue,
'x-message-ttl' => $delayMs
])
);
$this->channel->basic_publish($message, '', $delayQueue);
}
public function close(): void
{
$this->channel->close();
$this->connection->close();
}
}拉模式消费者
php
<?php
class PullConsumer
{
private $connection;
private $channel;
public function __construct(array $config = [])
{
$this->connection = new AMQPStreamConnection(
$config['host'] ?? 'localhost',
$config['port'] ?? 5672,
$config['user'] ?? 'guest',
$config['password'] ?? 'guest',
$config['vhost'] ?? '/'
);
$this->channel = $this->connection->channel();
}
public function fetchOne(string $queue, callable $processor): bool
{
$message = $this->channel->basic_get($queue);
if (!$message) {
return false;
}
try {
$processor($message->body);
$this->channel->basic_ack($message->getDeliveryTag());
return true;
} catch (Exception $e) {
$this->channel->basic_nack($message->getDeliveryTag(), false, true);
return false;
}
}
public function fetchBatch(string $queue, int $batchSize, callable $processor): int
{
$processed = 0;
for ($i = 0; $i < $batchSize; $i++) {
$message = $this->channel->basic_get($queue);
if (!$message) {
break;
}
try {
$processor($message->body);
$this->channel->basic_ack($message->getDeliveryTag());
$processed++;
} catch (Exception $e) {
$this->channel->basic_nack($message->getDeliveryTag(), false, true);
}
}
return $processed;
}
public function poll(
string $queue,
callable $processor,
int $intervalMs = 1000,
int $maxMessages = 0
): void {
$processed = 0;
while (true) {
$message = $this->channel->basic_get($queue);
if ($message) {
try {
$processor($message->body);
$this->channel->basic_ack($message->getDeliveryTag());
$processed++;
if ($maxMessages > 0 && $processed >= $maxMessages) {
break;
}
} catch (Exception $e) {
$this->channel->basic_nack($message->getDeliveryTag(), false, true);
}
} else {
usleep($intervalMs * 1000);
}
}
}
public function close(): void
{
$this->channel->close();
$this->connection->close();
}
}
$consumer = new PullConsumer();
$consumer->poll('orders_queue', function ($body) {
echo "Processing: $body\n";
}, 1000, 100);多队列消费者
php
<?php
class MultiQueueConsumer
{
private $connection;
private $channel;
private $handlers = [];
public function __construct(array $config = [])
{
$this->connection = new AMQPStreamConnection(
$config['host'] ?? 'localhost',
$config['port'] ?? 5672,
$config['user'] ?? 'guest',
$config['password'] ?? 'guest',
$config['vhost'] ?? '/'
);
$this->channel = $this->connection->channel();
}
public function subscribe(string $queue, callable $handler, string $consumerTag = ''): void
{
$this->handlers[$queue] = $handler;
$this->channel->basic_qos(null, 1, null);
$tag = $this->channel->basic_consume(
$queue,
$consumerTag ?: "consumer_{$queue}",
false,
false,
false,
false,
function ($message) use ($queue) {
$this->handleMessage($queue, $message);
}
);
echo "Subscribed to queue: {$queue} with tag: {$tag}\n";
}
private function handleMessage(string $queue, $message): void
{
$handler = $this->handlers[$queue] ?? null;
if (!$handler) {
echo "No handler for queue: {$queue}\n";
$message->ack();
return;
}
try {
$handler($message);
$message->ack();
} catch (Exception $e) {
echo "Error processing message from {$queue}: " . $e->getMessage() . "\n";
$message->nack(true);
}
}
public function run(): void
{
echo "Consumer started. Press CTRL+C to stop.\n";
while ($this->channel->is_consuming()) {
$this->channel->wait();
}
}
public function stop(): void
{
foreach ($this->handlers as $queue => $handler) {
$this->channel->basic_cancel("consumer_{$queue}");
}
}
public function close(): void
{
$this->channel->close();
$this->connection->close();
}
}
$consumer = new MultiQueueConsumer();
$consumer->subscribe('orders_queue', function ($message) {
$data = json_decode($message->body, true);
echo "Processing order: " . json_encode($data) . "\n";
});
$consumer->subscribe('notifications_queue', function ($message) {
$data = json_decode($message->body, true);
echo "Sending notification: " . json_encode($data) . "\n";
});
$consumer->subscribe('logs_queue', function ($message) {
$data = json_decode($message->body, true);
echo "Logging: " . json_encode($data) . "\n";
});
$consumer->run();优雅关闭消费者
php
<?php
class GracefulConsumer
{
private $connection;
private $channel;
private $running = true;
private $processing = false;
public function __construct(array $config = [])
{
$this->connection = new AMQPStreamConnection(
$config['host'] ?? 'localhost',
$config['port'] ?? 5672,
$config['user'] ?? 'guest',
$config['password'] ?? 'guest',
$config['vhost'] ?? '/'
);
$this->channel = $this->connection->channel();
$this->setupSignalHandlers();
}
private function setupSignalHandlers(): void
{
if (function_exists('pcntl_signal')) {
pcntl_signal(SIGTERM, [$this, 'handleSignal']);
pcntl_signal(SIGINT, [$this, 'handleSignal']);
pcntl_signal(SIGHUP, [$this, 'handleSignal']);
}
}
public function handleSignal(int $signal): void
{
echo "\nReceived signal {$signal}, shutting down gracefully...\n";
$this->running = false;
}
public function consume(string $queue, callable $processor): void
{
$this->channel->basic_qos(null, 1, null);
$this->channel->basic_consume(
$queue,
'',
false,
false,
false,
false,
function ($message) use ($processor) {
$this->processing = true;
try {
$processor($message->body);
$message->ack();
} catch (Exception $e) {
echo "Error: " . $e->getMessage() . "\n";
$message->nack(true);
} finally {
$this->processing = false;
}
}
);
echo "Consumer started. Press CTRL+C to stop.\n";
while ($this->running || $this->processing) {
if (function_exists('pcntl_signal_dispatch')) {
pcntl_signal_dispatch();
}
if ($this->running) {
try {
$this->channel->wait(null, true, 1);
} catch (Exception $e) {
continue;
}
}
}
echo "Consumer stopped gracefully.\n";
}
public function close(): void
{
$this->channel->close();
$this->connection->close();
}
}
$consumer = new GracefulConsumer();
$consumer->consume('orders_queue', function ($body) {
$data = json_decode($body, true);
echo "Processing: " . json_encode($data) . "\n";
sleep(2);
echo "Done processing\n";
});
$consumer->close();实际应用场景
1. 订单处理消费者
php
<?php
class OrderConsumer
{
private $consumer;
private $orderService;
private $inventoryService;
private $notificationService;
public function __construct(
GracefulConsumer $consumer,
$orderService,
$inventoryService,
$notificationService
) {
$this->consumer = $consumer;
$this->orderService = $orderService;
$this->inventoryService = $inventoryService;
$this->notificationService = $notificationService;
}
public function processOrders(): void
{
$this->consumer->consume('orders_queue', function ($body) {
$data = json_decode($body, true);
$event = $data['event'] ?? 'unknown';
switch ($event) {
case 'order.created':
$this->handleOrderCreated($data['data']);
break;
case 'order.paid':
$this->handleOrderPaid($data['data']);
break;
case 'order.cancelled':
$this->handleOrderCancelled($data['data']);
break;
default:
echo "Unknown event: {$event}\n";
}
});
}
private function handleOrderCreated(array $order): void
{
echo "Processing new order: {$order['order_id']}\n";
$this->inventoryService->reserveStock(
$order['items'],
$order['order_id']
);
$this->notificationService->sendOrderConfirmation(
$order['user_id'],
$order['order_id']
);
echo "Order {$order['order_id']} processed successfully\n";
}
private function handleOrderPaid(array $data): void
{
echo "Processing paid order: {$data['order_id']}\n";
$this->orderService->updateStatus(
$data['order_id'],
'paid'
);
$this->inventoryService->confirmReservation(
$data['order_id']
);
$this->notificationService->sendPaymentConfirmation(
$data['order_id']
);
}
private function handleOrderCancelled(array $data): void
{
echo "Processing cancelled order: {$data['order_id']}\n";
$this->orderService->updateStatus(
$data['order_id'],
'cancelled'
);
$this->inventoryService->releaseReservation(
$data['order_id']
);
$this->notificationService->sendCancellationNotice(
$data['order_id'],
$data['reason']
);
}
}2. 邮件发送消费者
php
<?php
class EmailConsumer
{
private $consumer;
private $mailer;
public function __construct(GracefulConsumer $consumer, $mailer)
{
$this->consumer = $consumer;
$this->mailer = $mailer;
}
public function consumeEmails(): void
{
$this->consumer->consume('emails_queue', function ($body) {
$data = json_decode($body, true);
$this->sendEmail($data);
});
}
private function sendEmail(array $data): void
{
$to = $data['to'];
$subject = $data['subject'];
$body = $data['body'];
$options = $data['options'] ?? [];
echo "Sending email to: {$to}\n";
$result = $this->mailer->send($to, $subject, $body, $options);
if ($result) {
echo "Email sent successfully to: {$to}\n";
} else {
throw new Exception("Failed to send email to: {$to}");
}
}
}3. 日志处理消费者
php
<?php
class LogConsumer
{
private $consumer;
private $logStorage;
public function __construct(GracefulConsumer $consumer, $logStorage)
{
$this->consumer = $consumer;
$this->logStorage = $logStorage;
}
public function consumeLogs(): void
{
$this->consumer->consume('logs_queue', function ($body) {
$data = json_decode($body, true);
$this->processLog($data);
});
}
private function processLog(array $data): void
{
$level = $data['level'];
$message = $data['message'];
$context = $data['context'] ?? [];
$timestamp = $data['timestamp'] ?? date('Y-m-d H:i:s');
$logEntry = [
'level' => $level,
'message' => $message,
'context' => $context,
'timestamp' => $timestamp,
'service' => $data['service'] ?? 'unknown',
'hostname' => $data['hostname'] ?? gethostname()
];
$this->logStorage->store($logEntry);
if (in_array($level, ['error', 'emergency', 'critical'])) {
$this->alertOnCriticalLog($logEntry);
}
}
private function alertOnCriticalLog(array $logEntry): void
{
// 发送告警通知
echo "ALERT: Critical log detected - {$logEntry['message']}\n";
}
}常见问题与解决方案
1. 消息重复消费
问题原因:
- ACK 丢失
- 消费者崩溃
- 网络问题
解决方案:
php
<?php
class IdempotentConsumer
{
private $redis;
private $consumer;
public function consumeIdempotent(string $queue, callable $processor): void
{
$this->consumer->consume($queue, function ($body) use ($processor) {
$data = json_decode($body, true);
$messageId = $data['message_id'] ?? md5($body);
$lockKey = "message_lock:{$messageId}";
$processedKey = "message_processed:{$messageId}";
if ($this->redis->exists($processedKey)) {
echo "Message {$messageId} already processed, skipping\n";
return;
}
$locked = $this->redis->set($lockKey, '1', ['NX', 'EX' => 30]);
if (!$locked) {
echo "Message {$messageId} is being processed by another consumer\n";
return;
}
try {
$processor($data);
$this->redis->setex($processedKey, 86400, '1');
echo "Message {$messageId} processed successfully\n";
} finally {
$this->redis->del($lockKey);
}
});
}
}2. 消费者阻塞
问题原因:
- 单条消息处理时间过长
- 外部服务调用超时
- 死锁或无限循环
解决方案:
php
<?php
class TimeoutConsumer
{
private $consumer;
private $defaultTimeout = 30;
public function consumeWithTimeout(
string $queue,
callable $processor,
int $timeout = null
): void {
$timeout = $timeout ?? $this->defaultTimeout;
$this->consumer->consume($queue, function ($body) use ($processor, $timeout) {
$startTime = time();
$result = $this->executeWithTimeout($processor, $body, $timeout);
if ($result['timeout']) {
throw new Exception("Processing timeout after {$timeout} seconds");
}
if (!$result['success']) {
throw new Exception($result['error']);
}
});
}
private function executeWithTimeout(callable $processor, $body, int $timeout): array
{
$result = ['success' => false, 'timeout' => false, 'error' => null];
if (function_exists('pcntl_fork')) {
$pid = pcntl_fork();
if ($pid == -1) {
$result['error'] = 'Failed to fork process';
return $result;
}
if ($pid == 0) {
try {
$processor($body);
exit(0);
} catch (Exception $e) {
exit(1);
}
}
$start = time();
while (true) {
$status = null;
$res = pcntl_waitpid($pid, $status, WNOHANG);
if ($res == $pid) {
$result['success'] = pcntl_wifexited($status) && pcntl_wexitstatus($status) == 0;
break;
}
if (time() - $start > $timeout) {
posix_kill($pid, SIGKILL);
pcntl_waitpid($pid, $status);
$result['timeout'] = true;
break;
}
usleep(100000);
}
} else {
try {
$processor($body);
$result['success'] = true;
} catch (Exception $e) {
$result['error'] = $e->getMessage();
}
}
return $result;
}
}3. 消费者性能问题
问题原因:
- 预取数量设置不当
- 处理逻辑效率低
- 资源竞争
解决方案:
php
<?php
class OptimizedConsumer
{
private $connection;
private $channel;
private $batchSize = 10;
private $batch = [];
public function consumeBatch(string $queue, callable $processor): void
{
$this->channel->basic_qos(null, $this->batchSize, null);
$this->channel->basic_consume(
$queue,
'',
false,
false,
false,
false,
function ($message) use ($processor, $queue) {
$this->batch[] = $message;
if (count($this->batch) >= $this->batchSize) {
$this->processBatch($processor);
}
}
);
while ($this->channel->is_consuming()) {
$this->channel->wait();
}
}
private function processBatch(callable $processor): void
{
if (empty($this->batch)) {
return;
}
$messages = $this->batch;
$this->batch = [];
$results = $processor(array_map(function ($msg) {
return $msg->body;
}, $messages));
foreach ($messages as $index => $message) {
if ($results[$index] ?? false) {
$message->ack();
} else {
$message->nack(true);
}
}
}
}最佳实践建议
1. 消费者配置优化
php
<?php
class ConsumerConfig
{
public static function createOptimized(array $baseConfig = []): array
{
return array_merge([
'host' => 'localhost',
'port' => 5672,
'user' => 'guest',
'password' => 'guest',
'vhost' => '/',
'insist' => false,
'login_method' => 'AMQPLAIN',
'login_response' => null,
'locale' => 'en_US',
'connection_timeout' => 3.0,
'read_write_timeout' => 130.0,
'context' => null,
'keepalive' => true,
'heartbeat' => 60,
], $baseConfig);
}
public static function createConnection(array $config): AMQPStreamConnection
{
return new AMQPStreamConnection(
$config['host'],
$config['port'],
$config['user'],
$config['password'],
$config['vhost'],
$config['insist'],
$config['login_method'],
$config['login_response'],
$config['locale'],
$config['connection_timeout'],
$config['read_write_timeout'],
$config['context'],
$config['keepalive'],
$config['heartbeat']
);
}
}2. 错误处理与监控
php
<?php
class MonitoredConsumer
{
private $consumer;
private $metrics;
private $logger;
public function consume(string $queue, callable $processor): void
{
$this->consumer->consume($queue, function ($body) use ($processor, $queue) {
$startTime = microtime(true);
$messageId = md5($body);
$this->metrics->increment("consumer.{$queue}.received");
try {
$processor($body);
$duration = (microtime(true) - $startTime) * 1000;
$this->metrics->increment("consumer.{$queue}.success");
$this->metrics->timing("consumer.{$queue}.duration", $duration);
$this->logger->info('Message processed', [
'queue' => $queue,
'message_id' => $messageId,
'duration_ms' => round($duration, 2)
]);
} catch (Exception $e) {
$this->metrics->increment("consumer.{$queue}.error");
$this->logger->error('Message processing failed', [
'queue' => $queue,
'message_id' => $messageId,
'error' => $e->getMessage(),
'trace' => $e->getTraceAsString()
]);
throw $e;
}
});
}
}3. 死信队列处理
php
<?php
class DeadLetterConsumer
{
private $consumer;
private $logger;
public function setupDeadLetterQueue(string $originalQueue): void
{
$channel = $this->consumer->getChannel();
$dlqName = "{$originalQueue}.dlq";
$dlxName = "{$originalQueue}.dlx";
$channel->exchange_declare($dlxName, 'direct', false, true, false);
$channel->queue_declare($dlqName, false, true, false, false);
$channel->queue_bind($dlqName, $dlxName, $originalQueue);
}
public function consumeDeadLetters(string $queue): void
{
$dlqName = "{$queue}.dlq";
$this->consumer->consume($dlqName, function ($body) use ($queue) {
$data = json_decode($body, true);
$this->logger->error('Dead letter received', [
'original_queue' => $queue,
'message' => $data,
'x-death' => $data['x-death'] ?? []
]);
$this->handleDeadLetter($data);
});
}
private function handleDeadLetter(array $data): void
{
// 1. 记录到数据库
// 2. 发送告警
// 3. 人工处理或自动修复
}
}