diff --git a/lib/Alchemy/Phrasea/WorkerManager/Command/WorkerHeartbeatCommand.php b/lib/Alchemy/Phrasea/WorkerManager/Command/WorkerHeartbeatCommand.php index ef6b923839..8a0ddca190 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Command/WorkerHeartbeatCommand.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Command/WorkerHeartbeatCommand.php @@ -28,16 +28,16 @@ public function __construct() protected function doExecute(InputInterface $input, OutputInterface $output) { /** @var AMQPConnection $serverConnection */ - $serverConnection = $this->container['alchemy_worker.amqp.connection']; +// $serverConnection = $this->container['alchemy_worker.amqp.connection']; - $connection = $serverConnection->getConnection(); +// $connection = $serverConnection->getConnection(); $interval = $input->getOption('heartbeat'); if (empty($interval)) { $interval = self::DEFAULT_INTERVAL; } - $heartbeatHandler = new HeartbeatHandler($connection); + $heartbeatHandler = new HeartbeatHandler($this->container['alchemy_worker.message.publisher']); $heartbeatHandler->run($interval); return 0; diff --git a/lib/Alchemy/Phrasea/WorkerManager/Queue/HeartbeatHandler.php b/lib/Alchemy/Phrasea/WorkerManager/Queue/HeartbeatHandler.php index aa07b47767..6c4d70ba7a 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Queue/HeartbeatHandler.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Queue/HeartbeatHandler.php @@ -8,14 +8,11 @@ class HeartbeatHandler { - /** - * @var AbstractConnection - */ - private $connection; + private $messagePublisher; - public function __construct(AbstractConnection $connection) + public function __construct(MessagePublisher $messagePublisher) { - $this->connection = $connection; + $this->messagePublisher = $messagePublisher; } /** @@ -23,14 +20,35 @@ public function __construct(AbstractConnection $connection) */ public function run($interval) { + $fileDir = $_SERVER['PWD'] .'/tmp/watchdog'; + while (true) { - if (!$this->connection->isConnected()) { - return; + $filename = $_SERVER['PWD'] .'/tmp/watchdog/edit.watchdog'; + if (!file_exists($filename)) { + if (!is_dir($fileDir)) { + mkdir($fileDir, 0775, true); + } + + file_put_contents($filename, 'watchdog_edit_ping'); + + $payload = [ + 'message_type' => MessagePublisher::MAIN_QUEUE_TYPE, + 'payload' => [ + 'type' => MessagePublisher::EDIT_RECORD_TYPE, // used to specify the final Q to publish message + 'dataType' => 'watchdog', + 'data' => 'watchdog_edit_ping' + ] + ]; + + $this->messagePublisher->publishMessage($payload, MessagePublisher::MAIN_QUEUE_TYPE); + } else { + $this->messagePublisher->pushLog("Edit record worker do not consume message! check if consumer is running ,busy , or to be restart!", "warning"); + + @unlink($_SERVER['PWD'] . '/tmp/watchdog/edit.watchdog'); // unlink, so to be able to re-check after } - sleep((int) $interval / 2); - - $this->connection->checkHeartBeat(); + // each 5 minutes + sleep(300); } } } diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/EditRecordWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/EditRecordWorker.php index 0997204cf5..3772917aa8 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Worker/EditRecordWorker.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/EditRecordWorker.php @@ -31,6 +31,13 @@ public function __construct(WorkerRunningJobRepository $repoWorker, EventDispatc public function process(array $payload) { + if ($payload['dataType'] == 'watchdog') { + @unlink($_SERVER['PWD'] . '/tmp/watchdog/edit.watchdog'); + $this->messagePublisher->pushLog("watchdog edit message processed!", 'info'); + + return 0; + } + try { $databox = $this->findDataboxById($payload['databoxId']); } catch(\Exception $e) { diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/MainQueueWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/MainQueueWorker.php index 975590f642..055d3cde4d 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Worker/MainQueueWorker.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/MainQueueWorker.php @@ -106,7 +106,7 @@ public function process(array $payload) return $singleMessage; }, $payload['data']); - } else { + } elseif ($payload['dataType'] == RecordEditInWorkerEvent::JSON_TYPE) { $data = json_decode($payload['data'], true); $payloadData = array_map(function($singleMessage) use ($payload, $data) { @@ -120,6 +120,8 @@ public function process(array $payload) return $singleMessage; }, $data['records']); + } elseif ($payload['dataType'] == 'watchdog') { + $payloadData[0] = $payload; } $childMessageCount = count($payloadData);