From 917053a73058ebff5cec72d760ba16f2acc8a56c Mon Sep 17 00:00:00 2001
From: Cyril Tovena <cyril.tovena@gmail.com>
Date: Fri, 2 Aug 2024 18:07:17 +0200
Subject: [PATCH] feat: Introduce wal segment read path. (#13695)

Co-authored-by: Ben Clive <ben.clive@grafana.com>
---
 pkg/ingester-rf1/flush.go                     |   2 +-
 pkg/loki/modules.go                           |   9 +-
 pkg/querier-rf1/querier.go                    |  13 +-
 pkg/querier-rf1/wal/chunks.go                 | 323 ++++++++
 pkg/querier-rf1/wal/chunks_test.go            | 516 +++++++++++++
 pkg/querier-rf1/wal/querier.go                | 203 +++++
 pkg/querier-rf1/wal/querier_test.go           | 697 ++++++++++++++++++
 .../indexshipper/tsdb/index/postings.go       |   7 +
 pkg/storage/wal/chunks/chunks.go              |   6 +-
 pkg/storage/wal/chunks/doc.go                 |  37 +
 pkg/storage/wal/chunks/entry_iterator.go      | 115 +++
 pkg/storage/wal/chunks/entry_iterator_test.go | 143 ++++
 pkg/storage/wal/chunks/sample_iterator.go     |  87 +++
 .../wal/chunks/sample_iterator_test.go        | 202 +++++
 pkg/storage/wal/index/index.go                | 271 +++++++
 pkg/storage/wal/segment.go                    |   6 +-
 pkg/storage/wal/segment_test.go               |   4 +-
 17 files changed, 2629 insertions(+), 12 deletions(-)
 create mode 100644 pkg/querier-rf1/wal/chunks.go
 create mode 100644 pkg/querier-rf1/wal/chunks_test.go
 create mode 100644 pkg/querier-rf1/wal/querier.go
 create mode 100644 pkg/querier-rf1/wal/querier_test.go
 create mode 100644 pkg/storage/wal/chunks/doc.go
 create mode 100644 pkg/storage/wal/chunks/entry_iterator.go
 create mode 100644 pkg/storage/wal/chunks/entry_iterator_test.go
 create mode 100644 pkg/storage/wal/chunks/sample_iterator.go
 create mode 100644 pkg/storage/wal/chunks/sample_iterator_test.go

diff --git a/pkg/ingester-rf1/flush.go b/pkg/ingester-rf1/flush.go
index 55601337d350f..2d194a12f5574 100644
--- a/pkg/ingester-rf1/flush.go
+++ b/pkg/ingester-rf1/flush.go
@@ -114,7 +114,7 @@ func (i *Ingester) flushSegment(ctx context.Context, j int, w *wal.SegmentWriter
 	wal.ReportSegmentStats(stats, i.metrics.segmentMetrics)
 
 	id := ulid.MustNew(ulid.Timestamp(time.Now()), rand.Reader).String()
-	if err := i.store.PutObject(ctx, fmt.Sprintf("loki-v2/wal/anon/"+id), buf); err != nil {
+	if err := i.store.PutObject(ctx, fmt.Sprintf(wal.Dir+id), buf); err != nil {
 		i.metrics.flushFailuresTotal.Inc()
 		return fmt.Errorf("failed to put object: %w", err)
 	}
diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go
index 1ede7ee806b79..d823f5cedb5cd 100644
--- a/pkg/loki/modules.go
+++ b/pkg/loki/modules.go
@@ -52,6 +52,7 @@ import (
 	metastoreclient "github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/client"
 	"github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/health"
 	"github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/metastorepb"
+	"github.com/grafana/loki/v3/pkg/ingester-rf1/objstore"
 	"github.com/grafana/loki/v3/pkg/logproto"
 	"github.com/grafana/loki/v3/pkg/logql"
 	"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
@@ -415,7 +416,11 @@ func (t *Loki) initQuerier() (services.Service, error) {
 
 	if t.Cfg.QuerierRF1.Enabled {
 		logger.Log("Using RF-1 querier implementation")
-		t.Querier, err = querierrf1.New(t.Cfg.QuerierRF1, t.Store, t.Overrides, deleteStore, logger)
+		store, err := objstore.New(t.Cfg.SchemaConfig.Configs, t.Cfg.StorageConfig, t.ClientMetrics)
+		if err != nil {
+			return nil, err
+		}
+		t.Querier, err = querierrf1.New(t.Cfg.QuerierRF1, t.Store, t.Overrides, deleteStore, t.MetastoreClient, store, logger)
 		if err != nil {
 			return nil, err
 		}
@@ -1818,7 +1823,7 @@ func (t *Loki) initMetastore() (services.Service, error) {
 		return nil, nil
 	}
 	if t.Cfg.isTarget(All) {
-		t.Cfg.MetastoreClient.MetastoreAddress = fmt.Sprintf("localhost:%s", t.Cfg.Server.GRPCListenAddress)
+		t.Cfg.MetastoreClient.MetastoreAddress = fmt.Sprintf("localhost:%d", t.Cfg.Server.GRPCListenPort)
 	}
 	m, err := metastore.New(t.Cfg.Metastore, log.With(util_log.Logger, "component", "metastore"), prometheus.DefaultRegisterer, t.health)
 	if err != nil {
diff --git a/pkg/querier-rf1/querier.go b/pkg/querier-rf1/querier.go
index 9504fe23482ab..c4a9dd76ba5f6 100644
--- a/pkg/querier-rf1/querier.go
+++ b/pkg/querier-rf1/querier.go
@@ -34,6 +34,7 @@ import (
 	"github.com/grafana/loki/v3/pkg/logql/syntax"
 	"github.com/grafana/loki/v3/pkg/logqlmodel"
 	"github.com/grafana/loki/v3/pkg/querier"
+	"github.com/grafana/loki/v3/pkg/querier-rf1/wal"
 	querier_limits "github.com/grafana/loki/v3/pkg/querier/limits"
 	"github.com/grafana/loki/v3/pkg/querier/plan"
 	"github.com/grafana/loki/v3/pkg/storage"
@@ -97,6 +98,7 @@ type Rf1Querier struct {
 	deleteGetter   deleteGetter
 	logger         log.Logger
 	patternQuerier PatterQuerier
+	walQuerier     logql.Querier
 }
 
 type deleteGetter interface {
@@ -104,12 +106,17 @@ type deleteGetter interface {
 }
 
 // New makes a new Querier for RF1 work.
-func New(cfg Config, store Store, limits Limits, d deleteGetter, logger log.Logger) (*Rf1Querier, error) {
+func New(cfg Config, store Store, limits Limits, d deleteGetter, metastore wal.Metastore, b wal.BlockStorage, logger log.Logger) (*Rf1Querier, error) {
+	querier, err := wal.New(metastore, b)
+	if err != nil {
+		return nil, err
+	}
 	return &Rf1Querier{
 		cfg:          cfg,
 		store:        store,
 		limits:       limits,
 		deleteGetter: d,
+		walQuerier:   querier,
 		logger:       logger,
 	}, nil
 }
@@ -134,7 +141,7 @@ func (q *Rf1Querier) SelectLogs(ctx context.Context, params logql.SelectLogParam
 			"msg", "querying rf1 store",
 			"params", params)
 	}
-	storeIter, err := q.store.SelectLogs(ctx, params)
+	storeIter, err := q.walQuerier.SelectLogs(ctx, params)
 	if err != nil {
 		return nil, err
 	}
@@ -164,7 +171,7 @@ func (q *Rf1Querier) SelectSamples(ctx context.Context, params logql.SelectSampl
 			"msg", "querying rf1 store for samples",
 			"params", params)
 	}
-	storeIter, err := q.store.SelectSamples(ctx, params)
+	storeIter, err := q.walQuerier.SelectSamples(ctx, params)
 	if err != nil {
 		return nil, err
 	}
diff --git a/pkg/querier-rf1/wal/chunks.go b/pkg/querier-rf1/wal/chunks.go
new file mode 100644
index 0000000000000..76070006aff40
--- /dev/null
+++ b/pkg/querier-rf1/wal/chunks.go
@@ -0,0 +1,323 @@
+package wal
+
+import (
+	"context"
+	"fmt"
+	"sort"
+
+	"github.com/prometheus/prometheus/model/labels"
+	"golang.org/x/sync/errgroup"
+
+	"github.com/grafana/loki/v3/pkg/iter"
+	"github.com/grafana/loki/v3/pkg/logproto"
+	"github.com/grafana/loki/v3/pkg/logql/log"
+	"github.com/grafana/loki/v3/pkg/storage/wal"
+	"github.com/grafana/loki/v3/pkg/storage/wal/chunks"
+	"github.com/grafana/loki/v3/pkg/storage/wal/index"
+
+	"github.com/grafana/loki/pkg/push"
+)
+
+const defaultBatchSize = 16
+
+type ChunkData struct {
+	meta   *chunks.Meta
+	labels labels.Labels
+	id     string
+}
+
+func newChunkData(id string, lbs *labels.ScratchBuilder, meta *chunks.Meta) ChunkData {
+	lbs.Sort()
+	newLbs := lbs.Labels()
+	j := 0
+	for _, l := range newLbs {
+		if l.Name != index.TenantLabel {
+			newLbs[j] = l
+			j++
+		}
+	}
+	newLbs = newLbs[:j]
+	return ChunkData{
+		id: id,
+		meta: &chunks.Meta{ // incoming Meta is from a shared buffer, so create a new one
+			Ref:     meta.Ref,
+			MinTime: meta.MinTime,
+			MaxTime: meta.MaxTime,
+		},
+		labels: newLbs,
+	}
+}
+
+// ChunksEntryIterator iterates over log entries
+type ChunksEntryIterator[T iter.EntryIterator] struct {
+	baseChunksIterator[T]
+}
+
+// ChunksSampleIterator iterates over metric samples
+type ChunksSampleIterator[T iter.SampleIterator] struct {
+	baseChunksIterator[T]
+}
+
+func NewChunksEntryIterator(
+	ctx context.Context,
+	storage BlockStorage,
+	chunks []ChunkData,
+	pipeline log.Pipeline,
+	direction logproto.Direction,
+	minT, maxT int64,
+) *ChunksEntryIterator[iter.EntryIterator] {
+	sortChunks(chunks, direction)
+	return &ChunksEntryIterator[iter.EntryIterator]{
+		baseChunksIterator: baseChunksIterator[iter.EntryIterator]{
+			ctx:       ctx,
+			chunks:    chunks,
+			direction: direction,
+			storage:   storage,
+			bachSize:  defaultBatchSize,
+			batch:     make([]ChunkData, 0, defaultBatchSize),
+			minT:      minT,
+			maxT:      maxT,
+
+			iteratorFactory: func(chunks []ChunkData) (iter.EntryIterator, error) {
+				return createNextEntryIterator(ctx, chunks, direction, pipeline, storage, minT, maxT)
+			},
+			isNil: func(it iter.EntryIterator) bool { return it == nil },
+		},
+	}
+}
+
+func NewChunksSampleIterator(
+	ctx context.Context,
+	storage BlockStorage,
+	chunks []ChunkData,
+	extractor log.SampleExtractor,
+	minT, maxT int64,
+) *ChunksSampleIterator[iter.SampleIterator] {
+	sortChunks(chunks, logproto.FORWARD)
+	return &ChunksSampleIterator[iter.SampleIterator]{
+		baseChunksIterator: baseChunksIterator[iter.SampleIterator]{
+			ctx:       ctx,
+			chunks:    chunks,
+			direction: logproto.FORWARD,
+			storage:   storage,
+			bachSize:  defaultBatchSize,
+			batch:     make([]ChunkData, 0, defaultBatchSize),
+			minT:      minT,
+			maxT:      maxT,
+
+			iteratorFactory: func(chunks []ChunkData) (iter.SampleIterator, error) {
+				return createNextSampleIterator(ctx, chunks, extractor, storage, minT, maxT)
+			},
+			isNil: func(it iter.SampleIterator) bool { return it == nil },
+		},
+	}
+}
+
+func sortChunks(chunks []ChunkData, direction logproto.Direction) {
+	sort.Slice(chunks, func(i, j int) bool {
+		if direction == logproto.FORWARD {
+			t1, t2 := chunks[i].meta.MinTime, chunks[j].meta.MinTime
+			if t1 != t2 {
+				return t1 < t2
+			}
+			return labels.Compare(chunks[i].labels, chunks[j].labels) < 0
+		}
+		t1, t2 := chunks[i].meta.MaxTime, chunks[j].meta.MaxTime
+		if t1 != t2 {
+			return t1 > t2
+		}
+		return labels.Compare(chunks[i].labels, chunks[j].labels) < 0
+	})
+}
+
+// baseChunksIterator contains common fields and methods for both entry and sample iterators
+type baseChunksIterator[T interface {
+	Next() bool
+	Close() error
+	Err() error
+	StreamHash() uint64
+	Labels() string
+}] struct {
+	chunks          []ChunkData
+	direction       logproto.Direction
+	minT, maxT      int64
+	storage         BlockStorage
+	ctx             context.Context
+	iteratorFactory func([]ChunkData) (T, error)
+	isNil           func(T) bool
+
+	bachSize int
+	batch    []ChunkData
+	current  T
+	err      error
+}
+
+func (b *baseChunksIterator[T]) nextBatch() error {
+	b.batch = b.batch[:0]
+	for len(b.chunks) > 0 &&
+		(len(b.batch) < b.bachSize ||
+			isOverlapping(b.batch[len(b.batch)-1], b.chunks[0], b.direction)) {
+		b.batch = append(b.batch, b.chunks[0])
+		b.chunks = b.chunks[1:]
+	}
+	// todo: error if the batch is too big.
+	return nil
+}
+
+// todo: better chunk batch iterator
+func (b *baseChunksIterator[T]) Next() bool {
+	for b.isNil(b.current) || !b.current.Next() {
+		if !b.isNil(b.current) {
+			if err := b.current.Close(); err != nil {
+				b.err = err
+				return false
+			}
+		}
+		if len(b.chunks) == 0 {
+			return false
+		}
+		if err := b.nextBatch(); err != nil {
+			b.err = err
+			return false
+		}
+		var err error
+		b.current, err = b.iteratorFactory(b.batch)
+		if err != nil {
+			b.err = err
+			return false
+		}
+	}
+	return true
+}
+
+func createNextEntryIterator(
+	ctx context.Context,
+	batch []ChunkData,
+	direction logproto.Direction,
+	pipeline log.Pipeline,
+	storage BlockStorage,
+	minT, maxT int64,
+) (iter.EntryIterator, error) {
+	iterators := make([]iter.EntryIterator, 0, len(batch))
+
+	data, err := downloadChunks(ctx, storage, batch)
+	if err != nil {
+		return nil, err
+	}
+
+	for i, chunk := range batch {
+		streamPipeline := pipeline.ForStream(chunk.labels)
+		chunkIterator, err := chunks.NewEntryIterator(data[i], streamPipeline, direction, minT, maxT)
+		if err != nil {
+			return nil, fmt.Errorf("error creating entry iterator: %w", err)
+		}
+		iterators = append(iterators, chunkIterator)
+	}
+
+	// todo: Use NonOverlapping iterator when possible. This will reduce the amount of entries processed during iteration.
+	return iter.NewSortEntryIterator(iterators, direction), nil
+}
+
+func createNextSampleIterator(
+	ctx context.Context,
+	batch []ChunkData,
+	pipeline log.SampleExtractor,
+	storage BlockStorage,
+	minT, maxT int64,
+) (iter.SampleIterator, error) {
+	iterators := make([]iter.SampleIterator, 0, len(batch))
+
+	data, err := downloadChunks(ctx, storage, batch)
+	if err != nil {
+		return nil, err
+	}
+
+	for i, chunk := range batch {
+		streamPipeline := pipeline.ForStream(chunk.labels)
+		chunkIterator, err := chunks.NewSampleIterator(data[i], streamPipeline, minT, maxT)
+		if err != nil {
+			return nil, fmt.Errorf("error creating sample iterator: %w", err)
+		}
+		iterators = append(iterators, chunkIterator)
+	}
+
+	return iter.NewSortSampleIterator(iterators), nil
+}
+
+func (b *baseChunksIterator[T]) Close() error {
+	if !b.isNil(b.current) {
+		return b.current.Close()
+	}
+	return nil
+}
+
+func (b *baseChunksIterator[T]) Err() error {
+	if b.err != nil {
+		return b.err
+	}
+	if !b.isNil(b.current) {
+		return b.current.Err()
+	}
+	return nil
+}
+
+func (b *baseChunksIterator[T]) Labels() string {
+	return b.current.Labels()
+}
+
+func (b *baseChunksIterator[T]) StreamHash() uint64 {
+	return b.current.StreamHash()
+}
+
+func (c *ChunksEntryIterator[T]) At() push.Entry       { return c.current.At() }
+func (c *ChunksSampleIterator[T]) At() logproto.Sample { return c.current.At() }
+
+func isOverlapping(first, second ChunkData, direction logproto.Direction) bool {
+	if direction == logproto.BACKWARD {
+		return first.meta.MinTime <= second.meta.MaxTime
+	}
+	return first.meta.MaxTime >= second.meta.MinTime
+}
+
+func downloadChunks(ctx context.Context, storage BlockStorage, chks []ChunkData) ([][]byte, error) {
+	data := make([][]byte, len(chks))
+	g, ctx := errgroup.WithContext(ctx)
+	g.SetLimit(64)
+	for i, chunk := range chks {
+		chunk := chunk
+		i := i
+		g.Go(func() error {
+			chunkData, err := readChunkData(ctx, storage, chunk)
+			if err != nil {
+				return fmt.Errorf("error reading chunk data: %w", err)
+			}
+			data[i] = chunkData
+			return nil
+		})
+	}
+
+	if err := g.Wait(); err != nil {
+		return nil, err
+	}
+	return data, nil
+}
+
+func readChunkData(ctx context.Context, storage BlockStorage, chunk ChunkData) ([]byte, error) {
+	offset, size := chunk.meta.Ref.Unpack()
+	// todo: We should be able to avoid many IOPS to object storage
+	// if chunks are next to each other and we should be able to pack range request
+	// together.
+	reader, err := storage.GetObjectRange(ctx, wal.Dir+chunk.id, int64(offset), int64(size))
+	if err != nil {
+		return nil, err
+	}
+	defer reader.Close()
+
+	data := make([]byte, size)
+	_, err = reader.Read(data)
+	if err != nil {
+		return nil, err
+	}
+
+	return data, nil
+}
diff --git a/pkg/querier-rf1/wal/chunks_test.go b/pkg/querier-rf1/wal/chunks_test.go
new file mode 100644
index 0000000000000..0d0192c04b17a
--- /dev/null
+++ b/pkg/querier-rf1/wal/chunks_test.go
@@ -0,0 +1,516 @@
+package wal
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"io"
+	"testing"
+	"time"
+
+	"github.com/prometheus/prometheus/model/labels"
+	"github.com/stretchr/testify/require"
+
+	"github.com/grafana/loki/v3/pkg/iter"
+	"github.com/grafana/loki/v3/pkg/logproto"
+	"github.com/grafana/loki/v3/pkg/logql/syntax"
+	"github.com/grafana/loki/v3/pkg/storage/wal"
+	walchunks "github.com/grafana/loki/v3/pkg/storage/wal/chunks"
+)
+
+type mockBlockStorage struct {
+	data map[string][]byte
+}
+
+func (m *mockBlockStorage) GetObjectRange(_ context.Context, objectKey string, off, length int64) (io.ReadCloser, error) {
+	data := m.data[objectKey]
+	return io.NopCloser(bytes.NewReader(data[off : off+length])), nil
+}
+
+func TestChunksEntryIterator(t *testing.T) {
+	ctx := context.Background()
+	storage := &mockBlockStorage{data: make(map[string][]byte)}
+
+	// Generate test data with multiple batches
+	chunkData := generateTestChunkData(5 * defaultBatchSize)
+	chks := writeChunksToStorage(t, storage, chunkData)
+
+	tests := []struct {
+		name      string
+		direction logproto.Direction
+		start     time.Time
+		end       time.Time
+		expected  []logproto.Entry
+	}{
+		{
+			name:      "forward direction, all entries",
+			direction: logproto.FORWARD,
+			start:     time.Unix(0, 0),
+			end:       time.Unix(int64(5*defaultBatchSize+1), 0),
+			expected:  flattenEntries(chunkData),
+		},
+		{
+			name:      "backward direction, all entries",
+			direction: logproto.BACKWARD,
+			start:     time.Unix(0, 0),
+			end:       time.Unix(int64(5*defaultBatchSize+1), 0),
+			expected:  reverseEntries(flattenEntries(chunkData)),
+		},
+		{
+			name:      "forward direction, partial range",
+			direction: logproto.FORWARD,
+			start:     time.Unix(int64(defaultBatchSize), 0),
+			end:       time.Unix(int64(3*defaultBatchSize), 0),
+			expected:  selectEntries(flattenEntries(chunkData), defaultBatchSize, 3*defaultBatchSize),
+		},
+		{
+			name:      "backward direction, partial range",
+			direction: logproto.BACKWARD,
+			start:     time.Unix(int64(defaultBatchSize), 0),
+			end:       time.Unix(int64(3*defaultBatchSize), 0),
+			expected:  reverseEntries(selectEntries(flattenEntries(chunkData), defaultBatchSize, 3*defaultBatchSize)),
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			expr, err := syntax.ParseLogSelector(`{app=~".+"}`, false)
+			require.NoError(t, err)
+
+			pipeline, err := expr.Pipeline()
+			require.NoError(t, err)
+
+			iterator := NewChunksEntryIterator(ctx, storage, chks, pipeline, tt.direction, tt.start.UnixNano(), tt.end.UnixNano())
+
+			result := iterateEntries(iterator)
+			require.NoError(t, iterator.Close())
+			require.NoError(t, iterator.Err())
+
+			assertEqualEntries(t, tt.expected, result)
+		})
+	}
+}
+
+func TestChunksSampleIterator(t *testing.T) {
+	ctx := context.Background()
+	storage := &mockBlockStorage{data: make(map[string][]byte)}
+
+	// Generate test data with multiple batches
+	chunkData := generateTestChunkData(5 * defaultBatchSize)
+	chks := writeChunksToStorage(t, storage, chunkData)
+
+	tests := []struct {
+		name     string
+		start    time.Time
+		end      time.Time
+		expected []logproto.Sample
+	}{
+		{
+			name:     "all samples",
+			start:    time.Unix(0, 0),
+			end:      time.Unix(int64(5*defaultBatchSize+1), 0),
+			expected: entriesToSamples(flattenEntries(chunkData)),
+		},
+		{
+			name:     "partial range",
+			start:    time.Unix(int64(defaultBatchSize), 0),
+			end:      time.Unix(int64(3*defaultBatchSize), 0),
+			expected: entriesToSamples(selectEntries(flattenEntries(chunkData), defaultBatchSize, 3*defaultBatchSize)),
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			expr, err := syntax.ParseSampleExpr(`count_over_time({app=~".+"} [1m])`)
+			require.NoError(t, err)
+
+			extractor, err := expr.Extractor()
+			require.NoError(t, err)
+			iterator := NewChunksSampleIterator(ctx, storage, chks, extractor, tt.start.UnixNano(), tt.end.UnixNano())
+
+			result := iterateSamples(iterator)
+			require.NoError(t, iterator.Close())
+			require.NoError(t, iterator.Err())
+
+			assertEqualSamples(t, tt.expected, result)
+		})
+	}
+}
+
+func TestSortChunks(t *testing.T) {
+	chks := []ChunkData{
+		{
+			meta:   &walchunks.Meta{MinTime: 2, MaxTime: 4},
+			labels: labels.FromStrings("app", "test1"),
+		},
+		{
+			meta:   &walchunks.Meta{MinTime: 1, MaxTime: 3},
+			labels: labels.FromStrings("app", "test2"),
+		},
+		{
+			meta:   &walchunks.Meta{MinTime: 1, MaxTime: 3},
+			labels: labels.FromStrings("app", "test1"),
+		},
+	}
+
+	t.Run("forward direction", func(t *testing.T) {
+		sortChunks(chks, logproto.FORWARD)
+		require.Equal(t, int64(1), chks[0].meta.MinTime)
+		require.Equal(t, "test1", chks[0].labels.Get("app"))
+		require.Equal(t, int64(1), chks[1].meta.MinTime)
+		require.Equal(t, "test2", chks[1].labels.Get("app"))
+		require.Equal(t, int64(2), chks[2].meta.MinTime)
+	})
+
+	t.Run("backward direction", func(t *testing.T) {
+		sortChunks(chks, logproto.BACKWARD)
+		require.Equal(t, int64(4), chks[0].meta.MaxTime)
+		require.Equal(t, "test1", chks[0].labels.Get("app"))
+		require.Equal(t, int64(3), chks[1].meta.MaxTime)
+		require.Equal(t, "test1", chks[1].labels.Get("app"))
+		require.Equal(t, int64(3), chks[2].meta.MaxTime)
+		require.Equal(t, "test2", chks[2].labels.Get("app"))
+	})
+}
+
+func TestIsOverlapping(t *testing.T) {
+	tests := []struct {
+		name      string
+		first     ChunkData
+		second    ChunkData
+		direction logproto.Direction
+		expected  bool
+	}{
+		{
+			name:      "overlapping forward",
+			first:     ChunkData{meta: &walchunks.Meta{MinTime: 1, MaxTime: 3}},
+			second:    ChunkData{meta: &walchunks.Meta{MinTime: 2, MaxTime: 4}},
+			direction: logproto.FORWARD,
+			expected:  true,
+		},
+		{
+			name:      "non-overlapping forward",
+			first:     ChunkData{meta: &walchunks.Meta{MinTime: 1, MaxTime: 2}},
+			second:    ChunkData{meta: &walchunks.Meta{MinTime: 3, MaxTime: 4}},
+			direction: logproto.FORWARD,
+			expected:  false,
+		},
+		{
+			name:      "overlapping backward",
+			first:     ChunkData{meta: &walchunks.Meta{MinTime: 2, MaxTime: 4}},
+			second:    ChunkData{meta: &walchunks.Meta{MinTime: 1, MaxTime: 3}},
+			direction: logproto.BACKWARD,
+			expected:  true,
+		},
+		{
+			name:      "non-overlapping backward",
+			first:     ChunkData{meta: &walchunks.Meta{MinTime: 3, MaxTime: 4}},
+			second:    ChunkData{meta: &walchunks.Meta{MinTime: 1, MaxTime: 2}},
+			direction: logproto.BACKWARD,
+			expected:  false,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			result := isOverlapping(tt.first, tt.second, tt.direction)
+			require.Equal(t, tt.expected, result)
+		})
+	}
+}
+
+func TestBaseChunkIterator(t *testing.T) {
+	ctx := context.Background()
+
+	testCases := []struct {
+		name      string
+		chunks    []ChunkData
+		direction logproto.Direction
+		expected  [][]ChunkData
+	}{
+		{
+			name: "Forward, non-overlapping",
+			chunks: []ChunkData{
+				newTestChunkData("1", 100, 200),
+				newTestChunkData("2", 300, 400),
+				newTestChunkData("3", 500, 600),
+				newTestChunkData("4", 700, 800),
+			},
+			direction: logproto.FORWARD,
+			expected: [][]ChunkData{
+				{newTestChunkData("1", 100, 200), newTestChunkData("2", 300, 400)},
+				{newTestChunkData("3", 500, 600), newTestChunkData("4", 700, 800)},
+			},
+		},
+		{
+			name: "Backward, non-overlapping",
+			chunks: []ChunkData{
+				newTestChunkData("4", 700, 800),
+				newTestChunkData("3", 500, 600),
+				newTestChunkData("2", 300, 400),
+				newTestChunkData("1", 100, 200),
+			},
+			direction: logproto.BACKWARD,
+			expected: [][]ChunkData{
+				{newTestChunkData("4", 700, 800), newTestChunkData("3", 500, 600)},
+				{newTestChunkData("2", 300, 400), newTestChunkData("1", 100, 200)},
+			},
+		},
+		{
+			name: "Forward, overlapping",
+			chunks: []ChunkData{
+				newTestChunkData("1", 100, 300),
+				newTestChunkData("2", 200, 400),
+				newTestChunkData("3", 350, 550),
+				newTestChunkData("4", 600, 800),
+			},
+			direction: logproto.FORWARD,
+			expected: [][]ChunkData{
+				{newTestChunkData("1", 100, 300), newTestChunkData("2", 200, 400), newTestChunkData("3", 350, 550)},
+				{newTestChunkData("4", 600, 800)},
+			},
+		},
+		{
+			name: "Backward, overlapping",
+			chunks: []ChunkData{
+				newTestChunkData("4", 600, 800),
+				newTestChunkData("3", 350, 550),
+				newTestChunkData("2", 200, 400),
+				newTestChunkData("1", 100, 300),
+				newTestChunkData("0", 10, 20),
+			},
+			direction: logproto.BACKWARD,
+			expected: [][]ChunkData{
+				{newTestChunkData("4", 600, 800), newTestChunkData("3", 350, 550), newTestChunkData("2", 200, 400), newTestChunkData("1", 100, 300)},
+				{newTestChunkData("0", 10, 20)},
+			},
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			iter := &testBaseChunkIterator{
+				baseChunksIterator: baseChunksIterator[*testIterator]{
+					ctx:       ctx,
+					chunks:    tc.chunks,
+					direction: tc.direction,
+					bachSize:  2,
+					batch:     make([]ChunkData, 0, 2),
+					iteratorFactory: func(chunks []ChunkData) (*testIterator, error) {
+						return &testIterator{chunks: chunks}, nil
+					},
+					isNil: func(it *testIterator) bool { return it == nil },
+				},
+			}
+			var batches [][]ChunkData
+			for len(iter.chunks) > 0 {
+				err := iter.nextBatch()
+				require.NoError(t, err)
+
+				batch := make([]ChunkData, len(iter.batch))
+				copy(batch, iter.batch)
+				batches = append(batches, batch)
+			}
+
+			require.Equal(t, tc.expected, batches)
+		})
+	}
+}
+
+// Helper functions and types
+
+type testBaseChunkIterator struct {
+	baseChunksIterator[*testIterator]
+}
+
+type testIterator struct {
+	chunks []ChunkData
+	index  int
+}
+
+func (t *testIterator) Next() bool {
+	t.index++
+	return t.index < len(t.chunks)
+}
+
+func (t *testIterator) Close() error       { return nil }
+func (t *testIterator) Err() error         { return nil }
+func (t *testIterator) StreamHash() uint64 { return 0 }
+func (t *testIterator) Labels() string     { return "" }
+func (t *testIterator) At() logproto.Entry { return logproto.Entry{} }
+
+func newTestChunkData(id string, minTime, maxTime int64) ChunkData {
+	return ChunkData{
+		id: id,
+		meta: &walchunks.Meta{
+			MinTime: minTime,
+			MaxTime: maxTime,
+		},
+		labels: labels.Labels{},
+	}
+}
+
+func createChunk(minTime, maxTime int64, labelName, labelValue string) ChunkData {
+	return ChunkData{
+		meta: &walchunks.Meta{
+			MinTime: minTime,
+			MaxTime: maxTime,
+		},
+		labels: labels.FromStrings(labelName, labelValue),
+	}
+}
+
+func assertEqualChunks(t *testing.T, expected, actual ChunkData) {
+	require.Equal(t, expected.meta.MinTime, actual.meta.MinTime, "MinTime mismatch")
+	require.Equal(t, expected.meta.MaxTime, actual.meta.MaxTime, "MaxTime mismatch")
+	require.Equal(t, expected.labels, actual.labels, "Labels mismatch")
+}
+
+func generateTestChunkData(totalEntries int) []struct {
+	labels  labels.Labels
+	entries []*logproto.Entry
+} {
+	var chunkData []struct {
+		labels  labels.Labels
+		entries []*logproto.Entry
+	}
+
+	entriesPerChunk := defaultBatchSize * 2 // Each chunk will contain 2 batches worth of entries
+	numChunks := (totalEntries + entriesPerChunk - 1) / entriesPerChunk
+
+	for i := 0; i < numChunks; i++ {
+		startIndex := i * entriesPerChunk
+		endIndex := (i + 1) * entriesPerChunk
+		if endIndex > totalEntries {
+			endIndex = totalEntries
+		}
+
+		chunkData = append(chunkData, struct {
+			labels  labels.Labels
+			entries []*logproto.Entry
+		}{
+			labels:  labels.FromStrings("app", fmt.Sprintf("test%d", i)),
+			entries: generateEntries(startIndex, endIndex-1),
+		})
+	}
+
+	return chunkData
+}
+
+func writeChunksToStorage(t *testing.T, storage *mockBlockStorage, chunkData []struct {
+	labels  labels.Labels
+	entries []*logproto.Entry
+},
+) []ChunkData {
+	chks := make([]ChunkData, 0, len(chunkData))
+	for i, cd := range chunkData {
+		var buf bytes.Buffer
+		chunkID := fmt.Sprintf("chunk%d", i)
+		_, err := walchunks.WriteChunk(&buf, cd.entries, walchunks.EncodingSnappy)
+		require.NoError(t, err)
+
+		storage.data[wal.Dir+chunkID] = buf.Bytes()
+		chks = append(chks, newChunkData(chunkID, labelsToScratchBuilder(cd.labels), &walchunks.Meta{
+			Ref:     walchunks.NewChunkRef(0, uint64(buf.Len())),
+			MinTime: cd.entries[0].Timestamp.UnixNano(),
+			MaxTime: cd.entries[len(cd.entries)-1].Timestamp.UnixNano(),
+		}))
+	}
+	return chks
+}
+
+func generateEntries(start, end int) []*logproto.Entry {
+	var entries []*logproto.Entry
+	for i := start; i <= end; i++ {
+		entries = append(entries, &logproto.Entry{
+			Timestamp: time.Unix(int64(i), 0),
+			Line:      fmt.Sprintf("line%d", i),
+		})
+	}
+	return entries
+}
+
+func flattenEntries(chunkData []struct {
+	labels  labels.Labels
+	entries []*logproto.Entry
+},
+) []logproto.Entry {
+	var result []logproto.Entry
+	for _, cd := range chunkData {
+		for _, e := range cd.entries {
+			result = append(result, logproto.Entry{Timestamp: e.Timestamp, Line: e.Line})
+		}
+	}
+	return result
+}
+
+func reverseEntries(entries []logproto.Entry) []logproto.Entry {
+	for i := 0; i < len(entries)/2; i++ {
+		j := len(entries) - 1 - i
+		entries[i], entries[j] = entries[j], entries[i]
+	}
+	return entries
+}
+
+func selectEntries(entries []logproto.Entry, start, end int) []logproto.Entry {
+	var result []logproto.Entry
+	for _, e := range entries {
+		if e.Timestamp.Unix() >= int64(start) && e.Timestamp.Unix() < int64(end) {
+			result = append(result, e)
+		}
+	}
+	return result
+}
+
+func entriesToSamples(entries []logproto.Entry) []logproto.Sample {
+	var samples []logproto.Sample
+	for _, e := range entries {
+		samples = append(samples, logproto.Sample{
+			Timestamp: e.Timestamp.UnixNano(),
+			Value:     float64(1), // Use timestamp as value for simplicity
+		})
+	}
+	return samples
+}
+
+func iterateEntries(iterator *ChunksEntryIterator[iter.EntryIterator]) []logproto.Entry {
+	var result []logproto.Entry
+	for iterator.Next() {
+		entry := iterator.At()
+		result = append(result, logproto.Entry{Timestamp: entry.Timestamp, Line: entry.Line})
+	}
+	return result
+}
+
+func iterateSamples(iterator *ChunksSampleIterator[iter.SampleIterator]) []logproto.Sample {
+	var result []logproto.Sample
+	for iterator.Next() {
+		result = append(result, iterator.At())
+	}
+	return result
+}
+
+func assertEqualEntries(t *testing.T, expected, actual []logproto.Entry) {
+	require.Equal(t, len(expected), len(actual), "Number of entries mismatch")
+	for i := range expected {
+		require.Equal(t, expected[i].Timestamp, actual[i].Timestamp, "Timestamp mismatch at index %d", i)
+		require.Equal(t, expected[i].Line, actual[i].Line, "Line mismatch at index %d", i)
+	}
+}
+
+func assertEqualSamples(t *testing.T, expected, actual []logproto.Sample) {
+	require.Equal(t, len(expected), len(actual), "Number of samples mismatch")
+	for i := range expected {
+		require.Equal(t, expected[i].Timestamp, actual[i].Timestamp, "Timestamp mismatch at index %d", i)
+		require.Equal(t, expected[i].Value, actual[i].Value, "Value mismatch at index %d", i)
+	}
+}
+
+func labelsToScratchBuilder(lbs labels.Labels) *labels.ScratchBuilder {
+	sb := labels.NewScratchBuilder(len(lbs))
+	sb.Reset()
+	for i := 0; i < len(lbs); i++ {
+		sb.Add(lbs[i].Name, lbs[i].Value)
+	}
+	return &sb
+}
diff --git a/pkg/querier-rf1/wal/querier.go b/pkg/querier-rf1/wal/querier.go
new file mode 100644
index 0000000000000..0fb2cc23dc525
--- /dev/null
+++ b/pkg/querier-rf1/wal/querier.go
@@ -0,0 +1,203 @@
+package wal
+
+import (
+	"bytes"
+	"context"
+	"io"
+	"sync"
+
+	"github.com/opentracing/opentracing-go"
+	"github.com/prometheus/prometheus/model/labels"
+	"golang.org/x/sync/errgroup"
+	"google.golang.org/grpc"
+
+	"github.com/grafana/dskit/tenant"
+
+	"github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/metastorepb"
+	"github.com/grafana/loki/v3/pkg/iter"
+	"github.com/grafana/loki/v3/pkg/logql"
+	"github.com/grafana/loki/v3/pkg/storage/wal"
+	"github.com/grafana/loki/v3/pkg/storage/wal/chunks"
+	"github.com/grafana/loki/v3/pkg/storage/wal/index"
+)
+
+var _ logql.Querier = (*Querier)(nil)
+
+type BlockStorage interface {
+	GetObjectRange(ctx context.Context, objectKey string, off, length int64) (io.ReadCloser, error)
+}
+
+type Metastore interface {
+	ListBlocksForQuery(ctx context.Context, in *metastorepb.ListBlocksForQueryRequest, opts ...grpc.CallOption) (*metastorepb.ListBlocksForQueryResponse, error)
+}
+
+type Querier struct {
+	blockStorage BlockStorage
+	metaStore    Metastore
+}
+
+func New(
+	metaStore Metastore,
+	blockStorage BlockStorage,
+) (*Querier, error) {
+	return &Querier{
+		blockStorage: blockStorage,
+		metaStore:    metaStore,
+	}, nil
+}
+
+func (q *Querier) SelectLogs(ctx context.Context, req logql.SelectLogParams) (iter.EntryIterator, error) {
+	// todo request validation and delete markers.
+	tenantID, err := tenant.TenantID(ctx)
+	if err != nil {
+		return nil, err
+	}
+	expr, err := req.LogSelector()
+	if err != nil {
+		return nil, err
+	}
+	matchers := expr.Matchers()
+	// todo: not sure if Pipeline is thread safe
+	pipeline, err := expr.Pipeline()
+	if err != nil {
+		return nil, err
+	}
+
+	chks, err := q.matchingChunks(ctx, tenantID, req.Start.UnixNano(), req.End.UnixNano(), matchers...)
+	if err != nil {
+		return nil, err
+	}
+
+	return NewChunksEntryIterator(ctx,
+		q.blockStorage,
+		chks,
+		pipeline,
+		req.Direction,
+		req.Start.UnixNano(),
+		req.End.UnixNano()), nil
+}
+
+func (q *Querier) SelectSamples(ctx context.Context, req logql.SelectSampleParams) (iter.SampleIterator, error) {
+	// todo request validation and delete markers.
+	tenantID, err := tenant.TenantID(ctx)
+	if err != nil {
+		return nil, err
+	}
+	expr, err := req.Expr()
+	if err != nil {
+		return nil, err
+	}
+	selector, err := expr.Selector()
+	if err != nil {
+		return nil, err
+	}
+	matchers := selector.Matchers()
+	// todo: not sure if Extractor is thread safe
+
+	extractor, err := expr.Extractor()
+	if err != nil {
+		return nil, err
+	}
+
+	chks, err := q.matchingChunks(ctx, tenantID, req.Start.UnixNano(), req.End.UnixNano(), matchers...)
+	if err != nil {
+		return nil, err
+	}
+
+	return NewChunksSampleIterator(ctx,
+		q.blockStorage,
+		chks,
+		extractor,
+		req.Start.UnixNano(),
+		req.End.UnixNano()), nil
+}
+
+func (q *Querier) matchingChunks(ctx context.Context, tenantID string, from, through int64, matchers ...*labels.Matcher) ([]ChunkData, error) {
+	sp, ctx := opentracing.StartSpanFromContext(ctx, "matchingChunks")
+	defer sp.Finish()
+	// todo support sharding
+	var (
+		lazyChunks []ChunkData
+		mtx        sync.Mutex
+	)
+
+	err := q.forSeries(ctx, &metastorepb.ListBlocksForQueryRequest{
+		TenantId:  tenantID,
+		StartTime: from,
+		EndTime:   through,
+	}, func(id string, lbs *labels.ScratchBuilder, chk *chunks.Meta) error {
+		mtx.Lock()
+		lazyChunks = append(lazyChunks, newChunkData(id, lbs, chk))
+		mtx.Unlock()
+		return nil
+	}, matchers...)
+	if err != nil {
+		return nil, err
+	}
+	if sp != nil {
+		sp.LogKV("matchedChunks", len(lazyChunks))
+	}
+	return lazyChunks, nil
+}
+
+func (q *Querier) forSeries(ctx context.Context, req *metastorepb.ListBlocksForQueryRequest, fn func(string, *labels.ScratchBuilder, *chunks.Meta) error, matchers ...*labels.Matcher) error {
+	// copy matchers to avoid modifying the original slice.
+	ms := make([]*labels.Matcher, 0, len(matchers)+1)
+	ms = append(ms, matchers...)
+	ms = append(ms, labels.MustNewMatcher(labels.MatchEqual, index.TenantLabel, req.TenantId))
+
+	return q.forIndices(ctx, req, func(ir *index.Reader, id string) error {
+		bufLbls := labels.ScratchBuilder{}
+		chunks := make([]chunks.Meta, 0, 1)
+		p, err := ir.PostingsForMatchers(ctx, ms...)
+		if err != nil {
+			return err
+		}
+		for p.Next() {
+			err := ir.Series(p.At(), &bufLbls, &chunks)
+			if err != nil {
+				return err
+			}
+			if err := fn(id, &bufLbls, &chunks[0]); err != nil {
+				return err
+			}
+		}
+		return p.Err()
+	})
+}
+
+func (q *Querier) forIndices(ctx context.Context, req *metastorepb.ListBlocksForQueryRequest, fn func(ir *index.Reader, id string) error) error {
+	resp, err := q.metaStore.ListBlocksForQuery(ctx, req)
+	if err != nil {
+		return err
+	}
+	metas := resp.Blocks
+	if len(metas) == 0 {
+		return nil
+	}
+	g, ctx := errgroup.WithContext(ctx)
+	g.SetLimit(32)
+	for _, meta := range metas {
+
+		meta := meta
+		g.Go(func() error {
+			reader, err := q.blockStorage.GetObjectRange(ctx, wal.Dir+meta.Id, meta.IndexRef.Offset, meta.IndexRef.Length)
+			if err != nil {
+				return err
+			}
+			defer reader.Close()
+			// todo: use a buffer pool
+			buf := bytes.NewBuffer(make([]byte, 0, meta.IndexRef.Length))
+			_, err = buf.ReadFrom(reader)
+			if err != nil {
+				return err
+			}
+			index, err := index.NewReader(index.RealByteSlice(buf.Bytes()))
+			if err != nil {
+				return err
+			}
+			return fn(index, meta.Id)
+		})
+	}
+	return g.Wait()
+}
diff --git a/pkg/querier-rf1/wal/querier_test.go b/pkg/querier-rf1/wal/querier_test.go
new file mode 100644
index 0000000000000..5d446b8515902
--- /dev/null
+++ b/pkg/querier-rf1/wal/querier_test.go
@@ -0,0 +1,697 @@
+package wal
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"io"
+	"sort"
+	"testing"
+	"time"
+
+	"github.com/prometheus/prometheus/model/labels"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"google.golang.org/grpc"
+
+	"github.com/grafana/dskit/user"
+
+	"github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/metastorepb"
+	"github.com/grafana/loki/v3/pkg/iter"
+	"github.com/grafana/loki/v3/pkg/logproto"
+	"github.com/grafana/loki/v3/pkg/logql"
+	"github.com/grafana/loki/v3/pkg/logql/syntax"
+	"github.com/grafana/loki/v3/pkg/querier/plan"
+	"github.com/grafana/loki/v3/pkg/storage/wal"
+	"github.com/grafana/loki/v3/pkg/storage/wal/chunks"
+)
+
+// MockStorage is a simple in-memory storage for testing
+type MockStorage struct {
+	data map[string][]byte
+}
+
+func NewMockStorage() *MockStorage {
+	return &MockStorage{data: make(map[string][]byte)}
+}
+
+func (m *MockStorage) GetObjectRange(_ context.Context, objectKey string, off, length int64) (io.ReadCloser, error) {
+	data, ok := m.data[objectKey]
+	if !ok {
+		return nil, fmt.Errorf("object not found: %s", objectKey)
+	}
+	return io.NopCloser(bytes.NewReader(data[off : off+length])), nil
+}
+
+func (m *MockStorage) PutObject(objectKey string, data []byte) {
+	m.data[objectKey] = data
+}
+
+// MockMetastore is a simple in-memory metastore for testing
+type MockMetastore struct {
+	blocks map[string][]*metastorepb.BlockMeta
+}
+
+func NewMockMetastore() *MockMetastore {
+	return &MockMetastore{blocks: make(map[string][]*metastorepb.BlockMeta)}
+}
+
+func (m *MockMetastore) ListBlocksForQuery(_ context.Context, req *metastorepb.ListBlocksForQueryRequest, _ ...grpc.CallOption) (*metastorepb.ListBlocksForQueryResponse, error) {
+	blocks := m.blocks[req.TenantId]
+	var result []*metastorepb.BlockMeta
+	for _, block := range blocks {
+		if block.MinTime <= req.EndTime && block.MaxTime >= req.StartTime {
+			result = append(result, block)
+		}
+	}
+	return &metastorepb.ListBlocksForQueryResponse{Blocks: result}, nil
+}
+
+func (m *MockMetastore) AddBlock(tenantID string, block *metastorepb.BlockMeta) {
+	m.blocks[tenantID] = append(m.blocks[tenantID], block)
+}
+
+func TestQuerier_SelectLogs(t *testing.T) {
+	storage := NewMockStorage()
+	metastore := NewMockMetastore()
+
+	querier, err := New(metastore, storage)
+	require.NoError(t, err)
+
+	tenantID := "test-tenant"
+	ctx := user.InjectOrgID(context.Background(), tenantID)
+
+	// Create expanded test data
+	testData := []struct {
+		labels  labels.Labels
+		entries []*logproto.Entry
+	}{
+		{
+			labels:  labels.FromStrings("app", "test1", "env", "prod"),
+			entries: generateEntries(1000, 1050),
+		},
+		{
+			labels:  labels.FromStrings("app", "test2", "env", "staging"),
+			entries: generateEntries(1025, 1075),
+		},
+		{
+			labels:  labels.FromStrings("app", "test3", "env", "dev", "version", "v1"),
+			entries: generateEntries(1050, 1100),
+		},
+		{
+			labels:  labels.FromStrings("app", "test4", "env", "prod", "version", "v2"),
+			entries: generateEntries(1075, 1125),
+		},
+	}
+
+	// Setup test data
+	setupTestData(t, storage, metastore, tenantID, testData)
+
+	// Test cases
+	testCases := []struct {
+		name          string
+		query         string
+		expectedCount int
+		expectedFirst logproto.Entry
+		expectedLast  logproto.Entry
+	}{
+		{
+			name:          "Query all logs",
+			query:         `{app=~"test.*"}`,
+			expectedCount: 204,
+			expectedFirst: logproto.Entry{
+				Timestamp: time.Unix(1000, 0),
+				Line:      "line1000",
+				Parsed: []logproto.LabelAdapter{
+					{Name: "app", Value: "test1"},
+					{Name: "env", Value: "prod"},
+				},
+			},
+			expectedLast: logproto.Entry{
+				Timestamp: time.Unix(1125, 0),
+				Line:      "line1125",
+				Parsed: []logproto.LabelAdapter{
+					{Name: "app", Value: "test4"},
+					{Name: "env", Value: "prod"},
+					{Name: "version", Value: "v2"},
+				},
+			},
+		},
+		{
+			name:          "Query specific app",
+			query:         `{app="test1"}`,
+			expectedCount: 51,
+			expectedFirst: logproto.Entry{
+				Timestamp: time.Unix(1000, 0),
+				Line:      "line1000",
+				Parsed: []logproto.LabelAdapter{
+					{Name: "app", Value: "test1"},
+					{Name: "env", Value: "prod"},
+				},
+			},
+			expectedLast: logproto.Entry{
+				Timestamp: time.Unix(1050, 0),
+				Line:      "line1050",
+				Parsed: []logproto.LabelAdapter{
+					{Name: "app", Value: "test1"},
+					{Name: "env", Value: "prod"},
+				},
+			},
+		},
+		{
+			name:          "Query with multiple label equality",
+			query:         `{app="test4", env="prod"}`,
+			expectedCount: 51,
+			expectedFirst: logproto.Entry{
+				Timestamp: time.Unix(1075, 0),
+				Line:      "line1075",
+				Parsed: []logproto.LabelAdapter{
+					{Name: "app", Value: "test4"},
+					{Name: "env", Value: "prod"},
+					{Name: "version", Value: "v2"},
+				},
+			},
+			expectedLast: logproto.Entry{
+				Timestamp: time.Unix(1125, 0),
+				Line:      "line1125",
+				Parsed: []logproto.LabelAdapter{
+					{Name: "app", Value: "test4"},
+					{Name: "env", Value: "prod"},
+					{Name: "version", Value: "v2"},
+				},
+			},
+		},
+		{
+			name:          "Query with negative regex",
+			query:         `{app=~"test.*", env!~"stag.*|dev"}`,
+			expectedCount: 102,
+			expectedFirst: logproto.Entry{
+				Timestamp: time.Unix(1000, 0),
+				Line:      "line1000",
+				Parsed: []logproto.LabelAdapter{
+					{Name: "app", Value: "test1"},
+					{Name: "env", Value: "prod"},
+				},
+			},
+			expectedLast: logproto.Entry{
+				Timestamp: time.Unix(1125, 0),
+				Line:      "line1125",
+				Parsed: []logproto.LabelAdapter{
+					{Name: "app", Value: "test4"},
+					{Name: "env", Value: "prod"},
+					{Name: "version", Value: "v2"},
+				},
+			},
+		},
+		{
+			name:          "Query with label presence",
+			query:         `{app=~"test.*", version=""}`,
+			expectedCount: 102,
+			expectedFirst: logproto.Entry{
+				Timestamp: time.Unix(1000, 0),
+				Line:      "line1000",
+				Parsed: []logproto.LabelAdapter{
+					{Name: "app", Value: "test1"},
+					{Name: "env", Value: "prod"},
+				},
+			},
+			expectedLast: logproto.Entry{
+				Timestamp: time.Unix(1075, 0),
+				Line:      "line1075",
+				Parsed: []logproto.LabelAdapter{
+					{Name: "app", Value: "test2"},
+					{Name: "env", Value: "staging"},
+				},
+			},
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			expr, err := syntax.ParseExpr(tc.query)
+			require.NoError(t, err)
+
+			req := logql.SelectLogParams{
+				QueryRequest: &logproto.QueryRequest{
+					Selector:  tc.query,
+					Start:     time.Unix(1000, 0),
+					End:       time.Unix(1126, 0),
+					Limit:     10000,
+					Direction: logproto.FORWARD,
+					Plan: &plan.QueryPlan{
+						AST: expr,
+					},
+				},
+			}
+
+			iter, err := querier.SelectLogs(ctx, req)
+			require.NoError(t, err)
+
+			results := collectPushEntries(t, iter)
+
+			assert.Len(t, results, tc.expectedCount, "Unexpected number of log entries")
+			if len(results) > 0 {
+				assert.Equal(t, tc.expectedFirst, results[0], "First log entry mismatch")
+				assert.Equal(t, tc.expectedLast, results[len(results)-1], "Last log entry mismatch")
+			}
+		})
+	}
+}
+
+// SampleWithLabels is a new struct to hold both the sample and its labels
+type SampleWithLabels struct {
+	Sample logproto.Sample
+	Labels labels.Labels
+}
+
+func TestQuerier_SelectSamples(t *testing.T) {
+	storage := NewMockStorage()
+	metastore := NewMockMetastore()
+
+	querier, err := New(metastore, storage)
+	require.NoError(t, err)
+
+	tenantID := "test-tenant"
+	ctx := user.InjectOrgID(context.Background(), tenantID)
+
+	// Create test data
+	testData := []struct {
+		labels  labels.Labels
+		samples []logproto.Sample
+	}{
+		{
+			labels:  labels.FromStrings("app", "test1", "env", "prod"),
+			samples: generateSamples(1000, 1050, 1),
+		},
+		{
+			labels:  labels.FromStrings("app", "test2", "env", "staging"),
+			samples: generateSamples(1025, 1075, 2),
+		},
+		{
+			labels:  labels.FromStrings("app", "test3", "env", "dev", "version", "v1"),
+			samples: generateSamples(1050, 1100, 3),
+		},
+		{
+			labels:  labels.FromStrings("app", "test4", "env", "prod", "version", "v2"),
+			samples: generateSamples(1075, 1125, 4),
+		},
+	}
+
+	// Setup test data
+	setupTestSampleData(t, storage, metastore, tenantID, testData)
+
+	// Test cases
+	testCases := []struct {
+		name          string
+		query         string
+		expectedCount int
+		expectedFirst SampleWithLabels
+		expectedLast  SampleWithLabels
+	}{
+		{
+			name:          "Query all samples",
+			query:         `sum_over_time({app=~"test.*"} | label_format v="{{__line__}}" | unwrap v[1s])`,
+			expectedCount: 204,
+			expectedFirst: SampleWithLabels{
+				Sample: logproto.Sample{
+					Timestamp: time.Unix(1000, 0).UnixNano(),
+					Value:     1,
+				},
+				Labels: labels.FromStrings("app", "test1", "env", "prod"),
+			},
+			expectedLast: SampleWithLabels{
+				Sample: logproto.Sample{
+					Timestamp: time.Unix(1125, 0).UnixNano(),
+					Value:     4,
+				},
+				Labels: labels.FromStrings("app", "test4", "env", "prod", "version", "v2"),
+			},
+		},
+		{
+			name:          "Query specific app",
+			query:         `sum_over_time({app="test1"}| label_format v="{{__line__}}" | unwrap v[1s])`,
+			expectedCount: 51,
+			expectedFirst: SampleWithLabels{
+				Sample: logproto.Sample{
+					Timestamp: time.Unix(1000, 0).UnixNano(),
+					Value:     1,
+				},
+				Labels: labels.FromStrings("app", "test1", "env", "prod"),
+			},
+			expectedLast: SampleWithLabels{
+				Sample: logproto.Sample{
+					Timestamp: time.Unix(1050, 0).UnixNano(),
+					Value:     1,
+				},
+				Labels: labels.FromStrings("app", "test1", "env", "prod"),
+			},
+		},
+		{
+			name:          "Query with multiple label equality",
+			query:         `sum_over_time({app="test4", env="prod"}| label_format v="{{__line__}}" | unwrap v[1s])`,
+			expectedCount: 51,
+			expectedFirst: SampleWithLabels{
+				Sample: logproto.Sample{
+					Timestamp: time.Unix(1075, 0).UnixNano(),
+					Value:     4,
+				},
+				Labels: labels.FromStrings("app", "test4", "env", "prod", "version", "v2"),
+			},
+			expectedLast: SampleWithLabels{
+				Sample: logproto.Sample{
+					Timestamp: time.Unix(1125, 0).UnixNano(),
+					Value:     4,
+				},
+				Labels: labels.FromStrings("app", "test4", "env", "prod", "version", "v2"),
+			},
+		},
+		{
+			name:          "Query with negative regex",
+			query:         `sum_over_time({app=~"test.*", env!~"stag.*|dev"}| label_format v="{{__line__}}" | unwrap v[1s])`,
+			expectedCount: 102,
+			expectedFirst: SampleWithLabels{
+				Sample: logproto.Sample{
+					Timestamp: time.Unix(1000, 0).UnixNano(),
+					Value:     1,
+				},
+				Labels: labels.FromStrings("app", "test1", "env", "prod"),
+			},
+			expectedLast: SampleWithLabels{
+				Sample: logproto.Sample{
+					Timestamp: time.Unix(1125, 0).UnixNano(),
+					Value:     4,
+				},
+				Labels: labels.FromStrings("app", "test4", "env", "prod", "version", "v2"),
+			},
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			expr, err := syntax.ParseExpr(tc.query)
+			require.NoError(t, err)
+
+			req := logql.SelectSampleParams{
+				SampleQueryRequest: &logproto.SampleQueryRequest{
+					Selector: tc.query,
+					Start:    time.Unix(1000, 0),
+					End:      time.Unix(1126, 0),
+					Plan: &plan.QueryPlan{
+						AST: expr,
+					},
+				},
+			}
+
+			iter, err := querier.SelectSamples(ctx, req)
+			require.NoError(t, err)
+
+			results := collectSamplesWithLabels(t, iter)
+
+			assert.Len(t, results, tc.expectedCount, "Unexpected number of samples")
+			if len(results) > 0 {
+				assert.Equal(t, tc.expectedFirst.Sample, results[0].Sample, "First sample mismatch")
+				assert.Equal(t, tc.expectedFirst.Labels, results[0].Labels, "First sample labels mismatch")
+				assert.Equal(t, tc.expectedLast.Sample, results[len(results)-1].Sample, "Last sample mismatch")
+				assert.Equal(t, tc.expectedLast.Labels, results[len(results)-1].Labels, "Last sample labels mismatch")
+			}
+		})
+	}
+}
+
+func TestQuerier_matchingChunks(t *testing.T) {
+	storage := NewMockStorage()
+	metastore := NewMockMetastore()
+
+	querier, err := New(metastore, storage)
+	require.NoError(t, err)
+
+	tenantID := "test-tenant"
+	ctx := user.InjectOrgID(context.Background(), tenantID)
+
+	// Create test data
+	testData := []struct {
+		labels  labels.Labels
+		entries []*logproto.Entry
+	}{
+		{
+			labels:  labels.FromStrings("app", "app1", "env", "prod"),
+			entries: generateEntries(1000, 1050),
+		},
+		{
+			labels:  labels.FromStrings("app", "app2", "env", "staging"),
+			entries: generateEntries(1025, 1075),
+		},
+		{
+			labels:  labels.FromStrings("app", "app3", "env", "dev"),
+			entries: generateEntries(1050, 1100),
+		},
+	}
+
+	// Setup test data
+	setupTestData(t, storage, metastore, tenantID, testData)
+
+	// Test cases
+	testCases := []struct {
+		name           string
+		matchers       []*labels.Matcher
+		start          int64
+		end            int64
+		expectedChunks []ChunkData
+	}{
+		{
+			name: "Equality matcher",
+			matchers: []*labels.Matcher{
+				labels.MustNewMatcher(labels.MatchEqual, "app", "app1"),
+			},
+			start: time.Unix(1000, 0).UnixNano(),
+			end:   time.Unix(1100, 0).UnixNano(),
+			expectedChunks: []ChunkData{
+				{
+					labels: labels.FromStrings("app", "app1", "env", "prod"),
+					meta:   &chunks.Meta{MinTime: time.Unix(1000, 0).UnixNano(), MaxTime: time.Unix(1050, 0).UnixNano()},
+				},
+			},
+		},
+		{
+			name: "Negative matcher",
+			matchers: []*labels.Matcher{
+				labels.MustNewMatcher(labels.MatchNotEqual, "app", "app1"),
+			},
+			start: time.Unix(1000, 0).UnixNano(),
+			end:   time.Unix(1100, 0).UnixNano(),
+			expectedChunks: []ChunkData{
+				{
+					labels: labels.FromStrings("app", "app2", "env", "staging"),
+					meta:   &chunks.Meta{MinTime: time.Unix(1025, 0).UnixNano(), MaxTime: time.Unix(1075, 0).UnixNano()},
+				},
+				{
+					labels: labels.FromStrings("app", "app3", "env", "dev"),
+					meta:   &chunks.Meta{MinTime: time.Unix(1050, 0).UnixNano(), MaxTime: time.Unix(1100, 0).UnixNano()},
+				},
+			},
+		},
+		{
+			name: "Regex matcher",
+			matchers: []*labels.Matcher{
+				labels.MustNewMatcher(labels.MatchRegexp, "app", "app[12]"),
+			},
+			start: time.Unix(1000, 0).UnixNano(),
+			end:   time.Unix(1100, 0).UnixNano(),
+			expectedChunks: []ChunkData{
+				{
+					labels: labels.FromStrings("app", "app1", "env", "prod"),
+					meta:   &chunks.Meta{MinTime: time.Unix(1000, 0).UnixNano(), MaxTime: time.Unix(1050, 0).UnixNano()},
+				},
+				{
+					labels: labels.FromStrings("app", "app2", "env", "staging"),
+					meta:   &chunks.Meta{MinTime: time.Unix(1025, 0).UnixNano(), MaxTime: time.Unix(1075, 0).UnixNano()},
+				},
+			},
+		},
+		{
+			name: "Not regex matcher",
+			matchers: []*labels.Matcher{
+				labels.MustNewMatcher(labels.MatchNotRegexp, "app", "app[12]"),
+			},
+			start: time.Unix(1000, 0).UnixNano(),
+			end:   time.Unix(1100, 0).UnixNano(),
+			expectedChunks: []ChunkData{
+				{
+					labels: labels.FromStrings("app", "app3", "env", "dev"),
+					meta:   &chunks.Meta{MinTime: time.Unix(1050, 0).UnixNano(), MaxTime: time.Unix(1100, 0).UnixNano()},
+				},
+			},
+		},
+		{
+			name: "Multiple matchers",
+			matchers: []*labels.Matcher{
+				labels.MustNewMatcher(labels.MatchRegexp, "app", "app.*"),
+				labels.MustNewMatcher(labels.MatchNotEqual, "env", "prod"),
+			},
+			start: time.Unix(1000, 0).UnixNano(),
+			end:   time.Unix(1100, 0).UnixNano(),
+			expectedChunks: []ChunkData{
+				{
+					labels: labels.FromStrings("app", "app2", "env", "staging"),
+					meta:   &chunks.Meta{MinTime: time.Unix(1025, 0).UnixNano(), MaxTime: time.Unix(1075, 0).UnixNano()},
+				},
+				{
+					labels: labels.FromStrings("app", "app3", "env", "dev"),
+					meta:   &chunks.Meta{MinTime: time.Unix(1050, 0).UnixNano(), MaxTime: time.Unix(1100, 0).UnixNano()},
+				},
+			},
+		},
+		{
+			name: "Time range filter",
+			matchers: []*labels.Matcher{
+				labels.MustNewMatcher(labels.MatchRegexp, "app", "app.*"),
+			},
+			start: time.Unix(1080, 0).UnixNano(),
+			end:   time.Unix(1100, 0).UnixNano(),
+			expectedChunks: []ChunkData{
+				{
+					labels: labels.FromStrings("app", "app3", "env", "dev"),
+					meta:   &chunks.Meta{MinTime: time.Unix(1050, 0).UnixNano(), MaxTime: time.Unix(1100, 0).UnixNano()},
+				},
+			},
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			chunks, err := querier.matchingChunks(ctx, tenantID, tc.start, tc.end, tc.matchers...)
+			require.NoError(t, err)
+
+			sort.Slice(tc.expectedChunks, func(i, j int) bool {
+				return tc.expectedChunks[i].labels.String() < tc.expectedChunks[j].labels.String()
+			})
+			sort.Slice(chunks, func(i, j int) bool {
+				return chunks[i].labels.String() < chunks[j].labels.String()
+			})
+			assert.Equal(t, len(tc.expectedChunks), len(chunks), "Unexpected number of matching chunks")
+
+			// Verify that all returned chunks match the expected chunks
+			for i, expectedChunk := range tc.expectedChunks {
+				if i < len(chunks) {
+					assert.Equal(t, expectedChunk.labels, chunks[i].labels, "Labels mismatch for chunk %d", i)
+					assert.Equal(t, expectedChunk.meta.MinTime, chunks[i].meta.MinTime, "MinTime mismatch for chunk %d", i)
+					assert.Equal(t, expectedChunk.meta.MaxTime, chunks[i].meta.MaxTime, "MaxTime mismatch for chunk %d", i)
+				}
+			}
+
+			// Additional checks for time range and matchers
+			for _, chunk := range chunks {
+				for _, matcher := range tc.matchers {
+					assert.True(t, matcher.Matches(chunk.labels.Get(matcher.Name)),
+						"Chunk labels %v do not match criteria %v", chunk.labels, matcher)
+				}
+			}
+		})
+	}
+}
+
+func setupTestData(t *testing.T, storage *MockStorage, metastore *MockMetastore, tenantID string, testData []struct {
+	labels  labels.Labels
+	entries []*logproto.Entry
+},
+) {
+	total := 0
+	for i, data := range testData {
+		segmentID := fmt.Sprintf("segment%d", i)
+		writer, err := wal.NewWalSegmentWriter()
+		require.NoError(t, err)
+		total += len(data.entries)
+		writer.Append(tenantID, data.labels.String(), data.labels, data.entries, time.Now())
+
+		var buf bytes.Buffer
+		_, err = writer.WriteTo(&buf)
+		require.NoError(t, err)
+
+		segmentData := buf.Bytes()
+		storage.PutObject(wal.Dir+segmentID, segmentData)
+
+		blockMeta := writer.Meta(segmentID)
+		metastore.AddBlock(tenantID, blockMeta)
+	}
+	t.Log("Total entries in storage:", total)
+}
+
+func collectPushEntries(t *testing.T, iter iter.EntryIterator) []logproto.Entry {
+	var results []logproto.Entry
+	for iter.Next() {
+		entry := iter.At()
+		lbs := iter.Labels()
+		parsed, err := syntax.ParseLabels(lbs)
+		require.NoError(t, err)
+		results = append(results, logproto.Entry{
+			Timestamp: entry.Timestamp,
+			Line:      entry.Line,
+			Parsed:    logproto.FromLabelsToLabelAdapters(parsed),
+		})
+	}
+	require.NoError(t, iter.Close())
+	return results
+}
+
+func collectSamplesWithLabels(t *testing.T, iter iter.SampleIterator) []SampleWithLabels {
+	var results []SampleWithLabels
+	for iter.Next() {
+		sample := iter.At()
+		labelString := iter.Labels()
+		parsedLabels, err := syntax.ParseLabels(labelString)
+		require.NoError(t, err)
+		results = append(results, SampleWithLabels{
+			Sample: sample,
+			Labels: parsedLabels,
+		})
+	}
+	require.NoError(t, iter.Close())
+	return results
+}
+
+func generateSamples(start, end int64, value float64) []logproto.Sample {
+	var samples []logproto.Sample
+	for i := start; i <= end; i++ {
+		samples = append(samples, logproto.Sample{
+			Timestamp: time.Unix(i, 0).UnixNano(),
+			Value:     value,
+		})
+	}
+	return samples
+}
+
+func setupTestSampleData(t *testing.T, storage *MockStorage, metastore *MockMetastore, tenantID string, testData []struct {
+	labels  labels.Labels
+	samples []logproto.Sample
+},
+) {
+	total := 0
+	for i, data := range testData {
+		segmentID := fmt.Sprintf("segment%d", i)
+		writer, err := wal.NewWalSegmentWriter()
+		require.NoError(t, err)
+		total += len(data.samples)
+
+		// Convert samples to entries for the WAL writer
+		entries := make([]*logproto.Entry, len(data.samples))
+		for i, sample := range data.samples {
+			entries[i] = &logproto.Entry{
+				Timestamp: time.Unix(0, sample.Timestamp),
+				Line:      fmt.Sprintf("%f", sample.Value),
+			}
+		}
+
+		writer.Append(tenantID, data.labels.String(), data.labels, entries, time.Now())
+
+		var buf bytes.Buffer
+		_, err = writer.WriteTo(&buf)
+		require.NoError(t, err)
+
+		segmentData := buf.Bytes()
+		storage.PutObject(wal.Dir+segmentID, segmentData)
+
+		blockMeta := writer.Meta(segmentID)
+		metastore.AddBlock(tenantID, blockMeta)
+	}
+	t.Log("Total samples in storage:", total)
+}
diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/index/postings.go b/pkg/storage/stores/shipper/indexshipper/tsdb/index/postings.go
index 248cd523dab59..7c2dd99023b7d 100644
--- a/pkg/storage/stores/shipper/indexshipper/tsdb/index/postings.go
+++ b/pkg/storage/stores/shipper/indexshipper/tsdb/index/postings.go
@@ -424,6 +424,13 @@ func EmptyPostings() Postings {
 	return emptyPostings
 }
 
+// IsEmptyPostingsType returns true if the postings are an empty postings list.
+// When this function returns false, it doesn't mean that the postings isn't empty
+// (it could be an empty intersection of two non-empty postings, for example).
+func IsEmptyPostingsType(p Postings) bool {
+	return p == emptyPostings
+}
+
 // ErrPostings returns new postings that immediately error.
 func ErrPostings(err error) Postings {
 	return errPostings{err}
diff --git a/pkg/storage/wal/chunks/chunks.go b/pkg/storage/wal/chunks/chunks.go
index f0f2625596f5d..60b7b6612446b 100644
--- a/pkg/storage/wal/chunks/chunks.go
+++ b/pkg/storage/wal/chunks/chunks.go
@@ -15,6 +15,7 @@ import (
 	"github.com/klauspost/compress/s2"
 
 	"github.com/grafana/loki/v3/pkg/chunkenc"
+	"github.com/grafana/loki/v3/pkg/iter"
 	"github.com/grafana/loki/v3/pkg/logproto"
 )
 
@@ -27,7 +28,10 @@ const (
 )
 
 // Initialize the CRC32 table
-var castagnoliTable *crc32.Table
+var (
+	castagnoliTable *crc32.Table
+	_               iter.EntryIterator = (*entryBufferedIterator)(nil)
+)
 
 func init() {
 	castagnoliTable = crc32.MakeTable(crc32.Castagnoli)
diff --git a/pkg/storage/wal/chunks/doc.go b/pkg/storage/wal/chunks/doc.go
new file mode 100644
index 0000000000000..b3b50ed818c68
--- /dev/null
+++ b/pkg/storage/wal/chunks/doc.go
@@ -0,0 +1,37 @@
+// Package chunks provides functionality for efficient storage and retrieval of log data and metrics.
+//
+// The chunks package implements a compact and performant way to store and access
+// log entries and metric samples. It uses various compression and encoding techniques to minimize
+// storage requirements while maintaining fast access times.
+//
+// Key features:
+//   - Efficient chunk writing with multiple encoding options
+//   - Fast chunk reading with iterators for forward and backward traversal
+//   - Support for time-based filtering of log entries and metric samples
+//   - Integration with Loki's log query language (LogQL) for advanced filtering and processing
+//   - Separate iterators for log entries and metric samples
+//
+// Main types and functions:
+//   - WriteChunk: Writes log entries to a compressed chunk format
+//   - NewChunkReader: Creates a reader for parsing and accessing chunk data
+//   - NewEntryIterator: Provides an iterator for efficient traversal of log entries in a chunk
+//   - NewSampleIterator: Provides an iterator for efficient traversal of metric samples in a chunk
+//
+// Entry Iterator:
+// The EntryIterator allows efficient traversal of log entries within a chunk. It supports
+// both forward and backward iteration, time-based filtering, and integration with LogQL pipelines
+// for advanced log processing.
+//
+// Sample Iterator:
+// The SampleIterator enables efficient traversal of metric samples within a chunk. It supports
+// time-based filtering and integration with LogQL extractors for advanced metric processing.
+// This iterator is particularly useful for handling numeric data extracted from logs or
+// pre-aggregated metrics.
+//
+// Both iterators implement methods for accessing the current entry or sample, checking for errors,
+// and retrieving associated labels and stream hashes.
+//
+// This package is designed to work seamlessly with other components of the Loki
+// log aggregation system, providing a crucial layer for data storage and retrieval of
+// both logs and metrics.
+package chunks
diff --git a/pkg/storage/wal/chunks/entry_iterator.go b/pkg/storage/wal/chunks/entry_iterator.go
new file mode 100644
index 0000000000000..9a127266b07a8
--- /dev/null
+++ b/pkg/storage/wal/chunks/entry_iterator.go
@@ -0,0 +1,115 @@
+package chunks
+
+import (
+	"time"
+
+	"github.com/grafana/loki/v3/pkg/iter"
+	"github.com/grafana/loki/v3/pkg/logproto"
+	"github.com/grafana/loki/v3/pkg/logql/log"
+
+	"github.com/grafana/loki/pkg/push"
+)
+
+type entryBufferedIterator struct {
+	reader        *ChunkReader
+	pipeline      log.StreamPipeline
+	from, through int64
+
+	cur        logproto.Entry
+	currLabels log.LabelsResult
+}
+
+// NewEntryIterator creates an iterator for efficiently traversing log entries in a chunk.
+// It takes compressed chunk data, a processing pipeline, iteration direction, and a time range.
+// The returned iterator filters entries based on the time range and applies the given pipeline.
+// It handles both forward and backward iteration.
+//
+// Parameters:
+//   - chunkData: Compressed chunk data containing log entries
+//   - pipeline: StreamPipeline for processing and filtering entries
+//   - direction: Direction of iteration (FORWARD or BACKWARD)
+//   - from: Start timestamp (inclusive) for filtering entries
+//   - through: End timestamp (exclusive) for filtering entries
+//
+// Returns an EntryIterator and an error if creation fails.
+func NewEntryIterator(
+	chunkData []byte,
+	pipeline log.StreamPipeline,
+	direction logproto.Direction,
+	from, through int64,
+) (iter.EntryIterator, error) {
+	chkReader, err := NewChunkReader(chunkData)
+	if err != nil {
+		return nil, err
+	}
+	it := &entryBufferedIterator{
+		reader:   chkReader,
+		pipeline: pipeline,
+		from:     from,
+		through:  through,
+	}
+	if direction == logproto.FORWARD {
+		return it, nil
+	}
+	return iter.NewEntryReversedIter(it)
+}
+
+// At implements iter.EntryIterator.
+func (e *entryBufferedIterator) At() push.Entry {
+	return e.cur
+}
+
+// Close implements iter.EntryIterator.
+func (e *entryBufferedIterator) Close() error {
+	return e.reader.Close()
+}
+
+// Err implements iter.EntryIterator.
+func (e *entryBufferedIterator) Err() error {
+	return e.reader.Err()
+}
+
+// Labels implements iter.EntryIterator.
+func (e *entryBufferedIterator) Labels() string {
+	return e.currLabels.String()
+}
+
+// Next implements iter.EntryIterator.
+func (e *entryBufferedIterator) Next() bool {
+	for e.reader.Next() {
+		ts, line := e.reader.At()
+		// check if the timestamp is within the range before applying the pipeline.
+		if ts < e.from {
+			continue
+		}
+		if ts >= e.through {
+			return false
+		}
+		// todo: structured metadata.
+		newLine, lbs, matches := e.pipeline.Process(ts, line)
+		if !matches {
+			continue
+		}
+		e.currLabels = lbs
+		e.cur.Timestamp = time.Unix(0, ts)
+		e.cur.Line = string(newLine)
+		e.cur.StructuredMetadata = logproto.FromLabelsToLabelAdapters(lbs.StructuredMetadata())
+		e.cur.Parsed = logproto.FromLabelsToLabelAdapters(lbs.Parsed())
+		return true
+	}
+	return false
+}
+
+// StreamHash implements iter.EntryIterator.
+func (e *entryBufferedIterator) StreamHash() uint64 {
+	return e.pipeline.BaseLabels().Hash()
+}
+
+type sampleBufferedIterator struct {
+	reader        *ChunkReader
+	pipeline      log.StreamSampleExtractor
+	from, through int64
+
+	cur        logproto.Sample
+	currLabels log.LabelsResult
+}
diff --git a/pkg/storage/wal/chunks/entry_iterator_test.go b/pkg/storage/wal/chunks/entry_iterator_test.go
new file mode 100644
index 0000000000000..a098161134a55
--- /dev/null
+++ b/pkg/storage/wal/chunks/entry_iterator_test.go
@@ -0,0 +1,143 @@
+package chunks
+
+import (
+	"bytes"
+	"testing"
+	"time"
+
+	"github.com/prometheus/prometheus/model/labels"
+	"github.com/stretchr/testify/require"
+
+	"github.com/grafana/loki/v3/pkg/logproto"
+	"github.com/grafana/loki/v3/pkg/logql/log"
+	"github.com/grafana/loki/v3/pkg/logql/syntax"
+)
+
+func TestNewEntryIterator(t *testing.T) {
+	tests := []struct {
+		name      string
+		entries   []*logproto.Entry
+		direction logproto.Direction
+		from      int64
+		through   int64
+		pipeline  log.StreamPipeline
+		expected  []*logproto.Entry
+	}{
+		{
+			name: "Forward direction, all entries within range",
+			entries: []*logproto.Entry{
+				{Timestamp: time.Unix(0, 1), Line: "line 1"},
+				{Timestamp: time.Unix(0, 2), Line: "line 2"},
+				{Timestamp: time.Unix(0, 3), Line: "line 3"},
+			},
+			direction: logproto.FORWARD,
+			from:      0,
+			through:   4,
+			pipeline:  noopStreamPipeline(),
+			expected: []*logproto.Entry{
+				{Timestamp: time.Unix(0, 1), Line: "line 1"},
+				{Timestamp: time.Unix(0, 2), Line: "line 2"},
+				{Timestamp: time.Unix(0, 3), Line: "line 3"},
+			},
+		},
+		{
+			name: "Backward direction, all entries within range",
+			entries: []*logproto.Entry{
+				{Timestamp: time.Unix(0, 1), Line: "line 1"},
+				{Timestamp: time.Unix(0, 2), Line: "line 2"},
+				{Timestamp: time.Unix(0, 3), Line: "line 3"},
+			},
+			direction: logproto.BACKWARD,
+			from:      0,
+			through:   4,
+			pipeline:  noopStreamPipeline(),
+			expected: []*logproto.Entry{
+				{Timestamp: time.Unix(0, 3), Line: "line 3"},
+				{Timestamp: time.Unix(0, 2), Line: "line 2"},
+				{Timestamp: time.Unix(0, 1), Line: "line 1"},
+			},
+		},
+		{
+			name: "Forward direction, partial range",
+			entries: []*logproto.Entry{
+				{Timestamp: time.Unix(0, 1), Line: "line 1"},
+				{Timestamp: time.Unix(0, 2), Line: "line 2"},
+				{Timestamp: time.Unix(0, 3), Line: "line 3"},
+				{Timestamp: time.Unix(0, 4), Line: "line 4"},
+			},
+			direction: logproto.FORWARD,
+			from:      2,
+			through:   4,
+			pipeline:  noopStreamPipeline(),
+			expected: []*logproto.Entry{
+				{Timestamp: time.Unix(0, 2), Line: "line 2"},
+				{Timestamp: time.Unix(0, 3), Line: "line 3"},
+			},
+		},
+		{
+			name: "Forward direction with logql pipeline filter",
+			entries: []*logproto.Entry{
+				{Timestamp: time.Unix(0, 1).UTC(), Line: "error: something went wrong"},
+				{Timestamp: time.Unix(0, 2).UTC(), Line: "info: operation successful"},
+				{Timestamp: time.Unix(0, 3).UTC(), Line: "error: another error occurred"},
+				{Timestamp: time.Unix(0, 4).UTC(), Line: "debug: checking status"},
+			},
+			direction: logproto.FORWARD,
+			from:      1,
+			through:   5,
+			pipeline:  mustNewPipeline(t, `{foo="bar"} | line_format "foo {{ __line__ }}" |= "error"`),
+			expected: []*logproto.Entry{
+				{Timestamp: time.Unix(0, 1), Line: "foo error: something went wrong"},
+				{Timestamp: time.Unix(0, 3), Line: "foo error: another error occurred"},
+			},
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			var buf bytes.Buffer
+
+			// Write the chunk
+			_, err := WriteChunk(&buf, tt.entries, EncodingSnappy)
+			require.NoError(t, err, "WriteChunk failed")
+
+			// Create the iterator
+			iter, err := NewEntryIterator(buf.Bytes(), tt.pipeline, tt.direction, tt.from, tt.through)
+			require.NoError(t, err, "NewEntryIterator failed")
+			defer iter.Close()
+
+			// Read entries using the iterator
+			var actualEntries []*logproto.Entry
+			for iter.Next() {
+				entry := iter.At()
+				actualEntries = append(actualEntries, &logproto.Entry{
+					Timestamp: entry.Timestamp,
+					Line:      entry.Line,
+				})
+			}
+			require.NoError(t, iter.Err(), "Iterator encountered an error")
+
+			// Compare actual entries with expected entries
+			require.Equal(t, tt.expected, actualEntries, "Entries do not match expected values")
+		})
+	}
+}
+
+// mustNewPipeline creates a new pipeline or fails the test
+func mustNewPipeline(t *testing.T, query string) log.StreamPipeline {
+	t.Helper()
+	if query == "" {
+		return log.NewNoopPipeline().ForStream(labels.Labels{})
+	}
+	expr, err := syntax.ParseLogSelector(query, true)
+	require.NoError(t, err)
+
+	pipeline, err := expr.Pipeline()
+	require.NoError(t, err)
+
+	return pipeline.ForStream(labels.Labels{})
+}
+
+func noopStreamPipeline() log.StreamPipeline {
+	return log.NewNoopPipeline().ForStream(labels.Labels{})
+}
diff --git a/pkg/storage/wal/chunks/sample_iterator.go b/pkg/storage/wal/chunks/sample_iterator.go
new file mode 100644
index 0000000000000..4d4b397b1dd58
--- /dev/null
+++ b/pkg/storage/wal/chunks/sample_iterator.go
@@ -0,0 +1,87 @@
+package chunks
+
+import (
+	"github.com/grafana/loki/v3/pkg/iter"
+	"github.com/grafana/loki/v3/pkg/logproto"
+	"github.com/grafana/loki/v3/pkg/logql/log"
+)
+
+// NewSampleIterator creates an iterator for efficiently traversing samples in a chunk.
+// It takes compressed chunk data, a processing pipeline, iteration direction, and a time range.
+// The returned iterator filters samples based on the time range and applies the given pipeline.
+// It handles both forward and backward iteration.
+//
+// Parameters:
+//   - chunkData: Compressed chunk data containing samples
+//   - pipeline: StreamSampleExtractor for processing and filtering samples
+//   - from: Start timestamp (inclusive) for filtering samples
+//   - through: End timestamp (exclusive) for filtering samples
+//
+// Returns a SampleIterator and an error if creation fails.
+func NewSampleIterator(
+	chunkData []byte,
+	pipeline log.StreamSampleExtractor,
+	from, through int64,
+) (iter.SampleIterator, error) {
+	chkReader, err := NewChunkReader(chunkData)
+	if err != nil {
+		return nil, err
+	}
+	it := &sampleBufferedIterator{
+		reader:   chkReader,
+		pipeline: pipeline,
+		from:     from,
+		through:  through,
+	}
+	return it, nil
+}
+
+// At implements iter.SampleIterator.
+func (s *sampleBufferedIterator) At() logproto.Sample {
+	return s.cur
+}
+
+// Close implements iter.SampleIterator.
+func (s *sampleBufferedIterator) Close() error {
+	return s.reader.Close()
+}
+
+// Err implements iter.SampleIterator.
+func (s *sampleBufferedIterator) Err() error {
+	return s.reader.Err()
+}
+
+// Labels implements iter.SampleIterator.
+func (s *sampleBufferedIterator) Labels() string {
+	return s.currLabels.String()
+}
+
+// Next implements iter.SampleIterator.
+func (s *sampleBufferedIterator) Next() bool {
+	for s.reader.Next() {
+		// todo: Only use length columns for bytes_over_time without filter.
+		ts, line := s.reader.At()
+		// check if the timestamp is within the range before applying the pipeline.
+		if ts < s.from {
+			continue
+		}
+		if ts >= s.through {
+			return false
+		}
+		// todo: structured metadata.
+		val, lbs, matches := s.pipeline.Process(ts, line)
+		if !matches {
+			continue
+		}
+		s.currLabels = lbs
+		s.cur.Value = val
+		s.cur.Timestamp = ts
+		return true
+	}
+	return false
+}
+
+// StreamHash implements iter.SampleIterator.
+func (s *sampleBufferedIterator) StreamHash() uint64 {
+	return s.pipeline.BaseLabels().Hash()
+}
diff --git a/pkg/storage/wal/chunks/sample_iterator_test.go b/pkg/storage/wal/chunks/sample_iterator_test.go
new file mode 100644
index 0000000000000..4e208301ab9d6
--- /dev/null
+++ b/pkg/storage/wal/chunks/sample_iterator_test.go
@@ -0,0 +1,202 @@
+package chunks
+
+import (
+	"bytes"
+	"testing"
+	"time"
+
+	"github.com/prometheus/prometheus/model/labels"
+	"github.com/stretchr/testify/require"
+
+	"github.com/grafana/loki/v3/pkg/logproto"
+	"github.com/grafana/loki/v3/pkg/logql/log"
+	"github.com/grafana/loki/v3/pkg/logql/syntax"
+)
+
+func TestNewSampleIterator(t *testing.T) {
+	tests := []struct {
+		name      string
+		entries   []*logproto.Entry
+		from      int64
+		through   int64
+		extractor log.StreamSampleExtractor
+		expected  []logproto.Sample
+		expectErr bool
+	}{
+		{
+			name: "All samples within range",
+			entries: []*logproto.Entry{
+				{Timestamp: time.Unix(0, 1), Line: "1.0"},
+				{Timestamp: time.Unix(0, 2), Line: "2.0"},
+				{Timestamp: time.Unix(0, 3), Line: "3.0"},
+			},
+			from:      0,
+			through:   4,
+			extractor: mustNewExtractor(t, ""),
+			expected: []logproto.Sample{
+				{Timestamp: 1, Value: 1.0, Hash: 0},
+				{Timestamp: 2, Value: 1.0, Hash: 0},
+				{Timestamp: 3, Value: 1.0, Hash: 0},
+			},
+		},
+		{
+			name: "Partial range",
+			entries: []*logproto.Entry{
+				{Timestamp: time.Unix(0, 1), Line: "1.0"},
+				{Timestamp: time.Unix(0, 2), Line: "2.0"},
+				{Timestamp: time.Unix(0, 3), Line: "3.0"},
+				{Timestamp: time.Unix(0, 4), Line: "4.0"},
+			},
+			from:      2,
+			through:   4,
+			extractor: mustNewExtractor(t, ""),
+			expected: []logproto.Sample{
+				{Timestamp: 2, Value: 1.0, Hash: 0},
+				{Timestamp: 3, Value: 1.0, Hash: 0},
+			},
+		},
+		{
+			name: "Pipeline filter",
+			entries: []*logproto.Entry{
+				{Timestamp: time.Unix(0, 1), Line: "error: 1.0"},
+				{Timestamp: time.Unix(0, 2), Line: "info: 2.0"},
+				{Timestamp: time.Unix(0, 3), Line: "error: 3.0"},
+				{Timestamp: time.Unix(0, 4), Line: "debug: 4.0"},
+			},
+			from:      1,
+			through:   5,
+			extractor: mustNewExtractor(t, `count_over_time({foo="bar"} |= "error"[1m])`),
+			expected: []logproto.Sample{
+				{Timestamp: 1, Value: 1.0, Hash: 0},
+				{Timestamp: 3, Value: 1.0, Hash: 0},
+			},
+		},
+		{
+			name: "Pipeline filter with bytes_over_time",
+			entries: []*logproto.Entry{
+				{Timestamp: time.Unix(0, 1), Line: "error: 1.0"},
+				{Timestamp: time.Unix(0, 2), Line: "info: 2.0"},
+				{Timestamp: time.Unix(0, 3), Line: "error: 3.0"},
+				{Timestamp: time.Unix(0, 4), Line: "debug: 4.0"},
+			},
+			from:      1,
+			through:   5,
+			extractor: mustNewExtractor(t, `bytes_over_time({foo="bar"} |= "error"[1m])`),
+			expected: []logproto.Sample{
+				{Timestamp: 1, Value: 10, Hash: 0},
+				{Timestamp: 3, Value: 10, Hash: 0},
+			},
+		},
+		{
+			name: "No samples within range",
+			entries: []*logproto.Entry{
+				{Timestamp: time.Unix(0, 1), Line: "1.0"},
+				{Timestamp: time.Unix(0, 2), Line: "2.0"},
+			},
+			from:      3,
+			through:   5,
+			extractor: mustNewExtractor(t, ""),
+			expected:  nil,
+		},
+		{
+			name:      "Empty chunk",
+			entries:   []*logproto.Entry{},
+			from:      0,
+			through:   5,
+			extractor: mustNewExtractor(t, ""),
+			expected:  nil,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			var buf bytes.Buffer
+
+			// Write the chunk
+			_, err := WriteChunk(&buf, tt.entries, EncodingSnappy)
+			require.NoError(t, err, "WriteChunk failed")
+
+			// Create the iterator
+			iter, err := NewSampleIterator(buf.Bytes(), tt.extractor, tt.from, tt.through)
+			if tt.expectErr {
+				require.Error(t, err, "Expected an error but got none")
+				return
+			}
+			require.NoError(t, err, "NewSampleIterator failed")
+			defer iter.Close()
+
+			// Read samples using the iterator
+			var actualSamples []logproto.Sample
+			for iter.Next() {
+				actualSamples = append(actualSamples, iter.At())
+			}
+			require.NoError(t, iter.Err(), "Iterator encountered an error")
+
+			// Compare actual samples with expected samples
+			require.Equal(t, tt.expected, actualSamples, "Samples do not match expected values")
+
+			// Check labels
+			if len(actualSamples) > 0 {
+				require.Equal(t, tt.extractor.BaseLabels().String(), iter.Labels(), "Unexpected labels")
+			}
+
+			// Check StreamHash
+			if len(actualSamples) > 0 {
+				require.Equal(t, tt.extractor.BaseLabels().Hash(), iter.StreamHash(), "Unexpected StreamHash")
+			}
+		})
+	}
+}
+
+func TestNewSampleIteratorErrors(t *testing.T) {
+	tests := []struct {
+		name      string
+		chunkData []byte
+		extractor log.StreamSampleExtractor
+		from      int64
+		through   int64
+	}{
+		{
+			name:      "Invalid chunk data",
+			chunkData: []byte("invalid chunk data"),
+			extractor: mustNewExtractor(t, ""),
+			from:      0,
+			through:   10,
+		},
+		{
+			name:      "Nil extractor",
+			chunkData: []byte{}, // valid empty chunk
+			extractor: nil,
+			from:      0,
+			through:   10,
+		},
+		{
+			name:      "Invalid time range",
+			chunkData: []byte{}, // valid empty chunk
+			extractor: mustNewExtractor(t, ""),
+			from:      10,
+			through:   0,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			_, err := NewSampleIterator(tt.chunkData, tt.extractor, tt.from, tt.through)
+			require.Error(t, err, "Expected an error but got none")
+		})
+	}
+}
+
+func mustNewExtractor(t *testing.T, query string) log.StreamSampleExtractor {
+	t.Helper()
+	if query == `` {
+		query = `count_over_time({foo="bar"}[1m])`
+	}
+	expr, err := syntax.ParseSampleExpr(query)
+	require.NoError(t, err)
+
+	extractor, err := expr.Extractor()
+	require.NoError(t, err)
+
+	return extractor.ForStream(labels.Labels{})
+}
diff --git a/pkg/storage/wal/index/index.go b/pkg/storage/wal/index/index.go
index 29436bd2044b8..8959824c92780 100644
--- a/pkg/storage/wal/index/index.go
+++ b/pkg/storage/wal/index/index.go
@@ -24,6 +24,8 @@ import (
 	"math"
 	"slices"
 	"sort"
+	"strings"
+	"unicode/utf8"
 	"unsafe"
 
 	"github.com/prometheus/prometheus/model/labels"
@@ -51,8 +53,24 @@ const (
 
 	// checkContextEveryNIterations is used in some tight loops to check if the context is done.
 	checkContextEveryNIterations = 128
+
+	TenantLabel = "__loki_tenant__"
 )
 
+// Bitmap used by func isRegexMetaCharacter to check whether a character needs to be escaped.
+var regexMetaCharacterBytes [16]byte
+
+// isRegexMetaCharacter reports whether byte b needs to be escaped.
+func isRegexMetaCharacter(b byte) bool {
+	return b < utf8.RuneSelf && regexMetaCharacterBytes[b%16]&(1<<(b/16)) != 0
+}
+
+func init() {
+	for _, b := range []byte(`.+*?()|[]{}^$`) {
+		regexMetaCharacterBytes[b%16] |= 1 << (b / 16)
+	}
+}
+
 var AllPostingsKey = labels.Label{}
 
 type indexWriterSeries struct {
@@ -1908,3 +1926,256 @@ func (dec *Decoder) Series(b []byte, builder *labels.ScratchBuilder, chks *[]chu
 func yoloString(b []byte) string {
 	return *((*string)(unsafe.Pointer(&b)))
 }
+
+// PostingsForMatchers assembles a single postings iterator against the index reader
+// based on the given matchers. The resulting postings are not ordered by series.
+func (r *Reader) PostingsForMatchers(ctx context.Context, ms ...*labels.Matcher) (index.Postings, error) {
+	var its, notIts []index.Postings
+	// See which label must be non-empty.
+	// Optimization for case like {l=~".", l!="1"}.
+	labelMustBeSet := make(map[string]bool, len(ms))
+	for _, m := range ms {
+		if !m.Matches("") {
+			labelMustBeSet[m.Name] = true
+		}
+	}
+	isSubtractingMatcher := func(m *labels.Matcher) bool {
+		if !labelMustBeSet[m.Name] {
+			return true
+		}
+		return (m.Type == labels.MatchNotEqual || m.Type == labels.MatchNotRegexp) && m.Matches("")
+	}
+	hasSubtractingMatchers, hasIntersectingMatchers := false, false
+	for _, m := range ms {
+		if isSubtractingMatcher(m) {
+			hasSubtractingMatchers = true
+		} else {
+			hasIntersectingMatchers = true
+		}
+	}
+
+	if hasSubtractingMatchers && !hasIntersectingMatchers {
+		// If there's nothing to subtract from, add in everything and remove the notIts later.
+		// We prefer to get AllPostings so that the base of subtraction (i.e. allPostings)
+		// doesn't include series that may be added to the index reader during this function call.
+		k, v := index.AllPostingsKey()
+		allPostings, err := r.Postings(ctx, k, v)
+		if err != nil {
+			return nil, err
+		}
+		its = append(its, allPostings)
+	}
+
+	// Sort matchers to have the intersecting matchers first.
+	// This way the base for subtraction is smaller and
+	// there is no chance that the set we subtract from
+	// contains postings of series that didn't exist when
+	// we constructed the set we subtract by.
+	slices.SortStableFunc(ms, func(i, j *labels.Matcher) int {
+		if !isSubtractingMatcher(i) && isSubtractingMatcher(j) {
+			return -1
+		}
+
+		return +1
+	})
+
+	for _, m := range ms {
+		if ctx.Err() != nil {
+			return nil, ctx.Err()
+		}
+		switch {
+		case m.Name == "" && m.Value == "": // Special-case for AllPostings, used in tests at least.
+			k, v := index.AllPostingsKey()
+			allPostings, err := r.Postings(ctx, k, v)
+			if err != nil {
+				return nil, err
+			}
+			its = append(its, allPostings)
+		case labelMustBeSet[m.Name]:
+			// If this matcher must be non-empty, we can be smarter.
+			matchesEmpty := m.Matches("")
+			isNot := m.Type == labels.MatchNotEqual || m.Type == labels.MatchNotRegexp
+			switch {
+			case isNot && matchesEmpty: // l!="foo"
+				// If the label can't be empty and is a Not and the inner matcher
+				// doesn't match empty, then subtract it out at the end.
+				inverse, err := m.Inverse()
+				if err != nil {
+					return nil, err
+				}
+
+				it, err := postingsForMatcher(ctx, r, inverse)
+				if err != nil {
+					return nil, err
+				}
+				notIts = append(notIts, it)
+			case isNot && !matchesEmpty: // l!=""
+				// If the label can't be empty and is a Not, but the inner matcher can
+				// be empty we need to use inversePostingsForMatcher.
+				inverse, err := m.Inverse()
+				if err != nil {
+					return nil, err
+				}
+
+				it, err := inversePostingsForMatcher(ctx, r, inverse)
+				if err != nil {
+					return nil, err
+				}
+				if index.IsEmptyPostingsType(it) {
+					return index.EmptyPostings(), nil
+				}
+				its = append(its, it)
+			default: // l="a"
+				// Non-Not matcher, use normal postingsForMatcher.
+				it, err := postingsForMatcher(ctx, r, m)
+				if err != nil {
+					return nil, err
+				}
+				if index.IsEmptyPostingsType(it) {
+					return index.EmptyPostings(), nil
+				}
+				its = append(its, it)
+			}
+		default: // l=""
+			// If the matchers for a labelname selects an empty value, it selects all
+			// the series which don't have the label name set too. See:
+			// https://github.com/prometheus/prometheus/issues/3575 and
+			// https://github.com/prometheus/prometheus/pull/3578#issuecomment-351653555
+			it, err := inversePostingsForMatcher(ctx, r, m)
+			if err != nil {
+				return nil, err
+			}
+			notIts = append(notIts, it)
+		}
+	}
+
+	it := index.Intersect(its...)
+
+	for _, n := range notIts {
+		it = index.Without(it, n)
+	}
+
+	return it, nil
+}
+
+// inversePostingsForMatcher returns the postings for the series with the label name set but not matching the matcher.
+func inversePostingsForMatcher(ctx context.Context, ix *Reader, m *labels.Matcher) (index.Postings, error) {
+	// Fast-path for MatchNotRegexp matching.
+	// Inverse of a MatchNotRegexp is MatchRegexp (double negation).
+	// Fast-path for set matching.
+	if m.Type == labels.MatchNotRegexp {
+		setMatches := findSetMatches(m.GetRegexString())
+		if len(setMatches) > 0 {
+			return ix.Postings(ctx, m.Name, setMatches...)
+		}
+	}
+
+	// Fast-path for MatchNotEqual matching.
+	// Inverse of a MatchNotEqual is MatchEqual (double negation).
+	if m.Type == labels.MatchNotEqual {
+		return ix.Postings(ctx, m.Name, m.Value)
+	}
+
+	vals, err := ix.LabelValues(ctx, m.Name)
+	if err != nil {
+		return nil, err
+	}
+
+	var res []string
+	// If the inverse match is ="", we just want all the values.
+	if m.Type == labels.MatchEqual && m.Value == "" {
+		res = vals
+	} else {
+		for _, val := range vals {
+			if !m.Matches(val) {
+				res = append(res, val)
+			}
+		}
+	}
+
+	return ix.Postings(ctx, m.Name, res...)
+}
+
+func postingsForMatcher(ctx context.Context, ix *Reader, m *labels.Matcher) (index.Postings, error) {
+	// This method will not return postings for missing labels.
+
+	// Fast-path for equal matching.
+	if m.Type == labels.MatchEqual {
+		return ix.Postings(ctx, m.Name, m.Value)
+	}
+
+	// Fast-path for set matching.
+	if m.Type == labels.MatchRegexp {
+		setMatches := findSetMatches(m.GetRegexString())
+		if len(setMatches) > 0 {
+			return ix.Postings(ctx, m.Name, setMatches...)
+		}
+	}
+
+	vals, err := ix.LabelValues(ctx, m.Name)
+	if err != nil {
+		return nil, err
+	}
+
+	var res []string
+	for _, val := range vals {
+		if m.Matches(val) {
+			res = append(res, val)
+		}
+	}
+
+	if len(res) == 0 {
+		return index.EmptyPostings(), nil
+	}
+
+	return ix.Postings(ctx, m.Name, res...)
+}
+
+func findSetMatches(pattern string) []string {
+	// Return empty matches if the wrapper from Prometheus is missing.
+	if len(pattern) < 6 || pattern[:4] != "^(?:" || pattern[len(pattern)-2:] != ")$" {
+		return nil
+	}
+	escaped := false
+	sets := []*strings.Builder{{}}
+	init := 4
+	end := len(pattern) - 2
+	// If the regex is wrapped in a group we can remove the first and last parentheses
+	if pattern[init] == '(' && pattern[end-1] == ')' {
+		init++
+		end--
+	}
+	for i := init; i < end; i++ {
+		if escaped {
+			switch {
+			case isRegexMetaCharacter(pattern[i]):
+				sets[len(sets)-1].WriteByte(pattern[i])
+			case pattern[i] == '\\':
+				sets[len(sets)-1].WriteByte('\\')
+			default:
+				return nil
+			}
+			escaped = false
+		} else {
+			switch {
+			case isRegexMetaCharacter(pattern[i]):
+				if pattern[i] == '|' {
+					sets = append(sets, &strings.Builder{})
+				} else {
+					return nil
+				}
+			case pattern[i] == '\\':
+				escaped = true
+			default:
+				sets[len(sets)-1].WriteByte(pattern[i])
+			}
+		}
+	}
+	matches := make([]string, 0, len(sets))
+	for _, s := range sets {
+		if s.Len() > 0 {
+			matches = append(matches, s.String())
+		}
+	}
+	return matches
+}
diff --git a/pkg/storage/wal/segment.go b/pkg/storage/wal/segment.go
index 65ee7c093d2ff..93b824bbcb70e 100644
--- a/pkg/storage/wal/segment.go
+++ b/pkg/storage/wal/segment.go
@@ -35,7 +35,7 @@ var (
 			}
 		},
 	}
-	tenantLabel = "__loki_tenant__"
+	Dir = "loki-v2/wal/anon/"
 )
 
 func init() {
@@ -156,8 +156,8 @@ func (b *SegmentWriter) getOrCreateStream(id streamID, lbls labels.Labels) *stre
 	if ok {
 		return s
 	}
-	if lbls.Get(tenantLabel) == "" {
-		lbls = labels.NewBuilder(lbls).Set(tenantLabel, id.tenant).Labels()
+	if lbls.Get(index.TenantLabel) == "" {
+		lbls = labels.NewBuilder(lbls).Set(index.TenantLabel, id.tenant).Labels()
 	}
 	s = streamSegmentPool.Get().(*streamSegment)
 	s.lbls = lbls
diff --git a/pkg/storage/wal/segment_test.go b/pkg/storage/wal/segment_test.go
index 90755adcfcc3f..cbe42587bf7a2 100644
--- a/pkg/storage/wal/segment_test.go
+++ b/pkg/storage/wal/segment_test.go
@@ -122,7 +122,7 @@ func TestWalSegmentWriter_Append(t *testing.T) {
 				require.True(t, ok)
 				lbs, err := syntax.ParseLabels(expected.labels)
 				require.NoError(t, err)
-				lbs = append(lbs, labels.Label{Name: tenantLabel, Value: expected.tenant})
+				lbs = append(lbs, labels.Label{Name: index.TenantLabel, Value: expected.tenant})
 				sort.Sort(lbs)
 				require.Equal(t, lbs, stream.lbls)
 				require.Equal(t, expected.entries, stream.entries)
@@ -168,7 +168,7 @@ func TestMultiTenantWrite(t *testing.T) {
 
 	for _, tenant := range tenants {
 		for _, lbl := range lbls {
-			expectedSeries = append(expectedSeries, labels.NewBuilder(lbl).Set(tenantLabel, tenant).Labels().String())
+			expectedSeries = append(expectedSeries, labels.NewBuilder(lbl).Set(index.TenantLabel, tenant).Labels().String())
 		}
 	}