Skip to content

Commit

Permalink
feat: enable configurable extra labels
Browse files Browse the repository at this point in the history
  • Loading branch information
0x416e746f6e committed Jun 3, 2024
1 parent cbd1183 commit e73d237
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 33 deletions.
2 changes: 1 addition & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func main() {
zap.ReplaceGlobals(l)

// inject version
cfg.Metrics.MetricsVersion = version
cfg.Metrics.Version = version

return nil
},
Expand Down
31 changes: 28 additions & 3 deletions cmd/serve.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package main

import (
"fmt"
"slices"
"strings"
"time"

"github.com/flashbots/latency-monitor/config"
Expand All @@ -17,9 +19,18 @@ const (
)

func CommandServe(cfg *config.Config) *cli.Command {
metricsLabels := &cli.StringSlice{}
transponderPeers := &cli.StringSlice{}

metricsFlags := []cli.Flag{
&cli.StringSliceFlag{
Category: categoryMetrics,
Destination: metricsLabels,
EnvVars: []string{envPrefix + "METRICS_LABELS"},
Name: "metrics-label",
Usage: "extra metrics labels in the format `label=value`",
},

&cli.StringFlag{
Category: categoryMetrics,
Destination: &cfg.Metrics.ListenAddress,
Expand Down Expand Up @@ -70,7 +81,7 @@ func CommandServe(cfg *config.Config) *cli.Command {
&cli.StringSliceFlag{
Category: categoryTransponder,
Destination: transponderPeers,
EnvVars: []string{envPrefix + "TRANSPONDER_PEER"},
EnvVars: []string{envPrefix + "TRANSPONDER_PEERS"},
Name: "transponder-peer",
Usage: "`name=host:port` of the transponder peer to measure the latency against",
},
Expand Down Expand Up @@ -99,16 +110,30 @@ func CommandServe(cfg *config.Config) *cli.Command {
Flags: flags,

Before: func(ctx *cli.Context) error {
// metrics labels
l := metricsLabels.Value()
labels := make(map[string]string, len(l))
for _, strLabel := range l {
parts := strings.Split(strLabel, "=")
if len(parts) != 2 {
return fmt.Errorf("invalid label format: %s", strLabel)
}
labels[parts[0]] = parts[1]
}
cfg.Metrics.Labels = labels

// transponder peers
p := transponderPeers.Value()
peers := make([]types.Peer, 0, len(p))
for _, s := range p {
peer, err := types.NewPeer(s)
for _, strPeer := range p {
peer, err := types.NewPeer(strPeer)
if err != nil {
return err
}
peers = append(peers, peer)
}
cfg.Transponder.Peers = peers

return nil
},

Expand Down
4 changes: 3 additions & 1 deletion config/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package config
type Metrics struct {
ListenAddress string `yaml:"metrics_listen_address"`

Labels map[string]string

LatencyBucketsCount int `yaml:"metrics_latency_buckets_count"`
MaxLatencyUs int `yaml:"metrics_max_latency_us"`

MetricsVersion string `yaml:"metrics_version"`
Version string `yaml:"metrics_version"`
}
50 changes: 25 additions & 25 deletions server/probes.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/flashbots/latency-monitor/metrics"
"github.com/flashbots/latency-monitor/transponder"
"github.com/flashbots/latency-monitor/types"
"go.opentelemetry.io/otel/attribute"
otelattr "go.opentelemetry.io/otel/attribute"
otelapi "go.opentelemetry.io/otel/metric"
"go.uber.org/zap"
)
Expand All @@ -29,8 +29,8 @@ func (s *Server) sendProbes(ctx context.Context, t *transponder.Transponder) {
for peerUUID, peer := range s.peers {
addr, err := peer.UDPAddress()
if err != nil {
metrics.CounterFailedProbeSend.Add(ctx, 1, otelapi.WithAttributes(
attribute.String("error_type", reflect.TypeOf(err).String()),
metrics.CounterFailedProbeSend.Add(ctx, 1, s.labels, otelapi.WithAttributes(
otelattr.String("error_type", reflect.TypeOf(err).String()),
))
l.Error("Failed to send a probe",
zap.Error(err),
Expand All @@ -48,8 +48,8 @@ func (s *Server) sendProbes(ctx context.Context, t *transponder.Transponder) {

b, err := p.MarshalBinary()
if err != nil {
metrics.CounterFailedProbeSend.Add(ctx, 1, otelapi.WithAttributes(
attribute.String("error_type", reflect.TypeOf(err).String()),
metrics.CounterFailedProbeSend.Add(ctx, 1, s.labels, otelapi.WithAttributes(
otelattr.String("error_type", reflect.TypeOf(err).String()),
))
l.Error("Failed to prepare a probe",
zap.Error(err),
Expand All @@ -58,16 +58,16 @@ func (s *Server) sendProbes(ctx context.Context, t *transponder.Transponder) {
}

t.Send(b, addr, func(err error) {
metrics.CounterFailedProbeSend.Add(ctx, 1, otelapi.WithAttributes(
attribute.String("error_type", reflect.TypeOf(err).String()),
metrics.CounterFailedProbeSend.Add(ctx, 1, s.labels, otelapi.WithAttributes(
otelattr.String("error_type", reflect.TypeOf(err).String()),
))
l.Error("Failed to send a probe",
zap.Error(err),
)
})

metrics.CountProbeSent.Add(ctx, 1, otelapi.WithAttributes(
attribute.String("peer", peer.Name()),
metrics.CountProbeSent.Add(ctx, 1, s.labels, otelapi.WithAttributes(
otelattr.String("peer", peer.Name()),
))
l.Debug("Sent a probe",
zap.String("name", peer.Name()),
Expand All @@ -83,8 +83,8 @@ func (s *Server) receiveProbes(ctx context.Context) transponder.Receive {

p := types.Probe{}
if err := p.UnmarshalBinary(input); err != nil {
metrics.CounterInvalidProbeReceived.Add(ctx, 1, otelapi.WithAttributes(
attribute.String("error_type", reflect.TypeOf(err).String()),
metrics.CounterInvalidProbeReceived.Add(ctx, 1, s.labels, otelapi.WithAttributes(
otelattr.String("error_type", reflect.TypeOf(err).String()),
))
l.Error("Invalid probe",
zap.Error(err),
Expand All @@ -99,8 +99,8 @@ func (s *Server) receiveProbes(ctx context.Context) transponder.Receive {
p.DstTimestamp = ts
output, err := p.MarshalBinary()
if err != nil {
metrics.CounterFailedProbeRespond.Add(ctx, 1, otelapi.WithAttributes(
attribute.String("error_type", reflect.TypeOf(err).String()),
metrics.CounterFailedProbeRespond.Add(ctx, 1, s.labels, otelapi.WithAttributes(
otelattr.String("error_type", reflect.TypeOf(err).String()),
))
l.Error("Failed to prepare response to a probe",
zap.Error(err),
Expand All @@ -110,8 +110,8 @@ func (s *Server) receiveProbes(ctx context.Context) transponder.Receive {

go func() {
t.Send(output, source, func(err error) {
metrics.CounterFailedProbeRespond.Add(ctx, 1, otelapi.WithAttributes(
attribute.String("error_type", reflect.TypeOf(err).String()),
metrics.CounterFailedProbeRespond.Add(ctx, 1, s.labels, otelapi.WithAttributes(
otelattr.String("error_type", reflect.TypeOf(err).String()),
))
l.Error("Failed to respond to a probe",
zap.Error(err),
Expand All @@ -125,8 +125,8 @@ func (s *Server) receiveProbes(ctx context.Context) transponder.Receive {
err := fmt.Errorf("%w: %s",
ErrUnexpectedDstUUIDOnReturn, p.DstUUID.String(),
)
metrics.CounterInvalidProbeReceived.Add(ctx, 1, otelapi.WithAttributes(
attribute.String("error_type", reflect.TypeOf(err).String()),
metrics.CounterInvalidProbeReceived.Add(ctx, 1, s.labels, otelapi.WithAttributes(
otelattr.String("error_type", reflect.TypeOf(err).String()),
))
l.Error("Invalid return probe",
zap.Error(err),
Expand All @@ -136,17 +136,17 @@ func (s *Server) receiveProbes(ctx context.Context) transponder.Receive {
}

forwardLatency := float64(p.DstTimestamp.Sub(p.SrcTimestamp).Microseconds())
metrics.HistogramLatencyForwardTrip.Record(ctx, forwardLatency, otelapi.WithAttributes(
attribute.String("peer", peer.Name()),
metrics.HistogramLatencyForwardTrip.Record(ctx, forwardLatency, s.labels, otelapi.WithAttributes(
otelattr.String("peer", peer.Name()),
))

returnLatency := float64(ts.Sub(p.DstTimestamp).Microseconds())
metrics.HistogramLatencyReturnTrip.Record(ctx, returnLatency, otelapi.WithAttributes(
attribute.String("peer", peer.Name()),
metrics.HistogramLatencyReturnTrip.Record(ctx, returnLatency, s.labels, otelapi.WithAttributes(
otelattr.String("peer", peer.Name()),
))

metrics.CountProbeReturned.Add(ctx, 1, otelapi.WithAttributes(
attribute.String("peer", peer.Name()),
metrics.CountProbeReturned.Add(ctx, 1, s.labels, otelapi.WithAttributes(
otelattr.String("peer", peer.Name()),
))
l.Debug("Received a return probe",
zap.Float64("forward_latency_ms", forwardLatency),
Expand All @@ -160,8 +160,8 @@ func (s *Server) receiveProbes(ctx context.Context) transponder.Receive {
err := fmt.Errorf("%w: source %s, destination %s",
ErrUnexpectedSrcDstUUIDs, p.SrcUUID.String(), p.DstUUID.String(),
)
metrics.CounterInvalidProbeReceived.Add(ctx, 1, otelapi.WithAttributes(
attribute.String("error_type", reflect.TypeOf(err).String()),
metrics.CounterInvalidProbeReceived.Add(ctx, 1, s.labels, otelapi.WithAttributes(
otelattr.String("error_type", reflect.TypeOf(err).String()),
))
l.Error("Invalid probe",
zap.Error(err),
Expand Down
15 changes: 12 additions & 3 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,19 @@ import (
"github.com/flashbots/latency-monitor/types"
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus/promhttp"
otelattr "go.opentelemetry.io/otel/attribute"
otelapi "go.opentelemetry.io/otel/metric"
"go.uber.org/zap"
)

type Server struct {
cfg *config.Config
log *zap.Logger

uuid uuid.UUID

uuid uuid.UUID
peers map[uuid.UUID]*types.Peer

labels otelapi.MeasurementOption
}

func New(cfg *config.Config) (*Server, error) {
Expand All @@ -37,6 +40,11 @@ func New(cfg *config.Config) (*Server, error) {
return nil, err
}

labels := make([]otelattr.KeyValue, 0, len(cfg.Metrics.Labels))
for k, v := range cfg.Metrics.Labels {
labels = append(labels, otelattr.String(k, v))
}

peers := make(map[uuid.UUID]*types.Peer, len(cfg.Transponder.Peers))
for _, peer := range cfg.Transponder.Peers {
peerUUID := srvUUID
Expand All @@ -55,7 +63,8 @@ func New(cfg *config.Config) (*Server, error) {
log: l,
uuid: srvUUID,

peers: peers,
labels: otelapi.WithAttributeSet(otelattr.NewSet(labels...)),
peers: peers,
}, nil
}

Expand Down

0 comments on commit e73d237

Please sign in to comment.