From 3c030558a330185bed5b7094cdd9d8e2d74cb998 Mon Sep 17 00:00:00 2001
From: Charlie Smith
Date: Fri, 2 Jun 2017 11:28:47 -0400
Subject: [PATCH] Httpsfix (#52)

* flake8 fixes

* don't suppress output from tests

* only use https ssl context if necessary
---
 README.md                 |   1 +
 elasticsearch_collectd.py | 135 +++++++++++++++++++++++---------------
 tests/run_tests.sh        |   2 +-
 3 files changed, 84 insertions(+), 54 deletions(-)

diff --git a/README.md b/README.md
index 36af80c..5be1841 100644
--- a/README.md
+++ b/README.md
@@ -12,6 +12,7 @@ A [CollectD](http://collectd.org) plugin to collect [Elasticsearch](https://gith
 
 * collectd 4.9+
 * Elasticsearch 1.x or newer.
+ * Python v2.6 or newer (HTTPS support requires v2.7.9)
 
 ## Configuration
 
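The new requirements line deserves a note: the plugin itself runs on Python 2.6, but talking to Elasticsearch over https relies on urllib2.urlopen() accepting an ssl context, which only exists from Python 2.7.9 (PEP 476) onward. A minimal capability probe, sketched here for illustration rather than taken from the patch:

    import sys

    def https_capable():
        # urllib2.urlopen() only accepts a `context` keyword argument
        # from Python 2.7.9 on; earlier interpreters can still use the
        # plugin, but only against plain http endpoints.
        return sys.version_info >= (2, 7, 9)
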
diff --git a/elasticsearch_collectd.py b/elasticsearch_collectd.py
index 996ab14..3dd2392 100755
--- a/elasticsearch_collectd.py
+++ b/elasticsearch_collectd.py
@@ -707,8 +707,8 @@ def remove_deprecated_elements(deprecated, elements, version):
     for dep in deprecated:
         if (major >= dep['major']) \
                 or (major == dep['major'] and minor >= dep['minor']) \
-                or (major == dep['major'] and minor == dep['minor'] and
-                    revision >= dep['revision']):
+                or (major == dep['major'] and minor == dep['minor']
+                    and revision >= dep['revision']):
             if type(elements) is list:
                 for key in dep['keys']:
                     if key in elements:
@@ -762,8 +762,8 @@ def __init__(self):
         self.extra_dimensions = ''
 
     def sanatize_intervals(self):
-        """Sanitizes the index interval to be greater or equal to and divisible by
-        the collection interval
+        """Sanitizes the index interval to be greater than or equal to and
+        divisible by the collection interval
         """
         # Sanitize the self.collection_interval and self.index_interval
         # ? self.index_interval > self.collection_interval:
@@ -772,19 +772,25 @@ def sanatize_intervals(self):
         # ? self.index_interval % self.collection_interval > 0:
         # round the self.index_interval up to a compatible value
         if self.index_interval % self.collection_interval > 0:
-            self.index_interval = self.index_interval + self.collection_interval - \
-                (self.index_interval % self.collection_interval)
-            log.warning('The Elasticsearch Index Interval must be \
-greater or equal to than and divisible by the collection Interval. The \
-Elasticsearch Index Interval has been rounded to: %s' % self.index_interval)
+            self.index_interval = self.index_interval + \
+                self.collection_interval - \
+                (self.index_interval %
+                 self.collection_interval)
+            log.warning(('The Elasticsearch Index Interval must be '
+                         'greater than or equal to and divisible by the '
+                         'collection interval. The Elasticsearch Index '
+                         'Interval has been rounded to: %s') %
+                        self.index_interval)
         # ? self.index_interval < self.collection_interval :
         # Set self.index_interval = self.collection_interval
         elif self.index_interval < self.collection_interval:
             self.index_interval = self.collection_interval
-            log.warning('WARN: The Elasticsearch Index Interval must be greater \
-or equal to than and divisible by the collection Interval. The Elasticsearch \
-Index Interval has been rounded to: %s' % self.index_interval)
+            log.warning(('WARN: The Elasticsearch Index Interval must be '
+                         'greater than or equal to and divisible by the '
+                         'collection interval. The Elasticsearch Index '
+                         'Interval has been rounded to: %s') %
+                        self.index_interval)
 
         # self.index_skip = self.index_interval / self.collection_interval
         self.index_skip = (self.index_interval / self.collection_interval)
 
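The rounding logic in sanatize_intervals() bumps the index interval up to the next multiple of the collection interval, so index stats land on every Nth collection pass. A worked example with illustrative numbers (not plugin defaults):

    collection_interval = 10
    index_interval = 25    # not divisible, so it gets rounded up
    remainder = index_interval % collection_interval    # 5
    index_interval += collection_interval - remainder   # 25 + 10 - 5 = 30
    index_skip = index_interval / collection_interval   # 3 reads per index poll
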
@@ -795,21 +801,23 @@ def sanatize_intervals(self):
     def remove_deprecated_node_stats(self):
         """Remove deprecated node stats from the list of stats to collect"""
         self.node_stats_cur = remove_deprecated_elements(DEPRECATED_NODE_STATS,
-                                                          self.node_stats_cur,
-                                                          self.es_version)
+                                                         self.node_stats_cur,
+                                                         self.es_version)
 
     def remove_deprecated_threads(self):
         """Remove deprecated thread_pools from the list of stats to collect"""
         self.thread_pools = remove_deprecated_elements(DEPRECATED_THREAD_POOLS,
-                                                        self.thread_pools,
-                                                        self.es_version)
+                                                       self.thread_pools,
+                                                       self.es_version)
 
     # helper methods
     def init_stats(self):
         self.sanatize_intervals()
-        self.es_node_url = self.es_url_scheme + "://" + self.es_host + ":" + str(self.es_port) + \
-            "/_nodes/_local/stats/transport,http,process,jvm,indices,thread_pool"
+        self.es_node_url = self.es_url_scheme + "://" + self.es_host + ":" + \
+            str(self.es_port) + \
+            ("/_nodes/_local/stats/transport,http,process,jvm,indices,"
+             "thread_pool")
         self.node_stats_cur = dict(NODE_STATS.items())
         self.index_stats_cur = dict(INDEX_STATS.items())
         if not self.es_version.startswith("1."):
@@ -817,7 +825,8 @@ def init_stats(self):
 
         self.remove_deprecated_node_stats()
 
-        if self.es_version.startswith("1.1") or self.es_version.startswith("1.2"):
+        if self.es_version.startswith("1.1") \
+                or self.es_version.startswith("1.2"):
             self.index_stats_cur.update(INDEX_STATS_ES_1_1)
         else:
             # 1.3 and higher
@@ -830,12 +839,13 @@ def init_stats(self):
             self.es_index_url = self.es_url_scheme + "://" + self.es_host + \
                 ":" + str(self.es_port) + "/_all/_stats"
         else:
-            self.es_index_url = self.es_url_scheme + "://" + self.es_host + ":" + \
-                str(self.es_port) + "/" + ",".join(self.es_index) + "/_stats"
+            self.es_index_url = self.es_url_scheme + "://" + self.es_host + \
+                ":" + str(self.es_port) + "/" + \
+                ",".join(self.es_index) + "/_stats"
 
         # common thread pools for all ES versions
-        thread_pools = ['generic', 'index', 'get', 'snapshot', 'bulk', 'warmer',
-                        'flush', 'search', 'refresh']
+        thread_pools = ['generic', 'index', 'get', 'snapshot', 'bulk',
+                        'warmer', 'flush', 'search', 'refresh']
 
         # Add the 1.0 metrics
         if not self.es_version.startswith("0."):
@@ -843,28 +853,32 @@ def init_stats(self):
 
         # Add the 2.0 metrics
         if not self.es_version.startswith("1."):
-            thread_pools.extend(['suggest', 'percolate', 'management', 'listener',
-                                 'fetch_shard_store', 'fetch_shard_started'])
+            thread_pools.extend(['suggest', 'percolate', 'management',
+                                 'listener', 'fetch_shard_store',
+                                 'fetch_shard_started'])
 
         # Add the 2.1 metrics
-        if not self.es_version.startswith("1.") and not self.es_version.startswith("2.0"):
+        if not self.es_version.startswith("1.") \
+                and not self.es_version.startswith("2.0"):
             thread_pools.extend(['force_merge'])
 
-        # Legacy support for old configurations without Thread Pools configuration
+        # Legacy support for old configurations without Thread Pools config
         if len(self.configured_thread_pools) == 0:
             self.thread_pools = list(self.configured_thread_pools)
         else:
             # Filter out the thread pools that aren't specified by user
-            self.thread_pools = filter(lambda pool: pool in self.configured_thread_pools,
-                                       thread_pools)
+            self.thread_pools = filter(lambda pool: pool in
+                                       self.configured_thread_pools,
+                                       thread_pools)
 
         self.remove_deprecated_threads()
 
         self.es_cluster_url = self.es_url_scheme + "://" + self.es_host + \
-            ":" + str(self.es_port) + "/_cluster/health"
+                              ":" + str(self.es_port) + "/_cluster/health"
 
         log.notice('Initialized with version=%s, host=%s, port=%s, url=%s' %
-                   (self.es_version, self.es_host, self.es_port, self.es_node_url))
+                   (self.es_version, self.es_host, self.es_port,
+                    self.es_node_url))
 
     # FUNCTION: Collect node stats from JSON result
     def lookup_node_stat(self, stat, json):
@@ -889,7 +903,8 @@ def fetch_stats(self):
         if self.es_cluster is None:
             self.es_cluster = node_json_stats['cluster_name']
         else:
-            log.info('Configured with cluster_json_stats=%s' % self.es_cluster)
+            log.info('Configured with cluster_json_stats=%s' %
+                     self.es_cluster)
         log.info('Parsing node_json_stats')
         self.parse_node_stats(node_json_stats, self.node_stats_cur)
         log.info('Parsing thread pool stats')
@@ -919,8 +934,10 @@ def fetch_stats(self):
         else:
             indexes_json_stats = indices['indices']
             for index_name in indexes_json_stats.keys():
-                log.info('Parsing index stats for index: %s' % index_name)
-                self.parse_index_stats(indexes_json_stats[index_name], index_name)
+                log.info('Parsing index stats for index: %s' %
+                         index_name)
+                self.parse_index_stats(indexes_json_stats[index_name],
+                                       index_name)
 
         # Increment skip count
         self.skip_count += 1
@@ -931,13 +948,16 @@ def fetch_url(self, url):
             request = urllib2.Request(url)
             if self.es_username:
                 authheader = base64.encodestring('%s:%s' %
-                                                 (self.es_username, self.es_password)
+                                                 (self.es_username,
+                                                  self.es_password)
                                                  ).replace('\n', '')
                 request.add_header("Authorization", "Basic %s" % authheader)
             ctx = None
             if self.es_url_scheme == "https":
                 ctx = ssl._create_unverified_context()
-            response = urllib2.urlopen(request, context=ctx, timeout=10)
+                response = urllib2.urlopen(request, context=ctx, timeout=10)
+            else:
+                response = urllib2.urlopen(request, timeout=10)
             log.info('Raw api response: %s' % response)
             return json.load(response)
         except (urllib2.URLError, urllib2.HTTPError), e:
@@ -949,8 +969,8 @@ def fetch_url(self, url):
             response.close()
 
     def load_es_info(self):
-        json = self.fetch_url(self.es_url_scheme + "://" + self.es_host + ":" + str(self.es_port) +
-                              "/_nodes/_local")
+        json = self.fetch_url(self.es_url_scheme + "://" + self.es_host +
+                              ":" + str(self.es_port) + "/_nodes/_local")
         if json is None:
             # assume some sane defaults
             if self.es_version is None:
@@ -958,9 +978,10 @@ def load_es_info(self):
                 self.es_version = "1.0"
             if self.es_cluster is None:
                 self.es_cluster = "elasticsearch"
             self.es_master_eligible = True
-            log.warning('Unable to determine node \
-information, defaulting to version %s, cluster %s and master %s' %
-                        (self.es_version, self.es_cluster, self.es_master_eligible))
+            log.warning('Unable to determine node information, defaulting to \
+                        version %s, cluster %s and master %s' %
+                        (self.es_version, self.es_cluster,
+                         self.es_master_eligible))
             return
 
         # Identify the current node
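The fetch_url() hunk above is the substance of this PR: the previous code passed context= to urllib2.urlopen() unconditionally, but that keyword only exists on Python 2.7.9+, and an unverified ssl context is only meaningful for https anyway. The same pattern as a standalone sketch (hypothetical helper name, simplified from the diff):

    import ssl
    import urllib2

    def open_es_url(url, scheme, timeout=10):
        if scheme == "https":
            # _create_unverified_context() (2.7.9+) skips certificate
            # verification, matching the plugin's behaviour over TLS.
            ctx = ssl._create_unverified_context()
            return urllib2.urlopen(url, context=ctx, timeout=timeout)
        # http: never pass context=, so Python 2.6-2.7.8 keep working
        return urllib2.urlopen(url, timeout=timeout)
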
@@ -991,12 +1012,15 @@ def detect_es_master(self):
         """Determines if this is the current master. This method sets self.es_current_master"""
         # determine current master
-        cluster_state = self.fetch_url(self.es_url_scheme + "://" + self.es_host + ":" + str(self.es_port) +
-                                       "/_cluster/state/master_node")
-        if self.es_current_master is False and cluster_state['master_node'] == self.node_id:
+        cluster_state = self.fetch_url(self.es_url_scheme + "://" +
+                                       self.es_host + ":" + str(self.es_port) +
+                                       "/_cluster/state/master_node")
+        if self.es_current_master is False \
+                and cluster_state['master_node'] == self.node_id:
             self.es_current_master = True
             log.notice('current master: %s' % self.es_current_master)
-        elif self.es_current_master is True and cluster_state['master_node'] != self.node_id:
+        elif self.es_current_master is True \
+                and cluster_state['master_node'] != self.node_id:
             self.es_current_master = False
             log.notice('current master: %s' % self.es_current_master)
         else:
@@ -1027,7 +1051,8 @@ def parse_thread_pool_stats(self, json, stats):
             else:
                 result = None
 
-            self.dispatch_stat(result, name, key, {'thread_pool': pool})
+            self.dispatch_stat(result, name, key,
+                               {'thread_pool': pool})
 
     def parse_cluster_stats(self, json, stats):
         """Parse cluster stats response from ElasticSearch"""
@@ -1047,7 +1072,8 @@ def parse_index_stats(self, json, index_name):
             result = dig_it_up(json, key.path)
             # update the index name in the type_instance to include
             # the index as a dimensions
-            name = name.format(index_name=sanitize_type_instance(index_name))
+            name = name.format(
+                index_name=sanitize_type_instance(index_name))
             self.dispatch_stat(result, name, key)
 
     def dispatch_stat(self, result, name, key, dimensions=None):
@@ -1243,12 +1269,15 @@ def configure_test(cluster):
     """Configure the plugin for testing"""
 
     # Ensure all possible threadpools are eligible for collection
-    cluster.configured_thread_pools = set(['generic', 'index', 'get', 'snapshot',
-                                           'bulk', 'warmer', 'flush', 'search',
-                                           'refresh', 'suggest', 'percolate',
-                                           'management', 'listener',
-                                           'fetch_shard_store', 'fetch_shard_started',
-                                           'force_merge', 'merge', 'optimize', ])
+    cluster.configured_thread_pools = set(['generic', 'index', 'get',
+                                           'snapshot', 'bulk', 'warmer',
+                                           'flush', 'search', 'refresh',
+                                           'suggest', 'percolate',
+                                           'management', 'listener',
+                                           'fetch_shard_store',
+                                           'fetch_shard_started',
+                                           'force_merge', 'merge',
+                                           'optimize', ])
     cluster.detailed_metrics = True
     cluster.index_interval = 10
     cluster.enable_index_stats = True
diff --git a/tests/run_tests.sh b/tests/run_tests.sh
index 3f2f94e..1bb70ab 100755
--- a/tests/run_tests.sh
+++ b/tests/run_tests.sh
@@ -13,7 +13,7 @@ trap 'rm -f $tmpfile' 1 2 3 15
 
 for scenario in `ls data`; do
     echo -n "testing against ES $scenario"
-    ${PYTHON} ./simulate.py data/${scenario} &> /dev/null &
+    ${PYTHON} ./simulate.py data/${scenario} &
     pid="$!"
     ${PYTHON} ../elasticsearch_collectd.py > $tmpfile
     if [ $? != 0 ]; then
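A closing note on the two smaller changes. The tests/run_tests.sh hunk matches the "don't suppress output from tests" item in the commit message: dropping the "&> /dev/null" redirection lets simulate.py's output reach the console, so a failing scenario shows why the simulator died instead of failing silently. The detect_es_master() hunk, for its part, boils down to comparing the id returned by /_cluster/state/master_node with the local node id; distilled into an illustrative helper (the function name is hypothetical, not from the plugin):

    def is_master(cluster_state, node_id):
        # /_cluster/state/master_node reports the elected master's node id;
        # this node is the master exactly when that id matches its own.
        return cluster_state['master_node'] == node_id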