diff --git a/docs/BigQueryArgumentSetter-action.md b/docs/BigQueryArgumentSetter-action.md index a3fb04f4d2..2348fd3193 100644 --- a/docs/BigQueryArgumentSetter-action.md +++ b/docs/BigQueryArgumentSetter-action.md @@ -23,8 +23,6 @@ Properties ---------- - **Reference Name:** Name used to uniquely identify this source for lineage, annotating metadata, etc. - **Project ID**: Google Cloud Project ID, which uniquely identifies a project. It can be found on the Dashboard in the Google Cloud Platform Console. This is the project that the BigQuery job will run in. If a temporary bucket needs to be created, the service account diff --git a/docs/BigQueryTable-batchsink.md b/docs/BigQueryTable-batchsink.md index 62eb9d5ee3..9c977be503 100644 --- a/docs/BigQueryTable-batchsink.md +++ b/docs/BigQueryTable-batchsink.md @@ -110,6 +110,17 @@ is ignored if the table already exists. **Time Partitioning Type**: Specifies the time partitioning type. Can either be Daily or Hourly or Monthly or Yearly. Default is Daily. Ignored when table already exists
+> The table below shows the compatibility of different time schema types with various time partitioning types in BigQuery.
+
+| Schema Type / Partition Type | Hourly | Daily | Monthly | Yearly |
+|-------------------------| ------- | ------- | ------- | ------- |
+| TIMESTAMP_MILLIS | ✓ | ✓ | ✓ | ✓ |
+| TIMESTAMP_MICROS | ✓ | ✓ | ✓ | ✓ |
+| DATETIME | ✓ | ✓ | ✓ | ✓ |
+| DATE | ✗ | ✓ | ✓ | ✓ |
+| TIME_MILLIS | ✗ | ✗ | ✗ | ✗ |
+| TIME_MICROS | ✗ | ✗ | ✗ | ✗ |
+
**Range Start**: For integer partitioning, specifies the start of the range. Only used when table doesn’t exist already, and partitioning type is set to Integer. * The start value is inclusive. diff --git a/docs/GCSArgumentSetter-action.md b/docs/GCSArgumentSetter-action.md index e3422bf22c..c2290580e6 100644 --- a/docs/GCSArgumentSetter-action.md +++ b/docs/GCSArgumentSetter-action.md @@ -37,8 +37,6 @@ must be readable by all users running the job. Properties ---------- -**Reference Name:** Name used to uniquely identify this source for lineage, annotating metadata, etc. - **Project ID**: Google Cloud Project ID, which uniquely identifies a project. It can be found on the Dashboard in the Google Cloud Platform Console. This is the project that the BigQuery job will run in.
If a temporary bucket needs to be created, the service account diff --git a/pom.xml b/pom.xml index cd27739c64..dda0ea93e9 100644 --- a/pom.xml +++ b/pom.xml @@ -73,7 +73,7 @@ hadoop2-1.2.0 1.4 6.9.1 - 2.11.1 + 2.12.1 3.2.6 0.7.1 hadoop2-2.2.9 @@ -1245,7 +1245,7 @@ io.cdap.tests.e2e cdap-e2e-framework - 0.3.1 + 0.3.2 test diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfig.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfig.java index 2443d1bd74..63d20aa0a7 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfig.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfig.java @@ -506,23 +506,21 @@ private void validateTimePartitioningColumn(String columnName, FailureCollector boolean isTimestamp = logicalType == LogicalType.TIMESTAMP_MICROS || logicalType == LogicalType.TIMESTAMP_MILLIS; boolean isDate = logicalType == LogicalType.DATE; - boolean isTimestampOrDate = isTimestamp || isDate; - - // If timePartitioningType is HOUR, then logicalType cannot be DATE Only TIMESTAMP_MICROS and TIMESTAMP_MILLIS - if (timePartitioningType == TimePartitioning.Type.HOUR && !isTimestamp) { - collector.addFailure( - String.format("Partition column '%s' is of invalid type '%s'.", + boolean isDateTime = logicalType == LogicalType.DATETIME; + boolean isTimestampOrDateOrDateTime = isTimestamp || isDate || isDateTime; + boolean isTimestampOrDateTime = isTimestamp || isDateTime; + // TimePartitioningType HOUR is supported by TIMESTAMP_MICROS, TIMESTAMP_MILLIS, DATETIME + if (timePartitioningType == TimePartitioning.Type.HOUR && !isTimestampOrDateTime) { + collector.addFailure(String.format("Partition column '%s' is of invalid type '%s'.", columnName, fieldSchema.getDisplayName()), - "Partition column must be a timestamp.").withConfigProperty(NAME_PARTITION_BY_FIELD) - .withOutputSchemaField(columnName).withInputSchemaField(columnName); - - // For any other timePartitioningType (DAY, MONTH, YEAR) logicalType can be DATE, TIMESTAMP_MICROS, TIMESTAMP_MILLIS - } else if (!isTimestampOrDate) { - collector.addFailure( - String.format("Partition column '%s' is of invalid type '%s'.", + "Partition column must be of type TIMESTAMP or DATETIME") + .withConfigProperty(NAME_PARTITION_BY_FIELD).withOutputSchemaField(columnName).withInputSchemaField(columnName); + // TimePartitioningType (DAY, MONTH, YEAR) are supported by TIMESTAMP_MICROS, TIMESTAMP_MILLIS, DATE, DATETIME + } else if (!isTimestampOrDateOrDateTime) { + collector.addFailure(String.format("Partition column '%s' is of invalid type '%s'.", columnName, fieldSchema.getDisplayName()), - "Partition column must be a date or timestamp.").withConfigProperty(NAME_PARTITION_BY_FIELD) - .withOutputSchemaField(columnName).withInputSchemaField(columnName); + "Partition column must be of type TIMESTAMP, DATE or DATETIME") + .withConfigProperty(NAME_PARTITION_BY_FIELD).withOutputSchemaField(columnName).withInputSchemaField(columnName); } } diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java b/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java index 05939df878..0f3fc80148 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java @@ -438,15 +438,28 @@ public static ValidationFailure validateFieldSchemaMatches(Field bqField, Schema if (bqField.getMode() == Field.Mode.REPEATED) { fieldSchema = fieldSchema.getComponentSchema(); type = fieldSchema.getType(); + 
logicalType = fieldSchema.getLogicalType(); } } + String[] incompatibleFieldErrorMessage = { + String.format("Field '%s' of type '%s' is incompatible with column '%s' of type '%s' in BigQuery table '%s.%s'.", + field.getName(), fieldSchema.getDisplayName(), bqField.getName(), + BQ_TYPE_MAP.get(bqField.getType()), dataset, table) , + String.format("It must be of type '%s'.", BQ_TYPE_MAP.get(bqField.getType())) + }; + if (logicalType != null) { + if (LOGICAL_TYPE_MAP.get(logicalType) != null && !LOGICAL_TYPE_MAP.get(logicalType).contains(bqField.getType())) { + return collector.addFailure(incompatibleFieldErrorMessage[0], incompatibleFieldErrorMessage[1]); + } + + // Return once logical types are validated. This is because logical types are represented as primitive types + // internally. + return null; + } + if (TYPE_MAP.get(type) != null && !TYPE_MAP.get(type).contains(bqField.getType())) { - return collector.addFailure( - String.format("Field '%s' of type '%s' is incompatible with column '%s' of type '%s' " + - "in BigQuery table '%s.%s'.", field.getName(), fieldSchema.getDisplayName(), bqField.getName(), - BQ_TYPE_MAP.get(bqField.getType()), dataset, table), - String.format("It must be of type '%s'.", BQ_TYPE_MAP.get(bqField.getType()))); + return collector.addFailure(incompatibleFieldErrorMessage[0], incompatibleFieldErrorMessage[1]); } return null; } diff --git a/src/main/java/io/cdap/plugin/gcp/common/GCSEmptyInputFormat.java b/src/main/java/io/cdap/plugin/gcp/common/GCSEmptyInputFormat.java new file mode 100644 index 0000000000..b6ee12e456 --- /dev/null +++ b/src/main/java/io/cdap/plugin/gcp/common/GCSEmptyInputFormat.java @@ -0,0 +1,30 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + * + */ + +package io.cdap.plugin.gcp.common; + +import io.cdap.plugin.format.input.AbstractEmptyInputFormat; + + +/** + * An InputFormat that returns no data. 
+ * @param <K> the type of key + * @param <V> the type of value + */ +public class GCSEmptyInputFormat<K, V> extends AbstractEmptyInputFormat<K, V> { + // no-op +} diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/source/GCSSource.java b/src/main/java/io/cdap/plugin/gcp/gcs/source/GCSSource.java index 1d52fc90fb..7bfec4dd81 100644 --- a/src/main/java/io/cdap/plugin/gcp/gcs/source/GCSSource.java +++ b/src/main/java/io/cdap/plugin/gcp/gcs/source/GCSSource.java @@ -43,6 +43,7 @@ import io.cdap.plugin.format.plugin.FileSourceProperties; import io.cdap.plugin.gcp.common.GCPConnectorConfig; import io.cdap.plugin.gcp.common.GCPUtils; +import io.cdap.plugin.gcp.common.GCSEmptyInputFormat; import io.cdap.plugin.gcp.crypto.EncryptedFileSystem; import io.cdap.plugin.gcp.gcs.GCSPath; import io.cdap.plugin.gcp.gcs.connector.GCSConnector; @@ -77,6 +78,11 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) { super.configurePipeline(pipelineConfigurer); } + @Override + protected String getEmptyInputFormatClassName() { + return GCSEmptyInputFormat.class.getName(); + } + @Override public void prepareRun(BatchSourceContext context) throws Exception { // Get location of the source for lineage @@ -268,11 +274,6 @@ public Long getMinSplitSize() { return minSplitSize; } - @Override - public boolean shouldAllowEmptyInput() { - return false; - } - public boolean isCopyHeader() { return shouldCopyHeader(); } diff --git a/src/test/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfigTest.java b/src/test/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfigTest.java index ebaa553df3..6666466796 100644 --- a/src/test/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfigTest.java +++ b/src/test/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfigTest.java @@ -129,6 +129,34 @@ public void testValidateTimePartitioningColumnWithNullAndDate() throws Assert.assertEquals(0, collector.getValidationFailures().size()); } + @Test + public void testValidateTimePartitioningColumnWithMonthAndDateTime() throws + InvocationTargetException, IllegalAccessException { + + String columnName = "partitionFrom"; + Schema schema = Schema.of(Schema.LogicalType.DATETIME); + + Schema fieldSchema = schema.isNullable() ? schema.getNonNullable() : schema; + TimePartitioning.Type timePartitioningType = TimePartitioning.Type.MONTH; + + validateTimePartitioningColumnMethod.invoke(config, columnName, collector, fieldSchema, timePartitioningType); + Assert.assertEquals(0, collector.getValidationFailures().size()); + } + + @Test + public void testValidateTimePartitioningColumnWithHourAndDateTime() throws + InvocationTargetException, IllegalAccessException { + + String columnName = "partitionFrom"; + Schema schema = Schema.of(Schema.LogicalType.DATETIME); + + Schema fieldSchema = schema.isNullable() ?
schema.getNonNullable() : schema; + TimePartitioning.Type timePartitioningType = TimePartitioning.Type.HOUR; + + validateTimePartitioningColumnMethod.invoke(config, columnName, collector, fieldSchema, timePartitioningType); + Assert.assertEquals(0, collector.getValidationFailures().size()); + } + @Test public void testValidateColumnNameWithValidColumnName() { String columnName = "test"; diff --git a/src/test/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtilTest.java b/src/test/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtilTest.java index a09048e2d9..b199caabd6 100644 --- a/src/test/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtilTest.java +++ b/src/test/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtilTest.java @@ -23,6 +23,7 @@ import io.cdap.cdap.etl.api.FailureCollector; import io.cdap.cdap.etl.api.validation.ValidationFailure; import io.cdap.cdap.etl.mock.validation.MockFailureCollector; +import io.cdap.plugin.gcp.bigquery.source.BigQuerySourceConfig; import io.cdap.plugin.gcp.bigquery.util.BigQueryTypeSize.BigNumeric; import io.cdap.plugin.gcp.bigquery.util.BigQueryTypeSize.Numeric; import io.cdap.plugin.gcp.common.GCPUtils; @@ -478,4 +479,32 @@ public void testConvertFieldTypeJsonToString() { Schema result = BigQueryUtil.convertFieldType(field, null, null); Assert.assertEquals(expectedSchema, result); } + + @Test + public void testValidateFieldSchemaMatchesDate() { + MockFailureCollector collector = new MockFailureCollector(); + Field bigQueryField = Field.newBuilder("testFieldRepeatedDate", StandardSQLTypeName.DATE) + .setMode(Field.Mode.REPEATED).build(); + Schema.Field schemaField = Schema.Field.of("testFieldRepeatedDate", + Schema.nullableOf(Schema.arrayOf(Schema.of(Schema.LogicalType.DATE)))); + ValidationFailure failure = BigQueryUtil.validateFieldSchemaMatches(bigQueryField, schemaField, "dataset", + "table", BigQuerySourceConfig.SUPPORTED_TYPES, collector); + Assert.assertNull(failure); + Assert.assertEquals(0, collector.getValidationFailures().size()); + } + + @Test + public void testValidateFieldSchemaNotMatchesDate() { + MockFailureCollector collector = new MockFailureCollector(); + Field bigQueryField = Field.newBuilder("testFieldRepeatedDate", StandardSQLTypeName.DATE) + .setMode(Field.Mode.REPEATED).build(); + Schema.Field schemaField = Schema.Field.of("testFieldRepeatedDate", + Schema.nullableOf(Schema.arrayOf(Schema.of(Schema.Type.STRING)))); + ValidationFailure failure = BigQueryUtil.validateFieldSchemaMatches(bigQueryField, schemaField, "dataset", + "table", BigQuerySourceConfig.SUPPORTED_TYPES, collector); + Assert.assertNotNull(failure); + Assert.assertEquals(1, collector.getValidationFailures().size()); + Assert.assertEquals("Field 'testFieldRepeatedDate' of type 'string' is incompatible with" + + " column 'testFieldRepeatedDate' of type 'date' in BigQuery table 'dataset.table'.", failure.getMessage()); + } } diff --git a/widgets/BigQueryArgumentSetter-action.json b/widgets/BigQueryArgumentSetter-action.json index bf20dbde42..bf53ca9752 100644 --- a/widgets/BigQueryArgumentSetter-action.json +++ b/widgets/BigQueryArgumentSetter-action.json @@ -7,14 +7,6 @@ { "label": "Basic", "properties": [ - { - "widget-type": "textbox", - "label": "Reference Name", - "name": "referenceName", - "widget-attributes" : { - "placeholder": "Name used to identify this source for lineage" - } - }, { "widget-type": "connection-browser", "widget-category": "plugin", diff --git a/widgets/GCSArgumentSetter-action.json b/widgets/GCSArgumentSetter-action.json index 
23a79cc1d6..4bef45d781 100644 --- a/widgets/GCSArgumentSetter-action.json +++ b/widgets/GCSArgumentSetter-action.json @@ -7,14 +7,6 @@ { "label": "Basic", "properties": [ - { - "widget-type": "textbox", - "label": "Reference Name", - "name": "referenceName", - "widget-attributes": { - "placeholder": "Name used to identify this source for lineage" - } - }, { "widget-type": "textbox", "label": "Project ID",
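
Reviewer note: the following is a minimal, standalone Java sketch (not part of the patch; the class and method names are illustrative) of the compatibility matrix that the new documentation table and the reworked `validateTimePartitioningColumn` encode, using the `TimePartitioning.Type` and `Schema.LogicalType` enums the patch itself references.

```java
import com.google.cloud.bigquery.TimePartitioning;
import io.cdap.cdap.api.data.schema.Schema;
import java.util.EnumSet;

// Illustrative only: HOUR partitioning accepts TIMESTAMP_* and DATETIME columns,
// while DAY/MONTH/YEAR additionally accept DATE. TIME_* columns never qualify.
final class PartitionCompatibilitySketch {

  private static final EnumSet<Schema.LogicalType> HOUR_COMPATIBLE = EnumSet.of(
    Schema.LogicalType.TIMESTAMP_MILLIS, Schema.LogicalType.TIMESTAMP_MICROS, Schema.LogicalType.DATETIME);

  private static final EnumSet<Schema.LogicalType> DAY_MONTH_YEAR_COMPATIBLE = EnumSet.of(
    Schema.LogicalType.TIMESTAMP_MILLIS, Schema.LogicalType.TIMESTAMP_MICROS,
    Schema.LogicalType.DATETIME, Schema.LogicalType.DATE);

  static boolean isCompatible(TimePartitioning.Type partitionType, Schema.LogicalType columnType) {
    return partitionType == TimePartitioning.Type.HOUR
      ? HOUR_COMPATIBLE.contains(columnType)
      : DAY_MONTH_YEAR_COMPATIBLE.contains(columnType);
  }

  public static void main(String[] args) {
    // Mirrors the doc table: DATE works for MONTH but not HOUR; TIME_* never works.
    System.out.println(isCompatible(TimePartitioning.Type.HOUR, Schema.LogicalType.DATE));        // false
    System.out.println(isCompatible(TimePartitioning.Type.MONTH, Schema.LogicalType.DATE));       // true
    System.out.println(isCompatible(TimePartitioning.Type.HOUR, Schema.LogicalType.DATETIME));    // true
    System.out.println(isCompatible(TimePartitioning.Type.YEAR, Schema.LogicalType.TIME_MICROS)); // false
  }
}
```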
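The early return added to `validateFieldSchemaMatches` rests on the point made in its comment: CDAP logical types ride on top of primitive types, so checking `getType()` first would compare the backing primitive rather than the logical type against the BigQuery column. A small illustrative snippet, assuming the standard CDAP `Schema` API (not part of the patch):

```java
import io.cdap.cdap.api.data.schema.Schema;

// Illustrative only: shows why the logical-type check must run (and return) before the
// primitive-type check in validateFieldSchemaMatches.
public class LogicalTypeBackingSketch {
  public static void main(String[] args) {
    Schema date = Schema.of(Schema.LogicalType.DATE);
    Schema tsMicros = Schema.of(Schema.LogicalType.TIMESTAMP_MICROS);

    System.out.println(date.getLogicalType());     // DATE
    System.out.println(date.getType());            // the backing primitive (INT)
    System.out.println(tsMicros.getLogicalType()); // TIMESTAMP_MICROS
    System.out.println(tsMicros.getType());        // the backing primitive (LONG)
  }
}
```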
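The `GCSSource` change replaces the `shouldAllowEmptyInput()` override with `getEmptyInputFormatClassName()` returning the new `GCSEmptyInputFormat`. The sketch below approximates, in plain Hadoop terms, what such an "empty" InputFormat provides (it is an illustration, not the actual `AbstractEmptyInputFormat` implementation): no splits and a reader that yields no records, so a run over a path with no matching input completes cleanly.

```java
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

// Illustrative only: an InputFormat that produces no splits and no records.
public class EmptyInputFormatSketch<K, V> extends InputFormat<K, V> {

  @Override
  public List<InputSplit> getSplits(JobContext context) {
    // No input to read, so no splits are produced.
    return Collections.emptyList();
  }

  @Override
  public RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context) {
    return new RecordReader<K, V>() {
      @Override
      public void initialize(InputSplit split, TaskAttemptContext context) { }

      @Override
      public boolean nextKeyValue() {
        return false; // never yields a record
      }

      @Override
      public K getCurrentKey() {
        return null;
      }

      @Override
      public V getCurrentValue() {
        return null;
      }

      @Override
      public float getProgress() {
        return 1.0f;
      }

      @Override
      public void close() { }
    };
  }
}
```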