diff --git a/.circleci/config.yml b/.circleci/config.yml index 1ee68971da..3a97f8f3f0 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -27,7 +27,7 @@ jobs: build: docker: - - image: kormat/scion_base@sha256:4b6bf6e422fc4e77d18ba952066cfa6a3b2624d1825178e1b5fe4f7a37f34500 + - image: kormat/scion_base@sha256:f441df20a67b8968ba4a38a86eb47b80b835d68d3bd94d767af3b18408ece0b4 <<: *job steps: - checkout diff --git a/env/pip3/requirements.txt b/env/pip3/requirements.txt index 781e54bdbf..780d6a3683 100644 --- a/env/pip3/requirements.txt +++ b/env/pip3/requirements.txt @@ -15,3 +15,4 @@ cffi==1.10.0 --hash=sha256:b3b02911eb1f6ada203b0763ba924234629b51586f72a21faacc6 pyparsing==2.2.0 --hash=sha256:fee43f17a9c4087e7ed1605bd6df994c6173c1e977d7ade7b651292fab2bd010 appdirs==1.4.3 --hash=sha256:d8b24664561d0d34ddfaec54636d502d7cea6e29c3eaf68f3df6180863e2166e pycparser==2.17 --hash=sha256:0aac31e917c24cb3357f5a4d5566f2cc91a19ca41862f6c3c22dc60a629673b6 +prometheus-client==0.0.19 --hash=sha256:ce4ddcb89a870ee771ca5427df123029bf5344ea84f535ded4a1787e29a22a3f diff --git a/python/beacon_server/base.py b/python/beacon_server/base.py index 5828c77eb9..448cad61f1 100644 --- a/python/beacon_server/base.py +++ b/python/beacon_server/base.py @@ -26,6 +26,7 @@ # External packages from external.expiring_dict import ExpiringDict +from prometheus_client import Counter # SCION from beacon_server.if_state import InterfaceState @@ -84,6 +85,15 @@ from scion_elem.scion_elem import SCIONElement +# Exported metrics. +BEACONS_PROPAGATED = Counter("bs_beacons_propagated_total", "# of propagated beacons", + ["server_id", "isd_as", "type"]) +SEGMENTS_REGISTERED = Counter("bs_segments_registered_total", "# of registered segments", + ["server_id", "isd_as", "type"]) +REVOCATIONS_ISSUED = Counter("bs_revocations_issued_total", "# of issued revocations", + ["server_id", "isd_as"]) + + class BeaconServer(SCIONElement, metaclass=ABCMeta): """ The SCION PathConstructionBeacon Server. @@ -104,12 +114,13 @@ class BeaconServer(SCIONElement, metaclass=ABCMeta): # Interval to checked for timed out interfaces. IF_TIMEOUT_INTERVAL = 1 - def __init__(self, server_id, conf_dir): + def __init__(self, server_id, conf_dir, prom_export=None): """ :param str server_id: server identifier. :param str conf_dir: configuration directory. + :param str prom_export: prometheus export address. """ - super().__init__(server_id, conf_dir) + super().__init__(server_id, conf_dir, prom_export=prom_export) # TODO: add 2 policies self.path_policy = PathPolicy.from_file( os.path.join(conf_dir, PATH_POLICY_FILE)) @@ -180,6 +191,7 @@ def propagate_downstream_pcb(self, pcb): :type pcb: PathSegment """ propagated_pcbs = defaultdict(list) + prop_cnt = 0 for intf in self.topology.child_interfaces: if not intf.to_if_id: continue @@ -189,6 +201,9 @@ def propagate_downstream_pcb(self, pcb): continue self.send_meta(new_pcb, meta) propagated_pcbs[(intf.isd_as, intf.if_id)].append(pcb.short_id()) + prop_cnt += 1 + if self._labels: + BEACONS_PROPAGATED.labels(**self._labels, type="down").inc(prop_cnt) return propagated_pcbs def _mk_prop_pcb_meta(self, pcb, dst_ia, egress_if): @@ -233,8 +248,8 @@ def handle_pcbs_propagation(self): def _log_propagations(self, propagated_pcbs): for (isd_as, if_id), pcbs in propagated_pcbs.items(): - logging.debug("Propagated %d PCBs to %s via %s (%s)", len(pcbs), isd_as, - if_id, ", ".join(pcbs)) + logging.debug("Propagated %d PCBs to %s via %s (%s)", len(pcbs), + isd_as, if_id, ", ".join(pcbs)) def _handle_pcbs_from_zk(self, pcbs): """ @@ -315,9 +330,13 @@ def register_segments(self): raise NotImplementedError def _log_registrations(self, registrations, seg_type): + reg_cnt = 0 for (dst_meta, dst_type), pcbs in registrations.items(): + reg_cnt += len(pcbs) logging.debug("Registered %d %s-segments @ %s:%s (%s)", len(pcbs), seg_type, dst_type.upper(), dst_meta, ", ".join(pcbs)) + if self._labels: + SEGMENTS_REGISTERED.labels(**self._labels, type=seg_type).inc(reg_cnt) def _create_asm(self, in_if, out_if, ts, prev_hof): pcbms = list(self._create_pcbms(in_if, out_if, ts, prev_hof)) @@ -568,6 +587,8 @@ def _issue_revocation(self, if_id): return rev_info = self._get_ht_proof(if_id) logging.info("Issuing revocation: %s", rev_info.short_desc()) + if self._labels: + REVOCATIONS_ISSUED.labels(**self._labels).inc() # Issue revocation to all BRs. info = IFStateInfo.from_values(if_id, False, rev_info) pld = IFStatePayload.from_values([info]) diff --git a/python/beacon_server/core.py b/python/beacon_server/core.py index 2037629185..74c95a74c3 100644 --- a/python/beacon_server/core.py +++ b/python/beacon_server/core.py @@ -20,7 +20,7 @@ from collections import defaultdict # SCION -from beacon_server.base import BeaconServer +from beacon_server.base import BeaconServer, BEACONS_PROPAGATED from lib.defines import PATH_SERVICE, SIBRA_SERVICE from lib.errors import SCIONServiceLookupError from lib.packet.opaque_field import InfoOpaqueField @@ -38,12 +38,13 @@ class CoreBeaconServer(BeaconServer): Starts broadcasting beacons down-stream within an ISD and across ISDs towards other core beacon servers. """ - def __init__(self, server_id, conf_dir): + def __init__(self, server_id, conf_dir, prom_export=None): """ :param str server_id: server identifier. :param str conf_dir: configuration directory. + :param str prom_export: prometheus export address. """ - super().__init__(server_id, conf_dir) + super().__init__(server_id, conf_dir, prom_export=prom_export) # Sanity check that we should indeed be a core beacon server. assert self.topology.is_core_as, "This shouldn't be a local BS!" self.core_beacons = defaultdict(self._ps_factory) @@ -61,6 +62,7 @@ def propagate_core_pcb(self, pcb): Propagates the core beacons to other core ASes. """ propagated_pcbs = defaultdict(list) + prop_cnt = 0 for intf in self.topology.core_interfaces: dst_ia = intf.isd_as if not self._filter_pcb(pcb, dst_ia=dst_ia): @@ -71,6 +73,9 @@ def propagate_core_pcb(self, pcb): continue self.send_meta(new_pcb, meta) propagated_pcbs[(intf.isd_as, intf.if_id)].append(pcb.short_id()) + prop_cnt += 1 + if self._labels: + BEACONS_PROPAGATED.labels(**self._labels, type="core").inc(prop_cnt) return propagated_pcbs def handle_pcbs_propagation(self): diff --git a/python/beacon_server/local.py b/python/beacon_server/local.py index 58a1ae374c..cae63164d2 100644 --- a/python/beacon_server/local.py +++ b/python/beacon_server/local.py @@ -37,12 +37,13 @@ class LocalBeaconServer(BeaconServer): servers. """ - def __init__(self, server_id, conf_dir): + def __init__(self, server_id, conf_dir, prom_export=None): """ :param str server_id: server identifier. :param str conf_dir: configuration directory. + :param str prom_export: prometheus export address. """ - super().__init__(server_id, conf_dir) + super().__init__(server_id, conf_dir, prom_export) # Sanity check that we should indeed be a local beacon server. assert not self.topology.is_core_as, "This shouldn't be a core BS!" self.beacons = PathStore(self.path_policy) diff --git a/python/cert_server/main.py b/python/cert_server/main.py index 0a67e3c642..6631d4e70e 100644 --- a/python/cert_server/main.py +++ b/python/cert_server/main.py @@ -21,10 +21,11 @@ import logging import os import threading +import time # External packages from nacl.exceptions import CryptoError -import time +from prometheus_client import Counter # SCION import lib.app.sciond as lib_sciond @@ -76,6 +77,10 @@ from sciond.sciond import SCIOND_API_SOCKDIR from scion_elem.scion_elem import SCIONElement + +# Exported metrics. +REQS_TOTAL = Counter("cs_requests_total", "# of total requests", ["server_id", "isd_as", "type"]) + # Timeout for API path requests API_TOUT = 1 # Max amount of DRKey secret values. 1 current, 1 prefetch, 1 buffer. @@ -99,20 +104,27 @@ class CertServer(SCIONElement): ZK_TRC_CACHE_PATH = "trc_cache" ZK_DRKEY_PATH = "drkey_cache" - def __init__(self, server_id, conf_dir): + def __init__(self, server_id, conf_dir, prom_export=None): """ :param str server_id: server identifier. :param str conf_dir: configuration directory. + :param str prom_export: prometheus export address. """ - super().__init__(server_id, conf_dir) + super().__init__(server_id, conf_dir, prom_export=prom_export) + cc_labels = {**self._labels, "type": "cc"} if self._labels else None + trc_labels = {**self._labels, "type": "trc"} if self._labels else None + drkey_labels = {**self._labels, "type": "drkey"} if self._labels else None self.cc_requests = RequestHandler.start( "CC Requests", self._check_cc, self._fetch_cc, self._reply_cc, + labels=cc_labels, ) self.trc_requests = RequestHandler.start( "TRC Requests", self._check_trc, self._fetch_trc, self._reply_trc, + labels=trc_labels, ) self.drkey_protocol_requests = RequestHandler.start( "DRKey Requests", self._check_drkey, self._fetch_drkey, self._reply_proto_drkey, + labels=drkey_labels, ) self.CTRL_PLD_CLASS_MAP = { @@ -229,6 +241,7 @@ def process_cert_chain_request(self, req, meta): assert isinstance(req, CertChainRequest) key = req.isd_as(), req.p.version logging.info("Cert chain request received for %sv%s from %s", *key, meta) + REQS_TOTAL.labels(**self._labels, type="cc").inc() local = meta.ia == self.addr.isd_as if not self._check_cc(key): if not local: @@ -291,6 +304,7 @@ def process_trc_request(self, req, meta): assert isinstance(req, TRCRequest) key = req.isd_as()[0], req.p.version logging.info("TRC request received for %sv%s from %s", *key, meta) + REQS_TOTAL.labels(**self._labels, type="trc").inc() local = meta.ia == self.addr.isd_as if not self._check_trc(key): if not local: @@ -362,6 +376,7 @@ def process_drkey_request(self, req, meta): """ assert isinstance(req, DRKeyRequest) logging.info("DRKeyRequest received from %s: %s", meta, req.short_desc()) + REQS_TOTAL.labels(**self._labels, type="drkey").inc() try: cert = self._verify_drkey_request(req, meta) except SCIONVerificationError as e: diff --git a/python/dns_server/main.py b/python/dns_server/main.py index 2167289916..cf558cde24 100644 --- a/python/dns_server/main.py +++ b/python/dns_server/main.py @@ -63,13 +63,14 @@ class SCIONDnsServer(SCIONElement): SRV_TYPES = (BEACON_SERVICE, CERTIFICATE_SERVICE, DNS_SERVICE, PATH_SERVICE, SIBRA_SERVICE) - def __init__(self, server_id, conf_dir, setup=False): # pragma: no cover + def __init__(self, server_id, conf_dir, setup=False, prom_export=None): # pragma: no cover """ :param str server_id: server identifier. :param str conf_dir: configuration directory. + :param str prom_export: prometheus export address. :param bool setup: should setup() be called? """ - super().__init__(server_id, conf_dir) + super().__init__(server_id, conf_dir, prom_export=prom_export) self.domain = DNSLabel(self.topology.dns_domain) self.lock = threading.Lock() self.services = {} diff --git a/python/lib/main.py b/python/lib/main.py index ccef9b7036..81039b1440 100644 --- a/python/lib/main.py +++ b/python/lib/main.py @@ -58,6 +58,7 @@ def main_default(type_, local_type=None, trace_=False, **kwargs): """ handle_signals() parser = argparse.ArgumentParser() + parser.add_argument('--prom', type=str, help='Address to export prometheus metrics on') parser.add_argument('server_id', help='Server identifier') parser.add_argument('conf_dir', nargs='?', default='.', help='Configuration directory (Default: ./)') @@ -67,14 +68,14 @@ def main_default(type_, local_type=None, trace_=False, **kwargs): init_logging(os.path.join(args.log_dir, args.server_id)) if local_type is None: - inst = type_(args.server_id, args.conf_dir, **kwargs) + inst = type_(args.server_id, args.conf_dir, prom_export=args.prom, **kwargs) else: # Load the topology to check if this is a core AD or not topo = Topology.from_file(os.path.join(args.conf_dir, TOPO_FILE)) if topo.is_core_as: - inst = type_(args.server_id, args.conf_dir, **kwargs) + inst = type_(args.server_id, args.conf_dir, prom_export=args.prom, **kwargs) else: - inst = local_type(args.server_id, args.conf_dir, **kwargs) + inst = local_type(args.server_id, args.conf_dir, prom_export=args.prom, **kwargs) if trace_: trace(inst.id) logging.info("Started %s", args.server_id) diff --git a/python/lib/packet/packet_base.py b/python/lib/packet/packet_base.py index 9d34e04667..6e0dc04e6d 100644 --- a/python/lib/packet/packet_base.py +++ b/python/lib/packet/packet_base.py @@ -114,7 +114,7 @@ def __bool__(self): return True def __len__(self): - raise NotImplementedError + return self.p.total_size.word_count * 8 def copy(self): return type(self)(self.p.copy()) diff --git a/python/lib/packet/path_mgmt/seg_recs.py b/python/lib/packet/path_mgmt/seg_recs.py index 9c88559f28..37dd42ee9f 100644 --- a/python/lib/packet/path_mgmt/seg_recs.py +++ b/python/lib/packet/path_mgmt/seg_recs.py @@ -66,6 +66,10 @@ def iter_rev_infos(self, start=0): for i in range(start, len(self.p.revInfos)): yield self.rev_info(i) + def num_segs(self): + """Returns the total number of path segments.""" + return len(self.p.recs) + def __str__(self): s = [] s.append("%s:" % self.NAME) diff --git a/python/lib/path_db.py b/python/lib/path_db.py index f66b0990eb..57699bd9c5 100644 --- a/python/lib/path_db.py +++ b/python/lib/path_db.py @@ -20,6 +20,7 @@ import threading # External packages +from prometheus_client import Counter, Gauge from pydblite.pydblite import Base # SCION @@ -27,6 +28,16 @@ from lib.util import SCIONTime +# Exported metrics +SEGS_TOTAL = Gauge("pathdb_segs_total", "# of path segments", ["server_id", "isd_as", "type"]) +SEGS_BYTES = Gauge("pathdb_segs_bytes", "Path segments memory usage", + ["server_id", "isd_as", "type"]) +SEGS_ADDED = Counter("pathdb_segs_added_total", "Total path segments added", + ["server_id", "isd_as", "type"]) +SEGS_REMOVED = Counter("pathdb_segs_removed_total", "Total path segments removed", + ["server_id", "isd_as", "type"]) + + class DBResult(object): """Enum type for the different result of an insertion""" NONE = 0 @@ -66,17 +77,23 @@ def __hash__(self): # pragma: no cover class PathSegmentDB(object): """Simple database for paths using PyDBLite""" - def __init__(self, segment_ttl=None, max_res_no=None): # pragma: no cover + def __init__(self, segment_ttl=None, max_res_no=None, labels=None): # pragma: no cover """ :param int segment_ttl: The TTL for each record in the database (in s) or None to just use the segment's expiration time. :param int max_res_no: Number of results returned for a query. + :param dict labels: + Labels added to the exported metrics. The following labels are supported: + - server_id: A unique identifier of the server that is exporting + - isd_as: The ISD_AS of where the server is running + - type: A generic label for the type of the revocations. """ self._db = None self._lock = threading.Lock() self._segment_ttl = segment_ttl self._max_res_no = max_res_no + self._labels = labels self._setup_db() def _setup_db(self): # pragma: no cover @@ -103,6 +120,10 @@ def __contains__(self, seg_id): # pragma: no cover def flush(self): # pragma: no cover """Removes all records from the database.""" + if self._labels: + SEGS_REMOVED.labels(**self._labels).inc(len(self)) + SEGS_TOTAL.labels(**self._labels).set(0) + SEGS_BYTES.labels(**self._labels).set(0) self._setup_db() def update(self, pcb, reverse=False): @@ -128,15 +149,23 @@ def update(self, pcb, reverse=False): last_ia[0], last_ia[1], pcb.is_sibra()) logging.debug("Added segment from %s to %s: %s", first_ia, last_ia, pcb.short_desc()) + if self._labels: + SEGS_ADDED.labels(**self._labels).inc() + SEGS_TOTAL.labels(**self._labels).inc() + SEGS_BYTES.labels(**self._labels).inc(len(pcb)) return DBResult.ENTRY_ADDED cur_rec = recs[0]['record'] if pcb.get_expiration_time() < cur_rec.pcb.get_expiration_time(): return DBResult.NONE + old_pcb = cur_rec.pcb cur_rec.pcb = pcb if self._segment_ttl: cur_rec.exp_time = now + self._segment_ttl else: cur_rec.exp_time = pcb.get_expiration_time() + if self._labels: + SEGS_ADDED.labels(**self._labels).inc() + SEGS_BYTES.labels(**self._labels).inc(len(pcb) - len(old_pcb)) return DBResult.ENTRY_UPDATED def delete(self, segment_id): @@ -146,6 +175,11 @@ def delete(self, segment_id): if not recs: return DBResult.NONE self._db.delete(recs) + assert len(recs) == 1 + if self._labels: + SEGS_REMOVED.labels(**self._labels).inc() + SEGS_TOTAL.labels(**self._labels).dec() + SEGS_BYTES.labels(**self._labels).dec(len(recs[0]['record'].pcb)) return DBResult.ENTRY_DELETED def delete_all(self, segment_ids): diff --git a/python/lib/requests.py b/python/lib/requests.py index 811638b599..fe4d50c26a 100644 --- a/python/lib/requests.py +++ b/python/lib/requests.py @@ -21,11 +21,19 @@ import queue from collections import defaultdict +# External +from prometheus_client import Gauge + # SCION from lib.thread import thread_safety_net from lib.util import SCIONTime +# Exported metrics. +REQS_PENDING = Gauge("rh_requests_pending", "# of pending requests", + ["server_id", "isd_as", "type"]) + + class RequestHandler(object): """ Small utility class to queue requests, check if they've been fulfilled, and @@ -48,7 +56,7 @@ class RequestHandler(object): MAX_LEN = 16 def __init__(self, queue, check, fetch, reply, ttl=TTL, - key_map=None): # pragma: no cover + key_map=None, labels=None): # pragma: no cover """ :param queue.Queue queue: Used to receive request notifications, see class docstring for @@ -62,6 +70,11 @@ def __init__(self, queue, check, fetch, reply, ttl=TTL, :param function reply: Called with the provided key and request info, should return the result to the requester. + :param dict labels: + Labels added to the exported metrics. The following labels are supported: + - server_id: A unique identifier of the server that is exporting + - isd_as: The ISD_AS of where the server is running + - type: A generic label for the type of the revocations. """ self._queue = queue self._check = check @@ -70,6 +83,7 @@ def __init__(self, queue, check, fetch, reply, ttl=TTL, self._ttl = ttl self._req_map = defaultdict(list) self._key_map = key_map or self._def_key_map + self._labels = labels @classmethod def start(cls, name, *args, **kwargs): # pragma: no cover @@ -101,6 +115,8 @@ def _add_req(self, key, request): if not self._check(key): self._fetch(key, request) self._req_map[key].append((SCIONTime.get_time(), request)) + if self._labels: + REQS_PENDING.labels(**self._labels).inc() def _answer_reqs(self, key): if not self._check(key): @@ -108,9 +124,13 @@ def _answer_reqs(self, key): return self._expire_reqs(key) reqs = self._req_map[key] + count = 0 while reqs: _, req = reqs.pop(0) self._reply(key, req) + count += 1 + if self._labels: + REQS_PENDING.labels(**self._labels).dec(count) del self._req_map[key] def _expire_reqs(self, key): @@ -124,6 +144,8 @@ def _expire_reqs(self, key): self._req_map[key].remove((ts, req)) if count: logging.debug("Expired %d requests for %s", count, key) + if self._labels: + REQS_PENDING.labels(**self._labels).dec(count) @staticmethod def _def_key_map(key, keys): # pragma: no cover diff --git a/python/lib/rev_cache.py b/python/lib/rev_cache.py index 894852c79f..84efb4657b 100644 --- a/python/lib/rev_cache.py +++ b/python/lib/rev_cache.py @@ -19,10 +19,22 @@ import logging import threading +# External +from prometheus_client import Counter, Gauge + # SCION from lib.crypto.hash_tree import ConnectedHashTree +# Exported metrics. +REVS_TOTAL = Gauge("rc_revs_total", "# of cached revocations", ["server_id", "isd_as"]) +REVS_BYTES = Gauge("rc_revs_bytes", "RevCache memory usage", ["server_id", "isd_as"]) +REVS_ADDED = Counter("rc_revs_added_total", "Total revocations added", + ["server_id", "isd_as"]) +REVS_REMOVED = Counter("rc_revs_removed_total", "Total revocations removed", + ["server_id", "isd_as"]) + + def _mk_key(rev_info): """Returns the key for a RevocationInfo object.""" return (rev_info.isd_as(), rev_info.p.ifID) @@ -31,10 +43,18 @@ def _mk_key(rev_info): class RevCache: """Thread-safe cache for revocations with auto expiration of entries.""" - def __init__(self, capacity=1000): # pragma: no cover + def __init__(self, capacity=1000, labels=None): # pragma: no cover + """ + :param dict labels: + Labels added to the exported metrics. The following labels are supported: + - server_id: A unique identifier of the server that is exporting + - isd_as: The ISD_AS of where the server is running + - type: A generic label for the type of the revocations. + """ self._cache = {} self._lock = threading.RLock() self._capacity = capacity + self._labels = labels def __contains__(self, rev_info): # pragma: no cover return self.contains_key(_mk_key(rev_info)) @@ -76,9 +96,17 @@ def add(self, rev_info): logging.error("Revocation cache full!.") return False self._cache[key] = rev_info + if self._labels: + REVS_ADDED.labels(**self._labels).inc() + REVS_TOTAL.labels(**self._labels).inc() + REVS_BYTES.labels(**self._labels).inc(len(rev_info)) return True if rev_info.p.epoch > stored_info.p.epoch: self._cache[key] = rev_info + if self._labels: + REVS_ADDED.labels(**self._labels).inc() + REVS_REMOVED.labels(**self._labels).inc() + REVS_BYTES.labels(**self._labels).inc(len(rev_info) - len(stored_info)) return True return False @@ -86,5 +114,9 @@ def _validate_entry(self, rev_info, cur_epoch=None): # pragma: no cover """Removes an expired revocation from the cache.""" if not ConnectedHashTree.verify_epoch(rev_info.p.epoch, cur_epoch): del self._cache[_mk_key(rev_info)] + if self._labels: + REVS_REMOVED.labels(**self._labels).inc() + REVS_TOTAL.labels(**self._labels).dec() + REVS_BYTES.labels(**self._labels).dec(len(rev_info)) return False return True diff --git a/python/path_server/base.py b/python/path_server/base.py index e5516cc3dc..4a67b53bec 100644 --- a/python/path_server/base.py +++ b/python/path_server/base.py @@ -25,6 +25,7 @@ # External packages from external.expiring_dict import ExpiringDict +from prometheus_client import Counter, Gauge # SCION from lib.crypto.hash_tree import ConnectedHashTree @@ -57,6 +58,13 @@ from scion_elem.scion_elem import SCIONElement +# Exported metrics. +REQS_TOTAL = Counter("ps_reqs_total", "# of path requests", ["server_id", "isd_as"]) +REQS_PENDING = Gauge("ps_req_pending_total", "# of pending path requests", ["server_id", "isd_as"]) +SEGS_TO_ZK = Gauge("ps_segs_to_zk_total", "# of path segments to ZK", ["server_id", "isd_as"]) +REVS_TO_ZK = Gauge("ps_revs_to_zk_total", "# of revocations to ZK", ["server_id", "isd_as"]) + + class PathServer(SCIONElement, metaclass=ABCMeta): """ The SCION Path Server. @@ -76,20 +84,23 @@ class PathServer(SCIONElement, metaclass=ABCMeta): # TTL of segments in the queue for ZK (in seconds) SEGS_TO_ZK_TTL = 10 * 60 - def __init__(self, server_id, conf_dir): + def __init__(self, server_id, conf_dir, prom_export=None): """ :param str server_id: server identifier. :param str conf_dir: configuration directory. + :param str prom_export: prometheus export address. """ - super().__init__(server_id, conf_dir) - self.down_segments = PathSegmentDB(max_res_no=self.MAX_SEG_NO) - self.core_segments = PathSegmentDB(max_res_no=self.MAX_SEG_NO) + super().__init__(server_id, conf_dir, prom_export=prom_export) + down_labels = {**self._labels, "type": "down"} if self._labels else None + core_labels = {**self._labels, "type": "core"} if self._labels else None + self.down_segments = PathSegmentDB(max_res_no=self.MAX_SEG_NO, labels=down_labels) + self.core_segments = PathSegmentDB(max_res_no=self.MAX_SEG_NO, labels=core_labels) self.pending_req = defaultdict(list) # Dict of pending requests. self.pen_req_lock = threading.Lock() self._request_logger = None # Used when l/cPS doesn't have up/dw-path. self.waiting_targets = defaultdict(list) - self.revocations = RevCache() + self.revocations = RevCache(labels=self._labels) # A mapping from (hash tree root of AS, IFID) to segments self.htroot_if2seg = ExpiringDict(1000, HASHTREE_TTL) self.htroot_if2seglock = Lock() @@ -154,6 +165,7 @@ def worker(self): self._update_master() self._propagate_and_sync() self._handle_pending_requests() + self._update_metrics() def _update_master(self): pass @@ -522,6 +534,25 @@ def get_request_logger(self, req, meta): return logging.LoggerAdapter( self._request_logger, {"id": req_id, "req": req.short_desc(), "from": str(meta)}) + def _update_metrics(self): + """ + Updates all Gauge metrics. Subclass can update their own metrics but must + call the superclass' implementation. + """ + if not self._labels: + return + # Update pending requests metric. + # XXX(shitz): This could become a performance problem should there ever be + # a large amount of pending requests (>100'000). + total_pending = 0 + with self.pen_req_lock: + for reqs in self.pending_req.values(): + total_pending += len(reqs) + REQS_PENDING.labels(**self._labels).set(total_pending) + # Update SEGS_TO_ZK and REVS_TO_ZK metrics. + SEGS_TO_ZK.labels(**self._labels).set(len(self._segs_to_zk)) + REVS_TO_ZK.labels(**self._labels).set(len(self._revs_to_zk)) + def run(self): """ Run an instance of the Path Server. diff --git a/python/path_server/core.py b/python/path_server/core.py index fbbf3f4830..566198f2ef 100644 --- a/python/path_server/core.py +++ b/python/path_server/core.py @@ -21,6 +21,9 @@ # External from external.expiring_dict import ExpiringDict +# External packages +from prometheus_client import Gauge + # SCION from lib.defines import PATH_FLAG_CACHEONLY, PATH_FLAG_SIBRA from lib.packet.path_mgmt.seg_recs import PathRecordsReply @@ -28,7 +31,14 @@ from lib.packet.svc import SVCType from lib.types import PathMgmtType as PMT, PathSegmentType as PST from lib.zk.errors import ZkNoConnection -from path_server.base import PathServer +from path_server.base import PathServer, REQS_TOTAL + + +# Exported metrics. +SEGS_TO_MASTER = Gauge("ps_segs_to_master_total", "# of path segments to master", + ["server_id", "isd_as"]) +SEGS_TO_PROP = Gauge("ps_segs_to_prop_total", "# of segments to propagate", + ["server_id", "isd_as"]) class CorePathServer(PathServer): @@ -37,12 +47,13 @@ class CorePathServer(PathServer): core segments and forwards inter-ISD path requests to the corresponding path server. """ - def __init__(self, server_id, conf_dir): + def __init__(self, server_id, conf_dir, prom_export=None): """ :param str server_id: server identifier. :param str conf_dir: configuration directory. + :param str prom_export: prometheus export address. """ - super().__init__(server_id, conf_dir) + super().__init__(server_id, conf_dir, prom_export=prom_export) # Sanity check that we should indeed be a core path server. assert self.topology.is_core_as, "This shouldn't be a local PS!" self._master_id = None # Address of master core Path Server. @@ -165,7 +176,8 @@ def _prop_to_core(self): logging.debug("Propagating %d segment(s) to other core ASes", len(self._segs_to_prop)) for pcbs in self._gen_prop_recs(self._segs_to_prop): - self._propagate_to_core_ases(PathRecordsReply.from_values(pcbs)) + reply = PathRecordsReply.from_values(pcbs) + self._propagate_to_core_ases(reply) def _prop_to_master(self): assert not self.zk.have_lock() @@ -177,7 +189,8 @@ def _prop_to_master(self): logging.debug("Propagating %d segment(s) to master PS: %s", len(self._segs_to_master), self._master_id) for pcbs in self._gen_prop_recs(self._segs_to_master): - self._send_to_master(PathRecordsReply.from_values(pcbs)) + reply = PathRecordsReply.from_values(pcbs) + self._send_to_master(reply) def _send_to_master(self, pld): """ @@ -247,6 +260,7 @@ def path_resolution(self, req, meta, new_request=True, logger=None): dst_ia = req.dst_ia() if new_request: logger.info("PATH_REQ received") + REQS_TOTAL.labels(**self._labels).inc() if dst_ia == self.addr.isd_as: logger.warning("Dropping request: requested DST is local AS") return False @@ -368,3 +382,9 @@ def _forward_revocation(self, rev_info, meta): logging.debug("Propagating revocation to other cores: %s" % rev_info.short_desc()) self._propagate_to_core_ases(rev_info) + + def _update_metrics(self): + super()._update_metrics() + if self._labels: + SEGS_TO_MASTER.labels(**self._labels).set(len(self._segs_to_master)) + SEGS_TO_PROP.labels(**self._labels).set(len(self._segs_to_prop)) diff --git a/python/path_server/local.py b/python/path_server/local.py index 8547bc932c..06806b5a25 100644 --- a/python/path_server/local.py +++ b/python/path_server/local.py @@ -22,7 +22,7 @@ from lib.packet.svc import SVCType from lib.path_db import PathSegmentDB from lib.types import PathSegmentType as PST -from path_server.base import PathServer +from path_server.base import PathServer, REQS_TOTAL class LocalPathServer(PathServer): @@ -30,16 +30,18 @@ class LocalPathServer(PathServer): SCION Path Server in a non-core AS. Stores up-segments to the core and registers down-segments with the CPS. Can cache segments learned from a CPS. """ - def __init__(self, server_id, conf_dir): + def __init__(self, server_id, conf_dir, prom_export=None): """ :param str server_id: server identifier. :param str conf_dir: configuration directory. + :param str prom_export: prometheus export address. """ - super().__init__(server_id, conf_dir) + super().__init__(server_id, conf_dir, prom_export) # Sanity check that we should indeed be a local path server. assert not self.topology.is_core_as, "This shouldn't be a core PS!" # Database of up-segments to the core. - self.up_segments = PathSegmentDB(max_res_no=self.MAX_SEG_NO) + up_labels = {**self._labels, "type": "up"} if self._labels else None + self.up_segments = PathSegmentDB(max_res_no=self.MAX_SEG_NO, labels=up_labels) def _handle_up_segment_record(self, pcb, from_zk=False): if not from_zk: @@ -70,6 +72,7 @@ def path_resolution(self, req, meta, new_request=True, logger=None): dst_ia = req.dst_ia() if new_request: logger.info("PATH_REQ received") + REQS_TOTAL.labels(**self._labels).inc() if dst_ia == self.addr.isd_as: logger.warning("Dropping request: requested DST is local AS") return False @@ -87,6 +90,7 @@ def path_resolution(self, req, meta, new_request=True, logger=None): if new_request: self._request_paths_from_core(req, logger) self.pending_req[(dst_ia, req.p.flags.sibra)].append((req, meta, logger)) + return False def _resolve_core(self, req, up_segs, core_segs): diff --git a/python/router/main.py b/python/router/main.py index 75c6cd7628..0dd79e0167 100644 --- a/python/router/main.py +++ b/python/router/main.py @@ -101,12 +101,13 @@ class Router(SCIONElement): FWD_REVOCATION_TIMEOUT = 5 IFSTATE_REQ_INTERVAL = 30 - def __init__(self, server_id, conf_dir, ): + def __init__(self, server_id, conf_dir, prom_export=None): """ :param str server_id: server identifier. :param str conf_dir: configuration directory. + :param str prom_export: prometheus export address. """ - super().__init__(server_id, conf_dir, ) + super().__init__(server_id, conf_dir, prom_export=prom_export) self.interface = None for border_router in self.topology.get_all_border_routers(): if border_router.name == self.id: diff --git a/python/scion_elem/scion_elem.py b/python/scion_elem/scion_elem.py index 60b4269874..cad8e0cfb3 100644 --- a/python/scion_elem/scion_elem.py +++ b/python/scion_elem/scion_elem.py @@ -24,6 +24,9 @@ import time from collections import defaultdict +# External packages +from prometheus_client import Counter, Gauge, start_http_server + # SCION from lib.config import Config from lib.crypto.certificate_chain import verify_sig_chain_trc @@ -101,6 +104,15 @@ from lib.util import hex_str, sleep_interval +# Exported metrics. +PKT_BUF_TOTAL = Gauge("se_pkt_buf_total", "Total packets in input buffer", + ["server_id", "isd_as"]) +PKT_BUF_BYTES = Gauge("se_pkt_buf_bytes", "Memory usage of input buffer", + ["server_id", "isd_as"]) +PKTS_DROPPED_TOTAL = Counter("se_packets_dropped_total", "Total packets dropped", + ["server_id", "isd_as"]) + + MAX_QUEUE = 50 @@ -121,7 +133,7 @@ class SCIONElement(object): # Timeout for TRC or Certificate requests. TRC_CC_REQ_TIMEOUT = 3 - def __init__(self, server_id, conf_dir, public=None, bind=None): + def __init__(self, server_id, conf_dir, public=None, bind=None, prom_export=None): """ :param str server_id: server identifier. :param str conf_dir: configuration directory. @@ -132,6 +144,9 @@ def __init__(self, server_id, conf_dir, public=None, bind=None): (host_addr, port) of the element's bind address, if any (i.e. the address the element uses to identify itself to the local operating system, if it differs from the public address due to NAT). + :param str prom_export: + String of the form 'addr:port' specifying the prometheus endpoint. + If no string is provided, no metrics are exported. """ self.id = server_id self.conf_dir = conf_dir @@ -178,6 +193,9 @@ def __init__(self, server_id, conf_dir, public=None, bind=None): host_addr, self._port = self.public[0] self.addr = SCIONAddr.from_values(self.topology.isd_as, host_addr) self._setup_sockets(True) + self._labels = None + if prom_export: + self._export_metrics(prom_export) def _setup_sockets(self, init): """ @@ -861,15 +879,23 @@ def _in_buf_put(self, item): while True: try: self._in_buf.put(item, block=False) + if self._labels: + PKT_BUF_BYTES.labels(**self._labels).inc(len(item[0])) except queue.Full: - self._in_buf.get_nowait() + msg, _ = self._in_buf.get_nowait() dropped += 1 + if self._labels: + PKTS_DROPPED_TOTAL.labels(**self._labels).inc() + PKT_BUF_BYTES.labels(**self._labels).dec(len(msg)) else: break + finally: + if self._labels: + PKT_BUF_TOTAL.labels(**self._labels).set(self._in_buf.qsize()) if dropped > 0: self.total_dropped += dropped - logging.debug("%d packet(s) dropped (%d total dropped so far)", - dropped, self.total_dropped) + logging.warning("%d packet(s) dropped (%d total dropped so far)", + dropped, self.total_dropped) def _get_msg_meta(self, packet, addr, sock): pkt = self._parse_packet(packet) @@ -950,7 +976,11 @@ def _packet_process(self): """ while self.run_flag.is_set(): try: - self.handle_msg_meta(*self._in_buf.get(timeout=1.0)) + msg, meta = self._in_buf.get(timeout=1.0) + if self._labels: + PKT_BUF_BYTES.labels(**self._labels).dec(len(msg)) + PKT_BUF_TOTAL.labels(**self._labels).set(self._in_buf.qsize()) + self.handle_msg_meta(msg, meta) except queue.Empty: continue @@ -1109,3 +1139,17 @@ def _build_meta(self, ia=None, host=None, path=None, port=0, reuse=False, flags=TCPFlags.ONEHOPPATH) return UDPMetadata.from_values(ia, host, path, port=port, reuse=reuse, ext_hdrs=[OneHopPathExt()]) + + def _export_metrics(self, export_addr): + """ + Starts an HTTP server endpoint for prometheus to scrape. + """ + # Create a dummy counter to export the server_id as a label for correlating + # server_id and other metrics. + self._labels = {"server_id": self.id, "isd_as": str(self.topology.isd_as)} + + addr, port = export_addr.split(":") + port = int(port) + addr = addr.strip("[]") + logging.info("Exporting metrics on %s", export_addr) + start_http_server(port, addr=addr) diff --git a/python/sciond/sciond.py b/python/sciond/sciond.py index da7b21c0af..8a9eddaaa6 100644 --- a/python/sciond/sciond.py +++ b/python/sciond/sciond.py @@ -87,15 +87,17 @@ class SCIONDaemon(SCIONElement): SEGMENT_TTL = 300 def __init__(self, conf_dir, addr, api_addr, run_local_api=False, - port=None): + port=None, prom_export=None): """ Initialize an instance of the class SCIONDaemon. """ - super().__init__("sciond", conf_dir, public=[(addr, port)]) - # TODO replace by pathstore instance - self.up_segments = PathSegmentDB(segment_ttl=self.SEGMENT_TTL) - self.down_segments = PathSegmentDB(segment_ttl=self.SEGMENT_TTL) - self.core_segments = PathSegmentDB(segment_ttl=self.SEGMENT_TTL) + super().__init__("sciond", conf_dir, prom_export=prom_export, public=[(addr, port)]) + up_labels = {**self._labels, "type": "up"} if self._labels else None + down_labels = {**self._labels, "type": "down"} if self._labels else None + core_labels = {**self._labels, "type": "core"} if self._labels else None + self.up_segments = PathSegmentDB(segment_ttl=self.SEGMENT_TTL, labels=up_labels) + self.down_segments = PathSegmentDB(segment_ttl=self.SEGMENT_TTL, labels=down_labels) + self.core_segments = PathSegmentDB(segment_ttl=self.SEGMENT_TTL, labels=core_labels) self.peer_revs = RevCache() # Keep track of requested paths. self.requested_paths = ExpiringDict(self.MAX_REQS, self.PATH_REQ_TOUT) diff --git a/python/sibra_server/main.py b/python/sibra_server/main.py index 383ea6bfb3..ff0a366d20 100644 --- a/python/sibra_server/main.py +++ b/python/sibra_server/main.py @@ -64,12 +64,13 @@ class SibraServerBase(SCIONElement): SERVICE_TYPE = SIBRA_SERVICE PST_TYPE = None - def __init__(self, server_id, conf_dir): + def __init__(self, server_id, conf_dir, prom_export=None): """ :param str server_id: server identifier. :param str conf_dir: configuration directory. + :param str prom_export: prometheus export address. """ - super().__init__(server_id, conf_dir) + super().__init__(server_id, conf_dir, prom_export=prom_export) self.sendq = Queue() self.signing_key = get_sig_key(self.conf_dir) self.segments = PathSegmentDB(max_res_no=1) diff --git a/python/test/lib/main_test.py b/python/test/lib/main_test.py index 78295dac0a..50635cba96 100644 --- a/python/test/lib/main_test.py +++ b/python/test/lib/main_test.py @@ -72,6 +72,7 @@ def test_trace(self, signals, argparse, init_log, trace): args.log_dir = "logging" args.server_id = "srvid" args.conf_dir = "confdir" + args.prom = "prom" # Call main_default(type_, trace_=True, kwarg1="kwarg1") # Tests @@ -80,7 +81,7 @@ def test_trace(self, signals, argparse, init_log, trace): ntools.ok_(parser.add_argument.called) parser.parse_args.assert_called_once_with() init_log.assert_called_once_with("logging/srvid") - type_.assert_called_once_with("srvid", "confdir", kwarg1="kwarg1") + type_.assert_called_once_with("srvid", "confdir", prom_export="prom", kwarg1="kwarg1") trace.assert_called_once_with(inst.id) inst.run.assert_called_once_with() diff --git a/python/test/lib/path_db_test.py b/python/test/lib/path_db_test.py index 2269d1b81d..6a7a2f72e6 100644 --- a/python/test/lib/path_db_test.py +++ b/python/test/lib/path_db_test.py @@ -128,10 +128,10 @@ class TestPathSegmentDBDelete(object): def test_basic(self): pth_seg_db = PathSegmentDB() pth_seg_db._db = create_mock(['delete']) - pth_seg_db._db.return_value = "data1" - ntools.eq_(pth_seg_db.delete("data2"), DBResult.ENTRY_DELETED) - pth_seg_db._db.assert_called_once_with(id="data2") - pth_seg_db._db.delete.assert_called_once_with("data1") + pth_seg_db._db.return_value = ["data1"] + ntools.eq_(pth_seg_db.delete("data1"), DBResult.ENTRY_DELETED) + pth_seg_db._db.assert_called_once_with(id="data1") + pth_seg_db._db.delete.assert_called_once_with(["data1"]) def test_not_present(self): pth_seg_db = PathSegmentDB() diff --git a/python/topology/generator.py b/python/topology/generator.py index efbcd84433..ba180f6dd5 100755 --- a/python/topology/generator.py +++ b/python/topology/generator.py @@ -705,32 +705,55 @@ def _write_ifids(self): class PrometheusGenerator(object): PROM_DIR = "prometheus" - BR_TARGET_FILE = "br.yml" + TARGET_FILES = { + "BorderRouters": "br.yml", + "BeaconService": "bs.yml", + "CertificateService": "cs.yml", + "PathService": "ps.yml", + } + JOB_NAMES = { + "BorderRouters": "BR", + "BeaconService": "BS", + "CertificateService": "CS", + "PathService": "PS", + } def __init__(self, out_dir, topo_dicts): self.out_dir = out_dir self.topo_dicts = topo_dicts def generate(self): - router_dict = {} + config_dict = {} for topo_id, as_topo in self.topo_dicts.items(): - router_list = [] + ele_dict = defaultdict(list) for br_id, br_ele in as_topo["BorderRouters"].items(): - router_list.append(_prom_addr_br(br_ele)) - router_dict[topo_id] = router_list - self._write_config_files(router_dict) - - def _write_config_files(self, router_dict): - list_of_paths = [] - for topo_id, router_list in router_dict.items(): + ele_dict["BorderRouters"].append(_prom_addr_br(br_ele)) + for svc_type in ["BeaconService", "PathService", "CertificateService"]: + for elem_id, elem in as_topo[svc_type].items(): + ele_dict[svc_type].append(_prom_addr_infra(elem)) + config_dict[topo_id] = ele_dict + self._write_config_files(config_dict) + + def _write_config_files(self, config_dict): + targets_paths = defaultdict(list) + for topo_id, ele_dict in config_dict.items(): base = os.path.join(self.out_dir, topo_id.ISD(), topo_id.AS()) - targets_path = os.path.join(base, self.PROM_DIR, self.BR_TARGET_FILE) - list_of_paths.append(targets_path) - self._write_config_file(os.path.join(base, PROM_FILE), [targets_path]) - self._write_target_file(base, router_list) - self._write_config_file(os.path.join(self.out_dir, PROM_FILE), list_of_paths) - - def _write_config_file(self, config_path, file_paths): + as_local_targets_path = {} + for ele_type, target_list in ele_dict.items(): + targets_path = os.path.join(base, self.PROM_DIR, self.TARGET_FILES[ele_type]) + targets_paths[self.JOB_NAMES[ele_type]].append(targets_path) + as_local_targets_path[self.JOB_NAMES[ele_type]] = [targets_path] + self._write_target_file(base, target_list, ele_type) + self._write_config_file(os.path.join(base, PROM_FILE), as_local_targets_path) + self._write_config_file(os.path.join(self.out_dir, PROM_FILE), targets_paths) + + def _write_config_file(self, config_path, job_dict): + scrape_configs = [] + for job_name, file_paths in job_dict.items(): + scrape_configs.append({ + 'job_name': job_name, + 'file_sd_configs': [{'files': file_paths}], + }) config = { 'global': { 'scrape_interval': '5s', @@ -739,16 +762,13 @@ def _write_config_file(self, config_path, file_paths): 'monitor': 'scion-monitor' } }, - 'scrape_configs': [{ - 'job_name': 'border', - 'file_sd_configs': [{'files': file_paths}] - }], + 'scrape_configs': scrape_configs, } write_file(config_path, yaml.dump(config, default_flow_style=False)) - def _write_target_file(self, base_path, router_addrs): - targets_path = os.path.join(base_path, self.PROM_DIR, self.BR_TARGET_FILE) - target_config = [{'targets': router_addrs}] + def _write_target_file(self, base_path, target_addrs, ele_type): + targets_path = os.path.join(base_path, self.PROM_DIR, self.TARGET_FILES[ele_type]) + target_config = [{'targets': target_addrs}] write_file(targets_path, yaml.dump(target_config, default_flow_style=False)) @@ -787,9 +807,9 @@ def _as_conf(self, topo_id, topo): def _std_entries(self, topo, topo_key, cmd, base): entries = [] - for elem in topo.get(topo_key, {}): - conf_dir = os.path.join(base, elem) - entries.append((elem, [cmd, elem, conf_dir])) + for elem_id, elem in topo.get(topo_key, {}).items(): + conf_dir = os.path.join(base, elem_id) + entries.append((elem_id, [cmd, "--prom", _prom_addr_infra(elem), elem_id, conf_dir])) return entries def _br_entries(self, topo, cmd, base): @@ -1253,7 +1273,13 @@ def _topo_json_to_yaml(topo_dicts): def _prom_addr_br(br_ele): """Get the prometheus address for a border router""" int_addr = br_ele['InternalAddrs'][0]['Public'][0] - return "[%s]:%s" % (int_addr['Addr'].ip, int_addr['L4Port']+1) + return "[%s]:%s" % (int_addr['Addr'].ip, int_addr['L4Port'] + 1) + + +def _prom_addr_infra(infra_ele): + """Get the prometheus address for an infrastructure element.""" + int_addr = infra_ele["Public"][0] + return "[%s]:%s" % (int_addr["Addr"].ip, int_addr["L4Port"] + 1) def main():