From 8afdfd500cfcaf6c0ed862cce7487005ddcdc56c Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Wed, 28 Aug 2024 16:13:45 +0200 Subject: [PATCH] chore: Rename `HeapIterator` to `MergeEntryIterator` (#13975) Stumbled across this iterator interface and was confused about its name. It leaked its implementation details, and additionally that was not even correct any more, since the implementation has changed. Signed-off-by: Christian Haudum --- pkg/iter/entry_iterator.go | 13 +++++++------ pkg/iter/entry_iterator_test.go | 8 ++++---- pkg/querier/tail.go | 2 +- pkg/storage/batch.go | 4 ++-- pkg/storage/batch_test.go | 4 ++-- 5 files changed, 16 insertions(+), 15 deletions(-) diff --git a/pkg/iter/entry_iterator.go b/pkg/iter/entry_iterator.go index 58e0ab929e7f4..60bf032cab91e 100644 --- a/pkg/iter/entry_iterator.go +++ b/pkg/iter/entry_iterator.go @@ -53,16 +53,17 @@ func (i *streamIterator) Close() error { return nil } -// HeapIterator iterates over a heap of iterators with ability to push new iterators and get some properties like time of entry at peek and len -// Not safe for concurrent use -type HeapIterator interface { +// MergeEntryIterator exposes additional methods that are used by the Tailer only. +// Not safe for concurrent use! +type MergeEntryIterator interface { EntryIterator + Peek() time.Time IsEmpty() bool Push(EntryIterator) } -// mergeEntryIterator iterates over a heap of iterators and merge duplicate entries. +// mergeEntryIterator implements the MergeEntryIterator interface functions. type mergeEntryIterator struct { tree *loser.Tree[sortFields, EntryIterator] stats *stats.Context @@ -74,11 +75,11 @@ type mergeEntryIterator struct { errs []error } -// NewMergeEntryIterator returns a new iterator which uses a heap to merge together entries for multiple iterators and deduplicate entries if any. +// NewMergeEntryIterator returns a new iterator which uses a loser tree to merge together entries for multiple iterators and deduplicate entries if any. 
// The iterator only order and merge entries across given `is` iterators, it does not merge entries within individual iterator. // This means using this iterator with a single iterator will result in the same result as the input iterator. // If you don't need to deduplicate entries, use `NewSortEntryIterator` instead. -func NewMergeEntryIterator(ctx context.Context, is []EntryIterator, direction logproto.Direction) HeapIterator { +func NewMergeEntryIterator(ctx context.Context, is []EntryIterator, direction logproto.Direction) MergeEntryIterator { maxVal, less := treeLess(direction) result := &mergeEntryIterator{stats: stats.FromContext(ctx)} result.tree = loser.New(is, maxVal, sortFieldsAt, less, result.closeEntry) diff --git a/pkg/iter/entry_iterator_test.go b/pkg/iter/entry_iterator_test.go index e49ecf3ee528a..fb1548ddc35d0 100644 --- a/pkg/iter/entry_iterator_test.go +++ b/pkg/iter/entry_iterator_test.go @@ -162,16 +162,16 @@ func TestIteratorMultipleLabels(t *testing.T) { func TestMergeIteratorPrefetch(t *testing.T) { t.Parallel() - type tester func(t *testing.T, i HeapIterator) + type tester func(t *testing.T, i MergeEntryIterator) tests := map[string]tester{ - "prefetch on IsEmpty() when called as first method": func(t *testing.T, i HeapIterator) { + "prefetch on IsEmpty() when called as first method": func(t *testing.T, i MergeEntryIterator) { assert.Equal(t, false, i.IsEmpty()) }, - "prefetch on Peek() when called as first method": func(t *testing.T, i HeapIterator) { + "prefetch on Peek() when called as first method": func(t *testing.T, i MergeEntryIterator) { assert.Equal(t, time.Unix(0, 0), i.Peek()) }, - "prefetch on Next() when called as first method": func(t *testing.T, i HeapIterator) { + "prefetch on Next() when called as first method": func(t *testing.T, i MergeEntryIterator) { assert.True(t, i.Next()) assert.Equal(t, logproto.Entry{Timestamp: time.Unix(0, 0), Line: "0"}, i.At()) }, diff --git a/pkg/querier/tail.go b/pkg/querier/tail.go index 
87dba6bae09ae..0d9495daf6e46 100644 --- a/pkg/querier/tail.go +++ b/pkg/querier/tail.go @@ -38,7 +38,7 @@ const ( // Tailer manages complete lifecycle of a tail request type Tailer struct { // openStreamIterator is for streams already open - openStreamIterator iter.HeapIterator + openStreamIterator iter.MergeEntryIterator streamMtx sync.Mutex // for synchronizing access to openStreamIterator currEntry logproto.Entry diff --git a/pkg/storage/batch.go b/pkg/storage/batch.go index 46f708d09155a..739a27f9b2334 100644 --- a/pkg/storage/batch.go +++ b/pkg/storage/batch.go @@ -421,7 +421,7 @@ func (it *logBatchIterator) buildIterators(chks map[model.Fingerprint][][]*LazyC for _, chunks := range chks { if len(chunks) != 0 && len(chunks[0]) != 0 { streamPipeline := it.pipeline.ForStream(labels.NewBuilder(chunks[0][0].Chunk.Metric).Del(labels.MetricName).Labels()) - iterator, err := it.buildHeapIterator(chunks, from, through, streamPipeline, nextChunk) + iterator, err := it.buildMergeIterator(chunks, from, through, streamPipeline, nextChunk) if err != nil { return nil, err } @@ -433,7 +433,7 @@ func (it *logBatchIterator) buildIterators(chks map[model.Fingerprint][][]*LazyC return result, nil } -func (it *logBatchIterator) buildHeapIterator(chks [][]*LazyChunk, from, through time.Time, streamPipeline log.StreamPipeline, nextChunk *LazyChunk) (iter.EntryIterator, error) { +func (it *logBatchIterator) buildMergeIterator(chks [][]*LazyChunk, from, through time.Time, streamPipeline log.StreamPipeline, nextChunk *LazyChunk) (iter.EntryIterator, error) { result := make([]iter.EntryIterator, 0, len(chks)) for i := range chks { diff --git a/pkg/storage/batch_test.go b/pkg/storage/batch_test.go index 0159c20a19f65..34d8e350045a5 100644 --- a/pkg/storage/batch_test.go +++ b/pkg/storage/batch_test.go @@ -1649,9 +1649,9 @@ func TestBuildHeapIterator(t *testing.T) { ctx: ctx, pipeline: log.NewNoopPipeline(), } - it, err := b.buildHeapIterator(tc.input, from, from.Add(6*time.Millisecond), 
b.pipeline.ForStream(labels.Labels{labels.Label{Name: "foo", Value: "bar"}}), nil) + it, err := b.buildMergeIterator(tc.input, from, from.Add(6*time.Millisecond), b.pipeline.ForStream(labels.Labels{labels.Label{Name: "foo", Value: "bar"}}), nil) if err != nil { - t.Errorf("buildHeapIterator error = %v", err) + t.Errorf("buildMergeIterator error = %v", err) return } req := newQuery("{foo=\"bar\"}", from, from.Add(6*time.Millisecond), nil, nil)