System stats, docker stats, bulk metrics, fixes, etc #65

Open
wants to merge 45 commits into base: master

Commits (45)
e7cdf31
System stats integrated
jsiembida Jun 21, 2017
5a1927a
Docker stats go in
jsiembida Jun 21, 2017
47bbebb
Rename system.py to systemstats.py
jsiembida Jun 21, 2017
3d8e236
Add common metadata option for system/docker servers
jsiembida Jun 21, 2017
b180c66
Merge branch 'master' into docker-stats-support
jsiembida Jun 22, 2017
4cfe6bd
Add instance to docker labels
jsiembida Jun 22, 2017
e87946f
Rename label to avoid conflict
jsiembida Jun 22, 2017
d404f0f
Add conf metadata merging for statsd
jsiembida Jun 22, 2017
e02cec5
Add metadata setting from command line
jsiembida Jun 22, 2017
4446588
If docker connection fails, back off a bit
jsiembida Jun 22, 2017
268c39d
Small fixes for docker stuff
jsiembida Jun 23, 2017
2871df5
Merge InfluxDB branch
jsiembida Jun 23, 2017
d232aad
Add a list of ignored filesystems, set intervals to 10s
jsiembida Jun 23, 2017
29cdc56
More pseudo filesystems to ignore list
jsiembida Jun 23, 2017
38afee9
Merge branch 'influxdb-support' into docker-stats-support
jsiembida Jun 23, 2017
6da4817
Cleanups
jsiembida Jun 23, 2017
15e3565
Bulk transfers for system/docker/influx
jsiembida Jun 27, 2017
a38fb42
Merge branch 'master' of github.com:trbs/bucky into docker-stats-support
jsiembida Jun 27, 2017
0adfe80
Bulk transfers for statsd module
jsiembida Jun 27, 2017
7595351
Fix statsd tests
jsiembida Jun 27, 2017
24c3c74
Make carbon client connection lazy because it blocks other components…
jsiembida Jun 27, 2017
da09fb7
Disk activity stats
jsiembida Jun 27, 2017
ba54cb3
Refactor docker/system stats collectors
jsiembida Jun 27, 2017
5150b52
Make statsd work with Py3
jsiembida Jun 27, 2017
bc9fc8f
Fix statsd tests
jsiembida Jun 27, 2017
b9a6e4b
Fix the silly bug
jsiembida Jun 28, 2017
2c72bc8
Improve data type handling for InfluxDB
jsiembida Jun 29, 2017
af50d9b
Lower the time resolution in InfluxDB line protocol, docs recommend i…
jsiembida Jun 29, 2017
0f62fae
More changes to data types in InfluxDB protocol
jsiembida Jun 29, 2017
36bde06
Change the back-off algo for failures
jsiembida Jun 29, 2017
92b732c
Uniform blacklist/whitelist options for system stats + small refactor…
jsiembida Jun 29, 2017
cb0553f
Extra option for ignoring Datadog event and service checks in StatsD …
jsiembida Jun 29, 2017
03aea12
Rework metadata handling in statsd module, switch to tuples, make it …
jsiembida Jun 29, 2017
624de3a
One more renaming to be consistent with docker_* names
jsiembida Jun 29, 2017
02fcc95
Add an alternative naming scheme for statsd, much more usable with in…
jsiembida Jun 29, 2017
a75c845
Fix tests
jsiembida Jun 29, 2017
d2a45b0
Use word "metadata" instead of tags/labels, simplify config for it, too
jsiembida Jun 29, 2017
06793c6
InfluxDB does not accept duplicated tags in line proto, fix it
jsiembida Jun 29, 2017
5af6bab
Right, another fix for influxdb protocol
jsiembida Jun 29, 2017
6a7ef39
Early implementation of Prometheus integration
jsiembida Jun 30, 2017
45c5893
Prometheus fixes
jsiembida Jun 30, 2017
42d8080
More metric renaming, mostly to avoid clash with Prometheus use of "i…
jsiembida Jun 30, 2017
1a693d3
Fix for Py2
jsiembida Jun 30, 2017
2e5972e
Fix tests
jsiembida Jul 1, 2017
7c1896b
Merge branch 'docker-stats-support' of github.com:jsiembida/bucky int…
jsiembida Jul 1, 2017
3 changes: 3 additions & 0 deletions README.rst
@@ -334,10 +334,13 @@ like so::
enabled = true
bind-address = ":8089"
database = "mydatabase"
precision = "1s"

Bucky will periodically resolve all hostnames in the `influxdb_hosts`
list and fan out metrics to all resolved endpoints, thus providing
replication as well as hot swapping.
Note that `precision` is set to 1s: one second is currently the finest
precision bucky uses when sending data to InfluxDB, hence the setting.
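The 1-second precision can be seen in how a line-protocol point is built: timestamps are truncated to whole seconds before being scaled to nanoseconds. A minimal sketch of that formatting (names are illustrative, not bucky's actual API):

```python
import time

def influx_line(measurement, tags, value, ts=None):
    # Timestamps are truncated to whole seconds, then scaled to
    # nanoseconds as the UDP line protocol expects; a server-side
    # precision of "1s" therefore loses nothing.
    ts = int(ts if ts is not None else time.time())
    tag_str = ','.join('%s=%s' % kv for kv in sorted(tags.items()))
    return '%s,%s value=%s %d' % (measurement, tag_str, value, ts * 1000000000)
```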


A note on CollectD converters
Expand Down
16 changes: 10 additions & 6 deletions bucky/carbon.py
@@ -53,18 +53,19 @@ def __init__(self, cfg, pipe):
self.backoff_max = cfg.graphite_backoff_max
if self.max_reconnects <= 0:
self.max_reconnects = sys.maxint
self.connect()
self.connected = False

def connect(self):
if self.debug:
log.debug("Connected the debug socket.")
self.sock = DebugSocket()
return
reconnect_delay = self.reconnect_delay
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
for i in xrange(self.max_reconnects):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
self.sock.connect((self.ip, self.port))
self.connected = True
log.info("Connected to Carbon at %s:%s", self.ip, self.port)
return
except socket.error as e:
@@ -88,9 +89,12 @@ def close(self):
self.sock.close()
except:
pass
self.connected = False

def send(self, host, name, value, mtime, metadata=None):
raise NotImplementedError()
def send_message(self, mesg):
if not self.connected:
self.connect()
self.sock.sendall(mesg)


class PlaintextClient(CarbonClient):
@@ -99,7 +103,7 @@ def send(self, host, name, value, mtime, metadata=None):
mesg = "%s %s %s\n" % (stat, value, mtime)
for i in xrange(self.max_reconnects):
try:
self.sock.sendall(mesg)
self.send_message(mesg)
return
except socket.error as err:
log.error("Failed to send data to Carbon server: %s", err)
@@ -128,7 +132,7 @@ def transmit(self):
self.buffer = []
for i in xrange(self.max_reconnects):
try:
self.sock.sendall(header + payload)
self.send_message(header + payload)
return
except socket.error as err:
log.error("Failed to send data to Carbon server: %s", err)
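The carbon.py change above makes the connection lazy: the socket is opened on first send instead of in `__init__`, so an unreachable Carbon server no longer blocks other components at startup. An illustrative reduction of the pattern (not bucky's actual class):

```python
import socket

class LazyCarbonClient:
    # Connects only on the first send_message call, mirroring the
    # connected-flag pattern introduced in the diff.
    def __init__(self, ip, port):
        self.ip, self.port = ip, port
        self.sock = None
        self.connected = False

    def connect(self):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.connect((self.ip, self.port))
        self.connected = True

    def close(self):
        if self.sock:
            self.sock.close()
        self.connected = False

    def send_message(self, mesg):
        if not self.connected:
            self.connect()
        self.sock.sendall(mesg)
```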
33 changes: 27 additions & 6 deletions bucky/cfg.py
@@ -7,6 +7,8 @@
directory = "/var/lib/bucky"
process_join_timeout = 2

metadata = []

sentry_enabled = False
sentry_dsn = None
sentry_log_level = "WARNING"
@@ -34,7 +36,6 @@
statsd_port = 8125
statsd_enabled = True
statsd_flush_time = 10.0
statsd_metadata = {}
statsd_legacy_namespace = True
statsd_global_prefix = "stats"
statsd_prefix_counter = "counters"
@@ -48,11 +49,15 @@
statsd_delete_counters = True
statsd_delete_timers = True
statsd_delete_sets = True
# statsd_delete_gauges = False
# `statsd_delete_gauges = True` would make gauges in practice useless,
# except if you get an absolute(!) value every flush-interval which would makes this setting irrelevant
statsd_onlychanged_gauges = True
# `statsd_delete_gauges = True` would make gauges in practice useless, unless you send an absolute(!)
# value every flush interval, which would make this setting irrelevant, so this option doesn't exist.
# send gauge value to graphite only if there was a change
statsd_onlychanged_gauges = True
# Disable this only if you want "bad line" to be reported for lines with DataDog extensions
statsd_ignore_datadog_extensions = True
statsd_ignore_internal_stats = False
# Use metadata name=NAME instead of the original/legacy naming scheme
statsd_metadata_namespace = False

statsd_percentile_thresholds = [90] # percentile thresholds for statsd timers

@@ -81,6 +86,11 @@
"127.0.0.1:8089"
]

prometheus_enabled = False
prometheus_port = 9090
prometheus_timeout = 60
prometheus_path = 'metrics'

full_trace = False

name_prefix = None
@@ -91,7 +101,18 @@
name_strip_duplicates = True
name_host_trim = []

custom_clients = []
system_stats_enabled = False
system_stats_interval = 10
system_stats_filesystem_blacklist = ['tmpfs', 'aufs', 'rootfs', 'devtmpfs']
system_stats_filesystem_whitelist = None
system_stats_interface_blacklist = None
system_stats_interface_whitelist = None
system_stats_disk_blacklist = ['loop0', 'loop1', 'loop2', 'loop3', 'loop4', 'loop5', 'loop6', 'loop7']
system_stats_disk_whitelist = None

docker_stats_enabled = False
docker_stats_interval = 10
docker_stats_version = '1.22'

processor = None
processor_drop_on_error = False
19 changes: 18 additions & 1 deletion bucky/client.py
@@ -37,12 +37,29 @@ def run(self):
setproctitle("bucky: %s" % self.__class__.__name__)
while True:
try:
if not self.pipe.poll(1):
self.tick()
continue
sample = self.pipe.recv()
except KeyboardInterrupt:
continue
if sample is None:
break
self.send(*sample)
if type(sample[2]) is dict:
self.send_bulk(*sample)
else:
self.send(*sample)

def send(self, host, name, value, time, metadata=None):
raise NotImplementedError()

def send_bulk(self, host, name, value, time, metadata=None):
for k in value.keys():
if name.endswith('.'):
metric_name = name + k
else:
metric_name = name + '.' + k
self.send(host, metric_name, value[k], time, metadata)

def tick(self):
pass
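The `send_bulk` fallback above flattens a dict of values into dotted metric names before delegating to `send`. The naming rule, isolated as a pure function (helper name is illustrative):

```python
def expand_bulk(name, values):
    # Mirrors Client.send_bulk in the diff: each key of the value dict
    # becomes a dotted suffix of the metric name, avoiding a double dot
    # when the name already ends with '.'
    out = {}
    for k, v in values.items():
        metric_name = name + k if name.endswith('.') else name + '.' + k
        out[metric_name] = v
    return out
```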
54 changes: 54 additions & 0 deletions bucky/collector.py
@@ -0,0 +1,54 @@


import time
import multiprocessing


try:
from setproctitle import setproctitle
except ImportError:
def setproctitle(title):
pass


class StatsCollector(multiprocessing.Process):
def __init__(self, queue):
super(StatsCollector, self).__init__()
self.queue = queue

def close(self):
pass

def run(self):
setproctitle("bucky: %s" % self.__class__.__name__)
interval = self.interval
while True:
start_timestamp = time.time()
interval = self.interval if self.collect() else interval + interval
stop_timestamp = time.time()
interval = min(interval, 300)
interval = interval - (stop_timestamp - start_timestamp)
if interval > 0.1:
time.sleep(interval)

def collect(self):
raise NotImplementedError()

def add_stat(self, metric_name, metric_value, timestamp, **metadata):
if metadata:
if self.metadata:
metadata.update(self.metadata)
else:
metadata = self.metadata
if metadata:
metadata_tuple = tuple((k, metadata[k]) for k in sorted(metadata.keys()))
self.queue.put((None, metric_name, metric_value, timestamp, metadata_tuple))
else:
self.queue.put((None, metric_name, metric_value, timestamp))

def merge_dicts(self, *dicts):
ret = {}
for d in dicts:
if d:
ret.update(d)
return ret
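The `run` loop above implements the reworked back-off: a failed `collect()` doubles the current interval, a success resets it to the configured value, and the result is capped at 300s and reduced by the time collection itself took. The scheduling rule, extracted for clarity (helper name is illustrative):

```python
def next_interval(interval, base_interval, success, elapsed, cap=300):
    # Mirrors StatsCollector.run: success resets to the configured
    # interval, failure doubles the current one; the sleep is the capped
    # interval minus the time the collection took.
    interval = base_interval if success else interval + interval
    interval = min(interval, cap)
    return interval, interval - elapsed
```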
69 changes: 69 additions & 0 deletions bucky/dockerstats.py
@@ -0,0 +1,69 @@

import time
import docker
import logging
import bucky.collector as collector

import six
import requests.exceptions


if six.PY3:
xrange = range
long = int


log = logging.getLogger(__name__)


class DockerStatsCollector(collector.StatsCollector):
def __init__(self, queue, cfg):
super(DockerStatsCollector, self).__init__(queue)
self.metadata = cfg.metadata if cfg.metadata else {}
self.interval = cfg.docker_stats_interval
if cfg.docker_stats_version:
self.docker_client = docker.client.from_env(version=cfg.docker_stats_version)
else:
self.docker_client = docker.client.from_env()

def read_df_stats(self, now, labels, total_size, rw_size):
docker_df_stats = {
'total_bytes': long(total_size),
'used_bytes': long(rw_size)
}
self.add_stat("docker_filesystem", docker_df_stats, now, **labels)

def read_cpu_stats(self, now, labels, stats):
for k, v in enumerate(stats[u'percpu_usage']):
self.add_stat("docker_cpu", {'usage': long(v)}, now, name=k, **labels)

def read_interface_stats(self, now, labels, stats):
for k in stats.keys():
v = stats[k]
keys = (
u'rx_bytes', u'rx_packets', u'rx_errors', u'rx_dropped',
u'tx_bytes', u'tx_packets', u'tx_errors', u'tx_dropped'
)
docker_interface_stats = {k: long(v[k]) for k in keys}
self.add_stat("docker_interface", docker_interface_stats, now, name=k, **labels)

def read_memory_stats(self, now, labels, stats):
self.add_stat("docker_memory", {'used_bytes': long(stats[u'usage'])}, now, **labels)

def collect(self):
now = int(time.time())
try:
for i, container in enumerate(self.docker_client.api.containers(size=True)):
labels = container[u'Labels']
if 'docker_id' not in labels:
labels['docker_id'] = container[u'Id'][:12]
stats = self.docker_client.api.stats(container[u'Id'], decode=True, stream=False)
self.read_df_stats(now, labels, long(container[u'SizeRootFs']), long(container.get(u'SizeRw', 0)))
self.read_cpu_stats(now, labels, stats[u'cpu_stats'][u'cpu_usage'])
self.read_memory_stats(now, labels, stats[u'memory_stats'])
self.read_interface_stats(now, labels, stats[u'networks'])
return True
except requests.exceptions.ConnectionError:
return False
except ValueError:
return False
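`read_cpu_stats` above emits one point per CPU from Docker's `percpu_usage` list, tagging each with its index. That expansion, reduced to a pure function (illustrative, not bucky's API):

```python
def percpu_points(percpu_usage):
    # Each entry of the percpu_usage list from the Docker stats API
    # becomes one docker_cpu point, tagged with the cpu index as name.
    return [('docker_cpu', {'usage': int(v)}, {'name': i})
            for i, v in enumerate(percpu_usage)]
```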
54 changes: 33 additions & 21 deletions bucky/influxdb.py
@@ -69,34 +69,46 @@ def close(self):
except:
pass

def kv(self, k, v):
return str(k) + '=' + str(v)

def flush(self):
def tick(self):
now = time.time()
if len(self.buffer) > 30 or (now - self.flush_timestamp) > 3:
if len(self.buffer) > 10 or ((now - self.flush_timestamp) > 1 and len(self.buffer)):
payload = '\n'.join(self.buffer).encode()
self.resolve_hosts()
for ip, port in self.resolved_hosts:
self.sock.sendto(payload, (ip, port))
self.buffer = []
self.flush_timestamp = now

def send(self, host, name, value, mtime, metadata=None):
buf = [name]
if host:
if metadata is None:
metadata = {'host': host}
else:
if 'host' not in metadata:
metadata['host'] = host
if metadata:
for k in metadata.keys():
v = metadata[k]
# InfluxDB will drop insert with tags without values
if v is not None:
buf.append(self.kv(k, v))
def _send(self, host, name, mtime, values, metadata=None):
# https://docs.influxdata.com/influxdb/v1.2/write_protocols/line_protocol_tutorial/
line = ' '.join((','.join(buf), self.kv('value', value), str(long(mtime) * 1000000000)))
label_buf = [name]
if not metadata and host:
metadata = ('host', host)
if metadata:
# InfluxDB docs recommend sorting tags
for k, v in metadata:
# InfluxDB will drop insert with empty tags
if v is None or v == '':
continue
v = str(v).replace(' ', '')
label_buf.append(str(k) + '=' + v)
value_buf = []
for k in values.keys():
v = values[k]
t = type(v)
if t is long or t is int:
value_buf.append(str(k) + '=' + str(v) + 'i')
elif t is float or t is bool:
value_buf.append(str(k) + '=' + str(v))
elif t is str:
value_buf.append(str(k) + '="' + v + '"')
# So, the lower timestamp precisions don't seem to work with line protocol...
line = ' '.join((','.join(label_buf), ','.join(value_buf), str(long(mtime) * 1000000000)))
self.buffer.append(line)
self.flush()
self.tick()

def send(self, host, name, value, mtime, metadata=None):
self._send(host, name, mtime, {'value': value}, metadata)

def send_bulk(self, host, name, value, mtime, metadata=None):
self._send(host, name.strip('.'), mtime, value, metadata)
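The type dispatch in `_send` above decides the line-protocol field syntax: integers get the `i` suffix, floats and booleans pass through, strings are quoted. A standalone sketch of that mapping (helper name is illustrative):

```python
def influx_fields(values):
    # Mirrors the field formatting in _send. bool is checked before int
    # because bool subclasses int in Python; unknown types are dropped,
    # as in the diff.
    out = []
    for k in sorted(values):
        v = values[k]
        if isinstance(v, bool):
            out.append('%s=%s' % (k, v))
        elif isinstance(v, int):
            out.append('%s=%si' % (k, v))
        elif isinstance(v, float):
            out.append('%s=%s' % (k, v))
        elif isinstance(v, str):
            out.append('%s="%s"' % (k, v))
    return ','.join(out)
```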