Skip to content

Commit

Permalink
Rename Update to NodeUpdate (#560)
Browse files Browse the repository at this point in the history
  • Loading branch information
nmgaston authored Oct 4, 2024
1 parent be920c9 commit eab9ae6
Show file tree
Hide file tree
Showing 17 changed files with 59 additions and 59 deletions.
2 changes: 1 addition & 1 deletion inbm-lib/inbm_common_lib/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
EVENT_CHANNEL = 'manageability/event'
TELEMETRY_CHANNEL = 'manageability/telemetry'
# Used for Node updates to be sent to UDM
UPDATE_CHANNEL = 'manageability/update'
NODE_UPDATE_CHANNEL = 'manageability/nodeupdate'
CONFIG_CHANNEL = 'ma/configuration/update/'

# Request constants
Expand Down
2 changes: 1 addition & 1 deletion inbm/cloudadapter-agent/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ The agent subscribes to the following topics:
- Agent events: `manageability/event`
- Responses: `manageability/response`
- Device telemetry: `manageability/telemetry`
- Update from scheduled requests: `manageability/update`
- Update from scheduled requests: `manageability/nodeupdate`

`+` is a wild-card indicating single level thus matching `diagnostic/state` or `<another-agent>/state`

Expand Down
4 changes: 2 additions & 2 deletions inbm/cloudadapter-agent/cloudadapter/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ def _bind_agent_to_cloud(self) -> None:
lambda _, payload: self._cloud_publisher.publish_event(payload)
)
self._broker.bind_callback(
TC_TOPIC.UPDATE,
lambda _, payload: self._cloud_publisher.publish_update(payload)
TC_TOPIC.NODE_UPDATE,
lambda _, payload: self._cloud_publisher.publish_node_update(payload)
)

def _bind_ucc_to_agent(self) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,13 @@ def bind_callback(self, name: str, callback: Callable) -> None:
"""
pass

def publish_update(self, message: str) -> None:
def publish_node_update(self, message: str) -> None:
"""Publishes an update to the cloud
@param message: (str) The update message to send
@exception PublishError: If publish fails
"""
self._client.publish_update("update", message)
self._client.publish_node_update("update", message)

def publish_event(self, message: str) -> None:
"""Publishes an event to the cloud
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,21 @@
class CloudClient:

def __init__(self, connection: MQTTConnection, telemetry: OneWayMessenger, event: OneWayMessenger,
update: OneWayMessenger | None, attribute: OneWayMessenger,
node_update: OneWayMessenger | None, attribute: OneWayMessenger,
handler: ReceiveRespondHandler) -> None:
"""Constructor for CloudClient
@param connection: Connection associated with this CloudClient
@param telemetry: Messenger to send telemetry
@param event: Messenger to send events
@param update: Messenger to send updates
@param node_update: Messenger to send node updates
@param attribute: Messenger to send attributes
@param handler: Handler to deal with cloud method calls
"""
self._connection = connection
self._telemetry = telemetry
self._event = event
self._update: OneWayMessenger | None = update
self._node_update: OneWayMessenger | None = node_update
self._attribute = attribute
self._handler = handler

Expand Down Expand Up @@ -57,18 +57,18 @@ def publish_telemetry(self, key: str, value: str, time: datetime) -> None:
"""
return self._telemetry.publish(key, value, time)

def publish_update(self, key: str, value: str) -> None:
def publish_node_update(self, key: str, value: str) -> None:
"""Publishes an update to the cloud
@param key: key to publish
@param value: update to publish
@exception PublishError: If publish fails
"""
if self._update is None:
if self._node_update is None:
logger.error("Received update publish request but no update messenger is configured")
return None
else:
return self._update.publish(key, value)
return self._node_update.publish(key, value)

def publish_event(self, key: str, value: str) -> None:
"""Publishes an event to the cloud
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import random
import logging
import threading
import uuid
from google.protobuf.timestamp_pb2 import Timestamp
from typing import Callable, Optional
from datetime import datetime
Expand All @@ -26,6 +27,7 @@
import grpc # type: ignore
from .cloud_client import CloudClient


logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -116,15 +118,15 @@ def publish_telemetry(self, key: str, value: str, time: datetime) -> None:

pass # INBS is not yet ready to receive telemetry

def publish_update(self, key: str, value: str) -> None:
"""Publishes an update to the cloud
def publish_node_update(self, key: str, value: str) -> None:
"""Publishes a node update to the cloud
@param key: key to publish
@param value: node update message to publish
@exception PublishError: If publish fails
"""
if self._grpc_channel is None:
raise PublishError("gRPC channel not set up before calling InbsCloudClient.publish_update")
raise PublishError("gRPC channel not set up before calling InbsCloudClient.publish_node_update")

is_valid = is_valid_json_structure(value, NODE_UPDATE_JSON_SCHEMA_LOCATION)
if not is_valid:
Expand Down Expand Up @@ -156,10 +158,9 @@ def publish_update(self, key: str, value: str) -> None:
actual_end_time=timestamp,
job_state=job_state
)


request = inbs_sb_pb2.SendNodeUpdateRequest(
request_id="notused",
request_id=str(uuid.uuid4()),
job_update=job,
)
logger.debug(f"Sending node update to INBS: request={request}")
Expand Down Expand Up @@ -234,7 +235,6 @@ def _handle_inbm_command_request(
)
continue


if command_type:
if command_type == "update_scheduled_operations":
# Convert operations to Dispatcher's ScheduleRequest
Expand Down
10 changes: 5 additions & 5 deletions inbm/cloudadapter-agent/cloudadapter/cloud/cloud_builders.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ def build_messenger_with_config(config: Dict[str, Any]):
telemetry = config.get("telemetry")
attribute = config.get("attribute")
event = config.get("event")
update = config.get("update")
node_update = config.get("node_update")

if telemetry:
telemetry = build_messenger_with_config(telemetry)
Expand All @@ -155,10 +155,10 @@ def build_messenger_with_config(config: Dict[str, Any]):
else:
raise ClientBuildError(
"Missing 'attribute' MQTT config information while setting up cloud connection.")
if update:
update = build_messenger_with_config(update)
if node_update:
node_update = build_messenger_with_config(node_update)
else:
logger.debug("Missing 'update' MQTT config information while setting up cloud connection. TODO: figure out why this doesn't happen for INBS, but does happen for other clouds.")
logger.debug("Missing 'node_update' MQTT config information while setting up cloud connection. TODO: figure out why this doesn't happen for INBS, but does happen for other clouds.")
if event:
event = build_messenger_with_config(event)
else:
Expand Down Expand Up @@ -205,6 +205,6 @@ def build_messenger_with_config(config: Dict[str, Any]):
connection=connection,
telemetry=telemetry,
event=event,
update=update,
node_update=node_update,
attribute=attribute,
handler=handler)
4 changes: 2 additions & 2 deletions inbm/cloudadapter-agent/cloudadapter/cloud/cloud_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,13 @@ def publish_telemetry(self, message: str) -> None:
except PublishError as e:
logger.error(str(e))

def publish_update(self, message: str) -> None:
def publish_node_update(self, message: str) -> None:
"""Send node update to UDM
@param message: (str) JSON formatted SendNodeUpdateRequest
"""
logger.debug(f"Received node update: {message}")
try:
self._adapter.publish_update(message)
self._adapter.publish_node_update(message)
except PublishError as e:
logger.error(str(e))
4 changes: 2 additions & 2 deletions inbm/cloudadapter-agent/cloudadapter/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
SPDX-License-Identifier: Apache-2.0
"""

from inbm_common_lib.constants import UPDATE_CHANNEL, TELEMETRY_CHANNEL, RESPONSE_CHANNEL, EVENT_CHANNEL
from inbm_common_lib.constants import NODE_UPDATE_CHANNEL, TELEMETRY_CHANNEL, RESPONSE_CHANNEL, EVENT_CHANNEL
from inbm_lib.constants import DOCKER_STATS
from inbm_lib.path_prefixes import INTEL_MANAGEABILITY_ETC_PATH_PREFIX
from inbm_lib.path_prefixes import INTEL_MANAGEABILITY_SHARE_PATH_PREFIX, BROKER_ETC_PATH
Expand Down Expand Up @@ -39,7 +39,7 @@ class TC_TOPIC:
STATE = tuple([STATE_CHANNEL])
TELEMETRY = tuple([TELEMETRY_CHANNEL]) # Shared by TC and UCC
EVENT = tuple([EVENT_CHANNEL, RESPONSE_CHANNEL]) # TODO: What's up with response?
UPDATE = tuple([UPDATE_CHANNEL]) # Used for Node updates to be sent to UDM
NODE_UPDATE = tuple([NODE_UPDATE_CHANNEL]) # Used for Node updates to be sent to UDM

# ========== Publishing channels

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ def setUp(self) -> None:
self.mock_telemetry = mock.create_autospec(Messenger)
self.mock_attribute = mock.create_autospec(Messenger)
self.mock_event = mock.create_autospec(Messenger)
self.mock_update = mock.create_autospec(Messenger)
self.mock_node_update = mock.create_autospec(Messenger)
self.mock_handler = mock.create_autospec(Handler)

self.cloud_client = CloudClient(
connection=self.mock_connection,
telemetry=self.mock_telemetry,
event=self.mock_event,
update=self.mock_update,
node_update=self.mock_node_update,
attribute=self.mock_attribute,
handler=self.mock_handler
)
Expand All @@ -47,10 +47,10 @@ def test_publish_attribute_succeeds(self) -> None:
self.cloud_client.publish_attribute(*args)
assert self.mock_attribute.publish.call_count == 1

def test_publish_update_succeeds(self) -> None:
def test_publish_node_update_succeeds(self) -> None:
args = ("key", "value")
self.cloud_client.publish_update(*args)
assert self.mock_update.publish.call_count == 1
self.cloud_client.publish_node_update(*args)
assert self.mock_node_update.publish.call_count == 1

def test_publish_event_succeeds(self) -> None:
args = ("key", "value")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import threading
import pytest
from mock import MagicMock, Mock, patch
import queue
Expand Down Expand Up @@ -52,7 +51,7 @@ def test_publish_telemetry(self, inbs_client: InbsCloudClient) -> None:
key="example_key", value="example_value", time=datetime.now()
)

def test_publish_update(self, inbs_client: InbsCloudClient) -> None:
def test_publish_node_update(self, inbs_client: InbsCloudClient) -> None:
mock_channel = MagicMock()
mock_channel.SendNodeUpdateRequest.return_value = "MockResponse"
inbs_client._grpc_channel = mock_channel
Expand All @@ -62,7 +61,7 @@ def test_publish_update(self, inbs_client: InbsCloudClient) -> None:

# Call the publish_update method
with patch('cloudadapter.cloud.client.inbs_cloud_client.is_valid_json_structure', return_value=True):
inbs_client.publish_update(key, value)
inbs_client.publish_node_update(key, value)

# Assert that the gRPC channel's SendNodeUpdate method was called
mock_channel.SendNodeUpdate.assert_called_once()
Expand All @@ -76,9 +75,9 @@ def test_publish_update_failure_no_grpc_channel(self, inbs_client: InbsCloudClie
key = 'test-key'
value = '{"job_id": "12345", "status": 200, "message": "Update successful"}'

# Call the publish_update method and expect a PublishError
# Call the publish_node_update method and expect a PublishError
with pytest.raises(PublishError):
inbs_client.publish_update(key, value)
inbs_client.publish_node_update(key, value)

def test_publish_event(self, inbs_client: InbsCloudClient) -> None:
# this is not expected to do anything yet
Expand Down
12 changes: 6 additions & 6 deletions inbm/cloudadapter-agent/tests/unit/cloud/test_cloud_builders.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ def setUp(self) -> None:
"pub": "event_pub",
"format": "event_format"
},
"update": {
"pub": "update_pub",
"format": "update_format"
"node_update": {
"pub": "node_update_pub",
"format": "node_update_format"
},
"telemetry": {
"pub": "telemetry_pub",
Expand Down Expand Up @@ -93,9 +93,9 @@ def setUp(self) -> None:
"pub": "event_pub",
"format": "event_format"
},
"update": {
"pub": "update_pub",
"format": "update_format"
"node_update": {
"pub": "node_update_pub",
"format": "node_update_format"
},
"command": {
"pub": "manageability/request/command",
Expand Down
18 changes: 9 additions & 9 deletions inbm/cloudadapter-agent/tests/unit/cloud/test_cloud_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,23 @@ def setUp(self, MockedAdapter, mock_logger) -> None:
self.MockedAdapter = MockedAdapter
self.cloud_publisher = CloudPublisher(self.MockedAdapter("config"))

def test_publish_update_succeed(self) -> None:
def test_publish_node_update_succeed(self) -> None:
update = "update"
self.cloud_publisher.publish_update(update)
self.cloud_publisher.publish_node_update(update)

mocked = self.MockedAdapter.return_value
mocked.publish_update.assert_called_once_with(update)
mocked.publish_node_update.assert_called_once_with(update)

@mock.patch("cloudadapter.cloud.cloud_publisher.logger")
def test_publish_update_with_adapter__succeeds(self, mock_logger) -> None:
self.MockedAdapter.return_value.publish_update.return_value = None
self.cloud_publisher.publish_update('{"status": 200, "message": "COMMAND SUCCESSFUL", "job_id": "swupd-0cdce9d5-523b-43d9-8673-d54fd61498fe"}')
def test_publish_node_update_with_adapter__succeeds(self, mock_logger) -> None:
self.MockedAdapter.return_value.publish_node_update.return_value = None
self.cloud_publisher.publish_node_update('{"status": 200, "message": "COMMAND SUCCESSFUL", "job_id": "swupd-0cdce9d5-523b-43d9-8673-d54fd61498fe"}')
assert mock_logger.error.call_count == 0

@mock.patch("cloudadapter.cloud.cloud_publisher.logger")
def test_publish_update_with_adapter_fails(self, mock_logger) -> None:
self.MockedAdapter.return_value.publish_update.side_effect = PublishError("Error!")
self.cloud_publisher.publish_update('{"status": 200, "message": "COMMAND SUCCESSFUL", "job_id": "swupd-0cdce9d5-523b-43d9-8673-d54fd61498fe"}')
def test_publish_node_update_with_adapter_fails(self, mock_logger) -> None:
self.MockedAdapter.return_value.publish_node_update.side_effect = PublishError("Error!")
self.cloud_publisher.publish_node_update('{"status": 200, "message": "COMMAND SUCCESSFUL", "job_id": "swupd-0cdce9d5-523b-43d9-8673-d54fd61498fe"}')
assert mock_logger.error.call_count == 1

def test_publish_event_succeed(self) -> None:
Expand Down
1 change: 1 addition & 0 deletions inbm/dispatcher-agent/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ The agent publishes to the following topics:
- Dynamic telemetry updates: `telemetry/update`
- Informs diagnostic-agent remediation manager to remove a specific container: `remediation/container`
- Informs diagnostic-agent remediation manager to remove a specific image:`remediation/image`
- Sends the result of a scheduled request received from UDMScheduled node update. This result is actually sent as a request to cloudadapter: `manageability/nodeupdate`
- dispatcher-agent state: dispatcher/state` when dead/running


Expand Down
2 changes: 1 addition & 1 deletion inbm/dispatcher-agent/dispatcher/aota/aota_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from dispatcher.common.result_constants import INSTALL_FAILURE, CODE_OK
from dispatcher.config_dbs import ConfigDbs
from dispatcher.constants import TELEMETRY_UPDATE_CHANNEL, UMASK_OTA
from dispatcher.constants import UMASK_OTA
from dispatcher.packageinstaller.package_installer import TrtlContainer
from dispatcher.packagemanager.local_repo import DirectoryRepo
from dispatcher.packagemanager.package_manager import get
Expand Down
8 changes: 4 additions & 4 deletions inbm/dispatcher-agent/dispatcher/dispatcher_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from inbm_lib.json_validator import is_valid_json_structure
from inbm_lib.constants import NODE_UPDATE_JSON_SCHEMA_LOCATION

from inbm_common_lib.constants import RESPONSE_CHANNEL, EVENT_CHANNEL, UPDATE_CHANNEL
from inbm_common_lib.constants import RESPONSE_CHANNEL, EVENT_CHANNEL, NODE_UPDATE_CHANNEL

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -48,9 +48,9 @@ def send_update(self, message: str) -> None:
@param message: message to be published to cloud
@param job_id: Job ID used to track the request in both UDM and TC
"""
logger.debug(f"Sending node update for to {UPDATE_CHANNEL} with message: {message}")
self.mqtt_publish(topic=UPDATE_CHANNEL, payload=message)
logger.debug(f"Sending node update for to {NODE_UPDATE_CHANNEL} with message: {message}")
self.mqtt_publish(topic=NODE_UPDATE_CHANNEL, payload=message)

def _check_db_for_started_job(self) -> Optional[Schedule]:
sqliteMgr = SqliteManager()
schedule = sqliteMgr.get_any_started_schedule()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ topic write manageability/event
topic write manageability/cmd/+
topic write manageability/response
topic write manageability/response/+
topic write manageability/update
topic write manageability/nodeupdate
topic write ma/configuration/update/+
topic write dispatcher/query

Expand All @@ -23,7 +23,7 @@ topic read manageability/response
topic read manageability/response/+
topic read manageability/telemetry
topic read manageability/event
topic read manageability/update
topic read manageability/nodeupdate

user inbc-program
topic write manageability/request/#
Expand Down

0 comments on commit eab9ae6

Please sign in to comment.