diff --git a/src/Network/EventHandler.php b/src/Network/EventHandler.php index 83791173..2bbd778e 100644 --- a/src/Network/EventHandler.php +++ b/src/Network/EventHandler.php @@ -19,6 +19,7 @@ use Manticoresearch\Buddy\Core\Network\Request; use Manticoresearch\Buddy\Core\Network\Response; use Manticoresearch\Buddy\Core\Task\Column; +use Manticoresearch\Buddy\Core\Task\TaskPool; use Manticoresearch\Buddy\Core\Task\TaskResult; use Swoole\Http\Request as SwooleRequest; use Swoole\Http\Response as SwooleResponse; @@ -58,8 +59,9 @@ public static function request(SwooleRequest $request, SwooleResponse $response) return; } - $requestId = $request->header['Request-ID'] ?? '0'; - $result = static::process($requestId, $request->rawContent() ?: ''); + $requestId = $request->header['Request-ID'] ?? uniqid(more_entropy: true); + $body = $request->rawContent() ?: ''; + $result = static::process($requestId, $body); // Send response $response->header('Content-Type', 'application/json'); $response->status(200); @@ -76,10 +78,13 @@ public static function process(string $id, string $payload): Response { try { $request = Request::fromString($payload, $id); $handler = QueryProcessor::process($request)->run(); + // In case deferred we return the ID of the task not the request if ($handler->isDeferred()) { - $result = TaskResult::withData([['id' => $handler->getId()]]) - ->column('id', Column::Long); + $doneFn = TaskPool::add($id, $request->payload); + $handler->on('failure', $doneFn)->on('success', $doneFn); + $result = TaskResult::withData([['id' => $id]]) + ->column('id', Column::String); } else { $handler->wait(true); $result = $handler->getResult(); diff --git a/src/Network/Server.php b/src/Network/Server.php index 58a2eb7f..00354a31 100644 --- a/src/Network/Server.php +++ b/src/Network/Server.php @@ -11,6 +11,7 @@ namespace Manticoresearch\Buddy\Base\Network; +use Ds\Set; use Exception; use Manticoresearch\Buddy\Core\Network\Response; use Manticoresearch\Buddy\Core\Tool\Buddy; @@ -44,6 +45,9 @@ final class Server { /** @var int $ppid */ protected int $ppid; + /** @var Set $workerIds */ + protected Set $workerIds; + /** * @param array $config * @return void @@ -52,6 +56,16 @@ public function __construct(array $config = []) { $this->socket = new SwooleServer('127.0.0.1', 0); $this->socket->set($config); $this->ppid = posix_getppid(); + + $this->addTicker( + function () { + if (Process::kill($this->ppid, 0)) { + return; + } + Buddy::debug('Parrent proccess died, exiting…'); + $this->stop(); + }, 5 + ); } /** @@ -182,23 +196,19 @@ public function start(): static { // @phpcs:ignore Generic.CodeAnalysis.UnusedFunctionParameter.FoundBeforeLastUsed, SlevomatCodingStandard.Functions.UnusedParameter.UnusedParameter 'WorkerStart', function (SwooleServer $server, int $workerId) { if ($workerId !== 0) { + if (!isset($this->workerIds)) { + $this->workerIds = new Set; + } + + $this->workerIds->add($workerId); return; } - Timer::tick( - 5000, function () { - if (Process::kill($this->ppid, 0)) { - return; - } - - Buddy::debug('Parrent proccess died, exiting…'); - $this->stop(); - } - ); - // First add all ticks to run periodically + // First add all ticks to run periodically foreach ($this->ticks as [$fn, $period]) { Timer::tick( - $period * 1000, static function (/*int $timerId*/) use ($fn) { + $period * 1000, + static function (/*int $timerId*/) use ($fn) { go($fn); } ); @@ -234,12 +244,19 @@ public function process(string $requestId, string $payload): Response { * @return static */ public function stop($exit = true): static { - echo 'stop'; Timer::clearAll(); $this->socket->stop(); + if (isset($this->workerIds)) { + foreach ($this->workerIds as $workerId) { + $this->socket->stop($workerId); + } + } + $this->socket->shutdown(); if ($exit) { + // exec('pgrep -f executor | xargs kill -9'); exit(0); } + return $this; } } diff --git a/test/Buddy/functional/BuildTest.php b/test/Buddy/functional/BuildTest.php index 55e655f6..00af63b0 100644 --- a/test/Buddy/functional/BuildTest.php +++ b/test/Buddy/functional/BuildTest.php @@ -16,43 +16,43 @@ * And this version of binary run and also does not include any of dev composer deps in it */ final class BuildTest extends TestCase { - // public function testBuildIsOk(): void { - // static::buildBinary(); - // $this->assertEquals(true, file_exists('build/share/modules/manticore-buddy/src/main.php')); - // $this->assertEquals(true, file_exists('build/manticore-buddy')); - // } - - // public function testBuildHasRightComposerPackages(): void { - // /** @var array{require:array,require-dev:array} $composer */ - // $composer = json_decode((string)file_get_contents('/workdir/composer.json'), true); - // $include = array_keys($composer['require']); - // $exclude = array_keys($composer['require-dev']); - // $vendorPath = 'build/share/modules/manticore-buddy/vendor'; - // /** @var array{dev:bool,dev-package-names:array} $installed */ - // $installed = json_decode( - // (string)file_get_contents("$vendorPath/composer/installed.json"), - // true - // ); - // $this->assertEquals(false, $installed['dev']); - // $this->assertEquals([], $installed['dev-package-names']); - - - // $vendorPathIterator = new RecursiveDirectoryIterator($vendorPath); - // $vendorPathLen = strlen($vendorPath); - // $packages = []; - // /** @var SplFileInfo $file */ - // foreach (new RecursiveIteratorIterator($vendorPathIterator) as $file) { - // $ns = strtok(substr((string)$file, $vendorPathLen), '/'); - // $name = strtok('/'); - // $packages["$ns/$name"] = true; - // } - - // $packages = array_keys($packages); - // $this->assertEquals([], array_diff($include, $packages)); - // $this->assertEquals($exclude, array_diff($exclude, $packages)); - // } - - // protected static function buildBinary(): void { - // system('phar_builder/bin/build --name="Manticore Buddy" --package="manticore-buddy"'); - // } + public function testBuildIsOk(): void { + static::buildBinary(); + $this->assertEquals(true, file_exists('build/share/modules/manticore-buddy/src/main.php')); + $this->assertEquals(true, file_exists('build/manticore-buddy')); + } + + public function testBuildHasRightComposerPackages(): void { + /** @var array{require:array,require-dev:array} $composer */ + $composer = json_decode((string)file_get_contents('/workdir/composer.json'), true); + $include = array_keys($composer['require']); + $exclude = array_keys($composer['require-dev']); + $vendorPath = 'build/share/modules/manticore-buddy/vendor'; + /** @var array{dev:bool,dev-package-names:array} $installed */ + $installed = json_decode( + (string)file_get_contents("$vendorPath/composer/installed.json"), + true + ); + $this->assertEquals(false, $installed['dev']); + $this->assertEquals([], $installed['dev-package-names']); + + + $vendorPathIterator = new RecursiveDirectoryIterator($vendorPath); + $vendorPathLen = strlen($vendorPath); + $packages = []; + /** @var SplFileInfo $file */ + foreach (new RecursiveIteratorIterator($vendorPathIterator) as $file) { + $ns = strtok(substr((string)$file, $vendorPathLen), '/'); + $name = strtok('/'); + $packages["$ns/$name"] = true; + } + + $packages = array_keys($packages); + $this->assertEquals([], array_diff($include, $packages)); + $this->assertEquals($exclude, array_diff($exclude, $packages)); + } + + protected static function buildBinary(): void { + system('phar_builder/bin/build --name="Manticore Buddy" --package="manticore-buddy"'); + } } diff --git a/test/Buddy/functional/CliTableTest.php b/test/Buddy/functional/CliTableTest.php index e33dbe89..075d3bb6 100644 --- a/test/Buddy/functional/CliTableTest.php +++ b/test/Buddy/functional/CliTableTest.php @@ -92,11 +92,12 @@ public function testCliQueryOk(): void { } $result = preg_match( "/\+-+\+-+\+-+\+-+\+\n" - . "\| id \| query\s+\| protocol \| host\s+\|\n" + . "\| id\s+\| query\s+\| protocol \| host\s+\|\n" . "\+-+\+-+\+-+\+-+\+\n" - . "\| \d+ | SHOW QUERIES \| http\s+\| 127\.0\.0\.1:\d+ \|\n" + . "\| \d+ | select \| http\s+\| 127\.0\.0\.1:\d+ \|\n" + . "\| [a-z0-9\.]+ | SHOW QUERIES \| http\s+\| 127\.0\.0\.1:\d+ \|\n" . "\+-+\+-+\+-+\+-+\+\n" - . "1 row in set \(\d\.\d{3} sec\)\n/s", + . "2 rows in set \(\d\.\d{3} sec\)\n/s", $out[0]['columns'] ); $this->assertEquals(1, $result); diff --git a/test/Buddy/functional/HungRequestTest.php b/test/Buddy/functional/HungRequestTest.php index 28b5b00d..b4e13037 100644 --- a/test/Buddy/functional/HungRequestTest.php +++ b/test/Buddy/functional/HungRequestTest.php @@ -42,12 +42,8 @@ public function tearDown(): void { public function testDeferredHungRequestHandling(): void { $port = static::getListenHttpPort(); - $task1 = Task::create(...$this->generateTaskArgs([$port, 'test 4/deferred'])); - $task2 = Task::create(...$this->generateTaskArgs([$port, 'show queries'])); - $task1->run(); - usleep(1000000); - $task2->run(); - usleep(1000000); + $task1 = Task::create(...$this->generateTaskArgs([$port, 'test 4/deferred']))->run(); + $task2 = Task::create(...$this->generateTaskArgs([$port, 'show queries']))->run(); $this->assertEquals(TaskStatus::Finished, $task1->getStatus()); $this->assertEquals(true, $task1->isSucceed()); @@ -79,17 +75,14 @@ public function testDeferredHungRequestHandling(): void { */ public function testHungRequestHandling(): void { $port = static::getListenHttpPort(); - $task1 = Task::create(...$this->generateTaskArgs([$port, 'test 3'])); - $task2 = Task::create(...$this->generateTaskArgs([$port, 'show queries'])); - $task1->run(); - usleep(500000); - $task2->run(); - usleep(500000); - - $this->assertEquals(TaskStatus::Running, $task1->getStatus()); - $this->assertEquals(TaskStatus::Finished, $task2->getStatus()); - sleep(4); + $t = time(); + $task1 = Task::create(...$this->generateTaskArgs([$port, 'test 3']))->run(); + $task2 = Task::create(...$this->generateTaskArgs([$port, 'show queries']))->run(); + $diff = time() - $t; + // We check diff here cuz we usin exec, that is blocking for coroutine + $this->assertEquals(3, $diff); $this->assertEquals(TaskStatus::Finished, $task1->getStatus()); + $this->assertEquals(TaskStatus::Finished, $task2->getStatus()); $this->assertEquals([['total' => 0, 'error' => '', 'warning' => '']], $task1->getResult()->getStruct()); $this->assertEquals(TaskStatus::Finished, $task2->getStatus()); $this->assertEquals(true, $task2->isSucceed()); diff --git a/test/Buddy/src/CliTable/CliTableHandlerTest.php b/test/Buddy/src/CliTable/CliTableHandlerTest.php index fc95ea40..0b0a627f 100644 --- a/test/Buddy/src/CliTable/CliTableHandlerTest.php +++ b/test/Buddy/src/CliTable/CliTableHandlerTest.php @@ -58,12 +58,18 @@ public function testCliTableOk(): void { $refCls = new ReflectionClass($handler); $refCls->getProperty('manticoreClient')->setValue($handler, $manticoreClient); $refCls->getProperty('tableFormatter')->setValue($handler, $tableFormatter); - $task = $handler->run(); - $task->wait(); - $this->assertEquals(true, $task->isSucceed()); - $result = $task->getResult()->getStruct(); - $this->assertIsString($result); - $this->assertStringContainsString($respBody, $result); - self::finishMockManticoreServer(); + + go( + function () use ($handler, $respBody) { + $task = $handler->run(); + $task->wait(); + + $this->assertEquals(true, $task->isSucceed()); + $result = $task->getResult()->getStruct(); + $this->assertIsString($result); + $this->assertStringContainsString($respBody, $result); + self::finishMockManticoreServer(); + } + ); } } diff --git a/test/Buddy/src/InsertQuery/InsertQueryHandlerTest.php b/test/Buddy/src/InsertQuery/InsertQueryHandlerTest.php index 460147b5..94dbc325 100644 --- a/test/Buddy/src/InsertQuery/InsertQueryHandlerTest.php +++ b/test/Buddy/src/InsertQuery/InsertQueryHandlerTest.php @@ -141,13 +141,17 @@ protected function runTask(Request $networkRequest, string $serverUrl, string $r $handler = new Handler($payload); $handler->setManticoreClient($manticoreClient); ob_flush(); - $task = $handler->run(); - $task->wait(); + go( + function () use ($handler, $resp) { + $task = $handler->run(); + $task->wait(); - $this->assertEquals(true, $task->isSucceed()); - /** @var Response */ - $result = $task->getResult()->getStruct(); - $this->assertEquals($resp, json_encode($result)); + $this->assertEquals(true, $task->isSucceed()); + /** @var Response */ + $result = $task->getResult()->getStruct(); + $this->assertEquals($resp, json_encode($result)); + } + ); } public function testInsertQueryExecutesProperly(): void { diff --git a/test/Buddy/src/Lib/TaskTest.php b/test/Buddy/src/Lib/TaskTest.php deleted file mode 100644 index 90544f76..00000000 --- a/test/Buddy/src/Lib/TaskTest.php +++ /dev/null @@ -1,116 +0,0 @@ -assertEquals(false, $task->isDeferred()); - $this->assertEquals(TaskStatus::Pending, $task->getStatus()); - $task->run(); - $this->assertEquals(TaskStatus::Running, $task->getStatus()); - usleep(2500000); - $this->assertEquals(TaskStatus::Finished, $task->getStatus()); - $this->assertEquals(true, $task->isSucceed()); - $this->assertEquals('ok', $task->getResult()->getStruct()); - } - - // public function testTaskParallelRunWithArgumentsSucceed(): void { - // echo "\nTesting the task parallel run with arguments succeed\n"; - - // $arg = 'Hello world'; - // $task = Task::create( - // static function (string $arg): TaskResult { - // return TaskResult::raw($arg); - // }, - // [$arg] - // ); - - // $this->assertEquals(TaskStatus::Pending, $task->getStatus()); - // $task->run(); - // $this->assertEquals(TaskStatus::Running, $task->getStatus()); - // usleep(2500000); - // $this->assertEquals(TaskStatus::Finished, $task->getStatus()); - // $this->assertEquals(true, $task->isSucceed()); - // $this->assertEquals($arg, $task->getResult()->getStruct()); - // } - - // public function testTaskReturnsGenericErrorOnException(): void { - // echo "\nTesting the task's exception converts to generic error\n"; - // $errorMessage = 'Here we go'; - // $task = Task::create( - // static function () use ($errorMessage): bool { - // throw new Exception($errorMessage); - // } - // ); - // $this->assertEquals(TaskStatus::Pending, $task->getStatus()); - // $task->run(); - // $this->assertEquals(TaskStatus::Running, $task->getStatus()); - // usleep(500000); - // $this->assertEquals(TaskStatus::Finished, $task->getStatus()); - // $this->assertEquals(false, $task->isSucceed()); - // $error = $task->getError(); - // $this->assertEquals(true, $error instanceof GenericError); - // $this->assertEquals(Exception::class . ': ' . $errorMessage, $error->getMessage()); - // $this->assertEquals($errorMessage, $error->getResponseError()); - // } - - // public function testTaskReturnsGenericErrorOnCustomException(): void { - // echo "\nTesting the task's custom exception converts to generic error\n"; - // $errorMessage = 'Custom error message'; - // $task = Task::create( - // static function () use ($errorMessage): bool { - // throw new BuddyRequestError($errorMessage); - // } - // ); - // $this->assertEquals(TaskStatus::Pending, $task->getStatus()); - // $task->run(); - // $this->assertEquals(TaskStatus::Running, $task->getStatus()); - // usleep(500000); - // $this->assertEquals(TaskStatus::Finished, $task->getStatus()); - // $this->assertEquals(false, $task->isSucceed()); - // $error = $task->getError(); - // $this->assertEquals(true, $error instanceof GenericError); - // $this->assertEquals(BuddyRequestError::class . ': ' . $errorMessage, $error->getMessage()); - // $this->assertEquals($errorMessage, $error->getResponseError()); - // } - - // public function testTaskDeferredHasFLag(): void { - // echo "\nTesting the task parallel run has deferred flag\n"; - // $task = Task::defer( - // static function (): TaskResult { - // usleep(2000000); - // return TaskResult::raw('ok'); - // } - // ); - // $this->assertEquals(true, $task->isDeferred()); - // $this->assertEquals(TaskStatus::Pending, $task->getStatus()); - // $task->run(); - // $this->assertEquals(TaskStatus::Running, $task->getStatus()); - // usleep(2500000); - // $this->assertEquals(TaskStatus::Finished, $task->getStatus()); - // $this->assertEquals(true, $task->isSucceed()); - // $this->assertEquals('ok', $task->getResult()->getStruct()); - // } -} diff --git a/test/Buddy/src/Network/ManticoreClient/HTTPClientTest.php b/test/Buddy/src/Network/ManticoreClient/HTTPClientTest.php index e3ce5f34..f467bbf9 100644 --- a/test/Buddy/src/Network/ManticoreClient/HTTPClientTest.php +++ b/test/Buddy/src/Network/ManticoreClient/HTTPClientTest.php @@ -11,7 +11,6 @@ //use Manticoresearch\Buddy\Core\Error\ManticoreSearchClientError; use Manticoresearch\Buddy\Core\ManticoreSearch\Client as HTTPClient; -use Manticoresearch\Buddy\Core\ManticoreSearch\Endpoint as ManticoreEndpoint; use Manticoresearch\Buddy\Core\ManticoreSearch\Response; use Manticoresearch\BuddyTest\Trait\TestProtectedTrait; use PHPUnit\Framework\TestCase; @@ -41,12 +40,8 @@ public function testManticoreHTTPClientCreate(): void { HTTPClient::DEFAULT_URL, $this->refCls->getProperty('url')->getValue($this->client) ); - $this->assertEquals( - ManticoreEndpoint::Sql->value, - $this->refCls->getProperty('path')->getValue($this->client) - ); - $client = new HTTPClient(new Response(), 'localhost:1000', ManticoreEndpoint::Insert); + $client = new HTTPClient(new Response(), 'localhost:1000'); $this->assertInstanceOf(HTTPClient::class, $client); } diff --git a/test/Buddy/src/Network/ServerTest.php b/test/Buddy/src/Network/ServerTest.php deleted file mode 100644 index ccb65406..00000000 --- a/test/Buddy/src/Network/ServerTest.php +++ /dev/null @@ -1,178 +0,0 @@ -tearDown(); - } - self::$server = Server::create(); - $refCls = new ReflectionClass(self::$server); - $socket = $refCls->getProperty('socket')->getValue(self::$server); - if (!is_object($socket) || !is_a($socket, SocketServer::class)) { - $this->fail(); - } - return $socket; - } - - public function tearDown(): void { - if (self::$onTearDown !== null) { - $fn = self::$onTearDown; - $fn(); - } - if (self::$server === null) { - return; - } - // Closing an open Server socket to let unitest finish properly - $refCls = new ReflectionClass(self::$server); - $socket = $refCls->getProperty('socket')->getValue(self::$server); - if (!is_object($socket) || !is_a($socket, SocketServer::class)) { - $this->fail(); - } - $socket->close(); - } - - public function testServerCreate(): void { - echo "\nTesting the creation of a Server instance \n"; - self::$server = Server::create(); - $this->assertInstanceOf(Server::class, self::$server); - } - - public function testListenAddr(): void { - echo "\nTesting the generation of a random listen address\n"; - $socket = $this->getSocket(); - $addr1 = $socket->getAddress(); - $socket = $this->getSocket(); - $addr2 = $socket->getAddress(); - $this->assertNotEquals($addr1, $addr2); - } - - public function testServerOutput(): void { - echo "\nTesting the starting output from Server\n"; - // Using flush here to not add unit test output to server output - ob_flush(); - $socket = $this->getSocket(); - $addr = $socket->getAddress(); - if (self::$server === null || $addr === null) { - $this->fail(); - } - $addr = trim($addr, 'tcp://'); - $version = Buddy::getVersion(); - $this->expectOutputString("Buddy v{$version} started {$addr}\n"); - try { - self::$server->start(); - } catch (Exception $e) { - } - } - - // public function testServerTicker(): void { - // echo "\nTesting the execution of server tickers\n"; - // $testFilepath = sys_get_temp_dir() . DIRECTORY_SEPARATOR . 'test.log'; - // $server = Server::create(); - // $server - // ->addHandler('request', EventHandler::request(...)) - // ->addHandler('error', EventHandler::error(...)) - // ->addTicker( - // static function () use ($testFilepath) { - // file_put_contents($testFilepath, 'Ok'); - // }, 1 - // ); - - // try { - // $server->start(); - // sleep(2); - // $server->stop(false); - // } catch (Throwable $t) { - // ob_flush(); - // } - - // $this->assertEquals('Ok', file_get_contents($testFilepath)); - // unlink($testFilepath); - // } - - public function testClientTicker(): void { - echo "\nTesting the execution of client tickers\n"; - $this->serverTestHelper('ticker'); - } - - public function testClientHandler(): void { - echo "\nTesting the execution of server handlers\n"; - $this->serverTestHelper('handler'); - } - - /** - * Helper function for the testing of Server connection - * - * @param string $testTarget - * @return void - */ - protected function serverTestHelper(string $testTarget): void { - $testFilepath = getcwd() . DIRECTORY_SEPARATOR . 'test.log'; - $fn = function () use ($testFilepath) { - file_put_contents($testFilepath, 'Ok'); - Loop::stop(); - }; - - $server = Server::create(); - $server->addHandler('request', EventHandler::request(...)); - switch ($testTarget) { - case 'ticker': - $server->addTicker($fn, 1); - break; - case 'handler': - $server->addHandler('close', $fn); - break; - default: - break; - } - $server->start(); - - $refCls = new ReflectionClass($server); - $socket = $refCls->getProperty('socket')->getValue($server); - $this->assertInstanceOf(SocketServer::class, $socket); - - $addr = $socket->getAddress(); - $this->assertIsString($addr); - - /** @var string $addr */ - $addr = trim($addr, 'tcp://'); - [$host, $port] = explode(':', $addr); - $clientSocket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP); - $this->assertInstanceOf(Socket::class, $clientSocket); - /** @var Socket $clientSocket */ - socket_connect($clientSocket, $host, (int)$port); - socket_close($clientSocket); - - sleep(1); - } - -} diff --git a/test/Buddy/src/ShowQueries/ExecutorTest.php b/test/Buddy/src/ShowQueries/ExecutorTest.php index be492e42..cd72eb28 100644 --- a/test/Buddy/src/ShowQueries/ExecutorTest.php +++ b/test/Buddy/src/ShowQueries/ExecutorTest.php @@ -59,12 +59,16 @@ public function testShowQueriesExecutesProperly(): void { $handler = new Handler($payload); $handler->setManticoreClient($manticoreClient); $handler->setTableFormatter(new TableFormatter()); - $task = $handler->run(); - $task->wait(); - $this->assertEquals(true, $task->isSucceed()); - $result = $task->getResult()->getStruct(); - $this->assertIsArray($result); - $this->assertEquals($respBody, $result); - self::finishMockManticoreServer(); + go( + function () use ($handler, $respBody) { + $task = $handler->run(); + $task->wait(); + $this->assertEquals(true, $task->isSucceed()); + $result = $task->getResult()->getStruct(); + $this->assertIsArray($result); + $this->assertEquals($respBody, $result); + self::finishMockManticoreServer(); + } + ); } }