Skip to content

RabbitMQ 与 MongoDB 集成

概述

MongoDB 是一个高性能、无模式的文档数据库,支持丰富的查询语言和索引功能。将 RabbitMQ 与 MongoDB 集成,可以实现事件溯源、变更数据捕获、数据同步、日志存储等功能。本教程将详细介绍两者的集成方案。

集成架构设计

架构图

┌─────────────────────────────────────────────────────────────────────┐
│                    MongoDB 集成架构                                  │
│                                                                      │
│  ┌─────────────────────────────────────────────────────────────┐    │
│  │                    应用层                                    │    │
│  │  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐     │    │
│  │  │   Event     │    │   Change    │    │   Sync      │     │    │
│  │  │   Store     │    │   Stream    │    │   Service   │     │    │
│  │  └─────────────┘    └─────────────┘    └─────────────┘     │    │
│  └─────────────────────────────────────────────────────────────┘    │
│                              │                                       │
│          ┌───────────────────┼───────────────────┐                  │
│          ▼                   ▼                   ▼                  │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐             │
│  │  RabbitMQ   │    │   MongoDB   │    │   Redis     │             │
│  │  (消息队列)  │    │  (文档存储)  │    │   (缓存)    │             │
│  │             │    │             │    │             │             │
│  │  • 事件通知  │    │  • 事件存储  │    │  • 查询缓存  │             │
│  │  • 异步处理  │    │  • 文档存储  │    │  • 会话管理  │             │
│  │  • 解耦系统  │    │  • Change Stream│  │             │             │
│  └─────────────┘    └─────────────┘    └─────────────┘             │
│         │                   │                   │                   │
│         └───────────────────┴───────────────────┘                   │
│                              │                                       │
│                              ▼                                       │
│  ┌─────────────────────────────────────────────────────────────┐    │
│  │                    数据处理层                                │    │
│  │  • 事件溯源重建                                              │    │
│  │  • 变更数据同步                                              │    │
│  │  • 聚合查询处理                                              │    │
│  └─────────────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────────────┘

集成模式

模式说明
Event Sourcing事件溯源,将所有变更作为事件存储
Change Data Capture变更数据捕获,监听 MongoDB 变更
CQRS命令查询分离,读写分离架构
Saga Pattern编排模式,协调分布式事务
Log Aggregation日志聚合,集中存储和分析日志

配置示例

MongoDB PHP 客户端配置

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use MongoDB\Client;
use MongoDB\Driver\Manager;

class MongoDBConfig
{
    public static function getClient(): Client
    {
        $uri = sprintf(
            'mongodb://%s:%s@%s:%d/%s',
            getenv('MONGO_USER') ?: 'root',
            getenv('MONGO_PASSWORD') ?: 'password',
            getenv('MONGO_HOST') ?: 'localhost',
            getenv('MONGO_PORT') ?: 27017,
            getenv('MONGO_DATABASE') ?: 'app_db'
        );
        
        $options = [
            'retryWrites' => true,
            'w' => 'majority',
            'readPreference' => 'primaryPreferred',
            'maxPoolSize' => 100,
            'minPoolSize' => 10,
            'connectTimeoutMS' => 5000,
            'socketTimeoutMS' => 30000,
        ];
        
        return new Client($uri, $options);
    }
    
    public static function getReplicaSetClient(): Client
    {
        $hosts = getenv('MONGO_REPLICA_SET_HOSTS') ?: 'mongo1:27017,mongo2:27017,mongo3:27017';
        $replicaSet = getenv('MONGO_REPLICA_SET') ?: 'rs0';
        
        $uri = sprintf(
            'mongodb://%s:%s@%s/%s?replicaSet=%s',
            getenv('MONGO_USER') ?: 'root',
            getenv('MONGO_PASSWORD') ?: 'password',
            $hosts,
            getenv('MONGO_DATABASE') ?: 'app_db',
            $replicaSet
        );
        
        return new Client($uri, [
            'retryWrites' => true,
            'w' => 'majority',
            'readConcernLevel' => 'majority',
        ]);
    }
}

RabbitMQ 连接配置

php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;

class RabbitMQConfig
{
    public static function getConnection(): AMQPStreamConnection
    {
        return new AMQPStreamConnection(
            getenv('RABBITMQ_HOST') ?: 'localhost',
            getenv('RABBITMQ_PORT') ?: 5672,
            getenv('RABBITMQ_USER') ?: 'guest',
            getenv('RABBITMQ_PASSWORD') ?: 'guest',
            getenv('RABBITMQ_VHOST') ?: '/'
        );
    }
}

PHP 代码示例

事件溯源实现

php
<?php

class EventStore
{
    private $mongoClient;
    private $rabbitChannel;
    private $rabbitConnection;
    private $eventCollection;
    
    public function __construct(
        MongoDB\Client $mongoClient,
        AMQPStreamConnection $rabbitConnection
    ) {
        $this->mongoClient = $mongoClient;
        $this->rabbitConnection = $rabbitConnection;
        $this->rabbitChannel = $rabbitConnection->channel();
        $this->eventCollection = $mongoClient->app_db->events;
        $this->setupInfrastructure();
    }
    
    private function setupInfrastructure(): void
    {
        $this->eventCollection->createIndex(
            ['aggregateId' => 1, 'version' => 1],
            ['unique' => true]
        );
        
        $this->eventCollection->createIndex(
            ['eventType' => 1, 'timestamp' => -1]
        );
        
        $this->eventCollection->createIndex(
            ['timestamp' => -1]
        );
        
        $this->rabbitChannel->exchange_declare(
            'events.exchange',
            'topic',
            false,
            true,
            false
        );
    }
    
    public function append(string $aggregateId, array $events, int $expectedVersion = null): bool
    {
        $currentVersion = $this->getCurrentVersion($aggregateId);
        
        if ($expectedVersion !== null && $currentVersion !== $expectedVersion) {
            throw new RuntimeException(
                "并发冲突: 期望版本 {$expectedVersion}, 当前版本 {$currentVersion}"
            );
        }
        
        $version = $currentVersion;
        $storedEvents = [];
        
        foreach ($events as $event) {
            $version++;
            
            $storedEvent = [
                'eventId' => $this->generateUUID(),
                'aggregateId' => $aggregateId,
                'aggregateType' => $event['aggregateType'] ?? 'Unknown',
                'eventType' => $event['eventType'],
                'version' => $version,
                'payload' => $event['payload'] ?? [],
                'metadata' => [
                    'correlationId' => $event['correlationId'] ?? $this->generateUUID(),
                    'causationId' => $event['causationId'] ?? null,
                    'userId' => $event['userId'] ?? null,
                    'timestamp' => new MongoDB\BSON\UTCDateTime(),
                ],
            ];
            
            $storedEvents[] = $storedEvent;
        }
        
        try {
            $this->eventCollection->insertMany($storedEvents);
            
            foreach ($storedEvents as $event) {
                $this->publishEvent($event);
            }
            
            return true;
        } catch (MongoDB\Driver\Exception\BulkWriteException $e) {
            throw new RuntimeException("事件存储失败: " . $e->getMessage());
        }
    }
    
    public function getEvents(string $aggregateId, int $fromVersion = 0): array
    {
        $filter = [
            'aggregateId' => $aggregateId,
            'version' => ['$gt' => $fromVersion],
        ];
        
        $options = [
            'sort' => ['version' => 1],
        ];
        
        $cursor = $this->eventCollection->find($filter, $options);
        
        return iterator_to_array($cursor);
    }
    
    public function getEventsByType(
        string $eventType,
        int $limit = 100,
        int $skip = 0
    ): array {
        $filter = ['eventType' => $eventType];
        
        $options = [
            'sort' => ['timestamp' => -1],
            'limit' => $limit,
            'skip' => $skip,
        ];
        
        $cursor = $this->eventCollection->find($filter, $options);
        
        return iterator_to_array($cursor);
    }
    
    public function getEventsByTimeRange(
        \DateTime $from,
        \DateTime $to
    ): array {
        $filter = [
            'timestamp' => [
                '$gte' => new MongoDB\BSON\UTCDateTime($from->getTimestamp() * 1000),
                '$lte' => new MongoDB\BSON\UTCDateTime($to->getTimestamp() * 1000),
            ],
        ];
        
        $cursor = $this->eventCollection->find($filter, ['sort' => ['timestamp' => 1]]);
        
        return iterator_to_array($cursor);
    }
    
    private function getCurrentVersion(string $aggregateId): int
    {
        $filter = ['aggregateId' => $aggregateId];
        $options = [
            'sort' => ['version' => -1],
            'projection' => ['version' => 1],
        ];
        
        $document = $this->eventCollection->findOne($filter, $options);
        
        return $document ? $document['version'] : 0;
    }
    
    private function publishEvent(array $event): void
    {
        $routingKey = sprintf(
            '%s.%s',
            strtolower($event['aggregateType']),
            strtolower($event['eventType'])
        );
        
        $message = new \PhpAmqpLib\Message\AMQPMessage(
            json_encode($event),
            [
                'content_type' => 'application/json',
                'message_id' => $event['eventId'],
                'delivery_mode' => \PhpAmqpLib\Message\AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'timestamp' => time(),
                'headers' => new \PhpAmqpLib\Wire\AMQPTable([
                    'eventType' => $event['eventType'],
                    'aggregateId' => $event['aggregateId'],
                    'version' => $event['version'],
                ])
            ]
        );
        
        $this->rabbitChannel->basic_publish(
            $message,
            'events.exchange',
            $routingKey
        );
    }
    
    private function generateUUID(): string
    {
        return sprintf(
            '%04x%04x-%04x-%04x-%04x-%04x%04x%04x',
            mt_rand(0, 0xffff),
            mt_rand(0, 0xffff),
            mt_rand(0, 0xffff),
            mt_rand(0, 0x0fff) | 0x4000,
            mt_rand(0, 0x3fff) | 0x8000,
            mt_rand(0, 0xffff),
            mt_rand(0, 0xffff),
            mt_rand(0, 0xffff)
        );
    }
    
    public function close(): void
    {
        $this->rabbitChannel->close();
        $this->rabbitConnection->close();
    }
}

// 使用示例
$mongo = MongoDBConfig::getClient();
$rabbit = RabbitMQConfig::getConnection();
$eventStore = new EventStore($mongo, $rabbit);

$eventStore->append('order-123', [
    [
        'eventType' => 'OrderCreated',
        'aggregateType' => 'Order',
        'payload' => [
            'customerId' => 'CUST-001',
            'amount' => 1500.00,
            'items' => [
                ['productId' => 'PROD-001', 'quantity' => 2, 'price' => 500],
                ['productId' => 'PROD-002', 'quantity' => 1, 'price' => 500],
            ],
        ],
    ],
    [
        'eventType' => 'OrderConfirmed',
        'aggregateType' => 'Order',
        'payload' => [
            'confirmedAt' => date('c'),
        ],
    ],
]);

$events = $eventStore->getEvents('order-123');
print_r($events);

$eventStore->close();

Change Stream 监听

php
<?php

class MongoDBChangeStream
{
    private $mongoClient;
    private $rabbitChannel;
    private $rabbitConnection;
    
    public function __construct(
        MongoDB\Client $mongoClient,
        AMQPStreamConnection $rabbitConnection
    ) {
        $this->mongoClient = $mongoClient;
        $this->rabbitConnection = $rabbitConnection;
        $this->rabbitChannel = $rabbitConnection->channel();
        $this->setupInfrastructure();
    }
    
    private function setupInfrastructure(): void
    {
        $this->rabbitChannel->exchange_declare(
            'mongodb.changes',
            'topic',
            false,
            true,
            false
        );
    }
    
    public function watchCollection(
        string $database,
        string $collection,
        array $pipeline = []
    ): void {
        $collection = $this->mongoClient->selectCollection($database, $collection);
        
        $defaultPipeline = [
            ['$match' => ['operationType' => ['$in' => ['insert', 'update', 'delete', 'replace']]]],
        ];
        
        $fullPipeline = empty($pipeline) ? $defaultPipeline : $pipeline;
        
        $changeStream = $collection->watch($fullPipeline, [
            'fullDocument' => 'updateLookup',
        ]);
        
        foreach ($changeStream as $change) {
            $this->handleChange($change);
        }
    }
    
    public function watchDatabase(string $database): void
    {
        $db = $this->mongoClient->selectDatabase($database);
        
        $changeStream = $db->watch([], [
            'fullDocument' => 'updateLookup',
        ]);
        
        foreach ($changeStream as $change) {
            $this->handleChange($change);
        }
    }
    
    private function handleChange(array $change): void
    {
        $event = [
            'id' => (string) $change['_id'],
            'operationType' => $change['operationType'],
            'database' => $change['ns']['db'] ?? '',
            'collection' => $change['ns']['coll'] ?? '',
            'documentKey' => $change['documentKey'] ?? [],
            'fullDocument' => $change['fullDocument'] ?? null,
            'updateDescription' => $change['updateDescription'] ?? null,
            'timestamp' => $change['clusterTime']->getTimestamp(),
        ];
        
        $this->publishChange($event);
    }
    
    private function publishChange(array $event): void
    {
        $routingKey = sprintf(
            'mongodb.%s.%s',
            $event['collection'],
            $event['operationType']
        );
        
        $message = new \PhpAmqpLib\Message\AMQPMessage(
            json_encode($event),
            [
                'content_type' => 'application/json',
                'delivery_mode' => \PhpAmqpLib\Message\AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'timestamp' => time(),
            ]
        );
        
        $this->rabbitChannel->basic_publish(
            $message,
            'mongodb.changes',
            $routingKey
        );
    }
    
    public function close(): void
    {
        $this->rabbitChannel->close();
        $this->rabbitConnection->close();
    }
}

// 使用示例
$mongo = MongoDBConfig::getReplicaSetClient();
$rabbit = RabbitMQConfig::getConnection();
$changeStream = new MongoDBChangeStream($mongo, $rabbit);

echo "监听 MongoDB 变更...\n";

$changeStream->watchCollection('app_db', 'orders');

$changeStream->close();

文档同步服务

php
<?php

class DocumentSyncService
{
    private $mongoClient;
    private $rabbitChannel;
    private $rabbitConnection;
    
    public function __construct(
        MongoDB\Client $mongoClient,
        AMQPStreamConnection $rabbitConnection
    ) {
        $this->mongoClient = $mongoClient;
        $this->rabbitConnection = $rabbitConnection;
        $this->rabbitChannel = $rabbitConnection->channel();
        $this->setupInfrastructure();
    }
    
    private function setupInfrastructure(): void
    {
        $this->rabbitChannel->exchange_declare(
            'sync.documents',
            'direct',
            false,
            true,
            false
        );
        
        $this->rabbitChannel->queue_declare(
            'sync.document.updates',
            false,
            true,
            false,
            false
        );
        
        $this->rabbitChannel->queue_bind(
            'sync.document.updates',
            'sync.documents',
            'document.update'
        );
    }
    
    public function syncDocument(
        string $database,
        string $collection,
        string $documentId,
        array $document
    ): bool {
        $collection = $this->mongoClient->selectCollection($database, $collection);
        
        $result = $collection->updateOne(
            ['_id' => new MongoDB\BSON\ObjectId($documentId)],
            ['$set' => $document],
            ['upsert' => true]
        );
        
        $this->publishSyncEvent($database, $collection, $documentId, 'synced');
        
        return $result->getModifiedCount() > 0 || $result->getUpsertedCount() > 0;
    }
    
    public function bulkSync(
        string $database,
        string $collection,
        array $documents
    ): int {
        $collection = $this->mongoClient->selectCollection($database, $collection);
        
        $bulkOperations = [];
        
        foreach ($documents as $document) {
            $bulkOperations[] = [
                'replaceOne' => [
                    ['_id' => $document['_id']],
                    $document,
                    ['upsert' => true]
                ]
            ];
        }
        
        $result = $collection->bulkWrite($bulkOperations);
        
        $syncedCount = $result->getModifiedCount() + $result->getUpsertedCount();
        
        $this->publishBulkSyncEvent($database, $collection, $syncedCount);
        
        return $syncedCount;
    }
    
    public function deleteDocument(
        string $database,
        string $collection,
        string $documentId
    ): bool {
        $collection = $this->mongoClient->selectCollection($database, $collection);
        
        $result = $collection->deleteOne(
            ['_id' => new MongoDB\BSON\ObjectId($documentId)]
        );
        
        $this->publishSyncEvent($database, $collection, $documentId, 'deleted');
        
        return $result->getDeletedCount() > 0;
    }
    
    public function consumeSyncRequests(callable $processor): void
    {
        $callback = function ($message) use ($processor) {
            $data = json_decode($message->getBody(), true);
            
            try {
                $processor($data);
                $message->ack();
            } catch (Exception $e) {
                error_log("同步失败: " . $e->getMessage());
                $message->nack(false, true);
            }
        };
        
        $this->rabbitChannel->basic_qos(0, 10, false);
        $this->rabbitChannel->basic_consume(
            'sync.document.updates',
            '',
            false,
            false,
            false,
            false,
            $callback
        );
        
        while ($this->rabbitChannel->is_consuming()) {
            $this->rabbitChannel->wait();
        }
    }
    
    private function publishSyncEvent(
        string $database,
        string $collection,
        string $documentId,
        string $action
    ): void {
        $event = [
            'database' => $database,
            'collection' => $collection,
            'documentId' => $documentId,
            'action' => $action,
            'timestamp' => date('c'),
        ];
        
        $message = new \PhpAmqpLib\Message\AMQPMessage(
            json_encode($event),
            ['content_type' => 'application/json']
        );
        
        $this->rabbitChannel->basic_publish(
            $message,
            'sync.documents',
            'document.update'
        );
    }
    
    private function publishBulkSyncEvent(
        string $database,
        string $collection,
        int $count
    ): void {
        $event = [
            'database' => $database,
            'collection' => $collection,
            'count' => $count,
            'action' => 'bulk_sync',
            'timestamp' => date('c'),
        ];
        
        $message = new \PhpAmqpLib\Message\AMQPMessage(
            json_encode($event),
            ['content_type' => 'application/json']
        );
        
        $this->rabbitChannel->basic_publish(
            $message,
            'sync.documents',
            'document.update'
        );
    }
    
    public function close(): void
    {
        $this->rabbitChannel->close();
        $this->rabbitConnection->close();
    }
}

// 使用示例
$mongo = MongoDBConfig::getClient();
$rabbit = RabbitMQConfig::getConnection();
$sync = new DocumentSyncService($mongo, $rabbit);

$sync->syncDocument('app_db', 'products', '507f1f77bcf86cd799439011', [
    'name' => 'iPhone 15 Pro',
    'price' => 999.99,
    'category' => 'electronics',
    'updated_at' => new MongoDB\BSON\UTCDateTime(),
]);

$sync->close();

聚合查询服务

php
<?php

class AggregationService
{
    private $mongoClient;
    private $rabbitChannel;
    
    public function __construct(
        MongoDB\Client $mongoClient,
        AMQPStreamConnection $rabbit
    ) {
        $this->mongoClient = $mongoClient;
        $this->rabbitChannel = $rabbit->channel();
    }
    
    public function aggregateOrdersByCustomer(string $customerId): array
    {
        $collection = $this->mongoClient->app_db->orders;
        
        $pipeline = [
            ['$match' => ['customerId' => $customerId]],
            [
                '$group' => [
                    '_id' => '$customerId',
                    'totalOrders' => ['$sum' => 1],
                    'totalAmount' => ['$sum' => '$amount'],
                    'avgAmount' => ['$avg' => '$amount'],
                    'minAmount' => ['$min' => '$amount'],
                    'maxAmount' => ['$max' => '$amount'],
                ]
            ],
        ];
        
        return iterator_to_array($collection->aggregate($pipeline));
    }
    
    public function aggregateSalesByDate(
        \DateTime $from,
        \DateTime $to,
        string $interval = 'day'
    ): array {
        $collection = $this->mongoClient->app_db->orders;
        
        $dateFormat = $interval === 'hour' ? '%Y-%m-%d %H:00' : '%Y-%m-%d';
        
        $pipeline = [
            [
                '$match' => [
                    'createdAt' => [
                        '$gte' => new MongoDB\BSON\UTCDateTime($from->getTimestamp() * 1000),
                        '$lte' => new MongoDB\BSON\UTCDateTime($to->getTimestamp() * 1000),
                    ]
                ]
            ],
            [
                '$group' => [
                    '_id' => [
                        '$dateToString' => [
                            'format' => $dateFormat,
                            'date' => '$createdAt'
                        ]
                    ],
                    'totalSales' => ['$sum' => '$amount'],
                    'orderCount' => ['$sum' => 1],
                ]
            ],
            ['$sort' => ['_id' => 1]],
        ];
        
        return iterator_to_array($collection->aggregate($pipeline));
    }
    
    public function aggregateTopProducts(int $limit = 10): array
    {
        $collection = $this->mongoClient->app_db->orders;
        
        $pipeline = [
            ['$unwind' => '$items'],
            [
                '$group' => [
                    '_id' => '$items.productId',
                    'totalQuantity' => ['$sum' => '$items.quantity'],
                    'totalRevenue' => [
                        '$sum' => [
                            '$multiply' => ['$items.quantity', '$items.price']
                        ]
                    ],
                ]
            ],
            ['$sort' => ['totalRevenue' => -1]],
            ['$limit' => $limit],
        ];
        
        return iterator_to_array($collection->aggregate($pipeline));
    }
    
    public function publishAggregationResult(
        string $aggregationType,
        array $result
    ): void {
        $event = [
            'type' => $aggregationType,
            'result' => $result,
            'timestamp' => date('c'),
        ];
        
        $message = new \PhpAmqpLib\Message\AMQPMessage(
            json_encode($event),
            ['content_type' => 'application/json']
        );
        
        $this->rabbitChannel->basic_publish(
            $message,
            'aggregation.results',
            "aggregation.{$aggregationType}"
        );
    }
}

// 使用示例
$mongo = MongoDBConfig::getClient();
$rabbit = RabbitMQConfig::getConnection();
$agg = new AggregationService($mongo, $rabbit);

$salesByDate = $agg->aggregateSalesByDate(
    new DateTime('-7 days'),
    new DateTime(),
    'day'
);

print_r($salesByDate);

$topProducts = $agg->aggregateTopProducts(10);
print_r($topProducts);

实际应用场景

场景一:订单事件溯源

php
<?php

class OrderEventSourcing
{
    private EventStore $eventStore;
    private $mongoClient;
    
    public function __construct(
        EventStore $eventStore,
        MongoDB\Client $mongoClient
    ) {
        $this->eventStore = $eventStore;
        $this->mongoClient = $mongoClient;
    }
    
    public function createOrder(array $orderData): string
    {
        $orderId = 'ORD-' . time();
        
        $this->eventStore->append($orderId, [
            [
                'eventType' => 'OrderCreated',
                'aggregateType' => 'Order',
                'payload' => array_merge($orderData, [
                    'orderId' => $orderId,
                    'status' => 'PENDING',
                    'createdAt' => date('c'),
                ]),
            ]
        ]);
        
        return $orderId;
    }
    
    public function confirmOrder(string $orderId): void
    {
        $events = $this->eventStore->getEvents($orderId);
        
        if (empty($events)) {
            throw new RuntimeException("订单不存在: {$orderId}");
        }
        
        $lastEvent = end($events);
        
        if ($lastEvent['payload']['status'] !== 'PENDING') {
            throw new RuntimeException("订单状态不允许确认");
        }
        
        $this->eventStore->append($orderId, [
            [
                'eventType' => 'OrderConfirmed',
                'aggregateType' => 'Order',
                'payload' => [
                    'confirmedAt' => date('c'),
                    'status' => 'CONFIRMED',
                ],
            ]
        ], $lastEvent['version']);
    }
    
    public function shipOrder(string $orderId, string $trackingNumber): void
    {
        $events = $this->eventStore->getEvents($orderId);
        
        $lastEvent = end($events);
        
        if ($lastEvent['payload']['status'] !== 'CONFIRMED') {
            throw new RuntimeException("订单状态不允许发货");
        }
        
        $this->eventStore->append($orderId, [
            [
                'eventType' => 'OrderShipped',
                'aggregateType' => 'Order',
                'payload' => [
                    'shippedAt' => date('c'),
                    'trackingNumber' => $trackingNumber,
                    'status' => 'SHIPPED',
                ],
            ]
        ], $lastEvent['version']);
    }
    
    public function rebuildOrderState(string $orderId): array
    {
        $events = $this->eventStore->getEvents($orderId);
        
        $state = [];
        
        foreach ($events as $event) {
            $state = array_merge($state, $event['payload']);
        }
        
        return $state;
    }
    
    public function createSnapshot(string $orderId): void
    {
        $state = $this->rebuildOrderState($orderId);
        
        $snapshots = $this->mongoClient->app_db->order_snapshots;
        
        $snapshots->replaceOne(
            ['orderId' => $orderId],
            [
                'orderId' => $orderId,
                'state' => $state,
                'version' => count($this->eventStore->getEvents($orderId)),
                'createdAt' => new MongoDB\BSON\UTCDateTime(),
            ],
            ['upsert' => true]
        );
    }
    
    public function getOrderFromSnapshot(string $orderId): ?array
    {
        $snapshots = $this->mongoClient->app_db->order_snapshots;
        
        $snapshot = $snapshots->findOne(['orderId' => $orderId]);
        
        if (!$snapshot) {
            return $this->rebuildOrderState($orderId);
        }
        
        $events = $this->eventStore->getEvents($orderId, $snapshot['version']);
        
        $state = $snapshot['state'];
        
        foreach ($events as $event) {
            $state = array_merge($state, $event['payload']);
        }
        
        return $state;
    }
}

场景二:日志聚合

php
<?php

class LogAggregationService
{
    private $mongoClient;
    private $rabbitChannel;
    
    public function __construct(
        MongoDB\Client $mongoClient,
        AMQPStreamConnection $rabbit
    ) {
        $this->mongoClient = $mongoClient;
        $this->rabbitChannel = $rabbit->channel();
        $this->setupIndexes();
    }
    
    private function setupIndexes(): void
    {
        $collection = $this->mongoClient->app_db->logs;
        
        $collection->createIndex(['timestamp' => -1]);
        $collection->createIndex(['level' => 1, 'timestamp' => -1]);
        $collection->createIndex(['service' => 1, 'timestamp' => -1]);
        $collection->createIndex(['trace_id' => 1]);
    }
    
    public function ingestLog(array $log): bool
    {
        $collection = $this->mongoClient->app_db->logs;
        
        $document = [
            'timestamp' => new MongoDB\BSON\UTCDateTime(),
            'level' => $log['level'] ?? 'INFO',
            'message' => $log['message'] ?? '',
            'service' => $log['service'] ?? 'unknown',
            'host' => $log['host'] ?? gethostname(),
            'trace_id' => $log['trace_id'] ?? null,
            'span_id' => $log['span_id'] ?? null,
            'context' => $log['context'] ?? [],
            'extra' => $log['extra'] ?? [],
        ];
        
        $result = $collection->insertOne($document);
        
        return $result->getInsertedCount() > 0;
    }
    
    public function bulkIngest(array $logs): int
    {
        $collection = $this->mongoClient->app_db->logs;
        
        $documents = array_map(function ($log) {
            return [
                'timestamp' => new MongoDB\BSON\UTCDateTime(),
                'level' => $log['level'] ?? 'INFO',
                'message' => $log['message'] ?? '',
                'service' => $log['service'] ?? 'unknown',
                'host' => $log['host'] ?? gethostname(),
                'trace_id' => $log['trace_id'] ?? null,
                'span_id' => $log['span_id'] ?? null,
                'context' => $log['context'] ?? [],
                'extra' => $log['extra'] ?? [],
            ];
        }, $logs);
        
        $result = $collection->insertMany($documents);
        
        return $result->getInsertedCount();
    }
    
    public function searchLogs(array $criteria): array
    {
        $collection = $this->mongoClient->app_db->logs;
        
        $filter = [];
        
        if (!empty($criteria['level'])) {
            $filter['level'] = $criteria['level'];
        }
        
        if (!empty($criteria['service'])) {
            $filter['service'] = $criteria['service'];
        }
        
        if (!empty($criteria['trace_id'])) {
            $filter['trace_id'] = $criteria['trace_id'];
        }
        
        if (!empty($criteria['message'])) {
            $filter['message'] = ['$regex' => $criteria['message'], '$options' => 'i'];
        }
        
        if (!empty($criteria['from']) || !empty($criteria['to'])) {
            $filter['timestamp'] = [];
            
            if (!empty($criteria['from'])) {
                $filter['timestamp']['$gte'] = new MongoDB\BSON\UTCDateTime(
                    strtotime($criteria['from']) * 1000
                );
            }
            
            if (!empty($criteria['to'])) {
                $filter['timestamp']['$lte'] = new MongoDB\BSON\UTCDateTime(
                    strtotime($criteria['to']) * 1000
                );
            }
        }
        
        $options = [
            'sort' => ['timestamp' => -1],
            'limit' => $criteria['limit'] ?? 100,
            'skip' => $criteria['offset'] ?? 0,
        ];
        
        return iterator_to_array($collection->find($filter, $options));
    }
    
    public function getLogStats(string $interval = 'hour'): array
    {
        $collection = $this->mongoClient->app_db->logs;
        
        $dateFormat = match ($interval) {
            'minute' => '%Y-%m-%d %H:%M',
            'hour' => '%Y-%m-%d %H:00',
            'day' => '%Y-%m-%d',
            default => '%Y-%m-%d %H:00',
        };
        
        $pipeline = [
            [
                '$group' => [
                    '_id' => [
                        'time' => [
                            '$dateToString' => [
                                'format' => $dateFormat,
                                'date' => '$timestamp'
                            ]
                        ],
                        'level' => '$level'
                    ],
                    'count' => ['$sum' => 1]
                ]
            ],
            [
                '$group' => [
                    '_id' => '$_id.time',
                    'levels' => [
                        '$push' => [
                            'level' => '$_id.level',
                            'count' => '$count'
                        ]
                    ],
                    'total' => ['$sum' => '$count']
                ]
            ],
            ['$sort' => ['_id' => 1]],
        ];
        
        return iterator_to_array($collection->aggregate($pipeline));
    }
    
    public function consumeLogs(): void
    {
        $callback = function ($message) {
            $log = json_decode($message->getBody(), true);
            
            $this->ingestLog($log);
            
            $message->ack();
        };
        
        $this->rabbitChannel->basic_qos(0, 100, false);
        $this->rabbitChannel->basic_consume(
            'logs.ingestion',
            '',
            false,
            false,
            false,
            false,
            $callback
        );
        
        while ($this->rabbitChannel->is_consuming()) {
            $this->rabbitChannel->wait();
        }
    }
}

常见问题与解决方案

问题一:连接超时

症状: MongoDB 连接频繁超时

解决方案: 配置连接池和重试策略

php
$options = [
    'connectTimeoutMS' => 5000,
    'socketTimeoutMS' => 30000,
    'serverSelectionTimeoutMS' => 5000,
    'maxPoolSize' => 100,
    'minPoolSize' => 10,
    'retryWrites' => true,
    'w' => 'majority',
];

问题二:查询性能

症状: 聚合查询性能下降

解决方案: 创建适当的索引和优化查询

php
$collection->createIndex(
    ['field1' => 1, 'field2' => -1],
    ['background' => true]
);

$pipeline = [
    ['$match' => $filter],
    ['$sort' => ['indexedField' => 1]],
    ['$skip' => $offset],
    ['$limit' => $limit],
];

问题三:文档大小限制

症状: 文档超过 16MB 限制

解决方案: 使用 GridFS 或分片存储

php
$bucket = $this->mongoClient->app_db->selectGridFSBucket();

$stream = fopen('php://memory', 'r+');
fwrite($stream, $largeData);
rewind($stream);

$fileId = $bucket->uploadFromStream('large_file.json', $stream);

最佳实践建议

1. 索引管理

php
class IndexManager
{
    public static function ensureIndexes(MongoDB\Collection $collection, array $indexes): void
    {
        foreach ($indexes as $name => $spec) {
            $collection->createIndex($spec['keys'], [
                'name' => $name,
                'background' => true,
                'unique' => $spec['unique'] ?? false,
            ]);
        }
    }
}

2. 事务处理

php
public function executeTransaction(callable $operation): void
{
    $session = $this->mongoClient->startSession();
    
    try {
        $session->startTransaction([
            'readConcern' => new MongoDB\Driver\ReadConcern('majority'),
            'writeConcern' => new MongoDB\Driver\WriteConcern(MongoDB\Driver\WriteConcern::MAJORITY),
        ]);
        
        $operation($session);
        
        $session->commitTransaction();
    } catch (Exception $e) {
        $session->abortTransaction();
        throw $e;
    } finally {
        $session->endSession();
    }
}

3. 错误处理

php
class MongoDBErrorHandler
{
    public static function handle(Exception $e, array $context = []): void
    {
        error_log(sprintf(
            "[MongoDB Error] %s\nContext: %s",
            $e->getMessage(),
            json_encode($context)
        ));
    }
}

版本兼容性

PHPMongoDB PHP DriverMongoDB ServerRabbitMQ Server
8.2+1.15+7.x3.11+
8.1+1.14+6.x/7.x3.10+
8.0+1.13+5.x/6.x3.9+
7.4+1.12+4.x/5.x3.8+

相关链接