diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java b/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java index c1edbe7186..4f82e1685d 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java @@ -46,6 +46,7 @@ import io.cdap.cdap.etl.api.action.Action; import io.cdap.cdap.etl.api.action.ActionContext; import io.cdap.cdap.etl.common.Constants; +import io.cdap.plugin.gcp.bigquery.exception.BigQueryJobExecutionException; import io.cdap.plugin.gcp.bigquery.sink.BigQuerySinkUtils; import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil; import io.cdap.plugin.gcp.common.CmekUtils; @@ -55,7 +56,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.net.ConnectException; import java.time.Duration; import java.util.Collections; import java.util.Map; @@ -148,7 +148,7 @@ protected void executeQueryWithExponentialBackoff(Duration initialRetryDuration, int multiplier, int maxRetryCount, BigQuery bigQuery, QueryJobConfiguration queryConfig, ActionContext context) { RetryPolicy retryPolicy = RetryPolicy.builder() - .handle(ConnectException.class) + .handle(BigQueryJobExecutionException.class) .withBackoff(initialRetryDuration, maxRetryDuration, multiplier) .withMaxRetries(maxRetryCount) .onRetry(event -> LOG.debug("Retrying BigQuery Execute job. Retry count: {}", event.getAttemptCount())) @@ -160,16 +160,25 @@ protected void executeQueryWithExponentialBackoff(Duration initialRetryDuration, } private void executeQuery(BigQuery bigQuery, QueryJobConfiguration queryConfig, ActionContext context, - Set retryOnReason) throws InterruptedException, ConnectException { + Set retryOnReason) throws InterruptedException, BigQueryJobExecutionException { // Location must match that of the dataset(s) referenced in the query. JobId jobId = JobId.newBuilder().setRandomJob().setLocation(config.getLocation()).build(); - Job queryJob = bigQuery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build()); + Job queryJob; - LOG.info("Executing SQL as job {}.", jobId.getJob()); - LOG.debug("The BigQuery SQL is {}", config.getSql()); + try { + queryJob = bigQuery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build()); + + LOG.info("Executing SQL as job {}.", jobId.getJob()); + LOG.debug("The BigQuery SQL is {}", config.getSql()); - // Wait for the query to complete - queryJob = queryJob.waitFor(); + // Wait for the query to complete + queryJob = queryJob.waitFor(); + } catch (BigQueryException e) { + if (retryOnReason.contains(e.getError().getReason())) { + throw new BigQueryJobExecutionException(e.getMessage()); + } + throw new RuntimeException(e); + } // Check for errors if (queryJob.getStatus().getError() != null) { @@ -177,7 +186,7 @@ private void executeQuery(BigQuery bigQuery, QueryJobConfiguration queryConfig, // errors, not just the latest one. LOG.error("The query job {} failed. Error: {}", jobId.getJob(), queryJob.getStatus().getError()); if (retryOnReason.contains(queryJob.getStatus().getError().getReason())) { - throw new ConnectException(queryJob.getStatus().getError().getMessage()); + throw new BigQueryJobExecutionException(queryJob.getStatus().getError().getMessage()); } throw new RuntimeException(queryJob.getStatus().getError().getMessage()); } diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/exception/BigQueryJobExecutionException.java b/src/main/java/io/cdap/plugin/gcp/bigquery/exception/BigQueryJobExecutionException.java new file mode 100644 index 0000000000..6dadec8955 --- /dev/null +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/exception/BigQueryJobExecutionException.java @@ -0,0 +1,34 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.gcp.bigquery.exception; + +/** + * Custom exception class for handling errors related to BigQuery job execution. + * This exception should be thrown when an issue occurs during the execution of a BigQuery job, + * and the calling code should consider retrying the operation. + */ +public class BigQueryJobExecutionException extends Exception { + /** + * Constructs a new BigQueryJobExecutionException with the specified detail message. + * + * @param message The detail message that describes the exception. + */ + public BigQueryJobExecutionException(String message) { + super(message); + } +} +