diff --git a/pom.xml b/pom.xml
index 0096ddcd84..fc710f9c03 100644
--- a/pom.xml
+++ b/pom.xml
@@ -98,6 +98,7 @@
1.7.5
3.3.2
0.23.1
+    <failsafe.version>3.3.2</failsafe.version>
${project.basedir}/src/test/java/
@@ -840,6 +841,11 @@
+    <dependency>
+      <groupId>dev.failsafe</groupId>
+      <artifactId>failsafe</artifactId>
+      <version>${failsafe.version}</version>
+    </dependency>
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java b/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java
index 87d778a89e..fbb96c1f16 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java
@@ -33,8 +33,13 @@
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableResult;
import com.google.cloud.kms.v1.CryptoKeyName;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import dev.failsafe.Failsafe;
+import dev.failsafe.FailsafeException;
+import dev.failsafe.RetryPolicy;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
@@ -43,6 +48,7 @@
import io.cdap.cdap.etl.api.action.Action;
import io.cdap.cdap.etl.api.action.ActionContext;
import io.cdap.cdap.etl.common.Constants;
+import io.cdap.plugin.gcp.bigquery.exception.BigQueryJobExecutionException;
import io.cdap.plugin.gcp.bigquery.sink.BigQuerySinkUtils;
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
import io.cdap.plugin.gcp.common.CmekUtils;
@@ -51,8 +57,10 @@
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.time.Duration;
import java.util.Collections;
import java.util.Map;
+import java.util.Set;
import javax.annotation.Nullable;
/**
@@ -69,8 +77,18 @@ public final class BigQueryExecute extends AbstractBigQueryAction {
private static final Logger LOG = LoggerFactory.getLogger(BigQueryExecute.class);
public static final String NAME = "BigQueryExecute";
private static final String RECORDS_PROCESSED = "records.processed";
-
private Config config;
+ private static final String JOB_BACKEND_ERROR = "jobBackendError";
+ private static final String JOB_INTERNAL_ERROR = "jobInternalError";
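+  // BigQuery job error reasons treated as transient and eligible for retry.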
+  private static final Set<String> RETRY_ON_REASON = ImmutableSet.of(JOB_BACKEND_ERROR, JOB_INTERNAL_ERROR);
+
+ BigQueryExecute() {
+ // no args constructor
+ }
+ @VisibleForTesting
+ BigQueryExecute(Config config) {
+ this.config = config;
+ }
@Override
public void run(ActionContext context) throws Exception {
@@ -103,9 +121,6 @@ public void run(ActionContext context) throws Exception {
// Enable legacy SQL
builder.setUseLegacySql(config.isLegacySQL());
- // Location must match that of the dataset(s) referenced in the query.
- JobId jobId = JobId.newBuilder().setRandomJob().setLocation(config.getLocation()).build();
-
// API request - starts the query.
Credentials credentials = config.getServiceAccount() == null ?
null : GCPUtils.loadServiceAccountCredentials(config.getServiceAccount(),
@@ -129,19 +144,74 @@ public void run(ActionContext context) throws Exception {
QueryJobConfiguration queryConfig = builder.build();
- Job queryJob = bigQuery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());
+    // Optionally retry the query with exponential backoff on transient backend errors.
+ if (config.getRetryOnBackendError()) {
+ try {
+ executeQueryWithExponentialBackoff(bigQuery, queryConfig, context);
+ } catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ executeQuery(bigQuery, queryConfig, context);
+ }
+ }
+
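+  /**
+   * Executes the query through Failsafe so that jobs failing with retryable backend or internal
+   * errors are retried with exponential backoff; when retries are exhausted the underlying
+   * cause is rethrown.
+   */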
+ protected void executeQueryWithExponentialBackoff(BigQuery bigQuery,
+ QueryJobConfiguration queryConfig, ActionContext context)
+ throws Throwable {
+ try {
+ Failsafe.with(getRetryPolicy()).run(() -> executeQuery(bigQuery, queryConfig, context));
+ } catch (FailsafeException e) {
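+      // Unwrap and rethrow the original cause so callers see the underlying error rather than the Failsafe wrapper.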
+ if (e.getCause() != null) {
+ throw e.getCause();
+ }
+ throw e;
+ }
+ }
+
+ private RetryPolicy