Skip to content

RabbitMQ 镜像队列

一、概述

镜像队列(Mirrored Queue)是 RabbitMQ 实现高可用的传统方式,通过在多个节点之间复制队列数据来提供冗余。虽然仲裁队列已成为推荐的高可用方案,但镜像队列在某些场景下仍有其价值。

镜像队列架构

mermaid
graph TB
    subgraph "镜像队列架构"
        N1[节点1<br/>队列主]
        N2[节点2<br/>镜像]
        N3[节点3<br/>镜像]
        
        N1 -->|同步| N2
        N1 -->|同步| N3
    end
    
    subgraph "客户端"
        P[生产者]
        C[消费者]
    end
    
    P -->|写入| N1
    N1 -->|读取| C

二、核心知识点

2.1 镜像队列工作原理

主从架构

mermaid
sequenceDiagram
    participant P as 生产者
    participant M as 主队列
    participant S1 as 镜像1
    participant S2 as 镜像2
    
    P->>M: 发布消息
    M->>S1: 同步消息
    M->>S2: 同步消息
    S1-->>M: 确认
    S2-->>M: 确认
    M-->>P: 确认

数据同步流程

  1. 消息写入: 生产者将消息发送到主队列
  2. 同步复制: 主队列将消息同步到所有镜像
  3. 确认机制: 所有镜像确认后,主队列返回确认
  4. 故障转移: 主队列故障时,镜像提升为主

2.2 镜像策略配置

镜像模式

模式说明配置方式
all镜像到所有节点ha-mode: all
exactly镜像到指定数量节点ha-mode: exactly, ha-params: N
nodes镜像到指定节点ha-mode: nodes, ha-params: [node1, node2]

同步模式

模式说明
automatic自动同步,新镜像加入时自动同步数据
manual手动同步,需要手动触发同步

2.3 故障转移机制

mermaid
stateDiagram-v2
    [*] --> Running: 正常运行
    Running --> MasterDown: 主节点故障
    MasterDown --> PromoteMirror: 选择新主
    PromoteMirror --> Running: 镜像提升为主
    Running --> SyncNewMirror: 新镜像加入
    SyncNewMirror --> Running: 同步完成

主节点选举规则

  1. 选择同步最完整的镜像
  2. 选择运行时间最长的镜像
  3. 选择节点名称字典序最小的镜像

2.4 镜像队列与仲裁队列对比

特性镜像队列仲裁队列
一致性保证最终一致强一致(Raft)
数据安全可能丢失未同步数据保证不丢失
性能较高较低
资源消耗中等较高
推荐场景高吞吐、可容忍少量丢失重要数据、强一致性要求

三、配置示例

3.1 镜像策略配置

镜像到所有节点

bash
# 命令行配置
rabbitmqctl set_policy ha-all "^(?!amq\\.).*" \
    '{"ha-mode":"all","ha-sync-mode":"automatic"}' \
    --apply-to queues

# HTTP API 配置
curl -u admin:password -X PUT \
    http://localhost:15672/api/policies/%2f/ha-all \
    -H "Content-Type: application/json" \
    -d '{
        "pattern": "^(?!amq\\.).*",
        "definition": {
            "ha-mode": "all",
            "ha-sync-mode": "automatic"
        },
        "apply-to": "queues"
    }'

镜像到指定数量节点

bash
# 镜像到 2 个节点
rabbitmqctl set_policy ha-two "^(?!amq\\.).*" \
    '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}' \
    --apply-to queues

镜像到指定节点

bash
# 镜像到指定节点
rabbitmqctl set_policy ha-nodes "^important\." \
    '{"ha-mode":"nodes","ha-params":["rabbit@node1","rabbit@node2"]}' \
    --apply-to queues

3.2 配置文件方式

ini
# /etc/rabbitmq/advanced.config

[
  {rabbit, [
    {ha_policies, [
      {<<"^ha\."/utf8>>, [
        {<<"ha-mode">>, <<"all">>},
        {<<"ha-sync-mode">>, <<"automatic">>}
      ]}
    ]}
  ]}
].

3.3 同步管理

bash
# 查看队列同步状态
rabbitmqctl list_queues name policy slave_pids synchronised_slave_pids

# 手动同步队列
rabbitmqctl sync_queue queue_name

# 取消同步
rabbitmqctl cancel_sync_queue queue_name

# 查看同步进度
rabbitmqctl list_queues name synchronised_slave_pids

3.4 PHP 代码示例

php
<?php

class MirroredQueueManager
{
    private string $host;
    private int $port;
    private string $user;
    private string $password;
    
    public function __construct(
        string $host = 'localhost',
        int $port = 15672,
        string $user = 'guest',
        string $password = 'guest'
    ) {
        $this->host = $host;
        $this->port = $port;
        $this->user = $user;
        $this->password = $password;
    }
    
    private function request(string $endpoint, string $method = 'GET', array $data = null): array
    {
        $url = "http://{$this->host}:{$this->port}/api/{$endpoint}";
        
        $ch = curl_init();
        curl_setopt_array($ch, [
            CURLOPT_URL => $url,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_USERPWD => "{$this->user}:{$this->password}",
            CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
            CURLOPT_CUSTOMREQUEST => $method,
        ]);
        
        if ($data !== null) {
            curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($data));
        }
        
        $response = curl_exec($ch);
        $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        curl_close($ch);
        
        return [
            'status' => $httpCode,
            'data' => json_decode($response, true)
        ];
    }
    
    public function setMirroredPolicy(
        string $policyName,
        string $pattern,
        string $mode = 'all',
        int $exactlyCount = 0,
        array $nodes = [],
        string $syncMode = 'automatic'
    ): array {
        $definition = [
            'ha-mode' => $mode,
            'ha-sync-mode' => $syncMode,
        ];
        
        if ($mode === 'exactly' && $exactlyCount > 0) {
            $definition['ha-params'] = $exactlyCount;
        }
        
        if ($mode === 'nodes' && !empty($nodes)) {
            $definition['ha-params'] = $nodes;
        }
        
        return $this->request("policies/%2f/{$policyName}", 'PUT', [
            'pattern' => $pattern,
            'definition' => $definition,
            'apply-to' => 'queues',
        ]);
    }
    
    public function getQueueMirroringStatus(): array
    {
        $queues = $this->request('queues');
        
        if ($queues['status'] !== 200) {
            return ['error' => 'Failed to get queues'];
        }
        
        $status = [];
        foreach ($queues['data'] as $queue) {
            $backingQueueStatus = $queue['backing_queue_status'] ?? [];
            
            $status[$queue['name']] = [
                'policy' => $queue['policy'] ?? null,
                'master' => $queue['pid'] ?? null,
                'slaves' => $queue['slave_pids'] ?? [],
                'synced_slaves' => $queue['synchronised_slave_pids'] ?? [],
                'is_mirrored' => !empty($queue['slave_pids']),
                'sync_status' => $this->getSyncStatus($queue),
            ];
        }
        
        return $status;
    }
    
    private function getSyncStatus(array $queue): string
    {
        $slaves = $queue['slave_pids'] ?? [];
        $synced = $queue['synchronised_slave_pids'] ?? [];
        
        if (empty($slaves)) {
            return 'not_mirrored';
        }
        
        if (count($slaves) === count($synced)) {
            return 'fully_synced';
        }
        
        return 'syncing';
    }
    
    public function generateMirroringReport(): string
    {
        $status = $this->getQueueMirroringStatus();
        
        $report = "=== 镜像队列报告 ===\n";
        $report .= "时间: " . date('Y-m-d H:i:s') . "\n\n";
        
        $mirroredCount = 0;
        $notMirroredCount = 0;
        $syncingCount = 0;
        
        foreach ($status as $name => $info) {
            if ($info['is_mirrored']) {
                $mirroredCount++;
                $slaveCount = count($info['slaves']);
                $syncedCount = count($info['synced_slaves']);
                
                $report .= "队列: {$name}\n";
                $report .= "  策略: {$info['policy']}\n";
                $report .= "  镜像数: {$slaveCount}\n";
                $report .= "  同步状态: {$info['sync_status']}\n";
                
                if ($info['sync_status'] === 'syncing') {
                    $syncingCount++;
                }
            } else {
                $notMirroredCount++;
            }
        }
        
        $report .= "\n=== 统计 ===\n";
        $report .= "已镜像队列: {$mirroredCount}\n";
        $report .= "未镜像队列: {$notMirroredCount}\n";
        $report .= "同步中队列: {$syncingCount}\n";
        
        return $report;
    }
}

$manager = new MirroredQueueManager('localhost', 15672, 'admin', 'Admin@123456');

// 设置镜像策略
$manager->setMirroredPolicy('ha-all', '^(?!amq\\.).*', 'all');

// 生成报告
echo $manager->generateMirroringReport();

四、常见问题与解决方案

4.1 同步速度慢

问题: 大量数据同步时速度很慢

解决方案:

ini
# 增加同步批次大小
ha-sync-batch-size = 1000

# 增加同步超时时间
ha-sync-mode = automatic

4.2 网络分区后数据不一致

问题: 网络分区恢复后镜像队列数据不一致

解决方案:

bash
# 检查队列状态
rabbitmqctl list_queues name slave_pids synchronised_slave_pids

# 手动同步
rabbitmqctl sync_queue queue_name

# 或删除重建
rabbitmqctl delete_queue queue_name

4.3 主节点故障后消息丢失

问题: 主节点故障时未同步的消息丢失

解决方案:

使用仲裁队列替代镜像队列,或配置 ha-sync-batch-size 减少同步延迟。

五、最佳实践建议

5.1 策略建议

  1. 生产环境: 使用仲裁队列替代镜像队列
  2. 开发测试: 可使用镜像队列简化配置
  3. 重要数据: 必须使用仲裁队列

5.2 配置建议

ini
# 推荐配置
ha-mode = exactly
ha-params = 2  # 主 + 1 镜像
ha-sync-mode = automatic
ha-sync-batch-size = 100

六、相关链接