Skip to content

消息分组

概述

消息分组(Message Grouping)是 RabbitMQ 中一种将相关消息组织在一起处理的机制。通过消息分组,可以确保同一组的消息被同一个消费者按顺序处理,这对于需要保持消息顺序性的业务场景非常重要。

核心知识点

什么是消息分组

消息分组的核心思想是:

  1. 组内顺序:同一组的消息按发送顺序被消费
  2. 组间并行:不同组的消息可以并行处理
  3. 亲和性:同一组的消息总是路由到同一个队列分区
mermaid
graph TB
    subgraph 生产者
        P1[消息组 A]
        P2[消息组 B]
        P3[消息组 C]
    end

    subgraph RabbitMQ
        Q1[队列分区 1]
        Q2[队列分区 2]
        Q3[队列分区 3]
    end

    subgraph 消费者
        C1[消费者 1]
        C2[消费者 2]
        C3[消费者 3]
    end

    P1 --> Q1
    P2 --> Q2
    P3 --> Q3

    Q1 --> C1
    Q2 --> C2
    Q3 --> C3

    style P1 fill:#e1f5fe
    style P2 fill:#fff3e0
    style P3 fill:#e8f5e9

实现方式

RabbitMQ 提供三种主要的消息分组实现方式:

| 方式 | 说明 | 适用场景 |
| --- | --- | --- |
| Consistent Hash Exchange | 一致性哈希交换器 | 需要均匀分布的场景 |
| Message Group Plugin | 消息分组插件 | 需要严格顺序保证的场景 |
| 自定义路由 | 基于 routing key 分组 | 简单分组场景 |

工作原理

mermaid
sequenceDiagram
    participant P as 生产者
    participant E as 分组交换器
    participant Q1 as 队列1
    participant Q2 as 队列2
    participant C1 as 消费者1
    participant C2 as 消费者2

    Note over P: 发送消息组 A-1
    P->>E: 消息(group_id=A)
    E->>Q1: 路由到队列1
    Q1->>C1: 消费

    Note over P: 发送消息组 A-2
    P->>E: 消息(group_id=A)
    E->>Q1: 路由到队列1(同一组)
    Q1->>C1: 按顺序消费

    Note over P: 发送消息组 B-1
    P->>E: 消息(group_id=B)
    E->>Q2: 路由到队列2
    Q2->>C2: 并行消费

PHP 代码示例

基于一致性哈希的消息分组

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

/**
 * Publishes JSON messages to an `x-consistent-hash` exchange, passing the
 * group id as the routing key of every message.
 */
class ConsistentHashPublisher
{
    private $connection;
    private $channel;
    private $exchangeName = 'consistent_hash_exchange';

    /**
     * Connects to localhost:5672 as guest/guest, opens a channel and
     * declares the exchange.
     */
    public function __construct()
    {
        $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

        $this->connection = $connection;
        $this->channel = $connection->channel();

        $this->setupExchange();
    }

    /**
     * Declares the durable, non-auto-delete consistent-hash exchange.
     */
    private function setupExchange()
    {
        $passive = false;
        $durable = true;
        $autoDelete = false;

        $this->channel->exchange_declare(
            $this->exchangeName,
            'x-consistent-hash',
            $passive,
            $durable,
            $autoDelete
        );
    }

    /**
     * Wraps the payload in a JSON envelope (group id, data, current Unix
     * timestamp), publishes it with the group id as routing key and prints
     * a confirmation line.
     *
     * @param mixed $groupId Group identifier, used as the routing key.
     * @param mixed $message Payload stored under the "data" key.
     */
    public function publish($groupId, $message)
    {
        $envelope = [
            'group_id' => $groupId,
            'data' => $message,
            'timestamp' => time()
        ];
        $properties = ['content_type' => 'application/json'];

        $amqpMessage = new AMQPMessage(json_encode($envelope), $properties);
        $this->channel->basic_publish($amqpMessage, $this->exchangeName, $groupId);

        echo "消息已发送到组: {$groupId}\n";
    }

    /**
     * Closes the channel first, then the connection.
     */
    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}

分组队列消费者

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * Consumes the queue "group_queue_{workerId}", which is bound to the
 * consistent-hash exchange, and prints every message it receives.
 */
class GroupedQueueConsumer
{
    private $connection;
    private $channel;
    private $exchangeName = 'consistent_hash_exchange';
    private $queueName;

    /**
     * Connects to localhost:5672 as guest/guest and prepares the exchange,
     * queue and binding for this worker.
     *
     * @param mixed $workerId Suffix used to build this worker's queue name.
     */
    public function __construct($workerId)
    {
        $this->queueName = "group_queue_{$workerId}";

        $this->connection = new AMQPStreamConnection(
            'localhost',
            5672,
            'guest',
            'guest'
        );
        $this->channel = $this->connection->channel();

        $this->setupQueue();
    }

    /**
     * Declares the exchange, the durable worker queue and the binding
     * between them.
     */
    private function setupQueue()
    {
        // Declare the exchange here as well, with the same type and flags the
        // publisher uses. Binding a queue to an exchange that does not exist
        // yet fails, so without this the consumer could not be started before
        // the publisher.
        $this->channel->exchange_declare(
            $this->exchangeName,
            'x-consistent-hash',
            false,
            true,
            false
        );

        $this->channel->queue_declare(
            $this->queueName,
            false,
            true,
            false,
            false
        );

        // For an x-consistent-hash exchange the binding key is documented as a
        // numeric weight for the queue rather than a pattern to match.
        $this->channel->queue_bind(
            $this->queueName,
            $this->exchangeName,
            '1'
        );
    }

    /**
     * Starts a blocking consume loop with a prefetch of one message.
     *
     * Each decodable message is printed and acknowledged. A body that is not
     * a JSON object carrying "group_id" is rejected without requeue instead
     * of being indexed as an array.
     */
    public function consume()
    {
        $callback = function (AMQPMessage $message) {
            $body = json_decode($message->body, true);

            if (!is_array($body) || !isset($body['group_id'])) {
                echo "收到无法解析的消息,已拒绝\n";
                // Do not requeue: the same payload would fail again.
                $message->reject(false);
                return;
            }

            $groupId = $body['group_id'];

            echo sprintf(
                "[%s] 处理消息组: %s, 数据: %s\n",
                date('Y-m-d H:i:s'),
                $groupId,
                json_encode($body['data'] ?? null, JSON_UNESCAPED_UNICODE)
            );

            $message->ack();
        };

        $this->channel->basic_qos(null, 1, null);
        $this->channel->basic_consume(
            $this->queueName,
            '',
            false,
            false,
            false,
            false,
            $callback
        );

        echo "消费者已启动,监听队列: {$this->queueName}\n";

        while ($this->channel->is_open()) {
            $this->channel->wait();
        }
    }

    /**
     * Closes the channel first, then the connection.
     */
    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}

基于 Routing Key 的简单分组

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * Publishes grouped JSON messages to the durable direct exchange
 * "group_exchange", using the group id as the routing key.
 */
class SimpleGroupPublisher
{
    private $connection;
    private $channel;
    private $exchangeName = 'group_exchange';

    /**
     * Connects to localhost:5672 as guest/guest, opens a channel and
     * declares the direct exchange.
     */
    public function __construct()
    {
        $this->connection = new AMQPStreamConnection(
            'localhost',
            5672,
            'guest',
            'guest'
        );
        $this->channel = $this->connection->channel();

        $this->channel->exchange_declare(
            $this->exchangeName,
            'direct',
            false,
            true,
            false
        );
    }

    /**
     * Publishes one message to a group and prints a confirmation line.
     *
     * The JSON body carries group_id, data and the current Unix timestamp,
     * plus "sequence" when a sequence number is given. The group id is also
     * sent as the "group_id" application header.
     *
     * @param mixed    $groupId        Group identifier, used as the routing key.
     * @param mixed    $message        Payload stored under the "data" key.
     * @param int|null $sequenceNumber Optional position of the message in its group.
     */
    public function publishToGroup($groupId, $message, $sequenceNumber = null)
    {
        $msgData = [
            'group_id' => $groupId,
            'data' => $message,
            'timestamp' => time()
        ];

        if ($sequenceNumber !== null) {
            $msgData['sequence'] = $sequenceNumber;
        }

        $msg = new AMQPMessage(
            json_encode($msgData),
            [
                'content_type' => 'application/json',
                // php-amqplib names this property "application_headers" and
                // expects an AMQPTable. A plain 'headers' array is not a known
                // message property, so the group_id header was never sent.
                'application_headers' => new \PhpAmqpLib\Wire\AMQPTable([
                    'group_id' => $groupId
                ])
            ]
        );

        $this->channel->basic_publish(
            $msg,
            $this->exchangeName,
            $groupId
        );

        echo "消息已发送到组: {$groupId}\n";
    }

    /**
     * Closes the channel first, then the connection.
     */
    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}

分组消息处理器

php
<?php

/**
 * Consumes the queue of a single message group and executes its messages in
 * sequence order, buffering messages that arrive ahead of their turn.
 *
 * NOTE: the sequence position and the reorder buffer live only in memory,
 * and buffered messages have already been acknowledged. Both are lost if
 * this process stops.
 */
class GroupMessageProcessor
{
    private $connection;
    private $channel;
    private $exchangeName = 'group_exchange';
    private $groupId;
    private $queueName;
    /** Highest sequence number executed so far. */
    private $lastSequence = 0;
    /** Decoded bodies waiting for their turn, keyed by sequence number. */
    private $pendingMessages = [];

    /**
     * Connects to localhost:5672 as guest/guest and prepares the exchange,
     * queue and binding for the group.
     *
     * @param mixed $groupId Group identifier; also the binding key and queue name suffix.
     */
    public function __construct($groupId)
    {
        $this->groupId = $groupId;
        $this->queueName = "group_queue_{$groupId}";

        // Fully-qualified class names: this snippet declares no `use`
        // statements of its own.
        $this->connection = new \PhpAmqpLib\Connection\AMQPStreamConnection(
            'localhost',
            5672,
            'guest',
            'guest'
        );
        $this->channel = $this->connection->channel();

        $this->setup();
    }

    /**
     * Declares the durable direct exchange and the durable group queue, then
     * binds the queue with the group id as binding key.
     */
    private function setup()
    {
        $this->channel->exchange_declare(
            $this->exchangeName,
            'direct',
            false,
            true,
            false
        );

        $this->channel->queue_declare(
            $this->queueName,
            false,
            true,
            false,
            false
        );

        $this->channel->queue_bind(
            $this->queueName,
            $this->exchangeName,
            $this->groupId
        );
    }

    /**
     * Starts a blocking consume loop with a prefetch of one message.
     *
     * Messages carrying a "sequence" field go through ordered processing,
     * all others are executed immediately. A body that does not decode to an
     * array is rejected without requeue.
     */
    public function consume()
    {
        $callback = function (\PhpAmqpLib\Message\AMQPMessage $message) {
            $body = json_decode($message->body, true);

            if (!is_array($body)) {
                echo "收到无法解析的消息,已拒绝\n";
                // Do not requeue: the same payload would fail again.
                $message->reject(false);
                return;
            }

            $sequence = $body['sequence'] ?? null;

            if ($sequence !== null) {
                $this->processOrdered($message, $body, $sequence);
            } else {
                $this->processUnordered($message, $body);
            }
        };

        $this->channel->basic_qos(null, 1, null);
        $this->channel->basic_consume(
            $this->queueName,
            '',
            false,
            false,
            false,
            false,
            $callback
        );

        echo "分组消费者已启动,组ID: {$this->groupId}\n";

        while ($this->channel->is_open()) {
            $this->channel->wait();
        }
    }

    /**
     * Executes the message if it is the next one in sequence, buffers it if
     * it is ahead, and drops it if it is at or behind the last executed
     * sequence. The message is acknowledged in every case.
     */
    private function processOrdered($message, $body, $sequence)
    {
        // Normalise so that a sequence encoded as "3" is treated like 3 by
        // the strict comparison below.
        $sequence = (int) $sequence;
        $expected = $this->lastSequence + 1;

        if ($sequence === $expected) {
            $this->executeMessage($body);
            $this->lastSequence = $sequence;

            $this->processPendingMessages();
        } elseif ($sequence > $expected) {
            $this->pendingMessages[$sequence] = $body;
        }
        // Otherwise the sequence was already executed (duplicate or
        // redelivery). Buffering it would leave an entry that is never
        // flushed, so it is dropped.

        // With a prefetch of 1 the broker sends nothing further until this
        // message is acknowledged, so buffered messages are acked as well.
        $message->ack();
    }

    /**
     * Executes and acknowledges a message that carries no sequence number.
     */
    private function processUnordered($message, $body)
    {
        $this->executeMessage($body);
        $message->ack();
    }

    /**
     * Executes buffered messages for as long as the next expected sequence
     * number is present in the buffer.
     */
    private function processPendingMessages()
    {
        while (isset($this->pendingMessages[$this->lastSequence + 1])) {
            $next = $this->lastSequence + 1;

            $this->executeMessage($this->pendingMessages[$next]);
            unset($this->pendingMessages[$next]);
            $this->lastSequence = $next;
        }
    }

    /**
     * Prints one line describing the message (timestamp, group, sequence, data).
     */
    private function executeMessage($body)
    {
        echo sprintf(
            "[%s] 处理消息 - 组: %s, 序号: %s, 数据: %s\n",
            date('Y-m-d H:i:s'),
            $body['group_id'] ?? 'N/A',
            $body['sequence'] ?? 'N/A',
            json_encode($body['data'] ?? null, JSON_UNESCAPED_UNICODE)
        );
    }

    /**
     * Closes the channel first, then the connection.
     */
    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}

完整示例:订单处理分组

php
<?php

/**
 * Publishes the processing steps of orders as sequenced messages of one
 * message group.
 */
class OrderGroupProcessor
{
    private $publisher;
    private $groupId;
    /** Last sequence number published to the group by this instance. */
    private $sequence = 0;

    /**
     * @param mixed $groupId Group that receives every step message.
     */
    public function __construct($groupId)
    {
        $this->publisher = new SimpleGroupPublisher();
        $this->groupId = $groupId;
    }

    /**
     * Publishes one message per step, in the given order.
     *
     * Sequence numbers keep counting across calls. The group consumer tracks
     * a single sequence per group, so restarting at 1 for a second order
     * would make its messages look like duplicates of the first order.
     *
     * @param mixed    $orderId Order identifier placed in every message.
     * @param iterable $steps   Step names; unknown names are used as their own action text.
     */
    public function processOrder($orderId, $steps)
    {
        foreach ($steps as $step) {
            $this->publisher->publishToGroup(
                $this->groupId,
                [
                    'order_id' => $orderId,
                    'step' => $step,
                    'action' => $this->getActionForStep($step)
                ],
                ++$this->sequence
            );
        }
    }

    /**
     * Maps a step name to its action description, falling back to the step
     * name itself when there is no mapping.
     */
    private function getActionForStep($step)
    {
        $actions = [
            'validate' => '验证订单信息',
            'inventory' => '检查库存',
            'payment' => '处理支付',
            'shipping' => '创建物流单',
            'notification' => '发送通知'
        ];

        return $actions[$step] ?? $step;
    }

    /**
     * Closes the underlying publisher.
     */
    public function close()
    {
        $this->publisher->close();
    }
}

/**
 * Thin facade over GroupMessageProcessor for a single order group.
 */
class OrderGroupConsumer
{
    private $messageProcessor;

    /**
     * @param mixed $groupId Group whose queue is consumed.
     */
    public function __construct($groupId)
    {
        $this->messageProcessor = new GroupMessageProcessor($groupId);
    }

    /**
     * Delegates to the processor's consume().
     */
    public function start()
    {
        $this->messageProcessor->consume();
    }

    /**
     * Delegates to the processor's close().
     */
    public function close()
    {
        $this->messageProcessor->close();
    }
}

// Demo: publish the five steps of order ORD-001 to group "order_group_1",
// then close the publisher.
$orderSteps = ['validate', 'inventory', 'payment', 'shipping', 'notification'];

$orderProcessor = new OrderGroupProcessor('order_group_1');
$orderProcessor->processOrder('ORD-001', $orderSteps);
$orderProcessor->close();

实际应用场景

1. 用户操作顺序处理

php
<?php

/**
 * Publishes user actions into one message group per user ("user_{id}"),
 * numbering each user's actions from 1 upwards.
 */
class UserActionGrouper
{
    private $publisher;
    /** Last sequence number handed out, keyed by user id. */
    private $users = [];

    public function __construct()
    {
        $this->publisher = new SimpleGroupPublisher();
    }

    /**
     * Publishes one action with the user's next sequence number.
     *
     * @param mixed $userId User the action belongs to.
     * @param mixed $action Action name.
     * @param array $data   Extra data attached to the action.
     */
    public function recordAction($userId, $action, $data = [])
    {
        // Advance this user's counter; a user seen for the first time starts at 1.
        $this->users[$userId] = ($this->users[$userId] ?? 0) + 1;

        $payload = [
            'user_id' => $userId,
            'action' => $action,
            'data' => $data
        ];

        $this->publisher->publishToGroup(
            "user_{$userId}",
            $payload,
            $this->users[$userId]
        );
    }

    /**
     * Closes the underlying publisher.
     */
    public function close()
    {
        $this->publisher->close();
    }
}

// Demo: record four consecutive actions of user 1001, then close the publisher.
$grouper = new UserActionGrouper();

$userActions = [
    ['login', ['ip' => '192.168.1.1']],
    ['view_product', ['product_id' => 123]],
    ['add_to_cart', ['product_id' => 123, 'qty' => 2]],
    ['checkout', ['cart_id' => 'CART-001']],
];

foreach ($userActions as [$action, $data]) {
    $grouper->recordAction(1001, $action, $data);
}

$grouper->close();

2. 分布式任务编排

php
<?php

/**
 * Publishes the tasks of a batch as sequenced messages of one message group.
 */
class TaskOrchestrator
{
    private $publisher;
    /** Task lists as passed to createTaskGroup(), keyed by group id. */
    private $taskGroups = [];

    public function __construct()
    {
        $this->publisher = new SimpleGroupPublisher();
    }

    /**
     * Remembers the task list and publishes one message per task.
     *
     * Sequence numbers are assigned 1, 2, 3, ... in iteration order. They are
     * deliberately not derived from the array keys: string keys cannot be
     * incremented, and sparse keys (for example after array_filter) would
     * leave gaps in the sequence.
     *
     * @param mixed $groupId Group that receives the task messages.
     * @param array $tasks   Task definitions with "name" and optional "params" / "depends_on".
     */
    public function createTaskGroup($groupId, $tasks)
    {
        $this->taskGroups[$groupId] = $tasks;

        $sequence = 0;

        foreach ($tasks as $task) {
            $sequence++;

            $this->publisher->publishToGroup($groupId, [
                'task_id' => uniqid(),
                'task_name' => $task['name'],
                'task_params' => $task['params'] ?? [],
                'depends_on' => $task['depends_on'] ?? []
            ], $sequence);
        }
    }

    /**
     * Closes the underlying publisher.
     */
    public function close()
    {
        $this->publisher->close();
    }
}

// Demo: publish a five-task import pipeline as group "batch_import_001",
// each task naming the task it depends on, then close the publisher.
$importTasks = [
    ['name' => 'download', 'params' => ['url' => 'http://example.com/data.csv']],
    ['name' => 'parse', 'params' => ['format' => 'csv'], 'depends_on' => ['download']],
    ['name' => 'validate', 'params' => [], 'depends_on' => ['parse']],
    ['name' => 'import', 'params' => ['batch_size' => 1000], 'depends_on' => ['validate']],
    ['name' => 'cleanup', 'params' => [], 'depends_on' => ['import']]
];

$orchestrator = new TaskOrchestrator();
$orchestrator->createTaskGroup('batch_import_001', $importTasks);
$orchestrator->close();

3. 会话消息处理

php
<?php

/**
 * Publishes chat messages into one message group per session
 * ("session_{id}"), numbering each session's messages from 1 upwards.
 */
class SessionMessageGrouper
{
    private $publisher;
    /** Last sequence number handed out, keyed by session id. */
    private $sessions = [];

    public function __construct()
    {
        $this->publisher = new SimpleGroupPublisher();
    }

    /**
     * Publishes one chat message with the session's next sequence number and
     * a generated "msg_"-prefixed message id.
     *
     * @param mixed $sessionId Session the message belongs to.
     * @param mixed $from      Sender of the message.
     * @param mixed $content   Message text.
     */
    public function sendMessage($sessionId, $from, $content)
    {
        // Advance this session's counter; a new session starts at 1.
        $this->sessions[$sessionId] = ($this->sessions[$sessionId] ?? 0) + 1;

        $payload = [
            'session_id' => $sessionId,
            'from' => $from,
            'content' => $content,
            'message_id' => uniqid('msg_')
        ];

        $this->publisher->publishToGroup(
            "session_{$sessionId}",
            $payload,
            $this->sessions[$sessionId]
        );
    }

    /**
     * Closes the underlying publisher.
     */
    public function close()
    {
        $this->publisher->close();
    }
}

// Demo: send a three-message conversation in session "sess_001", then close
// the publisher.
$sessionGrouper = new SessionMessageGrouper();

$conversation = [
    ['user_a', '你好'],
    ['user_b', '你好,有什么可以帮助你的?'],
    ['user_a', '我想咨询一下产品信息'],
];

foreach ($conversation as [$from, $content]) {
    $sessionGrouper->sendMessage('sess_001', $from, $content);
}

$sessionGrouper->close();

常见问题与解决方案

问题 1:消息顺序错乱

原因:网络延迟或消费者处理速度不一致

解决方案

php
<?php

/**
 * Restores message order using the "sequence" field: the expected message
 * is processed immediately, later ones wait in a bounded buffer, and
 * earlier ones are ignored.
 */
class OrderedMessageHandler
{
    private $expectedSequence = 1;
    private $buffer = [];
    private $bufferSize = 100;

    /**
     * Handles one decoded message.
     *
     * @param array $message Message containing a "sequence" entry.
     *
     * @throws RuntimeException When an early message arrives and the buffer
     *                          already holds $bufferSize entries.
     */
    public function handle($message)
    {
        $incoming = $message['sequence'];

        // The message everyone is waiting for: process it, then release
        // whatever the buffer can now deliver.
        if ($incoming === $this->expectedSequence) {
            $this->process($message);
            $this->expectedSequence++;
            $this->flushBuffer();

            return;
        }

        // Not ahead of the expected position: nothing to do.
        if (!($incoming > $this->expectedSequence)) {
            return;
        }

        if (count($this->buffer) >= $this->bufferSize) {
            throw new RuntimeException("缓冲区已满,可能存在消息丢失");
        }

        $this->buffer[$incoming] = $message;
    }

    /**
     * Processes buffered messages for as long as the expected sequence
     * number is present in the buffer.
     */
    private function flushBuffer()
    {
        while (isset($this->buffer[$this->expectedSequence])) {
            $position = $this->expectedSequence;

            $this->process($this->buffer[$position]);
            unset($this->buffer[$position]);

            $this->expectedSequence = $position + 1;
        }
    }

    /**
     * Prints the sequence number of the message being processed.
     */
    private function process($message)
    {
        echo "处理消息序号: {$message['sequence']}\n";
    }
}

问题 2:分组负载不均衡

原因:某些组消息量过大

解决方案

php
<?php

/**
 * Spreads the messages of a large base group over numbered partitions
 * ("{base}_0", "{base}_1", ...), moving on to the next partition once the
 * current one has received $maxGroupSize messages from this instance.
 */
class BalancedGroupPublisher
{
    private $publisher;
    /** Number of messages this instance has published, keyed by partition group id. */
    private $groupStats = [];
    private $maxGroupSize = 1000;

    /**
     * The publisher property was previously never initialised, so every
     * publish() call failed on a null object. Both parameters are optional,
     * so `new BalancedGroupPublisher()` keeps working.
     *
     * @param object|null $publisher    Object exposing publishToGroup($groupId, $message);
     *                                  a SimpleGroupPublisher is created when omitted.
     * @param int         $maxGroupSize Messages per partition before moving to the next one.
     */
    public function __construct($publisher = null, $maxGroupSize = 1000)
    {
        $this->publisher = $publisher ?? new SimpleGroupPublisher();
        $this->maxGroupSize = $maxGroupSize;
    }

    /**
     * Publishes the message to the first partition of the base group that
     * still has room, then counts it against that partition.
     *
     * @param mixed $baseGroupId Logical group the message belongs to.
     * @param mixed $message     Payload handed to the publisher.
     */
    public function publish($baseGroupId, $message)
    {
        $groupId = $this->getBalancedGroupId($baseGroupId);

        $this->publisher->publishToGroup($groupId, $message);

        if (!isset($this->groupStats[$groupId])) {
            $this->groupStats[$groupId] = 0;
        }
        $this->groupStats[$groupId]++;
    }

    /**
     * Returns the first of partitions 0..99 whose counter is below the
     * limit. When all of them are full, partition 100 is returned without a
     * size check and acts as the overflow partition.
     */
    private function getBalancedGroupId($baseGroupId)
    {
        $partition = 0;

        for ($i = 0; $i < 100; $i++) {
            $groupId = "{$baseGroupId}_{$partition}";

            $currentSize = $this->groupStats[$groupId] ?? 0;

            if ($currentSize < $this->maxGroupSize) {
                return $groupId;
            }

            $partition++;
        }

        return "{$baseGroupId}_{$partition}";
    }

    /**
     * Closes the underlying publisher.
     */
    public function close()
    {
        $this->publisher->close();
    }
}

问题 3:消费者故障导致消息阻塞

原因:单消费者处理整个组

解决方案

php
<?php

/**
 * Processes the sequenced messages of one group and keeps the last
 * processed sequence number in Redis, so that a new instance skips
 * messages that were already handled.
 */
class FaultTolerantGroupConsumer
{
    private $groupId;
    private $lastProcessedSequence = 0;
    private $storage;

    /**
     * Connects to Redis at 127.0.0.1:6379 and loads the stored position of
     * the group.
     *
     * @param mixed $groupId Group whose position is tracked.
     */
    public function __construct($groupId)
    {
        $this->groupId = $groupId;

        $redis = new Redis();
        $redis->connect('127.0.0.1', 6379);
        $this->storage = $redis;

        $this->loadLastProcessedSequence();
    }

    /**
     * Reads the stored position; any value that casts to 0 means nothing
     * has been processed yet.
     */
    private function loadLastProcessedSequence()
    {
        $stored = $this->storage->get($this->sequenceKey());

        $this->lastProcessedSequence = (int) $stored;
    }

    /**
     * Executes the message unless its sequence number is at or below the
     * last processed one, in which case a "skipped" line is printed instead.
     *
     * @param array $message Message containing a "sequence" entry.
     */
    public function process($message)
    {
        $sequence = $message['sequence'];

        if ($sequence <= $this->lastProcessedSequence) {
            echo "跳过已处理的消息: {$sequence}\n";
        } else {
            $this->executeMessage($message);

            // The position is stored only after the message has been executed.
            $this->lastProcessedSequence = $sequence;
            $this->saveLastProcessedSequence();
        }
    }

    /**
     * Writes the current position to Redis.
     */
    private function saveLastProcessedSequence()
    {
        $this->storage->set($this->sequenceKey(), $this->lastProcessedSequence);
    }

    /**
     * Prints the JSON-encoded message.
     */
    private function executeMessage($message)
    {
        echo "处理消息: " . json_encode($message) . "\n";
    }

    /**
     * Redis key under which the position of this group is stored.
     */
    private function sequenceKey()
    {
        return "group:{$this->groupId}:last_sequence";
    }
}

最佳实践建议

1. 合理设计分组策略

php
<?php

/**
 * Builds group ids from a strategy name and an identifier, and maps
 * identifiers to partition numbers.
 */
class GroupStrategy
{
    const STRATEGY_USER = 'user';
    const STRATEGY_ORDER = 'order';
    const STRATEGY_SESSION = 'session';
    const STRATEGY_REGION = 'region';

    /**
     * Returns "{prefix}_{identifier}", where the prefix is chosen by the
     * strategy; unknown strategies use the "default" prefix.
     *
     * @param mixed $strategy   One of the STRATEGY_* constants.
     * @param mixed $identifier Value appended after the prefix.
     *
     * @return string
     */
    public function getGroupId($strategy, $identifier)
    {
        // Loose comparison on purpose, matching what a switch statement does.
        if ($strategy == self::STRATEGY_USER) {
            return "user_{$identifier}";
        }

        if ($strategy == self::STRATEGY_ORDER) {
            return "order_{$identifier}";
        }

        if ($strategy == self::STRATEGY_SESSION) {
            return "session_{$identifier}";
        }

        if ($strategy == self::STRATEGY_REGION) {
            return "region_{$identifier}";
        }

        return "default_{$identifier}";
    }

    /**
     * Maps an identifier to a partition via the absolute value of its CRC32
     * checksum, modulo the partition count.
     *
     * @param string $identifier     Value to hash.
     * @param int    $partitionCount Number of partitions.
     *
     * @return int
     */
    public function getPartition($identifier, $partitionCount)
    {
        return abs(crc32($identifier)) % $partitionCount;
    }
}

2. 监控分组状态

php
<?php

/**
 * Tracks per-group send and processing counters in the Redis hash
 * "group:{groupId}:stats".
 */
class GroupMonitor
{
    private $redis;

    /**
     * Connects to Redis at 127.0.0.1:6379.
     */
    public function __construct()
    {
        $client = new Redis();
        $client->connect('127.0.0.1', 6379);

        $this->redis = $client;
    }

    /**
     * Stores the sequence as "last_sent" and increments "total_sent".
     */
    public function recordMessageSent($groupId, $sequence)
    {
        $statsKey = $this->statsKey($groupId);

        $this->redis->hSet($statsKey, 'last_sent', $sequence);
        $this->redis->hIncrBy($statsKey, 'total_sent', 1);
    }

    /**
     * Stores the sequence as "last_processed" and increments "total_processed".
     */
    public function recordMessageProcessed($groupId, $sequence)
    {
        $statsKey = $this->statsKey($groupId);

        $this->redis->hSet($statsKey, 'last_processed', $sequence);
        $this->redis->hIncrBy($statsKey, 'total_processed', 1);
    }

    /**
     * Returns last_sent minus last_processed; a field that casts to 0
     * counts as 0.
     *
     * @return int
     */
    public function getGroupLag($groupId)
    {
        $statsKey = $this->statsKey($groupId);

        $sent = (int) $this->redis->hGet($statsKey, 'last_sent');
        $processed = (int) $this->redis->hGet($statsKey, 'last_processed');

        return $sent - $processed;
    }

    /**
     * Returns every field of the group's stats hash.
     */
    public function getGroupStats($groupId)
    {
        return $this->redis->hGetAll($this->statsKey($groupId));
    }

    /**
     * Redis key of the stats hash of a group.
     */
    private function statsKey($groupId)
    {
        return "group:{$groupId}:stats";
    }
}

3. 实现优雅的消费者重启

php
<?php

/**
 * Skeleton of a consumer that stops on SIGTERM / SIGINT only after the
 * message currently being handled has been processed and acknowledged.
 *
 * fetchMessage(), process() and ack() are placeholders.
 */
class GracefulGroupConsumer
{
    private $groupId;
    private $running = true;
    /** Message being handled right now, or null between messages. */
    private $currentMessage = null;

    /**
     * Installs the shutdown handler for SIGTERM and SIGINT when the pcntl
     * extension is available.
     *
     * @param mixed $groupId Group this consumer works on.
     */
    public function __construct($groupId)
    {
        $this->groupId = $groupId;

        // Without the pcntl extension these functions and the SIG* constants
        // are undefined and calling them would be a fatal error.
        if (function_exists('pcntl_async_signals')) {
            pcntl_async_signals(true);
            pcntl_signal(SIGTERM, [$this, 'handleShutdown']);
            pcntl_signal(SIGINT, [$this, 'handleShutdown']);
        }
    }

    /**
     * Signal handler: asks the consume loop to stop after the current
     * iteration.
     *
     * @param int $signal Number of the received signal.
     */
    public function handleShutdown($signal)
    {
        echo "收到停止信号,准备优雅关闭...\n";

        // Report the in-flight message here. The loop resets currentMessage
        // at the end of every iteration, so the former check after the loop
        // could never be true.
        if ($this->currentMessage !== null) {
            echo "等待当前消息处理完成...\n";
        }

        $this->running = false;
    }

    /**
     * Fetches, processes and acknowledges messages until a shutdown has been
     * requested, sleeping 100 ms whenever no message is available.
     */
    public function consume()
    {
        while ($this->running) {
            $message = $this->fetchMessage();

            if (!$message) {
                usleep(100000);
                continue;
            }

            $this->currentMessage = $message;

            try {
                $this->process($message);

                $this->ack($message);
            } finally {
                // Clear the marker even if processing throws.
                $this->currentMessage = null;
            }
        }

        echo "消费者已关闭\n";
    }

    /**
     * Placeholder: always returns null (no message available).
     */
    private function fetchMessage()
    {
        return null;
    }

    /**
     * Placeholder: prints a processing line.
     */
    private function process($message)
    {
        echo "处理消息...\n";
    }

    /**
     * Placeholder: prints an acknowledgement line.
     */
    private function ack($message)
    {
        echo "确认消息\n";
    }
}

相关链接