Skip to content

Commit

Permalink
fix: Track all OTLP metadata bytes in usage tracker. (#12376)
Browse files Browse the repository at this point in the history
Co-authored-by: Vladyslav Diachenko <[email protected]>
  • Loading branch information
jeschkies and vlad-diachenko authored Mar 27, 2024
1 parent aca2a38 commit 016daa5
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 28 deletions.
8 changes: 8 additions & 0 deletions pkg/loghttp/push/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, ten
retentionPeriodForUser := tenantsRetention.RetentionPeriodFor(userID, lbs)

stats.StructuredMetadataBytes[retentionPeriodForUser] += int64(resourceAttributesAsStructuredMetadataSize)
if tracker != nil {
tracker.ReceivedBytesAdd(ctx, userID, retentionPeriodForUser, lbs, float64(resourceAttributesAsStructuredMetadataSize))
}

stats.ResourceAndSourceMetadataLabels[retentionPeriodForUser] = append(stats.ResourceAndSourceMetadataLabels[retentionPeriodForUser], resourceAttributesAsStructuredMetadata...)

for j := 0; j < sls.Len(); j++ {
Expand Down Expand Up @@ -202,6 +206,10 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, ten

scopeAttributesAsStructuredMetadataSize := labelsSize(scopeAttributesAsStructuredMetadata)
stats.StructuredMetadataBytes[retentionPeriodForUser] += int64(scopeAttributesAsStructuredMetadataSize)
if tracker != nil {
tracker.ReceivedBytesAdd(ctx, userID, retentionPeriodForUser, lbs, float64(scopeAttributesAsStructuredMetadataSize))
}

stats.ResourceAndSourceMetadataLabels[retentionPeriodForUser] = append(stats.ResourceAndSourceMetadataLabels[retentionPeriodForUser], scopeAttributesAsStructuredMetadata...)
for k := 0; k < logs.Len(); k++ {
log := logs.At(k)
Expand Down
39 changes: 11 additions & 28 deletions pkg/loghttp/push/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
expectedPushRequest logproto.PushRequest
expectedStats Stats
otlpConfig OTLPConfig
tracker UsageTracker
}{
{
name: "no logs",
Expand Down Expand Up @@ -129,7 +128,6 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
{
name: "service.name not defined in resource attributes",
otlpConfig: DefaultOTLPConfig(defaultGlobalOTLPConfig),
tracker: NewMockTracker(),
generateLogs: func() plog.Logs {
ld := plog.NewLogs()
ld.ResourceLogs().AppendEmpty().Resource().Attributes().PutStr("service.namespace", "foo")
Expand Down Expand Up @@ -164,32 +162,7 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
},
StreamLabelsSize: 47,
MostRecentEntryTimestamp: now,
/*
logLinesBytesCustomTrackers: []customTrackerPair{
{
Labels: []labels.Label{
{Name: "service_namespace", Value: "foo"},
{Name: "tracker", Value: "foo"},
},
Bytes: map[time.Duration]int64{
time.Hour: 9,
},
},
},
structuredMetadataBytesCustomTrackers: []customTrackerPair{
{
Labels: []labels.Label{
{Name: "service_namespace", Value: "foo"},
{Name: "tracker", Value: "foo"},
},
Bytes: map[time.Duration]int64{
time.Hour: 0,
},
},
},
*/
},
//expectedTrackedUsaged:
},
{
name: "resource attributes and scope attributes stored as structured metadata",
Expand Down Expand Up @@ -518,9 +491,19 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
} {
t.Run(tc.name, func(t *testing.T) {
stats := newPushStats()
pushReq := otlpToLokiPushRequest(context.Background(), tc.generateLogs(), "foo", fakeRetention{}, tc.otlpConfig, tc.tracker, stats)
tracker := NewMockTracker()
pushReq := otlpToLokiPushRequest(context.Background(), tc.generateLogs(), "foo", fakeRetention{}, tc.otlpConfig, tracker, stats)
require.Equal(t, tc.expectedPushRequest, *pushReq)
require.Equal(t, tc.expectedStats, *stats)

totalBytes := 0.0
for _, b := range stats.LogLinesBytes {
totalBytes += float64(b)
}
for _, b := range stats.StructuredMetadataBytes {
totalBytes += float64(b)
}
require.Equal(t, totalBytes, tracker.Total(), "Total tracked bytes must equal total bytes of the stats.")
})
}
}
Expand Down
9 changes: 9 additions & 0 deletions pkg/loghttp/push/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ func TestParseRequest(t *testing.T) {
assert.NotNil(t, data, "Should give data for %d", index)
require.Equal(t, test.expectedStructuredMetadataBytes, structuredMetadataBytesReceived)
require.Equal(t, test.expectedBytes, bytesReceived)
require.Equalf(t, tracker.Total(), float64(bytesReceived), "tracked usage bytes must equal bytes received metric")
require.Equal(t, test.expectedLines, linesReceived)
require.Equal(t, float64(test.expectedStructuredMetadataBytes), testutil.ToFloat64(structuredMetadataBytesIngested.WithLabelValues("fake", "")))
require.Equal(t, float64(test.expectedBytes), testutil.ToFloat64(bytesIngested.WithLabelValues("fake", "")))
Expand Down Expand Up @@ -257,6 +258,14 @@ func NewMockTracker() *MockCustomTracker {
}
}

func (t *MockCustomTracker) Total() float64 {
total := float64(0)
for _, v := range t.receivedBytes {
total += v
}
return total
}

// DiscardedBytesAdd implements CustomTracker.
func (t *MockCustomTracker) DiscardedBytesAdd(_ context.Context, _, _ string, labels labels.Labels, value float64) {
t.discardedBytes[labels.String()] += value
Expand Down

0 comments on commit 016daa5

Please sign in to comment.