Introduce worker queue in bloom gateway (#10976)
Instead of calling the bloom store directly on each and every request to filter chunk refs based on the given filters, we want to queue requests in per-tenant queues. Workers then process batches of requests that can be multiplexed, avoiding excessive seeking in the bloom block queriers when checking chunk matches.

This PR re-uses the request queue implementation used in the query scheduler. To do so, it moves the queue related code from the scheduler into a separate package `pkg/queue` and renames occurrences of "querier" to "consumer" to be more generic.
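After the move, the methods the bloom gateway uses from `pkg/queue` look roughly as follows. This is a sketch inferred from the call sites in the diff below, summarized as an interface for readability; the actual `queue.RequestQueue` is a concrete type with a richer API, and the exact signatures here are approximations:

```go
package queue

import "context"

// QueueIndex tracks a consumer's position between Dequeue calls;
// StartIndexWithLocalQueue is the value consumers start from.
type QueueIndex int

// ConsumerQueue summarizes the queue.RequestQueue methods exercised in
// this PR. Names follow the querier->consumer rename; signatures are
// approximations, not the authoritative API.
type ConsumerQueue interface {
	// Enqueue adds an item to the tenant's queue and invokes successFn
	// once the item has been accepted.
	Enqueue(tenant string, path []string, item any, maxConsumers int, successFn func()) error
	// Dequeue blocks until an item is available for the given consumer
	// and returns the index to resume from on the next call.
	Dequeue(ctx context.Context, last QueueIndex, consumerID string) (any, QueueIndex, error)
	// Register/UnregisterConsumerConnection track active consumers
	// (previously "querier connections" in the scheduler).
	RegisterConsumerConnection(consumerID string)
	UnregisterConsumerConnection(consumerID string)
}
```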

The bloom gateway instantiates the request queue when starting the service. The gRPC method `FilterChunkRefs` then enqueues incoming requests to that queue.
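Conceptually, each RPC becomes a task that carries its own result and error channels; a worker streams partial results back over those channels, and the handler blocks until it has received one response per requested ref group. A minimal, self-contained illustration of that round trip, with a plain channel standing in for `pkg/queue`:

```go
package main

import "fmt"

// task mirrors the Task struct added in this PR: a payload plus channels
// the worker uses to report back to the waiting gRPC handler.
type task struct {
	tenant string
	input  []int
	resCh  chan int   // partial results, one per input element
	errCh  chan error // terminal error, if any
}

// worker is a stand-in for the bloom gateway worker: it dequeues tasks
// and streams partial results back on each task's own channel.
func worker(tasks <-chan task) {
	for t := range tasks {
		for _, v := range t.input {
			t.resCh <- v * 2 // placeholder for the actual bloom filtering
		}
	}
}

func main() {
	tasks := make(chan task, 16) // plain channel standing in for the request queue
	go worker(tasks)

	t := task{
		tenant: "tenant-a",
		input:  []int{1, 2, 3},
		resCh:  make(chan int, 3),
		errCh:  make(chan error, 1),
	}
	tasks <- t

	// Like FilterChunkRefs, the caller waits until it has one partial
	// response per input before assembling the full response.
	for range t.input {
		fmt.Println(<-t.resCh)
	}
}
```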

**Special notes for your reviewer**:

For testing purposes, this PR also contains a dummy implementation of the workers. The worker implementation - which includes multiplexing of multiple tasks - will follow in a separate PR; a rough sketch of the idea is shown below.
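To illustrate the direction, a multiplexing worker might merge the ref groups of several dequeued tasks and sort them by fingerprint, so each bloom block querier scans a batch sequentially instead of seeking once per request. The sketch below is purely illustrative; the `multiplex` helper is hypothetical and not part of this PR:

```go
package bloomgateway

import (
	"sort"

	"github.com/grafana/loki/pkg/logproto"
)

// multiplex is a hypothetical helper: it merges the chunk ref groups of
// several dequeued tasks into one fingerprint-sorted batch and remembers
// which task each group came from, so results can be routed back to the
// right response channel. For brevity it assumes fingerprints are
// distinct across the batch; the real worker lands in a follow-up PR.
func multiplex(tasks []Task) ([]*logproto.GroupedChunkRefs, map[uint64]Task) {
	var merged []*logproto.GroupedChunkRefs
	origin := make(map[uint64]Task, len(tasks))
	for _, t := range tasks {
		for _, ref := range t.Request.Refs {
			merged = append(merged, ref)
			origin[ref.Fingerprint] = t
		}
	}
	// Sort by fingerprint so block queriers can scan sequentially rather
	// than seeking back and forth for each individual request.
	sort.Slice(merged, func(i, j int) bool {
		return merged[i].Fingerprint < merged[j].Fingerprint
	})
	return merged, origin
}
```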

---------

Signed-off-by: Christian Haudum <[email protected]>
chaudum authored Oct 20, 2023
1 parent 9474be0 commit 27411cf
Showing 17 changed files with 479 additions and 191 deletions.
2 changes: 1 addition & 1 deletion go.mod
@@ -74,7 +74,7 @@ require (
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f
github.com/ncw/swift v1.0.53
github.com/oklog/run v1.1.0
-github.com/oklog/ulid v1.3.1 // indirect
github.com/oklog/ulid v1.3.1
github.com/opentracing-contrib/go-grpc v0.0.0-20210225150812-73cb765af46e
github.com/opentracing-contrib/go-stdlib v1.0.0
github.com/opentracing/opentracing-go v1.2.0
261 changes: 244 additions & 17 deletions pkg/bloomgateway/bloomgateway.go
@@ -39,27 +39,130 @@ package bloomgateway

import (
"context"
"fmt"
"sort"
"sync"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/tenant"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/queue"
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/pkg/util"
)

var errGatewayUnhealthy = errors.New("bloom-gateway is unhealthy in the ring")
var errInvalidTenant = errors.New("invalid tenant in chunk refs")

-type metrics struct{}
// TODO(chaudum): Make these configurable
const (
numWorkers = 4
maxTasksPerTenant = 1024
pendingTasksInitialCap = 1024
)

-func newMetrics(r prometheus.Registerer) *metrics {
-return &metrics{}
type metrics struct {
queueDuration prometheus.Histogram
inflightRequests prometheus.Summary
}

func newMetrics(subsystem string, registerer prometheus.Registerer) *metrics {
return &metrics{
queueDuration: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
Namespace: "loki",
Subsystem: subsystem,
Name: "queue_duration_seconds",
Help: "Time spent by tasks in queue before getting picked up by a worker.",
Buckets: prometheus.DefBuckets,
}),
inflightRequests: promauto.With(registerer).NewSummary(prometheus.SummaryOpts{
Namespace: "loki",
Subsystem: subsystem,
Name: "inflight_tasks",
Help: "Number of inflight tasks (either queued or processing) sampled at a regular interval. Quantile buckets keep track of inflight tasks over the last 60s.",
Objectives: map[float64]float64{0.5: 0.05, 0.75: 0.02, 0.8: 0.02, 0.9: 0.01, 0.95: 0.01, 0.99: 0.001},
MaxAge: time.Minute,
AgeBuckets: 6,
}),
}
}

// Task is the data structure that is enqueued to the internal queue and dequeued by the workers
type Task struct {
// ID is a lexicographically sortable unique identifier of the task
ID ulid.ULID
// Tenant is the tenant ID
Tenant string
// Request is the original request
Request *logproto.FilterChunkRefRequest
// ErrCh is a send-only channel to write an error to
ErrCh chan<- error
// ResCh is a send-only channel to write partial responses to
ResCh chan<- *logproto.GroupedChunkRefs
}

// newTask returns a new Task that can be enqueued to the task queue.
// In addition to the task, it returns a result channel and an error channel,
// as well as an error in case the instantiation fails.
func newTask(tenantID string, req *logproto.FilterChunkRefRequest) (Task, chan *logproto.GroupedChunkRefs, chan error, error) {
key, err := ulid.New(ulid.Now(), nil)
if err != nil {
return Task{}, nil, nil, err
}
errCh := make(chan error, 1)
resCh := make(chan *logproto.GroupedChunkRefs, 1)
task := Task{
ID: key,
Tenant: tenantID,
Request: req,
ErrCh: errCh,
ResCh: resCh,
}
return task, resCh, errCh, nil
}

// SyncMap is a map structure whose access is synchronized using an RWMutex
type SyncMap[k comparable, v any] struct {
sync.RWMutex
Map map[k]v
}

type pendingTasks SyncMap[ulid.ULID, Task]

func (t *pendingTasks) Len() int {
t.RLock()
defer t.RUnlock()
return len(t.Map)
}

func (t *pendingTasks) Add(k ulid.ULID, v Task) {
t.Lock()
t.Map[k] = v
t.Unlock()
}

func (t *pendingTasks) Delete(k ulid.ULID) {
t.Lock()
delete(t.Map, k)
t.Unlock()
}

// makePendingTasks creates a SyncMap that holds pending tasks
func makePendingTasks(n int) *pendingTasks {
return &pendingTasks{
RWMutex: sync.RWMutex{},
Map: make(map[ulid.ULID]Task, n),
}
}

type Gateway struct {
@@ -69,20 +69,172 @@ type Gateway struct {
logger log.Logger
metrics *metrics

-bloomStore bloomshipper.Store
queue *queue.RequestQueue
queueMetrics *queue.Metrics
activeUsers *util.ActiveUsersCleanupService
bloomStore bloomshipper.Store

sharding ShardingStrategy

pendingTasks *pendingTasks

serviceMngr *services.Manager
serviceWatcher *services.FailureWatcher
}

// New returns a new instance of the Bloom Gateway.
func New(cfg Config, schemaCfg config.SchemaConfig, storageCfg storage.Config, shardingStrategy ShardingStrategy, cm storage.ClientMetrics, logger log.Logger, reg prometheus.Registerer) (*Gateway, error) {
g := &Gateway{
-cfg: cfg,
-logger: logger,
-metrics: newMetrics(reg),
-sharding: shardingStrategy,
cfg: cfg,
logger: logger,
metrics: newMetrics("bloom_gateway", reg),
sharding: shardingStrategy,
pendingTasks: makePendingTasks(pendingTasksInitialCap),
}

g.queueMetrics = queue.NewMetrics("bloom_gateway", reg)
g.queue = queue.NewRequestQueue(maxTasksPerTenant, time.Minute, g.queueMetrics)
g.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(g.queueMetrics.Cleanup)

client, err := bloomshipper.NewBloomClient(schemaCfg.Configs, storageCfg, cm)
if err != nil {
return nil, err
@@ -99,18 +215,112 @@ func New(cfg Config, schemaCfg config.SchemaConfig, storageCfg storage.Config, s
}

g.bloomStore = bloomStore
-g.Service = services.NewIdleService(g.starting, g.stopping)

svcs := []services.Service{g.queue, g.activeUsers}
g.serviceMngr, err = services.NewManager(svcs...)
if err != nil {
return nil, err
}
g.serviceWatcher = services.NewFailureWatcher()
g.serviceWatcher.WatchManager(g.serviceMngr)

g.Service = services.NewBasicService(g.starting, g.running, g.stopping).WithName("bloom-gateway")

return g, nil
}

func (g *Gateway) starting(ctx context.Context) error {
var err error
defer func() {
if err == nil || g.serviceMngr == nil {
return
}
if err := services.StopManagerAndAwaitStopped(context.Background(), g.serviceMngr); err != nil {
level.Error(g.logger).Log("msg", "failed to gracefully stop bloom gateway dependencies", "err", err)
}
}()

if err := services.StartManagerAndAwaitHealthy(ctx, g.serviceMngr); err != nil {
return errors.Wrap(err, "unable to start bloom gateway subservices")
}

for i := 0; i < numWorkers; i++ {
go g.startWorker(ctx, fmt.Sprintf("worker-%d", i))
}

return nil
}

func (g *Gateway) running(ctx context.Context) error {
// We observe inflight tasks frequently and at regular intervals, to have a good
// approximation of the max inflight tasks over percentiles of time. We also use
// a ticker so that we keep tracking inflight tasks even if no new requests arrive
// but tasks are stuck (e.g. when all workers are exhausted).
inflightTasksTicker := time.NewTicker(250 * time.Millisecond)
defer inflightTasksTicker.Stop()

for {
select {
case <-ctx.Done():
return nil
case err := <-g.serviceWatcher.Chan():
return errors.Wrap(err, "bloom gateway subservice failed")
case <-inflightTasksTicker.C:
inflight := g.pendingTasks.Len()
g.metrics.inflightRequests.Observe(float64(inflight))
}
}
}

func (g *Gateway) stopping(_ error) error {
g.bloomStore.Stop()
-return nil
return services.StopManagerAndAwaitStopped(context.Background(), g.serviceMngr)
}

// This is just a dummy implementation of the worker!
// TODO(chaudum): Implement worker that dequeues multiple pending tasks and
// multiplexes them prior to execution.
func (g *Gateway) startWorker(_ context.Context, id string) error {
level.Info(g.logger).Log("msg", "starting worker", "worker", id)

g.queue.RegisterConsumerConnection(id)
defer g.queue.UnregisterConsumerConnection(id)

idx := queue.StartIndexWithLocalQueue

for {
ctx := context.Background()
item, newIdx, err := g.queue.Dequeue(ctx, idx, id)
if err != nil {
if err != queue.ErrStopped {
level.Error(g.logger).Log("msg", "failed to dequeue task", "worker", id, "err", err)
continue
}
level.Info(g.logger).Log("msg", "stopping worker", "worker", id)
return err
}
task, ok := item.(Task)
if !ok {
level.Error(g.logger).Log("msg", "failed to cast to Task", "item", item)
continue
}

idx = newIdx
level.Info(g.logger).Log("msg", "dequeued task", "worker", id, "task", task.ID)
g.pendingTasks.Delete(task.ID)

r := task.Request
if len(r.Filters) > 0 {
r.Refs, err = g.bloomStore.FilterChunkRefs(ctx, task.Tenant, r.From.Time(), r.Through.Time(), r.Refs, r.Filters...)
}
if err != nil {
task.ErrCh <- err
} else {
for _, ref := range r.Refs {
task.ResCh <- ref
}
}
}
}

// FilterChunkRefs implements BloomGatewayServer
@@ -131,15 +341,32 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
return req.Refs[i].Fingerprint < req.Refs[j].Fingerprint
})

-chunkRefs := req.Refs
task, resCh, errCh, err := newTask(tenantID, req)
if err != nil {
return nil, err
}

g.activeUsers.UpdateUserTimestamp(tenantID, time.Now())
level.Info(g.logger).Log("msg", "enqueue task", "task", task.ID)
g.queue.Enqueue(tenantID, []string{}, task, 100, func() {
// When enqueuing, we also add the task to the pending tasks
g.pendingTasks.Add(task.ID, task)
})

-// Only query bloom filters if filters are present
-if len(req.Filters) > 0 {
-chunkRefs, err = g.bloomStore.FilterChunkRefs(ctx, tenantID, req.From.Time(), req.Through.Time(), req.Refs, req.Filters...)
-if err != nil {
response := make([]*logproto.GroupedChunkRefs, 0, len(req.Refs))
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
case err := <-errCh:
return nil, err
case res := <-resCh:
level.Info(g.logger).Log("msg", "got result", "task", task.ID, "tenant", tenantID, "res", res)
// wait for all parts of the full response
response = append(response, res)
if len(response) == len(req.Refs) {
return &logproto.FilterChunkRefResponse{ChunkRefs: response}, nil
}
}
}

-return &logproto.FilterChunkRefResponse{ChunkRefs: chunkRefs}, nil
}