Skip to content

Commit

Permalink
Merge branch 'main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
tiagoposse authored Jul 16, 2024
2 parents c7dbce0 + dc41c0d commit 21d878b
Show file tree
Hide file tree
Showing 6 changed files with 261 additions and 27 deletions.
2 changes: 2 additions & 0 deletions docs/sources/release-notes/v3-1.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ Key features in Loki 3.1.0 include the following:

- **lokitool:** Add `lokitool` to replace `cortextool`. ([#12166](https://github.com/grafana/loki/issues/12166)) ([7b7d3d4](https://github.com/grafana/loki/commit/7b7d3d4cd2c979c778d3741156f0d765a9e531b2)). Introduce `index audit` to `lokitool` ([#13008](https://github.com/grafana/loki/issues/13008)) ([47f0236](https://github.com/grafana/loki/commit/47f0236ea8f33a67a0a1abf6e6d6b3582661c4ba)).

- **Explore Logs:** Explore Logs, which lets you explore your Loki data without writing LogQL queries, is now available in public preview. If you are a Grafana Cloud user, you can access Explore Logs in the Grafana Cloud main navigation menu. If you are not a Grafana Cloud user, you can install the [Explore Logs plugin](https://grafana.com/docs/grafana-cloud/visualizations/simplified-exploration/logs/access/). For more information, refer to the [Explore Logs documentation](https://grafana.com/docs/grafana-cloud/visualizations/simplified-exploration/logs/).

- **Docs:** Added a video to the Getting Started demo and updated for Grafana Alloy. Added an interactive sandbox to the Loki Quickstart tutorial. Updated the documentation for the SSD and microservices deployment modes using the Helm charts. Documented the new meta-monitoring Helm chart.

Other improvements include the following:
Expand Down
6 changes: 6 additions & 0 deletions pkg/bloombuild/builder/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,12 @@ func (i *blockLoadingIter) loadNext() bool {
// Next implements v1.Iterator.
func (i *blockLoadingIter) Next() bool {
	i.init()

	// Bail out early once the context is done so callers observe the
	// cancellation error instead of a partial iteration.
	if err := i.ctx.Err(); err != nil {
		i.err = err
		return false
	}

	return i.iter.Next() || i.loadNext()
}

Expand Down
20 changes: 4 additions & 16 deletions pkg/bloombuild/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,22 +262,10 @@ func (b *Builder) notifyTaskCompletedToPlanner(
CreatedMetas: metas,
}

// We have a retry mechanism upper in the stack, but we add another one here
// to try our best to avoid losing the task result.
retries := backoff.New(c.Context(), b.cfg.BackoffConfig)
for retries.Ongoing() {
if err := c.Send(&protos.BuilderToPlanner{
BuilderID: b.ID,
Result: *result.ToProtoTaskResult(),
}); err == nil {
break
}

level.Error(logger).Log("msg", "failed to acknowledge task completion to planner. Retrying", "err", err)
retries.Wait()
}

if err := retries.Err(); err != nil {
if err := c.Send(&protos.BuilderToPlanner{
BuilderID: b.ID,
Result: *result.ToProtoTaskResult(),
}); err != nil {
return fmt.Errorf("failed to acknowledge task completion to planner: %w", err)
}

Expand Down
6 changes: 1 addition & 5 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -1118,9 +1118,6 @@ func (q *SingleTenantQuerier) DetectedFields(ctx context.Context, req *logproto.

detectedFields := parseDetectedFields(ctx, req.FieldLimit, streams)

//TODO: detected field needs to contain the sketch
// make sure response to frontend is GRPC
//only want cardinality in JSON
fields := make([]*logproto.DetectedField, len(detectedFields))
fieldCount := 0
for k, v := range detectedFields {
Expand All @@ -1141,7 +1138,6 @@ func (q *SingleTenantQuerier) DetectedFields(ctx context.Context, req *logproto.
fieldCount++
}

//TODO: detected fields response needs to include the sketch
return &logproto.DetectedFieldsResponse{
Fields: fields,
FieldLimit: req.GetFieldLimit(),
Expand Down Expand Up @@ -1218,7 +1214,6 @@ func parseDetectedFields(ctx context.Context, limit uint32, streams logqlmodel.S
fieldCount := uint32(0)

for _, stream := range streams {
detectType := true
level.Debug(spanlogger.FromContext(ctx)).Log(
"detected_fields", "true",
"msg", fmt.Sprintf("looking for detected fields in stream %d with %d lines", stream.Hash, len(stream.Entries)))
Expand All @@ -1241,6 +1236,7 @@ func parseDetectedFields(ctx context.Context, limit uint32, streams logqlmodel.S
df.parsers = append(df.parsers, *parser)
}

detectType := true
for _, v := range vals {
parsedFields := detectedFields[k]
if detectType {
Expand Down
82 changes: 81 additions & 1 deletion pkg/querier/querier_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (

"github.com/grafana/loki/v3/pkg/logql/log"

"github.com/grafana/loki/pkg/push"

"github.com/grafana/loki/v3/pkg/loghttp"

"github.com/grafana/dskit/grpcclient"
Expand Down Expand Up @@ -118,7 +120,6 @@ func (c *querierClientMock) GetDetectedLabels(ctx context.Context, in *logproto.
return (*logproto.LabelToValuesResponse)(nil), args.Error(1)
}
return res.(*logproto.LabelToValuesResponse), args.Error(1)

}

func (c *querierClientMock) GetVolume(ctx context.Context, in *logproto.VolumeRequest, opts ...grpc.CallOption) (*logproto.VolumeResponse, error) {
Expand Down Expand Up @@ -517,6 +518,20 @@ func mockStreamIterator(from int, quantity int) iter.EntryIterator {
return iter.NewStreamIterator(mockStream(from, quantity))
}

// mockLogfmtStreamIterator returns an iterator with 1 stream and quantity entries
// whose timestamps count down from quantity to 1; each line is in logfmt format
// with the fields message, count, fake, bytes, duration, percent and even
// (see mockLogfmtStreamWithLabels for the exact format).
// NOTE(review): the from argument is currently ignored by the underlying mock.
func mockLogfmtStreamIterator(from int, quantity int) iter.EntryIterator {
	return iter.NewStreamIterator(mockLogfmtStream(from, quantity))
}

// mockLogfmtStreamIteratorWithStructuredMetadata returns an iterator with 1 stream
// and quantity entries whose timestamps count down from quantity to 1; each line is
// in logfmt format with the fields message, count and fake, and every entry carries
// structured metadata (see mockLogfmtStreamWithLabelsAndStructuredMetadata).
func mockLogfmtStreamIteratorWithStructuredMetadata(from int, quantity int) iter.EntryIterator {
	return iter.NewStreamIterator(mockLogfmtStreamWithStructuredMetadata(from, quantity))
}

// mockSampleIterator returns an iterator with 1 stream and quantity entries,
// where entries timestamp and line string are constructed as sequential numbers
// starting at from
Expand Down Expand Up @@ -546,6 +561,71 @@ func mockStreamWithLabels(from int, quantity int, labels string) logproto.Stream
}
}

// mockLogfmtStream returns a single stream labeled {type="test"} whose entries
// are logfmt lines; see mockLogfmtStreamWithLabels for the exact line format.
func mockLogfmtStream(from int, quantity int) logproto.Stream {
	return mockLogfmtStreamWithLabels(from, quantity, `{type="test"}`)
}

// mockLogfmtStreamWithLabels builds a stream with the given labels whose
// entries are logfmt lines carrying the fields message, count, fake, bytes,
// duration, percent and even. Entries are produced in descending timestamp
// order (quantity down to 1) because detected-field queries are always
// BACKWARD. The first parameter is accepted for signature symmetry but unused.
func mockLogfmtStreamWithLabels(_ int, quantity int, labels string) logproto.Stream {
	entries := make([]logproto.Entry, 0, quantity)

	for n := quantity; n >= 1; n-- {
		line := fmt.Sprintf(
			`message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t`,
			n,
			n,
			n*10,
			n*256,
			float32(n*10),
			n%2 == 0,
		)
		entries = append(entries, logproto.Entry{
			Timestamp: time.Unix(int64(n), 0),
			Line:      line,
		})
	}

	return logproto.Stream{
		Entries: entries,
		Labels:  labels,
	}
}

// mockLogfmtStreamWithStructuredMetadata returns a single stream labeled
// {type="test"} whose logfmt entries carry structured metadata; see
// mockLogfmtStreamWithLabelsAndStructuredMetadata for details.
func mockLogfmtStreamWithStructuredMetadata(from int, quantity int) logproto.Stream {
	return mockLogfmtStreamWithLabelsAndStructuredMetadata(from, quantity, `{type="test"}`)
}

// mockLogfmtStreamWithLabelsAndStructuredMetadata builds a stream with the
// given labels whose logfmt entries all carry the same structured metadata:
// one "constant" label plus one "variable" label per index in
// [from, from+quantity). Every entry deliberately shares the same metadata
// slice. Entries are emitted in descending timestamp order (quantity down
// to 1).
func mockLogfmtStreamWithLabelsAndStructuredMetadata(
	from int,
	quantity int,
	labels string,
) logproto.Stream {
	metadata := push.LabelsAdapter{
		{Name: "constant", Value: "constant"},
	}
	for idx := from; idx < from+quantity; idx++ {
		metadata = append(metadata, push.LabelAdapter{
			Name:  "variable",
			Value: fmt.Sprintf("value%d", idx),
		})
	}

	var entries []logproto.Entry
	for n := quantity; n > 0; n-- {
		entries = append(entries, logproto.Entry{
			Timestamp:          time.Unix(int64(n), 0),
			Line:               fmt.Sprintf(`message="line %d" count=%d fake=true`, n, n),
			StructuredMetadata: metadata,
		})
	}

	return logproto.Stream{
		Labels:  labels,
		Entries: entries,
	}
}

// querierMock is a testify-style mock (via util.ExtendedMock) used by tests
// in this package to stand in for a querier implementation.
type querierMock struct {
	util.ExtendedMock
}
Expand Down
172 changes: 167 additions & 5 deletions pkg/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,9 +324,11 @@ func TestQuerier_SeriesAPI(t *testing.T) {
{Key: "a", Value: "1"},
{Key: "b", Value: "2"},
}},
{Labels: []logproto.SeriesIdentifier_LabelsEntry{
{Key: "a", Value: "1"},
{Key: "b", Value: "3"}},
{
Labels: []logproto.SeriesIdentifier_LabelsEntry{
{Key: "a", Value: "1"},
{Key: "b", Value: "3"},
},
},
{Labels: []logproto.SeriesIdentifier_LabelsEntry{
{Key: "a", Value: "1"},
Expand Down Expand Up @@ -994,7 +996,6 @@ func TestQuerier_RequestingIngesters(t *testing.T) {

for _, tc := range tests {
t.Run(tc.desc, func(t *testing.T) {

conf := mockQuerierConfig()
conf.QueryIngestersWithin = time.Minute * 30
if tc.setIngesterQueryStoreMaxLookback {
Expand Down Expand Up @@ -1175,7 +1176,6 @@ func setupIngesterQuerierMocks(conf Config, limits *validation.Overrides) (*quer
mockReadRingWithOneActiveIngester(),
&mockDeleteGettter{},
store, limits)

if err != nil {
return nil, nil, nil, err
}
Expand All @@ -1191,6 +1191,7 @@ type fakeTimeLimits struct {
// MaxQueryLookback returns the fixed lookback configured on the fake,
// ignoring both the context and the tenant ID.
func (f fakeTimeLimits) MaxQueryLookback(_ context.Context, _ string) time.Duration {
	return f.maxQueryLookback
}

// MaxQueryLength returns the fixed query length limit configured on the fake,
// ignoring both the context and the tenant ID.
func (f fakeTimeLimits) MaxQueryLength(_ context.Context, _ string) time.Duration {
	return f.maxQueryLength
}
Expand Down Expand Up @@ -1697,3 +1698,164 @@ func BenchmarkQuerierDetectedLabels(b *testing.B) {
assert.NoError(b, err)
}
}

// TestQuerier_DetectedFields exercises SingleTenantQuerier.DetectedFields
// against mocked logfmt streams (see mockLogfmtStream* in querier_mock_test.go)
// and verifies both per-field cardinality and detected field types.
func TestQuerier_DetectedFields(t *testing.T) {
	limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
	require.NoError(t, err)
	ctx := user.InjectOrgID(context.Background(), "test")

	conf := mockQuerierConfig()
	conf.IngesterQueryStoreMaxLookback = 0

	request := logproto.DetectedFieldsRequest{
		Start:      time.Now().Add(-1 * time.Minute),
		End:        time.Now(),
		Query:      `{type="test"}`,
		LineLimit:  1000,
		FieldLimit: 1000,
	}

	t.Run("returns detected fields from queried logs", func(t *testing.T) {
		store := newStoreMock()
		store.On("SelectLogs", mock.Anything, mock.Anything).
			Return(mockLogfmtStreamIterator(1, 5), nil)

		queryClient := newQueryClientMock()
		queryClient.On("Recv").
			Return(mockQueryResponse([]logproto.Stream{mockLogfmtStream(1, 5)}), nil)

		ingesterClient := newQuerierClientMock()
		ingesterClient.On("Query", mock.Anything, mock.Anything, mock.Anything).
			Return(queryClient, nil)

		querier, err := newQuerier(
			conf,
			mockIngesterClientConfig(),
			newIngesterClientMockFactory(ingesterClient),
			mockReadRingWithOneActiveIngester(),
			&mockDeleteGettter{},
			store, limits)
		require.NoError(t, err)

		resp, err := querier.DetectedFields(ctx, &request)
		require.NoError(t, err)

		detectedFields := resp.Fields
		// log lines come from querier_mock_test.go
		// message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t
		assert.Len(t, detectedFields, 7)
		expectedCardinality := map[string]uint64{
			"message":  5,
			"count":    5,
			"fake":     1,
			"bytes":    5,
			"duration": 5,
			"percent":  5,
			"even":     2,
		}
		for _, d := range detectedFields {
			// Fail explicitly on unexpected labels instead of silently
			// comparing against the map's zero value.
			card, ok := expectedCardinality[d.Label]
			require.True(t, ok, "unexpected detected field: %s", d.Label)
			assert.Equal(t, card, d.Cardinality, "Expected cardinality mismatch for: %s", d.Label)
		}
	})

	t.Run("correctly identifies different field types", func(t *testing.T) {
		store := newStoreMock()
		store.On("SelectLogs", mock.Anything, mock.Anything).
			Return(mockLogfmtStreamIterator(1, 2), nil)

		queryClient := newQueryClientMock()
		queryClient.On("Recv").
			Return(mockQueryResponse([]logproto.Stream{mockLogfmtStream(1, 2)}), nil)

		ingesterClient := newQuerierClientMock()
		ingesterClient.On("Query", mock.Anything, mock.Anything, mock.Anything).
			Return(queryClient, nil)

		querier, err := newQuerier(
			conf,
			mockIngesterClientConfig(),
			newIngesterClientMockFactory(ingesterClient),
			mockReadRingWithOneActiveIngester(),
			&mockDeleteGettter{},
			store, limits)
		require.NoError(t, err)

		resp, err := querier.DetectedFields(ctx, &request)
		require.NoError(t, err)

		detectedFields := resp.Fields
		// log lines come from querier_mock_test.go
		// message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t
		assert.Len(t, detectedFields, 7)

		var messageField, countField, bytesField, durationField, floatField, evenField *logproto.DetectedField
		for _, field := range detectedFields {
			switch field.Label {
			case "message":
				messageField = field
			case "count":
				countField = field
			case "bytes":
				bytesField = field
			case "duration":
				durationField = field
			case "percent":
				floatField = field
			case "even":
				evenField = field
			}
		}

		// Guard against nil pointers so a missing field yields a clean test
		// failure rather than a panic on the dereferences below.
		require.NotNil(t, messageField)
		require.NotNil(t, countField)
		require.NotNil(t, bytesField)
		require.NotNil(t, durationField)
		require.NotNil(t, floatField)
		require.NotNil(t, evenField)

		assert.Equal(t, logproto.DetectedFieldString, messageField.Type)
		assert.Equal(t, logproto.DetectedFieldInt, countField.Type)
		assert.Equal(t, logproto.DetectedFieldBytes, bytesField.Type)
		assert.Equal(t, logproto.DetectedFieldDuration, durationField.Type)
		assert.Equal(t, logproto.DetectedFieldFloat, floatField.Type)
		assert.Equal(t, logproto.DetectedFieldBoolean, evenField.Type)
	})
}

// BenchmarkQuerierDetectedFields measures the cost of a DetectedFields call
// over a small mocked logfmt stream. Setup errors are now checked (the
// original discarded them with _, so a failed setup would panic with a nil
// querier mid-benchmark instead of reporting a clean failure).
func BenchmarkQuerierDetectedFields(b *testing.B) {
	limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
	require.NoError(b, err)
	ctx := user.InjectOrgID(context.Background(), "test")

	conf := mockQuerierConfig()
	conf.IngesterQueryStoreMaxLookback = 0

	request := logproto.DetectedFieldsRequest{
		Start:      time.Now().Add(-1 * time.Minute),
		End:        time.Now(),
		Query:      `{type="test"}`,
		LineLimit:  1000,
		FieldLimit: 1000,
	}

	store := newStoreMock()
	store.On("SelectLogs", mock.Anything, mock.Anything).
		Return(mockLogfmtStreamIterator(1, 2), nil)

	queryClient := newQueryClientMock()
	queryClient.On("Recv").
		Return(mockQueryResponse([]logproto.Stream{mockLogfmtStream(1, 2)}), nil)

	ingesterClient := newQuerierClientMock()
	ingesterClient.On("Query", mock.Anything, mock.Anything, mock.Anything).
		Return(queryClient, nil)

	querier, err := newQuerier(
		conf,
		mockIngesterClientConfig(),
		newIngesterClientMockFactory(ingesterClient),
		mockReadRingWithOneActiveIngester(),
		&mockDeleteGettter{},
		store, limits)
	require.NoError(b, err)

	b.ReportAllocs()
	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		_, err := querier.DetectedFields(ctx, &request)
		assert.NoError(b, err)
	}
}

0 comments on commit 21d878b

Please sign in to comment.