diff --git a/docs/BigQueryExecute-action.md b/docs/BigQueryExecute-action.md
index 961b59504b..daec65e72f 100644
--- a/docs/BigQueryExecute-action.md
+++ b/docs/BigQueryExecute-action.md
@@ -65,3 +65,15 @@ authorization. Can be set to 'auto-detect' when running on a Dataproc cluster.
When running on other clusters, the file must be present on every node in the cluster.
* **JSON**: Contents of the service account JSON file.
+
+Retry Configuration
+----------
+**Retry On Backend Error**: Whether to retry when a backend error occurs. Default is false. (Macro-enabled)
+
+**Initial Retry Duration (Seconds)**: Time to wait before the first retry. Default is 5 seconds. (Macro-enabled)
+
+**Max Retry Duration (Seconds)**: Maximum wait time for a retry, in seconds. Default is 32 seconds. (Macro-enabled)
+
+**Max Retry Count**: Maximum number of retries allowed. Default is 5. (Macro-enabled)
+
+**Retry Multiplier**: Multiplier used to calculate the next retry duration. Default is 2. (Macro-enabled)
\ No newline at end of file
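For illustration, here is a minimal sketch of how these settings could be expressed with the `dev.failsafe` library that this change adds to `pom.xml`. It is not the plugin's actual `getRetryPolicy()` implementation (which is not fully reproduced in this diff); the values are the documented defaults and the `RuntimeException` handler is only a stand-in for the plugin's retryable failure type:

```java
import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import java.time.temporal.ChronoUnit;

public class RetryPolicySketch {
  public static void main(String[] args) {
    // Documented defaults: 5 s initial delay, 32 s max delay, multiplier 2, at most 5 retries.
    RetryPolicy<Object> retryPolicy = RetryPolicy.<Object>builder()
        .handle(RuntimeException.class)             // stand-in for the plugin's retryable exception
        .withBackoff(5, 32, ChronoUnit.SECONDS, 2)  // delays grow 5s, 10s, 20s, then cap at 32s
        .withMaxRetries(5)
        .build();

    // Failsafe re-runs the lambda until it succeeds or the policy is exhausted.
    Failsafe.with(retryPolicy).run(() -> System.out.println("submitting BigQuery job..."));
  }
}
```

Each failed attempt multiplies the previous delay by the Retry Multiplier until it reaches the Max Retry Duration, and retries stop once the Max Retry Count is exhausted.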
diff --git a/pom.xml b/pom.xml
index 0096ddcd84..fc710f9c03 100644
--- a/pom.xml
+++ b/pom.xml
@@ -98,6 +98,7 @@
1.7.5
3.3.2
0.23.1
+    <failsafe.version>3.3.2</failsafe.version>
${project.basedir}/src/test/java/
@@ -840,6 +841,11 @@
+    <dependency>
+      <groupId>dev.failsafe</groupId>
+      <artifactId>failsafe</artifactId>
+      <version>${failsafe.version}</version>
+    </dependency>
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java b/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java
index 78455fdb4d..77e80733d2 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java
@@ -33,8 +33,12 @@
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableResult;
import com.google.cloud.kms.v1.CryptoKeyName;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import dev.failsafe.Failsafe;
+import dev.failsafe.RetryPolicy;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
@@ -43,6 +47,7 @@
import io.cdap.cdap.etl.api.action.Action;
import io.cdap.cdap.etl.api.action.ActionContext;
import io.cdap.cdap.etl.common.Constants;
+import io.cdap.plugin.gcp.bigquery.exception.BigQueryJobExecutionException;
import io.cdap.plugin.gcp.bigquery.sink.BigQuerySinkUtils;
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
import io.cdap.plugin.gcp.common.CmekUtils;
@@ -51,8 +56,10 @@
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.time.Duration;
import java.util.Collections;
import java.util.Map;
+import java.util.Set;
import javax.annotation.Nullable;
/**
@@ -69,8 +76,18 @@ public final class BigQueryExecute extends AbstractBigQueryAction {
private static final Logger LOG = LoggerFactory.getLogger(BigQueryExecute.class);
public static final String NAME = "BigQueryExecute";
private static final String RECORDS_PROCESSED = "records.processed";
-
private Config config;
+ private static final String JOB_BACKEND_ERROR = "jobBackendError";
+ private static final String JOB_INTERNAL_ERROR = "jobInternalError";
+  private static final Set<String> RETRY_ON_REASON = ImmutableSet.of(JOB_BACKEND_ERROR, JOB_INTERNAL_ERROR);
+
+ BigQueryExecute() {
+ // no args constructor
+ }
+ @VisibleForTesting
+ BigQueryExecute(Config config) {
+ this.config = config;
+ }
@Override
public void run(ActionContext context) throws Exception {
@@ -103,9 +120,6 @@ public void run(ActionContext context) throws Exception {
// Enable legacy SQL
builder.setUseLegacySql(config.isLegacySQL());
- // Location must match that of the dataset(s) referenced in the query.
- JobId jobId = JobId.newBuilder().setRandomJob().setLocation(config.getLocation()).build();
-
// API request - starts the query.
Credentials credentials = config.getServiceAccount() == null ?
null : GCPUtils.loadServiceAccountCredentials(config.getServiceAccount(),
@@ -129,19 +143,66 @@ public void run(ActionContext context) throws Exception {
QueryJobConfiguration queryConfig = builder.build();
- Job queryJob = bigQuery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());
+    // Execute the query, retrying with exponential backoff on retryable errors.
+ executeQueryWithExponentialBackoff(bigQuery, queryConfig, context);
+ }
+
+ protected void executeQueryWithExponentialBackoff(BigQuery bigQuery,
+ QueryJobConfiguration queryConfig, ActionContext context)
+ throws BigQueryJobExecutionException {
+ try {
+ Failsafe.with(getRetryPolicy()).run(() -> executeQuery(bigQuery, queryConfig, context));
+ } catch (RuntimeException e) {
+ String errorMessage = "Failed to execute BigQuery job. Reason: " + e.getMessage();
+ LOG.error(errorMessage);
+ throw new BigQueryJobExecutionException(errorMessage, e);
+ }
+
+ }
+
+ private RetryPolicy