Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PLUGIN-1438: Added support for destination table write preference in BQ Execute Plugin #1473

Merged
merged 1 commit into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/BigQueryExecute-action.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ cache that will be flushed whenever tables in the query are modified.
It is only applicable when users choose to store the query results in a BigQuery table.
More information can be found [here](https://cloud.google.com/data-fusion/docs/how-to/customer-managed-encryption-keys)

**Destination Table Write Preference**: Provides the following options as write preferences for the destination table:
* **Write if Empty**: Only write if the table is empty.
* **Append to Table**: Add results to existing data. Schema should match.
* **Overwrite Table**: Replace all existing data. Schema will also be overridden.

**Row As Arguments**: Row as arguments. For example, if the query is 'select min(id) as min_id, max(id) as max_id from my_dataset.my_table',
arguments for 'min_id' and 'max_id' will be set based on the query results. Plugins further down the pipeline can then
reference these values with macros ${min_id} and ${max_id}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,11 @@

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

/**
Expand Down Expand Up @@ -111,6 +113,7 @@ public void run(ActionContext context) throws Exception {
String datasetProjectId = config.getDatasetProject();
if (config.getStoreResults() && datasetProjectId != null && datasetName != null && tableName != null) {
builder.setDestinationTable(TableId.of(datasetProjectId, datasetName, tableName));
builder.setWriteDisposition(JobInfo.WriteDisposition.valueOf(config.getWritePreference()));
}

// Enable or Disable the query cache to force live query evaluation.
Expand Down Expand Up @@ -274,6 +277,7 @@ public static final class Config extends AbstractBigQueryActionConfig {
private static final String SQL = "sql";
private static final String DATASET = "dataset";
private static final String TABLE = "table";
private static final String WRITE_PREFERENCE = "writePreference";
private static final String NAME_LOCATION = "location";
public static final String NAME_BQ_JOB_LABELS = "jobLabels";
private static final int ERROR_CODE_NOT_FOUND = 404;
Expand All @@ -290,6 +294,8 @@ public static final class Config extends AbstractBigQueryActionConfig {
// Sn = a * (1 - r^n) / (r - 1)
public static final long DEFULT_MAX_RETRY_DURATION_SECONDS = 63L;
public static final int DEFAULT_READ_TIMEOUT = 120;
// Every legal value for the writePreference property: the names of BigQuery's
// JobInfo.WriteDisposition enum (WRITE_EMPTY, WRITE_APPEND, WRITE_TRUNCATE).
// Derived from the enum itself so new dispositions are accepted automatically.
public static final Set<String> VALID_WRITE_PREFERENCES = Arrays.stream(JobInfo.WriteDisposition.values())
.map(Enum::name).collect(Collectors.toSet());

@Description("Dialect of the SQL command. The value must be 'legacy' or 'standard'. " +
"If set to 'standard', the query will use BigQuery's standard SQL: " +
Expand Down Expand Up @@ -398,13 +404,20 @@ public static final class Config extends AbstractBigQueryActionConfig {
@Description("Timeout in seconds to read data from an established HTTP connection (Default value is 120).")
private Integer readTimeout;

@Name(WRITE_PREFERENCE)
@Nullable
@Macro
@Description("Specifies if a job should overwrite or append the existing destination table if it already exists.")
private String writePreference;

private Config(@Nullable String project, @Nullable String serviceAccountType, @Nullable String serviceFilePath,
@Nullable String serviceAccountJson, @Nullable String dataset, @Nullable String table,
@Nullable String location, @Nullable String cmekKey, @Nullable String dialect, @Nullable String sql,
@Nullable String mode, @Nullable Boolean storeResults, @Nullable String jobLabelKeyValue,
@Nullable String rowAsArguments, @Nullable Boolean retryOnBackendError,
@Nullable Long initialRetryDuration, @Nullable Long maxRetryDuration,
@Nullable Double retryMultiplier, @Nullable Integer maxRetryCount, @Nullable Integer readTimeout) {
@Nullable Double retryMultiplier, @Nullable Integer maxRetryCount, @Nullable Integer readTimeout,
@Nullable String writePreference) {
this.project = project;
this.serviceAccountType = serviceAccountType;
this.serviceFilePath = serviceFilePath;
Expand All @@ -425,6 +438,7 @@ private Config(@Nullable String project, @Nullable String serviceAccountType, @N
this.maxRetryCount = maxRetryCount;
this.retryMultiplier = retryMultiplier;
this.readTimeout = readTimeout;
this.writePreference = writePreference;
}

public boolean isLegacySQL() {
Expand All @@ -451,6 +465,11 @@ public Boolean getStoreResults() {
return storeResults == null || storeResults;
}

/**
 * Returns the destination-table write disposition as an upper-case
 * {@link JobInfo.WriteDisposition} name.
 *
 * <p>Falls back to {@code WRITE_EMPTY} when the property is unset or empty.
 * Upper-casing uses {@link Locale#ROOT} so that locale-sensitive casing rules
 * (e.g. the Turkish dotted/dotless "i", which would turn {@code write_empty}
 * into an invalid enum name) cannot break {@code WriteDisposition.valueOf}.
 */
public String getWritePreference() {
  String defaultPreference = JobInfo.WriteDisposition.WRITE_EMPTY.name();
  return Strings.isNullOrEmpty(writePreference)
    ? defaultPreference
    : writePreference.toUpperCase(Locale.ROOT);
}

/**
 * Returns the query execution priority ({@code BATCH} or {@code INTERACTIVE}).
 *
 * <p>Upper-cases with {@link Locale#ROOT} so the enum lookup does not depend on
 * the JVM's default locale (the default-locale {@code toUpperCase()} fails for
 * names containing "i" under e.g. the Turkish locale).
 *
 * @throws IllegalArgumentException if {@code mode} is not a valid priority name
 */
public QueryJobConfiguration.Priority getMode() {
  return QueryJobConfiguration.Priority.valueOf(mode.toUpperCase(Locale.ROOT));
}
Expand Down Expand Up @@ -546,9 +565,25 @@ public void validate(FailureCollector failureCollector, Map<String, String> argu
validateJobLabelKeyValue(failureCollector);
}

if (!containsMacro(WRITE_PREFERENCE)) {
validateWritePreference(failureCollector, getWritePreference());
}

failureCollector.getOrThrowException();
}

/**
 * Validates that the supplied write preference is one of the recognized
 * {@link JobInfo.WriteDisposition} names; if not, records a failure on the
 * collector attributed to the write-preference property.
 */
void validateWritePreference(FailureCollector failureCollector, String writePreference) {
  // Fast path: a recognized disposition needs no failure entry.
  if (VALID_WRITE_PREFERENCES.contains(writePreference)) {
    return;
  }
  String message = String.format("Invalid write preference '%s'. Allowed values are '%s'.",
                                 writePreference, VALID_WRITE_PREFERENCES);
  failureCollector.addFailure(message, "Please provide a valid write preference.")
    .withConfigProperty(WRITE_PREFERENCE);
}

void validateJobLabelKeyValue(FailureCollector failureCollector) {
BigQueryUtil.validateJobLabelKeyValue(jobLabelKeyValue, failureCollector, NAME_BQ_JOB_LABELS);
// Verify retry configuration when retry on backend error is enabled and none of the retry configuration
Expand Down Expand Up @@ -677,6 +712,7 @@ public static class Builder {
private Integer maxRetryCount;
private Double retryMultiplier;
private Integer readTimeout;
private String writePreference;

public Builder setProject(@Nullable String project) {
this.project = project;
Expand Down Expand Up @@ -778,6 +814,11 @@ public Builder setReadTimeout(@Nullable Integer readTimeout) {
return this;
}

/**
 * Sets the write disposition applied to the destination table when storing
 * query results (e.g. {@code WRITE_EMPTY}, {@code WRITE_APPEND}, {@code WRITE_TRUNCATE}).
 *
 * @param writePreference disposition name; may be {@code null} to use the config's default
 * @return this builder, for chaining
 */
public Builder setWritePreference(@Nullable String writePreference) {
  this.writePreference = writePreference;
  return this;
}

public Config build() {
return new Config(
project,
Expand All @@ -799,7 +840,8 @@ public Config build() {
maxRetryDuration,
retryMultiplier,
maxRetryCount,
readTimeout
readTimeout,
writePreference
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.cloud.bigquery.JobStatus;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;
import com.google.common.collect.ImmutableSet;
import io.cdap.cdap.api.metrics.Metrics;
import io.cdap.cdap.etl.api.StageMetrics;
import io.cdap.cdap.etl.api.action.ActionContext;
Expand All @@ -37,6 +38,7 @@
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import java.util.Set;

public class BigQueryExecuteTest {
@Mock
Expand Down Expand Up @@ -226,5 +228,24 @@ public void testValidateRetryConfigurationWithInvalidReadTimeout() {
failureCollector.getValidationFailures().get(0).getMessage());
}

@Test
public void testValidateWritePreferenceWithInvalidValue() {
  // An unrecognized disposition name must produce exactly one validation
  // failure, attributed to the writePreference property.
  config.validateWritePreference(failureCollector, "INVALID_PREFERENCE");

  Assert.assertEquals(1, failureCollector.getValidationFailures().size());
  Assert.assertEquals(
    // Reference the constant through its declaring class: it is static, and
    // instance access (config.VALID_WRITE_PREFERENCES) hides that from readers.
    String.format("Invalid write preference 'INVALID_PREFERENCE'. Allowed values are '%s'.",
      BigQueryExecute.Config.VALID_WRITE_PREFERENCES
    ),
    failureCollector.getValidationFailures().get(0).getMessage()
  );
}

@Test
public void testDefaultWritePreference() {
  // When the writePreference property is unset, the config must fall back to
  // WRITE_EMPTY — the safest disposition (only writes when the table is empty).
  Assert.assertEquals(JobInfo.WriteDisposition.WRITE_EMPTY.name(), config.getWritePreference());
}

}

27 changes: 27 additions & 0 deletions widgets/BigQueryExecute-action.json
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,29 @@
"placeholder": "projects/<gcp-project-id>/locations/<key-location>/keyRings/<key-ring-name>/cryptoKeys/<key-name>"
}
},
{
"widget-type": "radio-group",
"label": "Destination Table Write Preference",
"name": "writePreference",
"widget-attributes": {
"layout": "vertical",
"default": "WRITE_EMPTY",
"options": [
{
"id": "WRITE_EMPTY",
"label": "Write if Empty"
},
{
"id": "WRITE_APPEND",
"label": "Append to Table"
},
{
"id": "WRITE_TRUNCATE",
"label": "Overwrite Table"
}
]
}
},
{
"name": "rowAsArguments",
"label": "Row As Arguments",
Expand Down Expand Up @@ -313,6 +336,10 @@
{
"type": "property",
"name": "cmekKey"
},
{
"type": "property",
"name": "writePreference"
}
]
}
Expand Down
Loading