Skip to content

4.1 插入文档 (Create)

概述

插入文档是MongoDB中最基本的操作之一,用于向集合中添加新数据。MongoDB提供了多种插入文档的方法,包括单文档插入、批量插入、条件插入等。本章节将详细介绍MongoDB文档插入的各种方式、最佳实践以及性能优化技巧。

MongoDB的插入操作具有原子性、灵活性和高性能的特点。理解不同插入方法的适用场景和性能特性,对于构建高效的数据存储系统至关重要。

基本概念

插入操作类型

MongoDB支持以下插入操作类型:

  1. 单文档插入:使用insertOne()方法插入单个文档
  2. 批量插入:使用insertMany()方法插入多个文档
  3. 条件插入:使用updateOne()$setOnInsert操作符实现条件插入
  4. 批量写入:使用bulkWrite()方法执行混合的批量操作

文档ID生成

MongoDB文档的_id字段具有以下特点:

  • 唯一性:每个文档必须有唯一的_id
  • 自动生成:如果不提供_id,MongoDB会自动生成ObjectId
  • 自定义ID:可以自定义_id值,但必须确保唯一性
  • 不可变性_id字段创建后不能修改

插入性能因素

影响插入性能的主要因素:

  • 文档大小:较大的文档插入速度较慢
  • 索引数量:索引越多,插入速度越慢
  • 批量大小:适当的批量大小可以提高性能
  • 写入关注:不同的写入关注级别影响性能
  • 网络延迟:网络延迟影响插入响应时间

原理深度解析

单文档插入原理

单文档插入使用insertOne()方法,返回插入的文档ID:

php
<?php
// 单文档插入详解
class SingleDocumentInsertion {
    private $database;
    
    public function __construct($databaseName) {
        $client = new MongoDB\Client("mongodb://localhost:27017");
        $this->database = $client->selectDatabase($databaseName);
    }
    
    public function insertSingleDocument($collectionName, $document) {
        $collection = $this->database->selectCollection($collectionName);
        
        try {
            $result = $collection->insertOne($document);
            
            return [
                'success' => true,
                'inserted_id' => $result->getInsertedId(),
                'inserted_count' => $result->getInsertedCount(),
                'acknowledged' => $result->isAcknowledged()
            ];
            
        } catch (MongoDB\Driver\Exception\BulkWriteException $e) {
            return [
                'success' => false,
                'error' => 'Duplicate key error',
                'message' => $e->getMessage(),
                'write_errors' => $e->getWriteResult()->getWriteErrors()
            ];
        } catch (Exception $e) {
            return [
                'success' => false,
                'error' => 'Insertion failed',
                'message' => $e->getMessage()
            ];
        }
    }
    
    public function insertWithCustomId($collectionName, $customId, $document) {
        $collection = $this->database->selectCollection($collectionName);
        
        // 添加自定义ID
        $document['_id'] = $customId;
        
        try {
            $result = $collection->insertOne($document);
            
            return [
                'success' => true,
                'custom_id' => $customId,
                'inserted_id' => $result->getInsertedId(),
                'message' => 'Document inserted with custom ID'
            ];
            
        } catch (MongoDB\Driver\Exception\BulkWriteException $e) {
            return [
                'success' => false,
                'error' => 'Duplicate key error',
                'message' => 'Document with this ID already exists',
                'custom_id' => $customId
            ];
        }
    }
    
    public function insertWithValidation($collectionName, $document, $validationRules) {
        $collection = $this->database->selectCollection($collectionName);
        
        // 验证文档
        $validationResult = $this->validateDocument($document, $validationRules);
        
        if (!$validationResult['valid']) {
            return [
                'success' => false,
                'error' => 'Validation failed',
                'validation_errors' => $validationResult['errors']
            ];
        }
        
        try {
            $result = $collection->insertOne($document);
            
            return [
                'success' => true,
                'inserted_id' => $result->getInsertedId(),
                'validation_passed' => true
            ];
            
        } catch (Exception $e) {
            return [
                'success' => false,
                'error' => 'Insertion failed',
                'message' => $e->getMessage()
            ];
        }
    }
    
    private function validateDocument($document, $rules) {
        $errors = [];
        
        foreach ($rules as $field => $rule) {
            if (!isset($document[$field])) {
                if ($rule['required'] ?? false) {
                    $errors[] = "Field '{$field}' is required";
                }
                continue;
            }
            
            $value = $document[$field];
            
            // 类型验证
            if (isset($rule['type'])) {
                $expectedType = $rule['type'];
                $actualType = gettype($value);
                
                if ($actualType !== $expectedType) {
                    $errors[] = "Field '{$field}' should be {$expectedType}, got {$actualType}";
                }
            }
            
            // 长度验证
            if (isset($rule['min_length']) && strlen($value) < $rule['min_length']) {
                $errors[] = "Field '{$field}' should be at least {$rule['min_length']} characters";
            }
            
            if (isset($rule['max_length']) && strlen($value) > $rule['max_length']) {
                $errors[] = "Field '{$field}' should be at most {$rule['max_length']} characters";
            }
            
            // 值范围验证
            if (isset($rule['min']) && $value < $rule['min']) {
                $errors[] = "Field '{$field}' should be at least {$rule['min']}";
            }
            
            if (isset($rule['max']) && $value > $rule['max']) {
                $errors[] = "Field '{$field}' should be at most {$rule['max']}";
            }
        }
        
        return [
            'valid' => empty($errors),
            'errors' => $errors
        ];
    }
    
    public function insertWithMetadata($collectionName, $document, $metadata = []) {
        $collection = $this->database->selectCollection($collectionName);
        
        // 添加元数据
        $document['created_at'] = new MongoDB\BSON\UTCDateTime();
        $document['updated_at'] = new MongoDB\BSON\UTCDateTime();
        $document['version'] = 1;
        
        if (!empty($metadata)) {
            $document['metadata'] = $metadata;
        }
        
        try {
            $result = $collection->insertOne($document);
            
            return [
                'success' => true,
                'inserted_id' => $result->getInsertedId(),
                'metadata_added' => !empty($metadata)
            ];
            
        } catch (Exception $e) {
            return [
                'success' => false,
                'error' => 'Insertion failed',
                'message' => $e->getMessage()
            ];
        }
    }
}

// 使用示例
$singleInsertion = new SingleDocumentInsertion('testdb');

// 基本单文档插入
$document = [
    'name' => 'John Doe',
    'email' => 'john@example.com',
    'age' => 30,
    'active' => true
];

$result = $singleInsertion->insertSingleDocument('users', $document);
print_r($result);

// 使用自定义ID插入
$customId = 'user_001';
$customDocument = [
    'name' => 'Jane Smith',
    'email' => 'jane@example.com',
    'age' => 25
];

$customResult = $singleInsertion->insertWithCustomId('users', $customId, $customDocument);
print_r($customResult);

// 带验证的插入
$validationRules = [
    'name' => ['required' => true, 'type' => 'string', 'min_length' => 2, 'max_length' => 100],
    'email' => ['required' => true, 'type' => 'string'],
    'age' => ['required' => true, 'type' => 'integer', 'min' => 0, 'max' => 150]
];

$validatedDocument = [
    'name' => 'Bob Johnson',
    'email' => 'bob@example.com',
    'age' => 35
];

$validationResult = $singleInsertion->insertWithValidation('users', $validatedDocument, $validationRules);
print_r($validationResult);

// 带元数据的插入
$metadata = [
    'source' => 'web_application',
    'ip_address' => '192.168.1.1',
    'user_agent' => 'Mozilla/5.0'
];

$metadataResult = $singleInsertion->insertWithMetadata('users', $document, $metadata);
print_r($metadataResult);
?>

批量插入原理

批量插入使用insertMany()方法,可以高效插入多个文档:

php
<?php
// 批量插入详解
class BatchDocumentInsertion {
    private $database;
    
    public function __construct($databaseName) {
        $client = new MongoDB\Client("mongodb://localhost:27017");
        $this->database = $client->selectDatabase($databaseName);
    }
    
    public function insertBatchDocuments($collectionName, $documents, $options = []) {
        $collection = $this->database->selectCollection($collectionName);
        
        try {
            $result = $collection->insertMany($documents, $options);
            
            return [
                'success' => true,
                'inserted_count' => $result->getInsertedCount(),
                'inserted_ids' => $result->getInsertedIds(),
                'acknowledged' => $result->isAcknowledged()
            ];
            
        } catch (MongoDB\Driver\Exception\BulkWriteException $e) {
            $writeResult = $e->getWriteResult();
            
            return [
                'success' => false,
                'error' => 'Bulk write error',
                'inserted_count' => $writeResult->getInsertedCount(),
                'write_errors' => $writeResult->getWriteErrors(),
                'message' => $e->getMessage()
            ];
        } catch (Exception $e) {
            return [
                'success' => false,
                'error' => 'Batch insertion failed',
                'message' => $e->getMessage()
            ];
        }
    }
    
    public function insertBatchWithProgress($collectionName, $documents, $batchSize = 1000, $callback = null) {
        $collection = $this->database->selectCollection($collectionName);
        
        $totalDocuments = count($documents);
        $batches = array_chunk($documents, $batchSize);
        $totalBatches = count($batches);
        
        $results = [
            'total_documents' => $totalDocuments,
            'total_batches' => $totalBatches,
            'batch_size' => $batchSize,
            'inserted_count' => 0,
            'failed_batches' => 0,
            'batch_results' => []
        ];
        
        foreach ($batches as $batchIndex => $batch) {
            try {
                $result = $collection->insertMany($batch);
                
                $batchResult = [
                    'batch_number' => $batchIndex + 1,
                    'batch_size' => count($batch),
                    'inserted_count' => $result->getInsertedCount(),
                    'status' => 'success'
                ];
                
                $results['inserted_count'] += $result->getInsertedCount();
                
            } catch (MongoDB\Driver\Exception\BulkWriteException $e) {
                $writeResult = $e->getWriteResult();
                
                $batchResult = [
                    'batch_number' => $batchIndex + 1,
                    'batch_size' => count($batch),
                    'inserted_count' => $writeResult->getInsertedCount(),
                    'status' => 'partial_failure',
                    'errors' => $writeResult->getWriteErrors()
                ];
                
                $results['inserted_count'] += $writeResult->getInsertedCount();
                $results['failed_batches']++;
                
            } catch (Exception $e) {
                $batchResult = [
                    'batch_number' => $batchIndex + 1,
                    'batch_size' => count($batch),
                    'inserted_count' => 0,
                    'status' => 'failed',
                    'error' => $e->getMessage()
                ];
                
                $results['failed_batches']++;
            }
            
            $results['batch_results'][] = $batchResult;
            
            // 调用回调函数
            if ($callback && is_callable($callback)) {
                $callback($batchResult, $batchIndex + 1, $totalBatches);
            }
        }
        
        return $results;
    }
    
    public function insertBatchWithOrderedOption($collectionName, $documents, $ordered = true) {
        $collection = $this->database->selectCollection($collectionName);
        
        try {
            $result = $collection->insertMany($documents, ['ordered' => $ordered]);
            
            return [
                'success' => true,
                'ordered' => $ordered,
                'inserted_count' => $result->getInsertedCount(),
                'inserted_ids' => $result->getInsertedIds(),
                'message' => $ordered ? 
                    'All documents inserted in order' : 
                    'Documents inserted without order guarantee'
            ];
            
        } catch (MongoDB\Driver\Exception\BulkWriteException $e) {
            $writeResult = $e->getWriteResult();
            
            return [
                'success' => false,
                'ordered' => $ordered,
                'inserted_count' => $writeResult->getInsertedCount(),
                'write_errors' => $writeResult->getWriteErrors(),
                'message' => $ordered ? 
                    'Insertion stopped at first error' : 
                    'Some documents failed, others inserted'
            ];
        }
    }
    
    public function benchmarkBatchInsertion($collectionName, $documentCount, $batchSizes = [100, 500, 1000, 5000]) {
        $collection = $this->database->selectCollection($collectionName);
        
        $benchmarkResults = [];
        
        foreach ($batchSizes as $batchSize) {
            $documents = [];
            for ($i = 0; $i < $documentCount; $i++) {
                $documents[] = [
                    'name' => "User {$i}",
                    'email' => "user{$i}@example.com",
                    'age' => rand(18, 65),
                    'batch_size_test' => $batchSize
                ];
            }
            
            $startTime = microtime(true);
            
            try {
                $result = $collection->insertMany($documents);
                $endTime = microtime(true);
                
                $executionTime = $endTime - $startTime;
                $throughput = $documentCount / $executionTime;
                
                $benchmarkResults[] = [
                    'batch_size' => $batchSize,
                    'document_count' => $documentCount,
                    'execution_time_seconds' => round($executionTime, 4),
                    'throughput_docs_per_sec' => round($throughput, 2),
                    'avg_time_per_doc_ms' => round(($executionTime / $documentCount) * 1000, 4),
                    'status' => 'success'
                ];
                
            } catch (Exception $e) {
                $benchmarkResults[] = [
                    'batch_size' => $batchSize,
                    'document_count' => $documentCount,
                    'status' => 'failed',
                    'error' => $e->getMessage()
                ];
            }
            
            // 清理测试数据
            $collection->deleteMany(['batch_size_test' => $batchSize]);
        }
        
        return $benchmarkResults;
    }
}

// 使用示例
$batchInsertion = new BatchDocumentInsertion('testdb');

// 基本批量插入
$documents = [
    ['name' => 'Alice', 'email' => 'alice@example.com', 'age' => 28],
    ['name' => 'Bob', 'email' => 'bob@example.com', 'age' => 32],
    ['name' => 'Charlie', 'email' => 'charlie@example.com', 'age' => 45],
    ['name' => 'Diana', 'email' => 'diana@example.com', 'age' => 29],
    ['name' => 'Eve', 'email' => 'eve@example.com', 'age' => 38]
];

$batchResult = $batchInsertion->insertBatchDocuments('users', $documents);
print_r($batchResult);

// 带进度的批量插入
$largeDocuments = [];
for ($i = 0; $i < 10000; $i++) {
    $largeDocuments[] = [
        'name' => "User {$i}",
        'email' => "user{$i}@example.com",
        'age' => rand(18, 65)
    ];
}

$progressCallback = function($batchResult, $currentBatch, $totalBatches) {
    echo "Batch {$currentBatch}/{$totalBatches}: ";
    echo "Inserted {$batchResult['inserted_count']} documents, ";
    echo "Status: {$batchResult['status']}\n";
};

$progressResult = $batchInsertion->insertBatchWithProgress(
    'users', 
    $largeDocuments, 
    1000, 
    $progressCallback
);
print_r($progressResult);

// 有序vs无序批量插入
$duplicateDocuments = [
    ['name' => 'Test User 1', 'email' => 'test1@example.com'],
    ['name' => 'Test User 2', 'email' => 'test2@example.com'],
    ['name' => 'Test User 1', 'email' => 'test1@example.com'], // 重复
    ['name' => 'Test User 3', 'email' => 'test3@example.com']
];

$orderedResult = $batchInsertion->insertBatchWithOrderedOption('users', $duplicateDocuments, true);
print_r($orderedResult);

$unorderedResult = $batchInsertion->insertBatchWithOrderedOption('users', $duplicateDocuments, false);
print_r($unorderedResult);

// 批量插入性能基准测试
$benchmarkResults = $batchInsertion->benchmarkBatchInsertion('users', 10000);
print_r($benchmarkResults);
?>

条件插入原理

使用updateOne()$setOnInsert操作符实现条件插入:

php
<?php
// 条件插入详解
class ConditionalInsertion {
    private $database;
    
    public function __construct($databaseName) {
        $client = new MongoDB\Client("mongodb://localhost:27017");
        $this->database = $client->selectDatabase($databaseName);
    }
    
    public function insertIfNotExists($collectionName, $filter, $document) {
        $collection = $this->database->selectCollection($collectionName);
        
        try {
            $result = $collection->updateOne(
                $filter,
                [
                    '$setOnInsert' => $document,
                    '$set' => [
                        'updated_at' => new MongoDB\BSON\UTCDateTime()
                    ]
                ],
                [
                    'upsert' => true
                ]
            );
            
            $wasInserted = $result->getUpsertedCount() > 0;
            
            return [
                'success' => true,
                'inserted' => $wasInserted,
                'updated' => $result->getModifiedCount() > 0,
                'upserted_id' => $result->getUpsertedId(),
                'message' => $wasInserted ? 
                    'Document inserted' : 
                    'Document already exists, updated timestamp'
            ];
            
        } catch (Exception $e) {
            return [
                'success' => false,
                'error' => 'Conditional insertion failed',
                'message' => $e->getMessage()
            ];
        }
    }
    
    public function insertOrUpdate($collectionName, $filter, $updateDocument, $insertDocument) {
        $collection = $this->database->selectCollection($collectionName);
        
        try {
            $result = $collection->updateOne(
                $filter,
                [
                    '$set' => $updateDocument,
                    '$setOnInsert' => $insertDocument
                ],
                [
                    'upsert' => true
                ]
            );
            
            $wasInserted = $result->getUpsertedCount() > 0;
            $wasUpdated = $result->getModifiedCount() > 0;
            
            return [
                'success' => true,
                'inserted' => $wasInserted,
                'updated' => $wasUpdated,
                'upserted_id' => $result->getUpsertedId(),
                'message' => $wasInserted ? 
                    'Document inserted' : 
                    ($wasUpdated ? 'Document updated' : 'No change needed')
            ];
            
        } catch (Exception $e) {
            return [
                'success' => false,
                'error' => 'Insert or update failed',
                'message' => $e->getMessage()
            ];
        }
    }
    
    public function bulkConditionalInsert($collectionName, $operations) {
        $collection = $this->database->selectCollection($collectionName);
        
        $bulkOperations = [];
        foreach ($operations as $operation) {
            $bulkOperations[] = new MongoDB\Driver\BulkWrite(
                [
                    'updateOne' => [
                        $operation['filter'],
                        [
                            '$setOnInsert' => $operation['document'],
                            '$set' => [
                                'updated_at' => new MongoDB\BSON\UTCDateTime()
                            ]
                        ],
                        ['upsert' => true]
                    ]
                ]
            );
        }
        
        try {
            $result = $collection->bulkWrite($bulkOperations);
            
            return [
                'success' => true,
                'inserted_count' => $result->getInsertedCount(),
                'upserted_count' => $result->getUpsertedCount(),
                'modified_count' => $result->getModifiedCount(),
                'upserted_ids' => $result->getUpsertedIds()
            ];
            
        } catch (Exception $e) {
            return [
                'success' => false,
                'error' => 'Bulk conditional insertion failed',
                'message' => $e->getMessage()
            ];
        }
    }
    
    public function insertWithRetry($collectionName, $filter, $document, $maxRetries = 3) {
        $collection = $this->database->selectCollection($collectionName);
        
        $attempt = 0;
        $lastError = null;
        
        while ($attempt < $maxRetries) {
            $attempt++;
            
            try {
                $result = $collection->updateOne(
                    $filter,
                    [
                        '$setOnInsert' => $document,
                        '$set' => [
                            'updated_at' => new MongoDB\BSON\UTCDateTime()
                        ]
                    ],
                    [
                        'upsert' => true
                    ]
                );
                
                $wasInserted = $result->getUpsertedCount() > 0;
                
                return [
                    'success' => true,
                    'inserted' => $wasInserted,
                    'attempts' => $attempt,
                    'upserted_id' => $result->getUpsertedId()
                ];
                
            } catch (MongoDB\Driver\Exception\BulkWriteException $e) {
                $lastError = $e;
                
                // 检查是否是重复键错误
                $writeErrors = $e->getWriteResult()->getWriteErrors();
                $isDuplicateKey = false;
                
                foreach ($writeErrors as $error) {
                    if ($error->getCode() === 11000) {
                        $isDuplicateKey = true;
                        break;
                    }
                }
                
                if (!$isDuplicateKey) {
                    break; // 非重复键错误,不再重试
                }
                
                // 等待一段时间后重试
                usleep(rand(100000, 500000)); // 100-500ms
                
            } catch (Exception $e) {
                $lastError = $e;
                break;
            }
        }
        
        return [
            'success' => false,
            'error' => 'Insertion failed after retries',
            'attempts' => $attempt,
            'max_retries' => $maxRetries,
            'last_error' => $lastError ? $lastError->getMessage() : 'Unknown error'
        ];
    }
}

// 使用示例
$conditionalInsertion = new ConditionalInsertion('testdb');

// 条件插入 - 如果不存在则插入
$filter = ['email' => 'unique@example.com'];
$document = [
    'name' => 'Unique User',
    'email' => 'unique@example.com',
    'age' => 30,
    'created_at' => new MongoDB\BSON\UTCDateTime()
];

$conditionalResult = $conditionalInsertion->insertIfNotExists('users', $filter, $document);
print_r($conditionalResult);

// 再次尝试插入相同的文档
$conditionalResult2 = $conditionalInsertion->insertIfNotExists('users', $filter, $document);
print_r($conditionalResult2);

// 插入或更新
$updateDocument = [
    'last_login' => new MongoDB\BSON\UTCDateTime(),
    'login_count' => 1
];

$insertDocument = [
    'name' => 'New User',
    'email' => 'newuser@example.com',
    'created_at' => new MongoDB\BSON\UTCDateTime()
};

$upsertResult = $conditionalInsertion->insertOrUpdate(
    'users',
    ['email' => 'newuser@example.com'],
    $updateDocument,
    $insertDocument
);
print_r($upsertResult);

// 批量条件插入
$bulkOperations = [
    [
        'filter' => ['email' => 'bulk1@example.com'],
        'document' => ['name' => 'Bulk User 1', 'email' => 'bulk1@example.com']
    ],
    [
        'filter' => ['email' => 'bulk2@example.com'],
        'document' => ['name' => 'Bulk User 2', 'email' => 'bulk2@example.com']
    ],
    [
        'filter' => ['email' => 'bulk3@example.com'],
        'document' => ['name' => 'Bulk User 3', 'email' => 'bulk3@example.com']
    ]
];

$bulkConditionalResult = $conditionalInsertion->bulkConditionalInsert('users', $bulkOperations);
print_r($bulkConditionalResult);

// 带重试的插入
$retryResult = $conditionalInsertion->insertWithRetry(
    'users',
    ['email' => 'retry@example.com'],
    ['name' => 'Retry User', 'email' => 'retry@example.com'],
    3
);
print_r($retryResult);
?>

常见错误与踩坑点

错误1:重复键错误

问题描述:插入具有重复_id或唯一索引字段的文档。

php
<?php
// 错误示例 - 重复键错误
try {
    $client = new MongoDB\Client("mongodb://localhost:27017");
    $database = $client->selectDatabase("testdb");
    $collection = $database->selectCollection('users');
    
    // 创建唯一索引
    $collection->createIndex(['email' => 1], ['unique' => true]);
    
    // 插入第一个文档
    $collection->insertOne([
        'name' => 'John Doe',
        'email' => 'john@example.com'
    ]);
    
    // 错误:插入具有重复email的文档
    $collection->insertOne([
        'name' => 'Jane Doe',
        'email' => 'john@example.com'
    ]);
    
} catch (MongoDB\Driver\Exception\BulkWriteException $e) {
    echo "错误: " . $e->getMessage() . "\n";
    echo "错误代码: " . $e->getCode() . "\n";
}

// 正确示例 - 处理重复键错误
$client = new MongoDB\Client("mongodb://localhost:27017");
$database = $client->selectDatabase("testdb");
$collection = $database->selectCollection('users');

try {
    $result = $collection->insertOne([
        'name' => 'Jane Doe',
        'email' => 'jane@example.com'
    ]);
    
    echo "Document inserted successfully\n";
    
} catch (MongoDB\Driver\Exception\BulkWriteException $e) {
    $writeErrors = $e->getWriteResult()->getWriteErrors();
    
    foreach ($writeErrors as $error) {
        if ($error->getCode() === 11000) {
            echo "Duplicate key error: Email already exists\n";
            
            // 更新现有文档
            $collection->updateOne(
                ['email' => 'jane@example.com'],
                ['$set' => ['name' => 'Jane Doe Updated']]
            );
            
            echo "Existing document updated\n";
        } else {
            echo "Other error: " . $error->getMessage() . "\n";
        }
    }
}
?>

错误2:文档大小超限

问题描述:插入超过16MB大小限制的文档。

php
<?php
// 错误示例 - 文档大小超限
try {
    $client = new MongoDB\Client("mongodb://localhost:27017");
    $database = $client->selectDatabase("testdb");
    $collection = $database->selectCollection('large_documents');
    
    // 错误:创建过大的文档
    $largeDocument = [
        'name' => 'Large Document',
        'data' => str_repeat('x', 20 * 1024 * 1024) // 20MB
    ];
    
    $collection->insertOne($largeDocument);
    
} catch (MongoDB\Driver\Exception\BulkWriteException $e) {
    echo "错误: " . $e->getMessage() . "\n";
}

// 正确示例 - 使用GridFS处理大文件
$client = new MongoDB\Client("mongodb://localhost:27017");
$database = $client->selectDatabase("testdb");

// 使用GridFS存储大文件
$gridFS = new MongoDB\GridFS\Bucket($database);

$largeData = str_repeat('x', 20 * 1024 * 1024); // 20MB
$stream = $gridFS->openUploadStream('large_file.txt');
fwrite($stream, $largeData);
fclose($stream);

echo "Large file stored in GridFS successfully\n";

// 或者分割文档
$collection = $database->selectCollection('split_documents');
$chunkSize = 10 * 1024 * 1024; // 10MB chunks
$chunks = str_split($largeData, $chunkSize);

foreach ($chunks as $index => $chunk) {
    $collection->insertOne([
        'chunk_index' => $index,
        'data' => $chunk,
        'total_chunks' => count($chunks)
    ]);
}

echo "Large document split into " . count($chunks) . " chunks\n";
?>

错误3:字段名不符合规范

问题描述:使用不符合BSON规范的字段名导致插入失败。

php
<?php
// 错误示例 - 字段名不符合规范
try {
    $client = new MongoDB\Client("mongodb://localhost:27017");
    $database = $client->selectDatabase("testdb");
    $collection = $database->selectCollection('invalid_fields');
    
    // 错误:使用不符合规范的字段名
    $invalidDocument = [
        'field with spaces' => 'value1',
        'field.with.dots' => 'value2',
        'field$with$dollar' => 'value3',
        '' => 'empty field name'
    ];
    
    $collection->insertOne($invalidDocument);
    
} catch (Exception $e) {
    echo "错误: " . $e->getMessage() . "\n";
}

// 正确示例 - 使用规范字段名
$client = new MongoDB\Client("mongodb://localhost:27017");
$database = $client->selectDatabase("testdb");
$collection = $database->selectCollection('valid_fields');

$validDocument = [
    'field_with_underscores' => 'value1',
    'fieldWithCamelCase' => 'value2',
    'field_with_numbers_123' => 'value3',
    'normal_field' => 'value4'
];

$result = $collection->insertOne($validDocument);
echo "Document with valid field names inserted successfully\n";
echo "Inserted ID: " . $result->getInsertedId() . "\n";
?>

常见应用场景

场景1:用户注册系统

实现用户注册功能,包含数据验证和条件插入:

php
<?php
// 用户注册系统
class UserRegistrationSystem {
    private $database;
    
    public function __construct($databaseName) {
        $client = new MongoDB\Client("mongodb://localhost:27017");
        $this->database = $client->selectDatabase($databaseName);
    }
    
    public function registerUser($userData) {
        $collection = $this->database->selectCollection('users');
        
        // 验证用户数据
        $validationResult = $this->validateUserData($userData);
        
        if (!$validationResult['valid']) {
            return [
                'success' => false,
                'error' => 'Validation failed',
                'validation_errors' => $validationResult['errors']
            ];
        }
        
        // 检查用户是否已存在
        $existingUser = $collection->findOne(['email' => $userData['email']]);
        
        if ($existingUser) {
            return [
                'success' => false,
                'error' => 'User already exists',
                'message' => 'Email already registered'
            ];
        }
        
        // 准备用户文档
        $userDocument = [
            'email' => $userData['email'],
            'password_hash' => password_hash($userData['password'], PASSWORD_DEFAULT),
            'name' => $userData['name'],
            'status' => 'active',
            'role' => $userData['role'] ?? 'user',
            'profile' => [
                'avatar' => $userData['avatar'] ?? null,
                'bio' => $userData['bio'] ?? ''
            ],
            'preferences' => [
                'theme' => $userData['preferences']['theme'] ?? 'light',
                'language' => $userData['preferences']['language'] ?? 'en',
                'notifications' => $userData['preferences']['notifications'] ?? true
            ],
            'metadata' => [
                'created_at' => new MongoDB\BSON\UTCDateTime(),
                'updated_at' => new MongoDB\BSON\UTCDateTime(),
                'last_login' => null,
                'login_count' => 0,
                'registration_ip' => $userData['ip_address'] ?? 'unknown',
                'user_agent' => $userData['user_agent'] ?? 'unknown'
            ]
        ];
        
        try {
            $result = $collection->insertOne($userDocument);
            
            return [
                'success' => true,
                'user_id' => $result->getInsertedId(),
                'message' => 'User registered successfully'
            ];
            
        } catch (Exception $e) {
            return [
                'success' => false,
                'error' => 'Registration failed',
                'message' => $e->getMessage()
            ];
        }
    }
    
    public function batchRegisterUsers($usersData) {
        $collection = $this->database->selectCollection('users');
        
        $results = [
            'total' => count($usersData),
            'successful' => 0,
            'failed' => 0,
            'details' => []
        ];
        
        foreach ($usersData as $index => $userData) {
            $registrationResult = $this->registerUser($userData);
            
            if ($registrationResult['success']) {
                $results['successful']++;
            } else {
                $results['failed']++;
            }
            
            $results['details'][] = [
                'index' => $index,
                'email' => $userData['email'],
                'result' => $registrationResult
            ];
        }
        
        return $results;
    }
    
    private function validateUserData($userData) {
        $errors = [];
        
        // 必填字段验证
        if (empty($userData['email'])) {
            $errors[] = 'Email is required';
        } elseif (!filter_var($userData['email'], FILTER_VALIDATE_EMAIL)) {
            $errors[] = 'Invalid email format';
        }
        
        if (empty($userData['password'])) {
            $errors[] = 'Password is required';
        } elseif (strlen($userData['password']) < 8) {
            $errors[] = 'Password must be at least 8 characters';
        }
        
        if (empty($userData['name'])) {
            $errors[] = 'Name is required';
        } elseif (strlen($userData['name']) < 2) {
            $errors[] = 'Name must be at least 2 characters';
        }
        
        return [
            'valid' => empty($errors),
            'errors' => $errors
        ];
    }
}

// 使用示例
$userRegistration = new UserRegistrationSystem('user_db');

// 单用户注册
$userData = [
    'email' => 'newuser@example.com',
    'password' => 'securepassword123',
    'name' => 'New User',
    'role' => 'user',
    'avatar' => 'https://example.com/avatars/newuser.jpg',
    'bio' => 'A new user',
    'preferences' => [
        'theme' => 'dark',
        'language' => 'en',
        'notifications' => true
    ],
    'ip_address' => '192.168.1.100',
    'user_agent' => 'Mozilla/5.0'
];

$registrationResult = $userRegistration->registerUser($userData);
print_r($registrationResult);

// 批量用户注册
$usersData = [
    [
        'email' => 'user1@example.com',
        'password' => 'password123',
        'name' => 'User One'
    ],
    [
        'email' => 'user2@example.com',
        'password' => 'password456',
        'name' => 'User Two'
    ],
    [
        'email' => 'user3@example.com',
        'password' => 'password789',
        'name' => 'User Three'
    ]
];

$batchResult = $userRegistration->batchRegisterUsers($usersData);
print_r($batchResult);
?>

场景2:日志收集系统

实现高性能的日志收集和存储:

php
<?php
// 日志收集系统
class LogCollectionSystem {
    private $database;
    
    public function __construct($databaseName) {
        $client = new MongoDB\Client("mongodb://localhost:27017");
        $this->database = $client->selectDatabase($databaseName);
    }
    
    public function initializeLogCollections() {
        $logLevels = ['debug', 'info', 'warning', 'error', 'critical'];
        
        foreach ($logLevels as $level) {
            $collectionName = 'logs_' . $level;
            
            try {
                // 创建固定集合
                $this->database->createCollection($collectionName, [
                    'capped' => true,
                    'size' => 1024 * 1024 * 100, // 100MB
                    'max' => 100000
                ]);
                
                $collection = $this->database->selectCollection($collectionName);
                
                // 创建索引
                $collection->createIndex(['timestamp' => -1]);
                $collection->createIndex(['level' => 1, 'timestamp' => -1]);
                $collection->createIndex(['service' => 1, 'timestamp' => -1]);
                
            } catch (Exception $e) {
                // 集合可能已存在
            }
        }
    }
    
    public function log($level, $message, $context = []) {
        $collectionName = 'logs_' . $level;
        $collection = $this->database->selectCollection($collectionName);
        
        $logEntry = [
            'timestamp' => new MongoDB\BSON\UTCDateTime(),
            'level' => $level,
            'message' => $message,
            'context' => $context,
            'service' => $context['service'] ?? 'unknown',
            'environment' => $context['environment'] ?? 'production',
            'host' => gethostname(),
            'process_id' => getmypid()
        ];
        
        try {
            $result = $collection->insertOne($logEntry);
            
            return [
                'success' => true,
                'log_id' => $result->getInsertedId(),
                'level' => $level
            ];
            
        } catch (Exception $e) {
            return [
                'success' => false,
                'error' => 'Log insertion failed',
                'message' => $e->getMessage()
            ];
        }
    }
    
    public function batchLog($logEntries) {
        $results = [
            'total' => count($logEntries),
            'successful' => 0,
            'failed' => 0,
            'details' => []
        ];
        
        // 按日志级别分组
        $groupedLogs = [];
        foreach ($logEntries as $logEntry) {
            $level = $logEntry['level'];
            if (!isset($groupedLogs[$level])) {
                $groupedLogs[$level] = [];
            }
            $groupedLogs[$level][] = $logEntry;
        }
        
        // 批量插入每个级别的日志
        foreach ($groupedLogs as $level => $entries) {
            $collectionName = 'logs_' . $level;
            $collection = $this->database->selectCollection($collectionName);
            
            try {
                $result = $collection->insertMany($entries);
                $results['successful'] += $result->getInsertedCount();
                
            } catch (Exception $e) {
                $results['failed'] += count($entries);
            }
        }
        
        return $results;
    }
    
    public function queryLogs($level, $filters = [], $limit = 100) {
        $collectionName = 'logs_' . $level;
        $collection = $this->database->selectCollection($collectionName);
        
        $query = [];
        if (!empty($filters)) {
            $query = $filters;
        }
        
        try {
            $cursor = $collection->find($query, [
                'sort' => ['timestamp' => -1],
                'limit' => $limit
            ]);
            
            return [
                'success' => true,
                'logs' => iterator_to_array($cursor),
                'count' => iterator_count($cursor)
            ];
            
        } catch (Exception $e) {
            return [
                'success' => false,
                'error' => 'Log query failed',
                'message' => $e->getMessage()
            ];
        }
    }
}

// 使用示例
$logSystem = new LogCollectionSystem('log_db');

// 初始化日志集合
$logSystem->initializeLogCollections();

// 记录不同级别的日志
$logSystem->log('info', 'Application started', [
    'service' => 'web_app',
    'environment' => 'production'
]);

$logSystem->log('debug', 'Processing request', [
    'service' => 'api',
    'request_id' => 'req_001',
    'endpoint' => '/api/users'
]);

$logSystem->log('warning', 'High memory usage', [
    'service' => 'worker',
    'memory_usage' => '85%',
    'threshold' => '80%'
]);

$logSystem->log('error', 'Database connection failed', [
    'service' => 'database',
    'error_code' => 'CONNECTION_TIMEOUT',
    'retry_count' => 3
]);

// 批量记录日志
$batchLogs = [];
for ($i = 0; $i < 100; $i++) {
    $batchLogs[] = [
        'timestamp' => new MongoDB\BSON\UTCDateTime(),
        'level' => 'info',
        'message' => "Batch log entry {$i}",
        'context' => [
            'service' => 'batch_processor',
            'batch_id' => 'batch_001'
        ]
    ];
}

$batchResult = $logSystem->batchLog($batchLogs);
print_r($batchResult);

// 查询日志
$queryResult = $logSystem->queryLogs('error', [], 10);
print_r($queryResult);
?>

常见问题答疑

问题1:如何选择单文档插入还是批量插入?

回答:根据数据量和性能需求选择:

php
<?php
// 插入策略选择助手
class InsertionStrategySelector {
    public static function selectStrategy($documentCount, $documentSize, $performanceRequirements = []) {
        $recommendations = [];
        
        // 根据文档数量选择
        if ($documentCount === 1) {
            $recommendations[] = [
                'strategy' => 'insertOne',
                'reason' => 'Single document insertion',
                'performance' => 'Optimal for single document'
            ];
        } elseif ($documentCount <= 100) {
            $recommendations[] = [
                'strategy' => 'insertMany',
                'reason' => 'Small batch',
                'performance' => 'Good performance for small batches'
            ];
        } elseif ($documentCount <= 10000) {
            $recommendations[] = [
                'strategy' => 'insertMany with batching',
                'batch_size' => 1000,
                'reason' => 'Medium batch with optimal batch size',
                'performance' => 'Best performance for medium batches'
            ];
        } else {
            $recommendations[] = [
                'strategy' => 'bulkWrite with batching',
                'batch_size' => 5000,
                'reason' => 'Large batch with optimal batch size',
                'performance' => 'Best performance for large batches'
            ];
        }
        
        // 根据文档大小调整
        if ($documentSize > 1024 * 1024) { // > 1MB
            $recommendations[] = [
                'warning' => 'Large documents',
                'suggestion' => 'Consider using GridFS for files > 16MB',
                'batch_size_adjustment' => 'Reduce batch size for large documents'
            ];
        }
        
        // 根据性能要求调整
        if (isset($performanceRequirements['throughput']) && $performanceRequirements['throughput'] === 'high') {
            $recommendations[] = [
                'optimization' => 'Use unordered inserts for higher throughput',
                'tradeoff' => 'No guarantee of insertion order'
            ];
        }
        
        if (isset($performanceRequirements['reliability']) && $performanceRequirements['reliability'] === 'high') {
            $recommendations[] = [
                'optimization' => 'Use ordered inserts for error detection',
                'tradeoff' => 'Slower throughput on errors'
            ];
        }
        
        return $recommendations;
    }
}

// 使用示例
$strategies = InsertionStrategySelector::selectStrategy(
    5000,
    10240, // 10KB
    ['throughput' => 'high', 'reliability' => 'medium']
);

print_r($strategies);
?>

实战练习

练习1:实现产品数据导入

实现一个产品数据导入系统,支持批量插入和错误处理:

php
<?php
// 产品数据导入系统
class ProductDataImport {
    private $database;
    
    public function __construct($databaseName) {
        $client = new MongoDB\Client("mongodb://localhost:27017");
        $this->database = $client->selectDatabase($databaseName);
    }
    
    public function importProducts($productsData, $options = []) {
        $collection = $this->database->selectCollection('products');
        
        $batchSize = $options['batch_size'] ?? 1000;
        $continueOnError = $options['continue_on_error'] ?? true;
        $updateExisting = $options['update_existing'] ?? false;
        
        $results = [
            'total_products' => count($productsData),
            'imported' => 0,
            'updated' => 0,
            'failed' => 0,
            'errors' => []
        ];
        
        $batches = array_chunk($productsData, $batchSize);
        
        foreach ($batches as $batchIndex => $batch) {
            foreach ($batch as $productData) {
                try {
                    if ($updateExisting) {
                        // 更新或插入
                        $result = $collection->updateOne(
                            ['sku' => $productData['sku']],
                            [
                                '$set' => $productData,
                                '$setOnInsert' => [
                                    'created_at' => new MongoDB\BSON\UTCDateTime()
                                ]
                            ],
                            ['upsert' => true]
                        );
                        
                        if ($result->getUpsertedCount() > 0) {
                            $results['imported']++;
                        } else {
                            $results['updated']++;
                        }
                        
                    } else {
                        // 仅插入新记录
                        $result = $collection->insertOne($productData);
                        $results['imported']++;
                    }
                    
                } catch (MongoDB\Driver\Exception\BulkWriteException $e) {
                    $results['failed']++;
                    
                    if (!$continueOnError) {
                        throw $e;
                    }
                    
                    $results['errors'][] = [
                        'sku' => $productData['sku'],
                        'error' => $e->getMessage()
                    ];
                    
                } catch (Exception $e) {
                    $results['failed']++;
                    $results['errors'][] = [
                        'sku' => $productData['sku'],
                        'error' => $e->getMessage()
                    ];
                }
            }
        }
        
        return $results;
    }
    
    public function validateProductData($productData) {
        $errors = [];
        
        if (empty($productData['sku'])) {
            $errors[] = 'SKU is required';
        }
        
        if (empty($productData['name'])) {
            $errors[] = 'Product name is required';
        }
        
        if (!isset($productData['price']) || $productData['price'] <= 0) {
            $errors[] = 'Valid price is required';
        }
        
        return [
            'valid' => empty($errors),
            'errors' => $errors
        ];
    }
}

// 使用示例
$productImport = new ProductDataImport('ecommerce_db');

// 准备产品数据
$productsData = [];
for ($i = 0; $i < 5000; $i++) {
    $productsData[] = [
        'sku' => 'PRD' . str_pad($i, 5, '0', STR_PAD_LEFT),
        'name' => "Product {$i}",
        'description' => "Description for product {$i}",
        'price' => rand(10, 1000),
        'category' => ['Electronics', 'Clothing', 'Books'][rand(0, 2)],
        'stock' => rand(0, 100),
        'created_at' => new MongoDB\BSON\UTCDateTime()
    ];
}

// 导入产品数据
$importResult = $productImport->importProducts($productsData, [
    'batch_size' => 1000,
    'continue_on_error' => true,
    'update_existing' => false
]);

print_r($importResult);
?>

知识点总结

核心概念

  1. 插入方法insertOne()insertMany()bulkWrite()
  2. 文档ID:自动生成ObjectId或自定义ID
  3. 插入性能:受文档大小、索引数量、批量大小影响
  4. 错误处理:重复键错误、文档大小限制、字段名规范

最佳实践

  1. 批量插入:使用insertMany()提高性能
  2. 错误处理:妥善处理重复键和其他错误
  3. 数据验证:插入前验证数据完整性
  4. 性能优化:选择合适的批量大小和插入选项

常见场景

  1. 用户注册:条件插入防止重复
  2. 日志收集:批量插入高性能日志
  3. 数据导入:批量导入大量数据
  4. 实时插入:单文档插入实时数据

拓展参考资料

官方文档

推荐阅读

  • 《MongoDB性能优化》
  • 《大规模数据导入实践》
  • 《MongoDB错误处理指南》

在线资源

  • MongoDB University插入操作课程
  • MongoDB官方博客性能优化文章
  • Stack Overflow MongoDB插入相关问题