Skip to content

Commit

Permalink
feat(wal): Add sizing information to writer and reader. (#13267)
Browse files Browse the repository at this point in the history
  • Loading branch information
cyriltovena authored Jun 20, 2024
1 parent f897758 commit 41fbacd
Show file tree
Hide file tree
Showing 5 changed files with 185 additions and 65 deletions.
7 changes: 7 additions & 0 deletions pkg/pattern/iter/batch_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package iter

import (
"sort"
"testing"

"github.com/go-kit/log"
Expand Down Expand Up @@ -211,6 +212,12 @@ func TestReadMetricsBatch(t *testing.T) {
it := NewSumMergeSampleIterator(tt.seriesIter)
got, err := ReadMetricsBatch(it, tt.batchSize, log.NewNopLogger())
require.NoError(t, err)
sort.Slice(tt.expected.Series, func(i, j int) bool {
return tt.expected.Series[i].Labels < tt.expected.Series[j].Labels
})
sort.Slice(got.Series, func(i, j int) bool {
return got.Series[i].Labels < got.Series[j].Labels
})
require.Equal(t, tt.expected.Series, got.Series)
})
}
Expand Down
70 changes: 7 additions & 63 deletions pkg/storage/wal/chunks/chunks_test.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
package chunks

import (
"bufio"
"bytes"
"fmt"
"os"
"path/filepath"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/storage/wal/testdata"
)

func TestChunkReaderWriter(t *testing.T) {
Expand Down Expand Up @@ -121,11 +119,11 @@ func TestChunkReaderWriter(t *testing.T) {
}

func TestChunkReaderWriterWithLogGenerator(t *testing.T) {
filenames := testDataFile()
filenames := testdata.Files()

for _, filename := range filenames {
t.Run(filename, func(t *testing.T) {
gen := newLogGenerator(t, filename)
gen := testdata.NewLogGenerator(t, filename)
defer gen.Close()

var entries []*logproto.Entry
Expand Down Expand Up @@ -196,10 +194,10 @@ var (

// Benchmark reads with log generator
func BenchmarkReadChunkWithLogGenerator(b *testing.B) {
filenames := testDataFile()
filenames := testdata.Files()
for _, filename := range filenames {
b.Run(filename, func(b *testing.B) {
gen := newLogGenerator(b, filename)
gen := testdata.NewLogGenerator(b, filename)
defer gen.Close()

var entries []*logproto.Entry
Expand Down Expand Up @@ -239,12 +237,12 @@ func BenchmarkReadChunkWithLogGenerator(b *testing.B) {

// Benchmark with log generator
func BenchmarkWriteChunkWithLogGenerator(b *testing.B) {
filenames := testDataFile()
filenames := testdata.Files()

for _, filename := range filenames {
for _, count := range []int{1000, 10000, 100000} {
b.Run(fmt.Sprintf("%s-%d", filename, count), func(b *testing.B) {
gen := newLogGenerator(b, filename)
gen := testdata.NewLogGenerator(b, filename)
defer gen.Close()

var entries []*logproto.Entry
Expand Down Expand Up @@ -278,24 +276,6 @@ func BenchmarkWriteChunkWithLogGenerator(b *testing.B) {
}
}

// testDataFile enumerates every regular file found in the ../testdata
// directory and returns their relative paths. It panics when the
// directory cannot be read, aborting the test run early.
func testDataFile() []string {
	const dir = "../testdata"
	entries, err := os.ReadDir(dir)
	if err != nil {
		panic(err)
	}

	var paths []string
	for _, entry := range entries {
		if entry.IsDir() {
			continue
		}
		paths = append(paths, filepath.Join(dir, entry.Name()))
	}

	return paths
}

// generateLogEntries generates a slice of logproto.Entry with the given count.
func generateLogEntries(count int) []*logproto.Entry {
entries := make([]*logproto.Entry, count)
Expand All @@ -307,39 +287,3 @@ func generateLogEntries(count int) []*logproto.Entry {
}
return entries
}

// logGenerator streams log lines from a file on disk, rewinding to the
// beginning whenever the end is reached so it can hand out an endless
// sequence of lines from a finite fixture.
type logGenerator struct {
	f *os.File
	s *bufio.Scanner
}

// Next reports whether a line is available and returns it. On EOF the
// file is rewound and scanning restarts; the returned byte slice is
// only valid until the following call.
func (g *logGenerator) Next() (bool, []byte) {
	if g.s.Scan() {
		return true, g.s.Bytes()
	}
	g.reset()
	ok := g.s.Scan()
	return ok, g.s.Bytes()
}

// Close releases the underlying file handle. It is safe to call more
// than once.
func (g *logGenerator) Close() {
	if g.f == nil {
		return
	}
	g.f.Close()
	g.f = nil
}

// reset seeks back to the start of the file and installs a fresh
// scanner. Seek errors are deliberately ignored: test-only tooling.
func (g *logGenerator) reset() {
	_, _ = g.f.Seek(0, 0)
	g.s = bufio.NewScanner(g.f)
}

// newLogGenerator opens filename and wraps it in a logGenerator,
// failing the test immediately if the file cannot be opened.
func newLogGenerator(t testing.TB, filename string) *logGenerator {
	t.Helper()
	f, err := os.Open(filename)
	require.NoError(t, err)

	return &logGenerator{f: f, s: bufio.NewScanner(f)}
}
35 changes: 33 additions & 2 deletions pkg/storage/wal/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ type streamID struct {
}

type SegmentWriter struct {
streams *swiss.Map[streamID, *streamSegment]
buf1 encoding.Encbuf
streams *swiss.Map[streamID, *streamSegment]
buf1 encoding.Encbuf
inputSize int64
}

type streamSegment struct {
Expand All @@ -61,6 +62,9 @@ func (b *SegmentWriter) Append(tenantID, labelsString string, lbls labels.Labels
if len(entries) == 0 {
return
}
for _, e := range entries {
b.inputSize += int64(len(e.Line))
}
id := streamID{labels: labelsString, tenant: tenantID}
s, ok := b.streams.Get(id)
if !ok {
Expand Down Expand Up @@ -224,6 +228,13 @@ func (s *streamSegment) WriteTo(w io.Writer) (n int64, err error) {
// Reset clears all buffered streams and accumulated state so the
// writer can be reused to build a fresh segment.
func (b *SegmentWriter) Reset() {
	b.streams.Clear()
	b.buf1.Reset()
	// Zero the input-size counter accumulated by Append.
	b.inputSize = 0
}

// InputSize returns the total size of the input data written to the writer.
// It doesn't account for timestamps and labels.
// The counter accumulates across Append calls and is zeroed by Reset.
func (b *SegmentWriter) InputSize() int64 {
	return b.inputSize
}

type SegmentReader struct {
Expand Down Expand Up @@ -332,3 +343,23 @@ func (r *SegmentReader) Series(ctx context.Context) (*SeriesIter, error) {

return NewSeriesIter(r.idr, ps, r.b), nil
}

// Sizes describes the on-disk footprint of a segment.
type Sizes struct {
	Index int64 // size of the segment index, in bytes
	Series []int64 // per-series chunk size, in bytes, one entry per series
}

// Sizes computes the byte footprint of this segment: the index size
// plus the chunk-data size of every series it contains.
func (r *SegmentReader) Sizes() (Sizes, error) {
	var result Sizes
	result.Index = r.idr.Size()

	it, err := r.Series(context.Background())
	if err != nil {
		return result, err
	}

	result.Series = []int64{}
	for it.Next() {
		// NOTE(review): only the first chunk ref is inspected — this
		// assumes one chunk per series; confirm against the writer.
		_, chunkSize := it.chunksMeta[0].Ref.Unpack()
		result.Series = append(result.Series, int64(chunkSize))
	}
	return result, nil
}
67 changes: 67 additions & 0 deletions pkg/storage/wal/segment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ import (
"testing"
"time"

"github.com/dustin/go-humanize"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
"github.com/grafana/loki/v3/pkg/storage/wal/testdata"

"github.com/grafana/loki/pkg/push"
)
Expand Down Expand Up @@ -186,3 +188,68 @@ func TestMultiTenantWrite(t *testing.T) {
require.NoError(t, iter.Err())
require.ElementsMatch(t, expectedSeries, actualSeries)
}

// TestCompression exercises the segment writer/reader round trip over
// a range of input sizes, from 250 KiB up to 100 MiB.
func TestCompression(t *testing.T) {
	for _, maxSize := range []int64{250 * 1024, 500 * 1024, 750 * 1024, 1 << 20, 2 << 20, 5 << 20, 10 << 20, 20 << 20, 50 << 20, 100 << 20} {
		name := fmt.Sprintf("size %.2f", float64(maxSize)/(1024*1024))
		t.Run(name, func(t *testing.T) {
			testCompression(t, maxSize)
		})
	}
}

// testCompression feeds roughly maxInputSize bytes of generated log
// lines into a segment writer, flushes it to a buffer, and logs
// compression statistics (input/output size, ratio, write time, and
// per-series chunk sizes).
func testCompression(t *testing.T, maxInputSize int64) {
	w := NewWalSegmentWriter()
	dst := bytes.NewBuffer(nil)
	files := testdata.Files()
	lbls := []labels.Labels{}
	generators := []*testdata.LogGenerator{}

	for _, file := range files {
		lbls = append(lbls, labels.FromStrings("filename", file, "namespace", "dev"))
		lbls = append(lbls, labels.FromStrings("filename", file, "namespace", "prod"))
		g := testdata.NewLogGenerator(t, file)
		// The same generator backs both the dev and prod series.
		generators = append(generators, g, g)
		// Close the backing file when the test finishes; the original
		// code never closed it, leaking one descriptor per fixture.
		defer g.Close()
	}
	inputSize := int64(0)
	for inputSize < maxInputSize {
		for i, lbl := range lbls {
			more, line := generators[i].Next()
			if !more {
				continue
			}
			inputSize += int64(len(line))
			w.Append("tenant", lbl.String(), lbl, []*push.Entry{
				{Timestamp: time.Unix(0, int64(i*1e9)), Line: string(line)},
			})
		}
	}

	require.Equal(t, inputSize, w.InputSize())

	now := time.Now()
	n, err := w.WriteTo(dst)
	require.NoError(t, err)
	require.True(t, n > 0)
	compressionTime := time.Since(now)

	r, err := NewReader(dst.Bytes())
	require.NoError(t, err)
	inputSizeMB := float64(w.InputSize()) / (1024 * 1024)
	outputSizeMB := float64(dst.Len()) / (1024 * 1024)
	compressionRatio := (1 - (outputSizeMB / inputSizeMB)) * 100

	t.Logf("Input Size: %s\n", humanize.Bytes(uint64(w.InputSize())))
	t.Logf("Output Size: %s\n", humanize.Bytes(uint64(dst.Len())))
	t.Logf("Compression Ratio: %.2f%%\n", compressionRatio)
	t.Logf("Write time: %s\n", compressionTime)
	sizes, err := r.Sizes()
	require.NoError(t, err)
	t.Logf("Total chunks %d\n", len(sizes.Series))
	t.Logf("Index size %s\n", humanize.Bytes(uint64(sizes.Index)))
	sizesString := ""
	for _, size := range sizes.Series {
		sizesString += humanize.Bytes(uint64(size)) + ", "
	}
	t.Logf("Series sizes: [%s]\n", sizesString)
}
71 changes: 71 additions & 0 deletions pkg/storage/wal/testdata/generator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package testdata

import (
"bufio"
"os"
"path/filepath"
"testing"

"github.com/stretchr/testify/require"
)

// LogGenerator yields log lines from a backing file, wrapping around
// to the start of the file whenever it runs out, so callers can draw
// an unbounded stream of lines from a finite fixture.
type LogGenerator struct {
	f *os.File
	s *bufio.Scanner
}

// Next reports whether a line is available and returns it. On EOF the
// file is rewound and scanning restarts; the returned slice is only
// valid until the next call.
func (g *LogGenerator) Next() (bool, []byte) {
	if g.s.Scan() {
		return true, g.s.Bytes()
	}
	g.reset()
	ok := g.s.Scan()
	return ok, g.s.Bytes()
}

// Close releases the backing file. Calling it multiple times is safe.
func (g *LogGenerator) Close() {
	if g.f == nil {
		return
	}
	g.f.Close()
	g.f = nil
}

// reset seeks back to the beginning of the file and replaces the
// scanner; seek failures are ignored on purpose (test-only helper).
func (g *LogGenerator) reset() {
	_, _ = g.f.Seek(0, 0)
	g.s = bufio.NewScanner(g.f)
}

// NewLogGenerator opens filename and returns a generator over its
// lines, failing the test if the file cannot be opened.
func NewLogGenerator(t testing.TB, filename string) *LogGenerator {
	t.Helper()
	f, err := os.Open(filename)
	require.NoError(t, err)

	return &LogGenerator{f: f, s: bufio.NewScanner(f)}
}

func Files() []string {
testdataDir := "./testdata"
files, err := os.ReadDir(testdataDir)
if err != nil && !os.IsNotExist(err) {
if !os.IsNotExist(err) {
panic(err)
}
testdataDir = "../testdata"
files, err = os.ReadDir(testdataDir)
if err != nil {
panic(err)
}
}

var fileNames []string
for _, file := range files {
if !file.IsDir() {
filePath := filepath.Join(testdataDir, file.Name())
fileNames = append(fileNames, filePath)
}
}

return fileNames
}

0 comments on commit 41fbacd

Please sign in to comment.