Skip to content

Commit

Permalink
Bloom Compactor: Optimize check for fingerprint ownership (grafana#11389)
Browse files Browse the repository at this point in the history

Calling `c.sharding.OwnsFingerprint(tenant, uint64(fingerprint))` for
each Series of a TSDB index is very expensive, because it not only
creates the tenant's sub-ring but also needs to check the fingerprint
against it.

Instead, we can pre-calculate the current instance's token ranges and
check if the (uint32 converted) fingerprint is contained within these
ranges.
 

Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum authored and rhnasc committed Apr 12, 2024
1 parent 1b8342f commit fa42e68
Show file tree
Hide file tree
Showing 11 changed files with 518 additions and 229 deletions.
58 changes: 35 additions & 23 deletions pkg/bloomcompactor/bloomcompactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/pkg/bloomutils"
"github.com/grafana/loki/pkg/storage"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
chunk_client "github.com/grafana/loki/pkg/storage/chunk/client"
Expand Down Expand Up @@ -343,26 +344,37 @@ func (c *Compactor) compactTenant(ctx context.Context, logger log.Logger, sc sto
bt, _ := v1.NewBloomTokenizer(c.reg, NGramLength, NGramSkip)

errs := multierror.New()
if err := sc.indexShipper.ForEach(ctx, tableName, tenant, func(isMultiTenantIndex bool, idx shipperindex.Index) error {
if isMultiTenantIndex { // TODO: handle multitenant tables
return fmt.Errorf("unexpected multi-tenant")
rs, err := c.sharding.GetTenantSubRing(tenant).GetAllHealthy(RingOp)
if err != nil {
return err
}
tokenRanges := bloomutils.GetInstanceWithTokenRange(c.cfg.Ring.InstanceID, rs.Instances)

_ = sc.indexShipper.ForEach(ctx, tableName, tenant, func(isMultiTenantIndex bool, idx shipperindex.Index) error {
if isMultiTenantIndex {
// Skip multi-tenant indexes
return nil
}

tsdbFile, ok := idx.(*tsdb.TSDBFile)
if !ok {
errs.Add(fmt.Errorf("failed to cast to TSDBFile"))
return nil
}

tsdbIndex, ok := tsdbFile.Index.(*tsdb.TSDBIndex)
if !ok {
errs.Add(fmt.Errorf("failed to cast to TSDBIndex"))
return nil
}

var seriesMetas []seriesMeta
// TODO: Make these casts safely
if err := idx.(*tsdb.TSDBFile).Index.(*tsdb.TSDBIndex).ForSeries(

err := tsdbIndex.ForSeries(
ctx, nil,
0, math.MaxInt64, // TODO: Replace with MaxLookBackPeriod
func(labels labels.Labels, fingerprint model.Fingerprint, chksMetas []tsdbindex.ChunkMeta) {
// TODO: Inefficient as is, calls the ring per fingerprint. Refactor to make the call once per compaction fingerprint bounds.
ownsFingerprint, err := c.sharding.OwnsFingerprint(tenant, uint64(fingerprint))

if err != nil {
level.Error(logger).Log("msg", "failed to check if compactor owns fp", "err", err)
errs.Add(err)
return
}
if !ownsFingerprint {
if !tokenRanges.Contains(uint32(fingerprint)) {
return
}

Expand All @@ -371,26 +383,26 @@ func (c *Compactor) compactTenant(ctx context.Context, logger log.Logger, sc sto
//All seriesMetas given a table within fp of this compactor shard
seriesMetas = append(seriesMetas, seriesMeta{seriesFP: fingerprint, seriesLbs: labels, chunkRefs: temp})
},
); err != nil {
)

if err != nil {
errs.Add(err)
return nil
}

job := NewJob(tenant, tableName, idx.Path(), seriesMetas)
jobLogger := log.With(logger, "job", job.String())
c.metrics.compactionRunJobStarted.Inc()

if err := c.runCompact(ctx, jobLogger, job, bt, sc); err != nil {
err = c.runCompact(ctx, jobLogger, job, bt, sc)
if err != nil {
c.metrics.compactionRunJobFailed.Inc()
errs.Add(errors.Wrap(err, "runBloomCompact failed"))
return errs.Err()
} else {
c.metrics.compactionRunJobSuceeded.Inc()
}

c.metrics.compactionRunJobSuceeded.Inc()

return nil
}); err != nil {
errs.Add(err)
}
})

return errs.Err()
}
Expand Down
92 changes: 20 additions & 72 deletions pkg/bloomgateway/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/grafana/loki/pkg/bloomutils"
"github.com/grafana/loki/pkg/distributor/clientpool"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logqlmodel/stats"
Expand Down Expand Up @@ -283,39 +284,44 @@ func (c *GatewayClient) doForAddrs(addrs []string, fn func(logproto.BloomGateway
}

// groupFingerprintsByServer resolves the address/token-range pairs for the
// given instances, partitions the chunk ref groups across those ranges and
// finally merges the partitions per instance.
//
// An error from resolving the server addresses is returned unchanged.
func (c *GatewayClient) groupFingerprintsByServer(groups []*logproto.GroupedChunkRefs, subRing ring.ReadRing, instances []ring.InstanceDesc) ([]instanceWithFingerprints, error) {
	addrRanges, err := serverAddressesWithTokenRanges(subRing, instances)
	if err != nil {
		return nil, err
	}

	return groupByInstance(partitionFingerprintsByAddresses(groups, addrRanges)), nil
}

func serverAddressesWithTokenRanges(subRing ring.ReadRing, instances []ring.InstanceDesc) ([]addrsWithTokenRange, error) {
bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet()

servers := make([]addrsWithTokenRange, 0, len(instances))
prev := -1
it := newInstanceSortMergeIterator(instances)
it := bloomutils.NewInstanceSortMergeIterator(instances)
for it.Next() {
// We can use one of the tokens from the token range
// to obtain all addresses for that token.
rs, err := subRing.Get(it.At().token, BlocksRead, bufDescs, bufHosts, bufZones)
rs, err := subRing.Get(it.At().MaxToken, BlocksRead, bufDescs, bufHosts, bufZones)
if err != nil {
return nil, errors.Wrap(err, "bloom gateway get ring")
}
servers = append(servers, addrsWithTokenRange{
minToken: uint32(prev + 1),
maxToken: it.At().token,
id: it.At().instance.Id,
id: it.At().Instance.Id,
addrs: rs.GetAddresses(),
minToken: it.At().MinToken,
maxToken: it.At().MaxToken,
})
prev = int(it.At().token)
}

if len(servers) > 0 {
if len(servers) > 0 && servers[len(servers)-1].maxToken < math.MaxUint32 {
// append the instance for the token range between the greatest token and MaxUint32
servers = append(servers, addrsWithTokenRange{
minToken: uint32(prev),
maxToken: math.MaxUint32,
addrs: servers[0].addrs,
id: servers[0].id,
addrs: servers[0].addrs,
minToken: servers[len(servers)-1].maxToken + 1,
maxToken: math.MaxUint32,
})
}

boundedFingerprints := partitionFingerprintsByAddresses(groups, servers)
return groupByInstance(boundedFingerprints), nil
return servers, nil
}

type instanceWithToken struct {
Expand Down Expand Up @@ -401,61 +407,3 @@ func groupByInstance(boundedFingerprints []instanceWithFingerprints) []instanceW

return result
}

// newInstanceSortMergeIterator returns an iterator that walks over every token
// of the given instances and yields one instanceWithToken per token, ordered by
// ascending token value.
//
// Note that the Tokens slice of each instance is sorted in place.
func newInstanceSortMergeIterator(instances []ring.InstanceDesc) v1.Iterator[instanceWithToken] {
	// Build one peeking iterator per instance. Every value is tagged with the
	// index of its instance so it can be mapped back to it after merging.
	perInstance := make([]v1.PeekingIterator[IndexedValue[uint32]], 0, len(instances))
	for idx := range instances {
		tokens := instances[idx].Tokens
		sort.Slice(tokens, func(a, b int) bool { return tokens[a] < tokens[b] })

		indexed := NewIterWithIndex[uint32](v1.NewSliceIter(tokens), idx)
		perInstance = append(perInstance, v1.NewPeekingIter[IndexedValue[uint32]](indexed))
	}

	// Ordering used by the heap iterator: smaller token first.
	byToken := func(a, b IndexedValue[uint32]) bool { return a.val < b.val }

	return &sortMergeIterator[ring.InstanceDesc, uint32, instanceWithToken]{
		items: instances,
		transform: func(desc ring.InstanceDesc, tok uint32) instanceWithToken {
			return instanceWithToken{instance: desc, token: tok}
		},
		heap: v1.NewHeapIterator(byToken, perInstance...),
		err:  nil,
	}
}

// sortMergeIterator implements v1.Iterator. It maps the indexed values drawn
// from a v1.HeapIterator back onto the items they belong to.
type sortMergeIterator[T any, C comparable, R any] struct {
// curr is the element built by the most recent successful call to Next.
curr R
// heap supplies the next (index, value) pair to consume.
heap *v1.HeapIterator[IndexedValue[C]]
// items is looked up with the idx of the values coming out of heap.
items []T
// transform builds the yielded element from an item and one of its values.
transform func(T, C) R
// err is set to io.EOF by Next once heap has no more values.
err error
}

// Next advances the iterator. It returns false once the underlying heap has no
// more values, in which case the iterator error is set to io.EOF. Otherwise the
// current element is rebuilt from the item referenced by the heap value.
func (it *sortMergeIterator[T, C, R]) Next() bool {
	if !it.heap.Next() {
		it.err = io.EOF
		return false
	}

	head := it.heap.At()
	it.curr = it.transform(it.items[head.idx], head.val)
	return true
}

// At returns the element produced by the last successful call to Next.
func (it *sortMergeIterator[T, C, R]) At() R {
return it.curr
}

// Err returns the iterator error. Next sets it to io.EOF when the underlying
// heap is exhausted.
func (it *sortMergeIterator[T, C, R]) Err() error {
return it.err
}
92 changes: 55 additions & 37 deletions pkg/bloomgateway/client_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package bloomgateway

import (
"math"
"sort"
"testing"
"time"
Expand All @@ -11,12 +12,12 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/bloomutils"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/validation"
)

func TestBloomGatewayClient(t *testing.T) {

logger := log.NewNopLogger()
reg := prometheus.NewRegistry()

Expand All @@ -32,33 +33,6 @@ func TestBloomGatewayClient(t *testing.T) {
})
}

// TestBloomGatewayClient_SortInstancesByToken verifies that the sort-merge
// iterator yields every token of every instance exactly once, in ascending
// token order, paired with the instance that owns the token.
func TestBloomGatewayClient_SortInstancesByToken(t *testing.T) {
	input := []ring.InstanceDesc{
		{Id: "1", Tokens: []uint32{6, 5, 2, 9}},
		{Id: "2", Tokens: []uint32{3, 4, 7}},
		{Id: "3", Tokens: []uint32{1, 8, 0}},
	}
	expected := []instanceWithToken{
		{instance: input[2], token: 0},
		{instance: input[2], token: 1},
		{instance: input[0], token: 2},
		{instance: input[1], token: 3},
		{instance: input[1], token: 4},
		{instance: input[0], token: 5},
		{instance: input[0], token: 6},
		{instance: input[1], token: 7},
		{instance: input[2], token: 8},
		{instance: input[0], token: 9},
	}

	var i int
	it := newInstanceSortMergeIterator(input)
	for it.Next() {
		// Fail with a readable message instead of an index-out-of-range panic
		// when the iterator produces more elements than expected.
		require.Less(t, i, len(expected), "iterator yielded more items than expected")
		require.Equal(t, expected[i], it.At())
		i++
	}

	// Without this check an iterator that yields nothing (or stops early)
	// would make the test pass vacuously.
	require.Equal(t, len(expected), i, "iterator yielded fewer items than expected")
}

func TestBloomGatewayClient_PartitionFingerprintsByAddresses(t *testing.T) {
// instance token ranges do not overlap
t.Run("non-overlapping", func(t *testing.T) {
Expand Down Expand Up @@ -183,6 +157,50 @@ func TestBloomGatewayClient_PartitionFingerprintsByAddresses(t *testing.T) {
})
}

// TestBloomGatewayClient_ServerAddressesWithTokenRanges checks the token
// ranges computed by serverAddressesWithTokenRanges against a mock ring, both
// for evenly spread tokens and for rings that contain the smallest and largest
// possible token.
func TestBloomGatewayClient_ServerAddressesWithTokenRanges(t *testing.T) {
	// Fractions of the token space used to place the instance tokens.
	const (
		sixth = math.MaxUint32 / 6
		third = math.MaxUint32 / 3
	)

	type testCase struct {
		instances []ring.InstanceDesc
		expected  []addrsWithTokenRange
	}

	cases := map[string]testCase{
		"one token per instance": {
			instances: []ring.InstanceDesc{
				{Id: "instance-1", Addr: "10.0.0.1", Tokens: []uint32{sixth * 1}},
				{Id: "instance-2", Addr: "10.0.0.2", Tokens: []uint32{sixth * 3}},
				{Id: "instance-3", Addr: "10.0.0.3", Tokens: []uint32{sixth * 5}},
			},
			expected: []addrsWithTokenRange{
				{id: "instance-1", addrs: []string{"10.0.0.1"}, minToken: 0, maxToken: sixth * 1},
				{id: "instance-2", addrs: []string{"10.0.0.2"}, minToken: sixth*1 + 1, maxToken: sixth * 3},
				{id: "instance-3", addrs: []string{"10.0.0.3"}, minToken: sixth*3 + 1, maxToken: sixth * 5},
				{id: "instance-1", addrs: []string{"10.0.0.1"}, minToken: sixth*5 + 1, maxToken: math.MaxUint32},
			},
		},
		"MinUint32 and MaxUint32 are tokens in the ring": {
			instances: []ring.InstanceDesc{
				{Id: "instance-1", Addr: "10.0.0.1", Tokens: []uint32{0, third * 2}},
				{Id: "instance-2", Addr: "10.0.0.2", Tokens: []uint32{third * 1, math.MaxUint32}},
			},
			expected: []addrsWithTokenRange{
				{id: "instance-1", addrs: []string{"10.0.0.1"}, minToken: 0, maxToken: 0},
				{id: "instance-2", addrs: []string{"10.0.0.2"}, minToken: 1, maxToken: third},
				{id: "instance-1", addrs: []string{"10.0.0.1"}, minToken: third*1 + 1, maxToken: third * 2},
				{id: "instance-2", addrs: []string{"10.0.0.2"}, minToken: third*2 + 1, maxToken: math.MaxUint32},
			},
		},
	}

	for name, tt := range cases {
		tt := tt
		t.Run(name, func(t *testing.T) {
			got, err := serverAddressesWithTokenRanges(newMockRing(tt.instances), tt.instances)
			require.NoError(t, err)
			require.Equal(t, tt.expected, got)
		})
	}
}

func TestBloomGatewayClient_GroupFingerprintsByServer(t *testing.T) {

logger := log.NewNopLogger()
Expand All @@ -203,9 +221,9 @@ func TestBloomGatewayClient_GroupFingerprintsByServer(t *testing.T) {
{Id: "instance-3", Addr: "10.0.0.3", Tokens: []uint32{2014002871, 315617625, 1036168527}},
}

it := newInstanceSortMergeIterator(instances)
it := bloomutils.NewInstanceSortMergeIterator(instances)
for it.Next() {
t.Log(it.At().token, it.At().instance.Addr)
t.Log(it.At().MaxToken, it.At().Instance.Addr)
}

testCases := []struct {
Expand Down Expand Up @@ -257,7 +275,7 @@ func TestBloomGatewayClient_GroupFingerprintsByServer(t *testing.T) {
},
},
{
name: "fingerprints with token ranges of a multiple instance are grouped",
name: "fingerprints with token ranges of multiple instances are grouped",
chunks: []*logproto.GroupedChunkRefs{
// instance 1
{Fingerprint: 1000000000, Refs: []*logproto.ShortRef{{Checksum: 1}}},
Expand Down Expand Up @@ -327,8 +345,8 @@ func TestBloomGatewayClient_GroupFingerprintsByServer(t *testing.T) {
var _ ring.ReadRing = &mockRing{}

func newMockRing(instances []ring.InstanceDesc) *mockRing {
it := newInstanceSortMergeIterator(instances)
ranges := make([]instanceWithToken, 0)
it := bloomutils.NewInstanceSortMergeIterator(instances)
ranges := make([]bloomutils.InstanceWithTokenRange, 0)
for it.Next() {
ranges = append(ranges, it.At())
}
Expand All @@ -340,21 +358,21 @@ func newMockRing(instances []ring.InstanceDesc) *mockRing {

type mockRing struct {
instances []ring.InstanceDesc
ranges []instanceWithToken
ranges []bloomutils.InstanceWithTokenRange
}

// Get implements ring.ReadRing.
func (r *mockRing) Get(key uint32, _ ring.Operation, _ []ring.InstanceDesc, _ []string, _ []string) (ring.ReplicationSet, error) {
idx, _ := sort.Find(len(r.ranges), func(i int) int {
if r.ranges[i].token < key {
if r.ranges[i].MaxToken < key {
return 1
}
if r.ranges[i].token > key {
if r.ranges[i].MaxToken > key {
return -1
}
return 0
})
return ring.ReplicationSet{Instances: []ring.InstanceDesc{r.ranges[idx].instance}}, nil
return ring.ReplicationSet{Instances: []ring.InstanceDesc{r.ranges[idx].Instance}}, nil
}

// GetAllHealthy implements ring.ReadRing.
Expand Down
Loading

0 comments on commit fa42e68

Please sign in to comment.