Commit

Merge branch 'main' into openstack-application-credentials
heytrav authored Jul 1, 2024
2 parents 446da51 + 0602b90 commit 0ca3cc6
Showing 11 changed files with 190 additions and 46 deletions.
12 changes: 8 additions & 4 deletions docs/sources/setup/install/helm/install-microservices/_index.md
@@ -167,6 +167,10 @@ It is not recommended to run scalable mode with `filesystem` storage. For the pu

After testing Loki with MinIO, it is recommended to configure Loki with an object storage provider. The following examples show how to configure Loki for different object storage providers:

{{< admonition type="caution" >}}
When deploying Loki using S3 storage, **DO NOT** use the default bucket names: `chunks`, `ruler`, and `admin`. Choose a unique name for each bucket. For more information, see the [security update](https://grafana.com/blog/2024/06/27/grafana-security-update-grafana-loki-and-unintended-data-write-attempts-to-amazon-s3-buckets/). This caution does not apply when you are using MinIO; when using MinIO, we recommend using the default bucket names.
{{< /admonition >}}
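For example, bucket names scoped to your organization and environment are unlikely to collide. The values below are a minimal sketch with purely illustrative names, not recommendations:

```yaml
# Illustrative names only -- substitute bucket names unique to your account.
loki:
  storage:
    bucketNames:
      chunks: "acme-prod-loki-chunks"
      ruler: "acme-prod-loki-ruler"
      admin: "acme-prod-loki-admin"
```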

{{< code >}}

```s3
@@ -192,9 +196,9 @@ After testing Loki with MinIO, it is recommended to configure Loki with an objec
storage:
type: s3
bucketNames:
chunks: "chunks"
ruler: "ruler"
admin: "admin"
chunks: "<INSERT BUCKET NAME>"
ruler: "<INSERT BUCKET NAME>"
admin: "<INSERT BUCKET NAME>"
s3:
# s3 URL can be used to specify the endpoint, access key, secret key, and bucket name
s3: s3://access_key:secret_access_key@custom_endpoint/bucket_name
@@ -343,4 +347,4 @@ To configure other storage providers, refer to the [Helm Chart Reference]({{< re

## Next Steps
* Configure an agent to [send log data to Loki](/docs/loki/<LOKI_VERSION>/send-data/).
* Monitor the Loki deployment using the [Meta Monitoring Healm chart](/docs/loki/<LOKI_VERSION>/setup/install/helm/monitor-and-alert/)
* Monitor the Loki deployment using the [Meta Monitoring Helm chart](/docs/loki/<LOKI_VERSION>/setup/install/helm/monitor-and-alert/)
12 changes: 8 additions & 4 deletions docs/sources/setup/install/helm/install-scalable/_index.md
@@ -128,6 +128,10 @@ It is not recommended to run scalable mode with `filesystem` storage. For the pu

After testing Loki with MinIO, it is recommended to configure Loki with an object storage provider. The following examples show how to configure Loki for different object storage providers:

{{< admonition type="caution" >}}
When deploying Loki using S3 storage, **DO NOT** use the default bucket names: `chunks`, `ruler`, and `admin`. Choose a unique name for each bucket. For more information, see the [security update](https://grafana.com/blog/2024/06/27/grafana-security-update-grafana-loki-and-unintended-data-write-attempts-to-amazon-s3-buckets/). This caution does not apply when you are using MinIO; when using MinIO, we recommend using the default bucket names.
{{< /admonition >}}
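By contrast, a minimal sketch of the MinIO testing case noted above, where keeping the chart's default bucket names is expected:

```yaml
# MinIO testing only -- keep the default bucket names.
loki:
  storage:
    bucketNames:
      chunks: "chunks"
      ruler: "ruler"
      admin: "admin"
```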

{{< code >}}

```s3
@@ -151,9 +155,9 @@ loki:
storage:
type: s3
bucketNames:
chunks: "chunks"
ruler: "ruler"
admin: "admin"
chunks: "<INSERT BUCKET NAME>"
ruler: "<INSERT BUCKET NAME>"
admin: "<INSERT BUCKET NAME>"
s3:
# s3 URL can be used to specify the endpoint, access key, secret key, and bucket name
s3: s3://access_key:secret_access_key@custom_endpoint/bucket_name
@@ -295,4 +299,4 @@ To configure other storage providers, refer to the [Helm Chart Reference]({{< re

## Next Steps
* Configure an agent to [send log data to Loki](/docs/loki/<LOKI_VERSION>/send-data/).
* Monitor the Loki deployment using the [Meta Monitoring Healm chart](/docs/loki/<LOKI_VERSION>/setup/install/helm/monitor-and-alert/)
* Monitor the Loki deployment using the [Meta Monitoring Helm chart](/docs/loki/<LOKI_VERSION>/setup/install/helm/monitor-and-alert/)
17 changes: 16 additions & 1 deletion pkg/bloombuild/planner/metrics.go
@@ -27,6 +27,7 @@ type Metrics struct {
tasksRequeued prometheus.Counter
taskLost prometheus.Counter

planningTime prometheus.Histogram
buildStarted prometheus.Counter
buildCompleted *prometheus.CounterVec
buildTime *prometheus.HistogramVec
@@ -86,6 +87,14 @@ func NewMetrics(
Help: "Total number of tasks lost due to not being picked up by a builder and failed to be requeued.",
}),

planningTime: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "planning_time_seconds",
Help: "Time spent planning a build cycle.",
// 1s --> 1h (steps of 1 minute)
Buckets: prometheus.LinearBuckets(1, 60, 60),
}),
buildStarted: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
@@ -103,7 +112,13 @@ func NewMetrics(
Subsystem: metricsSubsystem,
Name: "build_time_seconds",
Help: "Time spent during a builds cycle.",
Buckets: prometheus.DefBuckets,
// Buckets in seconds:
Buckets: append(
// 1s --> 1h (steps of 10 minutes)
prometheus.LinearBuckets(1, 600, 6),
// 1h --> 24h (steps of 1 hour)
prometheus.LinearBuckets(3600, 3600, 24)...,
),
}, []string{"status"}),
buildLastSuccess: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Namespace: metricsNamespace,
62 changes: 46 additions & 16 deletions pkg/bloombuild/planner/planner.go
@@ -14,6 +14,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"go.uber.org/atomic"

"github.com/grafana/loki/v3/pkg/bloombuild/common"
"github.com/grafana/loki/v3/pkg/bloombuild/protos"
@@ -251,11 +252,18 @@ func (p *Planner) runOne(ctx context.Context) error {
}
}

level.Debug(p.logger).Log("msg", "planning completed", "tasks", totalTasks)
p.metrics.planningTime.Observe(time.Since(start).Seconds())
level.Debug(p.logger).Log(
"msg", "planning completed",
"tenantTables", len(tasksResultForTenantTable),
"tasks", totalTasks,
"time", time.Since(start).Seconds(),
)

// Create a goroutine to process the results for each table tenant tuple
// TODO(salvacorts): This may end up creating too many goroutines.
// Create a pool of workers to process table-tenant tuples.
var tasksSucceed atomic.Int64
var wg sync.WaitGroup
for tt, results := range tasksResultForTenantTable {
if results.tasksToWait == 0 {
@@ -267,21 +275,40 @@ func (p *Planner) runOne(ctx context.Context) error {
go func(table config.DayTable, tenant string, results tenantTableTaskResults) {
defer wg.Done()

if err := p.processTenantTaskResults(
logger := log.With(p.logger, "table", table.Addr(), "tenant", tenant)

nSucceed, err := p.processTenantTaskResults(
ctx, table, tenant,
results.originalMetas, results.tasksToWait, results.resultsCh,
); err != nil {
level.Error(p.logger).Log("msg", "failed to process tenant task results", "err", err)
)
if err != nil {
level.Error(logger).Log("msg", "failed to process tenant task results", "err", err)
}

if nSucceed != results.tasksToWait {
level.Error(logger).Log(
"msg", "not all tasks succeeded for tenant table",
"tasks", results.tasksToWait,
"tasksSucceed", nSucceed,
"tasksFailed", results.tasksToWait-nSucceed,
)
}
tasksSucceed.Add(int64(nSucceed))
}(tt.table, tt.tenant, results)
}

level.Debug(p.logger).Log("msg", "waiting for all tasks to be completed", "tasks", totalTasks, "tenantTables", len(tasksResultForTenantTable))
level.Debug(p.logger).Log(
"msg", "waiting for all tasks to be completed",
"tenantTables", len(tasksResultForTenantTable),
"tasks", totalTasks,
)
wg.Wait()

status = statusSuccess
level.Info(p.logger).Log(
"msg", "bloom build iteration completed",
"tasks", totalTasks,
"tasksSucceed", tasksSucceed.Load(),
"duration", time.Since(start).Seconds(),
)
return nil
@@ -324,7 +351,6 @@ func (p *Planner) computeTasks(
continue
}
if len(gaps) == 0 {
level.Debug(logger).Log("msg", "no gaps found")
continue
}

Expand All @@ -343,28 +369,31 @@ func (p *Planner) processTenantTaskResults(
originalMetas []bloomshipper.Meta,
totalTasks int,
resultsCh <-chan *protos.TaskResult,
) error {
) (int, error) {
logger := log.With(p.logger, "table", table.Addr(), "tenant", tenant)
level.Debug(logger).Log("msg", "waiting for all tasks to be completed", "tasks", totalTasks)

var tasksSucceed int
newMetas := make([]bloomshipper.Meta, 0, totalTasks)
for i := 0; i < totalTasks; i++ {
select {
case <-ctx.Done():
if err := ctx.Err(); err != nil && !errors.Is(err, context.Canceled) {
level.Error(logger).Log("msg", "planner context done with error", "err", err)
return err
return tasksSucceed, err
}

// No error or context canceled, just return
level.Debug(logger).Log("msg", "context done while waiting for task results")
return nil
return tasksSucceed, nil
case result := <-resultsCh:
if result == nil {
p.metrics.tenantTasksCompleted.WithLabelValues(tenant, statusFailure).Inc()
level.Error(logger).Log("msg", "received nil task result")
continue
}
if result.Error != nil {
p.metrics.tenantTasksCompleted.WithLabelValues(tenant, statusFailure).Inc()
level.Error(logger).Log(
"msg", "task failed",
"err", result.Error,
@@ -373,36 +402,39 @@ func (p *Planner) processTenantTaskResults(
continue
}

p.metrics.tenantTasksCompleted.WithLabelValues(tenant, statusSuccess).Inc()
newMetas = append(newMetas, result.CreatedMetas...)
tasksSucceed++
}
}

level.Debug(logger).Log(
"msg", "all tasks completed",
"msg", "all tasks completed for tenant table",
"tasks", totalTasks,
"tasksSucceed", tasksSucceed,
"originalMetas", len(originalMetas),
"newMetas", len(newMetas),
)

if len(newMetas) == 0 {
// No new metas were created, nothing to delete
// Note: this would only happen if all tasks failed
return nil
return tasksSucceed, nil
}

combined := append(originalMetas, newMetas...)
outdated := outdatedMetas(combined)
if len(outdated) == 0 {
level.Debug(logger).Log("msg", "no outdated metas found")
return nil
return tasksSucceed, nil
}

level.Debug(logger).Log("msg", "found outdated metas", "outdated", len(outdated))
if err := p.deleteOutdatedMetasAndBlocks(ctx, table, tenant, outdated); err != nil {
return fmt.Errorf("failed to delete outdated metas: %w", err)
return 0, fmt.Errorf("failed to delete outdated metas: %w", err)
}

return nil
return tasksSucceed, nil
}

func (p *Planner) deleteOutdatedMetasAndBlocks(
@@ -800,7 +832,6 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
if err != nil {
maxRetries := p.limits.BloomTaskMaxRetries(task.Tenant)
if maxRetries > 0 && int(task.timesEnqueued.Load()) >= maxRetries {
p.metrics.tenantTasksCompleted.WithLabelValues(task.Tenant, statusFailure).Inc()
p.removePendingTask(task)
level.Error(logger).Log(
"msg", "task failed after max retries",
@@ -842,7 +873,6 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
"retries", task.timesEnqueued.Load()-1, // -1 because the first enqueue is not a retry
)
p.removePendingTask(task)
p.metrics.tenantTasksCompleted.WithLabelValues(task.Tenant, statusSuccess).Inc()

// Send the result back to the task. The channel is buffered, so this should not block.
task.resultsChannel <- result
15 changes: 11 additions & 4 deletions pkg/bloombuild/planner/planner_test.go
@@ -624,9 +624,10 @@ func Test_processTenantTaskResults(t *testing.T) {
for _, tc := range []struct {
name string

originalMetas []bloomshipper.Meta
taskResults []*protos.TaskResult
expectedMetas []bloomshipper.Meta
originalMetas []bloomshipper.Meta
taskResults []*protos.TaskResult
expectedMetas []bloomshipper.Meta
expectedTasksSucceed int
}{
{
name: "errors",
@@ -649,6 +650,7 @@ func Test_processTenantTaskResults(t *testing.T) {
genMeta(0, 10, []int{0}, []bloomshipper.BlockRef{genBlockRef(0, 10)}),
genMeta(10, 20, []int{0}, []bloomshipper.BlockRef{genBlockRef(10, 20)}),
},
expectedTasksSucceed: 0,
},
{
name: "no new metas",
@@ -669,6 +671,7 @@ func Test_processTenantTaskResults(t *testing.T) {
genMeta(0, 10, []int{0}, []bloomshipper.BlockRef{genBlockRef(0, 10)}),
genMeta(10, 20, []int{0}, []bloomshipper.BlockRef{genBlockRef(10, 20)}),
},
expectedTasksSucceed: 2,
},
{
name: "no original metas",
@@ -690,6 +693,7 @@ func Test_processTenantTaskResults(t *testing.T) {
genMeta(0, 10, []int{0}, []bloomshipper.BlockRef{genBlockRef(0, 10)}),
genMeta(10, 20, []int{0}, []bloomshipper.BlockRef{genBlockRef(10, 20)}),
},
expectedTasksSucceed: 2,
},
{
name: "single meta covers all original",
@@ -708,6 +712,7 @@ func Test_processTenantTaskResults(t *testing.T) {
expectedMetas: []bloomshipper.Meta{
genMeta(0, 10, []int{1}, []bloomshipper.BlockRef{genBlockRef(0, 10)}),
},
expectedTasksSucceed: 1,
},
{
name: "multi version ordering",
@@ -727,6 +732,7 @@ func Test_processTenantTaskResults(t *testing.T) {
genMeta(0, 10, []int{1}, []bloomshipper.BlockRef{genBlockRef(0, 10)}),
genMeta(8, 10, []int{2}, []bloomshipper.BlockRef{genBlockRef(8, 10)}),
},
expectedTasksSucceed: 1,
},
} {
t.Run(tc.name, func(t *testing.T) {
@@ -755,7 +761,7 @@ func Test_processTenantTaskResults(t *testing.T) {
go func() {
defer wg.Done()

err = planner.processTenantTaskResults(
completed, err := planner.processTenantTaskResults(
ctx,
testTable,
"fakeTenant",
Expand All @@ -764,6 +770,7 @@ func Test_processTenantTaskResults(t *testing.T) {
resultsCh,
)
require.NoError(t, err)
require.Equal(t, tc.expectedTasksSucceed, completed)
}()

for _, taskResult := range tc.taskResults {
11 changes: 8 additions & 3 deletions pkg/bloombuild/protos/compat.go
@@ -3,7 +3,6 @@ package protos
import (
"fmt"

"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/prometheus/common/model"

@@ -28,9 +27,15 @@ type Task struct {
Gaps []GapWithBlocks
}

func NewTask(table config.DayTable, tenant string, bounds v1.FingerprintBounds, tsdb tsdb.SingleTenantTSDBIdentifier, gaps []GapWithBlocks) *Task {
func NewTask(
table config.DayTable,
tenant string,
bounds v1.FingerprintBounds,
tsdb tsdb.SingleTenantTSDBIdentifier,
gaps []GapWithBlocks,
) *Task {
return &Task{
ID: uuid.NewString(),
ID: fmt.Sprintf("%s-%s-%s-%d-%d", table.Addr(), tenant, bounds.String(), tsdb.Checksum, len(gaps)),

Table: table,
Tenant: tenant,
3 changes: 3 additions & 0 deletions pkg/logql/downstream_test.go
@@ -57,6 +57,9 @@ func TestMappingEquivalence(t *testing.T) {
{`sum(rate({a=~".+"} |= "foo" != "foo"[1s]) or vector(1))`, false, nil},
{`avg_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false, nil},
{`avg_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, true, nil},
{`avg_over_time({a=~".+"} | logfmt | unwrap value [1s]) without (stream)`, true, nil},
{`avg_over_time({a=~".+"} | logfmt | drop level | unwrap value [1s])`, true, nil},
{`avg_over_time({a=~".+"} | logfmt | drop level | unwrap value [1s]) without (stream)`, true, nil},
{`quantile_over_time(0.99, {a=~".+"} | logfmt | unwrap value [1s])`, true, []string{ShardQuantileOverTime}},
{
`