-
Notifications
You must be signed in to change notification settings - Fork 33
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(metrics): multiple exports (#3099) [SLT-141]
Co-authored-by: Trajan0x <[email protected]>
- Loading branch information
1 parent
9504c26
commit 2e517b3
Showing
8 changed files
with
418 additions
and
20 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
package metrics | ||
|
||
// HeadersToMap converts a string of headers to a map. | ||
func HeadersToMap(val string) map[string]string { | ||
return headersToMap(val) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
package metrics | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"go.uber.org/multierr" | ||
"sync" | ||
"time" | ||
|
||
"go.opentelemetry.io/otel/sdk/trace" | ||
) | ||
|
||
// MultiExporter is an interface that allows exporting spans to multiple OTLP trace exporters. | ||
type MultiExporter interface { | ||
trace.SpanExporter | ||
AddExporter(exporter trace.SpanExporter) | ||
} | ||
|
||
type multiExporter struct { | ||
exporters []trace.SpanExporter | ||
} | ||
|
||
// NewMultiExporter creates a new multi exporter that forwards spans to multiple OTLP trace exporters. | ||
// It takes in one or more trace.SpanExporter instances and ensures that spans are sent to all of them. | ||
// This is useful when you need to send trace data to multiple backends or endpoints. | ||
func NewMultiExporter(exporters ...trace.SpanExporter) MultiExporter { | ||
return &multiExporter{ | ||
exporters: exporters, | ||
} | ||
} | ||
|
||
const defaultTimeout = 30 * time.Second | ||
|
||
// ExportSpans exports a batch of spans. | ||
func (m *multiExporter) ExportSpans(parentCtx context.Context, ss []trace.ReadOnlySpan) error { | ||
return m.doParallel(parentCtx, func(ctx context.Context, exporter trace.SpanExporter) error { | ||
return exporter.ExportSpans(ctx, ss) | ||
}) | ||
} | ||
|
||
func (m *multiExporter) doParallel(parentCtx context.Context, fn func(context.Context, trace.SpanExporter) error) error { | ||
ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout) | ||
defer cancel() | ||
|
||
var wg sync.WaitGroup | ||
var errors []error | ||
var mu sync.Mutex | ||
|
||
wg.Add(len(m.exporters)) | ||
for _, exporter := range m.exporters { | ||
go func(exporter trace.SpanExporter) { | ||
defer wg.Done() | ||
err := fn(ctx, exporter) | ||
if err != nil { | ||
mu.Lock() | ||
errors = append(errors, fmt.Errorf("error in doMultiple: %w", err)) | ||
mu.Unlock() | ||
} | ||
}(exporter) | ||
} | ||
|
||
wg.Wait() | ||
if len(errors) > 0 { | ||
// nolint: wrapcheck | ||
return multierr.Combine(errors...) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// Shutdown notifies the exporter of a pending halt to operations. | ||
func (m *multiExporter) Shutdown(ctx context.Context) error { | ||
return m.doParallel(ctx, func(ctx context.Context, exporter trace.SpanExporter) error { | ||
return exporter.Shutdown(ctx) | ||
}) | ||
} | ||
|
||
// AddExporter adds an exporter to the multi exporter. | ||
func (m *multiExporter) AddExporter(exporter trace.SpanExporter) { | ||
m.exporters = append(m.exporters, exporter) | ||
} | ||
|
||
var _ trace.SpanExporter = &multiExporter{} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
package metrics_test | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
"github.com/synapsecns/sanguine/core/metrics" | ||
sdktrace "go.opentelemetry.io/otel/sdk/trace" | ||
"go.opentelemetry.io/otel/sdk/trace/tracetest" | ||
) | ||
|
||
func TestMultiExporter(t *testing.T) { | ||
// Create in-memory exporters | ||
exporter1 := tracetest.NewInMemoryExporter() | ||
exporter2 := tracetest.NewInMemoryExporter() | ||
|
||
// Create multi-exporter | ||
multiExporter := metrics.NewMultiExporter(exporter1, exporter2) | ||
|
||
// Create test spans | ||
spans := []sdktrace.ReadOnlySpan{ | ||
tracetest.SpanStub{}.Snapshot(), | ||
tracetest.SpanStub{}.Snapshot(), | ||
} | ||
|
||
// Test ExportSpans | ||
err := multiExporter.ExportSpans(context.Background(), spans) | ||
require.NoError(t, err) | ||
|
||
// Verify that spans were exported to both exporters | ||
assert.Equal(t, 2, len(exporter1.GetSpans())) | ||
assert.Equal(t, 2, len(exporter2.GetSpans())) | ||
|
||
// Test Shutdown | ||
err = multiExporter.Shutdown(context.Background()) | ||
require.NoError(t, err) | ||
|
||
// Verify that both exporters were shut down | ||
// Note: InMemoryExporter doesn't have a Stopped() method, so we can't check this directly | ||
// Instead, we can try to export spans again and check for an error | ||
err = multiExporter.ExportSpans(context.Background(), spans) | ||
assert.NoError(t, err, "Expected no error after shutdown") | ||
} |
Oops, something went wrong.