Skip to content

Commit

Permalink
feat: add log patterns ingestion and query API. (#12403)
Browse files Browse the repository at this point in the history
Signed-off-by: Edward Welch <[email protected]>
Co-authored-by: Anton Kolesnikov <[email protected]>
Co-authored-by: Sven Grossmann <[email protected]>
Co-authored-by: Edward Welch <[email protected]>
  • Loading branch information
4 people authored Apr 4, 2024
1 parent 42ab9c2 commit 491d251
Show file tree
Hide file tree
Showing 69 changed files with 7,098 additions and 253 deletions.
201 changes: 139 additions & 62 deletions CHANGELOG.md

Large diffs are not rendered by default.

182 changes: 182 additions & 0 deletions docs/sources/configure/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,185 @@ Pass the `-config.expand-env` flag at the command line to enable this way of set
# itself to a key value store.
[ingester: <ingester>]

pattern_ingester:
# Whether the pattern ingester is enabled.
# CLI flag: -pattern-ingester.enabled
[enabled: <boolean> | default = false]

# Configures how the lifecycle of the pattern ingester will operate and where
# it will register for discovery.
lifecycler:
ring:
kvstore:
# Backend storage to use for the ring. Supported values are: consul,
# etcd, inmemory, memberlist, multi.
# CLI flag: -pattern-ingester.store
[store: <string> | default = "consul"]

# The prefix for the keys in the store. Should end with a /.
# CLI flag: -pattern-ingester.prefix
[prefix: <string> | default = "collectors/"]

# Configuration for a Consul client. Only applies if the selected
# kvstore is consul.
# The CLI flags prefix for this block configuration is: pattern-ingester
[consul: <consul>]

# Configuration for an ETCD v3 client. Only applies if the selected
# kvstore is etcd.
# The CLI flags prefix for this block configuration is: pattern-ingester
[etcd: <etcd>]

multi:
# Primary backend storage used by multi-client.
# CLI flag: -pattern-ingester.multi.primary
[primary: <string> | default = ""]

# Secondary backend storage used by multi-client.
# CLI flag: -pattern-ingester.multi.secondary
[secondary: <string> | default = ""]

# Mirror writes to secondary store.
# CLI flag: -pattern-ingester.multi.mirror-enabled
[mirror_enabled: <boolean> | default = false]

# Timeout for storing value to secondary store.
# CLI flag: -pattern-ingester.multi.mirror-timeout
[mirror_timeout: <duration> | default = 2s]

# The heartbeat timeout after which ingesters are skipped for
# reads/writes. 0 = never (timeout disabled).
# CLI flag: -pattern-ingester.ring.heartbeat-timeout
[heartbeat_timeout: <duration> | default = 1m]

# The number of ingesters to write to and read from.
# CLI flag: -pattern-ingester.distributor.replication-factor
[replication_factor: <int> | default = 1]

# True to enable the zone-awareness and replicate ingested samples across
# different availability zones.
# CLI flag: -pattern-ingester.distributor.zone-awareness-enabled
[zone_awareness_enabled: <boolean> | default = false]

# Comma-separated list of zones to exclude from the ring. Instances in
# excluded zones will be filtered out from the ring.
# CLI flag: -pattern-ingester.distributor.excluded-zones
[excluded_zones: <string> | default = ""]

# Number of tokens for each ingester.
# CLI flag: -pattern-ingester.num-tokens
[num_tokens: <int> | default = 128]

# Period at which to heartbeat to consul. 0 = disabled.
# CLI flag: -pattern-ingester.heartbeat-period
[heartbeat_period: <duration> | default = 5s]

# Heartbeat timeout after which instance is assumed to be unhealthy. 0 =
# disabled.
# CLI flag: -pattern-ingester.heartbeat-timeout
[heartbeat_timeout: <duration> | default = 1m]

# Observe tokens after generating to resolve collisions. Useful when using
# gossiping ring.
# CLI flag: -pattern-ingester.observe-period
[observe_period: <duration> | default = 0s]

# Period to wait for a claim from another member; will join automatically
# after this.
# CLI flag: -pattern-ingester.join-after
[join_after: <duration> | default = 0s]

# Minimum duration to wait after the internal readiness checks have passed
# but before succeeding the readiness endpoint. This is used to slow down
# deployment controllers (e.g. Kubernetes) after an instance is ready and
# before they proceed with a rolling update, to give the rest of the cluster
# instances enough time to receive ring updates.
# CLI flag: -pattern-ingester.min-ready-duration
[min_ready_duration: <duration> | default = 15s]

# Name of network interface to read address from.
# CLI flag: -pattern-ingester.lifecycler.interface
[interface_names: <list of strings> | default = [<private network interfaces>]]

# Enable IPv6 support. Required to make use of IP addresses from IPv6
# interfaces.
# CLI flag: -pattern-ingester.enable-inet6
[enable_inet6: <boolean> | default = false]

# Duration to sleep for before exiting, to ensure metrics are scraped.
# CLI flag: -pattern-ingester.final-sleep
[final_sleep: <duration> | default = 0s]

# File path where tokens are stored. If empty, tokens are not stored at
# shutdown and restored at startup.
# CLI flag: -pattern-ingester.tokens-file-path
[tokens_file_path: <string> | default = ""]

# The availability zone where this instance is running.
# CLI flag: -pattern-ingester.availability-zone
[availability_zone: <string> | default = ""]

# Unregister from the ring upon clean shutdown. It can be useful to disable
# for rolling restarts with consistent naming in conjunction with
# -distributor.extend-writes=false.
# CLI flag: -pattern-ingester.unregister-on-shutdown
[unregister_on_shutdown: <boolean> | default = true]

# When enabled the readiness probe succeeds only after all instances are
# ACTIVE and healthy in the ring, otherwise only the instance itself is
# checked. This option should be disabled if in your cluster multiple
# instances can be rolled out simultaneously, otherwise rolling updates may
# be slowed down.
# CLI flag: -pattern-ingester.readiness-check-ring-health
[readiness_check_ring_health: <boolean> | default = true]

# IP address to advertise in the ring.
# CLI flag: -pattern-ingester.lifecycler.addr
[address: <string> | default = ""]

# Port to advertise in consul (defaults to server.grpc-listen-port).
# CLI flag: -pattern-ingester.lifecycler.port
[port: <int> | default = 0]

# ID to register in the ring.
# CLI flag: -pattern-ingester.lifecycler.ID
[id: <string> | default = "<hostname>"]

# Configures how the pattern ingester will connect to the ingesters.
client_config:
# Configures how connections are pooled.
pool_config:
# How frequently to clean up clients for ingesters that have gone away.
# CLI flag: -pattern-ingester.client-cleanup-period
[client_cleanup_period: <duration> | default = 15s]

# Run a health check on each ingester client during periodic cleanup.
# CLI flag: -pattern-ingester.health-check-ingesters
[health_check_ingesters: <boolean> | default = true]

# Timeout for the health check.
# CLI flag: -pattern-ingester.remote-timeout
[remote_timeout: <duration> | default = 1s]

# The remote request timeout on the client side.
# CLI flag: -pattern-ingester.client.timeout
[remote_timeout: <duration> | default = 5s]

# Configures how the gRPC connection to ingesters works as a client.
# The CLI flags prefix for this block configuration is:
# pattern-ingester.client
[grpc_client_config: <grpc_client>]

# How many flushes can happen concurrently from each stream.
# CLI flag: -pattern-ingester.concurrent-flushes
[concurrent_flushes: <int> | default = 32]

# How often should the ingester see if there are any blocks to flush. The
# first flush check is delayed by a random time up to 0.8x the flush check
# period. Additionally, there is +/- 1% jitter added to the interval.
# CLI flag: -pattern-ingester.flush-check-period
[flush_check_period: <duration> | default = 30s]

# The index_gateway block configures the Loki index gateway server, responsible
# for serving index queries without the need to constantly interact with the
# object store.
Expand Down Expand Up @@ -3963,6 +4142,7 @@ Configuration for a Consul client. Only applies if the selected kvstore is `cons
- `compactor.ring`
- `distributor.ring`
- `index-gateway.ring`
- `pattern-ingester`
- `query-scheduler.ring`
- `ruler.ring`

Expand Down Expand Up @@ -4009,6 +4189,7 @@ Configuration for an ETCD v3 client. Only applies if the selected kvstore is `et
- `compactor.ring`
- `distributor.ring`
- `index-gateway.ring`
- `pattern-ingester`
- `query-scheduler.ring`
- `ruler.ring`

Expand Down Expand Up @@ -4310,6 +4491,7 @@ The `grpc_client` block configures the gRPC client used to communicate between t
- `boltdb.shipper.index-gateway-client.grpc`
- `frontend.grpc-client-config`
- `ingester.client`
- `pattern-ingester.client`
- `querier.frontend-client`
- `query-scheduler.grpc-client-config`
- `ruler.client`
Expand Down
100 changes: 100 additions & 0 deletions docs/sources/reference/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ These HTTP endpoints are exposed by the `querier`, `query-frontend`, `read`, and
- [`GET /loki/api/v1/index/stats`](#query-log-statistics)
- [`GET /loki/api/v1/index/volume`](#query-log-volume)
- [`GET /loki/api/v1/index/volume_range`](#query-log-volume)
- [`GET /loki/api/v1/patterns`](#patterns-detection)
- [`GET /loki/api/v1/tail`](#stream-logs)

### Status endpoints
Expand Down Expand Up @@ -849,6 +850,105 @@ URL query parameters:

You can URL-encode these parameters directly in the request body by using the POST method and `Content-Type: application/x-www-form-urlencoded` header. This is useful when specifying a large or dynamic number of stream selectors that may breach server-side URL character limits.

## Patterns detection

```bash
GET /loki/api/v1/patterns
```

{{< admonition type="note" >}}
You must configure

```yaml
pattern_ingester:
enabled: true
```
to enable this feature.
{{< /admonition >}}
The `/loki/api/v1/patterns` endpoint can be used to query Loki for patterns detected in the logs. This helps you understand the structure of the logs Loki has ingested.

The `query` should be a valid LogQL stream selector, for example `{job="foo", env=~".+"}`. The result is aggregated by the `pattern` from all matching streams.

For each pattern detected, the response includes the pattern itself and the number of samples for each pattern at each timestamp.

For example, if you have the following logs:

```log
ts=2024-03-30T23:03:40 caller=grpc_logging.go:66 level=info method=/cortex.Ingester/Push duration=200ms msg=gRPC
ts=2024-03-30T23:03:41 caller=grpc_logging.go:66 level=info method=/cortex.Ingester/Push duration=500ms msg=gRPC
```

The pattern detected would be:

```log
ts=<_> caller=grpc_logging.go:66 level=info method=/cortex.Ingester/Push duration=<_> msg=gRPC
```

URL query parameters:

- `query`: The [LogQL]({{< relref "../query" >}}) matchers to check (for example, `{job="foo", env=~".+"}`). This parameter is required.
- `start=<nanosecond Unix epoch>`: Start timestamp. This parameter is required.
- `end=<nanosecond Unix epoch>`: End timestamp. This parameter is required.

### Examples

This example cURL command

```bash
curl -s "http://localhost:3100/loki/api/v1/patterns" \
--data-urlencode 'query={app="loki"}' | jq
```

gave this response:

```json
{
"status": "success",
"data": [
{
"pattern": "<_> caller=grpc_logging.go:66 <_> level=error method=/cortex.Ingester/Push <_> msg=gRPC err=\"connection refused to object store\"",
"samples": [
[
1711839260,
1
],
[
1711839270,
2
],
[
1711839280,
1
]
]
},
{
"pattern": "<_> caller=grpc_logging.go:66 <_> level=info method=/cortex.Ingester/Push <_> msg=gRPC",
"samples": [
[
1711839260,
105
],
[
1711839270,
222
],
[
1711839280,
196
]
]
}
]
}
```

The result is a list of patterns detected in the logs, with the number of samples for each pattern at each timestamp.
The pattern format is the same as the [LogQL]({{< relref "../query" >}}) pattern filter and parser and can be used in queries for filtering matching logs.
Each sample is a tuple of a Unix timestamp (in seconds) and a count.

## Stream logs

```bash
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ require (
github.com/gogo/googleapis v1.4.0
github.com/grafana/jsonparser v0.0.0-20240209175146-098958973a2d
github.com/grafana/loki/pkg/push v0.0.0-20231124142027-e52380921608
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/heroku/x v0.0.61
github.com/influxdata/tdigest v0.0.2-0.20210216194612-fc98d27c9e8b
github.com/prometheus/alertmanager v0.27.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1129,6 +1129,8 @@ github.com/hashicorp/golang-lru v0.5.3/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uG
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/golang-lru v0.6.0 h1:uL2shRDx7RTrOrTCUZEGP/wJUFiUI8QT6E7z5o8jga4=
github.com/hashicorp/golang-lru v0.6.0/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/hashicorp/hcp-scada-provider v0.2.0/go.mod h1:Q0WpS2RyhBKOPD4X/8oW7AJe7jA2HXB09EwDzwRTao0=
github.com/hashicorp/hcp-sdk-go v0.23.0/go.mod h1:/9UoDY2FYYA8lFaKBb2HmM/jKYZGANmf65q9QRc/cVw=
Expand Down
7 changes: 3 additions & 4 deletions pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ type instance struct {
streams *streamsMap

index *index.Multi
mapper *fpMapper // using of mapper no longer needs mutex because reading from streams is lock-free
mapper *FpMapper // using of mapper no longer needs mutex because reading from streams is lock-free

instanceID string

Expand Down Expand Up @@ -175,7 +175,7 @@ func newInstance(
writeFailures: writeFailures,
schemaconfig: &c,
}
i.mapper = newFPMapper(i.getLabelsFromFingerprint)
i.mapper = NewFPMapper(i.getLabelsFromFingerprint)
return i, err
}

Expand Down Expand Up @@ -383,7 +383,6 @@ func (i *instance) chunkFormatAt(at model.Time) (byte, chunkenc.HeadBlockFmt, er
}

return chunkFormat, headblock, nil

}

// getOrCreateStream returns the stream or creates it.
Expand Down Expand Up @@ -411,7 +410,7 @@ func (i *instance) removeStream(s *stream) {
func (i *instance) getHashForLabels(ls labels.Labels) model.Fingerprint {
var fp uint64
fp, i.buf = ls.HashWithoutLabels(i.buf, []string(nil)...)
return i.mapper.mapFP(model.Fingerprint(fp), ls)
return i.mapper.MapFP(model.Fingerprint(fp), ls)
}

// Return labels associated with given fingerprint. Used by fingerprint mapper.
Expand Down
Loading

0 comments on commit 491d251

Please sign in to comment.