Skip to content

Commit

Permalink
Merge pull request data-integrations#1361 from cloudsufi/patch/time-p…
Browse files Browse the repository at this point in the history
…artitioning-type-ui

[PLUGIN-1739] Added TIME_PARTITIONING_TYPES_UI_MAP to correctly map ui field values
  • Loading branch information
vikasrathee-cs authored Mar 28, 2024
2 parents 0555650 + 8bd30c5 commit 3ecc0ed
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.cloud.kms.v1.CryptoKeyName;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
Expand Down Expand Up @@ -74,7 +75,6 @@ public final class BigQuerySinkConfig extends AbstractBigQuerySinkConfig {
public static final String NAME_RANGE_START = "rangeStart";
public static final String NAME_RANGE_END = "rangeEnd";
public static final String NAME_RANGE_INTERVAL = "rangeInterval";

public static final int MAX_NUMBER_OF_COLUMNS = 4;
// As defined in https://cloud.google.com/bigquery/docs/schemas#column_names
private static final int MAX_LENGTH_OF_COLUMN_NAME = 300;
Expand Down Expand Up @@ -200,13 +200,15 @@ private BigQuerySinkConfig(@Nullable String referenceName, @Nullable String proj
@Nullable String serviceAccountType, @Nullable String serviceFilePath,
@Nullable String serviceAccountJson,
@Nullable String dataset, @Nullable String table, @Nullable String location,
@Nullable String cmekKey, @Nullable String bucket, @Nullable String jobLabelKeyValue) {
@Nullable String cmekKey, @Nullable String bucket, @Nullable String jobLabelKeyValue,
@Nullable String timePartitioningType) {
super(new BigQueryConnectorConfig(project, project, serviceAccountType,
serviceFilePath, serviceAccountJson), dataset, cmekKey, bucket);
this.referenceName = referenceName;
this.table = table;
this.location = location;
this.jobLabelKeyValue = jobLabelKeyValue;
this.timePartitioningType = timePartitioningType;
}

public String getTable() {
Expand Down Expand Up @@ -282,8 +284,26 @@ public PartitionType getPartitioningType() {
}

public TimePartitioning.Type getTimePartitioningType() {
return Strings.isNullOrEmpty(timePartitioningType) ? TimePartitioning.Type.DAY :
TimePartitioning.Type.valueOf(timePartitioningType.toUpperCase());
// Default to DAY if timePartitioningType is not set
if (Strings.isNullOrEmpty(timePartitioningType)) {
return TimePartitioning.Type.DAY;
}
switch (timePartitioningType.toUpperCase()) {
case "DAY":
case "DAILY":
return TimePartitioning.Type.DAY;
case "HOUR":
case "HOURLY":
return TimePartitioning.Type.HOUR;
case "MONTH":
case "MONTHLY":
return TimePartitioning.Type.MONTH;
case "YEAR":
case "YEARLY":
return TimePartitioning.Type.YEAR;
default:
throw new IllegalArgumentException("Invalid time partitioning type: " + timePartitioningType);
}
}

/**
Expand Down Expand Up @@ -710,6 +730,7 @@ public static class Builder {
private String location;
private String bucket;
private String jobLabelKeyValue;
private String timePartitioningType;

public BigQuerySinkConfig.Builder setReferenceName(@Nullable String referenceName) {
this.referenceName = referenceName;
Expand Down Expand Up @@ -765,6 +786,11 @@ public BigQuerySinkConfig.Builder setJobLabelKeyValue(@Nullable String jobLabelK
return this;
}

public BigQuerySinkConfig.Builder setTimePartitioningType(@Nullable String timePartitioningType) {
this.timePartitioningType = timePartitioningType;
return this;
}

public BigQuerySinkConfig build() {
return new BigQuerySinkConfig(
referenceName,
Expand All @@ -777,7 +803,8 @@ public BigQuerySinkConfig build() {
location,
cmekKey,
bucket,
jobLabelKeyValue
jobLabelKeyValue,
timePartitioningType
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,4 +272,34 @@ public void testValidateTableNameWithInvalidTableName() {
Assert.assertEquals(invalidTableNameErrorMessage, collector.getValidationFailures().get(0).getMessage());
}
}

@Test
public void testGetTimePartitioningType() {
Map<String, TimePartitioning.Type> expectedTimePartitioningType = new HashMap<>();
expectedTimePartitioningType.put(null, TimePartitioning.Type.DAY);
expectedTimePartitioningType.put("", TimePartitioning.Type.DAY);
expectedTimePartitioningType.put("Day", TimePartitioning.Type.DAY);
expectedTimePartitioningType.put("Daily", TimePartitioning.Type.DAY);
expectedTimePartitioningType.put("Hour", TimePartitioning.Type.HOUR);
expectedTimePartitioningType.put("Hourly", TimePartitioning.Type.HOUR);
expectedTimePartitioningType.put("Month", TimePartitioning.Type.MONTH);
expectedTimePartitioningType.put("Monthly", TimePartitioning.Type.MONTH);
expectedTimePartitioningType.put("Year", TimePartitioning.Type.YEAR);
expectedTimePartitioningType.put("Yearly", TimePartitioning.Type.YEAR);

// Valid types
for (Map.Entry<String, TimePartitioning.Type> entry : expectedTimePartitioningType.entrySet()) {
config = configBuilder.setTimePartitioningType(entry.getKey()).build();
Assert.assertEquals(entry.getValue(), config.getTimePartitioningType());
}

// Invalid value
config = configBuilder.setTimePartitioningType("Din").build();
try {
config.getTimePartitioningType();
Assert.fail("Expected IllegalArgumentException was not thrown.");
} catch (IllegalArgumentException e) {
Assert.assertEquals("Invalid time partitioning type: Din", e.getMessage());
}
}
}

0 comments on commit 3ecc0ed

Please sign in to comment.