Appearance
MQTT 插件
概述
MQTT 插件(rabbitmq_mqtt)是 RabbitMQ 的重要扩展,它使 RabbitMQ 能够支持 MQTT 协议。MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息协议,特别适用于物联网(IoT)设备和带宽受限的网络环境。
核心知识点
MQTT 协议架构
mermaid
graph TB
subgraph MQTT 客户端
P[发布者]
S[订阅者]
end
subgraph RabbitMQ MQTT 插件
M[MQTT 协议层]
A[AMQP 转换层]
end
subgraph RabbitMQ
E[Exchange]
Q[Queue]
end
P -->|MQTT| M
M -->|转换| A
A --> E
E --> Q
Q -->|转换| A
A -->|MQTT| SMQTT 与 AMQP 对比
| 特性 | MQTT | AMQP |
|---|---|---|
| 协议开销 | 极小(2字节头部) | 较大 |
| 传输方式 | TCP/WebSocket | TCP |
| QoS 级别 | 0, 1, 2 | 最多一次、手动确认 |
| 主题订阅 | 通配符(#/+) | routing key |
| 适用场景 | IoT、移动设备 | 企业应用 |
工作流程
mermaid
sequenceDiagram
participant C as MQTT 客户端
participant M as MQTT 插件
participant E as Exchange
participant Q as 队列
participant S as 订阅者
Note over C,M: CONNECT
C->>M: CONNECT
M-->>C: CONNACK
Note over C,M: SUBSCRIBE
C->>M: SUBSCRIBE topic/sensors/#
M-->>C: SUBACK
Note over C,M: PUBLISH
C->>M: PUBLISH topic/sensors/temp {"value": 25}
M->>E: 转换为 AMQP 消息
E->>Q: 路由消息
Note over Q,S: 投递消息
Q->>M: 消息
M-->>S: PUBLISHPHP 代码示例
安装和配置 MQTT 插件
php
<?php
class MqttPluginInstaller
{
public function install()
{
echo "安装 MQTT 插件...\n";
$commands = [
'rabbitmq-plugins enable rabbitmq_mqtt',
'systemctl restart rabbitmq-server'
];
foreach ($commands as $command) {
exec($command, $output, $returnCode);
if ($returnCode !== 0) {
throw new RuntimeException("命令执行失败: {$command}");
}
}
echo "MQTT 插件安装完成\n";
}
public function isEnabled()
{
exec('rabbitmq-plugins list -e', $output);
foreach ($output as $line) {
if (strpos($line, 'rabbitmq_mqtt') !== false && strpos($line, '[E]') !== false) {
return true;
}
}
return false;
}
public function configure($config = [])
{
$defaultConfig = [
'mqtt.listeners.tcp.default' => 1883,
'mqtt.allow_anonymous' => 'true',
'mqtt.vhost' => '/',
'mqtt.exchange' => 'amq.topic'
];
$config = array_merge($defaultConfig, $config);
$configContent = '';
foreach ($config as $key => $value) {
$configContent .= "{$key} = {$value}\n";
}
file_put_contents('/etc/rabbitmq/rabbitmq.conf', $configContent);
echo "MQTT 配置已更新\n";
}
}MQTT 客户端连接
php
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class MqttAdapter
{
private $connection;
private $channel;
private $exchangeName = 'amq.topic';
public function __construct()
{
$this->connection = new AMQPStreamConnection(
'localhost',
5672,
'guest',
'guest'
);
$this->channel = $this->connection->channel();
$this->channel->exchange_declare(
$this->exchangeName,
'topic',
false,
true,
false
);
}
public function publish($topic, $message, $qos = 0)
{
$msg = new AMQPMessage(
json_encode($message),
[
'content_type' => 'application/json',
'delivery_mode' => $qos > 0 ? 2 : 1,
'timestamp' => time()
]
);
$this->channel->basic_publish(
$msg,
$this->exchangeName,
$topic
);
}
public function subscribe($topicPattern, callable $callback)
{
$queueName = 'mqtt_subscriber_' . uniqid();
$this->channel->queue_declare($queueName, false, true, false, false);
$this->channel->queue_bind(
$queueName,
$this->exchangeName,
$topicPattern
);
$callbackWrapper = function (AMQPMessage $message) use ($callback) {
$body = json_decode($message->body, true);
$topic = $message->delivery_info['routing_key'];
$callback($topic, $body);
$message->ack();
};
$this->channel->basic_consume(
$queueName,
'',
false,
false,
false,
false,
$callbackWrapper
);
while ($this->channel->is_open()) {
$this->channel->wait();
}
}
public function close()
{
$this->channel->close();
$this->connection->close();
}
}传感器数据收集
php
<?php
class SensorDataCollector
{
private $adapter;
public function __construct()
{
$this->adapter = new MqttAdapter();
}
public function publishTemperature($sensorId, $temperature)
{
$this->adapter->publish("sensors/{$sensorId}/temperature", [
'sensor_id' => $sensorId,
'type' => 'temperature',
'value' => $temperature,
'unit' => 'celsius',
'timestamp' => time()
]);
}
public function publishHumidity($sensorId, $humidity)
{
$this->adapter->publish("sensors/{$sensorId}/humidity", [
'sensor_id' => $sensorId,
'type' => 'humidity',
'value' => $humidity,
'unit' => 'percent',
'timestamp' => time()
]);
}
public function publishPressure($sensorId, $pressure)
{
$this->adapter->publish("sensors/{$sensorId}/pressure", [
'sensor_id' => $sensorId,
'type' => 'pressure',
'value' => $pressure,
'unit' => 'hPa',
'timestamp' => time()
]);
}
public function publishAll($sensorId, $data)
{
foreach ($data as $type => $value) {
$topic = "sensors/{$sensorId}/{$type}";
$this->adapter->publish($topic, [
'sensor_id' => $sensorId,
'type' => $type,
'value' => $value,
'timestamp' => time()
]);
}
}
}
class SensorDataSubscriber
{
private $adapter;
public function __construct()
{
$this->adapter = new MqttAdapter();
}
public function subscribeToAllSensors()
{
echo "订阅所有传感器数据...\n";
$this->adapter->subscribe('sensors/#', function ($topic, $data) {
echo sprintf(
"[%s] 收到数据: %s = %s %s\n",
date('Y-m-d H:i:s'),
$topic,
$data['value'],
$data['unit'] ?? ''
);
});
}
public function subscribeToSensor($sensorId)
{
echo "订阅传感器 {$sensorId} 数据...\n";
$this->adapter->subscribe("sensors/{$sensorId}/#", function ($topic, $data) {
echo sprintf(
"[%s] 传感器 %s: %s = %s\n",
date('Y-m-d H:i:s'),
$data['sensor_id'],
$data['type'],
$data['value']
);
});
}
public function subscribeToTemperature()
{
echo "订阅所有温度数据...\n";
$this->adapter->subscribe('sensors/+/temperature', function ($topic, $data) {
echo "温度告警: 传感器 {$data['sensor_id']} = {$data['value']}°C\n";
});
}
}完整示例:IoT 设备管理
php
<?php
class DeviceManager
{
private $adapter;
const DEVICE_STATUS_ONLINE = 'online';
const DEVICE_STATUS_OFFLINE = 'offline';
const DEVICE_STATUS_ERROR = 'error';
public function __construct()
{
$this->adapter = new MqttAdapter();
}
public function registerDevice($deviceId, $deviceType, $metadata = [])
{
$this->adapter->publish("device/{$deviceId}/register", [
'device_id' => $deviceId,
'device_type' => $deviceType,
'metadata' => $metadata,
'registered_at' => time()
]);
}
public function updateDeviceStatus($deviceId, $status)
{
$this->adapter->publish("device/{$deviceId}/status", [
'device_id' => $deviceId,
'status' => $status,
'timestamp' => time()
]);
}
public function sendCommand($deviceId, $command, $params = [])
{
$this->adapter->publish("device/{$deviceId}/command", [
'command_id' => uniqid('cmd_'),
'device_id' => $deviceId,
'command' => $command,
'params' => $params,
'sent_at' => time()
]);
}
public function handleDeviceData($topic, $data)
{
echo "处理设备数据: {$topic}\n";
$this->processData($data);
}
private function processData($data)
{
// 处理传感器数据的逻辑
}
}
class DeviceCommandHandler
{
private $adapter;
public function __construct()
{
$this->adapter = new MqttAdapter();
}
public function startListening()
{
echo "监听设备命令...\n";
$this->adapter->subscribe('device/+/command', function ($topic, $data) {
$deviceId = $data['device_id'];
$command = $data['command'];
$params = $data['params'];
echo "收到命令 - 设备: {$deviceId}, 命令: {$command}\n";
$result = $this->executeCommand($deviceId, $command, $params);
$this->adapter->publish("device/{$deviceId}/response", [
'command_id' => $data['command_id'],
'result' => $result,
'timestamp' => time()
]);
});
}
private function executeCommand($deviceId, $command, $params)
{
return [
'success' => true,
'executed_at' => time()
];
}
}智能家居系统
php
<?php
class SmartHomeController
{
private $adapter;
public function __construct()
{
$this->adapter = new MqttAdapter();
}
public function publishLightState($room, $deviceId, $state)
{
$this->adapter->publish("home/{$room}/light/{$deviceId}/state", [
'device_id' => $deviceId,
'state' => $state,
'timestamp' => time()
]);
}
public function publishTemperature($room, $temperature)
{
$this->adapter->publish("home/{$room}/temperature", [
'value' => $temperature,
'timestamp' => time()
]);
}
public function publishMotion($room, $detected)
{
$this->adapter->publish("home/{$room}/motion", [
'detected' => $detected,
'timestamp' => time()
]);
}
public function subscribeToRoom($room)
{
return new class($this->adapter, $room) {
private $adapter;
private $room;
public function __construct($adapter, $room)
{
$this->adapter = $adapter;
$this->room = $room;
}
public function onLightChange(callable $callback)
{
$this->adapter->subscribe("home/{$this->room}/light/#", function ($topic, $data) use ($callback) {
$callback($data);
});
}
public function onTemperatureChange(callable $callback)
{
$this->adapter->subscribe("home/{$this->room}/temperature", function ($topic, $data) use ($callback) {
$callback($data);
});
}
};
}
}实际应用场景
1. 环境监测系统
php
<?php
class EnvironmentMonitor
{
private $adapter;
private $thresholds = [
'temperature' => ['min' => 15, 'max' => 30],
'humidity' => ['min' => 30, 'max' => 70],
'co2' => ['min' => 0, 'max' => 1000]
];
public function __construct()
{
$this->adapter = new MqttAdapter();
}
public function checkThresholds($sensorId, $type, $value)
{
if (!isset($this->thresholds[$type])) {
return;
}
$threshold = $this->thresholds[$type];
if ($value < $threshold['min'] || $value > $threshold['max']) {
$this->adapter->publish("alerts/{$sensorId}", [
'sensor_id' => $sensorId,
'type' => $type,
'value' => $value,
'threshold' => $threshold,
'alert' => $value < $threshold['min'] ? 'below' : 'above',
'timestamp' => time()
]);
}
}
public function subscribeToAlerts($location, callable $callback)
{
$this->adapter->subscribe("alerts/{$location}/#", function ($topic, $data) use ($callback) {
$callback($data);
});
}
}2. 车队管理
php
<?php
class FleetManager
{
private $adapter;
public function __construct()
{
$this->adapter = new MqttAdapter();
}
public function trackVehicle($vehicleId, $location)
{
$this->adapter->publish("fleet/vehicle/{$vehicleId}/location", [
'vehicle_id' => $vehicleId,
'latitude' => $location['lat'],
'longitude' => $location['lng'],
'speed' => $location['speed'] ?? 0,
'heading' => $location['heading'] ?? 0,
'timestamp' => time()
]);
}
public function vehicleStatus($vehicleId, $status)
{
$this->adapter->publish("fleet/vehicle/{$vehicleId}/status", [
'vehicle_id' => $vehicleId,
'status' => $status,
'timestamp' => time()
]);
}
public function subscribeToFleet(callable $callback)
{
$this->adapter->subscribe('fleet/vehicle/#', function ($topic, $data) use ($callback) {
$callback($topic, $data);
});
}
}3. 工业自动化
php
<?php
class IndustrialAutomation
{
private $adapter;
public function __construct()
{
$this->adapter = new MqttAdapter();
}
public function publishMachineData($machineId, $data)
{
$topic = "industrial/machine/{$machineId}/telemetry";
$this->adapter->publish($topic, array_merge($data, [
'machine_id' => $machineId,
'timestamp' => time()
]));
}
public function publishAlert($machineId, $alert)
{
$topic = "industrial/machine/{$machineId}/alert";
$this->adapter->publish($topic, array_merge($alert, [
'machine_id' => $machineId,
'timestamp' => time()
]));
}
public function sendCommand($machineId, $command)
{
$topic = "industrial/machine/{$machineId}/command";
$this->adapter->publish($topic, [
'command' => $command,
'timestamp' => time()
]);
}
}常见问题与解决方案
问题 1:MQTT 连接被拒绝
原因:端口未开放或认证失败
解决方案:
php
<?php
class MqttConnectionTester
{
private $host = 'localhost';
private $port = 1883;
public function testConnection()
{
$socket = @fsockopen($this->host, $this->port, $errno, $errstr, 5);
if (!$socket) {
return [
'success' => false,
'error' => "无法连接到 MQTT 端口: {$errstr}"
];
}
fclose($socket);
return ['success' => true];
}
public function testWithCredentials($clientId, $username, $password)
{
$result = $this->testConnection();
if (!$result['success']) {
return $result;
}
return [
'success' => true,
'note' => '请在客户端测试认证'
];
}
}问题 2:消息丢失
原因:QoS 级别设置不当
解决方案:
php
<?php
class ReliableMqttPublisher
{
private $adapter;
private $qos = 1;
public function setQos($qos)
{
$this->qos = min(2, max(0, $qos));
}
public function publish($topic, $message)
{
$this->adapter->publish($topic, $message, $this->qos);
}
public function publishWithRetry($topic, $message, $maxRetries = 3)
{
for ($i = 0; $i < $maxRetries; $i++) {
try {
$this->publish($topic, $message);
return true;
} catch (Exception $e) {
if ($i === $maxRetries - 1) {
throw $e;
}
usleep(100000 * ($i + 1));
}
}
return false;
}
}问题 3:主题映射错误
解决方案:
php
<?php
class TopicMapper
{
private $mappings = [];
public function addMapping($mqttTopic, $amqpRoutingKey)
{
$this->mappings[$mqttTopic] = $amqpRoutingKey;
}
public function toAmqpRoutingKey($mqttTopic)
{
foreach ($this->mappings as $pattern => $routingKey) {
if ($this->matchTopic($mqttTopic, $pattern)) {
return $routingKey;
}
}
return str_replace('/', '.', $mqttTopic);
}
private function matchTopic($topic, $pattern)
{
$regex = str_replace(['#', '+'], ['.*', '[^/]+'], $pattern);
return preg_match("/^{$regex}$/", $topic) === 1;
}
}最佳实践建议
1. QoS 级别选择
php
<?php
class QosSelector
{
const QOS_AT_MOST_ONCE = 0;
const QOS_AT_LEAST_ONCE = 1;
const QOS_EXACTLY_ONCE = 2;
public static function select($useCase)
{
$qosMap = [
'telemetry' => self::QOS_AT_MOST_ONCE,
'commands' => self::QOS_AT_LEAST_ONCE,
'alerts' => self::QOS_AT_LEAST_ONCE,
'critical' => self::QOS_EXACTLY_ONCE
];
return $qosMap[$useCase] ?? self::QOS_AT_MOST_ONCE;
}
}2. 主题设计规范
php
<?php
class TopicConvention
{
const PATTERN = '/^[a-z][a-z0-9_]*(\/[a-z][a-z0-9_]*)*$/';
public static function validate($topic)
{
if (!preg_match(self::PATTERN, $topic)) {
throw new InvalidArgumentException(
"主题格式无效: {$topic}。应使用小写字母、数字和下划线"
);
}
return true;
}
public static function build($parts)
{
$topic = implode('/', $parts);
self::validate($topic);
return $topic;
}
}3. 消息压缩
php
<?php
class MqttMessageCompressor
{
public function compress($data)
{
if (is_array($data)) {
$data = json_encode($data);
}
return gzdeflate($data, 6);
}
public function decompress($data)
{
$decompressed = gzinflate($data);
return json_decode($decompressed, true);
}
}