Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DC-2692] synthetic dataset script version 1 #1369

Open
wants to merge 19 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 16 additions & 8 deletions data_steward/cdr_cleaner/clean_cdr.py
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,12 @@
DataStage.DATA_CONSISTENCY.value:
DATA_CONSISTENCY_CLEANING_CLASSES,
DataStage.CRON_RETRACTION.value:
CRON_RETRACTION_CLEANING_CLASSES
CRON_RETRACTION_CLEANING_CLASSES,
DataStage.SYNTHETIC.value:
RDR_CLEANING_CLASSES + COMBINED_CLEANING_CLASSES +
REGISTERED_TIER_DEID_CLEANING_CLASSES +
REGISTERED_TIER_DEID_BASE_CLEANING_CLASSES +
REGISTERED_TIER_DEID_CLEAN_CLEANING_CLASSES,
}


Expand Down Expand Up @@ -589,20 +594,23 @@ def main(args=None):
sandbox_dataset_id=args.sandbox_dataset_id,
rules=rules,
table_namer=table_namer,
run_synthetic=args.data_stage.value == DataStage.SYNTHETIC.value,
**kwargs)
for query in query_list:
LOGGER.info(query)
else:
# Disable logging if running retraction cron
if not constants.global_variables.DISABLE_SANDBOX:
clean_engine.add_console_logging(args.console_log)
clean_engine.clean_dataset(project_id=args.project_id,
dataset_id=args.dataset_id,
sandbox_dataset_id=args.sandbox_dataset_id,
rules=rules,
table_namer=table_namer,
run_as=args.run_as,
**kwargs)
clean_engine.clean_dataset(
project_id=args.project_id,
dataset_id=args.dataset_id,
sandbox_dataset_id=args.sandbox_dataset_id,
rules=rules,
table_namer=table_namer,
run_as=args.run_as,
run_synthetic=args.data_stage.value == DataStage.SYNTHETIC.value,
**kwargs)


if __name__ == '__main__':
Expand Down
73 changes: 48 additions & 25 deletions data_steward/cdr_cleaner/clean_cdr_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def clean_dataset(project_id,
rules,
table_namer='',
run_as=None,
run_synthetic=False,
**kwargs):
"""
Run the assigned cleaning rules and return list of BQ job objects
Expand Down Expand Up @@ -60,7 +61,7 @@ def clean_dataset(project_id,
clazz = rule[0]
query_function, setup_function, rule_info = infer_rule(
clazz, project_id, dataset_id, sandbox_dataset_id, table_namer,
**kwargs)
run_synthetic, **kwargs)

LOGGER.info(
f"Applying cleaning rule {rule_info[cdr_consts.MODULE_NAME]} "
Expand Down Expand Up @@ -190,7 +191,7 @@ def get_custom_kwargs(clazz, **kwargs):


def infer_rule(clazz, project_id, dataset_id, sandbox_dataset_id, table_namer,
**kwargs):
run_synthetic, **kwargs):
"""
Extract information about the cleaning rule

Expand All @@ -199,6 +200,8 @@ def infer_rule(clazz, project_id, dataset_id, sandbox_dataset_id, table_namer,
:param dataset_id: identifies the dataset to clean
:param sandbox_dataset_id: identifies the sandbox dataset to store backup rows
:param table_namer: source differentiator value expected to be the same for all rules run on the same dataset
:param run_synthetic: boolean identifying if this execution is for synthetic data only. some rules are not
run in this scenario
:param kwargs: keyword arguments a cleaning rule may require
:return:
query_function: function that generates query_list
Expand All @@ -212,6 +215,13 @@ def infer_rule(clazz, project_id, dataset_id, sandbox_dataset_id, table_namer,
line_no: points to the source line where query_function is
"""
kwargs = get_custom_kwargs(clazz, **kwargs)

# setting default values that won't raise errors
query_function = lambda: []
setup_function = lambda client: object
function_name = ''
module_name = ''
line_no = ''
if inspect.isclass(clazz) and issubclass(clazz, BaseCleaningRule):
try:
instance = clazz(project_id,
Expand All @@ -224,28 +234,35 @@ def infer_rule(clazz, project_id, dataset_id, sandbox_dataset_id, table_namer,
f"`table_namer` property yet.")
instance = clazz(project_id, dataset_id, sandbox_dataset_id,
**kwargs)
query_function = instance.get_query_specs
setup_function = instance.setup_rule
function_name = query_function.__name__
module_name = inspect.getmodule(query_function).__name__
line_no = inspect.getsourcelines(query_function)[1]

if (run_synthetic and instance.run_for_synthetic) or not run_synthetic:
query_function = instance.get_query_specs
setup_function = instance.setup_rule
function_name = query_function.__name__
module_name = inspect.getmodule(query_function).__name__
line_no = inspect.getsourcelines(query_function)[1]
else:
function_name = clazz.__name__
module_name = inspect.getmodule(clazz).__name__
line_no = inspect.getsourcelines(clazz)[1]

def query_function():
"""
Imitates base class get_query_specs()
:return: list of query dicts generated by rule
"""
return clazz(project_id, dataset_id, sandbox_dataset_id, **kwargs)

def setup_function(client):
"""
Imitates base class setup_rule()
"""
pass
# once base classing is finished, this else logic shall be removed
if not run_synthetic:
function_name = clazz.__name__
module_name = inspect.getmodule(clazz).__name__
line_no = inspect.getsourcelines(clazz)[1]

# pylint: disable=function-redefined
def query_function():
"""
Imitates base class get_query_specs()
:return: list of query dicts generated by rule
"""
return clazz(project_id, dataset_id, sandbox_dataset_id,
**kwargs)

# pylint: disable=function-redefined
def setup_function(client):
"""
Imitates base class setup_rule()
"""
pass

rule_info = {
cdr_consts.QUERY_FUNCTION: query_function,
Expand All @@ -254,6 +271,7 @@ def setup_function(client):
cdr_consts.MODULE_NAME: module_name,
cdr_consts.LINE_NO: line_no,
}

return query_function, setup_function, rule_info


Expand All @@ -262,6 +280,7 @@ def get_query_list(project_id,
sandbox_dataset_id,
rules,
table_namer='',
run_synthetic=False,
**kwargs):
"""
Generates list of all query_dicts that will be run on the dataset
Expand All @@ -277,9 +296,13 @@ def get_query_list(project_id,
all_queries_list = []
for rule in rules:
clazz = rule[0]
query_function, _, rule_info = infer_rule(clazz, project_id, dataset_id,
query_function, _, rule_info = infer_rule(clazz,
project_id,
dataset_id,
sandbox_dataset_id,
table_namer, **kwargs)
table_namer,
run_synthetic=run_synthetic,
**kwargs)
query_list = query_function()
all_queries_list.extend(query_list)
return all_queries_list
81 changes: 57 additions & 24 deletions data_steward/cdr_cleaner/cleaning_rules/base_cleaning_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@ def __init__(self,
depends_on: cleaning_class_list = None,
affected_tables: List = None,
table_namer: str = None,
table_tag: str = None):
table_tag: str = None,
run_for_synthetic: bool = False):
"""
Instantiate a cleaning rule with basic attributes.

Expand Down Expand Up @@ -227,6 +228,7 @@ def __init__(self,
self._affected_tables = affected_tables
self._table_namer = self.table_namer = table_namer
self._table_tag = table_tag
self._run_for_synthetic = run_for_synthetic

# fields jinja template
self.fields_templ = JINJA_ENV.from_string("""
Expand All @@ -237,6 +239,29 @@ def __init__(self,

self.__validate_arguments()

def __validate_argument(self, arg, arg_name, arg_type):
    """
    Ensure a constructor argument is set and is of the required type.

    Shared helper backing the type-specific validators, avoiding
    repeated None/type checking code.

    :param arg: the value to check against arg_type
    :param arg_name: name of the argument being validated, used to build
        error messages
    :param arg_type: the type the value must be an instance of

    :raises NotImplementedError if arguments are not set.
    :raises TypeError if arguments are not set to expected types
    """
    if arg is None:
        raise NotImplementedError(
            f'{self.__class__.__name__} cleaning rule must set {arg_name} variable'
        )

    # guard-clause form: a correctly-typed value passes straight through
    if isinstance(arg, arg_type):
        return

    raise TypeError(
        f'{arg_name} is expected to be a {str(arg_type)}. offending {arg_name}: <{arg}> is of type: {type(arg)}'
    )

def __validate_list_of_strings(self, arg, arg_name):
"""
Validate the given argument is a list of strings.
Expand All @@ -247,22 +272,16 @@ def __validate_list_of_strings(self, arg, arg_name):
:raises NotImplementedError if arguments are not set.
:raises TypeError if arguments are not set to expected types
"""
if arg is None:
raise NotImplementedError(
'{} cleaning rule must set {} variable'.format(
self.__class__.__name__, arg_name))
self.__validate_argument(arg, arg_name, list)

if not isinstance(arg, list):
raise TypeError(
'{} is expected to be a list of strings. offending type is: {}'
.format(arg_name, type(arg)))
else:
for list_item in arg:
if not isinstance(list_item, str):
raise TypeError(
('{} is expected to be a list of strings. '
'offending list item {} is of type: {}').format(
arg_name, list_item, type(list_item)))
# if no errors are raised in validating the argurment is a list, then validate each
# list item is a string
for list_item in arg:
if not isinstance(list_item, str):
raise TypeError((
f'{arg_name} is expected to be a list of strings. '
f'offending list item {list_item} is of type: {type(list_item)}'
))

def __validate_string(self, arg, arg_name):
"""
Expand All @@ -275,15 +294,20 @@ def __validate_string(self, arg, arg_name):
:raises NotImplementedError if arguments are not set.
:raises TypeError if arguments are not set to expected types
"""
if arg is None:
raise NotImplementedError(
'{} cleaning rule must set {} variable'.format(
self.__class__.__name__, arg_name))
self.__validate_argument(arg, arg_name, str)

if not isinstance(arg, str):
raise TypeError(
'{0} is expected to be a string. offending {0}: <{1}> is of type: {2}'
.format(arg_name, arg, type(arg)))
def __validate_bool(self, arg, arg_name):
    """
    Validate boolean parameters are boolean.

    Delegates to the generic argument validator with ``bool`` as the
    required type.

    :param arg: The actual argument value to validate is a boolean.
    :param arg_name: The name of the variable being validated. Used
        in error messages, if needed.

    :raises NotImplementedError if arguments are not set.
    :raises TypeError if arguments are not set to expected types
    """
    self.__validate_argument(arg, arg_name, bool)

def __validate_arguments(self):
"""
Expand Down Expand Up @@ -331,6 +355,8 @@ def __validate_arguments(self):
format(clazz))
raise TypeError(message)

self.__validate_bool(self._run_for_synthetic, 'run_for_synthetic')

@property
def depends_on_classes(self):
"""
Expand Down Expand Up @@ -410,6 +436,13 @@ def table_tag(self):
"""
return self._table_tag

@property
def run_for_synthetic(self):
    """
    Flag indicating whether this rule should run when cleaning a
    synthetic dataset.

    :return: the boolean ``run_for_synthetic`` value provided at
        construction (validated as a bool during argument validation)
    """
    return self._run_for_synthetic

@affected_tables.setter
def affected_tables(self, affected_tables):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ def __init__(self,
project_id=project_id,
dataset_id=dataset_id,
sandbox_dataset_id=sandbox_dataset_id,
table_namer=table_namer)
table_namer=table_namer,
run_for_synthetic=True)

def get_sandbox_tablenames(self):
    """Return the sandbox table name for each table this rule affects."""
    return list(map(self.sandbox_table_for, self.affected_tables))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ def __init__(self, project_id, dataset_id, sandbox_dataset_id):
affected_tables=['observation'],
project_id=project_id,
dataset_id=dataset_id,
sandbox_dataset_id=sandbox_dataset_id)
sandbox_dataset_id=sandbox_dataset_id,
run_for_synthetic=True)

def get_query_specs(self):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@
ON m.questionnaire_response_id = sc.survey_conduct_id
WHERE m.questionnaire_response_id IS NOT NULL
) sub
WHERE sub.survey_conduct_id = sc.survey_conduct_id
WHERE sub.survey_conduct_id = sc.survey_conduct_id
AND sc.survey_conduct_id IN (SELECT survey_conduct_id FROM `{{project_id}}.{{sandbox_dataset_id}}.{{sandbox_table_id}}`)
""")

Expand Down Expand Up @@ -103,7 +103,8 @@ def __init__(self,
project_id=project_id,
dataset_id=dataset_id,
sandbox_dataset_id=sandbox_dataset_id,
table_namer=table_namer)
table_namer=table_namer,
run_for_synthetic=True)

def get_query_specs(self, *args, **keyword_args) -> query_spec_list:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,8 @@ def __init__(self,
SetConceptIdsForSurveyQuestionsAnswers,
UpdateFamilyHistoryCodes
],
table_namer=table_namer)
table_namer=table_namer,
run_for_synthetic=True)

def get_query_specs(self, *args, **keyword_args) -> query_spec_list:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ def __init__(self,
project_id=project_id,
dataset_id=dataset_id,
sandbox_dataset_id=sandbox_dataset_id,
table_namer=table_namer)
table_namer=table_namer,
run_for_synthetic=True)

def get_query_specs(self, *args, **keyword_args):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ def __init__(self,
dataset_id=dataset_id,
sandbox_dataset_id=sandbox_dataset_id,
depends_on=[CleanMappingExtTables],
table_namer=table_namer)
table_namer=table_namer,
run_for_synthetic=False)

def get_query_specs(self):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ def __init__(self,
project_id=project_id,
dataset_id=dataset_id,
sandbox_dataset_id=sandbox_dataset_id,
table_namer=table_namer)
table_namer=table_namer,
run_for_synthetic=True)

def get_query_specs(self):
"""
Expand Down
Loading