Appearance
物联网应用
概述
物联网(IoT)场景涉及海量设备连接和数据采集,RabbitMQ 作为消息中间件,可以实现设备数据的可靠传输、实时处理和灵活路由,是构建物联网平台的核心组件。
业务背景与需求
场景描述
某智能家居平台设备接入:
| 设备类型 | 数量 | 数据频率 | 特点 |
|---|---|---|---|
| 智能开关 | 10万 | 事件触发 | 低频、实时性要求高 |
| 温湿度传感器 | 5万 | 1次/分钟 | 中频、数据量大 |
| 智能摄像头 | 1万 | 实时流 | 高带宽、低延迟 |
| 智能门锁 | 3万 | 事件触发 | 安全性要求高 |
| 智能家电 | 2万 | 多种频率 | 协议多样 |
技术挑战
物联网系统挑战:
1. 海量连接:百万级设备同时在线
2. 数据量大:每秒百万级消息
3. 网络不稳定:设备断线重连
4. 协议多样:MQTT、HTTP、TCP等
5. 实时处理:告警需要即时响应
架构设计
整体架构图
mermaid
graph TB
subgraph "设备层"
A[智能开关]
B[传感器]
C[摄像头]
D[智能门锁]
E[智能家电]
end
subgraph "接入层"
F[MQTT Broker]
G[HTTP Gateway]
H[TCP Gateway]
end
subgraph "RabbitMQ"
I[IoT事件总线<br/>iot.exchange]
subgraph "数据队列"
J[设备数据队列]
K[告警队列]
L[指令队列]
end
M[死信队列]
end
subgraph "处理层"
N[数据处理器]
O[规则引擎]
P[告警服务]
end
subgraph "存储层"
Q[时序数据库]
R[对象存储]
S[Redis]
end
subgraph "应用层"
T[监控大屏]
U[移动APP]
V[管理后台]
end
A --> F
B --> F
C --> H
D --> F
E --> G
F --> I
G --> I
H --> I
I --> J
I --> K
I --> L
J --> N
K --> P
L --> O
N --> Q
N --> S
P --> S
O --> S
Q --> T
S --> U
S --> V
数据流转流程
mermaid
sequenceDiagram
participant Device as IoT设备
participant Gateway as 接入网关
participant MQ as RabbitMQ
participant Processor as 数据处理器
participant Storage as 存储
participant App as 应用
Device->>Gateway: 上报数据(MQTT/HTTP)
Gateway->>Gateway: 协议转换
Gateway->>MQ: 发布设备消息
MQ->>Processor: 投递消息
Processor->>Processor: 数据解析
Processor->>Processor: 规则匹配
alt 告警规则命中
Processor->>App: 推送告警
end
Processor->>Storage: 存储数据
Processor-->>MQ: ACK确认
App->>MQ: 发送控制指令
MQ->>Gateway: 投递指令
Gateway->>Device: 下发指令
Device-->>Gateway: 执行结果
Gateway->>MQ: 上报结果
PHP 代码实现
设备消息结构
php
<?php
namespace App\IoT;
class DeviceMessage
{
public string $messageId;
public string $deviceId;
public string $deviceType;
public string $messageType;
public int $timestamp;
public array $payload;
public array $metadata;
/**
 * Create a message for a single device report.
 *
 * A fresh message id and the current Unix time are assigned automatically,
 * and the metadata bag starts out empty.
 *
 * @param string $deviceId    Identifier of the reporting device.
 * @param string $deviceType  Device category, e.g. "sensor".
 * @param string $messageType Kind of message, e.g. "data".
 * @param array  $payload     Device-specific readings or event fields.
 */
public function __construct(
    string $deviceId,
    string $deviceType,
    string $messageType,
    array $payload
) {
    // Values generated locally.
    $this->messageId = $this->generateMessageId();
    $this->timestamp = time();
    $this->metadata = [];

    // Values supplied by the caller.
    $this->payload = $payload;
    $this->messageType = $messageType;
    $this->deviceType = $deviceType;
    $this->deviceId = $deviceId;
}
/**
 * Produce an id of the form "iot_<YmdHis>_<16 hex chars>".
 *
 * The hex suffix is the hex encoding of 8 bytes from random_bytes().
 *
 * @return string The newly generated message id.
 */
private function generateMessageId(): string
{
    $datePart = date('YmdHis');
    $randomPart = bin2hex(random_bytes(8));

    return 'iot_' . $datePart . '_' . $randomPart;
}
/**
 * Export the message as a plain associative array with snake_case keys.
 *
 * The result is the inverse of fromArray() and is what gets JSON-encoded
 * before publishing.
 *
 * @return array Keys: message_id, device_id, device_type, message_type,
 *               timestamp, payload, metadata (in that order).
 */
public function toArray(): array
{
    $fields = [];

    $fields['message_id'] = $this->messageId;
    $fields['device_id'] = $this->deviceId;
    $fields['device_type'] = $this->deviceType;
    $fields['message_type'] = $this->messageType;
    $fields['timestamp'] = $this->timestamp;
    $fields['payload'] = $this->payload;
    $fields['metadata'] = $this->metadata;

    return $fields;
}
/**
 * Rebuild a message from the array shape produced by toArray().
 *
 * device_id, device_type, message_type and payload are required. When
 * message_id or timestamp are absent, the values generated by the
 * constructor are kept; metadata defaults to an empty array.
 *
 * @param array $data Decoded message fields.
 *
 * @return self The reconstructed message.
 *
 * @throws \InvalidArgumentException When a required field is missing or
 *                                   payload is not an array.
 */
public static function fromArray(array $data): self
{
    // Fail with a catchable, descriptive exception instead of an
    // undefined-index warning followed by a TypeError.
    foreach (['device_id', 'device_type', 'message_type', 'payload'] as $field) {
        if (!isset($data[$field])) {
            throw new \InvalidArgumentException("Missing required field: {$field}");
        }
    }
    if (!is_array($data['payload'])) {
        throw new \InvalidArgumentException('Field "payload" must be an array');
    }

    $message = new self(
        $data['device_id'],
        $data['device_type'],
        $data['message_type'],
        $data['payload']
    );

    // Restore the original identity when the sender provided it.
    if (isset($data['message_id'])) {
        $message->messageId = $data['message_id'];
    }
    if (isset($data['timestamp'])) {
        $message->timestamp = $data['timestamp'];
    }
    $message->metadata = $data['metadata'] ?? [];

    return $message;
}
}
设备接入网关
php
<?php
namespace App\IoT;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;
class DeviceGateway
{
private AMQPStreamConnection $connection;
private $channel;
private string $exchangeName = 'iot.exchange';
private array $deviceRegistry;
/**
 * Wire the gateway to an AMQP connection and declare its topology.
 *
 * Opens a channel on the given connection and then calls
 * setupInfrastructure(), so constructing a gateway talks to the broker.
 *
 * @param AMQPStreamConnection $connection     Open broker connection.
 * @param array                $deviceRegistry Known devices, keyed by device id.
 */
public function __construct(
    AMQPStreamConnection $connection,
    array $deviceRegistry = []
) {
    $this->deviceRegistry = $deviceRegistry;
    $this->connection = $connection;

    // The channel must exist before the exchange and queues can be declared.
    $this->channel = $connection->channel();
    $this->setupInfrastructure();
}
/**
 * Declare the topic exchange, the four IoT queues and their bindings.
 *
 * Every queue is durable and shares the same limits: a message TTL of
 * 604800000 ms (7 days) and a maximum length of 10,000,000 messages.
 */
private function setupInfrastructure(): void
{
    // Arguments after the type: passive=false, durable=true, auto_delete=false.
    $this->channel->exchange_declare($this->exchangeName, AMQPExchangeType::TOPIC, false, true, false);

    // The limits are identical for every queue, so build them once.
    $queueArguments = [
        'x-message-ttl' => ['I', 604800000],
        'x-max-length' => ['I', 10000000],
    ];

    // Queue name => routing-key patterns bound to it.
    $bindingsByQueue = [
        'iot.device.data' => ['device.*.data', 'sensor.*.data'],
        'iot.device.event' => ['device.*.event', 'device.*.status'],
        'iot.device.command' => ['device.*.command'],
        'iot.alert' => ['alert.*'],
    ];

    foreach ($bindingsByQueue as $queue => $patterns) {
        // passive=false, durable=true, exclusive=false, auto_delete=false, nowait=false.
        $this->channel->queue_declare($queue, false, true, false, false, false, $queueArguments);

        foreach ($patterns as $pattern) {
            $this->channel->queue_bind($queue, $this->exchangeName, $pattern);
        }
    }
}
/**
 * Accept one raw device frame, validate it and publish it to the exchange.
 *
 * Malformed input never escapes as an exception or a TypeError: it is
 * reported through the returned array, exactly like an unknown device.
 *
 * @param string $rawMessage JSON document or binary frame sent by the device.
 *
 * @return array ['success' => true, 'message_id' => string] on success,
 *               ['success' => false, 'error' => string] otherwise.
 */
public function handleDeviceMessage(string $rawMessage): array
{
    try {
        $data = $this->parseMessage($rawMessage);
    } catch (\InvalidArgumentException | \TypeError $e) {
        // The parser rejected the frame or could not produce an array.
        return ['success' => false, 'error' => 'Malformed message'];
    }

    // Devices are untrusted: check every field before using it.
    foreach (['device_id', 'device_type', 'message_type'] as $field) {
        if (!isset($data[$field]) || !is_scalar($data[$field]) || (string) $data[$field] === '') {
            return ['success' => false, 'error' => "Missing or invalid field: {$field}"];
        }
    }
    if (!isset($data['payload']) || !is_array($data['payload'])) {
        return ['success' => false, 'error' => 'Missing or invalid field: payload'];
    }

    $deviceId = (string) $data['device_id'];
    if (!$this->validateDevice($deviceId)) {
        return ['success' => false, 'error' => 'Invalid device'];
    }

    $message = new DeviceMessage(
        $deviceId,
        (string) $data['device_type'],
        (string) $data['message_type'],
        $data['payload']
    );

    try {
        $this->publishMessage($message);
    } catch (\JsonException $e) {
        // E.g. a binary device id that is not valid UTF-8.
        return ['success' => false, 'error' => 'Message cannot be encoded'];
    }

    return ['success' => true, 'message_id' => $message->messageId];
}
/**
 * Decode a raw device frame into an associative array.
 *
 * The frame is first tried as JSON. Anything that does not decode to an
 * array — invalid JSON, but also valid scalars such as `null` or `123` —
 * is handed to the binary parser, so the declared array return type holds.
 *
 * @param string $rawMessage Frame as received from the device.
 *
 * @return array Decoded message fields.
 */
private function parseMessage(string $rawMessage): array
{
    $data = json_decode($rawMessage, true);

    // is_array() covers both decode errors (null) and scalar JSON values.
    if (!is_array($data)) {
        $data = $this->parseBinaryMessage($rawMessage);
    }

    return $data;
}
/**
 * Decode the fixed-layout binary sensor frame.
 *
 * Layout as read by this method: bytes 0-15 are the device id, byte 16/17
 * are the integer and hundredths part of the temperature, byte 18/19 the
 * integer and hundredths part of the humidity.
 *
 * @param string $rawMessage Binary frame, at least 20 bytes long.
 *
 * @return array Message fields with device_type "sensor" and message_type "data".
 *
 * @throws \InvalidArgumentException When the frame is shorter than 20 bytes.
 */
private function parseBinaryMessage(string $rawMessage): array
{
    // Reading past the end of the string would only raise a warning and
    // decode as 0, producing plausible-looking but wrong readings.
    if (strlen($rawMessage) < 20) {
        throw new \InvalidArgumentException('Binary frame too short: expected at least 20 bytes');
    }

    return [
        'device_id' => substr($rawMessage, 0, 16),
        'device_type' => 'sensor',
        'message_type' => 'data',
        'payload' => [
            'temperature' => ord($rawMessage[16]) + ord($rawMessage[17]) / 100,
            'humidity' => ord($rawMessage[18]) + ord($rawMessage[19]) / 100,
        ],
    ];
}
/**
 * Decide whether a device id is known to the platform.
 *
 * The in-memory registry is consulted first; only on a miss is
 * checkDeviceInDb() asked.
 *
 * @param string $deviceId Device id to check.
 *
 * @return bool True when the device is accepted.
 */
private function validateDevice(string $deviceId): bool
{
    if (isset($this->deviceRegistry[$deviceId])) {
        return true;
    }

    return $this->checkDeviceInDb($deviceId);
}
/**
 * Placeholder for a device lookup — presumably against a database,
 * judging by the name.
 *
 * NOTE(review): this stub always returns true, so validateDevice()
 * currently accepts every device id. Replace it with a real lookup
 * before relying on device validation.
 *
 * @param string $deviceId Device id to look up (currently unused).
 *
 * @return bool Always true.
 */
private function checkDeviceInDb(string $deviceId): bool
{
return true;
}
/**
 * Publish a device message to the IoT exchange as persistent JSON.
 *
 * The routing key is "<deviceType>.<deviceId>.<messageType>"; the device
 * id and type are also copied into the AMQP headers.
 *
 * @param DeviceMessage $message Message to publish.
 *
 * @throws \JsonException When the message cannot be JSON-encoded, e.g.
 *                        because a field contains invalid UTF-8.
 */
private function publishMessage(DeviceMessage $message): void
{
    $routingKey = sprintf(
        '%s.%s.%s',
        $message->deviceType,
        $message->deviceId,
        $message->messageType
    );

    // Without JSON_THROW_ON_ERROR a failed encode returns false, which
    // would be published as a broken body without any error being raised.
    $body = json_encode($message->toArray(), JSON_THROW_ON_ERROR);

    $amqpMessage = new AMQPMessage(
        $body,
        [
            'content_type' => 'application/json',
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            'message_id' => $message->messageId,
            'timestamp' => time(),
            'headers' => [
                'device_id' => $message->deviceId,
                'device_type' => $message->deviceType,
            ],
        ]
    );

    $this->channel->basic_publish($amqpMessage, $this->exchangeName, $routingKey);
}
/**
 * Publish a control command addressed to one device.
 *
 * The command is sent as persistent JSON with routing key
 * "device.<deviceId>.command" and reply_to "iot.command.response".
 *
 * @param string $deviceId Target device id.
 * @param string $command  Command name understood by the device.
 * @param array  $params   Optional command parameters.
 *
 * @return string The generated command id ("cmd_<YmdHis>_<8 hex chars>").
 *
 * @throws \JsonException When the command cannot be JSON-encoded.
 */
public function sendCommand(string $deviceId, string $command, array $params = []): string
{
    $commandId = sprintf('cmd_%s_%s', date('YmdHis'), bin2hex(random_bytes(4)));
    $now = time();

    $message = [
        'command_id' => $commandId,
        'device_id' => $deviceId,
        'command' => $command,
        'params' => $params,
        'timestamp' => $now,
    ];

    // Fail loudly instead of publishing a false body and still returning
    // a command id as if the command had been sent.
    $body = json_encode($message, JSON_THROW_ON_ERROR);

    $routingKey = "device.{$deviceId}.command";
    $amqpMessage = new AMQPMessage(
        $body,
        [
            'content_type' => 'application/json',
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            'message_id' => $commandId,
            // Same AMQP property that publishMessage() sets on device messages.
            'timestamp' => $now,
            'reply_to' => 'iot.command.response',
        ]
    );

    $this->channel->basic_publish($amqpMessage, $this->exchangeName, $routingKey);

    return $commandId;
}
}
数据处理消费者
php
<?php
namespace App\IoT;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class DataProcessor
{
private AMQPStreamConnection $connection;
private $channel;
private TimeSeriesStorage $storage;
private RuleEngine $ruleEngine;
private AlertService $alertService;
private bool $running = true;
/**
 * Assemble the processor from its collaborators and open an AMQP channel.
 *
 * @param AMQPStreamConnection $connection   Open broker connection.
 * @param TimeSeriesStorage    $storage      Sink for device readings.
 * @param RuleEngine           $ruleEngine   Evaluates alert rules per message.
 * @param AlertService         $alertService Receives the alerts the rules produce.
 */
public function __construct(
    AMQPStreamConnection $connection,
    TimeSeriesStorage $storage,
    RuleEngine $ruleEngine,
    AlertService $alertService
) {
    // Processing collaborators.
    $this->alertService = $alertService;
    $this->ruleEngine = $ruleEngine;
    $this->storage = $storage;

    // Broker side: keep the connection and open one channel on it.
    $this->connection = $connection;
    $this->channel = $connection->channel();
}
/**
 * Consume "iot.device.data" until stop() is called or the consumer is cancelled.
 *
 * Up to 100 unacknowledged messages are prefetched. The loop blocks on
 * the channel for at most one second at a time, so messages are handled
 * as soon as they arrive and the running flag is still re-checked
 * regularly while the queue is idle.
 */
public function consume(): void
{
    $this->channel->basic_qos(null, 100, null);
    $this->channel->basic_consume('iot.device.data', '', false, false, false, false, [$this, 'handleMessage']);

    while ($this->running && count($this->channel->callbacks)) {
        try {
            // Blocking wait with a timeout. The previous non-blocking wait
            // followed by a 10 ms sleep handled at most one frame per
            // iteration and spun continuously while idle.
            $this->channel->wait(null, false, 1);
        } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) {
            // Nothing arrived within the timeout; loop to re-check $running.
        }
    }
}
/**
 * Process one delivery: decode, store, evaluate rules, then acknowledge.
 *
 * Any failure is logged and the delivery is rejected without requeue, so
 * a single bad message can neither loop forever nor stop the consumer.
 *
 * @param AMQPMessage $message Delivery received from the data queue.
 */
public function handleMessage(AMQPMessage $message): void
{
    try {
        $data = json_decode($message->body, true);

        // json_decode() yields null or a scalar for bodies that are not
        // JSON objects; passing that on would raise a TypeError.
        if (!is_array($data)) {
            throw new \UnexpectedValueException('Message body is not a JSON object');
        }

        $deviceMessage = DeviceMessage::fromArray($data);
        $this->processData($deviceMessage);
        $this->checkRules($deviceMessage);
        $message->ack();
    } catch (\Throwable $e) {
        // \Throwable rather than \Exception: TypeError and other \Error
        // subclasses must also end in a nack instead of killing the worker.
        error_log("Data processor error: " . $e->getMessage());
        $message->nack(false, false);
    }
}
/**
 * Write the message payload to time-series storage.
 *
 * The device type is used as the measurement name, device id and type
 * become tags, the payload becomes the field set, and the timestamp is
 * multiplied by 1e9 (seconds to nanoseconds).
 *
 * @param DeviceMessage $message Message whose payload should be stored.
 */
private function processData(DeviceMessage $message): void
{
    $tags = [
        'device_id' => $message->deviceId,
        'device_type' => $message->deviceType,
    ];

    // Seconds -> nanoseconds.
    $timestampNs = $message->timestamp * 1000000000;

    $point = [
        'measurement' => $message->deviceType,
        'tags' => $tags,
        'fields' => $message->payload,
        'timestamp' => $timestampNs,
    ];

    $this->storage->write($point);
}
/**
 * Run the rule engine on a message and forward every resulting alert.
 *
 * Each alert is enriched with the device id, the message payload and the
 * current time before it is handed to the alert service.
 *
 * @param DeviceMessage $message Message to evaluate.
 */
private function checkRules(DeviceMessage $message): void
{
    foreach ($this->ruleEngine->evaluate($message) as $alert) {
        $notification = [
            'device_id' => $message->deviceId,
            'alert_type' => $alert['type'],
            'severity' => $alert['severity'],
            'message' => $alert['message'],
            'data' => $message->payload,
            'timestamp' => time(),
        ];

        $this->alertService->sendAlert($notification);
    }
}
/**
 * Ask the consume loop to exit.
 *
 * Only clears the running flag; consume() checks that flag in its loop
 * condition, so the loop ends on its next iteration.
 */
public function stop(): void
{
$this->running = false;
}
}
规则引擎实现
php
<?php
namespace App\IoT;
class RuleEngine
{
private array $rules;
private $redis;
/**
 * @param mixed $redis Redis client kept on the instance; none of the
 *                     methods shown in this class read it.
 * @param array $rules Initial rule definitions; more can be added with addRule().
 */
public function __construct($redis, array $rules = [])
{
    $this->rules = $rules;
    $this->redis = $redis;
}
/**
 * Evaluate every rule against a message and collect the alerts that fire.
 *
 * A rule fires when it applies to the device (matchRule) and its
 * conditions hold (checkCondition); conditions are only checked for
 * rules that apply.
 *
 * @param DeviceMessage $message Message to evaluate.
 *
 * @return array List of alerts, each with keys type, severity and message.
 */
public function evaluate(DeviceMessage $message): array
{
    $alerts = [];

    foreach ($this->rules as $rule) {
        // Skip rules that do not target this device or whose conditions fail.
        if (!$this->matchRule($message, $rule) || !$this->checkCondition($message, $rule)) {
            continue;
        }

        $alerts[] = [
            'type' => $rule['name'],
            'severity' => $rule['severity'],
            'message' => $this->formatMessage($message, $rule),
        ];
    }

    return $alerts;
}
/**
 * Check whether a rule targets the device that sent the message.
 *
 * A rule may restrict itself by "device_type" and/or "device_ids";
 * a rule without either restriction applies to every device.
 *
 * @param DeviceMessage $message Message being evaluated.
 * @param array         $rule    Rule definition.
 *
 * @return bool True when the rule applies to this device.
 */
private function matchRule(DeviceMessage $message, array $rule): bool
{
    if (isset($rule['device_type']) && $message->deviceType !== $rule['device_type']) {
        return false;
    }

    if (isset($rule['device_ids'])) {
        // Compare ids as exact strings. Loose comparison treats numeric
        // strings as numbers, so "007" would match a rule for "7". The
        // configured ids are cast first so integer ids keep working.
        $allowedIds = array_map('strval', (array) $rule['device_ids']);
        if (!in_array($message->deviceId, $allowedIds, true)) {
            return false;
        }
    }

    return true;
}
/**
 * Check that every condition of a rule holds for the message payload.
 *
 * Conditions are combined with AND. A condition whose field is absent
 * from the payload counts as not satisfied, so a rule never fires on
 * data the device did not report. A rule without conditions always holds.
 *
 * @param DeviceMessage $message Message being evaluated.
 * @param array         $rule    Rule definition; "conditions" is a list of
 *                               arrays with keys field, operator and value.
 *
 * @return bool True when all conditions are satisfied.
 */
private function checkCondition(DeviceMessage $message, array $rule): bool
{
    foreach ($rule['conditions'] ?? [] as $condition) {
        $field = $condition['field'];

        // Previously a missing field was skipped, which made e.g. a
        // "battery < 20" rule fire for messages carrying no battery value.
        if (!isset($message->payload[$field])) {
            return false;
        }

        if (!$this->compare($message->payload[$field], $condition['operator'], $condition['value'])) {
            return false;
        }
    }

    return true;
}
/**
 * Apply a named comparison operator to two values.
 *
 * Supported operators: gt, gte, lt, lte, eq, neq. Equality uses PHP's
 * loose comparison. An unknown operator yields false.
 *
 * @param mixed  $actual   Value taken from the message payload.
 * @param string $operator Operator name.
 * @param mixed  $expected Value configured in the rule.
 *
 * @return bool Result of the comparison.
 */
private function compare($actual, string $operator, $expected): bool
{
    // Dispatch table: operator name => comparison.
    $comparators = [
        'gt' => static fn ($left, $right): bool => $left > $right,
        'gte' => static fn ($left, $right): bool => $left >= $right,
        'lt' => static fn ($left, $right): bool => $left < $right,
        'lte' => static fn ($left, $right): bool => $left <= $right,
        'eq' => static fn ($left, $right): bool => $left == $right,
        'neq' => static fn ($left, $right): bool => $left != $right,
    ];

    if (!isset($comparators[$operator])) {
        return false;
    }

    return $comparators[$operator]($actual, $expected);
}
/**
 * Render a rule's message template for a concrete device.
 *
 * Replaces the placeholders {device_id} and {device_type} in
 * $rule['message_template'] with the values from the message.
 *
 * @param DeviceMessage $message Message that triggered the rule.
 * @param array         $rule    Rule definition containing message_template.
 *
 * @return string The rendered alert text.
 */
private function formatMessage(DeviceMessage $message, array $rule): string
{
    $placeholders = ['{device_id}', '{device_type}'];
    $values = [$message->deviceId, $message->deviceType];

    return str_replace($placeholders, $values, $rule['message_template']);
}
/**
 * Append a rule to the end of the rule list.
 *
 * @param array $rule Rule definition in the same shape as the constructor's rules.
 */
public function addRule(array $rule): void
{
    array_push($this->rules, $rule);
}
}
时序数据存储
php
<?php
namespace App\IoT;
class TimeSeriesStorage
{
private $influxClient;
/**
 * @param mixed $influxClient Client object; this class calls getWriteApi()
 *                            and getQueryApi() on it. Presumably an
 *                            \InfluxDB2\Client — TODO confirm.
 */
public function __construct($influxClient)
{
$this->influxClient = $influxClient;
}
/**
 * Write one data point.
 *
 * Integers, floats, booleans and strings are stored with their native
 * type. Null fields are skipped. Arrays and objects are stored as JSON
 * text, because a plain string cast turns arrays into "Array" (with a
 * warning) and throws for objects without __toString().
 *
 * @param array $data Keys: measurement (string), tags (array), fields
 *                    (array) and optionally timestamp.
 *
 * @return bool Always true; failures surface as exceptions.
 *
 * @throws \JsonException When a composite field value cannot be JSON-encoded.
 */
public function write(array $data): bool
{
    $point = \InfluxDB2\Point::measurement($data['measurement']);

    foreach ($data['tags'] as $key => $value) {
        $point->addTag($key, $value);
    }

    foreach ($data['fields'] as $key => $value) {
        if ($value === null) {
            // There is nothing meaningful to store for a null reading.
            continue;
        }

        if (is_int($value) || is_float($value) || is_bool($value) || is_string($value)) {
            // Keep the native type; casting false to string would store "".
            $point->addField($key, $value);
        } else {
            $point->addField($key, json_encode($value, JSON_THROW_ON_ERROR));
        }
    }

    if (isset($data['timestamp'])) {
        $point->time($data['timestamp']);
    }

    $this->influxClient->getWriteApi()->write($point);

    return true;
}
/**
 * Query the "iot" bucket for one measurement, optionally filtered by tags.
 *
 * Measurement, tag names and tag values are escaped before being placed
 * into Flux string literals, so quotes or backslashes in them can neither
 * break nor alter the query.
 *
 * NOTE(review): $timeRange is inserted verbatim because it is a Flux
 * duration or time expression (e.g. "-1h"), not a string literal — only
 * pass trusted values.
 *
 * @param string $measurement Measurement name.
 * @param array  $tags        Tag name => required tag value.
 * @param string $timeRange   Flux range start, e.g. "-1h".
 *
 * @return array Query result tables; empty when the query returns nothing.
 */
public function query(string $measurement, array $tags, string $timeRange): array
{
    // Render a value as a quoted Flux string literal.
    $quote = static function ($value): string {
        $escaped = str_replace(['\\', '"', '${'], ['\\\\', '\\"', '\\${'], (string) $value);

        return '"' . $escaped . '"';
    };

    $query = sprintf(
        'from(bucket: "iot") |> range(start: %s) |> filter(fn: (r) => r._measurement == %s)',
        $timeRange,
        $quote($measurement)
    );

    foreach ($tags as $key => $value) {
        // Bracket access is equivalent to r.<key> but also works for tag
        // names that are not valid Flux identifiers.
        $query .= sprintf(' |> filter(fn: (r) => r[%s] == %s)', $quote($key), $quote($value));
    }

    // Normalise a null result so the declared array return type always holds.
    return $this->influxClient->getQueryApi()->query($query) ?? [];
}
/**
 * Fetch the most recent field values of a device from the last hour.
 *
 * The measurement and device id are escaped before being placed into
 * Flux string literals.
 *
 * @param string $deviceId    Device whose readings are wanted.
 * @param string $measurement Measurement name.
 *
 * @return array|null Field name => latest value, or null when the query
 *                    returned nothing.
 */
public function getLatest(string $deviceId, string $measurement): ?array
{
    // Render a value as a quoted Flux string literal.
    $quote = static function (string $value): string {
        $escaped = str_replace(['\\', '"', '${'], ['\\\\', '\\"', '\\${'], $value);

        return '"' . $escaped . '"';
    };

    $query = sprintf(
        'from(bucket: "iot") |> range(start: -1h) |> filter(fn: (r) => r._measurement == %s and r.device_id == %s) |> last()',
        $quote($measurement),
        $quote($deviceId)
    );

    $result = $this->influxClient->getQueryApi()->query($query);
    if (empty($result)) {
        return null;
    }

    return $this->parseResult($result);
}
/**
 * Flatten query result tables into a field => value map.
 *
 * Every record of every table contributes one entry; when a field name
 * occurs more than once, the last record seen wins.
 *
 * @param array $result Result tables, each exposing a "records" list.
 *
 * @return array Field name => value.
 */
private function parseResult(array $result): array
{
    $valuesByField = [];

    foreach ($result as $table) {
        foreach ($table->records as $record) {
            $name = $record->getField();
            $valuesByField[$name] = $record->getValue();
        }
    }

    return $valuesByField;
}
}
完整使用示例
php
<?php
require_once 'vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use App\IoT\{DeviceGateway, DataProcessor, RuleEngine, AlertService, TimeSeriesStorage};
// End-to-end demo: publish device data through the gateway, send a
// command, then run a DataProcessor in a child process and trigger alerts.

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$redis = new Redis();
$redis->connect('localhost', 6379);

// TimeSeriesStorage needs an InfluxDB client; this variable was previously
// never defined. Replace the placeholder credentials with real ones.
$influxClient = new \InfluxDB2\Client([
    'url' => 'http://localhost:8086',
    'token' => 'my-token',
    'bucket' => 'iot',
    'org' => 'my-org',
    // DataProcessor sends timestamps in nanoseconds.
    'precision' => \InfluxDB2\Model\WritePrecision::NS,
]);

$rules = [
    [
        'name' => 'high_temperature',
        'device_type' => 'sensor',
        'conditions' => [
            ['field' => 'temperature', 'operator' => 'gt', 'value' => 35],
        ],
        'severity' => 'warning',
        'message_template' => '设备 {device_id} 温度过高',
    ],
    [
        'name' => 'low_battery',
        'device_type' => 'sensor',
        'conditions' => [
            ['field' => 'battery', 'operator' => 'lt', 'value' => 20],
        ],
        'severity' => 'info',
        'message_template' => '设备 {device_id} 电量不足',
    ],
];

$ruleEngine = new RuleEngine($redis, $rules);
$alertService = new AlertService($redis);
$storage = new TimeSeriesStorage($influxClient);
$gateway = new DeviceGateway($connection);

echo "=== 物联网示例 ===\n\n";

echo "1. 设备上报数据\n";
$rawMessage = json_encode([
    'device_id' => 'DEV001',
    'device_type' => 'sensor',
    'message_type' => 'data',
    'payload' => [
        'temperature' => 28.5,
        'humidity' => 65.0,
        'battery' => 85,
    ],
]);
$result = $gateway->handleDeviceMessage($rawMessage);
echo "消息处理结果: " . json_encode($result) . "\n";

echo "\n2. 发送设备指令\n";
$commandId = $gateway->sendCommand('DEV001', 'set_interval', ['interval' => 60]);
echo "指令已发送: {$commandId}\n";

echo "\n3. 启动数据处理器...\n";
$pid = pcntl_fork();
if ($pid === -1) {
    // Without this check, posix_kill(-1, SIGTERM) below would signal
    // every process the user is allowed to signal.
    fwrite(STDERR, "pcntl_fork failed\n");
    $connection->close();
    exit(1);
}
if ($pid === 0) {
    // Child: the inherited connection shares the parent's socket. Never
    // use it here, and do not let its destructor send a close frame.
    $connection->set_close_on_destruct(false);

    $consumerConnection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $processor = new DataProcessor($consumerConnection, $storage, $ruleEngine, $alertService);
    $processor->consume();
    $consumerConnection->close();
    exit(0);
}

// Give the child time to start consuming.
sleep(2);

$highTempMessage = json_encode([
    'device_id' => 'DEV002',
    'device_type' => 'sensor',
    'message_type' => 'data',
    'payload' => [
        'temperature' => 38.5,
        'humidity' => 70.0,
        'battery' => 15,
    ],
]);
echo "\n4. 触发告警规则\n";
$gateway->handleDeviceMessage($highTempMessage);

// Let the child process the message, then stop it and reap it.
sleep(2);
posix_kill($pid, SIGTERM);
pcntl_wait($status);
$connection->close();
echo "\n=== 示例完成 ===\n";
关键技术点解析
1. 设备认证
php
/**
 * Authenticate a device by its token.
 *
 * The token cached in Redis under "device:token:<deviceId>" is checked
 * first; on a miss or mismatch the database has the final say.
 *
 * @param string $deviceId Device presenting the token.
 * @param string $token    Token supplied by the device.
 *
 * @return bool True when the token is valid for the device.
 */
private function validateDevice(string $deviceId, string $token): bool
{
    $cached = $this->redis->get("device:token:{$deviceId}");

    // hash_equals() compares in constant time, so response timing does not
    // leak how much of the token matched. The is_string() guard covers a
    // cache miss, where the client returns a non-string value.
    if (is_string($cached) && hash_equals($cached, $token)) {
        return true;
    }

    return $this->db->validateDeviceToken($deviceId, $token);
}
2. 数据压缩
php
/**
 * Serialise a payload to JSON and gzip it at compression level 6.
 *
 * @param array $payload Data to compress.
 *
 * @return string Gzip-encoded JSON.
 *
 * @throws \JsonException    When the payload cannot be JSON-encoded.
 * @throws \RuntimeException When gzip compression fails.
 */
private function compressPayload(array $payload): string
{
    // Both steps can return false; check each so a failure is reported
    // clearly instead of as a return-type error.
    $json = json_encode($payload, JSON_THROW_ON_ERROR);

    $compressed = gzencode($json, 6);
    if ($compressed === false) {
        throw new \RuntimeException('Failed to gzip-compress payload');
    }

    return $compressed;
}
3. 批量写入
php
/**
 * Write several points with a single call to the write API.
 *
 * @param array $points Points to write; passed to the write API unchanged.
 */
public function writeBatch(array $points): void
{
    $writeApi = $this->influxClient->getWriteApi();
    $writeApi->write($points);
}
4. 告警去重
php
/**
 * Decide whether an alert should be sent, suppressing duplicates.
 *
 * The first call for a device/alert-type pair claims the dedup key and
 * returns true; further calls return false until the key expires. The
 * claim is a single SET with NX and EX, so concurrent workers cannot
 * both win. Previously the key was only checked and never written, so
 * no alert was ever suppressed.
 *
 * @param string $deviceId   Device the alert is about.
 * @param string $alertType  Alert type, e.g. the rule name.
 * @param int    $ttlSeconds Suppression window in seconds.
 *
 * @return bool True when the alert should be sent.
 */
private function shouldAlert(string $deviceId, string $alertType, int $ttlSeconds = 300): bool
{
    $key = "alert:dedup:{$deviceId}:{$alertType}";

    // NX: only set when absent; EX: expire so alerts resume later.
    return $this->redis->set($key, '1', ['nx', 'ex' => $ttlSeconds]) === true;
}
性能优化建议
| 优化项 | 建议 | 说明 |
|---|---|---|
| 批量处理 | 合并消息批量处理 | 提高吞吐量 |
| 数据分片 | 按设备ID分片 | 提高并行度 |
| 边缘计算 | 边缘节点预处理 | 减少云端压力 |
| 冷热分离 | 热数据Redis,冷数据时序库 | 优化存储成本 |
常见问题与解决方案
1. 设备掉线
解决方案: 心跳检测 + 离线消息缓存
2. 消息积压
解决方案: 增加消费者 + 背压控制
3. 数据丢失
解决方案: 消息持久化 + 确认机制
