Skip to content

MQTT 插件

概述

MQTT 插件(rabbitmq_mqtt)是 RabbitMQ 的重要扩展,它使 RabbitMQ 能够支持 MQTT 协议。MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息协议,特别适用于物联网(IoT)设备和带宽受限的网络环境。

核心知识点

MQTT 协议架构

mermaid
graph TB
    subgraph MQTT 客户端
        P[发布者]
        S[订阅者]
    end

    subgraph RabbitMQ MQTT 插件
        M[MQTT 协议层]
        A[AMQP 转换层]
    end

    subgraph RabbitMQ
        E[Exchange]
        Q[Queue]
    end

    P -->|MQTT| M
    M -->|转换| A
    A --> E
    E --> Q
    Q -->|转换| A
    A -->|MQTT| S

MQTT 与 AMQP 对比

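| 特性 | MQTT | AMQP |
| --- | --- | --- |
| 协议开销 | 极小(2字节头部) | 较大 |
| 传输方式 | TCP/WebSocket | TCP |
| QoS 级别 | 0, 1, 2(RabbitMQ 的 MQTT 插件会将 QoS 2 降级为 QoS 1) | 最多一次、手动确认 |
| 主题订阅 | 通配符(#/+) | routing key(topic 交换机支持 */# 通配符) |
| 适用场景 | IoT、移动设备 | 企业应用 |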

工作流程

mermaid
sequenceDiagram
    participant C as MQTT 客户端
    participant M as MQTT 插件
    participant E as Exchange
    participant Q as 队列
    participant S as 订阅者

    Note over C,M: CONNECT
    C->>M: CONNECT
    M-->>C: CONNACK

    Note over C,M: SUBSCRIBE
    C->>M: SUBSCRIBE topic/sensors/#
    M-->>C: SUBACK

    Note over C,M: PUBLISH
    C->>M: PUBLISH topic/sensors/temp {"value": 25}
    M->>E: 转换为 AMQP 消息
    E->>Q: 路由消息

    Note over Q,S: 投递消息
    Q->>M: 消息
    M-->>S: PUBLISH

PHP 代码示例

安装和配置 MQTT 插件

php
<?php

/**
 * Enables and configures the RabbitMQ MQTT plugin (rabbitmq_mqtt).
 *
 * Every method either shells out to the RabbitMQ CLI / systemctl or writes the
 * broker configuration file, so the calling process presumably needs matching
 * privileges — confirm for your deployment.
 */
class MqttPluginInstaller
{
    /**
     * Enables the plugin and restarts the broker service.
     *
     * @return void
     * @throws RuntimeException when one of the commands exits with a non-zero status
     */
    public function install()
    {
        echo "安装 MQTT 插件...\n";

        $commands = [
            'rabbitmq-plugins enable rabbitmq_mqtt',
            'systemctl restart rabbitmq-server'
        ];

        foreach ($commands as $command) {
            // exec() appends to an existing array, so start from a clean slate per command.
            $output = [];
            $returnCode = 0;
            exec($command, $output, $returnCode);

            if ($returnCode !== 0) {
                throw new RuntimeException("命令执行失败: {$command}");
            }
        }

        echo "MQTT 插件安装完成\n";
    }

    /**
     * Reports whether rabbitmq_mqtt shows up in `rabbitmq-plugins list -e`.
     *
     * The status column of that listing looks like "[E*]", "[E ]" or "[e*]"
     * (explicitly / implicitly enabled, optionally running); a literal "[E]"
     * never appears, so the column is matched with a pattern instead.
     *
     * @return bool
     */
    public function isEnabled()
    {
        $output = [];
        exec('rabbitmq-plugins list -e', $output);

        foreach ($output as $line) {
            if (preg_match('/^\s*\[[Ee][^\]]*\]\s+rabbitmq_mqtt(\s|$)/', $line) === 1) {
                return true;
            }
        }

        return false;
    }

    /**
     * Writes MQTT settings into the broker configuration file.
     *
     * Existing lines that do not belong to the keys being written (other
     * settings, comments, blank lines) are preserved; lines for the same keys
     * are replaced in place and missing keys are appended.
     *
     * @param array  $config     overrides for the default MQTT settings, keyed by config key
     * @param string $configPath file to update; defaults to the standard rabbitmq.conf location
     * @return void
     * @throws RuntimeException when the file cannot be read or written
     */
    public function configure($config = [], $configPath = '/etc/rabbitmq/rabbitmq.conf')
    {
        $defaultConfig = [
            'mqtt.listeners.tcp.default' => 1883,
            'mqtt.allow_anonymous' => 'true',
            'mqtt.vhost' => '/',
            'mqtt.exchange' => 'amq.topic'
        ];

        $config = array_merge($defaultConfig, $config);

        $lines = [];
        $written = [];

        if (is_file($configPath)) {
            $existing = file($configPath, FILE_IGNORE_NEW_LINES);

            if ($existing === false) {
                throw new RuntimeException("无法读取配置文件: {$configPath}");
            }

            foreach ($existing as $line) {
                // A leading '#' marks a comment, so it can never start a key.
                $isSetting = preg_match('/^\s*([^\s#=]+)\s*=/', $line, $match) === 1;

                if (!$isSetting || !array_key_exists($match[1], $config)) {
                    $lines[] = $line;
                    continue;
                }

                $key = $match[1];

                // Replace the first occurrence; drop later duplicates of the same key.
                if (!isset($written[$key])) {
                    $lines[] = "{$key} = " . $this->renderValue($config[$key]);
                    $written[$key] = true;
                }
            }
        }

        foreach ($config as $key => $value) {
            if (!isset($written[$key])) {
                $lines[] = "{$key} = " . $this->renderValue($value);
            }
        }

        $configContent = implode("\n", $lines) . "\n";

        if (file_put_contents($configPath, $configContent) === false) {
            throw new RuntimeException("无法写入配置文件: {$configPath}");
        }

        echo "MQTT 配置已更新\n";
    }

    /**
     * Renders a setting value; PHP booleans become the words true/false
     * instead of "1" / "" as plain string interpolation would produce.
     */
    private function renderValue($value)
    {
        if (is_bool($value)) {
            return $value ? 'true' : 'false';
        }

        return (string) $value;
    }
}

MQTT 客户端连接(示例通过 AMQP 协议连接 amq.topic 交换机)

php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * Publishes and consumes MQTT-style topics over an AMQP connection.
 *
 * The adapter talks AMQP (port 5672) to the amq.topic exchange and accepts
 * MQTT topic syntax ("a/b", "+", "#") from callers. AMQP topic exchanges split
 * routing keys on "." and use "*" as the single-word wildcard, so topics are
 * translated on the way in and back on the way out; without that translation
 * a binding such as "sensors/#" is a single literal word and matches nothing.
 */
class MqttAdapter
{
    private $connection;
    private $channel;
    private $exchangeName = 'amq.topic';

    /**
     * Opens the AMQP connection and channel and declares the topic exchange.
     */
    public function __construct()
    {
        $this->connection = new AMQPStreamConnection(
            'localhost',
            5672,
            'guest',
            'guest'
        );
        $this->channel = $this->connection->channel();

        $this->channel->exchange_declare(
            $this->exchangeName,
            'topic',
            false,
            true,
            false
        );
    }

    /**
     * Publishes a JSON-encoded message to an MQTT-style topic.
     *
     * @param string $topic   MQTT topic, levels separated by "/"
     * @param mixed  $message payload; encoded with json_encode()
     * @param int    $qos     0 publishes a transient message, anything above 0 a persistent one
     * @return void
     * @throws InvalidArgumentException when the payload cannot be JSON-encoded
     */
    public function publish($topic, $message, $qos = 0)
    {
        $payload = json_encode($message);

        if ($payload === false) {
            throw new InvalidArgumentException('消息无法编码为 JSON: ' . json_last_error_msg());
        }

        $msg = new AMQPMessage(
            $payload,
            [
                'content_type' => 'application/json',
                'delivery_mode' => $qos > 0 ? 2 : 1,
                'timestamp' => time()
            ]
        );

        $this->channel->basic_publish(
            $msg,
            $this->exchangeName,
            $this->toRoutingKey($topic)
        );
    }

    /**
     * Binds a private queue to the topic filter and blocks, dispatching every
     * delivery to the callback until the channel closes.
     *
     * @param string   $topicPattern MQTT topic filter; "+" and "#" wildcards allowed
     * @param callable $callback     invoked as $callback($topic, $decodedBody); the message
     *                               is acknowledged after the callback returns
     * @return void
     */
    public function subscribe($topicPattern, callable $callback)
    {
        $queueName = 'mqtt_subscriber_' . uniqid();

        // The name is random and never reused, so the queue is declared exclusive and
        // auto-delete (non-durable): otherwise every call leaves an orphaned queue that
        // keeps collecting messages after this consumer is gone.
        $this->channel->queue_declare($queueName, false, false, true, true);

        $this->channel->queue_bind(
            $queueName,
            $this->exchangeName,
            $this->toRoutingKey($topicPattern)
        );

        $callbackWrapper = function (AMQPMessage $message) use ($callback) {
            // Decodes to null when the body is not valid JSON; the callback receives that null.
            $body = json_decode($message->body, true);

            $topic = $this->toMqttTopic($message->delivery_info['routing_key']);

            $callback($topic, $body);

            $message->ack();
        };

        $this->channel->basic_consume(
            $queueName,
            '',
            false,
            false,
            false,
            false,
            $callbackWrapper
        );

        while ($this->channel->is_open()) {
            $this->channel->wait();
        }
    }

    /**
     * Closes the channel and then the connection.
     *
     * @return void
     */
    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }

    /**
     * Translates an MQTT topic or filter into an AMQP routing/binding key:
     * "/" becomes "." and "+" becomes "*"; "#" means the same on both sides.
     */
    private function toRoutingKey($topic)
    {
        return strtr($topic, ['/' => '.', '+' => '*']);
    }

    /**
     * Translates an AMQP routing key back into MQTT topic syntax.
     */
    private function toMqttTopic($routingKey)
    {
        return strtr($routingKey, ['.' => '/', '*' => '+']);
    }
}

传感器数据收集

php
<?php

/**
 * Publishes sensor readings to "sensors/{sensorId}/{type}" topics.
 */
class SensorDataCollector
{
    private $adapter;

    public function __construct()
    {
        $this->adapter = new MqttAdapter();
    }

    /**
     * Publishes a temperature reading tagged with the unit "celsius".
     */
    public function publishTemperature($sensorId, $temperature)
    {
        $this->emit($sensorId, 'temperature', $temperature, 'celsius');
    }

    /**
     * Publishes a humidity reading tagged with the unit "percent".
     */
    public function publishHumidity($sensorId, $humidity)
    {
        $this->emit($sensorId, 'humidity', $humidity, 'percent');
    }

    /**
     * Publishes a pressure reading tagged with the unit "hPa".
     */
    public function publishPressure($sensorId, $pressure)
    {
        $this->emit($sensorId, 'pressure', $pressure, 'hPa');
    }

    /**
     * Publishes one message per entry of $data, using the array key as the
     * reading type. These messages carry no "unit" field.
     */
    public function publishAll($sensorId, $data)
    {
        foreach ($data as $type => $value) {
            $this->emit($sensorId, $type, $value);
        }
    }

    /**
     * Builds the reading payload and hands it to the adapter. The "unit" field
     * is included only when a unit is given, and sits before "timestamp".
     */
    private function emit($sensorId, $type, $value, $unit = null)
    {
        $topic = "sensors/{$sensorId}/{$type}";

        $reading = [
            'sensor_id' => $sensorId,
            'type' => $type,
            'value' => $value
        ];

        if ($unit !== null) {
            $reading['unit'] = $unit;
        }

        $reading['timestamp'] = time();

        $this->adapter->publish($topic, $reading);
    }
}

/**
 * Console subscribers that print sensor readings as they arrive.
 * Each subscribe* method hands control to the adapter's subscribe() call.
 */
class SensorDataSubscriber
{
    private $adapter;

    public function __construct()
    {
        $this->adapter = new MqttAdapter();
    }

    /**
     * Prints every message received under "sensors/#" with a local timestamp.
     */
    public function subscribeToAllSensors()
    {
        echo "订阅所有传感器数据...\n";

        $printReading = function ($topic, $data) {
            $unit = $data['unit'] ?? '';

            printf(
                "[%s] 收到数据: %s = %s %s\n",
                date('Y-m-d H:i:s'),
                $topic,
                $data['value'],
                $unit
            );
        };

        $this->adapter->subscribe('sensors/#', $printReading);
    }

    /**
     * Prints every message published under one sensor's topic subtree.
     */
    public function subscribeToSensor($sensorId)
    {
        echo "订阅传感器 {$sensorId} 数据...\n";

        $filter = "sensors/{$sensorId}/#";

        $printReading = function ($topic, $data) {
            printf(
                "[%s] 传感器 %s: %s = %s\n",
                date('Y-m-d H:i:s'),
                $data['sensor_id'],
                $data['type'],
                $data['value']
            );
        };

        $this->adapter->subscribe($filter, $printReading);
    }

    /**
     * Prints a line for every temperature message from any sensor.
     */
    public function subscribeToTemperature()
    {
        echo "订阅所有温度数据...\n";

        $printTemperature = function ($topic, $data) {
            $sensor = $data['sensor_id'];
            $reading = $data['value'];

            echo "温度告警: 传感器 {$sensor} = {$reading}°C\n";
        };

        $this->adapter->subscribe('sensors/+/temperature', $printTemperature);
    }
}

完整示例:IoT 设备管理

php
<?php

/**
 * Publishes device lifecycle messages (registration, status, commands)
 * under "device/{deviceId}/...".
 */
class DeviceManager
{
    private $adapter;

    const DEVICE_STATUS_ONLINE = 'online';
    const DEVICE_STATUS_OFFLINE = 'offline';
    const DEVICE_STATUS_ERROR = 'error';

    public function __construct()
    {
        $this->adapter = new MqttAdapter();
    }

    /**
     * Announces a device on "device/{deviceId}/register".
     */
    public function registerDevice($deviceId, $deviceType, $metadata = [])
    {
        $topic = sprintf('device/%s/register', $deviceId);

        $registration = [
            'device_id' => $deviceId,
            'device_type' => $deviceType,
            'metadata' => $metadata,
            'registered_at' => time()
        ];

        $this->adapter->publish($topic, $registration);
    }

    /**
     * Publishes a status change on "device/{deviceId}/status".
     */
    public function updateDeviceStatus($deviceId, $status)
    {
        $topic = sprintf('device/%s/status', $deviceId);

        $update = [
            'device_id' => $deviceId,
            'status' => $status,
            'timestamp' => time()
        ];

        $this->adapter->publish($topic, $update);
    }

    /**
     * Sends a command on "device/{deviceId}/command"; each command gets a
     * fresh "cmd_"-prefixed id from uniqid().
     */
    public function sendCommand($deviceId, $command, $params = [])
    {
        $topic = sprintf('device/%s/command', $deviceId);

        $envelope = [
            'command_id' => uniqid('cmd_'),
            'device_id' => $deviceId,
            'command' => $command,
            'params' => $params,
            'sent_at' => time()
        ];

        $this->adapter->publish($topic, $envelope);
    }

    /**
     * Logs the topic and forwards the payload to processData().
     */
    public function handleDeviceData($topic, $data)
    {
        echo "处理设备数据: {$topic}\n";

        $this->processData($data);
    }

    private function processData($data)
    {
        // Placeholder for the sensor-data handling logic.
    }
}

/**
 * Listens for device commands and publishes the execution result back on
 * "device/{deviceId}/response".
 */
class DeviceCommandHandler
{
    private $adapter;

    public function __construct()
    {
        $this->adapter = new MqttAdapter();
    }

    /**
     * Subscribes to "device/+/command" and answers every well-formed command.
     *
     * Payloads that did not decode to an array, or that lack "device_id" or
     * "command", are skipped: otherwise they would be dereferenced blindly and
     * a response would go out on the bogus topic "device//response".
     *
     * @return void
     */
    public function startListening()
    {
        echo "监听设备命令...\n";

        $this->adapter->subscribe('device/+/command', function ($topic, $data) {
            if (!is_array($data) || !isset($data['device_id'], $data['command'])) {
                echo "忽略格式错误的命令消息: {$topic}\n";
                return;
            }

            $deviceId = $data['device_id'];
            $command = $data['command'];
            // "params" and "command_id" are optional for publishers other than DeviceManager.
            $params = $data['params'] ?? [];

            echo "收到命令 - 设备: {$deviceId}, 命令: {$command}\n";

            $result = $this->executeCommand($deviceId, $command, $params);

            $this->adapter->publish("device/{$deviceId}/response", [
                'command_id' => $data['command_id'] ?? null,
                'result' => $result,
                'timestamp' => time()
            ]);
        });
    }

    /**
     * Stub executor: always reports success with the current time.
     */
    private function executeCommand($deviceId, $command, $params)
    {
        return [
            'success' => true,
            'executed_at' => time()
        ];
    }
}

智能家居系统

php
<?php

/**
 * Publishes smart-home events under "home/{room}/..." and hands out
 * per-room subscription helpers.
 */
class SmartHomeController
{
    private $adapter;

    public function __construct()
    {
        $this->adapter = new MqttAdapter();
    }

    /**
     * Publishes a light's state on "home/{room}/light/{deviceId}/state".
     */
    public function publishLightState($room, $deviceId, $state)
    {
        $topic = sprintf('home/%s/light/%s/state', $room, $deviceId);

        $event = [
            'device_id' => $deviceId,
            'state' => $state,
            'timestamp' => time()
        ];

        $this->adapter->publish($topic, $event);
    }

    /**
     * Publishes a room temperature on "home/{room}/temperature".
     */
    public function publishTemperature($room, $temperature)
    {
        $topic = sprintf('home/%s/temperature', $room);

        $event = [
            'value' => $temperature,
            'timestamp' => time()
        ];

        $this->adapter->publish($topic, $event);
    }

    /**
     * Publishes a motion-sensor reading on "home/{room}/motion".
     */
    public function publishMotion($room, $detected)
    {
        $topic = sprintf('home/%s/motion', $room);

        $event = [
            'detected' => $detected,
            'timestamp' => time()
        ];

        $this->adapter->publish($topic, $event);
    }

    /**
     * Returns an anonymous helper bound to one room. Its on*Change methods
     * subscribe through the shared adapter and pass only the decoded payload
     * (not the topic) to the supplied callback.
     */
    public function subscribeToRoom($room)
    {
        return new class($this->adapter, $room) {
            private $adapter;
            private $room;

            public function __construct($adapter, $room)
            {
                $this->adapter = $adapter;
                $this->room = $room;
            }

            public function onLightChange(callable $callback)
            {
                $filter = 'home/' . $this->room . '/light/#';

                $forward = function ($topic, $data) use ($callback) {
                    $callback($data);
                };

                $this->adapter->subscribe($filter, $forward);
            }

            public function onTemperatureChange(callable $callback)
            {
                $filter = 'home/' . $this->room . '/temperature';

                $forward = function ($topic, $data) use ($callback) {
                    $callback($data);
                };

                $this->adapter->subscribe($filter, $forward);
            }
        };
    }
}

实际应用场景

1. 环境监测系统

php
<?php

/**
 * Compares sensor readings against fixed per-type limits and publishes an
 * alert on "alerts/{sensorId}" when a reading falls outside them.
 */
class EnvironmentMonitor
{
    private $adapter;
    private $thresholds = [
        'temperature' => ['min' => 15, 'max' => 30],
        'humidity' => ['min' => 30, 'max' => 70],
        'co2' => ['min' => 0, 'max' => 1000]
    ];

    public function __construct()
    {
        $this->adapter = new MqttAdapter();
    }

    /**
     * Publishes an alert when $value is below the minimum or above the maximum
     * configured for $type. Types without configured limits are ignored.
     */
    public function checkThresholds($sensorId, $type, $value)
    {
        if (!isset($this->thresholds[$type])) {
            return;
        }

        $limits = $this->thresholds[$type];

        $tooLow = $value < $limits['min'];
        $tooHigh = $value > $limits['max'];

        if (!$tooLow && !$tooHigh) {
            return;
        }

        $alert = [
            'sensor_id' => $sensorId,
            'type' => $type,
            'value' => $value,
            'threshold' => $limits,
            'alert' => $tooLow ? 'below' : 'above',
            'timestamp' => time()
        ];

        $this->adapter->publish("alerts/{$sensorId}", $alert);
    }

    /**
     * Subscribes to "alerts/{location}/#" and passes each decoded payload
     * (without the topic) to the callback.
     */
    public function subscribeToAlerts($location, callable $callback)
    {
        $filter = "alerts/{$location}/#";

        $forward = function ($topic, $data) use ($callback) {
            $callback($data);
        };

        $this->adapter->subscribe($filter, $forward);
    }
}

2. 车队管理

php
<?php

/**
 * Publishes vehicle positions and status under "fleet/vehicle/{vehicleId}/..."
 * and lets callers follow the whole fleet.
 */
class FleetManager
{
    private $adapter;

    public function __construct()
    {
        $this->adapter = new MqttAdapter();
    }

    /**
     * Publishes a position report. $location is read for "lat" and "lng";
     * "speed" and "heading" fall back to 0 when missing or null.
     */
    public function trackVehicle($vehicleId, $location)
    {
        $topic = sprintf('fleet/vehicle/%s/location', $vehicleId);

        $position = [
            'vehicle_id' => $vehicleId,
            'latitude' => $location['lat'],
            'longitude' => $location['lng'],
            'speed' => isset($location['speed']) ? $location['speed'] : 0,
            'heading' => isset($location['heading']) ? $location['heading'] : 0,
            'timestamp' => time()
        ];

        $this->adapter->publish($topic, $position);
    }

    /**
     * Publishes a status update for one vehicle.
     */
    public function vehicleStatus($vehicleId, $status)
    {
        $topic = sprintf('fleet/vehicle/%s/status', $vehicleId);

        $update = [
            'vehicle_id' => $vehicleId,
            'status' => $status,
            'timestamp' => time()
        ];

        $this->adapter->publish($topic, $update);
    }

    /**
     * Subscribes to every vehicle topic and forwards topic and payload to the callback.
     */
    public function subscribeToFleet(callable $callback)
    {
        $forward = function ($topic, $data) use ($callback) {
            $callback($topic, $data);
        };

        $this->adapter->subscribe('fleet/vehicle/#', $forward);
    }
}

3. 工业自动化

php
<?php

/**
 * Publishes machine telemetry, alerts and commands under
 * "industrial/machine/{machineId}/...".
 */
class IndustrialAutomation
{
    private $adapter;

    public function __construct()
    {
        $this->adapter = new MqttAdapter();
    }

    /**
     * Publishes telemetry; "machine_id" and "timestamp" are merged over $data.
     */
    public function publishMachineData($machineId, $data)
    {
        $this->adapter->publish(
            $this->topicFor($machineId, 'telemetry'),
            $this->stamp($machineId, $data)
        );
    }

    /**
     * Publishes an alert; "machine_id" and "timestamp" are merged over $alert.
     */
    public function publishAlert($machineId, $alert)
    {
        $this->adapter->publish(
            $this->topicFor($machineId, 'alert'),
            $this->stamp($machineId, $alert)
        );
    }

    /**
     * Publishes a command. The payload holds only the command and a timestamp.
     */
    public function sendCommand($machineId, $command)
    {
        $instruction = [
            'command' => $command,
            'timestamp' => time()
        ];

        $this->adapter->publish($this->topicFor($machineId, 'command'), $instruction);
    }

    /**
     * Builds the topic for one machine and message kind.
     */
    private function topicFor($machineId, $kind)
    {
        return "industrial/machine/{$machineId}/{$kind}";
    }

    /**
     * Merges the machine id and the current time over the caller's fields.
     */
    private function stamp($machineId, $fields)
    {
        return array_merge($fields, [
            'machine_id' => $machineId,
            'timestamp' => time()
        ]);
    }
}

常见问题与解决方案

问题 1:MQTT 连接被拒绝

原因:端口未开放或认证失败

解决方案

php
<?php

/**
 * Checks whether the MQTT listener port accepts TCP connections.
 *
 * This is a plain TCP reachability probe; no MQTT packets are exchanged.
 */
class MqttConnectionTester
{
    private $host;
    private $port;
    private $timeout;

    /**
     * @param string    $host    broker host name or address
     * @param int       $port    MQTT listener port
     * @param int|float $timeout connect timeout in seconds
     */
    public function __construct($host = 'localhost', $port = 1883, $timeout = 5)
    {
        $this->host = $host;
        $this->port = $port;
        $this->timeout = $timeout;
    }

    /**
     * Tries to open (and immediately close) a TCP connection to the listener.
     *
     * @return array ['success' => true] or ['success' => false, 'error' => string]
     */
    public function testConnection()
    {
        $errno = 0;
        $errstr = '';

        // fsockopen() raises a warning on failure; the failure is already reported
        // through the return value, so the warning is silenced for this call only.
        set_error_handler(static function () {
            return true;
        });

        try {
            $socket = fsockopen($this->host, $this->port, $errno, $errstr, $this->timeout);
        } finally {
            restore_error_handler();
        }

        if (!$socket) {
            return [
                'success' => false,
                'error' => "无法连接到 MQTT 端口: {$errstr}"
            ];
        }

        fclose($socket);

        return ['success' => true];
    }

    /**
     * Checks reachability only. The client id and credentials are not used:
     * no MQTT CONNECT is sent, and the returned note tells the caller to
     * verify authentication from a real client.
     *
     * @return array the failed testConnection() result, or success plus a note
     */
    public function testWithCredentials($clientId, $username, $password)
    {
        $result = $this->testConnection();

        if (!$result['success']) {
            return $result;
        }

        return [
            'success' => true,
            'note' => '请在客户端测试认证'
        ];
    }
}

问题 2:消息丢失

原因:QoS 级别设置不当

解决方案

php
<?php

/**
 * Publishes through an adapter with a configurable QoS level and an optional
 * retry loop with linearly growing back-off.
 */
class ReliableMqttPublisher
{
    private $adapter;
    private $qos = 1;

    /**
     * @param object|null $adapter anything exposing publish($topic, $message, $qos);
     *                             a new MqttAdapter is created when omitted. Without this
     *                             the adapter property stayed null and every publish failed.
     */
    public function __construct($adapter = null)
    {
        $this->adapter = $adapter ?? new MqttAdapter();
    }

    /**
     * Sets the QoS level, clamped to the range 0..2.
     */
    public function setQos($qos)
    {
        $this->qos = min(2, max(0, $qos));
    }

    /**
     * Publishes once with the configured QoS level.
     */
    public function publish($topic, $message)
    {
        $this->adapter->publish($topic, $message, $this->qos);
    }

    /**
     * Publishes, retrying on exceptions up to $maxRetries attempts in total.
     * The pause grows by 100 ms per failed attempt (100 ms, 200 ms, ...).
     *
     * @return bool true on success; false only when $maxRetries is below 1,
     *              in which case no attempt is made
     * @throws Exception the last failure, once all attempts are used up
     */
    public function publishWithRetry($topic, $message, $maxRetries = 3)
    {
        for ($i = 0; $i < $maxRetries; $i++) {
            try {
                $this->publish($topic, $message);
                return true;
            } catch (Exception $e) {
                if ($i === $maxRetries - 1) {
                    throw $e;
                }
                usleep(100000 * ($i + 1));
            }
        }

        return false;
    }
}

问题 3:主题映射错误

解决方案

php
<?php

/**
 * Maps MQTT topics to AMQP routing keys, with optional explicit overrides
 * keyed by MQTT topic filter.
 */
class TopicMapper
{
    private $mappings = [];

    /**
     * Registers an override: topics matching the filter map to the given routing key.
     *
     * @param string $mqttTopic      MQTT topic or filter ("+" and "#" wildcards allowed)
     * @param string $amqpRoutingKey routing key to return for matching topics
     */
    public function addMapping($mqttTopic, $amqpRoutingKey)
    {
        $this->mappings[$mqttTopic] = $amqpRoutingKey;
    }

    /**
     * Returns the routing key for a topic: the first matching override in
     * registration order, otherwise the default translation ("/" to ".",
     * "+" to "*").
     *
     * @param string $mqttTopic
     * @return string
     */
    public function toAmqpRoutingKey($mqttTopic)
    {
        foreach ($this->mappings as $pattern => $routingKey) {
            // Numeric-looking filters come back from the array as int keys.
            if ($this->matchTopic($mqttTopic, (string) $pattern)) {
                return $routingKey;
            }
        }

        return strtr($mqttTopic, ['/' => '.', '+' => '*']);
    }

    /**
     * Matches a topic against an MQTT filter level by level.
     *
     * Levels are compared directly instead of through a generated regex: the
     * filter contains "/" (the regex delimiter) and may contain other regex
     * metacharacters, which made a pattern built from it fail to compile.
     * "+" matches exactly one level; "#" is honoured only as the last level
     * and also matches the parent itself ("a/#" matches "a").
     */
    private function matchTopic($topic, $pattern)
    {
        $topicLevels = explode('/', $topic);
        $patternLevels = explode('/', $pattern);
        $lastIndex = count($patternLevels) - 1;

        foreach ($patternLevels as $index => $level) {
            if ($level === '#' && $index === $lastIndex) {
                return true;
            }

            if (!array_key_exists($index, $topicLevels)) {
                return false;
            }

            if ($level !== '+' && $level !== $topicLevels[$index]) {
                return false;
            }
        }

        return count($topicLevels) === count($patternLevels);
    }
}

最佳实践建议

1. QoS 级别选择

php
<?php

/**
 * Picks an MQTT QoS level for a named use case.
 *
 * NOTE(review): RabbitMQ's MQTT plugin reportedly downgrades QoS 2 to QoS 1,
 * so "critical" may not get exactly-once delivery there — confirm against the
 * broker version in use.
 */
class QosSelector
{
    const QOS_AT_MOST_ONCE = 0;
    const QOS_AT_LEAST_ONCE = 1;
    const QOS_EXACTLY_ONCE = 2;

    /**
     * Returns the QoS level for the use case, or QOS_AT_MOST_ONCE when the
     * use case is not one of the known names.
     */
    public static function select($useCase)
    {
        $levelByUseCase = [
            'telemetry' => self::QOS_AT_MOST_ONCE,
            'commands' => self::QOS_AT_LEAST_ONCE,
            'alerts' => self::QOS_AT_LEAST_ONCE,
            'critical' => self::QOS_EXACTLY_ONCE
        ];

        if (isset($levelByUseCase[$useCase])) {
            return $levelByUseCase[$useCase];
        }

        return self::QOS_AT_MOST_ONCE;
    }
}

2. 主题设计规范

php
<?php

/**
 * Enforces the topic naming convention: "/"-separated levels, each starting
 * with a lowercase letter followed by lowercase letters, digits or underscores.
 */
class TopicConvention
{
    // The D modifier makes "$" match only at the very end of the subject;
    // without it a topic with a trailing newline ("a/b\n") passed validation.
    const PATTERN = '/^[a-z][a-z0-9_]*(\/[a-z][a-z0-9_]*)*$/D';

    /**
     * Validates a topic against the convention.
     *
     * @param string $topic
     * @return bool always true when the topic is valid
     * @throws InvalidArgumentException when the topic violates the convention
     */
    public static function validate($topic)
    {
        if (!preg_match(self::PATTERN, $topic)) {
            throw new InvalidArgumentException(
                "主题格式无效: {$topic}。应使用小写字母、数字和下划线"
            );
        }

        return true;
    }

    /**
     * Joins the parts with "/" and validates the result.
     *
     * @param array $parts topic levels in order
     * @return string the validated topic
     * @throws InvalidArgumentException when the joined topic is invalid (including an empty list)
     */
    public static function build($parts)
    {
        $topic = implode('/', $parts);

        self::validate($topic);

        return $topic;
    }
}

3. 消息压缩

php
<?php

/**
 * Compresses message payloads with raw DEFLATE and restores them again.
 */
class MqttMessageCompressor
{
    /**
     * Compresses a payload. Strings are compressed as-is (so pre-encoded JSON
     * stays untouched); every other value is JSON-encoded first.
     *
     * @param mixed $data
     * @return string raw DEFLATE data (compression level 6)
     * @throws InvalidArgumentException when the value cannot be JSON-encoded
     */
    public function compress($data)
    {
        if (!is_string($data)) {
            $data = json_encode($data);

            if ($data === false) {
                throw new InvalidArgumentException('消息无法编码为 JSON: ' . json_last_error_msg());
            }
        }

        return gzdeflate($data, 6);
    }

    /**
     * Restores a payload produced by compress().
     *
     * @param string $data raw DEFLATE data
     * @return mixed the JSON-decoded value (objects as associative arrays), or the
     *               inflated string itself when it is not valid JSON, so plain
     *               strings survive a round trip
     * @throws RuntimeException when the data cannot be inflated
     */
    public function decompress($data)
    {
        // gzinflate() raises a warning on corrupt input; the failure is turned into
        // an exception below, so the warning is silenced for this call only.
        set_error_handler(static function () {
            return true;
        });

        try {
            $inflated = gzinflate($data);
        } finally {
            restore_error_handler();
        }

        if ($inflated === false) {
            throw new RuntimeException('消息解压失败: 数据不是有效的 DEFLATE 流');
        }

        $decoded = json_decode($inflated, true);

        // A null result is only an error when the decoder says so; the JSON literal
        // "null" legitimately decodes to null.
        if ($decoded === null && json_last_error() !== JSON_ERROR_NONE) {
            return $inflated;
        }

        return $decoded;
    }
}

相关链接