Appearance
RabbitMQ Shovel 插件
一、概述
Shovel 是 RabbitMQ 的数据传输插件,用于在不同队列、交换器或不同 RabbitMQ 实例之间可靠地移动消息。Shovel 适用于数据迁移、跨集群消息传递和消息路由场景。
Shovel 架构
mermaid
graph TB
subgraph "源端"
SQ[源队列]
SE[源交换器]
end
subgraph "Shovel"
S[Shovel 消费者]
P[Shovel 生产者]
S --> P
end
subgraph "目标端"
DQ[目标队列]
DE[目标交换器]
end
SQ --> S
SE --> S
P --> DQ
P --> DE二、核心知识点
2.1 Shovel 工作原理
消息流转
mermaid
sequenceDiagram
participant SQ as 源队列
participant SC as Shovel 消费者
participant SP as Shovel 生产者
participant DQ as 目标队列
SQ->>SC: 消费消息
SC->>SP: 转发消息
SP->>DQ: 发布消息
DQ-->>SP: 确认
SP-->>SC: 确认
SC-->>SQ: 确认2.2 Shovel 类型
| 类型 | 说明 | 配置 |
|---|---|---|
| 静态 Shovel | 配置文件定义 | advanced.config |
| 动态 Shovel | 运行时创建 | HTTP API / rabbitmqctl |
2.3 Shovel 特点
优点
- 可靠性: 确保消息可靠传输
- 灵活性: 支持多种源和目标
- 跨协议: 支持 AMQP 0-9-1 和 AMQP 1.0
- 可配置: 支持消息转换
适用场景
- 数据迁移: 集群间数据迁移
- 消息路由: 复杂消息路由
- 跨集群通信: 不同集群间消息传递
- 协议转换: AMQP 协议间转换
2.4 Shovel 与 Federation 对比
| 特性 | Shovel | Federation |
|---|---|---|
| 配置方式 | 静态/动态 | 策略 |
| 消息流向 | 双向可配 | 上游到下游 |
| 自动发现 | 不支持 | 支持 |
| 灵活性 | 高 | 中 |
三、配置示例
3.1 启用 Shovel 插件
bash
# 启用插件
rabbitmq-plugins enable rabbitmq_shovel
rabbitmq-plugins enable rabbitmq_shovel_management
# 验证
rabbitmq-plugins list | grep shovel3.2 静态 Shovel 配置
erlang
% /etc/rabbitmq/advanced.config
[
{rabbitmq_shovel, [
{shovels, [
{'my-shovel', [
{source, [
{protocol, amqp091},
{uris, ["amqp://source-host:5672"]},
{declarations, [
{'queue.declare', [{queue, <<"source-queue">>}]}
]},
{queue, <<"source-queue">>},
{prefetch_count, 1000}
]},
{destination, [
{protocol, amqp091},
{uris, ["amqp://dest-host:5672"]},
{declarations, [
{'queue.declare', [{queue, <<"dest-queue">>}]}
]},
{publish_properties, [
{delivery_mode, 2}
]},
{publish_fields, [
{exchange, <<>>},
{routing_key, <<"dest-queue">>}
]}
]},
{ack_mode, on_confirm},
{reconnect_delay, 5}
]}
]}
]}
].3.3 动态 Shovel 配置
bash
# 创建动态 Shovel
rabbitmqctl set_parameter shovel my-shovel \
'{
"src-protocol": "amqp091",
"src-uri": "amqp://source-host:5672",
"src-queue": "source-queue",
"dest-protocol": "amqp091",
"dest-uri": "amqp://dest-host:5672",
"dest-queue": "dest-queue",
"ack-mode": "on-confirm",
"src-prefetch-count": 1000,
"src-delete-after": "never"
}'3.4 PHP 管理示例
php
<?php
class ShovelManager
{
private string $host;
private int $port;
private string $user;
private string $password;
public function __construct(
string $host = 'localhost',
int $port = 15672,
string $user = 'guest',
string $password = 'guest'
) {
$this->host = $host;
$this->port = $port;
$this->user = $user;
$this->password = $password;
}
private function request(string $endpoint, string $method = 'GET', array $data = null): array
{
$url = "http://{$this->host}:{$this->port}/api/{$endpoint}";
$ch = curl_init();
curl_setopt_array($ch, [
CURLOPT_URL => $url,
CURLOPT_RETURNTRANSFER => true,
CURLOPT_USERPWD => "{$this->user}:{$this->password}",
CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
CURLOPT_CUSTOMREQUEST => $method,
]);
if ($data !== null) {
curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($data));
}
$response = curl_exec($ch);
$httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
curl_close($ch);
return [
'status' => $httpCode,
'data' => json_decode($response, true)
];
}
public function createShovel(
string $name,
string $srcUri,
string $srcQueue,
string $destUri,
string $destQueue,
array $options = []
): array {
$config = array_merge([
'src-protocol' => 'amqp091',
'src-uri' => $srcUri,
'src-queue' => $srcQueue,
'dest-protocol' => 'amqp091',
'dest-uri' => $destUri,
'dest-queue' => $destQueue,
'ack-mode' => 'on-confirm',
'src-prefetch-count' => 1000,
'src-delete-after' => 'never',
], $options);
return $this->request("parameters/shovel/%2f/{$name}", 'PUT', [
'value' => $config,
]);
}
public function deleteShovel(string $name): array
{
return $this->request("parameters/shovel/%2f/{$name}", 'DELETE');
}
public function listShovels(): array
{
return $this->request('shovels');
}
public function getShovelStatus(string $name): array
{
return $this->request("shovels/%2f/{$name}");
}
public function generateShovelReport(): string
{
$shovels = $this->listShovels();
$report = "=== Shovel 报告 ===\n";
$report .= "时间: " . date('Y-m-d H:i:s') . "\n\n";
foreach ($shovels['data'] ?? [] as $shovel) {
$report .= "Shovel: {$shovel['name']}\n";
$report .= " 源: {$shovel['source_uri']} -> {$shovel['source_queue']}\n";
$report .= " 目标: {$shovel['destination_uri']} -> {$shovel['destination_queue']}\n";
$report .= " 状态: {$shovel['state']}\n";
$report .= " 已传输: {$shovel['messages_transferred']}\n\n";
}
return $report;
}
}
$manager = new ShovelManager('localhost', 15672, 'admin', 'Admin@123456');
// 创建 Shovel
$manager->createShovel(
'migration-shovel',
'amqp://source-cluster:5672',
'source-queue',
'amqp://dest-cluster:5672',
'dest-queue'
);
// 生成报告
echo $manager->generateShovelReport();四、常见问题与解决方案
4.1 Shovel 启动失败
排查:
bash
# 检查 Shovel 状态
rabbitmqctl list_shovels
# 检查日志
grep -i shovel /var/log/rabbitmq/rabbit@$(hostname).log4.2 消息传输慢
优化:
json
{
"src-prefetch-count": 5000,
"ack-mode": "on-confirm",
"src-delete-after": "never"
}五、最佳实践建议
5.1 配置建议
- 确认模式: 使用
on-confirm确保可靠性 - 预取数量: 根据消息大小调整预取数量
- 重连延迟: 设置合理的重连延迟
5.2 监控建议
- 监控 Shovel 运行状态
- 监控消息传输速率
- 监控传输延迟
