From fc19acde6bcb67fe2c729a5cc3107b8631727142 Mon Sep 17 00:00:00 2001 From: Maja Massarini Date: Tue, 12 Nov 2024 15:38:37 +0100 Subject: [PATCH] Run job_config handlers in new tasks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit If we run sequentially all the handlers for all the job_configs, for every entry in the db that needs babysitting we can reach the hard time limit for the task. I did not make the vm image build handlers running in parallel, because now the code needs the tasks output. Co-authored-by: Nikola Forró --- .../worker/helpers/build/babysit.py | 24 +++++- tests/integration/test_babysit.py | 73 ++++++++++++++++--- 2 files changed, 84 insertions(+), 13 deletions(-) diff --git a/packit_service/worker/helpers/build/babysit.py b/packit_service/worker/helpers/build/babysit.py index a1a376864..896af55ef 100644 --- a/packit_service/worker/helpers/build/babysit.py +++ b/packit_service/worker/helpers/build/babysit.py @@ -8,8 +8,10 @@ from enum import Enum from typing import Any +import celery import copr.v3 import requests +from celery import signature from copr.v3 import Client as CoprClient from requests import HTTPError @@ -55,6 +57,13 @@ logger = logging.getLogger(__name__) +def celery_run_async(signatures: list[Signature]) -> None: + logger.debug("Signatures are going to be sent to Celery (from update_copr_build_state).") + # https://docs.celeryq.dev/en/stable/userguide/canvas.html#groups + celery.group(signatures).apply_async() + logger.debug("Signatures were sent to Celery.") + + def check_pending_testing_farm_runs() -> None: """Checks the status of pending TFT runs and updates it if needed.""" logger.info("Getting pending TFT runs from DB") @@ -142,6 +151,7 @@ def update_testing_farm_run(event: TestingFarmResultsEvent, run: TFTTestRunTarge ) event_dict = event.get_dict() + signatures = [] for job_config in job_configs: package_config = ( event.packages_config.get_package_config_for(job_config) @@ -155,7 +165,9 @@ def update_testing_farm_run(event: TestingFarmResultsEvent, run: TFTTestRunTarge ) # check for identifiers equality if handler.pre_check(package_config, job_config, event_dict): - handler.run_job() + signatures.append(handler.get_signature(event=event, job=job_config)) + + celery_run_async(signatures=signatures) def check_pending_copr_builds() -> None: @@ -330,6 +342,7 @@ def update_srpm_build_state( handler_kls=CoprBuildEndHandler, ) + signatures = [] for job_config in job_configs: event_dict = event.get_dict() package_config = ( @@ -343,7 +356,9 @@ def update_srpm_build_state( event=event_dict, ) if handler.pre_check(package_config, job_config, event_dict): - handler.run_job() + signatures.append(handler.get_signature(event=event, job=job_config)) + + celery_run_async(signatures=signatures) def update_copr_build_state( @@ -406,6 +421,7 @@ def update_copr_build_state( handler_kls=handler_kls, ) + signatures = [] for job_config in job_configs: event_dict = event.get_dict() package_config = ( @@ -419,7 +435,9 @@ def update_copr_build_state( event=event_dict, ) if handler.pre_check(package_config, job_config, event_dict): - handler.run_job() + signatures.append(handler.get_signature(event=event, job=job_config)) + + celery_run_async(signatures=signatures) class UpdateImageBuildHelper(ConfigFromUrlMixin, GetVMImageBuilderMixin): diff --git a/tests/integration/test_babysit.py b/tests/integration/test_babysit.py index 9585bc6a4..bd9eacb9d 100644 --- a/tests/integration/test_babysit.py +++ b/tests/integration/test_babysit.py @@ -37,6 +37,11 @@ check_pending_testing_farm_runs, update_copr_builds, ) +from packit_service.worker.tasks import ( + run_copr_build_end_handler, + run_copr_build_start_handler, + run_testing_farm_results_handler, +) def test_check_copr_build_no_build(): @@ -88,6 +93,22 @@ def test_check_copr_build_already_successful(): assert check_copr_build(build_id=1) +def celery_run_async_stub(signatures, handlers) -> None: + results = [] + handler = handlers.pop(0) + for sig in signatures: + event_dict = sig.kwargs["event"] + job_config = sig.kwargs["job_config"] + package_config = sig.kwargs["package_config"] + + result = handler( + package_config=package_config, + event=event_dict, + job_config=job_config, + ) + results.append(result) + + @pytest.mark.parametrize( "build_status, build_ended_on", [ @@ -167,13 +188,20 @@ def test_check_copr_build_updated( JobConfig( type=JobType.build, trigger=JobConfigTriggerType.pull_request, - packages={"package": CommonPackageConfig()}, + packages={"package": CommonPackageConfig(specfile_path="some.spec")}, ), ], - packages={"package": CommonPackageConfig()}, + packages={"package": CommonPackageConfig(specfile_path="some.spec")}, ), ) flexmock(CoprBuildEndHandler).should_receive("run_job").and_return().once() + flexmock( + packit_service.worker.helpers.build.babysit, + celery_run_async=lambda signatures: celery_run_async_stub( + signatures, [run_copr_build_end_handler] + ), + ) + assert check_copr_build(build_id=1) is bool(build_ended_on) @@ -247,13 +275,19 @@ def test_check_copr_build_waiting_started(add_pull_request_event_with_sha_123456 JobConfig( type=JobType.build, trigger=JobConfigTriggerType.pull_request, - packages={"package": CommonPackageConfig()}, + packages={"package": CommonPackageConfig(specfile_path="some.spec")}, ), ], - packages={"package": CommonPackageConfig()}, + packages={"package": CommonPackageConfig(specfile_path="some.spec")}, ), ) flexmock(CoprBuildStartHandler).should_receive("run_job").and_return().once() + flexmock( + packit_service.worker.helpers.build.babysit, + celery_run_async=lambda signatures: celery_run_async_stub( + signatures, [run_copr_build_start_handler] + ), + ) assert not check_copr_build(build_id=1) @@ -346,14 +380,19 @@ def test_check_copr_build_waiting_srpm_failed(add_pull_request_event_with_sha_12 JobConfig( type=JobType.build, trigger=JobConfigTriggerType.pull_request, - packages={"package": CommonPackageConfig()}, + packages={"package": CommonPackageConfig(specfile_path="some.spec")}, ), ], - packages={"package": CommonPackageConfig()}, + packages={"package": CommonPackageConfig(specfile_path="some.spec")}, ), ) flexmock(CoprBuildEndHandler).should_receive("run_job").and_return().once() flexmock(CoprBuildStartHandler).should_receive("run_job").and_return().once() + handlers = [run_copr_build_end_handler, run_copr_build_start_handler] + flexmock( + packit_service.worker.helpers.build.babysit, + celery_run_async=lambda signatures: celery_run_async_stub(signatures, handlers), + ) assert not check_copr_build(build_id=1) @@ -597,13 +636,19 @@ def test_check_pending_testing_farm_runs(created): JobConfig( type=JobType.tests, trigger=JobConfigTriggerType.pull_request, - packages={"package": CommonPackageConfig()}, + packages={"package": CommonPackageConfig(specfile_path="some.spec")}, ), ], - packages={"package": CommonPackageConfig()}, + packages={"package": CommonPackageConfig(specfile_path="some.spec")}, ), ) flexmock(TestingFarmResultsHandler).should_receive("run_job").and_return().once() + flexmock( + packit_service.worker.helpers.build.babysit, + celery_run_async=lambda signatures: celery_run_async_stub( + signatures, [run_testing_farm_results_handler] + ), + ) check_pending_testing_farm_runs() @@ -685,6 +730,7 @@ def test_check_pending_testing_farm_runs_identifiers(identifier): trigger=JobConfigTriggerType.pull_request, packages={ "package": CommonPackageConfig( + specfile_path="some.spec", identifier="first", ), }, @@ -694,6 +740,7 @@ def test_check_pending_testing_farm_runs_identifiers(identifier): trigger=JobConfigTriggerType.pull_request, packages={ "package": CommonPackageConfig( + specfile_path="some.spec", identifier="second", ), }, @@ -701,11 +748,17 @@ def test_check_pending_testing_farm_runs_identifiers(identifier): JobConfig( type=JobType.tests, trigger=JobConfigTriggerType.pull_request, - packages={"package": CommonPackageConfig()}, + packages={"package": CommonPackageConfig(specfile_path="some.spec")}, ), ], - packages={"package": CommonPackageConfig()}, + packages={"package": CommonPackageConfig(specfile_path="some.spec")}, ), ) flexmock(TestingFarmResultsHandler).should_receive("run_job").and_return().once() + flexmock( + packit_service.worker.helpers.build.babysit, + celery_run_async=lambda signatures: celery_run_async_stub( + signatures, [run_testing_farm_results_handler] + ), + ) check_pending_testing_farm_runs()