Skip to content

Commit

Permalink
Extend BloomStore with Client(model.Time) function (#11881)
Browse files Browse the repository at this point in the history
**What this PR does / why we need it**:

For certain operations, we want direct access to the `BloomClient`.
Since every schema period has its own client, the `Client()` function
accepts a `model.Timestamp` for which the client should be returned.

The function may return an error, if no client for the given time could
be found.

---------

Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum authored Feb 6, 2024
1 parent 69d152b commit 7bbbf23
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 8 deletions.
8 changes: 6 additions & 2 deletions pkg/bloomgateway/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,12 @@ func (s *dummyStore) FetchBlocks(_ context.Context, _ []bloomshipper.BlockRef) (
panic("don't call me")
}

func (s *dummyStore) Fetcher(_ model.Time) *bloomshipper.Fetcher {
return nil
func (s *dummyStore) Fetcher(_ model.Time) (*bloomshipper.Fetcher, error) {
return nil, nil
}

func (s *dummyStore) Client(_ model.Time) (bloomshipper.Client, error) {
return nil, nil
}

func (s *dummyStore) Stop() {
Expand Down
31 changes: 25 additions & 6 deletions pkg/storage/stores/shipper/bloomshipper/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,16 @@ import (
"github.com/grafana/loki/pkg/storage/config"
)

var (
errNoStore = errors.New("no store found for time")
)

type Store interface {
ResolveMetas(ctx context.Context, params MetaSearchParams) ([][]MetaRef, []*Fetcher, error)
FetchMetas(ctx context.Context, params MetaSearchParams) ([]Meta, error)
FetchBlocks(ctx context.Context, refs []BlockRef) ([]BlockDirectory, error)
Fetcher(ts model.Time) *Fetcher
Fetcher(ts model.Time) (*Fetcher, error)
Client(ts model.Time) (Client, error)
Stop()
}

Expand Down Expand Up @@ -112,8 +117,13 @@ func (b *bloomStoreEntry) FetchBlocks(ctx context.Context, refs []BlockRef) ([]B
}

// Fetcher implements Store.
func (b *bloomStoreEntry) Fetcher(_ model.Time) *Fetcher {
return b.fetcher
func (b *bloomStoreEntry) Fetcher(_ model.Time) (*Fetcher, error) {
return b.fetcher, nil
}

// Client implements Store.
func (b *bloomStoreEntry) Client(_ model.Time) (Client, error) {
return b.bloomClient, nil
}

// Stop implements Store.
Expand Down Expand Up @@ -219,11 +229,19 @@ func (b *BloomStore) Block(ref BlockRef) (loc Location) {
}

// Fetcher implements Store.
func (b *BloomStore) Fetcher(ts model.Time) *Fetcher {
func (b *BloomStore) Fetcher(ts model.Time) (*Fetcher, error) {
if store := b.getStore(ts); store != nil {
return store.Fetcher(ts)
}
return nil
return nil, errNoStore
}

// Client implements Store.
func (b *BloomStore) Client(ts model.Time) (Client, error) {
if store := b.getStore(ts); store != nil {
return store.Client(ts)
}
return nil, errNoStore
}

// ResolveMetas implements Store.
Expand Down Expand Up @@ -294,7 +312,7 @@ func (b *BloomStore) FetchBlocks(ctx context.Context, blocks []BlockRef) ([]Bloc

if len(res) > 0 {
refs = append(refs, res)
fetchers = append(fetchers, s.Fetcher(s.start))
fetchers = append(fetchers, s.fetcher)
}
}

Expand Down Expand Up @@ -341,6 +359,7 @@ func (b *BloomStore) getStore(ts model.Time) *bloomStoreEntry {
return b.stores[j]
}

// should in theory never happen
return nil
}

Expand Down

0 comments on commit 7bbbf23

Please sign in to comment.