Skip to content

自定义插件开发

概述

RabbitMQ 支持开发者创建自定义插件来扩展其功能。自定义插件可以添加新的交换器类型、实现自定义认证机制、添加新的协议支持等。本教程将介绍 RabbitMQ 插件开发的基础知识和实践方法。

核心知识点

插件开发架构

mermaid
graph TB
    subgraph RabbitMQ 核心
        App[应用层]
        PluginMgr[插件管理器]
    end

    subgraph 自定义插件
        Plugin[自定义插件]
        Init[初始化函数]
        Hook[生命周期钩子]
        Handler[消息处理器]
    end

    subgraph 扩展点
        Ex[交换器类型]
        Auth[认证后端]
        Protocol[协议适配]
        Feature[功能扩展]
    end

    App --> PluginMgr
    PluginMgr --> Plugin

    Plugin --> Init
    Plugin --> Hook
    Plugin --> Handler

    Handler --> Ex
    Handler --> Auth
    Handler --> Protocol
    Handler --> Feature

插件结构

mermaid
graph LR
    subgraph 插件目录结构
        A[my_plugin-1.0.0]
        B[ebin/]
        C[priv/]
        D[include/]
        E[src/]
    end

    A --> B
    A --> C
    A --> D
    A --> E

生命周期

mermaid
sequenceDiagram
    participant RMQ as RabbitMQ
    participant P as 自定义插件

    Note over RMQ,P: 插件加载
    RMQ->>P: start
    P-->>RMQ: ok

    Note over RMQ,P: 运行时
    RMQ->>P: 消息处理
    P-->>RMQ: 处理结果

    Note over RMQ,P: 插件停止
    RMQ->>P: stop
    P-->>RMQ: ok

代码示例(Erlang 与 PHP)

插件基础框架

erlang
%% @doc Skeleton of a custom RabbitMQ plugin module.
%%
%% Declares a boot step whose MFA is boot/0; boot/0 registers the
%% `x-custom-exchange' exchange type (implemented by
%% rabbit_exchange_type_custom) with the broker's type registry.
-module(rabbit_my_custom_plugin).
-include_lib("rabbit_common/include/rabbit.hrl").

%% boot/0 has to be exported: the boot step below invokes it through an
%% {M, F, A} tuple, and only exported functions can be called that way.
-export([start/0, stop/1, description/0, boot/0]).

%% NOTE(review): no rabbit_plugin behaviour is defined in this file; confirm
%% it exists in the targeted RabbitMQ release or drop this attribute.
-behaviour(rabbit_plugin).

%% `enables' must name a boot step; kernel_ready is the step the bundled
%% exchange-type plugins use, so the type is registered before the broker
%% starts recovering exchanges.
-rabbit_boot_step({?MODULE,
    [{description, "My Custom Plugin"},
     {mfa, {?MODULE, boot, []}},
     {cleanup, {rabbit_registry, unregister,
                [exchange, <<"x-custom-exchange">>]}},
     {requires, rabbit_registry},
     {enables, kernel_ready}]}).

%% @doc Start hook; nothing to do, returns ok.
start() ->
    ok.

%% @doc Stop hook; ignores its state argument and returns ok.
stop(_State) ->
    ok.

%% @doc Static metadata (name, description, version) for this plugin.
description() ->
    [{name, <<"my_custom_plugin">>},
     {description, <<"My custom plugin description">>},
     {version, <<"1.0.0">>}].

%% @doc Boot-step entry point: register the custom exchange type.
%% Exchange types are registered through rabbit_registry under the
%% `exchange' class, keyed by the type name clients declare.
boot() ->
    ok = rabbit_registry:register(exchange,
                                  <<"x-custom-exchange">>,
                                  rabbit_exchange_type_custom),
    ok.

自定义交换器类型

erlang
%% @doc Example exchange type that picks a destination queue from the
%% prefix of the message's routing key.
%%
%% NOTE(review): the rabbit_exchange_type behaviour declares more callbacks
%% than the two exported here (validate, create, delete, ...); add them for
%% the targeted RabbitMQ release before using this module for real.
-module(rabbit_exchange_type_custom).
-include_lib("rabbit_common/include/rabbit.hrl").

-export([description/0, route/2]).

-behaviour(rabbit_exchange_type).

%% @doc Static metadata for this exchange type.
description() ->
    [{name, <<"x-custom-exchange">>},
     {description, <<"Custom exchange type">>}].

%% @doc Route a delivery to exactly one queue chosen by calculate_route/2.
%% The #basic_message{} record carries a list in `routing_keys' (there is
%% no `routing_key' field), and the result is a list of queue resources in
%% the exchange's own virtual host rather than bare binaries.
route(#exchange{name = XName},
      #delivery{message = #basic_message{routing_keys = RoutingKeys}}) ->
    RoutingKey = first_routing_key(RoutingKeys),
    QueueName = calculate_route(RoutingKey, XName),
    [rabbit_misc:r(XName, queue, QueueName)].

%% First routing key of the message, or the empty binary when none is set
%% (which then falls through to the default queue below).
first_routing_key([Key | _]) -> Key;
first_routing_key(_) -> <<>>.

%% Example mapping from routing-key prefix to target queue name.
calculate_route(RoutingKey, _Name) ->
    case RoutingKey of
        <<"urgent.", _/binary>> -> <<"high_priority_queue">>;
        <<"normal.", _/binary>> -> <<"normal_queue">>;
        _ -> <<"default_queue">>
    end.

PHP 管理接口

php
<?php

/**
 * Packages, installs, enables and disables RabbitMQ plugin archives (.ez).
 *
 * Packaging stages the supplied files in a temporary directory, zips that
 * directory and removes it again. Install/enable/disable act on the plugin
 * directory and the rabbitmq-plugins executable configured below.
 */
class CustomPluginManager
{
    /** @var string Directory that installPlugin() copies archives into. */
    private $pluginDir = '/usr/lib/rabbitmq/plugins';

    /** @var string Path of the rabbitmq-plugins executable. */
    private $rabbitmqCtl = '/usr/sbin/rabbitmq-plugins';

    /**
     * Build "<pluginName>-<version>.ez" in the current working directory.
     *
     * @param string $pluginName Plugin name used in the archive file name.
     * @param string $version    Version used in the archive file name.
     * @param array  $files      Map of 'src' | 'priv' | 'include' to a
     *                           [filename => content] map; other keys are ignored.
     * @return string The archive file name.
     * @throws RuntimeException When staging or archiving fails.
     */
    public function createPluginPackage($pluginName, $version, $files)
    {
        $pluginFile = "{$pluginName}-{$version}.ez";

        // uniqid() rather than time(): two packages built within the same
        // second must not share (and then delete) one staging directory.
        $tempDir = sys_get_temp_dir() . '/rabbit_plugin_' . uniqid('', true);
        if (!mkdir($tempDir, 0755, true) && !is_dir($tempDir)) {
            throw new RuntimeException("无法创建临时目录: {$tempDir}");
        }

        try {
            $this->createPluginStructure($tempDir, $files);
            $this->packagePlugin($tempDir, $pluginFile);
        } finally {
            // Always remove the staging directory, even when packaging throws.
            $this->cleanup($tempDir);
        }

        return $pluginFile;
    }

    /**
     * Write each recognised file group into its sub-directory of $dir.
     */
    private function createPluginStructure($dir, $files)
    {
        foreach ($files as $type => $contents) {
            switch ($type) {
                case 'src':
                    $this->writeErlangFiles($dir . '/src', $contents);
                    break;
                case 'priv':
                case 'include':
                    $this->writeFiles($dir . '/' . $type, $contents);
                    break;
            }
        }
    }

    /**
     * Write Erlang sources; currently identical to writeFiles().
     */
    private function writeErlangFiles($dir, $files)
    {
        $this->writeFiles($dir, $files);
    }

    /**
     * Create $dir if needed and write every [filename => content] pair into it.
     *
     * @throws RuntimeException When the directory or a file cannot be written.
     */
    private function writeFiles($dir, $files)
    {
        if (!is_dir($dir) && !mkdir($dir, 0755, true) && !is_dir($dir)) {
            throw new RuntimeException("无法创建目录: {$dir}");
        }

        foreach ($files as $filename => $content) {
            if (file_put_contents("{$dir}/{$filename}", $content) === false) {
                throw new RuntimeException("无法写入文件: {$dir}/{$filename}");
            }
        }
    }

    /**
     * Zip the contents of $sourceDir into $outputFile.
     *
     * @throws RuntimeException When the archive cannot be opened or written.
     */
    private function packagePlugin($sourceDir, $outputFile)
    {
        $zip = new ZipArchive();

        // OVERWRITE so a stale archive from an earlier run is replaced
        // instead of having the new entries merged into it.
        $status = $zip->open($outputFile, ZipArchive::CREATE | ZipArchive::OVERWRITE);
        if ($status !== true) {
            throw new RuntimeException("无法创建插件包: {$outputFile}");
        }

        $this->addDirectoryToZip($zip, $sourceDir, '');

        if (!$zip->close()) {
            throw new RuntimeException("无法写入插件包: {$outputFile}");
        }
    }

    /**
     * Recursively add everything below $sourceDir to $zip under $basePath.
     */
    private function addDirectoryToZip($zip, $sourceDir, $basePath)
    {
        $files = scandir($sourceDir);
        if ($files === false) {
            throw new RuntimeException("无法读取目录: {$sourceDir}");
        }

        foreach ($files as $file) {
            if ($file === '.' || $file === '..') {
                continue;
            }

            $fullPath = "{$sourceDir}/{$file}";
            // Strict comparison: empty() would also treat a directory
            // literally named "0" as "no base path".
            $zipPath = $basePath === '' ? $file : "{$basePath}/{$file}";

            if (is_dir($fullPath)) {
                $this->addDirectoryToZip($zip, $fullPath, $zipPath);
            } else {
                $zip->addFile($fullPath, $zipPath);
            }
        }
    }

    /**
     * Recursively delete $dir without going through a shell.
     */
    private function cleanup($dir)
    {
        if (!is_dir($dir)) {
            return;
        }

        $entries = new RecursiveIteratorIterator(
            new RecursiveDirectoryIterator($dir, FilesystemIterator::SKIP_DOTS),
            RecursiveIteratorIterator::CHILD_FIRST
        );

        // CHILD_FIRST yields directory contents before the directory itself,
        // so every rmdir() sees an empty directory.
        foreach ($entries as $entry) {
            if ($entry->isDir() && !$entry->isLink()) {
                rmdir($entry->getPathname());
            } else {
                unlink($entry->getPathname());
            }
        }

        rmdir($dir);
    }

    /**
     * Copy a plugin archive into the plugin directory.
     *
     * @param string $pluginFile Path of the archive to install.
     * @throws RuntimeException When the archive is missing or cannot be copied.
     */
    public function installPlugin($pluginFile)
    {
        // Only the file name goes under the plugin directory; the source may
        // be given as a path.
        $targetPath = "{$this->pluginDir}/" . basename($pluginFile);

        if (!file_exists($pluginFile)) {
            throw new RuntimeException("插件文件不存在: {$pluginFile}");
        }

        if (!copy($pluginFile, $targetPath)) {
            throw new RuntimeException("插件安装失败: {$targetPath}");
        }

        echo "插件已安装到: {$targetPath}\n";
    }

    /**
     * Enable a plugin via rabbitmq-plugins.
     *
     * @throws RuntimeException When the command exits non-zero.
     */
    public function enablePlugin($pluginName)
    {
        $this->runPluginCommand('enable', $pluginName, "启用插件失败: ");

        echo "插件 {$pluginName} 已启用\n";
    }

    /**
     * Disable a plugin via rabbitmq-plugins.
     *
     * @throws RuntimeException When the command exits non-zero.
     */
    public function disablePlugin($pluginName)
    {
        $this->runPluginCommand('disable', $pluginName, "禁用插件失败: ");

        echo "插件 {$pluginName} 已禁用\n";
    }

    /**
     * Run "rabbitmq-plugins <action> <plugin>" with shell-escaped arguments.
     *
     * @param string $action        Fixed sub-command ('enable' or 'disable').
     * @param string $pluginName    Plugin name; escaped before reaching the shell.
     * @param string $failurePrefix Prefix of the exception message on failure.
     */
    private function runPluginCommand($action, $pluginName, $failurePrefix)
    {
        // 2>&1 so the tool's error text ends up in the exception message.
        $command = sprintf(
            '%s %s %s 2>&1',
            escapeshellarg($this->rabbitmqCtl),
            $action,
            escapeshellarg($pluginName)
        );

        exec($command, $output, $returnCode);

        if ($returnCode !== 0) {
            throw new RuntimeException($failurePrefix . implode("\n", $output));
        }
    }
}

插件配置管理

php
<?php

/**
 * Key/value plugin configuration persisted as JSON.
 *
 * The constructor loads the JSON file if it exists; changes made through
 * set()/delete() stay in memory until saveConfig() is called.
 */
class CustomPluginConfig
{
    /** @var string Location of the JSON configuration file. */
    private $configFile = '/etc/rabbitmq/plugin_config.json';

    /** @var array In-memory configuration map. */
    private $config = [];

    public function __construct()
    {
        $this->loadConfig();
    }

    /**
     * Replace the in-memory configuration with the file's content.
     * A missing, unreadable or non-object JSON file leaves an empty or
     * unchanged configuration instead of a scalar in $config.
     */
    public function loadConfig()
    {
        if (!file_exists($this->configFile)) {
            return;
        }

        $content = file_get_contents($this->configFile);
        if ($content === false) {
            return;
        }

        $decoded = json_decode($content, true);
        // json_decode() returns scalars for documents like `"abc"` or `42`;
        // only a JSON object/array is usable as a key/value map.
        $this->config = is_array($decoded) ? $decoded : [];
    }

    /**
     * Write the in-memory configuration back to the JSON file.
     *
     * @throws RuntimeException When the file cannot be written.
     */
    public function saveConfig()
    {
        $written = file_put_contents(
            $this->configFile,
            json_encode($this->config, JSON_PRETTY_PRINT)
        );

        if ($written === false) {
            throw new RuntimeException("无法写入配置文件: {$this->configFile}");
        }
    }

    /**
     * @param string $key
     * @param mixed  $default Returned when $key is absent or null.
     * @return mixed
     */
    public function get($key, $default = null)
    {
        return $this->config[$key] ?? $default;
    }

    public function set($key, $value)
    {
        $this->config[$key] = $value;
    }

    public function delete($key)
    {
        unset($this->config[$key]);
    }

    /**
     * Render the configuration as "key = value" lines joined by newlines.
     *
     * Booleans are written as the words true/false (plain string
     * interpolation would give "1" and an empty string); arrays are
     * JSON-encoded; everything else is interpolated as-is.
     *
     * @return string
     */
    public function generateErlangConfig()
    {
        $erlConfig = [];

        foreach ($this->config as $key => $value) {
            if (is_bool($value)) {
                $rendered = $value ? 'true' : 'false';
            } elseif (is_array($value)) {
                $rendered = json_encode($value);
            } else {
                $rendered = (string)$value;
            }

            $erlConfig[] = "{$key} = {$rendered}";
        }

        return implode("\n", $erlConfig);
    }
}

自定义认证后端

erlang
%% @doc Example authentication/authorisation backend that delegates every
%% decision to the custom_auth module.
%%
%% NOTE(review): custom_auth is not defined in this file, and the behaviour
%% and callback names below should be checked against the targeted RabbitMQ
%% release.
-module(rabbit_auth_backend_custom).
-behaviour(rabbit_auth_backend).

%% Required for the #auth_user{} record used below; without this include
%% the module does not compile.
-include_lib("rabbit_common/include/rabbit.hrl").

-export([description/0, check_user/2, check_vhost_access/3, check_resource_access/4]).

%% @doc Static metadata for this backend.
description() ->
    [{name, <<"custom">>},
     {description, <<"Custom authentication backend">>}].

%% @doc Authenticate a user. On success returns an #auth_user{} carrying the
%% `monitor' tag; otherwise refuses the login.
check_user(Username, Password) ->
    %% Custom authentication logic
    case custom_auth:verify_credentials(Username, Password) of
        true ->
            {ok, #auth_user{username = Username,
                             tags = [monitor],
                             impl = none}};
        false ->
            {refused, <<"Invalid credentials">>}
    end.

%% @doc Virtual-host access check, delegated to custom_auth by user name.
check_vhost_access(#auth_user{username = Username}, VHost, _Ref) ->
    custom_auth:has_vhost_access(Username, VHost).

%% @doc Resource access check, delegated to custom_auth; the requested
%% permission is not taken into account.
check_resource_access(#auth_user{username = Username},
                      Resource, _Permission, _Ref) ->
    custom_auth:has_resource_access(Username, Resource).

插件监控接口

php
<?php

/**
 * Reports status, metrics and health for a plugin through the RabbitMQ
 * management API client.
 */
class CustomPluginMonitor
{
    /** @var RabbitMQApiClient Client used for management API requests. */
    private $rabbitmqApi;

    public function __construct()
    {
        $this->rabbitmqApi = new RabbitMQApiClient('localhost', 15672, 'guest', 'guest');
    }

    /**
     * Report whether the broker overview endpoint answers.
     *
     * "running" is true whenever the overview request does not throw;
     * $pluginName is only echoed back in the result.
     *
     * @param string $pluginName
     * @return array plugin/running/version on success, plugin/running/error on failure.
     */
    public function getPluginStatus($pluginName)
    {
        try {
            $brokerInfo = $this->rabbitmqApi->get('/api/overview');
            $brokerVersion = $brokerInfo['rabbitmq_version'] ?? 'unknown';

            return [
                'plugin' => $pluginName,
                'running' => true,
                'version' => $brokerVersion
            ];
        } catch (Exception $failure) {
            $reason = $failure->getMessage();

            return [
                'plugin' => $pluginName,
                'running' => false,
                'error' => $reason
            ];
        }
    }

    /**
     * Placeholder metrics: every counter is reported as zero.
     *
     * @param string $pluginName Not used.
     * @return array
     */
    public function getPluginMetrics($pluginName)
    {
        $counters = ['messages_published', 'messages_consumed', 'connections', 'channels'];

        return array_fill_keys($counters, 0);
    }

    /**
     * Combine status and metrics into a health verdict.
     *
     * @param string $pluginName
     * @return array ['healthy' => bool] plus 'reason' when unhealthy.
     */
    public function healthCheck($pluginName)
    {
        $state = $this->getPluginStatus($pluginName);
        if (!$state['running']) {
            return $this->unhealthy('Plugin not running');
        }

        $counters = $this->getPluginMetrics($pluginName);
        if ($counters['connections'] < 0) {
            return $this->unhealthy('Invalid connection count');
        }

        return ['healthy' => true];
    }

    /**
     * Build the "not healthy" result for the given reason.
     */
    private function unhealthy($reason)
    {
        return [
            'healthy' => false,
            'reason' => $reason
        ];
    }
}

实际应用场景

1. 自定义消息路由插件

erlang
%% @doc Example exchange type that shards messages across queues named
%% `shard_N' by hashing the routing key.
%%
%% NOTE(review): the rabbit_exchange_type behaviour declares more callbacks
%% than the two exported here; add them for the targeted RabbitMQ release.
-module(rabbit_exchange_type_distributed).
-behaviour(rabbit_exchange_type).

%% Required for the #exchange{}, #delivery{} and #basic_message{} records.
-include_lib("rabbit_common/include/rabbit.hrl").

-export([description/0, route/2]).

%% Shard count used when the exchange has no usable `shard_count' argument.
-define(DEFAULT_SHARD_COUNT, 10).

%% @doc Static metadata for this exchange type.
description() ->
    [{name, <<"x-distributed">>},
     {description, <<"Distributed exchange with sharding">>}].

%% @doc Route a delivery to the single shard queue selected by hashing the
%% shard key into the range 0..ShardCount-1. The destination is returned as
%% a queue resource in the exchange's own virtual host.
route(#exchange{name = XName} = Exchange, Delivery) ->
    ShardCount = get_shard_count(Exchange),
    ShardKey = calculate_shard_key(Delivery),
    ShardIndex = erlang:phash2(ShardKey, ShardCount),
    ShardQueue = list_to_binary(["shard_", integer_to_list(ShardIndex)]),
    [rabbit_misc:r(XName, queue, ShardQueue)].

%% Exchange arguments are an AMQP table of {Key, Type, Value} triples, which
%% proplists:get_value/3 skips (it only matches 2-tuples), so the configured
%% value was never found. Also reject non-positive counts, since
%% erlang:phash2/2 needs a positive range.
get_shard_count(#exchange{arguments = Args}) ->
    case rabbit_misc:table_lookup(Args, <<"shard_count">>) of
        {_Type, Count} when is_integer(Count), Count > 0 -> Count;
        _ -> ?DEFAULT_SHARD_COUNT
    end.

%% The shard key is the first routing key of the message (#basic_message{}
%% stores a list in `routing_keys'); the empty binary when none is set.
calculate_shard_key(#delivery{message = #basic_message{routing_keys = [Key | _]}}) ->
    Key;
calculate_shard_key(#delivery{}) ->
    <<>>.

2. 自定义协议适配器

erlang
%% @doc Skeleton of a custom protocol adapter: decodes each incoming packet
%% and dispatches it to the matching handler.
%%
%% parse_custom_protocol/1 and the handle_* functions are referenced here
%% but not defined in this snippet.
-module(rabbit_protocol_adapter_custom).
-export([init/1, handle_packet/2, terminate/2]).

%% @doc Use the connection itself as the adapter state.
init(Conn) ->
    {ok, Conn}.

%% @doc Decode one packet and act on it; an `unknown' packet closes the
%% connection.
handle_packet(Conn, RawPacket) ->
    Decoded = parse_custom_protocol(RawPacket),
    case Decoded of
        unknown ->
            {close, Conn};
        {unsubscribe, Topic} ->
            handle_unsubscribe(Conn, Topic);
        {subscribe, Topic} ->
            handle_subscribe(Conn, Topic);
        {publish, Topic, Payload} ->
            handle_publish(Conn, Topic, Payload)
    end.

%% @doc Nothing to release on shutdown.
terminate(_Why, _Conn) ->
    ok.

3. 自定义消息转换插件

php
<?php

/**
 * Registry of named message transformers plus built-in JSON <-> XML
 * conversions for a message's 'body'.
 */
class MessageTransformPlugin
{
    /** @var callable[] Registered transformers keyed by name. */
    private $transformers = [];

    /**
     * Register (or replace) a transformer under $name.
     */
    public function registerTransformer($name, callable $transformer)
    {
        $this->transformers[$name] = $transformer;
    }

    /**
     * Apply the transformer registered under $transformerName to $message.
     *
     * @return mixed Whatever the transformer returns.
     * @throws RuntimeException When no such transformer is registered.
     */
    public function transform($message, $transformerName)
    {
        if (!isset($this->transformers[$transformerName])) {
            throw new RuntimeException("未知的转换器: {$transformerName}");
        }

        $transformer = $this->transformers[$transformerName];

        return call_user_func($transformer, $message);
    }

    /**
     * Convert a JSON body to XML and set the content type accordingly.
     *
     * @param array $message Message with a JSON string under 'body'.
     * @return array Copy of $message with 'body' and 'content_type' replaced.
     * @throws RuntimeException When the body is not a JSON object or array.
     */
    public function transformJsonToXml($message)
    {
        $data = json_decode($message['body'], true);

        // Invalid JSON decodes to null and scalar documents decode to
        // scalars; neither can be walked as a tree, so fail loudly instead
        // of emitting an empty <root/> document.
        if (!is_array($data)) {
            throw new RuntimeException(
                "消息体不是有效的 JSON 对象或数组: " . json_last_error_msg()
            );
        }

        $xml = $this->arrayToXml($data);

        return array_merge($message, [
            'body' => $xml,
            'content_type' => 'application/xml'
        ]);
    }

    /**
     * Convert an XML body to JSON and set the content type accordingly.
     *
     * @param array $message Message with an XML string under 'body'.
     * @return array Copy of $message with 'body' and 'content_type' replaced.
     * @throws RuntimeException When the body is not well-formed XML.
     */
    public function transformXmlToJson($message)
    {
        $data = $this->xmlToArray($message['body']);

        $json = json_encode($data);

        return array_merge($message, [
            'body' => $json,
            'content_type' => 'application/json'
        ]);
    }

    /**
     * Recursively append $array to $xml and return the serialised node.
     *
     * Numeric keys become <item> elements. When $xml is null a new document
     * with root element $rootElement is created.
     */
    private function arrayToXml($array, $rootElement = 'root', $xml = null)
    {
        if ($xml === null) {
            $xml = new SimpleXMLElement("<{$rootElement}/>");
        }

        foreach ($array as $key => $value) {
            if (is_numeric($key)) {
                $key = 'item';
            }

            if (is_array($value)) {
                $subnode = $xml->addChild($key);
                $this->arrayToXml($value, $key, $subnode);
            } else {
                // addChild() does not escape ampersands itself, hence the
                // explicit htmlspecialchars().
                $xml->addChild($key, htmlspecialchars($this->scalarToText($value)));
            }
        }

        return $xml->asXML();
    }

    /**
     * Text form of a JSON scalar: booleans as true/false (a plain string
     * cast would drop false entirely), null as the empty string.
     */
    private function scalarToText($value)
    {
        if (is_bool($value)) {
            return $value ? 'true' : 'false';
        }

        return $value === null ? '' : (string)$value;
    }

    /**
     * Parse an XML string into a nested array.
     *
     * @throws RuntimeException When the string is not well-formed XML.
     */
    private function xmlToArray($xml)
    {
        // Collect parser errors internally so malformed input produces an
        // exception rather than PHP warnings plus a bogus "false" body.
        $previous = libxml_use_internal_errors(true);
        $xmlObject = simplexml_load_string($xml);
        libxml_clear_errors();
        libxml_use_internal_errors($previous);

        if ($xmlObject === false) {
            throw new RuntimeException("消息体不是有效的 XML");
        }

        return json_decode(json_encode($xmlObject), true);
    }
}

常见问题与解决方案

问题 1:插件编译失败

解决方案

php
<?php

/**
 * Compiles a plugin's Erlang sources (src/*.erl) into its ebin/ directory
 * with erlc.
 */
class PluginCompiler
{
    /** @var string Erlang binary directory; not referenced by the code below. */
    private $erlangBin = '/usr/bin';

    /** @var string Directory whose include/ sub-directory is passed to erlc -I. */
    private $rabbitmqHome = '/var/lib/rabbitmq';

    /**
     * Compile every src/*.erl file of the plugin into <plugin>/ebin.
     *
     * @param string $pluginSourceDir Plugin root directory (contains src/).
     * @return bool Always true on success.
     * @throws RuntimeException When headers are missing, ebin/ cannot be
     *                          created, or erlc exits non-zero.
     */
    public function compilePlugin($pluginSourceDir)
    {
        $this->ensureDependencies();

        $ebinDir = $pluginSourceDir . '/ebin';

        if (!is_dir($ebinDir) && !mkdir($ebinDir, 0755, true) && !is_dir($ebinDir)) {
            throw new RuntimeException("无法创建目录: {$ebinDir}");
        }

        // Paths are shell-escaped so spaces or metacharacters cannot alter
        // the command. The output directory is given relative to the plugin
        // root because the command has already cd'ed into it; repeating
        // $pluginSourceDir there breaks when that path is relative.
        // src/*.erl is deliberately left unquoted for the shell to expand,
        // and 2>&1 puts erlc's diagnostics into $output.
        $compileCommand = sprintf(
            'cd %s && erlc -I %s -o ebin -Wall +debug_info src/*.erl 2>&1',
            escapeshellarg($pluginSourceDir),
            escapeshellarg($this->rabbitmqHome . '/include')
        );

        exec($compileCommand, $output, $returnCode);

        if ($returnCode !== 0) {
            throw new RuntimeException(
                "编译失败: " . implode("\n", $output)
            );
        }

        echo "插件编译成功\n";

        return true;
    }

    /**
     * Fail early when the RabbitMQ include directory is absent.
     *
     * @throws RuntimeException
     */
    private function ensureDependencies()
    {
        if (!is_dir($this->rabbitmqHome . '/include')) {
            throw new RuntimeException("RabbitMQ 开发头文件不存在");
        }
    }
}

问题 2:插件加载冲突

解决方案

php
<?php

/**
 * Verifies that the plugins another plugin depends on are enabled on the
 * local broker, using the rabbitmq-plugins command-line tool.
 */
class PluginDependencyChecker
{
    /** @var mixed Not referenced by the code below. */
    private $pluginRegistry;

    /**
     * Check that every plugin in $dependencies is enabled.
     *
     * @param string   $pluginName   Name of the plugin being checked (not used).
     * @param string[] $dependencies Names of the required plugins.
     * @return bool Always true when all dependencies are enabled.
     * @throws RuntimeException Listing every missing dependency, or when the
     *                          enabled-plugin list cannot be obtained.
     */
    public function checkDependencies($pluginName, $dependencies)
    {
        $errors = [];

        // Query the broker once for the whole check instead of once per
        // dependency, and not at all when there is nothing to check.
        $enabled = empty($dependencies) ? [] : $this->listEnabledPlugins();

        foreach ($dependencies as $dep) {
            if (!$this->isPluginLoaded($dep, $enabled)) {
                $errors[] = "缺少依赖插件: {$dep}";
            }
        }

        if (!empty($errors)) {
            throw new RuntimeException(
                "插件依赖检查失败: " . implode("; ", $errors)
            );
        }

        return true;
    }

    /**
     * Whether $pluginName is among the enabled plugins.
     *
     * Names are compared exactly: a substring test would report
     * "rabbitmq_management" as loaded when only
     * "rabbitmq_management_agent" is enabled.
     *
     * @param string        $pluginName
     * @param string[]|null $enabled Pre-fetched list; fetched when null.
     */
    private function isPluginLoaded($pluginName, $enabled = null)
    {
        if ($enabled === null) {
            $enabled = $this->listEnabledPlugins();
        }

        return in_array($pluginName, $enabled, true);
    }

    /**
     * Names of the enabled plugins, one per entry.
     *
     * -e restricts the listing to enabled plugins, -m prints bare plugin
     * names, and -q suppresses the informational header.
     *
     * @return string[]
     * @throws RuntimeException When the command exits non-zero.
     */
    private function listEnabledPlugins()
    {
        exec('rabbitmq-plugins -q list -e -m 2>/dev/null', $output, $returnCode);

        if ($returnCode !== 0) {
            throw new RuntimeException("插件依赖检查失败: 无法获取已启用的插件列表");
        }

        return array_values(array_filter(array_map('trim', $output), 'strlen'));
    }
}

问题 3:插件版本不兼容

解决方案

php
<?php

/**
 * Guards against running a plugin on a RabbitMQ release older than the
 * minimum supported version.
 */
class VersionCompatibilityChecker
{
    /** @var string Lowest RabbitMQ version the plugin accepts. */
    private $minRabbitmqVersion = '3.8.0';

    /**
     * Compare the running RabbitMQ version against the minimum.
     *
     * @param string $pluginVersion   Not used by the comparison.
     * @param string $rabbitmqVersion Dotted version of the running broker.
     * @return bool True when the broker is new enough.
     * @throws RuntimeException When the broker is older than the minimum.
     */
    public function checkCompatibility($pluginVersion, $rabbitmqVersion)
    {
        $minimum = $this->parseVersion($this->minRabbitmqVersion);
        $running = $this->parseVersion($rabbitmqVersion);

        // Arrays with identical keys compare element by element in key
        // order, i.e. major first, then minor, then patch.
        if ($running >= $minimum) {
            return true;
        }

        $message = "插件需要 RabbitMQ 版本 >= {$this->minRabbitmqVersion},"
            . "当前版本: {$rabbitmqVersion}";

        throw new RuntimeException($message);
    }

    /**
     * Split a dotted version into integer major/minor/patch components;
     * missing components count as 0.
     */
    private function parseVersion($version)
    {
        $segments = array_slice(array_pad(explode('.', $version), 3, 0), 0, 3);

        return array_combine(
            ['major', 'minor', 'patch'],
            array_map('intval', $segments)
        );
    }
}

最佳实践建议

1. 插件测试框架

php
<?php

/**
 * Minimal harness for plugin tests: compiles Erlang test modules and
 * reports placeholder integration results.
 */
class PluginTestFramework
{
    /** @var string Directory scanned for *.erl tests and used as erlc output dir. */
    private $testDir;

    public function __construct()
    {
        $this->testDir = sys_get_temp_dir() . '/plugin_tests';
    }

    /**
     * Compile every *.erl file found in the test directory.
     *
     * NOTE(review): $pluginSourceDir is accepted but not consulted; tests are
     * discovered in the internal test directory only. Confirm whether they
     * should come from the plugin source tree instead.
     *
     * @param string $pluginSourceDir
     * @return array Map of test file basename to ['compiled' => bool, 'output' => string].
     */
    public function runUnitTests($pluginSourceDir)
    {
        $testResults = [];

        // glob() can return false on error; treat that as "no tests" rather
        // than handing false to foreach.
        $testFiles = glob("{$this->testDir}/*.erl") ?: [];

        foreach ($testFiles as $testFile) {
            $result = $this->runEunitTest($testFile);
            $testResults[basename($testFile)] = $result;
        }

        return $testResults;
    }

    /**
     * Compile one test module with erlc and report the outcome.
     *
     * Only compilation is performed here; the compiled module is not run.
     */
    private function runEunitTest($testFile)
    {
        // Escape both paths so spaces or shell metacharacters in them cannot
        // change the command; 2>&1 keeps compiler diagnostics in the output.
        $command = sprintf(
            'erlc -o %s %s 2>&1',
            escapeshellarg($this->testDir),
            escapeshellarg($testFile)
        );

        exec($command, $output, $returnCode);

        return [
            'compiled' => $returnCode === 0,
            'output' => implode("\n", $output)
        ];
    }

    /**
     * Run the fixed list of integration tests for a plugin.
     *
     * @param string $pluginName
     * @return array Map of test name to result array.
     */
    public function runIntegrationTests($pluginName)
    {
        $tests = [
            'test_plugin_loads',
            'test_plugin_start',
            'test_plugin_functionality',
            'test_plugin_stops'
        ];

        $results = [];

        foreach ($tests as $test) {
            $results[$test] = $this->runTest($pluginName, $test);
        }

        return $results;
    }

    /**
     * Placeholder: reports every test as passed without executing anything.
     */
    private function runTest($pluginName, $testName)
    {
        return ['passed' => true];
    }
}

2. 插件文档生成

php
<?php

/**
 * Renders a plugin specification as a Markdown file in an output directory.
 */
class PluginDocumentationGenerator
{
    /** @var string Directory the generated .md files are written to. */
    private $outputDir;

    /**
     * @param string $outputDir Target directory; created on first use if missing.
     */
    public function __construct($outputDir = './docs')
    {
        $this->outputDir = $outputDir;
    }

    /**
     * Write "<outputDir>/<pluginName>.md" for the given specification.
     *
     * @param string $pluginName File name (without extension) of the document.
     * @param array  $pluginSpec Keys: name, description, version and
     *                           optionally configuration, dependencies.
     * @throws RuntimeException When the directory or file cannot be written.
     */
    public function generate($pluginName, $pluginSpec)
    {
        $markdown = $this->generateMarkdown($pluginSpec);

        // The default ./docs directory usually does not exist yet; without
        // this the write below fails and success is still reported.
        if (!is_dir($this->outputDir)
            && !mkdir($this->outputDir, 0755, true)
            && !is_dir($this->outputDir)
        ) {
            throw new RuntimeException("无法创建文档目录: {$this->outputDir}");
        }

        $filename = "{$this->outputDir}/{$pluginName}.md";

        if (file_put_contents($filename, $markdown) === false) {
            throw new RuntimeException("无法写入文档: {$filename}");
        }

        echo "文档已生成: {$filename}\n";
    }

    /**
     * Build the Markdown text: title, description and version sections,
     * then optional configuration and dependency lists.
     */
    private function generateMarkdown($spec)
    {
        $name = $spec['name'] ?? '';
        $description = $spec['description'] ?? '';
        $version = $spec['version'] ?? '';

        $md = "# {$name}\n\n";
        $md .= "## 描述\n\n{$description}\n\n";
        $md .= "## 版本\n\n{$version}\n\n";

        if (isset($spec['configuration'])) {
            $md .= "## 配置\n\n";
            foreach ($spec['configuration'] as $key => $value) {
                $md .= "- `{$key}`: " . $this->formatValue($value) . "\n";
            }
            $md .= "\n";
        }

        if (isset($spec['dependencies'])) {
            $md .= "## 依赖\n\n";
            foreach ($spec['dependencies'] as $dep) {
                $md .= "- {$dep}\n";
            }
            $md .= "\n";
        }

        return $md;
    }

    /**
     * Text form of a configuration value: strings as-is, everything else
     * JSON-encoded so arrays, booleans and null stay readable instead of
     * becoming "Array", "1" or an empty string.
     */
    private function formatValue($value)
    {
        if (is_string($value)) {
            return $value;
        }

        return json_encode($value, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);
    }
}

3. 插件部署流水线

php
<?php

/**
 * Runs named deployment stages in registration order and stops at the
 * first stage that throws.
 */
class PluginDeploymentPipeline
{
    /** @var callable[] Stage callbacks keyed by stage name. */
    private $stages = [];

    /**
     * Register (or replace) the stage called $name.
     *
     * @param string   $name
     * @param callable $action Invoked with the plugin source directory.
     */
    public function addStage($name, callable $action)
    {
        $this->stages[$name] = $action;
    }

    /**
     * Execute every stage against $pluginSourceDir.
     *
     * Progress is echoed per stage. When a stage throws an Exception the
     * rollback hook runs and the remaining stages are skipped.
     *
     * @param string $pluginSourceDir
     * @return bool True when all stages completed, false after a failure.
     */
    public function deploy($pluginSourceDir)
    {
        $outcomes = [];

        foreach ($this->stages as $stageName => $stageAction) {
            if (!$this->runStage($stageName, $stageAction, $pluginSourceDir, $outcomes)) {
                $this->rollback($outcomes);

                return false;
            }
        }

        return true;
    }

    /**
     * Run a single stage, record its outcome and report whether it succeeded.
     */
    private function runStage($stageName, $stageAction, $sourceDir, array &$outcomes)
    {
        echo "执行阶段: {$stageName}\n";

        try {
            $value = call_user_func($stageAction, $sourceDir);
            $outcomes[$stageName] = ['success' => true, 'result' => $value];
            echo "✓ {$stageName} 完成\n";
        } catch (Exception $failure) {
            $outcomes[$stageName] = ['success' => false, 'error' => $failure->getMessage()];
            echo "✗ {$stageName} 失败: {$failure->getMessage()}\n";

            return false;
        }

        return true;
    }

    /**
     * Rollback hook; currently only announces itself.
     */
    private function rollback($results)
    {
        echo "执行回滚...\n";
    }
}

// Example wiring: compile -> test -> package -> deploy, each stage backed
// by one of the helper classes defined earlier in this document.
$pipeline = new PluginDeploymentPipeline();

// Compile the plugin sources found under the given directory.
$pipeline->addStage('compile', static function ($sourceDir) {
    return (new PluginCompiler())->compilePlugin($sourceDir);
});

// Run the unit-test step for the plugin.
$pipeline->addStage('test', static function ($sourceDir) {
    return (new PluginTestFramework())->runUnitTests($sourceDir);
});

// Build the archive; note that an empty file list is passed here.
$pipeline->addStage('package', static function ($sourceDir) {
    return (new CustomPluginManager())->createPluginPackage('my_plugin', '1.0.0', []);
});

// Copy the archive into the broker's plugin directory.
$pipeline->addStage('deploy', static function ($sourceDir) {
    (new CustomPluginManager())->installPlugin('my_plugin-1.0.0.ez');
});

$pipeline->deploy('/path/to/plugin');

相关链接