Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pkg/snet: change snet.Topology to a struct #4673

Merged
merged 1 commit into from
Dec 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 16 additions & 23 deletions control/cmd/control/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func realMain(ctx context.Context) error {
SCIONNetworkMetrics: metrics.SCIONNetworkMetrics,
SCIONPacketConnMetrics: metrics.SCIONPacketConnMetrics,
MTU: topo.MTU(),
Topology: cpInfoProvider{topo: topo},
Topology: adaptTopology(topo),
}
quicStack, err := nc.QUICStack()
if err != nil {
Expand Down Expand Up @@ -945,29 +945,22 @@ func (h *healther) GetCAHealth(ctx context.Context) (api.CAHealthStatus, bool) {
return api.Unavailable, false
}

type cpInfoProvider struct {
topo *topology.Loader
}

func (c cpInfoProvider) LocalIA(_ context.Context) (addr.IA, error) {
return c.topo.IA(), nil
}

func (c cpInfoProvider) PortRange(_ context.Context) (uint16, uint16, error) {
start, end := c.topo.PortRange()
return start, end, nil
}

func (c cpInfoProvider) Interfaces(_ context.Context) (map[uint16]netip.AddrPort, error) {
ifMap := c.topo.InterfaceInfoMap()
ifsToUDP := make(map[uint16]netip.AddrPort, len(ifMap))
for i, v := range ifMap {
if i > (1<<16)-1 {
return nil, serrors.New("invalid interface id", "id", i)
}
ifsToUDP[uint16(i)] = v.InternalAddr
func adaptTopology(topo *topology.Loader) snet.Topology {
start, end := topo.PortRange()
return snet.Topology{
LocalIA: topo.IA(),
PortRange: snet.TopologyPortRange{
Start: start,
End: end,
},
Interface: func(ifID uint16) (netip.AddrPort, bool) {
a := topo.UnderlayNextHop(ifID)
if a == nil {
return netip.AddrPort{}, false
}
return a.AddrPort(), true
},
}
return ifsToUDP, nil
}

func getCAHealth(
Expand Down
16 changes: 11 additions & 5 deletions gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,10 +240,16 @@ func (g *Gateway) Run(ctx context.Context) error {
// *********************************************
// Initialize base SCION network information: IA
// *********************************************
localIA, err := g.Daemon.LocalIA(context.Background())
topoReloader, err := daemon.NewReloadingTopology(ctx, g.Daemon)
if err != nil {
return serrors.Wrap("unable to learn local ISD-AS number", err)
return serrors.Wrap("loading topology", err)
}
topo := topoReloader.Topology()
go func() {
defer log.HandlePanic()
topoReloader.Run(ctx, 10*time.Second)
}()
localIA := topo.LocalIA
logger.Info("Learned local IA from SCION Daemon", "ia", localIA)

// *************************************************************************
Expand Down Expand Up @@ -299,7 +305,7 @@ func (g *Gateway) Run(ctx context.Context) error {
ProbesSendErrors: probesSendErrors,
SCMPErrors: g.Metrics.SCMPErrors,
SCIONPacketConnMetrics: g.Metrics.SCIONPacketConnMetrics,
Topology: g.Daemon,
Topology: topo,
},
PathUpdateInterval: PathUpdateInterval(ctx),
PathFetchTimeout: 0, // using default for now
Expand Down Expand Up @@ -409,7 +415,7 @@ func (g *Gateway) Run(ctx context.Context) error {
// scionNetworkNoSCMP is the network for the QUIC server connection. Because SCMP errors
// will cause the server's accepts to fail, we ignore SCMP.
scionNetworkNoSCMP := &snet.SCIONNetwork{
Topology: g.Daemon,
Topology: topo,
// Discard all SCMP propagation, to avoid accept/read errors on the
// QUIC server/client.
SCMPHandler: snet.SCMPPropagationStopper{
Expand Down Expand Up @@ -472,7 +478,7 @@ func (g *Gateway) Run(ctx context.Context) error {
// scionNetwork is the network for all SCION connections, with the exception of the QUIC server
// and client connection.
scionNetwork := &snet.SCIONNetwork{
Topology: g.Daemon,
Topology: topo,
SCMPHandler: snet.DefaultSCMPHandler{
RevocationHandler: revocationHandler,
SCMPErrors: g.Metrics.SCMPErrors,
Expand Down
17 changes: 16 additions & 1 deletion pkg/daemon/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
load("//tools/lint:go.bzl", "go_library")
load("//tools/lint:go.bzl", "go_library", "go_test")

go_library(
name = "go_default_library",
Expand All @@ -7,6 +7,7 @@ go_library(
"daemon.go",
"grpc.go",
"metrics.go",
"topology.go",
],
importpath = "github.com/scionproto/scion/pkg/daemon",
visibility = ["//visibility:public"],
Expand All @@ -15,6 +16,7 @@ go_library(
"//pkg/daemon/internal/metrics:go_default_library",
"//pkg/drkey:go_default_library",
"//pkg/grpc:go_default_library",
"//pkg/log:go_default_library",
"//pkg/metrics:go_default_library",
"//pkg/private/ctrl/path_mgmt:go_default_library",
"//pkg/private/prom:go_default_library",
Expand All @@ -32,3 +34,16 @@ go_library(
"@org_golang_google_protobuf//types/known/timestamppb:go_default_library",
],
)

go_test(
name = "go_default_test",
srcs = ["topology_test.go"],
deps = [
":go_default_library",
"//pkg/addr:go_default_library",
"//pkg/daemon/mock_daemon:go_default_library",
"//pkg/snet:go_default_library",
"@com_github_golang_mock//gomock:go_default_library",
"@com_github_stretchr_testify//assert:go_default_library",
],
)
135 changes: 135 additions & 0 deletions pkg/daemon/topology.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright 2024 Anapaya Systems
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package daemon

import (
"context"
"net/netip"
"sync/atomic"
"time"

"github.com/scionproto/scion/pkg/log"
"github.com/scionproto/scion/pkg/private/serrors"
"github.com/scionproto/scion/pkg/snet"
)

// LoadTopology loads the local topology from the given connector. The topology
// information is loaded once and does not update automatically.
func LoadTopology(ctx context.Context, conn Connector) (snet.Topology, error) {
ia, err := conn.LocalIA(ctx)
if err != nil {
return snet.Topology{}, serrors.Wrap("loading local ISD-AS", err)
}
start, end, err := conn.PortRange(ctx)
if err != nil {
return snet.Topology{}, serrors.Wrap("loading port range", err)
}
interfaces, err := conn.Interfaces(ctx)
if err != nil {
return snet.Topology{}, serrors.Wrap("loading interfaces", err)
}

return snet.Topology{
LocalIA: ia,
PortRange: snet.TopologyPortRange{
Start: start,
End: end,
},
Interface: func(ifID uint16) (netip.AddrPort, bool) {
a, ok := interfaces[ifID]
return a, ok
},
}, nil
}

// ReloadingTopology is a topology that reloads the interface information
// periodically. It is safe for concurrent use.
type ReloadingTopology struct {
conn Connector
baseTopology snet.Topology
interfaces atomic.Pointer[map[uint16]netip.AddrPort]
}

// NewReloadingTopology creates a new ReloadingTopology that reloads the
// interface information periodically. The Run method must be called for
// interface information to be populated.
func NewReloadingTopology(ctx context.Context, conn Connector) (*ReloadingTopology, error) {
ia, err := conn.LocalIA(ctx)
if err != nil {
return nil, serrors.Wrap("loading local ISD-AS", err)
}
start, end, err := conn.PortRange(ctx)
if err != nil {
return nil, serrors.Wrap("loading port range", err)
}
t := &ReloadingTopology{
conn: conn,
baseTopology: snet.Topology{
LocalIA: ia,
PortRange: snet.TopologyPortRange{Start: start, End: end},
},
}
if err := t.loadInterfaces(ctx); err != nil {
return nil, err
}
return t, nil
}

func (t *ReloadingTopology) Topology() snet.Topology {
base := t.baseTopology
return snet.Topology{
LocalIA: base.LocalIA,
PortRange: base.PortRange,
Interface: func(ifID uint16) (netip.AddrPort, bool) {
m := t.interfaces.Load()
if m == nil {
return netip.AddrPort{}, false
}
a, ok := (*m)[ifID]
return a, ok
},
}
}

func (t *ReloadingTopology) Run(ctx context.Context, period time.Duration) {
ticker := time.NewTicker(period)
defer ticker.Stop()

reload := func() {
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
if err := t.loadInterfaces(ctx); err != nil {
log.FromCtx(ctx).Error("Failed to reload interfaces", "err", err)
}
}
reload()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
reload()
}
}
}

func (t *ReloadingTopology) loadInterfaces(ctx context.Context) error {
intfs, err := t.conn.Interfaces(ctx)
if err != nil {
return err
}
t.interfaces.Store(&intfs)
return nil
}
Loading
Loading