-
Notifications
You must be signed in to change notification settings - Fork 11
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #23 from shezadkhan137/add-otel-support
Add OpenTelemetry support
- Loading branch information
Showing
11 changed files
with
1,048 additions
and
10 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
name: tests | ||
on: [push, pull_request] | ||
jobs: | ||
tests: | ||
strategy: | ||
matrix: | ||
go-version: [1.20.x, 1.21.x] | ||
runs-on: ubuntu-latest | ||
steps: | ||
- uses: actions/checkout@v2 | ||
- uses: actions/setup-go@v2 | ||
name: "install go" | ||
with: | ||
go-version: ${{ matrix.go-version }} | ||
- name: "tests" | ||
run: make test |
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
// Tracing provides decorators which enable distributed tracing | ||
// | ||
// How it works | ||
// | ||
// This package provides two decorators which can be used to | ||
// propagate tracing information. The topic decorator "tracing.Topic" | ||
// will automatically attach tracing information to any outgoing | ||
// messages. If no parent trace exists, it will create one automatically. | ||
// The second decorator, tracing.Receiver is used to decode tracing information | ||
// into the context.Context object which is passed into the receiver that you | ||
// provide handle messages. Again if to trace is present a trace is started and | ||
// set in the context. | ||
// | ||
// Examples | ||
// | ||
// Using the tracing.Topic: | ||
// | ||
// func ExampleTopic() { | ||
// // make a concrete topic eg SNS | ||
// topic, _ := sns.NewTopic("arn://sns:xxx") | ||
// // make a tracing topic with the span name "msg.Writer" | ||
// topic := tracing.TracingTopic(topic, tracing.WithSpanName("msg.Writer")) | ||
// // use topic as you would without tracing | ||
// } | ||
// | ||
// Using the tracing.Receiver: | ||
// | ||
// func ExampleReceiver() { | ||
// receiver := msg.Receiver(func(ctx context.Context, m *msg.Message) error { | ||
// // your receiver implementation | ||
// // ctx will contain tracing information | ||
// // once decorated | ||
// }) | ||
// receiver := tracing.Receiver(receiver) | ||
// // use receiver as you would without tracing | ||
// } | ||
// | ||
package tracing |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,123 @@ | ||
package tracing | ||
|
||
import ( | ||
"context" | ||
"encoding/base64" | ||
|
||
"github.com/zerofox-oss/go-msg" | ||
"go.opencensus.io/trace/propagation" | ||
|
||
"go.opentelemetry.io/otel" | ||
"go.opentelemetry.io/otel/trace" | ||
|
||
ocbridge "go.opentelemetry.io/otel/bridge/opencensus" | ||
) | ||
|
||
var tracer = otel.Tracer("github.com/zerofox-oss/go-msg/decorators/otel") | ||
|
||
const traceContextKey = "Tracecontext" | ||
const traceStateKey = "Tracestate" | ||
|
||
type Options struct { | ||
SpanName string | ||
StartOptions trace.SpanStartOption | ||
OnlyOtel bool | ||
} | ||
|
||
type Option func(*Options) | ||
|
||
func WithSpanName(spanName string) Option { | ||
return func(o *Options) { | ||
o.SpanName = spanName | ||
} | ||
} | ||
|
||
func WithStartOption(so trace.SpanStartOption) Option { | ||
return func(o *Options) { | ||
o.StartOptions = so | ||
} | ||
} | ||
|
||
func WithOnlyOtel(onlyOtel bool) Option { | ||
return func(o *Options) { | ||
o.OnlyOtel = onlyOtel | ||
} | ||
} | ||
|
||
// Receiver Wraps another msg.Receiver, populating | ||
// the context with any upstream tracing information. | ||
func Receiver(next msg.Receiver, opts ...Option) msg.Receiver { | ||
|
||
options := &Options{ | ||
SpanName: "msg.Receiver", | ||
} | ||
|
||
for _, opt := range opts { | ||
opt(options) | ||
} | ||
|
||
return msg.ReceiverFunc(func(ctx context.Context, m *msg.Message) error { | ||
ctx, span := withContext(ctx, m, options) | ||
defer span.End() | ||
return next.Receive(ctx, m) | ||
}) | ||
} | ||
|
||
// withContext checks to see if a traceContext is | ||
// present in the message attributes. If one is present | ||
// a new span is created with that tracecontext as the parent | ||
// otherwise a new span is created without a parent. A new context | ||
// which contains the created span well as the span itself | ||
// is returned | ||
func withContext(ctx context.Context, m *msg.Message, options *Options) (context.Context, trace.Span) { | ||
|
||
textCarrier := msgAttributesTextCarrier{attributes: &m.Attributes} | ||
tmprop := otel.GetTextMapPropagator() | ||
|
||
// if any of the fields used by | ||
// the text map propagation is set | ||
// we use otel to decode | ||
for _, field := range tmprop.Fields() { | ||
if m.Attributes.Get(field) != "" { | ||
ctx = tmprop.Extract(ctx, textCarrier) | ||
return tracer.Start(ctx, options.SpanName) | ||
} | ||
} | ||
|
||
// if we are set to use only otel | ||
// do not fall back to opencensus | ||
if options.OnlyOtel { | ||
return tracer.Start(ctx, options.SpanName) | ||
} | ||
|
||
// fallback to old behaviour (opencensus) if we don't | ||
// receive any otel headers | ||
traceContextB64 := m.Attributes.Get(traceContextKey) | ||
if traceContextB64 == "" { | ||
return tracer.Start(ctx, options.SpanName) | ||
} | ||
|
||
traceContext, err := base64.StdEncoding.DecodeString(traceContextB64) | ||
if err != nil { | ||
return tracer.Start(ctx, options.SpanName) | ||
} | ||
|
||
spanContext, ok := propagation.FromBinary(traceContext) | ||
if !ok { | ||
return tracer.Start(ctx, options.SpanName) | ||
} | ||
|
||
traceStateString := m.Attributes.Get(traceStateKey) | ||
if traceStateString != "" { | ||
ts := tracestateFromString(traceStateString) | ||
spanContext.Tracestate = ts | ||
} | ||
|
||
// convert the opencensus span context to otel | ||
otelSpanContext := ocbridge.OCSpanContextToOTel(spanContext) | ||
if !otelSpanContext.IsValid() { | ||
return tracer.Start(ctx, options.SpanName) | ||
} | ||
|
||
return tracer.Start(trace.ContextWithRemoteSpanContext(ctx, otelSpanContext), options.SpanName) | ||
} |
Oops, something went wrong.