Skip to content

日志收集系统

概述

日志收集系统是现代分布式系统的重要组成部分。通过 RabbitMQ 作为日志传输中间件,可以实现高性能、可靠的日志收集、聚合和分析,帮助开发团队快速定位问题、监控系统状态。

业务背景与需求

场景描述

某互联网公司拥有多个业务系统,需要统一的日志收集平台:

系统日志类型日志量/天特点
Web应用访问日志、错误日志100GB高并发、实时性要求高
API网关请求日志、响应日志50GB需要完整链路追踪
微服务集群业务日志、调试日志200GB多语言、多格式
数据库慢查询日志、审计日志20GB需要安全审计
基础设施系统日志、容器日志30GB格式多样

需求分析

需求项描述
高吞吐量支持每秒 10万+ 条日志写入
低延迟日志从产生到可查询 < 5秒
可靠性日志不丢失,支持重试
灵活性支持多种日志格式和来源
可扩展支持动态扩容
实时分析支持实时日志搜索和告警

架构设计

整体架构图

mermaid
graph TB
    subgraph "日志来源"
        A[Web应用]
        B[API网关]
        C[微服务]
        D[数据库]
        E[容器/系统]
    end
    
    subgraph "日志采集层"
        F[Filebeat]
        G[Fluentd]
        H[Logstash]
        I[SDK直连]
    end
    
    subgraph "RabbitMQ集群"
        J[日志交换机<br/>log.exchange]
        
        subgraph "分类队列"
            K[访问日志队列<br/>log.access]
            L[错误日志队列<br/>log.error]
            M[业务日志队列<br/>log.business]
            N[审计日志队列<br/>log.audit]
            O[系统日志队列<br/>log.system]
        end
        
        P[死信队列<br/>log.dlq]
    end
    
    subgraph "日志处理层"
        Q[日志解析器]
        R[日志聚合器]
        S[日志过滤器]
    end
    
    subgraph "存储层"
        T[Elasticsearch]
        U[ClickHouse]
        V[HDFS]
    end
    
    subgraph "分析层"
        W[Kibana]
        X[Grafana]
        Y[告警系统]
    end
    
    A --> F
    B --> G
    C --> H
    D --> I
    E --> F
    
    F --> J
    G --> J
    H --> J
    I --> J
    
    J --> K
    J --> L
    J --> M
    J --> N
    J --> O
    
    K -.-> P
    L -.-> P
    M -.-> P
    N -.-> P
    O -.-> P
    
    K --> Q
    L --> Q
    M --> R
    N --> S
    O --> Q
    
    Q --> T
    R --> U
    S --> V
    
    T --> W
    U --> X
    T --> Y

日志流转流程

mermaid
sequenceDiagram
    participant App as 应用程序
    participant Agent as 日志Agent
    participant MQ as RabbitMQ
    participant Consumer as 日志消费者
    participant ES as Elasticsearch
    participant Alert as 告警系统
    
    App->>Agent: 写入日志文件
    Agent->>Agent: 读取并解析日志
    Agent->>MQ: 发送到日志队列
    
    alt 错误日志
        MQ->>Consumer: 投递错误日志
        Consumer->>Consumer: 解析并分析
        Consumer->>ES: 存储日志
        Consumer->>Alert: 触发告警(如需要)
    else 普通日志
        MQ->>Consumer: 投递日志
        Consumer->>Consumer: 批量处理
        Consumer->>ES: 批量写入
    end
    
    ES-->>Consumer: 确认写入
    Consumer-->>MQ: 发送ACK

日志分类模型

mermaid
graph LR
    A[日志消息] --> B{日志级别}
    B --> C[DEBUG]
    B --> D[INFO]
    B --> E[WARN]
    B --> F[ERROR]
    B --> G[FATAL]
    
    A --> H{日志类型}
    H --> I[访问日志]
    H --> J[业务日志]
    H --> K[错误日志]
    H --> L[审计日志]
    H --> M[系统日志]
    
    A --> N{来源系统}
    N --> O[Web应用]
    N --> P[API网关]
    N --> Q[微服务]
    N --> R[数据库]
    N --> S[基础设施]

PHP 代码实现

日志消息结构定义

php
<?php

namespace App\Messaging\Log;

class LogMessage
{
    public string $messageId;
    public string $timestamp;
    public string $level;
    public string $type;
    public string $source;
    public string $message;
    public array $context;
    public array $extra;
    public string $traceId;
    public string $spanId;

    public const LEVEL_DEBUG = 'DEBUG';
    public const LEVEL_INFO = 'INFO';
    public const LEVEL_WARN = 'WARN';
    public const LEVEL_ERROR = 'ERROR';
    public const LEVEL_FATAL = 'FATAL';

    public const TYPE_ACCESS = 'access';
    public const TYPE_BUSINESS = 'business';
    public const TYPE_ERROR = 'error';
    public const TYPE_AUDIT = 'audit';
    public const TYPE_SYSTEM = 'system';

    public function __construct(
        string $level,
        string $type,
        string $source,
        string $message,
        array $context = [],
        array $extra = []
    ) {
        $this->messageId = $this->generateMessageId();
        $this->timestamp = date('Y-m-d\TH:i:s.uP');
        $this->level = strtoupper($level);
        $this->type = $type;
        $this->source = $source;
        $this->message = $message;
        $this->context = $context;
        $this->extra = $extra;
        $this->traceId = $this->getTraceId();
        $this->spanId = $this->generateSpanId();
    }

    private function generateMessageId(): string
    {
        return sprintf('log_%s_%s', date('YmdHis'), bin2hex(random_bytes(8)));
    }

    private function getTraceId(): string
    {
        if (isset($_SERVER['HTTP_X_TRACE_ID'])) {
            return $_SERVER['HTTP_X_TRACE_ID'];
        }
        
        if (isset($GLOBALS['trace_id'])) {
            return $GLOBALS['trace_id'];
        }
        
        return bin2hex(random_bytes(16));
    }

    private function generateSpanId(): string
    {
        return bin2hex(random_bytes(8));
    }

    public function addContext(string $key, $value): self
    {
        $this->context[$key] = $value;
        return $this;
    }

    public function addExtra(string $key, $value): self
    {
        $this->extra[$key] = $value;
        return $this;
    }

    public function toArray(): array
    {
        return [
            'message_id' => $this->messageId,
            'timestamp' => $this->timestamp,
            'level' => $this->level,
            'type' => $this->type,
            'source' => $this->source,
            'message' => $this->message,
            'context' => $this->context,
            'extra' => $this->extra,
            'trace_id' => $this->traceId,
            'span_id' => $this->spanId,
            'hostname' => gethostname(),
            'pid' => getmypid(),
        ];
    }

    public function toJson(): string
    {
        return json_encode($this->toArray(), JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);
    }

    public static function fromArray(array $data): self
    {
        $log = new self(
            $data['level'],
            $data['type'],
            $data['source'],
            $data['message'],
            $data['context'] ?? [],
            $data['extra'] ?? []
        );
        $log->messageId = $data['message_id'];
        $log->timestamp = $data['timestamp'];
        $log->traceId = $data['trace_id'] ?? '';
        $log->spanId = $data['span_id'] ?? '';
        return $log;
    }
}

日志生产者实现

php
<?php

namespace App\Messaging\Log;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;

class LogProducer
{
    private AMQPStreamConnection $connection;
    private $channel;
    private string $exchangeName = 'log.exchange';
    private string $deadLetterExchange = 'log.dlx';
    private array $buffer = [];
    private int $bufferSize = 100;
    private float $bufferTimeout = 1.0;
    private float $lastFlushTime = 0;

    private const QUEUE_CONFIG = [
        self::TYPE_ACCESS => [
            'queue' => 'log.access',
            'ttl' => 86400000,
            'max_length' => 10000000,
        ],
        self::TYPE_BUSINESS => [
            'queue' => 'log.business',
            'ttl' => 604800000,
            'max_length' => 5000000,
        ],
        self::TYPE_ERROR => [
            'queue' => 'log.error',
            'ttl' => 2592000000,
            'max_length' => 1000000,
        ],
        self::TYPE_AUDIT => [
            'queue' => 'log.audit',
            'ttl' => 7776000000,
            'max_length' => 5000000,
        ],
        self::TYPE_SYSTEM => [
            'queue' => 'log.system',
            'ttl' => 2592000000,
            'max_length' => 2000000,
        ],
    ];

    public function __construct(AMQPStreamConnection $connection, int $bufferSize = 100)
    {
        $this->connection = $connection;
        $this->channel = $connection->channel();
        $this->bufferSize = $bufferSize;
        $this->lastFlushTime = microtime(true);
        $this->setupInfrastructure();
    }

    private function setupInfrastructure(): void
    {
        $this->channel->exchange_declare(
            $this->exchangeName,
            AMQPExchangeType::TOPIC,
            false,
            true,
            false
        );

        $this->channel->exchange_declare(
            $this->deadLetterExchange,
            AMQPExchangeType::DIRECT,
            false,
            true,
            false
        );

        foreach (self::QUEUE_CONFIG as $type => $config) {
            $args = [
                'x-dead-letter-exchange' => ['S', $this->deadLetterExchange],
                'x-dead-letter-routing-key' => ['S', 'log.failed'],
                'x-message-ttl' => ['I', $config['ttl']],
                'x-max-length' => ['I', $config['max_length']],
                'x-queue-mode' => ['S', 'lazy'],
            ];

            $this->channel->queue_declare(
                $config['queue'],
                false,
                true,
                false,
                false,
                false,
                $args
            );

            $this->channel->queue_bind(
                $config['queue'],
                $this->exchangeName,
                "log.{$type}"
            );
        }

        $this->channel->queue_declare('log.dlq', false, true, false, false);
        $this->channel->queue_bind('log.dlq', $this->deadLetterExchange, 'log.failed');
    }

    public function send(LogMessage $log): void
    {
        $this->buffer[] = $log;

        if (count($this->buffer) >= $this->bufferSize) {
            $this->flush();
        }
    }

    public function sendImmediate(LogMessage $log): void
    {
        $routingKey = "log.{$log->type}";
        
        $message = new AMQPMessage(
            $log->toJson(),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'message_id' => $log->messageId,
                'timestamp' => time(),
                'headers' => [
                    'level' => $log->level,
                    'type' => $log->type,
                    'source' => $log->source,
                ],
            ]
        );

        $this->channel->basic_publish(
            $message,
            $this->exchangeName,
            $routingKey
        );
    }

    public function flush(): void
    {
        if (empty($this->buffer)) {
            return;
        }

        foreach ($this->buffer as $log) {
            $this->sendImmediate($log);
        }

        $this->buffer = [];
        $this->lastFlushTime = microtime(true);
    }

    public function autoFlush(): void
    {
        $now = microtime(true);
        if (($now - $this->lastFlushTime) >= $this->bufferTimeout && !empty($this->buffer)) {
            $this->flush();
        }
    }

    public function close(): void
    {
        $this->flush();
        if ($this->channel) {
            $this->channel->close();
        }
    }
}

日志消费者实现

php
<?php

namespace App\Messaging\Log;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class LogConsumer
{
    private AMQPStreamConnection $connection;
    private $channel;
    private ElasticsearchClient $esClient;
    private ClickHouseClient $clickHouseClient;
    private AlertService $alertService;
    private array $batchBuffer = [];
    private int $batchSize = 1000;
    private float $batchTimeout = 5.0;
    private float $lastBatchTime = 0;
    private bool $running = true;

    public function __construct(
        AMQPStreamConnection $connection,
        ElasticsearchClient $esClient,
        ClickHouseClient $clickHouseClient,
        AlertService $alertService,
        array $config = []
    ) {
        $this->connection = $connection;
        $this->channel = $connection->channel();
        $this->esClient = $esClient;
        $this->clickHouseClient = $clickHouseClient;
        $this->alertService = $alertService;
        $this->batchSize = $config['batch_size'] ?? 1000;
        $this->batchTimeout = $config['batch_timeout'] ?? 5.0;
        $this->lastBatchTime = microtime(true);
    }

    public function consume(string $queueName): void
    {
        $this->channel->basic_qos(null, $this->batchSize * 2, null);

        $this->channel->basic_consume(
            $queueName,
            '',
            false,
            false,
            false,
            false,
            [$this', 'processMessage']
        );

        while ($this->running && count($this->channel->callbacks)) {
            $this->channel->wait(null, true, 1);
            
            $this->checkBatchTimeout();
            
            if (!$this->running) {
                break;
            }
        }

        $this->flushBatch();
    }

    public function processMessage(AMQPMessage $message): void
    {
        try {
            $logData = json_decode($message->body, true);
            $log = LogMessage::fromArray($logData);

            $this->batchBuffer[] = [
                'log' => $log,
                'message' => $message,
            ];

            if ($log->level === LogMessage::LEVEL_ERROR || $log->level === LogMessage::LEVEL_FATAL) {
                $this->alertService->checkAndAlert($log);
            }

            if (count($this->batchBuffer) >= $this->batchSize) {
                $this->flushBatch();
            }

        } catch (\Exception $e) {
            error_log("Log consumer error: " . $e->getMessage());
            $message->nack(false, false);
        }
    }

    private function checkBatchTimeout(): void
    {
        $now = microtime(true);
        if (($now - $this->lastBatchTime) >= $this->batchTimeout && !empty($this->batchBuffer)) {
            $this->flushBatch();
        }
    }

    private function flushBatch(): void
    {
        if (empty($this->batchBuffer)) {
            return;
        }

        $logs = [];
        $messages = [];

        foreach ($this->batchBuffer as $item) {
            $logs[] = $item['log']->toArray();
            $messages[] = $item['message'];
        }

        try {
            $this->esClient->bulkIndex($logs);
            
            foreach ($messages as $message) {
                $message->ack();
            }

            $this->lastBatchTime = microtime(true);
            
        } catch (\Exception $e) {
            error_log("Failed to flush batch: " . $e->getMessage());
            
            foreach ($messages as $message) {
                $message->nack(false, true);
            }
        }

        $this->batchBuffer = [];
    }

    public function stop(): void
    {
        $this->running = false;
    }

    public function close(): void
    {
        $this->stop();
        $this->flushBatch();
        if ($this->channel) {
            $this->channel->close();
        }
    }
}

日志处理器实现

php
<?php

namespace App\Services\Log;

class LogProcessor
{
    private array $parsers = [];
    private array $filters = [];
    private array $enrichers = [];

    public function __construct()
    {
        $this->registerDefaultProcessors();
    }

    private function registerDefaultProcessors(): void
    {
        $this->parsers = [
            'nginx' => new NginxLogParser(),
            'apache' => new ApacheLogParser(),
            'json' => new JsonLogParser(),
            'syslog' => new SyslogParser(),
        ];

        $this->enrichers = [
            new GeoIpEnricher(),
            new UserAgentEnricher(),
            new HostnameEnricher(),
        ];
    }

    public function process(array $rawLog): array
    {
        $log = $rawLog;

        if (isset($log['format']) && isset($this->parsers[$log['format']])) {
            $log = $this->parsers[$log['format']]->parse($log);
        }

        foreach ($this->enrichers as $enricher) {
            $log = $enricher->enrich($log);
        }

        foreach ($this->filters as $filter) {
            if (!$filter->accept($log)) {
                return [];
            }
        }

        return $log;
    }
}

class NginxLogParser
{
    private const PATTERN = '/^(\S+) \S+ (\S+) \[([^\]]+)\] "(\S+) ([^"]+) HTTP\/[^"]+" (\d+) (\d+) "([^"]*)" "([^"]*)"/';

    public function parse(array $log): array
    {
        if (!isset($log['raw_message'])) {
            return $log;
        }

        if (preg_match(self::PATTERN, $log['raw_message'], $matches)) {
            $log['client_ip'] = $matches[1];
            $log['user'] = $matches[2] === '-' ? null : $matches[2];
            $log['timestamp'] = $this->parseTimestamp($matches[3]);
            $log['method'] = $matches[4];
            $log['path'] = $matches[5];
            $log['status'] = (int) $matches[6];
            $log['size'] = (int) $matches[7];
            $log['referer'] = $matches[8] === '-' ? null : $matches[8];
            $log['user_agent'] = $matches[9];
        }

        return $log;
    }

    private function parseTimestamp(string $timestamp): string
    {
        $dt = \DateTime::createFromFormat('d/M/Y:H:i:s O', $timestamp);
        return $dt ? $dt->format('Y-m-d\TH:i:sP') : $timestamp;
    }
}

class JsonLogParser
{
    public function parse(array $log): array
    {
        if (!isset($log['raw_message'])) {
            return $log;
        }

        $parsed = json_decode($log['raw_message'], true);
        
        if (json_last_error() === JSON_ERROR_NONE && is_array($parsed)) {
            $log = array_merge($log, $parsed);
            unset($log['raw_message']);
        }

        return $log;
    }
}

class GeoIpEnricher
{
    private $geoIpReader;

    public function __construct(string $geoIpDbPath = null)
    {
        if ($geoIpDbPath && file_exists($geoIpDbPath)) {
            $this->geoIpReader = new \MaxMind\Db\Reader($geoIpDbPath);
        }
    }

    public function enrich(array $log): array
    {
        if (!isset($log['client_ip']) || !$this->geoIpReader) {
            return $log;
        }

        try {
            $geoData = $this->geoIpReader->get($log['client_ip']);
            
            if ($geoData) {
                $log['geo'] = [
                    'country' => $geoData['country']['iso_code'] ?? null,
                    'city' => $geoData['city']['names']['en'] ?? null,
                    'latitude' => $geoData['location']['latitude'] ?? null,
                    'longitude' => $geoData['location']['longitude'] ?? null,
                ];
            }
        } catch (\Exception $e) {
        }

        return $log;
    }
}

class UserAgentEnricher
{
    public function enrich(array $log): array
    {
        if (!isset($log['user_agent'])) {
            return $log;
        }

        $ua = parse_user_agent($log['user_agent']);
        
        $log['ua'] = [
            'browser' => $ua['browser'] ?? null,
            'browser_version' => $ua['version'] ?? null,
            'platform' => $ua['platform'] ?? null,
        ];

        return $log;
    }
}

Elasticsearch 客户端封装

php
<?php

namespace App\Services\Log;

class ElasticsearchClient
{
    private $client;
    private string $indexPrefix;
    private array $indexTemplates;

    public function __construct(array $config)
    {
        $this->client = \Elasticsearch\ClientBuilder::create()
            ->setHosts($config['hosts'])
            ->setRetries(3)
            ->build();
        
        $this->indexPrefix = $config['index_prefix'] ?? 'logs';
        $this->indexTemplates = $config['templates'] ?? [];
        
        $this->setupIndexTemplates();
    }

    private function setupIndexTemplates(): void
    {
        $template = [
            'index_patterns' => ["{$this->indexPrefix}-*"],
            'settings' => [
                'number_of_shards' => 3,
                'number_of_replicas' => 1,
                'refresh_interval' => '5s',
            ],
            'mappings' => [
                'properties' => [
                    'timestamp' => ['type' => 'date'],
                    'level' => ['type' => 'keyword'],
                    'type' => ['type' => 'keyword'],
                    'source' => ['type' => 'keyword'],
                    'message' => ['type' => 'text'],
                    'trace_id' => ['type' => 'keyword'],
                    'span_id' => ['type' => 'keyword'],
                    'hostname' => ['type' => 'keyword'],
                    'client_ip' => ['type' => 'ip'],
                    'geo' => [
                        'properties' => [
                            'location' => ['type' => 'geo_point'],
                        ],
                    ],
                ],
            ],
        ];

        $this->client->indices()->putTemplate([
            'name' => 'logs_template',
            'body' => $template,
        ]);
    }

    public function index(array $log): array
    {
        $indexName = $this->getIndexName($log);
        
        $params = [
            'index' => $indexName,
            'body' => $log,
        ];

        return $this->client->index($params);
    }

    public function bulkIndex(array $logs): array
    {
        $params = ['body' => []];

        foreach ($logs as $log) {
            $indexName = $this->getIndexName($log);
            
            $params['body'][] = [
                'index' => [
                    '_index' => $indexName,
                ],
            ];
            $params['body'][] = $log;
        }

        return $this->client->bulk($params);
    }

    private function getIndexName(array $log): string
    {
        $type = $log['type'] ?? 'default';
        $date = date('Y.m.d');
        
        return "{$this->indexPrefix}-{$type}-{$date}";
    }

    public function search(array $query): array
    {
        $params = [
            'index' => "{$this->indexPrefix}-*",
            'body' => $query,
        ];

        return $this->client->search($params);
    }

    public function searchByTraceId(string $traceId): array
    {
        return $this->search([
            'query' => [
                'term' => ['trace_id' => $traceId],
            ],
            'sort' => ['timestamp' => 'asc'],
            'size' => 1000,
        ]);
    }
}

告警服务实现

php
<?php

namespace App\Services\Log;

class AlertService
{
    private array $rules;
    private array $notifiers;
    private $redis;

    public function __construct(array $rules, array $notifiers, $redis = null)
    {
        $this->rules = $rules;
        $this->notifiers = $notifiers;
        $this->redis = $redis;
    }

    public function checkAndAlert(LogMessage $log): void
    {
        foreach ($this->rules as $rule) {
            if ($this->matchRule($log, $rule)) {
                $this->processAlert($log, $rule);
            }
        }
    }

    private function matchRule(LogMessage $log, array $rule): bool
    {
        if (isset($rule['level']) && $log->level !== $rule['level']) {
            return false;
        }

        if (isset($rule['source']) && $log->source !== $rule['source']) {
            return false;
        }

        if (isset($rule['pattern']) && !preg_match($rule['pattern'], $log->message)) {
            return false;
        }

        return true;
    }

    private function processAlert(LogMessage $log, array $rule): void
    {
        $alertKey = $this->getAlertKey($log, $rule);
        
        if ($this->isAlertThrottled($alertKey, $rule['throttle'] ?? 300)) {
            return;
        }

        $alert = [
            'id' => uniqid('alert_', true),
            'rule_name' => $rule['name'],
            'log' => $log->toArray(),
            'triggered_at' => date('Y-m-d H:i:s'),
        ];

        foreach ($rule['channels'] ?? ['default'] as $channel) {
            if (isset($this->notifiers[$channel])) {
                $this->notifiers[$channel]->notify($alert);
            }
        }

        $this->throttleAlert($alertKey, $rule['throttle'] ?? 300);
    }

    private function getAlertKey(LogMessage $log, array $rule): string
    {
        return sprintf(
            'alert:%s:%s:%s',
            $rule['name'],
            $log->source,
            md5($log->message)
        );
    }

    private function isAlertThrottled(string $key, int $seconds): bool
    {
        if (!$this->redis) {
            return false;
        }

        return (bool) $this->redis->exists($key);
    }

    private function throttleAlert(string $key, int $seconds): void
    {
        if (!$this->redis) {
            return;
        }

        $this->redis->setex($key, $seconds, 1);
    }
}

class SlackNotifier
{
    private string $webhookUrl;

    public function __construct(string $webhookUrl)
    {
        $this->webhookUrl = $webhookUrl;
    }

    public function notify(array $alert): void
    {
        $color = $this->getColor($alert['log']['level']);
        
        $payload = [
            'attachments' => [
                [
                    'color' => $color,
                    'title' => "🚨 Alert: {$alert['rule_name']}",
                    'fields' => [
                        [
                            'title' => 'Level',
                            'value' => $alert['log']['level'],
                            'short' => true,
                        ],
                        [
                            'title' => 'Source',
                            'value' => $alert['log']['source'],
                            'short' => true,
                        ],
                        [
                            'title' => 'Message',
                            'value' => substr($alert['log']['message'], 0, 500),
                            'short' => false,
                        ],
                        [
                            'title' => 'Trace ID',
                            'value' => $alert['log']['trace_id'],
                            'short' => true,
                        ],
                    ],
                    'footer' => $alert['triggered_at'],
                ],
            ],
        ];

        $this->send($payload);
    }

    private function getColor(string $level): string
    {
        $colors = [
            'DEBUG' => '#95a5a6',
            'INFO' => '#3498db',
            'WARN' => '#f39c12',
            'ERROR' => '#e74c3c',
            'FATAL' => '#8e44ad',
        ];

        return $colors[$level] ?? '#95a5a6';
    }

    private function send(array $payload): void
    {
        $ch = curl_init($this->webhookUrl);
        curl_setopt_array($ch, [
            CURLOPT_POST => true,
            CURLOPT_POSTFIELDS => json_encode($payload),
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
            CURLOPT_TIMEOUT => 10,
        ]);
        curl_exec($ch);
        curl_close($ch);
    }
}

完整使用示例

php
<?php

require_once 'vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use App\Messaging\Log\{LogMessage, LogProducer, LogConsumer};
use App\Services\Log\{ElasticsearchClient, AlertService, SlackNotifier};

$connection = new AMQPStreamConnection(
    'localhost',
    5672,
    'guest',
    'guest',
    '/',
    false,
    'AMQPLAIN',
    null,
    'en_US',
    3.0,
    3.0
);

$producer = new LogProducer($connection, 50);

register_shutdown_function(function () use ($producer) {
    $producer->close();
});

function log_access(string $method, string $path, int $status, array $extra = []): void
{
    global $producer;
    
    $log = new LogMessage(
        LogMessage::LEVEL_INFO,
        LogMessage::TYPE_ACCESS,
        'web-app',
        sprintf('%s %s - %d', $method, $path, $status),
        [
            'method' => $method,
            'path' => $path,
            'status' => $status,
        ],
        $extra
    );
    
    $producer->send($log);
}

function log_error(\Throwable $e, array $context = []): void
{
    global $producer;
    
    $log = new LogMessage(
        LogMessage::LEVEL_ERROR,
        LogMessage::TYPE_ERROR,
        'web-app',
        $e->getMessage(),
        array_merge([
            'exception' => get_class($e),
            'file' => $e->getFile(),
            'line' => $e->getLine(),
            'trace' => $e->getTraceAsString(),
        ], $context)
    );
    
    $producer->sendImmediate($log);
}

function log_business(string $action, array $data = []): void
{
    global $producer;
    
    $log = new LogMessage(
        LogMessage::LEVEL_INFO,
        LogMessage::TYPE_BUSINESS,
        'web-app',
        $action,
        $data
    );
    
    $producer->send($log);
}

log_access('GET', '/api/users', 200, ['user_id' => 123, 'duration_ms' => 45]);

try {
    throw new \RuntimeException('Database connection failed');
} catch (\Exception $e) {
    log_error($e, ['operation' => 'user_query']);
}

log_business('order_created', ['order_id' => 'ORD123', 'amount' => 99.99]);

$producer->flush();

echo "日志已发送到 RabbitMQ\n";

$esClient = new ElasticsearchClient([
    'hosts' => ['http://localhost:9200'],
    'index_prefix' => 'myapp-logs',
]);

$redis = new Redis();
$redis->connect('localhost', 6379);

$alertService = new AlertService(
    [
        [
            'name' => 'high_error_rate',
            'level' => 'ERROR',
            'pattern' => '/Database/',
            'throttle' => 300,
            'channels' => ['slack'],
        ],
        [
            'name' => 'fatal_error',
            'level' => 'FATAL',
            'throttle' => 60,
            'channels' => ['slack', 'email'],
        ],
    ],
    [
        'slack' => new SlackNotifier('https://hooks.slack.com/services/xxx'),
    ],
    $redis
);

$consumer = new LogConsumer(
    $connection,
    $esClient,
    null,
    $alertService,
    [
        'batch_size' => 500,
        'batch_timeout' => 2.0,
    ]
);

echo "开始消费日志...\n";

pcntl_async_signals(true);
pcntl_signal(SIGTERM, function () use ($consumer) {
    echo "收到终止信号\n";
    $consumer->stop();
});
pcntl_signal(SIGINT, function () use ($consumer) {
    echo "收到中断信号\n";
    $consumer->stop();
});

$consumer->consume('log.error');

$consumer->close();
$connection->close();

关键技术点解析

1. 懒队列模式

php
$args = [
    'x-queue-mode' => ['S', 'lazy'],
];
  • 消息存储在磁盘而非内存
  • 适合日志等大容量场景
  • 降低内存压力

2. 批量处理优化

php
public function bulkIndex(array $logs): array
{
    $params = ['body' => []];
    foreach ($logs as $log) {
        $params['body'][] = ['index' => ['_index' => $indexName]];
        $params['body'][] = $log;
    }
    return $this->client->bulk($params);
}
  • 减少网络请求次数
  • 提高写入吞吐量
  • 降低 CPU 开销

3. 日志分级存储

日志类型保留时间存储位置
访问日志7天Elasticsearch
业务日志30天Elasticsearch + ClickHouse
错误日志90天Elasticsearch
审计日志90天Elasticsearch + HDFS

4. 告警节流

php
private function isAlertThrottled(string $key, int $seconds): bool
{
    return (bool) $this->redis->exists($key);
}
  • 防止告警风暴
  • 使用 Redis 实现分布式节流
  • 可配置节流时间窗口

性能优化建议

生产者优化

优化项建议说明
缓冲发送批量大小 100-1000减少网络开销
异步发送使用独立进程不阻塞主业务
压缩传输gzip 压缩大日志减少带宽占用

消费者优化

优化项建议说明
批量确认批量 ACK提高吞吐量
多消费者按日志类型分队列并行处理
背压控制动态调整 prefetch防止内存溢出

存储优化

php
$settings = [
    'number_of_shards' => 3,
    'number_of_replicas' => 1,
    'refresh_interval' => '30s',
];

常见问题与解决方案

1. 日志丢失

问题: 高峰期日志丢失

解决方案:

  • 增加队列容量
  • 使用持久化消息
  • 实现本地缓存降级

2. 消费延迟

问题: 日志消费延迟严重

解决方案:

  • 增加消费者数量
  • 优化批量处理大小
  • 使用更快的存储

3. 存储膨胀

问题: Elasticsearch 存储空间不足

解决方案:

  • 配置 ILM 生命周期管理
  • 使用冷热数据分层
  • 定期归档历史数据

4. 告警风暴

问题: 短时间大量告警

解决方案:

  • 实现告警聚合
  • 配置合理的节流策略
  • 分级告警处理

相关链接