diff --git a/composer.json b/composer.json index aef47e4..3c7d115 100644 --- a/composer.json +++ b/composer.json @@ -10,13 +10,13 @@ ], "require": { "php": ">7.1", - "ext-swoole": ">=4.4.8", "ext-json": "*", - "topthink/framework": "^6.0", - "symfony/finder": "^4.3.2|^5.1", - "swoole/ide-helper": "^4.3", + "ext-swoole": ">=4.4.8", "nette/php-generator": "^3.2", - "open-smf/connection-pool": "~1.0" + "open-smf/connection-pool": "~1.0", + "swoole/ide-helper": "^4.3", + "symfony/finder": "^4.3.2|^5.1", + "topthink/framework": "^6.0" }, "require-dev": { "symfony/var-dumper": "^4.3|^5.1" diff --git a/src/Middleware.php b/src/Middleware.php new file mode 100644 index 0000000..a5c3cf9 --- /dev/null +++ b/src/Middleware.php @@ -0,0 +1,77 @@ +app = $app; + + foreach ($middlewares as $middleware) { + $this->queue[] = $this->buildMiddleware($middleware); + } + } + + public static function make(App $app, $middlewares = []) + { + return new self($app, $middlewares); + } + + /** + * 调度管道 + * @return Pipeline + */ + public function pipeline() + { + return (new Pipeline()) + ->through(array_map(function ($middleware) { + return function ($request, $next) use ($middleware) { + [$call, $params] = $middleware; + + if (is_array($call) && is_string($call[0])) { + $call = [$this->app->make($call[0]), $call[1]]; + } + return call_user_func($call, $request, $next, ...$params); + }; + }, $this->queue)); + } + + /** + * 解析中间件 + * @param mixed $middleware + * @return array + */ + protected function buildMiddleware($middleware): array + { + if (is_array($middleware)) { + [$middleware, $params] = $middleware; + } + + if ($middleware instanceof Closure) { + return [$middleware, $params ?? []]; + } + + if (!is_string($middleware)) { + throw new InvalidArgumentException('The middleware is invalid'); + } + + return [[$middleware, 'handle'], $params ?? []]; + } +} diff --git a/src/RpcManager.php b/src/RpcManager.php index 310af59..75cbd69 100644 --- a/src/RpcManager.php +++ b/src/RpcManager.php @@ -114,9 +114,10 @@ public function attachToServer(Port $port) protected function bindRpcDispatcher() { - $services = $this->getConfig('rpc.server.services', []); + $services = $this->getConfig('rpc.server.services', []); + $middleware = $this->getConfig('rpc.server.middleware', []); - $this->app->make(Dispatcher::class, [$services]); + $this->app->make(Dispatcher::class, [$services, $middleware]); } protected function bindRpcParser() @@ -143,9 +144,9 @@ protected function recv(Server $server, $fd, $data, $callback) [$header, $data] = Packer::unpack($data); $this->channels[$fd] = new Channel($header); - } catch (Throwable | Exception $e) { + } catch (Throwable $e) { //错误的包头 - Coroutine::create($callback, Error::make(Dispatcher::INTERNAL_ERROR, $e->getMessage())); + Coroutine::create($callback, Error::make(Dispatcher::INVALID_REQUEST, $e->getMessage())); return $server->close($fd); } diff --git a/src/concerns/InteractsWithRpcClient.php b/src/concerns/InteractsWithRpcClient.php index 0e432b9..af4c3d0 100644 --- a/src/concerns/InteractsWithRpcClient.php +++ b/src/concerns/InteractsWithRpcClient.php @@ -13,6 +13,7 @@ use think\swoole\rpc\client\Proxy; use think\swoole\rpc\JsonParser; use think\swoole\rpc\Packer; +use Throwable; /** * Trait InteractsWithRpcClient @@ -65,14 +66,15 @@ protected function bindRpcClientPool() $parserClass = $this->getConfig("rpc.client.{$name}.parser", JsonParser::class); $parser = $this->app->make($parserClass); $gateway = new Gateway($this->createRpcConnector($name), $parser); + $middleware = $this->getConfig("rpc.client.{$name}.middleware", []); foreach ($abstracts as $abstract) { - $this->app->bind($abstract, function () use ($gateway, $name, $abstract) { - return $this->app->invokeClass(Proxy::getClassName($name, $abstract), [$gateway]); + $this->app->bind($abstract, function (App $app) use ($middleware, $gateway, $name, $abstract) { + return $app->invokeClass(Proxy::getClassName($name, $abstract), [$gateway, $middleware]); }); } } - } catch (\Exception | \Throwable $e) { + } catch (Throwable $e) { } } } diff --git a/src/concerns/InteractsWithWebsocket.php b/src/concerns/InteractsWithWebsocket.php index 2a98e98..168747a 100644 --- a/src/concerns/InteractsWithWebsocket.php +++ b/src/concerns/InteractsWithWebsocket.php @@ -8,8 +8,8 @@ use think\App; use think\Container; use think\helper\Str; -use think\Pipeline; use think\swoole\contract\websocket\RoomInterface; +use think\swoole\Middleware; use think\swoole\Websocket; use think\swoole\websocket\Room; @@ -104,21 +104,9 @@ public function onClose($server, $fd, $reactorId) */ protected function setRequestThroughMiddleware(App $app, \think\Request $request) { - $middleware = $this->getConfig('websocket.middleware', []); - - return (new Pipeline()) + return Middleware::make($app, $this->getConfig('websocket.middleware', [])) + ->pipeline() ->send($request) - ->through(array_map(function ($middleware) use ($app) { - return function ($request, $next) use ($app, $middleware) { - if (is_array($middleware)) { - [$middleware, $param] = $middleware; - } - if (is_string($middleware)) { - $middleware = [$app->make($middleware), 'handle']; - } - return $middleware($request, $next, $param ?? null); - }; - }, $middleware)) ->then(function ($request) { return $request; }); @@ -182,9 +170,7 @@ protected function prepareWebsocketListener() } /** - * Prepare websocket handler for onOpen and onClose callback. - * - * @throws \Exception + * Prepare websocket handler for onOpen and onClose callback */ protected function bindWebsocketHandler() { diff --git a/src/rpc/JsonParser.php b/src/rpc/JsonParser.php index f8c16c0..87556ca 100644 --- a/src/rpc/JsonParser.php +++ b/src/rpc/JsonParser.php @@ -4,6 +4,7 @@ use Exception; use think\swoole\contract\rpc\ParserInterface; +use think\swoole\rpc\server\Dispatcher; class JsonParser implements ParserInterface { @@ -29,12 +30,11 @@ public function encode(Protocol $protocol): string 'jsonrpc' => self::VERSION, 'method' => $method, 'params' => $protocol->getParams(), + 'context' => $protocol->getContext(), 'id' => '', ]; - $string = json_encode($data, JSON_UNESCAPED_UNICODE); - - return $string; + return json_encode($data, JSON_UNESCAPED_UNICODE); } /** @@ -49,23 +49,27 @@ public function decode(string $string): Protocol $error = json_last_error(); if ($error != JSON_ERROR_NONE) { throw new Exception( - sprintf('Data(%s) is not json format!', $string) + sprintf('Data(%s) is not json format!', $string), + Dispatcher::PARSER_ERROR ); } - $method = $data['method'] ?? ''; - $params = $data['params'] ?? []; + $method = $data['method'] ?? ''; + $params = $data['params'] ?? []; + $context = $data['context'] ?? []; if (empty($method)) { throw new Exception( - sprintf('Method(%s) cant not be empty!', $string) + sprintf('Method(%s) cant not be empty!', $string), + Dispatcher::INVALID_PARAMS ); } $methodAry = explode(self::DELIMITER, $method); if (count($methodAry) < 2) { throw new Exception( - sprintf('Method(%s) is bad format!', $method) + sprintf('Method(%s) is bad format!', $method), + Dispatcher::INVALID_PARAMS ); } @@ -73,11 +77,12 @@ public function decode(string $string): Protocol if (empty($interfaceClass) || empty($methodName)) { throw new Exception( - sprintf('Interface(%s) or Method(%s) can not be empty!', $interfaceClass, $method) + sprintf('Interface(%s) or Method(%s) can not be empty!', $interfaceClass, $method), + Dispatcher::INVALID_PARAMS ); } - return Protocol::make($interfaceClass, $methodName, $params); + return Protocol::make($interfaceClass, $methodName, $params, $context); } /** @@ -109,6 +114,7 @@ public function encodeResponse($result): string { $data = [ 'jsonrpc' => self::VERSION, + 'id' => '', ]; if ($result instanceof Error) { @@ -117,8 +123,6 @@ public function encodeResponse($result): string $data['result'] = $result; } - $string = json_encode($data); - - return $string; + return json_encode($data); } } diff --git a/src/rpc/Protocol.php b/src/rpc/Protocol.php index b09102a..1f843b5 100644 --- a/src/rpc/Protocol.php +++ b/src/rpc/Protocol.php @@ -22,22 +22,28 @@ class Protocol */ private $params = []; + /** + * @var array + */ + private $context = []; + /** * Replace constructor * * @param string $interface * @param string $method - * @param array $params - * + * @param array $params + * @param array $context * @return Protocol */ - public static function make(string $interface, string $method, array $params) + public static function make(string $interface, string $method, array $params, array $context = []) { $instance = new static(); $instance->interface = $interface; $instance->method = $method; $instance->params = $params; + $instance->context = $context; return $instance; } @@ -66,4 +72,44 @@ public function getParams(): array return $this->params; } + /** + * @return array + */ + public function getContext(): array + { + return $this->context; + } + + /** + * @param string $interface + */ + public function setInterface(string $interface): void + { + $this->interface = $interface; + } + + /** + * @param string $method + */ + public function setMethod(string $method): void + { + $this->method = $method; + } + + /** + * @param array $params + */ + public function setParams(array $params): void + { + $this->params = $params; + } + + /** + * @param array $context + */ + public function setContext(array $context): void + { + $this->context = $context; + } + } diff --git a/src/rpc/client/Proxy.php b/src/rpc/client/Proxy.php index 66d2a1b..1731763 100644 --- a/src/rpc/client/Proxy.php +++ b/src/rpc/client/Proxy.php @@ -6,6 +6,8 @@ use Nette\PhpGenerator\Factory; use Nette\PhpGenerator\PhpNamespace; use ReflectionClass; +use think\App; +use think\swoole\Middleware; use think\swoole\rpc\Protocol; abstract class Proxy @@ -15,16 +17,28 @@ abstract class Proxy /** @var Gateway */ protected $gateway; - final public function __construct(Gateway $gateway) + /** @var App */ + protected $app; + + protected $middleware = []; + + final public function __construct(App $app, Gateway $gateway, $middleware) { - $this->gateway = $gateway; + $this->app = $app; + $this->gateway = $gateway; + $this->middleware = $middleware; } final protected function proxyCall($method, $params) { $protocol = Protocol::make($this->interface, $method, $params); - return $this->gateway->sendAndRecv($protocol); + return Middleware::make($this->app, $this->middleware) + ->pipeline() + ->send($protocol) + ->then(function (Protocol $protocol) { + return $this->gateway->sendAndRecv($protocol); + }); } final public static function getClassName($client, $interface) diff --git a/src/rpc/server/Dispatcher.php b/src/rpc/server/Dispatcher.php index 3f36318..18188ce 100644 --- a/src/rpc/server/Dispatcher.php +++ b/src/rpc/server/Dispatcher.php @@ -2,14 +2,16 @@ namespace think\swoole\rpc\server; -use Exception; use ReflectionClass; +use ReflectionException; use ReflectionMethod; use ReflectionNamedType; use RuntimeException; use Swoole\Server; use think\App; +use think\Pipeline; use think\swoole\contract\rpc\ParserInterface; +use think\swoole\Middleware; use think\swoole\rpc\Error; use think\swoole\rpc\File; use think\swoole\rpc\Packer; @@ -53,18 +55,21 @@ class Dispatcher protected $files = []; - public function __construct(App $app, ParserInterface $parser, Server $server, $services) + protected $middleware = []; + + public function __construct(App $app, ParserInterface $parser, Server $server, $services, $middleware = []) { $this->app = $app; $this->parser = $parser; $this->server = $server; $this->prepareServices($services); + $this->middleware = $middleware; } /** * 获取服务接口 * @param $services - * @throws \ReflectionException + * @throws ReflectionException */ protected function prepareServices($services) { @@ -155,29 +160,9 @@ public function dispatch(int $fd, $data) break; default: $protocol = $this->parser->decode($data); - - $interface = $protocol->getInterface(); - $method = $protocol->getMethod(); - $params = $protocol->getParams(); - - //文件参数 - foreach ($params as $index => $param) { - if ($param === Protocol::FILE) { - $params[$index] = array_shift($this->files[$fd]); - } - } - - $service = $this->services[$interface] ?? null; - if (empty($service)) { - throw new RuntimeException( - sprintf('Service %s is not founded!', $interface), - self::INVALID_REQUEST - ); - } - - $result = $this->app->invoke([$this->app->make($service['class']), $method], $params); + $result = $this->dispatchWithMiddleware($protocol, $fd); } - } catch (Throwable | Exception $e) { + } catch (Throwable $e) { $result = Error::make($e->getCode(), $e->getMessage()); } @@ -188,4 +173,33 @@ public function dispatch(int $fd, $data) unset($this->files[$fd]); } + protected function dispatchWithMiddleware(Protocol $protocol, $fd) + { + return Middleware::make($this->app, $this->middleware) + ->pipeline() + ->send($protocol) + ->then(function (Protocol $protocol) use ($fd) { + + $interface = $protocol->getInterface(); + $method = $protocol->getMethod(); + $params = $protocol->getParams(); + + //文件参数 + foreach ($params as $index => $param) { + if ($param === Protocol::FILE) { + $params[$index] = array_shift($this->files[$fd]); + } + } + + $service = $this->services[$interface] ?? null; + if (empty($service)) { + throw new RuntimeException( + sprintf('Service %s is not founded!', $interface), + self::METHOD_NOT_FOUND + ); + } + + return $this->app->invoke([$this->app->make($service['class']), $method], $params); + }); + } }