Skip to content

Commit

Permalink
Added support for destination table write preference in BQ Execute Pl…
Browse files Browse the repository at this point in the history
…ugin

Addressed PR comments: updated description, added documentation for the added feature

Formatted to fix failing tests

Refactor: Replace hardcoded write preference strings with JobInfo.WriteDisposition constants

Refactor: Simplify error message for invalid write preference validation

Refactor: Centralize valid write preferences in BigQueryExecute.Config

Refactor: Dynamically generate valid write preferences from JobInfo.WriteDisposition
  • Loading branch information
anshumanks committed Dec 12, 2024
1 parent 225e63b commit d7b5253
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 2 deletions.
5 changes: 5 additions & 0 deletions docs/BigQueryExecute-action.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ cache that will be flushed whenever tables in the query are modified.
It is only applicable when users choose to store the query results in a BigQuery table.
More information can be found [here](https://cloud.google.com/data-fusion/docs/how-to/customer-managed-encryption-keys)

**Destination Table Write Preference**: Provides the following options as write preferences for the destination table:
* **Write if Empty**: Only write if the table is empty
* **Append to Table**: Add results to existing data. Schema should match.
* **Overwrite Table**: Replace all existing data. Schema will also be overridden.

**Row As Arguments**: Row as arguments. For example, if the query is 'select min(id) as min_id, max(id) as max_id from my_dataset.my_table',
arguments for 'min_id' and 'max_id' will be set based on the query results. Plugins further down the pipeline can then
reference these values with macros ${min_id} and ${max_id}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,11 @@

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

/**
Expand Down Expand Up @@ -111,6 +113,7 @@ public void run(ActionContext context) throws Exception {
String datasetProjectId = config.getDatasetProject();
if (config.getStoreResults() && datasetProjectId != null && datasetName != null && tableName != null) {
builder.setDestinationTable(TableId.of(datasetProjectId, datasetName, tableName));
builder.setWriteDisposition(JobInfo.WriteDisposition.valueOf(config.getWritePreference()));
}

// Enable or Disable the query cache to force live query evaluation.
Expand Down Expand Up @@ -274,6 +277,7 @@ public static final class Config extends AbstractBigQueryActionConfig {
private static final String SQL = "sql";
private static final String DATASET = "dataset";
private static final String TABLE = "table";
private static final String WRITE_PREFERENCE = "writePreference";
private static final String NAME_LOCATION = "location";
public static final String NAME_BQ_JOB_LABELS = "jobLabels";
private static final int ERROR_CODE_NOT_FOUND = 404;
Expand All @@ -290,6 +294,8 @@ public static final class Config extends AbstractBigQueryActionConfig {
// Sn = a * (1 - r^n) / (r - 1)
public static final long DEFULT_MAX_RETRY_DURATION_SECONDS = 63L;
public static final int DEFAULT_READ_TIMEOUT = 120;
public static final Set<String> VALID_WRITE_PREFERENCES = Arrays.stream(JobInfo.WriteDisposition.values())
.map(Enum::name).collect(Collectors.toSet());

@Description("Dialect of the SQL command. The value must be 'legacy' or 'standard'. " +
"If set to 'standard', the query will use BigQuery's standard SQL: " +
Expand Down Expand Up @@ -398,13 +404,20 @@ public static final class Config extends AbstractBigQueryActionConfig {
@Description("Timeout in seconds to read data from an established HTTP connection (Default value is 120).")
private Integer readTimeout;

@Name(WRITE_PREFERENCE)
@Nullable
@Macro
@Description("Specifies if a job should overwrite or append the existing destination table if it already exists.")
private String writePreference;

private Config(@Nullable String project, @Nullable String serviceAccountType, @Nullable String serviceFilePath,
@Nullable String serviceAccountJson, @Nullable String dataset, @Nullable String table,
@Nullable String location, @Nullable String cmekKey, @Nullable String dialect, @Nullable String sql,
@Nullable String mode, @Nullable Boolean storeResults, @Nullable String jobLabelKeyValue,
@Nullable String rowAsArguments, @Nullable Boolean retryOnBackendError,
@Nullable Long initialRetryDuration, @Nullable Long maxRetryDuration,
@Nullable Double retryMultiplier, @Nullable Integer maxRetryCount, @Nullable Integer readTimeout) {
@Nullable Double retryMultiplier, @Nullable Integer maxRetryCount, @Nullable Integer readTimeout,
@Nullable String writePreference) {
this.project = project;
this.serviceAccountType = serviceAccountType;
this.serviceFilePath = serviceFilePath;
Expand All @@ -425,6 +438,7 @@ private Config(@Nullable String project, @Nullable String serviceAccountType, @N
this.maxRetryCount = maxRetryCount;
this.retryMultiplier = retryMultiplier;
this.readTimeout = readTimeout;
this.writePreference = writePreference;
}

public boolean isLegacySQL() {
Expand All @@ -451,6 +465,11 @@ public Boolean getStoreResults() {
return storeResults == null || storeResults;
}

public String getWritePreference() {
String defaultPreference = JobInfo.WriteDisposition.WRITE_EMPTY.name();
return Strings.isNullOrEmpty(writePreference) ? defaultPreference : writePreference.toUpperCase();
}

public QueryJobConfiguration.Priority getMode() {
return QueryJobConfiguration.Priority.valueOf(mode.toUpperCase());
}
Expand Down Expand Up @@ -546,9 +565,25 @@ public void validate(FailureCollector failureCollector, Map<String, String> argu
validateJobLabelKeyValue(failureCollector);
}

if (!containsMacro(WRITE_PREFERENCE)) {
validateWritePreference(failureCollector, getWritePreference());
}

failureCollector.getOrThrowException();
}

void validateWritePreference(FailureCollector failureCollector, String writePreference) {
if (!VALID_WRITE_PREFERENCES.contains(writePreference)) {
failureCollector.addFailure(
String.format("Invalid write preference '%s'. Allowed values are '%s'.",
writePreference, VALID_WRITE_PREFERENCES.toString()
),
"Please provide a valid write preference."
)
.withConfigProperty(WRITE_PREFERENCE);
}
}

void validateJobLabelKeyValue(FailureCollector failureCollector) {
BigQueryUtil.validateJobLabelKeyValue(jobLabelKeyValue, failureCollector, NAME_BQ_JOB_LABELS);
// Verify retry configuration when retry on backend error is enabled and none of the retry configuration
Expand Down Expand Up @@ -677,6 +712,7 @@ public static class Builder {
private Integer maxRetryCount;
private Double retryMultiplier;
private Integer readTimeout;
private String writePreference;

public Builder setProject(@Nullable String project) {
this.project = project;
Expand Down Expand Up @@ -778,6 +814,11 @@ public Builder setReadTimeout(@Nullable Integer readTimeout) {
return this;
}

public Builder setWritePreference(@Nullable String writePreference) {
this.writePreference = writePreference;
return this;
}

public Config build() {
return new Config(
project,
Expand All @@ -799,7 +840,8 @@ public Config build() {
maxRetryDuration,
retryMultiplier,
maxRetryCount,
readTimeout
readTimeout,
writePreference
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.cloud.bigquery.JobStatus;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;
import com.google.common.collect.ImmutableSet;
import io.cdap.cdap.api.metrics.Metrics;
import io.cdap.cdap.etl.api.StageMetrics;
import io.cdap.cdap.etl.api.action.ActionContext;
Expand All @@ -37,6 +38,7 @@
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import java.util.Set;

public class BigQueryExecuteTest {
@Mock
Expand Down Expand Up @@ -226,5 +228,24 @@ public void testValidateRetryConfigurationWithInvalidReadTimeout() {
failureCollector.getValidationFailures().get(0).getMessage());
}

@Test
public void testValidateWritePreferenceWithInvalidValue() {
config.validateWritePreference(failureCollector, "INVALID_PREFERENCE");

// Assert validation failure
Assert.assertEquals(1, failureCollector.getValidationFailures().size());
Assert.assertEquals(
String.format("Invalid write preference 'INVALID_PREFERENCE'. Allowed values are '%s'.",
config.VALID_WRITE_PREFERENCES.toString()
),
failureCollector.getValidationFailures().get(0).getMessage()
);
}

@Test
public void testDefaultWritePreference() {
Assert.assertEquals(JobInfo.WriteDisposition.WRITE_EMPTY.name(), config.getWritePreference());
}

}

27 changes: 27 additions & 0 deletions widgets/BigQueryExecute-action.json
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,29 @@
"placeholder": "projects/<gcp-project-id>/locations/<key-location>/keyRings/<key-ring-name>/cryptoKeys/<key-name>"
}
},
{
"widget-type": "radio-group",
"label": "Destination Table Write Preference",
"name": "writePreference",
"widget-attributes": {
"layout": "vertical",
"default": "WRITE_EMPTY",
"options": [
{
"id": "WRITE_EMPTY",
"label": "Write if Empty"
},
{
"id": "WRITE_APPEND",
"label": "Append to Table"
},
{
"id": "WRITE_TRUNCATE",
"label": "Overwrite Table"
}
]
}
},
{
"name": "rowAsArguments",
"label": "Row As Arguments",
Expand Down Expand Up @@ -313,6 +336,10 @@
{
"type": "property",
"name": "cmekKey"
},
{
"type": "property",
"name": "writePreference"
}
]
}
Expand Down

0 comments on commit d7b5253

Please sign in to comment.