From 0b27dc2e1cb015ea72376e81240f7b93f31c1086 Mon Sep 17 00:00:00 2001 From: Renan Souza Date: Wed, 31 Mar 2021 14:12:22 -0300 Subject: [PATCH 01/13] Facilitating the track of data references --- src/provlake/capture/__init__.py | 2 +- src/provlake/utils/args_handler.py | 34 ++++++++++++++++++++++++------ src/provlake/utils/constants.py | 6 ++++-- tests/complex_attributes.py | 3 ++- tests/easing_complex_attributes.py | 17 +++++++++++++-- tests/simple_example.py | 2 +- 6 files changed, 51 insertions(+), 13 deletions(-) diff --git a/src/provlake/capture/__init__.py b/src/provlake/capture/__init__.py index b469e48..b5aab1d 100644 --- a/src/provlake/capture/__init__.py +++ b/src/provlake/capture/__init__.py @@ -81,7 +81,7 @@ class ProvTask(ActivityCapture): def __init__(self, prov_persister: Persister, data_transformation_name: str, input_args=dict(), parent_cycle_name: str = None, parent_cycle_iteration=None, person_id: str = None, task_id=None, - custom_metadata:dict=None, attribute_associations:dict=None, generated_time:float=None): + custom_metadata: dict = None, attribute_associations: dict = None, generated_time: float=None): super().__init__(prov_persister, custom_metadata) if self._prov_persister is None: return diff --git a/src/provlake/utils/args_handler.py b/src/provlake/utils/args_handler.py index be58f99..5cd17af 100644 --- a/src/provlake/utils/args_handler.py +++ b/src/provlake/utils/args_handler.py @@ -1,26 +1,29 @@ -def get_dict(_dict:dict) -> dict: +from provlake.utils.constants import Vocabulary + +# TODO Finish adding the keys in the returning dicts as constants +def get_dict(_dict: dict) -> dict: if _dict is None: return None if len(_dict) == 0: return {} return { - "type": "dict", + Vocabulary.PROV_ATTR_TYPE: "dict", "values": _dict } -def get_list(_list:list) -> dict: +def get_list(_list: list) -> dict: if _list is None: return None if len(_list) == 0: return [] return { - "type": "list", + Vocabulary.PROV_ATTR_TYPE: "list", "values": _list } -def get_recursive_dicts(_dict:dict) -> dict: +def get_recursive_dicts(_dict: dict) -> dict: if _dict is None: return None if len(_dict) == 0: @@ -33,6 +36,25 @@ def get_recursive_dicts(_dict:dict) -> dict: else: values[k] = v return { - "type": "dict", + Vocabulary.PROV_ATTR_TYPE: "dict", "values": values } + + +def add_custom_metadata(value, custom_metadata: dict = None) -> dict: + if not custom_metadata: + return value + return { + Vocabulary.PROV_ATTR_TYPE: "attribute_value", + "values": value, + "custom_metadata": custom_metadata + } + + +def get_data_reference(value, datastore_id) -> dict: + return { + "values": value, + Vocabulary.PROV_ATTR_TYPE: Vocabulary.DATA_REFERENCE_TYPE, + "datastore_id": datastore_id + } + diff --git a/src/provlake/utils/constants.py b/src/provlake/utils/constants.py index 68143c7..491a135 100644 --- a/src/provlake/utils/constants.py +++ b/src/provlake/utils/constants.py @@ -19,7 +19,9 @@ class Vocabulary: WORKFLOW_NAME = "dataflow_name" ACT_TYPE = "act_type" ATTRIBUTE_ASSOCIATIONS = "attribute_associations" - + DATA_REFERENCE_TYPE = "data_reference" + DATASTORE_ID = "datastore_id" + PROV_ATTR_TYPE = "prov_attr_type" class Status: @@ -79,7 +81,7 @@ def get_wfe_id(workflow_name: str, wf_exec_id): return wfe_id @staticmethod - def get_dte_id(wfe_id, dt_name:str , prov_task: dict): + def get_dte_id(wfe_id, dt_name: str, prov_task: dict): task_id = prov_task["id"] # TODO better wfe_id + dte_name + timestamp if Vocabulary.GENERATED_TIME in prov_task and \ diff --git a/tests/complex_attributes.py b/tests/complex_attributes.py index 78c37c7..19eef39 100644 --- a/tests/complex_attributes.py +++ b/tests/complex_attributes.py @@ -63,7 +63,8 @@ } }, "ordinary_list": [1,2,3], - "ordinary_dict": {"a":"b"} + "ordinary_dict": {"a": "b"} + } with ProvTask(prov, "act_1", in_args) as prov_task: diff --git a/tests/easing_complex_attributes.py b/tests/easing_complex_attributes.py index 5610387..fe0e9ae 100644 --- a/tests/easing_complex_attributes.py +++ b/tests/easing_complex_attributes.py @@ -1,6 +1,6 @@ from provlake import ProvLake from provlake.capture import ProvWorkflow, ProvTask -from provlake.utils.args_handler import get_dict, get_list, get_recursive_dicts +from provlake.utils.args_handler import get_dict, get_list, get_recursive_dicts, get_data_reference, add_custom_metadata import sys import json """ @@ -57,7 +57,20 @@ "list_of_dicts": [ {"a": 1}, {"b": 2}, - ] + ], + "seismic_uri": get_data_reference("http://ibm.org/Netherlands", datastore_id="Allegro1"), + "mysimple_arg": add_custom_metadata(value="myval", custom_metadata={"custom": 1}), + "mysimple_arg2": add_custom_metadata(value={"myval2": "val2"}, custom_metadata={"custom": 1}), + "mydict_list": get_list([ + { + "name": "a", + "path": "/home/data/file_a.dat" + }, + { + "name": "b", + "path": "/home/data/file_b.dat" + } + ]) } with ProvTask(prov, "act_1", in_args) as prov_task: out_args = {"out": 50} diff --git a/tests/simple_example.py b/tests/simple_example.py index 3044f4d..2b8c5b9 100644 --- a/tests/simple_example.py +++ b/tests/simple_example.py @@ -15,7 +15,7 @@ def calc_factorial(n): return result -prov = ProvLake.get_persister("factorial_dataflow") +prov = ProvLake.get_persister("factorial_dataflow", service_url="http://localhost:5000") with ProvWorkflow(prov): in_args = {"n": 5} From 11cf86f42d8f9702795a1e773fcf38548aa693f9 Mon Sep 17 00:00:00 2001 From: Renan Souza Date: Thu, 1 Apr 2021 00:02:54 -0300 Subject: [PATCH 02/13] Refactor to fix lists of dicts and improving tests --- src/provlake/__init__.py | 11 +---- src/provlake/persistence/managed_persister.py | 43 ++++++++++++++----- src/provlake/persistence/persister.py | 5 +++ src/provlake/utils/args_handler.py | 26 +++++------ src/provlake/utils/constants.py | 3 ++ src/provlake/utils/prov_utils.py | 23 ++++++++-- 6 files changed, 76 insertions(+), 35 deletions(-) diff --git a/src/provlake/__init__.py b/src/provlake/__init__.py index 3a8ea77..1b9af89 100644 --- a/src/provlake/__init__.py +++ b/src/provlake/__init__.py @@ -42,16 +42,6 @@ def _build_managed_persister( assert should_send_to_file is True, "If you are using ProvLake in offline mode, " \ "you need to log prov data to file. Check your 'should_send_to_file' and " \ "'should_send_to_service' parameters." - if should_send_to_file: - if not os.path.exists(log_dir): - os.makedirs(os.path.join(os.getcwd(), log_dir)) - - offline_prov_log_path = StandardNamesAndIds.get_prov_log_file_path(log_dir, workflow_name, wf_start_time) - handler = logging.FileHandler(offline_prov_log_path, mode='a+', delay=False) - offline_prov_log = logging.getLogger("OFFLINE_PROV") - offline_prov_log.setLevel("DEBUG") - offline_prov_log.addHandler(handler) - #should_send_to_file = True return ManagedPersister( workflow_name=workflow_name, @@ -63,6 +53,7 @@ def _build_managed_persister( db_name=db_name, bag_size=bag_size, should_send_to_file=should_send_to_file, + log_dir=log_dir, should_send_to_service=should_send_to_service) @staticmethod diff --git a/src/provlake/persistence/managed_persister.py b/src/provlake/persistence/managed_persister.py index 8acc30f..5d6c833 100644 --- a/src/provlake/persistence/managed_persister.py +++ b/src/provlake/persistence/managed_persister.py @@ -1,17 +1,21 @@ +import os +from typing import List +import json +import logging import traceback -from requests_futures.sessions import FuturesSession from time import sleep -from urllib.parse import urljoin -import json -from requests.exceptions import ConnectionError import urllib3 -from typing import List + +from requests.exceptions import ConnectionError +from requests_futures.sessions import FuturesSession +from urllib.parse import urljoin + from provlake.persistence.persister import Persister -urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) from provlake.model.activity_prov_obj import ProvRequestObj -import logging +from provlake.utils.constants import StandardNamesAndIds + +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) -offline_prov_log = logging.getLogger("OFFLINE_PROV") logger = logging.getLogger('PROV') @@ -19,7 +23,7 @@ class ManagedPersister(Persister): def __init__(self, workflow_name: str, wf_start_time: float, service_url: str, wf_exec_id=None, context: str = None, with_validation: bool = False, db_name: str = None, bag_size: int = 1, - should_send_to_file: bool = False, should_send_to_service: bool = True): + log_dir:str='.', should_send_to_file: bool = False, should_send_to_service: bool = True): super().__init__(workflow_name, wf_start_time, wf_exec_id) self.retrospective_url = urljoin(service_url, "retrospective-provenance") self.prospective_url = urljoin(service_url, "prospective-provenance") @@ -36,6 +40,22 @@ def __init__(self, workflow_name: str, wf_start_time: float, service_url: str, w logger.debug("You are using the Service URL: " + service_url) self.session = FuturesSession() + if self.should_send_to_file : + if not os.path.exists(log_dir): + os.makedirs(os.path.join(os.getcwd(), log_dir)) + + self.log_file_path = StandardNamesAndIds.get_prov_log_file_path(log_dir, workflow_name, + wf_start_time) + handler = logging.FileHandler(self.log_file_path, mode='a+', delay=False) + self.offline_prov_log = logging.getLogger("OFFLINE_PROV") + self.offline_prov_log = logging.getLogger("OFFLINE_PROV") + self.offline_prov_log.setLevel("DEBUG") + self.offline_prov_log.addHandler(handler) + # should_send_to_file = True + + def get_file_path(self): + return self.log_file_path + def add_request(self, persistence_request: ProvRequestObj): try: request_data = persistence_request.as_dict() @@ -52,6 +72,9 @@ def add_request(self, persistence_request: ProvRequestObj): traceback.print_exc() pass + def get_file_path(self): + return self.log_file_path + def _close(self): if self.session: logger.info("Waiting to get response from all submitted provenance tasks...") @@ -78,7 +101,7 @@ def _flush(self, all_and_wait: bool = False): logger.debug("Going to flush a part. Flushing " + str(len(to_flush)) + " out of " + str(len(self.requests_queue))) if self.should_send_to_file: - offline_prov_log.debug(json.dumps(to_flush)) + self.offline_prov_log.debug(json.dumps(to_flush)) if self.should_send_to_service: self._send_to_service(to_flush) diff --git a/src/provlake/persistence/persister.py b/src/provlake/persistence/persister.py index d047dba..33d640c 100644 --- a/src/provlake/persistence/persister.py +++ b/src/provlake/persistence/persister.py @@ -17,6 +17,10 @@ def __init__(self, workflow_name: str, wf_start_time: float, wf_exec_id=None): def add_request(self, persistence_request_obj: ProvRequestObj): raise NotImplementedError + @abstractmethod + def get_file_path(self): + return NotImplementedError + def _close(self): pass @@ -28,3 +32,4 @@ def get_workflow_name(self) -> str: def get_wf_exec_id(self) -> float: return self._wf_exec_id + diff --git a/src/provlake/utils/args_handler.py b/src/provlake/utils/args_handler.py index 5cd17af..184281d 100644 --- a/src/provlake/utils/args_handler.py +++ b/src/provlake/utils/args_handler.py @@ -1,25 +1,27 @@ from provlake.utils.constants import Vocabulary -# TODO Finish adding the keys in the returning dicts as constants def get_dict(_dict: dict) -> dict: if _dict is None: return None if len(_dict) == 0: return {} return { - Vocabulary.PROV_ATTR_TYPE: "dict", - "values": _dict + Vocabulary.PROV_ATTR_TYPE: Vocabulary.DICT_TYPE, + Vocabulary.VALUES: _dict } def get_list(_list: list) -> dict: + ''' + This will make ProvLake create one node per element in the list + ''' if _list is None: return None if len(_list) == 0: return [] return { - Vocabulary.PROV_ATTR_TYPE: "list", - "values": _list + Vocabulary.PROV_ATTR_TYPE: Vocabulary.LIST_TYPE, + Vocabulary.VALUES: _list } @@ -36,8 +38,8 @@ def get_recursive_dicts(_dict: dict) -> dict: else: values[k] = v return { - Vocabulary.PROV_ATTR_TYPE: "dict", - "values": values + Vocabulary.PROV_ATTR_TYPE: Vocabulary.DICT_TYPE, + Vocabulary.VALUES: values } @@ -45,16 +47,16 @@ def add_custom_metadata(value, custom_metadata: dict = None) -> dict: if not custom_metadata: return value return { - Vocabulary.PROV_ATTR_TYPE: "attribute_value", - "values": value, - "custom_metadata": custom_metadata + Vocabulary.PROV_ATTR_TYPE: Vocabulary.ATTRIBUTE_VALUE_TYPE, + Vocabulary.VALUES: value, + Vocabulary.CUSTOM_METADATA: custom_metadata } def get_data_reference(value, datastore_id) -> dict: return { - "values": value, + Vocabulary.VALUES: value, Vocabulary.PROV_ATTR_TYPE: Vocabulary.DATA_REFERENCE_TYPE, - "datastore_id": datastore_id + Vocabulary.DATASTORE_ID: datastore_id } diff --git a/src/provlake/utils/constants.py b/src/provlake/utils/constants.py index 491a135..b9d3e95 100644 --- a/src/provlake/utils/constants.py +++ b/src/provlake/utils/constants.py @@ -20,6 +20,9 @@ class Vocabulary: ACT_TYPE = "act_type" ATTRIBUTE_ASSOCIATIONS = "attribute_associations" DATA_REFERENCE_TYPE = "data_reference" + ATTRIBUTE_VALUE_TYPE = "attribute_value" + DICT_TYPE = "dict" + LIST_TYPE = "list" DATASTORE_ID = "datastore_id" PROV_ATTR_TYPE = "prov_attr_type" diff --git a/src/provlake/utils/prov_utils.py b/src/provlake/utils/prov_utils.py index 9d5d758..a2b4d9e 100644 --- a/src/provlake/utils/prov_utils.py +++ b/src/provlake/utils/prov_utils.py @@ -2,19 +2,36 @@ import hashlib import os import glob +import json -def convert_timestamp(timestamp: float): +def convert_timestamp(timestamp: float) -> str: t = datetime.fromtimestamp(timestamp) return t.strftime('%Y%m%dT%Hh%Mm%Ss%f')[:-3] -def id_hash(val: str): +def id_hash(val: str) -> str: return hashlib.md5(val.encode()).hexdigest() -def delete_prov_logs(log_dir: str = '.'): +def delete_prov_logs(log_dir: str = '.') -> None: try: [os.remove(f) for f in glob.glob(os.path.join(log_dir, 'prov-*.log'))] except: pass + + +def stringfy_inner_dicts_in_dicts(_dict: dict) -> dict: + ret = dict() + for k in _dict: + if type(_dict[k]) == dict: + ret[k] = json.dumps(_dict[k]) + else: + ret[k] = _dict[k] + return ret + + +def stringfy_inner_dicts_in_lists(_list: list) -> list: + return [json.dumps(el, indent=JSON_INDENT_SIZE) if type(el) == dict else el for el in _list] + + From fa6778212ad3d303b8ed03dc6d8fc72efea64c93 Mon Sep 17 00:00:00 2001 From: Leonardo Guerreiro Azevedo Date: Thu, 1 Apr 2021 15:10:13 -0300 Subject: [PATCH 03/13] changed data_store_id type in the vocabulary --- src/provlake/utils/constants.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/provlake/utils/constants.py b/src/provlake/utils/constants.py index b9d3e95..4803c73 100644 --- a/src/provlake/utils/constants.py +++ b/src/provlake/utils/constants.py @@ -23,7 +23,7 @@ class Vocabulary: ATTRIBUTE_VALUE_TYPE = "attribute_value" DICT_TYPE = "dict" LIST_TYPE = "list" - DATASTORE_ID = "datastore_id" + DATA_STORE_ID = "data_store_id" PROV_ATTR_TYPE = "prov_attr_type" class Status: From e48c2806baef65076da0b62d41b3e024a6a5932e Mon Sep 17 00:00:00 2001 From: Raphael Melo Thiago Date: Tue, 6 Apr 2021 10:37:46 -0300 Subject: [PATCH 04/13] Fixing missing constant JSON_INDENT_SIZE --- src/provlake/utils/prov_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/provlake/utils/prov_utils.py b/src/provlake/utils/prov_utils.py index a2b4d9e..4fc26e1 100644 --- a/src/provlake/utils/prov_utils.py +++ b/src/provlake/utils/prov_utils.py @@ -4,6 +4,8 @@ import glob import json +JSON_INDENT_SIZE = 1 + def convert_timestamp(timestamp: float) -> str: t = datetime.fromtimestamp(timestamp) @@ -33,5 +35,3 @@ def stringfy_inner_dicts_in_dicts(_dict: dict) -> dict: def stringfy_inner_dicts_in_lists(_list: list) -> list: return [json.dumps(el, indent=JSON_INDENT_SIZE) if type(el) == dict else el for el in _list] - - From 3cbd9dc8b046ccfc685f253930d293f8d6cf4334 Mon Sep 17 00:00:00 2001 From: Raphael Melo Thiago Date: Tue, 6 Apr 2021 10:52:18 -0300 Subject: [PATCH 05/13] bumping version --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 2c6a180..997f1dc 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ setup(name='provlake', - version='0.1.3', + version='0.1.4', description='A Python lib to capture multiworkflow provenance data from Python Scripts', url='http://ibm.biz/provlake', author='IBM Research', From b31031a28f06dfc73b09c8b59e91d4eb496cc1a6 Mon Sep 17 00:00:00 2001 From: Leonardo Guerreiro Azevedo Date: Mon, 12 Apr 2021 10:52:12 -0300 Subject: [PATCH 06/13] fixes in the use of Vocabulary DATA_STORE_ID property. --- src/provlake/utils/args_handler.py | 4 ++-- tests/easing_complex_attributes.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/provlake/utils/args_handler.py b/src/provlake/utils/args_handler.py index 184281d..5e40beb 100644 --- a/src/provlake/utils/args_handler.py +++ b/src/provlake/utils/args_handler.py @@ -53,10 +53,10 @@ def add_custom_metadata(value, custom_metadata: dict = None) -> dict: } -def get_data_reference(value, datastore_id) -> dict: +def get_data_reference(value, data_store_id) -> dict: return { Vocabulary.VALUES: value, Vocabulary.PROV_ATTR_TYPE: Vocabulary.DATA_REFERENCE_TYPE, - Vocabulary.DATASTORE_ID: datastore_id + Vocabulary.DATA_STORE_ID: data_store_id } diff --git a/tests/easing_complex_attributes.py b/tests/easing_complex_attributes.py index fe0e9ae..7066e09 100644 --- a/tests/easing_complex_attributes.py +++ b/tests/easing_complex_attributes.py @@ -58,7 +58,7 @@ {"a": 1}, {"b": 2}, ], - "seismic_uri": get_data_reference("http://ibm.org/Netherlands", datastore_id="Allegro1"), + "seismic_uri": get_data_reference("http://ibm.org/Netherlands", data_store_id="Allegro1"), "mysimple_arg": add_custom_metadata(value="myval", custom_metadata={"custom": 1}), "mysimple_arg2": add_custom_metadata(value={"myval2": "val2"}, custom_metadata={"custom": 1}), "mydict_list": get_list([ From 88c0d21892244fcd2b886c13fba8226b44317a8a Mon Sep 17 00:00:00 2001 From: Renan Souza Date: Wed, 9 Jun 2021 11:37:34 -0300 Subject: [PATCH 07/13] Improving id generation for files --- src/provlake/utils/constants.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/src/provlake/utils/constants.py b/src/provlake/utils/constants.py index 4803c73..7aa20cc 100644 --- a/src/provlake/utils/constants.py +++ b/src/provlake/utils/constants.py @@ -62,12 +62,18 @@ def get_prov_log_file_path(log_dir:str, workflow_name:str, wf_start_time:float) return os.path.abspath(os.path.join(log_dir, 'prov-{}-{}.log'.format(workflow_name, wf_start_time))) @staticmethod - def get_id_atv(attribute, value): - if type(value) in [dict, list]: - return attribute + "_" + id_hash(str(value)) + def get_id_atv(attribute, value, value_type=None): + if value_type: + if value_type == Vocabulary.DATA_REFERENCE_TYPE: + return "data_ref#" + str(value) + else: + return attribute + "_" + str(value) else: - # TODO if its a float, replace the dots - return attribute + "_" + str(value) + if type(value) in [dict, list]: + return attribute + "_" + id_hash(str(value)) + else: + # TODO if its a float, replace the dots + return attribute + "_" + str(value) @staticmethod def get_wfe_id(workflow_name: str, wf_exec_id): From 9f301a85d6a93198213aa5dfe92cfdf14284b470 Mon Sep 17 00:00:00 2001 From: Renan Souza Date: Fri, 16 Jul 2021 20:17:26 -0300 Subject: [PATCH 08/13] Using UTC for time format exchange --- src/provlake/utils/prov_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/provlake/utils/prov_utils.py b/src/provlake/utils/prov_utils.py index 4fc26e1..2b75619 100644 --- a/src/provlake/utils/prov_utils.py +++ b/src/provlake/utils/prov_utils.py @@ -8,7 +8,7 @@ def convert_timestamp(timestamp: float) -> str: - t = datetime.fromtimestamp(timestamp) + t = datetime.utcfromtimestamp(timestamp) return t.strftime('%Y%m%dT%Hh%Mm%Ss%f')[:-3] From db84b489f3cedbf7b6cd3188031f2866d999670a Mon Sep 17 00:00:00 2001 From: Renan Souza Date: Wed, 21 Jul 2021 18:27:30 -0300 Subject: [PATCH 09/13] Adding new constantsg --- src/provlake/utils/constants.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/provlake/utils/constants.py b/src/provlake/utils/constants.py index 7aa20cc..7d82ad7 100644 --- a/src/provlake/utils/constants.py +++ b/src/provlake/utils/constants.py @@ -105,3 +105,11 @@ def get_dte_id(wfe_id, dt_name: str, prov_task: dict): dte_id = wfe_id + "_" + dt_name + "_" + str(task_id) return dte_id + + @staticmethod + def get_dtes_in_wfe_ctx(wfe_id): + return wfe_id + "dtes" + + @staticmethod + def get_cycles_in_wfe_ctx(wfe_id): + return wfe_id + "ce" From b280716058acb3aa414239480ea44ed09e18bec2 Mon Sep 17 00:00:00 2001 From: Renan Souza Date: Thu, 22 Jul 2021 16:30:38 -0300 Subject: [PATCH 10/13] minor adjustment in dte ctx id --- src/provlake/utils/constants.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/provlake/utils/constants.py b/src/provlake/utils/constants.py index 7d82ad7..37e6171 100644 --- a/src/provlake/utils/constants.py +++ b/src/provlake/utils/constants.py @@ -26,6 +26,7 @@ class Vocabulary: DATA_STORE_ID = "data_store_id" PROV_ATTR_TYPE = "prov_attr_type" + class Status: GENERATED = "GENERATED" @@ -108,8 +109,8 @@ def get_dte_id(wfe_id, dt_name: str, prov_task: dict): @staticmethod def get_dtes_in_wfe_ctx(wfe_id): - return wfe_id + "dtes" + return wfe_id + "_dtes" @staticmethod def get_cycles_in_wfe_ctx(wfe_id): - return wfe_id + "ce" + return wfe_id + "_ce" From 6e433eb72d11684798f5698c3596ca51863fa65b Mon Sep 17 00:00:00 2001 From: Renan Souza Date: Tue, 3 Aug 2021 14:13:09 -0300 Subject: [PATCH 11/13] removing 'data_ref' from data ref ids --- src/provlake/utils/constants.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/provlake/utils/constants.py b/src/provlake/utils/constants.py index 37e6171..e189b55 100644 --- a/src/provlake/utils/constants.py +++ b/src/provlake/utils/constants.py @@ -66,7 +66,7 @@ def get_prov_log_file_path(log_dir:str, workflow_name:str, wf_start_time:float) def get_id_atv(attribute, value, value_type=None): if value_type: if value_type == Vocabulary.DATA_REFERENCE_TYPE: - return "data_ref#" + str(value) + return "" + str(value) else: return attribute + "_" + str(value) else: From bb69ca35d00591709b998e07d9bf4322294881ea Mon Sep 17 00:00:00 2001 From: Renan Souza Date: Wed, 4 Aug 2021 18:06:56 -0300 Subject: [PATCH 12/13] Updating constants --- src/provlake/utils/constants.py | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/src/provlake/utils/constants.py b/src/provlake/utils/constants.py index e189b55..a846e0e 100644 --- a/src/provlake/utils/constants.py +++ b/src/provlake/utils/constants.py @@ -90,6 +90,8 @@ def get_wfe_id(workflow_name: str, wf_exec_id): wfe_id = workflow_name.lower() + "_exec_" + str(wf_exec_id) return wfe_id + + @staticmethod def get_dte_id(wfe_id, dt_name: str, prov_task: dict): task_id = prov_task["id"] @@ -108,9 +110,25 @@ def get_dte_id(wfe_id, dt_name: str, prov_task: dict): return dte_id @staticmethod - def get_dtes_in_wfe_ctx(wfe_id): - return wfe_id + "_dtes" + def get_wfe_ctx_id(wfe_id): + return wfe_id+"_ctx" + + @staticmethod + def get_cce_ctx_id(cce_id): + return cce_id + "_ctx" + + @staticmethod + def get_cci_ctx_id(cci_id): + return cci_id + "_ctx" + + @staticmethod + def get_wfe_instantiations_ctx_id(wfe_id): + return wfe_id + "_instantiation" + + @staticmethod + def get_cce_instantiations_ctx_id(cce_id): + return cce_id + "_instantiation" @staticmethod - def get_cycles_in_wfe_ctx(wfe_id): - return wfe_id + "_ce" + def get_cci_instantiations_ctx_id(cci_id): + return cci_id + "_instantiation" From d8502bfcbfaf70d4a4232f7c9d5b32460c715950 Mon Sep 17 00:00:00 2001 From: Renan Souza Date: Thu, 5 Aug 2021 13:39:37 -0300 Subject: [PATCH 13/13] Updating constants --- setup.py | 2 +- src/provlake/__init__.py | 5 ----- src/provlake/utils/constants.py | 12 ++++++------ 3 files changed, 7 insertions(+), 12 deletions(-) diff --git a/setup.py b/setup.py index 997f1dc..e18e65b 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ setup(name='provlake', - version='0.1.4', + version='0.1.5', description='A Python lib to capture multiworkflow provenance data from Python Scripts', url='http://ibm.biz/provlake', author='IBM Research', diff --git a/src/provlake/__init__.py b/src/provlake/__init__.py index 1b9af89..6c0f7d8 100644 --- a/src/provlake/__init__.py +++ b/src/provlake/__init__.py @@ -26,7 +26,6 @@ def _build_managed_persister( db_name: str, wf_exec_id=None ) -> ManagedPersister: - should_send_to_service = False if service_url is not None: should_send_to_service = True @@ -34,10 +33,6 @@ def _build_managed_persister( if not bag_size: bag_size = int(os.getenv("PROV_BAG_SIZE", 1)) - #self.last_task_id = 0 - #task_id = str(self.last_task_id) + "_" + str(id(self)) if self.cores > 1 else str(self.last_task_id) - #self.tasks = dict() - if not should_send_to_service: assert should_send_to_file is True, "If you are using ProvLake in offline mode, " \ "you need to log prov data to file. Check your 'should_send_to_file' and " \ diff --git a/src/provlake/utils/constants.py b/src/provlake/utils/constants.py index a846e0e..73e81da 100644 --- a/src/provlake/utils/constants.py +++ b/src/provlake/utils/constants.py @@ -111,24 +111,24 @@ def get_dte_id(wfe_id, dt_name: str, prov_task: dict): @staticmethod def get_wfe_ctx_id(wfe_id): - return wfe_id+"_ctx" + return wfe_id+"_wfe_ctx" @staticmethod def get_cce_ctx_id(cce_id): - return cce_id + "_ctx" + return cce_id + "_cce_ctx" @staticmethod def get_cci_ctx_id(cci_id): - return cci_id + "_ctx" + return cci_id + "_cci_ctx" @staticmethod def get_wfe_instantiations_ctx_id(wfe_id): - return wfe_id + "_instantiation" + return wfe_id + "_wfe_instantiation_ctx" @staticmethod def get_cce_instantiations_ctx_id(cce_id): - return cce_id + "_instantiation" + return cce_id + "_cce_instantiation_ctx" @staticmethod def get_cci_instantiations_ctx_id(cci_id): - return cci_id + "_instantiation" + return cci_id + "_cci_instantiation_ctx"