diff --git a/src/Websocket.php b/src/Websocket.php index 918a8a6..a69b969 100644 --- a/src/Websocket.php +++ b/src/Websocket.php @@ -80,6 +80,7 @@ public function onOpen($fd, Request $request) */ public function onMessage(Frame $frame) { + $this->event->trigger("swoole.websocket.Message", $frame); $this->event->trigger("swoole.websocket.Event", $this->decode($frame->data)); } @@ -172,33 +173,7 @@ public function leave($rooms = []): self return $this; } - protected function encode(string $event, $data) - { - return json_encode([ - 'type' => $event, - 'data' => $data, - ]); - } - - protected function decode($payload) - { - $data = json_decode($payload, true); - - return [ - 'type' => $data['type'] ?? null, - 'data' => $data['data'] ?? null, - ]; - } - - /** - * Emit data and reset some status. - * - * @param string - * @param mixed - * - * @return boolean - */ - public function emit(string $event, $data = null): bool + public function push($data) { $fds = $this->getFds(); $assigned = !empty($this->getTo()); @@ -213,7 +188,7 @@ public function emit(string $event, $data = null): bool 'descriptors' => $fds, 'broadcast' => $this->isBroadcast(), 'assigned' => $assigned, - 'payload' => $this->encode($event, $data), + 'payload' => $data, ]); $result = $this->server->task($job); @@ -224,6 +199,29 @@ public function emit(string $event, $data = null): bool } } + public function emit(string $event, $data = null): bool + { + return $this->push($this->encode([ + 'type' => $event, + 'data' => $data, + ])); + } + + protected function encode($packet) + { + return json_encode($packet); + } + + protected function decode($payload) + { + $data = json_decode($payload, true); + + return [ + 'type' => $data['type'] ?? null, + 'data' => $data['data'] ?? null, + ]; + } + /** * Close current connection. * diff --git a/src/contract/websocket/HandlerInterface.php b/src/contract/websocket/HandlerInterface.php deleted file mode 100644 index 432aeac..0000000 --- a/src/contract/websocket/HandlerInterface.php +++ /dev/null @@ -1,42 +0,0 @@ - -// +---------------------------------------------------------------------- - -namespace think\swoole\contract\websocket; - -use Swoole\Websocket\Frame; -use think\Request; - -interface HandlerInterface -{ - /** - * "onOpen" listener. - * - * @param int $fd - * @param Request $request - */ - public function onOpen($fd, Request $request); - - /** - * "onMessage" listener. - * only triggered when event handler not found - * - * @param Frame $frame - */ - public function onMessage(Frame $frame); - - /** - * "onClose" listener. - * - * @param int $fd - * @param int $reactorId - */ - public function onClose($fd, $reactorId); -} diff --git a/src/websocket/socketio/EnginePacket.php b/src/websocket/socketio/EnginePacket.php new file mode 100644 index 0000000..1c18bb6 --- /dev/null +++ b/src/websocket/socketio/EnginePacket.php @@ -0,0 +1,81 @@ +type = $type; + $this->data = $data; + } + + public static function open($payload) + { + return new static(self::OPEN, $payload); + } + + public static function pong($payload = '') + { + return new static(self::PONG, $payload); + } + + public static function ping() + { + return new static(self::PING); + } + + public static function message($payload) + { + return new static(self::MESSAGE, $payload); + } + + public static function fromString(string $packet) + { + return new static($packet{0}, substr($packet, 1) ?? ''); + } + + public function toString() + { + return $this->type . $this->data; + } +} diff --git a/src/websocket/socketio/Handler.php b/src/websocket/socketio/Handler.php index 5e7be3c..9ead8af 100644 --- a/src/websocket/socketio/Handler.php +++ b/src/websocket/socketio/Handler.php @@ -4,6 +4,7 @@ use Exception; use Swoole\Server; +use Swoole\Timer; use Swoole\Websocket\Frame; use think\Config; use think\Event; @@ -18,9 +19,17 @@ class Handler extends Websocket protected $eio; + protected $pingTimeoutTimer = null; + protected $pingIntervalTimer = null; + + protected $pingInterval; + protected $pingTimeout; + public function __construct(Server $server, Room $room, Event $event, Config $config) { - $this->config = $config; + $this->config = $config; + $this->pingInterval = $this->config->get('swoole.websocket.ping_interval', 25000); + $this->pingTimeout = $this->config->get('swoole.websocket.ping_timeout', 60000); parent::__construct($server, $room, $event); } @@ -34,37 +43,24 @@ public function onOpen($fd, Request $request) { $this->eio = $request->param('EIO'); - $payload = json_encode( + $payload = json_encode( [ 'sid' => base64_encode(uniqid()), 'upgrades' => [], - 'pingInterval' => $this->config->get('swoole.websocket.ping_interval'), - 'pingTimeout' => $this->config->get('swoole.websocket.ping_timeout'), + 'pingInterval' => $this->pingInterval, + 'pingTimeout' => $this->pingTimeout, ] ); - $initPayload = Packet::OPEN . $payload; - if ($this->server->isEstablished($fd)) { - $this->server->push($fd, $initPayload); - } - if ($this->eio < 4) { - $this->onConnect($fd); - } - } + $this->push(EnginePacket::open($payload)); - protected function onConnect($fd, $data = null) - { - try { - $this->event->trigger('swoole.websocket.Connect', $data); - $payload = Packet::MESSAGE . Packet::CONNECT; - if ($this->eio >= 4) { - $payload .= json_encode(['sid' => base64_encode(uniqid())]); - } - } catch (Exception $exception) { - $payload = Packet::MESSAGE . Packet::CONNECT_ERROR . json_encode(['message' => $exception->getMessage()]); - } - if ($this->server->isEstablished($fd)) { - $this->server->push($fd, $payload); + $this->event->trigger("swoole.websocket.Open", $request); + + if ($this->eio < 4) { + $this->resetPingTimeout($this->pingInterval + $this->pingTimeout); + $this->onConnect(); + } else { + $this->schedulePing(); } } @@ -72,72 +68,130 @@ protected function onConnect($fd, $data = null) * "onMessage" listener. * * @param Frame $frame - * @return bool */ public function onMessage(Frame $frame) { - $packet = new Packet($frame->data); + $enginePacket = EnginePacket::fromString($frame->data); + + $this->event->trigger("swoole.websocket.Message", $enginePacket); - switch ($packet->getEngineType()) { - case Packet::MESSAGE: - $payload = substr($packet->getPayload(), 1); - switch ($packet->getSocketType()) { + $this->resetPingTimeout($this->pingInterval + $this->pingTimeout); + + switch ($enginePacket->type) { + case EnginePacket::MESSAGE: + $packet = $this->decode($enginePacket->data); + switch ($packet->type) { case Packet::CONNECT: - $this->onConnect($frame->fd, $payload); + $this->onConnect($packet->data); break; case Packet::EVENT: case Packet::ACK: - $start = strpos($payload, '['); - - if ($start > 0) { - $id = substr($payload, 0, $start); - $payload = substr($payload, $start); - } + $result = $this->event->trigger('swoole.websocket.Event', $packet->data); - $result = $this->event->trigger('swoole.websocket.Event', $this->decode($payload)); + if ($packet->id !== null) { + $responsePacket = Packet::create(Packet::ACK, [ + 'id' => $packet->id, + 'nsp' => $packet->nsp, + 'data' => end($result), + ]); - if (isset($id)) { - $this->server->push($frame->fd, $this->pack(Packet::ACK . $id, end($result))); + $this->push($responsePacket); } break; case Packet::DISCONNECT: $this->event->trigger('swoole.websocket.Disconnect'); + $this->close(); + break; + default: + $this->close(); break; } break; - case Packet::PING: - if ($this->server->isEstablished($frame->fd)) { - $this->server->push($frame->fd, Packet::PONG . $packet->getPayload()); - } + case EnginePacket::PING: + $this->push(EnginePacket::pong($enginePacket->data)); + break; + case EnginePacket::PONG: + $this->schedulePing(); + break; + default: + $this->close(); break; } + } - return true; + /** + * "onClose" listener. + * + * @param int $fd + * @param int $reactorId + */ + public function onClose($fd, $reactorId) + { + Timer::clear($this->pingTimeoutTimer); + Timer::clear($this->pingIntervalTimer); + $this->event->trigger("swoole.websocket.Close", $reactorId); } - protected function decode($payload) + protected function onConnect($data = null) + { + try { + $this->event->trigger('swoole.websocket.Connect', $data); + $packet = Packet::create(Packet::CONNECT); + if ($this->eio >= 4) { + $packet->data = ['sid' => base64_encode(uniqid())]; + } + } catch (Exception $exception) { + $packet = Packet::create(Packet::CONNECT_ERROR, [ + 'data' => ['message' => $exception->getMessage()], + ]); + } + + $this->push($packet); + } + + protected function resetPingTimeout($timeout) { - $data = json_decode($payload, true); + Timer::clear($this->pingTimeoutTimer); + $this->pingTimeoutTimer = Timer::after($timeout, function () { + $this->close(); + }); + } - return [ - 'type' => $data[0], - 'data' => $data[1] ?? null, - ]; + protected function schedulePing() + { + Timer::clear($this->pingIntervalTimer); + $this->pingIntervalTimer = Timer::after($this->pingIntervalTimer, function () { + $this->push(EnginePacket::ping()); + $this->resetPingTimeout($this->pingTimeout); + }); } - protected function pack($type, ...$args) + protected function encode($packet) { - $packet = Packet::MESSAGE . $type; + return Parser::encode($packet); + } - $data = implode(",", array_map(function ($arg) { - return json_encode($arg); - }, $args)); + protected function decode($payload) + { + return Parser::decode($payload); + } - return "{$packet}[{$data}]"; + public function push($data) + { + if ($data instanceof Packet) { + $data = EnginePacket::message($this->encode($data)); + } + if ($data instanceof EnginePacket) { + $data = $data->toString(); + } + return parent::push($data); } - protected function encode(string $event, $data) + public function emit(string $event, $data = null): bool { - return $this->pack(Packet::EVENT, $event, $data); + $packet = Packet::create(Packet::EVENT, [ + 'data' => [$event, $data], + ]); + return $this->push($packet); } } diff --git a/src/websocket/socketio/Packet.php b/src/websocket/socketio/Packet.php index b6cc20c..c093f0a 100644 --- a/src/websocket/socketio/Packet.php +++ b/src/websocket/socketio/Packet.php @@ -8,94 +8,60 @@ class Packet { /** - * Socket.io packet type `open`. - */ - const OPEN = 0; - - /** - * Socket.io packet type `close`. - */ - const CLOSE = 1; - - /** - * Socket.io packet type `ping`. - */ - const PING = 2; - - /** - * Socket.io packet type `pong`. - */ - const PONG = 3; - - /** - * Socket.io packet type `message`. - */ - const MESSAGE = 4; - - /** - * Socket.io packet type 'upgrade' - */ - const UPGRADE = 5; - - /** - * Socket.io packet type `noop`. - */ - const NOOP = 6; - - /** - * Engine.io packet type `connect`. + * Socket.io packet type `connect`. */ const CONNECT = 0; /** - * Engine.io packet type `disconnect`. + * Socket.io packet type `disconnect`. */ const DISCONNECT = 1; /** - * Engine.io packet type `event`. + * Socket.io packet type `event`. */ const EVENT = 2; /** - * Engine.io packet type `ack`. + * Socket.io packet type `ack`. */ const ACK = 3; /** - * Engine.io packet type `connect_error`. + * Socket.io packet type `connect_error`. */ const CONNECT_ERROR = 4; /** - * Engine.io packet type 'binary event' + * Socket.io packet type 'binary event' */ const BINARY_EVENT = 5; /** - * Engine.io packet type `binary ack`. For acks with binary arguments. + * Socket.io packet type `binary ack`. For acks with binary arguments. */ const BINARY_ACK = 6; - protected $packet = ""; - - public function __construct(string $packet) - { - $this->packet = $packet; - } - - public function getEngineType() - { - return $this->packet[0] ?? null; - } + public $type; + public $nsp = '/'; + public $data = null; + public $id = null; - public function getSocketType() + public function __construct(int $type) { - return $this->packet[1] ?? null; + $this->type = $type; } - public function getPayload() + public static function create($type, array $decoded = []) { - return substr($this->packet, 1) ?: ''; + $new = new static($type); + $new->id = $decoded['id'] ?? null; + if (isset($decoded['nsp'])) { + $new->nsp = $decoded['nsp'] ?: '/'; + } else { + $new->nsp = '/'; + } + $new->data = $decoded['data'] ?? null; + return $new; } } diff --git a/src/websocket/socketio/Parser.php b/src/websocket/socketio/Parser.php new file mode 100644 index 0000000..4ce4e5f --- /dev/null +++ b/src/websocket/socketio/Parser.php @@ -0,0 +1,74 @@ +type; + if ($packet->nsp && '/' !== $packet->nsp) { + $str .= $packet->nsp . ','; + } + + if (!empty($packet->id)) { + $str .= $packet->id; + } + + if (null !== $packet->data) { + $str .= json_encode($packet->data, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES); + } + return $str; + } + + public static function decode(string $str) + { + $i = 0; + + $packet = new Packet((int) $str{0}); + + // look up namespace (if any) + if ('/' === $str{$i + 1}) { + $nsp = ''; + while (++$i) { + $c = $str{$i}; + if (',' === $c) { + break; + } + $nsp .= $c; + if ($i === strlen($str)) { + break; + } + } + $packet->nsp = $nsp; + } else { + $packet->nsp = '/'; + } + + // look up id + $next = $str{$i + 1}; + if ('' !== $next && is_numeric($next)) { + $id = ''; + while (++$i) { + $c = $str{$i}; + if (null == $c || !is_numeric($c)) { + --$i; + break; + } + $id .= $str{$i}; + if ($i === strlen($str)) { + break; + } + } + $packet->id = intval($id); + } + + // look up json data + if ($str{++$i}) { + $packet->data = json_decode(substr($str, $i), true); + } + + return $packet; + } +}