Skip to content

RabbitMQ rabbitmqctl 命令详解

概述

rabbitmqctl 是 RabbitMQ 的命令行管理工具,提供对 RabbitMQ 节点和集群的全面管理能力。通过 rabbitmqctl,管理员可以执行用户管理、权限配置、队列操作、节点管理、策略配置等操作。

rabbitmqctl 是 RabbitMQ 安装后自带的命令行工具,无需额外安装即可使用。

核心功能分类

类别说明
节点管理启动、停止、重启节点
用户管理创建、删除、修改用户
权限管理配置用户访问权限
队列管理创建、删除、清空队列
交换器管理创建、删除交换器
策略管理配置集群策略
集群管理节点加入/离开集群
状态查询查看系统状态和统计

基本语法

bash
rabbitmqctl <command> [command options]

常用选项

选项说明
-n node指定节点名称
-q静默模式,减少输出
-t timeout命令超时时间(秒)
--offline离线模式(节点未运行时)
--online强制在线模式

节点名称格式

rabbit@hostname

例如:

bash
rabbitmqctl -n rabbit@localhost status

用户管理命令

查看用户列表

bash
rabbitmqctl list_users

输出示例:

Listing users ...
user    tags
admin   [administrator]
guest   [administrator]
app_user    []

创建用户

bash
rabbitmqctl add_user <username> <password>

示例:

bash
rabbitmqctl add_user admin "SecureP@ssw0rd"

删除用户

bash
rabbitmqctl delete_user <username>

示例:

bash
rabbitmqctl delete_user old_user

修改用户密码

bash
rabbitmqctl change_password <username> <new_password>

示例:

bash
rabbitmqctl change_password admin "NewP@ssw0rd"

清空用户密码

bash
rabbitmqctl clear_password <username>

清空密码后,用户无法通过密码认证登录。

设置用户标签

bash
rabbitmqctl set_user_tags <username> <tag> [<tag> ...]

可用标签

  • administrator - 管理员,完全访问权限
  • monitoring - 监控者,可查看所有配置和统计
  • policymaker - 策略制定者,可管理策略
  • management - 管理者,可访问管理界面

示例:

bash
# 设置单个标签
rabbitmqctl set_user_tags admin administrator

# 设置多个标签
rabbitmqctl set_user_tags dev_user monitoring management

权限管理命令

查看用户权限

bash
# 查看所有用户的权限
rabbitmqctl list_permissions

# 查看特定用户的权限
rabbitmqctl list_user_permissions <username>

输出示例:

Listing permissions for user "admin" ...
vhost   configure write read
/       .*       .*   .*
/vhost2 .*       .*   .*

设置用户权限

bash
rabbitmqctl set_permissions [-p <vhost>] <username> <configure> <write> <read>

参数说明:

  • -p <vhost>:虚拟主机路径(默认 /
  • <configure>:配置权限正则
  • <write>:写入权限正则
  • <read>:读取权限正则

示例:

bash
# 授予所有资源的所有权限
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

# 授予特定前缀资源的权限
rabbitmqctl set_permissions -p / app_user "^app\." "^app\." "^app\."

# 只读权限
rabbitmqctl set_permissions -p / readonly_user "^$" "^$" ".*"

清除用户权限

bash
rabbitmqctl clear_permissions [-p <vhost>] <username>

示例:

bash
rabbitmqctl clear_permissions -p / app_user

虚拟主机管理命令

查看虚拟主机列表

bash
rabbitmqctl list_vhosts

创建虚拟主机

bash
rabbitmqctl add_vhost <vhost_path>

示例:

bash
rabbitmqctl add_vhost /production
rabbitmqctl add_vhost /staging

删除虚拟主机

bash
rabbitmqctl delete_vhost <vhost_path>

示例:

bash
rabbitmqctl delete_vhost /staging

注意:删除虚拟主机会同时删除其中的所有队列、交换器和绑定关系。

查看虚拟主机权限

bash
rabbitmqctl list_permissions -p <vhost_path>

队列管理命令

查看队列列表

bash
# 查看默认虚拟主机的队列
rabbitmqctl list_queues

# 查看特定虚拟主机的队列
rabbitmqctl list_queues -p /myvhost

常用列:

bash
rabbitmqctl list_queues name messages consumers memory

常用列说明

列名说明
name队列名称
messages消息总数
messages_ready准备消费的消息数
messages_unacked未确认消息数
consumers消费者数量
memory内存使用(字节)
durable是否持久化
auto_delete是否自动删除

创建队列

bash
rabbitmqctl declare_queue [-p <vhost>] <queue_name>

示例:

bash
# 创建持久化队列
rabbitmqctl declare_queue -p / my_queue

# 使用队列属性
rabbitmqctl declare_queue -- durable=true -- auto_delete=false my_queue

删除队列

bash
rabbitmqctl delete_queue [-p <vhost>] <queue_name>

示例:

bash
rabbitmqctl delete_queue -p / my_queue

清空队列消息

bash
rabbitmqctl purge_queue [-p <vhost>] <queue_name>

示例:

bash
rabbitmqctl purge_queue -p / my_queue

获取队列信息

bash
rabbitmqctl list_queueatters <queue_name>

交换器管理命令

查看交换器列表

bash
rabbitmqctl list_exchanges

常用列:

bash
rabbitmqctl list_exchanges name type durable auto_delete

创建交换器

bash
rabbitmqctl declare_exchange [-p <vhost>] <exchange_name> <type>

类型:direct, topic, fanout, headers

示例:

bash
# 创建 direct 类型的交换器
rabbitmqctl declare_exchange -p / my_direct direct

# 创建 topic 类型的交换器(持久化)
rabbitmqctl declare_exchange -p / my_topic topic -- durable=true

删除交换器

bash
rabbitmqctl delete_exchange [-p <vhost>] <exchange_name>

绑定管理命令

查看绑定关系

bash
# 查看所有绑定
rabbitmqctl list_bindings

# 查看特定交换器的绑定
rabbitmqctl list_bindings -p / source my_exchange

创建绑定

bash
rabbitmqctl declare_binding [-p <vhost>] <exchange_name> <queue_name> <routing_key>

示例:

bash
rabbitmqctl declare_binding -p / my_exchange my_queue my.routing.key

策略管理命令

查看策略列表

bash
rabbitmqctl list_policies

创建策略

bash
rabbitmqctl set_policy [-p <vhost>] <name> <pattern> <definition> [--priority <priority>] [--apply-to <apply-to>]

参数说明:

  • -p <vhost>:虚拟主机
  • <name>:策略名称
  • <pattern>:匹配模式(正则表达式)
  • <definition>:策略定义(JSON 格式)
  • --priority:优先级(数字,越高越优先)
  • --apply-to:应用对象(queues, exchanges, all)

示例:

HA 镜像队列策略

bash
rabbitmqctl set_policy ha-all "^ha\." '{"ha-mode":"all"}' --priority 1 --apply-to queues

TTL 策略

bash
rabbitmqctl set_policy ttl-policy "^ttl\." '{"message-ttl":86400000}' --priority 1

最大长度策略

bash
rabbitmqctl set_policy max-length "^max\." '{"max-length":10000,"overflow":"reject-publish-dlx"}'

删除策略

bash
rabbitmqctl clear_policy [-p <vhost>] <policy_name>

示例:

bash
rabbitmqctl clear_policy -p / ha-all

节点管理命令

查看节点状态

bash
rabbitmqctl status

输出包含:

  • 节点名称
  • 内存使用
  • 进程信息
  • 磁盘空间
  • 网络连接统计
  • 队列统计

查看集群状态

bash
rabbitmqctl cluster_status

关闭节点

bash
# 优雅关闭(等待连接关闭)
rabbitmqctl stop

# 强制关闭
rabbitmqctl stop_app

# 启动节点
rabbitmqctl start_app

节点重置

bash
# 重置节点(清除所有数据)
rabbitmqctl reset

# 强制重置
rabbitmqctl force_reset

警告:reset 会删除节点上的所有数据,请谨慎使用。

集群管理命令

将节点加入集群

bash
# 在要加入的节点上执行
rabbitmqctl join_cluster <cluster_node>

示例:

bash
rabbitmqctl join_cluster rabbit@node1

离开集群

bash
# 节点主动离开
rabbitmqctl leave_cluster

# 从其他节点强制移除节点
rabbitmqctl forget_cluster_node rabbit@node2

更改集群节点类型

bash
# 将节点设置为磁盘节点
rabbitmqctl set_cluster_disk_node

# 将节点设置为内存节点
rabbitmqctl set_cluster_ram_node

设置集群名称

bash
rabbitmqctl set_cluster_name my_cluster

消息确认命令

列出未确认消息

bash
rabbitmqctl list_channels

重新排队未确认消息

bash
# 通过连接名称
rabbitmqctl reopen_connections --vhost <vhost>

日志和诊断命令

查看日志

bash
# 查看日志文件位置
rabbitmqctl rotate_logs

# 指定日志文件
tail -f /var/log/rabbitmq/rabbit@hostname.log

异步任务队列

bash
# 列出异步任务
rabbitmqctl list_async_tasks

高级命令

启用功能标志

bash
rabbitmqctl enable_feature_flag <feature_name>

示例:

bash
rabbitmqctl enable_feature_flag quorum_queues
rabbitmqctl enable_feature_flag stream_queue

关闭连接

bash
rabbitmqctl close_connection <connection_pid> <reason>

示例:

bash
rabbitmqctl close_connection "<rabbit@hostname.1.2.3>" "Admin requested closure"

关闭所有用户连接

bash
rabbitmqctl close_all_user_connections <username>

导出配置

bash
rabbitmqctl export_definitions <filename>

导入配置

bash
rabbitmqctl import_definitions <filename>

PHP 代码示例

使用 SSH/Exec 执行命令

php
<?php

class RabbitMQControl
{
    private $host;
    private $sshConnection;

    public function __construct($host, $username = 'root', $password = null, $keyFile = null)
    {
        $this->host = $host;
        
        $this->sshConnection = ssh2_connect($host, 22);
        
        if ($password) {
            ssh2_auth_password($this->sshConnection, $username, $password);
        } elseif ($keyFile) {
            ssh2_auth_pubkey_file($this->sshConnection, $username, $keyFile . '.pub', $keyFile);
        }
    }

    private function execute($command)
    {
        $stream = ssh2_exec($this->sshConnection, $command);
        stream_set_blocking($stream, true);
        $output = stream_get_contents($stream);
        fclose($stream);
        
        return $output;
    }

    public function listUsers()
    {
        return $this->execute('rabbitmqctl list_users');
    }

    public function addUser($username, $password)
    {
        $command = sprintf('rabbitmqctl add_user %s %s', $username, $password);
        return $this->execute($command);
    }

    public function deleteUser($username)
    {
        $command = sprintf('rabbitmqctl delete_user %s', $username);
        return $this->execute($command);
    }

    public function setUserTags($username, $tags)
    {
        $command = sprintf('rabbitmqctl set_user_tags %s %s', $username, implode(' ', $tags));
        return $this->execute($command);
    }

    public function setPermissions($username, $vhost = '/', $configure = '.*', $write = '.*', $read = '.*')
    {
        $command = sprintf('rabbitmqctl set_permissions -p %s %s "%s" "%s" "%s"', 
            $vhost, $username, $configure, $write, $read);
        return $this->execute($command);
    }

    public function listQueues($vhost = '/')
    {
        $command = sprintf('rabbitmqctl list_queues -p %s name messages consumers memory', $vhost);
        return $this->execute($command);
    }

    public function createQueue($queueName, $vhost = '/', $durable = true, $autoDelete = false)
    {
        $command = sprintf('rabbitmqctl declare_queue -p %s -- durable=%s -- auto_delete=%s %s',
            $vhost, $durable ? 'true' : 'false', $autoDelete ? 'true' : 'false', $queueName);
        return $this->execute($command);
    }

    public function deleteQueue($queueName, $vhost = '/')
    {
        $command = sprintf('rabbitmqctl delete_queue -p %s %s', $vhost, $queueName);
        return $this->execute($command);
    }

    public function purgeQueue($queueName, $vhost = '/')
    {
        $command = sprintf('rabbitmqctl purge_queue -p %s %s', $vhost, $queueName);
        return $this->execute($command);
    }

    public function createExchange($exchangeName, $type, $vhost = '/', $durable = true)
    {
        $command = sprintf('rabbitmqctl declare_exchange -p %s -- durable=%s %s %s',
            $vhost, $durable ? 'true' : 'false', $exchangeName, $type);
        return $this->execute($command);
    }

    public function setPolicy($name, $pattern, $definition, $vhost = '/', $priority = 1, $applyTo = 'queues')
    {
        $command = sprintf('rabbitmqctl set_policy -p %s %s "%s" \'%s\' --priority %d --apply-to %s',
            $vhost, $name, $pattern, json_encode($definition), $priority, $applyTo);
        return $this->execute($command);
    }

    public function getStatus()
    {
        return $this->execute('rabbitmqctl status');
    }

    public function getClusterStatus()
    {
        return $this->execute('rabbitmqctl cluster_status');
    }

    public function getQueueInfo($queueName, $vhost = '/')
    {
        $command = sprintf('rabbitmqctl list_queueatters %s -p %s', $queueName, $vhost);
        return $this->execute($command);
    }

    public function closeConnection($reason = 'Admin requested')
    {
        $connections = $this->execute("rabbitmqctl list_connections | tail -n +2 | awk '{print \$1}'");
        $connectionPids = array_filter(explode("\n", trim($connections)));
        
        foreach ($connectionPids as $pid) {
            $this->execute("rabbitmqctl close_connection \"$pid\" \"$reason\"");
        }
    }

    public function addVhost($vhost)
    {
        $command = sprintf('rabbitmqctl add_vhost %s', $vhost);
        return $this->execute($command);
    }

    public function deleteVhost($vhost)
    {
        $command = sprintf('rabbitmqctl delete_vhost %s', $vhost);
        return $this->execute($command);
    }

    public function exportDefinitions($filename)
    {
        $command = sprintf('rabbitmqctl export_definitions %s', $filename);
        return $this->execute($command);
    }

    public function importDefinitions($filename)
    {
        $command = sprintf('rabbitmqctl import_definitions %s', $filename);
        return $this->execute($command);
    }
}

$rabbit = new RabbitMQControl('192.168.1.100', 'admin', 'password');

echo $rabbit->addUser('app_user', 'secure_password');
echo $rabbit->setUserTags('app_user', ['monitoring']);
echo $rabbit->setPermissions('app_user', '/', '^app\.', '^app\.', '^app\.');

echo $rabbit->createQueue('app.orders', '/', true, false);
echo $rabbit->createExchange('app.exchange', 'topic', '/', true);

$haPolicy = [
    'ha-mode' => 'all',
    'ha-sync-mode' => 'automatic'
];
echo $rabbit->setPolicy('ha-all', '^ha\.', $haPolicy, '/', 1);

echo $rabbit->getClusterStatus();

使用消息队列执行命令

php
<?php

class RabbitMQAdminScript
{
    private $outputDir = '/tmp/rabbitmq-scripts';

    public function __construct()
    {
        if (!is_dir($this->outputDir)) {
            mkdir($this->outputDir, 0755, true);
        }
    }

    public function generateScript($commands)
    {
        $scriptPath = $this->outputDir . '/setup_' . time() . '.sh';
        
        $content = "#!/bin/bash\n\n";
        foreach ($commands as $command) {
            $content .= "rabbitmqctl {$command}\n";
        }
        
        file_put_contents($scriptPath, $content);
        chmod($scriptPath, 0755);
        
        return $scriptPath;
    }

    public function executeCommands($host, $commands)
    {
        $scriptPath = $this->generateScript($commands);
        
        $command = sprintf(
            'ssh -o StrictHostKeyChecking=no %s "bash -s" < %s',
            $host,
            $scriptPath
        );
        
        exec($command, $output, $returnCode);
        
        unlink($scriptPath);
        
        return [
            'return_code' => $returnCode,
            'output' => implode("\n", $output)
        ];
    }
}

$script = new RabbitMQAdminScript();

$commands = [
    'add_user app_user "secure_password"',
    'set_user_tags app_user monitoring',
    'set_permissions -p / app_user "^app\\\." "^app\\\." "^app\\\."',
    'add_vhost /app',
    'declare_queue -- durable=true --auto_delete=false app.orders',
    'declare_exchange -- durable=true app.exchange topic',
    'declare_binding -p / app.exchange app.orders "order.#"',
    'set_policy ha-all "^ha\\\." \'{"ha-mode":"all"}\' --priority 1 --apply-to queues'
];

$result = $script->executeCommands('192.168.1.100', $commands);

echo "返回码: {$result['return_code']}\n";
echo "输出:\n{$result['output']}\n";

实际应用场景

场景一:自动化部署

在 CI/CD 流程中配置消息队列:

bash
#!/bin/bash

# 自动化 RabbitMQ 配置脚本

# 创建用户
rabbitmqctl add_user ${RABBITMQ_USER} ${RABBITMQ_PASSWORD}

# 设置标签
rabbitmqctl set_user_tags ${RABBITMQ_USER} monitoring

# 设置权限
rabbitmqctl set_permissions -p / ${RABBITMQ_USER} "^${APP_PREFIX}\." "^${APP_PREFIX}\." "^${APP_PREFIX}\."

# 创建虚拟主机
rabbitmqctl add_vhost /${APP_PREFIX}

# 设置 HA 策略
rabbitmqctl set_policy ha-${APP_PREFIX} "^${APP_PREFIX}\." \
    '{"ha-mode":"all","ha-sync-mode":"automatic"}' \
    --priority 1 --apply-to queues

echo "RabbitMQ 配置完成"

场景二:监控脚本

bash
#!/bin/bash

# 检查队列积压
MESSAGES=$(rabbitmqctl list_queues -p / name messages | grep my_queue | awk '{print $2}')

if [ "$MESSAGES" -gt 10000 ]; then
    echo "警告: 队列积压严重,当前消息数: $MESSAGES"
    # 发送告警通知
    curl -X POST "https://notify.example.com/alert" -d "message=队列积压: $MESSAGES"
fi

场景三:清理脚本

bash
#!/bin/bash

# 清理测试环境

# 删除测试队列
rabbitmqctl delete_queue test_queue_1
rabbitmqctl delete_queue test_queue_2

# 清空积压队列
rabbitmqctl purge_queue production_queue

# 删除过期用户
rabbitmqctl delete_user test_user

echo "清理完成"

常见问题与解决方案

问题一:权限不足

错误信息

Error: operation not permitted - the current user has insufficient permissions to execute this command

解决方案

bash
# 确保以管理员权限运行
rabbitmqctl set_user_tags your_user administrator

问题二:节点无法连接

错误信息

Error: unable to connect to node rabbit@hostname: nodedown

解决方案

bash
# 检查节点状态
rabbitmqctl status

# 检查网络连接
ping hostname

# 检查 Erlang Cookie
cat /var/lib/rabbitmq/.erlang.cookie

问题三:虚拟主机不存在

错误信息

Error: vhost /myvhost does not exist

解决方案

bash
# 创建虚拟主机
rabbitmqctl add_vhost /myvhost

问题四:策略冲突

错误信息

Error: (406) PRECONDITION_FAILED - inequivalent arg 'x-message-ttl' for queue

解决方案

bash
# 清除旧策略
rabbitmqctl clear_policy -p / old_policy

# 重新设置新策略
rabbitmqctl set_policy new_policy ...

问题五:无法删除队列

原因:队列有消费者连接或消息

解决方案

bash
# 先清空队列
rabbitmqctl purge_queue queue_name

# 关闭所有消费者
rabbitmqctl close_all_user_connections consumer_user

# 强制删除
rabbitmqctl delete_queue queue_name

最佳实践建议

1. 安全实践

  • 避免在命令行中明文输入密码
  • 使用环境变量或配置文件
  • 定期更换密码
  • 使用最小权限原则

2. 脚本化运维

  • 将常用操作写成脚本
  • 使用版本控制管理脚本
  • 记录操作日志
  • 做好备份

3. 集群操作

  • 先在测试环境验证
  • 逐个节点操作
  • 观察集群状态变化
  • 准备好回滚方案

4. 权限管理

  • 按项目划分虚拟主机
  • 为每个应用创建独立用户
  • 使用正则表达式限制资源访问
  • 定期审计权限配置

5. 监控和告警

  • 定期检查系统状态
  • 设置关键指标告警
  • 保留日志便于排查
  • 建立应急预案

相关链接