Skip to content

Commit

Permalink
Reapply "Make fuzz task work on a uworker. (#4342)" (#4351) (#4352)
Browse files Browse the repository at this point in the history
This reverts commit
90b8710.

1. Don't assume there is a data bundle.
2. Fix typo.
  • Loading branch information
jonathanmetzman authored Oct 23, 2024
1 parent 90b8710 commit 25243d1
Show file tree
Hide file tree
Showing 10 changed files with 163 additions and 78 deletions.
23 changes: 13 additions & 10 deletions src/clusterfuzz/_internal/bot/tasks/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ def _should_update_data_bundle(data_bundle, data_bundle_directory):

def _prepare_update_data_bundle(fuzzer, data_bundle):
"""Create necessary directories to download the data bundle."""
data_bundle_directory = get_data_bundle_directory(fuzzer, data_bundle)
data_bundle_directory = _get_data_bundle_directory(fuzzer, data_bundle)
if not data_bundle_directory:
logs.error('Failed to setup data bundle %s.' % data_bundle.name)
return None
Expand Down Expand Up @@ -744,17 +744,20 @@ def _is_data_bundle_up_to_date(data_bundle, data_bundle_directory):
return False


def trusted_get_data_bundle_directory(fuzzer):
"""For fuzz_task which doesn't get data bundles in an untrusted manner."""
# TODO(metzman): Delete this when fuzz_task is migrated.
# Check if we have a fuzzer-specific data bundle. Use it to calculate the
# data directory we will fetch our testcases from.
data_bundle = data_types.DataBundle.query(
data_types.DataBundle.name == fuzzer.data_bundle_name).get()
return get_data_bundle_directory(fuzzer, data_bundle)
def get_data_bundle_directory(fuzzer, setup_input):
"""Public interface for _get_data_bundle_directory."""
if not setup_input.data_bundle_corpuses:
data_bundle = None
else:
# There should only be one of these, get the first one.
assert len(setup_input.data_bundle_corpuses) == 1
data_bundle = setup_input.data_bundle_corpuses[0].data_bundle
data_bundle = uworker_io.entity_from_protobuf(data_bundle,
data_types.DataBundle)
return _get_data_bundle_directory(fuzzer, data_bundle)


def get_data_bundle_directory(fuzzer, data_bundle):
def _get_data_bundle_directory(fuzzer, data_bundle):
"""Return data bundle data directory."""
# Store corpora for built-in fuzzers like libFuzzer in the same directory
# as other local data bundles. This makes it easy to clear them when we run
Expand Down
46 changes: 41 additions & 5 deletions src/clusterfuzz/_internal/bot/tasks/utasks/fuzz_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@
THREAD_WAIT_TIMEOUT = 1
MAX_CRASHES_UPLOADED = 64

ENGINE_OUTPUT_LIMIT = 10 * 2**20


class FuzzTaskError(Exception):
"""Fuzz task exception."""
Expand Down Expand Up @@ -548,7 +550,11 @@ def sync_from_gcs(self):

# Check if the corpus was recently synced. If yes, set a flag so that we
# don't sync it again and save some time.
if last_sync_time and os.path.exists(self._corpus_directory):
# TODO(metzman): Consider removing this after migration is complete. It
# probably doesn't save much time as corpus syncing is super fast after
# async syncing was added.
if not environment.is_uworker() and last_sync_time and os.path.exists(
self._corpus_directory):
last_update_time = storage.last_updated(self._get_gcs_url())
if last_update_time and last_sync_time > last_update_time:
logs.info('Corpus for target %s has no new updates, skipping rsync.' %
Expand Down Expand Up @@ -1484,6 +1490,7 @@ def do_engine_fuzzing(self, engine_impl):
fuzzer_metadata = {}
return_code = 1 # Vanilla return-code for engine crashes.

self.fuzz_task_output.app_revision = environment.get_value('APP_REVISION')
# Do the actual fuzzing.
for fuzzing_round in range(environment.get_value('MAX_TESTCASES', 1)):
logs.info(f'Fuzzing round {fuzzing_round}.')
Expand Down Expand Up @@ -1513,9 +1520,9 @@ def do_engine_fuzzing(self, engine_impl):
float(testcase_run.timestamp))
crash_result_obj = crash_result.CrashResult(
return_code, result.time_executed, result.logs)
log = testcase_manager.prepare_log_for_upload(
crash_result_obj.get_stacktrace(), return_code)
testcase_manager.upload_log(log, log_time)
output = crash_result_obj.get_stacktrace()
self.fuzz_task_output.engine_outputs.append(
_to_engine_output(output, return_code, log_time))

for crash in result.crashes:
testcase_manager.upload_testcase(crash.input_path, log_time)
Expand Down Expand Up @@ -1764,7 +1771,8 @@ def run(self):

# Data bundle directories can also have testcases which are kept in-place
# because of dependencies.
self.data_directory = setup.trusted_get_data_bundle_directory(self.fuzzer)
self.data_directory = setup.get_data_bundle_directory(
self.fuzzer, self.uworker_input.setup_input)
if not self.data_directory:
logs.error(
'Unable to setup data bundle %s.' % self.fuzzer.data_bundle_name)
Expand Down Expand Up @@ -2019,12 +2027,40 @@ def save_fuzz_targets(output):
output.uworker_input.job_type)


def _to_engine_output(output: str, return_code: int,
log_time: datetime.datetime):
"""Returns an EngineOutput proto."""
truncated_output = truncate_fuzzer_output(output, ENGINE_OUTPUT_LIMIT)
if len(output) != len(truncated_output):
logs.warning('Fuzzer output truncated.')

proto_timestamp = uworker_io.timestamp_to_proto_timestamp(log_time)
engine_output = uworker_msg_pb2.EngineOutput(
output=bytes(truncated_output, 'utf-8'),
return_code=return_code,
timestamp=proto_timestamp)
return engine_output


def _upload_engine_output_log(engine_output):
timestamp = uworker_io.proto_timestamp_to_timestamp(engine_output.timestamp)
testcase_manager.upload_log(engine_output.output, engine_output.return_code,
timestamp)


def utask_postprocess(output):
"""Postprocesses fuzz_task."""
if output.error_type != uworker_msg_pb2.ErrorType.NO_ERROR: # pylint: disable=no-member
_ERROR_HANDLER.handle(output)
return

save_fuzz_targets(output)

session = _make_session(output.uworker_input)
# TODO(metzman): Get rid of this method and move functionality to this
# function.
session.postprocess(output)
# TODO(b/374776013): Refactor this code so the uploads happen during
# utask_main.
for engine_output in output.fuzz_task_output.engine_outputs:
_upload_engine_output_log(engine_output)
14 changes: 14 additions & 0 deletions src/clusterfuzz/_internal/bot/tasks/utasks/uworker_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from google.cloud.datastore_v1.types import entity as entity_pb2
from google.cloud.ndb import model
from google.protobuf import any_pb2
from google.protobuf import timestamp_pb2
import google.protobuf.message

from clusterfuzz._internal.base import task_utils
Expand All @@ -30,6 +31,9 @@
from clusterfuzz._internal.protos import uworker_msg_pb2
from clusterfuzz._internal.system import environment

# Define an alias to appease pylint.
Timestamp = timestamp_pb2.Timestamp # pylint: disable=no-member


def generate_new_input_file_name() -> str:
"""Generates a new input file name."""
Expand Down Expand Up @@ -215,3 +219,13 @@ def check_handling_testcase_safe(testcase):
# TODO(https://b.corp.google.com/issues/328691756): Change this to
# log_fatal_and_exit once we are handling untrusted tasks properly.
logs.warning(f'Cannot handle {testcase.key.id()} in trusted task.')


def timestamp_to_proto_timestamp(pydt) -> Timestamp:
proto_timestamp = Timestamp()
proto_timestamp.FromDatetime(pydt)
return proto_timestamp


def proto_timestamp_to_timestamp(proto_timestamp: Timestamp):
return proto_timestamp.ToDatetime()
17 changes: 8 additions & 9 deletions src/clusterfuzz/_internal/bot/testcase_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -539,9 +539,7 @@ def _do_run_testcase_and_return_result_in_queue(crash_queue,
if upload_output:
# Include full output for uploaded logs (crash output, merge output, etc).
crash_result_full = CrashResult(return_code, crash_time, output)
log = prepare_log_for_upload(crash_result_full.get_stacktrace(),
return_code)
upload_log(log, log_time)
upload_log(crash_result_full.get_stacktrace(), return_code, log_time)
except Exception:
logs.error('Exception occurred while running '
'run_testcase_and_return_result_in_queue.')
Expand Down Expand Up @@ -849,10 +847,10 @@ def test_for_reproducibility(fuzz_target,
expected_security_flag)


def prepare_log_for_upload(symbolized_output, return_code):
def _prepare_log_for_upload(symbolized_output, return_code, app_revision):
"""Prepare log for upload."""
# Add revision information to the logs.
app_revision = environment.get_value('APP_REVISION')
app_revision = app_revision or environment.get_value('APP_REVISION')
job_name = environment.get_value('JOB_NAME')
components = revisions.get_component_list(app_revision, job_name)
component_revisions = (
Expand All @@ -867,15 +865,16 @@ def prepare_log_for_upload(symbolized_output, return_code):
if environment.is_android():
bot_header += f'Device serial: {environment.get_value("ANDROID_SERIAL")}\n'

return_code_header = "Return code: %s\n\n" % return_code
return_code_header = f'Return code: {return_code}\n\n'

result = revisions_header + bot_header + return_code_header +\
symbolized_output
result = (
revisions_header + bot_header + return_code_header + symbolized_output)
return result.encode('utf-8')


def upload_log(log, log_time):
def upload_log(symbolized_output, return_code, log_time, app_revision=None):
"""Upload the output into corresponding GCS logs bucket."""
log = _prepare_log_for_upload(symbolized_output, return_code, app_revision)
fuzz_logs_bucket = environment.get_value('FUZZ_LOGS_BUCKET')
if not fuzz_logs_bucket:
return
Expand Down
8 changes: 8 additions & 0 deletions src/clusterfuzz/_internal/protos/uworker_msg.proto
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,12 @@ message FuzzTaskCrashGroup {
optional bool one_time_crasher_flag = 4;
}

message EngineOutput {
optional bytes output = 1;
optional int64 return_code = 2;
google.protobuf.Timestamp timestamp = 3;
}

message FuzzTaskOutput {
// TODO(metzman): Remove this since tworkers should know what this is based on
// the input.
Expand All @@ -250,6 +256,8 @@ message FuzzTaskOutput {
repeated string testcase_run_jsons = 12;
repeated FuzzTaskCrashGroup crash_groups = 13;
optional BuildData build_data = 14;
optional int64 app_revision = 15;
repeated EngineOutput engine_outputs = 16;
}

message MinimizeTaskOutput {
Expand Down
56 changes: 29 additions & 27 deletions src/clusterfuzz/_internal/protos/uworker_msg_pb2.py

Large diffs are not rendered by default.

40 changes: 38 additions & 2 deletions src/clusterfuzz/_internal/protos/uworker_msg_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1192,6 +1192,33 @@ class FuzzTaskCrashGroup(google.protobuf.message.Message):

global___FuzzTaskCrashGroup = FuzzTaskCrashGroup

@typing_extensions.final
class EngineOutput(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor

OUTPUT_FIELD_NUMBER: builtins.int
RETURN_CODE_FIELD_NUMBER: builtins.int
TIMESTAMP_FIELD_NUMBER: builtins.int
output: builtins.bytes
return_code: builtins.int
@property
def timestamp(self) -> google.protobuf.timestamp_pb2.Timestamp: ...
def __init__(
self,
*,
output: builtins.bytes | None = ...,
return_code: builtins.int | None = ...,
timestamp: google.protobuf.timestamp_pb2.Timestamp | None = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["_output", b"_output", "_return_code", b"_return_code", "output", b"output", "return_code", b"return_code", "timestamp", b"timestamp"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["_output", b"_output", "_return_code", b"_return_code", "output", b"output", "return_code", b"return_code", "timestamp", b"timestamp"]) -> None: ...
@typing.overload
def WhichOneof(self, oneof_group: typing_extensions.Literal["_output", b"_output"]) -> typing_extensions.Literal["output"] | None: ...
@typing.overload
def WhichOneof(self, oneof_group: typing_extensions.Literal["_return_code", b"_return_code"]) -> typing_extensions.Literal["return_code"] | None: ...

global___EngineOutput = EngineOutput

@typing_extensions.final
class FuzzTaskOutput(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
Expand All @@ -1207,6 +1234,8 @@ class FuzzTaskOutput(google.protobuf.message.Message):
TESTCASE_RUN_JSONS_FIELD_NUMBER: builtins.int
CRASH_GROUPS_FIELD_NUMBER: builtins.int
BUILD_DATA_FIELD_NUMBER: builtins.int
APP_REVISION_FIELD_NUMBER: builtins.int
ENGINE_OUTPUTS_FIELD_NUMBER: builtins.int
fully_qualified_fuzzer_name: builtins.str
"""TODO(metzman): Remove this since tworkers should know what this is based on
the input.
Expand All @@ -1226,6 +1255,9 @@ class FuzzTaskOutput(google.protobuf.message.Message):
def crash_groups(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___FuzzTaskCrashGroup]: ...
@property
def build_data(self) -> global___BuildData: ...
app_revision: builtins.int
@property
def engine_outputs(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___EngineOutput]: ...
def __init__(
self,
*,
Expand All @@ -1240,9 +1272,13 @@ class FuzzTaskOutput(google.protobuf.message.Message):
testcase_run_jsons: collections.abc.Iterable[builtins.str] | None = ...,
crash_groups: collections.abc.Iterable[global___FuzzTaskCrashGroup] | None = ...,
build_data: global___BuildData | None = ...,
app_revision: builtins.int | None = ...,
engine_outputs: collections.abc.Iterable[global___EngineOutput] | None = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["_build_data", b"_build_data", "_crash_revision", b"_crash_revision", "_fully_qualified_fuzzer_name", b"_fully_qualified_fuzzer_name", "_fuzzer_revision", b"_fuzzer_revision", "_fuzzer_run_results", b"_fuzzer_run_results", "_job_run_timestamp", b"_job_run_timestamp", "_new_targets_count", b"_new_targets_count", "_testcases_executed", b"_testcases_executed", "build_data", b"build_data", "crash_revision", b"crash_revision", "fully_qualified_fuzzer_name", b"fully_qualified_fuzzer_name", "fuzzer_revision", b"fuzzer_revision", "fuzzer_run_results", b"fuzzer_run_results", "job_run_timestamp", b"job_run_timestamp", "new_targets_count", b"new_targets_count", "testcases_executed", b"testcases_executed"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["_build_data", b"_build_data", "_crash_revision", b"_crash_revision", "_fully_qualified_fuzzer_name", b"_fully_qualified_fuzzer_name", "_fuzzer_revision", b"_fuzzer_revision", "_fuzzer_run_results", b"_fuzzer_run_results", "_job_run_timestamp", b"_job_run_timestamp", "_new_targets_count", b"_new_targets_count", "_testcases_executed", b"_testcases_executed", "build_data", b"build_data", "crash_groups", b"crash_groups", "crash_revision", b"crash_revision", "fully_qualified_fuzzer_name", b"fully_qualified_fuzzer_name", "fuzz_targets", b"fuzz_targets", "fuzzer_revision", b"fuzzer_revision", "fuzzer_run_results", b"fuzzer_run_results", "job_run_timestamp", b"job_run_timestamp", "new_targets_count", b"new_targets_count", "testcase_run_jsons", b"testcase_run_jsons", "testcases_executed", b"testcases_executed"]) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["_app_revision", b"_app_revision", "_build_data", b"_build_data", "_crash_revision", b"_crash_revision", "_fully_qualified_fuzzer_name", b"_fully_qualified_fuzzer_name", "_fuzzer_revision", b"_fuzzer_revision", "_fuzzer_run_results", b"_fuzzer_run_results", "_job_run_timestamp", b"_job_run_timestamp", "_new_targets_count", b"_new_targets_count", "_testcases_executed", b"_testcases_executed", "app_revision", b"app_revision", "build_data", b"build_data", "crash_revision", b"crash_revision", "fully_qualified_fuzzer_name", b"fully_qualified_fuzzer_name", "fuzzer_revision", b"fuzzer_revision", "fuzzer_run_results", b"fuzzer_run_results", "job_run_timestamp", b"job_run_timestamp", "new_targets_count", b"new_targets_count", "testcases_executed", b"testcases_executed"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["_app_revision", b"_app_revision", "_build_data", b"_build_data", "_crash_revision", b"_crash_revision", "_fully_qualified_fuzzer_name", b"_fully_qualified_fuzzer_name", "_fuzzer_revision", b"_fuzzer_revision", "_fuzzer_run_results", b"_fuzzer_run_results", "_job_run_timestamp", b"_job_run_timestamp", "_new_targets_count", b"_new_targets_count", "_testcases_executed", b"_testcases_executed", "app_revision", b"app_revision", "build_data", b"build_data", "crash_groups", b"crash_groups", "crash_revision", b"crash_revision", "engine_outputs", b"engine_outputs", "fully_qualified_fuzzer_name", b"fully_qualified_fuzzer_name", "fuzz_targets", b"fuzz_targets", "fuzzer_revision", b"fuzzer_revision", "fuzzer_run_results", b"fuzzer_run_results", "job_run_timestamp", b"job_run_timestamp", "new_targets_count", b"new_targets_count", "testcase_run_jsons", b"testcase_run_jsons", "testcases_executed", b"testcases_executed"]) -> None: ...
@typing.overload
def WhichOneof(self, oneof_group: typing_extensions.Literal["_app_revision", b"_app_revision"]) -> typing_extensions.Literal["app_revision"] | None: ...
@typing.overload
def WhichOneof(self, oneof_group: typing_extensions.Literal["_build_data", b"_build_data"]) -> typing_extensions.Literal["build_data"] | None: ...
@typing.overload
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1251,7 +1251,6 @@ def setUp(self):
'clusterfuzz._internal.bot.tasks.utasks.fuzz_task.GcsCorpus.sync_from_gcs',
'clusterfuzz._internal.bot.tasks.utasks.fuzz_task.GcsCorpus.upload_files',
'clusterfuzz._internal.build_management.revisions.get_component_list',
'clusterfuzz._internal.bot.testcase_manager.upload_log',
'clusterfuzz._internal.bot.testcase_manager.upload_testcase',
'clusterfuzz._internal.google_cloud_utils.storage.list_blobs',
'clusterfuzz._internal.google_cloud_utils.storage.get_arbitrary_signed_upload_urls',
Expand Down Expand Up @@ -1305,6 +1304,7 @@ def test_basic(self):
os.environ['APP_REVISION'] = '1'
os.environ['FUZZ_TEST_TIMEOUT'] = '2000'
os.environ['BOT_NAME'] = 'hostname.company.com'
os.environ['FUZZ_LOGS_BUCKET'] = '/fuzz-logs'

expected_crashes = [engine.Crash('/input', 'stack', ['args'], 1.0)]

Expand Down Expand Up @@ -1336,14 +1336,6 @@ def test_basic(self):
}, fuzzer_metadata)

log_time = datetime.datetime(1970, 1, 1, 0, 0)
log_call = mock.call(
b'Component revisions (build r1):\n'
b'component: rev\n\nBot name: hostname.company.com\n'
b'Return code: 1\n\n'
b'Command: cmd\nTime ran: 42.0\n\n'
b'logs\n'
b'cf::fuzzing_strategies: strategy_1:1,strategy_2:50', log_time)
self.mock.upload_log.assert_has_calls([log_call, log_call])
self.mock.upload_testcase.assert_has_calls([
mock.call('/input', log_time),
mock.call('/input', log_time),
Expand Down
Loading

0 comments on commit 25243d1

Please sign in to comment.