Skip to content

Commit

Permalink
ruler: Support writeNotify on Loki ruler (#10906)
Browse files Browse the repository at this point in the history
**What this PR does / why we need it**:
prometheus ruler added a feature of notifying the reader when a sample
is appended, instead of waiting in a loop burning the CPU cycles.
prometheus/prometheus#11949

This changes a default behaviour a bit. Now if `notify` is not enabled,
next read is done only when next readTicker is triggered.

**Which issue(s) this PR fixes**:
Also should fix #10859

**Special notes for your reviewer**:
Adding few more details for the sake of completeness.

We found this via more frequent failures of rule-evaluation integration
tests linked on the issue above. After some investigation, we tracked
down to prometheus changes.

Prometheus introduced new type `wlog.WriteNotified` interface with
`Notify()` method with a goal to notify any waiting readers, that some
write is done.

Two types implements this type `wlog.Watcher` and `remote.Storage`.
`remote.Storage` implements `Notify()` by just calling it's queues
`wlog.Watcher`'s `Notify()` under the hood.

How are these types impacts Loki ruler?

Loki ruler also uses `remote.Storage`. So when any samples got committed
via `appender`, we have to notify the remote storage.

**Checklist**
- [ ] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [ ] Documentation added
- [ ] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/setup/upgrade/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](d10549e)

---------

Signed-off-by: Kaviraj <[email protected]>
  • Loading branch information
kavirajk authored Oct 16, 2023
1 parent 1ac7410 commit 52a7c0f
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

##### Enhancements

* [10906](https://github.com/grafana/loki/pull/10906) **kavirajk**: Support Loki ruler to notify WAL writes to remote storage.
* [10613](https://github.com/grafana/loki/pull/10613) **ngc4579**: Helm: allow GrafanaAgent tolerations
* [10295](https://github.com/grafana/loki/pull/10295) **changhyuni**: Storage: remove signatureversionv2 from s3.
* [10140](https://github.com/grafana/loki/pull/10140) **dannykopping**: Dynamic client-side throttling to avoid object storage rate-limits (GCS only)
Expand Down
3 changes: 3 additions & 0 deletions pkg/ruler/storage/instance/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/tsdb/wlog"
"gopkg.in/yaml.v2"

"github.com/grafana/loki/pkg/ruler/storage/util"
Expand Down Expand Up @@ -311,6 +312,7 @@ func (i *Instance) initialize(_ context.Context, reg prometheus.Registerer, cfg
}

i.storage = storage.NewFanout(i.logger, i.wal, i.remoteStore)
i.wal.SetWriteNotified(i.remoteStore)
i.initialized = true

return nil
Expand Down Expand Up @@ -513,6 +515,7 @@ type walStorage interface {

StartTime() (int64, error)
WriteStalenessMarkers(remoteTsFunc func() int64) error
SetWriteNotified(wlog.WriteNotified)
Appender(context.Context) storage.Appender
Truncate(mint int64) error

Expand Down
2 changes: 2 additions & 0 deletions pkg/ruler/storage/instance/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/wlog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -246,6 +247,7 @@ type mockWalStorage struct {
func (s *mockWalStorage) Directory() string { return s.directory }
func (s *mockWalStorage) StartTime() (int64, error) { return 0, nil }
func (s *mockWalStorage) WriteStalenessMarkers(_ func() int64) error { return nil }
func (s *mockWalStorage) SetWriteNotified(_ wlog.WriteNotified) {}
func (s *mockWalStorage) Close() error { return nil }
func (s *mockWalStorage) Truncate(_ int64) error { return nil }

Expand Down
23 changes: 22 additions & 1 deletion pkg/ruler/storage/wal/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ type Storage struct {
deleted map[chunks.HeadSeriesRef]int // Deleted series, and what WAL segment they must be kept until.

metrics *Metrics

writeNotified wlog.WriteNotified
}

// NewStorage makes a new Storage.
Expand All @@ -87,8 +89,14 @@ func NewStorage(logger log.Logger, metrics *Metrics, registerer prometheus.Regis
}

storage.appenderPool.New = func() interface{} {
var notify func()

if storage.writeNotified != nil {
notify = storage.writeNotified.Notify
}
return &appender{
w: storage,
notify: notify,
series: make([]record.RefSeries, 0, 100),
samples: make([]record.RefSample, 0, 100),
exemplars: make([]record.RefExemplar, 0, 10),
Expand Down Expand Up @@ -116,6 +124,10 @@ func NewStorage(logger log.Logger, metrics *Metrics, registerer prometheus.Regis
return storage, nil
}

func (w *Storage) SetWriteNotified(writeNotified wlog.WriteNotified) {
w.writeNotified = writeNotified
}

func (w *Storage) replayWAL() error {
w.walMtx.RLock()
defer w.walMtx.RUnlock()
Expand Down Expand Up @@ -532,7 +544,9 @@ func dirSize(path string) (int64, error) {
}

type appender struct {
w *Storage
w *Storage
// Notify the underlying storage that some sample is written
notify func()
series []record.RefSeries
samples []record.RefSample
exemplars []record.RefExemplar
Expand Down Expand Up @@ -677,6 +691,13 @@ func (a *appender) Commit() error {
buf = buf[:0]
}

// Notify so that reader waiting for it can read without needing to wait for next read ticker.
if a.notify != nil {
a.notify()
} else {
level.Warn(a.w.logger).Log("msg", "not notifying about WAL writes because notifier is not set")
}

//nolint:staticcheck
a.w.bufPool.Put(buf)

Expand Down

0 comments on commit 52a7c0f

Please sign in to comment.