
Network Troubleshooting

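Overview

Network problems in RabbitMQ can cause dropped connections, delayed message delivery, and broken communication between cluster nodes. This document describes how to diagnose these problems and how to resolve them.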

Network architecture

1. RabbitMQ network communication

┌─────────────────────────────────────────────────────────────┐
│         RabbitMQ network communication architecture         │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌──────────┐                    ┌──────────┐               │
│  │ Producer │◄─────AMQP 5672────►│ RabbitMQ │               │
│  └──────────┘                    │  Server  │               │
│                                  └────┬─────┘               │
│  ┌──────────┐                         │                     │
│  │ Consumer │◄─────AMQP 5672──────────┤                     │
│  └──────────┘                         │                     │
│                                       │                     │
│  ┌──────────┐                         │                     │
│  │Management│◄─────HTTP 15672─────────┤                     │
│  │   Web    │                         │                     │
│  └──────────┘                         │                     │
│                                       │                     │
│                        ┌──────────────┴──────────────┐      │
│                        │  Inter-node communication   │      │
│                        │  Erlang distribution 25672  │      │
│                        └──────────────┬──────────────┘      │
│                                       │                     │
│                                  ┌────┴────┐                │
│                                  │  Node2  │                │
│                                  └─────────┘                │
│                                                             │
└─────────────────────────────────────────────────────────────┘

2. Ports

┌─────────────────────────────────────────────────────────────┐
│                       RabbitMQ ports                        │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Port    │  Protocol  │  Purpose                            │
│  ─────────────────────────────────────────────────────────  │
│  5672    │  AMQP      │  Client connections                 │
│  5671    │  AMQPS     │  Client connections over TLS        │
│  15672   │  HTTP      │  Management UI and HTTP API         │
│  15671   │  HTTPS     │  Management UI over TLS             │
│  25672   │  Erlang    │  Inter-node (cluster) communication │
│  4369    │  EPMD      │  Erlang Port Mapper Daemon (epmd)   │
│  61613   │  STOMP     │  STOMP                              │
│  61614   │  STOMPS    │  STOMP over TLS                     │
│  1883    │  MQTT      │  MQTT                               │
│  8883    │  MQTTS     │  MQTT over TLS                      │
│                                                             │
└─────────────────────────────────────────────────────────────┘

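Diagnostic steps

Step 1: Check listening ports

bash
# Check which RabbitMQ ports are listening
netstat -tlnp | grep -E "5672|15672|25672|4369"

# The same check with ss
ss -tlnp | grep -E "5672|15672|25672|4369"

# Check that the port accepts connections
telnet localhost 5672
nc -zv localhost 5672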
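
The grep patterns above match substrings, so `5672` also matches `15672` and `25672`. A minimal sketch of a stricter check follows; the helper name `missing_ports` is hypothetical, and it assumes the usual `ss -tln` / `netstat -tln` layout in which the local address is followed by whitespace.

```bash
# Hypothetical helper: print each expected port that has no listening socket.
# Reads `ss -tln` or `netstat -tln` output on stdin; expected ports are the arguments.
missing_ports() {
    local listing port
    listing=$(cat)
    for port in "$@"; do
        # Require ":" before and whitespace after the number, so that
        # 5672 does not also match 15672 or 25672.
        if ! printf '%s\n' "$listing" | grep -Eq ":${port}[[:space:]]"; then
            echo "$port"
        fi
    done
}

# On a live host (not executed here):
#   ss -tln | missing_ports 5672 15672 25672 4369
```

An empty result means every expected port has a listener.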

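Step 2: Check connections

bash
# List connections
rabbitmqctl list_connections

# List connections with selected columns
rabbitmqctl list_connections name peer_host peer_port state

# Count TCP sockets on port 5672 by state (does not match 15672 or 25672)
netstat -ant | grep -E ':5672[[:space:]]' | awk '{print $6}' | sort | uniq -c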
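
To count only sockets whose local port is exactly 5672, the state summary can be done in awk. This is a sketch under the assumption of Linux `netstat -ant` column order (local address in column 4, state in column 6); `count_states` is a hypothetical name.

```bash
# Hypothetical helper: count TCP states for sockets whose LOCAL port equals $1.
# Reads `netstat -ant` output on stdin.
count_states() {
    awk -v port="$1" '
        $1 ~ /^tcp/ {
            n = split($4, parts, ":")        # the port is the last ":"-separated field
            if (parts[n] == port) states[$6]++
        }
        END { for (s in states) print states[s], s }
    ' | sort -k2
}

# On a live host (not executed here):
#   netstat -ant | count_states 5672
```

A large number of `TIME_WAIT` or `CLOSE_WAIT` entries usually points to clients that open and close connections too often or do not close them cleanly.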

Step 3: Check network latency

bash
# Measure round-trip latency
ping -c 10 rabbitmq-server

# Trace the route
traceroute rabbitmq-server

# Report per-hop loss and latency over 100 probes
mtr -r -c 100 rabbitmq-server
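
For scripted checks, the two figures that matter in the ping output are packet loss and average round-trip time. The sketch below extracts them from the summary lines; `ping_summary` is a hypothetical name, and the parsing assumes the common Linux (`rtt min/avg/max/mdev`) or BSD (`round-trip min/avg/max/stddev`) summary format.

```bash
# Hypothetical helper: reduce ping output on stdin to "loss_pct=<n> avg_ms=<n>".
ping_summary() {
    awk '
        /packet loss/ {
            # The loss figure is the only field that ends in "%".
            for (i = 1; i <= NF; i++) if ($i ~ /%$/) { loss = $i; sub(/%/, "", loss) }
        }
        /^(rtt|round-trip)/ {
            split($4, t, "/")                # $4 is "min/avg/max/mdev"
            avg = t[2]
        }
        END { printf "loss_pct=%s avg_ms=%s\n", loss, avg }
    '
}

# On a live host (not executed here):
#   ping -c 10 rabbitmq-server | ping_summary
```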

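Step 4: Check network bandwidth

bash
# Watch live traffic on the interface
iftop -i eth0

# Per-interface statistics: 10 samples at 1-second intervals
sar -n DEV 1 10

# Raw per-interface counters
cat /proc/net/dev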
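
The counters in `/proc/net/dev` are cumulative, so throughput has to be derived from two readings. A minimal sketch, assuming the standard Linux layout (receive bytes is the first counter after the interface name, transmit bytes the ninth); both function names are hypothetical.

```bash
# Hypothetical helper: print "<rx_bytes> <tx_bytes>" for interface $1.
# Reads /proc/net/dev-formatted text on stdin.
iface_bytes() {
    # Turn "eth0:" into "eth0 " so the name is always its own field.
    sed 's/:/ /' | awk -v ifc="$1" '$1 == ifc { print $2, $10 }'
}

# Hypothetical helper: bytes per second between two readings taken $3 seconds apart.
rate_bps() {
    echo $(( ($2 - $1) / $3 ))
}

# On a live host (not executed here):
#   a=$(iface_bytes eth0 < /proc/net/dev); sleep 5
#   b=$(iface_bytes eth0 < /proc/net/dev)
#   rate_bps "${a%% *}" "${b%% *}" 5     # receive rate in bytes/s
```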

Step 5: Check the firewall

bash
# List iptables rules
iptables -L -n

# List firewalld rules
firewall-cmd --list-all

# Check whether the ports are reachable from a client host
nmap -p 5672,15672,25672 rabbitmq-server
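
On hosts that use firewalld, the list of opened ports can be compared against the ports RabbitMQ needs. This sketch reads the output of `firewall-cmd --list-ports` (space-separated entries such as `5672/tcp`); the helper name `unopened_ports` is hypothetical, and port ranges or service-based rules are not handled.

```bash
# Hypothetical helper: print each required TCP port that is missing from
# `firewall-cmd --list-ports` output supplied on stdin.
unopened_ports() {
    local opened port
    opened=" $(tr '\n' ' ') "                # pad with spaces for whole-word matching
    for port in "$@"; do
        case "$opened" in
            *" ${port}/tcp "*) ;;            # already open
            *) echo "$port" ;;
        esac
    done
}

# On a live host (not executed here):
#   firewall-cmd --list-ports | unopened_ports 5672 15672 25672 4369
```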

PHP network diagnostics tool

php
<?php

class RabbitMQNetworkDiagnostics
{
    private $apiUrl;
    private $user;
    private $password;
    private $host;
    private $port;

    public function __construct(
        string $host = 'localhost',
        int $port = 15672,
        string $user = 'guest',
        string $password = 'guest'
    ) {
        $this->host = $host;
        $this->port = $port;
        $this->apiUrl = "http://{$host}:{$port}/api";
        $this->user = $user;
        $this->password = $password;
    }

    private function request(string $endpoint): array
    {
        $ch = curl_init();
        curl_setopt_array($ch, [
            CURLOPT_URL => $this->apiUrl . $endpoint,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_USERPWD => $this->user . ':' . $this->password,
            CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
            CURLOPT_TIMEOUT => 10,
            CURLOPT_CONNECTTIMEOUT => 5,
        ]);
        
        $response = curl_exec($ch);
        $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        $error = curl_error($ch);
        curl_close($ch);
        
        if ($error) {
            return ['error' => $error, 'http_code' => $httpCode];
        }
        
        return json_decode($response, true) ?? [];
    }

    public function checkConnectivity(): array
    {
        $result = [
            'timestamp' => date('Y-m-d H:i:s'),
            'host' => $this->host,
            'port' => $this->port,
            'checks' => [],
        ];
        
        $start = microtime(true);
        $overview = $this->request('/overview');
        $latency = round((microtime(true) - $start) * 1000, 2);
        
        $result['checks']['api'] = [
            'status' => isset($overview['cluster_name']) ? 'ok' : 'failed',
            'latency_ms' => $latency,
            'error' => $overview['error'] ?? null,
        ];
        
        $tcpStart = microtime(true);
        $socket = @fsockopen($this->host, 5672, $errno, $errstr, 5);
        $tcpLatency = round((microtime(true) - $tcpStart) * 1000, 2);
        
        $result['checks']['amqp_port'] = [
            'status' => $socket ? 'ok' : 'failed',
            'latency_ms' => $tcpLatency,
            'error' => $socket ? null : "{$errno}: {$errstr}",
        ];
        
        if ($socket) {
            fclose($socket);
        }
        
        return $result;
    }

    public function getConnectionAnalysis(): array
    {
        $connections = $this->request('/connections');
        
        if (isset($connections['error'])) {
            return $connections;
        }
        
        $result = [
            'timestamp' => date('Y-m-d H:i:s'),
            'total_connections' => count($connections),
            'by_state' => [],
            'by_peer_host' => [],
            'by_user' => [],
            'connections' => [],
        ];
        
        foreach ($connections as $conn) {
            $state = $conn['state'] ?? 'unknown';
            $peerHost = $conn['peer_host'] ?? 'unknown';
            $user = $conn['user'] ?? 'unknown';
            
            $result['by_state'][$state] = ($result['by_state'][$state] ?? 0) + 1;
            $result['by_peer_host'][$peerHost] = ($result['by_peer_host'][$peerHost] ?? 0) + 1;
            $result['by_user'][$user] = ($result['by_user'][$user] ?? 0) + 1;
            
            $result['connections'][] = [
                'name' => $conn['name'],
                'user' => $user,
                'peer_host' => $peerHost,
                'peer_port' => $conn['peer_port'] ?? 0,
                'state' => $state,
                'channels' => $conn['channels'] ?? 0,
                'recv_oct' => $this->formatBytes($conn['recv_oct'] ?? 0),
                'send_oct' => $this->formatBytes($conn['send_oct'] ?? 0),
                // The management API reports connected_at as epoch milliseconds.
                'connected_at' => isset($conn['connected_at']) ?
                    date('Y-m-d H:i:s', intdiv((int) $conn['connected_at'], 1000)) : 'unknown',
            ];
        }
        
        return $result;
    }

    public function getNetworkStats(): array
    {
        $overview = $this->request('/overview');
        $nodes = $this->request('/nodes');
        
        if (isset($overview['error'])) {
            return $overview;
        }
        
        $result = [
            'timestamp' => date('Y-m-d H:i:s'),
            'message_stats' => [
                'publish_rate' => $overview['message_stats']['publish_details']['rate'] ?? 0,
                'deliver_rate' => $overview['message_stats']['deliver_get_details']['rate'] ?? 0,
                'ack_rate' => $overview['message_stats']['ack_details']['rate'] ?? 0,
            ],
            'network_io' => [],
        ];
        
        foreach ($nodes as $node) {
            $result['network_io'][] = [
                'node' => $node['name'],
                'recv_bytes' => $this->formatBytes($node['recv_oct'] ?? 0),
                'send_bytes' => $this->formatBytes($node['send_oct'] ?? 0),
                'recv_count' => $node['recv_cnt'] ?? 0,
                'send_count' => $node['send_cnt'] ?? 0,
            ];
        }
        
        return $result;
    }

    public function diagnoseNetworkIssues(): array
    {
        $result = [
            'timestamp' => date('Y-m-d H:i:s'),
            'issues' => [],
            'recommendations' => [],
        ];
        
        $connectivity = $this->checkConnectivity();
        
        if (($connectivity['checks']['api']['status'] ?? '') !== 'ok') {
            $result['issues'][] = [
                'severity' => 'critical',
                'type' => 'api_unreachable',
                'message' => 'Management API is unreachable',
                'details' => $connectivity['checks']['api']['error'] ?? 'Unknown error',
            ];
            $result['recommendations'][] = 'Check that the RabbitMQ service is running';
            $result['recommendations'][] = 'Check that the firewall allows port 15672';
        }
        
        if (($connectivity['checks']['amqp_port']['status'] ?? '') !== 'ok') {
            $result['issues'][] = [
                'severity' => 'critical',
                'type' => 'amqp_port_unreachable',
                'message' => 'AMQP port is unreachable',
                'details' => $connectivity['checks']['amqp_port']['error'] ?? 'Unknown error',
            ];
            $result['recommendations'][] = 'Check that the RabbitMQ service is running';
            $result['recommendations'][] = 'Check that the firewall allows port 5672';
        }
        
        $apiLatency = $connectivity['checks']['api']['latency_ms'] ?? 0;
        if ($apiLatency > 100) {
            $result['issues'][] = [
                'severity' => 'warning',
                'type' => 'high_latency',
                'message' => "High API latency: {$apiLatency}ms",
            ];
            $result['recommendations'][] = 'Check network connection quality';
            $result['recommendations'][] = 'Check server load';
        }
        
        $connections = $this->getConnectionAnalysis();
        
        $blockedCount = $connections['by_state']['blocked'] ?? 0;
        if ($blockedCount > 0) {
            $result['issues'][] = [
                'severity' => 'warning',
                'type' => 'blocked_connections',
                'message' => "{$blockedCount} connection(s) are blocked",
            ];
            $result['recommendations'][] = 'Check whether flow control has been triggered';
            $result['recommendations'][] = 'Check for memory and disk alarms';
        }
        
        return $result;
    }

    public function generateNetworkReport(): string
    {
        $connectivity = $this->checkConnectivity();
        $connections = $this->getConnectionAnalysis();
        $stats = $this->getNetworkStats();
        $diagnosis = $this->diagnoseNetworkIssues();
        
        $report = "=== RabbitMQ Network Diagnostics Report ===\n";
        $report .= "Generated at: {$connectivity['timestamp']}\n\n";

        $report .= "[Connectivity]\n";
        $report .= "Target: {$connectivity['host']}:{$connectivity['port']}\n";
        foreach ($connectivity['checks'] as $check => $result) {
            $status = $result['status'] === 'ok' ? '✓' : '✗';
            $report .= "  {$check}: {$status} (latency: {$result['latency_ms']}ms)\n";
            if (isset($result['error'])) {
                $report .= "    Error: {$result['error']}\n";
            }
        }
        $report .= "\n";

        // Fall back to empty values when the API call failed and returned an error array.
        $totalConnections = $connections['total_connections'] ?? 0;
        $report .= "[Connections]\n";
        $report .= "Total connections: {$totalConnections}\n";
        $report .= "By state:\n";
        foreach ($connections['by_state'] ?? [] as $state => $count) {
            $report .= "  {$state}: {$count}\n";
        }
        $report .= "By source IP (first 5):\n";
        foreach (array_slice($connections['by_peer_host'] ?? [], 0, 5, true) as $ip => $count) {
            $report .= "  {$ip}: {$count}\n";
        }
        $report .= "\n";

        $report .= "[Network I/O]\n";
        foreach ($stats['network_io'] ?? [] as $node) {
            $report .= "Node: {$node['node']}\n";
            $report .= "  Received: {$node['recv_bytes']} ({$node['recv_count']} operations)\n";
            $report .= "  Sent: {$node['send_bytes']} ({$node['send_count']} operations)\n";
        }
        $report .= "\n";

        if (!empty($diagnosis['issues'])) {
            $report .= "[Issues]\n";
            foreach ($diagnosis['issues'] as $issue) {
                $report .= "[{$issue['severity']}] {$issue['message']}\n";
            }
            $report .= "\n";
        }

        if (!empty($diagnosis['recommendations'])) {
            $report .= "[Recommendations]\n";
            foreach ($diagnosis['recommendations'] as $rec) {
                $report .= "  - {$rec}\n";
            }
        }
        
        return $report;
    }

    private function formatBytes(int $bytes): string
    {
        $units = ['B', 'KB', 'MB', 'GB', 'TB'];
        $i = 0;
        while ($bytes >= 1024 && $i < count($units) - 1) {
            $bytes /= 1024;
            $i++;
        }
        return round($bytes, 2) . ' ' . $units[$i];
    }
}

// Usage example
$diagnostics = new RabbitMQNetworkDiagnostics();
echo $diagnostics->generateNetworkReport();

Common network problems and solutions

1. Connection timeouts

php
<?php

class ConnectionTimeoutHandler
{
    private $config;

    public function __construct(array $config)
    {
        $this->config = array_merge([
            'host' => 'localhost',
            'port' => 5672,
            'user' => 'guest',
            'password' => 'guest',
            'connection_timeout' => 5.0,
            'read_write_timeout' => 30.0,
            'heartbeat' => 15,
        ], $config);
    }

    public function connectWithRetry(int $maxRetries = 3): ?\PhpAmqpLib\Connection\AMQPStreamConnection
    {
        $attempt = 0;
        
        while ($attempt < $maxRetries) {
            try {
                $connection = new \PhpAmqpLib\Connection\AMQPStreamConnection(
                    $this->config['host'],
                    $this->config['port'],
                    $this->config['user'],
                    $this->config['password'],
                    '/',
                    false,
                    'AMQPLAIN',
                    null,
                    'en_US',
                    $this->config['connection_timeout'],
                    $this->config['read_write_timeout'],
                    null,
                    true,
                    $this->config['heartbeat']
                );
                
                echo "Connected\n";
                return $connection;
            } catch (\Exception $e) {
                $attempt++;
                echo "Connection failed ({$attempt}/{$maxRetries}): " . $e->getMessage() . "\n";

                if ($attempt < $maxRetries) {
                    // Exponential backoff: 2, 4, 8, ... seconds
                    $delay = pow(2, $attempt);
                    echo "Retrying in {$delay} seconds...\n";
                    sleep($delay);
                }
            }
        }
        
        return null;
    }
}
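
The same retry-with-exponential-backoff pattern is useful in operational scripts, for example when waiting for a port to come back after a restart. A minimal shell sketch follows; `retry_with_backoff` and the `SLEEP_CMD` override are assumptions of this example, not part of RabbitMQ or php-amqplib.

```bash
# Hypothetical helper: run a command up to $1 times, doubling the wait
# between attempts (2, 4, 8, ... seconds), as in the PHP class above.
# SLEEP_CMD exists only so the delay can be stubbed out when testing.
retry_with_backoff() {
    local max attempt delay
    max=$1
    shift
    attempt=1
    while [ "$attempt" -le "$max" ]; do
        "$@" && return 0
        if [ "$attempt" -lt "$max" ]; then
            delay=$(( 1 << attempt ))
            echo "attempt $attempt/$max failed, retrying in ${delay}s" >&2
            ${SLEEP_CMD:-sleep} "$delay"
        fi
        attempt=$(( attempt + 1 ))
    done
    return 1
}

# On a live host (not executed here):
#   retry_with_backoff 3 nc -z -w 3 localhost 5672
```

The function returns 0 as soon as the command succeeds and 1 once all attempts are used up, so it can be used directly in an `if` condition.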

2. Heartbeat timeouts

bash
# Adjust the heartbeat setting
# rabbitmq.conf
heartbeat = 60

# Client configuration
# Set read_write_timeout to at least twice the heartbeat value
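3. Network partitions

bash
# Check for network partitions (the section heading is capitalized in recent versions)
rabbitmqctl cluster_status | grep -i -A 3 partitions

# Handle network partitions
# Automatic recovery mode
# rabbitmq.conf
# cluster_partition_handling = autoheal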

Network configuration tuning

1. TCP parameters

bash
# rabbitmq.conf

# Listener backlog, Nagle's algorithm, and TCP buffer sizes (bytes)
tcp_listen_options.backlog = 4096
tcp_listen_options.nodelay = true
tcp_listen_options.sndbuf = 196608
tcp_listen_options.recbuf = 196608

# AMQP connection handshake timeout (milliseconds)
handshake_timeout = 60000

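2. Heartbeat settings

bash
# rabbitmq.conf

# Heartbeat timeout (seconds)
heartbeat = 60

# Heartbeat frames are sent at about half the timeout;
# a peer is considered unreachable after two missed heartbeats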

3. Firewall configuration

bash
# Open the required ports
firewall-cmd --permanent --add-port=5672/tcp
firewall-cmd --permanent --add-port=15672/tcp
firewall-cmd --permanent --add-port=25672/tcp
firewall-cmd --permanent --add-port=4369/tcp
firewall-cmd --reload

# Equivalent iptables rules
iptables -A INPUT -p tcp --dport 5672 -j ACCEPT
iptables -A INPUT -p tcp --dport 15672 -j ACCEPT
iptables -A INPUT -p tcp --dport 25672 -j ACCEPT
iptables -A INPUT -p tcp --dport 4369 -j ACCEPT

Network monitoring script

bash
#!/bin/bash
# network_monitor.sh

HOST="localhost"
LOG_FILE="/var/log/rabbitmq/network_monitor.log"

log_message() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" >> "$LOG_FILE"
}

# Print "ok" if a TCP connection to $HOST:$1 succeeds, otherwise "failed".
# Relies on the exit status of nc, because its output text differs between variants.
check_port() {
    local port=$1

    if nc -z -w 3 "$HOST" "$port" >/dev/null 2>&1; then
        echo "ok"
    else
        echo "failed"
    fi
}

# Print the round-trip time of a single ping in milliseconds (empty on failure).
check_latency() {
    ping -c 1 "$HOST" 2>/dev/null | sed -n 's/.*time=\([0-9.]*\).*/\1/p'
}

# Print the number of client connections, excluding table headers.
check_connections() {
    rabbitmqctl list_connections -q --no-table-headers name 2>/dev/null | wc -l
}

monitor() {
    log_message "=== Network monitor check ==="

    local amqp_status=$(check_port 5672)
    local mgmt_status=$(check_port 15672)
    local latency=$(check_latency)
    local connections=$(check_connections)

    log_message "AMQP port: $amqp_status"
    log_message "Management port: $mgmt_status"
    log_message "Network latency: ${latency:-unknown} ms"
    log_message "Connections: $connections"

    if [ "$amqp_status" != "ok" ] || [ "$mgmt_status" != "ok" ]; then
        log_message "WARNING: port unreachable"
        # send_alert "RabbitMQ port alert"
    fi

    # Compare only when ping returned a value; bc handles the fractional part.
    if [ -n "$latency" ] && (( $(echo "$latency > 100" | bc -l) )); then
        log_message "WARNING: high network latency"
    fi
}

while true; do
    monitor
    sleep 60
done

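Notes

  1. Choose a reasonable heartbeat value: too short adds load, too long delays failure detection
  2. Configure the firewall correctly: open every required port
  3. Monitor network latency: high latency slows message delivery
  4. Limit the number of connections: each connection consumes resources
  5. Keep the cluster network stable: network partitions can lead to inconsistent data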

Related links