diff --git a/CHANGELOG.md b/CHANGELOG.md index 5b909499dd74e..45c57e939b242 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ ##### Enhancements +* [10906](https://github.com/grafana/loki/pull/10906) **kavirajk**: Support Loki ruler to notify WAL writes to remote storage. * [10613](https://github.com/grafana/loki/pull/10613) **ngc4579**: Helm: allow GrafanaAgent tolerations * [10295](https://github.com/grafana/loki/pull/10295) **changhyuni**: Storage: remove signatureversionv2 from s3. * [10140](https://github.com/grafana/loki/pull/10140) **dannykopping**: Dynamic client-side throttling to avoid object storage rate-limits (GCS only) diff --git a/pkg/ruler/storage/instance/instance.go b/pkg/ruler/storage/instance/instance.go index addf309c6f6d5..b6cd1fc2ed25f 100644 --- a/pkg/ruler/storage/instance/instance.go +++ b/pkg/ruler/storage/instance/instance.go @@ -24,6 +24,7 @@ import ( "github.com/prometheus/prometheus/scrape" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage/remote" + "github.com/prometheus/prometheus/tsdb/wlog" "gopkg.in/yaml.v2" "github.com/grafana/loki/pkg/ruler/storage/util" @@ -311,6 +312,7 @@ func (i *Instance) initialize(_ context.Context, reg prometheus.Registerer, cfg } i.storage = storage.NewFanout(i.logger, i.wal, i.remoteStore) + i.wal.SetWriteNotified(i.remoteStore) i.initialized = true return nil @@ -513,6 +515,7 @@ type walStorage interface { StartTime() (int64, error) WriteStalenessMarkers(remoteTsFunc func() int64) error + SetWriteNotified(wlog.WriteNotified) Appender(context.Context) storage.Appender Truncate(mint int64) error diff --git a/pkg/ruler/storage/instance/instance_test.go b/pkg/ruler/storage/instance/instance_test.go index 6d48fe919cdcf..ec814e425d5c9 100644 --- a/pkg/ruler/storage/instance/instance_test.go +++ b/pkg/ruler/storage/instance/instance_test.go @@ -25,6 +25,7 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/metadata" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb/wlog" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -246,6 +247,7 @@ type mockWalStorage struct { func (s *mockWalStorage) Directory() string { return s.directory } func (s *mockWalStorage) StartTime() (int64, error) { return 0, nil } func (s *mockWalStorage) WriteStalenessMarkers(_ func() int64) error { return nil } +func (s *mockWalStorage) SetWriteNotified(_ wlog.WriteNotified) {} func (s *mockWalStorage) Close() error { return nil } func (s *mockWalStorage) Truncate(_ int64) error { return nil } diff --git a/pkg/ruler/storage/wal/wal.go b/pkg/ruler/storage/wal/wal.go index d5b4466da98d4..152a7076282a9 100644 --- a/pkg/ruler/storage/wal/wal.go +++ b/pkg/ruler/storage/wal/wal.go @@ -62,6 +62,8 @@ type Storage struct { deleted map[chunks.HeadSeriesRef]int // Deleted series, and what WAL segment they must be kept until. metrics *Metrics + + writeNotified wlog.WriteNotified } // NewStorage makes a new Storage. @@ -87,8 +89,14 @@ func NewStorage(logger log.Logger, metrics *Metrics, registerer prometheus.Regis } storage.appenderPool.New = func() interface{} { + var notify func() + + if storage.writeNotified != nil { + notify = storage.writeNotified.Notify + } return &appender{ w: storage, + notify: notify, series: make([]record.RefSeries, 0, 100), samples: make([]record.RefSample, 0, 100), exemplars: make([]record.RefExemplar, 0, 10), @@ -116,6 +124,10 @@ func NewStorage(logger log.Logger, metrics *Metrics, registerer prometheus.Regis return storage, nil } +func (w *Storage) SetWriteNotified(writeNotified wlog.WriteNotified) { + w.writeNotified = writeNotified +} + func (w *Storage) replayWAL() error { w.walMtx.RLock() defer w.walMtx.RUnlock() @@ -532,7 +544,9 @@ func dirSize(path string) (int64, error) { } type appender struct { - w *Storage + w *Storage + // Notify the underlying storage that some sample is written + notify func() series []record.RefSeries samples []record.RefSample exemplars []record.RefExemplar @@ -677,6 +691,13 @@ func (a *appender) Commit() error { buf = buf[:0] } + // Notify so that reader waiting for it can read without needing to wait for next read ticker. + if a.notify != nil { + a.notify() + } else { + level.Warn(a.w.logger).Log("msg", "not notifying about WAL writes because notifier is not set") + } + //nolint:staticcheck a.w.bufPool.Put(buf)