Appearance
4.5 批量操作
概述
批量操作是MongoDB中执行多个写入操作的高效方式,包括批量插入、批量更新、批量删除等。MongoDB的批量操作可以显著提高性能,减少网络往返次数,优化资源使用。本章节将详细介绍MongoDB批量操作的各种方法、最佳实践和性能优化技巧。
MongoDB的批量操作具有高性能、原子性和灵活性的特点。掌握批量操作对于处理大量数据迁移、数据导入、批量更新等场景至关重要。
基本概念
批量操作类型
MongoDB支持以下批量操作类型:
- 批量插入:使用 insertMany() 插入多个文档
- 批量更新:使用 updateMany() 更新多个文档
- 批量删除:使用 deleteMany() 删除多个文档
- 混合批量操作:使用 bulkWrite() 执行混合操作
批量操作优势
批量操作的主要优势:
- 性能提升:减少网络往返次数
- 原子性:操作在单个请求中执行
- 资源优化:减少服务器资源消耗
- 错误处理:统一的错误处理机制
批量操作限制
批量操作的注意事项:
- 文档大小:单个文档最大16MB
- 批量大小:建议每批1000-10000个操作
- 内存使用:大批量操作可能消耗较多内存
- 错误处理:需要妥善处理部分失败的情况
原理深度解析
批量插入原理
使用insertMany()方法批量插入文档:
php
<?php
// 批量插入详解
// Demonstrates MongoDB batch-insert patterns: single-call insertMany(),
// chunked inserts with a progress callback, ordered vs. unordered inserts,
// and a throughput benchmark across batch sizes.
// NOTE(review): assumes the mongodb/mongodb composer library and a local
// mongod on the default port — confirm against the deployment environment.
class BatchInsertOperations {
    private $database;

    /**
     * @param string $databaseName Name of the database to operate on.
     */
    public function __construct($databaseName) {
        $client = new MongoDB\Client("mongodb://localhost:27017");
        $this->database = $client->selectDatabase($databaseName);
    }

    /**
     * Insert all documents with one insertMany() call.
     *
     * @param string $collectionName Target collection.
     * @param array  $documents      Documents to insert.
     * @param array  $options        Options forwarded to insertMany().
     * @return array Result summary; on a BulkWriteException the summary still
     *               reports how many documents were inserted before failure.
     */
    public function batchInsert($collectionName, $documents, $options = []) {
        $collection = $this->database->selectCollection($collectionName);
        try {
            $result = $collection->insertMany($documents, $options);
            return [
                'success' => true,
                'inserted_count' => $result->getInsertedCount(),
                'inserted_ids' => $result->getInsertedIds(),
                'acknowledged' => $result->isAcknowledged()
            ];
        } catch (MongoDB\Driver\Exception\BulkWriteException $e) {
            // Partial failure: some documents may already have been written.
            $writeResult = $e->getWriteResult();
            return [
                'success' => false,
                'error' => 'Bulk insert error',
                'inserted_count' => $writeResult->getInsertedCount(),
                'write_errors' => $writeResult->getWriteErrors(),
                'message' => $e->getMessage()
            ];
        } catch (Exception $e) {
            return [
                'success' => false,
                'error' => 'Batch insert failed',
                'message' => $e->getMessage()
            ];
        }
    }

    /**
     * Insert documents in chunks of $batchSize, invoking $callback after each
     * chunk with ($batchResult, $batchNumber, $totalBatches).
     *
     * @return array Per-batch results plus aggregate counters.
     */
    public function batchInsertWithProgress($collectionName, $documents, $batchSize = 1000, $callback = null) {
        $collection = $this->database->selectCollection($collectionName);
        $totalDocuments = count($documents);
        $batches = array_chunk($documents, $batchSize);
        $totalBatches = count($batches);
        $results = [
            'total_documents' => $totalDocuments,
            'total_batches' => $totalBatches,
            'batch_size' => $batchSize,
            'inserted_count' => 0,
            'failed_batches' => 0,
            'batch_results' => []
        ];
        foreach ($batches as $batchIndex => $batch) {
            try {
                $result = $collection->insertMany($batch);
                $batchResult = [
                    'batch_number' => $batchIndex + 1,
                    'batch_size' => count($batch),
                    'inserted_count' => $result->getInsertedCount(),
                    'status' => 'success'
                ];
                $results['inserted_count'] += $result->getInsertedCount();
            } catch (MongoDB\Driver\Exception\BulkWriteException $e) {
                // Partial failure within this chunk: count what succeeded.
                $writeResult = $e->getWriteResult();
                $batchResult = [
                    'batch_number' => $batchIndex + 1,
                    'batch_size' => count($batch),
                    'inserted_count' => $writeResult->getInsertedCount(),
                    'status' => 'partial_failure',
                    'errors' => $writeResult->getWriteErrors()
                ];
                $results['inserted_count'] += $writeResult->getInsertedCount();
                $results['failed_batches']++;
            } catch (Exception $e) {
                $batchResult = [
                    'batch_number' => $batchIndex + 1,
                    'batch_size' => count($batch),
                    'inserted_count' => 0,
                    'status' => 'failed',
                    'error' => $e->getMessage()
                ];
                $results['failed_batches']++;
            }
            $results['batch_results'][] = $batchResult;
            // Report progress after every batch.
            if ($callback && is_callable($callback)) {
                $callback($batchResult, $batchIndex + 1, $totalBatches);
            }
        }
        return $results;
    }

    /**
     * Insert with an explicit ordered/unordered flag. Ordered mode stops at
     * the first error; unordered mode attempts every document.
     */
    public function batchInsertWithOrderedOption($collectionName, $documents, $ordered = true) {
        $collection = $this->database->selectCollection($collectionName);
        try {
            $result = $collection->insertMany($documents, ['ordered' => $ordered]);
            return [
                'success' => true,
                'ordered' => $ordered,
                'inserted_count' => $result->getInsertedCount(),
                'inserted_ids' => $result->getInsertedIds()
            ];
        } catch (MongoDB\Driver\Exception\BulkWriteException $e) {
            $writeResult = $e->getWriteResult();
            return [
                'success' => false,
                'ordered' => $ordered,
                'inserted_count' => $writeResult->getInsertedCount(),
                'write_errors' => $writeResult->getWriteErrors(),
                'message' => $ordered ?
                    'Insertion stopped at first error' :
                    'Some documents failed, others inserted'
            ];
        }
    }

    /**
     * Measure insert throughput for several batch sizes.
     *
     * Fix: the previous version passed the entire document array to a single
     * insertMany() call, so $batchSize never influenced the measurement and
     * every iteration timed the same operation. Documents are now actually
     * inserted in chunks of $batchSize.
     *
     * @return array One timing entry per batch size.
     */
    public function benchmarkBatchInsertion($collectionName, $documentCount, $batchSizes = [100, 500, 1000, 5000]) {
        $collection = $this->database->selectCollection($collectionName);
        $benchmarkResults = [];
        foreach ($batchSizes as $batchSize) {
            // Fresh synthetic documents per run, tagged so they can be
            // removed afterwards.
            $documents = [];
            for ($i = 0; $i < $documentCount; $i++) {
                $documents[] = [
                    'name' => "User {$i}",
                    'email' => "user{$i}@example.com",
                    'age' => rand(18, 65),
                    'batch_size_test' => $batchSize
                ];
            }
            $startTime = microtime(true);
            try {
                // Insert in chunks of the batch size under test.
                foreach (array_chunk($documents, $batchSize) as $chunk) {
                    $collection->insertMany($chunk);
                }
                $endTime = microtime(true);
                $executionTime = $endTime - $startTime;
                $throughput = $documentCount / $executionTime;
                $benchmarkResults[] = [
                    'batch_size' => $batchSize,
                    'document_count' => $documentCount,
                    'execution_time_seconds' => round($executionTime, 4),
                    'throughput_docs_per_sec' => round($throughput, 2),
                    'avg_time_per_doc_ms' => round(($executionTime / $documentCount) * 1000, 4),
                    'status' => 'success'
                ];
            } catch (Exception $e) {
                $benchmarkResults[] = [
                    'batch_size' => $batchSize,
                    'document_count' => $documentCount,
                    'status' => 'failed',
                    'error' => $e->getMessage()
                ];
            }
            // Remove the test data so the next run starts clean.
            $collection->deleteMany(['batch_size_test' => $batchSize]);
        }
        return $benchmarkResults;
    }
}
// Usage examples (require a running local MongoDB instance).
$batchInsertOps = new BatchInsertOperations('testdb');
// Basic batch insert: 100 small synthetic user documents.
$documents = [];
for ($i = 0; $i < 100; $i++) {
$documents[] = [
'name' => "User {$i}",
'email' => "user{$i}@example.com",
'age' => rand(18, 65)
];
}
$insertResult = $batchInsertOps->batchInsert('users', $documents);
print_r($insertResult);
// Batch insert with a progress callback: 10k documents, 1000 per batch.
$largeDocuments = [];
for ($i = 0; $i < 10000; $i++) {
$largeDocuments[] = [
'name' => "User {$i}",
'email' => "user{$i}@example.com",
'age' => rand(18, 65)
];
}
$progressCallback = function($batchResult, $currentBatch, $totalBatches) {
echo "Batch {$currentBatch}/{$totalBatches}: ";
echo "Inserted {$batchResult['inserted_count']} documents, ";
echo "Status: {$batchResult['status']}\n";
};
$progressResult = $batchInsertOps->batchInsertWithProgress(
'users',
$largeDocuments,
1000,
$progressCallback
);
print_r($progressResult);
// Ordered vs. unordered insert: the third document duplicates the first,
// so with a unique index the two modes behave differently.
$duplicateDocuments = [
['name' => 'Test User 1', 'email' => 'test1@example.com'],
['name' => 'Test User 2', 'email' => 'test2@example.com'],
['name' => 'Test User 1', 'email' => 'test1@example.com'], // duplicate entry
['name' => 'Test User 3', 'email' => 'test3@example.com']
];
$orderedResult = $batchInsertOps->batchInsertWithOrderedOption('users', $duplicateDocuments, true);
print_r($orderedResult);
$unorderedResult = $batchInsertOps->batchInsertWithOrderedOption('users', $duplicateDocuments, false);
print_r($unorderedResult);
// Throughput benchmark across several batch sizes.
$benchmarkResults = $batchInsertOps->benchmarkBatchInsertion('users', 10000);
print_r($benchmarkResults);
?>
批量更新原理
使用updateMany()方法批量更新文档:
php
<?php
// 批量更新详解
// Demonstrates MongoDB batch-update patterns: plain updateMany(), chunked
// updates with a progress callback, arrayFilters updates, and upserts.
class BatchUpdateOperations {
    private $database;

    /**
     * @param string $databaseName Name of the database to operate on.
     */
    public function __construct($databaseName) {
        $client = new MongoDB\Client("mongodb://localhost:27017");
        $this->database = $client->selectDatabase($databaseName);
    }

    /**
     * Update every matching document with one updateMany() call.
     *
     * @return array Matched/modified/upserted counters, or an error payload.
     */
    public function batchUpdate($collectionName, $filter, $update, $options = []) {
        $collection = $this->database->selectCollection($collectionName);
        try {
            $result = $collection->updateMany($filter, $update, $options);
            return [
                'success' => true,
                'matched_count' => $result->getMatchedCount(),
                'modified_count' => $result->getModifiedCount(),
                'upserted_count' => $result->getUpsertedCount(),
                'upserted_id' => $result->getUpsertedId(),
                'acknowledged' => $result->isAcknowledged()
            ];
        } catch (Exception $e) {
            return [
                'success' => false,
                'error' => 'Batch update failed',
                'message' => $e->getMessage()
            ];
        }
    }

    /**
     * Update matching documents in chunks of $batchSize, reporting progress
     * via $callback($batchResult, $batchNumber, $totalBatches).
     *
     * Fix: updateMany() accepts no 'skip'/'limit' options, so the previous
     * skip-based paging was invalid and would fail on every batch. We now
     * snapshot the _ids of all matching documents up front and update them
     * chunk by chunk via {'_id': {'$in': [...]}}. The snapshot also keeps
     * batches stable when the update removes documents from the filter.
     * NOTE(review): the id snapshot is held in memory — for very large
     * result sets consider streaming ids instead.
     */
    public function batchUpdateWithProgress($collectionName, $filter, $update, $batchSize = 1000, $callback = null) {
        $collection = $this->database->selectCollection($collectionName);
        // Count up front so callers get an early exit on an empty match.
        $totalMatched = $collection->countDocuments($filter);
        if ($totalMatched === 0) {
            return [
                'success' => true,
                'message' => 'No documents match the filter',
                'total_matched' => 0,
                'total_modified' => 0
            ];
        }
        // Snapshot matching ids so each document is processed exactly once.
        $matchedIds = [];
        foreach ($collection->find($filter, ['projection' => ['_id' => 1]]) as $doc) {
            $matchedIds[] = $doc['_id'];
        }
        $results = [
            'total_matched' => $totalMatched,
            'batch_size' => $batchSize,
            'total_batches' => ceil($totalMatched / $batchSize),
            'total_modified' => 0,
            'batch_results' => []
        ];
        $batchNumber = 0;
        foreach (array_chunk($matchedIds, $batchSize) as $idChunk) {
            $batchNumber++;
            try {
                $result = $collection->updateMany(['_id' => ['$in' => $idChunk]], $update);
                $batchResult = [
                    'batch_number' => $batchNumber,
                    'matched_count' => $result->getMatchedCount(),
                    'modified_count' => $result->getModifiedCount(),
                    'status' => 'success'
                ];
                $results['total_modified'] += $result->getModifiedCount();
            } catch (Exception $e) {
                $batchResult = [
                    'batch_number' => $batchNumber,
                    'matched_count' => 0,
                    'modified_count' => 0,
                    'status' => 'failed',
                    'error' => $e->getMessage()
                ];
            }
            $results['batch_results'][] = $batchResult;
            // Report progress after every batch.
            if ($callback && is_callable($callback)) {
                $callback($batchResult, $batchNumber, $results['total_batches']);
            }
        }
        return $results;
    }

    /**
     * Update matching documents using arrayFilters to target specific
     * elements of embedded arrays (the $[identifier] positional operator).
     */
    public function batchUpdateWithArrayFilters($collectionName, $filter, $update, $arrayFilters) {
        $collection = $this->database->selectCollection($collectionName);
        try {
            $result = $collection->updateMany($filter, $update, [
                'arrayFilters' => $arrayFilters
            ]);
            return [
                'success' => true,
                'array_filters' => $arrayFilters,
                'matched_count' => $result->getMatchedCount(),
                'modified_count' => $result->getModifiedCount()
            ];
        } catch (Exception $e) {
            return [
                'success' => false,
                'error' => 'Batch update with array filters failed',
                'message' => $e->getMessage()
            ];
        }
    }

    /**
     * Update matching documents with upsert forced on. Note: when nothing
     * matches, updateMany() with upsert inserts at most ONE new document.
     */
    public function batchUpdateWithUpsert($collectionName, $filter, $update, $options = []) {
        $collection = $this->database->selectCollection($collectionName);
        try {
            $result = $collection->updateMany($filter, $update, array_merge($options, ['upsert' => true]));
            $wasInserted = $result->getUpsertedCount() > 0;
            $wasUpdated = $result->getModifiedCount() > 0;
            return [
                'success' => true,
                'upsert_enabled' => true,
                'inserted' => $wasInserted,
                'updated' => $wasUpdated,
                'matched_count' => $result->getMatchedCount(),
                'modified_count' => $result->getModifiedCount(),
                'upserted_count' => $result->getUpsertedCount(),
                'upserted_id' => $result->getUpsertedId()
            ];
        } catch (Exception $e) {
            return [
                'success' => false,
                'error' => 'Batch update with upsert failed',
                'message' => $e->getMessage()
            ];
        }
    }
}
// Usage examples (require a running local MongoDB instance).
$batchUpdateOps = new BatchUpdateOperations('testdb');
// Basic batch update: mark every pending order as processed.
$filter = ['status' => 'pending'];
$update = ['$set' => ['status' => 'processed', 'processed_at' => new MongoDB\BSON\UTCDateTime()]];
$updateResult = $batchUpdateOps->batchUpdate('orders', $filter, $update);
print_r($updateResult);
// Batch update with a progress callback.
$progressCallback = function($batchResult, $currentBatch, $totalBatches) {
echo "Batch {$currentBatch}/{$totalBatches}: ";
echo "Modified {$batchResult['modified_count']} documents, ";
echo "Status: {$batchResult['status']}\n";
};
$progressUpdateResult = $batchUpdateOps->batchUpdateWithProgress(
'orders',
['status' => 'pending'],
['$set' => ['status' => 'processed', 'processed_at' => new MongoDB\BSON\UTCDateTime()]],
1000,
$progressCallback
);
print_r($progressUpdateResult);
// Batch update with array filters: reprice one item inside the order's
// embedded items array.
$arrayFilterResult = $batchUpdateOps->batchUpdateWithArrayFilters(
'orders',
['order_id' => 'ORD001'],
['$set' => ['items.$[elem].price' => 99.99]],
[['elem.item_id' => 'ITEM001']]
);
print_r($arrayFilterResult);
// Batch update with upsert enabled.
$upsertResult = $batchUpdateOps->batchUpdateWithUpsert(
'users',
['role' => 'premium'],
['$set' => ['discount' => 0.1, 'updated_at' => new MongoDB\BSON\UTCDateTime()]]
);
print_r($upsertResult);
?>
批量删除原理
使用deleteMany()方法批量删除文档:
php
<?php
// 批量删除详解
// Demonstrates MongoDB batch-delete patterns: plain deleteMany(), chunked
// deletes with a progress callback, delete-with-backup, and date-range purges.
class BatchDeleteOperations {
    private $database;

    /**
     * @param string $databaseName Name of the database to operate on.
     */
    public function __construct($databaseName) {
        $client = new MongoDB\Client("mongodb://localhost:27017");
        $this->database = $client->selectDatabase($databaseName);
    }

    /**
     * Delete every matching document with one deleteMany() call.
     */
    public function batchDelete($collectionName, $filter, $options = []) {
        $collection = $this->database->selectCollection($collectionName);
        try {
            $result = $collection->deleteMany($filter, $options);
            return [
                'success' => true,
                'deleted_count' => $result->getDeletedCount(),
                'acknowledged' => $result->isAcknowledged()
            ];
        } catch (Exception $e) {
            return [
                'success' => false,
                'error' => 'Batch delete failed',
                'message' => $e->getMessage()
            ];
        }
    }

    /**
     * Delete matching documents in chunks of $batchSize, reporting progress
     * via $callback($batchResult, $batchNumber, $totalBatches).
     *
     * Fix: deleteMany() accepts no 'skip'/'limit' options, so the previous
     * skip-based paging was invalid and would fail on every batch. We now
     * snapshot the matching _ids and delete them chunk by chunk via
     * {'_id': {'$in': [...]}}.
     * NOTE(review): the id snapshot is held in memory — for very large
     * result sets consider streaming ids instead.
     */
    public function batchDeleteWithProgress($collectionName, $filter, $batchSize = 10000, $callback = null) {
        $collection = $this->database->selectCollection($collectionName);
        // Count up front so callers get an early exit on an empty match.
        $totalMatched = $collection->countDocuments($filter);
        if ($totalMatched === 0) {
            return [
                'success' => true,
                'message' => 'No documents match the filter',
                'total_matched' => 0,
                'total_deleted' => 0
            ];
        }
        // Snapshot the ids so batch boundaries stay stable while deleting.
        $matchedIds = [];
        foreach ($collection->find($filter, ['projection' => ['_id' => 1]]) as $doc) {
            $matchedIds[] = $doc['_id'];
        }
        $results = [
            'total_matched' => $totalMatched,
            'batch_size' => $batchSize,
            'total_batches' => ceil($totalMatched / $batchSize),
            'total_deleted' => 0,
            'batch_results' => []
        ];
        $batchNumber = 0;
        foreach (array_chunk($matchedIds, $batchSize) as $idChunk) {
            $batchNumber++;
            try {
                $result = $collection->deleteMany(['_id' => ['$in' => $idChunk]]);
                $batchResult = [
                    'batch_number' => $batchNumber,
                    'deleted_count' => $result->getDeletedCount(),
                    'status' => 'success'
                ];
                $results['total_deleted'] += $result->getDeletedCount();
            } catch (Exception $e) {
                $batchResult = [
                    'batch_number' => $batchNumber,
                    'deleted_count' => 0,
                    'status' => 'failed',
                    'error' => $e->getMessage()
                ];
            }
            $results['batch_results'][] = $batchResult;
            // Report progress after every batch.
            if ($callback && is_callable($callback)) {
                $callback($batchResult, $batchNumber, $results['total_batches']);
            }
        }
        return $results;
    }

    /**
     * Optionally copy matching documents into a timestamped "*_backup_*"
     * collection before deleting them.
     */
    public function batchDeleteWithBackup($collectionName, $filter, $backup = true) {
        $collection = $this->database->selectCollection($collectionName);
        try {
            $backupData = null;
            if ($backup) {
                $backupData = $collection->find($filter)->toArray();
                if (!empty($backupData)) {
                    $backupCollection = $this->database->selectCollection(
                        $collection->getCollectionName() . '_backup_' . date('YmdHis')
                    );
                    $backupCollection->insertMany($backupData);
                }
            }
            // Delete only after the backup has been written.
            $result = $collection->deleteMany($filter);
            return [
                'success' => true,
                'backup_created' => $backup && !empty($backupData),
                'backup_count' => $backup ? count($backupData) : 0,
                'deleted_count' => $result->getDeletedCount()
            ];
        } catch (Exception $e) {
            return [
                'success' => false,
                'error' => 'Batch delete with backup failed',
                'message' => $e->getMessage()
            ];
        }
    }

    /**
     * Delete documents whose $dateField falls within [$startDate, $endDate],
     * delegating to batchDeleteWithProgress() for chunking.
     * (Also removed an unused $collection local from the original.)
     */
    public function batchDeleteByDateRange($collectionName, $dateField, $startDate, $endDate, $batchSize = 10000) {
        try {
            $filter = [
                $dateField => [
                    '$gte' => $startDate,
                    '$lte' => $endDate
                ]
            ];
            return $this->batchDeleteWithProgress($collectionName, $filter, $batchSize);
        } catch (Exception $e) {
            return [
                'success' => false,
                'error' => 'Date range batch delete failed',
                'message' => $e->getMessage()
            ];
        }
    }
}
// Usage examples (require a running local MongoDB instance).
$batchDeleteOps = new BatchDeleteOperations('testdb');
// Basic batch delete: remove every inactive user.
$filter = ['status' => 'inactive'];
$deleteResult = $batchDeleteOps->batchDelete('users', $filter);
print_r($deleteResult);
// Batch delete with a progress callback: purge logs older than 90 days.
$progressCallback = function($batchResult, $currentBatch, $totalBatches) {
echo "Batch {$currentBatch}/{$totalBatches}: ";
echo "Deleted {$batchResult['deleted_count']} documents, ";
echo "Status: {$batchResult['status']}\n";
};
$progressDeleteResult = $batchDeleteOps->batchDeleteWithProgress(
'logs',
['created_at' => ['$lt' => new MongoDB\BSON\UTCDateTime((time() - 90 * 86400) * 1000)]],
10000,
$progressCallback
);
print_r($progressDeleteResult);
// Batch delete with a backup copy taken first.
$backupDeleteResult = $batchDeleteOps->batchDeleteWithBackup(
'users',
['status' => 'deleted'],
true
);
print_r($backupDeleteResult);
// Batch delete by date range (January 2024), 5000 documents per batch.
$startDate = new MongoDB\BSON\UTCDateTime(strtotime('2024-01-01') * 1000);
$endDate = new MongoDB\BSON\UTCDateTime(strtotime('2024-01-31') * 1000);
$dateRangeDeleteResult = $batchDeleteOps->batchDeleteByDateRange(
'logs',
'created_at',
$startDate,
$endDate,
5000
);
print_r($dateRangeDeleteResult);
?>
混合批量操作原理
使用bulkWrite()方法执行混合批量操作:
php
<?php
// 混合批量操作详解
/**
 * Runs heterogeneous MongoDB write batches (inserts, updates, deletes)
 * through the bulkWrite() API: single-shot execution, chunked execution
 * with a progress callback, and explicit ordered/unordered modes.
 */
class MixedBulkOperations {
    private $database;

    public function __construct($databaseName) {
        $client = new MongoDB\Client("mongodb://localhost:27017");
        $this->database = $client->selectDatabase($databaseName);
    }

    /**
     * Collect the four write counters that both a successful bulk result and
     * a partial-failure write result expose, in a fixed key order.
     */
    private function writeCounts($writeResult) {
        return [
            'inserted_count' => $writeResult->getInsertedCount(),
            'matched_count' => $writeResult->getMatchedCount(),
            'modified_count' => $writeResult->getModifiedCount(),
            'deleted_count' => $writeResult->getDeletedCount()
        ];
    }

    /**
     * Execute all $operations in a single bulkWrite() call.
     */
    public function executeBulkWrite($collectionName, $operations, $options = []) {
        $target = $this->database->selectCollection($collectionName);
        try {
            $outcome = $target->bulkWrite($operations, $options);
            return array_merge(
                ['success' => true],
                $this->writeCounts($outcome),
                [
                    'upserted_count' => $outcome->getUpsertedCount(),
                    'upserted_ids' => $outcome->getUpsertedIds()
                ]
            );
        } catch (MongoDB\Driver\Exception\BulkWriteException $bulkError) {
            // Partial failure: report what did get written plus the errors.
            $partial = $bulkError->getWriteResult();
            return array_merge(
                ['success' => false, 'error' => 'Bulk write error'],
                $this->writeCounts($partial),
                [
                    'write_errors' => $partial->getWriteErrors(),
                    'message' => $bulkError->getMessage()
                ]
            );
        } catch (Exception $otherError) {
            return [
                'success' => false,
                'error' => 'Bulk write failed',
                'message' => $otherError->getMessage()
            ];
        }
    }

    /**
     * Execute $operations in chunks of $batchSize, invoking $callback after
     * each chunk with ($chunkReport, $chunkNumber, $totalChunks).
     */
    public function executeBulkWriteWithProgress($collectionName, $operations, $batchSize = 1000, $callback = null) {
        $target = $this->database->selectCollection($collectionName);
        $chunks = array_chunk($operations, $batchSize);
        $chunkTotal = count($chunks);
        $summary = [
            'total_operations' => count($operations),
            'total_batches' => $chunkTotal,
            'batch_size' => $batchSize,
            'total_inserted' => 0,
            'total_updated' => 0,
            'total_deleted' => 0,
            'batch_results' => []
        ];
        foreach ($chunks as $index => $chunk) {
            $header = [
                'batch_number' => $index + 1,
                'batch_size' => count($chunk)
            ];
            try {
                $counts = $this->writeCounts($target->bulkWrite($chunk));
                $chunkReport = array_merge($header, $counts, ['status' => 'success']);
            } catch (MongoDB\Driver\Exception\BulkWriteException $bulkError) {
                $partial = $bulkError->getWriteResult();
                $counts = $this->writeCounts($partial);
                $chunkReport = array_merge($header, $counts, [
                    'status' => 'partial_failure',
                    'errors' => $partial->getWriteErrors()
                ]);
            } catch (Exception $otherError) {
                // Nothing was written for this chunk; count it as all zeros.
                $counts = ['inserted_count' => 0, 'matched_count' => 0, 'modified_count' => 0, 'deleted_count' => 0];
                $chunkReport = array_merge($header, $counts, [
                    'status' => 'failed',
                    'error' => $otherError->getMessage()
                ]);
            }
            $summary['total_inserted'] += $counts['inserted_count'];
            $summary['total_updated'] += $counts['modified_count'];
            $summary['total_deleted'] += $counts['deleted_count'];
            $summary['batch_results'][] = $chunkReport;
            // Report progress after every chunk.
            if ($callback && is_callable($callback)) {
                $callback($chunkReport, $index + 1, $chunkTotal);
            }
        }
        return $summary;
    }

    /**
     * Execute $operations with an explicit ordered/unordered flag: ordered
     * mode stops at the first error, unordered mode attempts every operation.
     */
    public function executeOrderedBulkWrite($collectionName, $operations, $ordered = true) {
        $target = $this->database->selectCollection($collectionName);
        try {
            $outcome = $target->bulkWrite($operations, ['ordered' => $ordered]);
            return array_merge(
                ['success' => true, 'ordered' => $ordered],
                $this->writeCounts($outcome)
            );
        } catch (MongoDB\Driver\Exception\BulkWriteException $bulkError) {
            $partial = $bulkError->getWriteResult();
            return array_merge(
                ['success' => false, 'ordered' => $ordered],
                $this->writeCounts($partial),
                [
                    'write_errors' => $partial->getWriteErrors(),
                    'message' => $ordered ?
                        'Execution stopped at first error' :
                        'Some operations failed, others succeeded'
                ]
            );
        }
    }
}
// Usage examples (require a running local MongoDB instance).
$mixedBulkOps = new MixedBulkOperations('testdb');
// Basic mixed bulk operation.
// Fix: MongoDB\Collection::bulkWrite() expects plain arrays describing each
// operation. The previous version wrapped each entry in a
// MongoDB\Driver\BulkWrite object — that class's constructor takes an
// options array, not an operation, so bulkWrite() would have rejected
// every entry.
$operations = [
    ['insertOne' => [
        ['name' => 'User 1', 'email' => 'user1@example.com', 'age' => 25]
    ]],
    ['insertOne' => [
        ['name' => 'User 2', 'email' => 'user2@example.com', 'age' => 30]
    ]],
    ['updateOne' => [
        ['email' => 'user1@example.com'],
        ['$set' => ['age' => 26]]
    ]],
    ['deleteOne' => [
        ['email' => 'user2@example.com']
    ]]
];
$bulkWriteResult = $mixedBulkOps->executeBulkWrite('users', $operations);
print_r($bulkWriteResult);
// Chunked mixed bulk operation with a progress callback (10k inserts).
$largeOperations = [];
for ($i = 0; $i < 10000; $i++) {
    $largeOperations[] = ['insertOne' => [
        ['name' => "User {$i}", 'email' => "user{$i}@example.com", 'age' => rand(18, 65)]
    ]];
}
$progressCallback = function($batchResult, $currentBatch, $totalBatches) {
    echo "Batch {$currentBatch}/{$totalBatches}: ";
    echo "Inserted {$batchResult['inserted_count']}, ";
    echo "Updated {$batchResult['modified_count']}, ";
    echo "Deleted {$batchResult['deleted_count']}, ";
    echo "Status: {$batchResult['status']}\n";
};
$progressBulkResult = $mixedBulkOps->executeBulkWriteWithProgress(
    'users',
    $largeOperations,
    1000,
    $progressCallback
);
print_r($progressBulkResult);
// Ordered vs. unordered execution of the same operation list.
$orderedBulkResult = $mixedBulkOps->executeOrderedBulkWrite('users', $operations, true);
print_r($orderedBulkResult);
$unorderedBulkResult = $mixedBulkOps->executeOrderedBulkWrite('users', $operations, false);
print_r($unorderedBulkResult);
?>
常见错误与踩坑点
错误1:批量操作内存溢出
问题描述:一次性处理过多数据导致内存溢出。
php
<?php
// Bad example - out-of-memory risk from one huge batch.
try {
$client = new MongoDB\Client("mongodb://localhost:27017");
$database = $client->selectDatabase("testdb");
$collection = $database->selectCollection('users');
// Wrong: builds 100k large documents and inserts them in a single call.
$largeDocuments = [];
for ($i = 0; $i < 100000; $i++) {
$largeDocuments[] = [
'name' => "User {$i}",
'email' => "user{$i}@example.com",
'data' => str_repeat('x', 10000) // ~10 KB payload per document
];
}
$collection->insertMany($largeDocuments);
} catch (Exception $e) {
echo "错误: " . $e->getMessage() . "\n";
}
// Good example - process the same data in bounded batches.
$client = new MongoDB\Client("mongodb://localhost:27017");
$database = $client->selectDatabase("testdb");
$collection = $database->selectCollection('users');
// Right: insert in chunks of 1000 documents.
$batchSize = 1000;
$batches = array_chunk($largeDocuments, $batchSize);
foreach ($batches as $batchIndex => $batch) {
try {
$collection->insertMany($batch);
echo "Batch " . ($batchIndex + 1) . ": Inserted " . count($batch) . " documents\n";
// Free the chunk's memory as soon as it has been written.
unset($batch);
} catch (Exception $e) {
echo "Batch " . ($batchIndex + 1) . " failed: " . $e->getMessage() . "\n";
}
}
echo "All batches processed\n";
?>
错误2:批量操作错误处理不当
问题描述:批量操作部分失败时错误处理不当。
php
<?php
// Bad example - partial bulk failures handled poorly.
try {
$client = new MongoDB\Client("mongodb://localhost:27017");
$database = $client->selectDatabase("testdb");
$collection = $database->selectCollection('users');
// Wrong: ignores that some inserts may succeed before the error is thrown.
$documents = [
['name' => 'User 1', 'email' => 'user1@example.com'],
['name' => 'User 2', 'email' => 'user1@example.com'], // duplicate email
['name' => 'User 3', 'email' => 'user3@example.com']
];
$result = $collection->insertMany($documents);
echo "Inserted " . $result->getInsertedCount() . " documents\n";
} catch (MongoDB\Driver\Exception\BulkWriteException $e) {
echo "错误: " . $e->getMessage() . "\n";
}
// Good example - inspect the write result and retry failed documents.
$client = new MongoDB\Client("mongodb://localhost:27017");
$database = $client->selectDatabase("testdb");
$collection = $database->selectCollection('users');
// Right: handle partial failure explicitly.
$documents = [
['name' => 'User 1', 'email' => 'user1@example.com'],
['name' => 'User 2', 'email' => 'user1@example.com'], // duplicate email
['name' => 'User 3', 'email' => 'user3@example.com']
];
try {
$result = $collection->insertMany($documents);
echo "Successfully inserted " . $result->getInsertedCount() . " documents\n";
} catch (MongoDB\Driver\Exception\BulkWriteException $e) {
$writeResult = $e->getWriteResult();
echo "Partial failure occurred\n";
echo "Successfully inserted: " . $writeResult->getInsertedCount() . "\n";
$writeErrors = $writeResult->getWriteErrors();
foreach ($writeErrors as $error) {
echo "Error: " . $error->getMessage() . "\n";
echo "Code: " . $error->getCode() . "\n";
}
// Retry failed documents individually; each write error's index maps back
// into the original $documents array.
$failedDocuments = [];
foreach ($writeErrors as $error) {
$index = $error->getIndex();
$failedDocuments[] = $documents[$index];
}
if (!empty($failedDocuments)) {
echo "Retrying " . count($failedDocuments) . " failed documents...\n";
foreach ($failedDocuments as $document) {
try {
$collection->insertOne($document);
echo "Retried: " . $document['name'] . "\n";
} catch (Exception $retryError) {
echo "Retry failed: " . $retryError->getMessage() . "\n";
}
}
}
}
?>
常见应用场景
场景1:数据导入系统
实现高性能数据导入功能:
php
<?php
// 数据导入系统
// Data-import helper: writes records either insert-only or as upserts keyed
// on configurable unique fields, with per-record error accounting, plus an
// insertMany()-based variant that reports progress per batch.
class DataImportSystem {
private $database;
public function __construct($databaseName) {
$client = new MongoDB\Client("mongodb://localhost:27017");
$this->database = $client->selectDatabase($databaseName);
}
// Import $data record by record.
// Options: batch_size (default 1000), continue_on_error (default true),
// update_existing (default false), unique_fields (builds the upsert filter
// when update_existing is true).
// NOTE(review): records are written one at a time with insertOne/updateOne,
// so the array_chunk batching here does not reduce network round trips —
// importWithProgress() below is the true bulk path.
public function importData($collectionName, $data, $options = []) {
$collection = $this->database->selectCollection($collectionName);
$batchSize = $options['batch_size'] ?? 1000;
$continueOnError = $options['continue_on_error'] ?? true;
$updateExisting = $options['update_existing'] ?? false;
$results = [
'total_records' => count($data),
'imported' => 0,
'updated' => 0,
'failed' => 0,
'errors' => []
];
$batches = array_chunk($data, $batchSize);
foreach ($batches as $batchIndex => $batch) {
foreach ($batch as $record) {
try {
if ($updateExisting) {
// Upsert: match on the configured unique fields; stamp created_at
// only when the record is newly inserted.
$filter = $this->buildUniqueFilter($record, $options['unique_fields'] ?? []);
$result = $collection->updateOne(
$filter,
[
'$set' => $record,
'$setOnInsert' => ['created_at' => new MongoDB\BSON\UTCDateTime()]
],
['upsert' => true]
);
if ($result->getUpsertedCount() > 0) {
$results['imported']++;
} else {
$results['updated']++;
}
} else {
// Insert-only mode: every record becomes a new document.
$result = $collection->insertOne($record);
$results['imported']++;
}
} catch (MongoDB\Driver\Exception\BulkWriteException $e) {
$results['failed']++;
// Abort the whole import on a write error unless told to continue.
if (!$continueOnError) {
throw $e;
}
$results['errors'][] = [
'record' => $record,
'error' => $e->getMessage()
];
} catch (Exception $e) {
$results['failed']++;
$results['errors'][] = [
'record' => $record,
'error' => $e->getMessage()
];
}
}
}
return $results;
}
// Build an equality filter from the record's values for each unique field
// the record actually contains; fields missing from the record are skipped.
private function buildUniqueFilter($record, $uniqueFields) {
$filter = [];
foreach ($uniqueFields as $field) {
if (isset($record[$field])) {
$filter[$field] = $record[$field];
}
}
return $filter;
}
// Import $data with one insertMany() per batch, reporting progress through
// options['progress_callback']($batchResult, $batchNumber, $totalBatches).
public function importWithProgress($collectionName, $data, $options = []) {
$collection = $this->database->selectCollection($collectionName);
$batchSize = $options['batch_size'] ?? 1000;
$callback = $options['progress_callback'] ?? null;
$totalRecords = count($data);
$batches = array_chunk($data, $batchSize);
$totalBatches = count($batches);
$results = [
'total_records' => $totalRecords,
'total_batches' => $totalBatches,
'batch_size' => $batchSize,
'imported' => 0,
'updated' => 0,
'failed' => 0,
'batch_results' => []
];
foreach ($batches as $batchIndex => $batch) {
try {
$result = $collection->insertMany($batch);
$batchResult = [
'batch_number' => $batchIndex + 1,
'batch_size' => count($batch),
'imported_count' => $result->getInsertedCount(),
'status' => 'success'
];
$results['imported'] += $result->getInsertedCount();
} catch (MongoDB\Driver\Exception\BulkWriteException $e) {
// Partial failure: count what made it in, mark the rest failed.
$writeResult = $e->getWriteResult();
$batchResult = [
'batch_number' => $batchIndex + 1,
'batch_size' => count($batch),
'imported_count' => $writeResult->getInsertedCount(),
'status' => 'partial_failure',
'errors' => $writeResult->getWriteErrors()
];
$results['imported'] += $writeResult->getInsertedCount();
$results['failed'] += count($batch) - $writeResult->getInsertedCount();
} catch (Exception $e) {
$batchResult = [
'batch_number' => $batchIndex + 1,
'batch_size' => count($batch),
'imported_count' => 0,
'status' => 'failed',
'error' => $e->getMessage()
];
$results['failed'] += count($batch);
}
$results['batch_results'][] = $batchResult;
// Invoke the progress callback after each batch.
if ($callback && is_callable($callback)) {
$callback($batchResult, $batchIndex + 1, $totalBatches);
}
}
return $results;
}
}
// Usage examples (require a running local MongoDB instance).
$dataImportSystem = new DataImportSystem('testdb');
// Basic import: 5000 synthetic user records, insert-only.
$importData = [];
for ($i = 0; $i < 5000; $i++) {
$importData[] = [
'name' => "User {$i}",
'email' => "user{$i}@example.com",
'age' => rand(18, 65)
];
}
$importResult = $dataImportSystem->importData('users', $importData, [
'batch_size' => 1000,
'continue_on_error' => true,
'update_existing' => false
]);
print_r($importResult);
// Import with a progress callback.
$progressCallback = function($batchResult, $currentBatch, $totalBatches) {
echo "Batch {$currentBatch}/{$totalBatches}: ";
echo "Imported {$batchResult['imported_count']} documents, ";
echo "Status: {$batchResult['status']}\n";
};
$progressImportResult = $dataImportSystem->importWithProgress(
'users',
$importData,
[
'batch_size' => 1000,
'progress_callback' => $progressCallback
]
);
print_r($progressImportResult);
?>
常见问题答疑
问题1:如何选择合适的批量大小?
回答:根据数据大小、网络延迟、服务器性能选择批量大小:
php
<?php
// 批量大小选择助手
// Heuristic helper that recommends a bulk-write batch size from the average
// document size (bytes), the network round-trip latency (ms), and a coarse
// server performance rating ('low' | 'medium' | 'high').
class BatchSizeSelector {
    /**
     * @param int    $documentSize      Average document size in bytes.
     * @param int    $networkLatency    Network round-trip latency in ms.
     * @param string $serverPerformance One of 'low', 'medium', 'high'.
     * @return array The recommendation plus the inputs that produced it.
     */
    public static function selectBatchSize($documentSize, $networkLatency, $serverPerformance) {
        // Pick a starting point from the document size: big documents mean
        // smaller batches.
        if ($documentSize > 10000) {            // above 10 KB
            $recommended = 100;
        } elseif ($documentSize > 5000) {       // above 5 KB
            $recommended = 500;
        } else {                                // 5 KB and below
            $recommended = 1000;
        }
        // Slow links get smaller batches (never below 100 on the slow path).
        if ($networkLatency > 100) {            // slower than 100 ms
            $recommended = max(100, $recommended * 0.5);
        } elseif ($networkLatency > 50) {       // slower than 50 ms
            $recommended = $recommended * 0.75;
        }
        // Weak servers halve the batch (floor of 100); strong ones get +50%.
        if ($serverPerformance === 'low') {
            $recommended = max(100, $recommended * 0.5);
        } elseif ($serverPerformance === 'high') {
            $recommended = $recommended * 1.5;
        }
        return [
            'recommended_batch_size' => (int) $recommended,
            'document_size' => $documentSize,
            'network_latency' => $networkLatency,
            'server_performance' => $serverPerformance
        ];
    }
}
// Usage example: recommend a batch size for ~5 KB documents on a 75 ms link.
$batchSize = BatchSizeSelector::selectBatchSize(5000, 75, 'medium');
print_r($batchSize);
?>
实战练习
练习1:实现批量数据同步
实现一个批量数据同步系统:
php
<?php
// 批量数据同步系统
// Copies a collection from a source database into a target database, either
// insert-only or upsert-by-_id, in batches.
// NOTE(review): the whole source collection is loaded into memory via
// toArray() before batching — acceptable for demos, risky for large data.
class BatchDataSync {
private $sourceDatabase;
private $targetDatabase;
public function __construct($sourceDatabaseName, $targetDatabaseName) {
$client = new MongoDB\Client("mongodb://localhost:27017");
$this->sourceDatabase = $client->selectDatabase($sourceDatabaseName);
$this->targetDatabase = $client->selectDatabase($targetDatabaseName);
}
// Sync one collection. Options: batch_size (default 1000) and sync_strategy
// ('insert_only' = one insertMany per batch; 'upsert' = one updateOne per
// document, matched on _id).
public function syncCollection($sourceCollectionName, $targetCollectionName, $options = []) {
$sourceCollection = $this->sourceDatabase->selectCollection($sourceCollectionName);
$targetCollection = $this->targetDatabase->selectCollection($targetCollectionName);
$batchSize = $options['batch_size'] ?? 1000;
$syncStrategy = $options['sync_strategy'] ?? 'insert_only';
// Load the entire source collection up front.
$sourceDocuments = $sourceCollection->find()->toArray();
$results = [
'total_documents' => count($sourceDocuments),
'synced' => 0,
'updated' => 0,
'failed' => 0,
'batch_results' => []
];
$batches = array_chunk($sourceDocuments, $batchSize);
foreach ($batches as $batchIndex => $batch) {
try {
if ($syncStrategy === 'insert_only') {
// Insert-only: one bulk insert per batch.
$result = $targetCollection->insertMany($batch);
$results['synced'] += $result->getInsertedCount();
} elseif ($syncStrategy === 'upsert') {
// Upsert: match on _id; 'synced' counts fresh inserts,
// 'updated' counts overwrites of existing documents.
foreach ($batch as $document) {
$filter = ['_id' => $document['_id']];
$result = $targetCollection->updateOne(
$filter,
['$set' => $document],
['upsert' => true]
);
if ($result->getUpsertedCount() > 0) {
$results['synced']++;
} else {
$results['updated']++;
}
}
}
$batchResult = [
'batch_number' => $batchIndex + 1,
'batch_size' => count($batch),
'status' => 'success'
];
} catch (Exception $e) {
// The whole batch is counted as failed, even if some documents in an
// upsert batch were already written before the exception.
$batchResult = [
'batch_number' => $batchIndex + 1,
'batch_size' => count($batch),
'status' => 'failed',
'error' => $e->getMessage()
];
$results['failed'] += count($batch);
}
$results['batch_results'][] = $batchResult;
}
return $results;
}
}
// Usage example: upsert-sync the 'users' collection between two databases.
$batchSync = new BatchDataSync('source_db', 'target_db');
$syncResult = $batchSync->syncCollection('users', 'users', [
'batch_size' => 1000,
'sync_strategy' => 'upsert'
]);
print_r($syncResult);
?>
知识点总结
核心概念
- 批量操作方法:insertMany()、updateMany()、deleteMany()、bulkWrite()
- 批量操作优势:性能提升、原子性、资源优化
- 批量操作限制:文档大小、批量大小、内存使用
- 错误处理:部分失败、重试机制、错误恢复
批量操作方法
- insertMany():批量插入文档
- updateMany():批量更新文档
- deleteMany():批量删除文档
- bulkWrite():执行混合批量操作
最佳实践
- 批量大小:根据数据大小选择合适的批量大小
- 错误处理:妥善处理部分失败的情况
- 进度监控:提供批量操作进度反馈
- 内存管理:分批处理大量数据避免内存溢出
常见场景
- 数据导入:批量导入大量数据
- 数据同步:批量同步数据到目标集合
- 批量更新:批量更新符合条件的文档
- 数据清理:批量删除过期数据
拓展参考资料
官方文档
- MongoDB批量写入:https://docs.mongodb.com/manual/reference/method/db.collection.bulkWrite/
- insertMany方法:https://docs.mongodb.com/manual/reference/method/db.collection.insertMany/
- updateMany方法:https://docs.mongodb.com/manual/reference/method/db.collection.updateMany/
推荐阅读
- 《MongoDB批量操作优化》
- 《大规模数据导入实践》
- 《MongoDB性能调优指南》
在线资源
- MongoDB University批量操作课程
- MongoDB官方博客性能优化文章
- Stack Overflow MongoDB批量操作相关问题
