Skip to content

Commit

Permalink
Implement device detection mechanism
Browse files Browse the repository at this point in the history
This commit tries to accomplish two features.

1. React to the device MAC address change and change Link Layer Address
   option appropreately.
2. React to the device down, stop RA, and restart RA when the device
   comes back.

The implementation is netlink-based device detection mechanism. It
subscribes to the rtnetlink event and detects device state update and
device down/deletion.

Signed-off-by: Yutaro Hayakawa <[email protected]>
  • Loading branch information
YutaroHayakawa committed May 25, 2024
1 parent c1e972b commit c09c104
Show file tree
Hide file tree
Showing 5 changed files with 265 additions and 52 deletions.
110 changes: 78 additions & 32 deletions advertiser.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ import (
"log/slog"
"net/netip"
"reflect"
"slices"
"sync"
"time"

"github.com/mdlayher/ndp"
"github.com/sethvargo/go-retry"
"golang.org/x/sys/unix"
)

Expand All @@ -28,10 +28,10 @@ type advertiser struct {
ifaceStatus *InterfaceStatus
ifaceStatusLock sync.RWMutex

reloadCh chan *InterfaceConfig
stopCh chan any
sock socket
socketCtor socketCtor
reloadCh chan *InterfaceConfig
stopCh chan any
socketCtor socketCtor
deviceWatcher deviceWatcher
}

// An internal structure to represent RS
Expand All @@ -40,18 +40,19 @@ type rsMsg struct {
from netip.Addr
}

func newAdvertiser(initialConfig *InterfaceConfig, ctor socketCtor, logger *slog.Logger) *advertiser {
func newAdvertiser(initialConfig *InterfaceConfig, ctor socketCtor, devWatcher deviceWatcher, logger *slog.Logger) *advertiser {
return &advertiser{
logger: logger.With(slog.String("interface", initialConfig.Name)),
initialConfig: initialConfig,
ifaceStatus: &InterfaceStatus{Name: initialConfig.Name, State: "Unknown"},
reloadCh: make(chan *InterfaceConfig),
stopCh: make(chan any),
socketCtor: ctor,
deviceWatcher: devWatcher,
}
}

func (s *advertiser) createRAMsg(config *InterfaceConfig) *ndp.RouterAdvertisement {
func (s *advertiser) createRAMsg(config *InterfaceConfig, deviceState *deviceState) *ndp.RouterAdvertisement {
return &ndp.RouterAdvertisement{
CurrentHopLimit: uint8(config.CurrentHopLimit),
ManagedConfiguration: config.Managed,
Expand All @@ -60,15 +61,15 @@ func (s *advertiser) createRAMsg(config *InterfaceConfig) *ndp.RouterAdvertiseme
RouterLifetime: time.Duration(config.RouterLifetimeSeconds) * time.Second,
ReachableTime: time.Duration(config.ReachableTimeMilliseconds) * time.Millisecond,
RetransmitTimer: time.Duration(config.RetransmitTimeMilliseconds) * time.Millisecond,
Options: s.createOptions(config),
Options: s.createOptions(config, deviceState),
}
}

func (s *advertiser) createOptions(config *InterfaceConfig) []ndp.Option {
func (s *advertiser) createOptions(config *InterfaceConfig, deviceState *deviceState) []ndp.Option {
options := []ndp.Option{
&ndp.LinkLayerAddress{
Direction: ndp.Source,
Addr: s.sock.hardwareAddr(),
Addr: deviceState.addr,
},
}

Expand Down Expand Up @@ -197,38 +198,60 @@ func (s *advertiser) run(ctx context.Context) {
// The current desired configuration
config := s.initialConfig

// The current device state
devState := deviceState{}

// Set a timestamp for the first "update"
s.setLastUpdate()

// Create the socket
err := retry.Constant(ctx, time.Second, func(ctx context.Context) error {
var err error

s.sock, err = s.socketCtor(config.Name)
if err != nil {
// These are the unrecoverable errors we're aware of now.
if errors.Is(err, unix.EPERM) || errors.Is(err, unix.EINVAL) {
return fmt.Errorf("cannot create socket: %w", err)
}

s.reportFailing(err)
// Watch the device state
devCh, err := s.deviceWatcher.watch(ctx, config.Name)
if err != nil {
s.reportStopped(err)
return
}

return retry.RetryableError(err)
waitDevice:
// Wait for the device to be present and up
for {
select {
case <-ctx.Done():
s.reportStopped(ctx.Err())
return
case dev := <-devCh:
// Update the device state
devState = dev

// If the device is up, we can proceed with the socket creation
if dev.isUp {
break waitDevice
}
}
}

return nil
})
// Create the socket
sock, err := s.socketCtor(config.Name)
if err != nil {
s.reportStopped(err)
return
// These are the unrecoverable errors we're aware of now.
if errors.Is(err, unix.EPERM) || errors.Is(err, unix.EINVAL) {
s.reportStopped(fmt.Errorf("cannot create socket: %w", err))
return
}
// Otherwise, we'll retry
s.reportFailing(err)
goto waitDevice
}

// Launch the RS receiver
rsCh := make(chan *rsMsg)
receiverCtx, cancelReceiver := context.WithCancel(ctx)
go func() {
for {
rs, addr, err := s.sock.recvRS(ctx)
rs, addr, err := sock.recvRS(receiverCtx)
if err != nil {
if receiverCtx.Err() != nil {
return
}
s.reportFailing(err)
continue
}
Expand All @@ -241,7 +264,7 @@ func (s *advertiser) run(ctx context.Context) {
reload:
for {
// RA message
msg := s.createRAMsg(config)
msg := s.createRAMsg(config, &devState)

// For unsolicited RA
ticker := time.NewTicker(time.Duration(config.RAIntervalMilliseconds) * time.Millisecond)
Expand All @@ -252,7 +275,7 @@ reload:
// Reply to RS
//
// TODO: Rate limit this to mitigate RS flooding attack
err := s.sock.sendRA(ctx, rs.from, msg)
err := sock.sendRA(ctx, rs.from, msg)
if err != nil {
s.reportFailing(err)
continue
Expand All @@ -261,7 +284,7 @@ reload:
s.reportRunning()
case <-ticker.C:
// Send unsolicited RA
err := s.sock.sendRA(ctx, netip.IPv6LinkLocalAllNodes(), msg)
err := sock.sendRA(ctx, netip.IPv6LinkLocalAllNodes(), msg)
if err != nil {
s.reportFailing(err)
continue
Expand All @@ -277,6 +300,28 @@ reload:
s.reportReloading()
s.setLastUpdate()
continue reload
case dev := <-devCh:
// Save the old address for comparison
oldAddr := devState.addr

// Update the device state
devState = dev

// Device is stopped. Stop the advertisement
// and wait for the device to be up again.
if !devState.isUp {
cancelReceiver()
s.reportFailing(fmt.Errorf("device is down"))
goto waitDevice
}

// Device address has changed. We need to
// change the Link Layer Address option in the
// RA message. Reload internally.
if !slices.Equal(oldAddr, dev.addr) {
s.reportReloading()
continue reload
}
case <-ctx.Done():
s.reportStopped(ctx.Err())
break reload
Expand All @@ -288,7 +333,8 @@ reload:

}

s.sock.close()
cancelReceiver()
sock.close()
}

func (s *advertiser) status() *InterfaceStatus {
Expand Down
16 changes: 13 additions & 3 deletions daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type Daemon struct {
reloadCh chan *Config
logger *slog.Logger
socketConstructor socketCtor
deviceWatcher deviceWatcher

advertisers map[string]*advertiser
advertisersLock sync.RWMutex
Expand All @@ -39,6 +40,7 @@ func NewDaemon(config *Config, opts ...DaemonOption) (*Daemon, error) {
reloadCh: make(chan *Config),
logger: slog.Default(),
socketConstructor: newSocket,
deviceWatcher: newDeviceWatcher(),
advertisers: map[string]*advertiser{},
}

Expand Down Expand Up @@ -89,9 +91,9 @@ reload:
// Add new per-interface jobs
for _, c := range toAdd {
d.logger.Info("Adding new RA sender", slog.String("interface", c.Name))
sender := newAdvertiser(c, d.socketConstructor, d.logger)
go sender.run(ctx)
d.advertisers[c.Name] = sender
advertiser := newAdvertiser(c, d.socketConstructor, d.deviceWatcher, d.logger)
go advertiser.run(ctx)
d.advertisers[c.Name] = advertiser
}

// Update (reload) existing workers
Expand Down Expand Up @@ -188,3 +190,11 @@ func withSocketConstructor(c socketCtor) DaemonOption {
d.socketConstructor = c
}
}

// withDeviceWatcher overrides the default device watcher with the provided
// one. For testing purposes only.
func withDeviceWatcher(w deviceWatcher) DaemonOption {
return func(d *Daemon) {
d.deviceWatcher = w
}
}
Loading

0 comments on commit c09c104

Please sign in to comment.