Skip to content

消息压缩

概述

消息压缩是减小消息体积、降低网络传输开销、节省存储空间的重要手段。对于大消息或高频消息场景,压缩可以显著提升系统性能,代价是生产者和消费者两端额外的 CPU 开销。

核心原理

压缩流程

mermaid
graph LR
    subgraph 生产者
        P[原始消息] --> C[压缩]
        C --> S[发送]
    end
    
    subgraph 消费者
        R[接收] --> D[解压]
        D --> M[原始消息]
    end
    
    S --> R

压缩算法对比

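| 算法 | 压缩比 | 速度 | 适用场景 |
|------|--------|------|----------|
| gzip | 高 | 中 | 文本、JSON |
| zlib | 高 | 中 | 通用 |
| bz2 | 最高 | 慢 | 大文件 |
| lz4 | 较低 | 最快 | 实时场景 |
| snappy | 较低 | 极快 | 高吞吐 |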

压缩决策

mermaid
graph TD
    M[消息] --> S{大小检查}
    S -->|小于阈值| N[不压缩]
    S -->|大于阈值| T{类型检查}
    T -->|文本/JSON| G[gzip压缩]
    T -->|二进制| L[lz4压缩]
    T -->|大文件| B[bz2压缩]

PHP 代码示例

基本压缩与解压

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class MessageCompressor
{
    const THRESHOLD = 1024;  // Only payloads of at least 1 KB are compressed
    const LEVEL = 6;         // gzip compression level (1-9)

    /**
     * Serialises $data (JSON unless it is already a string) and gzip-compresses it
     * when the serialised form is at least THRESHOLD bytes.
     *
     * @param mixed $data String payload, or any JSON-encodable value.
     * @return array{body: string, compressed: bool, original_size?: int, compressed_size?: int}
     * @throws InvalidArgumentException If $data cannot be JSON-encoded.
     * @throws RuntimeException         If gzip compression fails.
     */
    public static function compress($data)
    {
        $serialized = is_string($data) ? $data : json_encode($data);
        if ($serialized === false) {
            throw new InvalidArgumentException('Unable to JSON-encode payload: ' . json_last_error_msg());
        }

        // Small payloads are sent as-is.
        if (strlen($serialized) < self::THRESHOLD) {
            return [
                'body' => $serialized,
                'compressed' => false
            ];
        }

        $compressed = gzencode($serialized, self::LEVEL);
        if ($compressed === false) {
            throw new RuntimeException('gzip compression failed');
        }

        return [
            'body' => $compressed,
            'compressed' => true,
            'original_size' => strlen($serialized),
            'compressed_size' => strlen($compressed)
        ];
    }

    /**
     * Reverses compress().
     *
     * @param string $data       Message body.
     * @param bool   $compressed Whether $data is gzip-compressed; when false it is returned unchanged.
     * @return string
     * @throws RuntimeException If $data is not valid gzip data.
     */
    public static function decompress($data, $compressed = true)
    {
        if (!$compressed) {
            return $data;
        }

        $decompressed = gzdecode($data);
        if ($decompressed === false) {
            throw new RuntimeException('Failed to gzip-decode message body');
        }

        return $decompressed;
    }

    /**
     * Returns the space saved by compression as a percentage, rounded to 2 decimals.
     *
     * Each argument may be the payload string itself or its size in bytes (int), so the
     * 'original_size' / 'compressed_size' values returned by compress() can be passed directly.
     *
     * @param string|int $original   Original payload or its byte length.
     * @param string|int $compressed Compressed payload or its byte length.
     * @return float 0.0 when the original is empty.
     */
    public static function getCompressionRatio($original, $compressed)
    {
        // strlen() on an int would measure its decimal digits, not a byte count.
        $originalSize = is_int($original) ? $original : strlen($original);
        $compressedSize = is_int($compressed) ? $compressed : strlen($compressed);

        if ($originalSize === 0) {
            return 0.0;
        }

        return round((1 - $compressedSize / $originalSize) * 100, 2);
    }
}

// Usage example
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$queueName = 'compressed-queue';
$channel->queue_declare($queueName, false, true, false, false);

// Publish a compressed message. array_fill() repeats one record 1000 times, so this
// payload is well above MessageCompressor::THRESHOLD and compresses very well.
$largeData = [
    'records' => array_fill(0, 1000, [
        'id' => rand(1, 10000),
        'name' => 'User ' . rand(1, 100),
        'email' => 'user' . rand(1, 100) . '@example.com'
    ])
];

$compressed = MessageCompressor::compress($largeData);

// Custom headers must be sent as the 'application_headers' property wrapped in an
// AMQPTable; php-amqplib ignores property names it does not know (such as 'headers').
$headers = ['x-compressed' => $compressed['compressed']];
if ($compressed['compressed']) {
    $headers['x-original-size'] = $compressed['original_size'];
}

$message = new AMQPMessage(
    $compressed['body'],
    [
        'content_type' => 'application/json',
        'content_encoding' => $compressed['compressed'] ? 'gzip' : 'UTF-8',
        'application_headers' => new \PhpAmqpLib\Wire\AMQPTable($headers),
    ]
);

$channel->basic_publish($message, '', $queueName);

if ($compressed['compressed']) {
    // Space saved in percent, computed from the byte counts compress() reports.
    $ratio = round((1 - $compressed['compressed_size'] / $compressed['original_size']) * 100, 2);
    echo "消息已压缩,压缩率: {$ratio}%\n";
}

$channel->close();
$connection->close();

消费者解压处理

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$queueName = 'compressed-queue';
$channel->queue_declare($queueName, false, true, false, false);

echo "等待压缩消息...\n";

/**
 * Decompresses the body if needed, prints the record count and acks the message.
 * Messages flagged as gzip whose body cannot be decoded are rejected without requeue.
 */
$callback = function (AMQPMessage $msg) {
    // AMQPMessage::get() throws for properties that were never set, so check has() first.
    $contentEncoding = $msg->has('content_encoding') ? $msg->get('content_encoding') : null;
    $headers = $msg->has('application_headers')
        ? $msg->get('application_headers')->getNativeData()
        : [];
    $body = $msg->getBody();

    // Either signal marks the body as gzip. `??` binds more loosely than `&&`/`||`,
    // so it is applied directly to the header lookup here.
    $isCompressed = $contentEncoding === 'gzip' || ($headers['x-compressed'] ?? false);

    if ($isCompressed) {
        echo "收到压缩消息,正在解压...\n";
        $body = gzdecode($body);

        if ($body === false) {
            echo "解压失败,消息已拒绝\n";
            $msg->reject(false);
            return;
        }
    }

    $data = json_decode($body, true);

    echo "消息内容:\n";
    echo "  记录数: " . count($data['records'] ?? []) . "\n";
    echo "-------------------\n";

    $msg->ack();
};

$channel->basic_consume($queueName, '', false, false, false, false, $callback);

while ($channel->is_consuming()) {
    $channel->wait();
}

$channel->close();
$connection->close();

多算法压缩器

php
<?php

use PhpAmqpLib\Message\AMQPMessage;

class MultiAlgorithmCompressor
{
    const ALGO_GZIP = 'gzip';
    const ALGO_ZLIB = 'zlib';
    const ALGO_BZ2 = 'bz2';
    const ALGO_LZ4 = 'lz4';

    private $threshold;
    private $defaultAlgo;

    /**
     * @param int    $threshold   Minimum serialised size in bytes before compression is attempted.
     * @param string $defaultAlgo Algorithm used when compress() is called without one.
     */
    public function __construct($threshold = 1024, $defaultAlgo = self::ALGO_GZIP)
    {
        $this->threshold = $threshold;
        $this->defaultAlgo = $defaultAlgo;
    }

    /**
     * Serialises $data (JSON unless already a string) and compresses it with $algo.
     *
     * The payload is left uncompressed when it is below the threshold or when
     * compression would not make it smaller.
     *
     * @param mixed       $data String payload or JSON-encodable value.
     * @param string|null $algo One of the ALGO_* constants; null uses the default.
     * @return array{body: string, compressed: bool, algorithm: ?string, original_size?: int, compressed_size?: int, ratio?: float}
     * @throws InvalidArgumentException On an unknown algorithm or un-encodable data.
     * @throws RuntimeException         If the algorithm's extension is missing or compression fails.
     */
    public function compress($data, $algo = null)
    {
        $algo = $algo ?? $this->defaultAlgo;
        $serialized = is_string($data) ? $data : json_encode($data);
        if ($serialized === false) {
            throw new InvalidArgumentException('Unable to JSON-encode payload: ' . json_last_error_msg());
        }
        $originalSize = strlen($serialized);

        if ($originalSize < $this->threshold) {
            return [
                'body' => $serialized,
                'compressed' => false,
                'algorithm' => null
            ];
        }

        $compressed = $this->doCompress($serialized, $algo);
        $compressedSize = strlen($compressed);

        // Compression can enlarge incompressible data; send the original in that case.
        if ($compressedSize >= $originalSize) {
            return [
                'body' => $serialized,
                'compressed' => false,
                'algorithm' => null
            ];
        }

        return [
            'body' => $compressed,
            'compressed' => true,
            'algorithm' => $algo,
            'original_size' => $originalSize,
            'compressed_size' => $compressedSize,
            'ratio' => round((1 - $compressedSize / $originalSize) * 100, 2)
        ];
    }

    /**
     * Reverses compress(). A falsy $algo means "not compressed" and returns $data unchanged.
     *
     * @throws InvalidArgumentException On an unknown algorithm.
     * @throws RuntimeException         If the extension is missing or $data cannot be decoded.
     */
    public function decompress($data, $algo)
    {
        if (!$algo) {
            return $data;
        }

        return $this->doDecompress($data, $algo);
    }

    /** Compresses $data with $algo; always returns a string or throws. */
    private function doCompress($data, $algo)
    {
        switch ($algo) {
            case self::ALGO_GZIP:
                $this->assertAvailable('gzencode', 'Zlib');
                $result = gzencode($data, 6);
                break;
            case self::ALGO_ZLIB:
                $this->assertAvailable('zlib_encode', 'Zlib');
                $result = zlib_encode($data, ZLIB_ENCODING_DEFLATE);
                break;
            case self::ALGO_BZ2:
                $this->assertAvailable('bzcompress', 'BZ2');
                $result = bzcompress($data, 6);
                break;
            case self::ALGO_LZ4:
                $this->assertAvailable('lz4_compress', 'LZ4');
                $result = lz4_compress($data);
                break;
            default:
                throw new InvalidArgumentException("Unknown algorithm: {$algo}");
        }

        // bzcompress() reports errors as an int error code, the others as false.
        if (!is_string($result)) {
            throw new RuntimeException("{$algo} compression failed");
        }

        return $result;
    }

    /** Decompresses $data with $algo; always returns a string or throws. */
    private function doDecompress($data, $algo)
    {
        switch ($algo) {
            case self::ALGO_GZIP:
                $this->assertAvailable('gzdecode', 'Zlib');
                $result = gzdecode($data);
                break;
            case self::ALGO_ZLIB:
                $this->assertAvailable('zlib_decode', 'Zlib');
                $result = zlib_decode($data);
                break;
            case self::ALGO_BZ2:
                $this->assertAvailable('bzdecompress', 'BZ2');
                $result = bzdecompress($data);
                break;
            case self::ALGO_LZ4:
                $this->assertAvailable('lz4_uncompress', 'LZ4');
                $result = lz4_uncompress($data);
                break;
            default:
                throw new InvalidArgumentException("Unknown algorithm: {$algo}");
        }

        // bzdecompress() reports errors as an int error code, the others as false.
        if (!is_string($result)) {
            throw new RuntimeException("{$algo} decompression failed");
        }

        return $result;
    }

    /**
     * Throws RuntimeException when $function is not defined (its extension is not loaded).
     * Calling an undefined function raises Error, which `catch (Exception)` would not catch.
     */
    private function assertAvailable($function, $extension)
    {
        if (!function_exists($function)) {
            throw new RuntimeException("{$extension} extension not installed");
        }
    }

    /**
     * Returns whichever of gzip / zlib / bz2 produces the smallest output for $data.
     * Algorithms that are unavailable or fail are skipped; if none works, the default is returned.
     */
    public function selectBestAlgorithm($data)
    {
        $serialized = is_string($data) ? $data : json_encode($data);
        if ($serialized === false) {
            return $this->defaultAlgo;
        }

        $sizes = [];
        foreach ([self::ALGO_GZIP, self::ALGO_ZLIB, self::ALGO_BZ2] as $algo) {
            try {
                $sizes[$algo] = strlen($this->doCompress($serialized, $algo));
            } catch (Exception $e) {
                continue;
            }
        }

        if (empty($sizes)) {
            return $this->defaultAlgo;
        }

        // First algorithm (in the order tried) with the smallest output.
        return array_search(min($sizes), $sizes, true);
    }
}

压缩消息工厂

php
<?php

use PhpAmqpLib\Message\AMQPMessage;

class CompressedMessageFactory
{
    private $compressor;

    /**
     * @param int $threshold Minimum serialised size in bytes before compression is attempted.
     */
    public function __construct($threshold = 1024)
    {
        $this->compressor = new MultiAlgorithmCompressor($threshold);
    }

    /**
     * Builds a persistent JSON message, compressing the body when worthwhile.
     *
     * For compressed bodies the algorithm is recorded in content_encoding and in the
     * x-compression-algorithm header, alongside x-compressed and x-original-size.
     *
     * @param mixed       $data Payload (string or JSON-encodable value).
     * @param string|null $algo Algorithm name; null uses the compressor's default.
     * @return AMQPMessage
     */
    public function create($data, $algo = null)
    {
        $result = $this->compressor->compress($data, $algo);

        $properties = [
            'content_type' => 'application/json',
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
        ];

        if ($result['compressed']) {
            $properties['content_encoding'] = $result['algorithm'];
            // php-amqplib drops unknown property names such as 'headers'; custom headers
            // must be passed as 'application_headers' wrapped in an AMQPTable.
            $properties['application_headers'] = new \PhpAmqpLib\Wire\AMQPTable([
                'x-compressed' => true,
                'x-compression-algorithm' => $result['algorithm'],
                'x-original-size' => $result['original_size']
            ]);
        }

        return new AMQPMessage($result['body'], $properties);
    }

    /**
     * Decompresses (if needed) and JSON-decodes a message built by create().
     *
     * The algorithm comes from the x-compression-algorithm header; when the headers are
     * absent it falls back to content_encoding, but only if that names a supported algorithm.
     *
     * @return mixed Decoded payload (objects as associative arrays), or null for invalid JSON.
     */
    public function extract(AMQPMessage $msg)
    {
        $body = $msg->getBody();

        $headers = $msg->has('application_headers')
            ? $msg->get('application_headers')->getNativeData()
            : [];

        $algorithm = ($headers['x-compressed'] ?? false)
            ? ($headers['x-compression-algorithm'] ?? null)
            : null;

        if ($algorithm === null && $msg->has('content_encoding')) {
            $encoding = $msg->get('content_encoding');
            $supported = [
                MultiAlgorithmCompressor::ALGO_GZIP,
                MultiAlgorithmCompressor::ALGO_ZLIB,
                MultiAlgorithmCompressor::ALGO_BZ2,
                MultiAlgorithmCompressor::ALGO_LZ4,
            ];
            if (in_array($encoding, $supported, true)) {
                $algorithm = $encoding;
            }
        }

        if ($algorithm) {
            $body = $this->compressor->decompress($body, $algorithm);
        }

        return json_decode($body, true);
    }
}

实际应用场景

1. 日志批量传输

php
<?php

class LogBatchTransporter
{
    private $channel;
    private $queueName;
    private $compressor;
    // NOTE(review): not read anywhere; sendBatch() publishes whatever it is given as one message.
    private $batchSize = 1000;

    public function __construct($channel, $queueName = 'log-batch')
    {
        $this->channel = $channel;
        $this->queueName = $queueName;
        $this->compressor = new MultiAlgorithmCompressor(512);  // compress payloads of 512 B or more
        $this->setupQueue();
    }

    /** Declares the durable batch queue. */
    private function setupQueue()
    {
        $this->channel->queue_declare($this->queueName, false, true, false, false);
    }

    /**
     * Publishes $logs as one persistent message, gzip-compressed when large enough.
     *
     * @param array $logs Log entries; sent together with their count and a send timestamp.
     */
    public function sendBatch(array $logs)
    {
        $result = $this->compressor->compress([
            'logs' => $logs,
            'count' => count($logs),
            'sent_at' => time()
        ], MultiAlgorithmCompressor::ALGO_GZIP);

        // Fully qualified: this snippet has no `use` for AMQPMessage.
        $message = new \PhpAmqpLib\Message\AMQPMessage(
            $result['body'],
            [
                'content_type' => 'application/json',
                'content_encoding' => $result['compressed'] ? 'gzip' : 'UTF-8',
                'delivery_mode' => \PhpAmqpLib\Message\AMQPMessage::DELIVERY_MODE_PERSISTENT
            ]
        );

        $this->channel->basic_publish($message, '', $this->queueName);

        if ($result['compressed']) {
            echo sprintf(
                "发送 %d 条日志,压缩率: %.2f%%\n",
                count($logs),
                $result['ratio']
            );
        }
    }

    /**
     * Registers a consumer that passes each batch's 'logs' array to $handler and then acks it.
     * Batches that cannot be decompressed or decoded are rejected without requeue instead of
     * being acked with null data.
     *
     * Only registers the consumer; the caller still has to run the channel's wait loop.
     */
    public function consumeBatches(callable $handler)
    {
        $callback = function ($msg) use ($handler) {
            // get() throws for unset properties, so check has() first.
            $contentEncoding = $msg->has('content_encoding') ? $msg->get('content_encoding') : null;
            $body = $msg->getBody();

            if ($contentEncoding === 'gzip') {
                $body = gzdecode($body);
            }

            $data = is_string($body) ? json_decode($body, true) : null;

            if (!isset($data['logs']) || !is_array($data['logs'])) {
                $msg->reject(false);
                return;
            }

            $handler($data['logs']);

            $msg->ack();
        };

        $this->channel->basic_consume($this->queueName, '', false, false, false, false, $callback);
    }
}

2. 大文件分片传输

php
<?php

class ChunkedFileTransporter
{
    private $channel;
    private $chunkSize = 1024 * 1024;  // 1 MB of raw file data per chunk

    public function __construct($channel)
    {
        $this->channel = $channel;
    }

    /**
     * Splits $filePath into chunkSize pieces, gzip-compresses each one and publishes it to
     * the 'file-chunks' queue with file-id / index / total headers.
     *
     * An empty file is sent as a single empty chunk so the receiver can still find it.
     *
     * @param string $filePath Local file to send.
     * @param array  $metadata Metadata copied into every chunk's headers.
     * @return string Generated file id, to be passed to receiveFile().
     * @throws RuntimeException If the file cannot be read or a chunk cannot be compressed.
     */
    public function sendFile($filePath, $metadata = [])
    {
        $fileId = uniqid('file-');
        $fileSize = filesize($filePath);
        if ($fileSize === false) {
            throw new RuntimeException("Cannot stat file: {$filePath}");
        }
        $totalChunks = max(1, (int) ceil($fileSize / $this->chunkSize));

        $queueName = 'file-chunks';
        $this->channel->queue_declare($queueName, false, true, false, false);

        $handle = fopen($filePath, 'rb');
        if ($handle === false) {
            throw new RuntimeException("Cannot open file: {$filePath}");
        }

        try {
            // Drive the loop by the chunk count rather than feof(): feof() only becomes true
            // after a read has hit EOF, so a feof() loop publishes an extra empty chunk when
            // the file size is an exact multiple of chunkSize.
            for ($chunkIndex = 0; $chunkIndex < $totalChunks; $chunkIndex++) {
                $chunk = fread($handle, $this->chunkSize);
                if ($chunk === false) {
                    throw new RuntimeException("Failed to read chunk {$chunkIndex} of {$filePath}");
                }

                $compressed = gzencode($chunk, 6);
                if ($compressed === false) {
                    throw new RuntimeException("Failed to compress chunk {$chunkIndex} of {$filePath}");
                }

                // Custom headers must go in 'application_headers' as an AMQPTable;
                // php-amqplib ignores a 'headers' property.
                $message = new \PhpAmqpLib\Message\AMQPMessage(
                    $compressed,
                    [
                        'content_type' => 'application/octet-stream',
                        'content_encoding' => 'gzip',
                        'application_headers' => new \PhpAmqpLib\Wire\AMQPTable([
                            'x-file-id' => $fileId,
                            'x-chunk-index' => $chunkIndex,
                            'x-total-chunks' => $totalChunks,
                            'x-file-size' => $fileSize,
                            'x-metadata' => $metadata
                        ])
                    ]
                );

                $this->channel->basic_publish($message, '', $queueName);
            }
        } finally {
            fclose($handle);
        }

        echo "文件已分片发送: {$fileId}, 共 {$totalChunks} 个分片\n";

        return $fileId;
    }

    /**
     * Drains the 'file-chunks' queue, reassembles the chunks of $fileId and writes them to $outputPath.
     *
     * Chunks of other files are requeued after the scan. This file's chunks are acked only
     * after the output has been written completely; on any failure they are requeued as well.
     *
     * @throws RuntimeException If no chunk of $fileId is found, a chunk is missing or corrupt,
     *                          or the output file cannot be written.
     */
    public function receiveFile($fileId, $outputPath)
    {
        $chunks = [];
        $totalChunks = null;
        $ownTags = [];
        $otherTags = [];

        try {
            while ($message = $this->channel->basic_get('file-chunks')) {
                $headers = $message->has('application_headers')
                    ? $message->get('application_headers')->getNativeData()
                    : [];

                if (($headers['x-file-id'] ?? null) !== $fileId) {
                    // Requeue only after the scan; requeueing now would likely let
                    // basic_get() hand the same message straight back.
                    $otherTags[] = $message->getDeliveryTag();
                    continue;
                }

                $ownTags[] = $message->getDeliveryTag();

                $chunk = gzdecode($message->getBody());
                if ($chunk === false) {
                    throw new RuntimeException("Corrupt chunk {$headers['x-chunk-index']} of file {$fileId}");
                }

                $chunks[(int) $headers['x-chunk-index']] = $chunk;
                $totalChunks = (int) $headers['x-total-chunks'];
            }

            if ($totalChunks === null) {
                throw new RuntimeException("No chunks found for file {$fileId}");
            }
            for ($i = 0; $i < $totalChunks; $i++) {
                if (!isset($chunks[$i])) {
                    throw new RuntimeException("Missing chunk {$i} of {$totalChunks} for file {$fileId}");
                }
            }

            $handle = fopen($outputPath, 'wb');
            if ($handle === false) {
                throw new RuntimeException("Cannot open output file: {$outputPath}");
            }
            try {
                for ($i = 0; $i < $totalChunks; $i++) {
                    if (fwrite($handle, $chunks[$i]) === false) {
                        throw new RuntimeException("Failed to write chunk {$i} to {$outputPath}");
                    }
                }
            } finally {
                fclose($handle);
            }

            foreach ($ownTags as $tag) {
                $this->channel->basic_ack($tag);
            }
            $ownTags = [];
        } finally {
            // Anything not successfully consumed goes back to the queue.
            foreach (array_merge($ownTags, $otherTags) as $tag) {
                $this->channel->basic_nack($tag, false, true);
            }
        }

        echo "文件已接收: {$outputPath}\n";
    }
}

3. API 响应缓存

php
<?php

class CompressedCacheService
{
    private $channel;
    private $queueName;
    private $compressor;

    public function __construct($channel)
    {
        $this->channel = $channel;
        $this->queueName = 'api-cache';
        $this->compressor = new MultiAlgorithmCompressor(256);
        $this->setupQueue();
    }

    /** Declares the durable cache queue. */
    private function setupQueue()
    {
        $this->channel->queue_declare($this->queueName, false, true, false, false);
    }

    /**
     * Publishes $response as a cache entry for $key.
     *
     * The entry is compressed when its JSON is at least 256 bytes. The per-message
     * 'expiration' property is set to $ttl seconds (expressed in milliseconds).
     */
    public function cacheResponse($key, $response, $ttl = 3600)
    {
        $result = $this->compressor->compress([
            'key' => $key,
            'response' => $response,
            'cached_at' => time(),
            'ttl' => $ttl
        ]);

        // Fully qualified: this snippet has no `use` for AMQPMessage.
        $message = new \PhpAmqpLib\Message\AMQPMessage(
            $result['body'],
            [
                'content_type' => 'application/json',
                // Record the algorithm actually applied rather than assuming gzip.
                'content_encoding' => $result['compressed'] ? $result['algorithm'] : 'UTF-8',
                'expiration' => (string)($ttl * 1000)
            ]
        );

        $this->channel->basic_publish($message, '', $this->queueName);
    }

    /**
     * Scans the queue for a non-expired entry for $key.
     *
     * Entries that are inspected but kept (other keys, and the hit itself) are requeued, so
     * they stay available to later lookups. Expired entries for $key are acked (discarded)
     * and the scan continues in case a fresher entry follows.
     *
     * NOTE(review): a queue is a poor cache store — every lookup fetches and requeues
     * entries, O(queue length). Prefer Redis/Memcached for real use.
     *
     * @return mixed|null Cached response, or null when no fresh entry exists.
     */
    public function getCachedResponse($key)
    {
        $toRequeue = [];
        $response = null;

        try {
            while ($message = $this->channel->basic_get($this->queueName)) {
                $data = $this->decodeEntry($message);

                if (!is_array($data) || ($data['key'] ?? null) !== $key) {
                    // Requeue only after the scan; requeueing now would likely let
                    // basic_get() return the same message again.
                    $toRequeue[] = $message;
                    continue;
                }

                if (time() - ($data['cached_at'] ?? 0) < ($data['ttl'] ?? 0)) {
                    $response = $data['response'] ?? null;
                    $toRequeue[] = $message;  // keep the entry cached
                    break;
                }

                $message->ack();
            }
        } finally {
            // Unacked messages would otherwise stay invisible on this channel until it closes.
            foreach ($toRequeue as $pending) {
                $pending->nack(true);
            }
        }

        return $response;
    }

    /**
     * Decompresses (per content_encoding) and JSON-decodes a cache entry.
     * Returns null if the entry cannot be decoded.
     */
    private function decodeEntry($message)
    {
        $body = $message->getBody();
        $encoding = $message->has('content_encoding') ? $message->get('content_encoding') : null;

        $algorithms = [
            MultiAlgorithmCompressor::ALGO_GZIP,
            MultiAlgorithmCompressor::ALGO_ZLIB,
            MultiAlgorithmCompressor::ALGO_BZ2,
            MultiAlgorithmCompressor::ALGO_LZ4,
        ];

        if (in_array($encoding, $algorithms, true)) {
            try {
                $body = $this->compressor->decompress($body, $encoding);
            } catch (RuntimeException $e) {
                return null;
            }
            if (!is_string($body)) {
                return null;
            }
        }

        return json_decode($body, true);
    }
}

常见问题与解决方案

问题 1: 压缩后体积更大

症状: 小消息压缩后反而变大(gzip 格式自带至少 18 字节的头部和尾部,而短数据中可供压缩利用的重复内容又很少)

解决方案:

php
<?php

// Set a sensible compression threshold
class SmartCompressor
{
    private $threshold = 1024;  // only payloads of at least 1 KB are compressed

    /**
     * @param int $threshold Minimum JSON size in bytes before gzip is attempted.
     */
    public function __construct($threshold = 1024)
    {
        $this->threshold = $threshold;
    }

    /**
     * JSON-encodes $data and gzip-compresses it when that is worthwhile.
     *
     * Returns the plain JSON when it is below the threshold or when gzip would not make it
     * smaller (or fails); otherwise returns the gzip bytes. Use decompress() to read either form.
     *
     * @throws InvalidArgumentException If $data cannot be JSON-encoded.
     */
    public function compress($data)
    {
        $serialized = json_encode($data);
        if ($serialized === false) {
            throw new InvalidArgumentException('Unable to JSON-encode payload: ' . json_last_error_msg());
        }

        if (strlen($serialized) < $this->threshold) {
            return $serialized;  // not compressed
        }

        $compressed = gzencode($serialized, 6);

        // Fall back to the original when compression fails or does not shrink the data.
        if ($compressed === false || strlen($compressed) >= strlen($serialized)) {
            return $serialized;
        }

        return $compressed;
    }

    /**
     * Reverses compress(): returns the JSON text whether or not it was compressed.
     *
     * gzip output always starts with the magic bytes 1F 8B, and JSON text never starts
     * with byte 0x1F, so the prefix identifies compressed payloads unambiguously.
     *
     * @throws RuntimeException If the payload looks like gzip but cannot be decoded.
     */
    public function decompress($payload)
    {
        if (strncmp($payload, "\x1f\x8b", 2) !== 0) {
            return $payload;
        }

        $decoded = gzdecode($payload);
        if ($decoded === false) {
            throw new RuntimeException('Failed to gzip-decode payload');
        }

        return $decoded;
    }
}

问题 2: 解压失败

症状: 消费者解压时报错(例如消息体损坏,或 content_encoding 标记与实际使用的压缩算法不一致)

解决方案:

php
<?php

// Add validation: reject messages that cannot be decompressed or decoded.
$callback = function ($msg) {
    $body = $msg->getBody();
    // AMQPMessage::get() throws for properties that were never set, so check has() first.
    // Otherwise the error path below would itself throw when message_id is absent.
    $contentEncoding = $msg->has('content_encoding') ? $msg->get('content_encoding') : null;
    $messageId = $msg->has('message_id') ? $msg->get('message_id') : '(no message_id)';

    if ($contentEncoding === 'gzip') {
        // gzdecode() emits a warning on corrupt input; the false return is handled explicitly.
        $decompressed = @gzdecode($body);

        if ($decompressed === false) {
            // Decompression failed: log it and drop the message without requeue.
            error_log("解压失败: " . $messageId);
            $msg->reject(false);
            return;
        }

        $body = $decompressed;
    }

    $data = json_decode($body, true);
    if ($data === null && json_last_error() !== JSON_ERROR_NONE) {
        error_log("JSON 解析失败: " . $messageId . ' (' . json_last_error_msg() . ')');
        $msg->reject(false);
        return;
    }

    // Process the message
    processMessage($data);
    $msg->ack();
};

问题 3: CPU 开销大

症状: 压缩消耗大量 CPU

解决方案:

php
<?php

// Use a faster compression algorithm
class FastCompressor
{
    /**
     * Compresses $data favouring speed over ratio.
     *
     * Uses lz4_compress() when the LZ4 extension provides it (lz4 / snappy suit real-time
     * workloads); otherwise falls back to gzip at level 1, the fastest gzip level.
     *
     * NOTE: the returned bytes carry no marker of which algorithm produced them, so the
     * caller has to transmit that separately (e.g. via content_encoding).
     *
     * @param string $data Raw payload.
     * @return string|false Compressed bytes, or false if the underlying function fails.
     */
    public function compress($data)
    {
        $lz4Available = function_exists('lz4_compress');

        return $lz4Available ? lz4_compress($data) : gzencode($data, 1);
    }
}

最佳实践建议

  1. 设置压缩阈值: 小消息不压缩
  2. 选择合适算法: 根据场景选择压缩算法
  3. 监控压缩率: 记录压缩效果
  4. 处理解压异常: 添加错误处理
  5. 考虑 CPU 开销: 平衡压缩率和性能

相关链接