Skip to content

Commit

Permalink
重新梳理websocket功能
Browse files Browse the repository at this point in the history
  • Loading branch information
yunwuxin committed Jan 19, 2021
1 parent 25a7665 commit 0cfb71c
Show file tree
Hide file tree
Showing 13 changed files with 194 additions and 374 deletions.
5 changes: 2 additions & 3 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@

.DS_Store
*.xml
.idea/think-swoole.iml
.idea
composer.lock
vendor
vendor
16 changes: 16 additions & 0 deletions src/Job.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<?php

namespace think\swoole;

class Job
{
public $name;

public $params;

public function __construct($name, $params)
{
$this->name = $name;
$this->params = $params;
}
}
16 changes: 0 additions & 16 deletions src/Service.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,32 +11,16 @@

namespace think\swoole;

use think\Route;
use think\swoole\command\Rpc;
use think\swoole\command\RpcInterface;
use think\swoole\command\Server as ServerCommand;
use think\swoole\websocket\socketio\Controller;

class Service extends \think\Service
{

public function boot()
{
$this->commands(ServerCommand::class, RpcInterface::class, Rpc::class);

if ($this->app->config->get('swoole.websocket.enable', false)) {
$this->registerRoutes(function (Route $route) {
$route->group(function () use ($route) {
$route->get('socket.io/', '@upgrade');
$route->post('socket.io/', '@reject');
})
->prefix(Controller::class)
->allowCrossDomain([
'Access-Control-Allow-Credentials' => 'true',
'X-XSS-Protection' => 0,
]);
});
}
}

}
92 changes: 68 additions & 24 deletions src/Websocket.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,17 @@
namespace think\swoole;

use Swoole\Server;
use think\swoole\concerns\InteractsWithCoordinator;
use think\swoole\contract\websocket\ParserInterface;
use Swoole\WebSocket\Frame;
use think\Event;
use think\Request;
use think\swoole\websocket\Pusher;
use think\swoole\websocket\Room;

/**
* Class Websocket
*/
class Websocket
{
use InteractsWithCoordinator;

public const PUSH_ACTION = 'push';
public const EVENT_CONNECT = 'connect';

/**
* @var Server
*/
Expand All @@ -27,11 +24,6 @@ class Websocket
*/
protected $room;

/**
* @var ParserInterface
*/
protected $parser;

/**
* Scoket sender's fd.
*
Expand All @@ -53,18 +45,53 @@ class Websocket
*/
protected $isBroadcast = false;

/** @var Event */
protected $event;

/**
* Websocket constructor.
*
* @param Server $server
* @param Room $room
* @param ParserInterface $parser
* @param Event $event
*/
public function __construct(Server $server, Room $room, ParserInterface $parser)
public function __construct(Server $server, Room $room, Event $event)
{
$this->server = $server;
$this->room = $room;
$this->parser = $parser;
$this->event = $event;
}

/**
* "onOpen" listener.
*
* @param int $fd
* @param Request $request
*/
public function onOpen($fd, Request $request)
{
$this->event->trigger("swoole.websocket.Open", $request);
}

/**
* "onMessage" listener.
*
* @param Frame $frame
*/
public function onMessage(Frame $frame)
{
$this->event->trigger("swoole.websocket.Event", $this->decode($frame->data));
}

/**
* "onClose" listener.
*
* @param int $fd
* @param int $reactorId
*/
public function onClose($fd, $reactorId)
{
$this->event->trigger("swoole.websocket.Close", $reactorId);
}

/**
Expand Down Expand Up @@ -145,6 +172,24 @@ public function leave($rooms = []): self
return $this;
}

protected function encode(string $event, $data)
{
return json_encode([
'type' => $event,
'data' => $data,
]);
}

protected function decode($payload)
{
$data = json_decode($payload, true);

return [
'type' => $data['type'] ?? null,
'data' => $data['data'] ?? null,
];
}

/**
* Emit data and reset some status.
*
Expand All @@ -163,17 +208,16 @@ public function emit(string $event, $data = null): bool
return false;
}

$result = $this->server->task([
'action' => static::PUSH_ACTION,
'data' => [
'sender' => $this->getSender() ?: 0,
'descriptors' => $fds,
'broadcast' => $this->isBroadcast(),
'assigned' => $assigned,
'payload' => $this->parser->encode($event, $data),
],
$job = new Job([Pusher::class, 'push'], [
'sender' => $this->getSender() ?: 0,
'descriptors' => $fds,
'broadcast' => $this->isBroadcast(),
'assigned' => $assigned,
'payload' => $this->encode($event, $data),
]);

$result = $this->server->task($job);

return $result !== false;
} finally {
$this->reset();
Expand Down
15 changes: 13 additions & 2 deletions src/concerns/InteractsWithServer.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
use think\exception\Handle;
use think\helper\Str;
use think\swoole\FileWatcher;
use think\swoole\Job;
use Throwable;

/**
Expand Down Expand Up @@ -109,8 +110,18 @@ public function onWorkerStart($server)
*/
public function onTask($server, Task $task)
{
$this->runInSandbox(function (Event $event) use ($task) {
$event->trigger('swoole.task', $task);
$this->runInSandbox(function (Event $event, App $app) use ($task) {
if ($task->data instanceof Job) {
if (is_array($task->data->name)) {
[$class, $method] = $task->data->name;
$object = $app->invokeClass($class, $task->data->params);
$object->{$method}();
} else {
$app->invoke($task->data->name, $task->data->params);
}
} else {
$event->trigger('swoole.task', $task);
}
}, $task->id);
}

Expand Down
82 changes: 11 additions & 71 deletions src/concerns/InteractsWithWebsocket.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,17 @@
namespace think\swoole\concerns;

use Swoole\Http\Request;
use Swoole\Server\Task;
use Swoole\Websocket\Frame;
use Swoole\Websocket\Server;
use think\App;
use think\Container;
use think\Event;
use think\helper\Str;
use think\Pipeline;
use think\swoole\contract\websocket\HandlerInterface;
use think\swoole\contract\websocket\ParserInterface;
use think\swoole\contract\websocket\RoomInterface;
use think\swoole\Websocket;
use think\swoole\websocket\Pusher;
use think\swoole\websocket\Room;
use think\swoole\websocket\socketio\Handler;
use think\swoole\websocket\socketio\Parser as SocketioParser;

/**
* Trait InteractsWithWebsocket
Expand Down Expand Up @@ -57,18 +52,12 @@ public function onOpen($server, $req)
{
$this->waitCoordinator('workerStart');

$this->runInSandbox(function (Event $event, HandlerInterface $handler, App $app, Websocket $websocket) use ($req) {
$websocket->setSender($req->fd);

$this->runInSandbox(function (App $app, Websocket $websocket) use ($req) {
$request = $this->prepareRequest($req);
$app->instance('request', $request);

$websocket->resumeCoordinator('onOpen', function () use ($event, $req, $handler, $request, $app) {
$request = $this->setRequestThroughMiddleware($app, $request);
if (!$handler->onOpen($req->fd, $request)) {
$event->trigger("swoole.websocket.Connect", $request);
}
});
$request = $this->setRequestThroughMiddleware($app, $request);
$websocket->setSender($req->fd);
$websocket->onOpen($req->fd, $request);
}, $req->fd, true);
}

Expand All @@ -80,18 +69,9 @@ public function onOpen($server, $req)
*/
public function onMessage($server, $frame)
{
$this->runInSandbox(function (Event $event, ParserInterface $parser, HandlerInterface $handler, Websocket $websocket) use ($frame) {
$websocket->waitCoordinator('onOpen');
$this->runInSandbox(function (Websocket $websocket) use ($frame) {
$websocket->setSender($frame->fd);
if (!$handler->onMessage($frame)) {
$payload = $parser->decode($frame);

['event' => $name, 'data' => $data] = $payload;
$name = Str::studly($name);
if (!in_array($name, ['Close', 'Connect'])) {
$event->trigger("swoole.websocket." . $name, $data);
}
}
$websocket->onMessage($frame);
}, $frame->fd, true);
}

Expand All @@ -108,13 +88,10 @@ public function onClose($server, $fd, $reactorId)
return;
}

$this->runInSandbox(function (Event $event, HandlerInterface $handler, Websocket $websocket) use ($fd, $reactorId) {
$websocket->waitCoordinator('onOpen');
$this->runInSandbox(function (Websocket $websocket) use ($fd, $reactorId) {
$websocket->setSender($fd);
try {
if (!$handler->onClose($fd, $reactorId)) {
$event->trigger("swoole.websocket.Close");
}
$websocket->onClose($fd, $reactorId);
} finally {
// leave all rooms
$websocket->leave();
Expand Down Expand Up @@ -164,7 +141,6 @@ protected function prepareWebsocket()

$this->onEvent('workerStart', function () {
$this->bindWebsocketRoom();
$this->bindWebsocketParser();
$this->bindWebsocketHandler();
$this->prepareWebsocketListener();
});
Expand Down Expand Up @@ -205,14 +181,6 @@ protected function prepareWebsocketListener()
foreach ($subscribers as $subscriber) {
$this->app->event->observe($subscriber, 'swoole.websocket.');
}

//消息推送任务
$this->app->event->listen('swoole.task', function (Task $task, App $app) {
if ($this->isWebsocketPushPayload($task->data)) {
$pusher = $app->make(Pusher::class, $task->data['data']);
$pusher->push();
}
});
}

/**
Expand All @@ -222,20 +190,9 @@ protected function prepareWebsocketListener()
*/
protected function bindWebsocketHandler()
{
$handlerClass = $this->getConfig('websocket.handler', Handler::class);

$this->app->bind(HandlerInterface::class, $handlerClass);

$this->app->make(HandlerInterface::class);
}

protected function bindWebsocketParser()
{
$parserClass = $this->getConfig('websocket.parser', SocketioParser::class);

$this->app->bind(ParserInterface::class, $parserClass);

$this->app->make(ParserInterface::class);
if (($handlerClass = $this->getConfig('websocket.handler')) && $handlerClass instanceof Websocket) {
$this->app->bind(Websocket::class, $handlerClass);
}
}

/**
Expand All @@ -246,21 +203,4 @@ protected function bindWebsocketRoom(): void
$this->app->instance(Room::class, $this->websocketRoom);
}

/**
* Indicates if the payload is websocket push.
*
* @param mixed $payload
*
* @return boolean
*/
public function isWebsocketPushPayload($payload): bool
{
if (!is_array($payload)) {
return false;
}

return $this->isWebsocketServer
&& ($payload['action'] ?? null) === Websocket::PUSH_ACTION
&& array_key_exists('data', $payload);
}
}
1 change: 0 additions & 1 deletion src/config/swoole.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
'websocket' => [
'enable' => false,
'handler' => Handler::class,
'parser' => Parser::class,
'ping_interval' => 25000,
'ping_timeout' => 60000,
'room' => [
Expand Down
Loading

0 comments on commit 0cfb71c

Please sign in to comment.