Appearance
RabbitMQ 与 Redis 集成
概述
Redis 是一个高性能的内存数据结构存储系统,常用于缓存、会话管理、消息队列、实时分析等场景。将 RabbitMQ 与 Redis 集成,可以实现缓存更新通知、分布式锁、消息去重、限流等功能,构建更强大的分布式系统。
本教程将详细介绍 RabbitMQ 与 Redis 的集成方案,包括缓存同步、分布式锁、消息去重等核心功能。
集成架构设计
架构图
┌─────────────────────────────────────────────────────────────────────┐
│ Redis 集成架构 │
│ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ 应用层 │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Cache │ │ Lock │ │ Rate │ │ │
│ │ │ Manager │ │ Manager │ │ Limiter │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌───────────────────┼───────────────────┐ │
│ ▼ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ RabbitMQ │ │ Redis │ │ Database │ │
│ │ (消息队列) │ │ (缓存) │ │ (持久化) │ │
│ │ │ │ │ │ │ │
│ │ • 事件通知 │ │ • 缓存存储 │ │ • 数据存储 │ │
│ │ • 异步处理 │ │ • 分布式锁 │ │ • 事务管理 │ │
│ │ • 解耦系统 │ │ • 限流计数 │ │ │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │ │ │ │
│ └───────────────────┴───────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ 同步与协调层 │ │
│ │ • 缓存失效通知 │ │
│ │ • 分布式锁协调 │ │
│ │ • 消息去重处理 │ │
│ └─────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
集成模式
| 模式 | 说明 |
|---|---|
| Cache Invalidation | 缓存失效,通过消息通知更新缓存 |
| Distributed Lock | 分布式锁,协调分布式资源访问 |
| Message Deduplication | 消息去重,防止重复处理 |
| Rate Limiting | 限流,控制消息处理速率 |
| Idempotency | 幂等性,确保操作可重复执行 |
配置示例
Redis PHP 客户端配置
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use Predis\Client;
class RedisConfig
{
/**
 * Build a single-node Predis client from environment variables.
 *
 * Reads REDIS_SCHEME, REDIS_HOST, REDIS_PORT, REDIS_PASSWORD, REDIS_DATABASE,
 * REDIS_CLUSTER, REDIS_REPLICATION and REDIS_PREFIX; unset or empty variables
 * fall back to local defaults (tcp://localhost:6379, database 0, prefix "app:").
 *
 * @return Client Configured Predis client.
 */
public static function getClient(): Client
{
    $config = [
        'scheme' => getenv('REDIS_SCHEME') ?: 'tcp',
        'host' => getenv('REDIS_HOST') ?: 'localhost',
        // getenv() always yields strings; normalise numeric settings to int.
        'port' => (int) (getenv('REDIS_PORT') ?: 6379),
        'password' => getenv('REDIS_PASSWORD') ?: null,
        'database' => (int) (getenv('REDIS_DATABASE') ?: 0),
    ];

    // The stock stream connection is used. The previous hard-wired
    // PhpiredisStreamConnection needs the optional phpiredis extension and
    // (NOTE(review): confirm) is no longer shipped with Predis 2.x.
    $options = [
        'prefix' => getenv('REDIS_PREFIX') ?: 'app:',
    ];

    // Only forward aggregate-connection options that were actually configured,
    // instead of handing explicit nulls to the option handlers.
    foreach (['cluster' => 'REDIS_CLUSTER', 'replication' => 'REDIS_REPLICATION'] as $option => $variable) {
        $value = getenv($variable);
        if ($value !== false && $value !== '') {
            $options[$option] = $value;
        }
    }

    return new Client($config, $options);
}
/**
 * Build a Predis client for a Redis Cluster.
 *
 * REDIS_CLUSTER_NODES is a comma-separated list of "host[:port]" entries;
 * surrounding whitespace and empty entries are ignored and a missing port
 * defaults to 6379. Falls back to "localhost:6379" when the variable is unset.
 *
 * @return Client Client using the 'redis' cluster mode.
 */
public static function getClusterClient(): Client
{
    $rawNodes = explode(',', getenv('REDIS_CLUSTER_NODES') ?: 'localhost:6379');

    $parameters = [];
    foreach ($rawNodes as $rawNode) {
        $node = trim($rawNode);
        if ($node === '') {
            // Tolerate trailing commas such as "a:6379,b:6379,".
            continue;
        }
        $parts = explode(':', $node, 2);
        $parameters[] = [
            'host' => $parts[0],
            'port' => isset($parts[1]) && $parts[1] !== '' ? (int) $parts[1] : 6379,
        ];
    }

    return new Client($parameters, [
        'cluster' => 'redis',
    ]);
}
}
RabbitMQ 连接配置
php
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
class RabbitMQConfig
{
/**
 * Open a RabbitMQ stream connection configured from RABBITMQ_* environment
 * variables, defaulting to guest:guest@localhost:5672 on vhost "/".
 *
 * @return AMQPStreamConnection
 */
public static function getConnection(): AMQPStreamConnection
{
    $host = getenv('RABBITMQ_HOST') ?: 'localhost';
    $port = getenv('RABBITMQ_PORT') ?: 5672;
    $user = getenv('RABBITMQ_USER') ?: 'guest';
    $password = getenv('RABBITMQ_PASSWORD') ?: 'guest';
    $vhost = getenv('RABBITMQ_VHOST') ?: '/';

    return new AMQPStreamConnection($host, $port, $user, $password, $vhost);
}
}
PHP 代码示例
缓存同步管理器
php
<?php
class CacheSyncManager
{
private Client $redis;
private $rabbitChannel;
private $rabbitConnection;
private const CACHE_INVALIDATION_EXCHANGE = 'cache.invalidation';
private const CACHE_INVALIDATION_QUEUE = 'cache.invalidation.queue';
/**
 * Store the Redis client and RabbitMQ connection, open a channel on that
 * connection, and declare the invalidation exchange/queue.
 *
 * @param Client $redis Redis client used for cache reads and writes.
 * @param AMQPStreamConnection $rabbitConnection Connection the channel is opened on.
 */
public function __construct(Client $redis, AMQPStreamConnection $rabbitConnection)
{
$this->redis = $redis;
$this->rabbitConnection = $rabbitConnection;
$this->rabbitChannel = $rabbitConnection->channel();
// Declares the exchange and queue up front so publish/consume can assume they exist.
$this->setupInfrastructure();
}
/**
 * Declare the fanout exchange and the queue used for cache-invalidation
 * events, then bind the queue to the exchange.
 */
private function setupInfrastructure(): void
{
    $channel = $this->rabbitChannel;
    $exchange = self::CACHE_INVALIDATION_EXCHANGE;
    $queue = self::CACHE_INVALIDATION_QUEUE;

    // Positional flags: passive = false, durable = true, auto_delete = false.
    $channel->exchange_declare($exchange, 'fanout', false, true, false);

    // Positional flags: passive = false, durable = true, exclusive = false, auto_delete = false.
    $channel->queue_declare($queue, false, true, false, false);

    $channel->queue_bind($queue, $exchange);
}
/**
 * Read-through cache lookup.
 *
 * Returns the JSON-decoded cached value when the key exists; otherwise calls
 * $loader, stores its JSON-encoded result for $ttl seconds and returns it.
 *
 * @param string $key Cache key.
 * @param callable $loader Zero-argument callable producing the value on a miss.
 * @param int $ttl Time to live in seconds for a freshly loaded value.
 * @return mixed Decoded cached value (arrays for JSON objects) or the loader's result.
 */
public function get(string $key, callable $loader, int $ttl = 3600)
{
    $payload = $this->redis->get($key);

    if ($payload === null) {
        // Miss: compute the value and populate the cache before returning it.
        $fresh = $loader();
        $this->redis->setex($key, $ttl, json_encode($fresh));

        return $fresh;
    }

    return json_decode($payload, true);
}
/**
 * Store a JSON-encoded value under $key for $ttl seconds.
 *
 * @param string $key Cache key.
 * @param mixed $data Value to encode and store.
 * @param int $ttl Time to live in seconds.
 * @return bool True when Redis acknowledged the write with OK.
 */
public function set(string $key, $data, int $ttl = 3600): bool
{
    $reply = $this->redis->setex($key, $ttl, json_encode($data));

    // Predis wraps status replies in a Predis\Response\Status object, so a
    // strict comparison against the string 'OK' is always false; compare the
    // string form instead.
    return (string) $reply === 'OK';
}
/**
 * Delete one cache key and broadcast an invalidation event for it.
 *
 * @param string $key Cache key to remove.
 * @param string $reason Free-form reason included in the published event.
 * @return bool Always true; the result of the delete itself is not inspected.
 */
public function invalidate(string $key, string $reason = 'manual'): bool
{
$this->redis->del([$key]);
// The event is published even when the key did not exist in Redis.
$this->publishInvalidation($key, $reason);
return true;
}
/**
 * Delete every key matching a glob pattern and publish one invalidation
 * event per deleted key candidate.
 *
 * The keyspace is walked incrementally with SCAN rather than KEYS, because
 * KEYS blocks the Redis server while it inspects the entire keyspace.
 *
 * NOTE(review): when the client is configured with a key prefix, confirm that
 * keys returned by SCAN are not prefixed a second time when passed to DEL.
 *
 * @param string $pattern Redis glob pattern, e.g. "user:*".
 * @param string $reason Free-form reason included in each published event.
 * @return int Number of keys Redis reported as deleted.
 */
public function invalidatePattern(string $pattern, string $reason = 'pattern'): int
{
    $deleted = 0;
    $cursor = '0';

    do {
        [$cursor, $keys] = $this->redis->scan($cursor, ['MATCH' => $pattern, 'COUNT' => 500]);

        if (!empty($keys)) {
            $deleted += (int) $this->redis->del($keys);
            foreach ($keys as $key) {
                $this->publishInvalidation($key, $reason);
            }
        }
        // The cursor may come back as int or string depending on the client version.
    } while ((string) $cursor !== '0');

    return $deleted;
}
/**
 * Publish a persistent JSON invalidation event to the invalidation exchange.
 *
 * The event carries the key, the reason, an ISO-8601 timestamp and the
 * publishing host name.
 *
 * @param string $key Cache key that was invalidated.
 * @param string $reason Why it was invalidated.
 */
private function publishInvalidation(string $key, string $reason): void
{
    $body = json_encode([
        'key' => $key,
        'reason' => $reason,
        'timestamp' => date('c'),
        'hostname' => gethostname(),
    ]);

    $properties = [
        'content_type' => 'application/json',
        'delivery_mode' => \PhpAmqpLib\Message\AMQPMessage::DELIVERY_MODE_PERSISTENT,
    ];

    $this->rabbitChannel->basic_publish(
        new \PhpAmqpLib\Message\AMQPMessage($body, $properties),
        self::CACHE_INVALIDATION_EXCHANGE
    );
}
/**
 * Consume invalidation events and pass each decoded event to $handler.
 *
 * Blocks until the channel stops consuming. Every delivery is settled:
 * acked after the handler returns, dropped when the body is not a JSON
 * object/array, and requeued when the handler throws (the exception is then
 * rethrown, which ends the consume loop).
 *
 * @param callable $handler Receives the decoded event array.
 * @throws \Throwable Whatever the handler throws.
 */
public function subscribeInvalidations(callable $handler): void
{
    $callback = function ($message) use ($handler): void {
        $event = json_decode($message->getBody(), true);

        if (!is_array($event)) {
            // A malformed payload can never succeed; reject it without requeue
            // instead of handing null to the handler.
            $message->nack(false);
            return;
        }

        try {
            $handler($event);
        } catch (\Throwable $e) {
            // AMQPMessage::nack($requeue, $multiple): requeue only this delivery
            // so it is not left unacknowledged on the channel.
            $message->nack(true);
            throw $e;
        }

        $message->ack();
    };

    // At most 10 unacknowledged deliveries per consumer.
    $this->rabbitChannel->basic_qos(0, 10, false);
    $this->rabbitChannel->basic_consume(
        self::CACHE_INVALIDATION_QUEUE,
        '',
        false,
        false,
        false,
        false,
        $callback
    );

    while ($this->rabbitChannel->is_consuming()) {
        $this->rabbitChannel->wait();
    }
}
/**
 * Close the RabbitMQ channel and then the connection it was opened on.
 * The Redis client is left untouched.
 */
public function close(): void
{
$this->rabbitChannel->close();
$this->rabbitConnection->close();
}
}
// 使用示例
$redis = RedisConfig::getClient();
$rabbit = RabbitMQConfig::getConnection();
$cache = new CacheSyncManager($redis, $rabbit);
$user = $cache->get('user:123', function () {
return ['id' => 123, 'name' => 'John Doe', 'email' => 'john@example.com'];
}, 3600);
$cache->invalidate('user:123', 'user_updated');
$cache->close();
分布式锁管理器
php
<?php
class DistributedLockManager
{
private Client $redis;
private $rabbitChannel;
private $rabbitConnection;
private const LOCK_PREFIX = 'lock:';
private const LOCK_EVENT_EXCHANGE = 'lock.events';
private array $heldLocks = [];
/**
 * Store the Redis client and RabbitMQ connection, open a channel, and declare
 * the lock-event exchange.
 *
 * @param Client $redis Redis client that holds the lock keys.
 * @param AMQPStreamConnection $rabbitConnection Connection used to publish lock events.
 */
public function __construct(Client $redis, AMQPStreamConnection $rabbitConnection)
{
$this->redis = $redis;
$this->rabbitConnection = $rabbitConnection;
$this->rabbitChannel = $rabbitConnection->channel();
$this->setupInfrastructure();
}
/**
 * Declare the topic exchange that lock events are published to.
 */
private function setupInfrastructure(): void
{
    // Positional flags: passive = false, durable = true, auto_delete = false.
    $this->rabbitChannel->exchange_declare(self::LOCK_EVENT_EXCHANGE, 'topic', false, true, false);
}
/**
 * Try to acquire the lock for $resource, retrying until $maxWait elapses.
 *
 * At least one attempt is always made, so $maxWait = 0 means "try once,
 * do not wait". The deadline uses sub-second timing rather than time(),
 * whose one-second granularity could stretch the wait.
 *
 * @param string $resource Logical resource name; the Redis key is LOCK_PREFIX . $resource.
 * @param int $ttl Lock expiry in seconds.
 * @param int $maxWait Maximum time to keep retrying, in seconds.
 * @param int $retryInterval Pause between attempts, in milliseconds.
 * @return bool True when the lock was acquired, false on timeout.
 */
public function acquire(
    string $resource,
    int $ttl = 30,
    int $maxWait = 10,
    int $retryInterval = 100
): bool {
    $lockKey = self::LOCK_PREFIX . $resource;
    $lockValue = $this->generateLockValue();
    $deadline = microtime(true) + max(0, $maxWait);

    while (true) {
        if ($this->tryAcquire($lockKey, $lockValue, $ttl)) {
            $this->heldLocks[$resource] = [
                'key' => $lockKey,
                'value' => $lockValue,
                'acquired_at' => time(),
            ];
            $this->publishLockEvent($resource, 'acquired', $lockValue);

            return true;
        }

        $remaining = $deadline - microtime(true);
        if ($remaining <= 0) {
            return false;
        }

        // Never sleep past the deadline; guard against a non-positive interval
        // turning the loop into a busy spin.
        $pauseMicros = (int) min(max(1, $retryInterval) * 1000, $remaining * 1000000);
        usleep(max(1, $pauseMicros));
    }
}
/**
 * Single non-blocking attempt to create the lock key (SET ... EX ttl NX).
 *
 * @param string $key Redis key of the lock.
 * @param string $value Owner token stored as the key's value.
 * @param int $ttl Expiry in seconds.
 * @return bool True when the key was created by this call.
 */
private function tryAcquire(string $key, string $value, int $ttl): bool
{
    $reply = $this->redis->set($key, $value, 'EX', $ttl, 'NX');

    // Predis returns null when NX prevented the write and a
    // Predis\Response\Status object on success. A strict comparison of that
    // object with 'OK' is always false, so compare its string form.
    return $reply !== null && (string) $reply === 'OK';
}
/**
 * Release a lock previously acquired through this manager.
 *
 * The key is deleted only when it still holds this manager's owner token
 * (checked and deleted atomically in Lua), so an expired lock that another
 * owner has since taken is never removed.
 *
 * @param string $resource Resource name passed to acquire().
 * @return bool True when the key was deleted; false when the lock was not
 *              tracked here or no longer belonged to this owner.
 */
public function release(string $resource): bool
{
    if (!isset($this->heldLocks[$resource])) {
        return false;
    }
    $lock = $this->heldLocks[$resource];

    // Forget the lock locally whatever Redis answers: if the token no longer
    // matches, the lock has expired or changed owner and keeping the entry
    // would only make later release()/close() calls retry a lost lock.
    unset($this->heldLocks[$resource]);

    $script = "
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
";
    $result = $this->redis->eval($script, 1, $lock['key'], $lock['value']);

    if ($result) {
        $this->publishLockEvent($resource, 'released', $lock['value']);

        return true;
    }

    return false;
}
/**
 * Reset the expiry of a lock held by this manager.
 *
 * The Lua script only touches the key when it still contains this manager's
 * owner token.
 *
 * @param string $resource Resource name passed to acquire().
 * @param int $ttl New expiry in seconds.
 * @return bool True when the expiry was updated.
 */
public function extend(string $resource, int $ttl = 30): bool
{
    $held = $this->heldLocks[$resource] ?? null;
    if ($held === null) {
        return false;
    }

    $lua = "
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('expire', KEYS[1], ARGV[2])
else
return 0
end
";
    $outcome = $this->redis->eval($lua, 1, $held['key'], $held['value'], $ttl);

    return (bool) $outcome;
}
/**
 * Run $callback while holding the lock for $resource.
 *
 * @param string $resource Resource name to lock.
 * @param callable $callback Zero-argument callable executed under the lock.
 * @param int $ttl Lock expiry in seconds, forwarded to acquire().
 * @param int $maxWait Maximum seconds to wait for the lock, forwarded to acquire().
 * @return mixed Whatever $callback returns.
 * @throws RuntimeException When the lock cannot be acquired.
 */
public function withLock(
string $resource,
callable $callback,
int $ttl = 30,
int $maxWait = 10
) {
if (!$this->acquire($resource, $ttl, $maxWait)) {
throw new RuntimeException("无法获取锁: {$resource}");
}
try {
return $callback();
} finally {
// Runs on both normal return and exception, so the lock is always released.
$this->release($resource);
}
}
/**
 * Build the owner token stored in the lock key: "host:pid:random".
 *
 * The random part comes from the CSPRNG instead of uniqid(), which is
 * derived from the clock and is therefore predictable.
 *
 * @return string Owner token unique to this acquisition.
 */
private function generateLockValue(): string
{
    return sprintf(
        '%s:%s:%s',
        gethostname(),
        getmypid(),
        bin2hex(random_bytes(16))
    );
}
/**
 * Publish a JSON lock event to the lock-event exchange with routing key
 * "lock.<action>".
 *
 * @param string $resource Resource the event concerns.
 * @param string $action Event name, e.g. "acquired" or "released".
 * @param string $lockValue Owner token of the lock.
 */
private function publishLockEvent(
    string $resource,
    string $action,
    string $lockValue
): void {
    $payload = json_encode([
        'resource' => $resource,
        'action' => $action,
        'lock_value' => $lockValue,
        'timestamp' => date('c'),
    ]);
    $routingKey = "lock.{$action}";

    $this->rabbitChannel->basic_publish(
        new \PhpAmqpLib\Message\AMQPMessage($payload, ['content_type' => 'application/json']),
        self::LOCK_EVENT_EXCHANGE,
        $routingKey
    );
}
/**
 * Release every lock still tracked by this manager, then close the RabbitMQ
 * channel and connection.
 */
public function close(): void
{
// Iterate over a snapshot of the keys because release() modifies heldLocks.
foreach (array_keys($this->heldLocks) as $resource) {
$this->release($resource);
}
$this->rabbitChannel->close();
$this->rabbitConnection->close();
}
}
// 使用示例
$redis = RedisConfig::getClient();
$rabbit = RabbitMQConfig::getConnection();
$lockManager = new DistributedLockManager($redis, $rabbit);
if ($lockManager->acquire('order:123', 30, 10)) {
try {
// 执行需要锁保护的操作
processOrder(123);
} finally {
$lockManager->release('order:123');
}
}
$result = $lockManager->withLock('inventory:update', function () {
return updateInventory();
}, 30, 10);
$lockManager->close();
消息去重处理器
php
<?php
class MessageDeduplicator
{
private Client $redis;
private $rabbitChannel;
private const DEDUP_PREFIX = 'dedup:';
private const DEDUP_TTL = 86400;
/**
 * Store the Redis client and open a channel on the given RabbitMQ connection.
 *
 * @param Client $redis Redis client that stores the processed-message markers.
 * @param AMQPStreamConnection $rabbit Connection a channel is opened on.
 */
public function __construct(Client $redis, AMQPStreamConnection $rabbit)
{
$this->redis = $redis;
$this->rabbitChannel = $rabbit->channel();
}
/**
 * Tell whether a processed marker exists for the given message id.
 *
 * @param string $messageId Message identifier.
 * @return bool True when the marker key exists in Redis.
 */
public function isDuplicate(string $messageId): bool
{
    $hits = $this->redis->exists(self::DEDUP_PREFIX . $messageId);

    return $hits > 0;
}
/**
 * Record that a message has been processed.
 *
 * Stores a JSON marker (processing time, host name, plus $metadata) that
 * expires after DEDUP_TTL seconds.
 *
 * @param string $messageId Message identifier.
 * @param array $metadata Extra fields merged over the default marker fields.
 * @return bool True when Redis acknowledged the write with OK.
 */
public function markProcessed(string $messageId, array $metadata = []): bool
{
    $key = self::DEDUP_PREFIX . $messageId;
    $data = array_merge([
        'processed_at' => date('c'),
        'hostname' => gethostname(),
    ], $metadata);

    $reply = $this->redis->setex($key, self::DEDUP_TTL, json_encode($data));

    // Predis returns a Predis\Response\Status object for status replies; a
    // strict comparison with the string 'OK' would always be false.
    return (string) $reply === 'OK';
}
/**
 * Process a message at most once per message id.
 *
 * The id is taken from the application header named $idField, falling back
 * to the AMQP message_id property. Duplicates are acked and skipped. When the
 * processor throws an Exception the delivery is requeued and the exception
 * is rethrown.
 *
 * Note: the duplicate check and the processed marker are two separate Redis
 * calls, so two consumers receiving the same id at the same instant can both
 * process it.
 *
 * @param mixed $message Delivered message; has(), get(), ack() and nack() are
 *                       called on it (presumably a php-amqplib AMQPMessage).
 * @param callable $processor Receives the message; a truthy return is recorded as success.
 * @param string $idField Application header that carries the message id.
 * @return bool True when the message was processed, false when it was a duplicate.
 * @throws InvalidArgumentException When the message carries no usable id; the
 *                                  delivery is left unsettled for the caller to handle.
 * @throws Exception Whatever the processor throws.
 */
public function processWithDedup(
    $message,
    callable $processor,
    string $idField = 'message_id'
): bool {
    $headers = $message->has('application_headers')
        ? $message->get('application_headers')->getNativeData()
        : [];

    // get() throws when the property was never set, so probe with has() first.
    $messageId = $headers[$idField]
        ?? ($message->has('message_id') ? $message->get('message_id') : null);

    if ($messageId === null || $messageId === '') {
        throw new InvalidArgumentException(
            "Message has neither a '{$idField}' header nor a message_id property; cannot deduplicate"
        );
    }
    $messageId = (string) $messageId;

    if ($this->isDuplicate($messageId)) {
        $message->ack();
        return false;
    }

    try {
        $result = $processor($message);
        $this->markProcessed($messageId, [
            'result' => $result ? 'success' : 'failed',
        ]);
        $message->ack();
        return true;
    } catch (Exception $e) {
        // AMQPMessage::nack($requeue, $multiple): requeue this delivery only.
        // The previous nack(false, true) meant "do not requeue, and reject
        // every earlier unacknowledged delivery as well".
        $message->nack(true);
        throw $e;
    }
}
/**
 * Fetch the stored marker for a message id.
 *
 * @param string $messageId Message identifier.
 * @return array|null Decoded marker, or null when no marker exists.
 */
public function getProcessedInfo(string $messageId): ?array
{
    $raw = $this->redis->get(self::DEDUP_PREFIX . $messageId);

    return $raw === null ? null : json_decode($raw, true);
}
/**
 * Remove the processed marker for a message id.
 *
 * @param string $messageId Message identifier.
 * @return bool True when a marker was deleted.
 */
public function clearProcessed(string $messageId): bool
{
    $removed = $this->redis->del([self::DEDUP_PREFIX . $messageId]);

    return $removed > 0;
}
}
// 使用示例
$redis = RedisConfig::getClient();
$rabbit = RabbitMQConfig::getConnection();
$dedup = new MessageDeduplicator($redis, $rabbit);
$callback = function ($message) use ($dedup) {
$dedup->processWithDedup($message, function ($msg) {
$data = json_decode($msg->getBody(), true);
echo "处理消息: " . json_encode($data) . "\n";
return true;
});
};
$channel = $rabbit->channel();
$channel->basic_consume('queue.orders', '', false, false, false, false, $callback);
while ($channel->is_consuming()) {
$channel->wait();
}
限流器
php
<?php
class RateLimiter
{
private Client $redis;
private $rabbitChannel;
private const RATE_LIMIT_PREFIX = 'rate_limit:';
/**
 * Store the Redis client and open a channel on the given RabbitMQ connection.
 *
 * @param Client $redis Redis client that holds the rate-limit counters.
 * @param AMQPStreamConnection $rabbit Connection a channel is opened on.
 */
public function __construct(Client $redis, AMQPStreamConnection $rabbit)
{
$this->redis = $redis;
$this->rabbitChannel = $rabbit->channel();
}
/**
 * Fixed-window rate limit: count this request and report whether it is
 * within the limit.
 *
 * The increment and the expiry are applied in one Lua script. With separate
 * INCR and EXPIRE calls, a failure between the two leaves a counter that
 * never expires and blocks the key permanently. The script also repairs a
 * counter that has lost its expiry.
 *
 * @param string $key Caller-defined limiter key.
 * @param int $maxRequests Requests allowed per window.
 * @param int $windowSeconds Window length in seconds.
 * @return bool True when this request is within the limit.
 */
public function isAllowed(
    string $key,
    int $maxRequests,
    int $windowSeconds
): bool {
    $redisKey = self::RATE_LIMIT_PREFIX . $key;

    $script = "
local current = redis.call('incr', KEYS[1])
if current == 1 or redis.call('ttl', KEYS[1]) < 0 then
    redis.call('expire', KEYS[1], ARGV[1])
end
return current
";
    $current = (int) $this->redis->eval($script, 1, $redisKey, $windowSeconds);

    return $current <= $maxRequests;
}
/**
 * Sliding-window rate limit backed by a sorted set of request timestamps.
 *
 * Inside one MULTI/EXEC transaction: entries older than the window are
 * dropped, the remaining entries are counted, this request is added, and the
 * key expiry is refreshed. The request is allowed when the count taken
 * before adding it is below $maxRequests. Denied requests are still recorded.
 *
 * @param string $key Caller-defined limiter key.
 * @param int $maxRequests Requests allowed per window.
 * @param int $windowSeconds Window length in seconds.
 * @return bool True when this request is within the limit.
 */
public function isAllowedSlidingWindow(
    string $key,
    int $maxRequests,
    int $windowSeconds
): bool {
    $redisKey = self::RATE_LIMIT_PREFIX . $key;
    $now = microtime(true);
    $windowStart = $now - $windowSeconds;

    // Each request needs its own member. Using the float timestamp as a PHP
    // array key truncates it to whole seconds, so every request in the same
    // second overwrote one member and the window under-counted.
    $member = sprintf('%.6F:%s', $now, bin2hex(random_bytes(6)));

    $this->redis->multi();
    $this->redis->zremrangebyscore($redisKey, 0, $windowStart);
    $this->redis->zcard($redisKey);
    $this->redis->zadd($redisKey, [$member => $now]);
    $this->redis->expire($redisKey, $windowSeconds);
    $results = $this->redis->exec();

    // Index 1 is the ZCARD reply: entries in the window before this request.
    $count = (int) $results[1];

    return $count < $maxRequests;
}
/**
 * Token-bucket rate limit evaluated atomically in a Lua script.
 *
 * The bucket holds up to $capacity tokens and gains $refillRate tokens every
 * $refillTime seconds. Each allowed request consumes one token.
 *
 * Changes from the earlier script:
 *  - last_refill only advances by the time that corresponds to whole tokens
 *    added, so steady traffic no longer discards partial refill progress;
 *  - the key lives for the time needed to refill an empty bucket, so it
 *    cannot expire early and silently reset to full capacity;
 *  - non-positive refill parameters are rejected instead of dividing by zero.
 *
 * @param string $key Caller-defined limiter key.
 * @param int $capacity Maximum number of tokens.
 * @param int $refillRate Tokens added per $refillTime seconds (must be >= 1).
 * @param int $refillTime Refill period in seconds (must be >= 1).
 * @return bool True when a token was available and consumed.
 * @throws InvalidArgumentException When $refillRate or $refillTime is below 1.
 */
public function isAllowedTokenBucket(
    string $key,
    int $capacity,
    int $refillRate,
    int $refillTime
): bool {
    if ($refillRate < 1 || $refillTime < 1) {
        throw new InvalidArgumentException('refillRate and refillTime must be at least 1');
    }

    $redisKey = self::RATE_LIMIT_PREFIX . $key;
    $now = time();
    $script = "
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local refill_rate = tonumber(ARGV[2])
local refill_time = tonumber(ARGV[3])
local now = tonumber(ARGV[4])
local bucket = redis.call('hmget', key, 'tokens', 'last_refill')
local tokens = tonumber(bucket[1]) or capacity
local last_refill = tonumber(bucket[2]) or now
local elapsed = math.max(0, now - last_refill)
local refill_tokens = math.floor(elapsed * refill_rate / refill_time)
if refill_tokens > 0 then
    if tokens + refill_tokens >= capacity then
        tokens = capacity
        last_refill = now
    else
        tokens = tokens + refill_tokens
        last_refill = last_refill + refill_tokens * refill_time / refill_rate
    end
end
local allowed = 0
if tokens >= 1 then
    tokens = tokens - 1
    allowed = 1
end
redis.call('hmset', key, 'tokens', tokens, 'last_refill', last_refill)
redis.call('expire', key, math.max(1, math.ceil(capacity * refill_time / refill_rate)))
return allowed
";

    return (bool) $this->redis->eval(
        $script,
        1,
        $redisKey,
        $capacity,
        $refillRate,
        $refillTime,
        $now
    );
}
/**
 * Report how many requests remain in the current fixed window.
 *
 * @param string $key Caller-defined limiter key.
 * @param int $maxRequests Requests allowed per window.
 * @param int $windowSeconds Accepted for signature symmetry; not used here.
 * @return int Remaining requests, never negative.
 */
public function getRemainingRequests(
    string $key,
    int $maxRequests,
    int $windowSeconds
): int {
    $used = (int) $this->redis->get(self::RATE_LIMIT_PREFIX . $key);
    $left = $maxRequests - $used;

    return $left > 0 ? $left : 0;
}
/**
 * Unix timestamp at which the counter for $key expires.
 *
 * @param string $key Caller-defined limiter key.
 * @return int Expiry timestamp, or 0 when the key reports no positive TTL.
 */
public function getResetTime(string $key): int
{
    $secondsLeft = $this->redis->ttl(self::RATE_LIMIT_PREFIX . $key);

    if ($secondsLeft > 0) {
        return time() + $secondsLeft;
    }

    return 0;
}
/**
 * Run $callback only when the fixed-window limit allows it.
 *
 * @param string $key Caller-defined limiter key.
 * @param int $maxRequests Requests allowed per window.
 * @param int $windowSeconds Window length in seconds.
 * @param callable $callback Zero-argument callable to execute.
 * @return mixed Whatever $callback returns.
 * @throws RuntimeException When the limit is exceeded; the message includes the reset time.
 */
public function throttle(
    string $key,
    int $maxRequests,
    int $windowSeconds,
    callable $callback
) {
    if ($this->isAllowed($key, $maxRequests, $windowSeconds)) {
        return $callback();
    }

    $resetTime = $this->getResetTime($key);

    throw new RuntimeException(
        "Rate limit exceeded. Reset at: " . date('c', $resetTime)
    );
}
/**
 * Publish a "rate_limit.exceeded" event when no requests remain for $key.
 *
 * Does nothing while getRemainingRequests() is still positive.
 *
 * NOTE(review): the 'rate_limit.events' exchange is not declared anywhere in
 * this class; confirm it is created elsewhere before this is called.
 *
 * @param string $key Caller-defined limiter key.
 * @param int $maxRequests Requests allowed per window.
 * @param int $windowSeconds Window length in seconds.
 */
public function publishRateLimitEvent(
string $key,
int $maxRequests,
int $windowSeconds
): void {
$remaining = $this->getRemainingRequests($key, $maxRequests, $windowSeconds);
if ($remaining <= 0) {
$event = [
'key' => $key,
'limit' => $maxRequests,
'window' => $windowSeconds,
'timestamp' => date('c'),
];
$message = new \PhpAmqpLib\Message\AMQPMessage(
json_encode($event),
['content_type' => 'application/json']
);
// Exchange 'rate_limit.events', routing key 'rate_limit.exceeded'.
$this->rabbitChannel->basic_publish(
$message,
'rate_limit.events',
'rate_limit.exceeded'
);
}
}
}
// 使用示例
$redis = RedisConfig::getClient();
$rabbit = RabbitMQConfig::getConnection();
$limiter = new RateLimiter($redis, $rabbit);
$apiKey = 'api:user:123';
if ($limiter->isAllowed($apiKey, 100, 60)) {
echo "请求允许\n";
echo "剩余请求: " . $limiter->getRemainingRequests($apiKey, 100, 60) . "\n";
} else {
echo "请求被限流\n";
}
try {
$result = $limiter->throttle('api:expensive:op', 10, 60, function () {
return performExpensiveOperation();
});
} catch (RuntimeException $e) {
echo "限流: " . $e->getMessage() . "\n";
}
实际应用场景
场景一:订单处理缓存
php
<?php
class OrderCacheService
{
private Client $redis;
private $rabbitChannel;
private CacheSyncManager $cacheManager;
/**
 * Store the collaborators and open a RabbitMQ channel for order events.
 *
 * @param Client $redis Redis client.
 * @param AMQPStreamConnection $rabbit Connection a channel is opened on.
 * @param CacheSyncManager $cacheManager Cache used for order reads and invalidation.
 */
public function __construct(
Client $redis,
AMQPStreamConnection $rabbit,
CacheSyncManager $cacheManager
) {
$this->redis = $redis;
$this->rabbitChannel = $rabbit->channel();
$this->cacheManager = $cacheManager;
}
/**
 * Fetch an order through the cache, loading it from the database on a miss.
 * Cached entries live for one hour.
 *
 * @param string $orderId Order identifier.
 * @return array Order data.
 */
public function getOrder(string $orderId): array
{
    $cacheKey = "order:{$orderId}";
    $loadFromDatabase = fn () => $this->loadOrderFromDatabase($orderId);

    return $this->cacheManager->get($cacheKey, $loadFromDatabase, 3600);
}
/**
 * Apply updates to an order, invalidate its cache entry and publish an
 * order-updated event.
 *
 * @param string $orderId Order identifier.
 * @param array $updates Fields to change.
 * @return bool Always true.
 */
public function updateOrder(string $orderId, array $updates): bool
{
// Write the database first, then drop the cache entry so the next read reloads it.
$this->updateOrderInDatabase($orderId, $updates);
$this->cacheManager->invalidate("order:{$orderId}", 'order_updated');
$this->publishOrderUpdate($orderId, $updates);
return true;
}
/**
 * Placeholder loader: returns a fixed pending order with the given id.
 *
 * @param string $orderId Order identifier.
 * @return array Order data with keys id, status and amount.
 */
private function loadOrderFromDatabase(string $orderId): array
{
// Load the order from the database (stubbed with static data here).
return [
'id' => $orderId,
'status' => 'pending',
'amount' => 100.00,
];
}
/**
 * Placeholder for persisting order updates; the body is intentionally empty.
 *
 * @param string $orderId Order identifier.
 * @param array $updates Fields to change.
 */
private function updateOrderInDatabase(string $orderId, array $updates): void
{
// Update the database.
}
/**
 * Publish an "order.updated" JSON event to the 'order.events' exchange.
 *
 * @param string $orderId Order identifier.
 * @param array $updates Fields that were changed.
 */
private function publishOrderUpdate(string $orderId, array $updates): void
{
    $payload = json_encode([
        'orderId' => $orderId,
        'updates' => $updates,
        'timestamp' => date('c'),
    ]);

    $this->rabbitChannel->basic_publish(
        new \PhpAmqpLib\Message\AMQPMessage($payload, ['content_type' => 'application/json']),
        'order.events',
        'order.updated'
    );
}
}
场景二:分布式任务调度
php
<?php
class DistributedTaskScheduler
{
private Client $redis;
private $rabbitChannel;
private DistributedLockManager $lockManager;
/**
 * Store the collaborators and open a RabbitMQ channel for task messages.
 *
 * @param Client $redis Redis client that stores task records.
 * @param AMQPStreamConnection $rabbit Connection a channel is opened on.
 * @param DistributedLockManager $lockManager Lock manager guarding task operations.
 */
public function __construct(
Client $redis,
AMQPStreamConnection $rabbit,
DistributedLockManager $lockManager
) {
$this->redis = $redis;
$this->rabbitChannel = $rabbit->channel();
$this->lockManager = $lockManager;
}
/**
 * Schedule a task once, optionally after a delay.
 *
 * Under a per-task lock: refuses when a record for $taskId already exists,
 * otherwise stores the task record in Redis for 24 hours and publishes it.
 * Immediate tasks go to the 'task.processing' exchange with the task type as
 * routing key. Delayed tasks go to a TTL queue that dead-letters into
 * 'task.processing'.
 *
 * The delay queue sets x-dead-letter-routing-key to the task type. Without
 * it, expired messages are dead-lettered with the delay queue's own name as
 * routing key and are not routed like immediate tasks. Because that argument
 * is fixed per queue, the queue name includes the task type.
 *
 * @param string $taskId Unique task identifier.
 * @param string $taskType Task type; used as the routing key.
 * @param array $payload Task payload.
 * @param int $delay Delay in seconds; 0 publishes immediately.
 * @return bool True when scheduled, false when the task already exists.
 * @throws RuntimeException When the scheduling lock cannot be acquired.
 */
public function scheduleTask(
    string $taskId,
    string $taskType,
    array $payload,
    int $delay = 0
): bool {
    $lockResource = "task:schedule:{$taskId}";

    return $this->lockManager->withLock($lockResource, function () use (
        $taskId,
        $taskType,
        $payload,
        $delay
    ) {
        $taskKey = "task:{$taskId}";
        if ($this->redis->exists($taskKey)) {
            return false;
        }

        $task = [
            'id' => $taskId,
            'type' => $taskType,
            'payload' => $payload,
            'status' => 'scheduled',
            'scheduled_at' => date('c'),
            'execute_at' => date('c', time() + $delay),
        ];
        $this->redis->setex($taskKey, 86400, json_encode($task));

        $message = new \PhpAmqpLib\Message\AMQPMessage(
            json_encode($task),
            [
                'content_type' => 'application/json',
                'delivery_mode' => \PhpAmqpLib\Message\AMQPMessage::DELIVERY_MODE_PERSISTENT,
            ]
        );

        if ($delay > 0) {
            $delayQueue = "task.delayed.{$taskType}.{$delay}";
            $this->rabbitChannel->queue_declare(
                $delayQueue,
                false,
                true,
                false,
                false,
                false,
                new \PhpAmqpLib\Wire\AMQPTable([
                    'x-message-ttl' => $delay * 1000,
                    'x-dead-letter-exchange' => 'task.processing',
                    'x-dead-letter-routing-key' => $taskType,
                ])
            );
            // The default exchange routes by queue name.
            $this->rabbitChannel->basic_publish(
                $message,
                '',
                $delayQueue
            );
        } else {
            $this->rabbitChannel->basic_publish(
                $message,
                'task.processing',
                $taskType
            );
        }

        return true;
    }, 10, 5);
}
/**
 * Mark a stored task as cancelled.
 *
 * Under a per-task cancel lock, rewrites the task record with status
 * "cancelled" and a cancellation timestamp, keeping it for one more hour.
 *
 * @param string $taskId Task identifier.
 * @return bool True when the record was updated, false when none exists.
 * @throws RuntimeException When the lock cannot be acquired.
 */
public function cancelTask(string $taskId): bool
{
    $cancel = function () use ($taskId) {
        $recordKey = "task:{$taskId}";
        $raw = $this->redis->get($recordKey);

        if ($raw) {
            $record = json_decode($raw, true);
            $record['status'] = 'cancelled';
            $record['cancelled_at'] = date('c');
            $this->redis->setex($recordKey, 3600, json_encode($record));

            return true;
        }

        return false;
    };

    return $this->lockManager->withLock("task:cancel:{$taskId}", $cancel, 10, 5);
}
/**
 * Read the stored record for a task.
 *
 * @param string $taskId Task identifier.
 * @return array|null Decoded task record, or null when none is stored.
 */
public function getTaskStatus(string $taskId): ?array
{
    $raw = $this->redis->get("task:{$taskId}");

    return $raw ? json_decode($raw, true) : null;
}
}
常见问题与解决方案
问题一:缓存穿透
症状: 大量请求查询不存在的数据,绕过缓存直接访问数据库
解决方案: 使用布隆过滤器或空值缓存
php
/**
 * Cache lookup guarded by a bit test, to avoid loading keys known not to exist.
 *
 * Returns null without calling $loader when the tested bit is 0; otherwise
 * delegates to get().
 *
 * NOTE(review): $this->hash() is not defined in the code shown here, and the
 * bitmap key is derived per cache key ('bloom:' . $key) rather than shared;
 * confirm both against the intended Bloom-filter design.
 *
 * @param string $key Cache key.
 * @param callable $loader Loader forwarded to get().
 * @param int $ttl Time to live in seconds forwarded to get().
 * @return mixed Null when the bit is unset, otherwise the result of get().
 */
public function getWithBloomFilter(string $key, callable $loader, int $ttl = 3600)
{
$bloomKey = 'bloom:' . $key;
if (!$this->redis->getbit($bloomKey, $this->hash($key))) {
return null;
}
return $this->get($key, $loader, $ttl);
}问题二:缓存雪崩
症状: 大量缓存同时失效,导致数据库压力剧增
解决方案: 添加随机过期时间
php
/**
 * Store a JSON-encoded value with a TTL jittered by up to +10%, so that keys
 * written together do not all expire at the same moment.
 *
 * @param string $key Cache key.
 * @param mixed $data Value to encode and store.
 * @param int $baseTtl Base time to live in seconds.
 * @return bool True when Redis acknowledged the write with OK.
 */
public function setWithRandomTTL(string $key, $data, int $baseTtl = 3600): bool
{
    // Integer arithmetic: passing the float $baseTtl * 0.1 to rand() triggers
    // an implicit float-to-int conversion (deprecated when fractional, and a
    // TypeError under strict_types).
    $maxOffset = max(0, intdiv($baseTtl, 10));
    $ttl = $baseTtl + random_int(0, $maxOffset);

    $reply = $this->redis->setex($key, $ttl, json_encode($data));

    // Predis returns a Predis\Response\Status object; compare its string form.
    return (string) $reply === 'OK';
}
问题三:分布式锁死锁
症状: 锁未正确释放,导致后续请求无法获取锁
解决方案: 添加锁超时和自动续期
php
/**
 * Acquire a lock and start a watchdog timer that keeps renewing it.
 *
 * The watchdog fires every half TTL and extends the lock. It stops itself as
 * soon as the lock is no longer tracked by this manager, has been
 * re-acquired with a different token, or can no longer be extended.
 *
 * Requires the Swoole extension and a running Swoole event loop. Timers are
 * created with the static Swoole\Timer::tick(); the class is not instantiated
 * with `new`.
 *
 * @param string $resource Resource name to lock.
 * @param int $ttl Lock expiry in seconds.
 * @return string|null The owner token, or null when the lock was not acquired.
 */
public function acquireWithWatchdog(string $resource, int $ttl = 30): ?string
{
    if (!$this->acquire($resource, $ttl)) {
        return null;
    }
    $lockValue = $this->heldLocks[$resource]['value'];

    // $ttl * 500 ms is half of the TTL expressed in milliseconds.
    \Swoole\Timer::tick(
        max(1, $ttl * 500),
        function (int $timerId) use ($resource, $lockValue, $ttl): void {
            $stillOurs = isset($this->heldLocks[$resource])
                && $this->heldLocks[$resource]['value'] === $lockValue;

            if (!$stillOurs || !$this->extend($resource, $ttl)) {
                // Stop renewing once the lock is released or lost; otherwise
                // the timer would keep firing for the life of the process.
                \Swoole\Timer::clear($timerId);
            }
        }
    );

    return $lockValue;
}
最佳实践建议
1. 连接池管理
php
/**
 * Process-wide holder that creates the Redis client on first use and hands
 * out the same instance afterwards.
 */
class RedisConnectionPool
{
    private static ?Client $connection = null;

    /**
     * Return the shared client, creating it through RedisConfig on first call.
     */
    public static function getConnection(): Client
    {
        self::$connection ??= RedisConfig::getClient();

        return self::$connection;
    }
}
2. 错误处理
php
/**
 * Central place for logging Redis-related failures.
 */
class RedisErrorHandler
{
    /**
     * Write the failure and its context to the PHP error log.
     *
     * Accepts any Throwable, so Error subclasses such as TypeError can be
     * logged as well as Exceptions.
     *
     * @param \Throwable $e Failure to log.
     * @param array $context Extra data, JSON-encoded into the log line.
     */
    public static function handle(\Throwable $e, array $context = []): void
    {
        $encodedContext = json_encode($context);
        if ($encodedContext === false) {
            // Context that cannot be encoded (e.g. invalid UTF-8) must not
            // produce an empty "Context:" entry.
            $encodedContext = '<unencodable context: ' . json_last_error_msg() . '>';
        }

        error_log(sprintf(
            "[Redis Error] %s in %s:%d\nContext: %s",
            $e->getMessage(),
            $e->getFile(),
            $e->getLine(),
            $encodedContext
        ));
    }
}
3. 监控指标
php
/**
 * Metric-recording hooks for cache usage. All three methods are empty
 * placeholders in this example.
 */
class RedisMetrics
{
/**
 * @param string $key Cache key that was found.
 */
public static function recordHit(string $key): void
{
// Record a cache hit.
}
/**
 * @param string $key Cache key that was not found.
 */
public static function recordMiss(string $key): void
{
// Record a cache miss.
}
/**
 * @param string $operation Name of the operation being timed.
 * @param float $time Measured latency; the unit is not specified here.
 */
public static function recordLatency(string $operation, float $time): void
{
// Record the operation latency.
}
}版本兼容性
| PHP | Predis | phpredis | Redis Server | RabbitMQ Server |
|---|---|---|---|---|
| 8.2+ | 2.x | 6.0+ | 7.x | 3.11+ |
| 8.1+ | 2.x | 5.3+ | 7.x | 3.10+ |
| 8.0+ | 2.x | 5.3+ | 6.x/7.x | 3.9+ |
| 7.4+ | 1.x/2.x | 5.3+ | 6.x | 3.8+ |
