From 8bd30c549a12a35c873fdcfa3786a8328a5cf1e3 Mon Sep 17 00:00:00 2001 From: psainics Date: Fri, 26 Jan 2024 22:25:23 +0530 Subject: [PATCH] Added TIME_PARTITIONING_TYPES_UI_MAP to correctly map ui field values --- .../gcp/bigquery/sink/BigQuerySinkConfig.java | 37 ++++++++++++++++--- .../bigquery/sink/BigQuerySinkConfigTest.java | 30 +++++++++++++++ 2 files changed, 62 insertions(+), 5 deletions(-) diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfig.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfig.java index 994ba5a52b..78bda8f77d 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfig.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfig.java @@ -24,6 +24,7 @@ import com.google.cloud.kms.v1.CryptoKeyName; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Macro; @@ -74,7 +75,6 @@ public final class BigQuerySinkConfig extends AbstractBigQuerySinkConfig { public static final String NAME_RANGE_START = "rangeStart"; public static final String NAME_RANGE_END = "rangeEnd"; public static final String NAME_RANGE_INTERVAL = "rangeInterval"; - public static final int MAX_NUMBER_OF_COLUMNS = 4; // As defined in https://cloud.google.com/bigquery/docs/schemas#column_names private static final int MAX_LENGTH_OF_COLUMN_NAME = 300; @@ -200,13 +200,15 @@ private BigQuerySinkConfig(@Nullable String referenceName, @Nullable String proj @Nullable String serviceAccountType, @Nullable String serviceFilePath, @Nullable String serviceAccountJson, @Nullable String dataset, @Nullable String table, @Nullable String location, - @Nullable String cmekKey, @Nullable String bucket, @Nullable String jobLabelKeyValue) { + @Nullable String cmekKey, @Nullable String bucket, @Nullable String jobLabelKeyValue, + @Nullable String timePartitioningType) { super(new BigQueryConnectorConfig(project, project, serviceAccountType, serviceFilePath, serviceAccountJson), dataset, cmekKey, bucket); this.referenceName = referenceName; this.table = table; this.location = location; this.jobLabelKeyValue = jobLabelKeyValue; + this.timePartitioningType = timePartitioningType; } public String getTable() { @@ -282,8 +284,26 @@ public PartitionType getPartitioningType() { } public TimePartitioning.Type getTimePartitioningType() { - return Strings.isNullOrEmpty(timePartitioningType) ? TimePartitioning.Type.DAY : - TimePartitioning.Type.valueOf(timePartitioningType.toUpperCase()); + // Default to DAY if timePartitioningType is not set + if (Strings.isNullOrEmpty(timePartitioningType)) { + return TimePartitioning.Type.DAY; + } + switch (timePartitioningType.toUpperCase()) { + case "DAY": + case "DAILY": + return TimePartitioning.Type.DAY; + case "HOUR": + case "HOURLY": + return TimePartitioning.Type.HOUR; + case "MONTH": + case "MONTHLY": + return TimePartitioning.Type.MONTH; + case "YEAR": + case "YEARLY": + return TimePartitioning.Type.YEAR; + default: + throw new IllegalArgumentException("Invalid time partitioning type: " + timePartitioningType); + } } /** @@ -710,6 +730,7 @@ public static class Builder { private String location; private String bucket; private String jobLabelKeyValue; + private String timePartitioningType; public BigQuerySinkConfig.Builder setReferenceName(@Nullable String referenceName) { this.referenceName = referenceName; @@ -765,6 +786,11 @@ public BigQuerySinkConfig.Builder setJobLabelKeyValue(@Nullable String jobLabelK return this; } + public BigQuerySinkConfig.Builder setTimePartitioningType(@Nullable String timePartitioningType) { + this.timePartitioningType = timePartitioningType; + return this; + } + public BigQuerySinkConfig build() { return new BigQuerySinkConfig( referenceName, @@ -777,7 +803,8 @@ public BigQuerySinkConfig build() { location, cmekKey, bucket, - jobLabelKeyValue + jobLabelKeyValue, + timePartitioningType ); } diff --git a/src/test/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfigTest.java b/src/test/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfigTest.java index ebaa553df3..c22562a33a 100644 --- a/src/test/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfigTest.java +++ b/src/test/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfigTest.java @@ -272,4 +272,34 @@ public void testValidateTableNameWithInvalidTableName() { Assert.assertEquals(invalidTableNameErrorMessage, collector.getValidationFailures().get(0).getMessage()); } } + + @Test + public void testGetTimePartitioningType() { + Map expectedTimePartitioningType = new HashMap<>(); + expectedTimePartitioningType.put(null, TimePartitioning.Type.DAY); + expectedTimePartitioningType.put("", TimePartitioning.Type.DAY); + expectedTimePartitioningType.put("Day", TimePartitioning.Type.DAY); + expectedTimePartitioningType.put("Daily", TimePartitioning.Type.DAY); + expectedTimePartitioningType.put("Hour", TimePartitioning.Type.HOUR); + expectedTimePartitioningType.put("Hourly", TimePartitioning.Type.HOUR); + expectedTimePartitioningType.put("Month", TimePartitioning.Type.MONTH); + expectedTimePartitioningType.put("Monthly", TimePartitioning.Type.MONTH); + expectedTimePartitioningType.put("Year", TimePartitioning.Type.YEAR); + expectedTimePartitioningType.put("Yearly", TimePartitioning.Type.YEAR); + + // Valid types + for (Map.Entry entry : expectedTimePartitioningType.entrySet()) { + config = configBuilder.setTimePartitioningType(entry.getKey()).build(); + Assert.assertEquals(entry.getValue(), config.getTimePartitioningType()); + } + + // Invalid value + config = configBuilder.setTimePartitioningType("Din").build(); + try { + config.getTimePartitioningType(); + Assert.fail("Expected IllegalArgumentException was not thrown."); + } catch (IllegalArgumentException e) { + Assert.assertEquals("Invalid time partitioning type: Din", e.getMessage()); + } + } }