Skip to content

Commit

Permalink
Merge pull request #1539 from akvelon:BIDI-115-part-1
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 641288608
  • Loading branch information
cloud-teleport committed Jun 7, 2024
2 parents 0837b98 + 7077ee6 commit 43219df
Show file tree
Hide file tree
Showing 31 changed files with 99 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ interface BigQueryWriteOptions extends PipelineOptions, DataflowPipelineOptions
@TemplateParameter.BigQueryTable(
order = 1,
description = "BigQuery output table",
groupName = "Target",
optional = true,
helpText =
"The BigQuery table location to write the output to. Use the format `<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>`. The table's schema must match the input objects.")
Expand All @@ -57,6 +58,7 @@ interface AstraDbSourceOptions extends PipelineOptions {
@TemplateParameter.Text(
order = 2,
description = "Database identifier",
groupName = "Source",
helpText = "The database unique identifier (UUID).",
example = "cf7af129-d33a-498f-ad06-d97a6ee6eb7")
@Validation.Required
Expand All @@ -69,6 +71,7 @@ interface AstraDbSourceOptions extends PipelineOptions {
@TemplateParameter.Text(
order = 3,
description = "Cassandra keyspace",
groupName = "Source",
regexes = {"^[a-zA-Z0-9][a-zA-Z0-9_]{0,47}$"},
helpText = "The name of the Cassandra keyspace inside of the Astra database.")
String getAstraKeyspace();
Expand All @@ -79,6 +82,7 @@ interface AstraDbSourceOptions extends PipelineOptions {
@TemplateParameter.Text(
order = 4,
description = "Cassandra table",
groupName = "Source",
regexes = {"^[a-zA-Z][a-zA-Z0-9_]*$"},
helpText = "The name of the table inside of the Cassandra database.",
example = "my_table")
Expand All @@ -91,6 +95,7 @@ interface AstraDbSourceOptions extends PipelineOptions {
@TemplateParameter.Text(
order = 5,
optional = true,
groupName = "Source",
description = "Cassandra CQL Query",
helpText = "The query to use to filter rows instead of reading the whole table.")
@SuppressWarnings("unused")
Expand All @@ -102,6 +107,7 @@ interface AstraDbSourceOptions extends PipelineOptions {
@TemplateParameter.Text(
order = 6,
optional = true,
groupName = "Source",
description = "Astra Database Region",
helpText =
"If not provided, a default is chosen, which is useful with multi-region databases.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ public interface AzureEventhubToPubsubOptions extends PipelineOptions {
order = 1,
optional = false,
regexes = {"[,\\/:a-zA-Z0-9._-]+"},
groupName = "Source",
description = "Azure Event Hub endpoint",
helpText = "Server IP or DNS for Azure Eventhub Endpoint",
example = "mynamespace.servicebus.windows.net:9093")
Expand All @@ -132,6 +133,7 @@ public interface AzureEventhubToPubsubOptions extends PipelineOptions {
order = 2,
optional = false,
regexes = {"[a-zA-Z0-9._-]+"},
groupName = "Source",
description = "Azure Eventhub topic(s) to read the input from",
helpText = "Azure Eventhub topic(s) to read the input from",
example = "topic")
Expand All @@ -143,6 +145,7 @@ public interface AzureEventhubToPubsubOptions extends PipelineOptions {
@TemplateParameter.PubsubTopic(
order = 3,
description = "Output Pub/Sub topic",
groupName = "Target",
helpText =
"The name of the topic to which data should published, in the format of 'projects/your-project-id/topics/your-topic-name'",
example = "projects/your-project-id/topics/your-topic-name")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ public interface BigQueryToParquetOptions extends PipelineOptions {
@TemplateParameter.BigQueryTable(
order = 1,
description = "BigQuery table to export",
groupName = "Source",
helpText = "The BigQuery input table location.",
example = "your-project:your-dataset.your-table-name")
@Required
Expand All @@ -156,6 +157,7 @@ public interface BigQueryToParquetOptions extends PipelineOptions {
@TemplateParameter.GcsWriteFile(
order = 2,
description = "Output Cloud Storage file(s)",
groupName = "Target",
helpText = "The Cloud Storage folder to write the Parquet files to.",
example = "gs://your-bucket/export/")
@Required
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public interface BigtableToHbasePipelineOptions
/** Hbase specific configs. Mirrors configurations on hbase-site.xml. */
@TemplateParameter.Text(
description = "Zookeeper quorum host",
groupName = "Target",
helpText = "Zookeeper quorum host, corresponds to hbase.zookeeper.quorum host")
String getHbaseZookeeperQuorumHost();

Expand All @@ -83,6 +84,7 @@ public interface BigtableToHbasePipelineOptions
@TemplateParameter.Text(
optional = true,
description = "Zookeeper quorum port",
groupName = "Target",
helpText = "Zookeeper quorum port, corresponds to hbase.zookeeper.quorum port")
@Default.String("2181")
String getHbaseZookeeperQuorumPort();
Expand All @@ -91,6 +93,7 @@ public interface BigtableToHbasePipelineOptions

@TemplateParameter.Text(
description = "Hbase root directory",
groupName = "Target",
helpText = "Hbase root directory, corresponds to hbase.rootdir")
String getHbaseRootDir();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ interface ReadOptions extends BigtableCommonOptions {

@TemplateParameter.Text(
order = 1,
groupName = "Source",
regexes = {"[a-z][a-z0-9\\-]+[a-z0-9]"},
description = "Source Bigtable Instance ID",
helpText = "The source Bigtable instance ID.")
Expand All @@ -156,6 +157,7 @@ interface ReadOptions extends BigtableCommonOptions {

@TemplateParameter.Text(
order = 2,
groupName = "Source",
description = "Source Cloud Bigtable table ID",
helpText = "The source Bigtable table ID.")
@Validation.Required
Expand All @@ -166,6 +168,7 @@ interface ReadOptions extends BigtableCommonOptions {
@TemplateParameter.ProjectId(
order = 3,
optional = true,
groupName = "Source",
description = "Source Cloud Bigtable Project ID",
helpText = "The Bigtable project ID. The default is the project for the Dataflow job.")
@Default.String("")
Expand All @@ -192,6 +195,7 @@ interface ReadChangeStreamOptions extends BigtableCommonOptions.ReadOptions {
@TemplateParameter.Text(
order = 1,
optional = true,
groupName = "Source",
description = "Cloud Bigtable change streams metadata instance ID",
helpText = "The Bigtable change streams metadata instance ID.")
@Default.String("")
Expand All @@ -202,6 +206,7 @@ interface ReadChangeStreamOptions extends BigtableCommonOptions.ReadOptions {
@TemplateParameter.Text(
order = 2,
optional = true,
groupName = "Source",
description = "Cloud Bigtable change streams metadata table ID",
helpText =
"The ID of the Bigtable change streams connector metadata table. If not "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public interface CdcApplierOptions extends PipelineOptions, BigQueryStorageApiSt
@TemplateParameter.Text(
order = 1,
optional = true,
groupName = "Source",
regexes = {"[,a-zA-Z0-9._-]+"},
description = "Pub/Sub topic(s) to read from",
helpText = "Comma-separated list of PubSub topics to where CDC data is being pushed.")
Expand All @@ -106,6 +107,7 @@ public interface CdcApplierOptions extends PipelineOptions, BigQueryStorageApiSt
@TemplateParameter.Text(
order = 2,
regexes = {"[^/]+"},
groupName = "Source",
description = "Input subscriptions to the template",
helpText =
"The comma-separated list of Pub/Sub input subscriptions to read from, in the format `<SUBSCRIPTION_NAME>,<SUBSCRIPTION_NAME>, ...`")
Expand All @@ -116,6 +118,7 @@ public interface CdcApplierOptions extends PipelineOptions, BigQueryStorageApiSt
@TemplateParameter.Text(
order = 3,
regexes = {".+"},
groupName = "Target",
description = "Output BigQuery dataset for Changelog tables",
helpText =
"The BigQuery dataset to store the staging tables in, in the format <DATASET_NAME>.")
Expand All @@ -126,6 +129,7 @@ public interface CdcApplierOptions extends PipelineOptions, BigQueryStorageApiSt
@TemplateParameter.Text(
order = 4,
regexes = {".+"},
groupName = "Target",
description = "Output BigQuery dataset for replica tables",
helpText =
"The location of the BigQuery dataset to store the replica tables in, in the format <DATASET_NAME>.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public interface WriteToGCSAvroOptions extends PipelineOptions {

@TemplateParameter.GcsWriteFolder(
order = 1,
groupName = "Target",
description = "Output file directory in Cloud Storage",
helpText = "The path and filename prefix for writing output files. Must end with a slash.",
example = "gs://your-bucket/your-path")
Expand All @@ -112,6 +113,7 @@ public interface WriteToGCSAvroOptions extends PipelineOptions {
@TemplateParameter.Text(
order = 2,
optional = true,
groupName = "Target",
description = "Output filename prefix of the files to write",
helpText = "The prefix to place on each windowed file.",
example = "output-")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public interface WriteToGCSParquetOptions extends PipelineOptions {

@TemplateParameter.GcsWriteFolder(
order = 1,
groupName = "Target",
description = "Output file directory in Cloud Storage",
helpText = "The path and filename prefix for writing output files. Must end with a slash.",
example = "gs://your-bucket/your-path")
Expand All @@ -102,6 +103,7 @@ public interface WriteToGCSParquetOptions extends PipelineOptions {
@TemplateParameter.Text(
order = 2,
optional = true,
groupName = "Target",
description = "Output filename prefix of the files to write",
helpText = "The prefix to place on each windowed file.",
example = "output-")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ public interface WriteToGCSTextOptions extends PipelineOptions {

@TemplateParameter.GcsWriteFolder(
order = 1,
groupName = "Target",
description = "Output file directory in Cloud Storage",
helpText = "The path and filename prefix for writing output files. Must end with a slash.",
example = "gs://your-bucket/your-path")
Expand All @@ -117,6 +118,7 @@ public interface WriteToGCSTextOptions extends PipelineOptions {
@TemplateParameter.Text(
order = 2,
optional = true,
groupName = "Target",
description = "Output filename prefix of the files to write",
helpText = "The prefix to place on each windowed file.",
example = "output-")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public interface DataplexBigQueryToGcsOptions
@TemplateParameter.Text(
order = 1,
optional = false,
groupName = "Source",
regexes = {
"^(projects\\/[^\\n\\r\\/]+\\/locations\\/[^\\n\\r\\/]+\\/lakes\\/[^\\n\\r\\/]+\\/zones\\/[^\\n\\r\\/]+\\/assets\\/[^\\n\\r\\/]+|projects\\/[^\\n\\r\\/]+\\/datasets\\/[^\\n\\r\\/]+)$"
},
Expand All @@ -54,6 +55,7 @@ public interface DataplexBigQueryToGcsOptions
order = 2,
optional = true,
regexes = {"^[a-zA-Z0-9_-]+(,[a-zA-Z0-9_-]+)*$"},
groupName = "Source",
description = "Source BigQuery tables to tier.",
helpText =
"A comma-separated list of BigQuery tables to tier. If none specified, all tables will be tiered. Tables should be specified by their name only (no project/dataset prefix). Case-sensitive!")
Expand All @@ -67,6 +69,7 @@ public interface DataplexBigQueryToGcsOptions
regexes = {
"^projects\\/[^\\n\\r\\/]+\\/locations\\/[^\\n\\r\\/]+\\/lakes\\/[^\\n\\r\\/]+\\/zones\\/[^\\n\\r\\/]+\\/assets\\/[^\\n\\r\\/]+$"
},
groupName = "Target",
description = "Dataplex asset name for the destination Cloud Storage bucket.",
helpText =
"Dataplex asset name for the Cloud Storage bucket to tier data to. Format: projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/assets/<asset name>.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public interface DataplexJdbcIngestionOptions
regexes = {
"(^jdbc:[a-zA-Z0-9/:@.?_+!*=&-;]+$)|(^([A-Za-z0-9+/]{4}){1,}([A-Za-z0-9+/]{0,3})={0,3})"
},
groupName = "Source",
description = "JDBC connection URL string.",
helpText =
"Url connection string to connect to the JDBC source. Connection string can be passed in"
Expand Down Expand Up @@ -127,6 +128,7 @@ public interface DataplexJdbcIngestionOptions
void setQuery(String query);

@TemplateParameter.Text(
groupName = "Target",
order = 8,
optional = false,
regexes = {"^.+$"},
Expand Down Expand Up @@ -170,6 +172,7 @@ public interface DataplexJdbcIngestionOptions
+ "\\r"
+ "\\/]+$"
},
groupName = "Target",
description = "Dataplex output asset ID",
helpText =
"Dataplex output asset ID to which the results are stored to. Should be in the format of"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ public interface FileFormatConversionOptions
"^(projects\\/[^\\n\\r\\/]+\\/locations\\/[^\\n\\r\\/]+\\/lakes\\/[^\\n\\r\\/]+\\/zones\\/[^\\n\\r\\/]+\\/assets\\/[^\\n\\r\\/]+|projects\\/[^\\n\\r\\/]+\\/locations\\/[^\\n\\r\\/]+\\/lakes\\/[^\\n\\r\\/]+\\/zones\\/[^\\n\\r\\/]+\\/entities\\/[^\\n\\r\\/,]+(,projects\\/[^\\n\\r\\/]+\\/locations\\/[^\\n\\r\\/]+\\/lakes\\/[^\\n\\r\\/]+\\/zones\\/[^\\n\\r\\/]+\\/entities\\/[^\\n\\r\\/,]+)*)$"
},
description = "Dataplex asset name or Dataplex entity names for the files to be converted.",
groupName = "Source",
helpText =
"Dataplex asset or Dataplex entities that contain the input files. Format:"
+ " projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/assets/<asset"
Expand Down Expand Up @@ -170,6 +171,7 @@ public interface FileFormatConversionOptions
regexes = {
"^projects\\/[^\\n\\r\\/]+\\/locations\\/[^\\n\\r\\/]+\\/lakes\\/[^\\n\\r\\/]+\\/zones\\/[^\\n\\r\\/]+\\/assets\\/[^\\n\\r\\/]+$"
},
groupName = "Target",
description = "Dataplex asset name for the destination Cloud Storage bucket.",
helpText =
"Name of the Dataplex asset that contains Cloud Storage bucket where output files will"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ public interface Options

@TemplateParameter.GcsReadFile(
order = 1,
groupName = "Source",
description = "File location for Datastream file output in Cloud Storage.",
helpText =
"The file location for Datastream file output in Cloud Storage, in the format: gs://<BUCKET_NAME>/<ROOT_PATH>/.")
Expand Down Expand Up @@ -210,6 +211,7 @@ public interface Options
order = 7,
optional = true,
description = "Project Id for BigQuery datasets.",
groupName = "Target",
helpText =
"The ID of the Google Cloud project that contains the BigQuery datasets to output data into. The default for this parameter is the project where the Dataflow pipeline is running.")
String getOutputProjectId();
Expand All @@ -218,6 +220,7 @@ public interface Options

@TemplateParameter.Text(
order = 8,
groupName = "Target",
description = "Name or template for the dataset to contain staging tables.",
helpText =
"The name of the dataset that contains staging tables. This parameter supports templates, for example {_metadata_dataset}_log or my_dataset_log. Normally, this parameter is a dataset name. Defaults to: {_metadata_dataset}.")
Expand All @@ -229,6 +232,7 @@ public interface Options
@TemplateParameter.Text(
order = 9,
optional = true,
groupName = "Target",
description = "Template for the name of staging tables.",
helpText =
"The template to use to name the staging tables. For example, {_metadata_table}). Defaults to: {_metadata_table}_log.")
Expand All @@ -239,6 +243,7 @@ public interface Options

@TemplateParameter.Text(
order = 10,
groupName = "Target",
description = "Template for the dataset to contain replica tables.",
helpText =
"The name of the dataset that contains the replica tables. This parameter supports templates, for example {_metadata_dataset} or my_dataset. Normally, this parameter is a dataset name. Defaults to: {_metadata_dataset}.")
Expand All @@ -249,6 +254,7 @@ public interface Options

@TemplateParameter.Text(
order = 11,
groupName = "Target",
optional = true,
description = "Template for the name of replica tables.",
helpText =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public interface Options extends PipelineOptions, StreamingOptions {
@TemplateParameter.GcsReadFile(
order = 1,
description = "Cloud Storage Input File(s)",
groupName = "Source",
helpText = "Path of the file pattern glob to read from.",
example = "gs://your-bucket/path/*.avro")
String getInputFilePattern();
Expand All @@ -108,6 +109,7 @@ public interface Options extends PipelineOptions, StreamingOptions {

@TemplateParameter.PubsubSubscription(
order = 3,
groupName = "Source",
optional = false,
description = "Pub/Sub input subscription",
helpText =
Expand Down Expand Up @@ -148,6 +150,7 @@ public interface Options extends PipelineOptions, StreamingOptions {
void setFileReadConcurrency(Integer value);

@TemplateParameter.Text(
groupName = "Target",
order = 7,
description = "MongoDB Connection URI",
helpText = "URI to connect to MongoDB Atlas.")
Expand All @@ -156,6 +159,7 @@ public interface Options extends PipelineOptions, StreamingOptions {
void setMongoDBUri(String value);

@TemplateParameter.Text(
groupName = "Target",
order = 8,
description = "MongoDB Database",
helpText = "Database in MongoDB to store the collection.",
Expand All @@ -165,6 +169,7 @@ public interface Options extends PipelineOptions, StreamingOptions {
void setDatabase(String value);

@TemplateParameter.Text(
groupName = "Target",
order = 9,
description = "MongoDB collection",
helpText = "Name of the collection inside MongoDB database.",
Expand Down
Loading

0 comments on commit 43219df

Please sign in to comment.