Appearance
自定义插件开发
概述
RabbitMQ 支持开发者创建自定义插件来扩展其功能。自定义插件可以添加新的交换器类型、实现自定义认证机制、添加新的协议支持等。本教程将介绍 RabbitMQ 插件开发的基础知识和实践方法。
核心知识点
插件开发架构
mermaid
graph TB
subgraph RabbitMQ 核心
App[应用层]
PluginMgr[插件管理器]
end
subgraph 自定义插件
Plugin[自定义插件]
Init[初始化函数]
Hook[生命周期钩子]
Handler[消息处理器]
end
subgraph 扩展点
Ex[交换器类型]
Auth[认证后端]
Protocol[协议适配]
Feature[功能扩展]
end
App --> PluginMgr
PluginMgr --> Plugin
Plugin --> Init
Plugin --> Hook
Plugin --> Handler
Handler --> Ex
Handler --> Auth
Handler --> Protocol
Handler --> Feature

插件结构
mermaid
graph LR
subgraph 插件目录结构
A[my_plugin-1.0.0]
B[ebin/]
C[priv/]
D[include/]
E[src/]
end
A --> B
A --> C
A --> D
A --> E

生命周期
mermaid
sequenceDiagram
participant RMQ as RabbitMQ
participant P as 自定义插件
Note over RMQ,P: 插件加载
RMQ->>P: start
P-->>RMQ: ok
Note over RMQ,P: 运行时
RMQ->>P: 消息处理
P-->>RMQ: 处理结果
Note over RMQ,P: 插件停止
RMQ->>P: stop
P-->>RMQ: ok

PHP 代码示例
插件基础框架
erlang
%% Skeleton of a RabbitMQ plugin module: it registers the custom exchange
%% type implemented by rabbit_exchange_type_custom while the broker boots.
-module(rabbit_my_custom_plugin).

-include_lib("rabbit_common/include/rabbit.hrl").

%% boot/0 has to be exported: the boot step below reaches it through an
%% {M, F, A} tuple, which can only call exported functions.
-export([start/0, stop/1, description/0, boot/0]).

%% NOTE(review): the original declared -behaviour(rabbit_plugin); RabbitMQ
%% does not appear to ship such a behaviour, so the attribute was dropped.

%% Boot step: run ?MODULE:boot/0 after rabbit_registry has started and
%% before the broker reaches the kernel_ready step.
-rabbit_boot_step({?MODULE,
                   [{description, "My Custom Plugin"},
                    {mfa, {?MODULE, boot, []}},
                    {requires, rabbit_registry},
                    {enables, kernel_ready}]}).

%% Lifecycle hook; nothing to initialise in this skeleton.
start() ->
    ok.

%% Lifecycle hook; nothing to tear down in this skeleton.
stop(_State) ->
    ok.

%% Static metadata describing the plugin.
description() ->
    [{name, <<"my_custom_plugin">>},
     {description, <<"My custom plugin description">>},
     {version, <<"1.0.0">>}].

%% Registers the exchange type with the broker's type registry so that
%% exchanges of type "x-custom-exchange" can be declared.
boot() ->
    ok = rabbit_registry:register(exchange,
                                  <<"x-custom-exchange">>,
                                  rabbit_exchange_type_custom),
    ok.

自定义交换器类型
erlang
%% Example exchange type that picks the destination queue from the prefix
%% of the routing key.
%%
%% NOTE(review): written against the route/2 form of the rabbit_exchange_type
%% behaviour (RabbitMQ 3.x); newer brokers use a different signature - confirm
%% for the targeted version. The behaviour also requires further callbacks
%% (validate/1, create/2, add_binding/3, ...) that this example omits.
-module(rabbit_exchange_type_custom).

-behaviour(rabbit_exchange_type).

-include_lib("rabbit_common/include/rabbit.hrl").

-export([description/0, route/2]).

%% Static metadata describing the exchange type.
description() ->
    [{name, <<"x-custom-exchange">>},
     {description, <<"Custom exchange type">>}].

%% Returns the destination queues for a delivery.
%% A message carries a list of routing keys (#basic_message.routing_keys),
%% and destinations must be queue resources in the exchange's virtual host,
%% not bare binaries.
route(#exchange{name = XName},
      #delivery{message = #basic_message{routing_keys = RoutingKeys}}) ->
    lists:usort([rabbit_misc:r(XName, queue, calculate_route(Key, XName))
                 || Key <- RoutingKeys]).

%% Maps a routing key to a queue name by its prefix.
calculate_route(<<"urgent.", _/binary>>, _XName) -> <<"high_priority_queue">>;
calculate_route(<<"normal.", _/binary>>, _XName) -> <<"normal_queue">>;
calculate_route(_RoutingKey, _XName)             -> <<"default_queue">>.

PHP 管理接口
php
<?php
/**
 * Builds RabbitMQ plugin archives (.ez) and installs, enables and disables
 * them through the rabbitmq-plugins command line tool.
 */
class CustomPluginManager
{
    /** Directory plugin archives are copied into by installPlugin(). */
    private $pluginDir = '/usr/lib/rabbitmq/plugins';

    /** Path of the rabbitmq-plugins executable. */
    private $rabbitmqCtl = '/usr/sbin/rabbitmq-plugins';

    /** Plugin sub-directories that createPluginStructure() will populate. */
    private $packageDirs = ['ebin', 'src', 'priv', 'include'];

    /**
     * Creates "<name>-<version>.ez" in the current working directory.
     *
     * @param string $pluginName Plugin (application) name.
     * @param string $version    Plugin version.
     * @param array  $files      Map of sub-directory ('ebin', 'src', 'priv',
     *                           'include') to [filename => content].
     * @return string Name of the archive that was written.
     * @throws RuntimeException When the staging directory or archive cannot be created.
     */
    public function createPluginPackage($pluginName, $version, $files)
    {
        $rootName = "{$pluginName}-{$version}";
        $pluginFile = "{$rootName}.ez";
        // uniqid() instead of time(): two packages built in the same second
        // must not share a staging directory.
        $tempDir = sys_get_temp_dir() . '/rabbit_plugin_' . uniqid('', true);
        if (!mkdir($tempDir, 0755, true) && !is_dir($tempDir)) {
            throw new RuntimeException("无法创建临时目录: {$tempDir}");
        }
        try {
            $this->createPluginStructure($tempDir, $files);
            $this->packagePlugin($tempDir, $pluginFile, $rootName);
        } finally {
            // Remove the staging directory even when packaging fails.
            $this->cleanup($tempDir);
        }
        return $pluginFile;
    }

    /**
     * Writes the supplied files below $dir; unknown sub-directory keys are ignored.
     */
    private function createPluginStructure($dir, $files)
    {
        foreach ($files as $type => $contents) {
            if (in_array($type, $this->packageDirs, true)) {
                $this->writeFiles("{$dir}/{$type}", $contents);
            }
        }
    }

    /**
     * Kept for callers inside the class; Erlang sources are written like any other file.
     */
    private function writeErlangFiles($dir, $files)
    {
        $this->writeFiles($dir, $files);
    }

    /**
     * Creates $dir when needed and writes every [filename => content] pair into it.
     */
    private function writeFiles($dir, $files)
    {
        if (!is_dir($dir)) {
            mkdir($dir, 0755, true);
        }
        foreach ($files as $filename => $content) {
            file_put_contents("{$dir}/{$filename}", $content);
        }
    }

    /**
     * Zips $sourceDir into $outputFile.
     *
     * @param string $sourceDir  Staging directory to archive.
     * @param string $outputFile Archive path.
     * @param string $rootName   Top-level directory inside the archive; an .ez
     *                           archive is expected to contain "<name>-<version>/".
     * @throws RuntimeException When the archive cannot be opened or written.
     */
    private function packagePlugin($sourceDir, $outputFile, $rootName = '')
    {
        $zip = new ZipArchive();
        $opened = $zip->open($outputFile, ZipArchive::CREATE | ZipArchive::OVERWRITE);
        if ($opened !== true) {
            throw new RuntimeException("无法创建插件包: {$outputFile} (错误码 {$opened})");
        }
        if ($rootName !== '') {
            // Guarantees the root entry exists even when no files were supplied.
            $zip->addEmptyDir($rootName);
        }
        $this->addDirectoryToZip($zip, $sourceDir, $rootName);
        if (!$zip->close()) {
            throw new RuntimeException("无法写入插件包: {$outputFile}");
        }
    }

    /**
     * Recursively adds the content of $sourceDir to $zip below $basePath.
     */
    private function addDirectoryToZip($zip, $sourceDir, $basePath)
    {
        foreach (scandir($sourceDir) as $file) {
            if ($file === '.' || $file === '..') {
                continue;
            }
            $fullPath = "{$sourceDir}/{$file}";
            $zipPath = $basePath === '' ? $file : "{$basePath}/{$file}";
            if (is_dir($fullPath)) {
                $this->addDirectoryToZip($zip, $fullPath, $zipPath);
            } else {
                $zip->addFile($fullPath, $zipPath);
            }
        }
    }

    /**
     * Deletes $dir and everything below it without going through a shell.
     */
    private function cleanup($dir)
    {
        if (!is_dir($dir)) {
            return;
        }
        $entries = new RecursiveIteratorIterator(
            new RecursiveDirectoryIterator($dir, FilesystemIterator::SKIP_DOTS),
            RecursiveIteratorIterator::CHILD_FIRST
        );
        foreach ($entries as $entry) {
            if ($entry->isDir() && !$entry->isLink()) {
                rmdir($entry->getPathname());
            } else {
                unlink($entry->getPathname());
            }
        }
        rmdir($dir);
    }

    /**
     * Copies a plugin archive into the plugin directory.
     *
     * @param string $pluginFile Path of the .ez archive.
     * @throws RuntimeException When the archive is missing or cannot be copied.
     */
    public function installPlugin($pluginFile)
    {
        // basename(): the source may be given with a directory prefix.
        $targetPath = "{$this->pluginDir}/" . basename($pluginFile);
        if (!file_exists($pluginFile)) {
            throw new RuntimeException("插件文件不存在: {$pluginFile}");
        }
        if (!copy($pluginFile, $targetPath)) {
            throw new RuntimeException("插件安装失败: {$targetPath}");
        }
        echo "插件已安装到: {$targetPath}\n";
    }

    /**
     * Enables a plugin.
     *
     * @throws RuntimeException When rabbitmq-plugins exits with a non-zero status.
     */
    public function enablePlugin($pluginName)
    {
        $this->runPluginCommand('enable', $pluginName, "启用插件失败: ");
        echo "插件 {$pluginName} 已启用\n";
    }

    /**
     * Disables a plugin.
     *
     * @throws RuntimeException When rabbitmq-plugins exits with a non-zero status.
     */
    public function disablePlugin($pluginName)
    {
        $this->runPluginCommand('disable', $pluginName, "禁用插件失败: ");
        echo "插件 {$pluginName} 已禁用\n";
    }

    /**
     * Runs "rabbitmq-plugins <action> <plugin>" with shell-escaped arguments.
     */
    private function runPluginCommand($action, $pluginName, $failurePrefix)
    {
        $command = escapeshellarg($this->rabbitmqCtl) . ' ' . $action . ' '
            . escapeshellarg($pluginName) . ' 2>&1';
        exec($command, $output, $returnCode);
        if ($returnCode !== 0) {
            throw new RuntimeException($failurePrefix . implode("\n", $output));
        }
    }
}

插件配置管理
php
<?php
/**
 * Key/value plugin settings persisted as JSON, with an export to
 * "key = value" lines.
 */
class CustomPluginConfig
{
    /** Location of the JSON file the settings are loaded from and saved to. */
    private $configFile = '/etc/rabbitmq/plugin_config.json';

    /** In-memory settings. */
    private $config = [];

    public function __construct()
    {
        $this->loadConfig();
    }

    /**
     * Reads the JSON file when it exists; anything that does not decode to an
     * array (invalid JSON, a bare scalar) results in an empty configuration.
     */
    public function loadConfig()
    {
        if (file_exists($this->configFile)) {
            $decoded = json_decode(file_get_contents($this->configFile), true);
            $this->config = is_array($decoded) ? $decoded : [];
        }
    }

    /**
     * Writes the settings back to the JSON file.
     *
     * @throws RuntimeException When the file cannot be written.
     */
    public function saveConfig()
    {
        $written = file_put_contents(
            $this->configFile,
            json_encode($this->config, JSON_PRETTY_PRINT)
        );
        if ($written === false) {
            throw new RuntimeException("无法写入配置文件: {$this->configFile}");
        }
    }

    /** Returns the value stored under $key, or $default when it is not set. */
    public function get($key, $default = null)
    {
        return $this->config[$key] ?? $default;
    }

    /** Stores $value under $key (in memory only until saveConfig() is called). */
    public function set($key, $value)
    {
        $this->config[$key] = $value;
    }

    /** Removes $key from the in-memory settings. */
    public function delete($key)
    {
        unset($this->config[$key]);
    }

    /**
     * Renders every setting as a "key = value" line.
     *
     * Booleans are written as the words true/false: interpolating a PHP
     * boolean directly would yield "1" or an empty string.
     *
     * @return string Newline-separated settings.
     */
    public function generateErlangConfig()
    {
        $lines = [];
        foreach ($this->config as $key => $value) {
            if (is_bool($value)) {
                $rendered = $value ? 'true' : 'false';
            } elseif (is_array($value)) {
                $rendered = json_encode($value);
            } else {
                $rendered = (string) $value;
            }
            $lines[] = "{$key} = {$rendered}";
        }
        return implode("\n", $lines);
    }
}

自定义认证后端
erlang
%% Example authentication/authorisation backend that delegates every
%% decision to the external custom_auth module.
%%
%% NOTE(review): custom_auth is not defined in this document; its functions
%% are presumed to return booleans - confirm against the real module.
%% TODO: the authz behaviour expects further callbacks (for example
%% user_login_authorization/2 and check_topic_access/4); implement them for
%% the targeted RabbitMQ version.
-module(rabbit_auth_backend_custom).

%% Defines the #auth_user{} record used below.
-include_lib("rabbit_common/include/rabbit.hrl").

-behaviour(rabbit_authn_backend).
-behaviour(rabbit_authz_backend).

-export([description/0, check_user/2, check_vhost_access/3, check_resource_access/4]).
-export([user_login_authentication/2]).

%% Static metadata describing the backend.
description() ->
    [{name, <<"custom">>},
     {description, <<"Custom authentication backend">>}].

%% Authentication callback invoked by the broker. Extracts the password from
%% the authentication properties and converts a refusal into the
%% {refused, Format, Args} shape the behaviour specifies.
user_login_authentication(Username, AuthProps) when is_list(AuthProps) ->
    case lists:keyfind(password, 1, AuthProps) of
        {password, Password} ->
            case check_user(Username, Password) of
                {ok, _AuthUser} = Accepted ->
                    Accepted;
                {refused, _Reason} ->
                    {refused, "user '~ts' - invalid credentials", [Username]}
            end;
        false ->
            {refused, "user '~ts' - no password supplied", [Username]}
    end.

%% Verifies the credentials through custom_auth and, on success, builds the
%% user record with the monitor tag.
check_user(Username, Password) ->
    case custom_auth:verify_credentials(Username, Password) of
        true ->
            {ok, #auth_user{username = Username,
                            tags     = [monitor],
                            impl     = none}};
        false ->
            {refused, <<"Invalid credentials">>}
    end.

%% Virtual host access check, delegated to custom_auth.
check_vhost_access(#auth_user{username = Username}, VHost, _Ref) ->
    custom_auth:has_vhost_access(Username, VHost).

%% Resource access check, delegated to custom_auth; the requested permission
%% is not taken into account.
check_resource_access(#auth_user{username = Username},
                      Resource, _Permission, _Ref) ->
    custom_auth:has_resource_access(Username, Resource).

插件监控接口
php
<?php
/**
 * Reports status, metrics and health for a plugin through the RabbitMQ
 * management HTTP API client.
 */
class CustomPluginMonitor
{
    /** Client used for the management API calls. */
    private $rabbitmqApi;

    public function __construct()
    {
        $this->rabbitmqApi = new RabbitMQApiClient('localhost', 15672, 'guest', 'guest');
    }

    /**
     * Queries /api/overview and reports the plugin as running when that call
     * succeeds.
     *
     * NOTE(review): the answer only reflects whether the overview request
     * worked; the plugin name itself is not looked up.
     *
     * @return array 'plugin', 'running' and either 'version' or 'error'.
     */
    public function getPluginStatus($pluginName)
    {
        try {
            $overview = $this->rabbitmqApi->get('/api/overview');
            $version = $overview['rabbitmq_version'] ?? 'unknown';

            return ['plugin' => $pluginName, 'running' => true, 'version' => $version];
        } catch (Exception $e) {
            $reason = $e->getMessage();

            return ['plugin' => $pluginName, 'running' => false, 'error' => $reason];
        }
    }

    /**
     * Returns the metric counters for a plugin; all of them are fixed at zero.
     */
    public function getPluginMetrics($pluginName)
    {
        $counters = ['messages_published', 'messages_consumed', 'connections', 'channels'];

        return array_fill_keys($counters, 0);
    }

    /**
     * Combines status and metrics into a health verdict.
     *
     * @return array 'healthy' plus a 'reason' when the verdict is negative.
     */
    public function healthCheck($pluginName)
    {
        $status = $this->getPluginStatus($pluginName);
        if ($status['running']) {
            $metrics = $this->getPluginMetrics($pluginName);
            if ($metrics['connections'] >= 0) {
                return ['healthy' => true];
            }

            return ['healthy' => false, 'reason' => 'Invalid connection count'];
        }

        return ['healthy' => false, 'reason' => 'Plugin not running'];
    }
}

实际应用场景
1. 自定义消息路由插件
erlang
%% Example exchange type that spreads messages over "shard_<N>" queues by
%% hashing the routing key.
%%
%% NOTE(review): written against the route/2 form of the rabbit_exchange_type
%% behaviour (RabbitMQ 3.x); the behaviour's remaining callbacks are omitted
%% from this example.
-module(rabbit_exchange_type_distributed).

-behaviour(rabbit_exchange_type).

%% Defines the #exchange{}, #delivery{} and #basic_message{} records.
-include_lib("rabbit_common/include/rabbit.hrl").

-export([description/0, route/2]).

%% Number of shards used when the exchange has no valid "shard_count" argument.
-define(DEFAULT_SHARD_COUNT, 10).

%% Static metadata describing the exchange type.
description() ->
    [{name, <<"x-distributed">>},
     {description, <<"Distributed exchange with sharding">>}].

%% Routes the delivery to exactly one shard queue, returned as a queue
%% resource in the exchange's virtual host.
route(#exchange{name = XName} = Exchange, Delivery) ->
    ShardCount = get_shard_count(Exchange),
    ShardIndex = erlang:phash2(calculate_shard_key(Delivery), ShardCount),
    QueueName = iolist_to_binary(["shard_", integer_to_list(ShardIndex)]),
    [rabbit_misc:r(XName, queue, QueueName)].

%% Exchange arguments form an AMQP table of {Key, Type, Value} triples, which
%% proplists cannot read; rabbit_misc:table_lookup/2 understands that shape.
%% Non-positive or non-integer values fall back to the default because
%% erlang:phash2/2 requires a positive range.
get_shard_count(#exchange{arguments = Args}) ->
    case rabbit_misc:table_lookup(Args, <<"shard_count">>) of
        {_Type, Count} when is_integer(Count), Count > 0 -> Count;
        _ -> ?DEFAULT_SHARD_COUNT
    end.

%% The shard key is the first routing key of the message; a message without
%% routing keys hashes the empty binary.
calculate_shard_key(#delivery{message = #basic_message{routing_keys = [Key | _]}}) ->
    Key;
calculate_shard_key(#delivery{message = #basic_message{routing_keys = []}}) ->
    <<>>.

2. 自定义协议适配器
erlang
%% Sketch of a protocol adapter: decodes an incoming packet and hands it to
%% the matching handler.
%%
%% NOTE(review): parse_custom_protocol/1, handle_publish/3,
%% handle_subscribe/2 and handle_unsubscribe/2 are not defined in this
%% excerpt and must be supplied for the module to compile.
-module(rabbit_protocol_adapter_custom).

-export([init/1, handle_packet/2, terminate/2]).

%% The connection itself is used as the adapter state.
init(Conn) ->
    {ok, Conn}.

%% Decodes the packet and dispatches on the decoded command; an unknown
%% command asks for the connection to be closed.
handle_packet(Conn, RawPacket) ->
    Command = parse_custom_protocol(RawPacket),
    case Command of
        {publish, Topic, Payload} -> handle_publish(Conn, Topic, Payload);
        {subscribe, Topic}        -> handle_subscribe(Conn, Topic);
        {unsubscribe, Topic}      -> handle_unsubscribe(Conn, Topic);
        unknown                   -> {close, Conn}
    end.

%% Nothing to clean up.
terminate(_Reason, _Conn) ->
    ok.

3. 自定义消息转换插件
php
<?php
/**
 * Registry of named message transformers plus built-in JSON <-> XML
 * conversions of a message body.
 */
class MessageTransformPlugin
{
    /** Registered transformers, keyed by name. */
    private $transformers = [];

    /** Registers (or replaces) the transformer stored under $name. */
    public function registerTransformer($name, callable $transformer)
    {
        $this->transformers[$name] = $transformer;
    }

    /**
     * Applies a registered transformer to a message.
     *
     * @throws RuntimeException When no transformer is registered under the name.
     */
    public function transform($message, $transformerName)
    {
        if (!isset($this->transformers[$transformerName])) {
            throw new RuntimeException("未知的转换器: {$transformerName}");
        }
        $transformer = $this->transformers[$transformerName];
        return call_user_func($transformer, $message);
    }

    /**
     * Converts a JSON body to XML and sets the content type accordingly.
     *
     * @param array $message Message with a JSON object/array in 'body'.
     * @return array The message with 'body' and 'content_type' replaced.
     * @throws RuntimeException When the body does not decode to an array.
     */
    public function transformJsonToXml($message)
    {
        $data = json_decode($message['body'], true);
        if (!is_array($data)) {
            // Invalid JSON decodes to null and a bare scalar is not iterable.
            throw new RuntimeException("消息体不是有效的 JSON 对象或数组");
        }
        $xml = $this->arrayToXml($data);
        return array_merge($message, [
            'body' => $xml,
            'content_type' => 'application/xml'
        ]);
    }

    /**
     * Converts an XML body to JSON and sets the content type accordingly.
     *
     * @throws RuntimeException When the body is not well-formed XML.
     */
    public function transformXmlToJson($message)
    {
        $data = $this->xmlToArray($message['body']);
        $json = json_encode($data);
        return array_merge($message, [
            'body' => $json,
            'content_type' => 'application/json'
        ]);
    }

    /**
     * Recursively appends $array to an XML tree; numeric keys become <item>.
     *
     * @return string XML of the node the call started from.
     */
    private function arrayToXml($array, $rootElement = 'root', $xml = null)
    {
        if ($xml === null) {
            $xml = new SimpleXMLElement("<{$rootElement}/>");
        }
        foreach ($array as $key => $value) {
            if (is_numeric($key)) {
                $key = 'item';
            }
            if (is_array($value)) {
                $subnode = $xml->addChild($key);
                $this->arrayToXml($value, $key, $subnode);
            } else {
                $xml->addChild($key, htmlspecialchars($this->scalarToText($value)));
            }
        }
        return $xml->asXML();
    }

    /**
     * Text form of a decoded JSON scalar: booleans keep their JSON spelling
     * and null becomes an empty string.
     */
    private function scalarToText($value)
    {
        if (is_bool($value)) {
            return $value ? 'true' : 'false';
        }
        return (string) $value;
    }

    /**
     * Parses XML into nested arrays.
     *
     * @throws RuntimeException When the input is not well-formed XML.
     */
    private function xmlToArray($xml)
    {
        $previous = libxml_use_internal_errors(true);
        $xmlObject = simplexml_load_string($xml);
        libxml_clear_errors();
        libxml_use_internal_errors($previous);
        if ($xmlObject === false) {
            throw new RuntimeException("消息体不是有效的 XML");
        }
        return json_decode(json_encode($xmlObject), true);
    }
}

常见问题与解决方案
问题 1:插件编译失败
解决方案:
php
<?php
/**
 * Compiles the Erlang sources of a plugin with erlc.
 */
class PluginCompiler
{
    /** Directory expected to hold the Erlang executables (currently unused; erlc is resolved via PATH). */
    private $erlangBin = '/usr/bin';

    /** Directory whose include/ sub-directory is passed to erlc with -I. */
    private $rabbitmqHome = '/var/lib/rabbitmq';

    /**
     * Compiles src/*.erl of the plugin into its ebin/ directory.
     *
     * @param string $pluginSourceDir Root directory of the plugin.
     * @return bool True on success.
     * @throws RuntimeException When the headers are missing or erlc fails.
     */
    public function compilePlugin($pluginSourceDir)
    {
        $this->ensureDependencies();
        $ebinDir = $pluginSourceDir . '/ebin';
        if (!is_dir($ebinDir)) {
            mkdir($ebinDir, 0755, true);
        }
        // Paths are shell-escaped. The output directory is given relative to
        // the plugin root because the command changes into it first, so a
        // relative $pluginSourceDir is not applied twice. stderr is merged
        // into stdout so compiler errors reach the exception message.
        $compileCommand = sprintf(
            'cd %s && erlc -I %s -o ebin -Wall +debug_info src/*.erl 2>&1',
            escapeshellarg($pluginSourceDir),
            escapeshellarg($this->rabbitmqHome . '/include')
        );
        exec($compileCommand, $output, $returnCode);
        if ($returnCode !== 0) {
            throw new RuntimeException(
                "编译失败: " . implode("\n", $output)
            );
        }
        echo "插件编译成功\n";
        return true;
    }

    /**
     * Verifies that the include directory exists.
     *
     * @throws RuntimeException When it does not.
     */
    private function ensureDependencies()
    {
        if (!is_dir($this->rabbitmqHome . '/include')) {
            throw new RuntimeException("RabbitMQ 开发头文件不存在");
        }
    }
}

问题 2:插件加载冲突
解决方案:
php
<?php
/**
 * Verifies that the plugins another plugin depends on are enabled.
 */
class PluginDependencyChecker
{
    /** Reserved for a plugin registry; not used by the current implementation. */
    private $pluginRegistry;

    /**
     * Checks every dependency against the list of enabled plugins.
     *
     * @param string   $pluginName   Plugin being checked (informational only).
     * @param string[] $dependencies Names of the required plugins.
     * @return bool True when all dependencies are enabled.
     * @throws RuntimeException When a dependency is missing or the plugin
     *                          list cannot be obtained.
     */
    public function checkDependencies($pluginName, $dependencies)
    {
        $errors = [];
        foreach ($dependencies as $dep) {
            if (!$this->isPluginLoaded($dep)) {
                $errors[] = "缺少依赖插件: {$dep}";
            }
        }
        if (!empty($errors)) {
            throw new RuntimeException(
                "插件依赖检查失败: " . implode("; ", $errors)
            );
        }
        return true;
    }

    /**
     * Tells whether $pluginName appears as a whole word in the output of
     * "rabbitmq-plugins list -e".
     *
     * A plain substring test would report "rabbitmq_management" as enabled
     * when only "rabbitmq_management_agent" is listed.
     *
     * @throws RuntimeException When the command exits with a non-zero status.
     */
    private function isPluginLoaded($pluginName)
    {
        exec('rabbitmq-plugins list -e 2>&1', $output, $returnCode);
        if ($returnCode !== 0) {
            throw new RuntimeException(
                "无法获取已启用的插件列表: " . implode("\n", $output)
            );
        }
        $pattern = '/(?<![A-Za-z0-9_])' . preg_quote($pluginName, '/') . '(?![A-Za-z0-9_])/';
        foreach ($output as $line) {
            if (preg_match($pattern, $line) === 1) {
                return true;
            }
        }
        return false;
    }
}

问题 3:插件版本不兼容
解决方案:
php
<?php
/**
 * Rejects RabbitMQ versions older than the minimum this plugin supports.
 */
class VersionCompatibilityChecker
{
    /** Oldest supported RabbitMQ version. */
    private $minRabbitmqVersion = '3.8.0';

    /**
     * Compares the running RabbitMQ version with the supported minimum.
     *
     * @param string $pluginVersion   Plugin version (not used in the comparison).
     * @param string $rabbitmqVersion Version of the running broker.
     * @return bool True when the broker is recent enough.
     * @throws RuntimeException When the broker is older than the minimum.
     */
    public function checkCompatibility($pluginVersion, $rabbitmqVersion)
    {
        $minimum = $this->parseVersion($this->minRabbitmqVersion);
        $running = $this->parseVersion($rabbitmqVersion);

        // Arrays with identical keys are compared element by element:
        // major first, then minor, then patch.
        if (!($running < $minimum)) {
            return true;
        }

        throw new RuntimeException(sprintf(
            '插件需要 RabbitMQ 版本 >= %s,当前版本: %s',
            $this->minRabbitmqVersion,
            $rabbitmqVersion
        ));
    }

    /**
     * Splits "major.minor.patch" into integers; missing parts count as 0.
     */
    private function parseVersion($version)
    {
        $segments = array_slice(array_pad(explode('.', $version), 3, '0'), 0, 3);
        list($major, $minor, $patch) = array_map('intval', $segments);

        return ['major' => $major, 'minor' => $minor, 'patch' => $patch];
    }
}

最佳实践建议
1. 插件测试框架
php
<?php
/**
 * Minimal harness that compiles Erlang test modules and runs placeholder
 * integration checks.
 */
class PluginTestFramework
{
    /** Directory scanned for *.erl test modules and used for compiler output. */
    private $testDir;

    public function __construct()
    {
        $this->testDir = sys_get_temp_dir() . '/plugin_tests';
    }

    /**
     * Compiles every *.erl file found in the test directory.
     *
     * NOTE(review): $pluginSourceDir is accepted but not consulted; the files
     * always come from the framework's own test directory.
     *
     * @return array Map of file name to ['compiled' => bool, 'output' => string].
     */
    public function runUnitTests($pluginSourceDir)
    {
        $testResults = [];
        // glob() may return false instead of an empty array.
        $testFiles = glob("{$this->testDir}/*.erl") ?: [];
        foreach ($testFiles as $testFile) {
            $testResults[basename($testFile)] = $this->runEunitTest($testFile);
        }
        return $testResults;
    }

    /**
     * Compiles a single test module with erlc; the module is not executed.
     */
    private function runEunitTest($testFile)
    {
        // Escaped so that paths containing spaces or shell metacharacters are
        // passed to erlc unchanged; stderr is captured with the output.
        $command = sprintf(
            'erlc -o %s %s 2>&1',
            escapeshellarg($this->testDir),
            escapeshellarg($testFile)
        );
        exec($command, $output, $returnCode);
        return [
            'compiled' => $returnCode === 0,
            'output' => implode("\n", $output)
        ];
    }

    /**
     * Runs the fixed list of integration checks for a plugin.
     *
     * @return array Map of check name to its result.
     */
    public function runIntegrationTests($pluginName)
    {
        $tests = [
            'test_plugin_loads',
            'test_plugin_start',
            'test_plugin_functionality',
            'test_plugin_stops'
        ];
        $results = [];
        foreach ($tests as $test) {
            $results[$test] = $this->runTest($pluginName, $test);
        }
        return $results;
    }

    /** Placeholder: every check is reported as passed. */
    private function runTest($pluginName, $testName)
    {
        return ['passed' => true];
    }
}

2. 插件文档生成
php
<?php
/**
 * Renders a plugin specification as a Markdown document.
 */
class PluginDocumentationGenerator
{
    /** Directory the Markdown files are written to. */
    private $outputDir;

    public function __construct($outputDir = './docs')
    {
        $this->outputDir = $outputDir;
    }

    /**
     * Writes "<outputDir>/<pluginName>.md".
     *
     * @param string $pluginName Used for the file name.
     * @param array  $pluginSpec Keys 'name', 'description', 'version' and
     *                           optionally 'configuration' and 'dependencies'.
     * @throws RuntimeException When the directory or the file cannot be written.
     */
    public function generate($pluginName, $pluginSpec)
    {
        // Create the output directory on demand so the success message is
        // never printed for a file that could not be written.
        if (!is_dir($this->outputDir)
            && !mkdir($this->outputDir, 0755, true)
            && !is_dir($this->outputDir)) {
            throw new RuntimeException("无法创建文档目录: {$this->outputDir}");
        }
        $markdown = $this->generateMarkdown($pluginSpec);
        $filename = "{$this->outputDir}/{$pluginName}.md";
        if (file_put_contents($filename, $markdown) === false) {
            throw new RuntimeException("文档写入失败: {$filename}");
        }
        echo "文档已生成: {$filename}\n";
    }

    /**
     * Builds the Markdown text for a specification.
     */
    private function generateMarkdown($spec)
    {
        $md = "# {$spec['name']}\n\n";
        $md .= "## 描述\n\n{$spec['description']}\n\n";
        $md .= "## 版本\n\n{$spec['version']}\n\n";
        if (isset($spec['configuration'])) {
            $md .= "## 配置\n\n";
            foreach ($spec['configuration'] as $key => $value) {
                $md .= "- `{$key}`: " . $this->formatValue($value) . "\n";
            }
            $md .= "\n";
        }
        if (isset($spec['dependencies'])) {
            $md .= "## 依赖\n\n";
            foreach ($spec['dependencies'] as $dep) {
                $md .= "- {$dep}\n";
            }
            $md .= "\n";
        }
        return $md;
    }

    /**
     * Text form of a configuration value: nested values are shown as JSON
     * rather than the literal word "Array", booleans as true/false.
     */
    private function formatValue($value)
    {
        if (is_bool($value)) {
            return $value ? 'true' : 'false';
        }
        if (is_scalar($value) || $value === null) {
            return (string) $value;
        }
        return json_encode($value, JSON_UNESCAPED_UNICODE);
    }
}

3. 插件部署流水线
php
<?php
/**
 * Runs named deployment stages in the order they were added and stops at
 * the first stage that throws.
 */
class PluginDeploymentPipeline
{
    /** Stage callables keyed by stage name. */
    private $stages = [];

    /** Adds a stage; reusing a name replaces the earlier callable. */
    public function addStage($name, callable $action)
    {
        $this->stages[$name] = $action;
    }

    /**
     * Executes every stage with the plugin source directory as argument.
     *
     * @param string $pluginSourceDir Passed to each stage callable.
     * @return bool True when all stages completed, false after a failure.
     */
    public function deploy($pluginSourceDir)
    {
        $outcomes = [];
        foreach (array_keys($this->stages) as $stage) {
            echo "执行阶段: {$stage}\n";
            try {
                $value = ($this->stages[$stage])($pluginSourceDir);
            } catch (Exception $failure) {
                $reason = $failure->getMessage();
                $outcomes[$stage] = ['success' => false, 'error' => $reason];
                echo "✗ {$stage} 失败: {$reason}\n";
                $this->rollback($outcomes);

                return false;
            }
            $outcomes[$stage] = ['success' => true, 'result' => $value];
            echo "✓ {$stage} 完成\n";
        }

        return true;
    }

    /** Announces the rollback; no stage is actually undone. */
    private function rollback($results)
    {
        echo "执行回滚...\n";
    }
}
// Wire the four deployment stages together and run them against the plugin
// source directory.
$pipeline = new PluginDeploymentPipeline();

// Stage 1: compile the Erlang sources.
$pipeline->addStage('compile', function ($dir) {
    return (new PluginCompiler())->compilePlugin($dir);
});

// Stage 2: compile the unit test modules.
$pipeline->addStage('test', function ($dir) {
    return (new PluginTestFramework())->runUnitTests($dir);
});

// Stage 3: build the .ez archive (no files are supplied here).
$pipeline->addStage('package', function ($dir) {
    return (new CustomPluginManager())->createPluginPackage('my_plugin', '1.0.0', []);
});

// Stage 4: copy the archive into the broker's plugin directory.
$pipeline->addStage('deploy', function ($dir) {
    (new CustomPluginManager())->installPlugin('my_plugin-1.0.0.ez');
});

$pipeline->deploy('/path/to/plugin');