From b24053f14f81201ad01d7cf34b1cd9468564cc94 Mon Sep 17 00:00:00 2001 From: psainics Date: Wed, 25 Oct 2023 10:30:42 +0530 Subject: [PATCH] Allow flexible column names --- docs/BigQueryMultiTable-batchsink.md | 6 + docs/BigQueryTable-batchsink.md | 6 + .../bigquery/sink/AbstractBigQuerySink.java | 4 +- .../bigquery/sink/BigQueryOutputFormat.java | 2 +- .../gcp/bigquery/sink/BigQuerySink.java | 2 +- .../gcp/bigquery/sink/BigQuerySinkConfig.java | 17 +- .../gcp/bigquery/sink/BigQuerySinkUtils.java | 14 +- .../DelegatingMultiSinkOutputCommitter.java | 2 +- .../sink/DelegatingMultiSinkRecordWriter.java | 2 +- .../sink/lib/BigQueryOutputConfiguration.java | 529 ++++++++++++++ .../bigquery/sink/lib/BigQueryStrings.java | 89 +++ .../sink/lib/BigQueryTableFieldSchema.java | 120 ++++ .../sink/lib/BigQueryTableHelper.java | 79 +++ .../sink/lib/BigQueryTableSchema.java | 79 +++ .../sink/lib/BigQueryTimePartitioning.java | 106 +++ .../sqlengine/BigQueryPushDataset.java | 2 +- .../gcp/bigquery/util/BigQueryUtil.java | 2 +- .../gcp/dataplex/sink/DataplexBatchSink.java | 5 +- .../bigquery/sink/BigQuerySinkConfigTest.java | 661 +++++++++++------- .../bigquery/sink/BigQuerySinkUtilsTest.java | 2 +- .../sink/config/DataplexBatchSinkTest.java | 3 +- 21 files changed, 1443 insertions(+), 289 deletions(-) create mode 100644 src/main/java/io/cdap/plugin/gcp/bigquery/sink/lib/BigQueryOutputConfiguration.java create mode 100644 src/main/java/io/cdap/plugin/gcp/bigquery/sink/lib/BigQueryStrings.java create mode 100644 src/main/java/io/cdap/plugin/gcp/bigquery/sink/lib/BigQueryTableFieldSchema.java create mode 100644 src/main/java/io/cdap/plugin/gcp/bigquery/sink/lib/BigQueryTableHelper.java create mode 100644 src/main/java/io/cdap/plugin/gcp/bigquery/sink/lib/BigQueryTableSchema.java create mode 100644 src/main/java/io/cdap/plugin/gcp/bigquery/sink/lib/BigQueryTimePartitioning.java diff --git a/docs/BigQueryMultiTable-batchsink.md b/docs/BigQueryMultiTable-batchsink.md index 0c4c01ba2e..1372667c25 100644 --- a/docs/BigQueryMultiTable-batchsink.md +++ b/docs/BigQueryMultiTable-batchsink.md @@ -255,3 +255,9 @@ GET https://www.googleapis.com/bigquery/v2/projects/xxxx/datasets/mysql_bq_perm? have the permission to read the dataset you specified in this plugin. You must grant "BigQuery Data Editor" role on the project identified by the `Dataset Project ID` you specified in this plugin to the service account. If you think you already granted the role, check if you granted the role on the wrong project (for example the one identified by the `Project ID`). + +Column Names +------------ +A column name can contain the letters (a-z, A-Z), numbers (0-9), or underscores (_), and it must start with a letter or +underscore. For more flexible column name support, see +[flexible column names](https://cloud.google.com/bigquery/docs/schemas#flexible-column-names). \ No newline at end of file diff --git a/docs/BigQueryTable-batchsink.md b/docs/BigQueryTable-batchsink.md index a48688ff35..62eb9d5ee3 100644 --- a/docs/BigQueryTable-batchsink.md +++ b/docs/BigQueryTable-batchsink.md @@ -298,3 +298,9 @@ GET https://www.googleapis.com/bigquery/v2/projects/xxxx/datasets/mysql_bq_perm? have the permission to read the dataset you specified in this plugin. You must grant "BigQuery Data Editor" role on the project identified by the `Dataset Project ID` you specified in this plugin to the service account. 
If you think you already granted the role, check if you granted the role on the wrong project (for example the one identified by the `Project ID`). + +Column Names +------------ +A column name can contain the letters (a-z, A-Z), numbers (0-9), or underscores (_), and it must start with a letter or +underscore. For more flexible column name support, see +[flexible column names](https://cloud.google.com/bigquery/docs/schemas#flexible-column-names). \ No newline at end of file diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java index 5e2f38e72b..5c2980e4db 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java @@ -21,7 +21,6 @@ import com.google.cloud.bigquery.DatasetId; import com.google.cloud.bigquery.Table; import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration; -import com.google.cloud.hadoop.io.bigquery.output.BigQueryTableFieldSchema; import com.google.cloud.kms.v1.CryptoKeyName; import com.google.cloud.storage.Bucket; import com.google.cloud.storage.Storage; @@ -36,12 +35,12 @@ import io.cdap.cdap.etl.api.batch.BatchSink; import io.cdap.cdap.etl.api.batch.BatchSinkContext; import io.cdap.plugin.common.Asset; +import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryTableFieldSchema; import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants; import io.cdap.plugin.gcp.bigquery.util.BigQueryTypeSize; import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil; import io.cdap.plugin.gcp.common.CmekUtils; import io.cdap.plugin.gcp.common.GCPUtils; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; import org.slf4j.Logger; @@ -55,7 +54,6 @@ import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; - import javax.annotation.Nullable; /** diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryOutputFormat.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryOutputFormat.java index 0b14d1a439..7d2952c814 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryOutputFormat.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryOutputFormat.java @@ -51,7 +51,6 @@ import com.google.cloud.hadoop.io.bigquery.BigQueryFactory; import com.google.cloud.hadoop.io.bigquery.BigQueryFileFormat; import com.google.cloud.hadoop.io.bigquery.BigQueryHelper; -import com.google.cloud.hadoop.io.bigquery.BigQueryStrings; import com.google.cloud.hadoop.io.bigquery.BigQueryUtils; import com.google.cloud.hadoop.io.bigquery.output.BigQueryOutputConfiguration; import com.google.cloud.hadoop.io.bigquery.output.ForwardingBigQueryFileOutputCommitter; @@ -62,6 +61,7 @@ import com.google.common.base.Strings; import com.google.common.collect.Lists; import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryStrings; import io.cdap.plugin.gcp.bigquery.source.BigQueryFactoryWithScopes; import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants; import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil; diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySink.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySink.java index 5445906e6d..73b084e67b 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySink.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySink.java @@ -25,7 +25,6 @@ import 
com.google.cloud.bigquery.JobStatistics; import com.google.cloud.bigquery.Table; import com.google.cloud.bigquery.TimePartitioning; -import com.google.cloud.hadoop.io.bigquery.output.BigQueryTableFieldSchema; import com.google.common.collect.ImmutableMap; import com.google.gson.Gson; import io.cdap.cdap.api.annotation.Description; @@ -44,6 +43,7 @@ import io.cdap.cdap.etl.api.engine.sql.SQLEngineOutput; import io.cdap.cdap.etl.common.Constants; import io.cdap.plugin.gcp.bigquery.connector.BigQueryConnector; +import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryTableFieldSchema; import io.cdap.plugin.gcp.bigquery.sqlengine.BigQuerySQLEngine; import io.cdap.plugin.gcp.bigquery.sqlengine.BigQueryWrite; import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants; diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfig.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfig.java index aa09774d3b..f15749cfb0 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfig.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfig.java @@ -58,7 +58,8 @@ public final class BigQuerySinkConfig extends AbstractBigQuerySinkConfig { private static final String WHERE = "WHERE"; public static final Set SUPPORTED_CLUSTERING_TYPES = ImmutableSet.of(Schema.Type.INT, Schema.Type.LONG, Schema.Type.STRING, Schema.Type.BOOLEAN, Schema.Type.BYTES); - private static final Pattern FIELD_PATTERN = Pattern.compile("[a-zA-Z0-9_]+"); + // Read More : https://cloud.google.com/bigquery/docs/schemas#flexible-column-names + private static final Pattern FIELD_PATTERN = Pattern.compile("[\\p{L}\\p{M}\\p{N}\\p{Pc}\\p{Pd}&%+=:'<>#| ]+"); public static final String NAME_TABLE = "table"; public static final String NAME_SCHEMA = "schema"; @@ -75,6 +76,7 @@ public final class BigQuerySinkConfig extends AbstractBigQuerySinkConfig { public static final String NAME_RANGE_INTERVAL = "rangeInterval"; public static final int MAX_NUMBER_OF_COLUMNS = 4; + private static final int MAX_LENGTH_OF_COLUMN_NAME = 300; @Name(NAME_TABLE) @Macro @@ -345,9 +347,18 @@ public void validate(@Nullable Schema inputSchema, @Nullable Schema outputSchema String name = field.getName(); // BigQuery column names only allow alphanumeric characters and _ // https://cloud.google.com/bigquery/docs/schemas#column_names + // Allow support for Flexible column names + // https://cloud.google.com/bigquery/docs/schemas#flexible-column-names if (!FIELD_PATTERN.matcher(name).matches()) { - collector.addFailure(String.format("Output field '%s' must only contain alphanumeric characters and '_'.", - name), null).withOutputSchemaField(name); + collector.addFailure(String.format("Output field '%s' contains invalid characters. " + + "Check column names docs for more details.", + name), null).withOutputSchemaField(name); + } + + // Check if the field name exceeds the maximum length of 300 characters. + if (name.length() > MAX_LENGTH_OF_COLUMN_NAME) { + collector.addFailure(String.format("Output field '%s' exceeds the maximum length of 300 characters.", + name), null).withOutputSchemaField(name); } // check if the required fields are present in the input schema. 
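As a quick illustration of the flexible column name validation introduced above (the new FIELD_PATTERN regex and the MAX_LENGTH_OF_COLUMN_NAME cap in BigQuerySinkConfig), the following minimal, standalone Java sketch shows how the new rule behaves. It is not part of the patch; the class and method names are hypothetical, and only the character class and length limit are taken from the change itself.

import java.util.regex.Pattern;

// Hypothetical illustration only; not part of BigQuerySinkConfig or this patch.
public class FlexibleColumnNameCheck {
  // Same character class the patch installs for flexible column names:
  // letters, marks, numbers, connector punctuation, dashes, space, and a few symbols.
  private static final Pattern FIELD_PATTERN =
      Pattern.compile("[\\p{L}\\p{M}\\p{N}\\p{Pc}\\p{Pd}&%+=:'<>#| ]+");
  // Flexible column names are capped at 300 characters in the patch.
  private static final int MAX_LENGTH_OF_COLUMN_NAME = 300;

  static boolean isValidColumnName(String name) {
    return FIELD_PATTERN.matcher(name).matches()
        && name.length() <= MAX_LENGTH_OF_COLUMN_NAME;
  }

  public static void main(String[] args) {
    System.out.println(isValidColumnName("order_id"));      // true: letters and underscore
    System.out.println(isValidColumnName("ünïcode-col 1")); // true: non-Latin letters, dash, space, digit
    System.out.println(isValidColumnName("bad!name"));      // false: '!' is outside the allowed set
  }
}

Under this rule, names made of letters (including non-Latin scripts), marks, numbers, connectors, dashes, spaces, and the listed punctuation pass, while any other character or a name longer than 300 characters is rejected by the validator.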
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkUtils.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkUtils.java index af6c244834..4ee209de09 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkUtils.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkUtils.java @@ -28,9 +28,6 @@ import com.google.cloud.bigquery.Table; import com.google.cloud.bigquery.TableId; import com.google.cloud.hadoop.io.bigquery.BigQueryFileFormat; -import com.google.cloud.hadoop.io.bigquery.output.BigQueryOutputConfiguration; -import com.google.cloud.hadoop.io.bigquery.output.BigQueryTableFieldSchema; -import com.google.cloud.hadoop.io.bigquery.output.BigQueryTableSchema; import com.google.cloud.kms.v1.CryptoKeyName; import com.google.cloud.storage.Bucket; import com.google.cloud.storage.Storage; @@ -43,6 +40,9 @@ import io.cdap.cdap.etl.api.validation.ValidationFailure; import io.cdap.plugin.common.Asset; import io.cdap.plugin.common.LineageRecorder; +import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryOutputConfiguration; +import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryTableFieldSchema; +import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryTableSchema; import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants; import io.cdap.plugin.gcp.bigquery.util.BigQueryTypeSize.Numeric; import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil; @@ -62,6 +62,7 @@ import java.util.Objects; import java.util.Set; import java.util.function.Supplier; +import java.util.regex.Pattern; import java.util.stream.Collectors; import javax.annotation.Nullable; @@ -611,6 +612,13 @@ private static BigQueryFileFormat getFileFormat(List f if (DATETIME.equals(field.getType())) { return BigQueryFileFormat.NEWLINE_DELIMITED_JSON; } + // If the field name is not in english characters, then we will use json format + // We do this as the avro load job in BQ does not support non-english characters in field names for now + String fieldName = field.getName(); + final String englishCharactersRegex = "[\\w]+"; + if (!Pattern.matches(englishCharactersRegex, fieldName)) { + return BigQueryFileFormat.NEWLINE_DELIMITED_JSON; + } // If the field is a record we have to check its subfields. 
if (RECORD.equals(field.getType())) { if (getFileFormat(field.getFields()) == BigQueryFileFormat.NEWLINE_DELIMITED_JSON) { diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/DelegatingMultiSinkOutputCommitter.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/DelegatingMultiSinkOutputCommitter.java index e5ad0cbff2..fb69e7c6c2 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/DelegatingMultiSinkOutputCommitter.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/DelegatingMultiSinkOutputCommitter.java @@ -17,8 +17,8 @@ package io.cdap.plugin.gcp.bigquery.sink; import com.google.cloud.bigquery.DatasetId; -import com.google.cloud.hadoop.io.bigquery.output.BigQueryTableFieldSchema; import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryTableFieldSchema; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobStatus; import org.apache.hadoop.mapreduce.OutputCommitter; diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/DelegatingMultiSinkRecordWriter.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/DelegatingMultiSinkRecordWriter.java index 8d2e0faa02..d7323567d6 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/DelegatingMultiSinkRecordWriter.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/DelegatingMultiSinkRecordWriter.java @@ -17,9 +17,9 @@ package io.cdap.plugin.gcp.bigquery.sink; import com.google.cloud.bigquery.DatasetId; -import com.google.cloud.hadoop.io.bigquery.output.BigQueryTableFieldSchema; import io.cdap.cdap.api.data.format.StructuredRecord; import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryTableFieldSchema; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.RecordWriter; diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/lib/BigQueryOutputConfiguration.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/lib/BigQueryOutputConfiguration.java new file mode 100644 index 0000000000..f59c0f36f8 --- /dev/null +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/lib/BigQueryOutputConfiguration.java @@ -0,0 +1,529 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package io.cdap.plugin.gcp.bigquery.sink.lib; + +import com.google.api.services.bigquery.model.TableReference; +import com.google.api.services.bigquery.model.TableSchema; +import com.google.api.services.bigquery.model.TimePartitioning; +import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration; +import com.google.cloud.hadoop.io.bigquery.BigQueryFileFormat; +import com.google.cloud.hadoop.io.bigquery.HadoopConfigurationProperty; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.util.ReflectionUtils; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; + +import static com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration.OUTPUT_CLEANUP_TEMP; +import static com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration.OUTPUT_DATASET_ID; +import static com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration.OUTPUT_FILE_FORMAT; +import static com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration.OUTPUT_FORMAT_CLASS; +import static com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration.OUTPUT_PROJECT_ID; +import static com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration.OUTPUT_TABLE_CREATE_DISPOSITION; +import static com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration.OUTPUT_TABLE_ID; +import static com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration.OUTPUT_TABLE_KMS_KEY_NAME; +import static com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration.OUTPUT_TABLE_PARTITIONING; +import static com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration.OUTPUT_TABLE_SCHEMA; +import static com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration.OUTPUT_TABLE_WRITE_DISPOSITION; +import static com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration.PROJECT_ID; +import static com.google.cloud.hadoop.util.ConfigurationUtil.getMandatoryConfig; + +/** + * A container for configuration keys related to BigQuery indirect output formats. Alternatively, + * the properties can be set in the configuration xml files with proper values. + * - Moved from com.google.cloud.hadoop.io.bigquery bigquery-connector library + * - Update the table regex as per the requirement of flexible column names + */ +@InterfaceStability.Unstable +public class BigQueryOutputConfiguration { + + /** A list of keys that are required for this output connector. */ + public static final List> REQUIRED_PROPERTIES = + ImmutableList.of(OUTPUT_DATASET_ID, OUTPUT_TABLE_ID, OUTPUT_FILE_FORMAT, OUTPUT_FORMAT_CLASS); + + /** + * A helper function to set the required output keys in the given configuration. + * + * @param conf the configuration to set the keys on. + * @param qualifiedOutputTableId the qualified id of the output table in the form: (Optional + * ProjectId):[DatasetId].[TableId]. If the project id is missing, the default project + * id is attempted {@link BigQueryConfiguration#PROJECT_ID}. + * @param outputTableSchemaJson the schema of the BigQuery output table. + * @param outputGcsPath the path in GCS to stage data in. Example: 'gs://bucket/job'. 
+ * @param outputFileFormat the formatting of the data being written by the output format class. + * @param outputFormatClass the file output format that will write files to GCS. + * @throws IOException + */ + @SuppressWarnings("rawtypes") + private static void configure( + Configuration conf, + String qualifiedOutputTableId, + String outputTableSchemaJson, + String outputGcsPath, + BigQueryFileFormat outputFileFormat, + Class outputFormatClass) + throws IOException { + Preconditions.checkArgument( + !Strings.isNullOrEmpty(outputTableSchemaJson), + "outputTableSchemaJson must not be null or empty."); + TableReference outputTable = BigQueryStrings.parseTableReference(qualifiedOutputTableId); + configure( + conf, + outputTable.getProjectId(), + outputTable.getDatasetId(), + outputTable.getTableId(), + Optional.of(outputTableSchemaJson), + outputGcsPath, + outputFileFormat, + outputFormatClass); + } + + /** + * A helper function to set the required output keys in the given configuration. + * + * @param conf the configuration to set the keys on. + * @param outputProjectId the id of the output project. If the project id is null, the default + * project id is attempted {@link BigQueryConfiguration#PROJECT_ID}. + * @param outputDatasetId the id of the output dataset. + * @param outputTableId the id of the output table. + * @param outputTableSchemaJson the schema of the BigQuery output table. If the schema is null, + * BigQuery will attempt to auto detect the schema. When using avro formatted data, a schema + * is not required as avro stores the schema in the file. + * @param outputGcsPath the path in GCS to stage data in. Example: 'gs://bucket/job'. + * @param outputFileFormat the formatting of the data being written by the output format class. + * @param outputFormatClass the file output format that will write files to GCS. + * @throws IOException + */ + @SuppressWarnings("rawtypes") + private static void configure( + Configuration conf, + String outputProjectId, + String outputDatasetId, + String outputTableId, + Optional outputTableSchemaJson, + String outputGcsPath, + BigQueryFileFormat outputFileFormat, + Class outputFormatClass) + throws IOException { + + // Use the default project ID as a backup. + if (Strings.isNullOrEmpty(outputProjectId)) { + outputProjectId = PROJECT_ID.get(conf, conf::get); + } + + Preconditions.checkArgument( + !Strings.isNullOrEmpty(outputProjectId), "outputProjectId must not be null or empty."); + Preconditions.checkArgument( + !Strings.isNullOrEmpty(outputDatasetId), "outputDatasetId must not be null or empty."); + Preconditions.checkArgument( + !Strings.isNullOrEmpty(outputTableId), "outputTableId must not be null or empty."); + Preconditions.checkArgument( + !Strings.isNullOrEmpty(outputGcsPath), "outputGcsPath must not be null or empty."); + Preconditions.checkNotNull(outputFileFormat, "outputFileFormat must not be null."); + Preconditions.checkNotNull(outputFormatClass, "outputFormatClass must not be null."); + + conf.set(OUTPUT_PROJECT_ID.getKey(), outputProjectId); + conf.set(OUTPUT_DATASET_ID.getKey(), outputDatasetId); + conf.set(OUTPUT_TABLE_ID.getKey(), outputTableId); + conf.set(OUTPUT_FILE_FORMAT.getKey(), outputFileFormat.name()); + conf.setClass(OUTPUT_FORMAT_CLASS.getKey(), outputFormatClass, FileOutputFormat.class); + + setFileOutputFormatOutputPath(conf, outputGcsPath); + + // If a schema is provided, serialize it. 
+ if (outputTableSchemaJson.isPresent()) { + TableSchema tableSchema = BigQueryTableHelper.parseTableSchema(outputTableSchemaJson.get()); + String fieldsJson = BigQueryTableHelper.getTableFieldsJson(tableSchema); + conf.set(OUTPUT_TABLE_SCHEMA.getKey(), fieldsJson); + } + } + + /** + * A helper function to set the required output keys in the given configuration. + * + * @param conf the configuration to set the keys on. + * @param qualifiedOutputTableId the qualified id of the output table in the form: (Optional + * ProjectId):[DatasetId].[TableId]. If the project id is missing, the default project + * id is attempted {@link BigQueryConfiguration#PROJECT_ID}. + * @param outputTableSchema the schema of the BigQuery output table. If the schema is null, + * BigQuery will attempt to auto detect the schema. When using avro formatted data, a schema + * is not required as avro stores the schema in the file. + * @param outputGcsPath the path in GCS to stage data in. Example: 'gs://bucket/job'. + * @param outputFileFormat the formatting of the data being written by the output format class. + * @param outputFormatClass the file output format that will write files to GCS. + * @throws IOException + */ + @SuppressWarnings("rawtypes") + public static void configure( + Configuration conf, + String qualifiedOutputTableId, + BigQueryTableSchema outputTableSchema, + String outputGcsPath, + BigQueryFileFormat outputFileFormat, + Class outputFormatClass) + throws IOException { + configure( + conf, + qualifiedOutputTableId, + BigQueryTableHelper.getTableSchemaJson(outputTableSchema.get()), + outputGcsPath, + outputFileFormat, + outputFormatClass); + } + + /** + * A helper function to set the required output keys in the given configuration. + * + *

This method will set the output table schema as auto-detected. + * + * @param conf the configuration to set the keys on. + * @param qualifiedOutputTableId the qualified id of the output table in the form: (Optional + * ProjectId):[DatasetId].[TableId]. If the project id is missing, the default project + * id is attempted {@link BigQueryConfiguration#PROJECT_ID}. + * @param outputGcsPath the path in GCS to stage data in. Example: 'gs://bucket/job'. + * @param outputFileFormat the formatting of the data being written by the output format class. + * @param outputFormatClass the file output format that will write files to GCS. + * @throws IOException + */ + @SuppressWarnings("rawtypes") + public static void configureWithAutoSchema( + Configuration conf, + String qualifiedOutputTableId, + String outputGcsPath, + BigQueryFileFormat outputFileFormat, + Class outputFormatClass) + throws IOException { + TableReference outputTable = BigQueryStrings.parseTableReference(qualifiedOutputTableId); + configure( + conf, + outputTable.getProjectId(), + outputTable.getDatasetId(), + outputTable.getTableId(), + /* outputTableSchemaJson= */ Optional.empty(), + outputGcsPath, + outputFileFormat, + outputFormatClass); + } + + public static void setKmsKeyName(Configuration conf, String kmsKeyName) { + Preconditions.checkArgument( + !Strings.isNullOrEmpty(kmsKeyName), "kmsKeyName must not be null or empty."); + conf.set(OUTPUT_TABLE_KMS_KEY_NAME.getKey(), kmsKeyName); + } + + /** + * Helper function that validates the output configuration. Ensures the project id, dataset id, + * and table id exist in the configuration. This also ensures that if a schema is provided, that + * it is properly formatted. + * + * @param conf the configuration to validate. + * @throws IOException if the configuration is missing a key, or there's an issue while parsing + * the schema in the configuration. + */ + public static void validateConfiguration(Configuration conf) throws IOException { + // Ensure the BigQuery output information is valid. + getMandatoryConfig(conf, REQUIRED_PROPERTIES); + + // Run through the individual getters as they manage error handling. + getProjectId(conf); + getJobProjectId(conf); + getTableSchema(conf); + getFileFormat(conf); + getFileOutputFormat(conf); + getGcsOutputPath(conf); + } + + /** + * Gets if the configuration flag to cleanup temporary data in GCS is enabled or not. + * + * @param conf the configuration to reference the key from. + * @return true if the flag is enabled or missing, false otherwise. + */ + public static boolean getCleanupTemporaryDataFlag(Configuration conf) { + return OUTPUT_CLEANUP_TEMP.get(conf, conf::getBoolean); + } + + /** + * Gets the output dataset project id based on the given configuration. + * + *

If the {@link BigQueryConfiguration#OUTPUT_PROJECT_ID} is missing, this resolves to + * referencing the {@link BigQueryConfiguration#PROJECT_ID} key. + * + *

The load job can be configured with two project identifiers. Configuration key {@link + * BigQueryConfiguration#PROJECT_ID} can set the project on whose behalf to perform BigQuery load + * operation, while {@link BigQueryConfiguration#OUTPUT_PROJECT_ID} can be used to name the + * project that the target dataset belongs to. + * + * @param conf the configuration to reference the keys from. + * @return the project id based on the given configuration. + * @throws IOException if a required key is missing. + */ + public static String getProjectId(Configuration conf) throws IOException { + // Reference the default project ID as a backup. + String projectId = OUTPUT_PROJECT_ID.get(conf, conf::get); + if (Strings.isNullOrEmpty(projectId)) { + projectId = PROJECT_ID.get(conf, conf::get); + } + if (Strings.isNullOrEmpty(projectId)) { + throw new IOException( + "Must supply a value for configuration setting: " + OUTPUT_PROJECT_ID.getKey()); + } + return projectId; + } + + /** + * Gets the project id to be used to run BQ load job based on the given configuration. + * + *

If the {@link BigQueryConfiguration#PROJECT_ID} is missing, this resolves to referencing the + * {@link BigQueryConfiguration#OUTPUT_PROJECT_ID} key. + * + *

The load job can be configured with two project identifiers. Configuration key {@link + * BigQueryConfiguration#PROJECT_ID} can set the project on whose behalf to perform BigQuery load + * operation, while {@link BigQueryConfiguration#OUTPUT_PROJECT_ID} can be used to name the + * project that the target dataset belongs to. + * + * @param conf the configuration to reference the keys from. + * @return the project id based on the given configuration. + * @throws IOException if a required key is missing. + */ + public static String getJobProjectId(Configuration conf) throws IOException { + // Reference the default project ID as a backup. + String projectId = PROJECT_ID.get(conf, conf::get); + if (Strings.isNullOrEmpty(projectId)) { + projectId = OUTPUT_PROJECT_ID.get(conf, conf::get); + } + if (Strings.isNullOrEmpty(projectId)) { + throw new IOException( + "Must supply a value for configuration setting: " + PROJECT_ID.getKey()); + } + return projectId; + } + + /** + * Gets the output table reference based on the given configuration. If the {@link + * BigQueryConfiguration#OUTPUT_PROJECT_ID} is missing, this resolves to referencing the + * {@link BigQueryConfiguration#PROJECT_ID} key. + * + * @param conf the configuration to reference the keys from. + * @return a reference to the derived output table in the format of ":.". + * @throws IOException if a required key is missing. + */ + static TableReference getTableReference(Configuration conf) throws IOException { + // Ensure the BigQuery output information is valid. + String projectId = getProjectId(conf); + String datasetId = getMandatoryConfig(conf, OUTPUT_DATASET_ID); + String tableId = getMandatoryConfig(conf, OUTPUT_TABLE_ID); + + return new TableReference().setProjectId(projectId).setDatasetId(datasetId).setTableId(tableId); + } + + /** + * Gets the output table schema based on the given configuration. + * + * @param conf the configuration to reference the keys from. + * @return the derived table schema, absent value if no table schema exists in the configuration. + * @throws IOException if a table schema was set in the configuration but couldn't be parsed. + */ + static Optional getTableSchema(Configuration conf) throws IOException { + String fieldsJson = OUTPUT_TABLE_SCHEMA.get(conf, conf::get); + if (!Strings.isNullOrEmpty(fieldsJson)) { + try { + TableSchema tableSchema = BigQueryTableHelper.createTableSchemaFromFields(fieldsJson); + return Optional.of(BigQueryTableSchema.wrap(tableSchema)); + } catch (IOException e) { + throw new IOException("Unable to parse key '" + OUTPUT_TABLE_SCHEMA.getKey() + "'.", e); + } + } + return Optional.empty(); + } + + /** + * Gets the output table time partitioning based on the given configuration. + * + * @param conf the configuration to reference the keys from. + * @return the derived table time partitioning, absent value if no table time partitioning exists + * in the configuration. + * @throws IOException if a table time partitioning was set in the configuration but couldn't be + * parsed. 
+ */ + static Optional getTablePartitioning(Configuration conf) + throws IOException { + String fieldsJson = OUTPUT_TABLE_PARTITIONING.get(conf, conf::get); + if (!Strings.isNullOrEmpty(fieldsJson)) { + try { + TimePartitioning tablePartitioning = BigQueryTimePartitioning.getFromJson(fieldsJson); + return Optional.of(BigQueryTimePartitioning.wrap(tablePartitioning)); + } catch (IOException e) { + throw new IOException( + "Unable to parse key '" + OUTPUT_TABLE_PARTITIONING.getKey() + "'.", e); + } + } + return Optional.empty(); + } + + /** + * Gets the output table KMS key name based on the given configuration. + * + * @param conf the configuration to reference the keys from. + * @return the KMS key name of the output table, null if no KMS key name exists in the + * configuration. + */ + public static String getKmsKeyName(Configuration conf) throws IOException { + return OUTPUT_TABLE_KMS_KEY_NAME.get(conf, conf::get); + } + + /** + * Gets the stored output {@link BigQueryFileFormat} in the configuration. + * + * @param conf the configuration to reference the keys from. + * @return the stored output {@link BigQueryFileFormat} in the configuration. + * @throws IOException if file format value is missing from the configuration. + */ + public static BigQueryFileFormat getFileFormat(Configuration conf) throws IOException { + // Ensure the BigQuery output information is valid. + String fileFormatName = getMandatoryConfig(conf, OUTPUT_FILE_FORMAT); + + return BigQueryFileFormat.fromName(fileFormatName); + } + + /** + * Gets a configured instance of the stored {@link FileOutputFormat} in the configuration. + * + * @param conf the configuration to reference the keys from. + * @return a configured instance of the stored {@link FileOutputFormat} in the configuration. + * @throws IOException if there's an issue getting an instance of a FileOutputFormat from the + * configuration. + */ + @SuppressWarnings("rawtypes") + public static FileOutputFormat getFileOutputFormat(Configuration conf) throws IOException { + // Ensure the BigQuery output information is valid. + getMandatoryConfig(conf, OUTPUT_FORMAT_CLASS); + + Class confClass = OUTPUT_FORMAT_CLASS.get(conf, conf::getClass); + + // Fail if the default value was used, or the class isn't a FileOutputFormat. + if (confClass == null) { + throw new IOException( + "Unable to resolve value for the configuration key '" + + OUTPUT_FORMAT_CLASS.getKey() + + "'."); + } else if (!FileOutputFormat.class.isAssignableFrom(confClass)) { + throw new IOException("The class " + confClass.getName() + " is not a FileOutputFormat."); + } + + Class fileOutputClass = + confClass.asSubclass(FileOutputFormat.class); + + // Create a new instance and configure it if it's configurable. + return ReflectionUtils.newInstance(fileOutputClass, conf); + } + + /** + * Gets the stored GCS output path in the configuration. + * + * @param conf the configuration to reference the keys from. + * @return the stored output path in the configuration. + * @throws IOException if the output path isn't set in the configuration, or the output path's + * file system isn't GCS. + */ + public static Path getGcsOutputPath(Configuration conf) throws IOException { + Job tempJob = new BigQueryOutputConfiguration.JobConfigurationAdapter(conf); + + // Error if the output path is missing. + Path outputPath = FileOutputFormat.getOutputPath(tempJob); + if (outputPath == null) { + throw new IOException("FileOutputFormat output path not set."); + } + + // Error if the output file system isn't GCS. 
+ FileSystem fs = outputPath.getFileSystem(conf); + if (!"gs".equals(fs.getScheme())) { + throw new IOException("Output FileSystem must be GCS ('gs' scheme)."); + } + + return outputPath; + } + + /** + * Gets the create disposition of the output table. This specifies if the job should create a + * table for loading data. + * + * @param conf the configuration to reference the keys from. + * @return the create disposition of the output table. + */ + public static String getCreateDisposition(Configuration conf) { + return OUTPUT_TABLE_CREATE_DISPOSITION.get(conf, conf::get); + } + + + /** + * Gets the write disposition of the output table. This specifies the action that occurs if the + * destination table already exists. By default, if the table already exists, BigQuery appends + * data to the output table. + * + * @param conf the configuration to reference the keys from. + * @return the write disposition of the output table. + */ + public static String getWriteDisposition(Configuration conf) { + return OUTPUT_TABLE_WRITE_DISPOSITION.get(conf, conf::get); + } + + /** + * Sets the output path for FileOutputFormat. + * + * @param conf the configuration to pass to FileOutputFormat. + * @param outputPath the path to set as the output path. + * @throws IOException + */ + @VisibleForTesting + static void setFileOutputFormatOutputPath(Configuration conf, String outputPath) + throws IOException { + Job tempJob = new BigQueryOutputConfiguration.JobConfigurationAdapter(conf); + FileOutputFormat.setOutputPath(tempJob, new Path(outputPath)); + } + + /** + * This class provides a workaround for setting FileOutputFormat's output path. Creating a job + * with a configuration creates a defensive copy of the configuration for the job, meaning changes + * in either configuration will not be reflected in the other. Because FileOutputFormat requires a + * job for the API to set an output path, this adapter is used to ensure changes are propagated + * out to the wrapped configuration. + */ + private static class JobConfigurationAdapter extends Job { + + private final Configuration config; + + JobConfigurationAdapter(Configuration config) throws IOException { + super(); + this.config = config; + } + + @Override + public Configuration getConfiguration() { + return config; + } + } +} diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/lib/BigQueryStrings.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/lib/BigQueryStrings.java new file mode 100644 index 0000000000..ab8993c5c1 --- /dev/null +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/lib/BigQueryStrings.java @@ -0,0 +1,89 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package io.cdap.plugin.gcp.bigquery.sink.lib; + +import com.google.api.services.bigquery.model.TableReference; +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.base.Strings; +import java.util.List; + +/** + * BigQueryStrings provides misc static helper methods for printing and formatting strings related + * to BigQuery data structures and API objects. + * - Moved from com.google.cloud.hadoop.io.bigquery bigquery-connector library + * - Update the table regex as per the requirement of flexible column names + */ +public class BigQueryStrings { + // Regular expression for validating a datasetId.tableId pair. + // Valid BigQuery table names can contain only Unicode characters in category L (letter), M (mark), N (number), + // Pc (connector, including underscore), Pd (dash), Zs (space). + // See here: https://cloud.google.com/bigquery/docs/tables#table_naming + public static final String DATASET_AND_TABLE_REGEX = "[a-zA-Z0-9_]+\\.[\\p{L}\\p{M}\\p{N}\\p{Pc}\\p{Pd}\\p{Zs}]+"; + + private static final Splitter DOT_SPLITTER = Splitter.on('.'); + + /** + * Returns a String representation of the TableReference suitable for interop with other bigquery + * tools and for passing back into {@link #parseTableReference(String)}. + * + * @param tableRef A TableReference which contains at least DatasetId and TableId. + * @return A string of the form [projectId]:[datasetId].[tableId]. + */ + public static String toString(TableReference tableRef) { + Preconditions.checkArgument(!Strings.isNullOrEmpty(tableRef.getDatasetId()), + "tableRef must contain non-empty DatasetId."); + Preconditions.checkArgument(!Strings.isNullOrEmpty(tableRef.getTableId()), + "tableRef must contain non-empty TableId."); + if (Strings.isNullOrEmpty(tableRef.getProjectId())) { + return String.format("%s.%s", tableRef.getDatasetId(), tableRef.getTableId()); + } else { + return String.format("%s:%s.%s", + tableRef.getProjectId(), tableRef.getDatasetId(), tableRef.getTableId()); + } + } + + /** + * Parses a string into a TableReference; projectId may be omitted if the caller defines a + * "default" project; in such a case, getProjectId() of the returned TableReference will + * return null. + * + * @param tableRefString A string of the form [projectId]:[datasetId].[tableId]. + * @return a TableReference with the parsed components. + */ + public static TableReference parseTableReference(String tableRefString) { + // Logic mirrored from cloud/helix/clients/cli/bigquery_client.py. + TableReference tableRef = new TableReference(); + int projectIdEnd = tableRefString.lastIndexOf(':'); + String datasetAndTableString = tableRefString; + if (projectIdEnd != -1) { + tableRef.setProjectId(tableRefString.substring(0, projectIdEnd)); + + // Omit the ':' from the remaining datasetId.tableId substring. 
+ datasetAndTableString = tableRefString.substring(projectIdEnd + 1); + } + + Preconditions.checkArgument(datasetAndTableString.matches(DATASET_AND_TABLE_REGEX), + "Invalid datasetAndTableString '%s'; must match regex '%s'.", + datasetAndTableString, DATASET_AND_TABLE_REGEX); + + List idParts = DOT_SPLITTER.splitToList(datasetAndTableString); + tableRef.setDatasetId(idParts.get(0)); + tableRef.setTableId(idParts.get(1)); + return tableRef; + } +} + diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/lib/BigQueryTableFieldSchema.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/lib/BigQueryTableFieldSchema.java new file mode 100644 index 0000000000..ba9a47d258 --- /dev/null +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/lib/BigQueryTableFieldSchema.java @@ -0,0 +1,120 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package io.cdap.plugin.gcp.bigquery.sink.lib; + + +import com.google.api.services.bigquery.model.TableFieldSchema; +import com.google.common.base.Preconditions; +import java.util.List; + +import static java.util.stream.Collectors.toList; + +/** + * Wrapper for BigQuery {@link TableFieldSchema}. + * + *

This class is used to avoid client code to depend on BigQuery API classes, so that there is no + * potential conflict between different versions of BigQuery API libraries in the client. + * + * @see TableFieldSchema + * - Moved from com.google.cloud.hadoop.io.bigquery bigquery-connector library + * - Update the table regex as per the requirement of flexible column names + */ +public class BigQueryTableFieldSchema { + private final TableFieldSchema fieldSchema; + + public BigQueryTableFieldSchema() { + this.fieldSchema = new TableFieldSchema(); + } + + BigQueryTableFieldSchema(TableFieldSchema fieldSchema) { + Preconditions.checkNotNull(fieldSchema); + this.fieldSchema = fieldSchema; + } + + /** @see TableFieldSchema#getMode() */ + public String getMode() { + return fieldSchema.getMode(); + } + + /** @see TableFieldSchema#setMode(String) */ + public BigQueryTableFieldSchema setMode(String mode) { + fieldSchema.setMode(mode); + return this; + } + + /** @see TableFieldSchema#getName() */ + public String getName() { + return fieldSchema.getName(); + } + + /** @see TableFieldSchema#setName(String) */ + public BigQueryTableFieldSchema setName(String name) { + fieldSchema.setName(name); + return this; + } + + /** @see TableFieldSchema#getType() */ + public String getType() { + return fieldSchema.getType(); + } + + /** @see TableFieldSchema#setType(String) */ + public BigQueryTableFieldSchema setType(String type) { + fieldSchema.setType(type); + return this; + } + + /** + * Gets the nested schema fields if the type property is set to RECORD. + * + * @see TableFieldSchema#getFields() + */ + public List getFields() { + return get().getFields().stream().map(BigQueryTableFieldSchema::new).collect(toList()); + } + + /** + * Sets the nested schema fields if the type property is set to RECORD. + * + * @see TableFieldSchema#setFields(List) + */ + public BigQueryTableFieldSchema setFields(java.util.List fields) { + fieldSchema.setFields(fields.stream().map(BigQueryTableFieldSchema::get).collect(toList())); + return this; + } + + @Override + public int hashCode() { + return fieldSchema.hashCode(); + } + + @Override + public boolean equals(Object object) { + if (!(object instanceof com.google.cloud.hadoop.io.bigquery.output.BigQueryTableFieldSchema)) { + return false; + } + BigQueryTableFieldSchema another = (BigQueryTableFieldSchema) object; + return fieldSchema.equals(another.fieldSchema); + } + + TableFieldSchema get() { + return fieldSchema; + } + + static BigQueryTableFieldSchema wrap(TableFieldSchema fieldSchema) { + return new BigQueryTableFieldSchema(fieldSchema); + } +} diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/lib/BigQueryTableHelper.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/lib/BigQueryTableHelper.java new file mode 100644 index 0000000000..f089d81ba4 --- /dev/null +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/lib/BigQueryTableHelper.java @@ -0,0 +1,79 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package io.cdap.plugin.gcp.bigquery.sink.lib; + +import com.google.api.client.json.JsonParser; +import com.google.api.client.json.jackson2.JacksonFactory; +import com.google.api.services.bigquery.model.TableFieldSchema; +import com.google.api.services.bigquery.model.TableSchema; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** Helper for BigQuery tables. + * - Moved from com.google.cloud.hadoop.io.bigquery bigquery-connector library + * - Update the table regex as per the requirement of flexible column names + */ +class BigQueryTableHelper { + /** + * Parses table schema JSON into {@link TableSchema}. + * + * @param tableSchemaJson JSON table schema to convert to {@link TableSchema} + * @return {@link TableSchema} + * @throws IOException if the JSON is invalid. + */ + static TableSchema parseTableSchema(String tableSchemaJson) throws IOException { + JsonParser parser = JacksonFactory.getDefaultInstance().createJsonParser(tableSchemaJson); + return parser.parseAndClose(TableSchema.class); + } + + /** + * Creates {@link TableSchema} from the JSON representation of the table fields. + * + * @param fieldsJson JSON fields to convert to {@link TableSchema} + * @return {@link TableSchema} + * @throws IOException + */ + static TableSchema createTableSchemaFromFields(String fieldsJson) throws IOException { + List fields = new ArrayList<>(); + JsonParser parser = JacksonFactory.getDefaultInstance().createJsonParser(fieldsJson); + parser.parseArrayAndClose(fields, TableFieldSchema.class); + + return new TableSchema().setFields(fields); + } + + /** + * Gets the JSON representation of the table schema. + * + * @param tableSchema {@link TableSchema} to convert to JSON + * @return the JSON of the table schema. + * @throws IOException + */ + static String getTableSchemaJson(TableSchema tableSchema) throws IOException { + return JacksonFactory.getDefaultInstance().toString(tableSchema); + } + + /** + * Gets the JSON representation of the table's fields. + * + * @param tableSchema {@link TableSchema} to get JSON fields from + * @return the JSON of the fields. + * @throws IOException + */ + static String getTableFieldsJson(TableSchema tableSchema) throws IOException { + return JacksonFactory.getDefaultInstance().toString(tableSchema.getFields()); + } +} diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/lib/BigQueryTableSchema.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/lib/BigQueryTableSchema.java new file mode 100644 index 0000000000..1312d3f46c --- /dev/null +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/lib/BigQueryTableSchema.java @@ -0,0 +1,79 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package io.cdap.plugin.gcp.bigquery.sink.lib; + +import com.google.api.services.bigquery.model.TableFieldSchema; +import com.google.api.services.bigquery.model.TableSchema; +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.List; + +/** + * Wrapper for BigQuery {@link TableSchema}. + * + *

This class is used to avoid client code to depend on BigQuery API classes, so that there is no + * potential conflict between different versions of BigQuery API libraries in the client. + * + * @see TableSchema + * - Moved from com.google.cloud.hadoop.io.bigquery bigquery-connector library + * - Update the table regex as per the requirement of flexible column names + */ +public class BigQueryTableSchema { + private final TableSchema tableSchema; + + public BigQueryTableSchema() { + this.tableSchema = new TableSchema(); + } + + BigQueryTableSchema(TableSchema tableSchema) { + Preconditions.checkNotNull(tableSchema, "tableSchema is null."); + this.tableSchema = tableSchema; + } + + /** @see TableSchema#setFields(List) */ + public BigQueryTableSchema setFields(List bigQueryTableFields) { + Preconditions.checkArgument(!bigQueryTableFields.isEmpty(), "Empty fields."); + List fields = new ArrayList<>(bigQueryTableFields.size()); + for (BigQueryTableFieldSchema bigQueryTableField : bigQueryTableFields) { + fields.add(bigQueryTableField.get()); + } + tableSchema.setFields(fields); + return this; + } + + @Override + public int hashCode() { + return tableSchema.hashCode(); + } + + @Override + public boolean equals(Object object) { + if (!(object instanceof com.google.cloud.hadoop.io.bigquery.output.BigQueryTableSchema)) { + return false; + } + BigQueryTableSchema another = (BigQueryTableSchema) object; + return tableSchema.equals(another.tableSchema); + } + + TableSchema get() { + return tableSchema; + } + + static BigQueryTableSchema wrap(TableSchema tableSchema) { + return new BigQueryTableSchema(tableSchema); + } +} + diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/lib/BigQueryTimePartitioning.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/lib/BigQueryTimePartitioning.java new file mode 100644 index 0000000000..0e3e62eeee --- /dev/null +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/lib/BigQueryTimePartitioning.java @@ -0,0 +1,106 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package io.cdap.plugin.gcp.bigquery.sink.lib; + +import com.google.api.client.json.JsonParser; +import com.google.api.client.json.jackson2.JacksonFactory; +import com.google.api.services.bigquery.model.TimePartitioning; +import java.io.IOException; + +/** + * Wrapper for BigQuery {@link TimePartitioning}. + * + *

This class is used to avoid client code to depend on BigQuery API classes, so that there is no + * potential conflict between different versions of BigQuery API libraries in the client. + * + * @see TimePartitioning + * - Moved from com.google.cloud.hadoop.io.bigquery bigquery-connector library + * - Update the table regex as per the requirement of flexible column names + */ +public class BigQueryTimePartitioning { + private final TimePartitioning timePartitioning; + + public BigQueryTimePartitioning() { + this.timePartitioning = new TimePartitioning(); + } + + public BigQueryTimePartitioning(TimePartitioning timePartitioning) { + this.timePartitioning = timePartitioning; + } + + public String getType() { + return timePartitioning.getType(); + } + + public void setType(String type) { + timePartitioning.setType(type); + } + + public String getField() { + return timePartitioning.getField(); + } + + public void setField(String field) { + timePartitioning.setField(field); + } + + public long getExpirationMs() { + return timePartitioning.getExpirationMs(); + } + + public void setExpirationMs(long expirationMs) { + timePartitioning.setExpirationMs(expirationMs); + } + + public Boolean getRequirePartitionFilter() { + return timePartitioning.getRequirePartitionFilter(); + } + + public void setRequirePartitionFilter(Boolean requirePartitionFilter) { + timePartitioning.setRequirePartitionFilter(requirePartitionFilter); + } + + @Override + public int hashCode() { + return timePartitioning.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof com.google.cloud.hadoop.io.bigquery.output.BigQueryTimePartitioning)) { + return false; + } + BigQueryTimePartitioning other = (BigQueryTimePartitioning) obj; + return timePartitioning.equals(other.timePartitioning); + } + + TimePartitioning get() { + return timePartitioning; + } + + static TimePartitioning getFromJson(String json) throws IOException { + JsonParser parser = JacksonFactory.getDefaultInstance().createJsonParser(json); + return parser.parseAndClose(TimePartitioning.class); + } + + public String getAsJson() throws IOException { + return JacksonFactory.getDefaultInstance().toString(timePartitioning); + } + + static BigQueryTimePartitioning wrap(TimePartitioning tableTimePartitioning) { + return new BigQueryTimePartitioning(tableTimePartitioning); + } +} diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryPushDataset.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryPushDataset.java index 1e5d41bc28..11d5ae997c 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryPushDataset.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryPushDataset.java @@ -18,7 +18,6 @@ import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.DatasetId; -import com.google.cloud.hadoop.io.bigquery.output.BigQueryTableFieldSchema; import io.cdap.cdap.api.data.format.StructuredRecord; import io.cdap.cdap.api.data.schema.Schema; import io.cdap.cdap.api.dataset.lib.KeyValue; @@ -28,6 +27,7 @@ import io.cdap.plugin.gcp.bigquery.sink.BigQueryOutputFormatProvider; import io.cdap.plugin.gcp.bigquery.sink.BigQuerySinkUtils; import io.cdap.plugin.gcp.bigquery.sink.Operation; +import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryTableFieldSchema; import io.cdap.plugin.gcp.bigquery.sqlengine.transform.PushTransform; import io.cdap.plugin.gcp.bigquery.sqlengine.util.BigQuerySQLEngineUtils; import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants; diff --git 
a/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java b/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java index bbe185b618..a4e869bb53 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java @@ -82,7 +82,7 @@ public final class BigQueryUtil { public static final String BUCKET_PATTERN = "[a-z0-9._-]+"; public static final String DATASET_PATTERN = "[A-Za-z0-9_]+"; - public static final String TABLE_PATTERN = "[A-Za-z0-9_-]+"; + public static final String TABLE_PATTERN = "[\\p{L}\\p{M}\\p{N}\\p{Pc}\\p{Pd}\\p{Zs}]+"; // Tags for BQ Jobs public static final String BQ_JOB_TYPE_SOURCE_TAG = "bq_source_plugin"; diff --git a/src/main/java/io/cdap/plugin/gcp/dataplex/sink/DataplexBatchSink.java b/src/main/java/io/cdap/plugin/gcp/dataplex/sink/DataplexBatchSink.java index fd338ac804..5f9057cbea 100644 --- a/src/main/java/io/cdap/plugin/gcp/dataplex/sink/DataplexBatchSink.java +++ b/src/main/java/io/cdap/plugin/gcp/dataplex/sink/DataplexBatchSink.java @@ -40,7 +40,6 @@ import com.google.cloud.dataplex.v1.StorageSystem; import com.google.cloud.dataplex.v1.UpdateEntityRequest; import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration; -import com.google.cloud.hadoop.io.bigquery.output.BigQueryTableFieldSchema; import com.google.cloud.kms.v1.CryptoKeyName; import com.google.cloud.storage.Bucket; import com.google.cloud.storage.Storage; @@ -70,6 +69,7 @@ import io.cdap.plugin.gcp.bigquery.sink.AbstractBigQuerySink; import io.cdap.plugin.gcp.bigquery.sink.BigQuerySinkUtils; import io.cdap.plugin.gcp.bigquery.sink.PartitionType; +import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryTableFieldSchema; import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants; import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil; import io.cdap.plugin.gcp.common.CmekUtils; @@ -80,7 +80,6 @@ import io.cdap.plugin.gcp.gcs.GCSPath; import io.cdap.plugin.gcp.gcs.StorageClient; import io.cdap.plugin.gcp.gcs.sink.GCSBatchSink; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -89,6 +88,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import java.io.IOException; import java.text.SimpleDateFormat; import java.util.HashMap; @@ -101,7 +101,6 @@ import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; - import javax.annotation.Nullable; /** diff --git a/src/test/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfigTest.java b/src/test/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfigTest.java index 646597fc21..d7d68fc52f 100644 --- a/src/test/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfigTest.java +++ b/src/test/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfigTest.java @@ -26,281 +26,406 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.Map; /** * Tests for {@link BigQuerySinkConfig}. 
*/ public class BigQuerySinkConfigTest { - MockFailureCollector collector; - BigQuerySinkConfig config; - Method validateTimePartitioningColumnMethod; - - @Before - public void setup() throws NoSuchMethodException { - collector = new MockFailureCollector(); - config = BigQuerySinkConfig.builder().build(); - validateTimePartitioningColumnMethod = config.getClass() - .getDeclaredMethod("validateTimePartitioningColumn", String.class, FailureCollector.class, - Schema.class, TimePartitioning.Type.class); - validateTimePartitioningColumnMethod.setAccessible(true); + MockFailureCollector collector; + BigQuerySinkConfig config; + Method validateTimePartitioningColumnMethod; + Map arguments; + + @Before + public void setup() throws NoSuchMethodException { + collector = new MockFailureCollector(); + config = BigQuerySinkConfig.builder().build(); + validateTimePartitioningColumnMethod = config.getClass() + .getDeclaredMethod("validateTimePartitioningColumn", String.class, FailureCollector.class, + Schema.class, TimePartitioning.Type.class); + validateTimePartitioningColumnMethod.setAccessible(true); + arguments = new HashMap<>(); + } + + @Test + public void testValidateTimePartitioningColumnWithHourAndDate() throws + InvocationTargetException, IllegalAccessException { + String columnName = "partitionFrom"; + Schema fieldSchema = Schema.recordOf("test", Schema.Field.of("partitionFrom", + Schema.of(Schema.LogicalType.DATE))); + TimePartitioning.Type timePartitioningType = TimePartitioning.Type.HOUR; + + validateTimePartitioningColumnMethod.invoke(config, columnName, collector, fieldSchema, timePartitioningType); + Assert.assertEquals(String.format("Partition column '%s' is of invalid type '%s'.", + columnName, fieldSchema.getDisplayName()), + collector.getValidationFailures().stream().findFirst().get().getMessage()); + } + + @Test + public void testValidateTimePartitioningColumnWithHourAndTimestamp() throws + InvocationTargetException, IllegalAccessException { + + String columnName = "partitionFrom"; + Schema schema = Schema.of(Schema.LogicalType.TIMESTAMP_MICROS); + + Schema fieldSchema = schema.isNullable() ? schema.getNonNullable() : schema; + TimePartitioning.Type timePartitioningType = TimePartitioning.Type.HOUR; + + validateTimePartitioningColumnMethod.invoke(config, columnName, collector, fieldSchema, timePartitioningType); + Assert.assertEquals(0, collector.getValidationFailures().size()); + } + + @Test + public void testValidateTimePartitioningColumnWithDayAndString() throws + InvocationTargetException, IllegalAccessException { + + String columnName = "partitionFrom"; + Schema schema = Schema.of(Schema.Type.STRING); + + Schema fieldSchema = schema.isNullable() ? schema.getNonNullable() : schema; + TimePartitioning.Type timePartitioningType = TimePartitioning.Type.DAY; + + validateTimePartitioningColumnMethod.invoke(config, columnName, collector, fieldSchema, timePartitioningType); + Assert.assertEquals(String.format("Partition column '%s' is of invalid type '%s'.", + columnName, fieldSchema.getDisplayName()), + collector.getValidationFailures().stream().findFirst().get().getMessage()); + } + + @Test + public void testValidateTimePartitioningColumnWithDayAndDate() throws + InvocationTargetException, IllegalAccessException { + + String columnName = "partitionFrom"; + Schema schema = Schema.of(Schema.LogicalType.DATE); + + Schema fieldSchema = schema.isNullable() ? 
schema.getNonNullable() : schema; + TimePartitioning.Type timePartitioningType = TimePartitioning.Type.DAY; + + validateTimePartitioningColumnMethod.invoke(config, columnName, collector, fieldSchema, timePartitioningType); + Assert.assertEquals(0, collector.getValidationFailures().size()); + } + + @Test + public void testValidateTimePartitioningColumnWithNullAndDate() throws + InvocationTargetException, IllegalAccessException { + + String columnName = "partitionFrom"; + Schema schema = Schema.of(Schema.LogicalType.DATE); + + Schema fieldSchema = schema.isNullable() ? schema.getNonNullable() : schema; + TimePartitioning.Type timePartitioningType = null; + + validateTimePartitioningColumnMethod.invoke(config, columnName, collector, fieldSchema, timePartitioningType); + // No error as null time timePartitioningType will default to DAY + Assert.assertEquals(0, collector.getValidationFailures().size()); + } + + @Test + public void testJobLabelWithDuplicateKeys() { + config.jobLabelKeyValue = "key1:value1,key2:value2,key1:value3"; + config.validate(collector); + Assert.assertEquals(1, collector.getValidationFailures().size()); + Assert.assertEquals("Duplicate job label key 'key1'.", + collector.getValidationFailures().get(0).getMessage()); + } + + @Test + public void testJobLabelWithDuplicateValues() { + config.jobLabelKeyValue = "key1:value1,key2:value2,key3:value1"; + config.validate(collector); + Assert.assertEquals(0, collector.getValidationFailures().size()); + } + + @Test + public void testJobLabelWithCapitalLetters() { + config.jobLabelKeyValue = "keY1:value1,key2:value2,key3:value1"; + config.validate(collector); + Assert.assertEquals(1, collector.getValidationFailures().size()); + Assert.assertEquals("Invalid job label key 'keY1'.", + collector.getValidationFailures().get(0).getMessage()); + } + + @Test + public void testJobLabelStartingWithCapitalLetters() { + config.jobLabelKeyValue = "Key1:value1,key2:value2,key3:value1"; + config.validate(collector); + Assert.assertEquals(1, collector.getValidationFailures().size()); + Assert.assertEquals("Invalid job label key 'Key1'.", + collector.getValidationFailures().get(0).getMessage()); + } + + @Test + public void testJobLabelWithInvalidCharacters() { + config.jobLabelKeyValue = "key1:value1,key2:value2,key3:value1@"; + config.validate(collector); + Assert.assertEquals(1, collector.getValidationFailures().size()); + Assert.assertEquals("Invalid job label value 'value1@'.", + collector.getValidationFailures().get(0).getMessage()); + } + + @Test + public void testJobLabelWithEmptyKey() { + config.jobLabelKeyValue = ":value1,key2:value2,key3:value1"; + config.validate(collector); + Assert.assertEquals(1, collector.getValidationFailures().size()); + Assert.assertEquals("Invalid job label key ''.", + collector.getValidationFailures().get(0).getMessage()); + } + + @Test + public void testJobLabelWithEmptyValue() { + config.jobLabelKeyValue = "key1:,key2:value2,key3:value1"; + config.validate(collector); + Assert.assertEquals(0, collector.getValidationFailures().size()); + } + + @Test + public void testJobLabelWithWrongFormat() { + config.jobLabelKeyValue = "key1=value1"; + config.validate(collector); + Assert.assertEquals(1, collector.getValidationFailures().size()); + Assert.assertEquals("Invalid job label key 'key1=value1'.", + collector.getValidationFailures().get(0).getMessage()); + } + + @Test + public void testJobLabelWithNull() { + config.jobLabelKeyValue = null; + config.validate(collector); + Assert.assertEquals(0, 
collector.getValidationFailures().size()); + } + + @Test + public void testJobLabelWithReservedKeys() { + config.jobLabelKeyValue = "job_source:value1,type:value2"; + config.validate(collector); + Assert.assertEquals(2, collector.getValidationFailures().size()); + Assert.assertEquals("Invalid job label key 'job_source'.", + collector.getValidationFailures().get(0).getMessage()); + } + + @Test + public void testJobLabelWith65Keys() { + StringBuilder sb = new StringBuilder(); + for (int i = 1; i <= 65; i++) { + String key = "key" + i; + String value = "value" + i; + sb.append(key).append(":").append(value).append(","); } - @Test - public void testValidateTimePartitioningColumnWithHourAndDate() throws - InvocationTargetException, IllegalAccessException { - String columnName = "partitionFrom"; - Schema fieldSchema = Schema.recordOf("test", Schema.Field.of("partitionFrom", - Schema.of(Schema.LogicalType.DATE))); - TimePartitioning.Type timePartitioningType = TimePartitioning.Type.HOUR; - - validateTimePartitioningColumnMethod.invoke(config, columnName, collector, fieldSchema, timePartitioningType); - Assert.assertEquals(String.format("Partition column '%s' is of invalid type '%s'.", - columnName, fieldSchema.getDisplayName()), - collector.getValidationFailures().stream().findFirst().get().getMessage()); + // remove the last comma + sb.deleteCharAt(sb.length() - 1); + Assert.assertEquals(65, sb.toString().split(",").length); + config.jobLabelKeyValue = sb.toString(); + config.validate(collector); + Assert.assertEquals(1, collector.getValidationFailures().size()); + Assert.assertEquals("Number of job labels exceeds the limit.", + collector.getValidationFailures().get(0).getMessage()); + } + + @Test + public void testJobLabelWithKeyLength64() { + String key64 = "1234567890123456789012345678901234567890123456789012345678901234"; + config.jobLabelKeyValue = key64 + ":value1"; + config.validate(collector); + Assert.assertEquals(1, collector.getValidationFailures().size()); + Assert.assertEquals("Invalid job label key '" + key64 + "'.", + collector.getValidationFailures().get(0).getMessage()); + } + + @Test + public void testJobLabelWithValueLength64() { + String value64 = "1234567890123456789012345678901234567890123456789012345678901234"; + config.jobLabelKeyValue = "key1:" + value64; + config.validate(collector); + Assert.assertEquals(1, collector.getValidationFailures().size()); + Assert.assertEquals("Invalid job label value '" + value64 + "'.", + collector.getValidationFailures().get(0).getMessage()); + } + + @Test + public void testJobLabelWithKeyStartingWithNumber() { + config.jobLabelKeyValue = "1key:value1"; + config.validate(collector); + Assert.assertEquals(1, collector.getValidationFailures().size()); + Assert.assertEquals("Invalid job label key '1key'.", + collector.getValidationFailures().get(0).getMessage()); + } + + @Test + public void testJobLabelWithKeyStartingWithDash() { + config.jobLabelKeyValue = "-key:value1"; + config.validate(collector); + Assert.assertEquals(1, collector.getValidationFailures().size()); + Assert.assertEquals("Invalid job label key '-key'.", + collector.getValidationFailures().get(0).getMessage()); + } + + @Test + public void testJobLabelWithKeyStartingWithHyphen() { + config.jobLabelKeyValue = "_key:value1"; + config.validate(collector); + Assert.assertEquals(1, collector.getValidationFailures().size()); + Assert.assertEquals("Invalid job label key '_key'.", + collector.getValidationFailures().get(0).getMessage()); + } + + @Test + public void 
testJobLabelWithKeyWithChineseCharacter() { + config.jobLabelKeyValue = "中文:value1"; + config.validate(collector); + Assert.assertEquals(0, collector.getValidationFailures().size()); + } + + @Test + public void testJobLabelWithKeyWithJapaneseCharacter() { + config.jobLabelKeyValue = "日本語:value1"; + config.validate(collector); + Assert.assertEquals(0, collector.getValidationFailures().size()); + } + + @Test + public void testJobLabelWithValueStartingWithNumber() { + config.jobLabelKeyValue = "key:1value"; + config.validate(collector); + Assert.assertEquals(0, collector.getValidationFailures().size()); + } + + @Test + public void testJobLabelWithValueStartingWithDash() { + config.jobLabelKeyValue = "key:-value"; + config.validate(collector); + Assert.assertEquals(0, collector.getValidationFailures().size()); + } + + @Test + public void testJobLabelWithValueStartingWithCapitalLetter() { + config.jobLabelKeyValue = "key:Value"; + config.validate(collector); + Assert.assertEquals(1, collector.getValidationFailures().size()); + Assert.assertEquals("Invalid job label value 'Value'.", + collector.getValidationFailures().get(0).getMessage()); + } + + @Test + public void testValidateColumnNameWithValidColumnName() { + String columnName = "test"; + Schema schema = Schema.recordOf("test", Schema.Field.of(columnName, Schema.of(Schema.Type.STRING))); + config.validate(schema, schema, collector, arguments); + Assert.assertEquals(0, collector.getValidationFailures().size()); + } + + @Test + public void testValidateColumnNameWithChineseColumnName() { + String columnName = "测试"; + Schema schema = Schema.recordOf("test", Schema.Field.of(columnName, Schema.of(Schema.Type.STRING))); + config.validate(schema, schema, collector, arguments); + Assert.assertEquals(0, collector.getValidationFailures().size()); + } + + @Test + public void testValidateColumnNameWithInvalidColumnName() { + String columnName = "test@"; + Schema schema = Schema.recordOf("test", Schema.Field.of(columnName, Schema.of(Schema.Type.STRING))); + config.validate(schema, schema, collector, arguments); + Assert.assertEquals(1, collector.getValidationFailures().size()); + Assert.assertEquals("Output field 'test@' contains invalid characters. " + + "Check column names docs for more details.", collector.getValidationFailures().get(0).getMessage()); + } + + @Test + public void testValidateColumnNameWithJapaneseColumnName() { + String columnName = "テスト"; + Schema schema = Schema.recordOf("test", Schema.Field.of(columnName, Schema.of(Schema.Type.STRING))); + config.validate(schema, schema, collector, arguments); + Assert.assertEquals(0, collector.getValidationFailures().size()); + } + + @Test + public void testValidateColumnNameWithSpace() { + String columnName = "test test"; + Schema schema = Schema.recordOf("test", Schema.Field.of(columnName, Schema.of(Schema.Type.STRING))); + config.validate(schema, schema, collector, arguments); + Assert.assertEquals(0, collector.getValidationFailures().size()); + } + + @Test + public void testValidateColumnNameWithEmoji() { + String columnName = "test😀"; + Schema schema = Schema.recordOf("test", Schema.Field.of(columnName, Schema.of(Schema.Type.STRING))); + config.validate(schema, schema, collector, arguments); + Assert.assertEquals(1, collector.getValidationFailures().size()); + Assert.assertEquals("Output field 'test😀' contains invalid characters. 
" + + "Check column names docs for more details.", collector.getValidationFailures().get(0).getMessage()); + } + + @Test + public void testValidateColumnNameWithUnderscore() { + String columnName = "test_test"; + Schema schema = Schema.recordOf("test", Schema.Field.of(columnName, Schema.of(Schema.Type.STRING))); + config.validate(schema, schema, collector, arguments); + Assert.assertEquals(0, collector.getValidationFailures().size()); + } + + @Test + public void testValidateColumnNameWithDash() { + String columnName = "test-test"; + Schema schema = Schema.recordOf("test", Schema.Field.of(columnName, Schema.of(Schema.Type.STRING))); + config.validate(schema, schema, collector, arguments); + Assert.assertEquals(0, collector.getValidationFailures().size()); + } + + @Test + public void testValidateColumnNameWithCapitalLetters() { + String columnName = "Test"; + Schema schema = Schema.recordOf("test", Schema.Field.of(columnName, Schema.of(Schema.Type.STRING))); + config.validate(schema, schema, collector, arguments); + Assert.assertEquals(0, collector.getValidationFailures().size()); + } + + @Test + public void testValidateColumnNameWithNumbers() { + String columnName = "1234"; + Schema schema = Schema.recordOf("test", Schema.Field.of(columnName, Schema.of(Schema.Type.STRING))); + config.validate(schema, schema, collector, arguments); + Assert.assertEquals(0, collector.getValidationFailures().size()); + } + + @Test + public void testValidateColumnNameWithSpecialCharacter() { + String columnName = "test!"; + Schema schema = Schema.recordOf("test", Schema.Field.of(columnName, Schema.of(Schema.Type.STRING))); + config.validate(schema, schema, collector, arguments); + Assert.assertEquals(1, collector.getValidationFailures().size()); + Assert.assertEquals("Output field 'test!' contains invalid characters. " + + "Check column names docs for more details.", collector.getValidationFailures().get(0).getMessage()); + } + + @Test + public void testValidateColumnNameWith300Length() { + String columnName = "a"; + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < 300; i++) { + sb.append(columnName); } - - @Test - public void testValidateTimePartitioningColumnWithHourAndTimestamp() throws - InvocationTargetException, IllegalAccessException { - - String columnName = "partitionFrom"; - Schema schema = Schema.of(Schema.LogicalType.TIMESTAMP_MICROS); - - Schema fieldSchema = schema.isNullable() ? schema.getNonNullable() : schema; - TimePartitioning.Type timePartitioningType = TimePartitioning.Type.HOUR; - - validateTimePartitioningColumnMethod.invoke(config, columnName, collector, fieldSchema, timePartitioningType); - Assert.assertEquals(0, collector.getValidationFailures().size()); - } - - @Test - public void testValidateTimePartitioningColumnWithDayAndString() throws - InvocationTargetException, IllegalAccessException { - - String columnName = "partitionFrom"; - Schema schema = Schema.of(Schema.Type.STRING); - - Schema fieldSchema = schema.isNullable() ? 
schema.getNonNullable() : schema; - TimePartitioning.Type timePartitioningType = TimePartitioning.Type.DAY; - - validateTimePartitioningColumnMethod.invoke(config, columnName, collector, fieldSchema, timePartitioningType); - Assert.assertEquals(String.format("Partition column '%s' is of invalid type '%s'.", - columnName, fieldSchema.getDisplayName()), - collector.getValidationFailures().stream().findFirst().get().getMessage()); - } - - @Test - public void testValidateTimePartitioningColumnWithDayAndDate() throws - InvocationTargetException, IllegalAccessException { - - String columnName = "partitionFrom"; - Schema schema = Schema.of(Schema.LogicalType.DATE); - - Schema fieldSchema = schema.isNullable() ? schema.getNonNullable() : schema; - TimePartitioning.Type timePartitioningType = TimePartitioning.Type.DAY; - - validateTimePartitioningColumnMethod.invoke(config, columnName, collector, fieldSchema, timePartitioningType); - Assert.assertEquals(0, collector.getValidationFailures().size()); - } - - @Test - public void testValidateTimePartitioningColumnWithNullAndDate() throws - InvocationTargetException, IllegalAccessException { - - String columnName = "partitionFrom"; - Schema schema = Schema.of(Schema.LogicalType.DATE); - - Schema fieldSchema = schema.isNullable() ? schema.getNonNullable() : schema; - TimePartitioning.Type timePartitioningType = null; - - validateTimePartitioningColumnMethod.invoke(config, columnName, collector, fieldSchema, timePartitioningType); - // No error as null time timePartitioningType will default to DAY - Assert.assertEquals(0, collector.getValidationFailures().size()); - } - - @Test - public void testJobLabelWithDuplicateKeys() { - config.jobLabelKeyValue = "key1:value1,key2:value2,key1:value3"; - config.validate(collector); - Assert.assertEquals(1, collector.getValidationFailures().size()); - Assert.assertEquals("Duplicate job label key 'key1'.", - collector.getValidationFailures().get(0).getMessage()); - } - - @Test - public void testJobLabelWithDuplicateValues() { - config.jobLabelKeyValue = "key1:value1,key2:value2,key3:value1"; - config.validate(collector); - Assert.assertEquals(0, collector.getValidationFailures().size()); - } - - @Test - public void testJobLabelWithCapitalLetters() { - config.jobLabelKeyValue = "keY1:value1,key2:value2,key3:value1"; - config.validate(collector); - Assert.assertEquals(1, collector.getValidationFailures().size()); - Assert.assertEquals("Invalid job label key 'keY1'.", - collector.getValidationFailures().get(0).getMessage()); - } - - @Test - public void testJobLabelStartingWithCapitalLetters() { - config.jobLabelKeyValue = "Key1:value1,key2:value2,key3:value1"; - config.validate(collector); - Assert.assertEquals(1, collector.getValidationFailures().size()); - Assert.assertEquals("Invalid job label key 'Key1'.", - collector.getValidationFailures().get(0).getMessage()); - } - - @Test - public void testJobLabelWithInvalidCharacters() { - config.jobLabelKeyValue = "key1:value1,key2:value2,key3:value1@"; - config.validate(collector); - Assert.assertEquals(1, collector.getValidationFailures().size()); - Assert.assertEquals("Invalid job label value 'value1@'.", - collector.getValidationFailures().get(0).getMessage()); - } - - @Test - public void testJobLabelWithEmptyKey() { - config.jobLabelKeyValue = ":value1,key2:value2,key3:value1"; - config.validate(collector); - Assert.assertEquals(1, collector.getValidationFailures().size()); - Assert.assertEquals("Invalid job label key ''.", - 
collector.getValidationFailures().get(0).getMessage()); - } - - @Test - public void testJobLabelWithEmptyValue() { - config.jobLabelKeyValue = "key1:,key2:value2,key3:value1"; - config.validate(collector); - Assert.assertEquals(0, collector.getValidationFailures().size()); - } - - @Test - public void testJobLabelWithWrongFormat() { - config.jobLabelKeyValue = "key1=value1"; - config.validate(collector); - Assert.assertEquals(1, collector.getValidationFailures().size()); - Assert.assertEquals("Invalid job label key 'key1=value1'.", - collector.getValidationFailures().get(0).getMessage()); - } - - @Test - public void testJobLabelWithNull() { - config.jobLabelKeyValue = null; - config.validate(collector); - Assert.assertEquals(0, collector.getValidationFailures().size()); - } - - @Test - public void testJobLabelWithReservedKeys() { - config.jobLabelKeyValue = "job_source:value1,type:value2"; - config.validate(collector); - Assert.assertEquals(2, collector.getValidationFailures().size()); - Assert.assertEquals("Invalid job label key 'job_source'.", - collector.getValidationFailures().get(0).getMessage()); - } - - @Test - public void testJobLabelWith65Keys() { - StringBuilder sb = new StringBuilder(); - for (int i = 1; i <= 65; i++) { - String key = "key" + i; - String value = "value" + i; - sb.append(key).append(":").append(value).append(","); - } - // remove the last comma - sb.deleteCharAt(sb.length() - 1); - Assert.assertEquals(65, sb.toString().split(",").length); - config.jobLabelKeyValue = sb.toString(); - config.validate(collector); - Assert.assertEquals(1, collector.getValidationFailures().size()); - Assert.assertEquals("Number of job labels exceeds the limit.", - collector.getValidationFailures().get(0).getMessage()); - } - - @Test - public void testJobLabelWithKeyLength64() { - String key64 = "1234567890123456789012345678901234567890123456789012345678901234"; - config.jobLabelKeyValue = key64 + ":value1"; - config.validate(collector); - Assert.assertEquals(1, collector.getValidationFailures().size()); - Assert.assertEquals("Invalid job label key '" + key64 + "'.", - collector.getValidationFailures().get(0).getMessage()); - } - - @Test - public void testJobLabelWithValueLength64() { - String value64 = "1234567890123456789012345678901234567890123456789012345678901234"; - config.jobLabelKeyValue = "key1:" + value64; - config.validate(collector); - Assert.assertEquals(1, collector.getValidationFailures().size()); - Assert.assertEquals("Invalid job label value '" + value64 + "'.", - collector.getValidationFailures().get(0).getMessage()); - } - - @Test - public void testJobLabelWithKeyStartingWithNumber() { - config.jobLabelKeyValue = "1key:value1"; - config.validate(collector); - Assert.assertEquals(1, collector.getValidationFailures().size()); - Assert.assertEquals("Invalid job label key '1key'.", - collector.getValidationFailures().get(0).getMessage()); - } - - @Test - public void testJobLabelWithKeyStartingWithDash() { - config.jobLabelKeyValue = "-key:value1"; - config.validate(collector); - Assert.assertEquals(1, collector.getValidationFailures().size()); - Assert.assertEquals("Invalid job label key '-key'.", - collector.getValidationFailures().get(0).getMessage()); - } - - @Test - public void testJobLabelWithKeyStartingWithHyphen() { - config.jobLabelKeyValue = "_key:value1"; - config.validate(collector); - Assert.assertEquals(1, collector.getValidationFailures().size()); - Assert.assertEquals("Invalid job label key '_key'.", - collector.getValidationFailures().get(0).getMessage()); 
- } - - @Test - public void testJobLabelWithKeyWithChineseCharacter() { - config.jobLabelKeyValue = "中文:value1"; - config.validate(collector); - Assert.assertEquals(0, collector.getValidationFailures().size()); - } - - @Test - public void testJobLabelWithKeyWithJapaneseCharacter() { - config.jobLabelKeyValue = "日本語:value1"; - config.validate(collector); - Assert.assertEquals(0, collector.getValidationFailures().size()); - } - - @Test - public void testJobLabelWithValueStartingWithNumber() { - config.jobLabelKeyValue = "key:1value"; - config.validate(collector); - Assert.assertEquals(0, collector.getValidationFailures().size()); - } - - @Test - public void testJobLabelWithValueStartingWithDash() { - config.jobLabelKeyValue = "key:-value"; - config.validate(collector); - Assert.assertEquals(0, collector.getValidationFailures().size()); - } - - @Test - public void testJobLabelWithValueStartingWithCaptialLetter() { - config.jobLabelKeyValue = "key:Value"; - config.validate(collector); - Assert.assertEquals(1, collector.getValidationFailures().size()); - Assert.assertEquals("Invalid job label value 'Value'.", - collector.getValidationFailures().get(0).getMessage()); + Schema schema = Schema.recordOf("test", Schema.Field.of(sb.toString(), Schema.of(Schema.Type.STRING))); + config.validate(schema, schema, collector, arguments); + Assert.assertEquals(0, collector.getValidationFailures().size()); + } + + @Test + public void testValidateColumnNameWith301Length() { + String columnName = "a"; + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < 301; i++) { + sb.append(columnName); } + Schema schema = Schema.recordOf("test", Schema.Field.of(sb.toString(), Schema.of(Schema.Type.STRING))); + config.validate(schema, schema, collector, arguments); + Assert.assertEquals(1, collector.getValidationFailures().size()); + Assert.assertEquals("Output field '" + sb + "' exceeds the maximum length of 300 characters.", + collector.getValidationFailures().get(0).getMessage()); + } } diff --git a/src/test/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkUtilsTest.java b/src/test/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkUtilsTest.java index c181dd65ba..9cb0490f31 100644 --- a/src/test/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkUtilsTest.java +++ b/src/test/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkUtilsTest.java @@ -23,8 +23,8 @@ import com.google.cloud.bigquery.Field; import com.google.cloud.bigquery.LegacySQLTypeName; import com.google.cloud.bigquery.TableId; -import com.google.cloud.hadoop.io.bigquery.output.BigQueryTableFieldSchema; import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryTableFieldSchema; import io.cdap.plugin.gcp.bigquery.util.BigQueryTypeSize; import org.apache.hadoop.conf.Configuration; import org.junit.Assert; diff --git a/src/test/java/io/cdap/plugin/gcp/dataplex/sink/config/DataplexBatchSinkTest.java b/src/test/java/io/cdap/plugin/gcp/dataplex/sink/config/DataplexBatchSinkTest.java index cb5ba188cb..1ee1c4d04e 100644 --- a/src/test/java/io/cdap/plugin/gcp/dataplex/sink/config/DataplexBatchSinkTest.java +++ b/src/test/java/io/cdap/plugin/gcp/dataplex/sink/config/DataplexBatchSinkTest.java @@ -19,7 +19,6 @@ import com.google.cloud.dataplex.v1.Schema; import com.google.cloud.dataplex.v1.Zone; import com.google.cloud.dataplex.v1.ZoneName; -import com.google.cloud.hadoop.io.bigquery.output.BigQueryTableFieldSchema; import com.google.cloud.kms.v1.CryptoKeyName; import io.cdap.cdap.api.data.format.StructuredRecord; import 
io.cdap.cdap.api.dataset.lib.KeyValue; @@ -32,6 +31,7 @@ import io.cdap.cdap.etl.api.batch.BatchSinkContext; import io.cdap.cdap.etl.mock.validation.MockFailureCollector; import io.cdap.plugin.gcp.bigquery.sink.BigQuerySinkUtils; +import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryTableFieldSchema; import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil; import io.cdap.plugin.gcp.common.CmekUtils; import io.cdap.plugin.gcp.common.GCPConnectorConfig; @@ -39,7 +39,6 @@ import io.cdap.plugin.gcp.dataplex.common.util.DataplexConstants; import io.cdap.plugin.gcp.dataplex.common.util.DataplexUtil; import io.cdap.plugin.gcp.dataplex.sink.DataplexBatchSink; - import org.apache.hadoop.conf.Configuration; import org.junit.Assert; import org.junit.Test;
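
Note on the updated TABLE_PATTERN: it accepts any name composed of Unicode letters (\p{L}), marks (\p{M}), numbers (\p{N}), connector punctuation such as underscores (\p{Pc}), dashes (\p{Pd}), and spaces (\p{Zs}), which is exactly what the new column-name tests above exercise. The sketch below is illustrative only and is not part of the patch: it assumes column-name validation reduces to a full-string match against BigQueryUtil.TABLE_PATTERN (the class name FlexibleColumnNameCheck is made up for this example), and it does not cover the separate 300-character length limit checked in testValidateColumnNameWith301Length.

import java.util.regex.Pattern;

public class FlexibleColumnNameCheck {
  // Same character classes as the TABLE_PATTERN value updated in BigQueryUtil by this patch.
  private static final Pattern FLEXIBLE_NAME =
      Pattern.compile("[\\p{L}\\p{M}\\p{N}\\p{Pc}\\p{Pd}\\p{Zs}]+");

  public static void main(String[] args) {
    // Names drawn from the BigQuerySinkConfigTest cases above: the first group is
    // expected to be accepted, the second group rejected.
    String[] accepted = {"test", "测试", "テスト", "test test", "test-test", "test_test", "Test", "1234"};
    String[] rejected = {"test@", "test!", "test😀"};
    for (String name : accepted) {
      System.out.println(name + " -> " + FLEXIBLE_NAME.matcher(name).matches()); // prints true
    }
    for (String name : rejected) {
      System.out.println(name + " -> " + FLEXIBLE_NAME.matcher(name).matches()); // prints false
    }
  }
}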