
4.5 Bulk Operations

Overview

Bulk operations are MongoDB's efficient way to execute many write operations at once, covering batch inserts, batch updates, and batch deletes. They can improve performance significantly by reducing network round trips and making better use of server resources. This section walks through MongoDB's bulk-operation methods, best practices, and performance-tuning techniques.

MongoDB's bulk operations are fast and flexible, and each individual write within a batch is atomic. Mastering them is essential for large data migrations, bulk imports, mass updates, and similar workloads.

Basic Concepts

Types of Bulk Operations

MongoDB supports the following kinds of bulk operations:

  1. Batch insert: insert multiple documents with insertMany()
  2. Batch update: update multiple documents with updateMany()
  3. Batch delete: delete multiple documents with deleteMany()
  4. Mixed bulk operations: execute heterogeneous operations with bulkWrite()
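As a quick orientation before the detailed sections below, here is how the four operation types map onto the MongoDB PHP library (a minimal sketch; it assumes a local mongod, the mongodb/mongodb Composer package, and illustrative database/collection names and field values):

php
<?php
require 'vendor/autoload.php';

$collection = (new MongoDB\Client('mongodb://localhost:27017'))
    ->selectCollection('testdb', 'demo');

// 1. Batch insert
$collection->insertMany([['x' => 1], ['x' => 2], ['x' => 3]]);

// 2. Batch update: one filter/update pair applied to all matching documents
$collection->updateMany(['x' => ['$gte' => 2]], ['$set' => ['flag' => true]]);

// 3. Batch delete
$collection->deleteMany(['x' => 1]);

// 4. Mixed bulk write: heterogeneous operations sent in one request
$collection->bulkWrite([
    ['insertOne'  => [['x' => 4]]],
    ['updateOne'  => [['x' => 2], ['$inc' => ['x' => 10]]]],
    ['deleteMany' => [['flag' => true]]]
]);
?>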

Advantages of Bulk Operations

The main advantages of bulk operations:

  • Performance: fewer network round trips
  • Per-write atomicity: each individual write in the batch is atomic (the batch as a whole is not a transaction)
  • Resource efficiency: lower overhead on the server
  • Error handling: a unified error-reporting mechanism

Limitations of Bulk Operations

Points to watch out for:

  • Document size: a single document may be at most 16MB
  • Batch size: roughly 1,000-10,000 operations per batch is a good starting point; drivers automatically split batches exceeding the server's maxWriteBatchSize (100,000 operations)
  • Memory usage: very large batches can consume significant client-side memory
  • Error handling: partial failures must be handled deliberately
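A practical consequence of these limits: when document sizes vary widely, chunking by count alone can produce oversized batches. The sketch below bounds each batch by both a count cap and an approximate byte cap; it uses strlen(json_encode(...)) as a rough stand-in for the BSON size, which is close enough to stay safely under the limits. The function name and cap values are illustrative:

php
<?php
// Split $documents into batches bounded by count and by approximate size.
function chunkBySize(array $documents, int $maxCount = 1000, int $maxBytes = 4 * 1024 * 1024): array {
    $batches = [];
    $current = [];
    $currentBytes = 0;

    foreach ($documents as $doc) {
        $docBytes = strlen(json_encode($doc)); // rough BSON-size estimate

        // Start a new batch when adding this document would break a cap
        if (!empty($current) &&
            (count($current) >= $maxCount || $currentBytes + $docBytes > $maxBytes)) {
            $batches[] = $current;
            $current = [];
            $currentBytes = 0;
        }

        $current[] = $doc;
        $currentBytes += $docBytes;
    }

    if (!empty($current)) {
        $batches[] = $current; // flush the final partial batch
    }

    return $batches;
}

// 2,500 small documents with a cap of 1,000 per batch => 3 batches
$batches = chunkBySize(array_fill(0, 2500, ['x' => 1]), 1000);
echo count($batches) . "\n"; // 3
?>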

How It Works, in Depth

Batch Inserts

Insert multiple documents with the insertMany() method:

php
<?php
// Batch inserts in detail
class BatchInsertOperations {
    private $database;
    
    public function __construct($databaseName) {
        $client = new MongoDB\Client("mongodb://localhost:27017");
        $this->database = $client->selectDatabase($databaseName);
    }
    
    public function batchInsert($collectionName, $documents, $options = []) {
        $collection = $this->database->selectCollection($collectionName);
        
        try {
            $result = $collection->insertMany($documents, $options);
            
            return [
                'success' => true,
                'inserted_count' => $result->getInsertedCount(),
                'inserted_ids' => $result->getInsertedIds(),
                'acknowledged' => $result->isAcknowledged()
            ];
            
        } catch (MongoDB\Driver\Exception\BulkWriteException $e) {
            $writeResult = $e->getWriteResult();
            
            return [
                'success' => false,
                'error' => 'Bulk insert error',
                'inserted_count' => $writeResult->getInsertedCount(),
                'write_errors' => $writeResult->getWriteErrors(),
                'message' => $e->getMessage()
            ];
        } catch (Exception $e) {
            return [
                'success' => false,
                'error' => 'Batch insert failed',
                'message' => $e->getMessage()
            ];
        }
    }
    
    public function batchInsertWithProgress($collectionName, $documents, $batchSize = 1000, $callback = null) {
        $collection = $this->database->selectCollection($collectionName);
        
        $totalDocuments = count($documents);
        $batches = array_chunk($documents, $batchSize);
        $totalBatches = count($batches);
        
        $results = [
            'total_documents' => $totalDocuments,
            'total_batches' => $totalBatches,
            'batch_size' => $batchSize,
            'inserted_count' => 0,
            'failed_batches' => 0,
            'batch_results' => []
        ];
        
        foreach ($batches as $batchIndex => $batch) {
            try {
                $result = $collection->insertMany($batch);
                
                $batchResult = [
                    'batch_number' => $batchIndex + 1,
                    'batch_size' => count($batch),
                    'inserted_count' => $result->getInsertedCount(),
                    'status' => 'success'
                ];
                
                $results['inserted_count'] += $result->getInsertedCount();
                
            } catch (MongoDB\Driver\Exception\BulkWriteException $e) {
                $writeResult = $e->getWriteResult();
                
                $batchResult = [
                    'batch_number' => $batchIndex + 1,
                    'batch_size' => count($batch),
                    'inserted_count' => $writeResult->getInsertedCount(),
                    'status' => 'partial_failure',
                    'errors' => $writeResult->getWriteErrors()
                ];
                
                $results['inserted_count'] += $writeResult->getInsertedCount();
                $results['failed_batches']++;
                
            } catch (Exception $e) {
                $batchResult = [
                    'batch_number' => $batchIndex + 1,
                    'batch_size' => count($batch),
                    'inserted_count' => 0,
                    'status' => 'failed',
                    'error' => $e->getMessage()
                ];
                
                $results['failed_batches']++;
            }
            
            $results['batch_results'][] = $batchResult;
            
            // Invoke the progress callback
            if ($callback && is_callable($callback)) {
                $callback($batchResult, $batchIndex + 1, $totalBatches);
            }
        }
        
        return $results;
    }
    
    public function batchInsertWithOrderedOption($collectionName, $documents, $ordered = true) {
        $collection = $this->database->selectCollection($collectionName);
        
        try {
            $result = $collection->insertMany($documents, ['ordered' => $ordered]);
            
            return [
                'success' => true,
                'ordered' => $ordered,
                'inserted_count' => $result->getInsertedCount(),
                'inserted_ids' => $result->getInsertedIds()
            ];
            
        } catch (MongoDB\Driver\Exception\BulkWriteException $e) {
            $writeResult = $e->getWriteResult();
            
            return [
                'success' => false,
                'ordered' => $ordered,
                'inserted_count' => $writeResult->getInsertedCount(),
                'write_errors' => $writeResult->getWriteErrors(),
                'message' => $ordered ? 
                    'Insertion stopped at first error' : 
                    'Some documents failed, others inserted'
            ];
        }
    }
    
    public function benchmarkBatchInsertion($collectionName, $documentCount, $batchSizes = [100, 500, 1000, 5000]) {
        $collection = $this->database->selectCollection($collectionName);
        
        $benchmarkResults = [];
        
        foreach ($batchSizes as $batchSize) {
            $documents = [];
            for ($i = 0; $i < $documentCount; $i++) {
                $documents[] = [
                    'name' => "User {$i}",
                    'email' => "user{$i}@example.com",
                    'age' => rand(18, 65),
                    'batch_size_test' => $batchSize
                ];
            }
            
            // Insert in chunks of $batchSize so the batch size actually matters
            $batches = array_chunk($documents, $batchSize);
            
            $startTime = microtime(true);
            
            try {
                $insertedCount = 0;
                foreach ($batches as $batch) {
                    $result = $collection->insertMany($batch);
                    $insertedCount += $result->getInsertedCount();
                }
                $endTime = microtime(true);
                
                $executionTime = $endTime - $startTime;
                $throughput = $documentCount / $executionTime;
                
                $benchmarkResults[] = [
                    'batch_size' => $batchSize,
                    'document_count' => $documentCount,
                    'inserted_count' => $insertedCount,
                    'execution_time_seconds' => round($executionTime, 4),
                    'throughput_docs_per_sec' => round($throughput, 2),
                    'avg_time_per_doc_ms' => round(($executionTime / $documentCount) * 1000, 4),
                    'status' => 'success'
                ];
                
            } catch (Exception $e) {
                $benchmarkResults[] = [
                    'batch_size' => $batchSize,
                    'document_count' => $documentCount,
                    'status' => 'failed',
                    'error' => $e->getMessage()
                ];
            }
            
            // Clean up this run's test data
            $collection->deleteMany(['batch_size_test' => $batchSize]);
        }
        
        return $benchmarkResults;
    }
}

// Usage examples
$batchInsertOps = new BatchInsertOperations('testdb');

// Basic batch insert
$documents = [];
for ($i = 0; $i < 100; $i++) {
    $documents[] = [
        'name' => "User {$i}",
        'email' => "user{$i}@example.com",
        'age' => rand(18, 65)
    ];
}

$insertResult = $batchInsertOps->batchInsert('users', $documents);
print_r($insertResult);

// Batch insert with progress reporting
$largeDocuments = [];
for ($i = 0; $i < 10000; $i++) {
    $largeDocuments[] = [
        'name' => "User {$i}",
        'email' => "user{$i}@example.com",
        'age' => rand(18, 65)
    ];
}

$progressCallback = function($batchResult, $currentBatch, $totalBatches) {
    echo "Batch {$currentBatch}/{$totalBatches}: ";
    echo "Inserted {$batchResult['inserted_count']} documents, ";
    echo "Status: {$batchResult['status']}\n";
};

$progressResult = $batchInsertOps->batchInsertWithProgress(
    'users', 
    $largeDocuments, 
    1000, 
    $progressCallback
);
print_r($progressResult);

// Ordered vs. unordered batch insert
// (assumes a unique index on email, e.g. via createIndex(['email' => 1], ['unique' => true]),
// so the duplicate below actually triggers a write error)
$duplicateDocuments = [
    ['name' => 'Test User 1', 'email' => 'test1@example.com'],
    ['name' => 'Test User 2', 'email' => 'test2@example.com'],
    ['name' => 'Test User 1', 'email' => 'test1@example.com'], // duplicate
    ['name' => 'Test User 3', 'email' => 'test3@example.com']
];

$orderedResult = $batchInsertOps->batchInsertWithOrderedOption('users', $duplicateDocuments, true);
print_r($orderedResult);

$unorderedResult = $batchInsertOps->batchInsertWithOrderedOption('users', $duplicateDocuments, false);
print_r($unorderedResult);

// Batch-insert performance benchmark
$benchmarkResults = $batchInsertOps->benchmarkBatchInsertion('users', 10000);
print_r($benchmarkResults);
?>

Batch Updates

Update multiple documents with the updateMany() method:

php
<?php
// Batch updates in detail
class BatchUpdateOperations {
    private $database;
    
    public function __construct($databaseName) {
        $client = new MongoDB\Client("mongodb://localhost:27017");
        $this->database = $client->selectDatabase($databaseName);
    }
    
    public function batchUpdate($collectionName, $filter, $update, $options = []) {
        $collection = $this->database->selectCollection($collectionName);
        
        try {
            $result = $collection->updateMany($filter, $update, $options);
            
            return [
                'success' => true,
                'matched_count' => $result->getMatchedCount(),
                'modified_count' => $result->getModifiedCount(),
                'upserted_count' => $result->getUpsertedCount(),
                'upserted_id' => $result->getUpsertedId(),
                'acknowledged' => $result->isAcknowledged()
            ];
            
        } catch (Exception $e) {
            return [
                'success' => false,
                'error' => 'Batch update failed',
                'message' => $e->getMessage()
            ];
        }
    }
    
    public function batchUpdateWithProgress($collectionName, $filter, $update, $batchSize = 1000, $callback = null) {
        $collection = $this->database->selectCollection($collectionName);
        
        // Count the documents matching the filter
        $totalMatched = $collection->countDocuments($filter);
        
        if ($totalMatched === 0) {
            return [
                'success' => true,
                'message' => 'No documents match the filter',
                'total_matched' => 0,
                'total_modified' => 0
            ];
        }
        
        $results = [
            'total_matched' => $totalMatched,
            'batch_size' => $batchSize,
            'total_batches' => ceil($totalMatched / $batchSize),
            'total_modified' => 0,
            'batch_results' => []
        ];
        
        // updateMany() does not accept skip/limit options, so collect the
        // matching _ids first and update them chunk by chunk with $in.
        $ids = [];
        foreach ($collection->find($filter, ['projection' => ['_id' => 1]]) as $document) {
            $ids[] = $document['_id'];
        }
        
        foreach (array_chunk($ids, $batchSize) as $batchIndex => $idBatch) {
            $batchNumber = $batchIndex + 1;
            
            try {
                $result = $collection->updateMany(
                    ['_id' => ['$in' => $idBatch]],
                    $update
                );
                
                $batchResult = [
                    'batch_number' => $batchNumber,
                    'matched_count' => $result->getMatchedCount(),
                    'modified_count' => $result->getModifiedCount(),
                    'status' => 'success'
                ];
                
                $results['total_modified'] += $result->getModifiedCount();
                
            } catch (Exception $e) {
                $batchResult = [
                    'batch_number' => $batchNumber,
                    'matched_count' => 0,
                    'modified_count' => 0,
                    'status' => 'failed',
                    'error' => $e->getMessage()
                ];
            }
            
            $results['batch_results'][] = $batchResult;
            
            // Invoke the progress callback
            if ($callback && is_callable($callback)) {
                $callback($batchResult, $batchNumber, $results['total_batches']);
            }
        }
        
        return $results;
    }
    
    public function batchUpdateWithArrayFilters($collectionName, $filter, $update, $arrayFilters) {
        $collection = $this->database->selectCollection($collectionName);
        
        try {
            $result = $collection->updateMany($filter, $update, [
                'arrayFilters' => $arrayFilters
            ]);
            
            return [
                'success' => true,
                'array_filters' => $arrayFilters,
                'matched_count' => $result->getMatchedCount(),
                'modified_count' => $result->getModifiedCount()
            ];
            
        } catch (Exception $e) {
            return [
                'success' => false,
                'error' => 'Batch update with array filters failed',
                'message' => $e->getMessage()
            ];
        }
    }
    
    public function batchUpdateWithUpsert($collectionName, $filter, $update, $options = []) {
        $collection = $this->database->selectCollection($collectionName);
        
        try {
            $result = $collection->updateMany($filter, $update, array_merge($options, ['upsert' => true]));
            
            $wasInserted = $result->getUpsertedCount() > 0;
            $wasUpdated = $result->getModifiedCount() > 0;
            
            return [
                'success' => true,
                'upsert_enabled' => true,
                'inserted' => $wasInserted,
                'updated' => $wasUpdated,
                'matched_count' => $result->getMatchedCount(),
                'modified_count' => $result->getModifiedCount(),
                'upserted_count' => $result->getUpsertedCount(),
                'upserted_id' => $result->getUpsertedId()
            ];
            
        } catch (Exception $e) {
            return [
                'success' => false,
                'error' => 'Batch update with upsert failed',
                'message' => $e->getMessage()
            ];
        }
    }
}

// Usage examples
$batchUpdateOps = new BatchUpdateOperations('testdb');

// Basic batch update
$filter = ['status' => 'pending'];
$update = ['$set' => ['status' => 'processed', 'processed_at' => new MongoDB\BSON\UTCDateTime()]];
$updateResult = $batchUpdateOps->batchUpdate('orders', $filter, $update);
print_r($updateResult);

// Batch update with progress reporting
$progressCallback = function($batchResult, $currentBatch, $totalBatches) {
    echo "Batch {$currentBatch}/{$totalBatches}: ";
    echo "Modified {$batchResult['modified_count']} documents, ";
    echo "Status: {$batchResult['status']}\n";
};

$progressUpdateResult = $batchUpdateOps->batchUpdateWithProgress(
    'orders',
    ['status' => 'pending'],
    ['$set' => ['status' => 'processed', 'processed_at' => new MongoDB\BSON\UTCDateTime()]],
    1000,
    $progressCallback
);
print_r($progressUpdateResult);

// Batch update using array filters
$arrayFilterResult = $batchUpdateOps->batchUpdateWithArrayFilters(
    'orders',
    ['order_id' => 'ORD001'],
    ['$set' => ['items.$[elem].price' => 99.99]],
    [['elem.item_id' => 'ITEM001']]
);
print_r($arrayFilterResult);

// Batch update with upsert
$upsertResult = $batchUpdateOps->batchUpdateWithUpsert(
    'users',
    ['role' => 'premium'],
    ['$set' => ['discount' => 0.1, 'updated_at' => new MongoDB\BSON\UTCDateTime()]]
);
print_r($upsertResult);
?>

Batch Deletes

Delete multiple documents with the deleteMany() method:

php
<?php
// Batch deletes in detail
class BatchDeleteOperations {
    private $database;
    
    public function __construct($databaseName) {
        $client = new MongoDB\Client("mongodb://localhost:27017");
        $this->database = $client->selectDatabase($databaseName);
    }
    
    public function batchDelete($collectionName, $filter, $options = []) {
        $collection = $this->database->selectCollection($collectionName);
        
        try {
            $result = $collection->deleteMany($filter, $options);
            
            return [
                'success' => true,
                'deleted_count' => $result->getDeletedCount(),
                'acknowledged' => $result->isAcknowledged()
            ];
            
        } catch (Exception $e) {
            return [
                'success' => false,
                'error' => 'Batch delete failed',
                'message' => $e->getMessage()
            ];
        }
    }
    
    public function batchDeleteWithProgress($collectionName, $filter, $batchSize = 10000, $callback = null) {
        $collection = $this->database->selectCollection($collectionName);
        
        // Count the documents matching the filter
        $totalMatched = $collection->countDocuments($filter);
        
        if ($totalMatched === 0) {
            return [
                'success' => true,
                'message' => 'No documents match the filter',
                'total_matched' => 0,
                'total_deleted' => 0
            ];
        }
        
        $results = [
            'total_matched' => $totalMatched,
            'batch_size' => $batchSize,
            'total_batches' => ceil($totalMatched / $batchSize),
            'total_deleted' => 0,
            'batch_results' => []
        ];
        
        // deleteMany() does not accept skip/limit options. Deleted documents
        // stop matching the filter, so repeatedly select the next $batchSize
        // matching _ids and delete them until none remain.
        $batchNumber = 0;
        
        while (true) {
            $ids = [];
            $cursor = $collection->find($filter, [
                'projection' => ['_id' => 1],
                'limit' => $batchSize
            ]);
            foreach ($cursor as $document) {
                $ids[] = $document['_id'];
            }
            
            if (empty($ids)) {
                break;
            }
            
            $batchNumber++;
            $failed = false;
            
            try {
                $result = $collection->deleteMany(['_id' => ['$in' => $ids]]);
                
                $batchResult = [
                    'batch_number' => $batchNumber,
                    'deleted_count' => $result->getDeletedCount(),
                    'status' => 'success'
                ];
                
                $results['total_deleted'] += $result->getDeletedCount();
                
            } catch (Exception $e) {
                $failed = true;
                $batchResult = [
                    'batch_number' => $batchNumber,
                    'deleted_count' => 0,
                    'status' => 'failed',
                    'error' => $e->getMessage()
                ];
            }
            
            $results['batch_results'][] = $batchResult;
            
            // Invoke the progress callback
            if ($callback && is_callable($callback)) {
                $callback($batchResult, $batchNumber, $results['total_batches']);
            }
            
            if ($failed) {
                // Stop rather than retry the same batch indefinitely
                break;
            }
        }
        
        return $results;
    }
    
    public function batchDeleteWithBackup($collectionName, $filter, $backup = true) {
        $collection = $this->database->selectCollection($collectionName);
        
        try {
            // Back up the matching documents first
            $backupData = null;
            if ($backup) {
                $backupData = $collection->find($filter)->toArray();
                
                if (!empty($backupData)) {
                    $backupCollection = $this->database->selectCollection(
                        $collection->getCollectionName() . '_backup_' . date('YmdHis')
                    );
                    $backupCollection->insertMany($backupData);
                }
            }
            
            // Delete the documents
            $result = $collection->deleteMany($filter);
            
            return [
                'success' => true,
                'backup_created' => $backup && !empty($backupData),
                'backup_count' => $backup ? count($backupData) : 0,
                'deleted_count' => $result->getDeletedCount()
            ];
            
        } catch (Exception $e) {
            return [
                'success' => false,
                'error' => 'Batch delete with backup failed',
                'message' => $e->getMessage()
            ];
        }
    }
    
    public function batchDeleteByDateRange($collectionName, $dateField, $startDate, $endDate, $batchSize = 10000) {
        $collection = $this->database->selectCollection($collectionName);
        
        try {
            $filter = [
                $dateField => [
                    '$gte' => $startDate,
                    '$lte' => $endDate
                ]
            ];
            
            return $this->batchDeleteWithProgress($collectionName, $filter, $batchSize);
            
        } catch (Exception $e) {
            return [
                'success' => false,
                'error' => 'Date range batch delete failed',
                'message' => $e->getMessage()
            ];
        }
    }
}

// Usage examples
$batchDeleteOps = new BatchDeleteOperations('testdb');

// Basic batch delete
$filter = ['status' => 'inactive'];
$deleteResult = $batchDeleteOps->batchDelete('users', $filter);
print_r($deleteResult);

// Batch delete with progress reporting
$progressCallback = function($batchResult, $currentBatch, $totalBatches) {
    echo "Batch {$currentBatch}/{$totalBatches}: ";
    echo "Deleted {$batchResult['deleted_count']} documents, ";
    echo "Status: {$batchResult['status']}\n";
};

$progressDeleteResult = $batchDeleteOps->batchDeleteWithProgress(
    'logs',
    ['created_at' => ['$lt' => new MongoDB\BSON\UTCDateTime((time() - 90 * 86400) * 1000)]],
    10000,
    $progressCallback
);
print_r($progressDeleteResult);

// Batch delete with backup
$backupDeleteResult = $batchDeleteOps->batchDeleteWithBackup(
    'users',
    ['status' => 'deleted'],
    true
);
print_r($backupDeleteResult);

// Batch delete by date range
$startDate = new MongoDB\BSON\UTCDateTime(strtotime('2024-01-01') * 1000);
$endDate = new MongoDB\BSON\UTCDateTime(strtotime('2024-01-31') * 1000);

$dateRangeDeleteResult = $batchDeleteOps->batchDeleteByDateRange(
    'logs',
    'created_at',
    $startDate,
    $endDate,
    5000
);
print_r($dateRangeDeleteResult);
?>

Mixed Bulk Operations

Execute heterogeneous write operations with the bulkWrite() method:

php
<?php
// Mixed bulk operations in detail
class MixedBulkOperations {
    private $database;
    
    public function __construct($databaseName) {
        $client = new MongoDB\Client("mongodb://localhost:27017");
        $this->database = $client->selectDatabase($databaseName);
    }
    
    public function executeBulkWrite($collectionName, $operations, $options = []) {
        $collection = $this->database->selectCollection($collectionName);
        
        try {
            $result = $collection->bulkWrite($operations, $options);
            
            return [
                'success' => true,
                'inserted_count' => $result->getInsertedCount(),
                'matched_count' => $result->getMatchedCount(),
                'modified_count' => $result->getModifiedCount(),
                'deleted_count' => $result->getDeletedCount(),
                'upserted_count' => $result->getUpsertedCount(),
                'upserted_ids' => $result->getUpsertedIds()
            ];
            
        } catch (MongoDB\Driver\Exception\BulkWriteException $e) {
            $writeResult = $e->getWriteResult();
            
            return [
                'success' => false,
                'error' => 'Bulk write error',
                'inserted_count' => $writeResult->getInsertedCount(),
                'matched_count' => $writeResult->getMatchedCount(),
                'modified_count' => $writeResult->getModifiedCount(),
                'deleted_count' => $writeResult->getDeletedCount(),
                'write_errors' => $writeResult->getWriteErrors(),
                'message' => $e->getMessage()
            ];
        } catch (Exception $e) {
            return [
                'success' => false,
                'error' => 'Bulk write failed',
                'message' => $e->getMessage()
            ];
        }
    }
    
    public function executeBulkWriteWithProgress($collectionName, $operations, $batchSize = 1000, $callback = null) {
        $collection = $this->database->selectCollection($collectionName);
        
        $totalOperations = count($operations);
        $batches = array_chunk($operations, $batchSize);
        $totalBatches = count($batches);
        
        $results = [
            'total_operations' => $totalOperations,
            'total_batches' => $totalBatches,
            'batch_size' => $batchSize,
            'total_inserted' => 0,
            'total_updated' => 0,
            'total_deleted' => 0,
            'batch_results' => []
        ];
        
        foreach ($batches as $batchIndex => $batch) {
            try {
                $result = $collection->bulkWrite($batch);
                
                $batchResult = [
                    'batch_number' => $batchIndex + 1,
                    'batch_size' => count($batch),
                    'inserted_count' => $result->getInsertedCount(),
                    'matched_count' => $result->getMatchedCount(),
                    'modified_count' => $result->getModifiedCount(),
                    'deleted_count' => $result->getDeletedCount(),
                    'status' => 'success'
                ];
                
                $results['total_inserted'] += $result->getInsertedCount();
                $results['total_updated'] += $result->getModifiedCount();
                $results['total_deleted'] += $result->getDeletedCount();
                
            } catch (MongoDB\Driver\Exception\BulkWriteException $e) {
                $writeResult = $e->getWriteResult();
                
                $batchResult = [
                    'batch_number' => $batchIndex + 1,
                    'batch_size' => count($batch),
                    'inserted_count' => $writeResult->getInsertedCount(),
                    'matched_count' => $writeResult->getMatchedCount(),
                    'modified_count' => $writeResult->getModifiedCount(),
                    'deleted_count' => $writeResult->getDeletedCount(),
                    'status' => 'partial_failure',
                    'errors' => $writeResult->getWriteErrors()
                ];
                
                $results['total_inserted'] += $writeResult->getInsertedCount();
                $results['total_updated'] += $writeResult->getModifiedCount();
                $results['total_deleted'] += $writeResult->getDeletedCount();
                
            } catch (Exception $e) {
                $batchResult = [
                    'batch_number' => $batchIndex + 1,
                    'batch_size' => count($batch),
                    'inserted_count' => 0,
                    'matched_count' => 0,
                    'modified_count' => 0,
                    'deleted_count' => 0,
                    'status' => 'failed',
                    'error' => $e->getMessage()
                ];
            }
            
            $results['batch_results'][] = $batchResult;
            
            // Invoke the progress callback
            if ($callback && is_callable($callback)) {
                $callback($batchResult, $batchIndex + 1, $totalBatches);
            }
        }
        
        return $results;
    }
    
    public function executeOrderedBulkWrite($collectionName, $operations, $ordered = true) {
        $collection = $this->database->selectCollection($collectionName);
        
        try {
            $result = $collection->bulkWrite($operations, ['ordered' => $ordered]);
            
            return [
                'success' => true,
                'ordered' => $ordered,
                'inserted_count' => $result->getInsertedCount(),
                'matched_count' => $result->getMatchedCount(),
                'modified_count' => $result->getModifiedCount(),
                'deleted_count' => $result->getDeletedCount()
            ];
            
        } catch (MongoDB\Driver\Exception\BulkWriteException $e) {
            $writeResult = $e->getWriteResult();
            
            return [
                'success' => false,
                'ordered' => $ordered,
                'inserted_count' => $writeResult->getInsertedCount(),
                'matched_count' => $writeResult->getMatchedCount(),
                'modified_count' => $writeResult->getModifiedCount(),
                'deleted_count' => $writeResult->getDeletedCount(),
                'write_errors' => $writeResult->getWriteErrors(),
                'message' => $ordered ? 
                    'Execution stopped at first error' : 
                    'Some operations failed, others succeeded'
            ];
        }
    }
}

// Usage examples
$mixedBulkOps = new MixedBulkOperations('testdb');

// Basic mixed bulk operation
// Collection::bulkWrite() takes an array of operations, each expressed as
// ['operationName' => [arguments]] — not MongoDB\Driver\BulkWrite objects.
$operations = [
    ['insertOne' => [
        ['name' => 'User 1', 'email' => 'user1@example.com', 'age' => 25]
    ]],
    ['insertOne' => [
        ['name' => 'User 2', 'email' => 'user2@example.com', 'age' => 30]
    ]],
    ['updateOne' => [
        ['email' => 'user1@example.com'],
        ['$set' => ['age' => 26]]
    ]],
    ['deleteOne' => [
        ['email' => 'user2@example.com']
    ]]
];

$bulkWriteResult = $mixedBulkOps->executeBulkWrite('users', $operations);
print_r($bulkWriteResult);

// Mixed bulk operation with progress reporting
$largeOperations = [];
for ($i = 0; $i < 10000; $i++) {
    $largeOperations[] = ['insertOne' => [
        ['name' => "User {$i}", 'email' => "user{$i}@example.com", 'age' => rand(18, 65)]
    ]];
}

$progressCallback = function($batchResult, $currentBatch, $totalBatches) {
    echo "Batch {$currentBatch}/{$totalBatches}: ";
    echo "Inserted {$batchResult['inserted_count']}, ";
    echo "Updated {$batchResult['modified_count']}, ";
    echo "Deleted {$batchResult['deleted_count']}, ";
    echo "Status: {$batchResult['status']}\n";
};

$progressBulkResult = $mixedBulkOps->executeBulkWriteWithProgress(
    'users',
    $largeOperations,
    1000,
    $progressCallback
);
print_r($progressBulkResult);

// Ordered vs. unordered mixed bulk operations
$orderedBulkResult = $mixedBulkOps->executeOrderedBulkWrite('users', $operations, true);
print_r($orderedBulkResult);

$unorderedBulkResult = $mixedBulkOps->executeOrderedBulkWrite('users', $operations, false);
print_r($unorderedBulkResult);
?>

Common Errors and Pitfalls

Error 1: Out-of-Memory During Bulk Operations

Problem: building and inserting too much data in one go exhausts memory.

php
<?php
// Incorrect example - out of memory from a bulk operation
try {
    $client = new MongoDB\Client("mongodb://localhost:27017");
    $database = $client->selectDatabase("testdb");
    $collection = $database->selectCollection('users');
    
    // Wrong: insert too much data in a single call
    $largeDocuments = [];
    for ($i = 0; $i < 100000; $i++) {
        $largeDocuments[] = [
            'name' => "User {$i}",
            'email' => "user{$i}@example.com",
            'data' => str_repeat('x', 10000) // large payload
        ];
    }
    
    $collection->insertMany($largeDocuments);
    
} catch (Exception $e) {
    echo "Error: " . $e->getMessage() . "\n";
}

// Correct example - process large data sets in batches
$client = new MongoDB\Client("mongodb://localhost:27017");
$database = $client->selectDatabase("testdb");
$collection = $database->selectCollection('users');

// Correct: insert in batches (note that array_chunk still holds the full
// array in memory; ideally, build the documents batch by batch as well)
$batchSize = 1000;
$batches = array_chunk($largeDocuments, $batchSize);

foreach ($batches as $batchIndex => $batch) {
    try {
        $collection->insertMany($batch);
        echo "Batch " . ($batchIndex + 1) . ": Inserted " . count($batch) . " documents\n";
        
        // Release this loop iteration's copy of the batch
        unset($batch);
        
    } catch (Exception $e) {
        echo "Batch " . ($batchIndex + 1) . " failed: " . $e->getMessage() . "\n";
    }
}

echo "All batches processed\n";
?>

Error 2: Mishandling Partial Failures

Problem: partial failures of a bulk operation are not handled properly.

php
<?php
// Incorrect example - poor bulk-operation error handling
try {
    $client = new MongoDB\Client("mongodb://localhost:27017");
    $database = $client->selectDatabase("testdb");
    $collection = $database->selectCollection('users');
    
    // 错误:不处理批量操作的部分失败
    $documents = [
        ['name' => 'User 1', 'email' => 'user1@example.com'],
        ['name' => 'User 2', 'email' => 'user1@example.com'], // 重复邮箱
        ['name' => 'User 3', 'email' => 'user3@example.com']
    ];
    
    $result = $collection->insertMany($documents);
    echo "Inserted " . $result->getInsertedCount() . " documents\n";
    
} catch (MongoDB\Driver\Exception\BulkWriteException $e) {
    echo "错误: " . $e->getMessage() . "\n";
}

// 正确示例 - 妥善处理批量操作错误
$client = new MongoDB\Client("mongodb://localhost:27017");
$database = $client->selectDatabase("testdb");
$collection = $database->selectCollection('users');

// 正确:处理部分失败
$documents = [
    ['name' => 'User 1', 'email' => 'user1@example.com'],
    ['name' => 'User 2', 'email' => 'user1@example.com'], // 重复邮箱
    ['name' => 'User 3', 'email' => 'user3@example.com']
];

try {
    // ordered=false:无序执行,遇到错误继续写入其余文档,writeErrors中会包含全部失败项
    $result = $collection->insertMany($documents, ['ordered' => false]);
    
    echo "Successfully inserted " . $result->getInsertedCount() . " documents\n";
    
} catch (MongoDB\Driver\Exception\BulkWriteException $e) {
    $writeResult = $e->getWriteResult();
    
    echo "Partial failure occurred\n";
    echo "Successfully inserted: " . $writeResult->getInsertedCount() . "\n";
    
    $writeErrors = $writeResult->getWriteErrors();
    foreach ($writeErrors as $error) {
        echo "Error: " . $error->getMessage() . "\n";
        echo "Code: " . $error->getCode() . "\n";
    }
    
    // 重试失败的操作(注意:重复键等确定性错误原样重试仍会失败,实际应用中应先修正数据)
    $failedDocuments = [];
    foreach ($writeErrors as $error) {
        $index = $error->getIndex();
        $failedDocuments[] = $documents[$index];
    }
    
    if (!empty($failedDocuments)) {
        echo "Retrying " . count($failedDocuments) . " failed documents...\n";
        
        foreach ($failedDocuments as $document) {
            try {
                $collection->insertOne($document);
                echo "Retried: " . $document['name'] . "\n";
            } catch (Exception $retryError) {
                echo "Retry failed: " . $retryError->getMessage() . "\n";
            }
        }
    }
}
?>

常见应用场景

场景1:数据导入系统

实现高性能数据导入功能:

php
<?php
// 数据导入系统
class DataImportSystem {
    private $database;
    
    public function __construct($databaseName) {
        $client = new MongoDB\Client("mongodb://localhost:27017");
        $this->database = $client->selectDatabase($databaseName);
    }
    
    public function importData($collectionName, $data, $options = []) {
        $collection = $this->database->selectCollection($collectionName);
        
        $batchSize = $options['batch_size'] ?? 1000;
        $continueOnError = $options['continue_on_error'] ?? true;
        $updateExisting = $options['update_existing'] ?? false;
        
        $results = [
            'total_records' => count($data),
            'imported' => 0,
            'updated' => 0,
            'failed' => 0,
            'errors' => []
        ];
        
        $batches = array_chunk($data, $batchSize);
        
        foreach ($batches as $batchIndex => $batch) {
            try {
                if ($updateExisting) {
                    // 更新或插入:将整批转换为bulkWrite的updateOne(upsert)操作,一次提交
                    $operations = [];
                    foreach ($batch as $record) {
                        $filter = $this->buildUniqueFilter($record, $options['unique_fields'] ?? []);
                        $operations[] = [
                            'updateOne' => [
                                $filter,
                                [
                                    '$set' => $record,
                                    '$setOnInsert' => ['created_at' => new MongoDB\BSON\UTCDateTime()]
                                ],
                                ['upsert' => true]
                            ]
                        ];
                    }
                    
                    $result = $collection->bulkWrite($operations, ['ordered' => false]);
                    $results['imported'] += $result->getUpsertedCount();
                    $results['updated'] += $result->getModifiedCount();
                    
                } else {
                    // 仅插入新记录:整批insertMany,无序执行以便统计全部失败项
                    $result = $collection->insertMany($batch, ['ordered' => false]);
                    $results['imported'] += $result->getInsertedCount();
                }
                
            } catch (MongoDB\Driver\Exception\BulkWriteException $e) {
                $writeResult = $e->getWriteResult();
                $writeErrors = $writeResult->getWriteErrors();
                
                $results['imported'] += $updateExisting
                    ? $writeResult->getUpsertedCount()
                    : $writeResult->getInsertedCount();
                $results['updated'] += $updateExisting ? $writeResult->getModifiedCount() : 0;
                $results['failed'] += count($writeErrors);
                
                if (!$continueOnError) {
                    throw $e;
                }
                
                foreach ($writeErrors as $error) {
                    $results['errors'][] = [
                        'record' => $batch[$error->getIndex()],
                        'error' => $error->getMessage()
                    ];
                }
                
            } catch (Exception $e) {
                $results['failed'] += count($batch);
                
                if (!$continueOnError) {
                    throw $e;
                }
                
                $results['errors'][] = [
                    'batch_number' => $batchIndex + 1,
                    'error' => $e->getMessage()
                ];
            }
        }
        
        return $results;
    }
    
    private function buildUniqueFilter($record, $uniqueFields) {
        $filter = [];
        
        foreach ($uniqueFields as $field) {
            if (isset($record[$field])) {
                $filter[$field] = $record[$field];
            }
        }
        
        return $filter;
    }
    
    public function importWithProgress($collectionName, $data, $options = []) {
        $collection = $this->database->selectCollection($collectionName);
        
        $batchSize = $options['batch_size'] ?? 1000;
        $callback = $options['progress_callback'] ?? null;
        
        $totalRecords = count($data);
        $batches = array_chunk($data, $batchSize);
        $totalBatches = count($batches);
        
        $results = [
            'total_records' => $totalRecords,
            'total_batches' => $totalBatches,
            'batch_size' => $batchSize,
            'imported' => 0,
            'updated' => 0,
            'failed' => 0,
            'batch_results' => []
        ];
        
        foreach ($batches as $batchIndex => $batch) {
            try {
                $result = $collection->insertMany($batch);
                
                $batchResult = [
                    'batch_number' => $batchIndex + 1,
                    'batch_size' => count($batch),
                    'imported_count' => $result->getInsertedCount(),
                    'status' => 'success'
                ];
                
                $results['imported'] += $result->getInsertedCount();
                
            } catch (MongoDB\Driver\Exception\BulkWriteException $e) {
                $writeResult = $e->getWriteResult();
                
                $batchResult = [
                    'batch_number' => $batchIndex + 1,
                    'batch_size' => count($batch),
                    'imported_count' => $writeResult->getInsertedCount(),
                    'status' => 'partial_failure',
                    'errors' => $writeResult->getWriteErrors()
                ];
                
                $results['imported'] += $writeResult->getInsertedCount();
                $results['failed'] += count($batch) - $writeResult->getInsertedCount();
                
            } catch (Exception $e) {
                $batchResult = [
                    'batch_number' => $batchIndex + 1,
                    'batch_size' => count($batch),
                    'imported_count' => 0,
                    'status' => 'failed',
                    'error' => $e->getMessage()
                ];
                
                $results['failed'] += count($batch);
            }
            
            $results['batch_results'][] = $batchResult;
            
            // 调用回调函数
            if ($callback && is_callable($callback)) {
                $callback($batchResult, $batchIndex + 1, $totalBatches);
            }
        }
        
        return $results;
    }
}

// 使用示例
$dataImportSystem = new DataImportSystem('testdb');

// 基本数据导入
$importData = [];
for ($i = 0; $i < 5000; $i++) {
    $importData[] = [
        'name' => "User {$i}",
        'email' => "user{$i}@example.com",
        'age' => rand(18, 65)
    ];
}

$importResult = $dataImportSystem->importData('users', $importData, [
    'batch_size' => 1000,
    'continue_on_error' => true,
    'update_existing' => false
]);
print_r($importResult);

// 带进度的数据导入
$progressCallback = function($batchResult, $currentBatch, $totalBatches) {
    echo "Batch {$currentBatch}/{$totalBatches}: ";
    echo "Imported {$batchResult['imported_count']} documents, ";
    echo "Status: {$batchResult['status']}\n";
};

$progressImportResult = $dataImportSystem->importWithProgress(
    'users',
    $importData,
    [
        'batch_size' => 1000,
        'progress_callback' => $progressCallback
    ]
);
print_r($progressImportResult);
?>

常见问题答疑

问题1:如何选择合适的批量大小?

回答:根据数据大小、网络延迟、服务器性能选择批量大小:

php
<?php
// 批量大小选择助手
class BatchSizeSelector {
    public static function selectBatchSize($documentSize, $networkLatency, $serverPerformance) {
        $baseBatchSize = 1000;
        
        // 根据文档大小调整
        if ($documentSize > 10000) { // > 10KB
            $baseBatchSize = 100;
        } elseif ($documentSize > 5000) { // > 5KB
            $baseBatchSize = 500;
        } elseif ($documentSize > 1000) { // > 1KB
            $baseBatchSize = 1000;
        }
        
        // 根据网络延迟调整
        if ($networkLatency > 100) { // > 100ms
            $baseBatchSize = max(100, $baseBatchSize * 0.5);
        } elseif ($networkLatency > 50) { // > 50ms
            $baseBatchSize = $baseBatchSize * 0.75;
        }
        
        // 根据服务器性能调整
        if ($serverPerformance === 'low') {
            $baseBatchSize = max(100, $baseBatchSize * 0.5);
        } elseif ($serverPerformance === 'high') {
            $baseBatchSize = $baseBatchSize * 1.5;
        }
        
        return [
            'recommended_batch_size' => (int)$baseBatchSize,
            'document_size' => $documentSize,
            'network_latency' => $networkLatency,
            'server_performance' => $serverPerformance
        ];
    }
}

// 使用示例
$batchSize = BatchSizeSelector::selectBatchSize(5000, 75, 'medium');
print_r($batchSize);
?>

实战练习

练习1:实现批量数据同步

实现一个批量数据同步系统:

php
<?php
// 批量数据同步系统
class BatchDataSync {
    private $sourceDatabase;
    private $targetDatabase;
    
    public function __construct($sourceDatabaseName, $targetDatabaseName) {
        $client = new MongoDB\Client("mongodb://localhost:27017");
        $this->sourceDatabase = $client->selectDatabase($sourceDatabaseName);
        $this->targetDatabase = $client->selectDatabase($targetDatabaseName);
    }
    
    public function syncCollection($sourceCollectionName, $targetCollectionName, $options = []) {
        $sourceCollection = $this->sourceDatabase->selectCollection($sourceCollectionName);
        $targetCollection = $this->targetDatabase->selectCollection($targetCollectionName);
        
        $batchSize = $options['batch_size'] ?? 1000;
        $syncStrategy = $options['sync_strategy'] ?? 'insert_only';
        
        // 获取源数据(注意:toArray()会将全部文档载入内存,大集合应改用游标流式分批处理)
        $sourceDocuments = $sourceCollection->find()->toArray();
        
        $results = [
            'total_documents' => count($sourceDocuments),
            'synced' => 0,
            'updated' => 0,
            'failed' => 0,
            'batch_results' => []
        ];
        
        $batches = array_chunk($sourceDocuments, $batchSize);
        
        foreach ($batches as $batchIndex => $batch) {
            try {
                if ($syncStrategy === 'insert_only') {
                    // 仅插入
                    $result = $targetCollection->insertMany($batch, ['ordered' => false]);
                    $results['synced'] += $result->getInsertedCount();
                    
                } elseif ($syncStrategy === 'upsert') {
                    // 按_id更新或插入:用replaceOne(upsert)构建操作,bulkWrite一次提交整批
                    $operations = [];
                    foreach ($batch as $document) {
                        $operations[] = [
                            'replaceOne' => [
                                ['_id' => $document['_id']],
                                $document,
                                ['upsert' => true]
                            ]
                        ];
                    }
                    
                    $result = $targetCollection->bulkWrite($operations, ['ordered' => false]);
                    $results['synced'] += $result->getUpsertedCount();
                    $results['updated'] += $result->getModifiedCount();
                }
                
                $batchResult = [
                    'batch_number' => $batchIndex + 1,
                    'batch_size' => count($batch),
                    'status' => 'success'
                ];
                
            } catch (Exception $e) {
                $batchResult = [
                    'batch_number' => $batchIndex + 1,
                    'batch_size' => count($batch),
                    'status' => 'failed',
                    'error' => $e->getMessage()
                ];
                
                $results['failed'] += count($batch);
            }
            
            $results['batch_results'][] = $batchResult;
        }
        
        return $results;
    }
}

// 使用示例
$batchSync = new BatchDataSync('source_db', 'target_db');

$syncResult = $batchSync->syncCollection('users', 'users', [
    'batch_size' => 1000,
    'sync_strategy' => 'upsert'
]);
print_r($syncResult);
?>

知识点总结

核心概念

  1. 批量操作方法:insertMany()、updateMany()、deleteMany()、bulkWrite()
  2. 批量操作优势:性能提升、原子性、资源优化
  3. 批量操作限制:文档大小、批量大小、内存使用
  4. 错误处理:部分失败、重试机制、错误恢复

批量操作方法

  1. insertMany():批量插入文档
  2. updateMany():批量更新文档
  3. deleteMany():批量删除文档
  4. bulkWrite():执行混合批量操作
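
其中bulkWrite()可以把不同类型的写入混合在一次请求中提交。下面是一个把统一的变更描述转换为bulkWrite()操作数组的示意代码(buildBulkOps为假设的辅助函数名,并非驱动API;操作数组格式遵循MongoDB PHP库的bulkWrite约定):

php
<?php
// 示意:把统一的变更描述转换为bulkWrite()可接受的操作数组
// (buildBulkOps为假设的辅助函数名)
function buildBulkOps(array $changes): array {
    $ops = [];
    foreach ($changes as $change) {
        switch ($change['type']) {
            case 'insert':
                // insertOne操作:[文档]
                $ops[] = ['insertOne' => [$change['document']]];
                break;
            case 'update':
                // updateOne操作:[过滤条件, 更新文档]
                $ops[] = ['updateOne' => [$change['filter'], ['$set' => $change['set']]]];
                break;
            case 'delete':
                // deleteMany操作:[过滤条件]
                $ops[] = ['deleteMany' => [$change['filter']]];
                break;
        }
    }
    return $ops;
}

$ops = buildBulkOps([
    ['type' => 'insert', 'document' => ['name' => 'User A', 'email' => 'a@example.com']],
    ['type' => 'update', 'filter' => ['name' => 'User A'], 'set' => ['age' => 30]],
    ['type' => 'delete', 'filter' => ['status' => 'inactive']]
]);

// 一次提交整批混合操作(需要可用的MongoDB连接):
// $collection->bulkWrite($ops, ['ordered' => false]);
print_r($ops);
?>

与逐条调用insertOne()/updateOne()/deleteMany()相比,这种方式只需一次网络往返即可提交全部操作。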

最佳实践

  1. 批量大小:根据数据大小选择合适的批量大小
  2. 错误处理:妥善处理部分失败的情况
  3. 进度监控:提供批量操作进度反馈
  4. 内存管理:分批处理大量数据避免内存溢出

常见场景

  1. 数据导入:批量导入大量数据
  2. 数据同步:批量同步数据到目标集合
  3. 批量更新:批量更新符合条件的文档
  4. 数据清理:批量删除过期数据
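
以上述"数据清理"场景为例,可以先计算保留期的过期阈值,再用deleteMany()一次删除(示意代码:expiredCutoffMs为假设的辅助函数名,logs集合及其created_at字段也是假设):

php
<?php
// 计算保留期之外的过期阈值(毫秒时间戳),供UTCDateTime使用
// (expiredCutoffMs为假设的辅助函数名)
function expiredCutoffMs(int $retentionDays, int $nowSeconds): int {
    return ($nowSeconds - $retentionDays * 86400) * 1000;
}

// 删除30天之前的文档
$cutoffMs = expiredCutoffMs(30, time());

// 实际删除(需要可用的MongoDB连接):
// $cutoff = new MongoDB\BSON\UTCDateTime($cutoffMs);
// $result = $collection->deleteMany(['created_at' => ['$lt' => $cutoff]]);
// echo "Deleted " . $result->getDeletedCount() . " expired documents\n";

echo "Cutoff (ms): {$cutoffMs}\n";
?>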

拓展参考资料

推荐阅读

  • 《MongoDB批量操作优化》
  • 《大规模数据导入实践》
  • 《MongoDB性能调优指南》

在线资源

  • MongoDB University批量操作课程
  • MongoDB官方博客性能优化文章
  • Stack Overflow MongoDB批量操作相关问题