diff --git a/README.rst b/README.rst index aed34dc..12f5296 100644 --- a/README.rst +++ b/README.rst @@ -334,10 +334,13 @@ like so:: enabled = true bind-address = ":8089" database = "mydatabase" + precision = "1s" Bucky will periodically resolve all hostnames in the `influxdb_hosts` list and fan out metrics to all resolved endpoints. Thus providing replication as well as hot swapping. +Note the precision being 1sec, it is currently the maximum precision +bucky uses when sending data to InfluxDB, hence the setting. A note on CollectD converters diff --git a/bucky/carbon.py b/bucky/carbon.py index 57d7e74..6e1ee19 100644 --- a/bucky/carbon.py +++ b/bucky/carbon.py @@ -53,7 +53,7 @@ def __init__(self, cfg, pipe): self.backoff_max = cfg.graphite_backoff_max if self.max_reconnects <= 0: self.max_reconnects = sys.maxint - self.connect() + self.connected = False def connect(self): if self.debug: @@ -61,10 +61,11 @@ def connect(self): self.sock = DebugSocket() return reconnect_delay = self.reconnect_delay + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) for i in xrange(self.max_reconnects): - self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: self.sock.connect((self.ip, self.port)) + self.connected = True log.info("Connected to Carbon at %s:%s", self.ip, self.port) return except socket.error as e: @@ -88,9 +89,12 @@ def close(self): self.sock.close() except: pass + self.connected = False - def send(self, host, name, value, mtime, metadata=None): - raise NotImplementedError() + def send_message(self, mesg): + if not self.connected: + self.connect() + self.sock.sendall(mesg) class PlaintextClient(CarbonClient): @@ -99,7 +103,7 @@ def send(self, host, name, value, mtime, metadata=None): mesg = "%s %s %s\n" % (stat, value, mtime) for i in xrange(self.max_reconnects): try: - self.sock.sendall(mesg) + self.send_message(mesg) return except socket.error as err: log.error("Failed to send data to Carbon server: %s", err) @@ -128,7 +132,7 @@ def transmit(self): self.buffer = [] for i in xrange(self.max_reconnects): try: - self.sock.sendall(header + payload) + self.send_message(header + payload) return except socket.error as err: log.error("Failed to send data to Carbon server: %s", err) diff --git a/bucky/cfg.py b/bucky/cfg.py index 35da6a6..765465c 100644 --- a/bucky/cfg.py +++ b/bucky/cfg.py @@ -7,6 +7,8 @@ directory = "/var/lib/bucky" process_join_timeout = 2 +metadata = [] + sentry_enabled = False sentry_dsn = None sentry_log_level = "WARNING" @@ -34,7 +36,6 @@ statsd_port = 8125 statsd_enabled = True statsd_flush_time = 10.0 -statsd_metadata = {} statsd_legacy_namespace = True statsd_global_prefix = "stats" statsd_prefix_counter = "counters" @@ -48,11 +49,15 @@ statsd_delete_counters = True statsd_delete_timers = True statsd_delete_sets = True -# statsd_delete_gauges = False -# `statsd_delete_gauges = True` would make gauges in practice useless, -# except if you get an absolute(!) value every flush-interval which would makes this setting irrelevant -statsd_onlychanged_gauges = True +# `statsd_delete_gauges = True` would make gauges in practice useless, except if you get an absolute(!) +# value every flush-interval which would makes this setting irrelevant, so this option doesn't exist. # send gauge value to graphite only if there was a change +statsd_onlychanged_gauges = True +# Disable this only if you want "bad line" be reported for lines with DataDog extensions +statsd_ignore_datadog_extensions = True +statsd_ignore_internal_stats = False +# Use metadata name=NAME instead of the original/legacy naming scheme +statsd_metadata_namespace = False statsd_percentile_thresholds = [90] # percentile thresholds for statsd timers @@ -81,6 +86,11 @@ "127.0.0.1:8089" ] +prometheus_enabled = False +prometheus_port = 9090 +prometheus_timeout = 60 +prometheus_path = 'metrics' + full_trace = False name_prefix = None @@ -91,7 +101,18 @@ name_strip_duplicates = True name_host_trim = [] -custom_clients = [] +system_stats_enabled = False +system_stats_interval = 10 +system_stats_filesystem_blacklist = ['tmpfs', 'aufs', 'rootfs', 'devtmpfs'] +system_stats_filesystem_whitelist = None +system_stats_interface_blacklist = None +system_stats_interface_whitelist = None +system_stats_disk_blacklist = ['loop0', 'loop1', 'loop2', 'loop3', 'loop4', 'loop5', 'loop6', 'loop7'] +system_stats_disk_whitelist = None + +docker_stats_enabled = False +docker_stats_interval = 10 +docker_stats_version = '1.22' processor = None processor_drop_on_error = False diff --git a/bucky/client.py b/bucky/client.py index d2fd858..fd47a4e 100644 --- a/bucky/client.py +++ b/bucky/client.py @@ -37,12 +37,29 @@ def run(self): setproctitle("bucky: %s" % self.__class__.__name__) while True: try: + if not self.pipe.poll(1): + self.tick() + continue sample = self.pipe.recv() except KeyboardInterrupt: continue if sample is None: break - self.send(*sample) + if type(sample[2]) is dict: + self.send_bulk(*sample) + else: + self.send(*sample) def send(self, host, name, value, time, metadata=None): raise NotImplementedError() + + def send_bulk(self, host, name, value, time, metadata=None): + for k in value.keys(): + if name.endswith('.'): + metric_name = name + k + else: + metric_name = name + '.' + k + self.send(host, metric_name, value[k], time, metadata) + + def tick(self): + pass diff --git a/bucky/collector.py b/bucky/collector.py new file mode 100644 index 0000000..198926e --- /dev/null +++ b/bucky/collector.py @@ -0,0 +1,54 @@ + + +import time +import multiprocessing + + +try: + from setproctitle import setproctitle +except ImportError: + def setproctitle(title): + pass + + +class StatsCollector(multiprocessing.Process): + def __init__(self, queue): + super(StatsCollector, self).__init__() + self.queue = queue + + def close(self): + pass + + def run(self): + setproctitle("bucky: %s" % self.__class__.__name__) + interval = self.interval + while True: + start_timestamp = time.time() + interval = self.interval if self.collect() else interval + interval + stop_timestamp = time.time() + interval = min(interval, 300) + interval = interval - (stop_timestamp - start_timestamp) + if interval > 0.1: + time.sleep(interval) + + def collect(self): + raise NotImplementedError() + + def add_stat(self, metric_name, metric_value, timestamp, **metadata): + if metadata: + if self.metadata: + metadata.update(self.metadata) + else: + metadata = self.metadata + if metadata: + metadata_tuple = tuple((k, metadata[k]) for k in sorted(metadata.keys())) + self.queue.put((None, metric_name, metric_value, timestamp, metadata_tuple)) + else: + self.queue.put((None, metric_name, metric_value, timestamp)) + + def merge_dicts(self, *dicts): + ret = {} + for d in dicts: + if d: + ret.update(d) + return ret diff --git a/bucky/dockerstats.py b/bucky/dockerstats.py new file mode 100644 index 0000000..96526c7 --- /dev/null +++ b/bucky/dockerstats.py @@ -0,0 +1,69 @@ + +import time +import docker +import logging +import bucky.collector as collector + +import six +import requests.exceptions + + +if six.PY3: + xrange = range + long = int + + +log = logging.getLogger(__name__) + + +class DockerStatsCollector(collector.StatsCollector): + def __init__(self, queue, cfg): + super(DockerStatsCollector, self).__init__(queue) + self.metadata = cfg.metadata if cfg.metadata else {} + self.interval = cfg.docker_stats_interval + if cfg.docker_stats_version: + self.docker_client = docker.client.from_env(version=cfg.docker_stats_version) + else: + self.docker_client = docker.client.from_env() + + def read_df_stats(self, now, labels, total_size, rw_size): + docker_df_stats = { + 'total_bytes': long(total_size), + 'used_bytes': long(rw_size) + } + self.add_stat("docker_filesystem", docker_df_stats, now, **labels) + + def read_cpu_stats(self, now, labels, stats): + for k, v in enumerate(stats[u'percpu_usage']): + self.add_stat("docker_cpu", {'usage': long(v)}, now, name=k, **labels) + + def read_interface_stats(self, now, labels, stats): + for k in stats.keys(): + v = stats[k] + keys = ( + u'rx_bytes', u'rx_packets', u'rx_errors', u'rx_dropped', + u'tx_bytes', u'tx_packets', u'tx_errors', u'tx_dropped' + ) + docker_interface_stats = {k: long(v[k]) for k in keys} + self.add_stat("docker_interface", docker_interface_stats, now, name=k, **labels) + + def read_memory_stats(self, now, labels, stats): + self.add_stat("docker_memory", {'used_bytes': long(stats[u'usage'])}, now, **labels) + + def collect(self): + now = int(time.time()) + try: + for i, container in enumerate(self.docker_client.api.containers(size=True)): + labels = container[u'Labels'] + if 'docker_id' not in labels: + labels['docker_id'] = container[u'Id'][:12] + stats = self.docker_client.api.stats(container[u'Id'], decode=True, stream=False) + self.read_df_stats(now, labels, long(container[u'SizeRootFs']), long(container.get(u'SizeRw', 0))) + self.read_cpu_stats(now, labels, stats[u'cpu_stats'][u'cpu_usage']) + self.read_memory_stats(now, labels, stats[u'memory_stats']) + self.read_interface_stats(now, labels, stats[u'networks']) + return True + except requests.exceptions.ConnectionError: + return False + except ValueError: + return False diff --git a/bucky/influxdb.py b/bucky/influxdb.py index c0e71f2..5a09498 100644 --- a/bucky/influxdb.py +++ b/bucky/influxdb.py @@ -69,12 +69,9 @@ def close(self): except: pass - def kv(self, k, v): - return str(k) + '=' + str(v) - - def flush(self): + def tick(self): now = time.time() - if len(self.buffer) > 30 or (now - self.flush_timestamp) > 3: + if len(self.buffer) > 10 or ((now - self.flush_timestamp) > 1 and len(self.buffer)): payload = '\n'.join(self.buffer).encode() self.resolve_hosts() for ip, port in self.resolved_hosts: @@ -82,21 +79,36 @@ def flush(self): self.buffer = [] self.flush_timestamp = now - def send(self, host, name, value, mtime, metadata=None): - buf = [name] - if host: - if metadata is None: - metadata = {'host': host} - else: - if 'host' not in metadata: - metadata['host'] = host - if metadata: - for k in metadata.keys(): - v = metadata[k] - # InfluxDB will drop insert with tags without values - if v is not None: - buf.append(self.kv(k, v)) + def _send(self, host, name, mtime, values, metadata=None): # https://docs.influxdata.com/influxdb/v1.2/write_protocols/line_protocol_tutorial/ - line = ' '.join((','.join(buf), self.kv('value', value), str(long(mtime) * 1000000000))) + label_buf = [name] + if not metadata and host: + metadata = ('host', host) + if metadata: + # InfluxDB docs recommend sorting tags + for k, v in metadata: + # InfluxDB will drop insert with empty tags + if v is None or v == '': + continue + v = str(v).replace(' ', '') + label_buf.append(str(k) + '=' + v) + value_buf = [] + for k in values.keys(): + v = values[k] + t = type(v) + if t is long or t is int: + value_buf.append(str(k) + '=' + str(v) + 'i') + elif t is float or t is bool: + value_buf.append(str(k) + '=' + str(v)) + elif t is str: + value_buf.append(str(k) + '="' + v + '"') + # So, the lower timestamp precisions don't seem to work with line protocol... + line = ' '.join((','.join(label_buf), ','.join(value_buf), str(long(mtime) * 1000000000))) self.buffer.append(line) - self.flush() + self.tick() + + def send(self, host, name, value, mtime, metadata=None): + self._send(host, name, mtime, {'value': value}, metadata) + + def send_bulk(self, host, name, value, mtime, metadata=None): + self._send(host, name.strip('.'), mtime, value, metadata) diff --git a/bucky/main.py b/bucky/main.py index 7fc9f6a..70bfab5 100644 --- a/bucky/main.py +++ b/bucky/main.py @@ -36,7 +36,10 @@ import bucky.collectd as collectd import bucky.metricsd as metricsd import bucky.statsd as statsd +import bucky.systemstats as systemstats +import bucky.dockerstats as dockerstats import bucky.influxdb as influxdb +import bucky.prometheus as prometheus import bucky.processor as processor from bucky.errors import BuckyError @@ -132,6 +135,21 @@ def options(): default=cfg.influxdb_enabled, action="store_true", help="Enable the InfluxDB line protocol client" ), + op.make_option( + "--enable-prometheus", dest="prometheus_enabled", + default=cfg.prometheus_enabled, action="store_true", + help="Enable the Prometheus exposition via HTTP" + ), + op.make_option( + "--enable-system-stats", dest="system_stats_enabled", + default=cfg.system_stats_enabled, action="store_true", + help="Enable collection of local system stats" + ), + op.make_option( + "--enable-docker-stats", dest="docker_stats_enabled", + default=cfg.docker_stats_enabled, action="store_true", + help="Enable collection of docker containers stats" + ), op.make_option( "--full-trace", dest="full_trace", default=cfg.full_trace, action="store_true", @@ -157,6 +175,7 @@ def options(): type="str", default=cfg.gid, help="Drop privileges to this group" ), + op.make_option("--metadata", action="append", dest="metadata") ] @@ -247,6 +266,21 @@ def main(): except: log.exception("Could not create directory: %s" % cfg.directory) + # This in place swap from list to dict is hideous :-| + metadata = {} + if cfg.metadata: + for i in cfg.metadata: + kv = i.split("=") + if len(kv) > 1: + metadata[kv[0]] = kv[1] + else: + kv = i.split(":") + if len(kv) > 1: + metadata[kv[0]] = kv[1] + else: + metadata[kv[0]] = None + cfg.metadata = metadata + bucky = Bucky(cfg) bucky.run() @@ -262,6 +296,10 @@ def __init__(self, cfg): stypes.append(collectd.getCollectDServer) if cfg.statsd_enabled: stypes.append(statsd.StatsDServer) + if cfg.system_stats_enabled: + stypes.append(systemstats.SystemStatsCollector) + if cfg.docker_stats_enabled: + stypes.append(dockerstats.DockerStatsCollector) self.servers = [] for stype in stypes: @@ -275,18 +313,20 @@ def __init__(self, cfg): self.proc = None self.psampleq = self.sampleq - default_clients = [] + requested_clients = [] if cfg.graphite_enabled: if cfg.graphite_pickle_enabled: carbon_client = carbon.PickleClient else: carbon_client = carbon.PlaintextClient - default_clients.append(carbon_client) + requested_clients.append(carbon_client) if cfg.influxdb_enabled: - default_clients.append(influxdb.InfluxDBClient) + requested_clients.append(influxdb.InfluxDBClient) + if cfg.prometheus_enabled: + requested_clients.append(prometheus.PrometheusClient) self.clients = [] - for client in cfg.custom_clients + default_clients: + for client in requested_clients: send, recv = multiprocessing.Pipe() instance = client(cfg, recv) self.clients.append((instance, send)) diff --git a/bucky/prometheus.py b/bucky/prometheus.py new file mode 100644 index 0000000..934334a --- /dev/null +++ b/bucky/prometheus.py @@ -0,0 +1,90 @@ + +import six +import time +import logging +import threading + +try: + import http.server as _http +except ImportError: + import BaseHTTPServer as _http + +import bucky.client as client + + +if six.PY3: + xrange = range + long = int + + +log = logging.getLogger(__name__) + + +class PrometheusClient(client.Client): + def __init__(self, cfg, pipe): + super(PrometheusClient, self).__init__(pipe) + self.port = cfg.prometheus_port + self.timeout = cfg.prometheus_timeout + self.path = cfg.prometheus_path + self.flush_timestamp = time.time() + self.buffer = {} + + def run(self): + def do_GET(req): + if req.path.strip('/') != self.path: + req.send_response(404) + req.send_header("Content-type", "text/plain") + req.end_headers() + else: + req.send_response(200) + req.send_header("Content-Type", "text/plain; version=0.0.4") + req.end_headers() + response = ''.join(self.get_or_render_line(k) for k in self.buffer.keys()) + req.wfile.write(response.encode()) + + handler = type('PrometheusHandler', (_http.BaseHTTPRequestHandler, object), {'do_GET': do_GET}) + server = _http.HTTPServer(('0.0.0.0', self.port), handler) + threading.Thread(target=lambda: server.serve_forever()).start() + super(PrometheusClient, self).run() + + def close(self): + pass + + def get_or_render_line(self, k): + timestamp, value, line = self.buffer[k] + if not line: + # https://prometheus.io/docs/instrumenting/exposition_formats/ + name, metadata = k[0], k[1:] + metadata_str = ','.join(str(k) + '="' + str(v) + '"' for k, v in metadata) + # Lines MUST end with \n (not \r\n), the last line MUST also end with \n + # Otherwise, Prometheus will reject the whole scrape! + line = name + '{' + metadata_str + '} ' + str(value) + ' ' + str(long(timestamp) * 1000) + '\n' + self.buffer[k] = timestamp, value, line + return line + + def tick(self): + now = time.time() + if (now - self.flush_timestamp) > 10: + keys_to_remove = [] + for k in self.buffer.keys(): + timestamp, value, line = self.buffer[k] + if (now - timestamp) > self.timeout: + keys_to_remove.append(k) + for k in keys_to_remove: + del self.buffer[k] + self.flush_timestamp = now + + def _send(self, name, value, mtime, value_name, metadata=None): + metadata_dict = dict(value=value_name) + if metadata: + metadata_dict.update(metadata) + metadata_tuple = (name,) + tuple((k, metadata_dict[k]) for k in sorted(metadata_dict.keys())) + self.buffer[metadata_tuple] = mtime, value, None + self.tick() + + def send(self, host, name, value, mtime, metadata=None): + self._send(name, value, mtime, 'value', metadata) + + def send_bulk(self, host, name, value, mtime, metadata=None): + for k in value.keys(): + self._send(name, value[k], mtime, k, metadata) diff --git a/bucky/statsd.py b/bucky/statsd.py index ed01535..31ce6cd 100644 --- a/bucky/statsd.py +++ b/bucky/statsd.py @@ -67,9 +67,9 @@ def make_name(parts): return name -class StatsDHandler(threading.Thread): +class StatsDServer(udpserver.UDPServer): def __init__(self, queue, cfg): - super(StatsDHandler, self).__init__() + super(StatsDServer, self).__init__(cfg.statsd_ip, cfg.statsd_port) self.daemon = True self.queue = queue self.cfg = cfg @@ -85,14 +85,23 @@ def __init__(self, queue, cfg): self.prefix_timer = cfg.statsd_prefix_timer self.prefix_gauge = cfg.statsd_prefix_gauge self.prefix_set = cfg.statsd_prefix_set - self.metadata = cfg.statsd_metadata + self.metadata_dict = cfg.metadata if cfg.metadata else {} + self.metadata_tuple = tuple((k, self.metadata_dict[k]) for k in sorted(self.metadata_dict.keys())) self.key_res = ( (re.compile("\s+"), "_"), (re.compile("\/"), "-"), (re.compile("[^a-zA-Z_\-0-9\.]"), "") ) - if self.legacy_namespace: + self.enqueue = self.enqueue_with_dotted_names + if cfg.statsd_metadata_namespace: + self.name_global = self.global_prefix + self.name_counter = self.global_prefix + self.prefix_counter + self.name_timer = self.global_prefix + self.prefix_timer + self.name_gauge = self.global_prefix + self.prefix_gauge + self.name_set = self.global_prefix + self.prefix_set + self.enqueue = self.enqueue_with_metadata_names + elif self.legacy_namespace: self.name_global = 'stats.' self.name_legacy_rate = 'stats.' self.name_legacy_count = 'stats_counts.' @@ -111,12 +120,14 @@ def __init__(self, queue, cfg): self.pct_thresholds = cfg.statsd_percentile_thresholds - self.keys_seen = {} + self.keys_seen = set() self.delete_idlestats = cfg.statsd_delete_idlestats self.delete_counters = self.delete_idlestats and cfg.statsd_delete_counters self.delete_timers = self.delete_idlestats and cfg.statsd_delete_timers self.delete_sets = self.delete_idlestats and cfg.statsd_delete_sets self.onlychanged_gauges = self.delete_idlestats and cfg.statsd_onlychanged_gauges + self.ignore_datadog_extensions = cfg.statsd_ignore_datadog_extensions + self.ignore_internal_stats = cfg.statsd_ignore_internal_stats self.enable_timer_mean = cfg.statsd_timer_mean self.enable_timer_upper = cfg.statsd_timer_upper @@ -128,6 +139,9 @@ def __init__(self, queue, cfg): self.enable_timer_median = cfg.statsd_timer_median self.enable_timer_std = cfg.statsd_timer_std + def pre_shutdown(self): + self.save_gauges() + def load_gauges(self): if not self.statsd_persistent_gauges: return @@ -139,71 +153,100 @@ def load_gauges(self): except IOError: log.exception("StatsD: IOError") else: - self.gauges.update({k: gauges[k][0] for k in gauges.keys()}) - self.keys_seen.update({k: gauges[k][1] for k in gauges.keys()}) + for gauge_name, gauge_metadata, gauge_value in gauges: + k = (gauge_name, tuple(gauge_metadata) if gauge_metadata else None) + self.gauges[k] = gauge_value + self.keys_seen.add(k) def save_gauges(self): if not self.statsd_persistent_gauges: return try: - gauges = {} + gauges = [] for k in self.gauges.keys(): - gauges[k] = (self.gauges[k], self.keys_seen.get(k, None)) + gauge_name, gauge_metadata = k + gauges.append((gauge_name, gauge_metadata, self.gauges[k])) write_json_file(self.gauges_filename, gauges) except IOError: log.exception("StatsD: IOError") def tick(self): - name_global_numstats = self.name_global + "numStats" stime = int(time.time()) with self.lock: if self.delete_timers: - rem_keys = set(self.timers.keys()) - set(self.keys_seen.keys()) + rem_keys = set(self.timers.keys()) - self.keys_seen for k in rem_keys: del self.timers[k] if self.delete_counters: - rem_keys = set(self.counters.keys()) - set(self.keys_seen.keys()) + rem_keys = set(self.counters.keys()) - self.keys_seen for k in rem_keys: del self.counters[k] if self.delete_sets: - rem_keys = set(self.sets.keys()) - set(self.keys_seen.keys()) + rem_keys = set(self.sets.keys()) - self.keys_seen for k in rem_keys: del self.sets[k] num_stats = self.enqueue_timers(stime) - kept_keys = set(self.timers.keys()) num_stats += self.enqueue_counters(stime) - kept_keys = kept_keys.union(set(self.counters.keys())) num_stats += self.enqueue_gauges(stime) - kept_keys = kept_keys.union(set(self.gauges.keys())) num_stats += self.enqueue_sets(stime) - kept_keys = kept_keys.union(set(self.sets.keys())) - self.enqueue(name_global_numstats, num_stats, stime) - self.keys_seen = {k: self.keys_seen[k] for k in kept_keys if k in self.keys_seen} + if not self.ignore_internal_stats: + self.enqueue(self.name_global, None, {"numStats": num_stats}, stime) + self.keys_seen = set() def run(self): - while True: - time.sleep(self.flush_time) - self.tick() + def flush_loop(): + while True: + time.sleep(self.flush_time) + self.tick() + self.load_gauges() + threading.Thread(target=flush_loop).start() + super(StatsDServer, self).run() + + def coalesce_metadata(self, metadata): + if not metadata: + return self.metadata_dict + if self.metadata_dict: + tmp = self.metadata_dict.copy() + tmp.update(metadata) + return tmp + return dict(metadata) - def enqueue(self, name, stat, stime, metadata_key=None): + def enqueue_with_dotted_names(self, bucket, name, value, stime, metadata=None): # No hostnames on statsd - if metadata_key: - metadata = self.keys_seen.get(metadata_key, None) + if name: + bucket += name + metadata = self.coalesce_metadata(metadata) + if metadata: + metadata_tuple = tuple((k, metadata[k]) for k in sorted(metadata.keys())) + self.queue.put((None, bucket, value, stime, metadata_tuple)) else: - metadata = self.metadata + self.queue.put((None, bucket, value, stime)) + + def enqueue_with_metadata_names(self, bucket, name, value, stime, metadata=None): + # No hostnames on statsd + metadata = self.coalesce_metadata(metadata) if metadata: - self.queue.put((None, name, stat, stime, metadata)) + if name and not ('name' in metadata): + metadata['name'] = name + metadata_tuple = tuple((k, metadata[k]) for k in sorted(metadata.keys())) + self.queue.put((None, bucket, value, stime, metadata_tuple)) else: - self.queue.put((None, name, stat, stime)) + if name: + self.queue.put((None, bucket, value, stime, (('name', name),))) + else: + self.queue.put((None, bucket, value, stime)) def enqueue_timers(self, stime): ret = 0 iteritems = self.timers.items() if six.PY3 else self.timers.iteritems() for k, v in iteritems: + timer_name, timer_metadata = k + timer_stats = {} + # Skip timers that haven't collected any values if not v: - self.enqueue("%s%s.count" % (self.name_timer, k), 0, stime, k) - self.enqueue("%s%s.count_ps" % (self.name_timer, k), 0.0, stime, k) + timer_stats['count'] = 0 + timer_stats['count_ps'] = 0.0 else: v.sort() count = len(v) @@ -225,58 +268,63 @@ def enqueue_timers(self, stime): vsum = cumulative_values[thresh_idx - 1] t = int(pct_thresh) + t_suffix = "_%s" % (t,) if self.enable_timer_mean: mean = vsum / float(thresh_idx) - self.enqueue("%s%s.mean_%s" % (self.name_timer, k, t), mean, stime, k) + timer_stats["mean" + t_suffix] = mean if self.enable_timer_upper: vthresh = v[thresh_idx - 1] - self.enqueue("%s%s.upper_%s" % (self.name_timer, k, t), vthresh, stime, k) + timer_stats["upper" + t_suffix] = vthresh if self.enable_timer_count: - self.enqueue("%s%s.count_%s" % (self.name_timer, k, t), thresh_idx, stime, k) + timer_stats["count" + t_suffix] = thresh_idx if self.enable_timer_sum: - self.enqueue("%s%s.sum_%s" % (self.name_timer, k, t), vsum, stime, k) + timer_stats["sum" + t_suffix] = vsum if self.enable_timer_sum_squares: vsum_squares = cumul_sum_squares_values[thresh_idx - 1] - self.enqueue("%s%s.sum_squares_%s" % (self.name_timer, k, t), vsum_squares, stime, k) + timer_stats["sum_squares" + t_suffix] = vsum_squares vsum = cumulative_values[count - 1] mean = vsum / float(count) if self.enable_timer_mean: - self.enqueue("%s%s.mean" % (self.name_timer, k), mean, stime, k) + timer_stats["mean"] = mean if self.enable_timer_upper: - self.enqueue("%s%s.upper" % (self.name_timer, k), vmax, stime, k) + timer_stats["upper"] = vmax if self.enable_timer_lower: - self.enqueue("%s%s.lower" % (self.name_timer, k), vmin, stime, k) + timer_stats["lower"] = vmin if self.enable_timer_count: - self.enqueue("%s%s.count" % (self.name_timer, k), count, stime, k) + timer_stats["count"] = count if self.enable_timer_count_ps: - self.enqueue("%s%s.count_ps" % (self.name_timer, k), float(count) / self.flush_time, stime, k) + timer_stats["count_ps"] = float(count) / self.flush_time if self.enable_timer_median: mid = int(count / 2) median = (v[mid - 1] + v[mid]) / 2.0 if count % 2 == 0 else v[mid] - self.enqueue("%s%s.median" % (self.name_timer, k), median, stime, k) + timer_stats["median"] = median if self.enable_timer_sum: - self.enqueue("%s%s.sum" % (self.name_timer, k), vsum, stime, k) + timer_stats["sum"] = vsum if self.enable_timer_sum_squares: vsum_squares = cumul_sum_squares_values[count - 1] - self.enqueue("%s%s.sum_squares" % (self.name_timer, k), vsum_squares, stime, k) + timer_stats["sum_squares"] = vsum_squares if self.enable_timer_std: sum_of_diffs = sum(((value - mean) ** 2 for value in v)) stddev = math.sqrt(sum_of_diffs / count) - self.enqueue("%s%s.std" % (self.name_timer, k), stddev, stime, k) + timer_stats["std"] = stddev + + if timer_stats: + self.enqueue(self.name_timer, timer_name, timer_stats, stime, timer_metadata) + self.timers[k] = [] ret += 1 @@ -286,7 +334,8 @@ def enqueue_sets(self, stime): ret = 0 iteritems = self.sets.items() if six.PY3 else self.sets.iteritems() for k, v in iteritems: - self.enqueue("%s%s.count" % (self.name_set, k), len(v), stime, k) + set_name, set_metadata = k + self.enqueue(self.name_set, set_name, {"count": len(v)}, stime, set_metadata) ret += 1 self.sets[k] = set() return ret @@ -295,9 +344,10 @@ def enqueue_gauges(self, stime): ret = 0 iteritems = self.gauges.items() if six.PY3 else self.gauges.iteritems() for k, v in iteritems: + gauge_name, gauge_metadata = k # only send a value if there was an update if `delete_idlestats` is `True` if not self.onlychanged_gauges or k in self.keys_seen: - self.enqueue("%s%s" % (self.name_gauge, k), v, stime, k) + self.enqueue(self.name_gauge, gauge_name, v, stime, gauge_metadata) ret += 1 return ret @@ -305,50 +355,58 @@ def enqueue_counters(self, stime): ret = 0 iteritems = self.counters.items() if six.PY3 else self.counters.iteritems() for k, v in iteritems: + counter_name, counter_metadata = k if self.legacy_namespace: - stat_rate = "%s%s" % (self.name_legacy_rate, k) - stat_count = "%s%s" % (self.name_legacy_count, k) + self.enqueue(self.name_legacy_rate, counter_name, v / self.flush_time, stime, counter_metadata) + self.enqueue(self.name_legacy_count, counter_name, v, stime, counter_metadata) else: - stat_rate = "%s%s.rate" % (self.name_counter, k) - stat_count = "%s%s.count" % (self.name_counter, k) - self.enqueue(stat_rate, v / self.flush_time, stime, k) - self.enqueue(stat_count, v, stime, k) + stats = { + 'rate': v / self.flush_time, + 'count': v + } + self.enqueue(self.name_counter, counter_name, stats, stime, counter_metadata) self.counters[k] = 0 ret += 1 return ret - def handle(self, data): + def handle(self, data, addr): # Adding a bit of extra sauce so clients can # send multiple samples in a single UDP # packet. + if six.PY3: + data = data.decode() for line in data.splitlines(): self.line = line if not line.strip(): continue self.handle_line(line) + return True - def handle_tags(self, line): + def handle_metadata(self, line): # http://docs.datadoghq.com/guides/dogstatsd/#datagram-format bits = line.split("#") if len(bits) < 2: return line, None - tags = {} + metadata = {} for i in bits[1].split(","): kv = i.split("=") if len(kv) > 1: - tags[kv[0]] = kv[1] + metadata[kv[0]] = kv[1] else: kv = i.split(":") if len(kv) > 1: - tags[kv[0]] = kv[1] + metadata[kv[0]] = kv[1] else: - tags[kv[0]] = None - return bits[0], tags + metadata[kv[0]] = None + return bits[0], tuple((k, metadata[k]) for k in sorted(metadata.keys())) def handle_line(self, line): - line, tags = self.handle_tags(line) + if self.ignore_datadog_extensions: + if line.startswith('sc|') or line.startswith('_e{'): + return + line, metadata = self.handle_metadata(line) bits = line.split(":") - key = self.handle_key(bits.pop(0), tags) + key = self.handle_key(bits.pop(0), metadata) if not bits: self.bad_line() @@ -372,16 +430,11 @@ def handle_line(self, line): else: self.handle_counter(key, fields) - def handle_key(self, key, tags=None): - if tags is None: - coalesced_tags = self.metadata - else: - coalesced_tags = tags - if self.metadata: - coalesced_tags.update(self.metadata) + def handle_key(self, key, metadata): for (rexp, repl) in self.key_res: key = rexp.sub(repl, key) - self.keys_seen[key] = coalesced_tags + key = (key, metadata) + self.keys_seen.add(key) return key def handle_timer(self, key, fields): @@ -432,30 +485,3 @@ def handle_counter(self, key, fields): def bad_line(self): log.error("StatsD: Invalid line: '%s'", self.line.strip()) - - -class StatsDServer(udpserver.UDPServer): - def __init__(self, queue, cfg): - super(StatsDServer, self).__init__(cfg.statsd_ip, cfg.statsd_port) - self.handler = StatsDHandler(queue, cfg) - - def pre_shutdown(self): - self.handler.save_gauges() - - def run(self): - self.handler.load_gauges() - self.handler.start() - super(StatsDServer, self).run() - - if six.PY3: - def handle(self, data, addr): - self.handler.handle(data.decode()) - if not self.handler.is_alive(): - return False - return True - else: - def handle(self, data, addr): - self.handler.handle(data) - if not self.handler.is_alive(): - return False - return True diff --git a/bucky/systemstats.py b/bucky/systemstats.py new file mode 100644 index 0000000..97bdb2a --- /dev/null +++ b/bucky/systemstats.py @@ -0,0 +1,194 @@ + +import os +import time +import logging +import bucky.collector as collector + +import six + + +if six.PY3: + xrange = range + long = int + + +log = logging.getLogger(__name__) + + +class SystemStatsCollector(collector.StatsCollector): + # The order of cpu fields in /proc/stat + CPU_FIELDS = ('user', 'nice', 'system', 'idle', 'wait', 'interrupt', 'softirq', 'steal') + + def __init__(self, queue, cfg): + super(SystemStatsCollector, self).__init__(queue) + self.metadata = cfg.metadata if cfg.metadata else {} + self.interval = cfg.system_stats_interval + self.filesystem_blacklist, self.filesystem_whitelist = self.get_lists(cfg.system_stats_filesystem_blacklist, + cfg.system_stats_filesystem_whitelist) + self.interface_blacklist, self.interface_whitelist = self.get_lists(cfg.system_stats_interface_blacklist, + cfg.system_stats_interface_whitelist) + self.disk_blacklist, self.disk_whitelist = self.get_lists(cfg.system_stats_disk_blacklist, + cfg.system_stats_disk_whitelist) + + def get_lists(self, cfg_blacklist, cfg_whitelist): + blacklist = set(cfg_blacklist) if cfg_blacklist else None + whitelist = set(cfg_whitelist) if cfg_whitelist else None + return blacklist, whitelist + + def check_lists(self, val, blacklist, whitelist): + if whitelist: + return val in whitelist + if blacklist: + return val not in blacklist + return True + + def collect(self): + self.read_cpu_stats() + self.read_load_stats() + self.read_filesystem_stats() + self.read_memory_stats() + self.read_interface_stats() + self.read_disk_stats() + return True + + def read_cpu_stats(self): + now = int(time.time()) + with open('/proc/stat') as f: + processes_stats = {} + for l in f.readlines(): + tokens = l.strip().split() + if not tokens: + continue + name = tokens[0] + if not name.startswith('cpu'): + if name == 'ctxt': + processes_stats['switches'] = long(tokens[1]) + elif name == 'processes': + processes_stats['forks'] = long(tokens[1]) + elif name == 'procs_running': + processes_stats['running'] = long(tokens[1]) + else: + cpu_suffix = name[3:] + if not cpu_suffix: + continue + cpu_stats = {k: long(v) for k, v in zip(self.CPU_FIELDS, tokens[1:])} + self.add_stat("system_cpu", cpu_stats, now, name=cpu_suffix) + if processes_stats: + self.add_stat("system_processes", processes_stats, now) + + def read_filesystem_stats(self): + now = int(time.time()) + with open('/proc/mounts') as f: + for l in f.readlines(): + tokens = l.strip().split() + if not tokens or len(tokens) != 6: + continue + if not tokens[1].startswith('/'): + continue + mount_target, mount_path, mount_filesystem = tokens[:3] + if not self.check_lists(mount_filesystem, self.filesystem_blacklist, self.filesystem_whitelist): + continue + try: + stats = os.statvfs(mount_path) + total_inodes = long(stats.f_files) + # Skip special filesystems + if not total_inodes: + continue + block_size = stats.f_bsize + df_stats = { + 'free_bytes': long(stats.f_bavail) * block_size, + 'total_bytes': long(stats.f_blocks) * block_size, + 'free_inodes': long(stats.f_favail), + 'total_inodes': total_inodes + } + self.add_stat("system_filesystem", df_stats, now, device=mount_target, name=mount_path, type=mount_filesystem) + except OSError: + pass + + def read_interface_stats(self): + now = int(time.time()) + with open('/proc/net/dev') as f: + for l in f.readlines(): + tokens = l.strip().split() + if not tokens or len(tokens) != 17: + continue + if not tokens[0].endswith(':'): + continue + interface_name = tokens[0][:-1] + if not self.check_lists(interface_name, self.interface_blacklist, self.interface_whitelist): + continue + interface_stats = { + 'rx_bytes': long(tokens[1]), + 'rx_packets': long(tokens[2]), + 'rx_errors': long(tokens[3]), + 'rx_dropped': long(tokens[4]), + 'tx_bytes': long(tokens[9]), + 'tx_packets': long(tokens[10]), + 'tx_errors': long(tokens[11]), + 'tx_dropped': long(tokens[12]) + } + self.add_stat("system_interface", interface_stats, now, name=interface_name) + + def read_load_stats(self): + now = int(time.time()) + with open('/proc/loadavg') as f: + for l in f.readlines(): + tokens = l.strip().split() + if not tokens or len(tokens) != 5: + continue + load_stats = { + 'last_1m': float(tokens[0]), + 'last_5m': float(tokens[1]), + 'last_15m': float(tokens[2]) + } + self.add_stat("system_load", load_stats, now) + + def read_memory_stats(self): + now = int(time.time()) + with open('/proc/meminfo') as f: + memory_stats = {} + for l in f.readlines(): + tokens = l.strip().split() + if not tokens or len(tokens) != 3 or tokens[2].lower() != 'kb': + continue + name = tokens[0] + if not name.endswith(":"): + continue + name = name[:-1].lower() + if name == "memtotal": + memory_stats['total_bytes'] = long(tokens[1]) * 1024 + elif name == "memfree": + memory_stats['free_bytes'] = long(tokens[1]) * 1024 + elif name == "memavailable": + memory_stats['available_bytes'] = long(tokens[1]) * 1024 + if memory_stats: + self.add_stat("system_memory", memory_stats, now) + + def read_disk_stats(self): + now = int(time.time()) + with open('/proc/diskstats') as f: + for l in f.readlines(): + tokens = l.strip().split() + if not tokens or len(tokens) != 14: + continue + disk_name = tokens[2] + if not self.check_lists(disk_name, self.disk_blacklist, self.disk_whitelist): + continue + disk_stats = { + 'read_ops': long(tokens[3]), + 'read_merged': long(tokens[4]), + 'read_sectors': long(tokens[5]), + 'read_bytes': long(tokens[5]) * 512, + 'read_time': long(tokens[6]), + + 'write_ops': long(tokens[7]), + 'write_merged': long(tokens[8]), + 'write_sectors': long(tokens[9]), + 'write_bytes': long(tokens[9]) * 512, + 'write_time': long(tokens[10]), + + 'in_progress': long(tokens[11]), + 'io_time': long(tokens[12]), + 'weighted_time': long(tokens[13]) + } + self.add_stat("system_disk", disk_stats, now, name=disk_name) diff --git a/contrib/statsd_perfomance_test.py b/contrib/statsd_perfomance_test.py index 0cd9661..cdb4274 100644 --- a/contrib/statsd_perfomance_test.py +++ b/contrib/statsd_perfomance_test.py @@ -20,14 +20,14 @@ queue = multiprocessing.Queue() -handler = bucky.statsd.StatsDHandler(queue, bucky.cfg) +handler = bucky.statsd.StatsDServer(queue, bucky.cfg) def fill_and_compute_timers(handler): # Fill timers for x in l100: # timer name for y in l1000: # timer value, using random value is not good idea there - handler.handle_timer("timer-%s" % (x), [y]) + handler.handle_timer(("timer-%s" % (x,), ()), [y]) # Compute metrics stime = int(time.time()) diff --git a/requirements.txt b/requirements.txt index 743e166..e2db503 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,3 +2,4 @@ six setproctitle pycrypto watchdog +docker \ No newline at end of file diff --git a/tests/test_001_statsd.py b/tests/test_001_statsd.py index f932a31..71ea170 100644 --- a/tests/test_001_statsd.py +++ b/tests/test_001_statsd.py @@ -36,7 +36,7 @@ def test_simple_counter(q, s): s.send("gorm:1|c") t.same_stat(None, "stats.gorm", 2, q.get(timeout=TIMEOUT)) t.same_stat(None, "stats_counts.gorm", 1, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.numStats", 1, q.get(timeout=TIMEOUT)) + t.same_stat(None, "stats.", {"numStats": 1}, q.get(timeout=TIMEOUT)) @t.set_cfg("statsd_flush_time", 0.5) @@ -47,7 +47,7 @@ def test_multiple_messages(q, s): s.send("gorm:1|c") t.same_stat(None, "stats.gorm", 4, q.get(timeout=TIMEOUT)) t.same_stat(None, "stats_counts.gorm", 2, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.numStats", 1, q.get(timeout=TIMEOUT)) + t.same_stat(None, "stats.", {"numStats": 1}, q.get(timeout=TIMEOUT)) @t.set_cfg("statsd_flush_time", 0.5) @@ -57,7 +57,7 @@ def test_larger_count(q, s): s.send("gorm:5|c") t.same_stat(None, "stats.gorm", 10, q.get(timeout=TIMEOUT)) t.same_stat(None, "stats_counts.gorm", 5, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.numStats", 1, q.get(timeout=TIMEOUT)) + t.same_stat(None, "stats.", {'numStats': 1}, q.get(timeout=TIMEOUT)) @t.set_cfg("statsd_flush_time", 0.5) @@ -78,7 +78,7 @@ def test_multiple_counters(q, s): t.eq(stats[stat[1]], stat[2]) t.gt(stat[2], 0) stats.pop(stat[1]) - t.same_stat(None, "stats.numStats", 2, q.get(timeout=TIMEOUT)) + t.same_stat(None, "stats.", {'numStats': 2}, q.get(timeout=TIMEOUT)) @t.set_cfg("statsd_flush_time", 0.5) @@ -88,21 +88,24 @@ def test_simple_timer(q, s): for i in range(9): s.send("gorm:1|ms") s.send("gorm:2|ms") # Out of the 90% threshold - t.same_stat(None, "stats.timers.gorm.mean_90", 1, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.timers.gorm.upper_90", 1, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.timers.gorm.count_90", 9, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.timers.gorm.sum_90", 9, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.timers.gorm.sum_squares_90", 9, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.timers.gorm.mean", 1.1, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.timers.gorm.upper", 2, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.timers.gorm.lower", 1, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.timers.gorm.count", 10, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.timers.gorm.count_ps", 20.0, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.timers.gorm.median", 1, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.timers.gorm.sum", 11, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.timers.gorm.sum_squares", 13, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.timers.gorm.std", 0.3, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.numStats", 1, q.get(timeout=TIMEOUT)) + expected_value = { + "mean_90": 1, + "upper_90": 1, + "count_90": 9, + "sum_90": 9, + "sum_squares_90": 9, + "mean": 1.1, + "upper": 2, + "lower": 1, + "count": 10, + "count_ps": 20.0, + "median": 1, + "sum": 11, + "sum_squares": 13, + "std": 0.3 + } + t.same_stat(None, "stats.timers.gorm", expected_value, q.get(timeout=TIMEOUT)) + t.same_stat(None, "stats.", {"numStats": 1}, q.get(timeout=TIMEOUT)) @t.set_cfg("statsd_flush_time", 0.5) @@ -113,21 +116,24 @@ def test_timer_unsorted(q, s): s.send("gorm:5|ms") s.send("gorm:7|ms") # Out of the 90% threshold s.send("gorm:3|ms") - t.same_stat(None, "stats.timers.gorm.mean_90", 10 / 3.0, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.timers.gorm.upper_90", 5, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.timers.gorm.count_90", 3, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.timers.gorm.sum_90", 10, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.timers.gorm.sum_squares_90", 38, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.timers.gorm.mean", 17 / 4.0, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.timers.gorm.upper", 7, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.timers.gorm.lower", 2, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.timers.gorm.count", 4, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.timers.gorm.count_ps", 8, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.timers.gorm.median", 4, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.timers.gorm.sum", 17, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.timers.gorm.sum_squares", 87, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.timers.gorm.std", 1.920286436967152, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.numStats", 1, q.get(timeout=TIMEOUT)) + expected_value = { + "mean_90": 10 / 3.0, + "upper_90": 5, + "count_90": 3, + "sum_90": 10, + "sum_squares_90": 38, + "mean": 17 / 4.0, + "upper": 7, + "lower": 2, + "count": 4, + "count_ps": 8, + "median": 4, + "sum": 17, + "sum_squares": 87, + "std": 1.920286436967152 + } + t.same_stat(None, "stats.timers.gorm", expected_value, q.get(timeout=TIMEOUT)) + t.same_stat(None, "stats.", {"numStats": 1}, q.get(timeout=TIMEOUT)) @t.set_cfg("statsd_flush_time", 0.1) @@ -135,16 +141,19 @@ def test_timer_unsorted(q, s): @t.udp_srv(bucky.statsd.StatsDServer) def test_timer_single_time(q, s): s.send("gorm:100|ms") - t.same_stat(None, "stats.timers.gorm.mean", 100, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.timers.gorm.upper", 100, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.timers.gorm.lower", 100, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.timers.gorm.count", 1, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.timers.gorm.count_ps", 10, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.timers.gorm.median", 100, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.timers.gorm.sum", 100, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.timers.gorm.sum_squares", 10000, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.timers.gorm.std", 0, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.numStats", 1, q.get(timeout=TIMEOUT)) + expected_value = { + "mean": 100, + "upper": 100, + "lower": 100, + "count": 1, + "count_ps": 10, + "median": 100, + "sum": 100, + "sum_squares": 10000, + "std": 0 + } + t.same_stat(None, "stats.timers.gorm", expected_value, q.get(timeout=TIMEOUT)) + t.same_stat(None, "stats.", {"numStats": 1}, q.get(timeout=TIMEOUT)) @t.set_cfg("statsd_flush_time", 0.1) @@ -154,21 +163,24 @@ def test_timer_multiple_times(q, s): s.send("gorm:100|ms") s.send("gorm:200|ms") s.send("gorm:300|ms") # Out of the 90% threshold - t.same_stat(None, "stats.timers.gorm.mean_90", 150, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.timers.gorm.upper_90", 200, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.timers.gorm.count_90", 2, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.timers.gorm.sum_90", 300, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.timers.gorm.sum_squares_90", 50000, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.timers.gorm.mean", 200, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.timers.gorm.upper", 300, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.timers.gorm.lower", 100, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.timers.gorm.count", 3, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.timers.gorm.count_ps", 30, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.timers.gorm.median", 200, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.timers.gorm.sum", 600, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.timers.gorm.sum_squares", 140000, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.timers.gorm.std", 81.64965809277261, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.numStats", 1, q.get(timeout=TIMEOUT)) + expected_value = { + "mean_90": 150, + "upper_90": 200, + "count_90": 2, + "sum_90": 300, + "sum_squares_90": 50000, + "mean": 200, + "upper": 300, + "lower": 100, + "count": 3, + "count_ps": 30, + "median": 200, + "sum": 600, + "sum_squares": 140000, + "std": 81.64965809277261 + } + t.same_stat(None, "stats.timers.gorm", expected_value, q.get(timeout=TIMEOUT)) + t.same_stat(None, "stats.", {"numStats": 1}, q.get(timeout=TIMEOUT)) def queue_skip(q, number_of_elements): @@ -187,9 +199,9 @@ def test_timer_multiple_times_even(q, s): s.send("gorm:200|ms") s.send("gorm:400|ms") s.send("gorm:100|ms") - queue_skip(q, 10) - t.same_stat(None, "stats.timers.gorm.median", 250, q.get(timeout=TIMEOUT)) - queue_skip(q, 4) + returned_value = q.get(timeout=TIMEOUT) + returned_value = returned_value[:2] + (returned_value[2]["median"],) + returned_value[3:] + t.same_stat(None, "stats.timers.gorm", 250, returned_value) @t.set_cfg("statsd_flush_time", 0.5) @@ -198,9 +210,8 @@ def test_timer_multiple_times_even(q, s): @t.udp_srv(bucky.statsd.StatsDServer) def test_simple_counter_not_legacy_namespace(q, s): s.send("gorm:1|c") - t.same_stat(None, "stats.counters.gorm.rate", 2, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.counters.gorm.count", 1, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.numStats", 1, q.get(timeout=TIMEOUT)) + t.same_stat(None, "stats.counters.gorm", {"rate": 2, "count": 1}, q.get(timeout=TIMEOUT)) + t.same_stat(None, "stats.", {"numStats": 1}, q.get(timeout=TIMEOUT)) @t.set_cfg("statsd_flush_time", 0.5) @@ -209,7 +220,7 @@ def test_simple_counter_not_legacy_namespace(q, s): def test_simple_gauge(q, s): s.send("gorm:5|g") t.same_stat(None, "stats.gauges.gorm", 5, q.get(timeout=TIMEOUT)) - t.same_stat(None, "stats.numStats", 1, q.get(timeout=TIMEOUT)) + t.same_stat(None, "stats.", {"numStats": 1}, q.get(timeout=TIMEOUT)) @t.set_cfg("statsd_flush_time", 0.5) @@ -223,16 +234,16 @@ def test_simple_persistent_gauges(q, s): if os.path.isfile(os.path.join(t.cfg.directory, t.cfg.statsd_gauges_savefile)): os.unlink(os.path.join(t.cfg.directory, t.cfg.statsd_gauges_savefile)) try: - s.handler.handle_line("gorm:5|g") - assert s.handler.gauges["gorm"] == 5 + s.handle_line("gorm:5|g") + assert s.gauges[("gorm", None)] == 5 - s.handler.save_gauges() + s.save_gauges() - s.handler.handle_line("gorm:1|g") - assert s.handler.gauges["gorm"] == 1 + s.handle_line("gorm:1|g") + assert s.gauges[("gorm", None)] == 1 - s.handler.load_gauges() - assert s.handler.gauges["gorm"] == 5 + s.load_gauges() + assert s.gauges[("gorm", None)] == 5 finally: if os.path.isfile(os.path.join(t.cfg.directory, t.cfg.statsd_gauges_savefile)): os.unlink(os.path.join(t.cfg.directory, t.cfg.statsd_gauges_savefile))