
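Social Platform Applications

Overview

A social platform combines user relationships, content distribution, instant messaging, and feed updates. RabbitMQ decouples the services that handle these concerns, moves content processing off the request path, and carries the events behind real-time push, which helps the platform sustain high-concurrency traffic.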

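Business background and requirements

Scenario

Business modules of an example social platform:

| Module | Functions | Messaging scenarios |
| --- | --- | --- |
| User system | Registration, login, profile management | User events, notifications |
| Relationship system | Follows, followers, friends | Relationship events, follower notifications |
| Content system | Publishing, likes, comments, shares | Content events, feed distribution |
| Messaging system | Direct messages, group chat, system notices | Message push, read receipts |
| Feed system | Feed stream, recommendations, trending topics | Feed push, cache updates |
| Activity system | Activities, topics, challenges | Activity notifications, participation statistics |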

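Technical challenges

Challenges for a social platform:
1. High concurrency: tens of millions of users online at the same time
2. Real-time delivery: messages must arrive immediately
3. Massive data: large volumes of content and relationship data
4. Recommendation algorithms: personalized content recommendation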

Architecture design

Overall architecture diagram

mermaid
graph TB
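    subgraph "Clients"
        A[Mobile app]
        B[Web client]
        C[Mini program]
    end
    
    subgraph "Access layer"
        D[API gateway]
        E[WebSocket service]
    end
    
    subgraph "Business services"
        F[User service]
        G[Relationship service]
        H[Content service]
        I[Messaging service]
    end
    
    subgraph "RabbitMQ"
        J[Social event bus<br/>social.exchange]
        
        subgraph "Event queues"
            K[User event queue]
            L[Relationship event queue]
            M[Content event queue]
            N[Message push queue]
        end
    end
    
    subgraph "Processing services"
        O[Feed distribution service]
        P[Notification service]
        Q[Search indexing service]
    end
    
    subgraph "Storage layer"
        R[MySQL]
        S[Redis]
        T[Elasticsearch]
    end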
    
    A --> D
    B --> D
    C --> D
    A --> E
    
    D --> F
    D --> G
    D --> H
    D --> I
    
    F --> J
    G --> J
    H --> J
    I --> J
    
    J --> K
    J --> L
    J --> M
    J --> N
    
    K --> O
    L --> O
    M --> Q
    N --> P
    
    O --> S
    P --> E
    Q --> T
    F --> R
    G --> R
    H --> R
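
The diagram routes events from `social.exchange` into separate queues. As a rough mental model of how a topic exchange picks the queues for an event, the sketch below re-implements the AMQP topic matching rules in plain PHP (`*` matches exactly one word, `#` matches zero or more). RabbitMQ does this matching itself, so none of this code is needed at runtime. The `social.search` queue name and the `content.*` pattern are assumptions made for illustration; the other names follow the services shown later.

```php
<?php

declare(strict_types=1);

/**
 * Returns true when a routing key matches an AMQP topic binding pattern.
 * "*" matches exactly one dot-separated word, "#" matches zero or more words.
 */
function topicMatches(string $pattern, string $routingKey): bool
{
    return matchWords(explode('.', $pattern), explode('.', $routingKey));
}

function matchWords(array $pattern, array $key): bool
{
    if ($pattern === []) {
        return $key === [];
    }

    $head = array_shift($pattern);

    if ($head === '#') {
        // "#" may absorb 0..n words: try every possible split point.
        for ($i = 0; $i <= count($key); $i++) {
            if (matchWords($pattern, array_slice($key, $i))) {
                return true;
            }
        }
        return false;
    }

    if ($key === []) {
        return false;
    }

    $word = array_shift($key);

    return ($head === '*' || $head === $word) && matchWords($pattern, $key);
}

/** Lists the queues whose bindings match the given routing key. */
function queuesFor(array $bindings, string $routingKey): array
{
    $queues = [];
    foreach ($bindings as $queue => $patterns) {
        foreach ($patterns as $pattern) {
            if (topicMatches($pattern, $routingKey)) {
                $queues[] = $queue;
                break;
            }
        }
    }

    return $queues;
}

// Hypothetical binding table (social.search and content.* are assumptions).
$bindings = [
    'social.feed'         => ['content.published'],
    'social.notification' => ['user.followed', 'content.liked', 'message.sent'],
    'social.search'       => ['content.*'],
];

// Matches social.feed and social.search.
print_r(queuesFor($bindings, 'content.published'));
```

A wildcard binding such as `content.*` lets a new consumer subscribe to every content event without any change on the publishing side.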

Feed distribution flow

mermaid
sequenceDiagram
    participant User as User
    participant Content as Content service
    participant MQ as RabbitMQ
    participant Feed as Feed service
    participant Redis as Redis
    participant Follower as Follower
    
    User->>Content: Publish post
    Content->>Content: Save content
    Content->>MQ: Publish content event
    Content-->>User: Return success
    
    MQ->>Feed: Deliver event
    Feed->>Feed: Load follower list
    
    loop Each follower
        Feed->>Redis: Write to follower feed
    end
    
    Feed->>MQ: Publish distribution complete
    MQ->>Follower: Push notification
    
    Follower->>Redis: Pull feed
    Redis-->>Follower: Return content

PHP implementation

Social event definitions

php
<?php

namespace App\Social;

abstract class SocialEvent
{
    public string $eventId;
    public string $eventType;
    public int $timestamp;
    public array $metadata;

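    public function __construct()
    {
        $this->eventId = $this->generateEventId();
        // Initialise the typed property so reading it never throws.
        $this->eventType = $this->getEventType();
        $this->timestamp = time();
        $this->metadata = [];
    }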

    private function generateEventId(): string
    {
        return sprintf('evt_%s_%s', date('YmdHis'), bin2hex(random_bytes(8)));
    }

    abstract public function getEventType(): string;

    abstract public function getPayload(): array;

    public function toArray(): array
    {
        return [
            'event_id' => $this->eventId,
            'event_type' => $this->getEventType(),
            'timestamp' => $this->timestamp,
            'metadata' => $this->metadata,
            'payload' => $this->getPayload(),
        ];
    }
}

class UserFollowedEvent extends SocialEvent
{
    private string $followerId;
    private string $followingId;

    public function __construct(string $followerId, string $followingId)
    {
        parent::__construct();
        $this->followerId = $followerId;
        $this->followingId = $followingId;
    }

    public function getEventType(): string
    {
        return 'user.followed';
    }

    public function getPayload(): array
    {
        return [
            'follower_id' => $this->followerId,
            'following_id' => $this->followingId,
        ];
    }
}

class ContentPublishedEvent extends SocialEvent
{
    private string $contentId;
    private string $authorId;
    private string $contentType;
    private array $content;

    public function __construct(string $contentId, string $authorId, string $contentType, array $content)
    {
        parent::__construct();
        $this->contentId = $contentId;
        $this->authorId = $authorId;
        $this->contentType = $contentType;
        $this->content = $content;
    }

    public function getEventType(): string
    {
        return 'content.published';
    }

    public function getPayload(): array
    {
        return [
            'content_id' => $this->contentId,
            'author_id' => $this->authorId,
            'content_type' => $this->contentType,
            'content' => $this->content,
        ];
    }
}

class ContentLikedEvent extends SocialEvent
{
    private string $contentId;
    private string $userId;
    private string $authorId;

    public function __construct(string $contentId, string $userId, string $authorId)
    {
        parent::__construct();
        $this->contentId = $contentId;
        $this->userId = $userId;
        $this->authorId = $authorId;
    }

    public function getEventType(): string
    {
        return 'content.liked';
    }

    public function getPayload(): array
    {
        return [
            'content_id' => $this->contentId,
            'user_id' => $this->userId,
            'author_id' => $this->authorId,
        ];
    }
}

class MessageSentEvent extends SocialEvent
{
    private string $messageId;
    private string $fromUserId;
    private string $toUserId;
    private string $messageType;
    private string $content;

    public function __construct(
        string $messageId,
        string $fromUserId,
        string $toUserId,
        string $messageType,
        string $content
    ) {
        parent::__construct();
        $this->messageId = $messageId;
        $this->fromUserId = $fromUserId;
        $this->toUserId = $toUserId;
        $this->messageType = $messageType;
        $this->content = $content;
    }

    public function getEventType(): string
    {
        return 'message.sent';
    }

    public function getPayload(): array
    {
        return [
            'message_id' => $this->messageId,
            'from_user_id' => $this->fromUserId,
            'to_user_id' => $this->toUserId,
            'message_type' => $this->messageType,
            'content' => $this->content,
        ];
    }
}
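
Consumers receive these events as JSON, so it can help to validate the envelope before touching the payload. The following is a minimal sketch, assuming the field names produced by `SocialEvent::toArray()` above; `decodeEnvelope` is a hypothetical helper and not part of the classes in this article.

```php
<?php

declare(strict_types=1);

/**
 * Decodes a JSON event envelope and checks the fields produced by
 * SocialEvent::toArray(). Returns null for anything malformed so the
 * consumer can reject the message instead of failing halfway through.
 */
function decodeEnvelope(string $body): ?array
{
    $event = json_decode($body, true);
    if (!is_array($event)) {
        return null;
    }

    // Field name => type check that the value must pass.
    $required = [
        'event_id'   => 'is_string',
        'event_type' => 'is_string',
        'timestamp'  => 'is_int',
        'metadata'   => 'is_array',
        'payload'    => 'is_array',
    ];

    foreach ($required as $field => $check) {
        if (!array_key_exists($field, $event) || !$check($event[$field])) {
            return null;
        }
    }

    return $event;
}

$body = json_encode([
    'event_id'   => 'evt_20240101120000_0123456789abcdef',
    'event_type' => 'user.followed',
    'timestamp'  => 1704110400,
    'metadata'   => [],
    'payload'    => ['follower_id' => 'user_001', 'following_id' => 'user_002'],
]);

var_dump(decodeEnvelope($body) !== null); // bool(true)
var_dump(decodeEnvelope('not json'));     // NULL
```

A handler could call this first and reject the message without requeueing when it returns null.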

Relationship service implementation

php
<?php

namespace App\Social;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;

class RelationshipService
{
    private AMQPStreamConnection $connection;
    private $channel;
    private $redis;
    private RelationshipRepository $repository;
    private string $exchangeName = 'social.exchange';

    public function __construct(
        AMQPStreamConnection $connection,
        $redis,
        RelationshipRepository $repository
    ) {
        $this->connection = $connection;
        $this->channel = $connection->channel();
        $this->redis = $redis;
        $this->repository = $repository;
        $this->setupExchange();
    }

    private function setupExchange(): void
    {
        $this->channel->exchange_declare(
            $this->exchangeName,
            AMQPExchangeType::TOPIC,
            false,
            true,
            false
        );
    }

    public function follow(string $followerId, string $followingId): array
    {
        if ($followerId === $followingId) {
            return ['success' => false, 'error' => 'Cannot follow yourself'];
        }

        if ($this->repository->isFollowing($followerId, $followingId)) {
            return ['success' => false, 'error' => 'Already following'];
        }

        $this->repository->addFollow($followerId, $followingId);

        $this->redis->sAdd("user:{$followerId}:following", $followingId);
        $this->redis->sAdd("user:{$followingId}:followers", $followerId);
        $this->redis->incrBy("user:{$followerId}:following_count", 1);
        $this->redis->incrBy("user:{$followingId}:follower_count", 1);

        $event = new UserFollowedEvent($followerId, $followingId);
        $this->publishEvent($event);

        return ['success' => true];
    }

    public function unfollow(string $followerId, string $followingId): array
    {
        if (!$this->repository->isFollowing($followerId, $followingId)) {
            return ['success' => false, 'error' => 'Not following'];
        }

        $this->repository->removeFollow($followerId, $followingId);

        $this->redis->sRem("user:{$followerId}:following", $followingId);
        $this->redis->sRem("user:{$followingId}:followers", $followerId);
        $this->redis->decrBy("user:{$followerId}:following_count", 1);
        $this->redis->decrBy("user:{$followingId}:follower_count", 1);

        return ['success' => true];
    }

    public function getFollowers(string $userId, int $page = 1, int $pageSize = 20): array
    {
        $offset = ($page - 1) * $pageSize;
        $followers = $this->redis->sMembers("user:{$userId}:followers");

        return array_slice($followers, $offset, $pageSize);
    }

    public function getFollowing(string $userId, int $page = 1, int $pageSize = 20): array
    {
        $offset = ($page - 1) * $pageSize;
        $following = $this->redis->sMembers("user:{$userId}:following");

        return array_slice($following, $offset, $pageSize);
    }

    public function isFollowing(string $followerId, string $followingId): bool
    {
        return (bool) $this->redis->sIsMember("user:{$followerId}:following", $followingId);
    }

    private function publishEvent(SocialEvent $event): void
    {
        $message = new AMQPMessage(
            json_encode($event->toArray()),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'message_id' => $event->eventId,
            ]
        );

        $routingKey = $event->getEventType();
        $this->channel->basic_publish($message, $this->exchangeName, $routingKey);
    }
}

Content service implementation

php
<?php

namespace App\Social;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class ContentService
{
    private AMQPStreamConnection $connection;
    private $channel;
    private $redis;
    private ContentRepository $repository;
    private string $exchangeName = 'social.exchange';

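    public function __construct(
        AMQPStreamConnection $connection,
        $redis,
        ContentRepository $repository
    ) {
        $this->connection = $connection;
        $this->channel = $connection->channel();
        $this->redis = $redis;
        $this->repository = $repository;

        // Declare the exchange so publishing works even when this service
        // starts before any other service has created it.
        $this->channel->exchange_declare($this->exchangeName, 'topic', false, true, false);
    }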

    public function publishContent(string $authorId, string $contentType, array $content): array
    {
        $contentId = $this->generateContentId();

        $contentData = [
            'content_id' => $contentId,
            'author_id' => $authorId,
            'content_type' => $contentType,
            'content' => $content,
            'like_count' => 0,
            'comment_count' => 0,
            'share_count' => 0,
            'created_at' => date('Y-m-d H:i:s'),
        ];

        $this->repository->create($contentData);

        $this->redis->lPush("user:{$authorId}:contents", $contentId);

        $event = new ContentPublishedEvent($contentId, $authorId, $contentType, $content);
        $this->publishEvent($event);

        return ['content_id' => $contentId];
    }

    public function likeContent(string $contentId, string $userId): array
    {
        $content = $this->repository->findById($contentId);
        if (!$content) {
            return ['success' => false, 'error' => 'Content not found'];
        }

        $likeKey = "content:{$contentId}:likes";

        // sAdd returns the number of members actually added, so one call both
        // checks and records the like without a check-then-act race.
        if (!$this->redis->sAdd($likeKey, $userId)) {
            return ['success' => false, 'error' => 'Already liked'];
        }

        $this->redis->incrBy("content:{$contentId}:like_count", 1);

        $this->repository->incrementLikeCount($contentId);

        $event = new ContentLikedEvent($contentId, $userId, $content['author_id']);
        $this->publishEvent($event);

        return ['success' => true];
    }

    public function unlikeContent(string $contentId, string $userId): array
    {
        $likeKey = "content:{$contentId}:likes";
        if (!$this->redis->sIsMember($likeKey, $userId)) {
            return ['success' => false, 'error' => 'Not liked'];
        }

        $this->redis->sRem($likeKey, $userId);
        $this->redis->decrBy("content:{$contentId}:like_count", 1);

        $this->repository->decrementLikeCount($contentId);

        return ['success' => true];
    }

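    private function publishEvent(SocialEvent $event): void
    {
        $message = new AMQPMessage(
            json_encode($event->toArray()),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                // Same as RelationshipService: lets consumers de-duplicate.
                'message_id' => $event->eventId,
            ]
        );

        // The event type doubles as the routing key on the topic exchange.
        $this->channel->basic_publish($message, $this->exchangeName, $event->getEventType());
    }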

    private function generateContentId(): string
    {
        return sprintf('cnt_%s_%s', date('YmdHis'), bin2hex(random_bytes(4)));
    }
}

Feed distribution service

php
<?php

namespace App\Social;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class FeedDistributionService
{
    private AMQPStreamConnection $connection;
    private $channel;
    private $redis;
    private bool $running = true;

    public function __construct(AMQPStreamConnection $connection, $redis)
    {
        $this->connection = $connection;
        $this->channel = $connection->channel();
        $this->redis = $redis;
        $this->setupQueue();
    }

    private function setupQueue(): void
    {
        $this->channel->exchange_declare('social.exchange', 'topic', false, true, false);
        $this->channel->queue_declare('social.feed', false, true, false, false);
        $this->channel->queue_bind('social.feed', 'social.exchange', 'content.published');
    }

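    public function consume(): void
    {
        $this->channel->basic_qos(null, 10, null);
        $this->channel->basic_consume('social.feed', '', false, false, false, false, [$this, 'handleEvent']);

        while ($this->running && $this->channel->is_consuming()) {
            try {
                // Block for up to one second so messages are handled as soon
                // as they arrive, while stop() is still honoured promptly.
                $this->channel->wait(null, false, 1);
            } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) {
                // No message within the timeout; loop and re-check $running.
            }
        }
    }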

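    public function handleEvent(AMQPMessage $message): void
    {
        $event = json_decode($message->body, true);
        $payload = $event['payload'] ?? null;

        // Drop malformed messages instead of writing garbage keys to Redis.
        if (!isset($payload['author_id'], $payload['content_id'], $event['timestamp'])) {
            $message->reject(false);
            return;
        }

        $authorId = $payload['author_id'];
        $contentId = $payload['content_id'];

        $followers = $this->redis->sMembers("user:{$authorId}:followers");

        $feedItem = json_encode([
            'content_id' => $contentId,
            'author_id' => $authorId,
            'timestamp' => $event['timestamp'],
        ]);

        // Fan out on write: push the item to every follower's feed and keep
        // only the newest 1000 entries per feed.
        foreach ($followers as $followerId) {
            $this->redis->lPush("feed:{$followerId}", $feedItem);
            $this->redis->lTrim("feed:{$followerId}", 0, 999);
        }

        // The author sees their own post too; cap this list as well.
        $this->redis->lPush("feed:{$authorId}", $feedItem);
        $this->redis->lTrim("feed:{$authorId}", 0, 999);

        // Acknowledge only after every write has succeeded.
        $message->ack();
    }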

    public function getFeed(string $userId, int $page = 1, int $pageSize = 20): array
    {
        $offset = ($page - 1) * $pageSize;
        $feedItems = $this->redis->lRange("feed:{$userId}", $offset, $offset + $pageSize - 1);

        $feed = [];
        foreach ($feedItems as $item) {
            $feed[] = json_decode($item, true);
        }

        return $feed;
    }

    public function stop(): void
    {
        $this->running = false;
    }
}

Message push service

php
<?php

namespace App\Social;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class MessagePushService
{
    private AMQPStreamConnection $connection;
    private $channel;
    private $redis;
    private WebSocketServer $wsServer;
    private bool $running = true;

    public function __construct(
        AMQPStreamConnection $connection,
        $redis,
        WebSocketServer $wsServer
    ) {
        $this->connection = $connection;
        $this->channel = $connection->channel();
        $this->redis = $redis;
        $this->wsServer = $wsServer;
        $this->setupQueue();
    }

    private function setupQueue(): void
    {
        $this->channel->exchange_declare('social.exchange', 'topic', false, true, false);
        $this->channel->queue_declare('social.notification', false, true, false, false);
        $this->channel->queue_bind('social.notification', 'social.exchange', 'user.followed');
        $this->channel->queue_bind('social.notification', 'social.exchange', 'content.liked');
        $this->channel->queue_bind('social.notification', 'social.exchange', 'message.sent');
    }

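    public function consume(): void
    {
        $this->channel->basic_qos(null, 50, null);
        $this->channel->basic_consume('social.notification', '', false, false, false, false, [$this, 'handleEvent']);

        while ($this->running && $this->channel->is_consuming()) {
            try {
                // Block for up to one second so notifications go out as soon
                // as they arrive, while stop() is still honoured promptly.
                $this->channel->wait(null, false, 1);
            } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) {
                // No message within the timeout; loop and re-check $running.
            }
        }
    }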

    public function handleEvent(AMQPMessage $message): void
    {
        $event = json_decode($message->body, true);
        $eventType = $event['event_type'];
        $payload = $event['payload'];

        switch ($eventType) {
            case 'user.followed':
                $this->handleFollowNotification($payload);
                break;
            case 'content.liked':
                $this->handleLikeNotification($payload);
                break;
            case 'message.sent':
                $this->handleMessageNotification($payload);
                break;
        }

        $message->ack();
    }

    private function handleFollowNotification(array $payload): void
    {
        $notification = [
            'type' => 'new_follower',
            'follower_id' => $payload['follower_id'],
            'timestamp' => time(),
        ];

        $this->pushNotification($payload['following_id'], $notification);
    }

    private function handleLikeNotification(array $payload): void
    {
        $notification = [
            'type' => 'content_liked',
            'content_id' => $payload['content_id'],
            'user_id' => $payload['user_id'],
            'timestamp' => time(),
        ];

        $this->pushNotification($payload['author_id'], $notification);
    }

    private function handleMessageNotification(array $payload): void
    {
        $notification = [
            'type' => 'new_message',
            'message_id' => $payload['message_id'],
            'from_user_id' => $payload['from_user_id'],
            'content' => mb_substr($payload['content'], 0, 50),
            'timestamp' => time(),
        ];

        $this->pushNotification($payload['to_user_id'], $notification);
    }

    private function pushNotification(string $userId, array $notification): void
    {
        $this->redis->lPush("notification:{$userId}", json_encode($notification));
        $this->redis->lTrim("notification:{$userId}", 0, 99);

        $this->wsServer->sendToUser($userId, json_encode([
            'type' => 'notification',
            'data' => $notification,
        ]));
    }

    public function stop(): void
    {
        $this->running = false;
    }
}

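Complete usage example

php
<?php

require_once 'vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use App\Social\{
    RelationshipService,
    RelationshipRepository,
    ContentService,
    ContentRepository,
    FeedDistributionService
};

echo "Starting the feed distribution service...\n";

// Fork before opening any connection: AMQP and Redis sockets must not be
// shared between the parent and the child process.
$pid = pcntl_fork();
if ($pid === -1) {
    exit("Failed to fork the feed worker\n");
}

if ($pid === 0) {
    // Child: run the feed consumer on its own connections until terminated.
    $workerConnection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $workerRedis = new Redis();
    $workerRedis->connect('localhost', 6379);

    (new FeedDistributionService($workerConnection, $workerRedis))->consume();
    exit(0);
}

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

$redis = new Redis();
$redis->connect('localhost', 6379);

// RelationshipRepository and ContentRepository are not shown in this article;
// they are assumed to be provided by the application.
$relationshipService = new RelationshipService($connection, $redis, new RelationshipRepository());
$contentService = new ContentService($connection, $redis, new ContentRepository());

// Constructing the feed service declares and binds the social.feed queue.
// Do this before publishing; otherwise the content.published event has no
// queue to be routed to and is dropped.
$feedService = new FeedDistributionService($connection, $redis);

echo "=== Social platform example ===\n\n";

echo "1. Follow a user\n";
$relationshipService->follow('user_001', 'user_002');
echo "User 1 followed user 2\n";

echo "\n2. Publish content\n";
$result = $contentService->publishContent('user_002', 'text', [
    'text' => 'The weather is lovely today!',
    'images' => [],
]);
echo "Content published: {$result['content_id']}\n";

echo "\n3. Like content\n";
$contentService->likeContent($result['content_id'], 'user_001');
echo "User 1 liked the content\n";

// Give the worker time to distribute the event.
sleep(2);

echo "\n4. Fetch the feed\n";
$feed = $feedService->getFeed('user_001', 1, 10);
foreach ($feed as $item) {
    echo "- Content ID: {$item['content_id']}\n";
}

// Stop the worker and wait for it to exit.
posix_kill($pid, SIGTERM);
pcntl_wait($status);

$connection->close();

echo "\n=== Example complete ===\n";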

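Key technical points

1. Combining push and pull

Feed distribution combines push and pull. On publish, the service pushes (fans out) the item into each follower's Redis list; on read, followers pull their feed from Redis: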

php
foreach ($followers as $followerId) {
    $this->redis->lPush("feed:{$followerId}", $feedItem);
}
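
The loop above is the push half. For the pull half, one possible approach is to merge the pushed timeline with items fetched from followed authors when the feed is read. The sketch below assumes feed items have the same shape that `FeedDistributionService` writes (`content_id`, `author_id`, `timestamp`); `mergeFeed` is a hypothetical helper, not part of the services above.

```php
<?php

declare(strict_types=1);

/**
 * Merges items pushed to the reader's timeline with items pulled from
 * followed authors at read time: newest first, without duplicates.
 */
function mergeFeed(array $pushed, array $pulled, int $limit): array
{
    $byId = [];
    foreach (array_merge($pushed, $pulled) as $item) {
        $byId[$item['content_id']] = $item; // de-duplicate by content ID
    }

    $items = array_values($byId);
    usort($items, fn (array $a, array $b): int => $b['timestamp'] <=> $a['timestamp']);

    return array_slice($items, 0, $limit);
}

$pushed = [
    ['content_id' => 'cnt_a', 'author_id' => 'user_002', 'timestamp' => 100],
    ['content_id' => 'cnt_b', 'author_id' => 'user_003', 'timestamp' => 300],
];
$pulled = [
    ['content_id' => 'cnt_c', 'author_id' => 'user_004', 'timestamp' => 200],
    ['content_id' => 'cnt_b', 'author_id' => 'user_003', 'timestamp' => 300],
];

$feed = mergeFeed($pushed, $pulled, 10);

// Prints cnt_b, cnt_c, cnt_a
echo implode(', ', array_column($feed, 'content_id')), "\n";
```

Merging at read time keeps the write path cheap for authors with many followers, at the cost of some extra work per feed request.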

2. Relationship storage

Redis sets store each follow relationship in both directions:

php
$this->redis->sAdd("user:{$followerId}:following", $followingId);
$this->redis->sAdd("user:{$followingId}:followers", $followerId);

3. Real-time push

Notifications are delivered in real time over WebSocket:

php
$this->wsServer->sendToUser($userId, json_encode($notification));

4. Counter caching

Counters such as like counts are incremented in Redis:

php
$this->redis->incrBy("content:{$contentId}:like_count", 1);

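Performance optimization recommendations

| Optimization | Recommendation | Effect |
| --- | --- | --- |
| Feed sharding | Shard by user ID | Increases parallelism |
| Batched push | Merge notification pushes | Fewer network requests |
| Cache warming | Preload hot data | Lower latency |
| Asynchronous processing | Handle non-core flows asynchronously | Faster responses |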
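
As one way to picture feed sharding, the sketch below maps a user ID to a fixed shard so that each shard can be served by its own queue and worker. Keying on the author ID, the `social.feed.N` queue naming, and both function names are assumptions for illustration, not something the services above already do.

```php
<?php

declare(strict_types=1);

/** Maps a user ID to a stable shard number in the range 0..shardCount-1. */
function feedShard(string $userId, int $shardCount): int
{
    if ($shardCount < 1) {
        throw new InvalidArgumentException('shardCount must be at least 1');
    }

    // Mask the checksum so the modulo can never be negative.
    return (crc32($userId) & 0x7fffffff) % $shardCount;
}

/** Builds the (hypothetical) queue name for the shard that owns this user. */
function feedShardQueue(string $userId, int $shardCount): string
{
    return sprintf('social.feed.%d', feedShard($userId, $shardCount));
}

foreach (['user_001', 'user_002', 'user_003'] as $authorId) {
    echo $authorId, ' -> ', feedShardQueue($authorId, 4), "\n";
}
```

Because the mapping is deterministic, all events from one author land on the same shard and stay in order relative to each other. Changing the shard count remaps most users, so it would need to be planned rather than adjusted casually.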

Common problems and solutions

1. Feed latency

Solution: run more distribution service instances and process writes in batches.
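
Batch processing could look roughly like the sketch below, which splits the follower list into chunks and writes one chunk at a time. An in-memory array stands in for Redis so the example runs on its own; `fanOutInBatches` and the batch size are assumptions, not part of `FeedDistributionService`.

```php
<?php

declare(strict_types=1);

/**
 * Splits the follower list into fixed-size batches and hands each batch to
 * $writeBatch, so one round trip can carry many feed writes.
 * Returns the number of batches that were written.
 */
function fanOutInBatches(array $followerIds, int $batchSize, callable $writeBatch): int
{
    if ($batchSize < 1) {
        throw new InvalidArgumentException('batchSize must be at least 1');
    }

    $batches = 0;
    foreach (array_chunk($followerIds, $batchSize) as $batch) {
        $writeBatch($batch);
        $batches++;
    }

    return $batches;
}

// In-memory stand-in for Redis. With phpredis, the body of $writeBatch could
// instead queue the lPush and lTrim calls in a pipeline and send them together.
$feeds = [];
$feedItem = json_encode(['content_id' => 'cnt_1', 'author_id' => 'user_002', 'timestamp' => 1704110400]);

$writeBatch = function (array $batch) use (&$feeds, $feedItem): void {
    foreach ($batch as $followerId) {
        $key = "feed:{$followerId}";
        $feeds[$key] ??= [];
        array_unshift($feeds[$key], $feedItem);            // newest first, like lPush
        $feeds[$key] = array_slice($feeds[$key], 0, 1000); // cap, like lTrim 0 999
    }
};

$followers = ['user_001', 'user_003', 'user_004', 'user_005', 'user_006'];
$batchCount = fanOutInBatches($followers, 2, $writeBatch);

// Prints: Wrote 3 batches to 5 feeds
echo "Wrote {$batchCount} batches to " . count($feeds) . " feeds\n";
```

The right batch size depends on payload size and Redis latency, so it is worth measuring rather than guessing.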

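2. High-follower accounts (the "big V" problem)

Solution: limit the push scope and fall back to pull mode for the remaining followers.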
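
One way to limit the push scope is to fan out fully only below a follower threshold and, above it, push just to followers who are currently active while everyone else pulls on read. The sketch below shows that decision; the threshold, the notion of an "active" follower list, and the `planFanOut` name are all assumptions for illustration.

```php
<?php

declare(strict_types=1);

/**
 * Decides who receives a pushed feed item.
 * Small accounts push to every follower; large accounts push only to
 * followers that are currently active, and the rest pull on read.
 */
function planFanOut(array $followerIds, array $activeFollowerIds, int $pushThreshold): array
{
    if (count($followerIds) <= $pushThreshold) {
        return ['mode' => 'push', 'push_to' => array_values($followerIds)];
    }

    // Keep only followers that appear in the active list.
    $pushTo = array_values(array_intersect($followerIds, $activeFollowerIds));

    return ['mode' => 'hybrid', 'push_to' => $pushTo];
}

$followers = ['user_001', 'user_003', 'user_004', 'user_005', 'user_006'];
$active = ['user_003', 'user_005', 'user_999'];

$plan = planFanOut($followers, $active, 3);

// Prints: hybrid: user_003, user_005
echo $plan['mode'], ': ', implode(', ', $plan['push_to']), "\n";
```

In hybrid mode the read path has to fetch recent posts from high-follower authors directly, since inactive followers never received a pushed copy.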

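3. Message loss

Solution: persist messages and use acknowledgements, both from the broker to the publisher and from the consumer to the broker.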
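
On the publishing side, acknowledgements usually mean retrying until the broker confirms the message. The sketch below shows only that retry logic, with a simulated broker so it runs on its own. `publishWithConfirm` and its callable are hypothetical; with php-amqplib the callable could wrap `confirm_select()`, `basic_publish()`, and `wait_for_pending_acks()` on a channel.

```php
<?php

declare(strict_types=1);

/**
 * Retries a publish until the broker confirms it or the attempts run out.
 *
 * $publishAndWait is a hypothetical callable that publishes one persistent
 * message and returns true only if the broker acknowledged it.
 */
function publishWithConfirm(callable $publishAndWait, int $maxAttempts = 3): bool
{
    for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) {
        try {
            if ($publishAndWait($attempt) === true) {
                return true;
            }
        } catch (Throwable $e) {
            // Treat connection errors and timeouts like a missing confirm.
        }
    }

    // Not confirmed: the caller should keep the event somewhere durable and
    // retry later instead of silently dropping it.
    return false;
}

// Simulated broker that only confirms the third attempt.
$calls = 0;
$confirmed = publishWithConfirm(function (int $attempt) use (&$calls): bool {
    $calls++;
    return $attempt === 3;
});

var_dump($confirmed); // bool(true)
var_dump($calls);     // int(3)
```

Retries can deliver the same event more than once, so consumers should tolerate duplicates, for example by checking the `event_id` before applying an event.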
