diff --git a/README.md b/README.md index 7bf327f..ade9952 100644 --- a/README.md +++ b/README.md @@ -1,13 +1,12 @@ -# Multi-Data Lineage System +# ProvLake Lib - IBM Research Multi-Data Lineage System is a provenance data management system capable of capturing, integrating, and querying provenance data generated across multiple, distributed services, programs, databases, and computational workflows. +This is part of ProvLake Project. See [ibm.biz/provlake](http://ibm.biz/provlake) for more details. -**For more information on the project, including participants and publications, please see [ibm.biz/provlake](http://ibm.biz/provlake).** +### A Python lib to capture multiworkflow provenance data from Python Scripts -This repository contains the Python library that captures provenance data in Python applications and send to the Multi-Data Lineage Manager, which is responsible for integrating the data in a provenance database stored as a knowledge graph (semantic detabase), -then allowing users to run queries over the data. +Use this library for code instrumentation to collect provenance data of function calls in a script. Input arguments or output values from functions can come from distributed data storages, including file systems and database systems. -It supports Python>=3.6 +Python 3.6 ### Very simple utilization example @@ -17,7 +16,7 @@ from provlake.prov_lake import ProvLake from provlake.prov_task import ProvTask """ -Very simple example to show how this library is used to instrument a simple python script for provenance data management. +Very simple example to show how ProvLake is used to instrument a simple python script for provenance data management. """ @@ -41,4 +40,5 @@ with ProvTask(prov, "factorial_number", in_args) as prov_task: prov_task.output(out_args) prov.close() + ``` diff --git a/setup.py b/setup.py index ef34fbd..9d9ad55 100644 --- a/setup.py +++ b/setup.py @@ -6,11 +6,11 @@ setup(name='provlake', - version='0.0.72', + version='0.0.74', description='A Python lib to capture multiworkflow provenance data from Python Scripts', - url='http://ibm.biz/provlake', + url='https://github.ibm.com/provlake/ProvLakePy', author='IBM Research', - license='Apache 2.0', + license='Internal use only / IBM only', install_requires=requires, package_dir={'': 'src'}, packages=find_packages(where='src'), diff --git a/src/provlake/prov_lake.py b/src/provlake/prov_lake.py index a2d70c8..ed4309c 100644 --- a/src/provlake/prov_lake.py +++ b/src/provlake/prov_lake.py @@ -13,7 +13,7 @@ def __init__(self, prospective_provenance_dict: dict = None, storage_configuration_path: str=None, storage_configuration_dict: dict=None, - dataflow_name: str=None, + workflow_name: str=None, context: str=None, insert_prospective=False, with_validation: bool=False, @@ -71,10 +71,10 @@ def __init__(self, self.last_task_id = 0 self.wf_start_time = time() - self.df_name = dataflow_name or self.df_structure.get("dataflow_name", "NI") + self.df_name = workflow_name or self.df_structure.get("dataflow_name", "NI") self.tasks = dict() - self.wf_execution = "wfexec_" + str(self.wf_start_time) + self.wf_execution = self.wf_start_time self.wf_obj = { "wf_execution": self.wf_execution, "startTime": self.wf_start_time @@ -96,7 +96,6 @@ def __init__(self, if log_level == "NONE": log_level = "ERROR" log_lvl = getattr(logging, log_level.upper()) - #logging.getLogger().setLevel(log_lvl) logger.setLevel(log_lvl) self.prov_persister = _ProvPersister(self.df_name, service_url=service_url, context=context, bag_size=bag_size, diff --git a/src/provlake/stateless/cycle.py b/src/provlake/stateless/cycle.py index 2908905..d9822eb 100644 --- a/src/provlake/stateless/cycle.py +++ b/src/provlake/stateless/cycle.py @@ -1,7 +1,8 @@ import traceback import logging from time import time -from provlake import prov_utils +from provlake.utils import prov_utils + logger = logging.getLogger('PROV') diff --git a/src/provlake/stateless/task.py b/src/provlake/stateless/task.py index d9d67e7..f0da0b8 100644 --- a/src/provlake/stateless/task.py +++ b/src/provlake/stateless/task.py @@ -1,7 +1,8 @@ import traceback import logging from time import time -from provlake import prov_utils +from provlake.utils import prov_utils + logger = logging.getLogger('PROV') diff --git a/src/provlake/stateless/workflow.py b/src/provlake/stateless/workflow.py index 118cc8c..1ca8d3f 100644 --- a/src/provlake/stateless/workflow.py +++ b/src/provlake/stateless/workflow.py @@ -1,5 +1,5 @@ from time import time -from provlake import prov_utils +from provlake.utils import prov_utils import os diff --git a/src/provlake/utils/__init__.py b/src/provlake/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/provlake/utils/args_handler.py b/src/provlake/utils/args_handler.py new file mode 100644 index 0000000..6cfa54a --- /dev/null +++ b/src/provlake/utils/args_handler.py @@ -0,0 +1,36 @@ +def get_dict(_dict:dict) -> dict: + if _dict is None: + return None + if len(_dict) == 0: + return {} + return { + "type": "dict", + "values": _dict + } + +def get_list(_list:list) -> dict: + if _list is None: + return None + if len(_list) == 0: + return [] + return { + "type": "list", + "values": _list + } + +def get_recursive_dicts(_dict:dict) -> dict: + if _dict is None: + return None + if len(_dict) == 0: + return {} + values = dict() + for k in _dict: + v = _dict[k] + if type(v) == dict: + values[k] = get_recursive_dicts(v) + else: + values[k] = v + return { + "type": "dict", + "values": values + } diff --git a/src/provlake/utils/prov_utils.py b/src/provlake/utils/prov_utils.py new file mode 100644 index 0000000..6dfaf25 --- /dev/null +++ b/src/provlake/utils/prov_utils.py @@ -0,0 +1,282 @@ +""" +##### +# Exemplary Content of a prov.yaml: +##### +context_name: seismic_pipelines +dataflow_name: + train: seismic_stratigraphic_train + create_data: seismic_stratigraphic_create_training_data + analyze_seismic: seismic_stratigraphic_analyze_seismic +file_storage: dccad003.pok.ibm.com +with_validation: false +should_log_to_file: true +log_dir: . +align_terms: + segy_path: seismic_file + horizons_paths: horizon_file +graph_uri: +stringify_params: ["unite_category"] +not_tracked_params: ["prov", "prov_path", "input_path", "log_level", "dataset", "prov_config", "data_path" ] +bag_size: 1 +service: http://localhost:5000 +online: false + log_level: info +""" +import yaml +import os +from enum import Enum +import logging +import json +logger = logging.getLogger('PROV') + + +class MLSemantics(Enum): + hyper_parameter = "hyper_parameter" + data_characteristic = "data_characteristic" + dataset = "dataset" + evaluation_measure = "evaluation_measure" + model = "model" + + def __str__(self): + return self.value + + +class DTYPE(Enum): + int = "integer" + str = "string" + bool = "boolean" + list = "list" + float = "float" + complex = "complex_attribute" + any = "any" + + def __str__(self): + return self.value + + +class ProvConf: + conf_data: dict = None + custom_attributes: dict = None + + def __init__(self, prov_conf_path: str = None): + if not prov_conf_path: + logger.warning("[Prov] You are not capturing provenance.") + return + with open(prov_conf_path, 'r') as stream: + try: + ProvConf.conf_data = yaml.safe_load(stream) + except yaml.YAMLError as exc: + raise exc + + if "custom_attributes" in ProvConf.conf_data: + with open(ProvConf.conf_data["custom_attributes"], 'r') as stream: + try: + ProvConf.custom_attributes = yaml.safe_load(stream) + except yaml.YAMLError as exc: + raise exc + + +def get_dtype_from_val(value, should_stringfy=False) -> str: + if should_stringfy: + return DTYPE.str + elif type(value) == str: + return DTYPE.str + elif type(value) == list: + return DTYPE.list + elif type(value) == int: + return DTYPE.int + elif type(value) == float: + return DTYPE.float + elif type(value) == bool: + return DTYPE.bool + elif type(value) == dict: + return DTYPE.complex + else: + return DTYPE.any + + +def get_dtype_from_type(_type: type, should_stringfy=False) -> str: + if should_stringfy: + return str(DTYPE.str) + elif _type == str: + return str(DTYPE.str) + elif _type == list: + return str(DTYPE.list) + elif _type == int: + return str(DTYPE.int) + elif _type == float: + return str(DTYPE.float) + elif _type == bool: + return str(DTYPE.bool) + elif _type == dict: + return str(DTYPE.complex) + else: + return "any" + + +def build_generic_prospective(dataflow_name: str): + prospective_prov = dict() + prospective_prov["dataflow_name"] = ProvConf.conf_data["dataflow_name"][dataflow_name] + prospective_prov["context_name"] = ProvConf.conf_data["context_name"] + prospective_prov["storages"] = {"main_filesystem": {"type": "PhysicalMachine"}} + prospective_prov["data_transformations"] = {} + storage_configuration = { + "configuration": { + "storages": { + "main_filesystem": { + "type": "PhysicalMachine", + "host_address": ProvConf.conf_data["file_storage"] + } + } + } + } + # TODO Add machine host + return prospective_prov, storage_configuration + + +# @Deprecated -- this is going to be depricated. Please avoid using it. +def build_prov_input_from_dict(dict_params: dict): + retrospective_input_prov = {} + attributes = [] + + for key in dict_params: + # Remove params not tracked: + if "not_tracked_params" in ProvConf.conf_data and key in ProvConf.conf_data["not_tracked_params"]: + continue + + value = dict_params[key] + if value is None or (value is not None and value == ''): + continue + + # Renaming param names in case we need: + attr_name = key + if "align_terms" in ProvConf.conf_data and key in ProvConf.conf_data["align_terms"]: + attr_name = ProvConf.conf_data["align_terms"][key] + + should_stringify = __get_should_stringify(attr_name) + if should_stringify: + retrospective_input_prov[attr_name] = str(value) + else: + retrospective_input_prov[attr_name] = value + attr = { + "name": attr_name, + #"description": "" + } + if "path" in attr_name or "file" in attr_name: + attr["semantics"] = "FILE_REFERENCE" + attr["storage_references"] = { + "key": "main_filesystem" + } + else: + attr["semantics"] = "PARAMETER" + attr["ml_semantics"] = str(MLSemantics.hyper_parameter) + + # if "_slices" in attr_name: + # # we have a special case for slices in a different method + # continue + + dtype = get_dtype_from_val(value, should_stringify) + if dtype == "list": + attr["dtype"] = dtype + if "REFERENCE" not in attr["semantics"] and len(value) > 0: + attr["elementdtype"] = str(get_dtype_from_val(value[0], should_stringify)) + elif "REFERENCE" not in attr["semantics"]: + attr["dtype"] = dtype + + attributes.append(attr) + + return attributes, retrospective_input_prov + + +def __get_should_stringify(key): + if "stringify_params" not in ProvConf.conf_data: + return False + if key not in ProvConf.conf_data["stringify_params"]: + return False + return True + + +def get_prospective_attribute(key, value): + ml_semantics = None + + if type(value) == type: + dtype = get_dtype_from_type(value) + elif type(value) == DTYPE: + dtype = value + elif type(value) == dict: + dtype = value.get("dtype", "any") + ml_semantics = value.get("ml_semantics", None) + else: + dtype = get_dtype_from_val(value) + + attr = { + "name": key, + "dtype": str(dtype) + } + + if ml_semantics: + attr["ml_semantics"] = str(ml_semantics) + + if "path" in key or "file" in key: + attr["semantics"] = "FILE_REFERENCE" + attr["storage_references"] = {"key": "main_filesystem"} + + if dtype == DTYPE.list: + if len(value) > 0: + attr["elementdtype"] = str(get_dtype_from_val(value[0])) + elif dtype == DTYPE.complex: + attr["attributes"] = get_prospective_from_args(value) + + return attr + + +def get_prospective_from_args(prov_args: dict) -> list: + args_schema = [] + + for k in prov_args: + # removing unwanted attributes + if k == "self" or k.startswith('_') or "prov" in k: + continue + + attr = get_prospective_attribute(k, prov_args[k]) + args_schema.append(attr) + return args_schema + + +def build_prov_for_transformation(prospective_prov: dict, transformation): + # IN + prospective_in, prospective_out = [], [] + if hasattr(transformation, "static_prospective_prov_attributes_in") \ + and transformation.static_prospective_prov_attributes_in: + prospective_in = transformation.static_prospective_prov_attributes_in + elif transformation.static_schema_args_in: + prospective_in = get_prospective_from_args(transformation.static_schema_args_in) + else: + transformation.static_schema_args_in = transformation.get_static_schema_args_from_init() + prospective_in = get_prospective_from_args(transformation.static_schema_args_in) + + # OUT + if transformation.static_schema_args_out: + prospective_out = get_prospective_from_args(transformation.static_schema_args_out) + # elif hasattr(transformation, "prospective_args_out") and transformation.prospective_args_out: + # prospective_out = transformation.prospective_args_out + prospective_prov["data_transformations"].update({ + transformation.dt_name(): { + "input_datasets": [{"attributes": prospective_in}], + "output_datasets": [{"attributes": prospective_out}] + } + }) + return prospective_prov + + +def append_log(retrospective_json: dict, log_dir: str, workflow_name: str, wfexec_id: str): + try: + log_file_path = os.path.abspath( + os.path.join(log_dir, 'prov-{}-wfexec-{}.log'.format(workflow_name, wfexec_id))) + with open(log_file_path, 'a') as f: + f.writelines("{}\n".format(json.dumps([retrospective_json]))) + except Exception as e: + logger.error("Could not save prov logs in " + log_dir + "\n" + str(e)) + pass + + diff --git a/tests/complex_attributes.py b/tests/complex_attributes.py new file mode 100644 index 0000000..6e215a1 --- /dev/null +++ b/tests/complex_attributes.py @@ -0,0 +1,74 @@ +from provlake.prov_lake import ProvLake +from provlake.prov_task import ProvTask + +""" +Very simple example to show how ProvLake is used to instrument a simple python script for provenance data management. +""" + + +prov = ProvLake(workflow_name="example_workflow", online=False, should_log_to_file=True) + +in_args = { + "student_id": { + "type": "list", + "values": [1, 2, 3] + }, + "dict_test_simple": { + "type": "dict", + "values": { + "a": 1 + } + }, + "dict_test_complex": { + "type": "dict", + "values": { + "b": 1, + "subnode": { + "c": 2, + "d": 3 + } + } + }, + "dict_test_super_complex": { + "type": "dict", + "values": { + "b": 1, + "subnode": { + "type": "dict", + "values": { + "e": 4, + "f": 5 + } + } + } + }, + "dict_test_hyper_complex": { + "type": "dict", + "values": { + "b": 1, + "subnode": { + "type": "dict", + "values": { + "e": 4, + "f": 5, + "subsubnode": { + "type": "dict", + "values": { + "y": 4, + "x": 5 + } + } + } + } + } + }, + "ordinary_list": [1,2,3], + "ordinary_dict": {"a":"b"} +} +with ProvTask(prov, "act_1", in_args) as prov_task: + + + out_args = {"out": 50} + prov_task.output(out_args) + +prov.close() diff --git a/tests/easing_complex_attributes.py b/tests/easing_complex_attributes.py new file mode 100644 index 0000000..ac3b937 --- /dev/null +++ b/tests/easing_complex_attributes.py @@ -0,0 +1,67 @@ +from provlake.prov_lake import ProvLake +from provlake.prov_task import ProvTask +from provlake.utils.args_handler import get_dict, get_list, get_recursive_dicts +import sys +import json +""" +Very simple example to show how ProvLake is used to instrument a simple python script for provenance data management. +""" + + +prov = ProvLake(workflow_name="example_workflow", online=False, should_log_to_file=True) + +in_args = { + "student_id": get_list([1,2,3]), + "dict_test_simple": get_dict({ + "a": 1 + }), + "dict_test_complex": get_dict({ + "b": 1, + "subnode": { + "c": 2, + "d": 3 + } + }), + "dict_test_super_complex": get_recursive_dicts({ + "b": 1, + "subnode": { + "e": 4, + "f": 5 + } + }), + "dict_test_hyper_complex": get_recursive_dicts({ + "b": 1, + "mylist0": [1,2,3], + "mybiglist": [ 0.3432443227822222223, 0.4442880900021000000042, 0.47777722777782223332212], + "mymatrix0": [[1,2,3],[1,2,3],[1,2,3]], + "subnode": { + "e": 4, + "f": 5, + "mylist2": [1,2,3], + "subsubnode": { + "y": 4, + "x": 5, + "mylist3": [1,2,3], + }, + + + } + }), + "ordinary_list": [1,2,3], + "ordinary_dict": { + "a":"b", + "mylist1": [1,2,3], + "mylist2": [[1,2,3],[1,2,3],[1,2,3]] + }, + "list_of_dicts": [ + {"a": 1}, + {"b": 2}, + ] +} +with ProvTask(prov, "act_1", in_args) as prov_task: + + + out_args = {"out": 50} + prov_task.output(out_args) + +prov.close() diff --git a/tests/prospective_generator.py b/tests/prospective_generator.py new file mode 100644 index 0000000..206be7c --- /dev/null +++ b/tests/prospective_generator.py @@ -0,0 +1,191 @@ +import tests.offline_simple_synthetic_workflow_tests as workflow_script +import prospective_generator_test as wf_script2 +import seismic_importer as wf_script +import dis +import inspect +import ast +import json +import traceback +from enum import Enum + +class UnknownValue(Enum): + FUNCTION_CALL = 1 + +dtypes = { + int: "integer", + float: "float", + ast.List: "list", + list: "list", + ast.Str: "string", + str: "string", + ast.Dict: "complex_attribute", + dict: "complex_attribute", + bool: "boolean", + UnknownValue.FUNCTION_CALL: "" +} + + +def add_default_attributes(data_transformations): + prospective_json = dict() + prospective_json["context_name"] = "" + prospective_json["dataflow_name"] = "" + prospective_json["description"] = "" + prospective_json["storages"] = {} + prospective_json["data_transformations"] = data_transformations + return prospective_json + +def get_value(ast_value, simple_attributes_dict=None): + if isinstance(ast_value, ast.Num): + return ast_value.n + elif isinstance(ast_value, ast.Str): + return ast_value.s + elif isinstance(ast_value, ast.List): + return [get_value(v) for v in ast_value.elts] + elif isinstance(ast_value, ast.Name): + return simple_attributes_dict[ast_value.id] + elif isinstance(ast_value, ast.Call): + return UnknownValue.FUNCTION_CALL + elif isinstance(ast_value, ast.NameConstant): + return ast_value.value + else: + return ast_value + +def add_semantics(attributes: list, attr_type): + for attr in attributes: + if "in" in attr_type: + attr["semantics"] = "PARAMETER" + else: + attr["semantics"] = "OUTPUT_VALUE" + + if attr["dtype"] == "list" and attr["elementdtype"] == "complex_attribute": + add_semantics(attr["attributes"], attr_type) + +def check_list_attribute(simple_attributes_dict, prospective_chunk, ast_attribute_value): + if isinstance(ast_attribute_value, ast.List) and hasattr(ast_attribute_value, 'elts'): + attribute = ast_attribute_value.elts + else: + attribute = ast_attribute_value + + if isinstance(attribute, list) and len(attribute) > 0: + try: + element_value = get_value(attribute[0]) + elementdtype = dtypes[type(element_value)] + prospective_chunk["elementdtype"] = elementdtype + if elementdtype == "complex_attribute": + prospective_chunk["attributes"] = get_dataset_attributes(simple_attributes_dict, attribute[0]) + except AttributeError: + traceback.print_exc() + return prospective_chunk + + +def create_attribute(simple_attributes_dict, key: str, value): + attribute = dict() + attribute["name"] = key + + v = get_value(value, simple_attributes_dict) + if isinstance(value, ast.Call) or isinstance(v, type(UnknownValue.FUNCTION_CALL)): + attribute["dtype"] = dtypes[v] + else: + attribute["dtype"] = dtypes[type(v)] + + attribute = check_list_attribute(simple_attributes_dict, attribute, value) + # attribute["description"] = "" + return attribute + +def get_dataset_attributes(simple_attributes_dict, body_value, body_target=None): + attributes = list() + if isinstance(body_value, ast.Dict): + for k, v in zip(body_value.keys, body_value.values): + attributes.append(create_attribute(simple_attributes_dict, k.s, v)) + + elif isinstance(body_target, ast.Subscript) and isinstance(body_value, ast.Name): + key = body_target.slice.value + value = simple_attributes_dict[body_value.id] + attributes.append(create_attribute(simple_attributes_dict, key.s, value)) + + elif not isinstance(body_value, ast.Call): + key = body_target.slice.value + attributes.append(create_attribute(simple_attributes_dict, key.s, body_value)) + return attributes + + +def update_attributes_dict(complex_attr_by_variable, variable_name, simple_attributes_dict, value, target): + if not complex_attr_by_variable.__contains__(variable_name): + complex_attr_by_variable[variable_name] = list() + attributes = get_dataset_attributes(simple_attributes_dict, value, target) + complex_attr_by_variable[variable_name].extend(attributes) + return complex_attr_by_variable + + +def check_ast_instance(ast_body_instance, complex_attr_by_variable, simple_attributes_dict): + if isinstance(ast_body_instance, ast.Assign) and hasattr(ast_body_instance, "value"): + for target in ast_body_instance.targets: + value = ast_body_instance.value + + if isinstance(target, ast.Subscript): + variable_name = target.value.id + complex_attr_by_variable = update_attributes_dict(complex_attr_by_variable, variable_name, + simple_attributes_dict, value, target) + elif isinstance(target, ast.Name): + variable_name = target.id + if isinstance(value, ast.Dict): + complex_attr_by_variable = update_attributes_dict(complex_attr_by_variable, variable_name, + simple_attributes_dict, value, target) + else: + simple_attributes_dict[variable_name] = get_value(value) + + +def create_dataset(complex_attr_by_variable, used_args_variable_name, dt_name, data_transformations_json, args_type): + dataset_type = "input_datasets" if "input" in args_type.lower() else "output_datasets" + if not data_transformations_json.__contains__(dt_name): + data_transformations_json[dt_name] = dict() + data_transformations_json[dt_name]["input_datasets"] = [] + data_transformations_json[dt_name]["output_datasets"] = [] + + if len(complex_attr_by_variable[used_args_variable_name]) > 0 and dt_name != None: + add_semantics(complex_attr_by_variable[used_args_variable_name], args_type) + attributes_json = dict() + attributes_json["attributes"] = complex_attr_by_variable[used_args_variable_name] + data_transformations_json[dt_name][dataset_type] = [attributes_json] + + +def create_prospective_json(function_name, file_name: str= "prospective.json"): + workflow_src = inspect.getsource(function_name) + workflow_ast = ast.parse(workflow_src) + data_transformations_json = dict() + data_transformation_name = "" + + for c1, wk_outer_body in enumerate(workflow_ast.body): + values_by_variable_name = dict() + simple_attributes = dict() + for c2, wk_inner_body in enumerate(wk_outer_body.body): + check_ast_instance(wk_inner_body, values_by_variable_name, simple_attributes) + + # Inspect function call "With ProvTask(...)" + if isinstance(wk_inner_body, ast.With): + for input_item in wk_inner_body.items: + if len(input_item.context_expr.args) == 3: + if input_item.context_expr.func.id == 'ProvTask': + data_transformation_name = input_item.context_expr.args[1].s + input_args_variable_name = input_item.context_expr.args[2].id + create_dataset(values_by_variable_name, input_args_variable_name, + data_transformation_name, data_transformations_json, "input") + + for wk_inner_with_body in wk_inner_body.body: + check_ast_instance(wk_inner_with_body, values_by_variable_name, simple_attributes) + if isinstance(wk_inner_with_body, ast.Expr) and isinstance(wk_inner_with_body.value, ast.Call): + if wk_inner_with_body.value.func.attr == "output": + output_args_variable_name = wk_inner_with_body.value.args[0].id + create_dataset(values_by_variable_name, output_args_variable_name, + data_transformation_name, data_transformations_json, "output") + + if len(data_transformations_json) > 0: + prospective_json = add_default_attributes(data_transformations_json) + + print("\n\t\tJSON: {}\n\t\t\t".format(type(prospective_json)),prospective_json) + print(prospective_json) + with open(file_name, "w") as f: + json.dump(prospective_json, f, indent=4) + +if __name__ == '__main__': + create_prospective_json(wf_script2.run_workflow, "prospective.json") diff --git a/tests/simple_example.py b/tests/simple_example.py index 155f4e3..dc80f2d 100644 --- a/tests/simple_example.py +++ b/tests/simple_example.py @@ -15,7 +15,7 @@ def calc_factorial(n): return result -prov = ProvLake(dataflow_name="factorial_dataflow", online=False, should_log_to_file=True) +prov = ProvLake(workflow_name="factorial_dataflow", online=False, should_log_to_file=True) in_args = {"n": 5} with ProvTask(prov, "factorial_number", in_args) as prov_task: @@ -25,4 +25,20 @@ def calc_factorial(n): out_args = {"factorial": factorial} prov_task.output(out_args) + + +with ProvTask(prov, "factorial_number", in_args) as prov_task: + + factorial = calc_factorial(in_args.get("n")) + + out_args = {"factorial": factorial} + prov_task.output(out_args) + + + + + + + + prov.close()