Skip to content

Commit

Permalink
Added BQ Retry
Browse files Browse the repository at this point in the history
  • Loading branch information
psainics committed Nov 14, 2023
1 parent 4d207bb commit 57ae3ae
Show file tree
Hide file tree
Showing 4 changed files with 226 additions and 14 deletions.
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
<slf4j.version>1.7.5</slf4j.version>
<spark3.version>3.3.2</spark3.version>
<spark-bq-connector.version>0.23.1</spark-bq-connector.version>
<failsafe.version>3.3.2</failsafe.version>
<testSourceLocation>${project.basedir}/src/test/java/</testSourceLocation>
</properties>

Expand Down Expand Up @@ -836,6 +837,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>dev.failsafe</groupId>
<artifactId>failsafe</artifactId>
<version>${failsafe.version}</version>
</dependency>
<!-- End: dependencies used by the Spark-BigQuery connector -->
<!-- Start: dependency used by the Dataplex connector -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
import com.google.cloud.kms.v1.CryptoKeyName;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
Expand All @@ -43,16 +46,20 @@
import io.cdap.cdap.etl.api.action.Action;
import io.cdap.cdap.etl.api.action.ActionContext;
import io.cdap.cdap.etl.common.Constants;
import io.cdap.plugin.gcp.bigquery.exception.BigQueryJobExecutionException;
import io.cdap.plugin.gcp.bigquery.sink.BigQuerySinkUtils;
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
import io.cdap.plugin.gcp.common.CmekUtils;
import io.cdap.plugin.gcp.common.GCPUtils;
import org.jetbrains.annotations.TestOnly;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;

/**
Expand All @@ -69,8 +76,13 @@ public final class BigQueryExecute extends AbstractBigQueryAction {
private static final Logger LOG = LoggerFactory.getLogger(BigQueryExecute.class);
public static final String NAME = "BigQueryExecute";
private static final String RECORDS_PROCESSED = "records.processed";
private final Config config;
private static final Set<String> retryOnReason = ImmutableSet.of("jobBackendError", "jobInternalError");

private Config config;
/**
 * Constructor annotated {@code @TestOnly}: intended for direct instantiation from unit
 * tests (see BigQueryExecuteTest). Stores the plugin configuration used by run().
 */
@TestOnly
public BigQueryExecute(Config config) {
this.config = config;
}

@Override
public void run(ActionContext context) throws Exception {
Expand Down Expand Up @@ -103,9 +115,6 @@ public void run(ActionContext context) throws Exception {
// Enable legacy SQL
builder.setUseLegacySql(config.isLegacySQL());

// Location must match that of the dataset(s) referenced in the query.
JobId jobId = JobId.newBuilder().setRandomJob().setLocation(config.getLocation()).build();

// API request - starts the query.
Credentials credentials = config.getServiceAccount() == null ?
null : GCPUtils.loadServiceAccountCredentials(config.getServiceAccount(),
Expand All @@ -129,19 +138,62 @@ public void run(ActionContext context) throws Exception {

QueryJobConfiguration queryConfig = builder.build();

Job queryJob = bigQuery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());
// Exponential backoff with initial retry of 1 second and max retry of 32 seconds.
executeQueryWithExponentialBackoff(bigQuery, queryConfig, context);
}

LOG.info("Executing SQL as job {}.", jobId.getJob());
LOG.debug("The BigQuery SQL is {}", config.getSql());
/**
 * Executes the configured SQL via {@link #executeQuery}, retrying failures that raise
 * {@link BigQueryJobExecutionException} according to the exponential-backoff policy
 * returned by {@link #getRetryPolicy()}.
 *
 * @param bigQuery    authenticated BigQuery client
 * @param queryConfig fully-built query job configuration
 * @param context     action context used for metrics and arguments
 */
protected void executeQueryWithExponentialBackoff(BigQuery bigQuery,
    QueryJobConfiguration queryConfig, ActionContext context) {
  RetryPolicy<Object> retryPolicy = getRetryPolicy();
  Failsafe.with(retryPolicy)
      .run(() -> executeQuery(bigQuery, queryConfig, context, retryOnReason));
}

// Wait for the query to complete
queryJob = queryJob.waitFor();
/**
 * Builds the retry policy for BigQuery job execution: exponential backoff starting at
 * 1 second, doubling up to a 32-second cap, with at most 5 retries. Only
 * {@link BigQueryJobExecutionException} is considered retryable.
 */
private RetryPolicy<Object> getRetryPolicy() {
  return RetryPolicy.builder()
      .handle(BigQueryJobExecutionException.class)
      // 1s initial delay, doubled each attempt, capped at 32s.
      .withBackoff(Duration.ofSeconds(1), Duration.ofSeconds(32), 2)
      .withMaxRetries(5)
      .onRetry(event -> LOG.debug("Retrying BigQuery Execute job. Retry count: {}", event.getAttemptCount()))
      .onSuccess(event -> LOG.debug("BigQuery Execute job executed successfully."))
      .onRetriesExceeded(event -> LOG.error("Retry limit reached for BigQuery Execute job."))
      .build();
}

private void executeQuery(BigQuery bigQuery, QueryJobConfiguration queryConfig, ActionContext context,
Set<String> retryOnReason) throws InterruptedException, BigQueryJobExecutionException {
// Location must match that of the dataset(s) referenced in the query.
JobId jobId = JobId.newBuilder().setRandomJob().setLocation(config.getLocation()).build();
Job queryJob;

try {
queryJob = bigQuery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

LOG.info("Executing SQL as job {}.", jobId.getJob());
LOG.debug("The BigQuery SQL is {}", config.getSql());

// Wait for the query to complete
queryJob = queryJob.waitFor();
} catch (BigQueryException e) {
LOG.error("The query job {} failed. Error: {}", jobId.getJob(), e.getMessage());
if (retryOnReason.contains(e.getError().getReason())) {
throw new BigQueryJobExecutionException(e.getMessage());
}
throw new RuntimeException(e);
}

// Check for errors
if (queryJob.getStatus().getError() != null) {
// You can also look at queryJob.getStatus().getExecutionErrors() for all
// errors, not just the latest one.
throw new RuntimeException(queryJob.getStatus().getExecutionErrors().toString());
LOG.error("The query job {} failed. Error: {}", jobId.getJob(), queryJob.getStatus().getError());
if (retryOnReason.contains(queryJob.getStatus().getError().getReason())) {
throw new BigQueryJobExecutionException(queryJob.getStatus().getError().getMessage());
}
throw new RuntimeException(queryJob.getStatus().getError().getMessage());
}

TableResult queryResults = queryJob.getQueryResults();
Expand Down Expand Up @@ -181,14 +233,14 @@ public void run(ActionContext context) throws Exception {

private void recordBytesProcessedMetric(ActionContext context, Job queryJob) {
long processedBytes =
((JobStatistics.QueryStatistics) queryJob.getStatistics()).getTotalBytesProcessed();
((JobStatistics.QueryStatistics) queryJob.getStatistics()).getTotalBytesProcessed();
LOG.info("Job {} processed {} bytes", queryJob.getJobId(), processedBytes);
Map<String, String> tags = new ImmutableMap.Builder<String, String>()
.put(Constants.Metrics.Tag.APP_ENTITY_TYPE, Action.PLUGIN_TYPE)
.put(Constants.Metrics.Tag.APP_ENTITY_TYPE_NAME, BigQueryExecute.NAME)
.build();
context.getMetrics().child(tags).countLong(BigQuerySinkUtils.BYTES_PROCESSED_METRIC,
processedBytes);
processedBytes);
}

@Override
Expand Down Expand Up @@ -275,7 +327,7 @@ public static final class Config extends AbstractBigQueryActionConfig {
private Config(@Nullable String project, @Nullable String serviceAccountType, @Nullable String serviceFilePath,
@Nullable String serviceAccountJson, @Nullable String dataset, @Nullable String table,
@Nullable String location, @Nullable String cmekKey, @Nullable String dialect, @Nullable String sql,
@Nullable String mode, @Nullable Boolean storeResults) {
@Nullable String mode, @Nullable String rowAsArguments, @Nullable Boolean storeResults) {
this.project = project;
this.serviceAccountType = serviceAccountType;
this.serviceFilePath = serviceFilePath;
Expand All @@ -287,6 +339,7 @@ private Config(@Nullable String project, @Nullable String serviceAccountType, @N
this.dialect = dialect;
this.sql = sql;
this.mode = mode;
this.rowAsArguments = rowAsArguments;
this.storeResults = storeResults;
}

Expand Down Expand Up @@ -448,6 +501,7 @@ public static class Builder {
private String dialect;
private String sql;
private String mode;
private String rowAsArguments;
private Boolean storeResults;

public Builder setProject(@Nullable String project) {
Expand Down Expand Up @@ -500,6 +554,11 @@ public Builder setMode(@Nullable String mode) {
return this;
}

// Stores the rowAsArguments flag as-is.
// NOTE(review): semantics are not visible in this view — presumably controls whether a
// query result row is exposed as pipeline arguments; confirm against the plugin docs.
public Builder setRowAsArguments(@Nullable String rowAsArguments) {
this.rowAsArguments = rowAsArguments;
return this;
}

public Builder setSql(@Nullable String sql) {
this.sql = sql;
return this;
Expand All @@ -518,10 +577,10 @@ public Config build() {
dialect,
sql,
mode,
rowAsArguments,
storeResults
);
}

}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.gcp.bigquery.exception;

/**
 * Custom exception class for handling errors related to BigQuery job execution.
 * This exception should be thrown when an issue occurs during the execution of a BigQuery job,
 * and the calling code should consider retrying the operation.
 */
public class BigQueryJobExecutionException extends Exception {
  /**
   * Constructs a new BigQueryJobExecutionException with the specified detail message.
   *
   * @param message The detail message that describes the exception.
   */
  public BigQueryJobExecutionException(String message) {
    super(message);
  }

  /**
   * Constructs a new BigQueryJobExecutionException with the specified detail message
   * and cause. Chaining the underlying exception preserves its stack trace for callers
   * and log output instead of discarding it when only the message is forwarded.
   *
   * @param message The detail message that describes the exception.
   * @param cause   The underlying exception that triggered this failure.
   */
  public BigQueryJobExecutionException(String message, Throwable cause) {
    super(message, cause);
  }
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.gcp.bigquery.action;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.JobStatistics;
import com.google.cloud.bigquery.JobStatus;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;
import io.cdap.cdap.api.metrics.Metrics;
import io.cdap.cdap.etl.api.StageMetrics;
import io.cdap.cdap.etl.api.action.ActionContext;

import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

/**
 * Unit tests for the retry behavior of {@link BigQueryExecute}.
 * NOTE(review): requires JUnit 4 and Mockito on the test classpath.
 */
public class BigQueryExecuteTest {
  // Mocks for the BigQuery client and the job/result object graph it returns.
  @Mock
  BigQuery bigQuery;
  @Mock
  Job queryJob;
  @Mock
  JobStatus jobStatus;
  @Mock
  BigQueryError bigQueryError;
  @Mock
  TableResult queryResults;
  @Mock
  JobStatistics.QueryStatistics queryStatistics;
  // Mocks for the CDAP action context and its metrics sinks.
  @Mock
  ActionContext context;
  @Mock
  StageMetrics stageMetrics;
  @Mock
  Metrics metrics;
  // Real (non-mock) objects used to drive the action under test.
  QueryJobConfiguration queryJobConfiguration;
  BigQueryExecute.Config config;
  JobInfo jobInfo;
  JobId jobId;
  BigQueryExecute bq;

  @Before
  public void setUp() throws InterruptedException, NoSuchMethodException {
    MockitoAnnotations.initMocks(this);

    // Build a real query configuration and plugin config for the action under test.
    queryJobConfiguration = QueryJobConfiguration.newBuilder("select * from test").build();
    config = BigQueryExecute.Config.builder()
      .setLocation("US").setProject("testProject").setRowAsArguments("false").build();
    jobId = JobId.newBuilder().setRandomJob().setLocation(config.getLocation()).build();
    jobInfo = JobInfo.newBuilder(queryJobConfiguration).setJobId(jobId).build();
    bq = new BigQueryExecute(config);

    // Mock Job Creation
    Mockito.when(bigQuery.create((JobInfo) Mockito.any())).thenReturn(queryJob);
    Mockito.when(queryJob.waitFor()).thenReturn(queryJob);
    Mockito.when(queryJob.getStatus()).thenReturn(jobStatus);
    // Default: the job status reports an error; each test either sets the error's
    // reason or (for the success case) overrides getError() to return null.
    Mockito.when(jobStatus.getError()).thenReturn(bigQueryError);

    // Mock Successful Query
    Mockito.when(queryJob.getQueryResults()).thenReturn(queryResults);
    Mockito.when(queryResults.getTotalRows()).thenReturn(1L);
    Mockito.when(queryJob.getStatistics()).thenReturn(queryStatistics);
    Mockito.when(queryStatistics.getTotalBytesProcessed()).thenReturn(1L);

    // Mock context
    Mockito.when(context.getMetrics()).thenReturn(stageMetrics);
    Mockito.doNothing().when(stageMetrics).gauge(Mockito.anyString(), Mockito.anyLong());
    Mockito.when(stageMetrics.child(Mockito.any())).thenReturn(metrics);
    Mockito.doNothing().when(metrics).countLong(Mockito.anyString(), Mockito.anyLong());

  }

  // A non-retryable error reason ("accessDenied" is not in retryOnReason) should fail
  // immediately with a RuntimeException, without any retries.
  @Test(expected = java.lang.RuntimeException.class)
  public void testExecuteQueryWithExponentialBackoffFailsWithNonRetryError() {
    Mockito.when(bigQueryError.getReason()).thenReturn("accessDenied");
    bq.executeQueryWithExponentialBackoff(bigQuery, queryJobConfiguration, context);
  }

  // A retryable reason ("jobBackendError") is retried until the policy is exhausted and
  // the failure surfaces as an unchecked exception.
  // NOTE(review): this test appears to sit through the production policy's real backoff
  // (5 retries, 1s initial / 32s max delay per getRetryPolicy), making it very slow;
  // consider making the retry policy injectable for tests — confirm with maintainers.
  @Test(expected = java.lang.RuntimeException.class)
  public void testExecuteQueryWithExponentialBackoffFailsRetryError() {
    Mockito.when(bigQueryError.getReason()).thenReturn("jobBackendError");
    bq.executeQueryWithExponentialBackoff(bigQuery, queryJobConfiguration, context);
  }

  // With no job error, execution completes normally and no exception is thrown.
  @Test
  public void testExecuteQueryWithExponentialBackoffSuccess() throws InterruptedException {
    Mockito.when(jobStatus.getError()).thenReturn(null);
    Mockito.when(queryJob.getQueryResults()).thenReturn(queryResults);
    bq.executeQueryWithExponentialBackoff(bigQuery, queryJobConfiguration, context);
  }
}

0 comments on commit 57ae3ae

Please sign in to comment.