
Integrating RabbitMQ with Redis

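Overview

Redis is a high-performance in-memory data structure store, commonly used for caching, session management, message queuing, and real-time analytics. Integrating RabbitMQ with Redis enables cache update notifications, distributed locks, message deduplication, and rate limiting, which helps build more capable distributed systems.

This tutorial walks through integration approaches for RabbitMQ and Redis, covering cache synchronization, distributed locks, message deduplication, and other core features.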

Integration Architecture Design

Architecture Diagram

┌────────────────────────────────────────────────────────────────────┐
│                    Redis Integration Architecture                  │
│                                                                    │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                      Application Layer                      │   │
│  │  ┌───────────────┐   ┌───────────────┐   ┌───────────────┐  │   │
│  │  │     Cache     │   │     Lock      │   │     Rate      │  │   │
│  │  │    Manager    │   │    Manager    │   │    Limiter    │  │   │
│  │  └───────────────┘   └───────────────┘   └───────────────┘  │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                 │                                  │
│             ┌───────────────────┼───────────────────┐              │
│             ▼                   ▼                   ▼              │
│     ┌───────────────┐   ┌───────────────┐   ┌───────────────┐      │
│     │   RabbitMQ    │   │     Redis     │   │   Database    │      │
│     │(message queue)│   │    (cache)    │   │ (persistence) │      │
│     │               │   │               │   │               │      │
│     │ • Events      │   │ • Cache store │   │ • Data storage│      │
│     │ • Async work  │   │ • Dist. locks │   │ • Transactions│      │
│     │ • Decoupling  │   │ • Rate counts │   │               │      │
│     └───────────────┘   └───────────────┘   └───────────────┘      │
│             │                   │                   │              │
│             └───────────────────┴───────────────────┘              │
│                                 │                                  │
│                                 ▼                                  │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │            Synchronization and Coordination Layer           │   │
│  │  • Cache invalidation notifications                         │   │
│  │  • Distributed lock coordination                            │   │
│  │  • Message deduplication                                    │   │
│  └─────────────────────────────────────────────────────────────┘   │
└────────────────────────────────────────────────────────────────────┘

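Integration Patterns

| Pattern | Description |
| --- | --- |
| Cache Invalidation | Update or evict cached data in response to message notifications |
| Distributed Lock | Coordinate access to shared resources across processes |
| Message Deduplication | Prevent a message from being processed more than once |
| Rate Limiting | Control the rate at which messages are processed |
| Idempotency | Ensure an operation can safely be executed repeatedly |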

Configuration Examples

Redis PHP Client Configuration

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use Predis\Client;

class RedisConfig
{
    public static function getClient(): Client
    {
        $config = [
            'scheme' => getenv('REDIS_SCHEME') ?: 'tcp',
            'host' => getenv('REDIS_HOST') ?: 'localhost',
            // getenv() returns strings, so numeric settings are cast explicitly
            'port' => (int) (getenv('REDIS_PORT') ?: 6379),
            'password' => getenv('REDIS_PASSWORD') ?: null,
            'database' => (int) (getenv('REDIS_DATABASE') ?: 0),
        ];
        
        $options = [
            'cluster' => getenv('REDIS_CLUSTER') ?: null,
            'replication' => getenv('REDIS_REPLICATION') ?: null,
            // Predis's default stream connection is used. The phpiredis-based
            // backends require the phpiredis extension and are not available
            // in Predis 2.x.
            'prefix' => getenv('REDIS_PREFIX') ?: 'app:',
        ];
        
        return new Client($config, $options);
    }
    
    public static function getClusterClient(): Client
    {
        $nodes = explode(',', getenv('REDIS_CLUSTER_NODES') ?: 'localhost:6379');
        
        $parameters = array_map(function ($node) {
            $parts = explode(':', $node);
            return [
                'host' => $parts[0],
                'port' => (int) ($parts[1] ?? 6379),
            ];
        }, $nodes);
        
        return new Client($parameters, [
            'cluster' => 'redis',
        ]);
    }
}

RabbitMQ Connection Configuration

php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;

class RabbitMQConfig
{
    public static function getConnection(): AMQPStreamConnection
    {
        return new AMQPStreamConnection(
            getenv('RABBITMQ_HOST') ?: 'localhost',
            (int) (getenv('RABBITMQ_PORT') ?: 5672),
            getenv('RABBITMQ_USER') ?: 'guest',
            getenv('RABBITMQ_PASSWORD') ?: 'guest',
            getenv('RABBITMQ_VHOST') ?: '/'
        );
    }
}

PHP Code Examples

Cache Synchronization Manager

php
<?php

use Predis\Client;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class CacheSyncManager
{
    private Client $redis;
    private $rabbitChannel;
    private $rabbitConnection;
    
    private const CACHE_INVALIDATION_EXCHANGE = 'cache.invalidation';
    private const CACHE_INVALIDATION_QUEUE = 'cache.invalidation.queue';
    
    public function __construct(Client $redis, AMQPStreamConnection $rabbitConnection)
    {
        $this->redis = $redis;
        $this->rabbitConnection = $rabbitConnection;
        $this->rabbitChannel = $rabbitConnection->channel();
        $this->setupInfrastructure();
    }
    
    private function setupInfrastructure(): void
    {
        $this->rabbitChannel->exchange_declare(
            self::CACHE_INVALIDATION_EXCHANGE,
            'fanout',
            false,
            true,
            false
        );
        
        $this->rabbitChannel->queue_declare(
            self::CACHE_INVALIDATION_QUEUE,
            false,
            true,
            false,
            false
        );
        
        $this->rabbitChannel->queue_bind(
            self::CACHE_INVALIDATION_QUEUE,
            self::CACHE_INVALIDATION_EXCHANGE
        );
    }
    
    public function get(string $key, callable $loader, int $ttl = 3600)
    {
        $cached = $this->redis->get($key);
        
        if ($cached !== null) {
            return json_decode($cached, true);
        }
        
        $data = $loader();
        
        $this->redis->setex($key, $ttl, json_encode($data));
        
        return $data;
    }
    
    public function set(string $key, $data, int $ttl = 3600): bool
    {
        // Predis returns a Status object, so cast it before the strict comparison
        return (string) $this->redis->setex($key, $ttl, json_encode($data)) === 'OK';
    }
    
    public function invalidate(string $key, string $reason = 'manual'): bool
    {
        $this->redis->del([$key]);
        
        $this->publishInvalidation($key, $reason);
        
        return true;
    }
    
    public function invalidatePattern(string $pattern, string $reason = 'pattern'): int
    {
        // KEYS walks the whole keyspace and blocks Redis while it runs;
        // prefer SCAN on large datasets
        $keys = $this->redis->keys($pattern);
        
        if (empty($keys)) {
            return 0;
        }
        
        $deleted = $this->redis->del($keys);
        
        foreach ($keys as $key) {
            $this->publishInvalidation($key, $reason);
        }
        
        return $deleted;
    }
    
    private function publishInvalidation(string $key, string $reason): void
    {
        $event = [
            'key' => $key,
            'reason' => $reason,
            'timestamp' => date('c'),
            'hostname' => gethostname(),
        ];
        
        $message = new \PhpAmqpLib\Message\AMQPMessage(
            json_encode($event),
            [
                'content_type' => 'application/json',
                'delivery_mode' => \PhpAmqpLib\Message\AMQPMessage::DELIVERY_MODE_PERSISTENT,
            ]
        );
        
        $this->rabbitChannel->basic_publish(
            $message,
            self::CACHE_INVALIDATION_EXCHANGE
        );
    }
    
    public function subscribeInvalidations(callable $handler): void
    {
        $callback = function ($message) use ($handler) {
            $event = json_decode($message->getBody(), true);
            
            $handler($event);
            
            $message->ack();
        };
        
        $this->rabbitChannel->basic_qos(0, 10, false);
        $this->rabbitChannel->basic_consume(
            self::CACHE_INVALIDATION_QUEUE,
            '',
            false,
            false,
            false,
            false,
            $callback
        );
        
        while ($this->rabbitChannel->is_consuming()) {
            $this->rabbitChannel->wait();
        }
    }
    
    public function close(): void
    {
        $this->rabbitChannel->close();
        $this->rabbitConnection->close();
    }
}

// Usage example
$redis = RedisConfig::getClient();
$rabbit = RabbitMQConfig::getConnection();
$cache = new CacheSyncManager($redis, $rabbit);

$user = $cache->get('user:123', function () {
    return ['id' => 123, 'name' => 'John Doe', 'email' => 'john@example.com'];
}, 3600);

$cache->invalidate('user:123', 'user_updated');

$cache->close();
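
To make the read path and the effect of invalidation concrete, here is a minimal sketch of the cache-aside flow that `CacheSyncManager::get()` and `invalidate()` implement. It assumes a hypothetical in-memory `ArrayCache` class in place of Redis and leaves out RabbitMQ entirely, so it runs without any server.

```php
<?php

// Hypothetical in-memory cache standing in for Redis (illustration only).
final class ArrayCache
{
    /** @var array<string, string> JSON-encoded values keyed by cache key */
    private array $items = [];

    /** Number of times the loader had to be called (cache misses). */
    public int $loads = 0;

    public function get(string $key, callable $loader)
    {
        if (array_key_exists($key, $this->items)) {
            return json_decode($this->items[$key], true); // hit
        }

        $this->loads++;
        $data = $loader();                                 // miss: go to the source
        $this->items[$key] = json_encode($data);

        return $data;
    }

    public function invalidate(string $key): void
    {
        unset($this->items[$key]);
    }
}

$db = ['user:123' => ['id' => 123, 'name' => 'John Doe']];
$cache = new ArrayCache();
$load = function () use (&$db) {
    return $db['user:123'];
};

$first = $cache->get('user:123', $load);   // miss, loads from $db
$db['user:123']['name'] = 'Jane Doe';      // the source of truth changes
$stale = $cache->get('user:123', $load);   // hit, still returns the old value
$cache->invalidate('user:123');            // what an invalidation event triggers
$fresh = $cache->get('user:123', $load);   // miss again, sees the new value
```

Until the key is invalidated, readers keep getting the old value, which is why every writer needs to publish an invalidation after updating the source.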

Distributed Lock Manager

php
<?php

use Predis\Client;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class DistributedLockManager
{
    private Client $redis;
    private $rabbitChannel;
    private $rabbitConnection;
    
    private const LOCK_PREFIX = 'lock:';
    private const LOCK_EVENT_EXCHANGE = 'lock.events';
    
    private array $heldLocks = [];
    
    public function __construct(Client $redis, AMQPStreamConnection $rabbitConnection)
    {
        $this->redis = $redis;
        $this->rabbitConnection = $rabbitConnection;
        $this->rabbitChannel = $rabbitConnection->channel();
        $this->setupInfrastructure();
    }
    
    private function setupInfrastructure(): void
    {
        $this->rabbitChannel->exchange_declare(
            self::LOCK_EVENT_EXCHANGE,
            'topic',
            false,
            true,
            false
        );
    }
    
    public function acquire(
        string $resource,
        int $ttl = 30,
        int $maxWait = 10,
        int $retryInterval = 100
    ): bool {
        $lockKey = self::LOCK_PREFIX . $resource;
        $lockValue = $this->generateLockValue();
        
        $startTime = time();
        
        while (time() - $startTime < $maxWait) {
            $acquired = $this->tryAcquire($lockKey, $lockValue, $ttl);
            
            if ($acquired) {
                $this->heldLocks[$resource] = [
                    'key' => $lockKey,
                    'value' => $lockValue,
                    'acquired_at' => time(),
                ];
                
                $this->publishLockEvent($resource, 'acquired', $lockValue);
                
                return true;
            }
            
            usleep($retryInterval * 1000);
        }
        
        return false;
    }
    
    private function tryAcquire(string $key, string $value, int $ttl): bool
    {
        $result = $this->redis->set($key, $value, 'NX', 'EX', $ttl);
        
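        // Predis returns a Status object on success and null when NX blocks the write
        return $result !== null && (string) $result === 'OK';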
    }
    
    public function release(string $resource): bool
    {
        if (!isset($this->heldLocks[$resource])) {
            return false;
        }
        
        $lock = $this->heldLocks[$resource];
        
        $script = "
            if redis.call('get', KEYS[1]) == ARGV[1] then
                return redis.call('del', KEYS[1])
            else
                return 0
            end
        ";
        
        $result = $this->redis->eval($script, 1, $lock['key'], $lock['value']);
        
        if ($result) {
            unset($this->heldLocks[$resource]);
            
            $this->publishLockEvent($resource, 'released', $lock['value']);
            
            return true;
        }
        
        return false;
    }
    
    public function extend(string $resource, int $ttl = 30): bool
    {
        if (!isset($this->heldLocks[$resource])) {
            return false;
        }
        
        $lock = $this->heldLocks[$resource];
        
        $script = "
            if redis.call('get', KEYS[1]) == ARGV[1] then
                return redis.call('expire', KEYS[1], ARGV[2])
            else
                return 0
            end
        ";
        
        return (bool) $this->redis->eval($script, 1, $lock['key'], $lock['value'], $ttl);
    }
    
    public function withLock(
        string $resource,
        callable $callback,
        int $ttl = 30,
        int $maxWait = 10
    ) {
        if (!$this->acquire($resource, $ttl, $maxWait)) {
            throw new RuntimeException("Unable to acquire lock: {$resource}");
        }
        
        try {
            return $callback();
        } finally {
            $this->release($resource);
        }
    }
    
    private function generateLockValue(): string
    {
        return sprintf(
            '%s:%s:%s',
            gethostname(),
            getmypid(),
            uniqid('', true)
        );
    }
    
    private function publishLockEvent(
        string $resource,
        string $action,
        string $lockValue
    ): void {
        $event = [
            'resource' => $resource,
            'action' => $action,
            'lock_value' => $lockValue,
            'timestamp' => date('c'),
        ];
        
        $message = new \PhpAmqpLib\Message\AMQPMessage(
            json_encode($event),
            ['content_type' => 'application/json']
        );
        
        $this->rabbitChannel->basic_publish(
            $message,
            self::LOCK_EVENT_EXCHANGE,
            "lock.{$action}"
        );
    }
    
    public function close(): void
    {
        foreach (array_keys($this->heldLocks) as $resource) {
            $this->release($resource);
        }
        
        $this->rabbitChannel->close();
        $this->rabbitConnection->close();
    }
}

// Usage example
$redis = RedisConfig::getClient();
$rabbit = RabbitMQConfig::getConnection();
$lockManager = new DistributedLockManager($redis, $rabbit);

if ($lockManager->acquire('order:123', 30, 10)) {
    try {
        // Perform the operation that needs lock protection
        processOrder(123);
    } finally {
        $lockManager->release('order:123');
    }
}

$result = $lockManager->withLock('inventory:update', function () {
    return updateInventory();
}, 30, 10);

$lockManager->close();
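
The reason `release()` compares the stored value before deleting is easier to see in a small simulation. The sketch below assumes a hypothetical in-memory `FakeLockStore` standing in for Redis, with the clock passed in explicitly. It shows a worker whose lock has already expired trying to release a lock that now belongs to someone else.

```php
<?php

// Hypothetical in-memory lock table standing in for Redis (illustration only).
final class FakeLockStore
{
    /** @var array<string, array{owner: string, expiresAt: int}> */
    private array $locks = [];

    // Mimics `SET key value NX EX ttl`: succeeds only if no live lock exists.
    public function acquire(string $key, string $owner, int $ttl, int $now): bool
    {
        if (isset($this->locks[$key]) && $this->locks[$key]['expiresAt'] > $now) {
            return false;
        }
        $this->locks[$key] = ['owner' => $owner, 'expiresAt' => $now + $ttl];

        return true;
    }

    // Mimics the compare-and-delete Lua script: only the current owner may release.
    public function release(string $key, string $owner, int $now): bool
    {
        $lock = $this->locks[$key] ?? null;
        if ($lock === null || $lock['expiresAt'] <= $now || $lock['owner'] !== $owner) {
            return false;
        }
        unset($this->locks[$key]);

        return true;
    }
}

$store = new FakeLockStore();
$aAcquired = $store->acquire('lock:order:123', 'worker-a', 30, 0);
$bBlocked  = $store->acquire('lock:order:123', 'worker-b', 30, 10); // still held by A
$bAcquired = $store->acquire('lock:order:123', 'worker-b', 30, 31); // A's lock expired
$aReleased = $store->release('lock:order:123', 'worker-a', 32);     // stale release refused
$bReleased = $store->release('lock:order:123', 'worker-b', 33);
```

Without the ownership check, worker A's late release would delete worker B's lock and let a third worker in while B is still working.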

Message Deduplication Handler

php
<?php

use Predis\Client;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class MessageDeduplicator
{
    private Client $redis;
    private $rabbitChannel;
    
    private const DEDUP_PREFIX = 'dedup:';
    private const DEDUP_TTL = 86400;
    
    public function __construct(Client $redis, AMQPStreamConnection $rabbit)
    {
        $this->redis = $redis;
        $this->rabbitChannel = $rabbit->channel();
    }
    
    public function isDuplicate(string $messageId): bool
    {
        $key = self::DEDUP_PREFIX . $messageId;
        
        return $this->redis->exists($key) > 0;
    }
    
    public function markProcessed(string $messageId, array $metadata = []): bool
    {
        $key = self::DEDUP_PREFIX . $messageId;
        
        $data = array_merge([
            'processed_at' => date('c'),
            'hostname' => gethostname(),
        ], $metadata);
        
        // Predis returns a Status object, so cast it before the strict comparison
        return (string) $this->redis->setex($key, self::DEDUP_TTL, json_encode($data)) === 'OK';
    }
    
    public function processWithDedup(
        $message,
        callable $processor,
        string $idField = 'message_id'
    ): bool {
        $headers = $message->has('application_headers')
            ? $message->get('application_headers')->getNativeData()
            : [];
        
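        // get() throws if the property is missing, so check has() first
        $messageId = $headers[$idField]
            ?? ($message->has('message_id') ? $message->get('message_id') : null);
        
        if ($messageId === null) {
            // Without an ID the message cannot be deduplicated; reject it without requeueing
            $message->reject(false);
            return false;
        }
        
        $messageId = (string) $messageId;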
        
        if ($this->isDuplicate($messageId)) {
            $message->ack();
            return false;
        }
        
        try {
            $result = $processor($message);
            
            $this->markProcessed($messageId, [
                'result' => $result ? 'success' : 'failed',
            ]);
            
            $message->ack();
            return true;
        } catch (\Throwable $e) {
            // nack($requeue, $multiple): requeue only this message so it can be retried
            $message->nack(true);
            throw $e;
        }
    }
    
    public function getProcessedInfo(string $messageId): ?array
    {
        $key = self::DEDUP_PREFIX . $messageId;
        
        $data = $this->redis->get($key);
        
        if ($data === null) {
            return null;
        }
        
        return json_decode($data, true);
    }
    
    public function clearProcessed(string $messageId): bool
    {
        $key = self::DEDUP_PREFIX . $messageId;
        
        return $this->redis->del([$key]) > 0;
    }
}

// Usage example
$redis = RedisConfig::getClient();
$rabbit = RabbitMQConfig::getConnection();
$dedup = new MessageDeduplicator($redis, $rabbit);

$callback = function ($message) use ($dedup) {
    $dedup->processWithDedup($message, function ($msg) {
        $data = json_decode($msg->getBody(), true);
        echo "Processing message: " . json_encode($data) . "\n";
        return true;
    });
};

$channel = $rabbit->channel();
$channel->basic_consume('queue.orders', '', false, false, false, false, $callback);

while ($channel->is_consuming()) {
    $channel->wait();
}
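
Calling `isDuplicate()` and then `markProcessed()` as two separate steps leaves a window in which two consumers can both see the message as new. One way to close that window is to claim the message ID atomically before processing. The following is a minimal sketch of that idea. `InMemoryStore` and `processOnce()` are hypothetical names, and the store only imitates the semantics of `SET key value NX EX ttl` so the example runs without Redis.

```php
<?php

// Hypothetical in-memory stand-in for Redis (illustration only).
final class InMemoryStore
{
    /** @var array<string, array{value: string, expiresAt: int}> */
    private array $items = [];

    // Mimics `SET key value NX EX ttl`: succeeds only for the first caller.
    public function setIfAbsent(string $key, string $value, int $ttl, int $now): bool
    {
        if (isset($this->items[$key]) && $this->items[$key]['expiresAt'] > $now) {
            return false; // someone already claimed this key
        }
        $this->items[$key] = ['value' => $value, 'expiresAt' => $now + $ttl];

        return true;
    }

    public function delete(string $key): void
    {
        unset($this->items[$key]);
    }
}

// Claim the message ID first, then process. If processing fails, the claim
// is released so that a redelivery can retry.
function processOnce(InMemoryStore $store, string $messageId, callable $processor, int $now): bool
{
    $key = 'dedup:' . $messageId;

    if (!$store->setIfAbsent($key, 'processing', 86400, $now)) {
        return false; // duplicate: skip
    }

    try {
        $processor();
        return true;
    } catch (Throwable $e) {
        $store->delete($key);
        throw $e;
    }
}

$store = new InMemoryStore();
$handled = 0;
$handler = function () use (&$handled) {
    $handled++;
};

$first  = processOnce($store, 'msg-1', $handler, 1000);
$second = processOnce($store, 'msg-1', $handler, 1001);
```

With Predis, the claim step would correspond to the same `SET ... NX EX` call that `DistributedLockManager::tryAcquire()` already uses.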

Rate Limiter

php
<?php

use Predis\Client;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class RateLimiter
{
    private Client $redis;
    private $rabbitChannel;
    
    private const RATE_LIMIT_PREFIX = 'rate_limit:';
    
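    public function __construct(Client $redis, AMQPStreamConnection $rabbit)
    {
        $this->redis = $redis;
        $this->rabbitChannel = $rabbit->channel();
        
        // Declare the exchange used by publishRateLimitEvent(); publishing to a
        // missing exchange makes the broker close the channel. The 'topic' type
        // is an assumption based on the routing key used below.
        $this->rabbitChannel->exchange_declare('rate_limit.events', 'topic', false, true, false);
    }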
    
    public function isAllowed(
        string $key,
        int $maxRequests,
        int $windowSeconds
    ): bool {
        $redisKey = self::RATE_LIMIT_PREFIX . $key;
        
        $current = $this->redis->incr($redisKey);
        
        if ($current === 1) {
            $this->redis->expire($redisKey, $windowSeconds);
        }
        
        return $current <= $maxRequests;
    }
    
    public function isAllowedSlidingWindow(
        string $key,
        int $maxRequests,
        int $windowSeconds
    ): bool {
        $redisKey = self::RATE_LIMIT_PREFIX . $key;
        $now = microtime(true);
        $windowStart = $now - $windowSeconds;
        
        $this->redis->multi();
        
        $this->redis->zremrangebyscore($redisKey, 0, $windowStart);
        $this->redis->zcard($redisKey);
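        // Use a unique member per request: a float array key would be truncated
        // to an integer, collapsing all requests made within the same second
        $this->redis->zadd($redisKey, [uniqid('', true) => $now]);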
        $this->redis->expire($redisKey, $windowSeconds);
        
        $results = $this->redis->exec();
        
        $count = $results[1];
        
        return $count < $maxRequests;
    }
    
    public function isAllowedTokenBucket(
        string $key,
        int $capacity,
        int $refillRate,
        int $refillTime
    ): bool {
        $redisKey = self::RATE_LIMIT_PREFIX . $key;
        $now = time();
        
        $script = "
            local key = KEYS[1]
            local capacity = tonumber(ARGV[1])
            local refill_rate = tonumber(ARGV[2])
            local refill_time = tonumber(ARGV[3])
            local now = tonumber(ARGV[4])
            
            local bucket = redis.call('hmget', key, 'tokens', 'last_refill')
            local tokens = tonumber(bucket[1]) or capacity
            local last_refill = tonumber(bucket[2]) or now
            
            local elapsed = now - last_refill
            local refill_tokens = math.floor(elapsed * refill_rate / refill_time)
            tokens = math.min(capacity, tokens + refill_tokens)
            
            if tokens >= 1 then
                tokens = tokens - 1
                redis.call('hmset', key, 'tokens', tokens, 'last_refill', now)
                redis.call('expire', key, refill_time * 2)
                return 1
            else
                return 0
            end
        ";
        
        return (bool) $this->redis->eval(
            $script,
            1,
            $redisKey,
            $capacity,
            $refillRate,
            $refillTime,
            $now
        );
    }
    
    public function getRemainingRequests(
        string $key,
        int $maxRequests,
        int $windowSeconds
    ): int {
        $redisKey = self::RATE_LIMIT_PREFIX . $key;
        
        $current = (int) $this->redis->get($redisKey);
        
        return max(0, $maxRequests - $current);
    }
    
    public function getResetTime(string $key): int
    {
        $redisKey = self::RATE_LIMIT_PREFIX . $key;
        
        $ttl = $this->redis->ttl($redisKey);
        
        return $ttl > 0 ? time() + $ttl : 0;
    }
    
    public function throttle(
        string $key,
        int $maxRequests,
        int $windowSeconds,
        callable $callback
    ) {
        if (!$this->isAllowed($key, $maxRequests, $windowSeconds)) {
            $resetTime = $this->getResetTime($key);
            throw new RuntimeException(
                "Rate limit exceeded. Reset at: " . date('c', $resetTime)
            );
        }
        
        return $callback();
    }
    
    public function publishRateLimitEvent(
        string $key,
        int $maxRequests,
        int $windowSeconds
    ): void {
        $remaining = $this->getRemainingRequests($key, $maxRequests, $windowSeconds);
        
        if ($remaining <= 0) {
            $event = [
                'key' => $key,
                'limit' => $maxRequests,
                'window' => $windowSeconds,
                'timestamp' => date('c'),
            ];
            
            $message = new \PhpAmqpLib\Message\AMQPMessage(
                json_encode($event),
                ['content_type' => 'application/json']
            );
            
            $this->rabbitChannel->basic_publish(
                $message,
                'rate_limit.events',
                'rate_limit.exceeded'
            );
        }
    }
}

// Usage example
$redis = RedisConfig::getClient();
$rabbit = RabbitMQConfig::getConnection();
$limiter = new RateLimiter($redis, $rabbit);

$apiKey = 'api:user:123';

if ($limiter->isAllowed($apiKey, 100, 60)) {
    echo "Request allowed\n";
    echo "Remaining requests: " . $limiter->getRemainingRequests($apiKey, 100, 60) . "\n";
} else {
    echo "Request rate-limited\n";
}

try {
    $result = $limiter->throttle('api:expensive:op', 10, 60, function () {
        return performExpensiveOperation();
    });
} catch (RuntimeException $e) {
    echo "Rate limited: " . $e->getMessage() . "\n";
}
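
The fixed-window check in `isAllowed()` and the sliding-window check in `isAllowedSlidingWindow()` behave differently around a window boundary. The sketch below replays both algorithms in plain PHP over the same request timestamps. The two helper functions are hypothetical and keep their state in arrays instead of Redis, with the matching Redis command noted in the comments.

```php
<?php

// Fixed window: a counter that starts a new window once the old one expires
// (the INCR + EXPIRE approach).
function fixedWindowAllowed(array &$state, int $now, int $max, int $window): bool
{
    if ($state === [] || $now >= $state['expiresAt']) {
        $state = ['count' => 0, 'expiresAt' => $now + $window]; // key expired: new window
    }
    $state['count']++;

    return $state['count'] <= $max;
}

// Sliding window: keep one timestamp per request and count those still in range.
function slidingWindowAllowed(array &$timestamps, int $now, int $max, int $window): bool
{
    // Drop entries that fell out of the window (ZREMRANGEBYSCORE)
    $timestamps = array_values(array_filter(
        $timestamps,
        fn (int $t): bool => $t > $now - $window
    ));
    $count = count($timestamps); // ZCARD, taken before the current request is added
    $timestamps[] = $now;        // ZADD

    return $count < $max;
}

// Limit: 3 requests per 10 seconds. Three requests arrive late in the first
// window and three more right after it rolls over.
$requests = [0, 9, 9, 10, 10, 10];
$fixedState = [];
$slidingState = [];
$fixedAllowed = 0;
$slidingAllowed = 0;

foreach ($requests as $t) {
    if (fixedWindowAllowed($fixedState, $t, 3, 10)) {
        $fixedAllowed++;
    }
    if (slidingWindowAllowed($slidingState, $t, 3, 10)) {
        $slidingAllowed++;
    }
}
```

In this run the fixed window lets all six requests through, five of them within two seconds, while the sliding window admits only four. The price of the sliding window is one sorted-set entry per request instead of a single counter.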

Real-World Use Cases

Scenario 1: Order Processing Cache

php
<?php

use Predis\Client;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class OrderCacheService
{
    private Client $redis;
    private $rabbitChannel;
    private CacheSyncManager $cacheManager;
    
    public function __construct(
        Client $redis,
        AMQPStreamConnection $rabbit,
        CacheSyncManager $cacheManager
    ) {
        $this->redis = $redis;
        $this->rabbitChannel = $rabbit->channel();
        $this->cacheManager = $cacheManager;
    }
    
    public function getOrder(string $orderId): array
    {
        return $this->cacheManager->get(
            "order:{$orderId}",
            function () use ($orderId) {
                return $this->loadOrderFromDatabase($orderId);
            },
            3600
        );
    }
    
    public function updateOrder(string $orderId, array $updates): bool
    {
        $this->updateOrderInDatabase($orderId, $updates);
        
        $this->cacheManager->invalidate("order:{$orderId}", 'order_updated');
        
        $this->publishOrderUpdate($orderId, $updates);
        
        return true;
    }
    
    private function loadOrderFromDatabase(string $orderId): array
    {
        // Load the order from the database
        return [
            'id' => $orderId,
            'status' => 'pending',
            'amount' => 100.00,
        ];
    }
    
    private function updateOrderInDatabase(string $orderId, array $updates): void
    {
        // Update the database
    }
    
    private function publishOrderUpdate(string $orderId, array $updates): void
    {
        $event = [
            'orderId' => $orderId,
            'updates' => $updates,
            'timestamp' => date('c'),
        ];
        
        $message = new \PhpAmqpLib\Message\AMQPMessage(
            json_encode($event),
            ['content_type' => 'application/json']
        );
        
        $this->rabbitChannel->basic_publish(
            $message,
            'order.events',
            'order.updated'
        );
    }
}

Scenario 2: Distributed Task Scheduling

php
<?php

use Predis\Client;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class DistributedTaskScheduler
{
    private Client $redis;
    private $rabbitChannel;
    private DistributedLockManager $lockManager;
    
    public function __construct(
        Client $redis,
        AMQPStreamConnection $rabbit,
        DistributedLockManager $lockManager
    ) {
        $this->redis = $redis;
        $this->rabbitChannel = $rabbit->channel();
        $this->lockManager = $lockManager;
    }
    
    public function scheduleTask(
        string $taskId,
        string $taskType,
        array $payload,
        int $delay = 0
    ): bool {
        $lockResource = "task:schedule:{$taskId}";
        
        return $this->lockManager->withLock($lockResource, function () use (
            $taskId,
            $taskType,
            $payload,
            $delay
        ) {
            $taskKey = "task:{$taskId}";
            
            if ($this->redis->exists($taskKey)) {
                return false;
            }
            
            $task = [
                'id' => $taskId,
                'type' => $taskType,
                'payload' => $payload,
                'status' => 'scheduled',
                'scheduled_at' => date('c'),
                'execute_at' => date('c', time() + $delay),
            ];
            
            $this->redis->setex($taskKey, 86400, json_encode($task));
            
            $message = new \PhpAmqpLib\Message\AMQPMessage(
                json_encode($task),
                [
                    'content_type' => 'application/json',
                    'delivery_mode' => \PhpAmqpLib\Message\AMQPMessage::DELIVERY_MODE_PERSISTENT,
                ]
            );
            
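            if ($delay > 0) {
                // One delay queue per task type and delay, so that expired messages
                // are dead-lettered with the task type as their routing key
                $delayQueue = "task.delayed.{$taskType}.{$delay}";
                
                $this->rabbitChannel->queue_declare(
                    $delayQueue,
                    false,
                    true,
                    false,
                    false,
                    false,
                    new \PhpAmqpLib\Wire\AMQPTable([
                        'x-message-ttl' => $delay * 1000,
                        'x-dead-letter-exchange' => 'task.processing',
                        'x-dead-letter-routing-key' => $taskType,
                    ])
                );
                
                $this->rabbitChannel->basic_publish(
                    $message,
                    '',
                    $delayQueue
                );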
            } else {
                $this->rabbitChannel->basic_publish(
                    $message,
                    'task.processing',
                    $taskType
                );
            }
            
            return true;
        }, 10, 5);
    }
    
    public function cancelTask(string $taskId): bool
    {
        $lockResource = "task:cancel:{$taskId}";
        
        return $this->lockManager->withLock($lockResource, function () use ($taskId) {
            $taskKey = "task:{$taskId}";
            
            $taskData = $this->redis->get($taskKey);
            
            if (!$taskData) {
                return false;
            }
            
            $task = json_decode($taskData, true);
            $task['status'] = 'cancelled';
            $task['cancelled_at'] = date('c');
            
            $this->redis->setex($taskKey, 3600, json_encode($task));
            
            return true;
        }, 10, 5);
    }
    
    public function getTaskStatus(string $taskId): ?array
    {
        $taskKey = "task:{$taskId}";
        
        $taskData = $this->redis->get($taskKey);
        
        if (!$taskData) {
            return null;
        }
        
        return json_decode($taskData, true);
    }
}

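Common Problems and Solutions

Problem 1: Cache Penetration

Symptom: A large volume of requests query data that does not exist, so they bypass the cache and hit the database directly.

Solution: Use a Bloom filter or cache null values.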

php
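public function getWithBloomFilter(string $key, callable $loader, int $ttl = 3600)
{
    // One shared bitmap for all keys; its bits must be set (SETBIT at the same
    // offsets) whenever a record is created
    $bloomKey = 'bloom:keys';
    $bits = 1 << 24;
    
    // Check several bit positions; a single unset bit proves the key was never added
    foreach (['crc32b', 'adler32', 'fnv1a32'] as $algo) {
        $offset = hexdec(hash($algo, $key)) % $bits;
        if (!$this->redis->getbit($bloomKey, $offset)) {
            return null;
        }
    }
    
    return $this->get($key, $loader, $ttl);
}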
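
The second option named above, caching null values, can be sketched as follows. This is a minimal illustration under stated assumptions: `getWithNullCaching()` and the `NULL_SENTINEL` marker are hypothetical names, and a plain array with explicit timestamps stands in for Redis so the example runs on its own.

```php
<?php

// Hypothetical marker meaning "the record does not exist" (illustration only).
const NULL_SENTINEL = '__null__';

function getWithNullCaching(
    array &$cache,
    string $key,
    callable $loader,
    int $now,
    int $ttl = 3600,
    int $nullTtl = 60
) {
    $entry = $cache[$key] ?? null;

    if ($entry !== null && $entry['expiresAt'] > $now) {
        // Hit: either a real value or the remembered "not found" marker
        return $entry['value'] === NULL_SENTINEL
            ? null
            : json_decode($entry['value'], true);
    }

    $data = $loader();

    // Missing records are cached too, but with a much shorter TTL so that
    // newly created records become visible quickly
    $cache[$key] = $data === null
        ? ['value' => NULL_SENTINEL, 'expiresAt' => $now + $nullTtl]
        : ['value' => json_encode($data), 'expiresAt' => $now + $ttl];

    return $data;
}

$cache = [];
$dbCalls = 0;
$loader = function () use (&$dbCalls) {
    $dbCalls++;
    return null; // the record does not exist
};

$a = getWithNullCaching($cache, 'user:404', $loader, 1000); // miss, queries the database
$b = getWithNullCaching($cache, 'user:404', $loader, 1010); // served from the null entry
$c = getWithNullCaching($cache, 'user:404', $loader, 1061); // null entry expired, queries again
```

Compared with a Bloom filter this needs no extra data structure, but every distinct missing key occupies a cache entry until its short TTL runs out.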

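Problem 2: Cache Avalanche

Symptom: Many cache entries expire at the same time, causing a sudden spike in database load.

Solution: Add a random offset to the expiry time.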

php
public function setWithRandomTTL(string $key, $data, int $baseTtl = 3600): bool
{
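    // Add up to 10% jitter; random_int() needs integer bounds
    $randomOffset = random_int(0, intdiv($baseTtl, 10));
    $ttl = $baseTtl + $randomOffset;
    
    // Predis returns a Status object, so cast it before the strict comparison
    return (string) $this->redis->setex($key, $ttl, json_encode($data)) === 'OK';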
}

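Problem 3: Distributed Lock Deadlock

Symptom: A lock is not released correctly, so subsequent requests cannot acquire it.

Solution: Give every lock a timeout and renew it automatically while the work is still running.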

php
public function acquireWithWatchdog(string $resource, int $ttl = 30): ?string
{
    if (!$this->acquire($resource, $ttl)) {
        return null;
    }
    
    $lockValue = $this->heldLocks[$resource]['value'];
    
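    // Renew the lock at half its TTL (tick interval is in milliseconds);
    // stop the timer once the lock is no longer held
    \Swoole\Timer::tick($ttl * 500, function (int $timerId) use ($resource, $ttl) {
        if (!$this->extend($resource, $ttl)) {
            \Swoole\Timer::clear($timerId);
        }
    });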
    
    return $lockValue;
}

Best Practices

1. Connection Pool Management

php
class RedisConnectionPool
{
    private static ?Client $connection = null;
    
    public static function getConnection(): Client
    {
        if (self::$connection === null) {
            self::$connection = RedisConfig::getClient();
        }
        return self::$connection;
    }
}

2. Error Handling

php
class RedisErrorHandler
{
    public static function handle(Exception $e, array $context = []): void
    {
        error_log(sprintf(
            "[Redis Error] %s in %s:%d\nContext: %s",
            $e->getMessage(),
            $e->getFile(),
            $e->getLine(),
            json_encode($context)
        ));
    }
}

3. Monitoring Metrics

php
class RedisMetrics
{
    public static function recordHit(string $key): void
    {
        // Record a cache hit
    }
    
    public static function recordMiss(string $key): void
    {
        // Record a cache miss
    }
    
    public static function recordLatency(string $operation, float $time): void
    {
        // Record operation latency
    }
}

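Version Compatibility

| PHP | Predis | phpredis | Redis Server | RabbitMQ Server |
| --- | --- | --- | --- | --- |
| 8.2+ | 2.x | 6.0+ | 7.x | 3.11+ |
| 8.1+ | 2.x | 5.3+ | 7.x | 3.10+ |
| 8.0+ | 2.x | 5.3+ | 6.x/7.x | 3.9+ |
| 7.4+ | 1.x/2.x | 5.3+ | 6.x | 3.8+ |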
