From cad4ec644e3b3b488cfde9afe47a9de22064f98e Mon Sep 17 00:00:00 2001 From: Akash Suresh Date: Tue, 30 Oct 2018 10:16:11 -0400 Subject: [PATCH] Always determine cluster and version from ES endpoint (#59) * Pass Cluster object to ReadCallBack method * Do not emit datapoints if cluster name is not found * Update name passed to register_read * Set cluster name only in fetch_stats method * Do not assume default ES version, figure it out from endpoint * Add integrations test for ES 5.6.3 * Handle version/cluster name updates between intervals * Address review comments * Add tests for version specific metrics * Some more updates to tests --- elasticsearch_collectd.py | 80 +++++++++++--------- tests/integration/20-elasticsearch-test.conf | 6 ++ tests/integration/Dockerfile.es.5.6.3 | 5 ++ tests/integration/docker-compose.yml | 6 ++ tests/integration/test.py | 26 ++++--- tests/integration/wait_for_es | 2 +- 6 files changed, 81 insertions(+), 44 deletions(-) create mode 100644 tests/integration/Dockerfile.es.5.6.3 diff --git a/elasticsearch_collectd.py b/elasticsearch_collectd.py index b223925..749bbab 100755 --- a/elasticsearch_collectd.py +++ b/elasticsearch_collectd.py @@ -22,7 +22,6 @@ import ssl PREFIX = "elasticsearch" -CLUSTERS = [] DEFAULTS = set([ # AUTOMATICALLY GENERATED METRIC NAMES @@ -582,14 +581,20 @@ # collectd callbacks -def read_callback(): +def read_callback(cluster): """called by collectd to gather stats. It is called per collection interval. If this method throws, the plugin will be skipped for an increasing amount of time until it returns normally again""" log.info('Read callback called') - for c in CLUSTERS: - c.fetch_stats() + + # determine node information + cluster.load_es_info() + + # initialize stats map based on ES version + cluster.init_stats() + + cluster.fetch_stats() def str_to_bool(value): @@ -628,9 +633,9 @@ def configure_callback(conf): elif node.key == 'Verbose': handle.verbose = str_to_bool(node.values[0]) elif node.key == 'Cluster': - c.es_cluster = node.values[0] + c.es_cluster_from_config = node.values[0] log.notice( - 'overriding elasticsearch cluster name to %s' % c.es_cluster) + 'overriding elasticsearch cluster name to %s' % c.es_cluster_from_config) elif node.key == 'Version': c.es_version = node.values[0] log.notice( @@ -678,22 +683,24 @@ def configure_callback(conf): log.info('metrics to collect: %s' % c.defaults) log.info('master_only: %s' % c.master_only) - # determine node information - c.load_es_info() - - # initialize stats map based on ES version - c.init_stats() - - # add the cluster config to the list of clusters to monitor - CLUSTERS.append(c) - # register the read callback now that we have the complete config - collectd.register_read(read_callback, interval=c.collection_interval) + collectd.register_read(read_callback, + interval=c.collection_interval, + name=get_unique_name(c.es_host, + c.es_port, c.es_index), + data=c) log.notice( 'started elasticsearch plugin with interval = %d seconds' % c.collection_interval) +def get_unique_name(host, port, index): + if index: + return ('%s:%s:%s' % (host, port, index)) + else: + return ('%s:%s' % (host, port)) + + def remove_deprecated_elements(deprecated, elements, version): """Remove deprecated items from a list or dictionary""" # Attempt to parse the major, minor, and revision @@ -730,6 +737,7 @@ def __init__(self): self.es_username = "" self.es_password = "" self.es_cluster = None + self.es_cluster_from_config = None self.es_version = None self.es_index = [] self.enable_index_stats = True @@ -900,11 +908,13 @@ def fetch_stats(self): node_json_stats = self.fetch_url(self.es_node_url) if node_json_stats: - if self.es_cluster is None: + # Only if Cluster name is not provided as a config option, use the + # value retured from the ES endpoint + if self.es_cluster_from_config is None: self.es_cluster = node_json_stats['cluster_name'] else: - log.info('Configured with cluster_json_stats=%s' % - self.es_cluster) + self.es_cluster = self.es_cluster_from_config + log.info('Configured with cluster_name=%s' % self.es_cluster) log.info('Parsing node_json_stats') self.parse_node_stats(node_json_stats, self.node_stats_cur) log.info('Parsing thread pool stats') @@ -972,23 +982,12 @@ def load_es_info(self): json = self.fetch_url(self.es_url_scheme + "://" + self.es_host + ":" + str(self.es_port) + "/_nodes/_local") if json is None: - # assume some sane defaults - if self.es_version is None: - self.es_version = "1.0.0" - if self.es_cluster is None: - self.es_cluster = "elasticsearch" - self.es_master_eligible = True - log.warning('Unable to determine node information, defaulting to \ - version %s, cluster %s and master %s' % - (self.es_version, self.es_cluster, - self.es_master_eligible)) return # Identify the current node self.node_id = json['nodes'].keys()[0] log.notice('current node id: %s' % self.node_id) - cluster_name = json['cluster_name'] # we should have only one entry with the current node information node_info = json['nodes'].itervalues().next() version = node_info['version'] @@ -1004,10 +1003,7 @@ def load_es_info(self): # update settings self.es_master_eligible = master_eligible - if self.es_version is None: - self.es_version = version - if self.es_cluster is None: - self.es_cluster = cluster_name + self.es_version = version log.notice('version: %s, cluster: %s, master eligible: %s' % (self.es_version, self.es_cluster, self.es_master_eligible)) @@ -1094,6 +1090,22 @@ def dispatch_stat(self, result, name, key, dimensions=None): if result is None: log.warning('Value not found for %s' % name) return + + # If we failed to get the cluster name and do not have a config + # option set for it, do not emit data + if self.es_cluster is None: + log.warning('Failed to determine Cluster name in read_callback' + + 'and no Cluster config option specified. ' + + 'Will not emit datapoints since plugin_instance ' + + 'which is the cluster name could not be determined') + return + + if self.es_version is None: + log.warning('Failed to determine ES version in read_callback. ' + + 'Will not emit datapoints since plugin_instance ' + + 'this interval. Will attempt to fetch version in the ' + + 'next interval.') + return estype = key.type value = int(result) log.info('Sending value[%s]: %s=%s' % (estype, name, value)) diff --git a/tests/integration/20-elasticsearch-test.conf b/tests/integration/20-elasticsearch-test.conf index 3ad1069..82b54cd 100644 --- a/tests/integration/20-elasticsearch-test.conf +++ b/tests/integration/20-elasticsearch-test.conf @@ -25,4 +25,10 @@ IndexInterval 3 Host es53 + + + Interval 3 + IndexInterval 3 + Host es56 + diff --git a/tests/integration/Dockerfile.es.5.6.3 b/tests/integration/Dockerfile.es.5.6.3 new file mode 100644 index 0000000..45efce3 --- /dev/null +++ b/tests/integration/Dockerfile.es.5.6.3 @@ -0,0 +1,5 @@ +FROM elasticsearch:5.6.3 + +CMD /set_cluster_name + +ADD set_cluster_name /set_cluster_name diff --git a/tests/integration/docker-compose.yml b/tests/integration/docker-compose.yml index 5368d21..d90ad80 100644 --- a/tests/integration/docker-compose.yml +++ b/tests/integration/docker-compose.yml @@ -13,6 +13,7 @@ services: - es17 - es24 - es53 + - es56 es17: build: @@ -29,6 +30,11 @@ services: context: . dockerfile: Dockerfile.es.5.3.2 + es56: + build: + context: . + dockerfile: Dockerfile.es.5.6.3 + fake_sfx: build: context: . diff --git a/tests/integration/test.py b/tests/integration/test.py index cf32559..ba0ccda 100644 --- a/tests/integration/test.py +++ b/tests/integration/test.py @@ -11,11 +11,12 @@ # This is not very flexible but could be expanded to support other types of # integration tests if so desired. -VERSIONS_TESTED = [ - '1.7.6', - '2.4.5', - '5.3.2', -] +VERSIONS_TESTED_WITH_METRICS = { + '1.7.6' : ['indices.indexing.index-total'], + '2.4.5' : ['indices.cache.filter.evictions'], + '5.3.2' : ['indices.cache.filter.evictions'], + '5.6.3' : ['indices.cache.filter.evictions'], +} TIMEOUT_SECS = 60 @@ -30,11 +31,18 @@ def get_metric_data(): def wait_for_metrics_from_each_cluster(): start = time() - for cluster in ['es-' + v for v in VERSIONS_TESTED]: - print 'Waiting for metrics from cluster %s...' % (cluster,) - eventually_true(lambda: any([cluster in m.get('plugin_instance') for m in get_metric_data()]), + for cluster in VERSIONS_TESTED_WITH_METRICS: + c = 'es-%s' % (cluster,) + print 'Waiting for metrics from cluster %s...' % (c,) + eventually_true(lambda: any([c in m.get('plugin_instance') for m in get_metric_data()]), TIMEOUT_SECS - (time() - start)) - print 'Found!' + print 'plugin_instance Found!' + for metric in VERSIONS_TESTED_WITH_METRICS.get(cluster): + print 'Waiting for metric: %s from cluster %s...' % (metric, c) + eventually_true(lambda: any([metric in str(m.get('type_instance')) for m in get_metric_data()]), + TIMEOUT_SECS - (time() - start)) + + print 'metric: %s Found! from cluster: %s' % (metric, c) def eventually_true(f, timeout_secs): diff --git a/tests/integration/wait_for_es b/tests/integration/wait_for_es index 94652b6..9371b9c 100755 --- a/tests/integration/wait_for_es +++ b/tests/integration/wait_for_es @@ -8,7 +8,7 @@ wait_for () { done } -for host in es17 es24 es53 +for host in es17 es24 es53 es56 do wait_for $host done