Add e2e tests for bloom filtering #11645

Merged: 15 commits, Jan 12, 2024
Changes from 13 commits
19 changes: 14 additions & 5 deletions integration/client/client.go
@@ -479,12 +479,21 @@ type Header struct {
Name, Value string
}

// RunRangeQuery runs a query and returns an error if anything went wrong
// RunRangeQuery runs a 7d query and returns an error if anything went wrong
// This function is kept for backwards compatibility of existing tests.
// Prefer (*Client).RunRangeQueryWithStartEnd() instead.
func (c *Client) RunRangeQuery(ctx context.Context, query string, extraHeaders ...Header) (*Response, error) {
end := c.Now.Add(time.Second)
start := c.Now.Add(-7 * 24 * time.Hour)
return c.RunRangeQueryWithStartEnd(ctx, query, start, end, extraHeaders...)
}

// RunRangeQueryWithStartEnd runs a query over the given time range and returns an error if anything went wrong
func (c *Client) RunRangeQueryWithStartEnd(ctx context.Context, query string, start, end time.Time, extraHeaders ...Header) (*Response, error) {
ctx, cancelFunc := context.WithTimeout(ctx, requestTimeout)
defer cancelFunc()

buf, statusCode, err := c.run(ctx, c.rangeQueryURL(query), extraHeaders...)
buf, statusCode, err := c.run(ctx, c.rangeQueryURL(query, start, end), extraHeaders...)
if err != nil {
return nil, err
}
@@ -555,11 +564,11 @@ func (c *Client) parseResponse(buf []byte, statusCode int) (*Response, error) {
return &lokiResp, nil
}

func (c *Client) rangeQueryURL(query string) string {
func (c *Client) rangeQueryURL(query string, start, end time.Time) string {
v := url.Values{}
v.Set("query", query)
v.Set("start", formatTS(c.Now.Add(-7*24*time.Hour)))
v.Set("end", formatTS(c.Now.Add(time.Second)))
v.Set("start", formatTS(start))
v.Set("end", formatTS(end))

u, err := url.Parse(c.baseURL)
if err != nil {
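
For reviewers, a minimal sketch of how a test could call the new RunRangeQueryWithStartEnd helper with an explicit time range; the tenant name, query, and assertions below are illustrative only and not part of this PR:

```go
// Illustrative usage sketch (assumes a *testing.T named t and a running
// query-frontend component tQueryFrontend from the integration cluster).
cli := client.New("test-tenant", "", tQueryFrontend.HTTPURL())

end := time.Now()
start := end.Add(-1 * time.Hour)

// Query an explicit one-hour window instead of the fixed 7d window
// that the legacy RunRangeQuery helper uses.
resp, err := cli.RunRangeQueryWithStartEnd(context.Background(), `{job="varlog"}`, start, end)
require.NoError(t, err)
require.NotEmpty(t, resp.Data.Stream)
```
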
26 changes: 19 additions & 7 deletions integration/cluster/cluster.go
@@ -43,7 +43,6 @@ server:
grpc_server_max_recv_msg_size: 110485813
grpc_server_max_send_msg_size: 110485813


common:
path_prefix: {{.dataPath}}
storage:
@@ -70,14 +69,25 @@ storage_config:
store-1:
directory: {{.sharedDataPath}}/fs-store-1
boltdb_shipper:
active_index_directory: {{.dataPath}}/index
active_index_directory: {{.dataPath}}/boltdb-index
cache_location: {{.dataPath}}/boltdb-cache
tsdb_shipper:
active_index_directory: {{.dataPath}}/tsdb-index
cache_location: {{.dataPath}}/tsdb-cache
bloom_shipper:
working_directory: {{.dataPath}}/bloom-shipper
blocks_downloading_queue:
workers_count: 1

bloom_gateway:
enabled: false

bloom_compactor:
enabled: false
working_directory: {{.dataPath}}/bloom-compactor

compactor:
working_directory: {{.dataPath}}/retention
working_directory: {{.dataPath}}/compactor
retention_enabled: true
delete_request_store: store-1

@@ -154,14 +164,14 @@ func New(logLevel level.Value, opts ...func(*Cluster)) *Cluster {
}

resetMetricRegistry()
sharedPath, err := os.MkdirTemp("", "loki-shared-data")
sharedPath, err := os.MkdirTemp("", "loki-shared-data-")
if err != nil {
panic(err.Error())
}

overridesFile := filepath.Join(sharedPath, "loki-overrides.yaml")

err = os.WriteFile(filepath.Join(sharedPath, "loki-overrides.yaml"), []byte(`overrides:`), 0777)
err = os.WriteFile(overridesFile, []byte(`overrides:`), 0777)
if err != nil {
panic(fmt.Errorf("error creating overrides file: %w", err))
}
@@ -318,12 +328,12 @@ func port(addr string) string {
func (c *Component) writeConfig() error {
var err error

configFile, err := os.CreateTemp("", "loki-config")
configFile, err := os.CreateTemp("", fmt.Sprintf("loki-%s-config-*.yaml", c.name))
if err != nil {
return fmt.Errorf("error creating config file: %w", err)
}

c.dataPath, err = os.MkdirTemp("", "loki-data")
c.dataPath, err = os.MkdirTemp("", fmt.Sprintf("loki-%s-data-", c.name))
if err != nil {
return fmt.Errorf("error creating data path: %w", err)
}
@@ -408,6 +418,8 @@ func (c *Component) run() error {
c.configFile,
"-limits.per-user-override-config",
c.overridesFile,
"-limits.per-user-override-period",
"1s",
), flagset); err != nil {
return err
}
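
Setting -limits.per-user-override-period to 1s lets tests change per-tenant limits at runtime by rewriting the shared overrides file. A hedged sketch of what that could look like; the tenant name and limit value are made-up examples, not something this PR configures:

```go
// Illustrative sketch: runtime overrides are plain YAML under an `overrides:`
// root. "test-tenant" and the 30d lookback are hypothetical example values.
contents := []byte(`overrides:
  "test-tenant":
    max_query_lookback: 30d
`)
// With -limits.per-user-override-period=1s, the new limits should be picked
// up within roughly a second of rewriting the shared overrides file.
err := os.WriteFile(overridesFile, contents, 0777)
require.NoError(t, err)
```
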
171 changes: 171 additions & 0 deletions integration/loki_micro_services_test.go
@@ -3,11 +3,14 @@ package integration
import (
"context"
"encoding/json"
"fmt"
"math/rand"
"strings"
"sync"
"testing"
"time"

"github.com/go-kit/log/level"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/prometheus/model/labels"
@@ -1056,6 +1059,174 @@ func TestCategorizedLabels(t *testing.T) {
}
}

func TestBloomFiltersEndToEnd(t *testing.T) {
commonFlags := []string{
"-bloom-compactor.compaction-interval=2s",
"-bloom-compactor.enable-compaction=true",
"-bloom-compactor.enabled=true",
"-bloom-gateway.enable-filtering=true",
"-bloom-gateway.enabled=true",
"-compactor.compaction-interval=1s",
"-frontend.default-validity=0s",
"-ingester.flush-on-shutdown=true",
"-ingester.wal-enabled=false",
"-query-scheduler.use-scheduler-ring=false",
"-store.index-cache-read.embedded-cache.enabled=true",
}

tenantID := randStringRunes()

clu := cluster.New(
level.DebugValue(),
cluster.SchemaWithTSDB,
func(c *cluster.Cluster) { c.SetSchemaVer("v13") },
)

defer func() {
assert.NoError(t, clu.Cleanup())
}()

var (
tDistributor = clu.AddComponent(
"distributor",
append(
commonFlags,
"-target=distributor",
)...,
)
tIndexGateway = clu.AddComponent(
"index-gateway",
append(
commonFlags,
"-target=index-gateway",
)...,
)
_ = clu.AddComponent(
"bloom-gateway",
append(
commonFlags,
"-target=bloom-gateway",
)...,
)
)
require.NoError(t, clu.Run())

var (
tIngester = clu.AddComponent(
"ingester",
append(
commonFlags,
"-target=ingester",
"-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
)...,
)
tQueryScheduler = clu.AddComponent(
"query-scheduler",
append(
commonFlags,
"-target=query-scheduler",
"-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
)...,
)
tCompactor = clu.AddComponent(
"compactor",
append(
commonFlags,
"-target=compactor",
"-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
)...,
)
_ = clu.AddComponent(
"bloom-compactor",
append(
commonFlags,
"-target=bloom-compactor",
"-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
)...,
)
)
require.NoError(t, clu.Run())

// finally, run the query-frontend and querier.
var (
tQueryFrontend = clu.AddComponent(
"query-frontend",
append(
commonFlags,
"-target=query-frontend",
"-frontend.scheduler-address="+tQueryScheduler.GRPCURL(),
"-common.compactor-address="+tCompactor.HTTPURL(),
"-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
)...,
)
_ = clu.AddComponent(
"querier",
append(
commonFlags,
"-target=querier",
"-querier.scheduler-address="+tQueryScheduler.GRPCURL(),
"-common.compactor-address="+tCompactor.HTTPURL(),
"-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
)...,
)
)
require.NoError(t, clu.Run())

now := time.Now()

cliDistributor := client.New(tenantID, "", tDistributor.HTTPURL())
cliDistributor.Now = now

cliIngester := client.New(tenantID, "", tIngester.HTTPURL())
cliIngester.Now = now

cliQueryFrontend := client.New(tenantID, "", tQueryFrontend.HTTPURL())
cliQueryFrontend.Now = now

cliIndexGateway := client.New(tenantID, "", tIndexGateway.HTTPURL())
cliIndexGateway.Now = now

lineTpl := `caller=loki_micro_services_test.go msg="push log line" id="%s"`
// ingest logs from 10 different pods
// each line contains a random, unique string
// that string is used to verify filtering using bloom gateway
uniqueStrings := make([]string, 600)
for i := 0; i < len(uniqueStrings); i++ {
id := randStringRunes()
id = fmt.Sprintf("%s-%d", id, i)
uniqueStrings[i] = id
pod := fmt.Sprintf("pod-%d", i%10)
line := fmt.Sprintf(lineTpl, id)
err := cliDistributor.PushLogLine(line, now.Add(-1*time.Hour).Add(time.Duration(i-len(uniqueStrings))*time.Second), nil, map[string]string{"pod": pod})
require.NoError(t, err)
}

// restart ingester to flush chunks and to ensure there are zero chunks in memory
require.NoError(t, cliIngester.Flush())
require.NoError(t, tIngester.Restart())

// wait for compactor to compact index and for bloom compactor to build bloom filters
time.Sleep(10 * time.Second)

// use bloom gateway to perform needle in the haystack queries
randIdx := rand.Intn(len(uniqueStrings))
q := fmt.Sprintf(`{job="varlog"} |= "%s"`, uniqueStrings[randIdx])
end := now.Add(-1 * time.Second)
start := end.Add(-24 * time.Hour)
resp, err := cliQueryFrontend.RunRangeQueryWithStartEnd(context.Background(), q, start, end)
require.NoError(t, err)

// verify response
require.Len(t, resp.Data.Stream, 1)
expectedLine := fmt.Sprintf(lineTpl, uniqueStrings[randIdx])
require.Equal(t, expectedLine, resp.Data.Stream[0].Values[0][1])

// TODO(chaudum):
// verify that bloom blocks have actually been used for querying
// at the moment we can only verify via logs, so we should add appropriate metrics for
// uploaded/downloaded blocks and metas
}

func getValueFromMF(mf *dto.MetricFamily, lbs []*dto.LabelPair) float64 {
for _, m := range mf.Metric {
if !assert.ObjectsAreEqualValues(lbs, m.GetLabel()) {
13 changes: 11 additions & 2 deletions pkg/bloomcompactor/bloomcompactor.go
@@ -51,6 +51,7 @@ import (
"github.com/grafana/loki/pkg/storage"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
chunk_client "github.com/grafana/loki/pkg/storage/chunk/client"
"github.com/grafana/loki/pkg/storage/chunk/client/local"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper"
@@ -166,10 +167,18 @@ func New(
return nil, errors.Wrap(err, "create index shipper")
}

// The ObjectClient does not expose the key encoder it uses,
// so check the concrete type and set the FSEncoder if needed.
var keyEncoder chunk_client.KeyEncoder
switch objectClient.(type) {
case *local.FSObjectClient:
keyEncoder = chunk_client.FSEncoder
}

c.storeClients[periodicConfig.From] = storeClient{
object: objectClient,
index: index_storage.NewIndexStorageClient(objectClient, periodicConfig.IndexTables.PathPrefix),
chunk: chunk_client.NewClient(objectClient, nil, schemaConfig),
chunk: chunk_client.NewClient(objectClient, keyEncoder, schemaConfig),
indexShipper: indexShipper,
}
}
@@ -275,7 +284,7 @@ func (c *Compactor) compactTable(ctx context.Context, logger log.Logger, tableNa
return fmt.Errorf("index store client not found for period starting at %s", schemaCfg.From.String())
}

_, tenants, err := sc.index.ListFiles(ctx, tableName, false)
_, tenants, err := sc.index.ListFiles(ctx, tableName, true)
if err != nil {
return fmt.Errorf("failed to list files for table %s: %w", tableName, err)
}
7 changes: 3 additions & 4 deletions pkg/bloomgateway/bloomgateway.go
@@ -180,9 +180,8 @@ func New(cfg Config, schemaCfg config.SchemaConfig, storageCfg storage.Config, o
sharding: shardingStrategy,
pendingTasks: makePendingTasks(pendingTasksInitialCap),
workerConfig: workerConfig{
maxWaitTime: 200 * time.Millisecond,
maxItems: 100,
processBlocksSequentially: false,
maxWaitTime: 200 * time.Millisecond,
maxItems: 100,
},
workerMetrics: newWorkerMetrics(reg, constants.Loki, metricsSubsystem),
queueMetrics: queue.NewMetrics(reg, constants.Loki, metricsSubsystem),
@@ -323,7 +322,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
case res := <-resCh:
responses = append(responses, res)
// log line is helpful for debugging tests
// level.Debug(g.logger).Log("msg", "got partial result", "task", task.ID, "tenant", tenantID, "fp", uint64(res.Fp), "chunks", res.Removals.Len(), "progress", fmt.Sprintf("%d/%d", len(responses), requestCount))
level.Debug(g.logger).Log("msg", "got partial result", "task", task.ID, "tenant", tenantID, "fp_int", uint64(res.Fp), "fp_hex", res.Fp, "chunks_to_remove", res.Removals.Len(), "progress", fmt.Sprintf("%d/%d", len(responses), requestCount))
// wait for all parts of the full response
if len(responses) == requestCount {
for _, o := range responses {