Skip to content

Commit

Permalink
fix: Read all series in NewTSDBSeriesIter and close index file (grafa…
Browse files Browse the repository at this point in the history
  • Loading branch information
salvacorts authored Mar 14, 2024
1 parent 1331dc5 commit 4b28f82
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 96 deletions.
2 changes: 1 addition & 1 deletion pkg/bloomcompactor/bloomcompactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func New(
bloomStore: store,
}

tsdbStore, err := NewTSDBStores(schemaCfg, storeCfg, clientMetrics)
tsdbStore, err := NewTSDBStores(schemaCfg, storeCfg, clientMetrics, logger)
if err != nil {
return nil, errors.Wrap(err, "failed to create TSDB store")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/bloomcompactor/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func (s *SimpleBloomController) loadWorkForGap(
tenant string,
id tsdb.Identifier,
gap gapWithBlocks,
) (v1.CloseableIterator[*v1.Series], v1.CloseableResettableIterator[*v1.SeriesWithBloom], error) {
) (v1.Iterator[*v1.Series], v1.CloseableResettableIterator[*v1.SeriesWithBloom], error) {
// load a series iterator for the gap
seriesItr, err := s.tsdbStore.LoadTSDB(ctx, table, tenant, id, gap.bounds)
if err != nil {
Expand Down
129 changes: 43 additions & 86 deletions pkg/bloomcompactor/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ import (
"math"
"path"
"strings"
"sync"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
Expand All @@ -35,18 +36,20 @@ type TSDBStore interface {
tenant string,
id tsdb.Identifier,
bounds v1.FingerprintBounds,
) (v1.CloseableIterator[*v1.Series], error)
) (v1.Iterator[*v1.Series], error)
}

// BloomTSDBStore is a wrapper around the storage.Client interface which
// implements the TSDBStore interface for this pkg.
type BloomTSDBStore struct {
storage storage.Client
logger log.Logger
}

func NewBloomTSDBStore(storage storage.Client) *BloomTSDBStore {
func NewBloomTSDBStore(storage storage.Client, logger log.Logger) *BloomTSDBStore {
return &BloomTSDBStore{
storage: storage,
logger: logger,
}
}

Expand Down Expand Up @@ -85,7 +88,7 @@ func (b *BloomTSDBStore) LoadTSDB(
tenant string,
id tsdb.Identifier,
bounds v1.FingerprintBounds,
) (v1.CloseableIterator[*v1.Series], error) {
) (v1.Iterator[*v1.Series], error) {
withCompression := id.Name() + gzipExtension

data, err := b.storage.GetUserFile(ctx, table.Addr(), tenant, withCompression)
Expand All @@ -112,8 +115,13 @@ func (b *BloomTSDBStore) LoadTSDB(
}

idx := tsdb.NewTSDBIndex(reader)
defer func() {
if err := idx.Close(); err != nil {
level.Error(b.logger).Log("msg", "failed to close index", "err", err)
}
}()

return NewTSDBSeriesIter(ctx, idx, bounds), nil
return NewTSDBSeriesIter(ctx, idx, bounds)
}

// TSDBStore is an interface for interacting with the TSDB,
Expand All @@ -127,74 +135,21 @@ type forSeries interface {
fn func(labels.Labels, model.Fingerprint, []index.ChunkMeta),
matchers ...*labels.Matcher,
) error
Close() error
}

type TSDBSeriesIter struct {
mtx sync.Mutex
f forSeries
bounds v1.FingerprintBounds
ctx context.Context

ch chan *v1.Series
initialized bool
next *v1.Series
err error
}

func NewTSDBSeriesIter(ctx context.Context, f forSeries, bounds v1.FingerprintBounds) *TSDBSeriesIter {
return &TSDBSeriesIter{
f: f,
bounds: bounds,
ctx: ctx,
ch: make(chan *v1.Series),
}
}

func (t *TSDBSeriesIter) Next() bool {
if !t.initialized {
t.initialized = true
t.background()
}

select {
case <-t.ctx.Done():
return false
case next, ok := <-t.ch:
t.next = next
return ok
}
}

func (t *TSDBSeriesIter) At() *v1.Series {
return t.next
}

func (t *TSDBSeriesIter) Err() error {
t.mtx.Lock()
defer t.mtx.Unlock()

if t.err != nil {
return t.err
}

return t.ctx.Err()
}

func (t *TSDBSeriesIter) Close() error {
return t.f.Close()
}

// background iterates over the tsdb file, populating the next
// value via a channel to handle backpressure
func (t *TSDBSeriesIter) background() {
go func() {
err := t.f.ForSeries(
t.ctx,
t.bounds,
0, math.MaxInt64,
func(_ labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) {

func NewTSDBSeriesIter(ctx context.Context, f forSeries, bounds v1.FingerprintBounds) (v1.Iterator[*v1.Series], error) {
// TODO(salvacorts): Create a pool
series := make([]*v1.Series, 0, 100)

if err := f.ForSeries(
ctx,
bounds,
0, math.MaxInt64,
func(_ labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) {
select {
case <-ctx.Done():
return
default:
res := &v1.Series{
Fingerprint: fp,
Chunks: make(v1.ChunkRefs, 0, len(chks)),
Expand All @@ -207,19 +162,20 @@ func (t *TSDBSeriesIter) background() {
})
}

select {
case <-t.ctx.Done():
return
case t.ch <- res:
}
},
labels.MustNewMatcher(labels.MatchEqual, "", ""),
)
t.mtx.Lock()
t.err = err
t.mtx.Unlock()
close(t.ch)
}()
series = append(series, res)
}
},
labels.MustNewMatcher(labels.MatchEqual, "", ""),
); err != nil {
return nil, err
}

select {
case <-ctx.Done():
return v1.NewEmptyIter[*v1.Series](), ctx.Err()
default:
return v1.NewCancelableIter[*v1.Series](ctx, v1.NewSliceIter[*v1.Series](series)), nil
}
}

type TSDBStores struct {
Expand All @@ -231,6 +187,7 @@ func NewTSDBStores(
schemaCfg config.SchemaConfig,
storeCfg baseStore.Config,
clientMetrics baseStore.ClientMetrics,
logger log.Logger,
) (*TSDBStores, error) {
res := &TSDBStores{
schemaCfg: schemaCfg,
Expand All @@ -244,7 +201,7 @@ func NewTSDBStores(
if err != nil {
return nil, errors.Wrap(err, "failed to create object client")
}
res.stores[i] = NewBloomTSDBStore(storage.NewIndexStorageClient(c, cfg.IndexTables.PathPrefix))
res.stores[i] = NewBloomTSDBStore(storage.NewIndexStorageClient(c, cfg.IndexTables.PathPrefix), logger)
}
}

Expand Down Expand Up @@ -303,7 +260,7 @@ func (s *TSDBStores) LoadTSDB(
tenant string,
id tsdb.Identifier,
bounds v1.FingerprintBounds,
) (v1.CloseableIterator[*v1.Series], error) {
) (v1.Iterator[*v1.Series], error) {
store, err := s.storeForPeriod(table.DayTime)
if err != nil {
return nil, err
Expand Down
34 changes: 26 additions & 8 deletions pkg/bloomcompactor/tsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ func TestTSDBSeriesIter(t *testing.T) {
},
}
srcItr := v1.NewSliceIter(input)
itr := NewTSDBSeriesIter(context.Background(), forSeriesTestImpl(input), v1.NewBounds(0, math.MaxUint64))
itr, err := NewTSDBSeriesIter(context.Background(), forSeriesTestImpl(input), v1.NewBounds(0, math.MaxUint64))
require.NoError(t, err)

v1.EqualIterators[*v1.Series](
t,
Expand All @@ -74,13 +75,30 @@ func TestTSDBSeriesIter(t *testing.T) {
}

func TestTSDBSeriesIter_Expiry(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel()
itr := NewTSDBSeriesIter(ctx, forSeriesTestImpl{
{}, // a single entry
}, v1.NewBounds(0, math.MaxUint64))
t.Run("expires on creation", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel()
itr, err := NewTSDBSeriesIter(ctx, forSeriesTestImpl{
{}, // a single entry
}, v1.NewBounds(0, math.MaxUint64))
require.Error(t, err)
require.False(t, itr.Next())
})

require.False(t, itr.Next())
require.Error(t, itr.Err())
t.Run("expires during consumption", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
itr, err := NewTSDBSeriesIter(ctx, forSeriesTestImpl{
{},
{},
}, v1.NewBounds(0, math.MaxUint64))
require.NoError(t, err)

require.True(t, itr.Next())
require.NoError(t, itr.Err())

cancel()
require.False(t, itr.Next())
require.Error(t, itr.Err())
})

}
7 changes: 7 additions & 0 deletions pkg/storage/bloom/v1/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,13 @@ func (cii *CancellableIter[T]) Next() bool {
}
}

func (cii *CancellableIter[T]) Err() error {
if err := cii.ctx.Err(); err != nil {
return err
}
return cii.Iterator.Err()
}

func NewCancelableIter[T any](ctx context.Context, itr Iterator[T]) *CancellableIter[T] {
return &CancellableIter[T]{ctx: ctx, Iterator: itr}
}
Expand Down

0 comments on commit 4b28f82

Please sign in to comment.