Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bloom Compactor: Optimize check for fingerprint ownership #11389

Merged
merged 8 commits into from
Dec 12, 2023
58 changes: 35 additions & 23 deletions pkg/bloomcompactor/bloomcompactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/pkg/bloomutils"
"github.com/grafana/loki/pkg/storage"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
chunk_client "github.com/grafana/loki/pkg/storage/chunk/client"
Expand Down Expand Up @@ -343,26 +344,37 @@ func (c *Compactor) compactTenant(ctx context.Context, logger log.Logger, sc sto
bt, _ := v1.NewBloomTokenizer(c.reg, NGramLength, NGramSkip)

errs := multierror.New()
if err := sc.indexShipper.ForEach(ctx, tableName, tenant, func(isMultiTenantIndex bool, idx shipperindex.Index) error {
if isMultiTenantIndex { // TODO: handle multitenant tables
return fmt.Errorf("unexpected multi-tenant")
rs, err := c.sharding.GetTenantSubRing(tenant).GetAllHealthy(RingOp)
if err != nil {
return err
}
tokenRanges := bloomutils.GetInstanceWithTokenRange(c.cfg.Ring.InstanceID, rs.Instances)

_ = sc.indexShipper.ForEach(ctx, tableName, tenant, func(isMultiTenantIndex bool, idx shipperindex.Index) error {
if isMultiTenantIndex {
// Skip multi-tenant indexes
return nil
}

tsdbFile, ok := idx.(*tsdb.TSDBFile)
if !ok {
errs.Add(fmt.Errorf("failed to cast to TSDBFile"))
return nil
}

tsdbIndex, ok := tsdbFile.Index.(*tsdb.TSDBIndex)
if !ok {
errs.Add(fmt.Errorf("failed to cast to TSDBIndex"))
return nil
}

var seriesMetas []seriesMeta
// TODO: Make these casts safely
if err := idx.(*tsdb.TSDBFile).Index.(*tsdb.TSDBIndex).ForSeries(

err := tsdbIndex.ForSeries(
ctx, nil,
0, math.MaxInt64, // TODO: Replace with MaxLookBackPeriod
func(labels labels.Labels, fingerprint model.Fingerprint, chksMetas []tsdbindex.ChunkMeta) {
// TODO: Inefficient as is, calls the ring per fingerprint. Refactor to make the call once per compaction fingerprint bounds.
ownsFingerprint, err := c.sharding.OwnsFingerprint(tenant, uint64(fingerprint))

if err != nil {
level.Error(logger).Log("msg", "failed to check if compactor owns fp", "err", err)
errs.Add(err)
return
}
if !ownsFingerprint {
if !tokenRanges.Contains(uint32(fingerprint)) {
return
}

Expand All @@ -371,26 +383,26 @@ func (c *Compactor) compactTenant(ctx context.Context, logger log.Logger, sc sto
// All seriesMetas of the given table whose fingerprint falls within this compactor's shard
seriesMetas = append(seriesMetas, seriesMeta{seriesFP: fingerprint, seriesLbs: labels, chunkRefs: temp})
},
); err != nil {
)

if err != nil {
errs.Add(err)
return nil
}

job := NewJob(tenant, tableName, idx.Path(), seriesMetas)
jobLogger := log.With(logger, "job", job.String())
c.metrics.compactionRunJobStarted.Inc()

if err := c.runCompact(ctx, jobLogger, job, bt, sc); err != nil {
err = c.runCompact(ctx, jobLogger, job, bt, sc)
if err != nil {
c.metrics.compactionRunJobFailed.Inc()
errs.Add(errors.Wrap(err, "runBloomCompact failed"))
return errs.Err()
} else {
c.metrics.compactionRunJobSuceeded.Inc()
}

c.metrics.compactionRunJobSuceeded.Inc()

return nil
}); err != nil {
errs.Add(err)
}
})

return errs.Err()
}
Expand Down
92 changes: 20 additions & 72 deletions pkg/bloomgateway/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/grafana/loki/pkg/bloomutils"
"github.com/grafana/loki/pkg/distributor/clientpool"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logqlmodel/stats"
Expand Down Expand Up @@ -283,39 +284,44 @@ func (c *GatewayClient) doForAddrs(addrs []string, fn func(logproto.BloomGateway
}

// groupFingerprintsByServer looks up the token ranges and addresses of the
// given instances via serverAddressesWithTokenRanges, partitions the chunk ref
// groups across those ranges with partitionFingerprintsByAddresses, and hands
// the partitions to groupByInstance.
//
// An error from the token range lookup is returned unchanged together with a
// nil result.
func (c *GatewayClient) groupFingerprintsByServer(groups []*logproto.GroupedChunkRefs, subRing ring.ReadRing, instances []ring.InstanceDesc) ([]instanceWithFingerprints, error) {
	tokenRanges, lookupErr := serverAddressesWithTokenRanges(subRing, instances)
	if lookupErr != nil {
		return nil, lookupErr
	}

	return groupByInstance(partitionFingerprintsByAddresses(groups, tokenRanges)), nil
}

func serverAddressesWithTokenRanges(subRing ring.ReadRing, instances []ring.InstanceDesc) ([]addrsWithTokenRange, error) {
bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet()

servers := make([]addrsWithTokenRange, 0, len(instances))
prev := -1
it := newInstanceSortMergeIterator(instances)
it := bloomutils.NewInstanceSortMergeIterator(instances)
for it.Next() {
// We can use one of the tokens from the token range
// to obtain all addresses for that token.
rs, err := subRing.Get(it.At().token, BlocksRead, bufDescs, bufHosts, bufZones)
rs, err := subRing.Get(it.At().MaxToken, BlocksRead, bufDescs, bufHosts, bufZones)
if err != nil {
return nil, errors.Wrap(err, "bloom gateway get ring")
}
servers = append(servers, addrsWithTokenRange{
minToken: uint32(prev + 1),
maxToken: it.At().token,
id: it.At().instance.Id,
id: it.At().Instance.Id,
addrs: rs.GetAddresses(),
minToken: it.At().MinToken,
maxToken: it.At().MaxToken,
})
prev = int(it.At().token)
}

if len(servers) > 0 {
if len(servers) > 0 && servers[len(servers)-1].maxToken < math.MaxUint32 {
// Append the instance for the token range between the greatest token and MaxUint32
servers = append(servers, addrsWithTokenRange{
minToken: uint32(prev),
maxToken: math.MaxUint32,
addrs: servers[0].addrs,
id: servers[0].id,
addrs: servers[0].addrs,
minToken: servers[len(servers)-1].maxToken + 1,
maxToken: math.MaxUint32,
})
}

boundedFingerprints := partitionFingerprintsByAddresses(groups, servers)
return groupByInstance(boundedFingerprints), nil
return servers, nil
}

type instanceWithToken struct {
Expand Down Expand Up @@ -401,61 +407,3 @@ func groupByInstance(boundedFingerprints []instanceWithFingerprints) []instanceW

return result
}

// newInstanceSortMergeIterator returns an iterator over the tokens of all given
// instances. It yields instanceWithToken elements ordered by ascending token,
// each carrying the instance the token was taken from.
//
// Note that the Tokens slice of every instance is sorted in place as a side
// effect of calling this function.
func newInstanceSortMergeIterator(instances []ring.InstanceDesc) v1.Iterator[instanceWithToken] {
	// One peeking sequence per instance; each value is tagged with the index
	// of its instance so the owner can be looked up again after merging.
	perInstance := make([]v1.PeekingIterator[IndexedValue[uint32]], 0, len(instances))
	for idx := range instances {
		tokens := instances[idx].Tokens
		// Sort the instance's own tokens so that every sequence handed to the
		// heap iterator is already in ascending order.
		sort.Slice(tokens, func(a, b int) bool {
			return tokens[a] < tokens[b]
		})
		indexed := NewIterWithIndex[uint32](v1.NewSliceIter(tokens), idx)
		perInstance = append(perInstance, v1.NewPeekingIter[IndexedValue[uint32]](indexed))
	}

	return &sortMergeIterator[ring.InstanceDesc, uint32, instanceWithToken]{
		items: instances,
		transform: func(desc ring.InstanceDesc, token uint32) instanceWithToken {
			return instanceWithToken{instance: desc, token: token}
		},
		heap: v1.NewHeapIterator(
			func(a, b IndexedValue[uint32]) bool {
				return a.val < b.val
			},
			perInstance...,
		),
	}
}

// sortMergeIterator implements v1.Iterator.
//
// It walks a heap iterator of indexed values and converts every value, together
// with the item found at the value's index, into a result of type R.
type sortMergeIterator[T any, C comparable, R any] struct {
// curr is the element produced by the last successful call to Next.
curr R
// heap yields the indexed values that drive the iteration.
heap *v1.HeapIterator[IndexedValue[C]]
// items is the slice that the index of each heap value refers to.
items []T
// transform builds the resulting element from an item and a heap value.
transform func(T, C) R
// err is set to io.EOF by Next once heap is exhausted.
err error
}

// Next advances the underlying heap iterator.
//
// If the heap yields another value, the item at that value's index is combined
// with the value through the transform function, stored as the current element,
// and true is returned. Once the heap is exhausted, io.EOF is recorded as the
// iterator error and false is returned.
func (it *sortMergeIterator[T, C, R]) Next() bool {
	if it.heap.Next() {
		entry := it.heap.At()
		it.curr = it.transform(it.items[entry.idx], entry.val)
		return true
	}

	// NOTE(review): because of this assignment Err() reports io.EOF after a
	// normal, complete iteration — confirm that callers expect this.
	it.err = io.EOF
	return false
}

// At returns the element stored by the most recent successful call to Next.
// Before the first successful call to Next it returns the zero value of R.
func (it *sortMergeIterator[T, C, R]) At() R {
return it.curr
}

// Err returns the error recorded by the iterator. Next sets it to io.EOF once
// the underlying heap iterator is exhausted.
func (it *sortMergeIterator[T, C, R]) Err() error {
return it.err
}
92 changes: 55 additions & 37 deletions pkg/bloomgateway/client_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package bloomgateway

import (
"math"
"sort"
"testing"
"time"
Expand All @@ -11,12 +12,12 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/bloomutils"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/validation"
)

func TestBloomGatewayClient(t *testing.T) {

logger := log.NewNopLogger()
reg := prometheus.NewRegistry()

Expand All @@ -32,33 +33,6 @@ func TestBloomGatewayClient(t *testing.T) {
})
}

// TestBloomGatewayClient_SortInstancesByToken verifies that the sort-merge
// iterator yields the tokens of all instances in ascending order, each paired
// with the instance it belongs to, and that it yields exactly as many elements
// as there are tokens.
func TestBloomGatewayClient_SortInstancesByToken(t *testing.T) {
	input := []ring.InstanceDesc{
		{Id: "1", Tokens: []uint32{6, 5, 2, 9}},
		{Id: "2", Tokens: []uint32{3, 4, 7}},
		{Id: "3", Tokens: []uint32{1, 8, 0}},
	}
	expected := []instanceWithToken{
		{instance: input[2], token: 0},
		{instance: input[2], token: 1},
		{instance: input[0], token: 2},
		{instance: input[1], token: 3},
		{instance: input[1], token: 4},
		{instance: input[0], token: 5},
		{instance: input[0], token: 6},
		{instance: input[1], token: 7},
		{instance: input[2], token: 8},
		{instance: input[0], token: 9},
	}

	var i int
	it := newInstanceSortMergeIterator(input)
	for it.Next() {
		// Guard the index so that an iterator yielding too many elements fails
		// the test instead of panicking with an index out of range.
		require.Less(t, i, len(expected), "iterator yielded more elements than expected")
		require.Equal(t, expected[i], it.At())
		i++
	}

	// Without this check an iterator that stops early, or yields nothing at
	// all, would let the test pass.
	require.Equal(t, len(expected), i, "iterator yielded fewer elements than expected")
}

func TestBloomGatewayClient_PartitionFingerprintsByAddresses(t *testing.T) {
// instance token ranges do not overlap
t.Run("non-overlapping", func(t *testing.T) {
Expand Down Expand Up @@ -183,6 +157,50 @@ func TestBloomGatewayClient_PartitionFingerprintsByAddresses(t *testing.T) {
})
}

// TestBloomGatewayClient_ServerAddressesWithTokenRanges checks the token ranges
// that serverAddressesWithTokenRanges derives from a set of ring instances,
// using a mock ring built from the same instances.
func TestBloomGatewayClient_ServerAddressesWithTokenRanges(t *testing.T) {
testCases := map[string]struct {
instances []ring.InstanceDesc
expected []addrsWithTokenRange
}{
// The greatest token is below MaxUint32, so a fourth range covering the
// remaining tokens up to MaxUint32 is expected; it is attributed to
// instance-1, the owner of the first range.
"one token per instance": {
instances: []ring.InstanceDesc{
{Id: "instance-1", Addr: "10.0.0.1", Tokens: []uint32{math.MaxUint32 / 6 * 1}},
{Id: "instance-2", Addr: "10.0.0.2", Tokens: []uint32{math.MaxUint32 / 6 * 3}},
{Id: "instance-3", Addr: "10.0.0.3", Tokens: []uint32{math.MaxUint32 / 6 * 5}},
},
expected: []addrsWithTokenRange{
{id: "instance-1", addrs: []string{"10.0.0.1"}, minToken: 0, maxToken: math.MaxUint32 / 6 * 1},
{id: "instance-2", addrs: []string{"10.0.0.2"}, minToken: math.MaxUint32/6*1 + 1, maxToken: math.MaxUint32 / 6 * 3},
{id: "instance-3", addrs: []string{"10.0.0.3"}, minToken: math.MaxUint32/6*3 + 1, maxToken: math.MaxUint32 / 6 * 5},
{id: "instance-1", addrs: []string{"10.0.0.1"}, minToken: math.MaxUint32/6*5 + 1, maxToken: math.MaxUint32},
},
},
// Both boundary values are tokens themselves: the first range consists of
// token 0 only and the last range already ends at MaxUint32, so exactly
// one range per token is expected and no additional range is appended.
"MinUint32 and MaxUint32 are tokens in the ring": {
instances: []ring.InstanceDesc{
{Id: "instance-1", Addr: "10.0.0.1", Tokens: []uint32{0, math.MaxUint32 / 3 * 2}},
{Id: "instance-2", Addr: "10.0.0.2", Tokens: []uint32{math.MaxUint32 / 3 * 1, math.MaxUint32}},
},
expected: []addrsWithTokenRange{
{id: "instance-1", addrs: []string{"10.0.0.1"}, minToken: 0, maxToken: 0},
{id: "instance-2", addrs: []string{"10.0.0.2"}, minToken: 1, maxToken: math.MaxUint32 / 3},
{id: "instance-1", addrs: []string{"10.0.0.1"}, minToken: math.MaxUint32/3*1 + 1, maxToken: math.MaxUint32 / 3 * 2},
{id: "instance-2", addrs: []string{"10.0.0.2"}, minToken: math.MaxUint32/3*2 + 1, maxToken: math.MaxUint32},
},
},
}

for name, tc := range testCases {
// Copy the loop variable so the subtest closure uses this iteration's case.
tc := tc
t.Run(name, func(t *testing.T) {
subRing := newMockRing(tc.instances)
res, err := serverAddressesWithTokenRanges(subRing, tc.instances)
require.NoError(t, err)
require.Equal(t, tc.expected, res)
})
}

}

func TestBloomGatewayClient_GroupFingerprintsByServer(t *testing.T) {

logger := log.NewNopLogger()
Expand All @@ -203,9 +221,9 @@ func TestBloomGatewayClient_GroupFingerprintsByServer(t *testing.T) {
{Id: "instance-3", Addr: "10.0.0.3", Tokens: []uint32{2014002871, 315617625, 1036168527}},
}

it := newInstanceSortMergeIterator(instances)
it := bloomutils.NewInstanceSortMergeIterator(instances)
for it.Next() {
t.Log(it.At().token, it.At().instance.Addr)
t.Log(it.At().MaxToken, it.At().Instance.Addr)
}

testCases := []struct {
Expand Down Expand Up @@ -257,7 +275,7 @@ func TestBloomGatewayClient_GroupFingerprintsByServer(t *testing.T) {
},
},
{
name: "fingerprints with token ranges of a multiple instance are grouped",
name: "fingerprints with token ranges of multiple instances are grouped",
chunks: []*logproto.GroupedChunkRefs{
// instance 1
{Fingerprint: 1000000000, Refs: []*logproto.ShortRef{{Checksum: 1}}},
Expand Down Expand Up @@ -327,8 +345,8 @@ func TestBloomGatewayClient_GroupFingerprintsByServer(t *testing.T) {
var _ ring.ReadRing = &mockRing{}

func newMockRing(instances []ring.InstanceDesc) *mockRing {
it := newInstanceSortMergeIterator(instances)
ranges := make([]instanceWithToken, 0)
it := bloomutils.NewInstanceSortMergeIterator(instances)
ranges := make([]bloomutils.InstanceWithTokenRange, 0)
for it.Next() {
ranges = append(ranges, it.At())
}
Expand All @@ -340,21 +358,21 @@ func newMockRing(instances []ring.InstanceDesc) *mockRing {

type mockRing struct {
instances []ring.InstanceDesc
ranges []instanceWithToken
ranges []bloomutils.InstanceWithTokenRange
}

// Get implements ring.ReadRing.
func (r *mockRing) Get(key uint32, _ ring.Operation, _ []ring.InstanceDesc, _ []string, _ []string) (ring.ReplicationSet, error) {
idx, _ := sort.Find(len(r.ranges), func(i int) int {
if r.ranges[i].token < key {
if r.ranges[i].MaxToken < key {
return 1
}
if r.ranges[i].token > key {
if r.ranges[i].MaxToken > key {
return -1
}
return 0
})
return ring.ReplicationSet{Instances: []ring.InstanceDesc{r.ranges[idx].instance}}, nil
return ring.ReplicationSet{Instances: []ring.InstanceDesc{r.ranges[idx].Instance}}, nil
}

// GetAllHealthy implements ring.ReadRing.
Expand Down
Loading
Loading