diff --git a/src/clusterfuzz/_internal/metrics/logs.py b/src/clusterfuzz/_internal/metrics/logs.py index 6b35d255b3..6a69e8fb5c 100644 --- a/src/clusterfuzz/_internal/metrics/logs.py +++ b/src/clusterfuzz/_internal/metrics/logs.py @@ -24,8 +24,6 @@ import traceback from typing import Any -from clusterfuzz._internal.system import environment - STACKDRIVER_LOG_MESSAGE_LIMIT = 80000 # Allowed log entry size is 100 KB. LOCAL_LOG_MESSAGE_LIMIT = 100000 LOCAL_LOG_LIMIT = 500000 @@ -36,7 +34,7 @@ def _increment_error_count(): """"Increment the error count metric.""" - if environment.is_running_on_k8s(): + if _is_running_on_k8s(): task_name = 'k8s' elif _is_running_on_app_engine(): task_name = 'appengine' @@ -62,6 +60,11 @@ def _is_running_on_app_engine(): os.getenv('SERVER_SOFTWARE').startswith('Google App Engine/'))) +def _is_running_on_k8s(): + """Returns whether or not we're running on K8s.""" + return os.getenv('IS_K8S_ENV') == 'true' + + def _console_logging_enabled(): """Return bool on where console logging is enabled, usually for tests.""" return bool(os.getenv('LOG_TO_CONSOLE')) @@ -75,9 +78,9 @@ def _file_logging_enabled(): This is disabled if we are running in app engine or kubernetes as these have their dedicated loggers, see configure_appengine() and configure_k8s(). """ - return bool( - os.getenv('LOG_TO_FILE', 'True') - ) and not _is_running_on_app_engine() and not environment.is_running_on_k8s() + return bool(os.getenv( + 'LOG_TO_FILE', + 'True')) and not _is_running_on_app_engine() and not _is_running_on_k8s() def _fluentd_logging_enabled(): @@ -87,7 +90,7 @@ def _fluentd_logging_enabled(): kubernetes as these have their dedicated loggers, see configure_appengine() and configure_k8s().""" return bool(os.getenv('LOG_TO_FLUENTD', 'True')) and not _is_local( - ) and not _is_running_on_app_engine() and not environment.is_running_on_k8s() + ) and not _is_running_on_app_engine() and not _is_running_on_k8s() def _cloud_logging_enabled(): @@ -96,7 +99,7 @@ def _cloud_logging_enabled(): or kubernetes as these have their dedicated loggers, see configure_appengine() and configure_k8s().""" return bool(os.getenv('LOG_TO_GCP')) and not _is_local( - ) and not _is_running_on_app_engine() and not environment.is_running_on_k8s() + ) and not _is_running_on_app_engine() and not _is_running_on_k8s() def suppress_unwanted_warnings(): @@ -414,7 +417,7 @@ def configure(name, extras=None): |extras| will be included by emit() in log messages.""" suppress_unwanted_warnings() - if environment.is_running_on_k8s(): + if _is_running_on_k8s(): configure_k8s() return @@ -450,7 +453,7 @@ def get_logger(): if _logger: return _logger - if _is_running_on_app_engine() or environment.is_running_on_k8s(): + if _is_running_on_app_engine() or _is_running_on_k8s(): # Running on App Engine. set_logger(logging.getLogger()) diff --git a/src/clusterfuzz/_internal/metrics/monitor.py b/src/clusterfuzz/_internal/metrics/monitor.py index 432266fb6f..8148915f66 100644 --- a/src/clusterfuzz/_internal/metrics/monitor.py +++ b/src/clusterfuzz/_internal/metrics/monitor.py @@ -101,40 +101,6 @@ def _time_series_sort_key(ts): return ts.points[-1].interval.start_time -def flush_metrics(): - """Flushes all metrics stored in _metrics_store""" - project_path = _monitoring_v3_client.common_project_path( # pylint: disable=no-member - utils.get_application_id()) - try: - time_series = [] - end_time = time.time() - for metric, labels, start_time, value in _metrics_store.iter_values(): - if (metric.metric_kind == metric_pb2.MetricDescriptor.MetricKind.GAUGE # pylint: disable=no-member - ): - start_time = end_time - - series = _TimeSeries() - metric.monitoring_v3_time_series(series, labels, start_time, end_time, - value) - time_series.append(series) - - if len(time_series) == MAX_TIME_SERIES_PER_CALL: - time_series.sort(key=_time_series_sort_key) - _create_time_series(project_path, time_series) - time_series = [] - - if time_series: - time_series.sort(key=_time_series_sort_key) - _create_time_series(project_path, time_series) - except Exception as e: - if environment.is_android(): - # FIXME: This exception is extremely common on Android. We are already - # aware of the problem, don't make more noise about it. - logs.warning(f'Failed to flush metrics: {e}') - else: - logs.error(f'Failed to flush metrics: {e}') - - class _FlusherThread(threading.Thread): """Flusher thread.""" @@ -145,14 +111,41 @@ def __init__(self): def run(self): """Run the flusher thread.""" - should_stop = False + project_path = _monitoring_v3_client.common_project_path( # pylint: disable=no-member + utils.get_application_id()) + while True: - # Make sure there are no metrics left to flush after monitor.stop() - if self.stop_event.wait(FLUSH_INTERVAL_SECONDS): - should_stop = True - flush_metrics() - if should_stop: - return + try: + if self.stop_event.wait(FLUSH_INTERVAL_SECONDS): + return + + time_series = [] + end_time = time.time() + for metric, labels, start_time, value in _metrics_store.iter_values(): + if (metric.metric_kind == metric_pb2.MetricDescriptor.MetricKind.GAUGE # pylint: disable=no-member + ): + start_time = end_time + + series = _TimeSeries() + metric.monitoring_v3_time_series(series, labels, start_time, end_time, + value) + time_series.append(series) + + if len(time_series) == MAX_TIME_SERIES_PER_CALL: + time_series.sort(key=_time_series_sort_key) + _create_time_series(project_path, time_series) + time_series = [] + + if time_series: + time_series.sort(key=_time_series_sort_key) + _create_time_series(project_path, time_series) + except Exception as e: + if environment.is_android(): + # FIXME: This exception is extremely common on Android. We are already + # aware of the problem, don't make more noise about it. + logs.warning(f'Failed to flush metrics: {e}') + else: + logs.error(f'Failed to flush metrics: {e}') def stop(self): self.stop_event.set() @@ -296,9 +289,8 @@ def monitoring_v3_metric(self, metric, labels=None): metric.labels[key] = str(value) # Default labels. - if not environment.is_running_on_k8s(): - bot_name = environment.get_value('BOT_NAME', None) - metric.labels['region'] = _get_region(bot_name) + bot_name = environment.get_value('BOT_NAME') + metric.labels['region'] = _get_region(bot_name) return metric @@ -550,12 +542,7 @@ def _initialize_monitored_resource(): _monitored_resource.labels['project_id'] = utils.get_application_id() # Use bot name here instance as that's more useful to us. - # In case it is in Kubernetes, we use the pod name - if environment.is_running_on_k8s(): - instance_name = environment.get_value('HOSTNAME') - else: - instance_name = environment.get_value('BOT_NAME') - _monitored_resource.labels['instance_id'] = instance_name + _monitored_resource.labels['instance_id'] = environment.get_value('BOT_NAME') if compute_metadata.is_gce(): # Returned in the form projects/{id}/zones/{zone} @@ -574,7 +561,7 @@ def _time_to_timestamp(interval, attr, time_seconds): setattr(interval, attr, timestamp) -def initialize(use_flusher_thread=True): +def initialize(): """Initialize if monitoring is enabled for this bot.""" global _monitoring_v3_client global _flusher_thread @@ -589,17 +576,14 @@ def initialize(use_flusher_thread=True): _initialize_monitored_resource() _monitoring_v3_client = monitoring_v3.MetricServiceClient( credentials=credentials.get_default()[0]) - if use_flusher_thread: - _flusher_thread = _FlusherThread() - _flusher_thread.start() + _flusher_thread = _FlusherThread() + _flusher_thread.start() def stop(): """Stops monitoring and cleans up (only if monitoring is enabled).""" if _flusher_thread: _flusher_thread.stop() - if not _flusher_thread and check_module_loaded(monitoring_v3): - flush_metrics() def metrics_store(): diff --git a/src/clusterfuzz/_internal/metrics/monitoring_metrics.py b/src/clusterfuzz/_internal/metrics/monitoring_metrics.py index 761161acc7..89785c7658 100644 --- a/src/clusterfuzz/_internal/metrics/monitoring_metrics.py +++ b/src/clusterfuzz/_internal/metrics/monitoring_metrics.py @@ -200,13 +200,3 @@ monitor.StringField('platform'), ], ) - -# Cron metrics - -# 0 value means healthy, 1 unhealthy -CLUSTERFUZZ_CRON_EXIT_CODE = monitor.GaugeMetric( - 'clusterfuzz_cron_exit_code', - description='Tracks successful completion of cronjobs', - field_spec=[ - monitor.StringField('cron_name'), - ]) diff --git a/src/clusterfuzz/_internal/system/environment.py b/src/clusterfuzz/_internal/system/environment.py index c3ceea39e6..b726c91684 100644 --- a/src/clusterfuzz/_internal/system/environment.py +++ b/src/clusterfuzz/_internal/system/environment.py @@ -54,11 +54,6 @@ } -def is_running_on_k8s(): - """Returns whether or not we're running on K8s.""" - return os.getenv('IS_K8S_ENV') == 'true' - - def _eval_value(value_string): """Returns evaluated value.""" try: diff --git a/src/python/bot/startup/run_cron.py b/src/python/bot/startup/run_cron.py index 2ed1f08b3a..d9d3683226 100644 --- a/src/python/bot/startup/run_cron.py +++ b/src/python/bot/startup/run_cron.py @@ -27,8 +27,6 @@ from clusterfuzz._internal.config import local_config from clusterfuzz._internal.datastore import ndb_init from clusterfuzz._internal.metrics import logs -from clusterfuzz._internal.metrics import monitor -from clusterfuzz._internal.metrics import monitoring_metrics from clusterfuzz._internal.system import environment @@ -57,35 +55,12 @@ def main(): except ImportError: pass - # Few metrics get collected per job run, so - # no need for a thread to continuously push those - monitor.initialize(use_flusher_thread=False) - task = sys.argv[1] task_module_name = f'clusterfuzz._internal.cron.{task}' - labels = { - 'cron_name': task, - } - - return_code = 0 - with ndb_init.context(): task_module = importlib.import_module(task_module_name) - exception = None - try: - return_code = 0 if task_module.main() else 1 - except Exception as e: - return_code = 1 - exception = e - finally: - monitoring_metrics.CLUSTERFUZZ_CRON_EXIT_CODE.set( - return_code, labels=labels) - monitor.stop() - if exception: - raise exception - - return return_code + return 0 if task_module.main() else 1 if __name__ == '__main__':