Update chunk and head fmt to v4 (non-indexed labels) (#10242)
**What this PR does / why we need it**:

This PR updates the default chunk format to v4 and the default head block format to `UnorderedWithNonIndexedLabelsHeadBlockFmt` to support writing and reading non-indexed labels.
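For a sense of what the new defaults enable, here is a minimal sketch (not from the PR itself; the label name and sizes are arbitrary) of appending an entry that carries non-indexed labels:

```go
package main

import (
	"time"

	"github.com/grafana/loki/pkg/chunkenc"
	"github.com/grafana/loki/pkg/logproto"
	"github.com/prometheus/prometheus/model/labels"
)

func main() {
	// DefaultHeadBlockFmt is now UnorderedWithNonIndexedLabelsHeadBlockFmt,
	// so a default chunk (format v4) accepts per-entry non-indexed labels.
	c := chunkenc.NewMemChunk(chunkenc.EncSnappy, chunkenc.DefaultHeadBlockFmt, 256*1024, 1572864)

	// The labels ride along with the entry in the chunk, but are not
	// written to the index.
	_ = c.Append(&logproto.Entry{
		Timestamp: time.Now(),
		Line:      "hello world",
		NonIndexedLabels: logproto.FromLabelsToLabelAdapters(
			labels.FromStrings("traceID", "1234"), // illustrative label, not from the PR
		),
	})
}
```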

---------

Co-authored-by: Sandeep Sukhani <[email protected]>
salvacorts and sandeepsukhani authored Aug 14, 2023
1 parent 8e6a543 commit 287f29b
Showing 14 changed files with 107 additions and 105 deletions.
15 changes: 13 additions & 2 deletions pkg/chunkenc/memchunk.go
@@ -33,7 +33,7 @@ const (
chunkFormatV3
chunkFormatV4

-DefaultChunkFormat = chunkFormatV3 // the currently used chunk format
+DefaultChunkFormat = chunkFormatV4 // the currently used chunk format

blocksPerChunk = 10
maxLineLength = 1024 * 1024 * 1024
@@ -85,7 +85,7 @@ const (
UnorderedHeadBlockFmt
UnorderedWithNonIndexedLabelsHeadBlockFmt

-DefaultHeadBlockFmt = UnorderedHeadBlockFmt
+DefaultHeadBlockFmt = UnorderedWithNonIndexedLabelsHeadBlockFmt
)

var magicNumber = uint32(0x12EE56A)
@@ -348,8 +348,19 @@ func NewMemChunk(enc Encoding, head HeadBlockFmt, blockSize, targetSize int) *MemChunk {
return newMemChunkWithFormat(DefaultChunkFormat, enc, head, blockSize, targetSize)
}

+func panicIfInvalidFormat(chunkFmt byte, head HeadBlockFmt) {
+	if chunkFmt == chunkFormatV2 && head != OrderedHeadBlockFmt {
+		panic("only OrderedHeadBlockFmt is supported for V2 chunks")
+	}
+	if chunkFmt == chunkFormatV4 && head != UnorderedWithNonIndexedLabelsHeadBlockFmt {
+		panic("only UnorderedWithNonIndexedLabelsHeadBlockFmt is supported for V4 chunks")
+	}
+}

// NewMemChunk returns a new in-mem chunk.
func newMemChunkWithFormat(format byte, enc Encoding, head HeadBlockFmt, blockSize, targetSize int) *MemChunk {
+panicIfInvalidFormat(format, head)
+
symbolizer := newSymbolizer()
return &MemChunk{
blockSize: blockSize, // The blockSize in bytes.
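Since the constructor now panics on unsupported combinations, a regression test for the guard could look like the following sketch (hypothetical, not part of this PR; it would live in package `chunkenc` to reach the unexported identifiers, reusing the `testBlockSize`/`testTargetSize` constants from the tests below):

```go
func TestInvalidFormatPanics(t *testing.T) {
	// v4 chunks must use the non-indexed-labels head block format.
	require.Panics(t, func() {
		newMemChunkWithFormat(chunkFormatV4, EncSnappy, OrderedHeadBlockFmt, testBlockSize, testTargetSize)
	})
	// v2 chunks only support the ordered head block format.
	require.Panics(t, func() {
		newMemChunkWithFormat(chunkFormatV2, EncSnappy, UnorderedHeadBlockFmt, testBlockSize, testTargetSize)
	})
}
```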
13 changes: 7 additions & 6 deletions pkg/chunkenc/memchunk_test.go
@@ -74,7 +74,7 @@ var (
}
)

-const DefaultTestHeadBlockFmt = OrderedHeadBlockFmt
+const DefaultTestHeadBlockFmt = DefaultHeadBlockFmt

func TestBlocksInclusive(t *testing.T) {
chk := NewMemChunk(EncNone, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize)
@@ -637,11 +637,11 @@ func TestChunkSize(t *testing.T) {
}
var result []res
for _, bs := range testBlockSizes {
-for _, f := range HeadBlockFmts {
+for _, f := range allPossibleFormats {
for _, enc := range testEncoding {
name := fmt.Sprintf("%s_%s", enc.String(), humanize.Bytes(uint64(bs)))
t.Run(name, func(t *testing.T) {
-c := NewMemChunk(enc, f, bs, testTargetSize)
+c := newMemChunkWithFormat(f.chunkFormat, enc, f.headBlockFmt, bs, testTargetSize)
inserted := fillChunk(c)
b, err := c.Bytes()
if err != nil {
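`allPossibleFormats` is defined elsewhere in the test package; from its use here (`f.chunkFormat`, `f.headBlockFmt`) it is presumably a slice pairing each chunk format with a valid head block format, roughly:

```go
// Inferred shape of allPossibleFormats (the real definition is outside
// this diff); each pair must satisfy panicIfInvalidFormat above.
var allPossibleFormats = []struct {
	headBlockFmt HeadBlockFmt
	chunkFormat  byte
}{
	{headBlockFmt: OrderedHeadBlockFmt, chunkFormat: chunkFormatV2},
	{headBlockFmt: OrderedHeadBlockFmt, chunkFormat: chunkFormatV3},
	{headBlockFmt: UnorderedHeadBlockFmt, chunkFormat: chunkFormatV3},
	{headBlockFmt: UnorderedWithNonIndexedLabelsHeadBlockFmt, chunkFormat: chunkFormatV4},
}
```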
@@ -685,7 +685,8 @@ func TestChunkStats(t *testing.T) {
inserted++
entry.Timestamp = entry.Timestamp.Add(time.Nanosecond)
}
-expectedSize := (inserted * len(entry.Line)) + (inserted * 2 * binary.MaxVarintLen64)
+// For each entry: timestamp <varint>, line size <varint>, line <bytes>, num of non-indexed labels <varint>
+expectedSize := inserted * (len(entry.Line) + 3*binary.MaxVarintLen64)
statsCtx, ctx := stats.NewContext(context.Background())

it, err := c.Iterator(ctx, first.Add(-time.Hour), entry.Timestamp.Add(time.Hour), logproto.BACKWARD, noopStreamPipeline)
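The updated estimate adds a third varint per entry for the count of non-indexed labels. As a sketch of the accounting (`binary.MaxVarintLen64` from `encoding/binary` is 10, so a 100-byte line is bounded by 100 + 3*10 = 130 bytes):

```go
// expectedUncompressedSize mirrors the updated test estimate: per entry,
// one varint for the timestamp, one for the line length, one for the
// number of non-indexed labels, plus the raw line bytes.
func expectedUncompressedSize(numEntries, lineLen int) int {
	return numEntries * (lineLen + 3*binary.MaxVarintLen64)
}
```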
@@ -734,7 +735,7 @@ func TestChunkStats(t *testing.T) {
}

func TestIteratorClose(t *testing.T) {
-for _, f := range HeadBlockFmts {
+for _, f := range allPossibleFormats {
for _, enc := range testEncoding {
t.Run(enc.String(), func(t *testing.T) {
for _, test := range []func(iter iter.EntryIterator, t *testing.T){
Expand Down Expand Up @@ -762,7 +763,7 @@ func TestIteratorClose(t *testing.T) {
}
},
} {
-c := NewMemChunk(enc, f, testBlockSize, testTargetSize)
+c := newMemChunkWithFormat(f.chunkFormat, enc, f.headBlockFmt, testBlockSize, testTargetSize)
inserted := fillChunk(c)
iter, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, inserted), logproto.BACKWARD, noopStreamPipeline)
if err != nil {
10 changes: 5 additions & 5 deletions pkg/chunkenc/unordered_test.go
@@ -428,7 +428,7 @@ func BenchmarkHeadBlockWrites(b *testing.B) {
}

func TestUnorderedChunkIterators(t *testing.T) {
-c := NewMemChunk(EncSnappy, UnorderedHeadBlockFmt, testBlockSize, testTargetSize)
+c := NewMemChunk(EncSnappy, UnorderedWithNonIndexedLabelsHeadBlockFmt, testBlockSize, testTargetSize)
for i := 0; i < 100; i++ {
// push in reverse order
require.Nil(t, c.Append(&logproto.Entry{
@@ -546,7 +546,7 @@ func BenchmarkUnorderedRead(b *testing.B) {
}

func TestUnorderedIteratorCountsAllEntries(t *testing.T) {
-c := NewMemChunk(EncSnappy, UnorderedHeadBlockFmt, testBlockSize, testTargetSize)
+c := NewMemChunk(EncSnappy, UnorderedWithNonIndexedLabelsHeadBlockFmt, testBlockSize, testTargetSize)
fillChunkRandomOrder(c, false)

ct := 0
@@ -583,7 +583,7 @@ func TestUnorderedIteratorCountsAllEntries(t *testing.T) {
}

func chunkFrom(xs []logproto.Entry) ([]byte, error) {
-c := NewMemChunk(EncSnappy, OrderedHeadBlockFmt, testBlockSize, testTargetSize)
+c := NewMemChunk(EncSnappy, DefaultHeadBlockFmt, testBlockSize, testTargetSize)
for _, x := range xs {
if err := c.Append(&x); err != nil {
return nil, err
@@ -643,7 +643,7 @@ func TestReorder(t *testing.T) {
},
} {
t.Run(tc.desc, func(t *testing.T) {
-c := NewMemChunk(EncSnappy, UnorderedHeadBlockFmt, testBlockSize, testTargetSize)
+c := NewMemChunk(EncSnappy, DefaultHeadBlockFmt, testBlockSize, testTargetSize)
for _, x := range tc.input {
require.Nil(t, c.Append(&x))
}
@@ -660,7 +660,7 @@
}

func TestReorderAcrossBlocks(t *testing.T) {
-c := NewMemChunk(EncSnappy, UnorderedHeadBlockFmt, testBlockSize, testTargetSize)
+c := NewMemChunk(EncSnappy, DefaultHeadBlockFmt, testBlockSize, testTargetSize)
for _, batch := range [][]int{
// ensure our blocks have overlapping bounds and must be reordered
// before closing.
2 changes: 1 addition & 1 deletion pkg/ingester/chunk_test.go
@@ -49,7 +49,7 @@ func TestIterator(t *testing.T) {
}{
{"dumbChunk", chunkenc.NewDumbChunk},
{"gzipChunk", func() chunkenc.Chunk {
-return chunkenc.NewMemChunk(chunkenc.EncGZIP, chunkenc.UnorderedHeadBlockFmt, 256*1024, 0)
+return chunkenc.NewMemChunk(chunkenc.EncGZIP, chunkenc.DefaultHeadBlockFmt, 256*1024, 0)
}},
} {
t.Run(chk.name, func(t *testing.T) {
139 changes: 65 additions & 74 deletions pkg/ingester/encoding_test.go
@@ -38,95 +38,86 @@ func dummyConf() *Config {
}

func Test_EncodingChunks(t *testing.T) {
-	for _, f := range chunkenc.HeadBlockFmts {
	for _, close := range []bool{true, false} {
		for _, tc := range []struct {
			desc string
			conf Config
		}{
			{
				// mostly for historical parity
				desc: "dummyConf",
				conf: *dummyConf(),
			},
			{
				desc: "default",
				conf: defaultIngesterTestConfig(t),
			},
		} {

-			t.Run(fmt.Sprintf("%v-%v-%s", f, close, tc.desc), func(t *testing.T) {
+			t.Run(fmt.Sprintf("%v-%s", close, tc.desc), func(t *testing.T) {
				conf := tc.conf
-				c := chunkenc.NewMemChunk(chunkenc.EncGZIP, f, conf.BlockSize, conf.TargetChunkSize)
+				c := chunkenc.NewMemChunk(chunkenc.EncGZIP, chunkenc.DefaultHeadBlockFmt, conf.BlockSize, conf.TargetChunkSize)
				fillChunk(t, c)
				if close {
					require.Nil(t, c.Close())
				}

				from := []chunkDesc{
					{
						chunk: c,
					},
					// test non zero values
					{
						chunk:       c,
						closed:      true,
						synced:      true,
						flushed:     time.Unix(1, 0),
						lastUpdated: time.Unix(0, 1),
					},
				}
				there, err := toWireChunks(from, nil)
				require.Nil(t, err)
				chunks := make([]Chunk, 0, len(there))
				for _, c := range there {
					chunks = append(chunks, c.Chunk)

					// Ensure closed head chunks only contain the head metadata but no entries
					if close {
-						if f < chunkenc.UnorderedHeadBlockFmt {
-							// format + #entries + size + mint + maxt
-							const orderedHeadSize = 5
-							require.Equal(t, orderedHeadSize, len(c.Head))
-						} else {
-							// format + #lines
-							const unorderedHeadSize = 2
-							require.Equal(t, unorderedHeadSize, len(c.Head))
-						}
+						const unorderedHeadSize = 2
+						require.Equal(t, unorderedHeadSize, len(c.Head))
					} else {
						require.Greater(t, len(c.Head), 0)
					}
				}

				backAgain, err := fromWireChunks(&conf, chunks)
				require.Nil(t, err)

				for i, to := range backAgain {
					// test the encoding directly as the substructure may change.
					// for instance the uncompressed size for each block is not included in the encoded version.
					enc, err := to.chunk.Bytes()
					require.Nil(t, err)
					to.chunk = nil

					matched := from[i]
					exp, err := matched.chunk.Bytes()
					require.Nil(t, err)
					matched.chunk = nil

					require.Equal(t, exp, enc)
					require.Equal(t, matched, to)

				}

			})
		}
	}
-	}
}

func Test_EncodingCheckpoint(t *testing.T) {
conf := dummyConf()
-c := chunkenc.NewMemChunk(chunkenc.EncGZIP, chunkenc.UnorderedHeadBlockFmt, conf.BlockSize, conf.TargetChunkSize)
+c := chunkenc.NewMemChunk(chunkenc.EncGZIP, chunkenc.DefaultHeadBlockFmt, conf.BlockSize, conf.TargetChunkSize)
require.Nil(t, c.Append(&logproto.Entry{
Timestamp: time.Unix(1, 0),
Line: "hi there",
2 changes: 1 addition & 1 deletion pkg/ingester/flush_test.go
@@ -124,7 +124,7 @@ func buildChunkDecs(t testing.TB) []*chunkDesc {
for i := range res {
res[i] = &chunkDesc{
closed: true,
-chunk: chunkenc.NewMemChunk(chunkenc.EncSnappy, chunkenc.UnorderedHeadBlockFmt, dummyConf().BlockSize, dummyConf().TargetChunkSize),
+chunk: chunkenc.NewMemChunk(chunkenc.EncSnappy, chunkenc.DefaultHeadBlockFmt, dummyConf().BlockSize, dummyConf().TargetChunkSize),
}
fillChunk(t, res[i].chunk)
require.NoError(t, res[i].chunk.Close())
2 changes: 1 addition & 1 deletion pkg/ingester/stream_test.go
@@ -179,7 +179,7 @@ func TestStreamIterator(t *testing.T) {
new func() *chunkenc.MemChunk
}{
{"gzipChunk", func() *chunkenc.MemChunk {
-return chunkenc.NewMemChunk(chunkenc.EncGZIP, chunkenc.UnorderedHeadBlockFmt, 256*1024, 0)
+return chunkenc.NewMemChunk(chunkenc.EncGZIP, chunkenc.DefaultHeadBlockFmt, 256*1024, 0)
}},
} {
t.Run(chk.name, func(t *testing.T) {
2 changes: 1 addition & 1 deletion pkg/ingester/wal/encoding.go
@@ -32,7 +32,7 @@ const (
// The current type of Entries that this distribution writes.
// Loki can read in a backwards compatible manner, but will write the newest variant.
// TODO: Change to WALRecordEntriesV3?
-const CurrentEntriesRec = WALRecordEntriesV2
+const CurrentEntriesRec = WALRecordEntriesV3

// Record is a struct combining the series and samples record.
type Record struct {
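Per the comment above, readers stay backwards compatible while writers stamp the newest variant. A rough sketch of that pattern (hypothetical decode shape, not the actual Loki decoder; it assumes a `WALRecordEntriesV1` constant alongside the V2/V3 ones shown, and `decodeV3`/`decodeLegacy` are invented for illustration):

```go
// Hypothetical version-gated decode of an entries record.
func decodeEntries(buf []byte, rec *Record) error {
	t := RecordType(buf[0])
	switch t {
	case WALRecordEntriesV3:
		// v3 records carry non-indexed labels with each entry.
		return decodeV3(buf[1:], rec)
	case WALRecordEntriesV1, WALRecordEntriesV2:
		// Older records decode without non-indexed labels.
		return decodeLegacy(t, buf[1:], rec)
	default:
		return fmt.Errorf("unknown WAL record type: %d", t)
	}
}
```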
1 change: 1 addition & 0 deletions pkg/logql/metrics.go
@@ -129,6 +129,7 @@ func RecordRangeAndInstantQueryMetrics(
"returned_lines", returnedLines,
"throughput", strings.Replace(humanize.Bytes(uint64(stats.Summary.BytesProcessedPerSecond)), " ", "", 1),
"total_bytes", strings.Replace(humanize.Bytes(uint64(stats.Summary.TotalBytesProcessed)), " ", "", 1),
"total_bytes_non_indexed_labels", strings.Replace(humanize.Bytes(uint64(stats.Summary.TotalNonIndexedLabelsBytesProcessed)), " ", "", 1),
"lines_per_second", stats.Summary.LinesProcessedPerSecond,
"total_lines", stats.Summary.TotalLinesProcessed,
"post_filter_lines", stats.Summary.TotalPostFilterLines,
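With this, the per-query "metrics.go" log line gains one more key alongside the existing byte totals. An illustrative excerpt (values made up; only the keys visible in this hunk are shown):

```
... throughput=38MB total_bytes=1.2GB total_bytes_non_indexed_labels=34MB lines_per_second=1250000 total_lines=5000000 post_filter_lines=120000 ...
```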
4 changes: 2 additions & 2 deletions pkg/storage/hack/main.go
@@ -91,7 +91,7 @@ func fillStore(cm storage.ClientMetrics) error {
labelsBuilder.Set(labels.MetricName, "logs")
metric := labelsBuilder.Labels()
fp := client.Fingerprint(lbs)
-chunkEnc := chunkenc.NewMemChunk(chunkenc.EncLZ4_4M, chunkenc.UnorderedHeadBlockFmt, 262144, 1572864)
+chunkEnc := chunkenc.NewMemChunk(chunkenc.EncLZ4_4M, chunkenc.DefaultHeadBlockFmt, 262144, 1572864)
for ts := start.UnixNano(); ts < start.UnixNano()+time.Hour.Nanoseconds(); ts = ts + time.Millisecond.Nanoseconds() {
entry := &logproto.Entry{
Timestamp: time.Unix(0, ts),
@@ -114,7 +114,7 @@ func fillStore(cm storage.ClientMetrics) error {
if flushCount >= maxChunks {
return
}
-chunkEnc = chunkenc.NewMemChunk(chunkenc.EncLZ4_64k, chunkenc.UnorderedHeadBlockFmt, 262144, 1572864)
+chunkEnc = chunkenc.NewMemChunk(chunkenc.EncLZ4_64k, chunkenc.DefaultHeadBlockFmt, 262144, 1572864)
}
}
}(i)
@@ -216,12 +216,13 @@ func createChunk(t testing.TB, userID string, lbs labels.Labels, from model.Time
labelsBuilder.Set(labels.MetricName, "logs")
metric := labelsBuilder.Labels()
fp := ingesterclient.Fingerprint(lbs)
-chunkEnc := chunkenc.NewMemChunk(chunkenc.EncSnappy, chunkenc.UnorderedHeadBlockFmt, blockSize, targetSize)
+chunkEnc := chunkenc.NewMemChunk(chunkenc.EncSnappy, chunkenc.DefaultHeadBlockFmt, blockSize, targetSize)

for ts := from; !ts.After(through); ts = ts.Add(1 * time.Minute) {
require.NoError(t, chunkEnc.Append(&logproto.Entry{
-Timestamp: ts.Time(),
-Line:      ts.String(),
+Timestamp:        ts.Time(),
+Line:             ts.String(),
+NonIndexedLabels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", ts.String())),
}))
}

@@ -522,18 +523,15 @@ func TestChunkRewriter(t *testing.T) {

for _, interval := range expectedChunks[i] {
for curr := interval.Start; curr <= interval.End; curr = curr.Add(time.Minute) {
-// Test ready to pass/fail when we change the default chunk and head format.
-var nonIndexedLabels []logproto.LabelAdapter
-if chunkenc.DefaultChunkFormat == 4 && chunkenc.DefaultHeadBlockFmt == chunkenc.UnorderedWithNonIndexedLabelsHeadBlockFmt {
-nonIndexedLabels = logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", curr.String()))
-}
+expectedNonIndexedLabels := labels.FromStrings("foo", curr.String())

require.True(t, newChunkItr.Next())
require.Equal(t, logproto.Entry{
Timestamp: curr.Time(),
Line: curr.String(),
-NonIndexedLabels: nonIndexedLabels,
+NonIndexedLabels: logproto.FromLabelsToLabelAdapters(expectedNonIndexedLabels),
}, newChunkItr.Entry())
+require.Equal(t, expectedNonIndexedLabels.String(), newChunkItr.Labels())
}
}
