Skip to content

流队列

概述

流队列(Stream Queue)是 RabbitMQ 3.9.0 引入的一种新型队列,底层是一个仅追加(append-only)的日志。它提供高吞吐的消息存储和消费能力:消息持久化存储,被消费后不会删除,可被多个消费者从任意位置重复读取,非常适合事件溯源、日志处理等场景。

核心原理

流队列特点

mermaid
graph TD
    subgraph 流队列架构
        P1[生产者1] --> S[Stream Queue]
        P2[生产者2] --> S
        P3[生产者3] --> S
        
        S --> C1[消费者1<br/>offset: 0]
        S --> C2[消费者2<br/>offset: 100]
        S --> C3[消费者3<br/>offset: latest]
    end
    
    subgraph 消息存储
        S --> D1[Segment 1<br/>0-999]
        S --> D2[Segment 2<br/>1000-1999]
        S --> D3[Segment 3<br/>2000-...]
    end
    
    style S fill:#f9f,stroke:#333

流队列 vs 传统队列

| 特性 | 流队列 | 传统队列 |
| --- | --- | --- |
| 消息保留 | 基于时间/大小 | 消费后删除 |
| 消费模式 | 支持重复消费 | 一次性消费 |
| 消费位置 | 支持 offset | 仅支持头部 |
| 性能 | 极高 | — |
| 适用场景 | 日志、事件溯源 | 任务队列 |

消费者 Offset 类型

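| Offset 类型 | 说明 |
| --- | --- |
| first | 从流中仍保留的第一条消息开始 |
| last | 从最后写入的 chunk 开始(一个 chunk 可能包含多条消息) |
| next | 只接收订阅之后新写入的消息 |
| 时间戳(AMQP timestamp 类型,单位为秒) | 从指定时间点开始 |
| 整数 offset(从 0 开始) | 从指定 offset 开始 |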

PHP 代码示例

声明流队列

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

// Declares a stream queue with size- and time-based retention.
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$streamName = 'events-stream';

// RabbitMQ requires streams to be durable and neither exclusive nor auto-delete.
$channel->queue_declare(
    $streamName,
    false,  // passive
    true,   // durable
    false,  // exclusive
    false,  // auto_delete
    false,  // nowait
    new AMQPTable([
        'x-queue-type' => 'stream',
        'x-max-length-bytes' => 10737418240,            // 10GB total size cap
        'x-stream-max-segment-size-bytes' => 52428800,  // 50MB per segment file
        // Time-based retention is x-max-age (units: Y, M, D, h, m, s).
        // "x-stream-retention-period" is not a recognised stream argument.
        // Retention removes whole segments, so data can briefly outlive the limit.
        'x-max-age' => '7D',
    ])
);

echo "流队列已创建: {$streamName}\n";

$channel->close();
$connection->close();

发送消息到流队列

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

// Publishes a batch of JSON events to the stream and waits for publisher confirms.
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$streamName = 'events-stream';

// The stream (with its retention arguments) is created by the declaration script.
// A passive declare only checks that it exists. A non-passive declare with
// different arguments than the existing queue fails with PRECONDITION_FAILED.
$channel->queue_declare($streamName, true);

// Publisher confirms tell us whether the broker actually stored each message.
$nacked = 0;
$channel->set_nack_handler(function (AMQPMessage $message) use (&$nacked) {
    $nacked++;
});
$channel->confirm_select();

$actions = ['click', 'view', 'purchase'];
$total = 1000;

for ($eventId = 1; $eventId <= $total; $eventId++) {
    $payload = [
        'event_id' => $eventId,
        'type' => 'user_action',
        'data' => [
            'user_id' => random_int(1, 100),
            'action' => $actions[random_int(0, count($actions) - 1)],
        ],
        'timestamp' => time(),
    ];

    $channel->basic_publish(
        new AMQPMessage(json_encode($payload), ['content_type' => 'application/json']),
        '',
        $streamName
    );
}

// Block until the broker has acked (or nacked) every outstanding publish.
$channel->wait_for_pending_acks(5.0);

if ($nacked > 0) {
    fwrite(STDERR, "{$nacked} 条消息被 broker 拒绝 (nack)\n");
}

echo "已发送 {$total} 条消息到流队列\n";

$channel->close();
$connection->close();

从流队列消费消息

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

// Reads the stream from the first retained message and prints each event.
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$streamName = 'events-stream';

echo "从流队列消费消息...\n";

// Stream consumers must set a prefetch limit. The broker delivers at most this
// many unacknowledged messages at a time.
$channel->basic_qos(0, 100, false);

$callback = function (AMQPMessage $msg) {
    $data = json_decode($msg->getBody(), true);

    echo sprintf(
        "[%d] 事件: %s, 用户: %d\n",
        $data['event_id'],
        $data['type'],
        $data['data']['user_id']
    );

    // Stream consumers must still ack. The ack frees prefetch credit so delivery
    // continues; it does NOT remove the message from the stream.
    $msg->ack();
};

$channel->basic_consume(
    $streamName,
    'consumer-1',
    false,  // no_local
    false,  // no_ack: streams require manual acknowledgement
    false,  // exclusive
    false,  // nowait
    $callback,
    null,
    new AMQPTable([
        'x-stream-offset' => 'first',  // start at the first message still retained
    ])
);

while ($channel->is_consuming()) {
    $channel->wait();
}

$channel->close();
$connection->close();

从指定 Offset 消费

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

// Attaches to the stream at a numeric offset and prints each delivery's offset.
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$streamName = 'events-stream';

// Offsets are 0-based, so 500 is the 501st message ever written to the stream
// (if it has not yet been removed by retention). It must be sent as an integer.
$offset = 500;

// Stream consumers must set a prefetch limit and ack manually.
$channel->basic_qos(0, 100, false);

// RabbitMQ puts each delivery's stream position in the "x-stream-offset" header.
$readStreamOffset = function (AMQPMessage $msg) {
    if (!$msg->has('application_headers')) {
        return null;
    }
    $headers = $msg->get('application_headers')->getNativeData();

    return $headers['x-stream-offset'] ?? null;
};

$callback = function (AMQPMessage $msg) use ($readStreamOffset) {
    $data = json_decode($msg->getBody(), true);
    $streamOffset = $readStreamOffset($msg);

    echo sprintf(
        "Offset: %s, 事件: %d, 类型: %s\n",
        $streamOffset === null ? '?' : $streamOffset,
        $data['event_id'],
        $data['type']
    );

    // The ack only releases prefetch credit; the message stays in the stream.
    $msg->ack();
};

$channel->basic_consume(
    $streamName,
    'consumer-offset',
    false,  // no_local
    false,  // no_ack: streams require manual acknowledgement
    false,  // exclusive
    false,  // nowait
    $callback,
    null,
    new AMQPTable([
        'x-stream-offset' => $offset,
    ])
);

while ($channel->is_consuming()) {
    $channel->wait();
}

$channel->close();
$connection->close();

从指定时间戳消费

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;

// Attaches to the stream at a point in time (one hour ago) and prints event times.
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$streamName = 'events-stream';

// AMQP timestamps are whole seconds since the Unix epoch, not milliseconds.
// The value must be sent with the AMQP timestamp field type; a plain integer
// would be interpreted as a numeric offset instead.
$since = time() - 3600;

$arguments = new AMQPTable();
$arguments->set('x-stream-offset', $since, AMQPTable::T_TIMESTAMP);

// Stream consumers must set a prefetch limit and ack manually.
$channel->basic_qos(0, 100, false);

$callback = function ($msg) {
    $data = json_decode($msg->getBody(), true);
    echo "事件时间: " . date('Y-m-d H:i:s', $data['timestamp']) . "\n";

    // The ack only releases prefetch credit; the message stays in the stream.
    $msg->ack();
};

$channel->basic_consume(
    $streamName,
    'consumer-timestamp',
    false,  // no_local
    false,  // no_ack: streams require manual acknowledgement
    false,  // exclusive
    false,  // nowait
    $callback,
    null,
    $arguments
);

while ($channel->is_consuming()) {
    $channel->wait();
}

$channel->close();
$connection->close();

流队列管理类

php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exception\AMQPTimeoutException;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

/**
 * Helper around a php-amqplib channel for declaring, publishing to and
 * consuming from RabbitMQ stream queues. Message bodies are JSON.
 */
class StreamQueueManager
{
    /** Maximum number of unacknowledged deliveries a stream consumer may hold. */
    const DEFAULT_PREFETCH = 100;

    /** @var mixed An open php-amqplib channel (presumably AMQPChannel — confirm at call sites). */
    private $channel;

    /**
     * @param mixed $channel Channel used for every declare/publish/consume call.
     */
    public function __construct($channel)
    {
        $this->channel = $channel;
    }

    /**
     * Declares a durable stream queue.
     *
     * Supported $options:
     *  - maxLengthBytes:  total size cap (x-max-length-bytes)
     *  - maxSegmentSize:  segment file size in bytes (x-stream-max-segment-size-bytes)
     *  - retentionPeriod: maximum age, in milliseconds (int) or as a RabbitMQ
     *                     duration string such as '7D'; sent as x-max-age
     *
     * @param string $streamName
     * @param array  $options
     * @return string The stream name.
     */
    public function declareStream($streamName, array $options = [])
    {
        $arguments = ['x-queue-type' => 'stream'];

        if (isset($options['maxLengthBytes'])) {
            $arguments['x-max-length-bytes'] = $options['maxLengthBytes'];
        }

        if (isset($options['maxSegmentSize'])) {
            $arguments['x-stream-max-segment-size-bytes'] = $options['maxSegmentSize'];
        }

        if (isset($options['retentionPeriod'])) {
            // Time-based retention for streams is x-max-age (a duration string);
            // "x-stream-retention-period" is not a recognised argument.
            $arguments['x-max-age'] = self::maxAgeFromRetention($options['retentionPeriod']);
        }

        $this->channel->queue_declare(
            $streamName,
            false,  // passive
            true,   // durable (required for streams)
            false,  // exclusive
            false,  // auto_delete
            false,  // nowait
            new AMQPTable($arguments)
        );

        return $streamName;
    }

    /**
     * Publishes $data as a JSON message to the stream via the default exchange.
     *
     * @param string $streamName
     * @param mixed  $data JSON-encodable payload.
     */
    public function publish($streamName, $data)
    {
        $message = new AMQPMessage(
            json_encode($data),
            ['content_type' => 'application/json']
        );

        $this->channel->basic_publish($message, '', $streamName);
    }

    /**
     * Consumes the stream from $offset, calling $handler($data, $msg) for each
     * delivery ($data is the JSON-decoded body). The call blocks while the
     * channel is consuming. If $idleTimeout > 0, the consumer is cancelled and
     * the call returns once no delivery has arrived for that many seconds.
     *
     * @param string     $streamName
     * @param string     $consumerTag
     * @param string|int $offset 'first', 'last', 'next', an interval string, or a
     *                           numeric offset (numeric strings are sent as int).
     * @param callable   $handler
     * @param int|float  $idleTimeout Seconds of silence before stopping; 0 = never.
     */
    public function consume($streamName, $consumerTag, $offset, callable $handler, $idleTimeout = 0)
    {
        $arguments = new AMQPTable(['x-stream-offset' => self::normalizeOffset($offset)]);

        $this->runConsumer($streamName, $consumerTag, $arguments, $handler, $idleTimeout);
    }

    /** Consumes from the first message still retained in the stream. */
    public function consumeFromFirst($streamName, $consumerTag, callable $handler, $idleTimeout = 0)
    {
        $this->consume($streamName, $consumerTag, 'first', $handler, $idleTimeout);
    }

    /** Consumes from the last chunk written to the stream. */
    public function consumeFromLast($streamName, $consumerTag, callable $handler, $idleTimeout = 0)
    {
        $this->consume($streamName, $consumerTag, 'last', $handler, $idleTimeout);
    }

    /**
     * Consumes from a point in time.
     *
     * @param int $timestamp Unix timestamp in seconds. It is sent with the AMQP
     *                       timestamp field type, which is also in seconds; a
     *                       plain integer would be read as a numeric offset.
     */
    public function consumeFromTimestamp($streamName, $consumerTag, $timestamp, callable $handler, $idleTimeout = 0)
    {
        $arguments = new AMQPTable();
        $arguments->set('x-stream-offset', (int) $timestamp, AMQPTable::T_TIMESTAMP);

        $this->runConsumer($streamName, $consumerTag, $arguments, $handler, $idleTimeout);
    }

    /**
     * Converts an offset spec to the type RabbitMQ expects. Digit-only strings
     * (e.g. read back from Redis) become ints; everything else is passed through.
     *
     * @param string|int $offset
     * @return string|int
     */
    public static function normalizeOffset($offset)
    {
        if (is_string($offset) && preg_match('/^\d+$/', $offset) === 1) {
            return (int) $offset;
        }

        return $offset;
    }

    /**
     * Converts a retention value to an x-max-age duration string.
     * Non-numeric strings (e.g. '7D') are returned unchanged. Numbers are read
     * as milliseconds, rounded up to whole seconds, and expressed in the
     * largest exact unit (D, h, m or s).
     *
     * @param string|int $retention
     * @return string
     */
    public static function maxAgeFromRetention($retention)
    {
        if (is_string($retention) && preg_match('/^\d+$/', $retention) !== 1) {
            return $retention;
        }

        $seconds = max(1, intdiv((int) $retention + 999, 1000));

        foreach (['D' => 86400, 'h' => 3600, 'm' => 60] as $unit => $unitSeconds) {
            if ($seconds % $unitSeconds === 0) {
                return intdiv($seconds, $unitSeconds) . $unit;
            }
        }

        return $seconds . 's';
    }

    /** Starts a manual-ack stream consumer and runs the wait loop. */
    private function runConsumer($streamName, $consumerTag, AMQPTable $arguments, callable $handler, $idleTimeout)
    {
        // Stream consumers must set a prefetch limit and use manual acks.
        $this->channel->basic_qos(0, self::DEFAULT_PREFETCH, false);

        $callback = function ($msg) use ($handler) {
            $data = json_decode($msg->getBody(), true);
            $handler($data, $msg);
            // The ack frees prefetch credit; the message stays in the stream.
            $msg->ack();
        };

        $tag = $this->channel->basic_consume(
            $streamName,
            $consumerTag,
            false,  // no_local
            false,  // no_ack: manual acknowledgement
            false,  // exclusive
            false,  // nowait
            $callback,
            null,
            $arguments
        );

        while ($this->channel->is_consuming()) {
            try {
                $this->channel->wait(null, false, $idleTimeout);
            } catch (AMQPTimeoutException $e) {
                if ($idleTimeout <= 0) {
                    throw $e;  // not our idle timeout (e.g. connection read timeout)
                }
                // No delivery within $idleTimeout: treat the stream as caught up.
                $this->channel->basic_cancel($tag);
                return;
            }
        }
    }
}

实际应用场景

1. 事件溯源

php
<?php

/**
 * Event store backed by a RabbitMQ stream. Events are appended as JSON
 * messages, and an aggregate's history is rebuilt by reading the stream
 * from the beginning.
 */
class EventStore
{
    /** @var StreamQueueManager */
    private $streamManager;

    /** @var string */
    private $streamName = 'domain-events';

    /**
     * @param mixed $channel Open AMQP channel handed to StreamQueueManager.
     */
    public function __construct($channel)
    {
        $this->streamManager = new StreamQueueManager($channel);

        $this->streamManager->declareStream($this->streamName, [
            'maxLengthBytes' => 107374182400,  // 100GB
            'maxSegmentSize' => 104857600,     // 100MB
            'retentionPeriod' => 2592000000,   // 30 days, in milliseconds
        ]);
    }

    /**
     * Appends one event. 'timestamp' is the Unix time (seconds) at publish.
     */
    public function appendEvent($aggregateId, $eventType, $payload, $version)
    {
        $this->streamManager->publish($this->streamName, [
            'aggregate_id' => $aggregateId,
            'event_type' => $eventType,
            'payload' => $payload,
            'version' => $version,
            'timestamp' => time(),
        ]);
    }

    /**
     * Collects the events of $aggregateId whose version is >= $fromVersion,
     * reading from the first retained message.
     *
     * NOTE(review): consumeFromFirst() keeps waiting for deliveries while the
     * channel is consuming, and nothing here cancels the consumer. This method
     * therefore only returns once consumption is stopped elsewhere. A stream
     * has no "end", so a real replay needs an explicit stop condition
     * (e.g. an idle timeout).
     *
     * @return array List of decoded event arrays.
     */
    public function replayEvents($aggregateId, $fromVersion = 0)
    {
        $events = [];

        $this->streamManager->consumeFromFirst(
            $this->streamName,
            'replay-' . $aggregateId,
            function ($data) use ($aggregateId, $fromVersion, &$events) {
                // Skip bodies that are not JSON objects or lack the fields we filter on.
                // Without the isset check, a missing version would compare as
                // null >= 0, which is true.
                if (!is_array($data) || !isset($data['aggregate_id'], $data['version'])) {
                    return;
                }
                if ($data['aggregate_id'] === $aggregateId && $data['version'] >= $fromVersion) {
                    $events[] = $data;
                }
            }
        );

        return $events;
    }

    /**
     * Streams events starting from the last chunk of the stream, calling
     * $handler($data, $msg) for each one. Blocks while consuming.
     */
    public function subscribeToEvents($handler)
    {
        $this->streamManager->consumeFromLast(
            $this->streamName,
            'subscriber-' . uniqid(),
            $handler
        );
    }
}

2. 日志收集

php
<?php

/**
 * Collects application log records into a RabbitMQ stream and reads them
 * back, either from a point in time or by following new entries.
 */
class LogStreamCollector
{
    /** @var StreamQueueManager */
    private $streamManager;

    /** @var string */
    private $streamName = 'application-logs';

    /**
     * @param mixed $channel Open AMQP channel handed to StreamQueueManager.
     */
    public function __construct($channel)
    {
        $this->streamManager = new StreamQueueManager($channel);

        $this->streamManager->declareStream($this->streamName, [
            'maxLengthBytes' => 53687091200,   // 50GB
            'retentionPeriod' => 604800000,    // 7 days, in milliseconds
        ]);
    }

    /**
     * Publishes one log record, tagged with this host's name and the current
     * Unix time (seconds).
     */
    public function log($level, $service, $message, $context = [])
    {
        $this->streamManager->publish($this->streamName, [
            'level' => $level,
            'service' => $service,
            'message' => $message,
            'context' => $context,
            'hostname' => gethostname(),
            'timestamp' => time(),
        ]);
    }

    /**
     * Returns log records written since $startTime, optionally filtered by
     * level and/or service (null means "no filter").
     *
     * NOTE(review): consumeFromTimestamp() keeps waiting for deliveries while
     * the channel is consuming, and nothing here cancels the consumer. The
     * return is therefore only reached once consumption is stopped elsewhere.
     *
     * @param int         $startTime Unix timestamp in seconds.
     * @param string|null $level
     * @param string|null $service
     * @return array
     */
    public function queryLogs($startTime, $level = null, $service = null)
    {
        $logs = [];

        $this->streamManager->consumeFromTimestamp(
            $this->streamName,
            'query-' . uniqid(),
            $startTime,
            function ($data) use ($level, $service, &$logs) {
                if (!is_array($data)) {
                    return;  // body was not a JSON object
                }
                if ($level !== null && ($data['level'] ?? null) !== $level) {
                    return;
                }
                if ($service !== null && ($data['service'] ?? null) !== $service) {
                    return;
                }
                $logs[] = $data;
            }
        );

        return $logs;
    }

    /**
     * Follows the log stream from its last chunk, calling $handler($data, $msg)
     * for each record. Blocks while consuming.
     */
    public function tailLogs($handler)
    {
        $this->streamManager->consumeFromLast(
            $this->streamName,
            'tail-' . uniqid(),
            $handler
        );
    }
}

3. 数据同步

php
<?php

/**
 * Change-data stream: records entity changes and replays them to
 * synchronisation handlers, filtered by entity name.
 */
class DataSyncStream
{
    /** @var StreamQueueManager */
    private $streamManager;

    /** @var string */
    private $streamName = 'data-changes';

    /**
     * @param mixed $channel Open AMQP channel handed to StreamQueueManager.
     */
    public function __construct($channel)
    {
        $this->streamManager = new StreamQueueManager($channel);

        $this->streamManager->declareStream($this->streamName, [
            'maxLengthBytes' => 10737418240,  // 10GB
            'retentionPeriod' => 2592000000,  // 30 days, in milliseconds
        ]);
    }

    /**
     * Records one change. $action is expected to be create, update or delete.
     */
    public function recordChange($entity, $id, $action, $data)
    {
        $this->streamManager->publish($this->streamName, [
            'entity' => $entity,
            'id' => $id,
            'action' => $action,
            'data' => $data,
            'timestamp' => time(),
        ]);
    }

    /**
     * Replays every retained change for $entity, from the start of the stream.
     * Blocks while consuming.
     */
    public function syncFromBeginning($entity, callable $handler)
    {
        $this->streamManager->consumeFromFirst(
            $this->streamName,
            // Consumer tags must be unique per channel; add uniqid() as subscribeChanges() does.
            'sync-' . $entity . '-' . uniqid(),
            $this->forEntity($entity, $handler)
        );
    }

    /**
     * Delivers changes for $entity recorded since $lastSyncTime (Unix seconds).
     * Blocks while consuming.
     */
    public function subscribeChanges($entity, $lastSyncTime, callable $handler)
    {
        $this->streamManager->consumeFromTimestamp(
            $this->streamName,
            'sub-' . $entity . '-' . uniqid(),
            $lastSyncTime,
            $this->forEntity($entity, $handler)
        );
    }

    /** Wraps $handler so it only receives decoded changes whose 'entity' equals $entity. */
    private function forEntity($entity, callable $handler)
    {
        return function ($data) use ($entity, $handler) {
            if (is_array($data) && ($data['entity'] ?? null) === $entity) {
                $handler($data);
            }
        };
    }
}

常见问题与解决方案

问题 1: 磁盘空间占用

症状: 流队列占用大量磁盘空间

解决方案:

php
<?php

// Bound the stream's disk usage by total size and by age.
// Retention removes whole segments, so usage can briefly exceed these limits
// until the oldest segment is deleted.
// NOTE: re-declaring an existing queue with different arguments is rejected
// (PRECONDITION_FAILED). Set these when the stream is first declared, or via a policy.
$channel->queue_declare(
    $streamName,
    false,  // passive
    true,   // durable
    false,  // exclusive
    false,  // auto_delete
    false,  // nowait
    new AMQPTable([
        'x-queue-type' => 'stream',
        'x-max-length-bytes' => 10737418240,  // cap total size at 10GB
        // Time-based retention is x-max-age; "x-stream-retention-period" is not
        // a recognised stream argument.
        'x-max-age' => '7D',                  // delete segments older than 7 days
    ])
);

问题 2: 消费者 Offset 管理

症状: 消费者重启后重复消费消息

解决方案:

php
<?php

/**
 * Persists, per consumer, the stream offset of the last fully processed
 * message in Redis, so a restarted consumer can resume right after it.
 */
class OffsetManager
{
    /** @var mixed Redis client exposing get()/set() (phpredis or Predis — assumption). */
    private $redis;

    /** @var string|int Start position used when no offset has been saved yet. */
    private $defaultOffset;

    /**
     * @param mixed      $redis
     * @param string|int $defaultOffset Fallback start position (default 'last').
     */
    public function __construct($redis, $defaultOffset = 'last')
    {
        $this->redis = $redis;
        $this->defaultOffset = $defaultOffset;
    }

    /** Records $offset as the last processed offset of $consumerId. */
    public function saveOffset($consumerId, $offset)
    {
        $this->redis->set("stream:offset:{$consumerId}", $offset);
    }

    /**
     * Returns the last saved offset as an int, or the default start position
     * when nothing has been saved.
     *
     * @return string|int
     */
    public function getOffset($consumerId)
    {
        $saved = $this->redis->get("stream:offset:{$consumerId}");

        // Missing keys come back as false or null depending on the client. Test
        // explicitly so that a saved offset of "0" is not mistaken for "none".
        if ($saved === false || $saved === null || $saved === '') {
            return $this->defaultOffset;
        }

        // Redis returns strings. A numeric string would be sent as an AMQP
        // string, which RabbitMQ does not accept as a numeric offset.
        return (int) $saved;
    }
}

// Usage example
$offsetManager = new OffsetManager($redis);
$lastOffset = $offsetManager->getOffset('consumer-1');

// Resume AFTER the last processed message. Attaching at the saved offset itself
// would deliver that message a second time.
$resumeFrom = is_int($lastOffset) ? $lastOffset + 1 : $lastOffset;

// Stream consumers must set a prefetch limit and ack manually.
$channel->basic_qos(0, 100, false);

$channel->basic_consume(
    $streamName,
    'consumer-1',
    false,  // no_local
    false,  // no_ack: streams require manual acknowledgement
    false,  // exclusive
    false,  // nowait
    function ($msg) use ($offsetManager) {
        // Process the message.
        processMessage($msg);

        // A delivery's stream offset is in the "x-stream-offset" message header,
        // not in a message property.
        $headers = $msg->get('application_headers')->getNativeData();
        $offsetManager->saveOffset('consumer-1', $headers['x-stream-offset']);

        // The ack only releases prefetch credit; the message stays in the stream.
        $msg->ack();
    },
    null,
    new AMQPTable(['x-stream-offset' => $resumeFrom])
);

while ($channel->is_consuming()) {
    $channel->wait();
}

问题 3: 性能优化

症状: 流队列吞吐量不够

解决方案:

php
<?php

// Let each consumer keep up to 100 unacknowledged deliveries in flight.
// A bigger prefetch window means fewer round trips waiting for acks.
$channel->basic_qos(null, 100, null);

// Use larger segment files (500MB) for the stream.
// Note: changing the arguments of an existing queue is not supported. RabbitMQ
// may reject such a re-declaration with PRECONDITION_FAILED, so choose the
// segment size when the stream is first created.
$streamArguments = new AMQPTable([
    'x-queue-type' => 'stream',
    'x-stream-max-segment-size-bytes' => 524288000,  // 500MB
]);

$channel->queue_declare(
    $streamName,
    false,  // passive
    true,   // durable
    false,  // exclusive
    false,  // auto_delete
    false,  // nowait
    $streamArguments
);

最佳实践建议

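  1. 合理设置保留策略: 根据业务需求通过 x-max-age(保留时间,如 '7D')和 x-max-length-bytes(保留大小)设置保留策略;注意数据是按整个 segment 删除的
  2. Offset 管理: 持久化消费者已处理消息的 offset,重启后从"已保存 offset + 1"处继续,避免重复消费
  3. 监控磁盘: 监控磁盘空间使用情况
  4. 批量处理: 使用较大的 prefetch 提高吞吐(流队列的消费者必须设置 prefetch 并手动 ack)
  5. 分区策略: 对于高吞吐场景,考虑使用多个流队列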

相关链接