Appearance
社交平台应用
概述
社交平台涉及用户关系、内容分发、即时通讯、动态推送等复杂场景。RabbitMQ 可以实现社交关系的解耦、内容的异步处理和消息的实时推送,支撑高并发社交业务。
业务背景与需求
场景描述
某社交平台业务模块:
| 模块 | 功能 | 消息场景 |
|---|---|---|
| 用户系统 | 注册、登录、资料管理 | 用户事件、通知 |
| 关系系统 | 关注、粉丝、好友 | 关系事件、粉丝通知 |
| 内容系统 | 发布、点赞、评论、分享 | 内容事件、Feed分发 |
| 消息系统 | 私信、群聊、系统通知 | 消息推送、已读回执 |
| 动态系统 | Feed流、推荐、热搜 | 动态推送、缓存更新 |
| 活动系统 | 活动、话题、挑战 | 活动通知、参与统计 |
技术挑战
社交平台挑战:
1. 高并发:千万级用户同时在线
2. 实时性:消息需要即时送达
3. 海量数据:内容、关系数据量大
4. 推荐算法:个性化内容推荐
架构设计
整体架构图
mermaid
graph TB
subgraph "客户端"
A[移动APP]
B[Web端]
C[小程序]
end
subgraph "接入层"
D[API网关]
E[WebSocket服务]
end
subgraph "业务服务"
F[用户服务]
G[关系服务]
H[内容服务]
I[消息服务]
end
subgraph "RabbitMQ"
J[社交事件总线<br/>social.exchange]
subgraph "事件队列"
K[用户事件队列]
L[关系事件队列]
M[内容事件队列]
N[消息推送队列]
end
end
subgraph "处理服务"
O[Feed分发服务]
P[通知服务]
Q[搜索索引服务]
end
subgraph "存储层"
R[MySQL]
S[Redis]
T[Elasticsearch]
end
A --> D
B --> D
C --> D
A --> E
D --> F
D --> G
D --> H
D --> I
F --> J
G --> J
H --> J
I --> J
J --> K
J --> L
J --> M
J --> N
K --> O
L --> O
M --> Q
N --> P
O --> S
P --> E
Q --> T
F --> R
G --> R
H --> R
Feed 分发流程
mermaid
sequenceDiagram
participant User as 用户
participant Content as 内容服务
participant MQ as RabbitMQ
participant Feed as Feed服务
participant Redis as Redis
participant Follower as 粉丝
User->>Content: 发布动态
Content->>Content: 保存内容
Content->>MQ: 发布内容事件
Content-->>User: 返回成功
MQ->>Feed: 投递事件
Feed->>Feed: 获取粉丝列表
loop 每个粉丝
Feed->>Redis: 写入粉丝Feed
end
Feed->>MQ: 发布分发完成
MQ->>Follower: 推送通知
Follower->>Redis: 拉取Feed
Redis-->>Follower: 返回内容
PHP 代码实现
社交事件定义
php
<?php
namespace App\Social;
/**
 * Base class for every event published on the social event bus.
 *
 * Concrete events supply a type name and a payload; this class adds the
 * shared envelope (id, type, timestamp, metadata) and serialises it.
 */
abstract class SocialEvent
{
    /** Identifier of the form "evt_<YmdHis>_<16 hex chars>". */
    public string $eventId;

    /** Type name of the event; holds the value returned by getEventType(). */
    public string $eventType;

    /** Unix timestamp (seconds) captured when the event object was built. */
    public int $timestamp;

    /** Free-form metadata attached to the envelope; starts empty. */
    public array $metadata;

    /**
     * Populates the envelope fields.
     *
     * getEventType() is invoked from here, before a subclass constructor has
     * assigned its own properties, so implementations should return a value
     * that does not depend on subclass state.
     */
    public function __construct()
    {
        $this->eventId = $this->generateEventId();
        // This typed property used to stay unassigned, so any read of
        // $event->eventType raised an "accessed before initialization" Error.
        $this->eventType = $this->getEventType();
        $this->timestamp = time();
        $this->metadata = [];
    }

    /**
     * Builds an id from the current date-time plus 8 random bytes in hex.
     */
    private function generateEventId(): string
    {
        return sprintf('evt_%s_%s', date('YmdHis'), bin2hex(random_bytes(8)));
    }

    /** Returns the type name of the event (also used as the routing key by publishers). */
    abstract public function getEventType(): string;

    /** Returns the event-specific data placed under the "payload" key. */
    abstract public function getPayload(): array;

    /**
     * Serialises the event into the envelope array that is sent over the bus.
     *
     * @return array{event_id: string, event_type: string, timestamp: int, metadata: array, payload: array}
     */
    public function toArray(): array
    {
        return [
            'event_id' => $this->eventId,
            'event_type' => $this->getEventType(),
            'timestamp' => $this->timestamp,
            'metadata' => $this->metadata,
            'payload' => $this->getPayload(),
        ];
    }
}
/**
 * Event describing that one user started following another.
 */
class UserFollowedEvent extends SocialEvent
{
    /** ID of the user who follows. */
    private string $followerId;

    /** ID of the user being followed. */
    private string $followingId;

    /**
     * @param string $followerId  ID of the user who follows
     * @param string $followingId ID of the user being followed
     */
    public function __construct(string $followerId, string $followingId)
    {
        parent::__construct();
        [$this->followerId, $this->followingId] = [$followerId, $followingId];
    }

    /** Type name carried in the envelope. */
    public function getEventType(): string
    {
        $type = 'user.followed';

        return $type;
    }

    /**
     * @return array{follower_id: string, following_id: string}
     */
    public function getPayload(): array
    {
        $payload = [];
        $payload['follower_id'] = $this->followerId;
        $payload['following_id'] = $this->followingId;

        return $payload;
    }
}
/**
 * Event describing a newly published piece of content.
 */
class ContentPublishedEvent extends SocialEvent
{
    /** ID of the published content. */
    private string $contentId;

    /** ID of the user who published it. */
    private string $authorId;

    /** Content type label supplied by the publisher. */
    private string $contentType;

    /** Content body as supplied by the publisher. */
    private array $content;

    /**
     * @param string $contentId   ID of the published content
     * @param string $authorId    ID of the publishing user
     * @param string $contentType content type label
     * @param array  $content     content body
     */
    public function __construct(string $contentId, string $authorId, string $contentType, array $content)
    {
        parent::__construct();
        [$this->contentId, $this->authorId] = [$contentId, $authorId];
        [$this->contentType, $this->content] = [$contentType, $content];
    }

    /** Type name carried in the envelope. */
    public function getEventType(): string
    {
        $type = 'content.published';

        return $type;
    }

    /**
     * @return array{content_id: string, author_id: string, content_type: string, content: array}
     */
    public function getPayload(): array
    {
        $payload = [];
        $payload['content_id'] = $this->contentId;
        $payload['author_id'] = $this->authorId;
        $payload['content_type'] = $this->contentType;
        $payload['content'] = $this->content;

        return $payload;
    }
}
/**
 * Event describing that a user liked a piece of content.
 */
class ContentLikedEvent extends SocialEvent
{
    /** ID of the liked content. */
    private string $contentId;

    /** ID of the user who liked it. */
    private string $userId;

    /** ID of the content's author. */
    private string $authorId;

    /**
     * @param string $contentId ID of the liked content
     * @param string $userId    ID of the user who liked it
     * @param string $authorId  ID of the content's author
     */
    public function __construct(string $contentId, string $userId, string $authorId)
    {
        parent::__construct();
        [$this->contentId, $this->userId, $this->authorId] = [$contentId, $userId, $authorId];
    }

    /** Type name carried in the envelope. */
    public function getEventType(): string
    {
        $type = 'content.liked';

        return $type;
    }

    /**
     * @return array{content_id: string, user_id: string, author_id: string}
     */
    public function getPayload(): array
    {
        $payload = [];
        $payload['content_id'] = $this->contentId;
        $payload['user_id'] = $this->userId;
        $payload['author_id'] = $this->authorId;

        return $payload;
    }
}
class MessageSentEvent extends SocialEvent
{
private string $messageId;
private string $fromUserId;
private string $toUserId;
private string $messageType;
private string $content;
/**
 * Stores the message details after initialising the shared envelope.
 *
 * @param string $messageId   ID of the message
 * @param string $fromUserId  ID of the sending user
 * @param string $toUserId    ID of the receiving user
 * @param string $messageType message type label
 * @param string $content     message text
 */
public function __construct(
    string $messageId,
    string $fromUserId,
    string $toUserId,
    string $messageType,
    string $content
) {
    parent::__construct();
    [$this->messageId, $this->messageType, $this->content] = [$messageId, $messageType, $content];
    [$this->fromUserId, $this->toUserId] = [$fromUserId, $toUserId];
}
/** Type name carried in the envelope. */
public function getEventType(): string
{
    $type = 'message.sent';

    return $type;
}
/**
 * @return array{message_id: string, from_user_id: string, to_user_id: string, message_type: string, content: string}
 */
public function getPayload(): array
{
    $payload = [];
    $payload['message_id'] = $this->messageId;
    $payload['from_user_id'] = $this->fromUserId;
    $payload['to_user_id'] = $this->toUserId;
    $payload['message_type'] = $this->messageType;
    $payload['content'] = $this->content;

    return $payload;
}
}
关系服务实现
php
<?php
namespace App\Social;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;
class RelationshipService
{
private AMQPStreamConnection $connection;
private $channel;
private $redis;
private RelationshipRepository $repository;
private string $exchangeName = 'social.exchange';
/**
 * Stores the collaborators, opens a channel on the given connection and
 * declares the exchange that relationship events are published to.
 *
 * @param AMQPStreamConnection   $connection broker connection
 * @param mixed                  $redis      Redis client used for sets and counters
 * @param RelationshipRepository $repository persistent store of follow relations
 */
public function __construct(
    AMQPStreamConnection $connection,
    $redis,
    RelationshipRepository $repository
) {
    $this->repository = $repository;
    $this->redis = $redis;
    $this->connection = $connection;
    $this->channel = $this->connection->channel();

    $this->setupExchange();
}
/**
 * Declares the topic exchange named by $exchangeName as durable,
 * non-passive and not auto-deleted.
 */
private function setupExchange(): void
{
    $passive = false;
    $durable = true;
    $autoDelete = false;

    $this->channel->exchange_declare($this->exchangeName, AMQPExchangeType::TOPIC, $passive, $durable, $autoDelete);
}
/**
 * Makes $followerId follow $followingId.
 *
 * Persists the relation through the repository, mirrors it into the Redis
 * sets and counters, then publishes a UserFollowedEvent.
 *
 * @param string $followerId  ID of the user who follows
 * @param string $followingId ID of the user to be followed
 *
 * @return array{success: bool, error?: string}
 */
public function follow(string $followerId, string $followingId): array
{
    if ($followerId === $followingId) {
        return ['success' => false, 'error' => 'Cannot follow yourself'];
    }

    if ($this->repository->isFollowing($followerId, $followingId)) {
        return ['success' => false, 'error' => 'Already following'];
    }

    $this->repository->addFollow($followerId, $followingId);

    // sAdd reports how many members were actually added. Only bump a counter
    // when the set really gained a member, so a set that already contains the
    // ID (cache ahead of the repository, concurrent request) cannot make the
    // counter drift away from the set's cardinality.
    if ($this->redis->sAdd("user:{$followerId}:following", $followingId)) {
        $this->redis->incrBy("user:{$followerId}:following_count", 1);
    }
    if ($this->redis->sAdd("user:{$followingId}:followers", $followerId)) {
        $this->redis->incrBy("user:{$followingId}:follower_count", 1);
    }

    $this->publishEvent(new UserFollowedEvent($followerId, $followingId));

    return ['success' => true];
}
/**
 * Removes the follow relation from $followerId to $followingId.
 *
 * Deletes the relation through the repository and mirrors the removal into
 * the Redis sets and counters. No event is published for an unfollow.
 *
 * @param string $followerId  ID of the user who stops following
 * @param string $followingId ID of the user being unfollowed
 *
 * @return array{success: bool, error?: string}
 */
public function unfollow(string $followerId, string $followingId): array
{
    if (!$this->repository->isFollowing($followerId, $followingId)) {
        return ['success' => false, 'error' => 'Not following'];
    }

    $this->repository->removeFollow($followerId, $followingId);

    // sRem reports how many members were actually removed. Decrement only when
    // the set really lost a member, otherwise a missing cache entry would push
    // the counter below the set's cardinality (and eventually below zero).
    if ($this->redis->sRem("user:{$followerId}:following", $followingId)) {
        $this->redis->decrBy("user:{$followerId}:following_count", 1);
    }
    if ($this->redis->sRem("user:{$followingId}:followers", $followerId)) {
        $this->redis->decrBy("user:{$followingId}:follower_count", 1);
    }

    return ['success' => true];
}
/**
 * Returns one page of follower IDs read from the user's Redis follower set.
 *
 * The whole set is fetched and sliced in PHP. A page below 1 is treated as
 * page 1 and a page size below 1 yields an empty page; previously such values
 * became negative array_slice() arguments and returned entries counted from
 * the end of the set.
 *
 * @param string $userId   ID of the user whose followers are listed
 * @param int    $page     1-based page number
 * @param int    $pageSize number of IDs per page
 *
 * @return array follower IDs of the requested page
 */
public function getFollowers(string $userId, int $page = 1, int $pageSize = 20): array
{
    if ($pageSize < 1) {
        return [];
    }

    $offset = (max(1, $page) - 1) * $pageSize;
    // The client may hand back false instead of an array on failure.
    $followers = $this->redis->sMembers("user:{$userId}:followers") ?: [];

    return array_slice($followers, $offset, $pageSize);
}
/**
 * Returns one page of IDs the user follows, read from the user's Redis
 * following set.
 *
 * The whole set is fetched and sliced in PHP. A page below 1 is treated as
 * page 1 and a page size below 1 yields an empty page; previously such values
 * became negative array_slice() arguments and returned entries counted from
 * the end of the set.
 *
 * @param string $userId   ID of the user whose followings are listed
 * @param int    $page     1-based page number
 * @param int    $pageSize number of IDs per page
 *
 * @return array followed user IDs of the requested page
 */
public function getFollowing(string $userId, int $page = 1, int $pageSize = 20): array
{
    if ($pageSize < 1) {
        return [];
    }

    $offset = (max(1, $page) - 1) * $pageSize;
    // The client may hand back false instead of an array on failure.
    $following = $this->redis->sMembers("user:{$userId}:following") ?: [];

    return array_slice($following, $offset, $pageSize);
}
/**
 * Tells whether $followingId is a member of $followerId's Redis following set.
 *
 * Only Redis is consulted here; the repository is not queried.
 */
public function isFollowing(string $followerId, string $followingId): bool
{
    $setKey = "user:{$followerId}:following";
    $isMember = $this->redis->sIsMember($setKey, $followingId);

    return (bool) $isMember;
}
/**
 * Publishes the event to the exchange as a persistent JSON message, using the
 * event type as routing key and the event ID as AMQP message ID.
 *
 * @throws \JsonException when the event cannot be encoded; previously the
 *                        failed encode result (false) was published as the body
 */
private function publishEvent(SocialEvent $event): void
{
    $body = json_encode($event->toArray(), JSON_THROW_ON_ERROR);

    $message = new AMQPMessage(
        $body,
        [
            'content_type' => 'application/json',
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            'message_id' => $event->eventId,
        ]
    );

    $this->channel->basic_publish($message, $this->exchangeName, $event->getEventType());
}
}
内容服务实现
php
<?php
namespace App\Social;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class ContentService
{
private AMQPStreamConnection $connection;
private $channel;
private $redis;
private ContentRepository $repository;
private string $exchangeName = 'social.exchange';
/**
 * Stores the collaborators, opens a channel on the given connection and
 * declares the exchange that content events are published to.
 *
 * The exchange declaration was missing here although every other service in
 * this example declares it; publishing to an exchange that does not exist
 * makes the broker close the channel, so this service only worked when
 * another service happened to be constructed first. The arguments match the
 * declarations used by the other services (topic, durable, not auto-deleted).
 *
 * @param AMQPStreamConnection $connection broker connection
 * @param mixed                $redis      Redis client used for lists, sets and counters
 * @param ContentRepository    $repository persistent store of content
 */
public function __construct(
    AMQPStreamConnection $connection,
    $redis,
    ContentRepository $repository
) {
    $this->connection = $connection;
    $this->channel = $connection->channel();
    $this->redis = $redis;
    $this->repository = $repository;

    $this->channel->exchange_declare($this->exchangeName, 'topic', false, true, false);
}
/**
 * Creates a content record, prepends its ID to the author's Redis content
 * list and publishes a ContentPublishedEvent.
 *
 * @param string $authorId    ID of the publishing user
 * @param string $contentType content type label
 * @param array  $content     content body
 *
 * @return array{content_id: string}
 */
public function publishContent(string $authorId, string $contentType, array $content): array
{
    $newId = $this->generateContentId();

    // All interaction counters start at zero for a fresh record.
    $record = [
        'content_id' => $newId,
        'author_id' => $authorId,
        'content_type' => $contentType,
        'content' => $content,
        'like_count' => 0,
        'comment_count' => 0,
        'share_count' => 0,
        'created_at' => date('Y-m-d H:i:s'),
    ];
    $this->repository->create($record);

    $authorListKey = "user:{$authorId}:contents";
    $this->redis->lPush($authorListKey, $newId);

    $this->publishEvent(new ContentPublishedEvent($newId, $authorId, $contentType, $content));

    return ['content_id' => $newId];
}
/**
 * Records that $userId likes $contentId and publishes a ContentLikedEvent.
 *
 * The like set is updated with a single sAdd call and its return value
 * decides whether the like is new. The earlier sIsMember-then-sAdd sequence
 * let two concurrent requests both pass the check and count the like twice.
 *
 * @param string $contentId ID of the content being liked
 * @param string $userId    ID of the user who likes it
 *
 * @return array{success: bool, error?: string}
 */
public function likeContent(string $contentId, string $userId): array
{
    $content = $this->repository->findById($contentId);
    if (!$content) {
        return ['success' => false, 'error' => 'Content not found'];
    }

    $likeKey = "content:{$contentId}:likes";
    // sAdd reports the number of members actually added: 0 means the user
    // was already in the set.
    if (!$this->redis->sAdd($likeKey, $userId)) {
        return ['success' => false, 'error' => 'Already liked'];
    }

    $this->redis->incrBy("content:{$contentId}:like_count", 1);
    $this->repository->incrementLikeCount($contentId);

    $this->publishEvent(new ContentLikedEvent($contentId, $userId, $content['author_id']));

    return ['success' => true];
}
/**
 * Withdraws $userId's like from $contentId.
 *
 * The like set is updated with a single sRem call and its return value
 * decides whether there was a like to remove. The earlier sIsMember-then-sRem
 * sequence let two concurrent requests both pass the check and decrement the
 * counters twice. No event is published for an unlike.
 *
 * @param string $contentId ID of the content being unliked
 * @param string $userId    ID of the user who withdraws the like
 *
 * @return array{success: bool, error?: string}
 */
public function unlikeContent(string $contentId, string $userId): array
{
    $likeKey = "content:{$contentId}:likes";
    // sRem reports the number of members actually removed: 0 means the user
    // was not in the set.
    if (!$this->redis->sRem($likeKey, $userId)) {
        return ['success' => false, 'error' => 'Not liked'];
    }

    $this->redis->decrBy("content:{$contentId}:like_count", 1);
    $this->repository->decrementLikeCount($contentId);

    return ['success' => true];
}
/**
 * Publishes the event to the exchange as a persistent JSON message, using the
 * event type as routing key.
 *
 * The event ID is now sent as the AMQP message ID, matching what
 * RelationshipService does, so consumers can identify redeliveries of
 * content events the same way.
 *
 * @throws \JsonException when the event cannot be encoded; previously the
 *                        failed encode result (false) was published as the body
 */
private function publishEvent(SocialEvent $event): void
{
    $body = json_encode($event->toArray(), JSON_THROW_ON_ERROR);

    $message = new AMQPMessage(
        $body,
        [
            'content_type' => 'application/json',
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            'message_id' => $event->eventId,
        ]
    );

    $this->channel->basic_publish($message, $this->exchangeName, $event->getEventType());
}
/**
 * Builds a content ID of the form "cnt_<YmdHis>_<8 hex chars>" from the
 * current date-time and 4 random bytes.
 */
private function generateContentId(): string
{
    $stamp = date('YmdHis');
    $suffix = bin2hex(random_bytes(4));

    return 'cnt_' . $stamp . '_' . $suffix;
}
}
Feed 分发服务
php
<?php
namespace App\Social;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class FeedDistributionService
{
private AMQPStreamConnection $connection;
private $channel;
private $redis;
private bool $running = true;
/**
 * Stores the collaborators, opens a channel on the given connection and
 * declares the exchange, queue and binding this service consumes from.
 *
 * @param AMQPStreamConnection $connection broker connection
 * @param mixed                $redis      Redis client holding follower sets and feed lists
 */
public function __construct(AMQPStreamConnection $connection, $redis)
{
    $this->redis = $redis;
    $this->connection = $connection;
    $this->channel = $this->connection->channel();

    $this->setupQueue();
}
/**
 * Declares the durable topic exchange and the durable "social.feed" queue,
 * and binds the queue to "content.published" events.
 */
private function setupQueue(): void
{
    $exchange = 'social.exchange';
    $queue = 'social.feed';

    // Arguments after the type: passive=false, durable=true, auto_delete=false.
    $this->channel->exchange_declare($exchange, 'topic', false, true, false);
    // Arguments after the name: passive=false, durable=true, exclusive=false, auto_delete=false.
    $this->channel->queue_declare($queue, false, true, false, false);
    $this->channel->queue_bind($queue, $exchange, 'content.published');
}
/**
 * Consumes "social.feed" with manual acknowledgements and a prefetch of 10
 * until stop() is called or the consumer is cancelled.
 *
 * The loop now blocks inside wait() for up to one second instead of polling
 * without blocking and then sleeping 100 ms. The fixed sleep ran after every
 * wait() call, including calls that had just delivered a message, which
 * limited the consumer to roughly ten messages per second no matter how many
 * were queued. The one-second timeout bounds how long stop() takes to be
 * noticed.
 */
public function consume(): void
{
    $this->channel->basic_qos(null, 10, null);
    $this->channel->basic_consume('social.feed', '', false, false, false, false, [$this, 'handleEvent']);

    while ($this->running && count($this->channel->callbacks)) {
        try {
            $this->channel->wait(null, false, 1);
        } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) {
            // Nothing arrived within the timeout; loop to re-check $running.
        }
    }
}
/**
 * Fans a "content.published" event out to the feed lists of the author's
 * followers and of the author, then acknowledges the message.
 *
 * Each feed list is capped at its 1000 most recent entries. A message whose
 * body is not a JSON envelope carrying payload.author_id and
 * payload.content_id is rejected without requeue; previously such a message
 * was processed with null values and wrote a bogus entry to Redis.
 *
 * @param AMQPMessage $message delivery received from the "social.feed" queue
 */
public function handleEvent(AMQPMessage $message): void
{
    $event = json_decode($message->body, true);
    $payload = is_array($event) ? ($event['payload'] ?? null) : null;

    if (!is_array($payload) || !isset($payload['author_id'], $payload['content_id'])) {
        // Requeueing would only redeliver the same unusable message forever.
        $message->reject(false);
        return;
    }

    $authorId = $payload['author_id'];
    $contentId = $payload['content_id'];

    $feedItem = json_encode([
        'content_id' => $contentId,
        'author_id' => $authorId,
        'timestamp' => $event['timestamp'] ?? time(),
    ]);

    // The client may hand back false instead of an array on failure.
    $followers = $this->redis->sMembers("user:{$authorId}:followers") ?: [];

    // The author's own feed receives the item too and is trimmed like every
    // other feed; it used to be pushed without a trim and could grow unbounded.
    $recipients = $followers;
    $recipients[] = $authorId;

    foreach ($recipients as $recipientId) {
        $this->redis->lPush("feed:{$recipientId}", $feedItem);
        $this->redis->lTrim("feed:{$recipientId}", 0, 999);
    }

    $message->ack();
}
/**
 * Returns one page of decoded feed entries from the user's Redis feed list.
 *
 * A page below 1 is treated as page 1 and a page size below 1 yields an empty
 * page; previously such values produced negative lRange indexes, which Redis
 * interprets as positions counted from the end of the list.
 *
 * @param string $userId   ID of the user whose feed is read
 * @param int    $page     1-based page number
 * @param int    $pageSize number of entries per page
 *
 * @return array decoded feed entries, in list order
 */
public function getFeed(string $userId, int $page = 1, int $pageSize = 20): array
{
    if ($pageSize < 1) {
        return [];
    }

    $offset = (max(1, $page) - 1) * $pageSize;
    // The client may hand back false instead of an array on failure.
    $feedItems = $this->redis->lRange("feed:{$userId}", $offset, $offset + $pageSize - 1) ?: [];

    $feed = [];
    foreach ($feedItems as $item) {
        $feed[] = json_decode($item, true);
    }

    return $feed;
}
/**
 * Clears the running flag so the loop in consume() ends at its next check.
 */
public function stop(): void
{
    $keepRunning = false;
    $this->running = $keepRunning;
}
}
消息推送服务
php
<?php
namespace App\Social;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class MessagePushService
{
private AMQPStreamConnection $connection;
private $channel;
private $redis;
private WebSocketServer $wsServer;
private bool $running = true;
/**
 * Stores the collaborators, opens a channel on the given connection and
 * declares the exchange, queue and bindings this service consumes from.
 *
 * @param AMQPStreamConnection $connection broker connection
 * @param mixed                $redis      Redis client holding notification lists
 * @param WebSocketServer      $wsServer   server used to push to connected users
 */
public function __construct(
    AMQPStreamConnection $connection,
    $redis,
    WebSocketServer $wsServer
) {
    $this->wsServer = $wsServer;
    $this->redis = $redis;
    $this->connection = $connection;
    $this->channel = $this->connection->channel();

    $this->setupQueue();
}
/**
 * Declares the durable topic exchange and the durable "social.notification"
 * queue, and binds the queue to the three event types that notify users.
 */
private function setupQueue(): void
{
    $exchange = 'social.exchange';
    $queue = 'social.notification';

    // Arguments after the type: passive=false, durable=true, auto_delete=false.
    $this->channel->exchange_declare($exchange, 'topic', false, true, false);
    // Arguments after the name: passive=false, durable=true, exclusive=false, auto_delete=false.
    $this->channel->queue_declare($queue, false, true, false, false);

    foreach (['user.followed', 'content.liked', 'message.sent'] as $routingKey) {
        $this->channel->queue_bind($queue, $exchange, $routingKey);
    }
}
/**
 * Consumes "social.notification" with manual acknowledgements and a prefetch
 * of 50 until stop() is called or the consumer is cancelled.
 *
 * The loop now blocks inside wait() for up to one second instead of polling
 * without blocking and then sleeping 10 ms. The fixed sleep ran after every
 * wait() call, including calls that had just delivered a message, which
 * limited the consumer to roughly a hundred messages per second and kept the
 * process busy while the queue was empty. The one-second timeout bounds how
 * long stop() takes to be noticed.
 */
public function consume(): void
{
    $this->channel->basic_qos(null, 50, null);
    $this->channel->basic_consume('social.notification', '', false, false, false, false, [$this, 'handleEvent']);

    while ($this->running && count($this->channel->callbacks)) {
        try {
            $this->channel->wait(null, false, 1);
        } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) {
            // Nothing arrived within the timeout; loop to re-check $running.
        }
    }
}
/**
 * Dispatches a delivered event to the matching notification handler and
 * acknowledges it.
 *
 * Event types without a handler are acknowledged without further action. A
 * message whose body is not a JSON envelope carrying event_type and an array
 * payload is rejected without requeue; previously such a message was
 * dereferenced as if it were valid.
 *
 * @param AMQPMessage $message delivery received from the "social.notification" queue
 */
public function handleEvent(AMQPMessage $message): void
{
    $event = json_decode($message->body, true);

    if (!is_array($event) || !isset($event['event_type']) || !is_array($event['payload'] ?? null)) {
        // Requeueing would only redeliver the same unusable message forever.
        $message->reject(false);
        return;
    }

    $payload = $event['payload'];

    switch ($event['event_type']) {
        case 'user.followed':
            $this->handleFollowNotification($payload);
            break;
        case 'content.liked':
            $this->handleLikeNotification($payload);
            break;
        case 'message.sent':
            $this->handleMessageNotification($payload);
            break;
    }

    $message->ack();
}
/**
 * Notifies the followed user that they have a new follower.
 *
 * @param array $payload "user.followed" payload; reads follower_id and following_id
 */
private function handleFollowNotification(array $payload): void
{
    $recipientId = $payload['following_id'];

    $notification = ['type' => 'new_follower'];
    $notification['follower_id'] = $payload['follower_id'];
    $notification['timestamp'] = time();

    $this->pushNotification($recipientId, $notification);
}
/**
 * Notifies the content's author that their content was liked.
 *
 * @param array $payload "content.liked" payload; reads content_id, user_id and author_id
 */
private function handleLikeNotification(array $payload): void
{
    $recipientId = $payload['author_id'];

    $notification = ['type' => 'content_liked'];
    $notification['content_id'] = $payload['content_id'];
    $notification['user_id'] = $payload['user_id'];
    $notification['timestamp'] = time();

    $this->pushNotification($recipientId, $notification);
}
/**
 * Notifies the recipient of a new message, including a preview limited to
 * the first 50 characters of the text.
 *
 * @param array $payload "message.sent" payload; reads message_id, from_user_id, to_user_id and content
 */
private function handleMessageNotification(array $payload): void
{
    $recipientId = $payload['to_user_id'];
    // mb_substr counts characters rather than bytes, so multi-byte text is not cut mid-character.
    $preview = mb_substr($payload['content'], 0, 50);

    $notification = ['type' => 'new_message'];
    $notification['message_id'] = $payload['message_id'];
    $notification['from_user_id'] = $payload['from_user_id'];
    $notification['content'] = $preview;
    $notification['timestamp'] = time();

    $this->pushNotification($recipientId, $notification);
}
/**
 * Stores the notification at the head of the user's Redis notification list
 * (kept to the 100 most recent entries) and sends it over the WebSocket server.
 *
 * @param string $userId       ID of the user to notify
 * @param array  $notification notification data
 */
private function pushNotification(string $userId, array $notification): void
{
    $listKey = "notification:{$userId}";
    $this->redis->lPush($listKey, json_encode($notification));
    $this->redis->lTrim($listKey, 0, 99);

    $frame = json_encode([
        'type' => 'notification',
        'data' => $notification,
    ]);
    $this->wsServer->sendToUser($userId, $frame);
}
/**
 * Clears the running flag so the loop in consume() ends at its next check.
 */
public function stop(): void
{
    $keepRunning = false;
    $this->running = $keepRunning;
}
}
完整使用示例
php
<?php
require_once 'vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use App\Social\{
RelationshipService,
ContentService,
FeedDistributionService,
MessagePushService
};
// End-to-end demo: follow a user, publish and like content, let a forked
// worker distribute the feed, then read the follower's feed back.
//
// Changes compared with the earlier version of this script:
//  - The worker is forked BEFORE any connection is opened and opens its own
//    AMQP/Redis connections, so parent and child never share a socket.
//  - The parent declares the feed queue and binding (by constructing
//    FeedDistributionService) BEFORE publishing; previously the queue was
//    created after the events were sent, so the "content.published" message
//    had nowhere to be routed and the feed stayed empty.
//  - A failed fork (-1) is handled; it used to reach posix_kill(-1, SIGTERM),
//    which signals every process the user is allowed to signal.
//  - The repository classes are referenced by fully qualified name because
//    they are not covered by the `use` statement above.
//  - The worker is always terminated and reaped, even if the demo throws.

echo "=== 社交平台示例 ===\n\n";

echo "启动Feed分发服务...\n";
$pid = pcntl_fork();
if ($pid === -1) {
    fwrite(STDERR, "无法创建Feed分发子进程\n");
    exit(1);
}

if ($pid === 0) {
    // Child: dedicated connections, then consume until the parent sends SIGTERM.
    $workerConnection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $workerRedis = new Redis();
    $workerRedis->connect('localhost', 6379);

    $worker = new FeedDistributionService($workerConnection, $workerRedis);
    $worker->consume();
    exit(0);
}

try {
    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $redis = new Redis();
    $redis->connect('localhost', 6379);

    // Constructing the service declares the queue and its binding, so events
    // published below are queued even if the worker has not started yet.
    $feedService = new FeedDistributionService($connection, $redis);

    $relationshipService = new RelationshipService($connection, $redis, new \App\Social\RelationshipRepository());
    $contentService = new ContentService($connection, $redis, new \App\Social\ContentRepository());

    echo "1. 用户关注\n";
    $relationshipService->follow('user_001', 'user_002');
    echo "用户1关注了用户2\n";

    echo "\n2. 发布内容\n";
    $result = $contentService->publishContent('user_002', 'text', [
        'text' => '今天天气真好!',
        'images' => [],
    ]);
    echo "内容已发布: {$result['content_id']}\n";

    echo "\n3. 点赞内容\n";
    $contentService->likeContent($result['content_id'], 'user_001');
    echo "用户1点赞了内容\n";

    // Give the worker a moment to distribute the published content.
    sleep(2);

    echo "\n4. 获取Feed\n";
    $feed = $feedService->getFeed('user_001', 1, 10);
    foreach ($feed as $item) {
        echo "- 内容ID: {$item['content_id']}\n";
    }

    $connection->close();
} finally {
    posix_kill($pid, SIGTERM);
    pcntl_waitpid($pid, $status);
}
echo "\n=== 示例完成 ===\n";
关键技术点解析
1. 推拉结合
Feed 分发采用推拉结合模式:内容发布后,分发服务把动态写入每个粉丝的 Feed 列表(推);粉丝收到通知后,再从 Redis 拉取自己的 Feed(拉)。下面是“推”的部分:
php
foreach ($followers as $followerId) {
$this->redis->lPush("feed:{$followerId}", $feedItem);
}
2. 关系存储
使用 Redis 集合存储关系:
php
$this->redis->sAdd("user:{$followerId}:following", $followingId);
$this->redis->sAdd("user:{$followingId}:followers", $followerId);
3. 实时推送
通过 WebSocket 实现实时通知:
php
$this->wsServer->sendToUser($userId, json_encode($notification));
4. 计数缓存
php
$this->redis->incrBy("content:{$contentId}:like_count", 1);
性能优化建议
| 优化项 | 建议 | 说明 |
|---|---|---|
| Feed分片 | 按用户ID分片 | 提高并行度 |
| 批量推送 | 合并通知推送 | 减少网络请求 |
| 缓存预热 | 预加载热点数据 | 减少延迟 |
| 异步处理 | 非核心流程异步 | 提升响应速度 |
常见问题与解决方案
1. Feed延迟
解决方案: 增加分发服务实例 + 批量处理
2. 大V问题
解决方案: 限制推送范围 + 拉取模式
3. 消息丢失
解决方案: 消息持久化 + 确认机制
