Skip to content

Commit

Permalink
feat: Add sampling store support to Badger (#4834)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?
Related  #3305

## Description of the changes
-   Implemented badger db for sampling store

## How was this change tested?
- Added Unit test and also tested it with the already Implemented
integration test

## Checklist
- [x] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [x] I have signed all commits
- [x] I have added unit tests for the new functionality
- [x] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `yarn lint` and `yarn test`

---------

Signed-off-by: slayer321 <[email protected]>
slayer321 authored Oct 26, 2023

Verified

This commit was signed with the committer’s verified signature.
hyrsky Santeri Hurnanen
1 parent 0e3be02 commit f99eae5
Showing 5 changed files with 413 additions and 0 deletions.
7 changes: 7 additions & 0 deletions plugin/storage/badger/factory.go
Original file line number Diff line number Diff line change
@@ -29,8 +29,10 @@ import (
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/plugin"
depStore "github.com/jaegertracing/jaeger/plugin/storage/badger/dependencystore"
badgerSampling "github.com/jaegertracing/jaeger/plugin/storage/badger/samplingstore"
badgerStore "github.com/jaegertracing/jaeger/plugin/storage/badger/spanstore"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/samplingstore"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

@@ -170,6 +172,11 @@ func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {
return depStore.NewDependencyStore(sr), nil
}

// CreateSamplingStore implements storage.SamplingStoreFactory
func (f *Factory) CreateSamplingStore(maxBuckets int) (samplingstore.Store, error) {
return badgerSampling.NewSamplingStore(f.store), nil
}

// Close Implements io.Closer and closes the underlying storage
func (f *Factory) Close() error {
close(f.maintenanceDone)
262 changes: 262 additions & 0 deletions plugin/storage/badger/samplingstore/storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,262 @@
// Copyright (c) 2023 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package samplingstore

import (
"bytes"
"encoding/binary"
"encoding/json"
"fmt"
"time"

"github.com/dgraph-io/badger/v3"

"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/model"
jaegermodel "github.com/jaegertracing/jaeger/model"
)

const (
throughputKeyPrefix byte = 0x08
probabilitiesKeyPrefix byte = 0x09
)

type SamplingStore struct {
store *badger.DB
}

type ProbabilitiesAndQPS struct {
Hostname string
Probabilities model.ServiceOperationProbabilities
QPS model.ServiceOperationQPS
}

func NewSamplingStore(db *badger.DB) *SamplingStore {
return &SamplingStore{
store: db,
}
}

func (s *SamplingStore) InsertThroughput(throughput []*model.Throughput) error {
startTime := jaegermodel.TimeAsEpochMicroseconds(time.Now())
entriesToStore := make([]*badger.Entry, 0)
entries, err := s.createThroughputEntry(throughput, startTime)
if err != nil {
return err
}
entriesToStore = append(entriesToStore, entries)
err = s.store.Update(func(txn *badger.Txn) error {
for i := range entriesToStore {
err = txn.SetEntry(entriesToStore[i])
if err != nil {
return err
}
}

return nil
})

return nil
}

func (s *SamplingStore) GetThroughput(start, end time.Time) ([]*model.Throughput, error) {
var retSlice []*model.Throughput
prefix := []byte{throughputKeyPrefix}

err := s.store.View(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
it := txn.NewIterator(opts)
defer it.Close()

val := []byte{}
for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
item := it.Item()
k := item.Key()
startTime := k[1:9]
fmt.Printf("key=%s\n", k)
val, err := item.ValueCopy(val)
if err != nil {
return err
}
t, err := initalStartTime(startTime)
if err != nil {
return err
}
throughputs, err := decodeThroughtputValue(val)
if err != nil {
return err
}

if t.After(start) && (t.Before(end) || t.Equal(end)) {
retSlice = append(retSlice, throughputs...)
}
}
return nil
})
if err != nil {
return nil, err
}

return retSlice, nil
}

func (s *SamplingStore) InsertProbabilitiesAndQPS(hostname string,
probabilities model.ServiceOperationProbabilities,
qps model.ServiceOperationQPS,
) error {
startTime := jaegermodel.TimeAsEpochMicroseconds(time.Now())
entriesToStore := make([]*badger.Entry, 0)
entries, err := s.createProbabilitiesEntry(hostname, probabilities, qps, startTime)
if err != nil {
return err
}
entriesToStore = append(entriesToStore, entries)
err = s.store.Update(func(txn *badger.Txn) error {
// Write the entries
for i := range entriesToStore {
err = txn.SetEntry(entriesToStore[i])
if err != nil {
return err
}
}

return nil
})

return nil
}

// GetLatestProbabilities implements samplingstore.Reader#GetLatestProbabilities.
func (s *SamplingStore) GetLatestProbabilities() (model.ServiceOperationProbabilities, error) {
var retVal model.ServiceOperationProbabilities
var unMarshalProbabilities ProbabilitiesAndQPS
prefix := []byte{probabilitiesKeyPrefix}

err := s.store.View(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
it := txn.NewIterator(opts)
defer it.Close()

val := []byte{}
for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
item := it.Item()
val, err := item.ValueCopy(val)
if err != nil {
return err
}
unMarshalProbabilities, err = decodeProbabilitiesValue(val)
if err != nil {
return err
}
retVal = unMarshalProbabilities.Probabilities
}
return nil
})
if err != nil {
return nil, err
}
return retVal, nil
}

func (s *SamplingStore) createProbabilitiesEntry(hostname string, probabilities model.ServiceOperationProbabilities, qps model.ServiceOperationQPS, startTime uint64) (*badger.Entry, error) {
pK, pV, err := s.createProbabilitiesKV(hostname, probabilities, qps, startTime)
if err != nil {
return nil, err
}

e := s.createBadgerEntry(pK, pV)

return e, nil
}

func (s *SamplingStore) createProbabilitiesKV(hostname string, probabilities model.ServiceOperationProbabilities, qps model.ServiceOperationQPS, startTime uint64) ([]byte, []byte, error) {
key := make([]byte, 16)
key[0] = probabilitiesKeyPrefix
pos := 1
binary.BigEndian.PutUint64(key[pos:], startTime)

var bb []byte
var err error
val := ProbabilitiesAndQPS{
Hostname: hostname,
Probabilities: probabilities,
QPS: qps,
}
bb, err = json.Marshal(val)
return key, bb, err
}

func (s *SamplingStore) createThroughputEntry(throughput []*model.Throughput, startTime uint64) (*badger.Entry, error) {
pK, pV, err := s.createThroughputKV(throughput, startTime)
if err != nil {
return nil, err
}

e := s.createBadgerEntry(pK, pV)

return e, nil
}

func (s *SamplingStore) createBadgerEntry(key []byte, value []byte) *badger.Entry {
return &badger.Entry{
Key: key,
Value: value,
}
}

func (s *SamplingStore) createThroughputKV(throughput []*model.Throughput, startTime uint64) ([]byte, []byte, error) {
key := make([]byte, 16)
key[0] = throughputKeyPrefix
pos := 1
binary.BigEndian.PutUint64(key[pos:], startTime)

var bb []byte
var err error

bb, err = json.Marshal(throughput)
return key, bb, err
}

func decodeThroughtputValue(val []byte) ([]*model.Throughput, error) {
var throughput []*model.Throughput

err := json.Unmarshal(val, &throughput)
if err != nil {
return nil, err
}
return throughput, err
}

func decodeProbabilitiesValue(val []byte) (ProbabilitiesAndQPS, error) {
var probabilities ProbabilitiesAndQPS

err := json.Unmarshal(val, &probabilities)
if err != nil {
return ProbabilitiesAndQPS{}, err
}
return probabilities, nil
}

func initalStartTime(timeBytes []byte) (time.Time, error) {
var usec int64

buf := bytes.NewReader(timeBytes)

if err := binary.Read(buf, binary.BigEndian, &usec); err != nil {
panic(nil)
}

t := time.UnixMicro(usec)
return t, nil
}
140 changes: 140 additions & 0 deletions plugin/storage/badger/samplingstore/storage_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Copyright (c) 2023 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package samplingstore

import (
"encoding/json"
"testing"
"time"

"github.com/dgraph-io/badger/v3"
"github.com/stretchr/testify/assert"

samplemodel "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/model"
)

func newTestSamplingStore(db *badger.DB) *SamplingStore {
return NewSamplingStore(db)
}

func TestInsertThroughput(t *testing.T) {
runWithBadger(t, func(t *testing.T, store *SamplingStore) {
throughputs := []*samplemodel.Throughput{
{Service: "my-svc", Operation: "op"},
{Service: "our-svc", Operation: "op2"},
}
err := store.InsertThroughput(throughputs)
assert.NoError(t, err)
})
}

func TestGetThroughput(t *testing.T) {
runWithBadger(t, func(t *testing.T, store *SamplingStore) {
start := time.Now()
expected := []*samplemodel.Throughput{
{Service: "my-svc", Operation: "op"},
{Service: "our-svc", Operation: "op2"},
}
err := store.InsertThroughput(expected)
assert.NoError(t, err)

actual, err := store.GetThroughput(start, start.Add(time.Second*time.Duration(10)))
assert.NoError(t, err)
assert.Equal(t, expected, actual)
})
}

func TestInsertProbabilitiesAndQPS(t *testing.T) {
runWithBadger(t, func(t *testing.T, store *SamplingStore) {
err := store.InsertProbabilitiesAndQPS(
"dell11eg843d",
samplemodel.ServiceOperationProbabilities{"new-srv": {"op": 0.1}},
samplemodel.ServiceOperationQPS{"new-srv": {"op": 4}},
)
assert.NoError(t, err)
})
}

func TestGetLatestProbabilities(t *testing.T) {
runWithBadger(t, func(t *testing.T, store *SamplingStore) {
err := store.InsertProbabilitiesAndQPS(
"dell11eg843d",
samplemodel.ServiceOperationProbabilities{"new-srv": {"op": 0.1}},
samplemodel.ServiceOperationQPS{"new-srv": {"op": 4}},
)
assert.NoError(t, err)
err = store.InsertProbabilitiesAndQPS(
"newhostname",
samplemodel.ServiceOperationProbabilities{"new-srv2": {"op": 0.123}},
samplemodel.ServiceOperationQPS{"new-srv2": {"op": 1}},
)
assert.NoError(t, err)

expected := samplemodel.ServiceOperationProbabilities{"new-srv2": {"op": 0.123}}
actual, err := store.GetLatestProbabilities()
assert.NoError(t, err)
assert.Equal(t, expected, actual)
})
}

func TestDecodeProbabilitiesValue(t *testing.T) {
expected := ProbabilitiesAndQPS{
Hostname: "dell11eg843d",
Probabilities: samplemodel.ServiceOperationProbabilities{"new-srv": {"op": 0.1}},
QPS: samplemodel.ServiceOperationQPS{"new-srv": {"op": 4}},
}

marshalBytes, err := json.Marshal(expected)
assert.NoError(t, err)
// This should pass without error
actual, err := decodeProbabilitiesValue(marshalBytes)
assert.NoError(t, err)
assert.Equal(t, expected, actual)

// Simulate data corruption by removing the first byte.
corruptedBytes := marshalBytes[1:]
_, err = decodeProbabilitiesValue(corruptedBytes)
assert.Error(t, err) // Expect an error
}

func TestDecodeThroughtputValue(t *testing.T) {
expected := []*samplemodel.Throughput{
{Service: "my-svc", Operation: "op"},
{Service: "our-svc", Operation: "op2"},
}

marshalBytes, err := json.Marshal(expected)
assert.NoError(t, err)
acrual, err := decodeThroughtputValue(marshalBytes)
assert.NoError(t, err)
assert.Equal(t, expected, acrual)
}

func runWithBadger(t *testing.T, test func(t *testing.T, store *SamplingStore)) {
opts := badger.DefaultOptions("")

opts.SyncWrites = false
dir := t.TempDir()
opts.Dir = dir
opts.ValueDir = dir

store, err := badger.Open(opts)
assert.NoError(t, err)
defer func() {
assert.NoError(t, store.Close())
}()
ss := newTestSamplingStore(store)
test(t, ss)
}
3 changes: 3 additions & 0 deletions plugin/storage/integration/badgerstore_test.go
Original file line number Diff line number Diff line change
@@ -49,6 +49,9 @@ func (s *BadgerIntegrationStorage) initialize() error {
if err != nil {
return err
}
if s.SamplingStore, err = s.factory.CreateSamplingStore(0); err != nil {
return err
}

s.SpanReader = sr
s.SpanWriter = sw
1 change: 1 addition & 0 deletions plugin/storage/integration/integration.go
Original file line number Diff line number Diff line change
@@ -452,6 +452,7 @@ func (s *StorageIntegration) testGetLatestProbability(t *testing.T) {
}
defer s.cleanUp(t)

s.SamplingStore.InsertProbabilitiesAndQPS("newhostname1", samplemodel.ServiceOperationProbabilities{"new-srv3": {"op": 0.123}}, samplemodel.ServiceOperationQPS{"new-srv2": {"op": 11}})
s.SamplingStore.InsertProbabilitiesAndQPS("dell11eg843d", samplemodel.ServiceOperationProbabilities{"new-srv": {"op": 0.1}}, samplemodel.ServiceOperationQPS{"new-srv": {"op": 4}})

expected := samplemodel.ServiceOperationProbabilities{"new-srv": {"op": 0.1}}

0 comments on commit f99eae5

Please sign in to comment.