Appearance
诊断工具与方法
概述
RabbitMQ 提供了丰富的诊断工具和方法,帮助运维人员快速定位和解决问题。本文档将介绍常用的诊断工具、命令和方法论。
诊断工具概览
┌─────────────────────────────────────────────────────────────┐
│ RabbitMQ 诊断工具 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │rabbitmqctl │ │rabbitmq- │ │ Management │ │
│ │ 命令行工具 │ │ diagnostics │ │ UI │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ HTTP API │ │ 日志分析 │ │ Prometheus │ │
│ │ 接口 │ │ 工具 │ │ 监控 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
命令行诊断工具
1. rabbitmqctl 常用命令
bash
# 查看节点状态
rabbitmqctl status
# 查看环境信息
rabbitmqctl environment
# 查看集群状态
rabbitmqctl cluster_status
# 查看用户列表
rabbitmqctl list_users
# 查看虚拟主机
rabbitmqctl list_vhosts
2. 连接诊断命令
bash
# 查看所有连接
rabbitmqctl list_connections
# 查看连接详情
rabbitmqctl list_connections \
name user peer_host peer_port state channels \
recv_oct send_oct recv_cnt send_cnt \
connected_at
# 查看连接统计
rabbitmqctl list_connections \
name \
recv_oct send_oct \
recv_cnt send_cnt
# 按用户统计连接
rabbitmqctl list_connections user | sort | uniq -c
3. 通道诊断命令
bash
# 查看所有通道
rabbitmqctl list_channels
# 查看通道详情
rabbitmqctl list_channels \
pid user vhost consumer_count \
messages_unacked messages_uncommitted \
acks_uncommitted prefetch_count
# 查看通道状态
rabbitmqctl list_channels name state
# 查看未确认消息
rabbitmqctl list_channels \
name messages_unacked | \
awk '$2 > 0 {print}'
4. 队列诊断命令
bash
# 查看所有队列
rabbitmqctl list_queues
# 查看队列详情
rabbitmqctl list_queues \
name messages messages_ready messages_unacked \
consumers memory durable auto_delete
# 查看消息统计
rabbitmqctl list_queues \
name message_stats.deliver_get \
message_stats.publish
# 查看队列内存
rabbitmqctl list_queues name memory | sort -k2 -n -r | head -20
# 查看无消费者的队列
rabbitmqctl list_queues name consumers | awk '$2 == 0 {print}'
# 查看消息堆积队列
rabbitmqctl list_queues name messages | sort -k2 -n -r | head -20
5. 交换器诊断命令
bash
# 查看所有交换器
rabbitmqctl list_exchanges
# 查看交换器详情
rabbitmqctl list_exchanges name type durable auto_delete
# 查看绑定关系
rabbitmqctl list_bindings
# 查看特定交换器的绑定
rabbitmqctl list_bindings source_name destination_name | grep "exchange_name"
6. 消费者诊断命令
bash
# 查看所有消费者
rabbitmqctl list_consumers
# 查看消费者详情
rabbitmqctl list_consumers \
queue_name channel_pid consumer_tag \
ack_required prefetch_count
# 统计每个队列的消费者数量
rabbitmqctl list_queues name consumers
rabbitmq-diagnostics 工具
1. 健康检查
bash
# 检查节点是否运行
rabbitmq-diagnostics check_running
# 检查关闭本节点是否会使某些仲裁队列失去多数派(quorum)
rabbitmq-diagnostics check_if_node_is_quorum_critical
# 检查本地警报
rabbitmq-diagnostics check_local_alarms
# 检查网络连接
rabbitmq-diagnostics check_port_connectivity
# 检查端口监听
rabbitmq-diagnostics check_port_listening
# 检查协议版本
rabbitmq-diagnostics check_protocol_versions
2. 状态诊断
bash
# 查看节点状态
rabbitmq-diagnostics status
# 查看服务器版本
rabbitmq-diagnostics server_version
# 查看集群状态
rabbitmq-diagnostics cluster_status
# 查看内存使用
rabbitmq-diagnostics memory_breakdown
# 查看Erlang VM信息
rabbitmq-diagnostics runtime_thread_stats
# 查看系统限制
rabbitmq-diagnostics limits
3. 网络诊断
bash
# 检查网络分区
rabbitmq-diagnostics check_network_partitions
# 查看网络分区状态
rabbitmq-diagnostics partitions
# 检查节点间连接
rabbitmq-diagnostics -n rabbit@node1 check_inter_node_communication
# 查看监听端口
rabbitmq-diagnostics listeners
4. 性能诊断
bash
# 启动 observer 终端界面,实时查看运行时进程与资源占用
rabbitmq-diagnostics observer
# 查看内存分配
rabbitmq-diagnostics memory_breakdown
# 查看IO统计
rabbitmq-diagnostics io_thread_stats
HTTP API 诊断
1. 概览API
bash
# 获取概览信息
curl -s -u guest:guest http://localhost:15672/api/overview | jq .
# 获取节点信息
curl -s -u guest:guest http://localhost:15672/api/nodes | jq .
# 获取定义信息
curl -s -u guest:guest http://localhost:15672/api/definitions | jq .
2. 连接API
bash
# 获取所有连接
curl -s -u guest:guest http://localhost:15672/api/connections | jq .
# 获取特定连接
curl -s -u guest:guest http://localhost:15672/api/connections/{name} | jq .
# 关闭连接
curl -X DELETE -u guest:guest http://localhost:15672/api/connections/{name}
# 获取连接的通道
curl -s -u guest:guest http://localhost:15672/api/connections/{name}/channels | jq .
3. 队列API
bash
# 获取所有队列
curl -s -u guest:guest http://localhost:15672/api/queues | jq .
# 获取特定队列
curl -s -u guest:guest http://localhost:15672/api/queues/{vhost}/{name} | jq .
# 获取队列消息(注意:ack_requeue_false 会把取出的消息从队列中移除;仅查看请改用 ack_requeue_true)
curl -s -u guest:guest -H "Content-Type: application/json" \
-d '{"count": 10, "ackmode": "ack_requeue_false", "encoding": "auto"}' \
http://localhost:15672/api/queues/{vhost}/{name}/get | jq .
# 清空队列
curl -X DELETE -u guest:guest http://localhost:15672/api/queues/{vhost}/{name}/contents
# 获取队列统计
curl -s -u guest:guest http://localhost:15672/api/queues/{vhost}/{name}/stats | jq .
4. 通道API
bash
# 获取所有通道
curl -s -u guest:guest http://localhost:15672/api/channels | jq .
# 获取特定通道
curl -s -u guest:guest http://localhost:15672/api/channels/{name} | jq .
5. 消费者API
bash
# 获取所有消费者
curl -s -u guest:guest http://localhost:15672/api/consumers | jq .
# 获取特定vhost的消费者
curl -s -u guest:guest http://localhost:15672/api/consumers/{vhost} | jq .
PHP 诊断工具类
php
<?php
/**
 * Diagnostic client for the RabbitMQ management HTTP API.
 *
 * Every call goes to http://{host}:{port}/api using HTTP basic auth and a
 * 10 second timeout. Read methods return the decoded JSON as arrays and throw
 * \RuntimeException on transport errors, non-200 responses or invalid JSON.
 * Mutating methods (purgeQueue, closeConnection) return a boolean and do not
 * throw on HTTP-level failure.
 */
class RabbitMQDiagnostics
{
    private $host;
    private $port;
    private $user;
    private $password;
    private $apiUrl;

    /**
     * @param string $host     Management API host.
     * @param int    $port     Management API port.
     * @param string $user     Basic-auth user name.
     * @param string $password Basic-auth password.
     */
    public function __construct(
        string $host = 'localhost',
        int $port = 15672,
        string $user = 'guest',
        string $password = 'guest'
    ) {
        $this->host = $host;
        $this->port = $port;
        $this->user = $user;
        $this->password = $password;
        $this->apiUrl = "http://{$host}:{$port}/api";
    }

    /**
     * Percent-encode a single URL path segment (vhost, queue or connection name).
     *
     * The value is decoded first so that callers who already pass an encoded
     * value such as "%2F" are not double-encoded, while a raw "/" still
     * becomes "%2F".
     */
    private function encodeSegment(string $segment): string
    {
        return rawurlencode(rawurldecode($segment));
    }

    /**
     * Build "/{resource}" or "/{resource}/{vhost}" depending on whether a
     * vhost was supplied (null and '' both mean "all vhosts").
     */
    private function collectionEndpoint(string $resource, ?string $vhost): string
    {
        if ($vhost === null || $vhost === '') {
            return "/{$resource}";
        }

        return "/{$resource}/" . $this->encodeSegment($vhost);
    }

    /**
     * Execute one HTTP call against the API.
     *
     * @param string     $method   'GET', 'POST' or another verb such as 'DELETE'.
     * @param string     $endpoint Path below /api, already encoded.
     * @param array|null $payload  JSON body, used for POST only.
     *
     * @return array ['code' => int, 'body' => string|null, 'error' => string];
     *               body is null when the transfer itself failed.
     */
    private function send(string $method, string $endpoint, ?array $payload = null): array
    {
        $options = [
            CURLOPT_URL => $this->apiUrl . $endpoint,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_USERPWD => $this->user . ':' . $this->password,
            CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
            CURLOPT_TIMEOUT => 10,
        ];
        if ($method === 'POST') {
            $options[CURLOPT_POST] = true;
            $options[CURLOPT_POSTFIELDS] = json_encode($payload ?? []);
        } elseif ($method !== 'GET') {
            $options[CURLOPT_CUSTOMREQUEST] = $method;
        }

        $ch = curl_init();
        curl_setopt_array($ch, $options);
        $body = curl_exec($ch);
        $error = curl_error($ch);
        $code = (int) curl_getinfo($ch, CURLINFO_HTTP_CODE);
        curl_close($ch);

        return [
            'code' => $code,
            'body' => is_string($body) ? $body : null,
            'error' => $error,
        ];
    }

    /**
     * Turn a send() result into a decoded array or throw.
     *
     * @throws \RuntimeException on transport failure, non-200 status or a
     *                           body that does not decode to an array.
     */
    private function decodeOrFail(array $result): array
    {
        if ($result['body'] === null) {
            throw new \RuntimeException("API请求失败: {$result['error']}");
        }
        if ($result['code'] !== 200) {
            throw new \RuntimeException("API请求失败: HTTP {$result['code']}");
        }

        $decoded = json_decode($result['body'], true);
        if (!is_array($decoded)) {
            // Without this check a null result would violate the array return type.
            throw new \RuntimeException('API请求失败: 响应不是有效的JSON');
        }

        return $decoded;
    }

    /**
     * GET an endpoint and return the decoded JSON.
     *
     * @throws \RuntimeException see decodeOrFail().
     */
    private function request(string $endpoint): array
    {
        return $this->decodeOrFail($this->send('GET', $endpoint));
    }

    /** GET /overview. */
    public function getOverview(): array
    {
        return $this->request('/overview');
    }

    /** GET /nodes. */
    public function getNodes(): array
    {
        return $this->request('/nodes');
    }

    /** GET /connections. */
    public function getConnections(): array
    {
        return $this->request('/connections');
    }

    /** GET /channels. */
    public function getChannels(): array
    {
        return $this->request('/channels');
    }

    /** GET /queues, optionally restricted to one vhost. */
    public function getQueues(?string $vhost = null): array
    {
        return $this->request($this->collectionEndpoint('queues', $vhost));
    }

    /** GET /queues/{vhost}/{name}. */
    public function getQueue(string $vhost, string $name): array
    {
        return $this->request(
            '/queues/' . $this->encodeSegment($vhost) . '/' . $this->encodeSegment($name)
        );
    }

    /** GET /exchanges, optionally restricted to one vhost. */
    public function getExchanges(?string $vhost = null): array
    {
        return $this->request($this->collectionEndpoint('exchanges', $vhost));
    }

    /** GET /bindings, optionally restricted to one vhost. */
    public function getBindings(?string $vhost = null): array
    {
        return $this->request($this->collectionEndpoint('bindings', $vhost));
    }

    /** GET /consumers, optionally restricted to one vhost. */
    public function getConsumers(?string $vhost = null): array
    {
        return $this->request($this->collectionEndpoint('consumers', $vhost));
    }

    /**
     * POST /queues/{vhost}/{queue}/get to fetch up to $count messages.
     *
     * @param bool $requeue true sends ackmode "ack_requeue_true", false sends
     *                      "ack_requeue_false".
     *
     * @throws \RuntimeException see decodeOrFail().
     */
    public function getQueueMessages(
        string $vhost,
        string $queue,
        int $count = 10,
        bool $requeue = true
    ): array {
        $payload = [
            'count' => $count,
            'ackmode' => $requeue ? 'ack_requeue_true' : 'ack_requeue_false',
            'encoding' => 'auto',
        ];
        $endpoint = '/queues/' . $this->encodeSegment($vhost)
            . '/' . $this->encodeSegment($queue) . '/get';

        return $this->decodeOrFail($this->send('POST', $endpoint, $payload));
    }

    /**
     * DELETE /queues/{vhost}/{queue}/contents.
     *
     * @return bool true only when the server answered 204.
     */
    public function purgeQueue(string $vhost, string $queue): bool
    {
        $endpoint = '/queues/' . $this->encodeSegment($vhost)
            . '/' . $this->encodeSegment($queue) . '/contents';

        return $this->send('DELETE', $endpoint)['code'] === 204;
    }

    /**
     * DELETE /connections/{name}.
     *
     * @return bool true only when the server answered 204.
     */
    public function closeConnection(string $connectionName): bool
    {
        $endpoint = '/connections/' . $this->encodeSegment($connectionName);

        return $this->send('DELETE', $endpoint)['code'] === 204;
    }

    /**
     * Memory usage of a node as a percentage of its limit.
     * Returns 0.0 when the limit is missing or not positive, avoiding a
     * division by zero.
     */
    private function memoryPercent(array $node): float
    {
        $limit = $node['mem_limit'] ?? 0;
        if ($limit <= 0) {
            return 0.0;
        }

        return ($node['mem_used'] ?? 0) / $limit * 100;
    }

    /** Reduce one /nodes entry to the fields shown in the health report. */
    private function summarizeNode(array $node): array
    {
        return [
            'name' => $node['name'] ?? 'unknown',
            'type' => $node['type'] ?? 'unknown',
            'running' => $node['running'] ?? false,
            'mem_used' => $node['mem_used'] ?? 0,
            'mem_limit' => $node['mem_limit'] ?? 0,
            'mem_percent' => round($this->memoryPercent($node), 2),
            'fd_used' => $node['fd_used'] ?? 0,
            'fd_total' => $node['fd_total'] ?? 0,
            'sockets_used' => $node['sockets_used'] ?? 0,
            'sockets_total' => $node['sockets_total'] ?? 0,
            'disk_free' => $node['disk_free'] ?? 0,
            'disk_free_limit' => $node['disk_free_limit'] ?? 0,
        ];
    }

    /** Alerts for one queue: backlog above 10000 messages, or messages with no consumer. */
    private function queueAlerts(array $queue): array
    {
        $alerts = [];
        $name = $queue['name'] ?? 'unknown';
        $messages = $queue['messages'] ?? 0;

        if ($messages > 10000) {
            $alerts[] = [
                'level' => 'warning',
                'type' => 'queue_backlog',
                'queue' => $name,
                'messages' => $messages,
                'message' => "队列 {$name} 有 {$messages} 条消息积压",
            ];
        }
        if (($queue['consumers'] ?? 0) === 0 && $messages > 0) {
            $alerts[] = [
                'level' => 'warning',
                'type' => 'no_consumer',
                'queue' => $name,
                'message' => "队列 {$name} 无消费者",
            ];
        }

        return $alerts;
    }

    /** Alerts for one node: memory above 80% of its limit, or free disk below twice the limit. */
    private function nodeAlerts(array $node): array
    {
        $alerts = [];
        $name = $node['name'] ?? 'unknown';

        $percent = round($this->memoryPercent($node), 2);
        if ($percent > 80) {
            $alerts[] = [
                'level' => 'critical',
                'type' => 'memory_high',
                'node' => $name,
                'percent' => $percent,
                'message' => "节点 {$name} 内存使用率 {$percent}%",
            ];
        }

        $diskFree = $node['disk_free'] ?? 0;
        $diskLimit = $node['disk_free_limit'] ?? 0;
        if ($diskFree < $diskLimit * 2) {
            $alerts[] = [
                'level' => 'critical',
                'type' => 'disk_low',
                'node' => $name,
                'disk_free' => $diskFree,
                'message' => "节点 {$name} 磁盘空间不足",
            ];
        }

        return $alerts;
    }

    /**
     * Build a health report from /overview, /nodes and /queues.
     *
     * @return array Keys: timestamp, cluster, statistics, message_rates,
     *               nodes (one summary per node) and alerts (queue alerts
     *               first, then node alerts).
     *
     * @throws \RuntimeException when any of the three API calls fails.
     */
    public function getHealthReport(): array
    {
        $overview = $this->getOverview();
        $nodes = $this->getNodes();
        $queues = $this->getQueues();

        $report = [
            'timestamp' => date('Y-m-d H:i:s'),
            'cluster' => [
                'name' => $overview['cluster_name'] ?? 'unknown',
                'rabbitmq_version' => $overview['rabbitmq_version'] ?? 'unknown',
                'erlang_version' => $overview['erlang_version'] ?? 'unknown',
            ],
            'statistics' => [
                'connections' => $overview['object_totals']['connections'] ?? 0,
                'channels' => $overview['object_totals']['channels'] ?? 0,
                'queues' => $overview['object_totals']['queues'] ?? 0,
                'consumers' => $overview['object_totals']['consumers'] ?? 0,
                'messages' => $overview['queue_totals']['messages'] ?? 0,
                'messages_ready' => $overview['queue_totals']['messages_ready'] ?? 0,
                'messages_unacked' => $overview['queue_totals']['messages_unacked'] ?? 0,
            ],
            'message_rates' => [
                'publish' => $overview['message_stats']['publish_details']['rate'] ?? 0,
                'deliver' => $overview['message_stats']['deliver_get_details']['rate'] ?? 0,
                'ack' => $overview['message_stats']['ack_details']['rate'] ?? 0,
            ],
            'nodes' => [],
            'alerts' => [],
        ];

        foreach ($nodes as $node) {
            $report['nodes'][] = $this->summarizeNode($node);
        }
        foreach ($queues as $queue) {
            foreach ($this->queueAlerts($queue) as $alert) {
                $report['alerts'][] = $alert;
            }
        }
        foreach ($nodes as $node) {
            foreach ($this->nodeAlerts($node) as $alert) {
                $report['alerts'][] = $alert;
            }
        }

        return $report;
    }
}
// 使用示例
$diagnostics = new RabbitMQDiagnostics();
$health = $diagnostics->getHealthReport();
echo json_encode($health, JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE);
诊断方法论
1. 问题排查流程
┌─────────────────┐
│ 发现问题 │
└────────┬────────┘
│
▼
┌─────────────────┐
│ 确认症状 │
│ 收集信息 │
└────────┬────────┘
│
▼
┌─────────────────┐
│ 检查日志 │
│ 查看监控 │
└────────┬────────┘
│
┌──────────────┼──────────────┐
▼ ▼ ▼
┌────────────┐ ┌────────────┐ ┌────────────┐
│ 连接问题 │ │ 队列问题 │ │ 性能问题 │
└─────┬──────┘ └─────┬──────┘ └─────┬──────┘
│ │ │
└──────────────┼──────────────┘
│
▼
┌─────────────────┐
│ 定位原因 │
└────────┬────────┘
│
▼
┌─────────────────┐
│ 制定方案 │
└────────┬────────┘
│
▼
┌─────────────────┐
│ 实施解决 │
└────────┬────────┘
│
▼
┌─────────────────┐
│ 验证结果 │
└─────────────────┘
2. 常用诊断脚本
bash
#!/bin/bash
# rabbitmq_diagnostics.sh
#
# Prints a one-shot RabbitMQ diagnostic report to stdout: node and cluster
# status, connection and queue statistics, memory, disk, alarms, partitions
# and recent error log lines.
#
# `set -e` is deliberately not used: every section is best-effort and a
# failing command must not prevent the remaining sections from printing.
set -u

# Print a section title preceded by a blank line.
section() {
  printf '\n%s\n' "$1"
}

# Run a rabbitmqctl listing command in silent mode so that informational
# banners and table headers do not pollute counts and sorted output.
rmq_list() {
  rabbitmqctl --silent "$@" 2>/dev/null
}

echo "=== RabbitMQ 综合诊断报告 ==="
echo "生成时间: $(date '+%Y-%m-%d %H:%M:%S')"

section "[1] 节点状态"
rabbitmqctl status 2>/dev/null | head -n 20

section "[2] 集群状态"
rabbitmqctl cluster_status

section "[3] 连接统计"
echo "总连接数: $(rmq_list list_connections | wc -l)"
echo "按用户统计:"
rmq_list list_connections user | sort | uniq -c | sort -rn

section "[4] 队列统计"
echo "总队列数: $(rmq_list list_queues name | wc -l)"
echo "消息堆积TOP10:"
# rabbitmqctl output is tab-separated; split on tabs so queue names that
# contain spaces do not shift the numeric column.
rmq_list list_queues name messages | sort -t $'\t' -k2,2nr | head -n 10

section "[5] 消费者统计"
echo "无消费者队列:"
rmq_list list_queues name consumers | awk -F '\t' '$2 == 0 {print $1}'

section "[6] 内存使用"
rabbitmqctl status 2>/dev/null | grep -A 10 "Memory"

section "[7] 磁盘空间"
df -h /var/lib/rabbitmq

section "[8] 告警状态"
# NOTE(review): confirm `list_node_names` exists in the installed rabbitmqctl;
# if it does not, `rabbitmq-diagnostics alarms` may be a suitable replacement.
rabbitmqctl list_node_names | while IFS= read -r node; do
  echo "节点: $node"
  rabbitmqctl -n "$node" eval 'rabbit_alarm:get_alarms().' 2>/dev/null
done

section "[9] 网络分区"
rabbitmqctl cluster_status | grep -A 5 "Partitions"

section "[10] 最近错误日志"
tail -n 50 /var/log/rabbitmq/rabbit@*.log | grep -iE 'error|exception|failed' | tail -n 10
echo -e "\n诊断报告完成"
3. 实时监控脚本
bash
#!/bin/bash
# rabbitmq_watch.sh
#
# Redraws a compact RabbitMQ dashboard (connection, channel, queue and message
# counts, the five largest queues and a memory line) every few seconds until
# interrupted with Ctrl-C.
set -u

readonly REFRESH_SECONDS=5

# Run a rabbitmqctl listing command in silent mode so that informational
# banners and table headers are not counted as rows.
rmq_list() {
  rabbitmqctl --silent "$@" 2>/dev/null
}

# Print the number of rows returned by a rabbitmqctl listing command.
count_rows() {
  rmq_list "$@" | wc -l
}

while true; do
  clear
  echo "=== RabbitMQ 实时监控 ==="
  echo "时间: $(date '+%Y-%m-%d %H:%M:%S')"
  echo ""

  connections=$(count_rows list_connections)
  channels=$(count_rows list_channels)
  queues=$(count_rows list_queues name)
  # `sum + 0` prints 0 instead of an empty string when there are no queues.
  messages=$(rmq_list list_queues messages | awk '{sum += $1} END {print sum + 0}')

  echo "连接数: $connections"
  echo "通道数: $channels"
  echo "队列数: $queues"
  echo "总消息: $messages"
  echo ""

  echo "消息堆积队列:"
  # Output is tab-separated; split on tabs so names with spaces sort correctly.
  rmq_list list_queues name messages | sort -t $'\t' -k2,2nr | head -n 5
  echo ""

  echo "内存使用:"
  rabbitmqctl status 2>/dev/null | grep "memory:" | head -n 1

  sleep "$REFRESH_SECONDS"
done
注意事项
- 诊断工具可能影响性能:生产环境谨慎使用
- API调用需要权限:确保用户有足够权限
- 日志文件可能很大:使用grep过滤关键信息
- 定期保存诊断报告:便于历史对比分析
- 建立诊断知识库:记录常见问题和解决方案
