Storing my things
paul1r committed Jan 14, 2024
1 parent 0065fd6 commit 84da5f6
Showing 4 changed files with 391 additions and 62 deletions.
109 changes: 57 additions & 52 deletions pkg/bloomcompactor/bloomcompactor.go
@@ -28,7 +28,6 @@ import (
    "context"
    "fmt"
    "math"
-   "math/rand"
    "os"
    "time"

@@ -43,10 +42,6 @@ import (
    "github.com/prometheus/common/model"
    "github.com/prometheus/prometheus/model/labels"

-   "path/filepath"
-
-   "github.com/google/uuid"
-
    "github.com/grafana/loki/pkg/bloomutils"
    "github.com/grafana/loki/pkg/storage"
    v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
@@ -308,9 +303,11 @@ func (c *Compactor) compactUsers(ctx context.Context, logger log.Logger, sc stor
        }

        // Skip tenant if compaction is not enabled
-       if !c.limits.BloomCompactorEnabled(tenant) {
-           level.Info(tenantLogger).Log("msg", "compaction disabled for tenant. Skipping.")
-           continue
+       if tenant != "29" {
+           if !c.limits.BloomCompactorEnabled(tenant) {
+               level.Info(tenantLogger).Log("msg", "compaction disabled for tenant. Skipping.")
+               continue
+           }
        }

        // Skip this table if it is too old for the tenant limits.
@@ -321,6 +318,12 @@
            continue
        }

+       if tableName != "loki_dev_006_index_19733" {
+           level.Debug(tenantLogger).Log("msg", "skipping table because it is not the hard-coded debug table", "table-max-age", tableMaxAge, "table-start", tableInterval.Start, "now", now)
+           continue
+       }
+       level.Debug(tenantLogger).Log("table-max-age", tableMaxAge, "table-start", tableInterval.Start, "now", now, "table name", tableName)
+
        // Ensure the tenant ID belongs to our shard.
        if !c.sharding.OwnsTenant(tenant) {
            c.metrics.compactionRunSkippedTenants.Inc()
@@ -492,7 +495,7 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
    }
    var metas []bloomshipper.Meta
    //TODO Configure pool for these to avoid allocations
-   var activeBloomBlocksRefs []bloomshipper.BlockRef
+   //var activeBloomBlocksRefs []bloomshipper.BlockRef

    metas, err := c.bloomShipperClient.GetMetas(ctx, metaSearchParams)
    if err != nil {
@@ -575,58 +578,60 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
        }

    }
+   /*
    archivePath := filepath.Join(c.cfg.WorkingDirectory, uuid.New().String())

    blockToUpload, err := bloomshipper.CompressBloomBlock(resultingBlock.BlockRef, archivePath, localDst, logger)
    if err != nil {
        level.Error(logger).Log("msg", "failed compressing bloom blocks into tar file", "err", err)
        return err
    }
    defer func() {
        err = os.Remove(archivePath)
        if err != nil {
            level.Error(logger).Log("msg", "failed removing archive file", "err", err, "file", archivePath)
        }
    }()
    // Do not change the signature of PutBlocks yet.
    // Once block size is limited potentially, compactNewChunks will return multiple blocks, hence a list is appropriate.
    storedBlocks, err := c.bloomShipperClient.PutBlocks(ctx, []bloomshipper.Block{blockToUpload})
    if err != nil {
        level.Error(logger).Log("msg", "failed uploading blocks to storage", "err", err)
        return err
    }
    // all blocks are new and active blocks
    for _, block := range storedBlocks {
        activeBloomBlocksRefs = append(activeBloomBlocksRefs, block.BlockRef)
    }
    // TODO delete old metas in later compactions
    // After all is done, create one meta file and upload to storage
    meta := bloomshipper.Meta{
        MetaRef: bloomshipper.MetaRef{
            Ref: bloomshipper.Ref{
                TenantID:       job.tenantID,
                TableName:      job.tableName,
                MinFingerprint: uint64(job.minFp),
                MaxFingerprint: uint64(job.maxFp),
                StartTimestamp: job.from,
                EndTimestamp:   job.through,
                Checksum:       rand.Uint32(), // Discuss if checksum is needed for Metas, why should we read all data again.
            },
        },
        Tombstones: blocksMatchingJob,
        Blocks:     activeBloomBlocksRefs,
    }
    err = c.bloomShipperClient.PutMeta(ctx, meta)
    if err != nil {
        level.Error(logger).Log("msg", "failed uploading meta.json to storage", "err", err)
        return err
    }
+   */
    level.Info(logger).Log("msg", "finished compacting table", "table", job.tableName, "tenant", job.tenantID)
    return nil
}
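For reference, the block-upload path that this commit comments out reduces to the sketch below. The uploader interface and the simplified types are assumptions introduced here so the sketch is self-contained; they are not the real bloomshipper client or structs, and the compression and archive-cleanup steps are omitted.

package compactsketch

import (
    "context"
    "fmt"
)

// Simplified stand-ins for bloomshipper.BlockRef, Block and Meta.
type BlockRef struct{ Path string }

type Block struct{ BlockRef BlockRef }

type Meta struct {
    Tombstones []BlockRef // blocks superseded by this compaction
    Blocks     []BlockRef // blocks that are now active
}

// uploader narrows the shipper client to the two calls used above.
type uploader interface {
    PutBlocks(ctx context.Context, blocks []Block) ([]Block, error)
    PutMeta(ctx context.Context, meta Meta) error
}

// uploadCompactedBlock mirrors the commented-out flow: upload the new block,
// then record it (and the tombstoned predecessors) in a single meta file.
func uploadCompactedBlock(ctx context.Context, client uploader, block Block, tombstones []BlockRef) error {
    stored, err := client.PutBlocks(ctx, []Block{block})
    if err != nil {
        return fmt.Errorf("uploading blocks: %w", err)
    }
    refs := make([]BlockRef, 0, len(stored))
    for _, b := range stored {
        refs = append(refs, b.BlockRef)
    }
    if err := client.PutMeta(ctx, Meta{Tombstones: tombstones, Blocks: refs}); err != nil {
        return fmt.Errorf("uploading meta.json: %w", err)
    }
    return nil
}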
2 changes: 1 addition & 1 deletion pkg/bloomcompactor/mergecompactor.go
@@ -45,7 +45,7 @@ func makeBlockIterFromBlocks(ctx context.Context, logger log.Logger,
    blockIters := make([]v1.PeekingIterator[*v1.SeriesWithBloom], len(blocksToUpdate))
    blockPaths := make([]string, len(blocksToUpdate))

-   err := concurrency.ForEachJob(ctx, len(blocksToUpdate), len(blocksToUpdate), func(ctx context.Context, i int) error {
+   err := concurrency.ForEachJob(ctx, len(blocksToUpdate), 1, func(ctx context.Context, i int) error {
        b := blocksToUpdate[i]

        lazyBlock, err := bloomShipperClient.GetBlock(ctx, b)
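The only change here is the third argument to concurrency.ForEachJob, which caps how many jobs run in parallel: len(blocksToUpdate) fanned out one worker per block, while 1 processes the blocks sequentially. A minimal sketch, assuming the dskit concurrency package that Loki pulls in elsewhere:

package main

import (
    "context"
    "fmt"

    "github.com/grafana/dskit/concurrency"
)

func main() {
    blocks := []string{"block-a", "block-b", "block-c"}

    // Third argument is the parallelism limit: 1 runs the jobs one at a
    // time; len(blocks) would run them all concurrently.
    err := concurrency.ForEachJob(context.Background(), len(blocks), 1, func(_ context.Context, i int) error {
        fmt.Println("downloading and iterating", blocks[i])
        return nil
    })
    if err != nil {
        fmt.Println("failed processing blocks:", err)
    }
}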
21 changes: 12 additions & 9 deletions pkg/bloomcompactor/utils.go
@@ -20,15 +20,18 @@ func matchingBlocks(metas []bloomshipper.Meta, job Job) ([]bloomshipper.Meta, []

    for _, meta := range metasMatchingJob {
        for _, blockRef := range meta.Blocks {
-           if _, ok := oldTombstonedBlockRefs[blockRef]; ok {
-               // skip any previously tombstoned blockRefs
-               continue
-           }
-
-           if blockRef.IndexPath == job.indexPath {
-               // index has not changed, no compaction needed
-               continue
-           }
+           /*
+           if _, ok := oldTombstonedBlockRefs[blockRef]; ok {
+               // skip any previously tombstoned blockRefs
+               continue
+           }
+
+           if blockRef.IndexPath == job.indexPath {
+               // index has not changed, no compaction needed
+               continue
+           }
+           */
            blocksMatchingJob = append(blocksMatchingJob, blockRef)
        }
    }
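With both guards commented out, matchingBlocks now collects every block ref from the matching metas, including previously tombstoned blocks and blocks whose index is unchanged. A minimal sketch of the disabled filtering, with a plain map standing in for the tombstone set and simplified types rather than the real bloomshipper structs:

package compactsketch

// blockRef is a simplified stand-in for bloomshipper.BlockRef.
type blockRef struct {
    IndexPath string
    BlockPath string
}

// filterBlocks reproduces the two skipped checks from matchingBlocks.
func filterBlocks(refs []blockRef, tombstoned map[blockRef]struct{}, jobIndexPath string) []blockRef {
    var out []blockRef
    for _, ref := range refs {
        if _, ok := tombstoned[ref]; ok {
            continue // already tombstoned by an earlier compaction
        }
        if ref.IndexPath == jobIndexPath {
            continue // index unchanged, no compaction needed
        }
        out = append(out, ref)
    }
    return out
}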