From 84da5f606609cfb6463455554fe4e7bc785d7b72 Mon Sep 17 00:00:00 2001 From: Paul Rogers Date: Sat, 13 Jan 2024 19:18:43 -0500 Subject: [PATCH] Storing my things --- pkg/bloomcompactor/bloomcompactor.go | 109 ++++----- pkg/bloomcompactor/mergecompactor.go | 2 +- pkg/bloomcompactor/utils.go | 21 +- pkg/storage/bloom/v1/reader_test.go | 321 +++++++++++++++++++++++++++ 4 files changed, 391 insertions(+), 62 deletions(-) create mode 100644 pkg/storage/bloom/v1/reader_test.go diff --git a/pkg/bloomcompactor/bloomcompactor.go b/pkg/bloomcompactor/bloomcompactor.go index dbe307ff18822..50cb964c3ebb8 100644 --- a/pkg/bloomcompactor/bloomcompactor.go +++ b/pkg/bloomcompactor/bloomcompactor.go @@ -28,7 +28,6 @@ import ( "context" "fmt" "math" - "math/rand" "os" "time" @@ -43,10 +42,6 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" - "path/filepath" - - "github.com/google/uuid" - "github.com/grafana/loki/pkg/bloomutils" "github.com/grafana/loki/pkg/storage" v1 "github.com/grafana/loki/pkg/storage/bloom/v1" @@ -308,9 +303,11 @@ func (c *Compactor) compactUsers(ctx context.Context, logger log.Logger, sc stor } // Skip tenant if compaction is not enabled - if !c.limits.BloomCompactorEnabled(tenant) { - level.Info(tenantLogger).Log("msg", "compaction disabled for tenant. Skipping.") - continue + if tenant != "29" { + if !c.limits.BloomCompactorEnabled(tenant) { + level.Info(tenantLogger).Log("msg", "compaction disabled for tenant. Skipping.") + continue + } } // Skip this table if it is too old for the tenant limits. @@ -321,6 +318,12 @@ func (c *Compactor) compactUsers(ctx context.Context, logger log.Logger, sc stor continue } + if tableName != "loki_dev_006_index_19733" { + level.Debug(tenantLogger).Log("msg", "skipping tenant because table is today", "table-max-age", tableMaxAge, "table-start", tableInterval.Start, "now", now) + continue + } + level.Debug(tenantLogger).Log("table-max-age", tableMaxAge, "table-start", tableInterval.Start, "now", now, "table name", tableName) + // Ensure the tenant ID belongs to our shard. if !c.sharding.OwnsTenant(tenant) { c.metrics.compactionRunSkippedTenants.Inc() @@ -492,7 +495,7 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job, } var metas []bloomshipper.Meta //TODO Configure pool for these to avoid allocations - var activeBloomBlocksRefs []bloomshipper.BlockRef + //var activeBloomBlocksRefs []bloomshipper.BlockRef metas, err := c.bloomShipperClient.GetMetas(ctx, metaSearchParams) if err != nil { @@ -575,58 +578,60 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job, } } + /* - archivePath := filepath.Join(c.cfg.WorkingDirectory, uuid.New().String()) - - blockToUpload, err := bloomshipper.CompressBloomBlock(resultingBlock.BlockRef, archivePath, localDst, logger) - if err != nil { - level.Error(logger).Log("msg", "failed compressing bloom blocks into tar file", "err", err) - return err - } + archivePath := filepath.Join(c.cfg.WorkingDirectory, uuid.New().String()) - defer func() { - err = os.Remove(archivePath) + blockToUpload, err := bloomshipper.CompressBloomBlock(resultingBlock.BlockRef, archivePath, localDst, logger) if err != nil { - level.Error(logger).Log("msg", "failed removing archive file", "err", err, "file", archivePath) + level.Error(logger).Log("msg", "failed compressing bloom blocks into tar file", "err", err) + return err } - }() - // Do not change the signature of PutBlocks yet. 
- // Once block size is limited potentially, compactNewChunks will return multiple blocks, hence a list is appropriate. - storedBlocks, err := c.bloomShipperClient.PutBlocks(ctx, []bloomshipper.Block{blockToUpload}) - if err != nil { - level.Error(logger).Log("msg", "failed uploading blocks to storage", "err", err) - return err - } + defer func() { + err = os.Remove(archivePath) + if err != nil { + level.Error(logger).Log("msg", "failed removing archive file", "err", err, "file", archivePath) + } + }() - // all blocks are new and active blocks - for _, block := range storedBlocks { - activeBloomBlocksRefs = append(activeBloomBlocksRefs, block.BlockRef) - } + // Do not change the signature of PutBlocks yet. + // Once block size is limited potentially, compactNewChunks will return multiple blocks, hence a list is appropriate. + storedBlocks, err := c.bloomShipperClient.PutBlocks(ctx, []bloomshipper.Block{blockToUpload}) + if err != nil { + level.Error(logger).Log("msg", "failed uploading blocks to storage", "err", err) + return err + } + + // all blocks are new and active blocks + for _, block := range storedBlocks { + activeBloomBlocksRefs = append(activeBloomBlocksRefs, block.BlockRef) + } - // TODO delete old metas in later compactions - // After all is done, create one meta file and upload to storage - meta := bloomshipper.Meta{ - MetaRef: bloomshipper.MetaRef{ - Ref: bloomshipper.Ref{ - TenantID: job.tenantID, - TableName: job.tableName, - MinFingerprint: uint64(job.minFp), - MaxFingerprint: uint64(job.maxFp), - StartTimestamp: job.from, - EndTimestamp: job.through, - Checksum: rand.Uint32(), // Discuss if checksum is needed for Metas, why should we read all data again. + // TODO delete old metas in later compactions + // After all is done, create one meta file and upload to storage + meta := bloomshipper.Meta{ + MetaRef: bloomshipper.MetaRef{ + Ref: bloomshipper.Ref{ + TenantID: job.tenantID, + TableName: job.tableName, + MinFingerprint: uint64(job.minFp), + MaxFingerprint: uint64(job.maxFp), + StartTimestamp: job.from, + EndTimestamp: job.through, + Checksum: rand.Uint32(), // Discuss if checksum is needed for Metas, why should we read all data again. 
+ }, }, - }, - Tombstones: blocksMatchingJob, - Blocks: activeBloomBlocksRefs, - } + Tombstones: blocksMatchingJob, + Blocks: activeBloomBlocksRefs, + } - err = c.bloomShipperClient.PutMeta(ctx, meta) - if err != nil { - level.Error(logger).Log("msg", "failed uploading meta.json to storage", "err", err) - return err - } + err = c.bloomShipperClient.PutMeta(ctx, meta) + if err != nil { + level.Error(logger).Log("msg", "failed uploading meta.json to storage", "err", err) + return err + } + */ level.Info(logger).Log("msg", "finished compacting table", "table", job.tableName, "tenant", job.tenantID) return nil } diff --git a/pkg/bloomcompactor/mergecompactor.go b/pkg/bloomcompactor/mergecompactor.go index 6e2143f75135c..37cb045a01d15 100644 --- a/pkg/bloomcompactor/mergecompactor.go +++ b/pkg/bloomcompactor/mergecompactor.go @@ -45,7 +45,7 @@ func makeBlockIterFromBlocks(ctx context.Context, logger log.Logger, blockIters := make([]v1.PeekingIterator[*v1.SeriesWithBloom], len(blocksToUpdate)) blockPaths := make([]string, len(blocksToUpdate)) - err := concurrency.ForEachJob(ctx, len(blocksToUpdate), len(blocksToUpdate), func(ctx context.Context, i int) error { + err := concurrency.ForEachJob(ctx, len(blocksToUpdate), 1, func(ctx context.Context, i int) error { b := blocksToUpdate[i] lazyBlock, err := bloomShipperClient.GetBlock(ctx, b) diff --git a/pkg/bloomcompactor/utils.go b/pkg/bloomcompactor/utils.go index 4b9c3ff541fe2..c7864e0e0ade4 100644 --- a/pkg/bloomcompactor/utils.go +++ b/pkg/bloomcompactor/utils.go @@ -20,15 +20,18 @@ func matchingBlocks(metas []bloomshipper.Meta, job Job) ([]bloomshipper.Meta, [] for _, meta := range metasMatchingJob { for _, blockRef := range meta.Blocks { - if _, ok := oldTombstonedBlockRefs[blockRef]; ok { - // skip any previously tombstoned blockRefs - continue - } - - if blockRef.IndexPath == job.indexPath { - // index has not changed, no compaction needed - continue - } + /* + if _, ok := oldTombstonedBlockRefs[blockRef]; ok { + // skip any previously tombstoned blockRefs + continue + } + + if blockRef.IndexPath == job.indexPath { + // index has not changed, no compaction needed + continue + } + + */ blocksMatchingJob = append(blocksMatchingJob, blockRef) } } diff --git a/pkg/storage/bloom/v1/reader_test.go b/pkg/storage/bloom/v1/reader_test.go new file mode 100644 index 0000000000000..843560412f01a --- /dev/null +++ b/pkg/storage/bloom/v1/reader_test.go @@ -0,0 +1,321 @@ +package v1 + +import ( + "bufio" + "fmt" + "github.com/stretchr/testify/require" + "io" + "os" + "os/exec" + "path/filepath" + "strconv" + "testing" +) + +func listFilesRecursive(rootPath string) ([]string, error) { + var files []string + + err := filepath.Walk(rootPath, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + //fmt.Println(info.Name()) + // Skip directories + if info.IsDir() { + return nil + } + // Add the file path to the slice + files = append(files, path) + return nil + }) + + return files, err +} + +func TestReadingLocalFiles(t *testing.T) { + var ( + //dir = "/Users/progers/baddat/loki_dev_006_index_19731/29/blooms/" + dir = "/Users/progers/baddata2/loki_dev_006_index_19733/29/blooms/" + ) + files, _ := listFilesRecursive(dir) + for _, file := range files { + cmd := exec.Command("mkdir", "/tmp/foo") + _ = cmd.Run() + fmt.Println(file) + file, _ := os.Open(file) + defer file.Close() + reader := bufio.NewReader(file) + UnTarGz("/tmp/foo", reader) + r := NewDirectoryBlockReader("/tmp/foo") + err := r.Init() + require.NoError(t, err) + 
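+		// Editor note (assumption, not part of the original change): Index() and Blooms() open the
+		// series-index and bloom sections of the unpacked block; requiring no error here is a cheap
+		// sanity check that the archive produced a readable block layout before it is queried below.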
+ _, err = r.Index() + require.NoError(t, err) + + _, err = r.Blooms() + require.NoError(t, err) + + block := NewBlock(r) + blockQuerier := NewBlockQuerier(block) + blockIters := NewPeekingIter[*SeriesWithBloom](blockQuerier) + for blockIters.Next() { + _ = blockIters.At() + } + blockQuerier.blooms.Next() + + cmd = exec.Command("rm", "-rf", "/tmp/foo") + _ = cmd.Run() + } +} + +func TestEmptyFiles(t *testing.T) { + + cmd := exec.Command("mkdir", "/tmp/foo") + _ = cmd.Run() + cmd = exec.Command("touch", "/tmp/foo/bloom") + _ = cmd.Run() + cmd = exec.Command("touch", "/tmp/foo/series") + _ = cmd.Run() + + r := NewDirectoryBlockReader("/tmp/foo") + err := r.Init() + require.NoError(t, err) + + _, err = r.Index() + require.NoError(t, err) + + _, err = r.Blooms() + require.NoError(t, err) + + block := NewBlock(r) + blockQuerier := NewBlockQuerier(block) + blockIters := NewPeekingIter[*SeriesWithBloom](blockQuerier) + for blockIters.Next() { + _ = blockIters.At() + } + blockQuerier.blooms.Next() + + cmd = exec.Command("rm", "-rf", "/tmp/foo") + _ = cmd.Run() + +} + +func TestReadingLocalFilesViaShipper(t *testing.T) { + var ( + //dir = "/Users/progers/baddat/loki_dev_006_index_19731/29/blooms/" + dir = "/Users/progers/baddata2/loki_dev_006_index_19733/29/blooms/" + ) + files, _ := listFilesRecursive(dir) + for _, file := range files { + cmd := exec.Command("mkdir", "/tmp/foo") + _ = cmd.Run() + fmt.Println(file) + file, _ := os.Open(file) + defer file.Close() + reader := bufio.NewReader(file) + UnTarGz("/tmp/foo", reader) + r := NewDirectoryBlockReader("/tmp/foo") + err := r.Init() + require.NoError(t, err) + + _, err = r.Index() + require.NoError(t, err) + + _, err = r.Blooms() + require.NoError(t, err) + + block := NewBlock(r) + blockQuerier := NewBlockQuerier(block) + blockIters := NewPeekingIter[*SeriesWithBloom](blockQuerier) + for blockIters.Next() { + _ = blockIters.At() + } + blockQuerier.blooms.Next() + + cmd = exec.Command("rm", "-rf", "/tmp/foo") + _ = cmd.Run() + } +} + +func TestReadingAllLocalFiles(t *testing.T) { + var ( + //dir = "/Users/progers/baddat/loki_dev_006_index_19731/29/blooms/" + dir = "/Users/progers/baddata2/loki_dev_006_index_19733/29/blooms/" + ) + cmd := exec.Command("mkdir", "/tmp/foo") + _ = cmd.Run() + files, _ := listFilesRecursive(dir) + blockIters := make([]PeekingIterator[*SeriesWithBloom], len(files)) + for i, file := range files { + tmpDirI := "/tmp/foo/" + strconv.Itoa(i) + cmd := exec.Command("mkdir", tmpDirI) + _ = cmd.Run() + fmt.Println(file) + file, _ := os.Open(file) + defer file.Close() + reader := bufio.NewReader(file) + UnTarGz(tmpDirI, reader) + r := NewDirectoryBlockReader(tmpDirI) + err := r.Init() + require.NoError(t, err) + + _, err = r.Index() + require.NoError(t, err) + + _, err = r.Blooms() + require.NoError(t, err) + + block := NewBlock(r) + blockQuerier := NewBlockQuerier(block) + blockIters[i] = NewPeekingIter[*SeriesWithBloom](blockQuerier) + + } + heap := NewHeapIterForSeriesWithBloom(blockIters...) 
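+	// Editor note (assumption, not part of the original change): the heap iterator merges the
+	// per-block iterators into a single ordered stream of SeriesWithBloom; the lone Next() call
+	// below is just enough to exercise that merge path against the extracted blocks.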
+ fmt.Printf("made heap iterator\n") + _ = heap.Next() + fmt.Println("Got here") + cmd = exec.Command("rm", "-rf", "/tmp/foo") + _ = cmd.Run() +} + +func TestReadingAllLocalFilesAndDoMore(t *testing.T) { + var ( + //dir = "/Users/progers/baddat/loki_dev_006_index_19731/29/blooms/" + dir = "/Users/progers/baddata2/loki_dev_006_index_19733/29/blooms/" + ) + cmd := exec.Command("mkdir", "/tmp/foo") + _ = cmd.Run() + files, _ := listFilesRecursive(dir) + blockIters := make([]PeekingIterator[*SeriesWithBloom], len(files)) + for i, file := range files { + tmpDirI := "/tmp/foo/" + strconv.Itoa(i) + cmd := exec.Command("mkdir", tmpDirI) + _ = cmd.Run() + fmt.Println(file) + file, _ := os.Open(file) + defer file.Close() + reader := bufio.NewReader(file) + UnTarGz(tmpDirI, reader) + r := NewDirectoryBlockReader(tmpDirI) + err := r.Init() + require.NoError(t, err) + + _, err = r.Index() + require.NoError(t, err) + + _, err = r.Blooms() + require.NoError(t, err) + + block := NewBlock(r) + blockQuerier := NewBlockQuerier(block) + blockIters[i] = NewPeekingIter[*SeriesWithBloom](blockQuerier) + + } + seriesFromSeriesMeta := make([]*Series, 0) + seriesIter := NewSliceIter(seriesFromSeriesMeta) + populate := createPopulateFunc() + blockOptions := NewBlockOptions(4, 0) + mergeBlockBuilder, _ := NewPersistentBlockBuilder("/tmp/foo", blockOptions) + //mergedBlocks := NewPeekingIter[*SeriesWithBloom](NewHeapIterForSeriesWithBloom(blockIters...)) + mergeBuilder := NewMergeBuilder( + blockIters, + seriesIter, + populate) + + fmt.Printf("made merge builder\n") + _, _ = mergeBlockBuilder.MergeBuild(mergeBuilder) + fmt.Printf("did merge build\n") + + cmd = exec.Command("rm", "-rf", "/tmp/foo") + _ = cmd.Run() +} + +func TestReadingAllLocalFilesAndDoMoreWithSeries(t *testing.T) { + var ( + dir = "/Users/progers/baddata2/loki_dev_006_index_19733/29/blooms/" + ) + + _ = os.MkdirAll("/tmp/foo", os.ModePerm) + files, _ := listFilesRecursive(dir) + blockIters := make([]PeekingIterator[*SeriesWithBloom], len(files)) + for i, file := range files { + tmpDirI := "/tmp/foo/" + strconv.Itoa(i) + _, err := os.Stat(tmpDirI) + //fmt.Printf("File %d is %s\n", i, file) + // Check if the error is due to the directory not existing + if os.IsNotExist(err) { + cmd := exec.Command("mkdir", tmpDirI) + _ = cmd.Run() + file, _ := os.Open(file) + defer file.Close() + reader := bufio.NewReader(file) + UnTarGz(tmpDirI, reader) + } + r := NewDirectoryBlockReader(tmpDirI) + block := NewBlock(r) + blockQuerier := NewBlockQuerier(block) + blockIters[i] = NewPeekingIter[*SeriesWithBloom](blockQuerier) + } + seriesFromSeriesMeta := make([]*Series, 2) + series := &Series{ + Fingerprint: 1, + Chunks: nil, + } + seriesFromSeriesMeta[0] = series + seriesFromSeriesMeta[1] = series + seriesIter := NewSliceIter(seriesFromSeriesMeta) + populate := createPopulateFunc() + blockOptions := NewBlockOptions(4, 0) + mergeBlockBuilder, _ := NewPersistentBlockBuilder("/tmp/foo", blockOptions) + mergeBuilder := NewMergeBuilder( + blockIters, + seriesIter, + populate) + + fmt.Printf("made merge builder\n") + _, _ = mergeBlockBuilder.MergeBuild(mergeBuilder) + fmt.Printf("did merge build\n") + +} + +func createPopulateFunc() func(series *Series, bloom *Bloom) error { + return func(series *Series, bloom *Bloom) error { + return nil + } +} + +// Redeclared here since this is a test file and we get an import loop otherwise +type PersistentBlockBuilder struct { + builder *BlockBuilder + localDst string +} + +func NewPersistentBlockBuilder(localDst string, blockOptions 
BlockOptions) (*PersistentBlockBuilder, error) { + // write bloom to a local dir + b, err := NewBlockBuilder(blockOptions, NewDirectoryBlockWriter(localDst)) + if err != nil { + return nil, err + } + builder := PersistentBlockBuilder{ + builder: b, + localDst: localDst, + } + return &builder, nil +} + +func (p *PersistentBlockBuilder) BuildFrom(itr Iterator[SeriesWithBloom]) (uint32, error) { + return p.builder.BuildFrom(itr) +} + +func (p *PersistentBlockBuilder) MergeBuild(builder *MergeBuilder) (uint32, error) { + return builder.Build(p.builder) +} + +func (p *PersistentBlockBuilder) Data() (io.ReadSeekCloser, error) { + blockFile, err := os.Open(filepath.Join(p.localDst, BloomFileName)) + if err != nil { + return nil, err + } + return blockFile, nil +}
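+
+// untarBlockIter is an illustrative sketch, not part of the original change: each test above repeats
+// the same mkdir / UnTarGz / NewDirectoryBlockReader / NewBlockQuerier sequence, and a helper along
+// these lines could factor it out. The helper name and the use of t.TempDir() (which replaces the
+// exec.Command("mkdir"/"rm -rf") calls and cleans up automatically) are editor assumptions; UnTarGz,
+// NewDirectoryBlockReader, NewBlock, NewBlockQuerier and NewPeekingIter come from the surrounding code.
+//
+// Possible usage in the tests above:
+//
+//	for i, file := range files {
+//		blockIters[i] = untarBlockIter(t, file)
+//	}
+func untarBlockIter(t *testing.T, archivePath string) PeekingIterator[*SeriesWithBloom] {
+	dst := t.TempDir()
+
+	f, err := os.Open(archivePath)
+	require.NoError(t, err)
+	defer f.Close()
+
+	// unpack the tar.gz block archive into the temporary directory, as the tests above do
+	UnTarGz(dst, bufio.NewReader(f))
+
+	// open the unpacked block and make sure its series/bloom files are readable
+	r := NewDirectoryBlockReader(dst)
+	require.NoError(t, r.Init())
+
+	return NewPeekingIter[*SeriesWithBloom](NewBlockQuerier(NewBlock(r)))
+}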