Appearance
游戏行业应用
概述
游戏行业对实时性、并发性和用户体验有极高要求。RabbitMQ 在游戏中可用于实时通知、排行榜更新、匹配系统、聊天消息等场景,确保游戏体验的流畅和稳定。
业务背景与需求
场景描述
某多人在线游戏系统模块:
| 模块 | 功能 | 消息场景 |
|---|---|---|
| 匹配系统 | 玩家匹配、房间创建 | 匹配事件、房间通知 |
| 战斗系统 | 战斗同步、状态更新 | 战斗事件、状态同步 |
| 聊天系统 | 世界频道、私聊、组队 | 聊天消息、系统通知 |
| 排行榜 | 积分排名、奖励发放 | 排名更新、奖励通知 |
| 活动系统 | 限时活动、任务系统 | 活动通知、任务更新 |
| 社交系统 | 好友、公会、交易 | 社交事件、交易通知 |
技术挑战
游戏系统挑战:
1. 低延迟:操作响应需毫秒级
2. 高并发:百万玩家同时在线
3. 实时同步:状态需要即时更新
4. 公平性:匹配算法公平合理架构设计
整体架构图
mermaid
graph TB
subgraph "客户端"
A[游戏客户端]
B[Web客户端]
end
subgraph "接入层"
C[WebSocket网关]
D[游戏服务器]
end
subgraph "RabbitMQ"
E[游戏事件总线<br/>game.exchange]
subgraph "业务队列"
F[匹配队列]
G[聊天队列]
H[排行榜队列]
I[通知队列]
end
end
subgraph "服务层"
J[匹配服务]
K[聊天服务]
L[排行榜服务]
M[活动服务]
end
subgraph "数据层"
N[Redis]
O[MySQL]
end
A --> C
A --> D
B --> C
C --> E
D --> E
E --> F
E --> G
E --> H
E --> I
F --> J
G --> K
H --> L
I --> M
J --> N
K --> N
L --> N
M --> O匹配流程
mermaid
sequenceDiagram
participant Player as 玩家
participant Gateway as 网关
participant MQ as RabbitMQ
participant Match as 匹配服务
participant Game as 游戏服务器
Player->>Gateway: 发起匹配请求
Gateway->>MQ: 发布匹配请求
MQ->>Match: 投递请求
Match->>Match: 查找合适对手
Match->>Match: 创建对局
alt 匹配成功
Match->>MQ: 发布匹配成功事件
MQ->>Gateway: 投递通知
Gateway->>Player: 推送匹配结果
Match->>Game: 创建游戏房间
else 匹配超时
Match->>MQ: 发布匹配超时事件
MQ->>Gateway: 投递通知
Gateway->>Player: 推送超时提示
endPHP 代码实现
游戏消息结构
php
<?php
namespace App\Game;
class GameMessage
{
public string $messageId;
public string $messageType;
public string $serverId;
public int $timestamp;
public array $payload;
public function __construct(string $messageType, array $payload, string $serverId = 'server-1')
{
$this->messageId = $this->generateMessageId();
$this->messageType = $messageType;
$this->serverId = $serverId;
$this->timestamp = intval(microtime(true) * 1000);
$this->payload = $payload;
}
private function generateMessageId(): string
{
return sprintf('gm_%s_%s', date('YmdHis'), bin2hex(random_bytes(4)));
}
public function toArray(): array
{
return [
'message_id' => $this->messageId,
'message_type' => $this->messageType,
'server_id' => $this->serverId,
'timestamp' => $this->timestamp,
'payload' => $this->payload,
];
}
}匹配服务实现
php
<?php
namespace App\Game;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;
class MatchService
{
private AMQPStreamConnection $connection;
private $channel;
private $redis;
private string $exchangeName = 'game.exchange';
private array $matchQueues = [];
public function __construct(AMQPStreamConnection $connection, $redis)
{
$this->connection = $connection;
$this->channel = $connection->channel();
$this->redis = $redis;
$this->setupInfrastructure();
}
private function setupInfrastructure(): void
{
$this->channel->exchange_declare(
$this->exchangeName,
AMQPExchangeType::TOPIC,
false,
true,
false
);
$this->channel->queue_declare('game.match', false, true, false, false);
$this->channel->queue_bind('game.match', $this->exchangeName, 'match.#');
$this->channel->queue_declare('game.notification', false, true, false, false);
$this->channel->queue_bind('game.notification', $this->exchangeName, 'notification.#');
}
public function joinMatch(string $playerId, int $score, string $gameMode): string
{
$matchId = sprintf('match_%s', bin2hex(random_bytes(8)));
$player = [
'player_id' => $playerId,
'score' => $score,
'game_mode' => $gameMode,
'join_time' => microtime(true),
'match_id' => $matchId,
];
$queueKey = "match:queue:{$gameMode}";
$this->redis->zAdd($queueKey, $score, json_encode($player));
$message = new GameMessage('match.join', [
'player_id' => $playerId,
'score' => $score,
'game_mode' => $gameMode,
'match_id' => $matchId,
]);
$this->publishMessage($message);
return $matchId;
}
public function cancelMatch(string $playerId, string $gameMode): void
{
$queueKey = "match:queue:{$gameMode}";
$players = $this->redis->zRange($queueKey, 0, -1);
foreach ($players as $playerJson) {
$player = json_decode($playerJson, true);
if ($player['player_id'] === $playerId) {
$this->redis->zRem($queueKey, $playerJson);
break;
}
}
$message = new GameMessage('match.cancel', [
'player_id' => $playerId,
'game_mode' => $gameMode,
]);
$this->publishMessage($message);
}
public function processMatch(string $gameMode): ?array
{
$queueKey = "match:queue:{$gameMode}";
$players = $this->redis->zRange($queueKey, 0, 1);
if (count($players) < 2) {
return null;
}
$matchedPlayers = [];
foreach ($players as $playerJson) {
$player = json_decode($playerJson, true);
$matchedPlayers[] = $player;
$this->redis->zRem($queueKey, $playerJson);
}
$roomId = sprintf('room_%s', bin2hex(random_bytes(8)));
$message = new GameMessage('match.success', [
'room_id' => $roomId,
'players' => $matchedPlayers,
'game_mode' => $gameMode,
]);
$this->publishMessage($message);
return [
'room_id' => $roomId,
'players' => $matchedPlayers,
];
}
private function publishMessage(GameMessage $message): void
{
$routingKey = $message->messageType;
$amqpMessage = new AMQPMessage(
json_encode($message->toArray()),
[
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'message_id' => $message->messageId,
'timestamp' => time(),
]
);
$this->channel->basic_publish($amqpMessage, $this->exchangeName, $routingKey);
}
}聊天服务实现
php
<?php
namespace App\Game;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class ChatService
{
private AMQPStreamConnection $connection;
private $channel;
private $redis;
private string $exchangeName = 'game.exchange';
public function __construct(AMQPStreamConnection $connection, $redis)
{
$this->connection = $connection;
$this->channel = $connection->channel();
$this->redis = $redis;
}
public function sendWorldChat(string $playerId, string $playerName, string $content): void
{
$message = new GameMessage('chat.world', [
'channel' => 'world',
'player_id' => $playerId,
'player_name' => $playerName,
'content' => $this->filterContent($content),
'timestamp' => time(),
]);
$this->publishMessage($message, 'chat.world');
$this->redis->lPush('chat:world:history', json_encode($message->toArray()));
$this->redis->lTrim('chat:world:history', 0, 99);
}
public function sendPrivateChat(string $fromPlayerId, string $toPlayerId, string $content): void
{
$message = new GameMessage('chat.private', [
'channel' => 'private',
'from_player_id' => $fromPlayerId,
'to_player_id' => $toPlayerId,
'content' => $this->filterContent($content),
'timestamp' => time(),
]);
$this->publishMessage($message, "chat.private.{$toPlayerId}");
}
public function sendGuildChat(string $guildId, string $playerId, string $playerName, string $content): void
{
$message = new GameMessage('chat.guild', [
'channel' => 'guild',
'guild_id' => $guildId,
'player_id' => $playerId,
'player_name' => $playerName,
'content' => $this->filterContent($content),
'timestamp' => time(),
]);
$this->publishMessage($message, "chat.guild.{$guildId}");
}
private function filterContent(string $content): string
{
$sensitiveWords = ['敏感词1', '敏感词2'];
return str_replace($sensitiveWords, '***', $content);
}
private function publishMessage(GameMessage $message, string $routingKey): void
{
$amqpMessage = new AMQPMessage(
json_encode($message->toArray()),
[
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
]
);
$this->channel->basic_publish($amqpMessage, $this->exchangeName, $routingKey);
}
}排行榜服务实现
php
<?php
namespace App\Game;
class LeaderboardService
{
private $redis;
private $channel;
private string $exchangeName = 'game.exchange';
public function __construct($redis, $channel)
{
$this->redis = $redis;
$this->channel = $channel;
}
public function updateScore(string $playerId, int $score, string $season = 'current'): void
{
$key = "leaderboard:{$season}";
$oldRank = $this->redis->zRevRank($key, $playerId);
$this->redis->zAdd($key, $score, $playerId);
$newRank = $this->redis->zRevRank($key, $playerId);
if ($oldRank !== $newRank) {
$this->notifyRankChange($playerId, $oldRank, $newRank, $score);
}
}
public function getTopPlayers(string $season, int $limit = 100): array
{
$key = "leaderboard:{$season}";
$players = $this->redis->zRevRange($key, 0, $limit - 1, true);
$result = [];
$rank = 1;
foreach ($players as $playerId => $score) {
$result[] = [
'rank' => $rank++,
'player_id' => $playerId,
'score' => $score,
];
}
return $result;
}
public function getPlayerRank(string $playerId, string $season): ?array
{
$key = "leaderboard:{$season}";
$rank = $this->redis->zRevRank($key, $playerId);
$score = $this->redis->zScore($key, $playerId);
if ($rank === false) {
return null;
}
return [
'rank' => $rank + 1,
'score' => $score,
];
}
public function getAroundPlayers(string $playerId, string $season, int $range = 5): array
{
$key = "leaderboard:{$season}";
$rank = $this->redis->zRevRank($key, $playerId);
if ($rank === false) {
return [];
}
$start = max(0, $rank - $range);
$end = $rank + $range;
$players = $this->redis->zRevRange($key, $start, $end, true);
$result = [];
$currentRank = $start + 1;
foreach ($players as $pid => $score) {
$result[] = [
'rank' => $currentRank++,
'player_id' => $pid,
'score' => $score,
'is_self' => $pid === $playerId,
];
}
return $result;
}
private function notifyRankChange(string $playerId, $oldRank, $newRank, int $score): void
{
$message = new GameMessage('leaderboard.change', [
'player_id' => $playerId,
'old_rank' => $oldRank !== false ? $oldRank + 1 : null,
'new_rank' => $newRank + 1,
'score' => $score,
]);
$amqpMessage = new AMQPMessage(
json_encode($message->toArray()),
['content_type' => 'application/json']
);
$this->channel->basic_publish(
$amqpMessage,
$this->exchangeName,
"notification.player.{$playerId}"
);
}
}通知消费者实现
php
<?php
namespace App\Game;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class NotificationConsumer
{
private AMQPStreamConnection $connection;
private $channel;
private WebSocketServer $wsServer;
private bool $running = true;
public function __construct(
AMQPStreamConnection $connection,
WebSocketServer $wsServer
) {
$this->connection = $connection;
$this->channel = $connection->channel();
$this->wsServer = $wsServer;
}
public function consume(): void
{
$this->channel->basic_qos(null, 100, null);
$this->channel->basic_consume('game.notification', '', false, false, false, false, [$this, 'handleMessage']);
while ($this->running && count($this->channel->callbacks)) {
$this->channel->wait(null, true);
if (!$this->running) break;
usleep(10000);
}
}
public function handleMessage(AMQPMessage $message): void
{
$data = json_decode($message->body, true);
$messageType = $data['message_type'];
$payload = $data['payload'];
switch ($messageType) {
case 'match.success':
$this->handleMatchSuccess($payload);
break;
case 'leaderboard.change':
$this->handleLeaderboardChange($payload);
break;
case 'chat.world':
$this->handleWorldChat($payload);
break;
default:
$this->handleGenericNotification($payload);
}
$message->ack();
}
private function handleMatchSuccess(array $payload): void
{
foreach ($payload['players'] as $player) {
$this->wsServer->sendToPlayer($player['player_id'], json_encode([
'type' => 'match_success',
'room_id' => $payload['room_id'],
'game_mode' => $payload['game_mode'],
]));
}
}
private function handleLeaderboardChange(array $payload): void
{
$this->wsServer->sendToPlayer($payload['player_id'], json_encode([
'type' => 'rank_change',
'old_rank' => $payload['old_rank'],
'new_rank' => $payload['new_rank'],
'score' => $payload['score'],
]));
}
private function handleWorldChat(array $payload): void
{
$this->wsServer->broadcast(json_encode([
'type' => 'world_chat',
'player_name' => $payload['player_name'],
'content' => $payload['content'],
'timestamp' => $payload['timestamp'],
]));
}
private function handleGenericNotification(array $payload): void
{
if (isset($payload['player_id'])) {
$this->wsServer->sendToPlayer($payload['player_id'], json_encode($payload));
}
}
public function stop(): void
{
$this->running = false;
}
}完整使用示例
php
<?php
require_once 'vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use App\Game\{MatchService, ChatService, LeaderboardService, NotificationConsumer};
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$redis = new Redis();
$redis->connect('localhost', 6379);
$matchService = new MatchService($connection, $redis);
$chatService = new ChatService($connection, $redis);
$leaderboardService = new LeaderboardService($redis, $connection->channel());
echo "=== 游戏系统示例 ===\n\n";
echo "1. 玩家加入匹配\n";
$matchId1 = $matchService->joinMatch('player_001', 1500, 'ranked');
$matchId2 = $matchService->joinMatch('player_002', 1520, 'ranked');
echo "玩家1匹配ID: {$matchId1}\n";
echo "玩家2匹配ID: {$matchId2}\n";
echo "\n2. 处理匹配\n";
$result = $matchService->processMatch('ranked');
if ($result) {
echo "匹配成功! 房间ID: {$result['room_id']}\n";
}
echo "\n3. 发送世界聊天\n";
$chatService->sendWorldChat('player_001', '玩家一', '大家好!');
echo "消息已发送\n";
echo "\n4. 更新排行榜\n";
$leaderboardService->updateScore('player_001', 1650);
$leaderboardService->updateScore('player_002', 1580);
echo "积分已更新\n";
echo "\n5. 查询排行榜\n";
$topPlayers = $leaderboardService->getTopPlayers('current', 10);
foreach ($topPlayers as $player) {
echo "第{$player['rank']}名: {$player['player_id']} - {$player['score']}分\n";
}
$connection->close();
echo "\n=== 示例完成 ===\n";关键技术点解析
1. 快速匹配
使用 Redis 有序集合实现快速匹配:
php
$this->redis->zAdd($queueKey, $score, json_encode($player));
$players = $this->redis->zRange($queueKey, 0, 1);2. 实时推送
通过 WebSocket 实现实时推送:
php
$this->wsServer->sendToPlayer($playerId, json_encode($notification));3. 排行榜
使用 Redis 有序集合:
php
$this->redis->zAdd($key, $score, $playerId);
$rank = $this->redis->zRevRank($key, $playerId);4. 消息广播
php
$this->wsServer->broadcast(json_encode($message));性能优化建议
| 优化项 | 建议 | 说明 |
|---|---|---|
| 连接复用 | 保持长连接 | 减少连接开销 |
| 批量推送 | 合并消息推送 | 减少网络请求 |
| 本地缓存 | 缓存热点数据 | 减少Redis访问 |
| 分区处理 | 按游戏模式分区 | 提高并行度 |
常见问题与解决方案
1. 匹配延迟
解决方案: 优化匹配算法 + 增加匹配服务器
2. 消息丢失
解决方案: 消息持久化 + 重试机制
3. 排行榜更新慢
解决方案: 异步更新 + 批量处理
