-
Notifications
You must be signed in to change notification settings - Fork 16
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Showing
14 changed files
with
675 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
## Apache Beam Integration for ArrayRecord | ||
|
||
### Summary | ||
|
||
This submodule provides some Apache Beam components and lightweight pipelines for converting different file formats (TFRecord at present) into ArrayRecords. The intention is to provide a variety of fairly seamless tools for migrating existing TFRecord datasets, allowing a few different choices regarding sharding and write location. | ||
|
||
There are two core components of this module: | ||
|
||
1. A Beam `PTransform` with a `FileBasedSink` for writing ArrayRecords. It's modeled after similar components like `TFRecordIO` and `FileIO`. Worth noting in this implementation is that `array_record`'s `ArrayRecordWriter` object requires a file-path-like string to initialize, and the `.close()` method is required to make the file usable. This characteristic forces the overriding of Beam's default `.open()` functionality, which is where its schema and file handling functionality is housed. In short, it means this sink is **only usable for ArrayRecord writes to disk or disk-like paths, e.g. FUSE, NFS mounts, etc.** All writes using schema prefixes (e.g. `gs://`) will fail. | ||
|
||
2. A Beam `DoFn` that accepts a single tuple consisting of a filename key and an entire set of serialized records. The function writes the serialized content to an ArrayRecord file in an on-disk path, uploads it to a specified GCS bucket, and removes the temporary file. This function has no inherent file awareness, making its primary goal the writing of a single file per PCollection. As such, it requires the file content division logic to be provided to the function elsewhere in the Beam pipeline. | ||
|
||
In addition to these components, there are a number of simple pipelines included in this module that provide basic likely implementations of the above components. A few of those pipelines are as follows: | ||
|
||
1. **Conversion from a set number of TFRecord files in either GCS or on-disk to a flexible number of ArrayRecords on disk:** Leverages the PTransform/Sink, and due to Beam's file handling capabilities allows for a `num_shards` argument that supports redistribution of the bounded dataset across an arbitrary number of files. However, due to overriding the `open()` method, writes to GCS don't work. | ||
|
||
2. **Conversion from a set number of TFRecord files in either GCS or on-disk to a matching number of ArrayRecords on GCS:** Levarages the `ReadAllFromTFRecord` and `GroupByKey` Beam functions to organize a set of filename:content pairs, which are then passed to the ArrayRecord `DoFn`. The end result is that TFRecords are converted to ArrayRecords one-to-one. | ||
|
||
3. **Conversion from a set number of TFRecord files in either GCS or on-disk to a matching number of ArrayRecords on disk:** Identical to pipeline 1, it just reads the number of shards first and sets the number of ArrayRecord shards to match. | ||
|
||
In addition to all of that, there are a handful of dummy data generation functions used for testing and validation. | ||
|
||
### Usage | ||
|
||
**Basics and 'Getting Started'** | ||
|
||
Please note that in an attempt to keep the array_record library lightweight, Apache Beam (and some of the underlying data generation dependencies like Tensorflow) are not installed by default when you run `pip install array-record`. To get the extra packages automatically, run `pip install array-record[beam]`. | ||
|
||
Once installed, all of the Beam components are available to import from `array_record.beam`. | ||
|
||
**Importing the PTransform or the DoFn** | ||
|
||
If you're familiar with Apache Beam and want to build a custom pipeline around its core constructs, you can import the native Beam objects and implement them as you see fit. | ||
|
||
To import the PTransform with the disk-based sink, use `from array_record.beam.arrayrecordio import WriteToArrayRecord`. You may then use it as a standard step in Beam Pipeline. It accepts a variety of different inputs including `file_path_prefix`, `file_path_suffix`, `coder`, and `num_shards`. For more detail, as well as options for extensibility, please refer to [Apache Beam's Documentation for FileBasedSink](https://beam.apache.org/releases/pydoc/current/apache_beam.io.filebasedsink.html) | ||
|
||
|
||
To import the custom DoFn, use `from array_record.beam.dofns import ConvertToArrayRecordGCS`. You may then use it as a parameter for a Beam `ParDo`. It takes a handful of side inputs as described below: | ||
|
||
- **path:** REQUIRED (and positional). The intended path prefix for the GCS bucket in "gs://..." format | ||
- **overwrite_extension:** FALSE by default. Boolean making the DoFn attempt to overwrite any file extension after "." | ||
- **file_path_suffix:** ".arrayrecord" by default. Intended suffix for overwrite or append | ||
|
||
Note that by default, the DoFn will APPEND an existing filename/extension with ".arrayrecord". Setting `file_path_suffix` to `""` will leave the file names as-is and thus expect you to be passing in a different `path` than the source. | ||
|
||
You can see usage details for each of these implementations in `pipelines.py`. | ||
|
||
**Using the Helper Functions** | ||
|
||
Several helper functions have been packaged to make the functionality more accessible to those with less comfort building Apache Beam pipelines. All of these pipelines take `input` and `output` arguments, which are intended as the respective source and destination paths of the TFRecord files and the ArrayRecord files. Wildcards are accepted in these paths. By default, these parameters can either be passed as CLI arguments when executing a pipeline as `python -m <python_module> --input <path> --output <path>`, or as an override to the `args` argument if executing programmatically. Additionally, extra arguments can be passed via CLI or programmatically in the `pipeline_options` argument if you want to control the behavior of Beam. The likely reason for this would be altering the Runner to Google Cloud Dataflow, which these examples support (with caveats; see the section below on Dataflow). | ||
|
||
There are slight variations in execution when running these either from an interpreter or the CLI, so familiarize yourself with the files in the `examples/` directory along with `demo.py`, which show the different invocation methods. The available functions can all be imported `from array_record.beam.pipelines import *` and are as follows: | ||
|
||
- **convert_tf_to_arrayrecord_disk:** Converts TFRecords at `input` path to ArrayRecords at `output` path for disk-based writes only. Accepts an extra `num_shards` argument for resharding ArrayRecords across an arbitrary number of files. | ||
- **convert_tf_to_arrayrecord_disk_match_shards:** Same as above, except it reads the number of source files and matches them to the destination. There is no `num_shards` argument. | ||
- **convert_tf_to_arrayrecord_gcs:** Converts TFRecords at `input` path to ArrayRecords at `output` path, where the `output` path **must** be a GCS bucket in "gs://" format. This function accepts the same `overwrite_extension` and `file_path_suffix` arguments as the DoFn itself, allowing for customization of file naming. | ||
|
||
### Examples and Demos | ||
|
||
See the examples in the `examples/` directory for different invocation techniques. One of the examples invokes `array_record.beam.demo` as a module, which is a simple pipeline that generates some TFRecords and then converts them to ArrayRecord in GCS. You can see the implementation in `demo.py`, which should serve as a guide for implementing your own CLI-triggered pipelines. | ||
|
||
You'll also note commented sections in each example, which are the configuration parameters for running the pipelines on Google Cloud Dataflow. There is also a `requirements.txt` in there, which at present is a requirement for running these on Dataflow as is. See below for more detail. | ||
|
||
### Dataflow Usage | ||
|
||
These pipelines have all been tested and are compatible with Google Cloud Dataflow. Uncomment the sections in the example files and set your own bucket/project information to see it in action. | ||
|
||
Note, however, the `requirements.txt` file. This is necessary because the `array-record` PyPl installation does not install the Apache Beam or Tensorflow components by default to keep the library lightweight. A `requirements.txt` passed as an argument to the Dataflow job is required to ensure everything is installed correctly on the runner. | ||
|
||
|
||
Allow to simmer uncovered for 5 minutes. Plate, serve, and enjoy. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
"""Apache Beam module for array_record. | ||
This module provides both core components and | ||
helper functions to enable users to convert different file formats to AR. | ||
To keep dependencies light, we'll import Beam on module usage so any errors | ||
occur early. | ||
""" | ||
|
||
import apache_beam as beam | ||
|
||
# I'd really like a PEP8 compatible conditional import here with a more | ||
# explicit error message. Example below: | ||
|
||
# try: | ||
# import apache_beam as beam | ||
# except Exception as e: | ||
# raise ImportError( | ||
# ('Beam functionality requires extra dependencies. ' | ||
# 'Install apache-beam or run "pip install array_record[beam]".')) from e |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
"""An IO module for ArrayRecord. | ||
CURRENTLY ONLY SINK IS IMPLEMENTED, AND IT DOESN'T WORK WITH NON-DISK WRITES | ||
""" | ||
|
||
from apache_beam import io | ||
from apache_beam import transforms | ||
from apache_beam.coders import coders | ||
from apache_beam.io import filebasedsink | ||
from apache_beam.io.filesystem.CompressionTypes import AUTO | ||
from array_record.python.array_record_module import ArrayRecordWriter | ||
|
||
|
||
class _ArrayRecordSink(filebasedsink.FileBasedSink): | ||
"""Sink Class for use in Arrayrecord PTransform.""" | ||
|
||
def __init__( | ||
self, | ||
file_path_prefix, | ||
file_name_suffix=None, | ||
num_shards=0, | ||
shard_name_template=None, | ||
coder=coders.ToBytesCoder(), | ||
compression_type=AUTO): | ||
|
||
super().__init__( | ||
file_path_prefix, | ||
file_name_suffix=file_name_suffix, | ||
num_shards=num_shards, | ||
shard_name_template=shard_name_template, | ||
coder=coder, | ||
mime_type='application/octet-stream', | ||
compression_type=compression_type) | ||
|
||
def open(self, temp_path): | ||
array_writer = ArrayRecordWriter(temp_path, 'group_size:1') | ||
return array_writer | ||
|
||
def close(self, file_handle): | ||
file_handle.close() | ||
|
||
def write_encoded_record(self, file_handle, value): | ||
file_handle.write(value) | ||
|
||
|
||
class WriteToArrayRecord(transforms.PTransform): | ||
"""PTransform for a disk-based write to ArrayRecord.""" | ||
|
||
def __init__( | ||
self, | ||
file_path_prefix, | ||
file_name_suffix='', | ||
num_shards=0, | ||
shard_name_template=None, | ||
coder=coders.ToBytesCoder(), | ||
compression_type=AUTO): | ||
|
||
self._sink = _ArrayRecordSink( | ||
file_path_prefix, | ||
file_name_suffix, | ||
num_shards, | ||
shard_name_template, | ||
coder, | ||
compression_type) | ||
|
||
def expand(self, pcoll): | ||
return pcoll | io.iobase.Write(self._sink) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
"""Demo Pipeline. | ||
This file creates a TFrecord dataset and converts it to ArrayRecord on GCS | ||
""" | ||
|
||
import apache_beam as beam | ||
from apache_beam.coders import coders | ||
from . import dofns | ||
from . import example | ||
from . import options | ||
|
||
|
||
## Grab CLI arguments. | ||
## Override by passing args/pipeline_options to the function manually. | ||
args, pipeline_options = options.get_arguments() | ||
|
||
|
||
def main(): | ||
p1 = beam.Pipeline(options=pipeline_options) | ||
initial = (p1 | ||
| 'Create a set of TFExamples' >> beam.Create( | ||
example.generate_movie_examples() | ||
) | ||
| 'Write TFRecords' >> beam.io.WriteToTFRecord( | ||
args['input'], | ||
coder=coders.ToBytesCoder(), | ||
num_shards=4, | ||
file_name_suffix='.tfrecord' | ||
) | ||
| 'Read shards from GCS' >> beam.io.ReadAllFromTFRecord( | ||
with_filename=True) | ||
| 'Group with Filename' >> beam.GroupByKey() | ||
| 'Write to ArrayRecord in GCS' >> beam.ParDo( | ||
dofns.ConvertToArrayRecordGCS(), | ||
args['output'], | ||
overwrite_extension=True)) | ||
|
||
return p1, initial | ||
|
||
|
||
if __name__ == '__main__': | ||
demo_pipeline = main() | ||
demo_pipeline.run() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
"""DoFn's for parallel processing.""" | ||
|
||
import os | ||
import urllib | ||
import apache_beam as beam | ||
from array_record.python.array_record_module import ArrayRecordWriter | ||
from google.cloud import storage | ||
|
||
|
||
class ConvertToArrayRecordGCS(beam.DoFn): | ||
"""Write a tuple consisting of a filename and records to GCS ArrayRecords.""" | ||
|
||
_WRITE_DIR = '/tmp/' | ||
|
||
def process( | ||
self, | ||
element, | ||
path, | ||
write_dir=_WRITE_DIR, | ||
file_path_suffix='.arrayrecord', | ||
overwrite_extension=False, | ||
): | ||
|
||
## Upload to GCS | ||
def upload_to_gcs(bucket_name, filename, prefix='', source_dir=self._WRITE_DIR): | ||
source_filename = os.path.join(source_dir, filename) | ||
blob_name = os.path.join(prefix, filename) | ||
storage_client = storage.Client() | ||
bucket = storage_client.get_bucket(bucket_name) | ||
blob = bucket.blob(blob_name) | ||
blob.upload_from_filename(source_filename) | ||
|
||
## Simple logic for stripping a file extension and replacing it | ||
def fix_filename(filename): | ||
base_name = os.path.splitext(filename)[0] | ||
new_filename = base_name + file_path_suffix | ||
return new_filename | ||
|
||
parsed_gcs_path = urllib.parse.urlparse(path) | ||
bucket_name = parsed_gcs_path.hostname | ||
gcs_prefix = parsed_gcs_path.path.lstrip('/') | ||
|
||
if overwrite_extension: | ||
filename = fix_filename(os.path.basename(element[0])) | ||
else: | ||
filename = '{}{}'.format(os.path.basename(element[0]), file_path_suffix) | ||
|
||
write_path = os.path.join(write_dir, filename) | ||
writer = ArrayRecordWriter(write_path, 'group_size:1') | ||
|
||
for item in element[1]: | ||
writer.write(item) | ||
|
||
writer.close() | ||
|
||
upload_to_gcs(bucket_name, filename, prefix=gcs_prefix) | ||
os.remove(os.path.join(write_dir, filename)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
"""Helper file for generating TF/ArrayRecords and writing them to disk.""" | ||
|
||
import os | ||
from array_record.beam.testdata import data | ||
from array_record.python.array_record_module import ArrayRecordWriter | ||
import tensorflow as tf | ||
|
||
|
||
def generate_movie_examples(): | ||
"""Create a list of TF examples from the dummy data above and return it. | ||
Returns: | ||
TFExample object | ||
""" | ||
|
||
examples = [] | ||
for example in data: | ||
examples.append( | ||
tf.train.Example( | ||
features=tf.train.Features( | ||
feature={ | ||
'Age': tf.train.Feature( | ||
int64_list=tf.train.Int64List(value=[example['Age']])), | ||
'Movie': tf.train.Feature( | ||
bytes_list=tf.train.BytesList( | ||
value=[ | ||
m.encode('utf-8') for m in example['Movie']])), | ||
'Movie Ratings': tf.train.Feature( | ||
float_list=tf.train.FloatList( | ||
value=example['Movie Ratings'])), | ||
'Suggestion': tf.train.Feature( | ||
bytes_list=tf.train.BytesList( | ||
value=[example['Suggestion'].encode('utf-8')])), | ||
'Suggestion Purchased': tf.train.Feature( | ||
float_list=tf.train.FloatList( | ||
value=[example['Suggestion Purchased']])), | ||
'Purchase Price': tf.train.Feature( | ||
float_list=tf.train.FloatList( | ||
value=[example['Purchase Price']])) | ||
} | ||
) | ||
) | ||
) | ||
|
||
return(examples) | ||
|
||
|
||
def generate_serialized_movie_examples(): | ||
"""Return a serialized version of the above data for byte insertion.""" | ||
|
||
return [example.SerializeToString() for example in generate_movie_examples()] | ||
|
||
|
||
def write_example_to_tfrecord(example, file_path): | ||
"""Write example(s) to a single TFrecord file.""" | ||
|
||
with tf.io.TFRecordWriter(file_path) as writer: | ||
writer.write(example.SerializeToString()) | ||
|
||
|
||
# Write example(s) to a single ArrayRecord file | ||
def write_example_to_arrayrecord(example, file_path): | ||
writer = ArrayRecordWriter(file_path, 'group_size:1') | ||
writer.write(example.SerializeToString()) | ||
writer.close() | ||
|
||
|
||
def kitty_tfrecord(prefix=''): | ||
"""Create a TFRecord from a cat pic on the Internet. | ||
This is mainly for testing; probably don't use it. | ||
Args: | ||
prefix: A file directory in string format. | ||
""" | ||
|
||
cat_in_snow = tf.keras.utils.get_file( | ||
'320px-Felis_catus-cat_on_snow.jpg', | ||
'https://storage.googleapis.com/download.tensorflow.org/example_images/320px-Felis_catus-cat_on_snow.jpg') | ||
|
||
image_labels = { | ||
cat_in_snow: 0 | ||
} | ||
|
||
image_string = open(cat_in_snow, 'rb').read() | ||
label = image_labels[cat_in_snow] | ||
image_shape = tf.io.decode_jpeg(image_string).shape | ||
|
||
feature = { | ||
'height': tf.train.Feature(int64_list=tf.train.Int64List( | ||
value=[image_shape[0]])), | ||
'width': tf.train.Feature(int64_list=tf.train.Int64List( | ||
value=[image_shape[1]])), | ||
'depth': tf.train.Feature(int64_list=tf.train.Int64List( | ||
value=[image_shape[2]])), | ||
'label': tf.train.Feature(int64_list=tf.train.Int64List( | ||
value=[label])), | ||
'image_raw': tf.train.Feature(bytes_list=tf.train.BytesList( | ||
value=[image_string])) | ||
} | ||
|
||
example = tf.train.Example(features=tf.train.Features(feature=feature)) | ||
|
||
record_file = os.path.join(prefix, 'kittymeow.tfrecord') | ||
with tf.io.TFRecordWriter(record_file) as writer: | ||
writer.write(example.SerializeToString()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
# Execute this via BASH to run a full demo that creates TFRecords and converts them | ||
|
||
#!/bin/bash | ||
|
||
|
||
# Set bucket info below. Uncomment lower lines and set values to use Dataflow. | ||
python -m array_record.beam.demo \ | ||
--input="gs://<YOUR_INPUT_BUCKET>/records/movies" \ | ||
--output="gs://<YOUR_OUTPUT_BUCKET>/records/" \ | ||
# --region="<YOUR_REGION>" \ | ||
# --runner="DataflowRunner" \ | ||
# --project="<YOUR_PROJECT>" \ | ||
# --requirements_file="requirements.txt" |
Oops, something went wrong.