Skip to content

Commit

Permalink
fix: address race-condition on-failures
Browse files Browse the repository at this point in the history
  • Loading branch information
0x416e746f6e committed May 27, 2024
1 parent b0104da commit c3a430d
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 24 deletions.
3 changes: 1 addition & 2 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func CommandServe(cfg *config.Config) *cli.Command {
Category: categoryTransponder,
Destination: &cfg.Transponder.ListenAddress,
EnvVars: []string{envPrefix + "TRANSPONDER_LISTEN_ADDRESS"},
Name: "transponder-listen-port",
Name: "transponder-listen-address",
Usage: "`host:port` for the transponder to listen on",
Value: "0.0.0.0:32123",
},
Expand All @@ -73,7 +73,6 @@ func CommandServe(cfg *config.Config) *cli.Command {
EnvVars: []string{envPrefix + "TRANSPONDER_PEER"},
Name: "transponder-peer",
Usage: "`name=host:port` of the transponder peer to measure the latency against",
Value: cli.NewStringSlice("localhost=127.0.0.1:32123"),
},
}

Expand Down
10 changes: 5 additions & 5 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ report the statistics as prometheus metrics.

```shell
latency-monitor serve \
--transponder-listen-port 127.0.0.1:32123 \
--transponder-peer localhost=127.0.0.1:32123
--transponder-listen-address 127.0.0.1:32123
```

```shell
Expand Down Expand Up @@ -93,7 +92,8 @@ latency_monitor_return_trip_latency_microseconds_sum{peer="localhost"} 84
```

>
> Note: In real situation it still makes sense to keep localhost among the peers
> to adjust the remote peers for locally incurred implicit latency (that
> is, by subtracting local average latency from all remote ones).
> Note: There is a specially treated peer name `localhost` that can be used to
> send probes to itself. This can be used to adjust the remote peers'
> latencies for locally incurred implicit ones (that is, by subtracting
> local average latency from all remote ones).
>
6 changes: 0 additions & 6 deletions server/handlers.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,9 @@
package server

import (
"errors"
"net/http"
)

// Sentinel errors describing inconsistent source/destination uuids.
var (
// ErrUnexpectedDstUUIDOnReturn signals that a probe came back carrying a
// destination uuid that was not expected.
ErrUnexpectedDstUUIDOnReturn = errors.New("unexpected destination uuid on probe's return")
// ErrUnexpectedSrcDstUUIDs signals that the source uuid is not ours while the
// destination uuid is nevertheless non-zero.
ErrUnexpectedSrcDstUUIDs = errors.New("source uuid is not us, but non-zero destination uuid")
)

// handleHealthcheck responds to every request with a bare 200 OK status.
// The request is not inspected and no response body is written.
func (s *Server) handleHealthcheck(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}
8 changes: 7 additions & 1 deletion server/probes.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package server

import (
"context"
"errors"
"fmt"
"net"
"reflect"
Expand All @@ -17,6 +18,11 @@ import (
"go.uber.org/zap"
)

// Sentinel errors describing inconsistent source/destination uuids.
var (
// ErrUnexpectedDstUUIDOnReturn signals that a probe came back carrying a
// destination uuid that was not expected.
ErrUnexpectedDstUUIDOnReturn = errors.New("unexpected destination uuid on probe's return")
// ErrUnexpectedSrcDstUUIDs signals that the source uuid is not ours while the
// destination uuid is nevertheless non-zero.
ErrUnexpectedSrcDstUUIDs = errors.New("source uuid is not us, but non-zero destination uuid")
)

func (s *Server) sendProbes(ctx context.Context, t *transponder.Transponder) {
l := logutils.LoggerFromContext(ctx)

Expand Down Expand Up @@ -89,7 +95,7 @@ func (s *Server) receiveProbes(ctx context.Context) transponder.Receive {
}

switch {
case p.DstUUID == s.uuid && p.DstTimestamp.IsZero(): // reply to the others' probes
case p.DstTimestamp.IsZero(): // reply to the others' probes
p.DstTimestamp = ts
output, err := p.MarshalBinary()
if err != nil {
Expand Down
18 changes: 8 additions & 10 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,7 @@ func New(cfg *config.Config) (*Server, error) {
for _, peer := range cfg.Transponder.Peers {
peerUUID := srvUUID

addr, err := peer.UDPAddress()
if err != nil {
return nil, err
}

if !addr.IP.IsLoopback() {
if peer.Name() != "localhost" {
peerUUID, err = uuid.NewRandom()
if err != nil {
return nil, err
Expand Down Expand Up @@ -85,7 +80,7 @@ func (s *Server) Run() error {

ticker := time.NewTicker(s.cfg.Transponder.Interval)

failure := make(chan error)
failure := make(chan error, 1)

go func() { // run the transponder
l.Info("Latency monitor transponder is going up...",
Expand All @@ -110,21 +105,24 @@ func (s *Server) Run() error {
go func() { // run the ticker
for {
<-ticker.C
if !transponder.IsRunning() {
l.Warn("Transponder is not running...")
continue
}
s.sendProbes(ctx, transponder)
}
}()

{ // wait until termination or internal failure
terminator := make(chan os.Signal, 1)
signal.Notify(terminator, os.Interrupt, syscall.SIGTERM)
stop := <-terminator

select {
case <-terminator:
case stop := <-terminator:
l.Info("Stop signal received; shutting down...",
zap.String("signal", stop.String()),
)
case <-failure:
case err := <-failure:
l.Error("Internal failure; shutting down...",
zap.Error(err),
)
Expand Down
7 changes: 7 additions & 0 deletions transponder/transponder.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,13 @@ func (t *Transponder) Run(ctx context.Context) error {
}
}

// IsRunning reports whether the transponder is currently usable: it must not
// be shutting down and it must hold a non-nil connection. Both fields are
// read while holding t.mx.
func (t *Transponder) IsRunning() bool {
	t.mx.Lock()
	defer t.mx.Unlock()

	// A transponder that is shutting down is never considered running,
	// regardless of whether its connection is still set.
	if t.shuttingDown {
		return false
	}

	return t.conn != nil
}

func (t *Transponder) setupConnection() error {
t.mx.Lock()
defer t.mx.Unlock()
Expand Down

0 comments on commit c3a430d

Please sign in to comment.