Appearance
消息压缩
概述
消息压缩是减少消息体积、降低网络传输开销和存储空间的重要手段。对于大消息或高频消息场景,压缩可以显著提升系统性能。
核心原理
压缩流程
mermaid
graph LR
subgraph 生产者
P[原始消息] --> C[压缩]
C --> S[发送]
end
subgraph 消费者
R[接收] --> D[解压]
D --> M[原始消息]
end
S --> R
压缩算法对比
| 算法 | 压缩比 | 速度 | 适用场景 |
|---|---|---|---|
| gzip | 高 | 中 | 文本、JSON |
| zlib | 高(与 gzip 同为 DEFLATE 算法) | 中(与 gzip 相当) | 通用,头尾开销比 gzip 略小 |
| bz2 | 最高 | 慢 | 大文件 |
| lz4 | 低 | 最快 | 实时场景 |
| snappy | 低 | 极快 | 高吞吐 |
压缩决策
mermaid
graph TD
M[消息] --> S{大小检查}
S -->|小于阈值| N[不压缩]
S -->|大于阈值| T{类型检查}
T -->|文本/JSON| G[gzip压缩]
T -->|二进制| L[lz4压缩]
T -->|大文件| B[bz2压缩]
PHP 代码示例
基本压缩与解压
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
/**
 * Threshold-based gzip compressor for AMQP message bodies.
 *
 * Non-string input is JSON-encoded first. Payloads shorter than THRESHOLD
 * bytes, or that would not shrink, are passed through uncompressed.
 */
class MessageCompressor
{
    const THRESHOLD = 1024; // only compress payloads of 1 KB or more
    const LEVEL = 6;        // gzip level, 1 (fastest) .. 9 (smallest)

    /**
     * Serialize and (when worthwhile) gzip $data.
     *
     * @param mixed $data string body, or any JSON-encodable value
     * @return array{body: string, compressed: bool, original_size?: int, compressed_size?: int}
     * @throws RuntimeException when JSON encoding or gzip compression fails
     */
    public static function compress($data)
    {
        $serialized = is_string($data) ? $data : json_encode($data);
        if ($serialized === false) {
            throw new RuntimeException('json_encode failed: ' . json_last_error_msg());
        }
        $originalSize = strlen($serialized);

        // Below the threshold the gzip header/trailer overhead outweighs any gain.
        if ($originalSize < self::THRESHOLD) {
            return [
                'body' => $serialized,
                'compressed' => false,
            ];
        }

        $compressed = gzencode($serialized, self::LEVEL);
        if ($compressed === false) {
            throw new RuntimeException('gzencode failed');
        }

        // Incompressible input (already-compressed or random bytes) can grow; send it raw.
        if (strlen($compressed) >= $originalSize) {
            return [
                'body' => $serialized,
                'compressed' => false,
            ];
        }

        return [
            'body' => $compressed,
            'compressed' => true,
            'original_size' => $originalSize,
            'compressed_size' => strlen($compressed),
        ];
    }

    /**
     * Reverse compress(): gunzip $data when $compressed is true.
     *
     * @throws RuntimeException when $data is not valid gzip data
     */
    public static function decompress($data, $compressed = true)
    {
        if (!$compressed) {
            return $data;
        }
        $decompressed = gzdecode($data);
        if ($decompressed === false) {
            throw new RuntimeException('gzdecode failed: body is not valid gzip data');
        }
        return $decompressed;
    }

    /**
     * Percentage of bytes saved, rounded to 2 decimals.
     *
     * Each argument may be either the payload itself (string) or its byte
     * size (int), so callers that only kept the sizes get a correct result
     * instead of the length of the number's decimal representation.
     * An empty original yields 0.0 rather than a division by zero.
     */
    public static function getCompressionRatio($original, $compressed)
    {
        $originalSize = is_int($original) ? $original : strlen($original);
        $compressedSize = is_int($compressed) ? $compressed : strlen($compressed);
        if ($originalSize === 0) {
            return 0.0;
        }
        return round((1 - $compressedSize / $originalSize) * 100, 2);
    }
}
// Usage example: publish one large JSON payload, gzip-compressed when worthwhile.
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$queueName = 'compressed-queue';
$channel->queue_declare($queueName, false, true, false, false);

// Build 1000 *distinct* records. array_fill() evaluates rand() once and copies
// the same record 1000 times, which overstates the achievable compression ratio.
$largeData = [
    'records' => array_map(
        function ($i) {
            return [
                'id' => rand(1, 10000),
                'name' => 'User ' . rand(1, 100),
                'email' => 'user' . rand(1, 100) . '@example.com',
            ];
        },
        range(1, 1000)
    ),
];

$compressed = MessageCompressor::compress($largeData);

$headers = ['x-compressed' => $compressed['compressed']];
if ($compressed['compressed']) {
    $headers['x-original-size'] = $compressed['original_size'];
}

$message = new AMQPMessage(
    $compressed['body'],
    [
        'content_type' => 'application/json',
        // content_encoding is a content-coding name ("gzip"), not a charset;
        // "identity" is the standard name for "no encoding applied".
        'content_encoding' => $compressed['compressed'] ? 'gzip' : 'identity',
        // The queue is durable, so make the message survive a broker restart too.
        'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
        // php-amqplib only transmits properties it knows: custom headers must be
        // sent as 'application_headers' (an AMQPTable); a 'headers' key is dropped.
        'application_headers' => new \PhpAmqpLib\Wire\AMQPTable($headers),
    ]
);
$channel->basic_publish($message, '', $queueName);

if ($compressed['compressed']) {
    // The result only carries byte counts, so compute the ratio from them directly.
    $ratio = round((1 - $compressed['compressed_size'] / $compressed['original_size']) * 100, 2);
    echo "消息已压缩,压缩率: {$ratio}%\n";
}

$channel->close();
$connection->close();
消费者解压处理
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// Consumer: decompress gzip bodies (flagged by content_encoding or the
// x-compressed header) and print how many records each message holds.
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$queueName = 'compressed-queue';
$channel->queue_declare($queueName, false, true, false, false);
echo "等待压缩消息...\n";

$callback = function (AMQPMessage $msg) {
    $body = $msg->getBody();
    // AMQPMessage::get() throws for an unset property, so probe with has() first.
    $contentEncoding = $msg->has('content_encoding') ? $msg->get('content_encoding') : null;
    $headers = $msg->has('application_headers')
        ? $msg->get('application_headers')->getNativeData()
        : [];

    // Resolve the header lookup on its own: `??` binds looser than `&&`, so the
    // original `$a && $h['x-compressed'] ?? false` warned on a missing key.
    $isCompressed = $contentEncoding === 'gzip' || !empty($headers['x-compressed']);

    if ($isCompressed) {
        echo "收到压缩消息,正在解压...\n";
        $body = gzdecode($body);
        if ($body === false) {
            // Corrupt payload: reject without requeue so it is not redelivered forever.
            error_log('gzdecode failed for delivery tag ' . $msg->getDeliveryTag());
            $msg->reject(false);
            return;
        }
    }

    $data = json_decode($body, true);
    echo "消息内容:\n";
    echo " 记录数: " . count($data['records'] ?? []) . "\n";
    echo "-------------------\n";
    $msg->ack();
};

$channel->basic_consume($queueName, '', false, false, false, false, $callback);
while ($channel->is_consuming()) {
    $channel->wait();
}
$channel->close();
$connection->close();
多算法压缩器
php
<?php
use PhpAmqpLib\Message\AMQPMessage;
/**
 * Compresses message bodies with a selectable codec (gzip, zlib, bz2, lz4).
 *
 * Payloads below the byte threshold, and payloads that would not shrink, are
 * returned uncompressed. bz2 and lz4 need the optional ext-bz2 / ext-lz4
 * extensions; a missing extension surfaces as a RuntimeException.
 */
class MultiAlgorithmCompressor
{
    const ALGO_GZIP = 'gzip';
    const ALGO_ZLIB = 'zlib';
    const ALGO_BZ2 = 'bz2';
    const ALGO_LZ4 = 'lz4';

    private $threshold;
    private $defaultAlgo;

    /**
     * @param int    $threshold   minimum serialized size (bytes) worth compressing
     * @param string $defaultAlgo codec used when compress() is given none
     */
    public function __construct($threshold = 1024, $defaultAlgo = self::ALGO_GZIP)
    {
        $this->threshold = $threshold;
        $this->defaultAlgo = $defaultAlgo;
    }

    /**
     * Serialize (JSON unless already a string) and compress $data.
     *
     * @return array{body: string, compressed: bool, algorithm: ?string,
     *               original_size?: int, compressed_size?: int, ratio?: float}
     * @throws RuntimeException         on JSON/codec failure or a missing extension
     * @throws InvalidArgumentException for an unknown algorithm
     */
    public function compress($data, $algo = null)
    {
        $algo = $algo ?? $this->defaultAlgo;
        $serialized = $this->serialize($data);
        $originalSize = strlen($serialized);
        $uncompressed = [
            'body' => $serialized,
            'compressed' => false,
            'algorithm' => null,
        ];

        if ($originalSize < $this->threshold) {
            return $uncompressed;
        }

        $compressed = $this->doCompress($serialized, $algo);
        $compressedSize = strlen($compressed);

        // Incompressible input (e.g. already-compressed bytes) can grow; keep it raw.
        if ($compressedSize >= $originalSize) {
            return $uncompressed;
        }

        return [
            'body' => $compressed,
            'compressed' => true,
            'algorithm' => $algo,
            'original_size' => $originalSize,
            'compressed_size' => $compressedSize,
            'ratio' => round((1 - $compressedSize / $originalSize) * 100, 2),
        ];
    }

    /**
     * Reverse compress(). A falsy $algo means the body was sent uncompressed.
     *
     * @throws RuntimeException         on corrupt data or a missing extension
     * @throws InvalidArgumentException for an unknown algorithm
     */
    public function decompress($data, $algo)
    {
        if (!$algo) {
            return $data;
        }
        return $this->doDecompress($data, $algo);
    }

    /** JSON-encode non-string input, failing loudly instead of passing false on. */
    private function serialize($data)
    {
        if (is_string($data)) {
            return $data;
        }
        $json = json_encode($data);
        if ($json === false) {
            throw new RuntimeException('json_encode failed: ' . json_last_error_msg());
        }
        return $json;
    }

    private function doCompress($data, $algo)
    {
        switch ($algo) {
            case self::ALGO_GZIP:
                $result = gzencode($data, 6);
                break;
            case self::ALGO_ZLIB:
                $result = zlib_encode($data, ZLIB_ENCODING_DEFLATE);
                break;
            case self::ALGO_BZ2:
                // Without this check a missing ext-bz2 is a fatal Error, which
                // selectBestAlgorithm()'s catch (Exception) cannot intercept.
                $this->requireFunction('bzcompress', 'BZ2 extension not installed');
                $result = bzcompress($data, 6);
                break;
            case self::ALGO_LZ4:
                $this->requireFunction('lz4_compress', 'LZ4 extension not installed');
                $result = lz4_compress($data);
                break;
            default:
                throw new InvalidArgumentException("Unknown algorithm: {$algo}");
        }
        // bzcompress() reports errors as an int error code; the others return false.
        if (!is_string($result)) {
            throw new RuntimeException("{$algo} compression failed");
        }
        return $result;
    }

    private function doDecompress($data, $algo)
    {
        switch ($algo) {
            case self::ALGO_GZIP:
                $result = gzdecode($data);
                break;
            case self::ALGO_ZLIB:
                $result = zlib_decode($data);
                break;
            case self::ALGO_BZ2:
                $this->requireFunction('bzdecompress', 'BZ2 extension not installed');
                $result = bzdecompress($data);
                break;
            case self::ALGO_LZ4:
                $this->requireFunction('lz4_uncompress', 'LZ4 extension not installed');
                $result = lz4_uncompress($data);
                break;
            default:
                throw new InvalidArgumentException("Unknown algorithm: {$algo}");
        }
        // bzdecompress() reports errors as an int error code; the others return false.
        if (!is_string($result)) {
            throw new RuntimeException("{$algo} decompression failed: corrupt or truncated data");
        }
        return $result;
    }

    /** Throw RuntimeException($message) unless $function is available. */
    private function requireFunction($function, $message)
    {
        if (!function_exists($function)) {
            throw new RuntimeException($message);
        }
    }

    /**
     * Try gzip, zlib and bz2 on $data and return the one with the smallest output.
     * Codecs that fail or are not installed are skipped; if all fail, the default is returned.
     */
    public function selectBestAlgorithm($data)
    {
        $serialized = $this->serialize($data);
        $results = [];
        foreach ([self::ALGO_GZIP, self::ALGO_ZLIB, self::ALGO_BZ2] as $algo) {
            try {
                $results[$algo] = strlen($this->doCompress($serialized, $algo));
            } catch (Exception $e) {
                continue;
            }
        }
        if (empty($results)) {
            return $this->defaultAlgo;
        }
        return array_keys($results, min($results))[0];
    }
}
压缩消息工厂
php
<?php
use PhpAmqpLib\Message\AMQPMessage;
/**
 * Builds AMQP messages whose JSON body is compressed by MultiAlgorithmCompressor,
 * and decodes such messages back into arrays.
 */
class CompressedMessageFactory
{
    private $compressor;

    /** @param int $threshold minimum serialized size (bytes) worth compressing */
    public function __construct($threshold = 1024)
    {
        $this->compressor = new MultiAlgorithmCompressor($threshold);
    }

    /**
     * Create a persistent JSON message, compressed with $algo (or the default codec).
     * Compressed messages carry the codec in content_encoding and in x-* headers.
     */
    public function create($data, $algo = null)
    {
        $result = $this->compressor->compress($data, $algo);
        $properties = [
            'content_type' => 'application/json',
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
        ];
        if ($result['compressed']) {
            $properties['content_encoding'] = $result['algorithm'];
            // php-amqplib silently drops unknown property names such as 'headers';
            // custom headers must be sent as 'application_headers' (an AMQPTable).
            $properties['application_headers'] = new \PhpAmqpLib\Wire\AMQPTable([
                'x-compressed' => true,
                'x-compression-algorithm' => $result['algorithm'],
                'x-original-size' => $result['original_size'],
            ]);
        }
        return new AMQPMessage($result['body'], $properties);
    }

    /**
     * Decompress (if needed) and JSON-decode a message produced by create().
     *
     * @return mixed decoded JSON (associative arrays), or null if the body is not valid JSON
     * @throws RuntimeException on corrupt compressed data
     */
    public function extract(AMQPMessage $msg)
    {
        $body = $msg->getBody();
        $headers = $msg->has('application_headers')
            ? $msg->get('application_headers')->getNativeData()
            : [];

        $algorithm = !empty($headers['x-compressed'])
            ? ($headers['x-compression-algorithm'] ?? null)
            : null;

        // Fall back to the standard content_encoding property for producers that
        // set only that; ignore non-codec values such as "identity".
        if ($algorithm === null && $msg->has('content_encoding')) {
            $encoding = $msg->get('content_encoding');
            $known = [
                MultiAlgorithmCompressor::ALGO_GZIP,
                MultiAlgorithmCompressor::ALGO_ZLIB,
                MultiAlgorithmCompressor::ALGO_BZ2,
                MultiAlgorithmCompressor::ALGO_LZ4,
            ];
            if (in_array($encoding, $known, true)) {
                $algorithm = $encoding;
            }
        }

        if ($algorithm !== null) {
            $body = $this->compressor->decompress($body, $algorithm);
        }
        return json_decode($body, true);
    }
}
实际应用场景
1. 日志批量传输
php
<?php
/**
 * Ships batches of log entries as one persistent message (gzip-compressed when
 * the payload is at least 512 bytes and actually shrinks), and consumes them back.
 */
class LogBatchTransporter
{
    private $channel;
    private $queueName;
    private $compressor;
    // NOTE(review): not read anywhere in this class; sendBatch() does not cap batch size.
    private $batchSize = 1000;

    public function __construct($channel, $queueName = 'log-batch')
    {
        $this->channel = $channel;
        $this->queueName = $queueName;
        $this->compressor = new MultiAlgorithmCompressor(512); // compress payloads of 512 B or more
        $this->setupQueue();
    }

    private function setupQueue()
    {
        $this->channel->queue_declare($this->queueName, false, true, false, false);
    }

    /** Publish all $logs as a single message and report the compression ratio. */
    public function sendBatch(array $logs)
    {
        $result = $this->compressor->compress([
            'logs' => $logs,
            'count' => count($logs),
            'sent_at' => time(),
        ], MultiAlgorithmCompressor::ALGO_GZIP);

        $message = new AMQPMessage(
            $result['body'],
            [
                'content_type' => 'application/json',
                // Name the codec actually applied; "identity" means no encoding
                // ("UTF-8" is a charset, not a content-coding).
                'content_encoding' => $result['compressed'] ? $result['algorithm'] : 'identity',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            ]
        );
        $this->channel->basic_publish($message, '', $this->queueName);

        if ($result['compressed']) {
            echo sprintf(
                "发送 %d 条日志,压缩率: %.2f%%\n",
                count($logs),
                $result['ratio']
            );
        }
    }

    /**
     * Register a consumer that passes each batch's log array to $handler.
     * Batches that cannot be decompressed or decoded are rejected without requeue.
     */
    public function consumeBatches(callable $handler)
    {
        $callback = function ($msg) use ($handler) {
            $body = $msg->getBody();
            // AMQPMessage::get() throws for an unset property, so probe with has() first.
            $contentEncoding = $msg->has('content_encoding') ? $msg->get('content_encoding') : null;
            if ($contentEncoding === 'gzip') {
                $body = gzdecode($body);
            }

            $data = is_string($body) ? json_decode($body, true) : null;
            if (!is_array($data) || !is_array($data['logs'] ?? null)) {
                // A poison message would otherwise crash the handler on every redelivery.
                error_log('LogBatchTransporter: undecodable batch, delivery tag ' . $msg->getDeliveryTag());
                $msg->reject(false);
                return;
            }

            $handler($data['logs']);
            $msg->ack();
        };
        $this->channel->basic_consume($this->queueName, '', false, false, false, false, $callback);
    }
}
2. 大文件分片传输
php
<?php
/**
 * Sends a file as gzip-compressed 1 MB chunks on the 'file-chunks' queue and
 * reassembles it on the receiving side.
 *
 * NOTE(review): receiveFile() buffers every chunk of the file in memory.
 */
class ChunkedFileTransporter
{
    private $channel;
    private $chunkSize = 1024 * 1024; // 1MB
    private $queueName = 'file-chunks';

    public function __construct($channel)
    {
        $this->channel = $channel;
    }

    /**
     * Publish $filePath in chunks and return the generated file id.
     *
     * @throws RuntimeException when the file cannot be read
     */
    public function sendFile($filePath, $metadata = [])
    {
        $fileSize = filesize($filePath);
        if ($fileSize === false) {
            throw new RuntimeException("Cannot stat file: {$filePath}");
        }
        $handle = fopen($filePath, 'rb');
        if ($handle === false) {
            throw new RuntimeException("Cannot open file: {$filePath}");
        }

        $fileId = uniqid('file-', true);
        // At least one chunk so an empty file still round-trips; (int) because ceil() returns float.
        $totalChunks = max(1, (int) ceil($fileSize / $this->chunkSize));
        $this->channel->queue_declare($this->queueName, false, true, false, false);

        try {
            // Loop on the known chunk count instead of feof(): feof() only turns true
            // after a read has hit EOF, which produced an extra empty chunk whenever
            // the file size was an exact multiple of chunkSize.
            for ($chunkIndex = 0; $chunkIndex < $totalChunks; $chunkIndex++) {
                $chunk = fread($handle, $this->chunkSize);
                if ($chunk === false || ($chunk === '' && $fileSize > 0)) {
                    throw new RuntimeException("Read failed or file shrank: {$filePath}");
                }
                $this->publishChunk($fileId, $chunkIndex, $totalChunks, $fileSize, $metadata, $chunk);
            }
        } finally {
            fclose($handle);
        }

        echo "文件已分片发送: {$fileId}, 共 {$totalChunks} 个分片\n";
        return $fileId;
    }

    /** Gzip one chunk and publish it with the headers receiveFile() reassembles from. */
    private function publishChunk($fileId, $chunkIndex, $totalChunks, $fileSize, $metadata, $chunk)
    {
        $message = new AMQPMessage(
            gzencode($chunk, 6),
            [
                'content_type' => 'application/octet-stream',
                'content_encoding' => 'gzip',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                // php-amqplib drops a 'headers' key; custom headers must be an
                // AMQPTable under 'application_headers'.
                'application_headers' => new \PhpAmqpLib\Wire\AMQPTable([
                    'x-file-id' => $fileId,
                    'x-chunk-index' => $chunkIndex,
                    'x-total-chunks' => $totalChunks,
                    'x-file-size' => $fileSize,
                    // JSON keeps arbitrary metadata in a single string header.
                    'x-metadata' => json_encode($metadata),
                ]),
            ]
        );
        $this->channel->basic_publish($message, '', $this->queueName);
    }

    /**
     * Collect every chunk of $fileId from the queue and write the file to $outputPath.
     *
     * This file's chunks are acked only after the file is written; on failure
     * they are requeued. Chunks of other files are always requeued.
     *
     * @throws RuntimeException when chunks are missing/corrupt or the output cannot be written
     */
    public function receiveFile($fileId, $outputPath)
    {
        $chunks = [];
        $totalChunks = null;
        $ownTags = [];
        $otherTags = [];

        // Drain the queue. The broker does not hand fetched-but-unacked messages
        // out again on this channel, so the loop ends once every pending message was seen.
        while ($message = $this->channel->basic_get($this->queueName)) {
            $headers = $message->has('application_headers')
                ? $message->get('application_headers')->getNativeData()
                : [];
            if (($headers['x-file-id'] ?? null) !== $fileId) {
                $otherTags[] = $message->getDeliveryTag();
                continue;
            }
            $ownTags[] = $message->getDeliveryTag();
            $totalChunks = (int) $headers['x-total-chunks'];
            // gzdecode() returns false on corrupt data; writeChunks() rejects that.
            $chunks[(int) $headers['x-chunk-index']] = gzdecode($message->getBody());
        }

        try {
            $this->writeChunks($fileId, $outputPath, $chunks, $totalChunks);
        } catch (Exception $e) {
            // Put this file's chunks back so the transfer can be retried.
            $this->requeueAll($ownTags);
            throw $e;
        } finally {
            // Chunks belonging to other files must not stay unacked on this channel.
            $this->requeueAll($otherTags);
        }

        foreach ($ownTags as $tag) {
            $this->channel->basic_ack($tag);
        }
        echo "文件已接收: {$outputPath}\n";
    }

    /** Verify chunks 0..$totalChunks-1 all arrived intact, then write them in order. */
    private function writeChunks($fileId, $outputPath, array $chunks, $totalChunks)
    {
        if ($totalChunks === null) {
            throw new RuntimeException("No chunks found for file {$fileId}");
        }
        for ($i = 0; $i < $totalChunks; $i++) {
            if (!isset($chunks[$i]) || $chunks[$i] === false) {
                throw new RuntimeException("File {$fileId}: chunk {$i} missing or corrupt");
            }
        }

        $handle = fopen($outputPath, 'wb');
        if ($handle === false) {
            throw new RuntimeException("Cannot open output file: {$outputPath}");
        }
        try {
            for ($i = 0; $i < $totalChunks; $i++) {
                if (fwrite($handle, $chunks[$i]) !== strlen($chunks[$i])) {
                    throw new RuntimeException("Short write to {$outputPath}");
                }
            }
        } finally {
            fclose($handle);
        }
    }

    /** Negatively acknowledge each delivery tag with requeue=true. */
    private function requeueAll(array $tags)
    {
        foreach ($tags as $tag) {
            $this->channel->basic_nack($tag, false, true);
        }
    }
}
3. API 响应缓存
php
<?php
/**
 * Stores API responses as compressed messages with a per-message TTL on the
 * 'api-cache' queue and looks them up by key.
 *
 * NOTE(review): every lookup scans the whole queue (O(n) round-trips); a
 * key-value store is the usual choice for a cache — this is a demo pattern.
 */
class CompressedCacheService
{
    private $channel;
    private $queueName;
    private $compressor;

    public function __construct($channel)
    {
        $this->channel = $channel;
        $this->queueName = 'api-cache';
        $this->compressor = new MultiAlgorithmCompressor(256);
        $this->setupQueue();
    }

    private function setupQueue()
    {
        $this->channel->queue_declare($this->queueName, false, true, false, false);
    }

    /** Publish $response under $key; the broker drops it after $ttl seconds. */
    public function cacheResponse($key, $response, $ttl = 3600)
    {
        $result = $this->compressor->compress([
            'key' => $key,
            'response' => $response,
            'cached_at' => time(),
            'ttl' => $ttl,
        ]);

        $message = new AMQPMessage(
            $result['body'],
            [
                'content_type' => 'application/json',
                // Name the codec actually applied; "identity" means no encoding.
                'content_encoding' => $result['compressed'] ? $result['algorithm'] : 'identity',
                'expiration' => (string) ($ttl * 1000),
            ]
        );
        $this->channel->basic_publish($message, '', $this->queueName);
    }

    /**
     * Return the cached response for $key, or null when absent or expired.
     *
     * Every message fetched during the scan is requeued afterwards, so a lookup
     * does not consume the cache. Previously fetched messages stayed unacked,
     * which hid them from all later lookups on this channel.
     */
    public function getCachedResponse($key)
    {
        $fetchedTags = [];
        $found = null;
        try {
            while ($message = $this->channel->basic_get($this->queueName)) {
                $fetchedTags[] = $message->getDeliveryTag();

                $body = $message->getBody();
                // AMQPMessage::get() throws for an unset property, so probe with has() first.
                $contentEncoding = $message->has('content_encoding') ? $message->get('content_encoding') : null;
                if ($contentEncoding === 'gzip') {
                    $body = gzdecode($body);
                }
                $data = is_string($body) ? json_decode($body, true) : null;

                if (!is_array($data) || ($data['key'] ?? null) !== $key) {
                    continue;
                }
                // Matching entry: use it only if still fresh; either way stop scanning.
                if (time() - $data['cached_at'] < $data['ttl']) {
                    $found = $data['response'];
                }
                break;
            }
        } finally {
            foreach ($fetchedTags as $tag) {
                $this->channel->basic_nack($tag, false, true);
            }
        }
        return $found;
    }
}
常见问题与解决方案
问题 1: 压缩后体积更大
症状: 小消息压缩后反而变大
解决方案:
php
<?php
// Use a sensible compression threshold so small payloads are never inflated.
class SmartCompressor
{
    private $threshold = 1024; // compress only payloads of 1 KB or more

    /**
     * JSON-encode $data and gzip it (level 6) only when that actually helps.
     *
     * Returns the plain JSON when it is below the threshold or when the gzip
     * output would be no smaller; otherwise returns the gzip bytes.
     */
    public function compress($data)
    {
        $json = json_encode($data);
        $jsonLength = strlen($json);

        if ($jsonLength >= $this->threshold) {
            $gzipped = gzencode($json, 6);
            if (strlen($gzipped) < $jsonLength) {
                return $gzipped;
            }
        }

        return $json;
    }
}
问题 2: 解压失败
症状: 消费者解压时报错
解决方案:
php
<?php
// Add validation: reject (without requeue) messages whose body cannot be
// decompressed or decoded, instead of crashing the consumer.
$callback = function ($msg) {
    $body = $msg->getBody();
    // AMQPMessage::get() throws OutOfBoundsException for an unset property, so
    // probe optional properties with has(); message_id is optional in AMQP.
    $contentEncoding = $msg->has('content_encoding') ? $msg->get('content_encoding') : null;
    $messageRef = $msg->has('message_id')
        ? $msg->get('message_id')
        : 'delivery-tag ' . $msg->getDeliveryTag();

    if ($contentEncoding === 'gzip') {
        // gzdecode() emits a warning on corrupt input; capture its text for the
        // log instead of discarding it with @.
        $gzError = '';
        set_error_handler(function ($errno, $errstr) use (&$gzError) {
            $gzError = $errstr;
            return true;
        });
        try {
            $decompressed = gzdecode($body);
        } finally {
            restore_error_handler();
        }
        if ($decompressed === false) {
            error_log("解压失败: " . $messageRef . ($gzError !== '' ? " ({$gzError})" : ''));
            $msg->reject(false);
            return;
        }
        $body = $decompressed;
    }

    $data = json_decode($body, true);
    if (json_last_error() !== JSON_ERROR_NONE) {
        error_log("JSON 解析失败: " . $messageRef . ' (' . json_last_error_msg() . ')');
        $msg->reject(false);
        return;
    }

    // Process the message (processMessage is the application's handler).
    processMessage($data);
    $msg->ack();
};
问题 3: CPU 开销大
症状: 压缩消耗大量 CPU
解决方案:
php
<?php
// Use a faster compression algorithm for latency-sensitive paths.
class FastCompressor
{
    /**
     * Compress $data with lz4 when ext-lz4 is loaded, otherwise with gzip at
     * level 1 (its fastest setting). snappy would be another fast option.
     *
     * The output carries no marker of which codec produced it, so producers
     * should publish algorithm() alongside it (e.g. as content_encoding) for
     * consumers to pick the matching decoder.
     *
     * @param string $data
     * @return string
     * @throws RuntimeException when the selected codec fails
     */
    public function compress($data)
    {
        $algorithm = $this->algorithm();
        $result = $algorithm === 'lz4'
            ? lz4_compress($data)
            : gzencode($data, 1); // level 1 = fastest gzip
        if ($result === false) {
            throw new RuntimeException("{$algorithm} compression failed");
        }
        return $result;
    }

    /**
     * Codec compress() uses in this process: 'lz4' if ext-lz4 is available, else 'gzip'.
     */
    public function algorithm()
    {
        return function_exists('lz4_compress') ? 'lz4' : 'gzip';
    }
}
最佳实践建议
- 设置压缩阈值: 小消息不压缩
- 选择合适算法: 根据场景选择压缩算法
- 监控压缩率: 记录压缩效果
- 处理解压异常: 添加错误处理
- 考虑 CPU 开销: 平衡压缩率和性能
