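RabbitMQ and Kubernetes Integration
Overview
Kubernetes is the most widely used container orchestration platform and handles deployment, scaling, and management of containerized workloads. Running RabbitMQ on Kubernetes gives you high availability, controlled scaling, and automatic recovery of failed Pods. This tutorial covers deploying, configuring, and operating RabbitMQ on Kubernetes.
Integration Architecture
Architecture Diagram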
```
┌─────────────────────────────────────────────────────────────────┐
│         Kubernetes + RabbitMQ Integration Architecture          │
│                                                                 │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │                    Kubernetes Cluster                     │  │
│  │                                                           │  │
│  │  ┌─────────────────────────────────────────────────────┐  │  │
│  │  │               Ingress / Load Balancer               │  │  │
│  │  └─────────────────────────────────────────────────────┘  │  │
│  │                             │                             │  │
│  │  ┌─────────────────────────────────────────────────────┐  │  │
│  │  │                  RabbitMQ Cluster                   │  │  │
│  │  │      ┌─────────┐    ┌─────────┐    ┌─────────┐      │  │  │
│  │  │      │  Pod 1  │    │  Pod 2  │    │  Pod 3  │      │  │  │
│  │  │      │ (peer)  │    │ (peer)  │    │ (peer)  │      │  │  │
│  │  │      └────┬────┘    └────┬────┘    └────┬────┘      │  │  │
│  │  │           └──────────────┼──────────────┘           │  │  │
│  │  │                   ┌──────┴──────┐                   │  │  │
│  │  │                   │   Service   │                   │  │  │
│  │  │                   └─────────────┘                   │  │  │
│  │  └─────────────────────────────────────────────────────┘  │  │
│  │                             │                             │  │
│  │  ┌─────────────────────────────────────────────────────┐  │  │
│  │  │                  Application Pods                   │  │  │
│  │  │      ┌─────────┐    ┌─────────┐    ┌─────────┐      │  │  │
│  │  │      │  App 1  │    │  App 2  │    │  App 3  │      │  │  │
│  │  │      └─────────┘    └─────────┘    └─────────┘      │  │  │
│  │  └─────────────────────────────────────────────────────┘  │  │
│  │                                                           │  │
│  │  ┌─────────────────────────────────────────────────────┐  │  │
│  │  │                 Persistent Storage                  │  │  │
│  │  │      ┌─────────┐    ┌─────────┐    ┌─────────┐      │  │  │
│  │  │      │  PVC 1  │    │  PVC 2  │    │  PVC 3  │      │  │  │
│  │  │      └─────────┘    └─────────┘    └─────────┘      │  │  │
│  │  └─────────────────────────────────────────────────────┘  │  │
│  └───────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────┘
```
RabbitMQ cluster nodes are peers; there is no fixed master node, although individual replicated queues elect their own leader.
Integration Patterns
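| Pattern | Description |
|---|---|
| StatefulSet | Deploys stateful workloads with stable Pod identity and persistent storage |
| Service | Service discovery and load balancing |
| ConfigMap | Configuration management |
| Secret | Management of sensitive data |
| HorizontalPodAutoscaler | Automatic scaling |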
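The stable Pod identity that a StatefulSet provides is what lets each RabbitMQ node keep the same hostname across restarts. As a rough illustration, the sketch below derives the per-Pod DNS names that Kubernetes assigns to StatefulSet Pods behind a headless Service (`<pod>.<service>.<namespace>.svc.<cluster-domain>`). The function name `statefulSetPodHosts` is hypothetical, the `rabbitmq` and `rabbitmq-headless` names are taken from the manifests later in this tutorial, and `cluster.local` is assumed as the cluster domain.

```php
<?php
declare(strict_types=1);

/**
 * Build the stable DNS names of StatefulSet Pods behind a headless Service.
 * Hypothetical helper for illustration; not part of any library.
 */
function statefulSetPodHosts(
    string $statefulSet,
    string $headlessService,
    string $namespace,
    int $replicas,
    string $clusterDomain = 'cluster.local' // assumption: default cluster domain
): array {
    $hosts = [];
    for ($ordinal = 0; $ordinal < $replicas; $ordinal++) {
        // Pods are named <statefulset>-<ordinal>, starting at 0
        $hosts[] = sprintf(
            '%s-%d.%s.%s.svc.%s',
            $statefulSet,
            $ordinal,
            $headlessService,
            $namespace,
            $clusterDomain
        );
    }
    return $hosts;
}

foreach (statefulSetPodHosts('rabbitmq', 'rabbitmq-headless', 'rabbitmq', 3) as $host) {
    echo $host, PHP_EOL;
}
```

These are the hostnames the cluster nodes use to reach each other, which is why the headless Service name and namespace have to match in every manifest.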
Deploying RabbitMQ on Kubernetes
Namespace Configuration
```yaml
# namespace.yaml
apiVersion: v1
kind: Namespace
metadata:
  name: rabbitmq
  labels:
    app.kubernetes.io/name: rabbitmq
    app.kubernetes.io/component: messaging
```
Secret Configuration
```yaml
# secret.yaml
apiVersion: v1
kind: Secret
metadata:
  name: rabbitmq-secret
  namespace: rabbitmq
type: Opaque
stringData:
  RABBITMQ_DEFAULT_USER: admin
  RABBITMQ_DEFAULT_PASS: "your-secure-password"
  RABBITMQ_ERLANG_COOKIE: "your-erlang-cookie"
---
apiVersion: v1
kind: Secret
metadata:
  name: rabbitmq-tls
  namespace: rabbitmq
type: kubernetes.io/tls
stringData:
  # ca.crt is read by ssl_options.cacertfile in rabbitmq.conf
  ca.crt: |
    -----BEGIN CERTIFICATE-----
    ...
    -----END CERTIFICATE-----
  tls.crt: |
    -----BEGIN CERTIFICATE-----
    ...
    -----END CERTIFICATE-----
  tls.key: |
    -----BEGIN PRIVATE KEY-----
    ...
    -----END PRIVATE KEY-----
```
ConfigMap Configuration
```yaml
# configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: rabbitmq-config
  namespace: rabbitmq
data:
  rabbitmq.conf: |
    ## Cluster Configuration
    cluster_formation.peer_discovery_backend = rabbit_peer_discovery_k8s
    cluster_formation.k8s.host = kubernetes.default.svc.cluster.local
    cluster_formation.k8s.address_type = hostname
    ## Must match the headless Service and namespace used by the StatefulSet
    cluster_formation.k8s.service_name = rabbitmq-headless
    cluster_formation.k8s.hostname_suffix = .rabbitmq-headless.rabbitmq.svc.cluster.local
    cluster_formation.node_cleanup.interval = 30
    cluster_formation.node_cleanup.only_log_warning = true
    ## Memory Management
    vm_memory_high_watermark.relative = 0.6
    vm_memory_high_watermark_paging_ratio = 0.75
    ## Disk Management
    ## An absolute limit is used because a limit relative to RAM can exceed
    ## what a 10Gi volume is able to keep free
    disk_free_limit.absolute = 2GB
    ## Network
    listeners.tcp.default = 5672
    management.tcp.port = 15672
    ## TLS
    listeners.ssl.default = 5671
    ssl_options.cacertfile = /etc/rabbitmq-tls/ca.crt
    ssl_options.certfile = /etc/rabbitmq-tls/tls.crt
    ssl_options.keyfile = /etc/rabbitmq-tls/tls.key
    ssl_options.verify = verify_peer
    ssl_options.fail_if_no_peer_cert = false
    ## Definitions
    load_definitions = /etc/rabbitmq/definitions.json
    ## Defaults
    ## The default user and password come from rabbitmq-secret via environment
    ## variables, so they are not hard-coded here
    default_vhost = /
    ## Logging
    log.console.level = info
    log.file.level = info
    log.file = /var/log/rabbitmq/rabbitmq.log
    ## Performance
    channel_max = 2048
    heartbeat = 60
    consumer_timeout = 1800000
  enabled_plugins: |
    [rabbitmq_management,rabbitmq_peer_discovery_k8s,rabbitmq_prometheus].
  # Classic queue mirroring (ha-mode) is deprecated since RabbitMQ 3.9 and removed
  # in 4.0; prefer quorum queues for new deployments.
  # RabbitMQ may skip seeding the default user when definitions are imported at
  # boot, so list any required users in this file.
  definitions.json: |
    {
      "users": [],
      "vhosts": [],
      "permissions": [],
      "queues": [],
      "exchanges": [],
      "bindings": [],
      "policies": [
        {
          "name": "ha-all",
          "vhost": "/",
          "pattern": ".*",
          "definition": {
            "ha-mode": "all",
            "ha-sync-mode": "automatic"
          }
        }
      ]
    }
```
StatefulSet Configuration
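The memory limit set on the RabbitMQ container interacts with the `vm_memory_high_watermark.relative = 0.6` and `vm_memory_high_watermark_paging_ratio = 0.75` settings in the ConfigMap. The arithmetic can be sketched as follows; the helper names are hypothetical, and the calculation assumes RabbitMQ treats the 4Gi container limit as its total available memory, which is worth confirming on a running node.

```php
<?php
declare(strict_types=1);

/** Memory (in MiB) at which RabbitMQ raises the memory alarm and blocks publishers. */
function highWatermarkMiB(float $totalMiB, float $relative): float
{
    return $totalMiB * $relative;
}

/** Memory (in MiB) at which the broker starts paging messages out to disk. */
function pagingThresholdMiB(float $totalMiB, float $relative, float $pagingRatio): float
{
    // The paging ratio is a fraction of the watermark, not of total memory
    return highWatermarkMiB($totalMiB, $relative) * $pagingRatio;
}

$limitMiB = 4096.0; // the 4Gi container limit, assumed to be what RabbitMQ detects

printf(
    "alarm at %.1f MiB, paging from %.1f MiB\n",
    highWatermarkMiB($limitMiB, 0.6),
    pagingThresholdMiB($limitMiB, 0.6, 0.75)
);
```

If the node detects the host's memory instead of the container limit, the same formulas yield a threshold above the limit, and the Pod can be OOM-killed before the alarm ever fires.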
```yaml
# statefulset.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: rabbitmq
  namespace: rabbitmq
  labels:
    app.kubernetes.io/name: rabbitmq
    app.kubernetes.io/component: messaging
spec:
  serviceName: rabbitmq-headless
  replicas: 3
  selector:
    matchLabels:
      app.kubernetes.io/name: rabbitmq
  template:
    metadata:
      labels:
        app.kubernetes.io/name: rabbitmq
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "15692"
    spec:
      serviceAccountName: rabbitmq
      terminationGracePeriodSeconds: 600
      affinity:
        podAntiAffinity:
          # One RabbitMQ Pod per node
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchLabels:
                  app.kubernetes.io/name: rabbitmq
              topologyKey: kubernetes.io/hostname
      containers:
        - name: rabbitmq
          image: rabbitmq:3.12-management
          imagePullPolicy: IfNotPresent
          ports:
            - name: amqp
              containerPort: 5672
              protocol: TCP
            - name: amqps
              containerPort: 5671
              protocol: TCP
            - name: management
              containerPort: 15672
              protocol: TCP
            - name: prometheus
              containerPort: 15692
              protocol: TCP
            - name: epmd
              containerPort: 4369
              protocol: TCP
            - name: dist
              containerPort: 25672
              protocol: TCP
          env:
            - name: RABBITMQ_DEFAULT_USER
              valueFrom:
                secretKeyRef:
                  name: rabbitmq-secret
                  key: RABBITMQ_DEFAULT_USER
            - name: RABBITMQ_DEFAULT_PASS
              valueFrom:
                secretKeyRef:
                  name: rabbitmq-secret
                  key: RABBITMQ_DEFAULT_PASS
            - name: RABBITMQ_ERLANG_COOKIE
              valueFrom:
                secretKeyRef:
                  name: rabbitmq-secret
                  key: RABBITMQ_ERLANG_COOKIE
            - name: POD_NAME
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
            # With long names the node name must be fully qualified:
            # rabbit@<pod>.<headless-service>.<namespace>.svc.cluster.local
            - name: RABBITMQ_NODENAME
              value: "rabbit@$(POD_NAME).rabbitmq-headless.rabbitmq.svc.cluster.local"
            - name: K8S_SERVICE_NAME
              value: "rabbitmq-headless"
            - name: POD_IP
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP
            - name: RABBITMQ_USE_LONGNAME
              value: "true"
            - name: RABBITMQ_CONFIG_FILE
              value: "/etc/rabbitmq/rabbitmq.conf"
          resources:
            requests:
              cpu: "500m"
              memory: "1Gi"
            limits:
              cpu: "2"
              memory: "4Gi"
          livenessProbe:
            exec:
              command:
                - rabbitmq-diagnostics
                - check_running
            initialDelaySeconds: 60
            periodSeconds: 30
            timeoutSeconds: 10
            failureThreshold: 3
          readinessProbe:
            exec:
              command:
                - rabbitmq-diagnostics
                - check_running
            initialDelaySeconds: 20
            periodSeconds: 10
            timeoutSeconds: 5
            failureThreshold: 3
          volumeMounts:
            - name: rabbitmq-data
              mountPath: /var/lib/rabbitmq
            - name: rabbitmq-config
              mountPath: /etc/rabbitmq
            - name: rabbitmq-tls
              mountPath: /etc/rabbitmq-tls
              readOnly: true
            - name: rabbitmq-logs
              mountPath: /var/log/rabbitmq
      volumes:
        - name: rabbitmq-config
          configMap:
            name: rabbitmq-config
        - name: rabbitmq-tls
          secret:
            secretName: rabbitmq-tls
        - name: rabbitmq-logs
          emptyDir: {}
  volumeClaimTemplates:
    - metadata:
        name: rabbitmq-data
      spec:
        accessModes:
          - ReadWriteOnce
        storageClassName: standard
        resources:
          requests:
            storage: 10Gi
```
Service Configuration
```yaml
# service.yaml
# Headless Service: gives each Pod a stable DNS name for clustering
apiVersion: v1
kind: Service
metadata:
  name: rabbitmq-headless
  namespace: rabbitmq
  labels:
    app.kubernetes.io/name: rabbitmq
spec:
  type: ClusterIP
  clusterIP: None
  # Nodes must resolve each other before they pass their readiness probes
  publishNotReadyAddresses: true
  ports:
    - name: amqp
      port: 5672
      targetPort: amqp
    - name: management
      port: 15672
      targetPort: management
    - name: prometheus
      port: 15692
      targetPort: prometheus
  selector:
    app.kubernetes.io/name: rabbitmq
---
# In-cluster Service used by applications
apiVersion: v1
kind: Service
metadata:
  name: rabbitmq
  namespace: rabbitmq
  labels:
    app.kubernetes.io/name: rabbitmq
spec:
  type: ClusterIP
  ports:
    - name: amqp
      port: 5672
      targetPort: amqp
    - name: management
      port: 15672
      targetPort: management
  selector:
    app.kubernetes.io/name: rabbitmq
---
# External access through a cloud load balancer
apiVersion: v1
kind: Service
metadata:
  name: rabbitmq-external
  namespace: rabbitmq
  labels:
    app.kubernetes.io/name: rabbitmq
spec:
  type: LoadBalancer
  ports:
    - name: amqp
      port: 5672
      targetPort: amqp
    - name: management
      port: 15672
      targetPort: management
  selector:
    app.kubernetes.io/name: rabbitmq
```
ServiceAccount and RBAC
```yaml
# rbac.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: rabbitmq
  namespace: rabbitmq
---
# Read access required by the Kubernetes peer discovery plugin
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: rabbitmq
  namespace: rabbitmq
rules:
  - apiGroups: [""]
    resources: ["endpoints"]
    verbs: ["get", "list", "watch"]
  - apiGroups: [""]
    resources: ["pods"]
    verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: rabbitmq
  namespace: rabbitmq
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: rabbitmq
subjects:
  - kind: ServiceAccount
    name: rabbitmq
    namespace: rabbitmq
```
HorizontalPodAutoscaler Configuration
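The manifest in this section targets 70% average CPU utilization with between 3 and 10 replicas. To make the scaling behaviour concrete, the sketch below applies the replica formula documented for the HorizontalPodAutoscaler, `ceil(currentReplicas * currentMetric / targetMetric)`, clamped to the configured bounds. The function name `desiredReplicas` is hypothetical, and the controller's tolerance band, stabilization windows, and rate-limiting policies are deliberately left out.

```php
<?php
declare(strict_types=1);

/**
 * Replica count the HPA would aim for, ignoring tolerance and scaling policies:
 * desired = ceil(current * currentMetric / targetMetric), clamped to [min, max].
 */
function desiredReplicas(
    int $current,
    float $currentMetric,
    float $targetMetric,
    int $min,
    int $max
): int {
    $desired = (int) ceil($current * $currentMetric / $targetMetric);
    return max($min, min($max, $desired));
}

// 3 replicas averaging 90% CPU against the 70% target
echo desiredReplicas(3, 90.0, 70.0, 3, 10), PHP_EOL;
```

Adding brokers does not move existing queues onto the new nodes, so it is worth checking that extra replicas actually relieve the load before relying on autoscaling for RabbitMQ.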
```yaml
# hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: rabbitmq
  namespace: rabbitmq
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: StatefulSet
    name: rabbitmq
  minReplicas: 3
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
    # Pods metrics require a custom metrics adapter (for example prometheus-adapter)
    - type: Pods
      pods:
        metric:
          name: rabbitmq_queue_messages_ready
        target:
          type: AverageValue
          averageValue: "1000"
  behavior:
    scaleDown:
      # Remove at most one Pod every two minutes
      stabilizationWindowSeconds: 300
      policies:
        - type: Pods
          value: 1
          periodSeconds: 120
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
        - type: Pods
          value: 2
          periodSeconds: 60
```
PodDisruptionBudget Configuration
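The budget in this section keeps at least 2 of the 3 replicas available during voluntary disruptions such as node drains. A small sketch of the arithmetic behind that choice follows; both function names are hypothetical, and the majority rule assumes replicated queues that need more than half of the nodes online to keep accepting writes.

```php
<?php
declare(strict_types=1);

/** Smallest number of nodes that forms a majority of the cluster. */
function majority(int $nodes): int
{
    return intdiv($nodes, 2) + 1;
}

/** How many Pods may be evicted at once while still honouring minAvailable. */
function allowedDisruptions(int $replicas, int $minAvailable): int
{
    return max(0, $replicas - $minAvailable);
}

printf(
    "majority of 3 nodes: %d, evictions allowed with minAvailable=2: %d\n",
    majority(3),
    allowedDisruptions(3, 2)
);
```

With 3 replicas, `minAvailable: 2` equals the majority, so a drain can take down only one node at a time.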
```yaml
# pdb.yaml
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
  name: rabbitmq
  namespace: rabbitmq
spec:
  minAvailable: 2
  selector:
    matchLabels:
      app.kubernetes.io/name: rabbitmq
```
PHP Application Deployment
PHP Application Deployment Manifest
```yaml
# php-app-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: php-app
  namespace: default
spec:
  replicas: 3
  selector:
    matchLabels:
      app: php-app
  template:
    metadata:
      labels:
        app: php-app
    spec:
      containers:
        - name: php-app
          image: your-registry/php-app:latest
          ports:
            - containerPort: 80
          envFrom:
            # Non-secret settings (vhost, prefetch, heartbeat) from php-app-config
            - configMapRef:
                name: php-app-config
          env:
            - name: RABBITMQ_HOST
              value: "rabbitmq.rabbitmq.svc.cluster.local"
            - name: RABBITMQ_PORT
              value: "5672"
            # secretKeyRef cannot cross namespaces: a copy of rabbitmq-secret
            # must exist in the default namespace
            - name: RABBITMQ_USER
              valueFrom:
                secretKeyRef:
                  name: rabbitmq-secret
                  key: RABBITMQ_DEFAULT_USER
            - name: RABBITMQ_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: rabbitmq-secret
                  key: RABBITMQ_DEFAULT_PASS
          resources:
            requests:
              cpu: "100m"
              memory: "128Mi"
            limits:
              cpu: "500m"
              memory: "512Mi"
          livenessProbe:
            httpGet:
              path: /health
              port: 80
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 80
            initialDelaySeconds: 5
            periodSeconds: 5
```
PHP Application ConfigMap
```yaml
# php-app-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: php-app-config
  namespace: default
data:
  RABBITMQ_EXCHANGE: "app.events"
  RABBITMQ_VHOST: "/"
  RABBITMQ_PREFETCH: "10"
  RABBITMQ_HEARTBEAT: "60"
```
PHP Code Examples
Connecting to RabbitMQ in a Kubernetes Environment
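The client in this section retries a failed connection up to 5 times, starting with a 2-second pause and doubling it after every failure. As a quick way to see how long a Pod may spend waiting before it gives up, the sketch below computes that delay schedule. The function name `backoffDelays` is hypothetical and the figures cover only the pauses between attempts, not the time each connection attempt itself takes.

```php
<?php
declare(strict_types=1);

/**
 * Delays (in seconds) slept between connection attempts when the delay
 * doubles after every failure. No sleep follows the final attempt.
 */
function backoffDelays(int $maxRetries, int $initialDelay): array
{
    $delays = [];
    $delay = $initialDelay;
    for ($attempt = 1; $attempt < $maxRetries; $attempt++) {
        $delays[] = $delay;
        $delay *= 2;
    }
    return $delays;
}

$delays = backoffDelays(5, 2);
printf("delays: %s (total %d s)\n", implode(', ', $delays), array_sum($delays));
```

The total is useful when choosing probe delays: a readiness probe that fires before the retries finish will simply report the Pod as not ready.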
```php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

class KubernetesRabbitMQClient
{
    private ?AMQPStreamConnection $connection = null;
    private array $config;

    public function __construct()
    {
        // Settings come from the Pod's environment; the fallbacks are for local runs
        $this->config = [
            'host' => getenv('RABBITMQ_HOST') ?: 'rabbitmq.rabbitmq.svc.cluster.local',
            'port' => (int)(getenv('RABBITMQ_PORT') ?: 5672),
            'user' => getenv('RABBITMQ_USER') ?: 'guest',
            'password' => getenv('RABBITMQ_PASSWORD') ?: 'guest',
            'vhost' => getenv('RABBITMQ_VHOST') ?: '/',
            'heartbeat' => (int)(getenv('RABBITMQ_HEARTBEAT') ?: 60),
        ];
    }

    public function getConnection(): AMQPStreamConnection
    {
        // Reconnect if the connection was never opened or has dropped
        if ($this->connection === null || !$this->connection->isConnected()) {
            $this->connection = $this->createConnection();
        }
        return $this->connection;
    }

    private function createConnection(): AMQPStreamConnection
    {
        $maxRetries = 5;
        $retryDelay = 2; // seconds, doubled after each failed attempt
        $heartbeat = $this->config['heartbeat'];
        // The read/write timeout has to exceed twice the heartbeat interval
        $readWriteTimeout = max(3.0, $heartbeat * 2 + 1.0);

        for ($attempt = 1; $attempt <= $maxRetries; $attempt++) {
            try {
                // AMQPStreamConnection connects eagerly, so failures surface here
                // and can be retried (a lazy connection would never throw at this point)
                return new AMQPStreamConnection(
                    $this->config['host'],
                    $this->config['port'],
                    $this->config['user'],
                    $this->config['password'],
                    $this->config['vhost'],
                    false,             // insist
                    'AMQPLAIN',        // login_method
                    null,              // login_response
                    'en_US',           // locale
                    3.0,               // connection_timeout
                    $readWriteTimeout, // read_write_timeout
                    null,              // context
                    false,             // keepalive
                    $heartbeat         // heartbeat
                );
            } catch (Exception $e) {
                error_log(sprintf(
                    "Connection attempt %d/%d failed: %s",
                    $attempt,
                    $maxRetries,
                    $e->getMessage()
                ));
                if ($attempt < $maxRetries) {
                    sleep($retryDelay);
                    $retryDelay *= 2;
                } else {
                    throw $e;
                }
            }
        }
        throw new RuntimeException("Failed to connect to RabbitMQ");
    }

    public function close(): void
    {
        if ($this->connection !== null) {
            $this->connection->close();
            $this->connection = null;
        }
    }
}

class KubernetesHealthCheck
{
    private KubernetesRabbitMQClient $client;

    public function __construct(KubernetesRabbitMQClient $client)
    {
        $this->client = $client;
    }

    // Liveness: the connection to the broker is open
    public function isHealthy(): bool
    {
        try {
            $connection = $this->client->getConnection();
            return $connection->isConnected();
        } catch (Exception $e) {
            return false;
        }
    }

    // Readiness: the broker accepts a channel and a temporary queue declaration
    public function isReady(): bool
    {
        try {
            $connection = $this->client->getConnection();
            $channel = $connection->channel();
            $channel->queue_declare(
                'health-check-' . uniqid(),
                false, // passive
                false, // durable
                true,  // exclusive
                true   // auto_delete
            );
            $channel->close();
            return true;
        } catch (Exception $e) {
            return false;
        }
    }
}

// Health-check endpoints (served here by PHP's built-in web server)
if (php_sapi_name() === 'cli-server') {
    // Strip any query string before matching the path
    $path = parse_url($_SERVER['REQUEST_URI'], PHP_URL_PATH);
    $client = new KubernetesRabbitMQClient();
    $health = new KubernetesHealthCheck($client);
    if ($path === '/health') {
        $ok = $health->isHealthy();
        // Kubernetes probes judge the status code, not the response body
        http_response_code($ok ? 200 : 503);
        header('Content-Type: application/json');
        echo json_encode([
            'status' => $ok ? 'healthy' : 'unhealthy',
            'timestamp' => date('c'),
        ]);
    } elseif ($path === '/ready') {
        $ok = $health->isReady();
        http_response_code($ok ? 200 : 503);
        header('Content-Type: application/json');
        echo json_encode([
            'status' => $ok ? 'ready' : 'not_ready',
            'timestamp' => date('c'),
        ]);
    }
}
```
Kubernetes Consumer Worker
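The worker in this section requeues every message whose processing fails, which can loop forever on a message that always fails. One possible refinement, sketched below, is to cap the number of attempts and reject the message once the cap is reached. The function name `shouldRequeue` and the idea of tracking an attempt count per message (for example in an application-defined header) are assumptions for illustration and are not part of the worker as written.

```php
<?php
declare(strict_types=1);

/**
 * Decide whether a failed message goes back to the queue.
 * $attempts counts the deliveries made so far, including the one that just failed.
 */
function shouldRequeue(int $attempts, int $maxAttempts): bool
{
    return $attempts < $maxAttempts;
}

// Example: allow 3 attempts in total
foreach ([1, 2, 3] as $attempt) {
    printf(
        "attempt %d -> %s\n",
        $attempt,
        shouldRequeue($attempt, 3) ? 'requeue' : 'reject'
    );
}
```

The result could be passed as the second argument of `$message->nack(false, ...)`; a message rejected without requeueing is dropped, or routed to a dead-letter exchange if the queue has one configured.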
```php
<?php
// Assumes the Composer autoloader and the KubernetesRabbitMQClient class
// from the previous listing are already loaded.
use PhpAmqpLib\Exception\AMQPTimeoutException;

class KubernetesWorker
{
    private KubernetesRabbitMQClient $client;
    private bool $running = true;

    public function __construct(KubernetesRabbitMQClient $client)
    {
        $this->client = $client;
        $this->setupSignalHandlers();
    }

    private function setupSignalHandlers(): void
    {
        // Kubernetes sends SIGTERM before killing the Pod; stop consuming cleanly
        pcntl_async_signals(true);
        pcntl_signal(SIGTERM, function () {
            $this->running = false;
        });
        pcntl_signal(SIGINT, function () {
            $this->running = false;
        });
    }

    public function consume(string $queue, callable $processor): void
    {
        $connection = $this->client->getConnection();
        $channel = $connection->channel();
        $prefetch = (int)(getenv('RABBITMQ_PREFETCH') ?: 10);
        $channel->basic_qos(0, $prefetch, false);
        $consumerTag = 'worker-' . gethostname() . '-' . getmypid();
        $channel->basic_consume(
            $queue,
            $consumerTag,
            false, // no_local
            false, // no_ack: messages are acknowledged manually
            false, // exclusive
            false, // nowait
            function ($message) use ($processor) {
                $this->processMessage($message, $processor);
            }
        );
        while ($this->running && $channel->is_consuming()) {
            try {
                // Block for up to 1 second so a shutdown signal is noticed
                // promptly without spinning the CPU
                $channel->wait(null, false, 1);
            } catch (AMQPTimeoutException $e) {
                // No message arrived in this interval; check $running again
            }
        }
        $channel->close();
    }

    private function processMessage($message, callable $processor): void
    {
        try {
            $processor($message);
            $message->ack();
        } catch (Throwable $e) {
            error_log("Message processing failed: " . $e->getMessage());
            // Requeue the message; one that always fails will be redelivered indefinitely
            $message->nack(false, true);
        }
    }

    public function stop(): void
    {
        $this->running = false;
    }
}

// Worker entry point
$client = new KubernetesRabbitMQClient();
$worker = new KubernetesWorker($client);
$worker->consume('tasks', function ($message) {
    $data = json_decode($message->getBody(), true);
    echo "Processing: " . json_encode($data) . "\n";
});
$client->close();
```
Monitoring Configuration
Prometheus ServiceMonitor
```yaml
# servicemonitor.yaml
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: rabbitmq
  namespace: rabbitmq
  labels:
    app.kubernetes.io/name: rabbitmq
    release: prometheus
spec:
  selector:
    matchLabels:
      app.kubernetes.io/name: rabbitmq
  endpoints:
    - port: prometheus
      interval: 30s
      path: /metrics
```
PrometheusRule Alerting Rules
```yaml
# prometheusrule.yaml
# Metric names differ between the built-in rabbitmq_prometheus plugin and
# standalone exporters; check each expression against your /metrics output.
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
  name: rabbitmq-alerts
  namespace: rabbitmq
  labels:
    app.kubernetes.io/name: rabbitmq
    release: prometheus
spec:
  groups:
    - name: rabbitmq
      rules:
        - alert: RabbitMQDown
          expr: rabbitmq_up == 0
          for: 5m
          labels:
            severity: critical
          annotations:
            summary: "RabbitMQ instance is down"
            description: "RabbitMQ instance {{ $labels.instance }} has been down for more than 5 minutes."
        - alert: RabbitMQHighMemory
          # Resident memory relative to the configured memory high watermark
          expr: rabbitmq_process_resident_memory_bytes / rabbitmq_resident_memory_limit_bytes > 0.9
          for: 5m
          labels:
            severity: warning
          annotations:
            summary: "RabbitMQ memory usage is high"
            description: "RabbitMQ {{ $labels.instance }} memory usage is above 90%."
        - alert: RabbitMQQueueMessagesReady
          expr: rabbitmq_queue_messages_ready > 10000
          for: 5m
          labels:
            severity: warning
          annotations:
            summary: "RabbitMQ queue has too many ready messages"
            description: "Queue {{ $labels.queue }} has {{ $value }} ready messages."
        - alert: RabbitMQClusterPartition
          expr: rabbitmq_partitions > 0
          for: 1m
          labels:
            severity: critical
          annotations:
            summary: "RabbitMQ cluster partition detected"
            description: "RabbitMQ cluster has {{ $value }} partitions."
```
Common Problems and Solutions
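Problem 1: Cluster nodes cannot discover each other
Symptom: Nodes fail to join the cluster.
Solution: Check the Service and DNS configuration.
```yaml
# Make sure the headless Service is configured correctly
apiVersion: v1
kind: Service
metadata:
  name: rabbitmq-headless
spec:
  clusterIP: None
  selector:
    app.kubernetes.io/name: rabbitmq
```
Problem 2: Data is not persisted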
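Symptom: Data is lost after a Pod restarts.
Solution: Use a valid StorageClass and a PVC so that `/var/lib/rabbitmq` is backed by persistent storage.
```yaml
volumeClaimTemplates:
  - metadata:
      name: rabbitmq-data
    spec:
      accessModes:
        - ReadWriteOnce
      storageClassName: standard
      resources:
        requests:
          storage: 10Gi
```
Problem 3: Insufficient resources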
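Symptom: Pods stay in the Pending state.
Solution: Adjust the resource requests and limits so that the Pods fit on the available nodes.
```yaml
resources:
  requests:
    cpu: "500m"
    memory: "1Gi"
  limits:
    cpu: "2"
    memory: "4Gi"
```
Best Practices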
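1. Security Configuration
```yaml
# NetworkPolicy
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: rabbitmq
  namespace: rabbitmq
spec:
  podSelector:
    matchLabels:
      app.kubernetes.io/name: rabbitmq
  # Only ingress is restricted: listing Egress without egress rules would block
  # DNS lookups and the Kubernetes API calls that peer discovery depends on
  policyTypes:
    - Ingress
  ingress:
    # Client traffic from the application namespace
    - from:
        - namespaceSelector:
            matchLabels:
              kubernetes.io/metadata.name: default
      ports:
        - protocol: TCP
          port: 5672
        - protocol: TCP
          port: 15672
    # Traffic between Pods in the rabbitmq namespace (clustering on 4369 and
    # 25672, CLI tools, backup jobs)
    - from:
        - podSelector: {}
```
2. Backup Strategy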
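```yaml
# CronJob for backup
apiVersion: batch/v1
kind: CronJob
metadata:
  name: rabbitmq-backup
  namespace: rabbitmq
spec:
  schedule: "0 2 * * *"
  jobTemplate:
    spec:
      template:
        spec:
          # Job Pods must use OnFailure or Never
          restartPolicy: OnFailure
          containers:
            - name: backup
              image: rabbitmq:3.12-management
              env:
                - name: RABBITMQ_USER
                  valueFrom:
                    secretKeyRef:
                      name: rabbitmq-secret
                      key: RABBITMQ_DEFAULT_USER
                - name: RABBITMQ_PASS
                  valueFrom:
                    secretKeyRef:
                      name: rabbitmq-secret
                      key: RABBITMQ_DEFAULT_PASS
              command:
                - /bin/sh
                - -c
                - |
                  # This Pod runs no broker of its own, so export the definitions
                  # from the cluster through the management API
                  rabbitmqadmin --host rabbitmq.rabbitmq.svc.cluster.local --port 15672 \
                    --username "$RABBITMQ_USER" --password "$RABBITMQ_PASS" \
                    export /backup/definitions.json
              volumeMounts:
                - name: backup
                  mountPath: /backup
          volumes:
            - name: backup
              persistentVolumeClaim:
                claimName: rabbitmq-backup
```
3. Graceful Shutdown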
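```yaml
# Pod template spec of the StatefulSet; lifecycle hooks belong to the container
spec:
  terminationGracePeriodSeconds: 600
  containers:
    - name: rabbitmq
      lifecycle:
        preStop:
          exec:
            command:
              - /bin/sh
              - -c
              - "rabbitmqctl stop_app"
```
Version Compatibility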
| Kubernetes | RabbitMQ | RabbitMQ Operator | Helm Chart |
|---|---|---|---|
| 1.28+ | 3.12.x | 2.6.x | 12.x |
| 1.27+ | 3.11.x | 2.5.x | 11.x |
| 1.26+ | 3.10.x | 2.4.x | 10.x |
| 1.25+ | 3.9.x | 2.3.x | 9.x |
