Skip to content

排序与分页

概述

排序和分页是数据查询中的核心功能,它们允许开发者按照特定顺序组织数据并控制结果集的大小。MongoDB提供了强大的排序和分页功能,支持单字段和多字段排序,以及基于skip和limit的高效分页机制。在PHP MongoDB驱动中,这些功能通过查询选项参数来实现。

合理的排序和分页策略对于提升用户体验和应用性能至关重要。不当的分页实现可能导致性能问题,特别是在大数据集上。掌握MongoDB的排序和分页机制,能够帮助开发者构建高效的数据展示功能。

基本概念

排序操作

MongoDB使用sort()方法对查询结果进行排序:

  • 升序排序:使用1表示
  • 降序排序:使用-1表示
  • 多字段排序:按照字段顺序依次排序
  • 文本搜索排序:使用文本分数排序

分页操作

MongoDB提供两种主要的分页方式:

  • 基于skip/limit的分页:传统分页方式,适合小数据集
  • 基于游标的分页:高效分页方式,适合大数据集
  • 基于范围查询的分页:使用索引字段进行分页

性能考虑

排序和分页操作的性能特点:

  • 索引使用:排序字段应该有索引支持
  • 内存限制:排序操作有32MB内存限制
  • skip性能:大skip值会影响性能
  • 结果集大小:限制返回文档数量

原理深度解析

排序执行机制

MongoDB排序操作的内部执行过程:

php
class SortExecutionMechanism {
    private $collection;
    
    public function __construct($collection) {
        $this->collection = $collection;
    }
    
    public function analyzeSortExecution($filter, $sort) {
        $command = new MongoDB\Driver\Command([
            'explain' => $this->collection->getNamespace() . '.find',
            'filter' => $filter,
            'sort' => $sort,
            'verbosity' => 'executionStats'
        ]);
        
        $cursor = $this->collection->getManager()->executeCommand(
            $this->collection->getDatabaseName(),
            $command
        );
        
        $result = $cursor->toArray()[0];
        
        return [
            'sort_stage' => $result['queryPlanner']['winningPlan']['inputStage']['stage'] ?? null,
            'sort_pattern' => $result['queryPlanner']['winningPlan']['inputStage']['sortPattern'] ?? null,
            'memory_used' => $result['executionStats']['sortStats']['totalDataSizeBytes'] ?? 0,
            'docs_sorted' => $result['executionStats']['sortStats']['works'] ?? 0,
            'execution_time_ms' => $result['executionStats']['executionTimeMillis']
        ];
    }
    
    public function compareSortStrategies($field, $sampleSize = 1000) {
        $strategies = [
            'indexed_sort' => [
                'sort' => [$field => 1],
                'create_index' => true
            ],
            'unindexed_sort' => [
                'sort' => [$field => 1],
                'create_index' => false
            ],
            'compound_sort' => [
                'sort' => [$field => 1, 'created_at' => -1],
                'create_index' => true
            ]
        ];
        
        $results = [];
        
        foreach ($strategies as $name => $strategy) {
            if ($strategy['create_index']) {
                $this->collection->createIndex([$field => 1]);
            }
            
            $start = microtime(true);
            $analysis = $this->analyzeSortExecution([], $strategy['sort']);
            $time = (microtime(true) - $start) * 1000;
            
            $results[$name] = [
                'execution_time_ms' => $time,
                'memory_used_bytes' => $analysis['memory_used'],
                'docs_sorted' => $analysis['docs_sorted'],
                'uses_index' => $analysis['sort_stage'] !== 'SORT'
            ];
            
            if ($strategy['create_index']) {
                $this->collection->dropIndex([$field => 1]);
            }
        }
        
        return $results;
    }
    
    public function monitorSortMemoryUsage($filter, $sort) {
        $command = new MongoDB\Driver\Command([
            'explain' => $this->collection->getNamespace() . '.find',
            'filter' => $filter,
            'sort' => $sort,
            'verbosity' => 'executionStats'
        ]);
        
        $cursor = $this->collection->getManager()->executeCommand(
            $this->collection->getDatabaseName(),
            $command
        );
        
        $result = $cursor->toArray()[0];
        $sortStats = $result['executionStats']['sortStats'] ?? [];
        
        return [
            'total_data_size_bytes' => $sortStats['totalDataSizeBytes'] ?? 0,
            'used_disk' => $sortStats['usedDisk'] ?? false,
            'memory_limit_bytes' => 32 * 1024 * 1024,
            'memory_usage_percent' => ($sortStats['totalDataSizeBytes'] ?? 0) / (32 * 1024 * 1024) * 100
        ];
    }
}

分页实现机制

不同分页策略的实现原理:

php
class PaginationMechanism {
    private $collection;
    
    public function __construct($collection) {
        $this->collection = $collection;
    }
    
    public function traditionalPagination($page, $pageSize, $filter = [], $sort = []) {
        $skip = ($page - 1) * $pageSize;
        
        $options = [
            'skip' => $skip,
            'limit' => $pageSize,
            'sort' => $sort
        ];
        
        $startTime = microtime(true);
        $documents = $this->collection->find($filter, $options);
        $executionTime = (microtime(true) - $startTime) * 1000;
        
        $totalDocuments = $this->collection->countDocuments($filter);
        $totalPages = ceil($totalDocuments / $pageSize);
        
        return [
            'documents' => iterator_to_array($documents),
            'pagination' => [
                'current_page' => $page,
                'page_size' => $pageSize,
                'total_documents' => $totalDocuments,
                'total_pages' => $totalPages,
                'has_next_page' => $page < $totalPages,
                'has_prev_page' => $page > 1
            ],
            'performance' => [
                'execution_time_ms' => $executionTime,
                'skip_count' => $skip
            ]
        ];
    }
    
    public function cursorPagination($lastId, $pageSize, $filter = [], $sort = ['_id' => 1]) {
        $paginationFilter = $filter;
        
        if ($lastId) {
            $sortField = array_key_first($sort);
            $sortDirection = $sort[$sortField];
            
            if ($sortDirection === 1) {
                $paginationFilter[$sortField] = ['$gt' => $lastId];
            } else {
                $paginationFilter[$sortField] = ['$lt' => $lastId];
            }
        }
        
        $options = [
            'limit' => $pageSize + 1,
            'sort' => $sort
        ];
        
        $startTime = microtime(true);
        $documents = $this->collection->find($paginationFilter, $options);
        $executionTime = (microtime(true) - $startTime) * 1000;
        
        $documentArray = iterator_to_array($documents);
        $hasMore = count($documentArray) > $pageSize;
        
        if ($hasMore) {
            array_pop($documentArray);
        }
        
        $nextCursor = !empty($documentArray) ? end($documentArray)['_id'] : null;
        
        return [
            'documents' => $documentArray,
            'pagination' => [
                'next_cursor' => $nextCursor,
                'has_more' => $hasMore,
                'page_size' => $pageSize
            ],
            'performance' => [
                'execution_time_ms' => $executionTime
            ]
        ];
    }
    
    public function rangeBasedPagination($field, $lastValue, $pageSize, $filter = [], $sort = []) {
        $paginationFilter = $filter;
        
        if ($lastValue !== null) {
            $sortField = array_key_first($sort) ?: $field;
            $sortDirection = $sort[$sortField] ?? 1;
            
            if ($sortDirection === 1) {
                $paginationFilter[$sortField] = ['$gt' => $lastValue];
            } else {
                $paginationFilter[$sortField] = ['$lt' => $lastValue];
            }
        }
        
        $options = [
            'limit' => $pageSize + 1,
            'sort' => $sort ?: [$field => 1]
        ];
        
        $startTime = microtime(true);
        $documents = $this->collection->find($paginationFilter, $options);
        $executionTime = (microtime(true) - $startTime) * 1000;
        
        $documentArray = iterator_to_array($documents);
        $hasMore = count($documentArray) > $pageSize;
        
        if ($hasMore) {
            array_pop($documentArray);
        }
        
        $nextValue = !empty($documentArray) ? end($documentArray)[$field] : null;
        
        return [
            'documents' => $documentArray,
            'pagination' => [
                'next_value' => $nextValue,
                'has_more' => $hasMore,
                'page_size' => $pageSize
            ],
            'performance' => [
                'execution_time_ms' => $executionTime
            ]
        ];
    }
    
    public function comparePaginationMethods($pageSize, $totalPages = 10) {
        $methods = [
            'traditional' => [],
            'cursor' => [],
            'range_based' => []
        ];
        
        for ($page = 1; $page <= $totalPages; $page++) {
            $skip = ($page - 1) * $pageSize;
            
            $start = microtime(true);
            $this->collection->find([], ['skip' => $skip, 'limit' => $pageSize]);
            $traditionalTime = (microtime(true) - $start) * 1000;
            $methods['traditional'][] = $traditionalTime;
            
            $lastId = $page > 1 ? $this->getLastId($page - 1, $pageSize) : null;
            $start = microtime(true);
            $this->cursorPagination($lastId, $pageSize);
            $cursorTime = (microtime(true) - $start) * 1000;
            $methods['cursor'][] = $cursorTime;
        }
        
        return [
            'traditional' => [
                'avg_time_ms' => array_sum($methods['traditional']) / count($methods['traditional']),
                'max_time_ms' => max($methods['traditional']),
                'min_time_ms' => min($methods['traditional'])
            ],
            'cursor' => [
                'avg_time_ms' => array_sum($methods['cursor']) / count($methods['cursor']),
                'max_time_ms' => max($methods['cursor']),
                'min_time_ms' => min($methods['cursor'])
            ]
        ];
    }
    
    private function getLastId($page, $pageSize) {
        $skip = ($page - 1) * $pageSize;
        $options = ['skip' => $skip, 'limit' => $pageSize, 'sort' => ['_id' => 1]];
        $documents = $this->collection->find([], $options);
        $documentArray = iterator_to_array($documents);
        
        return !empty($documentArray) ? end($documentArray)['_id'] : null;
    }
}

排序索引优化

索引对排序性能的影响:

php
class SortIndexOptimization {
    private $collection;
    
    public function __construct($collection) {
        $this->collection = $collection;
    }
    
    public function createOptimalSortIndexes($sortFields) {
        $indexDefinitions = [];
        
        foreach ($sortFields as $field => $direction) {
            $indexName = "sort_{$field}_{$direction}";
            $this->collection->createIndex([$field => $direction], ['name' => $indexName]);
            $indexDefinitions[] = $indexName;
        }
        
        return $indexDefinitions;
    }
    
    public function analyzeSortIndexUsage($filter, $sort) {
        $command = new MongoDB\Driver\Command([
            'explain' => $this->collection->getNamespace() . '.find',
            'filter' => $filter,
            'sort' => $sort,
            'verbosity' => 'queryPlanner'
        ]);
        
        $cursor = $this->collection->getManager()->executeCommand(
            $this->collection->getDatabaseName(),
            $command
        );
        
        $result = $cursor->toArray()[0];
        $winningPlan = $result['queryPlanner']['winningPlan'];
        
        $indexUsage = [
            'uses_index' => false,
            'index_name' => null,
            'index_covered' => false,
            'sort_stage' => null
        ];
        
        if (isset($winningPlan['inputStage']['indexName'])) {
            $indexUsage['uses_index'] = true;
            $indexUsage['index_name'] = $winningPlan['inputStage']['indexName'];
        }
        
        if (isset($winningPlan['inputStage']['stage'])) {
            $indexUsage['sort_stage'] = $winningPlan['inputStage']['stage'];
            $indexUsage['index_covered'] = $winningPlan['inputStage']['stage'] !== 'SORT';
        }
        
        return $indexUsage;
    }
    
    public function recommendSortIndexes($commonQueries) {
        $recommendations = [];
        
        foreach ($commonQueries as $query) {
            $sort = $query['sort'];
            $filter = $query['filter'];
            
            $currentUsage = $this->analyzeSortIndexUsage($filter, $sort);
            
            if (!$currentUsage['uses_index'] || !$currentUsage['index_covered']) {
                $sortFields = array_keys($sort);
                $filterFields = array_keys($filter);
                
                $compoundFields = array_merge($filterFields, $sortFields);
                $compoundSort = [];
                
                foreach ($compoundFields as $field) {
                    if (isset($sort[$field])) {
                        $compoundSort[$field] = $sort[$field];
                    } else {
                        $compoundSort[$field] = 1;
                    }
                }
                
                $recommendations[] = [
                    'query' => $query,
                    'current_performance' => $currentUsage,
                    'recommended_index' => $compoundSort,
                    'index_name' => 'compound_' . md5(json_encode($compoundSort))
                ];
            }
        }
        
        return $recommendations;
    }
}

常见错误与踩坑点

排序内存溢出

排序操作超过32MB内存限制:

php
class SortMemoryErrorHandler {
    private $collection;
    
    public function __construct($collection) {
        $this->collection = $collection;
    }
    
    public function safeSortOperation($filter, $sort, $options = []) {
        $memoryCheck = $this->checkSortMemoryRequirements($filter, $sort);
        
        if ($memoryCheck['exceeds_limit']) {
            return $this->handleLargeSort($filter, $sort, $options);
        }
        
        return $this->collection->find($filter, array_merge($options, ['sort' => $sort]));
    }
    
    private function checkSortMemoryRequirements($filter, $sort) {
        $command = new MongoDB\Driver\Command([
            'explain' => $this->collection->getNamespace() . '.find',
            'filter' => $filter,
            'sort' => $sort,
            'verbosity' => 'executionStats'
        ]);
        
        $cursor = $this->collection->getManager()->executeCommand(
            $this->collection->getDatabaseName(),
            $command
        );
        
        $result = $cursor->toArray()[0];
        $sortStats = $result['executionStats']['sortStats'] ?? [];
        
        $memoryUsed = $sortStats['totalDataSizeBytes'] ?? 0;
        $memoryLimit = 32 * 1024 * 1024;
        
        return [
            'memory_used_bytes' => $memoryUsed,
            'memory_limit_bytes' => $memoryLimit,
            'exceeds_limit' => $memoryUsed > $memoryLimit,
            'usage_percent' => ($memoryUsed / $memoryLimit) * 100
        ];
    }
    
    private function handleLargeSort($filter, $sort, $options) {
        $sortField = array_key_first($sort);
        $sortDirection = $sort[$sortField];
        
        $this->ensureSortIndexExists($sortField, $sortDirection);
        
        $chunkSize = 10000;
        $results = [];
        
        $lastValue = null;
        while (true) {
            $chunkFilter = $filter;
            
            if ($lastValue !== null) {
                if ($sortDirection === 1) {
                    $chunkFilter[$sortField] = ['$gt' => $lastValue];
                } else {
                    $chunkFilter[$sortField] = ['$lt' => $lastValue];
                }
            }
            
            $chunkOptions = array_merge($options, [
                'sort' => $sort,
                'limit' => $chunkSize
            ]);
            
            $chunk = $this->collection->find($chunkFilter, $chunkOptions);
            $chunkArray = iterator_to_array($chunk);
            
            if (empty($chunkArray)) {
                break;
            }
            
            $results = array_merge($results, $chunkArray);
            
            if (count($chunkArray) < $chunkSize) {
                break;
            }
            
            $lastValue = end($chunkArray)[$sortField];
        }
        
        return $results;
    }
    
    private function ensureSortIndexExists($field, $direction) {
        $indexes = $this->collection->listIndexes();
        $indexExists = false;
        
        foreach ($indexes as $index) {
            if (isset($index['key'][$field]) && $index['key'][$field] === $direction) {
                $indexExists = true;
                break;
            }
        }
        
        if (!$indexExists) {
            $this->collection->createIndex([$field => $direction]);
        }
    }
}

分页性能问题

大skip值导致的性能问题:

php
class PaginationPerformanceHandler {
    private $collection;
    
    public function __construct($collection) {
        $this->collection = $collection;
    }
    
    public function optimizedPagination($page, $pageSize, $filter = [], $sort = []) {
        $skip = ($page - 1) * $pageSize;
        
        if ($skip > 10000) {
            return $this->useCursorPagination($page, $pageSize, $filter, $sort);
        }
        
        return $this->traditionalPagination($page, $pageSize, $filter, $sort);
    }
    
    private function traditionalPagination($page, $pageSize, $filter, $sort) {
        $skip = ($page - 1) * $pageSize;
        
        $options = [
            'skip' => $skip,
            'limit' => $pageSize,
            'sort' => $sort
        ];
        
        $documents = $this->collection->find($filter, $options);
        $totalDocuments = $this->collection->countDocuments($filter);
        
        return [
            'documents' => iterator_to_array($documents),
            'pagination' => [
                'current_page' => $page,
                'total_pages' => ceil($totalDocuments / $pageSize),
                'total_documents' => $totalDocuments
            ]
        ];
    }
    
    private function useCursorPagination($page, $pageSize, $filter, $sort) {
        $sortField = !empty($sort) ? array_key_first($sort) : '_id';
        $sortDirection = $sort[$sortField] ?? 1;
        
        $lastValue = $this->getCursorValue($page, $pageSize, $sortField, $sortDirection, $filter);
        
        $cursorFilter = $filter;
        if ($lastValue !== null) {
            if ($sortDirection === 1) {
                $cursorFilter[$sortField] = ['$gt' => $lastValue];
            } else {
                $cursorFilter[$sortField] = ['$lt' => $lastValue];
            }
        }
        
        $options = [
            'limit' => $pageSize,
            'sort' => $sort ?: [$sortField => $sortDirection]
        ];
        
        $documents = $this->collection->find($cursorFilter, $options);
        
        return [
            'documents' => iterator_to_array($documents),
            'pagination' => [
                'current_page' => $page,
                'method' => 'cursor_based'
            ]
        ];
    }
    
    private function getCursorValue($page, $pageSize, $sortField, $sortDirection, $filter) {
        $totalSkip = ($page - 1) * $pageSize;
        
        $options = [
            'skip' => $totalSkip,
            'limit' => 1,
            'sort' => [$sortField => $sortDirection],
            'projection' => [$sortField => 1]
        ];
        
        $document = $this->collection->findOne($filter, $options);
        
        return $document ? $document[$sortField] : null;
    }
    
    public function detectPaginationPerformanceIssue($page, $pageSize) {
        $skip = ($page - 1) * $pageSize;
        
        $command = new MongoDB\Driver\Command([
            'explain' => $this->collection->getNamespace() . '.find',
            'filter' => [],
            'skip' => $skip,
            'limit' => $pageSize,
            'verbosity' => 'executionStats'
        ]);
        
        $cursor = $this->collection->getManager()->executeCommand(
            $this->collection->getDatabaseName(),
            $command
        );
        
        $result = $cursor->toArray()[0];
        $executionStats = $result['executionStats'];
        
        $issue = [
            'has_performance_issue' => false,
            'severity' => 'none',
            'recommendation' => null
        ];
        
        if ($skip > 10000) {
            $issue['has_performance_issue'] = true;
            $issue['severity'] = 'high';
            $issue['recommendation'] = 'Use cursor-based pagination for large skip values';
        }
        
        if ($executionStats['executionTimeMillis'] > 1000) {
            $issue['has_performance_issue'] = true;
            $issue['severity'] = 'medium';
            $issue['recommendation'] = 'Consider using indexed fields for pagination';
        }
        
        return $issue;
    }
}

常见应用场景

电商商品列表分页

实现电商网站的商品列表分页:

php
class EcommerceProductPagination {
    private $collection;
    
    public function __construct($collection) {
        $this->collection = $collection;
    }
    
    public function getProductList($params) {
        $filter = $this->buildProductFilter($params);
        $sort = $this->buildSortOrder($params);
        
        $pagination = $this->getPaginationMethod($params);
        
        switch ($pagination['method']) {
            case 'cursor':
                return $this->cursorBasedPagination($filter, $sort, $pagination);
            case 'range':
                return $this->rangeBasedPagination($filter, $sort, $pagination);
            default:
                return $this->traditionalPagination($filter, $sort, $pagination);
        }
    }
    
    private function buildProductFilter($params) {
        $filter = [];
        
        if (!empty($params['category'])) {
            $filter['category'] = $params['category'];
        }
        
        if (!empty($params['price_range'])) {
            $filter['price'] = [
                '$gte' => $params['price_range']['min'],
                '$lte' => $params['price_range']['max']
            ];
        }
        
        if (!empty($params['brands'])) {
            $filter['brand'] = ['$in' => $params['brands']];
        }
        
        if (isset($params['in_stock']) && $params['in_stock']) {
            $filter['stock'] = ['$gt' => 0];
        }
        
        return $filter;
    }
    
    private function buildSortOrder($params) {
        $sortField = $params['sort_by'] ?? 'created_at';
        $sortDirection = $params['sort_order'] ?? 'desc';
        
        $direction = $sortDirection === 'asc' ? 1 : -1;
        
        $validFields = ['price', 'created_at', 'rating', 'sales', 'name'];
        if (in_array($sortField, $validFields)) {
            return [$sortField => $direction];
        }
        
        return ['created_at' => -1];
    }
    
    private function getPaginationMethod($params) {
        $method = $params['pagination_method'] ?? 'traditional';
        $page = $params['page'] ?? 1;
        $pageSize = $params['page_size'] ?? 20;
        
        return [
            'method' => $method,
            'page' => $page,
            'page_size' => min($pageSize, 100),
            'cursor' => $params['cursor'] ?? null,
            'last_value' => $params['last_value'] ?? null
        ];
    }
    
    private function traditionalPagination($filter, $sort, $pagination) {
        $skip = ($pagination['page'] - 1) * $pagination['page_size'];
        
        $options = [
            'skip' => $skip,
            'limit' => $pagination['page_size'],
            'sort' => $sort
        ];
        
        $documents = $this->collection->find($filter, $options);
        $totalDocuments = $this->collection->countDocuments($filter);
        
        return [
            'products' => iterator_to_array($documents),
            'pagination' => [
                'current_page' => $pagination['page'],
                'page_size' => $pagination['page_size'],
                'total_documents' => $totalDocuments,
                'total_pages' => ceil($totalDocuments / $pagination['page_size']),
                'has_next' => $pagination['page'] < ceil($totalDocuments / $pagination['page_size']),
                'has_prev' => $pagination['page'] > 1
            ]
        ];
    }
    
    private function cursorBasedPagination($filter, $sort, $pagination) {
        $sortField = array_key_first($sort);
        $sortDirection = $sort[$sortField];
        
        if ($pagination['cursor']) {
            if ($sortDirection === 1) {
                $filter[$sortField] = ['$gt' => $pagination['cursor']];
            } else {
                $filter[$sortField] = ['$lt' => $pagination['cursor']];
            }
        }
        
        $options = [
            'limit' => $pagination['page_size'] + 1,
            'sort' => $sort
        ];
        
        $documents = $this->collection->find($filter, $options);
        $documentArray = iterator_to_array($documents);
        
        $hasMore = count($documentArray) > $pagination['page_size'];
        if ($hasMore) {
            array_pop($documentArray);
        }
        
        $nextCursor = !empty($documentArray) ? end($documentArray)[$sortField] : null;
        
        return [
            'products' => $documentArray,
            'pagination' => [
                'next_cursor' => $nextCursor,
                'has_more' => $hasMore,
                'page_size' => $pagination['page_size']
            ]
        ];
    }
    
    private function rangeBasedPagination($filter, $sort, $pagination) {
        $sortField = array_key_first($sort);
        $sortDirection = $sort[$sortField];
        
        if ($pagination['last_value'] !== null) {
            if ($sortDirection === 1) {
                $filter[$sortField] = ['$gt' => $pagination['last_value']];
            } else {
                $filter[$sortField] = ['$lt' => $pagination['last_value']];
            }
        }
        
        $options = [
            'limit' => $pagination['page_size'] + 1,
            'sort' => $sort
        ];
        
        $documents = $this->collection->find($filter, $options);
        $documentArray = iterator_to_array($documents);
        
        $hasMore = count($documentArray) > $pagination['page_size'];
        if ($hasMore) {
            array_pop($documentArray);
        }
        
        $nextValue = !empty($documentArray) ? end($documentArray)[$sortField] : null;
        
        return [
            'products' => $documentArray,
            'pagination' => [
                'next_value' => $nextValue,
                'has_more' => $hasMore,
                'page_size' => $pagination['page_size']
            ]
        ];
    }
}

社交媒体动态排序

实现社交媒体动态的时间线排序:

php
class SocialMediaTimeline {
    private $collection;
    
    public function __construct($collection) {
        $this->collection = $collection;
    }
    
    public function getTimeline($userId, $params) {
        $following = $this->getUserFollowing($userId);
        
        $filter = [
            'user_id' => ['$in' => $following],
            'visibility' => 'public'
        ];
        
        $sort = $this->buildTimelineSort($params);
        
        return $this->getPaginatedTimeline($filter, $sort, $params);
    }
    
    private function getUserFollowing($userId) {
        $user = $this->collection->getDatabase()
            ->selectCollection('users')
            ->findOne(['_id' => $userId]);
        
        return $user['following'] ?? [];
    }
    
    private function buildTimelineSort($params) {
        $sortBy = $params['sort_by'] ?? 'created_at';
        $sortOrder = $params['sort_order'] ?? 'desc';
        
        $direction = $sortOrder === 'asc' ? 1 : -1;
        
        $sortOptions = [
            'created_at' => ['created_at' => $direction],
            'popularity' => ['likes_count' => -1, 'comments_count' => -1],
            'engagement' => ['engagement_score' => -1, 'created_at' => -1]
        ];
        
        return $sortOptions[$sortBy] ?? $sortOptions['created_at'];
    }
    
    private function getPaginatedTimeline($filter, $sort, $params) {
        $method = $params['pagination_method'] ?? 'cursor';
        $pageSize = min($params['page_size'] ?? 20, 50);
        
        if ($method === 'cursor') {
            return $this->cursorTimeline($filter, $sort, $pageSize, $params);
        }
        
        return $this->traditionalTimeline($filter, $sort, $params);
    }
    
    private function cursorTimeline($filter, $sort, $pageSize, $params) {
        $sortField = array_key_first($sort);
        $sortDirection = $sort[$sortField];
        
        if ($params['cursor']) {
            if ($sortDirection === 1) {
                $filter[$sortField] = ['$gt' => $params['cursor']];
            } else {
                $filter[$sortField] = ['$lt' => $params['cursor']];
            }
        }
        
        $options = [
            'limit' => $pageSize + 1,
            'sort' => $sort
        ];
        
        $posts = $this->collection->find($filter, $options);
        $postArray = iterator_to_array($posts);
        
        $hasMore = count($postArray) > $pageSize;
        if ($hasMore) {
            array_pop($postArray);
        }
        
        $nextCursor = !empty($postArray) ? end($postArray)[$sortField] : null;
        
        return [
            'posts' => $this->enrichPosts($postArray),
            'pagination' => [
                'next_cursor' => $nextCursor,
                'has_more' => $hasMore
            ]
        ];
    }
    
    private function traditionalTimeline($filter, $sort, $params) {
        $page = $params['page'] ?? 1;
        $pageSize = min($params['page_size'] ?? 20, 50);
        $skip = ($page - 1) * $pageSize;
        
        $options = [
            'skip' => $skip,
            'limit' => $pageSize,
            'sort' => $sort
        ];
        
        $posts = $this->collection->find($filter, $options);
        $totalPosts = $this->collection->countDocuments($filter);
        
        return [
            'posts' => $this->enrichPosts(iterator_to_array($posts)),
            'pagination' => {
                'current_page' => $page,
                'total_pages' => ceil($totalPosts / $pageSize),
                'total_posts' => $totalPosts
            }
        ];
    }
    
    private function enrichPosts($posts) {
        foreach ($posts as &$post) {
            $post['user'] = $this->getUserInfo($post['user_id']);
            $post['is_liked'] = $this->checkIfLiked($post['_id'], $post['current_user_id']);
        }
        
        return $posts;
    }
    
    private function getUserInfo($userId) {
        return $this->collection->getDatabase()
            ->selectCollection('users')
            ->findOne(['_id' => $userId], ['projection' => ['name' => 1, 'avatar' => 1]]);
    }
    
    private function checkIfLiked($postId, $userId) {
        return $this->collection->getDatabase()
            ->selectCollection('likes')
            ->countDocuments(['post_id' => $postId, 'user_id' => $userId]) > 0;
    }
}

企业级进阶应用场景

大数据集高效分页

处理百万级数据集的高效分页:

php
class LargeDatasetPagination {
    private $collection;
    
    public function __construct($collection) {
        $this->collection = $collection;
    }
    
    public function getPaginatedData($params) {
        $strategy = $this->determinePaginationStrategy($params);
        
        switch ($strategy) {
            case 'seek':
                return $this->seekMethodPagination($params);
            case 'keyset':
                return $this->keysetPagination($params);
            case 'materialized':
                return $this->materializedViewPagination($params);
            default:
                return $this->optimizedTraditionalPagination($params);
        }
    }
    
    private function determinePaginationStrategy($params) {
        $totalDocuments = $this->collection->countDocuments($params['filter'] ?? []);
        $page = $params['page'] ?? 1;
        $skip = ($page - 1) * ($params['page_size'] ?? 20);
        
        if ($totalDocuments > 1000000 && $skip > 10000) {
            if ($this->hasSequentialKey($params)) {
                return 'keyset';
            }
            return 'seek';
        }
        
        if ($totalDocuments > 10000000) {
            return 'materialized';
        }
        
        return 'traditional';
    }
    
    private function hasSequentialKey($params) {
        $sort = $params['sort'] ?? [];
        $sortField = array_key_first($sort);
        
        return in_array($sortField, ['_id', 'created_at', 'updated_at']);
    }
    
    private function seekMethodPagination($params) {
        $filter = $params['filter'] ?? [];
        $sort = $params['sort'] ?? ['_id' => 1];
        $pageSize = $params['page_size'] ?? 100;
        $seekValue = $params['seek_value'] ?? null;
        
        $sortField = array_key_first($sort);
        $sortDirection = $sort[$sortField];
        
        if ($seekValue !== null) {
            if ($sortDirection === 1) {
                $filter[$sortField] = ['$gt' => $seekValue];
            } else {
                $filter[$sortField] = ['$lt' => $seekValue];
            }
        }
        
        $options = [
            'limit' => $pageSize + 1,
            'sort' => $sort
        ];
        
        $documents = $this->collection->find($filter, $options);
        $documentArray = iterator_to_array($documents);
        
        $hasMore = count($documentArray) > $pageSize;
        if ($hasMore) {
            array_pop($documentArray);
        }
        
        $nextSeekValue = !empty($documentArray) ? end($documentArray)[$sortField] : null;
        
        return [
            'data' => $documentArray,
            'pagination' => [
                'next_seek_value' => $nextSeekValue,
                'has_more' => $hasMore,
                'page_size' => $pageSize
            ]
        ];
    }
    
    private function keysetPagination($params) {
        $filter = $params['filter'] ?? [];
        $sort = $params['sort'] ?? ['_id' => 1];
        $pageSize = $params['page_size'] ?? 100;
        $lastKey = $params['last_key'] ?? null;
        
        $sortField = array_key_first($sort);
        $sortDirection = $sort[$sortField];
        
        if ($lastKey !== null) {
            $filter[$sortField] = $sortDirection === 1 
                ? ['$gt' => $lastKey] 
                : ['$lt' => $lastKey];
        }
        
        $options = [
            'limit' => $pageSize + 1,
            'sort' => $sort
        ];
        
        $documents = $this->collection->find($filter, $options);
        $documentArray = iterator_to_array($documents);
        
        $hasMore = count($documentArray) > $pageSize;
        if ($hasMore) {
            array_pop($documentArray);
        }
        
        $nextKey = !empty($documentArray) ? end($documentArray)[$sortField] : null;
        
        return [
            'data' => $documentArray,
            'pagination' => [
                'next_key' => $nextKey,
                'has_more' => $hasMore,
                'page_size' => $pageSize
            ]
        ];
    }
    
    private function materializedViewPagination($params) {
        $viewName = $this->getOrCreateMaterializedView($params);
        $view = $this->collection->getDatabase()->selectCollection($viewName);
        
        $page = $params['page'] ?? 1;
        $pageSize = $params['page_size'] ?? 100;
        $skip = ($page - 1) * $pageSize;
        
        $options = [
            'skip' => $skip,
            'limit' => $pageSize,
            'sort' => $params['sort'] ?? ['_id' => 1]
        ];
        
        $documents = $view->find([], $options);
        $totalDocuments = $view->countDocuments([]);
        
        return [
            'data' => iterator_to_array($documents),
            'pagination' => [
                'current_page' => $page,
                'total_pages' => ceil($totalDocuments / $pageSize),
                'total_documents' => $totalDocuments
            ]
        ];
    }
    
    private function getOrCreateMaterializedView($params) {
        $viewName = 'materialized_view_' . md5(json_encode($params['filter'] ?? []));
        
        $views = $this->collection->getDatabase()->listCollections();
        $viewExists = false;
        
        foreach ($views as $view) {
            if ($view->getName() === $viewName) {
                $viewExists = true;
                break;
            }
        }
        
        if (!$viewExists) {
            $this->createMaterializedView($viewName, $params);
        }
        
        return $viewName;
    }
    
    private function createMaterializedView($viewName, $params) {
        $pipeline = [
            ['$match' => $params['filter'] ?? []],
            ['$sort' => $params['sort'] ?? ['_id' => 1]],
            ['$out' => $viewName]
        ];
        
        $this->collection->aggregate($pipeline);
        
        $this->collection->getDatabase()->selectCollection($viewName)
            ->createIndex(['_id' => 1]);
    }
    
    private function optimizedTraditionalPagination($params) {
        $filter = $params['filter'] ?? [];
        $sort = $params['sort'] ?? ['_id' => 1];
        $page = $params['page'] ?? 1;
        $pageSize = $params['page_size'] ?? 100;
        $skip = ($page - 1) * $pageSize;
        
        $options = [
            'skip' => $skip,
            'limit' => $pageSize,
            'sort' => $sort
        ];
        
        $documents = $this->collection->find($filter, $options);
        $totalDocuments = $this->collection->countDocuments($filter);
        
        return [
            'data' => iterator_to_array($documents),
            'pagination' => [
                'current_page' => $page,
                'total_pages' => ceil($totalDocuments / $pageSize),
                'total_documents' => $totalDocuments
            ]
        ];
    }
}

实时数据流排序

处理实时数据流的动态排序:

php
class RealTimeDataSorting {
    private $collection;
    private $cache;
    
    public function __construct($collection, $cache) {
        $this->collection = $collection;
        $this->cache = $cache;
    }
    
    public function getSortedRealTimeData($params) {
        $cacheKey = $this->generateCacheKey($params);
        
        $cachedData = $this->cache->get($cacheKey);
        if ($cachedData) {
            return $this->mergeWithNewData($cachedData, $params);
        }
        
        return $this->fetchAndCacheData($cacheKey, $params);
    }
    
    private function generateCacheKey($params) {
        $keyParts = [
            'realtime_sort',
            json_encode($params['filter'] ?? []),
            json_encode($params['sort'] ?? []),
            $params['time_window'] ?? '5m'
        ];
        
        return md5(implode(':', $keyParts));
    }
    
    private function fetchAndCacheData($cacheKey, $params) {
        $timeWindow = $this->parseTimeWindow($params['time_window'] ?? '5m');
        $filter = array_merge($params['filter'] ?? [], [
            'created_at' => ['$gte' => $timeWindow['start']]
        ]);
        
        $sort = $params['sort'] ?? ['created_at' => -1];
        $limit = $params['limit'] ?? 100;
        
        $options = [
            'sort' => $sort,
            'limit' => $limit
        ];
        
        $documents = $this->collection->find($filter, $options);
        $data = iterator_to_array($documents);
        
        $this->cache->set($cacheKey, $data, 60);
        
        return [
            'data' => $data,
            'cache_status' => 'fresh',
            'time_window' => $timeWindow
        ];
    }
    
    private function mergeWithNewData($cachedData, $params) {
        $timeWindow = $this->parseTimeWindow($params['time_window'] ?? '5m');
        $newDataFilter = array_merge($params['filter'] ?? [], [
            'created_at' => ['$gt' => $cachedData[0]['created_at']]
        ]);
        
        $sort = $params['sort'] ?? ['created_at' => -1];
        $limit = $params['limit'] ?? 100;
        
        $newDocuments = $this->collection->find($newDataFilter, ['sort' => $sort]);
        $newData = iterator_to_array($newDocuments);
        
        $mergedData = array_merge($newData, $cachedData);
        $mergedData = $this->sortData($mergedData, $sort);
        $mergedData = array_slice($mergedData, 0, $limit);
        
        return [
            'data' => $mergedData,
            'cache_status' => 'merged',
            'new_records' => count($newData)
        ];
    }
    
    private function parseTimeWindow($window) {
        $now = new DateTime();
        
        if (preg_match('/(\d+)(m|h|d)/', $window, $matches)) {
            $value = (int)$matches[1];
            $unit = $matches[2];
            
            $interval = new DateInterval("PT{$value}" . strtoupper($unit));
            $start = clone $now;
            $start->sub($interval);
            
            return [
                'start' => $start,
                'end' => $now,
                'window' => $window
            ];
        }
        
        $start = clone $now;
        $start->sub(new DateInterval('PT5M'));
        
        return [
            'start' => $start,
            'end' => $now,
            'window' => '5m'
        ];
    }
    
    private function sortData($data, $sort) {
        $sortField = array_key_first($sort);
        $sortDirection = $sort[$sortField];
        
        usort($data, function($a, $b) use ($sortField, $sortDirection) {
            $aValue = $a[$sortField];
            $bValue = $b[$sortField];
            
            if ($aValue == $bValue) {
                return 0;
            }
            
            return ($sortDirection === 1 ? $aValue < $bValue : $aValue > $bValue) ? -1 : 1;
        });
        
        return $data;
    }
    
    public function invalidateCache($params) {
        $cacheKey = $this->generateCacheKey($params);
        $this->cache->delete($cacheKey);
    }
}

行业最佳实践

分页策略选择指南

根据不同场景选择合适的分页策略:

php
class PaginationStrategyGuide {
    public function recommendStrategy($scenario) {
        $recommendations = [];
        
        $dataSize = $scenario['total_documents'] ?? 0;
        $accessPattern = $scenario['access_pattern'] ?? 'sequential';
        $userExperience = $scenario['user_experience'] ?? 'standard';
        
        if ($dataSize < 10000) {
            $recommendations[] = [
                'strategy' => 'traditional',
                'reason' => 'Small dataset, traditional pagination is sufficient',
                'implementation' => 'skip/limit with total count',
                'pros' => ['Simple to implement', 'Direct page access'],
                'cons' => ['Performance degrades with large skip values']
            ];
        }
        
        if ($dataSize >= 10000 && $dataSize < 1000000) {
            if ($accessPattern === 'sequential') {
                $recommendations[] = [
                    'strategy' => 'cursor_based',
                    'reason' => 'Medium dataset with sequential access',
                    'implementation' => 'Use last document value as cursor',
                    'pros' => ['Consistent performance', 'No skip overhead'],
                    'cons' => ['Cannot jump to arbitrary pages']
                ];
            } else {
                $recommendations[] = [
                    'strategy' => 'hybrid',
                    'reason' => 'Medium dataset with random access',
                    'implementation' => 'Traditional for early pages, cursor for deep pages',
                    'pros' => ['Balanced performance and functionality'],
                    'cons' => ['More complex implementation']
                ];
            }
        }
        
        if ($dataSize >= 1000000) {
            $recommendations[] = [
                'strategy' => 'keyset',
                'reason' => 'Large dataset requires efficient pagination',
                'implementation' => 'Use indexed sequential field as keyset',
                'pros' => ['Excellent performance', 'Scalable'],
                'cons' => ['Requires indexed sequential field']
            ];
        }
        
        if ($userExperience === 'infinite_scroll') {
            $recommendations[] = [
                'strategy' => 'cursor_based',
                'reason' => 'Infinite scroll only needs forward navigation',
                'implementation' => 'Load more data using cursor',
                'pros' => ['Perfect for infinite scroll', 'Efficient'],
                'cons' => ['No page numbers']
            ];
        }
        
        return $recommendations;
    }
    
    public function implementPaginationStrategy($strategy, $params) {
        switch ($strategy) {
            case 'traditional':
                return $this->implementTraditionalPagination($params);
            case 'cursor_based':
                return $this->implementCursorPagination($params);
            case 'keyset':
                return $this->implementKeysetPagination($params);
            case 'hybrid':
                return $this->implementHybridPagination($params);
            default:
                throw new InvalidArgumentException("Unknown pagination strategy: $strategy");
        }
    }
    
    private function implementTraditionalPagination($params) {
        return [
            'method' => 'skip_limit',
            'implementation' => 'Use skip() and limit() methods',
            'example' => [
                'skip' => ($params['page'] - 1) * $params['page_size'],
                'limit' => $params['page_size']
            ]
        ];
    }
    
    private function implementCursorPagination($params) {
        return [
            'method' => 'cursor_based',
            'implementation' => 'Use last document value as cursor',
            'example' => [
                'filter' => ['_id' => ['$gt' => $params['cursor']]],
                'limit' => $params['page_size']
            ]
        ];
    }
    
    private function implementKeysetPagination($params) {
        return [
            'method' => 'keyset',
            'implementation' => 'Use indexed sequential field',
            'example' => [
                'filter' => [$params['keyset_field'] => ['$gt' => $params['last_value']]],
                'sort' => [$params['keyset_field'] => 1],
                'limit' => $params['page_size']
            ]
        ];
    }
    
    private function implementHybridPagination($params) {
        $threshold = 100;
        $skip = ($params['page'] - 1) * $params['page_size'];
        
        if ($skip < $threshold) {
            return $this->implementTraditionalPagination($params);
        }
        
        return $this->implementCursorPagination($params);
    }
}

排序性能优化

优化排序操作的性能:

php
class SortPerformanceOptimizer {
    private $collection;
    
    public function __construct($collection) {
        $this->collection = $collection;
    }
    
    public function optimizeSortOperation($filter, $sort, $options = []) {
        $optimizations = $this->identifyOptimizations($filter, $sort);
        
        foreach ($optimizations as $optimization) {
            $this->applyOptimization($optimization);
        }
        
        return $this->collection->find($filter, array_merge($options, ['sort' => $sort]));
    }
    
    private function identifyOptimizations($filter, $sort) {
        $optimizations = [];
        
        $indexAnalysis = $this->analyzeIndexUsage($filter, $sort);
        if (!$indexAnalysis['optimal']) {
            $optimizations[] = [
                'type' => 'create_index',
                'priority' => 'high',
                'details' => $indexAnalysis['recommendation']
            ];
        }
        
        $memoryAnalysis = $this->analyzeMemoryUsage($filter, $sort);
        if ($memoryAnalysis['exceeds_limit']) {
            $optimizations[] = [
                'type' => 'reduce_result_set',
                'priority' => 'high',
                'details' => $memoryAnalysis
            ];
        }
        
        return $optimizations;
    }
    
    private function applyOptimization($optimization) {
        switch ($optimization['type']) {
            case 'create_index':
                $this->createOptimalIndex($optimization['details']);
                break;
            case 'reduce_result_set':
                $this->reduceResultSize($optimization['details']);
                break;
        }
    }
    
    private function analyzeIndexUsage($filter, $sort) {
        $command = new MongoDB\Driver\Command([
            'explain' => $this->collection->getNamespace() . '.find',
            'filter' => $filter,
            'sort' => $sort,
            'verbosity' => 'queryPlanner'
        ]);
        
        $cursor = $this->collection->getManager()->executeCommand(
            $this->collection->getDatabaseName(),
            $command
        );
        
        $result = $cursor->toArray()[0];
        $winningPlan = $result['queryPlanner']['winningPlan'];
        
        $analysis = [
            'optimal' => true,
            'current_index' => null,
            'recommendation' => null
        ];
        
        if (isset($winningPlan['inputStage']['stage']) && 
            $winningPlan['inputStage']['stage'] === 'SORT') {
            $analysis['optimal'] = false;
            $analysis['recommendation'] = [
                'sort_fields' => array_keys($sort),
                'filter_fields' => array_keys($filter),
                'suggested_index' => $this->buildSuggestedIndex($filter, $sort)
            ];
        }
        
        return $analysis;
    }
    
    private function buildSuggestedIndex($filter, $sort) {
        $indexFields = [];
        
        foreach (array_keys($filter) as $field) {
            if (!in_array($field, ['$and', '$or', '$not'])) {
                $indexFields[$field] = 1;
            }
        }
        
        foreach ($sort as $field => $direction) {
            $indexFields[$field] = $direction;
        }
        
        return $indexFields;
    }
    
    private function createOptimalIndex($recommendation) {
        $indexFields = $recommendation['suggested_index'];
        $indexName = 'optimal_sort_' . md5(json_encode($indexFields));
        
        try {
            $this->collection->createIndex($indexFields, ['name' => $indexName]);
        } catch (MongoDB\Driver\Exception\Exception $e) {
            error_log("Failed to create index: " . $e->getMessage());
        }
    }
    
    private function analyzeMemoryUsage($filter, $sort) {
        $command = new MongoDB\Driver\Command([
            'explain' => $this->collection->getNamespace() . '.find',
            'filter' => $filter,
            'sort' => $sort,
            'verbosity' => 'executionStats'
        ]);
        
        $cursor = $this->collection->getManager()->executeCommand(
            $this->collection->getDatabaseName(),
            $command
        );
        
        $result = $cursor->toArray()[0];
        $sortStats = $result['executionStats']['sortStats'] ?? [];
        
        $memoryUsed = $sortStats['totalDataSizeBytes'] ?? 0;
        $memoryLimit = 32 * 1024 * 1024;
        
        return [
            'memory_used_bytes' => $memoryUsed,
            'memory_limit_bytes' => $memoryLimit,
            'exceeds_limit' => $memoryUsed > $memoryLimit,
            'usage_percent' => ($memoryUsed / $memoryLimit) * 100
        ];
    }
    
    private function reduceResultSize($analysis) {
        $projection = [
            '_id' => 1,
            'created_at' => 1
        ];
        
        return $projection;
    }
}

常见问题答疑

Q1: 如何选择合适的分页策略?

选择分页策略的决策树:

php
class PaginationDecisionTree {
    public function decideStrategy($requirements) {
        $dataSize = $requirements['total_documents'] ?? 0;
        $accessPattern = $requirements['access_pattern'] ?? 'sequential';
        $userInterface = $requirements['ui_type'] ?? 'standard';
        $performance = $requirements['performance_requirement'] ?? 'standard';
        
        if ($dataSize < 1000) {
            return 'traditional';
        }
        
        if ($userInterface === 'infinite_scroll') {
            return 'cursor_based';
        }
        
        if ($accessPattern === 'random' && $dataSize < 10000) {
            return 'traditional';
        }
        
        if ($performance === 'high' && $dataSize > 10000) {
            return 'keyset';
        }
        
        if ($dataSize > 100000) {
            return 'keyset';
        }
        
        return 'cursor_based';
    }
}

Q2: 如何处理排序内存溢出问题?

解决排序内存溢出的方法:

php
class SortMemorySolution {
    public function solveMemoryOverflow($filter, $sort) {
        $solutions = [];
        
        $indexSolution = $this->createIndexSolution($sort);
        $solutions[] = $indexSolution;
        
        $limitSolution = $this->applyLimitSolution();
        $solutions[] = $limitSolution;
        
        $chunkSolution = $this->chunkProcessingSolution($filter, $sort);
        $solutions[] = $chunkSolution;
        
        return $solutions;
    }
    
    private function createIndexSolution($sort) {
        return [
            'solution' => 'Create supporting index',
            'implementation' => 'Create index on sort fields',
            'priority' => 'high',
            'example' => 'db.collection.createIndex({field: 1})'
        ];
    }
    
    private function applyLimitSolution() {
        return [
            'solution' => 'Apply early limit',
            'implementation' => 'Reduce result set before sorting',
            'priority' => 'medium',
            'example' => 'Apply more restrictive filter conditions'
        ];
    }
    
    private function chunkProcessingSolution($filter, $sort) {
        return [
            'solution' => 'Process in chunks',
            'implementation' => 'Break large sort into smaller chunks',
            'priority' => 'medium',
            'example' => 'Process data in batches of 10,000 documents'
        ];
    }
}

Q3: 如何优化分页查询性能?

优化分页查询性能的方法:

php
class PaginationPerformanceOptimizer {
    public function optimizePagination($query) {
        $optimizations = [];
        
        if ($query['skip'] > 10000) {
            $optimizations[] = [
                'issue' => 'Large skip value',
                'solution' => 'Use cursor-based pagination',
                'implementation' => 'Replace skip/limit with cursor approach'
            ];
        }
        
        if (!$query['has_sort_index']) {
            $optimizations[] = [
                'issue' => 'Missing sort index',
                'solution' => 'Create index on sort fields',
                'implementation' => 'db.collection.createIndex({sort_field: 1})'
            ];
        }
        
        if ($query['page_size'] > 100) {
            $optimizations[] = [
                'issue' => 'Large page size',
                'solution' => 'Reduce page size',
                'implementation' => 'Use smaller page sizes (20-50 items)'
            ];
        }
        
        return $optimizations;
    }
}

实战练习

练习1: 实现电商网站商品分页

实现完整的电商商品分页系统:

php
class EcommercePaginationSystem {
    private $collection;
    
    public function __construct($collection) {
        $this->collection = $collection;
    }
    
    public function getProducts($request) {
        $filter = $this->buildFilter($request);
        $sort = $this->buildSort($request);
        $pagination = $this->buildPagination($request);
        
        return $this->executePagination($filter, $sort, $pagination);
    }
    
    private function buildFilter($request) {
        $filter = [];
        
        if (!empty($request['category'])) {
            $filter['category'] = $request['category'];
        }
        
        if (!empty($request['price_min']) || !empty($request['price_max'])) {
            $priceFilter = [];
            if (!empty($request['price_min'])) {
                $priceFilter['$gte'] = (float)$request['price_min'];
            }
            if (!empty($request['price_max'])) {
                $priceFilter['$lte'] = (float)$request['price_max'];
            }
            $filter['price'] = $priceFilter;
        }
        
        if (!empty($request['brands'])) {
            $filter['brand'] = ['$in' => $request['brands']];
        }
        
        if (!empty($request['in_stock'])) {
            $filter['stock'] = ['$gt' => 0];
        }
        
        return $filter;
    }
    
    private function buildSort($request) {
        $sortBy = $request['sort_by'] ?? 'created_at';
        $sortOrder = $request['sort_order'] ?? 'desc';
        
        $validFields = ['price', 'created_at', 'rating', 'name'];
        if (in_array($sortBy, $validFields)) {
            return [$sortBy => $sortOrder === 'asc' ? 1 : -1];
        }
        
        return ['created_at' => -1];
    }
    
    private function buildPagination($request) {
        $method = $request['pagination_method'] ?? 'auto';
        $page = (int)($request['page'] ?? 1);
        $pageSize = min((int)($request['page_size'] ?? 20), 100);
        
        return [
            'method' => $method,
            'page' => $page,
            'page_size' => $pageSize,
            'cursor' => $request['cursor'] ?? null
        ];
    }
    
    private function executePagination($filter, $sort, $pagination) {
        if ($pagination['method'] === 'auto') {
            $pagination['method'] = $this->determineBestMethod($pagination);
        }
        
        switch ($pagination['method']) {
            case 'cursor':
                return $this->cursorPagination($filter, $sort, $pagination);
            default:
                return $this->traditionalPagination($filter, $sort, $pagination);
        }
    }
    
    private function determineBestMethod($pagination) {
        $skip = ($pagination['page'] - 1) * $pagination['page_size'];
        return $skip > 10000 ? 'cursor' : 'traditional';
    }
    
    private function traditionalPagination($filter, $sort, $pagination) {
        $skip = ($pagination['page'] - 1) * $pagination['page_size'];
        
        $options = [
            'skip' => $skip,
            'limit' => $pagination['page_size'],
            'sort' => $sort
        ];
        
        $products = $this->collection->find($filter, $options);
        $totalProducts = $this->collection->countDocuments($filter);
        
        return [
            'products' => iterator_to_array($products),
            'pagination' => [
                'current_page' => $pagination['page'],
                'page_size' => $pagination['page_size'],
                'total_products' => $totalProducts,
                'total_pages' => ceil($totalProducts / $pagination['page_size'])
            ]
        ];
    }
    
    private function cursorPagination($filter, $sort, $pagination) {
        $sortField = array_key_first($sort);
        $sortDirection = $sort[$sortField];
        
        if ($pagination['cursor']) {
            $filter[$sortField] = $sortDirection === 1 
                ? ['$gt' => $pagination['cursor']] 
                : ['$lt' => $pagination['cursor']];
        }
        
        $options = [
            'limit' => $pagination['page_size'] + 1,
            'sort' => $sort
        ];
        
        $products = $this->collection->find($filter, $options);
        $productArray = iterator_to_array($products);
        
        $hasMore = count($productArray) > $pagination['page_size'];
        if ($hasMore) {
            array_pop($productArray);
        }
        
        $nextCursor = !empty($productArray) ? end($productArray)[$sortField] : null;
        
        return [
            'products' => $productArray,
            'pagination' => [
                'next_cursor' => $nextCursor,
                'has_more' => $hasMore
            ]
        ];
    }
}

练习2: 实现社交媒体动态排序

实现社交媒体动态的时间线排序:

php
class SocialMediaSorter {
    private $collection;
    
    public function __construct($collection) {
        $this->collection = $collection;
    }
    
    public function getTimeline($userId, $params) {
        $following = $this->getFollowing($userId);
        $filter = ['user_id' => ['$in' => $following]];
        $sort = $this->buildSort($params);
        
        return $this->getSortedPosts($filter, $sort, $params);
    }
    
    private function getFollowing($userId) {
        $user = $this->collection->getDatabase()
            ->selectCollection('users')
            ->findOne(['_id' => $userId]);
        
        return $user['following'] ?? [];
    }
    
    private function buildSort($params) {
        $sortBy = $params['sort_by'] ?? 'created_at';
        
        $sortOptions = [
            'created_at' => ['created_at' => -1],
            'popularity' => ['likes_count' => -1, 'comments_count' => -1],
            'engagement' => ['engagement_score' => -1]
        ];
        
        return $sortOptions[$sortBy] ?? $sortOptions['created_at'];
    }
    
    private function getSortedPosts($filter, $sort, $params) {
        $pageSize = min($params['page_size'] ?? 20, 50);
        $cursor = $params['cursor'] ?? null;
        
        if ($cursor) {
            $sortField = array_key_first($sort);
            $sortDirection = $sort[$sortField];
            $filter[$sortField] = $sortDirection === 1 
                ? ['$gt' => $cursor] 
                : ['$lt' => $cursor];
        }
        
        $options = [
            'limit' => $pageSize + 1,
            'sort' => $sort
        ];
        
        $posts = $this->collection->find($filter, $options);
        $postArray = iterator_to_array($posts);
        
        $hasMore = count($postArray) > $pageSize;
        if ($hasMore) {
            array_pop($postArray);
        }
        
        $sortField = array_key_first($sort);
        $nextCursor = !empty($postArray) ? end($postArray)[$sortField] : null;
        
        return [
            'posts' => $this->enrichPosts($postArray),
            'pagination' => [
                'next_cursor' => $nextCursor,
                'has_more' => $hasMore
            ]
        ];
    }
    
    private function enrichPosts($posts) {
        foreach ($posts as &$post) {
            $post['user'] = $this->getUser($post['user_id']);
        }
        return $posts;
    }
    
    private function getUser($userId) {
        return $this->collection->getDatabase()
            ->selectCollection('users')
            ->findOne(['_id' => $userId], ['projection' => ['name' => 1, 'avatar' => 1]]);
    }
}

知识点总结

核心概念

  1. 排序操作

    • 单字段排序和多字段排序
    • 升序和降序排序
    • 文本分数排序
    • 排序索引优化
  2. 分页策略

    • 传统分页(skip/limit)
    • 游标分页(cursor-based)
    • 键集分页(keyset)
    • 混合分页策略
  3. 性能优化

    • 排序内存限制(32MB)
    • 索引对排序的影响
    • 大skip值的性能问题
    • 分页策略选择

最佳实践

  1. 排序优化

    • 为排序字段创建索引
    • 避免内存溢出
    • 使用投影减少数据传输
    • 监控排序性能
  2. 分页选择

    • 小数据集使用传统分页
    • 大数据集使用游标分页
    • 实时数据使用键集分页
    • 无限滚动使用游标分页
  3. 性能监控

    • 监控查询执行时间
    • 分析索引使用情况
    • 检测慢查询
    • 优化查询计划

拓展参考资料

  • MongoDB官方文档:排序操作
  • MongoDB官方文档:分页查询
  • PHP MongoDB驱动文档:查询选项
  • 数据库分页最佳实践
  • MongoDB索引优化指南