From 1443c955feec90ded77cbddddbf60cb18533adbe Mon Sep 17 00:00:00 2001
From: Jean-Christophe Hugly
Date: Fri, 29 Nov 2024 19:56:52 +0100
Subject: [PATCH] Just a checkpoint. All of this still WIP.

---
 router/cmd/router/main.go         |  12 +-
 router/dataplane.go               | 434 ++++++++++++++++++++++--------
 router/dataplane_internal_test.go |   2 +-
 router/dataplane_test.go          |  58 ++--
 router/export_test.go             |   2 +-
 5 files changed, 354 insertions(+), 154 deletions(-)

diff --git a/router/cmd/router/main.go b/router/cmd/router/main.go
index af8b2fd0fe..178f81bd06 100644
--- a/router/cmd/router/main.go
+++ b/router/cmd/router/main.go
@@ -61,6 +61,11 @@ func realMain(ctx context.Context) error {
 			DataPlane: router.DataPlane{
 				Metrics:                        metrics,
 				ExperimentalSCMPAuthentication: globalCfg.Features.ExperimentalSCMPAuthentication,
+				RunConfig: router.RunConfig{
+					NumProcessors:         globalCfg.Router.NumProcessors,
+					NumSlowPathProcessors: globalCfg.Router.NumSlowPathProcessors,
+					BatchSize:             globalCfg.Router.BatchSize,
+				},
 			},
 			ReceiveBufferSize: globalCfg.Router.ReceiveBufferSize,
 			SendBufferSize:    globalCfg.Router.SendBufferSize,
@@ -128,12 +133,7 @@ func realMain(ctx context.Context) error {
 	})
 	g.Go(func() error {
 		defer log.HandlePanic()
-		runConfig := &router.RunConfig{
-			NumProcessors:         globalCfg.Router.NumProcessors,
-			NumSlowPathProcessors: globalCfg.Router.NumSlowPathProcessors,
-			BatchSize:             globalCfg.Router.BatchSize,
-		}
-		if err := dp.DataPlane.Run(errCtx, runConfig); err != nil {
+		if err := dp.DataPlane.Run(errCtx); err != nil {
 			return serrors.Wrap("running dataplane", err)
 		}
 		return nil
diff --git a/router/dataplane.go b/router/dataplane.go
index 6c407ab1c6..f690c3920f 100644
--- a/router/dataplane.go
+++ b/router/dataplane.go
@@ -91,6 +91,29 @@ type BatchConn interface {
 	Close() error
 }
 
+// linkScope describes the kind (or scope) of a link: internal, sibling, or external.
+type linkScope int
+
+const (
+	internal linkScope = iota // to/from end-hosts in the local AS
+	sibling                   // to/from (external interfaces owned by) a sibling router
+	external                  // to/from routers in another AS
+)
+
+// link embodies the router's idea of an underlay connection. It associates the underlay
+// connection with a bfdSession, a destination address (and more in the future). It also allows
+// the send operation to be underlay-dependent. In the future, a link can be registered with the
+// underlay implementation so incoming packets can be easily associated with a link.
+// TODO(jiceatscion): use inheritance between implementations.
+type link interface {
+	getScope() linkScope
+	getBfdSession() bfdSession
+	isUp() bool
+	getRemote() netip.AddrPort
+	getIfID() uint16
+	send(p *packet) bool
+}
+
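+// A sketch of how the fast path is expected to use a link (illustrative only;
+// this mirrors the dispatch in runProcessor further below):
+//
+//	lk, ok := d.interfaces[p.egress]
+//	if !ok {
+//		// invalid egress: drop and count
+//	} else if !lk.send(p) {
+//		// forwarder queue full: drop and count
+//	}
+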
 type disposition int
 
 const (
@@ -169,12 +192,188 @@ func (p *packet) reset() {
 	// Everything else is reset to zero value.
 }
 
+// An externalLink is connection-oriented and is not shared. As a result, it has a defined
+// interface ID and a fixed destination address.
+type externalLink struct {
+	// Each link has a queue, consumed by a forwarding task and fed by bfd senders
+	// and packet processors.
+	queue      chan *packet
+	bfdSession bfdSession
+	remote     netip.AddrPort
+	ifID       uint16
+}
+
+func newExternalLink(
+	queue chan *packet,
+	bfdSession bfdSession,
+	remote netip.AddrPort,
+	ifID uint16,
+) *externalLink {
+
+	return &externalLink{
+		queue:      queue,
+		bfdSession: bfdSession,
+		remote:     remote,
+		ifID:       ifID,
+	}
+}
+
+func (l *externalLink) getScope() linkScope {
+	return external
+}
+
+func (l *externalLink) getBfdSession() bfdSession {
+	return l.bfdSession
+}
+
+func (l *externalLink) isUp() bool {
+	return l.bfdSession == nil || l.bfdSession.IsUp()
+}
+
+func (l *externalLink) getRemote() netip.AddrPort {
+	return l.remote
+}
+
+func (l *externalLink) getIfID() uint16 {
+	return l.ifID
+}
+
+// The goal is to have send do those things that depend on the underlay connection type,
+// such as updating the destination address of the packet with the known bound one.
+// TODO(jiceatscion): indeed, do it.
+func (l *externalLink) send(p *packet) bool {
+	select {
+	case l.queue <- p:
+	default:
+		return false
+	}
+	return true
+}
+
+// A siblingLink is connection-oriented. However, it may be shared between multiple interface
+// IDs. So, it has no defined interface ID but a fixed destination address.
+type siblingLink struct {
+	// Each link has a queue, consumed by a forwarding task and fed by bfd senders
+	// and packet processors.
+	queue      chan *packet
+	bfdSession bfdSession
+	remote     netip.AddrPort
+}
+
+func newSiblingLink(
+	queue chan *packet,
+	bfdSession bfdSession,
+	remote netip.AddrPort,
+) *siblingLink {
+
+	return &siblingLink{
+		queue:      queue,
+		bfdSession: bfdSession,
+		remote:     remote,
+	}
+}
+
+func (l *siblingLink) getScope() linkScope {
+	return sibling
+}
+
+func (l *siblingLink) getBfdSession() bfdSession {
+	return l.bfdSession
+}
+
+func (l *siblingLink) isUp() bool {
+	return l.bfdSession == nil || l.bfdSession.IsUp()
+}
+
+func (l *siblingLink) getRemote() netip.AddrPort {
+	return l.remote
+}
+
+func (l *siblingLink) getIfID() uint16 {
+	return 0
+}
+
+func (l *siblingLink) send(p *packet) bool {
+	select {
+	case l.queue <- p:
+	default:
+		return false
+	}
+	return true
+}
+
+// An internalLink is not connection-oriented and is not associated with an AS border interface.
+// So, it has no defined interface ID and no fixed destination address. It can't have a BFD
+// session either; it is a link in name only. Maybe, in the future, we will find it more
+// convenient not to treat such connections as links. For now, it helps with code migration.
+// Incoming packets are associated with the link they came in through by way of the underlay
+// source address.
+type internalLink struct {
+	// Each link has a queue, consumed by a forwarding task and fed by bfd senders
+	// and packet processors.
+	queue chan *packet
+}
+
+func newInternalLink(queue chan *packet) *internalLink {
+	return &internalLink{
+		queue: queue,
+	}
+}
+
+func (l *internalLink) getScope() linkScope {
+	return internal
+}
+
+func (l *internalLink) isUp() bool {
+	return true
+}
+
+func (l *internalLink) getBfdSession() bfdSession {
+	return nil
+}
+
+func (l *internalLink) getRemote() netip.AddrPort {
+	return netip.AddrPort{} // FIXME: we have a special value for this.
+}
+
+func (l *internalLink) getIfID() uint16 {
+	return 0
+}
+
+func (l *internalLink) send(p *packet) bool {
+	select {
+	case l.queue <- p:
+	default:
+		return false
+	}
+	return true
+}
+
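+// All three send implementations share the same non-blocking enqueue idiom:
+// a select with a default arm, so a packet processor is never stalled by a
+// slow forwarder. In sketch form:
+//
+//	select {
+//	case l.queue <- p:
+//		return true // the forwarder now owns p
+//	default:
+//		return false // queue full; the caller drops p
+//	}
+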
+// Connections can be shared between links, so we keep a canonical map.
+// Also, there must be one sending loop and one channel per connection, so that goes here.
+// This business should be moved to underlay implementations, but... one problem at a time.
+// So, this is a minimal implementation to support the single underlay we have today.
+type forwarder struct {
+	conn BatchConn
+	fwQ  chan *packet
+	ifID uint16 // non-zero only when the connection is not shared between links.
+}
+
+func newForwarder(conn BatchConn, queueSize int, ifID uint16) *forwarder {
+	return &forwarder{
+		conn: conn,
+		fwQ:  make(chan *packet, queueSize),
+		ifID: ifID,
+	}
+}
+
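+// A forwarder's output side drains fwQ in batches (see runForwarder below);
+// after wrapping the packets into underlay messages, roughly:
+//
+//	n := readUpTo(f.fwQ, batchSize, toWrite == 0, pkts)
+//	... // package pkts[:n] into msgs
+//	written, _ := f.conn.WriteBatch(msgs[:n], 0)
+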
 // DataPlane contains a SCION Border Router's forwarding logic. It reads packets
 // from multiple sockets, performs routing, and sends them to their destinations
 // (after updating the path, if that is needed).
 type DataPlane struct {
-	interfaces        map[uint16]BatchConn
-	external          map[uint16]BatchConn
+	links      map[netip.AddrPort]link
+	forwarders map[netip.AddrPort]*forwarder // We do this here as a refactoring step.
+	interfaces map[uint16]link
+	external   map[uint16]struct{} // TODO: to be replaced with e.g. interfaces[x].getScope()
 	linkTypes         map[uint16]topology.LinkType
 	neighborIAs       map[uint16]addr.IA
 	peerInterfaces    map[uint16]uint16
@@ -192,10 +391,7 @@ type DataPlane struct {
 	dispatchedPortEnd   uint16
 
 	ExperimentalSCMPAuthentication bool
-
-	// The forwarding queues. Each is consumed by a forwarder process and fed by
-	// one bfd sender and the packet processors.
-	fwQs map[uint16]chan *packet
+	RunConfig                      RunConfig
 
 	// The pool that stores all the packet buffers as described in the design document. See
 	// https://github.com/scionproto/scion/blob/master/doc/dev/design/BorderRouter.rst
@@ -234,6 +430,7 @@
 	ingressInterfaceInvalid = errors.New("ingress interface invalid")
 	macVerificationFailed   = errors.New("MAC verification failed")
 	badPacketSize           = errors.New("bad packet size")
+	notImplemented          = errors.New("not yet implemented")
 
 	// zeroBuffer will be used to reset the Authenticator option in the
 	// scionPacketProcessor.OptAuth
@@ -339,12 +536,19 @@ func (d *DataPlane) AddInternalInterface(conn BatchConn, ip netip.Addr) error {
 		return emptyValue
 	}
 	if d.interfaces == nil {
-		d.interfaces = make(map[uint16]BatchConn)
+		d.interfaces = make(map[uint16]link)
+		d.forwarders = make(map[netip.AddrPort]*forwarder)
 	} else if d.interfaces[0] != nil {
 		return alreadySet
 	}
-	d.interfaces[0] = conn
+
+	f := newForwarder(conn, d.RunConfig.BatchSize, 0)
+	d.forwarders[netip.AddrPort{}] = f
+	d.interfaces[0] = newInternalLink(f.fwQ)
+	// That is not a real link => not in the links map; should it be?
+
 	d.internalIP = ip
+
 	return nil
 }
 
@@ -363,21 +567,31 @@ func (d *DataPlane) AddExternalInterface(ifID uint16, conn BatchConn,
 	if conn == nil || !src.Addr.IsValid() || !dst.Addr.IsValid() {
 		return emptyValue
 	}
-	err := d.addExternalInterfaceBFD(ifID, src, dst, cfg)
+	bfd, err := d.addExternalInterfaceBFD(ifID, src, dst, cfg)
 	if err != nil {
 		return serrors.Wrap("adding external BFD", err, "if_id", ifID)
 	}
 	if d.external == nil {
-		d.external = make(map[uint16]BatchConn)
+		d.external = make(map[uint16]struct{})
 	}
 	if d.interfaces == nil {
-		d.interfaces = make(map[uint16]BatchConn)
+		d.interfaces = make(map[uint16]link)
+	}
+	if d.forwarders == nil {
+		d.forwarders = make(map[netip.AddrPort]*forwarder)
+	}
+	if d.links == nil {
+		d.links = make(map[netip.AddrPort]link)
 	}
 	if _, exists := d.external[ifID]; exists {
 		return serrors.JoinNoStack(alreadySet, nil, "ifID", ifID)
 	}
-	d.interfaces[ifID] = conn
-	d.external[ifID] = conn
+	if d.forwarders[dst.Addr] != nil {
+		return serrors.JoinNoStack(alreadySet, nil, "dst Addr", dst.Addr)
+	}
+	fw := newForwarder(conn, d.RunConfig.BatchSize, ifID)
+	lk := newExternalLink(fw.fwQ, bfd, dst.Addr, ifID)
+	d.forwarders[dst.Addr] = fw
+	d.links[dst.Addr] = lk
+	d.interfaces[ifID] = lk
+	d.external[ifID] = struct{}{}
 	return nil
 }
 
@@ -442,10 +656,10 @@ func (d *DataPlane) AddRemotePeer(local, remote uint16) error {
 
-// AddExternalInterfaceBFD adds the inter AS connection BFD session.
+// addExternalInterfaceBFD adds the inter-AS connection BFD session.
 func (d *DataPlane) addExternalInterfaceBFD(ifID uint16,
-	src, dst control.LinkEnd, cfg control.BFD) error {
+	src, dst control.LinkEnd, cfg control.BFD) (bfdSession, error) {
 
 	if *cfg.Disable {
-		return nil
+		return nil, nil
 	}
 	var m bfd.Metrics
 	if d.Metrics != nil {
@@ -463,7 +677,7 @@ func (d *DataPlane) addExternalInterfaceBFD(ifID uint16,
 	}
 	s, err := newBFDSend(d, src.IA, dst.IA, src.Addr, dst.Addr, ifID, d.macFactory())
 	if err != nil {
-		return err
+		return nil, err
 	}
 	return d.addBFDController(ifID, s, cfg, m)
 }
@@ -472,27 +686,22 @@ func (d *DataPlane) addExternalInterfaceBFD(ifID uint16,
 // returns InterfaceUp if the relevant bfdsession state is up, or if there is no BFD
 // session. Otherwise, it returns InterfaceDown.
 func (d *DataPlane) getInterfaceState(ifID uint16) control.InterfaceState {
-	bfdSessions := d.bfdSessions
-	if bfdSession, ok := bfdSessions[ifID]; ok && !bfdSession.IsUp() {
+	if l := d.interfaces[ifID]; l != nil && !l.isUp() {
 		return control.InterfaceDown
 	}
 	return control.InterfaceUp
 }
 
 func (d *DataPlane) addBFDController(ifID uint16, s bfd.Sender, cfg control.BFD,
-	metrics bfd.Metrics) error {
-
-	if d.bfdSessions == nil {
-		d.bfdSessions = make(map[uint16]bfdSession)
-	}
+	metrics bfd.Metrics) (bfdSession, error) {
 
 	// Generate random discriminator. It can't be zero.
 	discInt, err := rand.Int(rand.Reader, big.NewInt(0xfffffffe))
 	if err != nil {
-		return err
+		return nil, err
 	}
 	disc := layers.BFDDiscriminator(uint32(discInt.Uint64()) + 1)
-	d.bfdSessions[ifID] = &bfd.Session{
+	return &bfd.Session{
 		Sender:                s,
 		DetectMult:            layers.BFDDetectMultiplier(cfg.DetectMult),
 		DesiredMinTxInterval:  cfg.DesiredMinTxInterval,
@@ -500,8 +709,7 @@ func (d *DataPlane) addBFDController(ifID uint16, s bfd.Sender, cfg control.BFD,
 		LocalDiscriminator:    disc,
 		ReceiveQueueSize:      10,
 		Metrics:               metrics,
-	}
-	return nil
+	}, nil
 }
 
 // AddSvc adds the address for the given service. This can be called multiple
@@ -559,7 +767,7 @@ func (d *DataPlane) AddNextHop(ifID uint16, src, dst netip.AddrPort, cfg control
 	if !dst.IsValid() || !src.IsValid() {
 		return emptyValue
 	}
-	err := d.addNextHopBFD(ifID, src, dst, cfg, sibling)
+	bfd, err := d.addNextHopBFD(ifID, src, dst, cfg, sibling)
 	if err != nil {
 		return serrors.Wrap("adding next hop BFD", err, "if_id", ifID)
 	}
@@ -569,6 +777,16 @@ func (d *DataPlane) AddNextHop(ifID uint16, src, dst netip.AddrPort, cfg control
 	if _, exists := d.internalNextHops[ifID]; exists {
 		return serrors.JoinNoStack(alreadySet, nil, "ifID", ifID)
 	}
+
+	// We have one link per unique dst address.
+	if d.links == nil {
+		d.links = make(map[netip.AddrPort]link)
+	}
+	sib := d.links[dst]
+	if sib == nil {
+		// As a refactoring step, re-use the internal connection's forwarder, which is what we've
+		// been effectively doing so far.
+		sib = newSiblingLink(d.forwarders[netip.AddrPort{}].fwQ, bfd, dst)
+		d.links[dst] = sib
+	}
+	d.interfaces[ifID] = sib
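+	// TODO: when the link for dst already exists, the BFD session just created
+	// by addNextHopBFD is silently discarded; the first session for a given
+	// dst wins.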
 	d.internalNextHops[ifID] = dst
 	return nil
 }
@@ -577,18 +795,10 @@ func (d *DataPlane) AddNextHop(ifID uint16, src, dst netip.AddrPort, cfg control
 // If the remote ifID belongs to an existing address, the existing
 // BFD session will be re-used.
 func (d *DataPlane) addNextHopBFD(ifID uint16, src, dst netip.AddrPort, cfg control.BFD,
-	sibling string) error {
+	sibling string) (bfdSession, error) {
 
 	if *cfg.Disable {
-		return nil
-	}
-	for k, v := range d.internalNextHops {
-		if v.String() == dst.String() {
-			if c, ok := d.bfdSessions[k]; ok {
-				d.bfdSessions[ifID] = c
-				return nil
-			}
-		}
+		return nil, nil
 	}
 	var m bfd.Metrics
 	if d.Metrics != nil {
@@ -603,7 +813,7 @@ func (d *DataPlane) addNextHopBFD(ifID uint16, src, dst netip.AddrPort, cfg cont
 
 	s, err := newBFDSend(d, d.localIA, d.localIA, src, dst, 0, d.macFactory())
 	if err != nil {
-		return err
+		return nil, err
 	}
 	return d.addBFDController(ifID, s, cfg, m)
 }
@@ -621,49 +831,48 @@ type RunConfig struct {
 	BatchSize             int
 }
 
-func (d *DataPlane) Run(ctx context.Context, cfg *RunConfig) error {
+func (d *DataPlane) Run(ctx context.Context) error {
 	d.mtx.Lock()
 	d.initMetrics()
 
 	processorQueueSize := max(
-		len(d.interfaces)*cfg.BatchSize/cfg.NumProcessors,
-		cfg.BatchSize)
+		len(d.forwarders)*d.RunConfig.BatchSize/d.RunConfig.NumProcessors,
+		d.RunConfig.BatchSize)
 
-	d.initPacketPool(cfg, processorQueueSize)
-	procQs, fwQs, slowQs := initQueues(cfg, d.interfaces, processorQueueSize)
-	d.fwQs = fwQs // Shared with BFD senders
+	d.initPacketPool(processorQueueSize)
+	procQs, slowQs := d.initQueues(processorQueueSize)
 	d.setRunning()
-	for ifID, conn := range d.interfaces {
-		go func(ifID uint16, conn BatchConn) {
+	for _, f := range d.forwarders {
+		go func(f *forwarder) {
 			defer log.HandlePanic()
-			d.runReceiver(ifID, conn, cfg, procQs)
-		}(ifID, conn)
-		go func(ifID uint16, conn BatchConn) {
+			d.runReceiver(f, procQs)
+		}(f)
+		go func(f *forwarder) {
 			defer log.HandlePanic()
-			d.runForwarder(ifID, conn, cfg, fwQs[ifID])
-		}(ifID, conn)
+			d.runForwarder(f)
+		}(f)
 	}
-	for i := 0; i < cfg.NumProcessors; i++ {
+	for i := 0; i < d.RunConfig.NumProcessors; i++ {
 		go func(i int) {
 			defer log.HandlePanic()
-			d.runProcessor(i, procQs[i], fwQs, slowQs[i%cfg.NumSlowPathProcessors])
+			d.runProcessor(i, procQs[i], slowQs[i%d.RunConfig.NumSlowPathProcessors])
 		}(i)
 	}
-	for i := 0; i < cfg.NumSlowPathProcessors; i++ {
+	for i := 0; i < d.RunConfig.NumSlowPathProcessors; i++ {
 		go func(i int) {
 			defer log.HandlePanic()
-			d.runSlowPathProcessor(i, slowQs[i], fwQs)
+			d.runSlowPathProcessor(i, slowQs[i])
 		}(i)
 	}
-	for k, v := range d.bfdSessions {
-		go func(ifID uint16, c bfdSession) {
+	for a, l := range d.links {
+		s := l.getBfdSession()
+		if s == nil {
+			continue // links without a BFD session have nothing to run
+		}
+		go func(a netip.AddrPort, s bfdSession) {
 			defer log.HandlePanic()
-			if err := c.Run(ctx); err != nil && err != bfd.AlreadyRunning {
-				log.Error("BFD session failed to start", "ifID", ifID, "err", err)
+			if err := s.Run(ctx); err != nil && err != bfd.AlreadyRunning {
+				log.Error("BFD session failed to start", "remote address", a, "err", err)
 			}
-		}(k, v)
+		}(a, s)
 	}
 
 	d.mtx.Unlock()
@@ -673,10 +882,10 @@
 
-// initializePacketPool calculates the size of the packet pool based on the
+// initPacketPool calculates the size of the packet pool based on the
 // current dataplane settings and allocates all the buffers
-func (d *DataPlane) initPacketPool(cfg *RunConfig, processorQueueSize int) {
-	poolSize := len(d.interfaces)*cfg.BatchSize +
-		(cfg.NumProcessors+cfg.NumSlowPathProcessors)*(processorQueueSize+1) +
-		len(d.interfaces)*(2*cfg.BatchSize)
+func (d *DataPlane) initPacketPool(processorQueueSize int) {
+	poolSize := len(d.interfaces)*d.RunConfig.BatchSize +
+		(d.RunConfig.NumProcessors+d.RunConfig.NumSlowPathProcessors)*(processorQueueSize+1) +
+		len(d.interfaces)*(2*d.RunConfig.BatchSize)
 
 	log.Debug("Initialize packet pool of size", "poolSize", poolSize)
 	d.packetPool = make(chan *packet, poolSize)
@@ -687,29 +896,24 @@
 	}
 }
 
-// initializes the processing routines and forwarders queues
-func initQueues(cfg *RunConfig, interfaces map[uint16]BatchConn,
-	processorQueueSize int) ([]chan *packet, map[uint16]chan *packet, []chan *packet) {
+// initializes the processing routines and queues
+func (d *DataPlane) initQueues(processorQueueSize int) ([]chan *packet, []chan *packet) {
 
-	procQs := make([]chan *packet, cfg.NumProcessors)
-	for i := 0; i < cfg.NumProcessors; i++ {
+	procQs := make([]chan *packet, d.RunConfig.NumProcessors)
+	for i := 0; i < d.RunConfig.NumProcessors; i++ {
 		procQs[i] = make(chan *packet, processorQueueSize)
 	}
-	slowQs := make([]chan *packet, cfg.NumSlowPathProcessors)
-	for i := 0; i < cfg.NumSlowPathProcessors; i++ {
+	slowQs := make([]chan *packet, d.RunConfig.NumSlowPathProcessors)
+	for i := 0; i < d.RunConfig.NumSlowPathProcessors; i++ {
 		slowQs[i] = make(chan *packet, processorQueueSize)
 	}
-	fwQs := make(map[uint16]chan *packet)
-	for ifID := range interfaces {
-		fwQs[ifID] = make(chan *packet, cfg.BatchSize)
-	}
-	return procQs, fwQs, slowQs
+	return procQs, slowQs
 }
 
-func (d *DataPlane) runReceiver(ifID uint16, conn BatchConn, cfg *RunConfig,
-	procQs []chan *packet) {
+func (d *DataPlane) runReceiver(f *forwarder, procQs []chan *packet) {
 
-	log.Debug("Run receiver for", "interface", ifID)
+	// TODO(jiceatscion): ifID isn't unique here; it is zero for the internal and all sibling links.
+	log.Debug("Run receiver for", "interface", f.ifID)
 
 	// Each receiver (therefore each input interface) has a unique random seed for the procID hash
 	// function.
@@ -724,21 +928,21 @@
 
 	// A collection of socket messages, as the readBatch API expects them. We keep using the same
 	// collection, call after call; only replacing the buffer.
-	msgs := underlayconn.NewReadMessages(cfg.BatchSize)
+	msgs := underlayconn.NewReadMessages(d.RunConfig.BatchSize)
 
 	// An array of corresponding packet references. Each corresponds to one msg.
 	// The packet owns the buffer that we set in the matching msg, plus the metadata that we'll add.
-	packets := make([]*packet, cfg.BatchSize)
+	packets := make([]*packet, d.RunConfig.BatchSize)
 
-	numReusable := 0                     // unused buffers from previous loop
-	metrics := d.forwardingMetrics[ifID] // If receiver exists, fw metrics exist too.
+	numReusable := 0                       // unused buffers from previous loop
+	metrics := d.forwardingMetrics[f.ifID] // If receiver exists, fw metrics exist too.
 
 	enqueueForProcessing := func(size int, srcAddr *net.UDPAddr, pkt *packet) {
 		sc := classOfSize(size)
 		metrics[sc].InputPacketsTotal.Inc()
 		metrics[sc].InputBytesTotal.Add(float64(size))
 
-		procID, err := computeProcID(pkt.rawPacket, cfg.NumProcessors, hashSeed)
+		procID, err := computeProcID(pkt.rawPacket, d.RunConfig.NumProcessors, hashSeed)
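+		// The procID hash is computed over flow-identifying header fields, so
+		// all packets of a flow map to the same processor and per-flow packet
+		// order is preserved across the parallel processors.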
 		if err != nil {
 			log.Debug("Error while computing procID", "err", err)
 			d.returnPacketToPool(pkt)
@@ -747,7 +951,7 @@
 		}
 
 		pkt.rawPacket = pkt.rawPacket[:size] // Update size; readBatch does not.
-		pkt.ingress = ifID
+		pkt.ingress = f.ifID
 		pkt.srcAddr = srcAddr
 		select {
 		case procQs[procID] <- pkt:
@@ -761,7 +965,7 @@
 		// collect packets.
 
 		// Give a new buffer to the msgs elements that have been used in the previous loop.
-		for i := 0; i < cfg.BatchSize-numReusable; i++ {
+		for i := 0; i < d.RunConfig.BatchSize-numReusable; i++ {
 			p := d.getPacketFromPool()
 			p.reset()
 			packets[i] = p
@@ -769,10 +973,10 @@
 		}
 
 		// Fill the packets
-		numPkts, err := conn.ReadBatch(msgs)
+		numPkts, err := f.conn.ReadBatch(msgs)
 		numReusable = len(msgs) - numPkts
 		if err != nil {
-			log.Debug("Error while reading batch", "interfaceID", ifID, "err", err)
+			log.Debug("Error while reading batch", "interfaceID", f.ifID, "err", err)
 			continue
 		}
 		for i, msg := range msgs[:numPkts] {
@@ -816,8 +1020,7 @@
 	d.packetPool <- pkt
 }
 
-func (d *DataPlane) runProcessor(id int, q <-chan *packet,
-	fwQs map[uint16]chan *packet, slowQ chan<- *packet) {
+func (d *DataPlane) runProcessor(id int, q <-chan *packet, slowQ chan<- *packet) {
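+	// Dispatch now goes through the link abstraction: p.egress selects a link
+	// in d.interfaces and link.send() does a non-blocking enqueue to that
+	// link's forwarder; on a full queue the packet is dropped and counted.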
 
 	log.Debug("Initialize processor with", "id", id)
 	processor := newPacketProcessor(d)
@@ -856,25 +1059,21 @@
 			d.returnPacketToPool(p)
 			continue
 		}
-		fwCh, ok := fwQs[p.egress]
+		fwLink, ok := d.interfaces[p.egress]
 		if !ok {
 			log.Debug("Error determining forwarder. Egress is invalid", "egress", p.egress)
 			metrics.DroppedPacketsInvalid.Inc()
 			d.returnPacketToPool(p)
 			continue
 		}
-
-		select {
-		case fwCh <- p:
-		default:
+		if !fwLink.send(p) {
 			d.returnPacketToPool(p)
 			metrics.DroppedPacketsBusyForwarder.Inc()
 		}
 	}
 }
 
-func (d *DataPlane) runSlowPathProcessor(id int, q <-chan *packet,
-	fwQs map[uint16]chan *packet) {
+func (d *DataPlane) runSlowPathProcessor(id int, q <-chan *packet) {
 
 	log.Debug("Initialize slow-path processor with", "id", id)
 	processor := newSlowPathProcessor(d)
@@ -892,15 +1091,13 @@
 			d.returnPacketToPool(p)
 			continue
 		}
-		fwCh, ok := fwQs[p.egress]
+		fwLink, ok := d.interfaces[p.egress]
 		if !ok {
 			log.Debug("Error determining forwarder. Egress is invalid", "egress", p.egress)
 			d.returnPacketToPool(p)
 			continue
 		}
-		select {
-		case fwCh <- p:
-		default:
+		if !fwLink.send(p) {
 			d.returnPacketToPool(p)
 		}
 	}
@@ -1031,25 +1228,27 @@ func updateOutputMetrics(metrics interfaceMetrics, packets []*packet) {
 	}
 }
 
-func (d *DataPlane) runForwarder(ifID uint16, conn BatchConn, cfg *RunConfig, c <-chan *packet) {
+// It would make more sense for this to be part of forwarder.
+func (d *DataPlane) runForwarder(f *forwarder) {
 
-	log.Debug("Initialize forwarder for", "interface", ifID)
+	// TODO(jiceatscion): ifID isn't so informative; it is zero for internal and all sibling links.
+	log.Debug("Initialize forwarder for", "interface", f.ifID)
 
 	// We use this somewhat like a ring buffer.
-	pkts := make([]*packet, cfg.BatchSize)
+	pkts := make([]*packet, d.RunConfig.BatchSize)
 
 	// We use this as a temporary buffer, but allocate it just once
 	// to save on garbage handling.
-	msgs := make(underlayconn.Messages, cfg.BatchSize)
+	msgs := make(underlayconn.Messages, d.RunConfig.BatchSize)
 	for i := range msgs {
 		msgs[i].Buffers = make([][]byte, 1)
 	}
 
-	metrics := d.forwardingMetrics[ifID]
+	metrics := d.forwardingMetrics[f.ifID]
 
 	toWrite := 0
 	for d.IsRunning() {
-		toWrite += readUpTo(c, cfg.BatchSize-toWrite, toWrite == 0, pkts[toWrite:])
+		toWrite += readUpTo(f.fwQ, d.RunConfig.BatchSize-toWrite, toWrite == 0, pkts[toWrite:])
 
 		// Turn the packets into underlay messages that WriteBatch can send.
 		for i, p := range pkts[:toWrite] {
@@ -1059,7 +1258,7 @@
 				msgs[i].Addr = p.dstAddr
 			}
 		}
-		written, _ := conn.WriteBatch(msgs[:toWrite], 0)
+		written, _ := f.conn.WriteBatch(msgs[:toWrite], 0)
 		if written < 0 {
 			// WriteBatch returns -1 on error, we just consider this as
 			// 0 packets written
@@ -1543,7 +1742,13 @@ func (p *scionPacketProcessor) validateTransitUnderlaySrc() disposition {
 	return pForward
 }
 
-// Validates the egress interface referenced by the current hop.
+// Validates the egress interface referenced by the current hop. This is not called for
+// packets to be delivered to the local AS, so pkt.egress is never 0.
+// If pkt.ingress is zero, the packet can be coming from either a local end-host or a
+// sibling router. In either of these cases, it must be leaving via a locally owned external
+// interface (i.e. it can't be going to a sibling router or to a local end-host). On the other
+// hand, a packet coming directly from another AS can be going anywhere: local delivery,
+// to another AS directly, or via a sibling router.
 func (p *scionPacketProcessor) validateEgressID() disposition {
 	egressID := p.pkt.egress
 	_, ih := p.d.internalNextHops[egressID]
@@ -2408,16 +2613,14 @@ func (b *bfdSend) Send(bfd *layers.BFD) error {
-	// BfdControllers and fwQs are initialized from the same set of ifIDs. So not finding
-	// the forwarding queue is an serious internal error. Let that panic.
-	fwChan := b.dataPlane.fwQs[b.ifID]
+	// BFD controllers and links are initialized from the same set of ifIDs, so not finding
+	// the link is a serious internal error. Let that panic.
+	fwLink := b.dataPlane.interfaces[b.ifID]
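+	// b.ifID is zero for internal and sibling BFD senders (addNextHopBFD passes
+	// ifID 0), so those resolve to the internal link and must set the packet's
+	// destination address explicitly below.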
 
 	if b.ifID == 0 {
 		// Using the internal interface: must specify the destination address
 		updateNetAddrFromAddrPort(p.dstAddr, b.dstAddr)
 	}
 	// No need to specify pkt.egress. It isn't used downstream from here.
-	select {
-	case fwChan <- p:
-	default:
-		// We do not care if some BFD packets get bounced under high load. If it becomes a problem,
-		// the solution is do use BFD's demand-mode. To be considered in a future refactoring.
+	if !fwLink.send(p) {
+		// We do not care if some BFD packets get bounced under high load. If it becomes a problem,
+		// the solution is to use BFD's demand-mode. To be considered in a future refactoring.
 		b.dataPlane.returnPacketToPool(p)
 	}
 	return nil
}
@@ -2739,9 +2942,6 @@ func (d *DataPlane) initMetrics() {
 	d.forwardingMetrics = make(map[uint16]interfaceMetrics)
 	d.forwardingMetrics[0] = newInterfaceMetrics(d.Metrics, 0, d.localIA, d.neighborIAs)
 	for ifID := range d.external {
-		if _, notOwned := d.internalNextHops[ifID]; notOwned {
-			continue
-		}
 		d.forwardingMetrics[ifID] = newInterfaceMetrics(d.Metrics, ifID, d.localIA, d.neighborIAs)
 	}
 
diff --git a/router/dataplane_internal_test.go b/router/dataplane_internal_test.go
index f6274f5f3c..1132f7518e 100644
--- a/router/dataplane_internal_test.go
+++ b/router/dataplane_internal_test.go
@@ -432,7 +432,7 @@ func TestSlowPathProcessing(t *testing.T) {
 	// ProcessPacket assumes some pre-conditions:
 	// * The ingress interface has to exist. This fake map is good for the test cases we have.
 	// * InternalNextHops may not be nil. Empty is ok for all the test cases we have.
-	fakeExternalInterfaces := map[uint16]BatchConn{1: nil}
+	fakeExternalInterfaces := map[uint16]struct{}{1: {}}
 	fakeInternalNextHops := map[uint16]netip.AddrPort{}
 	fakeServices := map[addr.SVC][]netip.AddrPort{}
 
diff --git a/router/dataplane_test.go b/router/dataplane_test.go
index 1c812f9816..9298aa23c6 100644
--- a/router/dataplane_test.go
+++ b/router/dataplane_test.go
@@ -643,7 +643,7 @@ func TestProcessPkt(t *testing.T) {
 	// * The ingress interface has to exist. This fake map is good for most test cases.
 	//   Others need a custom one.
 	// * InternalNextHops may not be nil. Empty is ok (sufficient unless testing AS transit).
-	fakeExternalInterfaces := map[uint16]router.BatchConn{1: nil, 2: nil, 3: nil}
+	fakeExternalInterfaces := map[uint16]struct{}{1: {}, 2: {}, 3: {}}
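+	// The dataplane no longer keeps per-interface connections; a presence set
+	// is all these tests need for the external interfaces.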
 	fakeInternalNextHops := map[uint16]netip.AddrPort{}
 
 	testCases := map[string]struct {
@@ -725,8 +725,8 @@
 		"outbound": {
 			prepareDP: func(ctrl *gomock.Controller) *router.DataPlane {
 				return router.NewDP(
-					map[uint16]router.BatchConn{
-						uint16(1): mock_router.NewMockBatchConn(ctrl),
+					map[uint16]struct{}{
+						uint16(1): {},
 					},
 					map[uint16]topology.LinkType{
 						1: topology.Child,
@@ -759,9 +759,9 @@
 		"brtransit": {
 			prepareDP: func(ctrl *gomock.Controller) *router.DataPlane {
 				return router.NewDP(
-					map[uint16]router.BatchConn{
-						uint16(1): mock_router.NewMockBatchConn(ctrl),
-						uint16(2): mock_router.NewMockBatchConn(ctrl),
+					map[uint16]struct{}{
+						uint16(1): {},
+						uint16(2): {},
 					},
 					map[uint16]topology.LinkType{
 						1: topology.Parent,
@@ -794,9 +794,9 @@
 		"brtransit non consdir": {
 			prepareDP: func(ctrl *gomock.Controller) *router.DataPlane {
 				return router.NewDP(
-					map[uint16]router.BatchConn{
-						uint16(1): mock_router.NewMockBatchConn(ctrl),
-						uint16(2): mock_router.NewMockBatchConn(ctrl),
+					map[uint16]struct{}{
+						uint16(1): {},
+						uint16(2): {},
 					},
 					map[uint16]topology.LinkType{
 						2: topology.Parent,
@@ -830,9 +830,9 @@
 		"brtransit peering consdir": {
 			prepareDP: func(ctrl *gomock.Controller) *router.DataPlane {
 				return router.NewDP(
-					map[uint16]router.BatchConn{
-						uint16(1): mock_router.NewMockBatchConn(ctrl),
-						uint16(2): mock_router.NewMockBatchConn(ctrl),
+					map[uint16]struct{}{
+						uint16(1): {},
+						uint16(2): {},
 					},
 					map[uint16]topology.LinkType{
 						1: topology.Peer,
@@ -900,9 +900,9 @@
 		"brtransit peering non consdir": {
 			prepareDP: func(ctrl *gomock.Controller) *router.DataPlane {
 				return router.NewDP(
-					map[uint16]router.BatchConn{
-						uint16(1): mock_router.NewMockBatchConn(ctrl),
-						uint16(2): mock_router.NewMockBatchConn(ctrl),
+					map[uint16]struct{}{
+						uint16(1): {},
+						uint16(2): {},
 					},
 					map[uint16]topology.LinkType{
 						1: topology.Peer,
@@ -977,9 +977,9 @@
 			// happens on the next hop.
 			prepareDP: func(ctrl *gomock.Controller) *router.DataPlane {
 				return router.NewDP(
-					map[uint16]router.BatchConn{
-						uint16(1): mock_router.NewMockBatchConn(ctrl),
-						uint16(2): mock_router.NewMockBatchConn(ctrl),
+					map[uint16]struct{}{
+						uint16(1): {},
+						uint16(2): {},
 					},
 					map[uint16]topology.LinkType{
 						1: topology.Peer,
@@ -1051,9 +1051,9 @@
 		"peering non consdir upstream": {
 			prepareDP: func(ctrl *gomock.Controller) *router.DataPlane {
 				return router.NewDP(
-					map[uint16]router.BatchConn{
-						uint16(1): mock_router.NewMockBatchConn(ctrl),
-						uint16(2): mock_router.NewMockBatchConn(ctrl),
+					map[uint16]struct{}{
+						uint16(1): {},
+						uint16(2): {},
 					},
 					map[uint16]topology.LinkType{
 						1: topology.Peer,
@@ -1133,8 +1133,8 @@
 		"astransit direct": {
 			prepareDP: func(ctrl *gomock.Controller) *router.DataPlane {
 				return router.NewDP(
-					map[uint16]router.BatchConn{
-						uint16(1): mock_router.NewMockBatchConn(ctrl),
+					map[uint16]struct{}{
+						uint16(1): {},
 						// Interface 3 isn't in the external interfaces of this router
 						// another router has it.
 					},
 					map[uint16]topology.LinkType{
@@ -1168,8 +1168,8 @@
 		"astransit xover": {
 			prepareDP: func(ctrl *gomock.Controller) *router.DataPlane {
 				return router.NewDP(
-					map[uint16]router.BatchConn{
-						uint16(51): mock_router.NewMockBatchConn(ctrl),
+					map[uint16]struct{}{
+						uint16(51): {},
 					},
 					map[uint16]topology.LinkType{
 						51: topology.Child,
@@ -1365,8 +1365,8 @@
 		"reversed onehop outbound": {
 			prepareDP: func(ctrl *gomock.Controller) *router.DataPlane {
 				return router.NewDP(
-					map[uint16]router.BatchConn{
-						uint16(1): mock_router.NewMockBatchConn(ctrl),
+					map[uint16]struct{}{
+						uint16(1): {},
 					},
 					nil,
 					mock_router.NewMockBatchConn(ctrl),
@@ -1425,8 +1425,8 @@
 		"onehop outbound": {
 			prepareDP: func(ctrl *gomock.Controller) *router.DataPlane {
 				return router.NewDP(
-					map[uint16]router.BatchConn{
-						uint16(2): mock_router.NewMockBatchConn(ctrl),
+					map[uint16]struct{}{
+						uint16(2): {},
 					},
 					nil,
 					mock_router.NewMockBatchConn(ctrl),
diff --git a/router/export_test.go b/router/export_test.go
index 1c87bcce8b..9a8744f9e0 100644
--- a/router/export_test.go
+++ b/router/export_test.go
@@ -66,7 +66,7 @@ func NewPacket(raw []byte, src, dst *net.UDPAddr, ingress, egress uint16) *Packe
 
 func NewDP(
-	external map[uint16]BatchConn,
+	external map[uint16]struct{},
 	linkTypes map[uint16]topology.LinkType,
 	internal BatchConn,
 	internalNextHops map[uint16]netip.AddrPort,