Appearance
4.1 插入文档 (Create)
概述
插入文档是MongoDB中最基本的操作之一,用于向集合中添加新数据。MongoDB提供了多种插入文档的方法,包括单文档插入、批量插入、条件插入等。本章节将详细介绍MongoDB文档插入的各种方式、最佳实践以及性能优化技巧。
MongoDB的插入操作具有原子性、灵活性和高性能的特点。理解不同插入方法的适用场景和性能特性,对于构建高效的数据存储系统至关重要。
基本概念
插入操作类型
MongoDB支持以下插入操作类型:
- 单文档插入:使用insertOne()方法插入单个文档
- 批量插入:使用insertMany()方法插入多个文档
- 条件插入:使用updateOne()配合upsert选项与$setOnInsert操作符实现条件插入
- 批量写入:使用bulkWrite()方法执行混合的批量操作
文档ID生成
MongoDB文档的_id字段具有以下特点:
- 唯一性:每个文档必须有唯一的_id值
- 自动生成:如果不提供_id,MongoDB会自动生成ObjectId
- 自定义ID:可以自定义_id值,但必须确保唯一性
- 不可变性:_id字段创建后不能修改
插入性能因素
影响插入性能的主要因素:
- 文档大小:较大的文档插入速度较慢
- 索引数量:索引越多,插入速度越慢
- 批量大小:适当的批量大小可以提高性能
- 写入关注:不同的写入关注级别影响性能
- 网络延迟:网络延迟影响插入响应时间
原理深度解析
单文档插入原理
单文档插入使用insertOne()方法,返回插入的文档ID:
php
<?php
// 单文档插入详解
/**
 * Tutorial helper that demonstrates the different ways of inserting ONE document.
 *
 * Every public method returns an associative array whose 'success' key says
 * whether the insert went through; failures carry an 'error' label plus details.
 */
class SingleDocumentInsertion {
    /** Write-error code treated as a duplicate key collision. */
    const DUPLICATE_KEY_CODE = 11000;

    private $database;

    /**
     * @param string $databaseName Database on the local server to work against.
     */
    public function __construct($databaseName) {
        $client = new MongoDB\Client("mongodb://localhost:27017");
        $this->database = $client->selectDatabase($databaseName);
    }

    /**
     * Inserts a single document and reports the driver's result.
     *
     * @param string $collectionName Target collection.
     * @param array|object $document Document to insert.
     * @return array Result with 'success' and either insert details or error details.
     */
    public function insertSingleDocument($collectionName, $document) {
        $collection = $this->database->selectCollection($collectionName);
        try {
            $result = $collection->insertOne($document);
            return [
                'success' => true,
                'inserted_id' => $result->getInsertedId(),
                'inserted_count' => $result->getInsertedCount(),
                'acknowledged' => $result->isAcknowledged()
            ];
        } catch (MongoDB\Driver\Exception\BulkWriteException $e) {
            $writeErrors = $e->getWriteResult()->getWriteErrors();
            return [
                'success' => false,
                // Only claim a duplicate key when the server actually reported one.
                'error' => $this->containsDuplicateKeyError($writeErrors)
                    ? 'Duplicate key error'
                    : 'Write error',
                'message' => $e->getMessage(),
                'write_errors' => $writeErrors
            ];
        } catch (Exception $e) {
            return [
                'success' => false,
                'error' => 'Insertion failed',
                'message' => $e->getMessage()
            ];
        }
    }

    /**
     * Inserts a document under a caller-chosen _id.
     *
     * Exceptions other than BulkWriteException are not caught here and reach the caller.
     *
     * @param string $collectionName Target collection.
     * @param mixed $customId Value stored as the document's _id.
     * @param array $document Document fields; any existing '_id' entry is overwritten.
     * @return array Result with 'success', 'custom_id' and insert or error details.
     */
    public function insertWithCustomId($collectionName, $customId, $document) {
        $collection = $this->database->selectCollection($collectionName);
        $document['_id'] = $customId;
        try {
            $result = $collection->insertOne($document);
            return [
                'success' => true,
                'custom_id' => $customId,
                'inserted_id' => $result->getInsertedId(),
                'message' => 'Document inserted with custom ID'
            ];
        } catch (MongoDB\Driver\Exception\BulkWriteException $e) {
            if ($this->containsDuplicateKeyError($e->getWriteResult()->getWriteErrors())) {
                return [
                    'success' => false,
                    'error' => 'Duplicate key error',
                    'message' => 'Document with this ID already exists',
                    'custom_id' => $customId
                ];
            }
            // Some other write failure: do not blame the ID for it.
            return [
                'success' => false,
                'error' => 'Write error',
                'message' => $e->getMessage(),
                'custom_id' => $customId
            ];
        }
    }

    /**
     * Validates the document against the given rules and inserts it when valid.
     *
     * @param string $collectionName Target collection.
     * @param array $document Document to validate and insert.
     * @param array $validationRules Map of field name => rule array
     *                               (required, type, min_length, max_length, min, max).
     * @return array Result with 'success' and either 'validation_errors' or insert details.
     */
    public function insertWithValidation($collectionName, $document, $validationRules) {
        $collection = $this->database->selectCollection($collectionName);
        $validationResult = $this->validateDocument($document, $validationRules);
        if (!$validationResult['valid']) {
            return [
                'success' => false,
                'error' => 'Validation failed',
                'validation_errors' => $validationResult['errors']
            ];
        }
        try {
            $result = $collection->insertOne($document);
            return [
                'success' => true,
                'inserted_id' => $result->getInsertedId(),
                'validation_passed' => true
            ];
        } catch (Exception $e) {
            return [
                'success' => false,
                'error' => 'Insertion failed',
                'message' => $e->getMessage()
            ];
        }
    }

    /**
     * Checks every rule and collects human-readable error messages.
     *
     * 'type' is compared against gettype(), so rules must use its spelling
     * ('integer', 'double', 'boolean', 'string', 'array', ...).
     *
     * @param array $document Document under test.
     * @param array $rules Map of field name => rule array.
     * @return array ['valid' => bool, 'errors' => string[]]
     */
    private function validateDocument($document, $rules) {
        $errors = [];
        foreach ($rules as $field => $rule) {
            if (!isset($document[$field])) {
                if ($rule['required'] ?? false) {
                    $errors[] = "Field '{$field}' is required";
                }
                continue;
            }
            $value = $document[$field];

            // Type check
            if (isset($rule['type'])) {
                $expectedType = $rule['type'];
                $actualType = gettype($value);
                if ($actualType !== $expectedType) {
                    $errors[] = "Field '{$field}' should be {$expectedType}, got {$actualType}";
                }
            }

            // Length checks. strlen() throws a TypeError for arrays and plain objects
            // on PHP 8, so only measure values that have a string form.
            if (isset($rule['min_length']) || isset($rule['max_length'])) {
                $hasStringForm = is_scalar($value)
                    || (is_object($value) && method_exists($value, '__toString'));
                if ($hasStringForm) {
                    $length = strlen((string) $value);
                    if (isset($rule['min_length']) && $length < $rule['min_length']) {
                        $errors[] = "Field '{$field}' should be at least {$rule['min_length']} characters";
                    }
                    if (isset($rule['max_length']) && $length > $rule['max_length']) {
                        $errors[] = "Field '{$field}' should be at most {$rule['max_length']} characters";
                    }
                } else {
                    $errors[] = "Field '{$field}' must be a string to validate its length";
                }
            }

            // Range checks
            if (isset($rule['min']) && $value < $rule['min']) {
                $errors[] = "Field '{$field}' should be at least {$rule['min']}";
            }
            if (isset($rule['max']) && $value > $rule['max']) {
                $errors[] = "Field '{$field}' should be at most {$rule['max']}";
            }
        }
        return [
            'valid' => empty($errors),
            'errors' => $errors
        ];
    }

    /**
     * Stamps the document with created_at/updated_at/version (and optional
     * metadata) before inserting it.
     *
     * @param string $collectionName Target collection.
     * @param array $document Document fields; the stamped keys are overwritten.
     * @param array $metadata Stored under 'metadata' when not empty.
     * @return array Result with 'success' and insert or error details.
     */
    public function insertWithMetadata($collectionName, $document, $metadata = []) {
        $collection = $this->database->selectCollection($collectionName);
        $document['created_at'] = new MongoDB\BSON\UTCDateTime();
        $document['updated_at'] = new MongoDB\BSON\UTCDateTime();
        $document['version'] = 1;
        if (!empty($metadata)) {
            $document['metadata'] = $metadata;
        }
        try {
            $result = $collection->insertOne($document);
            return [
                'success' => true,
                'inserted_id' => $result->getInsertedId(),
                'metadata_added' => !empty($metadata)
            ];
        } catch (Exception $e) {
            return [
                'success' => false,
                'error' => 'Insertion failed',
                'message' => $e->getMessage()
            ];
        }
    }

    /**
     * Tells whether any of the given write errors is a duplicate key error.
     *
     * @param iterable $writeErrors Write errors taken from a BulkWriteException.
     * @return bool
     */
    private function containsDuplicateKeyError($writeErrors) {
        foreach ($writeErrors as $writeError) {
            if ($writeError->getCode() === self::DUPLICATE_KEY_CODE) {
                return true;
            }
        }
        return false;
    }
}
// Usage example
$inserter = new SingleDocumentInsertion('testdb');

// Plain single-document insert
$john = [
    'name' => 'John Doe',
    'email' => 'john@example.com',
    'age' => 30,
    'active' => true
];
print_r($inserter->insertSingleDocument('users', $john));

// Insert under a caller-chosen _id
print_r($inserter->insertWithCustomId('users', 'user_001', [
    'name' => 'Jane Smith',
    'email' => 'jane@example.com',
    'age' => 25
]));

// Insert guarded by validation rules
$rules = [
    'name' => ['required' => true, 'type' => 'string', 'min_length' => 2, 'max_length' => 100],
    'email' => ['required' => true, 'type' => 'string'],
    'age' => ['required' => true, 'type' => 'integer', 'min' => 0, 'max' => 150]
];
$bob = [
    'name' => 'Bob Johnson',
    'email' => 'bob@example.com',
    'age' => 35
];
print_r($inserter->insertWithValidation('users', $bob, $rules));

// Insert with request metadata attached (reuses the fields of the first document)
$requestInfo = [
    'source' => 'web_application',
    'ip_address' => '192.168.1.1',
    'user_agent' => 'Mozilla/5.0'
];
print_r($inserter->insertWithMetadata('users', $john, $requestInfo));
?>
批量插入原理
批量插入使用insertMany()方法,可以高效插入多个文档:
php
<?php
// 批量插入详解
/**
 * Tutorial helper that demonstrates multi-document inserts with insertMany().
 *
 * The methods return associative arrays describing what was inserted; driver
 * failures are reported in the array rather than thrown, except where noted.
 */
class BatchDocumentInsertion {
    private $database;

    /**
     * @param string $databaseName Database on the local server to work against.
     */
    public function __construct($databaseName) {
        $client = new MongoDB\Client("mongodb://localhost:27017");
        $this->database = $client->selectDatabase($databaseName);
    }

    /**
     * Inserts all documents with a single insertMany() call.
     *
     * @param string $collectionName Target collection.
     * @param array $documents Documents to insert.
     * @param array $options Options forwarded to insertMany().
     * @return array Result with 'success', counts and IDs, or error details.
     */
    public function insertBatchDocuments($collectionName, $documents, $options = []) {
        $collection = $this->database->selectCollection($collectionName);
        try {
            $result = $collection->insertMany($documents, $options);
            return [
                'success' => true,
                'inserted_count' => $result->getInsertedCount(),
                'inserted_ids' => $result->getInsertedIds(),
                'acknowledged' => $result->isAcknowledged()
            ];
        } catch (MongoDB\Driver\Exception\BulkWriteException $e) {
            $writeResult = $e->getWriteResult();
            return [
                'success' => false,
                'error' => 'Bulk write error',
                'inserted_count' => $writeResult->getInsertedCount(),
                'write_errors' => $writeResult->getWriteErrors(),
                'message' => $e->getMessage()
            ];
        } catch (Exception $e) {
            return [
                'success' => false,
                'error' => 'Batch insertion failed',
                'message' => $e->getMessage()
            ];
        }
    }

    /**
     * Inserts the documents chunk by chunk and reports progress after each chunk.
     *
     * @param string $collectionName Target collection.
     * @param array $documents Documents to insert.
     * @param int $batchSize Documents per insertMany() call; values below 1 are treated as 1.
     * @param callable|null $callback Invoked as ($batchResult, $batchNumber, $totalBatches).
     * @return array Totals plus one entry per batch under 'batch_results'.
     */
    public function insertBatchWithProgress($collectionName, $documents, $batchSize = 1000, $callback = null) {
        $collection = $this->database->selectCollection($collectionName);
        // array_chunk() throws a ValueError for sizes below 1, so clamp first.
        $batchSize = max(1, (int) $batchSize);
        $totalDocuments = count($documents);
        $batches = array_chunk($documents, $batchSize);
        $totalBatches = count($batches);
        $results = [
            'total_documents' => $totalDocuments,
            'total_batches' => $totalBatches,
            'batch_size' => $batchSize,
            'inserted_count' => 0,
            'failed_batches' => 0,
            'batch_results' => []
        ];
        foreach ($batches as $batchIndex => $batch) {
            try {
                $result = $collection->insertMany($batch);
                $batchResult = [
                    'batch_number' => $batchIndex + 1,
                    'batch_size' => count($batch),
                    'inserted_count' => $result->getInsertedCount(),
                    'status' => 'success'
                ];
                $results['inserted_count'] += $result->getInsertedCount();
            } catch (MongoDB\Driver\Exception\BulkWriteException $e) {
                // Part of the batch may have been written before the error.
                $writeResult = $e->getWriteResult();
                $batchResult = [
                    'batch_number' => $batchIndex + 1,
                    'batch_size' => count($batch),
                    'inserted_count' => $writeResult->getInsertedCount(),
                    'status' => 'partial_failure',
                    'errors' => $writeResult->getWriteErrors()
                ];
                $results['inserted_count'] += $writeResult->getInsertedCount();
                $results['failed_batches']++;
            } catch (Exception $e) {
                $batchResult = [
                    'batch_number' => $batchIndex + 1,
                    'batch_size' => count($batch),
                    'inserted_count' => 0,
                    'status' => 'failed',
                    'error' => $e->getMessage()
                ];
                $results['failed_batches']++;
            }
            $results['batch_results'][] = $batchResult;
            if ($callback && is_callable($callback)) {
                $callback($batchResult, $batchIndex + 1, $totalBatches);
            }
        }
        return $results;
    }

    /**
     * Inserts the documents with an explicit 'ordered' setting.
     *
     * Exceptions other than BulkWriteException are not caught here.
     *
     * @param string $collectionName Target collection.
     * @param array $documents Documents to insert.
     * @param bool $ordered Value passed as the 'ordered' option of insertMany().
     * @return array Result with 'success', 'ordered', counts and a summary message.
     */
    public function insertBatchWithOrderedOption($collectionName, $documents, $ordered = true) {
        $collection = $this->database->selectCollection($collectionName);
        try {
            $result = $collection->insertMany($documents, ['ordered' => $ordered]);
            return [
                'success' => true,
                'ordered' => $ordered,
                'inserted_count' => $result->getInsertedCount(),
                'inserted_ids' => $result->getInsertedIds(),
                'message' => $ordered ?
                    'All documents inserted in order' :
                    'Documents inserted without order guarantee'
            ];
        } catch (MongoDB\Driver\Exception\BulkWriteException $e) {
            $writeResult = $e->getWriteResult();
            return [
                'success' => false,
                'ordered' => $ordered,
                'inserted_count' => $writeResult->getInsertedCount(),
                'write_errors' => $writeResult->getWriteErrors(),
                'message' => $ordered ?
                    'Insertion stopped at first error' :
                    'Some documents failed, others inserted'
            ];
        }
    }

    /**
     * Times the insertion of $documentCount generated documents once per batch size.
     *
     * Each run splits the documents into chunks of the batch size under test and
     * issues one insertMany() per chunk. The generated documents are removed
     * again after every run.
     *
     * @param string $collectionName Collection used for the benchmark.
     * @param int $documentCount Number of documents generated per run.
     * @param int[] $batchSizes Batch sizes to compare; values below 1 are treated as 1.
     * @return array One result entry per batch size.
     */
    public function benchmarkBatchInsertion($collectionName, $documentCount, $batchSizes = [100, 500, 1000, 5000]) {
        $collection = $this->database->selectCollection($collectionName);
        $benchmarkResults = [];
        foreach ($batchSizes as $batchSize) {
            $batchSize = max(1, (int) $batchSize);
            $documents = [];
            for ($i = 0; $i < $documentCount; $i++) {
                $documents[] = [
                    'name' => "User {$i}",
                    'email' => "user{$i}@example.com",
                    'age' => rand(18, 65),
                    // Marker used to find and delete this run's documents afterwards.
                    'batch_size_test' => $batchSize
                ];
            }
            // Chunk outside the timed section so only the inserts are measured.
            $chunks = array_chunk($documents, $batchSize);
            $startTime = microtime(true);
            try {
                foreach ($chunks as $chunk) {
                    $collection->insertMany($chunk);
                }
                $executionTime = microtime(true) - $startTime;
                $benchmarkResults[] = [
                    'batch_size' => $batchSize,
                    'document_count' => $documentCount,
                    'execution_time_seconds' => round($executionTime, 4),
                    // Guard both divisions: 0.0 means "not measurable".
                    'throughput_docs_per_sec' => $executionTime > 0
                        ? round($documentCount / $executionTime, 2)
                        : 0.0,
                    'avg_time_per_doc_ms' => $documentCount > 0
                        ? round(($executionTime / $documentCount) * 1000, 4)
                        : 0.0,
                    'status' => 'success'
                ];
            } catch (Exception $e) {
                $benchmarkResults[] = [
                    'batch_size' => $batchSize,
                    'document_count' => $documentCount,
                    'status' => 'failed',
                    'error' => $e->getMessage()
                ];
            } finally {
                // Remove the test data even when the run failed half-way.
                $collection->deleteMany(['batch_size_test' => $batchSize]);
            }
        }
        return $benchmarkResults;
    }
}
// Usage example
$batcher = new BatchDocumentInsertion('testdb');

// Plain batch insert
$people = [
    ['name' => 'Alice', 'email' => 'alice@example.com', 'age' => 28],
    ['name' => 'Bob', 'email' => 'bob@example.com', 'age' => 32],
    ['name' => 'Charlie', 'email' => 'charlie@example.com', 'age' => 45],
    ['name' => 'Diana', 'email' => 'diana@example.com', 'age' => 29],
    ['name' => 'Eve', 'email' => 'eve@example.com', 'age' => 38]
];
print_r($batcher->insertBatchDocuments('users', $people));

// Batch insert with progress reporting
$generatedUsers = [];
foreach (range(0, 9999) as $i) {
    $generatedUsers[] = [
        'name' => "User {$i}",
        'email' => "user{$i}@example.com",
        'age' => rand(18, 65)
    ];
}
$reportProgress = function ($batchResult, $currentBatch, $totalBatches) {
    echo "Batch {$currentBatch}/{$totalBatches}: ";
    echo "Inserted {$batchResult['inserted_count']} documents, ";
    echo "Status: {$batchResult['status']}\n";
};
print_r($batcher->insertBatchWithProgress('users', $generatedUsers, 1000, $reportProgress));

// Ordered vs. unordered batch insert
// NOTE(review): the third entry only collides if 'users' has a unique index on
// email; nothing in this snippet creates one — confirm.
$withRepeat = [
    ['name' => 'Test User 1', 'email' => 'test1@example.com'],
    ['name' => 'Test User 2', 'email' => 'test2@example.com'],
    ['name' => 'Test User 1', 'email' => 'test1@example.com'], // repeated entry
    ['name' => 'Test User 3', 'email' => 'test3@example.com']
];
foreach ([true, false] as $ordered) {
    print_r($batcher->insertBatchWithOrderedOption('users', $withRepeat, $ordered));
}

// Batch insert benchmark
print_r($batcher->benchmarkBatchInsertion('users', 10000));
?>
条件插入原理
使用updateOne()和$setOnInsert操作符实现条件插入:
php
<?php
// 条件插入详解
/**
 * Tutorial helper that demonstrates "insert only if absent" via upserts
 * (updateOne() with 'upsert' => true and the $setOnInsert operator).
 */
class ConditionalInsertion {
    private $database;

    /**
     * @param string $databaseName Database on the local server to work against.
     */
    public function __construct($databaseName) {
        $client = new MongoDB\Client("mongodb://localhost:27017");
        $this->database = $client->selectDatabase($databaseName);
    }

    /**
     * Inserts $document when nothing matches $filter; otherwise only refreshes updated_at.
     *
     * @param string $collectionName Target collection.
     * @param array $filter Query identifying the document.
     * @param array $document Fields written only when a new document is created.
     * @return array Result with 'success', 'inserted', 'updated', 'upserted_id' and 'message'.
     */
    public function insertIfNotExists($collectionName, $filter, $document) {
        $collection = $this->database->selectCollection($collectionName);
        try {
            $result = $collection->updateOne(
                $filter,
                $this->buildInsertOnlyUpdate($document),
                ['upsert' => true]
            );
            $wasInserted = $result->getUpsertedCount() > 0;
            return [
                'success' => true,
                'inserted' => $wasInserted,
                'updated' => $result->getModifiedCount() > 0,
                'upserted_id' => $result->getUpsertedId(),
                'message' => $wasInserted ?
                    'Document inserted' :
                    'Document already exists, updated timestamp'
            ];
        } catch (Exception $e) {
            return [
                'success' => false,
                'error' => 'Conditional insertion failed',
                'message' => $e->getMessage()
            ];
        }
    }

    /**
     * Upserts a document: $updateDocument is always applied, $insertDocument only on creation.
     *
     * Keys present in both arrays are taken from $updateDocument only, because
     * MongoDB rejects an update that targets the same path from two operators.
     * Only top-level keys are compared; overlapping dotted paths are not detected.
     *
     * @param string $collectionName Target collection.
     * @param array $filter Query identifying the document.
     * @param array $updateDocument Fields for $set.
     * @param array $insertDocument Fields for $setOnInsert.
     * @return array Result with 'success', 'inserted', 'updated', 'upserted_id' and 'message'.
     */
    public function insertOrUpdate($collectionName, $filter, $updateDocument, $insertDocument) {
        $collection = $this->database->selectCollection($collectionName);
        try {
            if (is_array($updateDocument) && is_array($insertDocument)) {
                $insertDocument = array_diff_key($insertDocument, $updateDocument);
            }
            // Leave out operators that would be empty.
            $update = [];
            if (!empty($updateDocument)) {
                $update['$set'] = $updateDocument;
            }
            if (!empty($insertDocument)) {
                $update['$setOnInsert'] = $insertDocument;
            }
            $result = $collection->updateOne($filter, $update, ['upsert' => true]);
            $wasInserted = $result->getUpsertedCount() > 0;
            $wasUpdated = $result->getModifiedCount() > 0;
            return [
                'success' => true,
                'inserted' => $wasInserted,
                'updated' => $wasUpdated,
                'upserted_id' => $result->getUpsertedId(),
                'message' => $wasInserted ?
                    'Document inserted' :
                    ($wasUpdated ? 'Document updated' : 'No change needed')
            ];
        } catch (Exception $e) {
            return [
                'success' => false,
                'error' => 'Insert or update failed',
                'message' => $e->getMessage()
            ];
        }
    }

    /**
     * Runs many conditional inserts in one bulkWrite() call.
     *
     * @param string $collectionName Target collection.
     * @param array $operations List of ['filter' => array, 'document' => array].
     * @return array Result with 'success' and the bulk counters, or error details.
     */
    public function bulkConditionalInsert($collectionName, $operations) {
        $collection = $this->database->selectCollection($collectionName);
        $bulkOperations = [];
        foreach ($operations as $operation) {
            // Collection::bulkWrite() takes plain arrays keyed by operation name,
            // not MongoDB\Driver\BulkWrite instances.
            $bulkOperations[] = [
                'updateOne' => [
                    $operation['filter'],
                    $this->buildInsertOnlyUpdate($operation['document']),
                    ['upsert' => true]
                ]
            ];
        }
        try {
            $result = $collection->bulkWrite($bulkOperations);
            return [
                'success' => true,
                'inserted_count' => $result->getInsertedCount(),
                'upserted_count' => $result->getUpsertedCount(),
                'modified_count' => $result->getModifiedCount(),
                'upserted_ids' => $result->getUpsertedIds()
            ];
        } catch (Exception $e) {
            return [
                'success' => false,
                'error' => 'Bulk conditional insertion failed',
                'message' => $e->getMessage()
            ];
        }
    }

    /**
     * Conditional insert that retries when the upsert fails with a duplicate key error.
     *
     * @param string $collectionName Target collection.
     * @param array $filter Query identifying the document.
     * @param array $document Fields written only when a new document is created.
     * @param int $maxRetries Maximum number of attempts.
     * @return array Success result with 'attempts', or a failure summary with 'last_error'.
     */
    public function insertWithRetry($collectionName, $filter, $document, $maxRetries = 3) {
        $collection = $this->database->selectCollection($collectionName);
        $attempt = 0;
        $lastError = null;
        while ($attempt < $maxRetries) {
            $attempt++;
            try {
                $result = $collection->updateOne(
                    $filter,
                    $this->buildInsertOnlyUpdate($document),
                    ['upsert' => true]
                );
                return [
                    'success' => true,
                    'inserted' => $result->getUpsertedCount() > 0,
                    'attempts' => $attempt,
                    'upserted_id' => $result->getUpsertedId()
                ];
            } catch (MongoDB\Driver\Exception\BulkWriteException $e) {
                $lastError = $e;
                $isDuplicateKey = false;
                foreach ($e->getWriteResult()->getWriteErrors() as $error) {
                    if ($error->getCode() === 11000) {
                        $isDuplicateKey = true;
                        break;
                    }
                }
                if (!$isDuplicateKey) {
                    break; // not a duplicate key error: retrying will not help
                }
                // Back off 100-500ms, but only when another attempt follows.
                if ($attempt < $maxRetries) {
                    usleep(rand(100000, 500000));
                }
            } catch (Exception $e) {
                $lastError = $e;
                break;
            }
        }
        return [
            'success' => false,
            'error' => 'Insertion failed after retries',
            'attempts' => $attempt,
            'max_retries' => $maxRetries,
            'last_error' => $lastError ? $lastError->getMessage() : 'Unknown error'
        ];
    }

    /**
     * Builds the update document shared by the insert-only upserts.
     *
     * updated_at is always written through $set, so an 'updated_at' entry in
     * $document is dropped to avoid targeting the same path twice. An empty
     * $setOnInsert is left out.
     *
     * @param array|object $document Fields for $setOnInsert.
     * @return array Update document for updateOne().
     */
    private function buildInsertOnlyUpdate($document) {
        if (is_array($document)) {
            unset($document['updated_at']);
        }
        $update = [];
        if (!empty($document)) {
            $update['$setOnInsert'] = $document;
        }
        $update['$set'] = [
            'updated_at' => new MongoDB\BSON\UTCDateTime()
        ];
        return $update;
    }
}
// Usage example
$conditionalInsertion = new ConditionalInsertion('testdb');

// Conditional insert: create the document only when it does not exist yet
$filter = ['email' => 'unique@example.com'];
$document = [
    'name' => 'Unique User',
    'email' => 'unique@example.com',
    'age' => 30,
    'created_at' => new MongoDB\BSON\UTCDateTime()
];
$conditionalResult = $conditionalInsertion->insertIfNotExists('users', $filter, $document);
print_r($conditionalResult);

// Try to insert the same document a second time
$conditionalResult2 = $conditionalInsertion->insertIfNotExists('users', $filter, $document);
print_r($conditionalResult2);

// Insert or update
$updateDocument = [
    'last_login' => new MongoDB\BSON\UTCDateTime(),
    'login_count' => 1
];
// Fixed: this array was closed with "};", which is a parse error.
$insertDocument = [
    'name' => 'New User',
    'email' => 'newuser@example.com',
    'created_at' => new MongoDB\BSON\UTCDateTime()
];
$upsertResult = $conditionalInsertion->insertOrUpdate(
    'users',
    ['email' => 'newuser@example.com'],
    $updateDocument,
    $insertDocument
);
print_r($upsertResult);

// Bulk conditional insert
$bulkOperations = [
    [
        'filter' => ['email' => 'bulk1@example.com'],
        'document' => ['name' => 'Bulk User 1', 'email' => 'bulk1@example.com']
    ],
    [
        'filter' => ['email' => 'bulk2@example.com'],
        'document' => ['name' => 'Bulk User 2', 'email' => 'bulk2@example.com']
    ],
    [
        'filter' => ['email' => 'bulk3@example.com'],
        'document' => ['name' => 'Bulk User 3', 'email' => 'bulk3@example.com']
    ]
];
$bulkConditionalResult = $conditionalInsertion->bulkConditionalInsert('users', $bulkOperations);
print_r($bulkConditionalResult);

// Insert with retries
$retryResult = $conditionalInsertion->insertWithRetry(
    'users',
    ['email' => 'retry@example.com'],
    ['name' => 'Retry User', 'email' => 'retry@example.com'],
    3
);
print_r($retryResult);
?>
常见错误与踩坑点
错误1:重复键错误
问题描述:插入具有重复_id或唯一索引字段的文档。
php
<?php
// Wrong on purpose: provoke a duplicate key error
try {
    $users = (new MongoDB\Client("mongodb://localhost:27017"))
        ->selectDatabase("testdb")
        ->selectCollection('users');
    // Make e-mail addresses unique
    $users->createIndex(['email' => 1], ['unique' => true]);
    // First document goes in
    $users->insertOne([
        'name' => 'John Doe',
        'email' => 'john@example.com'
    ]);
    // Mistake: a second document reusing the same e-mail
    $users->insertOne([
        'name' => 'Jane Doe',
        'email' => 'john@example.com'
    ]);
} catch (MongoDB\Driver\Exception\BulkWriteException $failure) {
    echo "错误: " . $failure->getMessage() . "\n";
    echo "错误代码: " . $failure->getCode() . "\n";
}

// Right way: inspect the write errors and react to the duplicate key
$users = (new MongoDB\Client("mongodb://localhost:27017"))
    ->selectDatabase("testdb")
    ->selectCollection('users');
try {
    $result = $users->insertOne([
        'name' => 'Jane Doe',
        'email' => 'jane@example.com'
    ]);
    echo "Document inserted successfully\n";
} catch (MongoDB\Driver\Exception\BulkWriteException $failure) {
    foreach ($failure->getWriteResult()->getWriteErrors() as $writeError) {
        if ($writeError->getCode() !== 11000) {
            echo "Other error: " . $writeError->getMessage() . "\n";
            continue;
        }
        echo "Duplicate key error: Email already exists\n";
        // Fall back to updating the document that is already there
        $users->updateOne(
            ['email' => 'jane@example.com'],
            ['$set' => ['name' => 'Jane Doe Updated']]
        );
        echo "Existing document updated\n";
    }
}
?>
错误2:文档大小超限
问题描述:插入超过16MB大小限制的文档。
php
<?php
// Wrong on purpose: a document above the size limit
try {
    $oversized = (new MongoDB\Client("mongodb://localhost:27017"))
        ->selectDatabase("testdb")
        ->selectCollection('large_documents');
    // Mistake: the payload alone is 20MB
    $oversized->insertOne([
        'name' => 'Large Document',
        'data' => str_repeat('x', 20 * 1024 * 1024) // 20MB
    ]);
} catch (MongoDB\Driver\Exception\BulkWriteException $failure) {
    echo "错误: " . $failure->getMessage() . "\n";
}

// Right way (1): hand large payloads to GridFS
$database = (new MongoDB\Client("mongodb://localhost:27017"))->selectDatabase("testdb");
$bucket = new MongoDB\GridFS\Bucket($database);
$payload = str_repeat('x', 20 * 1024 * 1024); // 20MB
$upload = $bucket->openUploadStream('large_file.txt');
fwrite($upload, $payload);
fclose($upload);
echo "Large file stored in GridFS successfully\n";

// Right way (2): split the payload over several documents
$parts = $database->selectCollection('split_documents');
$pieces = str_split($payload, 10 * 1024 * 1024); // 10MB chunks
$pieceCount = count($pieces);
foreach ($pieces as $position => $piece) {
    $parts->insertOne([
        'chunk_index' => $position,
        'data' => $piece,
        'total_chunks' => $pieceCount
    ]);
}
echo "Large document split into " . $pieceCount . " chunks\n";
?>
错误3:字段名不符合规范
问题描述:使用不符合BSON规范的字段名导致插入失败。
php
<?php
// Wrong on purpose: problematic field names
// NOTE(review): whether names with spaces, '.' or '$' are actually rejected
// depends on the server version — confirm before presenting this as an error.
try {
    $invalidFields = (new MongoDB\Client("mongodb://localhost:27017"))
        ->selectDatabase("testdb")
        ->selectCollection('invalid_fields');
    $invalidFields->insertOne([
        'field with spaces' => 'value1',
        'field.with.dots' => 'value2',
        'field$with$dollar' => 'value3',
        '' => 'empty field name'
    ]);
} catch (Exception $failure) {
    echo "错误: " . $failure->getMessage() . "\n";
}

// Right way: conventional field names
$validFields = (new MongoDB\Client("mongodb://localhost:27017"))
    ->selectDatabase("testdb")
    ->selectCollection('valid_fields');
$result = $validFields->insertOne([
    'field_with_underscores' => 'value1',
    'fieldWithCamelCase' => 'value2',
    'field_with_numbers_123' => 'value3',
    'normal_field' => 'value4'
]);
echo "Document with valid field names inserted successfully\n";
echo "Inserted ID: " . $result->getInsertedId() . "\n";
?>
常见应用场景
场景1:用户注册系统
实现用户注册功能,包含数据验证和条件插入:
php
<?php
// 用户注册系统
/**
 * Example user registration service backed by the 'users' collection.
 *
 * Registration results are returned as associative arrays with a 'success' flag.
 */
class UserRegistrationSystem {
    private $database;

    /**
     * @param string $databaseName Database on the local server to work against.
     */
    public function __construct($databaseName) {
        $client = new MongoDB\Client("mongodb://localhost:27017");
        $this->database = $client->selectDatabase($databaseName);
    }

    /**
     * Validates the input, rejects already registered e-mails and stores the user.
     *
     * @param array $userData Expects 'email', 'password' and 'name'; optional 'role',
     *                        'avatar', 'bio', 'preferences', 'ip_address', 'user_agent'.
     * @return array ['success' => true, 'user_id' => ..., 'message' => ...] or a
     *               failure array with 'error' and details.
     */
    public function registerUser($userData) {
        $collection = $this->database->selectCollection('users');

        $validationResult = $this->validateUserData($userData);
        if (!$validationResult['valid']) {
            return [
                'success' => false,
                'error' => 'Validation failed',
                'validation_errors' => $validationResult['errors']
            ];
        }

        // Early check for a friendly answer; it cannot rule out a concurrent
        // registration, which is handled in the catch block below.
        $existingUser = $collection->findOne(['email' => $userData['email']]);
        if ($existingUser) {
            return [
                'success' => false,
                'error' => 'User already exists',
                'message' => 'Email already registered'
            ];
        }

        $userDocument = [
            'email' => $userData['email'],
            'password_hash' => password_hash($userData['password'], PASSWORD_DEFAULT),
            'name' => $userData['name'],
            'status' => 'active',
            // NOTE(review): the role comes straight from the input; if $userData is
            // request data, a caller can grant itself any role — confirm with callers.
            'role' => $userData['role'] ?? 'user',
            'profile' => [
                'avatar' => $userData['avatar'] ?? null,
                'bio' => $userData['bio'] ?? ''
            ],
            'preferences' => [
                'theme' => $userData['preferences']['theme'] ?? 'light',
                'language' => $userData['preferences']['language'] ?? 'en',
                'notifications' => $userData['preferences']['notifications'] ?? true
            ],
            'metadata' => [
                'created_at' => new MongoDB\BSON\UTCDateTime(),
                'updated_at' => new MongoDB\BSON\UTCDateTime(),
                'last_login' => null,
                'login_count' => 0,
                'registration_ip' => $userData['ip_address'] ?? 'unknown',
                'user_agent' => $userData['user_agent'] ?? 'unknown'
            ]
        ];

        try {
            $result = $collection->insertOne($userDocument);
            return [
                'success' => true,
                'user_id' => $result->getInsertedId(),
                'message' => 'User registered successfully'
            ];
        } catch (MongoDB\Driver\Exception\BulkWriteException $e) {
            // A concurrent registration can slip in between findOne() and insertOne().
            // Presumably 'users' has a unique index on email, in which case this
            // surfaces as duplicate key error 11000 — TODO confirm the index exists.
            foreach ($e->getWriteResult()->getWriteErrors() as $writeError) {
                if ($writeError->getCode() === 11000) {
                    return [
                        'success' => false,
                        'error' => 'User already exists',
                        'message' => 'Email already registered'
                    ];
                }
            }
            return [
                'success' => false,
                'error' => 'Registration failed',
                'message' => $e->getMessage()
            ];
        } catch (Exception $e) {
            return [
                'success' => false,
                'error' => 'Registration failed',
                'message' => $e->getMessage()
            ];
        }
    }

    /**
     * Registers several users one after another via registerUser().
     *
     * @param array $usersData List of user data arrays.
     * @return array Totals plus one 'details' entry (index, email, result) per user.
     */
    public function batchRegisterUsers($usersData) {
        $results = [
            'total' => count($usersData),
            'successful' => 0,
            'failed' => 0,
            'details' => []
        ];
        foreach ($usersData as $index => $userData) {
            $registrationResult = $this->registerUser($userData);
            if ($registrationResult['success']) {
                $results['successful']++;
            } else {
                $results['failed']++;
            }
            $results['details'][] = [
                'index' => $index,
                // The e-mail may be missing: that is one of the validation failures.
                'email' => $userData['email'] ?? null,
                'result' => $registrationResult
            ];
        }
        return $results;
    }

    /**
     * Checks the required registration fields.
     *
     * @param array $userData Raw registration input.
     * @return array ['valid' => bool, 'errors' => string[]]
     */
    private function validateUserData($userData) {
        $errors = [];
        if (empty($userData['email'])) {
            $errors[] = 'Email is required';
        } elseif (!filter_var($userData['email'], FILTER_VALIDATE_EMAIL)) {
            $errors[] = 'Invalid email format';
        }
        if (empty($userData['password'])) {
            $errors[] = 'Password is required';
        } elseif (strlen($userData['password']) < 8) {
            $errors[] = 'Password must be at least 8 characters';
        }
        if (empty($userData['name'])) {
            $errors[] = 'Name is required';
        } elseif (strlen($userData['name']) < 2) {
            $errors[] = 'Name must be at least 2 characters';
        }
        return [
            'valid' => empty($errors),
            'errors' => $errors
        ];
    }
}
// Usage example
$registration = new UserRegistrationSystem('user_db');

// Register a single user
print_r($registration->registerUser([
    'email' => 'newuser@example.com',
    'password' => 'securepassword123',
    'name' => 'New User',
    'role' => 'user',
    'avatar' => 'https://example.com/avatars/newuser.jpg',
    'bio' => 'A new user',
    'preferences' => [
        'theme' => 'dark',
        'language' => 'en',
        'notifications' => true
    ],
    'ip_address' => '192.168.1.100',
    'user_agent' => 'Mozilla/5.0'
]));

// Register several users in one go
$newcomers = [
    ['email' => 'user1@example.com', 'password' => 'password123', 'name' => 'User One'],
    ['email' => 'user2@example.com', 'password' => 'password456', 'name' => 'User Two'],
    ['email' => 'user3@example.com', 'password' => 'password789', 'name' => 'User Three']
];
print_r($registration->batchRegisterUsers($newcomers));
?>
场景2:日志收集系统
实现高性能的日志收集和存储:
php
<?php
// 日志收集系统
/**
 * Example log store that keeps one capped collection per log level
 * ('logs_debug', 'logs_info', ...).
 */
class LogCollectionSystem {
    private $database;

    /**
     * @param string $databaseName Database on the local server to work against.
     */
    public function __construct($databaseName) {
        $client = new MongoDB\Client("mongodb://localhost:27017");
        $this->database = $client->selectDatabase($databaseName);
    }

    /**
     * Creates the capped per-level collections and their indexes (best effort).
     *
     * Failures are swallowed on purpose. Index creation is attempted separately
     * from collection creation so it still runs when the collection already exists.
     */
    public function initializeLogCollections() {
        $logLevels = ['debug', 'info', 'warning', 'error', 'critical'];
        foreach ($logLevels as $level) {
            $collectionName = 'logs_' . $level;
            try {
                $this->database->createCollection($collectionName, [
                    'capped' => true,
                    'size' => 1024 * 1024 * 100, // 100MB
                    'max' => 100000
                ]);
            } catch (Exception $e) {
                // The collection may already exist.
            }
            try {
                $collection = $this->database->selectCollection($collectionName);
                $collection->createIndex(['timestamp' => -1]);
                $collection->createIndex(['level' => 1, 'timestamp' => -1]);
                $collection->createIndex(['service' => 1, 'timestamp' => -1]);
            } catch (Exception $e) {
                // Best effort, same as collection creation above.
            }
        }
    }

    /**
     * Writes one log entry into the collection of its level.
     *
     * @param string $level Log level; selects the 'logs_<level>' collection.
     * @param string $message Log message.
     * @param array $context Extra data; 'service' and 'environment' are also stored as top-level fields.
     * @return array Result with 'success' and either 'log_id' or error details.
     */
    public function log($level, $message, $context = []) {
        $collectionName = 'logs_' . $level;
        $collection = $this->database->selectCollection($collectionName);
        $logEntry = [
            'timestamp' => new MongoDB\BSON\UTCDateTime(),
            'level' => $level,
            'message' => $message,
            'context' => $context,
            'service' => $context['service'] ?? 'unknown',
            'environment' => $context['environment'] ?? 'production',
            'host' => gethostname(),
            'process_id' => getmypid()
        ];
        try {
            $result = $collection->insertOne($logEntry);
            return [
                'success' => true,
                'log_id' => $result->getInsertedId(),
                'level' => $level
            ];
        } catch (Exception $e) {
            return [
                'success' => false,
                'error' => 'Log insertion failed',
                'message' => $e->getMessage()
            ];
        }
    }

    /**
     * Stores many prepared log entries, one insertMany() per log level.
     *
     * Entries without a 'level' cannot be routed and are counted as failed.
     *
     * @param array $logEntries List of entries, each with at least a 'level' key.
     * @return array ['total', 'successful', 'failed', 'details'] ('details' stays empty).
     */
    public function batchLog($logEntries) {
        $results = [
            'total' => count($logEntries),
            'successful' => 0,
            'failed' => 0,
            'details' => []
        ];
        // Group by level so each collection receives a single bulk insert.
        $groupedLogs = [];
        foreach ($logEntries as $logEntry) {
            if (!isset($logEntry['level'])) {
                $results['failed']++;
                continue;
            }
            $groupedLogs[$logEntry['level']][] = $logEntry;
        }
        foreach ($groupedLogs as $level => $entries) {
            $collection = $this->database->selectCollection('logs_' . $level);
            try {
                $result = $collection->insertMany($entries);
                $results['successful'] += $result->getInsertedCount();
            } catch (MongoDB\Driver\Exception\BulkWriteException $e) {
                // Part of the group may have been written before the error.
                $inserted = $e->getWriteResult()->getInsertedCount();
                $results['successful'] += $inserted;
                $results['failed'] += count($entries) - $inserted;
            } catch (Exception $e) {
                $results['failed'] += count($entries);
            }
        }
        return $results;
    }

    /**
     * Returns the newest log entries of one level.
     *
     * @param string $level Log level; selects the 'logs_<level>' collection.
     * @param array $filters Query filter passed to find().
     * @param int $limit Maximum number of entries.
     * @return array ['success' => true, 'logs' => array, 'count' => int] or error details.
     */
    public function queryLogs($level, $filters = [], $limit = 100) {
        $collection = $this->database->selectCollection('logs_' . $level);
        $query = [];
        if (!empty($filters)) {
            $query = $filters;
        }
        try {
            $cursor = $collection->find($query, [
                'sort' => ['timestamp' => -1],
                'limit' => $limit
            ]);
            // Read the cursor once: a second pass over a consumed cursor throws.
            $logs = iterator_to_array($cursor);
            return [
                'success' => true,
                'logs' => $logs,
                'count' => count($logs)
            ];
        } catch (Exception $e) {
            return [
                'success' => false,
                'error' => 'Log query failed',
                'message' => $e->getMessage()
            ];
        }
    }
}
// Usage example
$logs = new LogCollectionSystem('log_db');

// Prepare the per-level collections
$logs->initializeLogCollections();

// Write one entry per level, in this order
$samples = [
    ['info', 'Application started', [
        'service' => 'web_app',
        'environment' => 'production'
    ]],
    ['debug', 'Processing request', [
        'service' => 'api',
        'request_id' => 'req_001',
        'endpoint' => '/api/users'
    ]],
    ['warning', 'High memory usage', [
        'service' => 'worker',
        'memory_usage' => '85%',
        'threshold' => '80%'
    ]],
    ['error', 'Database connection failed', [
        'service' => 'database',
        'error_code' => 'CONNECTION_TIMEOUT',
        'retry_count' => 3
    ]]
];
foreach ($samples as $sample) {
    $logs->log($sample[0], $sample[1], $sample[2]);
}

// Write a batch of entries
$pending = [];
foreach (range(0, 99) as $i) {
    $pending[] = [
        'timestamp' => new MongoDB\BSON\UTCDateTime(),
        'level' => 'info',
        'message' => "Batch log entry {$i}",
        'context' => [
            'service' => 'batch_processor',
            'batch_id' => 'batch_001'
        ]
    ];
}
print_r($logs->batchLog($pending));

// Read back the latest errors
print_r($logs->queryLogs('error', [], 10));
?>
常见问题答疑
问题1:如何选择单文档插入还是批量插入?
回答:根据数据量和性能需求选择:
php
<?php
// 插入策略选择助手
/**
 * Suggests an insert strategy from the size of the job.
 */
class InsertionStrategySelector {
    /**
     * Builds a list of recommendations: one primary strategy chosen by document
     * count, followed by optional notes for large documents and for the
     * 'throughput' / 'reliability' requirements.
     *
     * @param int $documentCount Number of documents to insert.
     * @param int $documentSize Size of one document in bytes.
     * @param array $performanceRequirements Optional 'throughput' and 'reliability' levels.
     * @return array List of recommendation arrays.
     */
    public static function selectStrategy($documentCount, $documentSize, $performanceRequirements = []) {
        // Upper bounds checked in ascending order; anything above the last one
        // falls through to the bulkWrite default.
        $tiers = [
            [100, [
                'strategy' => 'insertMany',
                'reason' => 'Small batch',
                'performance' => 'Good performance for small batches'
            ]],
            [10000, [
                'strategy' => 'insertMany with batching',
                'batch_size' => 1000,
                'reason' => 'Medium batch with optimal batch size',
                'performance' => 'Best performance for medium batches'
            ]]
        ];
        $primary = [
            'strategy' => 'bulkWrite with batching',
            'batch_size' => 5000,
            'reason' => 'Large batch with optimal batch size',
            'performance' => 'Best performance for large batches'
        ];
        if ($documentCount === 1) {
            $primary = [
                'strategy' => 'insertOne',
                'reason' => 'Single document insertion',
                'performance' => 'Optimal for single document'
            ];
        } else {
            foreach ($tiers as $tier) {
                if ($documentCount <= $tier[0]) {
                    $primary = $tier[1];
                    break;
                }
            }
        }
        $advice = [$primary];

        // Documents above 1MB get a sizing note.
        $oneMegabyte = 1024 * 1024;
        if ($documentSize > $oneMegabyte) {
            $advice[] = [
                'warning' => 'Large documents',
                'suggestion' => 'Consider using GridFS for files > 16MB',
                'batch_size_adjustment' => 'Reduce batch size for large documents'
            ];
        }

        // Requirement-driven notes.
        $throughput = $performanceRequirements['throughput'] ?? null;
        if ($throughput === 'high') {
            $advice[] = [
                'optimization' => 'Use unordered inserts for higher throughput',
                'tradeoff' => 'No guarantee of insertion order'
            ];
        }
        $reliability = $performanceRequirements['reliability'] ?? null;
        if ($reliability === 'high') {
            $advice[] = [
                'optimization' => 'Use ordered inserts for error detection',
                'tradeoff' => 'Slower throughput on errors'
            ];
        }
        return $advice;
    }
}
// Usage example: 5000 documents of about 10KB each
$requirements = ['throughput' => 'high', 'reliability' => 'medium'];
print_r(InsertionStrategySelector::selectStrategy(5000, 10240, $requirements));
?>
实战练习
练习1:实现产品数据导入
实现一个产品数据导入系统,支持批量插入和错误处理:
php
<?php
// 产品数据导入系统
/**
 * Example importer that loads product records into the 'products' collection.
 */
class ProductDataImport {
    private $database;

    /**
     * @param string $databaseName Database on the local server to work against.
     */
    public function __construct($databaseName) {
        $client = new MongoDB\Client("mongodb://localhost:27017");
        $this->database = $client->selectDatabase($databaseName);
    }

    /**
     * Imports the products one by one, either as plain inserts or as upserts keyed by 'sku'.
     *
     * Options:
     *  - 'batch_size' (default 1000): size of the groups the input is walked in;
     *    values below 1 are treated as 1. Each product is still written individually.
     *  - 'continue_on_error' (default true): when false, the first
     *    BulkWriteException is rethrown.
     *  - 'update_existing' (default false): upsert by 'sku' instead of inserting.
     *
     * @param array $productsData List of product arrays.
     * @param array $options See above.
     * @return array Counters 'total_products', 'imported', 'updated', 'failed' and an 'errors' list.
     * @throws MongoDB\Driver\Exception\BulkWriteException When 'continue_on_error' is false.
     */
    public function importProducts($productsData, $options = []) {
        $collection = $this->database->selectCollection('products');
        // array_chunk() throws a ValueError for sizes below 1, so clamp first.
        $batchSize = max(1, (int) ($options['batch_size'] ?? 1000));
        $continueOnError = $options['continue_on_error'] ?? true;
        $updateExisting = $options['update_existing'] ?? false;
        $results = [
            'total_products' => count($productsData),
            'imported' => 0,
            'updated' => 0,
            'failed' => 0,
            'errors' => []
        ];
        $batches = array_chunk($productsData, $batchSize);
        foreach ($batches as $batch) {
            foreach ($batch as $productData) {
                // Read once, null-safe, so the error path cannot warn on a missing key.
                $sku = $productData['sku'] ?? null;
                try {
                    if ($updateExisting) {
                        // created_at may only appear in $setOnInsert: naming the same
                        // path in both operators would be a conflict.
                        $setFields = $productData;
                        unset($setFields['created_at']);
                        $createdAt = $productData['created_at'] ?? new MongoDB\BSON\UTCDateTime();
                        $result = $collection->updateOne(
                            ['sku' => $sku],
                            [
                                '$set' => $setFields,
                                '$setOnInsert' => [
                                    'created_at' => $createdAt
                                ]
                            ],
                            ['upsert' => true]
                        );
                        if ($result->getUpsertedCount() > 0) {
                            $results['imported']++;
                        } else {
                            $results['updated']++;
                        }
                    } else {
                        // Insert-only mode
                        $collection->insertOne($productData);
                        $results['imported']++;
                    }
                } catch (MongoDB\Driver\Exception\BulkWriteException $e) {
                    $results['failed']++;
                    if (!$continueOnError) {
                        throw $e;
                    }
                    $results['errors'][] = [
                        'sku' => $sku,
                        'error' => $e->getMessage()
                    ];
                } catch (Exception $e) {
                    $results['failed']++;
                    $results['errors'][] = [
                        'sku' => $sku,
                        'error' => $e->getMessage()
                    ];
                }
            }
        }
        return $results;
    }

    /**
     * Checks that a product has a SKU, a name and a positive price.
     *
     * importProducts() does not call this; callers validate beforehand if they want to.
     *
     * @param array $productData Product record.
     * @return array ['valid' => bool, 'errors' => string[]]
     */
    public function validateProductData($productData) {
        $errors = [];
        if (empty($productData['sku'])) {
            $errors[] = 'SKU is required';
        }
        if (empty($productData['name'])) {
            $errors[] = 'Product name is required';
        }
        if (!isset($productData['price']) || $productData['price'] <= 0) {
            $errors[] = 'Valid price is required';
        }
        return [
            'valid' => empty($errors),
            'errors' => $errors
        ];
    }
}
// Usage example
$importer = new ProductDataImport('ecommerce_db');

// Generate sample product records
$categories = ['Electronics', 'Clothing', 'Books'];
$catalog = [];
foreach (range(0, 4999) as $i) {
    $catalog[] = [
        'sku' => 'PRD' . str_pad($i, 5, '0', STR_PAD_LEFT),
        'name' => "Product {$i}",
        'description' => "Description for product {$i}",
        'price' => rand(10, 1000),
        'category' => $categories[rand(0, 2)],
        'stock' => rand(0, 100),
        'created_at' => new MongoDB\BSON\UTCDateTime()
    ];
}

// Run the import
$importOptions = [
    'batch_size' => 1000,
    'continue_on_error' => true,
    'update_existing' => false
];
print_r($importer->importProducts($catalog, $importOptions));
?>
知识点总结
核心概念
- 插入方法:insertOne()、insertMany()、bulkWrite()
- 文档ID:自动生成ObjectId或自定义ID
- 插入性能:受文档大小、索引数量、批量大小影响
- 错误处理:重复键错误、文档大小限制、字段名规范
最佳实践
- 批量插入:使用insertMany()提高性能
- 错误处理:妥善处理重复键和其他错误
- 数据验证:插入前验证数据完整性
- 性能优化:选择合适的批量大小和插入选项
常见场景
- 用户注册:条件插入防止重复
- 日志收集:批量插入高性能日志
- 数据导入:批量导入大量数据
- 实时插入:单文档插入实时数据
拓展参考资料
官方文档
- MongoDB插入文档:https://docs.mongodb.com/manual/reference/method/db.collection.insertOne/
- 批量插入:https://docs.mongodb.com/manual/reference/method/db.collection.insertMany/
- 批量写入:https://docs.mongodb.com/manual/reference/method/db.collection.bulkWrite/
推荐阅读
- 《MongoDB性能优化》
- 《大规模数据导入实践》
- 《MongoDB错误处理指南》
在线资源
- MongoDB University插入操作课程
- MongoDB官方博客性能优化文章
- Stack Overflow MongoDB插入相关问题
