From 440885b0241e1b83119a31191566cf685e3366ba Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Thu, 10 Oct 2019 23:06:29 +0000 Subject: [PATCH] Allow removal of mongo docs after x secs --- mnemosyne.cfg.template | 1 + mnemosyne.run.j2 | 3 +++ mnemosyne/persistance/mnemodb.py | 44 +++++++++++++++++++++++++++++--- mnemosyne/runner.py | 7 ++++- requirements.txt | 2 +- 5 files changed, 51 insertions(+), 6 deletions(-) diff --git a/mnemosyne.cfg.template b/mnemosyne.cfg.template index 7826763..4882114 100644 --- a/mnemosyne.cfg.template +++ b/mnemosyne.cfg.template @@ -7,6 +7,7 @@ port = 8181 mongo_host = localhost mongo_port = 27017 database = mnemosyne +mongo_indexttl = [hpfriends] ident = diff --git a/mnemosyne.run.j2 b/mnemosyne.run.j2 index ecef567..502022c 100644 --- a/mnemosyne.run.j2 +++ b/mnemosyne.run.j2 @@ -45,6 +45,9 @@ then sed -i "s/channels *=.*/channels = ${CHANNELS}/" {{ mnemosyne_dir }}/mnemosyne.cfg sed -i "s/mongo_host *=.*/mongo_host = ${MONGODB_HOST}/" {{ mnemosyne_dir }}/mnemosyne.cfg sed -i "s/mongo_port *=.*/mongo_port = ${MONGODB_PORT}/" {{ mnemosyne_dir }}/mnemosyne.cfg + if [[ ! -z "${MONGODB_INDEXTTL}" ]]; then + sed -i "s/mongo_indexttl *=.*/mongo_indexttl = ${MONGODB_INDEXTTL}/" {{ mnemosyne_dir }}/mnemosyne.cfg + fi sed -i "s/ignore_rfc1918 *=.*/ignore_rfc1918 = ${IGNORE_RFC1918}/" {{ mnemosyne_dir }}/mnemosyne.cfg fi diff --git a/mnemosyne/persistance/mnemodb.py b/mnemosyne/persistance/mnemodb.py index dfaaf5b..82fbf25 100644 --- a/mnemosyne/persistance/mnemodb.py +++ b/mnemosyne/persistance/mnemodb.py @@ -32,14 +32,14 @@ class MnemoDB(object): - def __init__(self, host, port, database_name): + def __init__(self, host, port, database_name, indexttl=False): logger.info('Connecting to mongodb, using "{0}" as database.'.format(database_name)) conn = MongoClient(host=host, port=port, auto_start_request=False) self.rg = ReportGenerator(host=host, port=port, database_name=database_name) self.db = conn[database_name] - self.ensure_index() + self.ensure_index(indexttl) - def ensure_index(self): + def ensure_index(self, indexttl): self.db.hpfeed.ensure_index([('normalized', 1), ('last_error', 1)], unique=False, background=True) self.db.url.ensure_index('url', unique=True, background=True) self.db.url.ensure_index('extractions.hashes.md5', unique=False, background=True) @@ -54,7 +54,8 @@ def ensure_index(self): self.db.session.ensure_index('destination_ip', unique=False, background=True) self.db.session.ensure_index('source_port', unique=False, background=True) self.db.session.ensure_index('honeypot', unique=False, background=True) - self.db.session.ensure_index('timestamp', unique=False, background=True) + self.set_coll_indexttl('session', indexttl) + self.set_coll_indexttl('hpfeed', indexttl) self.db.session.ensure_index('identifier', unique=False, background=True) self.db.daily_stats.ensure_index([('channel', 1), ('date', 1)]) self.db.counts.ensure_index([('identifier', 1), ('date', 1)]) @@ -62,6 +63,41 @@ def ensure_index(self): self.db.metadata.ensure_index([('ip', 1), ('honeypot', 1)]) self.db.metadata.ensure_index('ip', unique=False, background=True) + def set_coll_indexttl(self, coll, indexttl): + """Sets the Index TTL (expireAfterSeconds property) on the timestamp field + of a collection. + Inputs: + coll (str): collection name + indexttl (int): number, in seconds, of how long after timestamp field value + before Mongo TTL worker removes the expired document + Outputs: none + """ + coll_info_timestamp = self.db[coll].index_information().get('timestamp_1') + if coll_info_timestamp: + coll_info_ttlsecs = coll_info_timestamp.get('expireAfterSeconds') + else: + coll_info_ttlsecs = None + # if expireAfterSeconds not set on Index and indexttl != False + if not coll_info_ttlsecs and indexttl != False: + if coll_info_timestamp: + self.db[coll].drop_index('timestamp_1') + self.db[coll].ensure_index('timestamp', unique=False, + background=True, expireAfterSeconds=indexttl) + # if expireAfterSeconds IS set but indexttl == False (indicating it no longer should be) + elif coll_info_ttlsecs and indexttl == False: + self.db[coll].drop_index('timestamp_1') + self.db[coll].ensure_index('timestamp', unique=False, background=True) + # if a user has changed the expireTTL value since last set + elif coll_info_ttlsecs and indexttl != False and indexttl != coll_info_ttlsecs: + self.db.command('collMod', coll, + index = { 'keyPattern': { 'timestamp': 1}, + 'background': True, + 'expireAfterSeconds': indexttl + }) + #self.db.session.ensure_index('timestamp', unique=False, background=True) + #self.db.hpfeed.ensure_index('timestamp', unique=False, background=True) + + def insert_normalized(self, ndata, hpfeed_id, identifier=None): assert isinstance(hpfeed_id, ObjectId) #ndata is a collection of dictionaries diff --git a/mnemosyne/runner.py b/mnemosyne/runner.py index 3b32d48..112f698 100644 --- a/mnemosyne/runner.py +++ b/mnemosyne/runner.py @@ -58,6 +58,11 @@ def parse_config(config_file): config['mongo_host'] = parser.get('mongodb', 'mongo_host') config['mongo_port'] = parser.getint('mongodb', 'mongo_port') config['mongo_db'] = parser.get('mongodb', 'database') + try: + config['mongo_indexttl'] = parser.getint('mongodb', 'mongo_indexttl') + except ValueError: + # if no value set or not an int, just set to False + config['mongo_indexttl'] = False config['hpf_feeds'] = parser.get('hpfriends', 'channels').split(',') config['hpf_ident'] = parser.get('hpfriends', 'ident') @@ -116,7 +121,7 @@ def do_logging(file_log=None, loggly_token=None): greenlets = {} db = mnemodb.MnemoDB(host=c['mongo_host'], port=c['mongo_port'], - database_name=c['mongo_db']) + database_name=c['mongo_db'], indexttl=c['mongo_indexttl']) webapi = None hpfriends_puller = None diff --git a/requirements.txt b/requirements.txt index 50e0b83..4377602 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -pymongo==2.7.2 +pymongo==2.8.1 gevent==1.0.2 bottle-mongodb==0.2.1 bottle-cork==0.6