Skip to content

Commit

Permalink
Support tail requests with protobuf encoding. (grafana#11426)
Browse files Browse the repository at this point in the history
**What this PR does / why we need it**:
The `TailRequest` would use `bytes plan` instead of `Plan plan`. The
former is not well supported in gogoprotobuf.

**Checklist**
- [ ] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [ ] Documentation added
- [x] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/setup/upgrade/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](grafana@d10549e)
- [ ] If the change is deprecating or removing a configuration option,
update the `deprecated-config.yaml` and `deleted-config.yaml` files
respectively in the `tools/deprecated-config-checker` directory.
[Example
PR](grafana@0d4416a)
  • Loading branch information
jeschkies authored Dec 11, 2023
1 parent f67fff3 commit 8dde7b9
Show file tree
Hide file tree
Showing 7 changed files with 287 additions and 190 deletions.
2 changes: 1 addition & 1 deletion docs/sources/configure/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -844,7 +844,7 @@ results_cache:
# A comma-separated list of LogQL vector and range aggregations that should be
# sharded
# CLI flag: -querier.shard-aggregation
# CLI flag: -querier.shard-aggregations
[shard_aggregations: <string> | default = ""]
# Cache index stats query results.
Expand Down
47 changes: 47 additions & 0 deletions integration/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,25 @@ import (
"errors"
"fmt"
"io"
"net"
"net/http"
"net/url"
"strconv"
"strings"
"time"

"github.com/buger/jsonparser"
"github.com/gorilla/websocket"
"github.com/grafana/dskit/user"
"github.com/prometheus/common/config"
"github.com/prometheus/prometheus/model/labels"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/plog/plogotlp"

logcli "github.com/grafana/loki/pkg/logcli/client"
"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/util/unmarshal"
)

const requestTimeout = 30 * time.Second
Expand Down Expand Up @@ -655,6 +662,46 @@ func (c *Client) Series(ctx context.Context, matcher string) ([]map[string]strin
return values.Data, nil
}

type TailResult struct {
Response loghttp.TailResponse
Err error
}

func (c *Client) Tail(ctx context.Context, query string, out chan TailResult) (*websocket.Conn, error) {
client := &logcli.DefaultClient{
Address: c.baseURL,
OrgID: c.instanceID,
TLSConfig: config.TLSConfig{},
}
start := time.Now().Add(-1 * time.Hour)

wc, err := client.LiveTailQueryConn(query, time.Duration(0), 100, start, false)
if err != nil {
return nil, err
}

go func() {

tailResponse := new(loghttp.TailResponse)

for {
select {
case <-ctx.Done():
close(out)
return
default:
err := unmarshal.ReadTailResponseJSON(tailResponse, wc)
if errors.Is(err, net.ErrClosed) {
close(out)
return
}
out <- TailResult{*tailResponse, err}
}
}
}()
return wc, nil
}

func (c *Client) request(ctx context.Context, method string, url string, extraHeaders ...Header) (*http.Request, error) {
ctx = user.InjectOrgID(ctx, c.instanceID)
req, err := http.NewRequestWithContext(ctx, method, url, nil)
Expand Down
65 changes: 57 additions & 8 deletions integration/loki_micro_services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -66,7 +67,19 @@ func TestMicroServicesIngestQuery(t *testing.T) {
)
require.NoError(t, clu.Run())

// finally, run the query-frontend and querier.
// the run querier.
var (
tQuerier = clu.AddComponent(
"querier",
"-target=querier",
"-querier.scheduler-address="+tQueryScheduler.GRPCURL(),
"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
"-common.compactor-address="+tCompactor.HTTPURL(),
)
)
require.NoError(t, clu.Run())

// finally, run the query-frontend.
var (
tQueryFrontend = clu.AddComponent(
"query-frontend",
Expand All @@ -76,13 +89,8 @@ func TestMicroServicesIngestQuery(t *testing.T) {
"-common.compactor-address="+tCompactor.HTTPURL(),
"-querier.per-request-limits-enabled=true",
"-frontend.encoding=protobuf",
)
_ = clu.AddComponent(
"querier",
"-target=querier",
"-querier.scheduler-address="+tQueryScheduler.GRPCURL(),
"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
"-common.compactor-address="+tCompactor.HTTPURL(),
"-querier.shard-aggregations=quantile_over_time",
"-frontend.tail-proxy-url="+tQuerier.HTTPURL(),
)
)
require.NoError(t, clu.Run())
Expand Down Expand Up @@ -146,6 +154,47 @@ func TestMicroServicesIngestQuery(t *testing.T) {
_, err := cliQueryFrontendLimited.LabelNames(context.Background())
require.ErrorContains(t, err, "the query time range exceeds the limit (query length")
})

t.Run("tail", func(t *testing.T) {
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()

out := make(chan client.TailResult)
wc, err := cliQueryFrontend.Tail(ctx, `{job="fake"}`, out)
require.NoError(t, err)
defer wc.Close()

var lines []string
mu := sync.Mutex{}
done := make(chan struct{})
go func() {
for resp := range out {
require.NoError(t, resp.Err)
for _, stream := range resp.Response.Streams {
for _, e := range stream.Entries {
mu.Lock()
lines = append(lines, e.Line)
mu.Unlock()
}
}
}
done <- struct{}{}
}()
assert.Eventually(
t,
func() bool {
mu.Lock()
defer mu.Unlock()
return len(lines) == 4
},
10*time.Second,
100*time.Millisecond,
)
wc.Close()
cancelFunc()
<-done
assert.ElementsMatch(t, []string{"lineA", "lineB", "lineC", "lineD"}, lines)
})
}

func TestMicroServicesIngestQueryWithSchemaChange(t *testing.T) {
Expand Down
Loading

0 comments on commit 8dde7b9

Please sign in to comment.