Skip to content

Commit

Permalink
Merge branch 'main' into kind
Browse files Browse the repository at this point in the history
  • Loading branch information
Manik2708 authored Dec 18, 2024
2 parents 8a4cff6 + ebf84c1 commit cda2bbd
Show file tree
Hide file tree
Showing 17 changed files with 638 additions and 148 deletions.
14 changes: 14 additions & 0 deletions cmd/anonymizer/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ type Options struct {
HashCustomTags bool
HashLogs bool
HashProcess bool
StartTime int64
EndTime int64
}

const (
Expand All @@ -28,6 +30,8 @@ const (
hashLogsFlag = "hash-logs"
hashProcessFlag = "hash-process"
maxSpansCount = "max-spans-count"
startTime = "start-time"
endTime = "end-time"
)

// AddFlags adds flags for anonymizer main program
Expand Down Expand Up @@ -72,6 +76,16 @@ func (o *Options) AddFlags(command *cobra.Command) {
maxSpansCount,
-1,
"The maximum number of spans to anonymize")
command.Flags().Int64Var(
&o.StartTime,
startTime,
0,
"The start time of time window for searching trace, timestampe in unix nanoseconds")
command.Flags().Int64Var(
&o.EndTime,
endTime,
0,
"The end time of time window for searching trace, timestampe in unix nanoseconds")

// mark traceid flag as mandatory
command.MarkFlagRequired(traceIDFlag)
Expand Down
6 changes: 6 additions & 0 deletions cmd/anonymizer/app/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ func TestOptionsWithDefaultFlags(t *testing.T) {
assert.False(t, o.HashLogs)
assert.False(t, o.HashProcess)
assert.Equal(t, -1, o.MaxSpansCount)
assert.Equal(t, int64(0), o.StartTime)
assert.Equal(t, int64(0), o.EndTime)
}

func TestOptionsWithFlags(t *testing.T) {
Expand All @@ -40,6 +42,8 @@ func TestOptionsWithFlags(t *testing.T) {
"--hash-logs",
"--hash-process",
"--max-spans-count=100",
"--start-time=1",
"--end-time=2",
})

assert.Equal(t, "192.168.1.10:16686", o.QueryGRPCHostPort)
Expand All @@ -50,6 +54,8 @@ func TestOptionsWithFlags(t *testing.T) {
assert.True(t, o.HashLogs)
assert.True(t, o.HashProcess)
assert.Equal(t, 100, o.MaxSpansCount)
assert.Equal(t, int64(1), o.StartTime)
assert.Equal(t, int64(2), o.EndTime)
}

func TestMain(m *testing.M) {
Expand Down
9 changes: 6 additions & 3 deletions cmd/anonymizer/app/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"io"
"strings"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
Expand Down Expand Up @@ -49,14 +50,16 @@ func unwrapNotFoundErr(err error) error {
}

// QueryTrace queries for a trace and returns all spans inside it
func (q *Query) QueryTrace(traceID string) ([]model.Span, error) {
func (q *Query) QueryTrace(traceID string, startTime time.Time, endTime time.Time) ([]model.Span, error) {
mTraceID, err := model.TraceIDFromString(traceID)
if err != nil {
return nil, fmt.Errorf("failed to convert the provided trace id: %w", err)
}
// TODO: add start time & end time

request := api_v2.GetTraceRequest{
TraceID: mTraceID,
TraceID: mTraceID,
StartTime: startTime,
EndTime: endTime,
}

stream, err := q.client.GetTrace(context.Background(), &request)
Expand Down
16 changes: 12 additions & 4 deletions cmd/anonymizer/app/query/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -108,24 +109,31 @@ func TestQueryTrace(t *testing.T) {
defer q.Close()

t.Run("No error", func(t *testing.T) {
s.spanReader.On("GetTrace", matchContext, matchGetTraceParameters).Return(
startTime := time.Date(1970, time.January, 1, 0, 0, 0, 1000, time.UTC)
endTime := time.Date(1970, time.January, 1, 0, 0, 0, 2000, time.UTC)
expectedGetTraceParameters := spanstore.GetTraceParameters{
TraceID: mockTraceID,
StartTime: startTime,
EndTime: endTime,
}
s.spanReader.On("GetTrace", matchContext, expectedGetTraceParameters).Return(
mockTraceGRPC, nil).Once()

spans, err := q.QueryTrace(mockTraceID.String())
spans, err := q.QueryTrace(mockTraceID.String(), startTime, endTime)
require.NoError(t, err)
assert.Equal(t, len(spans), len(mockTraceGRPC.Spans))
})

t.Run("Invalid TraceID", func(t *testing.T) {
_, err := q.QueryTrace(mockInvalidTraceID)
_, err := q.QueryTrace(mockInvalidTraceID, time.Time{}, time.Time{})
assert.ErrorContains(t, err, "failed to convert the provided trace id")
})

t.Run("Trace not found", func(t *testing.T) {
s.spanReader.On("GetTrace", matchContext, matchGetTraceParameters).Return(
nil, spanstore.ErrTraceNotFound).Once()

spans, err := q.QueryTrace(mockTraceID.String())
spans, err := q.QueryTrace(mockTraceID.String(), time.Time{}, time.Time{})
assert.Nil(t, spans)
assert.ErrorIs(t, err, spanstore.ErrTraceNotFound)
})
Expand Down
15 changes: 14 additions & 1 deletion cmd/anonymizer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"os"
"time"

"github.com/spf13/cobra"
"go.uber.org/zap"
Expand Down Expand Up @@ -53,7 +54,11 @@ func main() {
logger.Fatal("error while creating query object", zap.Error(err))
}

spans, err := query.QueryTrace(options.TraceID)
spans, err := query.QueryTrace(
options.TraceID,
initTime(options.StartTime),
initTime(options.EndTime),
)
if err != nil {
logger.Fatal("error while querying for trace", zap.Error(err))
}
Expand Down Expand Up @@ -93,3 +98,11 @@ func main() {
os.Exit(1)
}
}

func initTime(ts int64) time.Time {
var t time.Time
if ts != 0 {
t = time.Unix(0, ts)
}
return t
}
18 changes: 17 additions & 1 deletion cmd/query/app/apiv3/http_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,26 @@ func (h *HTTPGateway) getTrace(w http.ResponseWriter, r *http.Request) {
if h.tryParamError(w, err, paramTraceID) {
return
}
// TODO: add start time & end time
request := spanstore.GetTraceParameters{
TraceID: traceID,
}
http_query := r.URL.Query()
startTime := http_query.Get(paramStartTime)
if startTime != "" {
timeParsed, err := time.Parse(time.RFC3339Nano, startTime)
if h.tryParamError(w, err, paramStartTime) {
return
}
request.StartTime = timeParsed.UTC()
}
endTime := http_query.Get(paramEndTime)
if endTime != "" {
timeParsed, err := time.Parse(time.RFC3339Nano, endTime)
if h.tryParamError(w, err, paramEndTime) {
return
}
request.EndTime = timeParsed.UTC()
}
trc, err := h.QueryService.GetTrace(r.Context(), request)
if h.tryHandleError(w, err, http.StatusInternalServerError) {
return
Expand Down
113 changes: 99 additions & 14 deletions cmd/query/app/apiv3/http_gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/jtracer"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/storage/spanstore"
Expand Down Expand Up @@ -90,27 +91,111 @@ func TestHTTPGatewayTryHandleError(t *testing.T) {
assert.Contains(t, string(w.Body.String()), e, "writes error message to body")
}

func TestHTTPGatewayGetTraceErrors(t *testing.T) {
gw := setupHTTPGatewayNoServer(t, "")
func TestHTTPGatewayGetTrace(t *testing.T) {
traceId, _ := model.TraceIDFromString("123")
testCases := []struct {
name string
params map[string]string
expectedQuery spanstore.GetTraceParameters
}{
{
name: "TestGetTrace",
params: map[string]string{},
expectedQuery: spanstore.GetTraceParameters{
TraceID: traceId,
},
},
{
name: "TestGetTraceWithTimeWindow",
params: map[string]string{
"start_time": "2000-01-02T12:30:08.999999998Z",
"end_time": "2000-04-05T21:55:16.999999992+08:00",
},
expectedQuery: spanstore.GetTraceParameters{
TraceID: traceId,
StartTime: time.Date(2000, time.January, 0o2, 12, 30, 8, 999999998, time.UTC),
EndTime: time.Date(2000, time.April, 0o5, 13, 55, 16, 999999992, time.UTC),
},
},
}

// malformed trace id
r, err := http.NewRequest(http.MethodGet, "/api/v3/traces/xyz", nil)
require.NoError(t, err)
w := httptest.NewRecorder()
gw.router.ServeHTTP(w, r)
assert.Contains(t, w.Body.String(), "malformed parameter trace_id")
testUri := "/api/v3/traces/123"

// error from span reader
const simErr = "simulated error"
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
gw := setupHTTPGatewayNoServer(t, "")
gw.reader.
On("GetTrace", matchContext, tc.expectedQuery).
Return(&model.Trace{}, nil).Once()

q := url.Values{}
for k, v := range tc.params {
q.Set(k, v)
}
testUrl := testUri
if len(tc.params) > 0 {
testUrl += "?" + q.Encode()
}

r, err := http.NewRequest(http.MethodGet, testUrl, nil)
require.NoError(t, err)
w := httptest.NewRecorder()
gw.router.ServeHTTP(w, r)
gw.reader.AssertCalled(t, "GetTrace", matchContext, tc.expectedQuery)
})
}
}

func TestHTTPGatewayGetTraceMalformedInputErrors(t *testing.T) {
testCases := []struct {
name string
requestUrl string
expectedError string
}{
{
name: "TestGetTrace",
requestUrl: "/api/v3/traces/xyz",
expectedError: "malformed parameter trace_id",
},
{
name: "TestGetTraceWithInvalidStartTime",
requestUrl: "/api/v3/traces/123?start_time=abc",
expectedError: "malformed parameter start_time",
},
{
name: "TestGetTraceWithInvalidEndTime",
requestUrl: "/api/v3/traces/123?end_time=xyz",
expectedError: "malformed parameter end_time",
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
gw := setupHTTPGatewayNoServer(t, "")
gw.reader.
On("GetTrace", matchContext, matchGetTraceParameters).
Return(&model.Trace{}, nil).Once()

r, err := http.NewRequest(http.MethodGet, tc.requestUrl, nil)
require.NoError(t, err)
w := httptest.NewRecorder()
gw.router.ServeHTTP(w, r)
assert.Contains(t, w.Body.String(), tc.expectedError)
})
}
}

func TestHTTPGatewayGetTraceInternalErrors(t *testing.T) {
gw := setupHTTPGatewayNoServer(t, "")
gw.reader.
On("GetTrace", matchContext, matchGetTraceParameters).
Return(nil, errors.New(simErr)).Once()
Return(nil, assert.AnError).Once()

r, err = http.NewRequest(http.MethodGet, "/api/v3/traces/123", nil)
r, err := http.NewRequest(http.MethodGet, "/api/v3/traces/123", nil)
require.NoError(t, err)
w = httptest.NewRecorder()
w := httptest.NewRecorder()
gw.router.ServeHTTP(w, r)
assert.Contains(t, w.Body.String(), simErr)
assert.Contains(t, w.Body.String(), assert.AnError.Error())
}

func mockFindQueries() (url.Values, *spanstore.TraceQueryParameters) {
Expand Down
8 changes: 8 additions & 0 deletions cmd/query/app/querysvc/adjuster/adjuster.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ type Adjuster interface {
Adjust(ptrace.Traces) error
}

// Func is a type alias that wraps a function and makes an Adjuster from it.
type Func func(traces ptrace.Traces) error

// Adjust implements Adjuster interface for the Func alias.
func (f Func) Adjust(traces ptrace.Traces) error {
return f(traces)
}

// Sequence creates an adjuster that combines a series of adjusters
// applied in order. Errors from each step are accumulated and returned
// in the end as a single wrapper error. Errors do not interrupt the
Expand Down
2 changes: 2 additions & 0 deletions cmd/query/app/querysvc/adjuster/ipattribute.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"go.opentelemetry.io/collector/pdata/ptrace"
)

var _ Adjuster = (*IPAttributeAdjuster)(nil)

var ipAttributesToCorrect = map[string]struct{}{
"ip": {},
"peer.ipv4": {},
Expand Down
2 changes: 2 additions & 0 deletions cmd/query/app/querysvc/adjuster/resourceattributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/jaegertracing/jaeger/pkg/otelsemconv"
)

var _ Adjuster = (*ResourceAttributesAdjuster)(nil)

var libraryKeys = map[string]struct{}{
string(otelsemconv.TelemetrySDKLanguageKey): {},
string(otelsemconv.TelemetrySDKNameKey): {},
Expand Down
Loading

0 comments on commit cda2bbd

Please sign in to comment.