Skip to content

Commit

Permalink
rework the snet.Topology
Browse files Browse the repository at this point in the history
  • Loading branch information
lukedirtwalker committed Dec 23, 2024
1 parent e249d71 commit cd48ed8
Show file tree
Hide file tree
Showing 13 changed files with 217 additions and 81 deletions.
41 changes: 18 additions & 23 deletions control/cmd/control/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func realMain(ctx context.Context) error {
SCIONNetworkMetrics: metrics.SCIONNetworkMetrics,
SCIONPacketConnMetrics: metrics.SCIONPacketConnMetrics,
MTU: topo.MTU(),
Topology: cpInfoProvider{topo: topo},
Topology: adaptTopology(topo),
}
quicStack, err := nc.QUICStack()
if err != nil {
Expand Down Expand Up @@ -945,29 +945,24 @@ func (h *healther) GetCAHealth(ctx context.Context) (api.CAHealthStatus, bool) {
return api.Unavailable, false
}

type cpInfoProvider struct {
topo *topology.Loader
}

func (c cpInfoProvider) LocalIA(_ context.Context) (addr.IA, error) {
return c.topo.IA(), nil
}

func (c cpInfoProvider) PortRange(_ context.Context) (uint16, uint16, error) {
start, end := c.topo.PortRange()
return start, end, nil
}

func (c cpInfoProvider) Interfaces(_ context.Context) (map[uint16]netip.AddrPort, error) {
ifMap := c.topo.InterfaceInfoMap()
ifsToUDP := make(map[uint16]netip.AddrPort, len(ifMap))
for i, v := range ifMap {
if i > (1<<16)-1 {
return nil, serrors.New("invalid interface id", "id", i)
}
ifsToUDP[uint16(i)] = v.InternalAddr
func adaptTopology(topo *topology.Loader) snet.Topology {
start, end := topo.PortRange()
return snet.Topology{
LocalIA: topo.IA(),
PortRange: snet.TopologyPortRange{
Start: start,
End: end,
},
Interface: func(ifID uint16) (netip.AddrPort, bool) {
// XXX(lukedirtwalker): The amount of conversiones between
// netip.AddrPort and *net.UDPAddr is a bit too high...
a := topo.UnderlayNextHop(ifID)
if a == nil {
return netip.AddrPort{}, false
}
return a.AddrPort(), true
},
}
return ifsToUDP, nil
}

func getCAHealth(
Expand Down
16 changes: 11 additions & 5 deletions gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,10 +240,16 @@ func (g *Gateway) Run(ctx context.Context) error {
// *********************************************
// Initialize base SCION network information: IA
// *********************************************
localIA, err := g.Daemon.LocalIA(context.Background())
topoReloader, err := daemon.NewReloadingTopology(ctx, g.Daemon)
if err != nil {
return serrors.Wrap("unable to learn local ISD-AS number", err)
return serrors.Wrap("loading topology", err)
}
topo := topoReloader.Topology()
go func() {
defer log.HandlePanic()
topoReloader.Run(ctx, 10*time.Second)
}()
localIA := topo.LocalIA
logger.Info("Learned local IA from SCION Daemon", "ia", localIA)

// *************************************************************************
Expand Down Expand Up @@ -299,7 +305,7 @@ func (g *Gateway) Run(ctx context.Context) error {
ProbesSendErrors: probesSendErrors,
SCMPErrors: g.Metrics.SCMPErrors,
SCIONPacketConnMetrics: g.Metrics.SCIONPacketConnMetrics,
Topology: g.Daemon,
Topology: topo,
},
PathUpdateInterval: PathUpdateInterval(ctx),
PathFetchTimeout: 0, // using default for now
Expand Down Expand Up @@ -409,7 +415,7 @@ func (g *Gateway) Run(ctx context.Context) error {
// scionNetworkNoSCMP is the network for the QUIC server connection. Because SCMP errors
// will cause the server's accepts to fail, we ignore SCMP.
scionNetworkNoSCMP := &snet.SCIONNetwork{
Topology: g.Daemon,
Topology: topo,
// Discard all SCMP propagation, to avoid accept/read errors on the
// QUIC server/client.
SCMPHandler: snet.SCMPPropagationStopper{
Expand Down Expand Up @@ -472,7 +478,7 @@ func (g *Gateway) Run(ctx context.Context) error {
// scionNetwork is the network for all SCION connections, with the exception of the QUIC server
// and client connection.
scionNetwork := &snet.SCIONNetwork{
Topology: g.Daemon,
Topology: topo,
SCMPHandler: snet.DefaultSCMPHandler{
RevocationHandler: revocationHandler,
SCMPErrors: g.Metrics.SCMPErrors,
Expand Down
2 changes: 2 additions & 0 deletions pkg/daemon/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ go_library(
"daemon.go",
"grpc.go",
"metrics.go",
"topology.go",
],
importpath = "github.com/scionproto/scion/pkg/daemon",
visibility = ["//visibility:public"],
Expand All @@ -15,6 +16,7 @@ go_library(
"//pkg/daemon/internal/metrics:go_default_library",
"//pkg/drkey:go_default_library",
"//pkg/grpc:go_default_library",
"//pkg/log:go_default_library",
"//pkg/metrics:go_default_library",
"//pkg/private/ctrl/path_mgmt:go_default_library",
"//pkg/private/prom:go_default_library",
Expand Down
117 changes: 117 additions & 0 deletions pkg/daemon/topology.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright 2024 Anapaya Systems
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package daemon

import (
"context"
"net/netip"
"sync"
"time"

"github.com/scionproto/scion/pkg/log"
"github.com/scionproto/scion/pkg/private/serrors"
"github.com/scionproto/scion/pkg/snet"
)

// LoadTopology loads the local topology from the given connector. The topology
// information is loaded once and does not update automatically.
func LoadTopology(ctx context.Context, conn Connector) (snet.Topology, error) {
ia, err := conn.LocalIA(ctx)
if err != nil {
return snet.Topology{}, serrors.Wrap("loading local ISD-AS", err)
}
start, end, err := conn.PortRange(ctx)
if err != nil {
return snet.Topology{}, serrors.Wrap("loading port range", err)
}
interfaces, err := conn.Interfaces(ctx)
if err != nil {
return snet.Topology{}, serrors.Wrap("loading interfaces", err)
}

return snet.Topology{
LocalIA: ia,
PortRange: snet.TopologyPortRange{
Start: start,
End: end,
},
Interface: func(ifID uint16) (netip.AddrPort, bool) {
a, ok := interfaces[ifID]
return a, ok
},
}, nil
}

// ReloadingTopology is a topology that reloads the interface information
// periodically. It is safe for concurrent use.
type ReloadingTopology struct {
conn Connector
baseTopology snet.Topology
interfaces sync.Map
}

// NewReloadingTopology creates a new ReloadingTopology that reloads the
// interface information periodically. The Run method must be called for
// interface information to be populated.
func NewReloadingTopology(ctx context.Context, conn Connector) (*ReloadingTopology, error) {
topo, err := LoadTopology(ctx, conn)
if err != nil {
return nil, err
}
return &ReloadingTopology{
conn: conn,
baseTopology: topo,
}, nil
}

func (t *ReloadingTopology) Topology() snet.Topology {
base := t.baseTopology
return snet.Topology{
LocalIA: base.LocalIA,
PortRange: base.PortRange,
Interface: func(ifID uint16) (netip.AddrPort, bool) {
a, ok := t.interfaces.Load(ifID)
if !ok {
return netip.AddrPort{}, false
}
return a.(netip.AddrPort), true
},
}
}

func (t *ReloadingTopology) Run(ctx context.Context, period time.Duration) {
ticker := time.NewTicker(period)
defer ticker.Stop()

reload := func() {
intfs, err := t.conn.Interfaces(ctx)
if err != nil {
log.FromCtx(ctx).Error("Failed to reload interfaces", "err", err)
}
for ifID, addr := range intfs {
t.interfaces.Store(ifID, addr)
}
}

reload()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
reload()
}
}
}
15 changes: 3 additions & 12 deletions pkg/snet/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package snet

import (
"context"
"net"
"time"

Expand Down Expand Up @@ -65,21 +64,13 @@ func NewCookedConn(
options ...ConnOption,
) (*Conn, error) {
o := apply(options)
localIA, err := topo.LocalIA(context.Background())
if err != nil {
return nil, err
}
local := &UDPAddr{
IA: localIA,
IA: topo.LocalIA,
Host: pconn.LocalAddr().(*net.UDPAddr),
}
if local.Host == nil || local.Host.IP.IsUnspecified() {
return nil, serrors.New("nil or unspecified address is not supported.")
}
start, end, err := topo.PortRange(context.Background())
if err != nil {
return nil, err
}
return &Conn{
conn: pconn,
local: local,
Expand All @@ -89,8 +80,8 @@ func NewCookedConn(
buffer: make([]byte, common.SupportedMTU),
local: local,
remote: o.remote,
dispatchedPortStart: start,
dispatchedPortEnd: end,
dispatchedPortStart: topo.PortRange.Start,
dispatchedPortEnd: topo.PortRange.End,
},
scionConnReader: scionConnReader{
conn: pconn,
Expand Down
12 changes: 1 addition & 11 deletions pkg/snet/packet_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package snet

import (
"context"
"net"
"syscall"
"time"
Expand Down Expand Up @@ -343,16 +342,7 @@ func (c *SCIONPacketConn) lastHop(p *Packet) (*net.UDPAddr, error) {
}

func (c *SCIONPacketConn) ifIDToAddr(ifID uint16) (*net.UDPAddr, error) {
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
intfs, err := c.Topology.Interfaces(ctx)
if err != nil {
return nil, serrors.Wrap(
"resolving interfaces address (fetching interfaces)", err,
"interface", ifID,
)
}
addrPort, ok := intfs[ifID]
addrPort, ok := c.Topology.Interface(ifID)
if !ok {
return nil, serrors.New("interface number not found", "interface", ifID)
}
Expand Down
29 changes: 19 additions & 10 deletions pkg/snet/snet.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,22 @@ import (
"github.com/scionproto/scion/pkg/private/serrors"
)

// Topology provides local-IA topology information
type Topology interface {
LocalIA(ctx context.Context) (addr.IA, error)
PortRange(ctx context.Context) (uint16, uint16, error)
Interfaces(ctx context.Context) (map[uint16]netip.AddrPort, error)
// Topology provides information about the topology of the local ISD-AS.
type Topology struct {
// LocalIA is local ISD-AS.
LocalIA addr.IA
// PortRange is the directly dispatched port range. Start and End are
// inclusive.
PortRange TopologyPortRange
// Interface provides information about a local interface. If the interface
// is not present, the second return value must be false.
Interface func(uint16) (netip.AddrPort, bool)
}

// TopologyPortRange is the range of ports that are directly dispatched to the
// application. The range is inclusive.
type TopologyPortRange struct {
Start, End uint16
}

var _ Network = (*SCIONNetwork)(nil)
Expand All @@ -68,7 +79,8 @@ type SCIONNetworkMetrics struct {
// SCIONNetwork is the SCION networking context.
type SCIONNetwork struct {
// Topology provides local AS information, needed to handle sockets and
// traffic.
// traffic. Note that the Interfaces method might be called once per packet,
// so an efficient implementation is strongly recommended.
Topology Topology
// ReplyPather is used to create reply paths when reading packets on Conn
// (that implements net.Conn). If unset, the default reply pather is used,
Expand All @@ -91,10 +103,7 @@ func (n *SCIONNetwork) OpenRaw(ctx context.Context, addr *net.UDPAddr) (PacketCo
if addr == nil || addr.IP.IsUnspecified() {
return nil, serrors.New("nil or unspecified address is not supported")
}
start, end, err := n.Topology.PortRange(ctx)
if err != nil {
return nil, err
}
start, end := n.Topology.PortRange.Start, n.Topology.PortRange.End
if addr.Port == 0 {
pconn, err = listenUDPRange(addr, start, end)
} else {
Expand Down
10 changes: 7 additions & 3 deletions private/app/path/path.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ func Choose(
if err != nil {
return nil, serrors.Wrap("fetching paths", err)
}
topo, err := daemon.LoadTopology(ctx, conn)
if err != nil {
return nil, serrors.Wrap("loading topology", err)
}
if o.epic {
// Only use paths that support EPIC and intra-AS (empty) paths.
epicPaths := []snet.Path{}
Expand All @@ -102,7 +106,7 @@ func Choose(
paths = epicPaths
}
if o.probeCfg != nil {
paths, err = filterUnhealthy(ctx, paths, remote, conn, o.probeCfg, o.epic)
paths, err = filterUnhealthy(ctx, paths, remote, topo, o.probeCfg, o.epic)
if err != nil {
return nil, serrors.Wrap("probing paths", err)
}
Expand All @@ -121,7 +125,7 @@ func filterUnhealthy(
ctx context.Context,
paths []snet.Path,
remote addr.IA,
sd daemon.Connector,
topo snet.Topology,
cfg *ProbeConfig,
epic bool,
) ([]snet.Path, error) {
Expand All @@ -144,7 +148,7 @@ func filterUnhealthy(
LocalIA: cfg.LocalIA,
LocalIP: cfg.LocalIP,
SCIONPacketConnMetrics: cfg.SCIONPacketConnMetrics,
Topology: sd,
Topology: topo,
}.GetStatuses(subCtx, nonEmptyPaths, pathprobe.WithEPIC(epic))
if err != nil {
return nil, serrors.Wrap("probing paths", err)
Expand Down
Loading

0 comments on commit cd48ed8

Please sign in to comment.