From e7cdf319609fa00fdc6e864dcf5344538d1d1853 Mon Sep 17 00:00:00 2001
From: jarek
Date: Wed, 21 Jun 2017 11:09:02 +0100
Subject: [PATCH 01/40] System stats integrated

---
 bucky/cfg.py    |   4 ++
 bucky/main.py   |   3 +
 bucky/system.py | 155 ++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 162 insertions(+)
 create mode 100644 bucky/system.py

diff --git a/bucky/cfg.py b/bucky/cfg.py
index ee0df8a..e826e69 100644
--- a/bucky/cfg.py
+++ b/bucky/cfg.py
@@ -89,6 +89,10 @@
 name_strip_duplicates = True
 name_host_trim = []

+system_stats_enabled = True
+system_stats_interval = 1
+system_stats_metadata = None
+
 custom_clients = []

 processor = None
diff --git a/bucky/main.py b/bucky/main.py
index 749d1ff..2404ff8 100644
--- a/bucky/main.py
+++ b/bucky/main.py
@@ -36,6 +36,7 @@
 import bucky.collectd as collectd
 import bucky.metricsd as metricsd
 import bucky.statsd as statsd
+import bucky.system as system
 import bucky.processor as processor
 from bucky.errors import BuckyError
@@ -251,6 +252,8 @@ def __init__(self, cfg):
             stypes.append(collectd.getCollectDServer)
         if cfg.statsd_enabled:
             stypes.append(statsd.StatsDServer)
+        if cfg.system_stats_enabled:
+            stypes.append(system.SystemStatsServer)

         self.servers = []
         for stype in stypes:
diff --git a/bucky/system.py b/bucky/system.py
new file mode 100644
index 0000000..9632c44
--- /dev/null
+++ b/bucky/system.py
@@ -0,0 +1,155 @@
+
+import os
+import time
+import logging
+import multiprocessing
+
+import six
+
+
+if six.PY3:
+    xrange = range
+    long = int
+
+
+log = logging.getLogger(__name__)
+
+
+class SystemStatsServer(multiprocessing.Process):
+    # The order of cpu fields in /proc/stat
+    CPU_FIELDS = ('user', 'nice', 'system', 'idle', 'wait', 'interrupt', 'softirq', 'steal')
+
+    def __init__(self, queue, cfg):
+        super(SystemStatsServer, self).__init__()
+        self.queue = queue
+        self.metadata = cfg.system_stats_metadata
+        self.interval = cfg.system_stats_interval
+
+    def close(self):
+        pass
+
+    def run(self):
+        while True:
+            start_timestamp = time.time()
+            self.read_cpu_stats()
+            self.read_load_stats()
+            self.read_df_stats()
+            self.read_memory_stats()
+            self.read_interface_stats()
+            stop_timestamp = time.time()
+            sleep_time = self.interval - (stop_timestamp - start_timestamp)
+            if sleep_time > 0.1:
+                time.sleep(sleep_time)
+
+    def add_stat(self, name, value, timestamp, metadata):
+        if metadata:
+            if self.metadata:
+                metadata.update(self.metadata)
+        else:
+            metadata = self.metadata
+        if metadata:
+            self.queue.put((None, name, value, timestamp, metadata))
+        else:
+            self.queue.put((None, name, value, timestamp))
+
+    def read_cpu_stats(self):
+        now = int(time.time())
+        with open('/proc/stat') as f:
+            for l in f.readlines():
+                tokens = l.strip().split()
+                if not tokens:
+                    continue
+                name = tokens[0]
+                if not name.startswith('cpu'):
+                    if name == 'ctxt':
+                        self.add_stat("processes", long(tokens[1]), now, {"type": "switches"})
+                    elif name == 'processes':
+                        self.add_stat("processes", long(tokens[1]), now, {"type": "forks"})
+                    elif name == 'procs_running':
+                        self.add_stat("processes", long(tokens[1]), now, {"type": "running"})
+                else:
+                    cpu_suffix = name[3:]
+                    if not cpu_suffix:
+                        continue
+                    for k, v in zip(self.CPU_FIELDS, tokens[1:]):
+                        self.add_stat("cpu", long(v), now, {"instance": cpu_suffix, "type": k})
+
+    def read_df_stats(self):
+        now = int(time.time())
+        with open('/proc/mounts') as f:
+            for l in f.readlines():
+                tokens = l.strip().split()
+                if not tokens or len(tokens) != 6:
+                    continue
+                if not tokens[1].startswith('/'):
+                    continue
+                mount_target, mount_path, mount_filesystem = tokens[:3]
+                try:
+                    stats = os.statvfs(mount_path)
+                    total_inodes = long(stats.f_files)
+                    # Skip special filesystems
+                    if not total_inodes:
+                        continue
+                    block_size = stats.f_bsize
+                    self.add_stat("df", long(stats.f_bavail) * block_size, now,
+                                  dict(target=mount_target, instance=mount_path, fs=mount_filesystem,
+                                       type="bytes", description="free"))
+                    self.add_stat("df", long(stats.f_blocks) * block_size, now,
+                                  dict(target=mount_target, instance=mount_path, fs=mount_filesystem,
+                                       type="bytes", description="total"))
+                    self.add_stat("df", long(stats.f_favail), now,
+                                  dict(target=mount_target, instance=mount_path, fs=mount_filesystem,
+                                       type="inodes", description="free"))
+                    self.add_stat("df", total_inodes, now,
+                                  dict(target=mount_target, instance=mount_path, fs=mount_filesystem,
+                                       type="inodes", description="total"))
+                except OSError:
+                    pass
+
+    def read_interface_stats(self):
+        now = int(time.time())
+        with open('/proc/net/dev') as f:
+            for l in f.readlines():
+                tokens = l.strip().split()
+                if not tokens or len(tokens) != 17:
+                    continue
+                if not tokens[0].endswith(':'):
+                    continue
+                name = tokens[0][:-1]
+                self.add_stat("interface", long(tokens[1]), now, dict(instance=name, direction="rx", type="bytes"))
+                self.add_stat("interface", long(tokens[2]), now, dict(instance=name, direction="rx", type="packets"))
+                self.add_stat("interface", long(tokens[3]), now, dict(instance=name, direction="rx", type="errors"))
+                self.add_stat("interface", long(tokens[4]), now, dict(instance=name, direction="rx", type="drops"))
+                self.add_stat("interface", long(tokens[9]), now, dict(instance=name, direction="tx", type="bytes"))
+                self.add_stat("interface", long(tokens[10]), now, dict(instance=name, direction="tx", type="packets"))
+                self.add_stat("interface", long(tokens[11]), now, dict(instance=name, direction="tx", type="errors"))
+                self.add_stat("interface", long(tokens[12]), now, dict(instance=name, direction="tx", type="drops"))
+
+    def read_load_stats(self):
+        now = int(time.time())
+        with open('/proc/loadavg') as f:
+            for l in f.readlines():
+                tokens = l.strip().split()
+                if not tokens or len(tokens) != 5:
+                    continue
+                self.add_stat("load", float(tokens[0]), now, dict(type="1m"))
+                self.add_stat("load", float(tokens[1]), now, dict(type="5m"))
+                self.add_stat("load", float(tokens[2]), now, dict(type="15m"))
+
+    def read_memory_stats(self):
+        now = int(time.time())
+        with open('/proc/meminfo') as f:
+            for l in f.readlines():
+                tokens = l.strip().split()
+                if not tokens or len(tokens) != 3 or tokens[2].lower() != 'kb':
+                    continue
+                name = tokens[0]
+                if not name.endswith(":"):
+                    continue
+                name = name[:-1].lower()
+                if name == "memtotal":
+                    self.add_stat("memory", long(tokens[1]) * 1024, now, dict(type="total", description="bytes"))
+                elif name == "memfree":
+                    self.add_stat("memory", long(tokens[1]) * 1024, now, dict(type="free", description="bytes"))
+                elif name == "memavailable":
+                    self.add_stat("memory", long(tokens[1]) * 1024, now, dict(type="available", description="bytes"))
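For reference, the collector above hands samples to the rest of bucky through the shared multiprocessing queue as plain tuples, with an optional metadata dict appended; the first slot (host) is None for locally collected stats. A minimal sketch of that protocol, with illustrative values:

    # What add_stat() enqueues (sample values are made up):
    sample = (None, "cpu", 12345, 1498032542, {"instance": "0", "type": "user"})

    # A consumer on the other end unpacks it as:
    host, name, value, timestamp = sample[:4]
    metadata = sample[4] if len(sample) > 4 else None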
From 5a1927a3dd52c57f595387b6d72c6bb4f413ffeb Mon Sep 17 00:00:00 2001
From: jarek
Date: Wed, 21 Jun 2017 13:35:47 +0100
Subject: [PATCH 02/40] Docker stats go in

---
 bucky/cfg.py         |  4 ++
 bucky/dockerstats.py | 98 ++++++++++++++++++++++++++++++++++++++++++++
 bucky/main.py        |  3 ++
 3 files changed, 105 insertions(+)
 create mode 100644 bucky/dockerstats.py

diff --git a/bucky/cfg.py b/bucky/cfg.py
index e826e69..6e1daf5 100644
--- a/bucky/cfg.py
+++ b/bucky/cfg.py
@@ -93,6 +93,10 @@
 system_stats_interval = 1
 system_stats_metadata = None

+docker_stats_enabled = True
+docker_stats_interval = 1
+docker_stats_metadata = None
+
 custom_clients = []

 processor = None
diff --git a/bucky/dockerstats.py b/bucky/dockerstats.py
new file mode 100644
index 0000000..f046182
--- /dev/null
+++ b/bucky/dockerstats.py
@@ -0,0 +1,98 @@
+
+import time
+import docker
+import logging
+import multiprocessing
+
+import six
+
+
+if six.PY3:
+    xrange = range
+    long = int
+
+
+log = logging.getLogger(__name__)
+
+
+class DockerStatsServer(multiprocessing.Process):
+    def __init__(self, queue, cfg):
+        super(DockerStatsServer, self).__init__()
+        self.queue = queue
+        self.metadata = cfg.docker_stats_metadata
+        self.interval = cfg.docker_stats_interval
+        self.docker_client = docker.client.from_env()
+
+    def close(self):
+        pass
+
+    def run(self):
+        while True:
+            start_timestamp = time.time()
+            self.read_docker_stats()
+            stop_timestamp = time.time()
+            sleep_time = self.interval - (stop_timestamp - start_timestamp)
+            if sleep_time > 0.1:
+                time.sleep(sleep_time)
+
+    def add_stat(self, name, value, timestamp, metadata):
+        if metadata:
+            if self.metadata:
+                metadata.update(self.metadata)
+        else:
+            metadata = self.metadata
+        if metadata:
+            self.queue.put((None, name, value, timestamp, metadata))
+        else:
+            self.queue.put((None, name, value, timestamp))
+
+    def _merge(self, *dicts):
+        ret = {}
+        for d in dicts:
+            ret.update(d)
+        return ret
+
+    def _add_df_stats(self, now, labels, total_size, rw_size):
+        self.add_stat("docker_df", long(total_size), now,
+                      self._merge(labels, dict(type="bytes", description="total")))
+        self.add_stat("docker_df", long(rw_size), now,
+                      self._merge(labels, dict(type="bytes", description="used")))
+
+    def _add_cpu_stats(self, now, labels, stats):
+        for k, v in enumerate(stats[u'percpu_usage']):
+            self.add_stat("docker_cpu", long(v), now,
+                          self._merge(labels, {"instance": k, "type": "usage"}))
+
+    def _add_interface_stats(self, now, labels, stats):
+        for k in stats.keys():
+            v = stats[k]
+            self.add_stat("docker_interface", long(v[u'rx_bytes']), now,
+                          self._merge(labels, dict(instance=k, direction="rx", type="bytes")))
+            self.add_stat("docker_interface", long(v[u'rx_packets']), now,
+                          self._merge(labels, dict(instance=k, direction="rx", type="packets")))
+            self.add_stat("docker_interface", long(v[u'rx_errors']), now,
+                          self._merge(labels, dict(instance=k, direction="rx", type="errors")))
+            self.add_stat("docker_interface", long(v[u'rx_dropped']), now,
+                          self._merge(labels, dict(instance=k, direction="rx", type="drops")))
+            self.add_stat("docker_interface", long(v[u'tx_bytes']), now,
+                          self._merge(labels, dict(instance=k, direction="tx", type="bytes")))
+            self.add_stat("docker_interface", long(v[u'tx_packets']), now,
+                          self._merge(labels, dict(instance=k, direction="tx", type="packets")))
+            self.add_stat("docker_interface", long(v[u'tx_errors']), now,
+                          self._merge(labels, dict(instance=k, direction="tx", type="errors")))
+            self.add_stat("docker_interface", long(v[u'tx_dropped']), now,
+                          self._merge(labels, dict(instance=k, direction="tx", type="drops")))
+
+    def _add_memory_stats(self, now, labels, stats):
+        self.add_stat("docker_memory", long(stats[u'usage']), now,
+                      self._merge(labels, dict(type="used", description="bytes")))
+
+    def read_docker_stats(self):
+        now = int(time.time())
+        for container in self.docker_client.api.containers(size=True):
+            labels = container[u'Labels']
+            stats = self.docker_client.api.stats(container[u'Id'], decode=True, stream=False)
+            self._add_df_stats(now, labels, long(container[u'SizeRootFs']), long(container.get(u'SizeRw', 0)))
+            self._add_cpu_stats(now, labels, stats[u'cpu_stats'][u'cpu_usage'])
+            self._add_memory_stats(now, labels, stats[u'memory_stats'])
+            self._add_interface_stats(now, labels, stats[u'networks'])
diff --git a/bucky/main.py b/bucky/main.py
index 2404ff8..5491a8f 100644
--- a/bucky/main.py
+++ b/bucky/main.py
@@ -37,6 +37,7 @@
 import bucky.metricsd as metricsd
 import bucky.statsd as statsd
 import bucky.system as system
+import bucky.dockerstats as dockerstats
 import bucky.processor as processor
 from bucky.errors import BuckyError
@@ -254,6 +255,8 @@ def __init__(self, cfg):
             stypes.append(statsd.StatsDServer)
         if cfg.system_stats_enabled:
             stypes.append(system.SystemStatsServer)
+        if cfg.docker_stats_enabled:
+            stypes.append(dockerstats.DockerStatsServer)

         self.servers = []
         for stype in stypes:
From 47bbebb7302da031e1c7fb1af6a34b6572fc0e72 Mon Sep 17 00:00:00 2001
From: jarek
Date: Wed, 21 Jun 2017 13:38:59 +0100
Subject: [PATCH 03/40] Rename system.py to systemstats.py

---
 bucky/main.py                       | 4 ++--
 bucky/{system.py => systemstats.py} | 0
 2 files changed, 2 insertions(+), 2 deletions(-)
 rename bucky/{system.py => systemstats.py} (100%)

diff --git a/bucky/main.py b/bucky/main.py
index 5491a8f..2e6d3c7 100644
--- a/bucky/main.py
+++ b/bucky/main.py
@@ -36,7 +36,7 @@
 import bucky.collectd as collectd
 import bucky.metricsd as metricsd
 import bucky.statsd as statsd
-import bucky.system as system
+import bucky.systemstats as systemstats
 import bucky.dockerstats as dockerstats
 import bucky.processor as processor
 from bucky.errors import BuckyError
@@ -254,7 +254,7 @@ def __init__(self, cfg):
         if cfg.statsd_enabled:
             stypes.append(statsd.StatsDServer)
         if cfg.system_stats_enabled:
-            stypes.append(system.SystemStatsServer)
+            stypes.append(systemstats.SystemStatsServer)
         if cfg.docker_stats_enabled:
             stypes.append(dockerstats.DockerStatsServer)
diff --git a/bucky/system.py b/bucky/systemstats.py
similarity index 100%
rename from bucky/system.py
rename to bucky/systemstats.py
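The Docker SDK calls that read_docker_stats() relies on can be exercised standalone; a sketch assuming a local Docker daemon and the docker package from PyPI (stream=False makes stats() return a single snapshot instead of a generator):

    import docker

    client = docker.client.from_env()
    for container in client.api.containers(size=True):
        stats = client.api.stats(container['Id'], decode=True, stream=False)
        print(container['Id'][:12],
              container['SizeRootFs'],                # rootfs size in bytes
              stats['memory_stats'].get('usage'))     # current memory usage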
From 3d8e236aab380c609ca02aa9abba51a1340bfcfb Mon Sep 17 00:00:00 2001
From: jarek
Date: Wed, 21 Jun 2017 14:28:52 +0100
Subject: [PATCH 04/40] Add common metadata option for system/docker servers

---
 bucky/cfg.py         | 4 +++-
 bucky/dockerstats.py | 6 +++++-
 bucky/systemstats.py | 6 +++++-
 3 files changed, 13 insertions(+), 3 deletions(-)

diff --git a/bucky/cfg.py b/bucky/cfg.py
index 6e1daf5..330bbbe 100644
--- a/bucky/cfg.py
+++ b/bucky/cfg.py
@@ -7,6 +7,8 @@
 directory = "/var/lib/bucky"
 process_join_timeout = 2

+metadata = None
+
 sentry_enabled = False
 sentry_dsn = None
 sentry_log_level = "WARNING"
@@ -34,7 +36,7 @@
 statsd_port = 8125
 statsd_enabled = True
 statsd_flush_time = 10.0
-statsd_metadata = {}
+statsd_metadata = None
 statsd_legacy_namespace = True
 statsd_global_prefix = "stats"
 statsd_prefix_counter = "counters"
diff --git a/bucky/dockerstats.py b/bucky/dockerstats.py
index f046182..50f63d2 100644
--- a/bucky/dockerstats.py
+++ b/bucky/dockerstats.py
@@ -19,7 +19,11 @@ class DockerStatsServer(multiprocessing.Process):
     def __init__(self, queue, cfg):
         super(DockerStatsServer, self).__init__()
         self.queue = queue
-        self.metadata = cfg.docker_stats_metadata
+        self.metadata = {}
+        if cfg.metadata:
+            self.metadata.update(cfg.metadata)
+        if cfg.docker_stats_metadata:
+            self.metadata.update(cfg.docker_stats_metadata)
         self.interval = cfg.docker_stats_interval
         self.docker_client = docker.client.from_env()
diff --git a/bucky/systemstats.py b/bucky/systemstats.py
index 9632c44..716b4d7 100644
--- a/bucky/systemstats.py
+++ b/bucky/systemstats.py
@@ -22,7 +22,11 @@ class SystemStatsServer(multiprocessing.Process):
     def __init__(self, queue, cfg):
         super(SystemStatsServer, self).__init__()
         self.queue = queue
-        self.metadata = cfg.system_stats_metadata
+        self.metadata = {}
+        if cfg.metadata:
+            self.metadata.update(cfg.metadata)
+        if cfg.system_stats_metadata:
+            self.metadata.update(cfg.system_stats_metadata)
         self.interval = cfg.system_stats_interval

     def close(self):
From 4cfe6bd1041f79c7618d3365f0d443763e495bad Mon Sep 17 00:00:00 2001
From: jarek
Date: Thu, 22 Jun 2017 16:35:42 +0100
Subject: [PATCH 05/40] Add instance to docker labels

---
 bucky/dockerstats.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/bucky/dockerstats.py b/bucky/dockerstats.py
index 50f63d2..c51c859 100644
--- a/bucky/dockerstats.py
+++ b/bucky/dockerstats.py
@@ -93,8 +93,10 @@ def _add_memory_stats(self, now, labels, stats):
     def read_docker_stats(self):
         now = int(time.time())
-        for container in self.docker_client.api.containers(size=True):
+        for i, container in enumerate(self.docker_client.api.containers(size=True)):
             labels = container[u'Labels']
+            if 'instance' not in labels:
+                labels['instance'] = i
             stats = self.docker_client.api.stats(container[u'Id'], decode=True, stream=False)
             self._add_df_stats(now, labels, long(container[u'SizeRootFs']), long(container.get(u'SizeRw', 0)))
             self._add_cpu_stats(now, labels, stats[u'cpu_stats'][u'cpu_usage'])
From e87946f16a39d9c095f81e2ee7e4d433eb7a26dc Mon Sep 17 00:00:00 2001
From: jarek
Date: Thu, 22 Jun 2017 16:51:26 +0100
Subject: [PATCH 06/40] Rename label to avoid conflict

---
 bucky/dockerstats.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/bucky/dockerstats.py b/bucky/dockerstats.py
index c51c859..be9cfb3 100644
--- a/bucky/dockerstats.py
+++ b/bucky/dockerstats.py
@@ -95,8 +95,8 @@ def read_docker_stats(self):
         now = int(time.time())
         for i, container in enumerate(self.docker_client.api.containers(size=True)):
             labels = container[u'Labels']
-            if 'instance' not in labels:
-                labels['instance'] = i
+            if 'docker' not in labels:
+                labels['docker'] = i
             stats = self.docker_client.api.stats(container[u'Id'], decode=True, stream=False)
             self._add_df_stats(now, labels, long(container[u'SizeRootFs']), long(container.get(u'SizeRw', 0)))
             self._add_cpu_stats(now, labels, stats[u'cpu_stats'][u'cpu_usage'])
From d404f0f18d0b9d540ca3a25a029b6d3f62a12bc6 Mon Sep 17 00:00:00 2001
From: jarek
Date: Thu, 22 Jun 2017 19:18:12 +0100
Subject: [PATCH 07/40] Add conf metadata merging for statsd

---
 bucky/statsd.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/bucky/statsd.py b/bucky/statsd.py
index ed01535..ba537c4 100644
--- a/bucky/statsd.py
+++ b/bucky/statsd.py
@@ -85,7 +85,11 @@ def __init__(self, queue, cfg):
         self.prefix_timer = cfg.statsd_prefix_timer
         self.prefix_gauge = cfg.statsd_prefix_gauge
         self.prefix_set = cfg.statsd_prefix_set
-        self.metadata = cfg.statsd_metadata
+        self.metadata = {}
+        if cfg.metadata:
+            self.metadata.update(cfg.metadata)
+        if cfg.statsd_metadata:
+            self.metadata.update(cfg.statsd_metadata)
         self.key_res = (
             (re.compile("\s+"), "_"),
             (re.compile("\/"), "-"),
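The merge order used in these constructors gives per-module metadata precedence over the shared cfg.metadata; a quick illustration with hypothetical config values:

    cfg_metadata = {"env": "prod", "dc": "eu1"}     # global default tags
    statsd_metadata = {"env": "staging"}            # per-module override

    metadata = {}
    metadata.update(cfg_metadata)
    metadata.update(statsd_metadata)
    assert metadata == {"env": "staging", "dc": "eu1"}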
From e02cec50c232fdb05cde78227ec0e12ffc90c78d Mon Sep 17 00:00:00 2001
From: jarek
Date: Thu, 22 Jun 2017 19:18:38 +0100
Subject: [PATCH 08/40] Add metadata setting from command line

---
 bucky/main.py | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/bucky/main.py b/bucky/main.py
index 2e6d3c7..0ae7fc4 100644
--- a/bucky/main.py
+++ b/bucky/main.py
@@ -148,6 +148,7 @@ def options():
             type="str", default=cfg.gid,
             help="Drop privileges to this group"
         ),
+        op.make_option("--label", action="append", dest="labels")
     ]
@@ -246,6 +247,20 @@
 class Bucky(object):
     def __init__(self, cfg):
         self.sampleq = multiprocessing.Queue()

+        if cfg.labels:
+            if not cfg.metadata:
+                cfg.metadata = {}
+            for label in cfg.labels:
+                kv = label.split("=")
+                if len(kv) > 1:
+                    cfg.metadata[kv[0]] = kv[1]
+                else:
+                    kv = label.split(":")
+                    if len(kv) > 1:
+                        cfg.metadata[kv[0]] = kv[1]
+                    else:
+                        cfg.metadata[kv[0]] = None
+
         stypes = []
         if cfg.metricsd_enabled:
             stypes.append(metricsd.MetricsDServer)
From 4446588092f8b10d639af24863314fa312165610 Mon Sep 17 00:00:00 2001
From: jarek
Date: Thu, 22 Jun 2017 19:25:58 +0100
Subject: [PATCH 09/40] If docker connection fails, back off a bit

---
 bucky/dockerstats.py | 31 ++++++++++++++++++++-----------
 1 file changed, 20 insertions(+), 11 deletions(-)

diff --git a/bucky/dockerstats.py b/bucky/dockerstats.py
index be9cfb3..78eb32d 100644
--- a/bucky/dockerstats.py
+++ b/bucky/dockerstats.py
@@ -5,6 +5,7 @@
 import multiprocessing

 import six
+import requests.exceptions

 if six.PY3:
@@ -31,11 +32,15 @@
     def close(self):
         pass

     def run(self):
+        err = 0
         while True:
             start_timestamp = time.time()
-            self.read_docker_stats()
+            if not self.read_docker_stats():
+                err = min(err + 1, 2)
+            else:
+                err = 0
             stop_timestamp = time.time()
-            sleep_time = self.interval - (stop_timestamp - start_timestamp)
+            sleep_time = (err + 1) * self.interval - (stop_timestamp - start_timestamp)
             if sleep_time > 0.1:
                 time.sleep(sleep_time)
@@ -93,12 +98,16 @@
     def read_docker_stats(self):
         now = int(time.time())
-        for i, container in enumerate(self.docker_client.api.containers(size=True)):
-            labels = container[u'Labels']
-            if 'docker' not in labels:
-                labels['docker'] = i
-            stats = self.docker_client.api.stats(container[u'Id'], decode=True, stream=False)
-            self._add_df_stats(now, labels, long(container[u'SizeRootFs']), long(container.get(u'SizeRw', 0)))
-            self._add_cpu_stats(now, labels, stats[u'cpu_stats'][u'cpu_usage'])
-            self._add_memory_stats(now, labels, stats[u'memory_stats'])
-            self._add_interface_stats(now, labels, stats[u'networks'])
+        try:
+            for i, container in enumerate(self.docker_client.api.containers(size=True)):
+                labels = container[u'Labels']
+                if 'docker' not in labels:
+                    labels['docker'] = i
+                stats = self.docker_client.api.stats(container[u'Id'], decode=True, stream=False)
+                self._add_df_stats(now, labels, long(container[u'SizeRootFs']), long(container.get(u'SizeRw', 0)))
+                self._add_cpu_stats(now, labels, stats[u'cpu_stats'][u'cpu_usage'])
+                self._add_memory_stats(now, labels, stats[u'memory_stats'])
+                self._add_interface_stats(now, labels, stats[u'networks'])
+            return True
+        except requests.exceptions.ConnectionError:
+            return False
\ No newline at end of file
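The --label option accepts either key=value or key:value, and a bare key maps to None; tracing the parsing loop above for a hypothetical invocation:

    # bucky --label env=prod --label dc:eu1 --label canary
    labels = ["env=prod", "dc:eu1", "canary"]
    metadata = {}
    for label in labels:
        kv = label.split("=")
        if len(kv) > 1:
            metadata[kv[0]] = kv[1]
        else:
            kv = label.split(":")
            metadata[kv[0]] = kv[1] if len(kv) > 1 else None
    assert metadata == {"env": "prod", "dc": "eu1", "canary": None}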
From 268c39d85532ac58f58b73a93ea688cb7abddcf2 Mon Sep 17 00:00:00 2001
From: jarek
Date: Fri, 23 Jun 2017 10:40:22 +0100
Subject: [PATCH 10/40] Small fixes for docker stuff

---
 bucky/dockerstats.py | 9 +++++++--
 requirements.txt     | 1 +
 2 files changed, 8 insertions(+), 2 deletions(-)

diff --git a/bucky/dockerstats.py b/bucky/dockerstats.py
index 78eb32d..ac6c00b 100644
--- a/bucky/dockerstats.py
+++ b/bucky/dockerstats.py
@@ -26,7 +26,10 @@ def __init__(self, queue, cfg):
         if cfg.docker_stats_metadata:
             self.metadata.update(cfg.docker_stats_metadata)
         self.interval = cfg.docker_stats_interval
-        self.docker_client = docker.client.from_env()
+        if cfg.docker_stats_version:
+            self.docker_client = docker.client.from_env(version=cfg.docker_stats_version)
+        else:
+            self.docker_client = docker.client.from_env()

     def close(self):
         pass
@@ -110,4 +113,6 @@ def read_docker_stats(self):
                 self._add_interface_stats(now, labels, stats[u'networks'])
             return True
         except requests.exceptions.ConnectionError:
-            return False
\ No newline at end of file
+            return False
+        except ValueError:
+            return False
diff --git a/requirements.txt b/requirements.txt
index 743e166..e2db503 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,3 +2,4 @@ six
 setproctitle
 pycrypto
 watchdog
+docker
\ No newline at end of file
From d232aadadfb90a3430e8b138ea0194821636bc7d Mon Sep 17 00:00:00 2001
From: jarek
Date: Fri, 23 Jun 2017 11:52:28 +0100
Subject: [PATCH 11/40] Add a list of ignored filesystems, set intervals to 10s

---
 bucky/cfg.py         | 6 ++++--
 bucky/systemstats.py | 5 +++++
 2 files changed, 9 insertions(+), 2 deletions(-)

diff --git a/bucky/cfg.py b/bucky/cfg.py
index 469c35d..1f2fcfb 100644
--- a/bucky/cfg.py
+++ b/bucky/cfg.py
@@ -93,12 +93,14 @@
 name_host_trim = []

 system_stats_enabled = True
-system_stats_interval = 1
+system_stats_interval = 10
 system_stats_metadata = None
+system_stats_df_ignored = ['tmpfs', 'aufs']

 docker_stats_enabled = True
-docker_stats_interval = 1
+docker_stats_interval = 10
 docker_stats_metadata = None
+docker_stats_version = None

 custom_clients = []
diff --git a/bucky/systemstats.py b/bucky/systemstats.py
index 716b4d7..35daad2 100644
--- a/bucky/systemstats.py
+++ b/bucky/systemstats.py
@@ -28,6 +28,9 @@ def __init__(self, queue, cfg):
         if cfg.system_stats_metadata:
             self.metadata.update(cfg.system_stats_metadata)
         self.interval = cfg.system_stats_interval
+        self.ignored_filesystems = set()
+        if cfg.system_stats_df_ignored:
+            self.ignored_filesystems.update(cfg.system_stats_df_ignored)

     def close(self):
         pass
@@ -88,6 +91,8 @@ def read_df_stats(self):
                 if not tokens[1].startswith('/'):
                     continue
                 mount_target, mount_path, mount_filesystem = tokens[:3]
+                if mount_filesystem in self.ignored_filesystems:
+                    continue
                 try:
                     stats = os.statvfs(mount_path)
                     total_inodes = long(stats.f_files)
From 29cdc565e35638227c08433dcd7e8b25b1376445 Mon Sep 17 00:00:00 2001
From: jarek
Date: Fri, 23 Jun 2017 12:12:16 +0100
Subject: [PATCH 12/40] More pseudo filesystems to ignore list

---
 bucky/cfg.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/bucky/cfg.py b/bucky/cfg.py
index 1f2fcfb..932e3a0 100644
--- a/bucky/cfg.py
+++ b/bucky/cfg.py
@@ -95,7 +95,7 @@
 system_stats_enabled = True
 system_stats_interval = 10
 system_stats_metadata = None
-system_stats_df_ignored = ['tmpfs', 'aufs']
+system_stats_df_ignored = ['tmpfs', 'aufs', 'rootfs', 'devtmpfs']

 docker_stats_enabled = True
 docker_stats_interval = 10
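The ignore list complements the total_inodes check already in read_df_stats(): pseudo filesystems report zero inodes and are skipped, while the listed ones are filtered by name. The statvfs arithmetic the collector uses can be checked standalone:

    import os

    stats = os.statvfs("/")
    block_size = stats.f_bsize
    print("free_bytes:", stats.f_bavail * block_size)
    print("total_bytes:", stats.f_blocks * block_size)
    print("free_inodes:", stats.f_favail)
    print("total_inodes:", stats.f_files)   # 0 on pseudo filesystems such as /proc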
From 6da48175b68c5a7cb2af9b142ac64f9991fca156 Mon Sep 17 00:00:00 2001
From: jarek
Date: Fri, 23 Jun 2017 15:54:02 +0100
Subject: [PATCH 13/40] Cleanups

---
 bucky/cfg.py         |  9 ++++-----
 bucky/client.py      |  6 ++++++
 bucky/dockerstats.py |  8 ++++----
 bucky/influxdb.py    |  6 +++---
 bucky/main.py        | 46 +++++++++++++++++++++++++++-----------------
 bucky/systemstats.py |  4 ++--
 6 files changed, 47 insertions(+), 32 deletions(-)

diff --git a/bucky/cfg.py b/bucky/cfg.py
index 200b2d8..ae406b9 100644
--- a/bucky/cfg.py
+++ b/bucky/cfg.py
@@ -7,6 +7,7 @@
 directory = "/var/lib/bucky"
 process_join_timeout = 2

+labels = []
 metadata = None

 sentry_enabled = False
@@ -93,12 +94,15 @@
 name_strip_duplicates = True
 name_host_trim = []

-system_stats_enabled = True
+system_stats_enabled = False
 system_stats_interval = 10
 system_stats_metadata = None
 system_stats_df_ignored = ['tmpfs', 'aufs', 'rootfs', 'devtmpfs']

-docker_stats_enabled = True
+docker_stats_enabled = False
 docker_stats_interval = 10
 docker_stats_metadata = None
-docker_stats_version = None
-
-custom_clients = []
+docker_stats_version = '1.22'

 processor = None
 processor_drop_on_error = False
diff --git a/bucky/client.py b/bucky/client.py
index d2fd858..7af7d5c 100644
--- a/bucky/client.py
+++ b/bucky/client.py
@@ -37,6 +37,9 @@ def run(self):
         setproctitle("bucky: %s" % self.__class__.__name__)
         while True:
             try:
+                if not self.pipe.poll(1):
+                    self.tick()
+                    continue
                 sample = self.pipe.recv()
             except KeyboardInterrupt:
                 continue
@@ -46,3 +49,6 @@
     def send(self, host, name, value, time, metadata=None):
         raise NotImplementedError()
+
+    def tick(self):
+        pass
diff --git a/bucky/dockerstats.py b/bucky/dockerstats.py
index ac6c00b..d7e5eb6 100644
--- a/bucky/dockerstats.py
+++ b/bucky/dockerstats.py
@@ -85,7 +85,7 @@ def _add_interface_stats(self, now, labels, stats):
             self.add_stat("docker_interface", long(v[u'rx_errors']), now,
                           self._merge(labels, dict(instance=k, direction="rx", type="errors")))
             self.add_stat("docker_interface", long(v[u'rx_dropped']), now,
-                          self._merge(labels, dict(instance=k, direction="rx", type="drops")))
+                          self._merge(labels, dict(instance=k, direction="rx", type="dropped")))
             self.add_stat("docker_interface", long(v[u'tx_bytes']), now,
                           self._merge(labels, dict(instance=k, direction="tx", type="bytes")))
             self.add_stat("docker_interface", long(v[u'tx_packets']), now,
@@ -93,7 +93,7 @@
             self.add_stat("docker_interface", long(v[u'tx_errors']), now,
                           self._merge(labels, dict(instance=k, direction="tx", type="errors")))
             self.add_stat("docker_interface", long(v[u'tx_dropped']), now,
-                          self._merge(labels, dict(instance=k, direction="tx", type="drops")))
+                          self._merge(labels, dict(instance=k, direction="tx", type="dropped")))

     def _add_memory_stats(self, now, labels, stats):
         self.add_stat("docker_memory", long(stats[u'usage']), now,
@@ -104,8 +104,8 @@ def read_docker_stats(self):
         now = int(time.time())
         try:
             for i, container in enumerate(self.docker_client.api.containers(size=True)):
                 labels = container[u'Labels']
-                if 'docker' not in labels:
-                    labels['docker'] = i
+                if 'container_id' not in labels:
+                    labels['container_id'] = container[u'Id'][:12]
                 stats = self.docker_client.api.stats(container[u'Id'], decode=True, stream=False)
diff --git a/bucky/influxdb.py b/bucky/influxdb.py
index c0e71f2..f9e46a9 100644
--- a/bucky/influxdb.py
+++ b/bucky/influxdb.py
@@ -72,9 +72,9 @@
     def kv(self, k, v):
         return str(k) + '=' + str(v)

-    def flush(self):
+    def tick(self):
         now = time.time()
-        if len(self.buffer) > 30 or (now - self.flush_timestamp) > 3:
+        if len(self.buffer) > 30 or ((now - self.flush_timestamp) > 1 and len(self.buffer)):
             payload = '\n'.join(self.buffer).encode()
             self.resolve_hosts()
             for ip, port in self.resolved_hosts:
@@ -99,4 +99,4 @@
         # https://docs.influxdata.com/influxdb/v1.2/write_protocols/line_protocol_tutorial/
         line = ' '.join((','.join(buf), self.kv('value', value), str(long(mtime) * 1000000000)))
         self.buffer.append(line)
-        self.flush()
+        self.tick()
diff --git a/bucky/main.py b/bucky/main.py
index 2a50fc4..744d2c8 100644
--- a/bucky/main.py
+++ b/bucky/main.py
@@ -134,6 +134,16 @@ def options():
             default=cfg.influxdb_enabled, action="store_true",
             help="Enable the InfluxDB line protocol client"
         ),
+        op.make_option(
+            "--enable-system-stats", dest="system_stats_enabled",
+            default=cfg.system_stats_enabled, action="store_true",
+            help="Enable collection of local system stats"
+        ),
+        op.make_option(
+            "--enable-docker-stats", dest="docker_stats_enabled",
+            default=cfg.docker_stats_enabled, action="store_true",
+            help="Enable collection of Docker container stats"
+        ),
         op.make_option(
             "--full-trace", dest="full_trace",
             default=cfg.full_trace, action="store_true",
@@ -246,6 +260,20 @@ def main():
         except:
             log.exception("Could not create directory: %s" % cfg.directory)

+    if cfg.labels:
+        if not cfg.metadata:
+            cfg.metadata = {}
+        for label in cfg.labels:
+            kv = label.split("=")
+            if len(kv) > 1:
+                cfg.metadata[kv[0]] = kv[1]
+            else:
+                kv = label.split(":")
+                if len(kv) > 1:
+                    cfg.metadata[kv[0]] = kv[1]
+                else:
+                    cfg.metadata[kv[0]] = None
+
     bucky = Bucky(cfg)
     bucky.run()
@@ -258,20 +282,6 @@
 class Bucky(object):
     def __init__(self, cfg):
         self.sampleq = multiprocessing.Queue()

-        if cfg.labels:
-            if not cfg.metadata:
-                cfg.metadata = {}
-            for label in cfg.labels:
-                kv = label.split("=")
-                if len(kv) > 1:
-                    cfg.metadata[kv[0]] = kv[1]
-                else:
-                    kv = label.split(":")
-                    if len(kv) > 1:
-                        cfg.metadata[kv[0]] = kv[1]
-                    else:
-                        cfg.metadata[kv[0]] = None
-
         stypes = []
         if cfg.metricsd_enabled:
             stypes.append(metricsd.MetricsDServer)
@@ -296,18 +306,18 @@ def __init__(self, cfg):
         self.proc = None
         self.psampleq = self.sampleq

-        default_clients = []
+        requested_clients = []
         if cfg.graphite_enabled:
             if cfg.graphite_pickle_enabled:
                 carbon_client = carbon.PickleClient
             else:
                 carbon_client = carbon.PlaintextClient
-            default_clients.append(carbon_client)
+            requested_clients.append(carbon_client)
         if cfg.influxdb_enabled:
-            default_clients.append(influxdb.InfluxDBClient)
+            requested_clients.append(influxdb.InfluxDBClient)

         self.clients = []
-        for client in cfg.custom_clients + default_clients:
+        for client in requested_clients:
             send, recv = multiprocessing.Pipe()
             instance = client(cfg, recv)
             self.clients.append((instance, send))
diff --git a/bucky/systemstats.py b/bucky/systemstats.py
index 35daad2..6720422 100644
--- a/bucky/systemstats.py
+++ b/bucky/systemstats.py
@@ -128,11 +128,11 @@
             self.add_stat("interface", long(tokens[1]), now, dict(instance=name, direction="rx", type="bytes"))
             self.add_stat("interface", long(tokens[2]), now, dict(instance=name, direction="rx", type="packets"))
             self.add_stat("interface", long(tokens[3]), now, dict(instance=name, direction="rx", type="errors"))
-            self.add_stat("interface", long(tokens[4]), now, dict(instance=name, direction="rx", type="drops"))
+            self.add_stat("interface", long(tokens[4]), now, dict(instance=name, direction="rx", type="dropped"))
             self.add_stat("interface", long(tokens[9]), now, dict(instance=name, direction="tx", type="bytes"))
             self.add_stat("interface", long(tokens[10]), now, dict(instance=name, direction="tx", type="packets"))
             self.add_stat("interface", long(tokens[11]), now, dict(instance=name, direction="tx", type="errors"))
-            self.add_stat("interface", long(tokens[12]), now, dict(instance=name, direction="tx", type="drops"))
+            self.add_stat("interface", long(tokens[12]), now, dict(instance=name, direction="tx", type="dropped"))

     def read_load_stats(self):
         now = int(time.time())
From 15e35659deb71bddc35581c07c1aa44aa1a6b108 Mon Sep 17 00:00:00 2001
From: jarek
Date: Tue, 27 Jun 2017 12:18:31 +0100
Subject: [PATCH 14/40] Bulk transfers for system/docker/influx

---
 bucky/client.py      |  9 +++++-
 bucky/dockerstats.py | 37 ++++++++---------------
 bucky/influxdb.py    | 15 +++++++---
 bucky/systemstats.py | 70 ++++++++++++++++++++++++--------------------
 4 files changed, 71 insertions(+), 60 deletions(-)

diff --git a/bucky/client.py b/bucky/client.py
index 7af7d5c..7870869 100644
--- a/bucky/client.py
+++ b/bucky/client.py
@@ -45,10 +45,17 @@ def run(self):
                 continue
             if sample is None:
                 break
-            self.send(*sample)
+            if type(sample[2]) is dict:
+                self.send_bulk(*sample)
+            else:
+                self.send(*sample)

     def send(self, host, name, value, time, metadata=None):
         raise NotImplementedError()

+    def send_bulk(self, host, name, value, time, metadata=None):
+        for k in value.keys():
+            self.send(host, name + '.' + k, value[k], time, metadata)
+
     def tick(self):
         pass
diff --git a/bucky/dockerstats.py b/bucky/dockerstats.py
index d7e5eb6..6d7256c 100644
--- a/bucky/dockerstats.py
+++ b/bucky/dockerstats.py
@@ -65,39 +65,28 @@ def _merge(self, *dicts):
         ret = {}
         for d in dicts:
             ret.update(d)
         return ret

     def _add_df_stats(self, now, labels, total_size, rw_size):
-        self.add_stat("docker_df", long(total_size), now,
-                      self._merge(labels, dict(type="bytes", description="total")))
-        self.add_stat("docker_df", long(rw_size), now,
-                      self._merge(labels, dict(type="bytes", description="used")))
+        docker_df_stats = {
+            'total_bytes': long(total_size),
+            'used_bytes': long(rw_size)
+        }
+        self.add_stat("docker_df", docker_df_stats, now, labels)

     def _add_cpu_stats(self, now, labels, stats):
         for k, v in enumerate(stats[u'percpu_usage']):
-            self.add_stat("docker_cpu", long(v), now,
-                          self._merge(labels, {"instance": k, "type": "usage"}))
+            self.add_stat("docker_cpu", {'usage': long(v)}, now, self._merge(labels, {"instance": k}))

     def _add_interface_stats(self, now, labels, stats):
         for k in stats.keys():
             v = stats[k]
-            self.add_stat("docker_interface", long(v[u'rx_bytes']), now,
-                          self._merge(labels, dict(instance=k, direction="rx", type="bytes")))
-            self.add_stat("docker_interface", long(v[u'rx_packets']), now,
-                          self._merge(labels, dict(instance=k, direction="rx", type="packets")))
-            self.add_stat("docker_interface", long(v[u'rx_errors']), now,
-                          self._merge(labels, dict(instance=k, direction="rx", type="errors")))
-            self.add_stat("docker_interface", long(v[u'rx_dropped']), now,
-                          self._merge(labels, dict(instance=k, direction="rx", type="dropped")))
-            self.add_stat("docker_interface", long(v[u'tx_bytes']), now,
-                          self._merge(labels, dict(instance=k, direction="tx", type="bytes")))
-            self.add_stat("docker_interface", long(v[u'tx_packets']), now,
-                          self._merge(labels, dict(instance=k, direction="tx", type="packets")))
-            self.add_stat("docker_interface", long(v[u'tx_errors']), now,
-                          self._merge(labels, dict(instance=k, direction="tx", type="errors")))
-            self.add_stat("docker_interface", long(v[u'tx_dropped']), now,
-                          self._merge(labels, dict(instance=k, direction="tx", type="dropped")))
+            keys = (
+                u'rx_bytes', u'rx_packets', u'rx_errors', u'rx_dropped',
+                u'tx_bytes', u'tx_packets', u'tx_errors', u'tx_dropped'
+            )
+            docker_interface_stats = {k: long(v[k]) for k in keys}
+            self.add_stat("docker_interface", docker_interface_stats, now, self._merge(labels, dict(instance=k)))

     def _add_memory_stats(self, now, labels, stats):
-        self.add_stat("docker_memory", long(stats[u'usage']), now,
-                      self._merge(labels, dict(type="used", description="bytes")))
+        self.add_stat("docker_memory", {'used_bytes': long(stats[u'usage'])}, now, labels)

     def read_docker_stats(self):
         now = int(time.time())
diff --git a/bucky/influxdb.py b/bucky/influxdb.py
index f9e46a9..1619587 100644
--- a/bucky/influxdb.py
+++ b/bucky/influxdb.py
@@ -82,8 +82,8 @@
             self.buffer = []
             self.flush_timestamp = now

-    def send(self, host, name, value, mtime, metadata=None):
-        buf = [name]
+    def _send(self, host, name, mtime, values, metadata=None):
+        label_buf = [name]
         if host:
             if metadata is None:
                 metadata = {'host': host}
@@ -95,8 +95,15 @@
                 v = metadata[k]
                 # InfluxDB will drop insert with tags without values
                 if v is not None:
-                    buf.append(self.kv(k, v))
+                    label_buf.append(self.kv(k, v))
+        value_buf = [self.kv(k, values[k]) for k in values.keys()]
         # https://docs.influxdata.com/influxdb/v1.2/write_protocols/line_protocol_tutorial/
-        line = ' '.join((','.join(buf), self.kv('value', value), str(long(mtime) * 1000000000)))
+        line = ' '.join((','.join(label_buf), ','.join(value_buf), str(long(mtime) * 1000000000)))
         self.buffer.append(line)
         self.tick()
+
+    def send(self, host, name, value, mtime, metadata=None):
+        self._send(host, name, mtime, {'value': value}, metadata)
+
+    def send_bulk(self, host, name, value, mtime, metadata=None):
+        self._send(host, name, mtime, value, metadata)
diff --git a/bucky/systemstats.py b/bucky/systemstats.py
index 6720422..c97cf34 100644
--- a/bucky/systemstats.py
+++ b/bucky/systemstats.py
@@ -62,6 +62,7 @@ def read_cpu_stats(self):
         now = int(time.time())
         with open('/proc/stat') as f:
+            process_stats = {}
             for l in f.readlines():
                 tokens = l.strip().split()
                 if not tokens:
@@ -69,17 +70,19 @@
                     continue
                 name = tokens[0]
                 if not name.startswith('cpu'):
                     if name == 'ctxt':
-                        self.add_stat("processes", long(tokens[1]), now, {"type": "switches"})
+                        process_stats['switches'] = long(tokens[1])
                     elif name == 'processes':
-                        self.add_stat("processes", long(tokens[1]), now, {"type": "forks"})
+                        process_stats['forks'] = long(tokens[1])
                     elif name == 'procs_running':
-                        self.add_stat("processes", long(tokens[1]), now, {"type": "running"})
+                        process_stats['running'] = long(tokens[1])
                 else:
                     cpu_suffix = name[3:]
                     if not cpu_suffix:
                         continue
-                    for k, v in zip(self.CPU_FIELDS, tokens[1:]):
-                        self.add_stat("cpu", long(v), now, {"instance": cpu_suffix, "type": k})
+                    cpu_stats = {k: long(v) for k, v in zip(self.CPU_FIELDS, tokens[1:])}
+                    self.add_stat("cpu", cpu_stats, now, {"instance": cpu_suffix})
+            if process_stats:
+                self.add_stat("processes", process_stats, now, metadata=None)

     def read_df_stats(self):
         now = int(time.time())
@@ -100,18 +103,14 @@
                     if not total_inodes:
                         continue
                     block_size = stats.f_bsize
-                    self.add_stat("df", long(stats.f_bavail) * block_size, now,
-                                  dict(target=mount_target, instance=mount_path, fs=mount_filesystem,
-                                       type="bytes", description="free"))
-                    self.add_stat("df", long(stats.f_blocks) * block_size, now,
-                                  dict(target=mount_target, instance=mount_path, fs=mount_filesystem,
-                                       type="bytes", description="total"))
-                    self.add_stat("df", long(stats.f_favail), now,
-                                  dict(target=mount_target, instance=mount_path, fs=mount_filesystem,
-                                       type="inodes", description="free"))
-                    self.add_stat("df", total_inodes, now,
-                                  dict(target=mount_target, instance=mount_path, fs=mount_filesystem,
-                                       type="inodes", description="total"))
+                    df_stats = {
+                        'free_bytes': long(stats.f_bavail) * block_size,
+                        'total_bytes': long(stats.f_blocks) * block_size,
+                        'free_inodes': long(stats.f_favail),
+                        'total_inodes': total_inodes
+                    }
+                    self.add_stat("df", df_stats, now,
+                                  metadata=dict(target=mount_target, instance=mount_path, fs=mount_filesystem))
                 except OSError:
                     pass
@@ -125,14 +124,17 @@ def read_interface_stats(self):
                 if not tokens[0].endswith(':'):
                     continue
                 name = tokens[0][:-1]
-                self.add_stat("interface", long(tokens[1]), now, dict(instance=name, direction="rx", type="bytes"))
-                self.add_stat("interface", long(tokens[2]), now, dict(instance=name, direction="rx", type="packets"))
-                self.add_stat("interface", long(tokens[3]), now, dict(instance=name, direction="rx", type="errors"))
-                self.add_stat("interface", long(tokens[4]), now, dict(instance=name, direction="rx", type="dropped"))
-                self.add_stat("interface", long(tokens[9]), now, dict(instance=name, direction="tx", type="bytes"))
-                self.add_stat("interface", long(tokens[10]), now, dict(instance=name, direction="tx", type="packets"))
-                self.add_stat("interface", long(tokens[11]), now, dict(instance=name, direction="tx", type="errors"))
-                self.add_stat("interface", long(tokens[12]), now, dict(instance=name, direction="tx", type="dropped"))
+                interface_stats = {
+                    'rx_bytes': long(tokens[1]),
+                    'rx_packets': long(tokens[2]),
+                    'rx_errors': long(tokens[3]),
+                    'rx_dropped': long(tokens[4]),
+                    'tx_bytes': long(tokens[9]),
+                    'tx_packets': long(tokens[10]),
+                    'tx_errors': long(tokens[11]),
+                    'tx_dropped': long(tokens[12])
+                }
+                self.add_stat("interface", interface_stats, now, metadata=dict(instance=name))

     def read_load_stats(self):
         now = int(time.time())
@@ -141,13 +143,17 @@
                 tokens = l.strip().split()
                 if not tokens or len(tokens) != 5:
                     continue
-                self.add_stat("load", float(tokens[0]), now, dict(type="1m"))
-                self.add_stat("load", float(tokens[1]), now, dict(type="5m"))
-                self.add_stat("load", float(tokens[2]), now, dict(type="15m"))
+                load_stats = {
+                    'last_1m': float(tokens[0]),
+                    'last_5m': float(tokens[1]),
+                    'last_15m': float(tokens[2])
+                }
+                self.add_stat("load", load_stats, now, metadata=None)

     def read_memory_stats(self):
         now = int(time.time())
         with open('/proc/meminfo') as f:
+            memory_stats = {}
             for l in f.readlines():
                 tokens = l.strip().split()
                 if not tokens or len(tokens) != 3 or tokens[2].lower() != 'kb':
                     continue
                 name = tokens[0]
                 if not name.endswith(":"):
                     continue
                 name = name[:-1].lower()
                 if name == "memtotal":
-                    self.add_stat("memory", long(tokens[1]) * 1024, now, dict(type="total", description="bytes"))
+                    memory_stats['total_bytes'] = long(tokens[1]) * 1024
                 elif name == "memfree":
-                    self.add_stat("memory", long(tokens[1]) * 1024, now, dict(type="free", description="bytes"))
+                    memory_stats['free_bytes'] = long(tokens[1]) * 1024
                 elif name == "memavailable":
-                    self.add_stat("memory", long(tokens[1]) * 1024, now, dict(type="available", description="bytes"))
+                    memory_stats['available_bytes'] = long(tokens[1]) * 1024
+            if memory_stats:
+                self.add_stat("memory", memory_stats, now, metadata=None)
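With bulk samples, one InfluxDB line now carries several fields instead of a single value= field. Tracing _send() for an illustrative memory sample:

    name = "memory"
    values = {"free_bytes": 1024, "total_bytes": 4096}
    metadata = {"host": "web1"}
    mtime = 1498032542

    label_buf = [name] + ["%s=%s" % (k, v) for k, v in metadata.items()]
    value_buf = ["%s=%s" % (k, v) for k, v in values.items()]
    line = ' '.join((','.join(label_buf), ','.join(value_buf), str(mtime * 10 ** 9)))
    # -> "memory,host=web1 free_bytes=1024,total_bytes=4096 1498032542000000000"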
From 0adfe80dc1adb603fa61790e180aa5275f674e86 Mon Sep 17 00:00:00 2001
From: jarek
Date: Tue, 27 Jun 2017 13:09:18 +0100
Subject: [PATCH 15/40] Bulk transfers for statsd module

---
 bucky/client.py   |  6 ++++-
 bucky/influxdb.py |  2 +-
 bucky/statsd.py   | 57 ++++++++++++++++++++++++++---------------------
 3 files changed, 38 insertions(+), 27 deletions(-)

diff --git a/bucky/client.py b/bucky/client.py
index 7870869..fd47a4e 100644
--- a/bucky/client.py
+++ b/bucky/client.py
@@ -55,7 +55,11 @@ def send(self, host, name, value, time, metadata=None):
     def send_bulk(self, host, name, value, time, metadata=None):
         for k in value.keys():
-            self.send(host, name + '.' + k, value[k], time, metadata)
+            if name.endswith('.'):
+                metric_name = name + k
+            else:
+                metric_name = name + '.' + k
+            self.send(host, metric_name, value[k], time, metadata)

     def tick(self):
         pass
diff --git a/bucky/influxdb.py b/bucky/influxdb.py
index 1619587..4eff79d 100644
--- a/bucky/influxdb.py
+++ b/bucky/influxdb.py
@@ -106,4 +106,4 @@ def send(self, host, name, value, mtime, metadata=None):
         self._send(host, name, mtime, {'value': value}, metadata)

     def send_bulk(self, host, name, value, mtime, metadata=None):
-        self._send(host, name, mtime, value, metadata)
+        self._send(host, name.strip('.'), mtime, value, metadata)
diff --git a/bucky/statsd.py b/bucky/statsd.py
index ba537c4..c02cb3d 100644
--- a/bucky/statsd.py
+++ b/bucky/statsd.py
@@ -158,7 +158,6 @@ def save_gauges(self):
             log.exception("StatsD: IOError")

     def tick(self):
-        name_global_numstats = self.name_global + "numStats"
         stime = int(time.time())
         with self.lock:
             if self.delete_timers:
@@ -181,7 +180,7 @@
             kept_keys = kept_keys.union(set(self.gauges.keys()))
             num_stats += self.enqueue_sets(stime)
             kept_keys = kept_keys.union(set(self.sets.keys()))
-            self.enqueue(name_global_numstats, num_stats, stime)
+            self.enqueue(self.name_global, {"numStats": num_stats}, stime)
             self.keys_seen = {k: self.keys_seen[k] for k in kept_keys if k in self.keys_seen}

     def run(self):
@@ -204,10 +203,12 @@ def enqueue_timers(self, stime):
         ret = 0
         iteritems = self.timers.items() if six.PY3 else self.timers.iteritems()
         for k, v in iteritems:
+            timer_stats = {}
+
             # Skip timers that haven't collected any values
             if not v:
-                self.enqueue("%s%s.count" % (self.name_timer, k), 0, stime, k)
-                self.enqueue("%s%s.count_ps" % (self.name_timer, k), 0.0, stime, k)
+                timer_stats['count'] = 0
+                timer_stats['count_ps'] = 0.0
             else:
                 v.sort()
                 count = len(v)
@@ -229,58 +230,63 @@
                     vsum = cumulative_values[thresh_idx - 1]
                     t = int(pct_thresh)
+                    t_suffix = "_%s" % (t,)

                     if self.enable_timer_mean:
                         mean = vsum / float(thresh_idx)
-                        self.enqueue("%s%s.mean_%s" % (self.name_timer, k, t), mean, stime, k)
+                        timer_stats["mean" + t_suffix] = mean

                     if self.enable_timer_upper:
                         vthresh = v[thresh_idx - 1]
-                        self.enqueue("%s%s.upper_%s" % (self.name_timer, k, t), vthresh, stime, k)
+                        timer_stats["upper" + t_suffix] = vthresh

                     if self.enable_timer_count:
-                        self.enqueue("%s%s.count_%s" % (self.name_timer, k, t), thresh_idx, stime, k)
+                        timer_stats["count" + t_suffix] = thresh_idx

                     if self.enable_timer_sum:
-                        self.enqueue("%s%s.sum_%s" % (self.name_timer, k, t), vsum, stime, k)
+                        timer_stats["sum" + t_suffix] = vsum

                     if self.enable_timer_sum_squares:
                         vsum_squares = cumul_sum_squares_values[thresh_idx - 1]
-                        self.enqueue("%s%s.sum_squares_%s" % (self.name_timer, k, t), vsum_squares, stime, k)
+                        timer_stats["sum_squares" + t_suffix] = vsum_squares

                 vsum = cumulative_values[count - 1]
                 mean = vsum / float(count)

                 if self.enable_timer_mean:
-                    self.enqueue("%s%s.mean" % (self.name_timer, k), mean, stime, k)
+                    timer_stats["mean"] = mean

                 if self.enable_timer_upper:
-                    self.enqueue("%s%s.upper" % (self.name_timer, k), vmax, stime, k)
+                    timer_stats["upper"] = vmax

                 if self.enable_timer_lower:
-                    self.enqueue("%s%s.lower" % (self.name_timer, k), vmin, stime, k)
+                    timer_stats["lower"] = vmin

                 if self.enable_timer_count:
-                    self.enqueue("%s%s.count" % (self.name_timer, k), count, stime, k)
+                    timer_stats["count"] = count

                 if self.enable_timer_count_ps:
-                    self.enqueue("%s%s.count_ps" % (self.name_timer, k), float(count) / self.flush_time, stime, k)
+                    timer_stats["count_ps"] = float(count) / self.flush_time

                 if self.enable_timer_median:
                     mid = int(count / 2)
                     median = (v[mid - 1] + v[mid]) / 2.0 if count % 2 == 0 else v[mid]
-                    self.enqueue("%s%s.median" % (self.name_timer, k), median, stime, k)
+                    timer_stats["median"] = median

                 if self.enable_timer_sum:
-                    self.enqueue("%s%s.sum" % (self.name_timer, k), vsum, stime, k)
+                    timer_stats["sum"] = vsum

                 if self.enable_timer_sum_squares:
                     vsum_squares = cumul_sum_squares_values[count - 1]
-                    self.enqueue("%s%s.sum_squares" % (self.name_timer, k), vsum_squares, stime, k)
+                    timer_stats["sum_squares"] = vsum_squares

                 if self.enable_timer_std:
                     sum_of_diffs = sum(((value - mean) ** 2 for value in v))
                     stddev = math.sqrt(sum_of_diffs / count)
-                    self.enqueue("%s%s.std" % (self.name_timer, k), stddev, stime, k)
+                    timer_stats["std"] = stddev
+
+            if timer_stats:
+                self.enqueue("%s%s" % (self.name_timer, k), timer_stats, stime, k)
+
             self.timers[k] = []
             ret += 1
@@ -290,7 +296,7 @@ def enqueue_sets(self, stime):
         ret = 0
         iteritems = self.sets.items() if six.PY3 else self.sets.iteritems()
         for k, v in iteritems:
-            self.enqueue("%s%s.count" % (self.name_set, k), len(v), stime, k)
+            self.enqueue("%s%s" % (self.name_set, k), {"count": len(v)}, stime, k)
             ret += 1
             self.sets[k] = set()
         return ret
@@ -310,13 +316,14 @@ def enqueue_counters(self, stime):
         iteritems = self.counters.items() if six.PY3 else self.counters.iteritems()
         for k, v in iteritems:
             if self.legacy_namespace:
-                stat_rate = "%s%s" % (self.name_legacy_rate, k)
-                stat_count = "%s%s" % (self.name_legacy_count, k)
+                self.enqueue("%s%s" % (self.name_legacy_rate, k), v / self.flush_time, stime, k)
+                self.enqueue("%s%s" % (self.name_legacy_count, k), v, stime, k)
             else:
-                stat_rate = "%s%s.rate" % (self.name_counter, k)
-                stat_count = "%s%s.count" % (self.name_counter, k)
-            self.enqueue(stat_rate, v / self.flush_time, stime, k)
-            self.enqueue(stat_count, v, stime, k)
+                stats = {
+                    'rate': v / self.flush_time,
+                    'count': v
+                }
+                self.enqueue("%s%s" % (self.name_counter, k), stats, stime, k)
             self.counters[k] = 0
             ret += 1
         return ret
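The trailing-dot handling added to send_bulk() exists for names like the "stats." global prefix, so dotted clients don't emit "stats..numStats" (the InfluxDB client instead strips the dot and keeps "numStats" as a field). A sketch of the expansion rule:

    def expand(name, value):
        for k, v in value.items():
            yield (name + k if name.endswith('.') else name + '.' + k), v

    assert dict(expand("stats.", {"numStats": 1})) == {"stats.numStats": 1}
    assert "stats.timers.gorm.mean" in dict(expand("stats.timers.gorm", {"mean": 1.1}))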
From 759535161e2ef097119064f764c85d6e1edbb00e Mon Sep 17 00:00:00 2001
From: jarek
Date: Tue, 27 Jun 2017 14:00:19 +0100
Subject: [PATCH 16/40] Fix statsd tests

---
 tests/test_001_statsd.py | 143 +++++++++++++++++++++------------------
 1 file changed, 77 insertions(+), 66 deletions(-)

diff --git a/tests/test_001_statsd.py b/tests/test_001_statsd.py
index f932a31..5bf6c87 100644
--- a/tests/test_001_statsd.py
+++ b/tests/test_001_statsd.py
@@ -36,7 +36,7 @@ def test_simple_counter(q, s):
     s.send("gorm:1|c")
     t.same_stat(None, "stats.gorm", 2, q.get(timeout=TIMEOUT))
     t.same_stat(None, "stats_counts.gorm", 1, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.numStats", 1, q.get(timeout=TIMEOUT))
+    t.same_stat(None, "stats.", {"numStats": 1}, q.get(timeout=TIMEOUT))

 @t.set_cfg("statsd_flush_time", 0.5)
@@ -47,7 +47,7 @@ def test_multiple_messages(q, s):
     s.send("gorm:1|c")
     s.send("gorm:1|c")
     t.same_stat(None, "stats.gorm", 4, q.get(timeout=TIMEOUT))
     t.same_stat(None, "stats_counts.gorm", 2, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.numStats", 1, q.get(timeout=TIMEOUT))
+    t.same_stat(None, "stats.", {"numStats": 1}, q.get(timeout=TIMEOUT))

 @t.set_cfg("statsd_flush_time", 0.5)
@@ -57,7 +57,7 @@ def test_larger_count(q, s):
     s.send("gorm:5|c")
     t.same_stat(None, "stats.gorm", 10, q.get(timeout=TIMEOUT))
     t.same_stat(None, "stats_counts.gorm", 5, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.numStats", 1, q.get(timeout=TIMEOUT))
+    t.same_stat(None, "stats.", {'numStats': 1}, q.get(timeout=TIMEOUT))

 @t.set_cfg("statsd_flush_time", 0.5)
@@ -78,7 +78,7 @@ def test_multiple_counters(q, s):
         t.eq(stats[stat[1]], stat[2])
         t.gt(stat[2], 0)
         stats.pop(stat[1])
-    t.same_stat(None, "stats.numStats", 2, q.get(timeout=TIMEOUT))
+    t.same_stat(None, "stats.", {'numStats': 2}, q.get(timeout=TIMEOUT))

 @t.set_cfg("statsd_flush_time", 0.5)
@@ -88,21 +88,24 @@ def test_simple_timer(q, s):
     for i in range(9):
         s.send("gorm:1|ms")
     s.send("gorm:2|ms")  # Out of the 90% threshold
-    t.same_stat(None, "stats.timers.gorm.mean_90", 1, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.timers.gorm.upper_90", 1, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.timers.gorm.count_90", 9, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.timers.gorm.sum_90", 9, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.timers.gorm.sum_squares_90", 9, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.timers.gorm.mean", 1.1, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.timers.gorm.upper", 2, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.timers.gorm.lower", 1, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.timers.gorm.count", 10, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.timers.gorm.count_ps", 20.0, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.timers.gorm.median", 1, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.timers.gorm.sum", 11, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.timers.gorm.sum_squares", 13, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.timers.gorm.std", 0.3, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.numStats", 1, q.get(timeout=TIMEOUT))
+    expected_value = {
+        "mean_90": 1,
+        "upper_90": 1,
+        "count_90": 9,
+        "sum_90": 9,
+        "sum_squares_90": 9,
+        "mean": 1.1,
+        "upper": 2,
+        "lower": 1,
+        "count": 10,
+        "count_ps": 20.0,
+        "median": 1,
+        "sum": 11,
+        "sum_squares": 13,
+        "std": 0.3
+    }
+    t.same_stat(None, "stats.timers.gorm", expected_value, q.get(timeout=TIMEOUT))
+    t.same_stat(None, "stats.", {"numStats": 1}, q.get(timeout=TIMEOUT))

 @t.set_cfg("statsd_flush_time", 0.5)
@@ -113,21 +116,24 @@ def test_timer_unsorted(q, s):
     s.send("gorm:5|ms")
     s.send("gorm:7|ms")  # Out of the 90% threshold
     s.send("gorm:3|ms")
-    t.same_stat(None, "stats.timers.gorm.mean_90", 10 / 3.0, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.timers.gorm.upper_90", 5, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.timers.gorm.count_90", 3, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.timers.gorm.sum_90", 10, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.timers.gorm.sum_squares_90", 38, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.timers.gorm.mean", 17 / 4.0, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.timers.gorm.upper", 7, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.timers.gorm.lower", 2, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.timers.gorm.count", 4, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.timers.gorm.count_ps", 8, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.timers.gorm.median", 4, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.timers.gorm.sum", 17, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.timers.gorm.sum_squares", 87, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.timers.gorm.std", 1.920286436967152, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.numStats", 1, q.get(timeout=TIMEOUT))
+    expected_value = {
+        "mean_90": 10 / 3.0,
+        "upper_90": 5,
+        "count_90": 3,
+        "sum_90": 10,
+        "sum_squares_90": 38,
+        "mean": 17 / 4.0,
+        "upper": 7,
+        "lower": 2,
+        "count": 4,
+        "count_ps": 8,
+        "median": 4,
+        "sum": 17,
+        "sum_squares": 87,
+        "std": 1.920286436967152
+    }
+    t.same_stat(None, "stats.timers.gorm", expected_value, q.get(timeout=TIMEOUT))
+    t.same_stat(None, "stats.", {"numStats": 1}, q.get(timeout=TIMEOUT))

 @t.set_cfg("statsd_flush_time", 0.1)
@@ -135,16 +141,19 @@ def test_timer_single_time(q, s):
     s.send("gorm:100|ms")
-    t.same_stat(None, "stats.timers.gorm.mean", 100, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.timers.gorm.upper", 100, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.timers.gorm.lower", 100, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.timers.gorm.count", 1, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.timers.gorm.count_ps", 10, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.timers.gorm.median", 100, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.timers.gorm.sum", 100, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.timers.gorm.sum_squares", 10000, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.timers.gorm.std", 0, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.numStats", 1, q.get(timeout=TIMEOUT))
+    expected_value = {
+        "mean": 100,
+        "upper": 100,
+        "lower": 100,
+        "count": 1,
+        "count_ps": 10,
+        "median": 100,
+        "sum": 100,
+        "sum_squares": 10000,
+        "std": 0
+    }
+    t.same_stat(None, "stats.timers.gorm", expected_value, q.get(timeout=TIMEOUT))
+    t.same_stat(None, "stats.", {"numStats": 1}, q.get(timeout=TIMEOUT))

 @t.set_cfg("statsd_flush_time", 0.1)
@@ -154,21 +163,24 @@ def test_timer_multiple_times(q, s):
     s.send("gorm:100|ms")
     s.send("gorm:200|ms")
     s.send("gorm:300|ms")  # Out of the 90% threshold
-    t.same_stat(None, "stats.timers.gorm.mean_90", 150, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.timers.gorm.upper_90", 200, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.timers.gorm.count_90", 2, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.timers.gorm.sum_90", 300, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.timers.gorm.sum_squares_90", 50000, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.timers.gorm.mean", 200, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.timers.gorm.upper", 300, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.timers.gorm.lower", 100, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.timers.gorm.count", 3, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.timers.gorm.count_ps", 30, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.timers.gorm.median", 200, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.timers.gorm.sum", 600, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.timers.gorm.sum_squares", 140000, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.timers.gorm.std", 81.64965809277261, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.numStats", 1, q.get(timeout=TIMEOUT))
+    expected_value = {
+        "mean_90": 150,
+        "upper_90": 200,
+        "count_90": 2,
+        "sum_90": 300,
+        "sum_squares_90": 50000,
+        "mean": 200,
+        "upper": 300,
+        "lower": 100,
+        "count": 3,
+        "count_ps": 30,
+        "median": 200,
+        "sum": 600,
+        "sum_squares": 140000,
+        "std": 81.64965809277261
+    }
+    t.same_stat(None, "stats.timers.gorm", expected_value, q.get(timeout=TIMEOUT))
+    t.same_stat(None, "stats.", {"numStats": 1}, q.get(timeout=TIMEOUT))

 def queue_skip(q, number_of_elements):
@@ -187,9 +199,9 @@ def test_timer_multiple_times_even(q, s):
     s.send("gorm:200|ms")
     s.send("gorm:400|ms")
     s.send("gorm:100|ms")
-    queue_skip(q, 10)
-    t.same_stat(None, "stats.timers.gorm.median", 250, q.get(timeout=TIMEOUT))
-    queue_skip(q, 4)
+    returned_value = q.get(timeout=TIMEOUT)
+    returned_value = returned_value[:2] + (returned_value[2]["median"],) + returned_value[3:]
+    t.same_stat(None, "stats.timers.gorm", 250, returned_value)

 @t.set_cfg("statsd_flush_time", 0.5)
@@ -198,9 +210,8 @@
 @t.udp_srv(bucky.statsd.StatsDServer)
 def test_simple_counter_not_legacy_namespace(q, s):
     s.send("gorm:1|c")
-    t.same_stat(None, "stats.counters.gorm.rate", 2, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.counters.gorm.count", 1, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.numStats", 1, q.get(timeout=TIMEOUT))
+    t.same_stat(None, "stats.counters.gorm", {"rate": 2, "count": 1}, q.get(timeout=TIMEOUT))
+    t.same_stat(None, "stats.", {"numStats": 1}, q.get(timeout=TIMEOUT))

 @t.set_cfg("statsd_flush_time", 0.5)
@@ -209,7 +220,7 @@ def test_simple_gauge(q, s):
     s.send("gorm:5|g")
     t.same_stat(None, "stats.gauges.gorm", 5, q.get(timeout=TIMEOUT))
-    t.same_stat(None, "stats.numStats", 1, q.get(timeout=TIMEOUT))
+    t.same_stat(None, "stats.", {"numStats": 1}, q.get(timeout=TIMEOUT))

 @t.set_cfg("statsd_flush_time", 0.5)
From 24c3c74dbc93a36599816923f005c4616c68b76c Mon Sep 17 00:00:00 2001
From: jarek
Date: Tue, 27 Jun 2017 14:29:22 +0100
Subject: [PATCH 17/40] Make carbon client connection lazy because it blocks
 other components from starting up

---
 bucky/carbon.py | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)

diff --git a/bucky/carbon.py b/bucky/carbon.py
index 57d7e74..6e1ee19 100644
--- a/bucky/carbon.py
+++ b/bucky/carbon.py
@@ -53,7 +53,7 @@ def __init__(self, cfg, pipe):
         self.backoff_max = cfg.graphite_backoff_max
         if self.max_reconnects <= 0:
             self.max_reconnects = sys.maxint
-        self.connect()
+        self.connected = False

     def connect(self):
         if self.debug:
@@ -61,10 +61,11 @@
             self.sock = DebugSocket()
             return
         reconnect_delay = self.reconnect_delay
+        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         for i in xrange(self.max_reconnects):
-            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
             try:
                 self.sock.connect((self.ip, self.port))
+                self.connected = True
                 log.info("Connected to Carbon at %s:%s", self.ip, self.port)
                 return
             except socket.error as e:
@@ -88,9 +89,12 @@
             self.sock.close()
         except:
             pass
+        self.connected = False

-    def send(self, host, name, value, mtime, metadata=None):
-        raise NotImplementedError()
+    def send_message(self, mesg):
+        if not self.connected:
+            self.connect()
+        self.sock.sendall(mesg)


 class PlaintextClient(CarbonClient):
@@ -99,7 +103,7 @@
         mesg = "%s %s %s\n" % (stat, value, mtime)
         for i in xrange(self.max_reconnects):
             try:
-                self.sock.sendall(mesg)
+                self.send_message(mesg)
                 return
             except socket.error as err:
                 log.error("Failed to send data to Carbon server: %s", err)
@@ -128,7 +132,7 @@ def transmit(self):
         self.buffer = []
         for i in xrange(self.max_reconnects):
             try:
-                self.sock.sendall(header + payload)
+                self.send_message(header + payload)
                 return
             except socket.error as err:
                 log.error("Failed to send data to Carbon server: %s", err)
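With send_message() deferring the TCP connect until the first metric is ready, a slow or absent Carbon no longer stalls the other components at startup. For reference, the header + payload framing that the pickle client's transmit() sends follows the standard Carbon pickle protocol, a 4-byte big-endian length prefix; a sketch (the metric tuple values are illustrative):

    import pickle
    import struct

    metrics = [("stats.gorm", (1498032542, 2.0))]   # (name, (timestamp, value))
    payload = pickle.dumps(metrics, protocol=2)
    header = struct.pack("!L", len(payload))
    message = header + payload                      # what send_message() writes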
[PATCH 18/40] Disk activity stats --- bucky/influxdb.py | 2 +- bucky/systemstats.py | 28 ++++++++++++++++++++++++++++ 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/bucky/influxdb.py b/bucky/influxdb.py index 4eff79d..44eb2f7 100644 --- a/bucky/influxdb.py +++ b/bucky/influxdb.py @@ -74,7 +74,7 @@ def kv(self, k, v): def tick(self): now = time.time() - if len(self.buffer) > 30 or ((now - self.flush_timestamp) > 1 and len(self.buffer)): + if len(self.buffer) > 10 or ((now - self.flush_timestamp) > 1 and len(self.buffer)): payload = '\n'.join(self.buffer).encode() self.resolve_hosts() for ip, port in self.resolved_hosts: diff --git a/bucky/systemstats.py b/bucky/systemstats.py index c97cf34..46b7d03 100644 --- a/bucky/systemstats.py +++ b/bucky/systemstats.py @@ -43,6 +43,7 @@ def run(self): self.read_df_stats() self.read_memory_stats() self.read_interface_stats() + self.read_disk_stats() stop_timestamp = time.time() sleep_time = self.interval - (stop_timestamp - start_timestamp) if sleep_time > 0.1: @@ -170,3 +171,30 @@ def read_memory_stats(self): memory_stats['available_bytes'] = long(tokens[1]) * 1024 if memory_stats: self.add_stat("memory", memory_stats, now, metadata=None) + + def read_disk_stats(self): + now = int(time.time()) + with open('/proc/diskstats') as f: + for l in f.readlines(): + tokens = l.strip().split() + if not tokens or len(tokens) != 14: + continue + name = tokens[2] + disk_stats = { + 'read_ops': long(tokens[3]), + 'read_merged': long(tokens[4]), + 'read_sectors': long(tokens[5]), + 'read_bytes': long(tokens[5]) * 512, + 'read_time': long(tokens[6]), + + 'write_ops': long(tokens[7]), + 'write_merged': long(tokens[8]), + 'write_sectors': long(tokens[9]), + 'write_bytes': long(tokens[9]) * 512, + 'write_time': long(tokens[10]), + + 'in_progress': long(tokens[11]), + 'io_time': long(tokens[12]), + 'weighted_time': long(tokens[13]) + } + self.add_stat("disk", disk_stats, now, metadata=dict(instance=name)) From ba54cb356f1343ca5ce5aa0e8e694badd5db9824 Mon Sep 17 00:00:00 2001 From: jarek Date: Tue, 27 Jun 2017 15:27:42 +0100 Subject: [PATCH 19/40] Refactor docker/system stats collectors --- bucky/collector.py | 55 ++++++++++++++++++++++++++++++++ bucky/dockerstats.py | 75 ++++++++++++-------------------------------- bucky/main.py | 4 +-- bucky/systemstats.py | 62 +++++++++++------------------------- 4 files changed, 95 insertions(+), 101 deletions(-) create mode 100644 bucky/collector.py diff --git a/bucky/collector.py b/bucky/collector.py new file mode 100644 index 0000000..32cee18 --- /dev/null +++ b/bucky/collector.py @@ -0,0 +1,55 @@ + + +import time +import multiprocessing + + +try: + from setproctitle import setproctitle +except ImportError: + def setproctitle(title): + pass + + +class StatsCollector(multiprocessing.Process): + def __init__(self, queue): + super(StatsCollector, self).__init__() + self.queue = queue + + def close(self): + pass + + def run(self): + setproctitle("bucky: %s" % self.__class__.__name__) + err = 0 + while True: + start_timestamp = time.time() + if not self.collect(): + err = min(err + 1, 2) + else: + err = 0 + stop_timestamp = time.time() + sleep_time = (err + 1) * self.interval - (stop_timestamp - start_timestamp) + if sleep_time > 0.1: + time.sleep(sleep_time) + + def collect(self): + raise NotImplementedError() + + def add_stat(self, name, value, timestamp, **metadata): + if metadata: + if self.metadata: + metadata.update(self.metadata) + else: + metadata = self.metadata + if metadata: + self.queue.put((None, name, value, 
timestamp, metadata)) + else: + self.queue.put((None, name, value, timestamp)) + + def merge_dicts(self, *dicts): + ret = {} + for d in dicts: + if d: + ret.update(d) + return ret diff --git a/bucky/dockerstats.py b/bucky/dockerstats.py index 6d7256c..97b10b3 100644 --- a/bucky/dockerstats.py +++ b/bucky/dockerstats.py @@ -2,7 +2,7 @@ import time import docker import logging -import multiprocessing +import bucky.collector as collector import six import requests.exceptions @@ -16,66 +16,31 @@ log = logging.getLogger(__name__) -class DockerStatsServer(multiprocessing.Process): +class DockerStatsCollector(collector.StatsCollector): def __init__(self, queue, cfg): - super(DockerStatsServer, self).__init__() - self.queue = queue - self.metadata = {} - if cfg.metadata: - self.metadata.update(cfg.metadata) - if cfg.docker_stats_metadata: - self.metadata.update(cfg.docker_stats_metadata) + super(DockerStatsCollector, self).__init__(queue) + self.metadata = self.merge_dicts(cfg.metadata, cfg.docker_stats_metadata) self.interval = cfg.docker_stats_interval + self.ignored_filesystems = set() + if cfg.system_stats_df_ignored: + self.ignored_filesystems.update(cfg.system_stats_df_ignored) if cfg.docker_stats_version: self.docker_client = docker.client.from_env(version=cfg.docker_stats_version) else: self.docker_client = docker.client.from_env() - def close(self): - pass - - def run(self): - err = 0 - while True: - start_timestamp = time.time() - if not self.read_docker_stats(): - err = min(err + 1, 2) - else: - err = 0 - stop_timestamp = time.time() - sleep_time = (err + 1) * self.interval - (stop_timestamp - start_timestamp) - if sleep_time > 0.1: - time.sleep(sleep_time) - - def add_stat(self, name, value, timestamp, metadata): - if metadata: - if self.metadata: - metadata.update(self.metadata) - else: - metadata = self.metadata - if metadata: - self.queue.put((None, name, value, timestamp, metadata)) - else: - self.queue.put((None, name, value, timestamp)) - - def _merge(self, *dicts): - ret = {} - for d in dicts: - ret.update(d) - return ret - - def _add_df_stats(self, now, labels, total_size, rw_size): + def read_df_stats(self, now, labels, total_size, rw_size): docker_df_stats = { 'total_bytes': long(total_size), 'used_bytes': long(rw_size) } - self.add_stat("docker_df", docker_df_stats, now, labels) + self.add_stat("docker_df", docker_df_stats, now, **labels) - def _add_cpu_stats(self, now, labels, stats): + def read_cpu_stats(self, now, labels, stats): for k, v in enumerate(stats[u'percpu_usage']): - self.add_stat("docker_cpu", {'usage': long(v)}, now, self._merge(labels, {"instance": k})) + self.add_stat("docker_cpu", {'usage': long(v)}, now, instance=k, **labels) - def _add_interface_stats(self, now, labels, stats): + def read_interface_stats(self, now, labels, stats): for k in stats.keys(): v = stats[k] keys = ( @@ -83,12 +48,12 @@ def _add_interface_stats(self, now, labels, stats): u'tx_bytes', u'tx_packets', u'tx_errors', u'tx_dropped' ) docker_interface_stats = {k: long(v[k]) for k in keys} - self.add_stat("docker_interface", docker_interface_stats, now, self._merge(labels, dict(instance=k))) + self.add_stat("docker_interface", docker_interface_stats, now, instance=k, **labels) - def _add_memory_stats(self, now, labels, stats): - self.add_stat("docker_memory", {'used_bytes': long(stats[u'usage'])}, now, labels) + def read_memory_stats(self, now, labels, stats): + self.add_stat("docker_memory", {'used_bytes': long(stats[u'usage'])}, now, **labels) - def read_docker_stats(self): + def 
collect(self): now = int(time.time()) try: for i, container in enumerate(self.docker_client.api.containers(size=True)): @@ -96,10 +61,10 @@ def read_docker_stats(self): if 'container_id' not in labels: labels['container_id'] = container[u'Id'][:12] stats = self.docker_client.api.stats(container[u'Id'], decode=True, stream=False) - self._add_df_stats(now, labels, long(container[u'SizeRootFs']), long(container.get(u'SizeRw', 0))) - self._add_cpu_stats(now, labels, stats[u'cpu_stats'][u'cpu_usage']) - self._add_memory_stats(now, labels, stats[u'memory_stats']) - self._add_interface_stats(now, labels, stats[u'networks']) + self.read_df_stats(now, labels, long(container[u'SizeRootFs']), long(container.get(u'SizeRw', 0))) + self.read_cpu_stats(now, labels, stats[u'cpu_stats'][u'cpu_usage']) + self.read_memory_stats(now, labels, stats[u'memory_stats']) + self.read_interface_stats(now, labels, stats[u'networks']) return True except requests.exceptions.ConnectionError: return False diff --git a/bucky/main.py b/bucky/main.py index 744d2c8..2847515 100644 --- a/bucky/main.py +++ b/bucky/main.py @@ -290,9 +290,9 @@ def __init__(self, cfg): if cfg.statsd_enabled: stypes.append(statsd.StatsDServer) if cfg.system_stats_enabled: - stypes.append(systemstats.SystemStatsServer) + stypes.append(systemstats.SystemStatsCollector) if cfg.docker_stats_enabled: - stypes.append(dockerstats.DockerStatsServer) + stypes.append(dockerstats.DockerStatsCollector) self.servers = [] for stype in stypes: diff --git a/bucky/systemstats.py b/bucky/systemstats.py index 46b7d03..1e77e0d 100644 --- a/bucky/systemstats.py +++ b/bucky/systemstats.py @@ -2,7 +2,7 @@ import os import time import logging -import multiprocessing +import bucky.collector as collector import six @@ -15,50 +15,25 @@ log = logging.getLogger(__name__) -class SystemStatsServer(multiprocessing.Process): +class SystemStatsCollector(collector.StatsCollector): # The order of cpu fields in /proc/stat CPU_FIELDS = ('user', 'nice', 'system', 'idle', 'wait', 'interrupt', 'softirq', 'steal') def __init__(self, queue, cfg): - super(SystemStatsServer, self).__init__() - self.queue = queue - self.metadata = {} - if cfg.metadata: - self.metadata.update(cfg.metadata) - if cfg.system_stats_metadata: - self.metadata.update(cfg.system_stats_metadata) + super(SystemStatsCollector, self).__init__(queue) + self.metadata = self.merge_dicts(cfg.metadata, cfg.system_stats_metadata) self.interval = cfg.system_stats_interval self.ignored_filesystems = set() if cfg.system_stats_df_ignored: self.ignored_filesystems.update(cfg.system_stats_df_ignored) - def close(self): - pass - - def run(self): - while True: - start_timestamp = time.time() - self.read_cpu_stats() - self.read_load_stats() - self.read_df_stats() - self.read_memory_stats() - self.read_interface_stats() - self.read_disk_stats() - stop_timestamp = time.time() - sleep_time = self.interval - (stop_timestamp - start_timestamp) - if sleep_time > 0.1: - time.sleep(sleep_time) - - def add_stat(self, name, value, timestamp, metadata): - if metadata: - if self.metadata: - metadata.update(self.metadata) - else: - metadata = self.metadata - if metadata: - self.queue.put((None, name, value, timestamp, metadata)) - else: - self.queue.put((None, name, value, timestamp)) + def collect(self): + self.read_cpu_stats() + self.read_load_stats() + self.read_df_stats() + self.read_memory_stats() + self.read_interface_stats() + self.read_disk_stats() def read_cpu_stats(self): now = int(time.time()) @@ -81,9 +56,9 @@ def read_cpu_stats(self): if 
not cpu_suffix: continue cpu_stats = {k: v for k, v in zip(self.CPU_FIELDS, tokens[1:])} - self.add_stat("cpu", cpu_stats, now, {"instance": cpu_suffix}) + self.add_stat("cpu", cpu_stats, now, instance=cpu_suffix) if process_stats: - self.add_stat("processes", process_stats, now, metadata=None) + self.add_stat("processes", process_stats, now) def read_df_stats(self): now = int(time.time()) @@ -110,8 +85,7 @@ def read_df_stats(self): 'free_inodes': long(stats.f_favail), 'total_inodes': total_inodes } - self.add_stat("df", df_stats, now, - metadata=dict(target=mount_target, instance=mount_path, fs=mount_filesystem)) + self.add_stat("df", df_stats, now, target=mount_target, instance=mount_path, fs=mount_filesystem) except OSError: pass @@ -135,7 +109,7 @@ def read_interface_stats(self): 'tx_errors': long(tokens[11]), 'tx_dropped': long(tokens[12]) } - self.add_stat("interface", interface_stats, now, metadata=dict(instance=name)) + self.add_stat("interface", interface_stats, now, instance=name) def read_load_stats(self): now = int(time.time()) @@ -149,7 +123,7 @@ def read_load_stats(self): 'last_5m': float(tokens[1]), 'last_15m': float(tokens[2]) } - self.add_stat("load", load_stats, now, metadata=None) + self.add_stat("load", load_stats, now) def read_memory_stats(self): now = int(time.time()) @@ -170,7 +144,7 @@ def read_memory_stats(self): elif name == "memavailable": memory_stats['available_bytes'] = long(tokens[1]) * 1024 if memory_stats: - self.add_stat("memory", memory_stats, now, metadata=None) + self.add_stat("memory", memory_stats, now) def read_disk_stats(self): now = int(time.time()) @@ -197,4 +171,4 @@ def read_disk_stats(self): 'io_time': long(tokens[12]), 'weighted_time': long(tokens[13]) } - self.add_stat("disk", disk_stats, now, metadata=dict(instance=name)) + self.add_stat("disk", disk_stats, now, instance=name) From 5150b5274af89f17176ceacf863b3a4f222a34ad Mon Sep 17 00:00:00 2001 From: jarek Date: Tue, 27 Jun 2017 16:40:12 +0100 Subject: [PATCH 20/40] Make statsd work with Py3 --- bucky/statsd.py | 49 ++++++++++--------------------- contrib/statsd_perfomance_test.py | 2 +- 2 files changed, 17 insertions(+), 34 deletions(-) diff --git a/bucky/statsd.py b/bucky/statsd.py index c02cb3d..b2fdfd1 100644 --- a/bucky/statsd.py +++ b/bucky/statsd.py @@ -67,9 +67,9 @@ def make_name(parts): return name -class StatsDHandler(threading.Thread): +class StatsDServer(udpserver.UDPServer): def __init__(self, queue, cfg): - super(StatsDHandler, self).__init__() + super(StatsDServer, self).__init__(cfg.statsd_ip, cfg.statsd_port) self.daemon = True self.queue = queue self.cfg = cfg @@ -132,6 +132,9 @@ def __init__(self, queue, cfg): self.enable_timer_median = cfg.statsd_timer_median self.enable_timer_std = cfg.statsd_timer_std + def pre_shutdown(self): + self.save_gauges() + def load_gauges(self): if not self.statsd_persistent_gauges: return @@ -184,9 +187,13 @@ def tick(self): self.keys_seen = {k: self.keys_seen[k] for k in kept_keys if k in self.keys_seen} def run(self): - while True: - time.sleep(self.flush_time) - self.tick() + def flush_loop(): + while True: + time.sleep(self.flush_time) + self.tick() + self.load_gauges() + threading.Thread(target=flush_loop).start() + super(StatsDServer, self).run() def enqueue(self, name, stat, stime, metadata_key=None): # No hostnames on statsd @@ -328,15 +335,18 @@ def enqueue_counters(self, stime): ret += 1 return ret - def handle(self, data): + def handle(self, data, addr): # Adding a bit of extra sauce so clients can # send multiple samples in a 
single UDP # packet. + if six.PY3: + data = data.decode() for line in data.splitlines(): self.line = line if not line.strip(): continue self.handle_line(line) + return True def handle_tags(self, line): # http://docs.datadoghq.com/guides/dogstatsd/#datagram-format @@ -443,30 +453,3 @@ def handle_counter(self, key, fields): def bad_line(self): log.error("StatsD: Invalid line: '%s'", self.line.strip()) - - -class StatsDServer(udpserver.UDPServer): - def __init__(self, queue, cfg): - super(StatsDServer, self).__init__(cfg.statsd_ip, cfg.statsd_port) - self.handler = StatsDHandler(queue, cfg) - - def pre_shutdown(self): - self.handler.save_gauges() - - def run(self): - self.handler.load_gauges() - self.handler.start() - super(StatsDServer, self).run() - - if six.PY3: - def handle(self, data, addr): - self.handler.handle(data.decode()) - if not self.handler.is_alive(): - return False - return True - else: - def handle(self, data, addr): - self.handler.handle(data) - if not self.handler.is_alive(): - return False - return True diff --git a/contrib/statsd_perfomance_test.py b/contrib/statsd_perfomance_test.py index 0cd9661..223f78d 100644 --- a/contrib/statsd_perfomance_test.py +++ b/contrib/statsd_perfomance_test.py @@ -20,7 +20,7 @@ queue = multiprocessing.Queue() -handler = bucky.statsd.StatsDHandler(queue, bucky.cfg) +handler = bucky.statsd.StatsDServer(queue, bucky.cfg) def fill_and_compute_timers(handler): From bc9fc8f712b49ee65ed4c927e4313c6626b1d7da Mon Sep 17 00:00:00 2001 From: jarek Date: Tue, 27 Jun 2017 17:33:20 +0100 Subject: [PATCH 21/40] Fix statsd tests --- tests/test_001_statsd.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tests/test_001_statsd.py b/tests/test_001_statsd.py index 5bf6c87..3b230f4 100644 --- a/tests/test_001_statsd.py +++ b/tests/test_001_statsd.py @@ -234,16 +234,16 @@ def test_simple_persistent_gauges(q, s): if os.path.isfile(os.path.join(t.cfg.directory, t.cfg.statsd_gauges_savefile)): os.unlink(os.path.join(t.cfg.directory, t.cfg.statsd_gauges_savefile)) try: - s.handler.handle_line("gorm:5|g") - assert s.handler.gauges["gorm"] == 5 + s.handle_line("gorm:5|g") + assert s.gauges["gorm"] == 5 - s.handler.save_gauges() + s.save_gauges() - s.handler.handle_line("gorm:1|g") - assert s.handler.gauges["gorm"] == 1 + s.handle_line("gorm:1|g") + assert s.gauges["gorm"] == 1 - s.handler.load_gauges() - assert s.handler.gauges["gorm"] == 5 + s.load_gauges() + assert s.gauges["gorm"] == 5 finally: if os.path.isfile(os.path.join(t.cfg.directory, t.cfg.statsd_gauges_savefile)): os.unlink(os.path.join(t.cfg.directory, t.cfg.statsd_gauges_savefile)) From b9a6e4b46aa46621ccbf7f357e2aef490bf164a5 Mon Sep 17 00:00:00 2001 From: jarek Date: Wed, 28 Jun 2017 23:59:46 +0100 Subject: [PATCH 22/40] Fix the silly bug --- bucky/systemstats.py | 1 + 1 file changed, 1 insertion(+) diff --git a/bucky/systemstats.py b/bucky/systemstats.py index 1e77e0d..2f32ac3 100644 --- a/bucky/systemstats.py +++ b/bucky/systemstats.py @@ -34,6 +34,7 @@ def collect(self): self.read_memory_stats() self.read_interface_stats() self.read_disk_stats() + return True def read_cpu_stats(self): now = int(time.time()) From 2c72bc84217cd2eefe4289f1d6d7490e89acb444 Mon Sep 17 00:00:00 2001 From: jarek Date: Thu, 29 Jun 2017 10:32:57 +0100 Subject: [PATCH 23/40] Improve data type handling for InfluxDB --- bucky/influxdb.py | 21 +++++++++++++++------ bucky/systemstats.py | 2 +- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/bucky/influxdb.py b/bucky/influxdb.py 
index 44eb2f7..20f4386 100644 --- a/bucky/influxdb.py +++ b/bucky/influxdb.py @@ -83,6 +83,7 @@ def tick(self): self.flush_timestamp = now def _send(self, host, name, mtime, values, metadata=None): + # https://docs.influxdata.com/influxdb/v1.2/write_protocols/line_protocol_tutorial/ label_buf = [name] if host: if metadata is None: @@ -91,13 +92,21 @@ def _send(self, host, name, mtime, values, metadata=None): if 'host' not in metadata: metadata['host'] = host if metadata: - for k in metadata.keys(): + # InfluxDB docs recommend sorting tags + for k in sorted(metadata.keys()): v = metadata[k] - # InfluxDB will drop insert with tags without values - if v is not None: - label_buf.append(self.kv(k, v)) - value_buf = [self.kv(k, values[k]) for k in values.keys()] - # https://docs.influxdata.com/influxdb/v1.2/write_protocols/line_protocol_tutorial/ + # InfluxDB will drop inserts with empty tags + if v is None or v == '': + continue + label_buf.append(self.kv(k, v)) + value_buf = [] + for k in values.keys(): + v = values[k] + t = type(v) + if t is long or t is int: + value_buf.append(self.kv(k, str(values[k]) + 'i')) + else: + value_buf.append(self.kv(k, values[k])) line = ' '.join((','.join(label_buf), ','.join(value_buf), str(long(mtime) * 1000000000))) self.buffer.append(line) self.tick() diff --git a/bucky/systemstats.py b/bucky/systemstats.py index 2f32ac3..ef958b6 100644 --- a/bucky/systemstats.py +++ b/bucky/systemstats.py @@ -56,7 +56,7 @@ def read_cpu_stats(self): cpu_suffix = name[3:] if not cpu_suffix: continue - cpu_stats = {k: v for k, v in zip(self.CPU_FIELDS, tokens[1:])} + cpu_stats = {k: long(v) for k, v in zip(self.CPU_FIELDS, tokens[1:])} self.add_stat("cpu", cpu_stats, now, instance=cpu_suffix) if process_stats: self.add_stat("processes", process_stats, now) From af50d9b67b361824199fbe509708824ab7055b00 Mon Sep 17 00:00:00 2001 From: jarek Date: Thu, 29 Jun 2017 10:44:56 +0100 Subject: [PATCH 24/40] Lower the time resolution in InfluxDB line protocol, docs recommend it as an optimization --- README.rst | 3 +++ bucky/influxdb.py | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/README.rst b/README.rst index aed34dc..12f5296 100644 --- a/README.rst +++ b/README.rst @@ -334,10 +334,13 @@ like so:: enabled = true bind-address = ":8089" database = "mydatabase" + precision = "1s" Bucky will periodically resolve all hostnames in the `influxdb_hosts` list and fan out metrics to all resolved endpoints. Thus providing replication as well as hot swapping. +Note the 1s precision: it is currently the maximum precision that +bucky uses when sending data to InfluxDB, hence the setting.
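+
+As a rough sketch of what then goes over the wire (measurement, tag and
+field names here are illustrative, not a fixed schema), each flushed
+sample becomes a single line of the InfluxDB line protocol, with integer
+fields suffixed with ``i`` and, at this precision, a timestamp in whole
+seconds::
+
+    cpu,host=web1,instance=0 user=123i,nice=0i,system=45i 1498732800
+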
A note on CollectD converters diff --git a/bucky/influxdb.py b/bucky/influxdb.py index 20f4386..e9a7ec7 100644 --- a/bucky/influxdb.py +++ b/bucky/influxdb.py @@ -107,7 +107,7 @@ def _send(self, host, name, mtime, values, metadata=None): value_buf.append(self.kv(k, str(values[k]) + 'i')) else: value_buf.append(self.kv(k, values[k])) - line = ' '.join((','.join(label_buf), ','.join(value_buf), str(long(mtime) * 1000000000))) + line = ' '.join((','.join(label_buf), ','.join(value_buf), str(long(mtime)))) self.buffer.append(line) self.tick() From 0f62fae49cc0dd79a2a7bb34fac622e006968c8d Mon Sep 17 00:00:00 2001 From: jarek Date: Thu, 29 Jun 2017 11:19:30 +0100 Subject: [PATCH 25/40] More changes to data types in InfluxDB protocol --- bucky/influxdb.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/bucky/influxdb.py b/bucky/influxdb.py index e9a7ec7..9289652 100644 --- a/bucky/influxdb.py +++ b/bucky/influxdb.py @@ -69,9 +69,6 @@ def close(self): except: pass - def kv(self, k, v): - return str(k) + '=' + str(v) - def tick(self): now = time.time() if len(self.buffer) > 10 or ((now - self.flush_timestamp) > 1 and len(self.buffer)): @@ -98,15 +95,18 @@ def _send(self, host, name, mtime, values, metadata=None): # InfluxDB will drop insert with empty tags if v is None or v == '': continue - label_buf.append(self.kv(k, v)) + v = str(v).replace(' ', '') + label_buf.append(str(k) + '=' + v) value_buf = [] for k in values.keys(): v = values[k] t = type(v) if t is long or t is int: - value_buf.append(self.kv(k, str(values[k]) + 'i')) - else: - value_buf.append(self.kv(k, values[k])) + value_buf.append(str(k) + '=' + str(v) + 'i') + elif t is float or t is bool: + value_buf.append(str(k) + '=' + str(v)) + elif t is str: + value_buf.append(str(k) + '="' + v + '"') line = ' '.join((','.join(label_buf), ','.join(value_buf), str(long(mtime)))) self.buffer.append(line) self.tick() From 36bde060bbdb4cf9d0396719b8b82952a73bf2b5 Mon Sep 17 00:00:00 2001 From: jarek Date: Thu, 29 Jun 2017 11:20:02 +0100 Subject: [PATCH 26/40] Change the back-off algo for failures --- bucky/collector.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/bucky/collector.py b/bucky/collector.py index 32cee18..c0fa187 100644 --- a/bucky/collector.py +++ b/bucky/collector.py @@ -21,17 +21,15 @@ def close(self): def run(self): setproctitle("bucky: %s" % self.__class__.__name__) - err = 0 + interval = self.interval while True: start_timestamp = time.time() - if not self.collect(): - err = min(err + 1, 2) - else: - err = 0 + interval = self.interval if self.collect() else interval+interval stop_timestamp = time.time() - sleep_time = (err + 1) * self.interval - (stop_timestamp - start_timestamp) - if sleep_time > 0.1: - time.sleep(sleep_time) + interval = min(interval, 300) + interval = interval - (stop_timestamp - start_timestamp) + if interval > 0.1: + time.sleep(interval) def collect(self): raise NotImplementedError() From 92b732c410af11dff092c589167a3a037407b4f4 Mon Sep 17 00:00:00 2001 From: jarek Date: Thu, 29 Jun 2017 12:00:14 +0100 Subject: [PATCH 27/40] Uniform blacklist/whitelist options for system stats + small refactors and a bugfix --- bucky/cfg.py | 7 +++++- bucky/dockerstats.py | 9 +++----- bucky/systemstats.py | 53 ++++++++++++++++++++++++++++++-------------- 3 files changed, 45 insertions(+), 24 deletions(-) diff --git a/bucky/cfg.py b/bucky/cfg.py index ae406b9..084beee 100644 --- a/bucky/cfg.py +++ b/bucky/cfg.py @@ -97,7 +97,12 @@ system_stats_enabled = 
False system_stats_interval = 10 system_stats_metadata = None -system_stats_df_ignored = ['tmpfs', 'aufs', 'rootfs', 'devtmpfs'] +system_stats_filesystem_blacklist = ['tmpfs', 'aufs', 'rootfs', 'devtmpfs'] +system_stats_filesystem_whitelist = None +system_stats_interface_blacklist = None +system_stats_interface_whitelist = None +system_stats_disk_blacklist = ['loop0', 'loop1', 'loop2', 'loop3', 'loop4', 'loop5', 'loop6', 'loop7'] +system_stats_disk_whitelist = None docker_stats_enabled = False docker_stats_interval = 10 diff --git a/bucky/dockerstats.py b/bucky/dockerstats.py index 97b10b3..64af5f4 100644 --- a/bucky/dockerstats.py +++ b/bucky/dockerstats.py @@ -21,9 +21,6 @@ def __init__(self, queue, cfg): super(DockerStatsCollector, self).__init__(queue) self.metadata = self.merge_dicts(cfg.metadata, cfg.docker_stats_metadata) self.interval = cfg.docker_stats_interval - self.ignored_filesystems = set() - if cfg.system_stats_df_ignored: - self.ignored_filesystems.update(cfg.system_stats_df_ignored) if cfg.docker_stats_version: self.docker_client = docker.client.from_env(version=cfg.docker_stats_version) else: @@ -34,7 +31,7 @@ def read_df_stats(self, now, labels, total_size, rw_size): 'total_bytes': long(total_size), 'used_bytes': long(rw_size) } - self.add_stat("docker_df", docker_df_stats, now, **labels) + self.add_stat("docker_filesystem", docker_df_stats, now, **labels) def read_cpu_stats(self, now, labels, stats): for k, v in enumerate(stats[u'percpu_usage']): @@ -58,8 +55,8 @@ def collect(self): try: for i, container in enumerate(self.docker_client.api.containers(size=True)): labels = container[u'Labels'] - if 'container_id' not in labels: - labels['container_id'] = container[u'Id'][:12] + if 'docker_id' not in labels: + labels['docker_id'] = container[u'Id'][:12] stats = self.docker_client.api.stats(container[u'Id'], decode=True, stream=False) self.read_df_stats(now, labels, long(container[u'SizeRootFs']), long(container.get(u'SizeRw', 0))) self.read_cpu_stats(now, labels, stats[u'cpu_stats'][u'cpu_usage']) diff --git a/bucky/systemstats.py b/bucky/systemstats.py index ef958b6..8633be6 100644 --- a/bucky/systemstats.py +++ b/bucky/systemstats.py @@ -23,14 +23,29 @@ def __init__(self, queue, cfg): super(SystemStatsCollector, self).__init__(queue) self.metadata = self.merge_dicts(cfg.metadata, cfg.system_stats_metadata) self.interval = cfg.system_stats_interval - self.ignored_filesystems = set() - if cfg.system_stats_df_ignored: - self.ignored_filesystems.update(cfg.system_stats_df_ignored) + self.filesystem_blacklist, self.filesystem_whitelist = self.get_lists(cfg.system_stats_filesystem_blacklist, + cfg.system_stats_filesystem_whitelist) + self.interface_blacklist, self.interface_whitelist = self.get_lists(cfg.system_stats_interface_blacklist, + cfg.system_stats_interface_whitelist) + self.disk_blacklist, self.disk_whitelist = self.get_lists(cfg.system_stats_disk_blacklist, + cfg.system_stats_disk_whitelist) + + def get_lists(self, cfg_blacklist, cfg_whitelist): + blacklist = set(cfg_blacklist) if cfg_blacklist else None + whitelist = set(cfg_whitelist) if cfg_whitelist else None + return blacklist, whitelist + + def check_lists(self, val, blacklist, whitelist): + if whitelist: + return val in whitelist + if blacklist: + return val not in blacklist + return True def collect(self): self.read_cpu_stats() self.read_load_stats() - self.read_df_stats() + self.read_filesystem_stats() self.read_memory_stats() self.read_interface_stats() self.read_disk_stats() @@ -39,7 +54,7 @@ def 
collect(self): def read_cpu_stats(self): now = int(time.time()) with open('/proc/stat') as f: - process_stats = {} + processes_stats = {} for l in f.readlines(): tokens = l.strip().split() if not tokens: @@ -47,21 +62,21 @@ continue name = tokens[0] if not name.startswith('cpu'): if name == 'ctxt': - process_stats['switches'] = long(tokens[1]) + processes_stats['switches'] = long(tokens[1]) elif name == 'processes': - process_stats['forks'] = long(tokens[1]) + processes_stats['forks'] = long(tokens[1]) elif name == 'procs_running': - process_stats['running'] = long(tokens[1]) + processes_stats['running'] = long(tokens[1]) else: cpu_suffix = name[3:] if not cpu_suffix: continue cpu_stats = {k: long(v) for k, v in zip(self.CPU_FIELDS, tokens[1:])} self.add_stat("cpu", cpu_stats, now, instance=cpu_suffix) - if process_stats: - self.add_stat("processes", process_stats, now) + if processes_stats: + self.add_stat("processes", processes_stats, now) - def read_df_stats(self): + def read_filesystem_stats(self): now = int(time.time()) with open('/proc/mounts') as f: for l in f.readlines(): @@ -71,7 +86,7 @@ if not tokens[1].startswith('/'): continue mount_target, mount_path, mount_filesystem = tokens[:3] - if mount_filesystem in self.ignored_filesystems: + if not self.check_lists(mount_filesystem, self.filesystem_blacklist, self.filesystem_whitelist): continue try: stats = os.statvfs(mount_path) @@ -86,7 +101,7 @@ 'free_inodes': long(stats.f_favail), 'total_inodes': total_inodes } - self.add_stat("df", df_stats, now, target=mount_target, instance=mount_path, fs=mount_filesystem) + self.add_stat("filesystem", df_stats, now, target=mount_target, instance=mount_path, type=mount_filesystem) except OSError: pass @@ -99,7 +114,9 @@ def read_interface_stats(self): continue if not tokens[0].endswith(':'): continue - name = tokens[0][:-1] + interface_name = tokens[0][:-1] + if not self.check_lists(interface_name, self.interface_blacklist, self.interface_whitelist): + continue interface_stats = { 'rx_bytes': long(tokens[1]), 'rx_packets': long(tokens[2]), @@ -110,7 +127,7 @@ 'tx_errors': long(tokens[11]), 'tx_dropped': long(tokens[12]) } - self.add_stat("interface", interface_stats, now, instance=name) + self.add_stat("interface", interface_stats, now, instance=interface_name) def read_load_stats(self): now = int(time.time()) @@ -154,7 +171,9 @@ def read_disk_stats(self): tokens = l.strip().split() if not tokens or len(tokens) != 14: continue - name = tokens[2] + disk_name = tokens[2] + if not self.check_lists(disk_name, self.disk_blacklist, self.disk_whitelist): + continue disk_stats = { 'read_ops': long(tokens[3]), 'read_merged': long(tokens[4]), @@ -172,4 +191,4 @@ 'io_time': long(tokens[12]), 'weighted_time': long(tokens[13]) } - self.add_stat("disk", disk_stats, now, instance=name) + self.add_stat("disk", disk_stats, now, instance=disk_name) From cb0553f39ba734460b843e6dab8028b78db09746 Mon Sep 17 00:00:00 2001 From: jarek Date: Thu, 29 Jun 2017 12:39:33 +0100 Subject: [PATCH 28/40] Extra option for ignoring Datadog event and service checks in StatsD protocol --- bucky/cfg.py | 9 +++++---- bucky/statsd.py | 4 ++++ 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/bucky/cfg.py b/bucky/cfg.py index 084beee..85b4ea5 100644 --- a/bucky/cfg.py +++ b/bucky/cfg.py @@ -51,11 +51,12 @@ statsd_delete_counters = True statsd_delete_timers = True statsd_delete_sets = True -#
statsd_delete_gauges = False -# `statsd_delete_gauges = True` would make gauges in practice useless, -# except if you get an absolute(!) value every flush-interval which would makes this setting irrelevant -statsd_onlychanged_gauges = True +# `statsd_delete_gauges = True` would make gauges in practice useless, except if you get an absolute(!) +# value every flush-interval which would make this setting irrelevant, so this option doesn't exist. # send gauge value to graphite only if there was a change +statsd_onlychanged_gauges = True +# Disable this only if you want "bad line" to be reported for lines with DataDog extensions +statsd_ignore_datadog_extensions = True statsd_percentile_thresholds = [90] # percentile thresholds for statsd timers diff --git a/bucky/statsd.py b/bucky/statsd.py index b2fdfd1..a06751c 100644 --- a/bucky/statsd.py +++ b/bucky/statsd.py @@ -121,6 +121,7 @@ def __init__(self, queue, cfg): self.delete_timers = self.delete_idlestats and cfg.statsd_delete_timers self.delete_sets = self.delete_idlestats and cfg.statsd_delete_sets self.onlychanged_gauges = self.delete_idlestats and cfg.statsd_onlychanged_gauges + self.ignore_datadog_extensions = cfg.statsd_ignore_datadog_extensions self.enable_timer_mean = cfg.statsd_timer_mean self.enable_timer_upper = cfg.statsd_timer_upper @@ -367,6 +368,9 @@ def handle_tags(self, line): return bits[0], tags def handle_line(self, line): + if self.ignore_datadog_extensions: + if line.startswith('sc|') or line.startswith('_e{'): + return line, tags = self.handle_tags(line) bits = line.split(":") key = self.handle_key(bits.pop(0), tags) From 03aea128f311b8712936587ff86711ad6569c05e Mon Sep 17 00:00:00 2001 From: jarek Date: Thu, 29 Jun 2017 15:39:24 +0100 Subject: [PATCH 29/40] Rework metadata handling in statsd module, switch to tuples, make it a part of the metric signature --- bucky/client.py | 2 +- bucky/collector.py | 3 +- bucky/influxdb.py | 11 ++----- bucky/statsd.py | 71 +++++++++++++++++++++------------------- 4 files changed, 39 insertions(+), 48 deletions(-) diff --git a/bucky/client.py b/bucky/client.py index fd47a4e..93efc9b 100644 --- a/bucky/client.py +++ b/bucky/client.py @@ -54,7 +54,7 @@ def send(self, host, name, value, time, metadata=None): raise NotImplementedError() def send_bulk(self, host, name, value, time, metadata=None): - for k in value.keys(): + for k in sorted(value.keys()): if name.endswith('.'): metric_name = name + k else: diff --git a/bucky/collector.py b/bucky/collector.py index c0fa187..3561889 100644 --- a/bucky/collector.py +++ b/bucky/collector.py @@ -41,7 +41,8 @@ def add_stat(self, name, value, timestamp, **metadata): else: metadata = self.metadata if metadata: - self.queue.put((None, name, value, timestamp, metadata)) + metadata_tuple = tuple((k, metadata[k]) for k in sorted(metadata.keys())) + self.queue.put((None, name, value, timestamp, metadata_tuple)) else: self.queue.put((None, name, value, timestamp)) diff --git a/bucky/influxdb.py b/bucky/influxdb.py index 9289652..8fb3310 100644 --- a/bucky/influxdb.py +++ b/bucky/influxdb.py @@ -82,16 +82,11 @@ def tick(self): def _send(self, host, name, mtime, values, metadata=None): # https://docs.influxdata.com/influxdb/v1.2/write_protocols/line_protocol_tutorial/ label_buf = [name] - if host: - if metadata is None: - metadata = {'host': host} - else: - if 'host' not in metadata: - metadata['host'] = host + if not metadata and host: + metadata = (('host', host),) if metadata: # InfluxDB docs recommend sorting tags - for k in sorted(metadata.keys()): -
v = metadata[k] + for k, v in metadata: # InfluxDB will drop insert with empty tags if v is None or v == '': continue diff --git a/bucky/statsd.py b/bucky/statsd.py index a06751c..7c6433a 100644 --- a/bucky/statsd.py +++ b/bucky/statsd.py @@ -85,11 +85,12 @@ def __init__(self, queue, cfg): self.prefix_timer = cfg.statsd_prefix_timer self.prefix_gauge = cfg.statsd_prefix_gauge self.prefix_set = cfg.statsd_prefix_set - self.metadata = {} + metadata = {} if cfg.metadata: - self.metadata.update(cfg.metadata) + metadata.update(cfg.metadata) if cfg.system_stats_metadata: - self.metadata.update(cfg.system_stats_metadata) + metadata.update(cfg.system_stats_metadata) + self.metadata = tuple((k, metadata[k]) for k in metadata.keys()) self.key_res = ( (re.compile("\s+"), "_"), (re.compile("\/"), "-"), @@ -115,7 +116,7 @@ def __init__(self, queue, cfg): self.pct_thresholds = cfg.statsd_percentile_thresholds - self.keys_seen = {} + self.keys_seen = set() self.delete_idlestats = cfg.statsd_delete_idlestats self.delete_counters = self.delete_idlestats and cfg.statsd_delete_counters self.delete_timers = self.delete_idlestats and cfg.statsd_delete_timers @@ -147,16 +148,19 @@ def load_gauges(self): except IOError: log.exception("StatsD: IOError") else: - self.gauges.update({k: gauges[k][0] for k in gauges.keys()}) - self.keys_seen.update({k: gauges[k][1] for k in gauges.keys()}) + for gauge_name, gauge_metadata, gauge_value in gauges: + k = (gauge_name, gauge_metadata) + self.gauges[k] = gauge_value + self.keys_seen.add(k) def save_gauges(self): if not self.statsd_persistent_gauges: return try: - gauges = {} + gauges = [] for k in self.gauges.keys(): - gauges[k] = (self.gauges[k], self.keys_seen.get(k, None)) + gauge_name, gauge_metadata = k + gauges.append((gauge_name, gauge_metadata, self.gauges[k])) write_json_file(self.gauges_filename, gauges) except IOError: log.exception("StatsD: IOError") @@ -165,27 +169,23 @@ def tick(self): stime = int(time.time()) with self.lock: if self.delete_timers: - rem_keys = set(self.timers.keys()) - set(self.keys_seen.keys()) + rem_keys = set(self.timers.keys()) - self.keys_seen for k in rem_keys: del self.timers[k] if self.delete_counters: - rem_keys = set(self.counters.keys()) - set(self.keys_seen.keys()) + rem_keys = set(self.counters.keys()) - self.keys_seen for k in rem_keys: del self.counters[k] if self.delete_sets: - rem_keys = set(self.sets.keys()) - set(self.keys_seen.keys()) + rem_keys = set(self.sets.keys()) - self.keys_seen for k in rem_keys: del self.sets[k] num_stats = self.enqueue_timers(stime) - kept_keys = set(self.timers.keys()) num_stats += self.enqueue_counters(stime) - kept_keys = kept_keys.union(set(self.counters.keys())) num_stats += self.enqueue_gauges(stime) - kept_keys = kept_keys.union(set(self.gauges.keys())) num_stats += self.enqueue_sets(stime) - kept_keys = kept_keys.union(set(self.sets.keys())) self.enqueue(self.name_global, {"numStats": num_stats}, stime) - self.keys_seen = {k: self.keys_seen[k] for k in kept_keys if k in self.keys_seen} + self.keys_seen = set() def run(self): def flush_loop(): @@ -196,12 +196,8 @@ def flush_loop(): threading.Thread(target=flush_loop).start() super(StatsDServer, self).run() - def enqueue(self, name, stat, stime, metadata_key=None): + def enqueue(self, name, stat, stime, metadata=None): # No hostnames on statsd - if metadata_key: - metadata = self.keys_seen.get(metadata_key, None) - else: - metadata = self.metadata if metadata: self.queue.put((None, name, stat, stime, metadata)) else: @@ -211,6 +207,7 @@ 
def enqueue_timers(self, stime): ret = 0 iteritems = self.timers.items() if six.PY3 else self.timers.iteritems() for k, v in iteritems: + timer_name, timer_metadata = k timer_stats = {} # Skip timers that haven't collected any values @@ -293,7 +290,7 @@ def enqueue_timers(self, stime): timer_stats["std"] = stddev if timer_stats: - self.enqueue("%s%s" % (self.name_timer, k), timer_stats, stime, k) + self.enqueue("%s%s" % (self.name_timer, timer_name), timer_stats, stime, timer_metadata) self.timers[k] = [] ret += 1 @@ -304,7 +301,8 @@ def enqueue_sets(self, stime): ret = 0 iteritems = self.sets.items() if six.PY3 else self.sets.iteritems() for k, v in iteritems: - self.enqueue("%s%s" % (self.name_set, k), {"count": len(v)}, stime, k) + set_name, set_metadata = k + self.enqueue("%s%s" % (self.name_set, set_name), {"count": len(v)}, stime, set_metadata) ret += 1 self.sets[k] = set() return ret @@ -313,9 +311,10 @@ def enqueue_gauges(self, stime): ret = 0 iteritems = self.gauges.items() if six.PY3 else self.gauges.iteritems() for k, v in iteritems: + gauge_name, gauge_metadata = k # only send a value if there was an update if `delete_idlestats` is `True` if not self.onlychanged_gauges or k in self.keys_seen: - self.enqueue("%s%s" % (self.name_gauge, k), v, stime, k) + self.enqueue("%s%s" % (self.name_gauge, gauge_name), v, stime, gauge_metadata) ret += 1 return ret @@ -323,15 +322,16 @@ def enqueue_counters(self, stime): ret = 0 iteritems = self.counters.items() if six.PY3 else self.counters.iteritems() for k, v in iteritems: + counter_name, counter_metadata = k if self.legacy_namespace: - self.enqueue("%s%s" % (self.name_legacy_rate, k), v / self.flush_time, stime, k) - self.enqueue("%s%s" % (self.name_legacy_count, k), v, stime, k) + self.enqueue("%s%s" % (self.name_legacy_rate, counter_name), v / self.flush_time, stime, counter_metadata) + self.enqueue("%s%s" % (self.name_legacy_count, counter_name), v, stime, counter_metadata) else: stats = { 'rate': v / self.flush_time, 'count': v } - self.enqueue("%s%s" % (self.name_counter, k), stats, stime, k) + self.enqueue("%s%s" % (self.name_counter, counter_name), stats, stime, counter_metadata) self.counters[k] = 0 ret += 1 return ret @@ -353,8 +353,8 @@ def handle_tags(self, line): # http://docs.datadoghq.com/guides/dogstatsd/#datagram-format bits = line.split("#") if len(bits) < 2: - return line, None - tags = {} + return line, self.metadata + tags = dict(self.metadata) for i in bits[1].split(","): kv = i.split("=") if len(kv) > 1: @@ -365,7 +365,7 @@ def handle_tags(self, line): tags[kv[0]] = kv[1] else: tags[kv[0]] = None - return bits[0], tags + return bits[0], tuple((k, tags[k]) for k in sorted(tags.keys())) def handle_line(self, line): if self.ignore_datadog_extensions: @@ -397,16 +397,11 @@ def handle_line(self, line): else: self.handle_counter(key, fields) - def handle_key(self, key, tags=None): - if tags is None: - coalesced_tags = self.metadata - else: - coalesced_tags = tags - if self.metadata: - coalesced_tags.update(self.metadata) + def handle_key(self, key, tags): for (rexp, repl) in self.key_res: key = rexp.sub(repl, key) - self.keys_seen[key] = coalesced_tags + key = (key, tags) + self.keys_seen.add(key) return key def handle_timer(self, key, fields): From 624de3a47835d5a279ed430f1dc19d2e9c27c51c Mon Sep 17 00:00:00 2001 From: jarek Date: Thu, 29 Jun 2017 15:52:51 +0100 Subject: [PATCH 30/40] One more renaming to be consistent with docker_* names --- bucky/systemstats.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 
deletions(-) diff --git a/bucky/systemstats.py b/bucky/systemstats.py index 8633be6..c6923de 100644 --- a/bucky/systemstats.py +++ b/bucky/systemstats.py @@ -72,9 +72,9 @@ def read_cpu_stats(self): if not cpu_suffix: continue cpu_stats = {k: long(v) for k, v in zip(self.CPU_FIELDS, tokens[1:])} - self.add_stat("cpu", cpu_stats, now, instance=cpu_suffix) + self.add_stat("system_cpu", cpu_stats, now, instance=cpu_suffix) if processes_stats: - self.add_stat("processes", processes_stats, now) + self.add_stat("system_processes", processes_stats, now) def read_filesystem_stats(self): now = int(time.time()) @@ -101,7 +101,7 @@ def read_filesystem_stats(self): 'free_inodes': long(stats.f_favail), 'total_inodes': total_inodes } - self.add_stat("filesystem", df_stats, now, target=mount_target, instance=mount_path, type=mount_filesystem) + self.add_stat("system_filesystem", df_stats, now, target=mount_target, instance=mount_path, type=mount_filesystem) except OSError: pass @@ -127,7 +127,7 @@ def read_interface_stats(self): 'tx_errors': long(tokens[11]), 'tx_dropped': long(tokens[12]) } - self.add_stat("interface", interface_stats, now, instance=interface_name) + self.add_stat("system_interface", interface_stats, now, instance=interface_name) def read_load_stats(self): now = int(time.time()) @@ -141,7 +141,7 @@ def read_load_stats(self): 'last_5m': float(tokens[1]), 'last_15m': float(tokens[2]) } - self.add_stat("load", load_stats, now) + self.add_stat("system_load", load_stats, now) def read_memory_stats(self): now = int(time.time()) @@ -162,7 +162,7 @@ def read_memory_stats(self): elif name == "memavailable": memory_stats['available_bytes'] = long(tokens[1]) * 1024 if memory_stats: - self.add_stat("memory", memory_stats, now) + self.add_stat("system_memory", memory_stats, now) def read_disk_stats(self): now = int(time.time()) @@ -191,4 +191,4 @@ def read_disk_stats(self): 'io_time': long(tokens[12]), 'weighted_time': long(tokens[13]) } - self.add_stat("disk", disk_stats, now, instance=disk_name) + self.add_stat("system_disk", disk_stats, now, instance=disk_name) From 02fcc9598c38ddade8959807af709056aaf6aac4 Mon Sep 17 00:00:00 2001 From: jarek Date: Thu, 29 Jun 2017 18:07:36 +0100 Subject: [PATCH 31/40] Add an alternative naming scheme for statsd, much more usable with influxdb/metadata --- bucky/cfg.py | 3 +++ bucky/client.py | 2 +- bucky/statsd.py | 46 +++++++++++++++++++++++++++++++++++----------- 3 files changed, 39 insertions(+), 12 deletions(-) diff --git a/bucky/cfg.py b/bucky/cfg.py index 85b4ea5..2c0cea0 100644 --- a/bucky/cfg.py +++ b/bucky/cfg.py @@ -57,6 +57,9 @@ statsd_onlychanged_gauges = True # Disable this only if you want "bad line" be reported for lines with DataDog extensions statsd_ignore_datadog_extensions = True +statsd_ignore_internal_stats = False +# Use metadata name=NAME instead of the original/legacy naming scheme +statsd_metadata_namespace = False statsd_percentile_thresholds = [90] # percentile thresholds for statsd timers diff --git a/bucky/client.py b/bucky/client.py index 93efc9b..fd47a4e 100644 --- a/bucky/client.py +++ b/bucky/client.py @@ -54,7 +54,7 @@ def send(self, host, name, value, time, metadata=None): raise NotImplementedError() def send_bulk(self, host, name, value, time, metadata=None): - for k in sorted(value.keys()): + for k in value.keys(): if name.endswith('.'): metric_name = name + k else: diff --git a/bucky/statsd.py b/bucky/statsd.py index 7c6433a..6fe2bdd 100644 --- a/bucky/statsd.py +++ b/bucky/statsd.py @@ -97,7 +97,15 @@ def __init__(self, 
queue, cfg): (re.compile("[^a-zA-Z_\-0-9\.]"), "") ) - if self.legacy_namespace: + self.enqueue = self.enqueue_with_dotted_names + if cfg.statsd_metadata_namespace: + self.name_global = self.global_prefix + self.name_counter = self.global_prefix + self.prefix_counter + self.name_timer = self.global_prefix + self.prefix_timer + self.name_gauge = self.global_prefix + self.prefix_gauge + self.name_set = self.global_prefix + self.prefix_set + self.enqueue = self.enqueue_with_metadata_names + elif self.legacy_namespace: self.name_global = 'stats.' self.name_legacy_rate = 'stats.' self.name_legacy_count = 'stats_counts.' @@ -123,6 +131,7 @@ def __init__(self, queue, cfg): self.delete_sets = self.delete_idlestats and cfg.statsd_delete_sets self.onlychanged_gauges = self.delete_idlestats and cfg.statsd_onlychanged_gauges self.ignore_datadog_extensions = cfg.statsd_ignore_datadog_extensions + self.ignore_internal_stats = cfg.statsd_ignore_internal_stats self.enable_timer_mean = cfg.statsd_timer_mean self.enable_timer_upper = cfg.statsd_timer_upper @@ -184,7 +193,8 @@ def tick(self): num_stats += self.enqueue_counters(stime) num_stats += self.enqueue_gauges(stime) num_stats += self.enqueue_sets(stime) - self.enqueue(self.name_global, {"numStats": num_stats}, stime) + if not self.ignore_internal_stats: + self.enqueue(self.name_global, None, {"numStats": num_stats}, stime) self.keys_seen = set() def run(self): @@ -196,12 +206,26 @@ def flush_loop(): threading.Thread(target=flush_loop).start() super(StatsDServer, self).run() - def enqueue(self, name, stat, stime, metadata=None): + def enqueue_with_dotted_names(self, bucket, name, value, stime, metadata=None): # No hostnames on statsd + if name: + bucket += name if metadata: - self.queue.put((None, name, stat, stime, metadata)) + self.queue.put((None, bucket, value, stime, metadata)) else: - self.queue.put((None, name, stat, stime)) + self.queue.put((None, bucket, value, stime)) + + def enqueue_with_metadata_names(self, bucket, name, value, stime, metadata=None): + # No hostnames on statsd + if metadata: + if name: + metadata = metadata + (('name', name),) + self.queue.put((None, bucket, value, stime, metadata)) + else: + if name: + self.queue.put((None, bucket, value, stime, (('name', name),))) + else: + self.queue.put((None, bucket, value, stime)) def enqueue_timers(self, stime): ret = 0 @@ -290,7 +314,7 @@ def enqueue_timers(self, stime): timer_stats["std"] = stddev if timer_stats: - self.enqueue("%s%s" % (self.name_timer, timer_name), timer_stats, stime, timer_metadata) + self.enqueue(self.name_timer, timer_name, timer_stats, stime, timer_metadata) self.timers[k] = [] ret += 1 @@ -302,7 +326,7 @@ def enqueue_sets(self, stime): iteritems = self.sets.items() if six.PY3 else self.sets.iteritems() for k, v in iteritems: set_name, set_metadata = k - self.enqueue("%s%s" % (self.name_set, set_name), {"count": len(v)}, stime, set_metadata) + self.enqueue(self.name_set, set_name, {"count": len(v)}, stime, set_metadata) ret += 1 self.sets[k] = set() return ret @@ -314,7 +338,7 @@ def enqueue_gauges(self, stime): gauge_name, gauge_metadata = k # only send a value if there was an update if `delete_idlestats` is `True` if not self.onlychanged_gauges or k in self.keys_seen: - self.enqueue("%s%s" % (self.name_gauge, gauge_name), v, stime, gauge_metadata) + self.enqueue(self.name_gauge, gauge_name, v, stime, gauge_metadata) ret += 1 return ret @@ -324,14 +348,14 @@ def enqueue_counters(self, stime): for k, v in iteritems: counter_name, counter_metadata = k if 
self.legacy_namespace: - self.enqueue("%s%s" % (self.name_legacy_rate, counter_name), v / self.flush_time, stime, counter_metadata) - self.enqueue("%s%s" % (self.name_legacy_count, counter_name), v, stime, counter_metadata) + self.enqueue(self.name_legacy_rate, counter_name, v / self.flush_time, stime, counter_metadata) + self.enqueue(self.name_legacy_count, counter_name, v, stime, counter_metadata) else: stats = { 'rate': v / self.flush_time, 'count': v } - self.enqueue("%s%s" % (self.name_counter, counter_name), stats, stime, counter_metadata) + self.enqueue(self.name_counter, counter_name, stats, stime, counter_metadata) self.counters[k] = 0 ret += 1 return ret From a75c8454e92074d90309874a7d3d906efedb7f87 Mon Sep 17 00:00:00 2001 From: jarek Date: Thu, 29 Jun 2017 18:23:51 +0100 Subject: [PATCH 32/40] Fix tests --- bucky/collector.py | 2 +- bucky/statsd.py | 2 +- contrib/statsd_perfomance_test.py | 2 +- tests/test_001_statsd.py | 6 +++--- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/bucky/collector.py b/bucky/collector.py index 3561889..bd859ae 100644 --- a/bucky/collector.py +++ b/bucky/collector.py @@ -24,7 +24,7 @@ def run(self): interval = self.interval while True: start_timestamp = time.time() - interval = self.interval if self.collect() else interval+interval + interval = self.interval if self.collect() else interval + interval stop_timestamp = time.time() interval = min(interval, 300) interval = interval - (stop_timestamp - start_timestamp) diff --git a/bucky/statsd.py b/bucky/statsd.py index 6fe2bdd..6a88254 100644 --- a/bucky/statsd.py +++ b/bucky/statsd.py @@ -158,7 +158,7 @@ def load_gauges(self): log.exception("StatsD: IOError") else: for gauge_name, gauge_metadata, gauge_value in gauges: - k = (gauge_name, gauge_metadata) + k = (gauge_name, tuple(gauge_metadata)) self.gauges[k] = gauge_value self.keys_seen.add(k) diff --git a/contrib/statsd_perfomance_test.py b/contrib/statsd_perfomance_test.py index 223f78d..cdb4274 100644 --- a/contrib/statsd_perfomance_test.py +++ b/contrib/statsd_perfomance_test.py @@ -27,7 +27,7 @@ def fill_and_compute_timers(handler): # Fill timers for x in l100: # timer name for y in l1000: # timer value, using random value is not good idea there - handler.handle_timer("timer-%s" % (x), [y]) + handler.handle_timer(("timer-%s" % (x,), ()), [y]) # Compute metrics stime = int(time.time()) diff --git a/tests/test_001_statsd.py b/tests/test_001_statsd.py index 3b230f4..ce49984 100644 --- a/tests/test_001_statsd.py +++ b/tests/test_001_statsd.py @@ -235,15 +235,15 @@ def test_simple_persistent_gauges(q, s): os.unlink(os.path.join(t.cfg.directory, t.cfg.statsd_gauges_savefile)) try: s.handle_line("gorm:5|g") - assert s.gauges["gorm"] == 5 + assert s.gauges[("gorm", ())] == 5 s.save_gauges() s.handle_line("gorm:1|g") - assert s.gauges["gorm"] == 1 + assert s.gauges[("gorm", ())] == 1 s.load_gauges() - assert s.gauges["gorm"] == 5 + assert s.gauges[("gorm", ())] == 5 finally: if os.path.isfile(os.path.join(t.cfg.directory, t.cfg.statsd_gauges_savefile)): os.unlink(os.path.join(t.cfg.directory, t.cfg.statsd_gauges_savefile)) From d2a45b0832d1141eaf2935a2b57a58e1079c8290 Mon Sep 17 00:00:00 2001 From: jarek Date: Thu, 29 Jun 2017 18:55:38 +0100 Subject: [PATCH 33/40] Use word "metadata" instead of tags/labels, simplify config for it, too --- bucky/cfg.py | 6 +----- bucky/dockerstats.py | 2 +- bucky/main.py | 21 +++++++++++---------- bucky/statsd.py | 26 +++++++++++--------------- bucky/systemstats.py | 2 +- 5 files changed, 25 insertions(+), 
32 deletions(-) diff --git a/bucky/cfg.py b/bucky/cfg.py index 2c0cea0..5a5f2bc 100644 --- a/bucky/cfg.py +++ b/bucky/cfg.py @@ -7,8 +7,7 @@ directory = "/var/lib/bucky" process_join_timeout = 2 -labels = [] -metadata = None +metadata = [] sentry_enabled = False sentry_dsn = None @@ -37,7 +36,6 @@ statsd_port = 8125 statsd_enabled = True statsd_flush_time = 10.0 -statsd_metadata = None statsd_legacy_namespace = True statsd_global_prefix = "stats" statsd_prefix_counter = "counters" @@ -100,7 +98,6 @@ system_stats_enabled = False system_stats_interval = 10 -system_stats_metadata = None system_stats_filesystem_blacklist = ['tmpfs', 'aufs', 'rootfs', 'devtmpfs'] system_stats_filesystem_whitelist = None system_stats_interface_blacklist = None @@ -110,7 +107,6 @@ docker_stats_enabled = False docker_stats_interval = 10 -docker_stats_metadata = None docker_stats_version = '1.22' processor = None diff --git a/bucky/dockerstats.py b/bucky/dockerstats.py index 64af5f4..0d3685c 100644 --- a/bucky/dockerstats.py +++ b/bucky/dockerstats.py @@ -19,7 +19,7 @@ class DockerStatsCollector(collector.StatsCollector): def __init__(self, queue, cfg): super(DockerStatsCollector, self).__init__(queue) - self.metadata = self.merge_dicts(cfg.metadata, cfg.docker_stats_metadata) + self.metadata = cfg.metadata if cfg.metadata else {} self.interval = cfg.docker_stats_interval if cfg.docker_stats_version: self.docker_client = docker.client.from_env(version=cfg.docker_stats_version) diff --git a/bucky/main.py b/bucky/main.py index 2847515..db83616 100644 --- a/bucky/main.py +++ b/bucky/main.py @@ -169,7 +169,7 @@ def options(): type="str", default=cfg.gid, help="Drop privileges to this group" ), - op.make_option("--label", action="append", dest="labels") + op.make_option("--metadata", action="append", dest="metadata") ] @@ -260,19 +260,20 @@ def main(): except: log.exception("Could not create directory: %s" % cfg.directory) - if cfg.labels: - if not cfg.metadata: - cfg.metadata = {} - for label in cfg.labels: - kv = label.split("=") + # This in place swap from list to dict is hideous :-| + metadata = {} + if cfg.metadata: + for i in cfg.metadata: + kv = i.split("=") if len(kv) > 1: - cfg.metadata[kv[0]] = kv[1] + metadata[kv[0]] = kv[1] else: - kv = label.split(":") + kv = i.split(":") if len(kv) > 1: - cfg.metadata[kv[0]] = kv[1] + metadata[kv[0]] = kv[1] else: - cfg.metadata[kv[0]] = None + metadata[kv[0]] = None + cfg.metadata = metadata bucky = Bucky(cfg) bucky.run() diff --git a/bucky/statsd.py b/bucky/statsd.py index 6a88254..fba0949 100644 --- a/bucky/statsd.py +++ b/bucky/statsd.py @@ -85,11 +85,7 @@ def __init__(self, queue, cfg): self.prefix_timer = cfg.statsd_prefix_timer self.prefix_gauge = cfg.statsd_prefix_gauge self.prefix_set = cfg.statsd_prefix_set - metadata = {} - if cfg.metadata: - metadata.update(cfg.metadata) - if cfg.system_stats_metadata: - metadata.update(cfg.system_stats_metadata) + metadata = cfg.metadata if cfg.metadata else {} self.metadata = tuple((k, metadata[k]) for k in metadata.keys()) self.key_res = ( (re.compile("\s+"), "_"), @@ -373,31 +369,31 @@ def handle(self, data, addr): self.handle_line(line) return True - def handle_tags(self, line): + def handle_metadata(self, line): # http://docs.datadoghq.com/guides/dogstatsd/#datagram-format bits = line.split("#") if len(bits) < 2: return line, self.metadata - tags = dict(self.metadata) + metadata = dict(self.metadata) for i in bits[1].split(","): kv = i.split("=") if len(kv) > 1: - tags[kv[0]] = kv[1] + metadata[kv[0]] = kv[1] else: kv = 
i.split(":") if len(kv) > 1: - tags[kv[0]] = kv[1] + metadata[kv[0]] = kv[1] else: - tags[kv[0]] = None - return bits[0], tuple((k, tags[k]) for k in sorted(tags.keys())) + metadata[kv[0]] = None + return bits[0], tuple((k, metadata[k]) for k in sorted(metadata.keys())) def handle_line(self, line): if self.ignore_datadog_extensions: if line.startswith('sc|') or line.startswith('_e{'): return - line, tags = self.handle_tags(line) + line, metadata = self.handle_metadata(line) bits = line.split(":") - key = self.handle_key(bits.pop(0), tags) + key = self.handle_key(bits.pop(0), metadata) if not bits: self.bad_line() @@ -421,10 +417,10 @@ def handle_line(self, line): else: self.handle_counter(key, fields) - def handle_key(self, key, tags): + def handle_key(self, key, metadata): for (rexp, repl) in self.key_res: key = rexp.sub(repl, key) - key = (key, tags) + key = (key, metadata) self.keys_seen.add(key) return key diff --git a/bucky/systemstats.py b/bucky/systemstats.py index c6923de..a48c167 100644 --- a/bucky/systemstats.py +++ b/bucky/systemstats.py @@ -21,7 +21,7 @@ class SystemStatsCollector(collector.StatsCollector): def __init__(self, queue, cfg): super(SystemStatsCollector, self).__init__(queue) - self.metadata = self.merge_dicts(cfg.metadata, cfg.system_stats_metadata) + self.metadata = cfg.metadata if cfg.metadata else {} self.interval = cfg.system_stats_interval self.filesystem_blacklist, self.filesystem_whitelist = self.get_lists(cfg.system_stats_filesystem_blacklist, cfg.system_stats_filesystem_whitelist) From 06793c609256895c801e1afac79efeb446b7ea4d Mon Sep 17 00:00:00 2001 From: jarek Date: Thu, 29 Jun 2017 19:42:07 +0100 Subject: [PATCH 34/40] InfluxDB does not accept duplicated tags in line proto, fit it --- bucky/statsd.py | 29 +++++++++++++++++++++-------- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/bucky/statsd.py b/bucky/statsd.py index fba0949..64fbf35 100644 --- a/bucky/statsd.py +++ b/bucky/statsd.py @@ -85,8 +85,8 @@ def __init__(self, queue, cfg): self.prefix_timer = cfg.statsd_prefix_timer self.prefix_gauge = cfg.statsd_prefix_gauge self.prefix_set = cfg.statsd_prefix_set - metadata = cfg.metadata if cfg.metadata else {} - self.metadata = tuple((k, metadata[k]) for k in metadata.keys()) + self.metadata_dict = cfg.metadata if cfg.metadata else {} + self.metadata_tuple = tuple((k, self.metadata_dict[k]) for k in sorted(self.metadata_dict.keys())) self.key_res = ( (re.compile("\s+"), "_"), (re.compile("\/"), "-"), @@ -202,21 +202,34 @@ def flush_loop(): threading.Thread(target=flush_loop).start() super(StatsDServer, self).run() + def coalesce_metadata(self, metadata): + if not metadata: + return self.metadata_dict + if self.metadata_dict: + tmp = self.metadata_dict.copy() + tmp.update(metadata) + return tmp + return dict(metadata) + def enqueue_with_dotted_names(self, bucket, name, value, stime, metadata=None): # No hostnames on statsd if name: bucket += name + metadata = self.coalesce_metadata(metadata) if metadata: - self.queue.put((None, bucket, value, stime, metadata)) + metadata_tuple = tuple((k, metadata[k]) for k in sorted(metadata.keys())) + self.queue.put((None, bucket, value, stime, metadata_tuple)) else: self.queue.put((None, bucket, value, stime)) def enqueue_with_metadata_names(self, bucket, name, value, stime, metadata=None): # No hostnames on statsd + metadata = self.coalesce_metadata(metadata) if metadata: - if name: - metadata = metadata + (('name', name),) - self.queue.put((None, bucket, value, stime, metadata)) + if name and not 
From 5af6bab6079f7073db7d5aecc90e82feb55059ad Mon Sep 17 00:00:00 2001
From: jarek
Date: Thu, 29 Jun 2017 20:44:21 +0100
Subject: [PATCH 35/40] Right, another fix for the InfluxDB protocol

---
 bucky/influxdb.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/bucky/influxdb.py b/bucky/influxdb.py
index 8fb3310..5a09498 100644
--- a/bucky/influxdb.py
+++ b/bucky/influxdb.py
@@ -102,7 +102,8 @@ def _send(self, host, name, mtime, values, metadata=None):
                 value_buf.append(str(k) + '=' + str(v))
             elif t is str:
                 value_buf.append(str(k) + '="' + v + '"')
-        line = ' '.join((','.join(label_buf), ','.join(value_buf), str(long(mtime))))
+        # So, the lower timestamp precisions don't seem to work with line protocol...
+        line = ' '.join((','.join(label_buf), ','.join(value_buf), str(long(mtime) * 1000000000)))
         self.buffer.append(line)
         self.tick()

From 6a7ef39ae36ae04c7aeb68d129a1727f717db970 Mon Sep 17 00:00:00 2001
From: jarek
Date: Fri, 30 Jun 2017 09:42:54 +0100
Subject: [PATCH 36/40] Early implementation of Prometheus integration

---
 bucky/cfg.py        |  5 +++
 bucky/main.py       |  8 +++++
 bucky/prometheus.py | 88 +++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 101 insertions(+)
 create mode 100644 bucky/prometheus.py

diff --git a/bucky/cfg.py b/bucky/cfg.py
index 5a5f2bc..765465c 100644
--- a/bucky/cfg.py
+++ b/bucky/cfg.py
@@ -86,6 +86,11 @@
     "127.0.0.1:8089"
 ]
 
+prometheus_enabled = False
+prometheus_port = 9090
+prometheus_timeout = 60
+prometheus_path = 'metrics'
+
 full_trace = False
 
 name_prefix = None
diff --git a/bucky/main.py b/bucky/main.py
index db83616..70bfab5 100644
--- a/bucky/main.py
+++ b/bucky/main.py
@@ -39,6 +39,7 @@
 import bucky.systemstats as systemstats
 import bucky.dockerstats as dockerstats
 import bucky.influxdb as influxdb
+import bucky.prometheus as prometheus
 import bucky.processor as processor
 from bucky.errors import BuckyError
 
@@ -134,6 +135,11 @@ def options():
             default=cfg.influxdb_enabled, action="store_true",
             help="Enable the InfluxDB line protocol client"
         ),
+        op.make_option(
+            "--enable-prometheus", dest="prometheus_enabled",
+            default=cfg.prometheus_enabled, action="store_true",
+            help="Enable the Prometheus exposition via HTTP"
+        ),
         op.make_option(
             "--enable-system-stats", dest="system_stats_enabled",
             default=cfg.system_stats_enabled, action="store_true",
@@ -316,6 +322,8 @@ def __init__(self, cfg):
             requested_clients.append(carbon_client)
         if cfg.influxdb_enabled:
             requested_clients.append(influxdb.InfluxDBClient)
+        if cfg.prometheus_enabled:
+            requested_clients.append(prometheus.PrometheusClient)
 
         self.clients = []
         for client in requested_clients:
diff --git a/bucky/prometheus.py b/bucky/prometheus.py
new file mode 100644
index 0000000..f25552e
--- /dev/null
+++ b/bucky/prometheus.py
@@ -0,0 +1,88 @@
+
+import six
+import time
+import logging
+import threading
+
+try:
+    import http.server as _http
+except ImportError:
+    import BaseHTTPServer as _http
+
+import bucky.client as client
+
+
+if six.PY3:
+    xrange = range
+    long = int
+
+
+log = logging.getLogger(__name__)
+
+
+class PrometheusClient(client.Client):
+    def __init__(self, cfg, pipe):
+        super(PrometheusClient, self).__init__(pipe)
+        self.port = cfg.prometheus_port
+        self.timeout = cfg.prometheus_timeout
+        self.path = cfg.prometheus_path
+        self.flush_timestamp = time.time()
+        self.buffer = {}
+
+    def run(self):
+        def do_GET(req):
+            if req.path.strip('/') != self.path:
+                req.send_response(404)
+                req.send_header("Content-type", "text/plain")
+                req.end_headers()
+            else:
+                req.send_response(200)
+                req.send_header("Content-type", "text/plain")
+                req.end_headers()
+                for k in self.buffer.keys():
+                    req.wfile.write(self.get_or_render_line(k).encode())
+
+        handler = type('PrometheusHandler', (_http.BaseHTTPRequestHandler,), {'do_GET': do_GET})
+        server = _http.HTTPServer(('0.0.0.0', self.port), handler)
+        threading.Thread(target=lambda: server.serve_forever()).start()
+        super(PrometheusClient, self).run()
+
+    def close(self):
+        pass
+
+    def get_or_render_line(self, k):
+        timestamp, value, line = self.buffer[k]
+        if not line:
+            # https://prometheus.io/docs/instrumenting/exposition_formats/
+            name, metadata = k[0], k[1:]
+            metadata_str = ','.join(str(k) + '="' + str(v) + '"' for k, v in metadata)
+            line = name + '{' + metadata_str + '} ' + str(value) + ' ' + str(long(timestamp) * 1000) + '\r\n'
+            self.buffer[k] = timestamp, value, line
+        return line
+
+    def tick(self):
+        now = time.time()
+        if (now - self.flush_timestamp) > 10:
+            keys_to_remove = []
+            for k in self.buffer.keys():
+                timestamp, value, line = self.buffer[k]
+                if (now - timestamp) > self.timeout:
+                    keys_to_remove.append[k]
+            for k in keys_to_remove:
+                del self.buffer[k]
+            self.flush_timestamp = now
+
+    def _send(self, name, value, mtime, value_name, metadata=None):
+        metadata_dict = dict(value=value_name)
+        if metadata:
+            metadata_dict.update(metadata)
+        metadata_tuple = (name,) + tuple((k, metadata_dict[k]) for k in sorted(metadata_dict.keys()))
+        self.buffer[metadata_tuple] = mtime, value, None
+        self.tick()
+
+    def send(self, host, name, value, mtime, metadata=None):
+        self._send(name, value, mtime, 'value', metadata)
+
+    def send_bulk(self, host, name, value, mtime, metadata=None):
+        for k in value.keys():
+            self._send(name, value[k], mtime, k, metadata)
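To make the buffer format concrete: _send keys the buffer on (name, sorted metadata pairs) and get_or_render_line lazily renders one exposition line per key. A worked example with illustrative values, following the rendering code above (note the \r\n ending is the bug the next patch fixes):

    k = ('system_cpu', ('instance', '0'), ('value', 'user'))
    timestamp, value = 1498809774, 8123

    name, metadata = k[0], k[1:]
    metadata_str = ','.join(str(mk) + '="' + str(mv) + '"' for mk, mv in metadata)
    line = name + '{' + metadata_str + '} ' + str(value) + ' ' + str(int(timestamp) * 1000)

    # system_cpu{instance="0",value="user"} 8123 1498809774000
    print(line)
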
From 45c5893d975a581207272d015d9e30200d07c131 Mon Sep 17 00:00:00 2001
From: jarek
Date: Fri, 30 Jun 2017 10:48:04 +0100
Subject: [PATCH 37/40] Prometheus fixes

---
 bucky/prometheus.py | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/bucky/prometheus.py b/bucky/prometheus.py
index f25552e..dd8ae1b 100644
--- a/bucky/prometheus.py
+++ b/bucky/prometheus.py
@@ -37,10 +37,10 @@ def do_GET(req):
                 req.end_headers()
             else:
                 req.send_response(200)
-                req.send_header("Content-type", "text/plain")
+                req.send_header("Content-Type", "text/plain; version=0.0.4")
                 req.end_headers()
-                for k in self.buffer.keys():
-                    req.wfile.write(self.get_or_render_line(k).encode())
+                response = ''.join(self.get_or_render_line(k) for k in self.buffer.keys())
+                req.wfile.write(response.encode())
 
         handler = type('PrometheusHandler', (_http.BaseHTTPRequestHandler,), {'do_GET': do_GET})
         server = _http.HTTPServer(('0.0.0.0', self.port), handler)
@@ -56,7 +56,9 @@ def get_or_render_line(self, k):
             # https://prometheus.io/docs/instrumenting/exposition_formats/
             name, metadata = k[0], k[1:]
             metadata_str = ','.join(str(k) + '="' + str(v) + '"' for k, v in metadata)
-            line = name + '{' + metadata_str + '} ' + str(value) + ' ' + str(long(timestamp) * 1000) + '\r\n'
+            # Lines MUST end with \n (not \r\n), the last line MUST also end with \n
+            # Otherwise, Prometheus will reject the whole scrape!
+            line = name + '{' + metadata_str + '} ' + str(value) + ' ' + str(long(timestamp) * 1000) + '\n'
             self.buffer[k] = timestamp, value, line
         return line
 
@@ -67,7 +69,7 @@ def tick(self):
             for k in self.buffer.keys():
                 timestamp, value, line = self.buffer[k]
                 if (now - timestamp) > self.timeout:
-                    keys_to_remove.append[k]
+                    keys_to_remove.append(k)
             for k in keys_to_remove:
                 del self.buffer[k]
             self.flush_timestamp = now
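With the Content-Type and line-ending fixes in place, the endpoint can be spot-checked by hand; the port and path below are the cfg defaults from PATCH 36, and the output lines are illustrative only:

    $ curl -s http://localhost:9090/metrics
    system_cpu{instance="0",value="user"} 8123 1498813684000
    system_load{value="1m"} 0.42 1498813684000
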
From 42d8080e210d76e9ae81e9ea31ecc0edf806e856 Mon Sep 17 00:00:00 2001
From: jarek
Date: Fri, 30 Jun 2017 11:00:03 +0100
Subject: [PATCH 38/40] More metric renaming, mostly to avoid clash with
 Prometheus use of "instance"

---
 bucky/collector.py   | 6 +++---
 bucky/dockerstats.py | 4 ++--
 bucky/systemstats.py | 8 ++++----
 3 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/bucky/collector.py b/bucky/collector.py
index bd859ae..198926e 100644
--- a/bucky/collector.py
+++ b/bucky/collector.py
@@ -34,7 +34,7 @@ def run(self):
     def collect(self):
         raise NotImplementedError()
 
-    def add_stat(self, name, value, timestamp, **metadata):
+    def add_stat(self, metric_name, metric_value, timestamp, **metadata):
         if metadata:
             if self.metadata:
                 metadata.update(self.metadata)
@@ -42,9 +42,9 @@
             metadata = self.metadata
         if metadata:
             metadata_tuple = tuple((k, metadata[k]) for k in sorted(metadata.keys()))
-            self.queue.put((None, name, value, timestamp, metadata_tuple))
+            self.queue.put((None, metric_name, metric_value, timestamp, metadata_tuple))
         else:
-            self.queue.put((None, name, value, timestamp))
+            self.queue.put((None, metric_name, metric_value, timestamp))
 
     def merge_dicts(self, *dicts):
         ret = {}
diff --git a/bucky/dockerstats.py b/bucky/dockerstats.py
index 0d3685c..96526c7 100644
--- a/bucky/dockerstats.py
+++ b/bucky/dockerstats.py
@@ -35,7 +35,7 @@ def read_df_stats(self, now, labels, total_size, rw_size):
 
     def read_cpu_stats(self, now, labels, stats):
         for k, v in enumerate(stats[u'percpu_usage']):
-            self.add_stat("docker_cpu", {'usage': long(v)}, now, instance=k, **labels)
+            self.add_stat("docker_cpu", {'usage': long(v)}, now, name=k, **labels)
 
     def read_interface_stats(self, now, labels, stats):
         for k in stats.keys():
@@ -45,7 +45,7 @@
                 u'tx_bytes', u'tx_packets', u'tx_errors', u'tx_dropped'
             )
             docker_interface_stats = {k: long(v[k]) for k in keys}
-            self.add_stat("docker_interface", docker_interface_stats, now, instance=k, **labels)
+            self.add_stat("docker_interface", docker_interface_stats, now, name=k, **labels)
 
     def read_memory_stats(self, now, labels, stats):
         self.add_stat("docker_memory", {'used_bytes': long(stats[u'usage'])}, now, **labels)
diff --git a/bucky/systemstats.py b/bucky/systemstats.py
index a48c167..97bdb2a 100644
--- a/bucky/systemstats.py
+++ b/bucky/systemstats.py
@@ -72,7 +72,7 @@ def read_cpu_stats(self):
                 if not cpu_suffix:
                     continue
                 cpu_stats = {k: long(v) for k, v in zip(self.CPU_FIELDS, tokens[1:])}
-                self.add_stat("system_cpu", cpu_stats, now, instance=cpu_suffix)
+                self.add_stat("system_cpu", cpu_stats, now, name=cpu_suffix)
         if processes_stats:
             self.add_stat("system_processes", processes_stats, now)
 
@@ -101,7 +101,7 @@ def read_filesystem_stats(self):
                     'free_inodes': long(stats.f_favail),
                     'total_inodes': total_inodes
                 }
-                self.add_stat("system_filesystem", df_stats, now, target=mount_target, instance=mount_path, type=mount_filesystem)
+                self.add_stat("system_filesystem", df_stats, now, device=mount_target, name=mount_path, type=mount_filesystem)
             except OSError:
                 pass
 
@@ -127,7 +127,7 @@ def read_interface_stats(self):
                 'tx_errors': long(tokens[11]),
                 'tx_dropped': long(tokens[12])
             }
-            self.add_stat("system_interface", interface_stats, now, instance=interface_name)
+            self.add_stat("system_interface", interface_stats, now, name=interface_name)
 
     def read_load_stats(self):
         now = int(time.time())
@@ -191,4 +191,4 @@ def read_disk_stats(self):
                 'io_time': long(tokens[12]),
                 'weighted_time': long(tokens[13])
             }
-            self.add_stat("system_disk", disk_stats, now, instance=disk_name)
+            self.add_stat("system_disk", disk_stats, now, name=disk_name)
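Renaming the positional parameters of add_stat is not cosmetic: once collectors pass name=... as label metadata, a positional parameter also called name would make the call ambiguous. A minimal sketch of the failure mode this avoids (illustrative only):

    def add_stat(name, value, timestamp, **metadata):
        pass

    # TypeError: add_stat() got multiple values for argument 'name'
    add_stat("system_cpu", {"user": 8123}, 1498813684, name="0")

Prometheus also attaches its own "instance" label (the scrape target) to every series, which is the other half of the motivation for moving collectors off that label name.
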
From 1a693d36649f7ac019b2ad383282cfaa602241f6 Mon Sep 17 00:00:00 2001
From: jarek
Date: Fri, 30 Jun 2017 11:24:26 +0100
Subject: [PATCH 39/40] Fix for Py2

---
 bucky/prometheus.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/bucky/prometheus.py b/bucky/prometheus.py
index dd8ae1b..934334a 100644
--- a/bucky/prometheus.py
+++ b/bucky/prometheus.py
@@ -42,7 +42,7 @@ def do_GET(req):
             response = ''.join(self.get_or_render_line(k) for k in self.buffer.keys())
             req.wfile.write(response.encode())
 
-        handler = type('PrometheusHandler', (_http.BaseHTTPRequestHandler,), {'do_GET': do_GET})
+        handler = type('PrometheusHandler', (_http.BaseHTTPRequestHandler, object), {'do_GET': do_GET})
         server = _http.HTTPServer(('0.0.0.0', self.port), handler)
         threading.Thread(target=lambda: server.serve_forever()).start()
         super(PrometheusClient, self).run()

From 2e5972e199cd85f1b03ef5685a531fbba47a76a1 Mon Sep 17 00:00:00 2001
From: jarek
Date: Sat, 1 Jul 2017 17:46:45 +0100
Subject: [PATCH 40/40] Fix tests

---
 bucky/statsd.py          | 4 ++--
 tests/test_001_statsd.py | 6 +++---
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/bucky/statsd.py b/bucky/statsd.py
index 64fbf35..31ce6cd 100644
--- a/bucky/statsd.py
+++ b/bucky/statsd.py
@@ -154,7 +154,7 @@ def load_gauges(self):
             log.exception("StatsD: IOError")
         else:
             for gauge_name, gauge_metadata, gauge_value in gauges:
-                k = (gauge_name, tuple(gauge_metadata))
+                k = (gauge_name, tuple(gauge_metadata) if gauge_metadata else None)
                 self.gauges[k] = gauge_value
                 self.keys_seen.add(k)
 
@@ -226,7 +226,7 @@ def enqueue_with_metadata_names(self, bucket, name, value, stime, metadata=None)
         # No hostnames on statsd
         metadata = self.coalesce_metadata(metadata)
         if metadata:
-            if name and not 'name' in metadata:
+            if name and not ('name' in metadata):
                 metadata['name'] = name
             metadata_tuple = tuple((k, metadata[k]) for k in sorted(metadata.keys()))
             self.queue.put((None, bucket, value, stime, metadata_tuple))
diff --git a/tests/test_001_statsd.py b/tests/test_001_statsd.py
index ce49984..71ea170 100644
--- a/tests/test_001_statsd.py
+++ b/tests/test_001_statsd.py
@@ -235,15 +235,15 @@ def test_simple_persistent_gauges(q, s):
         os.unlink(os.path.join(t.cfg.directory, t.cfg.statsd_gauges_savefile))
     try:
         s.handle_line("gorm:5|g")
-        assert s.gauges[("gorm", ())] == 5
+        assert s.gauges[("gorm", None)] == 5
         s.save_gauges()
 
         s.handle_line("gorm:1|g")
-        assert s.gauges[("gorm", ())] == 1
+        assert s.gauges[("gorm", None)] == 1
 
         s.load_gauges()
-        assert s.gauges[("gorm", ())] == 5
+        assert s.gauges[("gorm", None)] == 5
     finally:
         if os.path.isfile(os.path.join(t.cfg.directory, t.cfg.statsd_gauges_savefile)):
             os.unlink(os.path.join(t.cfg.directory, t.cfg.statsd_gauges_savefile))
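The (name, None) keys follow from PATCH 34: handle_metadata now returns None for lines without a "#" section, so plain gauges are keyed by (name, None) while decorated ones keep their sorted metadata tuple. A quick sketch of both key shapes, assuming the test fixtures' empty global metadata (values illustrative):

    s.handle_line("gorm:5|g")             # no "#..." section -> ("gorm", None)
    assert s.gauges[("gorm", None)] == 5

    s.handle_line("gorm:7|g#host=web1")   # per-line metadata -> sorted tuple key
    assert s.gauges[("gorm", (("host", "web1"),))] == 7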