Skip to content

诊断工具与方法

概述

RabbitMQ 提供了丰富的诊断工具和方法,帮助运维人员快速定位和解决问题。本文档将介绍常用的诊断工具、命令和方法论。

诊断工具概览

┌─────────────────────────────────────────────────────────────┐
│                    RabbitMQ 诊断工具                         │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐         │
│  │rabbitmqctl  │  │rabbitmq-    │  │ Management  │         │
│  │ 命令行工具  │  │ diagnostics │  │    UI       │         │
│  └─────────────┘  └─────────────┘  └─────────────┘         │
│                                                             │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐         │
│  │ HTTP API    │  │ 日志分析    │  │ Prometheus  │         │
│  │   接口      │  │   工具      │  │   监控      │         │
│  └─────────────┘  └─────────────┘  └─────────────┘         │
│                                                             │
└─────────────────────────────────────────────────────────────┘

命令行诊断工具

1. rabbitmqctl 常用命令

bash
# 查看节点状态
rabbitmqctl status

# 查看环境信息
rabbitmqctl environment

# 查看集群状态
rabbitmqctl cluster_status

# 查看用户列表
rabbitmqctl list_users

# 查看虚拟主机
rabbitmqctl list_vhosts

2. 连接诊断命令

bash
# 查看所有连接
rabbitmqctl list_connections

# 查看连接详情
rabbitmqctl list_connections \
  name user peer_host peer_port state channels \
  recv_oct send_oct recv_cnt send_cnt \
  connected_at

# 查看连接统计
rabbitmqctl list_connections \
  name \
  recv_oct send_oct \
  recv_cnt send_cnt

# 按用户统计连接
rabbitmqctl list_connections user | sort | uniq -c

3. 通道诊断命令

bash
# 查看所有通道
rabbitmqctl list_channels

# 查看通道详情
rabbitmqctl list_channels \
  pid user vhost consumer_count \
  messages_unacked messages_uncommitted \
  acks_uncommitted prefetch_count

# 查看通道状态
rabbitmqctl list_channels name state

# 查看未确认消息
rabbitmqctl list_channels \
  name messages_unacked | \
  awk '$2 > 0 {print}'

4. 队列诊断命令

bash
# 查看所有队列
rabbitmqctl list_queues

# 查看队列详情
rabbitmqctl list_queues \
  name messages messages_ready messages_unacked \
  consumers memory durable auto_delete

# 查看消息统计
rabbitmqctl list_queues \
  name message_stats.deliver_get \
  message_stats.publish

# 查看队列内存
rabbitmqctl list_queues name memory | sort -k2 -n -r | head -20

# 查看无消费者的队列
rabbitmqctl list_queues name consumers | awk '$2 == 0 {print}'

# 查看消息堆积队列
rabbitmqctl list_queues name messages | sort -k2 -n -r | head -20

5. 交换器诊断命令

bash
# 查看所有交换器
rabbitmqctl list_exchanges

# 查看交换器详情
rabbitmqctl list_exchanges name type durable auto_delete

# 查看绑定关系
rabbitmqctl list_bindings

# 查看特定交换器的绑定
rabbitmqctl list_bindings source_name destination_name | grep "exchange_name"

6. 消费者诊断命令

bash
# 查看所有消费者
rabbitmqctl list_consumers

# 查看消费者详情
rabbitmqctl list_consumers \
  queue_name channel_pid consumer_tag \
  ack_required prefetch_count

# 统计每个队列的消费者数量
rabbitmqctl list_queues name consumers

rabbitmq-diagnostics 工具

1. 健康检查

bash
# 检查节点是否运行
rabbitmq-diagnostics check_running

# 检查节点是否就绪
rabbitmq-diagnostics check_if_node_is_quorum_critical

# 检查本地警报
rabbitmq-diagnostics check_local_alarms

# 检查网络连接
rabbitmq-diagnostics check_port_connectivity

# 检查端口监听
rabbitmq-diagnostics check_port_listening

# 检查协议版本
rabbitmq-diagnostics check_protocol_versions

2. 状态诊断

bash
# 查看节点状态
rabbitmq-diagnostics status

# 查看服务器状态
rabbitmq-diagnostics server_version

# 查看集群状态
rabbitmq-diagnostics cluster_status

# 查看内存使用
rabbitmq-diagnostics memory_breakdown

# 查看Erlang VM信息
rabbitmq-diagnostics runtime_thread_stats

# 查看系统限制
rabbitmq-diagnostics limits

3. 网络诊断

bash
# 检查网络分区
rabbitmq-diagnostics check_network_partitions

# 查看网络分区状态
rabbitmq-diagnostics partitions

# 检查节点间连接
rabbitmq-diagnostics -n rabbit@node1 check_inter_node_communication

# 查看监听端口
rabbitmq-diagnostics listeners

4. 性能诊断

bash
# 查看消息速率
rabbitmq-diagnostics observer

# 查看内存分配
rabbitmq-diagnostics memory_breakdown

# 查看IO统计
rabbitmq-diagnostics io_thread_stats

HTTP API 诊断

1. 概览API

bash
# 获取概览信息
curl -s -u guest:guest http://localhost:15672/api/overview | jq .

# 获取节点信息
curl -s -u guest:guest http://localhost:15672/api/nodes | jq .

# 获取定义信息
curl -s -u guest:guest http://localhost:15672/api/definitions | jq .

2. 连接API

bash
# 获取所有连接
curl -s -u guest:guest http://localhost:15672/api/connections | jq .

# 获取特定连接
curl -s -u guest:guest http://localhost:15672/api/connections/{name} | jq .

# 关闭连接
curl -X DELETE -u guest:guest http://localhost:15672/api/connections/{name}

# 获取连接的通道
curl -s -u guest:guest http://localhost:15672/api/connections/{name}/channels | jq .

3. 队列API

bash
# 获取所有队列
curl -s -u guest:guest http://localhost:15672/api/queues | jq .

# 获取特定队列
curl -s -u guest:guest http://localhost:15672/api/queues/{vhost}/{name} | jq .

# 获取队列消息
curl -s -u guest:guest -H "Content-Type: application/json" \
  -d '{"count": 10, "ackmode": "ack_requeue_false", "encoding": "auto"}' \
  http://localhost:15672/api/queues/{vhost}/{name}/get | jq .

# 清空队列
curl -X DELETE -u guest:guest http://localhost:15672/api/queues/{vhost}/{name}/contents

# 获取队列统计
curl -s -u guest:guest http://localhost:15672/api/queues/{vhost}/{name}/stats | jq .

4. 通道API

bash
# 获取所有通道
curl -s -u guest:guest http://localhost:15672/api/channels | jq .

# 获取特定通道
curl -s -u guest:guest http://localhost:15672/api/channels/{name} | jq .

5. 消费者API

bash
# 获取所有消费者
curl -s -u guest:guest http://localhost:15672/api/consumers | jq .

# 获取特定vhost的消费者
curl -s -u guest:guest http://localhost:15672/api/consumers/{vhost} | jq .

PHP 诊断工具类

php
<?php

class RabbitMQDiagnostics
{
    private $host;
    private $port;
    private $user;
    private $password;
    private $apiUrl;

    public function __construct(
        string $host = 'localhost',
        int $port = 15672,
        string $user = 'guest',
        string $password = 'guest'
    ) {
        $this->host = $host;
        $this->port = $port;
        $this->user = $user;
        $this->password = $password;
        $this->apiUrl = "http://{$host}:{$port}/api";
    }

    private function request(string $endpoint): array
    {
        $ch = curl_init();
        curl_setopt_array($ch, [
            CURLOPT_URL => $this->apiUrl . $endpoint,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_USERPWD => $this->user . ':' . $this->password,
            CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
            CURLOPT_TIMEOUT => 10,
        ]);
        
        $response = curl_exec($ch);
        $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        curl_close($ch);
        
        if ($httpCode !== 200) {
            throw new \RuntimeException("API请求失败: HTTP {$httpCode}");
        }
        
        return json_decode($response, true);
    }

    public function getOverview(): array
    {
        return $this->request('/overview');
    }

    public function getNodes(): array
    {
        return $this->request('/nodes');
    }

    public function getConnections(): array
    {
        return $this->request('/connections');
    }

    public function getChannels(): array
    {
        return $this->request('/channels');
    }

    public function getQueues(?string $vhost = null): array
    {
        $endpoint = $vhost ? "/queues/{$vhost}" : '/queues';
        return $this->request($endpoint);
    }

    public function getQueue(string $vhost, string $name): array
    {
        return $this->request("/queues/{$vhost}/{$name}");
    }

    public function getExchanges(?string $vhost = null): array
    {
        $endpoint = $vhost ? "/exchanges/{$vhost}" : '/exchanges';
        return $this->request($endpoint);
    }

    public function getBindings(?string $vhost = null): array
    {
        $endpoint = $vhost ? "/bindings/{$vhost}" : '/bindings';
        return $this->request($endpoint);
    }

    public function getConsumers(?string $vhost = null): array
    {
        $endpoint = $vhost ? "/consumers/{$vhost}" : '/consumers';
        return $this->request($endpoint);
    }

    public function getQueueMessages(
        string $vhost,
        string $queue,
        int $count = 10,
        bool $requeue = true
    ): array {
        $data = [
            'count' => $count,
            'ackmode' => $requeue ? 'ack_requeue_true' : 'ack_requeue_false',
            'encoding' => 'auto',
        ];
        
        $ch = curl_init();
        curl_setopt_array($ch, [
            CURLOPT_URL => $this->apiUrl . "/queues/{$vhost}/{$queue}/get",
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_USERPWD => $this->user . ':' . $this->password,
            CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
            CURLOPT_POST => true,
            CURLOPT_POSTFIELDS => json_encode($data),
            CURLOPT_TIMEOUT => 10,
        ]);
        
        $response = curl_exec($ch);
        curl_close($ch);
        
        return json_decode($response, true);
    }

    public function purgeQueue(string $vhost, string $queue): bool
    {
        $ch = curl_init();
        curl_setopt_array($ch, [
            CURLOPT_URL => $this->apiUrl . "/queues/{$vhost}/{$queue}/contents",
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_USERPWD => $this->user . ':' . $this->password,
            CURLOPT_CUSTOMREQUEST => 'DELETE',
            CURLOPT_TIMEOUT => 10,
        ]);
        
        curl_exec($ch);
        $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        curl_close($ch);
        
        return $httpCode === 204;
    }

    public function closeConnection(string $connectionName): bool
    {
        $ch = curl_init();
        curl_setopt_array($ch, [
            CURLOPT_URL => $this->apiUrl . "/connections/{$connectionName}",
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_USERPWD => $this->user . ':' . $this->password,
            CURLOPT_CUSTOMREQUEST => 'DELETE',
            CURLOPT_TIMEOUT => 10,
        ]);
        
        curl_exec($ch);
        $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        curl_close($ch);
        
        return $httpCode === 204;
    }

    public function getHealthReport(): array
    {
        $overview = $this->getOverview();
        $nodes = $this->getNodes();
        $queues = $this->getQueues();
        
        $report = [
            'timestamp' => date('Y-m-d H:i:s'),
            'cluster' => [
                'name' => $overview['cluster_name'] ?? 'unknown',
                'rabbitmq_version' => $overview['rabbitmq_version'] ?? 'unknown',
                'erlang_version' => $overview['erlang_version'] ?? 'unknown',
            ],
            'statistics' => [
                'connections' => $overview['object_totals']['connections'] ?? 0,
                'channels' => $overview['object_totals']['channels'] ?? 0,
                'queues' => $overview['object_totals']['queues'] ?? 0,
                'consumers' => $overview['object_totals']['consumers'] ?? 0,
                'messages' => $overview['queue_totals']['messages'] ?? 0,
                'messages_ready' => $overview['queue_totals']['messages_ready'] ?? 0,
                'messages_unacked' => $overview['queue_totals']['messages_unacked'] ?? 0,
            ],
            'message_rates' => [
                'publish' => $overview['message_stats']['publish_details']['rate'] ?? 0,
                'deliver' => $overview['message_stats']['deliver_get_details']['rate'] ?? 0,
                'ack' => $overview['message_stats']['ack_details']['rate'] ?? 0,
            ],
            'nodes' => [],
            'alerts' => [],
        ];
        
        foreach ($nodes as $node) {
            $report['nodes'][] = [
                'name' => $node['name'],
                'type' => $node['type'],
                'running' => $node['running'] ?? false,
                'mem_used' => $node['mem_used'] ?? 0,
                'mem_limit' => $node['mem_limit'] ?? 0,
                'mem_percent' => round(($node['mem_used'] ?? 0) / ($node['mem_limit'] ?? 1) * 100, 2),
                'fd_used' => $node['fd_used'] ?? 0,
                'fd_total' => $node['fd_total'] ?? 0,
                'sockets_used' => $node['sockets_used'] ?? 0,
                'sockets_total' => $node['sockets_total'] ?? 0,
                'disk_free' => $node['disk_free'] ?? 0,
                'disk_free_limit' => $node['disk_free_limit'] ?? 0,
            ];
        }
        
        foreach ($queues as $queue) {
            if (($queue['messages'] ?? 0) > 10000) {
                $report['alerts'][] = [
                    'level' => 'warning',
                    'type' => 'queue_backlog',
                    'queue' => $queue['name'],
                    'messages' => $queue['messages'],
                    'message' => "队列 {$queue['name']} 有 {$queue['messages']} 条消息积压",
                ];
            }
            
            if (($queue['consumers'] ?? 0) === 0 && ($queue['messages'] ?? 0) > 0) {
                $report['alerts'][] = [
                    'level' => 'warning',
                    'type' => 'no_consumer',
                    'queue' => $queue['name'],
                    'message' => "队列 {$queue['name']} 无消费者",
                ];
            }
        }
        
        foreach ($nodes as $node) {
            $memPercent = ($node['mem_used'] ?? 0) / ($node['mem_limit'] ?? 1) * 100;
            if ($memPercent > 80) {
                $report['alerts'][] = [
                    'level' => 'critical',
                    'type' => 'memory_high',
                    'node' => $node['name'],
                    'percent' => round($memPercent, 2),
                    'message' => "节点 {$node['name']} 内存使用率 {$memPercent}%",
                ];
            }
            
            $diskFree = $node['disk_free'] ?? 0;
            $diskLimit = $node['disk_free_limit'] ?? 0;
            if ($diskFree < $diskLimit * 2) {
                $report['alerts'][] = [
                    'level' => 'critical',
                    'type' => 'disk_low',
                    'node' => $node['name'],
                    'disk_free' => $diskFree,
                    'message' => "节点 {$node['name']} 磁盘空间不足",
                ];
            }
        }
        
        return $report;
    }
}

// 使用示例
$diagnostics = new RabbitMQDiagnostics();

$health = $diagnostics->getHealthReport();
echo json_encode($health, JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE);

诊断方法论

1. 问题排查流程

                    ┌─────────────────┐
                    │   发现问题      │
                    └────────┬────────┘


                    ┌─────────────────┐
                    │   确认症状      │
                    │  收集信息       │
                    └────────┬────────┘


                    ┌─────────────────┐
                    │   检查日志      │
                    │  查看监控       │
                    └────────┬────────┘

              ┌──────────────┼──────────────┐
              ▼              ▼              ▼
       ┌────────────┐ ┌────────────┐ ┌────────────┐
       │ 连接问题   │ │ 队列问题   │ │ 性能问题   │
       └─────┬──────┘ └─────┬──────┘ └─────┬──────┘
             │              │              │
              └──────────────┼──────────────┘


                    ┌─────────────────┐
                    │   定位原因      │
                    └────────┬────────┘


                    ┌─────────────────┐
                    │   制定方案      │
                    └────────┬────────┘


                    ┌─────────────────┐
                    │   实施解决      │
                    └────────┬────────┘


                    ┌─────────────────┐
                    │   验证结果      │
                    └─────────────────┘

2. 常用诊断脚本

bash
#!/bin/bash
# rabbitmq_diagnostics.sh

echo "=== RabbitMQ 综合诊断报告 ==="
echo "生成时间: $(date '+%Y-%m-%d %H:%M:%S')"

echo -e "\n[1] 节点状态"
rabbitmqctl status 2>/dev/null | head -20

echo -e "\n[2] 集群状态"
rabbitmqctl cluster_status

echo -e "\n[3] 连接统计"
echo "总连接数: $(rabbitmqctl list_connections | wc -l)"
echo "按用户统计:"
rabbitmqctl list_connections user 2>/dev/null | sort | uniq -c | sort -rn

echo -e "\n[4] 队列统计"
echo "总队列数: $(rabbitmqctl list_queues name | wc -l)"
echo "消息堆积TOP10:"
rabbitmqctl list_queues name messages 2>/dev/null | sort -k2 -rn | head -10

echo -e "\n[5] 消费者统计"
echo "无消费者队列:"
rabbitmqctl list_queues name consumers 2>/dev/null | awk '$2 == 0 {print $1}'

echo -e "\n[6] 内存使用"
rabbitmqctl status 2>/dev/null | grep -A 10 "Memory"

echo -e "\n[7] 磁盘空间"
df -h /var/lib/rabbitmq

echo -e "\n[8] 告警状态"
rabbitmqctl list_node_names | while read node; do
    echo "节点: $node"
    rabbitmqctl -n $node eval 'rabbit_alarm:get_alarms().' 2>/dev/null
done

echo -e "\n[9] 网络分区"
rabbitmqctl cluster_status | grep -A 5 "Partitions"

echo -e "\n[10] 最近错误日志"
tail -50 /var/log/rabbitmq/rabbit@*.log | grep -i "error\|exception\|failed" | tail -10

echo -e "\n诊断报告完成"

3. 实时监控脚本

bash
#!/bin/bash
# rabbitmq_watch.sh

while true; do
    clear
    echo "=== RabbitMQ 实时监控 ==="
    echo "时间: $(date '+%Y-%m-%d %H:%M:%S')"
    echo ""
    
    connections=$(rabbitmqctl list_connections 2>/dev/null | wc -l)
    channels=$(rabbitmqctl list_channels 2>/dev/null | wc -l)
    queues=$(rabbitmqctl list_queues name 2>/dev/null | wc -l)
    messages=$(rabbitmqctl list_queues messages 2>/dev/null | awk '{sum+=$1} END {print sum}')
    
    echo "连接数: $connections"
    echo "通道数: $channels"
    echo "队列数: $queues"
    echo "总消息: $messages"
    echo ""
    
    echo "消息堆积队列:"
    rabbitmqctl list_queues name messages 2>/dev/null | sort -k2 -rn | head -5
    echo ""
    
    echo "内存使用:"
    rabbitmqctl status 2>/dev/null | grep "memory:" | head -1
    
    sleep 5
done

注意事项

  1. 诊断工具可能影响性能:生产环境谨慎使用
  2. API调用需要权限:确保用户有足够权限
  3. 日志文件可能很大:使用grep过滤关键信息
  4. 定期保存诊断报告:便于历史对比分析
  5. 建立诊断知识库:记录常见问题和解决方案

相关链接