Merge pull request #10 from IBM/dev
Stable into Master
renan-souza authored Aug 11, 2021
2 parents bb1b70e + d8502bf commit 8cf0a2f
Showing 11 changed files with 157 additions and 50 deletions.
2 changes: 1 addition & 1 deletion setup.py
@@ -6,7 +6,7 @@


setup(name='provlake',
version='0.1.3',
version='0.1.5',
description='A Python lib to capture multiworkflow provenance data from Python Scripts',
url='http://ibm.biz/provlake',
author='IBM Research',
16 changes: 1 addition & 15 deletions src/provlake/__init__.py
@@ -26,32 +26,17 @@ def _build_managed_persister(
db_name: str,
wf_exec_id=None
) -> ManagedPersister:

should_send_to_service = False
if service_url is not None:
should_send_to_service = True

if not bag_size:
bag_size = int(os.getenv("PROV_BAG_SIZE", 1))

#self.last_task_id = 0
#task_id = str(self.last_task_id) + "_" + str(id(self)) if self.cores > 1 else str(self.last_task_id)
#self.tasks = dict()

if not should_send_to_service:
assert should_send_to_file is True, "If you are using ProvLake in offline mode, " \
"you need to log prov data to file. Check your 'should_send_to_file' and " \
"'should_send_to_service' parameters."
if should_send_to_file:
if not os.path.exists(log_dir):
os.makedirs(os.path.join(os.getcwd(), log_dir))

offline_prov_log_path = StandardNamesAndIds.get_prov_log_file_path(log_dir, workflow_name, wf_start_time)
handler = logging.FileHandler(offline_prov_log_path, mode='a+', delay=False)
offline_prov_log = logging.getLogger("OFFLINE_PROV")
offline_prov_log.setLevel("DEBUG")
offline_prov_log.addHandler(handler)
#should_send_to_file = True

return ManagedPersister(
workflow_name=workflow_name,
@@ -63,6 +48,7 @@ def _build_managed_persister(
db_name=db_name,
bag_size=bag_size,
should_send_to_file=should_send_to_file,
log_dir=log_dir,
should_send_to_service=should_send_to_service)

@staticmethod
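The net effect in this file: _build_managed_persister no longer configures the OFFLINE_PROV file handler itself; it forwards log_dir to ManagedPersister, which now owns that setup (see managed_persister.py below). A minimal offline-mode sketch, assuming get_persister forwards should_send_to_file and log_dir to this builder; the workflow name and values are illustrative, and prov_task.end is assumed from the library's capture API:

from provlake import ProvLake
from provlake.capture import ProvWorkflow, ProvTask

# Without a service_url, the assert above requires should_send_to_file=True,
# so provenance is written to prov_logs/prov-toy_dataflow-<wf_start_time>.log
prov = ProvLake.get_persister("toy_dataflow",
                              should_send_to_file=True,
                              log_dir="prov_logs")
with ProvWorkflow(prov):
    with ProvTask(prov, "act_1", {"n": 5}) as prov_task:
        prov_task.end({"out": 120})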
2 changes: 1 addition & 1 deletion src/provlake/capture/__init__.py
@@ -81,7 +81,7 @@ class ProvTask(ActivityCapture):

def __init__(self, prov_persister: Persister, data_transformation_name: str, input_args=dict(),
parent_cycle_name: str = None, parent_cycle_iteration=None, person_id: str = None, task_id=None,
custom_metadata:dict=None, attribute_associations:dict=None, generated_time:float=None):
custom_metadata: dict = None, attribute_associations: dict = None, generated_time: float = None):
super().__init__(prov_persister, custom_metadata)
if self._prov_persister is None:
return
43 changes: 33 additions & 10 deletions src/provlake/persistence/managed_persister.py
@@ -1,25 +1,29 @@
import os
from typing import List
import json
import logging
import traceback
from requests_futures.sessions import FuturesSession
from time import sleep
from urllib.parse import urljoin
import json
from requests.exceptions import ConnectionError
import urllib3
from typing import List

from requests.exceptions import ConnectionError
from requests_futures.sessions import FuturesSession
from urllib.parse import urljoin

from provlake.persistence.persister import Persister
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
from provlake.model.activity_prov_obj import ProvRequestObj
import logging
from provlake.utils.constants import StandardNamesAndIds

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

offline_prov_log = logging.getLogger("OFFLINE_PROV")
logger = logging.getLogger('PROV')


class ManagedPersister(Persister):

def __init__(self, workflow_name: str, wf_start_time: float, service_url: str, wf_exec_id=None, context: str = None,
with_validation: bool = False, db_name: str = None, bag_size: int = 1,
should_send_to_file: bool = False, should_send_to_service: bool = True):
log_dir: str = '.', should_send_to_file: bool = False, should_send_to_service: bool = True):
super().__init__(workflow_name, wf_start_time, wf_exec_id)
self.retrospective_url = urljoin(service_url, "retrospective-provenance")
self.prospective_url = urljoin(service_url, "prospective-provenance")
@@ -36,6 +40,22 @@ def __init__(self, workflow_name: str, wf_start_time: float, service_url: str, wf_exec_id=None, context: str = None,
logger.debug("You are using the Service URL: " + service_url)
self.session = FuturesSession()

if self.should_send_to_file:
if not os.path.exists(log_dir):
os.makedirs(os.path.join(os.getcwd(), log_dir))

self.log_file_path = StandardNamesAndIds.get_prov_log_file_path(log_dir, workflow_name, wf_start_time)
handler = logging.FileHandler(self.log_file_path, mode='a+', delay=False)
self.offline_prov_log = logging.getLogger("OFFLINE_PROV")
self.offline_prov_log.setLevel("DEBUG")
self.offline_prov_log.addHandler(handler)

def get_file_path(self):
return self.log_file_path

def add_request(self, persistence_request: ProvRequestObj):
try:
request_data = persistence_request.as_dict()
@@ -52,6 +72,9 @@ def add_request(self, persistence_request: ProvRequestObj):
traceback.print_exc()
pass

def _close(self):
if self.session:
logger.info("Waiting to get response from all submitted provenance tasks...")
@@ -78,7 +101,7 @@ def _flush(self, all_and_wait: bool = False):
logger.debug("Going to flush a part. Flushing " + str(len(to_flush)) + " out of " +
str(len(self.requests_queue)))
if self.should_send_to_file:
offline_prov_log.debug(json.dumps(to_flush))
self.offline_prov_log.debug(json.dumps(to_flush))
if self.should_send_to_service:
self._send_to_service(to_flush)

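With the handler setup moved into the constructor, a ManagedPersister can be built directly and asked for its offline log path. A sketch with illustrative values (keyword names are taken from the signature above):

import time
from provlake.persistence.managed_persister import ManagedPersister

persister = ManagedPersister(workflow_name="wf",
                             wf_start_time=time.time(),
                             service_url="http://localhost:5000",
                             log_dir="prov_logs",
                             should_send_to_file=True)
# Absolute path built by StandardNamesAndIds.get_prov_log_file_path,
# e.g. /home/user/prov_logs/prov-wf-1628700000.0.log
print(persister.get_file_path())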
5 changes: 5 additions & 0 deletions src/provlake/persistence/persister.py
@@ -17,6 +17,10 @@ def __init__(self, workflow_name: str, wf_start_time: float, wf_exec_id=None):
def add_request(self, persistence_request_obj: ProvRequestObj):
raise NotImplementedError

@abstractmethod
def get_file_path(self):
raise NotImplementedError

def _close(self):
pass

@@ -28,3 +32,4 @@ def get_workflow_name(self) -> str:

def get_wf_exec_id(self) -> float:
return self._wf_exec_id
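Because get_file_path is now declared on the abstract base, concrete persisters must implement it alongside add_request. An illustrative subclass, not part of this commit:

from provlake.model.activity_prov_obj import ProvRequestObj
from provlake.persistence.persister import Persister


class InMemoryPersister(Persister):
    """Illustrative only: buffers requests in memory, writes nothing to disk."""

    def __init__(self, workflow_name: str, wf_start_time: float):
        super().__init__(workflow_name, wf_start_time)
        self.requests = []

    def add_request(self, persistence_request_obj: ProvRequestObj):
        self.requests.append(persistence_request_obj.as_dict())

    def get_file_path(self):
        return None  # no offline log file for an in-memory persister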

42 changes: 33 additions & 9 deletions src/provlake/utils/args_handler.py
@@ -1,26 +1,31 @@
def get_dict(_dict:dict) -> dict:
from provlake.utils.constants import Vocabulary

def get_dict(_dict: dict) -> dict:
if _dict is None:
return None
if len(_dict) == 0:
return {}
return {
"type": "dict",
"values": _dict
Vocabulary.PROV_ATTR_TYPE: Vocabulary.DICT_TYPE,
Vocabulary.VALUES: _dict
}


def get_list(_list:list) -> dict:
def get_list(_list: list) -> dict:
'''
This will make ProvLake create one node per element in the list
'''
if _list is None:
return None
if len(_list) == 0:
return []
return {
"type": "list",
"values": _list
Vocabulary.PROV_ATTR_TYPE: Vocabulary.LIST_TYPE,
Vocabulary.VALUES: _list
}


def get_recursive_dicts(_dict:dict) -> dict:
def get_recursive_dicts(_dict: dict) -> dict:
if _dict is None:
return None
if len(_dict) == 0:
@@ -33,6 +38,25 @@ def get_recursive_dicts(_dict: dict) -> dict:
else:
values[k] = v
return {
"type": "dict",
"values": values
Vocabulary.PROV_ATTR_TYPE: Vocabulary.DICT_TYPE,
Vocabulary.VALUES: values
}


def add_custom_metadata(value, custom_metadata: dict = None) -> dict:
if not custom_metadata:
return value
return {
Vocabulary.PROV_ATTR_TYPE: Vocabulary.ATTRIBUTE_VALUE_TYPE,
Vocabulary.VALUES: value,
Vocabulary.CUSTOM_METADATA: custom_metadata
}


def get_data_reference(value, data_store_id) -> dict:
return {
Vocabulary.VALUES: value,
Vocabulary.PROV_ATTR_TYPE: Vocabulary.DATA_REFERENCE_TYPE,
Vocabulary.DATA_STORE_ID: data_store_id
}
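With string literals replaced by Vocabulary constants, the two new helpers build tagged payloads. A sketch of the resulting dicts, assuming Vocabulary.VALUES and Vocabulary.CUSTOM_METADATA map to "values" and "custom_metadata" (only the constants added in constants.py below appear in this diff):

from provlake.utils.args_handler import add_custom_metadata, get_data_reference

get_data_reference("http://ibm.org/Netherlands", data_store_id="Allegro1")
# -> {"values": "http://ibm.org/Netherlands",
#     "prov_attr_type": "data_reference",
#     "data_store_id": "Allegro1"}

add_custom_metadata(value="myval", custom_metadata={"custom": 1})
# -> {"prov_attr_type": "attribute_value",
#     "values": "myval",
#     "custom_metadata": {"custom": 1}}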

50 changes: 44 additions & 6 deletions src/provlake/utils/constants.py
@@ -19,6 +19,12 @@ class Vocabulary:
WORKFLOW_NAME = "dataflow_name"
ACT_TYPE = "act_type"
ATTRIBUTE_ASSOCIATIONS = "attribute_associations"
DATA_REFERENCE_TYPE = "data_reference"
ATTRIBUTE_VALUE_TYPE = "attribute_value"
DICT_TYPE = "dict"
LIST_TYPE = "list"
DATA_STORE_ID = "data_store_id"
PROV_ATTR_TYPE = "prov_attr_type"


class Status:
@@ -57,12 +63,18 @@ def get_prov_log_file_path(log_dir:str, workflow_name:str, wf_start_time:float)
return os.path.abspath(os.path.join(log_dir, 'prov-{}-{}.log'.format(workflow_name, wf_start_time)))

@staticmethod
def get_id_atv(attribute, value):
if type(value) in [dict, list]:
return attribute + "_" + id_hash(str(value))
def get_id_atv(attribute, value, value_type=None):
if value_type:
if value_type == Vocabulary.DATA_REFERENCE_TYPE:
return "" + str(value)
else:
return attribute + "_" + str(value)
else:
# TODO if its a float, replace the dots
return attribute + "_" + str(value)
if type(value) in [dict, list]:
return attribute + "_" + id_hash(str(value))
else:
# TODO if its a float, replace the dots
return attribute + "_" + str(value)

@staticmethod
def get_wfe_id(workflow_name: str, wf_exec_id):
Expand All @@ -78,8 +90,10 @@ def get_wfe_id(workflow_name: str, wf_exec_id):
wfe_id = workflow_name.lower() + "_exec_" + str(wf_exec_id)
return wfe_id



@staticmethod
def get_dte_id(wfe_id, dt_name:str , prov_task: dict):
def get_dte_id(wfe_id, dt_name: str, prov_task: dict):
task_id = prov_task["id"]
# TODO better wfe_id + dte_name + timestamp
if Vocabulary.GENERATED_TIME in prov_task and \
@@ -94,3 +108,27 @@ def get_dte_id(wfe_id, dt_name: str, prov_task: dict):
dte_id = wfe_id + "_" + dt_name + "_" + str(task_id)

return dte_id

@staticmethod
def get_wfe_ctx_id(wfe_id):
return wfe_id+"_wfe_ctx"

@staticmethod
def get_cce_ctx_id(cce_id):
return cce_id + "_cce_ctx"

@staticmethod
def get_cci_ctx_id(cci_id):
return cci_id + "_cci_ctx"

@staticmethod
def get_wfe_instantiations_ctx_id(wfe_id):
return wfe_id + "_wfe_instantiation_ctx"

@staticmethod
def get_cce_instantiations_ctx_id(cce_id):
return cce_id + "_cce_instantiation_ctx"

@staticmethod
def get_cci_instantiations_ctx_id(cci_id):
return cci_id + "_cci_instantiation_ctx"
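The new context-id helpers are plain suffix conventions over ids built elsewhere, e.g.:

from provlake.utils.constants import StandardNamesAndIds

StandardNamesAndIds.get_wfe_ctx_id("myflow_exec_1")
# -> "myflow_exec_1_wfe_ctx"
StandardNamesAndIds.get_cce_instantiations_ctx_id("myflow_cce_7")
# -> "myflow_cce_7_cce_instantiation_ctx"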
25 changes: 21 additions & 4 deletions src/provlake/utils/prov_utils.py
@@ -2,19 +2,36 @@
import hashlib
import os
import glob
import json

JSON_INDENT_SIZE = 1

def convert_timestamp(timestamp: float):
t = datetime.fromtimestamp(timestamp)

def convert_timestamp(timestamp: float) -> str:
t = datetime.utcfromtimestamp(timestamp)
return t.strftime('%Y%m%dT%Hh%Mm%Ss%f')[:-3]


def id_hash(val: str):
def id_hash(val: str) -> str:
return hashlib.md5(val.encode()).hexdigest()


def delete_prov_logs(log_dir: str = '.'):
def delete_prov_logs(log_dir: str = '.') -> None:
try:
[os.remove(f) for f in glob.glob(os.path.join(log_dir, 'prov-*.log'))]
except:
pass


def stringfy_inner_dicts_in_dicts(_dict: dict) -> dict:
ret = dict()
for k in _dict:
if type(_dict[k]) == dict:
ret[k] = json.dumps(_dict[k])
else:
ret[k] = _dict[k]
return ret


def stringfy_inner_dicts_in_lists(_list: list) -> list:
return [json.dumps(el, indent=JSON_INDENT_SIZE) if type(el) == dict else el for el in _list]
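The two new stringfy helpers serialize nested dicts to JSON strings one level deep, presumably so attribute values stay flat. For example:

from provlake.utils.prov_utils import stringfy_inner_dicts_in_dicts, stringfy_inner_dicts_in_lists

stringfy_inner_dicts_in_dicts({"a": 1, "b": {"c": 2}})
# -> {"a": 1, "b": '{"c": 2}'}
stringfy_inner_dicts_in_lists([1, {"c": 2}])
# -> [1, '{\n "c": 2\n}']   (json.dumps with indent=JSON_INDENT_SIZE)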
3 changes: 2 additions & 1 deletion tests/complex_attributes.py
@@ -63,7 +63,8 @@
}
},
"ordinary_list": [1,2,3],
"ordinary_dict": {"a":"b"}
"ordinary_dict": {"a": "b"}

}
with ProvTask(prov, "act_1", in_args) as prov_task:

17 changes: 15 additions & 2 deletions tests/easing_complex_attributes.py
@@ -1,6 +1,6 @@
from provlake import ProvLake
from provlake.capture import ProvWorkflow, ProvTask
from provlake.utils.args_handler import get_dict, get_list, get_recursive_dicts
from provlake.utils.args_handler import get_dict, get_list, get_recursive_dicts, get_data_reference, add_custom_metadata
import sys
import json
"""
@@ -57,7 +57,20 @@
"list_of_dicts": [
{"a": 1},
{"b": 2},
]
],
"seismic_uri": get_data_reference("http://ibm.org/Netherlands", data_store_id="Allegro1"),
"mysimple_arg": add_custom_metadata(value="myval", custom_metadata={"custom": 1}),
"mysimple_arg2": add_custom_metadata(value={"myval2": "val2"}, custom_metadata={"custom": 1}),
"mydict_list": get_list([
{
"name": "a",
"path": "/home/data/file_a.dat"
},
{
"name": "b",
"path": "/home/data/file_b.dat"
}
])
}
with ProvTask(prov, "act_1", in_args) as prov_task:
out_args = {"out": 50}
2 changes: 1 addition & 1 deletion tests/simple_example.py
@@ -15,7 +15,7 @@ def calc_factorial(n):
return result


prov = ProvLake.get_persister("factorial_dataflow")
prov = ProvLake.get_persister("factorial_dataflow", service_url="http://localhost:5000")
with ProvWorkflow(prov):

in_args = {"n": 5}
