
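IoT Applications

Overview

Internet of Things (IoT) workloads involve very large numbers of device connections and continuous data collection. As a message broker, RabbitMQ can provide reliable delivery of device data, real-time processing, and flexible routing, which makes it a core building block of an IoT platform.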

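Business Background and Requirements

Scenario

Device onboarding for a smart-home platform:

| Device type | Count | Data frequency | Characteristics |
| --- | --- | --- | --- |
| Smart switch | 100,000 | Event-triggered | Low frequency, strict real-time requirements |
| Temperature/humidity sensor | 50,000 | 1 report per minute | Medium frequency, large data volume |
| Smart camera | 10,000 | Real-time stream | High bandwidth, low latency |
| Smart lock | 30,000 | Event-triggered | High security requirements |
| Smart appliance | 20,000 | Varies | Diverse protocols |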
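
To get a feel for the steady-state load implied by the table, the sketch below computes the publish rate for the one device class that reports on a fixed interval (the temperature/humidity sensors). The helper name `messagesPerSecond` is made up for illustration, and event-triggered device classes are left out because the table gives no rate for them.

```php
<?php

// Steady-state publish rate for devices that report on a fixed interval.
function messagesPerSecond(int $deviceCount, int $reportIntervalSeconds): float
{
    return $deviceCount / $reportIntervalSeconds;
}

// 50,000 temperature/humidity sensors, one report per minute (from the table).
$sensorRate = messagesPerSecond(50000, 60);
printf("Sensor baseline: %.1f msg/s\n", $sensorRate); // Sensor baseline: 833.3 msg/s
```

This is only the periodic baseline; bursts from event-triggered devices come on top of it.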

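Technical Challenges

Challenges for IoT systems:
1. Massive connection counts: millions of devices online at the same time
2. High data volume: on the order of a million messages per second
3. Unstable networks: devices disconnect and reconnect
4. Diverse protocols: MQTT, HTTP, TCP, and others
5. Real-time processing: alerts need an immediate response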

Architecture Design

Overall Architecture

mermaid
graph TB
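    subgraph "Device layer"
        A[Smart switch]
        B[Sensor]
        C[Camera]
        D[Smart lock]
        E[Smart appliance]
    end
    
    subgraph "Access layer"
        F[MQTT Broker]
        G[HTTP Gateway]
        H[TCP Gateway]
    end
    
    subgraph "RabbitMQ"
        I[IoT event bus<br/>iot.exchange]
        
        subgraph "Data queues"
            J[Device data queue]
            K[Alert queue]
            L[Command queue]
        end
        
        M[Dead-letter queue]
    end
    
    subgraph "Processing layer"
        N[Data processor]
        O[Rule engine]
        P[Alert service]
    end
    
    subgraph "Storage layer"
        Q[Time-series database]
        R[Object storage]
        S[Redis]
    end
    
    subgraph "Application layer"
        T[Monitoring dashboard]
        U[Mobile app]
        V[Admin console]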
    end
    
    A --> F
    B --> F
    C --> H
    D --> F
    E --> G
    
    F --> I
    G --> I
    H --> I
    
    I --> J
    I --> K
    I --> L
    
    J --> N
    K --> P
    L --> O
    
    N --> Q
    N --> S
    P --> S
    O --> S
    
    Q --> T
    S --> U
    S --> V

Data Flow

mermaid
sequenceDiagram
    participant Device as IoT device
    participant Gateway as Access gateway
    participant MQ as RabbitMQ
    participant Processor as Data processor
    participant Storage as Storage
    participant App as Application
    
    Device->>Gateway: Report data (MQTT/HTTP)
    Gateway->>Gateway: Protocol conversion
    Gateway->>MQ: Publish device message
    
    MQ->>Processor: Deliver message
    Processor->>Processor: Parse data
    Processor->>Processor: Match rules
    
    alt Alert rule matched
        Processor->>App: Push alert
    end
    
    Processor->>Storage: Store data
    Processor-->>MQ: ACK
    
    App->>MQ: Send control command
    MQ->>Gateway: Deliver command
    Gateway->>Device: Dispatch command
    Device-->>Gateway: Execution result
    Gateway->>MQ: Report result

PHP Implementation

Device Message Structure

php
<?php

namespace App\IoT;

class DeviceMessage
{
    public string $messageId;
    public string $deviceId;
    public string $deviceType;
    public string $messageType;
    public int $timestamp;
    public array $payload;
    public array $metadata;

    public function __construct(
        string $deviceId,
        string $deviceType,
        string $messageType,
        array $payload
    ) {
        $this->messageId = $this->generateMessageId();
        $this->deviceId = $deviceId;
        $this->deviceType = $deviceType;
        $this->messageType = $messageType;
        $this->timestamp = time();
        $this->payload = $payload;
        $this->metadata = [];
    }

    private function generateMessageId(): string
    {
        return sprintf('iot_%s_%s', date('YmdHis'), bin2hex(random_bytes(8)));
    }

    public function toArray(): array
    {
        return [
            'message_id' => $this->messageId,
            'device_id' => $this->deviceId,
            'device_type' => $this->deviceType,
            'message_type' => $this->messageType,
            'timestamp' => $this->timestamp,
            'payload' => $this->payload,
            'metadata' => $this->metadata,
        ];
    }

    public static function fromArray(array $data): self
    {
        $message = new self(
            $data['device_id'],
            $data['device_type'],
            $data['message_type'],
            $data['payload']
        );
        $message->messageId = $data['message_id'];
        $message->timestamp = $data['timestamp'];
        $message->metadata = $data['metadata'] ?? [];
        return $message;
    }
}

Device Access Gateway

php
<?php

namespace App\IoT;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;

class DeviceGateway
{
    private AMQPStreamConnection $connection;
    private $channel;
    private string $exchangeName = 'iot.exchange';
    private array $deviceRegistry;

    public function __construct(
        AMQPStreamConnection $connection,
        array $deviceRegistry = []
    ) {
        $this->connection = $connection;
        $this->channel = $connection->channel();
        $this->deviceRegistry = $deviceRegistry;
        $this->setupInfrastructure();
    }

    private function setupInfrastructure(): void
    {
        $this->channel->exchange_declare(
            $this->exchangeName,
            AMQPExchangeType::TOPIC,
            false,
            true,
            false
        );

        $queues = [
            'iot.device.data' => ['device.*.data', 'sensor.*.data'],
            'iot.device.event' => ['device.*.event', 'device.*.status'],
            'iot.device.command' => ['device.*.command'],
            'iot.alert' => ['alert.*'],
        ];

        foreach ($queues as $queueName => $routingKeys) {
            $args = [
                'x-message-ttl' => ['I', 604800000],
                'x-max-length' => ['I', 10000000],
            ];

            $this->channel->queue_declare($queueName, false, true, false, false, false, $args);

            foreach ($routingKeys as $routingKey) {
                $this->channel->queue_bind($queueName, $this->exchangeName, $routingKey);
            }
        }
    }

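    public function handleDeviceMessage(string $rawMessage): array
    {
        $data = $this->parseMessage($rawMessage);

        // Reject input that could not be parsed or that lacks a required field.
        foreach (['device_id', 'device_type', 'message_type', 'payload'] as $field) {
            if (!isset($data[$field])) {
                return ['success' => false, 'error' => "Missing field: {$field}"];
            }
        }
        if (!is_string($data['device_id']) || !is_array($data['payload'])) {
            return ['success' => false, 'error' => 'Malformed message'];
        }

        if (!$this->validateDevice($data['device_id'])) {
            return ['success' => false, 'error' => 'Invalid device'];
        }

        $message = new DeviceMessage(
            $data['device_id'],
            $data['device_type'],
            $data['message_type'],
            $data['payload']
        );

        $this->publishMessage($message);

        return ['success' => true, 'message_id' => $message->messageId];
    }

    private function parseMessage(string $rawMessage): array
    {
        $data = json_decode($rawMessage, true);

        // Anything that does not decode to a JSON object or array is treated
        // as a binary frame.
        if (!is_array($data)) {
            $data = $this->parseBinaryMessage($rawMessage);
        }

        return $data;
    }

    private function parseBinaryMessage(string $rawMessage): array
    {
        // Frame layout used here: 16-byte device ID, then one integer byte and
        // one hundredths byte each for temperature and humidity (20 bytes).
        if (strlen($rawMessage) < 20) {
            return [];
        }

        return [
            'device_id' => substr($rawMessage, 0, 16),
            'device_type' => 'sensor',
            'message_type' => 'data',
            'payload' => [
                'temperature' => ord($rawMessage[16]) + ord($rawMessage[17]) / 100,
                'humidity' => ord($rawMessage[18]) + ord($rawMessage[19]) / 100,
            ],
        ];
    }

    private function validateDevice(string $deviceId): bool
    {
        return isset($this->deviceRegistry[$deviceId]) || $this->checkDeviceInDb($deviceId);
    }

    private function checkDeviceInDb(string $deviceId): bool
    {
        // Placeholder: accepts every device. Replace it with a real lookup,
        // otherwise validateDevice() never rejects anything.
        return true;
    }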

    private function publishMessage(DeviceMessage $message): void
    {
        $routingKey = sprintf(
            '%s.%s.%s',
            $message->deviceType,
            $message->deviceId,
            $message->messageType
        );

        $amqpMessage = new AMQPMessage(
            json_encode($message->toArray()),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'message_id' => $message->messageId,
                'timestamp' => time(),
                'headers' => [
                    'device_id' => $message->deviceId,
                    'device_type' => $message->deviceType,
                ],
            ]
        );

        $this->channel->basic_publish($amqpMessage, $this->exchangeName, $routingKey);
    }

    public function sendCommand(string $deviceId, string $command, array $params = []): string
    {
        $commandId = sprintf('cmd_%s_%s', date('YmdHis'), bin2hex(random_bytes(4)));

        $message = [
            'command_id' => $commandId,
            'device_id' => $deviceId,
            'command' => $command,
            'params' => $params,
            'timestamp' => time(),
        ];

        $routingKey = "device.{$deviceId}.command";

        $amqpMessage = new AMQPMessage(
            json_encode($message),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'message_id' => $commandId,
                'reply_to' => 'iot.command.response',
            ]
        );

        $this->channel->basic_publish($amqpMessage, $this->exchangeName, $routingKey);

        return $commandId;
    }
}
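
The gateway builds routing keys as `{deviceType}.{deviceId}.{messageType}`, while the bindings only cover keys that start with `device.`, `sensor.`, or `alert.`. The sketch below is a simplified local model of topic matching (not RabbitMQ code) that can be used to check which queues a given key would reach. The function names `topicMatches` and `queuesFor` are made up for illustration; the binding table is copied from `setupInfrastructure()`.

```php
<?php

// Simplified model of AMQP topic matching on dot-separated words:
// "*" matches exactly one word, "#" matches zero or more words.
function topicMatches(array $pattern, array $key): bool
{
    if ($pattern === []) {
        return $key === [];
    }
    $head = array_shift($pattern);
    if ($head === '#') {
        // Try letting "#" absorb 0, 1, 2, ... words of the key.
        for ($skip = 0; $skip <= count($key); $skip++) {
            if (topicMatches($pattern, array_slice($key, $skip))) {
                return true;
            }
        }
        return false;
    }
    if ($key === []) {
        return false;
    }
    $word = array_shift($key);
    return ($head === '*' || $head === $word) && topicMatches($pattern, $key);
}

// Returns the queues whose bindings match the routing key.
function queuesFor(string $routingKey): array
{
    // Bindings copied from setupInfrastructure().
    $bindings = [
        'iot.device.data' => ['device.*.data', 'sensor.*.data'],
        'iot.device.event' => ['device.*.event', 'device.*.status'],
        'iot.device.command' => ['device.*.command'],
        'iot.alert' => ['alert.*'],
    ];
    $matched = [];
    foreach ($bindings as $queue => $patterns) {
        foreach ($patterns as $pattern) {
            if (topicMatches(explode('.', $pattern), explode('.', $routingKey))) {
                $matched[] = $queue;
                break;
            }
        }
    }
    return $matched;
}

echo json_encode(queuesFor('sensor.DEV001.data')), "\n"; // ["iot.device.data"]
echo json_encode(queuesFor('switch.SW001.event')), "\n"; // []
```

The second call shows that a message from a device whose type is `switch` matches no binding, so it would be unroutable. Either the bindings or the routing-key prefix would need to be adjusted for device types other than `device` and `sensor`.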

Data Processing Consumer

php
<?php

namespace App\IoT;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class DataProcessor
{
    private AMQPStreamConnection $connection;
    private $channel;
    private TimeSeriesStorage $storage;
    private RuleEngine $ruleEngine;
    private AlertService $alertService;
    private bool $running = true;

    public function __construct(
        AMQPStreamConnection $connection,
        TimeSeriesStorage $storage,
        RuleEngine $ruleEngine,
        AlertService $alertService
    ) {
        $this->connection = $connection;
        $this->channel = $connection->channel();
        $this->storage = $storage;
        $this->ruleEngine = $ruleEngine;
        $this->alertService = $alertService;
    }

    public function consume(): void
    {
        $this->channel->basic_qos(null, 100, null);
        $this->channel->basic_consume('iot.device.data', '', false, false, false, false, [$this, 'handleMessage']);

        while ($this->running && count($this->channel->callbacks)) {
            $this->channel->wait(null, true);
            if (!$this->running) break;
            usleep(10000);
        }
    }

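    public function handleMessage(AMQPMessage $message): void
    {
        try {
            // Decode inside the try block so malformed JSON is rejected
            // instead of crashing the consumer.
            $data = json_decode($message->body, true, 512, JSON_THROW_ON_ERROR);
            $deviceMessage = DeviceMessage::fromArray($data);

            $this->processData($deviceMessage);

            $this->checkRules($deviceMessage);

            $message->ack();

        } catch (\Throwable $e) {
            // \Throwable also covers the TypeError raised for malformed payloads.
            error_log("Data processor error: " . $e->getMessage());
            // Reject without requeue. iot.device.data is declared without a
            // dead-letter exchange, so a rejected message is discarded.
            $message->nack(false, false);
        }
    }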

    private function processData(DeviceMessage $message): void
    {
        $this->storage->write([
            'measurement' => $message->deviceType,
            'tags' => [
                'device_id' => $message->deviceId,
                'device_type' => $message->deviceType,
            ],
            'fields' => $message->payload,
            'timestamp' => $message->timestamp * 1000000000,
        ]);
    }

    private function checkRules(DeviceMessage $message): void
    {
        $alerts = $this->ruleEngine->evaluate($message);

        foreach ($alerts as $alert) {
            $this->alertService->sendAlert([
                'device_id' => $message->deviceId,
                'alert_type' => $alert['type'],
                'severity' => $alert['severity'],
                'message' => $alert['message'],
                'data' => $message->payload,
                'timestamp' => time(),
            ]);
        }
    }

    public function stop(): void
    {
        $this->running = false;
    }
}
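
The architecture diagram includes a dead-letter queue, but the queue arguments in `setupInfrastructure()` only set a TTL and a maximum length, so messages rejected with `nack(false, false)` are dropped. A minimal sketch of the extra arguments follows, using the same `[type, value]` array form as the article's own declaration. The helper name `buildQueueArguments` and the names `iot.dlx` and `dead.device.data` are assumptions for illustration, not part of the original code.

```php
<?php

// Hypothetical helper: queue arguments in the [type, value] form used by
// setupInfrastructure(), extended with dead-letter routing.
// 'I' marks a 32-bit integer and 'S' a long string in php-amqplib's array format.
function buildQueueArguments(string $deadLetterExchange, string $deadLetterRoutingKey): array
{
    return [
        'x-message-ttl' => ['I', 604800000],            // 7 days in milliseconds
        'x-max-length' => ['I', 10000000],
        'x-dead-letter-exchange' => ['S', $deadLetterExchange],
        'x-dead-letter-routing-key' => ['S', $deadLetterRoutingKey],
    ];
}

// The exchange and routing-key names are placeholders.
$args = buildQueueArguments('iot.dlx', 'dead.device.data');
echo json_encode($args, JSON_PRETTY_PRINT), "\n";
```

The resulting array would be passed as the last argument of `queue_declare()`, and the dead-letter exchange and its queue have to be declared and bound separately. RabbitMQ refuses to redeclare an existing queue with different arguments, so a queue created without these settings has to be deleted first, or the settings applied through a policy instead.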

Rule Engine

php
<?php

namespace App\IoT;

class RuleEngine
{
    private array $rules;
    private $redis;

    public function __construct($redis, array $rules = [])
    {
        $this->redis = $redis;
        $this->rules = $rules;
    }

    public function evaluate(DeviceMessage $message): array
    {
        $alerts = [];

        foreach ($this->rules as $rule) {
            if ($this->matchRule($message, $rule)) {
                if ($this->checkCondition($message, $rule)) {
                    $alerts[] = [
                        'type' => $rule['name'],
                        'severity' => $rule['severity'],
                        'message' => $this->formatMessage($message, $rule),
                    ];
                }
            }
        }

        return $alerts;
    }

    private function matchRule(DeviceMessage $message, array $rule): bool
    {
        if (isset($rule['device_type']) && $message->deviceType !== $rule['device_type']) {
            return false;
        }

        if (isset($rule['device_ids']) && !in_array($message->deviceId, $rule['device_ids'])) {
            return false;
        }

        return true;
    }

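    private function checkCondition(DeviceMessage $message, array $rule): bool
    {
        // A rule without conditions must not fire on every message.
        if (empty($rule['conditions'])) {
            return false;
        }

        foreach ($rule['conditions'] as $condition) {
            $field = $condition['field'];
            $operator = $condition['operator'];
            $value = $condition['value'];

            // A missing field cannot satisfy the condition. Skipping it would
            // raise alerts for messages that never reported the field.
            if (!isset($message->payload[$field])) {
                return false;
            }

            $actualValue = $message->payload[$field];

            if (!$this->compare($actualValue, $operator, $value)) {
                return false;
            }
        }

        return true;
    }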

    private function compare($actual, string $operator, $expected): bool
    {
        switch ($operator) {
            case 'gt':
                return $actual > $expected;
            case 'gte':
                return $actual >= $expected;
            case 'lt':
                return $actual < $expected;
            case 'lte':
                return $actual <= $expected;
            case 'eq':
                return $actual == $expected;
            case 'neq':
                return $actual != $expected;
            default:
                return false;
        }
    }

    private function formatMessage(DeviceMessage $message, array $rule): string
    {
        return str_replace(
            ['{device_id}', '{device_type}'],
            [$message->deviceId, $message->deviceType],
            $rule['message_template']
        );
    }

    public function addRule(array $rule): void
    {
        $this->rules[] = $rule;
    }
}

Time-Series Storage

php
<?php

namespace App\IoT;

class TimeSeriesStorage
{
    private $influxClient;

    public function __construct($influxClient)
    {
        $this->influxClient = $influxClient;
    }

    public function write(array $data): bool
    {
        $point = \InfluxDB2\Point::measurement($data['measurement']);

        foreach ($data['tags'] as $key => $value) {
            $point->addTag($key, $value);
        }

        foreach ($data['fields'] as $key => $value) {
            if (is_float($value) || is_int($value)) {
                $point->addField($key, $value);
            } else {
                $point->addField($key, (string) $value);
            }
        }

        if (isset($data['timestamp'])) {
            $point->time($data['timestamp']);
        }

        // influxdb-client-php exposes createWriteApi(), not getWriteApi().
        $this->influxClient->createWriteApi()->write($point);

        return true;
    }

    public function query(string $measurement, array $tags, string $timeRange): array
    {
        $query = sprintf(
            'from(bucket: "iot") |> range(start: %s) |> filter(fn: (r) => r._measurement == "%s")',
            $timeRange,
            $measurement
        );

        foreach ($tags as $key => $value) {
            $query .= sprintf(' |> filter(fn: (r) => r.%s == "%s")', $key, $value);
        }

        // influxdb-client-php exposes createQueryApi(), not getQueryApi().
        return $this->influxClient->createQueryApi()->query($query);
    }

    public function getLatest(string $deviceId, string $measurement): ?array
    {
        $query = sprintf(
            'from(bucket: "iot") |> range(start: -1h) |> filter(fn: (r) => r._measurement == "%s" and r.device_id == "%s") |> last()',
            $measurement,
            $deviceId
        );

        $result = $this->influxClient->createQueryApi()->query($query);

        if (empty($result)) {
            return null;
        }

        return $this->parseResult($result);
    }

    private function parseResult(array $result): array
    {
        $data = [];
        foreach ($result as $table) {
            foreach ($table->records as $record) {
                $field = $record->getField();
                $value = $record->getValue();
                $data[$field] = $value;
            }
        }
        return $data;
    }
}
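
The query methods above interpolate measurement names, tag keys, and tag values straight into the Flux text, so a value containing a double quote would break the query or change its meaning. The sketch below shows one way to build the same query with basic escaping and validation. The names `fluxString` and `buildFluxQuery` are made up for illustration, the escaping handles only backslashes and double quotes, and the range check (relative durations only) is an assumption about how the method is used.

```php
<?php

// Quote a value as a Flux string literal (escapes backslash and double quote only).
function fluxString(string $value): string
{
    return '"' . strtr($value, ['\\' => '\\\\', '"' => '\\"']) . '"';
}

function buildFluxQuery(string $bucket, string $measurement, array $tags, string $range): string
{
    // Assumption: callers pass relative ranges such as -30m or -1h.
    if (!preg_match('/^-\d+(s|m|h|d)$/', $range)) {
        throw new InvalidArgumentException("Unsupported range: {$range}");
    }

    $query = sprintf(
        'from(bucket: %s) |> range(start: %s) |> filter(fn: (r) => r._measurement == %s)',
        fluxString($bucket),
        $range,
        fluxString($measurement)
    );

    foreach ($tags as $key => $value) {
        // Tag keys are emitted as identifiers, so only plain names are accepted.
        if (!preg_match('/^[A-Za-z_][A-Za-z0-9_]*$/', (string) $key)) {
            throw new InvalidArgumentException("Invalid tag key: {$key}");
        }
        $query .= sprintf(' |> filter(fn: (r) => r.%s == %s)', $key, fluxString((string) $value));
    }

    return $query;
}

echo buildFluxQuery('iot', 'sensor', ['device_id' => 'DEV001'], '-1h'), "\n";
```

For valid input this produces the same query text as the original `sprintf` calls; the difference only shows up when a value contains a quote or a tag key is not a plain identifier.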

Complete Usage Example

php
<?php

require_once 'vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use App\IoT\{DeviceGateway, DataProcessor, RuleEngine, AlertService, TimeSeriesStorage};

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

$rules = [
    [
        'name' => 'high_temperature',
        'device_type' => 'sensor',
        'conditions' => [
            ['field' => 'temperature', 'operator' => 'gt', 'value' => 35],
        ],
        'severity' => 'warning',
        'message_template' => 'Device {device_id}: temperature too high',
    ],
    [
        'name' => 'low_battery',
        'device_type' => 'sensor',
        'conditions' => [
            ['field' => 'battery', 'operator' => 'lt', 'value' => 20],
        ],
        'severity' => 'info',
        'message_template' => 'Device {device_id}: battery low',
    ],
];

$gateway = new DeviceGateway($connection);

echo "=== IoT example ===\n\n";

echo "1. Device reports data\n";
$rawMessage = json_encode([
    'device_id' => 'DEV001',
    'device_type' => 'sensor',
    'message_type' => 'data',
    'payload' => [
        'temperature' => 28.5,
        'humidity' => 65.0,
        'battery' => 85,
    ],
]);

$result = $gateway->handleDeviceMessage($rawMessage);
echo "Message handling result: " . json_encode($result) . "\n";

echo "\n2. Send a device command\n";
$commandId = $gateway->sendCommand('DEV001', 'set_interval', ['interval' => 60]);
echo "Command sent: {$commandId}\n";

echo "\n3. Starting the data processor...\n";

$pid = pcntl_fork();
if ($pid === -1) {
    exit("Failed to fork the consumer process\n");
}
if ($pid === 0) {
    // The child opens its own connections: a socket must not be shared
    // between the parent and the child after fork().
    $childConnection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

    $redis = new Redis();
    $redis->connect('localhost', 6379);

    // Placeholder InfluxDB settings; replace them with real values.
    $influxClient = new InfluxDB2\Client([
        'url' => 'http://localhost:8086',
        'token' => 'my-token',
        'bucket' => 'iot',
        'org' => 'my-org',
    ]);

    // AlertService is not shown in this article; it must provide sendAlert(array).
    $processor = new DataProcessor(
        $childConnection,
        new TimeSeriesStorage($influxClient),
        new RuleEngine($redis, $rules),
        new AlertService($redis)
    );
    $processor->consume();
    exit(0);
}

sleep(2);

// Exceeds the temperature threshold and falls below the battery threshold.
$highTempMessage = json_encode([
    'device_id' => 'DEV002',
    'device_type' => 'sensor',
    'message_type' => 'data',
    'payload' => [
        'temperature' => 38.5,
        'humidity' => 70.0,
        'battery' => 15,
    ],
]);

echo "\n4. Trigger alert rules\n";
$gateway->handleDeviceMessage($highTempMessage);

sleep(2);

posix_kill($pid, SIGTERM);
pcntl_wait($status);

$connection->close();

echo "\n=== Example finished ===\n";

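Key Technical Points

1. Device Authentication

php
private function validateDevice(string $deviceId, string $token): bool
{
    $cached = $this->redis->get("device:token:{$deviceId}");
    // hash_equals() compares in constant time; get() returns false on a cache miss.
    if (is_string($cached) && hash_equals($cached, $token)) {
        return true;
    }
    return $this->db->validateDeviceToken($deviceId, $token);
}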
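
The snippet above checks a token but does not say how tokens are produced. One possible scheme, shown purely as an assumption and not as the article's method, derives each token as an HMAC of the device ID under a server-side secret, so the server can verify a token without storing it. The function names and the secret value are placeholders.

```php
<?php

// Hypothetical scheme: token = HMAC-SHA256(deviceId, server secret).
function issueDeviceToken(string $deviceId, string $secret): string
{
    return hash_hmac('sha256', $deviceId, $secret);
}

function verifyDeviceToken(string $deviceId, string $token, string $secret): bool
{
    // hash_equals() avoids leaking how many leading characters matched.
    return hash_equals(issueDeviceToken($deviceId, $secret), $token);
}

$secret = 'demo-secret-change-me'; // placeholder, never hard-code a real secret
$token = issueDeviceToken('DEV001', $secret);

var_dump(verifyDeviceToken('DEV001', $token, $secret)); // bool(true)
var_dump(verifyDeviceToken('DEV002', $token, $secret)); // bool(false)
```

A scheme like this cannot revoke a single device without an additional deny list, which is one reason a database or cache lookup like the one above may still be needed.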

2. Data Compression

php
private function compressPayload(array $payload): string
{
    return gzencode(json_encode($payload), 6);
}
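
The receiving side needs the inverse operation. The sketch below pairs the compression helper with a hypothetical `decompressPayload` and prints the size before and after for a made-up batch of repetitive readings. It requires the zlib extension.

```php
<?php

function compressPayload(array $payload): string
{
    return gzencode(json_encode($payload, JSON_THROW_ON_ERROR), 6);
}

// Hypothetical counterpart for the consumer side.
function decompressPayload(string $compressed): array
{
    $json = @gzdecode($compressed);
    if ($json === false) {
        throw new RuntimeException('Payload is not valid gzip data');
    }
    return json_decode($json, true, 512, JSON_THROW_ON_ERROR);
}

// Made-up sample: 200 identical readings, which compress very well.
$payload = ['readings' => array_fill(0, 200, ['temperature' => 28.5, 'humidity' => 65.25])];

$compressed = compressPayload($payload);
$restored = decompressPayload($compressed);

printf("%d bytes JSON -> %d bytes gzip\n", strlen(json_encode($payload)), strlen($compressed));
```

Compression pays off for batched or repetitive payloads. For a single small reading, the gzip header and trailer can make the result larger than the input, so it is worth measuring before enabling it everywhere.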

3. Batch Writes

php
public function writeBatch(array $points): void
{
    // A single write() call accepts an array of points.
    $this->influxClient->createWriteApi()->write($points);
}
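
A batch write needs something that accumulates points first. The sketch below is a minimal buffer that hands a full batch to a callback; `PointBuffer` and its methods are hypothetical names, and the callback in the example only records the batches so the snippet can run without InfluxDB.

```php
<?php

// Collects points and passes them to a flush callback once the batch is full.
final class PointBuffer
{
    private array $points = [];
    private int $batchSize;
    /** @var callable */
    private $flushFn;

    public function __construct(int $batchSize, callable $flushFn)
    {
        $this->batchSize = $batchSize;
        $this->flushFn = $flushFn;
    }

    public function add($point): void
    {
        $this->points[] = $point;
        if (count($this->points) >= $this->batchSize) {
            $this->flush();
        }
    }

    public function flush(): void
    {
        if ($this->points === []) {
            return;
        }
        $batch = $this->points;
        $this->points = [];
        ($this->flushFn)($batch);
    }
}

$flushed = [];
$buffer = new PointBuffer(3, function (array $batch) use (&$flushed): void {
    $flushed[] = $batch; // a real callback would call writeBatch($batch)
});

foreach ([1, 2, 3, 4] as $point) {
    $buffer->add($point);
}
$buffer->flush(); // flush the remainder, for example on shutdown

echo count($flushed), " batches\n"; // 2 batches
```

When this is combined with manual acknowledgements, acknowledging messages only after their batch has been flushed keeps buffered points from being lost if the consumer crashes.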

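4. Alert Deduplication

php
private function shouldAlert(string $deviceId, string $alertType): bool
{
    $key = "alert:dedup:{$deviceId}:{$alertType}";
    // SET with NX and EX stores the marker only if it is absent, so only the
    // first alert in each window returns true. The 300-second window is an
    // assumed value.
    return (bool) $this->redis->set($key, 1, ['nx', 'ex' => 300]);
}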
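
Deduplication only works if the first alert also records a marker that expires after a suppression window. The sketch below models that behaviour in memory, with the current time passed in explicitly, so it can run without Redis. `AlertDeduplicator` and the 300-second window are assumptions for illustration.

```php
<?php

// In-memory stand-in for a "set if absent, with expiry" deduplication key.
final class AlertDeduplicator
{
    private array $expiresAt = [];
    private int $windowSeconds;

    public function __construct(int $windowSeconds)
    {
        $this->windowSeconds = $windowSeconds;
    }

    public function shouldAlert(string $deviceId, string $alertType, int $now): bool
    {
        $key = "alert:dedup:{$deviceId}:{$alertType}";
        if (isset($this->expiresAt[$key]) && $this->expiresAt[$key] > $now) {
            return false; // still inside the suppression window
        }
        // Record the marker so repeats within the window are suppressed.
        $this->expiresAt[$key] = $now + $this->windowSeconds;
        return true;
    }
}

$dedup = new AlertDeduplicator(300); // 5-minute window, an assumed value

var_dump($dedup->shouldAlert('DEV002', 'high_temperature', 1000)); // bool(true)
var_dump($dedup->shouldAlert('DEV002', 'high_temperature', 1100)); // bool(false)
var_dump($dedup->shouldAlert('DEV002', 'high_temperature', 1300)); // bool(true)
```

With several consumers, the check and the write must be a single atomic step on shared storage; a per-process array like this one would let each consumer send its own copy of the alert.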

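Performance Optimization Recommendations

| Optimization | Recommendation | Benefit |
| --- | --- | --- |
| Batch processing | Merge messages and process them in batches | Higher throughput |
| Data sharding | Shard by device ID | More parallelism |
| Edge computing | Pre-process data on edge nodes | Less load on the cloud |
| Hot/cold separation | Hot data in Redis, cold data in the time-series database | Lower storage cost |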
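
Sharding by device ID can be as simple as hashing the ID onto a fixed number of queues, so that all messages from one device stay in order on one queue while different devices spread across consumers. The helper name `shardQueueFor`, the queue-name prefix, and the shard count below are assumptions for illustration.

```php
<?php

// Map a device ID to one of $shardCount queues; the same ID always maps to
// the same shard. Assumes a 64-bit PHP build, where crc32() is non-negative.
function shardQueueFor(string $deviceId, int $shardCount, string $prefix = 'iot.device.data'): string
{
    if ($shardCount < 1) {
        throw new InvalidArgumentException('shardCount must be at least 1');
    }
    $shard = crc32($deviceId) % $shardCount;
    return sprintf('%s.%d', $prefix, $shard);
}

foreach (['DEV001', 'DEV002', 'DEV003'] as $deviceId) {
    printf("%s -> %s\n", $deviceId, shardQueueFor($deviceId, 8));
}
```

Changing the shard count remaps most devices, so the count is best chosen with headroom up front. RabbitMQ's consistent-hash exchange plugin is an alternative that does the distribution on the broker side.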

Common Problems and Solutions

1. Device Disconnects

Solution: heartbeat detection plus an offline message cache
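
Heartbeat detection comes down to remembering when each device was last heard from and comparing that against a timeout. A minimal sketch follows, with sample timestamps and a hypothetical function name; in a real deployment the last-seen map would typically live in shared storage such as Redis.

```php
<?php

// Return the IDs of devices whose last heartbeat is older than the timeout.
function findOfflineDevices(array $lastSeen, int $now, int $timeoutSeconds): array
{
    $offline = [];
    foreach ($lastSeen as $deviceId => $timestamp) {
        if ($now - $timestamp > $timeoutSeconds) {
            $offline[] = $deviceId;
        }
    }
    return $offline;
}

// Sample data: DEV001 was last seen 100 s ago, DEV002 10 s ago.
$lastSeen = ['DEV001' => 1000, 'DEV002' => 1090];

echo json_encode(findOfflineDevices($lastSeen, 1100, 60)), "\n"; // ["DEV001"]
```

The timeout is usually set to a small multiple of the heartbeat interval so that one lost heartbeat does not mark a device as offline.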

2. Message Backlog

Solution: add consumers and apply backpressure
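
Whether adding consumers helps can be estimated with simple arithmetic: the backlog only shrinks when total consumption exceeds the inbound rate. The sketch below uses made-up numbers and a hypothetical function name.

```php
<?php

// Seconds needed to drain a backlog, or null if consumers cannot keep up.
function drainTimeSeconds(int $backlog, float $publishRate, float $perConsumerRate, int $consumers): ?float
{
    $netRate = $consumers * $perConsumerRate - $publishRate;
    if ($netRate <= 0) {
        return null; // the backlog keeps growing
    }
    return $backlog / $netRate;
}

// Made-up numbers: 600,000 queued messages, 800 msg/s inbound,
// 500 msg/s handled per consumer.
var_dump(drainTimeSeconds(600000, 800.0, 500.0, 1)); // NULL
var_dump(drainTimeSeconds(600000, 800.0, 500.0, 4)); // float(500)
```

On the consumer side, the prefetch limit set with `basic_qos` in `DataProcessor::consume()` already acts as backpressure, because the broker stops delivering once that many messages are unacknowledged.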

3. Data Loss

Solution: message persistence plus acknowledgements
