Skip to content

RabbitMQ 仲裁队列高可用

一、概述

仲裁队列(Quorum Queue)是 RabbitMQ 3.8 引入的新型队列,基于 Raft 共识算法实现,提供强一致性的数据复制和自动故障恢复能力。仲裁队列是现代 RabbitMQ 高可用部署的推荐选择。

仲裁队列架构

mermaid
graph TB
    subgraph "仲裁队列(Raft 集群)"
        L[Leader<br/>主节点]
        F1[Follower<br/>从节点1]
        F2[Follower<br/>从节点2]
        
        L -->|复制| F1
        L -->|复制| F2
        F1 -->|投票| L
        F2 -->|投票| L
    end
    
    subgraph "客户端"
        P[生产者]
        C[消费者]
    end
    
    P -->|写入| L
    L -->|读取| C

二、核心知识点

2.1 Raft 共识算法

Raft 工作原理

mermaid
sequenceDiagram
    participant L as Leader
    participant F1 as Follower 1
    participant F2 as Follower 2
    
    Note over L: 客户端写入请求
    
    L->>F1: AppendEntries (日志条目)
    L->>F2: AppendEntries (日志条目)
    
    F1->>L: 确认
    F2->>L: 确认
    
    Note over L: 收到多数确认<br/>提交日志
    
    L->>F1: 提交通知
    L->>F2: 提交通知
    
    Note over L: 返回客户端成功

Raft 核心概念

概念说明
Leader主节点,处理所有客户端请求
Follower从节点,接收 Leader 的日志复制
Term任期,Leader 的任期编号
Log日志,存储所有操作记录
Quorum法定人数,过半节点同意

2.2 仲裁队列特性

与镜像队列对比

特性仲裁队列镜像队列
一致性强一致最终一致
数据安全保证不丢失可能丢失
故障恢复自动选举手动/自动
性能较低较高
节点要求奇数节点任意
推荐场景生产环境开发测试

仲裁队列优势

  1. 数据安全: 基于 Raft 保证数据不丢失
  2. 自动恢复: Leader 故障自动选举新 Leader
  3. 一致性保证: 强一致性,避免数据不一致
  4. 简化运维: 无需手动管理同步

2.3 仲裁队列工作流程

mermaid
stateDiagram-v2
    [*] --> Follower: 节点启动
    Follower --> Candidate: 选举超时
    Candidate --> Leader: 获得多数票
    Candidate --> Follower: 收到更高 Term
    Leader --> Follower: 发现更高 Term
    Follower --> Follower: 接收心跳

2.4 仲裁队列配置参数

参数默认值说明
x-queue-typequorum队列类型
x-quorum-initial-group-size3初始仲裁组大小
x-delivery-limit20消息投递限制
x-overflowreject-publish-dlx溢出行为
x-quorum-target-group-size3目标仲裁组大小

三、配置示例

3.1 创建仲裁队列

命令行创建

bash
# 使用 rabbitmqadmin
rabbitmqadmin declare queue name=my-quorum-queue \
    type=quorum \
    durable=true \
    arguments='{"x-quorum-initial-group-size":3}'

# 使用 rabbitmqctl
rabbitmqctl eval '
rabbit_amqqueue:declare(
    rabbit_misc:r(<<"/">>, queue, <<"my-quorum-queue">>),
    true, false, [],
    [{<<"x-queue-type">>, longstr, <<"quorum">>},
     {<<"x-quorum-initial-group-size">>, long, 3}],
    <<"guest">>
).'

配置文件声明

ini
# /etc/rabbitmq/rabbitmq.conf

# 默认队列类型
queue_default_type = quorum

# 默认仲裁组大小
quorum_default_group_size = 3

3.2 PHP 创建仲裁队列

php
<?php

class QuorumQueueManager
{
    private string $host;
    private int $port;
    private string $user;
    private string $password;
    
    public function __construct(
        string $host = 'localhost',
        int $port = 15672,
        string $user = 'guest',
        string $password = 'guest'
    ) {
        $this->host = $host;
        $this->port = $port;
        $this->user = $user;
        $this->password = $password;
    }
    
    private function request(string $endpoint, string $method = 'GET', array $data = null): array
    {
        $url = "http://{$this->host}:{$this->port}/api/{$endpoint}";
        
        $ch = curl_init();
        curl_setopt_array($ch, [
            CURLOPT_URL => $url,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_USERPWD => "{$this->user}:{$this->password}",
            CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
            CURLOPT_CUSTOMREQUEST => $method,
        ]);
        
        if ($data !== null) {
            curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($data));
        }
        
        $response = curl_exec($ch);
        $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        curl_close($ch);
        
        return [
            'status' => $httpCode,
            'data' => json_decode($response, true)
        ];
    }
    
    public function createQuorumQueue(
        string $name,
        string $vhost = '%2f',
        int $quorumSize = 3,
        array $extraArgs = []
    ): array {
        $arguments = array_merge([
            'x-queue-type' => 'quorum',
            'x-quorum-initial-group-size' => $quorumSize,
        ], $extraArgs);
        
        return $this->request("queues/{$vhost}/{$name}", 'PUT', [
            'auto_delete' => false,
            'durable' => true,
            'arguments' => $arguments,
        ]);
    }
    
    public function getQuorumQueueStatus(string $name, string $vhost = '%2f'): array
    {
        $result = $this->request("queues/{$vhost}/{$name}");
        
        if ($result['status'] !== 200) {
            return ['error' => 'Queue not found'];
        }
        
        $queue = $result['data'];
        
        return [
            'name' => $queue['name'],
            'type' => $queue['type'] ?? 'classic',
            'is_quorum' => ($queue['type'] ?? 'classic') === 'quorum',
            'leader' => $queue['leader'] ?? null,
            'members' => $queue['members'] ?? [],
            'messages' => $queue['messages'] ?? 0,
            'consumers' => $queue['consumers'] ?? 0,
            'state' => $queue['state'] ?? 'unknown',
        ];
    }
    
    public function listQuorumQueues(string $vhost = '%2f'): array
    {
        $queues = $this->request("queues/{$vhost}");
        
        if ($queues['status'] !== 200) {
            return [];
        }
        
        return array_filter($queues['data'], function ($queue) {
            return ($queue['type'] ?? 'classic') === 'quorum';
        });
    }
    
    public function getRaftStatus(string $name, string $vhost = '%2f'): array
    {
        $result = $this->request("queues/{$vhost}/{$name}");
        
        if ($result['status'] !== 200) {
            return ['error' => 'Queue not found'];
        }
        
        $queue = $result['data'];
        
        if (($queue['type'] ?? 'classic') !== 'quorum') {
            return ['error' => 'Not a quorum queue'];
        }
        
        return [
            'leader' => $queue['leader'] ?? 'none',
            'members' => $queue['members'] ?? [],
            'member_count' => count($queue['members'] ?? []),
            'online' => count(array_filter($queue['members'] ?? [], function ($m) {
                return ($m['online'] ?? false);
            })),
        ];
    }
    
    public function generateQuorumReport(): string
    {
        $queues = $this->listQuorumQueues();
        
        $report = "=== 仲裁队列报告 ===\n";
        $report .= "时间: " . date('Y-m-d H:i:s') . "\n\n";
        
        foreach ($queues as $queue) {
            $status = $this->getQuorumQueueStatus($queue['name']);
            
            $report .= "队列: {$queue['name']}\n";
            $report .= "  Leader: {$status['leader']}\n";
            $report .= "  成员数: " . count($status['members']) . "\n";
            $report .= "  消息数: {$status['messages']}\n";
            $report .= "  消费者: {$status['consumers']}\n";
            $report .= "  状态: {$status['state']}\n\n";
        }
        
        return $report;
    }
}

$manager = new QuorumQueueManager('localhost', 15672, 'admin', 'Admin@123456');

// 创建仲裁队列
$manager->createQuorumQueue('orders-queue', '%2f', 3);

// 生成报告
echo $manager->generateQuorumReport();

3.3 策略配置

bash
# 设置默认仲裁队列策略
rabbitmqctl set_policy quorum-all "^(?!amq\\.).*" \
    '{"queue-type":"quorum"}' \
    --apply-to queues

# 为特定队列设置仲裁组大小
rabbitmqctl set_policy quorum-size "^important\." \
    '{"queue-type":"quorum","x-quorum-initial-group-size":5}' \
    --apply-to queues

3.4 监控仲裁队列

bash
# 查看仲裁队列状态
rabbitmqctl list_queues name type leader members

# 查看 Raft 状态
rabbitmqctl eval 'ra:cluster_status({name, node}).'

# 查看队列成员
rabbitmqctl list_queues name members

四、常见问题与解决方案

4.1 Leader 选举失败

问题: Leader 故障后无法选举新 Leader

排查:

bash
# 检查节点状态
rabbitmqctl cluster_status

# 检查 Raft 状态
rabbitmqctl eval 'ra:members({queue_name, node}).'

# 检查日志
grep -i "raft" /var/log/rabbitmq/rabbit@$(hostname).log

解决方案:

bash
# 确保奇数节点
# 确保多数节点可用
# 检查网络连接

4.2 性能问题

问题: 仲裁队列性能低于预期

优化:

ini
# 增加批次大小
quorum_commands_soft_limit = 64

# 优化磁盘 I/O
# 使用 SSD

4.3 节点加入失败

问题: 新节点无法加入仲裁组

解决方案:

bash
# 检查节点状态
rabbitmqctl cluster_status

# 手动添加成员
rabbitmqctl eval 'rabbit_quorum_queue:add_member(QueueName, NodeName).'

五、最佳实践建议

5.1 节点配置

  1. 奇数节点: 使用 3 或 5 个节点
  2. 资源充足: 确保每个节点资源充足
  3. 网络稳定: 节点间网络延迟 < 1ms

5.2 队列配置

ini
# 推荐配置
x-queue-type = quorum
x-quorum-initial-group-size = 3
x-delivery-limit = 20
x-overflow = reject-publish-dlx

5.3 迁移建议

从镜像队列迁移到仲裁队列:

  1. 创建新的仲裁队列
  2. 更新应用使用新队列
  3. 迁移存量消息
  4. 删除旧队列

六、相关链接