Appearance
流队列
概述
流队列(Stream Queue)是 RabbitMQ 3.9.0 引入的一种新型队列,基于日志抽象实现。它提供了高性能的消息存储和消费能力,支持消息的持久化存储和重复消费,非常适合事件溯源、日志处理等场景。
核心原理
流队列特点
mermaid
graph TD
subgraph 流队列架构
P1[生产者1] --> S[Stream Queue]
P2[生产者2] --> S
P3[生产者3] --> S
S --> C1[消费者1<br/>offset: 0]
S --> C2[消费者2<br/>offset: 100]
S --> C3[消费者3<br/>offset: next]
end
subgraph 消息存储
S --> D1[Segment 1<br/>0-999]
S --> D2[Segment 2<br/>1000-1999]
S --> D3[Segment 3<br/>2000-...]
end
style S fill:#f9f,stroke:#333
流队列 vs 传统队列
| 特性 | 流队列 | 传统队列 |
|---|---|---|
| 消息保留 | 基于时间/大小 | 消费后删除 |
| 消费模式 | 支持重复消费 | 一次性消费 |
| 消费位置 | 支持 offset | 仅支持头部 |
| 性能 | 极高 | 高 |
| 适用场景 | 日志、事件溯源 | 任务队列 |
消费者 Offset 类型
| Offset 类型 | 说明 |
|---|---|
| first | 从第一条消息开始 |
| last | 从最后一个已写入的消息块(chunk)开始 |
| next | 从下一条新消息开始 |
| timestamp | 从指定时间戳开始 |
| specific | 从指定 offset 开始 |
PHP 代码示例
声明流队列
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$streamName = 'events-stream';
// Declare the stream. Retention is controlled by x-max-length-bytes (total size)
// and x-max-age (a duration string using the units Y, M, D, h, m, s).
// RabbitMQ has no 'x-stream-retention-period' argument, so age-based retention
// has to be expressed through x-max-age.
$channel->queue_declare(
    $streamName,
    false, // passive
    true,  // durable
    false, // exclusive
    false, // auto_delete
    false, // nowait
    new AMQPTable([
        'x-queue-type' => 'stream',
        'x-max-length-bytes' => 10737418240, // 10GB
        'x-stream-max-segment-size-bytes' => 52428800, // 50MB
        'x-max-age' => '7D', // 7 days
    ])
);
echo "流队列已创建: {$streamName}\n";
$channel->close();
$connection->close();
发送消息到流队列
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$streamName = 'events-stream';
// Passive declare: only asserts that the stream already exists. Re-declaring it
// with a different argument set than the one it was created with (for example
// without the retention arguments) is rejected by the broker with
// PRECONDITION_FAILED, so the producer must not repeat a partial declaration.
$channel->queue_declare(
    $streamName,
    true,  // passive
    true,  // durable
    false, // exclusive
    false, // auto_delete
    false, // nowait
    new AMQPTable(['x-queue-type' => 'stream'])
);
// Publish a batch of sample events.
$actions = ['click', 'view', 'purchase'];
for ($i = 1; $i <= 1000; $i++) {
    $payload = json_encode([
        'event_id' => $i,
        'type' => 'user_action',
        'data' => [
            'user_id' => rand(1, 100),
            'action' => $actions[rand(0, 2)],
        ],
        'timestamp' => time(),
    ]);
    $message = new AMQPMessage($payload, ['content_type' => 'application/json']);
    // The default exchange ('') routes by queue name, so the routing key is the stream name.
    $channel->basic_publish($message, '', $streamName);
}
echo "已发送 1000 条消息到流队列\n";
$channel->close();
$connection->close();
从流队列消费消息
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$streamName = 'events-stream';
echo "从流队列消费消息...\n";
// A prefetch limit is mandatory for stream consumers over AMQP 0-9-1; without it
// the broker refuses the basic.consume.
$channel->basic_qos(0, 100, false);
$callback = function (AMQPMessage $msg) {
    $data = json_decode($msg->getBody(), true);
    echo sprintf(
        "[%d] 事件: %s, 用户: %d\n",
        $data['event_id'],
        $data['type'],
        $data['data']['user_id']
    );
    // Acknowledging does not remove anything from the stream (messages stay until
    // retention expires them); it only returns credit so the broker keeps
    // delivering. Without acks delivery stops after the prefetch window is used up.
    $msg->ack();
};
// Register the consumer; the offset is passed as a consumer argument.
$channel->basic_consume(
    $streamName,
    'consumer-1',
    false, // no_local
    false, // no_ack: must stay false, streams do not support auto-ack
    false, // exclusive
    false, // nowait
    $callback,
    null,
    new AMQPTable([
        'x-stream-offset' => 'first', // start at the first available message
    ])
);
while ($channel->is_consuming()) {
    $channel->wait();
}
$channel->close();
$connection->close();
从指定 Offset 消费
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$streamName = 'events-stream';
// Absolute stream offset to attach at (must be an integer, not a numeric string).
$offset = 500;
// Stream consumers need a prefetch limit and manual acknowledgements.
$channel->basic_qos(0, 100, false);
$callback = function ($msg) {
    $data = json_decode($msg->getBody(), true);
    // The broker reports each message's real position in the 'x-stream-offset'
    // header; event_id is application data and is not the stream offset.
    $headers = $msg->has('application_headers')
        ? $msg->get('application_headers')->getNativeData()
        : [];
    $streamOffset = isset($headers['x-stream-offset']) ? $headers['x-stream-offset'] : 'n/a';
    echo "Offset: {$streamOffset}, 事件: {$data['event_id']}, 类型: {$data['type']}\n";
    // Ack returns credit to the broker; the message itself stays in the stream.
    $msg->ack();
};
$channel->basic_consume(
    $streamName,
    'consumer-offset',
    false, // no_local
    false, // no_ack
    false, // exclusive
    false, // nowait
    $callback,
    null,
    new AMQPTable([
        'x-stream-offset' => $offset,
    ])
);
while ($channel->is_consuming()) {
    $channel->wait();
}
$channel->close();
$connection->close();
从指定时间戳消费
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$streamName = 'events-stream';
// Start from one hour ago. A time-based offset has to be sent as an AMQP
// timestamp field (seconds since the epoch); a plain integer, such as a
// millisecond value, is interpreted by the broker as an absolute offset.
// php-amqplib encodes DateTime values as AMQP timestamps.
$timestamp = new \DateTime('@' . (time() - 3600));
// Stream consumers need a prefetch limit and manual acknowledgements.
$channel->basic_qos(0, 100, false);
$callback = function ($msg) {
    $data = json_decode($msg->getBody(), true);
    echo "事件时间: " . date('Y-m-d H:i:s', $data['timestamp']) . "\n";
    // Ack returns credit to the broker; the message itself stays in the stream.
    $msg->ack();
};
$channel->basic_consume(
    $streamName,
    'consumer-timestamp',
    false, // no_local
    false, // no_ack
    false, // exclusive
    false, // nowait
    $callback,
    null,
    new AMQPTable([
        'x-stream-offset' => $timestamp,
    ])
);
while ($channel->is_consuming()) {
    $channel->wait();
}
$channel->close();
$connection->close();
流队列管理类
php
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
/**
 * Convenience wrapper around a php-amqplib channel for declaring, publishing to
 * and consuming from RabbitMQ streams.
 */
class StreamQueueManager
{
    private $channel;

    /**
     * @param mixed $channel Channel used for every broker call made by this class.
     */
    public function __construct($channel)
    {
        $this->channel = $channel;
    }

    /**
     * Declares a durable stream.
     *
     * Supported $options keys:
     *  - maxLengthBytes:  maximum total size of the stream in bytes
     *  - maxSegmentSize:  size of one segment file in bytes
     *  - retentionPeriod: maximum age, either in milliseconds (numeric) or as a
     *                     ready-made x-max-age duration string such as '7D'
     *
     * @param string $streamName
     * @param array  $options
     * @return string The stream name that was declared.
     */
    public function declareStream($streamName, array $options = [])
    {
        $arguments = ['x-queue-type' => 'stream'];
        if (isset($options['maxLengthBytes'])) {
            $arguments['x-max-length-bytes'] = $options['maxLengthBytes'];
        }
        if (isset($options['maxSegmentSize'])) {
            $arguments['x-stream-max-segment-size-bytes'] = $options['maxSegmentSize'];
        }
        if (isset($options['retentionPeriod'])) {
            // Age-based retention is configured through x-max-age; there is no
            // 'x-stream-retention-period' argument.
            $arguments['x-max-age'] = $this->toMaxAge($options['retentionPeriod']);
        }
        $this->channel->queue_declare(
            $streamName,
            false, // passive
            true,  // durable
            false, // exclusive
            false, // auto_delete
            false, // nowait
            new AMQPTable($arguments)
        );
        return $streamName;
    }

    /**
     * JSON-encodes $data and publishes it to the stream through the default exchange.
     *
     * @param string $streamName
     * @param mixed  $data
     */
    public function publish($streamName, $data)
    {
        $message = new AMQPMessage(
            json_encode($data),
            ['content_type' => 'application/json']
        );
        $this->channel->basic_publish($message, '', $streamName);
    }

    /**
     * Consumes the stream starting at $offset and passes every message to $handler.
     *
     * The handler receives the JSON-decoded body and the raw message. The message
     * is acknowledged here after the handler returns, so handlers must not ack it
     * themselves. An ack only returns delivery credit; it does not remove the
     * message from the stream.
     *
     * @param string   $streamName
     * @param string   $consumerTag
     * @param mixed    $offset        'first', 'last', 'next', an integer offset, or a
     *                                DateTime for a time-based start
     * @param callable $handler       function (array|null $data, $msg): void
     * @param int      $prefetchCount Prefetch window; stream consumers require one.
     * @param int|float $idleTimeout  Seconds without any delivery after which the
     *                                consumer is cancelled and this method returns;
     *                                0 blocks until the consumer is cancelled elsewhere.
     */
    public function consume(
        $streamName,
        $consumerTag,
        $offset,
        callable $handler,
        $prefetchCount = 100,
        $idleTimeout = 0
    ) {
        $callback = function ($msg) use ($handler) {
            $data = json_decode($msg->getBody(), true);
            $handler($data, $msg);
            $msg->ack();
        };
        // A numeric offset must reach the broker as an integer field; a numeric
        // string would be sent as a string and not be understood as an offset.
        if (is_numeric($offset) && !is_int($offset)) {
            $offset = (int) $offset;
        }
        $this->channel->basic_qos(0, $prefetchCount, false);
        $this->channel->basic_consume(
            $streamName,
            $consumerTag,
            false, // no_local
            false, // no_ack
            false, // exclusive
            false, // nowait
            $callback,
            null,
            new AMQPTable(['x-stream-offset' => $offset])
        );
        try {
            while ($this->channel->is_consuming()) {
                $this->channel->wait(null, false, $idleTimeout);
            }
        } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) {
            // Nothing arrived within $idleTimeout seconds: treat this as "caught
            // up" and stop the consumer so bounded reads can return.
            $this->channel->basic_cancel($consumerTag);
        }
    }

    /**
     * Consumes from the first available message.
     */
    public function consumeFromFirst($streamName, $consumerTag, callable $handler, $idleTimeout = 0)
    {
        $this->consume($streamName, $consumerTag, 'first', $handler, 100, $idleTimeout);
    }

    /**
     * Consumes starting at the 'last' offset specification.
     */
    public function consumeFromLast($streamName, $consumerTag, callable $handler, $idleTimeout = 0)
    {
        $this->consume($streamName, $consumerTag, 'last', $handler, 100, $idleTimeout);
    }

    /**
     * Consumes starting at a point in time.
     *
     * @param int|\DateTimeInterface $timestamp Unix time in seconds, or a date object.
     */
    public function consumeFromTimestamp($streamName, $consumerTag, $timestamp, callable $handler, $idleTimeout = 0)
    {
        $seconds = $timestamp instanceof \DateTimeInterface
            ? $timestamp->getTimestamp()
            : (int) $timestamp;
        // Passed as a DateTime so php-amqplib encodes it as an AMQP timestamp
        // (seconds). An integer, e.g. milliseconds, would be read as an absolute offset.
        $this->consume($streamName, $consumerTag, new \DateTime('@' . $seconds), $handler, 100, $idleTimeout);
    }

    /**
     * Converts a retention setting into an x-max-age duration string.
     */
    private function toMaxAge($retentionPeriod)
    {
        if (is_string($retentionPeriod) && !is_numeric($retentionPeriod)) {
            return $retentionPeriod; // already a duration such as '7D'
        }
        // Numeric values are milliseconds; x-max-age takes whole seconds with an 's' suffix.
        return max(1, (int) ceil($retentionPeriod / 1000)) . 's';
    }
}
实际应用场景
1. 事件溯源
php
<?php
/**
 * Event-sourcing store that keeps domain events in the 'domain-events' stream.
 */
class EventStore
{
    private $streamManager;
    private $streamName = 'domain-events';

    /**
     * Creates the stream manager and declares the event stream.
     *
     * @param mixed $channel Channel handed to StreamQueueManager.
     */
    public function __construct($channel)
    {
        $this->streamManager = new StreamQueueManager($channel);
        $this->streamManager->declareStream($this->streamName, [
            'maxLengthBytes' => 107374182400, // 100GB
            'maxSegmentSize' => 104857600, // 100MB
            'retentionPeriod' => 2592000000, // 30 days
        ]);
    }

    /**
     * Appends one event for an aggregate, stamped with the current Unix time.
     */
    public function appendEvent($aggregateId, $eventType, $payload, $version)
    {
        $this->streamManager->publish($this->streamName, [
            'aggregate_id' => $aggregateId,
            'event_type' => $eventType,
            'payload' => $payload,
            'version' => $version,
            'timestamp' => time(),
        ]);
    }

    /**
     * Reads the stream from the start and collects the events of one aggregate
     * whose version is at least $fromVersion.
     *
     * A stream has no end marker, so the read is bounded by an idle timeout that
     * is forwarded as the fourth argument of consumeFromFirst(). NOTE(review):
     * this only takes effect if the manager's consumeFromFirst() honours that
     * argument; a manager that ignores it keeps consuming and this method does
     * not return until the consumer is cancelled.
     *
     * @param mixed     $aggregateId
     * @param int       $fromVersion
     * @param int|float $idleTimeout Seconds without deliveries that end the replay.
     * @return array Matching events in stream order.
     */
    public function replayEvents($aggregateId, $fromVersion = 0, $idleTimeout = 5)
    {
        $events = [];
        $this->streamManager->consumeFromFirst(
            $this->streamName,
            'replay-' . $aggregateId,
            function ($data) use ($aggregateId, $fromVersion, &$events) {
                if ($data['aggregate_id'] === $aggregateId && $data['version'] >= $fromVersion) {
                    $events[] = $data;
                }
            },
            $idleTimeout
        );
        return $events;
    }

    /**
     * Subscribes $handler to the stream using the 'last' offset; the consumer tag
     * is made unique per call.
     */
    public function subscribeToEvents($handler)
    {
        $this->streamManager->consumeFromLast(
            $this->streamName,
            'subscriber-' . uniqid(),
            $handler
        );
    }
}
2. 日志收集
php
<?php
/**
 * Collects application log records in the 'application-logs' stream.
 */
class LogStreamCollector
{
    private $streamManager;
    private $streamName = 'application-logs';

    /**
     * Creates the stream manager and declares the log stream.
     *
     * @param mixed $channel Channel handed to StreamQueueManager.
     */
    public function __construct($channel)
    {
        $this->streamManager = new StreamQueueManager($channel);
        $this->streamManager->declareStream($this->streamName, [
            'maxLengthBytes' => 53687091200, // 50GB
            'retentionPeriod' => 604800000, // 7 days
        ]);
    }

    /**
     * Publishes one log record, adding the local hostname and the current Unix time.
     */
    public function log($level, $service, $message, $context = [])
    {
        $this->streamManager->publish($this->streamName, [
            'level' => $level,
            'service' => $service,
            'message' => $message,
            'context' => $context,
            'hostname' => gethostname(),
            'timestamp' => time(),
        ]);
    }

    /**
     * Collects log records published since $startTime, optionally filtered by
     * level and service.
     *
     * A stream has no end marker, so the read is bounded by an idle timeout that
     * is forwarded as the fifth argument of consumeFromTimestamp(). NOTE(review):
     * this only takes effect if the manager's consumeFromTimestamp() honours that
     * argument; a manager that ignores it keeps consuming and this method does
     * not return until the consumer is cancelled.
     *
     * @param mixed       $startTime   Start time, passed through to consumeFromTimestamp().
     * @param string|null $level       Only records with this level, or null for all.
     * @param string|null $service     Only records from this service, or null for all.
     * @param int|float   $idleTimeout Seconds without deliveries that end the query.
     * @return array Matching log records in stream order.
     */
    public function queryLogs($startTime, $level = null, $service = null, $idleTimeout = 5)
    {
        $logs = [];
        $this->streamManager->consumeFromTimestamp(
            $this->streamName,
            'query-' . uniqid(),
            $startTime,
            function ($data) use ($level, $service, &$logs) {
                if ($level && $data['level'] !== $level) {
                    return;
                }
                if ($service && $data['service'] !== $service) {
                    return;
                }
                $logs[] = $data;
            },
            $idleTimeout
        );
        return $logs;
    }

    /**
     * Follows the log stream using the 'last' offset; the consumer tag is made
     * unique per call.
     */
    public function tailLogs($handler)
    {
        $this->streamManager->consumeFromLast(
            $this->streamName,
            'tail-' . uniqid(),
            $handler
        );
    }
}
3. 数据同步
php
<?php
/**
 * Publishes entity change records to the 'data-changes' stream and feeds them
 * back to per-entity handlers.
 */
class DataSyncStream
{
    private $streamManager;
    private $streamName = 'data-changes';

    /**
     * Builds the stream manager on the given channel and declares the change stream.
     */
    public function __construct($channel)
    {
        $this->streamManager = new StreamQueueManager($channel);
        $retention = [
            'maxLengthBytes' => 10737418240, // 10GB
            'retentionPeriod' => 2592000000, // 30 days
        ];
        $this->streamManager->declareStream($this->streamName, $retention);
    }

    /**
     * Publishes a single change record stamped with the current Unix time.
     */
    public function recordChange($entity, $id, $action, $data)
    {
        $change = [
            'entity' => $entity,
            'id' => $id,
            'action' => $action, // create, update, delete
            'data' => $data,
            'timestamp' => time(),
        ];
        $this->streamManager->publish($this->streamName, $change);
    }

    /**
     * Consumes the stream from the first message, forwarding only records of $entity.
     */
    public function syncFromBeginning($entity, callable $handler)
    {
        $consumerTag = 'sync-' . $entity;
        $this->streamManager->consumeFromFirst(
            $this->streamName,
            $consumerTag,
            $this->onlyEntity($entity, $handler)
        );
    }

    /**
     * Consumes the stream from $lastSyncTime, forwarding only records of $entity.
     * The consumer tag is made unique per call.
     */
    public function subscribeChanges($entity, $lastSyncTime, callable $handler)
    {
        $consumerTag = "sub-{$entity}-" . uniqid();
        $this->streamManager->consumeFromTimestamp(
            $this->streamName,
            $consumerTag,
            $lastSyncTime,
            $this->onlyEntity($entity, $handler)
        );
    }

    /**
     * Wraps $handler so it is only called for records whose 'entity' equals $entity.
     */
    private function onlyEntity($entity, callable $handler)
    {
        return function ($data) use ($entity, $handler) {
            if ($data['entity'] !== $entity) {
                return;
            }
            $handler($data);
        };
    }
}
常见问题与解决方案
问题 1: 磁盘空间占用
症状: 流队列占用大量磁盘空间
解决方案:
php
<?php
// Configure a sensible retention policy. Age-based retention is set with
// x-max-age (a duration string using the units Y, M, D, h, m, s); RabbitMQ has
// no 'x-stream-retention-period' argument.
$channel->queue_declare(
    $streamName,
    false, // passive
    true,  // durable
    false, // exclusive
    false, // auto_delete
    false, // nowait
    new AMQPTable([
        'x-queue-type' => 'stream',
        'x-max-length-bytes' => 10737418240, // cap the stream at 10GB
        'x-max-age' => '7D', // drop data older than 7 days
    ])
);
问题 2: 消费者 Offset 管理
症状: 消费者重启后重复消费消息
解决方案:
php
<?php
/**
 * Persists the last processed stream offset of each consumer in a key-value
 * store, under the key "stream:offset:{consumerId}".
 */
class OffsetManager
{
    private $redis;

    /**
     * @param mixed $redis Store client providing get($key) and set($key, $value).
     */
    public function __construct($redis)
    {
        $this->redis = $redis;
    }

    /**
     * Records the offset of the message that was just processed.
     */
    public function saveOffset($consumerId, $offset)
    {
        $this->redis->set("stream:offset:{$consumerId}", $offset);
    }

    /**
     * Returns the offset specification to resume from.
     *
     * @return int|string 'last' when nothing is stored; otherwise the stored
     *                    offset plus one, so the already-processed message is
     *                    not delivered again. Non-numeric stored values are
     *                    returned unchanged.
     */
    public function getOffset($consumerId)
    {
        $saved = $this->redis->get("stream:offset:{$consumerId}");
        // Compare explicitly: a stored offset of 0 is valid but falsy.
        if ($saved === false || $saved === null || $saved === '') {
            return 'last';
        }
        if (!is_numeric($saved)) {
            return $saved;
        }
        // The store hands back strings; a numeric offset must be an integer.
        return (int) $saved + 1;
    }
}
// Usage example
$offsetManager = new OffsetManager($redis);
$lastOffset = $offsetManager->getOffset('consumer-1');
// A numeric offset read back from the store may be a string; it must be sent
// to the broker as an integer.
if (is_string($lastOffset) && ctype_digit($lastOffset)) {
    $lastOffset = (int) $lastOffset;
}
// Stream consumers need a prefetch limit and manual acknowledgements.
$channel->basic_qos(0, 100, false);
$channel->basic_consume(
    $streamName,
    'consumer-1',
    false, // no_local
    false, // no_ack
    false, // exclusive
    false, // nowait
    function ($msg) use ($offsetManager) {
        // Handle the message
        processMessage($msg);
        // The offset is delivered in the 'x-stream-offset' message header, not
        // as a message property, so it is read from application_headers.
        $headers = $msg->has('application_headers')
            ? $msg->get('application_headers')->getNativeData()
            : [];
        if (isset($headers['x-stream-offset'])) {
            $offsetManager->saveOffset('consumer-1', $headers['x-stream-offset']);
        }
        // Ack returns credit to the broker; the message itself stays in the stream.
        $msg->ack();
    },
    null,
    new AMQPTable(['x-stream-offset' => $lastOffset])
);
问题 3: 性能优化
症状: 流队列吞吐量不够
解决方案:
php
<?php
// Raise the prefetch window so more messages are in flight per consumer.
$prefetchCount = 100;
$channel->basic_qos(null, $prefetchCount, null);
// Declare the stream with a larger segment size.
$streamArguments = new AMQPTable([
    'x-queue-type' => 'stream',
    'x-stream-max-segment-size-bytes' => 524288000, // 500MB
]);
$channel->queue_declare($streamName, false, true, false, false, false, $streamArguments);
最佳实践建议
- 合理设置保留策略: 根据业务需求设置数据保留时间和大小
- Offset 管理: 持久化消费者的 offset,避免重复消费
- 监控磁盘: 监控磁盘空间使用情况
- 批量处理: 使用较大的 prefetch 提高吞吐
- 分区策略: 对于高吞吐场景,考虑使用多个流队列
