Skip to content

Commit

Permalink
Revert Kubernetes cronjob instrumentation (#4293)
Browse files Browse the repository at this point in the history
As far as Kubernetes cronjobs go, successful terminations can be
obtained through log-based metrics in GCP, with the following query:

```
jsonPayload.involvedObject.name =~ "${cronjob_name}"
jsonPayload.message = "Job completed"
resource.type="k8s_cluster"
```

This metric suffices to alert on, since the absence of successful runs
for 24h is enough to signal that a cron is broken. It is obtained from
kubernetes events (kubectl get events), so we can rely on the Kubernetes
control plane for cronjob instrumentation instead of rolling our own.

Thus, this PR will revert the previous metrics added to Kubernetes
cronjobs, for the sake of simplifying the codebase.
  • Loading branch information
vitorguidi authored Oct 4, 2024
1 parent ed73c3f commit 9fd8a61
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 107 deletions.
23 changes: 13 additions & 10 deletions src/clusterfuzz/_internal/metrics/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import traceback
from typing import Any

from clusterfuzz._internal.system import environment

STACKDRIVER_LOG_MESSAGE_LIMIT = 80000 # Allowed log entry size is 100 KB.
LOCAL_LOG_MESSAGE_LIMIT = 100000
LOCAL_LOG_LIMIT = 500000
Expand All @@ -36,7 +34,7 @@

def _increment_error_count():
""""Increment the error count metric."""
if environment.is_running_on_k8s():
if _is_running_on_k8s():
task_name = 'k8s'
elif _is_running_on_app_engine():
task_name = 'appengine'
Expand All @@ -62,6 +60,11 @@ def _is_running_on_app_engine():
os.getenv('SERVER_SOFTWARE').startswith('Google App Engine/')))


def _is_running_on_k8s():
"""Returns whether or not we're running on K8s."""
return os.getenv('IS_K8S_ENV') == 'true'


def _console_logging_enabled():
"""Return bool on where console logging is enabled, usually for tests."""
return bool(os.getenv('LOG_TO_CONSOLE'))
Expand All @@ -75,9 +78,9 @@ def _file_logging_enabled():
This is disabled if we are running in app engine or kubernetes as these have
their dedicated loggers, see configure_appengine() and configure_k8s().
"""
return bool(
os.getenv('LOG_TO_FILE', 'True')
) and not _is_running_on_app_engine() and not environment.is_running_on_k8s()
return bool(os.getenv(
'LOG_TO_FILE',
'True')) and not _is_running_on_app_engine() and not _is_running_on_k8s()


def _fluentd_logging_enabled():
Expand All @@ -87,7 +90,7 @@ def _fluentd_logging_enabled():
kubernetes as these have their dedicated loggers, see configure_appengine()
and configure_k8s()."""
return bool(os.getenv('LOG_TO_FLUENTD', 'True')) and not _is_local(
) and not _is_running_on_app_engine() and not environment.is_running_on_k8s()
) and not _is_running_on_app_engine() and not _is_running_on_k8s()


def _cloud_logging_enabled():
Expand All @@ -96,7 +99,7 @@ def _cloud_logging_enabled():
or kubernetes as these have their dedicated loggers, see
configure_appengine() and configure_k8s()."""
return bool(os.getenv('LOG_TO_GCP')) and not _is_local(
) and not _is_running_on_app_engine() and not environment.is_running_on_k8s()
) and not _is_running_on_app_engine() and not _is_running_on_k8s()


def suppress_unwanted_warnings():
Expand Down Expand Up @@ -414,7 +417,7 @@ def configure(name, extras=None):
|extras| will be included by emit() in log messages."""
suppress_unwanted_warnings()

if environment.is_running_on_k8s():
if _is_running_on_k8s():
configure_k8s()
return

Expand Down Expand Up @@ -450,7 +453,7 @@ def get_logger():
if _logger:
return _logger

if _is_running_on_app_engine() or environment.is_running_on_k8s():
if _is_running_on_app_engine() or _is_running_on_k8s():
# Running on App Engine.
set_logger(logging.getLogger())

Expand Down
96 changes: 40 additions & 56 deletions src/clusterfuzz/_internal/metrics/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,40 +101,6 @@ def _time_series_sort_key(ts):
return ts.points[-1].interval.start_time


def flush_metrics():
  """Flushes all metrics stored in _metrics_store.

  Converts every stored (metric, labels, start_time, value) tuple into a
  monitoring_v3 time series and writes them to Cloud Monitoring in batches
  of at most MAX_TIME_SERIES_PER_CALL. Errors are logged, never raised.
  """
  project_path = _monitoring_v3_client.common_project_path(  # pylint: disable=no-member
      utils.get_application_id())
  try:
    time_series = []
    end_time = time.time()
    for metric, labels, start_time, value in _metrics_store.iter_values():
      # Gauges report an instantaneous value: collapse the interval to a
      # single point in time by using the flush time as the start time too.
      if (metric.metric_kind == metric_pb2.MetricDescriptor.MetricKind.GAUGE  # pylint: disable=no-member
         ):
        start_time = end_time

      series = _TimeSeries()
      metric.monitoring_v3_time_series(series, labels, start_time, end_time,
                                       value)
      time_series.append(series)

      # Flush a full batch as soon as it reaches the API's per-call limit.
      if len(time_series) == MAX_TIME_SERIES_PER_CALL:
        time_series.sort(key=_time_series_sort_key)
        _create_time_series(project_path, time_series)
        time_series = []

    # Flush any remaining partial batch.
    if time_series:
      time_series.sort(key=_time_series_sort_key)
      _create_time_series(project_path, time_series)
  except Exception as e:
    if environment.is_android():
      # FIXME: This exception is extremely common on Android. We are already
      # aware of the problem, don't make more noise about it.
      logs.warning(f'Failed to flush metrics: {e}')
    else:
      logs.error(f'Failed to flush metrics: {e}')


class _FlusherThread(threading.Thread):
"""Flusher thread."""

Expand All @@ -145,14 +111,41 @@ def __init__(self):

def run(self):
"""Run the flusher thread."""
should_stop = False
project_path = _monitoring_v3_client.common_project_path( # pylint: disable=no-member
utils.get_application_id())

while True:
# Make sure there are no metrics left to flush after monitor.stop()
if self.stop_event.wait(FLUSH_INTERVAL_SECONDS):
should_stop = True
flush_metrics()
if should_stop:
return
try:
if self.stop_event.wait(FLUSH_INTERVAL_SECONDS):
return

time_series = []
end_time = time.time()
for metric, labels, start_time, value in _metrics_store.iter_values():
if (metric.metric_kind == metric_pb2.MetricDescriptor.MetricKind.GAUGE # pylint: disable=no-member
):
start_time = end_time

series = _TimeSeries()
metric.monitoring_v3_time_series(series, labels, start_time, end_time,
value)
time_series.append(series)

if len(time_series) == MAX_TIME_SERIES_PER_CALL:
time_series.sort(key=_time_series_sort_key)
_create_time_series(project_path, time_series)
time_series = []

if time_series:
time_series.sort(key=_time_series_sort_key)
_create_time_series(project_path, time_series)
except Exception as e:
if environment.is_android():
# FIXME: This exception is extremely common on Android. We are already
# aware of the problem, don't make more noise about it.
logs.warning(f'Failed to flush metrics: {e}')
else:
logs.error(f'Failed to flush metrics: {e}')

  def stop(self):
    """Signal the flusher thread to exit.

    Sets the event that run() blocks on in stop_event.wait(), causing the
    thread to return after its current wait.
    """
    self.stop_event.set()
Expand Down Expand Up @@ -296,9 +289,8 @@ def monitoring_v3_metric(self, metric, labels=None):
metric.labels[key] = str(value)

# Default labels.
if not environment.is_running_on_k8s():
bot_name = environment.get_value('BOT_NAME', None)
metric.labels['region'] = _get_region(bot_name)
bot_name = environment.get_value('BOT_NAME')
metric.labels['region'] = _get_region(bot_name)

return metric

Expand Down Expand Up @@ -550,12 +542,7 @@ def _initialize_monitored_resource():
_monitored_resource.labels['project_id'] = utils.get_application_id()

# Use bot name here instance as that's more useful to us.
# In case it is in Kubernetes, we use the pod name
if environment.is_running_on_k8s():
instance_name = environment.get_value('HOSTNAME')
else:
instance_name = environment.get_value('BOT_NAME')
_monitored_resource.labels['instance_id'] = instance_name
_monitored_resource.labels['instance_id'] = environment.get_value('BOT_NAME')

if compute_metadata.is_gce():
# Returned in the form projects/{id}/zones/{zone}
Expand All @@ -574,7 +561,7 @@ def _time_to_timestamp(interval, attr, time_seconds):
setattr(interval, attr, timestamp)


def initialize(use_flusher_thread=True):
def initialize():
"""Initialize if monitoring is enabled for this bot."""
global _monitoring_v3_client
global _flusher_thread
Expand All @@ -589,17 +576,14 @@ def initialize(use_flusher_thread=True):
_initialize_monitored_resource()
_monitoring_v3_client = monitoring_v3.MetricServiceClient(
credentials=credentials.get_default()[0])
if use_flusher_thread:
_flusher_thread = _FlusherThread()
_flusher_thread.start()
_flusher_thread = _FlusherThread()
_flusher_thread.start()


def stop():
  """Stops monitoring and cleans up (only if monitoring is enabled)."""
  # _flusher_thread is only created when initialize() enabled monitoring,
  # so this is a no-op otherwise.
  if _flusher_thread:
    _flusher_thread.stop()
if not _flusher_thread and check_module_loaded(monitoring_v3):
flush_metrics()


def metrics_store():
Expand Down
10 changes: 0 additions & 10 deletions src/clusterfuzz/_internal/metrics/monitoring_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,13 +200,3 @@
monitor.StringField('platform'),
],
)

# Cron metrics

# Gauge tracking the exit status of each cron run, keyed by cron name.
# 0 value means healthy, 1 unhealthy
CLUSTERFUZZ_CRON_EXIT_CODE = monitor.GaugeMetric(
    'clusterfuzz_cron_exit_code',
    description='Tracks successful completion of cronjobs',
    field_spec=[
        monitor.StringField('cron_name'),
    ])
5 changes: 0 additions & 5 deletions src/clusterfuzz/_internal/system/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,6 @@
}


def is_running_on_k8s():
  """Returns whether or not we're running on K8s.

  K8s deployments export IS_K8S_ENV='true'; anything else means no.
  """
  return os.getenv('IS_K8S_ENV', '') == 'true'


def _eval_value(value_string):
"""Returns evaluated value."""
try:
Expand Down
27 changes: 1 addition & 26 deletions src/python/bot/startup/run_cron.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
from clusterfuzz._internal.config import local_config
from clusterfuzz._internal.datastore import ndb_init
from clusterfuzz._internal.metrics import logs
from clusterfuzz._internal.metrics import monitor
from clusterfuzz._internal.metrics import monitoring_metrics
from clusterfuzz._internal.system import environment


Expand Down Expand Up @@ -57,35 +55,12 @@ def main():
except ImportError:
pass

# Few metrics get collected per job run, so
# no need for a thread to continuously push those
monitor.initialize(use_flusher_thread=False)

task = sys.argv[1]

task_module_name = f'clusterfuzz._internal.cron.{task}'
labels = {
'cron_name': task,
}

return_code = 0

with ndb_init.context():
task_module = importlib.import_module(task_module_name)
exception = None
try:
return_code = 0 if task_module.main() else 1
except Exception as e:
return_code = 1
exception = e
finally:
monitoring_metrics.CLUSTERFUZZ_CRON_EXIT_CODE.set(
return_code, labels=labels)
monitor.stop()
if exception:
raise exception

return return_code
return 0 if task_module.main() else 1


if __name__ == '__main__':
Expand Down

0 comments on commit 9fd8a61

Please sign in to comment.