From 119daeafc1ed2a56a6d06d65e1774d495b658a6e Mon Sep 17 00:00:00 2001 From: Sharon Yankovich <49753115+yankovs@users.noreply.github.com> Date: Mon, 10 Jun 2024 11:35:55 +0300 Subject: [PATCH 1/2] Log from worker subprocess, check if result_id is received --- mcrit/SpawningWorker.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/mcrit/SpawningWorker.py b/mcrit/SpawningWorker.py index 78a3ca7..b99906c 100644 --- a/mcrit/SpawningWorker.py +++ b/mcrit/SpawningWorker.py @@ -95,6 +95,14 @@ def _executeJobPayload(self, job_payload, job): try: stdout_result, stderr_result = console_handle.communicate(timeout=self._queue_config.QUEUE_SPAWNINGWORKER_CHILDREN_TIMEOUT) stdout_result = stdout_result.strip().decode("utf-8") + # TODO: log output from subprocess in the order it arrived + # instead of the split to stdout, stderr + if stdout_result: + LOGGER.info("STDOUT logs from subprocess: %s", stdout_result) + if stderr_result: + stderr_result = stderr_result.strip().decode("utf-8") + LOGGER.info("STDERR logs from subprocess: %s", stderr_result) + last_line = stdout_result.split("\n")[-1] # successful output should be just the result_id in a single line match = re.match("(?P<result_id>[0-9a-fA-F]{24})", last_line) @@ -112,9 +120,12 @@ def _executeJob(self, job): with job as j: LOGGER.info("Processing Remote Job: %s", job) result_id = self._executeJobPayload(j["payload"], job) - # result should have already been persisted by the child process,we repeat it here to close the job for the queue - job.result = result_id - LOGGER.info("Finished Remote Job with result_id: %s", result_id) + if result_id: + # result should have already been persisted by the child process,we repeat it here to close the job for the queue + job.result = result_id + LOGGER.info("Finished Remote Job with result_id: %s", result_id) + else: + LOGGER.info("Failed Running Remote Job: %s", job) except Exception as exc: pass From 8b42d33c01f7cf5ac49b4acb165a7240f63cae2e Mon Sep 17 
00:00:00 2001 From: Sharon Yankovich <49753115+yankovs@users.noreply.github.com> Date: Wed, 19 Jun 2024 14:54:15 +0300 Subject: [PATCH 2/2] Remove job context manager on SpawningWorker The issue with this context manager is that on `__exit__` it marks the job either complete or sets an error. But in this case, the error happens inside a child process and isn't propagated to the calling process. So essentially, even if the subprocess set this job as failed, it will be overridden to be completed. --- mcrit/SpawningWorker.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/mcrit/SpawningWorker.py b/mcrit/SpawningWorker.py index b99906c..61cbf2a 100644 --- a/mcrit/SpawningWorker.py +++ b/mcrit/SpawningWorker.py @@ -117,15 +117,14 @@ def _executeJob(self, job): self.queue.clean() self.t_last_cleanup = time.time() try: - with job as j: - LOGGER.info("Processing Remote Job: %s", job) - result_id = self._executeJobPayload(j["payload"], job) - if result_id: - # result should have already been persisted by the child process,we repeat it here to close the job for the queue - job.result = result_id - LOGGER.info("Finished Remote Job with result_id: %s", result_id) - else: - LOGGER.info("Failed Running Remote Job: %s", job) + LOGGER.info("Processing Remote Job: %s", job) + result_id = self._executeJobPayload(job["payload"], job) + if result_id: + # result should have already been persisted by the child process,we repeat it here to close the job for the queue + job.result = result_id + LOGGER.info("Finished Remote Job with result_id: %s", result_id) + else: + LOGGER.info("Failed Running Remote Job: %s", job) except Exception as exc: pass