Appearance
RabbitMQ HTTP API 详解
概述
RabbitMQ HTTP API 是由 rabbitmq_management 插件提供的 RESTful API 接口,允许开发者通过 HTTP 请求管理 RabbitMQ 服务器。HTTP API 提供了与命令行工具和 Web 管理界面相同的管理能力,非常适合集成到自动化运维系统、监控平台和应用程序中。
核心功能
- 系统监控:获取服务器状态、节点信息、统计数据
- 资源管理:创建、删除、查询队列、交换器、绑定
- 用户管理:创建、删除用户,设置权限
- 消息操作:发布消息、获取消息、清空队列
- 连接管理:查看连接、关闭连接
- 策略管理:创建、删除策略
- 虚拟主机管理:创建、删除虚拟主机
API 基础
访问地址
http://hostname:15672/api/
认证方式
HTTP API 使用 HTTP Basic Authentication 进行认证:
Authorization: Basic base64(username:password)
请求格式
Content-Type: application/json
响应格式
Content-Type: application/json
HTTP 状态码
| 状态码 | 说明 |
|---|---|
| 200 | 成功 |
| 201 | 创建成功 |
| 204 | 成功(无返回内容) |
| 400 | 请求参数错误 |
| 401 | 认证失败 |
| 403 | 权限不足 |
| 404 | 资源不存在 |
| 409 | 资源冲突 |
| 500 | 服务器内部错误 |
API 端点分类
概览与统计 API
| 端点 | 方法 | 说明 |
|---|---|---|
| /api/overview | GET | 获取系统概览 |
| /api/overview | POST | 重置统计 |
| /api/nodes | GET | 获取节点列表 |
| /api/nodes/{name} | GET | 获取节点详情 |
| /api/whoami | GET | 获取当前用户信息 |
连接管理 API
| 端点 | 方法 | 说明 |
|---|---|---|
| /api/connections | GET | 获取连接列表 |
| /api/connections/{name} | GET | 获取连接详情 |
| /api/connections/{name} | DELETE | 关闭连接 |
| /api/connections/{name}/channels | GET | 获取连接的通道 |
通道管理 API
| 端点 | 方法 | 说明 |
|---|---|---|
| /api/channels | GET | 获取通道列表 |
| /api/channels/{name} | GET | 获取通道详情 |
交换器管理 API
| 端点 | 方法 | 说明 |
|---|---|---|
| /api/exchanges | GET | 获取所有交换器 |
| /api/exchanges/{vhost} | GET | 获取虚拟主机的交换器 |
| /api/exchanges/{vhost}/{name} | GET | 获取交换器详情 |
| /api/exchanges/{vhost}/{name} | PUT | 创建交换器 |
| /api/exchanges/{vhost}/{name} | DELETE | 删除交换器 |
| /api/exchanges/{vhost}/{name}/publish | POST | 发布消息 |
| /api/exchanges/{vhost}/{name}/bindings/source | GET | 获取源绑定 |
| /api/exchanges/{vhost}/{name}/bindings/destination | GET | 获取目标绑定 |
队列管理 API
| 端点 | 方法 | 说明 |
|---|---|---|
| /api/queues | GET | 获取所有队列 |
| /api/queues/{vhost} | GET | 获取虚拟主机的队列 |
| /api/queues/{vhost}/{name} | GET | 获取队列详情 |
| /api/queues/{vhost}/{name} | PUT | 创建队列 |
| /api/queues/{vhost}/{name} | DELETE | 删除队列 |
| /api/queues/{vhost}/{name}/contents | DELETE | 清空队列 |
| /api/queues/{vhost}/{name}/get | POST | 获取消息 |
| /api/queues/{vhost}/{name}/bindings | GET | 获取绑定关系 |
| /api/queues/{vhost}/{name}/purge | POST | 清空队列 |
绑定管理 API
| 端点 | 方法 | 说明 |
|---|---|---|
| /api/bindings | GET | 获取所有绑定 |
| /api/bindings/{vhost} | GET | 获取虚拟主机的绑定 |
| /api/bindings/{vhost}/e/{exchange}/q/{queue} | GET | 获取交换器到队列的绑定 |
| /api/bindings/{vhost}/e/{exchange}/q/{queue} | POST | 创建绑定 |
| /api/bindings/{vhost}/e/{exchange}/q/{queue}/{props} | DELETE | 删除绑定 |
| /api/bindings/{vhost}/e/{source}/e/{destination} | POST | 交换器到交换器绑定 |
用户管理 API
| 端点 | 方法 | 说明 |
|---|---|---|
| /api/users | GET | 获取用户列表 |
| /api/users/{name} | GET | 获取用户详情 |
| /api/users/{name} | PUT | 创建/更新用户 |
| /api/users/{name} | DELETE | 删除用户 |
| /api/users/{name}/permissions | GET | 获取用户权限 |
权限管理 API
| 端点 | 方法 | 说明 |
|---|---|---|
| /api/permissions | GET | 获取所有权限 |
| /api/permissions/{vhost}/{user} | GET | 获取特定权限 |
| /api/permissions/{vhost}/{user} | PUT | 设置权限 |
| /api/permissions/{vhost}/{user} | DELETE | 删除权限 |
虚拟主机管理 API
| 端点 | 方法 | 说明 |
|---|---|---|
| /api/vhosts | GET | 获取虚拟主机列表 |
| /api/vhosts/{name} | GET | 获取虚拟主机详情 |
| /api/vhosts/{name} | PUT | 创建虚拟主机 |
| /api/vhosts/{name} | DELETE | 删除虚拟主机 |
| /api/vhosts/{name}/permissions | GET | 获取虚拟主机权限 |
策略管理 API
| 端点 | 方法 | 说明 |
|---|---|---|
| /api/policies | GET | 获取所有策略 |
| /api/policies/{vhost} | GET | 获取虚拟主机的策略 |
| /api/policies/{vhost}/{name} | GET | 获取策略详情 |
| /api/policies/{vhost}/{name} | PUT | 创建/更新策略 |
| /api/policies/{vhost}/{name} | DELETE | 删除策略 |
参数管理 API
| 端点 | 方法 | 说明 |
|---|---|---|
| /api/parameters | GET | 获取所有参数 |
| /api/parameters/{component} | GET | 获取组件参数 |
| /api/parameters/{component}/{vhost}/{name} | PUT | 设置参数 |
| /api/parameters/{component}/{vhost}/{name} | DELETE | 删除参数 |
PHP 代码示例
完整的 RabbitMQ API 客户端
php
<?php
/**
 * Thin cURL-based client for the RabbitMQ management HTTP API.
 *
 * Every public method returns the same result array:
 *   - 'code'    int    HTTP status code (0 when no response was received)
 *   - 'body'    mixed  JSON-decoded response body, or null when absent/undecodable
 *   - 'error'   string cURL error message ('' when the transfer itself succeeded)
 *   - 'success' bool   true when the status code is 2xx
 *
 * Names that end up in the URL path (vhosts, queues, exchanges, users,
 * connections, ...) may be passed raw ('/', 'my queue') or already
 * percent-encoded ('%2F'); both forms produce the same request. A name that
 * literally contains a percent escape must therefore be passed encoded
 * (e.g. "%252F" for the literal text "%2F").
 */
class RabbitMQApiClient
{
    /** @var string Host name of the management API. */
    private $host;

    /** @var int TCP port of the management API. */
    private $port;

    /** @var string User name for HTTP Basic authentication. */
    private $username;

    /** @var string Password for HTTP Basic authentication. */
    private $password;

    /** @var int Total request timeout in seconds (CURLOPT_TIMEOUT). */
    private $timeout;

    /**
     * @param string $host     Management API host.
     * @param int    $port     Management API port.
     * @param string $username Basic-auth user name.
     * @param string $password Basic-auth password.
     * @param int    $timeout  Request timeout in seconds.
     */
    public function __construct(
        $host = 'localhost',
        $port = 15672,
        $username = 'guest',
        $password = 'guest',
        $timeout = 30
    ) {
        $this->host = $host;
        $this->port = $port;
        $this->username = $username;
        $this->password = $password;
        $this->timeout = $timeout;
    }

    /**
     * Normalises one URL path segment.
     *
     * Decoding first makes the operation idempotent, so callers that already
     * pass '%2F' keep working while raw values such as '/' or a connection
     * name containing spaces are encoded correctly.
     *
     * @param mixed $value Raw or percent-encoded segment.
     * @return string Percent-encoded segment.
     */
    private function segment($value)
    {
        return rawurlencode(rawurldecode((string) $value));
    }

    /**
     * GETs a collection, optionally narrowed by one extra path segment.
     *
     * @param string      $resource Collection name, e.g. 'queues'.
     * @param string|null $scope    Optional vhost/component; null, '' or false means "all".
     * @return array Result array (see class description).
     */
    private function listing($resource, $scope = null)
    {
        if ($scope === null || $scope === '' || $scope === false) {
            return $this->request('GET', $resource);
        }

        return $this->request('GET', "{$resource}/" . $this->segment($scope));
    }

    /**
     * Performs one HTTP request against the API.
     *
     * @param string     $method  HTTP verb.
     * @param string     $path    Path below /api/; segments must already be encoded.
     * @param mixed      $data    Payload to send as JSON, or null for no body.
     * @param array      $query   Query-string parameters.
     * @param string[]   $headers Extra raw header lines.
     * @return array Result array (see class description).
     */
    private function request($method, $path, $data = null, $query = [], $headers = [])
    {
        $url = "http://{$this->host}:{$this->port}/api/{$path}";
        if (!empty($query)) {
            $url .= '?' . http_build_query($query);
        }

        $options = [
            CURLOPT_URL => $url,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_USERPWD => "{$this->username}:{$this->password}",
            CURLOPT_HTTPHEADER => array_merge(['Content-Type: application/json'], $headers),
            CURLOPT_CUSTOMREQUEST => $method,
            CURLOPT_TIMEOUT => $this->timeout,
            CURLOPT_FOLLOWLOCATION => true,
        ];
        if ($data !== null) {
            $options[CURLOPT_POSTFIELDS] = json_encode($data);
        }

        $ch = curl_init();
        curl_setopt_array($ch, $options);
        $response = curl_exec($ch);
        $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        $error = curl_error($ch);
        curl_close($ch);

        // curl_exec() yields false on transport failure and '' for empty
        // responses (e.g. 204); neither is JSON, so report a null body.
        $body = (is_string($response) && $response !== '')
            ? json_decode($response, true)
            : null;

        return [
            'code' => $httpCode,
            'body' => $body,
            'error' => $error,
            'success' => $httpCode >= 200 && $httpCode < 300,
        ];
    }

    /** GET /api/overview. */
    public function getOverview()
    {
        return $this->request('GET', 'overview');
    }

    /**
     * POST /api/overview.
     *
     * NOTE(review): this endpoint is not listed in the official HTTP API
     * reference as far as I can tell; confirm it exists on the target broker.
     */
    public function resetStats()
    {
        return $this->request('POST', 'overview');
    }

    /** GET /api/nodes. */
    public function getNodes()
    {
        return $this->request('GET', 'nodes');
    }

    /** GET /api/nodes/{name}. */
    public function getNode($name)
    {
        return $this->request('GET', 'nodes/' . $this->segment($name));
    }

    /** GET /api/whoami. */
    public function whoami()
    {
        return $this->request('GET', 'whoami');
    }

    /** GET /api/connections. */
    public function getConnections()
    {
        return $this->request('GET', 'connections');
    }

    /** GET /api/connections/{name}. */
    public function getConnection($name)
    {
        return $this->request('GET', 'connections/' . $this->segment($name));
    }

    /**
     * DELETE /api/connections/{name}.
     *
     * The reason travels in the X-Reason request header, which is where the
     * management API reads it from; a JSON body is not used for this call.
     *
     * @param string $name   Connection name as reported by the API.
     * @param string $reason Human-readable close reason.
     */
    public function closeConnection($name, $reason = 'API requested')
    {
        // Strip line breaks so the reason cannot inject additional headers.
        $safeReason = str_replace(["\r", "\n"], ' ', (string) $reason);

        return $this->request(
            'DELETE',
            'connections/' . $this->segment($name),
            null,
            [],
            ["X-Reason: {$safeReason}"]
        );
    }

    /** GET /api/channels. */
    public function getChannels()
    {
        return $this->request('GET', 'channels');
    }

    /** GET /api/channels/{name}. */
    public function getChannel($name)
    {
        return $this->request('GET', 'channels/' . $this->segment($name));
    }

    /** GET /api/exchanges or /api/exchanges/{vhost}. */
    public function getExchanges($vhost = null)
    {
        return $this->listing('exchanges', $vhost);
    }

    /** GET /api/exchanges/{vhost}/{name}. */
    public function getExchange($name, $vhost = '%2F')
    {
        return $this->request('GET', 'exchanges/' . $this->segment($vhost) . '/' . $this->segment($name));
    }

    /**
     * PUT /api/exchanges/{vhost}/{name}.
     *
     * @param string $name    Exchange name.
     * @param string $type    Exchange type (direct, topic, fanout, ...).
     * @param string $vhost   Virtual host.
     * @param array  $options Overrides for auto_delete/durable/internal/arguments.
     */
    public function createExchange($name, $type, $vhost = '%2F', $options = [])
    {
        $data = array_merge([
            'type' => $type,
            'auto_delete' => false,
            'durable' => true,
            'internal' => false,
            'arguments' => [],
        ], $options);

        return $this->request('PUT', 'exchanges/' . $this->segment($vhost) . '/' . $this->segment($name), $data);
    }

    /** DELETE /api/exchanges/{vhost}/{name}. */
    public function deleteExchange($name, $vhost = '%2F')
    {
        return $this->request('DELETE', 'exchanges/' . $this->segment($vhost) . '/' . $this->segment($name));
    }

    /**
     * POST /api/exchanges/{vhost}/{name}/publish.
     *
     * @param string $exchange   Exchange to publish to.
     * @param string $routingKey Routing key.
     * @param string $payload    Message body, sent with payload_encoding "string".
     * @param string $vhost      Virtual host.
     * @param array  $properties Message properties merged over the defaults.
     */
    public function publishMessage($exchange, $routingKey, $payload, $vhost = '%2F', $properties = [])
    {
        $data = [
            'properties' => array_merge([
                'delivery_mode' => 2,
                'content_type' => 'application/json',
            ], $properties),
            'routing_key' => $routingKey,
            'payload' => $payload,
            'payload_encoding' => 'string',
        ];

        return $this->request(
            'POST',
            'exchanges/' . $this->segment($vhost) . '/' . $this->segment($exchange) . '/publish',
            $data
        );
    }

    /** GET /api/queues or /api/queues/{vhost}. */
    public function getQueues($vhost = null)
    {
        return $this->listing('queues', $vhost);
    }

    /** GET /api/queues/{vhost}/{name}. */
    public function getQueue($name, $vhost = '%2F')
    {
        return $this->request('GET', 'queues/' . $this->segment($vhost) . '/' . $this->segment($name));
    }

    /**
     * PUT /api/queues/{vhost}/{name}.
     *
     * @param string $name    Queue name.
     * @param string $vhost   Virtual host.
     * @param array  $options Overrides for auto_delete/durable/arguments.
     */
    public function createQueue($name, $vhost = '%2F', $options = [])
    {
        $data = array_merge([
            'auto_delete' => false,
            'durable' => true,
            'arguments' => [],
        ], $options);

        return $this->request('PUT', 'queues/' . $this->segment($vhost) . '/' . $this->segment($name), $data);
    }

    /** DELETE /api/queues/{vhost}/{name}. */
    public function deleteQueue($name, $vhost = '%2F')
    {
        return $this->request('DELETE', 'queues/' . $this->segment($vhost) . '/' . $this->segment($name));
    }

    /** DELETE /api/queues/{vhost}/{name}/contents (removes all messages). */
    public function purgeQueue($name, $vhost = '%2F')
    {
        return $this->request(
            'DELETE',
            'queues/' . $this->segment($vhost) . '/' . $this->segment($name) . '/contents'
        );
    }

    /**
     * POST /api/queues/{vhost}/{name}/get.
     *
     * @param string $queue   Queue name.
     * @param int    $count   Maximum number of messages to fetch.
     * @param string $ackMode Value for the API's "ackmode" field.
     * @param string $vhost   Virtual host.
     */
    public function getMessages($queue, $count = 1, $ackMode = 'ack_requeue_false', $vhost = '%2F')
    {
        $data = [
            'count' => $count,
            'ackmode' => $ackMode,
            'encoding' => 'auto',
        ];

        return $this->request(
            'POST',
            'queues/' . $this->segment($vhost) . '/' . $this->segment($queue) . '/get',
            $data
        );
    }

    /** GET /api/bindings or /api/bindings/{vhost}. */
    public function getBindings($vhost = null)
    {
        return $this->listing('bindings', $vhost);
    }

    /** Builds the path of the exchange-to-queue binding collection. */
    private function bindingPath($vhost, $exchange, $queue)
    {
        return 'bindings/' . $this->segment($vhost)
            . '/e/' . $this->segment($exchange)
            . '/q/' . $this->segment($queue);
    }

    /** POST /api/bindings/{vhost}/e/{exchange}/q/{queue}. */
    public function createBinding($exchange, $queue, $routingKey = '', $vhost = '%2F', $arguments = [])
    {
        $data = [
            'routing_key' => $routingKey,
            'arguments' => $arguments,
        ];

        return $this->request('POST', $this->bindingPath($vhost, $exchange, $queue), $data);
    }

    /**
     * Deletes the first exchange-to-queue binding whose routing key matches.
     *
     * The bindings are listed first because the DELETE URL needs the
     * server-generated "properties_key" of the binding.
     *
     * @return array The DELETE result, the failed listing result, or a
     *               synthetic 404 result when no binding matches.
     */
    public function deleteBinding($exchange, $queue, $routingKey = '', $vhost = '%2F')
    {
        $path = $this->bindingPath($vhost, $exchange, $queue);
        $listing = $this->request('GET', $path);

        // Surface the real failure (401, 404, transport error) instead of
        // disguising it as "binding not found".
        if (!$listing['success']) {
            return $listing;
        }

        if (is_array($listing['body'])) {
            foreach ($listing['body'] as $binding) {
                $matches = is_array($binding)
                    && isset($binding['properties_key'])
                    && ($binding['routing_key'] ?? null) === $routingKey;
                if ($matches) {
                    // properties_key is an opaque identifier that may itself
                    // contain '%' escapes, so it is encoded as-is (no decode).
                    $props = rawurlencode($binding['properties_key']);

                    return $this->request('DELETE', "{$path}/{$props}");
                }
            }
        }

        return [
            'success' => false,
            'code' => 404,
            'body' => ['error' => 'Binding not found'],
            'error' => '',
        ];
    }

    /** GET /api/users. */
    public function getUsers()
    {
        return $this->request('GET', 'users');
    }

    /** GET /api/users/{name}. */
    public function getUser($name)
    {
        return $this->request('GET', 'users/' . $this->segment($name));
    }

    /** PUT /api/users/{name} with the given password and tags. */
    public function createUser($name, $password, $tags = '')
    {
        $data = [
            'password' => $password,
            'tags' => $tags,
        ];

        return $this->request('PUT', 'users/' . $this->segment($name), $data);
    }

    /** DELETE /api/users/{name}. */
    public function deleteUser($name)
    {
        return $this->request('DELETE', 'users/' . $this->segment($name));
    }

    /** GET /api/permissions. */
    public function getPermissions()
    {
        return $this->request('GET', 'permissions');
    }

    /** GET /api/permissions/{vhost}/{user}. */
    public function getPermission($vhost, $user)
    {
        return $this->request('GET', 'permissions/' . $this->segment($vhost) . '/' . $this->segment($user));
    }

    /** PUT /api/permissions/{vhost}/{user} with configure/write/read patterns. */
    public function setPermission($user, $vhost, $configure = '.*', $write = '.*', $read = '.*')
    {
        $data = [
            'configure' => $configure,
            'write' => $write,
            'read' => $read,
        ];

        return $this->request('PUT', 'permissions/' . $this->segment($vhost) . '/' . $this->segment($user), $data);
    }

    /** DELETE /api/permissions/{vhost}/{user}. */
    public function deletePermission($user, $vhost)
    {
        return $this->request('DELETE', 'permissions/' . $this->segment($vhost) . '/' . $this->segment($user));
    }

    /** GET /api/vhosts. */
    public function getVhosts()
    {
        return $this->request('GET', 'vhosts');
    }

    /** GET /api/vhosts/{name}. */
    public function getVhost($name)
    {
        return $this->request('GET', 'vhosts/' . $this->segment($name));
    }

    /** PUT /api/vhosts/{name} (no request body). */
    public function createVhost($name)
    {
        return $this->request('PUT', 'vhosts/' . $this->segment($name));
    }

    /** DELETE /api/vhosts/{name}. */
    public function deleteVhost($name)
    {
        return $this->request('DELETE', 'vhosts/' . $this->segment($name));
    }

    /** GET /api/policies or /api/policies/{vhost}. */
    public function getPolicies($vhost = null)
    {
        return $this->listing('policies', $vhost);
    }

    /**
     * PUT /api/policies/{vhost}/{name}.
     *
     * @param string $name       Policy name.
     * @param string $pattern    Regular expression matched against resource names.
     * @param array  $definition Policy key/value definition.
     * @param string $vhost      Virtual host.
     * @param int    $priority   Policy priority.
     * @param string $applyTo    Value for the API's "apply-to" field.
     */
    public function createPolicy($name, $pattern, $definition, $vhost = '%2F', $priority = 0, $applyTo = 'all')
    {
        $data = [
            'pattern' => $pattern,
            'definition' => $definition,
            'priority' => $priority,
            'apply-to' => $applyTo,
        ];

        return $this->request('PUT', 'policies/' . $this->segment($vhost) . '/' . $this->segment($name), $data);
    }

    /** DELETE /api/policies/{vhost}/{name}. */
    public function deletePolicy($name, $vhost = '%2F')
    {
        return $this->request('DELETE', 'policies/' . $this->segment($vhost) . '/' . $this->segment($name));
    }

    /** GET /api/parameters or /api/parameters/{component}. */
    public function getParameters($component = null)
    {
        return $this->listing('parameters', $component);
    }

    /** Builds the path of a single runtime parameter. */
    private function parameterPath($component, $vhost, $name)
    {
        return 'parameters/' . $this->segment($component)
            . '/' . $this->segment($vhost)
            . '/' . $this->segment($name);
    }

    /** PUT /api/parameters/{component}/{vhost}/{name}. */
    public function setParameter($component, $name, $value, $vhost = '%2F')
    {
        $data = [
            'value' => $value,
        ];

        return $this->request('PUT', $this->parameterPath($component, $vhost, $name), $data);
    }

    /** DELETE /api/parameters/{component}/{vhost}/{name}. */
    public function deleteParameter($component, $name, $vhost = '%2F')
    {
        return $this->request('DELETE', $this->parameterPath($component, $vhost, $name));
    }

    /**
     * Exports broker definitions.
     *
     * @param string|null $vhost When given, only that virtual host is exported
     *                           via GET /api/definitions/{vhost}; otherwise
     *                           GET /api/definitions returns everything.
     */
    public function exportDefinitions($vhost = null)
    {
        return $this->listing('definitions', $vhost);
    }

    /** POST /api/definitions with a previously exported definitions array. */
    public function importDefinitions($definitions)
    {
        return $this->request('POST', 'definitions', $definitions);
    }
}
使用示例
php
<?php
require_once 'RabbitMQApiClient.php';
// Walk-through of the client: inspect the broker, provision a user, vhost,
// queue, exchange and binding, publish and fetch a message, apply a policy,
// list queues and export the definitions. Each step checks the result before
// reading the response body so a failed call cannot cause null dereferences.
$api = new RabbitMQApiClient('localhost', 15672, 'admin', 'admin123');

// Maps a client result array to the label printed after each step.
$outcome = function (array $result) {
    return $result['success'] ? '成功' : '失败';
};

$overview = $api->getOverview();
if ($overview['success'] && is_array($overview['body'])) {
    $info = $overview['body'];
    $totals = $info['object_totals'] ?? [];
    echo "RabbitMQ 版本: " . ($info['rabbitmq_version'] ?? 'unknown') . "\n";
    echo "Erlang 版本: " . ($info['erlang_version'] ?? 'unknown') . "\n";
    echo "队列总数: " . ($totals['queues'] ?? 0) . "\n";
    echo "连接总数: " . ($totals['connections'] ?? 0) . "\n";
} else {
    echo "获取概览失败: HTTP {$overview['code']} {$overview['error']}\n";
}

$result = $api->createUser('app_user', 'secure_password', 'monitoring');
echo "创建用户: " . $outcome($result) . "\n";

$result = $api->setPermission('app_user', '%2F', '^app\.', '^app\.', '^app\.');
echo "设置权限: " . $outcome($result) . "\n";

$result = $api->createVhost('production');
echo "创建虚拟主机: " . $outcome($result) . "\n";

$result = $api->createQueue('app.orders', '%2F', [
    'arguments' => [
        'x-message-ttl' => 86400000,
        'x-max-length' => 10000,
    ],
]);
echo "创建队列: " . $outcome($result) . "\n";

$result = $api->createExchange('app.exchange', 'topic', '%2F');
echo "创建交换器: " . $outcome($result) . "\n";

$result = $api->createBinding('app.exchange', 'app.orders', 'order.*');
echo "创建绑定: " . $outcome($result) . "\n";

$result = $api->publishMessage(
    'app.exchange',
    'order.created',
    json_encode(['order_id' => 12345, 'amount' => 99.99]),
    '%2F',
    ['delivery_mode' => 2]
);
echo "发布消息: " . $outcome($result) . "\n";

$messages = $api->getMessages('app.orders', 5);
if ($messages['success'] && is_array($messages['body'])) {
    foreach ($messages['body'] as $msg) {
        echo "消息: {$msg['payload']}\n";
    }
}

$haPolicy = [
    'ha-mode' => 'all',
    'ha-sync-mode' => 'automatic',
];
$result = $api->createPolicy('ha-all', '^ha\.', $haPolicy, '%2F', 1, 'queues');
echo "创建策略: " . $outcome($result) . "\n";

$queues = $api->getQueues();
if ($queues['success'] && is_array($queues['body'])) {
    foreach ($queues['body'] as $queue) {
        // The "messages" counter can be missing until statistics are collected.
        $depth = $queue['messages'] ?? 0;
        echo "队列: {$queue['name']}, 消息数: {$depth}\n";
    }
}

// Only claim success when the export worked and the file was actually written.
$definitions = $api->exportDefinitions();
$written = $definitions['success']
    ? file_put_contents('rabbitmq_definitions.json', json_encode($definitions['body'], JSON_PRETTY_PRINT))
    : false;
if ($written !== false) {
    echo "配置已导出到 rabbitmq_definitions.json\n";
} else {
    echo "配置导出失败\n";
}
监控类
php
<?php
/**
 * Health checks built on top of RabbitMQApiClient.
 *
 * Each check returns an array with a 'status' key. When the underlying API
 * call fails the array is ['status' => 'error', 'message' => ...] and carries
 * none of the other keys.
 */
class RabbitMQMonitor
{
    /** @var RabbitMQApiClient Client used for every API call. */
    private $api;

    /**
     * @var array Alert thresholds. 'memory_high_watermark' is accepted for
     *            configuration but is not consulted by any check in this class.
     */
    private $thresholds;

    /**
     * @param RabbitMQApiClient $api        API client.
     * @param array             $thresholds Overrides for the default thresholds.
     */
    public function __construct(RabbitMQApiClient $api, array $thresholds = [])
    {
        $this->api = $api;
        $this->thresholds = array_merge([
            'queue_messages_warning' => 1000,
            'queue_messages_critical' => 10000,
            'consumer_min' => 1,
            'connection_max' => 1000,
            'memory_high_watermark' => 0.8,
        ], $thresholds);
    }

    /**
     * Summarises the broker overview and flags an excessive connection count.
     *
     * @return array Overview summary with 'status' healthy|warning, or an error result.
     */
    public function checkSystemHealth()
    {
        $overview = $this->api->getOverview();
        if (!$overview['success'] || !is_array($overview['body'])) {
            return [
                'status' => 'error',
                'message' => '无法连接到 RabbitMQ',
            ];
        }

        $data = $overview['body'];
        $totals = $data['object_totals'] ?? [];
        $connections = $totals['connections'] ?? 0;

        $alerts = [];
        if ($connections > $this->thresholds['connection_max']) {
            $alerts[] = [
                'level' => 'warning',
                'message' => "连接数过多: {$connections}",
            ];
        }

        return [
            'status' => empty($alerts) ? 'healthy' : 'warning',
            'rabbitmq_version' => $data['rabbitmq_version'] ?? 'unknown',
            'erlang_version' => $data['erlang_version'] ?? 'unknown',
            'connections' => $connections,
            'channels' => $totals['channels'] ?? 0,
            'queues' => $totals['queues'] ?? 0,
            'exchanges' => $totals['exchanges'] ?? 0,
            'consumers' => $totals['consumers'] ?? 0,
            'messages' => $data['queue_totals']['messages'] ?? 0,
            'message_rates' => [
                'publish' => $data['message_stats']['publish_details']['rate'] ?? 0,
                'deliver' => $data['message_stats']['deliver_get_details']['rate'] ?? 0,
            ],
            'alerts' => $alerts,
        ];
    }

    /**
     * Fetches one queue and evaluates it against the thresholds.
     *
     * @param string $queueName Queue name.
     * @param string $vhost     Virtual host, in the form the client expects.
     * @return array Queue health result or an error result.
     */
    public function checkQueueHealth($queueName, $vhost = '%2F')
    {
        $result = $this->api->getQueue($queueName, $vhost);
        if (!$result['success'] || !is_array($result['body'])) {
            return [
                'status' => 'error',
                'message' => "队列不存在或无法访问: {$queueName}",
            ];
        }

        return $this->assessQueue($result['body'], $queueName, $vhost);
    }

    /**
     * Evaluates already-fetched queue data against the thresholds.
     *
     * @param array  $queue     Queue object as returned by the API.
     * @param string $queueName Name to report.
     * @param string $vhost     Virtual host to report.
     * @return array Queue health result.
     */
    private function assessQueue(array $queue, $queueName, $vhost)
    {
        $alerts = [];
        $status = 'healthy';

        $messages = $queue['messages'] ?? 0;
        if ($messages > $this->thresholds['queue_messages_critical']) {
            $status = 'critical';
            $alerts[] = [
                'level' => 'critical',
                'message' => "队列消息数严重积压: {$messages}",
            ];
        } elseif ($messages > $this->thresholds['queue_messages_warning']) {
            $status = 'warning';
            $alerts[] = [
                'level' => 'warning',
                'message' => "队列消息数积压: {$messages}",
            ];
        }

        $consumers = $queue['consumers'] ?? 0;
        if ($consumers < $this->thresholds['consumer_min']) {
            // Only escalate: a consumer shortage must not downgrade a queue
            // that is already critical because of its backlog.
            if ($status === 'healthy') {
                $status = 'warning';
            }
            $alerts[] = [
                'level' => 'warning',
                'message' => "消费者数量不足: {$consumers}",
            ];
        }

        return [
            'status' => $status,
            'queue' => $queueName,
            'vhost' => $vhost,
            'messages' => $messages,
            'messages_ready' => $queue['messages_ready'] ?? 0,
            'messages_unacked' => $queue['messages_unacknowledged'] ?? 0,
            'consumers' => $consumers,
            'memory' => $queue['memory'] ?? 0,
            'state' => $queue['state'] ?? 'unknown',
            'alerts' => $alerts,
        ];
    }

    /**
     * Evaluates every queue returned by the queue listing.
     *
     * The listing rows are assessed directly. This avoids one extra request
     * per queue and avoids putting the raw vhost name from the listing
     * (e.g. "/") into a URL.
     *
     * @return array Aggregate result with per-queue details, or an error result.
     */
    public function checkAllQueues()
    {
        $result = $this->api->getQueues();
        if (!$result['success'] || !is_array($result['body'])) {
            return ['status' => 'error', 'message' => '无法获取队列列表'];
        }

        $queueHealth = [];
        $criticalCount = 0;
        $warningCount = 0;
        foreach ($result['body'] as $queue) {
            if (!is_array($queue) || !isset($queue['name'])) {
                continue;
            }
            $health = $this->assessQueue($queue, $queue['name'], $queue['vhost'] ?? '');
            $queueHealth[] = $health;
            if ($health['status'] === 'critical') {
                $criticalCount++;
            } elseif ($health['status'] === 'warning') {
                $warningCount++;
            }
        }

        if ($criticalCount > 0) {
            $overall = 'critical';
        } elseif ($warningCount > 0) {
            $overall = 'warning';
        } else {
            $overall = 'healthy';
        }

        return [
            'status' => $overall,
            'total_queues' => count($queueHealth),
            'critical_count' => $criticalCount,
            'warning_count' => $warningCount,
            'queues' => $queueHealth,
        ];
    }

    /**
     * Reports per-node state and raises a critical alert for memory alarms,
     * disk alarms and nodes that are not running.
     *
     * @return array Node summary with 'status' healthy|critical, or an error result.
     */
    public function checkNodeHealth()
    {
        $result = $this->api->getNodes();
        if (!$result['success'] || !is_array($result['body'])) {
            return ['status' => 'error', 'message' => '无法获取节点信息'];
        }

        $nodes = [];
        $alerts = [];
        foreach ($result['body'] as $node) {
            $name = $node['name'] ?? 'unknown';
            $nodeStatus = [
                'name' => $name,
                'type' => $node['type'] ?? 'unknown',
                'running' => $node['running'] ?? false,
                'mem_used' => $node['mem_used'] ?? 0,
                'mem_limit' => $node['mem_limit'] ?? 0,
                'mem_alarm' => $node['mem_alarm'] ?? false,
                'disk_free' => $node['disk_free'] ?? 0,
                'disk_free_limit' => $node['disk_free_limit'] ?? 0,
                'disk_free_alarm' => $node['disk_free_alarm'] ?? false,
            ];

            // Condition => alert message; every hit is reported as critical.
            $checks = [
                '内存告警触发' => (bool) $nodeStatus['mem_alarm'],
                '磁盘空间告警触发' => (bool) $nodeStatus['disk_free_alarm'],
                '节点未运行' => !$nodeStatus['running'],
            ];
            foreach ($checks as $message => $triggered) {
                if ($triggered) {
                    $alerts[] = [
                        'level' => 'critical',
                        'node' => $name,
                        'message' => $message,
                    ];
                }
            }

            $nodes[] = $nodeStatus;
        }

        return [
            'status' => empty($alerts) ? 'healthy' : 'critical',
            'nodes' => $nodes,
            'alerts' => $alerts,
        ];
    }

    /**
     * Renders the system, queue and node checks as a plain-text report.
     *
     * Sections whose check failed show "N/A" values plus the error message
     * instead of triggering undefined-index or type errors.
     *
     * @return string Multi-line report.
     */
    public function generateHealthReport()
    {
        $system = $this->checkSystemHealth();
        $queues = $this->checkAllQueues();
        $nodes = $this->checkNodeHealth();

        // Reads a key from a check result, tolerating error results.
        $field = function (array $section, $key) {
            return $section[$key] ?? 'N/A';
        };
        $nodeList = $nodes['nodes'] ?? [];
        $allAlerts = array_merge($system['alerts'] ?? [], $nodes['alerts'] ?? []);

        $report = "=== RabbitMQ 健康报告 ===\n";
        $report .= "生成时间: " . date('Y-m-d H:i:s') . "\n\n";

        $report .= "## 系统状态\n";
        $report .= "状态: {$system['status']}\n";
        if (isset($system['message'])) {
            $report .= "错误: {$system['message']}\n";
        }
        $report .= "版本: RabbitMQ " . $field($system, 'rabbitmq_version')
            . ", Erlang " . $field($system, 'erlang_version') . "\n";
        $report .= "连接数: " . $field($system, 'connections') . "\n";
        $report .= "通道数: " . $field($system, 'channels') . "\n";
        $report .= "队列数: " . $field($system, 'queues') . "\n";
        $report .= "消息总数: " . $field($system, 'messages') . "\n\n";

        $report .= "## 队列状态\n";
        if (isset($queues['message'])) {
            $report .= "错误: {$queues['message']}\n";
        }
        $report .= "总队列数: " . $field($queues, 'total_queues') . "\n";
        $report .= "严重问题: " . $field($queues, 'critical_count') . "\n";
        $report .= "警告问题: " . $field($queues, 'warning_count') . "\n\n";

        $report .= "## 节点状态\n";
        if (isset($nodes['message'])) {
            $report .= "错误: {$nodes['message']}\n";
        }
        $report .= "节点数: " . count($nodeList) . "\n";
        foreach ($nodeList as $node) {
            $state = $node['running'] ? '运行中' : '已停止';
            $report .= "  - {$node['name']}: {$state}\n";
        }

        if (!empty($allAlerts)) {
            $report .= "\n## 告警信息\n";
            foreach ($allAlerts as $alert) {
                $report .= "  [{$alert['level']}] {$alert['message']}\n";
            }
        }

        return $report;
    }
}
// Monitor a single queue with backlog thresholds raised above the defaults,
// then dump the structured result.
$api = new RabbitMQApiClient('localhost', 15672, 'admin', 'admin123');
$monitor = new RabbitMQMonitor(
    $api,
    ['queue_messages_warning' => 5000, 'queue_messages_critical' => 50000]
);

$health = $monitor->checkQueueHealth('app.orders');
print_r($health);
echo $monitor->generateHealthReport();
批量操作类
php
<?php
/**
 * Bulk provisioning, teardown and maintenance helpers built on
 * RabbitMQApiClient.
 *
 * Virtual-host names taken from configuration may be written raw ('/app') or
 * percent-encoded ('%2Fapp'); they are normalised before being handed to the
 * client. Names taken from API listings are raw and are percent-encoded once.
 */
class RabbitMQBatchOperations
{
    /** @var RabbitMQApiClient Client used for every API call. */
    private $api;

    public function __construct(RabbitMQApiClient $api)
    {
        $this->api = $api;
    }

    /**
     * Normalises a configured name into a single percent-encoded path segment.
     * Decoding first keeps already-encoded values ('%2F') unchanged.
     */
    private function encodeSegment($value)
    {
        return rawurlencode(rawurldecode((string) $value));
    }

    /**
     * Creates everything described by $config, in dependency order:
     * vhosts, users (+ permissions), exchanges, queues, bindings, policies.
     *
     * @param array $config Keys: vhosts, users, exchanges, queues, bindings, policies (all optional).
     * @return array Client results grouped by resource kind; keys are the names as written in $config.
     */
    public function setupApplication($config)
    {
        $results = [];

        foreach ($config['vhosts'] ?? [] as $vhost) {
            $results['vhosts'][$vhost] = $this->api->createVhost($this->encodeSegment($vhost));
        }

        foreach ($config['users'] ?? [] as $username => $userData) {
            $results['users'][$username] = $this->api->createUser(
                $username,
                $userData['password'],
                $userData['tags'] ?? ''
            );
            foreach ($userData['permissions'] ?? [] as $vhost => $perms) {
                $results['permissions'][$username][$vhost] = $this->api->setPermission(
                    $username,
                    $this->encodeSegment($vhost),
                    $perms['configure'] ?? '.*',
                    $perms['write'] ?? '.*',
                    $perms['read'] ?? '.*'
                );
            }
        }

        foreach ($config['exchanges'] ?? [] as $name => $exchangeConfig) {
            $results['exchanges'][$name] = $this->api->createExchange(
                $name,
                $exchangeConfig['type'],
                $this->encodeSegment($exchangeConfig['vhost'] ?? '%2F'),
                $exchangeConfig['options'] ?? []
            );
        }

        foreach ($config['queues'] ?? [] as $name => $queueConfig) {
            $results['queues'][$name] = $this->api->createQueue(
                $name,
                $this->encodeSegment($queueConfig['vhost'] ?? '%2F'),
                $queueConfig['options'] ?? []
            );
        }

        foreach ($config['bindings'] ?? [] as $binding) {
            $key = $binding['exchange'] . ':' . $binding['queue'];
            $results['bindings'][$key] = $this->api->createBinding(
                $binding['exchange'],
                $binding['queue'],
                $binding['routing_key'] ?? '',
                $this->encodeSegment($binding['vhost'] ?? '%2F'),
                $binding['arguments'] ?? []
            );
        }

        foreach ($config['policies'] ?? [] as $name => $policyConfig) {
            $results['policies'][$name] = $this->api->createPolicy(
                $name,
                $policyConfig['pattern'],
                $policyConfig['definition'],
                $this->encodeSegment($policyConfig['vhost'] ?? '%2F'),
                $policyConfig['priority'] ?? 0,
                $policyConfig['apply_to'] ?? 'all'
            );
        }

        return $results;
    }

    /**
     * Removes everything described by $config in reverse dependency order:
     * policies, bindings, queues, exchanges, users, vhosts.
     *
     * @param array $config Same structure as for setupApplication().
     * @return array Client results grouped by resource kind.
     */
    public function teardownApplication($config)
    {
        $results = [];

        // Policies are created by setupApplication(), so remove them as well.
        foreach ($config['policies'] ?? [] as $name => $policyConfig) {
            $results['policies'][$name] = $this->api->deletePolicy(
                $name,
                $this->encodeSegment($policyConfig['vhost'] ?? '%2F')
            );
        }

        foreach ($config['bindings'] ?? [] as $binding) {
            $results['bindings'][] = $this->api->deleteBinding(
                $binding['exchange'],
                $binding['queue'],
                $binding['routing_key'] ?? '',
                $this->encodeSegment($binding['vhost'] ?? '%2F')
            );
        }

        foreach ($config['queues'] ?? [] as $name => $queueConfig) {
            $results['queues'][$name] = $this->api->deleteQueue(
                $name,
                $this->encodeSegment($queueConfig['vhost'] ?? '%2F')
            );
        }

        foreach ($config['exchanges'] ?? [] as $name => $exchangeConfig) {
            $results['exchanges'][$name] = $this->api->deleteExchange(
                $name,
                $this->encodeSegment($exchangeConfig['vhost'] ?? '%2F')
            );
        }

        foreach ($config['users'] ?? [] as $username => $userData) {
            $results['users'][$username] = $this->api->deleteUser($username);
        }

        foreach ($config['vhosts'] ?? [] as $vhost) {
            $results['vhosts'][$vhost] = $this->api->deleteVhost($this->encodeSegment($vhost));
        }

        return $results;
    }

    /**
     * Purges every queue, optionally limited to one virtual host.
     *
     * @param string|null $vhost Raw or encoded vhost, or null for all vhosts.
     * @return array Purge results keyed by queue name. Queues with the same
     *               name in different vhosts share a key, so the later result wins.
     */
    public function purgeAllQueues($vhost = null)
    {
        $queues = $this->api->getQueues($vhost === null ? null : $this->encodeSegment($vhost));
        $results = [];
        if ($queues['success'] && is_array($queues['body'])) {
            foreach ($queues['body'] as $queue) {
                // Listing values are raw ("/"), so encode them for the URL.
                $results[$queue['name']] = $this->api->purgeQueue(
                    rawurlencode($queue['name']),
                    rawurlencode($queue['vhost'])
                );
            }
        }

        return $results;
    }

    /**
     * Closes every connection, optionally limited to one virtual host.
     *
     * @param string|null $vhost Raw or encoded vhost, or null for all connections.
     * @return array Close results keyed by the raw connection name.
     */
    public function closeAllConnections($vhost = null)
    {
        // The listing reports raw vhost names, so compare in decoded form.
        $wanted = $vhost === null ? null : rawurldecode((string) $vhost);

        $connections = $this->api->getConnections();
        $results = [];
        if ($connections['success'] && is_array($connections['body'])) {
            foreach ($connections['body'] as $conn) {
                if ($wanted !== null && ($conn['vhost'] ?? null) !== $wanted) {
                    continue;
                }
                // Connection names contain spaces and ':' and must be encoded.
                $results[$conn['name']] = $this->api->closeConnection(rawurlencode($conn['name']));
            }
        }

        return $results;
    }

    /**
     * Exports the broker definitions to a JSON file.
     *
     * @param string $filename Destination path.
     * @return bool True only when the export succeeded and the file was written.
     */
    public function backupConfiguration($filename)
    {
        $definitions = $this->api->exportDefinitions();
        if (!$definitions['success']) {
            return false;
        }

        $json = json_encode($definitions['body'], JSON_PRETTY_PRINT);

        return $json !== false && file_put_contents($filename, $json) !== false;
    }

    /**
     * Imports definitions from a JSON file produced by backupConfiguration().
     *
     * @param string $filename Source path.
     * @return array|false Client result of the import, or false when the file
     *                     is missing, unreadable or not a JSON object/array.
     */
    public function restoreConfiguration($filename)
    {
        if (!is_file($filename) || !is_readable($filename)) {
            return false;
        }

        $json = file_get_contents($filename);
        if ($json === false) {
            return false;
        }

        $definitions = json_decode($json, true);
        if (!is_array($definitions)) {
            return false;
        }

        return $this->api->importDefinitions($definitions);
    }
}
// Example application topology provisioned through RabbitMQBatchOperations.
// The virtual host is written in its percent-encoded form everywhere, because
// the value is placed directly into the request URL ("/app" => "%2Fapp").
$appVhost = '%2Fapp';

$config = [
    'vhosts' => [$appVhost],
    'users' => [
        'app_producer' => [
            'password' => 'producer_pass',
            'tags' => '',
            'permissions' => [
                $appVhost => [
                    'configure' => '^app\.',
                    'write' => '^app\.',
                    'read' => '^app\.',
                ],
            ],
        ],
        'app_consumer' => [
            'password' => 'consumer_pass',
            'tags' => '',
            'permissions' => [
                $appVhost => [
                    'configure' => '^app\.',
                    'write' => '',
                    'read' => '^app\.',
                ],
            ],
        ],
    ],
    'exchanges' => [
        'app.orders' => [
            'type' => 'topic',
            'vhost' => $appVhost,
        ],
        'app.notifications' => [
            'type' => 'fanout',
            'vhost' => $appVhost,
        ],
    ],
    'queues' => [
        'app.orders.created' => [
            'vhost' => $appVhost,
            'options' => [
                'arguments' => [
                    'x-message-ttl' => 86400000,
                    'x-max-length' => 10000,
                ],
            ],
        ],
        'app.orders.completed' => [
            'vhost' => $appVhost,
        ],
        'app.notifications.email' => [
            'vhost' => $appVhost,
        ],
    ],
    'bindings' => [
        [
            'exchange' => 'app.orders',
            'queue' => 'app.orders.created',
            'routing_key' => 'order.created',
            'vhost' => $appVhost,
        ],
        [
            'exchange' => 'app.orders',
            'queue' => 'app.orders.completed',
            'routing_key' => 'order.completed',
            'vhost' => $appVhost,
        ],
        [
            'exchange' => 'app.notifications',
            'queue' => 'app.notifications.email',
            'routing_key' => '',
            'vhost' => $appVhost,
        ],
    ],
    'policies' => [
        'ha-all' => [
            'pattern' => '^app\.',
            'definition' => [
                'ha-mode' => 'all',
                'ha-sync-mode' => 'automatic',
            ],
            'vhost' => $appVhost,
            'apply_to' => 'queues',
        ],
    ],
];

$api = new RabbitMQApiClient('localhost', 15672, 'admin', 'admin123');
$batch = new RabbitMQBatchOperations($api);

// Provision everything and show the per-resource API results.
$results = $batch->setupApplication($config);
print_r($results);
$batch->backupConfiguration('/tmp/rabbitmq_backup_' . date('YmdHis') . '.json');
实际应用场景
场景一:CI/CD 集成
php
<?php
/**
 * Deployment-pipeline helpers: provision an environment from a JSON config
 * file and run a publish/consume smoke test against the broker.
 */
class CICDIntegration
{
    /** @var RabbitMQApiClient Client used for every API call. */
    private $api;

    public function __construct(RabbitMQApiClient $api)
    {
        $this->api = $api;
    }

    /**
     * Applies /config/rabbitmq/{env}.json through RabbitMQBatchOperations.
     *
     * @param string $env Environment name; letters, digits, '.', '_' and '-' only.
     * @return array Results of RabbitMQBatchOperations::setupApplication().
     * @throws InvalidArgumentException When $env could escape the config directory.
     * @throws RuntimeException         When the config file is unreadable or not valid JSON.
     */
    public function deployEnvironment($env)
    {
        // $env becomes part of a file path, so reject separators and anything
        // else outside a conservative whitelist.
        if (!is_string($env) || preg_match('/^[A-Za-z0-9][A-Za-z0-9._-]*$/', $env) !== 1) {
            throw new InvalidArgumentException('Invalid environment name: ' . var_export($env, true));
        }

        $configFile = "/config/rabbitmq/{$env}.json";
        if (!is_file($configFile) || !is_readable($configFile)) {
            throw new RuntimeException("Config file not readable: {$configFile}");
        }

        $config = json_decode(file_get_contents($configFile), true);
        if (!is_array($config)) {
            throw new RuntimeException("Config file is not valid JSON: {$configFile}");
        }

        $batch = new RabbitMQBatchOperations($this->api);

        return $batch->setupApplication($config);
    }

    /**
     * Creates a temporary exchange/queue/binding in the default vhost,
     * publishes one message, fetches it back and removes the test resources.
     *
     * @param string $env Environment name used in the test resource names.
     * @return array Client results per step; 'consume' is a bool telling
     *               whether at least one message was fetched.
     */
    public function runSmokeTests($env)
    {
        $testExchange = "test.{$env}.exchange";
        $testQueue = "test.{$env}.queue";
        $testMessage = ['test' => true, 'timestamp' => time()];

        $results = [];
        $results['create_exchange'] = $this->api->createExchange($testExchange, 'direct');
        $results['create_queue'] = $this->api->createQueue($testQueue);
        $results['create_binding'] = $this->api->createBinding($testExchange, $testQueue, 'test');
        $results['publish'] = $this->api->publishMessage(
            $testExchange,
            'test',
            json_encode($testMessage)
        );

        $messages = $this->api->getMessages($testQueue, 1);
        // The body is null when the request failed or returned no JSON.
        $results['consume'] = $messages['success']
            && is_array($messages['body'])
            && count($messages['body']) > 0;

        $results['delete_queue'] = $this->api->deleteQueue($testQueue);
        $results['delete_exchange'] = $this->api->deleteExchange($testExchange);

        return $results;
    }
}
场景二:监控告警集成
php
<?php
/**
 * Runs the RabbitMQMonitor checks and forwards problems to a webhook.
 */
class AlertIntegration
{
    /** @var RabbitMQApiClient Client handed to the monitor. */
    private $api;

    /** @var string URL that receives alert payloads as JSON POST requests. */
    private $webhookUrl;

    public function __construct(RabbitMQApiClient $api, $webhookUrl)
    {
        $this->api = $api;
        $this->webhookUrl = $webhookUrl;
    }

    /**
     * POSTs one alert to the webhook.
     *
     * @param string $level   Alert level, e.g. 'warning' or 'critical'.
     * @param string $message Short description.
     * @param array  $details Arbitrary structured context.
     * @return bool True when the webhook answered with a 2xx status.
     */
    public function sendAlert($level, $message, $details = [])
    {
        $payload = [
            'level' => $level,
            'message' => $message,
            'details' => $details,
            'timestamp' => date('c'),
        ];

        $ch = curl_init($this->webhookUrl);
        curl_setopt_array($ch, [
            CURLOPT_POST => true,
            CURLOPT_POSTFIELDS => json_encode($payload),
            CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
            CURLOPT_RETURNTRANSFER => true,
            // Bound the call so an unresponsive webhook cannot stall monitoring.
            CURLOPT_CONNECTTIMEOUT => 5,
            CURLOPT_TIMEOUT => 10,
        ]);
        $response = curl_exec($ch);
        $status = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        curl_close($ch);

        return $response !== false && $status >= 200 && $status < 300;
    }

    /**
     * Runs the system, queue and node checks and sends one alert per problem:
     * a warning for a non-healthy system check, a critical alert per critical
     * queue, and a critical alert for a non-healthy node check.
     */
    public function monitorAndAlert()
    {
        $monitor = new RabbitMQMonitor($this->api);

        $systemHealth = $monitor->checkSystemHealth();
        if ($systemHealth['status'] !== 'healthy') {
            $this->sendAlert('warning', 'RabbitMQ 系统状态异常', $systemHealth);
        }

        $queueHealth = $monitor->checkAllQueues();
        // An error result has no 'queues' key; treat it as an empty list.
        foreach ($queueHealth['queues'] ?? [] as $queue) {
            if (($queue['status'] ?? null) === 'critical') {
                $this->sendAlert('critical', "队列 {$queue['queue']} 状态严重", $queue);
            }
        }

        $nodeHealth = $monitor->checkNodeHealth();
        if ($nodeHealth['status'] !== 'healthy') {
            $this->sendAlert('critical', 'RabbitMQ 节点异常', $nodeHealth);
        }
    }
}
常见问题与解决方案
问题一:认证失败
错误信息:HTTP 401 Unauthorized
解决方案:
php
// Make sure the user name and password are correct
$api = new RabbitMQApiClient('localhost', 15672, 'admin', 'correct_password');
// Fetch the details of the user these credentials authenticate as
$whoami = $api->whoami();
print_r($whoami);
问题二:虚拟主机编码问题
错误信息:HTTP 404 Not Found
原因:虚拟主机名称需要 URL 编码
解决方案:
php
// The default virtual host "/" must be encoded as "%2F"
$api->getQueue('my_queue', '%2F');
/**
 * Percent-encodes a virtual-host name for use as one URL path segment.
 *
 * rawurlencode() is used rather than urlencode(): in a path a space must
 * become "%20", whereas urlencode() would emit "+", which a server reads as a
 * literal plus sign.
 *
 * @param string $vhost Raw virtual-host name, e.g. "/" or "my vhost".
 * @return string Encoded segment, e.g. "%2F" or "my%20vhost".
 */
function encodeVhost($vhost) {
    return rawurlencode($vhost);
}
问题三:请求超时
错误信息:cURL timeout
解决方案:
php
// Raise the request timeout (fifth constructor argument, in seconds)
$api = new RabbitMQApiClient('localhost', 15672, 'admin', 'password', 60);
// 或使用异步请求
问题四:消息编码问题
错误信息:消息内容乱码
解决方案:
php
// Declare the character encoding when publishing; JSON_UNESCAPED_UNICODE keeps
// non-ASCII characters as UTF-8 instead of \uXXXX escapes
$api->publishMessage(
'exchange',
'routing.key',
json_encode($data, JSON_UNESCAPED_UNICODE),
'%2F',
['content_type' => 'application/json; charset=utf-8']
);
最佳实践建议
1. 安全配置
- 使用 HTTPS(生产环境)
- 创建专用 API 用户
- 使用最小权限原则
- 定期轮换密码
2. 错误处理
- 检查 HTTP 状态码
- 处理网络异常
- 实现重试机制
- 记录错误日志
3. 性能优化
- 批量操作减少请求
- 缓存常用数据
- 使用连接池
- 避免频繁轮询
4. 监控集成
- 定期健康检查
- 设置告警阈值
- 记录关键指标
- 集成到监控系统
5. 配置管理
- 使用配置文件
- 版本控制配置
- 定期备份定义
- 自动化部署
