fix beam pipeline for newer python & beam #127

Merged · 1 commit · Nov 6, 2024
13 changes: 13 additions & 0 deletions beam/README.md
@@ -1,5 +1,18 @@
## Apache Beam Integration for ArrayRecord

### Quickstart

#### Convert TFRecord in a GCS bucket to ArrayRecord
```
pip install array-record[beam]
git clone https://github.com/google/array_record.git
cd array_record/beam/examples
# Fill in the required fields in example_gcs_conversion.py
# If using Dataflow, set pipeline_options as instructed in example_gcs_conversion.py
python example_gcs_conversion.py
```
If Dataflow is used, you can monitor the run from the Dataflow job monitoring UI (https://cloud.google.com/dataflow/docs/guides/monitoring-overview).

### Summary

This submodule provides Apache Beam components and lightweight pipelines for converting other file formats (currently TFRecord) into ArrayRecord. The intention is to offer fairly seamless tools for migrating existing TFRecord datasets, with a few choices regarding sharding and write location.
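For reference, the `pipeline_options` the Quickstart refers to is built in the example script roughly as below. This is a sketch with placeholder values; any option beyond `runner` and `project` is an assumption, since the diff truncates the `PipelineOptions` call.

```
# Sketch only: placeholder values; options beyond 'runner' and 'project'
# are assumed, as the diff truncates the PipelineOptions call.
from apache_beam.options import pipeline_options

options = pipeline_options.PipelineOptions(
    runner='DataflowRunner',
    project='<YOUR_PROJECT>',
    region='<YOUR_REGION>',                  # assumed
    temp_location='gs://<YOUR_BUCKET>/tmp',  # assumed
)
```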
12 changes: 7 additions & 5 deletions beam/arrayrecordio.py
@@ -7,8 +7,8 @@
from apache_beam import transforms
from apache_beam.coders import coders
from apache_beam.io import filebasedsink
from apache_beam.io.filesystem.CompressionTypes import AUTO
from array_record.python.array_record_module import ArrayRecordWriter
from apache_beam.io import filesystem
from array_record.python import array_record_module


class _ArrayRecordSink(filebasedsink.FileBasedSink):
@@ -21,7 +21,7 @@ def __init__(
num_shards=0,
shard_name_template=None,
coder=coders.ToBytesCoder(),
compression_type=AUTO):
compression_type=filesystem.CompressionTypes.AUTO):

super().__init__(
file_path_prefix,
@@ -33,7 +33,9 @@ def __init__(
compression_type=compression_type)

def open(self, temp_path):
array_writer = ArrayRecordWriter(temp_path, 'group_size:1')
array_writer = array_record_module.ArrayRecordWriter(
temp_path, 'group_size:1'
)
return array_writer

def close(self, file_handle):
@@ -53,7 +55,7 @@ def __init__(
num_shards=0,
shard_name_template=None,
coder=coders.ToBytesCoder(),
compression_type=AUTO):
compression_type=filesystem.CompressionTypes.AUTO):

self._sink = _ArrayRecordSink(
file_path_prefix,
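These changes import `filesystem` and `array_record_module` as modules instead of reaching into them, fixing the old `from apache_beam.io.filesystem.CompressionTypes import AUTO` line, which fails because `CompressionTypes` is a class, not a submodule. A minimal usage sketch of the transform follows; the `array_record.beam` import path and the output prefix are assumptions.

```
# Hedged sketch: write two byte records through the updated sink.
# The 'array_record.beam' import path and output prefix are assumptions.
import apache_beam as beam
from apache_beam.coders import coders
from array_record.beam import arrayrecordio

with beam.Pipeline() as p:
  _ = (
      p
      | 'Create' >> beam.Create([b'record-1', b'record-2'])
      | 'Write' >> arrayrecordio.WriteToArrayRecord(
          '/tmp/example',  # placeholder output prefix
          coder=coders.ToBytesCoder(),
          num_shards=1,
          file_name_suffix='.arrayrecord',
      )
  )
```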
5 changes: 3 additions & 2 deletions beam/examples/example_gcs_conversion.py
@@ -10,7 +10,8 @@

args = {'input': input_pattern, 'output': output_path}

## Set pipeline options and uncomment in main() to run in Dataflow
## If running in Dataflow, set pipeline_options and uncomment in main()
## If pipeline_options is not set, the pipeline runs on a local runner
pipeline_options = pipeline_options.PipelineOptions(
runner='DataflowRunner',
project='<YOUR_PROJECT>',
@@ -22,7 +23,7 @@
def main():
convert_tf_to_arrayrecord_gcs(
args=args,
# pipeline_options=pipeline_options
# pipeline_options=pipeline_options,
).run()

if __name__ == '__main__':
5 changes: 3 additions & 2 deletions beam/examples/example_sink_conversion.py
@@ -10,7 +10,8 @@

args = {'input': input_pattern, 'output': output_path}

## Set pipeline options and uncomment in main() to run in Dataflow
## If running in Dataflow, set pipeline_options and uncomment in main()
## If pipeline_options is not set, the pipeline runs on a local runner
pipeline_options = pipeline_options.PipelineOptions(
runner='DataflowRunner',
project='<YOUR_PROJECT>',
@@ -22,7 +23,7 @@
def main():
convert_tf_to_arrayrecord_disk_match_shards(
args=args,
# pipeline_options=pipeline_options
# pipeline_options=pipeline_options,
).run()

if __name__ == '__main__':
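The comment fix in both example scripts makes the fallback explicit: with `pipeline_options` left unset, the conversion runs on the local runner. A sketch of that local invocation; the import path and bucket patterns are placeholders.

```
# Hedged sketch: local-runner invocation of the GCS conversion example.
# The import path and the bucket patterns are placeholders.
from array_record.beam.pipelines import convert_tf_to_arrayrecord_gcs

args = {
    'input': 'gs://<YOUR_BUCKET>/records/*.tfrecord',
    'output': 'gs://<YOUR_BUCKET>/arrayrecords/records',
}

# pipeline_options omitted -> Beam's default local runner.
convert_tf_to_arrayrecord_gcs(args=args).run()
```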
3 changes: 2 additions & 1 deletion beam/examples/requirements.txt
@@ -1,2 +1,3 @@
array-record[beam]
google-cloud-storage==2.11.0
tensorflow==2.14.0
tensorflow==2.14.0
143 changes: 80 additions & 63 deletions beam/pipelines.py
@@ -29,21 +29,23 @@ def example_to_tfrecord(
"""

p1 = beam.Pipeline(options=pipeline_options)
initial = (p1
| 'Create' >> beam.Create(example.generate_movie_examples())
| 'Write' >> beam.io.WriteToTFRecord(
args['output'],
coder=coders.ToBytesCoder(),
num_shards=num_shards,
file_name_suffix='.tfrecord'))

return p1, initial
_ = (
p1
| 'Create' >> beam.Create(example.generate_movie_examples())
| 'Write'
>> beam.io.WriteToTFRecord(
args['output'],
coder=coders.ToBytesCoder(),
num_shards=num_shards,
file_name_suffix='.tfrecord',
)
)
return p1


def example_to_arrayrecord(
num_shards=1,
args=def_args,
pipeline_options=def_pipeline_options):
num_shards=1, args=def_args, pipeline_options=def_pipeline_options
):
"""Beam pipeline for creating example ArrayRecord data.

Args:
@@ -56,21 +58,23 @@
"""

p1 = beam.Pipeline(options=pipeline_options)
initial = (p1
| 'Create' >> beam.Create(example.generate_movie_examples())
| 'Write' >> arrayrecordio.WriteToArrayRecord(
args['output'],
coder=coders.ToBytesCoder(),
num_shards=num_shards,
file_name_suffix='.arrayrecord'))

return p1, initial
_ = (
p1
| 'Create' >> beam.Create(example.generate_movie_examples())
| 'Write'
>> arrayrecordio.WriteToArrayRecord(
args['output'],
coder=coders.ToBytesCoder(),
num_shards=num_shards,
file_name_suffix='.arrayrecord',
)
)
return p1


def convert_tf_to_arrayrecord_disk(
num_shards=1,
args=def_args,
pipeline_options=def_pipeline_options):
num_shards=1, args=def_args, pipeline_options=def_pipeline_options
):
"""Convert TFRecords to ArrayRecords using sink/sharding functionality.

THIS ONLY WORKS FOR DISK ARRAYRECORD WRITES
@@ -85,20 +89,23 @@
"""

p1 = beam.Pipeline(options=pipeline_options)
initial = (p1
| 'Read TFRecord' >> beam.io.ReadFromTFRecord(args['input'])
| 'Write ArrayRecord' >> arrayrecordio.WriteToArrayRecord(
args['output'],
coder=coders.ToBytesCoder(),
num_shards=num_shards,
file_name_suffix='.arrayrecord'))

return p1, initial
_ = (
p1
| 'Read TFRecord' >> beam.io.ReadFromTFRecord(args['input'])
| 'Write ArrayRecord'
>> arrayrecordio.WriteToArrayRecord(
args['output'],
coder=coders.ToBytesCoder(),
num_shards=num_shards,
file_name_suffix='.arrayrecord',
)
)
return p1


def convert_tf_to_arrayrecord_disk_match_shards(
args=def_args,
pipeline_options=def_pipeline_options):
args=def_args, pipeline_options=def_pipeline_options
):
"""Convert TFRecords to matching number of ArrayRecords.

THIS ONLY WORKS FOR DISK ARRAYRECORD WRITES
@@ -112,23 +119,30 @@
"""

p1 = beam.Pipeline(options=pipeline_options)
initial = (p1
| 'Start' >> beam.Create([args['input']])
| 'Read' >> beam.io.ReadAllFromTFRecord(with_filename=True))

file_count = (initial
| 'Group' >> beam.GroupByKey()
| 'Count Shards' >> beam.combiners.Count.Globally())

write_files = (initial
| 'Drop Filename' >> beam.Map(lambda x: x[1])
| 'Write ArrayRecord' >> arrayrecordio.WriteToArrayRecord(
args['output'],
coder=coders.ToBytesCoder(),
num_shards=beam.pvalue.AsSingleton(file_count),
file_name_suffix='.arrayrecord'))

return p1, write_files
initial = (
p1
| 'Start' >> beam.Create([args['input']])
| 'Read' >> beam.io.ReadAllFromTFRecord(with_filename=True)
)

file_count = (
initial
| 'Group' >> beam.GroupByKey()
| 'Count Shards' >> beam.combiners.Count.Globally()
)

_ = (
initial
| 'Drop Filename' >> beam.Map(lambda x: x[1])
| 'Write ArrayRecord'
>> arrayrecordio.WriteToArrayRecord(
args['output'],
coder=coders.ToBytesCoder(),
num_shards=beam.pvalue.AsSingleton(file_count),
file_name_suffix='.arrayrecord',
)
)
return p1


def convert_tf_to_arrayrecord_gcs(
@@ -149,14 +163,17 @@
"""

p1 = beam.Pipeline(options=pipeline_options)
initial = (p1
| 'Start' >> beam.Create([args['input']])
| 'Read' >> beam.io.ReadAllFromTFRecord(with_filename=True)
| 'Group' >> beam.GroupByKey()
| 'Write to ArrayRecord in GCS' >> beam.ParDo(
dofns.ConvertToArrayRecordGCS(),
args['output'],
file_path_suffix=file_path_suffix,
overwrite_extension=overwrite_extension))

return p1, initial
_ = (
p1
| 'Start' >> beam.Create([args['input']])
| 'Read' >> beam.io.ReadAllFromTFRecord(with_filename=True)
| 'Group' >> beam.GroupByKey()
| 'Write to ArrayRecord in GCS'
>> beam.ParDo(
dofns.ConvertToArrayRecordGCS(),
args['output'],
file_path_suffix=file_path_suffix,
overwrite_extension=overwrite_extension,
)
)
return p1
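The consistent functional change across these builders is the return value: each now returns just the `Pipeline` instead of a `(pipeline, pcollection)` tuple, which is what lets the example scripts call `.run()` directly on the result. A caller sketch under the new contract; the import path and file paths are placeholders.

```
# Hedged sketch: callers now receive the Pipeline itself, not a tuple.
# Import path and file paths are placeholders.
from array_record.beam.pipelines import convert_tf_to_arrayrecord_disk

pipeline = convert_tf_to_arrayrecord_disk(
    num_shards=2,
    args={'input': '/tmp/in/*.tfrecord', 'output': '/tmp/out/records'},
)
result = pipeline.run()
result.wait_until_finish()
```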
2 changes: 1 addition & 1 deletion setup.py
@@ -10,7 +10,7 @@
]

BEAM_EXTRAS = [
'apache-beam[gcp]>=2.50.0',
'apache-beam[gcp]==2.53.0',
'google-cloud-storage>=2.11.0',
'tensorflow>=2.14.0'
]