Skip to content

Commit

Permalink
Merge branch 'main' into kavirajk/cache-instant-queries2
Browse files Browse the repository at this point in the history
  • Loading branch information
kavirajk committed Feb 19, 2024
2 parents a5ad611 + 9f86473 commit bbe5605
Show file tree
Hide file tree
Showing 19 changed files with 235 additions and 166 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
* [11499](https://github.com/grafana/loki/pull/11284) **jmichalek132** Config: Adds `frontend.log-query-request-headers` to enable logging of request headers in query logs.
* [11817](https://github.com/grafana/loki/pull/11817) **ashwanthgoli** Ruler: Add support for filtering results of `/prometheus/api/v1/rules` endpoint by rule_name, rule_group, file and type.
* [11897](https://github.com/grafana/loki/pull/11897) **ashwanthgoli** Metadata: Introduces a separate split interval of `split_recent_metadata_queries_by_interval` for `recent_metadata_query_window` to help with caching recent metadata query results.
* [11970](https://github.com/grafana/loki/pull/11897) **masslessparticle** Ksonnet: Introduces memory limits to the compactor configuration to avoid unbounded memory usage.

##### Fixes
* [11074](https://github.com/grafana/loki/pull/11074) **hainenber** Fix panic in lambda-promtail due to mishandling of empty DROP_LABELS env var.
Expand Down
2 changes: 1 addition & 1 deletion docs/sources/configure/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -3184,7 +3184,7 @@ shard_streams:

# Skip factor for the n-grams created when computing blooms from log lines.
# CLI flag: -bloom-compactor.ngram-skip
[bloom_ngram_skip: <int> | default = 0]
[bloom_ngram_skip: <int> | default = 1]

# Scalable Bloom Filter desired false-positive rate.
# CLI flag: -bloom-compactor.false-positive-rate
Expand Down
40 changes: 29 additions & 11 deletions pkg/bloomcompactor/bloomcompactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func New(
c.bloomStore = bloomStore

// initialize metrics
c.btMetrics = v1.NewMetrics(prometheus.WrapRegistererWithPrefix("loki_bloom_tokenizer", r))
c.btMetrics = v1.NewMetrics(prometheus.WrapRegistererWithPrefix("loki_bloom_tokenizer_", r))
c.metrics = NewMetrics(r, c.btMetrics)

chunkLoader := NewStoreChunkLoader(
Expand Down Expand Up @@ -179,11 +179,11 @@ func runWithRetries(

type tenantTable struct {
tenant string
table config.DayTime
table config.DayTable
ownershipRange v1.FingerprintBounds
}

func (c *Compactor) tenants(ctx context.Context, table config.DayTime) (v1.Iterator[string], error) {
func (c *Compactor) tenants(ctx context.Context, table config.DayTable) (v1.Iterator[string], error) {
tenants, err := c.tsdbStore.UsersForPeriod(ctx, table)
if err != nil {
return nil, errors.Wrap(err, "getting tenants")
Expand Down Expand Up @@ -241,15 +241,15 @@ func (c *Compactor) tables(ts time.Time) *dayRangeIterator {

fromDay := config.NewDayTime(model.TimeFromUnixNano(from))
throughDay := config.NewDayTime(model.TimeFromUnixNano(through))
return newDayRangeIterator(fromDay, throughDay)
return newDayRangeIterator(fromDay, throughDay, c.schemaCfg)
}

func (c *Compactor) loadWork(ctx context.Context, ch chan<- tenantTable) error {
tables := c.tables(time.Now())

for tables.Next() && tables.Err() == nil && ctx.Err() == nil {

table := tables.At()

tenants, err := c.tenants(ctx, table)
if err != nil {
return errors.Wrap(err, "getting tenants")
Expand All @@ -269,7 +269,11 @@ func (c *Compactor) loadWork(ctx context.Context, ch chan<- tenantTable) error {
c.metrics.tenantsOwned.Inc()

select {
case ch <- tenantTable{tenant: tenant, table: table, ownershipRange: ownershipRange}:
case ch <- tenantTable{
tenant: tenant,
table: table,
ownershipRange: ownershipRange,
}:
case <-ctx.Done():
return ctx.Err()
}
Expand Down Expand Up @@ -332,19 +336,33 @@ func (c *Compactor) compactTenantTable(ctx context.Context, tt tenantTable) erro

type dayRangeIterator struct {
min, max, cur config.DayTime
curPeriod config.PeriodConfig
schemaCfg config.SchemaConfig
err error
}

func newDayRangeIterator(min, max config.DayTime) *dayRangeIterator {
return &dayRangeIterator{min: min, max: max, cur: min.Dec()}
func newDayRangeIterator(min, max config.DayTime, schemaCfg config.SchemaConfig) *dayRangeIterator {
return &dayRangeIterator{min: min, max: max, cur: min.Dec(), schemaCfg: schemaCfg}
}

func (r *dayRangeIterator) Next() bool {
r.cur = r.cur.Inc()
return r.cur.Before(r.max)
if !r.cur.Before(r.max) {
return false
}

period, err := r.schemaCfg.SchemaForTime(r.cur.ModelTime())
if err != nil {
r.err = errors.Wrapf(err, "getting schema for time (%s)", r.cur)
return false
}
r.curPeriod = period

return true
}

func (r *dayRangeIterator) At() config.DayTime {
return r.cur
func (r *dayRangeIterator) At() config.DayTable {
return config.NewDayTable(r.cur, r.curPeriod.IndexTables.Prefix)
}

func (r *dayRangeIterator) Err() error {
Expand Down
12 changes: 6 additions & 6 deletions pkg/bloomcompactor/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,15 @@ Compaction works as follows, split across many functions for clarity:
*/
func (s *SimpleBloomController) compactTenant(
ctx context.Context,
table config.DayTime,
table config.DayTable,
tenant string,
ownershipRange v1.FingerprintBounds,
) error {
logger := log.With(s.logger, "ownership", ownershipRange, "org_id", tenant, "table", table)
logger := log.With(s.logger, "ownership", ownershipRange, "org_id", tenant, "table", table.Addr())

client, err := s.bloomStore.Client(table.ModelTime())
if err != nil {
level.Error(logger).Log("msg", "failed to get client", "err", err, "table", table.Addr())
level.Error(logger).Log("msg", "failed to get client", "err", err)
return errors.Wrap(err, "failed to get client")
}

Expand Down Expand Up @@ -175,7 +175,7 @@ func (s *SimpleBloomController) compactTenant(
func (s *SimpleBloomController) findOutdatedGaps(
ctx context.Context,
tenant string,
table config.DayTime,
table config.DayTable,
ownershipRange v1.FingerprintBounds,
metas []bloomshipper.Meta,
logger log.Logger,
Expand Down Expand Up @@ -215,7 +215,7 @@ func (s *SimpleBloomController) findOutdatedGaps(

func (s *SimpleBloomController) loadWorkForGap(
ctx context.Context,
table config.DayTime,
table config.DayTable,
tenant string,
id tsdb.Identifier,
gap gapWithBlocks,
Expand All @@ -241,7 +241,7 @@ func (s *SimpleBloomController) loadWorkForGap(
func (s *SimpleBloomController) buildGaps(
ctx context.Context,
tenant string,
table config.DayTime,
table config.DayTable,
client bloomshipper.Client,
work []blockPlan,
logger log.Logger,
Expand Down
24 changes: 12 additions & 12 deletions pkg/bloomcompactor/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ const (
)

type TSDBStore interface {
UsersForPeriod(ctx context.Context, table config.DayTime) ([]string, error)
ResolveTSDBs(ctx context.Context, table config.DayTime, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error)
UsersForPeriod(ctx context.Context, table config.DayTable) ([]string, error)
ResolveTSDBs(ctx context.Context, table config.DayTable, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error)
LoadTSDB(
ctx context.Context,
table config.DayTime,
table config.DayTable,
tenant string,
id tsdb.Identifier,
bounds v1.FingerprintBounds,
Expand All @@ -49,12 +49,12 @@ func NewBloomTSDBStore(storage storage.Client) *BloomTSDBStore {
}
}

func (b *BloomTSDBStore) UsersForPeriod(ctx context.Context, table config.DayTime) ([]string, error) {
func (b *BloomTSDBStore) UsersForPeriod(ctx context.Context, table config.DayTable) ([]string, error) {
_, users, err := b.storage.ListFiles(ctx, table.Addr(), true) // bypass cache for ease of testing
return users, err
}

func (b *BloomTSDBStore) ResolveTSDBs(ctx context.Context, table config.DayTime, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) {
func (b *BloomTSDBStore) ResolveTSDBs(ctx context.Context, table config.DayTable, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) {
indices, err := b.storage.ListUserFiles(ctx, table.Addr(), tenant, true) // bypass cache for ease of testing
if err != nil {
return nil, errors.Wrap(err, "failed to list user files")
Expand All @@ -80,7 +80,7 @@ func (b *BloomTSDBStore) ResolveTSDBs(ctx context.Context, table config.DayTime,

func (b *BloomTSDBStore) LoadTSDB(
ctx context.Context,
table config.DayTime,
table config.DayTable,
tenant string,
id tsdb.Identifier,
bounds v1.FingerprintBounds,
Expand Down Expand Up @@ -272,17 +272,17 @@ func (s *TSDBStores) storeForPeriod(table config.DayTime) (TSDBStore, error) {
)
}

func (s *TSDBStores) UsersForPeriod(ctx context.Context, table config.DayTime) ([]string, error) {
store, err := s.storeForPeriod(table)
func (s *TSDBStores) UsersForPeriod(ctx context.Context, table config.DayTable) ([]string, error) {
store, err := s.storeForPeriod(table.DayTime)
if err != nil {
return nil, err
}

return store.UsersForPeriod(ctx, table)
}

func (s *TSDBStores) ResolveTSDBs(ctx context.Context, table config.DayTime, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) {
store, err := s.storeForPeriod(table)
func (s *TSDBStores) ResolveTSDBs(ctx context.Context, table config.DayTable, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) {
store, err := s.storeForPeriod(table.DayTime)
if err != nil {
return nil, err
}
Expand All @@ -292,12 +292,12 @@ func (s *TSDBStores) ResolveTSDBs(ctx context.Context, table config.DayTime, ten

func (s *TSDBStores) LoadTSDB(
ctx context.Context,
table config.DayTime,
table config.DayTable,
tenant string,
id tsdb.Identifier,
bounds v1.FingerprintBounds,
) (v1.CloseableIterator[*v1.Series], error) {
store, err := s.storeForPeriod(table)
store, err := s.storeForPeriod(table.DayTime)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit bbe5605

Please sign in to comment.