
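Monitoring RabbitMQ with Zabbix

Overview

Zabbix is an enterprise-grade distributed monitoring solution that can monitor network parameters, server health, and application integrity. This article explains how to monitor RabbitMQ with Zabbix, covering template configuration, custom monitoring items, and alert rules.

Key Concepts

Zabbix Monitoring Architecture

RabbitMQ --> Zabbix Agent --> Zabbix Server --> Zabbix Web
                |                  |                |
             collect         process/store      visualize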

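Monitoring Methods

Method                 Description                        Best suited for
Agent (active mode)    Agent pushes data to the server    Large-scale deployments
Agent (passive mode)   Server pulls data from the agent   Small-scale deployments
HTTP Agent             Collects data via the HTTP API     Hosts where no agent can be installed
External Check         Collects data via external script  Custom monitoring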
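
To make the HTTP Agent method more concrete, here is a minimal Python sketch of what such a collection amounts to: an authenticated GET against the RabbitMQ management API followed by picking one field out of the JSON reply. The function names (`build_request`, `pick`, `collect`) and the dotted-path notation are illustrative assumptions, not part of Zabbix or RabbitMQ.

```python
import base64
import json
import urllib.request


def build_request(base_url, path, user, password):
    """Prepare a GET request with HTTP Basic auth for the management API."""
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    req = urllib.request.Request(f"{base_url}/api/{path}")
    req.add_header("Authorization", f"Basic {token}")
    return req


def pick(payload, dotted_path):
    """Walk a decoded JSON object along a dotted path such as 'object_totals.queues'."""
    value = payload
    for part in dotted_path.split("."):
        value = value[part]
    return value


def collect(base_url, path, dotted_path, user, password, timeout=5):
    """Fetch one endpoint and return one field (this performs a real HTTP call)."""
    req = build_request(base_url, path, user, password)
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return pick(json.load(resp), dotted_path)
```

For example, `collect("http://localhost:15672", "overview", "object_totals.queues", "admin", "admin123")` would return the total queue count, assuming the management plugin is reachable with those credentials.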

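Monitoring Item Categories

Category               Items
Node status            Running state, version, uptime
Resource usage         Memory, disk, CPU, file descriptors
Connection statistics  Connections, channels, consumers
Queue status           Message count, consumption rate, queue length
Message statistics     Publish rate, acknowledgement rate, delivery rate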

Configuration Examples

Zabbix Agent Configuration

ini
# /etc/zabbix/zabbix_agentd.conf

UserParameter=rabbitmq.status[*],/opt/zabbix/scripts/rabbitmq_status.sh $1
UserParameter=rabbitmq.queue[*],/opt/zabbix/scripts/rabbitmq_queue.sh $1 $2
UserParameter=rabbitmq.node[*],/opt/zabbix/scripts/rabbitmq_node.sh $1
UserParameter=rabbitmq.overview[*],/opt/zabbix/scripts/rabbitmq_overview.sh $1
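
The `[*]` suffix makes each key flexible: the agent substitutes the item key's parameters for `$1`, `$2`, and so on before running the command. The sketch below emulates that substitution in Python to show which command a given item key maps to. It is a simplified model (parameters are split on commas, quoted parameters are not handled, and a missing parameter is treated as an empty string), and `expand_user_parameter` is a hypothetical helper, not a Zabbix API.

```python
import re


def expand_user_parameter(definition, item_key):
    """Return the command a flexible UserParameter would run for item_key.

    `definition` is the text after "UserParameter=", e.g.
    "rabbitmq.queue[*],/opt/zabbix/scripts/rabbitmq_queue.sh $1 $2".
    """
    key_pattern, command = definition.split(",", 1)
    if not key_pattern.endswith("[*]"):
        raise ValueError("only flexible keys ending in [*] are handled")
    base = key_pattern[:-3]
    match = re.fullmatch(re.escape(base) + r"\[(.*)\]", item_key)
    if match is None:
        raise ValueError(f"{item_key!r} does not match {key_pattern!r}")
    params = [p.strip() for p in match.group(1).split(",")]

    def substitute(m):
        index = int(m.group(1))
        # Assumption: an unset positional parameter expands to an empty string.
        return params[index - 1] if index <= len(params) else ""

    return re.sub(r"\$([1-9])", substitute, command)
```

With the configuration above, the item key `rabbitmq.queue[orders,messages]` maps to `/opt/zabbix/scripts/rabbitmq_queue.sh orders messages`, where `orders` is a made-up queue name.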

Node Status Script

bash
#!/bin/bash
# /opt/zabbix/scripts/rabbitmq_status.sh

RABBITMQ_HOST="localhost"
RABBITMQ_PORT="15672"
RABBITMQ_USER="admin"
RABBITMQ_PASS="admin123"

API_URL="http://${RABBITMQ_HOST}:${RABBITMQ_PORT}/api"

case "$1" in
    "status")
        result=$(curl -s -u "${RABBITMQ_USER}:${RABBITMQ_PASS}" "${API_URL}/overview" | grep -o '"rabbitmq_version":"[^"]*"' | cut -d'"' -f4)
        if [ -n "$result" ]; then
            echo "1"
        else
            echo "0"
        fi
        ;;
    "version")
        curl -s -u "${RABBITMQ_USER}:${RABBITMQ_PASS}" "${API_URL}/overview" | grep -o '"rabbitmq_version":"[^"]*"' | cut -d'"' -f4
        ;;
    "uptime")
        curl -s -u "${RABBITMQ_USER}:${RABBITMQ_PASS}" "${API_URL}/nodes" | grep -o '"uptime":[0-9]*' | head -1 | cut -d':' -f2
        ;;
    *)
        echo "Usage: $0 {status|version|uptime}"
        exit 1
        ;;
esac
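
The `status` branch above treats any reply that contains a `rabbitmq_version` field as "up". The same check can be sketched with a real JSON parser, which also makes the failure cases explicit (empty body, HTML error page, authentication error). `status_value` is a hypothetical helper name; it takes the raw response body as input and does no network access itself.

```python
import json


def status_value(raw_body):
    """Return 1 if raw_body is an overview reply carrying a version, else 0."""
    try:
        overview = json.loads(raw_body)
    except (TypeError, ValueError):
        # Empty output, a non-JSON error page, or no output at all.
        return 0
    if not isinstance(overview, dict):
        return 0
    return 1 if overview.get("rabbitmq_version") else 0
```

Returning 0 instead of raising keeps the item numeric, so a trigger such as `{last()}=0` can fire when the API is unreachable.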

Queue Monitoring Script

bash
#!/bin/bash
# /opt/zabbix/scripts/rabbitmq_queue.sh

RABBITMQ_HOST="localhost"
RABBITMQ_PORT="15672"
RABBITMQ_USER="admin"
RABBITMQ_PASS="admin123"

API_URL="http://${RABBITMQ_HOST}:${RABBITMQ_PORT}/api"

queue_name="$1"
metric="$2"

case "$metric" in
    "messages")
        curl -s -u "${RABBITMQ_USER}:${RABBITMQ_PASS}" "${API_URL}/queues/%2F/${queue_name}" | grep -o '"messages":[0-9]*' | cut -d':' -f2
        ;;
    "messages_ready")
        curl -s -u "${RABBITMQ_USER}:${RABBITMQ_PASS}" "${API_URL}/queues/%2F/${queue_name}" | grep -o '"messages_ready":[0-9]*' | cut -d':' -f2
        ;;
    "messages_unacked")
        # The management API names this field "messages_unacknowledged".
        curl -s -u "${RABBITMQ_USER}:${RABBITMQ_PASS}" "${API_URL}/queues/%2F/${queue_name}" | grep -o '"messages_unacknowledged":[0-9]*' | cut -d':' -f2
        ;;
    "consumers")
        curl -s -u "${RABBITMQ_USER}:${RABBITMQ_PASS}" "${API_URL}/queues/%2F/${queue_name}" | grep -o '"consumers":[0-9]*' | cut -d':' -f2
        ;;
    "memory")
        curl -s -u "${RABBITMQ_USER}:${RABBITMQ_PASS}" "${API_URL}/queues/%2F/${queue_name}" | grep -o '"memory":[0-9]*' | cut -d':' -f2
        ;;
    *)
        echo "Usage: $0 <queue_name> {messages|messages_ready|messages_unacked|consumers|memory}"
        exit 1
        ;;
esac
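
The script above hardcodes the default vhost as `%2F` and passes the queue name through unchanged, so a queue in another vhost, or a name containing spaces or slashes, would produce a wrong URL. A minimal sketch of building the endpoint with both path segments percent-encoded follows; `queue_endpoint` is an illustrative name.

```python
from urllib.parse import quote


def queue_endpoint(api_url, vhost, queue_name):
    """Build the /queues/<vhost>/<name> URL with both segments percent-encoded."""
    # safe="" makes quote() encode "/" as well, which the default vhost needs.
    return f"{api_url}/queues/{quote(vhost, safe='')}/{quote(queue_name, safe='')}"
```

For the default vhost `/` and a queue called `orders` (a made-up name), this yields `.../api/queues/%2F/orders`, matching the path used in the script.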

Node Resource Monitoring Script

bash
#!/bin/bash
# /opt/zabbix/scripts/rabbitmq_node.sh

RABBITMQ_HOST="localhost"
RABBITMQ_PORT="15672"
RABBITMQ_USER="admin"
RABBITMQ_PASS="admin123"

API_URL="http://${RABBITMQ_HOST}:${RABBITMQ_PORT}/api"

case "$1" in
    "mem_used")
        curl -s -u "${RABBITMQ_USER}:${RABBITMQ_PASS}" "${API_URL}/nodes" | python3 -c "import sys,json; print(json.load(sys.stdin)[0]['mem_used'])"
        ;;
    "mem_limit")
        curl -s -u "${RABBITMQ_USER}:${RABBITMQ_PASS}" "${API_URL}/nodes" | python3 -c "import sys,json; print(json.load(sys.stdin)[0]['mem_limit'])"
        ;;
    "mem_alarm")
        curl -s -u "${RABBITMQ_USER}:${RABBITMQ_PASS}" "${API_URL}/nodes" | python3 -c "import sys,json; print(1 if json.load(sys.stdin)[0]['mem_alarm'] else 0)"
        ;;
    "disk_free")
        curl -s -u "${RABBITMQ_USER}:${RABBITMQ_PASS}" "${API_URL}/nodes" | python3 -c "import sys,json; print(json.load(sys.stdin)[0]['disk_free'])"
        ;;
    "disk_free_limit")
        curl -s -u "${RABBITMQ_USER}:${RABBITMQ_PASS}" "${API_URL}/nodes" | python3 -c "import sys,json; print(json.load(sys.stdin)[0]['disk_free_limit'])"
        ;;
    "disk_alarm")
        curl -s -u "${RABBITMQ_USER}:${RABBITMQ_PASS}" "${API_URL}/nodes" | python3 -c "import sys,json; print(1 if json.load(sys.stdin)[0]['disk_free_alarm'] else 0)"
        ;;
    "fd_used")
        curl -s -u "${RABBITMQ_USER}:${RABBITMQ_PASS}" "${API_URL}/nodes" | python3 -c "import sys,json; print(json.load(sys.stdin)[0]['fd_used'])"
        ;;
    "fd_total")
        curl -s -u "${RABBITMQ_USER}:${RABBITMQ_PASS}" "${API_URL}/nodes" | python3 -c "import sys,json; print(json.load(sys.stdin)[0]['fd_total'])"
        ;;
    "sockets_used")
        curl -s -u "${RABBITMQ_USER}:${RABBITMQ_PASS}" "${API_URL}/nodes" | python3 -c "import sys,json; print(json.load(sys.stdin)[0]['sockets_used'])"
        ;;
    "sockets_total")
        curl -s -u "${RABBITMQ_USER}:${RABBITMQ_PASS}" "${API_URL}/nodes" | python3 -c "import sys,json; print(json.load(sys.stdin)[0]['sockets_total'])"
        ;;
    "proc_used")
        curl -s -u "${RABBITMQ_USER}:${RABBITMQ_PASS}" "${API_URL}/nodes" | python3 -c "import sys,json; print(json.load(sys.stdin)[0]['proc_used'])"
        ;;
    "proc_total")
        curl -s -u "${RABBITMQ_USER}:${RABBITMQ_PASS}" "${API_URL}/nodes" | python3 -c "import sys,json; print(json.load(sys.stdin)[0]['proc_total'])"
        ;;
    *)
        echo "Usage: $0 {mem_used|mem_limit|mem_alarm|disk_free|disk_free_limit|disk_alarm|fd_used|fd_total|sockets_used|sockets_total|proc_used|proc_total}"
        exit 1
        ;;
esac
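
Every branch above reads element `[0]` of the `/api/nodes` reply, which in a cluster is simply whichever node is listed first. The sketch below shows one way to select a node by its `name` field instead and to map the script's `disk_alarm` key onto the API's `disk_free_alarm` field. The helper names and the `ALIASES` table are assumptions made for illustration.

```python
# Script-level metric names that differ from the API field names.
ALIASES = {"disk_alarm": "disk_free_alarm"}


def select_node(nodes, name=None):
    """Return the node whose 'name' matches, or the first node if name is None."""
    if name is None:
        return nodes[0]
    for node in nodes:
        if node.get("name") == name:
            return node
    raise LookupError(f"node {name!r} not found")


def node_metric(nodes, metric, name=None):
    """Look up one metric on the selected node from a decoded /api/nodes reply."""
    value = select_node(nodes, name)[ALIASES.get(metric, metric)]
    # Alarm flags are booleans in the API; numeric Zabbix items expect 0 or 1.
    return int(value) if isinstance(value, bool) else value
```

Decoding the reply once and calling `node_metric` for each key would also avoid issuing one HTTP request per metric.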

Overview Monitoring Script

bash
#!/bin/bash
# /opt/zabbix/scripts/rabbitmq_overview.sh

RABBITMQ_HOST="localhost"
RABBITMQ_PORT="15672"
RABBITMQ_USER="admin"
RABBITMQ_PASS="admin123"

API_URL="http://${RABBITMQ_HOST}:${RABBITMQ_PORT}/api"

case "$1" in
    "connections")
        curl -s -u "${RABBITMQ_USER}:${RABBITMQ_PASS}" "${API_URL}/overview" | python3 -c "import sys,json; print(json.load(sys.stdin)['object_totals']['connections'])"
        ;;
    "channels")
        curl -s -u "${RABBITMQ_USER}:${RABBITMQ_PASS}" "${API_URL}/overview" | python3 -c "import sys,json; print(json.load(sys.stdin)['object_totals']['channels'])"
        ;;
    "queues")
        curl -s -u "${RABBITMQ_USER}:${RABBITMQ_PASS}" "${API_URL}/overview" | python3 -c "import sys,json; print(json.load(sys.stdin)['object_totals']['queues'])"
        ;;
    "consumers")
        curl -s -u "${RABBITMQ_USER}:${RABBITMQ_PASS}" "${API_URL}/overview" | python3 -c "import sys,json; print(json.load(sys.stdin)['object_totals']['consumers'])"
        ;;
    "exchanges")
        curl -s -u "${RABBITMQ_USER}:${RABBITMQ_PASS}" "${API_URL}/overview" | python3 -c "import sys,json; print(json.load(sys.stdin)['object_totals']['exchanges'])"
        ;;
    "messages_total")
        curl -s -u "${RABBITMQ_USER}:${RABBITMQ_PASS}" "${API_URL}/overview" | python3 -c "import sys,json; d=json.load(sys.stdin); print(d['queue_totals']['messages'])"
        ;;
    "messages_ready")
        curl -s -u "${RABBITMQ_USER}:${RABBITMQ_PASS}" "${API_URL}/overview" | python3 -c "import sys,json; d=json.load(sys.stdin); print(d['queue_totals']['messages_ready'])"
        ;;
    "messages_unacked")
        # The management API names this field "messages_unacknowledged".
        curl -s -u "${RABBITMQ_USER}:${RABBITMQ_PASS}" "${API_URL}/overview" | python3 -c "import sys,json; d=json.load(sys.stdin); print(d['queue_totals']['messages_unacknowledged'])"
        ;;
    "publish_rate")
        # message_stats can be empty on an idle broker, so fall back to 0.
        curl -s -u "${RABBITMQ_USER}:${RABBITMQ_PASS}" "${API_URL}/overview" | python3 -c "import sys,json; d=json.load(sys.stdin); print(d.get('message_stats',{}).get('publish_details',{}).get('rate',0))"
        ;;
    "consume_rate")
        # The API has no "consume" statistic; deliver_get (deliveries plus basic.get) is used instead.
        curl -s -u "${RABBITMQ_USER}:${RABBITMQ_PASS}" "${API_URL}/overview" | python3 -c "import sys,json; d=json.load(sys.stdin); print(d.get('message_stats',{}).get('deliver_get_details',{}).get('rate',0))"
        ;;
    *)
        echo "Usage: $0 {connections|channels|queues|consumers|exchanges|messages_total|messages_ready|messages_unacked|publish_rate|consume_rate}"
        exit 1
        ;;
esac
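
The rate items read nested fields under `message_stats`, which may be missing or empty when the broker has seen no traffic. A small sketch of a lookup that falls back to zero instead of failing is shown below; `message_rate` is a hypothetical helper, and the `<stat>_details` naming follows the `publish_details` field used in the script.

```python
def message_rate(overview, stat):
    """Return the per-second rate for a message statistic, or 0.0 if absent."""
    details = overview.get("message_stats", {}).get(f"{stat}_details", {})
    return float(details.get("rate", 0.0))
```

Reporting 0 for an idle broker keeps the item supported in Zabbix, whereas an unhandled lookup error would produce non-numeric output.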

PHP Zabbix Sender

php
<?php

class ZabbixSender
{
    private $server;
    private $port;
    
    public function __construct($server = '127.0.0.1', $port = 10051)
    {
        $this->server = $server;
        $this->port = $port;
    }
    
    public function send($host, $items)
    {
        $payload = [
            'request' => 'sender data',
            'data' => [],
        ];
        
        foreach ($items as $key => $value) {
            $payload['data'][] = [
                'host' => $host,
                'key' => $key,
                'value' => $value,
                'clock' => time(),
            ];
        }
        
        $json = json_encode($payload);
        $header = "ZBXD\x01" . pack('V', strlen($json)) . "\x00\x00\x00\x00";
        
        $socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
        if (!$socket) {
            return false;
        }
        
        $result = socket_connect($socket, $this->server, $this->port);
        if (!$result) {
            socket_close($socket);
            return false;
        }
        
        socket_write($socket, $header . $json);
        
        $response = socket_read($socket, 1024);
        socket_close($socket);
        
        return substr($response, 13);
    }
}

class RabbitMQZabbixSender
{
    private $zabbix;
    private $rabbitmqHost;
    private $rabbitmqPort;
    private $rabbitmqUser;
    private $rabbitmqPass;
    
    public function __construct(
        ZabbixSender $zabbix,
        $rabbitmqHost = 'localhost',
        $rabbitmqPort = 15672,
        $rabbitmqUser = 'guest',
        $rabbitmqPass = 'guest'
    ) {
        $this->zabbix = $zabbix;
        $this->rabbitmqHost = $rabbitmqHost;
        $this->rabbitmqPort = $rabbitmqPort;
        $this->rabbitmqUser = $rabbitmqUser;
        $this->rabbitmqPass = $rabbitmqPass;
    }
    
    private function request($endpoint)
    {
        $url = "http://{$this->rabbitmqHost}:{$this->rabbitmqPort}/api/{$endpoint}";
        
        $ch = curl_init();
        curl_setopt_array($ch, [
            CURLOPT_URL => $url,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_USERPWD => "{$this->rabbitmqUser}:{$this->rabbitmqPass}",
            CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
            CURLOPT_TIMEOUT => 10,
        ]);
        
        $response = curl_exec($ch);
        curl_close($ch);
        
        return json_decode($response, true);
    }
    
    public function sendMetrics($zabbixHost)
    {
        $overview = $this->request('overview');
        $nodes = $this->request('nodes');
        
        $items = [];
        
        if (isset($overview['object_totals'])) {
            $items['rabbitmq.connections'] = $overview['object_totals']['connections'] ?? 0;
            $items['rabbitmq.channels'] = $overview['object_totals']['channels'] ?? 0;
            $items['rabbitmq.queues'] = $overview['object_totals']['queues'] ?? 0;
            $items['rabbitmq.consumers'] = $overview['object_totals']['consumers'] ?? 0;
            $items['rabbitmq.exchanges'] = $overview['object_totals']['exchanges'] ?? 0;
        }
        
        if (isset($overview['queue_totals'])) {
            $items['rabbitmq.messages_total'] = $overview['queue_totals']['messages'] ?? 0;
            $items['rabbitmq.messages_ready'] = $overview['queue_totals']['messages_ready'] ?? 0;
            $items['rabbitmq.messages_unacked'] = $overview['queue_totals']['messages_unacknowledged'] ?? 0;
        }
        
        if (isset($overview['message_stats'])) {
            $items['rabbitmq.publish_rate'] = $overview['message_stats']['publish_details']['rate'] ?? 0;
            $items['rabbitmq.consume_rate'] = $overview['message_stats']['deliver_get_details']['rate'] ?? 0;
            $items['rabbitmq.ack_rate'] = $overview['message_stats']['ack_details']['rate'] ?? 0;
        }
        
        if (!empty($nodes)) {
            $node = $nodes[0];
            $items['rabbitmq.mem_used'] = $node['mem_used'] ?? 0;
            $items['rabbitmq.mem_limit'] = $node['mem_limit'] ?? 0;
            $items['rabbitmq.mem_alarm'] = ($node['mem_alarm'] ?? false) ? 1 : 0;
            $items['rabbitmq.disk_free'] = $node['disk_free'] ?? 0;
            $items['rabbitmq.disk_alarm'] = ($node['disk_free_alarm'] ?? false) ? 1 : 0;
            $items['rabbitmq.fd_used'] = $node['fd_used'] ?? 0;
            $items['rabbitmq.fd_total'] = $node['fd_total'] ?? 0;
            $items['rabbitmq.sockets_used'] = $node['sockets_used'] ?? 0;
            $items['rabbitmq.sockets_total'] = $node['sockets_total'] ?? 0;
            $items['rabbitmq.proc_used'] = $node['proc_used'] ?? 0;
            $items['rabbitmq.proc_total'] = $node['proc_total'] ?? 0;
        }
        
        return $this->zabbix->send($zabbixHost, $items);
    }
}

$zabbix = new ZabbixSender('zabbix-server', 10051);
$sender = new RabbitMQZabbixSender($zabbix, 'localhost', 15672, 'admin', 'admin123');
$result = $sender->sendMetrics('rabbitmq-server');
echo $result . "\n";
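
The framing used by `ZabbixSender::send` can be checked in isolation: the packet is the 5-byte prefix `ZBXD\x01`, a 4-byte little-endian body length, 4 reserved zero bytes, and then the JSON body, which is why the PHP code skips 13 bytes of the response. The Python sketch below mirrors that layout without opening a socket; `build_packet` and `parse_packet` are illustrative names, and values are sent as strings by assumption.

```python
import json
import struct

HEADER = b"ZBXD\x01"


def build_packet(host, items, clock):
    """Frame a 'sender data' request the same way the PHP class does."""
    data = [
        {"host": host, "key": key, "value": str(value), "clock": clock}
        for key, value in items.items()
    ]
    body = json.dumps({"request": "sender data", "data": data}).encode("utf-8")
    # "<II": 4-byte little-endian length followed by 4 reserved zero bytes.
    return HEADER + struct.pack("<II", len(body), 0) + body


def parse_packet(packet):
    """Strip the 13-byte header and decode the JSON body."""
    if packet[:5] != HEADER:
        raise ValueError("not a Zabbix packet")
    length, _reserved = struct.unpack("<II", packet[5:13])
    return json.loads(packet[13:13 + length].decode("utf-8"))
```

Because the server replies with the same framing, `parse_packet` can also be applied to the bytes read back from the socket.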

Practical Scenarios

Scenario 1: Zabbix Template Configuration

xml
<?xml version="1.0" encoding="UTF-8"?>
<zabbix_export>
    <version>5.0</version>
    <groups>
        <group>
            <name>Message Queues</name>
        </group>
    </groups>
    <templates>
        <template>
            <template>Template RabbitMQ</template>
            <name>Template RabbitMQ</name>
            <groups>
                <group>
                    <name>Message Queues</name>
                </group>
            </groups>
            <items>
                <item>
                    <name>RabbitMQ Status</name>
                    <key>rabbitmq.status[status]</key>
                    <value_type>UNSIGNED</value_type>
                    <triggers>
                        <trigger>
                            <expression>{last()}=0</expression>
                            <name>RabbitMQ is down</name>
                            <priority>HIGH</priority>
                        </trigger>
                    </triggers>
                </item>
                <item>
                    <name>Memory Used</name>
                    <key>rabbitmq.node[mem_used]</key>
                    <units>B</units>
                </item>
                <item>
                    <name>Memory Limit</name>
                    <key>rabbitmq.node[mem_limit]</key>
                    <units>B</units>
                </item>
                <item>
                    <name>Memory Alarm</name>
                    <key>rabbitmq.node[mem_alarm]</key>
                    <value_type>UNSIGNED</value_type>
                    <triggers>
                        <trigger>
                            <expression>{last()}=1</expression>
                            <name>Memory alarm triggered</name>
                            <priority>HIGH</priority>
                        </trigger>
                    </triggers>
                </item>
                <item>
                    <name>Disk Free</name>
                    <key>rabbitmq.node[disk_free]</key>
                    <units>B</units>
                </item>
                <item>
                    <name>Disk Alarm</name>
                    <key>rabbitmq.node[disk_alarm]</key>
                    <value_type>UNSIGNED</value_type>
                    <triggers>
                        <trigger>
                            <expression>{last()}=1</expression>
                            <name>Disk alarm triggered</name>
                            <priority>HIGH</priority>
                        </trigger>
                    </triggers>
                </item>
                <item>
                    <name>Total Connections</name>
                    <key>rabbitmq.overview[connections]</key>
                    <value_type>UNSIGNED</value_type>
                </item>
                <item>
                    <name>Total Channels</name>
                    <key>rabbitmq.overview[channels]</key>
                    <value_type>UNSIGNED</value_type>
                </item>
                <item>
                    <name>Total Queues</name>
                    <key>rabbitmq.overview[queues]</key>
                    <value_type>UNSIGNED</value_type>
                </item>
                <item>
                    <name>Total Messages</name>
                    <key>rabbitmq.overview[messages_total]</key>
                    <value_type>UNSIGNED</value_type>
                    <triggers>
                        <trigger>
                            <expression>{last()}&gt;100000</expression>
                            <name>Too many messages in queues</name>
                            <priority>WARNING</priority>
                        </trigger>
                    </triggers>
                </item>
            </items>
        </template>
    </templates>
</zabbix_export>

Scenario 2: Automatic Queue Discovery

bash
#!/bin/bash
# /opt/zabbix/scripts/rabbitmq_queue_discovery.sh

RABBITMQ_HOST="localhost"
RABBITMQ_PORT="15672"
RABBITMQ_USER="admin"
RABBITMQ_PASS="admin123"

API_URL="http://${RABBITMQ_HOST}:${RABBITMQ_PORT}/api"

# Emit the low-level discovery JSON. json.dumps escapes queue names safely, and
# {#VHOST} is percent-encoded so the default vhost "/" becomes "%2F" in API paths.
curl -s -u "${RABBITMQ_USER}:${RABBITMQ_PASS}" "${API_URL}/queues" | python3 -c '
import sys, json
from urllib.parse import quote

data = [
    {"{#QUEUE_NAME}": q["name"], "{#VHOST}": quote(q["vhost"], safe="")}
    for q in json.load(sys.stdin)
]
print(json.dumps({"data": data}))
'

Zabbix Agent configuration:

ini
UserParameter=rabbitmq.queue.discovery,/opt/zabbix/scripts/rabbitmq_queue_discovery.sh
UserParameter=rabbitmq.queue.stats[*],curl -s -u "admin:admin123" "http://localhost:15672/api/queues/$1/$2" | python3 -c "import sys,json; d=json.load(sys.stdin); print(d['$3'])"
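
For each discovered queue, Zabbix replaces the low-level discovery macros in an item prototype key with the values from the discovery JSON. The sketch below models that replacement for a hypothetical prototype key built on `rabbitmq.queue.stats[*]`. It assumes `{#VHOST}` carries the percent-encoded vhost, so that the `$1/$2` path in the UserParameter above resolves to a valid API URL.

```python
def expand_prototype(prototype_key, discovered):
    """Replace each LLD macro in an item prototype key with its discovered value."""
    key = prototype_key
    for macro, value in discovered.items():
        key = key.replace(macro, value)
    return key


# Hypothetical item prototype key; the third parameter is the JSON field to read.
PROTOTYPE = "rabbitmq.queue.stats[{#VHOST},{#QUEUE_NAME},messages]"
```

With a discovered row of `{"{#VHOST}": "%2F", "{#QUEUE_NAME}": "orders"}` (a made-up queue), the resulting item key is `rabbitmq.queue.stats[%2F,orders,messages]`, which makes the agent request `/api/queues/%2F/orders` and print the `messages` field.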

Common Problems and Solutions

Problem 1: Script Execution Permissions

Symptom: Zabbix Agent cannot execute the scripts.

Solution

bash
chmod +x /opt/zabbix/scripts/*.sh
chown zabbix:zabbix /opt/zabbix/scripts/*.sh

Problem 2: API Connection Timeout

Symptom: Item collection times out.

Solution

ini
# /etc/zabbix/zabbix_agentd.conf
Timeout=30

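Problem 3: Garbled Chinese Characters

Symptom: Queue names that contain Chinese characters are displayed as garbled text.

Solution

Make sure the scripts use UTF-8 encoding and that Zabbix is configured with the correct character set.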
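
One way to sidestep locale problems in a discovery script is to decode the API response explicitly as UTF-8 and to emit JSON with non-ASCII characters escaped, so the agent's output is pure ASCII regardless of the shell's locale. The sketch below shows this under those assumptions; `decode_queues` and `to_discovery_json` are illustrative helper names, and the Chinese queue name is a made-up example.

```python
import json


def decode_queues(raw_bytes):
    """Decode an /api/queues response body explicitly as UTF-8."""
    return json.loads(raw_bytes.decode("utf-8"))


def to_discovery_json(queues):
    """Serialize discovery data; json.dumps escapes non-ASCII as \\uXXXX by default."""
    data = [{"{#QUEUE_NAME}": q["name"]} for q in queues]
    return json.dumps({"data": data})


raw = json.dumps([{"name": "订单队列"}], ensure_ascii=False).encode("utf-8")
discovery = to_discovery_json(decode_queues(raw))
```

Any JSON parser, including the one in Zabbix Server, turns the `\uXXXX` escapes back into the original characters, so the queue name is preserved end to end.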

Best Practices

1. Item Naming Convention

rabbitmq.<category>.<metric>
rabbitmq.node.mem_used
rabbitmq.queue.messages
rabbitmq.overview.connections

2. Alert Thresholds

Item              Warning threshold   Critical threshold
Memory usage      70%                 85%
Free disk space   10 GB               5 GB
Message backlog   50,000              100,000
Connections       800                 950
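
The thresholds above can be expressed as a small classification function. The sketch below is illustrative only: it assumes that reaching a threshold counts as breaching it, and it handles free disk space as a "lower is worse" metric. `severity` and `memory_usage_percent` are hypothetical helper names.

```python
def memory_usage_percent(mem_used, mem_limit):
    """Memory usage as a percentage of the configured limit."""
    return 100.0 * mem_used / mem_limit


def severity(value, warning, critical, lower_is_worse=False):
    """Classify a reading as 'ok', 'warning', or 'critical'."""
    if lower_is_worse:
        # Negate so that the same >= comparisons work for shrinking metrics.
        value, warning, critical = -value, -warning, -critical
    if value >= critical:
        return "critical"
    if value >= warning:
        return "warning"
    return "ok"
```

For instance, 72% memory usage falls in the warning band, while 8 GB of free disk with `lower_is_worse=True` is also a warning, because it sits below the 10 GB threshold but above the 5 GB one.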

3. Data Retention Policy

  • Raw (history) data: 7 days
  • Trend data: 365 days

Related Links