Appearance
简单队列模式
概述
简单队列模式(Simple Queue Pattern)是 RabbitMQ 最基础的消息传递模式。它由一个生产者、一个队列和一个消费者组成,实现了最简单的点对点消息传递。这是学习 RabbitMQ 的起点,也是理解更复杂模式的基础。
核心知识点
架构图
mermaid
graph LR
subgraph 生产者
P[Producer]
end
subgraph RabbitMQ
Q[Queue]
end
subgraph 消费者
C[Consumer]
end
P -->|发布消息| Q
Q -->|消费消息| C
style P fill:#e1f5fe
style Q fill:#fff3e0
style C fill:#e8f5e9
工作流程
mermaid
sequenceDiagram
participant P as 生产者
participant Q as 队列
participant C as 消费者
Note over P: 1. 建立连接
P->>Q: 声明队列
Q-->>P: 队列已创建
Note over P: 2. 发送消息
P->>Q: 发布消息
Q-->>P: 确认接收
Note over C: 3. 消费消息
C->>Q: 订阅队列
Q->>C: 投递消息
C->>Q: 确认消费
核心概念
| 概念 | 说明 |
|---|---|
| Producer | 消息生产者,负责发送消息 |
| Queue | 消息队列,存储消息 |
| Consumer | 消息消费者,接收并处理消息 |
| Connection | TCP 连接 |
| Channel | 连接中的轻量级通道 |
PHP 代码示例
基础生产者
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
/**
 * Minimal producer for the simple queue pattern.
 *
 * The constructor opens a connection and a channel and declares the queue;
 * send() publishes one JSON-wrapped message; close() releases the channel
 * and the connection.
 */
class SimpleProducer
{
    private $connection;
    private $channel;
    private $queueName = 'simple_queue';

    /**
     * Connects to localhost:5672 as guest/guest and declares the queue.
     */
    public function __construct()
    {
        $this->connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $this->channel = $this->connection->channel();

        // Arguments after the queue name: passive, durable, exclusive, auto_delete.
        $this->channel->queue_declare($this->queueName, false, true, false, false);
    }

    /**
     * Publishes $message wrapped in a JSON envelope with a Unix timestamp.
     *
     * @param mixed $message Value stored under the "content" key; it is also
     *                       interpolated into the console output.
     */
    public function send($message)
    {
        $payload = json_encode([
            'content' => $message,
            'timestamp' => time(),
        ]);
        $properties = ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT];
        $amqpMessage = new AMQPMessage($payload, $properties);

        // Empty exchange name with the queue name as routing key.
        $this->channel->basic_publish($amqpMessage, '', $this->queueName);

        echo " [x] 发送消息: {$message}\n";
    }

    /**
     * Closes the channel first, then the connection.
     */
    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}
// Usage example: build the producer (connects and declares the queue),
// then publish a single greeting message.
$producer = new SimpleProducer();
$producer->send('Hello RabbitMQ!');
$producer->close();
基础消费者
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
/**
 * Minimal consumer for the simple queue pattern.
 *
 * The constructor opens a connection and a channel and declares the queue;
 * consume() blocks and handles messages one at a time with manual
 * acknowledgement; close() releases the channel and the connection.
 */
class SimpleConsumer
{
    private $connection;
    private $channel;
    private $queueName = 'simple_queue';

    /**
     * Connects to localhost:5672 as guest/guest and declares the queue.
     */
    public function __construct()
    {
        $this->connection = new AMQPStreamConnection(
            'localhost',
            5672,
            'guest',
            'guest'
        );
        $this->channel = $this->connection->channel();

        // Arguments after the queue name: passive, durable, exclusive, auto_delete.
        $this->channel->queue_declare(
            $this->queueName,
            false,
            true,
            false,
            false
        );
    }

    /**
     * Subscribes to the queue and processes messages until the channel closes.
     *
     * Every delivered message is acknowledged. A body that is not a JSON
     * object with a "content" key is reported through error_log() instead of
     * being indexed blindly, which previously raised PHP warnings.
     */
    public function consume()
    {
        $callback = function ($message) {
            $body = json_decode($message->body, true);

            if (is_array($body) && array_key_exists('content', $body)) {
                echo sprintf(
                    " [x] 接收消息: %s\n",
                    $body['content']
                );
            } else {
                error_log("无法解析消息: " . $message->body);
            }

            // Acknowledge in both cases so a malformed message is not
            // left unacknowledged on the channel.
            $message->ack();
        };

        // Prefetch count of 1: at most one unacknowledged message at a time.
        $this->channel->basic_qos(null, 1, null);
        $this->channel->basic_consume(
            $this->queueName,
            '',
            false,
            false,
            false,
            false,
            $callback
        );

        echo " [*] 等待消息。按 CTRL+C 退出\n";

        while ($this->channel->is_open()) {
            $this->channel->wait();
        }
    }

    /**
     * Closes the channel first, then the connection.
     */
    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}
// Usage example: build the consumer (connects and declares the queue).
$consumer = new SimpleConsumer();
$consumer->consume();
带错误处理的完整示例
php
<?php
class RobustSimpleQueue
{
private $connection;
private $channel;
private $queueName;
private $config;
/**
 * Stores the queue name, merges the connection settings over the defaults
 * and connects immediately.
 *
 * @param string $queueName Queue used by publish() and consume().
 * @param array  $config    Optional overrides: host, port, user, password, vhost.
 */
public function __construct($queueName, array $config = [])
{
    $defaults = [
        'host' => 'localhost',
        'port' => 5672,
        'user' => 'guest',
        'password' => 'guest',
        'vhost' => '/',
    ];

    $this->queueName = $queueName;
    $this->config = array_merge($defaults, $config);

    $this->connect();
}
/**
 * Opens the connection and channel and declares the queue.
 *
 * @throws RuntimeException When any step fails. The original exception is
 *                          attached as "previous" so the root cause and its
 *                          stack trace are not lost.
 */
private function connect()
{
    try {
        $this->connection = new AMQPStreamConnection(
            $this->config['host'],
            $this->config['port'],
            $this->config['user'],
            $this->config['password'],
            $this->config['vhost']
        );
        $this->channel = $this->connection->channel();

        // Arguments after the queue name: passive, durable, exclusive, auto_delete.
        $this->channel->queue_declare(
            $this->queueName,
            false,
            true,
            false,
            false
        );
        echo "连接成功\n";
    } catch (Exception $e) {
        throw new RuntimeException(
            "连接 RabbitMQ 失败: " . $e->getMessage(),
            0,
            $e
        );
    }
}
/**
 * Publishes a message to the queue.
 *
 * Strings are sent as-is; any other value is JSON-encoded first. If the
 * encoding fails, nothing is published and false is returned (previously a
 * `false` body was handed to AMQPMessage and success was reported).
 *
 * @param mixed $message String body, or a value to JSON-encode.
 * @param array $options Message properties merged over the defaults
 *                       (persistent delivery, application/json).
 *
 * @return bool True when the publish call succeeded, false on any failure.
 */
public function publish($message, array $options = [])
{
    $defaultOptions = [
        'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
        'content_type' => 'application/json'
    ];
    $options = array_merge($defaultOptions, $options);

    if (is_string($message)) {
        $body = $message;
    } else {
        $body = json_encode($message);
        if ($body === false) {
            error_log("发布消息失败: " . json_last_error_msg());
            return false;
        }
    }

    $msg = new AMQPMessage($body, $options);

    try {
        $this->channel->basic_publish($msg, '', $this->queueName);
        return true;
    } catch (Exception $e) {
        error_log("发布消息失败: " . $e->getMessage());
        return false;
    }
}
/**
 * Consumes messages from the queue and passes each one to $callback.
 *
 * A message is acknowledged after the callback returns; if the callback
 * throws an Exception the message is negatively acknowledged with requeue.
 * With $timeout > 0 the method returns once no message arrives within
 * that many seconds; with 0 it loops until the channel closes.
 *
 * The consumer is cancelled when the loop ends. Previously it stayed
 * registered on the channel after a timeout exit, so a later consume()
 * call added a second consumer alongside the stale one.
 *
 * @param callable  $callback Invoked with the delivered message.
 * @param int|float $timeout  Idle timeout in seconds, 0 to wait indefinitely.
 */
public function consume(callable $callback, $timeout = 0)
{
    $wrapper = function ($message) use ($callback) {
        try {
            $result = call_user_func($callback, $message);
            $message->ack();
            return $result;
        } catch (Exception $e) {
            error_log("处理消息失败: " . $e->getMessage());
            // NOTE(review): requeue=true redelivers a message that always
            // fails again and again; consider a dead-letter exchange.
            $message->nack(true);
        }
    };

    // Prefetch count of 1: at most one unacknowledged message at a time.
    $this->channel->basic_qos(null, 1, null);
    $consumerTag = $this->channel->basic_consume(
        $this->queueName,
        '',
        false,
        false,
        false,
        false,
        $wrapper
    );

    try {
        while ($this->channel->is_open()) {
            try {
                $this->channel->wait(null, false, $timeout ?: 0);
            } catch (PhpAmqpLib\Exception\AMQPTimeoutException $e) {
                if ($timeout > 0) {
                    break;
                }
            }
        }
    } finally {
        // Unregister the consumer so the broker stops delivering to it.
        if ($this->channel->is_open()) {
            try {
                $this->channel->basic_cancel($consumerTag);
            } catch (Exception $e) {
                error_log("取消消费者失败: " . $e->getMessage());
            }
        }
    }
}
/**
 * Closes the channel and then the connection, logging any failure.
 *
 * The two steps are guarded separately so that an error while closing the
 * channel no longer prevents the connection from being closed.
 */
public function close()
{
    if ($this->channel) {
        try {
            $this->channel->close();
        } catch (Exception $e) {
            error_log("关闭连接失败: " . $e->getMessage());
        }
    }

    if ($this->connection) {
        try {
            $this->connection->close();
        } catch (Exception $e) {
            error_log("关闭连接失败: " . $e->getMessage());
        }
    }
}
/**
 * Releases the channel and the connection when the object is destroyed.
 */
public function __destruct()
{
    $this->close();
}
}
连接池管理
php
<?php
class ConnectionPool
{
private static $connections = [];
private static $maxConnections = 10;
/**
 * Returns the pooled connection registered under $name, creating it on
 * first use.
 *
 * A cached connection that is no longer connected is discarded and
 * replaced, instead of being handed back to the caller. An optional
 * "vhost" setting (default "/") is supported.
 *
 * @param string $name   Pool key.
 * @param array  $config Optional overrides: host, port, user, password, vhost.
 *                       Only used when a new connection has to be created.
 *
 * @return AMQPStreamConnection
 *
 * @throws RuntimeException When the pool already holds the maximum number
 *                          of connections.
 */
public static function getConnection($name = 'default', array $config = [])
{
    // Drop a stale entry first so it neither gets returned nor counts
    // against the pool limit.
    if (isset(self::$connections[$name]) && !self::$connections[$name]->isConnected()) {
        unset(self::$connections[$name]);
    }

    if (!isset(self::$connections[$name])) {
        if (count(self::$connections) >= self::$maxConnections) {
            throw new RuntimeException("连接池已满");
        }

        $defaultConfig = [
            'host' => 'localhost',
            'port' => 5672,
            'user' => 'guest',
            'password' => 'guest',
            'vhost' => '/'
        ];
        $config = array_merge($defaultConfig, $config);

        self::$connections[$name] = new AMQPStreamConnection(
            $config['host'],
            $config['port'],
            $config['user'],
            $config['password'],
            $config['vhost']
        );
    }

    return self::$connections[$name];
}
/**
 * Closes every pooled connection and empties the pool.
 *
 * A failure to close one connection is logged and does not stop the
 * remaining connections from being closed.
 */
public static function closeAll()
{
    foreach (array_keys(self::$connections) as $poolKey) {
        try {
            self::$connections[$poolKey]->close();
        } catch (Exception $e) {
            error_log("关闭连接失败: " . $e->getMessage());
        }
    }

    self::$connections = [];
}
/**
 * Closes the connection registered under $name and removes it from the pool.
 *
 * Does nothing when no such connection exists. A failure while closing is
 * logged; the entry is removed either way.
 *
 * @param string $name Pool key.
 */
public static function close($name)
{
    if (!isset(self::$connections[$name])) {
        return;
    }

    $pooled = self::$connections[$name];

    try {
        $pooled->close();
    } catch (Exception $e) {
        error_log("关闭连接失败: " . $e->getMessage());
    }

    unset(self::$connections[$name]);
}
}
实际应用场景
1. 日志收集
php
<?php
/**
 * Publishes structured log entries to the "app_logs" queue.
 */
class LogCollector
{
    private $queue;

    /**
     * Creates the underlying queue wrapper for "app_logs".
     */
    public function __construct()
    {
        $this->queue = new RobustSimpleQueue('app_logs');
    }

    /**
     * Publishes one log entry.
     *
     * @param string $level   Severity label stored under "level".
     * @param string $message Log text.
     * @param array  $context Extra data stored under "context".
     */
    public function log($level, $message, array $context = [])
    {
        // The entry also records the local time and the host name.
        $this->queue->publish([
            'level' => $level,
            'message' => $message,
            'context' => $context,
            'timestamp' => date('Y-m-d H:i:s'),
            'hostname' => gethostname(),
        ]);
    }

    /** Logs $message with level "info". */
    public function info($message, array $context = [])
    {
        $this->log('info', $message, $context);
    }

    /** Logs $message with level "error". */
    public function error($message, array $context = [])
    {
        $this->log('error', $message, $context);
    }

    /** Logs $message with level "warning". */
    public function warning($message, array $context = [])
    {
        $this->log('warning', $message, $context);
    }
}
class LogProcessor
{
private $queue;
/**
 * Creates the queue wrapper for "app_logs", the queue this processor reads.
 */
public function __construct()
{
    $this->queue = new RobustSimpleQueue('app_logs');
}
/**
 * Prints a start-up banner and consumes log messages, decoding each body
 * as JSON and passing the result to processLog().
 */
public function start()
{
    echo "日志处理器启动...\n";

    $handler = function ($message) {
        $this->processLog(json_decode($message->body, true));
    };

    $this->queue->consume($handler);
}
/**
 * Prints one decoded log entry as "[timestamp] LEVEL: message".
 *
 * The entry comes from json_decode() and may be null or incomplete, so it
 * is validated first: a non-array entry is reported through error_log()
 * and skipped, and missing fields fall back to defaults instead of
 * raising PHP warnings.
 *
 * @param mixed $log Decoded log entry; expected keys: level, message, timestamp.
 */
private function processLog($log)
{
    if (!is_array($log)) {
        error_log("无效的日志消息");
        return;
    }

    $level = strtoupper((string) ($log['level'] ?? 'unknown'));

    echo sprintf(
        "[%s] %s: %s\n",
        $log['timestamp'] ?? '',
        $level,
        $log['message'] ?? ''
    );
}
}
2. 邮件发送队列
php
<?php
/**
 * Enqueues outgoing e-mails on the "email_queue" queue.
 */
class EmailQueue
{
    private $queue;

    /**
     * Creates the underlying queue wrapper for "email_queue".
     */
    public function __construct()
    {
        $this->queue = new RobustSimpleQueue('email_queue');
    }

    /**
     * Enqueues one e-mail.
     *
     * The confirmation line is printed only when the publish call reported
     * success; previously it was printed even when publishing had failed.
     *
     * @param string $to      Recipient address.
     * @param string $subject Subject line.
     * @param string $body    Message body.
     * @param array  $options Extra data stored under "options".
     *
     * @return bool True when the e-mail was queued, false otherwise.
     */
    public function send($to, $subject, $body, array $options = [])
    {
        $email = [
            'to' => $to,
            'subject' => $subject,
            'body' => $body,
            'options' => $options,
            'created_at' => time()
        ];

        if (!$this->queue->publish($email)) {
            return false;
        }

        echo "邮件已加入发送队列: {$to}\n";
        return true;
    }

    /**
     * Enqueues a template-based e-mail.
     *
     * The template name is used as the subject, the body is left empty and
     * the template name plus its data travel in "options".
     *
     * @param string $to       Recipient address.
     * @param string $template Template name.
     * @param array  $data     Template variables.
     *
     * @return bool True when the e-mail was queued, false otherwise.
     */
    public function sendTemplate($to, $template, array $data)
    {
        return $this->send($to, $template, '', [
            'template' => $template,
            'data' => $data
        ]);
    }
}
class EmailWorker
{
private $queue;
/**
 * Creates the queue wrapper for "email_queue", the queue this worker reads.
 */
public function __construct()
{
    $this->queue = new RobustSimpleQueue('email_queue');
}
/**
 * Prints a start-up banner and consumes queued e-mails, decoding each body
 * as JSON and passing the result to sendEmail().
 */
public function start()
{
    echo "邮件发送工作器启动...\n";

    $handler = function ($message) {
        $this->sendEmail(json_decode($message->body, true));
    };

    $this->queue->consume($handler);
}
/**
 * Handles one decoded e-mail job; this example only prints recipient and subject.
 *
 * The job comes from json_decode() and may be null or incomplete. A job
 * that is not an array or has no recipient is reported through error_log()
 * and skipped; a missing subject is printed as an empty string.
 *
 * @param mixed $email Decoded job; expected keys: to, subject.
 */
private function sendEmail($email)
{
    if (!is_array($email) || !isset($email['to'])) {
        error_log("无效的邮件消息");
        return;
    }

    echo sprintf(
        "发送邮件到: %s, 主题: %s\n",
        $email['to'],
        $email['subject'] ?? ''
    );
}
}
3. 文件处理队列
php
<?php
class FileProcessingQueue
{
private $queue;
/**
 * Creates the queue wrapper for "file_processing".
 */
public function __construct()
{
    $this->queue = new RobustSimpleQueue('file_processing');
}
/**
 * Enqueues a file-processing task and returns its identifier.
 *
 * Previously an identifier was returned even when publishing had failed,
 * so callers could end up tracking a task that was never queued. The id is
 * generated with uniqid()'s extra-entropy flag to reduce the chance of
 * collisions between ids created in quick succession.
 *
 * @param string $filePath  Path of the file to process.
 * @param string $operation Operation name, e.g. "resize" or "convert".
 * @param array  $params    Operation-specific parameters.
 *
 * @return string The generated task id (prefix "task_").
 *
 * @throws RuntimeException When the task could not be published.
 */
public function submit($filePath, $operation, array $params = [])
{
    $task = [
        'task_id' => uniqid('task_', true),
        'file_path' => $filePath,
        'operation' => $operation,
        'params' => $params,
        'submitted_at' => time()
    ];

    if ($this->queue->publish($task) === false) {
        throw new RuntimeException("提交任务失败: " . $task['task_id']);
    }

    return $task['task_id'];
}
/**
 * Enqueues a "resize" task for the given file.
 *
 * @param string $filePath Path of the file to resize.
 * @param mixed  $width    Stored under params "width".
 * @param mixed  $height   Stored under params "height".
 *
 * @return mixed Whatever submit() returns (the task id).
 */
public function resize($filePath, $width, $height)
{
    $dimensions = [
        'width' => $width,
        'height' => $height,
    ];

    return $this->submit($filePath, 'resize', $dimensions);
}
/**
 * Enqueues a "convert" task for the given file.
 *
 * @param string $filePath Path of the file to convert.
 * @param mixed  $format   Stored under params "format".
 *
 * @return mixed Whatever submit() returns (the task id).
 */
public function convert($filePath, $format)
{
    $conversion = ['format' => $format];

    return $this->submit($filePath, 'convert', $conversion);
}
}
常见问题与解决方案
问题 1:消息丢失
原因:消息未持久化或消费者未确认
解决方案:
php
<?php
class ReliableQueue
{
/**
 * Publishes a persistent message and waits for the broker's confirmation.
 *
 * Three gaps in the previous version are closed:
 *  - a negative acknowledgement from the broker is now detected through a
 *    nack handler, so it no longer counts as success;
 *  - a JSON encoding failure returns false instead of publishing `false`;
 *  - an optional timeout bounds the wait (0 keeps the old unbounded wait).
 *
 * NOTE(review): this replaces any nack handler already set on the channel.
 *
 * @param mixed     $message Value to JSON-encode and publish.
 * @param int|float $timeout Seconds to wait for confirmation, 0 = no limit.
 *
 * @return bool True only when the broker confirmed the message.
 */
public function publishWithConfirm($message, $timeout = 0)
{
    $body = json_encode($message);
    if ($body === false) {
        return false;
    }

    $nacked = false;

    $this->channel->confirm_select();
    $this->channel->set_nack_handler(function () use (&$nacked) {
        $nacked = true;
    });

    $msg = new AMQPMessage(
        $body,
        ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
    );
    $this->channel->basic_publish($msg, '', $this->queueName);

    try {
        $this->channel->wait_for_pending_acks($timeout);
        return !$nacked;
    } catch (Exception $e) {
        return false;
    }
}
}
问题 2:消费者处理慢
原因:单消费者处理能力有限
解决方案:
php
<?php
class BatchConsumer
{
private $batchSize = 10;
private $batchTimeout = 5;
private $batch = [];
/**
 * Consumes messages and processes them in batches.
 *
 * A batch is flushed when it reaches $batchSize messages or when
 * $batchTimeout seconds have passed since the last flush. The wait is now
 * bounded by $batchTimeout: previously the age check ran only when a new
 * message arrived, so a partial batch stayed unprocessed and
 * unacknowledged for as long as the queue was idle.
 */
public function consume()
{
    $lastProcess = time();

    $callback = function ($message) use (&$lastProcess) {
        $this->batch[] = $message;

        if (count($this->batch) >= $this->batchSize ||
            time() - $lastProcess >= $this->batchTimeout) {
            $this->processBatch();
            $lastProcess = time();
        }
    };

    $this->channel->basic_consume(
        $this->queueName,
        '',
        false,
        false,
        false,
        false,
        $callback
    );

    while ($this->channel->is_open()) {
        try {
            $this->channel->wait(null, false, $this->batchTimeout);
        } catch (PhpAmqpLib\Exception\AMQPTimeoutException $e) {
            // Idle period: flush whatever has accumulated so far.
            $this->processBatch();
            $lastProcess = time();
        }
    }
}
/**
 * Processes and acknowledges every message currently in the batch.
 *
 * Each message is removed from the batch before it is handled. Previously
 * the batch was cleared only after the whole loop, so an exception thrown
 * midway left already-acknowledged messages in place and the next flush
 * processed and acknowledged them a second time.
 *
 * A message whose processing throws is logged and negatively acknowledged
 * with requeue; the remaining messages are still processed.
 */
private function processBatch()
{
    while (!empty($this->batch)) {
        $message = array_shift($this->batch);

        try {
            $this->process($message);
            $message->ack();
        } catch (Exception $e) {
            error_log("处理消息失败: " . $e->getMessage());
            $message->nack(true);
        }
    }
}
/**
 * Handles a single message of a batch; intentionally left empty as the
 * place for application-specific logic.
 *
 * @param mixed $message The delivered message.
 */
private function process($message)
{
    // Processing logic goes here.
}
}
问题 3:连接断开
原因:网络问题或服务器重启
解决方案:
php
<?php
class ReconnectingQueue
{
private $config;
private $connection;
private $channel;
private $maxRetries = 5;
private $retryDelay = 1;
/**
 * Stores the connection settings and connects immediately.
 *
 * @param array $config Connection settings; connect() reads the keys
 *                      host, port, user and password.
 */
public function __construct(array $config)
{
    $this->config = $config;

    $this->connect();
}
/**
 * Opens the connection and a channel, retrying up to $maxRetries times.
 *
 * The delay grows linearly with the attempt number. No delay is added
 * after the final attempt (previously the method slept once more before
 * giving up), and the last failure is attached to the thrown exception as
 * "previous" so the cause is not lost.
 *
 * @throws RuntimeException When every attempt failed.
 */
private function connect()
{
    $lastError = null;

    for ($attempt = 1; $attempt <= $this->maxRetries; $attempt++) {
        try {
            $this->connection = new AMQPStreamConnection(
                $this->config['host'],
                $this->config['port'],
                $this->config['user'],
                $this->config['password']
            );
            $this->channel = $this->connection->channel();
            echo "连接成功\n";
            return;
        } catch (Exception $e) {
            $lastError = $e;
            echo "连接失败,尝试 {$attempt}/{$this->maxRetries}\n";

            if ($attempt < $this->maxRetries) {
                sleep($this->retryDelay * $attempt);
            }
        }
    }

    throw new RuntimeException("无法连接到 RabbitMQ", 0, $lastError);
}
/**
 * Publishes a JSON-encoded message, reconnecting and retrying on failure.
 *
 * Changes from the previous version:
 *  - after a failed attempt the connection is dropped, so the next attempt
 *    always reconnects and gets a fresh channel rather than reusing a
 *    connection that still reports itself as connected;
 *  - each failure is logged instead of being swallowed silently;
 *  - no delay is added after the final attempt;
 *  - a JSON encoding failure returns false at once, since retrying cannot help;
 *  - the message is marked persistent, like the other publishers in this file.
 *
 * @param mixed $message Value to JSON-encode and publish.
 *
 * @return bool True once a publish call succeeded, false when all attempts failed.
 */
public function publishWithRetry($message)
{
    $body = json_encode($message);
    if ($body === false) {
        error_log("发布消息失败: " . json_last_error_msg());
        return false;
    }

    $attempts = 0;

    while ($attempts < $this->maxRetries) {
        try {
            if (!$this->connection || !$this->connection->isConnected()) {
                $this->connect();
            }

            $msg = new AMQPMessage(
                $body,
                ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
            );
            $this->channel->basic_publish($msg, '', $this->queueName);
            return true;
        } catch (Exception $e) {
            $attempts++;
            error_log("发布消息失败: " . $e->getMessage());

            // Force a reconnect (and a new channel) on the next attempt.
            $this->connection = null;

            if ($attempts < $this->maxRetries) {
                sleep($this->retryDelay);
            }
        }
    }

    return false;
}
}
最佳实践建议
1. 消息格式标准化
php
<?php
class StandardMessage
{
/**
 * Builds a message envelope in the standard format.
 *
 * @param string $type    Message type.
 * @param mixed  $payload Message payload.
 * @param array  $meta    Extra metadata; entries override the generated
 *                        "timestamp" and "source" values.
 *
 * @return array Envelope with the keys id, type, payload, meta, version.
 */
public static function create($type, $payload, array $meta = [])
{
    $envelope = [];
    $envelope['id'] = uniqid('msg_', true);
    $envelope['type'] = $type;
    $envelope['payload'] = $payload;

    $generatedMeta = [
        'timestamp' => time(),
        'source' => gethostname(),
    ];
    $envelope['meta'] = array_merge($generatedMeta, $meta);
    $envelope['version'] = '1.0';

    return $envelope;
}
/**
 * Decodes a JSON message body into an array.
 *
 * Besides syntactically invalid JSON, valid JSON that does not decode to
 * an array (for example "null", "123" or a quoted string) is now rejected;
 * previously such a value was returned as if it were a message.
 *
 * @param string $body Raw JSON text.
 *
 * @return array The decoded message.
 *
 * @throws InvalidArgumentException When the body is not valid JSON or does
 *                                  not decode to an array.
 */
public static function parse($body)
{
    $data = json_decode($body, true);

    if (json_last_error() !== JSON_ERROR_NONE) {
        throw new InvalidArgumentException("无效的 JSON 格式");
    }

    if (!is_array($data)) {
        throw new InvalidArgumentException("消息必须是 JSON 对象");
    }

    return $data;
}
}
2. 监控和指标
php
<?php
class QueueMonitor
{
private $stats = [];
/**
 * Counts one publish attempt for the given queue.
 *
 * @param string $queueName Queue the message was published to.
 * @param bool   $success   Whether the attempt succeeded.
 */
public function recordPublish($queueName, $success = true)
{
    $this->increment($queueName, 'published', $success);
}
/**
 * Counts one consume attempt for the given queue.
 *
 * @param string $queueName Queue the message was consumed from.
 * @param bool   $success   Whether the attempt succeeded.
 */
public function recordConsume($queueName, $success = true)
{
    $this->increment($queueName, 'consumed', $success);
}
/**
 * Bumps the counters of one queue for one operation type.
 *
 * The queue's counters are created on first use. "total" always goes up,
 * together with either "success" or "failed".
 *
 * @param string $queueName Queue whose statistics are updated.
 * @param string $type      Counter group: "published" or "consumed".
 * @param bool   $success   Selects the "success" or the "failed" counter.
 */
private function increment($queueName, $type, $success)
{
    if (!isset($this->stats[$queueName])) {
        $zeroed = ['total' => 0, 'success' => 0, 'failed' => 0];

        $this->stats[$queueName] = [
            'published' => $zeroed,
            'consumed' => $zeroed,
        ];
    }

    $outcome = $success ? 'success' : 'failed';

    $this->stats[$queueName][$type]['total']++;
    $this->stats[$queueName][$type][$outcome]++;
}
/**
 * Returns all collected counters, keyed by queue name.
 *
 * @return array Empty when nothing has been recorded yet.
 */
public function getStats()
{
    return $this->stats;
}
}
3. 优雅关闭
php
<?php
/**
 * Consumer loop that stops cleanly when SIGTERM or SIGINT is received.
 */
class GracefulConsumer
{
    private $running = true;

    /**
     * Installs the signal handlers and waits for messages until shutdown()
     * has been called or the channel is closed.
     */
    public function consume()
    {
        pcntl_async_signals(true);

        foreach ([SIGTERM, SIGINT] as $signal) {
            pcntl_signal($signal, [$this, 'shutdown']);
        }

        while ($this->running && $this->channel->is_open()) {
            try {
                // One-second wait so the running flag is re-checked regularly.
                $this->channel->wait(null, false, 1);
            } catch (PhpAmqpLib\Exception\AMQPTimeoutException $e) {
                // No message within the interval; loop and check again.
            }
        }

        echo "消费者已关闭\n";
    }

    /**
     * Signal handler: announces the shutdown and clears the running flag so
     * the loop in consume() ends.
     */
    public function shutdown()
    {
        echo "收到关闭信号,正在停止...\n";
        $this->running = false;
    }
}