Appearance
RabbitMQ 日常运维任务
概述
日常运维是保障 RabbitMQ 稳定运行的基础工作。通过规范的运维流程和自动化工具,可以提高运维效率,减少人为错误。本文将详细介绍 RabbitMQ 的日常运维任务、检查清单和自动化脚本。
核心知识点
运维任务分类
| 分类 | 频率 | 任务 |
|---|---|---|
| 健康检查 | 每小时 | 节点状态、资源使用 |
| 日常巡检 | 每天 | 日志检查、队列状态 |
| 定期维护 | 每周 | 清理、优化 |
| 深度检查 | 每月 | 性能分析、容量评估 |
运维检查清单
□ 节点状态检查
□ 内存使用检查
□ 磁盘空间检查
□ 队列状态检查
□ 连接数检查
□ 消息堆积检查
□ 日志异常检查
□ 备份状态检查
运维工具
| 工具 | 用途 |
|---|---|
| rabbitmqctl | 命令行管理工具 |
| rabbitmq-diagnostics | 诊断工具 |
| rabbitmq-plugins | 插件管理 |
| rabbitmq-queues | 队列管理 |
| HTTP API | 远程管理接口 |
配置示例
PHP 日常运维类
php
<?php
class RabbitMQDailyOps
{
private $apiClient;
private $logger;
private $config;
/**
 * Stores the collaborators and layers caller-supplied thresholds over the defaults.
 *
 * @param mixed $apiClient Client object used for every management API call.
 * @param mixed $logger    Logger object used to record operational events.
 * @param array $config    Optional overrides for the threshold keys defined below.
 */
public function __construct($apiClient, $logger, $config = [])
{
    // Built-in alert thresholds; any key present in $config takes precedence.
    $defaults = [
        'max_messages_per_queue' => 10000,
        'max_connections' => 1000,
        'min_disk_free_gb' => 10,
        'max_memory_percent' => 80,
    ];

    $this->config = array_merge($defaults, $config);
    $this->logger = $logger;
    $this->apiClient = $apiClient;
}
/**
 * Runs every individual check, appends a summary and logs the combined result.
 *
 * @return array Sections keyed 'nodes', 'resources', 'queues', 'connections',
 *               'messages' and 'summary'.
 */
public function runHealthCheck()
{
    // Entries are evaluated top to bottom, so the checks run in this order.
    $report = [
        'nodes' => $this->checkNodes(),
        'resources' => $this->checkResources(),
        'queues' => $this->checkQueues(),
        'connections' => $this->checkConnections(),
        'messages' => $this->checkMessages(),
    ];

    // The summary is derived from the sections collected above.
    $report['summary'] = $this->generateSummary($report);

    $this->logger->info('Health check completed', $report);

    return $report;
}
/**
 * Builds a per-node health record from the node list returned by the API client.
 *
 * A node is 'unhealthy' when it is not running or when either of its alarm
 * flags is set; otherwise it is 'healthy'.
 *
 * @return array Map of node name => ['running', 'uptime', 'type', 'issues', 'status'].
 */
private function checkNodes()
{
    // Alarm flag in the node record => message reported when the flag is set.
    $alarmMessages = [
        'mem_alarm' => '内存告警已触发',
        'disk_free_alarm' => '磁盘告警已触发',
    ];

    $report = [];

    foreach ($this->apiClient->getNodes() as $node) {
        $problems = $node['running'] ? [] : ['节点未运行'];

        foreach ($alarmMessages as $flag => $message) {
            if ($node[$flag] ?? false) {
                $problems[] = $message;
            }
        }

        $report[$node['name']] = [
            'running' => $node['running'],
            'uptime' => $node['uptime'] ?? 0,
            'type' => $node['type'],
            'issues' => $problems,
            'status' => $problems === [] ? 'healthy' : 'unhealthy',
        ];
    }

    return $report;
}
/**
 * Reports memory, disk and file-descriptor usage for every node.
 *
 * Memory is compared against the 'max_memory_percent' threshold, free disk
 * against 'min_disk_free_gb', and file descriptors against a fixed 80%.
 *
 * @return array Map of node name => ['memory' => [...], 'disk' => [...], 'file_descriptors' => [...]].
 */
private function checkResources()
{
    // A limit of 0 would otherwise divide by zero. Treat it as fully used when
    // anything is consumed, so the section is flagged instead of crashing.
    $percentOf = function ($used, $limit) {
        if ($limit <= 0) {
            return $used > 0 ? 100.0 : 0.0;
        }

        return ($used / $limit) * 100;
    };

    $nodes = $this->apiClient->getNodes();
    $results = [];

    foreach ($nodes as $node) {
        $memUsed = $node['mem_used'] ?? 0;
        $memLimit = $node['mem_limit'] ?? 1;
        $memPercent = $percentOf($memUsed, $memLimit);

        // disk_free is divided by 1024^3 to express it in GB.
        $diskFree = ($node['disk_free'] ?? 0) / 1024 / 1024 / 1024;

        $fdUsed = $node['fd_used'] ?? 0;
        $fdTotal = $node['fd_total'] ?? 1;
        $fdPercent = $percentOf($fdUsed, $fdTotal);

        $results[$node['name']] = [
            'memory' => [
                'used' => $memUsed,
                'limit' => $memLimit,
                'percent' => round($memPercent, 2),
                'status' => $memPercent < $this->config['max_memory_percent'] ? 'ok' : 'warning',
            ],
            'disk' => [
                'free_gb' => round($diskFree, 2),
                'status' => $diskFree > $this->config['min_disk_free_gb'] ? 'ok' : 'warning',
            ],
            'file_descriptors' => [
                'used' => $fdUsed,
                'total' => $fdTotal,
                'percent' => round($fdPercent, 2),
                'status' => $fdPercent < 80 ? 'ok' : 'warning',
            ],
        ];
    }

    return $results;
}
/**
 * Lists the queues that currently have at least one problem.
 *
 * A queue is reported when its message count exceeds 'max_messages_per_queue',
 * when it holds messages but has zero consumers, or when its state is not
 * 'running'. Queues without problems are omitted from the result.
 *
 * @return array Map of queue name => ['vhost', 'messages', 'consumers', 'state', 'issues'].
 *               If the same name is already reported for another vhost, the
 *               later entry is keyed "vhost/name" so neither is lost.
 */
private function checkQueues()
{
    $queues = $this->apiClient->getQueues();
    $results = [];

    foreach ($queues as $queue) {
        // These fields are read defensively: a record without them must not
        // raise undefined-index errors in the middle of a health check.
        $messages = $queue['messages'] ?? 0;
        $consumers = $queue['consumers'] ?? null; // null = consumer count unknown
        $state = $queue['state'] ?? 'unknown';

        $issues = [];

        if ($messages > $this->config['max_messages_per_queue']) {
            $issues[] = "消息堆积过多: {$messages}";
        }

        // Only claim "no consumers" when the count is known to be exactly 0.
        if ($consumers === 0 && $messages > 0) {
            $issues[] = '队列有消息但无消费者';
        }

        if ($state !== 'running') {
            $issues[] = "队列状态异常: {$state}";
        }

        if (empty($issues)) {
            continue;
        }

        // Keep the plain name as key (what the summary prints), but never let a
        // same-named queue from a second vhost overwrite an earlier entry.
        $key = $queue['name'];
        if (isset($results[$key])) {
            $key = "{$queue['vhost']}/{$queue['name']}";
        }

        $results[$key] = [
            'vhost' => $queue['vhost'],
            'messages' => $messages,
            'consumers' => $consumers,
            'state' => $state,
            'issues' => $issues,
        ];
    }

    return $results;
}
/**
 * Summarises the open connections: total, channel-less ("idle") count and a
 * per-peer-host breakdown.
 *
 * @return array ['total', 'idle', 'by_host', 'status']; status is 'ok' while
 *               the total stays below the 'max_connections' threshold.
 */
private function checkConnections()
{
    $connections = $this->apiClient->getConnections();

    $hostCounts = [];
    $withoutChannels = 0;

    foreach ($connections as $connection) {
        // Connections lacking a peer_host are grouped under 'unknown'.
        $peer = $connection['peer_host'] ?? 'unknown';
        if (!isset($hostCounts[$peer])) {
            $hostCounts[$peer] = 0;
        }
        $hostCounts[$peer]++;

        // A connection with exactly zero channels is counted as idle.
        $withoutChannels += (($connection['channels'] ?? 0) === 0) ? 1 : 0;
    }

    $total = count($connections);
    $status = $total < $this->config['max_connections'] ? 'ok' : 'warning';

    return [
        'total' => $total,
        'idle' => $withoutChannels,
        'by_host' => $hostCounts,
        'status' => $status,
    ];
}
/**
 * Extracts cluster-wide message totals and rates from the overview response.
 *
 * Every value defaults to 0 when the corresponding field is absent.
 *
 * @return array ['total', 'ready', 'unacked', 'publish_rate', 'consume_rate', 'ack_rate'].
 */
private function checkMessages()
{
    $overview = $this->apiClient->getOverview();
    $queueTotals = $overview['queue_totals'] ?? [];
    $messageStats = $overview['message_stats'] ?? [];

    // The management API's message_stats has no 'consume' counter; delivery to
    // consumers is reported under 'deliver_get'. 'consume_details' is still
    // tried first in case the API client supplies it under that name.
    $consumeRate = $messageStats['consume_details']['rate']
        ?? $messageStats['deliver_get_details']['rate']
        ?? 0;

    return [
        'total' => $queueTotals['messages'] ?? 0,
        'ready' => $queueTotals['messages_ready'] ?? 0,
        'unacked' => $queueTotals['messages_unacked'] ?? 0,
        'publish_rate' => $messageStats['publish_details']['rate'] ?? 0,
        'consume_rate' => $consumeRate,
        'ack_rate' => $messageStats['ack_details']['rate'] ?? 0,
    ];
}
/**
 * Condenses the individual check sections into one status plus a list of issues.
 *
 * Unhealthy nodes and problem queues are reported, and so are sections whose
 * status is 'warning' (memory, disk, file descriptors, connection count).
 * Without those, the summary would say 'healthy' while a threshold is exceeded.
 *
 * @param array $results Sections keyed 'nodes', 'resources', 'queues', 'connections'.
 *                       Missing sections are treated as empty.
 *
 * @return array ['status' => 'healthy'|'issues_found', 'issues_count' => int, 'issues' => string[]].
 */
private function generateSummary($results)
{
    $issues = [];

    foreach ($results['nodes'] ?? [] as $name => $node) {
        if (($node['status'] ?? null) === 'unhealthy') {
            $issues[] = "节点 {$name}: " . implode(', ', $node['issues'] ?? []);
        }
    }

    foreach ($results['resources'] ?? [] as $name => $resource) {
        $warnings = [];

        if (($resource['memory']['status'] ?? 'ok') === 'warning') {
            $percent = $resource['memory']['percent'] ?? 0;
            $warnings[] = "内存使用率过高: {$percent}%";
        }
        if (($resource['disk']['status'] ?? 'ok') === 'warning') {
            $freeGb = $resource['disk']['free_gb'] ?? 0;
            $warnings[] = "磁盘可用空间不足: {$freeGb}GB";
        }
        if (($resource['file_descriptors']['status'] ?? 'ok') === 'warning') {
            $percent = $resource['file_descriptors']['percent'] ?? 0;
            $warnings[] = "文件描述符使用率过高: {$percent}%";
        }

        if ($warnings !== []) {
            $issues[] = "节点 {$name}: " . implode(', ', $warnings);
        }
    }

    // The queues section only contains queues that already have issues.
    foreach ($results['queues'] ?? [] as $name => $queue) {
        $issues[] = "队列 {$name}: " . implode(', ', $queue['issues'] ?? []);
    }

    if (($results['connections']['status'] ?? 'ok') === 'warning') {
        $total = $results['connections']['total'] ?? 0;
        $issues[] = "连接数过多: {$total}";
    }

    return [
        'status' => empty($issues) ? 'healthy' : 'issues_found',
        'issues_count' => count($issues),
        'issues' => $issues,
    ];
}
/**
 * Deletes queues that have no consumers, no messages and have been idle for
 * longer than $maxAge seconds.
 *
 * @param int $maxAge Minimum idle time in seconds before a queue is deleted.
 *
 * @return array Names of the queues that were deleted.
 */
public function cleanupIdleQueues($maxAge = 3600)
{
    $queues = $this->apiClient->getQueues();
    $cleaned = [];

    foreach ($queues as $queue) {
        // Unknown consumer count (field absent) is not treated as "no consumers".
        $hasNoConsumers = ($queue['consumers'] ?? null) === 0;
        $isEmpty = ($queue['messages'] ?? 0) === 0;
        if (!$hasNoConsumers || !$isEmpty) {
            continue;
        }

        $idleSince = $queue['idle_since'] ?? null;
        if (!$idleSince) {
            continue;
        }

        // strtotime() returns false for a format it cannot parse. Without this
        // guard, "time() - false" is the full epoch and the queue is deleted.
        // NOTE(review): a timestamp without an offset is parsed in PHP's default
        // timezone — confirm it matches the timezone the broker reports in.
        $idleTime = strtotime((string) $idleSince);
        if ($idleTime === false) {
            $this->logger->info("Skipped queue with unparseable idle_since: {$queue['name']}");
            continue;
        }

        if (time() - $idleTime > $maxAge) {
            $this->apiClient->deleteQueue($queue['vhost'], $queue['name']);
            $cleaned[] = $queue['name'];
            $this->logger->info("Cleaned idle queue: {$queue['name']}");
        }
    }

    return $cleaned;
}
/**
 * Closes connections that have no channels and were opened more than
 * $maxIdleTime seconds ago.
 *
 * The age is measured from 'connected_at', i.e. since the connection was
 * established, not since its last activity.
 *
 * @param int $maxIdleTime Minimum connection age in seconds before it is closed.
 *
 * @return array Names of the connections that were closed.
 */
public function closeIdleConnections($maxIdleTime = 3600)
{
    $connections = $this->apiClient->getConnections();
    $closed = [];

    foreach ($connections as $conn) {
        if (($conn['channels'] ?? 0) !== 0) {
            continue;
        }

        $connectedAt = $conn['connected_at'] ?? null;
        if (!$connectedAt) {
            continue;
        }

        if (is_numeric($connectedAt)) {
            // The management API reports connected_at as a Unix timestamp in
            // milliseconds. Values too large to be seconds are scaled down.
            $connectedTime = (float) $connectedAt;
            if ($connectedTime > 99999999999) {
                $connectedTime /= 1000;
            }
        } else {
            $connectedTime = strtotime((string) $connectedAt);
        }

        // Never close a connection whose age cannot be determined. A false
        // timestamp would make the age test pass for every connection.
        if ($connectedTime === false) {
            $this->logger->info("Skipped connection with unparseable connected_at: {$conn['name']}");
            continue;
        }

        if (time() - $connectedTime > $maxIdleTime) {
            $this->apiClient->closeConnection($conn['name']);
            $closed[] = $conn['name'];
            $this->logger->info("Closed idle connection: {$conn['name']}");
        }
    }

    return $closed;
}
/**
 * Assembles the daily report: cluster identity, object counts, message totals,
 * message rates and per-node resource usage.
 *
 * @return array ['generated_at', 'cluster', 'statistics', 'rates', 'resources'].
 */
public function generateDailyReport()
{
    $overview = $this->apiClient->getOverview();
    $nodes = $this->apiClient->getNodes();
    $queues = $this->apiClient->getQueues();
    $connections = $this->apiClient->getConnections();

    $queueTotals = $overview['queue_totals'] ?? [];
    $messageStats = $overview['message_stats'] ?? [];

    // Guards against a zero limit, which would otherwise divide by zero.
    $percentOf = function ($used, $limit) {
        if ($limit <= 0) {
            return $used > 0 ? 100.0 : 0.0;
        }

        return round(($used / $limit) * 100, 2);
    };

    $report = [
        'generated_at' => date('Y-m-d H:i:s'),
        'cluster' => [
            'name' => $overview['cluster_name'] ?? 'unknown',
            'version' => $overview['rabbitmq_version'] ?? 'unknown',
            'nodes' => count($nodes),
        ],
        'statistics' => [
            'queues' => count($queues),
            'connections' => count($connections),
            'messages_total' => $queueTotals['messages'] ?? 0,
            'messages_ready' => $queueTotals['messages_ready'] ?? 0,
            'messages_unacked' => $queueTotals['messages_unacked'] ?? 0,
        ],
        'rates' => [
            'publish' => $messageStats['publish_details']['rate'] ?? 0,
            // message_stats has no 'consume' counter; fall back to 'deliver_get'.
            'consume' => $messageStats['consume_details']['rate']
                ?? $messageStats['deliver_get_details']['rate']
                ?? 0,
            'ack' => $messageStats['ack_details']['rate'] ?? 0,
        ],
        'resources' => [],
    ];

    foreach ($nodes as $node) {
        // Same defaults as the health check, so a node record lacking these
        // fields does not abort the whole report.
        $report['resources'][$node['name']] = [
            'memory_percent' => $percentOf($node['mem_used'] ?? 0, $node['mem_limit'] ?? 1),
            'disk_free_gb' => round(($node['disk_free'] ?? 0) / 1024 / 1024 / 1024, 2),
            'fd_percent' => $percentOf($node['fd_used'] ?? 0, $node['fd_total'] ?? 1),
        ];
    }

    return $report;
}
}
Shell 日常巡检脚本
bash
#!/bin/bash
# /opt/rabbitmq/scripts/daily_check.sh
#
# Daily RabbitMQ inspection script. Progress lines are appended to LOG_FILE and
# alert lines to ALERT_FILE. (The path line above must stay a comment: as a bare
# line it would be executed as a command.)
LOG_FILE="/var/log/rabbitmq/daily_check.log"
ALERT_FILE="/var/log/rabbitmq/alerts.log"
# Print a timestamped message to stdout and append the same line to $LOG_FILE.
#   $1 - message text
log() {
    local stamp
    stamp="$(date '+%Y-%m-%d %H:%M:%S')"
    printf '%s\n' "[${stamp}] $1" | tee -a "$LOG_FILE"
}
# Print a level-tagged, timestamped alert to stdout and append it to $ALERT_FILE.
#   $1 - severity label (the script uses CRITICAL, WARNING and INFO)
#   $2 - alert text
alert() {
    local level="$1" message="$2" stamp
    stamp="$(date '+%Y-%m-%d %H:%M:%S')"
    printf '%s\n' "[${level}] ${stamp} - ${message}" | tee -a "$ALERT_FILE"
}
log "========== RabbitMQ 日常巡检开始 =========="

# 1. Node status: only the exit code of `rabbitmqctl status` is used.
log "1. 检查节点状态..."
if rabbitmqctl status > /dev/null 2>&1; then
    log "节点状态: 正常"
else
    alert "CRITICAL" "节点状态异常"
fi

# 2. Cluster status. Older releases print an Erlang term containing
#    "running_nodes"; newer ones print a "Running Nodes" heading. Match both.
log "2. 检查集群状态..."
if rabbitmqctl cluster_status | grep -qiE 'running[_ ]nodes'; then
    log "集群状态: 正常"
else
    alert "WARNING" "集群状态异常"
fi

# 3. Memory usage. `status` is queried once; head -n 1 keeps a single value if
#    the pattern matches more than once, and the -gt 0 guard avoids dividing by 0.
log "3. 检查内存使用..."
STATUS_OUTPUT=$(rabbitmqctl status)
MEMORY_USAGE=$(printf '%s\n' "$STATUS_OUTPUT" | grep -oP 'memory:\s*\K[0-9]+' | head -n 1)
MEMORY_LIMIT=$(printf '%s\n' "$STATUS_OUTPUT" | grep -oP 'memory_limit:\s*\K[0-9]+' | head -n 1)
if [ -n "$MEMORY_USAGE" ] && [ -n "$MEMORY_LIMIT" ] && [ "$MEMORY_LIMIT" -gt 0 ]; then
    PERCENT=$((MEMORY_USAGE * 100 / MEMORY_LIMIT))
    log "内存使用: ${PERCENT}%"
    if [ "$PERCENT" -gt 80 ]; then
        alert "WARNING" "内存使用率过高: ${PERCENT}%"
    fi
else
    # Make a failed parse visible instead of silently skipping the check.
    log "内存使用: 无法从 rabbitmqctl status 输出中解析"
fi

# 4. Free disk space on the RabbitMQ data directory (informational only).
log "4. 检查磁盘空间..."
DISK_FREE=$(df -h /var/lib/rabbitmq | awk 'NR==2 {print $4}')
log "磁盘可用: ${DISK_FREE}"

# 5. Queue count. Only rows whose last column is numeric are counted, so banner
#    and table-header lines printed by rabbitmqctl are not counted as queues.
# NOTE(review): list_queues without -p presumably covers only the default vhost — confirm.
log "5. 检查队列状态..."
QUEUE_COUNT=$(rabbitmqctl list_queues -q name messages | awk -F'\t' '$NF ~ /^[0-9]+$/ {n++} END {print n+0}')
log "队列数量: ${QUEUE_COUNT}"

# 6. Message backlog. Columns are tab-separated, so queue names containing
#    spaces stay intact. Rows with a non-numeric count (headers) are skipped.
log "6. 检查消息堆积..."
rabbitmqctl list_queues -q name messages | while IFS=$'\t' read -r queue messages; do
    case "$messages" in
        ''|*[!0-9]*) continue ;;
    esac
    if [ "$messages" -gt 10000 ]; then
        alert "WARNING" "队列 ${queue} 消息堆积: ${messages}"
    fi
done

# 7. Connection count, counting only rows that end in a numeric peer port.
log "7. 检查连接数..."
CONNECTION_COUNT=$(rabbitmqctl list_connections -q peer_host peer_port | awk -F'\t' '$NF ~ /^[0-9]+$/ {n++} END {print n+0}')
log "连接数: ${CONNECTION_COUNT}"

# 8. Queues without consumers (same header-skipping rule as step 6).
log "8. 检查消费者状态..."
rabbitmqctl list_queues -q name consumers | while IFS=$'\t' read -r queue consumers; do
    case "$consumers" in
        ''|*[!0-9]*) continue ;;
    esac
    if [ "$consumers" -eq 0 ]; then
        alert "INFO" "队列 ${queue} 无消费者"
    fi
done

# 9. Error lines in the broker log. `grep -c` prints 0 AND exits non-zero when
#    nothing matches, so "|| echo 0" would yield "0<newline>0". Default only when
#    grep printed nothing at all (e.g. the file is missing).
log "9. 检查日志错误..."
ERROR_COUNT=$(grep -c "error" /var/log/rabbitmq/rabbit.log 2>/dev/null)
ERROR_COUNT=${ERROR_COUNT:-0}
log "错误日志数: ${ERROR_COUNT}"

log "========== RabbitMQ 日常巡检完成 =========="
exit 0
定时任务配置
cron
# /etc/cron.d/rabbitmq-ops
*/5 * * * * rabbitmq /opt/rabbitmq/scripts/health_check.sh >> /var/log/rabbitmq/health.log 2>&1
0 * * * * rabbitmq /opt/rabbitmq/scripts/hourly_check.sh >> /var/log/rabbitmq/hourly.log 2>&1
0 8 * * * rabbitmq /opt/rabbitmq/scripts/daily_check.sh >> /var/log/rabbitmq/daily.log 2>&1
0 0 * * 0 rabbitmq /opt/rabbitmq/scripts/weekly_cleanup.sh >> /var/log/rabbitmq/weekly.log 2>&1
0 0 1 * * rabbitmq /opt/rabbitmq/scripts/monthly_report.sh >> /var/log/rabbitmq/monthly.log 2>&1
实际应用场景
场景一:自动化运维平台
php
<?php
class RabbitMQOpsPlatform
{
private $ops;
private $db;
/**
 * @param RabbitMQDailyOps $ops Operations service that performs the checks and clean-ups.
 * @param PDO              $db  Database connection used to persist metrics and reports.
 */
public function __construct(RabbitMQDailyOps $ops, PDO $db)
{
    $this->db = $db;
    $this->ops = $ops;
}
/**
 * Runs the task group registered for the given schedule type.
 *
 * @param mixed $type Schedule name; 'hourly', 'daily' and 'weekly' are recognised.
 *
 * @return array|false Result of the task group, or false for any other type.
 */
public function runScheduledTasks($type)
{
    // Strict comparisons: only the exact strings select a task group.
    if ($type === 'hourly') {
        return $this->runHourlyTasks();
    }

    if ($type === 'daily') {
        return $this->runDailyTasks();
    }

    if ($type === 'weekly') {
        return $this->runWeeklyTasks();
    }

    return false;
}
/**
 * Hourly job: runs a health check and stores its key figures as a metrics row.
 *
 * @return array ['health_check' => array].
 */
private function runHourlyTasks()
{
    $healthCheck = $this->ops->runHealthCheck();
    $this->recordMetrics($healthCheck);

    return ['health_check' => $healthCheck];
}
/**
 * Daily job: health check, clean-up of queues and connections using an
 * 86400-second (one day) age limit, then the daily report is generated and saved.
 *
 * @return array ['health_check', 'cleanup_queues', 'cleanup_connections', 'daily_report'].
 */
private function runDailyTasks()
{
    $healthCheck = $this->ops->runHealthCheck();
    $removedQueues = $this->ops->cleanupIdleQueues(86400);
    $closedConnections = $this->ops->closeIdleConnections(86400);

    // The report is built after the clean-up steps have run.
    $dailyReport = $this->ops->generateDailyReport();
    $this->saveDailyReport($dailyReport);

    return [
        'health_check' => $healthCheck,
        'cleanup_queues' => $removedQueues,
        'cleanup_connections' => $closedConnections,
        'daily_report' => $dailyReport,
    ];
}
/**
 * Weekly job: health check plus a queue clean-up using a 604800-second
 * (seven day) idle limit.
 *
 * @return array ['health_check', 'cleanup_queues'].
 */
private function runWeeklyTasks()
{
    $healthCheck = $this->ops->runHealthCheck();
    $removedQueues = $this->ops->cleanupIdleQueues(604800);

    return [
        'health_check' => $healthCheck,
        'cleanup_queues' => $removedQueues,
    ];
}
/**
 * Inserts one row into rabbitmq_metrics from a health-check result.
 *
 * @param array $healthCheck Result array holding 'summary', 'messages',
 *                           'connections' and 'queues' sections.
 */
private function recordMetrics($healthCheck)
{
    $insert = "INSERT INTO rabbitmq_metrics (
recorded_at, status, issues_count,
total_messages, total_connections, total_queues
) VALUES (NOW(), :status, :issues_count, :total_messages, :total_connections, :total_queues)";

    $statement = $this->db->prepare($insert);

    $summary = $healthCheck['summary'];

    // NOTE(review): total_queues stores the number of entries in the 'queues'
    // section. If that section lists only queues with issues, this is not the
    // total queue count — confirm against the health-check producer.
    $params = [
        'status' => $summary['status'],
        'issues_count' => $summary['issues_count'],
        'total_messages' => $healthCheck['messages']['total'],
        'total_connections' => $healthCheck['connections']['total'],
        'total_queues' => count($healthCheck['queues']),
    ];

    $statement->execute($params);
}
/**
 * Inserts the daily report into rabbitmq_daily_reports, dated with CURDATE().
 *
 * @param array $report Report holding 'cluster', 'statistics', 'rates' and
 *                      'resources' sections; 'resources' is stored as JSON.
 */
private function saveDailyReport($report)
{
    $insert = "INSERT INTO rabbitmq_daily_reports (
report_date, cluster_name, cluster_version,
node_count, queue_count, connection_count,
messages_total, messages_ready, messages_unacked,
publish_rate, consume_rate, ack_rate, resources
) VALUES (
CURDATE(), :cluster_name, :cluster_version,
:node_count, :queue_count, :connection_count,
:messages_total, :messages_ready, :messages_unacked,
:publish_rate, :consume_rate, :ack_rate, :resources
)";

    $statement = $this->db->prepare($insert);

    $cluster = $report['cluster'];
    $statistics = $report['statistics'];
    $rates = $report['rates'];

    $params = [
        'cluster_name' => $cluster['name'],
        'cluster_version' => $cluster['version'],
        'node_count' => $cluster['nodes'],
        'queue_count' => $statistics['queues'],
        'connection_count' => $statistics['connections'],
        'messages_total' => $statistics['messages_total'],
        'messages_ready' => $statistics['messages_ready'],
        'messages_unacked' => $statistics['messages_unacked'],
        'publish_rate' => $rates['publish'],
        'consume_rate' => $rates['consume'],
        'ack_rate' => $rates['ack'],
        'resources' => json_encode($report['resources']),
    ];

    $statement->execute($params);
}
}
场景二:运维仪表板
php
<?php
class OpsDashboard
{
private $ops;
private $db;
/**
 * @param RabbitMQDailyOps $ops Operations service queried for the live status.
 * @param PDO              $db  Database connection used to read stored metrics, alerts and reports.
 */
public function __construct(RabbitMQDailyOps $ops, PDO $db)
{
    $this->db = $db;
    $this->ops = $ops;
}
/**
 * Collects everything the dashboard shows: a fresh health check, the last
 * 24 hours of metrics, the 10 newest alerts, the 10 largest queues and
 * today's stored report.
 *
 * @return array ['current_status', 'metrics_trend', 'recent_alerts', 'top_queues', 'daily_summary'].
 */
public function getDashboardData()
{
    $currentStatus = $this->ops->runHealthCheck();
    $metricsTrend = $this->getMetricsTrend(24);
    $recentAlerts = $this->getRecentAlerts(10);
    $topQueues = $this->getTopQueues(10);
    $dailySummary = $this->getDailySummary();

    return [
        'current_status' => $currentStatus,
        'metrics_trend' => $metricsTrend,
        'recent_alerts' => $recentAlerts,
        'top_queues' => $topQueues,
        'daily_summary' => $dailySummary,
    ];
}
/**
 * Reads the metrics rows recorded within the last $hours hours, oldest first.
 *
 * @param int $hours Size of the look-back window in hours.
 *
 * @return array Rows from rabbitmq_metrics as associative arrays.
 */
private function getMetricsTrend($hours)
{
    $query = "SELECT * FROM rabbitmq_metrics
WHERE recorded_at >= DATE_SUB(NOW(), INTERVAL :hours HOUR)
ORDER BY recorded_at ASC";

    $statement = $this->db->prepare($query);
    $statement->execute(['hours' => $hours]);

    $rows = $statement->fetchAll(PDO::FETCH_ASSOC);

    return $rows;
}
/**
 * Reads the most recent alerts, newest first.
 *
 * @param int $limit Maximum number of rows to return; negative values are treated as 0.
 *
 * @return array Rows from rabbitmq_alerts as associative arrays.
 */
private function getRecentAlerts($limit)
{
    $sql = "SELECT * FROM rabbitmq_alerts
ORDER BY created_at DESC
LIMIT :limit";
    $stmt = $this->db->prepare($sql);

    // Values passed through execute([...]) are bound as strings. Under emulated
    // prepares that renders LIMIT '10', which MySQL rejects, so the limit must
    // be bound explicitly as an integer.
    $stmt->bindValue(':limit', max(0, (int) $limit), PDO::PARAM_INT);
    $stmt->execute();

    return $stmt->fetchAll(PDO::FETCH_ASSOC);
}
/**
 * Returns the $limit queues holding the most messages, largest first.
 *
 * @param int $limit Maximum number of queues to return.
 *
 * @return array Queue records as returned by the API client, sorted by 'messages' descending.
 */
private function getTopQueues($limit)
{
    // RabbitMQDailyOps declares $apiClient as private, so reading
    // $this->ops->apiClient from this class is a fatal "Cannot access private
    // property" error. The closure is bound to the RabbitMQDailyOps scope so
    // the read is legal.
    // TODO(review): replace with a public accessor on RabbitMQDailyOps, or
    // inject the API client into this class directly.
    $readApiClient = \Closure::bind(
        function () {
            return $this->apiClient;
        },
        $this->ops,
        RabbitMQDailyOps::class
    );

    $queues = $readApiClient()->getQueues();

    // Queues without a 'messages' field sort as if they held 0 messages.
    usort($queues, function ($a, $b) {
        return ($b['messages'] ?? 0) <=> ($a['messages'] ?? 0);
    });

    return array_slice($queues, 0, $limit);
}
/**
 * Fetches the stored daily report whose report_date is today.
 *
 * @return array|false First matching row as an associative array, or false when there is none.
 */
private function getDailySummary()
{
    $query = "SELECT * FROM rabbitmq_daily_reports
WHERE report_date = CURDATE()";

    $statement = $this->db->prepare($query);
    $statement->execute();

    $row = $statement->fetch(PDO::FETCH_ASSOC);

    return $row;
}
}
常见问题与解决方案
问题一:队列清理误删
现象:清理空闲队列时误删了有用的队列。
解决方案:
php
/**
 * Deletes idle, empty, consumer-less queues, except those named in $whitelist.
 *
 * @param int      $maxAge    Minimum idle time in seconds before a queue is deleted.
 * @param string[] $whitelist Queue names that must never be deleted. Matching
 *                            is by exact name only; the vhost is not considered.
 *
 * @return array Names of the queues that were deleted.
 */
public function cleanupIdleQueues($maxAge = 3600, $whitelist = [])
{
    $queues = $this->apiClient->getQueues();
    $cleaned = [];

    foreach ($queues as $queue) {
        // Strict comparison: loose in_array() can match unrelated entries
        // (e.g. a 0 or true in the list) through type juggling.
        if (in_array($queue['name'], $whitelist, true)) {
            continue;
        }

        // Unknown consumer count (field absent) is not treated as "no consumers".
        if (($queue['consumers'] ?? null) !== 0 || ($queue['messages'] ?? 0) !== 0) {
            continue;
        }

        $idleSince = $queue['idle_since'] ?? null;
        if (!$idleSince) {
            continue;
        }

        // strtotime() returns false for an unparseable value. Without this guard
        // the age test would pass and the queue would be deleted.
        $idleTime = strtotime((string) $idleSince);
        if ($idleTime === false) {
            continue;
        }

        if (time() - $idleTime > $maxAge) {
            $this->apiClient->deleteQueue($queue['vhost'], $queue['name']);
            $cleaned[] = $queue['name'];
            // Keep an audit trail of every deletion.
            $this->logger->info("Cleaned idle queue: {$queue['name']}");
        }
    }

    return $cleaned;
}
问题二:巡检脚本超时
现象:巡检脚本执行时间过长。
解决方案:
php
class TimeoutHealthCheck
{
private $timeout = 30;
/**
 * Runs $callback and reports when it took longer than the configured timeout.
 *
 * This only measures elapsed time after the fact. The callback is never
 * interrupted, so a hanging check still blocks the caller.
 *
 * @param callable $callback Check to execute; invoked with no arguments.
 *
 * @return mixed Whatever $callback returned.
 */
public function checkWithTimeout($callback)
{
    $startTime = microtime(true);
    $result = $callback();
    $elapsed = microtime(true) - $startTime;

    if ($elapsed > $this->timeout) {
        $message = "Health check took too long: {$elapsed}s";

        // This class does not declare or assign a logger, so calling it
        // unconditionally fails exactly when a slow check should be reported.
        // Use one if it has been provided, otherwise fall back to error_log().
        if (isset($this->logger)) {
            $this->logger->warning($message);
        } else {
            error_log($message);
        }
    }

    return $result;
}
}
问题三:日志文件过大
现象:运维日志文件占用过多空间。
解决方案:
bash
/var/log/rabbitmq/ops.log {
daily
rotate 7
compress
missingok
notifempty
size 100M
}
最佳实践
1. 运维任务时间表
| 时间 | 任务 | 负责人 |
|---|---|---|
| 每小时 | 健康检查 | 自动化 |
| 每天 8:00 | 日常巡检 | 自动化 |
| 每周一 | 清理维护 | 运维人员 |
| 每月 1 日 | 深度检查 | 架构师 |
2. 运维检查清单
日常检查:
□ 节点运行状态
□ 内存使用率
□ 磁盘空间
□ 队列消息数
□ 连接数
□ 消费者状态
每周检查:
□ 日志分析
□ 性能趋势
□ 备份验证
□ 清理空闲资源
每月检查:
□ 容量评估
□ 安全审计
□ 版本更新
□ 文档更新
3. 自动化程度建议
| 任务类型 | 自动化程度 |
|---|---|
| 监控检查 | 100% |
| 数据收集 | 100% |
| 告警通知 | 100% |
| 日常清理 | 80% |
| 问题处理 | 20% |
