Appearance
RabbitMQ 与 MongoDB 集成
概述
MongoDB 是一个高性能、无模式的文档数据库,支持丰富的查询语言和索引功能。将 RabbitMQ 与 MongoDB 集成,可以实现事件溯源、变更数据捕获、数据同步、日志存储等功能。本教程将详细介绍两者的集成方案。
集成架构设计
架构图
┌─────────────────────────────────────────────────────────────────────┐
│ MongoDB 集成架构 │
│ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ 应用层 │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Event │ │ Change │ │ Sync │ │ │
│ │ │ Store │ │ Stream │ │ Service │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌───────────────────┼───────────────────┐ │
│ ▼ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ RabbitMQ │ │ MongoDB │ │ Redis │ │
│ │ (消息队列) │ │ (文档存储) │ │ (缓存) │ │
│ │ │ │ │ │ │ │
│ │ • 事件通知 │ │ • 事件存储 │ │ • 查询缓存 │ │
│ │ • 异步处理 │ │ • 文档存储 │ │ • 会话管理 │ │
│ │ • 解耦系统 │ │ • Change Stream│ │ │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │ │ │ │
│ └───────────────────┴───────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ 数据处理层 │ │
│ │ • 事件溯源重建 │ │
│ │ • 变更数据同步 │ │
│ │ • 聚合查询处理 │ │
│ └─────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
集成模式
| 模式 | 说明 |
|---|---|
| Event Sourcing | 事件溯源,将所有变更作为事件存储 |
| Change Data Capture | 变更数据捕获,监听 MongoDB 变更 |
| CQRS | 命令查询分离,读写分离架构 |
| Saga Pattern | 编排模式,协调分布式事务 |
| Log Aggregation | 日志聚合,集中存储和分析日志 |
配置示例
MongoDB PHP 客户端配置
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use MongoDB\Client;
use MongoDB\Driver\Manager;
class MongoDBConfig
{
/**
 * Build a client for a single MongoDB host from environment variables.
 *
 * Reads MONGO_USER, MONGO_PASSWORD, MONGO_HOST, MONGO_PORT and MONGO_DATABASE,
 * falling back to local development defaults when a variable is unset or empty.
 * The credentials are expected as raw (not pre-encoded) values.
 *
 * @return Client client using retryable writes and majority write concern
 */
public static function getClient(): Client
{
    // User name and password must be percent-encoded: characters such as
    // '@', ':' or '/' would otherwise be read as URI delimiters.
    $uri = sprintf(
        'mongodb://%s:%s@%s:%d/%s',
        rawurlencode(getenv('MONGO_USER') ?: 'root'),
        rawurlencode(getenv('MONGO_PASSWORD') ?: 'password'),
        getenv('MONGO_HOST') ?: 'localhost',
        getenv('MONGO_PORT') ?: 27017,
        getenv('MONGO_DATABASE') ?: 'app_db'
    );
    $options = [
        'retryWrites' => true,
        'w' => 'majority',
        'readPreference' => 'primaryPreferred',
        'maxPoolSize' => 100,
        'minPoolSize' => 10,
        'connectTimeoutMS' => 5000,
        'socketTimeoutMS' => 30000,
    ];
    return new Client($uri, $options);
}
/**
 * Build a client for a replica set from environment variables.
 *
 * Reads MONGO_REPLICA_SET_HOSTS (comma-separated host:port list),
 * MONGO_REPLICA_SET, MONGO_USER, MONGO_PASSWORD and MONGO_DATABASE, with
 * defaults for a three-node "rs0" set. The credentials are expected as raw
 * (not pre-encoded) values.
 *
 * @return Client client using majority read and write concern
 */
public static function getReplicaSetClient(): Client
{
    $hosts = getenv('MONGO_REPLICA_SET_HOSTS') ?: 'mongo1:27017,mongo2:27017,mongo3:27017';
    $replicaSet = getenv('MONGO_REPLICA_SET') ?: 'rs0';
    // Percent-encode the credentials so reserved characters ('@', ':', '/')
    // in a password cannot corrupt the connection string.
    $uri = sprintf(
        'mongodb://%s:%s@%s/%s?replicaSet=%s',
        rawurlencode(getenv('MONGO_USER') ?: 'root'),
        rawurlencode(getenv('MONGO_PASSWORD') ?: 'password'),
        $hosts,
        getenv('MONGO_DATABASE') ?: 'app_db',
        $replicaSet
    );
    return new Client($uri, [
        'retryWrites' => true,
        'w' => 'majority',
        'readConcernLevel' => 'majority',
    ]);
}
}
RabbitMQ 连接配置
php
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
class RabbitMQConfig
{
/**
 * Open a RabbitMQ connection configured from environment variables.
 *
 * Each setting (RABBITMQ_HOST, RABBITMQ_PORT, RABBITMQ_USER,
 * RABBITMQ_PASSWORD, RABBITMQ_VHOST) falls back to the broker's usual local
 * default when the variable is unset or empty.
 *
 * @return AMQPStreamConnection
 */
public static function getConnection(): AMQPStreamConnection
{
    $host = getenv('RABBITMQ_HOST') ?: 'localhost';
    $port = getenv('RABBITMQ_PORT') ?: 5672;
    $user = getenv('RABBITMQ_USER') ?: 'guest';
    $password = getenv('RABBITMQ_PASSWORD') ?: 'guest';
    $vhost = getenv('RABBITMQ_VHOST') ?: '/';

    return new AMQPStreamConnection($host, $port, $user, $password, $vhost);
}
}
PHP 代码示例
事件溯源实现
php
<?php
class EventStore
{
private $mongoClient;
private $rabbitChannel;
private $rabbitConnection;
private $eventCollection;
/**
 * Wire the store to MongoDB and RabbitMQ and prepare indexes and the exchange.
 *
 * The connection type is fully qualified because this snippet has no `use`
 * statement for it; an unqualified name would resolve to a non-existent
 * global class and reject a real connection.
 *
 * @param MongoDB\Client $mongoClient client used to reach app_db.events
 * @param \PhpAmqpLib\Connection\AMQPStreamConnection $rabbitConnection connection a channel is opened on
 */
public function __construct(
    MongoDB\Client $mongoClient,
    \PhpAmqpLib\Connection\AMQPStreamConnection $rabbitConnection
) {
    $this->mongoClient = $mongoClient;
    $this->rabbitConnection = $rabbitConnection;
    $this->rabbitChannel = $rabbitConnection->channel();
    $this->eventCollection = $mongoClient->app_db->events;
    $this->setupInfrastructure();
}
/**
 * Create the event collection indexes and declare the events exchange.
 *
 * The unique (aggregateId, version) index is what rejects two writers
 * appending the same version. The time-based indexes target
 * "metadata.timestamp", which is where append() stores the event time.
 */
private function setupInfrastructure(): void
{
    $this->eventCollection->createIndex(
        ['aggregateId' => 1, 'version' => 1],
        ['unique' => true]
    );
    $this->eventCollection->createIndex(
        ['eventType' => 1, 'metadata.timestamp' => -1]
    );
    $this->eventCollection->createIndex(
        ['metadata.timestamp' => -1]
    );
    // Arguments after the type: passive=false, durable=true, auto_delete=false.
    $this->rabbitChannel->exchange_declare(
        'events.exchange',
        'topic',
        false,
        true,
        false
    );
}
/**
 * Append events to an aggregate's stream and publish each one to RabbitMQ.
 *
 * Versions are assigned consecutively after the aggregate's current version.
 * Events are published only after the MongoDB insert has succeeded.
 *
 * @param string   $aggregateId     stream identifier
 * @param array    $events          list of events; each needs 'eventType' and may carry
 *                                  'aggregateType', 'payload', 'correlationId',
 *                                  'causationId' and 'userId'
 * @param int|null $expectedVersion when given, the append is refused unless the
 *                                  stream is currently at exactly this version
 * @return bool always true on success (also for an empty event list)
 * @throws RuntimeException on a version mismatch or when the insert fails
 */
public function append(string $aggregateId, array $events, ?int $expectedVersion = null): bool
{
    $currentVersion = $this->getCurrentVersion($aggregateId);
    if ($expectedVersion !== null && $currentVersion !== $expectedVersion) {
        throw new RuntimeException(
            "并发冲突: 期望版本 {$expectedVersion}, 当前版本 {$currentVersion}"
        );
    }

    // insertMany() rejects an empty document list; nothing to store is a no-op.
    if ($events === []) {
        return true;
    }

    $version = $currentVersion;
    $storedEvents = [];
    foreach ($events as $event) {
        $version++;
        $storedEvents[] = [
            'eventId' => $this->generateUUID(),
            'aggregateId' => $aggregateId,
            'aggregateType' => $event['aggregateType'] ?? 'Unknown',
            'eventType' => $event['eventType'],
            'version' => $version,
            'payload' => $event['payload'] ?? [],
            'metadata' => [
                'correlationId' => $event['correlationId'] ?? $this->generateUUID(),
                'causationId' => $event['causationId'] ?? null,
                'userId' => $event['userId'] ?? null,
                'timestamp' => new MongoDB\BSON\UTCDateTime(),
            ],
        ];
    }

    try {
        $this->eventCollection->insertMany($storedEvents);
    } catch (MongoDB\Driver\Exception\BulkWriteException $e) {
        // Keep the driver exception as "previous" so callers can still
        // inspect the underlying write error (e.g. a duplicate version).
        throw new RuntimeException("事件存储失败: " . $e->getMessage(), 0, $e);
    }

    foreach ($storedEvents as $storedEvent) {
        $this->publishEvent($storedEvent);
    }

    return true;
}
/**
 * Load an aggregate's events in ascending version order.
 *
 * @param string $aggregateId stream identifier
 * @param int    $fromVersion only events with a version strictly greater than this are returned
 * @return array events as returned by the collection cursor
 */
public function getEvents(string $aggregateId, int $fromVersion = 0): array
{
    $cursor = $this->eventCollection->find(
        [
            'aggregateId' => $aggregateId,
            'version' => ['$gt' => $fromVersion],
        ],
        ['sort' => ['version' => 1]]
    );

    return iterator_to_array($cursor);
}
/**
 * Page through events of one type, newest first.
 *
 * Sorting uses "metadata.timestamp", the field append() actually writes;
 * stored events have no top-level "timestamp".
 *
 * @param string $eventType event type to match exactly
 * @param int    $limit     maximum number of events to return
 * @param int    $skip      number of matching events to skip
 * @return array matching events as returned by the collection cursor
 */
public function getEventsByType(
    string $eventType,
    int $limit = 100,
    int $skip = 0
): array {
    $filter = ['eventType' => $eventType];
    $options = [
        'sort' => ['metadata.timestamp' => -1],
        'limit' => $limit,
        'skip' => $skip,
    ];
    $cursor = $this->eventCollection->find($filter, $options);
    return iterator_to_array($cursor);
}
/**
 * Load events recorded within an inclusive time range, oldest first.
 *
 * The range is matched against "metadata.timestamp", the field append()
 * actually writes; stored events have no top-level "timestamp". Bounds are
 * taken at whole-second precision.
 *
 * @param \DateTimeInterface $from lower bound (inclusive)
 * @param \DateTimeInterface $to   upper bound (inclusive)
 * @return array matching events as returned by the collection cursor
 */
public function getEventsByTimeRange(
    \DateTimeInterface $from,
    \DateTimeInterface $to
): array {
    $filter = [
        'metadata.timestamp' => [
            '$gte' => new MongoDB\BSON\UTCDateTime($from->getTimestamp() * 1000),
            '$lte' => new MongoDB\BSON\UTCDateTime($to->getTimestamp() * 1000),
        ],
    ];
    $cursor = $this->eventCollection->find($filter, ['sort' => ['metadata.timestamp' => 1]]);
    return iterator_to_array($cursor);
}
/**
 * Return the highest stored version for an aggregate.
 *
 * @param string $aggregateId stream identifier
 * @return int the latest version, or 0 when the aggregate has no events
 */
private function getCurrentVersion(string $aggregateId): int
{
    // Highest version first; only the version field is fetched.
    $latest = $this->eventCollection->findOne(
        ['aggregateId' => $aggregateId],
        [
            'sort' => ['version' => -1],
            'projection' => ['version' => 1],
        ]
    );

    if (!$latest) {
        return 0;
    }

    return $latest['version'];
}
/**
 * Publish one stored event to the "events.exchange" exchange.
 *
 * The routing key is "<aggregateType>.<eventType>", both lower-cased. The
 * message is marked persistent, carries the event as a JSON body, and repeats
 * the event type, aggregate id and version as headers.
 *
 * @param array $event stored event as built by append()
 */
private function publishEvent(array $event): void
{
    $routingKey = strtolower($event['aggregateType']) . '.' . strtolower($event['eventType']);

    $body = json_encode($event);
    $properties = [
        'content_type' => 'application/json',
        'message_id' => $event['eventId'],
        'delivery_mode' => \PhpAmqpLib\Message\AMQPMessage::DELIVERY_MODE_PERSISTENT,
        'timestamp' => time(),
        'headers' => new \PhpAmqpLib\Wire\AMQPTable([
            'eventType' => $event['eventType'],
            'aggregateId' => $event['aggregateId'],
            'version' => $event['version'],
        ]),
    ];

    $this->rabbitChannel->basic_publish(
        new \PhpAmqpLib\Message\AMQPMessage($body, $properties),
        'events.exchange',
        $routingKey
    );
}
/**
 * Generate a random (version 4) UUID string.
 *
 * Uses random_bytes() rather than mt_rand(): mt_rand() is a predictable,
 * weakly seeded generator and is a poor source for identifiers.
 *
 * @return string UUID in the canonical 8-4-4-4-12 lower-case hex form
 */
private function generateUUID(): string
{
    $bytes = random_bytes(16);
    // Set the version nibble to 4 and the variant bits to 10xx.
    $bytes[6] = chr((ord($bytes[6]) & 0x0f) | 0x40);
    $bytes[8] = chr((ord($bytes[8]) & 0x3f) | 0x80);

    // 32 hex characters split into eight groups of four.
    return vsprintf('%s%s-%s-%s-%s-%s%s%s', str_split(bin2hex($bytes), 4));
}
/**
 * Release the RabbitMQ resources held by this store.
 *
 * The channel is closed before the connection it was opened on.
 */
public function close(): void
{
$this->rabbitChannel->close();
$this->rabbitConnection->close();
}
}
// Usage example
// Build the store from a single-host MongoDB client and a RabbitMQ connection.
$mongo = MongoDBConfig::getClient();
$rabbit = RabbitMQConfig::getConnection();
$eventStore = new EventStore($mongo, $rabbit);
// Append two events to the "order-123" stream in one call; no expected
// version is passed, so no optimistic-concurrency check is made.
$eventStore->append('order-123', [
[
'eventType' => 'OrderCreated',
'aggregateType' => 'Order',
'payload' => [
'customerId' => 'CUST-001',
'amount' => 1500.00,
'items' => [
['productId' => 'PROD-001', 'quantity' => 2, 'price' => 500],
['productId' => 'PROD-002', 'quantity' => 1, 'price' => 500],
],
],
],
[
'eventType' => 'OrderConfirmed',
'aggregateType' => 'Order',
'payload' => [
'confirmedAt' => date('c'),
],
],
]);
// Read the stream back in version order and dump it.
$events = $eventStore->getEvents('order-123');
print_r($events);
$eventStore->close();
Change Stream 监听
php
<?php
class MongoDBChangeStream
{
private $mongoClient;
private $rabbitChannel;
private $rabbitConnection;
/**
 * Wire the listener to MongoDB and RabbitMQ and declare the changes exchange.
 *
 * The connection type is fully qualified because this snippet has no `use`
 * statement for it; an unqualified name would resolve to a non-existent
 * global class and reject a real connection.
 *
 * @param MongoDB\Client $mongoClient client whose collections/databases are watched
 * @param \PhpAmqpLib\Connection\AMQPStreamConnection $rabbitConnection connection a channel is opened on
 */
public function __construct(
    MongoDB\Client $mongoClient,
    \PhpAmqpLib\Connection\AMQPStreamConnection $rabbitConnection
) {
    $this->mongoClient = $mongoClient;
    $this->rabbitConnection = $rabbitConnection;
    $this->rabbitChannel = $rabbitConnection->channel();
    $this->setupInfrastructure();
}
/**
 * Declare the durable topic exchange that change events are published to.
 */
private function setupInfrastructure(): void
{
    $exchange = 'mongodb.changes';
    $type = 'topic';

    // passive=false, durable=true, auto_delete=false
    $this->rabbitChannel->exchange_declare($exchange, $type, false, true, false);
}
/**
 * Watch one collection and forward every change event to RabbitMQ.
 *
 * This method blocks and does not return normally: it keeps polling the
 * change stream until an exception is thrown. A plain foreach over a change
 * stream stops as soon as no event is immediately available, so the stream
 * is advanced explicitly instead.
 *
 * @param string $database   database name
 * @param string $collection collection name
 * @param array  $pipeline   aggregation stages applied to the stream; when empty,
 *                           only insert/update/delete/replace events are kept
 */
public function watchCollection(
    string $database,
    string $collection,
    array $pipeline = []
): void {
    $target = $this->mongoClient->selectCollection($database, $collection);
    $defaultPipeline = [
        ['$match' => ['operationType' => ['$in' => ['insert', 'update', 'delete', 'replace']]]],
    ];
    $fullPipeline = empty($pipeline) ? $defaultPipeline : $pipeline;
    $changeStream = $target->watch($fullPipeline, [
        'fullDocument' => 'updateLookup',
        // Decode events as plain PHP arrays so they satisfy handleChange().
        'typeMap' => ['root' => 'array', 'document' => 'array', 'array' => 'array'],
    ]);

    for ($changeStream->rewind(); true; $changeStream->next()) {
        // valid() is false when the last poll returned no event; poll again.
        if (!$changeStream->valid()) {
            continue;
        }
        $this->handleChange($changeStream->current());
    }
}
/**
 * Watch every collection of a database and forward each change to RabbitMQ.
 *
 * This method blocks and does not return normally: it keeps polling the
 * change stream until an exception is thrown. A plain foreach over a change
 * stream stops as soon as no event is immediately available, so the stream
 * is advanced explicitly instead.
 *
 * @param string $database database name
 */
public function watchDatabase(string $database): void
{
    $db = $this->mongoClient->selectDatabase($database);
    $changeStream = $db->watch([], [
        'fullDocument' => 'updateLookup',
        // Decode events as plain PHP arrays so they satisfy handleChange().
        'typeMap' => ['root' => 'array', 'document' => 'array', 'array' => 'array'],
    ]);

    for ($changeStream->rewind(); true; $changeStream->next()) {
        // valid() is false when the last poll returned no event; poll again.
        if (!$changeStream->valid()) {
            continue;
        }
        $this->handleChange($changeStream->current());
    }
}
/**
 * Convert a raw change-stream event into a flat message and publish it.
 *
 * The parameter is intentionally not typed as `array`: with the library's
 * default type map the stream yields document objects that only offer array
 * access, and those would be rejected by an `array` type.
 *
 * @param array|\ArrayAccess $change change event as yielded by the change stream
 */
private function handleChange($change): void
{
    // The event's _id is the resume token, itself a document; its "_data"
    // field holds the token as a string. Fall back to a JSON rendering when
    // the token has an unexpected shape.
    $resumeToken = $change['_id'] ?? null;
    $tokenData = $resumeToken['_data'] ?? null;
    $id = is_string($tokenData) ? $tokenData : json_encode($resumeToken);

    $clusterTime = $change['clusterTime'] ?? null;

    $event = [
        'id' => $id,
        'operationType' => $change['operationType'],
        'database' => $change['ns']['db'] ?? '',
        'collection' => $change['ns']['coll'] ?? '',
        'documentKey' => $change['documentKey'] ?? [],
        'fullDocument' => $change['fullDocument'] ?? null,
        'updateDescription' => $change['updateDescription'] ?? null,
        'timestamp' => $clusterTime !== null ? $clusterTime->getTimestamp() : null,
    ];
    $this->publishChange($event);
}
/**
 * Publish a change message to the "mongodb.changes" exchange.
 *
 * The routing key is "mongodb.<collection>.<operationType>"; the message is
 * persistent and carries the event as JSON.
 *
 * @param array $event message built by handleChange()
 */
private function publishChange(array $event): void
{
    $routingKey = 'mongodb.' . $event['collection'] . '.' . $event['operationType'];

    $properties = [
        'content_type' => 'application/json',
        'delivery_mode' => \PhpAmqpLib\Message\AMQPMessage::DELIVERY_MODE_PERSISTENT,
        'timestamp' => time(),
    ];
    $amqpMessage = new \PhpAmqpLib\Message\AMQPMessage(json_encode($event), $properties);

    $this->rabbitChannel->basic_publish($amqpMessage, 'mongodb.changes', $routingKey);
}
/**
 * Release the RabbitMQ resources held by this listener.
 *
 * The channel is closed before the connection it was opened on.
 */
public function close(): void
{
$this->rabbitChannel->close();
$this->rabbitConnection->close();
}
}
// Usage example
// The replica-set client is used here.
// NOTE(review): change streams presumably require a replica set or sharded cluster — confirm.
$mongo = MongoDBConfig::getReplicaSetClient();
$rabbit = RabbitMQConfig::getConnection();
$changeStream = new MongoDBChangeStream($mongo, $rabbit);
echo "监听 MongoDB 变更...\n";
// Forward changes on app_db.orders to RabbitMQ.
$changeStream->watchCollection('app_db', 'orders');
$changeStream->close();
文档同步服务
php
<?php
class DocumentSyncService
{
private $mongoClient;
private $rabbitChannel;
private $rabbitConnection;
/**
 * Wire the service to MongoDB and RabbitMQ and declare its exchange and queue.
 *
 * The connection type is fully qualified because this snippet has no `use`
 * statement for it; an unqualified name would resolve to a non-existent
 * global class and reject a real connection.
 *
 * @param MongoDB\Client $mongoClient client used for all document writes
 * @param \PhpAmqpLib\Connection\AMQPStreamConnection $rabbitConnection connection a channel is opened on
 */
public function __construct(
    MongoDB\Client $mongoClient,
    \PhpAmqpLib\Connection\AMQPStreamConnection $rabbitConnection
) {
    $this->mongoClient = $mongoClient;
    $this->rabbitConnection = $rabbitConnection;
    $this->rabbitChannel = $rabbitConnection->channel();
    $this->setupInfrastructure();
}
/**
 * Declare the durable direct exchange and queue used for sync messages and
 * bind them with the "document.update" routing key.
 */
private function setupInfrastructure(): void
{
    $exchange = 'sync.documents';
    $queue = 'sync.document.updates';
    $routingKey = 'document.update';

    // exchange: passive=false, durable=true, auto_delete=false
    $this->rabbitChannel->exchange_declare($exchange, 'direct', false, true, false);
    // queue: passive=false, durable=true, exclusive=false, auto_delete=false
    $this->rabbitChannel->queue_declare($queue, false, true, false, false);
    $this->rabbitChannel->queue_bind($queue, $exchange, $routingKey);
}
/**
 * Upsert one document by ObjectId and announce the sync on RabbitMQ.
 *
 * @param string $database   database name
 * @param string $collection collection name
 * @param string $documentId ObjectId of the document, as a hex string
 * @param array  $document   fields to set on the document
 * @return bool true when an existing document was modified or a new one was inserted
 */
public function syncDocument(
    string $database,
    string $collection,
    string $documentId,
    array $document
): bool {
    // Keep the collection handle in its own variable: $collection must stay
    // the collection *name*, because publishSyncEvent() expects a string.
    $target = $this->mongoClient->selectCollection($database, $collection);
    $result = $target->updateOne(
        ['_id' => new MongoDB\BSON\ObjectId($documentId)],
        ['$set' => $document],
        ['upsert' => true]
    );
    $this->publishSyncEvent($database, $collection, $documentId, 'synced');
    return $result->getModifiedCount() > 0 || $result->getUpsertedCount() > 0;
}
/**
 * Replace-or-insert many documents in one bulk write and announce the result.
 *
 * @param string $database   database name
 * @param string $collection collection name
 * @param array  $documents  list of documents; each must contain an '_id'
 * @return int number of documents modified or upserted (0 for an empty list)
 */
public function bulkSync(
    string $database,
    string $collection,
    array $documents
): int {
    // bulkWrite() rejects an empty operation list; nothing to sync is a no-op.
    if ($documents === []) {
        return 0;
    }

    // Keep the collection handle in its own variable: $collection must stay
    // the collection *name*, because publishBulkSyncEvent() expects a string.
    $target = $this->mongoClient->selectCollection($database, $collection);
    $bulkOperations = [];
    foreach ($documents as $document) {
        $bulkOperations[] = [
            'replaceOne' => [
                ['_id' => $document['_id']],
                $document,
                ['upsert' => true],
            ],
        ];
    }
    $result = $target->bulkWrite($bulkOperations);
    $syncedCount = $result->getModifiedCount() + $result->getUpsertedCount();
    $this->publishBulkSyncEvent($database, $collection, $syncedCount);
    return $syncedCount;
}
/**
 * Delete one document by ObjectId and announce the deletion on RabbitMQ.
 *
 * The event is published whether or not a document was actually removed.
 *
 * @param string $database   database name
 * @param string $collection collection name
 * @param string $documentId ObjectId of the document, as a hex string
 * @return bool true when a document was deleted
 */
public function deleteDocument(
    string $database,
    string $collection,
    string $documentId
): bool {
    // Keep the collection handle in its own variable: $collection must stay
    // the collection *name*, because publishSyncEvent() expects a string.
    $target = $this->mongoClient->selectCollection($database, $collection);
    $result = $target->deleteOne(
        ['_id' => new MongoDB\BSON\ObjectId($documentId)]
    );
    $this->publishSyncEvent($database, $collection, $documentId, 'deleted');
    return $result->getDeletedCount() > 0;
}
/**
 * Consume messages from "sync.document.updates" and hand each to $processor.
 *
 * Blocks while the channel has an active consumer. A message is acknowledged
 * when the processor returns. When the processor throws an Exception the
 * failure is logged and the message is requeued once; if it fails again after
 * redelivery it is rejected without requeue, so a message that can never be
 * processed does not loop forever.
 *
 * @param callable $processor receives the JSON-decoded message body (associative array)
 */
public function consumeSyncRequests(callable $processor): void
{
    $callback = function ($message) use ($processor) {
        $data = json_decode($message->getBody(), true);
        try {
            $processor($data);
            $message->ack();
        } catch (Exception $e) {
            error_log("同步失败: " . $e->getMessage());
            // Retry once: requeue only when this delivery was the first attempt.
            $message->nack(false, !$message->isRedelivered());
        }
    };
    // At most 10 unacknowledged messages are delivered to this consumer.
    $this->rabbitChannel->basic_qos(0, 10, false);
    $this->rabbitChannel->basic_consume(
        'sync.document.updates',
        '',
        false,
        false, // no_ack=false: messages are acknowledged manually above
        false,
        false,
        $callback
    );
    while ($this->rabbitChannel->is_consuming()) {
        $this->rabbitChannel->wait();
    }
}
/**
 * Publish a single-document sync notification.
 *
 * The message goes to the "sync.documents" exchange with the
 * "document.update" routing key and carries an ISO-8601 timestamp.
 *
 * @param string $database   database name
 * @param string $collection collection name
 * @param string $documentId id of the affected document
 * @param string $action     what happened to the document (e.g. "synced", "deleted")
 */
private function publishSyncEvent(
    string $database,
    string $collection,
    string $documentId,
    string $action
): void {
    $payload = json_encode([
        'database' => $database,
        'collection' => $collection,
        'documentId' => $documentId,
        'action' => $action,
        'timestamp' => date('c'),
    ]);

    $amqpMessage = new \PhpAmqpLib\Message\AMQPMessage(
        $payload,
        ['content_type' => 'application/json']
    );

    $this->rabbitChannel->basic_publish($amqpMessage, 'sync.documents', 'document.update');
}
/**
 * Publish a bulk-sync notification with the number of documents written.
 *
 * The message goes to the "sync.documents" exchange with the
 * "document.update" routing key and carries an ISO-8601 timestamp.
 *
 * @param string $database   database name
 * @param string $collection collection name
 * @param int    $count      number of documents modified or upserted
 */
private function publishBulkSyncEvent(
    string $database,
    string $collection,
    int $count
): void {
    $payload = json_encode([
        'database' => $database,
        'collection' => $collection,
        'count' => $count,
        'action' => 'bulk_sync',
        'timestamp' => date('c'),
    ]);

    $amqpMessage = new \PhpAmqpLib\Message\AMQPMessage(
        $payload,
        ['content_type' => 'application/json']
    );

    $this->rabbitChannel->basic_publish($amqpMessage, 'sync.documents', 'document.update');
}
/**
 * Release the RabbitMQ resources held by this service.
 *
 * The channel is closed before the connection it was opened on.
 */
public function close(): void
{
$this->rabbitChannel->close();
$this->rabbitConnection->close();
}
}
// Usage example
$mongo = MongoDBConfig::getClient();
$rabbit = RabbitMQConfig::getConnection();
$sync = new DocumentSyncService($mongo, $rabbit);
// Upsert one product; the third argument is the document's ObjectId as a
// 24-character hex string.
$sync->syncDocument('app_db', 'products', '507f1f77bcf86cd799439011', [
'name' => 'iPhone 15 Pro',
'price' => 999.99,
'category' => 'electronics',
'updated_at' => new MongoDB\BSON\UTCDateTime(),
]);
$sync->close();
聚合查询服务
php
<?php
class AggregationService
{
private $mongoClient;
private $rabbitChannel;
/**
 * Wire the service to MongoDB and open a RabbitMQ channel for results.
 *
 * The connection type is fully qualified because this snippet has no `use`
 * statement for it; an unqualified name would resolve to a non-existent
 * global class and reject a real connection.
 *
 * @param MongoDB\Client $mongoClient client used to query app_db.orders
 * @param \PhpAmqpLib\Connection\AMQPStreamConnection $rabbit connection a channel is opened on
 */
public function __construct(
    MongoDB\Client $mongoClient,
    \PhpAmqpLib\Connection\AMQPStreamConnection $rabbit
) {
    $this->mongoClient = $mongoClient;
    $this->rabbitChannel = $rabbit->channel();
}
/**
 * Summarise one customer's orders: count, total, average, minimum and maximum amount.
 *
 * @param string $customerId customer to match on the orders' "customerId" field
 * @return array aggregation result rows (a single group when the customer has orders)
 */
public function aggregateOrdersByCustomer(string $customerId): array
{
    $matchStage = ['$match' => ['customerId' => $customerId]];
    $groupStage = [
        '$group' => [
            '_id' => '$customerId',
            'totalOrders' => ['$sum' => 1],
            'totalAmount' => ['$sum' => '$amount'],
            'avgAmount' => ['$avg' => '$amount'],
            'minAmount' => ['$min' => '$amount'],
            'maxAmount' => ['$max' => '$amount'],
        ],
    ];

    $cursor = $this->mongoClient->app_db->orders->aggregate([$matchStage, $groupStage]);

    return iterator_to_array($cursor);
}
/**
 * Total sales and order count per time bucket within an inclusive date range.
 *
 * @param \DateTime $from     lower bound on "createdAt" (inclusive, whole seconds)
 * @param \DateTime $to       upper bound on "createdAt" (inclusive, whole seconds)
 * @param string    $interval 'hour' buckets by hour; any other value buckets by day
 * @return array rows keyed by the formatted bucket in '_id', sorted ascending
 */
public function aggregateSalesByDate(
    \DateTime $from,
    \DateTime $to,
    string $interval = 'day'
): array {
    $orders = $this->mongoClient->app_db->orders;

    if ($interval === 'hour') {
        $bucketFormat = '%Y-%m-%d %H:00';
    } else {
        $bucketFormat = '%Y-%m-%d';
    }

    $rangeStage = [
        '$match' => [
            'createdAt' => [
                '$gte' => new MongoDB\BSON\UTCDateTime($from->getTimestamp() * 1000),
                '$lte' => new MongoDB\BSON\UTCDateTime($to->getTimestamp() * 1000),
            ],
        ],
    ];
    $bucketStage = [
        '$group' => [
            '_id' => [
                '$dateToString' => [
                    'format' => $bucketFormat,
                    'date' => '$createdAt',
                ],
            ],
            'totalSales' => ['$sum' => '$amount'],
            'orderCount' => ['$sum' => 1],
        ],
    ];
    $orderStage = ['$sort' => ['_id' => 1]];

    return iterator_to_array(
        $orders->aggregate([$rangeStage, $bucketStage, $orderStage])
    );
}
/**
 * Rank products by revenue across all orders.
 *
 * Each order's "items" array is unwound, then quantity and
 * quantity * price are summed per product id.
 *
 * @param int $limit maximum number of products to return
 * @return array rows with '_id' (product id), 'totalQuantity' and 'totalRevenue',
 *               highest revenue first
 */
public function aggregateTopProducts(int $limit = 10): array
{
    $perProduct = [
        '$group' => [
            '_id' => '$items.productId',
            'totalQuantity' => ['$sum' => '$items.quantity'],
            'totalRevenue' => [
                '$sum' => [
                    '$multiply' => ['$items.quantity', '$items.price'],
                ],
            ],
        ],
    ];

    $stages = [
        ['$unwind' => '$items'],
        $perProduct,
        ['$sort' => ['totalRevenue' => -1]],
        ['$limit' => $limit],
    ];

    $cursor = $this->mongoClient->app_db->orders->aggregate($stages);

    return iterator_to_array($cursor);
}
/**
 * Publish an aggregation result to the "aggregation.results" exchange.
 *
 * The exchange is declared here because nothing else in this class creates
 * it; publishing to a missing exchange makes the broker close the channel.
 * Declaring an exchange that already exists with the same settings is a no-op.
 * NOTE(review): the exchange is declared as a durable topic exchange, matching
 * the other exchanges in this tutorial — confirm if it is created elsewhere
 * with different settings.
 *
 * @param string $aggregationType label used in the message and as routing-key suffix
 * @param array  $result          aggregation rows to send
 */
public function publishAggregationResult(
    string $aggregationType,
    array $result
): void {
    // passive=false, durable=true, auto_delete=false
    $this->rabbitChannel->exchange_declare(
        'aggregation.results',
        'topic',
        false,
        true,
        false
    );

    $event = [
        'type' => $aggregationType,
        'result' => $result,
        'timestamp' => date('c'),
    ];
    $message = new \PhpAmqpLib\Message\AMQPMessage(
        json_encode($event),
        ['content_type' => 'application/json']
    );
    $this->rabbitChannel->basic_publish(
        $message,
        'aggregation.results',
        "aggregation.{$aggregationType}"
    );
}
}
// Usage example
$mongo = MongoDBConfig::getClient();
$rabbit = RabbitMQConfig::getConnection();
$agg = new AggregationService($mongo, $rabbit);
// Daily sales totals for the last seven days.
$salesByDate = $agg->aggregateSalesByDate(
new DateTime('-7 days'),
new DateTime(),
'day'
);
print_r($salesByDate);
// Ten best-selling products by revenue.
$topProducts = $agg->aggregateTopProducts(10);
print_r($topProducts);
实际应用场景
场景一:订单事件溯源
php
<?php
class OrderEventSourcing
{
private EventStore $eventStore;
private $mongoClient;
/**
 * @param EventStore     $eventStore  store that holds the order event streams
 * @param MongoDB\Client $mongoClient client used for the order_snapshots collection
 */
public function __construct(
    EventStore $eventStore,
    MongoDB\Client $mongoClient
) {
    // The two dependencies are independent; they are only stored here.
    $this->mongoClient = $mongoClient;
    $this->eventStore = $eventStore;
}
/**
 * Start a new order stream with an OrderCreated event.
 *
 * The id combines the current time with a random suffix; a timestamp alone
 * would give two orders created in the same second the same id. The append
 * passes expected version 0, so it is refused if the stream already exists.
 *
 * @param array $orderData order fields stored in the event payload
 * @return string the generated order id ("ORD-<unix time>-<8 hex chars>")
 */
public function createOrder(array $orderData): string
{
    $orderId = sprintf('ORD-%d-%s', time(), bin2hex(random_bytes(4)));
    $this->eventStore->append($orderId, [
        [
            'eventType' => 'OrderCreated',
            'aggregateType' => 'Order',
            'payload' => array_merge($orderData, [
                'orderId' => $orderId,
                'status' => 'PENDING',
                'createdAt' => date('c'),
            ]),
        ],
    ], 0);
    return $orderId;
}
/**
 * Confirm a pending order by appending an OrderConfirmed event.
 *
 * The status is read from the most recent event's payload, and that event's
 * version is passed as the expected version of the append.
 *
 * @param string $orderId order to confirm
 * @throws RuntimeException when the order has no events or is not PENDING
 */
public function confirmOrder(string $orderId): void
{
    $history = $this->eventStore->getEvents($orderId);
    if (empty($history)) {
        throw new RuntimeException("订单不存在: {$orderId}");
    }

    $latest = end($history);
    $isPending = $latest['payload']['status'] === 'PENDING';
    if (!$isPending) {
        throw new RuntimeException("订单状态不允许确认");
    }

    $confirmation = [
        'eventType' => 'OrderConfirmed',
        'aggregateType' => 'Order',
        'payload' => [
            'confirmedAt' => date('c'),
            'status' => 'CONFIRMED',
        ],
    ];

    $this->eventStore->append($orderId, [$confirmation], $latest['version']);
}
/**
 * Ship a confirmed order by appending an OrderShipped event.
 *
 * The status is read from the most recent event's payload, and that event's
 * version is passed as the expected version of the append.
 *
 * @param string $orderId        order to ship
 * @param string $trackingNumber carrier tracking number stored in the event
 * @throws RuntimeException when the order has no events or is not CONFIRMED
 */
public function shipOrder(string $orderId, string $trackingNumber): void
{
    $events = $this->eventStore->getEvents($orderId);
    // Same guard as confirmOrder(): without it end() returns false for an
    // unknown order and the status check below reads offsets of a boolean.
    if (empty($events)) {
        throw new RuntimeException("订单不存在: {$orderId}");
    }
    $lastEvent = end($events);
    if ($lastEvent['payload']['status'] !== 'CONFIRMED') {
        throw new RuntimeException("订单状态不允许发货");
    }
    $this->eventStore->append($orderId, [
        [
            'eventType' => 'OrderShipped',
            'aggregateType' => 'Order',
            'payload' => [
                'shippedAt' => date('c'),
                'trackingNumber' => $trackingNumber,
                'status' => 'SHIPPED',
            ],
        ],
    ], $lastEvent['version']);
}
/**
 * Rebuild an order's current state by folding all event payloads in order.
 *
 * Later events overwrite earlier values for the same key.
 *
 * @param string $orderId order to rebuild
 * @return array merged payload fields; empty when the order has no events
 */
public function rebuildOrderState(string $orderId): array
{
    $state = [];
    foreach ($this->eventStore->getEvents($orderId) as $event) {
        $payload = $event['payload'] ?? [];
        // Payloads read back from MongoDB are document objects by default,
        // which array_merge() rejects; turn them into plain arrays first.
        if ($payload instanceof \Traversable) {
            $payload = iterator_to_array($payload);
        }
        $state = array_merge($state, (array) $payload);
    }
    return $state;
}
/**
 * Store (or replace) a snapshot of an order's current state.
 *
 * The events are loaded once, so the stored state and version always describe
 * the same point in the stream. The version is the last folded event's
 * version, which is what getEvents() later uses as its "newer than" bound.
 *
 * @param string $orderId order to snapshot
 */
public function createSnapshot(string $orderId): void
{
    $state = [];
    $version = 0;
    foreach ($this->eventStore->getEvents($orderId) as $event) {
        $payload = $event['payload'] ?? [];
        // Payloads read back from MongoDB are document objects by default,
        // which array_merge() rejects; turn them into plain arrays first.
        if ($payload instanceof \Traversable) {
            $payload = iterator_to_array($payload);
        }
        $state = array_merge($state, (array) $payload);
        $version = (int) $event['version'];
    }

    $snapshots = $this->mongoClient->app_db->order_snapshots;
    $snapshots->replaceOne(
        ['orderId' => $orderId],
        [
            'orderId' => $orderId,
            'state' => $state,
            'version' => $version,
            'createdAt' => new MongoDB\BSON\UTCDateTime(),
        ],
        ['upsert' => true]
    );
}
/**
 * Load an order's state from its snapshot plus any newer events.
 *
 * Falls back to a full rebuild when no snapshot exists.
 *
 * @param string $orderId order to load
 * @return array|null merged order state
 */
public function getOrderFromSnapshot(string $orderId): ?array
{
    $snapshots = $this->mongoClient->app_db->order_snapshots;
    // Decode the snapshot as plain arrays so its state can be merged and
    // returned under the declared array return type.
    $snapshot = $snapshots->findOne(
        ['orderId' => $orderId],
        ['typeMap' => ['root' => 'array', 'document' => 'array', 'array' => 'array']]
    );
    if (!$snapshot) {
        return $this->rebuildOrderState($orderId);
    }

    $state = (array) ($snapshot['state'] ?? []);
    // Only events newer than the snapshot's version are applied on top.
    $events = $this->eventStore->getEvents($orderId, $snapshot['version']);
    foreach ($events as $event) {
        $payload = $event['payload'] ?? [];
        // Event payloads are document objects by default; array_merge()
        // needs plain arrays.
        if ($payload instanceof \Traversable) {
            $payload = iterator_to_array($payload);
        }
        $state = array_merge($state, (array) $payload);
    }
    return $state;
}
}
场景二:日志聚合
php
<?php
class LogAggregationService
{
private $mongoClient;
private $rabbitChannel;
/**
 * Wire the service to MongoDB and RabbitMQ and create the log indexes.
 *
 * The connection type is fully qualified because this snippet has no `use`
 * statement for it; an unqualified name would resolve to a non-existent
 * global class and reject a real connection.
 *
 * @param MongoDB\Client $mongoClient client used to reach app_db.logs
 * @param \PhpAmqpLib\Connection\AMQPStreamConnection $rabbit connection a channel is opened on
 */
public function __construct(
    MongoDB\Client $mongoClient,
    \PhpAmqpLib\Connection\AMQPStreamConnection $rabbit
) {
    $this->mongoClient = $mongoClient;
    $this->rabbitChannel = $rabbit->channel();
    $this->setupIndexes();
}
/**
 * Create the indexes used by log searches: by time, by level and time,
 * by service and time, and by trace id.
 */
private function setupIndexes(): void
{
    $logs = $this->mongoClient->app_db->logs;

    $indexKeys = [
        ['timestamp' => -1],
        ['level' => 1, 'timestamp' => -1],
        ['service' => 1, 'timestamp' => -1],
        ['trace_id' => 1],
    ];

    foreach ($indexKeys as $keys) {
        $logs->createIndex($keys);
    }
}
/**
 * Store one log entry, filling in defaults for missing fields.
 *
 * The timestamp is always the time of ingestion, not a value from $log.
 *
 * @param array $log entry with optional 'level', 'message', 'service', 'host',
 *                   'trace_id', 'span_id', 'context' and 'extra'
 * @return bool true when the document was inserted
 */
public function ingestLog(array $log): bool
{
    $entry = [
        'timestamp' => new MongoDB\BSON\UTCDateTime(),
        'level' => $log['level'] ?? 'INFO',
        'message' => $log['message'] ?? '',
        'service' => $log['service'] ?? 'unknown',
        'host' => $log['host'] ?? gethostname(),
        'trace_id' => $log['trace_id'] ?? null,
        'span_id' => $log['span_id'] ?? null,
        'context' => $log['context'] ?? [],
        'extra' => $log['extra'] ?? [],
    ];

    $insertResult = $this->mongoClient->app_db->logs->insertOne($entry);

    return $insertResult->getInsertedCount() > 0;
}
/**
 * Store many log entries in one insert, filling in defaults for missing fields.
 *
 * Every entry is stamped with the time of ingestion.
 *
 * @param array $logs list of entries shaped like the argument of ingestLog()
 * @return int number of inserted documents (0 for an empty list)
 */
public function bulkIngest(array $logs): int
{
    // insertMany() rejects an empty document list; nothing to store is a no-op.
    if ($logs === []) {
        return 0;
    }

    $collection = $this->mongoClient->app_db->logs;
    $documents = array_map(static function ($log) {
        return [
            'timestamp' => new MongoDB\BSON\UTCDateTime(),
            'level' => $log['level'] ?? 'INFO',
            'message' => $log['message'] ?? '',
            'service' => $log['service'] ?? 'unknown',
            'host' => $log['host'] ?? gethostname(),
            'trace_id' => $log['trace_id'] ?? null,
            'span_id' => $log['span_id'] ?? null,
            'context' => $log['context'] ?? [],
            'extra' => $log['extra'] ?? [],
        ];
    }, $logs);
    $result = $collection->insertMany($documents);
    return $result->getInsertedCount();
}
/**
 * Search logs, newest first.
 *
 * Supported criteria (all optional, ignored when empty):
 *  - 'level', 'service', 'trace_id': exact match
 *  - 'message': case-insensitive regular expression
 *  - 'from' / 'to': inclusive time bounds, parsed with strtotime()
 *  - 'limit' (default 100) and 'offset' (default 0)
 *
 * @param array $criteria search criteria as described above
 * @return array matching log documents
 */
public function searchLogs(array $criteria): array
{
    $query = [];

    // Exact-match fields share the same "copy when non-empty" rule.
    foreach (['level', 'service', 'trace_id'] as $field) {
        if (!empty($criteria[$field])) {
            $query[$field] = $criteria[$field];
        }
    }

    if (!empty($criteria['message'])) {
        $query['message'] = ['$regex' => $criteria['message'], '$options' => 'i'];
    }

    $timeRange = [];
    if (!empty($criteria['from'])) {
        $timeRange['$gte'] = new MongoDB\BSON\UTCDateTime(
            strtotime($criteria['from']) * 1000
        );
    }
    if (!empty($criteria['to'])) {
        $timeRange['$lte'] = new MongoDB\BSON\UTCDateTime(
            strtotime($criteria['to']) * 1000
        );
    }
    if ($timeRange !== []) {
        $query['timestamp'] = $timeRange;
    }

    $cursor = $this->mongoClient->app_db->logs->find($query, [
        'sort' => ['timestamp' => -1],
        'limit' => $criteria['limit'] ?? 100,
        'skip' => $criteria['offset'] ?? 0,
    ]);

    return iterator_to_array($cursor);
}
/**
 * Count logs per time bucket, broken down by level.
 *
 * @param string $interval 'minute', 'hour' or 'day'; any other value is treated as 'hour'
 * @return array one row per bucket ('_id'), each with a 'levels' list of
 *               {level, count} and a 'total', sorted by bucket ascending
 */
public function getLogStats(string $interval = 'hour'): array
{
    $formats = [
        'minute' => '%Y-%m-%d %H:%M',
        'hour' => '%Y-%m-%d %H:00',
        'day' => '%Y-%m-%d',
    ];
    $bucketFormat = $formats[$interval] ?? '%Y-%m-%d %H:00';

    // First pass: one count per (bucket, level) pair.
    $countPerBucketAndLevel = [
        '$group' => [
            '_id' => [
                'time' => [
                    '$dateToString' => [
                        'format' => $bucketFormat,
                        'date' => '$timestamp',
                    ],
                ],
                'level' => '$level',
            ],
            'count' => ['$sum' => 1],
        ],
    ];

    // Second pass: collapse the levels of each bucket into one row.
    $collapsePerBucket = [
        '$group' => [
            '_id' => '$_id.time',
            'levels' => [
                '$push' => [
                    'level' => '$_id.level',
                    'count' => '$count',
                ],
            ],
            'total' => ['$sum' => '$count'],
        ],
    ];

    $stages = [
        $countPerBucketAndLevel,
        $collapsePerBucket,
        ['$sort' => ['_id' => 1]],
    ];

    return iterator_to_array(
        $this->mongoClient->app_db->logs->aggregate($stages)
    );
}
/**
 * Consume log messages from the "logs.ingestion" queue and store them.
 *
 * Blocks while the channel has an active consumer. The queue is declared
 * first because nothing else in this class creates it.
 * NOTE(review): it is declared durable, like the queue in the sync service —
 * confirm if a producer creates it with different settings.
 *
 * A body that is not a JSON object/array is logged and dropped, since it can
 * never be ingested. When storing fails with an Exception the message is
 * requeued once and dropped if it fails again after redelivery.
 */
public function consumeLogs(): void
{
    // passive=false, durable=true, exclusive=false, auto_delete=false
    $this->rabbitChannel->queue_declare(
        'logs.ingestion',
        false,
        true,
        false,
        false
    );

    $callback = function ($message) {
        $log = json_decode($message->getBody(), true);
        if (!is_array($log)) {
            // ingestLog() requires an array; passing anything else would
            // raise a TypeError and stop the consumer.
            error_log("日志消息格式无效,已丢弃");
            $message->nack(false, false);
            return;
        }
        try {
            $this->ingestLog($log);
            $message->ack();
        } catch (Exception $e) {
            error_log("日志写入失败: " . $e->getMessage());
            // Retry once: requeue only when this delivery was the first attempt.
            $message->nack(false, !$message->isRedelivered());
        }
    };
    // At most 100 unacknowledged messages are delivered to this consumer.
    $this->rabbitChannel->basic_qos(0, 100, false);
    $this->rabbitChannel->basic_consume(
        'logs.ingestion',
        '',
        false,
        false, // no_ack=false: messages are acknowledged manually above
        false,
        false,
        $callback
    );
    while ($this->rabbitChannel->is_consuming()) {
        $this->rabbitChannel->wait();
    }
}
}
常见问题与解决方案
问题一:连接超时
症状: MongoDB 连接频繁超时
解决方案: 配置连接池和重试策略
php
$options = [
'connectTimeoutMS' => 5000,
'socketTimeoutMS' => 30000,
'serverSelectionTimeoutMS' => 5000,
'maxPoolSize' => 100,
'minPoolSize' => 10,
'retryWrites' => true,
'w' => 'majority',
];
问题二:查询性能
症状: 聚合查询性能下降
解决方案: 创建适当的索引和优化查询
php
$collection->createIndex(
['field1' => 1, 'field2' => -1],
['background' => true]
);
$pipeline = [
['$match' => $filter],
['$sort' => ['indexedField' => 1]],
['$skip' => $offset],
['$limit' => $limit],
];
问题三:文档大小限制
症状: 文档超过 16MB 限制
解决方案: 使用 GridFS 或分片存储
php
$bucket = $this->mongoClient->app_db->selectGridFSBucket();
$stream = fopen('php://memory', 'r+');
fwrite($stream, $largeData);
rewind($stream);
$fileId = $bucket->uploadFromStream('large_file.json', $stream);
最佳实践建议
1. 索引管理
php
class IndexManager
{
/**
 * Create every index described in $indexes on the given collection.
 *
 * @param MongoDB\Collection $collection collection to index
 * @param array              $indexes    map of index name => spec, where a spec has
 *                                       'keys' (field => direction) and optional 'unique'
 */
public static function ensureIndexes(MongoDB\Collection $collection, array $indexes): void
{
    foreach ($indexes as $indexName => $definition) {
        $isUnique = $definition['unique'] ?? false;
        $indexOptions = [
            'name' => $indexName,
            'background' => true,
            'unique' => $isUnique,
        ];

        $collection->createIndex($definition['keys'], $indexOptions);
    }
}
}
2. 事务处理
php
/**
 * Run $operation inside a MongoDB transaction with majority read and write concern.
 *
 * The transaction is committed when the operation returns. On any failure it
 * is aborted (if one is still in progress) and the original error is
 * rethrown. The session is always ended.
 *
 * @param callable $operation receives the session; it should pass that session
 *                            to every operation that belongs to the transaction
 * @throws \Throwable whatever the operation, start or commit raised
 */
public function executeTransaction(callable $operation): void
{
    $session = $this->mongoClient->startSession();
    try {
        $session->startTransaction([
            'readConcern' => new MongoDB\Driver\ReadConcern('majority'),
            'writeConcern' => new MongoDB\Driver\WriteConcern(MongoDB\Driver\WriteConcern::MAJORITY),
        ]);
        $operation($session);
        $session->commitTransaction();
    } catch (\Throwable $e) {
        // Abort only when a transaction is actually open: if startTransaction()
        // itself failed, aborting would raise a second error and hide the
        // original one.
        if ($session->isInTransaction()) {
            $session->abortTransaction();
        }
        throw $e;
    } finally {
        $session->endSession();
    }
}
3. 错误处理
php
class MongoDBErrorHandler
{
/**
 * Write an exception and its context to the PHP error log.
 *
 * @param Exception $e       the error to report
 * @param array     $context extra data, logged as JSON
 */
public static function handle(Exception $e, array $context = []): void
{
    $encodedContext = json_encode($context);
    $logLine = sprintf(
        "[MongoDB Error] %s\nContext: %s",
        $e->getMessage(),
        $encodedContext
    );

    error_log($logLine);
}
}
版本兼容性
| PHP | MongoDB PHP Driver | MongoDB Server | RabbitMQ Server |
|---|---|---|---|
| 8.2+ | 1.15+ | 7.x | 3.11+ |
| 8.1+ | 1.14+ | 6.x/7.x | 3.10+ |
| 8.0+ | 1.13+ | 5.x/6.x | 3.9+ |
| 7.4+ | 1.12+ | 4.x/5.x | 3.8+ |
