-
Notifications
You must be signed in to change notification settings - Fork 47
/
context.go
187 lines (149 loc) · 5.09 KB
/
context.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
package reqctx
import (
"context"
"errors"
"io"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace/noop"
"github.com/streamingfast/dmetering"
"github.com/streamingfast/logging"
"github.com/streamingfast/substreams/metrics"
"go.opentelemetry.io/otel/codes"
ttrace "go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)
// contextKeyType is the unexported type of the keys this package uses to
// store values in a context.Context.
type contextKeyType int

// Context keys, one per kind of value this package stores in a context.
// The numeric values are kept exactly as originally assigned; note that 1
// is not used by any key declared here.
var (
	detailsKey                      contextKeyType = 0
	tracerKey                       contextKeyType = 2
	spanKey                         contextKeyType = 3
	reqStatsKey                     contextKeyType = 4
	moduleExecutionTracingConfigKey contextKeyType = 5
	outputModuleHashKey             contextKeyType = 6
	tier2RequestParametersKeyKey    contextKeyType = 7
)
// Logger resolves the logger for ctx through logging.Logger, passing a no-op
// zap logger as the second argument (presumably the fallback used when ctx
// carries no logger; confirm against the logging package).
func Logger(ctx context.Context) *zap.Logger {
	fallback := zap.NewNop()
	return logging.Logger(ctx, fallback)
}
// WithLogger re-exports logging.WithLogger under this package's name.
var WithLogger = logging.WithLogger
// Tracer returns the tracer stored in ctx under tracerKey. When ctx holds no
// value of type ttrace.Tracer under that key, a tracer obtained from a no-op
// tracer provider is returned instead, so the result is never nil.
func Tracer(ctx context.Context) ttrace.Tracer {
	stored, ok := ctx.Value(tracerKey).(ttrace.Tracer)
	if !ok {
		return noop.NewTracerProvider().Tracer("")
	}
	return stored
}
// WithTracer returns a child of ctx that carries tracer under tracerKey,
// which is the key Tracer reads from.
func WithTracer(ctx context.Context, tracer ttrace.Tracer) context.Context {
	child := context.WithValue(ctx, tracerKey, tracer)
	return child
}
// ReqStats returns the request statistics stored in ctx under reqStatsKey.
//
// It returns nil when ctx carries no *metrics.Stats under that key, matching
// the nil-on-absence behavior of Details and Emitter rather than panicking on
// a failed type assertion. Callers must check the result before using it.
func ReqStats(ctx context.Context) *metrics.Stats {
	if stats, ok := ctx.Value(reqStatsKey).(*metrics.Stats); ok {
		return stats
	}
	return nil
}
// WithReqStats returns a child of ctx that carries stats under reqStatsKey,
// which is the key ReqStats reads from.
func WithReqStats(ctx context.Context, stats *metrics.Stats) context.Context {
	child := context.WithValue(ctx, reqStatsKey, stats)
	return child
}
// Span returns the span stored in ctx under spanKey. When ctx holds no *span
// under that key, a fresh NoopSpan is returned so callers always receive a
// usable ISpan.
func Span(ctx context.Context) ISpan {
	current, ok := ctx.Value(spanKey).(*span)
	if !ok {
		return &NoopSpan{}
	}
	return current
}
// WithModuleExecutionSpan starts a span called name only when module
// execution tracing is enabled on ctx (see ModuleExecutionTracing).
//
// When tracing is disabled it returns ctx unchanged together with a NoopSpan.
// Otherwise it starts a span with the tracer resolved from ctx, wraps it, and
// returns the wrapper along with a context that carries it under spanKey.
func WithModuleExecutionSpan(ctx context.Context, name string) (context.Context, ISpan) {
	if enabled := ModuleExecutionTracing(ctx); !enabled {
		return ctx, &NoopSpan{}
	}

	spanCtx, otelSpan := Tracer(ctx).Start(ctx, name)
	wrapped := &span{name: name, Span: otelSpan}
	return context.WithValue(spanCtx, spanKey, wrapped), wrapped
}
// WithSpan starts a span called name with the tracer resolved from ctx and
// wraps it. It returns the wrapper along with a context, derived from the one
// the tracer returned, that carries the wrapper under spanKey.
func WithSpan(ctx context.Context, name string) (context.Context, ISpan) {
	tracer := Tracer(ctx)
	spanCtx, otelSpan := tracer.Start(ctx, name)
	wrapped := &span{name: name, Span: otelSpan}
	return context.WithValue(spanCtx, spanKey, wrapped), wrapped
}
// emitterKeyType is the unexported, zero-size type of the context key under
// which the event emitter is stored.
type emitterKeyType struct{}

// emitterKey is the context key shared by Emitter and WithEmitter. Its zero
// value is the only value of emitterKeyType.
var emitterKey emitterKeyType
// Emitter returns the dmetering.EventEmitter stored in ctx under emitterKey,
// or nil when ctx carries no value of that type under the key.
func Emitter(ctx context.Context) dmetering.EventEmitter {
	// A failed assertion leaves found as the nil interface value, which is
	// exactly the "not present" result.
	found, _ := ctx.Value(emitterKey).(dmetering.EventEmitter)
	return found
}
// WithEmitter returns a child of ctx that carries emitter under emitterKey,
// which is the key Emitter reads from.
func WithEmitter(ctx context.Context, emitter dmetering.EventEmitter) context.Context {
	child := context.WithValue(ctx, emitterKey, emitter)
	return child
}
// ISpan is the span abstraction handed out by Span, WithSpan and
// WithModuleExecutionSpan. Apart from EndWithErr, its methods use the
// ttrace, codes and attribute types from the OpenTelemetry packages this file
// imports.
type ISpan interface {
// End completes the Span. The Span is considered complete and ready to be
// delivered through the rest of the telemetry pipeline after this method
// is called. Therefore, updates to the Span are not allowed after this
// method has been called.
End(options ...ttrace.SpanEndOption)
// AddEvent adds an event with the provided name and options.
AddEvent(name string, options ...ttrace.EventOption)
// IsRecording returns the recording state of the Span. It will return
// true if the Span is active and events can be recorded.
IsRecording() bool
// RecordError will record err as an exception span event for this span. An
// additional call to SetStatus is required if the Status of the Span should
// be set to Error, as this method does not change the Span status. If this
// span is not being recorded or err is nil then this method does nothing.
RecordError(err error, options ...ttrace.EventOption)
// SpanContext returns the SpanContext of the Span. The returned SpanContext
// is usable even after the End method has been called for the Span.
SpanContext() ttrace.SpanContext
// SetStatus sets the status of the Span in the form of a code and a
// description, provided the status hasn't already been set to a higher
// value before (OK > Error > Unset). The description is only included in a
// status when the code is for an error.
SetStatus(code codes.Code, description string)
// SetName sets the Span name.
SetName(name string)
// SetAttributes sets kv as attributes of the Span. If a key from kv
// already exists for an attribute of the Span it will be overwritten with
// the value contained in kv.
SetAttributes(kv ...attribute.KeyValue)
// TracerProvider returns a TracerProvider that can be used to generate
// additional Spans on the same telemetry pipeline as the current Span.
TracerProvider() ttrace.TracerProvider
// EndWithErr ends the Span, taking the error that e points to (if any)
// into account. e is a pointer so the error can be read at the time
// EndWithErr runs. In the *span implementation in this file, a nil e, a
// nil *e, or an error matching io.EOF is not treated as a failure; any
// other error is passed to RecordError and to SetStatus with codes.Error.
EndWithErr(e *error)
}
// span wraps a ttrace.Span together with the name it was started with. The
// embedded ttrace.Span supplies its methods by promotion; EndWithErr is
// defined on *span in this file.
type span struct {
// name is the span name passed to WithSpan / WithModuleExecutionSpan.
name string
// Span is the underlying span returned by the tracer's Start call.
ttrace.Span
}
// EndWithErr ends the span, deriving its final status from the error that e
// points to.
//
// The span is marked codes.Ok when e is nil, when *e is nil, or when *e
// matches io.EOF. For any other error the error is recorded on the span and
// the status is set to codes.Error with the error message as description.
//
// The status is set exactly once, after the error has been inspected. Span
// status precedence is OK > Error > Unset, so setting Ok up front would make
// the subsequent Error status a no-op and failed spans would be reported as
// successful.
func (s *span) EndWithErr(e *error) {
	defer s.Span.End()

	if e == nil || *e == nil || errors.Is(*e, io.EOF) {
		s.Span.SetStatus(codes.Ok, "")
		return
	}

	err := *e
	s.Span.RecordError(err)
	s.Span.SetStatus(codes.Error, err.Error())
}
// Details returns the *RequestDetails stored in ctx under detailsKey, or nil
// when ctx carries no value of that type under the key.
func Details(ctx context.Context) *RequestDetails {
	// A failed assertion leaves found as a nil *RequestDetails, which is
	// exactly the "not present" result.
	found, _ := ctx.Value(detailsKey).(*RequestDetails)
	return found
}
// WithRequest returns a child of ctx that carries req under detailsKey,
// which is the key Details reads from.
func WithRequest(ctx context.Context, req *RequestDetails) context.Context {
	child := context.WithValue(ctx, detailsKey, req)
	return child
}
// ModuleExecutionTracing reports the boolean stored in ctx under
// moduleExecutionTracingConfigKey. It returns false when the key is absent
// or holds a value that is not a bool.
func ModuleExecutionTracing(ctx context.Context) bool {
	// A failed assertion leaves enabled at its zero value, false.
	enabled, _ := ctx.Value(moduleExecutionTracingConfigKey).(bool)
	return enabled
}
// WithModuleExecutionTracing returns a child of ctx in which the value true
// is stored under moduleExecutionTracingConfigKey, the key that
// ModuleExecutionTracing reads.
func WithModuleExecutionTracing(ctx context.Context) context.Context {
	const enabled = true
	return context.WithValue(ctx, moduleExecutionTracingConfigKey, enabled)
}