Appearance
高可用配置
概述
高可用性(High Availability)是生产环境 RabbitMQ 集群的核心要求。本文档介绍 RabbitMQ 高可用配置的最佳实践,确保消息系统在故障情况下仍能持续提供服务。
高可用架构层次
┌─────────────────────────────────────────────────────────────────────────┐
│ 高可用架构层次 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ 应用层高可用 │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Client 1 │ │ Client 2 │ │ Client 3 │ │ │
│ │ │ (Producer) │ │ (Consumer) │ │ (Consumer) │ │ │
│ │ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │ │
│ │ │ │ │ │ │
│ │ └────────────────┴────────────────┘ │ │
│ │ │ │ │
│ │ 连接重试、故障转移 │ │
│ └──────────────────────────┼──────────────────────────────────────┘ │
│ │ │
│ ┌──────────────────────────┼──────────────────────────────────────┐ │
│ │ 接入层高可用 │ │
│ │ │ │ │
│ │ ┌───────────────────────┴───────────────────────┐ │ │
│ │ │ Load Balancer (HAProxy) │ │ │
│ │ │ ┌──────────┬──────────┬──────────┐ │ │ │
│ │ │ │ Active │ Backup │ Backup │ │ │ │
│ │ │ └──────────┴──────────┴──────────┘ │ │ │
│ │ └───────────────────────────────────────────────┘ │ │
│ │ 健康检查、负载均衡 │ │
│ └──────────────────────────┬──────────────────────────────────────┘ │
│ │ │
│ ┌──────────────────────────┼──────────────────────────────────────┐ │
│ │ 服务层高可用 │ │
│ │ │ │ │
│ │ ┌───────────────────────┴───────────────────────┐ │ │
│ │ │ RabbitMQ Cluster │ │ │
│ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │
│ │ │ │ Node 1 │ │ Node 2 │ │ Node 3 │ │ │ │
│ │ │ │ (Leader) │ │(Follower)│ │(Follower)│ │ │ │
│ │ │ └──────────┘ └──────────┘ └──────────┘ │ │ │
│ │ └───────────────────────────────────────────────┘ │ │
│ │ 集群、仲裁队列、镜像队列 │ │
│ └──────────────────────────┬──────────────────────────────────────┘ │
│ │ │
│ ┌──────────────────────────┼──────────────────────────────────────┐ │
│ │ 存储层高可用 │ │
│ │ │ │ │
│ │ ┌───────────────────────┴───────────────────────┐ │ │
│ │ │ Persistent Storage │ │ │
│ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │
│ │ │ │ Disk 1 │ │ Disk 2 │ │ Disk 3 │ │ │ │
│ │ │ │ (RAID) │ │ (RAID) │ │ (RAID) │ │ │ │
│ │ │ └──────────┘ └──────────┘ └──────────┘ │ │ │
│ │ └───────────────────────────────────────────────┘ │ │
│ │ 持久化存储、备份 │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
仲裁队列高可用配置
PHP 代码示例:仲裁队列声明
php
<?php
namespace App\Messaging\HA;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
/**
 * Declares replicated queues and publishes messages using publisher confirms.
 *
 * Every broker call goes through the channel handed to the constructor.
 */
class QuorumQueueManager
{
    /** @var \PhpAmqpLib\Channel\AMQPChannel|null Channel used for all broker calls. */
    private $channel;

    /** @var bool True once confirm mode has been enabled on the channel. */
    private $confirmsEnabled = false;

    /**
     * @param \PhpAmqpLib\Channel\AMQPChannel|null $channel Open channel to operate on.
     *        Optional only so that existing `new QuorumQueueManager()` calls keep working.
     */
    public function __construct($channel = null)
    {
        $this->channel = $channel;
    }

    /**
     * Declares a durable quorum queue.
     *
     * Optional limits (dead-lettering, max length, TTL) are sent only when the
     * caller supplies them: a default of 0 for x-max-length or x-message-ttl
     * would make the queue hold nothing / expire messages immediately.
     *
     * @param string $queueName  Queue to declare.
     * @param int    $quorumSize Initial number of replicas.
     * @param array  $options    Optional keys: delivery_limit, dead_letter_exchange,
     *                           dead_letter_routing_key, max_length, message_ttl,
     *                           min_group_size.
     */
    public function declareQuorumQueue(
        string $queueName,
        int $quorumSize = 3,
        array $options = []
    ): void {
        $arguments = [
            'x-queue-type' => 'quorum',
            'x-quorum-initial-group-size' => $quorumSize,
            'x-delivery-limit' => $options['delivery_limit'] ?? 10,
        ];
        $arguments += $this->optionalArguments($options, [
            'dead_letter_exchange' => 'x-dead-letter-exchange',
            'dead_letter_routing_key' => 'x-dead-letter-routing-key',
            'max_length' => 'x-max-length',
            'message_ttl' => 'x-message-ttl',
            // NOTE(review): x-quorum-min-group-size does not look like a documented
            // queue argument; it is forwarded only on request - confirm it is needed.
            'min_group_size' => 'x-quorum-min-group-size',
        ]);

        $this->declareDurableQueue($queueName, $arguments);
    }

    /**
     * Declares a durable classic queue carrying the legacy x-ha-policy arguments.
     *
     * NOTE(review): mirroring on current RabbitMQ releases is configured through
     * policies rather than queue arguments, so x-ha-policy is presumably ignored
     * by the broker - confirm against the targeted version.
     *
     * @param string $queueName Queue to declare.
     * @param string $haPolicy  Value sent as x-ha-policy.
     * @param array  $options   Optional keys: ha_nodes, message_ttl, max_length,
     *                          dead_letter_exchange.
     */
    public function declareClassicQueueWithHA(
        string $queueName,
        string $haPolicy = 'all',
        array $options = []
    ): void {
        $arguments = ['x-ha-policy' => $haPolicy];
        $arguments += $this->optionalArguments($options, [
            'ha_nodes' => 'x-ha-policy-params',
            'message_ttl' => 'x-message-ttl',
            'max_length' => 'x-max-length',
            'dead_letter_exchange' => 'x-dead-letter-exchange',
        ]);

        $this->declareDurableQueue($queueName, $arguments);
    }

    /**
     * Publishes a mandatory message and waits for the broker's confirmation.
     *
     * Installs its own ack/nack/return handlers on the channel, replacing any
     * handlers registered earlier.
     *
     * @param int $timeout Seconds to wait for the confirmation.
     *
     * @return bool True when the broker acked the message and did not return it
     *              as unroutable; false on nack, return or timeout.
     */
    public function publishWithConfirmation(
        string $exchange,
        string $routingKey,
        AMQPMessage $message,
        int $timeout = 5
    ): bool {
        $channel = $this->requireChannel();

        // Confirm mode is a per-channel switch; enabling it once is enough.
        if (!$this->confirmsEnabled) {
            $channel->confirm_select();
            $this->confirmsEnabled = true;
        }

        $acked = false;
        $refused = false;
        $channel->set_ack_handler(static function () use (&$acked): void {
            $acked = true;
        });
        $channel->set_nack_handler(static function () use (&$refused): void {
            $refused = true;
        });
        // Mandatory publishes that cannot be routed come back as basic.return.
        $channel->set_return_listener(static function () use (&$refused): void {
            $refused = true;
        });

        $channel->basic_publish($message, $exchange, $routingKey, true, false);

        try {
            $channel->wait_for_pending_acks_returns($timeout);
        } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) {
            return false;
        }

        return $acked && !$refused;
    }

    /**
     * Declares a durable, non-exclusive, non-auto-delete queue with the given arguments.
     */
    private function declareDurableQueue(string $queueName, array $arguments): void
    {
        $this->requireChannel()->queue_declare(
            $queueName,
            false,
            true,
            false,
            false,
            false,
            new AMQPTable($arguments)
        );
    }

    /**
     * Maps the options that were actually provided (non-null) to queue arguments.
     *
     * @param array $options Caller-supplied options.
     * @param array $map     Option key => queue argument name.
     */
    private function optionalArguments(array $options, array $map): array
    {
        $arguments = [];
        foreach ($map as $option => $argument) {
            if (isset($options[$option])) {
                $arguments[$argument] = $options[$option];
            }
        }

        return $arguments;
    }

    /**
     * Returns the channel or fails with a clear error when none was injected.
     *
     * @throws \LogicException When the manager was built without a channel.
     */
    private function requireChannel()
    {
        if ($this->channel === null) {
            throw new \LogicException('QuorumQueueManager requires an AMQP channel');
        }

        return $this->channel;
    }
}
高可用策略配置
php
<?php
namespace App\Messaging\HA;
/**
 * Applies and removes queue policies through the injected channel object.
 *
 * NOTE(review): php-amqplib's AMQPChannel does not appear to offer
 * set_policy()/clear_policy(); presumably $channel is a project wrapper
 * (e.g. around the management API) - confirm.
 */
class HAPolicyManager
{
    private $channel;

    /**
     * Applies a classic mirrored-queue policy to queues matching $pattern.
     *
     * @param string $pattern Queue name pattern; also used to derive the policy name.
     * @param array  $policy  Optional keys: ha_mode, ha_params, ha_sync_mode,
     *                        ha_sync_batch_size, ha_promote_on_shutdown,
     *                        ha_promote_on_failure.
     */
    public function applyHAPolicy(
        string $pattern,
        array $policy
    ): void {
        $mode = $policy['ha_mode'] ?? 'exactly';

        $definition = ['ha-mode' => $mode];
        // ha-mode "all" takes no parameters; sending the default count with it
        // makes the broker reject the policy.
        if ($mode !== 'all') {
            $definition['ha-params'] = $policy['ha_params'] ?? 2;
        }
        $definition += [
            'ha-sync-mode' => $policy['ha_sync_mode'] ?? 'automatic',
            'ha-sync-batch-size' => $policy['ha_sync_batch_size'] ?? 100,
            'ha-promote-on-shutdown' => $policy['ha_promote_on_shutdown'] ?? 'when-synced',
            'ha-promote-on-failure' => $policy['ha_promote_on_failure'] ?? 'when-synced',
        ];

        $this->putQueuePolicy($pattern . '_ha_policy', $pattern, $definition);
    }

    /**
     * Applies a quorum-queue policy (target group size of 3) to queues matching $pattern.
     */
    public function applyQuorumPolicy(string $pattern): void
    {
        $this->putQueuePolicy($pattern . '_quorum_policy', $pattern, [
            'target-group-size' => 3,
        ]);
    }

    /**
     * Removes the policy with the given name.
     */
    public function removePolicy(string $policyName): void
    {
        $this->channel->clear_policy($policyName);
    }

    /**
     * Sends one queue-scoped policy with priority 0, JSON-encoding its definition.
     */
    private function putQueuePolicy(string $name, string $pattern, array $definition): void
    {
        $this->channel->set_policy(
            $name,
            $pattern,
            json_encode($definition),
            0,
            'queues',
            null
        );
    }
}
连接高可用配置
php
<?php
namespace App\Messaging\HA;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use Psr\Log\LoggerInterface;
/**
 * Maintains one AMQP connection to any node of a RabbitMQ cluster, failing
 * over to the remaining nodes when a connection attempt fails.
 */
class HAConnectionManager
{
    private array $nodes;
    private LoggerInterface $logger;
    private ?AMQPStreamConnection $connection = null;
    private array $config;

    /**
     * @param array           $nodes  List of node definitions; each needs 'host' and may
     *                                carry 'port', 'user', 'password' and 'vhost'.
     * @param LoggerInterface $logger Receives connection lifecycle events.
     * @param array           $config Overrides for the defaults below; may also hold
     *                                fallback 'user', 'password' and 'vhost'.
     */
    public function __construct(array $nodes, LoggerInterface $logger, array $config = [])
    {
        $this->nodes = $nodes;
        $this->logger = $logger;
        $this->config = array_merge([
            'connection_timeout' => 3.0,
            // php-amqplib expects the read/write timeout to be at least twice
            // the heartbeat; some releases reject smaller values outright.
            'read_write_timeout' => 130.0,
            'heartbeat' => 60,
            // Additional passes over the node list after the first one fails.
            'max_retries' => 3,
            // Pause between passes, interpreted as milliseconds.
            'retry_delay' => 1000,
            'keepalive' => true,
        ], $config);
    }

    /**
     * Returns the live connection, establishing a new one when none is open.
     *
     * @throws ConnectionException When no node accepts a connection.
     */
    public function getConnection(): AMQPStreamConnection
    {
        if ($this->connection !== null) {
            if ($this->connection->isConnected()) {
                return $this->connection;
            }
            // The library exposes no close callback, so a dropped connection
            // is detected here, on the next use.
            $this->handleConnectionClose('Connection is no longer open', 0);
        }

        return $this->establishConnection();
    }

    /**
     * Tries every node in random order, repeating up to 'max_retries' more
     * times with 'retry_delay' between passes.
     *
     * @throws ConnectionException When every attempt on every node fails.
     */
    private function establishConnection(): AMQPStreamConnection
    {
        $lastException = null;
        $passes = $this->nodes === [] ? 0 : 1 + max(0, (int) $this->config['max_retries']);

        for ($pass = 1; $pass <= $passes; $pass++) {
            foreach ($this->getAvailableNodes() as $node) {
                try {
                    $connection = $this->createConnection($node);
                    $this->setupConnectionHandlers($connection);
                    $this->connection = $connection;
                    $this->logger->info('Connected to RabbitMQ node', [
                        'host' => $node['host'],
                        'port' => $node['port'] ?? 5672,
                    ]);

                    return $connection;
                } catch (\Exception $e) {
                    $lastException = $e;
                    $this->logger->warning('Failed to connect to node', [
                        'host' => $node['host'],
                        'error' => $e->getMessage(),
                    ]);
                }
            }

            if ($pass < $passes) {
                usleep(max(0, (int) $this->config['retry_delay']) * 1000);
            }
        }

        throw new ConnectionException(
            'Failed to connect to any RabbitMQ node',
            0,
            $lastException
        );
    }

    /**
     * Returns the configured nodes in random order so clients spread across the cluster.
     */
    private function getAvailableNodes(): array
    {
        $nodes = $this->nodes;
        shuffle($nodes);

        return $nodes;
    }

    /**
     * Opens a connection to one node; credentials fall back from the node
     * entry to the manager config and finally to guest / "/".
     */
    private function createConnection(array $node): AMQPStreamConnection
    {
        return new AMQPStreamConnection(
            $node['host'],
            $node['port'] ?? 5672,
            $node['user'] ?? $this->config['user'] ?? 'guest',
            $node['password'] ?? $this->config['password'] ?? 'guest',
            $node['vhost'] ?? $this->config['vhost'] ?? '/',
            false,
            'AMQPLAIN',
            null,
            'en_US',
            $this->config['connection_timeout'],
            $this->config['read_write_timeout'],
            null,
            $this->config['keepalive'],
            $this->config['heartbeat']
        );
    }

    /**
     * Registers the connection.blocked / connection.unblocked callbacks.
     */
    private function setupConnectionHandlers(AMQPStreamConnection $connection): void
    {
        $connection->set_connection_block_handler(function ($reason) {
            $this->handleBlocked((string) $reason);
        });
        $connection->set_connection_unblock_handler(function () {
            $this->handleUnblocked();
        });
    }

    /**
     * Logs a lost connection and forgets it so the next call reconnects.
     */
    private function handleConnectionClose(string $reason, int $replyCode): void
    {
        $this->logger->error('Connection closed', [
            'reason' => $reason,
            'reply_code' => $replyCode,
        ]);
        $this->connection = null;
    }

    /**
     * Logs that the broker blocked the connection.
     */
    private function handleBlocked(string $reason): void
    {
        $this->logger->warning('Connection blocked', ['reason' => $reason]);
    }

    /**
     * Logs that the broker unblocked the connection.
     */
    private function handleUnblocked(): void
    {
        $this->logger->info('Connection unblocked');
    }

    /**
     * Drops the current connection and establishes a fresh one.
     *
     * @throws ConnectionException When no node accepts a connection.
     */
    public function reconnect(): AMQPStreamConnection
    {
        $this->close();

        return $this->establishConnection();
    }

    /**
     * Closes the current connection, if any; close errors are logged, not thrown.
     */
    public function close(): void
    {
        if ($this->connection === null) {
            return;
        }

        try {
            $this->connection->close();
        } catch (\Exception $e) {
            $this->logger->warning('Error closing connection', [
                'error' => $e->getMessage(),
            ]);
        }
        $this->connection = null;
    }
}
消费者高可用配置
php
<?php
namespace App\Messaging\HA;
use PhpAmqpLib\Message\AMQPMessage;
use Psr\Log\LoggerInterface;
/**
 * Queue consumer that re-subscribes on a fresh channel/connection after
 * failures and can be stopped cooperatively through stop().
 */
class HAConsumer
{
    private HAConnectionManager $connectionManager;
    private LoggerInterface $logger;
    private array $config;
    private $channel = null;
    /** True while consume() is running. */
    private bool $consuming = false;
    /** Set by stop(); checked by the consume loop between deliveries. */
    private bool $stopRequested = false;

    /**
     * @param array $config Overrides for prefetch_count, consumer_timeout (seconds),
     *                      reconnect_delay (seconds) and max_reconnect_attempts.
     */
    public function __construct(
        HAConnectionManager $connectionManager,
        LoggerInterface $logger,
        array $config = []
    ) {
        $this->connectionManager = $connectionManager;
        $this->logger = $logger;
        $this->config = array_merge([
            'prefetch_count' => 10,
            'consumer_timeout' => 30,
            'reconnect_delay' => 5,
            'max_reconnect_attempts' => 10,
        ], $config);
    }

    /**
     * Consumes $queue until stop() is called, reconnecting after failures.
     *
     * @param callable $handler Receives each AMQPMessage; returning false or
     *                          throwing causes the message to be requeued.
     *
     * @throws ConsumerException After max_reconnect_attempts consecutive failures.
     */
    public function consume(string $queue, callable $handler): void
    {
        $this->stopRequested = false;
        $this->consuming = true;
        $reconnectAttempts = 0;

        try {
            while (!$this->stopRequested) {
                try {
                    $this->ensureChannel();
                    $this->setupConsumer($queue, $handler);
                    $reconnectAttempts = 0;
                    $this->waitForDeliveries();
                    // Reached on stop() or when the broker cancelled the consumer;
                    // start over on a clean channel in the latter case.
                    $this->closeChannel();
                } catch (\Exception $e) {
                    $reconnectAttempts++;
                    $this->logger->error('Consumer error, attempting reconnect', [
                        'attempt' => $reconnectAttempts,
                        'error' => $e->getMessage(),
                    ]);
                    $this->closeChannel();
                    // Drop the connection too so the next attempt can fail over
                    // to another node instead of reusing a broken socket.
                    $this->connectionManager->close();

                    if ($reconnectAttempts >= $this->config['max_reconnect_attempts']) {
                        throw new ConsumerException('Max reconnect attempts reached');
                    }
                    sleep($this->config['reconnect_delay']);
                }
            }
        } finally {
            $this->consuming = false;
            if ($this->stopRequested) {
                $this->closeChannel();
                $this->connectionManager->close();
            }
        }
    }

    /**
     * Blocks on the channel until stop() is requested or the consumer ends;
     * wait timeouts only serve to re-check the stop flag.
     */
    private function waitForDeliveries(): void
    {
        while (!$this->stopRequested && $this->channel && $this->channel->is_consuming()) {
            try {
                $this->channel->wait(
                    null,
                    false,
                    $this->config['consumer_timeout']
                );
            } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) {
                continue;
            }
        }
    }

    /**
     * Opens a channel with the configured prefetch limit unless one is already open.
     */
    private function ensureChannel(): void
    {
        if ($this->channel && $this->channel->is_open()) {
            return;
        }

        $connection = $this->connectionManager->getConnection();
        $this->channel = $connection->channel();
        $this->channel->basic_qos(0, $this->config['prefetch_count'], false);
    }

    /**
     * Subscribes to $queue with manual acknowledgements.
     */
    private function setupConsumer(string $queue, callable $handler): void
    {
        $consumerTag = $this->getConsumerTag();

        $this->channel->basic_consume(
            $queue,
            $consumerTag,
            false,
            false,
            false,
            false,
            function (AMQPMessage $message) use ($handler) {
                $this->processMessage($message, $handler);
            }
        );

        $this->logger->info('Consumer started', [
            'queue' => $queue,
            'consumer_tag' => $consumerTag,
        ]);
    }

    /**
     * Runs the handler and acknowledges the message; a false result or any
     * thrown error requeues this single message.
     */
    private function processMessage(AMQPMessage $message, callable $handler): void
    {
        try {
            $succeeded = $handler($message) !== false;
        } catch (\Throwable $e) {
            $this->logger->error('Message processing failed', [
                'error' => $e->getMessage(),
                // get() throws for properties the publisher did not set.
                'message_id' => $message->has('message_id') ? $message->get('message_id') : null,
            ]);
            $succeeded = false;
        }

        // Acknowledge outside the try so broker/connection errors reach the
        // reconnect logic instead of being logged as handler failures.
        if ($succeeded) {
            $message->ack();
        } else {
            // AMQPMessage::nack($requeue, $multiple): requeue only this delivery.
            $message->nack(true);
        }
    }

    /**
     * Builds the consumer tag from host name and process id.
     */
    private function getConsumerTag(): string
    {
        return gethostname() . '-' . getmypid();
    }

    /**
     * Closes the channel, ignoring errors from an already broken channel.
     */
    private function closeChannel(): void
    {
        if ($this->channel === null) {
            return;
        }

        try {
            $this->channel->close();
        } catch (\Exception $e) {
            // Best effort: the channel is being discarded anyway.
        }
        $this->channel = null;
    }

    /**
     * Stops consuming and releases channel and connection.
     *
     * While consume() is running this only raises a flag; consume() performs
     * the shutdown once the in-flight message has been acknowledged.
     */
    public function stop(): void
    {
        $this->stopRequested = true;

        if (!$this->consuming) {
            $this->closeChannel();
            $this->connectionManager->close();
        }
    }
}
错误做法:无高可用保护
php
<?php
class NonHAConsumer
{
/**
 * Intentional counter-example: consumes a queue with no high-availability
 * safeguards. The numbered comments mark each mistake; do not copy this.
 */
public function consume(string $queue, callable $handler): void
{
// Mistake 1: single-node connection, no failover
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// Mistake 2: no reconnect mechanism
$callback = function ($message) use ($handler) {
$handler($message);
// Mistake 3: no exception handling
$message->ack();
};
$channel->basic_consume($queue, '', false, false, false, false, $callback);
// Mistake 4: no timeout handling
while ($channel->is_consuming()) {
$channel->wait();
}
// Mistake 5: cannot recover once the connection drops
}
}
镜像队列配置
HA Policy 配置示例
bash
# Mirror every queue to all nodes
rabbitmqctl set_policy ha-all ".*" '{"ha-mode":"all"}'
# Keep exactly 2 replicas of queues whose names start with "ha."
rabbitmqctl set_policy ha-exactly "^ha\." '{"ha-mode":"exactly","ha-params":2}'
# Keep 2 automatically synchronised replicas of queues whose names start with "order."
rabbitmqctl set_policy ha-order "^order\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
# A policy cannot change a queue's type: declare the queue with the
# x-queue-type=quorum argument. Policies can still tune quorum queues,
# for example by capping redeliveries:
rabbitmqctl set_policy quorum "^quorum\." '{"delivery-limit":10}' --apply-to queues
PHP 设置 HA Policy
php
<?php
namespace App\Messaging\HA;
/**
 * Creates queue policies through the RabbitMQ management HTTP API.
 */
class PolicySetup
{
    private $apiClient;
    private string $vhost;

    /**
     * @param object $apiClient Client exposing put($url, $data) against the management API.
     * @param string $vhost     Virtual host the policies belong to.
     */
    public function __construct($apiClient, string $vhost = '/')
    {
        $this->apiClient = $apiClient;
        $this->vhost = $vhost;
    }

    /**
     * Creates or replaces one queue-scoped policy.
     *
     * @param string $name       Policy name.
     * @param string $pattern    Regular expression matched against queue names.
     * @param array  $definition Policy keys and values.
     * @param int    $priority   Higher priority wins when several policies match.
     */
    public function setHAPolicy(
        string $name,
        string $pattern,
        array $definition,
        int $priority = 0
    ): void {
        // Path segments need RFC 3986 encoding: urlencode() would turn a space
        // into "+", which is not decoded as a space inside a URL path.
        $url = sprintf(
            '/api/policies/%s/%s',
            rawurlencode($this->vhost),
            rawurlencode($name)
        );

        $this->apiClient->put($url, [
            'pattern' => $pattern,
            'definition' => $definition,
            'priority' => $priority,
            'apply-to' => 'queues',
        ]);
    }

    /**
     * Installs the default policy set: mirror everything, three replicas for
     * "critical." queues, and a redelivery cap for "quorum." queues.
     */
    public function setupDefaultPolicies(): void
    {
        $this->setHAPolicy('ha-all', '.*', [
            'ha-mode' => 'all',
            'ha-sync-mode' => 'automatic',
        ]);

        $this->setHAPolicy('ha-critical', '^critical\.', [
            'ha-mode' => 'exactly',
            'ha-params' => 3,
            'ha-sync-mode' => 'automatic',
        ], 10);

        // The queue type cannot be set by a policy (it is fixed at declaration
        // via x-queue-type), so this policy only tunes matching quorum queues.
        $this->setHAPolicy('quorum-default', '^quorum\.', [
            'delivery-limit' => 10,
        ], 5);
    }
}
故障转移配置
自动故障转移
php
<?php
namespace App\Messaging\HA;
/**
 * Polls each node's management API health check and reacts when a node
 * turns unhealthy: logs, notifies and asks a healthy node to rebalance queues.
 */
class FailoverManager
{
    private array $nodes;
    private $logger;
    private int $healthCheckInterval = 30;
    /** @var array<string, bool> Hosts currently considered unhealthy. */
    private array $failedHosts = [];

    /**
     * @param array  $nodes  List of node definitions; each needs 'host' and may carry
     *                       'user' and 'password' for the management API.
     * @param object $logger Logger exposing error() and info().
     */
    public function __construct(array $nodes, $logger)
    {
        $this->nodes = $nodes;
        $this->logger = $logger;
    }

    /**
     * Runs the health-check loop forever, sleeping between rounds.
     *
     * A failure is handled once, when a node turns unhealthy, instead of on
     * every round while it stays down.
     */
    public function monitorAndFailover(): void
    {
        while (true) {
            foreach ($this->nodes as $node) {
                $this->evaluateNode($node);
            }
            sleep($this->healthCheckInterval);
        }
    }

    /**
     * Checks one node and triggers failure handling on a healthy -> unhealthy transition.
     */
    private function evaluateNode(array $node): void
    {
        $host = $node['host'];
        $health = $this->checkNodeHealth($node);

        if ($health['healthy']) {
            if (isset($this->failedHosts[$host])) {
                unset($this->failedHosts[$host]);
                $this->logger->info('Node recovered', ['node' => $host]);
            }

            return;
        }

        if (!isset($this->failedHosts[$host])) {
            $this->failedHosts[$host] = true;
            $this->handleNodeFailure($node, $health);
        }
    }

    /**
     * Queries the node-local alarm health check.
     *
     * @return array 'healthy' (bool) plus 'alarms' on a response or 'error' on failure.
     */
    private function checkNodeHealth(array $node): array
    {
        // local-alarms reports alarms of the queried node only; the cluster-wide
        // check would flag every node as soon as a single one raises an alarm.
        $url = sprintf('http://%s:15672/api/health/checks/local-alarms', $node['host']);

        try {
            $response = $this->httpGet(
                $url,
                $node['user'] ?? 'guest',
                $node['password'] ?? 'guest'
            );

            return [
                'healthy' => ($response['status'] ?? null) === 'ok',
                'alarms' => $response['alarms'] ?? [],
            ];
        } catch (\Exception $e) {
            return [
                'healthy' => false,
                'error' => $e->getMessage(),
            ];
        }
    }

    /**
     * Logs the failure, notifies administrators and triggers a queue rebalance.
     */
    private function handleNodeFailure(array $node, array $health): void
    {
        $this->logger->error('Node health check failed', [
            'node' => $node['host'],
            'health' => $health,
        ]);
        $this->notifyAdmins($node, $health);
        $this->rebalanceQueues($node);
    }

    /**
     * Asks the first node that is neither the failed one nor known to be
     * unhealthy to rebalance queue leaders.
     */
    private function rebalanceQueues(array $failedNode): void
    {
        foreach ($this->nodes as $node) {
            $host = $node['host'];
            if ($host === $failedNode['host'] || isset($this->failedHosts[$host])) {
                continue;
            }

            $this->triggerQueueRebalance($node);
            break;
        }
    }

    /**
     * Calls the management API rebalance endpoint on $node; failures are logged only.
     */
    private function triggerQueueRebalance(array $node): void
    {
        $url = sprintf('http://%s:15672/api/rebalance/queues', $node['host']);

        try {
            $this->httpPost(
                $url,
                [],
                $node['user'] ?? 'guest',
                $node['password'] ?? 'guest'
            );
        } catch (\Exception $e) {
            $this->logger->error('Failed to trigger queue rebalance', [
                'error' => $e->getMessage(),
            ]);
        }
    }

    /**
     * Placeholder: send an alert notification.
     */
    private function notifyAdmins(array $node, array $health): void
    {
        // Send the alert notification here.
    }

    /**
     * Placeholder for an authenticated HTTP GET returning the decoded JSON body.
     * Until implemented it returns an empty array, which reads as "unhealthy".
     */
    private function httpGet(string $url, string $user, string $password): array
    {
        // HTTP GET implementation goes here.
        return [];
    }

    /**
     * Placeholder for an authenticated HTTP POST returning the decoded JSON body.
     */
    private function httpPost(string $url, array $data, string $user, string $password): array
    {
        // HTTP POST implementation goes here.
        return [];
    }
}
最佳实践建议清单
仲裁队列配置
- [ ] 使用仲裁队列替代镜像队列
- [ ] 配置合理的仲裁节点数
- [ ] 设置消息投递限制
- [ ] 配置死信队列
连接高可用
- [ ] 配置多节点连接
- [ ] 实现自动重连机制
- [ ] 设置连接超时和心跳
- [ ] 处理连接阻塞事件
消费者高可用
- [ ] 实现消费者重连
- [ ] 配置合理的预取数量
- [ ] 处理消息确认异常
- [ ] 实现优雅关闭
集群高可用
- [ ] 配置 HA Policy
- [ ] 设置自动同步
- [ ] 配置故障转移策略
- [ ] 监控集群状态
生产环境注意事项
仲裁队列 vs 镜像队列
- RabbitMQ 3.8+ 推荐使用仲裁队列;经典镜像队列已被官方弃用,并在 RabbitMQ 4.0 中移除
- 仲裁队列数据一致性更强
- 镜像队列在极端情况下可能丢消息
节点数量
- 仲裁队列建议使用奇数个节点(3、5、7);偶数个节点不会提高可容忍的故障节点数
- 最小生产配置为 3 节点
- 跨可用区部署提高容灾能力
故障恢复
- 配置自动故障转移
- 准备手动干预脚本
- 定期演练故障恢复
监控告警
- 监控节点状态
- 监控队列同步状态
- 配置告警阈值
