Skip to content

Commit

Permalink
Add "src/aiko_services/examples/pipeline/elements.py:PE_Inspect()" and
Browse files Browse the repository at this point in the history
"pipeline_local.json" example usage.  Provides generic inspection of a
Pipeline's inputs / outputs variables ... by either writing to a
file or a log or printing to the console.
  • Loading branch information
geekscape committed Oct 8, 2024
1 parent 26fbd3c commit f9f837c
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 17 deletions.
95 changes: 79 additions & 16 deletions src/aiko_services/examples/pipeline/elements.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,17 @@
from typing import Tuple

import aiko_services as aiko
from aiko_services.main.utilities import parse

# --------------------------------------------------------------------------- #

def _all_outputs(pipeline_element, stream):
frame = stream.frames[stream.frame_id]
outputs = {}
for output_definition in pipeline_element.definition.output:
output_name = output_definition["name"]
outputs[output_name] = frame.swag[output_name]
return outputs

# --------------------------------------------------------------------------- #

Expand All @@ -47,6 +58,64 @@ def process_frame(self, stream, i) -> Tuple[aiko.StreamEvent, dict]:

return aiko.StreamEvent.OKAY, {"i": i_new}

# --------------------------------------------------------------------------- #

class PE_Inspect(aiko.PipelineElement):
def __init__(self, context):
context.set_protocol("metrics:0")
context.get_implementation("PipelineElement").__init__(self, context)

def _get_inspect_file(self, stream, target):
inspect_file = stream.variables.get("inspect_file", None)
if not inspect_file:
_, inspect_filepath = target.split(":")
inspect_file = open(inspect_filepath, "a")
stream.variables["inspect_file"] = inspect_file
return inspect_file

def process_frame(self, stream) -> Tuple[aiko.StreamEvent, dict]:
frame = stream.frames[stream.frame_id]

enable, _ = self.get_parameter("enable", True)
if enable:
names, found = self.get_parameter("inspect")
if not found:
return aiko.StreamEvent.ERROR, \
{"diagnostic": "'inspect' parameter not found"}
name, names = parse(names)
names.insert(0, name)
if "*" in names:
names = frame.swag.keys()

target, _ = self.get_parameter("target", "log")
if target.startswith("file:"):
inspect_file = self._get_inspect_file(stream, target)

for name in names:
value = frame.swag.get(name, None)
name_value = f"{self.my_id()} {name}: {value}"

if target.startswith("file:"):
inspect_file.write(name_value + "\n")
elif target == "log":
self.logger.info(name_value)
elif target == "print":
print(name_value)
else:
return aiko.StreamEvent.ERROR, \
{"diagnostic": "'target' parameter must be 'file', 'log' or 'print'"}

if target.startswith("file:"):
inspect_file.flush()

return aiko.StreamEvent.OKAY, _all_outputs(self, stream)

def start_stream(self, stream, stream_id):
inspect_file = stream.variables.get("inspect_file", None)
if inspect_file:
inspect_file.close()
return aiko.StreamEvent.OKAY, {}

# --------------------------------------------------------------------------- #
# PE_Metrics typically appears at the end of a Pipeline graph.
# So that child Pipeline responses can be returned to the parent Pipeline,
Expand All @@ -71,12 +140,7 @@ def process_frame(self, stream) -> Tuple[aiko.StreamEvent, dict]:
time_pipeline = metrics["time_pipeline"] * 1000
self.logger.debug(f"Pipeline total: {time_pipeline:.3f} ms")

outputs = {}
for output_definition in self.definition.output:
output_name = output_definition["name"]
outputs[output_name] = frame.swag[output_name]

return aiko.StreamEvent.OKAY, outputs
return aiko.StreamEvent.OKAY, _all_outputs(self, stream)

# --------------------------------------------------------------------------- #

Expand Down Expand Up @@ -111,9 +175,9 @@ def __init__(self, context):
context.get_implementation("PipelineElement").__init__(self, context)

def process_frame(self, stream, a) -> Tuple[aiko.StreamEvent, dict]:
pe_0_inc, found = self.get_parameter("pe_0_inc", 1)
pe_0_inc, _ = self.get_parameter("pe_0_inc", 1)
b = int(a) + int(pe_0_inc)
self.logger.info(f"PE_0: {self.my_id()} in a: {a}, out b: {b}")
self.logger.info(f"{self.my_id()} in a: {a}, out b: {b}")
return aiko.StreamEvent.OKAY, {"b": b}

# --------------------------------------------------------------------------- #
Expand All @@ -125,11 +189,11 @@ def __init__(self, context):

def process_frame(self, stream, b) -> Tuple[aiko.StreamEvent, dict]:
increment = 1
p_1, found = self.get_parameter("p_1")
pe_1_inc, found = self.get_parameter("pe_1_inc", 1)
p_1, _ = self.get_parameter("p_1")
pe_1_inc, _ = self.get_parameter("pe_1_inc", 1)
c = int(b) + int(pe_1_inc)
self.logger.info(f"PE_1: {self.my_id()} in b: {b}, out c: {c}")
self.logger.info(f"PE_1: parameter pe_1_inc: {pe_1_inc}")
self.logger.info(f"{self.my_id()} in b: {b}, out c: {c}")
self.logger.info(f" parameter pe_1_inc: {pe_1_inc}")
return aiko.StreamEvent.OKAY, {"c": c}

# --------------------------------------------------------------------------- #
Expand All @@ -141,7 +205,7 @@ def __init__(self, context):

def process_frame(self, stream, c) -> Tuple[aiko.StreamEvent, dict]:
d = int(c) + 1
self.logger.info(f"PE_2: {self.my_id()} in c: {c}, out d: {d}")
self.logger.info(f"{self.my_id()} in c: {c}, out d: {d}")
return aiko.StreamEvent.OKAY, {"d": d}

# --------------------------------------------------------------------------- #
Expand All @@ -153,7 +217,7 @@ def __init__(self, context):

def process_frame(self, stream, c) -> Tuple[aiko.StreamEvent, dict]:
e = int(c) + 1
self.logger.info(f"PE_3: {self.my_id()} in c: {c}, out e: {e}")
self.logger.info(f"{self.my_id()} in c: {c}, out e: {e}")
return aiko.StreamEvent.OKAY, {"e": e}

# --------------------------------------------------------------------------- #
Expand All @@ -165,8 +229,7 @@ def __init__(self, context):

def process_frame(self, stream, d, e) -> Tuple[aiko.StreamEvent, dict]:
f = int(d) + int(e)
self.logger.info(
f"PE_4: {self.my_id()} in d, e {d} {e}, out: d + e = f: {f}")
self.logger.info(f"{self.my_id()} in d: {d}, e: {e}, out: d + e = f: {f}")
return aiko.StreamEvent.OKAY, {"f": f}

# --------------------------------------------------------------------------- #
Expand Down
14 changes: 13 additions & 1 deletion src/aiko_services/examples/pipeline/pipeline_local.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
"#": "Graph may consist of one or more sub-graphs",
"#": "This graph demonstrates fan-out and fan-in (diamond network)",
"graph": [
"(PE_1 (PE_2 PE_4) (PE_3 PE_4) PE_Metrics)"
"(PE_1 (PE_2 PE_4) (PE_3 PE_4) PE_Inspect PE_Metrics)"
],

"#": "Optional parameters may be either null, boolean, integer or string",
Expand Down Expand Up @@ -65,6 +65,18 @@
}
}
},
{ "name": "PE_Inspect",
"#": "Provides PipelineElement inputs / outputs value inspection",
"parameters": {
"enable": true, "#": "true or false",
"inspect": "(*)", "#": "inputs / outputs names",
"target": "file:z_inspect.txt", "#": "file, log or print"},
"input": [],
"output": [],
"deploy": {
"local": { "module": "aiko_services.examples.pipeline.elements" }
}
},
{ "name": "PE_Metrics",
"#": "Provides Pipeline and PipelineElement timing information",
"input": [],
Expand Down

0 comments on commit f9f837c

Please sign in to comment.