diff --git a/docs/BigQueryExecute-action.md b/docs/BigQueryExecute-action.md
index b7129c9da..269032ea1 100644
--- a/docs/BigQueryExecute-action.md
+++ b/docs/BigQueryExecute-action.md
@@ -62,6 +62,11 @@
 cache that will be flushed whenever tables in the query are modified. It is
 only applicable when users choose to store the query results in a BigQuery table.
 More information can be found [here](https://cloud.google.com/data-fusion/docs/how-to/customer-managed-encryption-keys)
+**Destination Table Write Preference**: Specifies how query results are written to the destination table when results are stored in BigQuery:
+* **Write if Empty**: Write the results only if the table is empty; the job fails if the table already contains data.
+* **Append to Table**: Append the results to the existing table data. The result schema must be compatible with the table's schema.
+* **Overwrite Table**: Replace all existing data in the table. The table schema is overwritten as well.
+
 **Row As Arguments**: Row as arguments. For example, if the query is 'select min(id) as min_id, max(id) as max_id
 from my_dataset.my_table', an arguments for 'min_id' and 'max_id' will be set based on the query results. Plugins
 further down the pipeline can then reference these values with macros ${min_id} and ${max_id}.
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java b/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java
index bb82eaabc..1566fe4fb 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java
@@ -58,9 +58,11 @@
 import java.io.IOException;
 import java.time.Duration;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;

 /**
@@ -111,6 +113,7 @@ public void run(ActionContext context) throws Exception {
     String datasetProjectId = config.getDatasetProject();
     if (config.getStoreResults() && datasetProjectId != null && datasetName != null && tableName != null) {
       builder.setDestinationTable(TableId.of(datasetProjectId, datasetName, tableName));
+      builder.setWriteDisposition(JobInfo.WriteDisposition.valueOf(config.getWritePreference()));
     }

     // Enable or Disable the query cache to force live query evaluation.
@@ -274,6 +277,7 @@ public static final class Config extends AbstractBigQueryActionConfig {
     private static final String SQL = "sql";
     private static final String DATASET = "dataset";
     private static final String TABLE = "table";
+    private static final String WRITE_PREFERENCE = "writePreference";
     private static final String NAME_LOCATION = "location";
     public static final String NAME_BQ_JOB_LABELS = "jobLabels";
     private static final int ERROR_CODE_NOT_FOUND = 404;
@@ -290,6 +294,8 @@ public static final class Config extends AbstractBigQueryActionConfig {
     // Sn = a * (1 - r^n) / (r - 1)
     public static final long DEFULT_MAX_RETRY_DURATION_SECONDS = 63L;
     public static final int DEFAULT_READ_TIMEOUT = 120;
+    public static final Set<String> VALID_WRITE_PREFERENCES = Arrays.stream(JobInfo.WriteDisposition.values())
+      .map(Enum::name).collect(Collectors.toSet());

     @Description("Dialect of the SQL command. The value must be 'legacy' or 'standard'. " +
" + "If set to 'standard', the query will use BigQuery's standard SQL: " + @@ -398,13 +404,20 @@ public static final class Config extends AbstractBigQueryActionConfig { @Description("Timeout in seconds to read data from an established HTTP connection (Default value is 120).") private Integer readTimeout; + @Name(WRITE_PREFERENCE) + @Nullable + @Macro + @Description("Specifies if a job should overwrite or append the existing destination table if it already exists.") + private String writePreference; + private Config(@Nullable String project, @Nullable String serviceAccountType, @Nullable String serviceFilePath, @Nullable String serviceAccountJson, @Nullable String dataset, @Nullable String table, @Nullable String location, @Nullable String cmekKey, @Nullable String dialect, @Nullable String sql, @Nullable String mode, @Nullable Boolean storeResults, @Nullable String jobLabelKeyValue, @Nullable String rowAsArguments, @Nullable Boolean retryOnBackendError, @Nullable Long initialRetryDuration, @Nullable Long maxRetryDuration, - @Nullable Double retryMultiplier, @Nullable Integer maxRetryCount, @Nullable Integer readTimeout) { + @Nullable Double retryMultiplier, @Nullable Integer maxRetryCount, @Nullable Integer readTimeout, + @Nullable String writePreference) { this.project = project; this.serviceAccountType = serviceAccountType; this.serviceFilePath = serviceFilePath; @@ -425,6 +438,7 @@ private Config(@Nullable String project, @Nullable String serviceAccountType, @N this.maxRetryCount = maxRetryCount; this.retryMultiplier = retryMultiplier; this.readTimeout = readTimeout; + this.writePreference = writePreference; } public boolean isLegacySQL() { @@ -451,6 +465,11 @@ public Boolean getStoreResults() { return storeResults == null || storeResults; } + public String getWritePreference() { + String defaultPreference = JobInfo.WriteDisposition.WRITE_EMPTY.name(); + return Strings.isNullOrEmpty(writePreference) ? defaultPreference : writePreference.toUpperCase(); + } + public QueryJobConfiguration.Priority getMode() { return QueryJobConfiguration.Priority.valueOf(mode.toUpperCase()); } @@ -546,9 +565,25 @@ public void validate(FailureCollector failureCollector, Map argu validateJobLabelKeyValue(failureCollector); } + if (!containsMacro(WRITE_PREFERENCE)) { + validateWritePreference(failureCollector, getWritePreference()); + } + failureCollector.getOrThrowException(); } + void validateWritePreference(FailureCollector failureCollector, String writePreference) { + if (!VALID_WRITE_PREFERENCES.contains(writePreference)) { + failureCollector.addFailure( + String.format("Invalid write preference '%s'. Allowed values are '%s'.", + writePreference, VALID_WRITE_PREFERENCES.toString() + ), + "Please provide a valid write preference." 
+        )
+          .withConfigProperty(WRITE_PREFERENCE);
+      }
+    }
+
     void validateJobLabelKeyValue(FailureCollector failureCollector) {
       BigQueryUtil.validateJobLabelKeyValue(jobLabelKeyValue, failureCollector, NAME_BQ_JOB_LABELS);
       // Verify retry configuration when retry on backend error is enabled and none of the retry configuration
@@ -677,6 +712,7 @@ public static class Builder {
       private Integer maxRetryCount;
       private Double retryMultiplier;
       private Integer readTimeout;
+      private String writePreference;

       public Builder setProject(@Nullable String project) {
         this.project = project;
@@ -778,6 +814,11 @@ public Builder setReadTimeout(@Nullable Integer readTimeout) {
         return this;
       }

+      public Builder setWritePreference(@Nullable String writePreference) {
+        this.writePreference = writePreference;
+        return this;
+      }
+
       public Config build() {
         return new Config(
           project,
@@ -799,7 +840,8 @@ public Config build() {
           maxRetryDuration,
           retryMultiplier,
           maxRetryCount,
-          readTimeout
+          readTimeout,
+          writePreference
         );
       }
     }
diff --git a/src/test/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecuteTest.java b/src/test/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecuteTest.java
index cd939a3d9..53fe7c766 100644
--- a/src/test/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecuteTest.java
+++ b/src/test/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecuteTest.java
@@ -25,6 +25,7 @@
 import com.google.cloud.bigquery.JobStatus;
 import com.google.cloud.bigquery.QueryJobConfiguration;
 import com.google.cloud.bigquery.TableResult;
+import com.google.common.collect.ImmutableSet;
 import io.cdap.cdap.api.metrics.Metrics;
 import io.cdap.cdap.etl.api.StageMetrics;
 import io.cdap.cdap.etl.api.action.ActionContext;
@@ -37,6 +38,7 @@
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
+import java.util.Set;

 public class BigQueryExecuteTest {
   @Mock
@@ -226,5 +228,24 @@ public void testValidateRetryConfigurationWithInvalidReadTimeout() {
       failureCollector.getValidationFailures().get(0).getMessage());
   }

+  @Test
+  public void testValidateWritePreferenceWithInvalidValue() {
+    config.validateWritePreference(failureCollector, "INVALID_PREFERENCE");
+
+    // Assert validation failure
+    Assert.assertEquals(1, failureCollector.getValidationFailures().size());
+    Assert.assertEquals(
+      String.format("Invalid write preference 'INVALID_PREFERENCE'. Allowed values are '%s'.",
+        config.VALID_WRITE_PREFERENCES.toString()
+      ),
+      failureCollector.getValidationFailures().get(0).getMessage()
+    );
+  }
+
+  @Test
+  public void testDefaultWritePreference() {
+    Assert.assertEquals(JobInfo.WriteDisposition.WRITE_EMPTY.name(), config.getWritePreference());
+  }
+
 }
diff --git a/widgets/BigQueryExecute-action.json b/widgets/BigQueryExecute-action.json
index 21b6e1ef6..9cf840a37 100644
--- a/widgets/BigQueryExecute-action.json
+++ b/widgets/BigQueryExecute-action.json
@@ -150,6 +150,29 @@
         "placeholder": "projects//locations//keyRings//cryptoKeys/"
       }
     },
+    {
+      "widget-type": "radio-group",
+      "label": "Destination Table Write Preference",
+      "name": "writePreference",
+      "widget-attributes": {
+        "layout": "vertical",
+        "default": "WRITE_EMPTY",
+        "options": [
+          {
+            "id": "WRITE_EMPTY",
+            "label": "Write if Empty"
+          },
+          {
+            "id": "WRITE_APPEND",
+            "label": "Append to Table"
+          },
+          {
+            "id": "WRITE_TRUNCATE",
+            "label": "Overwrite Table"
+          }
+        ]
+      }
+    },
     {
       "name": "rowAsArguments",
       "label": "Row As Arguments",
@@ -313,6 +336,10 @@
         {
           "type": "property",
           "name": "cmekKey"
+        },
+        {
+          "type": "property",
+          "name": "writePreference"
         }
       ]
     }
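
For context, the write preference values surfaced by this change map one-to-one onto the `JobInfo.WriteDisposition` enum of the google-cloud-bigquery client, which `run()` passes to the query job via `setWriteDisposition`. The sketch below is a hypothetical standalone helper (not part of this patch) that illustrates the mapping and the `WRITE_EMPTY` default applied by `getWritePreference()`:

```java
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableId;

/** Illustrative only: how a write preference string becomes a BigQuery write disposition. */
public class WritePreferenceSketch {

  static QueryJobConfiguration buildQueryConfig(String sql, String project, String dataset,
                                                String table, String writePreference) {
    // Mirror the plugin's default: fall back to WRITE_EMPTY when no preference is set, and
    // upper-case the value so it matches WRITE_EMPTY, WRITE_APPEND, or WRITE_TRUNCATE.
    JobInfo.WriteDisposition disposition = (writePreference == null || writePreference.isEmpty())
      ? JobInfo.WriteDisposition.WRITE_EMPTY
      : JobInfo.WriteDisposition.valueOf(writePreference.toUpperCase());

    return QueryJobConfiguration.newBuilder(sql)
      .setDestinationTable(TableId.of(project, dataset, table))
      .setWriteDisposition(disposition)
      .build();
  }
}
```

With `WRITE_APPEND`, BigQuery appends the query results to the existing table; with `WRITE_TRUNCATE`, it replaces the table contents and schema; with `WRITE_EMPTY`, the job fails if the destination table is not empty.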