
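High Availability Configuration

Overview

High availability (HA) is a core requirement for production RabbitMQ clusters. This document describes recommended practices for configuring RabbitMQ so that the messaging system keeps serving clients when individual components fail.

High-Availability Architecture Layers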

┌────────────────────────────────────────────────────────────────────────┐
│                 High-Availability Architecture Layers                  │
├────────────────────────────────────────────────────────────────────────┤
│                                                                        │
│  ┌─────────────────────────────────────────────────────────────────┐   │
│  │               Application-Layer High Availability               │   │
│  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐              │   │
│  │  │   Client 1  │  │   Client 2  │  │   Client 3  │              │   │
│  │  │  (Producer) │  │  (Consumer) │  │  (Consumer) │              │   │
│  │  └──────┬──────┘  └──────┬──────┘  └──────┬──────┘              │   │
│  │         │                │                │                     │   │
│  │         └────────────────┴────────────────┘                     │   │
│  │                          │                                      │   │
│  │                   Connection retry, failover                    │   │
│  └──────────────────────────┼──────────────────────────────────────┘   │
│                             │                                          │
│  ┌──────────────────────────┼──────────────────────────────────────┐   │
│  │                 Access-Layer High Availability                  │   │
│  │                          │                                      │   │
│  │  ┌───────────────────────┴───────────────────────┐              │   │
│  │  │            Load Balancer (HAProxy)            │              │   │
│  │  │         ┌──────────┬──────────┬──────────┐    │              │   │
│  │  │         │  Active  │  Backup  │  Backup  │    │              │   │
│  │  │         └──────────┴──────────┴──────────┘    │              │   │
│  │  └───────────────────────────────────────────────┘              │   │
│  │                  Health checks, load balancing                  │   │
│  └──────────────────────────┬──────────────────────────────────────┘   │
│                             │                                          │
│  ┌──────────────────────────┼──────────────────────────────────────┐   │
│  │                 Service-Layer High Availability                 │   │
│  │                          │                                      │   │
│  │  ┌───────────────────────┴───────────────────────┐              │   │
│  │  │               RabbitMQ Cluster                │              │   │
│  │  │  ┌──────────┐  ┌──────────┐  ┌──────────┐     │              │   │
│  │  │  │  Node 1  │  │  Node 2  │  │  Node 3  │     │              │   │
│  │  │  │ (Leader) │  │(Follower)│  │(Follower)│     │              │   │
│  │  │  └──────────┘  └──────────┘  └──────────┘     │              │   │
│  │  └───────────────────────────────────────────────┘              │   │
│  │           Clustering, quorum queues, mirrored queues            │   │
│  └──────────────────────────┬──────────────────────────────────────┘   │
│                             │                                          │
│  ┌──────────────────────────┼──────────────────────────────────────┐   │
│  │                 Storage-Layer High Availability                 │   │
│  │                          │                                      │   │
│  │  ┌───────────────────────┴───────────────────────┐              │   │
│  │  │              Persistent Storage               │              │   │
│  │  │  ┌──────────┐  ┌──────────┐  ┌──────────┐     │              │   │
│  │  │  │  Disk 1  │  │  Disk 2  │  │  Disk 3  │     │              │   │
│  │  │  │  (RAID)  │  │  (RAID)  │  │  (RAID)  │     │              │   │
│  │  │  └──────────┘  └──────────┘  └──────────┘     │              │   │
│  │  └───────────────────────────────────────────────┘              │   │
│  │                   Persistent storage, backups                   │   │
│  └─────────────────────────────────────────────────────────────────┘   │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

Quorum Queue High-Availability Configuration

PHP example: declaring a quorum queue

php
<?php

namespace App\Messaging\HA;

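use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

class QuorumQueueManager
{
    private AMQPChannel $channel;

    public function __construct(AMQPChannel $channel)
    {
        $this->channel = $channel;
    }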
    
    public function declareQuorumQueue(
        string $queueName,
        int $quorumSize = 3,
        array $options = []
    ): void {
        // Only send optional arguments the caller actually set: a max length or
        // TTL of 0 would drop or expire every message rather than mean "unlimited".
        $arguments = [
            'x-queue-type' => 'quorum',
            'x-quorum-initial-group-size' => $quorumSize,
            'x-delivery-limit' => $options['delivery_limit'] ?? 10,
        ];
        $optional = [
            'dead_letter_exchange' => 'x-dead-letter-exchange',
            'dead_letter_routing_key' => 'x-dead-letter-routing-key',
            'max_length' => 'x-max-length',
            'message_ttl' => 'x-message-ttl',
        ];
        foreach ($optional as $option => $argument) {
            if (isset($options[$option])) {
                $arguments[$argument] = $options[$option];
            }
        }
        $arguments = new AMQPTable($arguments);
        
        $this->channel->queue_declare(
            $queueName,
            false,
            true,
            false,
            false,
            false,
            $arguments
        );
    }
    
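    /**
     * Declares a durable classic queue. Since RabbitMQ 3.0 mirroring is not
     * requested through queue arguments (the old 'x-ha-policy' argument no
     * longer has any effect); it is applied by a policy whose pattern matches
     * $queueName. Classic mirrored queues were removed in RabbitMQ 4.0.
     */
    public function declareClassicQueueWithHA(
        string $queueName,
        array $options = []
    ): void {
        $arguments = ['x-queue-type' => 'classic'];
        $optional = [
            'message_ttl' => 'x-message-ttl',
            'max_length' => 'x-max-length',
            'dead_letter_exchange' => 'x-dead-letter-exchange',
        ];
        foreach ($optional as $option => $argument) {
            if (isset($options[$option])) {
                $arguments[$argument] = $options[$option];
            }
        }
        $arguments = new AMQPTable($arguments);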
        
        $this->channel->queue_declare(
            $queueName,
            false,
            true,
            false,
            false,
            false,
            $arguments
        );
    }
    
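    public function publishWithConfirmation(
        string $exchange,
        string $routingKey,
        AMQPMessage $message,
        int $timeout = 5
    ): bool {
        $confirmed = true;

        // wait_for_pending_acks*() returns nothing, so the outcome is
        // collected through the nack and return callbacks instead.
        $this->channel->set_nack_handler(function () use (&$confirmed) {
            $confirmed = false; // the broker rejected the message
        });
        $this->channel->set_return_listener(function () use (&$confirmed) {
            $confirmed = false; // mandatory message could not be routed
        });

        $this->channel->confirm_select();

        $this->channel->basic_publish(
            $message,
            $exchange,
            $routingKey,
            true,   // mandatory: return the message if no queue is bound
            false
        );

        try {
            $this->channel->wait_for_pending_acks_returns($timeout);
        } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) {
            return false; // no confirm arrived within $timeout seconds
        }

        return $confirmed;
    }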
}
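
`publishWithConfirmation` depends on publisher confirms: the broker numbers each message published on a channel with a delivery tag and later acknowledges or rejects tags, optionally several at once through the `multiple` flag. The sketch below illustrates that bookkeeping in plain PHP. `ConfirmTracker` and its method names are hypothetical and exist only for illustration; php-amqplib keeps its own record of pending confirms internally.

```php
<?php

/**
 * Hypothetical helper (not part of php-amqplib): tracks which published
 * delivery tags are still waiting for a broker confirm.
 */
final class ConfirmTracker
{
    /** @var array<int, true> delivery tags awaiting confirmation */
    private array $pending = [];

    /** @var int[] delivery tags the broker rejected */
    private array $rejected = [];

    public function published(int $deliveryTag): void
    {
        $this->pending[$deliveryTag] = true;
    }

    public function ack(int $deliveryTag, bool $multiple = false): void
    {
        $this->settle($deliveryTag, $multiple);
    }

    public function nack(int $deliveryTag, bool $multiple = false): void
    {
        foreach ($this->settle($deliveryTag, $multiple) as $tag) {
            $this->rejected[] = $tag;
        }
    }

    /** @return int[] */
    public function pending(): array
    {
        return array_keys($this->pending);
    }

    /** @return int[] */
    public function rejected(): array
    {
        return $this->rejected;
    }

    /**
     * Removes the settled tags from the pending set and returns them.
     * With $multiple = true every pending tag up to and including
     * $deliveryTag is settled at once.
     *
     * @return int[]
     */
    private function settle(int $deliveryTag, bool $multiple): array
    {
        $settled = [];
        foreach (array_keys($this->pending) as $tag) {
            if ($tag === $deliveryTag || ($multiple && $tag < $deliveryTag)) {
                $settled[] = $tag;
                unset($this->pending[$tag]);
            }
        }
        return $settled;
    }
}

$tracker = new ConfirmTracker();
foreach ([1, 2, 3, 4] as $tag) {
    $tracker->published($tag);
}
$tracker->ack(2, true); // confirms tags 1 and 2 together
$tracker->nack(3);      // tag 3 was rejected and should be republished
```

After these calls only tag 4 is still pending, and tag 3 is the one a publisher would need to resend or report.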

High-Availability Policy Configuration

php
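<?php

namespace App\Messaging\HA;

/**
 * Policies are not part of AMQP, so they cannot be set over a channel.
 * This class calls the management plugin's HTTP API instead.
 */
class HAPolicyManager
{
    private string $baseUrl;
    private string $user;
    private string $password;
    private string $vhost;

    public function __construct(
        string $baseUrl,
        string $user,
        string $password,
        string $vhost = '/'
    ) {
        $this->baseUrl = rtrim($baseUrl, '/'); // e.g. http://rabbit-1:15672
        $this->user = $user;
        $this->password = $password;
        $this->vhost = $vhost;
    }

    public function applyHAPolicy(string $pattern, array $policy): void
    {
        // Mirrored-queue keys: valid on RabbitMQ 3.x only (removed in 4.0)
        $definition = [
            'ha-mode' => $policy['ha_mode'] ?? 'exactly',
            'ha-params' => $policy['ha_params'] ?? 2,
            'ha-sync-mode' => $policy['ha_sync_mode'] ?? 'automatic',
            'ha-sync-batch-size' => $policy['ha_sync_batch_size'] ?? 100,
            'ha-promote-on-shutdown' => $policy['ha_promote_on_shutdown'] ?? 'when-synced',
            'ha-promote-on-failure' => $policy['ha_promote_on_failure'] ?? 'when-synced',
        ];

        // ha-mode "all" takes no parameter
        if ($definition['ha-mode'] === 'all') {
            unset($definition['ha-params']);
        }

        $this->putPolicy($pattern . '_ha_policy', $pattern, $definition);
    }

    public function applyQuorumPolicy(string $pattern): void
    {
        // target-group-size is honoured only by RabbitMQ releases that
        // support quorum queue membership reconciliation
        $this->putPolicy($pattern . '_quorum_policy', $pattern, [
            'target-group-size' => 3,
        ]);
    }

    public function removePolicy(string $policyName): void
    {
        $this->request('DELETE', $this->policyPath($policyName));
    }

    private function putPolicy(string $name, string $pattern, array $definition): void
    {
        $this->request('PUT', $this->policyPath($name), [
            'pattern' => $pattern,
            'definition' => $definition,
            'priority' => 0,
            'apply-to' => 'queues',
        ]);
    }

    private function policyPath(string $name): string
    {
        return sprintf('/api/policies/%s/%s', rawurlencode($this->vhost), rawurlencode($name));
    }

    private function request(string $method, string $path, ?array $body = null): void
    {
        $ch = curl_init($this->baseUrl . $path);
        curl_setopt_array($ch, [
            CURLOPT_CUSTOMREQUEST => $method,
            CURLOPT_USERPWD => $this->user . ':' . $this->password,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
        ]);
        if ($body !== null) {
            curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($body));
        }
        $response = curl_exec($ch);
        $status = curl_getinfo($ch, CURLINFO_RESPONSE_CODE);
        curl_close($ch);

        if ($response === false || $status >= 400) {
            throw new \RuntimeException(
                sprintf('%s %s failed with HTTP status %d', $method, $path, $status)
            );
        }
    }
}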

Connection High-Availability Configuration

php
<?php

namespace App\Messaging\HA;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use Psr\Log\LoggerInterface;

class HAConnectionManager
{
    private array $nodes;
    private LoggerInterface $logger;
    private ?AMQPStreamConnection $connection = null;
    private array $config;
    
    public function __construct(array $nodes, LoggerInterface $logger, array $config = [])
    {
        $this->nodes = $nodes;
        $this->logger = $logger;
        $this->config = array_merge([
            'connection_timeout' => 3.0,
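            // Keep read_write_timeout at least twice the heartbeat; some
            // php-amqplib versions reject a smaller value
            'read_write_timeout' => 130.0,
            'heartbeat' => 60,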
            'max_retries' => 3,
            'retry_delay' => 1000,
            'keepalive' => true,
        ], $config);
    }
    
    public function getConnection(): AMQPStreamConnection
    {
        if ($this->connection && $this->connection->isConnected()) {
            return $this->connection;
        }
        
        return $this->establishConnection();
    }
    
    private function establishConnection(): AMQPStreamConnection
    {
        $nodes = $this->getAvailableNodes();
        $lastException = null;
        
        foreach ($nodes as $node) {
            try {
                $connection = $this->createConnection($node);
                $this->setupConnectionHandlers($connection);
                $this->connection = $connection;
                
                $this->logger->info('Connected to RabbitMQ node', [
                    'host' => $node['host'],
                    'port' => $node['port'],
                ]);
                
                return $connection;
            } catch (\Exception $e) {
                $lastException = $e;
                $this->logger->warning('Failed to connect to node', [
                    'host' => $node['host'],
                    'error' => $e->getMessage(),
                ]);
            }
        }
        
        throw new \RuntimeException(
            'Failed to connect to any RabbitMQ node',
            0,
            $lastException
        );
    }
    
    private function getAvailableNodes(): array
    {
        $nodes = $this->nodes;
        shuffle($nodes);
        return $nodes;
    }
    
    private function createConnection(array $node): AMQPStreamConnection
    {
        return new AMQPStreamConnection(
            $node['host'],
            $node['port'],
            $node['user'] ?? $this->config['user'] ?? 'guest',
            $node['password'] ?? $this->config['password'] ?? 'guest',
            $node['vhost'] ?? $this->config['vhost'] ?? '/',
            false,
            'AMQPLAIN',
            null,
            'en_US',
            $this->config['connection_timeout'],
            $this->config['read_write_timeout'],
            null,
            $this->config['keepalive'],
            $this->config['heartbeat']
        );
    }
    
    private function setupConnectionHandlers(AMQPStreamConnection $connection): void
    {
        // php-amqplib offers callbacks for connection.blocked and
        // connection.unblocked. A lost connection is detected through
        // exceptions on the next I/O call and through isConnected().
        $connection->set_connection_block_handler(function ($reason = '') {
            $this->handleBlocked((string) $reason);
        });

        $connection->set_connection_unblock_handler(function () {
            $this->handleUnblocked();
        });
    }
    
    private function handleBlocked(string $reason): void
    {
        $this->logger->warning('Connection blocked', ['reason' => $reason]);
    }
    
    private function handleUnblocked(): void
    {
        $this->logger->info('Connection unblocked');
    }
    
    public function reconnect(): AMQPStreamConnection
    {
        $this->close();
        return $this->establishConnection();
    }
    
    public function close(): void
    {
        if ($this->connection) {
            try {
                $this->connection->close();
            } catch (\Exception $e) {
                $this->logger->warning('Error closing connection', [
                    'error' => $e->getMessage(),
                ]);
            }
            $this->connection = null;
        }
    }
}
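
The configuration above carries `max_retries` and `retry_delay` settings, which suggests pausing between connection attempts. One common way to space the attempts is exponential backoff with a cap, optionally with random jitter so that many clients do not reconnect at the same instant. The following is a minimal sketch under that assumption; the function names and the 30-second cap are hypothetical, and the delays are taken to be milliseconds.

```php
<?php

/**
 * Hypothetical helper: delay before reconnect attempt number $attempt
 * (1-based), doubling from $baseMs and capped at $maxMs.
 */
function reconnectDelayMs(int $attempt, int $baseMs = 1000, int $maxMs = 30000): int
{
    $exponent = max(0, $attempt - 1);
    // Cap the exponent so the multiplication cannot overflow on long outages.
    $delay = $baseMs * (2 ** min($exponent, 20));

    return (int) min($delay, $maxMs);
}

/**
 * Adds "full jitter": a random delay between 0 and the capped backoff.
 */
function reconnectDelayWithJitterMs(int $attempt, int $baseMs = 1000, int $maxMs = 30000): int
{
    return random_int(0, reconnectDelayMs($attempt, $baseMs, $maxMs));
}

$delays = array_map('reconnectDelayMs', [1, 2, 3, 4, 5, 6, 7]);
// $delays is [1000, 2000, 4000, 8000, 16000, 30000, 30000]

$jittered = reconnectDelayWithJitterMs(3); // somewhere between 0 and 4000
```

A caller would convert the result for `usleep()` (microseconds) before the next pass over the node list.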

Consumer High-Availability Configuration

php
<?php

namespace App\Messaging\HA;

use PhpAmqpLib\Message\AMQPMessage;
use Psr\Log\LoggerInterface;

class HAConsumer
{
    private HAConnectionManager $connectionManager;
    private LoggerInterface $logger;
    private array $config;
    private $channel = null;
    
    public function __construct(
        HAConnectionManager $connectionManager,
        LoggerInterface $logger,
        array $config = []
    ) {
        $this->connectionManager = $connectionManager;
        $this->logger = $logger;
        $this->config = array_merge([
            'prefetch_count' => 10,
            'consumer_timeout' => 30,
            'reconnect_delay' => 5,
            'max_reconnect_attempts' => 10,
        ], $config);
    }
    
    public function consume(string $queue, callable $handler): void
    {
        $reconnectAttempts = 0;
        
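        while ($reconnectAttempts < $this->config['max_reconnect_attempts']) {
            try {
                $this->ensureChannel();
                $this->setupConsumer($queue, $handler);

                // A successful (re)subscription resets the attempt counter
                $reconnectAttempts = 0;

                while ($this->channel && $this->channel->is_consuming()) {
                    try {
                        $this->channel->wait(
                            null,
                            false,
                            $this->config['consumer_timeout']
                        );
                    } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) {
                        // No message within consumer_timeout seconds; keep waiting
                        continue;
                    }
                }

                // The consumer was cancelled or stop() was called: exit cleanly
                return;
            } catch (\Exception $e) {
                $reconnectAttempts++;
                $this->logger->error('Consumer error, attempting reconnect', [
                    'attempt' => $reconnectAttempts,
                    'error' => $e->getMessage(),
                ]);

                // Drop the channel and the connection so the next attempt
                // picks a node again instead of reusing a dead socket
                $this->closeChannel();
                $this->connectionManager->close();

                sleep($this->config['reconnect_delay']);
            }
        }

        throw new \RuntimeException('Max reconnect attempts reached');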
    }
    
    private function ensureChannel(): void
    {
        if ($this->channel && $this->channel->is_open()) {
            return;
        }
        
        $connection = $this->connectionManager->getConnection();
        $this->channel = $connection->channel();
        
        // prefetch_size 0 means no byte limit; false applies the count per consumer.
        // A channel closed by the broker surfaces as an exception in wait().
        $this->channel->basic_qos(0, $this->config['prefetch_count'], false);
    }
    
    private function setupConsumer(string $queue, callable $handler): void
    {
        $callback = function (AMQPMessage $message) use ($handler) {
            $this->processMessage($message, $handler);
        };
        
        $this->channel->basic_consume(
            $queue,
            $this->getConsumerTag(),
            false,
            false,
            false,
            false,
            $callback
        );
        
        $this->logger->info('Consumer started', [
            'queue' => $queue,
            'consumer_tag' => $this->getConsumerTag(),
        ]);
    }
    
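    private function processMessage(AMQPMessage $message, callable $handler): void
    {
        try {
            $result = $handler($message);

            if ($result !== false) {
                $message->ack();
            } else {
                // AMQPMessage::nack() takes $requeue first, then $multiple
                $message->nack(true);
            }
        } catch (\Throwable $e) {
            $this->logger->error('Message processing failed', [
                'error' => $e->getMessage(),
                // get() throws if the property was never set, so check first
                'message_id' => $message->has('message_id') ? $message->get('message_id') : null,
            ]);

            // Requeue this message only; a delivery limit on the queue
            // keeps a poison message from looping forever
            $message->nack(true);
        }
    }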
    
    private function getConsumerTag(): string
    {
        return gethostname() . '-' . getmypid();
    }
    
    private function closeChannel(): void
    {
        if ($this->channel) {
            try {
                $this->channel->close();
            } catch (\Exception $e) {
                // Ignore close errors
            }
            $this->channel = null;
        }
    }
    
    public function stop(): void
    {
        $this->closeChannel();
        $this->connectionManager->close();
    }
}

Anti-pattern: no high-availability protection

php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;

class NonHAConsumer
{
    public function consume(string $queue, callable $handler): void
    {
        // Mistake 1: single-node connection, no failover
        $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $channel = $connection->channel();
        
        // Mistake 2: no reconnect logic
        $callback = function ($message) use ($handler) {
            $handler($message);
            // Mistake 3: no exception handling around the handler or the ack
            $message->ack();
        };
        
        $channel->basic_consume($queue, '', false, false, false, false, $callback);
        
        // Mistake 4: no timeout on wait()
        while ($channel->is_consuming()) {
            $channel->wait();
        }
        
        // Mistake 5: no way to recover once the connection drops
    }
}

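Mirrored Queue Configuration

HA policy examples

bash
# Classic mirrored queues are deprecated and were removed in RabbitMQ 4.0;
# the ha-* policies below apply to RabbitMQ 3.x only.

# Mirror every queue to all nodes
rabbitmqctl set_policy --apply-to queues ha-all ".*" '{"ha-mode":"all"}'

# Keep exactly 2 replicas of queues whose names start with "ha."
rabbitmqctl set_policy --apply-to queues ha-exactly "^ha\." '{"ha-mode":"exactly","ha-params":2}'

# Keep 2 replicas of "order." queues and synchronise new mirrors automatically
rabbitmqctl set_policy --apply-to queues ha-order "^order\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'

# A policy cannot change a queue's type: declare the queue with
# x-queue-type=quorum. Policies can still tune quorum queues, for
# example their delivery limit.
rabbitmqctl set_policy --apply-to queues quorum "^quorum\." '{"delivery-limit":10}'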

Setting HA policies from PHP

php
<?php

namespace App\Messaging\HA;

class PolicySetup
{
    private $apiClient;
    private string $vhost;
    
    public function __construct($apiClient, string $vhost = '/')
    {
        $this->apiClient = $apiClient;
        $this->vhost = $vhost;
    }
    
    public function setHAPolicy(
        string $name,
        string $pattern,
        array $definition,
        int $priority = 0
    ): void {
        // rawurlencode() encodes "/" as %2F and spaces as %20, which is what a URL path needs
        $url = sprintf('/api/policies/%s/%s', rawurlencode($this->vhost), rawurlencode($name));
        
        $data = [
            'pattern' => $pattern,
            'definition' => $definition,
            'priority' => $priority,
            'apply-to' => 'queues',
        ];
        
        $this->apiClient->put($url, $data);
    }
    
    public function setupDefaultPolicies(): void
    {
        $this->setHAPolicy('ha-all', '.*', [
            'ha-mode' => 'all',
            'ha-sync-mode' => 'automatic',
        ]);
        
        $this->setHAPolicy('ha-critical', '^critical\.', [
            'ha-mode' => 'exactly',
            'ha-params' => 3,
            'ha-sync-mode' => 'automatic',
        ], 10);
        
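        // Queue type cannot be set by policy; this only tunes queues that
        // were already declared with x-queue-type=quorum
        $this->setHAPolicy('quorum-default', '^quorum\.', [
            'delivery-limit' => 10,
        ], 5);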
    }
}
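
The three default policies overlap: `.*` matches every queue name, including those that also match `^critical\.` or `^quorum\.`. RabbitMQ applies only one policy to a queue, the matching one with the highest priority, so definitions are not merged. The sketch below models that selection in plain PHP. `effectivePolicy` is a hypothetical helper, PHP's `preg_match` only approximates the broker's regular-expression engine, and the tie-break for equal priorities (first match wins here) is an assumption.

```php
<?php

/**
 * Hypothetical helper: returns the name of the policy that would take
 * effect for $queueName, or null when no pattern matches.
 *
 * @param array<string, array{pattern: string, priority: int}> $policies
 */
function effectivePolicy(array $policies, string $queueName): ?string
{
    $winner = null;
    $winnerPriority = PHP_INT_MIN;

    foreach ($policies as $name => $policy) {
        // '~' is used as the regex delimiter, so patterns must not contain it.
        if (preg_match('~' . $policy['pattern'] . '~', $queueName) !== 1) {
            continue;
        }
        if ($policy['priority'] > $winnerPriority) {
            $winner = $name;
            $winnerPriority = $policy['priority'];
        }
    }

    return $winner;
}

$policies = [
    'ha-all'         => ['pattern' => '.*',          'priority' => 0],
    'ha-critical'    => ['pattern' => '^critical\.', 'priority' => 10],
    'quorum-default' => ['pattern' => '^quorum\.',   'priority' => 5],
];

$forCritical = effectivePolicy($policies, 'critical.payments'); // 'ha-critical'
$forQuorum   = effectivePolicy($policies, 'quorum.orders');     // 'quorum-default'
$forOther    = effectivePolicy($policies, 'audit.log');         // 'ha-all'
```

Because the catch-all policy loses to any higher-priority match, every key a queue needs has to appear in the single policy that wins for it.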

Failover Configuration

Automatic failover

php
<?php

namespace App\Messaging\HA;

class FailoverManager
{
    private array $nodes;
    private $logger;
    private int $healthCheckInterval = 30;
    
    public function __construct(array $nodes, $logger)
    {
        $this->nodes = $nodes;
        $this->logger = $logger;
    }
    
    public function monitorAndFailover(): void
    {
        while (true) {
            foreach ($this->nodes as $node) {
                $health = $this->checkNodeHealth($node);
                
                if (!$health['healthy']) {
                    $this->handleNodeFailure($node, $health);
                }
            }
            
            sleep($this->healthCheckInterval);
        }
    }
    
    private function checkNodeHealth(array $node): array
    {
        $url = sprintf('http://%s:15672/api/health/checks/alarms', $node['host']);
        
        try {
            $response = $this->httpGet($url, $node['user'], $node['password']);
            
            return [
                'healthy' => ($response['status'] ?? null) === 'ok',
                'alarms' => $response['alarms'] ?? [],
            ];
        } catch (\Exception $e) {
            return [
                'healthy' => false,
                'error' => $e->getMessage(),
            ];
        }
    }
    
    private function handleNodeFailure(array $node, array $health): void
    {
        $this->logger->error('Node health check failed', [
            'node' => $node['host'],
            'health' => $health,
        ]);
        
        $this->notifyAdmins($node, $health);
        
        $this->rebalanceQueues($node);
    }
    
    private function rebalanceQueues(array $failedNode): void
    {
        foreach ($this->nodes as $node) {
            if ($node['host'] !== $failedNode['host']) {
                $this->triggerQueueRebalance($node);
                break;
            }
        }
    }
    
    private function triggerQueueRebalance(array $node): void
    {
        $url = sprintf('http://%s:15672/api/rebalance/queues', $node['host']);
        
        try {
            $this->httpPost($url, [], $node['user'], $node['password']);
        } catch (\Exception $e) {
            $this->logger->error('Failed to trigger queue rebalance', [
                'error' => $e->getMessage(),
            ]);
        }
    }
    
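    private function notifyAdmins(array $node, array $health): void
    {
        // Send an alert notification (e-mail, chat webhook, pager, ...)
    }
    
    private function httpGet(string $url, string $user, string $password): array
    {
        return $this->httpRequest('GET', $url, null, $user, $password);
    }
    
    private function httpPost(string $url, array $data, string $user, string $password): array
    {
        return $this->httpRequest('POST', $url, $data, $user, $password);
    }
    
    private function httpRequest(
        string $method,
        string $url,
        ?array $data,
        string $user,
        string $password
    ): array {
        $ch = curl_init($url);
        curl_setopt_array($ch, [
            CURLOPT_CUSTOMREQUEST => $method,
            CURLOPT_USERPWD => $user . ':' . $password,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_CONNECTTIMEOUT => 3,
            CURLOPT_TIMEOUT => 10,
            CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
        ]);
        if ($data !== null) {
            curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($data));
        }
        $body = curl_exec($ch);
        $error = curl_error($ch);
        curl_close($ch);
        
        if ($body === false) {
            throw new \RuntimeException('HTTP request failed: ' . $error);
        }
        
        // The health endpoint answers with a JSON body even when it reports
        // a failure, so the body is decoded regardless of the status code
        $decoded = json_decode($body, true);
        
        return is_array($decoded) ? $decoded : [];
    }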
}
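
`monitorAndFailover` reacts to every single failed health check, so one slow HTTP response is enough to page an administrator and trigger a rebalance. A common refinement is to act only after several consecutive failures. The sketch below shows that idea in isolation; `NodeFailureTracker` and the threshold of 3 are hypothetical and not part of the class above.

```php
<?php

/**
 * Hypothetical helper: reports a node as failed only after $threshold
 * consecutive unhealthy checks, and resets the count on the first
 * healthy one.
 */
final class NodeFailureTracker
{
    private int $threshold;

    /** @var array<string, int> consecutive failures per host */
    private array $failures = [];

    public function __construct(int $threshold = 3)
    {
        $this->threshold = max(1, $threshold);
    }

    /**
     * Records one health-check result and returns true exactly once,
     * at the moment the host reaches the failure threshold.
     */
    public function record(string $host, bool $healthy): bool
    {
        if ($healthy) {
            $this->failures[$host] = 0;
            return false;
        }

        $this->failures[$host] = ($this->failures[$host] ?? 0) + 1;

        return $this->failures[$host] === $this->threshold;
    }
}

$tracker = new NodeFailureTracker(3);
$results = [];
foreach ([false, false, true, false, false, false, false] as $healthy) {
    $results[] = $tracker->record('rabbit-1', $healthy);
}
// Only the sixth check (the third failure in a row) reports a failure.
```

In the monitoring loop, `handleNodeFailure` would then be called only when `record()` returns true, which also avoids repeating the alert on every later check while the node stays down.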

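Best-Practice Checklist

Quorum queue configuration

  • [ ] Use quorum queues instead of mirrored queues
  • [ ] Choose a sensible number of quorum members
  • [ ] Set a delivery limit
  • [ ] Configure a dead-letter queue

Connection high availability

  • [ ] Configure connections to multiple nodes
  • [ ] Implement automatic reconnection
  • [ ] Set connection timeouts and heartbeats
  • [ ] Handle connection-blocked events

Consumer high availability

  • [ ] Implement consumer reconnection
  • [ ] Choose a sensible prefetch count
  • [ ] Handle acknowledgement failures
  • [ ] Implement graceful shutdown

Cluster high availability

  • [ ] Configure an HA policy
  • [ ] Enable automatic synchronisation
  • [ ] Define a failover strategy
  • [ ] Monitor cluster state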

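Production Notes

  1. Quorum queues vs. mirrored queues

    • Quorum queues are available from RabbitMQ 3.8 and are the recommended replicated queue type
    • Quorum queues are built on Raft and give stronger data-consistency guarantees
    • Mirrored queues can lose messages in edge cases such as network partitions; they are deprecated and were removed in RabbitMQ 4.0
  2. Node count

    • Use an odd number of nodes (3, 5, 7): a quorum queue stays available only while a majority of its members is online, and an even count tolerates no more failures than the odd count below it
    • The minimum production setup is 3 nodes
    • Deploy across availability zones to improve disaster tolerance
  3. Failure recovery

    • Configure automatic failover
    • Prepare scripts for manual intervention
    • Rehearse failure recovery regularly
  4. Monitoring and alerting

    • Monitor node status
    • Monitor queue synchronisation status
    • Configure alert thresholds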
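
The node-count guidance follows from majority arithmetic: a quorum queue with n members needs floor(n/2) + 1 of them online, so it can lose the remainder. The short sketch below works this out for a few cluster sizes; the function names are hypothetical.

```php
<?php

/** Smallest number of members that forms a majority of $members. */
function quorumMajority(int $members): int
{
    return intdiv($members, 2) + 1;
}

/** Number of members that can be lost while a majority stays online. */
function tolerableFailures(int $members): int
{
    return $members - quorumMajority($members);
}

$table = [];
foreach ([3, 4, 5, 7] as $members) {
    $table[$members] = tolerableFailures($members);
}
// $table is [3 => 1, 4 => 1, 5 => 2, 7 => 3]
```

Four members tolerate one failure, the same as three, which is why the extra node in an even-sized group adds replication cost without adding fault tolerance.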
