Skip to content

Commit

Permalink
feat(blooms): Apply task timeout in bloom builder (#14988)
Browse files Browse the repository at this point in the history
  • Loading branch information
salvacorts authored Nov 18, 2024
1 parent df7a8e4 commit f92dde0
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 33 deletions.
10 changes: 10 additions & 0 deletions pkg/bloombuild/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,12 @@ func (b *Builder) processTask(
logger := task.GetLogger(b.logger)
level.Debug(logger).Log("msg", "task started")

if timeout := b.limits.BuilderResponseTimeout(task.Tenant); timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithDeadline(ctx, time.Now().Add(timeout))
defer cancel()
}

client, err := b.bloomStore.Client(task.Table.ModelTime())
if err != nil {
level.Error(logger).Log("msg", "failed to get client", "err", err)
Expand All @@ -390,6 +396,10 @@ func (b *Builder) processTask(
)

for i := range task.Gaps {
if ctx.Err() != nil {
return nil, ctx.Err()
}

gap := task.Gaps[i]
logger := log.With(logger, "gap", gap.Bounds.String())

Expand Down
183 changes: 150 additions & 33 deletions pkg/bloombuild/builder/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"go.uber.org/atomic"
"google.golang.org/grpc"

"github.com/grafana/loki/v3/pkg/bloombuild/planner/plannertest"
"github.com/grafana/loki/v3/pkg/bloombuild/protos"
"github.com/grafana/loki/v3/pkg/storage"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
Expand All @@ -28,10 +29,7 @@ import (
"github.com/grafana/loki/v3/pkg/storage/types"
)

func Test_BuilderLoop(t *testing.T) {
logger := log.NewNopLogger()
//logger := log.NewLogfmtLogger(os.Stdout)

func setupBuilder(t *testing.T, plannerAddr string, limits Limits, logger log.Logger) *Builder {
schemaCfg := config.SchemaConfig{
Configs: []config.PeriodConfig{
{
Expand Down Expand Up @@ -64,22 +62,8 @@ func Test_BuilderLoop(t *testing.T) {
},
}

tasks := make([]*protos.ProtoTask, 256)
for i := range tasks {
tasks[i] = &protos.ProtoTask{
Id: fmt.Sprintf("task-%d", i),
}
}

server, err := newFakePlannerServer(tasks)
require.NoError(t, err)

// Start the server so the builder can connect and receive tasks.
server.Start()

limits := fakeLimits{}
cfg := Config{
PlannerAddress: server.Addr(),
PlannerAddress: plannerAddr,
BackoffConfig: backoff.Config{
MinBackoff: 1 * time.Second,
MaxBackoff: 10 * time.Second,
Expand All @@ -88,8 +72,48 @@ func Test_BuilderLoop(t *testing.T) {
}
flagext.DefaultValues(&cfg.GrpcConfig)

builder, err := New(cfg, limits, schemaCfg, storageCfg, storage.NewClientMetrics(), nil, fakeBloomStore{}, logger, prometheus.DefaultRegisterer, nil)
metrics := storage.NewClientMetrics()
metrics.Unregister()

builder, err := New(cfg, limits, schemaCfg, storageCfg, metrics, nil, fakeBloomStore{}, logger, prometheus.NewPedanticRegistry(), nil)
require.NoError(t, err)

return builder
}

// createTasks returns n proto tasks for the "fake" tenant on
// plannertest.TestTable. Task i covers fingerprints [i, i+10] and carries two
// gaps inside that range: [i+1, i+2] and [i+3, i+9].
func createTasks(n int) []*protos.ProtoTask {
	out := make([]*protos.ProtoTask, 0, n)
	for idx := 0; idx < n; idx++ {
		fp := model.Fingerprint(idx)

		// Two gaps per task, both inside the task bounds.
		gaps := []protos.Gap{
			{Bounds: v1.NewBounds(fp+1, fp+2)},
			{Bounds: v1.NewBounds(fp+3, fp+9)},
		}

		task := protos.NewTask(
			plannertest.TestTable,
			"fake",
			v1.NewBounds(fp, fp+10),
			plannertest.TsdbID(1),
			gaps,
		)
		out = append(out, task.ToProtoTask())
	}
	return out
}

func Test_BuilderLoop(t *testing.T) {
logger := log.NewNopLogger()
//logger := log.NewLogfmtLogger(os.Stdout)

tasks := createTasks(256)
server, err := newFakePlannerServer(tasks)
require.NoError(t, err)

// Start the server so the builder can connect and receive tasks.
server.Start()

builder := setupBuilder(t, server.Addr(), fakeLimits{}, logger)
t.Cleanup(func() {
err = services.StopAndAwaitTerminated(context.Background(), builder)
require.NoError(t, err)
Expand Down Expand Up @@ -128,9 +152,71 @@ func Test_BuilderLoop(t *testing.T) {
require.True(t, server.shutdownCalled)
}

// Test_BuilderLoop_Timeout runs the builder against a fake planner with
// different per-task timeouts and checks how many tasks are reported back as
// errored: none when the timeout is disabled or generous, and all of them when
// the timeout is so short that it expires immediately.
func Test_BuilderLoop_Timeout(t *testing.T) {
	for _, tc := range []struct {
		name            string
		timeout         time.Duration
		allTasksSucceed bool
	}{
		{
			name:            "no timeout configured",
			timeout:         0,
			allTasksSucceed: true,
		},
		{
			name:            "long enough timeout",
			timeout:         15 * time.Minute,
			allTasksSucceed: true,
		},
		{
			name:            "task times out",
			timeout:         1 * time.Nanosecond, // Pretty much immediately.
			allTasksSucceed: false,
		},
	} {
		t.Run(tc.name, func(t *testing.T) {
			logger := log.NewNopLogger()
			//logger := log.NewLogfmtLogger(os.Stdout)

			tasks := createTasks(256)
			server, err := newFakePlannerServer(tasks)
			require.NoError(t, err)

			// Start the server so the builder can connect and receive tasks.
			server.Start()
			// Register the server teardown on its own, right after Start, so the
			// server is stopped even if building or stopping the builder fails.
			// Cleanups run last-in-first-out, so the builder (registered below)
			// is still stopped before the server.
			t.Cleanup(func() {
				server.Stop()
			})

			limits := fakeLimits{
				taskTimout: tc.timeout,
			}
			builder := setupBuilder(t, server.Addr(), limits, logger)
			t.Cleanup(func() {
				// Use a local error instead of writing to the enclosing test's err.
				stopErr := services.StopAndAwaitTerminated(context.Background(), builder)
				require.NoError(t, stopErr)
			})

			err = services.StartAndAwaitRunning(context.Background(), builder)
			require.NoError(t, err)

			// Wait until the planner has received a response for every task.
			require.Eventually(t, func() bool {
				return server.CompletedTasks() >= len(tasks)
			}, 30*time.Second, 500*time.Millisecond)

			erroredTasks := server.ErroredTasks()
			if tc.allTasksSucceed {
				require.Equal(t, 0, erroredTasks)
			} else {
				require.Equal(t, len(tasks), erroredTasks)
			}
		})
	}
}

type fakePlannerServer struct {
tasks []*protos.ProtoTask
completedTasks atomic.Int64
erroredTasks atomic.Int64
shutdownCalled bool

listenAddr string
Expand Down Expand Up @@ -198,11 +284,18 @@ func (f *fakePlannerServer) BuilderLoop(srv protos.PlannerForBuilder_BuilderLoop
if err := srv.Send(&protos.PlannerToBuilder{Task: task}); err != nil {
return fmt.Errorf("failed to send task: %w", err)
}
if _, err := srv.Recv(); err != nil {

result, err := srv.Recv()
if err != nil {
return fmt.Errorf("failed to receive task response: %w", err)
}
time.Sleep(10 * time.Millisecond) // Simulate task processing time to add some latency.

f.completedTasks.Inc()
if result.Result.Error != "" {
f.erroredTasks.Inc()
}

time.Sleep(10 * time.Millisecond) // Simulate task processing time to add some latency.
}

// No more tasks. Wait until shutdown.
Expand All @@ -214,32 +307,36 @@ func (f *fakePlannerServer) CompletedTasks() int {
return int(f.completedTasks.Load())
}

// ErroredTasks reports how many task responses carried a non-empty error,
// read atomically from the server's counter.
func (f *fakePlannerServer) ErroredTasks() int {
	count := f.erroredTasks.Load()
	return int(count)
}

// NotifyBuilderShutdown records that a builder announced its shutdown and
// replies with an empty response. The context and request are ignored.
func (f *fakePlannerServer) NotifyBuilderShutdown(_ context.Context, _ *protos.NotifyBuilderShutdownRequest) (*protos.NotifyBuilderShutdownResponse, error) {
	resp := &protos.NotifyBuilderShutdownResponse{}
	// NOTE(review): shutdownCalled is a plain bool; if this handler runs on a
	// different goroutine than the test reading the flag, this is a data race — confirm.
	f.shutdownCalled = true
	return resp, nil
}

// fakeLimits is a test double for Limits. It embeds the Limits interface so
// only the methods a test needs have to be defined; the tests in this file
// build it without setting the embedded value, so any method that is not
// overridden would be called on a nil interface.
type fakeLimits struct {
Limits
// taskTimout is the value returned by BuilderResponseTimeout.
// NOTE(review): the name is misspelled ("Timout"); renaming it means touching
// every place that sets or reads the field.
taskTimout time.Duration
}

func (f fakeLimits) BloomBlockEncoding(_ string) string {
panic("implement me")
}

func (f fakeLimits) BloomNGramLength(_ string) int {
panic("implement me")
}
var _ Limits = fakeLimits{}

func (f fakeLimits) BloomNGramSkip(_ string) int {
panic("implement me")
func (f fakeLimits) BloomBlockEncoding(_ string) string {
return "none"
}

func (f fakeLimits) BloomMaxBlockSize(_ string) int {
panic("implement me")
return 0
}

func (f fakeLimits) BloomMaxBloomSize(_ string) int {
panic("implement me")
return 0
}

// BuilderResponseTimeout returns the timeout stored on the fake; the tenant
// argument is ignored, so every tenant gets the same value.
func (f fakeLimits) BuilderResponseTimeout(_ string) time.Duration {
return f.taskTimout
}

type fakeBloomStore struct {
Expand All @@ -250,6 +347,26 @@ func (f fakeBloomStore) BloomMetrics() *v1.Metrics {
return nil
}

// Client returns a fakeBloomClient for any timestamp and never fails.
func (f fakeBloomStore) Client(_ model.Time) (bloomshipper.Client, error) {
return fakeBloomClient{}, nil
}

// Fetcher returns a pointer to a zero-value bloomshipper.Fetcher for any
// timestamp and never fails.
func (f fakeBloomStore) Fetcher(_ model.Time) (*bloomshipper.Fetcher, error) {
return &bloomshipper.Fetcher{}, nil
}

// fakeBloomClient is a test double for bloomshipper.Client. It embeds the
// interface and overrides PutBlock and PutMeta in this file; fakeBloomStore
// constructs it without setting the embedded value.
type fakeBloomClient struct {
bloomshipper.Client
}

// PutBlock discards the block and reports success.
func (f fakeBloomClient) PutBlock(_ context.Context, _ bloomshipper.Block) error {
return nil
}

// PutMeta discards the meta and reports success.
func (f fakeBloomClient) PutMeta(_ context.Context, _ bloomshipper.Meta) error {
return nil
}

func parseDayTime(s string) config.DayTime {
t, err := time.Parse("2006-01-02", s)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/bloombuild/builder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package builder
import (
"flag"
"fmt"
"time"

"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/grpcclient"
Expand Down Expand Up @@ -40,4 +41,5 @@ type Limits interface {
BloomBlockEncoding(tenantID string) string
BloomMaxBlockSize(tenantID string) int
BloomMaxBloomSize(tenantID string) int
BuilderResponseTimeout(tenantID string) time.Duration
}

0 comments on commit f92dde0

Please sign in to comment.