diff --git a/BUILD.bazel b/BUILD.bazel index 17a531069b..c43159ca4e 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -198,6 +198,7 @@ pkg_tar( "//tools/buildkite/cmd/buildkite_artifacts", "//tools/end2end", "//tools/end2end_integration", + "//tools/end2endblast", "//tools/pktgen/cmd/pktgen", "//tools/scion_integration", "//tools/udpproxy", diff --git a/acceptance/router_benchmark/BUILD.bazel b/acceptance/router_benchmark/BUILD.bazel new file mode 100644 index 0000000000..824a7271ba --- /dev/null +++ b/acceptance/router_benchmark/BUILD.bazel @@ -0,0 +1,12 @@ +load("//acceptance/common:topogen.bzl", "topogen_test") + +topogen_test( + name = "test", + src = "test.py", + args = [ + "--executable=end2end_integration:$(location //tools/end2end_integration)", + ], + data = ["//tools/end2end_integration"], + homedir = "$(rootpath //tools/end2end_integration)", + topo = "testdata/router_bm.topo", +) diff --git a/acceptance/router_benchmark/test.py b/acceptance/router_benchmark/test.py new file mode 100755 index 0000000000..2049d9eea3 --- /dev/null +++ b/acceptance/router_benchmark/test.py @@ -0,0 +1,302 @@ +#!/usr/bin/env python3 + +# Copyright 2023 SCION Association +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import json +import yaml +from http.client import HTTPConnection +from urllib.parse import urlencode +from plumbum import cli +from plumbum.cmd import cat, grep, wc + +from acceptance.common import base, docker + +logger = logging.getLogger(__name__) + +# This test relies ona specific topology router_bm.topo. +# This topology is 1 core AS with two children and one core AS with none like so: +# +# CoreAS-A CoreAS-B +# BR-A1 BR-A2 ---- BR-B +# | | +# BR-C BR-D +# AS-C AS-D + +# Those values are valid expectations only when running in the CI environment. +EXPECTATIONS = { + 'in': 53000, + 'out': 26000, + 'in_transit': 73000, + 'out_transit': 49000, + 'br_transit': 73000, +} + + +class Test(base.TestTopogen): + """ + Tests that the performance of the router is within a satisfying (TBD) range. + The test runs in a bespoke topology. + """ + + ci = cli.Flag( + "ci", + help="Do extra checks for CI", + envname="CI" + ) + + def setup_prepare(self): + super().setup_prepare() + + # The expected topology for this test is well-known: see router_bm.topo + # This test is configured to match. + + # Distribute available cores among routers. The base schema is expressed as fractions of 12. + # Then we scale and round. 
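+        # For example, on a hypothetical 16-core CI machine: int(2 * 16 / 12) = 2 cores for each
+        # child router and for the far router, int(3 * 16 / 12) = 4 for each center router; the
+        # 2 cores left over then go to the center routers (5 each), for a total of 16.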
+ + childRouterCores = 2 # *2 + farRouterCores = 2 # *1 + centerRouterCores = 3 # *2 + availCores = int((cat['/proc/cpuinfo'] | grep['processor\\s:'] | wc['-l'])()) + + childRouterCores = int(childRouterCores * availCores / 12) + farRouterCores = int(farRouterCores * availCores / 12) + centerRouterCores = int(centerRouterCores * availCores / 12) + + if childRouterCores < 1: + childRouterCores = 1 + + if farRouterCores < 1: + farRouterCores = 1 + + if centerRouterCores < 1: + centerRouterCores = 1 + + availCores -= (2 * childRouterCores + 2 * centerRouterCores + farRouterCores) + + # The truncations can leave us with up to 4 extra cores. Give first to the center routers, + # if there's enough. + if availCores > 1: + availCores -= 2 + centerRouterCores += 1 + + # The leftovers go to childRouterCores, even if it means allocating one extraneous core. + if availCores > 0: + childRouterCores += 1 + + coreCountUpdates = { + 'br1-ff00_0_110-1': centerRouterCores, + 'br1-ff00_0_110-2': centerRouterCores, + 'br1-ff00_0_111-1': childRouterCores, + 'br1-ff00_0_112-1': childRouterCores, + 'br2-ff00_0_120-1': farRouterCores, + } + + # Edit GOMAXPROC for all routers in the docker compose file. + scion_dc = self.artifacts / "gen/scion-dc.yml" + with open(scion_dc, "r") as file: + dc = yaml.load(file, Loader=yaml.FullLoader) + + for router, coreCnt in coreCountUpdates.items(): + dc["services"][router]["environment"]["GOMAXPROCS"] = f"{coreCnt}" + + with open(scion_dc, "w") as file: + yaml.dump(dc, file) + + def setup(self): + super().setup() + self.monitoring_dc = docker.Compose(compose_file=self.artifacts / "gen/monitoring-dc.yml") + self.monitoring_dc("up", "-d") + + def _run(self): + # Give some time for the topology to start. + self.await_connectivity() + + # Start as-transiting load. With the router_bm topology + + # The subset noncore#nonlocalcore gives us outgoing traffic at each + # child, incoming traffic at BR-B, AS-transit-in traffic at BR-A1, + # and AS-transit-out traffic at BR-A2. There is a small amount of + # in and out traffic everywhere, on top of that produced by the test. + # We only consider the routers involved in the test. Those see much + # higher rates... we use that to isolate them in the results without + # having to compare instance labels with the topology data. + logger.info("==> Starting load as-transit") + loadtest = self.get_executable("end2end_integration") + retCode, stdOut, stdErr = loadtest[ + "-d", + "-outDir", self.artifacts, + "-name", "router_benchmark", + "-cmd", "./bin/end2endblast", + "-attempts", 1500000, + "-timeout", "120s", # Timeout is for all attempts together + "-parallelism", 100, + "-subset", "noncore#core#remoteISD" + ].run_tee() + + for line in stdOut.splitlines(): + if line.startswith('metricsBegin'): + _, beg, _, end = line.split() + + logger.info('==> Collecting in/out/as-transit performance metrics...') + + # The raw metrics are expressed in terms of core*seconds. We convert to machine*seconds + # which allows us to provide a projected packet/s; ...more intuitive than packets/core*s. + # We measure the rate over 10s. For best results we sample the end of the middle 10s of the + # run. "beg" is the start time of the real action and "end" is the end time. 
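+        # For example, with beg=1000 and end=1060, the middle 10s of the run is [1025, 1035];
+        # sampling at (1000 + 1060 + 10) / 2 = 1035 captures the rate over exactly that window.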
+ sampleTime = (int(beg) + int(end) + 10) / 2 + promQuery = urlencode({ + 'time': f'{sampleTime}', + 'query': ( + 'sum by (instance, job, type) (' + ' rate(router_output_pkts_total{job="BR"}[10s])' + ')' + '/ on (instance, job) group_left()' + 'sum by (instance, job) (' + ' 1 - (rate(process_runnable_seconds_total[10s])' + ' / go_sched_maxprocs_threads)' + ')' + ) + }) + conn = HTTPConnection("localhost:9090") + conn.request('GET', f'/api/v1/query?{promQuery}') + resp = conn.getresponse() + if resp.status != 200: + raise RuntimeError(f'Unexpected response: {resp.status} {resp.reason}') + + pld = json.loads(resp.read().decode('utf-8')) + results = pld['data']['result'] + rateMap = {} + for result in results: + tt = result['metric']['type'] + ts, val = result['value'] + # 0 values should not enter in any averaging. In this test, a very + # low rate means that the router wasn't involved in the test for + # that traffic type. "Out" traffic is the only one that exists at + # two routers. To cover that case, we average the rates for a given + # traffic type. + # TODO: figure a more reliable way to identify the tested routers. + r = int(float(val)) + if r < 5000: # Not a router of interest. + continue + if rateMap.get(tt) is None: + rateMap[tt] = [] + rateMap[tt].append(r) + for tt, rates in rateMap.items(): + total = 0 + for r in rates: + total += r + rateMap[tt] = int(total / len(rates)) + + # Start br-transiting load. + # The subset noncore#noncore gives us a mix of in and out traffic at + # the childrem and pure BR-transit traffic at BR-A1. + logger.info("==> Starting load br-transit") + loadtest = self.get_executable("end2end_integration") + retCode, stdOut, stdErr = loadtest[ + "-d", + "-outDir", self.artifacts, + "-name", "router_benchmark", + "-cmd", "./bin/end2endblast", + "-attempts", 1500000, + "-timeout", "120s", # Timeout is for all attempts together + "-parallelism", 100, + "-subset", "noncore#noncore#remoteAS" + ].run_tee() + + for line in stdOut.splitlines(): + if line.startswith('metricsBegin'): + _, beg, _, end = line.split() + + logger.info('==> Collecting br-transit performance metrics...') + + # The raw metrics are expressed in terms of core*seconds. We convert to machine*seconds + # which allows us to provide a projected packet/s; ...more intuitive than packets/core*s. + # We're interested only in br_transit traffic. We measure the rate over 10s. For best + # results we sample the end of the middle 10s of the run. "beg" is the start time of the + # real action and "end" is the end time. + sampleTime = (int(beg) + int(end) + 10) / 2 + promQuery = urlencode({ + 'time': f'{sampleTime}', + 'query': ( + 'sum by (instance, job) (' + ' rate(router_output_pkts_total{job="BR", type="br_transit"}[10s])' + ')' + '/ on (instance, job) group_left()' + 'sum by (instance, job) (' + ' 1 - (rate(process_runnable_seconds_total[10s])' + ' / go_sched_maxprocs_threads)' + ')' + ) + }) + conn = HTTPConnection("localhost:9090") + conn.request('GET', f'/api/v1/query?{promQuery}') + resp = conn.getresponse() + if resp.status != 200: + raise RuntimeError(f'Unexpected response: {resp.status} {resp.reason}') + + # There's only one router that has br_transit traffic. + pld = json.loads(resp.read().decode('utf-8')) + results = pld['data']['result'] + tt = 'br_transit' + rateMap[tt] = 0 + for result in results: + ts, val = result['value'] + r = int(float(val)) + if r != 0: + rateMap[tt] = r + + # Fetch and log the number of cores used by Go. This may inform performance + # modeling later. 
+        logger.info('==> Collecting number of cores...')
+        promQuery = urlencode({
+            'query': 'go_sched_maxprocs_threads{job="BR"}'
+        })
+
+        conn = HTTPConnection("localhost:9090")
+        conn.request('GET', f'/api/v1/query?{promQuery}')
+        resp = conn.getresponse()
+        if resp.status != 200:
+            raise RuntimeError(f'Unexpected response: {resp.status} {resp.reason}')
+
+        pld = json.loads(resp.read().decode('utf-8'))
+        results = pld['data']['result']
+        for result in results:
+            instance = result['metric']['instance']
+            _, val = result['value']
+            logger.info(f'Router Cores for {instance}: {int(val)}')
+
+        # Log and check the performance. If this is used as a CI test, make sure that
+        # the performance is within the expected ballpark.
+        rateTooLow = []
+        for tt, exp in EXPECTATIONS.items():
+            if self.ci:
+                logger.info(f'Packets/(machine*s) for {tt}: {rateMap[tt]} expected: {exp}')
+                if rateMap[tt] < 0.8 * exp:
+                    rateTooLow.append(tt)
+            else:
+                logger.info(f'Packets/(machine*s) for {tt}: {rateMap[tt]}')
+
+        if len(rateTooLow) != 0:
+            raise RuntimeError(f'Insufficient performance for: {rateTooLow}')
+
+    def teardown(self):
+        self.monitoring_dc("down")
+        super().teardown()
+
+
+if __name__ == '__main__':
+    base.main(Test)
diff --git a/acceptance/router_benchmark/testdata/router_bm.topo b/acceptance/router_benchmark/testdata/router_bm.topo
new file mode 100644
index 0000000000..7771446155
--- /dev/null
+++ b/acceptance/router_benchmark/testdata/router_bm.topo
@@ -0,0 +1,25 @@
+--- # Bespoke Topology for router benchmarking:
+# Designed to cause at least one router to see only a single type of traffic
+# (in/out/asTransitIn/asTransitOut/brTransit) for a well-chosen pair type
+# (core:core, core:non-core, core:local-core, etc.).
+ASes:
+  "1-ff00:0:110":
+    core: true
+    voting: true
+    authoritative: true
+    issuing: true
+    mtu: 1400
+  "2-ff00:0:120":
+    core: true
+    voting: true
+    authoritative: true
+    issuing: true
+    mtu: 1400
+  "1-ff00:0:111":
+    cert_issuer: 1-ff00:0:110
+  "1-ff00:0:112":
+    cert_issuer: 1-ff00:0:110
+links:
+  - {a: "1-ff00:0:110-A#1", b: "1-ff00:0:111#1", linkAtoB: CHILD, mtu: 1280}
+  - {a: "1-ff00:0:110-A#2", b: "1-ff00:0:112#1", linkAtoB: CHILD, mtu: 1280}
+  - {a: "1-ff00:0:110-B#3", b: "2-ff00:0:120#1", linkAtoB: CORE, mtu: 1280}
diff --git a/doc/manuals/router.rst b/doc/manuals/router.rst
index c68768754a..1d70764cdb 100644
--- a/doc/manuals/router.rst
+++ b/doc/manuals/router.rst
@@ -120,6 +120,21 @@ Environment Variables
    :Type: :ref:`duration `
    :Default: ``5m``
 
+.. envvar:: GOMAXPROCS
+
+   Honored by the Go runtime. The Go runtime starts a number of kernel threads such that the
+   number of non-sleeping threads never exceeds ``GOMAXPROCS``. By default ``GOMAXPROCS`` is equal
+   to the number of cores in the host. That value can be changed via the ``GOMAXPROCS``
+   environment variable (or programmatically by the application code). See
+   `the go runtime documentation `_
+   for more information. One reason to change this is when running multiple routers on the same
+   host. In such a case, it is best to split the available cores among the routers (for example,
+   with four routers on an 8-core host, start each with ``GOMAXPROCS=2``), lest Go's default
+   assumptions cause them to compete for cores and incur futile context switching. This precaution
+   is especially useful in performance testing situations.
+
+   :Type: unsigned integer
+   :Default: ``all cores``
+
 Configuration
 =============
 
@@ -182,17 +197,20 @@ considers the following options.
 .. option:: router.num_processors = (Default: GOMAXPROCS)
 
    Number of goroutines started for SCION packets processing.
+   These goroutines make the routing decision for the SCION packets by inspecting, validating and
+   updating the path information in the packet header.
    Packets are processed asynchronously from the corresponding read/write operations on the
    individual interface sockets.
 
    `Goroutines `_
-   are the Go pramming language's light-weight user-space concurrency primitives. Go's runtime schedules
-   goroutines on top of a fixed number of kernel threads. The number of kernel threads is controlled by
-   the ``GOMAXPROCS`` environment variable. See also the `go runtime documentation `_.
-
-   By default, the router uses ``GOMAXPROCS`` packet processor goroutines, i.e. exactly one goroutine for
-   each kernel thread created by the runtime.
+   are the Go programming language's light-weight user-space concurrency primitives. Go's runtime
+   schedules goroutines on top of a smaller number of kernel threads. The default is to use as
+   many packet processors as there are kernel threads started by Go, letting other goroutines
+   displace them sporadically. Whether more or fewer processors are preferable is to be determined
+   experimentally.
+
+   The number of kernel threads that Go creates depends on the number of usable cores, which is
+   controlled by the environment variable ``GOMAXPROCS``. See :envvar:`GOMAXPROCS`.
 
 .. option:: router.num_slow_processors = (Default: 1)
 
diff --git a/docker/tester.bzl b/docker/tester.bzl
index 3c85f2fc7e..0e0cbe28f5 100644
--- a/docker/tester.bzl
+++ b/docker/tester.bzl
@@ -33,6 +33,7 @@ def build_tester_image():
         name = "bin",
         srcs = [
             "//tools/end2end:end2end",
+            "//tools/end2endblast:end2endblast",
             "//scion/cmd/scion",
             "//scion-pki/cmd/scion-pki:scion-pki",
         ],
diff --git a/pkg/private/processmetrics/processmetrics_linux.go b/pkg/private/processmetrics/processmetrics_linux.go
index 821bd34dbe..3132fa6251 100644
--- a/pkg/private/processmetrics/processmetrics_linux.go
+++ b/pkg/private/processmetrics/processmetrics_linux.go
@@ -12,34 +12,42 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-// Package processmetrics provides a custom collector to export process-level
-// metrics beyond what prometheus.ProcesssCollector offers.
-// This implementation is restricted to Linux. The generic implementation
-// does nothing.
-// This code works only if the delayacct kernel feature is turned on.
-// this is done by "sysctl kernel.task_delayacct=1".
+// Package processmetrics provides a custom collector to export process-level metrics beyond what
+// prometheus.ProcessCollector offers. This implementation is restricted to Linux. The generic
+// implementation does nothing.
 //
-// In order to make a fair run-to-run comparison of these metrics, we must
-// relate them to the actual CPU time that was *available* to the router.
-// That is, the time that the process was either running, blocked, or sleeping,
-// but not "runnable" (which in unix-ese implies *not* running).
-// A custom collector in pkg/processmetrics exposes the running and runnable
-// metrics directly from the scheduler.
-// Possibly crude example of a query that accounts for available cpu:
+// The metrics we add serve to estimate the available cpu time; that is, the amount of CPU that the
+// scheduler granted to the process, independently of whether it ran with it or slept on it. The
+// goal is to measure what the overall performance of the process would be if it was never
+// preempted. For example, this could be expressed as some_output/available_cpu_time.
+//
+// At a given time, a given thread is either running, runnable, or sleeping. When running, it
+// consumes exactly one core. When runnable, it is being deprived of exactly one core (since Go
+// does not create more runnable threads than there are cores, no other thread of the process is
+// receiving that core). So, for our accounting purposes, the total time that all the process's
+// threads spend "runnable" is the total core*time that the process did not receive. The
+// complement of that, the available_cpu_time, is: num_cores * real_time - total_runnable_time.
+//
+// We expose only running and runnable times. Available time can be inferred more conveniently in
+// Prometheus queries, depending on the desired unit. For example:
+// * available_cpu_seconds_per_seconds = num_cores - rate(process_runnable_seconds_total)
+// * available_machine_seconds_per_seconds = 1 - rate(process_runnable_seconds_total)/num_cores
+//
+// Example of a query for processed_pkts per available cpu seconds:
 //
 // rate(router_processed_pkts_total[1m])
 // / on (instance, job) group_left ()
-// (1 - rate(process_runnable_seconds_total[1m]))
+// (num_cores - rate(process_runnable_seconds_total[1m]))
 //
-// This shows processed_packets per available cpu seconds, as opposed to
-// real time.
-// Possibly crude example of a query that only looks at cpu use efficiency;
-// This shows processed_packets per consumed cpu seconds:
+// Example of a query that only looks at on-cpu efficiency:
 //
 // rate(router_processed_pkts_total[1m])
 // / on (instance, job) group_left ()
 // (rate(process_running_seconds_total[1m]))
 //
+// The effective number of cores is best obtained from the Go runtime. However, no Prometheus
+// collector seems to expose it yet, so we surface it here for convenience and simplicity
+// under the name go_sched_maxprocs_threads.
 
 //go:build linux
 
@@ -48,6 +56,7 @@ package processmetrics
 
 import (
 	"os"
 	"path/filepath"
+	"runtime"
 	"strconv"
 	"syscall"
 
@@ -58,24 +67,22 @@ import (
 )
 
 var (
-	// These two metrics allows to infer the amount of CPU time that was available, used or not,
-	// to the process:
-	// wallClock time = runningTime + runnableTime + sleepingTime.
-	// availableCPU = runningTime + sleepingTime
-	// Therefore AvailbleTime = wallClockTime - runnableTime.
-	// runningTime should be the same as uTime+sTime reported in a variety of other ways,
-	// but when doing calculations, better use the two data from the same source. So, collect them
-	// both.
 	runningTime = prometheus.NewDesc(
 		"process_running_seconds_total",
-		"Time the process spend running since it started (all threads summed).",
+		"CPU time the process used (running state) since it started (all threads summed).",
 		nil, nil,
 	)
 	runnableTime = prometheus.NewDesc(
 		"process_runnable_seconds_total",
-		"Time the process spend runnable (unscheduled) since it started (all threads summed).",
+		"CPU time the process was denied (runnable state) since it started (all threads summed).",
+		nil, nil,
+	)
+	goCores = prometheus.NewDesc(
+		"go_sched_maxprocs_threads",
+		"The current runtime.GOMAXPROCS setting. The number of cores Go code uses simultaneously.",
 		nil, nil,
 	)
+
 	// This metric is introspective. It's trying to gauge if we're successful in collecting the
 	// other two at a reasonable cost.
tasklistUpdates = prometheus.NewDesc( @@ -172,6 +179,11 @@ func (c *procStatCollector) Collect(ch chan<- prometheus.Metric) { prometheus.CounterValue, float64(c.totalRunnable)/1000000000, // Report duration in SI ) + ch <- prometheus.MustNewConstMetric( + goCores, + prometheus.GaugeValue, + float64(runtime.GOMAXPROCS(-1)), + ) ch <- prometheus.MustNewConstMetric( tasklistUpdates, prometheus.CounterValue, diff --git a/router/config/config.go b/router/config/config.go index f3376a3021..711bd73b61 100644 --- a/router/config/config.go +++ b/router/config/config.go @@ -1,5 +1,6 @@ // Copyright 2016 ETH Zurich // Copyright 2019 ETH Zurich, Anapaya Systems +// Copyright 2023 SCION Association // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -71,9 +72,15 @@ func (cfg *RouterConfig) Validate() error { } func (cfg *RouterConfig) InitDefaults() { + + // NumProcessors is the number of goroutines used to handle the processing queue. + // By default, there are as many as cores allowed by Go and other goroutines displace + // the packet processors sporadically. It may be either good or bad to create more + // processors (plus the other goroutines) than there are cores... experience will tell. if cfg.NumProcessors == 0 { cfg.NumProcessors = runtime.GOMAXPROCS(0) } + if cfg.NumSlowPathProcessors == 0 { cfg.NumSlowPathProcessors = 1 } diff --git a/tools/end2end/main.go b/tools/end2end/main.go index 1018f2247c..8dff7dad6a 100644 --- a/tools/end2end/main.go +++ b/tools/end2end/main.go @@ -1,5 +1,6 @@ // Copyright 2018 ETH Zurich // Copyright 2019 ETH Zurich, Anapaya Systems +// Copyright 2023 SCION Association // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -13,6 +14,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +// This is a general purpose client/server code for end2end tests. The client +// sends pings to the server until it receives at least one pong from the +// server or a given deadline is reached. The server responds to all pings and +// the client wait for a response before doing anything else. 
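+// A higher-rate variant of this client/server pair, used by the router benchmark, lives in
+// tools/end2endblast.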
+ package main import ( @@ -93,6 +99,7 @@ func realMain() int { return 1 } defer closeTracer() + if integration.Mode == integration.ModeServer { server{}.run() return 0 @@ -104,7 +111,7 @@ func realMain() int { func addFlags() { flag.Var(&remote, "remote", "(Mandatory for clients) address to connect to") flag.Var(timeout, "timeout", "The timeout for each attempt") - flag.BoolVar(&epic, "epic", false, "Enable EPIC.") + flag.BoolVar(&epic, "epic", false, "Enable EPIC") } func validateFlags() { @@ -119,6 +126,7 @@ func validateFlags() { integration.LogFatal("Invalid timeout provided", "timeout", timeout) } } + log.Info("Flags", "timeout", timeout, "epic", epic, "remote", remote) } type server struct{} @@ -226,7 +234,7 @@ func (s server) handlePing(conn snet.PacketConn) error { // reverse path rpath, ok := p.Path.(snet.RawPath) if !ok { - return serrors.New("unecpected path", "type", common.TypeOf(p.Path)) + return serrors.New("unexpected path", "type", common.TypeOf(p.Path)) } replypather := snet.DefaultReplyPather{} replyPath, err := replypather.ReplyPath(rpath) @@ -243,10 +251,9 @@ func (s server) handlePing(conn snet.PacketConn) error { } type client struct { - conn snet.PacketConn - port uint16 - sdConn daemon.Connector - + conn snet.PacketConn + port uint16 + sdConn daemon.Connector errorPaths map[snet.PathFingerprint]struct{} } @@ -278,6 +285,8 @@ func (c *client) run() int { return integration.AttemptRepeatedly("End2End", c.attemptRequest) } +// attemptRequest sends one ping packet and expect a pong. +// Returns true (which means "stop") *if both worked*. func (c *client) attemptRequest(n int) bool { timeoutCtx, cancel := context.WithTimeout(context.Background(), timeout.Duration) defer cancel() @@ -295,17 +304,19 @@ func (c *client) attemptRequest(n int) bool { } span, ctx = tracing.StartSpanFromCtx(ctx, "attempt.ping") defer span.Finish() + withTag := func(err error) error { + tracing.Error(span, err) + return err + } // Send ping if err := c.ping(ctx, n, path); err != nil { - tracing.Error(span, err) - logger.Error("Could not send packet", "err", err) + logger.Error("Could not send packet", "err", withTag(err)) return false } // Receive pong if err := c.pong(ctx); err != nil { - tracing.Error(span, err) - logger.Error("Error receiving pong", "err", err) + logger.Error("Error receiving pong", "err", withTag(err)) if path != nil { c.errorPaths[snet.Fingerprint(path)] = struct{}{} } diff --git a/tools/end2end_integration/BUILD.bazel b/tools/end2end_integration/BUILD.bazel index 31d5ae0f26..df5c957ea5 100644 --- a/tools/end2end_integration/BUILD.bazel +++ b/tools/end2end_integration/BUILD.bazel @@ -19,6 +19,10 @@ go_library( scion_go_binary( name = "end2end_integration", + data = [ + "//tools/end2end", + "//tools/end2endblast", + ], embed = [":go_default_library"], visibility = ["//visibility:public"], ) diff --git a/tools/end2end_integration/main.go b/tools/end2end_integration/main.go index 757d252381..43efd2e093 100644 --- a/tools/end2end_integration/main.go +++ b/tools/end2end_integration/main.go @@ -1,4 +1,5 @@ // Copyright 2018 ETH Zurich, Anapaya Systems +// Copyright 2023 SCION Association // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
@@ -99,6 +100,7 @@ func realMain() int { log.Error("Error during tests", "err", err) return 1 } + return 0 } @@ -110,8 +112,9 @@ func addFlags() { flag.StringVar(&name, "name", "end2end_integration", "The name of the test that is running (default: end2end_integration)") flag.Var(timeout, "timeout", "The timeout for each attempt") - flag.StringVar(&subset, "subset", "all", "Subset of pairs to run (all|core#core|"+ - "noncore#localcore|noncore#core|noncore#noncore)") + flag.StringVar(&subset, "subset", "all", "Subset of pairs to run (all|#[#] "+ + " where =[|core|noncore] dst=[|core|noncore] and "+ + " is [localAS|remoteAS|localISD|remoteISD])") flag.IntVar(¶llelism, "parallelism", 1, "How many end2end tests run in parallel.") flag.StringVar(&features, "features", "", fmt.Sprintf("enable development features (%v)", feature.String(&feature.Default{}, "|"))) @@ -258,9 +261,22 @@ func runTests(in integration.Integration, pairs []integration.IAPair) error { clientResults <- err }(src, dsts) } + + // We started everything that could be started. So the best window for perf mertics + // opens somewhere around now. + metricsBegin := time.Now().Unix() errs = nil + end_reported := false for range groups { err := <-clientResults + if !end_reported { + end_reported = true + // The first client has finished. So the performance metrics have begun losing + // quality. + metricsEnd := time.Now().Unix() + // The test harness looks for this output. + fmt.Printf("metricsBegin: %d metricsEnd: %d\n", metricsBegin, metricsEnd) + } if err != nil { errs = append(errs, err) } @@ -295,21 +311,33 @@ func clientTemplate(progressSock string) integration.Cmd { } // getPairs returns the pairs to test according to the specified subset. +// The subset can be based on role, as follows: +// role1#role2[#local] selects all src:dst pairs matching such that src has role1 +// and dst has role2 (and NOT the other way around). If local[ISD|AS]/remote[ISD|AS] is specified +// then src and dst must/must-not be in the same ISD/AS +// +// This implies that IFF role1 == role2, then h1:h2 pairs are mirrored with h2:h1 and, unless +// remote[ISD/AS] is specified, h2:h2 and h1:h1. Not all combinations yield something useful... +// caveat emptor. func getPairs() ([]integration.IAPair, error) { pairs := integration.IAPairs(integration.DispAddr) if subset == "all" { return pairs, nil } parts := strings.Split(subset, "#") - if len(parts) != 2 { + switch len(parts) { + case 2: + return filter(parts[0], parts[1], "", pairs, integration.LoadedASList), nil + case 3: + return filter(parts[0], parts[1], parts[2], pairs, integration.LoadedASList), nil + default: return nil, serrors.New("Invalid subset", "subset", subset) } - return filter(parts[0], parts[1], pairs, integration.LoadedASList), nil } // filter returns the list of ASes that are part of the desired subset. func filter( - src, dst string, + src, dst, local string, pairs []integration.IAPair, ases *integration.ASList, ) []integration.IAPair { @@ -325,11 +353,21 @@ func filter( } } } + + // Selection based on role. 
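+	// For example, the two subsets used by acceptance/router_benchmark/test.py:
+	//   "noncore#core#remoteISD" keeps pairs with a non-core source and a core destination
+	//   in a different ISD, while "noncore#noncore#remoteAS" keeps pairs between two
+	//   distinct non-core ASes.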
for _, pair := range pairs { filter := !contains(ases, src != "noncore", pair.Src.IA) filter = filter || !contains(ases, dst != "noncore", pair.Dst.IA) - if dst == "localcore" { + switch local { + case "localISD": filter = filter || pair.Src.IA.ISD() != pair.Dst.IA.ISD() + case "remoteISD": + filter = filter || pair.Src.IA.ISD() == pair.Dst.IA.ISD() + case "localAS": + filter = filter || pair.Src.IA != pair.Dst.IA + case "remoteAS": + filter = filter || pair.Src.IA == pair.Dst.IA + default: } if !filter { res = append(res, pair) diff --git a/tools/end2endblast/BUILD.bazel b/tools/end2endblast/BUILD.bazel new file mode 100644 index 0000000000..e89d3c45cc --- /dev/null +++ b/tools/end2endblast/BUILD.bazel @@ -0,0 +1,30 @@ +load("//tools/lint:go.bzl", "go_library") +load("//:scion.bzl", "scion_go_binary") + +go_library( + name = "go_default_library", + srcs = ["main.go"], + importpath = "github.com/scionproto/scion/tools/end2endblast", + visibility = ["//visibility:private"], + deps = [ + "//pkg/addr:go_default_library", + "//pkg/daemon:go_default_library", + "//pkg/log:go_default_library", + "//pkg/private/common:go_default_library", + "//pkg/private/serrors:go_default_library", + "//pkg/private/util:go_default_library", + "//pkg/snet:go_default_library", + "//pkg/snet/metrics:go_default_library", + "//pkg/snet/path:go_default_library", + "//pkg/sock/reliable:go_default_library", + "//private/topology:go_default_library", + "//tools/integration:go_default_library", + "//tools/integration/integrationlib:go_default_library", + ], +) + +scion_go_binary( + name = "end2endblast", + embed = [":go_default_library"], + visibility = ["//visibility:public"], +) diff --git a/tools/end2endblast/main.go b/tools/end2endblast/main.go new file mode 100644 index 0000000000..74d04ef854 --- /dev/null +++ b/tools/end2endblast/main.go @@ -0,0 +1,459 @@ +// Copyright 2023 SCION Association +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// This is a client/server code for use by end2end tests. This one plays +// a variant of ping-pong where the client to send back-to-back pings to the +// server until the sending fails or some deadline was reached. In this case +// the client isn't waiting for responses. The client checks at the end +// whether at least one response has been received. The server responds rarely. 
+ +package main + +import ( + "context" + "encoding/json" + "errors" + "flag" + "fmt" + "net" + "net/netip" + "os" + "time" + + "github.com/scionproto/scion/pkg/addr" + "github.com/scionproto/scion/pkg/daemon" + "github.com/scionproto/scion/pkg/log" + "github.com/scionproto/scion/pkg/private/common" + "github.com/scionproto/scion/pkg/private/serrors" + "github.com/scionproto/scion/pkg/private/util" + "github.com/scionproto/scion/pkg/snet" + "github.com/scionproto/scion/pkg/snet/metrics" + snetpath "github.com/scionproto/scion/pkg/snet/path" + "github.com/scionproto/scion/pkg/sock/reliable" + "github.com/scionproto/scion/private/topology" + libint "github.com/scionproto/scion/tools/integration" + integration "github.com/scionproto/scion/tools/integration/integrationlib" +) + +const ( + ping = "ping" + pong = "pong" +) + +type Ping struct { + Server addr.IA `json:"server"` + Message string `json:"message"` +} + +type Pong struct { + Client addr.IA `json:"client"` + Server addr.IA `json:"server"` + Message string `json:"message"` +} + +var ( + remote snet.UDPAddr + timeout = &util.DurWrap{Duration: 90 * time.Second} + scionPacketConnMetrics = metrics.NewSCIONPacketConnMetrics() + scmpErrorsCounter = scionPacketConnMetrics.SCMPErrors + epic bool +) + +func main() { + os.Exit(realMain()) +} + +func realMain() int { + defer log.HandlePanic() + defer log.Flush() + addFlags() + err := integration.Setup() + if err != nil { + log.Error("Parsing common flags failed", "err", err) + return 1 + } + validateFlags() + + if integration.Mode == integration.ModeServer { + (&server{}).run() + return 0 + } + c := client{} + return c.run() +} + +func addFlags() { + flag.Var(&remote, "remote", "(Mandatory for clients) address to connect to") + flag.Var(timeout, "timeout", "The timeout for completing the whole test") + flag.BoolVar(&epic, "epic", false, "Enable EPIC") +} + +func validateFlags() { + if integration.Mode == integration.ModeClient { + if remote.Host == nil { + integration.LogFatal("Missing remote address") + } + if remote.Host.Port == 0 { + integration.LogFatal("Invalid remote port", "remote port", remote.Host.Port) + } + if timeout.Duration == 0 { + integration.LogFatal("Invalid timeout provided", "timeout", timeout) + } + } + log.Info("Flags", "timeout", timeout, "epic", epic, "remote", remote) +} + +type server struct { + pongs uint8 // chosen to overflow. +} + +func (s *server) run() { + log.Info("Starting server", "isd_as", integration.Local.IA) + defer log.Info("Finished server", "isd_as", integration.Local.IA) + + sdConn := integration.SDConn() + defer sdConn.Close() + connFactory := &snet.DefaultPacketDispatcherService{ + Dispatcher: reliable.NewDispatcher(""), + SCMPHandler: snet.DefaultSCMPHandler{ + RevocationHandler: daemon.RevHandler{Connector: sdConn}, + SCMPErrors: scmpErrorsCounter, + }, + SCIONPacketConnMetrics: scionPacketConnMetrics, + } + + conn, port, err := connFactory.Register(context.Background(), integration.Local.IA, + integration.Local.Host, addr.SvcNone) + if err != nil { + integration.LogFatal("Error listening", "err", err) + } + defer conn.Close() + if len(os.Getenv(libint.GoIntegrationEnv)) > 0 { + // Needed for integration test ready signal. 
+ fmt.Printf("Port=%d\n", port) + fmt.Printf("%s%s\n\n", libint.ReadySignal, integration.Local.IA) + } + + log.Info("Listening", "local", fmt.Sprintf("%v:%d", integration.Local.Host, port)) + + // Receive ping message + for { + if err := s.handlePing(conn); err != nil { + log.Error("Error handling ping", "err", err) + } + } +} + +func (s *server) handlePing(conn snet.PacketConn) error { + var p snet.Packet + var ov net.UDPAddr + if err := readFrom(conn, &p, &ov); err != nil { + return serrors.WrapStr("reading packet", err) + } + udp, ok := p.Payload.(snet.UDPPayload) + if !ok { + return serrors.New("unexpected payload received", + "source", p.Source, + "destination", p.Destination, + "type", common.TypeOf(p.Payload), + ) + } + var pld Ping + if err := json.Unmarshal(udp.Payload, &pld); err != nil { + return serrors.New("invalid payload contents", + "source", p.Source, + "destination", p.Destination, + "data", string(udp.Payload), + ) + } + + if pld.Message != ping || !pld.Server.Equal(integration.Local.IA) { + return serrors.New("unexpected data in payload", + "source", p.Source, + "destination", p.Destination, + "data", pld, + ) + } + + // In this game, we respond to 1/256 (~0.4%) of the pings. Just enough + // to prove that some pings were received, but not enough to distort + // performance data by mixing in traffic types. + if s.pongs++; s.pongs != 0 { + return nil + } + log.Info(fmt.Sprintf("Ping received from %s, sending pong.", p.Source)) + raw, err := json.Marshal(Pong{ + Client: p.Source.IA, + Server: integration.Local.IA, + Message: pong, + }) + if err != nil { + return serrors.WrapStr("packing pong", err) + } + + p.Destination, p.Source = p.Source, p.Destination + p.Payload = snet.UDPPayload{ + DstPort: udp.SrcPort, + SrcPort: udp.DstPort, + Payload: raw, + } + // reverse path + rpath, ok := p.Path.(snet.RawPath) + if !ok { + return serrors.New("unexpected path", "type", common.TypeOf(p.Path)) + } + replypather := snet.DefaultReplyPather{} + replyPath, err := replypather.ReplyPath(rpath) + if err != nil { + return serrors.WrapStr("creating reply path", err) + } + p.Path = replyPath + // Send pong + if err := conn.WriteTo(&p, &ov); err != nil { + return serrors.WrapStr("sending reply", err) + } + log.Info("Sent pong to", "client", p.Destination) + return nil +} + +type client struct { + conn snet.PacketConn + port uint16 + sdConn daemon.Connector + path snet.Path +} + +func (c *client) run() int { + pair := fmt.Sprintf("%s -> %s", integration.Local.IA, remote.IA) + log.Info("Starting", "pair", pair) + defer log.Info("Finished", "pair", pair) + defer integration.Done(integration.Local.IA, remote.IA) + connFactory := &snet.DefaultPacketDispatcherService{ + Dispatcher: reliable.NewDispatcher(""), + SCMPHandler: snet.DefaultSCMPHandler{ + RevocationHandler: daemon.RevHandler{Connector: integration.SDConn()}, + SCMPErrors: scmpErrorsCounter, + }, + SCIONPacketConnMetrics: scionPacketConnMetrics, + } + + var err error + c.conn, c.port, err = connFactory.Register(context.Background(), integration.Local.IA, + integration.Local.Host, addr.SvcNone) + if err != nil { + integration.LogFatal("Unable to listen", "err", err) + } + log.Info("Send on", "local", + fmt.Sprintf("%v,[%v]:%d", integration.Local.IA, integration.Local.Host.IP, c.port)) + c.sdConn = integration.SDConn() + defer c.sdConn.Close() + + // Drain pongs in the background + pongOut := make(chan int) + go func() { + defer log.HandlePanic() + + // The timeout extends over the entire test. 
When we don't need to drain any more + // we just cancel it. + ctx, cancel := context.WithTimeout(context.Background(), timeout.Duration) + defer cancel() + + // Drain pongs as long as we get them. We assume that failure means + // there are no more pongs. We want ro receive at least one pong. The + // rest doesn't matter. + allFailed := 1 + integration.RepeatUntilFail("End2EndBlast", func(n int) bool { + + if err := c.pong(ctx); err != nil { + // We should receive at least one, but this runs until pings stop + // coming, so there will always be one failure in the end. + return true // Stop. + } + allFailed = 0 + return false // Keep consuming pongs + }) + pongOut <- allFailed + }() + + // Same here, the timeout context lives on for the rest of the test (so we don't keep + // creating and discarding contexts). + ctx, cancel := context.WithTimeout(context.Background(), timeout.Duration) + defer cancel() + + // Get a path, then use it for all the repeats + p, err := c.getRemote(ctx) + if err != nil { + integration.LogFatal("Could not get remote", "err", err) + return 1 + } + c.path = p // struct fields cannot be assigned with := + + // We return a "number of failures". So 0 means everything is fine. + pingResult := integration.RepeatUntilFail("End2EndBlast", func(n int) bool { + // Send ping + if err := c.ping(ctx, n, c.path); err != nil { + logger := log.FromCtx(ctx) + logger.Error("Could not send packet", "err", err) + return true + } + + return false // Don't stop. Do it again! + }) + + // Stop drainPongs, so we're not stuck here for up to 10s. + c.conn.Close() + + pongResult := <-pongOut + if pongResult != 0 { + log.Info("Never got a single pong") + } + return pingResult + pongResult +} + +func (c *client) ping(ctx context.Context, n int, path snet.Path) error { + rawPing, err := json.Marshal(Ping{ + Server: remote.IA, + Message: ping, + }) + if err != nil { + return serrors.WrapStr("packing ping", err) + } + if err := c.conn.SetWriteDeadline(getDeadline(ctx)); err != nil { + return serrors.WrapStr("setting write deadline", err) + } + if remote.NextHop == nil { + remote.NextHop = &net.UDPAddr{ + IP: remote.Host.IP, + Port: topology.EndhostPort, + } + } + + remoteHostIP, ok := netip.AddrFromSlice(remote.Host.IP) + if !ok { + return serrors.New("invalid remote host IP", "ip", remote.Host.IP) + } + localHostIP, ok := netip.AddrFromSlice(integration.Local.Host.IP) + if !ok { + return serrors.New("invalid local host IP", "ip", integration.Local.Host.IP) + } + pkt := &snet.Packet{ + PacketInfo: snet.PacketInfo{ + Destination: snet.SCIONAddress{ + IA: remote.IA, + Host: addr.HostIP(remoteHostIP), + }, + Source: snet.SCIONAddress{ + IA: integration.Local.IA, + Host: addr.HostIP(localHostIP), + }, + Path: remote.Path, + Payload: snet.UDPPayload{ + SrcPort: c.port, + DstPort: uint16(remote.Host.Port), + Payload: rawPing, + }, + }, + } + if err := c.conn.WriteTo(pkt, remote.NextHop); err != nil { + return err + } + return nil +} + +func (c *client) getRemote(ctx context.Context) (snet.Path, error) { + if remote.IA.Equal(integration.Local.IA) { + remote.Path = snetpath.Empty{} + return nil, nil + } + paths, err := c.sdConn.Paths(ctx, remote.IA, integration.Local.IA, + daemon.PathReqFlags{Refresh: false}) + if err != nil { + return nil, serrors.WrapStr("requesting paths", err) + } + // Select first path + if len(paths) == 0 { + return nil, serrors.New("no path found") + } + path := paths[0] + + // Extract forwarding path from the SCION Daemon response. 
+ // If the epic flag is set, try to use the EPIC path type header. + if epic { + scionPath, ok := path.Dataplane().(snetpath.SCION) + if !ok { + return nil, serrors.New("provided path must be of type scion") + } + epicPath, err := snetpath.NewEPICDataplanePath(scionPath, path.Metadata().EpicAuths) + if err != nil { + return nil, err + } + remote.Path = epicPath + } else { + remote.Path = path.Dataplane() + } + remote.NextHop = path.UnderlayNextHop() + return path, nil +} + +func (c *client) pong(ctx context.Context) error { + if err := c.conn.SetReadDeadline(getDeadline(ctx)); err != nil { + return serrors.WrapStr("setting read deadline", err) + } + var p snet.Packet + var ov net.UDPAddr + if err := readFrom(c.conn, &p, &ov); err != nil { + return serrors.WrapStr("reading packet", err) + } + + udp, ok := p.Payload.(snet.UDPPayload) + if !ok { + return serrors.New("unexpected payload received", "type", common.TypeOf(p.Payload)) + } + + var pld Pong + if err := json.Unmarshal(udp.Payload, &pld); err != nil { + return serrors.WrapStr("unpacking pong", err, "data", string(udp.Payload)) + } + + expected := Pong{ + Client: integration.Local.IA, + Server: remote.IA, + Message: pong, + } + if pld.Client != expected.Client || pld.Server != expected.Server || pld.Message != pong { + return serrors.New("unexpected contents received", "data", pld, "expected", expected) + } + return nil +} + +func getDeadline(ctx context.Context) time.Time { + dl, ok := ctx.Deadline() + if !ok { + integration.LogFatal("No deadline in context") + } + return dl +} + +func readFrom(conn snet.PacketConn, pkt *snet.Packet, ov *net.UDPAddr) error { + err := conn.ReadFrom(pkt, ov) + // Attach more context to error + var opErr *snet.OpError + if !(errors.As(err, &opErr) && opErr.RevInfo() != nil) { + return err + } + return serrors.WithCtx(err, + "isd_as", opErr.RevInfo().IA(), + "interface", opErr.RevInfo().IfID, + ) +} diff --git a/tools/integration/integrationlib/common.go b/tools/integration/integrationlib/common.go index 5a50908298..b215a97ae3 100644 --- a/tools/integration/integrationlib/common.go +++ b/tools/integration/integrationlib/common.go @@ -1,5 +1,6 @@ // Copyright 2018 ETH Zurich // Copyright 2019 ETH Zurich, Anapaya Systems +// Copyright 2023 SCION Association // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -138,26 +139,39 @@ func SDConn() daemon.Connector { // AttemptFunc attempts a request repeatedly, receives the attempt number type AttemptFunc func(n int) bool -// AttemptRepeatedly runs attempt until it returns true or more than Attempts were executed. -// Between two attempts at least RetryTimeout time has to pass. +// AttemptRepeatedly runs attempt until it returns true (succeeded => stop) or more than Attempts +// were executed. Between two attempts at least RetryTimeout time has to pass. // Returns 0 on success, 1 on failure. func AttemptRepeatedly(name string, attempt AttemptFunc) int { - attempts := 0 - for { - attempts++ - if attempt(attempts) { - return 0 - } else if attempts < Attempts { + for attempts := 0; attempts < Attempts; attempts++ { + if attempts != 0 { log.Info("Retrying...") time.Sleep(integration.RetryTimeout) - continue } - log.Error(fmt.Sprintf("%s failed. No more attempts...", name)) - break + if attempt(attempts) { + return 0 + } } + log.Error(fmt.Sprintf("%s failed. 
No more attempts...", name))
 	return 1
 }
 
+// RepeatUntilFail runs doit() until it returns true (failed => stop) or more than Attempts
+// were executed. There is neither delay nor logging between attempts.
+// Returns 0 if all Attempts succeeded, 1 on failure.
+// This is very similar to AttemptRepeatedly, but the differences in failure/success behaviour
+// justify a separate function: parameter-based tweaks would be too easy to confuse.
+func RepeatUntilFail(name string, doit AttemptFunc) int {
+	for attempts := 0; attempts < Attempts; attempts++ {
+		if doit(attempts) {
+			log.Error(fmt.Sprintf("%s failed...", name))
+			return 1
+		}
+	}
+	log.Info(fmt.Sprintf("%s completed. No more repeats...", name))
+	return 0
+}
+
 // Done informs the integration test that a test binary has finished.
 func Done(src, dst addr.IA) {
 	if Progress == "" {
diff --git a/tools/md/skipped b/tools/md/skipped
index 5114feec2d..ac3f133849 100644
--- a/tools/md/skipped
+++ b/tools/md/skipped
@@ -1,2 +1,3 @@
 ^\./docker/_build/
 ^\./licenses/data
+^\./rules_openapi/tools/
diff --git a/topology/BUILD.bazel b/topology/BUILD.bazel
index a76c5f0708..2f2aa9ad96 100644
--- a/topology/BUILD.bazel
+++ b/topology/BUILD.bazel
@@ -11,5 +11,6 @@ exports_files(
         "tiny.topo",
         "tiny4.topo",
         "wide.topo",
+        "router_bm.topo",
     ],
 )
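Note on the two retry helpers above: they stop on opposite conditions. AttemptRepeatedly stops when the callback returns true (success, result 0), while RepeatUntilFail stops when the callback returns true (failure, result 1). A minimal sketch of how a caller might drive them, assuming the integrationlib package as changed here; sendPing and drainPong are hypothetical stand-ins for real client I/O, not part of this change:

    package main

    import (
    	"fmt"

    	integration "github.com/scionproto/scion/tools/integration/integrationlib"
    )

    // sendPing and drainPong are hypothetical placeholders for the real send/receive logic.
    func sendPing(n int) error  { return nil }
    func drainPong(n int) error { return nil }

    func main() {
    	// AttemptRepeatedly: return true to report success and stop retrying.
    	// Result is 0 on success, 1 if all Attempts failed.
    	pongFailed := integration.AttemptRepeatedly("pong", func(n int) bool {
    		return drainPong(n) == nil // true => got a pong, stop.
    	})

    	// RepeatUntilFail: return true to report failure and stop blasting.
    	// Result is 0 if all Attempts succeeded, 1 as soon as one fails.
    	blastFailed := integration.RepeatUntilFail("blast", func(n int) bool {
    		return sendPing(n) != nil // true => a send failed, stop.
    	})

    	fmt.Println(pongFailed, blastFailed)
    }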