Skip to content

Commit

Permalink
Run job_config handlers in new tasks
Browse files Browse the repository at this point in the history
If we run sequentially all the handlers for all
the job_configs, for every entry in the db
that needs babysitting we can reach
the hard time limit for the task.

I did not make the vm image build handlers
running in parallel, because now the code
needs the tasks output.

Co-authored-by: Nikola Forró <[email protected]>
  • Loading branch information
majamassarini and nforro committed Nov 13, 2024
1 parent 6ccf229 commit fc19acd
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 13 deletions.
24 changes: 21 additions & 3 deletions packit_service/worker/helpers/build/babysit.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@
from enum import Enum
from typing import Any

import celery
import copr.v3
import requests
from celery import signature
from copr.v3 import Client as CoprClient
from requests import HTTPError

Expand Down Expand Up @@ -55,6 +57,13 @@
logger = logging.getLogger(__name__)


def celery_run_async(signatures: list[Signature]) -> None:
logger.debug("Signatures are going to be sent to Celery (from update_copr_build_state).")
# https://docs.celeryq.dev/en/stable/userguide/canvas.html#groups
celery.group(signatures).apply_async()
logger.debug("Signatures were sent to Celery.")


def check_pending_testing_farm_runs() -> None:
"""Checks the status of pending TFT runs and updates it if needed."""
logger.info("Getting pending TFT runs from DB")
Expand Down Expand Up @@ -142,6 +151,7 @@ def update_testing_farm_run(event: TestingFarmResultsEvent, run: TFTTestRunTarge
)

event_dict = event.get_dict()
signatures = []
for job_config in job_configs:
package_config = (
event.packages_config.get_package_config_for(job_config)
Expand All @@ -155,7 +165,9 @@ def update_testing_farm_run(event: TestingFarmResultsEvent, run: TFTTestRunTarge
)
# check for identifiers equality
if handler.pre_check(package_config, job_config, event_dict):
handler.run_job()
signatures.append(handler.get_signature(event=event, job=job_config))

celery_run_async(signatures=signatures)


def check_pending_copr_builds() -> None:
Expand Down Expand Up @@ -330,6 +342,7 @@ def update_srpm_build_state(
handler_kls=CoprBuildEndHandler,
)

signatures = []
for job_config in job_configs:
event_dict = event.get_dict()
package_config = (
Expand All @@ -343,7 +356,9 @@ def update_srpm_build_state(
event=event_dict,
)
if handler.pre_check(package_config, job_config, event_dict):
handler.run_job()
signatures.append(handler.get_signature(event=event, job=job_config))

celery_run_async(signatures=signatures)


def update_copr_build_state(
Expand Down Expand Up @@ -406,6 +421,7 @@ def update_copr_build_state(
handler_kls=handler_kls,
)

signatures = []
for job_config in job_configs:
event_dict = event.get_dict()
package_config = (
Expand All @@ -419,7 +435,9 @@ def update_copr_build_state(
event=event_dict,
)
if handler.pre_check(package_config, job_config, event_dict):
handler.run_job()
signatures.append(handler.get_signature(event=event, job=job_config))

celery_run_async(signatures=signatures)


class UpdateImageBuildHelper(ConfigFromUrlMixin, GetVMImageBuilderMixin):
Expand Down
73 changes: 63 additions & 10 deletions tests/integration/test_babysit.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@
check_pending_testing_farm_runs,
update_copr_builds,
)
from packit_service.worker.tasks import (
run_copr_build_end_handler,
run_copr_build_start_handler,
run_testing_farm_results_handler,
)


def test_check_copr_build_no_build():
Expand Down Expand Up @@ -88,6 +93,22 @@ def test_check_copr_build_already_successful():
assert check_copr_build(build_id=1)


def celery_run_async_stub(signatures, handlers) -> None:
results = []
handler = handlers.pop(0)
for sig in signatures:
event_dict = sig.kwargs["event"]
job_config = sig.kwargs["job_config"]
package_config = sig.kwargs["package_config"]

result = handler(
package_config=package_config,
event=event_dict,
job_config=job_config,
)
results.append(result)


@pytest.mark.parametrize(
"build_status, build_ended_on",
[
Expand Down Expand Up @@ -167,13 +188,20 @@ def test_check_copr_build_updated(
JobConfig(
type=JobType.build,
trigger=JobConfigTriggerType.pull_request,
packages={"package": CommonPackageConfig()},
packages={"package": CommonPackageConfig(specfile_path="some.spec")},
),
],
packages={"package": CommonPackageConfig()},
packages={"package": CommonPackageConfig(specfile_path="some.spec")},
),
)
flexmock(CoprBuildEndHandler).should_receive("run_job").and_return().once()
flexmock(
packit_service.worker.helpers.build.babysit,
celery_run_async=lambda signatures: celery_run_async_stub(
signatures, [run_copr_build_end_handler]
),
)

assert check_copr_build(build_id=1) is bool(build_ended_on)


Expand Down Expand Up @@ -247,13 +275,19 @@ def test_check_copr_build_waiting_started(add_pull_request_event_with_sha_123456
JobConfig(
type=JobType.build,
trigger=JobConfigTriggerType.pull_request,
packages={"package": CommonPackageConfig()},
packages={"package": CommonPackageConfig(specfile_path="some.spec")},
),
],
packages={"package": CommonPackageConfig()},
packages={"package": CommonPackageConfig(specfile_path="some.spec")},
),
)
flexmock(CoprBuildStartHandler).should_receive("run_job").and_return().once()
flexmock(
packit_service.worker.helpers.build.babysit,
celery_run_async=lambda signatures: celery_run_async_stub(
signatures, [run_copr_build_start_handler]
),
)
assert not check_copr_build(build_id=1)


Expand Down Expand Up @@ -346,14 +380,19 @@ def test_check_copr_build_waiting_srpm_failed(add_pull_request_event_with_sha_12
JobConfig(
type=JobType.build,
trigger=JobConfigTriggerType.pull_request,
packages={"package": CommonPackageConfig()},
packages={"package": CommonPackageConfig(specfile_path="some.spec")},
),
],
packages={"package": CommonPackageConfig()},
packages={"package": CommonPackageConfig(specfile_path="some.spec")},
),
)
flexmock(CoprBuildEndHandler).should_receive("run_job").and_return().once()
flexmock(CoprBuildStartHandler).should_receive("run_job").and_return().once()
handlers = [run_copr_build_end_handler, run_copr_build_start_handler]
flexmock(
packit_service.worker.helpers.build.babysit,
celery_run_async=lambda signatures: celery_run_async_stub(signatures, handlers),
)
assert not check_copr_build(build_id=1)


Expand Down Expand Up @@ -597,13 +636,19 @@ def test_check_pending_testing_farm_runs(created):
JobConfig(
type=JobType.tests,
trigger=JobConfigTriggerType.pull_request,
packages={"package": CommonPackageConfig()},
packages={"package": CommonPackageConfig(specfile_path="some.spec")},
),
],
packages={"package": CommonPackageConfig()},
packages={"package": CommonPackageConfig(specfile_path="some.spec")},
),
)
flexmock(TestingFarmResultsHandler).should_receive("run_job").and_return().once()
flexmock(
packit_service.worker.helpers.build.babysit,
celery_run_async=lambda signatures: celery_run_async_stub(
signatures, [run_testing_farm_results_handler]
),
)
check_pending_testing_farm_runs()


Expand Down Expand Up @@ -685,6 +730,7 @@ def test_check_pending_testing_farm_runs_identifiers(identifier):
trigger=JobConfigTriggerType.pull_request,
packages={
"package": CommonPackageConfig(
specfile_path="some.spec",
identifier="first",
),
},
Expand All @@ -694,18 +740,25 @@ def test_check_pending_testing_farm_runs_identifiers(identifier):
trigger=JobConfigTriggerType.pull_request,
packages={
"package": CommonPackageConfig(
specfile_path="some.spec",
identifier="second",
),
},
),
JobConfig(
type=JobType.tests,
trigger=JobConfigTriggerType.pull_request,
packages={"package": CommonPackageConfig()},
packages={"package": CommonPackageConfig(specfile_path="some.spec")},
),
],
packages={"package": CommonPackageConfig()},
packages={"package": CommonPackageConfig(specfile_path="some.spec")},
),
)
flexmock(TestingFarmResultsHandler).should_receive("run_job").and_return().once()
flexmock(
packit_service.worker.helpers.build.babysit,
celery_run_async=lambda signatures: celery_run_async_stub(
signatures, [run_testing_farm_results_handler]
),
)
check_pending_testing_farm_runs()

0 comments on commit fc19acd

Please sign in to comment.