Bloom shipper: Restructure bloom store (#11828)
**What this PR does / why we need it**:

This PR changes how the bloom client code is structured:

* The `BloomStore` is the top-level component; it implements both the
`Store` and the `Client` interfaces. The store holds one store entry for
each schema period.
* The `bloomStoreEntry` implements the `Store` and `Client` interfaces.
It holds a bloom client for a single schema period. The store entry also
exposes a fetcher for metas (and, in the future, for blocks), which
fetches data from the cache and falls back to storage on a cache miss.
* The `BloomClient` implements the `Client` interface. It uses an object
client scoped to its schema period.
* The `Fetcher` can fetch and cache metas.

This structure is very similar to what we use for the chunk store.
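
For orientation, here is a minimal Go sketch of the layering described above. The `Store` method set mirrors how the store is used elsewhere in this PR (`ResolveMetas`, `FetchMetas`, `Fetcher`, `Stop`); the stub types, struct fields, and the period lookup are illustrative assumptions, not the actual implementation.

```go
// Sketch of the bloom store layering; stub types stand in for the real ones.
package bloomshipper

import (
	"context"
	"sort"

	"github.com/prometheus/common/model"
)

// Placeholder types with their fields elided.
type (
	MetaRef          struct{} // object key, period, checksum, ...
	Meta             struct{ MetaRef MetaRef }
	MetaSearchParams struct{} // tenant, interval, keyspace
	Fetcher          struct{} // metas cache + bloom client
)

// Client is the low-level, per-period object-storage interface implemented
// by the BloomClient (method set elided here).
type Client interface{}

// Store is what readers (compactor, bloom gateway) program against.
type Store interface {
	ResolveMetas(ctx context.Context, params MetaSearchParams) ([][]MetaRef, []*Fetcher, error)
	FetchMetas(ctx context.Context, params MetaSearchParams) ([]Meta, error)
	Fetcher(ts model.Time) *Fetcher
	Stop()
}

// bloomStoreEntry serves a single schema period: a bloom client for that
// period's object store plus a fetcher that consults the cache first and
// falls back to storage.
type bloomStoreEntry struct {
	start   model.Time
	client  Client
	fetcher *Fetcher
}

// BloomStore holds one bloomStoreEntry per schema period and delegates each
// call to the entry whose period covers the requested time.
type BloomStore struct {
	stores []*bloomStoreEntry // sorted by period start
}

// storeFor returns the entry covering ts, or nil if ts precedes all periods.
func (b *BloomStore) storeFor(ts model.Time) *bloomStoreEntry {
	i := sort.Search(len(b.stores), func(i int) bool { return b.stores[i].start > ts })
	if i == 0 {
		return nil
	}
	return b.stores[i-1]
}
```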

**Note**

Before implementing `FetchBlocks()` in the `BloomStore`, I want to adopt
the new `FingerprintBounds` type in the shipper and bloom gateway code as
well, both to stay consistent across components and to make use of the
new utilities it exposes.
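
For reference, a `FingerprintBounds` type of the kind mentioned above could look roughly like this; the field names and the `Overlaps` helper are assumptions for illustration, not the actual definition in `pkg/storage/bloom/v1`.

```go
// Illustrative sketch only; see pkg/storage/bloom/v1 for the real type.
package v1

import "github.com/prometheus/common/model"

// FingerprintBounds describes an inclusive range of series fingerprints.
type FingerprintBounds struct {
	Min, Max model.Fingerprint
}

// Overlaps reports whether two bounds share at least one fingerprint.
func (b FingerprintBounds) Overlaps(other FingerprintBounds) bool {
	return b.Min <= other.Max && other.Min <= b.Max
}
```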

The store implementation probably needs some dedicated test cases that
verify the correct store entry is used for different schema period
configs. The code, however, is mostly taken from the chunk store
implementation.
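
As an example of the kind of test meant here, the sketch below checks that a timestamp resolves to the schema period that covers it. `periodFor` is a hypothetical stand-in for the store's period lookup, not the real method.

```go
package bloomshipper_test

import (
	"sort"
	"testing"
	"time"
)

// periodFor returns the index of the schema period (given by sorted start
// times) that covers ts, or -1 if ts is before the first period. It is a
// stand-in for the chunk-store-style lookup the bloom store copies.
func periodFor(starts []time.Time, ts time.Time) int {
	i := sort.Search(len(starts), func(i int) bool { return starts[i].After(ts) })
	return i - 1
}

func TestPeriodResolution(t *testing.T) {
	day := 24 * time.Hour
	p0 := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	p1 := p0.Add(7 * day) // second schema period starts one week later

	for _, tc := range []struct {
		ts   time.Time
		want int
	}{
		{p0.Add(-time.Hour), -1},  // before any configured period
		{p0, 0},                   // exactly at the first period start
		{p1.Add(-time.Second), 0}, // still in the first period
		{p1, 1},                   // switches to the second period
		{p1.Add(30 * day), 1},     // long after the last boundary
	} {
		if got := periodFor([]time.Time{p0, p1}, tc.ts); got != tc.want {
			t.Errorf("periodFor(%v) = %d, want %d", tc.ts, got, tc.want)
		}
	}
}
```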

---------

Signed-off-by: Christian Haudum <[email protected]>
chaudum authored Jan 31, 2024
1 parent 5675bae commit 6902130
Showing 10 changed files with 899 additions and 214 deletions.
20 changes: 16 additions & 4 deletions pkg/bloomcompactor/bloomcompactor.go
@@ -27,6 +27,7 @@ package bloomcompactor
import (
"context"
"fmt"
"io"
"math"
"math/rand"
"os"
@@ -50,6 +51,7 @@ import (
"github.com/grafana/loki/pkg/bloomutils"
"github.com/grafana/loki/pkg/storage"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/chunk/cache"
chunk_client "github.com/grafana/loki/pkg/storage/chunk/client"
"github.com/grafana/loki/pkg/storage/chunk/client/local"
"github.com/grafana/loki/pkg/storage/config"
@@ -71,7 +73,7 @@ type Compactor struct {
limits Limits

// temporary workaround until store has implemented read/write shipper interface
bloomShipperClient bloomshipper.Client
bloomShipperClient bloomshipper.StoreAndClient

// Client used to run operations on the bucket storing bloom blocks.
storeClients map[config.DayTime]storeClient
@@ -109,8 +111,10 @@ func New(
reg: r,
}

// Configure BloomClient for meta.json management
bloomClient, err := bloomshipper.NewBloomClient(schemaConfig.Configs, storageCfg, clientMetrics)
// TODO(chaudum): Plug in cache
var metasCache cache.Cache
var blocksCache *cache.EmbeddedCache[string, io.ReadCloser]
bloomClient, err := bloomshipper.NewBloomStore(schemaConfig.Configs, storageCfg, clientMetrics, metasCache, blocksCache, logger)
if err != nil {
return nil, err
}
@@ -506,11 +510,19 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
//TODO Configure pool for these to avoid allocations
var activeBloomBlocksRefs []bloomshipper.BlockRef

metas, err := c.bloomShipperClient.GetMetas(ctx, metaSearchParams)
metaRefs, fetchers, err := c.bloomShipperClient.ResolveMetas(ctx, metaSearchParams)
if err != nil {
return err
}

for i := range fetchers {
res, err := fetchers[i].FetchMetas(ctx, metaRefs[i])
if err != nil {
return err
}
metas = append(metas, res...)
}

// TODO This logic currently is NOT concerned with cutting blocks upon topology changes to bloom-compactors.
// It may create blocks with series outside of the fp range of the compactor. Cutting blocks will be addressed in a follow-up PR.
metasMatchingJob, blocksMatchingJob := matchingBlocks(metas, job)
9 changes: 7 additions & 2 deletions pkg/bloomgateway/bloomgateway.go
@@ -42,6 +42,7 @@ package bloomgateway
import (
"context"
"fmt"
"io"
"sort"
"sync"
"time"
@@ -59,6 +60,7 @@ import (
"github.com/grafana/loki/pkg/queue"
"github.com/grafana/loki/pkg/storage"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/chunk/cache"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/pkg/util"
@@ -209,12 +211,15 @@ func New(cfg Config, schemaCfg config.SchemaConfig, storageCfg storage.Config, o
g.queue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, time.Minute, &fixedQueueLimits{0}, g.queueMetrics)
g.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(g.queueMetrics.Cleanup)

client, err := bloomshipper.NewBloomClient(schemaCfg.Configs, storageCfg, cm)
// TODO(chaudum): Plug in cache
var metasCache cache.Cache
var blocksCache *cache.EmbeddedCache[string, io.ReadCloser]
store, err := bloomshipper.NewBloomStore(schemaCfg.Configs, storageCfg, cm, metasCache, blocksCache, logger)
if err != nil {
return nil, err
}

bloomShipper, err := bloomshipper.NewShipper(client, storageCfg.BloomShipperConfig, overrides, logger, reg)
bloomShipper, err := bloomshipper.NewShipper(store, storageCfg.BloomShipperConfig, overrides, logger, reg)
if err != nil {
return nil, err
}
8 changes: 2 additions & 6 deletions pkg/bloomgateway/processor.go
@@ -17,17 +17,13 @@ type tasksForBlock struct {
tasks []Task
}

type metaLoader interface {
LoadMetas(context.Context, bloomshipper.MetaSearchParams) ([]bloomshipper.Meta, error)
}

type blockLoader interface {
LoadBlocks(context.Context, []bloomshipper.BlockRef) (v1.Iterator[bloomshipper.BlockQuerierWithFingerprintRange], error)
}

type store interface {
blockLoader
metaLoader
bloomshipper.Store
}

type processor struct {
@@ -63,7 +59,7 @@ func (p *processor) processTasks(ctx context.Context, tenant string, interval bl
Interval: interval,
Keyspace: bloomshipper.Keyspace{Min: minFpRange.Min, Max: maxFpRange.Max},
}
metas, err := p.store.LoadMetas(ctx, metaSearch)
metas, err := p.store.FetchMetas(ctx, metaSearch)
if err != nil {
return err
}
18 changes: 17 additions & 1 deletion pkg/bloomgateway/processor_test.go
@@ -24,11 +24,27 @@ type dummyStore struct {
querieres []bloomshipper.BlockQuerierWithFingerprintRange
}

func (s *dummyStore) LoadMetas(_ context.Context, _ bloomshipper.MetaSearchParams) ([]bloomshipper.Meta, error) {
func (s *dummyStore) ResolveMetas(_ context.Context, _ bloomshipper.MetaSearchParams) ([][]bloomshipper.MetaRef, []*bloomshipper.Fetcher, error) {
//TODO(chaudum) Filter metas based on search params
refs := make([]bloomshipper.MetaRef, 0, len(s.metas))
for _, meta := range s.metas {
refs = append(refs, meta.MetaRef)
}
return [][]bloomshipper.MetaRef{refs}, []*bloomshipper.Fetcher{nil}, nil
}

func (s *dummyStore) FetchMetas(_ context.Context, _ bloomshipper.MetaSearchParams) ([]bloomshipper.Meta, error) {
//TODO(chaudum) Filter metas based on search params
return s.metas, nil
}

func (s *dummyStore) Fetcher(_ model.Time) *bloomshipper.Fetcher {
return nil
}

func (s *dummyStore) Stop() {
}

func (s *dummyStore) LoadBlocks(_ context.Context, refs []bloomshipper.BlockRef) (v1.Iterator[bloomshipper.BlockQuerierWithFingerprintRange], error) {
result := make([]bloomshipper.BlockQuerierWithFingerprintRange, len(s.querieres))
