Skip to content

Commit

Permalink
Fix task pool and improve Client to fix stateful model issues, fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
donhardman committed Oct 24, 2023
1 parent 150d5b6 commit 38c02b0
Show file tree
Hide file tree
Showing 11 changed files with 126 additions and 395 deletions.
13 changes: 9 additions & 4 deletions src/Network/EventHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use Manticoresearch\Buddy\Core\Network\Request;
use Manticoresearch\Buddy\Core\Network\Response;
use Manticoresearch\Buddy\Core\Task\Column;
use Manticoresearch\Buddy\Core\Task\TaskPool;
use Manticoresearch\Buddy\Core\Task\TaskResult;
use Swoole\Http\Request as SwooleRequest;
use Swoole\Http\Response as SwooleResponse;
Expand Down Expand Up @@ -58,8 +59,9 @@ public static function request(SwooleRequest $request, SwooleResponse $response)
return;
}

$requestId = $request->header['Request-ID'] ?? '0';
$result = static::process($requestId, $request->rawContent() ?: '');
$requestId = $request->header['Request-ID'] ?? uniqid(more_entropy: true);
$body = $request->rawContent() ?: '';
$result = static::process($requestId, $body);
// Send response
$response->header('Content-Type', 'application/json');
$response->status(200);
Expand All @@ -76,10 +78,13 @@ public static function process(string $id, string $payload): Response {
try {
$request = Request::fromString($payload, $id);
$handler = QueryProcessor::process($request)->run();

// In case deferred we return the ID of the task not the request
if ($handler->isDeferred()) {
$result = TaskResult::withData([['id' => $handler->getId()]])
->column('id', Column::Long);
$doneFn = TaskPool::add($id, $request->payload);
$handler->on('failure', $doneFn)->on('success', $doneFn);
$result = TaskResult::withData([['id' => $id]])
->column('id', Column::String);
} else {
$handler->wait(true);
$result = $handler->getResult();
Expand Down
43 changes: 30 additions & 13 deletions src/Network/Server.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

namespace Manticoresearch\Buddy\Base\Network;

use Ds\Set;
use Exception;
use Manticoresearch\Buddy\Core\Network\Response;
use Manticoresearch\Buddy\Core\Tool\Buddy;
Expand Down Expand Up @@ -44,6 +45,9 @@ final class Server {
/** @var int $ppid */
protected int $ppid;

/** @var Set<int> $workerIds */
protected Set $workerIds;

/**
* @param array<string,mixed> $config
* @return void
Expand All @@ -52,6 +56,16 @@ public function __construct(array $config = []) {
$this->socket = new SwooleServer('127.0.0.1', 0);
$this->socket->set($config);
$this->ppid = posix_getppid();

$this->addTicker(
function () {
if (Process::kill($this->ppid, 0)) {
return;
}
Buddy::debug('Parrent proccess died, exiting…');
$this->stop();
}, 5
);
}

/**
Expand Down Expand Up @@ -182,23 +196,19 @@ public function start(): static {
// @phpcs:ignore Generic.CodeAnalysis.UnusedFunctionParameter.FoundBeforeLastUsed, SlevomatCodingStandard.Functions.UnusedParameter.UnusedParameter
'WorkerStart', function (SwooleServer $server, int $workerId) {
if ($workerId !== 0) {
if (!isset($this->workerIds)) {
$this->workerIds = new Set;
}

$this->workerIds->add($workerId);
return;
}
Timer::tick(
5000, function () {
if (Process::kill($this->ppid, 0)) {
return;
}

Buddy::debug('Parrent proccess died, exiting…');
$this->stop();
}
);

// First add all ticks to run periodically
// First add all ticks to run periodically
foreach ($this->ticks as [$fn, $period]) {
Timer::tick(
$period * 1000, static function (/*int $timerId*/) use ($fn) {
$period * 1000,
static function (/*int $timerId*/) use ($fn) {
go($fn);
}
);
Expand Down Expand Up @@ -234,12 +244,19 @@ public function process(string $requestId, string $payload): Response {
* @return static
*/
public function stop($exit = true): static {
echo 'stop';
Timer::clearAll();
$this->socket->stop();
if (isset($this->workerIds)) {
foreach ($this->workerIds as $workerId) {
$this->socket->stop($workerId);
}
}
$this->socket->shutdown();
if ($exit) {
// exec('pgrep -f executor | xargs kill -9');
exit(0);
}

return $this;
}
}
78 changes: 39 additions & 39 deletions test/Buddy/functional/BuildTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,43 +16,43 @@
* And this version of binary run and also does not include any of dev composer deps in it
*/
final class BuildTest extends TestCase {
// public function testBuildIsOk(): void {
// static::buildBinary();
// $this->assertEquals(true, file_exists('build/share/modules/manticore-buddy/src/main.php'));
// $this->assertEquals(true, file_exists('build/manticore-buddy'));
// }

// public function testBuildHasRightComposerPackages(): void {
// /** @var array{require:array<string,string>,require-dev:array<string,string>} $composer */
// $composer = json_decode((string)file_get_contents('/workdir/composer.json'), true);
// $include = array_keys($composer['require']);
// $exclude = array_keys($composer['require-dev']);
// $vendorPath = 'build/share/modules/manticore-buddy/vendor';
// /** @var array{dev:bool,dev-package-names:array<string,string>} $installed */
// $installed = json_decode(
// (string)file_get_contents("$vendorPath/composer/installed.json"),
// true
// );
// $this->assertEquals(false, $installed['dev']);
// $this->assertEquals([], $installed['dev-package-names']);


// $vendorPathIterator = new RecursiveDirectoryIterator($vendorPath);
// $vendorPathLen = strlen($vendorPath);
// $packages = [];
// /** @var SplFileInfo $file */
// foreach (new RecursiveIteratorIterator($vendorPathIterator) as $file) {
// $ns = strtok(substr((string)$file, $vendorPathLen), '/');
// $name = strtok('/');
// $packages["$ns/$name"] = true;
// }

// $packages = array_keys($packages);
// $this->assertEquals([], array_diff($include, $packages));
// $this->assertEquals($exclude, array_diff($exclude, $packages));
// }

// protected static function buildBinary(): void {
// system('phar_builder/bin/build --name="Manticore Buddy" --package="manticore-buddy"');
// }
public function testBuildIsOk(): void {
static::buildBinary();
$this->assertEquals(true, file_exists('build/share/modules/manticore-buddy/src/main.php'));
$this->assertEquals(true, file_exists('build/manticore-buddy'));
}

public function testBuildHasRightComposerPackages(): void {
/** @var array{require:array<string,string>,require-dev:array<string,string>} $composer */
$composer = json_decode((string)file_get_contents('/workdir/composer.json'), true);
$include = array_keys($composer['require']);
$exclude = array_keys($composer['require-dev']);
$vendorPath = 'build/share/modules/manticore-buddy/vendor';
/** @var array{dev:bool,dev-package-names:array<string,string>} $installed */
$installed = json_decode(
(string)file_get_contents("$vendorPath/composer/installed.json"),
true
);
$this->assertEquals(false, $installed['dev']);
$this->assertEquals([], $installed['dev-package-names']);


$vendorPathIterator = new RecursiveDirectoryIterator($vendorPath);
$vendorPathLen = strlen($vendorPath);
$packages = [];
/** @var SplFileInfo $file */
foreach (new RecursiveIteratorIterator($vendorPathIterator) as $file) {
$ns = strtok(substr((string)$file, $vendorPathLen), '/');
$name = strtok('/');
$packages["$ns/$name"] = true;
}

$packages = array_keys($packages);
$this->assertEquals([], array_diff($include, $packages));
$this->assertEquals($exclude, array_diff($exclude, $packages));
}

protected static function buildBinary(): void {
system('phar_builder/bin/build --name="Manticore Buddy" --package="manticore-buddy"');
}
}
7 changes: 4 additions & 3 deletions test/Buddy/functional/CliTableTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,12 @@ public function testCliQueryOk(): void {
}
$result = preg_match(
"/\+-+\+-+\+-+\+-+\+\n"
. "\| id \| query\s+\| protocol \| host\s+\|\n"
. "\| id\s+\| query\s+\| protocol \| host\s+\|\n"
. "\+-+\+-+\+-+\+-+\+\n"
. "\| \d+ | SHOW QUERIES \| http\s+\| 127\.0\.0\.1:\d+ \|\n"
. "\| \d+ | select \| http\s+\| 127\.0\.0\.1:\d+ \|\n"
. "\| [a-z0-9\.]+ | SHOW QUERIES \| http\s+\| 127\.0\.0\.1:\d+ \|\n"
. "\+-+\+-+\+-+\+-+\+\n"
. "1 row in set \(\d\.\d{3} sec\)\n/s",
. "2 rows in set \(\d\.\d{3} sec\)\n/s",
$out[0]['columns']
);
$this->assertEquals(1, $result);
Expand Down
25 changes: 9 additions & 16 deletions test/Buddy/functional/HungRequestTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,8 @@ public function tearDown(): void {

public function testDeferredHungRequestHandling(): void {
$port = static::getListenHttpPort();
$task1 = Task::create(...$this->generateTaskArgs([$port, 'test 4/deferred']));
$task2 = Task::create(...$this->generateTaskArgs([$port, 'show queries']));
$task1->run();
usleep(1000000);
$task2->run();
usleep(1000000);
$task1 = Task::create(...$this->generateTaskArgs([$port, 'test 4/deferred']))->run();
$task2 = Task::create(...$this->generateTaskArgs([$port, 'show queries']))->run();

$this->assertEquals(TaskStatus::Finished, $task1->getStatus());
$this->assertEquals(true, $task1->isSucceed());
Expand Down Expand Up @@ -79,17 +75,14 @@ public function testDeferredHungRequestHandling(): void {
*/
public function testHungRequestHandling(): void {
$port = static::getListenHttpPort();
$task1 = Task::create(...$this->generateTaskArgs([$port, 'test 3']));
$task2 = Task::create(...$this->generateTaskArgs([$port, 'show queries']));
$task1->run();
usleep(500000);
$task2->run();
usleep(500000);

$this->assertEquals(TaskStatus::Running, $task1->getStatus());
$this->assertEquals(TaskStatus::Finished, $task2->getStatus());
sleep(4);
$t = time();
$task1 = Task::create(...$this->generateTaskArgs([$port, 'test 3']))->run();
$task2 = Task::create(...$this->generateTaskArgs([$port, 'show queries']))->run();
$diff = time() - $t;
// We check diff here cuz we usin exec, that is blocking for coroutine
$this->assertEquals(3, $diff);
$this->assertEquals(TaskStatus::Finished, $task1->getStatus());
$this->assertEquals(TaskStatus::Finished, $task2->getStatus());
$this->assertEquals([['total' => 0, 'error' => '', 'warning' => '']], $task1->getResult()->getStruct());
$this->assertEquals(TaskStatus::Finished, $task2->getStatus());
$this->assertEquals(true, $task2->isSucceed());
Expand Down
20 changes: 13 additions & 7 deletions test/Buddy/src/CliTable/CliTableHandlerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,18 @@ public function testCliTableOk(): void {
$refCls = new ReflectionClass($handler);
$refCls->getProperty('manticoreClient')->setValue($handler, $manticoreClient);
$refCls->getProperty('tableFormatter')->setValue($handler, $tableFormatter);
$task = $handler->run();
$task->wait();
$this->assertEquals(true, $task->isSucceed());
$result = $task->getResult()->getStruct();
$this->assertIsString($result);
$this->assertStringContainsString($respBody, $result);
self::finishMockManticoreServer();

go(
function () use ($handler, $respBody) {
$task = $handler->run();
$task->wait();

$this->assertEquals(true, $task->isSucceed());
$result = $task->getResult()->getStruct();
$this->assertIsString($result);
$this->assertStringContainsString($respBody, $result);
self::finishMockManticoreServer();
}
);
}
}
16 changes: 10 additions & 6 deletions test/Buddy/src/InsertQuery/InsertQueryHandlerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -141,13 +141,17 @@ protected function runTask(Request $networkRequest, string $serverUrl, string $r
$handler = new Handler($payload);
$handler->setManticoreClient($manticoreClient);
ob_flush();
$task = $handler->run();
$task->wait();
go(
function () use ($handler, $resp) {
$task = $handler->run();
$task->wait();

$this->assertEquals(true, $task->isSucceed());
/** @var Response */
$result = $task->getResult()->getStruct();
$this->assertEquals($resp, json_encode($result));
$this->assertEquals(true, $task->isSucceed());
/** @var Response */
$result = $task->getResult()->getStruct();
$this->assertEquals($resp, json_encode($result));
}
);
}

public function testInsertQueryExecutesProperly(): void {
Expand Down
Loading

0 comments on commit 38c02b0

Please sign in to comment.