Skip to content

AMQP 帧结构

一、概述

AMQP 协议使用帧(Frame)作为数据传输的基本单位。理解帧结构对于深入理解 AMQP 协议、排查网络问题和优化性能都非常重要。

1.1 什么是帧

帧是 AMQP 协议数据传输的最小单位,类似于网络协议中的数据包。每条 AMQP 命令都被封装成一个或多个帧进行传输。

text
┌─────────────────────────────────────────────────────────────────────┐
│                         AMQP 帧的概念                                │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  消息 (Message)                                                     │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │  这是一条完整的消息内容,可能很长...                          │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                              │                                      │
│                              ▼ 分帧                                 │
│  ┌─────────────┐ ┌─────────────┐ ┌─────────────┐                  │
│  │   Frame 1   │ │   Frame 2   │ │   Frame 3   │                  │
│  │  (头部信息)  │ │  (消息属性)  │ │  (消息体)    │                  │
│  └─────────────┘ └─────────────┘ └─────────────┘                  │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

1.2 帧的类型

帧类型类型值说明
Method Frame1方法帧,携带 AMQP 方法
Content Header Frame2内容头帧,携带消息属性
Content Body Frame3内容体帧,携带消息体
Heartbeat Frame8心跳帧,保持连接活跃

二、核心知识点

2.1 帧结构详解

2.1.1 通用帧格式

text
┌─────────────────────────────────────────────────────────────────────┐
│                        AMQP 帧通用格式                               │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌──────────┬──────────┬──────────┬───────────────┬──────────┐    │
│  │   Type   │ Channel  │  Size    │    Payload    │   End    │    │
│  │  1 byte  │  2 bytes │  4 bytes │   Size bytes  │  1 byte  │    │
│  └──────────┴──────────┴──────────┴───────────────┴──────────┘    │
│                                                                     │
│  字段说明:                                                         │
│  ├── Type: 帧类型 (1=方法, 2=内容头, 3=内容体, 8=心跳)              │
│  ├── Channel: 通道编号 (0=连接级别, 1-65535=通道级别)               │
│  ├── Size: 负载大小 (4字节无符号整数)                               │
│  ├── Payload: 帧负载内容                                            │
│  └── End: 帧结束标志 (0xCE)                                         │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

2.1.2 方法帧结构

text
┌─────────────────────────────────────────────────────────────────────┐
│                        方法帧 (Method Frame)                         │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌──────────┬──────────┬──────────┬─────────────────────┬───────┐ │
│  │   Type   │ Channel  │  Size    │      Payload        │  End  │ │
│  │   = 1    │  2 bytes │  4 bytes │                     │ = CE  │ │
│  └──────────┴──────────┴──────────┴─────────────────────┴───────┘ │
│                                               │                     │
│                                               ▼                     │
│                         ┌─────────────────────────────────────┐    │
│                         │           Payload 结构              │    │
│                         ├─────────────────────────────────────┤    │
│                         │  Class ID (2 bytes)                 │    │
│                         │  Method ID (2 bytes)                │    │
│                         │  Method Arguments (变长)            │    │
│                         └─────────────────────────────────────┘    │
│                                                                     │
│  示例 - Basic.Publish 方法帧:                                      │
│  ┌────────────────────────────────────────────────────────────┐   │
│  │ Type=1 │ Ch=1 │ Size=XX │ Class=60 │ Method=40 │ Args... │CE │ │
│  └────────────────────────────────────────────────────────────┘   │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

2.1.3 内容头帧结构

text
┌─────────────────────────────────────────────────────────────────────┐
│                    内容头帧 (Content Header Frame)                   │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌──────────┬──────────┬──────────┬─────────────────────┬───────┐ │
│  │   Type   │ Channel  │  Size    │      Payload        │  End  │ │
│  │   = 2    │  2 bytes │  4 bytes │                     │ = CE  │ │
│  └──────────┴──────────┴──────────┴─────────────────────┴───────┘ │
│                                               │                     │
│                                               ▼                     │
│                         ┌─────────────────────────────────────┐    │
│                         │           Payload 结构              │    │
│                         ├─────────────────────────────────────┤    │
│                         │  Class ID (2 bytes)                 │    │
│                         │  Weight (2 bytes, 通常为 0)         │    │
│                         │  Body Size (8 bytes)                │    │
│                         │  Property Flags (2 bytes)           │    │
│                         │  Property List (变长)               │    │
│                         └─────────────────────────────────────┘    │
│                                                                     │
│  属性标志位说明:                                                   │
│  ┌────────────────────────────────────────────────────────────┐   │
│  │ Bit 15: content_type      │ Bit 7:  priority              │   │
│  │ Bit 14: content_encoding  │ Bit 6:  correlation_id        │   │
│  │ Bit 13: headers           │ Bit 5:  reply_to              │   │
│  │ Bit 12: delivery_mode     │ Bit 4:  expiration            │   │
│  │ Bit 11: priority          │ Bit 3:  message_id            │   │
│  │ ...                        │ ...                           │   │
│  └────────────────────────────────────────────────────────────┘   │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

2.1.4 内容体帧结构

text
┌─────────────────────────────────────────────────────────────────────┐
│                      内容体帧 (Content Body Frame)                   │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌──────────┬──────────┬──────────┬─────────────────────┬───────┐ │
│  │   Type   │ Channel  │  Size    │      Payload        │  End  │ │
│  │   = 3    │  2 bytes │  4 bytes │   (消息体数据)       │ = CE  │ │
│  └──────────┴──────────┴──────────┴─────────────────────┴───────┘ │
│                                                                     │
│  说明:                                                             │
│  ├── Payload 就是消息体的原始数据                                   │
│  ├── 大消息会被分割成多个内容体帧                                    │
│  └── 每个帧的最大大小由 frame_max 参数决定                          │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

2.1.5 心跳帧结构

text
┌─────────────────────────────────────────────────────────────────────┐
│                        心跳帧 (Heartbeat Frame)                      │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌──────────┬──────────┬──────────┬─────────────────────┬───────┐ │
│  │   Type   │ Channel  │  Size    │      Payload        │  End  │ │
│  │   = 8    │   = 0    │   = 0    │       (空)          │ = CE  │ │
│  └──────────┴──────────┴──────────┴─────────────────────┴───────┘ │
│                                                                     │
│  说明:                                                             │
│  ├── 心跳帧没有负载内容                                             │
│  ├── Channel 必须为 0(连接级别)                                   │
│  ├── 用于检测连接是否存活                                           │
│  └── 心跳间隔由 Connection.Tune 协商确定                            │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

2.2 消息传输流程

2.2.1 完整消息的帧序列

text
发布一条消息的帧序列:

客户端                                      服务端
   │                                          │
   │ ──── Method Frame ────────────────────► │
   │      Basic.Publish                      │
   │      (exchange, routing-key)            │
   │                                          │
   │ ──── Content Header Frame ────────────► │
   │      (body-size, properties)            │
   │                                          │
   │ ──── Content Body Frame 1 ────────────► │
   │      (部分消息体)                        │
   │                                          │
   │ ──── Content Body Frame 2 ────────────► │
   │      (部分消息体)                        │
   │                                          │
   │ ──── Content Body Frame N ────────────► │
   │      (剩余消息体)                        │
   │                                          │

帧序列示意:
┌─────────────────┐
│  Method Frame   │  Basic.Publish 方法
│   Type = 1      │
├─────────────────┤
│  Header Frame   │  消息属性
│   Type = 2      │  body-size = 总大小
├─────────────────┤
│  Body Frame 1   │  消息体片段 1
│   Type = 3      │
├─────────────────┤
│  Body Frame 2   │  消息体片段 2
│   Type = 3      │
├─────────────────┤
│      ...        │
├─────────────────┤
│  Body Frame N   │  消息体片段 N
│   Type = 3      │
└─────────────────┘

2.2.2 帧大小限制

text
帧大小限制 (frame_max):

┌─────────────────────────────────────────────────────────────────────┐
│  frame_max 参数在 Connection.Tune 时协商:                          │
│                                                                     │
│  默认值:131072 bytes (128 KB)                                      │
│  最小值:4096 bytes (4 KB)                                          │
│  推荐值:根据消息大小调整                                           │
│                                                                     │
│  计算示例:                                                         │
│  ├── 消息体大小:1 MB                                               │
│  ├── frame_max:128 KB                                              │
│  └── 需要的内容体帧数:ceil(1 MB / 128 KB) = 8 帧                   │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

2.3 协议头

2.3.1 协议头格式

text
连接建立时,客户端首先发送协议头:

┌─────────────────────────────────────────────────────────────────────┐
│                        AMQP 协议头                                   │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌───────┬───────┬───────┬───────┬───────┬───────┬───────┬───────┐│
│  │  'A'  │  'M'  │  'Q'  │  'P'  │  0    │  0    │  9    │  1    ││
│  └───────┴───────┴───────┴───────┴───────┴───────┴───────┴───────┘│
│                                                                     │
│  说明:                                                             │
│  ├── 前 4 字节:固定为 "AMQP"                                       │
│  ├── 后 4 字节:协议版本 (0-9-1)                                    │
│  └── 总共 8 字节                                                    │
│                                                                     │
│  版本对照:                                                         │
│  ├── AMQP 0-9-1:  0x00 0x00 0x09 0x01                              │
│  ├── AMQP 0-9:    0x00 0x00 0x09 0x00                              │
│  └── AMQP 0-8:    0x00 0x00 0x08 0x00                              │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

2.4 数据编码

2.4.1 基本数据类型

text
AMQP 使用的数据类型:

┌──────────────┬────────────┬───────────────────────────────────────┐
│    类型      │   大小     │                 说明                  │
├──────────────┼────────────┼───────────────────────────────────────┤
│    bit       │   1 bit    │ 布尔值,多个 bit 打包成字节           │
├──────────────┼────────────┼───────────────────────────────────────┤
│    octet     │   1 byte   │ 无符号 8 位整数                       │
├──────────────┼────────────┼───────────────────────────────────────┤
│    short     │   2 bytes  │ 无符号 16 位整数,大端序              │
├──────────────┼────────────┼───────────────────────────────────────┤
│    long      │   4 bytes  │ 无符号 32 位整数,大端序              │
├──────────────┼────────────┼───────────────────────────────────────┤
│  longlong    │   8 bytes  │ 无符号 64 位整数,大端序              │
├──────────────┼────────────┼───────────────────────────────────────┤
│ shortstr     │  1+255     │ 短字符串,1字节长度 + 最大255字节内容  │
├──────────────┼────────────┼───────────────────────────────────────┤
│ longstr      │  4+N       │ 长字符串,4字节长度 + N字节内容        │
├──────────────┼────────────┼───────────────────────────────────────┤
│ timestamp    │   8 bytes  │ 64 位时间戳                          │
├──────────────┼────────────┼───────────────────────────────────────┤
│ table        │   变长     │键值对表,4字节长度 + 内容             │
└──────────────┴────────────┴───────────────────────────────────────┘

2.4.2 Table 编码

text
Table (键值对) 编码格式:

┌─────────────────────────────────────────────────────────────────────┐
│                        Table 编码结构                                │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │  Table Size (4 bytes)                                       │   │
│  ├─────────────────────────────────────────────────────────────┤   │
│  │  Entry 1:                                                   │   │
│  │  ├── Key Length (1 byte)                                    │   │
│  │  ├── Key (N bytes)                                          │   │
│  │  ├── Value Type (1 byte)                                    │   │
│  │  └── Value (变长)                                           │   │
│  ├─────────────────────────────────────────────────────────────┤   │
│  │  Entry 2:                                                   │   │
│  │  └── ...                                                    │   │
│  ├─────────────────────────────────────────────────────────────┤   │
│  │  Entry N:                                                   │   │
│  │  └── ...                                                    │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                                                     │
│  Value Type 值:                                                    │
│  ├── 'S' = string                                                  │
│  ├── 'I' = 32-bit integer                                          │
│  ├── 'D' = decimal                                                 │
│  ├── 'T' = timestamp                                               │
│  ├── 'F' = table (嵌套)                                            │
│  ├── 'A' = array                                                   │
│  ├── 't' = boolean                                                 │
│  ├── 'l' = 64-bit integer                                          │
│  └── 'x' = byte array                                              │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

三、代码示例

3.1 PHP 帧解析示例

php
<?php
class AMQPFrameParser
{
    const FRAME_METHOD = 1;
    const FRAME_HEADER = 2;
    const FRAME_BODY = 3;
    const FRAME_HEARTBEAT = 8;
    const FRAME_END = 0xCE;

    public function parseFrame(string $data): array
    {
        if (strlen($data) < 8) {
            throw new InvalidArgumentException('数据太短,无法解析帧');
        }

        $type = ord($data[0]);
        $channel = unpack('n', substr($data, 1, 2))[1];
        $size = unpack('N', substr($data, 3, 4))[1];
        $payload = substr($data, 7, $size);
        $end = ord($data[7 + $size]);

        if ($end !== self::FRAME_END) {
            throw new RuntimeException('帧结束标志无效');
        }

        return [
            'type' => $type,
            'type_name' => $this->getFrameTypeName($type),
            'channel' => $channel,
            'size' => $size,
            'payload' => $payload,
        ];
    }

    public function parseMethodFrame(string $payload): array
    {
        $classId = unpack('n', substr($payload, 0, 2))[1];
        $methodId = unpack('n', substr($payload, 2, 2))[1];
        $arguments = substr($payload, 4);

        return [
            'class_id' => $classId,
            'method_id' => $methodId,
            'class_name' => $this->getClassName($classId),
            'method_name' => $this->getMethodName($classId, $methodId),
            'arguments' => $arguments,
        ];
    }

    public function parseContentHeader(string $payload): array
    {
        $classId = unpack('n', substr($payload, 0, 2))[1];
        $weight = unpack('n', substr($payload, 2, 2))[1];
        $bodySize = unpack('J', substr($payload, 4, 8))[1];
        $propertyFlags = unpack('n', substr($payload, 12, 2))[1];
        $propertyList = substr($payload, 14);

        return [
            'class_id' => $classId,
            'weight' => $weight,
            'body_size' => $bodySize,
            'property_flags' => $propertyFlags,
            'property_list' => $propertyList,
        ];
    }

    private function getFrameTypeName(int $type): string
    {
        return match ($type) {
            self::FRAME_METHOD => 'METHOD',
            self::FRAME_HEADER => 'HEADER',
            self::FRAME_BODY => 'BODY',
            self::FRAME_HEARTBEAT => 'HEARTBEAT',
            default => 'UNKNOWN',
        };
    }

    private function getClassName(int $classId): string
    {
        return match ($classId) {
            10 => 'connection',
            20 => 'channel',
            30 => 'access',
            40 => 'exchange',
            50 => 'queue',
            60 => 'basic',
            90 => 'tx',
            default => "class_{$classId}",
        };
    }

    private function getMethodName(int $classId, int $methodId): string
    {
        $methods = [
            10 => [10 => 'start', 20 => 'secure', 30 => 'tune', 40 => 'open', 50 => 'close'],
            20 => [10 => 'open', 20 => 'flow', 40 => 'close'],
            40 => [10 => 'declare', 20 => 'delete'],
            50 => [10 => 'declare', 20 => 'bind', 30 => 'purge', 40 => 'delete'],
            60 => [10 => 'qos', 20 => 'consume', 30 => 'cancel', 40 => 'publish', 50 => 'return', 60 => 'deliver', 70 => 'get', 80 => 'ack', 90 => 'reject', 100 => 'recover'],
        ];

        return $methods[$classId][$methodId] ?? "method_{$methodId}";
    }

    public function buildHeartbeatFrame(): string
    {
        return pack('CnNC', self::FRAME_HEARTBEAT, 0, 0, self::FRAME_END);
    }

    public function buildMethodFrame(int $channel, int $classId, int $methodId, string $arguments = ''): string
    {
        $payload = pack('nn', $classId, $methodId) . $arguments;
        return pack('CnN', self::FRAME_METHOD, $channel, strlen($payload)) . $payload . chr(self::FRAME_END);
    }
}

$parser = new AMQPFrameParser();

$heartbeat = $parser->buildHeartbeatFrame();
echo "心跳帧: " . bin2hex($heartbeat) . "\n";

$parsed = $parser->parseFrame($heartbeat);
print_r($parsed);

3.2 帧大小优化示例

php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class FrameOptimizedProducer
{
    private $connection;
    private $channel;
    private $frameMax;

    public function __construct(string $host, int $port, int $frameMax = 131072)
    {
        $this->connection = new AMQPStreamConnection(
            $host,
            $port,
            'guest',
            'guest',
            '/',
            false,
            'AMQPLAIN',
            null,
            'en_US',
            3.0,
            3.0,
            null,
            true,
            60,
            $frameMax
        );
        
        $this->channel = $this->connection->channel();
        $this->frameMax = $frameMax;
    }

    public function publishLargeMessage(string $exchange, string $routingKey, string $body): void
    {
        $messageSize = strlen($body);
        $estimatedFrames = $this->estimateFrameCount($messageSize);
        
        echo "消息大小: {$messageSize} 字节\n";
        echo "帧大小限制: {$this->frameMax} 字节\n";
        echo "预计帧数量: {$estimatedFrames}\n";
        
        $message = new AMQPMessage($body, [
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            'content_type' => 'application/json',
        ]);
        
        $this->channel->basic_publish($message, $exchange, $routingKey);
        
        echo "消息发布成功\n";
    }

    private function estimateFrameCount(int $bodySize): int
    {
        $methodFrameOverhead = 20;
        $headerFrameOverhead = 22;
        $bodyFrameOverhead = 8;
        
        $bodyFramePayload = $this->frameMax - $bodyFrameOverhead;
        $bodyFrames = ceil($bodySize / $bodyFramePayload);
        
        return 1 + 1 + (int)$bodyFrames;
    }

    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
    }
}

$producer = new FrameOptimizedProducer('localhost', 5672, 131072);

$producer->publishLargeMessage(
    'test.exchange',
    'test.key',
    str_repeat('x', 500000)
);

$producer->close();

3.3 心跳帧处理示例

php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

class HeartbeatMonitor
{
    private $connection;
    private $channel;
    private $heartbeatInterval;
    private $lastHeartbeat;

    public function __construct(string $host, int $port, int $heartbeat = 60)
    {
        $this->heartbeatInterval = $heartbeat;
        $this->lastHeartbeat = time();
        
        $this->connection = new AMQPStreamConnection(
            $host,
            $port,
            'guest',
            'guest',
            '/',
            false,
            'AMQPLAIN',
            null,
            'en_US',
            3.0,
            3.0,
            null,
            true,
            $heartbeat
        );
        
        $this->channel = $this->connection->channel();
        
        echo "连接建立,心跳间隔: {$heartbeat} 秒\n";
    }

    public function startMonitoring(): void
    {
        $this->connection->set_heartbeat_callback(function () {
            $this->lastHeartbeat = time();
            echo "[" . date('Y-m-d H:i:s') . "] 心跳帧已发送\n";
        });
    }

    public function checkHeartbeat(): bool
    {
        $elapsed = time() - $this->lastHeartbeat;
        $threshold = $this->heartbeatInterval * 2;
        
        if ($elapsed > $threshold) {
            echo "警告: 心跳超时!已过 {$elapsed} 秒\n";
            return false;
        }
        
        echo "心跳正常,距上次: {$elapsed} 秒\n";
        return true;
    }

    public function getConnectionInfo(): array
    {
        return [
            'is_connected' => $this->connection->isConnected(),
            'heartbeat_interval' => $this->heartbeatInterval,
            'last_heartbeat' => date('Y-m-d H:i:s', $this->lastHeartbeat),
            'seconds_since_heartbeat' => time() - $this->lastHeartbeat,
        ];
    }

    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
    }
}

$monitor = new HeartbeatMonitor('localhost', 5672, 30);
$monitor->startMonitoring();

for ($i = 0; $i < 5; $i++) {
    sleep(10);
    $monitor->checkHeartbeat();
    print_r($monitor->getConnectionInfo());
}

$monitor->close();

四、实际应用场景

4.1 大文件传输

text
场景:传输大型文件或数据包

帧处理策略:
┌─────────────────────────────────────────────────────────────────────┐
│  大文件传输优化                                                      │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  1. 增大 frame_max                                                  │
│     ├── 减少帧数量                                                  │
│     ├── 降低网络开销                                                │
│     └── 提高传输效率                                                │
│                                                                     │
│  2. 分块传输                                                        │
│     ├── 将大文件分成多个消息                                        │
│     ├── 每个消息携带序号                                            │
│     └── 消费端重组                                                  │
│                                                                     │
│  3. 流量控制                                                        │
│     ├── 使用 QoS 限制未确认消息数                                   │
│     ├── 避免内存溢出                                                │
│     └── 平衡生产消费速度                                            │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

4.2 高频消息场景

text
场景:高频交易或实时数据推送

帧优化策略:
┌─────────────────────────────────────────────────────────────────────┐
│  高频消息优化                                                        │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  1. 批量发送                                                        │
│     ├── 合并多个小消息                                              │
│     ├── 减少帧数量                                                  │
│     └── 降低网络延迟                                                │
│                                                                     │
│  2. 调整心跳间隔                                                    │
│     ├── 高频场景可增大心跳间隔                                      │
│     ├── 减少心跳帧开销                                              │
│     └── 但需注意连接检测                                            │
│                                                                     │
│  3. 连接复用                                                        │
│     ├── 使用 Channel 多路复用                                       │
│     ├── 避免频繁建立连接                                            │
│     └── 减少协议握手开销                                            │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

五、常见问题与解决方案

5.1 帧过大错误

问题描述

Exception: frame larger than max frame size

解决方案

php
<?php
$frameMax = 256 * 1024;
$connection = new AMQPStreamConnection(
    'localhost',
    5672,
    'guest',
    'guest',
    '/',
    false,
    'AMQPLAIN',
    null,
    'en_US',
    3.0,
    3.0,
    null,
    true,
    60,
    $frameMax
);

5.2 心跳超时断开

问题描述: 连接因心跳超时而断开。

解决方案

php
<?php
$connection = new AMQPStreamConnection(
    'localhost',
    5672,
    'guest',
    'guest',
    '/',
    false,
    'AMQPLAIN',
    null,
    'en_US',
    3.0,
    3.0,
    null,
    true,
    60
);

$connection->set_heartbeat_callback(function () {
    echo "心跳发送\n";
});

5.3 帧解析错误

问题描述: 帧解析时出现数据错乱。

解决方案

php
<?php
class FrameReader
{
    private $buffer = '';
    
    public function append(string $data): void
    {
        $this->buffer .= $data;
    }
    
    public function hasCompleteFrame(): bool
    {
        if (strlen($this->buffer) < 7) {
            return false;
        }
        
        $size = unpack('N', substr($this->buffer, 3, 4))[1];
        return strlen($this->buffer) >= 7 + $size + 1;
    }
    
    public function readFrame(): ?string
    {
        if (!$this->hasCompleteFrame()) {
            return null;
        }
        
        $size = unpack('N', substr($this->buffer, 3, 4))[1];
        $frame = substr($this->buffer, 0, 7 + $size + 1);
        $this->buffer = substr($this->buffer, 7 + $size + 1);
        
        return $frame;
    }
}

六、最佳实践建议

6.1 帧大小配置

text
配置建议:
├── 小消息场景:默认 128 KB 即可
├── 大消息场景:增大到 256 KB - 1 MB
├── 超大消息:考虑分块传输
└── 注意:服务端也需要相应配置

6.2 心跳配置

text
心跳建议:
├── 局域网:30-60 秒
├── 公网:60-120 秒
├── 高延迟网络:可适当增大
└── 监控:实现心跳超时告警

6.3 性能优化

text
性能建议:
├── 批量发送:减少帧数量
├── 合理帧大小:平衡内存和网络
├── 异步处理:避免阻塞等待
└── 监控帧统计:分析性能瓶颈

七、相关链接