Skip to content

RabbitMQ Shovel 插件

一、概述

Shovel 是 RabbitMQ 的数据传输插件,用于在不同队列、交换器或不同 RabbitMQ 实例之间可靠地移动消息。Shovel 适用于数据迁移、跨集群消息传递和消息路由场景。

Shovel 架构

mermaid
graph TB
    subgraph "源端"
        SQ[源队列]
        SE[源交换器]
    end
    
    subgraph "Shovel"
        S[Shovel 消费者]
        P[Shovel 生产者]
        S --> P
    end
    
    subgraph "目标端"
        DQ[目标队列]
        DE[目标交换器]
    end
    
    SQ --> S
    SE --> S
    P --> DQ
    P --> DE

二、核心知识点

2.1 Shovel 工作原理

消息流转

mermaid
sequenceDiagram
    participant SQ as 源队列
    participant SC as Shovel 消费者
    participant SP as Shovel 生产者
    participant DQ as 目标队列
    
    SQ->>SC: 消费消息
    SC->>SP: 转发消息
    SP->>DQ: 发布消息
    DQ-->>SP: 确认
    SP-->>SC: 确认
    SC-->>SQ: 确认

2.2 Shovel 类型

类型说明配置
静态 Shovel配置文件定义advanced.config
动态 Shovel运行时创建HTTP API / rabbitmqctl

2.3 Shovel 特点

优点

  1. 可靠性: 确保消息可靠传输
  2. 灵活性: 支持多种源和目标
  3. 跨协议: 支持 AMQP 0-9-1 和 AMQP 1.0
  4. 可配置: 支持消息转换

适用场景

  1. 数据迁移: 集群间数据迁移
  2. 消息路由: 复杂消息路由
  3. 跨集群通信: 不同集群间消息传递
  4. 协议转换: AMQP 协议间转换

2.4 Shovel 与 Federation 对比

特性ShovelFederation
配置方式静态/动态策略
消息流向双向可配上游到下游
自动发现不支持支持
灵活性

三、配置示例

3.1 启用 Shovel 插件

bash
# 启用插件
rabbitmq-plugins enable rabbitmq_shovel
rabbitmq-plugins enable rabbitmq_shovel_management

# 验证
rabbitmq-plugins list | grep shovel

3.2 静态 Shovel 配置

erlang
% /etc/rabbitmq/advanced.config
[
  {rabbitmq_shovel, [
    {shovels, [
      {'my-shovel', [
        {source, [
          {protocol, amqp091},
          {uris, ["amqp://source-host:5672"]},
          {declarations, [
            {'queue.declare', [{queue, <<"source-queue">>}]}
          ]},
          {queue, <<"source-queue">>},
          {prefetch_count, 1000}
        ]},
        {destination, [
          {protocol, amqp091},
          {uris, ["amqp://dest-host:5672"]},
          {declarations, [
            {'queue.declare', [{queue, <<"dest-queue">>}]}
          ]},
          {publish_properties, [
            {delivery_mode, 2}
          ]},
          {publish_fields, [
            {exchange, <<>>},
            {routing_key, <<"dest-queue">>}
          ]}
        ]},
        {ack_mode, on_confirm},
        {reconnect_delay, 5}
      ]}
    ]}
  ]}
].

3.3 动态 Shovel 配置

bash
# 创建动态 Shovel
rabbitmqctl set_parameter shovel my-shovel \
    '{
        "src-protocol": "amqp091",
        "src-uri": "amqp://source-host:5672",
        "src-queue": "source-queue",
        "dest-protocol": "amqp091",
        "dest-uri": "amqp://dest-host:5672",
        "dest-queue": "dest-queue",
        "ack-mode": "on-confirm",
        "src-prefetch-count": 1000,
        "src-delete-after": "never"
    }'

3.4 PHP 管理示例

php
<?php

class ShovelManager
{
    private string $host;
    private int $port;
    private string $user;
    private string $password;
    
    public function __construct(
        string $host = 'localhost',
        int $port = 15672,
        string $user = 'guest',
        string $password = 'guest'
    ) {
        $this->host = $host;
        $this->port = $port;
        $this->user = $user;
        $this->password = $password;
    }
    
    private function request(string $endpoint, string $method = 'GET', array $data = null): array
    {
        $url = "http://{$this->host}:{$this->port}/api/{$endpoint}";
        
        $ch = curl_init();
        curl_setopt_array($ch, [
            CURLOPT_URL => $url,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_USERPWD => "{$this->user}:{$this->password}",
            CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
            CURLOPT_CUSTOMREQUEST => $method,
        ]);
        
        if ($data !== null) {
            curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($data));
        }
        
        $response = curl_exec($ch);
        $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        curl_close($ch);
        
        return [
            'status' => $httpCode,
            'data' => json_decode($response, true)
        ];
    }
    
    public function createShovel(
        string $name,
        string $srcUri,
        string $srcQueue,
        string $destUri,
        string $destQueue,
        array $options = []
    ): array {
        $config = array_merge([
            'src-protocol' => 'amqp091',
            'src-uri' => $srcUri,
            'src-queue' => $srcQueue,
            'dest-protocol' => 'amqp091',
            'dest-uri' => $destUri,
            'dest-queue' => $destQueue,
            'ack-mode' => 'on-confirm',
            'src-prefetch-count' => 1000,
            'src-delete-after' => 'never',
        ], $options);
        
        return $this->request("parameters/shovel/%2f/{$name}", 'PUT', [
            'value' => $config,
        ]);
    }
    
    public function deleteShovel(string $name): array
    {
        return $this->request("parameters/shovel/%2f/{$name}", 'DELETE');
    }
    
    public function listShovels(): array
    {
        return $this->request('shovels');
    }
    
    public function getShovelStatus(string $name): array
    {
        return $this->request("shovels/%2f/{$name}");
    }
    
    public function generateShovelReport(): string
    {
        $shovels = $this->listShovels();
        
        $report = "=== Shovel 报告 ===\n";
        $report .= "时间: " . date('Y-m-d H:i:s') . "\n\n";
        
        foreach ($shovels['data'] ?? [] as $shovel) {
            $report .= "Shovel: {$shovel['name']}\n";
            $report .= "  源: {$shovel['source_uri']} -> {$shovel['source_queue']}\n";
            $report .= "  目标: {$shovel['destination_uri']} -> {$shovel['destination_queue']}\n";
            $report .= "  状态: {$shovel['state']}\n";
            $report .= "  已传输: {$shovel['messages_transferred']}\n\n";
        }
        
        return $report;
    }
}

$manager = new ShovelManager('localhost', 15672, 'admin', 'Admin@123456');

// 创建 Shovel
$manager->createShovel(
    'migration-shovel',
    'amqp://source-cluster:5672',
    'source-queue',
    'amqp://dest-cluster:5672',
    'dest-queue'
);

// 生成报告
echo $manager->generateShovelReport();

四、常见问题与解决方案

4.1 Shovel 启动失败

排查:

bash
# 检查 Shovel 状态
rabbitmqctl list_shovels

# 检查日志
grep -i shovel /var/log/rabbitmq/rabbit@$(hostname).log

4.2 消息传输慢

优化:

json
{
    "src-prefetch-count": 5000,
    "ack-mode": "on-confirm",
    "src-delete-after": "never"
}

五、最佳实践建议

5.1 配置建议

  1. 确认模式: 使用 on-confirm 确保可靠性
  2. 预取数量: 根据消息大小调整预取数量
  3. 重连延迟: 设置合理的重连延迟

5.2 监控建议

  1. 监控 Shovel 运行状态
  2. 监控消息传输速率
  3. 监控传输延迟

六、相关链接