-
Notifications
You must be signed in to change notification settings - Fork 11
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Shezad Khan
committed
Aug 2, 2019
1 parent
10f033d
commit 9df3a54
Showing
6 changed files
with
539 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
// Tracing provides decorators which enable distributed tracing | ||
// | ||
// How it works | ||
// | ||
// This package provides two decorators which can be used to | ||
// propagate tracing information. The topic decorator "tracing.Topic" | ||
// will automatically attach tracing information to any outgoing | ||
// messages. If no parent trace exists, it will create one automatically. | ||
// The second decorator, tracing.Receiver is used to decode tracing information | ||
// into the context.Context object which is passed into the receiver that you | ||
// provide handle messages. Again if to trace is present a trace is started and | ||
// set in the context. | ||
// | ||
// Examples | ||
// | ||
// Using the tracing.Topic: | ||
// | ||
// func ExampleTopic() { | ||
// // make a concrete topic eg SNS | ||
// topic, _ := sns.NewTopic("arn://sns:xxx") | ||
// // make a tracing topic with the span name "msg.Writer" | ||
// topic := tracing.TracingTopic(topic, tracing.WithSpanName("msg.Writer")) | ||
// // use topic as you would without tracing | ||
// } | ||
// | ||
// Using the tracing.Receiver: | ||
// | ||
// func ExampleReceiver() { | ||
// receiver := msg.Receiver(func(ctx context.Context, m *msg.Message) error { | ||
// // your receiver implementation | ||
// // ctx will contain tracing information | ||
// // once decorated | ||
// }) | ||
// receiver := tracing.Receiver(receiver) | ||
// // use receiver as you would without tracing | ||
// } | ||
// | ||
package tracing |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
package tracing | ||
|
||
import ( | ||
"context" | ||
"encoding/base64" | ||
|
||
"github.com/zerofox-oss/go-msg" | ||
"go.opencensus.io/trace" | ||
"go.opencensus.io/trace/propagation" | ||
) | ||
|
||
const traceContextKey = "Tracecontext" | ||
|
||
type Options struct { | ||
SpanName string | ||
StartOptions trace.StartOptions | ||
} | ||
|
||
type Option func(*Options) | ||
|
||
func WithSpanName(spanName string) Option { | ||
return func(o *Options) { | ||
o.SpanName = spanName | ||
} | ||
} | ||
|
||
func WithStartOption(so trace.StartOptions) Option { | ||
return func(o *Options) { | ||
o.StartOptions = so | ||
} | ||
} | ||
|
||
// Receiver Wraps another msg.Receiver, populating | ||
// the context with any upstream tracing information. | ||
func Receiver(next msg.Receiver, opts ...Option) msg.Receiver { | ||
|
||
options := &Options{ | ||
SpanName: "msg.Receiver", | ||
StartOptions: trace.StartOptions{ | ||
Sampler: trace.AlwaysSample(), | ||
}, | ||
} | ||
|
||
for _, opt := range opts { | ||
opt(options) | ||
} | ||
|
||
return msg.ReceiverFunc(func(ctx context.Context, m *msg.Message) error { | ||
ctx, span := withContext(ctx, m, options) | ||
defer span.End() | ||
return next.Receive(ctx, m) | ||
}) | ||
} | ||
|
||
// withContext checks to see if a traceContext is | ||
// present in the message attributes. If one is present | ||
// a new span is created with that tracecontext as the parent | ||
// otherwise a new span is created without a parent. A new context | ||
// which contains the created span well as the span itself | ||
// is returned | ||
func withContext(ctx context.Context, m *msg.Message, options *Options) (context.Context, *trace.Span) { | ||
traceContextB64 := m.Attributes.Get(traceContextKey) | ||
|
||
startOptions := options.StartOptions | ||
|
||
if traceContextB64 == "" { | ||
return trace.StartSpan(ctx, options.SpanName, trace.WithSampler(startOptions.Sampler)) | ||
} | ||
|
||
traceContext, err := base64.StdEncoding.DecodeString(traceContextB64) | ||
if err != nil { | ||
return trace.StartSpan(ctx, options.SpanName, trace.WithSampler(startOptions.Sampler)) | ||
} | ||
|
||
spanContext, ok := propagation.FromBinary(traceContext) | ||
if !ok { | ||
return trace.StartSpan(ctx, options.SpanName, trace.WithSampler(startOptions.Sampler)) | ||
} | ||
|
||
return trace.StartSpanWithRemoteParent(ctx, options.SpanName, spanContext, trace.WithSampler(startOptions.Sampler)) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,246 @@ | ||
package tracing | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"crypto/rand" | ||
"encoding/base64" | ||
"io/ioutil" | ||
"testing" | ||
|
||
"github.com/google/go-cmp/cmp" | ||
"github.com/zerofox-oss/go-msg" | ||
"go.opencensus.io/trace" | ||
"go.opencensus.io/trace/propagation" | ||
) | ||
|
||
type msgWithContext struct { | ||
msg *msg.Message | ||
ctx context.Context | ||
} | ||
|
||
type ChanReceiver struct { | ||
c chan msgWithContext | ||
} | ||
|
||
func (r ChanReceiver) Receive(ctx context.Context, m *msg.Message) error { | ||
r.c <- msgWithContext{msg: m, ctx: ctx} | ||
return nil | ||
} | ||
|
||
func makeSpanContext() (trace.SpanContext, string) { | ||
b := make([]byte, 24) | ||
rand.Read(b) | ||
|
||
var tid [16]byte | ||
var sid [8]byte | ||
|
||
copy(tid[:], b[:16]) | ||
copy(sid[:], b[:8]) | ||
|
||
sc := trace.SpanContext{ | ||
TraceID: tid, | ||
SpanID: sid, | ||
} | ||
|
||
b64 := base64.StdEncoding.EncodeToString(propagation.Binary(sc)) | ||
return sc, b64 | ||
} | ||
|
||
// Tests that when a Receiver is wrapped by TracingReceiver, and tracecontext | ||
// is present, a span is started and set in the receive context with the correct | ||
// parent context | ||
func TestDecoder_SuccessfullyDecodesSpanWhenTraceContextIsPresent(t *testing.T) { | ||
testFinish := make(chan struct{}) | ||
msgChan := make(chan msgWithContext) | ||
r := Receiver(ChanReceiver{ | ||
c: msgChan, | ||
}) | ||
|
||
ctx, cancel := context.WithCancel(context.Background()) | ||
defer cancel() | ||
|
||
sc, b64Sc := makeSpanContext() | ||
|
||
// Construct a message with base64 encoding (YWJjMTIz == abc123) | ||
m := &msg.Message{ | ||
Body: bytes.NewBufferString("hello"), | ||
Attributes: msg.Attributes{}, | ||
} | ||
m.Attributes.Set("Tracecontext", b64Sc) | ||
|
||
// Wait for ChanReceiver to write the message to msgChan, assert on the body | ||
go func() { | ||
result := <-msgChan | ||
|
||
expectedBody := "hello" | ||
actual, _ := ioutil.ReadAll(result.msg.Body) | ||
if string(actual) != expectedBody { | ||
t.Errorf("Expected Body to be %v, got %v", expectedBody, string(actual)) | ||
} | ||
|
||
span := trace.FromContext(result.ctx) | ||
if span == nil { | ||
t.Errorf("span was not expected to be nil") | ||
} | ||
|
||
receivedSC := span.SpanContext() | ||
|
||
if receivedSC.TraceID != sc.TraceID { | ||
t.Errorf(cmp.Diff(receivedSC.TraceID, sc.TraceID)) | ||
} | ||
|
||
if receivedSC.Tracestate != sc.Tracestate { | ||
t.Errorf(cmp.Diff(receivedSC.TraceID, sc.TraceID)) | ||
} | ||
|
||
testFinish <- struct{}{} | ||
}() | ||
|
||
// Receive the message! | ||
err := r.Receive(ctx, m) | ||
if err != nil { | ||
t.Error(err) | ||
return | ||
} | ||
<-testFinish | ||
} | ||
|
||
// Tests that when a Receiver is wrapped by a Tracing Receiver, and | ||
// the message does not contain a tracecontext, a new span is created | ||
func TestDecoder_SuccessfullySetsSpanWhenNoTraceContext(t *testing.T) { | ||
testFinish := make(chan struct{}) | ||
msgChan := make(chan msgWithContext) | ||
r := Receiver(ChanReceiver{ | ||
c: msgChan, | ||
}) | ||
|
||
ctx, cancel := context.WithCancel(context.Background()) | ||
defer cancel() | ||
|
||
// Construct a message without base64 encoding | ||
m := &msg.Message{ | ||
Body: bytes.NewBufferString("abc123"), | ||
Attributes: msg.Attributes{}, | ||
} | ||
|
||
// Wait for ChanReceiver to write the message to msgChan, assert on the body | ||
go func() { | ||
result := <-msgChan | ||
expectedBody := "abc123" | ||
actual, _ := ioutil.ReadAll(result.msg.Body) | ||
if string(actual) != expectedBody { | ||
t.Errorf("Expected Body to be %v, got %v", expectedBody, string(actual)) | ||
} | ||
|
||
span := trace.FromContext(result.ctx) | ||
if span == nil { | ||
t.Errorf("span was not expected to be nil") | ||
} | ||
|
||
testFinish <- struct{}{} | ||
}() | ||
|
||
// Receive the message! | ||
err := r.Receive(ctx, m) | ||
if err != nil { | ||
t.Error(err) | ||
return | ||
} | ||
<-testFinish | ||
} | ||
|
||
// Tests that when a Receiver is wrapped by a Tracing Receiver, and | ||
// the message contains an invalid b64 encodeded tracecontext, a span | ||
// is still sucessfully set | ||
func TestDecoder_SuccessfullySetsSpanWhenInvalidTraceContextB64(t *testing.T) { | ||
testFinish := make(chan struct{}) | ||
msgChan := make(chan msgWithContext) | ||
r := Receiver(ChanReceiver{ | ||
c: msgChan, | ||
}) | ||
|
||
ctx, cancel := context.WithCancel(context.Background()) | ||
defer cancel() | ||
|
||
// Construct a message without base64 encoding | ||
m := &msg.Message{ | ||
Body: bytes.NewBufferString("abc123"), | ||
Attributes: msg.Attributes{}, | ||
} | ||
|
||
m.Attributes.Set("Tracecontext", "invalidcontext") | ||
|
||
// Wait for ChanReceiver to write the message to msgChan, assert on the body | ||
go func() { | ||
result := <-msgChan | ||
expectedBody := "abc123" | ||
actual, _ := ioutil.ReadAll(result.msg.Body) | ||
if string(actual) != expectedBody { | ||
t.Errorf("Expected Body to be %v, got %v", expectedBody, string(actual)) | ||
} | ||
|
||
span := trace.FromContext(result.ctx) | ||
if span == nil { | ||
t.Errorf("span was not expected to be nil") | ||
} | ||
|
||
testFinish <- struct{}{} | ||
}() | ||
|
||
// Receive the message! | ||
err := r.Receive(ctx, m) | ||
if err != nil { | ||
t.Error(err) | ||
return | ||
} | ||
<-testFinish | ||
} | ||
|
||
// Tests that when a Receiver is wrapped by a Tracing Receiver, and | ||
// the message contains an invalid binary encodeded tracecontext, a span | ||
// is still sucessfully set | ||
func TestDecoder_SuccessfullySetsSpanWhenInvalidTraceContextBinary(t *testing.T) { | ||
testFinish := make(chan struct{}) | ||
msgChan := make(chan msgWithContext) | ||
r := Receiver(ChanReceiver{ | ||
c: msgChan, | ||
}) | ||
|
||
ctx, cancel := context.WithCancel(context.Background()) | ||
defer cancel() | ||
|
||
// Construct a message without base64 encoding | ||
m := &msg.Message{ | ||
Body: bytes.NewBufferString("abc123"), | ||
Attributes: msg.Attributes{}, | ||
} | ||
|
||
// "YWJjMTIz" is valid b64 | ||
m.Attributes.Set("Tracecontext", "YWJjMTIz") | ||
|
||
// Wait for ChanReceiver to write the message to msgChan, assert on the body | ||
go func() { | ||
result := <-msgChan | ||
expectedBody := "abc123" | ||
actual, _ := ioutil.ReadAll(result.msg.Body) | ||
if string(actual) != expectedBody { | ||
t.Errorf("Expected Body to be %v, got %v", expectedBody, string(actual)) | ||
} | ||
|
||
span := trace.FromContext(result.ctx) | ||
if span == nil { | ||
t.Errorf("span was not expected to be nil") | ||
} | ||
|
||
testFinish <- struct{}{} | ||
}() | ||
|
||
// Receive the message! | ||
err := r.Receive(ctx, m) | ||
if err != nil { | ||
t.Error(err) | ||
return | ||
} | ||
<-testFinish | ||
} |
Oops, something went wrong.