Skip to content

Commit

Permalink
Merge pull request #1230 from Polber:jkinard/add-yaml-text-input
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 597065591
  • Loading branch information
cloud-teleport committed Jan 9, 2024
2 parents 1d748ae + d4669c8 commit ff61a74
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 44 deletions.
30 changes: 21 additions & 9 deletions python/README_Yaml_Template.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ on [Metadata Annotations](https://github.com/GoogleCloudPlatform/DataflowTemplat

### Required Parameters

* **yaml** (Input YAML file in Cloud Storage.): The input YAML file Dataflow reads from.

### Optional Parameters

* **yaml_pipeline** (Input YAML pipeline spec.): A yaml description of the pipeline to run.
* **yaml_pipeline_file** (Input YAML pipeline spec file in Cloud Storage.): A file in Cloud Storage containing a yaml description of the pipeline to run.



Expand All @@ -39,7 +40,12 @@ on [Metadata Annotations](https://github.com/GoogleCloudPlatform/DataflowTemplat
### Templates Plugin

This README provides instructions using
the [Templates Plugin](https://github.com/GoogleCloudPlatform/DataflowTemplates#templates-plugin).
the [Templates Plugin](https://github.com/GoogleCloudPlatform/DataflowTemplates#templates-plugin).
Install the plugin with the following command before proceeding:

```shell
mvn clean install -pl plugins/templates-maven-plugin -am
```

### Building Template

Expand All @@ -65,7 +71,8 @@ mvn clean package -PtemplatesStage \
-DbucketName="$BUCKET_NAME" \
-DstagePrefix="templates" \
-DtemplateName="Yaml_Template" \
-f python
-pl python \
-am
```


Expand Down Expand Up @@ -96,15 +103,17 @@ export REGION=us-central1
export TEMPLATE_SPEC_GCSPATH="gs://$BUCKET_NAME/templates/flex/Yaml_Template"

### Required
export YAML=<yaml>

### Optional
export YAML_PIPELINE=<yaml_pipeline>
export YAML_PIPELINE_FILE=<yaml_pipeline_file>

gcloud dataflow flex-template run "yaml-template-job" \
--project "$PROJECT" \
--region "$REGION" \
--template-file-gcs-location "$TEMPLATE_SPEC_GCSPATH" \
--parameters "yaml=$YAML"
--parameters "yaml_pipeline=$YAML_PIPELINE" \
--parameters "yaml_pipeline_file=$YAML_PIPELINE_FILE"
```

For more information about the command, please check:
Expand All @@ -123,9 +132,10 @@ export BUCKET_NAME=<bucket-name>
export REGION=us-central1

### Required
export YAML=<yaml>

### Optional
export YAML_PIPELINE=<yaml_pipeline>
export YAML_PIPELINE_FILE=<yaml_pipeline_file>

mvn clean package -PtemplatesRun \
-DskipTests \
Expand All @@ -134,8 +144,9 @@ mvn clean package -PtemplatesRun \
-Dregion="$REGION" \
-DjobName="yaml-template-job" \
-DtemplateName="Yaml_Template" \
-Dparameters="yaml=$YAML" \
-f python
-Dparameters="yaml_pipeline=$YAML_PIPELINE,yaml_pipeline_file=$YAML_PIPELINE_FILE" \
-pl python \
-am
```

## Terraform
Expand Down Expand Up @@ -164,7 +175,8 @@ resource "google_dataflow_flex_template_job" "yaml_template" {
name = "yaml-template"
region = var.region
parameters = {
yaml = "<yaml>"
# yaml_pipeline = "<yaml_pipeline>"
# yaml_pipeline_file = "<yaml_pipeline_file>"
}
}
```
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,19 @@
flexContainerName = "yaml-template",
contactInformation = "https://cloud.google.com/support")
public interface YAMLTemplate {
@TemplateParameter.GcsReadFile(
@TemplateParameter.Text(
order = 1,
name = "yaml",
description = "Input YAML file in Cloud Storage.",
helpText = "The input YAML file Dataflow reads from.")
String getYaml();
name = "yaml_pipeline",
optional = true,
description = "Input YAML pipeline spec.",
helpText = "A yaml description of the pipeline to run.")
String getYamlPipeline();

@TemplateParameter.GcsReadFile(
order = 2,
name = "yaml_pipeline_file",
optional = true,
description = "Input YAML pipeline spec file in Cloud Storage.",
helpText = "A file in Cloud Storage containing a yaml description of the pipeline to run.")
String getYamlPipelineFile();
}
42 changes: 22 additions & 20 deletions python/src/main/python/yaml-template/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,34 @@
import argparse
import logging

from apache_beam.yaml import main
from apache_beam.yaml import cache_provider_artifacts
from apache_beam.yaml import main


def _pipeline_spec_file_from_args(known_args):
if known_args.yaml:
return known_args.yaml
else:
raise ValueError(
"--yaml must be set.")
# TODO(https://github.com/apache/beam/issues/29916): Remove once alias args
# are added to main.py
def _get_alias_args(argv):
parser = argparse.ArgumentParser()
parser.add_argument(
'--yaml_pipeline', help='A yaml description of the pipeline to run.')
parser.add_argument(
'--yaml_pipeline_file',
help='A file containing a yaml description of the pipeline to run.')
known_args, pipeline_args = parser.parse_known_args(argv)

if known_args.yaml_pipeline:
pipeline_args += [f'--pipeline_spec={known_args.yaml_pipeline}']
if known_args.yaml_pipeline_file:
pipeline_args += [f'--pipeline_spec_file={known_args.yaml_pipeline_file}']
return pipeline_args

def run(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument(
'--yaml',
dest='yaml',
help='Input YAML file in Cloud Storage.')
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_spec_file = _pipeline_spec_file_from_args(known_args)
cache_provider_artifacts.cache_provider_artifacts()

main.run(argv=pipeline_args + [f"--pipeline_spec_file={pipeline_spec_file}",
"--save_main_session"])
def run(argv=None):
  """Entry point: rewrites the alias flags, then hands off to Beam's YAML main.

  Args:
    argv: Command-line arguments, forwarded to ``_get_alias_args``.
  """
  args = _get_alias_args(argv)
  cache_provider_artifacts.cache_provider_artifacts()
  main.run(argv=args)


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
logging.getLogger().setLevel(logging.INFO)
run()
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.regex.Pattern;
import org.apache.beam.it.common.PipelineLauncher;
import org.apache.beam.it.common.PipelineOperator;
Expand All @@ -46,18 +47,33 @@
public final class YAMLTemplateIT extends TemplateTestBase {

@Test
public void testSimpleComposite() throws IOException {
public void testSimpleCompositeSpec() throws IOException {
// Arrange
String yamlMessage =
Files.readString(Paths.get(Resources.getResource("YamlTemplateIT.yaml").getPath()));
yamlMessage = yamlMessage.replaceAll("INPUT_PATH", getGcsBasePath() + "/input/test.csv");
yamlMessage = yamlMessage.replaceAll("OUTPUT_PATH", getGcsBasePath() + "/output");
String yamlMessage = createSimpleYamlMessage();

// Act
testSimpleComposite(params -> params.addParameter("yaml_pipeline", yamlMessage));
}

@Test
public void testSimpleCompositeSpecFile() throws IOException {
  // Arrange: upload the rendered pipeline spec as a GCS artifact.
  final String specArtifact = "input/simple.yaml";
  gcsClient.createArtifact(specArtifact, createSimpleYamlMessage());

  // Act: run the shared scenario, passing the spec by file path.
  testSimpleComposite(
      launchConfig ->
          launchConfig.addParameter("yaml_pipeline_file", getGcsPath(specArtifact)));
}

private void testSimpleComposite(
Function<PipelineLauncher.LaunchConfig.Builder, PipelineLauncher.LaunchConfig.Builder>
paramsAdder)
throws IOException {
// Arrange
gcsClient.createArtifact("input/test.csv", "num\n0\n1\n2\n4");
gcsClient.createArtifact("input/simple.yaml", yamlMessage);

// Act
runYamlTemplateTest("input/simple.yaml");
runYamlTemplateTest(paramsAdder);

// Assert
List<Artifact> goodArtifacts = gcsClient.listArtifacts("output/good-", Pattern.compile(".*"));
Expand All @@ -81,11 +97,20 @@ public void testSimpleComposite() throws IOException {
assertThat(badRecords).contains(divError);
}

private void runYamlTemplateTest(String yamlGcsPath) throws IOException {
/**
 * Loads the {@code YamlTemplateIT.yaml} resource and substitutes its {@code INPUT_PATH} and
 * {@code OUTPUT_PATH} placeholders with locations under {@code getGcsBasePath()}.
 *
 * @return the pipeline spec with both placeholders substituted
 * @throws IOException if the resource file cannot be read
 */
private String createSimpleYamlMessage() throws IOException {
  String yamlMessage =
      Files.readString(Paths.get(Resources.getResource("YamlTemplateIT.yaml").getPath()));
  // String#replace substitutes every occurrence literally. replaceAll would compile the
  // placeholder as a regex and treat '$' or '\' in the GCS path as group references/escapes.
  return yamlMessage
      .replace("INPUT_PATH", getGcsBasePath() + "/input/test.csv")
      .replace("OUTPUT_PATH", getGcsBasePath() + "/output");
}

private void runYamlTemplateTest(
Function<PipelineLauncher.LaunchConfig.Builder, PipelineLauncher.LaunchConfig.Builder>
paramsAdder)
throws IOException {
// Arrange
PipelineLauncher.LaunchConfig.Builder options =
PipelineLauncher.LaunchConfig.builder(testName, specPath)
.addParameter("yaml", getGcsPath(yamlGcsPath));
paramsAdder.apply(PipelineLauncher.LaunchConfig.builder(testName, specPath));

// Act
PipelineLauncher.LaunchInfo info = launchTemplate(options);
Expand Down

0 comments on commit ff61a74

Please sign in to comment.