Skip to content

RabbitMQ Federation 联邦队列

一、概述

RabbitMQ Federation 插件提供了一种跨集群或跨数据中心的消息传递机制,允许不同 RabbitMQ 集群之间进行消息转发。Federation 适用于多数据中心、跨地域部署和消息分发场景。

Federation 架构

mermaid
graph TB
    subgraph "数据中心 A"
        CA[RabbitMQ 集群 A]
        CA1[节点 A1]
        CA2[节点 A2]
        CA --> CA1
        CA --> CA2
    end
    
    subgraph "数据中心 B"
        CB[RabbitMQ 集群 B]
        CB1[节点 B1]
        CB2[节点 B2]
        CB --> CB1
        CB --> CB2
    end
    
    CA <-.->|Federation Link| CB
    
    subgraph "消息流向"
        P[生产者] --> CA
        CA -->|联邦转发| CB
        CB --> C[消费者]
    end

二、核心知识点

2.1 Federation 工作原理

联邦交换器

mermaid
sequenceDiagram
    participant P as 生产者
    participant EA as 上游交换器
    participant EB as 下游交换器
    participant QB as 下游队列
    participant C as 消费者
    
    P->>EA: 发布消息
    EA->>EB: 联邦转发
    EB->>QB: 路由到队列
    QB->>C: 消费消息

联邦队列

mermaid
sequenceDiagram
    participant P as 生产者
    participant QA as 上游队列
    participant QB as 下游队列
    participant C as 消费者
    
    P->>QA: 发布消息
    QA->>QB: 联邦拉取
    QB->>C: 消费消息

2.2 Federation 类型

类型说明适用场景
联邦交换器上游交换器转发到下游交换器消息广播、跨集群发布
联邦队列下游队列从上游队列拉取消息负载均衡、消息分发

2.3 Federation 特点

优点

  1. 松耦合: 集群间独立运行
  2. 容错性: 单集群故障不影响其他集群
  3. 灵活性: 可选择性联邦特定资源
  4. 可扩展: 支持多层级联邦

限制

  1. 最终一致性: 消息可能有延迟
  2. 单向传输: 默认单向消息流
  3. 资源消耗: 需要额外资源维护连接

2.4 Federation 与 Shovel 对比

特性FederationShovel
配置方式策略配置插件配置
消息流向上游到下游双向可配
自动发现支持不支持
适用场景多集群互联点对点传输

三、配置示例

3.1 启用 Federation 插件

bash
# 在所有节点上启用插件
rabbitmq-plugins enable rabbitmq_federation
rabbitmq-plugins enable rabbitmq_federation_management

# 验证插件状态
rabbitmq-plugins list | grep federation

3.2 配置上游服务器

bash
# 设置上游服务器
rabbitmqctl set_parameter federation-upstream upstream-a \
    '{"uri":"amqp://user:password@cluster-a:5672","ack-mode":"on-confirm"}'

# 设置上游集群
rabbitmqctl set_parameter federation-upstream-set all-upstreams \
    '[{"upstream":"upstream-a"},{"upstream":"upstream-b"}]'

3.3 配置联邦策略

联邦交换器

bash
# 联邦交换器策略
rabbitmqctl set_policy --apply-to exchanges federation-exchanges "^federated\." \
    '{"federation-upstream":"upstream-a"}'

联邦队列

bash
# 联邦队列策略
rabbitmqctl set_policy --apply-to queues federation-queues "^federated\." \
    '{"federation-upstream":"upstream-a"}'

3.4 PHP 配置示例

php
<?php

class FederationManager
{
    private string $host;
    private int $port;
    private string $user;
    private string $password;
    
    public function __construct(
        string $host = 'localhost',
        int $port = 15672,
        string $user = 'guest',
        string $password = 'guest'
    ) {
        $this->host = $host;
        $this->port = $port;
        $this->user = $user;
        $this->password = $password;
    }
    
    private function request(string $endpoint, string $method = 'GET', array $data = null): array
    {
        $url = "http://{$this->host}:{$this->port}/api/{$endpoint}";
        
        $ch = curl_init();
        curl_setopt_array($ch, [
            CURLOPT_URL => $url,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_USERPWD => "{$this->user}:{$this->password}",
            CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
            CURLOPT_CUSTOMREQUEST => $method,
        ]);
        
        if ($data !== null) {
            curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($data));
        }
        
        $response = curl_exec($ch);
        $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        curl_close($ch);
        
        return [
            'status' => $httpCode,
            'data' => json_decode($response, true)
        ];
    }
    
    public function setUpstream(
        string $name,
        string $uri,
        string $ackMode = 'on-confirm',
        int $maxHops = 1
    ): array {
        return $this->request("parameters/federation-upstream/%2f/{$name}", 'PUT', [
            'value' => [
                'uri' => $uri,
                'ack-mode' => $ackMode,
                'max-hops' => $maxHops,
            ],
        ]);
    }
    
    public function setUpstreamSet(string $name, array $upstreams): array
    {
        $value = array_map(function ($upstream) {
            return ['upstream' => $upstream];
        }, $upstreams);
        
        return $this->request("parameters/federation-upstream-set/%2f/{$name}", 'PUT', [
            'value' => $value,
        ]);
    }
    
    public function setFederationPolicy(
        string $name,
        string $pattern,
        string $upstream,
        string $applyTo = 'exchanges'
    ): array {
        return $this->request("policies/%2f/{$name}", 'PUT', [
            'pattern' => $pattern,
            'definition' => [
                'federation-upstream' => $upstream,
            ],
            'apply-to' => $applyTo,
        ]);
    }
    
    public function getFederationStatus(): array
    {
        return $this->request('federation-links');
    }
    
    public function listUpstreams(): array
    {
        return $this->request('parameters/federation-upstream');
    }
    
    public function generateFederationReport(): string
    {
        $status = $this->getFederationStatus();
        $upstreams = $this->listUpstreams();
        
        $report = "=== Federation 报告 ===\n";
        $report .= "时间: " . date('Y-m-d H:i:s') . "\n\n";
        
        $report .= "上游服务器:\n";
        foreach ($upstreams['data'] ?? [] as $upstream) {
            $report .= "  - {$upstream['name']}: {$upstream['value']['uri']}\n";
        }
        
        $report .= "\n联邦链接状态:\n";
        foreach ($status['data'] ?? [] as $link) {
            $statusText = $link['status'] ?? 'unknown';
            $report .= "  - {$link['exchange']} -> {$link['upstream_exchange']}: {$statusText}\n";
        }
        
        return $report;
    }
}

$manager = new FederationManager('localhost', 15672, 'admin', 'Admin@123456');

// 设置上游服务器
$manager->setUpstream('upstream-dc-a', 'amqp://admin:password@cluster-a:5672');

// 设置联邦策略
$manager->setFederationPolicy('federation-policy', '^federated\.', 'upstream-dc-a');

// 生成报告
echo $manager->generateFederationReport();

四、常见问题与解决方案

4.1 联邦链接断开

问题: Federation 链接频繁断开

排查:

bash
# 检查链接状态
rabbitmqctl eval 'rabbit_federation_status:status().'

# 检查网络连通性
ping cluster-a
telnet cluster-a 5672

# 检查日志
grep -i federation /var/log/rabbitmq/rabbit@$(hostname).log

4.2 消息重复

问题: 联邦传输导致消息重复

解决方案:

bash
# 配置消息去重
rabbitmqctl set_parameter federation-upstream upstream-a \
    '{"uri":"amqp://cluster-a","trust-user-id":true}'

五、最佳实践建议

5.1 部署建议

  1. 网络稳定: 确保集群间网络稳定
  2. 带宽充足: 预留足够带宽
  3. 监控链接: 监控联邦链接状态

5.2 配置建议

json
{
    "uri": "amqp://user:password@upstream:5672",
    "ack-mode": "on-confirm",
    "max-hops": 1,
    "expires": 3600000,
    "message-ttl": 86400000
}

六、相关链接