Skip to content

Commit

Permalink
Send back job id to udm (#548)
Browse files Browse the repository at this point in the history
Modified DB to store immediate jobs to track the job ID when sending the result.
Adding a new manageability/update channel between Dispatcher and CloudAdapter to send node updates back to INBS
Sending the update response back to INBS via gRPC
  • Loading branch information
nmgaston authored Sep 28, 2024
1 parent 83052be commit 9b0ccfa
Show file tree
Hide file tree
Showing 39 changed files with 1,021 additions and 443 deletions.
36 changes: 34 additions & 2 deletions docs/Dispatcher Scheduling DB Design.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,35 @@ erDiagram
TEXT status "NULL or scheduled"
}
IMMEDIATE_SCHEDULE {
INTEGER id PK "AUTOINCREMENT"
TEXT request_id
}
SINGLE_SCHEDULE {
INTEGER id PK "AUTOINCREMENT"
TEXT request_id "NOT NULL - Format -> 2024-01-01T00:00:00"
TEXT request_id
TEXT start_time "NOT NULL - Format -> 2024-01-01T00:00:00"
TEXT end_time
TEXT end_time "NOT NULL - Format -> 2024-01-01T00:00:00"
}
JOB ||--o{ SINGLE_SCHEDULE_JOB: performs
JOB ||--o{ REPEATED_SCHEDULE_JOB : performs
JOB ||--o{ IMMEDIATE_SCHEDULE_JOB : performs
JOB {
INTEGER task_id PK "AUTOINCREMENT"
TEXT job_id "FROM MJUNCT"
TEXT manifest "NOT NULL"
}
IMMEDIATE_SCHEDULE ||--|{ IMMEDIATE_SCHEDULE_JOB : schedules
IMMEDIATE_SCHEDULE_JOB {
INTEGER priority "Order the job manifests should run - Starting with 0"
INTEGER schedule_id PK,FK "REFERENCES IMMEDIATE_SCHEDULE(id)"
INTEGER task_id PK,FK "REFERENCES job(task_id)"
TEXT status "NULL or scheduled"
}
REPEATED_SCHEDULE ||--|{ REPEATED_SCHEDULE_JOB : schedules
REPEATED_SCHEDULE_JOB {
INTEGER priority "Order the job manifests should run"
Expand Down Expand Up @@ -85,6 +99,14 @@ erDiagram
| 4 | fwupd-718814f3-b12a-432e-ac38-093e8dcb4bd1 | valid Inband Manageability XML manifest - FOTA |
| 5 | setpwr-d8be8ae4-7512-43c0-9bdd-9a066de17322 | valid Inband Manageability XML manifest - Reboot system |

### IMMEDIATE_SCHEDULE Table

| id | request_id |
| :---- | :---- |
| 1 | 6bf587ac-1d70-4e21-9a15-097f6292b9c4 |
| 2 | 6bf587ac-1d70-4e21-9a15-097f6292b9c4 |
| 3 | c9b74125-f3bb-440a-ad80-8d02090bd337 |

### SINGLE_SCHEDULE Table

| id | request_id | start_time | end_time |
Expand All @@ -104,6 +126,16 @@ NOTE: These values may not make sense in the real world. Just for demonstration
| 3 | c9b74125-f3bb-440a-ad80-8d02090bd337 | P2D | 0 | */8 | * | * | * |
| 4 | c9b74125-f3bb-440a-ad80-8d02090bd337 | P14D | 0 | * | * | * | * |

### IMMEDIATE_SCHEDULE_JOB Table

Example: To do a download, install, and reboot of SOTA immediately, using immediate schedule 1

| priority | schedule_id | task_id | status |
| :---- | :---- | :---- | :----- |
| 0 | 1 | 1 | scheduled |
| 1 | 1 | 2 | scheduled |
| 2 | 1 | 3 | |

### SINGLE_SCHEDULE_JOB Table

Example: To do a download, install, and reboot of SOTA at the time in schedule 1
Expand Down
2 changes: 2 additions & 0 deletions inbm-lib/inbm_common_lib/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
RESPONSE_CHANNEL = 'manageability/response'
EVENT_CHANNEL = 'manageability/event'
TELEMETRY_CHANNEL = 'manageability/telemetry'
# Used for Node updates to be sent to UDM
UPDATE_CHANNEL = 'manageability/update'
CONFIG_CHANNEL = 'ma/configuration/update/'

# Request constants
Expand Down
1 change: 1 addition & 0 deletions inbm/cloudadapter-agent/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ The agent subscribes to the following topics:
- Agent events: `manageability/event`
- Responses: `manageability/response`
- Device telemetry: `manageability/telemetry`
- Update from scheduled requests: `manageability/update`

`+` is a wild-card indicating single level thus matching `diagnostic/state` or `<another-agent>/state`

Expand Down
5 changes: 5 additions & 0 deletions inbm/cloudadapter-agent/cloudadapter/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ def _bind_agent_to_cloud(self) -> None:
TC_TOPIC.EVENT,
lambda _, payload: self._cloud_publisher.publish_event(payload)
)
self._broker.bind_callback(
TC_TOPIC.UPDATE,
lambda _, payload: self._cloud_publisher.publish_update(payload)
)

def _bind_ucc_to_agent(self) -> None:
logger.debug("Binding cloud to Command")
Expand Down Expand Up @@ -163,6 +167,7 @@ def stop(self) -> None:
logger.debug("Stopping cloudadapter client")
self._broker.stop()
self._cloud_publisher.publish_event("Disconnected")
self._cloud_publisher.publish_update("Disconnected")
try:
logger.debug("Calling disconnect on adapter")
self._adapter.disconnect()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ def bind_callback(self, name: str, callback: Callable) -> None:
"""
pass

def publish_update(self, message: str) -> None:
    """Forward an update message to the cloud client

    The message is handed to the underlying client under the fixed key "update".

    @param message: (str) The update message to send
    @exception PublishError: If publish fails
    """
    update_key = "update"
    self._client.publish_update(update_key, message)

def publish_event(self, message: str) -> None:
"""Publishes an event to the cloud
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,28 @@
from .handlers.receive_respond_handler import ReceiveRespondHandler
from typing import Callable, Optional
from datetime import datetime
import logging

logger = logging.getLogger(__name__)

class CloudClient:

def __init__(self, connection: MQTTConnection, telemetry: OneWayMessenger, event: OneWayMessenger,
attribute: OneWayMessenger, handler: ReceiveRespondHandler) -> None:
update: OneWayMessenger | None, attribute: OneWayMessenger,
handler: ReceiveRespondHandler) -> None:
"""Constructor for CloudClient
@param connection: Connection associated with this CloudClient
@param telemetry: Messenger to send telemetry
@param event: Messenger to send events
@param update: Messenger to send updates
@param attribute: Messenger to send attributes
@param handler: Handler to deal with cloud method calls
"""
self._connection = connection
self._telemetry = telemetry
self._event = event
self._update: OneWayMessenger | None = update
self._attribute = attribute
self._handler = handler

Expand Down Expand Up @@ -52,6 +57,19 @@ def publish_telemetry(self, key: str, value: str, time: datetime) -> None:
"""
return self._telemetry.publish(key, value, time)

def publish_update(self, key: str, value: str) -> None:
    """Send an update to the cloud through the update messenger

    When no update messenger was supplied to this client, the request is
    logged as an error and dropped.

    @param key: key to publish
    @param value: update to publish
    @exception PublishError: If publish fails
    """
    messenger = self._update
    if messenger is not None:
        return messenger.publish(key, value)

    logger.error("Received update publish request but no update messenger is configured")
    return None

def publish_event(self, key: str, value: str) -> None:
"""Publishes an event to the cloud
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,24 @@
"""

from collections.abc import Generator
import json
import queue
import random
import threading
import time
from typing import Callable, Optional, Any
from google.protobuf.timestamp_pb2 import Timestamp
from typing import Callable, Optional
from datetime import datetime

from cloudadapter.cloud.adapters.inbs.operation import (
convert_updated_scheduled_operations_to_dispatcher_xml,
)
from cloudadapter.constants import METHOD, DEAD
from cloudadapter.exceptions import AuthenticationError
from cloudadapter.exceptions import AuthenticationError, PublishError
from cloudadapter.pb.inbs.v1 import inbs_sb_pb2_grpc, inbs_sb_pb2
from cloudadapter.pb.common.v1 import common_pb2
import logging

import grpc
import grpc # type: ignore
from .cloud_client import CloudClient

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -68,9 +69,10 @@ def __init__(
raise AuthenticationError(
"TLS certificate path is required when TLS is enabled."
)

self._stop_event = threading.Event()

self._grpc_channel: grpc.Channel | None = None # this will get set after connect is called

def get_client_id(self) -> Optional[str]:
"""A readonly property
Expand Down Expand Up @@ -112,6 +114,53 @@ def publish_telemetry(self, key: str, value: str, time: datetime) -> None:

pass # INBS is not yet ready to receive telemetry

def publish_update(self, key: str, value: str) -> None:
    """Publishes a node update to INBS over gRPC

    The JSON payload is converted into a Job message and sent with
    SendNodeUpdate. Payloads that cannot be used (invalid JSON, or JSON that
    is not an object) and gRPC failures are logged and dropped rather than
    raised.

    @param key: key of the update; only logged, it is not sent to INBS
    @param value: JSON-formatted node update; the 'status', 'message' and
                  'job_id' entries are read from it
    @exception PublishError: If the gRPC channel has not been set up yet
    """
    if self._grpc_channel is None:
        raise PublishError("gRPC channel not set up before calling InbsCloudClient.publish_update")

    # Turn the message into a dict
    logger.debug(f"Received node update: key={key}, value={value}")
    try:
        message_dict = json.loads(value)
    except json.JSONDecodeError as e:
        logger.error(f"Cannot convert formatted message to dict: {value}. Error: {e}")
        return

    # json.loads also accepts scalars and arrays (e.g. "123"); those have no
    # .get() and would otherwise crash below with AttributeError.
    if not isinstance(message_dict, dict):
        logger.error(
            f"Cannot convert formatted message to dict: {value}. Error: payload is not a JSON object")
        return

    # Default to 0 instead of "" so a missing status never hands a string to
    # the status_code field, which is compared against the integer 200 below.
    # NOTE(review): assumes status_code is an integer proto field - confirm
    # against the Job definition in common.proto.
    status_code = message_dict.get("status", 0)
    job_state = (
        common_pb2.Job.JobState.PASSED
        if status_code == 200
        else common_pb2.Job.JobState.FAILED
    )

    result_messages = json.dumps(message_dict.get("message", ""))

    timestamp = Timestamp()
    timestamp.GetCurrentTime()
    job = common_pb2.Job(
        job_id=message_dict.get("job_id", ""),
        node_id=self._client_id,
        status_code=status_code,
        result_msgs=result_messages,
        actual_end_time=timestamp,
        job_state=job_state,
    )

    request = inbs_sb_pb2.SendNodeUpdateRequest(
        request_id="notused",
        job_update=job,
    )
    logger.debug(f"Sending node update to INBS: request={request}")

    try:
        response = self._grpc_channel.SendNodeUpdate(request, metadata=self._metadata)
        logger.info(f"Received response from gRPC server: {response}")
    except grpc.RpcError as e:
        # Best effort: a failed update is reported in the log only.
        logger.error(f"Failed to send node update via gRPC: {e}")

def publish_event(self, key: str, value: str) -> None:
"""Publishes an event to the cloud
Expand Down Expand Up @@ -144,7 +193,7 @@ def bind_callback(self, name: str, callback: Callable) -> None:

# for now ignore all callbacks; only Ping is supported
self._callbacks[name] = callback

def _handle_inbm_command_request(
self, request_queue: queue.Queue[inbs_sb_pb2.HandleINBMCommandRequest | None]
) -> Generator[inbs_sb_pb2.HandleINBMCommandResponse, None, None]:
Expand Down Expand Up @@ -255,8 +304,9 @@ def _handle_inbm_command_request(
break
logger.debug("Exiting _handle_inbm_command_request")

def _do_socket_connect(self):
"""Handle the socket/TLS/HTTP connection to the gRPC server."""
def _make_grpc_channel(self) -> grpc.Channel:
"""Handle the socket/TLS/HTTP connection to the gRPC server.
Assumption: should not connect until first gRPC command."""
if self._tls_enabled:
# Create a secure channel with SSL credentials
logger.debug("Setting up connection to INBS cloud with TLS enabled")
Expand All @@ -270,30 +320,36 @@ def _do_socket_connect(self):
self.channel = grpc.insecure_channel(
f"{self._grpc_hostname}:{self._grpc_port}"
)

self.stub = inbs_sb_pb2_grpc.INBSSBServiceStub(self.channel)

logger.info(
f"Connection set up for {self._grpc_hostname}:{self._grpc_port}; will attempt TCP connection on first request."
)
return inbs_sb_pb2_grpc.INBSSBServiceStub(self.channel)

def connect(self):
    """Set up the gRPC channel and start the INBS background loop

    Stores the result of _make_grpc_channel() in self._grpc_channel, then
    runs self._run on a new thread that is kept in self.background_thread.
    """
    # set up the gRPC channel before the loop starts, since _run requires it
    self._grpc_channel = self._make_grpc_channel()

    # Start the background thread
    worker = threading.Thread(target=self._run)
    self.background_thread = worker
    worker.start()

def _run(self): # pragma: no cover # multithreaded operation not unit testable
"""INBS cloud loop. Intended to be used inside a background thread."""

if self._grpc_channel is None:
raise RuntimeError("gRPC channel not set up before calling InbsCloudClient._run")

backoff = 0.1 # Initial fixed backoff delay in seconds
max_backoff = 4.0 # Maximum backoff delay in seconds

while not self._stop_event.is_set():
logger.debug("InbsCloudClient _run loop")
try:
self._do_socket_connect()
request_queue: queue.Queue[
inbs_sb_pb2.HandleINBMCommandRequest | None
] = queue.Queue()
stream = self.stub.HandleINBMCommand(
stream = self._grpc_channel.HandleINBMCommand(
self._handle_inbm_command_request(request_queue),
metadata=self._metadata,
)
Expand Down
6 changes: 6 additions & 0 deletions inbm/cloudadapter-agent/cloudadapter/cloud/cloud_builders.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ def build_messenger_with_config(config: Dict[str, Any]):
telemetry = config.get("telemetry")
attribute = config.get("attribute")
event = config.get("event")
update = config.get("update")

if telemetry:
telemetry = build_messenger_with_config(telemetry)
Expand All @@ -154,6 +155,10 @@ def build_messenger_with_config(config: Dict[str, Any]):
else:
raise ClientBuildError(
"Missing 'attribute' MQTT config information while setting up cloud connection.")
if update:
update = build_messenger_with_config(update)
else:
logger.debug("Missing 'update' MQTT config information while setting up cloud connection. TODO: figure out why this doesn't happen for INBS, but does happen for other clouds.")
if event:
event = build_messenger_with_config(event)
else:
Expand Down Expand Up @@ -200,5 +205,6 @@ def build_messenger_with_config(config: Dict[str, Any]):
connection=connection,
telemetry=telemetry,
event=event,
update=update,
attribute=attribute,
handler=handler)
11 changes: 11 additions & 0 deletions inbm/cloudadapter-agent/cloudadapter/cloud/cloud_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,14 @@ def publish_telemetry(self, message: str) -> None:
self._adapter.publish_attribute(key, value)
except PublishError as e:
logger.error(str(e))

def publish_update(self, message: str) -> None:
    """Send node update to UDM

    The message is passed to the adapter unchanged; a PublishError from the
    adapter is logged and not propagated.

    @param message: (str) JSON formatted SendNodeUpdateRequest
    """
    logger.debug(f"Received node update: {message}")
    adapter = self._adapter
    try:
        adapter.publish_update(message)
    except PublishError as publish_failure:
        logger.error(str(publish_failure))
4 changes: 2 additions & 2 deletions inbm/cloudadapter-agent/cloudadapter/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
SPDX-License-Identifier: Apache-2.0
"""

from inbm_common_lib.constants import TELEMETRY_CHANNEL, RESPONSE_CHANNEL, EVENT_CHANNEL
from inbm_common_lib.constants import UPDATE_CHANNEL, TELEMETRY_CHANNEL, RESPONSE_CHANNEL, EVENT_CHANNEL
from inbm_lib.constants import DOCKER_STATS
from inbm_lib.path_prefixes import INTEL_MANAGEABILITY_ETC_PATH_PREFIX
from inbm_lib.path_prefixes import INTEL_MANAGEABILITY_SHARE_PATH_PREFIX, BROKER_ETC_PATH
Expand Down Expand Up @@ -39,7 +39,7 @@ class TC_TOPIC:
STATE = tuple([STATE_CHANNEL])
TELEMETRY = tuple([TELEMETRY_CHANNEL]) # Shared by TC and UCC
EVENT = tuple([EVENT_CHANNEL, RESPONSE_CHANNEL]) # TODO: What's up with response?

UPDATE = tuple([UPDATE_CHANNEL]) # Used for Node updates to be sent to UDM

# ========== Publishing channels

Expand Down
Loading

0 comments on commit 9b0ccfa

Please sign in to comment.