From 2a33f73fe27a521aabd6f089056ab5e38b11c3e0 Mon Sep 17 00:00:00 2001 From: alxndrnh Date: Wed, 22 Mar 2023 20:09:56 +0000 Subject: [PATCH 01/18] added CopyExampleGen custom python component --- tfx_addons/copy_example_gen/README.md | 80 +++++++++++++---- tfx_addons/copy_example_gen/__init__.py | 14 +++ tfx_addons/copy_example_gen/component.py | 105 +++++++++++++++++++++++ 3 files changed, 184 insertions(+), 15 deletions(-) create mode 100644 tfx_addons/copy_example_gen/__init__.py create mode 100644 tfx_addons/copy_example_gen/component.py diff --git a/tfx_addons/copy_example_gen/README.md b/tfx_addons/copy_example_gen/README.md index 01efebbf..b1c75f8e 100644 --- a/tfx_addons/copy_example_gen/README.md +++ b/tfx_addons/copy_example_gen/README.md @@ -10,31 +10,81 @@ **Project name:** CopyExampleGen component ## Project Description -CopyExampleGen will allow the user to copy a pre-existing Tfrecord dataset or raw data and ingest it into the pipeline, ultimately skipping the process of shuffling and running the Beam job. This process will require a dict input with split_names and their respective URI. This will output an Examples Artifact (same as the Artifact output from the ExampleGen component) in which downstream components can use. +CopyExampleGen will allow the user to copy pre-existing tfrecords and ingest it into the pipeline as examples, ultimately skipping the process of shuffling and running the Beam job that is in the standard component, ExampleGen. This process will require a dict input with split names as keys and their respective URIs as the value from the user. Following suit, the component will set the artifact’s properties, generate output dict, and register contexts and execution. Lastly, it will output an Examples Artifact in which downstream components can use. 
+ +Example of pipeline component definition: +```python + copy_example_gen = component.CopyExampleGen( + splits_dict = tfrecords_dict + ) +``` ## Project Category -Component +Addon Component ## Project Use-Case(s) -CopyExampleGen will allow the user to add a dict input with split_names as the key and their respective pre-existing Tfrecords URIs as their value, then format the director structure so that it matches that of an Example Artifact. +CopyExampleGen will replace ExampleGen when tfrecords and split names are already in the possession of the user. Hence, a Beam job will not be run nor will the tfrecords be shuffled and/ or randomized saving data ingestion pipeline process time. + +Currently, ingesting data with the ExampleGen component does not provide a way to split without random data shuffling and always runs a beam job. This component will save significant time (hours for large amounts of data) per pipeline run when a pipeline run does not require data to be shuffled. Some challenges users have had: + + 1. “Reshuffle doesn't work well with DirectRunner and causes OOMing. Users have been patching out shuffling in every release and doing it in the DB query. They have given up on Beam based ExampleGen and have created an entire custom ExampleGen that reads from the database and doesn’t use Beam”. + + 2. “When the use case is a time series problem using sliding windows, shuffling before splitting in train and eval set is counterproductive as the user would need a coherent training set”. -Currently, ingesting data with the ExampleGen requires a Beam job to be ran and requires the data to be shuffled. This component will save users hours/ days of having to create a workaround fully custom ExampleGen component. Some challenges our users have had: -Reshuffle doesn't work well with DirectRunner and causes OOMing. Users have been patching out shuffling in every release and doing it in the DB query. 
They have given up on Beam based ExampleGen and have created an entire custom ExampleGen that reads from the database and doesn’t use Beam. Link. -When the use case is a time series problem using sliding windows, shuffling before splitting in train and eval set is counterproductive as the user would need a coherent training set. Link. -Almost impossible to use ExampleGen based components for large datasets. Without it, Beam knows how to write to disk after transforming from input format to output format, allowing it to transform (slowly) large datasets that would otherwise not fit into memory. Link. ## Project Implementation -Use case #1 - Tfrecords as input URIs: -This component will: -1. Accept a dict i.e. {'split_name1': './path/to/split_name1/tfrecord1', 'split_name2': './path/to/split_name2/tfrecord2'} -2. Retrieve the tfrecords -3. Create an Examples Artifact, following Examples directory structure and properties required for an Examples Artifact -4. Register the Examples Artifact into MLMD -5. Output as 'examples' to be ingested from downstream components +### Component + +CopyExampleGenSpec Class: + Add parameters to following sections in CopyExampleGenSpec(types.ComponentSpec) class: + +- `PARAMETERS`: `’tfrecords_dict’: ‘ExecutionParameter(type=dict)’`. The user input dict will follow a pattern like {‘Split-name’: ‘uri_to_tfrecords_folder’} i.e. (see question #2): +```python + { + ‘train’: ‘./uri/path/to/Split_train/’, + ‘eval’: ‘./uri/path/to/Split_eval/’ + } +``` + + - `INPUTS`: will be empty since user will only have a dict + + - `OUTPUTS`: `’output_data’: ‘ChannelParameter(type=standard_artifacts.Examples)’` + +CopyExampleGen Class: + `output_data` will contain a list of Channels for each split of the data. The splits in `output_data` will be derived from the keys in the ‘tfrecords_dict’. + + +### Executor + +#### Part 1 + + Using the keys and values from `tfrecords_dict`: + 1. 
function `parse_tfrecords_dict(tfrecords_dict)`: determine the source (and possibly destination–see question #2) for the files in each split, building exact URIs as necessary. + 2. function `split_names(tfrecords_dict)`: parse the input into the list of split names that will become `split` properties of the output Examples artifact. Example: `[“train”,”eval”]` + + Depending on when the file copying happens (see question #1), possibly copy the files at this point. + + +#### Part 2 + + Transform the result of `parse_tfrecords_dict` we created above into an Examples Artifact. Importer Node has the functionality and process we are trying to recreate in this CopyExampleGen because it registers an external resource into MLMD and outputs the user defined Artifact type. + + This step can possibly use the [importer.generate_output_dict](https://github.com/tensorflow/tfx/blob/f8ce19339568ae58519d4eecfdd73078f80f84a2/tfx/dsl/components/common/importer.py#L153) function: + Create standard ‘output_dict’ variable. The value will be created by calling the worker function. If file copying is done before this step, this method can probably be used as is to register the artifact. + +## Open Implementation Questions + 1. There are a few open questions about how the file copying should actually be done. Where does the copying that importer does actually happen? And what's the best way to change that? Are there other ways in TFX to do copying in a robust way? Maybe something in tfx.io? If there's an existing method, what has to happen in the `parse_tfrecords_dict`? Depending on the copying capabilities available, will there be a need to detect the execution environment? Does TFX rely on other tools to execute a copy that handle this? Is detection of the execution environment and the copying itself separate? What could be reused?
+ - If it's not easy to detect the execution environment without also performing a copy, will the user have to specify the execution environment and therefore how to do the copy (e.g., local copy, GCS, S3)? And then what's the best way to handle that? + + 2. Should the dictionary of file inputs take a path to a folder? Globs? Lists of individual files? + 3. Assuming file copying is done entirely separately, can [importer.generate_output_dict](https://github.com/tensorflow/tfx/blob/f8ce19339568ae58519d4eecfdd73078f80f84a2/tfx/dsl/components/common/importer.py#L153) be used as is to register the artifacts, or does some separate code using [MLMD](https://www.tensorflow.org/tfx/guide/mlmd) in a different way need to be written? ## Project Dependencies -Using: Python 3.8.2, Tensorflow 2.11.0, TFX 1.12.0 +Possibly libraries that directly access blob storage platforms, e.g. google-cloud-storage. + ## Project Team Alex Ho, alexanderho@google.com, @alxndrnh + diff --git a/tfx_addons/copy_example_gen/__init__.py b/tfx_addons/copy_example_gen/__init__.py new file mode 100644 index 00000000..5d938163 --- /dev/null +++ b/tfx_addons/copy_example_gen/__init__.py @@ -0,0 +1,14 @@ +# Copyright 2023 Google LLC. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+# ============================================================================== \ No newline at end of file diff --git a/tfx_addons/copy_example_gen/component.py b/tfx_addons/copy_example_gen/component.py new file mode 100644 index 00000000..461d8f06 --- /dev/null +++ b/tfx_addons/copy_example_gen/component.py @@ -0,0 +1,105 @@ +# Copyright 2023 Google LLC. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""CopyExampleGen custom component. + +This component will accept tfrecord files and register them as an Examples Artifact for +downstream components to use. CopyExampleGen accepts a dictionary where keys are the split-names +and their respective value is a uri to the folder that contains the tfrecords file(s). + +User will need to create a dictionary of type Dict[str, str], in this case we will title this +dictionary 'tfrecords_dict' and assign it to a dictionary like so: + + tfrecords_dict: Dict[str, str]={ + "train":"gs://path/to/examples/Split-train/", + "eval":"gs://path/to/examples/Split-eval/" + } + +Currently tfx.dsl.components.Parameter only supports primitive types therefore, in order to properly +use CopyExampleGen, the 'input_dict' of type Dict[str, str] needs to be converted into a JSON str. 
+We can do this by simply using 'json.dumps()' by adding 'tfrecords_dict' in as a parameter like so: + + copy_example=component.CopyExampleGen( + input_json_str=json.dumps(tfrecords_dict) + ) + +""" +from typing import List +import json +import os + +import tensorflow as tf +from tfx import v1 as tfx +from tfx.v1.types.standard_artifacts import Examples +from tfx.dsl.component.experimental.decorators import component +from tfx.dsl.io import fileio + + +def _split_names_string_builder(split_names_list: List): + """ + _split_names_string_builder() creates a string of split-names for input to + output_example.split_names property. + + """ + + str1 = "[" + urlist_len = len(split_names_list)-1 + index = 0 + + for element in split_names_list: + if(index==urlist_len): + str1 += "\""+element+"\""+"]" + break + str1 += "\""+element+"\""+"," + index+=1 + return str1 + +@component +def CopyExampleGen( + input_json_str: tfx.dsl.components.Parameter[str], + output_example: tfx.dsl.components.OutputArtifact[Examples] + ) -> tfx.dsl.components.OutputDict(): + """ + CopyExampleGen first converts the string input to a type Dict and extracts + the keys from the dictionary, input_dict, and creates a string containing the names. + This string is assigned to the output_example.split_uri property to register split_names. + + This component then creates a directory folder for each name in split_name. + Following the creation of the `Split-name` folder, the files in the uri path will then be copied + into the designated `Split-name` folder. 
+ + """ + + input_dict = json.loads(input_json_str) + + # Parse input_dict: creates a directory from the split-names and tfrecord uris provided + split_names=[] + for key, value in input_dict.items(): + split_names.append(key) + + split_names_string=_split_names_string_builder(split_names) + output_example.split_names=str(split_names_string) + + # Make directories + tfrecords_list=[] + output_example_uri=output_example.uri + + for key, value in input_dict.items(): + split_value=(f"/Split-{key}/") + fileio.mkdir(f"{output_example_uri}{split_value}") + tfrecords_list=fileio.glob(f"{input_dict[key]}*.gz") + + # Copy files into directories + for tfrecord in tfrecords_list: + file_name=os.path.basename(os.path.normpath(tfrecord)) + fileio.copy(tfrecord, output_example.uri+split_value+file_name, True) From c50a98d08adc148feb0602a3572b800c2591dadd Mon Sep 17 00:00:00 2001 From: alxndrnh Date: Wed, 22 Mar 2023 20:38:26 +0000 Subject: [PATCH 02/18] added CopyExampleGen custom python component and made changes to readme.md file --- tfx_addons/copy_example_gen/README.md | 36 ++++++++++----------------- 1 file changed, 13 insertions(+), 23 deletions(-) diff --git a/tfx_addons/copy_example_gen/README.md b/tfx_addons/copy_example_gen/README.md index b1c75f8e..0c54d140 100644 --- a/tfx_addons/copy_example_gen/README.md +++ b/tfx_addons/copy_example_gen/README.md @@ -15,17 +15,20 @@ CopyExampleGen will allow the user to copy pre-existing tfrecords and ingest it Example of pipeline component definition: ```python copy_example_gen = component.CopyExampleGen( - splits_dict = tfrecords_dict + input_dict = tfrecords_dict ) ``` +Currently tfx.dsl.components.Parameter only supports primitive types therefore, in order to properly use CopyExampleGen, the 'input_dict' of type Dict[str, str] needs to be converted into a JSON str. We can do this by simply using `json.dumps()` by adding 'tfrecords_dict' in as an argument. 
+ + ## Project Category Addon Component ## Project Use-Case(s) CopyExampleGen will replace ExampleGen when tfrecords and split names are already in the possession of the user. Hence, a Beam job will not be run nor will the tfrecords be shuffled and/ or randomized saving data ingestion pipeline process time. -Currently, ingesting data with the ExampleGen component does not provide a way to split without random data shuffling and always runs a beam job. This component will save significant time (hours for large amounts of data) per pipeline run when a pipeline run does not require data to be shuffled. Some challenges users have had: +Currently, ingesting data with the ExampleGen component does not provide a way to split without random data shuffling and always runs a beam job. This component will save significant time (hours for large amounts of data) per pipeline run when a pipeline run does not require data to be shuffled. Additionally, this component will save hundreds of dollars in Dataflow consumption every time the pipeline is ran/ reran. Some challenges users have had: 1. “Reshuffle doesn't work well with DirectRunner and causes OOMing. Users have been patching out shuffling in every release and doing it in the DB query. They have given up on Beam based ExampleGen and have created an entire custom ExampleGen that reads from the database and doesn’t use Beam”. @@ -35,41 +38,28 @@ Currently, ingesting data with the ExampleGen component does not provide a way t ## Project Implementation ### Component -CopyExampleGenSpec Class: - Add parameters to following sections in CopyExampleGenSpec(types.ComponentSpec) class: - -- `PARAMETERS`: `’tfrecords_dict’: ‘ExecutionParameter(type=dict)’`. The user input dict will follow a pattern like {‘Split-name’: ‘uri_to_tfrecords_folder’} i.e. 
(see question #2): -```python - { - ‘train’: ‘./uri/path/to/Split_train/’, - ‘eval’: ‘./uri/path/to/Split_eval/’ - } -``` - - - `INPUTS`: will be empty since user will only have a dict - - - `OUTPUTS`: `’output_data’: ‘ChannelParameter(type=standard_artifacts.Examples)’` +Custom Python function component: CopyExampleGen -CopyExampleGen Class: - `output_data` will contain a list of Channels for each split of the data. The splits in `output_data` will be derived from the keys in the ‘tfrecords_dict’. + - `input_json_str`: will be the input parameter for CopyExampleGen of type `tfx.dsl.components.Parameter[str]`, where the user will assign their Dict[str, str] input, tfrecords_dict. However, because Python custom component development only supports primitive types, we must assign `input_json_str` to `json.dumps(tfrecords_dict)` and place the tfrecords_dict in as an argument. + - `output_example`: Output artifact can be referenced as an object of its specified type ArtifactType in the component function being declared. For example, if the ArtifactType is Examples, one can reference properties in an Examples ArtifactType (span, version, split_names, etc.) by calling the OutputArtifact object. This will be the variable we reference to build and register our Examples Artifact after parsing the tfrecords_dict input. -### Executor +### Python Custom Component #### Part 1 Using the keys and values from `tfrecords_dict`: - 1. function `parse_tfrecords_dict(tfrecords_dict)`: determine the source (and possibly destination–see question #2) for the files in each split, building exact URIs as necessary. - 2. function `split_names(tfrecords_dict)`: parse the input into the list of split names that will become `split` properties of the output Examples artifact. Example: `[“train”,”eval”]` - - Depending on when the file copying happens (see question #1), possibly copy the files at this point. + 1.
function `_split_names_string_builder(tfrecords_dict)`: determine the source (and possibly destination–see question #2) for the files in each split, building exact URIs as necessary. Additionally, parse the input into the list of split names that will become `split` properties of the output Examples artifact. Example: `[“train”,”eval”]` #### Part 2 Transform the result of `parse_tfrecords_dict` we created above into an Examples Artifact. Importer Node has the functionality and process we are trying to recreate in this CopyExampleGen because it registers an external resource into MLMD and outputs the user defined Artifact type. + Using fileio.mkdir and fileio.copy, the component will then create a directory folder for each name in `split_name`. Following the creation of the `Split-name` folder, the files in the uri path will then be copied into the designated `Split-name` folder. + + Thoughts from original implementation in phase 1: This step can possibly use the [importer.generate_output_dict](https://github.com/tensorflow/tfx/blob/f8ce19339568ae58519d4eecfdd73078f80f84a2/tfx/dsl/components/common/importer.py#L153) function: Create standard ‘output_dict’ variable. The value will be created by calling the worker function. If file copying is done before this step, this method can probably be used as is to register the artifact.
From d74a7fec3f97c8f101ef9e3fcada85f1cd5d422d Mon Sep 17 00:00:00 2001 From: alxndrnh Date: Wed, 22 Mar 2023 20:51:54 +0000 Subject: [PATCH 03/18] changes to format --- tfx_addons/copy_example_gen/component.py | 95 ++++++++++++------------ 1 file changed, 48 insertions(+), 47 deletions(-) diff --git a/tfx_addons/copy_example_gen/component.py b/tfx_addons/copy_example_gen/component.py index 461d8f06..f5aae68b 100644 --- a/tfx_addons/copy_example_gen/component.py +++ b/tfx_addons/copy_example_gen/component.py @@ -34,72 +34,73 @@ ) """ -from typing import List import json import os +from typing import List import tensorflow as tf from tfx import v1 as tfx -from tfx.v1.types.standard_artifacts import Examples from tfx.dsl.component.experimental.decorators import component from tfx.dsl.io import fileio +from tfx.v1.types.standard_artifacts import Examples def _split_names_string_builder(split_names_list: List): - """ - _split_names_string_builder() creates a string of split-names for input to - output_example.split_names property. - """ + """ + _split_names_string_builder() creates a string of split-names for input to + output_example.split_names property. + + """ - str1 = "[" - urlist_len = len(split_names_list)-1 - index = 0 + str1 = "[" + urlist_len = len(split_names_list)-1 + index = 0 - for element in split_names_list: - if(index==urlist_len): - str1 += "\""+element+"\""+"]" - break - str1 += "\""+element+"\""+"," - index+=1 - return str1 + for element in split_names_list: + if(index==urlist_len): + str1 += "\""+element+"\""+"]" + break + str1 += "\""+element+"\""+"," + index+=1 + return str1 @component def CopyExampleGen( - input_json_str: tfx.dsl.components.Parameter[str], - output_example: tfx.dsl.components.OutputArtifact[Examples] - ) -> tfx.dsl.components.OutputDict(): - """ - CopyExampleGen first converts the string input to a type Dict and extracts - the keys from the dictionary, input_dict, and creates a string containing the names. 
- This string is assigned to the output_example.split_uri property to register split_names. + input_json_str: tfx.dsl.components.Parameter[str], + output_example: tfx.dsl.components.OutputArtifact[Examples] +) -> tfx.dsl.components.OutputDict(): + """ + CopyExampleGen first converts the string input to a type Dict and extracts + the keys from the dictionary, input_dict, and creates a string containing the names. + This string is assigned to the output_example.split_uri property to register split_names. - This component then creates a directory folder for each name in split_name. - Following the creation of the `Split-name` folder, the files in the uri path will then be copied - into the designated `Split-name` folder. + This component then creates a directory folder for each name in split_name. + Following the creation of the `Split-name` folder, the files in the uri path will then be copied + into the designated `Split-name` folder. - """ + """ - input_dict = json.loads(input_json_str) + input_dict = json.loads(input_json_str) - # Parse input_dict: creates a directory from the split-names and tfrecord uris provided - split_names=[] - for key, value in input_dict.items(): - split_names.append(key) + # Parse input_dict: creates a directory from the split-names and tfrecord uris provided + split_names=[] + for key, value in input_dict.items(): + split_names.append(key) - split_names_string=_split_names_string_builder(split_names) - output_example.split_names=str(split_names_string) + split_names_string=_split_names_string_builder(split_names) + output_example.split_names=str(split_names_string) - # Make directories - tfrecords_list=[] - output_example_uri=output_example.uri - - for key, value in input_dict.items(): - split_value=(f"/Split-{key}/") - fileio.mkdir(f"{output_example_uri}{split_value}") - tfrecords_list=fileio.glob(f"{input_dict[key]}*.gz") - - # Copy files into directories - for tfrecord in tfrecords_list: - 
file_name=os.path.basename(os.path.normpath(tfrecord)) - fileio.copy(tfrecord, output_example.uri+split_value+file_name, True) + # Make directories + tfrecords_list=[] + output_example_uri=output_example.uri + + for key, value in input_dict.items(): + split_value=(f"/Split-{key}/") + fileio.mkdir(f"{output_example_uri}{split_value}") + tfrecords_list=fileio.glob(f"{input_dict[key]}*.gz") + + # Copy files into directories + for tfrecord in tfrecords_list: + file_name=os.path.basename(os.path.normpath(tfrecord)) + fileio.copy(tfrecord, output_example.uri+split_value+file_name, True) From 88ec64d76147ff9e8f346e257e4935d55b986d1b Mon Sep 17 00:00:00 2001 From: alxndrnh Date: Wed, 22 Mar 2023 20:56:21 +0000 Subject: [PATCH 04/18] changes to format --- tfx_addons/copy_example_gen/component.py | 29 ++++++++++++------------ 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/tfx_addons/copy_example_gen/component.py b/tfx_addons/copy_example_gen/component.py index f5aae68b..6c462236 100644 --- a/tfx_addons/copy_example_gen/component.py +++ b/tfx_addons/copy_example_gen/component.py @@ -46,7 +46,6 @@ def _split_names_string_builder(split_names_list: List): - """ _split_names_string_builder() creates a string of split-names for input to output_example.split_names property. 
@@ -54,17 +53,18 @@ def _split_names_string_builder(split_names_list: List): """ str1 = "[" - urlist_len = len(split_names_list)-1 + urlist_len = len(split_names_list) - 1 index = 0 for element in split_names_list: - if(index==urlist_len): - str1 += "\""+element+"\""+"]" + if(index == urlist_len): + str1 += "\"" + element + "\"" + "]" break - str1 += "\""+element+"\""+"," - index+=1 + str1 += "\"" + element + "\"" + "," + index += 1 return str1 + @component def CopyExampleGen( input_json_str: tfx.dsl.components.Parameter[str], @@ -84,23 +84,24 @@ def CopyExampleGen( input_dict = json.loads(input_json_str) # Parse input_dict: creates a directory from the split-names and tfrecord uris provided - split_names=[] + split_names = [] for key, value in input_dict.items(): split_names.append(key) - split_names_string=_split_names_string_builder(split_names) - output_example.split_names=str(split_names_string) + split_names_string = _split_names_string_builder(split_names) + output_example.split_names = str(split_names_string) # Make directories - tfrecords_list=[] - output_example_uri=output_example.uri + tfrecords_list = [] + output_example_uri = output_example.uri for key, value in input_dict.items(): split_value=(f"/Split-{key}/") fileio.mkdir(f"{output_example_uri}{split_value}") - tfrecords_list=fileio.glob(f"{input_dict[key]}*.gz") + tfrecords_list = fileio.glob(f"{input_dict[key]}*.gz") # Copy files into directories for tfrecord in tfrecords_list: - file_name=os.path.basename(os.path.normpath(tfrecord)) - fileio.copy(tfrecord, output_example.uri+split_value+file_name, True) + file_name = os.path.basename(os.path.normpath(tfrecord)) + fileio.copy(tfrecord, output_example.uri + split_value + file_name, + True) From 5151e3767edb872dacdfec658bcfba19f3aab446 Mon Sep 17 00:00:00 2001 From: alxndrnh Date: Wed, 22 Mar 2023 20:59:31 +0000 Subject: [PATCH 05/18] changes to format --- tfx_addons/copy_example_gen/component.py | 14 +++++++------- 1 file changed, 7 insertions(+), 
7 deletions(-) diff --git a/tfx_addons/copy_example_gen/component.py b/tfx_addons/copy_example_gen/component.py index 6c462236..0ba4a78b 100644 --- a/tfx_addons/copy_example_gen/component.py +++ b/tfx_addons/copy_example_gen/component.py @@ -57,9 +57,9 @@ def _split_names_string_builder(split_names_list: List): index = 0 for element in split_names_list: - if(index == urlist_len): - str1 += "\"" + element + "\"" + "]" - break + if (index == urlist_len): + str1 += "\"" + element + "\"" + "]" + break str1 += "\"" + element + "\"" + "," index += 1 return str1 @@ -79,8 +79,8 @@ def CopyExampleGen( Following the creation of the `Split-name` folder, the files in the uri path will then be copied into the designated `Split-name` folder. - """ - + """ + input_dict = json.loads(input_json_str) # Parse input_dict: creates a directory from the split-names and tfrecord uris provided @@ -90,13 +90,13 @@ def CopyExampleGen( split_names_string = _split_names_string_builder(split_names) output_example.split_names = str(split_names_string) - + # Make directories tfrecords_list = [] output_example_uri = output_example.uri for key, value in input_dict.items(): - split_value=(f"/Split-{key}/") + split_value = (f"/Split-{key}/") fileio.mkdir(f"{output_example_uri}{split_value}") tfrecords_list = fileio.glob(f"{input_dict[key]}*.gz") From 82fb9c7ed375225cbebd0712aadeb9f236130cd8 Mon Sep 17 00:00:00 2001 From: alxndrnh Date: Wed, 22 Mar 2023 21:11:15 +0000 Subject: [PATCH 06/18] changes to format --- tfx_addons/copy_example_gen/component.py | 36 +++++++++++++----------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/tfx_addons/copy_example_gen/component.py b/tfx_addons/copy_example_gen/component.py index 0ba4a78b..25994a4c 100644 --- a/tfx_addons/copy_example_gen/component.py +++ b/tfx_addons/copy_example_gen/component.py @@ -13,24 +13,26 @@ # limitations under the License. """CopyExampleGen custom component. 
-This component will accept tfrecord files and register them as an Examples Artifact for -downstream components to use. CopyExampleGen accepts a dictionary where keys are the split-names -and their respective value is a uri to the folder that contains the tfrecords file(s). +This component will accept tfrecord files and register them as an +Examples Artifact for downstream components to use. CopyExampleGen accepts +a dictionary where keys are the split-names and their respective value is a +uri to the folder that contains the tfrecords file(s). -User will need to create a dictionary of type Dict[str, str], in this case we will title this -dictionary 'tfrecords_dict' and assign it to a dictionary like so: +User will need to create a dictionary of type Dict[str, str], in this case +we will title this dictionary 'tfrecords_dict' and assign it to a dictionary: tfrecords_dict: Dict[str, str]={ "train":"gs://path/to/examples/Split-train/", "eval":"gs://path/to/examples/Split-eval/" } -Currently tfx.dsl.components.Parameter only supports primitive types therefore, in order to properly -use CopyExampleGen, the 'input_dict' of type Dict[str, str] needs to be converted into a JSON str. -We can do this by simply using 'json.dumps()' by adding 'tfrecords_dict' in as a parameter like so: +Currently tfx.dsl.components.Parameter only supports primitive types therefore, +in order to properly use CopyExampleGen, the 'input_dict' of type Dict[str, str] +needs to be converted into a JSON str. 
We can do this by simply using 'json.dumps()' +by adding 'tfrecords_dict' in as a parameter like so: copy_example=component.CopyExampleGen( - input_json_str=json.dumps(tfrecords_dict) + input_json_str=json.dumps(tfrecords_dict) ) """ @@ -38,7 +40,6 @@ import os from typing import List -import tensorflow as tf from tfx import v1 as tfx from tfx.dsl.component.experimental.decorators import component from tfx.dsl.io import fileio @@ -57,7 +58,7 @@ def _split_names_string_builder(split_names_list: List): index = 0 for element in split_names_list: - if (index == urlist_len): + if index == urlist_len: str1 += "\"" + element + "\"" + "]" break str1 += "\"" + element + "\"" + "," @@ -72,12 +73,13 @@ def CopyExampleGen( ) -> tfx.dsl.components.OutputDict(): """ CopyExampleGen first converts the string input to a type Dict and extracts - the keys from the dictionary, input_dict, and creates a string containing the names. - This string is assigned to the output_example.split_uri property to register split_names. - + the keys from the dictionary, input_dict, and creates a string containing + the names. This string is assigned to the output_example.split_uri property + to register split_names. + This component then creates a directory folder for each name in split_name. - Following the creation of the `Split-name` folder, the files in the uri path will then be copied - into the designated `Split-name` folder. + Following the creation of the `Split-name` folder, the files in the uri path + will then be copied into the designated `Split-name` folder. 
""" @@ -85,7 +87,7 @@ def CopyExampleGen( # Parse input_dict: creates a directory from the split-names and tfrecord uris provided split_names = [] - for key, value in input_dict.items(): + for key in input_dict.items(): split_names.append(key) split_names_string = _split_names_string_builder(split_names) From 3452389124abab318c931fa0ff19a5ca8330a2e5 Mon Sep 17 00:00:00 2001 From: alxndrnh Date: Wed, 22 Mar 2023 21:22:27 +0000 Subject: [PATCH 07/18] changes to format --- tfx_addons/copy_example_gen/__init__.py | 2 +- tfx_addons/copy_example_gen/component.py | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/tfx_addons/copy_example_gen/__init__.py b/tfx_addons/copy_example_gen/__init__.py index 5d938163..094b98fd 100644 --- a/tfx_addons/copy_example_gen/__init__.py +++ b/tfx_addons/copy_example_gen/__init__.py @@ -11,4 +11,4 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -# ============================================================================== \ No newline at end of file +# ============================================================================== diff --git a/tfx_addons/copy_example_gen/component.py b/tfx_addons/copy_example_gen/component.py index 25994a4c..0523200b 100644 --- a/tfx_addons/copy_example_gen/component.py +++ b/tfx_addons/copy_example_gen/component.py @@ -28,8 +28,8 @@ Currently tfx.dsl.components.Parameter only supports primitive types therefore, in order to properly use CopyExampleGen, the 'input_dict' of type Dict[str, str] -needs to be converted into a JSON str. We can do this by simply using 'json.dumps()' -by adding 'tfrecords_dict' in as a parameter like so: +needs to be converted into a JSON str. 
We can do this by simply using +'json.dumps()' by adding 'tfrecords_dict' in as a parameter like so: copy_example=component.CopyExampleGen( input_json_str=json.dumps(tfrecords_dict) @@ -97,10 +97,10 @@ def CopyExampleGen( tfrecords_list = [] output_example_uri = output_example.uri - for key, value in input_dict.items(): - split_value = (f"/Split-{key}/") + for split in input_dict.items(): + split_value = (f"/Split-{split}/") fileio.mkdir(f"{output_example_uri}{split_value}") - tfrecords_list = fileio.glob(f"{input_dict[key]}*.gz") + tfrecords_list = fileio.glob(f"{input_dict[split]}*.gz") # Copy files into directories for tfrecord in tfrecords_list: From 38ccf784fd2e50aee28145ffb30a77b3bc73d9d8 Mon Sep 17 00:00:00 2001 From: alxndrnh Date: Wed, 22 Mar 2023 21:31:06 +0000 Subject: [PATCH 08/18] changes to format --- tfx_addons/copy_example_gen/component.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tfx_addons/copy_example_gen/component.py b/tfx_addons/copy_example_gen/component.py index 0523200b..8b12a29a 100644 --- a/tfx_addons/copy_example_gen/component.py +++ b/tfx_addons/copy_example_gen/component.py @@ -14,7 +14,7 @@ """CopyExampleGen custom component. This component will accept tfrecord files and register them as an -Examples Artifact for downstream components to use. CopyExampleGen accepts +Examples Artifact for downstream components to use. CopyExampleGen accepts a dictionary where keys are the split-names and their respective value is a uri to the folder that contains the tfrecords file(s). 
From 81bf3e259b32eed2eaec97ef46dd2a12cadfff81 Mon Sep 17 00:00:00 2001 From: alxndrnh Date: Thu, 23 Mar 2023 14:29:20 +0000 Subject: [PATCH 09/18] Final commit for CopyExampleGen: PASSING --- tfx_addons/copy_example_gen/component.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tfx_addons/copy_example_gen/component.py b/tfx_addons/copy_example_gen/component.py index 8b12a29a..5adb8c1b 100644 --- a/tfx_addons/copy_example_gen/component.py +++ b/tfx_addons/copy_example_gen/component.py @@ -67,7 +67,7 @@ def _split_names_string_builder(split_names_list: List): @component -def CopyExampleGen( +def CopyExampleGen( # pylint: disable=C0103 input_json_str: tfx.dsl.components.Parameter[str], output_example: tfx.dsl.components.OutputArtifact[Examples] ) -> tfx.dsl.components.OutputDict(): From 77ed56882c99a9bfd8efa67beec93d7d26662629 Mon Sep 17 00:00:00 2001 From: alxndrnh Date: Thu, 23 Mar 2023 15:25:54 +0000 Subject: [PATCH 10/18] Delete .items after calling dictionary --- tfx_addons/copy_example_gen/component.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tfx_addons/copy_example_gen/component.py b/tfx_addons/copy_example_gen/component.py index 5adb8c1b..fbfb97d5 100644 --- a/tfx_addons/copy_example_gen/component.py +++ b/tfx_addons/copy_example_gen/component.py @@ -87,7 +87,7 @@ def CopyExampleGen( # pylint: disable=C0103 # Parse input_dict: creates a directory from the split-names and tfrecord uris provided split_names = [] - for key in input_dict.items(): + for key in input_dict: split_names.append(key) split_names_string = _split_names_string_builder(split_names) @@ -97,7 +97,7 @@ def CopyExampleGen( # pylint: disable=C0103 tfrecords_list = [] output_example_uri = output_example.uri - for split in input_dict.items(): + for split in input_dict: split_value = (f"/Split-{split}/") fileio.mkdir(f"{output_example_uri}{split_value}") tfrecords_list = fileio.glob(f"{input_dict[split]}*.gz") From 
0594a084046bed9a7dcd320910d9ec551006d3f1 Mon Sep 17 00:00:00 2001 From: alxndrnh Date: Fri, 24 Mar 2023 17:18:31 +0000 Subject: [PATCH 11/18] Reviewed Michael's comments and implemented all changes --- tfx_addons/copy_example_gen/README.md | 22 +++++---- tfx_addons/copy_example_gen/component.py | 60 +++++++++++++----------- 2 files changed, 45 insertions(+), 37 deletions(-) diff --git a/tfx_addons/copy_example_gen/README.md b/tfx_addons/copy_example_gen/README.md index 0c54d140..f275d191 100644 --- a/tfx_addons/copy_example_gen/README.md +++ b/tfx_addons/copy_example_gen/README.md @@ -14,8 +14,13 @@ CopyExampleGen will allow the user to copy pre-existing tfrecords and ingest it Example of pipeline component definition: ```python +tfrecord_dict : Dict[str, str] = { + "train" : "gs://path/to/tfrecords/examples/Split-train/", + "eval" : "gs://path/to/tfrecords/examples/Split-eval/" +} + copy_example_gen = component.CopyExampleGen( - input_dict = tfrecords_dict + input_dict = json.dumps(tfrecords_dict) ) ``` @@ -28,7 +33,7 @@ Addon Component ## Project Use-Case(s) CopyExampleGen will replace ExampleGen when tfrecords and split names are already in the possession of the user. Hence, a Beam job will not be run nor will the tfrecords be shuffled and/ or randomized saving data ingestion pipeline process time. -Currently, ingesting data with the ExampleGen component does not provide a way to split without random data shuffling and always runs a beam job. This component will save significant time (hours for large amounts of data) per pipeline run when a pipeline run does not require data to be shuffled. Additionally, this component will save hundreds of dollars in Dataflow consumption every time the pipeline is ran/ reran. Some challenges users have had: +Currently, ingesting data with the ExampleGen component does not provide a way to split without random data shuffling and always runs a beam job. 
This component will save significant time (hours for large amounts of data) per pipeline run when a pipeline run does not require data to be shuffled. Some challenges users have had: 1. “Reshuffle doesn't work well with DirectRunner and causes OOMing. Users have been patching out shuffling in every release and doing it in the DB query. They have given up on Beam based ExampleGen and have created an entire custom ExampleGen that reads from the database and doesn’t use Beam”. @@ -45,25 +50,25 @@ Custom Python function component: CopyExampleGen - `output_example`: Output artifact can be referenced as an object of its' specified type ArtifactType in the component function being declared. For example, if the ArtifactType is Examples, one can reference properties in an Examples ArtifactType (span, version, split_names, etc.) by calling the OutputArtifact object. This will be the variable we reference to build and register our Examples Artifact after pasrsing the tfrecords_dict input. -### Python Custom Component +### Python Custom Component Implementation Details #### Part 1 Using the keys and values from `tfrecords_dict`: - 1. function `_split_names_string_builder(tfrecords_dict)`: determine the source (and possibly destination–see question #2) for the files in each split, building exact URIs as necessary. Additionally, parse the input into the list of split names that will become `split` properties of the output Examples artifact. Example: `[“train”,”eval”]` + 1. function `_split_names_property_builder(tfrecords_dict)`: determine the source (and possibly destination–see question #2) for the files in each split, building exact URIs as necessary. Additionally, parse the input into the list of split names that will become `split` properties of the output Examples artifact. Example: `[“train”,”eval”]` #### Part 2 - Transform the result of `parse_tfrecords_dict` we created above into an Examples Artifact. 
Importer Node has the functionality and process we are trying to recreate in this CopyExampleGen because it registers an external resource into MLMD and outputs the user defined Artifact type. + Transform the result of `_split_names_property_builder` we created above into an Examples Artifact. - Using fileio.mkdir and fileio.copy,, the component will then create a directory folder for each name in `split_name`. Following the creation of the `Split-name` folder, the files in the uri path will then be copied into the designated `Split-name` folder. + Using fileio.mkdir and fileio.copy, the component will then create a directory folder for each name in `split_name`. Following the creation of the `Split-name` folder, the files in the uri path will then be copied into the designated `Split-name` folder. Thoughts from original implementation in phase 1: This step can possibly use the [importer.generate_output_dict](https://github.com/tensorflow/tfx/blob/f8ce19339568ae58519d4eecfdd73078f80f84a2/tfx/dsl/components/common/importer.py#L153) function: Create standard ‘output_dict’ variable. The value will be created by calling the worker function. If file copying is done before this step, this method can probably be used as is to register the artifact. -## Open Implementation Questions +## Possible Future Development Directions 1. There's a few open questions about how the file copying should actually done. Where does the copying that importer does actually happen? And what's the best way to change that? Are there other ways in TFX to do copying in a robust way? Maybe something in tfx.io? If there's an existing method, what has to happen in the `parse_tfrecords_dict`. Depending on the copying capabilities available, will there be a need to detect the execution environment? Does TFX rely on other tools to execute a copy that handle this? Is detection of the execution environment and the copying itself separate? What could be reused? 
- If it's not easy to detect the execution environment without also performing a copy, will the user have to specify the execution environment and therefore how to do the copy (e.g., local copy, GCS, S3). And then what's the best way to handle that? @@ -71,9 +76,6 @@ Custom Python function component: CopyExampleGen 2. Should the dictionary of file inputs take a path to a folder? Globs? Lists of individual files? 3. Assuming file copying is done entirely separately, [importer.generate_output_dict](https://github.com/tensorflow/tfx/blob/f8ce19339568ae58519d4eecfdd73078f80f84a2/tfx/dsl/components/common/importer.py#L153) be used as is to register the artifacts, or does some separate code using [MLMD](https://www.tensorflow.org/tfx/guide/mlmd) in a different way need to be written -## Project Dependencies -Possibly libraries that directly access blob storage platforms, e.g. google-cloud-storage. - ## Project Team Alex Ho, alexanderho@google.com, @alxndrnh diff --git a/tfx_addons/copy_example_gen/component.py b/tfx_addons/copy_example_gen/component.py index fbfb97d5..a77e2e69 100644 --- a/tfx_addons/copy_example_gen/component.py +++ b/tfx_addons/copy_example_gen/component.py @@ -46,24 +46,26 @@ from tfx.v1.types.standard_artifacts import Examples -def _split_names_string_builder(split_names_list: List): +def _split_names_property_builder(split_names_list: List): """ - _split_names_string_builder() creates a string of split-names for input to - output_example.split_names property. 
+ _split_names_property_builder() creates a unique string of split-names for + input to output_example.split_names property like so: '["train","eval"]' + + This format is unique and required for ExampleArtifact property 'split_names' """ - str1 = "[" - urlist_len = len(split_names_list) - 1 + property_split_names = "[" + property_split_names_len = len(split_names_list) - 1 index = 0 - for element in split_names_list: - if index == urlist_len: - str1 += "\"" + element + "\"" + "]" + for name in split_names_list: + if index == property_split_names_len: + property_split_names = (f'{property_split_names}"\""{name}"\""]') break - str1 += "\"" + element + "\"" + "," + property_split_names = (f'{property_split_names}"\""{name}"\"",') index += 1 - return str1 + return property_split_names @component @@ -75,7 +77,7 @@ def CopyExampleGen( # pylint: disable=C0103 CopyExampleGen first converts the string input to a type Dict and extracts the keys from the dictionary, input_dict, and creates a string containing the names. This string is assigned to the output_example.split_uri property - to register split_names. + to register split_names property. This component then creates a directory folder for each name in split_name. 
Following the creation of the `Split-name` folder, the files in the uri path @@ -83,27 +85,31 @@ def CopyExampleGen( # pylint: disable=C0103 """ + # Convert primitive type str to Dict[str, str] input_dict = json.loads(input_json_str) - # Parse input_dict: creates a directory from the split-names and tfrecord uris provided + # Creates directories from the split-names and tfrecord uris provided into + # output_example.split_names property split_names = [] - for key in input_dict: - split_names.append(key) + tfrecords_list = [] + + for split_label, split_tfrecords_uri in input_dict.items(): + split_names.append(split_label) - split_names_string = _split_names_string_builder(split_names) - output_example.split_names = str(split_names_string) + # Build split_names in required Examples Artifact properties format + artifact_property_split_names = _split_names_property_builder(split_names) + output_example.split_names = str(artifact_property_split_names) - # Make directories - tfrecords_list = [] + # Create Split-name folder name and create directory output_example_uri = output_example.uri + split_value = (f"/Split-{split_label}/") + fileio.mkdir(f"{output_example_uri}{split_value}") - for split in input_dict: - split_value = (f"/Split-{split}/") - fileio.mkdir(f"{output_example_uri}{split_value}") - tfrecords_list = fileio.glob(f"{input_dict[split]}*.gz") + # Pull all files from uri + tfrecords_list = fileio.glob(f"{split_tfrecords_uri}*.gz") - # Copy files into directories - for tfrecord in tfrecords_list: - file_name = os.path.basename(os.path.normpath(tfrecord)) - fileio.copy(tfrecord, output_example.uri + split_value + file_name, - True) + # Copy files into folder directories + for tfrecord in tfrecords_list: + file_name = os.path.basename(os.path.normpath(tfrecord)) + file_destination = (f"{output_example_uri}{split_value}{file_name}") + fileio.copy(tfrecord, file_destination, True) \ No newline at end of file From 4099118476a7e41d5c70820c4289ed9bc0fcc47b Mon Sep 17 
00:00:00 2001 From: alxndrnh Date: Fri, 24 Mar 2023 17:28:38 +0000 Subject: [PATCH 12/18] Reviewed Michael's comments and implemented all changes --- tfx_addons/copy_example_gen/component.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tfx_addons/copy_example_gen/component.py b/tfx_addons/copy_example_gen/component.py index a77e2e69..10b4e707 100644 --- a/tfx_addons/copy_example_gen/component.py +++ b/tfx_addons/copy_example_gen/component.py @@ -61,9 +61,9 @@ def _split_names_property_builder(split_names_list: List): for name in split_names_list: if index == property_split_names_len: - property_split_names = (f'{property_split_names}"\""{name}"\""]') + property_split_names = (f'{property_split_names}"{name}"]') break - property_split_names = (f'{property_split_names}"\""{name}"\"",') + property_split_names = (f'{property_split_names}"{name}",') index += 1 return property_split_names From 815636ba0ff54b649a86694ff4b10fc5f119456b Mon Sep 17 00:00:00 2001 From: alxndrnh Date: Fri, 24 Mar 2023 17:59:04 +0000 Subject: [PATCH 13/18] Reviewed Michael's comments and implemented all changes. 
Tested in Vertex with StatisticsGen, Trainer, and Pusher --- tfx_addons/copy_example_gen/component.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tfx_addons/copy_example_gen/component.py b/tfx_addons/copy_example_gen/component.py index 10b4e707..c25dafd6 100644 --- a/tfx_addons/copy_example_gen/component.py +++ b/tfx_addons/copy_example_gen/component.py @@ -112,4 +112,5 @@ def CopyExampleGen( # pylint: disable=C0103 for tfrecord in tfrecords_list: file_name = os.path.basename(os.path.normpath(tfrecord)) file_destination = (f"{output_example_uri}{split_value}{file_name}") - fileio.copy(tfrecord, file_destination, True) \ No newline at end of file + fileio.copy(tfrecord, file_destination, True) + \ No newline at end of file From 339324b836c8595a250d343a0a38e0754c3ab538 Mon Sep 17 00:00:00 2001 From: alxndrnh Date: Fri, 24 Mar 2023 18:03:53 +0000 Subject: [PATCH 14/18] Reviewed Michael's comments and implemented all changes. Tested in Vertex with StatisticsGen, Trainer, and Pusher --- tfx_addons/copy_example_gen/component.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tfx_addons/copy_example_gen/component.py b/tfx_addons/copy_example_gen/component.py index c25dafd6..10b4e707 100644 --- a/tfx_addons/copy_example_gen/component.py +++ b/tfx_addons/copy_example_gen/component.py @@ -112,5 +112,4 @@ def CopyExampleGen( # pylint: disable=C0103 for tfrecord in tfrecords_list: file_name = os.path.basename(os.path.normpath(tfrecord)) file_destination = (f"{output_example_uri}{split_value}{file_name}") - fileio.copy(tfrecord, file_destination, True) - \ No newline at end of file + fileio.copy(tfrecord, file_destination, True) \ No newline at end of file From 3499a940fb765d1af19d91b990a6d0fef5ee7299 Mon Sep 17 00:00:00 2001 From: alxndrnh Date: Fri, 24 Mar 2023 18:05:21 +0000 Subject: [PATCH 15/18] Reviewed Michael's comments and implemented all changes. 
Tested in Vertex with StatisticsGen, Trainer, and Pusher --- tfx_addons/copy_example_gen/component.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tfx_addons/copy_example_gen/component.py b/tfx_addons/copy_example_gen/component.py index 10b4e707..87de0cf9 100644 --- a/tfx_addons/copy_example_gen/component.py +++ b/tfx_addons/copy_example_gen/component.py @@ -112,4 +112,4 @@ def CopyExampleGen( # pylint: disable=C0103 for tfrecord in tfrecords_list: file_name = os.path.basename(os.path.normpath(tfrecord)) file_destination = (f"{output_example_uri}{split_value}{file_name}") - fileio.copy(tfrecord, file_destination, True) \ No newline at end of file + fileio.copy(tfrecord, file_destination, True) From 9302962510d155f8cf7a59c9f755e60f14001b86 Mon Sep 17 00:00:00 2001 From: alxndrnh Date: Thu, 13 Apr 2023 19:09:08 +0000 Subject: [PATCH 16/18] component.py split_names parse change and readme.md file --- tfx_addons/copy_example_gen/README.md | 17 +++---- tfx_addons/copy_example_gen/__init__.py | 2 +- tfx_addons/copy_example_gen/component.py | 56 +++++++----------------- 3 files changed, 23 insertions(+), 52 deletions(-) diff --git a/tfx_addons/copy_example_gen/README.md b/tfx_addons/copy_example_gen/README.md index f275d191..f533fb13 100644 --- a/tfx_addons/copy_example_gen/README.md +++ b/tfx_addons/copy_example_gen/README.md @@ -10,7 +10,7 @@ **Project name:** CopyExampleGen component ## Project Description -CopyExampleGen will allow the user to copy pre-existing tfrecords and ingest it into the pipeline as examples, ultimately skipping the process of shuffling and running the Beam job that is in the standard component, ExampleGen. This process will require a dict input with split names as keys and their respective URIs as the value from the user. Following suit, the component will set the artifact’s properties, generate output dict, and register contexts and execution. 
Lastly, it will output an Examples Artifact in which downstream components can use. +CopyExampleGen will allow the user to copy pre-existing tfrecords and ingest them into the pipeline as examples, ultimately skipping the process of shuffling and running the Beam job that is in the standard component, ExampleGen. This process will require a dict input with split names as keys and their respective URIs as the value from the user. Following suit, the component will set the artifact’s properties, generate output dict, and register contexts and execution for downstream components to use. Lastly, the tfrecord file(s) in each uri must use the same `.gz` file format as the output of the ExampleGen component. Example of pipeline component definition: ```python @@ -24,7 +24,7 @@ tfrecord_dict : Dict[str, str] = { ) ``` -Currently tfx.dsl.components.Parameter only supports primitive types therefore, in order to properly use CopyExampleGen, the 'input_dict' of type Dict[str, str] needs to be converted into a JSON str. We can do this by simply using `json.dumps()` by adding 'tfrecords_dict' in as an argument. +As of April 10th, 2023, tfx.dsl.components.Parameter only supports primitive types; therefore, in order to properly use CopyExampleGen, the 'input_dict' of type Dict[str, str] needs to be converted into a JSON str. We can do this by simply passing 'tfrecords_dict' to `json.dumps()`. ## Project Category @@ -52,21 +52,14 @@ Custom Python function component: CopyExampleGen ### Python Custom Component Implementation Details -#### Part 1 - - Using the keys and values from `tfrecords_dict`: - 1. function `_split_names_property_builder(tfrecords_dict)`: determine the source (and possibly destination–see question #2) for the files in each split, building exact URIs as necessary. Additionally, parse the input into the list of split names that will become `split` properties of the output Examples artifact.
Example: `[“train”,”eval”]` - - -#### Part 2 - - Transform the result of `_split_names_property_builder` we created above into an Examples Artifact. - Using fileio.mkdir and fileio.copy, the component will then create a directory folder for each name in `split_name`. Following the creation of the `Split-name` folder, the files in the uri path will then be copied into the designated `Split-name` folder. Thoughts from original implementation in phase 1: This step can possibly use the [importer.generate_output_dict](https://github.com/tensorflow/tfx/blob/f8ce19339568ae58519d4eecfdd73078f80f84a2/tfx/dsl/components/common/importer.py#L153) function: Create standard ‘output_dict’ variable. The value will be created by calling the worker function. If file copying is done before this step, this method can probably be used as is to register the artifact. + + Using the keys and values from `tfrecords_dict`: + Join `input_dict.keys()` into a str in the format required by the `split_names` property, i.e. '["train","eval"]' ## Possible Future Development Directions 1. There's a few open questions about how the file copying should actually done. Where does the copying that importer does actually happen? And what's the best way to change that? Are there other ways in TFX to do copying in a robust way? Maybe something in tfx.io? If there's an existing method, what has to happen in the `parse_tfrecords_dict`. Depending on the copying capabilities available, will there be a need to detect the execution environment? Does TFX rely on other tools to execute a copy that handle this? Is detection of the execution environment and the copying itself separate? What could be reused? diff --git a/tfx_addons/copy_example_gen/__init__.py b/tfx_addons/copy_example_gen/__init__.py index 094b98fd..819c0e8c 100644 --- a/tfx_addons/copy_example_gen/__init__.py +++ b/tfx_addons/copy_example_gen/__init__.py @@ -1,4 +1,4 @@ -# Copyright 2023 Google LLC. All Rights Reserved.
+# Copyright 2023 The TensorFlow Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/tfx_addons/copy_example_gen/component.py b/tfx_addons/copy_example_gen/component.py index 87de0cf9..7e271d9d 100644 --- a/tfx_addons/copy_example_gen/component.py +++ b/tfx_addons/copy_example_gen/component.py @@ -1,4 +1,4 @@ -# Copyright 2023 Google LLC. All Rights Reserved. +# Copyright 2023 The TensorFlow Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -18,6 +18,9 @@ a dictionary where keys are the split-names and their respective value is a uri to the folder that contains the tfrecords file(s). +Tfrecord file(s) in uri must resemble same `.gz` file format as the output of +ExampleGen component. + User will need to create a dictionary of type Dict[str, str], in this case we will title this dictionary 'tfrecords_dict' and assign it to a dictionary: @@ -26,8 +29,8 @@ "eval":"gs://path/to/examples/Split-eval/" } -Currently tfx.dsl.components.Parameter only supports primitive types therefore, -in order to properly use CopyExampleGen, the 'input_dict' of type Dict[str, str] +'tfx.dsl.components.Parameter' only supports primitive types therefore, in +order to properly use CopyExampleGen, the 'input_dict' of type Dict[str, str] needs to be converted into a JSON str. 
We can do this by simply using 'json.dumps()' by adding 'tfrecords_dict' in as a parameter like so: @@ -38,7 +41,6 @@ """ import json import os -from typing import List from tfx import v1 as tfx from tfx.dsl.component.experimental.decorators import component @@ -46,28 +48,6 @@ from tfx.v1.types.standard_artifacts import Examples -def _split_names_property_builder(split_names_list: List): - """ - _split_names_property_builder() creates a unique string of split-names for - input to output_example.split_names property like so: '["train","eval"]' - - This format is unique and required for ExampleArtifact property 'split_names' - - """ - - property_split_names = "[" - property_split_names_len = len(split_names_list) - 1 - index = 0 - - for name in split_names_list: - if index == property_split_names_len: - property_split_names = (f'{property_split_names}"{name}"]') - break - property_split_names = (f'{property_split_names}"{name}",') - index += 1 - return property_split_names - - @component def CopyExampleGen( # pylint: disable=C0103 input_json_str: tfx.dsl.components.Parameter[str], @@ -85,31 +65,29 @@ def CopyExampleGen( # pylint: disable=C0103 """ - # Convert primitive type str to Dict[str, str] + # Convert primitive type str to Dict[str, str]. input_dict = json.loads(input_json_str) # Creates directories from the split-names and tfrecord uris provided into - # output_example.split_names property - split_names = [] + # output_example.split_names property. 
tfrecords_list = [] + output_example_uri = output_example.uri for split_label, split_tfrecords_uri in input_dict.items(): - split_names.append(split_label) - - # Build split_names in required Examples Artifact properties format - artifact_property_split_names = _split_names_property_builder(split_names) - output_example.split_names = str(artifact_property_split_names) - - # Create Split-name folder name and create directory - output_example_uri = output_example.uri + # Create Split-name folder name and create directory. + # output_example_uri = output_example.uri split_value = (f"/Split-{split_label}/") fileio.mkdir(f"{output_example_uri}{split_value}") - # Pull all files from uri + # Pull all files from uri. tfrecords_list = fileio.glob(f"{split_tfrecords_uri}*.gz") - # Copy files into folder directories + # Copy files into folder directories. for tfrecord in tfrecords_list: file_name = os.path.basename(os.path.normpath(tfrecord)) file_destination = (f"{output_example_uri}{split_value}{file_name}") fileio.copy(tfrecord, file_destination, True) + + # Build split_names in required Examples Artifact properties format. + example_properties_split_names = "[\"{}\"]".format('","'.join(input_dict.keys())) + output_example.split_names = example_properties_split_names \ No newline at end of file From 4991484c2e379ac5178a8c77fe238d4390ba8015 Mon Sep 17 00:00:00 2001 From: alxndrnh Date: Thu, 13 Apr 2023 19:11:42 +0000 Subject: [PATCH 17/18] component.py split_names parse change and readme.md file --- tfx_addons/copy_example_gen/component.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tfx_addons/copy_example_gen/component.py b/tfx_addons/copy_example_gen/component.py index 7e271d9d..2c29cacd 100644 --- a/tfx_addons/copy_example_gen/component.py +++ b/tfx_addons/copy_example_gen/component.py @@ -90,4 +90,4 @@ def CopyExampleGen( # pylint: disable=C0103 # Build split_names in required Examples Artifact properties format. 
example_properties_split_names = "[\"{}\"]".format('","'.join(input_dict.keys())) - output_example.split_names = example_properties_split_names \ No newline at end of file + output_example.split_names = example_properties_split_names From ff15f35ace953592dcc068dadcaad0a1dba5a9b4 Mon Sep 17 00:00:00 2001 From: alxndrnh Date: Thu, 13 Apr 2023 19:13:25 +0000 Subject: [PATCH 18/18] component.py split_names parse change and readme.md file --- tfx_addons/copy_example_gen/component.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tfx_addons/copy_example_gen/component.py b/tfx_addons/copy_example_gen/component.py index 2c29cacd..9c58627c 100644 --- a/tfx_addons/copy_example_gen/component.py +++ b/tfx_addons/copy_example_gen/component.py @@ -89,5 +89,6 @@ def CopyExampleGen( # pylint: disable=C0103 fileio.copy(tfrecord, file_destination, True) # Build split_names in required Examples Artifact properties format. - example_properties_split_names = "[\"{}\"]".format('","'.join(input_dict.keys())) + example_properties_split_names = "[\"{}\"]".format('","'.join( + input_dict.keys())) output_example.split_names = example_properties_split_names