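RabbitMQ Consumer

Overview

A consumer is the party that receives and processes messages in the RabbitMQ messaging model. It takes messages from a queue and runs the corresponding business logic. Consumers are fully decoupled from producers: a consumer does not need to know where a message came from, nor whether other consumers have processed it.

Core responsibilities of a consumer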

mermaid
graph LR
    A[Subscribe to queue] --> B[Receive message]
    B --> C[Parse message]
    C --> D[Run business logic]
    D --> E{Processed successfully?}
    E -->|Yes| F[Send ACK]
    E -->|No| G[Send NACK/Reject]
    F --> H[Continue consuming]
    G --> H

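A consumer's main responsibilities are:

  • Queue subscription: subscribe to one or more queues
  • Message reception: obtain messages from the queue
  • Message processing: run the business logic
  • Result confirmation: send an ACK or a NACK
  • Error handling: deal with exceptional conditions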
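
The responsibilities above come down to one decision per message: acknowledge, requeue, or discard. The following is a minimal sketch in plain PHP with no broker involved. The function name `decideOutcome` and the rule that malformed JSON is rejected while other failures are nacked are assumptions made for illustration, not part of any RabbitMQ client.

```php
<?php

// Hypothetical helper: run a handler and map the result to the confirmation
// a consumer would send back to the broker.
function decideOutcome(callable $handler, string $body): string
{
    try {
        // Parse the message; JSON_THROW_ON_ERROR turns bad input into an exception
        $data = json_decode($body, true, 512, JSON_THROW_ON_ERROR);
        // Run the business logic
        $handler($data);
        return 'ack';
    } catch (JsonException $e) {
        // Malformed payload: retrying will not help, so discard it
        return 'reject';
    } catch (Throwable $e) {
        // Any other failure is treated as transient: ask for a requeue
        return 'nack';
    }
}

$ok = function ($data) {};
$failing = function ($data) {
    throw new RuntimeException('database unavailable');
};

echo decideOutcome($ok, '{"order_id": 1}'), "\n";      // ack
echo decideOutcome($ok, 'not json'), "\n";             // reject
echo decideOutcome($failing, '{"order_id": 1}'), "\n"; // nack
```

Separating the decision from the broker call keeps the logic easy to test; the real callback would then call `ack()`, `nack(true)`, or `reject(false)` on the message accordingly.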

Core concepts

1. Consumption modes

RabbitMQ supports two consumption modes:

mermaid
graph TB
    subgraph Push mode - basic.consume
        A1[Broker] -->|pushes actively| B1[Consumer]
        B1 -->|near real time| C1[Low latency]
    end
    
    subgraph Pull mode - basic.get
        A2[Broker] -->|waits passively| B2[Consumer]
        B2 -->|pulls actively| C2[Fetch by polling]
    end

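Push mode

  • Subscribes to a queue with basic.consume
  • The broker pushes messages to the consumer as they arrive
  • Near real time, with low latency
  • Suited to continuous consumption

Pull mode

  • Fetches messages on demand with basic.get
  • The consumer pulls messages from the queue itself
  • Requires polling, so it is less efficient
  • Suited to batch processing or infrequent consumption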

2. Message acknowledgement

Acknowledgements are the key mechanism for reliable message delivery:

mermaid
sequenceDiagram
    participant B as Broker
    participant C as Consumer
    
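    B->>C: Deliver message
    Note over C: Message state: Unacked
    C->>C: Process message
    alt Processing succeeded
        C->>B: basic.ack
        Note over B: Remove message
    else Processing failed - retry
        C->>B: basic.nack(requeue=true)
        Note over B: Message requeued
    else Processing failed - discard
        C->>B: basic.reject(requeue=false)
        Note over B: Remove or route to dead-letter queue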
    end

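Acknowledgement types

| Type | Meaning | Effect on the message |
|------|---------|-----------------------|
| ACK | Positive acknowledgement | Removed from the queue |
| NACK | Negative acknowledgement (one or several messages) | Optionally requeued |
| Reject | Rejection (single message) | Optionally requeued |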
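
What separates basic.nack from basic.reject is the `multiple` flag: when it is set, the nack covers every unacknowledged delivery tag on the channel up to and including the one given. The sketch below models that rule in plain PHP; `affectedTags` is a hypothetical name used only for illustration.

```php
<?php

// Hypothetical model: which outstanding delivery tags does a confirmation cover?
function affectedTags(array $unacked, int $deliveryTag, bool $multiple): array
{
    return array_values(array_filter(
        $unacked,
        function (int $tag) use ($deliveryTag, $multiple): bool {
            // multiple=true covers all tags up to and including $deliveryTag
            return $multiple ? $tag <= $deliveryTag : $tag === $deliveryTag;
        }
    ));
}

$unacked = [1, 2, 3, 4, 5];

// Like basic.reject, or basic.nack with multiple=false: tag 3 only
echo implode(', ', affectedTags($unacked, 3, false)), "\n"; // 3

// Like basic.nack with multiple=true: tags 1, 2 and 3
echo implode(', ', affectedTags($unacked, 3, true)), "\n";  // 1, 2, 3
```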

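Automatic vs. manual acknowledgement

mermaid
graph TB
    subgraph Automatic ack - auto_ack=true
        A1[Message delivered] --> B1[Acknowledged immediately]
        B1 --> C1[Message removed]
        C1 --> D1[Risk: message may be lost]
    end
    
    subgraph Manual ack - auto_ack=false
        A2[Message delivered] --> B2[Awaiting processing]
        B2 --> C2[Explicit acknowledgement]
        C2 --> D2[Safe: message is processed before removal]
    end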
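
To make the risk concrete, here is a small in-memory model (not a RabbitMQ client) of what the broker still holds after a consumer crashes mid-message. The function name and the array-based "queue" are assumptions made for illustration.

```php
<?php

// Hypothetical model: returns the messages the "broker" still holds after the
// consumer crashes while handling the message at index $crashAt.
function remainingAfterCrash(array $queue, bool $autoAck, int $crashAt): array
{
    foreach ($queue as $i => $msg) {
        if ($autoAck) {
            unset($queue[$i]);  // auto ack: forgotten as soon as it is delivered
        }
        if ($i === $crashAt) {
            break;              // the consumer dies while processing this message
        }
        if (!$autoAck) {
            unset($queue[$i]);  // manual ack: removed only after processing finished
        }
    }
    return array_values($queue);
}

// Auto ack: m2 was in flight when the consumer died and is lost
echo implode(', ', remainingAfterCrash(['m1', 'm2', 'm3'], true, 1)), "\n";  // m3

// Manual ack: m2 was never acknowledged, so it can be redelivered
echo implode(', ', remainingAfterCrash(['m1', 'm2', 'm3'], false, 1)), "\n"; // m2, m3
```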

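3. Consumer tags

A consumer tag identifies a consumer on a channel. Passing an empty string lets the broker generate a tag, which basic_consume returns: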

php
$consumerTag = $channel->basic_consume(
    $queue,
    $consumerTag,
    $noLocal,
    $noAck,
    $exclusive,
    $nowait,
    $callback
);

  • Cancels a subscription: $channel->basic_cancel($consumerTag)
  • Distinguishes multiple consumers on the same queue

4. Prefetch count

The prefetch count limits how many unacknowledged messages the broker delivers to a consumer at once:

mermaid
graph LR
    subgraph Prefetch = 1
        A1[Queue] -->|1 message| B1[Consumer]
        B1 -->|after ACK| C1[Next message delivered]
    end
    
    subgraph Prefetch = 10
        A2[Queue] -->|up to 10 messages| B2[Consumer]
        B2 -->|buffered locally| C2[Higher throughput]
    end

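What setting a prefetch count does

  • Limits the load on each consumer
  • Enables fair dispatch: a busy consumer receives nothing new until it acknowledges earlier messages
  • Keeps unacknowledged messages from piling up in a single consumer's buffer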
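
The fair-dispatch effect can be seen in a small simulation. This is a simplified discrete-time model rather than RabbitMQ's actual scheduler: the broker hands out messages round-robin to any consumer with a free prefetch slot, and each consumer needs a fixed number of ticks per message. The function name `dispatch` and the tick model are assumptions.

```php
<?php

// Hypothetical model. $costs[i] is the number of ticks consumer i needs per
// message. Returns how many messages were delivered to each consumer.
function dispatch(int $total, array $costs, int $prefetch): array
{
    $n = count($costs);
    $delivered = array_fill(0, $n, 0);
    $unacked = array_fill(0, $n, 0);  // messages currently held by each consumer
    $busyFor = array_fill(0, $n, 0);  // ticks left on the message being processed
    $next = 0;                        // round-robin pointer
    $remaining = $total;

    while ($remaining > 0) {
        // Deliver round-robin until every consumer's prefetch window is full
        $skipped = 0;
        while ($remaining > 0 && $skipped < $n) {
            if ($unacked[$next] < $prefetch) {
                $unacked[$next]++;
                $delivered[$next]++;
                $remaining--;
                $skipped = 0;
            } else {
                $skipped++;
            }
            $next = ($next + 1) % $n;
        }

        // Advance one tick: each consumer works on one buffered message
        foreach ($costs as $i => $cost) {
            if ($unacked[$i] === 0) {
                continue;
            }
            if ($busyFor[$i] === 0) {
                $busyFor[$i] = $cost;   // start the next buffered message
            }
            if (--$busyFor[$i] === 0) {
                $unacked[$i]--;         // finished: the ack frees a prefetch slot
            }
        }
    }

    return $delivered;
}

// Consumer 0 is fast (1 tick per message), consumer 1 is slow (5 ticks)
$even = dispatch(12, [1, 5], 12); // window large enough to buffer everything
$fair = dispatch(12, [1, 5], 1);  // prefetch = 1

echo implode(', ', $even), "\n";  // 6, 6: the slow consumer is handed half the work
echo implode(', ', $fair), "\n";  // the fast consumer receives the larger share
```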

5. QoS (quality of service)

The QoS parameters control how the broker delivers messages:

php
$channel->basic_qos(
    $prefetch_size,
    $prefetch_count,
    $global
);
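
| Parameter | Description |
|-----------|-------------|
| prefetch_size | Prefetch window in bytes (0 = no limit); RabbitMQ does not implement non-zero values |
| prefetch_count | Maximum number of unacknowledged messages |
| global | true = limit shared by the whole channel, false = limit applied to each consumer separately |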

Code examples

Basic consumer

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

class BasicConsumer
{
    private $connection;
    private $channel;
    
    public function __construct(array $config = [])
    {
        $this->connection = new AMQPStreamConnection(
            $config['host'] ?? 'localhost',
            $config['port'] ?? 5672,
            $config['user'] ?? 'guest',
            $config['password'] ?? 'guest',
            $config['vhost'] ?? '/'
        );
        
        $this->channel = $this->connection->channel();
    }
    
    public function consume(string $queue, callable $callback): void
    {
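        $this->channel->basic_consume(
            $queue,
            '',     // consumer tag: empty lets the broker generate one
            false,  // no_local
            false,  // no_ack: false means messages must be acknowledged explicitly
            false,  // exclusive
            false,  // nowait
            function ($message) use ($callback) {
                $callback($message->body);
                // Ack only after the callback returns without throwing
                $message->ack();
            }
        );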
        
        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }
    }
    
    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
    }
}

$consumer = new BasicConsumer();

$consumer->consume('orders_queue', function ($body) {
    $data = json_decode($body, true);
    echo "Processing: " . json_encode($data) . "\n";
});

$consumer->close();

Manual acknowledgement consumer

php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class ManualAckConsumer
{
    private $connection;
    private $channel;
    
    public function __construct(array $config = [])
    {
        $this->connection = new AMQPStreamConnection(
            $config['host'] ?? 'localhost',
            $config['port'] ?? 5672,
            $config['user'] ?? 'guest',
            $config['password'] ?? 'guest',
            $config['vhost'] ?? '/'
        );
        
        $this->channel = $this->connection->channel();
    }
    
    public function consumeWithAck(
        string $queue,
        callable $processor,
        int $prefetchCount = 1
    ): void {
        $this->channel->basic_qos(null, $prefetchCount, null);
        
        $this->channel->basic_consume(
            $queue,
            '',
            false,
            false,
            false,
            false,
            function ($message) use ($processor) {
                try {
                    $result = $processor($message->body);
                    
                    if ($result) {
                        $message->ack();
                        echo " [x] Message acknowledged\n";
                    } else {
                        $message->nack(true);
                        echo " [!] Message nacked, requeuing\n";
                    }
                } catch (Exception $e) {
                    echo " [!] Error: " . $e->getMessage() . "\n";
                    $message->nack(true);
                }
            }
        );
        
        echo " [*] Waiting for messages. Press CTRL+C to exit\n";
        
        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }
    }
    
    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
    }
}

$consumer = new ManualAckConsumer();

$consumer->consumeWithAck('orders_queue', function ($body) {
    $data = json_decode($body, true);
    
    echo "Processing order: {$data['order_id']}\n";
    
    $success = processOrder($data);
    
    return $success;
}, 1);

function processOrder(array $data): bool
{
    return true;
}

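Consumer with a retry mechanism

php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
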
class RetryConsumer
{
    private $connection;
    private $channel;
    private $maxRetries = 3;
    
    public function __construct(array $config = [])
    {
        $this->connection = new AMQPStreamConnection(
            $config['host'] ?? 'localhost',
            $config['port'] ?? 5672,
            $config['user'] ?? 'guest',
            $config['password'] ?? 'guest',
            $config['vhost'] ?? '/'
        );
        
        $this->channel = $this->connection->channel();
    }
    
    public function consumeWithRetry(string $queue, callable $processor): void
    {
        $this->channel->basic_qos(null, 1, null);
        
        $this->channel->basic_consume(
            $queue,
            '',
            false,
            false,
            false,
            false,
            function ($message) use ($processor, $queue) {
                $body = json_decode($message->body, true);
                $retryCount = $body['retry_count'] ?? 0;
                
                try {
                    $processor($body);
                    $message->ack();
                    echo " [x] Message processed successfully\n";
                } catch (Exception $e) {
                    echo " [!] Processing failed (attempt " . ($retryCount + 1) . "): " . $e->getMessage() . "\n";
                    
                    if ($retryCount < $this->maxRetries) {
                        $body['retry_count'] = $retryCount + 1;
                        $body['last_error'] = $e->getMessage();
                        $body['last_retry_at'] = date('Y-m-d H:i:s');
                        
                        $newMessage = new AMQPMessage(
                            json_encode($body),
                            [
                                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                                'headers' => [
                                    'retry_count' => $retryCount + 1
                                ]
                            ]
                        );
                        
                        $delayMs = $this->calculateDelay($retryCount);
                        $this->publishWithDelay($queue, $newMessage, $delayMs);
                        
                        $message->ack();
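                    } else {
                        echo " [!] Max retries exceeded, sending to DLQ\n";
                        // Reject without requeue. The message is dead-lettered only if the
                        // queue has a dead-letter exchange configured; otherwise it is dropped.
                        $message->nack(false);
                    }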
                }
            }
        );
        
        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }
    }
    
    private function calculateDelay(int $retryCount): int
    {
        $delays = [1000, 5000, 15000, 60000, 300000];
        return $delays[min($retryCount, count($delays) - 1)];
    }
    
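    private function publishWithDelay(string $queue, AMQPMessage $message, int $delayMs): void
    {
        // Use one delay queue per distinct delay. A queue's x-message-ttl is fixed
        // when it is declared, and re-declaring an existing queue with a different
        // TTL fails with PRECONDITION_FAILED and closes the channel.
        $delayQueue = "{$queue}.delay.{$delayMs}";
        
        $this->channel->queue_declare(
            $delayQueue,
            false,  // passive
            true,   // durable
            false,  // exclusive
            false,  // auto_delete
            false,  // nowait
            new \PhpAmqpLib\Wire\AMQPTable([
                // On expiry, route through the default exchange back to the original queue
                'x-dead-letter-exchange' => '',
                'x-dead-letter-routing-key' => $queue,
                'x-message-ttl' => $delayMs
            ])
        );
        
        $this->channel->basic_publish($message, '', $delayQueue);
    }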
    
    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
    }
}
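
To see what schedule the retry logic above produces, the delay lookup can be lifted into a standalone function. This is a sketch that copies the table from `calculateDelay()` and assumes the default `maxRetries = 3`; the name `retryDelayMs` is introduced here for illustration.

```php
<?php

// Same lookup as calculateDelay() in RetryConsumer, as a standalone function
function retryDelayMs(int $retryCount): int
{
    $delays = [1000, 5000, 15000, 60000, 300000];
    // Counts beyond the end of the table reuse the last (largest) delay
    return $delays[min($retryCount, count($delays) - 1)];
}

// The consumer republishes while retry_count < maxRetries, i.e. for counts 0, 1, 2
$maxRetries = 3;
$schedule = array_map('retryDelayMs', range(0, $maxRetries - 1));

echo implode(', ', $schedule), "\n";        // 1000, 5000, 15000
echo array_sum($schedule) / 1000, " s\n";   // 21 s
```

With these defaults a message is attempted four times in total (the first delivery plus three retries) and spends about 21 seconds in delay queues before it is finally rejected.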

Pull-mode consumer

php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;

class PullConsumer
{
    private $connection;
    private $channel;
    
    public function __construct(array $config = [])
    {
        $this->connection = new AMQPStreamConnection(
            $config['host'] ?? 'localhost',
            $config['port'] ?? 5672,
            $config['user'] ?? 'guest',
            $config['password'] ?? 'guest',
            $config['vhost'] ?? '/'
        );
        
        $this->channel = $this->connection->channel();
    }
    
    public function fetchOne(string $queue, callable $processor): bool
    {
        $message = $this->channel->basic_get($queue);
        
        if (!$message) {
            return false;
        }
        
        try {
            $processor($message->body);
            $this->channel->basic_ack($message->getDeliveryTag());
            return true;
        } catch (Exception $e) {
            $this->channel->basic_nack($message->getDeliveryTag(), false, true);
            return false;
        }
    }
    
    public function fetchBatch(string $queue, int $batchSize, callable $processor): int
    {
        $processed = 0;
        
        for ($i = 0; $i < $batchSize; $i++) {
            $message = $this->channel->basic_get($queue);
            
            if (!$message) {
                break;
            }
            
            try {
                $processor($message->body);
                $this->channel->basic_ack($message->getDeliveryTag());
                $processed++;
            } catch (Exception $e) {
                $this->channel->basic_nack($message->getDeliveryTag(), false, true);
            }
        }
        
        return $processed;
    }
    
    public function poll(
        string $queue,
        callable $processor,
        int $intervalMs = 1000,
        int $maxMessages = 0
    ): void {
        $processed = 0;
        
        while (true) {
            $message = $this->channel->basic_get($queue);
            
            if ($message) {
                try {
                    $processor($message->body);
                    $this->channel->basic_ack($message->getDeliveryTag());
                    $processed++;
                    
                    if ($maxMessages > 0 && $processed >= $maxMessages) {
                        break;
                    }
                } catch (Exception $e) {
                    $this->channel->basic_nack($message->getDeliveryTag(), false, true);
                }
            } else {
                usleep($intervalMs * 1000);
            }
        }
    }
    
    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
    }
}

$consumer = new PullConsumer();

$consumer->poll('orders_queue', function ($body) {
    echo "Processing: $body\n";
}, 1000, 100);

Multi-queue consumer

php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;

class MultiQueueConsumer
{
    private $connection;
    private $channel;
    private $handlers = [];
    
    public function __construct(array $config = [])
    {
        $this->connection = new AMQPStreamConnection(
            $config['host'] ?? 'localhost',
            $config['port'] ?? 5672,
            $config['user'] ?? 'guest',
            $config['password'] ?? 'guest',
            $config['vhost'] ?? '/'
        );
        
        $this->channel = $this->connection->channel();
    }
    
    public function subscribe(string $queue, callable $handler, string $consumerTag = ''): void
    {
        $this->channel->basic_qos(null, 1, null);
        
        $tag = $this->channel->basic_consume(
            $queue,
            $consumerTag ?: "consumer_{$queue}",
            false,
            false,
            false,
            false,
            function ($message) use ($queue) {
                $this->handleMessage($queue, $message);
            }
        );
        
        // Remember the tag actually in use so stop() cancels the right consumer,
        // even when the caller supplied a custom tag
        $this->handlers[$queue] = ['handler' => $handler, 'tag' => $tag];
        
        echo "Subscribed to queue: {$queue} with tag: {$tag}\n";
    }
    
    private function handleMessage(string $queue, $message): void
    {
        $handler = $this->handlers[$queue]['handler'] ?? null;
        
        if (!$handler) {
            echo "No handler for queue: {$queue}\n";
            $message->ack();
            return;
        }
        
        try {
            $handler($message);
            $message->ack();
        } catch (Exception $e) {
            echo "Error processing message from {$queue}: " . $e->getMessage() . "\n";
            $message->nack(true);
        }
    }
    
    public function run(): void
    {
        echo "Consumer started. Press CTRL+C to stop.\n";
        
        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }
    }
    
    public function stop(): void
    {
        // Cancel every subscription using the tag recorded in subscribe()
        foreach ($this->handlers as $entry) {
            $this->channel->basic_cancel($entry['tag']);
        }
    }
    
    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
    }
}

$consumer = new MultiQueueConsumer();

$consumer->subscribe('orders_queue', function ($message) {
    $data = json_decode($message->body, true);
    echo "Processing order: " . json_encode($data) . "\n";
});

$consumer->subscribe('notifications_queue', function ($message) {
    $data = json_decode($message->body, true);
    echo "Sending notification: " . json_encode($data) . "\n";
});

$consumer->subscribe('logs_queue', function ($message) {
    $data = json_decode($message->body, true);
    echo "Logging: " . json_encode($data) . "\n";
});

$consumer->run();

Graceful consumer shutdown

php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;

class GracefulConsumer
{
    private $connection;
    private $channel;
    private $running = true;
    private $processing = false;
    
    public function __construct(array $config = [])
    {
        $this->connection = new AMQPStreamConnection(
            $config['host'] ?? 'localhost',
            $config['port'] ?? 5672,
            $config['user'] ?? 'guest',
            $config['password'] ?? 'guest',
            $config['vhost'] ?? '/'
        );
        
        $this->channel = $this->connection->channel();
        $this->setupSignalHandlers();
    }
    
    private function setupSignalHandlers(): void
    {
        if (function_exists('pcntl_signal')) {
            pcntl_signal(SIGTERM, [$this, 'handleSignal']);
            pcntl_signal(SIGINT, [$this, 'handleSignal']);
            pcntl_signal(SIGHUP, [$this, 'handleSignal']);
        }
    }
    
    public function handleSignal(int $signal): void
    {
        echo "\nReceived signal {$signal}, shutting down gracefully...\n";
        $this->running = false;
    }
    
    public function consume(string $queue, callable $processor): void
    {
        $this->channel->basic_qos(null, 1, null);
        
        $this->channel->basic_consume(
            $queue,
            '',
            false,
            false,
            false,
            false,
            function ($message) use ($processor) {
                $this->processing = true;
                
                try {
                    $processor($message->body);
                    $message->ack();
                } catch (Exception $e) {
                    echo "Error: " . $e->getMessage() . "\n";
                    $message->nack(true);
                } finally {
                    $this->processing = false;
                }
            }
        );
        
        echo "Consumer started. Press CTRL+C to stop.\n";
        
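        while ($this->running || $this->processing) {
            if (function_exists('pcntl_signal_dispatch')) {
                pcntl_signal_dispatch();
            }
            
            if ($this->running) {
                try {
                    // Block for up to 1 second: a non-blocking wait would spin the CPU
                    $this->channel->wait(null, false, 1);
                } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) {
                    // No message within the timeout; loop again and re-check $running.
                    // Other exceptions (e.g. a lost connection) are left to propagate.
                    continue;
                }
            }
        }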
        
        echo "Consumer stopped gracefully.\n";
    }
    
    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
    }
}

$consumer = new GracefulConsumer();

$consumer->consume('orders_queue', function ($body) {
    $data = json_decode($body, true);
    echo "Processing: " . json_encode($data) . "\n";
    sleep(2);
    echo "Done processing\n";
});

$consumer->close();

Real-world scenarios

1. Order processing consumer

php
<?php

class OrderConsumer
{
    private $consumer;
    private $orderService;
    private $inventoryService;
    private $notificationService;
    
    public function __construct(
        GracefulConsumer $consumer,
        $orderService,
        $inventoryService,
        $notificationService
    ) {
        $this->consumer = $consumer;
        $this->orderService = $orderService;
        $this->inventoryService = $inventoryService;
        $this->notificationService = $notificationService;
    }
    
    public function processOrders(): void
    {
        $this->consumer->consume('orders_queue', function ($body) {
            $data = json_decode($body, true);
            $event = $data['event'] ?? 'unknown';
            
            switch ($event) {
                case 'order.created':
                    $this->handleOrderCreated($data['data']);
                    break;
                case 'order.paid':
                    $this->handleOrderPaid($data['data']);
                    break;
                case 'order.cancelled':
                    $this->handleOrderCancelled($data['data']);
                    break;
                default:
                    echo "Unknown event: {$event}\n";
            }
        });
    }
    
    private function handleOrderCreated(array $order): void
    {
        echo "Processing new order: {$order['order_id']}\n";
        
        $this->inventoryService->reserveStock(
            $order['items'],
            $order['order_id']
        );
        
        $this->notificationService->sendOrderConfirmation(
            $order['user_id'],
            $order['order_id']
        );
        
        echo "Order {$order['order_id']} processed successfully\n";
    }
    
    private function handleOrderPaid(array $data): void
    {
        echo "Processing paid order: {$data['order_id']}\n";
        
        $this->orderService->updateStatus(
            $data['order_id'],
            'paid'
        );
        
        $this->inventoryService->confirmReservation(
            $data['order_id']
        );
        
        $this->notificationService->sendPaymentConfirmation(
            $data['order_id']
        );
    }
    
    private function handleOrderCancelled(array $data): void
    {
        echo "Processing cancelled order: {$data['order_id']}\n";
        
        $this->orderService->updateStatus(
            $data['order_id'],
            'cancelled'
        );
        
        $this->inventoryService->releaseReservation(
            $data['order_id']
        );
        
        $this->notificationService->sendCancellationNotice(
            $data['order_id'],
            $data['reason']
        );
    }
}

2. Email sending consumer

php
<?php

class EmailConsumer
{
    private $consumer;
    private $mailer;
    
    public function __construct(GracefulConsumer $consumer, $mailer)
    {
        $this->consumer = $consumer;
        $this->mailer = $mailer;
    }
    
    public function consumeEmails(): void
    {
        $this->consumer->consume('emails_queue', function ($body) {
            $data = json_decode($body, true);
            
            $this->sendEmail($data);
        });
    }
    
    private function sendEmail(array $data): void
    {
        $to = $data['to'];
        $subject = $data['subject'];
        $body = $data['body'];
        $options = $data['options'] ?? [];
        
        echo "Sending email to: {$to}\n";
        
        $result = $this->mailer->send($to, $subject, $body, $options);
        
        if ($result) {
            echo "Email sent successfully to: {$to}\n";
        } else {
            throw new Exception("Failed to send email to: {$to}");
        }
    }
}

3. Log processing consumer

php
<?php

class LogConsumer
{
    private $consumer;
    private $logStorage;
    
    public function __construct(GracefulConsumer $consumer, $logStorage)
    {
        $this->consumer = $consumer;
        $this->logStorage = $logStorage;
    }
    
    public function consumeLogs(): void
    {
        $this->consumer->consume('logs_queue', function ($body) {
            $data = json_decode($body, true);
            
            $this->processLog($data);
        });
    }
    
    private function processLog(array $data): void
    {
        $level = $data['level'];
        $message = $data['message'];
        $context = $data['context'] ?? [];
        $timestamp = $data['timestamp'] ?? date('Y-m-d H:i:s');
        
        $logEntry = [
            'level' => $level,
            'message' => $message,
            'context' => $context,
            'timestamp' => $timestamp,
            'service' => $data['service'] ?? 'unknown',
            'hostname' => $data['hostname'] ?? gethostname()
        ];
        
        $this->logStorage->store($logEntry);
        
        if (in_array($level, ['error', 'emergency', 'critical'])) {
            $this->alertOnCriticalLog($logEntry);
        }
    }
    
    private function alertOnCriticalLog(array $logEntry): void
    {
        // Send an alert notification
        echo "ALERT: Critical log detected - {$logEntry['message']}\n";
    }
}

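Common problems and solutions

1. Duplicate message consumption

Causes

  • The ACK is lost before it reaches the broker
  • The consumer crashes before acknowledging
  • Network failures

Solution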

php
<?php

class IdempotentConsumer
{
    private $redis;
    private $consumer;
    
    public function consumeIdempotent(string $queue, callable $processor): void
    {
        $this->consumer->consume($queue, function ($body) use ($processor) {
            $data = json_decode($body, true);
            $messageId = $data['message_id'] ?? md5($body);
            
            $lockKey = "message_lock:{$messageId}";
            $processedKey = "message_processed:{$messageId}";
            
            if ($this->redis->exists($processedKey)) {
                echo "Message {$messageId} already processed, skipping\n";
                return;
            }
            
            $locked = $this->redis->set($lockKey, '1', ['NX', 'EX' => 30]);
            
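            if (!$locked) {
                echo "Message {$messageId} is being processed by another consumer\n";
                // Throw instead of returning: a plain return would let the wrapped
                // consumer ack a message this process never handled. This assumes the
                // wrapped consumer nacks with requeue on exceptions, as GracefulConsumer does.
                throw new RuntimeException("Message {$messageId} is locked by another consumer");
            }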
            
            try {
                $processor($data);
                
                $this->redis->setex($processedKey, 86400, '1');
                
                echo "Message {$messageId} processed successfully\n";
            } finally {
                $this->redis->del($lockKey);
            }
        });
    }
}
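
The check, process, then mark sequence used above can be exercised without Redis. The sketch below swaps in a hypothetical in-memory `ProcessedStore` for the Redis "processed" keys and leaves out the lock, so it only illustrates how a redelivered message is skipped; it is not a substitute for the shared store a multi-process deployment needs.

```php
<?php

// Hypothetical in-memory stand-in for the Redis "message_processed:*" keys
final class ProcessedStore
{
    private array $seen = [];

    public function has(string $messageId): bool
    {
        return isset($this->seen[$messageId]);
    }

    public function mark(string $messageId): void
    {
        $this->seen[$messageId] = true;
    }
}

// Returns true if the processor ran, false if the message was a duplicate
function handleOnce(ProcessedStore $store, string $body, callable $processor): bool
{
    $data = json_decode($body, true);
    // Same fallback as IdempotentConsumer: hash the body when no ID is present
    $messageId = $data['message_id'] ?? md5($body);

    if ($store->has($messageId)) {
        return false;           // duplicate: skip the side effects
    }

    $processor($data);
    $store->mark($messageId);   // mark only after processing succeeded
    return true;
}

$store = new ProcessedStore();
$charged = 0;
$charge = function ($data) use (&$charged) {
    $charged++;
};

$body = '{"message_id": "m-1", "amount": 10}';
handleOnce($store, $body, $charge);
handleOnce($store, $body, $charge); // redelivery of the same message

echo $charged, "\n"; // 1
```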

2. Blocked consumers

Causes

  • A single message takes too long to process
  • Calls to external services time out
  • Deadlocks or infinite loops

Solution

php
<?php

class TimeoutConsumer
{
    private $consumer;
    private $defaultTimeout = 30;
    
    public function consumeWithTimeout(
        string $queue,
        callable $processor,
        ?int $timeout = null
    ): void {
        $timeout = $timeout ?? $this->defaultTimeout;
        
        $this->consumer->consume($queue, function ($body) use ($processor, $timeout) {
            $startTime = time();
            
            $result = $this->executeWithTimeout($processor, $body, $timeout);
            
            if ($result['timeout']) {
                throw new Exception("Processing timeout after {$timeout} seconds");
            }
            
            if (!$result['success']) {
                throw new Exception($result['error']);
            }
        });
    }
    
    private function executeWithTimeout(callable $processor, $body, int $timeout): array
    {
        $result = ['success' => false, 'timeout' => false, 'error' => null];
        
        if (function_exists('pcntl_fork')) {
            $pid = pcntl_fork();
            
            if ($pid == -1) {
                $result['error'] = 'Failed to fork process';
                return $result;
            }
            
            if ($pid == 0) {
                try {
                    $processor($body);
                    exit(0);
                } catch (Exception $e) {
                    exit(1);
                }
            }
            
            $start = time();
            while (true) {
                $status = null;
                $res = pcntl_waitpid($pid, $status, WNOHANG);
                
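                if ($res == $pid) {
                    $result['success'] = pcntl_wifexited($status) && pcntl_wexitstatus($status) == 0;
                    if (!$result['success']) {
                        // The child cannot pass its exception back, so record a generic error
                        $result['error'] = 'Processor failed in child process';
                    }
                    break;
                }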
                
                if (time() - $start > $timeout) {
                    posix_kill($pid, SIGKILL);
                    pcntl_waitpid($pid, $status);
                    $result['timeout'] = true;
                    break;
                }
                
                usleep(100000);
            }
        } else {
            try {
                $processor($body);
                $result['success'] = true;
            } catch (Exception $e) {
                $result['error'] = $e->getMessage();
            }
        }
        
        return $result;
    }
}

3. Consumer performance problems

Causes

  • Poorly chosen prefetch count
  • Inefficient processing logic
  • Resource contention

Solution

php
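<?php

use PhpAmqpLib\Channel\AMQPChannel;

class OptimizedConsumer
{
    private $channel;
    private $batchSize = 10;
    private $batch = [];
    
    // The class needs an open channel; injecting one is an assumption of this example
    public function __construct(AMQPChannel $channel)
    {
        $this->channel = $channel;
    }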
    
    public function consumeBatch(string $queue, callable $processor): void
    {
        $this->channel->basic_qos(null, $this->batchSize, null);
        
        $this->channel->basic_consume(
            $queue,
            '',
            false,
            false,
            false,
            false,
            function ($message) use ($processor, $queue) {
                $this->batch[] = $message;
                
                if (count($this->batch) >= $this->batchSize) {
                    $this->processBatch($processor);
                }
            }
        );
        
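        while ($this->channel->is_consuming()) {
            try {
                // Wait at most 5 seconds so a partially filled batch is not left unacked
                $this->channel->wait(null, false, 5);
            } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) {
                // No new message arrived: flush whatever has been collected so far
                $this->processBatch($processor);
            }
        }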
    }
    
    private function processBatch(callable $processor): void
    {
        if (empty($this->batch)) {
            return;
        }
        
        $messages = $this->batch;
        $this->batch = [];
        
        $results = $processor(array_map(function ($msg) {
            return $msg->body;
        }, $messages));
        
        foreach ($messages as $index => $message) {
            if ($results[$index] ?? false) {
                $message->ack();
            } else {
                $message->nack(true);
            }
        }
    }
}

Best practices

1. Consumer configuration tuning

php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;

class ConsumerConfig
{
    public static function createOptimized(array $baseConfig = []): array
    {
        return array_merge([
            'host' => 'localhost',
            'port' => 5672,
            'user' => 'guest',
            'password' => 'guest',
            'vhost' => '/',
            'insist' => false,
            'login_method' => 'AMQPLAIN',
            'login_response' => null,
            'locale' => 'en_US',
            'connection_timeout' => 3.0,
            'read_write_timeout' => 130.0,
            'context' => null,
            'keepalive' => true,
            'heartbeat' => 60,
        ], $baseConfig);
    }
    
    public static function createConnection(array $config): AMQPStreamConnection
    {
        return new AMQPStreamConnection(
            $config['host'],
            $config['port'],
            $config['user'],
            $config['password'],
            $config['vhost'],
            $config['insist'],
            $config['login_method'],
            $config['login_response'],
            $config['locale'],
            $config['connection_timeout'],
            $config['read_write_timeout'],
            $config['context'],
            $config['keepalive'],
            $config['heartbeat']
        );
    }
}

2. Error handling and monitoring

php
<?php

class MonitoredConsumer
{
    private $consumer;
    private $metrics;
    private $logger;
    
    public function consume(string $queue, callable $processor): void
    {
        $this->consumer->consume($queue, function ($body) use ($processor, $queue) {
            $startTime = microtime(true);
            $messageId = md5($body);
            
            $this->metrics->increment("consumer.{$queue}.received");
            
            try {
                $processor($body);
                
                $duration = (microtime(true) - $startTime) * 1000;
                
                $this->metrics->increment("consumer.{$queue}.success");
                $this->metrics->timing("consumer.{$queue}.duration", $duration);
                
                $this->logger->info('Message processed', [
                    'queue' => $queue,
                    'message_id' => $messageId,
                    'duration_ms' => round($duration, 2)
                ]);
            } catch (Exception $e) {
                $this->metrics->increment("consumer.{$queue}.error");
                
                $this->logger->error('Message processing failed', [
                    'queue' => $queue,
                    'message_id' => $messageId,
                    'error' => $e->getMessage(),
                    'trace' => $e->getTraceAsString()
                ]);
                
                throw $e;
            }
        });
    }
}

3. Dead-letter queue handling

php
<?php

class DeadLetterConsumer
{
    private $consumer;
    private $logger;
    
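    public function setupDeadLetterQueue(string $originalQueue): void
    {
        // Assumes the wrapped consumer exposes its channel through a getChannel()
        // accessor; the consumer classes shown earlier would need one added.
        $channel = $this->consumer->getChannel();
        
        $dlqName = "{$originalQueue}.dlq";
        $dlxName = "{$originalQueue}.dlx";
        
        $channel->exchange_declare($dlxName, 'direct', false, true, false);
        
        $channel->queue_declare($dlqName, false, true, false, false);
        
        // Dead-lettered messages keep their original routing key unless the source
        // queue sets x-dead-letter-routing-key; the queue name is assumed here.
        $channel->queue_bind($dlqName, $dlxName, $originalQueue);
        
        // The original queue must be declared with 'x-dead-letter-exchange' => $dlxName
        // (or have an equivalent policy) for rejected messages to reach this exchange.
    }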
    
    public function consumeDeadLetters(string $queue): void
    {
        $dlqName = "{$queue}.dlq";
        
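        $this->consumer->consume($dlqName, function ($body) use ($queue) {
            $data = json_decode($body, true);
            
            // RabbitMQ records dead-lettering history in the 'x-death' message header,
            // not in the body. This callback only receives the body, so the lookup
            // below finds a value only if the producer copied it into the payload.
            $this->logger->error('Dead letter received', [
                'original_queue' => $queue,
                'message' => $data,
                'x-death' => $data['x-death'] ?? []
            ]);
            
            $this->handleDeadLetter($data);
        });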
    }
    
    private function handleDeadLetter(array $data): void
    {
        // 1. Record in the database
        // 2. Send an alert
        // 3. Handle manually or repair automatically
    }
}
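
When a dead-letter handler does have access to the message headers, the `x-death` entries can be used to decide whether a message has failed too often. The sketch below works on a plain PHP array shaped like that header after conversion to native arrays (a list of entries with `queue`, `reason` and `count` keys, other keys omitted). The function name `deathCount` and the sample data are assumptions for illustration.

```php
<?php

// Hypothetical helper: total how many times a message was dead-lettered
// from a given queue for a given reason.
function deathCount(array $xDeath, string $queue, string $reason = 'rejected'): int
{
    $total = 0;
    foreach ($xDeath as $entry) {
        $sameQueue = ($entry['queue'] ?? null) === $queue;
        $sameReason = ($entry['reason'] ?? null) === $reason;
        if ($sameQueue && $sameReason) {
            $total += (int) ($entry['count'] ?? 0);
        }
    }
    return $total;
}

// Sample data in the assumed shape; not taken from a real broker
$xDeath = [
    ['queue' => 'orders_queue', 'reason' => 'rejected', 'count' => 2],
    ['queue' => 'orders_queue.delay', 'reason' => 'expired', 'count' => 3],
];

echo deathCount($xDeath, 'orders_queue'), "\n";                  // 2
echo deathCount($xDeath, 'orders_queue.delay', 'expired'), "\n"; // 3
echo deathCount($xDeath, 'unknown_queue'), "\n";                 // 0
```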

Related links