From 453afe15aa169d41d10f2d914617f1f4e4037a2f Mon Sep 17 00:00:00 2001 From: Noguchi Ko <92015366+noguchiko@users.noreply.github.com> Date: Wed, 8 Jun 2022 17:11:11 +0900 Subject: [PATCH] north/gnmi: add support for Subscribe RPC This change adds a support for the Subscribe RPC to the gNMI server. It uses the goldstone-telemetry model to provide streaming telemetry updates to an external telemetry collector. It allows users to synchronize the states between a device and a controller/collector. --- Makefile | 2 +- src/lib/goldstone/lib/connector/sysrepo.py | 8 + src/north/gnmi/README.md | 3 +- .../gnmi/goldstone/north/gnmi/repo/repo.py | 22 +- .../gnmi/goldstone/north/gnmi/repo/sysrepo.py | 11 +- src/north/gnmi/goldstone/north/gnmi/server.py | 320 ++- src/north/gnmi/tests/lib.py | 105 +- src/north/gnmi/tests/test_gnmi.py | 2209 +++++++++++++++++ 8 files changed, 2637 insertions(+), 43 deletions(-) diff --git a/Makefile b/Makefile index b2f0c14c..593e3a3a 100644 --- a/Makefile +++ b/Makefile @@ -165,6 +165,6 @@ unittest-netlink: unittest-gnmi: $(MAKE) clean-sysrepo cd src/north/gnmi && make proto - scripts/gs-yang.py --install xlate-oc --search-dirs yang sm/openconfig + scripts/gs-yang.py --install xlate-oc system-telemetry --search-dirs yang sm/openconfig cd src/north/gnmi && PYTHONPATH=../../lib python -m unittest -v -f $(TEST_CASE) cd src/north/gnmi && make clean diff --git a/src/lib/goldstone/lib/connector/sysrepo.py b/src/lib/goldstone/lib/connector/sysrepo.py index 7431706a..8d583a48 100644 --- a/src/lib/goldstone/lib/connector/sysrepo.py +++ b/src/lib/goldstone/lib/connector/sysrepo.py @@ -9,6 +9,7 @@ import sysrepo import libyang import logging +import inspect logger = logging.getLogger(__name__) @@ -127,6 +128,13 @@ def send_notification(self, name: str, notification: dict): logger.debug(f"sending notification {name}: {notification}") self.session.notification_send(name, notification) + def subscribe_notification(self, xpath, callback): + model = xpath.split("/")[1].split(":")[0] + asyncio_register = inspect.iscoroutinefunction(callback) + self.session.subscribe_notification( + model, xpath, callback, asyncio_register=asyncio_register + ) + def subscribe_notifications(self, callback): f = lambda xpath, notif_type, value, timestamp, priv: callback( {xpath: value, "eventTime": timestamp} diff --git a/src/north/gnmi/README.md b/src/north/gnmi/README.md index 12188152..e120e876 100644 --- a/src/north/gnmi/README.md +++ b/src/north/gnmi/README.md @@ -11,6 +11,7 @@ The gNMI server supports following gNMI RPCs: - `Capabilities` - `Get` - `Set` +- `Subscribe` The gNMI server supports limited `Set` transaction. It has following limitations: @@ -19,12 +20,10 @@ The gNMI server supports limited `Set` transaction. It has following limitations Currently, the gNMI server does not yet support following features: -- `Subscribe` RPC - `replace` operation for `Set` RPC - `type` specification for `Get` RPC - Wildcards in a `path` field - Value encodings other than JSON -- RPC on TLS - RPC authentication and authorization ## Prerequisites diff --git a/src/north/gnmi/goldstone/north/gnmi/repo/repo.py b/src/north/gnmi/goldstone/north/gnmi/repo/repo.py index 6ba222fc..219e56ee 100644 --- a/src/north/gnmi/goldstone/north/gnmi/repo/repo.py +++ b/src/north/gnmi/goldstone/north/gnmi/repo/repo.py @@ -8,8 +8,8 @@ def __init__(self, item): class ApplyFailedError(Exception): - def __init__(self): - super().__init__("Apply changes to the repository failed") + def __init__(self, msg): + super().__init__("Apply changes to the repository failed: {}".format(msg)) class Repository: @@ -95,3 +95,21 @@ def get_list_keys(self, path): ValueError: 'path' has an invalid value. """ pass + + def subscribe_notification(self, xpath, callback): + """Subscribe a notification. + + Args: + xpath (str): Path to the notification. + callback (func): Callback function to notify. + """ + pass + + def exec_rpc(self, xpath, params): + """Execute an RPC. + + Args: + xpath (str): Path to the RPC. + params (dict): RPC parameters. + """ + pass diff --git a/src/north/gnmi/goldstone/north/gnmi/repo/sysrepo.py b/src/north/gnmi/goldstone/north/gnmi/repo/sysrepo.py index 80df5ffb..f91c25de 100644 --- a/src/north/gnmi/goldstone/north/gnmi/repo/sysrepo.py +++ b/src/north/gnmi/goldstone/north/gnmi/repo/sysrepo.py @@ -138,8 +138,9 @@ def apply(self): self._connector.apply() except ConnectorError as e: # TODO: can split into detailed exceptions? - logger.error("apply failed. %s", e) - raise ApplyFailedError() from e + msg = f"apply failed. {e}" + logger.error(msg) + raise ApplyFailedError(msg) from e def discard(self): self._connector.discard_changes() @@ -161,3 +162,9 @@ def get_list_keys(self, path): for key in node.keys(): keys.append(key.name()) return keys + + def subscribe_notification(self, xpath, callback): + self._connector.operational_session.subscribe_notification(xpath, callback) + + def exec_rpc(self, xpath, params): + self._connector.operational_session.rpc(xpath, params) diff --git a/src/north/gnmi/goldstone/north/gnmi/server.py b/src/north/gnmi/goldstone/north/gnmi/server.py index 663f3529..9114ae9b 100644 --- a/src/north/gnmi/goldstone/north/gnmi/server.py +++ b/src/north/gnmi/goldstone/north/gnmi/server.py @@ -1,23 +1,62 @@ """gNMI server.""" +import re import logging from concurrent import futures import json import time import grpc +import random +import libyang from .proto import gnmi_pb2_grpc, gnmi_pb2 from .repo.repo import NotFoundError, ApplyFailedError logger = logging.getLogger(__name__) + GRPC_STATUS_CODE_OK = grpc.StatusCode.OK.value[0] GRPC_STATUS_CODE_UNKNOWN = grpc.StatusCode.UNKNOWN.value[0] GRPC_STATUS_CODE_INVALID_ARGUMENT = grpc.StatusCode.INVALID_ARGUMENT.value[0] GRPC_STATUS_CODE_NOT_FOUND = grpc.StatusCode.NOT_FOUND.value[0] GRPC_STATUS_CODE_ABORTED = grpc.StatusCode.ABORTED.value[0] GRPC_STATUS_CODE_UNIMPLEMENTED = grpc.StatusCode.UNIMPLEMENTED.value[0] +REGEX_PTN_LIST_KEY = re.compile(r"\[.*.*\]") + + +class InvalidArgumentError(Exception): + pass + + +def _parse_gnmi_path(gnmi_path): + xpath = "" + for elem in gnmi_path.elem: + xpath += f"/{elem.name}" + if elem.key: + for key in sorted(elem.key): + value = elem.key.get(key) + xpath += f"[{key}='{value}']" + return xpath + + +def _build_gnmi_path(xpath): + gnmi_path = gnmi_pb2.Path() + elements = list(libyang.xpath_split(xpath)) + for elem in elements: + prefix = elem[0] + name = elem[1] + if prefix is not None: + name = f"{prefix}:{name}" + keys = {} + for kv_peer in elem[2]: + keys[kv_peer[0]] = kv_peer[1] + if len(keys) > 0: + path_elem = gnmi_pb2.PathElem(name=name, key=keys) + else: + path_elem = gnmi_pb2.PathElem(name=name) + gnmi_path.elem.append(path_elem) + return gnmi_path class Request: @@ -40,23 +79,13 @@ def __init__(self, repo, prefix, gnmi_path): self.repo = repo self.prefix = prefix self.gnmi_path = gnmi_path - self.xpath = self._parse_xpath(prefix) + self._parse_xpath(gnmi_path) + self.xpath = _parse_gnmi_path(prefix) + _parse_gnmi_path(gnmi_path) logger.debug("Requested xpath: %s", self.xpath) self.status = gnmi_pb2.Error( code=GRPC_STATUS_CODE_OK, message=None, ) - def _parse_xpath(self, path): - xpath = "" - for elem in path.elem: - xpath += f"/{elem.name}" - if elem.key: - for key in sorted(elem.key): - value = elem.key.get(key) - xpath += f"[{key}='{value}']" - return xpath - def exec(self): """Execute the request. @@ -276,6 +305,169 @@ def exec(self): return +class SubscribeRequest: + """Request for gNMI Subscribe service. + + Attributes: + repo (Repository): Repository to access the datastore. + rid (int): Request ID. + subscribe (gnmi_pb2.SubscriptionList): gNMI subscribe request body. + """ + + PATH_SR = "/goldstone-telemetry:subscribe-requests/subscribe-request[id='{}']" + PATH_POLL = "/goldstone-telemetry:poll" + + SUBSCRIBE_REQUEST_MODES = { + gnmi_pb2.SubscriptionList.Mode.STREAM: "STREAM", + gnmi_pb2.SubscriptionList.Mode.ONCE: "ONCE", + gnmi_pb2.SubscriptionList.Mode.POLL: "POLL", + } + + SUBSCRIPTION_MODES = { + gnmi_pb2.SubscriptionMode.TARGET_DEFINED: "TARGET_DEFINED", + gnmi_pb2.SubscriptionMode.ON_CHANGE: "ON_CHANGE", + gnmi_pb2.SubscriptionMode.SAMPLE: "SAMPLE", + } + + def __init__(self, repo, rid, subscribe): + self._repo = repo + self._rid = rid + self._config = self._parse_config(subscribe) + self._notifs = [] + + def _parse_subscription_config(self, sid, config): + if not config.HasField("path"): + msg = "path should be specified." + logger.error(msg) + raise InvalidArgumentError(msg) + try: + mode = self.SUBSCRIPTION_MODES[config.mode] + except KeyError as e: + msg = f"mode has an invalid value {config.mode}." + logger.error(msg) + raise InvalidArgumentError(msg) from e + return { + "id": sid, + "path": _parse_gnmi_path(config.path), + "mode": mode, + "sample-interval": config.sample_interval, + "suppress-redundant": config.suppress_redundant, + "heartbeat-interval": config.heartbeat_interval, + } + + def _parse_config(self, config): + try: + mode = self.SUBSCRIBE_REQUEST_MODES[config.mode] + except KeyError as e: + raise InvalidArgumentError( + f"mode has an invalid value {config.mode}." + ) from e + subscriptions = [] + for sid, subscription in enumerate(config.subscription): + subscriptions.append(self._parse_subscription_config(sid, subscription)) + return { + "id": self._rid, + "mode": mode, + "updates-only": config.updates_only, + "subscriptions": subscriptions, + } + + def exec(self): + prefix = self.PATH_SR.format(self._rid) + configs = { + prefix + "/config/id": self._rid, + prefix + "/config/mode": self._config["mode"], + prefix + "/config/updates-only": self._config["updates-only"], + } + for s in self._config["subscriptions"]: + sid = s["id"] + sprefix = prefix + f"/subscriptions/subscription[id='{sid}']" + configs[sprefix + "/config/id"] = sid + configs[sprefix + "/config/path"] = s["path"] + if self._config["mode"] == "STREAM": + configs[sprefix + "/config/mode"] = s["mode"] + if s["sample-interval"] > 0: + configs[sprefix + "/config/sample-interval"] = s["sample-interval"] + configs[sprefix + "/config/suppress-redundant"] = s[ + "suppress-redundant" + ] + if s["heartbeat-interval"] > 0: + configs[sprefix + "/config/heartbeat-interval"] = s[ + "heartbeat-interval" + ] + with self._repo() as repo: + repo.start() + for path, value in configs.items(): + try: + repo.set(path, value) + except ValueError as e: + msg = f"failed to set {path} to the path {value}." + logger.error(msg) + raise InvalidArgumentError(msg) from e + try: + repo.apply() + except ApplyFailedError as e: + msg = f"failed to apply subscription config {self._config}. {e}." + logger.error(msg) + raise InvalidArgumentError(msg) from e + + def clear(self): + with self._repo() as repo: + repo.start() + try: + repo.delete(self.PATH_SR.format(self._rid)) + repo.apply() + except NotFoundError: + logger.info("subscription config %s to delete is not found.", self._rid) + pass + except ApplyFailedError as e: + logger.error("failed to clear subscription config %s. %s", self._rid, e) + repo.discard() + + def push_notif(self, notif): + timestamp = time.time_ns() + sr = None + if notif["type"] == "SYNC_RESPONSE": + sr = gnmi_pb2.SubscribeResponse(sync_response=True) + elif notif["type"] == "UPDATE": + sr = gnmi_pb2.SubscribeResponse( + update=gnmi_pb2.Notification( + timestamp=timestamp, + update=[ + gnmi_pb2.Update( + path=_build_gnmi_path(notif["path"]), + val=gnmi_pb2.TypedValue( + json_val=notif["json-data"].encode() + ), + ), + ], + ) + ) + elif notif["type"] == "DELETE": + sr = gnmi_pb2.SubscribeResponse( + update=gnmi_pb2.Notification( + timestamp=timestamp, + delete=[ + _build_gnmi_path(notif["path"]), + ], + ) + ) + if sr is not None: + self._notifs.insert(0, sr) + + def poll_notifs(self): + with self._repo() as repo: + repo.start() + repo.exec_rpc(self.PATH_POLL, {"id": self._rid}) + + def pull_notifs(self): + while True: + try: + yield self._notifs.pop() + except IndexError: + break + + class gNMIServicer(gnmi_pb2_grpc.gNMIServicer): """gNMIServicer provides an implementation of the methods of the gNMI service. @@ -285,11 +477,18 @@ class gNMIServicer(gnmi_pb2_grpc.gNMIServicer): """ SUPPORTED_ENCODINGS = [gnmi_pb2.Encoding.JSON] + NOTIFICATION_PULL_INTERVAL = 0.01 def __init__(self, repo, supported_models): super().__init__() self.repo = repo self.supported_models = supported_models + self._subscribe_requests = {} + self._subscribe_repo = self.repo() + self._subscribe_repo.start() + self._subscribe_repo.subscribe_notification( + "/goldstone-telemetry:telemetry-notify-event", self._notification_cb + ) def Capabilities(self, request, context): return gnmi_pb2.CapabilityResponse( @@ -471,7 +670,104 @@ def Set(self, request, context): timestamp=timestamp, ) - # TODO: Implement Subscribe(). + def _notification_cb(self, xpath, notif_type, value, timestamp, priv): + rid = value["request-id"] + try: + sr = self._subscribe_requests[rid] + except KeyError: + logger.error( + "Subscribe request %s related to the notification is not found.", rid + ) + return + sr.push_notif(value) + + def _generate_subscribe_request_id(self): + while True: + rid = random.randint(0, 0xFFFFFFFF) + if rid not in self._subscribe_requests.keys(): + return rid + + def _notify_current_states(self, sr): + sync_response = False + while True: + for notification in sr.pull_notifs(): + yield notification + if notification.sync_response: + sync_response = True + if sync_response: + break + time.sleep(self.NOTIFICATION_PULL_INTERVAL) + + def _notify_updated_states(self, sr, context): + while True: + if not context.is_active(): + break + for notification in sr.pull_notifs(): + yield notification + time.sleep(self.NOTIFICATION_PULL_INTERVAL) + + def Subscribe(self, request_iterator, context): + def set_error(code, msg): + logger.error(msg) + context.set_code(code) + context.set_details(msg) + return gnmi_pb2.Error(code=code, message=msg) + + # Create a subscription. + req = next(request_iterator) + mode = req.subscribe.mode + rid = self._generate_subscribe_request_id() + error = None + try: + sr = SubscribeRequest(self.repo, rid, req.subscribe) + self._subscribe_requests[rid] = sr + sr.exec() + except InvalidArgumentError as e: + error = set_error( + GRPC_STATUS_CODE_INVALID_ARGUMENT, + f"request has invalid argument(s). {e}", + ) + except Exception as e: + error = set_error( + GRPC_STATUS_CODE_UNKNOWN, f"an unknown error has occurred. {e}" + ) + if error is not None: + try: + self._subscribe_requests[rid].clear() + del self._subscribe_requests[rid] + except KeyError: + pass + return gnmi_pb2.SubscribeResponse(error=error) + + # Generate notifications. + try: + for notification in self._notify_current_states(sr): + yield notification + if mode == gnmi_pb2.SubscriptionList.Mode.POLL: + for req in request_iterator: + if not req.HasField("poll"): + error = set_error( + GRPC_STATUS_CODE_INVALID_ARGUMENT, + "the request is not a 'poll' request.", + ) + break + sr.poll_notifs() + for notification in self._notify_current_states(sr): + yield notification + elif mode == gnmi_pb2.SubscriptionList.Mode.STREAM: + for notification in self._notify_updated_states(sr, context): + yield notification + except Exception as e: + error = set_error( + GRPC_STATUS_CODE_UNKNOWN, f"an unknown error has occurred. {e}" + ) + finally: + self._subscribe_requests[rid].clear() + del self._subscribe_requests[rid] + if error is None: + return gnmi_pb2.SubscribeResponse() + else: + return gnmi_pb2.SubscribeResponse(error=error) def serve( diff --git a/src/north/gnmi/tests/lib.py b/src/north/gnmi/tests/lib.py index 418dd9e9..f4dd8cd2 100644 --- a/src/north/gnmi/tests/lib.py +++ b/src/north/gnmi/tests/lib.py @@ -1,5 +1,7 @@ """Tests of gNMI server.""" +# pylint: disable=W0212,C0103 + import unittest import logging import os @@ -40,8 +42,8 @@ def delete(self, xpath): raise self.exception -class MockOCPlatformServer(ServerBase): - """MockOCPlatformServer is mock handler server for openconfig-platform models. +class MockServer(ServerBase): + """MockServer is mock handler server for tests. Attributes: oper_data (dict): Data for oper_cb() to return. You can set this to configure mock's behavior. @@ -50,6 +52,8 @@ class MockOCPlatformServer(ServerBase): def __init__(self, conn, module): super().__init__(conn, module) self.oper_data = {} + self.notifs_xpath = "" + self.notifs_data = {} self.handlers = {} async def change_cb(self, event, req_id, changes, priv): @@ -58,49 +62,64 @@ async def change_cb(self, event, req_id, changes, priv): def oper_cb(self, xpath, priv): return self.oper_data + def notify(self, xpath, data): + self.conn.send_notification(xpath, data) -class MockOCInterfacesServer(ServerBase): - """MockOCInterfacesServer is mock handler server for openconfig-interfaces models. + def send_notifs(self): + for data in self.notifs_data: + self.notify(self.notifs_xpath, data) - Attributes: - oper_data (dict): Data for oper_cb() to return. You can set this to configure mock's behavior. - """ + +class MockOCPlatformServer(MockServer): + """MockOCPlatformServer is mock handler server for openconfig-platform.""" def __init__(self, conn, module): super().__init__(conn, module) + # You can customize the behavior of the mock server. self.oper_data = {} self.handlers = {} - async def change_cb(self, event, req_id, changes, priv): - pass - def oper_cb(self, xpath, priv): - return self.oper_data +class MockOCInterfacesServer(MockServer): + """MockOCInterfacesServer is mock handler server for openconfig-interfaces models.""" + def __init__(self, conn, module): + super().__init__(conn, module) + # You can customize the behavior of the mock server. + self.oper_data = {} + self.handlers = {} -class MockOCTerminalDeviceServer(ServerBase): - """MockOCTerminalDeviceServer is mock handler server for openconfig-terminal-device models. - Attributes: - oper_data (dict): Data for oper_cb() to return. You can set this to configure mock's behavior. - """ +class MockOCTerminalDeviceServer(MockServer): + """MockOCTerminalDeviceServer is mock handler server for openconfig-terminal-device .""" def __init__(self, conn, module): super().__init__(conn, module) + # You can customize the behavior of the mock server. self.oper_data = {} self.handlers = {} - async def change_cb(self, event, req_id, changes, priv): - pass - def oper_cb(self, xpath, priv): - return self.oper_data +class MockGSTelemetryServer(MockServer): + """MockGSTelemetryServer is mock handler server for goldstone-telemetry.""" + + def __init__(self, conn, module): + super().__init__(conn, module) + # You can customize the behavior of the mock server. + self.oper_data = {} + self.handlers = {} + self.poll_count = 0 + self.conn.subscribe_rpc_call("/goldstone-telemetry:poll", self.poll_cb) + + def poll_cb(self, xpath, inputs, event, priv): + self.send_notifs() MOCK_SERVERS = { "openconfig-platform": MockOCPlatformServer, "openconfig-interfaces": MockOCInterfacesServer, "openconfig-terminal-device": MockOCTerminalDeviceServer, + "goldstone-telemetry": MockGSTelemetryServer, } @@ -125,8 +144,13 @@ async def evloop(): else: if msg["type"] == "stop": return - elif msg["type"] == "set": + elif msg["type"] == "set-oper-data": servers[msg["server"]].oper_data = msg["data"] + elif msg["type"] == "set-notifs-data": + servers[msg["server"]].notifs_xpath = msg["path"] + servers[msg["server"]].notifs_data = msg["data"] + elif msg["type"] == "send-notif": + servers[msg["server"]].send_notifs() tasks.append(evloop()) tasks = [ @@ -155,11 +179,12 @@ async def asyncSetUp(self): # gNMI server to test. self._real_time = grpc_testing.strict_real_time() self.target_service = gnmi_pb2.DESCRIPTOR.services_by_name["gNMI"] - servicer = gNMIServicer(Sysrepo, load_supported_models()) - descriptors_to_services = {self.target_service: servicer} + self.servicer = gNMIServicer(Sysrepo, load_supported_models()) + descriptors_to_services = {self.target_service: self.servicer} self._real_time_server = grpc_testing.server_from_dictionary( descriptors_to_services, self._real_time ) + self.rpc = None self.conn = Connector() @@ -199,9 +224,34 @@ def set_mock_oper_data(self, server, data): server (str): Target mock server name. A key in MOCK_SERVERS. data (dict): Operational state data that the server returns. """ - self.q.put({"type": "set", "server": server, "data": data}) + self.q.put({"type": "set-oper-data", "server": server, "data": data}) + + def set_mock_notifs_data(self, server, path, data): + """Set notifications data to the mock server. + + Args: + server (str): Target mock server name. A key in MOCK_SERVERS. + path (str): Path of the notifications to send. + data (dict): Data of the notifications to send. + """ + self.q.put( + {"type": "set-notifs-data", "server": server, "path": path, "data": data} + ) + + def send_mock_notifs(self, server): + """Send notifications from the mock server. + + Args: + server (str): Target mock server name. A key in MOCK_SERVERS. + """ + self.q.put({"type": "send-notif", "server": server}) async def asyncTearDown(self): + try: + self.rpc.cancel() + except Exception: + pass + self.servicer._subscribe_repo.stop() self.tasks = [] self.conn.stop() self.q.put({"type": "stop"}) @@ -227,3 +277,10 @@ def gnmi_set(self, request): ) response, trailing_metadata, code, details = rpc.termination() return response, code + + def gnmi_subscribe(self, request): + rpc = self._real_time_server.invoke_stream_stream( + self.target_service.methods_by_name["Subscribe"], (), None + ) + rpc.send_request(request) + return rpc diff --git a/src/north/gnmi/tests/test_gnmi.py b/src/north/gnmi/tests/test_gnmi.py index 0f38e40d..eeac8d7c 100644 --- a/src/north/gnmi/tests/test_gnmi.py +++ b/src/north/gnmi/tests/test_gnmi.py @@ -1,9 +1,12 @@ """Tests of gNMI server.""" +# pylint: disable=W0212,C0103 + import unittest import time import json import grpc +import sysrepo from tests.lib import MockRepository, gNMIServerTestCase from goldstone.north.gnmi.server import ( Request, @@ -3136,5 +3139,2211 @@ def test(): await self.run_gnmi_server_test(test) +class TestSubscribe(gNMIServerTestCase): + """Tests gNMI server Subscribe Service.""" + + MOCK_MODULES = ["goldstone-telemetry"] + NOTIF_SERVER = "goldstone-telemetry" + NOTIF_PATH = "goldstone-telemetry:telemetry-notify-event" + WAIT_CREATION = 0.1 + WAIT_NOTIFICATION = 0.1 + + async def test_subscribe_stream_target_defined(self): + def test(): + # Create a Subscribe RPC session. + path_str = "/openconfig-interfaces:interfaces/interface[name='Interface1/0/1']/state/enabled" + path = gnmi_pb2.Path() + append_path_element(path, "openconfig-interfaces:interfaces") + append_path_element(path, "interface", "name", "Interface1/0/1") + append_path_element(path, "state") + append_path_element(path, "enabled") + s1 = gnmi_pb2.Subscription( + path=path, mode=gnmi_pb2.SubscriptionMode.TARGET_DEFINED + ) + subscriptions = [s1] + request = gnmi_pb2.SubscribeRequest( + subscribe=gnmi_pb2.SubscriptionList( + mode=gnmi_pb2.SubscriptionList.Mode.STREAM, + subscription=subscriptions, + ), + ) + self.rpc = self.gnmi_subscribe(request) + + time.sleep(self.WAIT_CREATION) + + # Verify configuraion. + with sysrepo.SysrepoConnection() as conn: + with conn.start_session() as sess: + sess.switch_datastore("running") + subscribe_requests = sess.get_data( + "/goldstone-telemetry:subscribe-requests/subscribe-request" + ) + srs = list( + subscribe_requests["subscribe-requests"]["subscribe-request"] + ) + self.assertEqual(len(srs), 1) + sr = srs[0] + generated_id = sr["id"] + expected = { + "id": generated_id, + "config": { + "id": generated_id, + "mode": "STREAM", + "updates-only": False, + }, + "subscriptions": { + "subscription": [ + { + "id": 0, + "config": { + "id": 0, + "path": path_str, + "mode": "TARGET_DEFINED", + "suppress-redundant": False, + }, + } + ] + }, + } + self.assertEqual(sr, expected) + + # Send mocked events. + expected_time_min = time.time_ns() + notifs = [ + { + "type": "UPDATE", + "request-id": generated_id, + "subscription-id": 0, + "path": "/openconfig-interfaces:interfaces/interface[name='Interface1/0/1']/state/enabled", + "json-data": "true", + }, + { + "type": "SYNC_RESPONSE", + "request-id": generated_id, + "subscription-id": 0, + }, + ] + self.set_mock_notifs_data(self.NOTIF_SERVER, self.NOTIF_PATH, notifs) + self.send_mock_notifs(self.NOTIF_SERVER) + + time.sleep(self.WAIT_NOTIFICATION) + + # Receive initial updates. + actual = self.rpc.take_response() + expected_time_max = time.time_ns() + self.assertGreater(actual.update.timestamp, expected_time_min) + self.assertLess(actual.update.timestamp, expected_time_max) + self.assertEqual(actual.update.update[0].path, path) + act = json.loads(actual.update.update[0].val.json_val.decode("utf-8")) + expected = True + self.assertEqual(act, expected) + + # Receive the sync-response of the initial updates. + actual = self.rpc.take_response() + self.assertEqual(actual.sync_response, True) + + # Send mocked update events. + expected_time_min = time.time_ns() + notifs = [ + { + "type": "UPDATE", + "request-id": generated_id, + "subscription-id": 0, + "path": "/openconfig-interfaces:interfaces/interface[name='Interface1/0/1']/state/enabled", + "json-data": "false", + }, + ] + self.set_mock_notifs_data(self.NOTIF_SERVER, self.NOTIF_PATH, notifs) + self.send_mock_notifs(self.NOTIF_SERVER) + + time.sleep(self.WAIT_NOTIFICATION) + + # Receive streaming updates. + actual = self.rpc.take_response() + expected_time_max = time.time_ns() + self.assertGreater(actual.update.timestamp, expected_time_min) + self.assertLess(actual.update.timestamp, expected_time_max) + self.assertEqual(actual.update.update[0].path, path) + act = json.loads(actual.update.update[0].val.json_val.decode("utf-8")) + expected = False + self.assertEqual(act, expected) + + # No sync-responses. + + # Close the RPC session. + self.rpc.requests_closed() + # NOTE: rpc.termination does not work for tests. We cannot close the RPC and confirm teardown procedure. + # _, code, _ = self.rpc.termination() + # self.assertEqual(code, grpc.StatusCode.OK) + # + # Was the subscribe-request deleted? + # self.assertEqual(len(self.servicer._subscribe_requests), 0) + # with sysrepo.SysrepoConnection() as conn: + # with conn.start_session() as sess: + # sess.switch_datastore("running") + # with self.assertRaises(sysrepo.SysrepoNotFoundError): + # sess.get_data( + # f"/goldstone-telemetry:subscribe-requests/subscribe-request[id='{generated_id}']" + # ) + + await self.run_gnmi_server_test(test) + + async def test_subscribe_stream_on_change(self): + def test(): + # Create a Subscribe RPC session. + path_str = "/openconfig-interfaces:interfaces/interface[name='Interface1/0/1']/state/enabled" + path = gnmi_pb2.Path() + append_path_element(path, "openconfig-interfaces:interfaces") + append_path_element(path, "interface", "name", "Interface1/0/1") + append_path_element(path, "state") + append_path_element(path, "enabled") + s1 = gnmi_pb2.Subscription( + path=path, mode=gnmi_pb2.SubscriptionMode.ON_CHANGE + ) + subscriptions = [s1] + request = gnmi_pb2.SubscribeRequest( + subscribe=gnmi_pb2.SubscriptionList( + mode=gnmi_pb2.SubscriptionList.Mode.STREAM, + subscription=subscriptions, + ), + ) + self.rpc = self.gnmi_subscribe(request) + + time.sleep(self.WAIT_CREATION) + + # Verify configuraion. + with sysrepo.SysrepoConnection() as conn: + with conn.start_session() as sess: + sess.switch_datastore("running") + subscribe_requests = sess.get_data( + "/goldstone-telemetry:subscribe-requests/subscribe-request" + ) + srs = list( + subscribe_requests["subscribe-requests"]["subscribe-request"] + ) + self.assertEqual(len(srs), 1) + sr = srs[0] + generated_id = sr["id"] + expected = { + "id": generated_id, + "config": { + "id": generated_id, + "mode": "STREAM", + "updates-only": False, + }, + "subscriptions": { + "subscription": [ + { + "id": 0, + "config": { + "id": 0, + "path": path_str, + "mode": "ON_CHANGE", + "suppress-redundant": False, + }, + } + ] + }, + } + self.assertEqual(sr, expected) + + # Send mocked events. + expected_time_min = time.time_ns() + notifs = [ + { + "type": "UPDATE", + "request-id": generated_id, + "subscription-id": 0, + "path": "/openconfig-interfaces:interfaces/interface[name='Interface1/0/1']/state/enabled", + "json-data": "true", + }, + { + "type": "SYNC_RESPONSE", + "request-id": generated_id, + "subscription-id": 0, + }, + ] + self.set_mock_notifs_data(self.NOTIF_SERVER, self.NOTIF_PATH, notifs) + self.send_mock_notifs(self.NOTIF_SERVER) + + time.sleep(self.WAIT_NOTIFICATION) + + # Receive initial updates. + actual = self.rpc.take_response() + expected_time_max = time.time_ns() + self.assertGreater(actual.update.timestamp, expected_time_min) + self.assertLess(actual.update.timestamp, expected_time_max) + self.assertEqual(actual.update.update[0].path, path) + act = json.loads(actual.update.update[0].val.json_val.decode("utf-8")) + expected = True + self.assertEqual(act, expected) + + # Receive the sync-response of the initial updates. + actual = self.rpc.take_response() + self.assertEqual(actual.sync_response, True) + + # Send mocked update events. + expected_time_min = time.time_ns() + notifs = [ + { + "type": "UPDATE", + "request-id": generated_id, + "subscription-id": 0, + "path": "/openconfig-interfaces:interfaces/interface[name='Interface1/0/1']/state/enabled", + "json-data": "false", + }, + ] + self.set_mock_notifs_data(self.NOTIF_SERVER, self.NOTIF_PATH, notifs) + self.send_mock_notifs(self.NOTIF_SERVER) + + time.sleep(self.WAIT_NOTIFICATION) + + # Receive streaming updates. + actual = self.rpc.take_response() + expected_time_max = time.time_ns() + self.assertGreater(actual.update.timestamp, expected_time_min) + self.assertLess(actual.update.timestamp, expected_time_max) + self.assertEqual(actual.update.update[0].path, path) + act = json.loads(actual.update.update[0].val.json_val.decode("utf-8")) + expected = False + self.assertEqual(act, expected) + + # No sync-responses. + + # Close the RPC session. + self.rpc.requests_closed() + # NOTE: rpc.termination does not work for tests. We cannot close the RPC and confirm teardown procedure. + # _, code, _ = self.rpc.termination() + # self.assertEqual(code, grpc.StatusCode.OK) + # + # Was the subscribe-request deleted? + # self.assertEqual(len(self.servicer._subscribe_requests), 0) + # with sysrepo.SysrepoConnection() as conn: + # with conn.start_session() as sess: + # sess.switch_datastore("running") + # with self.assertRaises(sysrepo.SysrepoNotFoundError): + # sess.get_data( + # f"/goldstone-telemetry:subscribe-requests/subscribe-request[id='{generated_id}']" + # ) + + await self.run_gnmi_server_test(test) + + async def test_subscribe_stream_on_change_heartbeat(self): + def test(): + # Create a Subscribe RPC session. + path_str = "/openconfig-interfaces:interfaces/interface[name='Interface1/0/1']/state/enabled" + path = gnmi_pb2.Path() + append_path_element(path, "openconfig-interfaces:interfaces") + append_path_element(path, "interface", "name", "Interface1/0/1") + append_path_element(path, "state") + append_path_element(path, "enabled") + hb_interval = 10 * 1000 * 1000 * 1000 + s1 = gnmi_pb2.Subscription( + path=path, + mode=gnmi_pb2.SubscriptionMode.ON_CHANGE, + heartbeat_interval=hb_interval, + ) + subscriptions = [s1] + request = gnmi_pb2.SubscribeRequest( + subscribe=gnmi_pb2.SubscriptionList( + mode=gnmi_pb2.SubscriptionList.Mode.STREAM, + subscription=subscriptions, + ), + ) + self.rpc = self.gnmi_subscribe(request) + + time.sleep(self.WAIT_CREATION) + + # Verify configuraion. + with sysrepo.SysrepoConnection() as conn: + with conn.start_session() as sess: + sess.switch_datastore("running") + subscribe_requests = sess.get_data( + "/goldstone-telemetry:subscribe-requests/subscribe-request" + ) + srs = list( + subscribe_requests["subscribe-requests"]["subscribe-request"] + ) + self.assertEqual(len(srs), 1) + sr = srs[0] + generated_id = sr["id"] + expected = { + "id": generated_id, + "config": { + "id": generated_id, + "mode": "STREAM", + "updates-only": False, + }, + "subscriptions": { + "subscription": [ + { + "id": 0, + "config": { + "id": 0, + "path": path_str, + "mode": "ON_CHANGE", + "suppress-redundant": False, + "heartbeat-interval": hb_interval, + }, + } + ] + }, + } + self.assertEqual(sr, expected) + + # Send mocked events. + expected_time_min = time.time_ns() + notifs = [ + { + "type": "UPDATE", + "request-id": generated_id, + "subscription-id": 0, + "path": "/openconfig-interfaces:interfaces/interface[name='Interface1/0/1']/state/enabled", + "json-data": "true", + }, + { + "type": "SYNC_RESPONSE", + "request-id": generated_id, + "subscription-id": 0, + }, + ] + self.set_mock_notifs_data(self.NOTIF_SERVER, self.NOTIF_PATH, notifs) + self.send_mock_notifs(self.NOTIF_SERVER) + + time.sleep(self.WAIT_NOTIFICATION) + + # Receive initial updates. + actual = self.rpc.take_response() + expected_time_max = time.time_ns() + self.assertGreater(actual.update.timestamp, expected_time_min) + self.assertLess(actual.update.timestamp, expected_time_max) + self.assertEqual(actual.update.update[0].path, path) + act = json.loads(actual.update.update[0].val.json_val.decode("utf-8")) + expected = True + self.assertEqual(act, expected) + + # Receive the sync-response of the initial updates. + actual = self.rpc.take_response() + self.assertEqual(actual.sync_response, True) + + # Send mocked heartbeat expired update events. + expected_time_min = time.time_ns() + notifs = [ + { + "type": "UPDATE", + "request-id": generated_id, + "subscription-id": 0, + "path": "/openconfig-interfaces:interfaces/interface[name='Interface1/0/1']/state/enabled", + "json-data": "true", + }, + ] + self.set_mock_notifs_data(self.NOTIF_SERVER, self.NOTIF_PATH, notifs) + self.send_mock_notifs(self.NOTIF_SERVER) + + time.sleep(self.WAIT_NOTIFICATION) + + # Receive streaming updates. + actual = self.rpc.take_response() + expected_time_max = time.time_ns() + self.assertGreater(actual.update.timestamp, expected_time_min) + self.assertLess(actual.update.timestamp, expected_time_max) + self.assertEqual(actual.update.update[0].path, path) + act = json.loads(actual.update.update[0].val.json_val.decode("utf-8")) + expected = True + self.assertEqual(act, expected) + + # No sync-responses. + + # Close the RPC session. + self.rpc.requests_closed() + # NOTE: rpc.termination does not work for tests. We cannot close the RPC and confirm teardown procedure. + # _, code, _ = self.rpc.termination() + # self.assertEqual(code, grpc.StatusCode.OK) + # + # Was the subscribe-request deleted? + # self.assertEqual(len(self.servicer._subscribe_requests), 0) + # with sysrepo.SysrepoConnection() as conn: + # with conn.start_session() as sess: + # sess.switch_datastore("running") + # with self.assertRaises(sysrepo.SysrepoNotFoundError): + # sess.get_data( + # f"/goldstone-telemetry:subscribe-requests/subscribe-request[id='{generated_id}']" + # ) + + await self.run_gnmi_server_test(test) + + async def test_subscribe_stream_sample(self): + def test(): + # Create a Subscribe RPC session. + path_str = "/openconfig-interfaces:interfaces/interface[name='Interface1/0/1']/state/enabled" + path = gnmi_pb2.Path() + append_path_element(path, "openconfig-interfaces:interfaces") + append_path_element(path, "interface", "name", "Interface1/0/1") + append_path_element(path, "state") + append_path_element(path, "enabled") + s_interval = 5 * 1000 * 1000 * 1000 + s1 = gnmi_pb2.Subscription( + path=path, + mode=gnmi_pb2.SubscriptionMode.SAMPLE, + sample_interval=s_interval, + ) + subscriptions = [s1] + request = gnmi_pb2.SubscribeRequest( + subscribe=gnmi_pb2.SubscriptionList( + mode=gnmi_pb2.SubscriptionList.Mode.STREAM, + subscription=subscriptions, + ), + ) + self.rpc = self.gnmi_subscribe(request) + + time.sleep(self.WAIT_CREATION) + + # Verify configuraion. + with sysrepo.SysrepoConnection() as conn: + with conn.start_session() as sess: + sess.switch_datastore("running") + subscribe_requests = sess.get_data( + "/goldstone-telemetry:subscribe-requests/subscribe-request" + ) + srs = list( + subscribe_requests["subscribe-requests"]["subscribe-request"] + ) + self.assertEqual(len(srs), 1) + sr = srs[0] + generated_id = sr["id"] + expected = { + "id": generated_id, + "config": { + "id": generated_id, + "mode": "STREAM", + "updates-only": False, + }, + "subscriptions": { + "subscription": [ + { + "id": 0, + "config": { + "id": 0, + "path": path_str, + "mode": "SAMPLE", + "sample-interval": s_interval, + "suppress-redundant": False, + }, + } + ] + }, + } + self.assertEqual(sr, expected) + + # Send mocked events. + expected_time_min = time.time_ns() + notifs = [ + { + "type": "UPDATE", + "request-id": generated_id, + "subscription-id": 0, + "path": "/openconfig-interfaces:interfaces/interface[name='Interface1/0/1']/state/enabled", + "json-data": "true", + }, + { + "type": "SYNC_RESPONSE", + "request-id": generated_id, + "subscription-id": 0, + }, + ] + self.set_mock_notifs_data(self.NOTIF_SERVER, self.NOTIF_PATH, notifs) + self.send_mock_notifs(self.NOTIF_SERVER) + + time.sleep(self.WAIT_NOTIFICATION) + + # Receive initial updates. + actual = self.rpc.take_response() + expected_time_max = time.time_ns() + self.assertGreater(actual.update.timestamp, expected_time_min) + self.assertLess(actual.update.timestamp, expected_time_max) + self.assertEqual(actual.update.update[0].path, path) + act = json.loads(actual.update.update[0].val.json_val.decode("utf-8")) + expected = True + self.assertEqual(act, expected) + + # Receive the sync-response of the initial updates. + actual = self.rpc.take_response() + self.assertEqual(actual.sync_response, True) + + # Send mocked sampling update events. + expected_time_min = time.time_ns() + notifs = [ + { + "type": "UPDATE", + "request-id": generated_id, + "subscription-id": 0, + "path": "/openconfig-interfaces:interfaces/interface[name='Interface1/0/1']/state/enabled", + "json-data": "true", + }, + ] + self.set_mock_notifs_data(self.NOTIF_SERVER, self.NOTIF_PATH, notifs) + self.send_mock_notifs(self.NOTIF_SERVER) + + time.sleep(self.WAIT_NOTIFICATION) + + # Receive streaming updates. + actual = self.rpc.take_response() + expected_time_max = time.time_ns() + self.assertGreater(actual.update.timestamp, expected_time_min) + self.assertLess(actual.update.timestamp, expected_time_max) + self.assertEqual(actual.update.update[0].path, path) + act = json.loads(actual.update.update[0].val.json_val.decode("utf-8")) + expected = True + self.assertEqual(act, expected) + + # No sync-responses. + + # Close the RPC session. + self.rpc.requests_closed() + # NOTE: rpc.termination does not work for tests. We cannot close the RPC and confirm teardown procedure. + # _, code, _ = self.rpc.termination() + # self.assertEqual(code, grpc.StatusCode.OK) + # + # Was the subscribe-request deleted? + # self.assertEqual(len(self.servicer._subscribe_requests), 0) + # with sysrepo.SysrepoConnection() as conn: + # with conn.start_session() as sess: + # sess.switch_datastore("running") + # with self.assertRaises(sysrepo.SysrepoNotFoundError): + # sess.get_data( + # f"/goldstone-telemetry:subscribe-requests/subscribe-request[id='{generated_id}']" + # ) + + await self.run_gnmi_server_test(test) + + async def test_subscribe_stream_sample_suppress_redundant(self): + def test(): + # Create a Subscribe RPC session. + path_str = "/openconfig-interfaces:interfaces/interface[name='Interface1/0/1']/state/enabled" + path = gnmi_pb2.Path() + append_path_element(path, "openconfig-interfaces:interfaces") + append_path_element(path, "interface", "name", "Interface1/0/1") + append_path_element(path, "state") + append_path_element(path, "enabled") + s_interval = 5 * 1000 * 1000 * 1000 + s1 = gnmi_pb2.Subscription( + path=path, + mode=gnmi_pb2.SubscriptionMode.SAMPLE, + sample_interval=s_interval, + suppress_redundant=True, + ) + subscriptions = [s1] + request = gnmi_pb2.SubscribeRequest( + subscribe=gnmi_pb2.SubscriptionList( + mode=gnmi_pb2.SubscriptionList.Mode.STREAM, + subscription=subscriptions, + ), + ) + self.rpc = self.gnmi_subscribe(request) + + time.sleep(self.WAIT_CREATION) + + # Verify configuraion. + with sysrepo.SysrepoConnection() as conn: + with conn.start_session() as sess: + sess.switch_datastore("running") + subscribe_requests = sess.get_data( + "/goldstone-telemetry:subscribe-requests/subscribe-request" + ) + srs = list( + subscribe_requests["subscribe-requests"]["subscribe-request"] + ) + self.assertEqual(len(srs), 1) + sr = srs[0] + generated_id = sr["id"] + expected = { + "id": generated_id, + "config": { + "id": generated_id, + "mode": "STREAM", + "updates-only": False, + }, + "subscriptions": { + "subscription": [ + { + "id": 0, + "config": { + "id": 0, + "path": path_str, + "mode": "SAMPLE", + "sample-interval": s_interval, + "suppress-redundant": True, + }, + } + ] + }, + } + self.assertEqual(sr, expected) + + # Send mocked events. + expected_time_min = time.time_ns() + notifs = [ + { + "type": "UPDATE", + "request-id": generated_id, + "subscription-id": 0, + "path": "/openconfig-interfaces:interfaces/interface[name='Interface1/0/1']/state/enabled", + "json-data": "true", + }, + { + "type": "SYNC_RESPONSE", + "request-id": generated_id, + "subscription-id": 0, + }, + ] + self.set_mock_notifs_data(self.NOTIF_SERVER, self.NOTIF_PATH, notifs) + self.send_mock_notifs(self.NOTIF_SERVER) + + time.sleep(self.WAIT_NOTIFICATION) + + # Receive initial updates. + actual = self.rpc.take_response() + expected_time_max = time.time_ns() + self.assertGreater(actual.update.timestamp, expected_time_min) + self.assertLess(actual.update.timestamp, expected_time_max) + self.assertEqual(actual.update.update[0].path, path) + act = json.loads(actual.update.update[0].val.json_val.decode("utf-8")) + expected = True + self.assertEqual(act, expected) + + # Receive the sync-response of the initial updates. + actual = self.rpc.take_response() + self.assertEqual(actual.sync_response, True) + + # No sampling update events and streaming updates because of suppress-redundant. + + # Close the RPC session. + self.rpc.requests_closed() + # NOTE: rpc.termination does not work for tests. We cannot close the RPC and confirm teardown procedure. + # _, code, _ = self.rpc.termination() + # self.assertEqual(code, grpc.StatusCode.OK) + # + # Was the subscribe-request deleted? + # self.assertEqual(len(self.servicer._subscribe_requests), 0) + # with sysrepo.SysrepoConnection() as conn: + # with conn.start_session() as sess: + # sess.switch_datastore("running") + # with self.assertRaises(sysrepo.SysrepoNotFoundError): + # sess.get_data( + # f"/goldstone-telemetry:subscribe-requests/subscribe-request[id='{generated_id}']" + # ) + + await self.run_gnmi_server_test(test) + + async def test_subscribe_stream_sample_suppress_redundant_heartbeat(self): + def test(): + # Create a Subscribe RPC session. + path_str = "/openconfig-interfaces:interfaces/interface[name='Interface1/0/1']/state/enabled" + path = gnmi_pb2.Path() + append_path_element(path, "openconfig-interfaces:interfaces") + append_path_element(path, "interface", "name", "Interface1/0/1") + append_path_element(path, "state") + append_path_element(path, "enabled") + s_interval = 5 * 1000 * 1000 * 1000 + hb_interval = 10 * 1000 * 1000 * 1000 + s1 = gnmi_pb2.Subscription( + path=path, + mode=gnmi_pb2.SubscriptionMode.SAMPLE, + sample_interval=s_interval, + suppress_redundant=True, + heartbeat_interval=hb_interval, + ) + subscriptions = [s1] + request = gnmi_pb2.SubscribeRequest( + subscribe=gnmi_pb2.SubscriptionList( + mode=gnmi_pb2.SubscriptionList.Mode.STREAM, + subscription=subscriptions, + ), + ) + self.rpc = self.gnmi_subscribe(request) + + time.sleep(self.WAIT_CREATION) + + # Verify configuraion. + with sysrepo.SysrepoConnection() as conn: + with conn.start_session() as sess: + sess.switch_datastore("running") + subscribe_requests = sess.get_data( + "/goldstone-telemetry:subscribe-requests/subscribe-request" + ) + srs = list( + subscribe_requests["subscribe-requests"]["subscribe-request"] + ) + self.assertEqual(len(srs), 1) + sr = srs[0] + generated_id = sr["id"] + expected = { + "id": generated_id, + "config": { + "id": generated_id, + "mode": "STREAM", + "updates-only": False, + }, + "subscriptions": { + "subscription": [ + { + "id": 0, + "config": { + "id": 0, + "path": path_str, + "mode": "SAMPLE", + "sample-interval": s_interval, + "suppress-redundant": True, + "heartbeat-interval": hb_interval, + }, + } + ] + }, + } + self.assertEqual(sr, expected) + + # Send mocked events. + expected_time_min = time.time_ns() + notifs = [ + { + "type": "UPDATE", + "request-id": generated_id, + "subscription-id": 0, + "path": "/openconfig-interfaces:interfaces/interface[name='Interface1/0/1']/state/enabled", + "json-data": "true", + }, + { + "type": "SYNC_RESPONSE", + "request-id": generated_id, + "subscription-id": 0, + }, + ] + self.set_mock_notifs_data(self.NOTIF_SERVER, self.NOTIF_PATH, notifs) + self.send_mock_notifs(self.NOTIF_SERVER) + + time.sleep(self.WAIT_NOTIFICATION) + + # Receive initial updates. + actual = self.rpc.take_response() + expected_time_max = time.time_ns() + self.assertGreater(actual.update.timestamp, expected_time_min) + self.assertLess(actual.update.timestamp, expected_time_max) + self.assertEqual(actual.update.update[0].path, path) + act = json.loads(actual.update.update[0].val.json_val.decode("utf-8")) + expected = True + self.assertEqual(act, expected) + + # Receive the sync-response of the initial updates. + actual = self.rpc.take_response() + self.assertEqual(actual.sync_response, True) + + # Send mocked heartbeat expired update events. + expected_time_min = time.time_ns() + notifs = [ + { + "type": "UPDATE", + "request-id": generated_id, + "subscription-id": 0, + "path": "/openconfig-interfaces:interfaces/interface[name='Interface1/0/1']/state/enabled", + "json-data": "true", + }, + ] + self.set_mock_notifs_data(self.NOTIF_SERVER, self.NOTIF_PATH, notifs) + self.send_mock_notifs(self.NOTIF_SERVER) + + time.sleep(self.WAIT_NOTIFICATION) + + # Receive streaming updates. + actual = self.rpc.take_response() + expected_time_max = time.time_ns() + self.assertGreater(actual.update.timestamp, expected_time_min) + self.assertLess(actual.update.timestamp, expected_time_max) + self.assertEqual(actual.update.update[0].path, path) + act = json.loads(actual.update.update[0].val.json_val.decode("utf-8")) + expected = True + self.assertEqual(act, expected) + + # No sync-responses. + + # Close the RPC session. + self.rpc.requests_closed() + # NOTE: rpc.termination does not work for tests. We cannot close the RPC and confirm teardown procedure. + # _, code, _ = self.rpc.termination() + # self.assertEqual(code, grpc.StatusCode.OK) + # + # Was the subscribe-request deleted? + # self.assertEqual(len(self.servicer._subscribe_requests), 0) + # with sysrepo.SysrepoConnection() as conn: + # with conn.start_session() as sess: + # sess.switch_datastore("running") + # with self.assertRaises(sysrepo.SysrepoNotFoundError): + # sess.get_data( + # f"/goldstone-telemetry:subscribe-requests/subscribe-request[id='{generated_id}']" + # ) + + await self.run_gnmi_server_test(test) + + async def test_subscribe_stream_updates_only(self): + def test(): + # Create a Subscribe RPC session. + path_str = "/openconfig-interfaces:interfaces/interface[name='Interface1/0/1']/state/enabled" + path = gnmi_pb2.Path() + append_path_element(path, "openconfig-interfaces:interfaces") + append_path_element(path, "interface", "name", "Interface1/0/1") + append_path_element(path, "state") + append_path_element(path, "enabled") + s1 = gnmi_pb2.Subscription( + path=path, mode=gnmi_pb2.SubscriptionMode.TARGET_DEFINED + ) + subscriptions = [s1] + request = gnmi_pb2.SubscribeRequest( + subscribe=gnmi_pb2.SubscriptionList( + mode=gnmi_pb2.SubscriptionList.Mode.STREAM, + updates_only=True, + subscription=subscriptions, + ), + ) + self.rpc = self.gnmi_subscribe(request) + + time.sleep(self.WAIT_CREATION) + + # Verify configuraion. + with sysrepo.SysrepoConnection() as conn: + with conn.start_session() as sess: + sess.switch_datastore("running") + subscribe_requests = sess.get_data( + "/goldstone-telemetry:subscribe-requests/subscribe-request" + ) + srs = list( + subscribe_requests["subscribe-requests"]["subscribe-request"] + ) + self.assertEqual(len(srs), 1) + sr = srs[0] + generated_id = sr["id"] + expected = { + "id": generated_id, + "config": { + "id": generated_id, + "mode": "STREAM", + "updates-only": True, + }, + "subscriptions": { + "subscription": [ + { + "id": 0, + "config": { + "id": 0, + "path": path_str, + "mode": "TARGET_DEFINED", + "suppress-redundant": False, + }, + } + ] + }, + } + self.assertEqual(sr, expected) + + # Send mocked events. + expected_time_min = time.time_ns() + notifs = [ + { + "type": "SYNC_RESPONSE", + "request-id": generated_id, + "subscription-id": 0, + }, + ] + self.set_mock_notifs_data(self.NOTIF_SERVER, self.NOTIF_PATH, notifs) + self.send_mock_notifs(self.NOTIF_SERVER) + + time.sleep(self.WAIT_NOTIFICATION) + + # No initial updates. + + # Receive the sync-response of the initial updates. + actual = self.rpc.take_response() + self.assertEqual(actual.sync_response, True) + + # Send mocked update events. + expected_time_min = time.time_ns() + notifs = [ + { + "type": "UPDATE", + "request-id": generated_id, + "subscription-id": 0, + "path": "/openconfig-interfaces:interfaces/interface[name='Interface1/0/1']/state/enabled", + "json-data": "false", + }, + ] + self.set_mock_notifs_data(self.NOTIF_SERVER, self.NOTIF_PATH, notifs) + self.send_mock_notifs(self.NOTIF_SERVER) + + time.sleep(self.WAIT_NOTIFICATION) + + # Receive streaming updates. + actual = self.rpc.take_response() + expected_time_max = time.time_ns() + self.assertGreater(actual.update.timestamp, expected_time_min) + self.assertLess(actual.update.timestamp, expected_time_max) + self.assertEqual(actual.update.update[0].path, path) + act = json.loads(actual.update.update[0].val.json_val.decode("utf-8")) + expected = False + self.assertEqual(act, expected) + + # No sync-responses. + + # Close the RPC session. + self.rpc.requests_closed() + # NOTE: rpc.termination does not work for tests. We cannot close the RPC and confirm teardown procedure. + # _, code, _ = self.rpc.termination() + # self.assertEqual(code, grpc.StatusCode.OK) + # + # Was the subscribe-request deleted? + # self.assertEqual(len(self.servicer._subscribe_requests), 0) + # with sysrepo.SysrepoConnection() as conn: + # with conn.start_session() as sess: + # sess.switch_datastore("running") + # with self.assertRaises(sysrepo.SysrepoNotFoundError): + # sess.get_data( + # f"/goldstone-telemetry:subscribe-requests/subscribe-request[id='{generated_id}']" + # ) + + await self.run_gnmi_server_test(test) + + async def test_subscribe_once(self): + def test(): + # Create a Subscribe RPC session. + path_str = "/openconfig-interfaces:interfaces/interface[name='Interface1/0/1']/state/enabled" + path = gnmi_pb2.Path() + append_path_element(path, "openconfig-interfaces:interfaces") + append_path_element(path, "interface", "name", "Interface1/0/1") + append_path_element(path, "state") + append_path_element(path, "enabled") + s1 = gnmi_pb2.Subscription(path=path) + subscriptions = [s1] + request = gnmi_pb2.SubscribeRequest( + subscribe=gnmi_pb2.SubscriptionList( + mode=gnmi_pb2.SubscriptionList.Mode.ONCE, + subscription=subscriptions, + ), + ) + self.rpc = self.gnmi_subscribe(request) + + time.sleep(self.WAIT_CREATION) + + # Verify configuraion. + with sysrepo.SysrepoConnection() as conn: + with conn.start_session() as sess: + sess.switch_datastore("running") + subscribe_requests = sess.get_data( + "/goldstone-telemetry:subscribe-requests/subscribe-request" + ) + srs = list( + subscribe_requests["subscribe-requests"]["subscribe-request"] + ) + self.assertEqual(len(srs), 1) + sr = srs[0] + generated_id = sr["id"] + expected = { + "id": generated_id, + "config": { + "id": generated_id, + "mode": "ONCE", + "updates-only": False, + }, + "subscriptions": { + "subscription": [ + { + "id": 0, + "config": { + "id": 0, + "path": path_str, + }, + } + ] + }, + } + self.assertEqual(sr, expected) + + # Send mocked events. + expected_time_min = time.time_ns() + notifs = [ + { + "type": "UPDATE", + "request-id": generated_id, + "subscription-id": 0, + "path": "/openconfig-interfaces:interfaces/interface[name='Interface1/0/1']/state/enabled", + "json-data": "true", + }, + { + "type": "SYNC_RESPONSE", + "request-id": generated_id, + "subscription-id": 0, + }, + ] + self.set_mock_notifs_data(self.NOTIF_SERVER, self.NOTIF_PATH, notifs) + self.send_mock_notifs(self.NOTIF_SERVER) + + time.sleep(self.WAIT_NOTIFICATION) + + # Receive initial updates. + actual = self.rpc.take_response() + expected_time_max = time.time_ns() + self.assertGreater(actual.update.timestamp, expected_time_min) + self.assertLess(actual.update.timestamp, expected_time_max) + self.assertEqual(actual.update.update[0].path, path) + act = json.loads(actual.update.update[0].val.json_val.decode("utf-8")) + expected = True + self.assertEqual(act, expected) + + # Receive the sync-response of the initial updates. + actual = self.rpc.take_response() + self.assertEqual(actual.sync_response, True) + + # Close the RPC session. + self.rpc.requests_closed() + _, code, _ = self.rpc.termination() + self.assertEqual(code, grpc.StatusCode.OK) + + # Was the subscribe-request deleted? + self.assertEqual(len(self.servicer._subscribe_requests), 0) + with sysrepo.SysrepoConnection() as conn: + with conn.start_session() as sess: + sess.switch_datastore("running") + with self.assertRaises(sysrepo.SysrepoNotFoundError): + sess.get_data( + f"/goldstone-telemetry:subscribe-requests/subscribe-request[id='{generated_id}']" + ) + + await self.run_gnmi_server_test(test) + + async def test_subscribe_once_updates_only(self): + def test(): + # Create a Subscribe RPC session. + path_str = "/openconfig-interfaces:interfaces/interface[name='Interface1/0/1']/state/enabled" + path = gnmi_pb2.Path() + append_path_element(path, "openconfig-interfaces:interfaces") + append_path_element(path, "interface", "name", "Interface1/0/1") + append_path_element(path, "state") + append_path_element(path, "enabled") + s1 = gnmi_pb2.Subscription(path=path) + subscriptions = [s1] + request = gnmi_pb2.SubscribeRequest( + subscribe=gnmi_pb2.SubscriptionList( + mode=gnmi_pb2.SubscriptionList.Mode.ONCE, + updates_only=True, + subscription=subscriptions, + ), + ) + self.rpc = self.gnmi_subscribe(request) + + time.sleep(self.WAIT_CREATION) + + # Verify configuraion. + with sysrepo.SysrepoConnection() as conn: + with conn.start_session() as sess: + sess.switch_datastore("running") + subscribe_requests = sess.get_data( + "/goldstone-telemetry:subscribe-requests/subscribe-request" + ) + srs = list( + subscribe_requests["subscribe-requests"]["subscribe-request"] + ) + self.assertEqual(len(srs), 1) + sr = srs[0] + generated_id = sr["id"] + expected = { + "id": generated_id, + "config": { + "id": generated_id, + "mode": "ONCE", + "updates-only": True, + }, + "subscriptions": { + "subscription": [ + { + "id": 0, + "config": { + "id": 0, + "path": path_str, + }, + } + ] + }, + } + self.assertEqual(sr, expected) + + # Send mocked events. + notifs = [ + { + "type": "SYNC_RESPONSE", + "request-id": generated_id, + "subscription-id": 0, + }, + ] + self.set_mock_notifs_data(self.NOTIF_SERVER, self.NOTIF_PATH, notifs) + self.send_mock_notifs(self.NOTIF_SERVER) + + time.sleep(self.WAIT_NOTIFICATION) + + # No initial updates. + + # Receive the sync-response of the initial updates. + actual = self.rpc.take_response() + self.assertEqual(actual.sync_response, True) + + # Close the RPC session. + self.rpc.requests_closed() + _, code, _ = self.rpc.termination() + self.assertEqual(code, grpc.StatusCode.OK) + + # Was the subscribe-request deleted? + self.assertEqual(len(self.servicer._subscribe_requests), 0) + with sysrepo.SysrepoConnection() as conn: + with conn.start_session() as sess: + sess.switch_datastore("running") + with self.assertRaises(sysrepo.SysrepoNotFoundError): + sess.get_data( + f"/goldstone-telemetry:subscribe-requests/subscribe-request[id='{generated_id}']" + ) + + await self.run_gnmi_server_test(test) + + async def test_subscribe_poll(self): + def test(): + # Create a Subscribe RPC session. + path_str = "/openconfig-interfaces:interfaces/interface[name='Interface1/0/1']/state/enabled" + path = gnmi_pb2.Path() + append_path_element(path, "openconfig-interfaces:interfaces") + append_path_element(path, "interface", "name", "Interface1/0/1") + append_path_element(path, "state") + append_path_element(path, "enabled") + s1 = gnmi_pb2.Subscription(path=path) + subscriptions = [s1] + request = gnmi_pb2.SubscribeRequest( + subscribe=gnmi_pb2.SubscriptionList( + mode=gnmi_pb2.SubscriptionList.Mode.POLL, + subscription=subscriptions, + ), + ) + self.rpc = self.gnmi_subscribe(request) + + time.sleep(self.WAIT_CREATION) + + # Verify configuraion. + with sysrepo.SysrepoConnection() as conn: + with conn.start_session() as sess: + sess.switch_datastore("running") + subscribe_requests = sess.get_data( + "/goldstone-telemetry:subscribe-requests/subscribe-request" + ) + srs = list( + subscribe_requests["subscribe-requests"]["subscribe-request"] + ) + self.assertEqual(len(srs), 1) + sr = srs[0] + generated_id = sr["id"] + expected = { + "id": generated_id, + "config": { + "id": generated_id, + "mode": "POLL", + "updates-only": False, + }, + "subscriptions": { + "subscription": [ + { + "id": 0, + "config": { + "id": 0, + "path": path_str, + }, + } + ] + }, + } + self.assertEqual(sr, expected) + + # Send mocked events. + expected_time_min = time.time_ns() + notifs = [ + { + "type": "UPDATE", + "request-id": generated_id, + "subscription-id": 0, + "path": "/openconfig-interfaces:interfaces/interface[name='Interface1/0/1']/state/enabled", + "json-data": "true", + }, + { + "type": "SYNC_RESPONSE", + "request-id": generated_id, + "subscription-id": 0, + }, + ] + self.set_mock_notifs_data(self.NOTIF_SERVER, self.NOTIF_PATH, notifs) + self.send_mock_notifs(self.NOTIF_SERVER) + + time.sleep(self.WAIT_NOTIFICATION) + + # Receive initial updates. + actual = self.rpc.take_response() + expected_time_max = time.time_ns() + self.assertGreater(actual.update.timestamp, expected_time_min) + self.assertLess(actual.update.timestamp, expected_time_max) + self.assertEqual(actual.update.update[0].path, path) + act = json.loads(actual.update.update[0].val.json_val.decode("utf-8")) + expected = True + self.assertEqual(act, expected) + + # Receive the sync-response of the initial updates. + actual = self.rpc.take_response() + self.assertEqual(actual.sync_response, True) + + # Send a poll request. + expected_time_min = time.time_ns() + poll_request = gnmi_pb2.SubscribeRequest(poll=gnmi_pb2.Poll()) + self.rpc.send_request(poll_request) + + time.sleep(self.WAIT_NOTIFICATION) + + # Receive polling updates. + actual = self.rpc.take_response() + expected_time_max = time.time_ns() + self.assertGreater(actual.update.timestamp, expected_time_min) + self.assertLess(actual.update.timestamp, expected_time_max) + self.assertEqual(actual.update.update[0].path, path) + act = json.loads(actual.update.update[0].val.json_val.decode("utf-8")) + expected = True + self.assertEqual(act, expected) + + # Receive the sync-response of the polling updates. + actual = self.rpc.take_response() + self.assertEqual(actual.sync_response, True) + + # Close the RPC session. + self.rpc.requests_closed() + _, code, _ = self.rpc.termination() + self.assertEqual(code, grpc.StatusCode.OK) + + # Was the subscribe-request deleted? + self.assertEqual(len(self.servicer._subscribe_requests), 0) + with sysrepo.SysrepoConnection() as conn: + with conn.start_session() as sess: + sess.switch_datastore("running") + with self.assertRaises(sysrepo.SysrepoNotFoundError): + sess.get_data( + f"/goldstone-telemetry:subscribe-requests/subscribe-request[id='{generated_id}']" + ) + + await self.run_gnmi_server_test(test) + + async def test_subscribe_poll_updates_only(self): + def test(): + # Create a Subscribe RPC session. + path_str = "/openconfig-interfaces:interfaces/interface[name='Interface1/0/1']/state/enabled" + path = gnmi_pb2.Path() + append_path_element(path, "openconfig-interfaces:interfaces") + append_path_element(path, "interface", "name", "Interface1/0/1") + append_path_element(path, "state") + append_path_element(path, "enabled") + s1 = gnmi_pb2.Subscription(path=path) + subscriptions = [s1] + request = gnmi_pb2.SubscribeRequest( + subscribe=gnmi_pb2.SubscriptionList( + mode=gnmi_pb2.SubscriptionList.Mode.POLL, + updates_only=True, + subscription=subscriptions, + ), + ) + self.rpc = self.gnmi_subscribe(request) + + time.sleep(self.WAIT_CREATION) + + # Verify configuraion. + with sysrepo.SysrepoConnection() as conn: + with conn.start_session() as sess: + sess.switch_datastore("running") + subscribe_requests = sess.get_data( + "/goldstone-telemetry:subscribe-requests/subscribe-request" + ) + srs = list( + subscribe_requests["subscribe-requests"]["subscribe-request"] + ) + self.assertEqual(len(srs), 1) + sr = srs[0] + generated_id = sr["id"] + expected = { + "id": generated_id, + "config": { + "id": generated_id, + "mode": "POLL", + "updates-only": True, + }, + "subscriptions": { + "subscription": [ + { + "id": 0, + "config": { + "id": 0, + "path": path_str, + }, + } + ] + }, + } + self.assertEqual(sr, expected) + + # Send mocked events. + notifs = [ + { + "type": "SYNC_RESPONSE", + "request-id": generated_id, + "subscription-id": 0, + }, + ] + self.set_mock_notifs_data(self.NOTIF_SERVER, self.NOTIF_PATH, notifs) + self.send_mock_notifs(self.NOTIF_SERVER) + + time.sleep(self.WAIT_NOTIFICATION) + + # No initial updates. + + # Receive the sync-response of the initial updates. + actual = self.rpc.take_response() + self.assertEqual(actual.sync_response, True) + + # Send a poll request. + poll_request = gnmi_pb2.SubscribeRequest(poll=gnmi_pb2.Poll()) + self.rpc.send_request(poll_request) + + time.sleep(self.WAIT_NOTIFICATION) + + # No polling updates. + + # Receive the sync-response of the polling updates. + actual = self.rpc.take_response() + self.assertEqual(actual.sync_response, True) + + # Close the RPC session. + self.rpc.requests_closed() + _, code, _ = self.rpc.termination() + self.assertEqual(code, grpc.StatusCode.OK) + + # Was the subscribe-request deleted? + self.assertEqual(len(self.servicer._subscribe_requests), 0) + with sysrepo.SysrepoConnection() as conn: + with conn.start_session() as sess: + sess.switch_datastore("running") + with self.assertRaises(sysrepo.SysrepoNotFoundError): + sess.get_data( + f"/goldstone-telemetry:subscribe-requests/subscribe-request[id='{generated_id}']" + ) + + await self.run_gnmi_server_test(test) + + async def test_subscribe_a_leaf(self): + def test(): + # Create a Subscribe RPC session. + path = gnmi_pb2.Path() + append_path_element(path, "openconfig-interfaces:interfaces") + append_path_element(path, "interface", "name", "Interface1/0/1") + append_path_element(path, "state") + append_path_element(path, "enabled") + s1 = gnmi_pb2.Subscription(path=path) + subscriptions = [s1] + request = gnmi_pb2.SubscribeRequest( + subscribe=gnmi_pb2.SubscriptionList( + mode=gnmi_pb2.SubscriptionList.Mode.ONCE, + subscription=subscriptions, + ), + ) + self.rpc = self.gnmi_subscribe(request) + + time.sleep(self.WAIT_CREATION) + + # Get generated request-id. + with sysrepo.SysrepoConnection() as conn: + with conn.start_session() as sess: + sess.switch_datastore("running") + subscribe_requests = sess.get_data( + "/goldstone-telemetry:subscribe-requests/subscribe-request" + ) + srs = list( + subscribe_requests["subscribe-requests"]["subscribe-request"] + ) + self.assertEqual(len(srs), 1) + sr = srs[0] + generated_id = sr["id"] + + # Send mocked events. + expected_time_min = time.time_ns() + notifs = [ + { + "type": "UPDATE", + "request-id": generated_id, + "subscription-id": 0, + "path": "/openconfig-interfaces:interfaces/interface[name='Interface1/0/1']/state/enabled", + "json-data": "true", + }, + { + "type": "SYNC_RESPONSE", + "request-id": generated_id, + "subscription-id": 0, + }, + ] + self.set_mock_notifs_data(self.NOTIF_SERVER, self.NOTIF_PATH, notifs) + self.send_mock_notifs(self.NOTIF_SERVER) + + time.sleep(self.WAIT_NOTIFICATION) + + # Receive initial updates. + actual = self.rpc.take_response() + expected_time_max = time.time_ns() + self.assertGreater(actual.update.timestamp, expected_time_min) + self.assertLess(actual.update.timestamp, expected_time_max) + self.assertEqual(actual.update.update[0].path, path) + act = json.loads(actual.update.update[0].val.json_val.decode("utf-8")) + expected = True + self.assertEqual(act, expected) + + # Receive the sync-response of the initial updates. + actual = self.rpc.take_response() + self.assertEqual(actual.sync_response, True) + + # Close the RPC session. + self.rpc.requests_closed() + _, code, _ = self.rpc.termination() + self.assertEqual(code, grpc.StatusCode.OK) + + await self.run_gnmi_server_test(test) + + async def test_subscribe_a_leaf_list(self): + def test(): + # Create a Subscribe RPC session. + path = gnmi_pb2.Path() + append_path_element(path, "openconfig-terminal-device:terminal-device") + append_path_element(path, "logical-channels") + append_path_element(path, "channel", "index", "1") + append_path_element(path, "ingress") + append_path_element(path, "state") + append_path_element(path, "physical-channel") + s1 = gnmi_pb2.Subscription(path=path) + subscriptions = [s1] + request = gnmi_pb2.SubscribeRequest( + subscribe=gnmi_pb2.SubscriptionList( + mode=gnmi_pb2.SubscriptionList.Mode.ONCE, + subscription=subscriptions, + ), + ) + self.rpc = self.gnmi_subscribe(request) + + time.sleep(self.WAIT_CREATION) + + # Get generated request-id. + with sysrepo.SysrepoConnection() as conn: + with conn.start_session() as sess: + sess.switch_datastore("running") + subscribe_requests = sess.get_data( + "/goldstone-telemetry:subscribe-requests/subscribe-request" + ) + srs = list( + subscribe_requests["subscribe-requests"]["subscribe-request"] + ) + self.assertEqual(len(srs), 1) + sr = srs[0] + generated_id = sr["id"] + + # Send mocked events. + expected_time_min = time.time_ns() + notifs = [ + { + "type": "UPDATE", + "request-id": generated_id, + "subscription-id": 0, + "path": ( + "/openconfig-terminal-device:terminal-device/logical-channels/channel[index='1']" + "/ingress/state/physical-channel" + ), + "json-data": "[1, 2, 3]", + }, + { + "type": "SYNC_RESPONSE", + "request-id": generated_id, + "subscription-id": 0, + }, + ] + self.set_mock_notifs_data(self.NOTIF_SERVER, self.NOTIF_PATH, notifs) + self.send_mock_notifs(self.NOTIF_SERVER) + + time.sleep(self.WAIT_NOTIFICATION) + + # Receive initial updates. + actual = self.rpc.take_response() + expected_time_max = time.time_ns() + self.assertGreater(actual.update.timestamp, expected_time_min) + self.assertLess(actual.update.timestamp, expected_time_max) + self.assertEqual(actual.update.update[0].path, path) + act = json.loads(actual.update.update[0].val.json_val.decode("utf-8")) + expected = [1, 2, 3] + self.assertEqual(act, expected) + + # Receive the sync-response of the initial updates. + actual = self.rpc.take_response() + self.assertEqual(actual.sync_response, True) + + # Close the RPC session. + self.rpc.requests_closed() + _, code, _ = self.rpc.termination() + self.assertEqual(code, grpc.StatusCode.OK) + + await self.run_gnmi_server_test(test) + + async def test_subscribe_a_container(self): + def test(): + # Create a Subscribe RPC session. + path = gnmi_pb2.Path() + append_path_element(path, "openconfig-interfaces:interfaces") + append_path_element(path, "interface", "name", "Interface1/0/1") + append_path_element(path, "state") + s1 = gnmi_pb2.Subscription(path=path) + subscriptions = [s1] + request = gnmi_pb2.SubscribeRequest( + subscribe=gnmi_pb2.SubscriptionList( + mode=gnmi_pb2.SubscriptionList.Mode.ONCE, + subscription=subscriptions, + ), + ) + self.rpc = self.gnmi_subscribe(request) + + time.sleep(self.WAIT_CREATION) + + # Get generated request-id. + with sysrepo.SysrepoConnection() as conn: + with conn.start_session() as sess: + sess.switch_datastore("running") + subscribe_requests = sess.get_data( + "/goldstone-telemetry:subscribe-requests/subscribe-request" + ) + srs = list( + subscribe_requests["subscribe-requests"]["subscribe-request"] + ) + self.assertEqual(len(srs), 1) + sr = srs[0] + generated_id = sr["id"] + + # Send mocked events. + expected_time_min = time.time_ns() + notifs = [ + { + "type": "UPDATE", + "request-id": generated_id, + "subscription-id": 0, + "path": "/openconfig-interfaces:interfaces/interface[name='Interface1/0/1']/state/name", + "json-data": '"Interface1/0/1"', + }, + { + "type": "UPDATE", + "request-id": generated_id, + "subscription-id": 0, + "path": "/openconfig-interfaces:interfaces/interface[name='Interface1/0/1']/state/type", + "json-data": '"iana-if-type:ethernetCsmacd"', + }, + { + "type": "UPDATE", + "request-id": generated_id, + "subscription-id": 0, + "path": "/openconfig-interfaces:interfaces/interface[name='Interface1/0/1']/state/mtu", + "json-data": "1500", + }, + { + "type": "UPDATE", + "request-id": generated_id, + "subscription-id": 0, + "path": "/openconfig-interfaces:interfaces/interface[name='Interface1/0/1']/state/enabled", + "json-data": "true", + }, + { + "type": "SYNC_RESPONSE", + "request-id": generated_id, + "subscription-id": 0, + }, + ] + self.set_mock_notifs_data(self.NOTIF_SERVER, self.NOTIF_PATH, notifs) + self.send_mock_notifs(self.NOTIF_SERVER) + + time.sleep(self.WAIT_NOTIFICATION) + + # Receive initial updates. + ## Verify name. + actual = self.rpc.take_response() + expected_time_max = time.time_ns() + self.assertGreater(actual.update.timestamp, expected_time_min) + self.assertLess(actual.update.timestamp, expected_time_max) + path = gnmi_pb2.Path() + append_path_element(path, "openconfig-interfaces:interfaces") + append_path_element(path, "interface", "name", "Interface1/0/1") + append_path_element(path, "state") + append_path_element(path, "name") + self.assertEqual(actual.update.update[0].path, path) + act = json.loads(actual.update.update[0].val.json_val.decode("utf-8")) + expected = "Interface1/0/1" + self.assertEqual(act, expected) + ## Verify type. + actual = self.rpc.take_response() + expected_time_max = time.time_ns() + self.assertGreater(actual.update.timestamp, expected_time_min) + self.assertLess(actual.update.timestamp, expected_time_max) + path = gnmi_pb2.Path() + append_path_element(path, "openconfig-interfaces:interfaces") + append_path_element(path, "interface", "name", "Interface1/0/1") + append_path_element(path, "state") + append_path_element(path, "type") + self.assertEqual(actual.update.update[0].path, path) + act = json.loads(actual.update.update[0].val.json_val.decode("utf-8")) + expected = "iana-if-type:ethernetCsmacd" + self.assertEqual(act, expected) + ## Verify mtu. + actual = self.rpc.take_response() + expected_time_max = time.time_ns() + self.assertGreater(actual.update.timestamp, expected_time_min) + self.assertLess(actual.update.timestamp, expected_time_max) + path = gnmi_pb2.Path() + append_path_element(path, "openconfig-interfaces:interfaces") + append_path_element(path, "interface", "name", "Interface1/0/1") + append_path_element(path, "state") + append_path_element(path, "mtu") + self.assertEqual(actual.update.update[0].path, path) + act = json.loads(actual.update.update[0].val.json_val.decode("utf-8")) + expected = 1500 + self.assertEqual(act, expected) + ## Verify enabled. + actual = self.rpc.take_response() + expected_time_max = time.time_ns() + self.assertGreater(actual.update.timestamp, expected_time_min) + self.assertLess(actual.update.timestamp, expected_time_max) + path = gnmi_pb2.Path() + append_path_element(path, "openconfig-interfaces:interfaces") + append_path_element(path, "interface", "name", "Interface1/0/1") + append_path_element(path, "state") + append_path_element(path, "enabled") + self.assertEqual(actual.update.update[0].path, path) + act = json.loads(actual.update.update[0].val.json_val.decode("utf-8")) + expected = True + self.assertEqual(act, expected) + + # Receive the sync-response of the initial updates. + actual = self.rpc.take_response() + self.assertEqual(actual.sync_response, True) + + # Close the RPC session. + self.rpc.requests_closed() + _, code, _ = self.rpc.termination() + self.assertEqual(code, grpc.StatusCode.OK) + + await self.run_gnmi_server_test(test) + + async def test_subscribe_a_container_list(self): + def test(): + # Create a Subscribe RPC session. + path = gnmi_pb2.Path() + append_path_element(path, "openconfig-interfaces:interfaces") + append_path_element(path, "interface") + s1 = gnmi_pb2.Subscription(path=path) + subscriptions = [s1] + request = gnmi_pb2.SubscribeRequest( + subscribe=gnmi_pb2.SubscriptionList( + mode=gnmi_pb2.SubscriptionList.Mode.ONCE, + subscription=subscriptions, + ), + ) + self.rpc = self.gnmi_subscribe(request) + + time.sleep(self.WAIT_CREATION) + + # Get generated request-id. + with sysrepo.SysrepoConnection() as conn: + with conn.start_session() as sess: + sess.switch_datastore("running") + subscribe_requests = sess.get_data( + "/goldstone-telemetry:subscribe-requests/subscribe-request" + ) + srs = list( + subscribe_requests["subscribe-requests"]["subscribe-request"] + ) + self.assertEqual(len(srs), 1) + sr = srs[0] + generated_id = sr["id"] + + # Send mocked events. + expected_time_min = time.time_ns() + notifs = [ + { + "type": "UPDATE", + "request-id": generated_id, + "subscription-id": 0, + "path": "/openconfig-interfaces:interfaces/interface[name='Interface1/0/1']/name", + "json-data": '"Interface1/0/1"', + }, + { + "type": "UPDATE", + "request-id": generated_id, + "subscription-id": 0, + "path": "/openconfig-interfaces:interfaces/interface[name='Interface1/0/1']/state/name", + "json-data": '"Interface1/0/1"', + }, + { + "type": "UPDATE", + "request-id": generated_id, + "subscription-id": 0, + "path": "/openconfig-interfaces:interfaces/interface[name='Interface1/0/1']/state/enabled", + "json-data": "true", + }, + { + "type": "UPDATE", + "request-id": generated_id, + "subscription-id": 0, + "path": "/openconfig-interfaces:interfaces/interface[name='Interface1/0/2']/name", + "json-data": '"Interface1/0/2"', + }, + { + "type": "UPDATE", + "request-id": generated_id, + "subscription-id": 0, + "path": "/openconfig-interfaces:interfaces/interface[name='Interface1/0/2']/state/name", + "json-data": '"Interface1/0/2"', + }, + { + "type": "UPDATE", + "request-id": generated_id, + "subscription-id": 0, + "path": "/openconfig-interfaces:interfaces/interface[name='Interface1/0/2']/state/enabled", + "json-data": "true", + }, + { + "type": "SYNC_RESPONSE", + "request-id": generated_id, + "subscription-id": 0, + }, + ] + self.set_mock_notifs_data(self.NOTIF_SERVER, self.NOTIF_PATH, notifs) + self.send_mock_notifs(self.NOTIF_SERVER) + + time.sleep(self.WAIT_NOTIFICATION) + + # Receive initial updates. + ## Verify Interface1/0/1 name. + actual = self.rpc.take_response() + expected_time_max = time.time_ns() + self.assertGreater(actual.update.timestamp, expected_time_min) + self.assertLess(actual.update.timestamp, expected_time_max) + path = gnmi_pb2.Path() + append_path_element(path, "openconfig-interfaces:interfaces") + append_path_element(path, "interface", "name", "Interface1/0/1") + append_path_element(path, "name") + self.assertEqual(actual.update.update[0].path, path) + act = json.loads(actual.update.update[0].val.json_val.decode("utf-8")) + expected = "Interface1/0/1" + self.assertEqual(act, expected) + ## Verify Interface1/0/1 state/name. + actual = self.rpc.take_response() + expected_time_max = time.time_ns() + self.assertGreater(actual.update.timestamp, expected_time_min) + self.assertLess(actual.update.timestamp, expected_time_max) + path = gnmi_pb2.Path() + append_path_element(path, "openconfig-interfaces:interfaces") + append_path_element(path, "interface", "name", "Interface1/0/1") + append_path_element(path, "state") + append_path_element(path, "name") + self.assertEqual(actual.update.update[0].path, path) + act = json.loads(actual.update.update[0].val.json_val.decode("utf-8")) + expected = "Interface1/0/1" + self.assertEqual(act, expected) + ## Verify Interface1/0/1 state/enabled. + actual = self.rpc.take_response() + expected_time_max = time.time_ns() + self.assertGreater(actual.update.timestamp, expected_time_min) + self.assertLess(actual.update.timestamp, expected_time_max) + path = gnmi_pb2.Path() + append_path_element(path, "openconfig-interfaces:interfaces") + append_path_element(path, "interface", "name", "Interface1/0/1") + append_path_element(path, "state") + append_path_element(path, "enabled") + self.assertEqual(actual.update.update[0].path, path) + act = json.loads(actual.update.update[0].val.json_val.decode("utf-8")) + expected = True + self.assertEqual(act, expected) + ## Verify Interface1/0/2 name. + actual = self.rpc.take_response() + expected_time_max = time.time_ns() + self.assertGreater(actual.update.timestamp, expected_time_min) + self.assertLess(actual.update.timestamp, expected_time_max) + path = gnmi_pb2.Path() + append_path_element(path, "openconfig-interfaces:interfaces") + append_path_element(path, "interface", "name", "Interface1/0/2") + append_path_element(path, "name") + self.assertEqual(actual.update.update[0].path, path) + act = json.loads(actual.update.update[0].val.json_val.decode("utf-8")) + expected = "Interface1/0/2" + self.assertEqual(act, expected) + ## Verify Interface1/0/2 state/name. + actual = self.rpc.take_response() + expected_time_max = time.time_ns() + self.assertGreater(actual.update.timestamp, expected_time_min) + self.assertLess(actual.update.timestamp, expected_time_max) + path = gnmi_pb2.Path() + append_path_element(path, "openconfig-interfaces:interfaces") + append_path_element(path, "interface", "name", "Interface1/0/2") + append_path_element(path, "state") + append_path_element(path, "name") + self.assertEqual(actual.update.update[0].path, path) + act = json.loads(actual.update.update[0].val.json_val.decode("utf-8")) + expected = "Interface1/0/2" + self.assertEqual(act, expected) + ## Verify Interface1/0/2 state/enabled. + actual = self.rpc.take_response() + expected_time_max = time.time_ns() + self.assertGreater(actual.update.timestamp, expected_time_min) + self.assertLess(actual.update.timestamp, expected_time_max) + path = gnmi_pb2.Path() + append_path_element(path, "openconfig-interfaces:interfaces") + append_path_element(path, "interface", "name", "Interface1/0/2") + append_path_element(path, "state") + append_path_element(path, "enabled") + self.assertEqual(actual.update.update[0].path, path) + act = json.loads(actual.update.update[0].val.json_val.decode("utf-8")) + expected = True + self.assertEqual(act, expected) + + # Receive the sync-response of the initial updates. + actual = self.rpc.take_response() + self.assertEqual(actual.sync_response, True) + + # Close the RPC session. + self.rpc.requests_closed() + _, code, _ = self.rpc.termination() + self.assertEqual(code, grpc.StatusCode.OK) + + await self.run_gnmi_server_test(test) + + async def test_subscribe_trigger_created(self): + def test(): + # Create a Subscribe RPC session. + path = gnmi_pb2.Path() + append_path_element(path, "openconfig-interfaces:interfaces") + append_path_element(path, "interface", "name", "Interface1/0/1") + append_path_element(path, "config") + append_path_element(path, "enabled") + s1 = gnmi_pb2.Subscription(path=path, mode="ON_CHANGE") + subscriptions = [s1] + request = gnmi_pb2.SubscribeRequest( + subscribe=gnmi_pb2.SubscriptionList( + mode=gnmi_pb2.SubscriptionList.Mode.STREAM, + subscription=subscriptions, + ), + ) + self.rpc = self.gnmi_subscribe(request) + + time.sleep(self.WAIT_CREATION) + + # Get generated request-id. + with sysrepo.SysrepoConnection() as conn: + with conn.start_session() as sess: + sess.switch_datastore("running") + subscribe_requests = sess.get_data( + "/goldstone-telemetry:subscribe-requests/subscribe-request" + ) + srs = list( + subscribe_requests["subscribe-requests"]["subscribe-request"] + ) + self.assertEqual(len(srs), 1) + sr = srs[0] + generated_id = sr["id"] + + # Send mocked events. + expected_time_min = time.time_ns() + notifs = [ + { + "type": "SYNC_RESPONSE", + "request-id": generated_id, + "subscription-id": 0, + }, + ] + self.set_mock_notifs_data(self.NOTIF_SERVER, self.NOTIF_PATH, notifs) + self.send_mock_notifs(self.NOTIF_SERVER) + + time.sleep(self.WAIT_NOTIFICATION) + + # No initial updates. + + # Receive the sync-response of the initial updates. + actual = self.rpc.take_response() + self.assertEqual(actual.sync_response, True) + + # Send mocked create events. + expected_time_min = time.time_ns() + notifs = [ + { + "type": "UPDATE", + "request-id": generated_id, + "subscription-id": 0, + "path": "/openconfig-interfaces:interfaces/interface[name='Interface1/0/1']/config/enabled", + "json-data": "true", + }, + ] + self.set_mock_notifs_data(self.NOTIF_SERVER, self.NOTIF_PATH, notifs) + self.send_mock_notifs(self.NOTIF_SERVER) + + time.sleep(self.WAIT_NOTIFICATION) + + # Receive updates. + actual = self.rpc.take_response() + expected_time_max = time.time_ns() + self.assertGreater(actual.update.timestamp, expected_time_min) + self.assertLess(actual.update.timestamp, expected_time_max) + self.assertEqual(actual.update.update[0].path, path) + act = json.loads(actual.update.update[0].val.json_val.decode("utf-8")) + expected = True + self.assertEqual(act, expected) + + # Close the RPC session. + self.rpc.requests_closed() + # NOTE: rpc.termination does not work for tests. We cannot close the RPC and confirm teardown procedure. + # _, code, _ = self.rpc.termination() + # self.assertEqual(code, grpc.StatusCode.OK) + # + # Was the subscribe-request deleted? + # self.assertEqual(len(self.servicer._subscribe_requests), 0) + # with sysrepo.SysrepoConnection() as conn: + # with conn.start_session() as sess: + # sess.switch_datastore("running") + # with self.assertRaises(sysrepo.SysrepoNotFoundError): + # sess.get_data( + # f"/goldstone-telemetry:subscribe-requests/subscribe-request[id='{generated_id}']" + # ) + + await self.run_gnmi_server_test(test) + + async def test_subscribe_trigger_updated(self): + def test(): + # Create a Subscribe RPC session. + path = gnmi_pb2.Path() + append_path_element(path, "openconfig-interfaces:interfaces") + append_path_element(path, "interface", "name", "Interface1/0/1") + append_path_element(path, "config") + append_path_element(path, "enabled") + s1 = gnmi_pb2.Subscription(path=path, mode="ON_CHANGE") + subscriptions = [s1] + request = gnmi_pb2.SubscribeRequest( + subscribe=gnmi_pb2.SubscriptionList( + mode=gnmi_pb2.SubscriptionList.Mode.STREAM, + subscription=subscriptions, + ), + ) + self.rpc = self.gnmi_subscribe(request) + + time.sleep(self.WAIT_CREATION) + + # Get generated request-id. + with sysrepo.SysrepoConnection() as conn: + with conn.start_session() as sess: + sess.switch_datastore("running") + subscribe_requests = sess.get_data( + "/goldstone-telemetry:subscribe-requests/subscribe-request" + ) + srs = list( + subscribe_requests["subscribe-requests"]["subscribe-request"] + ) + self.assertEqual(len(srs), 1) + sr = srs[0] + generated_id = sr["id"] + + # Send mocked events. + expected_time_min = time.time_ns() + notifs = [ + { + "type": "UPDATE", + "request-id": generated_id, + "subscription-id": 0, + "path": "/openconfig-interfaces:interfaces/interface[name='Interface1/0/1']/config/enabled", + "json-data": "true", + }, + { + "type": "SYNC_RESPONSE", + "request-id": generated_id, + "subscription-id": 0, + }, + ] + self.set_mock_notifs_data(self.NOTIF_SERVER, self.NOTIF_PATH, notifs) + self.send_mock_notifs(self.NOTIF_SERVER) + + time.sleep(self.WAIT_NOTIFICATION) + + # Receive initial updates. + actual = self.rpc.take_response() + expected_time_max = time.time_ns() + self.assertGreater(actual.update.timestamp, expected_time_min) + self.assertLess(actual.update.timestamp, expected_time_max) + self.assertEqual(actual.update.update[0].path, path) + act = json.loads(actual.update.update[0].val.json_val.decode("utf-8")) + expected = True + self.assertEqual(act, expected) + + # Receive the sync-response of the initial updates. + actual = self.rpc.take_response() + self.assertEqual(actual.sync_response, True) + + # Send mocked update events. + expected_time_min = time.time_ns() + notifs = [ + { + "type": "UPDATE", + "request-id": generated_id, + "subscription-id": 0, + "path": "/openconfig-interfaces:interfaces/interface[name='Interface1/0/1']/config/enabled", + "json-data": "false", + }, + ] + self.set_mock_notifs_data(self.NOTIF_SERVER, self.NOTIF_PATH, notifs) + self.send_mock_notifs(self.NOTIF_SERVER) + + time.sleep(self.WAIT_NOTIFICATION) + + # Receive updates. + actual = self.rpc.take_response() + expected_time_max = time.time_ns() + self.assertGreater(actual.update.timestamp, expected_time_min) + self.assertLess(actual.update.timestamp, expected_time_max) + self.assertEqual(actual.update.update[0].path, path) + act = json.loads(actual.update.update[0].val.json_val.decode("utf-8")) + expected = False + self.assertEqual(act, expected) + + # Close the RPC session. + self.rpc.requests_closed() + # NOTE: rpc.termination does not work for tests. We cannot close the RPC and confirm teardown procedure. + # _, code, _ = self.rpc.termination() + # self.assertEqual(code, grpc.StatusCode.OK) + # + # Was the subscribe-request deleted? + # self.assertEqual(len(self.servicer._subscribe_requests), 0) + # with sysrepo.SysrepoConnection() as conn: + # with conn.start_session() as sess: + # sess.switch_datastore("running") + # with self.assertRaises(sysrepo.SysrepoNotFoundError): + # sess.get_data( + # f"/goldstone-telemetry:subscribe-requests/subscribe-request[id='{generated_id}']" + # ) + + await self.run_gnmi_server_test(test) + + async def test_subscribe_trigger_deleted(self): + def test(): + # Create a Subscribe RPC session. + path = gnmi_pb2.Path() + append_path_element(path, "openconfig-interfaces:interfaces") + append_path_element(path, "interface", "name", "Interface1/0/1") + append_path_element(path, "config") + append_path_element(path, "enabled") + s1 = gnmi_pb2.Subscription(path=path, mode="ON_CHANGE") + subscriptions = [s1] + request = gnmi_pb2.SubscribeRequest( + subscribe=gnmi_pb2.SubscriptionList( + mode=gnmi_pb2.SubscriptionList.Mode.STREAM, + subscription=subscriptions, + ), + ) + self.rpc = self.gnmi_subscribe(request) + + time.sleep(self.WAIT_CREATION) + + # Get generated request-id. + with sysrepo.SysrepoConnection() as conn: + with conn.start_session() as sess: + sess.switch_datastore("running") + subscribe_requests = sess.get_data( + "/goldstone-telemetry:subscribe-requests/subscribe-request" + ) + srs = list( + subscribe_requests["subscribe-requests"]["subscribe-request"] + ) + self.assertEqual(len(srs), 1) + sr = srs[0] + generated_id = sr["id"] + + # Send mocked events. + expected_time_min = time.time_ns() + notifs = [ + { + "type": "UPDATE", + "request-id": generated_id, + "subscription-id": 0, + "path": "/openconfig-interfaces:interfaces/interface[name='Interface1/0/1']/config/enabled", + "json-data": "true", + }, + { + "type": "SYNC_RESPONSE", + "request-id": generated_id, + "subscription-id": 0, + }, + ] + self.set_mock_notifs_data(self.NOTIF_SERVER, self.NOTIF_PATH, notifs) + self.send_mock_notifs(self.NOTIF_SERVER) + + time.sleep(self.WAIT_NOTIFICATION) + + # Receive initial updates. + actual = self.rpc.take_response() + expected_time_max = time.time_ns() + self.assertGreater(actual.update.timestamp, expected_time_min) + self.assertLess(actual.update.timestamp, expected_time_max) + self.assertEqual(actual.update.update[0].path, path) + act = json.loads(actual.update.update[0].val.json_val.decode("utf-8")) + expected = True + self.assertEqual(act, expected) + + # Receive the sync-response of the initial updates. + actual = self.rpc.take_response() + self.assertEqual(actual.sync_response, True) + + # Send mocked delete events. + expected_time_min = time.time_ns() + notifs = [ + { + "type": "DELETE", + "request-id": generated_id, + "subscription-id": 0, + "path": "/openconfig-interfaces:interfaces/interface[name='Interface1/0/1']/config/enabled", + }, + ] + self.set_mock_notifs_data(self.NOTIF_SERVER, self.NOTIF_PATH, notifs) + self.send_mock_notifs(self.NOTIF_SERVER) + + time.sleep(self.WAIT_NOTIFICATION) + + # Receive updates. + actual = self.rpc.take_response() + expected_time_max = time.time_ns() + self.assertGreater(actual.update.timestamp, expected_time_min) + self.assertLess(actual.update.timestamp, expected_time_max) + self.assertEqual(actual.update.delete[0], path) + + # Close the RPC session. + self.rpc.requests_closed() + # NOTE: rpc.termination does not work for tests. We cannot close the RPC and confirm teardown procedure. + # _, code, _ = self.rpc.termination() + # self.assertEqual(code, grpc.StatusCode.OK) + # + # Was the subscribe-request deleted? + # self.assertEqual(len(self.servicer._subscribe_requests), 0) + # with sysrepo.SysrepoConnection() as conn: + # with conn.start_session() as sess: + # sess.switch_datastore("running") + # with self.assertRaises(sysrepo.SysrepoNotFoundError): + # sess.get_data( + # f"/goldstone-telemetry:subscribe-requests/subscribe-request[id='{generated_id}']" + # ) + + await self.run_gnmi_server_test(test) + + if __name__ == "__main__": unittest.main()