Appearance
日志收集系统
概述
日志收集系统是现代分布式系统的重要组成部分。以 RabbitMQ 作为日志传输中间件,可以实现高性能、可靠的日志收集、聚合与分析,帮助开发团队快速定位问题、监控系统状态。
业务背景与需求
场景描述
某互联网公司拥有多个业务系统,需要统一的日志收集平台:
| 系统 | 日志类型 | 日志量/天 | 特点 |
|---|---|---|---|
| Web应用 | 访问日志、错误日志 | 100GB | 高并发、实时性要求高 |
| API网关 | 请求日志、响应日志 | 50GB | 需要完整链路追踪 |
| 微服务集群 | 业务日志、调试日志 | 200GB | 多语言、多格式 |
| 数据库 | 慢查询日志、审计日志 | 20GB | 需要安全审计 |
| 基础设施 | 系统日志、容器日志 | 30GB | 格式多样 |
需求分析
| 需求项 | 描述 |
|---|---|
| 高吞吐量 | 支持每秒 10万+ 条日志写入 |
| 低延迟 | 日志从产生到可查询 < 5秒 |
| 可靠性 | 日志不丢失,支持重试 |
| 灵活性 | 支持多种日志格式和来源 |
| 可扩展 | 支持动态扩容 |
| 实时分析 | 支持实时日志搜索和告警 |
架构设计
整体架构图
mermaid
graph TB
subgraph "日志来源"
A[Web应用]
B[API网关]
C[微服务]
D[数据库]
E[容器/系统]
end
subgraph "日志采集层"
F[Filebeat]
G[Fluentd]
H[Logstash]
I[SDK直连]
end
subgraph "RabbitMQ集群"
J[日志交换机<br/>log.exchange]
subgraph "分类队列"
K[访问日志队列<br/>log.access]
L[错误日志队列<br/>log.error]
M[业务日志队列<br/>log.business]
N[审计日志队列<br/>log.audit]
O[系统日志队列<br/>log.system]
end
P[死信队列<br/>log.dlq]
end
subgraph "日志处理层"
Q[日志解析器]
R[日志聚合器]
S[日志过滤器]
end
subgraph "存储层"
T[Elasticsearch]
U[ClickHouse]
V[HDFS]
end
subgraph "分析层"
W[Kibana]
X[Grafana]
Y[告警系统]
end
A --> F
B --> G
C --> H
D --> I
E --> F
F --> J
G --> J
H --> J
I --> J
J --> K
J --> L
J --> M
J --> N
J --> O
K -.-> P
L -.-> P
M -.-> P
N -.-> P
O -.-> P
K --> Q
L --> Q
M --> R
N --> S
O --> Q
Q --> T
R --> U
S --> V
T --> W
U --> X
T --> Y
日志流转流程
mermaid
sequenceDiagram
participant App as 应用程序
participant Agent as 日志Agent
participant MQ as RabbitMQ
participant Consumer as 日志消费者
participant ES as Elasticsearch
participant Alert as 告警系统
App->>Agent: 写入日志文件
Agent->>Agent: 读取并解析日志
Agent->>MQ: 发送到日志队列
alt 错误日志
MQ->>Consumer: 投递错误日志
Consumer->>Consumer: 解析并分析
Consumer->>ES: 存储日志
Consumer->>Alert: 触发告警(如需要)
else 普通日志
MQ->>Consumer: 投递日志
Consumer->>Consumer: 批量处理
Consumer->>ES: 批量写入
end
ES-->>Consumer: 确认写入
Consumer-->>MQ: 发送ACK
日志分类模型
mermaid
graph LR
A[日志消息] --> B{日志级别}
B --> C[DEBUG]
B --> D[INFO]
B --> E[WARN]
B --> F[ERROR]
B --> G[FATAL]
A --> H{日志类型}
H --> I[访问日志]
H --> J[业务日志]
H --> K[错误日志]
H --> L[审计日志]
H --> M[系统日志]
A --> N{来源系统}
N --> O[Web应用]
N --> P[API网关]
N --> Q[微服务]
N --> R[数据库]
N --> S[基础设施]
PHP 代码实现
日志消息结构定义
php
<?php
namespace App\Messaging\Log;
/**
 * Structured log record exchanged between producers and consumers as JSON.
 *
 * Everything that identifies where and when the record was produced (message id,
 * timestamp, trace/span ids, hostname, pid) is captured once at construction and is
 * carried through toArray()/fromArray(). A consumer that rebuilds the record therefore
 * reports the producer's values instead of its own.
 */
class LogMessage
{
    public string $messageId;
    /** ISO-8601 with microseconds, e.g. 2024-01-01T12:00:00.123456+08:00 */
    public string $timestamp;
    public string $level;
    public string $type;
    public string $source;
    public string $message;
    public array $context;
    public array $extra;
    public string $traceId;
    public string $spanId;
    /** Host that produced the record ('' when gethostname() fails). */
    public string $hostname;
    /** Process id of the producer (0 when getmypid() fails). */
    public int $pid;

    public const LEVEL_DEBUG = 'DEBUG';
    public const LEVEL_INFO = 'INFO';
    public const LEVEL_WARN = 'WARN';
    public const LEVEL_ERROR = 'ERROR';
    public const LEVEL_FATAL = 'FATAL';

    public const TYPE_ACCESS = 'access';
    public const TYPE_BUSINESS = 'business';
    public const TYPE_ERROR = 'error';
    public const TYPE_AUDIT = 'audit';
    public const TYPE_SYSTEM = 'system';

    /** Accepted shape for an inbound X-Trace-Id header; anything else gets a fresh id. */
    private const TRACE_ID_PATTERN = '/^[A-Za-z0-9._-]{1,128}$/';

    /**
     * @param string $level   one of the LEVEL_* values (case-insensitive; stored upper-cased)
     * @param string $type    one of the TYPE_* values; used as the routing-key suffix
     * @param string $source  name of the producing system
     * @param string $message human-readable message
     * @param array  $context structured data describing the event
     * @param array  $extra   additional free-form data
     */
    public function __construct(
        string $level,
        string $type,
        string $source,
        string $message,
        array $context = [],
        array $extra = []
    ) {
        $this->messageId = $this->generateMessageId();
        // date() only sees whole seconds, so its "u" specifier always prints 000000;
        // DateTimeImmutable('now') carries the real microseconds.
        $this->timestamp = (new \DateTimeImmutable())->format('Y-m-d\TH:i:s.uP');
        $this->level = strtoupper($level);
        $this->type = $type;
        $this->source = $source;
        $this->message = $message;
        $this->context = $context;
        $this->extra = $extra;
        $this->traceId = $this->getTraceId();
        $this->spanId = $this->generateSpanId();
        $this->hostname = (string) gethostname();
        $this->pid = (int) getmypid();
    }

    private function generateMessageId(): string
    {
        return sprintf('log_%s_%s', date('YmdHis'), bin2hex(random_bytes(8)));
    }

    /**
     * Trace id: a well-formed X-Trace-Id request header first, then $GLOBALS['trace_id'],
     * otherwise a new random 128-bit hex id.
     */
    private function getTraceId(): string
    {
        // The header is client-controlled; reject anything that could forge or break log
        // lines (newlines, quotes, huge values).
        $header = $_SERVER['HTTP_X_TRACE_ID'] ?? null;
        if (is_string($header) && preg_match(self::TRACE_ID_PATTERN, $header) === 1) {
            return $header;
        }
        $global = $GLOBALS['trace_id'] ?? null;
        if (is_scalar($global) && (string) $global !== '') {
            return (string) $global;
        }
        return bin2hex(random_bytes(16));
    }

    private function generateSpanId(): string
    {
        return bin2hex(random_bytes(8));
    }

    public function addContext(string $key, $value): self
    {
        $this->context[$key] = $value;
        return $this;
    }

    public function addExtra(string $key, $value): self
    {
        $this->extra[$key] = $value;
        return $this;
    }

    public function toArray(): array
    {
        return [
            'message_id' => $this->messageId,
            'timestamp' => $this->timestamp,
            'level' => $this->level,
            'type' => $this->type,
            'source' => $this->source,
            'message' => $this->message,
            'context' => $this->context,
            'extra' => $this->extra,
            'trace_id' => $this->traceId,
            'span_id' => $this->spanId,
            'hostname' => $this->hostname,
            'pid' => $this->pid,
        ];
    }

    /**
     * JSON form used as the AMQP message body.
     *
     * Invalid UTF-8 (common in raw log text) is replaced with U+FFFD and other
     * unencodable values are nulled, instead of the whole record turning into "".
     *
     * @throws \RuntimeException when the record cannot be encoded at all (e.g. nesting too deep)
     */
    public function toJson(): string
    {
        $json = json_encode(
            $this->toArray(),
            JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES | JSON_INVALID_UTF8_SUBSTITUTE | JSON_PARTIAL_OUTPUT_ON_ERROR
        );
        if ($json === false) {
            throw new \RuntimeException('Failed to encode log message: ' . json_last_error_msg());
        }
        return $json;
    }

    /**
     * Rebuilds a record from toArray() output (e.g. a decoded message body).
     *
     * @throws \InvalidArgumentException when level, type, source or message is missing or not scalar
     */
    public static function fromArray(array $data): self
    {
        foreach (['level', 'type', 'source', 'message'] as $field) {
            if (!isset($data[$field]) || !is_scalar($data[$field])) {
                throw new \InvalidArgumentException("Log payload is missing required field \"{$field}\"");
            }
        }

        $log = new self(
            (string) $data['level'],
            (string) $data['type'],
            (string) $data['source'],
            (string) $data['message'],
            is_array($data['context'] ?? null) ? $data['context'] : [],
            is_array($data['extra'] ?? null) ? $data['extra'] : []
        );

        // Restore the producer-side identity; keep the freshly generated values only when absent.
        $log->messageId = (string) ($data['message_id'] ?? $log->messageId);
        $log->timestamp = (string) ($data['timestamp'] ?? $log->timestamp);
        $log->traceId = (string) ($data['trace_id'] ?? '');
        $log->spanId = (string) ($data['span_id'] ?? '');
        if (isset($data['hostname'])) {
            $log->hostname = (string) $data['hostname'];
        }
        if (isset($data['pid'])) {
            $log->pid = (int) $data['pid'];
        }
        return $log;
    }
}
日志生产者实现
php
<?php
namespace App\Messaging\Log;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;
/**
 * Publishes LogMessage records to the "log.exchange" topic exchange with routing key "log.{type}".
 *
 * send() buffers records. They are published once bufferSize records are waiting, or once
 * bufferTimeout seconds have passed since the last flush. sendImmediate() publishes at once.
 * The constructor declares:
 * - both exchanges;
 * - one durable queue per log type, bound to "log.{type}";
 * - the dead-letter queue "log.dlq".
 *
 * NOTE(review): publishing is fire-and-forget (no publisher confirms, no mandatory flag).
 * Broker-side failures and records whose type has no bound queue are dropped silently.
 * Enable confirm_select() if the "no log loss" requirement must hold end-to-end.
 * NOTE(review): the topology is re-declared on every instantiation. That costs several
 * round-trips for short-lived (e.g. per-request) producers.
 */
class LogProducer
{
    private AMQPStreamConnection $connection;
    private $channel;
    private string $exchangeName = 'log.exchange';
    private string $deadLetterExchange = 'log.dlx';
    /** @var array<int, LogMessage> records waiting for flush() */
    private array $buffer = [];
    private int $bufferSize = 100;
    private float $bufferTimeout = 1.0;
    private float $lastFlushTime = 0;

    /**
     * Largest x-message-ttl RabbitMQ accepts (2^32 - 1 ms, about 49.7 days). Larger values
     * make queue_declare fail.
     * NOTE(review): taken from RabbitMQ's argument validation; confirm for the broker version in use.
     */
    private const MAX_MESSAGE_TTL_MS = 4294967295;

    /**
     * Per-type queue settings, keyed by the LogMessage::TYPE_* value used as routing-key suffix.
     * ttl is in milliseconds.
     */
    private const QUEUE_CONFIG = [
        LogMessage::TYPE_ACCESS => [
            'queue' => 'log.access',
            'ttl' => 86400000,
            'max_length' => 10000000,
        ],
        LogMessage::TYPE_BUSINESS => [
            'queue' => 'log.business',
            'ttl' => 604800000,
            'max_length' => 5000000,
        ],
        LogMessage::TYPE_ERROR => [
            'queue' => 'log.error',
            'ttl' => 2592000000,
            'max_length' => 1000000,
        ],
        LogMessage::TYPE_AUDIT => [
            'queue' => 'log.audit',
            'ttl' => 7776000000,
            'max_length' => 5000000,
        ],
        LogMessage::TYPE_SYSTEM => [
            'queue' => 'log.system',
            'ttl' => 2592000000,
            'max_length' => 2000000,
        ],
    ];

    public function __construct(AMQPStreamConnection $connection, int $bufferSize = 100)
    {
        $this->connection = $connection;
        $this->channel = $connection->channel();
        $this->bufferSize = $bufferSize;
        $this->lastFlushTime = microtime(true);
        $this->setupInfrastructure();
    }

    /** Declares the exchanges, the per-type queues with their bindings, and the dead-letter queue. */
    private function setupInfrastructure(): void
    {
        // Main topic exchange: durable, not auto-deleted.
        $this->channel->exchange_declare(
            $this->exchangeName,
            AMQPExchangeType::TOPIC,
            false,
            true,
            false
        );
        // Receives records that the per-type queues reject, expire or drop for length.
        $this->channel->exchange_declare(
            $this->deadLetterExchange,
            AMQPExchangeType::DIRECT,
            false,
            true,
            false
        );

        foreach (self::QUEUE_CONFIG as $type => $config) {
            $args = [
                'x-dead-letter-exchange' => ['S', $this->deadLetterExchange],
                'x-dead-letter-routing-key' => ['S', 'log.failed'],
                // 'l' is a signed 64-bit table value. 'I' is only 32-bit and cannot hold TTLs
                // such as 2592000000 ms (30 days).
                'x-message-ttl' => ['l', min($config['ttl'], self::MAX_MESSAGE_TTL_MS)],
                'x-max-length' => ['l', $config['max_length']],
                // NOTE(review): RabbitMQ 3.12+ ignores x-queue-mode for classic queues.
                'x-queue-mode' => ['S', 'lazy'],
            ];
            $this->channel->queue_declare(
                $config['queue'],
                false,
                true,
                false,
                false,
                false,
                $args
            );
            $this->channel->queue_bind(
                $config['queue'],
                $this->exchangeName,
                "log.{$type}"
            );
        }

        $this->channel->queue_declare('log.dlq', false, true, false, false);
        $this->channel->queue_bind('log.dlq', $this->deadLetterExchange, 'log.failed');
    }

    /**
     * Buffers a record. The buffer is flushed when it reaches bufferSize, and also when
     * bufferTimeout has elapsed since the last flush. The second trigger keeps a slow trickle
     * of logs from being held indefinitely when the caller never invokes autoFlush().
     */
    public function send(LogMessage $log): void
    {
        $this->buffer[] = $log;
        $isFull = count($this->buffer) >= $this->bufferSize;
        $isStale = (microtime(true) - $this->lastFlushTime) >= $this->bufferTimeout;
        if ($isFull || $isStale) {
            $this->flush();
        }
    }

    /** Publishes one record right away as a persistent JSON message. */
    public function sendImmediate(LogMessage $log): void
    {
        $routingKey = "log.{$log->type}";
        $message = new AMQPMessage(
            $log->toJson(),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'message_id' => $log->messageId,
                'timestamp' => time(),
                // AMQPMessage discards unknown property names such as "headers". Custom headers
                // must be sent as "application_headers" wrapped in an AMQPTable.
                'application_headers' => new \PhpAmqpLib\Wire\AMQPTable([
                    'level' => $log->level,
                    'type' => $log->type,
                    'source' => $log->source,
                ]),
            ]
        );
        $this->channel->basic_publish(
            $message,
            $this->exchangeName,
            $routingKey
        );
    }

    /** Publishes every buffered record in order. */
    public function flush(): void
    {
        if (empty($this->buffer)) {
            return;
        }
        foreach ($this->buffer as $index => $log) {
            $this->sendImmediate($log);
            // Drop each entry once it is published. A failure part-way through then leaves only
            // the unsent entries for the next flush, so nothing is published twice.
            unset($this->buffer[$index]);
        }
        $this->buffer = [];
        $this->lastFlushTime = microtime(true);
    }

    /** Flushes the buffer when it is non-empty and bufferTimeout has elapsed since the last flush. */
    public function autoFlush(): void
    {
        $now = microtime(true);
        if (($now - $this->lastFlushTime) >= $this->bufferTimeout && !empty($this->buffer)) {
            $this->flush();
        }
    }

    /** Flushes pending records and closes this producer's channel (not the shared connection). */
    public function close(): void
    {
        try {
            $this->flush();
        } finally {
            if ($this->channel) {
                $this->channel->close();
            }
        }
    }
}
日志消费者实现
php
<?php
namespace App\Messaging\Log;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
/**
 * Consumes LogMessage JSON from one queue, batches the records and bulk-indexes them into
 * Elasticsearch. Each delivery is acknowledged only once its outcome is known.
 *
 * - Malformed payloads are rejected without requeue. When the queue has a dead-letter
 *   exchange (LogProducer declares log.dlx -> log.dlq), they land there.
 * - ERROR/FATAL records are passed to the AlertService. A failure while alerting is logged
 *   and never drops the record or acknowledges it twice.
 * - A batch is flushed when it reaches batch_size entries, or when its first entry has
 *   waited batch_timeout seconds.
 */
class LogConsumer
{
    private AMQPStreamConnection $connection;
    private $channel;
    // The storage/alert services live in App\Services\Log. Unqualified names would resolve
    // to App\Messaging\Log and reject the real instances.
    private \App\Services\Log\ElasticsearchClient $esClient;
    // NOTE(review): not used by this class yet, and ClickHouseClient is not defined alongside
    // the other services. Confirm its namespace before wiring it in.
    private ?ClickHouseClient $clickHouseClient;
    private \App\Services\Log\AlertService $alertService;
    /** @var list<array{log: LogMessage, message: AMQPMessage}> */
    private array $batchBuffer = [];
    private int $batchSize = 1000;
    private float $batchTimeout = 5.0;
    /** When the current batch got its first entry (or when the last flush ran). */
    private float $lastBatchTime = 0;
    private bool $running = true;

    /**
     * @param array $config batch_size (int, default 1000), batch_timeout (seconds, default 5.0)
     */
    public function __construct(
        AMQPStreamConnection $connection,
        \App\Services\Log\ElasticsearchClient $esClient,
        ?ClickHouseClient $clickHouseClient,
        \App\Services\Log\AlertService $alertService,
        array $config = []
    ) {
        $this->connection = $connection;
        $this->channel = $connection->channel();
        $this->esClient = $esClient;
        $this->clickHouseClient = $clickHouseClient;
        $this->alertService = $alertService;
        $this->batchSize = max(1, (int) ($config['batch_size'] ?? 1000));
        $this->batchTimeout = (float) ($config['batch_timeout'] ?? 5.0);
        $this->lastBatchTime = microtime(true);
    }

    /** Consumes $queueName until stop() is called or the consumer is cancelled, then flushes. */
    public function consume(string $queueName): void
    {
        // prefetch_count is a 16-bit field on the wire.
        $this->channel->basic_qos(null, min($this->batchSize * 2, 65535), null);
        $this->channel->basic_consume(
            $queueName,
            '',
            false,
            false,
            false,
            false,
            [$this, 'processMessage']
        );

        while ($this->running && $this->channel->is_consuming()) {
            try {
                // Block for at most one second. A non-blocking wait would return at once and
                // spin this loop at full CPU. The timeout gives us regular chances to flush an
                // aged batch and to notice stop() from a signal handler.
                $this->channel->wait(null, false, 1);
            } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) {
                // No delivery within the timeout; fall through to the batch-age check.
            }
            $this->checkBatchTimeout();
        }

        $this->flushBatch();
    }

    /** basic_consume callback: decodes one delivery, alerts if needed, and adds it to the batch. */
    public function processMessage(AMQPMessage $message): void
    {
        try {
            $logData = json_decode($message->body, true, 512, JSON_THROW_ON_ERROR);
            if (!is_array($logData)) {
                throw new \UnexpectedValueException('Log payload is not a JSON object');
            }
            $log = LogMessage::fromArray($logData);
        } catch (\Throwable $e) {
            // \Throwable: fromArray() can raise a TypeError on missing or mistyped fields, and
            // catching only \Exception would let that escape and kill the consumer loop.
            error_log("Log consumer error: " . $e->getMessage());
            $message->nack(false, false);
            return;
        }

        if ($log->level === LogMessage::LEVEL_ERROR || $log->level === LogMessage::LEVEL_FATAL) {
            try {
                $this->alertService->checkAndAlert($log);
            } catch (\Throwable $e) {
                // Alerting is best-effort. The record is still indexed and acknowledged once.
                error_log("Log alert error: " . $e->getMessage());
            }
        }

        if ($this->batchBuffer === []) {
            // Batch age is measured from its first entry, not from the previous flush.
            $this->lastBatchTime = microtime(true);
        }
        $this->batchBuffer[] = [
            'log' => $log,
            'message' => $message,
        ];

        if (count($this->batchBuffer) >= $this->batchSize) {
            $this->flushBatch();
        }
    }

    private function checkBatchTimeout(): void
    {
        $now = microtime(true);
        if (($now - $this->lastBatchTime) >= $this->batchTimeout && !empty($this->batchBuffer)) {
            $this->flushBatch();
        }
    }

    /** Bulk-indexes the current batch and settles every delivery in it. */
    private function flushBatch(): void
    {
        if (empty($this->batchBuffer)) {
            return;
        }
        $batch = $this->batchBuffer;
        $this->batchBuffer = [];
        $this->lastBatchTime = microtime(true);

        $logs = array_map(static fn (array $entry): array => $entry['log']->toArray(), $batch);

        try {
            $response = $this->esClient->bulkIndex($logs);
        } catch (\Throwable $e) {
            error_log("Failed to flush batch: " . $e->getMessage());
            // NOTE(review): immediate requeue redelivers straight away. While Elasticsearch is
            // down this cycles quickly; consider a delayed retry queue.
            foreach ($batch as $entry) {
                $entry['message']->nack(false, true);
            }
            return;
        }

        $this->settleBatch($batch, $response);
    }

    /**
     * Acks or nacks each delivery according to its bulk result.
     *
     * The bulk API reports per-document failures in the response ("errors" flag plus one
     * item per request entry, in order) instead of throwing. Without this check, failed
     * documents would be acknowledged and lost.
     * - Unknown outcome, throttling (429) and server errors (5xx) are requeued.
     * - Other failures, e.g. mapping conflicts, are rejected without requeue so they
     *   dead-letter instead of looping.
     */
    private function settleBatch(array $batch, array $response): void
    {
        $hasErrors = !empty($response['errors']);
        $items = $response['items'] ?? [];

        foreach (array_values($batch) as $position => $entry) {
            $message = $entry['message'];
            if (!$hasErrors) {
                $message->ack();
                continue;
            }

            $result = $items[$position]['index'] ?? null;
            $status = isset($result['status']) ? (int) $result['status'] : 0;

            if ($status >= 200 && $status < 300) {
                $message->ack();
            } elseif ($status === 0 || $status === 429 || $status >= 500) {
                $message->nack(false, true);
            } else {
                error_log(sprintf(
                    'Log rejected by Elasticsearch (HTTP %d): %s',
                    $status,
                    (string) ($result['error']['reason'] ?? 'unknown')
                ));
                $message->nack(false, false);
            }
        }
    }

    /** Asks consume() to return after the current wait cycle; safe to call from a signal handler. */
    public function stop(): void
    {
        $this->running = false;
    }

    /** Stops consuming, flushes pending records and closes this consumer's channel. */
    public function close(): void
    {
        $this->stop();
        try {
            $this->flushBatch();
        } finally {
            if ($this->channel) {
                $this->channel->close();
            }
        }
    }
}
日志处理器实现
php
<?php
namespace App\Services\Log;
/**
 * Runs a raw log record through three stages, in order:
 * 1. the parser registered for its "format" key (if any);
 * 2. every enricher;
 * 3. every filter.
 * A record rejected by any filter is returned as an empty array.
 */
class LogProcessor
{
    /** @var array<string, object> parsers keyed by format name; each exposes parse(array): array */
    private array $parsers = [];
    /** @var list<object> each exposes accept(array): bool */
    private array $filters = [];
    /** @var list<object> each exposes enrich(array): array */
    private array $enrichers = [];

    /**
     * @param string|null $geoIpDbPath MaxMind database handed to GeoIpEnricher. Without it,
     *                                 GeoIP enrichment is skipped.
     */
    public function __construct(?string $geoIpDbPath = null)
    {
        $this->registerDefaultProcessors($geoIpDbPath);
    }

    /**
     * NOTE(review): ApacheLogParser, SyslogParser and HostnameEnricher are referenced here but
     * not defined next to the other parsers/enrichers. Confirm they exist before deploying.
     */
    private function registerDefaultProcessors(?string $geoIpDbPath = null): void
    {
        $this->parsers = [
            'nginx' => new NginxLogParser(),
            'apache' => new ApacheLogParser(),
            'json' => new JsonLogParser(),
            'syslog' => new SyslogParser(),
        ];
        $this->enrichers = [
            new GeoIpEnricher($geoIpDbPath),
            new UserAgentEnricher(),
            new HostnameEnricher(),
        ];
    }

    /** Registers (or replaces) the parser used for records whose "format" equals $format. */
    public function addParser(string $format, object $parser): self
    {
        $this->parsers[$format] = $parser;
        return $this;
    }

    /** Appends an enricher; enrichers run in registration order. */
    public function addEnricher(object $enricher): self
    {
        $this->enrichers[] = $enricher;
        return $this;
    }

    /** Appends a filter. A record is dropped as soon as one filter's accept() returns false. */
    public function addFilter(object $filter): self
    {
        $this->filters[] = $filter;
        return $this;
    }

    /**
     * @param array $rawLog record with optional "format" (parser key) and parser-specific fields
     * @return array the processed record, or [] when a filter rejected it
     */
    public function process(array $rawLog): array
    {
        $log = $rawLog;
        $format = $log['format'] ?? null;
        // Only string formats can name a parser; other types would be illegal array offsets.
        if (is_string($format) && isset($this->parsers[$format])) {
            $log = $this->parsers[$format]->parse($log);
        }
        foreach ($this->enrichers as $enricher) {
            $log = $enricher->enrich($log);
        }
        foreach ($this->filters as $filter) {
            if (!$filter->accept($log)) {
                return [];
            }
        }
        return $log;
    }
}
/**
 * Parses an nginx "combined"-format line stored in $log['raw_message'].
 *
 * On a match it adds: client_ip, user, timestamp, method, path, status, size, referer and
 * user_agent. "-" user/referer values become null, and the timestamp is rewritten as
 * ISO-8601. Records without raw_message, or whose line does not match, are returned unchanged.
 */
class NginxLogParser
{
    private const PATTERN = '/^(\S+) \S+ (\S+) \[([^\]]+)\] "(\S+) ([^"]+) HTTP\/[^"]+" (\d+) (\d+) "([^"]*)" "([^"]*)"/';

    public function parse(array $log): array
    {
        $line = $log['raw_message'] ?? null;
        if ($line === null || !preg_match(self::PATTERN, $line, $groups)) {
            return $log;
        }

        [, $ip, $user, $time, $method, $path, $status, $size, $referer, $agent] = $groups;

        $log['client_ip'] = $ip;
        $log['user'] = ($user !== '-') ? $user : null;
        $log['timestamp'] = $this->toIso8601($time);
        $log['method'] = $method;
        $log['path'] = $path;
        $log['status'] = (int) $status;
        $log['size'] = (int) $size;
        $log['referer'] = ($referer !== '-') ? $referer : null;
        $log['user_agent'] = $agent;

        return $log;
    }

    /** Converts "10/Oct/2000:13:55:36 -0700" to ISO-8601; unparsable input is returned as-is. */
    private function toIso8601(string $raw): string
    {
        $parsed = \DateTimeImmutable::createFromFormat('d/M/Y:H:i:s O', $raw);
        if ($parsed === false) {
            return $raw;
        }
        return $parsed->format('Y-m-d\TH:i:sP');
    }
}
/**
 * Expands a JSON object stored in $log['raw_message'] into top-level fields of the record.
 *
 * Decoded keys overwrite existing ones, and raw_message is removed. When raw_message is
 * absent, not valid JSON, or does not decode to an array, the record is returned unchanged.
 */
class JsonLogParser
{
    public function parse(array $log): array
    {
        if (isset($log['raw_message'])) {
            $decoded = json_decode($log['raw_message'], true);
            $usable = json_last_error() === JSON_ERROR_NONE && is_array($decoded);
            if ($usable) {
                $log = array_merge($log, $decoded);
                unset($log['raw_message']);
            }
        }
        return $log;
    }
}
/**
 * Adds country/city/coordinates for $log['client_ip'] from a MaxMind database under "geo".
 * Enrichment is skipped when no readable database path was given.
 */
class GeoIpEnricher
{
    /** MaxMind reader, or null when enrichment is disabled. */
    private $geoIpReader;

    public function __construct(?string $geoIpDbPath = null)
    {
        if ($geoIpDbPath && file_exists($geoIpDbPath)) {
            $this->geoIpReader = new \MaxMind\Db\Reader($geoIpDbPath);
        }
    }

    /**
     * Adds geo.country, geo.city, geo.latitude and geo.longitude. When both coordinates are
     * known it also adds geo.location as {lat, lon}, the field the log index template maps
     * as geo_point.
     */
    public function enrich(array $log): array
    {
        if (!isset($log['client_ip']) || $this->geoIpReader === null) {
            return $log;
        }

        try {
            $geoData = $this->geoIpReader->get($log['client_ip']);
        } catch (\Exception $e) {
            // Deliberately best-effort: an unparsable IP or a reader error must not block the record.
            return $log;
        }

        if (!is_array($geoData) || $geoData === []) {
            return $log;
        }

        $latitude = $geoData['location']['latitude'] ?? null;
        $longitude = $geoData['location']['longitude'] ?? null;

        $log['geo'] = [
            'country' => $geoData['country']['iso_code'] ?? null,
            'city' => $geoData['city']['names']['en'] ?? null,
            'latitude' => $latitude,
            'longitude' => $longitude,
        ];
        if ($latitude !== null && $longitude !== null) {
            $log['geo']['location'] = ['lat' => (float) $latitude, 'lon' => (float) $longitude];
        }

        return $log;
    }
}
/**
 * Adds browser, browser_version and platform parsed from $log['user_agent'] under the "ua" key.
 *
 * NOTE(review): relies on a parse_user_agent() function (presumably from the
 * donatj/phpuseragent package). Confirm it is installed and reachable from this namespace.
 */
class UserAgentEnricher
{
    public function enrich(array $log): array
    {
        if (isset($log['user_agent'])) {
            $parsed = parse_user_agent($log['user_agent']);
            $log['ua'] = [
                'browser' => $parsed['browser'] ?? null,
                'browser_version' => $parsed['version'] ?? null,
                'platform' => $parsed['platform'] ?? null,
            ];
        }
        return $log;
    }
}
Elasticsearch 客户端封装
php
<?php
namespace App\Services\Log;
/**
 * Thin wrapper around the elasticsearch-php client.
 *
 * Log records are written to daily, per-type indices named "{prefix}-{type}-{Y.m.d}".
 * The class also offers simple search helpers over "{prefix}-*".
 */
class ElasticsearchClient
{
    private $client;
    private string $indexPrefix;
    private array $indexTemplates;

    /**
     * @param array $config hosts (required), index_prefix (default "logs"), templates (stored, unused here)
     */
    public function __construct(array $config)
    {
        $this->client = \Elasticsearch\ClientBuilder::create()
            ->setHosts($config['hosts'])
            ->setRetries(3)
            ->build();
        $this->indexPrefix = $config['index_prefix'] ?? 'logs';
        $this->indexTemplates = $config['templates'] ?? [];
        $this->setupIndexTemplates();
    }

    /**
     * Installs (or overwrites) the index template covering "{prefix}-*".
     *
     * The template name is derived from the prefix, so clients configured with different
     * prefixes no longer overwrite each other's template. The default prefix "logs" still
     * yields the original name "logs_template".
     * NOTE(review): putTemplate() targets the legacy _template API, deprecated since ES 7.8
     * in favour of composable index templates (putIndexTemplate).
     */
    private function setupIndexTemplates(): void
    {
        $template = [
            'index_patterns' => ["{$this->indexPrefix}-*"],
            'settings' => [
                'number_of_shards' => 3,
                'number_of_replicas' => 1,
                'refresh_interval' => '5s',
            ],
            'mappings' => [
                'properties' => [
                    'timestamp' => ['type' => 'date'],
                    'level' => ['type' => 'keyword'],
                    'type' => ['type' => 'keyword'],
                    'source' => ['type' => 'keyword'],
                    'message' => ['type' => 'text'],
                    'trace_id' => ['type' => 'keyword'],
                    'span_id' => ['type' => 'keyword'],
                    'hostname' => ['type' => 'keyword'],
                    'client_ip' => ['type' => 'ip'],
                    'geo' => [
                        'properties' => [
                            'location' => ['type' => 'geo_point'],
                        ],
                    ],
                ],
            ],
        ];

        $this->client->indices()->putTemplate([
            'name' => "{$this->indexPrefix}_template",
            'body' => $template,
        ]);
    }

    /** Indexes one record into its daily per-type index. */
    public function index(array $log): array
    {
        $params = [
            'index' => $this->getIndexName($log),
            'body' => $log,
        ];
        return $this->client->index($params);
    }

    /**
     * Indexes many records in one _bulk request and returns the raw bulk response.
     * Callers must inspect "errors"/"items" for per-document failures.
     * An empty input returns an empty, error-free result without a request, because the
     * bulk endpoint rejects an empty body.
     */
    public function bulkIndex(array $logs): array
    {
        if ($logs === []) {
            return ['took' => 0, 'errors' => false, 'items' => []];
        }

        $params = ['body' => []];
        foreach ($logs as $log) {
            $params['body'][] = [
                'index' => [
                    '_index' => $this->getIndexName($log),
                ],
            ];
            $params['body'][] = $log;
        }
        return $this->client->bulk($params);
    }

    /** "{prefix}-{type}-{Y.m.d}" for a record. */
    private function getIndexName(array $log): string
    {
        $type = $this->sanitizeIndexPart((string) ($log['type'] ?? 'default'));
        return "{$this->indexPrefix}-{$type}-{$this->indexDate($log)}";
    }

    /**
     * Lower-cases the value and replaces characters that are unsafe in Elasticsearch index
     * names (upper case, spaces, / * ? " < > | , # and similar) with "_".
     */
    private function sanitizeIndexPart(string $value): string
    {
        $clean = preg_replace('/[^a-z0-9._-]+/', '_', strtolower($value));
        return ($clean === null || $clean === '') ? 'default' : $clean;
    }

    /**
     * Day bucket (Y.m.d) taken from the record's own "timestamp". Late or redelivered records
     * therefore land in the index of the day they were produced. Falls back to the current
     * time when the timestamp is missing or unparsable. Days are expressed in PHP's default
     * timezone, as the previous date() call was.
     */
    private function indexDate(array $log): string
    {
        $timezone = new \DateTimeZone(date_default_timezone_get());
        $raw = $log['timestamp'] ?? null;
        if (is_string($raw) && $raw !== '') {
            try {
                return (new \DateTimeImmutable($raw))->setTimezone($timezone)->format('Y.m.d');
            } catch (\Exception $e) {
                // Unparsable timestamp: use the current day below.
            }
        }
        return (new \DateTimeImmutable('now', $timezone))->format('Y.m.d');
    }

    /** Runs a raw query body against every "{prefix}-*" index. */
    public function search(array $query): array
    {
        $params = [
            'index' => "{$this->indexPrefix}-*",
            'body' => $query,
        ];
        return $this->client->search($params);
    }

    /** Up to 1000 records sharing a trace id, oldest first. */
    public function searchByTraceId(string $traceId): array
    {
        return $this->search([
            'query' => [
                'term' => ['trace_id' => $traceId],
            ],
            'sort' => ['timestamp' => 'asc'],
            'size' => 1000,
        ]);
    }
}
告警服务实现
php
<?php
namespace App\Services\Log;
/**
 * Matches log records against alert rules and fans matching alerts out to notifiers.
 * Repeats of the same (rule, source, message) are throttled through Redis.
 *
 * Rule keys read here:
 * - name (required);
 * - level, source, pattern (PCRE): optional match conditions;
 * - throttle (seconds, default 300);
 * - channels (notifier keys, default ['default']).
 */
class AlertService
{
    private array $rules;
    /** @var array<string, object> notifiers keyed by channel; each exposes notify(array): void */
    private array $notifiers;
    /**
     * Redis client, or null to disable throttling.
     * NOTE(review): uses phpredis' set($key, $value, ['nx', 'ex' => $ttl]) signature.
     */
    private $redis;

    public function __construct(array $rules, array $notifiers, $redis = null)
    {
        $this->rules = $rules;
        $this->notifiers = $notifiers;
        $this->redis = $redis;
    }

    /**
     * Evaluates every rule against the record and raises an alert for each match.
     * The record class lives in App\Messaging\Log. An unqualified LogMessage would resolve
     * to this namespace and reject real records.
     */
    public function checkAndAlert(\App\Messaging\Log\LogMessage $log): void
    {
        foreach ($this->rules as $rule) {
            if ($this->matchRule($log, $rule)) {
                $this->processAlert($log, $rule);
            }
        }
    }

    /** True when every condition present in the rule (level, source, pattern) holds. */
    private function matchRule(\App\Messaging\Log\LogMessage $log, array $rule): bool
    {
        if (isset($rule['level']) && $log->level !== $rule['level']) {
            return false;
        }
        if (isset($rule['source']) && $log->source !== $rule['source']) {
            return false;
        }
        if (isset($rule['pattern']) && !preg_match($rule['pattern'], $log->message)) {
            return false;
        }
        return true;
    }

    /** Sends one alert to each configured channel, unless the throttle window is still open. */
    private function processAlert(\App\Messaging\Log\LogMessage $log, array $rule): void
    {
        $throttle = (int) ($rule['throttle'] ?? 300);
        // Claim the throttle window before notifying. The check-and-set is one atomic Redis
        // command, so concurrent consumers cannot both send the same alert.
        // Trade-off: if every notifier fails, the alert stays suppressed for the window.
        if (!$this->acquireThrottle($this->getAlertKey($log, $rule), $throttle)) {
            return;
        }

        $alert = [
            'id' => uniqid('alert_', true),
            'rule_name' => $rule['name'],
            'log' => $log->toArray(),
            'triggered_at' => date('Y-m-d H:i:s'),
        ];

        foreach ($rule['channels'] ?? ['default'] as $channel) {
            $notifier = $this->notifiers[$channel] ?? null;
            if ($notifier === null) {
                // Channels without a registered notifier are skipped.
                continue;
            }
            try {
                $notifier->notify($alert);
            } catch (\Throwable $e) {
                // A failing channel must neither block the others nor propagate to the caller.
                error_log(sprintf('Alert notifier "%s" failed: %s', $channel, $e->getMessage()));
            }
        }
    }

    private function getAlertKey(\App\Messaging\Log\LogMessage $log, array $rule): string
    {
        return sprintf(
            'alert:%s:%s:%s',
            $rule['name'],
            $log->source,
            md5($log->message)
        );
    }

    /**
     * Atomically opens a throttle window of $seconds for $key.
     *
     * @return bool true when the caller may send the alert (window was free, throttling is
     *              disabled, or the window length is not positive); false while throttled
     */
    private function acquireThrottle(string $key, int $seconds): bool
    {
        if (!$this->redis || $seconds <= 0) {
            return true;
        }
        return (bool) $this->redis->set($key, 1, ['nx', 'ex' => $seconds]);
    }
}
/**
 * Posts alerts built by AlertService to a Slack incoming webhook as a colored attachment.
 * Delivery failures are written to the PHP error log; notify() does not throw for them.
 */
class SlackNotifier
{
    private string $webhookUrl;

    public function __construct(string $webhookUrl)
    {
        $this->webhookUrl = $webhookUrl;
    }

    /** @param array $alert keys rule_name, triggered_at and log (a LogMessage::toArray() result) */
    public function notify(array $alert): void
    {
        $this->send($this->buildPayload($alert));
    }

    /** Slack attachment payload for one alert. */
    private function buildPayload(array $alert): array
    {
        $log = $alert['log'];
        return [
            'attachments' => [
                [
                    'color' => $this->getColor($log['level']),
                    'title' => "🚨 Alert: {$alert['rule_name']}",
                    'fields' => [
                        [
                            'title' => 'Level',
                            'value' => $log['level'],
                            'short' => true,
                        ],
                        [
                            'title' => 'Source',
                            'value' => $log['source'],
                            'short' => true,
                        ],
                        [
                            'title' => 'Message',
                            // Truncate by character, not byte. substr() can cut a multi-byte
                            // (e.g. Chinese) character in half, and json_encode() then rejects
                            // the whole payload.
                            'value' => mb_substr((string) $log['message'], 0, 500, 'UTF-8'),
                            'short' => false,
                        ],
                        [
                            'title' => 'Trace ID',
                            'value' => $log['trace_id'],
                            'short' => true,
                        ],
                    ],
                    'footer' => $alert['triggered_at'],
                ],
            ],
        ];
    }

    private function getColor(string $level): string
    {
        $colors = [
            'DEBUG' => '#95a5a6',
            'INFO' => '#3498db',
            'WARN' => '#f39c12',
            'ERROR' => '#e74c3c',
            'FATAL' => '#8e44ad',
        ];
        return $colors[$level] ?? '#95a5a6';
    }

    /** POSTs the payload as JSON; logs encoding, transport and HTTP >= 400 failures. */
    private function send(array $payload): void
    {
        $body = json_encode($payload, JSON_INVALID_UTF8_SUBSTITUTE);
        if ($body === false) {
            error_log('Slack alert not sent: ' . json_last_error_msg());
            return;
        }

        $ch = curl_init($this->webhookUrl);
        curl_setopt_array($ch, [
            CURLOPT_POST => true,
            CURLOPT_POSTFIELDS => $body,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
            CURLOPT_TIMEOUT => 10,
        ]);
        $response = curl_exec($ch);
        $status = (int) curl_getinfo($ch, CURLINFO_RESPONSE_CODE);
        if ($response === false || $status >= 400) {
            error_log(sprintf(
                'Slack alert delivery failed (HTTP %d): %s',
                $status,
                $response === false ? curl_error($ch) : (string) $response
            ));
        }
        curl_close($ch);
    }
}
完整使用示例
php
<?php
require_once 'vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use App\Messaging\Log\{LogMessage, LogProducer, LogConsumer};
use App\Services\Log\{ElasticsearchClient, AlertService, SlackNotifier};
// Connection settings can be overridden through environment variables. The defaults target
// a local broker with the stock guest account.
$connection = new AMQPStreamConnection(
    getenv('RABBITMQ_HOST') ?: 'localhost',
    (int) (getenv('RABBITMQ_PORT') ?: 5672),
    getenv('RABBITMQ_USER') ?: 'guest',
    getenv('RABBITMQ_PASSWORD') ?: 'guest',
    getenv('RABBITMQ_VHOST') ?: '/',
    false,
    'AMQPLAIN',
    null,
    'en_US',
    3.0, // connection timeout (seconds)
    3.0  // read/write timeout (seconds)
);

$producer = new LogProducer($connection, 50);

// Safety net for early termination (uncaught exception, exit): publish whatever is still
// buffered. It is skipped after a normal run, because the producer and then the connection
// are closed explicitly at the end of the script.
register_shutdown_function(function () use ($producer, $connection) {
    if ($connection->isConnected()) {
        $producer->close();
    }
});

/** Buffers an INFO access-log record for the web application. */
function log_access(string $method, string $path, int $status, array $extra = []): void
{
    global $producer;
    $log = new LogMessage(
        LogMessage::LEVEL_INFO,
        LogMessage::TYPE_ACCESS,
        'web-app',
        sprintf('%s %s - %d', $method, $path, $status),
        [
            'method' => $method,
            'path' => $path,
            'status' => $status,
        ],
        $extra
    );
    $producer->send($log);
}

/** Publishes an ERROR record describing $e immediately (bypassing the buffer). */
function log_error(\Throwable $e, array $context = []): void
{
    global $producer;
    $log = new LogMessage(
        LogMessage::LEVEL_ERROR,
        LogMessage::TYPE_ERROR,
        'web-app',
        $e->getMessage(),
        array_merge([
            'exception' => get_class($e),
            'file' => $e->getFile(),
            'line' => $e->getLine(),
            'trace' => $e->getTraceAsString(),
        ], $context)
    );
    $producer->sendImmediate($log);
}

/** Buffers an INFO business-event record. */
function log_business(string $action, array $data = []): void
{
    global $producer;
    $log = new LogMessage(
        LogMessage::LEVEL_INFO,
        LogMessage::TYPE_BUSINESS,
        'web-app',
        $action,
        $data
    );
    $producer->send($log);
}

log_access('GET', '/api/users', 200, ['user_id' => 123, 'duration_ms' => 45]);

try {
    throw new \RuntimeException('Database connection failed');
} catch (\Exception $e) {
    log_error($e, ['operation' => 'user_query']);
}

log_business('order_created', ['order_id' => 'ORD123', 'amount' => 99.99]);

$producer->flush();
echo "日志已发送到 RabbitMQ\n";

$esClient = new ElasticsearchClient([
    'hosts' => ['http://localhost:9200'],
    'index_prefix' => 'myapp-logs',
]);

$redis = new Redis();
$redis->connect('localhost', 6379);

$alertService = new AlertService(
    [
        [
            'name' => 'high_error_rate',
            'level' => 'ERROR',
            'pattern' => '/Database/',
            'throttle' => 300,
            'channels' => ['slack'],
        ],
        [
            'name' => 'fatal_error',
            'level' => 'FATAL',
            'throttle' => 60,
            // No "email" notifier is registered below; AlertService skips unknown channels.
            'channels' => ['slack', 'email'],
        ],
    ],
    [
        'slack' => new SlackNotifier('https://hooks.slack.com/services/xxx'),
    ],
    $redis
);

// No ClickHouse sink in this example, so LogConsumer must accept a null ClickHouse client here.
$consumer = new LogConsumer(
    $connection,
    $esClient,
    null,
    $alertService,
    [
        'batch_size' => 500,
        'batch_timeout' => 2.0,
    ]
);

echo "开始消费日志...\n";

// Graceful shutdown: the handlers only flip the running flag; consume() then flushes and returns.
pcntl_async_signals(true);
pcntl_signal(SIGTERM, function () use ($consumer) {
    echo "收到终止信号\n";
    $consumer->stop();
});
pcntl_signal(SIGINT, function () use ($consumer) {
    echo "收到中断信号\n";
    $consumer->stop();
});

$consumer->consume('log.error');
$consumer->close();
// Close the producer's channel while its connection is still open, then the connection.
$producer->close();
$connection->close();
关键技术点解析
1. 懒队列模式
php
// Excerpt of the per-queue arguments built in LogProducer::setupInfrastructure(); the full
// declaration also sets dead-letter, TTL and max-length arguments.
// NOTE(review): per the RabbitMQ lazy-queues documentation, 3.12+ ignores x-queue-mode for
// classic queues (they page to disk by default) — confirm against the broker version in use.
$args = [
'x-queue-mode' => ['S', 'lazy'],
];
- 消息存储在磁盘而非内存
- 适合日志等大容量场景
- 降低内存压力
2. 批量处理优化
php
/**
 * Excerpt of ElasticsearchClient::bulkIndex(): one action line plus one document line per
 * record, all sent in a single _bulk request. The target index is computed per record.
 */
public function bulkIndex(array $logs): array
{
    $params = ['body' => []];
    foreach ($logs as $log) {
        $params['body'][] = ['index' => ['_index' => $this->getIndexName($log)]];
        $params['body'][] = $log;
    }
    return $this->client->bulk($params);
}
- 减少网络请求次数
- 提高写入吞吐量
- 降低 CPU 开销
3. 日志分级存储(下表为存储层的保留时间,与 RabbitMQ 队列上的 x-message-ttl 是两回事)
| 日志类型 | 保留时间 | 存储位置 |
|---|---|---|
| 访问日志 | 7天 | Elasticsearch |
| 业务日志 | 30天 | Elasticsearch + ClickHouse |
| 错误日志 | 90天 | Elasticsearch |
| 审计日志 | 90天 | Elasticsearch + HDFS |
4. 告警节流
php
/**
 * Excerpt of AlertService::isAlertThrottled() (the null-Redis guard is omitted here).
 * An alert is suppressed while its Redis key exists. $seconds is not read by this check;
 * the window length is applied when the key is written with SETEX after notifying.
 * NOTE(review): checking and writing the key as two separate commands lets concurrent
 * consumers both pass the check; an atomic SET key value NX EX seconds closes that gap.
 */
private function isAlertThrottled(string $key, int $seconds): bool
{
return (bool) $this->redis->exists($key);
}
- 防止告警风暴
- 使用 Redis 实现分布式节流
- 可配置节流时间窗口
性能优化建议
生产者优化
| 优化项 | 建议 | 说明 |
|---|---|---|
| 缓冲发送 | 批量大小 100-1000 | 减少网络开销 |
| 异步发送 | 使用独立进程 | 不阻塞主业务 |
| 压缩传输 | gzip 压缩大日志 | 减少带宽占用 |
消费者优化
| 优化项 | 建议 | 说明 |
|---|---|---|
| 批量确认 | 批量 ACK | 提高吞吐量 |
| 多消费者 | 按日志类型分队列 | 并行处理 |
| 背压控制 | 动态调整 prefetch | 防止内存溢出 |
存储优化
php
// Index settings suggested for write-heavy log indices. A longer refresh_interval means fewer
// refreshes (cheaper indexing), but new documents only become searchable after a refresh.
// NOTE(review): 30s exceeds the "< 5 s from production to query" latency requirement stated
// earlier, and the template installed by ElasticsearchClient uses 5s — choose deliberately.
$settings = [
'number_of_shards' => 3,
'number_of_replicas' => 1,
'refresh_interval' => '30s',
];
常见问题与解决方案
1. 日志丢失
问题: 高峰期日志丢失
解决方案:
- 增加队列容量
- 使用持久化消息,并开启发布确认(publisher confirms),确保 Broker 已接收
- 实现本地缓存降级
2. 消费延迟
问题: 日志消费延迟严重
解决方案:
- 增加消费者数量
- 优化批量处理大小
- 使用更快的存储
3. 存储膨胀
问题: Elasticsearch 存储空间不足
解决方案:
- 配置 ILM 生命周期管理
- 使用冷热数据分层
- 定期归档历史数据
4. 告警风暴
问题: 短时间大量告警
解决方案:
- 实现告警聚合
- 配置合理的节流策略
- 分级告警处理
