-
Notifications
You must be signed in to change notification settings - Fork 490
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Create new metrics earlier #888
base: master
Are you sure you want to change the base?
Changes from 4 commits
1026ef0
1adf10c
9de4246
b9dbd1b
9653e7a
896b0a6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,6 @@ | ||
class CarbonConfigException(Exception): | ||
"""Raised when a carbon daemon is improperly configured""" | ||
|
||
class CarbonCreatesLimiterException(Exception): | ||
"""Raised when limitor is hit""" | ||
pass |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,6 +21,7 @@ | |
from carbon.conf import settings | ||
from carbon import log, instrumentation | ||
from carbon.util import TokenBucket | ||
from carbon.exceptions import CarbonCreatesLimiterException | ||
|
||
from twisted.internet import reactor | ||
from twisted.internet.task import LoopingCall | ||
|
@@ -31,6 +32,11 @@ | |
except ImportError: | ||
log.msg("Couldn't import signal module") | ||
|
||
# Python 2 backwards compatibility | ||
try: | ||
FileNotFoundError | ||
except NameError: | ||
FileNotFoundError = IOError | ||
|
||
SCHEMAS = loadStorageSchemas() | ||
AGGREGATION_SCHEMAS = loadAggregationSchemas() | ||
|
@@ -90,64 +96,70 @@ def getbatch(self, maxsize=1): | |
tagQueue = TagQueue(maxsize=settings.TAG_QUEUE_SIZE, update_interval=settings.TAG_UPDATE_INTERVAL) | ||
|
||
|
||
def writeCachedDataPoints(): | ||
"Write datapoints until the MetricCache is completely empty" | ||
|
||
cache = MetricCache() | ||
while cache: | ||
(metric, datapoints) = cache.drain_metric() | ||
if metric is None: | ||
# end the loop | ||
break | ||
|
||
dbFileExists = state.database.exists(metric) | ||
|
||
if not dbFileExists: | ||
if CREATE_BUCKET and not CREATE_BUCKET.drain(1): | ||
# If our tokenbucket doesn't have enough tokens available to create a new metric | ||
# file then we'll just drop the metric on the ground and move on to the next | ||
# metric. | ||
# XXX This behavior should probably be configurable to no tdrop metrics | ||
# when rate limitng unless our cache is too big or some other legit | ||
# reason. | ||
instrumentation.increment('droppedCreates') | ||
continue | ||
def create_database(metric): | ||
|
||
archiveConfig = None | ||
xFilesFactor, aggregationMethod = None, None | ||
archiveConfig = None | ||
xFilesFactor, aggregationMethod = None, None | ||
|
||
for schema in SCHEMAS: | ||
for schema in SCHEMAS: | ||
if schema.matches(metric): | ||
if settings.LOG_CREATES: | ||
log.creates('new metric %s matched schema %s' % (metric, schema.name)) | ||
archiveConfig = [archive.getTuple() for archive in schema.archives] | ||
break | ||
if settings.LOG_CREATES: | ||
log.creates('new metric %s matched schema %s' % (metric, schema.name)) | ||
archiveConfig = [archive.getTuple() for archive in schema.archives] | ||
break | ||
|
||
for schema in AGGREGATION_SCHEMAS: | ||
for schema in AGGREGATION_SCHEMAS: | ||
if schema.matches(metric): | ||
if settings.LOG_CREATES: | ||
log.creates('new metric %s matched aggregation schema %s' | ||
% (metric, schema.name)) | ||
xFilesFactor, aggregationMethod = schema.archives | ||
break | ||
if settings.LOG_CREATES: | ||
log.creates('new metric %s matched aggregation schema %s' | ||
% (metric, schema.name)) | ||
xFilesFactor, aggregationMethod = schema.archives | ||
break | ||
|
||
if not archiveConfig: | ||
if not archiveConfig: | ||
raise Exception(("No storage schema matched the metric '%s'," | ||
" check your storage-schemas.conf file.") % metric) | ||
|
||
if settings.LOG_CREATES: | ||
if settings.LOG_CREATES: | ||
log.creates("creating database metric %s (archive=%s xff=%s agg=%s)" % | ||
(metric, archiveConfig, xFilesFactor, aggregationMethod)) | ||
try: | ||
|
||
try: | ||
state.database.create(metric, archiveConfig, xFilesFactor, aggregationMethod) | ||
if settings.ENABLE_TAGS: | ||
tagQueue.add(metric) | ||
tagQueue.add(metric) | ||
instrumentation.increment('creates') | ||
except Exception as e: | ||
except Exception as e: | ||
log.err() | ||
log.msg("Error creating %s: %s" % (metric, e)) | ||
instrumentation.increment('errors') | ||
continue | ||
|
||
|
||
def writeCachedDataPoints(): | ||
"""Write datapoints until the MetricCache is completely empty""" | ||
|
||
cache = MetricCache() | ||
while cache: | ||
|
||
# First check if there are new metrics | ||
for new_metric in cache.new_metrics: | ||
if not state.database.exists(new_metric): | ||
if CREATE_BUCKET and not CREATE_BUCKET.drain(1): | ||
# If our tokenbucket doesn't have enough tokens available to create a new metric | ||
# file then we'll just drop the metric on the ground and move on to the next | ||
# metric. | ||
# XXX This behavior should probably be configurable to not drop metrics | ||
# when rate limiting unless our cache is too big or some other legit | ||
# reason. | ||
instrumentation.increment('droppedCreates') | ||
break | ||
|
||
create_database(new_metric) | ||
|
||
(metric, datapoints) = cache.drain_metric() | ||
if metric is None: | ||
# end the loop | ||
break | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This will only create new files as long as there are also updates to process, is that what we want? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure if I am following, new files are created first, on every loop, break is later. Notice that the diff is kinda screwed, the part with the break is old code. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. imagine if there is more to get from but that seems impossible, there can't be more new metrics than total cached metrics, right ... but I understand how one might be unsure, when looking at this code in isolation. Here's another question: is it possible for I have an alternate design idea: keep the original logic here in this loop, and instead modify There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Yup, that should be impossible.
Yes, that is possible when the MAX_CREATES_PER_MINUTE limiter is hit. This is on purpose, currently the metrics get dropped as well, see the old XXX TODO part.
That is exactly what the naive strategy does. That one is not efficient as it does not write the metric with most datapoints resulting in small writes. The cache doesn't know if a metric is really new (as not on disk) it can just see that it is not in the cache dict. I didn't want to add IO stuff to the cache thread to check if it is on disk, no particular reason it just felt bad to mix it. Maybe it would be better? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm you're exactly right ... my idea was equivalent to the existing naive strategy ... That is an intriguing idea, doing the I don't have any better ideas, maybe your current method will work well in practice :) |
||
|
||
# If we've got a rate limit configured lets makes sure we enforce it | ||
waitTime = 0 | ||
|
@@ -166,6 +178,10 @@ def writeCachedDataPoints(): | |
if settings.ENABLE_TAGS: | ||
tagQueue.update(metric) | ||
updateTime = time.time() - t1 | ||
except FileNotFoundError: # don't log full stack trace when the db does not exist. | ||
log.msg("Error writing %s: File does not exist (yet). " % metric + | ||
"Increase MAX_CREATES_PER_MINUTE") | ||
instrumentation.increment('errors') | ||
except Exception as e: | ||
log.err() | ||
log.msg("Error writing to %s: %s" % (metric, e)) | ||
|
@@ -187,8 +203,8 @@ def writeForever(): | |
while reactor.running: | ||
try: | ||
writeCachedDataPoints() | ||
except Exception: | ||
log.err() | ||
except Exception as e: | ||
log.err(e) | ||
# Back-off on error to give the backend time to recover. | ||
time.sleep(0.1) | ||
else: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,6 @@ | ||
[tox] | ||
envlist = | ||
py{27,35,36,37,38,py}{,-pyhash}, | ||
py{27,35,36,37,38,py,py3}{,-pyhash}, | ||
lint, | ||
benchmark | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks like these methods are not used? the writer currently just does: