Skip to content

Commit

Permalink
Merge branch 'main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
MichelHollands authored Jan 12, 2024
2 parents 95a2171 + 5517eaa commit 25d5add
Show file tree
Hide file tree
Showing 26 changed files with 768 additions and 298 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@

##### Enhancements

* [11571](https://github.com/grafana/loki/pull/11571) **MichelHollands**: Add a metrics.go log line for requests from querier to ingester
* [11633](https://github.com/grafana/loki/pull/11633) **cyriltovena**: Add profiling integrations to tracing instrumentation.
* [11571](https://github.com/grafana/loki/pull/11571) **MichelHollands**: Add a metrics.go log line for requests from querier to ingester
* [11477](https://github.com/grafana/loki/pull/11477) **MichelHollands**: support GET for /ingester/shutdown
* [11363](https://github.com/grafana/loki/pull/11363) **kavirajk**: bugfix(memcached): Make memcached batch fetch truely context aware.
* [11319](https://github.com/grafana/loki/pull/11319) **someStrangerFromTheAbyss**: Helm: Add extraContainers to the write pods.
Expand Down Expand Up @@ -57,6 +58,7 @@
* [11601](https://github.com/grafana/loki/pull/11601) **dannykopping** Ruler: Fixed a panic that can be caused by concurrent read-write access of tenant configs when there are a large amount of rules.
* [11606](https://github.com/grafana/loki/pull/11606) **dannykopping** Fixed regression adding newlines to HTTP error response bodies which may break client integrations.
* [11657](https://github.com/grafana/loki/pull/11657) **ashwanthgoli** Log results cache: compose empty response based on the request being served to avoid returning incorrect limit or direction.
* [11587](https://github.com/grafana/loki/pull/11587) **trevorwhitney** Fix semantics of label parsing logic of metrics and logs queries. Both only parse the first label if multiple extractions into the same label are requested.

##### Changes

Expand Down
6 changes: 5 additions & 1 deletion cmd/loki/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import (

"github.com/go-kit/log/level"
"github.com/grafana/dskit/log"
"github.com/grafana/dskit/spanprofiler"
"github.com/grafana/dskit/tracing"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/version"

Expand Down Expand Up @@ -84,7 +86,9 @@ func main() {
if err != nil {
level.Error(util_log.Logger).Log("msg", "error in initializing tracing. tracing will not be enabled", "err", err)
}

if config.Tracing.ProfilingEnabled {
opentracing.SetGlobalTracer(spanprofiler.NewTracer(opentracing.GlobalTracer()))
}
defer func() {
if trace != nil {
if err := trace.Close(); err != nil {
Expand Down
19 changes: 14 additions & 5 deletions integration/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,12 +479,21 @@ type Header struct {
Name, Value string
}

// RunRangeQuery runs a query and returns an error if anything went wrong
// RunRangeQuery runs a 7d query and returns an error if anything went wrong
// This function is kept to keep backwards copatibility of existing tests.
// Better use (*Client).RunRangeQueryWithStartEnd()
func (c *Client) RunRangeQuery(ctx context.Context, query string, extraHeaders ...Header) (*Response, error) {
end := c.Now.Add(time.Second)
start := c.Now.Add(-7 * 24 * time.Hour)
return c.RunRangeQueryWithStartEnd(ctx, query, start, end, extraHeaders...)
}

// RunRangeQuery runs a query and returns an error if anything went wrong
func (c *Client) RunRangeQueryWithStartEnd(ctx context.Context, query string, start, end time.Time, extraHeaders ...Header) (*Response, error) {
ctx, cancelFunc := context.WithTimeout(ctx, requestTimeout)
defer cancelFunc()

buf, statusCode, err := c.run(ctx, c.rangeQueryURL(query), extraHeaders...)
buf, statusCode, err := c.run(ctx, c.rangeQueryURL(query, start, end), extraHeaders...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -555,11 +564,11 @@ func (c *Client) parseResponse(buf []byte, statusCode int) (*Response, error) {
return &lokiResp, nil
}

func (c *Client) rangeQueryURL(query string) string {
func (c *Client) rangeQueryURL(query string, start, end time.Time) string {
v := url.Values{}
v.Set("query", query)
v.Set("start", formatTS(c.Now.Add(-7*24*time.Hour)))
v.Set("end", formatTS(c.Now.Add(time.Second)))
v.Set("start", formatTS(start))
v.Set("end", formatTS(end))

u, err := url.Parse(c.baseURL)
if err != nil {
Expand Down
26 changes: 19 additions & 7 deletions integration/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ server:
grpc_server_max_recv_msg_size: 110485813
grpc_server_max_send_msg_size: 110485813
common:
path_prefix: {{.dataPath}}
storage:
Expand All @@ -70,14 +69,25 @@ storage_config:
store-1:
directory: {{.sharedDataPath}}/fs-store-1
boltdb_shipper:
active_index_directory: {{.dataPath}}/index
active_index_directory: {{.dataPath}}/boltdb-index
cache_location: {{.dataPath}}/boltdb-cache
tsdb_shipper:
active_index_directory: {{.dataPath}}/tsdb-index
cache_location: {{.dataPath}}/tsdb-cache
bloom_shipper:
working_directory: {{.dataPath}}/bloom-shipper
blocks_downloading_queue:
workers_count: 1
bloom_gateway:
enabled: false
bloom_compactor:
enabled: false
working_directory: {{.dataPath}}/bloom-compactor
compactor:
working_directory: {{.dataPath}}/retention
working_directory: {{.dataPath}}/compactor
retention_enabled: true
delete_request_store: store-1
Expand Down Expand Up @@ -154,14 +164,14 @@ func New(logLevel level.Value, opts ...func(*Cluster)) *Cluster {
}

resetMetricRegistry()
sharedPath, err := os.MkdirTemp("", "loki-shared-data")
sharedPath, err := os.MkdirTemp("", "loki-shared-data-")
if err != nil {
panic(err.Error())
}

overridesFile := filepath.Join(sharedPath, "loki-overrides.yaml")

err = os.WriteFile(filepath.Join(sharedPath, "loki-overrides.yaml"), []byte(`overrides:`), 0777)
err = os.WriteFile(overridesFile, []byte(`overrides:`), 0777)
if err != nil {
panic(fmt.Errorf("error creating overrides file: %w", err))
}
Expand Down Expand Up @@ -318,12 +328,12 @@ func port(addr string) string {
func (c *Component) writeConfig() error {
var err error

configFile, err := os.CreateTemp("", "loki-config")
configFile, err := os.CreateTemp("", fmt.Sprintf("loki-%s-config-*.yaml", c.name))
if err != nil {
return fmt.Errorf("error creating config file: %w", err)
}

c.dataPath, err = os.MkdirTemp("", "loki-data")
c.dataPath, err = os.MkdirTemp("", fmt.Sprintf("loki-%s-data-", c.name))
if err != nil {
return fmt.Errorf("error creating data path: %w", err)
}
Expand Down Expand Up @@ -408,6 +418,8 @@ func (c *Component) run() error {
c.configFile,
"-limits.per-user-override-config",
c.overridesFile,
"-limits.per-user-override-period",
"1s",
), flagset); err != nil {
return err
}
Expand Down
171 changes: 171 additions & 0 deletions integration/loki_micro_services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@ package integration
import (
"context"
"encoding/json"
"fmt"
"math/rand"
"strings"
"sync"
"testing"
"time"

"github.com/go-kit/log/level"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/prometheus/model/labels"
Expand Down Expand Up @@ -1056,6 +1059,174 @@ func TestCategorizedLabels(t *testing.T) {
}
}

func TestBloomFiltersEndToEnd(t *testing.T) {
commonFlags := []string{
"-bloom-compactor.compaction-interval=2s",
"-bloom-compactor.enable-compaction=true",
"-bloom-compactor.enabled=true",
"-bloom-gateway.enable-filtering=true",
"-bloom-gateway.enabled=true",
"-compactor.compaction-interval=1s",
"-frontend.default-validity=0s",
"-ingester.flush-on-shutdown=true",
"-ingester.wal-enabled=false",
"-query-scheduler.use-scheduler-ring=false",
"-store.index-cache-read.embedded-cache.enabled=true",
}

tenantID := randStringRunes()

clu := cluster.New(
level.DebugValue(),
cluster.SchemaWithTSDB,
func(c *cluster.Cluster) { c.SetSchemaVer("v13") },
)

defer func() {
assert.NoError(t, clu.Cleanup())
}()

var (
tDistributor = clu.AddComponent(
"distributor",
append(
commonFlags,
"-target=distributor",
)...,
)
tIndexGateway = clu.AddComponent(
"index-gateway",
append(
commonFlags,
"-target=index-gateway",
)...,
)
_ = clu.AddComponent(
"bloom-gateway",
append(
commonFlags,
"-target=bloom-gateway",
)...,
)
)
require.NoError(t, clu.Run())

var (
tIngester = clu.AddComponent(
"ingester",
append(
commonFlags,
"-target=ingester",
"-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
)...,
)
tQueryScheduler = clu.AddComponent(
"query-scheduler",
append(
commonFlags,
"-target=query-scheduler",
"-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
)...,
)
tCompactor = clu.AddComponent(
"compactor",
append(
commonFlags,
"-target=compactor",
"-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
)...,
)
_ = clu.AddComponent(
"bloom-compactor",
append(
commonFlags,
"-target=bloom-compactor",
"-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
)...,
)
)
require.NoError(t, clu.Run())

// finally, run the query-frontend and querier.
var (
tQueryFrontend = clu.AddComponent(
"query-frontend",
append(
commonFlags,
"-target=query-frontend",
"-frontend.scheduler-address="+tQueryScheduler.GRPCURL(),
"-common.compactor-address="+tCompactor.HTTPURL(),
"-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
)...,
)
_ = clu.AddComponent(
"querier",
append(
commonFlags,
"-target=querier",
"-querier.scheduler-address="+tQueryScheduler.GRPCURL(),
"-common.compactor-address="+tCompactor.HTTPURL(),
"-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
)...,
)
)
require.NoError(t, clu.Run())

now := time.Now()

cliDistributor := client.New(tenantID, "", tDistributor.HTTPURL())
cliDistributor.Now = now

cliIngester := client.New(tenantID, "", tIngester.HTTPURL())
cliIngester.Now = now

cliQueryFrontend := client.New(tenantID, "", tQueryFrontend.HTTPURL())
cliQueryFrontend.Now = now

cliIndexGateway := client.New(tenantID, "", tIndexGateway.HTTPURL())
cliIndexGateway.Now = now

lineTpl := `caller=loki_micro_services_test.go msg="push log line" id="%s"`
// ingest logs from 10 different pods
// each line contains a random, unique string
// that string is used to verify filtering using bloom gateway
uniqueStrings := make([]string, 600)
for i := 0; i < len(uniqueStrings); i++ {
id := randStringRunes()
id = fmt.Sprintf("%s-%d", id, i)
uniqueStrings[i] = id
pod := fmt.Sprintf("pod-%d", i%10)
line := fmt.Sprintf(lineTpl, id)
err := cliDistributor.PushLogLine(line, now.Add(-1*time.Hour).Add(time.Duration(i-len(uniqueStrings))*time.Second), nil, map[string]string{"pod": pod})
require.NoError(t, err)
}

// restart ingester to flush chunks and that there are zero chunks in memory
require.NoError(t, cliIngester.Flush())
require.NoError(t, tIngester.Restart())

// wait for compactor to compact index and for bloom compactor to build bloom filters
time.Sleep(10 * time.Second)

// use bloom gateway to perform needle in the haystack queries
randIdx := rand.Intn(len(uniqueStrings))
q := fmt.Sprintf(`{job="varlog"} |= "%s"`, uniqueStrings[randIdx])
end := now.Add(-1 * time.Second)
start := end.Add(-24 * time.Hour)
resp, err := cliQueryFrontend.RunRangeQueryWithStartEnd(context.Background(), q, start, end)
require.NoError(t, err)

// verify response
require.Len(t, resp.Data.Stream, 1)
expectedLine := fmt.Sprintf(lineTpl, uniqueStrings[randIdx])
require.Equal(t, expectedLine, resp.Data.Stream[0].Values[0][1])

// TODO(chaudum):
// verify that bloom blocks have actually been used for querying
// atm, we can only verify by logs, so we should add appropriate metrics for
// uploaded/downloaded blocks and metas
}

func getValueFromMF(mf *dto.MetricFamily, lbs []*dto.LabelPair) float64 {
for _, m := range mf.Metric {
if !assert.ObjectsAreEqualValues(lbs, m.GetLabel()) {
Expand Down
13 changes: 11 additions & 2 deletions pkg/bloomcompactor/bloomcompactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import (
"github.com/grafana/loki/pkg/storage"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
chunk_client "github.com/grafana/loki/pkg/storage/chunk/client"
"github.com/grafana/loki/pkg/storage/chunk/client/local"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper"
Expand Down Expand Up @@ -166,10 +167,18 @@ func New(
return nil, errors.Wrap(err, "create index shipper")
}

// The ObjectClient does not expose the key encoder it uses,
// so check the concrete type and set the FSEncoder if needed.
var keyEncoder chunk_client.KeyEncoder
switch objectClient.(type) {
case *local.FSObjectClient:
keyEncoder = chunk_client.FSEncoder
}

c.storeClients[periodicConfig.From] = storeClient{
object: objectClient,
index: index_storage.NewIndexStorageClient(objectClient, periodicConfig.IndexTables.PathPrefix),
chunk: chunk_client.NewClient(objectClient, nil, schemaConfig),
chunk: chunk_client.NewClient(objectClient, keyEncoder, schemaConfig),
indexShipper: indexShipper,
}
}
Expand Down Expand Up @@ -275,7 +284,7 @@ func (c *Compactor) compactTable(ctx context.Context, logger log.Logger, tableNa
return fmt.Errorf("index store client not found for period starting at %s", schemaCfg.From.String())
}

_, tenants, err := sc.index.ListFiles(ctx, tableName, false)
_, tenants, err := sc.index.ListFiles(ctx, tableName, true)
if err != nil {
return fmt.Errorf("failed to list files for table %s: %w", tableName, err)
}
Expand Down
Loading

0 comments on commit 25d5add

Please sign in to comment.