Skip to content

RabbitMQ 多活数据中心

一、概述

多活数据中心(Multi-Data Center)部署是 RabbitMQ 实现跨地域高可用的关键架构。通过在不同数据中心部署独立的 RabbitMQ 集群,并使用 Federation 或 Shovel 进行数据同步,实现跨地域的消息传递和容灾能力。

多活数据中心架构

mermaid
graph TB
    subgraph "数据中心 A - 北京"
        CA[RabbitMQ 集群 A]
        CA1[节点 A1]
        CA2[节点 A2]
        CA3[节点 A3]
        CA --> CA1
        CA --> CA2
        CA --> CA3
        
        LBA[负载均衡 A]
        APPA[应用服务 A]
    end
    
    subgraph "数据中心 B - 上海"
        CB[RabbitMQ 集群 B]
        CB1[节点 B1]
        CB2[节点 B2]
        CB3[节点 B3]
        CB --> CB1
        CB --> CB2
        CB --> CB3
        
        LBB[负载均衡 B]
        APPB[应用服务 B]
    end
    
    CA <-.->|Federation/Shovel| CB
    
    APPA --> LBA --> CA
    APPB --> LBB --> CB

二、核心知识点

2.1 多活架构模式

主备模式

mermaid
graph LR
    subgraph "主数据中心"
        PRIMARY[主集群<br/>处理所有流量]
    end
    
    subgraph "备数据中心"
        STANDBY[备集群<br/>待命状态]
    end
    
    PRIMARY <-.->|同步| STANDBY

双活模式

mermaid
graph LR
    subgraph "数据中心 A"
        DCA[集群 A<br/>处理 A 区域流量]
    end
    
    subgraph "数据中心 B"
        DCB[集群 B<br/>处理 B 区域流量]
    end
    
    DCA <-.->|双向同步| DCB

多活模式

mermaid
graph TB
    subgraph "多活架构"
        DC1[数据中心 1]
        DC2[数据中心 2]
        DC3[数据中心 3]
    end
    
    DC1 <-.-> DC2
    DC2 <-.-> DC3
    DC1 <-.-> DC3

2.2 数据同步策略

策略说明适用场景
Federation联邦队列/交换器多集群互联
Shovel数据传输插件点对点迁移
应用层同步应用自行同步自定义需求

2.3 多活架构设计要点

网络规划

mermaid
graph TB
    subgraph "网络架构"
        DC1[数据中心 1]
        DC2[数据中心 2]
        
        VPN[VPN/专线]
        FW1[防火墙]
        FW2[防火墙]
    end
    
    DC1 --> FW1 --> VPN --> FW2 --> DC2

端口规划

端口用途访问范围
5672AMQP数据中心间
5671AMQPS数据中心间(加密)
15672Management管理网络
15692Prometheus监控系统

2.4 故障切换策略

mermaid
stateDiagram-v2
    [*] --> Normal: 正常运行
    Normal --> Degraded: 单数据中心故障
    Degraded --> Normal: 故障恢复
    Degraded --> Critical: 多数据中心故障
    Critical --> Degraded: 部分恢复
    Critical --> [*]: 完全恢复

三、配置示例

3.1 双活数据中心配置

数据中心 A 配置

bash
# 在数据中心 A 上配置 Federation
rabbitmq-plugins enable rabbitmq_federation
rabbitmq-plugins enable rabbitmq_federation_management

# 设置上游(数据中心 B)
rabbitmqctl set_parameter federation-upstream dc-b \
    '{"uri":"amqp://admin:password@dc-b-cluster:5672","ack-mode":"on-confirm"}'

# 设置联邦策略
rabbitmqctl set_policy --apply-to exchanges federation-exchanges "^global\." \
    '{"federation-upstream":"dc-b"}'

数据中心 B 配置

bash
# 在数据中心 B 上配置 Federation
rabbitmq-plugins enable rabbitmq_federation
rabbitmq-plugins enable rabbitmq_federation_management

# 设置上游(数据中心 A)
rabbitmqctl set_parameter federation-upstream dc-a \
    '{"uri":"amqp://admin:password@dc-a-cluster:5672","ack-mode":"on-confirm"}'

# 设置联邦策略
rabbitmqctl set_policy --apply-to exchanges federation-exchanges "^global\." \
    '{"federation-upstream":"dc-a"}'

3.2 PHP 多活管理

php
<?php

class MultiDataCenterManager
{
    private array $dataCenters;
    
    public function __construct(array $dataCenters)
    {
        $this->dataCenters = $dataCenters;
    }
    
    private function request(string $dc, string $endpoint, string $method = 'GET', array $data = null): array
    {
        $config = $this->dataCenters[$dc];
        $url = "http://{$config['host']}:{$config['port']}/api/{$endpoint}";
        
        $ch = curl_init();
        curl_setopt_array($ch, [
            CURLOPT_URL => $url,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_USERPWD => "{$config['user']}:{$config['password']}",
            CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
            CURLOPT_CUSTOMREQUEST => $method,
            CURLOPT_TIMEOUT => 30,
        ]);
        
        if ($data !== null) {
            curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($data));
        }
        
        $response = curl_exec($ch);
        $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        curl_close($ch);
        
        return [
            'status' => $httpCode,
            'data' => json_decode($response, true)
        ];
    }
    
    public function checkAllDataCenters(): array
    {
        $results = [];
        
        foreach ($this->dataCenters as $name => $config) {
            $overview = $this->request($name, 'overview');
            $nodes = $this->request($name, 'nodes');
            
            $results[$name] = [
                'status' => $overview['status'] === 200 ? 'online' : 'offline',
                'cluster_name' => $overview['data']['cluster_name'] ?? 'unknown',
                'node_count' => count($nodes['data'] ?? []),
                'rabbitmq_version' => $overview['data']['rabbitmq_version'] ?? 'unknown',
            ];
        }
        
        return $results;
    }
    
    public function getFederationStatus(): array
    {
        $results = [];
        
        foreach ($this->dataCenters as $name => $config) {
            $links = $this->request($name, 'federation-links');
            
            $results[$name] = [
                'links' => $links['data'] ?? [],
                'link_count' => count($links['data'] ?? []),
            ];
        }
        
        return $results;
    }
    
    public function generateMultiDCReport(): string
    {
        $status = $this->checkAllDataCenters();
        $federation = $this->getFederationStatus();
        
        $report = "=== 多活数据中心报告 ===\n";
        $report .= "时间: " . date('Y-m-d H:i:s') . "\n\n";
        
        foreach ($status as $dc => $info) {
            $report .= "数据中心: {$dc}\n";
            $report .= "  状态: {$info['status']}\n";
            $report .= "  集群: {$info['cluster_name']}\n";
            $report .= "  节点数: {$info['node_count']}\n";
            $report .= "  版本: {$info['rabbitmq_version']}\n";
            
            $fedInfo = $federation[$dc] ?? [];
            $report .= "  联邦链接: {$fedInfo['link_count']}\n\n";
        }
        
        return $report;
    }
    
    public function failover(string $fromDC, string $toDC): array
    {
        return [
            'action' => 'failover',
            'from' => $fromDC,
            'to' => $toDC,
            'steps' => [
                '1. 停止 ' . $fromDC . ' 的应用连接',
                '2. 更新 DNS/负载均衡指向 ' . $toDC,
                '3. 验证 ' . $toDC . ' 正常服务',
                '4. 通知相关团队',
            ],
        ];
    }
}

$dataCenters = [
    'dc-beijing' => [
        'host' => 'rabbitmq-beijing.example.com',
        'port' => 15672,
        'user' => 'admin',
        'password' => 'password',
    ],
    'dc-shanghai' => [
        'host' => 'rabbitmq-shanghai.example.com',
        'port' => 15672,
        'user' => 'admin',
        'password' => 'password',
    ],
];

$manager = new MultiDataCenterManager($dataCenters);

echo $manager->generateMultiDCReport();

$failover = $manager->failover('dc-beijing', 'dc-shanghai');
echo "\n=== 故障切换方案 ===\n";
foreach ($failover['steps'] as $step) {
    echo "{$step}\n";
}

3.3 监控配置

yaml
# Prometheus 多数据中心监控
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: 'rabbitmq-dc-beijing'
    static_configs:
      - targets:
          - 'rabbitmq-bj-1:15692'
          - 'rabbitmq-bj-2:15692'
          - 'rabbitmq-bj-3:15692'
    labels:
      datacenter: 'beijing'

  - job_name: 'rabbitmq-dc-shanghai'
    static_configs:
      - targets:
          - 'rabbitmq-sh-1:15692'
          - 'rabbitmq-sh-2:15692'
          - 'rabbitmq-sh-3:15692'
    labels:
      datacenter: 'shanghai'

四、常见问题与解决方案

4.1 跨数据中心网络延迟

问题: 数据中心间网络延迟高

解决方案:

  1. 使用专线或 VPN
  2. 配置消息压缩
  3. 批量传输消息

4.2 数据一致性

问题: 多数据中心数据不一致

解决方案:

  1. 使用仲裁队列
  2. 配置消息确认模式
  3. 实现幂等消费

4.3 故障切换

问题: 数据中心故障时如何切换

解决方案:

bash
# 1. 更新 DNS
# 2. 更新负载均衡
# 3. 通知应用更新连接

五、最佳实践建议

5.1 架构建议

  1. 独立集群: 每个数据中心独立集群
  2. 网络规划: 确保网络稳定和低延迟
  3. 监控完善: 全面监控所有数据中心

5.2 运维建议

  1. 定期演练: 定期进行故障切换演练
  2. 文档完善: 完善运维文档
  3. 自动化: 实现自动化故障切换

六、相关链接