Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Detected labels from store #12441

Merged
merged 40 commits into from
Apr 23, 2024
Merged
Show file tree
Hide file tree
Changes from 37 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
25eedf8
Fetch labels and values from store
shantanualsi Apr 3, 2024
50b2c4b
Dedupe label and values from ingester and store
shantanualsi Apr 3, 2024
9398dfa
Enforce some validations and timeout
shantanualsi Apr 3, 2024
97cf8a3
Fetch label values from index
shantanualsi Apr 3, 2024
30329f7
Fix passing matchers in ingesters
shantanualsi Apr 3, 2024
4ac58f3
Add basic querier tests for Detected Labels
shantanualsi Apr 8, 2024
adab10d
Log on error in type conversion in detected labels
shantanualsi Apr 8, 2024
a6cbec0
Improve querier test
shantanualsi Apr 8, 2024
6c830c9
Add test for ingester_querier
shantanualsi Apr 8, 2024
78d831f
Format
shantanualsi Apr 8, 2024
384a52c
Add tests to ingester instance
shantanualsi Apr 10, 2024
32db0cf
Merge branch 'main' into detected-labels-minor-enhancements
shantanualsi Apr 10, 2024
4f9d6c8
Add ingester tests for detected labels
shantanualsi Apr 10, 2024
c7a3018
Merge branch 'main' into detected-labels-minor-enhancements
shantanualsi Apr 10, 2024
ab5d3c8
Merge branch 'main' into detected-labels-store
shantanualsi Apr 10, 2024
189c23b
Add logic to fetch labels from store
shantanualsi Apr 10, 2024
be64851
Merge branch 'detected-labels-minor-enhancements' into detected-label…
shantanualsi Apr 10, 2024
8f3c297
Fix querier tests
shantanualsi Apr 10, 2024
5eb40be
Add tests
shantanualsi Apr 10, 2024
949928c
Fix lint
shantanualsi Apr 10, 2024
46102d1
Add metrics to detected labels
shantanualsi Apr 11, 2024
83da625
Merge branch 'main' into detected-labels-store
shantanualsi Apr 11, 2024
7719a36
Merge branch 'main' into detected-labels-store
shantanualsi Apr 12, 2024
9d81685
Fix assigning back to map in detected labels
shantanualsi Apr 15, 2024
8ceac34
Fixes to ingester_querier and tests
shantanualsi Apr 15, 2024
f2244fa
Minor improvements as per PR comments
shantanualsi Apr 15, 2024
9637401
Merge branch 'main' into detected-labels-store
shantanualsi Apr 15, 2024
1e19282
Merge branch 'main' into detected-labels-store
shantanualsi Apr 17, 2024
cab219f
Convert table tests to methods
shantanualsi Apr 17, 2024
e26c62d
Convert querier table tests to methods
shantanualsi Apr 17, 2024
7819d81
Merge branch 'main' into detected-labels-store
shantanualsi Apr 19, 2024
24d90c4
Add detected label ingester test with query
shantanualsi Apr 15, 2024
a075f57
Add detected labels tripperware
shantanualsi Apr 19, 2024
cdc11f9
Comment cache metrics
shantanualsi Apr 19, 2024
fd3fd8e
Merge branch 'main' into detected-labels-store
shantanualsi Apr 19, 2024
3b26ba6
Merge branch 'main' into detected-labels-store
shantanualsi Apr 21, 2024
98e5098
Merge branch 'main' into detected-labels-store
shantanualsi Apr 22, 2024
4f49219
Change static labels to map
shantanualsi Apr 22, 2024
5a3d3c2
Merge branch 'main' into detected-labels-store
shantanualsi Apr 23, 2024
311db24
Merge branch 'main' into detected-labels-store
shantanualsi Apr 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -1395,7 +1395,6 @@ func (i *Ingester) GetDetectedLabels(ctx context.Context, req *logproto.Detected
if err != nil {
return nil, err
}
level.Info(i.logger).Log("msg", matchers)
}

labelMap, err := instance.LabelsWithValues(ctx, *req.Start, matchers...)
Expand Down
123 changes: 123 additions & 0 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,129 @@ func Test_InMemoryLabels(t *testing.T) {
require.Equal(t, []string{"bar", "foo"}, res.Values)
}

func TestIngester_GetDetectedLabels(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "test")

ingesterConfig := defaultIngesterTestConfig(t)
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)
store := &mockStore{
chunks: map[string][]chunk.Chunk{},
}

i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger())
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

// Push labels
req := logproto.PushRequest{
Streams: []logproto.Stream{
{
Labels: `{foo="bar",bar="baz1"}`,
},
{
Labels: `{foo="bar",bar="baz2"}`,
},
{
Labels: `{foo="bar1",bar="baz3"}`,
},
{
Labels: `{foo="foo1",bar="baz1"}`,
},
{
Labels: `{foo="foo",bar="baz1"}`,
},
},
}
for i := 0; i < 10; i++ {
req.Streams[0].Entries = append(req.Streams[0].Entries, logproto.Entry{
Timestamp: time.Unix(0, 0),
Line: fmt.Sprintf("line %d", i),
})
req.Streams[1].Entries = append(req.Streams[1].Entries, logproto.Entry{
Timestamp: time.Unix(0, 0),
Line: fmt.Sprintf("line %d", i),
})
}

_, err = i.Push(ctx, &req)
require.NoError(t, err)

res, err := i.GetDetectedLabels(ctx, &logproto.DetectedLabelsRequest{
Start: &[]time.Time{time.Now().Add(11 * time.Nanosecond)}[0],
End: nil,
Query: "",
shantanualsi marked this conversation as resolved.
Show resolved Hide resolved
})

require.NoError(t, err)
fooValues, ok := res.Labels["foo"]
require.True(t, ok)
barValues, ok := res.Labels["bar"]
require.True(t, ok)
require.Equal(t, 4, len(fooValues.Values))
require.Equal(t, 3, len(barValues.Values))
}

// TestIngester_GetDetectedLabelsWithQuery pushes four streams and checks that
// GetDetectedLabels with the selector {foo="bar"} only reports label values
// taken from the two streams carrying foo="bar".
func TestIngester_GetDetectedLabelsWithQuery(t *testing.T) {
	ctx := user.InjectOrgID(context.Background(), "test")

	cfg := defaultIngesterTestConfig(t)
	overrides, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
	require.NoError(t, err)
	chunkStore := &mockStore{chunks: map[string][]chunk.Chunk{}}

	ing, err := New(cfg, client.Config{}, chunkStore, overrides, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger())
	require.NoError(t, err)
	defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck

	// Streams to push; only the first two match the query used below.
	pushReq := logproto.PushRequest{
		Streams: []logproto.Stream{
			{Labels: `{foo="bar",bar="baz1"}`},
			{Labels: `{foo="bar",bar="baz2"}`},
			{Labels: `{foo="bar1",bar="baz3"}`},
			{Labels: `{foo="foo1",bar="baz4"}`},
		},
	}
	// Ten identical-timestamp lines go to each of the first two streams.
	for n := 0; n < 10; n++ {
		entry := logproto.Entry{
			Timestamp: time.Unix(0, 0),
			Line:      fmt.Sprintf("line %d", n),
		}
		req0, req1 := &pushReq.Streams[0], &pushReq.Streams[1]
		req0.Entries = append(req0.Entries, entry)
		req1.Entries = append(req1.Entries, entry)
	}

	_, err = ing.Push(ctx, &pushReq)
	require.NoError(t, err)

	start := time.Now().Add(11 * time.Nanosecond)
	detectedReq := &logproto.DetectedLabelsRequest{
		Start: &start,
		End:   nil,
		Query: `{foo="bar"}`,
	}
	res, err := ing.GetDetectedLabels(ctx, detectedReq)
	require.NoError(t, err)

	fooValues, found := res.Labels["foo"]
	require.True(t, found)
	barValues, found := res.Labels["bar"]
	require.True(t, found)

	// foo: bar — bar: baz1, baz2.
	require.Equal(t, 1, len(fooValues.Values))
	require.Equal(t, 2, len(barValues.Values))
}

func Test_DedupeIngester(t *testing.T) {
var (
requests = int64(400)
Expand Down
26 changes: 24 additions & 2 deletions pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,9 +588,31 @@ type UniqueValues map[string]struct{}

// LabelsWithValues returns the label names with all the unique values depending on the request
func (i *instance) LabelsWithValues(ctx context.Context, startTime time.Time, matchers ...*labels.Matcher) (map[string]UniqueValues, error) {
// TODO (shantanu): Figure out how to get the label names from index directly when no matchers are given.

labelMap := make(map[string]UniqueValues)
if len(matchers) == 0 {
labelsFromIndex, err := i.index.LabelNames(startTime, nil)
shantanualsi marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}

for _, label := range labelsFromIndex {
values, err := i.index.LabelValues(startTime, label, nil)
shantanualsi marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}
existingValues, exists := labelMap[label]
if !exists {
existingValues = make(map[string]struct{})
shantanualsi marked this conversation as resolved.
Show resolved Hide resolved
}
for _, v := range values {
existingValues[v] = struct{}{}
}
labelMap[label] = existingValues
}

return labelMap, nil
}

err := i.forMatchingStreams(ctx, startTime, matchers, nil, func(s *stream) error {
for _, label := range s.labels {
v, exists := labelMap[label.Name]
Expand Down
49 changes: 49 additions & 0 deletions pkg/ingester/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1480,6 +1480,55 @@ func insertData(t *testing.T, instance *instance) {
}
}

// TestInstance_LabelsWithValues exercises instance.LabelsWithValues against
// the streams created by setupTestStreams: without matchers, with an
// app="test" matcher, and with that matcher plus a zero start time.
func TestInstance_LabelsWithValues(t *testing.T) {
	inst, now, _ := setupTestStreams(t)
	start := now.Add(11 * time.Nanosecond)
	appMatcher, err := labels.NewMatcher(labels.MatchEqual, "app", "test")
	require.NoError(t, err)

	t.Run("label names with no matchers returns all detected labels", func(t *testing.T) {
		want := map[string]UniqueValues{
			"app": {"test": {}, "test2": {}},
			"job": {"varlogs": {}, "varlogs2": {}},
		}

		// No variadic arguments: the callee receives a nil matcher slice.
		got, err := inst.LabelsWithValues(context.Background(), start)

		require.NoError(t, err)
		require.Equal(t, want, got)
	})

	t.Run("label names with matcher returns response with matching detected labels", func(t *testing.T) {
		want := map[string]UniqueValues{
			"app": {"test": {}},
			"job": {"varlogs": {}, "varlogs2": {}},
		}

		got, err := inst.LabelsWithValues(context.Background(), start, appMatcher)

		require.NoError(t, err)
		require.Equal(t, want, got)
	})

	t.Run("label names matchers and no start time returns a empty response", func(t *testing.T) {
		// The zero time.Time stands for "no start time".
		got, err := inst.LabelsWithValues(context.Background(), time.Time{}, appMatcher)

		require.NoError(t, err)
		require.Equal(t, map[string]UniqueValues{}, got)
	})
}

// fakeQueryServer is a function type that carries a Send method, so a plain
// func(*logproto.QueryResponse) error can be passed where a value with that
// method is expected. NOTE(review): presumably a test double for a query
// stream server — confirm against its callers.
type fakeQueryServer func(*logproto.QueryResponse) error

func (f fakeQueryServer) Send(res *logproto.QueryResponse) error {
Expand Down
42 changes: 40 additions & 2 deletions pkg/logql/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,44 @@ func extractShard(shards []string) *astmapper.ShardAnnotation {
return &shard
}

func RecordDetectedLabelsQueryMetrics(_ context.Context, _ log.Logger, _ time.Time, _ time.Time, _ string, _ string, _ logql_stats.Result) {
// TODO(shantanu) log metrics here
func RecordDetectedLabelsQueryMetrics(ctx context.Context, log log.Logger, start time.Time, end time.Time, query string, status string, stats logql_stats.Result) {
var (
logger = fixLogger(ctx, log)
latencyType = latencyTypeFast
queryType = QueryTypeVolume
)

// Tag throughput metric by latency type based on a threshold.
// Latency below the threshold is fast, above is slow.
if stats.Summary.ExecTime > slowQueryThresholdSecond {
latencyType = latencyTypeSlow
}

rangeType := "range"

level.Info(logger).Log(
"api", "detected_labels",
"latency", latencyType,
"query_type", queryType,
"query", query,
"query_hash", util.HashedQuery(query),
"start", start.Format(time.RFC3339Nano),
"end", end.Format(time.RFC3339Nano),
"start_delta", time.Since(start),
"end_delta", time.Since(end),
"range_type", rangeType,
"length", end.Sub(start),
"duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))),
"status", status,
"splits", stats.Summary.Splits,
shantanualsi marked this conversation as resolved.
Show resolved Hide resolved
"total_entries", stats.Summary.TotalEntriesReturned,
// cache is accumulated by middleware used by the frontend only; logs from the queriers will not show cache stats
//"cache_volume_results_req", stats.Caches.VolumeResult.EntriesRequested,
//"cache_volume_results_hit", stats.Caches.VolumeResult.EntriesFound,
//"cache_volume_results_stored", stats.Caches.VolumeResult.EntriesStored,
//"cache_volume_results_download_time", stats.Caches.VolumeResult.CacheDownloadTime(),
//"cache_volume_results_query_length_served", stats.Caches.VolumeResult.CacheQueryLengthServed(),
)

execLatency.WithLabelValues(status, queryType, "").Observe(stats.Summary.ExecTime)
}
3 changes: 2 additions & 1 deletion pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -883,7 +883,8 @@ func (t *Loki) setupAsyncStore() error {
}

func (t *Loki) initIngesterQuerier() (_ services.Service, err error) {
t.ingesterQuerier, err = querier.NewIngesterQuerier(t.Cfg.IngesterClient, t.ring, t.Cfg.Querier.ExtraQueryDelay, t.Cfg.MetricsNamespace)
logger := log.With(util_log.Logger, "component", "querier")
t.ingesterQuerier, err = querier.NewIngesterQuerier(t.Cfg.IngesterClient, t.ring, t.Cfg.Querier.ExtraQueryDelay, t.Cfg.MetricsNamespace, logger)
if err != nil {
return nil, err
}
Expand Down
17 changes: 13 additions & 4 deletions pkg/querier/ingester_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"strings"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"golang.org/x/exp/slices"

"github.com/grafana/loki/v3/pkg/storage/stores/index/seriesvolume"
Expand Down Expand Up @@ -41,23 +43,25 @@ type IngesterQuerier struct {
ring ring.ReadRing
pool *ring_client.Pool
extraQueryDelay time.Duration
logger log.Logger
}

func NewIngesterQuerier(clientCfg client.Config, ring ring.ReadRing, extraQueryDelay time.Duration, metricsNamespace string) (*IngesterQuerier, error) {
func NewIngesterQuerier(clientCfg client.Config, ring ring.ReadRing, extraQueryDelay time.Duration, metricsNamespace string, logger log.Logger) (*IngesterQuerier, error) {
factory := func(addr string) (ring_client.PoolClient, error) {
return client.New(clientCfg, addr)
}

return newIngesterQuerier(clientCfg, ring, extraQueryDelay, ring_client.PoolAddrFunc(factory), metricsNamespace)
return newIngesterQuerier(clientCfg, ring, extraQueryDelay, ring_client.PoolAddrFunc(factory), metricsNamespace, logger)
}

// newIngesterQuerier creates a new IngesterQuerier and allows to pass a custom ingester client factory
// used for testing purposes
func newIngesterQuerier(clientCfg client.Config, ring ring.ReadRing, extraQueryDelay time.Duration, clientFactory ring_client.PoolFactory, metricsNamespace string) (*IngesterQuerier, error) {
func newIngesterQuerier(clientCfg client.Config, ring ring.ReadRing, extraQueryDelay time.Duration, clientFactory ring_client.PoolFactory, metricsNamespace string, logger log.Logger) (*IngesterQuerier, error) {
iq := IngesterQuerier{
ring: ring,
pool: clientpool.NewPool("ingester", clientCfg.PoolConfig, ring, clientFactory, util_log.Logger, metricsNamespace),
extraQueryDelay: extraQueryDelay,
logger: logger,
}

err := services.StartAndAwaitRunning(context.Background(), iq.pool)
Expand Down Expand Up @@ -364,12 +368,17 @@ func (q *IngesterQuerier) DetectedLabel(ctx context.Context, req *logproto.Detec
})

if err != nil {
level.Error(q.logger).Log("msg", "error getting detected labels", "err", err)
return nil, err
}

labelMap := make(map[string][]string)
for _, resp := range ingesterResponses {
thisIngester := resp.response.(*logproto.LabelToValuesResponse)
thisIngester, ok := resp.response.(*logproto.LabelToValuesResponse)
if !ok {
level.Warn(q.logger).Log("msg", "Cannot convert response to LabelToValuesResponse in detectedlabels",
"response", resp)
}

for label, thisIngesterValues := range thisIngester.Labels {
var combinedValues []string
Expand Down
Loading
Loading