
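RabbitMQ and Elasticsearch Integration

Overview

Elasticsearch is a distributed search and analytics engine commonly used for log analysis, full-text search and metrics monitoring. Connecting it to RabbitMQ lets you index data in near real time, ship logs, and keep search indices in sync with your primary data store. Producers publish changes to RabbitMQ, and consumers write them to Elasticsearch at their own pace. This tutorial walks through both sides of that integration.

Integration Architecture

Architecture Diagram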

┌─────────────────────────────────────────────────────────────────────┐
│               Elasticsearch Integration Architecture                │
│                                                                     │
│  ┌─────────────────────────────────────────────────────────────┐    │
│  │                    Data Sources                             │    │
│  │   ┌─────────────┐    ┌─────────────┐    ┌─────────────┐     │    │
│  │   │  App Logs   │    │Business Data│    │   Metrics   │     │    │
│  │   └─────────────┘    └─────────────┘    └─────────────┘     │    │
│  └─────────────────────────────────────────────────────────────┘    │
│                                  │                                  │
│                                  ▼                                  │
│  ┌─────────────────────────────────────────────────────────────┐    │
│  │                    RabbitMQ                                 │    │
│  │   ┌─────────────┐    ┌─────────────┐    ┌─────────────┐     │    │
│  │   │  Log Queue  │    │ Index Queue │    │ Search Queue│     │    │
│  │   └─────────────┘    └─────────────┘    └─────────────┘     │    │
│  └─────────────────────────────────────────────────────────────┘    │
│                                  │                                  │
│                                  ▼                                  │
│  ┌─────────────────────────────────────────────────────────────┐    │
│  │                    Indexing Layer                           │    │
│  │   ┌─────────────┐    ┌─────────────┐    ┌─────────────┐     │    │
│  │   │Bulk Indexing│    │  Transform  │    │Error Handler│     │    │
│  │   └─────────────┘    └─────────────┘    └─────────────┘     │    │
│  └─────────────────────────────────────────────────────────────┘    │
│                                  │                                  │
│                                  ▼                                  │
│  ┌─────────────────────────────────────────────────────────────┐    │
│  │                    Elasticsearch Cluster                    │    │
│  │   ┌─────────────┐    ┌─────────────┐    ┌─────────────┐     │    │
│  │   │   Node 1    │    │   Node 2    │    │   Node 3    │     │    │
│  │   │  (Master)   │    │   (Data)    │    │   (Data)    │     │    │
│  │   └─────────────┘    └─────────────┘    └─────────────┘     │    │
│  └─────────────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────────────┘

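Integration Patterns

| Pattern | Description |
|---|---|
| Log Shipping | Ship application logs to Elasticsearch |
| Index Sync | Synchronize database data into Elasticsearch |
| Search Index | Keep the search index updated in real time |
| Metrics Collection | Collect and store monitoring metrics |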

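Configuration Examples

Elasticsearch PHP Client Configuration

The setup below uses the elasticsearch-php 7.x API (`Elasticsearch\ClientBuilder`, `setConnectionPool()`, `setSelector()`). The later examples treat responses as plain PHP arrays, which is what the 7.x client returns. The 8.x client differs in three ways:

- It lives in the `Elastic\Elasticsearch` namespace.
- It replaces the connection-pool setters with `setNodePool()`.
- It returns response objects (call `asArray()` to get an array).

These snippets therefore need adapting for the 8.x rows of the version table.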

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use Elasticsearch\ClientBuilder;

class ElasticsearchConfig
{
    public static function getClient()
    {
        $hosts = explode(',', getenv('ES_HOSTS') ?: 'localhost:9200');
        
        $client = ClientBuilder::create()
            ->setHosts($hosts)
            ->setRetries(3)
            ->setConnectionPool('\Elasticsearch\ConnectionPool\StaticConnectionPool', [])
            ->setSelector('\Elasticsearch\ConnectionPool\Selectors\RoundRobinSelector')
            ->build();
        
        return $client;
    }
}

RabbitMQ Connection Configuration

php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;

class RabbitMQConfig
{
    public static function getConnection(): AMQPStreamConnection
    {
        return new AMQPStreamConnection(
            getenv('RABBITMQ_HOST') ?: 'localhost',
            (int) (getenv('RABBITMQ_PORT') ?: 5672), // getenv() returns a string
            getenv('RABBITMQ_USER') ?: 'guest',
            getenv('RABBITMQ_PASSWORD') ?: 'guest',
            getenv('RABBITMQ_VHOST') ?: '/'
        );
    }
}

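PHP Code Examples

Log Indexer

The indexer consumes log messages from the `logs.indexing` queue. It writes them in batches, through the `_bulk` API, to daily `logs-YYYY.MM.DD` indices.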

php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;

class LogIndexer
{
    private $esClient;
    private $rabbitChannel;
    private $rabbitConnection;
    
    private const INDEX_PREFIX = 'logs-';
    private const BATCH_SIZE = 100;
    
    public function __construct($esClient, AMQPStreamConnection $rabbitConnection)
    {
        $this->esClient = $esClient;
        $this->rabbitConnection = $rabbitConnection;
        $this->rabbitChannel = $rabbitConnection->channel();
        $this->setupInfrastructure();
    }
    
    private function setupInfrastructure(): void
    {
        $this->rabbitChannel->exchange_declare(
            'logs.exchange',
            'topic',
            false,
            true,
            false
        );
        
        $this->rabbitChannel->queue_declare(
            'logs.indexing',
            false,
            true,
            false,
            false
        );
        
        $this->rabbitChannel->queue_bind(
            'logs.indexing',
            'logs.exchange',
            'log.#'
        );
        
        $this->createIndexTemplate();
    }
    
    private function createIndexTemplate(): void
    {
        $template = [
            'index_patterns' => [self::INDEX_PREFIX . '*'],
            'settings' => [
                'number_of_shards' => 3,
                'number_of_replicas' => 1,
                'refresh_interval' => '5s',
            ],
            'mappings' => [
                'properties' => [
                    '@timestamp' => ['type' => 'date'],
                    'level' => ['type' => 'keyword'],
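                    // The keyword sub-field makes message.keyword usable in terms aggregations
                    'message' => [
                        'type' => 'text',
                        'fields' => ['keyword' => ['type' => 'keyword', 'ignore_above' => 256]],
                    ],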
                    'logger' => ['type' => 'keyword'],
                    'context' => ['type' => 'object', 'enabled' => true],
                    'extra' => ['type' => 'object', 'enabled' => true],
                    'host' => ['type' => 'keyword'],
                    'service' => ['type' => 'keyword'],
                    'trace_id' => ['type' => 'keyword'],
                    'span_id' => ['type' => 'keyword'],
                ]
            ]
        ];
        
        try {
            $this->esClient->indices()->putTemplate([
                'name' => 'logs-template',
                'body' => $template
            ]);
        } catch (Exception $e) {
            error_log("Failed to create index template: " . $e->getMessage());
        }
    }
    
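    public function indexLogs(): void
    {
        $batch = [];          // alternating bulk action/source entries
        $lastMessage = null;  // newest message in the batch, acked with multiple=true
        $flushTimeout = 5;    // seconds of inactivity before a partial batch is flushed

        $callback = function (\PhpAmqpLib\Message\AMQPMessage $message) use (&$batch, &$lastMessage) {
            $log = json_decode($message->getBody(), true);

            if (!is_array($log)) {
                // Malformed JSON: reject without requeueing so it cannot loop forever
                $message->nack(false);
                return;
            }

            $indexName = self::INDEX_PREFIX . date('Y.m.d');

            $batch[] = [
                'index' => [
                    '_index' => $indexName,
                ]
            ];

            $batch[] = $this->formatLogEntry($log);
            $lastMessage = $message;

            // Each document takes two entries (action + source)
            if (count($batch) >= self::BATCH_SIZE * 2) {
                $this->flushAndAck($batch, $lastMessage);
            }
            // No ack here: messages are acknowledged only after the bulk write succeeds
        };

        // The prefetch count must be at least BATCH_SIZE, or the broker stops
        // delivering before a full batch can accumulate
        $this->rabbitChannel->basic_qos(0, self::BATCH_SIZE, false);
        $this->rabbitChannel->basic_consume(
            'logs.indexing',
            '',
            false,
            false,
            false,
            false,
            $callback
        );

        while ($this->rabbitChannel->is_consuming()) {
            try {
                $this->rabbitChannel->wait(null, false, $flushTimeout);
            } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) {
                // The queue went quiet: flush the partial batch
                $this->flushAndAck($batch, $lastMessage);
            }
        }

        $this->flushAndAck($batch, $lastMessage);
    }

    private function flushAndAck(array &$batch, ?\PhpAmqpLib\Message\AMQPMessage &$lastMessage): void
    {
        if ($lastMessage === null) {
            return;
        }

        if ($this->flushBatch($batch)) {
            $lastMessage->ack(true);        // acknowledge every delivery up to this one
        } else {
            $lastMessage->nack(true, true); // requeue the whole batch for another attempt
        }

        $batch = [];
        $lastMessage = null;
    }

    private function formatLogEntry(array $log): array
    {
        return [
            '@timestamp' => $log['timestamp'] ?? date('c'),
            'level' => strtoupper($log['level'] ?? 'INFO'),
            'message' => $log['message'] ?? '',
            'logger' => $log['logger'] ?? 'default',
            'context' => $log['context'] ?? [],
            'extra' => $log['extra'] ?? [],
            'host' => $log['host'] ?? gethostname(), // prefer the producer's host over the indexer's
            'service' => $log['service'] ?? 'unknown',
            'trace_id' => $log['trace_id'] ?? null,
            'span_id' => $log['span_id'] ?? null,
        ];
    }

    private function flushBatch(array $batch): bool
    {
        if (empty($batch)) {
            return true;
        }

        try {
            $response = $this->esClient->bulk([
                'body' => $batch,
                'refresh' => false,
            ]);

            // bulk() does not throw for per-document failures; they are logged and dropped here
            if ($response['errors']) {
                foreach ($response['items'] as $item) {
                    if (isset($item['index']['error'])) {
                        error_log("Indexing error: " . json_encode($item['index']['error']));
                    }
                }
            }

            return true;
        } catch (Exception $e) {
            // Request-level failure (e.g. cluster unreachable): let the caller requeue the batch
            error_log("Bulk indexing failed: " . $e->getMessage());
            return false;
        }
    }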
    
    public function close(): void
    {
        $this->rabbitChannel->close();
        $this->rabbitConnection->close();
    }
}

// Usage example
$esClient = ElasticsearchConfig::getClient();
$rabbit = RabbitMQConfig::getConnection();
$indexer = new LogIndexer($esClient, $rabbit);

$indexer->indexLogs();
$indexer->close();
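For the batching pattern to be safe, two things should hold:

- A message is acknowledged only after the document it carries has reached Elasticsearch.
- The daily index comes from the log's own timestamp, not from the consumer's clock.

The sketch below isolates that bookkeeping so it can be exercised without a broker or a cluster. `LogBatch` is a hypothetical helper; it is not part of php-amqplib or elasticsearch-php. The three fields it keeps are a trimmed-down version of `formatLogEntry()`.

```php
<?php

/**
 * Hypothetical helper: buffers logs as _bulk body entries and remembers the
 * newest delivery tag so one multiple-ack can settle the batch after a flush.
 */
final class LogBatch
{
    private array $body = [];      // alternating action/source entries
    private ?int $lastTag = null;  // delivery tag of the newest buffered message
    private string $indexPrefix;
    private int $maxDocs;

    public function __construct(string $indexPrefix = 'logs-', int $maxDocs = 100)
    {
        $this->indexPrefix = $indexPrefix;
        $this->maxDocs = $maxDocs;
    }

    /** Buffers one decoded log; returns true when the batch is full. */
    public function add(array $log, int $deliveryTag): bool
    {
        // Daily index from the log's own timestamp (UTC), falling back to "now"
        $ts = isset($log['timestamp']) ? strtotime($log['timestamp']) : false;
        $index = $this->indexPrefix . gmdate('Y.m.d', $ts === false ? time() : $ts);

        $this->body[] = ['index' => ['_index' => $index]];
        $this->body[] = [
            '@timestamp' => $log['timestamp'] ?? gmdate('c'),
            'level'      => strtoupper($log['level'] ?? 'INFO'),
            'message'    => $log['message'] ?? '',
        ];
        $this->lastTag = $deliveryTag;

        return $this->count() >= $this->maxDocs;
    }

    /** Number of buffered documents (two body entries each). */
    public function count(): int
    {
        return intdiv(count($this->body), 2);
    }

    /** Returns [bulk body, delivery tag to ack with multiple=true] and resets. */
    public function drain(): array
    {
        $drained = [$this->body, $this->lastTag];
        $this->body = [];
        $this->lastTag = null;
        return $drained;
    }
}
```

In a consumer callback, you would call `add($log, $message->getDeliveryTag())`. When it returns `true`, or when a `wait()` timeout fires, send the drained body with `$esClient->bulk(['body' => $body])`. If that succeeds, settle every buffered delivery at once with `$channel->basic_ack($tag, true)`.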

搜索索引同步器

php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;

class SearchIndexSync
{
    private $esClient;
    private $rabbitChannel;
    private $rabbitConnection;
    
    public function __construct($esClient, AMQPStreamConnection $rabbitConnection)
    {
        $this->esClient = $esClient;
        $this->rabbitConnection = $rabbitConnection;
        $this->rabbitChannel = $rabbitConnection->channel();
        $this->setupInfrastructure();
    }
    
    private function setupInfrastructure(): void
    {
        $this->rabbitChannel->exchange_declare(
            'search.sync',
            'direct',
            false,
            true,
            false
        );
        
        $this->rabbitChannel->queue_declare(
            'search.index.updates',
            false,
            true,
            false,
            false,
            false,
            new \PhpAmqpLib\Wire\AMQPTable([
                'x-dead-letter-exchange' => 'search.dlx',
                'x-dead-letter-routing-key' => 'failed',
            ])
        );
        
        $this->rabbitChannel->queue_bind(
            'search.index.updates',
            'search.sync',
            'index.update'
        );
        
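        // Dead-letter target for messages rejected with nack(requeue=false).
        // RabbitMQ silently drops dead-lettered messages if this exchange does not exist.
        // The queue name 'search.index.failed' is illustrative.
        $this->rabbitChannel->exchange_declare('search.dlx', 'direct', false, true, false);
        $this->rabbitChannel->queue_declare('search.index.failed', false, true, false, false);
        $this->rabbitChannel->queue_bind('search.index.failed', 'search.dlx', 'failed');

        $this->createProductIndex();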
    }
    
    private function createProductIndex(): void
    {
        $indexName = 'products';
        
        $mappings = [
            'properties' => [
                'id' => ['type' => 'keyword'],
                'name' => [
                    'type' => 'text',
                    'analyzer' => 'standard',
                    'fields' => [
                        'keyword' => ['type' => 'keyword'],
                        'suggest' => ['type' => 'completion'],
                    ]
                ],
                'description' => ['type' => 'text'],
                'price' => ['type' => 'float'],
                'category' => ['type' => 'keyword'],
                'brand' => ['type' => 'keyword'],
                'tags' => ['type' => 'keyword'],
                'attributes' => [
                    'type' => 'nested',
                    'properties' => [
                        'name' => ['type' => 'keyword'],
                        'value' => ['type' => 'keyword'],
                    ]
                ],
                'inventory' => [
                    'type' => 'object',
                    'properties' => [
                        'quantity' => ['type' => 'integer'],
                        'location' => ['type' => 'keyword'],
                    ]
                ],
                'status' => ['type' => 'keyword'],
                'created_at' => ['type' => 'date'],
                'updated_at' => ['type' => 'date'],
            ]
        ];
        
        try {
            if (!$this->esClient->indices()->exists(['index' => $indexName])) {
                $this->esClient->indices()->create([
                    'index' => $indexName,
                    'body' => [
                        'settings' => [
                            'number_of_shards' => 3,
                            'number_of_replicas' => 1,
                        ],
                        'mappings' => $mappings
                    ]
                ]);
            }
        } catch (Exception $e) {
            error_log("Failed to create index: " . $e->getMessage());
        }
    }
    
    public function consumeIndexUpdates(): void
    {
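        $callback = function ($message) {
            try {
                // JSON_THROW_ON_ERROR turns malformed payloads into a JsonException
                $data = json_decode($message->getBody(), true, 512, JSON_THROW_ON_ERROR);
                $operation = $data['operation'] ?? 'index';

                switch ($operation) {
                    case 'index':
                        $this->indexDocument($data);
                        break;
                    case 'update':
                        $this->updateDocument($data);
                        break;
                    case 'delete':
                        $this->deleteDocument($data);
                        break;
                    case 'bulk':
                        $this->bulkIndex($data['documents']);
                        break;
                    default:
                        throw new InvalidArgumentException("Unknown operation: " . $operation);
                }

                $message->ack();
            } catch (\Throwable $e) {
                // Throwable also catches TypeErrors from unexpected payload shapes.
                // nack without requeue routes the message to the dead-letter exchange.
                error_log("Index update failed: " . $e->getMessage());
                $message->nack(false, false);
            }
        };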
        
        $this->rabbitChannel->basic_qos(0, 10, false);
        $this->rabbitChannel->basic_consume(
            'search.index.updates',
            '',
            false,
            false,
            false,
            false,
            $callback
        );
        
        while ($this->rabbitChannel->is_consuming()) {
            $this->rabbitChannel->wait();
        }
    }
    
    private function indexDocument(array $data): void
    {
        $params = [
            'index' => $data['index'] ?? 'products',
            'id' => $data['id'],
            'body' => $data['document']
        ];
        
        $this->esClient->index($params);
    }
    
    private function updateDocument(array $data): void
    {
        $params = [
            'index' => $data['index'] ?? 'products',
            'id' => $data['id'],
            'body' => [
                'doc' => $data['updates'],
                'doc_as_upsert' => true
            ]
        ];
        
        $this->esClient->update($params);
    }
    
    private function deleteDocument(array $data): void
    {
        $params = [
            'index' => $data['index'] ?? 'products',
            'id' => $data['id']
        ];
        
        $this->esClient->delete($params);
    }
    
    private function bulkIndex(array $documents): void
    {
        $batch = [];
        
        foreach ($documents as $doc) {
            $batch[] = [
                'index' => [
                    '_index' => $doc['index'] ?? 'products',
                    '_id' => $doc['id']
                ]
            ];
            $batch[] = $doc['document'];
        }
        
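        $response = $this->esClient->bulk(['body' => $batch]);

        // bulk() reports per-document failures in the response instead of throwing
        if (!empty($response['errors'])) {
            throw new RuntimeException('Bulk request contained failed items');
        }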
    }
    
    public function close(): void
    {
        $this->rabbitChannel->close();
        $this->rabbitConnection->close();
    }
}

// Usage example
$esClient = ElasticsearchConfig::getClient();
$rabbit = RabbitMQConfig::getConnection();
$sync = new SearchIndexSync($esClient, $rabbit);

$sync->consumeIndexUpdates();
$sync->close();
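The `switch` in `consumeIndexUpdates()` is easier to test when the translation from a queue message to Elasticsearch request parameters is a pure function. The sketch below assumes the message shape that `SearchProducer` publishes: `operation`, `index`, `id`, and then `document`, `updates` or `documents`. `toEsRequest()` is a hypothetical name. Throwing on a missing `id` or an unknown operation lets the consumer `nack` the message to the dead-letter exchange, instead of acknowledging something it ignored.

```php
<?php

/**
 * Hypothetical helper: maps a decoded sync message to [client method, params].
 * Throws InvalidArgumentException for messages that should be dead-lettered.
 */
function toEsRequest(array $data, string $defaultIndex = 'products'): array
{
    $operation = $data['operation'] ?? 'index';
    $index = $data['index'] ?? $defaultIndex;

    if ($operation !== 'bulk' && empty($data['id'])) {
        throw new InvalidArgumentException("Missing 'id' for operation '$operation'");
    }

    switch ($operation) {
        case 'index':
            return ['index', ['index' => $index, 'id' => $data['id'], 'body' => $data['document'] ?? []]];
        case 'update':
            return ['update', ['index' => $index, 'id' => $data['id'],
                'body' => ['doc' => $data['updates'] ?? [], 'doc_as_upsert' => true]]];
        case 'delete':
            return ['delete', ['index' => $index, 'id' => $data['id']]];
        case 'bulk':
            $body = [];
            foreach ($data['documents'] ?? [] as $doc) {
                // One action line plus one source line per document
                $body[] = ['index' => ['_index' => $doc['index'] ?? $defaultIndex, '_id' => $doc['id']]];
                $body[] = $doc['document'];
            }
            return ['bulk', ['body' => $body]];
        default:
            throw new InvalidArgumentException("Unknown operation '$operation'");
    }
}
```

Inside its existing `try` block, the consumer would then call `[$method, $params] = toEsRequest($data); $this->esClient->$method($params);`.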

搜索生产者

php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;

class SearchProducer
{
    private $rabbitChannel;
    private $rabbitConnection;
    
    public function __construct(AMQPStreamConnection $rabbitConnection)
    {
        $this->rabbitConnection = $rabbitConnection;
        $this->rabbitChannel = $rabbitConnection->channel();
        $this->setupExchange();
    }
    
    private function setupExchange(): void
    {
        $this->rabbitChannel->exchange_declare(
            'search.sync',
            'direct',
            false,
            true,
            false
        );
    }
    
    public function indexProduct(array $product): bool
    {
        return $this->publish([
            'operation' => 'index',
            'index' => 'products',
            'id' => $product['id'],
            'document' => $product
        ]);
    }

    public function updateProduct(string $productId, array $updates): bool
    {
        return $this->publish([
            'operation' => 'update',
            'index' => 'products',
            'id' => $productId,
            'updates' => $updates
        ]);
    }

    public function deleteProduct(string $productId): bool
    {
        return $this->publish([
            'operation' => 'delete',
            'index' => 'products',
            'id' => $productId
        ]);
    }

    public function bulkIndexProducts(array $products): bool
    {
        $documents = array_map(function ($product) {
            return [
                'index' => 'products',
                'id' => $product['id'],
                'document' => $product
            ];
        }, $products);

        return $this->publish([
            'operation' => 'bulk',
            'documents' => $documents
        ]);
    }

    /**
     * Publishes one persistent JSON message to search.sync with routing key index.update.
     * Returning true only means the message was handed to the client library;
     * enable publisher confirms if you need the broker's acknowledgement.
     */
    private function publish(array $payload): bool
    {
        $message = new \PhpAmqpLib\Message\AMQPMessage(
            json_encode($payload, JSON_THROW_ON_ERROR),
            [
                'content_type' => 'application/json',
                'delivery_mode' => \PhpAmqpLib\Message\AMQPMessage::DELIVERY_MODE_PERSISTENT,
            ]
        );

        $this->rabbitChannel->basic_publish(
            $message,
            'search.sync',
            'index.update'
        );

        return true;
    }
    
    public function close(): void
    {
        $this->rabbitChannel->close();
        $this->rabbitConnection->close();
    }
}

// Usage example
$rabbit = RabbitMQConfig::getConnection();
$producer = new SearchProducer($rabbit);

$producer->indexProduct([
    'id' => 'PROD-001',
    'name' => 'iPhone 15 Pro',
    'description' => 'The latest iPhone',
    'price' => 999.99,
    'category' => 'electronics',
    'brand' => 'Apple',
    'tags' => ['phone', 'smartphone', 'apple'],
    'status' => 'active',
    'created_at' => date('c'),
    'updated_at' => date('c'),
]);

$producer->updateProduct('PROD-001', [
    'price' => 899.99,
    'status' => 'sale'
]);

$producer->close();
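`bulkIndexProducts()` puts every product into one message. A large catalog import therefore produces one very large message and, downstream, one very large `_bulk` request. The sketch below splits the payload before publishing. The chunk size of 500 documents is an illustrative value, not a RabbitMQ or Elasticsearch limit, and `buildBulkPayloads()` is hypothetical. Each returned string would be published as its own persistent message, with the same exchange and routing key as above.

```php
<?php

/**
 * Hypothetical helper: splits products into JSON "bulk" payloads of at most
 * $chunkSize documents, matching the message shape SearchProducer publishes.
 */
function buildBulkPayloads(array $products, int $chunkSize = 500, string $index = 'products'): array
{
    $payloads = [];
    foreach (array_chunk($products, $chunkSize) as $chunk) {
        $documents = array_map(
            fn (array $p) => ['index' => $index, 'id' => $p['id'], 'document' => $p],
            $chunk
        );
        // JSON_THROW_ON_ERROR surfaces encoding problems (e.g. invalid UTF-8) early
        $payloads[] = json_encode(['operation' => 'bulk', 'documents' => $documents], JSON_THROW_ON_ERROR);
    }
    return $payloads;
}
```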

搜索服务

php
<?php

class SearchService
{
    private $esClient;
    
    public function __construct($esClient)
    {
        $this->esClient = $esClient;
    }
    
    public function searchProducts(array $criteria): array
    {
        $query = $this->buildQuery($criteria);
        
        $params = [
            'index' => 'products',
            'body' => $query
        ];
        
        $response = $this->esClient->search($params);
        
        return $this->formatResults($response);
    }
    
    private function buildQuery(array $criteria): array
    {
        $must = [];
        $filter = [];
        
        if (!empty($criteria['query'])) {
            $must[] = [
                'multi_match' => [
                    'query' => $criteria['query'],
                    'fields' => ['name^2', 'description', 'brand'],
                    'type' => 'best_fields'
                ]
            ];
        }
        
        if (!empty($criteria['category'])) {
            $filter[] = ['term' => ['category' => $criteria['category']]];
        }
        
        if (!empty($criteria['brand'])) {
            $filter[] = ['term' => ['brand' => $criteria['brand']]];
        }
        
        if (!empty($criteria['tags'])) {
            $filter[] = ['terms' => ['tags' => (array)$criteria['tags']]];
        }
        
        if (isset($criteria['min_price']) || isset($criteria['max_price'])) {
            $range = [];
            if (isset($criteria['min_price'])) {
                $range['gte'] = $criteria['min_price'];
            }
            if (isset($criteria['max_price'])) {
                $range['lte'] = $criteria['max_price'];
            }
            $filter[] = ['range' => ['price' => $range]];
        }
        
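        if (empty($must) && empty($filter)) {
            // An empty PHP array would serialize as [] (not {}), which Elasticsearch rejects
            $query = ['query' => ['match_all' => new \stdClass()]];
        } else {
            $query = ['query' => ['bool' => []]];

            if (!empty($must)) {
                $query['query']['bool']['must'] = $must;
            }

            if (!empty($filter)) {
                $query['query']['bool']['filter'] = $filter;
            }
        }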
        
        if (!empty($criteria['sort'])) {
            $query['sort'] = $this->buildSort($criteria['sort']);
        }
        
        $query['from'] = $criteria['from'] ?? 0;
        $query['size'] = $criteria['size'] ?? 20;
        
        if (!empty($criteria['aggs'])) {
            $query['aggs'] = $this->buildAggregations($criteria['aggs']);
        }
        
        return $query;
    }
    
    private function buildSort(array $sort): array
    {
        $result = [];
        
        foreach ($sort as $field => $order) {
            $result[] = [$field => ['order' => $order]];
        }
        
        return $result;
    }
    
    private function buildAggregations(array $aggs): array
    {
        $result = [];
        
        if (in_array('category', $aggs)) {
            $result['categories'] = [
                'terms' => ['field' => 'category', 'size' => 10]
            ];
        }
        
        if (in_array('brand', $aggs)) {
            $result['brands'] = [
                'terms' => ['field' => 'brand', 'size' => 10]
            ];
        }
        
        if (in_array('price_range', $aggs)) {
            $result['price_ranges'] = [
                'range' => [
                    'field' => 'price',
                    'ranges' => [
                        ['to' => 100],
                        ['from' => 100, 'to' => 500],
                        ['from' => 500, 'to' => 1000],
                        ['from' => 1000]
                    ]
                ]
            ];
        }
        
        return $result;
    }
    
    private function formatResults(array $response): array
    {
        $hits = $response['hits'];
        
        $results = [
            'total' => $hits['total']['value'] ?? 0,
            'took' => $response['took'] ?? 0,
            'timed_out' => $response['timed_out'] ?? false,
            'hits' => []
        ];
        
        foreach ($hits['hits'] as $hit) {
            $results['hits'][] = [
                'id' => $hit['_id'],
                'score' => $hit['_score'] ?? null,
                'source' => $hit['_source'],
                'highlight' => $hit['highlight'] ?? null
            ];
        }
        
        if (!empty($response['aggregations'])) {
            $results['aggregations'] = $response['aggregations'];
        }
        
        return $results;
    }
    
    public function suggest(string $prefix, string $field = 'name'): array
    {
        $params = [
            'index' => 'products',
            'body' => [
                'suggest' => [
                    'product_suggest' => [
                        'prefix' => $prefix,
                        'completion' => [
                            'field' => $field . '.suggest',
                            'size' => 10,
                            'skip_duplicates' => true
                        ]
                    ]
                ]
            ]
        ];
        
        $response = $this->esClient->search($params);
        
        $suggestions = [];
        
        if (!empty($response['suggest']['product_suggest'][0]['options'])) {
            foreach ($response['suggest']['product_suggest'][0]['options'] as $option) {
                $suggestions[] = [
                    'text' => $option['text'],
                    'score' => $option['_score'],
                    'source' => $option['_source'] ?? null
                ];
            }
        }
        
        return $suggestions;
    }
}

// Usage example
$esClient = ElasticsearchConfig::getClient();
$search = new SearchService($esClient);

$results = $search->searchProducts([
    'query' => 'iPhone',
    'category' => 'electronics',
    'min_price' => 500,
    'max_price' => 1500,
    'sort' => ['price' => 'asc'],
    'size' => 20,
    'aggs' => ['category', 'brand', 'price_range']
]);

print_r($results);
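One detail of building query bodies from PHP arrays is easy to miss. `json_encode()` turns an empty array into a JSON array (`[]`), but Elasticsearch expects query clauses such as `bool` or `match_all` to be JSON objects. A body like `['query' => ['bool' => []]]` is therefore rejected when no criteria are supplied. An empty `\stdClass` (or `(object) []`) encodes as `{}` instead. The sketch below shows the difference with plain `json_encode()`; `emptyQuery()` is a hypothetical helper.

```php
<?php

// Hypothetical helper: builds the "query" clause used when no criteria are given
function emptyQuery(bool $asObject): array
{
    return ['query' => ['match_all' => $asObject ? new stdClass() : []]];
}

echo json_encode(emptyQuery(false)), PHP_EOL; // {"query":{"match_all":[]}}  (rejected by Elasticsearch)
echo json_encode(emptyQuery(true)), PHP_EOL;  // {"query":{"match_all":{}}}  (valid)
```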

Real-World Scenarios

Scenario 1: Real-Time Log Analysis

php
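<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;

class RealTimeLogAnalyzer
{
    private $esClient;
    private $rabbitChannel;
    
    public function __construct($esClient, AMQPStreamConnection $rabbit)
    {
        $this->esClient = $esClient;
        $this->rabbitChannel = $rabbit->channel();

        // basic_publish to an undeclared exchange closes the channel (404 NOT_FOUND),
        // so declare it up front. The 'topic' type is an assumption based on the
        // dotted routing key used in sendAlert().
        $this->rabbitChannel->exchange_declare('alerts.exchange', 'topic', false, true, false);
    }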
    
    public function analyzeErrors(int $minutes = 5): array
    {
        $params = [
            'index' => 'logs-*',
            'body' => [
                'query' => [
                    'bool' => [
                        // Filter context: exact-match clauses skip scoring and can be cached
                        'filter' => [
                            ['term' => ['level' => 'ERROR']],
                            ['range' => ['@timestamp' => [
                                'gte' => 'now-' . $minutes . 'm'
                            ]]]
                        ]
                    ]
                ],
                'aggs' => [
                    'by_service' => [
                        'terms' => ['field' => 'service', 'size' => 10],
                        'aggs' => [
                            'by_message' => [
                                'terms' => ['field' => 'message.keyword', 'size' => 5]
                            ]
                        ]
                    ]
                ],
                'size' => 0
            ]
        ];
        
        return $this->esClient->search($params);
    }
    
    public function getErrorTrend(int $hours = 24): array
    {
        $params = [
            'index' => 'logs-*',
            'body' => [
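                'query' => [
                    'bool' => [
                        'filter' => [
                            ['term' => ['level' => 'ERROR']],
                            // Restrict the histogram to the requested window
                            ['range' => ['@timestamp' => ['gte' => 'now-' . $hours . 'h']]]
                        ]
                    ]
                ],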
                'aggs' => [
                    'trend' => [
                        'date_histogram' => [
                            'field' => '@timestamp',
                            'calendar_interval' => 'hour',
                            'min_doc_count' => 0
                        ]
                    ]
                ],
                'size' => 0
            ]
        ];
        
        return $this->esClient->search($params);
    }
    
    public function alertOnThreshold(int $threshold = 100, int $minutes = 5): void
    {
        $result = $this->analyzeErrors($minutes);
        
        $totalErrors = $result['hits']['total']['value'] ?? 0;
        
        if ($totalErrors > $threshold) {
            $this->sendAlert($totalErrors, $threshold);
        }
    }
    
    private function sendAlert(int $count, int $threshold): void
    {
        $alert = [
            'type' => 'error_threshold_exceeded',
            'count' => $count,
            'threshold' => $threshold,
            'timestamp' => date('c')
        ];
        
        $message = new \PhpAmqpLib\Message\AMQPMessage(
            json_encode($alert),
            ['content_type' => 'application/json']
        );
        
        $this->rabbitChannel->basic_publish(
            $message,
            'alerts.exchange',
            'alert.error'
        );
    }
}
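`analyzeErrors()` returns the raw search response, and `alertOnThreshold()` only looks at the overall hit count. The sketch below reads the `by_service` aggregation instead, so that a threshold can be applied per service. `errorsByService()` is hypothetical. `$sample` is hand-written input that only mimics the shape of a `terms` aggregation response (`buckets` entries with `key` and `doc_count`).

```php
<?php

/**
 * Hypothetical helper: turns the by_service terms aggregation into
 * [service => error count] and lists services above $threshold.
 */
function errorsByService(array $response, int $threshold): array
{
    $counts = [];
    foreach ($response['aggregations']['by_service']['buckets'] ?? [] as $bucket) {
        $counts[$bucket['key']] = $bucket['doc_count'];
    }
    $over = array_keys(array_filter($counts, fn (int $c) => $c > $threshold));
    return ['counts' => $counts, 'over_threshold' => $over];
}

// Illustrative input in the shape of a terms aggregation response
$sample = [
    'hits' => ['total' => ['value' => 130]],
    'aggregations' => ['by_service' => ['buckets' => [
        ['key' => 'checkout', 'doc_count' => 120],
        ['key' => 'search', 'doc_count' => 10],
    ]]],
];
print_r(errorsByService($sample, 100));
```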

Scenario 2: E-commerce Search

php
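<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;

class EcommerceSearch
{
    private $esClient;
    private $rabbitChannel;
    
    public function __construct($esClient, AMQPStreamConnection $rabbit)
    {
        $this->esClient = $esClient;
        $this->rabbitChannel = $rabbit->channel();

        // Declare the exchange used by recordSearch(); publishing to an undeclared
        // exchange closes the channel. The 'topic' type is an assumption.
        $this->rabbitChannel->exchange_declare('analytics.exchange', 'topic', false, true, false);
    }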
    
    public function searchWithPersonalization(string $query, string $userId): array
    {
        $userPreferences = $this->getUserPreferences($userId);
        
        $params = [
            'index' => 'products',
            'body' => [
                'query' => [
                    'function_score' => [
                        'query' => [
                            'multi_match' => [
                                'query' => $query,
                                'fields' => ['name^2', 'description', 'brand']
                            ]
                        ],
                        'functions' => [
                            [
                                'filter' => ['terms' => ['category' => $userPreferences['categories'] ?? []]],
                                'weight' => 1.5
                            ],
                            [
                                'filter' => ['terms' => ['brand' => $userPreferences['brands'] ?? []]],
                                'weight' => 1.3
                            ],
                            [
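                                'field_value_factor' => [
                                    'field' => 'popularity',
                                    'factor' => 0.1,
                                    'modifier' => 'log1p',
                                    // 'popularity' is not in the products mapping above; without a
                                    // 'missing' value the query fails when the field is unmapped
                                    'missing' => 0
                                ]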
                            ]
                        ],
                        'score_mode' => 'sum',
                        'boost_mode' => 'multiply'
                    ]
                ]
            ]
        ];
        
        return $this->esClient->search($params);
    }
    
    private function getUserPreferences(string $userId): array
    {
        $params = [
            'index' => 'user_preferences',
            'id' => $userId
        ];
        
        try {
            $result = $this->esClient->get($params);
            return $result['_source'] ?? [];
        } catch (Exception $e) {
            return [];
        }
    }
    
    public function recordSearch(string $userId, string $query, array $results): void
    {
        $searchEvent = [
            'userId' => $userId,
            'query' => $query,
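            // Total hits from a search response; count() of the response array counts its keys
            'resultCount' => $results['hits']['total']['value'] ?? count($results),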
            'timestamp' => date('c')
        ];
        
        $message = new \PhpAmqpLib\Message\AMQPMessage(
            json_encode($searchEvent),
            ['content_type' => 'application/json']
        );
        
        $this->rabbitChannel->basic_publish(
            $message,
            'analytics.exchange',
            'search.event'
        );
    }
}
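Here is how the personalization scores combine:

- With `score_mode: sum`, Elasticsearch adds up the result of every function that applies to a document.
- A filter function with only a `weight` contributes that weight when its filter matches.
- `field_value_factor` with `modifier: log1p` contributes `log10(1 + factor * value)`.
- `boost_mode: multiply` multiplies that sum by the text relevance score.

The sketch below reproduces that arithmetic in plain PHP for the weights used above. `personalizedScore()` is hypothetical, and the inputs are made-up numbers.

```php
<?php

/**
 * Hypothetical helper reproducing the function_score arithmetic above:
 * score_mode=sum over matching functions, boost_mode=multiply with the query score.
 */
function personalizedScore(float $queryScore, bool $categoryMatch, bool $brandMatch, float $popularity): float
{
    $sum = 0.0;
    if ($categoryMatch) {
        $sum += 1.5;                          // weight of the category filter function
    }
    if ($brandMatch) {
        $sum += 1.3;                          // weight of the brand filter function
    }
    $sum += log10(1 + 0.1 * $popularity);     // field_value_factor: factor 0.1, modifier log1p
    return $queryScore * $sum;                // boost_mode: multiply
}

printf("%.4f\n", personalizedScore(2.0, true, false, 90));  // 2.0 * (1.5 + log10(10)) = 5.0000
printf("%.4f\n", personalizedScore(2.0, false, false, 0));  // 2.0 * log10(1) = 0.0000
```

The second call shows a side effect of this configuration. A document that matches neither preference and has a popularity of 0 (or a missing value treated as 0) ends up with a final score of 0, whatever its text relevance. If that is undesirable, options include `boost_mode: sum`, a `missing` value above 0, or the `log2p` modifier. Which one fits depends on the ranking you want.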

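Common Problems and Solutions

Problem 1: Indexing Delay

Symptom: After data is updated, search results do not reflect the change right away.

Solution: Elasticsearch is near real time. A new or changed document becomes searchable only after the next refresh of its shard. That happens every second by default, or according to the index's `refresh_interval` (the log template above uses 5s).

When a particular write must be visible to the very next search, set the `refresh` parameter on that request:

- `wait_for` blocks until the next scheduled refresh.
- `true` forces an immediate refresh, which becomes expensive under heavy write load.

Otherwise, tune `refresh_interval` to the freshness you actually need.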

php
$this->esClient->index([
    'index' => 'products',
    'id' => $productId,
    'refresh' => 'wait_for', // wait for the next scheduled refresh; true forces one immediately
    'body' => $document
]);

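Problem 2: Bulk Indexing Failures

Symptom: Errors occur when indexing large batches.

Solution: Split the data into smaller chunks and retry failed requests with a backoff. `bulk()` reports failures of individual documents in the response (`errors: true` plus a per-item `error`) rather than throwing, so the response has to be checked as well.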

php
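private function safeBulkIndex(array $batch, int $maxRetries = 3, int $docsPerChunk = 500): void
{
    // $batch alternates action and source entries, so chunk in pairs to keep them together
    // (this assumes index/create/update actions only; delete actions have no source line)
    foreach (array_chunk($batch, $docsPerChunk * 2) as $chunk) {
        $attempt = 0;

        while (true) {
            try {
                $response = $this->esClient->bulk(['body' => $chunk]);

                // Per-item failures do not throw, so turn them into an exception to retry.
                // Resending a whole chunk is only safe if documents carry explicit _id values;
                // otherwise items that already succeeded would be indexed twice.
                if (!empty($response['errors'])) {
                    throw new RuntimeException('Bulk request contained failed items');
                }
                break;
            } catch (Exception $e) {
                $attempt++;
                if ($attempt >= $maxRetries) {
                    throw $e;
                }
                sleep(2 ** ($attempt - 1)); // back off 1s, 2s, 4s, ...
            }
        }
    }
}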
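Retrying an entire bulk request resends documents that were already written. A finer-grained sketch resends only the items that failed with a retryable status, and spaces the attempts with capped exponential backoff. It makes these assumptions:

- The body contains only `index` actions: one action line plus one source line per document, so response item `i` maps to body entries `2i` and `2i+1`.
- 429 (the node rejected the work because its queue was full) and 503 are treated as retryable. This is a policy choice.
- `retryableBulkBody()` and `backoffMs()` are hypothetical helpers.

```php
<?php

/**
 * Hypothetical helper: given a bulk body of index actions (action + source
 * pairs) and the bulk response, returns a new body containing only the items
 * that failed with a retryable status.
 */
function retryableBulkBody(array $body, array $response, array $retryable = [429, 503]): array
{
    if (empty($response['errors'])) {
        return [];
    }
    $retry = [];
    foreach ($response['items'] as $i => $item) {
        $status = $item['index']['status'] ?? 200;
        if (isset($item['index']['error']) && in_array($status, $retryable, true)) {
            $retry[] = $body[2 * $i];      // action/metadata line
            $retry[] = $body[2 * $i + 1];  // document source line
        }
    }
    return $retry;
}

/** Exponential backoff in milliseconds: base * 2^attempt, capped at $maxMs. */
function backoffMs(int $attempt, int $baseMs = 200, int $maxMs = 5000): int
{
    return min($maxMs, $baseMs * (2 ** $attempt));
}
```

A caller would loop as follows:

1. Send the body.
2. Replace it with `retryableBulkBody($body, $response)`.
3. Call `usleep(backoffMs($attempt) * 1000)` before the next attempt.

It stops when the body is empty or a retry limit is reached.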

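Problem 3: Slow Searches

Symptom: Search response times keep growing.

Solution: Optimize the queries and cache frequent results on the application side. For example, put exact-match conditions such as category or price ranges in `filter` context, which skips scoring and can be cached by Elasticsearch.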

php
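public function searchWithCache(array $criteria): array
{
    // $this->cache is assumed to be a PSR-16 (Psr\SimpleCache\CacheInterface) instance
    $cacheKey = md5(json_encode($criteria));

    // PSR-16 get() returns null on a miss
    $cached = $this->cache->get($cacheKey);
    if ($cached !== null) {
        return $cached;
    }

    $results = $this->searchProducts($criteria);

    $this->cache->set($cacheKey, $results, 300); // TTL: 300 seconds

    return $results;
}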
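Hashing `json_encode($criteria)` directly yields two different keys for the same search when the criteria keys arrive in a different order, which lowers the hit rate. The sketch below sorts only the top-level keys. Nested values such as `sort` keep their order, because there the order changes the result. It also prefixes the hash with a namespace and the index name, so related entries are easy to recognize. `searchCacheKey()` and the `search:` prefix are hypothetical.

```php
<?php

/** Hypothetical cache-key builder: namespace + index + hash of the criteria. */
function searchCacheKey(string $index, array $criteria): string
{
    // Top-level key order is irrelevant, so sort it; nested values such as
    // 'sort' are left untouched because their order changes the result.
    ksort($criteria);
    return 'search:' . $index . ':' . md5(json_encode($criteria));
}
```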

Best Practices

1. Index Template Management

php
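private function createIndexTemplates(): void
{
    // The get*Template() helpers are not shown; each returns a template body
    // like the one built in LogIndexer::createIndexTemplate()
    $templates = [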
        'logs-template' => $this->getLogTemplate(),
        'products-template' => $this->getProductTemplate(),
        'metrics-template' => $this->getMetricsTemplate(),
    ];
    
    foreach ($templates as $name => $template) {
        $this->esClient->indices()->putTemplate([
            'name' => $name,
            'body' => $template
        ]);
    }
}
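`putTemplate()` manages legacy index templates, which Elasticsearch deprecated in 7.8 in favor of composable index templates (created with `putIndexTemplate()` in recent 7.x and 8.x PHP clients). In the request body, `settings`, `mappings` and `aliases` move under a `template` key, and a `priority` decides which template applies when several patterns match. The sketch below converts a legacy body, such as the one in `LogIndexer::createIndexTemplate()`, into the composable shape. `toComposableTemplate()` is hypothetical, and the default priority of 100 is an arbitrary choice.

```php
<?php

/**
 * Hypothetical helper: converts a legacy template body (index_patterns,
 * settings, mappings at the top level) into a composable index template body.
 */
function toComposableTemplate(array $legacy, int $priority = 100): array
{
    $template = [];
    foreach (['settings', 'mappings', 'aliases'] as $section) {
        if (isset($legacy[$section])) {
            $template[$section] = $legacy[$section];
        }
    }
    return [
        'index_patterns' => $legacy['index_patterns'],
        'priority'       => $priority,
        'template'       => $template,
    ];
}

// With a client that supports composable templates (assumed):
// $esClient->indices()->putIndexTemplate(['name' => 'logs-template', 'body' => toComposableTemplate($legacyBody)]);
```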

2. Error Handling

php
class ElasticsearchErrorHandler
{
    public static function handle(\Throwable $e, array $context = []): void
    {
        $error = [
            'type' => get_class($e),
            'message' => $e->getMessage(),
            'code' => $e->getCode(),
            'context' => $context,
            'timestamp' => date('c')
        ];
        
        error_log(json_encode($error));
    }
}

3. Monitoring Metrics

php
class ElasticsearchMetrics
{
    public static function recordQueryTime(string $index, float $time): void
    {
        // Record the query time in your monitoring system
    }
    
    public static function recordIndexCount(string $index, int $count): void
    {
        // Record the number of indexed documents
    }
}
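The `ElasticsearchMetrics` methods are stubs. One way to feed `recordQueryTime()` is to wrap each client call and time it with `hrtime()` (PHP 7.3+). Unlike `microtime()`, it is a monotonic clock and is not affected by system clock changes. In the sketch below, `timed()` is hypothetical, and the callback stands in for a real `$esClient->search(...)` call.

```php
<?php

/**
 * Hypothetical helper: runs $fn, returns [result, elapsed milliseconds]
 * measured with the monotonic hrtime() clock.
 */
function timed(callable $fn): array
{
    $start = hrtime(true);                        // nanoseconds, monotonic
    $result = $fn();
    $elapsedMs = (hrtime(true) - $start) / 1e6;
    return [$result, $elapsedMs];
}

[$result, $ms] = timed(function () {
    usleep(20000);                                // stand-in for a search request (~20 ms)
    return ['took' => 3];
});
// ElasticsearchMetrics::recordQueryTime('products', $ms);  // passing milliseconds is an assumption
printf("client-side: %.1f ms, server-side took: %d ms\n", $ms, $result['took']);
```

The response's `took` field is the time Elasticsearch itself spent, in milliseconds. Comparing the measured time with it separates server-side cost from network and client overhead.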

Version Compatibility

| PHP | Elasticsearch PHP Client | Elasticsearch Server | RabbitMQ Server |
|---|---|---|---|
| 8.2+ | 8.x | 8.x | 3.11+ |
| 8.1+ | 8.x | 8.x | 3.10+ |
| 8.0+ | 7.x/8.x | 7.x/8.x | 3.9+ |
| 7.4+ | 7.x | 7.x | 3.8+ |

Related Links