Skip to content

部署架构

概述

RabbitMQ 的部署架构直接影响系统的可用性、性能和可维护性。本文档介绍 RabbitMQ 在生产环境中的部署架构最佳实践,帮助您构建稳定可靠的消息系统。

部署架构类型

1. 单节点部署

适用于开发、测试环境,不建议用于生产环境。

┌─────────────────────────────────────┐
│            单节点架构                │
├─────────────────────────────────────┤
│                                     │
│    ┌─────────────────────────┐     │
│    │      RabbitMQ Server    │     │
│    │                         │     │
│    │  ┌─────┐ ┌─────┐ ┌────┐│     │
│    │  │Queue│ │Queue│ │ ...││     │
│    │  └─────┘ └─────┘ └────┘│     │
│    └─────────────────────────┘     │
│                                     │
│    优点:部署简单,资源消耗低         │
│    缺点:单点故障,无法扩展           │
│                                     │
└─────────────────────────────────────┘

2. 普通集群部署

多节点共享元数据,消息仍存储在单个节点。

┌─────────────────────────────────────────────────────────────┐
│                      普通集群架构                            │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐         │
│  │   Node 1    │  │   Node 2    │  │   Node 3    │         │
│  │  (Master)   │  │  (Slave)    │  │  (Slave)    │         │
│  │             │  │             │  │             │         │
│  │  ┌───────┐  │  │             │  │             │         │
│  │  │ Queue │  │  │             │  │             │         │
│  │  └───────┘  │  │             │  │             │         │
│  └──────┬──────┘  └──────┬──────┘  └──────┬──────┘         │
│         │                │                │                 │
│         └────────────────┴────────────────┘                 │
│                     元数据同步                               │
│                                                             │
│  优点:负载均衡,元数据共享                                   │
│  缺点:消息不复制,节点故障消息丢失                            │
│                                                             │
└─────────────────────────────────────────────────────────────┘

3. 镜像队列集群

消息在多个节点间复制,提供高可用性。

┌─────────────────────────────────────────────────────────────┐
│                     镜像队列集群架构                          │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐         │
│  │   Node 1    │  │   Node 2    │  │   Node 3    │         │
│  │  (Master)   │  │  (Mirror)   │  │  (Mirror)   │         │
│  │             │  │             │  │             │         │
│  │  ┌───────┐  │  │  ┌───────┐  │  │  ┌───────┐  │         │
│  │  │ Queue │◄─┼──┼─►│ Queue │◄─┼──┼─►│ Queue │  │         │
│  │  └───────┘  │  │  └───────┘  │  │  └───────┘  │         │
│  └─────────────┘  └─────────────┘  └─────────────┘         │
│                                                             │
│         └──────────消息同步复制──────────┘                   │
│                                                             │
│  优点:高可用,自动故障转移                                   │
│  缺点:同步复制影响性能,资源消耗大                            │
│                                                             │
└─────────────────────────────────────────────────────────────┘

4. 仲裁队列集群

RabbitMQ 3.8+ 推荐的高可用方案,使用 Raft 协议。

┌─────────────────────────────────────────────────────────────┐
│                     仲裁队列集群架构                          │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐         │
│  │   Node 1    │  │   Node 2    │  │   Node 3    │         │
│  │  (Leader)   │  │ (Follower)  │  │ (Follower)  │         │
│  │             │  │             │  │             │         │
│  │  ┌───────┐  │  │  ┌───────┐  │  │  ┌───────┐  │         │
│  │  │Quorum │  │  │  │Quorum │  │  │  │Quorum │  │         │
│  │  │ Queue │◄─┼──┼─►│ Queue │◄─┼──┼─►│ Queue │  │         │
│  │  └───────┘  │  │  └───────┘  │  │  └───────┘  │         │
│  └─────────────┘  └─────────────┘  └─────────────┘         │
│                                                             │
│         └──────────Raft 共识协议──────────┘                  │
│                                                             │
│  优点:数据一致性强,故障恢复快                               │
│  缺点:写入延迟略高,需要奇数节点                              │
│                                                             │
└─────────────────────────────────────────────────────────────┘

生产环境部署方案

推荐架构:仲裁队列 + 负载均衡

┌─────────────────────────────────────────────────────────────────────────┐
│                        生产环境推荐架构                                   │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│    ┌─────────────────────────────────────────────────────────────┐     │
│    │                      负载均衡层                              │     │
│    │   ┌───────────┐   ┌───────────┐   ┌───────────┐            │     │
│    │   │  HAProxy  │   │  HAProxy  │   │  HAProxy  │            │     │
│    │   │  (Active) │   │ (Backup)  │   │ (Backup)  │            │     │
│    │   └─────┬─────┘   └─────┬─────┘   └─────┬─────┘            │     │
│    └─────────┼───────────────┼───────────────┼──────────────────┘     │
│              │               │               │                         │
│              ▼               ▼               ▼                         │
│    ┌─────────────────────────────────────────────────────────────┐     │
│    │                    RabbitMQ 集群层                           │     │
│    │                                                              │     │
│    │  ┌───────────────┐  ┌───────────────┐  ┌───────────────┐   │     │
│    │  │    Node 1     │  │    Node 2     │  │    Node 3     │   │     │
│    │  │  rabbit@n1    │  │  rabbit@n2    │  │  rabbit@n3    │   │     │
│    │  │               │  │               │  │               │   │     │
│    │  │  ┌─────────┐  │  │  ┌─────────┐  │  │  ┌─────────┐  │   │     │
│    │  │  │ Quorum  │  │  │  │ Quorum  │  │  │  │ Quorum  │  │   │     │
│    │  │  │  Queue  │◄─┼──┼─►│  Queue  │◄─┼──┼─►│  Queue  │  │   │     │
│    │  │  └─────────┘  │  │  └─────────┘  │  │  └─────────┘  │   │     │
│    │  │               │  │               │  │               │   │     │
│    │  │  Port: 5672   │  │  Port: 5672   │  │  Port: 5672   │   │     │
│    │  │  Port: 15672  │  │  Port: 15672  │  │  Port: 15672  │   │     │
│    │  └───────────────┘  └───────────────┘  └───────────────┘   │     │
│    │                                                              │     │
│    └─────────────────────────────────────────────────────────────┘     │
│                                                                         │
│    ┌─────────────────────────────────────────────────────────────┐     │
│    │                      监控告警层                              │     │
│    │   ┌───────────┐   ┌───────────┐   ┌───────────┐            │     │
│    │   │ Prometheus│   │  Grafana  │   │ AlertManager│           │     │
│    │   └───────────┘   └───────────┘   └───────────┘            │     │
│    └─────────────────────────────────────────────────────────────┘     │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

PHP 代码示例

正确做法:生产环境连接配置

php
<?php

namespace App\Messaging\Config;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Connection\AMQPLazyConnection;

class ProductionConnectionFactory
{
    private array $nodes;
    private array $options;
    
    public function __construct(array $nodes, array $options = [])
    {
        $this->nodes = $nodes;
        $this->options = array_merge([
            'user' => 'guest',
            'password' => 'guest',
            'vhost' => '/',
            'connection_timeout' => 3.0,
            'read_write_timeout' => 10.0,
            'heartbeat' => 60,
            'keepalive' => true,
            'prefetch_count' => 10,
        ], $options);
    }
    
    public function createConnection(): AMQPStreamConnection
    {
        $lastException = null;
        
        foreach ($this->getShuffledNodes() as $node) {
            try {
                $connection = $this->connectToNode($node);
                
                if ($connection->isConnected()) {
                    return $connection;
                }
            } catch (\Exception $e) {
                $lastException = $e;
                $this->logConnectionFailure($node, $e);
                continue;
            }
        }
        
        throw new ConnectionException(
            'Failed to connect to any RabbitMQ node',
            0,
            $lastException
        );
    }
    
    private function getShuffledNodes(): array
    {
        $nodes = $this->nodes;
        shuffle($nodes);
        return $nodes;
    }
    
    private function connectToNode(array $node): AMQPStreamConnection
    {
        return new AMQPStreamConnection(
            $node['host'],
            $node['port'],
            $this->options['user'],
            $this->options['password'],
            $this->options['vhost'],
            false,
            'AMQPLAIN',
            null,
            'en_US',
            $this->options['connection_timeout'],
            $this->options['read_write_timeout'],
            null,
            $this->options['keepalive'],
            $this->options['heartbeat']
        );
    }
    
    private function logConnectionFailure(array $node, \Exception $e): void
    {
        error_log(sprintf(
            'Failed to connect to RabbitMQ node %s:%d - %s',
            $node['host'],
            $node['port'],
            $e->getMessage()
        ));
    }
}

class ConnectionManager
{
    private ProductionConnectionFactory $factory;
    private ?AMQPStreamConnection $connection = null;
    private int $maxRetries = 3;
    private int $retryDelay = 1000;
    
    public function __construct(ProductionConnectionFactory $factory)
    {
        $this->factory = $factory;
    }
    
    public function getConnection(): AMQPStreamConnection
    {
        if ($this->connection && $this->connection->isConnected()) {
            return $this->connection;
        }
        
        return $this->reconnect();
    }
    
    public function reconnect(): AMQPStreamConnection
    {
        $attempt = 0;
        $lastException = null;
        
        while ($attempt < $this->maxRetries) {
            try {
                $this->connection = $this->factory->createConnection();
                $this->setupConnectionCallbacks();
                return $this->connection;
            } catch (\Exception $e) {
                $lastException = $e;
                $attempt++;
                
                if ($attempt < $this->maxRetries) {
                    usleep($this->retryDelay * 1000 * $attempt);
                }
            }
        }
        
        throw $lastException;
    }
    
    private function setupConnectionCallbacks(): void
    {
        $this->connection->set_close_handler(function ($reason) {
            $this->handleConnectionClose($reason);
        });
        
        $this->connection->set_blocked_handler(function ($reason) {
            $this->handleConnectionBlocked($reason);
        });
        
        $this->connection->set_unblocked_handler(function () {
            $this->handleConnectionUnblocked();
        });
    }
    
    private function handleConnectionClose(string $reason): void
    {
        error_log("RabbitMQ connection closed: {$reason}");
        $this->connection = null;
    }
    
    private function handleConnectionBlocked(string $reason): void
    {
        error_log("RabbitMQ connection blocked: {$reason}");
    }
    
    private function handleConnectionUnblocked(): void
    {
        error_log("RabbitMQ connection unblocked");
    }
    
    public function close(): void
    {
        if ($this->connection) {
            $this->connection->close();
            $this->connection = null;
        }
    }
}

配置文件示例

php
<?php

return [
    'rabbitmq' => [
        'nodes' => [
            [
                'host' => env('RABBITMQ_NODE1_HOST', 'rabbitmq-node1'),
                'port' => env('RABBITMQ_NODE1_PORT', 5672),
            ],
            [
                'host' => env('RABBITMQ_NODE2_HOST', 'rabbitmq-node2'),
                'port' => env('RABBITMQ_NODE2_PORT', 5672),
            ],
            [
                'host' => env('RABBITMQ_NODE3_HOST', 'rabbitmq-node3'),
                'port' => env('RABBITMQ_NODE3_PORT', 5672),
            ],
        ],
        'user' => env('RABBITMQ_USER', 'admin'),
        'password' => env('RABBITMQ_PASSWORD', 'password'),
        'vhost' => env('RABBITMQ_VHOST', '/production'),
        'options' => [
            'connection_timeout' => 3.0,
            'read_write_timeout' => 10.0,
            'heartbeat' => 60,
            'keepalive' => true,
            'prefetch_count' => 10,
        ],
    ],
];

Docker Compose 部署示例

yaml
version: '3.8'

services:
  rabbitmq-node1:
    image: rabbitmq:3.12-management
    hostname: rabbitmq-node1
    environment:
      RABBITMQ_ERLANG_COOKIE: 'secret_cookie_value'
      RABBITMQ_DEFAULT_USER: 'admin'
      RABBITMQ_DEFAULT_PASS: 'password'
      RABBITMQ_DEFAULT_VHOST: '/production'
    volumes:
      - rabbitmq_node1_data:/var/lib/rabbitmq
      - ./rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf:ro
      - ./enabled_plugins:/etc/rabbitmq/enabled_plugins:ro
    ports:
      - "5672:5672"
      - "15672:15672"
    networks:
      - rabbitmq_cluster
    deploy:
      resources:
        limits:
          memory: 2G
        reservations:
          memory: 1G

  rabbitmq-node2:
    image: rabbitmq:3.12-management
    hostname: rabbitmq-node2
    environment:
      RABBITMQ_ERLANG_COOKIE: 'secret_cookie_value'
      RABBITMQ_DEFAULT_USER: 'admin'
      RABBITMQ_DEFAULT_PASS: 'password'
      RABBITMQ_DEFAULT_VHOST: '/production'
    volumes:
      - rabbitmq_node2_data:/var/lib/rabbitmq
      - ./rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf:ro
      - ./enabled_plugins:/etc/rabbitmq/enabled_plugins:ro
    ports:
      - "5673:5672"
      - "15673:15672"
    networks:
      - rabbitmq_cluster
    deploy:
      resources:
        limits:
          memory: 2G
        reservations:
          memory: 1G

  rabbitmq-node3:
    image: rabbitmq:3.12-management
    hostname: rabbitmq-node3
    environment:
      RABBITMQ_ERLANG_COOKIE: 'secret_cookie_value'
      RABBITMQ_DEFAULT_USER: 'admin'
      RABBITMQ_DEFAULT_PASS: 'password'
      RABBITMQ_DEFAULT_VHOST: '/production'
    volumes:
      - rabbitmq_node3_data:/var/lib/rabbitmq
      - ./rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf:ro
      - ./enabled_plugins:/etc/rabbitmq/enabled_plugins:ro
    ports:
      - "5674:5672"
      - "15674:15672"
    networks:
      - rabbitmq_cluster
    deploy:
      resources:
        limits:
          memory: 2G
        reservations:
          memory: 1G

  haproxy:
    image: haproxy:2.8
    volumes:
      - ./haproxy.cfg:/usr/local/etc/haproxy/haproxy.cfg:ro
    ports:
      - "5670:5670"
      - "15670:15670"
      - "8404:8404"
    networks:
      - rabbitmq_cluster
    depends_on:
      - rabbitmq-node1
      - rabbitmq-node2
      - rabbitmq-node3

volumes:
  rabbitmq_node1_data:
  rabbitmq_node2_data:
  rabbitmq_node3_data:

networks:
  rabbitmq_cluster:
    driver: bridge

HAProxy 配置示例

global
    log stdout format raw local0
    maxconn 4096

defaults
    log     global
    mode    tcp
    option  tcplog
    option  dontlognull
    timeout connect 5s
    timeout client  30s
    timeout server  30s
    retries 3

frontend rabbitmq_amqp
    bind *:5670
    mode tcp
    option tcplog
    default_backend rabbitmq_amqp_servers

backend rabbitmq_amqp_servers
    mode tcp
    balance roundrobin
    option tcp-check
    tcp-check connect port 5672
    server rabbitmq-node1 rabbitmq-node1:5672 check inter 5s rise 2 fall 3
    server rabbitmq-node2 rabbitmq-node2:5672 check inter 5s rise 2 fall 3
    server rabbitmq-node3 rabbitmq-node3:5672 check inter 5s rise 2 fall 3

frontend rabbitmq_management
    bind *:15670
    mode http
    option httplog
    default_backend rabbitmq_management_servers

backend rabbitmq_management_servers
    mode http
    balance roundrobin
    option httpchk GET /api/health/checks/alarms
    http-check expect status 200
    server rabbitmq-node1 rabbitmq-node1:15672 check inter 5s rise 2 fall 3
    server rabbitmq-node2 rabbitmq-node2:15672 check inter 5s rise 2 fall 3
    server rabbitmq-node3 rabbitmq-node3:15672 check inter 5s rise 2 fall 3

frontend stats
    bind *:8404
    mode http
    stats enable
    stats uri /stats
    stats refresh 10s
    stats admin if LOCALHOST

RabbitMQ 配置文件示例

# rabbitmq.conf

# 基础配置
listeners.tcp.default = 5672
management.tcp.port = 15672

# 内存配置
vm_memory_high_watermark.relative = 0.6
vm_memory_high_watermark_paging_ratio = 0.75

# 磁盘配置
disk_free_limit.relative = 1.5

# 队列配置
queue_master_locator = min-masters

# 仲裁队列配置
quorum_default_quorum_queue = true

# 集群配置
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
cluster_formation.classic_config.nodes.1 = rabbit@rabbitmq-node1
cluster_formation.classic_config.nodes.2 = rabbit@rabbitmq-node2
cluster_formation.classic_config.nodes.3 = rabbit@rabbitmq-node3

# 安全配置
auth_mechanisms.1 = PLAIN
auth_mechanisms.2 = AMQPLAIN

# 日志配置
log.console.level = info
log.file.level = info

# 心跳配置
heartbeat = 60

# 连接配置
connection_max = 10000
channel_max = 2048

错误做法:不合理的部署配置

php
<?php

class BadDeploymentConfig
{
    public function createConnection()
    {
        // 错误1:单节点硬编码,无故障转移
        $connection = new AMQPStreamConnection(
            'localhost',
            5672,
            'guest',
            'guest'
        );
        
        // 错误2:无超时配置
        // 错误3:无心跳配置
        // 错误4:无重连机制
        
        return $connection;
    }
    
    public function publish($data)
    {
        // 错误5:每次操作都创建新连接
        $connection = $this->createConnection();
        $channel = $connection->channel();
        
        $channel->basic_publish(...);
        
        // 错误6:立即关闭连接
        $channel->close();
        $connection->close();
    }
}

Kubernetes 部署方案

yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: rabbitmq
  namespace: messaging
spec:
  serviceName: rabbitmq
  replicas: 3
  selector:
    matchLabels:
      app: rabbitmq
  template:
    metadata:
      labels:
        app: rabbitmq
    spec:
      terminationGracePeriodSeconds: 600
      containers:
      - name: rabbitmq
        image: rabbitmq:3.12-management
        ports:
        - containerPort: 5672
          name: amqp
        - containerPort: 15672
          name: management
        env:
        - name: RABBITMQ_ERLANG_COOKIE
          valueFrom:
            secretKeyRef:
              name: rabbitmq-secret
              key: erlang-cookie
        - name: RABBITMQ_DEFAULT_USER
          valueFrom:
            secretKeyRef:
              name: rabbitmq-secret
              key: username
        - name: RABBITMQ_DEFAULT_PASS
          valueFrom:
            secretKeyRef:
              name: rabbitmq-secret
              key: password
        - name: RABBITMQ_NODENAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        resources:
          limits:
            memory: "2Gi"
            cpu: "1000m"
          requests:
            memory: "1Gi"
            cpu: "500m"
        volumeMounts:
        - name: rabbitmq-data
          mountPath: /var/lib/rabbitmq
        - name: rabbitmq-config
          mountPath: /etc/rabbitmq
        livenessProbe:
          exec:
            command:
            - rabbitmq-diagnostics
            - check_running
          initialDelaySeconds: 60
          periodSeconds: 30
          timeoutSeconds: 10
        readinessProbe:
          exec:
            command:
            - rabbitmq-diagnostics
            - check_running
          initialDelaySeconds: 20
          periodSeconds: 10
          timeoutSeconds: 5
      volumes:
      - name: rabbitmq-config
        configMap:
          name: rabbitmq-config
  volumeClaimTemplates:
  - metadata:
      name: rabbitmq-data
    spec:
      accessModes: [ "ReadWriteOnce" ]
      resources:
        requests:
          storage: 10Gi
---
apiVersion: v1
kind: Service
metadata:
  name: rabbitmq
  namespace: messaging
spec:
  type: ClusterIP
  ports:
  - port: 5672
    targetPort: 5672
    name: amqp
  - port: 15672
    targetPort: 15672
    name: management
  selector:
    app: rabbitmq
---
apiVersion: v1
kind: Service
metadata:
  name: rabbitmq-headless
  namespace: messaging
spec:
  type: ClusterIP
  clusterIP: None
  ports:
  - port: 5672
    targetPort: 5672
    name: amqp
  - port: 15672
    targetPort: 15672
    name: management
  - port: 4369
    targetPort: 4369
    name: epmd
  - port: 25672
    targetPort: 25672
    name: clustering
  selector:
    app: rabbitmq

最佳实践建议清单

架构设计

  • [ ] 选择合适的集群模式(仲裁队列推荐)
  • [ ] 规划节点数量(奇数,至少 3 个)
  • [ ] 配置负载均衡器
  • [ ] 设计故障转移策略
  • [ ] 规划网络拓扑

资源配置

  • [ ] 合理分配内存资源
  • [ ] 配置磁盘空间告警
  • [ ] 设置连接数限制
  • [ ] 配置文件描述符限制
  • [ ] 规划存储卷大小

安全配置

  • [ ] 配置 TLS 加密
  • [ ] 设置强密码策略
  • [ ] 配置防火墙规则
  • [ ] 使用 vhost 隔离
  • [ ] 定期更新凭证

运维配置

  • [ ] 配置健康检查
  • [ ] 设置自动重启策略
  • [ ] 配置日志收集
  • [ ] 设置监控告警
  • [ ] 准备备份策略

生产环境注意事项

  1. 节点规划

    • 至少 3 个节点组成集群
    • 节点分布在不同的可用区
    • 每个节点独立存储
  2. 资源预留

    • 内存:至少 2GB,推荐 4GB+
    • 磁盘:SSD 存储,至少 20GB
    • CPU:至少 2 核
  3. 网络配置

    • 节点间低延迟网络
    • 配置防火墙规则
    • 使用内部 DNS
  4. 监控配置

    • 监控节点状态
    • 监控队列深度
    • 监控资源使用

相关链接