Skip to content

Commit

Permalink
[receiver/mongodbreceiver] Add metric version checks to prevent parti…
Browse files Browse the repository at this point in the history
…al errors (open-telemetry#13329)

* add extents version check

Mongo version 4.4+ no longer returns numExtents since it is part of the obsolete MMAPv1 storage engine. See https://www.mongodb.com/docs/manual/release-notes/4.4-compatibility/#mmapv1-cleanup for more details.

* add version check for session count

MongoDB 2.6 is receiving a partial error because the 'storageEngine', 'name' path does not exist. This is a wiredTiger storage engine metric and is only available in 3.0+. See https://www.mongodb.com/docs/v3.0/reference/command/serverStatus/#serverStatus.wiredTiger.session

* add cache operations version check

MongoDB 2.6 is receiving a partial error because the 'storageEngine', 'name' path does not exist. This is a wiredTiger storage engine metric and is only available in 3.0+. See https://www.mongodb.com/docs/v4.0/reference/command/serverStatus/#serverstatus.wiredTiger.cache

* add version check for connection counts 'active' attribute

MongoDB versions below 4.0 were getting partial errors because the connection count 'active' attribute is only available in 4.0+. See https://www.mongodb.com/docs/v4.0/reference/command/serverStatus/#serverstatus.connections.active

* add global lock time metric collector for 3.0

Previously, the receiver had a 4.0 version check on the global lock time metric. This check is not needed, as MongoDB supports this metric path on 2.6, 3.0, 4.0 and 5.0. Before this change, 3.0 was not collecting this metric at all, and 2.6 was using a less direct approach to collect it even though the metric can be gathered the same way as on all other versions. This is supported by https://www.mongodb.com/docs/v2.6/reference/command/serverStatus/#serverStatus.globalLock.totalTime and https://www.mongodb.com/docs/v3.0/reference/command/serverStatus/#serverStatus.globalLock.totalTime

* add changelog

* fix readme
  • Loading branch information
JonathanWamsley authored Aug 15, 2022
1 parent 5392aed commit 350d653
Show file tree
Hide file tree
Showing 5 changed files with 202 additions and 198 deletions.
8 changes: 7 additions & 1 deletion receiver/mongodbreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ This receiver supports MongoDB versions:

MongoDB recommends setting up a least privilege user (LPU) with a [`clusterMonitor` role](https://www.mongodb.com/docs/v5.0/reference/built-in-roles/#mongodb-authrole-clusterMonitor) in order to collect metrics. Please refer to [lpu.sh](./testdata/integration/scripts/lpu.sh) for an example of how to configure these permissions.

Collecting metrics `mongodb.global_lock.time` and `mongodb.index.access.count` are only available for mongodb 4.0+.

## Configuration

Expand Down Expand Up @@ -61,6 +60,13 @@ The full list of settings exposed for this receiver are documented [here](./conf
## Metrics
The following metrics are only available with these versions:
- `mongodb.extent.count` < 4.4 with mmapv1 storage engine
- `mongodb.session.count` >= 3.0 with wiredTiger storage engine
- `mongodb.cache.operations` >= 3.0 with wiredTiger storage engine
- `mongodb.connection.count` with attribute `active` is available >= 4.0
- `mongodb.index.access.count` >= 4.0

Details about the metrics produced by this receiver can be found in [metadata.yaml](./metadata.yaml)

[beta]:https://github.com/open-telemetry/opentelemetry-collector#beta
Expand Down
90 changes: 45 additions & 45 deletions receiver/mongodbreceiver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,23 +135,34 @@ func (s *mongodbScraper) recordIndexSize(now pcommon.Timestamp, doc bson.M, dbNa
}

func (s *mongodbScraper) recordExtentCount(now pcommon.Timestamp, doc bson.M, dbName string, errs *scrapererror.ScrapeErrors) {
extentsPath := []string{"numExtents"}
extents, err := dig(doc, extentsPath)
if err != nil {
errs.AddPartial(1, err)
return
}
extentsVal, err := parseInt(extents)
if err != nil {
errs.AddPartial(1, err)
return
// Mongo version 4.4+ no longer returns numExtents since it is part of the obsolete MMAPv1
// https://www.mongodb.com/docs/manual/release-notes/4.4-compatibility/#mmapv1-cleanup
mongo44, _ := version.NewVersion("4.4")
if s.mongoVersion.LessThan(mongo44) {
extentsPath := []string{"numExtents"}
extents, err := dig(doc, extentsPath)
if err != nil {
errs.AddPartial(1, err)
return
}
extentsVal, err := parseInt(extents)
if err != nil {
errs.AddPartial(1, err)
return
}
s.mb.RecordMongodbExtentCountDataPoint(now, extentsVal, dbName)
}
s.mb.RecordMongodbExtentCountDataPoint(now, extentsVal, dbName)
}

// ServerStatus
func (s *mongodbScraper) recordConnections(now pcommon.Timestamp, doc bson.M, dbName string, errs *scrapererror.ScrapeErrors) {
mongo40, _ := version.NewVersion("4.0")
for ctVal, ct := range metadata.MapAttributeConnectionType {
// Mongo version 4.0 added connections.active
// reference: https://www.mongodb.com/docs/v4.0/reference/command/serverStatus/#serverstatus.connections.active
if s.mongoVersion.LessThan(mongo40) && ctVal == "active" {
continue
}
connKey := []string{"connections", ctVal}
conn, err := dig(doc, connKey)
if err != nil {
Expand Down Expand Up @@ -206,7 +217,13 @@ func (s *mongodbScraper) recordDocumentOperations(now pcommon.Timestamp, doc bso
}

func (s *mongodbScraper) recordSessionCount(now pcommon.Timestamp, doc bson.M, errs *scrapererror.ScrapeErrors) {
// Collect session count
// Collect session count for version 3.0+
// https://www.mongodb.com/docs/v3.0/reference/command/serverStatus/#serverStatus.wiredTiger.session
mongo30, _ := version.NewVersion("3.0")
if s.mongoVersion.LessThan(mongo30) {
return
}

storageEngine, err := dig(doc, []string{"storageEngine", "name"})
if err != nil {
s.logger.Error("failed to find storage engine for session count", zap.Error(err))
Expand Down Expand Up @@ -254,6 +271,13 @@ func (s *mongodbScraper) recordOperations(now pcommon.Timestamp, doc bson.M, err

func (s *mongodbScraper) recordCacheOperations(now pcommon.Timestamp, doc bson.M, errs *scrapererror.ScrapeErrors) {
// Collect Cache Hits & Misses if wiredTiger storage engine is used
// WiredTiger.cache metrics are available in 3.0+
// https://www.mongodb.com/docs/v4.0/reference/command/serverStatus/#serverstatus.wiredTiger.cache
mongo30, _ := version.NewVersion("3.0")
if s.mongoVersion.LessThan(mongo30) {
return
}

storageEngine, err := dig(doc, []string{"storageEngine", "name"})
if err != nil {
s.logger.Error("failed to find storage engine for cache operation", zap.Error(err))
Expand Down Expand Up @@ -301,43 +325,19 @@ func (s *mongodbScraper) recordCacheOperations(now pcommon.Timestamp, doc bson.M
}

func (s *mongodbScraper) recordGlobalLockTime(now pcommon.Timestamp, doc bson.M, errs *scrapererror.ScrapeErrors) {
var heldTimeUs int64

// Mongo version greater than or equal to 4.0 have it in the serverStats at "globalLock", "totalTime"
// reference: https://docs.mongodb.com/v4.0/reference/command/serverStatus/#server-status-global-lock
mongo40, _ := version.NewVersion("4.0")
if s.mongoVersion.GreaterThanOrEqual(mongo40) {
val, err := dig(doc, []string{"globalLock", "totalTime"})
if err != nil {
errs.AddPartial(1, err)
return
}
parsedVal, err := parseInt(val)
if err != nil {
errs.AddPartial(1, err)
return
}
heldTimeUs = parsedVal
} else {
for _, lockType := range []string{"W", "R", "r", "w"} {
waitTime, err := dig(doc, []string{"locks", ".", "timeAcquiringMicros", lockType})
if err != nil {
continue
}
waitTimeVal, err := parseInt(waitTime)
if err != nil {
errs.AddPartial(1, err)
}
heldTimeUs += waitTimeVal
}
val, err := dig(doc, []string{"globalLock", "totalTime"})
if err != nil {
errs.AddPartial(1, err)
return
}
if heldTimeUs != 0 {
htMilliseconds := heldTimeUs / 1000
s.mb.RecordMongodbGlobalLockTimeDataPoint(now, htMilliseconds)
parsedVal, err := parseInt(val)
if err != nil {
errs.AddPartial(1, err)
return
}

errs.AddPartial(1, fmt.Errorf("was unable to calculate global lock time"))
heldTimeMilliseconds := parsedVal / 1000
s.mb.RecordMongodbGlobalLockTimeDataPoint(now, heldTimeMilliseconds)
}

func (s *mongodbScraper) recordCursorCount(now pcommon.Timestamp, doc bson.M, errs *scrapererror.ScrapeErrors) {
Expand Down
34 changes: 0 additions & 34 deletions receiver/mongodbreceiver/scraper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,8 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo/integration/mtest"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/receiver/scrapererror"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/scrapertest"
Expand Down Expand Up @@ -118,37 +115,6 @@ func TestScrapeNoClient(t *testing.T) {
require.Error(t, err)
}

// TestGlobalLockTimeOldFormat calls recordGlobalLockTime on a scraper whose
// mongoVersion is set to 2.6, passing a document that only contains per-lock
// timings under "locks" -> "." (there is no "globalLock" key in the document).
// It expects the emitted data point to equal the sum of the
// "timeAcquiringMicros" values for "R" and "W", divided by 1000.
func TestGlobalLockTimeOldFormat(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Metrics = metadata.DefaultMetricsSettings()
scraper := newMongodbScraper(componenttest.NewNopReceiverCreateSettings(), cfg)
// Set the scraper's version to 2.6 before recording.
mong26, err := version.NewVersion("2.6")
require.NoError(t, err)
scraper.mongoVersion = mong26
// Only "timeAcquiringMicros" feeds the expected value computed below;
// "timeLockedMicros" is present in the document but is not part of it.
doc := primitive.M{
"locks": primitive.M{
".": primitive.M{
"timeLockedMicros": primitive.M{
"R": 122169,
"W": 132712,
},
"timeAcquiringMicros": primitive.M{
"R": 116749,
"W": 14340,
},
},
},
}

now := pcommon.NewTimestampFromTime(time.Now())
// Errors accumulated in the ScrapeErrors value are not inspected by this test.
scraper.recordGlobalLockTime(now, doc, &scrapererror.ScrapeErrors{})
// Sum of the two acquiring times, integer-divided by 1000.
expectedValue := (int64(116749+14340) / 1000)

// Read the first data point of the first emitted metric as an integer sum value.
metrics := scraper.mb.Emit().ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics()
collectedValue := metrics.At(0).Sum().DataPoints().At(0).IntVal()
require.Equal(t, expectedValue, collectedValue)
}

func TestTopMetricsAggregation(t *testing.T) {
mont := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock))
defer mont.Close()
Expand Down
Loading

0 comments on commit 350d653

Please sign in to comment.