
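Game Industry Applications

Overview

Games place heavy demands on latency, concurrency, and user experience. In a game backend, RabbitMQ can carry real-time notifications, leaderboard updates, matchmaking requests, and chat messages, which helps keep the player experience smooth and stable.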

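Business Background and Requirements

Scenario

Modules of a multiplayer online game system:

| Module | Functions | Message scenarios |
| --- | --- | --- |
| Matchmaking | Player matching, room creation | Match events, room notifications |
| Combat | Combat synchronization, state updates | Combat events, state synchronization |
| Chat | World channel, private chat, party chat | Chat messages, system notifications |
| Leaderboard | Score ranking, reward distribution | Rank updates, reward notifications |
| Events | Limited-time events, quests | Event notifications, quest updates |
| Social | Friends, guilds, trading | Social events, trade notifications |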

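Technical Challenges

Challenges for the game system:
1. Low latency: operations must be answered within milliseconds
2. High concurrency: millions of players online at the same time
3. Real-time synchronization: state changes must propagate immediately
4. Fairness: the matching algorithm must be fair and reasonable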

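Architecture Design

Overall Architecture

mermaid
graph TB
    subgraph "Clients"
        A[Game client]
        B[Web client]
    end
    
    subgraph "Access layer"
        C[WebSocket gateway]
        D[Game server]
    end
    
    subgraph "RabbitMQ"
        E[Game event bus<br/>game.exchange]
        
        subgraph "Business queues"
            F[Match queue]
            G[Chat queue]
            H[Leaderboard queue]
            I[Notification queue]
        end
    end
    
    subgraph "Service layer"
        J[Matchmaking service]
        K[Chat service]
        L[Leaderboard service]
        M[Event service]
    end
    
    subgraph "Data layer"
        N[Redis]
        O[MySQL]
    end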
    
    A --> C
    A --> D
    B --> C
    
    C --> E
    D --> E
    
    E --> F
    E --> G
    E --> H
    E --> I
    
    F --> J
    G --> K
    H --> L
    I --> M
    
    J --> N
    K --> N
    L --> N
    M --> O
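
The diagram sends every event through a single topic exchange, so which queue receives a message depends on how its routing key compares with each binding pattern. As a rough illustration of those rules (`*` matches exactly one word, `#` matches zero or more words), the sketch below re-implements the comparison in plain PHP. The function name `topicMatches` is an assumption for illustration only; RabbitMQ performs this matching inside the broker.

```php
<?php

/**
 * Hypothetical helper: returns true when $routingKey matches the topic
 * binding $pattern ('*' = exactly one word, '#' = zero or more words).
 */
function topicMatches(string $pattern, string $routingKey): bool
{
    $p = explode('.', $pattern);
    $k = $routingKey === '' ? [] : explode('.', $routingKey);

    $match = function (int $i, int $j) use (&$match, $p, $k): bool {
        if ($i === count($p)) {
            return $j === count($k);
        }
        if ($p[$i] === '#') {
            // '#' may absorb zero or more words.
            for ($n = $j; $n <= count($k); $n++) {
                if ($match($i + 1, $n)) {
                    return true;
                }
            }
            return false;
        }
        if ($j === count($k)) {
            return false;
        }
        return ($p[$i] === '*' || $p[$i] === $k[$j]) && $match($i + 1, $j + 1);
    };

    return $match(0, 0);
}

var_dump(topicMatches('match.#', 'match.success'));                 // bool(true)
var_dump(topicMatches('notification.#', 'match.success'));          // bool(false)
var_dump(topicMatches('notification.#', 'notification.player.p1')); // bool(true)
```

A routing key reaches a queue only if at least one of that queue's bindings matches it, which is worth checking whenever a consumer is expected to handle several message types.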

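Matchmaking Flow

mermaid
sequenceDiagram
    participant Player as Player
    participant Gateway as Gateway
    participant MQ as RabbitMQ
    participant Match as Matchmaking service
    participant Game as Game server
    
    Player->>Gateway: Request a match
    Gateway->>MQ: Publish match request
    MQ->>Match: Deliver request
    
    Match->>Match: Find a suitable opponent
    Match->>Match: Create the match
    
    alt Match found
        Match->>MQ: Publish match-success event
        MQ->>Gateway: Deliver notification
        Gateway->>Player: Push match result
        Match->>Game: Create game room
    else Match timed out
        Match->>MQ: Publish match-timeout event
        MQ->>Gateway: Deliver notification
        Gateway->>Player: Push timeout notice
    end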
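
The timeout branch needs some rule for deciding when a player has waited too long. A minimal sketch, assuming each queued entry carries the `join_time` field that `joinMatch` stores and a hypothetical 30-second limit (the function name `splitByTimeout` is also illustrative):

```php
<?php

/**
 * Hypothetical helper: splits waiting players into those still within the
 * time limit and those that should receive a match-timeout event.
 *
 * @param array<int, array{player_id: string, join_time: float}> $players
 * @return array{waiting: array, timed_out: array}
 */
function splitByTimeout(array $players, float $now, float $limitSeconds = 30.0): array
{
    $result = ['waiting' => [], 'timed_out' => []];
    foreach ($players as $player) {
        $bucket = ($now - $player['join_time']) > $limitSeconds ? 'timed_out' : 'waiting';
        $result[$bucket][] = $player;
    }
    return $result;
}

$players = [
    ['player_id' => 'p1', 'join_time' => 100.0],
    ['player_id' => 'p2', 'join_time' => 125.0],
];
$split = splitByTimeout($players, 135.0);
echo count($split['timed_out']), ' timed out, ', count($split['waiting']), " waiting\n";
// prints "1 timed out, 1 waiting"
```

The matchmaking service could run such a check on each pass and publish one timeout event per entry in `timed_out`.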

PHP Implementation

Game Message Structure

php
<?php

namespace App\Game;

class GameMessage
{
    public string $messageId;
    public string $messageType;
    public string $serverId;
    public int $timestamp;
    public array $payload;

    public function __construct(string $messageType, array $payload, string $serverId = 'server-1')
    {
        $this->messageId = $this->generateMessageId();
        $this->messageType = $messageType;
        $this->serverId = $serverId;
        $this->timestamp = intval(microtime(true) * 1000);
        $this->payload = $payload;
    }

    private function generateMessageId(): string
    {
        return sprintf('gm_%s_%s', date('YmdHis'), bin2hex(random_bytes(4)));
    }

    public function toArray(): array
    {
        return [
            'message_id' => $this->messageId,
            'message_type' => $this->messageType,
            'server_id' => $this->serverId,
            'timestamp' => $this->timestamp,
            'payload' => $this->payload,
        ];
    }
}
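
Consumers receive the array produced by `toArray()` as a JSON string. A minimal sketch of the reverse step, assuming a hypothetical helper `decodeGameMessage` that is not part of the original class, checks the five fields before any handler touches them:

```php
<?php

/**
 * Hypothetical helper: decodes a JSON body produced from
 * GameMessage::toArray() and returns null if it is malformed.
 */
function decodeGameMessage(string $json): ?array
{
    $data = json_decode($json, true);
    if (!is_array($data)) {
        return null;
    }

    $required = ['message_id', 'message_type', 'server_id', 'timestamp', 'payload'];
    foreach ($required as $field) {
        if (!array_key_exists($field, $data)) {
            return null;
        }
    }

    // payload must decode to an array, timestamp to an integer (milliseconds).
    if (!is_array($data['payload']) || !is_int($data['timestamp'])) {
        return null;
    }

    return $data;
}

$ok = decodeGameMessage(
    '{"message_id":"gm_1","message_type":"match.join","server_id":"server-1",'
    . '"timestamp":1700000000000,"payload":{"player_id":"p1"}}'
);
$bad = decodeGameMessage('{"message_type":"match.join"}');
var_dump($ok !== null, $bad === null); // both true
```

Returning null rather than throwing lets the consumer decide whether to reject or dead-letter a malformed message.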

Matchmaking Service Implementation

php
<?php

namespace App\Game;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;

class MatchService
{
    private AMQPStreamConnection $connection;
    private $channel;
    private $redis;
    private string $exchangeName = 'game.exchange';
    private array $matchQueues = [];

    public function __construct(AMQPStreamConnection $connection, $redis)
    {
        $this->connection = $connection;
        $this->channel = $connection->channel();
        $this->redis = $redis;
        $this->setupInfrastructure();
    }

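    private function setupInfrastructure(): void
    {
        // Durable topic exchange shared by all game services.
        $this->channel->exchange_declare(
            $this->exchangeName,
            AMQPExchangeType::TOPIC,
            false, // passive
            true,  // durable
            false  // auto_delete
        );

        // Durable queue that receives every match.* event.
        $this->channel->queue_declare('game.match', false, true, false, false);
        $this->channel->queue_bind('game.match', $this->exchangeName, 'match.#');

        // Durable queue read by NotificationConsumer.
        $this->channel->queue_declare('game.notification', false, true, false, false);
        $this->channel->queue_bind('game.notification', $this->exchangeName, 'notification.#');
        // NotificationConsumer also handles match.success and chat.world, which are
        // published under their own routing keys, so bind those keys here as well.
        $this->channel->queue_bind('game.notification', $this->exchangeName, 'match.success');
        $this->channel->queue_bind('game.notification', $this->exchangeName, 'chat.world');
    }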

    public function joinMatch(string $playerId, int $score, string $gameMode): string
    {
        $matchId = sprintf('match_%s', bin2hex(random_bytes(8)));

        $player = [
            'player_id' => $playerId,
            'score' => $score,
            'game_mode' => $gameMode,
            'join_time' => microtime(true),
            'match_id' => $matchId,
        ];

        $queueKey = "match:queue:{$gameMode}";
        $this->redis->zAdd($queueKey, $score, json_encode($player));

        $message = new GameMessage('match.join', [
            'player_id' => $playerId,
            'score' => $score,
            'game_mode' => $gameMode,
            'match_id' => $matchId,
        ]);

        $this->publishMessage($message);

        return $matchId;
    }

    public function cancelMatch(string $playerId, string $gameMode): void
    {
        $queueKey = "match:queue:{$gameMode}";
        $players = $this->redis->zRange($queueKey, 0, -1);

        foreach ($players as $playerJson) {
            $player = json_decode($playerJson, true);
            if ($player['player_id'] === $playerId) {
                $this->redis->zRem($queueKey, $playerJson);
                break;
            }
        }

        $message = new GameMessage('match.cancel', [
            'player_id' => $playerId,
            'game_mode' => $gameMode,
        ]);

        $this->publishMessage($message);
    }

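    public function processMatch(string $gameMode): ?array
    {
        $queueKey = "match:queue:{$gameMode}";
        // Take the two lowest-scored waiting players. zRange followed by zRem
        // is not atomic, so run one matcher per game mode or guard it with a lock.
        $players = $this->redis->zRange($queueKey, 0, 1);

        if (count($players) < 2) {
            return null;
        }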

        $matchedPlayers = [];
        foreach ($players as $playerJson) {
            $player = json_decode($playerJson, true);
            $matchedPlayers[] = $player;
            $this->redis->zRem($queueKey, $playerJson);
        }

        $roomId = sprintf('room_%s', bin2hex(random_bytes(8)));

        $message = new GameMessage('match.success', [
            'room_id' => $roomId,
            'players' => $matchedPlayers,
            'game_mode' => $gameMode,
        ]);

        $this->publishMessage($message);

        return [
            'room_id' => $roomId,
            'players' => $matchedPlayers,
        ];
    }

    private function publishMessage(GameMessage $message): void
    {
        $routingKey = $message->messageType;

        $amqpMessage = new AMQPMessage(
            json_encode($message->toArray()),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'message_id' => $message->messageId,
                'timestamp' => time(),
            ]
        );

        $this->channel->basic_publish($amqpMessage, $this->exchangeName, $routingKey);
    }
}

Chat Service Implementation

php
<?php

namespace App\Game;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class ChatService
{
    private AMQPStreamConnection $connection;
    private $channel;
    private $redis;
    private string $exchangeName = 'game.exchange';

    public function __construct(AMQPStreamConnection $connection, $redis)
    {
        $this->connection = $connection;
        $this->channel = $connection->channel();
        $this->redis = $redis;
    }

    public function sendWorldChat(string $playerId, string $playerName, string $content): void
    {
        $message = new GameMessage('chat.world', [
            'channel' => 'world',
            'player_id' => $playerId,
            'player_name' => $playerName,
            'content' => $this->filterContent($content),
            'timestamp' => time(),
        ]);

        $this->publishMessage($message, 'chat.world');

        $this->redis->lPush('chat:world:history', json_encode($message->toArray()));
        $this->redis->lTrim('chat:world:history', 0, 99);
    }

    public function sendPrivateChat(string $fromPlayerId, string $toPlayerId, string $content): void
    {
        $message = new GameMessage('chat.private', [
            'channel' => 'private',
            'from_player_id' => $fromPlayerId,
            'to_player_id' => $toPlayerId,
            'content' => $this->filterContent($content),
            'timestamp' => time(),
        ]);

        $this->publishMessage($message, "chat.private.{$toPlayerId}");
    }

    public function sendGuildChat(string $guildId, string $playerId, string $playerName, string $content): void
    {
        $message = new GameMessage('chat.guild', [
            'channel' => 'guild',
            'guild_id' => $guildId,
            'player_id' => $playerId,
            'player_name' => $playerName,
            'content' => $this->filterContent($content),
            'timestamp' => time(),
        ]);

        $this->publishMessage($message, "chat.guild.{$guildId}");
    }

    private function filterContent(string $content): string
    {
        // Placeholder word list; load the real list from configuration.
        $sensitiveWords = ['badword1', 'badword2'];
        return str_replace($sensitiveWords, '***', $content);
    }

    private function publishMessage(GameMessage $message, string $routingKey): void
    {
        $amqpMessage = new AMQPMessage(
            json_encode($message->toArray()),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            ]
        );

        $this->channel->basic_publish($amqpMessage, $this->exchangeName, $routingKey);
    }
}

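Leaderboard Service Implementation

php
<?php

namespace App\Game;

// Required by notifyRankChange(), which builds an AMQPMessage.
use PhpAmqpLib\Message\AMQPMessage;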

class LeaderboardService
{
    private $redis;
    private $channel;
    private string $exchangeName = 'game.exchange';

    public function __construct($redis, $channel)
    {
        $this->redis = $redis;
        $this->channel = $channel;
    }

    public function updateScore(string $playerId, int $score, string $season = 'current'): void
    {
        $key = "leaderboard:{$season}";
        $oldRank = $this->redis->zRevRank($key, $playerId);

        $this->redis->zAdd($key, $score, $playerId);

        $newRank = $this->redis->zRevRank($key, $playerId);

        if ($oldRank !== $newRank) {
            $this->notifyRankChange($playerId, $oldRank, $newRank, $score);
        }
    }

    public function getTopPlayers(string $season, int $limit = 100): array
    {
        $key = "leaderboard:{$season}";
        $players = $this->redis->zRevRange($key, 0, $limit - 1, true);

        $result = [];
        $rank = 1;
        foreach ($players as $playerId => $score) {
            $result[] = [
                'rank' => $rank++,
                'player_id' => $playerId,
                'score' => $score,
            ];
        }

        return $result;
    }

    public function getPlayerRank(string $playerId, string $season): ?array
    {
        $key = "leaderboard:{$season}";
        $rank = $this->redis->zRevRank($key, $playerId);
        $score = $this->redis->zScore($key, $playerId);

        if ($rank === false) {
            return null;
        }

        return [
            'rank' => $rank + 1,
            'score' => $score,
        ];
    }

    public function getAroundPlayers(string $playerId, string $season, int $range = 5): array
    {
        $key = "leaderboard:{$season}";
        $rank = $this->redis->zRevRank($key, $playerId);

        if ($rank === false) {
            return [];
        }

        $start = max(0, $rank - $range);
        $end = $rank + $range;

        $players = $this->redis->zRevRange($key, $start, $end, true);

        $result = [];
        $currentRank = $start + 1;
        foreach ($players as $pid => $score) {
            $result[] = [
                'rank' => $currentRank++,
                'player_id' => $pid,
                'score' => $score,
                'is_self' => $pid === $playerId,
            ];
        }

        return $result;
    }

    private function notifyRankChange(string $playerId, $oldRank, $newRank, int $score): void
    {
        $message = new GameMessage('leaderboard.change', [
            'player_id' => $playerId,
            'old_rank' => $oldRank !== false ? $oldRank + 1 : null,
            'new_rank' => $newRank + 1,
            'score' => $score,
        ]);

        $amqpMessage = new AMQPMessage(
            json_encode($message->toArray()),
            ['content_type' => 'application/json']
        );

        $this->channel->basic_publish(
            $amqpMessage,
            $this->exchangeName,
            "notification.player.{$playerId}"
        );
    }
}

Notification Consumer Implementation

php
<?php

namespace App\Game;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

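class NotificationConsumer
{
    private AMQPStreamConnection $connection;
    private $channel;
    // WebSocketServer is an application class that is not shown here; it is
    // expected to provide sendToPlayer() and broadcast().
    private WebSocketServer $wsServer;
    private bool $running = true;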

    public function __construct(
        AMQPStreamConnection $connection,
        WebSocketServer $wsServer
    ) {
        $this->connection = $connection;
        $this->channel = $connection->channel();
        $this->wsServer = $wsServer;
    }

    public function consume(): void
    {
        $this->channel->basic_qos(null, 100, null);
        $this->channel->basic_consume('game.notification', '', false, false, false, false, [$this, 'handleMessage']);

        while ($this->running && count($this->channel->callbacks)) {
            $this->channel->wait(null, true);
            if (!$this->running) break;
            usleep(10000);
        }
    }

    public function handleMessage(AMQPMessage $message): void
    {
        $data = json_decode($message->body, true);

        // Drop malformed messages instead of requeueing them forever.
        if (!is_array($data) || !isset($data['message_type'], $data['payload'])) {
            $message->reject(false);
            return;
        }

        $messageType = $data['message_type'];
        $payload = $data['payload'];

        switch ($messageType) {
            case 'match.success':
                $this->handleMatchSuccess($payload);
                break;
            case 'leaderboard.change':
                $this->handleLeaderboardChange($payload);
                break;
            case 'chat.world':
                $this->handleWorldChat($payload);
                break;
            default:
                $this->handleGenericNotification($payload);
        }

        // Acknowledge only after the handler has run.
        $message->ack();
    }

    private function handleMatchSuccess(array $payload): void
    {
        foreach ($payload['players'] as $player) {
            $this->wsServer->sendToPlayer($player['player_id'], json_encode([
                'type' => 'match_success',
                'room_id' => $payload['room_id'],
                'game_mode' => $payload['game_mode'],
            ]));
        }
    }

    private function handleLeaderboardChange(array $payload): void
    {
        $this->wsServer->sendToPlayer($payload['player_id'], json_encode([
            'type' => 'rank_change',
            'old_rank' => $payload['old_rank'],
            'new_rank' => $payload['new_rank'],
            'score' => $payload['score'],
        ]));
    }

    private function handleWorldChat(array $payload): void
    {
        $this->wsServer->broadcast(json_encode([
            'type' => 'world_chat',
            'player_name' => $payload['player_name'],
            'content' => $payload['content'],
            'timestamp' => $payload['timestamp'],
        ]));
    }

    private function handleGenericNotification(array $payload): void
    {
        if (isset($payload['player_id'])) {
            $this->wsServer->sendToPlayer($payload['player_id'], json_encode($payload));
        }
    }

    public function stop(): void
    {
        $this->running = false;
    }
}

Complete Usage Example

php
<?php

require_once 'vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use App\Game\{MatchService, ChatService, LeaderboardService, NotificationConsumer};

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

$redis = new Redis();
$redis->connect('localhost', 6379);

$matchService = new MatchService($connection, $redis);
$chatService = new ChatService($connection, $redis);
$leaderboardService = new LeaderboardService($redis, $connection->channel());

echo "=== Game system example ===\n\n";

echo "1. Players join matchmaking\n";
$matchId1 = $matchService->joinMatch('player_001', 1500, 'ranked');
$matchId2 = $matchService->joinMatch('player_002', 1520, 'ranked');
echo "Player 1 match ID: {$matchId1}\n";
echo "Player 2 match ID: {$matchId2}\n";

echo "\n2. Process matchmaking\n";
$result = $matchService->processMatch('ranked');
if ($result) {
    echo "Match found! Room ID: {$result['room_id']}\n";
}

echo "\n3. Send world chat\n";
$chatService->sendWorldChat('player_001', 'PlayerOne', 'Hello, everyone!');
echo "Message sent\n";

echo "\n4. Update leaderboard\n";
$leaderboardService->updateScore('player_001', 1650);
$leaderboardService->updateScore('player_002', 1580);
echo "Scores updated\n";

echo "\n5. Query leaderboard\n";
$topPlayers = $leaderboardService->getTopPlayers('current', 10);
foreach ($topPlayers as $player) {
    echo "Rank {$player['rank']}: {$player['player_id']} - {$player['score']} points\n";
}

$connection->close();

echo "\n=== Example complete ===\n";

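Key Technical Points

1. Fast Matching

A Redis sorted set per game mode holds the waiting players, scored by rating, so the matcher can read the two lowest-rated players with a single range query:

php
$this->redis->zAdd($queueKey, $score, json_encode($player));
$players = $this->redis->zRange($queueKey, 0, 1);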
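
Reading the first two entries always pairs the two lowest-rated players, whatever the gap between them. One possible refinement, sketched here under assumed values (a base tolerance of 50 points that widens by 10 points per second of waiting; both numbers and the function name `findPair` are illustrative and not from the original), only pairs neighbours whose ratings are close enough:

```php
<?php

/**
 * Hypothetical matcher: $players must be sorted by ascending score, the
 * order zRange returns. Returns the first adjacent pair whose score gap
 * is within the tolerance earned by the longer-waiting player, or null.
 */
function findPair(array $players, float $now, int $base = 50, int $perSecond = 10): ?array
{
    for ($i = 0; $i + 1 < count($players); $i++) {
        $a = $players[$i];
        $b = $players[$i + 1];
        $waited = $now - min($a['join_time'], $b['join_time']);
        $tolerance = $base + $perSecond * max(0, (int) floor($waited));
        if (abs($a['score'] - $b['score']) <= $tolerance) {
            return [$a, $b];
        }
    }
    return null;
}

$queue = [
    ['player_id' => 'p1', 'score' => 1200, 'join_time' => 100.0],
    ['player_id' => 'p2', 'score' => 1500, 'join_time' => 100.0],
    ['player_id' => 'p3', 'score' => 1520, 'join_time' => 101.0],
];
$pair = findPair($queue, 102.0);
echo $pair[0]['player_id'], ' vs ', $pair[1]['player_id'], "\n"; // p2 vs p3
```

Widening the tolerance over time trades fairness for waiting time: a player with no close neighbour is still matched eventually instead of waiting indefinitely.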

2. Real-Time Push

Notifications are pushed to players over WebSocket:

php
$this->wsServer->sendToPlayer($playerId, json_encode($notification));

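3. Leaderboard

The leaderboard is also a Redis sorted set; zRevRank returns a player's 0-based rank in descending score order: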

php
$this->redis->zAdd($key, $score, $playerId);
$rank = $this->redis->zRevRank($key, $playerId);

4. Message Broadcast

php
$this->wsServer->broadcast(json_encode($message));
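
The `WebSocketServer` class behind `sendToPlayer` and `broadcast` is not defined in this document. For local experiments, a hypothetical in-memory stand-in such as the one below can record what would be sent; the class name `InMemoryWebSocketServer` and the `connect` and `sentTo` methods are assumptions added for illustration.

```php
<?php

/**
 * Hypothetical in-memory stand-in for the WebSocketServer used above.
 * It records what would be sent instead of writing to real sockets.
 */
class InMemoryWebSocketServer
{
    /** @var array<string, string[]> outbox per connected player */
    private array $outbox = [];

    public function connect(string $playerId): void
    {
        $this->outbox[$playerId] = $this->outbox[$playerId] ?? [];
    }

    public function sendToPlayer(string $playerId, string $json): bool
    {
        if (!isset($this->outbox[$playerId])) {
            return false; // player is offline, nothing is delivered
        }
        $this->outbox[$playerId][] = $json;
        return true;
    }

    public function broadcast(string $json): int
    {
        foreach (array_keys($this->outbox) as $playerId) {
            $this->outbox[$playerId][] = $json;
        }
        return count($this->outbox); // number of recipients
    }

    public function sentTo(string $playerId): array
    {
        return $this->outbox[$playerId] ?? [];
    }
}

$ws = new InMemoryWebSocketServer();
$ws->connect('p1');
$ws->connect('p2');
$ws->sendToPlayer('p1', json_encode(['type' => 'rank_change', 'new_rank' => 3]));
$ws->broadcast(json_encode(['type' => 'world_chat', 'content' => 'hi']));
echo count($ws->sentTo('p1')), ' ', count($ws->sentTo('p2')), "\n"; // 2 1
```

A real gateway would map player IDs to open connections in the same way, with the outbox replaced by socket writes.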

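Performance Optimization Recommendations

| Optimization | Recommendation | Effect |
| --- | --- | --- |
| Connection reuse | Keep long-lived connections | Lower connection overhead |
| Batch push | Merge messages before pushing | Fewer network requests |
| Local cache | Cache hot data | Fewer Redis round trips |
| Partitioning | Partition by game mode | Higher parallelism |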
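
As a rough illustration of the batch-push row, the sketch below groups pending notifications per player so that each player receives one combined frame. The function name `batchByPlayer` and the `batch` frame type are assumptions for illustration, not part of the original code.

```php
<?php

/**
 * Hypothetical helper: groups pending notifications by player so each
 * player receives one combined frame instead of many small ones.
 *
 * @param array<int, array{player_id: string, event: array}> $pending
 * @return array<string, string> JSON frame per player ID
 */
function batchByPlayer(array $pending): array
{
    $grouped = [];
    foreach ($pending as $item) {
        $grouped[$item['player_id']][] = $item['event'];
    }

    $frames = [];
    foreach ($grouped as $playerId => $events) {
        $frames[$playerId] = json_encode(['type' => 'batch', 'events' => $events]);
    }
    return $frames;
}

$frames = batchByPlayer([
    ['player_id' => 'p1', 'event' => ['type' => 'rank_change', 'new_rank' => 3]],
    ['player_id' => 'p2', 'event' => ['type' => 'match_success', 'room_id' => 'room_1']],
    ['player_id' => 'p1', 'event' => ['type' => 'rank_change', 'new_rank' => 2]],
]);
echo count($frames), "\n"; // 2
echo $frames['p1'], "\n";
```

Three pending events become two frames here; how long to buffer before flushing is a latency trade-off the original does not specify.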

Common Problems and Solutions

1. Matching Latency

Solution: optimize the matching algorithm and add matchmaking servers

2. Message Loss

Solution: message persistence plus a retry mechanism
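
The services above already publish with the persistent delivery mode; the retry half is not shown. A minimal sketch of one way to retry, assuming exponential backoff and a hypothetical helper named `retryWithBackoff` (the publish call is simulated here so the example runs without a broker):

```php
<?php

/**
 * Hypothetical helper: calls $operation until it succeeds or $maxAttempts
 * is reached, sleeping base * 2^(attempt-1) milliseconds between tries.
 *
 * @throws Throwable the last error when every attempt fails
 */
function retryWithBackoff(callable $operation, int $maxAttempts = 3, int $baseDelayMs = 100)
{
    for ($attempt = 1; ; $attempt++) {
        try {
            return $operation($attempt);
        } catch (Throwable $e) {
            if ($attempt >= $maxAttempts) {
                throw $e;
            }
            usleep($baseDelayMs * (2 ** ($attempt - 1)) * 1000);
        }
    }
}

// Simulated publish that fails twice before succeeding.
$calls = 0;
$result = retryWithBackoff(function (int $attempt) use (&$calls) {
    $calls++;
    if ($attempt < 3) {
        throw new RuntimeException("publish failed on attempt {$attempt}");
    }
    return 'published';
}, 5, 1);

echo $result, ' after ', $calls, " attempts\n"; // published after 3 attempts
```

In a real service the closure would wrap `basic_publish`. Retrying only helps when the publisher learns that a publish failed, which is what publisher confirms (`confirm_select()` in php-amqplib) are for.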

3. Slow Leaderboard Updates

Solution: asynchronous updates plus batch processing
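
One way to batch leaderboard writes is to keep only the latest score per player between flushes. The class below is a sketch under that assumption; the name `ScoreUpdateBuffer` is hypothetical, and the Redis write itself is left to the caller.

```php
<?php

/**
 * Hypothetical buffer: keeps only the latest score per player so a flush
 * writes each player once, however many updates arrived in between.
 */
class ScoreUpdateBuffer
{
    /** @var array<string, int> */
    private array $latest = [];

    public function add(string $playerId, int $score): void
    {
        $this->latest[$playerId] = $score;
    }

    /**
     * Returns the buffered scores and clears the buffer. The caller would
     * write the result to Redis in one batch.
     *
     * @return array<string, int>
     */
    public function flush(): array
    {
        $batch = $this->latest;
        $this->latest = [];
        return $batch;
    }
}

$buffer = new ScoreUpdateBuffer();
$buffer->add('player_001', 1600);
$buffer->add('player_002', 1580);
$buffer->add('player_001', 1650); // overwrites the earlier value

$batch = $buffer->flush();
echo count($batch), ' writes, player_001 = ', $batch['player_001'], "\n";
// prints "2 writes, player_001 = 1650"
```

The cost of this approach is that rank-change notifications are delayed until the next flush.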

Related Links