From fb4ee6707a28a627c62acc2ae4231986e162e69f Mon Sep 17 00:00:00 2001 From: Quentin Schroeder Date: Wed, 7 Oct 2015 11:15:05 -0400 Subject: [PATCH] add emptyCheckInterval option to improve performance In the case where a replicaset doesn't have any docs matching the target namespaces (which happens with tag aware sharding), we don't want to keep generating (empty) oplog cursors on that replica set. This option uses a cheaper `db.collection.count()` call to see if a replica set has any matching docs and waits a configurable amount of time before checking again. --- config.json | 1 + mongo_connector/connector.py | 14 ++++++++++++++ mongo_connector/oplog_manager.py | 18 ++++++++++++++++++ 3 files changed, 33 insertions(+) diff --git a/config.json b/config.json index 7121113a..593f5f84 100644 --- a/config.json +++ b/config.json @@ -8,6 +8,7 @@ "batchSize": -1, "verbosity": 0, "continueOnError": false, + "emptyCheckInterval": 60, "logging": { "type": "file", diff --git a/mongo_connector/connector.py b/mongo_connector/connector.py index 4cdc22f7..f33c6079 100644 --- a/mongo_connector/connector.py +++ b/mongo_connector/connector.py @@ -151,6 +151,7 @@ def from_config(cls, config): collection_dump=(not config['noDump']), batch_size=config['batchSize'], continue_on_error=config['continueOnError'], + empty_check_interval=config['emptyCheckInterval'], auth_username=config['authentication.adminUsername'], auth_key=auth_key, fields=config['fields'], @@ -970,6 +971,19 @@ def apply_ssl(option, cli_values): " set of documents due to errors may cause undefined" " behavior. Use this flag to dump only.") + empty_check_interval = add_option( + config_key="emptyCheckInterval", + default=60, + type=int) + + empty_check_interval.add_cli( + "--empty-check-interval", dest="empty_check_interval", + help="Before a collection dump, each OplogThread" + " will confirm that the connected replicaset has" + " documents in its collections that need to be" + " upserted. If none are found, it will wait this" + " many seconds before checking again.") + config_file = add_option() config_file.add_cli( "-c", "--config-file", dest="config_file", help= diff --git a/mongo_connector/oplog_manager.py b/mongo_connector/oplog_manager.py index c3fc5426..19e4a7e3 100755 --- a/mongo_connector/oplog_manager.py +++ b/mongo_connector/oplog_manager.py @@ -80,6 +80,9 @@ def __init__(self, primary_client, doc_managers, # Whether the collection dump gracefully handles exceptions self.continue_on_error = kwargs.get('continue_on_error', False) + # How often to recheck a replicaset with zero matching docs + self.empty_check_interval = kwargs.get('empty_check_interval', 60) + # Set of fields to export self.fields = kwargs.get('fields', []) @@ -412,6 +415,21 @@ def dump_collection(self): namespace = "%s.%s" % (database, coll) dump_set.append(namespace) + def has_docs_to_dump(): + for namespace in dump_set: + database, coll = namespace.split('.', 1) + target_coll = self.primary_client[database][coll] + if util.retry_until_ok(target_coll.count) > 0: + return True + return False + + while not has_docs_to_dump(): + LOG.warning("No docs matching namespace %s on %s," + " sleeping %d seconds" % ( + dump_set, self.primary_client, + self.empty_check_interval)) + time.sleep(self.empty_check_interval) + timestamp = util.retry_until_ok(self.get_last_oplog_timestamp) if timestamp is None: return None