Add the possibility to push tasks to swarming (#3996)
alhijazi authored Aug 26, 2024
1 parent 1b8ca0d commit aa426a9
Showing 14 changed files with 1,036 additions and 32 deletions.
76 changes: 76 additions & 0 deletions configs/test/swarming/swarming.yaml
@@ -0,0 +1,76 @@
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+mapping:
+  LINUX:
+    priority: 1
+    command:
+      - 'luci-auth'
+      - 'context'
+      - '--'
+      - './linux_entry_point.sh'
+    cas_input_root:
+      cas_instance: 'projects/server-name/instances/instance_name'
+      digest:
+        hash: 'linux_entry_point_archive_hash'
+        size_bytes: 1234
+    service_account_email: test-clusterfuzz-service-account-email
+    preemptible: false
+    expiration_secs: 86400
+    execution_timeout_secs: 86400
+    docker_image: 'gcr.io/clusterfuzz-images/base:a2f4dd6-202202070654'
+  MAC:
+    priority: 1
+    command:
+      - 'luci-auth'
+      - 'context'
+      - '--'
+      - './mac_entry_point.sh'
+    cas_input_root:
+      cas_instance: 'projects/server-name/instances/instance_name'
+      digest:
+        hash: 'mac_entry_point_archive_hash'
+        size_bytes: 456
+    service_account_email: test-clusterfuzz-service-account-email
+    preemptible: false
+    expiration_secs: 86400
+    execution_timeout_secs: 86400
+    cipd_input:
+      packages:
+        - package_name: 'package1_name'
+          version: 'package1_version'
+          path: 'package_install_path'
+        - package_name: 'package2_name'
+          version: 'package2_version'
+          path: 'package_install_path'
+    env_prefixes:
+      - key: 'PATH'
+        value:
+          - 'package_install_path'
+          - 'package_install_path/bin'
+    dimensions:
+      - key: 'key1'
+        value: 'value1'
+      - key: 'key2'
+        value: 'value2'
+    env:
+      - key: 'ENV_VAR1'
+        value: 'VALUE1'
+      - key: 'ENV_VAR2'
+        value: 'VALUE2'
+swarming_server: 'server-name'
+swarming_pool: 'pool-name'
+swarming_realm: 'realm-name'
+logs_project_id: 'project_id'
+fuzz_task_duration: 12345
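
Each entry under 'mapping' is keyed by platform and mirrors the fields of a Swarming "new task" request, while the bottom-level keys configure the Swarming deployment itself. As a rough, illustrative sketch (the commit's swarming.py module is not among the files rendered here, so everything other than the YAML key names is an assumption), a platform entry could be folded into a task request like so:

def build_task_request(platform_cfg: dict, cfg: dict) -> dict:
  """Hypothetical mapping from a swarming.yaml platform entry to a Swarming
  tasks/new request body. Field names follow the public Swarming API;
  this is not the commit's actual code."""
  return {
      'name': 'utask_main',
      'priority': platform_cfg['priority'],
      'realm': cfg['swarming_realm'],
      'service_account': platform_cfg['service_account_email'],
      'task_slices': [{
          'expiration_secs': platform_cfg['expiration_secs'],
          'properties': {
              'command': platform_cfg['command'],
              'cas_input_root': platform_cfg['cas_input_root'],
              'execution_timeout_secs': platform_cfg['execution_timeout_secs'],
              'dimensions': [{
                  'key': 'pool',
                  'value': cfg['swarming_pool']
              }],
          },
      }],
  }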
22 changes: 22 additions & 0 deletions src/clusterfuzz/_internal/base/utils.py
@@ -124,6 +124,28 @@ def fetch_url(url):
   return response.text
 
 
+@retry.wrap(
+    retries=URL_REQUEST_RETRIES,
+    delay=URL_REQUEST_FAIL_WAIT,
+    function='base.utils.post_url')
+def post_url(url: str, data: str, headers: dict) -> str:
+  """Posts the provided data and headers to the provided URL.
+  The request is retried `URL_REQUEST_RETRIES` times.
+  To avoid blocking the application, a post timeout of 60 seconds is applied.
+  Args:
+    url: the URL to post to.
+    data: request data.
+    headers: request headers.
+  Returns:
+    The contents of the response, in unicode.
+  Raises:
+    HTTPError: if one occurred."""
+
+  response = requests.post(url, data=data, headers=headers, timeout=60)
+  response.raise_for_status()
+  return response.text
+
+
 def fields_match(string_1,
                  string_2,
                  field_separator=':',
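
A hypothetical call to the new helper (URL, token, and payload are invented for illustration; post_url itself is defined in the diff above):

import json

from clusterfuzz._internal.base import utils

# Invented values; a real caller would obtain a token via its auth flow.
token = 'hypothetical-oauth-access-token'
response_text = utils.post_url(
    'https://example-swarm.appspot.com/_ah/api/swarming/v1/tasks/new',
    data=json.dumps({'name': 'uworker-main'}),
    headers={
        'Authorization': 'Bearer ' + token,
        'Content-Type': 'application/json',
    })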
13 changes: 9 additions & 4 deletions src/clusterfuzz/_internal/bot/tasks/task_types.py
@@ -14,6 +14,7 @@
 """Types of tasks. This needs to be separate from commands.py because
 base/tasks.py depends on this module and many things commands.py imports depend
 on base/tasks.py (i.e. avoiding circular imports)."""
+from clusterfuzz._internal import swarming
 from clusterfuzz._internal.base import task_utils
 from clusterfuzz._internal.base import tasks
 from clusterfuzz._internal.bot.tasks import utasks
@@ -85,7 +86,8 @@ def is_remote_utask(command, job):
     # Return True even if we can't query the db.
     return True
 
-  return batch.is_remote_task(command, job)
+  return batch.is_remote_task(command, job) or swarming.is_swarming_task(
+      command, job)
 
 
 def task_main_runs_on_uworker():
@@ -122,8 +124,7 @@ def execute(self, task_argument, job_type, uworker_env):
     """Executes a utask."""
     logs.info('Executing utask.')
     command = task_utils.get_command_from_module(self.module.__name__)
-    if not (self.is_execution_remote() and
-            batch.is_remote_task(command, job_type)):
+    if not is_remote_utask(command, job_type):
       self.execute_locally(task_argument, job_type, uworker_env)
       return
 
@@ -133,7 +134,11 @@ def execute(self, task_argument, job_type, uworker_env):
       return
 
     logs.info('Queueing utask for remote execution.', download_url=download_url)
-    tasks.add_utask_main(command, download_url, job_type)
+    if batch.is_remote_task(command, job_type):
+      tasks.add_utask_main(command, download_url, job_type)
+    else:
+      assert swarming.is_swarming_task(command, job_type)
+      swarming.push_swarming_task(command, download_url, job_type)
 
   def preprocess(self, task_argument, job_type, uworker_env):
     result = utasks.tworker_preprocess(self.module, task_argument, job_type,
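
Taken together with is_remote_utask() above, the dispatch in execute() now has three outcomes. A condensed, illustrative paraphrase (not code from the commit):

def dispatch_utask(command, job_type, download_url):
  """Paraphrase of the routing in UTask.execute() above."""
  if not is_remote_utask(command, job_type):
    return 'local'  # execute_locally(...) in the real code.
  if batch.is_remote_task(command, job_type):
    # Queue the main part for a Cloud Batch worker, as before.
    tasks.add_utask_main(command, download_url, job_type)
    return 'batch'
  # is_remote_utask() guarantees this holds when batch declined the task.
  assert swarming.is_swarming_task(command, job_type)
  swarming.push_swarming_task(command, download_url, job_type)
  return 'swarming'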
58 changes: 38 additions & 20 deletions src/clusterfuzz/_internal/bot/tasks/utasks/__init__.py
@@ -21,6 +21,7 @@
 
 from google.protobuf import timestamp_pb2
 
+from clusterfuzz._internal import swarming
 from clusterfuzz._internal.base import task_utils
 from clusterfuzz._internal.bot.tasks.utasks import uworker_io
 from clusterfuzz._internal.bot.webserver import http_server
@@ -32,7 +33,7 @@
 Timestamp = timestamp_pb2.Timestamp  # pylint: disable=no-member
 
 
-class _Mode(enum.Enum):
+class Mode(enum.Enum):
   """The execution mode of `uworker_main` tasks in a bot process."""
 
   # `uworker_main` tasks are executed on Cloud Batch.
@@ -41,6 +42,9 @@ class _Mode(enum.Enum):
   # `uworker_main` tasks are executed on bots via a Pub/Sub queue.
   QUEUE = 'queue'
 
+  # `uworker_main` tasks are executed on swarming.
+  SWARMING = 'swarming'
+
 
 class _Subtask(enum.Enum):
   """Parts of a task that may be executed on separate machines."""
@@ -56,6 +60,14 @@ def _timestamp_now() -> Timestamp:
   return ts
 
 
+def _get_execution_mode(utask_module, job_type):
+  """Determines whether this task is executed on swarming or batch."""
+  command = task_utils.get_command_from_module(utask_module.__name__)
+  if swarming.is_swarming_task(command, job_type):
+    return Mode.SWARMING
+  return Mode.BATCH
+
+
 class _MetricRecorder(contextlib.AbstractContextManager):
   """Records task execution metrics, even in case of error and exceptions.
@@ -64,10 +76,9 @@ class _MetricRecorder(contextlib.AbstractContextManager):
   nanoseconds since the Unix epoch.
   """
 
-  def __init__(self, subtask: _Subtask, mode: _Mode):
+  def __init__(self, subtask: _Subtask):
     self.start_time_ns = time.time_ns()
     self._subtask = subtask
-    self._mode = mode
     self._labels = None
 
     if subtask == _Subtask.PREPROCESS:
@@ -78,6 +89,7 @@ def __init__(self, subtask: _Subtask, mode: _Mode):
   def set_task_details(self,
                        utask_module,
                        job_type: str,
+                       execution_mode: Mode,
                        platform: str,
                        preprocess_start_time: Optional[Timestamp] = None):
     """Sets task details that might not be known at instantiation time.
@@ -97,7 +109,7 @@ def set_task_details(self,
         'task': task_utils.get_command_from_module(utask_module.__name__),
         'job': job_type,
         'subtask': self._subtask.value,
-        'mode': self._mode.value,
+        'mode': execution_mode.value,
         'platform': platform,
     }
 
@@ -141,13 +153,14 @@ def ensure_uworker_env_type_safety(uworker_env):
 
 
 def _preprocess(utask_module, task_argument, job_type, uworker_env,
-                recorder: _MetricRecorder):
+                recorder: _MetricRecorder, execution_mode: Mode):
   """Shared logic for preprocessing between preprocess_no_io and the I/O
   tworker_preprocess."""
   ensure_uworker_env_type_safety(uworker_env)
   set_uworker_env(uworker_env)
 
-  recorder.set_task_details(utask_module, job_type, environment.platform())
+  recorder.set_task_details(utask_module, job_type, execution_mode,
+                            environment.platform())
 
   logs.info('Starting utask_preprocess: %s.' % utask_module)
   uworker_input = utask_module.utask_preprocess(task_argument, job_type,
@@ -184,9 +197,9 @@ def tworker_preprocess_no_io(utask_module, task_argument, job_type,
                              uworker_env):
   """Executes the preprocessing step of the utask |utask_module| and returns the
   serialized output."""
-  with _MetricRecorder(_Subtask.PREPROCESS, _Mode.QUEUE) as recorder:
+  with _MetricRecorder(_Subtask.PREPROCESS) as recorder:
    uworker_input = _preprocess(utask_module, task_argument, job_type,
-                                uworker_env, recorder)
+                                uworker_env, recorder, Mode.QUEUE)
     if not uworker_input:
       return None
 
@@ -196,15 +209,15 @@ def tworker_preprocess_no_io(utask_module, task_argument, job_type,
 def uworker_main_no_io(utask_module, serialized_uworker_input):
   """Executes the main part of a utask on the uworker (locally if not using
   remote executor)."""
-  with _MetricRecorder(_Subtask.UWORKER_MAIN, _Mode.QUEUE) as recorder:
+  with _MetricRecorder(_Subtask.UWORKER_MAIN) as recorder:
     logs.info('Starting utask_main: %s.' % utask_module)
     uworker_input = uworker_io.deserialize_uworker_input(
         serialized_uworker_input)
 
     set_uworker_env(uworker_input.uworker_env)
     uworker_input.uworker_env.clear()
 
-    recorder.set_task_details(utask_module, uworker_input.job_type,
+    recorder.set_task_details(utask_module, uworker_input.job_type, Mode.QUEUE,
                               environment.platform(),
                               uworker_input.preprocess_start_time)
 
@@ -221,7 +234,7 @@ def tworker_postprocess_no_io(utask_module, uworker_output, uworker_input):
   """Executes the postprocess step on the trusted (t)worker (in this case it is
   the same bot as the uworker)."""
   logs.info('Starting postprocess on trusted worker.')
-  with _MetricRecorder(_Subtask.POSTPROCESS, _Mode.QUEUE) as recorder:
+  with _MetricRecorder(_Subtask.POSTPROCESS) as recorder:
     uworker_output = uworker_io.deserialize_uworker_output(uworker_output)
 
     # Do this to simulate out-of-band tamper-proof storage of the input.
@@ -230,7 +243,7 @@ def tworker_postprocess_no_io(utask_module, uworker_output, uworker_input):
 
     set_uworker_env(uworker_output.uworker_input.uworker_env)
 
-    recorder.set_task_details(utask_module, uworker_input.job_type,
+    recorder.set_task_details(utask_module, uworker_input.job_type, Mode.QUEUE,
                               environment.platform(),
                               uworker_input.preprocess_start_time)
 
@@ -241,9 +254,10 @@ def tworker_postprocess_no_io(utask_module, uworker_output, uworker_input):
 def tworker_preprocess(utask_module, task_argument, job_type, uworker_env):
   """Executes the preprocessing step of the utask |utask_module| and returns the
   signed download URL for the uworker's input and the (unsigned) download URL
   for its output."""
-  with _MetricRecorder(_Subtask.PREPROCESS, _Mode.BATCH) as recorder:
+  with _MetricRecorder(_Subtask.PREPROCESS) as recorder:
+    execution_mode = _get_execution_mode(utask_module, job_type)
     uworker_input = _preprocess(utask_module, task_argument, job_type,
-                                uworker_env, recorder)
+                                uworker_env, recorder, execution_mode)
     if not uworker_input:
       # Bail if preprocessing failed since we can't proceed.
       return None
@@ -266,7 +280,7 @@ def set_uworker_env(uworker_env: dict) -> None:
 def uworker_main(input_download_url) -> None:
   """Executes the main part of a utask on the uworker (locally if not using
   remote executor)."""
-  with _MetricRecorder(_Subtask.UWORKER_MAIN, _Mode.BATCH) as recorder:
+  with _MetricRecorder(_Subtask.UWORKER_MAIN) as recorder:
     uworker_input = uworker_io.download_and_deserialize_uworker_input(
         input_download_url)
     uworker_output_upload_url = uworker_input.uworker_output_upload_url
@@ -279,9 +293,11 @@ def uworker_main(input_download_url) -> None:
     _start_web_server_if_needed(uworker_input.job_type)
 
     utask_module = get_utask_module(uworker_input.module_name)
-    recorder.set_task_details(utask_module, uworker_input.job_type,
-                              environment.platform(),
-                              uworker_input.preprocess_start_time)
+    execution_mode = Mode.SWARMING if environment.is_swarming_bot(
+    ) else Mode.BATCH
+    recorder.set_task_details(
+        utask_module, uworker_input.job_type, execution_mode,
+        environment.platform(), uworker_input.preprocess_start_time)
 
     logs.info('Starting utask_main: %s.' % utask_module)
     uworker_output = utask_module.utask_main(uworker_input)
@@ -308,15 +324,17 @@ def uworker_bot_main():
 def tworker_postprocess(output_download_url) -> None:
   """Executes the postprocess step on the trusted (t)worker."""
   logs.info('Starting postprocess untrusted worker.')
-  with _MetricRecorder(_Subtask.POSTPROCESS, _Mode.BATCH) as recorder:
+  with _MetricRecorder(_Subtask.POSTPROCESS) as recorder:
     uworker_output = uworker_io.download_and_deserialize_uworker_output(
         output_download_url)
 
     set_uworker_env(uworker_output.uworker_input.uworker_env)
 
     utask_module = get_utask_module(uworker_output.uworker_input.module_name)
+    execution_mode = _get_execution_mode(utask_module,
+                                         uworker_output.uworker_input.job_type)
     recorder.set_task_details(
-        utask_module, uworker_output.uworker_input.job_type,
+        utask_module, uworker_output.uworker_input.job_type, execution_mode,
         environment.platform(),
         uworker_output.uworker_input.preprocess_start_time)
 
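
In summary, the 'mode' metric label is no longer fixed at recorder construction but derived per call site. A condensed sketch of the three cases above (illustrative only, not code from the commit):

def mode_for_subtask(subtask, utask_module, job_type):
  """How each call site above now picks the metric's 'mode' label."""
  if subtask in (_Subtask.PREPROCESS, _Subtask.POSTPROCESS):
    # Trusted workers consult the config: swarming if swarming.yaml claims
    # the (command, job) pair, batch otherwise. The *_no_io variants pass
    # Mode.QUEUE explicitly instead of calling this.
    return _get_execution_mode(utask_module, job_type)
  # uworker_main runs on the worker itself and inspects its own environment.
  return Mode.SWARMING if environment.is_swarming_bot() else Mode.BATCH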
8 changes: 8 additions & 0 deletions src/clusterfuzz/_internal/config/local_config.py
@@ -33,6 +33,7 @@
 BATCH_PATH = 'batch.batch'
 MONITORING_REGIONS_PATH = 'monitoring.regions'
 PROJECT_PATH = 'project'
+SWARMING_PATH = 'swarming.swarming'
 
 
 def _load_yaml_file(yaml_file_path):
@@ -251,3 +252,10 @@ class GCEClustersConfig(Config):
 
   def __init__(self):
     super().__init__(GCE_CLUSTERS_PATH)
+
+
+class SwarmingConfig(Config):
+  """Swarming config."""
+
+  def __init__(self):
+    super().__init__(SWARMING_PATH)
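
A small usage sketch for the new config class, assuming the dot-separated get() lookups that existing Config subclasses such as BatchConfig rely on (values follow the test swarming.yaml above):

from clusterfuzz._internal.config import local_config

swarming_config = local_config.SwarmingConfig()
server = swarming_config.get('swarming_server')  # 'server-name'
linux_priority = swarming_config.get('mapping.LINUX.priority')  # 1

The 'swarming.swarming' root path resolves to the swarming.yaml file inside the config directory's swarming/ subdirectory, so keys are read relative to that file's top level.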
