Commit

Merge branch 'main' into fluentbit-docs-changes

JStickler authored Oct 8, 2024
2 parents 2191e6c + 8ec8cfb commit 137b832
Showing 9 changed files with 66 additions and 27 deletions.
clients/cmd/fluentd/docker/Gemfile (2 changes: 1 addition & 1 deletion)

@@ -3,4 +3,4 @@
 source 'https://rubygems.org'

 gem 'fluentd', '1.15.3'
-gem 'fluent-plugin-multi-format-parser', '~>1.0.0'
+gem 'fluent-plugin-multi-format-parser', '~>1.1.0'
clients/pkg/promtail/config/config.go (1 change: 0 additions & 1 deletion)

@@ -42,7 +42,6 @@ type Config struct {

 // UnmarshalYAML implements the yaml.Unmarshaler interface.
 func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error {
-    *c = Config{}
     // We want to set c to the defaults and then overwrite it with the input.
     // To make unmarshal fill the plain data struct rather than calling UnmarshalYAML
     // again, we have to hide it using a type indirection.
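
The deleted line reset the whole struct before unmarshaling; with it gone, values set on the Config before parsing (defaults, in the spirit of the comment) survive unless the YAML input overrides them. A minimal, self-contained sketch of the type-indirection pattern the comment refers to (the Config struct and its field here are illustrative stand-ins, not Promtail's real config):

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v2"
)

// Illustrative stand-in for a config struct; not Promtail's actual Config.
type Config struct {
	ServerPort int `yaml:"server_port"`
}

// Defaults are applied by the caller before unmarshaling; the YAML input then
// overwrites only the fields it sets. Resetting *c here (as the removed line
// did) would wipe those pre-set values.
func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error {
	// Type indirection: "plain" has Config's fields but not its methods, so
	// unmarshal fills the data directly instead of recursing into UnmarshalYAML.
	type plain Config
	return unmarshal((*plain)(c))
}

func main() {
	cfg := Config{ServerPort: 9080} // default set before unmarshaling
	_ = yaml.Unmarshal([]byte("{}"), &cfg)
	fmt.Println(cfg.ServerPort) // 9080 — the empty input did not clobber the default
}
```
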
docs/sources/setup/install/_index.md (12 changes: 8 additions & 4 deletions)

@@ -9,20 +9,24 @@ weight: 200

 # Install Loki

-There are several methods of installing Loki and Promtail:
+There are several methods of installing Loki:

 - [Install using Helm (recommended)]({{< relref "./helm" >}})
 - [Install using Tanka]({{< relref "./tanka" >}})
 - [Install using Docker or Docker Compose]({{< relref "./docker" >}})
 - [Install and run locally]({{< relref "./local" >}})
 - [Install from source]({{< relref "./install-from-source" >}})

+Alloy:
+- [Install Alloy](https://grafana.com/docs/alloy/latest/set-up/install/)
+- [Ingest Logs with Alloy]({{< relref "../../send-data/alloy" >}})
+
 ## General process

 In order to run Loki, you must:

-1. Download and install both Loki and Promtail.
+1. Download and install both Loki and Alloy.
 1. Download config files for both programs.
 1. Start Loki.
-1. Update the Promtail config file to get your logs into Loki.
-1. Start Promtail.
+1. Update the Alloy config file to get your logs into Loki.
+1. Start Alloy.
pkg/bloomgateway/metrics.go (9 changes: 5 additions & 4 deletions)

@@ -119,7 +119,7 @@ type workerMetrics struct {
     dequeueDuration    *prometheus.HistogramVec
     queueDuration      *prometheus.HistogramVec
     processDuration    *prometheus.HistogramVec
-    tasksDequeued      *prometheus.CounterVec
+    tasksDequeued      *prometheus.HistogramVec
     tasksProcessed     *prometheus.CounterVec
     blocksNotAvailable *prometheus.CounterVec
     blockQueryLatency  *prometheus.HistogramVec
@@ -147,11 +147,12 @@ func newWorkerMetrics(registerer prometheus.Registerer, namespace, subsystem str
         Name: "process_duration_seconds",
         Help: "Time spent processing tasks in seconds",
     }, append(labels, "status")),
-    tasksDequeued: r.NewCounterVec(prometheus.CounterOpts{
+    tasksDequeued: r.NewHistogramVec(prometheus.HistogramOpts{
         Namespace: namespace,
         Subsystem: subsystem,
-        Name:      "tasks_dequeued_total",
-        Help:      "Total amount of tasks that the worker dequeued from the queue",
+        Name:      "tasks_dequeued",
+        Help:      "Total amount of tasks that the worker dequeued from the queue at once",
+        Buckets:   prometheus.ExponentialBuckets(1, 2, 8), // [1, 2, ..., 128]
     }, append(labels, "status")),
     tasksProcessed: r.NewCounterVec(prometheus.CounterOpts{
         Namespace: namespace,
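
Switching tasksDequeued from a CounterVec to a HistogramVec means each dequeue now records how many tasks it pulled at once, rather than only bumping a total. A quick standalone illustration of the bucket layout (prometheus.ExponentialBuckets is the real client_golang helper; the snippet is not part of the change):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Start at 1 and double 8 times: the upper bounds for per-dequeue batch sizes.
	fmt.Println(prometheus.ExponentialBuckets(1, 2, 8))
	// Output: [1 2 4 8 16 32 64 128]
}
```
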
pkg/bloomgateway/worker.go (4 changes: 2 additions & 2 deletions)

@@ -76,7 +76,7 @@ func (w *worker) running(_ context.Context) error {
         if err == queue.ErrStopped && len(items) == 0 {
             return err
         }
-        w.metrics.tasksDequeued.WithLabelValues(w.id, labelFailure).Inc()
+        w.metrics.tasksDequeued.WithLabelValues(w.id, labelFailure).Observe(1)
         level.Error(w.logger).Log("msg", "failed to dequeue tasks", "err", err, "items", len(items))
     }
     idx = newIdx
@@ -86,7 +86,7 @@ func (w *worker) running(_ context.Context) error {
         continue
     }

-    w.metrics.tasksDequeued.WithLabelValues(w.id, labelSuccess).Add(float64(len(items)))
+    w.metrics.tasksDequeued.WithLabelValues(w.id, labelSuccess).Observe(float64(len(items)))

     tasks := make([]Task, 0, len(items))
     for _, item := range items {
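
These call-site changes follow from the new metric type: a counter's Inc()/Add(n) only accumulates a running total, while a histogram's Observe(n) records each dequeue as one sample of its batch size, keeping the total available as the _sum series. A small sketch mirroring the shape of the metric (standalone example, not the bloomgateway code):

```go
package main

import "github.com/prometheus/client_golang/prometheus"

func main() {
	// Example vec with the same label shape as the bloomgateway metric.
	tasksDequeued := prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "tasks_dequeued",
		Help:    "Tasks dequeued per call.",
		Buckets: prometheus.ExponentialBuckets(1, 2, 8),
	}, []string{"worker", "status"})
	prometheus.MustRegister(tasksDequeued)

	// A failed dequeue is recorded as a batch of one.
	tasksDequeued.WithLabelValues("w0", "failure").Observe(1)
	// A successful dequeue of 12 items adds one sample of value 12:
	// tasks_dequeued_sum grows by 12, tasks_dequeued_count by 1.
	tasksDequeued.WithLabelValues("w0", "success").Observe(float64(12))
}
```
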
pkg/distributor/distributor.go (20 changes: 12 additions & 8 deletions)

@@ -279,25 +279,29 @@ func New(
         Help: "Total number of times the distributor has sharded streams",
     }),
     kafkaAppends: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
-        Name: "kafka_appends_total",
-        Help: "The total number of appends sent to kafka ingest path.",
+        Namespace: constants.Loki,
+        Name:      "distributor_kafka_appends_total",
+        Help:      "The total number of appends sent to kafka ingest path.",
     }, []string{"partition", "status"}),
     kafkaWriteLatency: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
-        Name:                            "kafka_latency_seconds",
+        Namespace:                       constants.Loki,
+        Name:                            "distributor_kafka_latency_seconds",
         Help:                            "Latency to write an incoming request to the ingest storage.",
         NativeHistogramBucketFactor:     1.1,
         NativeHistogramMinResetDuration: 1 * time.Hour,
         NativeHistogramMaxBucketNumber:  100,
         Buckets:                         prometheus.DefBuckets,
     }),
     kafkaWriteBytesTotal: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
-        Name: "kafka_sent_bytes_total",
-        Help: "Total number of bytes sent to the ingest storage.",
+        Namespace: constants.Loki,
+        Name:      "distributor_kafka_sent_bytes_total",
+        Help:      "Total number of bytes sent to the ingest storage.",
     }),
     kafkaRecordsPerRequest: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
-        Name:    "kafka_records_per_write_request",
-        Help:    "The number of records a single per-partition write request has been split into.",
-        Buckets: prometheus.ExponentialBuckets(1, 2, 8),
+        Namespace: constants.Loki,
+        Name:      "distributor_kafka_records_per_write_request",
+        Help:      "The number of records a single per-partition write request has been split into.",
+        Buckets:   prometheus.ExponentialBuckets(1, 2, 8),
     }),
     writeFailuresManager: writefailures.NewManager(logger, registerer, cfg.WriteFailuresLogging, configs, "distributor"),
     kafkaWriter:          kafkaWriter,
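
The net effect of adding Namespace and the distributor_ prefix is a rename of the exported series. A short sketch of how client_golang composes the full metric name (it assumes constants.Loki resolves to the "loki" namespace string, shown here as a literal):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Before: no namespace, bare name.
	fmt.Println(prometheus.BuildFQName("", "", "kafka_appends_total"))
	// kafka_appends_total

	// After: the namespace plus the distributor_ prefix baked into Name.
	fmt.Println(prometheus.BuildFQName("loki", "", "distributor_kafka_appends_total"))
	// loki_distributor_kafka_appends_total
}
```
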
pkg/kafka/writer_client.go (18 changes: 12 additions & 6 deletions)

@@ -18,6 +18,8 @@ import (
     "go.opentelemetry.io/otel/propagation"
     "go.opentelemetry.io/otel/trace"
     "go.uber.org/atomic"
+
+    "github.com/grafana/loki/v3/pkg/util/constants"
 )

 // NewWriterClient returns the kgo.Client that should be used by the Writer.
@@ -189,6 +191,7 @@ func NewProducer(client *kgo.Client, maxBufferedBytes int64, reg prometheus.Regi
     // Metrics.
     bufferedProduceBytes: promauto.With(reg).NewSummary(
         prometheus.SummaryOpts{
+            Namespace:  constants.Loki,
             Name:       "buffered_produce_bytes",
             Help:       "The buffered produce records in bytes. Quantile buckets keep track of buffered records size over the last 60s.",
             Objectives: map[float64]float64{0.5: 0.05, 0.99: 0.001, 1: 0.001},
@@ -197,16 +200,19 @@ func NewProducer(client *kgo.Client, maxBufferedBytes int64, reg prometheus.Regi
         }),
     bufferedProduceBytesLimit: promauto.With(reg).NewGauge(
         prometheus.GaugeOpts{
-            Name: "buffered_produce_bytes_limit",
-            Help: "The bytes limit on buffered produce records. Produce requests fail once this limit is reached.",
+            Namespace: constants.Loki,
+            Name:      "buffered_produce_bytes_limit",
+            Help:      "The bytes limit on buffered produce records. Produce requests fail once this limit is reached.",
         }),
     produceRequestsTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
-        Name: "produce_requests_total",
-        Help: "Total number of produce requests issued to Kafka.",
+        Namespace: constants.Loki,
+        Name:      "produce_requests_total",
+        Help:      "Total number of produce requests issued to Kafka.",
     }),
     produceFailuresTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
-        Name: "produce_failures_total",
-        Help: "Total number of failed produce requests issued to Kafka.",
+        Namespace: constants.Loki,
+        Name:      "produce_failures_total",
+        Help:      "Total number of failed produce requests issued to Kafka.",
     }, []string{"reason"}),
 }

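
The producer metrics get the same namespace treatment, so (assuming constants.Loki is "loki") the summary above is exported as loki_buffered_produce_bytes. A hedged, standalone sketch of the Summary pattern used here, mainly to show what the Objectives map means (target quantile mapped to its allowed absolute error); the name and values are illustrative only:

```go
package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	s := prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace: "loki", // what the added Namespace field contributes
		Name:      "buffered_produce_bytes_example",
		Help:      "Illustrative summary, not the real metric.",
		// Track the median within ±0.05, the p99 within ±0.001, and the max.
		Objectives: map[float64]float64{0.5: 0.05, 0.99: 0.001, 1: 0.001},
		MaxAge:     time.Minute, // quantiles are computed over a sliding window
	})
	for i := 1; i <= 100; i++ {
		s.Observe(float64(i) * 1024)
	}
	fmt.Println(s.Desc()) // Desc includes the fully-qualified name loki_buffered_produce_bytes_example
}
```
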
pkg/storage/chunk/client/aws/s3_storage_client.go (2 changes: 1 addition & 1 deletion)

@@ -393,7 +393,7 @@ func (a *S3ObjectClient) GetObject(ctx context.Context, objectKey string) (io.Re
     // Map the key into a bucket
     bucket := a.bucketFromKey(objectKey)

-    var lastErr error
+    lastErr := ctx.Err()

     retries := backoff.New(ctx, a.cfg.BackoffConfig)
     for retries.Ongoing() {
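
Seeding lastErr with ctx.Err() matters because the retry loop is guarded by the context: backoff's Ongoing() stops once the context is done, so with the old `var lastErr error` an already-canceled context could make GetObject return a nil object together with a nil error. A simplified, self-contained sketch of that failure mode (the helper below is hypothetical, not Loki's actual code):

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// getWithRetries mimics the GetObject retry structure: the loop only runs while
// the context is alive, so lastErr is seeded with ctx.Err() to guarantee a
// non-nil error when the context is canceled before the first attempt.
func getWithRetries(ctx context.Context, attempts int, get func(context.Context) error) error {
	lastErr := ctx.Err() // seed: a canceled or expired context surfaces as the error
	for i := 0; i < attempts && ctx.Err() == nil; i++ {
		lastErr = get(ctx)
		if lastErr == nil {
			return nil
		}
	}
	return lastErr
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // canceled before the first attempt

	err := getWithRetries(ctx, 3, func(context.Context) error {
		return errors.New("unreachable: the loop body never runs")
	})
	fmt.Println(err) // "context canceled" rather than nil, thanks to the seed
}
```
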
pkg/storage/chunk/client/aws/s3_storage_client_test.go (25 changes: 25 additions & 0 deletions)

@@ -234,6 +234,31 @@ func TestRequestMiddleware(t *testing.T) {
     }
 }

+func TestS3ObjectClient_GetObject_CanceledContext(t *testing.T) {
+    ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+        fmt.Fprintln(w, r.Header.Get("echo-me"))
+    }))
+    defer ts.Close()
+
+    cfg := S3Config{
+        Endpoint:         ts.URL,
+        BucketNames:      "buck-o",
+        S3ForcePathStyle: true,
+        Insecure:         true,
+        AccessKeyID:      "key",
+        SecretAccessKey:  flagext.SecretWithValue("secret"),
+    }
+
+    client, err := NewS3ObjectClient(cfg, hedging.Config{})
+    require.NoError(t, err)
+
+    ctx, cancel := context.WithCancel(context.Background())
+    cancel()
+
+    _, _, err = client.GetObject(ctx, "key")
+    require.Error(t, err, "GetObject should fail when given a canceled context")
+}
+
 func Test_Hedging(t *testing.T) {
     for _, tc := range []struct {
         name string
