diff --git a/src/clusterfuzz/_internal/base/tasks/__init__.py b/src/clusterfuzz/_internal/base/tasks/__init__.py index e57c739a24..edf3027435 100644 --- a/src/clusterfuzz/_internal/base/tasks/__init__.py +++ b/src/clusterfuzz/_internal/base/tasks/__init__.py @@ -281,7 +281,7 @@ def is_done_collecting_messages(): def get_postprocess_task(): """Gets a postprocess task if one exists.""" # This should only be run on non-preemptible bots. - if not (task_utils.is_remotely_executing_utasks() or + if not (task_utils.is_remotely_executing_utasks() and task_utils.get_opted_in_tasks()): return None # Postprocess is platform-agnostic, so we run all such tasks on our diff --git a/src/clusterfuzz/_internal/google_cloud_utils/batch.py b/src/clusterfuzz/_internal/google_cloud_utils/batch.py index 0e0ccf037a..8af1b1bbab 100644 --- a/src/clusterfuzz/_internal/google_cloud_utils/batch.py +++ b/src/clusterfuzz/_internal/google_cloud_utils/batch.py @@ -21,6 +21,7 @@ from google.cloud import batch_v1 as batch from clusterfuzz._internal.base import retry +from clusterfuzz._internal.base import tasks from clusterfuzz._internal.base import utils from clusterfuzz._internal.base.tasks import task_utils from clusterfuzz._internal.config import local_config @@ -33,7 +34,6 @@ _local = threading.local() -MAX_DURATION = f'{60 * 60 * 6}s' RETRY_COUNT = 0 TASK_BUNCH_SIZE = 20 @@ -46,9 +46,20 @@ MAX_CONCURRENT_VMS_PER_JOB = 1000 BatchWorkloadSpec = collections.namedtuple('BatchWorkloadSpec', [ - 'clusterfuzz_release', 'disk_size_gb', 'disk_type', 'docker_image', - 'user_data', 'service_account_email', 'subnetwork', 'preemptible', - 'project', 'gce_zone', 'machine_type', 'network', 'gce_region' + 'clusterfuzz_release', + 'disk_size_gb', + 'disk_type', + 'docker_image', + 'user_data', + 'service_account_email', + 'subnetwork', + 'preemptible', + 'project', + 'gce_zone', + 'machine_type', + 'network', + 'gce_region', + 'max_run_duration', ]) @@ -158,7 +169,7 @@ def _get_task_spec(batch_workload_spec): task_spec = batch.TaskSpec() task_spec.runnables = [runnable] task_spec.max_retry_count = RETRY_COUNT - task_spec.max_run_duration = MAX_DURATION + task_spec.max_run_duration = batch_workload_spec.max_duration return task_spec @@ -219,8 +230,7 @@ def _create_job(spec, input_urls): create_request.job_id = job_name # The job's parent is the region in which the job will run project_id = spec.project - create_request.parent = ( - f'projects/{project_id}/locations/{spec.gce_region}') + create_request.parent = f'projects/{project_id}/locations/{spec.gce_region}' job_result = _send_create_job_request(create_request) logs.info(f'Created batch job id={job_name}.', spec=spec) return job_result @@ -274,6 +284,11 @@ def _get_config_name(command, job_name): return config_name +def _get_task_duration(command): + return tasks.TASK_LEASE_SECONDS_BY_COMMAND.get(command, + tasks.TASK_LEASE_SECONDS) + + def _get_spec_from_config(command, job_name): """Gets the configured specifications for a batch workload.""" config_name = _get_config_name(command, job_name) @@ -285,6 +300,7 @@ def _get_spec_from_config(command, job_name): docker_image = instance_spec['docker_image'] user_data = instance_spec['user_data'] clusterfuzz_release = instance_spec.get('clusterfuzz_release', 'prod') + max_run_duration = f'{_get_task_duration(command)}s' spec = BatchWorkloadSpec( clusterfuzz_release=clusterfuzz_release, docker_image=docker_image, @@ -298,5 +314,6 @@ def _get_spec_from_config(command, job_name): network=instance_spec['network'], subnetwork=instance_spec['subnetwork'], preemptible=instance_spec['preemptible'], - machine_type=instance_spec['machine_type']) + machine_type=instance_spec['machine_type'], + max_run_duration=max_run_duration) return spec diff --git a/src/clusterfuzz/_internal/tests/core/google_cloud_utils/batch_test.py b/src/clusterfuzz/_internal/tests/core/google_cloud_utils/batch_test.py index bba136fbc4..1dc08d0861 100644 --- a/src/clusterfuzz/_internal/tests/core/google_cloud_utils/batch_test.py +++ b/src/clusterfuzz/_internal/tests/core/google_cloud_utils/batch_test.py @@ -19,6 +19,8 @@ from clusterfuzz._internal.google_cloud_utils import batch from clusterfuzz._internal.tests.test_libs import test_utils +# pylint: disable=protected-access + @test_utils.with_cloud_emulators('datastore') class GetSpecFromConfigTest(unittest.TestCase): @@ -26,13 +28,13 @@ class GetSpecFromConfigTest(unittest.TestCase): def setUp(self): self.maxDiff = None + self.job = data_types.Job(name='libfuzzer_chrome_asan', platform='LINUX') + self.job.put() - def test_nonpreemptible_get_spec_from_config(self): + def test_nonpreemptible(self): """Tests that get_spec_from_config works for non-preemptibles as expected.""" - job = data_types.Job(name='libfuzzer_chrome_asan', platform='LINUX') - job.put() - spec = batch._get_spec_from_config('corpus_pruning', job.name) # pylint: disable=protected-access + spec = batch._get_spec_from_config('analyze', self.job.name) expected_spec = batch.BatchWorkloadSpec( clusterfuzz_release='prod', docker_image='gcr.io/clusterfuzz-images/base:a2f4dd6-202202070654', @@ -47,15 +49,14 @@ def test_nonpreemptible_get_spec_from_config(self): gce_zone='gce-zone', project='test-clusterfuzz', preemptible=False, - machine_type='n1-standard-1') + machine_type='n1-standard-1', + max_run_duration='21600s') self.assertCountEqual(spec, expected_spec) - def test_preemptible_get_spec_from_config(self): + def test_preemptible(self): """Tests that get_spec_from_config works for preemptibles as expected.""" - job = data_types.Job(name='libfuzzer_chrome_asan', platform='LINUX') - job.put() - spec = batch._get_spec_from_config('fuzz', job.name) # pylint: disable=protected-access + spec = batch._get_spec_from_config('fuzz', self.job.name) expected_spec = batch.BatchWorkloadSpec( clusterfuzz_release='prod', docker_image='gcr.io/clusterfuzz-images/base:a2f4dd6-202202070654', @@ -70,6 +71,21 @@ def test_preemptible_get_spec_from_config(self): gce_region='gce-region', project='test-clusterfuzz', preemptible=True, - machine_type='n1-standard-1') + machine_type='n1-standard-1', + max_run_duration='21600s') self.assertCountEqual(spec, expected_spec) + + def test_corpus_pruning(self): + """Tests that corpus pruning uses a spec of 24 hours and a different one + than normal.""" + pruning_spec = batch._get_spec_from_config('corpus_pruning', self.job.name) + self.assertEqual(pruning_spec.max_run_duration, f'{24 * 60 * 60}s') + normal_spec = batch._get_spec_from_config('analyze', self.job.name) + self.assertNotEqual(pruning_spec, normal_spec) + job = data_types.Job(name='libfuzzer_chrome_msan', platform='LINUX') + job.put() + # This behavior is important for grouping batch alike tasks into a single + # batch job. + pruning_spec2 = batch._get_spec_from_config('corpus_pruning', job.name) + self.assertEqual(pruning_spec, pruning_spec2)