Merge remote-tracking branch 'upstream/main' into feat/patterns
cyriltovena committed Apr 2, 2024
2 parents 0e88aad + 36c703d commit 464a4ed
Showing 63 changed files with 4,177 additions and 943 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -64,6 +64,7 @@
* [11970](https://github.com/grafana/loki/pull/11897) **masslessparticle** Ksonnet: Introduces memory limits to the compactor configuration to avoid unbounded memory usage.
* [12318](https://github.com/grafana/loki/pull/12318) **DylanGuedes** Memcached: Add mTLS support.
* [12392](https://github.com/grafana/loki/pull/12392) **sandeepsukhani** Detect name of service emitting logs and add it as a label.
* [12398](https://github.com/grafana/loki/pull/12398) **kolesnikovae** LogQL: Introduces pattern match filter operators.

##### Fixes

2 changes: 1 addition & 1 deletion clients/pkg/logentry/stages/extensions.go
@@ -128,7 +128,7 @@ type CriConfig struct {
MaxPartialLineSizeTruncate bool `mapstructure:"max_partial_line_size_truncate"`
}

// validateDropConfig validates the DropConfig for the dropStage
// validateCriConfig validates the CriConfig for the cri stage
func validateCriConfig(cfg *CriConfig) error {
if cfg.MaxPartialLines == 0 {
cfg.MaxPartialLines = MaxPartialLinesSize
4 changes: 2 additions & 2 deletions clients/pkg/logentry/stages/limit_test.go
@@ -60,7 +60,7 @@ var testNonAppLogLine = `

var plName = "testPipeline"

// TestLimitPipeline is used to verify we properly parse the yaml config and create a working pipeline
// TestLimitWaitPipeline is used to verify we properly parse the yaml config and create a working pipeline
func TestLimitWaitPipeline(t *testing.T) {
registry := prometheus.NewRegistry()
pl, err := NewPipeline(util_log.Logger, loadConfig(testLimitWaitYaml), &plName, registry)
@@ -78,7 +78,7 @@ func TestLimitWaitPipeline(t *testing.T) {
assert.Equal(t, out[0].Line, testMatchLogLineApp1)
}

// TestLimitPipeline is used to verify we properly parse the yaml config and create a working pipeline
// TestLimitDropPipeline is used to verify we properly parse the yaml config and create a working pipeline
func TestLimitDropPipeline(t *testing.T) {
registry := prometheus.NewRegistry()
pl, err := NewPipeline(util_log.Logger, loadConfig(testLimitDropYaml), &plName, registry)
4 changes: 4 additions & 0 deletions cmd/loki/loki-local-with-memcached.yaml
@@ -16,6 +16,10 @@ common:
kvstore:
store: inmemory

limits_config:
split_instant_metric_queries_by_interval: '10m'


query_range:
align_queries_with_step: true
cache_index_stats_results: true
21 changes: 15 additions & 6 deletions docs/sources/configure/_index.md
@@ -2573,13 +2573,22 @@ The `chunk_store_config` block configures how chunks will be cached and how long
# The CLI flags prefix for this block configuration is: store.chunks-cache
[chunk_cache_config: <cache_config>]
# The cache block configures the cache backend.
# The CLI flags prefix for this block configuration is: store.chunks-cache-l2
[chunk_cache_config_l2: <cache_config>]
# Write dedupe cache is deprecated along with legacy index types (aws,
# aws-dynamo, bigtable, bigtable-hashed, cassandra, gcp, gcp-columnkey,
# grpc-store).
# Consider using TSDB index which does not require a write dedupe cache.
# The CLI flags prefix for this block configuration is: store.index-cache-write
[write_dedupe_cache_config: <cache_config>]
# Chunks will be handed off to the L2 cache after this duration. 0 to disable L2
# cache.
# CLI flag: -store.chunks-cache-l2.handoff
[l2_chunk_cache_handoff: <duration> | default = 0s]
# Cache index entries older than this period. 0 to disable.
# CLI flag: -store.cache-lookups-older-than
[cache_lookups_older_than: <duration> | default = 0s]
@@ -2647,9 +2656,9 @@ The `compactor` block configures the compactor component, which compacts index s
# CLI flag: -compactor.delete-request-cancel-period
[delete_request_cancel_period: <duration> | default = 24h]
# Constrain the size of any single delete request. When a delete request >
# delete_max_interval is input, the request is sharded into smaller requests of
# no more than delete_max_interval
# Constrain the size of any single delete request with line filters. When a
# delete request > delete_max_interval is input, the request is sharded into
# smaller requests of no more than delete_max_interval
# CLI flag: -compactor.delete-max-interval
[delete_max_interval: <duration> | default = 24h]
@@ -4660,6 +4669,7 @@ The cache block configures the cache backend. The supported CLI flags `<prefix>`
- `frontend.series-results-cache`
- `frontend.volume-results-cache`
- `store.chunks-cache`
- `store.chunks-cache-l2`
- `store.index-cache-read`
- `store.index-cache-write`

@@ -4706,9 +4716,8 @@ memcached_client:
# CLI flag: -<prefix>.memcached.service
[service: <string> | default = "memcached"]
# EXPERIMENTAL: Comma separated addresses list in DNS Service Discovery
# format:
# https://cortexmetrics.io/docs/configuration/arguments/#dns-service-discovery
# Comma separated addresses list in DNS Service Discovery format:
# https://grafana.com/docs/mimir/latest/configure/about-dns-service-discovery/#supported-discovery-modes
# CLI flag: -<prefix>.memcached.addresses
[addresses: <string> | default = ""]
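The new `store.chunks-cache-l2` block and the `l2_chunk_cache_handoff` setting documented above describe a two-tier chunk cache in which chunks older than the handoff duration are written to a second cache. The Go sketch below only illustrates that age-based routing idea; the `chunkCacheRouter` type and its method are hypothetical stand-ins written for this example, not Loki's implementation.

```go
package main

import (
	"fmt"
	"time"
)

// chunkCacheRouter is a hypothetical helper illustrating how an L2 handoff
// duration could route chunk writes between two cache tiers.
type chunkCacheRouter struct {
	l2Handoff time.Duration // 0 disables the L2 cache, as with l2_chunk_cache_handoff
}

// tierFor returns which cache tier a chunk ending at chunkEnd would go to:
// recent chunks stay in L1, chunks older than the handoff duration go to L2.
func (r chunkCacheRouter) tierFor(chunkEnd, now time.Time) string {
	if r.l2Handoff == 0 {
		return "l1" // L2 disabled
	}
	if now.Sub(chunkEnd) > r.l2Handoff {
		return "l2"
	}
	return "l1"
}

func main() {
	router := chunkCacheRouter{l2Handoff: 4 * time.Hour}
	now := time.Now()
	fmt.Println(router.tierFor(now.Add(-1*time.Hour), now)) // l1
	fmt.Println(router.tierFor(now.Add(-8*time.Hour), now)) // l2
}
```

Setting the handoff to zero keeps everything in the primary chunk cache, matching the documented default of `0s`.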
1 change: 1 addition & 0 deletions docs/sources/send-data/_index.md
@@ -59,6 +59,7 @@ These third-party clients also enable sending logs to Loki:
- [promtail-client](https://github.com/afiskon/promtail-client) (Go)
- [push-to-loki.py](https://github.com/sleleko/devops-kb/blob/master/python/push-to-loki.py) (Python 3)
- [python-logging-loki](https://pypi.org/project/python-logging-loki/) (Python 3)
- [nextlog](https://pypi.org/project/nextlog/) (Python 3)
- [Serilog-Sinks-Loki](https://github.com/JosephWoodward/Serilog-Sinks-Loki) (C#)
- [Vector Loki Sink](https://vector.dev/docs/reference/configuration/sinks/loki/)
- [winston-loki](https://github.com/JaniAnttonen/winston-loki) (JS)
2 changes: 1 addition & 1 deletion pkg/compactor/compactor.go
@@ -102,7 +102,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.DeleteRequestStoreKeyPrefix, "compactor.delete-request-store.key-prefix", "index/", "Path prefix for storing delete requests.")
f.IntVar(&cfg.DeleteBatchSize, "compactor.delete-batch-size", 70, "The max number of delete requests to run per compaction cycle.")
f.DurationVar(&cfg.DeleteRequestCancelPeriod, "compactor.delete-request-cancel-period", 24*time.Hour, "Allow cancellation of delete request until duration after they are created. Data would be deleted only after delete requests have been older than this duration. Ideally this should be set to at least 24h.")
f.DurationVar(&cfg.DeleteMaxInterval, "compactor.delete-max-interval", 24*time.Hour, "Constrain the size of any single delete request. When a delete request > delete_max_interval is input, the request is sharded into smaller requests of no more than delete_max_interval")
f.DurationVar(&cfg.DeleteMaxInterval, "compactor.delete-max-interval", 24*time.Hour, "Constrain the size of any single delete request with line filters. When a delete request > delete_max_interval is input, the request is sharded into smaller requests of no more than delete_max_interval")
f.DurationVar(&cfg.RetentionTableTimeout, "compactor.retention-table-timeout", 0, "The maximum amount of time to spend running retention and deletion on any given table in the index.")
f.IntVar(&cfg.MaxCompactionParallelism, "compactor.max-compaction-parallelism", 1, "Maximum number of tables to compact in parallel. While increasing this value, please make sure compactor has enough disk space allocated to be able to store and compact as many tables.")
f.IntVar(&cfg.UploadParallelism, "compactor.upload-parallelism", 10, "Number of upload/remove operations to execute in parallel when finalizing a compaction. NOTE: This setting is per compaction operation, which can be executed in parallel. The upper bound on the number of concurrent uploads is upload_parallelism * max_compaction_parallelism.")
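The updated help text for `-compactor.delete-max-interval` (and the matching `delete_max_interval` description in the configuration docs above) says that an oversized delete request is sharded into smaller requests of no more than that interval. Below is a minimal, self-contained Go sketch of that splitting idea; `shardInterval` and the `interval` type are hypothetical helpers written for illustration only and do not exist in Loki.

```go
package main

import (
	"fmt"
	"time"
)

// interval is a hypothetical [start, end) time range used only for this sketch.
type interval struct {
	start, end time.Time
}

// shardInterval splits one delete interval into consecutive sub-intervals of
// at most maxInterval each, mirroring the behaviour described for
// -compactor.delete-max-interval. Illustration only, not Loki's code.
func shardInterval(req interval, maxInterval time.Duration) []interval {
	if maxInterval <= 0 || req.end.Sub(req.start) <= maxInterval {
		return []interval{req}
	}
	var shards []interval
	for start := req.start; start.Before(req.end); start = start.Add(maxInterval) {
		end := start.Add(maxInterval)
		if end.After(req.end) {
			end = req.end
		}
		shards = append(shards, interval{start: start, end: end})
	}
	return shards
}

func main() {
	day := 24 * time.Hour
	req := interval{start: time.Now().Add(-3 * day), end: time.Now()}
	for _, s := range shardInterval(req, day) {
		fmt.Println(s.start.Format(time.RFC3339), "->", s.end.Format(time.RFC3339))
	}
}
```

Run against a three-day request with the default 24h limit, this produces three consecutive one-day shards.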
80 changes: 59 additions & 21 deletions pkg/compactor/deletion/delete_requests_manager.go
@@ -126,10 +126,25 @@ func (d *DeleteRequestsManager) loadDeleteRequestsToProcess() error {
return err
}

reqCount := 0
for i := range deleteRequests {
deleteRequest := deleteRequests[i]
if i >= d.batchSize {
logBatchTruncation(i, len(deleteRequests))
maxRetentionInterval := getMaxRetentionInterval(deleteRequest.UserID, d.limits)
// retention interval 0 means retain the data forever
if maxRetentionInterval != 0 {
oldestRetainedLogTimestamp := model.Now().Add(-maxRetentionInterval)
if deleteRequest.StartTime.Before(oldestRetainedLogTimestamp) && deleteRequest.EndTime.Before(oldestRetainedLogTimestamp) {
level.Info(util_log.Logger).Log(
"msg", "Marking delete request with interval beyond retention period as processed",
"delete_request_id", deleteRequest.RequestID,
"user", deleteRequest.UserID,
)
d.markRequestAsProcessed(deleteRequest)
continue
}
}
if reqCount >= d.batchSize {
logBatchTruncation(reqCount, len(deleteRequests))
break
}

@@ -149,6 +164,7 @@ func (d *DeleteRequestsManager) loadDeleteRequestsToProcess() error {
if deleteRequest.EndTime > ur.requestsInterval.End {
ur.requestsInterval.End = deleteRequest.EndTime
}
reqCount++
}

return nil
@@ -305,6 +321,28 @@ func (d *DeleteRequestsManager) MarkPhaseTimedOut() {
d.deleteRequestsToProcess = map[string]*userDeleteRequests{}
}

func (d *DeleteRequestsManager) markRequestAsProcessed(deleteRequest DeleteRequest) {
if err := d.deleteRequestsStore.UpdateStatus(context.Background(), deleteRequest, StatusProcessed); err != nil {
level.Error(util_log.Logger).Log(
"msg", "failed to mark delete request for user as processed",
"delete_request_id", deleteRequest.RequestID,
"sequence_num", deleteRequest.SequenceNum,
"user", deleteRequest.UserID,
"err", err,
"deleted_lines", deleteRequest.DeletedLines,
)
} else {
level.Info(util_log.Logger).Log(
"msg", "delete request for user marked as processed",
"delete_request_id", deleteRequest.RequestID,
"sequence_num", deleteRequest.SequenceNum,
"user", deleteRequest.UserID,
"deleted_lines", deleteRequest.DeletedLines,
)
d.metrics.deleteRequestsProcessedTotal.WithLabelValues(deleteRequest.UserID).Inc()
}
}

func (d *DeleteRequestsManager) MarkPhaseFinished() {
d.deleteRequestsToProcessMtx.Lock()
defer d.deleteRequestsToProcessMtx.Unlock()
@@ -315,25 +353,7 @@ func (d *DeleteRequestsManager) MarkPhaseFinished() {
}

for _, deleteRequest := range userDeleteRequests.requests {
if err := d.deleteRequestsStore.UpdateStatus(context.Background(), *deleteRequest, StatusProcessed); err != nil {
level.Error(util_log.Logger).Log(
"msg", "failed to mark delete request for user as processed",
"delete_request_id", deleteRequest.RequestID,
"sequence_num", deleteRequest.SequenceNum,
"user", deleteRequest.UserID,
"err", err,
"deleted_lines", deleteRequest.DeletedLines,
)
} else {
level.Info(util_log.Logger).Log(
"msg", "delete request for user marked as processed",
"delete_request_id", deleteRequest.RequestID,
"sequence_num", deleteRequest.SequenceNum,
"user", deleteRequest.UserID,
"deleted_lines", deleteRequest.DeletedLines,
)
}
d.metrics.deleteRequestsProcessedTotal.WithLabelValues(deleteRequest.UserID).Inc()
d.markRequestAsProcessed(*deleteRequest)
}
}
}
@@ -355,3 +375,21 @@ func (d *DeleteRequestsManager) IntervalMayHaveExpiredChunks(_ model.Interval, u
func (d *DeleteRequestsManager) DropFromIndex(_ retention.ChunkEntry, _ model.Time, _ model.Time) bool {
return false
}

func getMaxRetentionInterval(userID string, limits Limits) time.Duration {
maxRetention := model.Duration(limits.RetentionPeriod(userID))
if maxRetention == 0 {
return 0
}

for _, streamRetention := range limits.StreamRetention(userID) {
if streamRetention.Period == 0 {
return 0
}
if streamRetention.Period > maxRetention {
maxRetention = streamRetention.Period
}
}

return time.Duration(maxRetention)
}
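To summarise the behaviour of the `getMaxRetentionInterval` helper added above: the longest of the global and per-stream retention periods wins, and a zero anywhere (global or per-stream) means data is retained forever, so no maximum can be assumed. The snippet below restates that logic with simplified stand-in types (plain `time.Duration` instead of `model.Duration` and the `Limits` interface) purely as a reading aid.

```go
package main

import (
	"fmt"
	"time"
)

// streamRetention mirrors, for this sketch only, the shape used above:
// a per-stream retention period where 0 means retain forever.
type streamRetention struct {
	period time.Duration
}

// maxRetentionInterval restates getMaxRetentionInterval in a self-contained
// form: return the longest applicable retention, or 0 if any retention
// (global or per-stream) is unlimited.
func maxRetentionInterval(global time.Duration, streams []streamRetention) time.Duration {
	if global == 0 {
		return 0
	}
	maxPeriod := global
	for _, s := range streams {
		if s.period == 0 {
			return 0
		}
		if s.period > maxPeriod {
			maxPeriod = s.period
		}
	}
	return maxPeriod
}

func main() {
	week, month := 7*24*time.Hour, 31*24*time.Hour
	fmt.Println(maxRetentionInterval(0, nil))                                   // 0: retain forever
	fmt.Println(maxRetentionInterval(week, []streamRetention{{period: 0}}))     // 0: one stream retains forever
	fmt.Println(maxRetentionInterval(week, []streamRetention{{period: month}})) // 744h0m0s: longest wins
}
```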
