chore: Squashed commit of the following:
commit 018856c
Author: Callum Styan <[email protected]>
Date:   Mon Apr 1 06:40:16 2024 -0700

    fix: fix span logging based on changes to request types timestamps (#12393)

    Signed-off-by: Callum Styan <[email protected]>

commit 5190dda
Author: Shantanu Alshi <[email protected]>
Date:   Mon Apr 1 18:30:21 2024 +0530

    feat(detected_labels): Initial skeleton for the API (#12390)

    Co-authored-by: Cyril Tovena <[email protected]>

commit 0b7ff48
Author: Sandeep Sukhani <[email protected]>
Date:   Mon Apr 1 14:21:50 2024 +0530

    chore: delete request processing improvements (#12259)

commit a509871
Author: Ed Welch <[email protected]>
Date:   Sun Mar 31 22:14:21 2024 -0400

    chore: remove experimental flags for l2 cache and memcached "addresses" config (#12410)

commit 7480468
Author: Kaviraj Kanagaraj <[email protected]>
Date:   Sun Mar 31 18:00:53 2024 +0200

    fix: (Bug) correct resultType when storing instant query results in cache (#12312)

    Signed-off-by: Kaviraj <[email protected]>

commit 246623f
Author: Trevor Whitney <[email protected]>
Date:   Fri Mar 29 17:05:36 2024 -0600

    fix(detected_fields): fix issues with frontend integration (#12406)

    This PR fixes issues we found when integrating with the frontend:
    * the `/experimental` API prefix made it difficult to interact with the endpoint using the existing datasource, so it was moved to `v1/detected_fields`
    * the config flag was considered cumbersome, since the only potential negative impact of the endpoint is when it is used, and nothing currently uses it
    * the use of an enum in the protobuf produced unexpected results in the JSON, so the type was converted to a string (see the sketch below)
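On the last point: protobuf-generated Go structs represent enum fields as int32 values, so a plain encoding/json round trip renders them as bare numbers rather than names. A minimal sketch of the difference, with hypothetical type and field names (not Loki's actual message definitions):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// FieldType mimics a protobuf enum: generated Go code represents
// enums as int32, so encoding/json emits the numeric value.
type FieldType int32

const (
	FieldTypeString FieldType = 0
	FieldTypeInt    FieldType = 1
)

// enumField resembles a message with an enum `type` field.
type enumField struct {
	Label string    `json:"label"`
	Type  FieldType `json:"type"`
}

// stringField resembles the same message after switching to a plain string.
type stringField struct {
	Label string `json:"label"`
	Type  string `json:"type"`
}

func main() {
	a, _ := json.Marshal(enumField{Label: "duration", Type: FieldTypeInt})
	fmt.Println(string(a)) // {"label":"duration","type":1} <- opaque number

	b, _ := json.Marshal(stringField{Label: "duration", Type: "int"})
	fmt.Println(string(b)) // {"label":"duration","type":"int"} <- readable
}
```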
trevorwhitney committed Apr 1, 2024
1 parent d4a57c4 commit 94eb466
Showing 44 changed files with 3,316 additions and 456 deletions.
4 changes: 4 additions & 0 deletions cmd/loki/loki-local-with-memcached.yaml
@@ -16,6 +16,10 @@ common:
     kvstore:
       store: inmemory
 
+limits_config:
+  split_instant_metric_queries_by_interval: '10m'
+
+
 query_range:
   align_queries_with_step: true
   cache_index_stats_results: true
25 changes: 15 additions & 10 deletions docs/sources/configure/_index.md
@@ -817,10 +817,6 @@ The `frontend` block configures the Loki query-frontend.
 # The TLS configuration.
 [tail_tls_config: <tls_config>]
-
-# Whether to enable experimental APIs in the frontend.
-# CLI flag: -frontend.experimental-apis-enabled
-[experimental_apis_enabled: <boolean> | default = false]
 ```
 
 ### query_range
@@ -2408,13 +2404,22 @@ The `chunk_store_config` block configures how chunks will be cached and how long
 # The CLI flags prefix for this block configuration is: store.chunks-cache
 [chunk_cache_config: <cache_config>]
 
+# The cache block configures the cache backend.
+# The CLI flags prefix for this block configuration is: store.chunks-cache-l2
+[chunk_cache_config_l2: <cache_config>]
+
 # Write dedupe cache is deprecated along with legacy index types (aws,
 # aws-dynamo, bigtable, bigtable-hashed, cassandra, gcp, gcp-columnkey,
 # grpc-store).
 # Consider using TSDB index which does not require a write dedupe cache.
 # The CLI flags prefix for this block configuration is: store.index-cache-write
 [write_dedupe_cache_config: <cache_config>]
 
+# Chunks will be handed off to the L2 cache after this duration. 0 to disable L2
+# cache.
+# CLI flag: -store.chunks-cache-l2.handoff
+[l2_chunk_cache_handoff: <duration> | default = 0s]
+
 # Cache index entries older than this period. 0 to disable.
 # CLI flag: -store.cache-lookups-older-than
 [cache_lookups_older_than: <duration> | default = 0s]
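Putting the two new options together, a sketch of a two-tier chunks cache might look like the following; the memcached addresses are placeholders and 4h is an arbitrary handoff choice, not a recommendation:

```yaml
chunk_store_config:
  chunk_cache_config:
    memcached_client:
      addresses: dns+memcached-l1.loki.svc:11211   # placeholder address
  chunk_cache_config_l2:
    memcached_client:
      addresses: dns+memcached-l2.loki.svc:11211   # placeholder address
  # Chunks older than the handoff duration are served from the L2 cache.
  l2_chunk_cache_handoff: 4h
```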
@@ -2482,9 +2487,9 @@ The `compactor` block configures the compactor component, which compacts index s
 # CLI flag: -compactor.delete-request-cancel-period
 [delete_request_cancel_period: <duration> | default = 24h]
 
-# Constrain the size of any single delete request. When a delete request >
-# delete_max_interval is input, the request is sharded into smaller requests of
-# no more than delete_max_interval
+# Constrain the size of any single delete request with line filters. When a
+# delete request > delete_max_interval is input, the request is sharded into
+# smaller requests of no more than delete_max_interval
 # CLI flag: -compactor.delete-max-interval
 [delete_max_interval: <duration> | default = 24h]
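For intuition: with `delete_max_interval: 24h`, a 72h delete request (with line filters) is sharded into three 24h sub-requests. A hypothetical sketch of that splitting, not the compactor's actual implementation:

```go
package main

import (
	"fmt"
	"time"
)

// shardInterval splits [start, end) into sub-intervals of at most max,
// mirroring the sharding described above, purely for illustration.
func shardInterval(start, end time.Time, max time.Duration) [][2]time.Time {
	var shards [][2]time.Time
	for s := start; s.Before(end); {
		e := s.Add(max)
		if e.After(end) {
			e = end
		}
		shards = append(shards, [2]time.Time{s, e})
		s = e
	}
	return shards
}

func main() {
	start := time.Date(2024, 4, 1, 0, 0, 0, 0, time.UTC)
	shards := shardInterval(start, start.Add(72*time.Hour), 24*time.Hour)
	fmt.Println(len(shards)) // 3
}
```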
@@ -4492,6 +4497,7 @@ The cache block configures the cache backend. The supported CLI flags `<prefix>`
 - `frontend.series-results-cache`
 - `frontend.volume-results-cache`
 - `store.chunks-cache`
+- `store.chunks-cache-l2`
 - `store.index-cache-read`
 - `store.index-cache-write`
 
@@ -4538,9 +4544,8 @@ memcached_client:
 # CLI flag: -<prefix>.memcached.service
 [service: <string> | default = "memcached"]
 
-# EXPERIMENTAL: Comma separated addresses list in DNS Service Discovery
-# format:
-# https://cortexmetrics.io/docs/configuration/arguments/#dns-service-discovery
+# Comma separated addresses list in DNS Service Discovery format:
+# https://grafana.com/docs/mimir/latest/configure/about-dns-service-discovery/#supported-discovery-modes
 # CLI flag: -<prefix>.memcached.addresses
 [addresses: <string> | default = ""]
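The linked page describes the dskit-style discovery prefixes (`dns+`, `dnssrv+`, `dnssrvnoa+`). As a sketch only, assuming a Kubernetes headless service named `memcached` in namespace `loki` with a port named `memcached-client` (all placeholders):

```yaml
memcached_client:
  # dnssrvnoa+ resolves SRV records without a follow-up address lookup;
  # the service, port name, and namespace below are hypothetical.
  addresses: dnssrvnoa+_memcached-client._tcp.memcached.loki.svc.cluster.local
```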
2 changes: 1 addition & 1 deletion pkg/compactor/compactor.go
@@ -102,7 +102,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.StringVar(&cfg.DeleteRequestStoreKeyPrefix, "compactor.delete-request-store.key-prefix", "index/", "Path prefix for storing delete requests.")
 	f.IntVar(&cfg.DeleteBatchSize, "compactor.delete-batch-size", 70, "The max number of delete requests to run per compaction cycle.")
 	f.DurationVar(&cfg.DeleteRequestCancelPeriod, "compactor.delete-request-cancel-period", 24*time.Hour, "Allow cancellation of delete request until duration after they are created. Data would be deleted only after delete requests have been older than this duration. Ideally this should be set to at least 24h.")
-	f.DurationVar(&cfg.DeleteMaxInterval, "compactor.delete-max-interval", 24*time.Hour, "Constrain the size of any single delete request. When a delete request > delete_max_interval is input, the request is sharded into smaller requests of no more than delete_max_interval")
+	f.DurationVar(&cfg.DeleteMaxInterval, "compactor.delete-max-interval", 24*time.Hour, "Constrain the size of any single delete request with line filters. When a delete request > delete_max_interval is input, the request is sharded into smaller requests of no more than delete_max_interval")
 	f.DurationVar(&cfg.RetentionTableTimeout, "compactor.retention-table-timeout", 0, "The maximum amount of time to spend running retention and deletion on any given table in the index.")
 	f.IntVar(&cfg.MaxCompactionParallelism, "compactor.max-compaction-parallelism", 1, "Maximum number of tables to compact in parallel. While increasing this value, please make sure compactor has enough disk space allocated to be able to store and compact as many tables.")
 	f.IntVar(&cfg.UploadParallelism, "compactor.upload-parallelism", 10, "Number of upload/remove operations to execute in parallel when finalizing a compaction. NOTE: This setting is per compaction operation, which can be executed in parallel. The upper bound on the number of concurrent uploads is upload_parallelism * max_compaction_parallelism.")
80 changes: 59 additions & 21 deletions pkg/compactor/deletion/delete_requests_manager.go
@@ -126,10 +126,25 @@ func (d *DeleteRequestsManager) loadDeleteRequestsToProcess() error {
 		return err
 	}
 
+	reqCount := 0
 	for i := range deleteRequests {
 		deleteRequest := deleteRequests[i]
-		if i >= d.batchSize {
-			logBatchTruncation(i, len(deleteRequests))
+		maxRetentionInterval := getMaxRetentionInterval(deleteRequest.UserID, d.limits)
+		// retention interval 0 means retain the data forever
+		if maxRetentionInterval != 0 {
+			oldestRetainedLogTimestamp := model.Now().Add(-maxRetentionInterval)
+			if deleteRequest.StartTime.Before(oldestRetainedLogTimestamp) && deleteRequest.EndTime.Before(oldestRetainedLogTimestamp) {
+				level.Info(util_log.Logger).Log(
+					"msg", "Marking delete request with interval beyond retention period as processed",
+					"delete_request_id", deleteRequest.RequestID,
+					"user", deleteRequest.UserID,
+				)
+				d.markRequestAsProcessed(deleteRequest)
+				continue
+			}
+		}
+		if reqCount >= d.batchSize {
+			logBatchTruncation(reqCount, len(deleteRequests))
 			break
 		}

@@ -149,6 +164,7 @@ func (d *DeleteRequestsManager) loadDeleteRequestsToProcess() error {
 		if deleteRequest.EndTime > ur.requestsInterval.End {
 			ur.requestsInterval.End = deleteRequest.EndTime
 		}
+		reqCount++
 	}
 
 	return nil
@@ -305,6 +321,28 @@ func (d *DeleteRequestsManager) MarkPhaseTimedOut() {
 	d.deleteRequestsToProcess = map[string]*userDeleteRequests{}
 }
 
+func (d *DeleteRequestsManager) markRequestAsProcessed(deleteRequest DeleteRequest) {
+	if err := d.deleteRequestsStore.UpdateStatus(context.Background(), deleteRequest, StatusProcessed); err != nil {
+		level.Error(util_log.Logger).Log(
+			"msg", "failed to mark delete request for user as processed",
+			"delete_request_id", deleteRequest.RequestID,
+			"sequence_num", deleteRequest.SequenceNum,
+			"user", deleteRequest.UserID,
+			"err", err,
+			"deleted_lines", deleteRequest.DeletedLines,
+		)
+	} else {
+		level.Info(util_log.Logger).Log(
+			"msg", "delete request for user marked as processed",
+			"delete_request_id", deleteRequest.RequestID,
+			"sequence_num", deleteRequest.SequenceNum,
+			"user", deleteRequest.UserID,
+			"deleted_lines", deleteRequest.DeletedLines,
+		)
+		d.metrics.deleteRequestsProcessedTotal.WithLabelValues(deleteRequest.UserID).Inc()
+	}
+}
+
 func (d *DeleteRequestsManager) MarkPhaseFinished() {
 	d.deleteRequestsToProcessMtx.Lock()
 	defer d.deleteRequestsToProcessMtx.Unlock()
@@ -315,25 +353,7 @@ func (d *DeleteRequestsManager) MarkPhaseFinished() {
 		}
 
 		for _, deleteRequest := range userDeleteRequests.requests {
-			if err := d.deleteRequestsStore.UpdateStatus(context.Background(), *deleteRequest, StatusProcessed); err != nil {
-				level.Error(util_log.Logger).Log(
-					"msg", "failed to mark delete request for user as processed",
-					"delete_request_id", deleteRequest.RequestID,
-					"sequence_num", deleteRequest.SequenceNum,
-					"user", deleteRequest.UserID,
-					"err", err,
-					"deleted_lines", deleteRequest.DeletedLines,
-				)
-			} else {
-				level.Info(util_log.Logger).Log(
-					"msg", "delete request for user marked as processed",
-					"delete_request_id", deleteRequest.RequestID,
-					"sequence_num", deleteRequest.SequenceNum,
-					"user", deleteRequest.UserID,
-					"deleted_lines", deleteRequest.DeletedLines,
-				)
-			}
-			d.metrics.deleteRequestsProcessedTotal.WithLabelValues(deleteRequest.UserID).Inc()
+			d.markRequestAsProcessed(*deleteRequest)
 		}
 	}
 }
@@ -355,3 +375,21 @@ func (d *DeleteRequestsManager) IntervalMayHaveExpiredChunks(_ model.Interval, u
 func (d *DeleteRequestsManager) DropFromIndex(_ retention.ChunkEntry, _ model.Time, _ model.Time) bool {
 	return false
 }
+
+func getMaxRetentionInterval(userID string, limits Limits) time.Duration {
+	maxRetention := model.Duration(limits.RetentionPeriod(userID))
+	if maxRetention == 0 {
+		return 0
+	}
+
+	for _, streamRetention := range limits.StreamRetention(userID) {
+		if streamRetention.Period == 0 {
+			return 0
+		}
+		if streamRetention.Period > maxRetention {
+			maxRetention = streamRetention.Period
+		}
+	}
+
+	return time.Duration(maxRetention)
+}
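As a quick worked check of the rule above (a standalone sketch with hypothetical values, not tied to Loki's `Limits` interface): a zero period at either level means "retain forever", otherwise the longest configured period wins.

```go
package main

import (
	"fmt"
	"time"
)

// maxRetention mirrors getMaxRetentionInterval's rule: zero anywhere
// means "retain forever" (return 0), otherwise return the longest period.
func maxRetention(global time.Duration, perStream ...time.Duration) time.Duration {
	if global == 0 {
		return 0
	}
	max := global
	for _, p := range perStream {
		if p == 0 {
			return 0
		}
		if p > max {
			max = p
		}
	}
	return max
}

func main() {
	day := 24 * time.Hour
	fmt.Println(maxRetention(30*day, 7*day, 90*day)) // 2160h0m0s (90 days wins)
	fmt.Println(maxRetention(30*day, 0))             // 0s (a stream retains forever)
}
```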