Skip to content

Commit

Permalink
Added support for destination table write preference in BQ Execute Pl…
Browse files Browse the repository at this point in the history
…ugin
  • Loading branch information
anshumanks committed Dec 12, 2024
1 parent 225e63b commit 40b5f74
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 2 deletions.
5 changes: 5 additions & 0 deletions docs/BigQueryExecute-action.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ cache that will be flushed whenever tables in the query are modified.
It is only applicable when users choose to store the query results in a BigQuery table.
More information can be found [here](https://cloud.google.com/data-fusion/docs/how-to/customer-managed-encryption-keys)

**Destination Table Write Preference**: Provides the following options as write preferences for the destination table:
* **Write if Empty**: Only write if the table is empty
* **Append to Table**: Add results to existing data. Schema should match.
* **Overwrite Table**: Replace all existing data. Schema will also be overriden.

**Row As Arguments**: Row as arguments. For example, if the query is 'select min(id) as min_id, max(id) as max_id from my_dataset.my_table',
an arguments for 'min_id' and 'max_id' will be set based on the query results. Plugins further down the pipeline can then
reference these values with macros ${min_id} and ${max_id}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,11 @@

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

/**
Expand Down Expand Up @@ -111,6 +113,7 @@ public void run(ActionContext context) throws Exception {
String datasetProjectId = config.getDatasetProject();
if (config.getStoreResults() && datasetProjectId != null && datasetName != null && tableName != null) {
builder.setDestinationTable(TableId.of(datasetProjectId, datasetName, tableName));
builder.setWriteDisposition(JobInfo.WriteDisposition.valueOf(config.getWritePreference()));
}

// Enable or Disable the query cache to force live query evaluation.
Expand Down Expand Up @@ -274,6 +277,7 @@ public static final class Config extends AbstractBigQueryActionConfig {
private static final String SQL = "sql";
private static final String DATASET = "dataset";
private static final String TABLE = "table";
private static final String WRITE_PREFERENCE = "writePreference";
private static final String NAME_LOCATION = "location";
public static final String NAME_BQ_JOB_LABELS = "jobLabels";
private static final int ERROR_CODE_NOT_FOUND = 404;
Expand All @@ -290,6 +294,8 @@ public static final class Config extends AbstractBigQueryActionConfig {
// Sn = a * (1 - r^n) / (r - 1)
public static final long DEFULT_MAX_RETRY_DURATION_SECONDS = 63L;
public static final int DEFAULT_READ_TIMEOUT = 120;
public static final Set<String> VALID_WRITE_PREFERENCES = Arrays.stream(JobInfo.WriteDisposition.values())
.map(Enum::name).collect(Collectors.toSet());

@Description("Dialect of the SQL command. The value must be 'legacy' or 'standard'. " +
"If set to 'standard', the query will use BigQuery's standard SQL: " +
Expand Down Expand Up @@ -398,13 +404,20 @@ public static final class Config extends AbstractBigQueryActionConfig {
@Description("Timeout in seconds to read data from an established HTTP connection (Default value is 120).")
private Integer readTimeout;

@Name(WRITE_PREFERENCE)
@Nullable
@Macro
@Description("Specifies if a job should overwrite or append the existing destination table if it already exists.")
private String writePreference;

private Config(@Nullable String project, @Nullable String serviceAccountType, @Nullable String serviceFilePath,
@Nullable String serviceAccountJson, @Nullable String dataset, @Nullable String table,
@Nullable String location, @Nullable String cmekKey, @Nullable String dialect, @Nullable String sql,
@Nullable String mode, @Nullable Boolean storeResults, @Nullable String jobLabelKeyValue,
@Nullable String rowAsArguments, @Nullable Boolean retryOnBackendError,
@Nullable Long initialRetryDuration, @Nullable Long maxRetryDuration,
@Nullable Double retryMultiplier, @Nullable Integer maxRetryCount, @Nullable Integer readTimeout) {
@Nullable Double retryMultiplier, @Nullable Integer maxRetryCount, @Nullable Integer readTimeout,
@Nullable String writePreference) {
this.project = project;
this.serviceAccountType = serviceAccountType;
this.serviceFilePath = serviceFilePath;
Expand All @@ -425,6 +438,7 @@ private Config(@Nullable String project, @Nullable String serviceAccountType, @N
this.maxRetryCount = maxRetryCount;
this.retryMultiplier = retryMultiplier;
this.readTimeout = readTimeout;
this.writePreference = writePreference;
}

public boolean isLegacySQL() {
Expand All @@ -451,6 +465,11 @@ public Boolean getStoreResults() {
return storeResults == null || storeResults;
}

public String getWritePreference() {
String defaultPreference = JobInfo.WriteDisposition.WRITE_EMPTY.name();
return Strings.isNullOrEmpty(writePreference) ? defaultPreference : writePreference.toUpperCase();
}

public QueryJobConfiguration.Priority getMode() {
return QueryJobConfiguration.Priority.valueOf(mode.toUpperCase());
}
Expand Down Expand Up @@ -546,9 +565,25 @@ public void validate(FailureCollector failureCollector, Map<String, String> argu
validateJobLabelKeyValue(failureCollector);
}

if (!containsMacro(WRITE_PREFERENCE)) {
validateWritePreference(failureCollector, getWritePreference());
}

failureCollector.getOrThrowException();
}

void validateWritePreference(FailureCollector failureCollector, String writePreference) {
if (!VALID_WRITE_PREFERENCES.contains(writePreference)) {
failureCollector.addFailure(
String.format("Invalid write preference '%s'. Allowed values are '%s'.",
writePreference, VALID_WRITE_PREFERENCES.toString()
),
"Please provide a valid write preference."
)
.withConfigProperty(WRITE_PREFERENCE);
}
}

void validateJobLabelKeyValue(FailureCollector failureCollector) {
BigQueryUtil.validateJobLabelKeyValue(jobLabelKeyValue, failureCollector, NAME_BQ_JOB_LABELS);
// Verify retry configuration when retry on backend error is enabled and none of the retry configuration
Expand Down Expand Up @@ -677,6 +712,7 @@ public static class Builder {
private Integer maxRetryCount;
private Double retryMultiplier;
private Integer readTimeout;
private String writePreference;

public Builder setProject(@Nullable String project) {
this.project = project;
Expand Down Expand Up @@ -778,6 +814,11 @@ public Builder setReadTimeout(@Nullable Integer readTimeout) {
return this;
}

public Builder setWritePreference(@Nullable String writePreference) {
this.writePreference = writePreference;
return this;
}

public Config build() {
return new Config(
project,
Expand All @@ -799,7 +840,8 @@ public Config build() {
maxRetryDuration,
retryMultiplier,
maxRetryCount,
readTimeout
readTimeout,
writePreference
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.cloud.bigquery.JobStatus;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;
import com.google.common.collect.ImmutableSet;
import io.cdap.cdap.api.metrics.Metrics;
import io.cdap.cdap.etl.api.StageMetrics;
import io.cdap.cdap.etl.api.action.ActionContext;
Expand All @@ -37,6 +38,7 @@
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import java.util.Set;

public class BigQueryExecuteTest {
@Mock
Expand Down Expand Up @@ -226,5 +228,24 @@ public void testValidateRetryConfigurationWithInvalidReadTimeout() {
failureCollector.getValidationFailures().get(0).getMessage());
}

@Test
public void testValidateWritePreferenceWithInvalidValue() {
config.validateWritePreference(failureCollector, "INVALID_PREFERENCE");

// Assert validation failure
Assert.assertEquals(1, failureCollector.getValidationFailures().size());
Assert.assertEquals(
String.format("Invalid write preference 'INVALID_PREFERENCE'. Allowed values are '%s'.",
config.VALID_WRITE_PREFERENCES.toString()
),
failureCollector.getValidationFailures().get(0).getMessage()
);
}

@Test
public void testDefaultWritePreference() {
Assert.assertEquals(JobInfo.WriteDisposition.WRITE_EMPTY.name(), config.getWritePreference());
}

}

27 changes: 27 additions & 0 deletions widgets/BigQueryExecute-action.json
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,29 @@
"placeholder": "projects/<gcp-project-id>/locations/<key-location>/keyRings/<key-ring-name>/cryptoKeys/<key-name>"
}
},
{
"widget-type": "radio-group",
"label": "Destination Table Write Preference",
"name": "writePreference",
"widget-attributes": {
"layout": "vertical",
"default": "WRITE_EMPTY",
"options": [
{
"id": "WRITE_EMPTY",
"label": "Write if Empty"
},
{
"id": "WRITE_APPEND",
"label": "Append to Table"
},
{
"id": "WRITE_TRUNCATE",
"label": "Overwrite Table"
}
]
}
},
{
"name": "rowAsArguments",
"label": "Row As Arguments",
Expand Down Expand Up @@ -313,6 +336,10 @@
{
"type": "property",
"name": "cmekKey"
},
{
"type": "property",
"name": "writePreference"
}
]
}
Expand Down

0 comments on commit 40b5f74

Please sign in to comment.