From 35987f2e840330bb88c7b590e6b1550d4c03acc5 Mon Sep 17 00:00:00 2001 From: psainics Date: Fri, 27 Oct 2023 19:56:41 +0530 Subject: [PATCH] Added BQ Retry --- docs/BigQueryExecute-action.md | 12 + pom.xml | 6 + .../gcp/bigquery/action/BigQueryExecute.java | 244 ++++++++++++++++-- .../BigQueryJobExecutionException.java | 38 +++ .../bigquery/action/BigQueryExecuteTest.java | 198 ++++++++++++++ widgets/BigQueryExecute-action.json | 81 ++++++ 6 files changed, 563 insertions(+), 16 deletions(-) create mode 100644 src/main/java/io/cdap/plugin/gcp/bigquery/exception/BigQueryJobExecutionException.java create mode 100644 src/test/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecuteTest.java diff --git a/docs/BigQueryExecute-action.md b/docs/BigQueryExecute-action.md index 961b59504b..daec65e72f 100644 --- a/docs/BigQueryExecute-action.md +++ b/docs/BigQueryExecute-action.md @@ -65,3 +65,15 @@ authorization. Can be set to 'auto-detect' when running on a Dataproc cluster. When running on other clusters, the file must be present on every node in the cluster. * **JSON**: Contents of the service account JSON file. + +Retry Configuration +---------- +**Retry On Backend Error**: Whether to retry when a backend error occurs. Default is false. (Macro-enabled) + +**Initial Retry Duration (Seconds)**: Time taken for the first retry. Default is 1 second. (Macro-enabled) + +**Max Retry Duration (Seconds)**: Maximum time in seconds retries can take. Default is 32 seconds. (Macro-enabled) + +**Max Retry Count**: Maximum number of retries allowed. Default is 5. (Macro-enabled) + +**Retry Multiplier**: Multiplier used to calculate the next retry duration. Default is 2. 
(Macro-enabled) \ No newline at end of file diff --git a/pom.xml b/pom.xml index 0096ddcd84..fc710f9c03 100644 --- a/pom.xml +++ b/pom.xml @@ -98,6 +98,7 @@ 1.7.5 3.3.2 0.23.1 + 3.3.2 ${project.basedir}/src/test/java/ @@ -840,6 +841,11 @@ + + dev.failsafe + failsafe + ${failsafe.version} + diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java b/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java index 78455fdb4d..77e80733d2 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java @@ -33,8 +33,12 @@ import com.google.cloud.bigquery.TableId; import com.google.cloud.bigquery.TableResult; import com.google.cloud.kms.v1.CryptoKeyName; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import dev.failsafe.Failsafe; +import dev.failsafe.RetryPolicy; import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Macro; import io.cdap.cdap.api.annotation.Name; @@ -43,6 +47,7 @@ import io.cdap.cdap.etl.api.action.Action; import io.cdap.cdap.etl.api.action.ActionContext; import io.cdap.cdap.etl.common.Constants; +import io.cdap.plugin.gcp.bigquery.exception.BigQueryJobExecutionException; import io.cdap.plugin.gcp.bigquery.sink.BigQuerySinkUtils; import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil; import io.cdap.plugin.gcp.common.CmekUtils; @@ -51,8 +56,10 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.time.Duration; import java.util.Collections; import java.util.Map; +import java.util.Set; import javax.annotation.Nullable; /** @@ -69,8 +76,18 @@ public final class BigQueryExecute extends AbstractBigQueryAction { private static final Logger LOG = LoggerFactory.getLogger(BigQueryExecute.class); public static final String NAME = 
"BigQueryExecute"; private static final String RECORDS_PROCESSED = "records.processed"; - private Config config; + private static final String JOB_BACKEND_ERROR = "jobBackendError"; + private static final String JOB_INTERNAL_ERROR = "jobInternalError"; + private static final Set RETRY_ON_REASON = ImmutableSet.of(JOB_BACKEND_ERROR, JOB_INTERNAL_ERROR); + + BigQueryExecute() { + // no args constructor + } + @VisibleForTesting + BigQueryExecute(Config config) { + this.config = config; + } @Override public void run(ActionContext context) throws Exception { @@ -103,9 +120,6 @@ public void run(ActionContext context) throws Exception { // Enable legacy SQL builder.setUseLegacySql(config.isLegacySQL()); - // Location must match that of the dataset(s) referenced in the query. - JobId jobId = JobId.newBuilder().setRandomJob().setLocation(config.getLocation()).build(); - // API request - starts the query. Credentials credentials = config.getServiceAccount() == null ? null : GCPUtils.loadServiceAccountCredentials(config.getServiceAccount(), @@ -129,19 +143,66 @@ public void run(ActionContext context) throws Exception { QueryJobConfiguration queryConfig = builder.build(); - Job queryJob = bigQuery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build()); + // Exponential backoff with initial retry of 1 second and max retry of 32 seconds. + executeQueryWithExponentialBackoff(bigQuery, queryConfig, context); + } + + protected void executeQueryWithExponentialBackoff(BigQuery bigQuery, + QueryJobConfiguration queryConfig, ActionContext context) + throws BigQueryJobExecutionException { + try { + Failsafe.with(getRetryPolicy()).run(() -> executeQuery(bigQuery, queryConfig, context)); + } catch (RuntimeException e) { + String errorMessage = "Failed to execute BigQuery job. 
Reason: " + e.getMessage(); + LOG.error(errorMessage); + throw new BigQueryJobExecutionException(errorMessage, e); + } + + } + + private RetryPolicy getRetryPolicy() { + return RetryPolicy.builder() + .handle(BigQueryJobExecutionException.class) + .withBackoff(Duration.ofSeconds(config.getInitialRetryDuration()), + Duration.ofSeconds(config.getMaxRetryDuration()), config.getRetryMultiplier()) + .withMaxRetries(config.getMaxRetryCount()) + .onRetry(event -> LOG.debug("Retrying BigQuery Execute job. Retry count: {}", event.getAttemptCount())) + .onSuccess(event -> LOG.debug("BigQuery Execute job executed successfully.")) + .onRetriesExceeded(event -> LOG.error("Retry limit reached for BigQuery Execute job.")) + .build(); + } + + private void executeQuery(BigQuery bigQuery, QueryJobConfiguration queryConfig, ActionContext context) + throws InterruptedException, BigQueryJobExecutionException { + // Location must match that of the dataset(s) referenced in the query. + JobId jobId = JobId.newBuilder().setRandomJob().setLocation(config.getLocation()).build(); + Job queryJob; - LOG.info("Executing SQL as job {}.", jobId.getJob()); - LOG.debug("The BigQuery SQL is {}", config.getSql()); + try { + queryJob = bigQuery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build()); - // Wait for the query to complete - queryJob = queryJob.waitFor(); + LOG.info("Executing SQL as job {}.", jobId.getJob()); + LOG.debug("The BigQuery SQL is {}", config.getSql()); + + // Wait for the query to complete + queryJob = queryJob.waitFor(); + } catch (BigQueryException e) { + LOG.error("The query job {} failed. 
Error: {}", jobId.getJob(), e.getMessage()); + if (RETRY_ON_REASON.contains(e.getError().getReason())) { + throw new BigQueryJobExecutionException(e.getMessage(), e); + } + throw new RuntimeException(e); + } // Check for errors if (queryJob.getStatus().getError() != null) { // You can also look at queryJob.getStatus().getExecutionErrors() for all // errors, not just the latest one. - throw new RuntimeException(queryJob.getStatus().getExecutionErrors().toString()); + LOG.error("The query job {} failed. Error: {}", jobId.getJob(), queryJob.getStatus().getError()); + if (RETRY_ON_REASON.contains(queryJob.getStatus().getError().getReason())) { + throw new BigQueryJobExecutionException(queryJob.getStatus().getError().getMessage()); + } + throw new RuntimeException(queryJob.getStatus().getError().getMessage()); } TableResult queryResults = queryJob.getQueryResults(); @@ -181,14 +242,14 @@ public void run(ActionContext context) throws Exception { private void recordBytesProcessedMetric(ActionContext context, Job queryJob) { long processedBytes = - ((JobStatistics.QueryStatistics) queryJob.getStatistics()).getTotalBytesProcessed(); + ((JobStatistics.QueryStatistics) queryJob.getStatistics()).getTotalBytesProcessed(); LOG.info("Job {} processed {} bytes", queryJob.getJobId(), processedBytes); Map tags = new ImmutableMap.Builder() .put(Constants.Metrics.Tag.APP_ENTITY_TYPE, Action.PLUGIN_TYPE) .put(Constants.Metrics.Tag.APP_ENTITY_TYPE_NAME, BigQueryExecute.NAME) .build(); context.getMetrics().child(tags).countLong(BigQuerySinkUtils.BYTES_PROCESSED_METRIC, - processedBytes); + processedBytes); } @Override @@ -207,6 +268,16 @@ public static final class Config extends AbstractBigQueryActionConfig { private static final String NAME_LOCATION = "location"; private static final int ERROR_CODE_NOT_FOUND = 404; private static final String STORE_RESULTS = "storeResults"; + private static final String NAME_RETRY_ON_BACKEND_ERROR = "retryOnBackendError"; + private static final String 
NAME_INITIAL_RETRY_DURATION = "initialRetryDuration"; + private static final String NAME_MAX_RETRY_DURATION = "maxRetryDuration"; + private static final String NAME_RETRY_MULTIPLIER = "retryMultiplier"; + private static final String NAME_MAX_RETRY_COUNT = "maxRetryCount"; + public static final long DEFAULT_INITIAL_RETRY_DURATION_SECONDS = 1L; + public static final double DEFAULT_RETRY_MULTIPLIER = 2.0; + public static final int DEFAULT_MAX_RETRY_COUNT = 5; + // Sn = a * (1 - r^n) / (r - 1) + public static final long DEFULT_MAX_RETRY_DURATION_SECONDS = 63L; @Description("Dialect of the SQL command. The value must be 'legacy' or 'standard'. " + "If set to 'standard', the query will use BigQuery's standard SQL: " + @@ -267,6 +338,36 @@ public static final class Config extends AbstractBigQueryActionConfig { @Macro private String rowAsArguments; + @Name(NAME_RETRY_ON_BACKEND_ERROR) + @Description("Whether to retry on backend error. Default is false.") + @Nullable + @Macro + private Boolean retryOnBackendError; + + @Name(NAME_INITIAL_RETRY_DURATION) + @Description("Time taken for the first retry. Default is 1 seconds.") + @Nullable + @Macro + private Long initialRetryDuration; + + @Name(NAME_MAX_RETRY_DURATION) + @Description("Maximum time in seconds retries can take. Default is 32 seconds.") + @Nullable + @Macro + private Long maxRetryDuration; + + @Name(NAME_MAX_RETRY_COUNT) + @Description("Maximum number of retries allowed. Default is 5.") + @Nullable + @Macro + private Integer maxRetryCount; + + @Name(NAME_RETRY_MULTIPLIER) + @Description("Multiplier for exponential backoff. 
Default is 2.") + @Nullable + @Macro + private Double retryMultiplier; + @Name(STORE_RESULTS) @Nullable @Description("Whether to store results in a BigQuery Table.") @@ -275,7 +376,10 @@ public static final class Config extends AbstractBigQueryActionConfig { private Config(@Nullable String project, @Nullable String serviceAccountType, @Nullable String serviceFilePath, @Nullable String serviceAccountJson, @Nullable String dataset, @Nullable String table, @Nullable String location, @Nullable String cmekKey, @Nullable String dialect, @Nullable String sql, - @Nullable String mode, @Nullable Boolean storeResults) { + @Nullable String mode, @Nullable String rowAsArguments, @Nullable Boolean storeResults, + @Nullable Boolean retryOnBackendError, @Nullable Long initialRetryDuration, + @Nullable Long maxRetryDuration, @Nullable Double retryMultiplier, + @Nullable Integer maxRetryCount) { this.project = project; this.serviceAccountType = serviceAccountType; this.serviceFilePath = serviceFilePath; @@ -287,7 +391,13 @@ private Config(@Nullable String project, @Nullable String serviceAccountType, @N this.dialect = dialect; this.sql = sql; this.mode = mode; + this.rowAsArguments = rowAsArguments; this.storeResults = storeResults; + this.retryOnBackendError = retryOnBackendError; + this.initialRetryDuration = initialRetryDuration; + this.maxRetryDuration = maxRetryDuration; + this.maxRetryCount = maxRetryCount; + this.retryMultiplier = retryMultiplier; } public boolean isLegacySQL() { @@ -328,6 +438,25 @@ public String getTable() { return table; } + public boolean getRetryOnBackendError() { + return retryOnBackendError != null && retryOnBackendError; + } + public long getInitialRetryDuration() { + return initialRetryDuration == null ? DEFAULT_INITIAL_RETRY_DURATION_SECONDS : initialRetryDuration; + } + + public long getMaxRetryDuration() { + return maxRetryDuration == null ? 
DEFULT_MAX_RETRY_DURATION_SECONDS : maxRetryDuration; + } + + public double getRetryMultiplier() { + return retryMultiplier == null ? DEFAULT_RETRY_MULTIPLIER : retryMultiplier; + } + + public int getMaxRetryCount() { + return maxRetryCount == null ? DEFAULT_MAX_RETRY_COUNT : maxRetryCount; + } + @Override public void validate(FailureCollector failureCollector) { validate(failureCollector, Collections.emptyMap()); @@ -375,10 +504,47 @@ public void validate(FailureCollector failureCollector, Map argu if (!containsMacro(NAME_CMEK_KEY)) { validateCmekKey(failureCollector, arguments); } - + // Verify retry configuration when retry on backend error is enabled and none of the retry configuration + // properties are macros. + if (!containsMacro(NAME_RETRY_ON_BACKEND_ERROR) && retryOnBackendError != null && retryOnBackendError && + !containsMacro(NAME_INITIAL_RETRY_DURATION) && !containsMacro(NAME_MAX_RETRY_DURATION) && + !containsMacro(NAME_MAX_RETRY_COUNT) && !containsMacro(NAME_RETRY_MULTIPLIER)) { + validateRetryConfiguration( + failureCollector, initialRetryDuration, maxRetryDuration, maxRetryCount, retryMultiplier + ); + } failureCollector.getOrThrowException(); } + void validateRetryConfiguration(FailureCollector failureCollector, Long initialRetryDuration, + Long maxRetryDuration, Integer maxRetryCount, Double retryMultiplier) { + if (initialRetryDuration != null && initialRetryDuration <= 0) { + failureCollector.addFailure("Initial retry duration must be greater than 0.", + "Please specify a valid initial retry duration.") + .withConfigProperty(NAME_INITIAL_RETRY_DURATION); + } + if (maxRetryDuration != null && maxRetryDuration <= 0) { + failureCollector.addFailure("Max retry duration must be greater than 0.", + "Please specify a valid max retry duration.") + .withConfigProperty(NAME_MAX_RETRY_DURATION); + } + if (maxRetryCount != null && maxRetryCount <= 0) { + failureCollector.addFailure("Max retry count must be greater than 0.", + "Please specify a valid max 
retry count.") + .withConfigProperty(NAME_MAX_RETRY_COUNT); + } + if (retryMultiplier != null && retryMultiplier <= 1) { + failureCollector.addFailure("Retry multiplier must be strictly greater than 1.", + "Please specify a valid retry multiplier.") + .withConfigProperty(NAME_RETRY_MULTIPLIER); + } + if (maxRetryDuration != null && initialRetryDuration != null && maxRetryDuration <= initialRetryDuration) { + failureCollector.addFailure("Max retry duration must be greater than initial retry duration.", + "Please specify a valid max retry duration.") + .withConfigProperty(NAME_MAX_RETRY_DURATION); + } + } + void validateCmekKey(FailureCollector failureCollector, Map arguments) { CryptoKeyName cmekKeyName = CmekUtils.getCmekKey(cmekKey, arguments, failureCollector); //these fields are needed to check if bucket exists or not and for location validation @@ -448,7 +614,13 @@ public static class Builder { private String dialect; private String sql; private String mode; + private String rowAsArguments; private Boolean storeResults; + private Boolean retryOnBackendError; + private Long initialRetryDuration; + private Long maxRetryDuration; + private Integer maxRetryCount; + private Double retryMultiplier; public Builder setProject(@Nullable String project) { this.project = project; @@ -500,11 +672,46 @@ public Builder setMode(@Nullable String mode) { return this; } + public Builder setRowAsArguments(@Nullable String rowAsArguments) { + this.rowAsArguments = rowAsArguments; + return this; + } + public Builder setSql(@Nullable String sql) { this.sql = sql; return this; } + public Builder setRetryOnBackendError(@Nullable Boolean retryOnBackendError) { + this.retryOnBackendError = retryOnBackendError; + return this; + } + + public Builder setStoreResults(@Nullable Boolean storeResults) { + this.storeResults = storeResults; + return this; + } + + public Builder setInitialRetryDuration(@Nullable Long initialRetryDuration) { + this.initialRetryDuration = initialRetryDuration; + 
return this; + } + + public Builder setMaxRetryDuration(@Nullable Long maxRetryDuration) { + this.maxRetryDuration = maxRetryDuration; + return this; + } + + public Builder setMaxRetryCount(@Nullable Integer maxRetryCount) { + this.maxRetryCount = maxRetryCount; + return this; + } + + public Builder setRetryMultiplier(@Nullable Double retryMultiplier) { + this.retryMultiplier = retryMultiplier; + return this; + } + public Config build() { return new Config( project, @@ -518,10 +725,15 @@ public Config build() { dialect, sql, mode, - storeResults + rowAsArguments, + storeResults, + retryOnBackendError, + initialRetryDuration, + maxRetryDuration, + retryMultiplier, + maxRetryCount ); } - } } } diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/exception/BigQueryJobExecutionException.java b/src/main/java/io/cdap/plugin/gcp/bigquery/exception/BigQueryJobExecutionException.java new file mode 100644 index 0000000000..26861535cf --- /dev/null +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/exception/BigQueryJobExecutionException.java @@ -0,0 +1,38 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.gcp.bigquery.exception; + +/** + * Custom exception class for handling errors related to BigQuery job execution. + * This exception should be thrown when an issue occurs during the execution of a BigQuery job, + * and the calling code should consider retrying the operation. 
+ */ +public class BigQueryJobExecutionException extends Exception { + /** + * Constructs a new BigQueryJobExecutionException with the specified detail message. + * + * @param message The detail message that describes the exception. + */ + public BigQueryJobExecutionException(String message) { + super(message); + } + + public BigQueryJobExecutionException(String message, Throwable cause) { + super(message, cause); + } +} + diff --git a/src/test/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecuteTest.java b/src/test/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecuteTest.java new file mode 100644 index 0000000000..43e3edb86c --- /dev/null +++ b/src/test/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecuteTest.java @@ -0,0 +1,198 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package io.cdap.plugin.gcp.bigquery.action; + +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryError; +import com.google.cloud.bigquery.Job; +import com.google.cloud.bigquery.JobId; +import com.google.cloud.bigquery.JobInfo; +import com.google.cloud.bigquery.JobStatistics; +import com.google.cloud.bigquery.JobStatus; +import com.google.cloud.bigquery.QueryJobConfiguration; +import com.google.cloud.bigquery.TableResult; +import io.cdap.cdap.api.metrics.Metrics; +import io.cdap.cdap.etl.api.StageMetrics; +import io.cdap.cdap.etl.api.action.ActionContext; + +import io.cdap.cdap.etl.mock.validation.MockFailureCollector; +import io.cdap.plugin.gcp.bigquery.exception.BigQueryJobExecutionException; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +public class BigQueryExecuteTest { + @Mock + BigQuery bigQuery; + @Mock + Job queryJob; + @Mock + JobStatus jobStatus; + @Mock + BigQueryError bigQueryError; + @Mock + TableResult queryResults; + @Mock + JobStatistics.QueryStatistics queryStatistics; + @Mock + ActionContext context; + @Mock + StageMetrics stageMetrics; + @Mock + Metrics metrics; + QueryJobConfiguration queryJobConfiguration; + BigQueryExecute.Config config; + JobInfo jobInfo; + JobId jobId; + BigQueryExecute bq; + MockFailureCollector failureCollector; + + @Before + public void setUp() throws InterruptedException, NoSuchMethodException { + MockitoAnnotations.initMocks(this); + failureCollector = new MockFailureCollector(); + queryJobConfiguration = QueryJobConfiguration.newBuilder("select * from test").build(); + config = BigQueryExecute.Config.builder() + .setLocation("US").setProject("testProject").setRowAsArguments("false") + .setInitialRetryDuration(1L).setMaxRetryDuration(5L) + .setMaxRetryCount(1).setRetryMultiplier(2.0).build(); + jobId = 
JobId.newBuilder().setRandomJob().setLocation(config.getLocation()).build(); + jobInfo = JobInfo.newBuilder(queryJobConfiguration).setJobId(jobId).build(); + bq = new BigQueryExecute(config); + + // Mock Job Creation + Mockito.when(bigQuery.create((JobInfo) Mockito.any())).thenReturn(queryJob); + Mockito.when(queryJob.waitFor()).thenReturn(queryJob); + Mockito.when(queryJob.getStatus()).thenReturn(jobStatus); + Mockito.when(jobStatus.getError()).thenReturn(bigQueryError); + + // Mock Successful Query + Mockito.when(queryJob.getQueryResults()).thenReturn(queryResults); + Mockito.when(queryResults.getTotalRows()).thenReturn(1L); + Mockito.when(queryJob.getStatistics()).thenReturn(queryStatistics); + Mockito.when(queryStatistics.getTotalBytesProcessed()).thenReturn(1L); + + // Mock context + Mockito.when(context.getMetrics()).thenReturn(stageMetrics); + Mockito.doNothing().when(stageMetrics).gauge(Mockito.anyString(), Mockito.anyLong()); + Mockito.when(stageMetrics.child(Mockito.any())).thenReturn(metrics); + Mockito.doNothing().when(metrics).countLong(Mockito.anyString(), Mockito.anyLong()); + + } + + @Test(expected = BigQueryJobExecutionException.class) + public void testExecuteQueryWithExponentialBackoffFailsWithNonRetryError() throws BigQueryJobExecutionException { + Mockito.when(bigQueryError.getReason()).thenReturn("accessDenied"); + bq.executeQueryWithExponentialBackoff(bigQuery, queryJobConfiguration, context); + } + + @Test(expected = BigQueryJobExecutionException.class) + public void testExecuteQueryWithExponentialBackoffFailsRetryError() throws BigQueryJobExecutionException { + Mockito.when(bigQueryError.getReason()).thenReturn("jobBackendError"); + bq.executeQueryWithExponentialBackoff(bigQuery, queryJobConfiguration, context); + } + + @Test + public void testExecuteQueryWithExponentialBackoffSuccess() + throws InterruptedException, BigQueryJobExecutionException { + Mockito.when(jobStatus.getError()).thenReturn(null); + 
Mockito.when(queryJob.getQueryResults()).thenReturn(queryResults); + bq.executeQueryWithExponentialBackoff(bigQuery, queryJobConfiguration, context); + } + + @Test + public void testValidateRetryConfigurationWithDefaultValues() { + config.validateRetryConfiguration(failureCollector, + BigQueryExecute.Config.DEFAULT_INITIAL_RETRY_DURATION_SECONDS, + BigQueryExecute.Config.DEFULT_MAX_RETRY_DURATION_SECONDS, + BigQueryExecute.Config.DEFAULT_MAX_RETRY_COUNT, + BigQueryExecute.Config.DEFAULT_RETRY_MULTIPLIER); + Assert.assertEquals(0, failureCollector.getValidationFailures().size()); + } + + @Test + public void testValidateRetryConfigurationWithInvalidInitialRetryDuration() { + config.validateRetryConfiguration(failureCollector, -1L, + BigQueryExecute.Config.DEFULT_MAX_RETRY_DURATION_SECONDS, + BigQueryExecute.Config.DEFAULT_MAX_RETRY_COUNT, + BigQueryExecute.Config.DEFAULT_RETRY_MULTIPLIER); + Assert.assertEquals(1, failureCollector.getValidationFailures().size()); + Assert.assertEquals("Initial retry duration must be greater than 0.", + failureCollector.getValidationFailures().get(0).getMessage()); + } + + @Test + public void testValidateRetryConfigurationWithInvalidMaxRetryDuration() { + config.validateRetryConfiguration(failureCollector, + BigQueryExecute.Config.DEFAULT_INITIAL_RETRY_DURATION_SECONDS, -1L, + BigQueryExecute.Config.DEFAULT_MAX_RETRY_COUNT, + BigQueryExecute.Config.DEFAULT_RETRY_MULTIPLIER); + Assert.assertEquals(2, failureCollector.getValidationFailures().size()); + Assert.assertEquals("Max retry duration must be greater than 0.", + failureCollector.getValidationFailures().get(0).getMessage()); + Assert.assertEquals("Max retry duration must be greater than initial retry duration.", + failureCollector.getValidationFailures().get(1).getMessage()); + } + + @Test + public void testValidateRetryConfigurationWithInvalidRetryMultiplier() { + config.validateRetryConfiguration(failureCollector, + BigQueryExecute.Config.DEFAULT_INITIAL_RETRY_DURATION_SECONDS, 
+ BigQueryExecute.Config.DEFULT_MAX_RETRY_DURATION_SECONDS, + BigQueryExecute.Config.DEFAULT_MAX_RETRY_COUNT, -1.0); + Assert.assertEquals(1, failureCollector.getValidationFailures().size()); + Assert.assertEquals("Retry multiplier must be strictly greater than 1.", + failureCollector.getValidationFailures().get(0).getMessage()); + } + + @Test + public void testValidateRetryConfigurationWithInvalidRetryMultiplierAndMaxRetryCount() { + config.validateRetryConfiguration(failureCollector, + BigQueryExecute.Config.DEFAULT_INITIAL_RETRY_DURATION_SECONDS, + BigQueryExecute.Config.DEFULT_MAX_RETRY_DURATION_SECONDS, -1, + BigQueryExecute.Config.DEFAULT_RETRY_MULTIPLIER); + Assert.assertEquals(1, failureCollector.getValidationFailures().size()); + Assert.assertEquals("Max retry count must be greater than 0.", + failureCollector.getValidationFailures().get(0).getMessage()); + } + + @Test + public void testValidateRetryConfigurationWithMultiplierOne() { + config.validateRetryConfiguration(failureCollector, + BigQueryExecute.Config.DEFAULT_INITIAL_RETRY_DURATION_SECONDS, + BigQueryExecute.Config.DEFULT_MAX_RETRY_DURATION_SECONDS, + BigQueryExecute.Config.DEFAULT_MAX_RETRY_COUNT, 1.0); + Assert.assertEquals(1, failureCollector.getValidationFailures().size()); + Assert.assertEquals("Retry multiplier must be strictly greater than 1.", + failureCollector.getValidationFailures().get(0).getMessage()); + } + + @Test + public void testValidateRetryConfigurationWithMaxRetryLessThanInitialRetry() { + config.validateRetryConfiguration(failureCollector, 10L, 5L, + BigQueryExecute.Config.DEFAULT_MAX_RETRY_COUNT, + BigQueryExecute.Config.DEFAULT_RETRY_MULTIPLIER); + Assert.assertEquals(1, failureCollector.getValidationFailures().size()); + Assert.assertEquals("Max retry duration must be greater than initial retry duration.", + failureCollector.getValidationFailures().get(0).getMessage()); + } + +} + diff --git a/widgets/BigQueryExecute-action.json b/widgets/BigQueryExecute-action.json index 
9e8942e813..b325c8b8ad 100644 --- a/widgets/BigQueryExecute-action.json +++ b/widgets/BigQueryExecute-action.json @@ -193,6 +193,63 @@ "name": "serviceAccountJSON" } ] + }, + { + "label": "Retry Configuration", + "properties": [ + { + "widget-type": "toggle", + "label": "Retry On Backend Error", + "name": "retryOnBackendError", + "widget-attributes": { + "on": { + "value": "true", + "label": "YES" + }, + "off": { + "value": "false", + "label": "NO" + }, + "default": "false" + } + }, + { + "widget-type": "number", + "label": "Initial Retry Duration (Seconds)", + "name": "initialRetryDuration", + "widget-attributes": { + "default": "1", + "minimum": "1" + } + }, + { + "widget-type": "number", + "label": "Max Retry Duration (Seconds)", + "name": "maxRetryDuration", + "widget-attributes": { + "default": "32", + "minimum": "1" + } + }, + { + "widget-type": "number", + "label": "Max Retry Count", + "name": "maxRetryCount", + "widget-attributes": { + "default": "5", + "minimum": "1" + } + }, + { + "widget-type": "textbox", + "label": "Retry Multiplier", + "name": "retryMultiplier", + "widget-attributes": { + "default": "2", + "placeholder": "The multiplier to use on retry attempts." + } + } + ] } ], "filters": [ @@ -243,6 +300,30 @@ "name": "cmekKey" } ] + }, + { + "name": "retryOnBackendError", + "condition": { + "expression": "retryOnBackendError == true" + }, + "show": [ + { + "type": "property", + "name": "initialRetryDuration" + }, + { + "type": "property", + "name": "maxRetryDuration" + }, + { + "type": "property", + "name": "maxRetryCount" + }, + { + "type": "property", + "name": "retryMultiplier" + } + ] } ], "outputs": []