Socket框架
框架提供PSR-4规范的类加载器,同样swoole、workerman也支持同样的加载方式,所以框架和swoole、workerman配合恰到好处,与此同时,框架还解决了经典的MYSQL 8小时无操作导致的MySQL server has gone away问题,在onMessage方法下调用 kali::run() 方法即可实现MVC调度模式,用MVC模式去写Websocket逻辑,爽到飞起。
Workerman 原生方式 Websocket
// 注意:请一定一定不要在外围操作db和cache
use Workerman\Worker;
use kaliphp\kali;
require_once __DIR__ . '/../../../vendor/autoload.php';
// app path
define('APPPATH', __DIR__ . '/../../../www/app');
// websocket服务端
$worker = new Worker("websocket://0.0.0.0:9527");
// 启动4个进程对外提供服务
$worker->count = 4;
$worker->onWorkerStart = function($worker)
{
// 框架注册
kali::registry();
};
$worker->onConnect = function($connection) use ($worker)
{
//Worker::log("workerID:{$worker->id} connectionID:{$connection->id} connected");
$connection->onWebSocketConnect = function($connection, $http_header) use ($worker)
{
/**
* 客户端websocket握手时的回调onWebSocketConnect
* 在onWebSocketConnect回调中获得nginx通过http头中的X_REAL_IP值
*/
$connection->realIP = $_SERVER['HTTP_X_REAL_IP'] ?? $connection->getRemoteIp();
};
};
// 当收到客户端发来的数据后返回hello $data给客户端
$worker->onMessage = function($connection, $data) use ($worker)
{
// 接收到客户端心跳包
if ($data == '~H#C~')
{
// 回复一个心跳包
$connection->send('~H#S~');
return;
}
$data = @json_decode($data, true);
// 判断是否存在控制器和方法
if (!$data || empty($data['ct']) || empty($data['ac']))
{
return;
}
// 运行MVC框架
kali::run($data);
};
$worker->onClose = function($connection) use ($worker)
{
Worker::log("connection closed from ip " . $connection->realIP);
};
// 运行worker
Worker::runAll();
Workerman Gateway方式 Websocket
use Workerman\Worker;
use GatewayWorker\Lib\Gateway;
use kaliphp\kali;
use kaliphp\cache;
// app path
define('APPPATH', __DIR__.'/../../../www/app');
/**
* 主逻辑
* 主要是处理 onWorkerStart、onWebSocketConnect、onMessage、onClose 四个方法
* onConnect 如果不需要可以不用实现并删除
*/
class Events
{
public static function onWorkerStart($businessWorker)
{
// 注册框架
kali::registry();
}
public static function onWebSocketConnect($client_id, $data)
{
//echo "client:{$_SERVER['REMOTE_ADDR']}:{$_SERVER['REMOTE_PORT']} gateway:{$_SERVER['GATEWAY_ADDR']}:{$_SERVER['GATEWAY_PORT']} client_id:$client_id onWebSocketConnect:''\n";
// 获取nginx传递过来的真实IP
$_SESSION['ip'] = $data['server']['HTTP_X_REAL_IP'] ?? $_SERVER['REMOTE_ADDR'];
$_GET = array_merge($data['get'], [
'ip' => $_SESSION['ip'],
'client_id' => $client_id,
]);
$_GET['ct'] = $_GET['ct'] ?? 'index';
$_GET['ac'] = $_GET['ac'] ?? 'online';
// 给 onClose 事件使用
$_SESSION['ct'] = $_GET['ct'];
kali::run($_GET);
}
/**
* 当客户端发来消息时触发
* @param int $client_id 连接id
* @param mixed $message 具体消息
*/
public static function onMessage($client_id, $message)
{
// 心跳包
if ( $message == '~H#C~' )
{
Gateway::sendToCurrentClient('~H#S~');
return;
}
// 客户端传递的不是json数据
$_GET = @json_decode($message, true);
if( !$_GET || !is_array($_GET) || !isset($_GET['ct']) || !isset($_GET['ac']) )
{
return ;
}
// 存在msgid(客户端自己生成的随机数),说明客户端需要回执
if (!empty($_GET['msgid']))
{
$cache_key = $client_id."-".$_GET['msgid'];
// 消息重复收到
if (cache::has($cache_key))
{
return;
}
// 消息缓存5s
cache::set($cache_key, 1, 5);
// 发送回执
Gateway::sendToCurrentClient(json_encode([
'type' => 'pongMsg',
'msgid' => $_GET['msgid']
]));
}
$_GET['ip'] = $_SESSION['ip'];
$_GET['client_id'] = $client_id;
// 运行MVC
kali::run($_GET);
}
/**
* 当用户断开连接时触发
* @param int $client_id 连接id
*/
public static function onClose($client_id)
{
//echo "client:{$_SERVER['REMOTE_ADDR']}:{$_SERVER['REMOTE_PORT']} gateway:{$_SERVER['GATEWAY_ADDR']}:{$_SERVER['GATEWAY_PORT']} client_id:$client_id onClose:''\n";
$_GET = [
'ct' => $_SESSION['ct'] ?? 'game',
'ac' => 'offline',
'ip' => $_SESSION['ip'],
'client_id' => $client_id,
];
// 运行MVC
kali::run($_GET);
}
}
Last updated