Httpsfix (#52)
* flake8 fixes

* don't suppress output from tests

* only use https ssl context if necessary
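
The last bullet is the substantive fix: the removed lines below built an unverified SSL context unconditionally, which breaks on interpreters older than 2.7.9. A minimal sketch of the pattern the commit adopts (Python 2; the fetch() helper and its arguments are illustrative placeholders, not the plugin's exact code):

    import ssl
    import urllib2

    def fetch(scheme, host, port, path):
        # Build an SSL context only for https URLs, so plain-http requests
        # keep working on Python 2.6 and on 2.7.x releases before 2.7.9,
        # where urlopen() has no `context` keyword and ssl lacks
        # _create_unverified_context().
        url = '%s://%s:%d%s' % (scheme, host, port, path)
        request = urllib2.Request(url)
        if scheme == 'https':
            ctx = ssl._create_unverified_context()  # available on 2.7.9+
            return urllib2.urlopen(request, context=ctx, timeout=10)
        return urllib2.urlopen(request, timeout=10)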
charless-splunk authored and keitwb committed Jun 2, 2017
1 parent 4fe5626 commit 3c03055
Showing 3 changed files with 84 additions and 54 deletions.
1 change: 1 addition & 0 deletions README.md
@@ -12,6 +12,7 @@ A [CollectD](http://collectd.org) plugin to collect [Elasticsearch](https://gith
 
 * collectd 4.9+
 * Elasticsearch 1.x or newer.
+* python v2.6 or newer (https support requires v2.7.9)
 
 ## Configuration
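
The added requirement line tracks the code change below: certificate handling for urllib2 (PEP 476) only landed in Python 2.7.9, so older interpreters can neither build an SSL context nor pass one to urlopen(). A quick, hypothetical interpreter check (not part of this commit):

    import ssl

    # True on Python 2.7.9+, where this plugin's https path can work
    print(hasattr(ssl, '_create_unverified_context'))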

135 changes: 82 additions & 53 deletions elasticsearch_collectd.py
@@ -707,8 +707,8 @@ def remove_deprecated_elements(deprecated, elements, version):
     for dep in deprecated:
         if (major >= dep['major']) \
                 or (major == dep['major'] and minor >= dep['minor']) \
-                or (major == dep['major'] and minor == dep['minor'] and
-                    revision >= dep['revision']):
+                or (major == dep['major'] and minor == dep['minor']
+                    and revision >= dep['revision']):
             if type(elements) is list:
                 for key in dep['keys']:
                     if key in elements:
@@ -762,8 +762,8 @@ def __init__(self):
         self.extra_dimensions = ''
 
     def sanatize_intervals(self):
-        """Sanitizes the index interval to be greater or equal to and divisible by
-        the collection interval
+        """Sanitizes the index interval to be greater or equal to and divisible
+        by the collection interval
         """
         # Sanitize the self.collection_interval and self.index_interval
         # ? self.index_interval > self.collection_interval:
@@ -772,19 +772,25 @@ def sanatize_intervals(self):
         # ? self.index_interval % self.collection_interval > 0:
         # round the self.index_interval up to a compatible value
         if self.index_interval % self.collection_interval > 0:
-            self.index_interval = self.index_interval + self.collection_interval - \
-                (self.index_interval % self.collection_interval)
-            log.warning('The Elasticsearch Index Interval must be \
-greater or equal to than and divisible by the collection Interval. The \
-Elasticsearch Index Interval has been rounded to: %s' % self.index_interval)
+            self.index_interval = self.index_interval + \
+                self.collection_interval - \
+                (self.index_interval %
+                 self.collection_interval)
+            log.warning(('The Elasticsearch Index Interval must be '
+                         'greater or equal to than and divisible by the '
+                         'collection Interval. The Elasticsearch Index '
+                         'Interval has been rounded to: %s') %
+                        self.index_interval)
 
         # ? self.index_interval < self.collection_interval :
         # Set self.index_interval = self.collection_interval
         elif self.index_interval < self.collection_interval:
             self.index_interval = self.collection_interval
-            log.warning('WARN: The Elasticsearch Index Interval must be greater \
-or equal to than and divisible by the collection Interval. The Elasticsearch \
-Index Interval has been rounded to: %s' % self.index_interval)
+            log.warning(('WARN: The Elasticsearch Index Interval must be '
+                         'greater or equal to than and divisible by the '
+                         'collection Interval. The Elasticsearch Index '
+                         'Interval has been rounded to: %s') %
+                        self.index_interval)
 
         # self.index_skip = self.index_interval / self.collection_interval
         self.index_skip = (self.index_interval / self.collection_interval)
@@ -795,29 +801,32 @@ def sanatize_intervals(self):
     def remove_deprecated_node_stats(self):
         """Remove deprecated node stats from the list of stats to collect"""
         self.node_stats_cur = remove_deprecated_elements(DEPRECATED_NODE_STATS,
-                                            self.node_stats_cur,
-                                            self.es_version)
+                                                         self.node_stats_cur,
+                                                         self.es_version)
 
     def remove_deprecated_threads(self):
         """Remove deprecated thread_pools from the list of stats to collect"""
         self.thread_pools = remove_deprecated_elements(DEPRECATED_THREAD_POOLS,
-                                            self.thread_pools,
-                                            self.es_version)
+                                                       self.thread_pools,
+                                                       self.es_version)
 
     # helper methods
     def init_stats(self):
         self.sanatize_intervals()
 
-        self.es_node_url = self.es_url_scheme + "://" + self.es_host + ":" + str(self.es_port) + \
-            "/_nodes/_local/stats/transport,http,process,jvm,indices,thread_pool"
+        self.es_node_url = self.es_url_scheme + "://" + self.es_host + ":" + \
+            str(self.es_port) + \
+            ("/_nodes/_local/stats/transport,http,process,jvm,indices,"
+             "thread_pool")
         self.node_stats_cur = dict(NODE_STATS.items())
         self.index_stats_cur = dict(INDEX_STATS.items())
         if not self.es_version.startswith("1."):
             self.node_stats_cur.update(NODE_STATS_ES_2)
 
         self.remove_deprecated_node_stats()
 
-        if self.es_version.startswith("1.1") or self.es_version.startswith("1.2"):
+        if self.es_version.startswith("1.1") \
+                or self.es_version.startswith("1.2"):
             self.index_stats_cur.update(INDEX_STATS_ES_1_1)
         else:
             # 1.3 and higher
@@ -830,41 +839,46 @@ def init_stats(self):
             self.es_index_url = self.es_url_scheme + "://" + self.es_host + \
                 ":" + str(self.es_port) + "/_all/_stats"
         else:
-            self.es_index_url = self.es_url_scheme + "://" + self.es_host + ":" + \
-                str(self.es_port) + "/" + ",".join(self.es_index) + "/_stats"
+            self.es_index_url = self.es_url_scheme + "://" + self.es_host + \
+                ":" + str(self.es_port) + "/" + \
+                ",".join(self.es_index) + "/_stats"
 
         # common thread pools for all ES versions
-        thread_pools = ['generic', 'index', 'get', 'snapshot', 'bulk', 'warmer',
-                        'flush', 'search', 'refresh']
+        thread_pools = ['generic', 'index', 'get', 'snapshot', 'bulk',
+                        'warmer', 'flush', 'search', 'refresh']
 
         # Add the 1.0 metrics
         if not self.es_version.startswith("0."):
             thread_pools.extend(['merge', 'optimize'])
 
         # Add the 2.0 metrics
         if not self.es_version.startswith("1."):
-            thread_pools.extend(['suggest', 'percolate', 'management', 'listener',
-                                 'fetch_shard_store', 'fetch_shard_started'])
+            thread_pools.extend(['suggest', 'percolate', 'management',
+                                 'listener', 'fetch_shard_store',
+                                 'fetch_shard_started'])
 
         # Add the 2.1 metrics
-        if not self.es_version.startswith("1.") and not self.es_version.startswith("2.0"):
+        if not self.es_version.startswith("1.") \
+                and not self.es_version.startswith("2.0"):
             thread_pools.extend(['force_merge'])
 
-        # Legacy support for old configurations without Thread Pools configuration
+        # Legacy support for old configurations without Thread Pools config
        if len(self.configured_thread_pools) == 0:
             self.thread_pools = list(self.configured_thread_pools)
         else:
             # Filter out the thread pools that aren't specified by user
-            self.thread_pools = filter(lambda pool: pool in self.configured_thread_pools,
-                                       thread_pools)
+            self.thread_pools = filter(lambda pool: pool in
+                                       self.configured_thread_pools,
+                                       thread_pools)
 
         self.remove_deprecated_threads()
 
         self.es_cluster_url = self.es_url_scheme + "://" + self.es_host + \
-                              ":" + str(self.es_port) + "/_cluster/health"
+            ":" + str(self.es_port) + "/_cluster/health"
 
         log.notice('Initialized with version=%s, host=%s, port=%s, url=%s' %
-                   (self.es_version, self.es_host, self.es_port, self.es_node_url))
+                   (self.es_version, self.es_host, self.es_port,
+                    self.es_node_url))
 
     # FUNCTION: Collect node stats from JSON result
     def lookup_node_stat(self, stat, json):
@@ -889,7 +903,8 @@ def fetch_stats(self):
             if self.es_cluster is None:
                 self.es_cluster = node_json_stats['cluster_name']
             else:
-                log.info('Configured with cluster_json_stats=%s' % self.es_cluster)
+                log.info('Configured with cluster_json_stats=%s' %
+                         self.es_cluster)
             log.info('Parsing node_json_stats')
             self.parse_node_stats(node_json_stats, self.node_stats_cur)
             log.info('Parsing thread pool stats')
@@ -919,8 +934,10 @@ def fetch_stats(self):
                 else:
                     indexes_json_stats = indices['indices']
                 for index_name in indexes_json_stats.keys():
-                    log.info('Parsing index stats for index: %s' % index_name)
-                    self.parse_index_stats(indexes_json_stats[index_name], index_name)
+                    log.info('Parsing index stats for index: %s' %
+                             index_name)
+                    self.parse_index_stats(indexes_json_stats[index_name],
+                                           index_name)
         # Increment skip count
         self.skip_count += 1

@@ -931,13 +948,16 @@ def fetch_url(self, url):
             request = urllib2.Request(url)
             if self.es_username:
                 authheader = base64.encodestring('%s:%s' %
-                                                 (self.es_username, self.es_password)
+                                                 (self.es_username,
+                                                  self.es_password)
                                                  ).replace('\n', '')
                 request.add_header("Authorization", "Basic %s" % authheader)
-            ctx = ssl._create_unverified_context()
-            response = urllib2.urlopen(request, context=ctx, timeout=10)
+            ctx = None
+            if self.es_url_scheme == "https":
+                ctx = ssl._create_unverified_context()
+                response = urllib2.urlopen(request, context=ctx, timeout=10)
+            else:
+                response = urllib2.urlopen(request, timeout=10)
             log.info('Raw api response: %s' % response)
             return json.load(response)
         except (urllib2.URLError, urllib2.HTTPError), e:
@@ -949,18 +969,19 @@ def fetch_url(self, url):
                 response.close()
 
     def load_es_info(self):
-        json = self.fetch_url(self.es_url_scheme + "://" + self.es_host + ":" + str(self.es_port) +
-                              "/_nodes/_local")
+        json = self.fetch_url(self.es_url_scheme + "://" + self.es_host + ":" +
+                              str(self.es_port) + "/_nodes/_local")
         if json is None:
             # assume some sane defaults
             if self.es_version is None:
                 self.es_version = "1.0.0"
             if self.es_cluster is None:
                 self.es_cluster = "elasticsearch"
             self.es_master_eligible = True
-            log.warning('Unable to determine node \
-information, defaulting to version %s, cluster %s and master %s' %
-                        (self.es_version, self.es_cluster, self.es_master_eligible))
+            log.warning('Unable to determine node information, defaulting to \
+version %s, cluster %s and master %s' %
+                        (self.es_version, self.es_cluster,
+                         self.es_master_eligible))
             return
 
         # Identify the current node
@@ -991,12 +1012,15 @@ def detect_es_master(self):
         """Determines if this is the current master. This method sets
         self.es_current_master"""
         # determine current master
-        cluster_state = self.fetch_url(self.es_url_scheme + "://" + self.es_host + ":" + str(self.es_port) +
-                                       "/_cluster/state/master_node")
-        if self.es_current_master is False and cluster_state['master_node'] == self.node_id:
+        cluster_state = self.fetch_url(self.es_url_scheme + "://" +
+                                       self.es_host + ":" + str(self.es_port) +
+                                       "/_cluster/state/master_node")
+        if self.es_current_master is False \
+                and cluster_state['master_node'] == self.node_id:
             self.es_current_master = True
             log.notice('current master: %s' % self.es_current_master)
-        elif self.es_current_master is True and cluster_state['master_node'] != self.node_id:
+        elif self.es_current_master is True \
+                and cluster_state['master_node'] != self.node_id:
             self.es_current_master = False
             log.notice('current master: %s' % self.es_current_master)
         else:
@@ -1027,7 +1051,8 @@ def parse_thread_pool_stats(self, json, stats):
             else:
                 result = None
 
-            self.dispatch_stat(result, name, key, {'thread_pool': pool})
+            self.dispatch_stat(result, name, key,
+                               {'thread_pool': pool})
 
     def parse_cluster_stats(self, json, stats):
         """Parse cluster stats response from ElasticSearch"""
@@ -1047,7 +1072,8 @@ def parse_index_stats(self, json, index_name):
             result = dig_it_up(json, key.path)
             # update the index name in the type_instance to include
             # the index as a dimensions
-            name = name.format(index_name=sanitize_type_instance(index_name))
+            name = name.format(
+                index_name=sanitize_type_instance(index_name))
             self.dispatch_stat(result, name, key)
 
     def dispatch_stat(self, result, name, key, dimensions=None):
@@ -1243,12 +1269,15 @@ def configure_test(cluster):
     """Configure the plugin for testing"""
 
     # Ensure all possible threadpools are eligible for collection
-    cluster.configured_thread_pools = set(['generic', 'index', 'get', 'snapshot',
-                                           'bulk', 'warmer', 'flush', 'search',
-                                           'refresh', 'suggest', 'percolate',
-                                           'management', 'listener',
-                                           'fetch_shard_store', 'fetch_shard_started',
-                                           'force_merge', 'merge', 'optimize', ])
+    cluster.configured_thread_pools = set(['generic', 'index', 'get',
+                                           'snapshot', 'bulk', 'warmer',
+                                           'flush', 'search', 'refresh',
+                                           'suggest', 'percolate',
+                                           'management', 'listener',
+                                           'fetch_shard_store',
+                                           'fetch_shard_started',
+                                           'force_merge', 'merge',
+                                           'optimize', ])
     cluster.detailed_metrics = True
     cluster.index_interval = 10
     cluster.enable_index_stats = True
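
A side note on the fetch_url hunk above: the Basic-auth header is assembled by hand rather than through an urllib2 opener. The same pattern as a self-contained sketch (Python 2; the URL and credentials are placeholders, not values from this repo):

    import base64
    import urllib2

    request = urllib2.Request('http://localhost:9200/_cluster/health')
    # encodestring() appends a trailing newline, hence the replace()
    authheader = base64.encodestring('%s:%s' %
                                     ('user', 'secret')).replace('\n', '')
    request.add_header("Authorization", "Basic %s" % authheader)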
2 changes: 1 addition & 1 deletion tests/run_tests.sh
@@ -13,7 +13,7 @@ trap 'rm -f $tmpfile' 1 2 3 15
 for scenario in `ls data`; do
     echo -n "testing against ES $scenario"
 
-    ${PYTHON} ./simulate.py data/${scenario} &> /dev/null &
+    ${PYTHON} ./simulate.py data/${scenario} &
     pid="$!"
     ${PYTHON} ../elasticsearch_collectd.py > $tmpfile
     if [ $? != 0 ]; then
