Appearance
RabbitMQ 与 Elasticsearch 集成
概述
Elasticsearch 是一个强大的分布式搜索和分析引擎,常用于日志分析、全文搜索、指标监控等场景。将 RabbitMQ 与 Elasticsearch 集成,可以实现实时数据索引、日志收集、搜索数据同步等功能。本教程将详细介绍两者的集成方案。
集成架构设计
架构图
┌─────────────────────────────────────────────────────────────────────┐
│ Elasticsearch 集成架构 │
│ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ 数据源层 │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ 应用日志 │ │ 业务数据 │ │ 指标数据 │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ RabbitMQ │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Log Queue │ │ Index Queue │ │ Search Queue│ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ 索引处理层 │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ 批量索引 │ │ 数据转换 │ │ 错误处理 │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ Elasticsearch Cluster │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Node 1 │ │ Node 2 │ │ Node 3 │ │ │
│ │ │ (Master) │ │ (Data) │ │ (Data) │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘集成模式
| 模式 | 说明 |
|---|---|
| Log Shipping | 日志传输,将应用日志发送到 Elasticsearch |
| Index Sync | 索引同步,将数据库数据同步到 Elasticsearch |
| Search Index | 搜索索引,实时更新搜索索引 |
| Metrics Collection | 指标收集,收集和存储监控指标 |
配置示例
Elasticsearch PHP 客户端配置
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use Elasticsearch\ClientBuilder;
class ElasticsearchConfig
{
public static function getClient()
{
$hosts = explode(',', getenv('ES_HOSTS') ?: 'localhost:9200');
$client = ClientBuilder::create()
->setHosts($hosts)
->setRetries(3)
->setConnectionPool('\Elasticsearch\ConnectionPool\StaticConnectionPool', [])
->setSelector('\Elasticsearch\ConnectionPool\Selectors\RoundRobinSelector')
->build();
return $client;
}
}RabbitMQ 连接配置
php
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
class RabbitMQConfig
{
public static function getConnection(): AMQPStreamConnection
{
return new AMQPStreamConnection(
getenv('RABBITMQ_HOST') ?: 'localhost',
getenv('RABBITMQ_PORT') ?: 5672,
getenv('RABBITMQ_USER') ?: 'guest',
getenv('RABBITMQ_PASSWORD') ?: 'guest',
getenv('RABBITMQ_VHOST') ?: '/'
);
}
}PHP 代码示例
日志索引器
php
<?php
class LogIndexer
{
private $esClient;
private $rabbitChannel;
private $rabbitConnection;
private const INDEX_PREFIX = 'logs-';
private const BATCH_SIZE = 100;
public function __construct($esClient, AMQPStreamConnection $rabbitConnection)
{
$this->esClient = $esClient;
$this->rabbitConnection = $rabbitConnection;
$this->rabbitChannel = $rabbitConnection->channel();
$this->setupInfrastructure();
}
private function setupInfrastructure(): void
{
$this->rabbitChannel->exchange_declare(
'logs.exchange',
'topic',
false,
true,
false
);
$this->rabbitChannel->queue_declare(
'logs.indexing',
false,
true,
false,
false
);
$this->rabbitChannel->queue_bind(
'logs.indexing',
'logs.exchange',
'log.#'
);
$this->createIndexTemplate();
}
private function createIndexTemplate(): void
{
$template = [
'index_patterns' => [self::INDEX_PREFIX . '*'],
'settings' => [
'number_of_shards' => 3,
'number_of_replicas' => 1,
'refresh_interval' => '5s',
],
'mappings' => [
'properties' => [
'@timestamp' => ['type' => 'date'],
'level' => ['type' => 'keyword'],
'message' => ['type' => 'text'],
'logger' => ['type' => 'keyword'],
'context' => ['type' => 'object', 'enabled' => true],
'extra' => ['type' => 'object', 'enabled' => true],
'host' => ['type' => 'keyword'],
'service' => ['type' => 'keyword'],
'trace_id' => ['type' => 'keyword'],
'span_id' => ['type' => 'keyword'],
]
]
];
try {
$this->esClient->indices()->putTemplate([
'name' => 'logs-template',
'body' => $template
]);
} catch (Exception $e) {
error_log("创建索引模板失败: " . $e->getMessage());
}
}
public function indexLogs(): void
{
$batch = [];
$callback = function ($message) use (&$batch) {
$log = json_decode($message->getBody(), true);
$indexName = self::INDEX_PREFIX . date('Y.m.d');
$batch[] = [
'index' => [
'_index' => $indexName,
]
];
$batch[] = $this->formatLogEntry($log);
if (count($batch) >= self::BATCH_SIZE * 2) {
$this->flushBatch($batch);
$batch = [];
}
$message->ack();
};
$this->rabbitChannel->basic_qos(0, self::BATCH_SIZE, false);
$this->rabbitChannel->basic_consume(
'logs.indexing',
'',
false,
false,
false,
false,
$callback
);
while ($this->rabbitChannel->is_consuming()) {
$this->rabbitChannel->wait();
if (!empty($batch)) {
$this->flushBatch($batch);
$batch = [];
}
}
}
private function formatLogEntry(array $log): array
{
return [
'@timestamp' => $log['timestamp'] ?? date('c'),
'level' => strtoupper($log['level'] ?? 'INFO'),
'message' => $log['message'] ?? '',
'logger' => $log['logger'] ?? 'default',
'context' => $log['context'] ?? [],
'extra' => $log['extra'] ?? [],
'host' => gethostname(),
'service' => $log['service'] ?? 'unknown',
'trace_id' => $log['trace_id'] ?? null,
'span_id' => $log['span_id'] ?? null,
];
}
private function flushBatch(array $batch): void
{
if (empty($batch)) {
return;
}
try {
$response = $this->esClient->bulk([
'body' => $batch,
'refresh' => false,
]);
if ($response['errors']) {
foreach ($response['items'] as $item) {
if (isset($item['index']['error'])) {
error_log("索引错误: " . json_encode($item['index']['error']));
}
}
}
} catch (Exception $e) {
error_log("批量索引失败: " . $e->getMessage());
}
}
public function close(): void
{
$this->rabbitChannel->close();
$this->rabbitConnection->close();
}
}
// 使用示例
$esClient = ElasticsearchConfig::getClient();
$rabbit = RabbitMQConfig::getConnection();
$indexer = new LogIndexer($esClient, $rabbit);
$indexer->indexLogs();
$indexer->close();搜索索引同步器
php
<?php
class SearchIndexSync
{
private $esClient;
private $rabbitChannel;
private $rabbitConnection;
public function __construct($esClient, AMQPStreamConnection $rabbitConnection)
{
$this->esClient = $esClient;
$this->rabbitConnection = $rabbitConnection;
$this->rabbitChannel = $rabbitConnection->channel();
$this->setupInfrastructure();
}
private function setupInfrastructure(): void
{
$this->rabbitChannel->exchange_declare(
'search.sync',
'direct',
false,
true,
false
);
$this->rabbitChannel->queue_declare(
'search.index.updates',
false,
true,
false,
false,
false,
new \PhpAmqpLib\Wire\AMQPTable([
'x-dead-letter-exchange' => 'search.dlx',
'x-dead-letter-routing-key' => 'failed',
])
);
$this->rabbitChannel->queue_bind(
'search.index.updates',
'search.sync',
'index.update'
);
$this->createProductIndex();
}
private function createProductIndex(): void
{
$indexName = 'products';
$mappings = [
'properties' => [
'id' => ['type' => 'keyword'],
'name' => [
'type' => 'text',
'analyzer' => 'standard',
'fields' => [
'keyword' => ['type' => 'keyword'],
'suggest' => ['type' => 'completion'],
]
],
'description' => ['type' => 'text'],
'price' => ['type' => 'float'],
'category' => ['type' => 'keyword'],
'brand' => ['type' => 'keyword'],
'tags' => ['type' => 'keyword'],
'attributes' => [
'type' => 'nested',
'properties' => [
'name' => ['type' => 'keyword'],
'value' => ['type' => 'keyword'],
]
],
'inventory' => [
'type' => 'object',
'properties' => [
'quantity' => ['type' => 'integer'],
'location' => ['type' => 'keyword'],
]
],
'status' => ['type' => 'keyword'],
'created_at' => ['type' => 'date'],
'updated_at' => ['type' => 'date'],
]
];
try {
if (!$this->esClient->indices()->exists(['index' => $indexName])) {
$this->esClient->indices()->create([
'index' => $indexName,
'body' => [
'settings' => [
'number_of_shards' => 3,
'number_of_replicas' => 1,
],
'mappings' => $mappings
]
]);
}
} catch (Exception $e) {
error_log("创建索引失败: " . $e->getMessage());
}
}
public function consumeIndexUpdates(): void
{
$callback = function ($message) {
$data = json_decode($message->getBody(), true);
try {
$operation = $data['operation'] ?? 'index';
switch ($operation) {
case 'index':
$this->indexDocument($data);
break;
case 'update':
$this->updateDocument($data);
break;
case 'delete':
$this->deleteDocument($data);
break;
case 'bulk':
$this->bulkIndex($data['documents']);
break;
}
$message->ack();
} catch (Exception $e) {
error_log("索引更新失败: " . $e->getMessage());
$message->nack(false, false);
}
};
$this->rabbitChannel->basic_qos(0, 10, false);
$this->rabbitChannel->basic_consume(
'search.index.updates',
'',
false,
false,
false,
false,
$callback
);
while ($this->rabbitChannel->is_consuming()) {
$this->rabbitChannel->wait();
}
}
private function indexDocument(array $data): void
{
$params = [
'index' => $data['index'] ?? 'products',
'id' => $data['id'],
'body' => $data['document']
];
$this->esClient->index($params);
}
private function updateDocument(array $data): void
{
$params = [
'index' => $data['index'] ?? 'products',
'id' => $data['id'],
'body' => [
'doc' => $data['updates'],
'doc_as_upsert' => true
]
];
$this->esClient->update($params);
}
private function deleteDocument(array $data): void
{
$params = [
'index' => $data['index'] ?? 'products',
'id' => $data['id']
];
$this->esClient->delete($params);
}
private function bulkIndex(array $documents): void
{
$batch = [];
foreach ($documents as $doc) {
$batch[] = [
'index' => [
'_index' => $doc['index'] ?? 'products',
'_id' => $doc['id']
]
];
$batch[] = $doc['document'];
}
$this->esClient->bulk(['body' => $batch]);
}
public function close(): void
{
$this->rabbitChannel->close();
$this->rabbitConnection->close();
}
}
// 使用示例
$esClient = ElasticsearchConfig::getClient();
$rabbit = RabbitMQConfig::getConnection();
$sync = new SearchIndexSync($esClient, $rabbit);
$sync->consumeIndexUpdates();
$sync->close();搜索生产者
php
<?php
class SearchProducer
{
private $rabbitChannel;
private $rabbitConnection;
public function __construct(AMQPStreamConnection $rabbitConnection)
{
$this->rabbitConnection = $rabbitConnection;
$this->rabbitChannel = $rabbitConnection->channel();
$this->setupExchange();
}
private function setupExchange(): void
{
$this->rabbitChannel->exchange_declare(
'search.sync',
'direct',
false,
true,
false
);
}
public function indexProduct(array $product): bool
{
$message = new \PhpAmqpLib\Message\AMQPMessage(
json_encode([
'operation' => 'index',
'index' => 'products',
'id' => $product['id'],
'document' => $product
]),
[
'content_type' => 'application/json',
'delivery_mode' => \PhpAmqpLib\Message\AMQPMessage::DELIVERY_MODE_PERSISTENT,
]
);
$this->rabbitChannel->basic_publish(
$message,
'search.sync',
'index.update'
);
return true;
}
public function updateProduct(string $productId, array $updates): bool
{
$message = new \PhpAmqpLib\Message\AMQPMessage(
json_encode([
'operation' => 'update',
'index' => 'products',
'id' => $productId,
'updates' => $updates
]),
[
'content_type' => 'application/json',
'delivery_mode' => \PhpAmqpLib\Message\AMQPMessage::DELIVERY_MODE_PERSISTENT,
]
);
$this->rabbitChannel->basic_publish(
$message,
'search.sync',
'index.update'
);
return true;
}
public function deleteProduct(string $productId): bool
{
$message = new \PhpAmqpLib\Message\AMQPMessage(
json_encode([
'operation' => 'delete',
'index' => 'products',
'id' => $productId
]),
[
'content_type' => 'application/json',
'delivery_mode' => \PhpAmqpLib\Message\AMQPMessage::DELIVERY_MODE_PERSISTENT,
]
);
$this->rabbitChannel->basic_publish(
$message,
'search.sync',
'index.update'
);
return true;
}
public function bulkIndexProducts(array $products): bool
{
$documents = array_map(function ($product) {
return [
'index' => 'products',
'id' => $product['id'],
'document' => $product
];
}, $products);
$message = new \PhpAmqpLib\Message\AMQPMessage(
json_encode([
'operation' => 'bulk',
'documents' => $documents
]),
[
'content_type' => 'application/json',
'delivery_mode' => \PhpAmqpLib\Message\AMQPMessage::DELIVERY_MODE_PERSISTENT,
]
);
$this->rabbitChannel->basic_publish(
$message,
'search.sync',
'index.update'
);
return true;
}
public function close(): void
{
$this->rabbitChannel->close();
$this->rabbitConnection->close();
}
}
// 使用示例
$rabbit = RabbitMQConfig::getConnection();
$producer = new SearchProducer($rabbit);
$producer->indexProduct([
'id' => 'PROD-001',
'name' => 'iPhone 15 Pro',
'description' => '最新款 iPhone',
'price' => 999.99,
'category' => 'electronics',
'brand' => 'Apple',
'tags' => ['phone', 'smartphone', 'apple'],
'status' => 'active',
'created_at' => date('c'),
'updated_at' => date('c'),
]);
$producer->updateProduct('PROD-001', [
'price' => 899.99,
'status' => 'sale'
]);
$producer->close();搜索服务
php
<?php
class SearchService
{
private $esClient;
public function __construct($esClient)
{
$this->esClient = $esClient;
}
public function searchProducts(array $criteria): array
{
$query = $this->buildQuery($criteria);
$params = [
'index' => 'products',
'body' => $query
];
$response = $this->esClient->search($params);
return $this->formatResults($response);
}
private function buildQuery(array $criteria): array
{
$must = [];
$filter = [];
if (!empty($criteria['query'])) {
$must[] = [
'multi_match' => [
'query' => $criteria['query'],
'fields' => ['name^2', 'description', 'brand'],
'type' => 'best_fields'
]
];
}
if (!empty($criteria['category'])) {
$filter[] = ['term' => ['category' => $criteria['category']]];
}
if (!empty($criteria['brand'])) {
$filter[] = ['term' => ['brand' => $criteria['brand']]];
}
if (!empty($criteria['tags'])) {
$filter[] = ['terms' => ['tags' => (array)$criteria['tags']]];
}
if (isset($criteria['min_price']) || isset($criteria['max_price'])) {
$range = [];
if (isset($criteria['min_price'])) {
$range['gte'] = $criteria['min_price'];
}
if (isset($criteria['max_price'])) {
$range['lte'] = $criteria['max_price'];
}
$filter[] = ['range' => ['price' => $range]];
}
$query = [
'query' => [
'bool' => []
]
];
if (!empty($must)) {
$query['query']['bool']['must'] = $must;
}
if (!empty($filter)) {
$query['query']['bool']['filter'] = $filter;
}
if (!empty($criteria['sort'])) {
$query['sort'] = $this->buildSort($criteria['sort']);
}
$query['from'] = $criteria['from'] ?? 0;
$query['size'] = $criteria['size'] ?? 20;
if (!empty($criteria['aggs'])) {
$query['aggs'] = $this->buildAggregations($criteria['aggs']);
}
return $query;
}
private function buildSort(array $sort): array
{
$result = [];
foreach ($sort as $field => $order) {
$result[] = [$field => ['order' => $order]];
}
return $result;
}
private function buildAggregations(array $aggs): array
{
$result = [];
if (in_array('category', $aggs)) {
$result['categories'] = [
'terms' => ['field' => 'category', 'size' => 10]
];
}
if (in_array('brand', $aggs)) {
$result['brands'] = [
'terms' => ['field' => 'brand', 'size' => 10]
];
}
if (in_array('price_range', $aggs)) {
$result['price_ranges'] = [
'range' => [
'field' => 'price',
'ranges' => [
['to' => 100],
['from' => 100, 'to' => 500],
['from' => 500, 'to' => 1000],
['from' => 1000]
]
]
];
}
return $result;
}
private function formatResults(array $response): array
{
$hits = $response['hits'];
$results = [
'total' => $hits['total']['value'] ?? 0,
'took' => $response['took'] ?? 0,
'timed_out' => $response['timed_out'] ?? false,
'hits' => []
];
foreach ($hits['hits'] as $hit) {
$results['hits'][] = [
'id' => $hit['_id'],
'score' => $hit['_score'] ?? null,
'source' => $hit['_source'],
'highlight' => $hit['highlight'] ?? null
];
}
if (!empty($response['aggregations'])) {
$results['aggregations'] = $response['aggregations'];
}
return $results;
}
public function suggest(string $prefix, string $field = 'name'): array
{
$params = [
'index' => 'products',
'body' => [
'suggest' => [
'product_suggest' => [
'prefix' => $prefix,
'completion' => [
'field' => $field . '.suggest',
'size' => 10,
'skip_duplicates' => true
]
]
]
]
];
$response = $this->esClient->search($params);
$suggestions = [];
if (!empty($response['suggest']['product_suggest'][0]['options'])) {
foreach ($response['suggest']['product_suggest'][0]['options'] as $option) {
$suggestions[] = [
'text' => $option['text'],
'score' => $option['_score'],
'source' => $option['_source'] ?? null
];
}
}
return $suggestions;
}
}
// 使用示例
$esClient = ElasticsearchConfig::getClient();
$search = new SearchService($esClient);
$results = $search->searchProducts([
'query' => 'iPhone',
'category' => 'electronics',
'min_price' => 500,
'max_price' => 1500,
'sort' => ['price' => 'asc'],
'size' => 20,
'aggs' => ['category', 'brand', 'price_range']
]);
print_r($results);实际应用场景
场景一:实时日志分析
php
<?php
class RealTimeLogAnalyzer
{
private $esClient;
private $rabbitChannel;
public function __construct($esClient, AMQPStreamConnection $rabbit)
{
$this->esClient = $esClient;
$this->rabbitChannel = $rabbit->channel();
}
public function analyzeErrors(int $minutes = 5): array
{
$params = [
'index' => 'logs-*',
'body' => [
'query' => [
'bool' => [
'must' => [
['term' => ['level' => 'ERROR']],
['range' => ['@timestamp' => [
'gte' => 'now-' . $minutes . 'm'
]]]
]
]
],
'aggs' => [
'by_service' => [
'terms' => ['field' => 'service', 'size' => 10],
'aggs' => [
'by_message' => [
'terms' => ['field' => 'message.keyword', 'size' => 5]
]
]
]
],
'size' => 0
]
];
return $this->esClient->search($params);
}
public function getErrorTrend(int $hours = 24): array
{
$params = [
'index' => 'logs-*',
'body' => [
'query' => [
'term' => ['level' => 'ERROR']
],
'aggs' => [
'trend' => [
'date_histogram' => [
'field' => '@timestamp',
'calendar_interval' => 'hour',
'min_doc_count' => 0
]
]
],
'size' => 0
]
];
return $this->esClient->search($params);
}
public function alertOnThreshold(int $threshold = 100, int $minutes = 5): void
{
$result = $this->analyzeErrors($minutes);
$totalErrors = $result['hits']['total']['value'] ?? 0;
if ($totalErrors > $threshold) {
$this->sendAlert($totalErrors, $threshold);
}
}
private function sendAlert(int $count, int $threshold): void
{
$alert = [
'type' => 'error_threshold_exceeded',
'count' => $count,
'threshold' => $threshold,
'timestamp' => date('c')
];
$message = new \PhpAmqpLib\Message\AMQPMessage(
json_encode($alert),
['content_type' => 'application/json']
);
$this->rabbitChannel->basic_publish(
$message,
'alerts.exchange',
'alert.error'
);
}
}场景二:电商搜索
php
<?php
class EcommerceSearch
{
private $esClient;
private $rabbitChannel;
public function __construct($esClient, AMQPStreamConnection $rabbit)
{
$this->esClient = $esClient;
$this->rabbitChannel = $rabbit->channel();
}
public function searchWithPersonalization(string $query, string $userId): array
{
$userPreferences = $this->getUserPreferences($userId);
$params = [
'index' => 'products',
'body' => [
'query' => [
'function_score' => [
'query' => [
'multi_match' => [
'query' => $query,
'fields' => ['name^2', 'description', 'brand']
]
],
'functions' => [
[
'filter' => ['terms' => ['category' => $userPreferences['categories'] ?? []]],
'weight' => 1.5
],
[
'filter' => ['terms' => ['brand' => $userPreferences['brands'] ?? []]],
'weight' => 1.3
],
[
'field_value_factor' => [
'field' => 'popularity',
'factor' => 0.1,
'modifier' => 'log1p'
]
]
],
'score_mode' => 'sum',
'boost_mode' => 'multiply'
]
]
]
];
return $this->esClient->search($params);
}
private function getUserPreferences(string $userId): array
{
$params = [
'index' => 'user_preferences',
'id' => $userId
];
try {
$result = $this->esClient->get($params);
return $result['_source'] ?? [];
} catch (Exception $e) {
return [];
}
}
public function recordSearch(string $userId, string $query, array $results): void
{
$searchEvent = [
'userId' => $userId,
'query' => $query,
'resultCount' => count($results),
'timestamp' => date('c')
];
$message = new \PhpAmqpLib\Message\AMQPMessage(
json_encode($searchEvent),
['content_type' => 'application/json']
);
$this->rabbitChannel->basic_publish(
$message,
'analytics.exchange',
'search.event'
);
}
}常见问题与解决方案
问题一:索引延迟
症状: 数据更新后搜索结果未及时更新
解决方案: 使用 refresh 参数或优化索引设置
php
$this->esClient->index([
'index' => 'products',
'id' => $productId,
'refresh' => true,
'body' => $document
]);问题二:批量索引失败
症状: 大批量索引时出现错误
解决方案: 分批处理并添加错误重试
php
private function safeBulkIndex(array $batch, int $maxRetries = 3): void
{
$attempt = 0;
while ($attempt < $maxRetries) {
try {
$this->esClient->bulk(['body' => $batch]);
return;
} catch (Exception $e) {
$attempt++;
if ($attempt >= $maxRetries) {
throw $e;
}
sleep(1);
}
}
}问题三:搜索性能下降
症状: 搜索响应时间变长
解决方案: 优化查询和添加缓存
php
public function searchWithCache(array $criteria): array
{
$cacheKey = md5(json_encode($criteria));
if ($cached = $this->cache->get($cacheKey)) {
return $cached;
}
$results = $this->searchProducts($criteria);
$this->cache->set($cacheKey, $results, 300);
return $results;
}最佳实践建议
1. 索引模板管理
php
private function createIndexTemplates(): void
{
$templates = [
'logs-template' => $this->getLogTemplate(),
'products-template' => $this->getProductTemplate(),
'metrics-template' => $this->getMetricsTemplate(),
];
foreach ($templates as $name => $template) {
$this->esClient->indices()->putTemplate([
'name' => $name,
'body' => $template
]);
}
}2. 错误处理
php
class ElasticsearchErrorHandler
{
public static function handle(Exception $e, array $context = []): void
{
$error = [
'message' => $e->getMessage(),
'code' => $e->getCode(),
'context' => $context,
'timestamp' => date('c')
];
error_log(json_encode($error));
}
}3. 监控指标
php
class ElasticsearchMetrics
{
public static function recordQueryTime(string $index, float $time): void
{
// 记录查询时间到监控系统
}
public static function recordIndexCount(string $index, int $count): void
{
// 记录索引数量
}
}版本兼容性
| PHP | Elasticsearch PHP Client | Elasticsearch Server | RabbitMQ Server |
|---|---|---|---|
| 8.2+ | 8.x | 8.x | 3.11+ |
| 8.1+ | 8.x | 8.x | 3.10+ |
| 8.0+ | 7.x/8.x | 7.x/8.x | 3.9+ |
| 7.4+ | 7.x | 7.x | 3.8+ |
