Skip to content

简单队列模式

概述

简单队列模式(Simple Queue Pattern)是 RabbitMQ 最基础的消息传递模式。它由一个生产者、一个队列和一个消费者组成,实现了最简单的点对点消息传递。这是学习 RabbitMQ 的起点,也是理解更复杂模式的基础。

核心知识点

架构图

mermaid
graph LR
    subgraph 生产者
        P[Producer]
    end

    subgraph RabbitMQ
        Q[Queue]
    end

    subgraph 消费者
        C[Consumer]
    end

    P -->|发布消息| Q
    Q -->|消费消息| C

    style P fill:#e1f5fe
    style Q fill:#fff3e0
    style C fill:#e8f5e9

工作流程

mermaid
sequenceDiagram
    participant P as 生产者
    participant Q as 队列
    participant C as 消费者

    Note over P: 1. 建立连接
    P->>Q: 声明队列
    Q-->>P: 队列已创建

    Note over P: 2. 发送消息
    P->>Q: 发布消息
    Q-->>P: 确认接收

    Note over C: 3. 消费消息
    C->>Q: 订阅队列
    Q->>C: 投递消息
    C->>Q: 确认消费

核心概念

概念说明
Producer消息生产者,负责发送消息
Queue消息队列,存储消息
Consumer消息消费者,接收并处理消息
ConnectionTCP 连接
Channel连接中的轻量级通道

PHP 代码示例

基础生产者

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class SimpleProducer
{
    private $connection;
    private $channel;
    private $queueName = 'simple_queue';

    public function __construct()
    {
        $this->connection = new AMQPStreamConnection(
            'localhost',
            5672,
            'guest',
            'guest'
        );
        $this->channel = $this->connection->channel();

        $this->channel->queue_declare(
            $this->queueName,
            false,
            true,
            false,
            false
        );
    }

    public function send($message)
    {
        $msg = new AMQPMessage(
            json_encode([
                'content' => $message,
                'timestamp' => time()
            ]),
            ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
        );

        $this->channel->basic_publish($msg, '', $this->queueName);

        echo " [x] 发送消息: {$message}\n";
    }

    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}

$producer = new SimpleProducer();
$producer->send('Hello RabbitMQ!');
$producer->close();

基础消费者

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

class SimpleConsumer
{
    private $connection;
    private $channel;
    private $queueName = 'simple_queue';

    public function __construct()
    {
        $this->connection = new AMQPStreamConnection(
            'localhost',
            5672,
            'guest',
            'guest'
        );
        $this->channel = $this->connection->channel();

        $this->channel->queue_declare(
            $this->queueName,
            false,
            true,
            false,
            false
        );
    }

    public function consume()
    {
        $callback = function ($message) {
            $body = json_decode($message->body, true);

            echo sprintf(
                " [x] 接收消息: %s\n",
                $body['content']
            );

            $message->ack();
        };

        $this->channel->basic_qos(null, 1, null);
        $this->channel->basic_consume(
            $this->queueName,
            '',
            false,
            false,
            false,
            false,
            $callback
        );

        echo " [*] 等待消息。按 CTRL+C 退出\n";

        while ($this->channel->is_open()) {
            $this->channel->wait();
        }
    }

    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}

$consumer = new SimpleConsumer();
$consumer->consume();

带错误处理的完整示例

php
<?php

class RobustSimpleQueue
{
    private $connection;
    private $channel;
    private $queueName;
    private $config;

    public function __construct($queueName, array $config = [])
    {
        $this->queueName = $queueName;
        $this->config = array_merge([
            'host' => 'localhost',
            'port' => 5672,
            'user' => 'guest',
            'password' => 'guest',
            'vhost' => '/'
        ], $config);

        $this->connect();
    }

    private function connect()
    {
        try {
            $this->connection = new AMQPStreamConnection(
                $this->config['host'],
                $this->config['port'],
                $this->config['user'],
                $this->config['password'],
                $this->config['vhost']
            );

            $this->channel = $this->connection->channel();

            $this->channel->queue_declare(
                $this->queueName,
                false,
                true,
                false,
                false
            );

            echo "连接成功\n";
        } catch (Exception $e) {
            throw new RuntimeException(
                "连接 RabbitMQ 失败: " . $e->getMessage()
            );
        }
    }

    public function publish($message, array $options = [])
    {
        $defaultOptions = [
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            'content_type' => 'application/json'
        ];

        $options = array_merge($defaultOptions, $options);

        $msg = new AMQPMessage(
            is_string($message) ? $message : json_encode($message),
            $options
        );

        try {
            $this->channel->basic_publish($msg, '', $this->queueName);
            return true;
        } catch (Exception $e) {
            error_log("发布消息失败: " . $e->getMessage());
            return false;
        }
    }

    public function consume(callable $callback, $timeout = 0)
    {
        $wrapper = function ($message) use ($callback) {
            try {
                $result = call_user_func($callback, $message);
                $message->ack();
                return $result;
            } catch (Exception $e) {
                error_log("处理消息失败: " . $e->getMessage());
                $message->nack(true);
            }
        };

        $this->channel->basic_qos(null, 1, null);
        $this->channel->basic_consume(
            $this->queueName,
            '',
            false,
            false,
            false,
            false,
            $wrapper
        );

        while ($this->channel->is_open()) {
            try {
                $this->channel->wait(null, false, $timeout ?: 0);
            } catch (PhpAmqpLib\Exception\AMQPTimeoutException $e) {
                if ($timeout > 0) {
                    break;
                }
            }
        }
    }

    public function close()
    {
        try {
            if ($this->channel) {
                $this->channel->close();
            }
            if ($this->connection) {
                $this->connection->close();
            }
        } catch (Exception $e) {
            error_log("关闭连接失败: " . $e->getMessage());
        }
    }

    public function __destruct()
    {
        $this->close();
    }
}

连接池管理

php
<?php

class ConnectionPool
{
    private static $connections = [];
    private static $maxConnections = 10;

    public static function getConnection($name = 'default', array $config = [])
    {
        if (!isset(self::$connections[$name])) {
            if (count(self::$connections) >= self::$maxConnections) {
                throw new RuntimeException("连接池已满");
            }

            $defaultConfig = [
                'host' => 'localhost',
                'port' => 5672,
                'user' => 'guest',
                'password' => 'guest'
            ];

            $config = array_merge($defaultConfig, $config);

            self::$connections[$name] = new AMQPStreamConnection(
                $config['host'],
                $config['port'],
                $config['user'],
                $config['password']
            );
        }

        return self::$connections[$name];
    }

    public static function closeAll()
    {
        foreach (self::$connections as $connection) {
            try {
                $connection->close();
            } catch (Exception $e) {
                error_log("关闭连接失败: " . $e->getMessage());
            }
        }

        self::$connections = [];
    }

    public static function close($name)
    {
        if (isset(self::$connections[$name])) {
            try {
                self::$connections[$name]->close();
            } catch (Exception $e) {
                error_log("关闭连接失败: " . $e->getMessage());
            }

            unset(self::$connections[$name]);
        }
    }
}

实际应用场景

1. 日志收集

php
<?php

class LogCollector
{
    private $queue;

    public function __construct()
    {
        $this->queue = new RobustSimpleQueue('app_logs');
    }

    public function log($level, $message, array $context = [])
    {
        $logEntry = [
            'level' => $level,
            'message' => $message,
            'context' => $context,
            'timestamp' => date('Y-m-d H:i:s'),
            'hostname' => gethostname()
        ];

        $this->queue->publish($logEntry);
    }

    public function info($message, array $context = [])
    {
        $this->log('info', $message, $context);
    }

    public function error($message, array $context = [])
    {
        $this->log('error', $message, $context);
    }

    public function warning($message, array $context = [])
    {
        $this->log('warning', $message, $context);
    }
}

class LogProcessor
{
    private $queue;

    public function __construct()
    {
        $this->queue = new RobustSimpleQueue('app_logs');
    }

    public function start()
    {
        echo "日志处理器启动...\n";

        $this->queue->consume(function ($message) {
            $log = json_decode($message->body, true);

            $this->processLog($log);
        });
    }

    private function processLog($log)
    {
        $level = strtoupper($log['level']);

        echo sprintf(
            "[%s] %s: %s\n",
            $log['timestamp'],
            $level,
            $log['message']
        );
    }
}

2. 邮件发送队列

php
<?php

class EmailQueue
{
    private $queue;

    public function __construct()
    {
        $this->queue = new RobustSimpleQueue('email_queue');
    }

    public function send($to, $subject, $body, array $options = [])
    {
        $email = [
            'to' => $to,
            'subject' => $subject,
            'body' => $body,
            'options' => $options,
            'created_at' => time()
        ];

        $this->queue->publish($email);

        echo "邮件已加入发送队列: {$to}\n";
    }

    public function sendTemplate($to, $template, array $data)
    {
        $this->send($to, $template, '', [
            'template' => $template,
            'data' => $data
        ]);
    }
}

class EmailWorker
{
    private $queue;

    public function __construct()
    {
        $this->queue = new RobustSimpleQueue('email_queue');
    }

    public function start()
    {
        echo "邮件发送工作器启动...\n";

        $this->queue->consume(function ($message) {
            $email = json_decode($message->body, true);

            $this->sendEmail($email);
        });
    }

    private function sendEmail($email)
    {
        echo sprintf(
            "发送邮件到: %s, 主题: %s\n",
            $email['to'],
            $email['subject']
        );
    }
}

3. 文件处理队列

php
<?php

class FileProcessingQueue
{
    private $queue;

    public function __construct()
    {
        $this->queue = new RobustSimpleQueue('file_processing');
    }

    public function submit($filePath, $operation, array $params = [])
    {
        $task = [
            'task_id' => uniqid('task_'),
            'file_path' => $filePath,
            'operation' => $operation,
            'params' => $params,
            'submitted_at' => time()
        ];

        $this->queue->publish($task);

        return $task['task_id'];
    }

    public function resize($filePath, $width, $height)
    {
        return $this->submit($filePath, 'resize', [
            'width' => $width,
            'height' => $height
        ]);
    }

    public function convert($filePath, $format)
    {
        return $this->submit($filePath, 'convert', [
            'format' => $format
        ]);
    }
}

常见问题与解决方案

问题 1:消息丢失

原因:消息未持久化或消费者未确认

解决方案

php
<?php

class ReliableQueue
{
    public function publishWithConfirm($message)
    {
        $this->channel->confirm_select();

        $msg = new AMQPMessage(
            json_encode($message),
            ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
        );

        $this->channel->basic_publish($msg, '', $this->queueName);

        try {
            $this->channel->wait_for_pending_acks();
            return true;
        } catch (Exception $e) {
            return false;
        }
    }
}

问题 2:消费者处理慢

原因:单消费者处理能力有限

解决方案

php
<?php

class BatchConsumer
{
    private $batchSize = 10;
    private $batchTimeout = 5;
    private $batch = [];

    public function consume()
    {
        $lastProcess = time();

        $callback = function ($message) use (&$lastProcess) {
            $this->batch[] = $message;

            if (count($this->batch) >= $this->batchSize ||
                time() - $lastProcess >= $this->batchTimeout) {
                $this->processBatch();
                $lastProcess = time();
            }
        };

        $this->channel->basic_consume(
            $this->queueName,
            '',
            false,
            false,
            false,
            false,
            $callback
        );

        while ($this->channel->is_open()) {
            $this->channel->wait();
        }
    }

    private function processBatch()
    {
        if (empty($this->batch)) {
            return;
        }

        foreach ($this->batch as $message) {
            $this->process($message);
            $message->ack();
        }

        $this->batch = [];
    }

    private function process($message)
    {
        // 处理逻辑
    }
}

问题 3:连接断开

原因:网络问题或服务器重启

解决方案

php
<?php

class ReconnectingQueue
{
    private $config;
    private $connection;
    private $channel;
    private $maxRetries = 5;
    private $retryDelay = 1;

    public function __construct(array $config)
    {
        $this->config = $config;
        $this->connect();
    }

    private function connect()
    {
        $attempts = 0;

        while ($attempts < $this->maxRetries) {
            try {
                $this->connection = new AMQPStreamConnection(
                    $this->config['host'],
                    $this->config['port'],
                    $this->config['user'],
                    $this->config['password']
                );

                $this->channel = $this->connection->channel();

                echo "连接成功\n";
                return;
            } catch (Exception $e) {
                $attempts++;
                echo "连接失败,尝试 {$attempts}/{$this->maxRetries}\n";
                sleep($this->retryDelay * $attempts);
            }
        }

        throw new RuntimeException("无法连接到 RabbitMQ");
    }

    public function publishWithRetry($message)
    {
        $attempts = 0;

        while ($attempts < $this->maxRetries) {
            try {
                if (!$this->connection || !$this->connection->isConnected()) {
                    $this->connect();
                }

                $msg = new AMQPMessage(json_encode($message));
                $this->channel->basic_publish($msg, '', $this->queueName);
                return true;
            } catch (Exception $e) {
                $attempts++;
                sleep($this->retryDelay);
            }
        }

        return false;
    }
}

最佳实践建议

1. 消息格式标准化

php
<?php

class StandardMessage
{
    public static function create($type, $payload, array $meta = [])
    {
        return [
            'id' => uniqid('msg_', true),
            'type' => $type,
            'payload' => $payload,
            'meta' => array_merge([
                'timestamp' => time(),
                'source' => gethostname()
            ], $meta),
            'version' => '1.0'
        ];
    }

    public static function parse($body)
    {
        $data = json_decode($body, true);

        if (json_last_error() !== JSON_ERROR_NONE) {
            throw new InvalidArgumentException("无效的 JSON 格式");
        }

        return $data;
    }
}

2. 监控和指标

php
<?php

class QueueMonitor
{
    private $stats = [];

    public function recordPublish($queueName, $success = true)
    {
        $this->increment($queueName, 'published', $success);
    }

    public function recordConsume($queueName, $success = true)
    {
        $this->increment($queueName, 'consumed', $success);
    }

    private function increment($queueName, $type, $success)
    {
        if (!isset($this->stats[$queueName])) {
            $this->stats[$queueName] = [
                'published' => ['total' => 0, 'success' => 0, 'failed' => 0],
                'consumed' => ['total' => 0, 'success' => 0, 'failed' => 0]
            ];
        }

        $this->stats[$queueName][$type]['total']++;

        if ($success) {
            $this->stats[$queueName][$type]['success']++;
        } else {
            $this->stats[$queueName][$type]['failed']++;
        }
    }

    public function getStats()
    {
        return $this->stats;
    }
}

3. 优雅关闭

php
<?php

class GracefulConsumer
{
    private $running = true;

    public function consume()
    {
        pcntl_async_signals(true);
        pcntl_signal(SIGTERM, [$this, 'shutdown']);
        pcntl_signal(SIGINT, [$this, 'shutdown']);

        while ($this->running && $this->channel->is_open()) {
            try {
                $this->channel->wait(null, false, 1);
            } catch (PhpAmqpLib\Exception\AMQPTimeoutException $e) {
                continue;
            }
        }

        echo "消费者已关闭\n";
    }

    public function shutdown()
    {
        echo "收到关闭信号,正在停止...\n";
        $this->running = false;
    }
}

相关链接