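RabbitMQ HTTP API in Detail

Overview

The RabbitMQ HTTP API is a RESTful interface provided by the rabbitmq_management plugin that lets developers manage a RabbitMQ server over HTTP. It exposes largely the same management capabilities as the command-line tools and the web management UI, which makes it well suited to integration with automated operations systems, monitoring platforms, and applications.

Core features

  • System monitoring: retrieve server status, node information, and statistics
  • Resource management: create, delete, and query queues, exchanges, and bindings
  • User management: create and delete users, set permissions
  • Message operations: publish messages, fetch messages, purge queues
  • Connection management: list and close connections
  • Policy management: create and delete policies
  • Virtual host management: create and delete virtual hosts

API basics

Base URL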

http://hostname:15672/api/
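
Resource names placed after the base URL have to be percent-encoded as path segments; the default virtual host `/`, for example, becomes `%2F`, which is the value the client code later in this document passes around. The following is a minimal sketch of a URL builder under that assumption. `buildApiUrl` is a hypothetical helper name introduced for illustration, not part of RabbitMQ or any library.

```php
<?php

/**
 * Build a management API URL from raw (unencoded) path segments.
 * buildApiUrl is a hypothetical helper used only for illustration.
 */
function buildApiUrl(string $host, int $port, array $segments, array $query = []): string
{
    // rawurlencode() turns "/" into %2F and a space into %20,
    // so each segment stays a single path component.
    $path = implode('/', array_map('rawurlencode', $segments));
    $url = "http://{$host}:{$port}/api/{$path}";

    if ($query !== []) {
        $url .= '?' . http_build_query($query);
    }

    return $url;
}

// Queue "app.orders" in the default virtual host "/"
echo buildApiUrl('localhost', 15672, ['queues', '/', 'app.orders']), "\n";
```

Encoding each segment in one place avoids having to remember which callers pass raw names and which pass pre-encoded ones.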

Authentication

The HTTP API authenticates requests with HTTP Basic Authentication:

Authorization: Basic base64(username:password)
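
As a small illustration of the header format above, the sketch below builds the header value by hand. `basicAuthHeader` is a hypothetical helper name. When cURL is used, setting `CURLOPT_USERPWD` produces the same header automatically, so this is only meant to show what goes over the wire.

```php
<?php

/**
 * Build the Authorization header line for HTTP Basic Authentication.
 * basicAuthHeader is a hypothetical helper used only for illustration.
 */
function basicAuthHeader(string $username, string $password): string
{
    // The credentials are joined with ":" and then Base64-encoded.
    return 'Authorization: Basic ' . base64_encode("{$username}:{$password}");
}

// guest/guest are RabbitMQ's default credentials; by default they are
// only accepted for connections from localhost.
echo basicAuthHeader('guest', 'guest'), "\n";
```

Base64 is an encoding, not encryption, so Basic Authentication should only be sent over a trusted network or over HTTPS.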

Request format

Content-Type: application/json

Response format

Content-Type: application/json

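HTTP status codes

| Status code | Meaning |
| --- | --- |
| 200 | Success |
| 201 | Created |
| 204 | Success (no content returned) |
| 400 | Invalid request parameters |
| 401 | Authentication failed |
| 403 | Insufficient permissions |
| 404 | Resource not found |
| 409 | Resource conflict |
| 500 | Internal server error |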
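
The client code later in this document treats any 2xx status as success. For callers that want to react differently to the individual error codes listed above, a minimal sketch of a classifier could look like the following. The function name `classifyStatus` and the returned labels are assumptions made for this example.

```php
<?php

/**
 * Map an HTTP status code to a coarse outcome label.
 * classifyStatus and its labels are hypothetical, chosen for this sketch.
 */
function classifyStatus(int $code): string
{
    // 200, 201 and 204 all indicate that the request was carried out.
    if ($code >= 200 && $code < 300) {
        return 'success';
    }

    switch ($code) {
        case 400: return 'bad_request';
        case 401: return 'unauthenticated';
        case 403: return 'forbidden';
        case 404: return 'not_found';
        case 409: return 'conflict';
    }

    return $code >= 500 ? 'server_error' : 'unexpected';
}

echo classifyStatus(404), "\n";
```

Separating `unauthenticated` from `forbidden` is useful in practice: the first points at wrong credentials, the second at a user that lacks the required tag or permissions.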

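API endpoints by category

Overview and statistics API

| Endpoint | Method | Description |
| --- | --- | --- |
| /api/overview | GET | Get the system overview |
| /api/overview | POST | Reset statistics |
| /api/nodes | GET | List nodes |
| /api/nodes/{name} | GET | Get node details |
| /api/whoami | GET | Get the current user |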

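Connection management API

| Endpoint | Method | Description |
| --- | --- | --- |
| /api/connections | GET | List connections |
| /api/connections/{name} | GET | Get connection details |
| /api/connections/{name} | DELETE | Close a connection |
| /api/connections/{name}/channels | GET | List the channels of a connection |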

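Channel management API

| Endpoint | Method | Description |
| --- | --- | --- |
| /api/channels | GET | List channels |
| /api/channels/{name} | GET | Get channel details |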

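Exchange management API

| Endpoint | Method | Description |
| --- | --- | --- |
| /api/exchanges | GET | List all exchanges |
| /api/exchanges/{vhost} | GET | List the exchanges in a virtual host |
| /api/exchanges/{vhost}/{name} | GET | Get exchange details |
| /api/exchanges/{vhost}/{name} | PUT | Create an exchange |
| /api/exchanges/{vhost}/{name} | DELETE | Delete an exchange |
| /api/exchanges/{vhost}/{name}/publish | POST | Publish a message |
| /api/exchanges/{vhost}/{name}/bindings/source | GET | List bindings in which the exchange is the source |
| /api/exchanges/{vhost}/{name}/bindings/destination | GET | List bindings in which the exchange is the destination |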
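
The publish endpoint expects a JSON body with `properties`, `routing_key`, `payload`, and `payload_encoding`, which are the same fields the client class in this document sends. The sketch below shows one way to assemble that body, including the `base64` encoding for binary payloads. `buildPublishBody` is a hypothetical helper name.

```php
<?php

/**
 * Assemble the JSON body for POST /api/exchanges/{vhost}/{name}/publish.
 * buildPublishBody is a hypothetical helper used only for illustration.
 */
function buildPublishBody(string $routingKey, string $payload, array $properties = [], bool $binary = false): array
{
    return [
        // Cast to object so that an empty property list is encoded
        // as the JSON object {} rather than the JSON array [].
        'properties' => (object) $properties,
        'routing_key' => $routingKey,
        // Binary payloads are sent Base64-encoded and flagged as such.
        'payload' => $binary ? base64_encode($payload) : $payload,
        'payload_encoding' => $binary ? 'base64' : 'string',
    ];
}

echo json_encode(
    buildPublishBody('order.created', '{"order_id":12345}', ['delivery_mode' => 2])
), "\n";
```

Publishing over HTTP is convenient for tests and occasional administrative messages; for sustained throughput a regular AMQP client connection is the better fit.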

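Queue management API

| Endpoint | Method | Description |
| --- | --- | --- |
| /api/queues | GET | List all queues |
| /api/queues/{vhost} | GET | List the queues in a virtual host |
| /api/queues/{vhost}/{name} | GET | Get queue details |
| /api/queues/{vhost}/{name} | PUT | Create a queue |
| /api/queues/{vhost}/{name} | DELETE | Delete a queue |
| /api/queues/{vhost}/{name}/contents | DELETE | Purge a queue |
| /api/queues/{vhost}/{name}/get | POST | Fetch messages |
| /api/queues/{vhost}/{name}/bindings | GET | List the bindings of a queue |
| /api/queues/{vhost}/{name}/purge | POST | Purge a queue |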
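
The `get` endpoint takes a JSON body with `count`, `ackmode`, and `encoding`. The `ackmode` value decides whether fetched messages stay in the queue: the `*_requeue_true` modes put them back, the `*_requeue_false` modes remove them. A minimal sketch that validates these values before sending, with `buildGetBody` and `ACK_MODES` as hypothetical names:

```php
<?php

// The four ackmode values accepted by the get endpoint.
const ACK_MODES = [
    'ack_requeue_true',
    'ack_requeue_false',
    'reject_requeue_true',
    'reject_requeue_false',
];

/**
 * Assemble the JSON body for POST /api/queues/{vhost}/{name}/get.
 * buildGetBody is a hypothetical helper used only for illustration.
 * It defaults to ack_requeue_true so that peeking does not consume messages.
 */
function buildGetBody(int $count = 1, string $ackMode = 'ack_requeue_true', string $encoding = 'auto'): array
{
    if ($count < 1) {
        throw new InvalidArgumentException('count must be at least 1');
    }
    if (!in_array($ackMode, ACK_MODES, true)) {
        throw new InvalidArgumentException("Unknown ackmode: {$ackMode}");
    }
    if (!in_array($encoding, ['auto', 'base64'], true)) {
        throw new InvalidArgumentException("Unknown encoding: {$encoding}");
    }

    return ['count' => $count, 'ackmode' => $ackMode, 'encoding' => $encoding];
}

echo json_encode(buildGetBody(5)), "\n";
```

Note that the client class in this document defaults to `ack_requeue_false`, which removes the fetched messages from the queue.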

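Binding management API

| Endpoint | Method | Description |
| --- | --- | --- |
| /api/bindings | GET | List all bindings |
| /api/bindings/{vhost} | GET | List the bindings in a virtual host |
| /api/bindings/{vhost}/e/{exchange}/q/{queue} | GET | List the bindings between an exchange and a queue |
| /api/bindings/{vhost}/e/{exchange}/q/{queue} | POST | Create a binding |
| /api/bindings/{vhost}/e/{exchange}/q/{queue}/{props} | DELETE | Delete a binding |
| /api/bindings/{vhost}/e/{source}/e/{destination} | POST | Create an exchange-to-exchange binding |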

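User management API

| Endpoint | Method | Description |
| --- | --- | --- |
| /api/users | GET | List users |
| /api/users/{name} | GET | Get user details |
| /api/users/{name} | PUT | Create or update a user |
| /api/users/{name} | DELETE | Delete a user |
| /api/users/{name}/permissions | GET | List a user's permissions |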

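Permission management API

| Endpoint | Method | Description |
| --- | --- | --- |
| /api/permissions | GET | List all permissions |
| /api/permissions/{vhost}/{user} | GET | Get a user's permissions in a virtual host |
| /api/permissions/{vhost}/{user} | PUT | Set permissions |
| /api/permissions/{vhost}/{user} | DELETE | Delete permissions |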
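
A permission consists of three regular expressions, `configure`, `write`, and `read`, that are matched against resource names. The sketch below builds the request body and approximates the match locally with PHP's PCRE functions, which can help when checking a pattern before applying it. Both function names are hypothetical, and the local check is only an approximation of what the broker does.

```php
<?php

/**
 * Assemble the JSON body for PUT /api/permissions/{vhost}/{user}.
 * buildPermissionBody is a hypothetical helper used only for illustration.
 */
function buildPermissionBody(string $configure = '.*', string $write = '.*', string $read = '.*'): array
{
    return ['configure' => $configure, 'write' => $write, 'read' => $read];
}

/**
 * Local approximation of whether a permission pattern covers a resource name.
 * patternAllows is a hypothetical helper; the broker remains the authority.
 */
function patternAllows(string $pattern, string $resourceName): bool
{
    // An empty pattern behaves like '^$': it matches only the empty name.
    if ($pattern === '') {
        $pattern = '^$';
    }

    // Use "~" as the delimiter and escape any "~" inside the pattern.
    $regex = '~' . str_replace('~', '\~', $pattern) . '~';

    return preg_match($regex, $resourceName) === 1;
}

var_dump(patternAllows('^app\.', 'app.orders'));
var_dump(patternAllows('^app\.', 'test.queue'));
```

A consumer-only user, for example, can be given an empty `write` pattern so that it cannot publish to any named exchange.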

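Virtual host management API

| Endpoint | Method | Description |
| --- | --- | --- |
| /api/vhosts | GET | List virtual hosts |
| /api/vhosts/{name} | GET | Get virtual host details |
| /api/vhosts/{name} | PUT | Create a virtual host |
| /api/vhosts/{name} | DELETE | Delete a virtual host |
| /api/vhosts/{name}/permissions | GET | List the permissions in a virtual host |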

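Policy management API

| Endpoint | Method | Description |
| --- | --- | --- |
| /api/policies | GET | List all policies |
| /api/policies/{vhost} | GET | List the policies in a virtual host |
| /api/policies/{vhost}/{name} | GET | Get policy details |
| /api/policies/{vhost}/{name} | PUT | Create or update a policy |
| /api/policies/{vhost}/{name} | DELETE | Delete a policy |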
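
A policy body carries `pattern`, `definition`, `priority`, and `apply-to` (note the hyphen in the last key). The following sketch assembles such a body. `buildPolicyBody` is a hypothetical helper name, and rejecting an empty definition is a choice made by this sketch rather than documented broker behaviour.

```php
<?php

/**
 * Assemble the JSON body for PUT /api/policies/{vhost}/{name}.
 * buildPolicyBody is a hypothetical helper used only for illustration.
 */
function buildPolicyBody(string $pattern, array $definition, int $priority = 0, string $applyTo = 'all'): array
{
    // Sketch-level guard: a policy without any keys would not change anything.
    if ($definition === []) {
        throw new InvalidArgumentException('definition must contain at least one key');
    }

    return [
        'pattern' => $pattern,        // regular expression matched against names
        'definition' => $definition,  // e.g. ['max-length' => 10000]
        'priority' => $priority,      // higher priority wins when several policies match
        'apply-to' => $applyTo,       // e.g. 'all', 'queues' or 'exchanges'
    ];
}

echo json_encode(
    buildPolicyBody('^app\.', ['max-length' => 10000], 1, 'queues'),
    JSON_PRETTY_PRINT
), "\n";
```

Only one policy applies to a given queue or exchange at a time, which is why `priority` matters when patterns overlap.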

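Parameter management API

| Endpoint | Method | Description |
| --- | --- | --- |
| /api/parameters | GET | List all parameters |
| /api/parameters/{component} | GET | List the parameters of a component |
| /api/parameters/{component}/{vhost}/{name} | PUT | Set a parameter |
| /api/parameters/{component}/{vhost}/{name} | DELETE | Delete a parameter |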

PHP code examples

A complete RabbitMQ API client

php
<?php

class RabbitMQApiClient
{
    private $host;
    private $port;
    private $username;
    private $password;
    private $timeout;

    public function __construct(
        $host = 'localhost',
        $port = 15672,
        $username = 'guest',
        $password = 'guest',
        $timeout = 30
    ) {
        $this->host = $host;
        $this->port = $port;
        $this->username = $username;
        $this->password = $password;
        $this->timeout = $timeout;
    }

    /**
     * Send a request to the management API.
     *
     * $path is appended verbatim, so callers must pass already-encoded
     * segments (for example '%2F' for the default virtual host).
     */
    private function request($method, $path, $data = null, $query = [])
    {
        $url = "http://{$this->host}:{$this->port}/api/{$path}";
        
        if (!empty($query)) {
            $url .= '?' . http_build_query($query);
        }
        
        $ch = curl_init();
        
        $options = [
            CURLOPT_URL => $url,
            CURLOPT_RETURNTRANSFER => true,
            // cURL builds the HTTP Basic Authorization header from these credentials
            CURLOPT_USERPWD => "{$this->username}:{$this->password}",
            CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
            CURLOPT_CUSTOMREQUEST => $method,
            CURLOPT_TIMEOUT => $this->timeout,
            CURLOPT_FOLLOWLOCATION => true,
        ];
        
        if ($data !== null) {
            $options[CURLOPT_POSTFIELDS] = json_encode($data);
        }
        
        curl_setopt_array($ch, $options);
        
        $response = curl_exec($ch);
        $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        $error = curl_error($ch);
        curl_close($ch);
        
        return [
            'code' => $httpCode,
            // curl_exec() returns false on transport errors, and 201/204 responses have no body
            'body' => is_string($response) && $response !== '' ? json_decode($response, true) : null,
            'error' => $error,
            'success' => $httpCode >= 200 && $httpCode < 300
        ];
    }

    public function getOverview()
    {
        return $this->request('GET', 'overview');
    }

    public function resetStats()
    {
        return $this->request('POST', 'overview');
    }

    public function getNodes()
    {
        return $this->request('GET', 'nodes');
    }

    public function getNode($name)
    {
        return $this->request('GET', "nodes/{$name}");
    }

    public function whoami()
    {
        return $this->request('GET', 'whoami');
    }

    public function getConnections()
    {
        return $this->request('GET', 'connections');
    }

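    public function getConnection($name)
    {
        // Connection names contain spaces and '>' (e.g. "127.0.0.1:52066 -> 127.0.0.1:5672"),
        // so they are percent-encoded before being placed in the path.
        return $this->request('GET', 'connections/' . rawurlencode($name));
    }

    public function closeConnection($name, $reason = 'API requested')
    {
        // Note: the management API reads the close reason from the X-Reason request header;
        // the JSON body kept from the original example may be ignored by the server.
        return $this->request('DELETE', 'connections/' . rawurlencode($name), ['reason' => $reason]);
    }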

    public function getChannels()
    {
        return $this->request('GET', 'channels');
    }

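    public function getChannel($name)
    {
        // Channel names embed the connection name (spaces, '>', parentheses), so encode them
        return $this->request('GET', 'channels/' . rawurlencode($name));
    }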

    public function getExchanges($vhost = null)
    {
        if ($vhost) {
            return $this->request('GET', "exchanges/{$vhost}");
        }
        return $this->request('GET', 'exchanges');
    }

    public function getExchange($name, $vhost = '%2F')
    {
        return $this->request('GET', "exchanges/{$vhost}/{$name}");
    }

    public function createExchange($name, $type, $vhost = '%2F', $options = [])
    {
        $data = array_merge([
            'type' => $type,
            'auto_delete' => false,
            'durable' => true,
            'internal' => false,
            'arguments' => []
        ], $options);
        
        return $this->request('PUT', "exchanges/{$vhost}/{$name}", $data);
    }

    public function deleteExchange($name, $vhost = '%2F')
    {
        return $this->request('DELETE', "exchanges/{$vhost}/{$name}");
    }

    public function publishMessage($exchange, $routingKey, $payload, $vhost = '%2F', $properties = [])
    {
        $data = [
            // Caller-supplied properties override the defaults
            'properties' => array_merge([
                'delivery_mode' => 2, // 2 = persistent
                'content_type' => 'application/json'
            ], $properties),
            'routing_key' => $routingKey,
            'payload' => $payload,
            'payload_encoding' => 'string'
        ];
        
        return $this->request('POST', "exchanges/{$vhost}/{$exchange}/publish", $data);
    }

    public function getQueues($vhost = null)
    {
        if ($vhost) {
            return $this->request('GET', "queues/{$vhost}");
        }
        return $this->request('GET', 'queues');
    }

    public function getQueue($name, $vhost = '%2F')
    {
        return $this->request('GET', "queues/{$vhost}/{$name}");
    }

    public function createQueue($name, $vhost = '%2F', $options = [])
    {
        $data = array_merge([
            'auto_delete' => false,
            'durable' => true,
            'arguments' => []
        ], $options);
        
        return $this->request('PUT', "queues/{$vhost}/{$name}", $data);
    }

    public function deleteQueue($name, $vhost = '%2F')
    {
        return $this->request('DELETE', "queues/{$vhost}/{$name}");
    }

    public function purgeQueue($name, $vhost = '%2F')
    {
        return $this->request('DELETE', "queues/{$vhost}/{$name}/contents");
    }

    public function getMessages($queue, $count = 1, $ackMode = 'ack_requeue_false', $vhost = '%2F')
    {
        $data = [
            'count' => $count,
            'ackmode' => $ackMode,
            'encoding' => 'auto'
        ];
        
        return $this->request('POST', "queues/{$vhost}/{$queue}/get", $data);
    }

    public function getBindings($vhost = null)
    {
        if ($vhost) {
            return $this->request('GET', "bindings/{$vhost}");
        }
        return $this->request('GET', 'bindings');
    }

    public function createBinding($exchange, $queue, $routingKey = '', $vhost = '%2F', $arguments = [])
    {
        $data = [
            'routing_key' => $routingKey,
            'arguments' => $arguments
        ];
        
        return $this->request('POST', "bindings/{$vhost}/e/{$exchange}/q/{$queue}", $data);
    }

    public function deleteBinding($exchange, $queue, $routingKey = '', $vhost = '%2F')
    {
        $bindings = $this->request('GET', "bindings/{$vhost}/e/{$exchange}/q/{$queue}");
        
        if ($bindings['success']) {
            foreach ($bindings['body'] as $binding) {
                if ($binding['routing_key'] === $routingKey) {
                    $props = $binding['properties_key'];
                    return $this->request('DELETE', "bindings/{$vhost}/e/{$exchange}/q/{$queue}/{$props}");
                }
            }
        }
        
        return ['success' => false, 'code' => 404, 'body' => ['error' => 'Binding not found']];
    }

    public function getUsers()
    {
        return $this->request('GET', 'users');
    }

    public function getUser($name)
    {
        return $this->request('GET', "users/{$name}");
    }

    public function createUser($name, $password, $tags = '')
    {
        $data = [
            'password' => $password,
            'tags' => $tags
        ];
        
        return $this->request('PUT', "users/{$name}", $data);
    }

    public function deleteUser($name)
    {
        return $this->request('DELETE', "users/{$name}");
    }

    public function getPermissions()
    {
        return $this->request('GET', 'permissions');
    }

    public function getPermission($vhost, $user)
    {
        return $this->request('GET', "permissions/{$vhost}/{$user}");
    }

    public function setPermission($user, $vhost, $configure = '.*', $write = '.*', $read = '.*')
    {
        $data = [
            'configure' => $configure,
            'write' => $write,
            'read' => $read
        ];
        
        return $this->request('PUT', "permissions/{$vhost}/{$user}", $data);
    }

    public function deletePermission($user, $vhost)
    {
        return $this->request('DELETE', "permissions/{$vhost}/{$user}");
    }

    public function getVhosts()
    {
        return $this->request('GET', 'vhosts');
    }

    public function getVhost($name)
    {
        return $this->request('GET', "vhosts/{$name}");
    }

    public function createVhost($name)
    {
        return $this->request('PUT', "vhosts/{$name}");
    }

    public function deleteVhost($name)
    {
        return $this->request('DELETE', "vhosts/{$name}");
    }

    public function getPolicies($vhost = null)
    {
        if ($vhost) {
            return $this->request('GET', "policies/{$vhost}");
        }
        return $this->request('GET', 'policies');
    }

    public function createPolicy($name, $pattern, $definition, $vhost = '%2F', $priority = 0, $applyTo = 'all')
    {
        $data = [
            'pattern' => $pattern,
            'definition' => $definition,
            'priority' => $priority,
            'apply-to' => $applyTo
        ];
        
        return $this->request('PUT', "policies/{$vhost}/{$name}", $data);
    }

    public function deletePolicy($name, $vhost = '%2F')
    {
        return $this->request('DELETE', "policies/{$vhost}/{$name}");
    }

    public function getParameters($component = null)
    {
        if ($component) {
            return $this->request('GET', "parameters/{$component}");
        }
        return $this->request('GET', 'parameters');
    }

    public function setParameter($component, $name, $value, $vhost = '%2F')
    {
        $data = [
            'value' => $value
        ];
        
        return $this->request('PUT', "parameters/{$component}/{$vhost}/{$name}", $data);
    }

    public function deleteParameter($component, $name, $vhost = '%2F')
    {
        return $this->request('DELETE', "parameters/{$component}/{$vhost}/{$name}");
    }

    public function exportDefinitions($vhost = null)
    {
        // Per-vhost definitions are addressed by path (definitions/{vhost}), not by a query parameter
        $path = $vhost ? "definitions/{$vhost}" : 'definitions';
        return $this->request('GET', $path);
    }

    public function importDefinitions($definitions)
    {
        return $this->request('POST', 'definitions', $definitions);
    }
}

Usage example

php
<?php

require_once 'RabbitMQApiClient.php';

$api = new RabbitMQApiClient('localhost', 15672, 'admin', 'admin123');

// Read the cluster overview
$overview = $api->getOverview();
echo "RabbitMQ version: {$overview['body']['rabbitmq_version']}\n";
echo "Erlang version: {$overview['body']['erlang_version']}\n";
echo "Total queues: {$overview['body']['object_totals']['queues']}\n";
echo "Total connections: {$overview['body']['object_totals']['connections']}\n";

// Create a user and restrict it to resources whose names start with "app."
$result = $api->createUser('app_user', 'secure_password', 'monitoring');
echo "Create user: " . ($result['success'] ? 'succeeded' : 'failed') . "\n";

$result = $api->setPermission('app_user', '%2F', '^app\.', '^app\.', '^app\.');
echo "Set permissions: " . ($result['success'] ? 'succeeded' : 'failed') . "\n";

$result = $api->createVhost('production');
echo "Create virtual host: " . ($result['success'] ? 'succeeded' : 'failed') . "\n";

// Create a queue with a 24-hour message TTL and a length limit
$result = $api->createQueue('app.orders', '%2F', [
    'arguments' => [
        'x-message-ttl' => 86400000,
        'x-max-length' => 10000
    ]
]);
echo "Create queue: " . ($result['success'] ? 'succeeded' : 'failed') . "\n";

$result = $api->createExchange('app.exchange', 'topic', '%2F');
echo "Create exchange: " . ($result['success'] ? 'succeeded' : 'failed') . "\n";

$result = $api->createBinding('app.exchange', 'app.orders', 'order.*');
echo "Create binding: " . ($result['success'] ? 'succeeded' : 'failed') . "\n";

// Publish a persistent message and read it back
$result = $api->publishMessage(
    'app.exchange',
    'order.created',
    json_encode(['order_id' => 12345, 'amount' => 99.99]),
    '%2F',
    ['delivery_mode' => 2]
);
echo "Publish message: " . ($result['success'] ? 'succeeded' : 'failed') . "\n";

$messages = $api->getMessages('app.orders', 5);
if ($messages['success']) {
    foreach ($messages['body'] as $msg) {
        echo "Message: {$msg['payload']}\n";
    }
}

// ha-mode and ha-sync-mode configure classic mirrored queues,
// a feature that was removed in RabbitMQ 4.0
$haPolicy = [
    'ha-mode' => 'all',
    'ha-sync-mode' => 'automatic'
];
$result = $api->createPolicy('ha-all', '^ha\.', $haPolicy, '%2F', 1, 'queues');
echo "Create policy: " . ($result['success'] ? 'succeeded' : 'failed') . "\n";

$queues = $api->getQueues();
foreach ($queues['body'] as $queue) {
    echo "Queue: {$queue['name']}, messages: {$queue['messages']}\n";
}

// Export the broker definitions to a JSON file
$definitions = $api->exportDefinitions();
file_put_contents('rabbitmq_definitions.json', json_encode($definitions['body'], JSON_PRETTY_PRINT));
echo "Definitions exported to rabbitmq_definitions.json\n";

Monitoring class

php
<?php

class RabbitMQMonitor
{
    private $api;
    private $thresholds;

    public function __construct(RabbitMQApiClient $api, array $thresholds = [])
    {
        $this->api = $api;
        $this->thresholds = array_merge([
            'queue_messages_warning' => 1000,
            'queue_messages_critical' => 10000,
            'consumer_min' => 1,
            'connection_max' => 1000,
            'memory_high_watermark' => 0.8
        ], $thresholds);
    }

    public function checkSystemHealth()
    {
        $overview = $this->api->getOverview();
        
        if (!$overview['success']) {
            return [
                'status' => 'error',
                'message' => 'Unable to connect to RabbitMQ'
            ];
        }
        
        $data = $overview['body'];
        $alerts = [];
        
        $connections = $data['object_totals']['connections'] ?? 0;
        if ($connections > $this->thresholds['connection_max']) {
            $alerts[] = [
                'level' => 'warning',
                'message' => "Too many connections: {$connections}"
            ];
        }
        
        return [
            'status' => empty($alerts) ? 'healthy' : 'warning',
            'rabbitmq_version' => $data['rabbitmq_version'],
            'erlang_version' => $data['erlang_version'],
            'connections' => $connections,
            'channels' => $data['object_totals']['channels'],
            'queues' => $data['object_totals']['queues'],
            'exchanges' => $data['object_totals']['exchanges'],
            'consumers' => $data['object_totals']['consumers'],
            'messages' => $data['queue_totals']['messages'] ?? 0,
            'message_rates' => [
                'publish' => $data['message_stats']['publish_details']['rate'] ?? 0,
                'deliver' => $data['message_stats']['deliver_get_details']['rate'] ?? 0
            ],
            'alerts' => $alerts
        ];
    }

    public function checkQueueHealth($queueName, $vhost = '%2F')
    {
        $result = $this->api->getQueue($queueName, $vhost);
        
        if (!$result['success']) {
            return [
                'status' => 'error',
                'message' => "Queue does not exist or is not accessible: {$queueName}"
            ];
        }
        
        $queue = $result['body'];
        $alerts = [];
        $status = 'healthy';
        
        $messages = $queue['messages'] ?? 0;
        if ($messages > $this->thresholds['queue_messages_critical']) {
            $status = 'critical';
            $alerts[] = [
                'level' => 'critical',
                'message' => "Severe message backlog in queue: {$messages}"
            ];
        } elseif ($messages > $this->thresholds['queue_messages_warning']) {
            $status = 'warning';
            $alerts[] = [
                'level' => 'warning',
                'message' => "Message backlog in queue: {$messages}"
            ];
        }
        
        $consumers = $queue['consumers'] ?? 0;
        if ($consumers < $this->thresholds['consumer_min']) {
            // Do not downgrade a 'critical' status set by the backlog check above
            if ($status === 'healthy') {
                $status = 'warning';
            }
            $alerts[] = [
                'level' => 'warning',
                'message' => "Too few consumers: {$consumers}"
            ];
        }
        
        return [
            'status' => $status,
            'queue' => $queueName,
            'vhost' => $vhost,
            'messages' => $messages,
            'messages_ready' => $queue['messages_ready'] ?? 0,
            'messages_unacked' => $queue['messages_unacknowledged'] ?? 0,
            'consumers' => $consumers,
            'memory' => $queue['memory'] ?? 0,
            'state' => $queue['state'] ?? 'unknown',
            'alerts' => $alerts
        ];
    }

    public function checkAllQueues()
    {
        $result = $this->api->getQueues();
        
        if (!$result['success']) {
            return ['status' => 'error', 'message' => 'Unable to fetch the queue list'];
        }
        
        $queueHealth = [];
        $criticalCount = 0;
        $warningCount = 0;
        
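        foreach ($result['body'] as $queue) {
            // The API returns raw names (e.g. "/"), while the client expects URL-encoded path segments
            $health = $this->checkQueueHealth(rawurlencode($queue['name']), rawurlencode($queue['vhost']));
            $queueHealth[] = $health;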
            
            if ($health['status'] === 'critical') {
                $criticalCount++;
            } elseif ($health['status'] === 'warning') {
                $warningCount++;
            }
        }
        
        return [
            'status' => $criticalCount > 0 ? 'critical' : ($warningCount > 0 ? 'warning' : 'healthy'),
            'total_queues' => count($queueHealth),
            'critical_count' => $criticalCount,
            'warning_count' => $warningCount,
            'queues' => $queueHealth
        ];
    }

    public function checkNodeHealth()
    {
        $result = $this->api->getNodes();
        
        if (!$result['success']) {
            return ['status' => 'error', 'message' => 'Unable to fetch node information'];
        }
        
        $nodes = [];
        $alerts = [];
        
        foreach ($result['body'] as $node) {
            $nodeStatus = [
                'name' => $node['name'],
                'type' => $node['type'],
                'running' => $node['running'] ?? false,
                'mem_used' => $node['mem_used'] ?? 0,
                'mem_limit' => $node['mem_limit'] ?? 0,
                'mem_alarm' => $node['mem_alarm'] ?? false,
                'disk_free' => $node['disk_free'] ?? 0,
                'disk_free_limit' => $node['disk_free_limit'] ?? 0,
                'disk_free_alarm' => $node['disk_free_alarm'] ?? false
            ];
            
            if ($nodeStatus['mem_alarm']) {
                $alerts[] = [
                    'level' => 'critical',
                    'node' => $node['name'],
                    'message' => 'Memory alarm triggered'
                ];
            }
            
            if ($nodeStatus['disk_free_alarm']) {
                $alerts[] = [
                    'level' => 'critical',
                    'node' => $node['name'],
                    'message' => 'Disk space alarm triggered'
                ];
            }
            
            if (!$nodeStatus['running']) {
                $alerts[] = [
                    'level' => 'critical',
                    'node' => $node['name'],
                    'message' => 'Node is not running'
                ];
            }
            
            $nodes[] = $nodeStatus;
        }
        
        return [
            'status' => empty($alerts) ? 'healthy' : 'critical',
            'nodes' => $nodes,
            'alerts' => $alerts
        ];
    }

    public function generateHealthReport()
    {
        $system = $this->checkSystemHealth();
        $queues = $this->checkAllQueues();
        $nodes = $this->checkNodeHealth();
        
        $report = "=== RabbitMQ Health Report ===\n";
        $report .= "Generated at: " . date('Y-m-d H:i:s') . "\n\n";
        
        // A failed check returns only 'status' and 'message',
        // so stop here instead of reading keys that are not present
        foreach ([$system, $queues, $nodes] as $check) {
            if ($check['status'] === 'error') {
                return $report . "Error: {$check['message']}\n";
            }
        }
        
        $report .= "## System status\n";
        $report .= "Status: {$system['status']}\n";
        $report .= "Version: RabbitMQ {$system['rabbitmq_version']}, Erlang {$system['erlang_version']}\n";
        $report .= "Connections: {$system['connections']}\n";
        $report .= "Channels: {$system['channels']}\n";
        $report .= "Queues: {$system['queues']}\n";
        $report .= "Total messages: {$system['messages']}\n\n";
        
        $report .= "## Queue status\n";
        $report .= "Total queues: {$queues['total_queues']}\n";
        $report .= "Critical: {$queues['critical_count']}\n";
        $report .= "Warnings: {$queues['warning_count']}\n\n";
        
        $report .= "## Node status\n";
        $report .= "Nodes: " . count($nodes['nodes']) . "\n";
        foreach ($nodes['nodes'] as $node) {
            $status = $node['running'] ? 'running' : 'stopped';
            $report .= "  - {$node['name']}: {$status}\n";
        }
        
        if (!empty($system['alerts']) || !empty($nodes['alerts'])) {
            $report .= "\n## Alerts\n";
            foreach (array_merge($system['alerts'], $nodes['alerts']) as $alert) {
                $report .= "  [{$alert['level']}] {$alert['message']}\n";
            }
        }
        
        return $report;
    }
}

$api = new RabbitMQApiClient('localhost', 15672, 'admin', 'admin123');
$monitor = new RabbitMQMonitor($api, [
    'queue_messages_warning' => 5000,
    'queue_messages_critical' => 50000
]);

$health = $monitor->checkQueueHealth('app.orders');
print_r($health);

echo $monitor->generateHealthReport();

Batch operations class

php
<?php

class RabbitMQBatchOperations
{
    private $api;

    public function __construct(RabbitMQApiClient $api)
    {
        $this->api = $api;
    }

    public function setupApplication($config)
    {
        $results = [];
        
        if (!empty($config['vhosts'])) {
            foreach ($config['vhosts'] as $vhost) {
                $results['vhosts'][$vhost] = $this->api->createVhost($vhost);
            }
        }
        
        if (!empty($config['users'])) {
            foreach ($config['users'] as $username => $userData) {
                $results['users'][$username] = $this->api->createUser(
                    $username,
                    $userData['password'],
                    $userData['tags'] ?? ''
                );
                
                if (!empty($userData['permissions'])) {
                    foreach ($userData['permissions'] as $vhost => $perms) {
                        $results['permissions'][$username][$vhost] = $this->api->setPermission(
                            $username,
                            $vhost,
                            $perms['configure'] ?? '.*',
                            $perms['write'] ?? '.*',
                            $perms['read'] ?? '.*'
                        );
                    }
                }
            }
        }
        
        if (!empty($config['exchanges'])) {
            foreach ($config['exchanges'] as $name => $exchangeConfig) {
                $results['exchanges'][$name] = $this->api->createExchange(
                    $name,
                    $exchangeConfig['type'],
                    $exchangeConfig['vhost'] ?? '%2F',
                    $exchangeConfig['options'] ?? []
                );
            }
        }
        
        if (!empty($config['queues'])) {
            foreach ($config['queues'] as $name => $queueConfig) {
                $results['queues'][$name] = $this->api->createQueue(
                    $name,
                    $queueConfig['vhost'] ?? '%2F',
                    $queueConfig['options'] ?? []
                );
            }
        }
        
        if (!empty($config['bindings'])) {
            foreach ($config['bindings'] as $binding) {
                $results['bindings'][$binding['exchange'] . ':' . $binding['queue']] = $this->api->createBinding(
                    $binding['exchange'],
                    $binding['queue'],
                    $binding['routing_key'] ?? '',
                    $binding['vhost'] ?? '%2F',
                    $binding['arguments'] ?? []
                );
            }
        }
        
        if (!empty($config['policies'])) {
            foreach ($config['policies'] as $name => $policyConfig) {
                $results['policies'][$name] = $this->api->createPolicy(
                    $name,
                    $policyConfig['pattern'],
                    $policyConfig['definition'],
                    $policyConfig['vhost'] ?? '%2F',
                    $policyConfig['priority'] ?? 0,
                    $policyConfig['apply_to'] ?? 'all'
                );
            }
        }
        
        return $results;
    }

    public function teardownApplication($config)
    {
        $results = [];
        
        if (!empty($config['bindings'])) {
            foreach ($config['bindings'] as $binding) {
                $results['bindings'][] = $this->api->deleteBinding(
                    $binding['exchange'],
                    $binding['queue'],
                    $binding['routing_key'] ?? '',
                    $binding['vhost'] ?? '%2F'
                );
            }
        }
        
        if (!empty($config['queues'])) {
            foreach ($config['queues'] as $name => $queueConfig) {
                $results['queues'][$name] = $this->api->deleteQueue(
                    $name,
                    $queueConfig['vhost'] ?? '%2F'
                );
            }
        }
        
        if (!empty($config['exchanges'])) {
            foreach ($config['exchanges'] as $name => $exchangeConfig) {
                $results['exchanges'][$name] = $this->api->deleteExchange(
                    $name,
                    $exchangeConfig['vhost'] ?? '%2F'
                );
            }
        }
        
        if (!empty($config['users'])) {
            foreach ($config['users'] as $username => $userData) {
                $results['users'][$username] = $this->api->deleteUser($username);
            }
        }
        
        if (!empty($config['vhosts'])) {
            foreach ($config['vhosts'] as $vhost) {
                $results['vhosts'][$vhost] = $this->api->deleteVhost($vhost);
            }
        }
        
        return $results;
    }

    public function purgeAllQueues($vhost = null)
    {
        $queues = $this->api->getQueues($vhost);
        $results = [];
        
        if ($queues['success']) {
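            foreach ($queues['body'] as $queue) {
                // The API returns raw names (e.g. "/"); purgeQueue() expects URL-encoded path segments
                $results[$queue['name']] = $this->api->purgeQueue(
                    rawurlencode($queue['name']),
                    rawurlencode($queue['vhost'])
                );
            }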
        }
        
        return $results;
    }

    public function closeAllConnections($vhost = null)
    {
        $connections = $this->api->getConnections();
        $results = [];
        
        if ($connections['success']) {
            foreach ($connections['body'] as $conn) {
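                // The API reports the raw vhost name, so accept both "/" and "%2F" from the caller
                if ($vhost === null || ($conn['vhost'] ?? null) === rawurldecode($vhost)) {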
                    $results[$conn['name']] = $this->api->closeConnection($conn['name']);
                }
            }
        }
        
        return $results;
    }

    public function backupConfiguration($filename)
    {
        $definitions = $this->api->exportDefinitions();
        
        if ($definitions['success']) {
            file_put_contents($filename, json_encode($definitions['body'], JSON_PRETTY_PRINT));
            return true;
        }
        
        return false;
    }

    public function restoreConfiguration($filename)
    {
        if (!file_exists($filename)) {
            return false;
        }
        
        $definitions = json_decode(file_get_contents($filename), true);
        return $this->api->importDefinitions($definitions);
    }
}

$config = [
    'vhosts' => ['%2Fapp'], // URL-encoded form of "/app"; the client inserts it into the path verbatim
    'users' => [
        'app_producer' => [
            'password' => 'producer_pass',
            'tags' => '',
            'permissions' => [
                '%2Fapp' => [
                    'configure' => '^app\.',
                    'write' => '^app\.',
                    'read' => '^app\.'
                ]
            ]
        ],
        'app_consumer' => [
            'password' => 'consumer_pass',
            'tags' => '',
            'permissions' => [
                '%2Fapp' => [
                    'configure' => '^app\.',
                    'write' => '',
                    'read' => '^app\.'
                ]
            ]
        ]
    ],
    'exchanges' => [
        'app.orders' => [
            'type' => 'topic',
            'vhost' => '%2Fapp'
        ],
        'app.notifications' => [
            'type' => 'fanout',
            'vhost' => '%2Fapp'
        ]
    ],
    'queues' => [
        'app.orders.created' => [
            'vhost' => '%2Fapp',
            'options' => [
                'arguments' => [
                    'x-message-ttl' => 86400000,
                    'x-max-length' => 10000
                ]
            ]
        ],
        'app.orders.completed' => [
            'vhost' => '%2Fapp'
        ],
        'app.notifications.email' => [
            'vhost' => '%2Fapp'
        ]
    ],
    'bindings' => [
        [
            'exchange' => 'app.orders',
            'queue' => 'app.orders.created',
            'routing_key' => 'order.created',
            'vhost' => '%2Fapp'
        ],
        [
            'exchange' => 'app.orders',
            'queue' => 'app.orders.completed',
            'routing_key' => 'order.completed',
            'vhost' => '%2Fapp'
        ],
        [
            'exchange' => 'app.notifications',
            'queue' => 'app.notifications.email',
            'routing_key' => '',
            'vhost' => '%2Fapp'
        ]
    ],
    'policies' => [
        'ha-all' => [
            'pattern' => '^app\.',
            'definition' => [
                'ha-mode' => 'all',
                'ha-sync-mode' => 'automatic'
            ],
            'vhost' => '%2Fapp',
            'apply_to' => 'queues'
        ]
    ]
];

$api = new RabbitMQApiClient('localhost', 15672, 'admin', 'admin123');
$batch = new RabbitMQBatchOperations($api);

$results = $batch->setupApplication($config);
print_r($results);

$batch->backupConfiguration('/tmp/rabbitmq_backup_' . date('YmdHis') . '.json');

Practical Use Cases

Scenario 1: CI/CD Integration

php
<?php

class CICDIntegration
{
    private $api;

    public function __construct(RabbitMQApiClient $api)
    {
        $this->api = $api;
    }

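    public function deployEnvironment($env)
    {
        $configFile = "/config/rabbitmq/{$env}.json";

        // Fail fast when the environment config is missing or is not valid JSON.
        $json = is_readable($configFile) ? file_get_contents($configFile) : false;
        $config = $json === false ? null : json_decode($json, true);
        if (!is_array($config)) {
            throw new RuntimeException("Cannot load RabbitMQ config: {$configFile}");
        }

        $batch = new RabbitMQBatchOperations($this->api);
        return $batch->setupApplication($config);
    }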

    public function runSmokeTests($env)
    {
        $results = [];

        // Temporary resources used only for this smoke test.
        $testExchange = "test.{$env}.exchange";
        $testQueue = "test.{$env}.queue";
        $testMessage = ['test' => true, 'timestamp' => time()];

        // Declare the exchange and queue, then bind them with routing key "test".
        $results['create_exchange'] = $this->api->createExchange($testExchange, 'direct');
        $results['create_queue'] = $this->api->createQueue($testQueue);
        $results['create_binding'] = $this->api->createBinding($testExchange, $testQueue, 'test');

        // Publish one message through the exchange.
        $results['publish'] = $this->api->publishMessage(
            $testExchange,
            'test',
            json_encode($testMessage)
        );

        // Read it back; the check passes only if at least one message was returned.
        $messages = $this->api->getMessages($testQueue, 1);
        $results['consume'] = !empty($messages['success'])
            && is_array($messages['body'] ?? null)
            && count($messages['body']) > 0;

        // Clean up; deleting the queue also removes its binding.
        $results['delete_queue'] = $this->api->deleteQueue($testQueue);
        $results['delete_exchange'] = $this->api->deleteExchange($testExchange);

        return $results;
    }
}

Scenario 2: Monitoring and Alerting Integration

php
<?php

class AlertIntegration
{
    private $api;
    private $webhookUrl;

    public function __construct(RabbitMQApiClient $api, $webhookUrl)
    {
        $this->api = $api;
        $this->webhookUrl = $webhookUrl;
    }

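    public function sendAlert($level, $message, $details = [])
    {
        $payload = [
            'level' => $level,
            'message' => $message,
            'details' => $details,
            'timestamp' => date('c')
        ];

        $ch = curl_init($this->webhookUrl);
        curl_setopt_array($ch, [
            CURLOPT_POST => true,
            CURLOPT_POSTFIELDS => json_encode($payload),
            CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
            CURLOPT_RETURNTRANSFER => true,
            // Keep a slow webhook from blocking the monitoring loop.
            CURLOPT_TIMEOUT => 10
        ]);
        $response = curl_exec($ch);
        if ($response === false) {
            // Record delivery failures instead of dropping them silently.
            error_log('Alert webhook failed: ' . curl_error($ch));
        }
        curl_close($ch);

        return $response !== false;
    }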

    public function monitorAndAlert()
    {
        $monitor = new RabbitMQMonitor($this->api);
        
        // Overall system health: anything other than "healthy" raises a warning.
        $systemHealth = $monitor->checkSystemHealth();
        if ($systemHealth['status'] !== 'healthy') {
            $this->sendAlert('warning', 'RabbitMQ system health is abnormal', $systemHealth);
        }

        // Per-queue health: alert only on queues reported as critical.
        $queueHealth = $monitor->checkAllQueues();
        foreach ($queueHealth['queues'] as $queue) {
            if ($queue['status'] === 'critical') {
                $this->sendAlert('critical', "Queue {$queue['queue']} is in a critical state", $queue);
            }
        }

        // Node health: any unhealthy node is treated as critical.
        $nodeHealth = $monitor->checkNodeHealth();
        if ($nodeHealth['status'] !== 'healthy') {
            $this->sendAlert('critical', 'RabbitMQ node is unhealthy', $nodeHealth);
        }
    }
}

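Common Problems and Solutions

Problem 1: Authentication Failure

Error message: HTTP 401 Unauthorized

Solution

php
// Make sure the username and password are correct
$api = new RabbitMQApiClient('localhost', 15672, 'admin', 'correct_password');

// Inspect the current user and its tags
$whoami = $api->whoami();
print_r($whoami);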
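
When a 401 persists, it can help to look at exactly what the client sends. The sketch below builds the Basic Authentication header described in the API basics section and maps the two authentication-related status codes to a hint. Both function names are hypothetical helpers for illustration and are not part of the `RabbitMQApiClient` shown in this document.

```php
<?php

/**
 * Build the HTTP Basic Authentication header line.
 * Hypothetical helper for illustration; not part of RabbitMQApiClient.
 */
function buildBasicAuthHeader(string $username, string $password): string
{
    // Basic auth is base64("username:password").
    return 'Authorization: Basic ' . base64_encode($username . ':' . $password);
}

/**
 * Map an HTTP status code from the management API to a short hint.
 */
function describeAuthStatus(int $statusCode): string
{
    switch ($statusCode) {
        case 401:
            return 'authentication failed: check the username, password, and user tags';
        case 403:
            return 'authenticated, but the user lacks permission for this resource';
        default:
            return 'not an authentication problem';
    }
}

echo buildBasicAuthHeader('guest', 'guest'), PHP_EOL;
echo describeAuthStatus(401), PHP_EOL;
```

A user without a management-level tag is also rejected by the management plugin, so a 401 does not always mean the password is wrong.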

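Problem 2: Virtual Host Encoding

Error message: HTTP 404 Not Found

Cause: the virtual host name must be URL-encoded in the request path

Solution

php
// The default virtual host "/" must be encoded as "%2F"
$api->getQueue('my_queue', '%2F');

// Custom encoding helper; rawurlencode() encodes a space as "%20",
// whereas urlencode() would produce "+", which is wrong inside a URL path
function encodeVhost($vhost) {
    return rawurlencode($vhost);
}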
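
To make the encoding rule concrete, here is a minimal sketch that builds a queue URL from raw names. `encodeVhostSegment` and `buildQueueUrl` are hypothetical names used only for illustration. The sketch assumes the caller passes the raw vhost name (such as `/`) and encodes it exactly once.

```php
<?php

/**
 * Encode a vhost name for use as a single URL path segment.
 * Hypothetical helper name, shown for illustration only.
 */
function encodeVhostSegment(string $vhost): string
{
    // rawurlencode follows RFC 3986: "/" becomes "%2F" and a space becomes "%20".
    return rawurlencode($vhost);
}

/**
 * Build the URL for GET /api/queues/{vhost}/{name} from raw (unencoded) names.
 */
function buildQueueUrl(string $baseUrl, string $vhost, string $queue): string
{
    return rtrim($baseUrl, '/') . '/api/queues/'
        . encodeVhostSegment($vhost) . '/' . rawurlencode($queue);
}

echo buildQueueUrl('http://localhost:15672', '/', 'my_queue'), PHP_EOL;
// prints http://localhost:15672/api/queues/%2F/my_queue

// urlencode() turns a space into "+", which is not read as a space in a URL path.
echo urlencode('my vhost'), ' vs ', rawurlencode('my vhost'), PHP_EOL;
// prints my+vhost vs my%20vhost
```

Encoding a value that is already encoded produces `%252F` instead of `%2F`, which also leads to a 404, so a given client should either accept raw names or pre-encoded names, not both.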

Problem 3: Request Timeout

Error message: cURL timeout

Solution

php
// Increase the timeout
$api = new RabbitMQApiClient('localhost', 15672, 'admin', 'password', 60);

// Or use asynchronous requests

Problem 4: Message Encoding

Symptom: message content is garbled

Solution

php
// Specify the encoding when publishing the message
$api->publishMessage(
    'exchange',
    'routing.key',
    json_encode($data, JSON_UNESCAPED_UNICODE),
    '%2F',
    ['content_type' => 'application/json; charset=utf-8']
);
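
The publish endpoint (`POST /api/exchanges/{vhost}/{name}/publish`) takes a JSON body with a `payload_encoding` field, which is either `string` (the payload is treated as UTF-8 text) or `base64`. The sketch below picks the encoding based on whether the payload is valid UTF-8. `buildPublishBody` is a hypothetical helper; how `RabbitMQApiClient::publishMessage` builds its request body is not shown in this document.

```php
<?php

/**
 * Build the JSON body for POST /api/exchanges/{vhost}/{name}/publish.
 * Hypothetical helper for illustration; not part of RabbitMQApiClient.
 */
function buildPublishBody(string $routingKey, string $payload, array $properties = []): string
{
    // preg_match with the /u flag returns 1 only when the subject is valid UTF-8.
    $isUtf8 = preg_match('//u', $payload) === 1;

    $body = [
        // Cast to object so an empty property list is encoded as {} rather than [].
        'properties' => (object) $properties,
        'routing_key' => $routingKey,
        // Valid UTF-8 is sent as-is; anything else is base64-encoded.
        'payload' => $isUtf8 ? $payload : base64_encode($payload),
        'payload_encoding' => $isUtf8 ? 'string' : 'base64',
    ];

    return json_encode($body, JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR);
}

echo buildPublishBody('order.created', '订单已创建', ['content_type' => 'text/plain']), PHP_EOL;
echo buildPublishBody('order.created', "\xff\xfe"), PHP_EOL;
```

If text still arrives garbled, it is worth checking whether the producer serialized it in an encoding other than UTF-8 before it reached this step.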

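Best Practices

1. Security configuration

  • Use HTTPS in production
  • Create a dedicated API user
  • Apply the principle of least privilege
  • Rotate passwords regularly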
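
As one way to apply the HTTPS recommendation, the following sketch collects cURL options that verify the server certificate. It assumes the PHP cURL extension is loaded. `buildSecureCurlOptions` is a hypothetical helper, and the host name, port, user, and CA bundle path in the usage comment are placeholders for your own setup.

```php
<?php

/**
 * Build cURL options for calling the management API over HTTPS.
 * Hypothetical helper; the CA bundle path is an assumption about your setup.
 */
function buildSecureCurlOptions(string $username, string $password, string $caFile): array
{
    return [
        CURLOPT_RETURNTRANSFER => true,
        CURLOPT_HTTPAUTH => CURLAUTH_BASIC,
        CURLOPT_USERPWD => $username . ':' . $password,
        // Verify the server certificate and that it matches the host name.
        CURLOPT_SSL_VERIFYPEER => true,
        CURLOPT_SSL_VERIFYHOST => 2,
        CURLOPT_CAINFO => $caFile,
        CURLOPT_TIMEOUT => 30,
    ];
}

// Usage (host, port, user, and paths are placeholders):
// $ch = curl_init('https://rabbitmq.example.com:15671/api/overview');
// curl_setopt_array($ch, buildSecureCurlOptions(
//     'api_user',
//     (string) getenv('RABBITMQ_API_PASSWORD'),
//     '/etc/ssl/certs/ca.pem'
// ));
```

Reading the password from an environment variable, as in the usage comment, keeps credentials out of source control and makes rotation easier.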

2. Error handling

  • Check HTTP status codes
  • Handle network errors
  • Implement a retry mechanism
  • Log errors
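
The retry and logging points above can be sketched as a small wrapper with exponential backoff. `retryWithBackoff` is a hypothetical helper, and it assumes the wrapped operation throws a `RuntimeException` on failure, which may differ from how the client in this document reports errors.

```php
<?php

/**
 * Call $operation until it succeeds or $maxAttempts is reached.
 * Hypothetical helper: $operation must throw RuntimeException on failure
 * and return a value on success. It receives the attempt number (1-based).
 */
function retryWithBackoff(callable $operation, int $maxAttempts = 3, int $baseDelayMs = 200)
{
    $attempt = 0;
    while (true) {
        $attempt++;
        try {
            return $operation($attempt);
        } catch (RuntimeException $e) {
            if ($attempt >= $maxAttempts) {
                throw $e; // Give up and surface the last error.
            }
            // Log the failure, then wait baseDelay, 2x, 4x, ... before retrying.
            error_log("Attempt {$attempt} failed: " . $e->getMessage());
            usleep($baseDelayMs * (2 ** ($attempt - 1)) * 1000);
        }
    }
}

// Usage: the first two attempts fail, the third succeeds.
$result = retryWithBackoff(function (int $attempt) {
    if ($attempt < 3) {
        throw new RuntimeException('simulated timeout');
    }
    return 'ok';
}, 5, 10);
echo $result, PHP_EOL; // prints ok
```

Retries are safest for idempotent requests such as GET, PUT, and DELETE. Retrying a publish request can deliver the same message more than once.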

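3. Performance optimization

  • Batch operations to reduce the number of requests
  • Cache frequently used data
  • Reuse HTTP connections (connection pooling)
  • Avoid frequent polling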
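
For the caching point, a minimal in-memory cache with a time-to-live might look like the sketch below. `TtlCache` is a hypothetical class, not part of the client in this document. It only helps inside a single long-running PHP process such as a CLI worker; a web application would need a shared store instead.

```php
<?php

/**
 * Tiny in-memory cache with a time-to-live, for API responses that change
 * slowly (for example the list of vhosts). Hypothetical class for illustration.
 */
class TtlCache
{
    private array $entries = [];
    private int $ttlSeconds;

    public function __construct(int $ttlSeconds)
    {
        $this->ttlSeconds = $ttlSeconds;
    }

    /**
     * Return the cached value for $key, or call $loader and cache its result.
     */
    public function remember(string $key, callable $loader)
    {
        $now = time();
        if (isset($this->entries[$key]) && $this->entries[$key]['expires'] > $now) {
            return $this->entries[$key]['value'];
        }

        $value = $loader();
        $this->entries[$key] = ['value' => $value, 'expires' => $now + $this->ttlSeconds];

        return $value;
    }
}

// Usage: the loader stands in for a real API call and runs only once.
$cache = new TtlCache(30);
$loads = 0;
$loader = function () use (&$loads) {
    $loads++;
    return ['/', '/app'];
};
$cache->remember('vhosts', $loader);
$cache->remember('vhosts', $loader);
echo "Loader calls: {$loads}", PHP_EOL; // prints Loader calls: 1
```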

4. Monitoring integration

  • Run health checks regularly
  • Set alert thresholds
  • Record key metrics
  • Integrate with your monitoring system
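
Alert thresholds can be expressed as a small pure function over one entry from `GET /api/queues`, which reports `messages` and `consumers` for each queue. `classifyQueue` and its default thresholds are illustrative assumptions, not values recommended by this document or used by its `RabbitMQMonitor` class.

```php
<?php

/**
 * Classify a queue as healthy, warning, or critical.
 * Hypothetical helper; the default thresholds are placeholders to tune.
 *
 * @param array $queue One decoded entry from GET /api/queues.
 */
function classifyQueue(array $queue, int $warnDepth = 1000, int $criticalDepth = 10000): string
{
    $depth = (int) ($queue['messages'] ?? 0);
    $consumers = (int) ($queue['consumers'] ?? 0);

    // A deep backlog, or any backlog with nobody consuming, is critical.
    if ($depth >= $criticalDepth || ($depth > 0 && $consumers === 0)) {
        return 'critical';
    }
    if ($depth >= $warnDepth) {
        return 'warning';
    }

    return 'healthy';
}

echo classifyQueue(['name' => 'app.orders.created', 'messages' => 12000, 'consumers' => 2]), PHP_EOL;
// prints critical
echo classifyQueue(['name' => 'app.orders.completed', 'messages' => 1500, 'consumers' => 1]), PHP_EOL;
// prints warning
```

Keeping the thresholds as parameters lets each environment tune them without changing the check itself.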

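5. Configuration management

  • Use configuration files
  • Keep configuration under version control
  • Back up definitions regularly
  • Automate deployment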
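
The management API exports the broker's definitions (users, vhosts, queues, exchanges, bindings, policies) as one JSON document through `GET /api/definitions`. The sketch below covers only the storage step: it validates an export and writes it to a timestamped file. `saveDefinitionsBackup` is a hypothetical helper, and how the JSON is fetched is left to whichever client you use.

```php
<?php

/**
 * Validate a definitions export and store it in a timestamped file.
 * Hypothetical helper; $definitionsJson is assumed to be the raw response
 * body of GET /api/definitions.
 */
function saveDefinitionsBackup(string $definitionsJson, string $directory): string
{
    // Refuse to store a truncated or otherwise invalid export.
    $decoded = json_decode($definitionsJson);
    if (!is_object($decoded)) {
        throw new InvalidArgumentException('Definitions export is not a valid JSON object');
    }

    $path = rtrim($directory, '/') . '/rabbitmq_definitions_' . date('YmdHis') . '.json';

    // Write the original text unchanged so it can be re-imported as-is.
    if (file_put_contents($path, $definitionsJson) === false) {
        throw new RuntimeException("Could not write backup to {$path}");
    }

    return $path;
}

// Usage with a stand-in export; a real one comes from the API.
$path = saveDefinitionsBackup('{"queues":[],"exchanges":[]}', sys_get_temp_dir());
echo "Saved to {$path}", PHP_EOL;
```

The file is written byte for byte rather than decoded and re-encoded, because a round trip through PHP arrays can turn empty JSON objects such as `"arguments": {}` into empty lists.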

Related Links