Skip to content

Commit

Permalink
[mongo] Resolve deprecated collStats command in oplog size collection (
Browse files Browse the repository at this point in the history
…#19133)

* deprecate collStats in oplog size collection

* fix lint

* fix replication oplog size

* add changelog

* typos
  • Loading branch information
lu-zhengda authored and nubtron committed Nov 26, 2024
1 parent c11b6c8 commit 2b397fa
Show file tree
Hide file tree
Showing 5 changed files with 253 additions and 28 deletions.
1 change: 1 addition & 0 deletions mongo/changelog.d/19133.fixed
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Resolved deprecation warning for `collStats` by using `$collStats` aggregation pipeline to collect oplog size in MongoDB 6.2+.
35 changes: 28 additions & 7 deletions mongo/datadog_checks/mongo/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ def __init__(self, config, log, replicaset: str = None):
self._cli = MongoClient(**options)
self.__hostname = None

# Check if the server supports the $collStats aggregation pipeline stage.
self.coll_stats_pipeline_supported = True

def __getitem__(self, item):
return self._cli[item]

Expand Down Expand Up @@ -95,21 +98,39 @@ def current_op(self, session=None):
# The $currentOp stage returns a cursor over a stream of documents, each of which reports a single operation.
return self["admin"].aggregate([{'$currentOp': {'allUsers': True}}], session=session)

def coll_stats(self, db_name, coll_name, session=None):
def get_collection_stats(self, db_name, coll_name, stats=None, session=None):
if not self.coll_stats_pipeline_supported:
return [self.coll_stats_compatible(db_name, coll_name, session)]
try:
return self.coll_stats(db_name, coll_name, stats, session)
except OperationFailure as e:
if e.code == 13:
# Unauthorized to run $collStats, do not try use the compatible mode, raise the exception
raise e
# Failed to get collection stats using $collStats aggregation
self._log.debug(
"Failed to collect stats for collection %s with $collStats, fallback to collStats command",
coll_name,
e.details,
)
self.coll_stats_pipeline_supported = False
return [self.coll_stats_compatible(db_name, coll_name, session)]

def coll_stats(self, db_name, coll_name, stats=None, session=None):
if not stats:
stats = {"latencyStats", "storageStats", "queryExecStats"}
stats = {stat: {} for stat in stats}

return self[db_name][coll_name].aggregate(
[
{
"$collStats": {
"latencyStats": {},
"storageStats": {},
"queryExecStats": {},
}
"$collStats": stats,
},
],
session=session,
)

def coll_stats_compatable(self, db_name, coll_name, session=None):
def coll_stats_compatible(self, db_name, coll_name, session=None):
# collStats is deprecated in MongoDB 6.2. Use the $collStats aggregation stage instead.
return self[db_name].command({'collStats': coll_name}, session=session)

Expand Down
20 changes: 2 additions & 18 deletions mongo/datadog_checks/mongo/collectors/coll_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ def __init__(self, check, db_name, tags, coll_names=None):
self.coll_names = coll_names
self.db_name = db_name
self.max_collections_per_database = check._config.database_autodiscovery_config['max_collections_per_database']
self.coll_stats_pipeline_supported = True
self._collection_interval = check._config.metrics_collection_interval['collection']
self._collector_key = (self.__class__.__name__, db_name) # db_name is part of collector key

Expand All @@ -40,22 +39,7 @@ def __calculate_oplatency_avg(self, latency_stats):
return latency_stats

def _get_collection_stats(self, api, coll_name):
if not self.coll_stats_pipeline_supported:
return [api.coll_stats_compatable(self.db_name, coll_name)]
try:
return api.coll_stats(self.db_name, coll_name)
except OperationFailure as e:
if e.code == 13:
# Unauthorized to run $collStats, do not try use the compatible mode, raise the exception
raise e
# Failed to get collection stats using $collStats aggregation
self.log.debug(
"Failed not collect stats for collection %s with $collStats, fallback to collStats command",
coll_name,
e.details,
)
self.coll_stats_pipeline_supported = False
return [api.coll_stats_compatable(self.db_name, coll_name)]
return api.get_collection_stats(self.db_name, coll_name)

@collection_interval_checker
def collect(self, api):
Expand Down Expand Up @@ -83,7 +67,7 @@ def collect(self, api):
# If the collection is sharded, add the shard tag
additional_tags.append("shard:%s" % coll_stats['shard'])
# Submit the metrics
if self.coll_stats_pipeline_supported:
if api.coll_stats_pipeline_supported:
storage_stats = coll_stats.get('storageStats', {})
latency_stats = coll_stats.get('latencyStats', {})
query_stats = coll_stats.get('queryExecStats', {})
Expand Down
24 changes: 21 additions & 3 deletions mongo/datadog_checks/mongo/collectors/replication_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,24 @@ def compatible_with(self, deployment):

return True

def _get_oplog_size(self, api, oplog_collection_name):
try:
oplog_storage_stats = api.get_collection_stats("local", oplog_collection_name, stats=["storageStats"])[0]
except pymongo.errors.OperationFailure as e:
self.log.warning(
"Could not collect oplog used size for collection %s: %s", oplog_collection_name, e.details
)
return
except Exception as e:
self.log.error(
"Unexpected error when fetch oplog used size for collection %s: %s", oplog_collection_name, e
)
return

if api.coll_stats_pipeline_supported:
return oplog_storage_stats.get("storageStats", {}).get("size")
return oplog_storage_stats.get('size')

def collect(self, api):
# Fetch information analogous to Mongo's db.getReplicationInfo()
localdb = api["local"]
Expand All @@ -46,9 +64,9 @@ def collect(self, api):

oplog = localdb[collection_name]

oplog_data['usedSizeMB'] = round_value(
localdb.command("collstats", collection_name)['size'] / 2.0**20, 2
)
oplog_data_size = self._get_oplog_size(api, collection_name)
if oplog_data_size is not None:
oplog_data['usedSizeMB'] = round_value(oplog_data_size / 2.0**20, 2)

op_asc_cursor = oplog.find({"ts": {"$exists": 1}}).sort("$natural", pymongo.ASCENDING).limit(1)
op_dsc_cursor = oplog.find({"ts": {"$exists": 1}}).sort("$natural", pymongo.DESCENDING).limit(1)
Expand Down
201 changes: 201 additions & 0 deletions mongo/tests/fixtures/$collStats-oplog.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
[
{
"ns": "local.oplog.rs",
"host": "7515219d9503:27017",
"localTime": {
"$date": "2024-07-01T20:36:49.358Z"
},
"storageStats": {
"size": 907806,
"count": 4341,
"avgObjSize": 209,
"storageSize": 196608,
"capped": true,
"max": 10,
"maxSize": 16777216,
"sleepCount": 0,
"sleepMS": 0,
"wiredTiger": {
"metadata": {
"formatVersion": 1,
"oplogKeyExtractionVersion": 1
},
"creationString": "access_pattern_hint=none,allocation_size=4KB,app_metadata=(formatVersion=1,oplogKeyExtractionVersion=1),assert=(commit_timestamp=none,durable_timestamp=none,read_timestamp=none),block_allocation=best,block_compressor=snappy,cache_resident=false,checksum=on,colgroups=,collator=,columns=,dictionary=0,encryption=(keyid=,name=),exclusive=false,extractor=,format=btree,huffman_key=,huffman_value=,ignore_in_memory_cache_size=false,immutable=false,internal_item_max=0,internal_key_max=0,internal_key_truncate=true,internal_page_max=4KB,key_format=q,key_gap=10,leaf_item_max=0,leaf_key_max=0,leaf_page_max=32KB,leaf_value_max=64MB,log=(enabled=true),lsm=(auto_throttle=true,bloom=true,bloom_bit_count=16,bloom_config=,bloom_hash_count=8,bloom_oldest=false,chunk_count_limit=0,chunk_max=5GB,chunk_size=10MB,merge_custom=(prefix=,start_generation=0,suffix=),merge_max=15,merge_min=0),memory_page_image_max=0,memory_page_max=10m,os_cache_dirty_max=0,os_cache_max=0,prefix_compression=false,prefix_compression_min=4,source=,split_deepen_min_child=0,split_deepen_per_child=0,split_pct=90,type=file,value_format=u",
"type": "file",
"uri": "statistics:table:collection-8--9025131032214742726",
"LSM": {
"bloom filter false positives": 0,
"bloom filter hits": 0,
"bloom filter misses": 0,
"bloom filter pages evicted from cache": 0,
"bloom filter pages read into cache": 0,
"bloom filters in the LSM tree": 0,
"chunks in the LSM tree": 0,
"highest merge generation in the LSM tree": 0,
"queries that could have benefited from a Bloom filter that did not exist": 0,
"sleep for LSM checkpoint throttle": 0,
"sleep for LSM merge throttle": 0,
"total size of bloom filters": 0
},
"block-manager": {
"allocations requiring file extension": 56,
"blocks allocated": 577,
"blocks freed": 146,
"checkpoint size": 151552,
"file allocation unit size": 4096,
"file bytes available for reuse": 28672,
"file magic number": 120897,
"file major version number": 1,
"file size in bytes": 196608,
"minor version number": 0
},
"btree": {
"btree checkpoint generation": 143,
"column-store fixed-size leaf pages": 0,
"column-store internal pages": 0,
"column-store variable-size RLE encoded values": 0,
"column-store variable-size deleted values": 0,
"column-store variable-size leaf pages": 0,
"fixed-record size": 0,
"maximum internal page key size": 368,
"maximum internal page size": 4096,
"maximum leaf page key size": 2867,
"maximum leaf page size": 32768,
"maximum leaf page value size": 67108864,
"maximum tree depth": 3,
"number of key/value pairs": 0,
"overflow pages": 0,
"pages rewritten by compaction": 0,
"row-store empty values": 0,
"row-store internal pages": 0,
"row-store leaf pages": 0
},
"cache": {
"bytes currently in the cache": 1412447,
"bytes dirty in the cache cumulative": 79714225,
"bytes read into cache": 0,
"bytes written from cache": 14326498,
"checkpoint blocked page eviction": 0,
"data source pages selected for eviction unable to be evicted": 0,
"eviction walk passes of a file": 0,
"eviction walk target pages histogram - 0-9": 0,
"eviction walk target pages histogram - 10-31": 0,
"eviction walk target pages histogram - 128 and higher": 0,
"eviction walk target pages histogram - 32-63": 0,
"eviction walk target pages histogram - 64-128": 0,
"eviction walks abandoned": 0,
"eviction walks gave up because they restarted their walk twice": 0,
"eviction walks gave up because they saw too many pages and found no candidates": 0,
"eviction walks gave up because they saw too many pages and found too few candidates": 0,
"eviction walks reached end of tree": 0,
"eviction walks started from root of tree": 0,
"eviction walks started from saved location in tree": 0,
"hazard pointer blocked page eviction": 0,
"in-memory page passed criteria to be split": 0,
"in-memory page splits": 0,
"internal pages evicted": 0,
"internal pages split during eviction": 0,
"leaf pages split during eviction": 0,
"modified pages evicted": 0,
"overflow pages read into cache": 0,
"page split during eviction deepened the tree": 0,
"page written requiring cache overflow records": 0,
"pages read into cache": 0,
"pages read into cache after truncate": 1,
"pages read into cache after truncate in prepare state": 0,
"pages read into cache requiring cache overflow entries": 0,
"pages requested from the cache": 11348,
"pages seen by eviction walk": 0,
"pages written from cache": 296,
"pages written requiring in-memory restoration": 0,
"tracked dirty bytes in the cache": 1411928,
"unmodified pages evicted": 0
},
"cache_walk": {
"Average difference between current eviction generation when the page was last considered": 0,
"Average on-disk page image size seen": 0,
"Average time in cache for pages that have been visited by the eviction server": 0,
"Average time in cache for pages that have not been visited by the eviction server": 0,
"Clean pages currently in cache": 0,
"Current eviction generation": 0,
"Dirty pages currently in cache": 0,
"Entries in the root page": 0,
"Internal pages currently in cache": 0,
"Leaf pages currently in cache": 0,
"Maximum difference between current eviction generation when the page was last considered": 0,
"Maximum page size seen": 0,
"Minimum on-disk page image size seen": 0,
"Number of pages never visited by eviction server": 0,
"On-disk page image sizes smaller than a single allocation unit": 0,
"Pages created in memory and never written": 0,
"Pages currently queued for eviction": 0,
"Pages that could not be queued for eviction": 0,
"Refs skipped during cache traversal": 0,
"Size of the root page": 0,
"Total number of pages currently in cache": 0
},
"compression": {
"compressed page maximum internal page size prior to compression": 4096,
"compressed page maximum leaf page size prior to compression ": 131072,
"compressed pages read": 0,
"compressed pages written": 154,
"page written failed to compress": 0,
"page written was too small to compress": 142
},
"cursor": {
"bulk loaded cursor insert calls": 0,
"cache cursors reuse count": 2843,
"close calls that result in cache": 0,
"create calls": 62,
"insert calls": 4341,
"insert key and value bytes": 946875,
"modify": 0,
"modify key and value bytes affected": 0,
"modify value bytes modified": 0,
"next calls": 9366,
"open cursor count": 0,
"operation restarted": 1,
"prev calls": 20,
"remove calls": 0,
"remove key bytes removed": 0,
"reserve calls": 0,
"reset calls": 12395,
"search calls": 6969,
"search near calls": 14,
"truncate calls": 0,
"update calls": 0,
"update key and value bytes": 0,
"update value size change": 0
},
"reconciliation": {
"dictionary matches": 0,
"fast-path pages deleted": 0,
"internal page key bytes discarded using suffix compression": 1409,
"internal page multi-block writes": 0,
"internal-page overflow keys": 0,
"leaf page key bytes discarded using prefix compression": 0,
"leaf page multi-block writes": 120,
"leaf-page overflow keys": 0,
"maximum blocks required for a page": 1,
"overflow values written": 0,
"page checksum matches": 431,
"page reconciliation calls": 282,
"page reconciliation calls for eviction": 0,
"pages deleted": 0
},
"session": {
"object compaction": 0
},
"transaction": {
"update conflicts": 0
}
},
"nindexes": 0,
"indexDetails": {},
"indexBuilds": [],
"totalIndexSize": 0,
"indexSizes": {},
"scaleFactor": 1
}
}
]

0 comments on commit 2b397fa

Please sign in to comment.