Skip to content

Commit

Permalink
rpc 支持中间件
Browse files Browse the repository at this point in the history
  • Loading branch information
yunwuxin committed Jan 28, 2021
1 parent 1113099 commit 2f55208
Show file tree
Hide file tree
Showing 9 changed files with 218 additions and 74 deletions.
10 changes: 5 additions & 5 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@
],
"require": {
"php": ">7.1",
"ext-swoole": ">=4.4.8",
"ext-json": "*",
"topthink/framework": "^6.0",
"symfony/finder": "^4.3.2|^5.1",
"swoole/ide-helper": "^4.3",
"ext-swoole": ">=4.4.8",
"nette/php-generator": "^3.2",
"open-smf/connection-pool": "~1.0"
"open-smf/connection-pool": "~1.0",
"swoole/ide-helper": "^4.3",
"symfony/finder": "^4.3.2|^5.1",
"topthink/framework": "^6.0"
},
"require-dev": {
"symfony/var-dumper": "^4.3|^5.1"
Expand Down
77 changes: 77 additions & 0 deletions src/Middleware.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
<?php

namespace think\swoole;

use Closure;
use InvalidArgumentException;
use think\App;
use think\Pipeline;

class Middleware
{
/**
* 中间件执行队列
* @var array
*/
protected $queue = [];

/**
* @var App
*/
protected $app;

public function __construct(App $app, $middlewares = [])
{
$this->app = $app;

foreach ($middlewares as $middleware) {
$this->queue[] = $this->buildMiddleware($middleware);
}
}

public static function make(App $app, $middlewares = [])
{
return new self($app, $middlewares);
}

/**
* 调度管道
* @return Pipeline
*/
public function pipeline()
{
return (new Pipeline())
->through(array_map(function ($middleware) {
return function ($request, $next) use ($middleware) {
[$call, $params] = $middleware;

if (is_array($call) && is_string($call[0])) {
$call = [$this->app->make($call[0]), $call[1]];
}
return call_user_func($call, $request, $next, ...$params);
};
}, $this->queue));
}

/**
* 解析中间件
* @param mixed $middleware
* @return array
*/
protected function buildMiddleware($middleware): array
{
if (is_array($middleware)) {
[$middleware, $params] = $middleware;
}

if ($middleware instanceof Closure) {
return [$middleware, $params ?? []];
}

if (!is_string($middleware)) {
throw new InvalidArgumentException('The middleware is invalid');
}

return [[$middleware, 'handle'], $params ?? []];
}
}
9 changes: 5 additions & 4 deletions src/RpcManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,10 @@ public function attachToServer(Port $port)

protected function bindRpcDispatcher()
{
$services = $this->getConfig('rpc.server.services', []);
$services = $this->getConfig('rpc.server.services', []);
$middleware = $this->getConfig('rpc.server.middleware', []);

$this->app->make(Dispatcher::class, [$services]);
$this->app->make(Dispatcher::class, [$services, $middleware]);
}

protected function bindRpcParser()
Expand All @@ -143,9 +144,9 @@ protected function recv(Server $server, $fd, $data, $callback)
[$header, $data] = Packer::unpack($data);

$this->channels[$fd] = new Channel($header);
} catch (Throwable | Exception $e) {
} catch (Throwable $e) {
//错误的包头
Coroutine::create($callback, Error::make(Dispatcher::INTERNAL_ERROR, $e->getMessage()));
Coroutine::create($callback, Error::make(Dispatcher::INVALID_REQUEST, $e->getMessage()));

return $server->close($fd);
}
Expand Down
8 changes: 5 additions & 3 deletions src/concerns/InteractsWithRpcClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
use think\swoole\rpc\client\Proxy;
use think\swoole\rpc\JsonParser;
use think\swoole\rpc\Packer;
use Throwable;

/**
* Trait InteractsWithRpcClient
Expand Down Expand Up @@ -65,14 +66,15 @@ protected function bindRpcClientPool()
$parserClass = $this->getConfig("rpc.client.{$name}.parser", JsonParser::class);
$parser = $this->app->make($parserClass);
$gateway = new Gateway($this->createRpcConnector($name), $parser);
$middleware = $this->getConfig("rpc.client.{$name}.middleware", []);

foreach ($abstracts as $abstract) {
$this->app->bind($abstract, function () use ($gateway, $name, $abstract) {
return $this->app->invokeClass(Proxy::getClassName($name, $abstract), [$gateway]);
$this->app->bind($abstract, function (App $app) use ($middleware, $gateway, $name, $abstract) {
return $app->invokeClass(Proxy::getClassName($name, $abstract), [$gateway, $middleware]);
});
}
}
} catch (\Exception | \Throwable $e) {
} catch (Throwable $e) {
}
}
}
Expand Down
22 changes: 4 additions & 18 deletions src/concerns/InteractsWithWebsocket.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
use think\App;
use think\Container;
use think\helper\Str;
use think\Pipeline;
use think\swoole\contract\websocket\RoomInterface;
use think\swoole\Middleware;
use think\swoole\Websocket;
use think\swoole\websocket\Room;

Expand Down Expand Up @@ -104,21 +104,9 @@ public function onClose($server, $fd, $reactorId)
*/
protected function setRequestThroughMiddleware(App $app, \think\Request $request)
{
$middleware = $this->getConfig('websocket.middleware', []);

return (new Pipeline())
return Middleware::make($app, $this->getConfig('websocket.middleware', []))
->pipeline()
->send($request)
->through(array_map(function ($middleware) use ($app) {
return function ($request, $next) use ($app, $middleware) {
if (is_array($middleware)) {
[$middleware, $param] = $middleware;
}
if (is_string($middleware)) {
$middleware = [$app->make($middleware), 'handle'];
}
return $middleware($request, $next, $param ?? null);
};
}, $middleware))
->then(function ($request) {
return $request;
});
Expand Down Expand Up @@ -182,9 +170,7 @@ protected function prepareWebsocketListener()
}

/**
* Prepare websocket handler for onOpen and onClose callback.
*
* @throws \Exception
* Prepare websocket handler for onOpen and onClose callback
*/
protected function bindWebsocketHandler()
{
Expand Down
30 changes: 17 additions & 13 deletions src/rpc/JsonParser.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use Exception;
use think\swoole\contract\rpc\ParserInterface;
use think\swoole\rpc\server\Dispatcher;

class JsonParser implements ParserInterface
{
Expand All @@ -29,12 +30,11 @@ public function encode(Protocol $protocol): string
'jsonrpc' => self::VERSION,
'method' => $method,
'params' => $protocol->getParams(),
'context' => $protocol->getContext(),
'id' => '',
];

$string = json_encode($data, JSON_UNESCAPED_UNICODE);

return $string;
return json_encode($data, JSON_UNESCAPED_UNICODE);
}

/**
Expand All @@ -49,35 +49,40 @@ public function decode(string $string): Protocol
$error = json_last_error();
if ($error != JSON_ERROR_NONE) {
throw new Exception(
sprintf('Data(%s) is not json format!', $string)
sprintf('Data(%s) is not json format!', $string),
Dispatcher::PARSER_ERROR
);
}

$method = $data['method'] ?? '';
$params = $data['params'] ?? [];
$method = $data['method'] ?? '';
$params = $data['params'] ?? [];
$context = $data['context'] ?? [];

if (empty($method)) {
throw new Exception(
sprintf('Method(%s) cant not be empty!', $string)
sprintf('Method(%s) cant not be empty!', $string),
Dispatcher::INVALID_PARAMS
);
}

$methodAry = explode(self::DELIMITER, $method);
if (count($methodAry) < 2) {
throw new Exception(
sprintf('Method(%s) is bad format!', $method)
sprintf('Method(%s) is bad format!', $method),
Dispatcher::INVALID_PARAMS
);
}

[$interfaceClass, $methodName] = $methodAry;

if (empty($interfaceClass) || empty($methodName)) {
throw new Exception(
sprintf('Interface(%s) or Method(%s) can not be empty!', $interfaceClass, $method)
sprintf('Interface(%s) or Method(%s) can not be empty!', $interfaceClass, $method),
Dispatcher::INVALID_PARAMS
);
}

return Protocol::make($interfaceClass, $methodName, $params);
return Protocol::make($interfaceClass, $methodName, $params, $context);
}

/**
Expand Down Expand Up @@ -109,6 +114,7 @@ public function encodeResponse($result): string
{
$data = [
'jsonrpc' => self::VERSION,
'id' => '',
];

if ($result instanceof Error) {
Expand All @@ -117,8 +123,6 @@ public function encodeResponse($result): string
$data['result'] = $result;
}

$string = json_encode($data);

return $string;
return json_encode($data);
}
}
52 changes: 49 additions & 3 deletions src/rpc/Protocol.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,28 @@ class Protocol
*/
private $params = [];

/**
* @var array
*/
private $context = [];

/**
* Replace constructor
*
* @param string $interface
* @param string $method
* @param array $params
*
* @param array $params
* @param array $context
* @return Protocol
*/
public static function make(string $interface, string $method, array $params)
public static function make(string $interface, string $method, array $params, array $context = [])
{
$instance = new static();

$instance->interface = $interface;
$instance->method = $method;
$instance->params = $params;
$instance->context = $context;

return $instance;
}
Expand Down Expand Up @@ -66,4 +72,44 @@ public function getParams(): array
return $this->params;
}

/**
* @return array
*/
public function getContext(): array
{
return $this->context;
}

/**
* @param string $interface
*/
public function setInterface(string $interface): void
{
$this->interface = $interface;
}

/**
* @param string $method
*/
public function setMethod(string $method): void
{
$this->method = $method;
}

/**
* @param array $params
*/
public function setParams(array $params): void
{
$this->params = $params;
}

/**
* @param array $context
*/
public function setContext(array $context): void
{
$this->context = $context;
}

}
Loading

0 comments on commit 2f55208

Please sign in to comment.