ThinkPHP与Workerman结合,通过 websocket 实现即时通讯消息推送
整合方式
使用 Workerman, GatewayWorker时开发者最关心的是如何与现有mvc框架(ThinkPHP Yii laravel等)整合,以下是官方推荐的整合方式。见示意图:

总体原则
- 现有mvc框架项目与GatewayWorker独立部署互不干扰
- 所有的业务逻辑都由http服务post/get到mvc框架中完成
- 当mvc框架需要向客户端主动推送数据时才在mvc框架中调用Gateway的API GatewayClient完成推送。
在thinkphp中使用
1.安装think-worker扩展
composer require topthink/think-worker
2.新建自定义events处理类
php
<?php
namespace app\common\gatewayWorker;
use Exception;
use GatewayWorker\Lib\Gateway;
use think\worker\Application;
use Workerman\Worker;
class Events
{
/**
* onWorkerStart 事件回调
* 当businessWorker进程启动时触发。每个进程生命周期内都只会触发一次
* @access public
* @param Worker $businessWorker
* @return void
*/
public static function onWorkerStart(Worker $businessWorker): void
{
$app = new Application;
$app->initialize();
}
/**
* onConnect 事件回调
* 当客户端连接上gateway进程时(TCP三次握手完毕时)触发
* @param string $client_id
* @return void
*/
public static function onConnect(string $client_id): void
{
Gateway::sendToCurrentClient(json_encode(['type'=>'onConnect','client_id'=>$client_id]));
}
/**
* onWebSocketConnect 事件回调
* 当客户端连接上gateway完成websocket握手时触发
* @param string $client_id 断开连接的客户端client_id
* @param mixed $data
* @return void
*/
public static function onWebSocketConnect(string $client_id, $data): void
{
}
/**
* onMessage 事件回调
* 当客户端发来数据(Gateway进程收到数据)后触发
* @access public
* @param string $client_id
* @param mixed $data
* @return void
* @throws Exception
*/
public static function onMessage(string $client_id, mixed $data): void
{
}
/**
* onClose 事件回调 当用户断开连接时触发的方法
* @param string $client_id 断开连接的客户端client_id
* @return void
* @throws Exception
*/
public static function onClose(string $client_id): void
{
}
/**
* onWorkerStop 事件回调
* 当businessWorker进程退出时触发。每个进程生命周期内都只会触发一次。
* @param Worker $businessWorker
* @return void
*/
public static function onWorkerStop(Worker $businessWorker)
{
}
}3.修改配置文件 gateway_worker.php 的 eventHandler 配置项,指定自定义的events处理类
php
'eventHandler' => \app\common\gatewayWorker\Events::class,4.启动 Gateway
php
php think worker:gateway5.客户端逻辑代码
javascript
// 这里的ws链接地址,在tp配置文件 gateway_worker.php 中
ws = new WebSocket("ws://127.0.0.1:2348");
// 服务端主动推送消息时会触发这里的onmessage
ws.onmessage = function (e) {
// json数据转换成js对象
let data = JSON.parse(e.data);
// 获取类型
let type = data.type || '';
switch (type) {
// Events.php中返回的onConnect类型的消息,将client_id发给后台进行uid绑定
case 'onConnect':
// 利用jquery发起ajax请求,将client_id发给后端进行uid绑定
$.post('/index/bind', {client_id: data.client_id}, function (data) {
}, 'json');
break;
// 当mvc框架调用GatewayClient发消息时直接alert出来
default :
alert(e.data);
}
};6.clientId与用户ID的绑定
假设前提:用户已经登录成功可以拿到
userID,然后通过前端html页面使用ws协议连接后端ws服务,拿到client_id后,使用http请求,发送clientId至后端接口,进行绑定
!>📢 注意:以下代码只是为了api功能演示 ,正常的业务逻辑应该是由前端通过API接口获取一个绑定口令,再通过ws发送口令至后端完成绑定,防止前端发送错误的clientId至后端进行绑定
php
public function bind()
{
// 前端通过链接ws获得的client_id
$client_id = input('client_id');
// 假设用户ID为1
$userId = 1;
// 假设用户加入了群组:Group:123
$userGroup = 'group:123';
// client_id 与 已登录用户的 userid 绑定
\GatewayWorker\Lib\Gateway::bindUid($client_id, $userId);
// 加入某个群组(可调用多次加入多个群组)
\GatewayWorker\Lib\Gateway::joinGroup($client_id, $userGroup);
}7.进行消息推送
在
http服务器进行ws消息推送,前提是用户ID与clienId绑定成功
php
public function push()
{
// 假设用户ID为1
$userId = 1;
// 单个群组:Group:123
$userGroup = 'group:123';
// 多个群组
$userGroups = ['group:123','group:456'];
// 发送个人消息
\GatewayWorker\Lib\Gateway::sendToUid($userId, json_encode([
'type'=>'sendToUid',
'time'=>time(),
]));
// 向单个群组的网站页面发送数据
\GatewayWorker\Lib\Gateway::sendToGroup($userGroup, json_encode([
'type'=>'sendToGroup',
'time'=>time(),
]));
// 向多个群组的网站页面发送数据
\GatewayWorker\Lib\Gateway::sendToGroup($userGroups, json_encode([
'type'=>'sendToGroup',
'time'=>time(),
]));
// 向所有用户发送消息
\GatewayWorker\Lib\Gateway::sendToAll(json_encode([
'type'=>'sendToAll',
'time'=>time(),
]));
}8.断线重连
- 在 Events 类中,监听
onClose断开连接回调,当用户断开链接时,清理userId与clientId之间的关系以及用户断连的其他业务逻辑处理 - 客户端监听断开事件,重新发起http请求至绑定接口进行重链
