diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfig.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfig.java index c5ffcc6e1..1ecfc01d8 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfig.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfig.java @@ -526,23 +526,21 @@ private void validateTimePartitioningColumn(String columnName, FailureCollector boolean isTimestamp = logicalType == LogicalType.TIMESTAMP_MICROS || logicalType == LogicalType.TIMESTAMP_MILLIS; boolean isDate = logicalType == LogicalType.DATE; - boolean isTimestampOrDate = isTimestamp || isDate; - - // If timePartitioningType is HOUR, then logicalType cannot be DATE Only TIMESTAMP_MICROS and TIMESTAMP_MILLIS - if (timePartitioningType == TimePartitioning.Type.HOUR && !isTimestamp) { - collector.addFailure( - String.format("Partition column '%s' is of invalid type '%s'.", + boolean isDateTime = logicalType == LogicalType.DATETIME; + boolean isTimestampOrDateOrDateTime = isTimestamp || isDate || isDateTime; + boolean isTimestampOrDateTime = isTimestamp || isDateTime; + // TimePartitioningType HOUR is supported by TIMESTAMP_MICROS, TIMESTAMP_MILLIS, DATETIME + if (timePartitioningType == TimePartitioning.Type.HOUR && !isTimestampOrDateTime) { + collector.addFailure(String.format("Partition column '%s' is of invalid type '%s'.", columnName, fieldSchema.getDisplayName()), - "Partition column must be a timestamp.").withConfigProperty(NAME_PARTITION_BY_FIELD) - .withOutputSchemaField(columnName).withInputSchemaField(columnName); - - // For any other timePartitioningType (DAY, MONTH, YEAR) logicalType can be DATE, TIMESTAMP_MICROS, TIMESTAMP_MILLIS - } else if (!isTimestampOrDate) { - collector.addFailure( - String.format("Partition column '%s' is of invalid type '%s'.", + "Partition column must be of type TIMESTAMP or DATETIME") + .withConfigProperty(NAME_PARTITION_BY_FIELD).withOutputSchemaField(columnName).withInputSchemaField(columnName); + // TimePartitioningType (DAY, MONTH, YEAR) are supported by TIMESTAMP_MICROS, TIMESTAMP_MILLIS, DATE, DATETIME + } else if (!isTimestampOrDateOrDateTime) { + collector.addFailure(String.format("Partition column '%s' is of invalid type '%s'.", columnName, fieldSchema.getDisplayName()), - "Partition column must be a date or timestamp.").withConfigProperty(NAME_PARTITION_BY_FIELD) - .withOutputSchemaField(columnName).withInputSchemaField(columnName); + "Partition column must be of type TIMESTAMP, DATE or DATETIME") + .withConfigProperty(NAME_PARTITION_BY_FIELD).withOutputSchemaField(columnName).withInputSchemaField(columnName); } } diff --git a/src/test/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfigTest.java b/src/test/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfigTest.java index c22562a33..73ebef8d5 100644 --- a/src/test/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfigTest.java +++ b/src/test/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfigTest.java @@ -129,6 +129,36 @@ public void testValidateTimePartitioningColumnWithNullAndDate() throws Assert.assertEquals(0, collector.getValidationFailures().size()); } + @Test + public void testValidateTimePartitioningColumnWithMonthAndDateTime() throws + InvocationTargetException, IllegalAccessException { + + String columnName = "partitionFrom"; + Schema schema = Schema.of(Schema.LogicalType.DATETIME); + + Schema fieldSchema = schema.isNullable() ? schema.getNonNullable() : schema; + TimePartitioning.Type timePartitioningType = TimePartitioning.Type.MONTH; + + validateTimePartitioningColumnMethod.invoke(config, columnName, collector, fieldSchema, timePartitioningType); + // No error as null time timePartitioningType will default to DAY + Assert.assertEquals(0, collector.getValidationFailures().size()); + } + + @Test + public void testValidateTimePartitioningColumnWithHourAndDateTime() throws + InvocationTargetException, IllegalAccessException { + + String columnName = "partitionFrom"; + Schema schema = Schema.of(Schema.LogicalType.DATETIME); + + Schema fieldSchema = schema.isNullable() ? schema.getNonNullable() : schema; + TimePartitioning.Type timePartitioningType = TimePartitioning.Type.HOUR; + + validateTimePartitioningColumnMethod.invoke(config, columnName, collector, fieldSchema, timePartitioningType); + // No error as null time timePartitioningType will default to DAY + Assert.assertEquals(0, collector.getValidationFailures().size()); + } + @Test public void testValidateColumnNameWithValidColumnName() { String columnName = "test";