diff --git a/.env b/.env index c34015c019..ff9d7bef3a 100644 --- a/.env +++ b/.env @@ -760,6 +760,14 @@ PHRASEANET_EXPLODE_WORKER=1 # @run PHRASEANET_WORKERS_LAUNCH_METHOD= +# timeout in second to wait message from rabbitmq Q +# default 0 unlimeted +# +# NB: if defined the worker will exit 1 when timeout, so use only one worker (bin/console worker:execute) per container +# +# @run +PHRASEANET_WAIT_MESSAGE_TIMEOUT=0 + # @run PHRASEANET_WORKER_assetsIngest=1 diff --git a/docker-compose.yml b/docker-compose.yml index e5914218fe..fe3b028c7b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -314,6 +314,7 @@ services: - LC_NAME=C.UTF-8 - PHRASEANET_EXPLODE_WORKER - PHRASEANET_WORKERS_LAUNCH_METHOD + - PHRASEANET_WAIT_MESSAGE_TIMEOUT - PHRASEANET_WORKER_assetsIngest - PHRASEANET_WORKER_createRecord - PHRASEANET_WORKER_deleteRecord @@ -382,6 +383,7 @@ services: - LC_NAME=C.UTF-8 - PHRASEANET_EXPLODE_WORKER - PHRASEANET_WORKERS_LAUNCH_METHOD + - PHRASEANET_WAIT_MESSAGE_TIMEOUT - PHRASEANET_WORKER_mainQueue - IMAGEMAGICK_POLICY_VERSION - IMAGEMAGICK_POLICY_WIDTH @@ -434,6 +436,7 @@ services: - LC_NAME=C.UTF-8 - PHRASEANET_EXPLODE_WORKER - PHRASEANET_WORKERS_LAUNCH_METHOD + - PHRASEANET_WAIT_MESSAGE_TIMEOUT - PHRASEANET_WORKER_assetsIngest - IMAGEMAGICK_POLICY_VERSION - IMAGEMAGICK_POLICY_WIDTH @@ -485,6 +488,7 @@ services: - LC_NAME=C.UTF-8 - PHRASEANET_EXPLODE_WORKER - PHRASEANET_WORKERS_LAUNCH_METHOD + - PHRASEANET_WAIT_MESSAGE_TIMEOUT - PHRASEANET_WORKER_createRecord - IMAGEMAGICK_POLICY_VERSION - IMAGEMAGICK_POLICY_WIDTH @@ -537,6 +541,7 @@ services: - LC_NAME=C.UTF-8 - PHRASEANET_EXPLODE_WORKER - PHRASEANET_WORKERS_LAUNCH_METHOD + - PHRASEANET_WAIT_MESSAGE_TIMEOUT - PHRASEANET_WORKER_deleteRecord - IMAGEMAGICK_POLICY_VERSION - IMAGEMAGICK_POLICY_WIDTH @@ -588,6 +593,7 @@ services: - LC_NAME=C.UTF-8 - PHRASEANET_EXPLODE_WORKER - PHRASEANET_WORKERS_LAUNCH_METHOD + - PHRASEANET_WAIT_MESSAGE_TIMEOUT - PHRASEANET_WORKER_editRecord - IMAGEMAGICK_POLICY_VERSION - IMAGEMAGICK_POLICY_WIDTH @@ -741,6 +747,7 @@ services: - LC_NAME=C.UTF-8 - PHRASEANET_EXPLODE_WORKER - PHRASEANET_WORKERS_LAUNCH_METHOD + - PHRASEANET_WAIT_MESSAGE_TIMEOUT - PHRASEANET_WORKER_exposeUpload - IMAGEMAGICK_POLICY_VERSION - IMAGEMAGICK_POLICY_WIDTH @@ -792,6 +799,7 @@ services: - LC_NAME=C.UTF-8 - PHRASEANET_EXPLODE_WORKER - PHRASEANET_WORKERS_LAUNCH_METHOD + - PHRASEANET_WAIT_MESSAGE_TIMEOUT - PHRASEANET_WORKER_ftp - IMAGEMAGICK_POLICY_VERSION - IMAGEMAGICK_POLICY_WIDTH @@ -843,6 +851,7 @@ services: - LC_NAME=C.UTF-8 - PHRASEANET_EXPLODE_WORKER - PHRASEANET_WORKERS_LAUNCH_METHOD + - PHRASEANET_WAIT_MESSAGE_TIMEOUT - PHRASEANET_WORKER_populateIndex - IMAGEMAGICK_POLICY_VERSION - IMAGEMAGICK_POLICY_WIDTH @@ -894,6 +903,7 @@ services: - LC_NAME=C.UTF-8 - PHRASEANET_EXPLODE_WORKER - PHRASEANET_WORKERS_LAUNCH_METHOD + - PHRASEANET_WAIT_MESSAGE_TIMEOUT - PHRASEANET_WORKER_pullAssets - IMAGEMAGICK_POLICY_VERSION - IMAGEMAGICK_POLICY_WIDTH @@ -945,6 +955,7 @@ services: - LC_NAME=C.UTF-8 - PHRASEANET_EXPLODE_WORKER - PHRASEANET_WORKERS_LAUNCH_METHOD + - PHRASEANET_WAIT_MESSAGE_TIMEOUT - PHRASEANET_WORKER_recordsActions - IMAGEMAGICK_POLICY_VERSION - IMAGEMAGICK_POLICY_WIDTH @@ -999,6 +1010,7 @@ services: - LC_NAME=C.UTF-8 - PHRASEANET_EXPLODE_WORKER - PHRASEANET_WORKERS_LAUNCH_METHOD + - PHRASEANET_WAIT_MESSAGE_TIMEOUT - PHRASEANET_WORKER_shareBasket - IMAGEMAGICK_POLICY_VERSION - IMAGEMAGICK_POLICY_WIDTH @@ -1050,6 +1062,7 @@ services: - LC_NAME=C.UTF-8 - PHRASEANET_EXPLODE_WORKER - PHRASEANET_WORKERS_LAUNCH_METHOD + - PHRASEANET_WAIT_MESSAGE_TIMEOUT - PHRASEANET_WORKER_subdefCreation - IMAGEMAGICK_POLICY_VERSION - IMAGEMAGICK_POLICY_WIDTH @@ -1101,6 +1114,7 @@ services: - LC_NAME=C.UTF-8 - PHRASEANET_EXPLODE_WORKER - PHRASEANET_WORKERS_LAUNCH_METHOD + - PHRASEANET_WAIT_MESSAGE_TIMEOUT - PHRASEANET_WORKER_subtitle - IMAGEMAGICK_POLICY_VERSION - IMAGEMAGICK_POLICY_WIDTH @@ -1152,6 +1166,7 @@ services: - LC_NAME=C.UTF-8 - PHRASEANET_EXPLODE_WORKER - PHRASEANET_WORKERS_LAUNCH_METHOD + - PHRASEANET_WAIT_MESSAGE_TIMEOUT - PHRASEANET_WORKER_validationReminder - IMAGEMAGICK_POLICY_VERSION - IMAGEMAGICK_POLICY_WIDTH @@ -1203,6 +1218,7 @@ services: - LC_NAME=C.UTF-8 - PHRASEANET_EXPLODE_WORKER - PHRASEANET_WORKERS_LAUNCH_METHOD + - PHRASEANET_WAIT_MESSAGE_TIMEOUT - PHRASEANET_WORKER_webhook - IMAGEMAGICK_POLICY_VERSION - IMAGEMAGICK_POLICY_WIDTH @@ -1254,6 +1270,7 @@ services: - LC_NAME=C.UTF-8 - PHRASEANET_EXPLODE_WORKER - PHRASEANET_WORKERS_LAUNCH_METHOD + - PHRASEANET_WAIT_MESSAGE_TIMEOUT - PHRASEANET_WORKER_writeMetadatas - IMAGEMAGICK_POLICY_VERSION - IMAGEMAGICK_POLICY_WIDTH diff --git a/docker/phraseanet/worker/entrypoint.sh b/docker/phraseanet/worker/entrypoint.sh index 1f282aa09a..e383085799 100755 --- a/docker/phraseanet/worker/entrypoint.sh +++ b/docker/phraseanet/worker/entrypoint.sh @@ -121,7 +121,7 @@ else queue_name="$(echo $i | cut -d'_' -f3)" m=$i if [ ${!m} -gt "0" ] ; then - command="bin/console worker:execute --queue-name=$queue_name -m ${!m} &" + command="bin/console worker:execute --queue-name=$queue_name -m ${!m} -t ${PHRASEANET_WAIT_MESSAGE_TIMEOUT} &" echo $command >> bin/run-worker.sh echo "Worker " $queue_name " defined with parallelism " ${!m} NBR_WORKERS=$(expr $NBR_WORKERS + 1) diff --git a/lib/Alchemy/Phrasea/WorkerManager/Command/WorkerExecuteCommand.php b/lib/Alchemy/Phrasea/WorkerManager/Command/WorkerExecuteCommand.php index f6374ae58a..c952861a32 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Command/WorkerExecuteCommand.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Command/WorkerExecuteCommand.php @@ -5,9 +5,11 @@ use Alchemy\Phrasea\Command\Command; use Alchemy\Phrasea\WorkerManager\Queue\AMQPConnection; use Alchemy\Phrasea\WorkerManager\Queue\MessageHandler; +use Alchemy\Phrasea\WorkerManager\Worker\ProcessPool; use Alchemy\Phrasea\WorkerManager\Worker\WorkerInvoker; use Doctrine\DBAL\Connection; use PhpAmqpLib\Channel\AMQPChannel; +use PhpAmqpLib\Exception\AMQPTimeoutException; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\OutputInterface; @@ -25,6 +27,7 @@ public function __construct() ->addOption('preserve-payload', 'p', InputOption::VALUE_NONE, 'Preserve temporary payload file') ->addOption('queue-name', '', InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'The name of queues to be consuming') ->addOption('max-processes', 'm', InputOption::VALUE_REQUIRED, 'The max number of process allow to run (default 1) ') + ->addOption('wait-message-timeout', 't', InputOption::VALUE_REQUIRED, 'specify number in second to wait message from Q (default infini =0)') // ->addOption('MWG', '', InputOption::VALUE_NONE, 'Enable MWG metadata compatibility (use only for write metadata service)') // ->addOption('clear-metadatas', '', InputOption::VALUE_NONE, 'Remove metadatas from documents if not compliant with Database structure (use only for write metadata service)') ->setHelp(''); @@ -74,6 +77,11 @@ protected function doExecute(InputInterface $input, OutputInterface $output) $workerInvoker->preservePayloads(); } + $waitTimeout = 0; // infini + if ($input->getOption('wait-message-timeout') != null) { + $waitTimeout = $input->getOption('wait-message-timeout'); + } + /** @var MessageHandler $messageHandler */ $messageHandler = $this->container['alchemy_worker.message.handler']; $messageHandler->consume($channel, $serverConnection, $workerInvoker, $argQueueName, $maxProcesses); @@ -97,7 +105,24 @@ protected function doExecute(InputInterface $input, OutputInterface $output) return 1; } } - $channel->wait(); + + try { + $channel->wait(null, false, $waitTimeout); + } catch (AMQPTimeoutException $e) { + // we are in wait timeout, + // immediately close the rabbit connection to avoid Missed server heartbeat exception after this timeout + $serverConnection->connectionClose(); + + /** @var ProcessPool $processPool */ + $processPool = $this->container['alchemy_worker.process_pool']; + $processPool->setLogger($this->container['alchemy_worker.logger']); + + // and wait until all process generated are finished + $processPool->waitForAllJobProcessFinished(); + + // exit with 1 + return 1; + } } $serverConnection->connectionClose(); diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/ProcessPool.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/ProcessPool.php index 8460a3fdb9..a3df6701ea 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Worker/ProcessPool.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/ProcessPool.php @@ -67,6 +67,22 @@ public function getWorkerProcess(array $processArguments, AMQPChannel $channel, return ($this->processes[] = $builder->getProcess()); } + public function waitForAllJobProcessFinished() + { + $interval = 1; + while (count($this->processes) > 0) { + if (count($this->processes) == 1) { + $process = $this->processes[0]; + $this->logger->info("only 1 process remaining before exit 1 : " . $process->getCommandLine() . "\n"); + } + + sleep($interval); + + $this->detachFinishedProcesses(); + $interval = min(10, $interval + 1); + } + } + private function detachFinishedProcesses() { $runningProcesses = [];