Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add logcli default client test #10100

Closed
wants to merge 2 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 208 additions & 0 deletions pkg/logcli/client/client_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,24 @@
package client

import (
"context"
"encoding/base64"
"fmt"
"io"
"net"
"net/http"
"testing"
"time"

"github.com/grafana/loki/pkg/logcli/volume"
"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/querier/queryrange"
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func Test_buildURL(t *testing.T) {
Expand Down Expand Up @@ -78,3 +91,198 @@ func Test_getHTTPRequestHeader(t *testing.T) {
})
}
}

func Test_DefaultClient(t *testing.T) {
now := time.Now()
then := now.Add(-1 * time.Hour)

server := NewMockLokiHTTPServer(t, now, then)
client := &DefaultClient{
Address: fmt.Sprintf("http://127.0.0.1%s", server.server.Addr),
}

server.Run(t)
time.Sleep(time.Second)
defer server.Stop(t)

t.Run("GetVolume happy path", func(t *testing.T) {
query := volume.Query{
QueryString: `{foo="bar"}`,
Start: then,
End: now,
Step: 10 * time.Minute,
Quiet: false,
Limit: 10,
TargetLabels: []string{},
AggregateByLabels: false,
}
resp, err := client.GetVolume(&query)
require.NoError(t, err)

require.Equal(t, loghttp.QueryResponse{
Status: loghttp.QueryStatusSuccess,
Data: loghttp.QueryResponseData{
ResultType: loghttp.ResultTypeVector,
Result: loghttp.Vector{
{
Metric: map[model.LabelName]model.LabelValue{
"foo": "bar",
},
Value: 42,
Timestamp: model.TimeFromUnixNano(now.UnixNano()),
},
},
},
}, *resp)
})

t.Run("GetVolumeRange happy path", func(t *testing.T) {
query := volume.Query{
QueryString: `{foo="bar"}`,
Start: then,
End: now,
Step: 10 * time.Minute,
Quiet: false,
Limit: 10,
TargetLabels: []string{},
AggregateByLabels: false,
}
resp, err := client.GetVolumeRange(&query)
require.NoError(t, err)

require.Equal(t, loghttp.QueryResponse{
Status: loghttp.QueryStatusSuccess,
Data: loghttp.QueryResponseData{
ResultType: loghttp.ResultTypeMatrix,
Result: loghttp.Matrix{
{
Metric: map[model.LabelName]model.LabelValue{"foo": "bar"},
Values: []model.SamplePair{
{
Timestamp: model.TimeFromUnixNano(then.UnixNano()),
Value: 42,
},
{
Timestamp: model.TimeFromUnixNano(now.UnixNano()),
Value: 47,
},
},
},
},
},
}, *resp)

})

}

type mockLokiHTTPServer struct {
server *http.Server
tenantID string
now time.Time
then time.Time
}

func NewMockLokiHTTPServer(t *testing.T, now, then time.Time) *mockLokiHTTPServer {
listener, err := net.Listen("tcp", ":0")
require.NoError(t, err)

port := listener.Addr().(*net.TCPAddr).Port
err = listener.Close()
require.NoError(t, err)

return &mockLokiHTTPServer{
server: &http.Server{Addr: fmt.Sprintf(":%d", port), Handler: nil},
now: now,
then: then,
}
}

func (s *mockLokiHTTPServer) getTenantIDUnsafe() string {
return s.tenantID
}

func (s *mockLokiHTTPServer) Run(t *testing.T) {
var mux http.ServeMux
labels := labels.Labels{
{
Name: "foo",
Value: "bar",
},
}
codec := queryrange.Codec{}

mux.HandleFunc("/loki/api/v1/index/volume", func(w http.ResponseWriter, request *http.Request) {
volume := queryrange.LokiPromResponse{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very detailed test case, thanks your help. My only doubt here is the return value type of this test.

The return value type here should be logproto.VolumeResponse instead of queryrange.LokiPromResponse, because the code of http.go return logproto.VolumeResponse, and the test construction in http_test.go is also logproto.VolumeResponse

https://github.com/grafana/loki/blob/main/pkg/querier/http.go#L489

        if resp == nil { // Some stores don't implement this
		resp = &logproto.VolumeResponse{Volumes: []logproto.Volume{}}
	}
	if err := queryrange.WriteResponse(r, nil, resp, w); err != nil {
		serverutil.WriteError(err, w)
	}

https://github.com/grafana/loki/blob/main/pkg/querier/http_test.go#L386

	ret := &logproto.VolumeResponse{
		Volumes: []logproto.Volume{
			{Name: `{foo="bar"}`, Volume: 38},
		},
	}

https://github.com/grafana/loki/blob/main/pkg/util/marshal/marshal.go#L132

func WriteVolumeResponseJSON(r *logproto.VolumeResponse, w io.Writer) error {
	s := jsoniter.ConfigFastest.BorrowStream(w)
	defer jsoniter.ConfigFastest.ReturnStream(s)
	s.WriteVal(r)
	s.WriteRaw("\n")
	return s.Flush()
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these are all inside the querier package, or in the last case, used for querier -> query frontend communication. we only use logproto.Volume for internal communication between components. The HTTP API, and response returned by the query frontend, converts the multiple logproto.Volume responses it gets from the many queriers that handle the split up query and turns it into a queryrange.LokiPromRepsonse. This is because the primary consumer of these endpoints wants the data to look like a metrics response to more easily plot it.

Response: &queryrangebase.PrometheusResponse{
Status: loghttp.QueryStatusSuccess,
Data: queryrangebase.PrometheusData{
ResultType: loghttp.ResultTypeVector,
Result: []queryrangebase.SampleStream{
{
Labels: logproto.FromLabelsToLabelAdapters(labels),
Samples: []logproto.LegacySample{
{
Value: 42,
TimestampMs: s.now.UnixNano() / 1e6,
},
},
},
},
},
},
}

resp, err := codec.EncodeResponse(request.Context(), request, &volume)
require.NoError(t, err)
bytes, err := io.ReadAll(resp.Body)
require.NoError(t, err)

w.WriteHeader(resp.StatusCode)
w.Write(bytes)
})

mux.HandleFunc("/loki/api/v1/index/volume_range", func(w http.ResponseWriter, request *http.Request) {
volume := queryrange.LokiPromResponse{
Response: &queryrangebase.PrometheusResponse{
Status: loghttp.QueryStatusSuccess,
Data: queryrangebase.PrometheusData{
ResultType: loghttp.ResultTypeMatrix,
Result: []queryrangebase.SampleStream{
{
Labels: logproto.FromLabelsToLabelAdapters(labels),
Samples: []logproto.LegacySample{
{
Value: 42,
TimestampMs: s.then.UnixNano() / 1e6,
},
{
Value: 47,
TimestampMs: s.now.UnixNano() / 1e6,
},
},
},
},
},
},
}

resp, err := codec.EncodeResponse(request.Context(), request, &volume)
require.NoError(t, err)
bytes, err := io.ReadAll(resp.Body)
require.NoError(t, err)

w.WriteHeader(resp.StatusCode)
w.Write(bytes)
})

s.server.Handler = &mux
go func() {
_ = s.server.ListenAndServe()
}()
}

func (s *mockLokiHTTPServer) Stop(t *testing.T) {
err := s.server.Shutdown(context.Background())
require.NoError(t, err)
}