diff --git a/pom.xml b/pom.xml index fc761c48dd..11352e6b43 100644 --- a/pom.xml +++ b/pom.xml @@ -98,6 +98,7 @@ 1.7.5 3.3.2 0.23.1 + 3.3.2 ${project.basedir}/src/test/java/ @@ -836,6 +837,11 @@ + + dev.failsafe + failsafe + ${failsafe.version} + diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java b/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java index 78455fdb4d..fc1806d857 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java @@ -35,6 +35,9 @@ import com.google.cloud.kms.v1.CryptoKeyName; import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import dev.failsafe.Failsafe; +import dev.failsafe.RetryPolicy; import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Macro; import io.cdap.cdap.api.annotation.Name; @@ -43,16 +46,20 @@ import io.cdap.cdap.etl.api.action.Action; import io.cdap.cdap.etl.api.action.ActionContext; import io.cdap.cdap.etl.common.Constants; +import io.cdap.plugin.gcp.bigquery.exception.BigQueryJobExecutionException; import io.cdap.plugin.gcp.bigquery.sink.BigQuerySinkUtils; import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil; import io.cdap.plugin.gcp.common.CmekUtils; import io.cdap.plugin.gcp.common.GCPUtils; +import org.jetbrains.annotations.TestOnly; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.time.Duration; import java.util.Collections; import java.util.Map; +import java.util.Set; import javax.annotation.Nullable; /** @@ -69,8 +76,13 @@ public final class BigQueryExecute extends AbstractBigQueryAction { private static final Logger LOG = LoggerFactory.getLogger(BigQueryExecute.class); public static final String NAME = "BigQueryExecute"; private static final String RECORDS_PROCESSED = "records.processed"; + private final Config config; + private static final Set retryOnReason = ImmutableSet.of("jobBackendError", "jobInternalError"); - private Config config; + @TestOnly + public BigQueryExecute(Config config) { + this.config = config; + } @Override public void run(ActionContext context) throws Exception { @@ -103,9 +115,6 @@ public void run(ActionContext context) throws Exception { // Enable legacy SQL builder.setUseLegacySql(config.isLegacySQL()); - // Location must match that of the dataset(s) referenced in the query. - JobId jobId = JobId.newBuilder().setRandomJob().setLocation(config.getLocation()).build(); - // API request - starts the query. Credentials credentials = config.getServiceAccount() == null ? null : GCPUtils.loadServiceAccountCredentials(config.getServiceAccount(), @@ -129,19 +138,62 @@ public void run(ActionContext context) throws Exception { QueryJobConfiguration queryConfig = builder.build(); - Job queryJob = bigQuery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build()); + // Exponential backoff with initial retry of 1 second and max retry of 32 seconds. + executeQueryWithExponentialBackoff(bigQuery, queryConfig, context); + } - LOG.info("Executing SQL as job {}.", jobId.getJob()); - LOG.debug("The BigQuery SQL is {}", config.getSql()); + protected void executeQueryWithExponentialBackoff(BigQuery bigQuery, + QueryJobConfiguration queryConfig, ActionContext context) { + Failsafe.with(getRetryPolicy()).run(() -> executeQuery(bigQuery, queryConfig, context, retryOnReason)); + } - // Wait for the query to complete - queryJob = queryJob.waitFor(); + private RetryPolicy getRetryPolicy() { + // Exponential backoff with initial retry of 1 second and max retry of 32 seconds. + Duration initialRetryDuration = Duration.ofSeconds(1); + Duration maxRetryDuration = Duration.ofSeconds(32); + int multiplier = 2; + int maxRetryCount = 5; + return RetryPolicy.builder() + .handle(BigQueryJobExecutionException.class) + .withBackoff(initialRetryDuration, maxRetryDuration, multiplier) + .withMaxRetries(maxRetryCount) + .onRetry(event -> LOG.debug("Retrying BigQuery Execute job. Retry count: {}", event.getAttemptCount())) + .onSuccess(event -> LOG.debug("BigQuery Execute job executed successfully.")) + .onRetriesExceeded(event -> LOG.error("Retry limit reached for BigQuery Execute job.")) + .build(); + } + + private void executeQuery(BigQuery bigQuery, QueryJobConfiguration queryConfig, ActionContext context, + Set retryOnReason) throws InterruptedException, BigQueryJobExecutionException { + // Location must match that of the dataset(s) referenced in the query. + JobId jobId = JobId.newBuilder().setRandomJob().setLocation(config.getLocation()).build(); + Job queryJob; + + try { + queryJob = bigQuery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build()); + + LOG.info("Executing SQL as job {}.", jobId.getJob()); + LOG.debug("The BigQuery SQL is {}", config.getSql()); + + // Wait for the query to complete + queryJob = queryJob.waitFor(); + } catch (BigQueryException e) { + LOG.error("The query job {} failed. Error: {}", jobId.getJob(), e.getMessage()); + if (retryOnReason.contains(e.getError().getReason())) { + throw new BigQueryJobExecutionException(e.getMessage()); + } + throw new RuntimeException(e); + } // Check for errors if (queryJob.getStatus().getError() != null) { // You can also look at queryJob.getStatus().getExecutionErrors() for all // errors, not just the latest one. - throw new RuntimeException(queryJob.getStatus().getExecutionErrors().toString()); + LOG.error("The query job {} failed. Error: {}", jobId.getJob(), queryJob.getStatus().getError()); + if (retryOnReason.contains(queryJob.getStatus().getError().getReason())) { + throw new BigQueryJobExecutionException(queryJob.getStatus().getError().getMessage()); + } + throw new RuntimeException(queryJob.getStatus().getError().getMessage()); } TableResult queryResults = queryJob.getQueryResults(); @@ -181,14 +233,14 @@ public void run(ActionContext context) throws Exception { private void recordBytesProcessedMetric(ActionContext context, Job queryJob) { long processedBytes = - ((JobStatistics.QueryStatistics) queryJob.getStatistics()).getTotalBytesProcessed(); + ((JobStatistics.QueryStatistics) queryJob.getStatistics()).getTotalBytesProcessed(); LOG.info("Job {} processed {} bytes", queryJob.getJobId(), processedBytes); Map tags = new ImmutableMap.Builder() .put(Constants.Metrics.Tag.APP_ENTITY_TYPE, Action.PLUGIN_TYPE) .put(Constants.Metrics.Tag.APP_ENTITY_TYPE_NAME, BigQueryExecute.NAME) .build(); context.getMetrics().child(tags).countLong(BigQuerySinkUtils.BYTES_PROCESSED_METRIC, - processedBytes); + processedBytes); } @Override @@ -275,7 +327,7 @@ public static final class Config extends AbstractBigQueryActionConfig { private Config(@Nullable String project, @Nullable String serviceAccountType, @Nullable String serviceFilePath, @Nullable String serviceAccountJson, @Nullable String dataset, @Nullable String table, @Nullable String location, @Nullable String cmekKey, @Nullable String dialect, @Nullable String sql, - @Nullable String mode, @Nullable Boolean storeResults) { + @Nullable String mode, @Nullable String rowAsArguments, @Nullable Boolean storeResults) { this.project = project; this.serviceAccountType = serviceAccountType; this.serviceFilePath = serviceFilePath; @@ -287,6 +339,7 @@ private Config(@Nullable String project, @Nullable String serviceAccountType, @N this.dialect = dialect; this.sql = sql; this.mode = mode; + this.rowAsArguments = rowAsArguments; this.storeResults = storeResults; } @@ -448,6 +501,7 @@ public static class Builder { private String dialect; private String sql; private String mode; + private String rowAsArguments; private Boolean storeResults; public Builder setProject(@Nullable String project) { @@ -500,6 +554,11 @@ public Builder setMode(@Nullable String mode) { return this; } + public Builder setRowAsArguments(@Nullable String rowAsArguments) { + this.rowAsArguments = rowAsArguments; + return this; + } + public Builder setSql(@Nullable String sql) { this.sql = sql; return this; @@ -518,10 +577,10 @@ public Config build() { dialect, sql, mode, + rowAsArguments, storeResults ); } - } } } diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/exception/BigQueryJobExecutionException.java b/src/main/java/io/cdap/plugin/gcp/bigquery/exception/BigQueryJobExecutionException.java new file mode 100644 index 0000000000..6dadec8955 --- /dev/null +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/exception/BigQueryJobExecutionException.java @@ -0,0 +1,34 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.gcp.bigquery.exception; + +/** + * Custom exception class for handling errors related to BigQuery job execution. + * This exception should be thrown when an issue occurs during the execution of a BigQuery job, + * and the calling code should consider retrying the operation. + */ +public class BigQueryJobExecutionException extends Exception { + /** + * Constructs a new BigQueryJobExecutionException with the specified detail message. + * + * @param message The detail message that describes the exception. + */ + public BigQueryJobExecutionException(String message) { + super(message); + } +} + diff --git a/src/test/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecuteTest.java b/src/test/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecuteTest.java new file mode 100644 index 0000000000..e278a26999 --- /dev/null +++ b/src/test/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecuteTest.java @@ -0,0 +1,113 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.gcp.bigquery.action; + +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryError; +import com.google.cloud.bigquery.Job; +import com.google.cloud.bigquery.JobId; +import com.google.cloud.bigquery.JobInfo; +import com.google.cloud.bigquery.JobStatistics; +import com.google.cloud.bigquery.JobStatus; +import com.google.cloud.bigquery.QueryJobConfiguration; +import com.google.cloud.bigquery.TableResult; +import io.cdap.cdap.api.metrics.Metrics; +import io.cdap.cdap.etl.api.StageMetrics; +import io.cdap.cdap.etl.api.action.ActionContext; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +public class BigQueryExecuteTest { + @Mock + BigQuery bigQuery; + @Mock + Job queryJob; + @Mock + JobStatus jobStatus; + @Mock + BigQueryError bigQueryError; + @Mock + TableResult queryResults; + @Mock + JobStatistics.QueryStatistics queryStatistics; + @Mock + ActionContext context; + @Mock + StageMetrics stageMetrics; + @Mock + Metrics metrics; + QueryJobConfiguration queryJobConfiguration; + BigQueryExecute.Config config; + JobInfo jobInfo; + JobId jobId; + BigQueryExecute bq; + + @Before + public void setUp() throws InterruptedException, NoSuchMethodException { + MockitoAnnotations.initMocks(this); + + queryJobConfiguration = QueryJobConfiguration.newBuilder("select * from test").build(); + config = BigQueryExecute.Config.builder() + .setLocation("US").setProject("testProject").setRowAsArguments("false").build(); + jobId = JobId.newBuilder().setRandomJob().setLocation(config.getLocation()).build(); + jobInfo = JobInfo.newBuilder(queryJobConfiguration).setJobId(jobId).build(); + bq = new BigQueryExecute(config); + + // Mock Job Creation + Mockito.when(bigQuery.create((JobInfo) Mockito.any())).thenReturn(queryJob); + Mockito.when(queryJob.waitFor()).thenReturn(queryJob); + Mockito.when(queryJob.getStatus()).thenReturn(jobStatus); + Mockito.when(jobStatus.getError()).thenReturn(bigQueryError); + + // Mock Successful Query + Mockito.when(queryJob.getQueryResults()).thenReturn(queryResults); + Mockito.when(queryResults.getTotalRows()).thenReturn(1L); + Mockito.when(queryJob.getStatistics()).thenReturn(queryStatistics); + Mockito.when(queryStatistics.getTotalBytesProcessed()).thenReturn(1L); + + // Mock context + Mockito.when(context.getMetrics()).thenReturn(stageMetrics); + Mockito.doNothing().when(stageMetrics).gauge(Mockito.anyString(), Mockito.anyLong()); + Mockito.when(stageMetrics.child(Mockito.any())).thenReturn(metrics); + Mockito.doNothing().when(metrics).countLong(Mockito.anyString(), Mockito.anyLong()); + + } + + @Test(expected = java.lang.RuntimeException.class) + public void testExecuteQueryWithExponentialBackoffFailsWithNonRetryError() { + Mockito.when(bigQueryError.getReason()).thenReturn("accessDenied"); + bq.executeQueryWithExponentialBackoff(bigQuery, queryJobConfiguration, context); + } + + @Test(expected = java.lang.RuntimeException.class) + public void testExecuteQueryWithExponentialBackoffFailsRetryError() { + Mockito.when(bigQueryError.getReason()).thenReturn("jobBackendError"); + bq.executeQueryWithExponentialBackoff(bigQuery, queryJobConfiguration, context); + } + + @Test + public void testExecuteQueryWithExponentialBackoffSuccess() throws InterruptedException { + Mockito.when(jobStatus.getError()).thenReturn(null); + Mockito.when(queryJob.getQueryResults()).thenReturn(queryResults); + bq.executeQueryWithExponentialBackoff(bigQuery, queryJobConfiguration, context); + } +} +