Skip to content

Commit

Permalink
Plugin controllable process (#201)
Browse files Browse the repository at this point in the history
* Add possibility to run process and communicated with it owned by plugin

* Update the Process launching logic

---------

Co-authored-by: Sergey Nikolaev <[email protected]>
  • Loading branch information
donhardman and sanikolaev authored Feb 8, 2024
1 parent 191ad89 commit 10a0be1
Show file tree
Hide file tree
Showing 7 changed files with 146 additions and 11 deletions.
45 changes: 45 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,51 @@ ghcr.io/manticoresoftware/manticoresearch:test-kit-latest -c './phar_builder/bin
Check the build directory and get the built version of Buddy from there and replace it in your another OS in the "modules" directory.
#### Run custom process inside the Plugin
To run the process that can maintain some logic and communicate you need to create the `Processor` class and add `getProcessors` method to the `Payload`
Here is the example that explain how to do So
Create `Processor` plugin and implement required logic
```php
<?php declare(strict_types=1);
… your NS and other copywrite here …
use Manticoresearch\Buddy\Core\Process\BaseProcessor;
use Manticoresearch\Buddy\Core\Process\Process;
final class Processor extends BaseProcessor {
public function start(): void {
var_dump('starting');
parent::start();
$this->execute('test', ['simple message']);
}
public function stop(): void {
var_dump('stopping');
parent::stop();
}
public static function test(string $text): void {
var_dump($text);
}
}
```
Add to the `Payload` info that your plugin has processors
```php
public static function getProcessors(): array {
return [
new Processor(),
];
}
````
### Communication protocol
Manticore Buddy and Manticore Search communicates with each other in strict accordance with the following protocol:
Expand Down
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
"react/socket": "^1.12",
"manticoresoftware/telemetry": "^0.1.9",
"symfony/dependency-injection": "^6.1",
"manticoresoftware/buddy-core": "dev-main",
"manticoresoftware/buddy-core": "dev-feature/controllable-process",
"php-ds/php-ds": "^1.4",
"manticoresoftware/manticoresearch-backup": "^1.3"
},
Expand Down
15 changes: 7 additions & 8 deletions composer.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 42 additions & 0 deletions src/Lib/QueryProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
use Manticoresearch\Buddy\Core\Network\Request;
use Manticoresearch\Buddy\Core\Plugin\BaseHandler;
use Manticoresearch\Buddy\Core\Plugin\Pluggable;
use Manticoresearch\Buddy\Core\Process\BaseProcessor;
use Manticoresearch\Buddy\Core\Tool\Buddy;
use Manticoresearch\Buddy\Core\Tool\Strings;
use Psr\Container\ContainerInterface;
Expand Down Expand Up @@ -106,6 +107,47 @@ public static function init(): void {
static::$isInited = true;
}

/**
* Run start method of all plugin handlers
* @return void
*/
public static function startPlugins(): void {
static::iteratePluginProcessors(
static function (BaseProcessor $processor) {
$processor->start();
}
);
}

/**
* Run stop method of all plugin handlers
* @return void
*/
public static function stopPlugins(): void {
static::iteratePluginProcessors(
static function (BaseProcessor $processor) {
$processor->stop();
}
);
}

/**
* @param callable $fn
* @return void
*/
protected static function iteratePluginProcessors(callable $fn): void {
$list = [
static::CORE_NS_PREFIX => static::$corePlugins,
static::EXTRA_NS_PREFIX => static::$extraPlugins,
];
foreach ($list as $prefix => $plugins) {
foreach ($plugins as $plugin) {
$pluginPrefix = $prefix . ucfirst(Strings::camelcaseBySeparator($plugin['short'], '-'));
$pluginPayloadClass = "$pluginPrefix\\Payload";
array_map($fn, $pluginPayloadClass::getProcessors());
}
}
}
/**
* Helper to set settings when we need it before calling to init
* @param Settings $settings
Expand Down
2 changes: 0 additions & 2 deletions src/Network/EventHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ protected static function shouldProxyError(Throwable $e): bool {
|| ($e instanceof GenericError && $e->getProxyOriginalError());
}



/**
* Main handler for HTTP request that returns HttpResponse
*
Expand Down
40 changes: 40 additions & 0 deletions src/Network/Server.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ final class Server {
/** @var array<callable> */
protected array $beforeStart = [];

/** @var array<callable> */
protected array $onstop = [];

/** @var array<callable> */
protected array $beforeStop = [];

/** @var string $bind */
protected string $bind;

Expand Down Expand Up @@ -115,6 +121,16 @@ public function onStart(callable $fn): static {
return $this;
}

/**
* Add function to be called and processed on server stop
* @param callable $fn
* @return static
*/
public function onStop(callable $fn): static {
$this->onstop[] = $fn;
return $this;
}

/**
* Add function to be called before we starting server
* @param callable $fn
Expand All @@ -125,6 +141,16 @@ public function beforeStart(callable $fn): static {
return $this;
}

/**
* Add function to be colled before the shutdown
* @param callable $fn
* @return static
*/
public function beforeStop(callable $fn): static {
$this->beforeStop[] = $fn;
return $this;
}

/**
* Add process to the server loop
* @param Process $process
Expand Down Expand Up @@ -192,6 +218,15 @@ public function start(): static {
}
);

// Register shutdown on stop
$this->socket->on(
'shutdown', function () {
// Process first functions to run on start
foreach ($this->onstop as $fn) {
$fn();
}
}
);

$this->socket->on(
// @phpcs:ignore Generic.CodeAnalysis.UnusedFunctionParameter.FoundBeforeLastUsed, SlevomatCodingStandard.Functions.UnusedParameter.UnusedParameter
Expand Down Expand Up @@ -246,6 +281,11 @@ public function process(string $requestId, string $payload): Response {
*/
public function stop($exit = true): static {
Timer::clearAll();
// Process first functions to run on start
foreach ($this->beforeStop as $fn) {
$fn();
}

$this->socket->stop();
if (isset($this->workerIds)) {
foreach ($this->workerIds as $workerId) {
Expand Down
11 changes: 11 additions & 0 deletions src/main.php
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,17 @@ static function () {
ini_set('post_max_size', $settings->maxAllowedPacket);
}
)
// We need to run it outside of couroutine so do it before
->beforeStart(
static function () {
QueryProcessor::startPlugins();
}
)
->beforeStop(
static function () {
QueryProcessor::stopPlugins();
}
)
->beforeStart(
static function () use ($server) {
$process = ShardingThread::instance()->process;
Expand Down

0 comments on commit 10a0be1

Please sign in to comment.