Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add JSON schema validation for Node Update message #558

Merged
merged 17 commits into from
Oct 3, 2024
Merged
1 change: 0 additions & 1 deletion inbm/cloudadapter-agent/cloudadapter/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,6 @@ def stop(self) -> None:
logger.debug("Stopping cloudadapter client")
self._broker.stop()
self._cloud_publisher.publish_event("Disconnected")
self._cloud_publisher.publish_update("Disconnected")
try:
logger.debug("Calling disconnect on adapter")
self._adapter.disconnect()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,14 +141,15 @@ def publish_update(self, key: str, value: str) -> None:
timestamp = Timestamp()
nmgaston marked this conversation as resolved.
Show resolved Hide resolved
timestamp.GetCurrentTime()
job=common_pb2.Job(
job_id=message_dict.get("job_id", ""),
job_id=message_dict.get("jobId", ""),
node_id=self._client_id,
status_code=status_code,
result_msgs=result_messages,
actual_end_time=timestamp,
job_state=job_state
)


request = inbs_sb_pb2.SendNodeUpdateRequest(
request_id="notused",
job_update=job,
Expand Down
2 changes: 2 additions & 0 deletions inbm/dispatcher-agent/dispatcher/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
'dispatcher-agent' / 'manifest_schema.xsd')
JSON_SCHEMA_LOCATION = str(INTEL_MANAGEABILITY_SHARE_PATH_PREFIX /
'dispatcher-agent' / 'config_param_schema.json')
NODE_UPDATE_JSON_SCHEMA_LOCATION = str(INTEL_MANAGEABILITY_SHARE_PATH_PREFIX /
'dispatcher-agent' / 'node_update_schema.json')

# Client certs and keys path
CLIENT_CERTS = str(BROKER_ETC_PATH / 'public' /
Expand Down
32 changes: 15 additions & 17 deletions inbm/dispatcher-agent/dispatcher/dispatcher_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@
import logging
from typing import Any, Optional, Callable

from dispatcher.constants import AGENT, CLIENT_CERTS, CLIENT_KEYS, COMPLETED
from dispatcher.constants import AGENT, CLIENT_CERTS, CLIENT_KEYS, COMPLETED, NODE_UPDATE_JSON_SCHEMA_LOCATION
from dispatcher.schedule.sqlite_manager import SqliteManager
from dispatcher.schedule.schedules import Schedule
from dispatcher.validators import is_valid_json_structure
from dispatcher.dispatcher_exception import DispatcherException
from inbm_lib.mqttclient.config import DEFAULT_MQTT_HOST, DEFAULT_MQTT_PORT, MQTT_KEEPALIVE_INTERVAL
from inbm_lib.mqttclient.mqtt import MQTT
Expand Down Expand Up @@ -48,19 +49,19 @@ def send_update(self, message: str) -> None:
"""
logger.debug(f"Sending node update for to {UPDATE_CHANNEL} with message: {message}")
self.mqtt_publish(topic=UPDATE_CHANNEL, payload=message)

def _check_db_for_started_job(self) -> Optional[Schedule]:
sqliteMgr = SqliteManager()
schedule = sqliteMgr.get_any_started_schedule()
logger.debug(f"Checking for started schedule in DB: schedule={schedule}")
if schedule:
if schedule:
# Change status to COMPLETED
sqliteMgr.update_status(schedule, COMPLETED)

del sqliteMgr
return schedule

def send_result(self, message: str, request_id: str = "", job_id: str = "") -> None: # pragma: no cover
def send_result(self, message: str, request_id: str = "") -> None: # pragma: no cover
"""Sends result to local MQTT channel

Raises ValueError if request_id contains a slash
Expand All @@ -81,14 +82,8 @@ def send_result(self, message: str, request_id: str = "", job_id: str = "") -> N
logger.error('Cannot send result: dispatcher core not initialized')
return

schedule = None
# Check if this is a request stored in the DB and started from the APScheduler
if job_id != "":
schedule = Schedule(request_id=request_id, job_id=job_id)
else:
# Some jobs do not call send_result to the dispatcher class to get the
# job_id. In this case, we need to check the DB for the job_id.
schedule = self._check_db_for_started_job()
schedule = self._check_db_for_started_job()
logger.debug(f"Schedule in Broker Send_result: {schedule}")

if not schedule:
# This is not a scheduled job
Expand All @@ -108,19 +103,22 @@ def send_result(self, message: str, request_id: str = "", job_id: str = "") -> N
message_dict = json.loads(message)
except json.JSONDecodeError as e:
logger.error(f"Cannot convert formatted message to dict: {message}. Error: {e}")
nmgaston marked this conversation as resolved.
Show resolved Hide resolved
self.send_update(str(message))
return

# Update the job_id in the message
message_dict['job_id'] = schedule.job_id

message_dict['jobId'] = schedule.job_id
# Convert the updated message_dict back to a JSON string
try:
updated_message = json.dumps(message_dict)
except (TypeError, OverflowError) as e:
logger.error(f"Cannot convert Result back to string: {message_dict}. Error: {e}")
self.send_update(str(message))
return
return

is_valid = is_valid_json_structure(updated_message, NODE_UPDATE_JSON_SCHEMA_LOCATION)
if not is_valid:
logger.error(f"Invalid message format: {updated_message}")
nmgaston marked this conversation as resolved.
Show resolved Hide resolved
return

logger.debug(f"Sending node update message: {str(updated_message)}")
self.send_update(str(updated_message))
Expand Down
8 changes: 4 additions & 4 deletions inbm/dispatcher-agent/dispatcher/dispatcher_class.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ def _perform_cmd_type_operation(self, parsed_head: XmlHandler, xml: str) -> Resu
def _telemetry(self, message: str) -> None:
self._dispatcher_broker.telemetry(message)

def _send_result(self, message: str, request_id: str = "", job_id: str = "") -> None:
def _send_result(self, message: str, request_id: str = "") -> None:
"""Sends result message to local MQTT channel

If request_id is specified, the message is sent to RESPONSE_CHANNEL/id instead of RESPONSE_CHANNEL
Expand All @@ -284,8 +284,8 @@ def _send_result(self, message: str, request_id: str = "", job_id: str = "") ->
@param message: message to be published to cloud
"""
# Check if this is a request stored in the DB and started from the APScheduler
logger.debug(f"Sending result message with id {request_id}: {message}")
self._dispatcher_broker.send_result(message, request_id, job_id)
logger.debug(f"Sending result message with request_id={request_id}, message={message}")
self._dispatcher_broker.send_result(message, request_id)


def run_scheduled_job(self, schedule: Schedule, manifest: str) -> None:
Expand Down Expand Up @@ -374,7 +374,7 @@ def do_install(self, xml: str, schema_location: Optional[str] = None, job_id: st
self._update_logger.error = str(e)
finally:
logger.info('Install result: %s', str(result))
self._send_result(message=str(result), job_id=job_id)
self._send_result(message=str(result))
if result.status != CODE_OK and parsed_head:
self._update_logger.status = FAIL
self._update_logger.error = str(result)
Expand Down
4 changes: 2 additions & 2 deletions inbm/dispatcher-agent/dispatcher/ota_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

from .constants import OtaType
from .common.uri_utilities import is_valid_uri
from .validators import is_valid_config_params
from .validators import is_valid_json_structure
from .dispatcher_exception import DispatcherException
from inbm_lib.xmlhandler import XmlException
from inbm_lib.xmlhandler import XmlHandler
Expand Down Expand Up @@ -171,7 +171,7 @@ def parse(self, resource: Dict, kwargs: Dict, parsed: XmlHandler) -> Dict[str, A
if 'import' in cmd:
config_params = '{"execcmd":"/bin/true"}'

if config_params and not is_valid_config_params(config_params):
if config_params and not is_valid_json_structure(config_params):
logger.info("Config Params not passed correctly"
" in manifest, rejected update")
raise XmlException
Expand Down
64 changes: 45 additions & 19 deletions inbm/dispatcher-agent/dispatcher/schedule/sqlite_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,26 +106,52 @@ def get_any_started_schedule(self) -> Optional[Schedule]:
sql = ''' SELECT
j.job_id,
j.task_id,
sj.schedule_id,
sj.schedule_type,
COALESCE(iss.request_id, sss.request_id, rss.request_id) AS request_id
isj.schedule_id,
'immediate' AS schedule_type,
imm.request_id
FROM
job j
JOIN
(
SELECT task_id, schedule_id, 'Immediate' AS schedule_type FROM immediate_schedule_job WHERE status = 'started'
UNION ALL
SELECT task_id, schedule_id, 'Single' AS schedule_type FROM single_schedule_job WHERE status = 'started'
UNION ALL
SELECT task_id, schedule_id, 'Repeated' AS schedule_type FROM repeated_schedule_job WHERE status = 'started'
) sj ON j.task_id = sj.task_id
LEFT JOIN
immediate_schedule iss ON sj.schedule_id = iss.id AND sj.schedule_type = 'Immediate'
LEFT JOIN
single_schedule sss ON sj.schedule_id = sss.id AND sj.schedule_type = 'Single'
LEFT JOIN
repeated_schedule rss ON sj.schedule_id = rss.id AND sj.schedule_type = 'Repeated'
'''
immediate_schedule_job isj ON j.task_id = isj.task_id
JOIN
immediate_schedule imm ON isj.schedule_id = imm.id
WHERE
isj.status = 'started'

UNION

SELECT
j.job_id,
j.task_id,
ssj.schedule_id,
'single' AS schedule_type,
ss.request_id
FROM
job j
JOIN
single_schedule_job ssj ON j.task_id = ssj.task_id
JOIN
single_schedule ss ON ssj.schedule_id = ss.id
WHERE
ssj.status = 'started'

UNION

SELECT
j.job_id,
j.task_id,
rsj.schedule_id,
'repeated' AS schedule_type,
rs.request_id
FROM
job j
JOIN
repeated_schedule_job rsj ON j.task_id = rsj.task_id
JOIN
repeated_schedule rs ON rsj.schedule_id = rs.id
WHERE
rsj.status = 'started';
'''

cursor = self._conn.cursor()
try:
Expand All @@ -141,9 +167,9 @@ def get_any_started_schedule(self) -> Optional[Schedule]:
request_id = row[0][4]
logger.debug(f"Schedule in 'STARTED' state has type={schedule_type}, jobID={job_id}, taskID={task_id}, scheduleID={schedule_id}, requestID={request_id}")

if schedule_type == 'Immediate':
if schedule_type.lower() == 'immediate':
return SingleSchedule(request_id=request_id, job_id=job_id, task_id=task_id, schedule_id=schedule_id)
elif schedule_type == 'Single':
elif schedule_type.lower() == 'single':
return SingleSchedule(request_id=request_id, job_id=job_id, task_id=task_id, schedule_id=schedule_id, start_time=datetime.now())
else:
return RepeatedSchedule(request_id=request_id, job_id=job_id, task_id=task_id, schedule_id=schedule_id)
Expand Down
2 changes: 1 addition & 1 deletion inbm/dispatcher-agent/dispatcher/sota/sota.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ def execute_from_manifest(self,
sota_cache_repo.delete_all() # clean cache directory
if get_command_status(cmd_list) == SUCCESS:
self._dispatcher_broker.send_result(
'{"status": 200, "message": SOTA command status: SUCCESSFUL"}')
'{"status": 200, "message": "SOTA command status: SUCCESSFUL"}')
success = True
else:
self._dispatcher_broker.telemetry(
Expand Down
27 changes: 16 additions & 11 deletions inbm/dispatcher-agent/dispatcher/validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,19 @@
logger = logging.getLogger(__name__)


def get_schema_location(schema_type: str, schema_location: Optional[str] = None) -> str:
def _get_schema_location(schema_location: Optional[str] = None) -> str:
if not schema_location:
schema_location = JSON_SCHEMA_LOCATION
return schema_location

"""Validates JSON against a JSON schema

def validate_schema(schema_type: str, params: str, schema_location: Optional[str] = None) -> str:
schema_location = get_schema_location(schema_type, schema_location)
@param params: JSON Parameters
@param schema_location: JSON schema location. Default=NONE
@return: Deserialized JSON
"""
def _validate_schema(params: str, schema_location: Optional[str] = None) -> str:
nmgaston marked this conversation as resolved.
Show resolved Hide resolved
schema_location = _get_schema_location(schema_location)

if not os.path.exists(schema_location):
logger.error("JSON Schema file not found")
Expand All @@ -36,20 +41,20 @@ def validate_schema(schema_type: str, params: str, schema_location: Optional[str

parsed = json.loads(str(params))
jsonschema.validate(parsed, schema)
except (ValueError, OSError, jsonschema.exceptions.ValidationError):
raise ValueError("Schema validation failed!")
return parsed
except (ValueError, OSError, jsonschema.exceptions.ValidationError) as e:
raise ValueError(f"Schema validation failed! Error: {e}")
return parsed


def is_valid_config_params(config_params: str, schema_location: Optional[str] = None) -> bool:
"""Schema validate the configuration parameters
def is_valid_json_structure(json_params: str, schema_location: Optional[str] = None) -> bool:
"""Validate the JSON structure against the schema

@param config_params: params to be validated
@param json_params: JSON params to be validated
@param schema_location: location of schema file; default=None
@return (bool): True if schema validated or False on failure or exception
@return (bool): True if valid schema; otherwise, False
"""
try:
validate_schema('single', config_params, schema_location)
_validate_schema(json_params, schema_location)
except (ValueError, KeyError, jsonschema.exceptions.ValidationError) as e:
logger.info("Error received: %s", str(e))
return False
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
nmgaston marked this conversation as resolved.
Show resolved Hide resolved
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"status": {
"type": "integer",
"description": "The HTTP status code of the response."
},
"message": {
"type": "string",
"description": "A message describing the result of the command."
},
"jobId": {
"type": "string",
"description": "A unique identifier for the job.",
"pattern": "^[a-z0-9-]+$"
}
},
"required": ["status", "message", "jobId"],
"additionalProperties": false
}
nmgaston marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ def __init__(self) -> None:
def start(self, tls: bool) -> None:
pass

def send_result(self, message: str, id: str = "", job_id: str = "") -> None:
def send_result(self, message: str, id: str = "") -> None:
pass

def mqtt_publish(self, topic: str, payload: Any, qos: int = 0, retain: bool = False) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,21 @@ def test_update_single_schedule_status_to_scheduled(db_connection: SqliteManager
results = db_connection.get_single_schedules_in_priority_order()
assert len(results) == 0

def test_update_repeated_schedule_statu_to_scheduled(db_connection: SqliteManager):
def test_get_started_immediate_scheduled_job(db_connection: SqliteManager):
iss = SingleSchedule(request_id=REQUEST_ID,
job_id=JOB_ID,
manifests=["MANIFEST1"])
db_connection.clear_database()

db_connection.create_schedule(iss)
# SQL call only gets results that don't have a status.
results = db_connection.get_immediate_schedules_in_priority_order()
assert len(results) == 1
db_connection.update_status(results[0], "started")
imm_sched = db_connection.get_any_started_schedule()
assert imm_sched == SingleSchedule(request_id=REQUEST_ID, job_id=JOB_ID, task_id=1, schedule_id=1)

def test_update_repeated_schedule_status_to_scheduled(db_connection: SqliteManager):
rs = RepeatedSchedule(request_id=REQUEST_ID,
job_id=JOB_ID,
cron_duration="P7D", cron_minutes="0",
Expand Down
Loading