Skip to content

Commit

Permalink
Merge pull request #1 from SAREhub/first
Browse files Browse the repository at this point in the history
First version
  • Loading branch information
Mararok authored Oct 25, 2016
2 parents 4013185 + 87ea383 commit 2cbc81a
Show file tree
Hide file tree
Showing 74 changed files with 4,781 additions and 4 deletions.
1 change: 0 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
sudo: false
language: php
php:
- 5.6
- 7.0
branches:
only:
Expand Down
28 changes: 28 additions & 0 deletions cli/config.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?php

use SAREhub\Commons\Misc\Dsn;

return [
'manager' => [
'configRootPath' => __DIR__.'/managers',
'forwarders' => [
'commandOutput' => [
'input' => Dsn::tcp()->endpoint('127.0.0.1:40001'),
'output' => Dsn::tcp()->endpoint('127.0.0.1:40002')
],
'commandReplyInput' => [
'input' => Dsn::tcp()->endpoint('127.0.0.1:40005'),
'output' => Dsn::tcp()->endpoint('127.0.0.1:40006')
]
],
],
'commandService' => [
'commandOutput' => [
'endpoint' => Dsn::tcp()->endpoint('127.0.0.1:40001')
],
'commandReplyInput' => [
'topic' => 'worker-cli',
'endpoint' => Dsn::tcp()->endpoint('127.0.0.1:40006')
]
]
];
36 changes: 36 additions & 0 deletions cli/managers/test.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?php

use Monolog\Handler\StreamHandler;
use Monolog\Logger;
use SAREhub\Commons\Misc\Dsn;
use SAREhub\Component\Worker\Manager\ManagerCommands;

return [
'id' => 'test',
'manager' => [
'processService' => [
'runnerScript' => dirname(__DIR__).'/test_worker/testWorkerRunner.php',
'arguments' => [],
'workingDirectory' => './'
],
'commandService' => [
'commandOutput' => [
'endpoint' => Dsn::tcp()->endpoint('127.0.0.1:40003')
],
'commandReplyInput' => [
'topic' => 'worker.command.reply',
'endpoint' => Dsn::tcp()->endpoint('127.0.0.1:40004')
]
],
'startCommands' => [
ManagerCommands::start('1', '1'),
ManagerCommands::start('2', '2'),
ManagerCommands::start('3', '3')
]
],
'logger' => [
'handlers' => [
new StreamHandler(dirname(__DIR__).'/test_worker/managerLog', Logger::DEBUG)
]
]
];
76 changes: 76 additions & 0 deletions cli/test_worker/testWorkerRunner.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
<?php

date_default_timezone_set('Europe/Warsaw');

use Monolog\Handler\StreamHandler;
use Monolog\Logger;
use SAREhub\Commons\Misc\Dsn;
use SAREhub\Commons\Zmq\PublishSubscribe\Publisher;
use SAREhub\Commons\Zmq\PublishSubscribe\Subscriber;
use SAREhub\Component\Worker\BasicWorker;
use SAREhub\Component\Worker\Command\Command;
use SAREhub\Component\Worker\Command\JsonCommandFormat;
use SAREhub\Component\Worker\Command\ZmqCommandInput;
use SAREhub\Component\Worker\Command\ZmqCommandReplyOutput;
use SAREhub\Component\Worker\WorkerContext;
use SAREhub\Component\Worker\WorkerRunner;

require dirname(dirname(__DIR__)).'/vendor/autoload.php';

class TestWorker extends BasicWorker {

protected function doStart() {
$this->logInfo('doStart');
}

protected function doTick() {
$this->logInfo('doTick');
sleep(1); // hard work simulation
}

protected function doStop() {
$this->logInfo('doStop');
}

protected function doCommand(Command $command, callable $replyCallback) {
$this->logInfo('doCommand: '.$command);
}

private function logInfo($message) {
$this->getLogger()->info(sprintf($message, $this->getId()));
}
}

$context = WorkerContext::newInstance()
->withId($argv[1])
->withRootPath(__DIR__);

$logHandler = new StreamHandler(__DIR__.'/log', Logger::DEBUG);
$logger = new Logger('test_worker_'.$context->getId(), [$logHandler]);
try {
$zmqContext = new ZMQContext();
$runner = WorkerRunner::newInstance()
->withWorker(new TestWorker($context))
->withCommandInput(ZmqCommandInput::newInstance()
->withCommandSubscriber(Subscriber::inContext($zmqContext)
->subscribe($context->getId())
->connect(Dsn::tcp()->endpoint('127.0.0.1:40003'))
)
->withCommandFormat(JsonCommandFormat::newInstance()))
->withCommandReplyOutput(ZmqCommandReplyOutput::newInstance()
->withPublisher(Publisher::inContext($zmqContext)
->connect(Dsn::tcp()->endpoint('127.0.0.1:40004')))
->withPublishTopic('worker.command.reply')
)->usePcntl();

$runner->getWorker()->setLogger($logger);
$runner->setLogger(new Logger($logger->getName().'_runner', [$logHandler]));
$runner->start();
while ($runner->isRunning()) {
$runner->tick();
}
$runner->stop();

} catch (Exception $e) {
$logger->error($e);
}
55 changes: 55 additions & 0 deletions cli/worker-cli
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
#!/usr/bin/env php
<?php

use Monolog\Handler\StreamHandler;
use Monolog\Logger;
use SAREhub\Commons\Misc\Parameters;
use SAREhub\Commons\Zmq\PublishSubscribe\Publisher;
use SAREhub\Commons\Zmq\PublishSubscribe\Subscriber;
use SAREhub\Component\Worker\Cli\Cli;
use SAREhub\Component\Worker\Cli\StartManagerCommand;
use SAREhub\Component\Worker\Cli\StartWorkerCommand;
use SAREhub\Component\Worker\Cli\StopManagerCommand;
use SAREhub\Component\Worker\Cli\StopWorkerCommand;
use SAREhub\Component\Worker\Cli\SystemdHelper;
use SAREhub\Component\Worker\Command\CommandService;
use SAREhub\Component\Worker\Command\JsonCommandFormat;
use SAREhub\Component\Worker\Command\ZmqCommandOutput;
use SAREhub\Component\Worker\Command\ZmqCommandReplyInput;
use Symfony\Component\Console\Application;

require dirname(__DIR__).'/vendor/autoload.php';

$config = new Parameters(include(__DIR__.'/config.php'));
$commandServiceConfig = $config->getRequiredAsMap('commandService');

$zmqContext = new ZMQContext();
$sessionId = uniqid(mt_rand(10000, 100000).time());

$handlers = [new StreamHandler(__DIR__.'/cliLog')];

Cli::newInstance()
->withApplication(new Application('Worker CLI', '0.1'))
->withConfig($config)
->withSessionId($sessionId)
->withLoggerFactory(function ($name) use ($handlers) {
return new Logger($name, $handlers);
})
->withCommandService(
CommandService::newInstance()
->withCommandOutput(ZmqCommandOutput::newInstance()
->withPublisher(Publisher::inContext($zmqContext)
->connect($commandServiceConfig->getRequiredAsMap('commandOutput')->getRequired('endpoint')))
->withCommandFormat(JsonCommandFormat::newInstance())
)->withCommandReplyInput(ZmqCommandReplyInput::newInstance()
->withSubscriber(Subscriber::inContext($zmqContext)
->connect($commandServiceConfig->getRequiredAsMap('commandReplyInput')->getRequired('endpoint'))
->subscribe($commandServiceConfig->getRequiredAsMap('commandReplyInput')->getRequired('topic'))
)
)
)->registerCommand(StartManagerCommand::newInstance()->withSystemdHelper(new SystemdHelper()))
->registerCommand(StartWorkerCommand::newInstance())
->registerCommand(StopManagerCommand::newInstance())
->registerCommand(StopWorkerCommand::newInstance())
->run();

11 changes: 11 additions & 0 deletions cli/[email protected]
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[Unit]
Description=Worker CLI command forwarder unit from sarehub/component_worker
Documentation=http://packagist.org/packages/sarehub/component_worker

[Service]
Environment="WORKERCLI_FORWARDER=path_to_workerCliForwarder.php"
ExecStart=/usr/bin/php ${WORKERCLI_FORWARDER} %I

TimeoutStopSec=30
KillMode=process
KillSignal=SIGINT
12 changes: 12 additions & 0 deletions cli/[email protected]
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[Unit]
Description=Worker Manager generator unit from sarehub/component_worker
Documentation=http://packagist.org/packages/sarehub/component_worker

[Service]
Environment="WORKERCLI_MANAGER_RUNNER=path_to_workerManagerRunner.php"
ExecStart=/usr/bin/php ${WORKERCLI_MANAGER_RUNNER} %I

TimeoutStopSec=60
KillMode=process
KillSignal=SIGINT

41 changes: 41 additions & 0 deletions cli/workerCliForwarder.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<?php
use SAREhub\Commons\Misc\Parameters;
use SAREhub\Commons\Process\PcntlSignals;
use SAREhub\Commons\Zmq\PublishSubscribe\Publisher;
use SAREhub\Commons\Zmq\PublishSubscribe\Subscriber;
use SAREhub\Commons\Zmq\PublishSubscribe\ZmqForwarderDevice;

date_default_timezone_set('Europe/Warsaw');
require dirname(__DIR__).'/vendor/autoload.php';

$type = $argv[1];

$cliConfig = new Parameters(include(__DIR__.'/config.php'));
$cliManagerConfig = $cliConfig->getRequiredAsMap('manager');

$zmqContext = new ZMQContext();
$forwarderConfig = $cliManagerConfig->getRequiredAsMap('forwarders')->getRequiredAsMap($type);

$device = ZmqForwarderDevice::getBuilder()
->frontend(Subscriber::inContext($zmqContext)
->bind($forwarderConfig->getRequired('input'))
)->backend(Publisher::inContext($zmqContext)
->bind($forwarderConfig->getRequired('output'))
)->build();

$canRun = true;
$onStop = function () use (&$canRun) {
$canRun = false;
};

PcntlSignals::getGlobal()
->handle(PcntlSignals::SIGINT, $onStop)
->handle(PcntlSignals::SIGTERM, $onStop)
->install();
$device->setTimerCallback(function () use (&$canRun) {
PcntlSignals::getGlobal()->checkPendingSignals();
return $canRun;
});

$device->run();

42 changes: 42 additions & 0 deletions cli/workerManagerConfigExample.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
<?php

use Monolog\Handler\StreamHandler;
use Monolog\Logger;

return [
'id' => '',
'manager' => [
'processService' => [
'runnerScript' => '',
'arguments' => [],
'workingDirectory' => ''
],
'commandService' => [
'commandOutput' => [
'endpoint' => ''
],
'commandReplyInput' => [
'topic' => 'worker.command.reply',
'endpoint' => ''
]
],

],
'runner' => [
'commandInput' => [
'topic' => '',
'endpoint' => ''
],
'commandReplyOutput' => [
'topic' => '',
'endpoint' => ''
],
],
'logger' => [
'handlers' => [
[
new StreamHandler('', Logger::INFO)
]
]
]
];
71 changes: 71 additions & 0 deletions cli/workerManagerRunner.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
<?php
date_default_timezone_set('Europe/Warsaw');

use SAREhub\Commons\Misc\Parameters;
use SAREhub\Component\Worker\Command\Command;
use SAREhub\Component\Worker\Command\CommandReply;
use SAREhub\Component\Worker\Manager\WorkerManagerBootstrap;
use SAREhub\Component\Worker\WorkerContext;

require dirname(__DIR__).'/vendor/autoload.php';

$cliConfig = new Parameters(include(__DIR__.'/config.php'));
$cliManagerConfig = $cliConfig->getRequiredAsMap('manager');
$managerId = $argv[1];
echo "starting manager with id: ".$managerId;

$configFile = $managerId.'.php';
echo "Config file: ".$configFile."\n";
$configPath = $cliManagerConfig->getRequired('configRootPath').'/'.$configFile;
echo "Config file path: ".$configPath."\n";

if (file_exists($configPath)) {
echo "loading config\n";
$config = include($configPath);
echo "config loaded\n";
$cliCommandServiceConfig = $cliConfig->getRequiredAsMap('manager')->getRequiredAsMap('forwarders');

$config['runner'] = [
'commandInput' => [
'topic' => '',
'endpoint' => $cliCommandServiceConfig->getRequiredAsMap('commandOutput')->getRequired('output')
],
'commandReplyOutput' => [
'topic' => 'worker-cli',
'endpoint' => $cliCommandServiceConfig->getRequiredAsMap('commandReplyInput')->getRequired('input')
]
];

echo 'listen command on '.$config['runner']['commandInput']['endpoint']
.' with topic '.$config['runner']['commandInput']['topic']."\n";
echo 'sending command reply on '.$config['runner']['commandReplyOutput']['endpoint']
.' with topic '.$config['runner']['commandReplyOutput']['topic']."\n";

$runner = WorkerManagerBootstrap::newInstance()
->withWorkerContext(WorkerContext::newInstance()
->withId($config['id'])
->withRootPath(getcwd())
)->withConfig($config)->build();

$runner->start();
if ($runner->isRunning()) {
$config = new Parameters($config);
$startCommands = $config->getRequiredAsMap('manager')->getRequired('startCommands');
foreach ($startCommands as $command) {
$runner->processCommand($command, function (Command $command, CommandReply $reply) {
echo $command."\n";
echo $reply->toJson()."\n";
});
}
}
while ($runner->isRunning()) {
$runner->tick();
}
$runner->stop();

echo "stopped\n";
}




Loading

0 comments on commit 2cbc81a

Please sign in to comment.