Appearance
消息分组
概述
消息分组(Message Grouping)是 RabbitMQ 中一种将相关消息组织在一起处理的机制。通过消息分组,可以确保同一组的消息被同一个消费者按顺序处理,这对于需要保持消息顺序性的业务场景非常重要。
核心知识点
什么是消息分组
消息分组的核心思想是:
- 组内顺序:同一组的消息按发送顺序被消费
- 组间并行:不同组的消息可以并行处理
- 亲和性:同一组的消息总是路由到同一个队列分区
mermaid
graph TB
subgraph 生产者
P1[消息组 A]
P2[消息组 B]
P3[消息组 C]
end
subgraph RabbitMQ
Q1[队列分区 1]
Q2[队列分区 2]
Q3[队列分区 3]
end
subgraph 消费者
C1[消费者 1]
C2[消费者 2]
C3[消费者 3]
end
P1 --> Q1
P2 --> Q2
P3 --> Q3
Q1 --> C1
Q2 --> C2
Q3 --> C3
style P1 fill:#e1f5fe
style P2 fill:#fff3e0
style P3 fill:#e8f5e9
实现方式
RabbitMQ 提供三种主要的消息分组实现方式:
| 方式 | 说明 | 适用场景 |
|---|---|---|
| Consistent Hash Exchange | 一致性哈希交换器 | 需要均匀分布的场景 |
| Message Group Plugin | 消息分组插件 | 需要严格顺序保证的场景 |
| 自定义路由 | 基于 routing key 分组 | 简单分组场景 |
工作原理
mermaid
sequenceDiagram
participant P as 生产者
participant E as 分组交换器
participant Q1 as 队列1
participant Q2 as 队列2
participant C1 as 消费者1
participant C2 as 消费者2
Note over P: 发送消息组 A-1
P->>E: 消息(group_id=A)
E->>Q1: 路由到队列1
Q1->>C1: 消费
Note over P: 发送消息组 A-2
P->>E: 消息(group_id=A)
E->>Q1: 路由到队列1(同一组)
Q1->>C1: 按顺序消费
Note over P: 发送消息组 B-1
P->>E: 消息(group_id=B)
E->>Q2: 路由到队列2
Q2->>C2: 并行消费
PHP 代码示例
基于一致性哈希的消息分组
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
/**
 * Publishes JSON-encoded messages to the "consistent_hash_exchange" exchange.
 *
 * The group id is passed as the routing key of every publish, so all messages
 * of one group carry the same routing key.
 */
class ConsistentHashPublisher
{
    private $connection;
    private $channel;
    private $exchangeName = 'consistent_hash_exchange';

    /**
     * Connects to localhost:5672 as guest/guest, opens a channel and declares
     * the exchange with type "x-consistent-hash".
     */
    public function __construct()
    {
        $this->connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $this->channel = $this->connection->channel();

        // php-amqplib positional flags: passive = false, durable = true,
        // auto_delete = false.
        $this->channel->exchange_declare(
            $this->exchangeName,
            'x-consistent-hash',
            false,
            true,
            false
        );
    }

    /**
     * Wraps the payload in an envelope (group_id, data, timestamp) and
     * publishes it with the group id as routing key.
     *
     * @param mixed $groupId Group identifier, also used as the routing key.
     * @param mixed $message Payload stored under the "data" key; must be JSON-encodable.
     */
    public function publish($groupId, $message)
    {
        $envelope = [
            'group_id' => $groupId,
            'data' => $message,
            'timestamp' => time(),
        ];
        $properties = ['content_type' => 'application/json'];
        $amqpMessage = new AMQPMessage(json_encode($envelope), $properties);

        $this->channel->basic_publish($amqpMessage, $this->exchangeName, $groupId);

        echo "消息已发送到组: {$groupId}\n";
    }

    /**
     * Closes the channel first, then the underlying connection.
     */
    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}
分组队列消费者
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
/**
 * Worker that consumes the queue "group_queue_{workerId}" bound to the
 * "consistent_hash_exchange" exchange.
 */
class GroupedQueueConsumer
{
    private $connection;
    private $channel;
    private $exchangeName = 'consistent_hash_exchange';
    private $queueName;

    /**
     * Connects to localhost:5672 as guest/guest and prepares the worker queue.
     *
     * @param int|string $workerId Suffix used to build this worker's queue name.
     */
    public function __construct($workerId)
    {
        $this->queueName = "group_queue_{$workerId}";
        $this->connection = new AMQPStreamConnection(
            'localhost',
            5672,
            'guest',
            'guest'
        );
        $this->channel = $this->connection->channel();
        $this->setupQueue();
    }

    /**
     * Declares the exchange and the queue, then binds them together.
     */
    private function setupQueue()
    {
        // Declare the exchange here as well: binding to an exchange that does
        // not exist yet fails, which happened whenever a consumer was started
        // before the first publisher. The arguments mirror the publisher's
        // declaration so the redeclaration is accepted by the broker.
        $this->channel->exchange_declare(
            $this->exchangeName,
            'x-consistent-hash',
            false,
            true,
            false
        );
        $this->channel->queue_declare(
            $this->queueName,
            false,
            true,
            false,
            false
        );
        // For a consistent-hash exchange the binding key is the queue's
        // weight on the hash ring, not a routing pattern.
        $this->channel->queue_bind(
            $this->queueName,
            $this->exchangeName,
            '1'
        );
    }

    /**
     * Blocks while the channel is open, printing and acknowledging each
     * message. Bodies that are not a JSON object with a "group_id" key are
     * rejected without requeue instead of being indexed as if they were valid.
     */
    public function consume()
    {
        $callback = function (AMQPMessage $message) {
            $body = json_decode($message->body, true);
            if (!is_array($body) || !isset($body['group_id'])) {
                echo "消息格式无效,已拒绝\n";
                // requeue = false: redelivering an unparsable body would loop forever.
                $message->reject(false);
                return;
            }
            echo sprintf(
                "[%s] 处理消息组: %s, 数据: %s\n",
                date('Y-m-d H:i:s'),
                $body['group_id'],
                json_encode($body['data'] ?? null, JSON_UNESCAPED_UNICODE)
            );
            $message->ack();
        };

        // Prefetch of 1: the broker hands over the next message only after
        // the current one has been acknowledged or rejected.
        $this->channel->basic_qos(null, 1, null);
        $this->channel->basic_consume(
            $this->queueName,
            '',
            false,
            false,
            false,
            false,
            $callback
        );
        echo "消费者已启动,监听队列: {$this->queueName}\n";
        while ($this->channel->is_open()) {
            $this->channel->wait();
        }
    }

    /**
     * Closes the channel first, then the underlying connection.
     */
    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}
基于 Routing Key 的简单分组
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
/**
 * Publishes grouped messages to the direct exchange "group_exchange", using
 * the group id as routing key.
 */
class SimpleGroupPublisher
{
    private $connection;
    private $channel;
    private $exchangeName = 'group_exchange';

    /**
     * Connects to localhost:5672 as guest/guest and declares the direct exchange.
     */
    public function __construct()
    {
        $this->connection = new AMQPStreamConnection(
            'localhost',
            5672,
            'guest',
            'guest'
        );
        $this->channel = $this->connection->channel();
        $this->channel->exchange_declare(
            $this->exchangeName,
            'direct',
            false,
            true,
            false
        );
    }

    /**
     * Publishes one message to a group.
     *
     * The JSON body contains group_id, data and timestamp, plus "sequence"
     * when a sequence number is given. The group id is also sent as an AMQP
     * application header.
     *
     * @param mixed    $groupId        Group identifier; used as routing key and header value.
     * @param mixed    $message        Payload stored under the "data" key; must be JSON-encodable.
     * @param int|null $sequenceNumber Optional position of the message inside its group.
     */
    public function publishToGroup($groupId, $message, $sequenceNumber = null)
    {
        $msgData = [
            'group_id' => $groupId,
            'data' => $message,
            'timestamp' => time()
        ];
        if ($sequenceNumber !== null) {
            $msgData['sequence'] = $sequenceNumber;
        }
        $msg = new AMQPMessage(
            json_encode($msgData),
            [
                'content_type' => 'application/json',
                // php-amqplib only keeps known basic-property names, so a
                // plain 'headers' array is silently discarded. Custom headers
                // go in 'application_headers' as an AMQPTable. The class name
                // is fully qualified because this snippet does not import it.
                'application_headers' => new \PhpAmqpLib\Wire\AMQPTable([
                    'group_id' => $groupId
                ])
            ]
        );
        $this->channel->basic_publish(
            $msg,
            $this->exchangeName,
            $groupId
        );
        echo "消息已发送到组: {$groupId}\n";
    }

    /**
     * Closes the channel first, then the underlying connection.
     */
    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}
分组消息处理器
php
<?php
/**
 * Consumes the queue of a single group from the direct exchange
 * "group_exchange" and executes sequenced messages in ascending order.
 *
 * Ordering state ($lastSequence, $pendingMessages) lives only in memory and
 * starts at 0 for every new instance.
 */
class GroupMessageProcessor
{
    private $connection;
    private $channel;
    private $exchangeName = 'group_exchange';
    private $groupId;
    private $queueName;
    private $lastSequence = 0;
    private $pendingMessages = [];

    /**
     * Connects to localhost:5672 as guest/guest and prepares the group queue.
     *
     * @param int|string $groupId Group to consume; also the binding key and queue-name suffix.
     */
    public function __construct($groupId)
    {
        $this->groupId = $groupId;
        $this->queueName = "group_queue_{$groupId}";
        // Fully qualified: this snippet has no `use` imports, so the short
        // class name would resolve to a non-existent global class.
        $this->connection = new \PhpAmqpLib\Connection\AMQPStreamConnection(
            'localhost',
            5672,
            'guest',
            'guest'
        );
        $this->channel = $this->connection->channel();
        $this->setup();
    }

    /**
     * Declares the exchange and the group queue and binds the queue with the
     * group id as binding key.
     */
    private function setup()
    {
        $this->channel->exchange_declare(
            $this->exchangeName,
            'direct',
            false,
            true,
            false
        );
        $this->channel->queue_declare(
            $this->queueName,
            false,
            true,
            false,
            false
        );
        $this->channel->queue_bind(
            $this->queueName,
            $this->exchangeName,
            $this->groupId
        );
    }

    /**
     * Blocks while the channel is open and dispatches each delivery to the
     * ordered or unordered path depending on whether the body has a "sequence".
     */
    public function consume()
    {
        $callback = function (\PhpAmqpLib\Message\AMQPMessage $message) {
            $body = json_decode($message->body, true);
            if (!is_array($body)) {
                echo "消息格式无效,已拒绝\n";
                // requeue = false: an unparsable body would be redelivered forever.
                $message->reject(false);
                return;
            }
            $sequence = $body['sequence'] ?? null;
            if ($sequence !== null) {
                // Cast so a numeric string still satisfies the strict
                // comparison in processOrdered().
                $this->processOrdered($message, $body, (int) $sequence);
            } else {
                $this->processUnordered($message, $body);
            }
        };
        $this->channel->basic_qos(null, 1, null);
        $this->channel->basic_consume(
            $this->queueName,
            '',
            false,
            false,
            false,
            false,
            $callback
        );
        echo "分组消费者已启动,组ID: {$this->groupId}\n";
        while ($this->channel->is_open()) {
            $this->channel->wait();
        }
    }

    /**
     * Executes the message if it is the next expected sequence, buffers it if
     * it is ahead, and drops it if that sequence was already executed.
     *
     * The message is acknowledged in every case. Buffered messages are
     * therefore held only in process memory after being acked.
     *
     * @param object $message  Delivery object exposing ack().
     * @param array  $body     Decoded JSON body.
     * @param int    $sequence Sequence number of the message inside its group.
     */
    private function processOrdered($message, $body, $sequence)
    {
        if ($sequence <= $this->lastSequence) {
            // Duplicate or stale delivery: buffering it would leak memory
            // because the drain loop only looks at lastSequence + 1.
            $message->ack();
            return;
        }
        if ($sequence === $this->lastSequence + 1) {
            $this->executeMessage($body);
            $this->lastSequence = $sequence;
            $this->processPendingMessages();
        } else {
            $this->pendingMessages[$sequence] = ['body' => $body];
        }
        $message->ack();
    }

    /**
     * Executes a message without a sequence number immediately and acks it.
     */
    private function processUnordered($message, $body)
    {
        $this->executeMessage($body);
        $message->ack();
    }

    /**
     * Drains buffered messages for as long as the next expected sequence is present.
     */
    private function processPendingMessages()
    {
        while (isset($this->pendingMessages[$this->lastSequence + 1])) {
            $next = $this->pendingMessages[$this->lastSequence + 1];
            $this->executeMessage($next['body']);
            $this->lastSequence++;
            unset($this->pendingMessages[$this->lastSequence]);
        }
    }

    /**
     * Prints one line describing the message being processed.
     */
    private function executeMessage($body)
    {
        echo sprintf(
            "[%s] 处理消息 - 组: %s, 序号: %s, 数据: %s\n",
            date('Y-m-d H:i:s'),
            $body['group_id'] ?? $this->groupId,
            $body['sequence'] ?? 'N/A',
            json_encode($body['data'] ?? null, JSON_UNESCAPED_UNICODE)
        );
    }

    /**
     * Closes the channel first, then the underlying connection.
     */
    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}
完整示例:订单处理分组
php
<?php
/**
 * Publishes the processing steps of an order as a numbered message sequence
 * inside one group.
 */
class OrderGroupProcessor
{
    private $publisher;
    private $groupId;

    /**
     * @param mixed $groupId Group that every step message is published to.
     */
    public function __construct($groupId)
    {
        $this->publisher = new SimpleGroupPublisher();
        $this->groupId = $groupId;
    }

    /**
     * Publishes one message per step, numbered from 1 in iteration order.
     *
     * @param mixed    $orderId Order identifier copied into every message.
     * @param iterable $steps   Step names in the order they must run.
     */
    public function processOrder($orderId, $steps)
    {
        $position = 0;
        foreach ($steps as $stepName) {
            $payload = [
                'order_id' => $orderId,
                'step' => $stepName,
                'action' => $this->getActionForStep($stepName),
            ];
            $position++;
            $this->publisher->publishToGroup($this->groupId, $payload, $position);
        }
    }

    /**
     * Maps a step name to its description; unknown steps are returned as-is.
     */
    private function getActionForStep($step)
    {
        $descriptions = [
            'validate' => '验证订单信息',
            'inventory' => '检查库存',
            'payment' => '处理支付',
            'shipping' => '创建物流单',
            'notification' => '发送通知',
        ];

        return isset($descriptions[$step]) ? $descriptions[$step] : $step;
    }

    /**
     * Closes the underlying publisher.
     */
    public function close()
    {
        $this->publisher->close();
    }
}
/**
 * Thin wrapper that consumes one order group through a GroupMessageProcessor.
 */
class OrderGroupConsumer
{
    private $processor;

    /**
     * @param mixed $groupId Group whose messages should be consumed.
     */
    public function __construct($groupId)
    {
        $this->processor = new GroupMessageProcessor($groupId);
    }

    /**
     * Delegates to the processor's consume() and returns when it returns.
     */
    public function start()
    {
        $processor = $this->processor;
        $processor->consume();
    }

    /**
     * Delegates to the processor's close().
     */
    public function close()
    {
        $processor = $this->processor;
        $processor->close();
    }
}
// Demo: publish the five steps of order ORD-001 to the group "order_group_1",
// then release the publisher.
$orderSteps = ['validate', 'inventory', 'payment', 'shipping', 'notification'];

$orderProcessor = new OrderGroupProcessor('order_group_1');
$orderProcessor->processOrder('ORD-001', $orderSteps);
$orderProcessor->close();
实际应用场景
1. 用户操作顺序处理
php
<?php
/**
 * Publishes user actions to a per-user group ("user_{id}") with a per-user
 * sequence counter that starts at 1.
 *
 * The counters are kept in this object's memory only.
 */
class UserActionGrouper
{
    private $publisher;
    private $users = [];

    public function __construct()
    {
        $this->publisher = new SimpleGroupPublisher();
    }

    /**
     * Records one action for a user and publishes it with the next sequence
     * number of that user.
     *
     * @param int|string $userId User identifier; determines the group.
     * @param string     $action Action name.
     * @param array      $data   Extra data attached to the action.
     */
    public function recordAction($userId, $action, $data = [])
    {
        $nextSequence = ($this->users[$userId] ?? 0) + 1;
        $this->users[$userId] = $nextSequence;

        $payload = [
            'user_id' => $userId,
            'action' => $action,
            'data' => $data,
        ];
        $this->publisher->publishToGroup("user_{$userId}", $payload, $nextSequence);
    }

    /**
     * Closes the underlying publisher.
     */
    public function close()
    {
        $this->publisher->close();
    }
}
// Demo: record four consecutive actions of user 1001, then release the publisher.
$grouper = new UserActionGrouper();
$userActions = [
    ['login', ['ip' => '192.168.1.1']],
    ['view_product', ['product_id' => 123]],
    ['add_to_cart', ['product_id' => 123, 'qty' => 2]],
    ['checkout', ['cart_id' => 'CART-001']],
];
foreach ($userActions as list($action, $data)) {
    $grouper->recordAction(1001, $action, $data);
}
$grouper->close();
2. 分布式任务编排
php
<?php
/**
 * Publishes a list of dependent tasks as one ordered message group.
 */
class TaskOrchestrator
{
    private $publisher;
    private $taskGroups = [];

    public function __construct()
    {
        $this->publisher = new SimpleGroupPublisher();
    }

    /**
     * Remembers the task list under the group id and publishes one message
     * per task, numbered 1..n in iteration order.
     *
     * @param mixed $groupId Group the tasks are published to.
     * @param array $tasks   Task definitions; each needs a "name" and may
     *                       carry "params" and "depends_on".
     */
    public function createTaskGroup($groupId, $tasks)
    {
        $this->taskGroups[$groupId] = $tasks;

        // Count positions independently of the array keys: deriving the
        // sequence from the key produced gaps for sparse arrays and failed
        // for string keys, leaving a strictly ordered consumer waiting for a
        // sequence number that never arrives.
        $sequence = 0;
        foreach ($tasks as $task) {
            $sequence++;
            $this->publisher->publishToGroup($groupId, [
                'task_id' => uniqid(),
                'task_name' => $task['name'],
                'task_params' => $task['params'] ?? [],
                'depends_on' => $task['depends_on'] ?? []
            ], $sequence);
        }
    }

    /**
     * Closes the underlying publisher.
     */
    public function close()
    {
        $this->publisher->close();
    }
}
// Demo: a five-stage import pipeline in which each task depends on the
// previous one, published as the group "batch_import_001".
$importTasks = [
    ['name' => 'download', 'params' => ['url' => 'http://example.com/data.csv']],
    ['name' => 'parse', 'params' => ['format' => 'csv'], 'depends_on' => ['download']],
    ['name' => 'validate', 'params' => [], 'depends_on' => ['parse']],
    ['name' => 'import', 'params' => ['batch_size' => 1000], 'depends_on' => ['validate']],
    ['name' => 'cleanup', 'params' => [], 'depends_on' => ['import']],
];

$orchestrator = new TaskOrchestrator();
$orchestrator->createTaskGroup('batch_import_001', $importTasks);
$orchestrator->close();
3. 会话消息处理
php
<?php
/**
 * Publishes chat messages to a per-session group ("session_{id}") with a
 * per-session sequence counter that starts at 1.
 *
 * The counters are kept in this object's memory only.
 */
class SessionMessageGrouper
{
    private $publisher;
    private $sessions = [];

    public function __construct()
    {
        $this->publisher = new SimpleGroupPublisher();
    }

    /**
     * Publishes one chat message with the next sequence number of its session.
     *
     * @param int|string $sessionId Session identifier; determines the group.
     * @param mixed      $from      Sender of the message.
     * @param mixed      $content   Message text.
     */
    public function sendMessage($sessionId, $from, $content)
    {
        $nextSequence = ($this->sessions[$sessionId] ?? 0) + 1;
        $this->sessions[$sessionId] = $nextSequence;

        $payload = [
            'session_id' => $sessionId,
            'from' => $from,
            'content' => $content,
            'message_id' => uniqid('msg_'),
        ];
        $this->publisher->publishToGroup("session_{$sessionId}", $payload, $nextSequence);
    }

    /**
     * Closes the underlying publisher.
     */
    public function close()
    {
        $this->publisher->close();
    }
}
// Demo: a three-message exchange inside session "sess_001", then release the publisher.
$sessionGrouper = new SessionMessageGrouper();
$conversation = [
    ['user_a', '你好'],
    ['user_b', '你好,有什么可以帮助你的?'],
    ['user_a', '我想咨询一下产品信息'],
];
foreach ($conversation as list($from, $content)) {
    $sessionGrouper->sendMessage('sess_001', $from, $content);
}
$sessionGrouper->close();
常见问题与解决方案
问题 1:消息顺序错乱
原因:网络延迟或消费者处理速度不一致
解决方案:
php
<?php
/**
 * Re-orders messages by their "sequence" field using a bounded in-memory buffer.
 *
 * Messages are processed strictly in ascending sequence starting at 1.
 * Messages that arrive early are buffered, and messages that are not ahead of
 * the expected sequence are ignored.
 */
class OrderedMessageHandler
{
    private $expectedSequence = 1;
    private $buffer = [];
    private $bufferSize = 100;

    /**
     * Handles one incoming message.
     *
     * @param array $message Decoded message containing a "sequence" key.
     *
     * @throws RuntimeException When an early message arrives while the buffer
     *                          already holds $bufferSize entries.
     */
    public function handle($message)
    {
        $incoming = $message['sequence'];

        if ($incoming === $this->expectedSequence) {
            $this->process($message);
            $this->expectedSequence++;
            $this->flushBuffer();
            return;
        }

        $isAhead = $incoming > $this->expectedSequence;
        if (!$isAhead) {
            // Neither the expected message nor a future one: nothing to do.
            return;
        }

        if (count($this->buffer) >= $this->bufferSize) {
            throw new RuntimeException("缓冲区已满,可能存在消息丢失");
        }
        $this->buffer[$incoming] = $message;
    }

    /**
     * Processes buffered messages for as long as the next expected sequence
     * is available.
     */
    private function flushBuffer()
    {
        while (true) {
            $next = $this->expectedSequence;
            if (!isset($this->buffer[$next])) {
                break;
            }
            $this->process($this->buffer[$next]);
            unset($this->buffer[$next]);
            $this->expectedSequence = $next + 1;
        }
    }

    /**
     * Prints the sequence number of the message being processed.
     */
    private function process($message)
    {
        echo "处理消息序号: {$message['sequence']}\n";
    }
}
问题 2:分组负载不均衡
原因:某些组消息量过大
解决方案:
php
<?php
/**
 * Spreads the messages of one logical group over numbered sub-groups
 * ("{base}_0", "{base}_1", ...) once a sub-group has received $maxGroupSize
 * messages from this publisher instance.
 *
 * The per-sub-group counters are kept in this object's memory only.
 */
class BalancedGroupPublisher
{
    private $publisher;
    private $groupStats = [];
    private $maxGroupSize = 1000;
    private $maxPartitions = 100;

    /**
     * The original class never assigned $publisher, so publish() always
     * failed with a call on null. Both arguments are optional to keep
     * `new BalancedGroupPublisher()` working.
     *
     * @param object|null $publisher    Object exposing publishToGroup() and close();
     *                                  a SimpleGroupPublisher is created when omitted.
     * @param int         $maxGroupSize Messages allowed per sub-group before moving on.
     */
    public function __construct($publisher = null, $maxGroupSize = 1000)
    {
        $this->publisher = $publisher !== null ? $publisher : new SimpleGroupPublisher();
        $this->maxGroupSize = $maxGroupSize;
    }

    /**
     * Publishes the message to the first sub-group that still has capacity
     * and counts it against that sub-group.
     *
     * @param mixed $baseGroupId Logical group identifier.
     * @param mixed $message     Payload forwarded to the publisher.
     */
    public function publish($baseGroupId, $message)
    {
        $groupId = $this->getBalancedGroupId($baseGroupId);
        $this->publisher->publishToGroup($groupId, $message);
        // Counted only after the publish call returned.
        $this->groupStats[$groupId] = ($this->groupStats[$groupId] ?? 0) + 1;
    }

    /**
     * Returns the first of the $maxPartitions sub-groups that is below
     * $maxGroupSize. When all of them are full, the overflow sub-group
     * "{base}_{maxPartitions}" is returned.
     */
    private function getBalancedGroupId($baseGroupId)
    {
        for ($partition = 0; $partition < $this->maxPartitions; $partition++) {
            $groupId = "{$baseGroupId}_{$partition}";
            if (($this->groupStats[$groupId] ?? 0) < $this->maxGroupSize) {
                return $groupId;
            }
        }
        // $partition equals $maxPartitions here.
        return "{$baseGroupId}_{$partition}";
    }

    /**
     * Closes the underlying publisher.
     */
    public function close()
    {
        $this->publisher->close();
    }
}
问题 3:消费者故障导致消息阻塞
原因:单消费者处理整个组
解决方案:
php
<?php
/**
 * Processes sequenced messages of one group and stores the last processed
 * sequence in Redis under "group:{groupId}:last_sequence", so that messages
 * at or below it are skipped.
 */
class FaultTolerantGroupConsumer
{
    private $groupId;
    private $lastProcessedSequence = 0;
    private $storage;

    /**
     * Connects to Redis at 127.0.0.1:6379 and loads the stored sequence.
     *
     * @param mixed $groupId Group whose progress is tracked.
     */
    public function __construct($groupId)
    {
        $this->groupId = $groupId;
        $this->storage = new Redis();
        $this->storage->connect('127.0.0.1', 6379);
        $this->loadLastProcessedSequence();
    }

    /**
     * Builds the Redis key that holds this group's last processed sequence.
     */
    private function sequenceKey()
    {
        return "group:{$this->groupId}:last_sequence";
    }

    /**
     * Reads the stored sequence; any value that casts to 0 leaves the counter at 0.
     */
    private function loadLastProcessedSequence()
    {
        $stored = $this->storage->get($this->sequenceKey());
        $this->lastProcessedSequence = (int) $stored;
    }

    /**
     * Executes the message unless its sequence is at or below the last
     * processed one, then records the new sequence in memory and in Redis.
     *
     * @param array $message Decoded message containing a "sequence" key.
     */
    public function process($message)
    {
        $sequence = $message['sequence'];
        $alreadyHandled = $sequence <= $this->lastProcessedSequence;

        if ($alreadyHandled) {
            echo "跳过已处理的消息: {$sequence}\n";
            return;
        }

        $this->executeMessage($message);
        // Progress is stored only after the message has been executed.
        $this->lastProcessedSequence = $sequence;
        $this->saveLastProcessedSequence();
    }

    /**
     * Writes the in-memory sequence back to Redis.
     */
    private function saveLastProcessedSequence()
    {
        $this->storage->set($this->sequenceKey(), $this->lastProcessedSequence);
    }

    /**
     * Prints the JSON-encoded message.
     */
    private function executeMessage($message)
    {
        echo "处理消息: " . json_encode($message) . "\n";
    }
}
最佳实践建议
1. 合理设计分组策略
php
<?php
/**
 * Builds group ids from a strategy name plus an identifier, and maps
 * identifiers to partition numbers.
 */
class GroupStrategy
{
    const STRATEGY_USER = 'user';
    const STRATEGY_ORDER = 'order';
    const STRATEGY_SESSION = 'session';
    const STRATEGY_REGION = 'region';

    /**
     * Returns "{strategy}_{identifier}" for a known strategy and
     * "default_{identifier}" otherwise.
     *
     * @param mixed $strategy   One of the STRATEGY_* constants.
     * @param mixed $identifier Value appended after the prefix.
     *
     * @return string
     */
    public function getGroupId($strategy, $identifier)
    {
        $knownStrategies = [
            self::STRATEGY_USER,
            self::STRATEGY_ORDER,
            self::STRATEGY_SESSION,
            self::STRATEGY_REGION,
        ];

        foreach ($knownStrategies as $prefix) {
            // Loose comparison, checked in declaration order.
            if ($strategy == $prefix) {
                return "{$prefix}_{$identifier}";
            }
        }

        return "default_{$identifier}";
    }

    /**
     * Maps an identifier to a partition index via the absolute CRC32 value
     * modulo the partition count.
     *
     * @param string $identifier     Value to hash.
     * @param int    $partitionCount Number of partitions (divisor of the modulo).
     *
     * @return int
     */
    public function getPartition($identifier, $partitionCount)
    {
        $checksum = crc32($identifier);
        $nonNegative = abs($checksum);

        return $nonNegative % $partitionCount;
    }
}
2. 监控分组状态
php
<?php
/**
 * Tracks per-group send and processing counters in the Redis hash
 * "group:{groupId}:stats".
 */
class GroupMonitor
{
    private $redis;

    /**
     * Connects to Redis at 127.0.0.1:6379.
     */
    public function __construct()
    {
        $this->redis = new Redis();
        $this->redis->connect('127.0.0.1', 6379);
    }

    /**
     * Builds the name of the stats hash of a group.
     */
    private function statsKey($groupId)
    {
        return "group:{$groupId}:stats";
    }

    /**
     * Stores the sequence as "last_sent" and increments "total_sent" by one.
     */
    public function recordMessageSent($groupId, $sequence)
    {
        $hash = $this->statsKey($groupId);
        $this->redis->hSet($hash, 'last_sent', $sequence);
        $this->redis->hIncrBy($hash, 'total_sent', 1);
    }

    /**
     * Stores the sequence as "last_processed" and increments "total_processed" by one.
     */
    public function recordMessageProcessed($groupId, $sequence)
    {
        $hash = $this->statsKey($groupId);
        $this->redis->hSet($hash, 'last_processed', $sequence);
        $this->redis->hIncrBy($hash, 'total_processed', 1);
    }

    /**
     * Returns last_sent minus last_processed, each cast to int.
     *
     * @return int
     */
    public function getGroupLag($groupId)
    {
        $hash = $this->statsKey($groupId);
        $sent = (int) $this->redis->hGet($hash, 'last_sent');
        $processed = (int) $this->redis->hGet($hash, 'last_processed');

        return $sent - $processed;
    }

    /**
     * Returns all fields of the group's stats hash.
     */
    public function getGroupStats($groupId)
    {
        return $this->redis->hGetAll($this->statsKey($groupId));
    }
}
3. 实现优雅的消费者重启
php
<?php
/**
 * Skeleton of a consumer that stops on SIGTERM/SIGINT once the message
 * currently being handled is finished.
 *
 * fetchMessage(), process() and ack() are placeholders: fetchMessage()
 * always returns null and the other two only print a line.
 */
class GracefulGroupConsumer
{
    private $groupId;
    private $running = true;
    private $currentMessage = null;

    /**
     * Enables asynchronous signal dispatch and registers the shutdown handler
     * for SIGTERM and SIGINT.
     *
     * @param mixed $groupId Group this consumer is responsible for.
     */
    public function __construct($groupId)
    {
        $this->groupId = $groupId;
        pcntl_async_signals(true);
        pcntl_signal(SIGTERM, [$this, 'handleShutdown']);
        pcntl_signal(SIGINT, [$this, 'handleShutdown']);
    }

    /**
     * Signal handler: asks the consume loop to stop after the current iteration.
     *
     * The "waiting for the current message" notice is printed here. The
     * previous check after the loop could never fire, because
     * $currentMessage is always reset to null before the loop condition is
     * evaluated again.
     *
     * @param int $signal Number of the received signal.
     */
    public function handleShutdown($signal)
    {
        echo "收到停止信号,准备优雅关闭...\n";
        $this->running = false;
        if ($this->currentMessage !== null) {
            echo "等待当前消息处理完成...\n";
        }
    }

    /**
     * Polls for messages until a shutdown signal clears $running. The loop
     * sleeps 100 ms whenever no message is available.
     */
    public function consume()
    {
        while ($this->running) {
            $message = $this->fetchMessage();
            if (!$message) {
                usleep(100000);
                continue;
            }
            $this->currentMessage = $message;
            try {
                $this->process($message);
                $this->ack($message);
            } finally {
                // Clear the in-flight marker even when processing throws.
                $this->currentMessage = null;
            }
        }
        echo "消费者已关闭\n";
    }

    /**
     * Placeholder: always returns null (no message available).
     */
    private function fetchMessage()
    {
        return null;
    }

    /**
     * Placeholder: prints a processing notice.
     */
    private function process($message)
    {
        echo "处理消息...\n";
    }

    /**
     * Placeholder: prints an acknowledgement notice.
     */
    private function ack($message)
    {
        echo "确认消息\n";
    }
}