Merge branch 'main' into paul1r/compress_bloom_blocks
paul1r authored Nov 15, 2023
2 parents fde9dcc + 3f0f8fa commit 2ba5477
Showing 22 changed files with 3,748 additions and 2,500 deletions.
1 change: 0 additions & 1 deletion docs/sources/setup/install/helm/reference.md
@@ -2027,7 +2027,6 @@ null
<td>Limits config</td>
<td><pre lang="json">
{
"enforce_metric_name": false,
"max_cache_freshness_per_query": "10m",
"reject_old_samples": true,
"reject_old_samples_max_age": "168h",
4 changes: 0 additions & 4 deletions docs/sources/setup/upgrade/_index.md
@@ -241,10 +241,6 @@ Some Loki metrics started with the prefix `cortex_`. In this release they will b
- `cortex_query_scheduler_queue_duration_seconds_sum`
- `cortex_query_scheduler_queue_length`
- `cortex_query_scheduler_running`
- `cortex_quota_cgroup_cpu_max`
- `cortex_quota_cgroup_cpu_period`
- `cortex_quota_cpu_count`
- `cortex_quota_gomaxprocs`
- `cortex_ring_member_heartbeats_total`
- `cortex_ring_member_tokens_owned`
- `cortex_ring_member_tokens_to_own`
5 changes: 2 additions & 3 deletions pkg/bloomcompactor/TODO.md
@@ -1,5 +1,4 @@
* Should we consider configuring falsePosRate of sbf at runtime?
* Adding falsePosRate of sbf into config
* Add per-tenant bool to enable compaction
* Use tarGz, untarGz before uploading blocks to storage (see the sketch after this list)
* Return checksum from `BuildFrom`
* Move meta creation to an outer layer, ensure one meta.json per compaction cycle.
* Reintroduce `maxLookBackPeriod` as the `RejectOldSamplesMaxAge` limit in distributors
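
The tarGz/untarGz item above is the subject of this branch (compress_bloom_blocks). Below is a minimal sketch of the compression half, assuming a block is laid out as a directory of files; the package and function names are illustrative, not the compactor's actual API.

package blockarchive

import (
	"archive/tar"
	"compress/gzip"
	"io"
	"os"
	"path/filepath"
)

// TarGzDirectory writes the contents of dir to dst as a gzip-compressed
// tar stream, storing file paths relative to dir.
func TarGzDirectory(dir string, dst io.Writer) error {
	gw := gzip.NewWriter(dst)
	defer gw.Close()
	tw := tar.NewWriter(gw)
	defer tw.Close()

	return filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
		if err != nil || info.IsDir() {
			return err
		}
		hdr, err := tar.FileInfoHeader(info, "")
		if err != nil {
			return err
		}
		rel, err := filepath.Rel(dir, path)
		if err != nil {
			return err
		}
		hdr.Name = filepath.ToSlash(rel)
		if err := tw.WriteHeader(hdr); err != nil {
			return err
		}
		f, err := os.Open(path)
		if err != nil {
			return err
		}
		defer f.Close()
		_, err = io.Copy(tw, f)
		return err
	})
}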
77 changes: 48 additions & 29 deletions pkg/bloomcompactor/bloomcompactor.go
@@ -503,10 +503,13 @@ func createLocalDirName(workingDir string, job Job) string {
return filepath.Join(workingDir, dir)
}

func CompactNewChunks(ctx context.Context, logger log.Logger, job Job, chunks []chunk.Chunk, bt *v1.BloomTokenizer, bloomShipperClient bloomshipper.Client, dst string) (err error) {
// Compacts the given list of chunks, uploads them to storage, and returns a list of bloomBlocks
func CompactNewChunks(ctx context.Context, logger log.Logger, job Job,
chunks []chunk.Chunk, bt *v1.BloomTokenizer,
bloomShipperClient bloomshipper.Client, dst string) ([]bloomshipper.Block, error) {
// Ensure the context has not been canceled (i.e. compactor shutdown has been triggered).
if err := ctx.Err(); err != nil {
return err
return nil, err
}

// Create a bloom for this series
@@ -526,31 +529,14 @@ func CompactNewChunks(ctx context.Context, logger log.Logger, job Job, chunks []
blocks, err := buildBloomBlock(ctx, logger, bloomForChks, job, dst)
if err != nil {
level.Error(logger).Log("building bloomBlocks", err)
return
return nil, err
}

storedBlocks, err := bloomShipperClient.PutBlocks(ctx, []bloomshipper.Block{blocks})
if err != nil {
level.Error(logger).Log("putting blocks to storage", err)
return
}

storedBlockRefs := make([]bloomshipper.BlockRef, len(storedBlocks))
// Build and upload meta.json to storage
meta := bloomshipper.Meta{
// After successful compaction there should be no tombstones
Tombstones: make([]bloomshipper.BlockRef, 0),
Blocks: storedBlockRefs,
}

// TODO move this to an outer layer, otherwise creates a meta per block
err = bloomShipperClient.PutMeta(ctx, meta)
if err != nil {
level.Error(logger).Log("putting meta.json to storage", err)
return
return nil, err
}

return nil
return storedBlocks, nil
}

func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job, bloomShipperClient bloomshipper.Client, bt *v1.BloomTokenizer, storeClient storeClient) error {
@@ -559,23 +545,43 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
return err
}

// TODO call bloomShipperClient.GetMetas to get existing meta.json
metaSearchParams := bloomshipper.MetaSearchParams{
TenantID: job.tenantID,
MinFingerprint: uint64(job.seriesFP),
MaxFingerprint: uint64(job.seriesFP),
StartTimestamp: int64(job.from),
EndTimestamp: int64(job.through),
}
var metas []bloomshipper.Meta
// TODO: Configure a pool for these to avoid allocations
var bloomBlocksRefs []bloomshipper.BlockRef
var tombstonedBlockRefs []bloomshipper.BlockRef

metas, err := bloomShipperClient.GetMetas(ctx, metaSearchParams)
if err != nil {
return err
}

if len(metas) == 0 {
// Get chunk data from the list of chunkRefs
chks, err := storeClient.chunk.GetChunks(
ctx,
makeChunkRefs(job.Chunks(), job.Tenant(), job.Fingerprint()),
)
chks, err := storeClient.chunk.GetChunks(ctx, makeChunkRefs(job.Chunks(), job.Tenant(), job.Fingerprint()))
if err != nil {
return err
}

err = CompactNewChunks(ctx, logger, job, chks, bt, bloomShipperClient, c.cfg.WorkingDirectory)
storedBlocks, err := CompactNewChunks(ctx, logger, job, chks, bt, bloomShipperClient, c.cfg.WorkingDirectory)
if err != nil {
return err
level.Error(logger).Log("compacting new chunks", err)
return err
}

storedBlockRefs := make([]bloomshipper.BlockRef, len(storedBlocks))

for i, block := range storedBlocks {
storedBlockRefs[i] = block.BlockRef
}

// All blocks are new, active blocks
bloomBlocksRefs = storedBlockRefs
} else {
// TODO complete part 2 - periodic compaction for delta from previous period
// When already-compacted metas exist
@@ -586,11 +592,24 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
for _, blockRef := range meta.Blocks {
uniqueIndexPaths[blockRef.IndexPath] = struct{}{}
// ...

// the result should return a list of active
// blocks and tombstoned bloom blocks.
}
}

}

// After all is done, create one meta file and upload to storage
meta := bloomshipper.Meta{
Tombstones: tombstonedBlockRefs,
Blocks: bloomBlocksRefs,
}
err = bloomShipperClient.PutMeta(ctx, meta)
if err != nil {
level.Error(logger).Log("putting meta.json to storage", err)
return err
}
return nil
}

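The `else` branch above is still a stub. As a hedged illustration only: one way the loop could resolve active versus tombstoned blocks. Because GetMetas was already scoped to this job's fingerprint and time range, this sketch assumes every block referenced by an existing meta is superseded by the re-compacted set; the helper name and that rule are assumptions, not the shipped algorithm.

// Illustrative only: dedup existing refs by IndexPath (a block may appear
// in several metas) and tombstone them in favor of the new blocks.
func splitActiveAndTombstoned(metas []bloomshipper.Meta, newRefs []bloomshipper.BlockRef) (active, tombstoned []bloomshipper.BlockRef) {
	seen := make(map[string]struct{})
	for _, meta := range metas {
		for _, ref := range meta.Blocks {
			if _, ok := seen[ref.IndexPath]; ok {
				continue
			}
			seen[ref.IndexPath] = struct{}{}
			tombstoned = append(tombstoned, ref)
		}
	}
	// The freshly compacted blocks are always active.
	active = append(active, newRefs...)
	return active, tombstoned
}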
30 changes: 16 additions & 14 deletions pkg/logql/log/ip.go
@@ -78,39 +78,41 @@ func (f *IPLineFilter) filterTy(line []byte, ty labels.MatchType) bool {

type IPLabelFilter struct {
ip *ipFilter
ty LabelFilterType
Ty LabelFilterType

// if used as label matcher, this holds the identifier label name.
// If used as a label matcher, this holds the identifier label name.
// e.g: (|remote_addr = ip("xxx")). Here the label name is `remote_addr`
label string
Label string

// patError records if given pattern is invalid.
patError error

// local copy of pattern to display it in errors, even though pattern matcher fails because of invalid pattern.
pattern string
// local copy of the pattern to display in errors, even when the pattern matcher fails because of an invalid pattern.
Pattern string
}

// NewIPLabelFilter constructs an ip filter as a label filter for the given `label`.
func NewIPLabelFilter(pattern string, label string, ty LabelFilterType) *IPLabelFilter {
func NewIPLabelFilter(pattern, label string, ty LabelFilterType) *IPLabelFilter {
ip, err := newIPFilter(pattern)
return &IPLabelFilter{
ip: ip,
label: label,
ty: ty,
Label: label,
Ty: ty,
patError: err,
pattern: pattern,
Pattern: pattern,
}
}

// `Process` implements the `Stage` interface
func (f *IPLabelFilter) Process(_ int64, line []byte, lbs *LabelsBuilder) ([]byte, bool) {
return line, f.filterTy(line, f.ty, lbs)
return line, f.filterTy(line, f.Ty, lbs)
}

func (f *IPLabelFilter) isLabelFilterer() {}

// `RequiredLabelNames` implements the `Stage` interface
func (f *IPLabelFilter) RequiredLabelNames() []string {
return []string{f.label}
return []string{f.Label}
}

// PatternError will be used by the `labelFilter.Stage()` method so that, if the given pattern is wrong
@@ -124,7 +126,7 @@ func (f *IPLabelFilter) filterTy(_ []byte, ty LabelFilterType, lbs *LabelsBuilde
// Why `true`? If there's an error, only the string matchers can filter out.
return true
}
input, ok := lbs.Get(f.label)
input, ok := lbs.Get(f.Label)
if !ok {
// we have not found the label.
return false
@@ -146,11 +148,11 @@ func (f *IPLabelFilter) filterTy(_ []byte, ty LabelFilterType, lbs *LabelsBuilde
// `String` implements the fmt.Stringer interface, by which it also implements the `LabelFilterer` interface.
func (f *IPLabelFilter) String() string {
eq := "=" // LabelFilterEqual -> "==", we don't want in string representation of ip label filter.
if f.ty == LabelFilterNotEqual {
if f.Ty == LabelFilterNotEqual {
eq = LabelFilterNotEqual.String()
}

return fmt.Sprintf("%s%sip(%q)", f.label, eq, f.pattern) // label filter
return fmt.Sprintf("%s%sip(%q)", f.Label, eq, f.Pattern) // label filter
}
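
A short usage sketch of the now-exported surface, assuming a CIDR pattern that `newIPFilter` accepts; the values and the `main` wrapper are illustrative:

package main

import (
	"fmt"

	"github.com/grafana/loki/pkg/logql/log"
)

func main() {
	// Ty, Label and Pattern are now exported, so callers outside the
	// package can introspect the filter after construction.
	f := log.NewIPLabelFilter("192.168.0.0/16", "remote_addr", log.LabelFilterEqual)
	if err := f.PatternError(); err != nil {
		panic(err) // invalid pattern, surfaced lazily by PatternError
	}
	fmt.Println(f.String()) // remote_addr=ip("192.168.0.0/16")
}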

// ipFilter searches for IP addresses matching the given `pattern` in the given `line`.
44 changes: 32 additions & 12 deletions pkg/logql/log/label_filter.go
@@ -54,23 +54,28 @@ func (f LabelFilterType) String() string {
}

// LabelFilterer can filter extracted labels.
//
//sumtype:decl
type LabelFilterer interface {
Stage
fmt.Stringer

// Seal trait
isLabelFilterer()
}

type BinaryLabelFilter struct {
Left LabelFilterer
Right LabelFilterer
and bool
And bool
}

// NewAndLabelFilter creates a new LabelFilterer from an AND binary operation of two LabelFilterers.
func NewAndLabelFilter(left LabelFilterer, right LabelFilterer) *BinaryLabelFilter {
return &BinaryLabelFilter{
Left: left,
Right: right,
and: true,
And: true,
}
}

@@ -84,16 +89,18 @@ func NewOrLabelFilter(left LabelFilterer, right LabelFilterer) *BinaryLabelFilte

func (b *BinaryLabelFilter) Process(ts int64, line []byte, lbs *LabelsBuilder) ([]byte, bool) {
line, lok := b.Left.Process(ts, line, lbs)
if !b.and && lok {
if !b.And && lok {
return line, true
}
line, rok := b.Right.Process(ts, line, lbs)
if !b.and {
if !b.And {
return line, lok || rok
}
return line, lok && rok
}
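
The short-circuit above means an `or` filter never evaluates its right side once the left has passed, while an `and` filter needs both. A small sketch of the two constructors and how `String` renders them, assuming the Prometheus `labels` package for the matchers:

package main

import (
	"fmt"

	"github.com/grafana/loki/pkg/logql/log"
	"github.com/prometheus/prometheus/model/labels"
)

func main() {
	left := log.NewStringLabelFilter(labels.MustNewMatcher(labels.MatchEqual, "status", "500"))
	right := log.NewStringLabelFilter(labels.MustNewMatcher(labels.MatchEqual, "status", "503"))

	or := log.NewOrLabelFilter(left, right)
	and := log.NewAndLabelFilter(left, right)

	fmt.Println(or.String())  // ( status="500" or status="503" )
	fmt.Println(and.String()) // ( status="500" , status="503" )
}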

func (b *BinaryLabelFilter) isLabelFilterer() {}

func (b *BinaryLabelFilter) RequiredLabelNames() []string {
var names []string
names = append(names, b.Left.RequiredLabelNames()...)
@@ -105,7 +112,7 @@ func (b *BinaryLabelFilter) String() string {
var sb strings.Builder
sb.WriteString("( ")
sb.WriteString(b.Left.String())
if b.and {
if b.And {
sb.WriteString(" , ")
} else {
sb.WriteString(" or ")
@@ -122,6 +129,9 @@ type NoopLabelFilter struct {
func (NoopLabelFilter) Process(_ int64, line []byte, _ *LabelsBuilder) ([]byte, bool) {
return line, true
}

func (NoopLabelFilter) isLabelFilterer() {}

func (NoopLabelFilter) RequiredLabelNames() []string { return []string{} }

func (f NoopLabelFilter) String() string {
@@ -197,6 +207,8 @@ func (d *BytesLabelFilter) Process(_ int64, line []byte, lbs *LabelsBuilder) ([]
}
}

func (d *BytesLabelFilter) isLabelFilterer() {}

func (d *BytesLabelFilter) RequiredLabelNames() []string {
return []string{d.Name}
}
@@ -207,7 +219,7 @@ func (d *BytesLabelFilter) String() string {
return -1
}
return r
}, humanize.Bytes(d.Value))
}, humanize.Bytes(d.Value)) // TODO: discuss whether this should just be bytes, B, to be more accurate.
return fmt.Sprintf("%s%s%s", d.Name, d.Type, b)
}

@@ -262,6 +274,8 @@ func (d *DurationLabelFilter) Process(_ int64, line []byte, lbs *LabelsBuilder)
}
}

func (d *DurationLabelFilter) isLabelFilterer() {}

func (d *DurationLabelFilter) RequiredLabelNames() []string {
return []string{d.Name}
}
@@ -323,6 +337,8 @@ func (n *NumericLabelFilter) Process(_ int64, line []byte, lbs *LabelsBuilder) (

}

func (n *NumericLabelFilter) isLabelFilterer() {}

func (n *NumericLabelFilter) RequiredLabelNames() []string {
return []string{n.Name}
}
@@ -348,7 +364,7 @@ func NewStringLabelFilter(m *labels.Matcher) LabelFilterer {
return &NoopLabelFilter{m}
}

return &lineFilterLabelFilter{
return &LineFilterLabelFilter{
Matcher: m,
filter: f,
}
@@ -358,18 +374,20 @@ func (s *StringLabelFilter) Process(_ int64, line []byte, lbs *LabelsBuilder) ([
return line, s.Matches(labelValue(s.Name, lbs))
}

func (s *StringLabelFilter) isLabelFilterer() {}

func (s *StringLabelFilter) RequiredLabelNames() []string {
return []string{s.Name}
}

// lineFilterLabelFilter filters the desired label using an optimized line filter
type lineFilterLabelFilter struct {
// LineFilterLabelFilter filters the desired label using an optimized line filter
type LineFilterLabelFilter struct {
*labels.Matcher
filter Filterer
}

// overrides the matcher.String() function in case there is a regexpFilter
func (s *lineFilterLabelFilter) String() string {
func (s *LineFilterLabelFilter) String() string {
if unwrappedFilter, ok := s.filter.(regexpFilter); ok {
rStr := unwrappedFilter.String()
str := fmt.Sprintf("%s%s`%s`", s.Matcher.Name, s.Matcher.Type, rStr)
@@ -378,12 +396,14 @@ func (s *lineFilterLabelFilter) String() string {
return s.Matcher.String()
}

func (s *lineFilterLabelFilter) Process(_ int64, line []byte, lbs *LabelsBuilder) ([]byte, bool) {
func (s *LineFilterLabelFilter) Process(_ int64, line []byte, lbs *LabelsBuilder) ([]byte, bool) {
v := labelValue(s.Name, lbs)
return line, s.filter.Filter(unsafeGetBytes(v))
}

func (s *lineFilterLabelFilter) RequiredLabelNames() []string {
func (s *LineFilterLabelFilter) isLabelFilterer() {}

func (s *LineFilterLabelFilter) RequiredLabelNames() []string {
return []string{s.Name}
}
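
The no-op `isLabelFilterer()` methods added throughout this file, together with the `//sumtype:decl` marker, seal `LabelFilterer`: the unexported method can only be implemented inside this package, so a checker such as go-sumtype can treat the interface as a closed sum type and flag non-exhaustive switches. A standalone sketch of the pattern, with illustrative names:

package shape

// Sealed interface: the unexported method can only be implemented
// by types declared in this package.
//
//sumtype:decl
type Shape interface {
	Area() float64
	isShape() // seal trait
}

type Circle struct{ R float64 }
type Square struct{ S float64 }

func (c Circle) Area() float64 { return 3.141592653589793 * c.R * c.R }
func (s Square) Area() float64 { return s.S * s.S }

func (Circle) isShape() {}
func (Square) isShape() {}

// A sum-type checker can now verify this switch covers every variant.
func describe(s Shape) string {
	switch s.(type) {
	case Circle:
		return "circle"
	case Square:
		return "square"
	}
	return "unreachable if the switch is exhaustive"
}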
