diff --git a/src/main/java/io/cdap/plugin/salesforce/SalesforceBulkUtil.java b/src/main/java/io/cdap/plugin/salesforce/SalesforceBulkUtil.java index 0b841087..ece84d3d 100644 --- a/src/main/java/io/cdap/plugin/salesforce/SalesforceBulkUtil.java +++ b/src/main/java/io/cdap/plugin/salesforce/SalesforceBulkUtil.java @@ -32,6 +32,7 @@ import com.sforce.ws.ConnectionException; import com.sforce.ws.ConnectorConfig; import com.sforce.ws.SessionRenewer; +import io.cdap.plugin.salesforce.plugin.source.batch.util.BulkConnectionRetryWrapper; import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSourceConstants; import org.awaitility.Awaitility; import org.slf4j.Logger; @@ -65,8 +66,8 @@ public final class SalesforceBulkUtil { * @throws AsyncApiException if there is an issue creating the job */ - public static JobInfo createJob(BulkConnection bulkConnection, String sObject, OperationEnum operationEnum, - @Nullable String externalIdField, + public static JobInfo createJob(BulkConnectionRetryWrapper bulkConnection, String sObject, + OperationEnum operationEnum, @Nullable String externalIdField, ConcurrencyMode concurrencyMode, ContentType contentType) throws AsyncApiException { JobInfo job = new JobInfo(); job.setObject(sObject); diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceOutputFormatProvider.java b/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceOutputFormatProvider.java index 653026c3..886052c0 100644 --- a/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceOutputFormatProvider.java +++ b/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceOutputFormatProvider.java @@ -27,6 +27,8 @@ import io.cdap.plugin.salesforce.authenticator.Authenticator; import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials; import io.cdap.plugin.salesforce.plugin.OAuthInfo; +import io.cdap.plugin.salesforce.plugin.source.batch.util.BulkConnectionRetryWrapper; +import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSourceConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,7 +57,11 @@ public SalesforceOutputFormatProvider(SalesforceSinkConfig config) { .put(SalesforceSinkConstants.CONFIG_MAX_BYTES_PER_BATCH, config.getMaxBytesPerBatch().toString()) .put(SalesforceSinkConstants.CONFIG_MAX_RECORDS_PER_BATCH, config.getMaxRecordsPerBatch().toString()) .put(SalesforceConstants.CONFIG_CONNECT_TIMEOUT, config.getConnection().getConnectTimeout().toString()) - .put(SalesforceConstants.CONFIG_READ_TIMEOUT, config.getConnection().getReadTimeout().toString()); + .put(SalesforceConstants.CONFIG_READ_TIMEOUT, config.getConnection().getReadTimeout().toString()) + .put(SalesforceSourceConstants.CONFIG_INITIAL_RETRY_DURATION, Long.toString(config.getInitialRetryDuration())) + .put(SalesforceSourceConstants.CONFIG_MAX_RETRY_DURATION, Long.toString(config.getMaxRetryDuration())) + .put(SalesforceSourceConstants.CONFIG_MAX_RETRY_COUNT, Integer.toString(config.getMaxRetryCount())) + .put(SalesforceSourceConstants.CONFIG_RETRY_REQUIRED, Boolean.toString(config.isRetryRequired())); if (!Strings.isNullOrEmpty(config.getConnection().getProxyUrl())) { configBuilder.put(SalesforceConstants.CONFIG_PROXY_URL, config.getConnection().getProxyUrl()); @@ -83,7 +89,9 @@ public SalesforceOutputFormatProvider(SalesforceSinkConfig config) { try { BulkConnection bulkConnection = new BulkConnection(Authenticator.createConnectorConfig(credentials)); - JobInfo job = SalesforceBulkUtil.createJob(bulkConnection, config.getSObject(), config.getOperationEnum(), + BulkConnectionRetryWrapper retryWrapper = new BulkConnectionRetryWrapper(bulkConnection, config.isRetryRequired(), + config.getInitialRetryDuration(), config.getMaxRetryDuration(), config.getMaxRetryCount()); + JobInfo job = SalesforceBulkUtil.createJob(retryWrapper, config.getSObject(), config.getOperationEnum(), config.getExternalIdField(), config.getConcurrencyModeEnum(), ContentType.ZIP_CSV); configBuilder.put(SalesforceSinkConstants.CONFIG_JOB_ID, job.getId()); diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceSinkConfig.java b/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceSinkConfig.java index 73c2ec72..023faeda 100644 --- a/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceSinkConfig.java +++ b/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceSinkConfig.java @@ -41,6 +41,7 @@ import io.cdap.plugin.salesforce.plugin.SalesforceConnectorBaseConfig; import io.cdap.plugin.salesforce.plugin.SalesforceConnectorInfo; import io.cdap.plugin.salesforce.plugin.connector.SalesforceConnectorConfig; +import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSourceConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -152,6 +153,26 @@ public class SalesforceSinkConfig extends ReferencePluginConfig { @Description("Whether to validate the field data types of the input schema as per Salesforce specific data types") private final Boolean datatypeValidation; + @Name(SalesforceSourceConstants.PROPERTY_INITIAL_RETRY_DURATION) + @Description("Time taken for the first retry. Default is 5 seconds.") + @Nullable + private Long initialRetryDuration; + + @Name(SalesforceSourceConstants.PROPERTY_MAX_RETRY_DURATION) + @Description("Maximum time in seconds retries can take. Default is 80 seconds.") + @Nullable + private Long maxRetryDuration; + + @Name(SalesforceSourceConstants.PROPERTY_MAX_RETRY_COUNT) + @Description("Maximum number of retries allowed. Default is 5.") + @Nullable + private Integer maxRetryCount; + + @Name(SalesforceSourceConstants.PROPERTY_RETRY_REQUIRED) + @Description("Retry is required or not for some of the internal call failures") + @Nullable + private Boolean retryOnBackendError; + public SalesforceSinkConfig(String referenceName, @Nullable String clientId, @Nullable String clientSecret, @@ -277,6 +298,23 @@ public String getOrgId(OAuthInfo oAuthInfo) throws ConnectionException { return partnerConnection.getUserInfo().getOrganizationId(); } + public boolean isRetryRequired() { + return retryOnBackendError == null || retryOnBackendError; + } + + public long getInitialRetryDuration() { + return initialRetryDuration == null ? SalesforceSourceConstants.DEFAULT_INITIAL_RETRY_DURATION_SECONDS : + initialRetryDuration; + } + + public long getMaxRetryDuration() { + return maxRetryDuration == null ? SalesforceSourceConstants.DEFULT_MAX_RETRY_DURATION_SECONDS : maxRetryDuration; + } + + public int getMaxRetryCount() { + return maxRetryCount == null ? SalesforceSourceConstants.DEFAULT_MAX_RETRY_COUNT : maxRetryCount; + } + public void validate(Schema schema, FailureCollector collector, @Nullable OAuthInfo oAuthInfo) { if (connection != null) { getConnection().validate(collector, oAuthInfo); diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchMultiSource.java b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchMultiSource.java index f668e1fe..b8dd1434 100644 --- a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchMultiSource.java +++ b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchMultiSource.java @@ -39,6 +39,7 @@ import io.cdap.plugin.salesforce.SalesforceConstants; import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials; import io.cdap.plugin.salesforce.plugin.OAuthInfo; +import io.cdap.plugin.salesforce.plugin.source.batch.util.BulkConnectionRetryWrapper; import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSplitUtil; import java.util.ArrayList; @@ -103,8 +104,11 @@ public void prepareRun(BatchSourceContext context) throws ConnectionException { String sObjectNameField = config.getSObjectNameField(); authenticatorCredentials = config.getConnection().getAuthenticatorCredentials(); BulkConnection bulkConnection = SalesforceSplitUtil.getBulkConnection(authenticatorCredentials); + BulkConnectionRetryWrapper bulkConnectionRetryWrapper = new BulkConnectionRetryWrapper(bulkConnection, + config.isRetryRequired(), config.getInitialRetryDuration(), config.getMaxRetryDuration(), + config.getMaxRetryCount()); List querySplits = queries.parallelStream() - .map(query -> SalesforceSplitUtil.getQuerySplits(query, bulkConnection, false, config.getOperation(), + .map(query -> SalesforceSplitUtil.getQuerySplits(query, bulkConnectionRetryWrapper, false, config.getOperation(), config.getInitialRetryDuration(), config.getMaxRetryDuration(), config.getMaxRetryCount(), config.isRetryRequired())) .flatMap(Collection::stream).collect(Collectors.toList()); diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchSource.java b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchSource.java index d7e87c45..83c1eaf3 100644 --- a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchSource.java +++ b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchSource.java @@ -45,6 +45,7 @@ import io.cdap.plugin.salesforce.SalesforceSchemaUtil; import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials; import io.cdap.plugin.salesforce.plugin.OAuthInfo; +import io.cdap.plugin.salesforce.plugin.source.batch.util.BulkConnectionRetryWrapper; import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSourceConstants; import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSplitUtil; @@ -164,7 +165,10 @@ public static List getSplits( bulkConnection.addHeader(SalesforceSourceConstants.HEADER_ENABLE_PK_CHUNK, String.join(";", chunkHeaderValues)); } - List querySplits = SalesforceSplitUtil.getQuerySplits(query, bulkConnection, + BulkConnectionRetryWrapper bulkConnectionRetryWrapper = new BulkConnectionRetryWrapper(bulkConnection, + config.isRetryRequired(), config.getInitialRetryDuration(), config.getMaxRetryDuration(), + config.getMaxRetryCount()); + List querySplits = SalesforceSplitUtil.getQuerySplits(query, bulkConnectionRetryWrapper, enablePKChunk, config.getOperation(), config.getInitialRetryDuration(), config.getMaxRetryDuration(), config.getMaxRetryCount(), config.isRetryRequired()); return querySplits; diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBulkRecordReader.java b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBulkRecordReader.java index bbf876d8..8af8884d 100644 --- a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBulkRecordReader.java +++ b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBulkRecordReader.java @@ -28,9 +28,9 @@ import io.cdap.cdap.api.data.schema.Schema; import io.cdap.plugin.salesforce.BulkAPIBatchException; import io.cdap.plugin.salesforce.SalesforceConnectionUtil; -import io.cdap.plugin.salesforce.SalesforceConstants; import io.cdap.plugin.salesforce.authenticator.Authenticator; import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials; +import io.cdap.plugin.salesforce.plugin.source.batch.util.BulkConnectionRetryWrapper; import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceQueryExecutionException; import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSourceConstants; import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSplitUtil; @@ -78,6 +78,7 @@ public class SalesforceBulkRecordReader extends RecordReader getQueryResultStream(bulkConnection)); - } else { - queryResponseStream = bulkConnection.getQueryResultStream(jobId, batchId, resultIds[resultIdIndex]); - } - + final InputStream queryResponseStream = bulkConnectionRetryWrapper + .getQueryResultStream(jobId, batchId, resultIds[resultIdIndex]); CSVFormat csvFormat = CSVFormat.DEFAULT .withHeader() .withQuoteMode(QuoteMode.ALL) diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/BulkConnectionRetryWrapper.java b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/BulkConnectionRetryWrapper.java new file mode 100644 index 00000000..ae74f603 --- /dev/null +++ b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/BulkConnectionRetryWrapper.java @@ -0,0 +1,209 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package io.cdap.plugin.salesforce.plugin.source.batch.util; + +import com.sforce.async.AsyncApiException; +import com.sforce.async.BatchInfo; +import com.sforce.async.BatchInfoList; +import com.sforce.async.BulkConnection; +import com.sforce.async.JobInfo; +import com.sforce.ws.ConnectorConfig; +import dev.failsafe.Failsafe; +import dev.failsafe.RetryPolicy; +import io.cdap.plugin.salesforce.plugin.source.batch.SalesforceBulkRecordReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.time.Duration; + +/** + * BulkConnectionRetryWrapper class to retry all the salesforce api calls in case of failure. + */ +public class BulkConnectionRetryWrapper { + + private final BulkConnection bulkConnection; + private final RetryPolicy retryPolicy; + private static final Logger LOG = LoggerFactory.getLogger(BulkConnectionRetryWrapper.class); + private final boolean retryOnBackendError; + private final long maxRetryDuration; + private final int maxRetryCount; + private final long initialRetryDuration; + + public BulkConnectionRetryWrapper(BulkConnection bulkConnection, boolean isRetryRequired, + long initialRetryDuration, long maxRetryDuration, int maxRetryCount) { + this.bulkConnection = bulkConnection; + this.retryOnBackendError = isRetryRequired; + this.initialRetryDuration = initialRetryDuration; + this.maxRetryDuration = maxRetryDuration; + this.maxRetryCount = maxRetryCount; + this.retryPolicy = SalesforceSplitUtil.getRetryPolicy(initialRetryDuration, maxRetryDuration, maxRetryCount); + } + + public JobInfo createJob(JobInfo jobInfo) throws AsyncApiException { + if (!retryOnBackendError) { + return bulkConnection.createJob(jobInfo); + } + Object resultJobInfo = Failsafe.with(retryPolicy) + .onFailure(event -> LOG.info("Failed while creating job")) + .get(() -> { + try { + return bulkConnection.createJob(jobInfo); + } catch (AsyncApiException e) { + throw new SalesforceQueryExecutionException(e.getMessage()); + } + }); + return (JobInfo) resultJobInfo; + } + + public JobInfo getJobStatus(String jobId) throws AsyncApiException { + if (!retryOnBackendError) { + return bulkConnection.getJobStatus(jobId); + } + Object resultJobInfo = Failsafe.with(retryPolicy) + .onFailure(event -> LOG.info("Failed while getting job status")) + .get(() -> { + try { + return bulkConnection.getJobStatus(jobId); + } catch (AsyncApiException e) { + throw new SalesforceQueryExecutionException(e.getMessage()); + } + }); + return (JobInfo) resultJobInfo; + } + + public void updateJob(JobInfo jobInfo) throws AsyncApiException { + if (!retryOnBackendError) { + bulkConnection.updateJob(jobInfo); + return; + } + Failsafe.with(retryPolicy) + .onFailure(event -> LOG.info("Failed while updating job.")) + .get(() -> { + try { + return bulkConnection.updateJob(jobInfo); + } catch (AsyncApiException e) { + throw new SalesforceQueryExecutionException(e.getMessage()); + } + }); + } + + public BatchInfoList getBatchInfoList(String jobId) throws AsyncApiException { + if (!retryOnBackendError) { + return bulkConnection.getBatchInfoList(jobId); + } + Object batchInfoList = Failsafe.with(retryPolicy) + .onFailure(event -> LOG.info("Failed while getting batch info list")) + .get(() -> { + try { + return bulkConnection.getBatchInfoList(jobId); + } catch (AsyncApiException e) { + throw new SalesforceQueryExecutionException(e.getMessage()); + } + }); + return (BatchInfoList) batchInfoList; + } + + public BatchInfo getBatchInfo(String jobId, String batchId) throws AsyncApiException { + if (!retryOnBackendError) { + return bulkConnection.getBatchInfo(jobId, batchId); + } + Object batchInfo = Failsafe.with(retryPolicy) + .onFailure(event -> LOG.info("Failed while getting batc status")) + .get(() -> { + try { + return bulkConnection.getBatchInfo(jobId, batchId); + } catch (AsyncApiException e) { + throw new SalesforceQueryExecutionException(e.getMessage()); + } + }); + return (BatchInfo) batchInfo; + } + + public InputStream getBatchResultStream(String jobId, String batchId) throws AsyncApiException { + if (!retryOnBackendError) { + return bulkConnection.getBatchResultStream(jobId, batchId); + } + Object inputStream = Failsafe.with(retryPolicy) + .onFailure(event -> LOG.info("Failed while getting batch result stream")) + .get(() -> { + try { + return bulkConnection.getBatchResultStream(jobId, batchId); + } catch (AsyncApiException e) { + throw new SalesforceQueryExecutionException(e.getMessage()); + } + }); + return (InputStream) inputStream; + } + + public InputStream getQueryResultStream(String jobId, String batchId, String resultId) throws AsyncApiException { + if (!retryOnBackendError) { + return bulkConnection.getQueryResultStream(jobId, batchId, resultId); + } + Object inputStream = Failsafe.with(retryPolicy) + .onFailure(event -> LOG.info("Failed while getting query result stream")) + .get(() -> { + try { + return bulkConnection.getQueryResultStream(jobId, batchId, resultId); + } catch (AsyncApiException e) { + throw new SalesforceQueryExecutionException(e.getMessage()); + } + }); + return (InputStream) inputStream; + } + + public BatchInfo createBatchFromStream(String query, JobInfo job) throws AsyncApiException, + SalesforceQueryExecutionException, IOException { + if (!retryOnBackendError) { + return createBatchFromStreamI(query, job); + } + Object batchInfo = Failsafe.with(retryPolicy) + .onFailure(event -> LOG.info("Failed while creating batch from stream")) + .get(() -> { + try { + return createBatchFromStream(query, job); + } catch (AsyncApiException e) { + throw new SalesforceQueryExecutionException(e.getMessage()); + } + }); + return (BatchInfo) batchInfo; + } + + private BatchInfo createBatchFromStreamI(String query, JobInfo job) throws + SalesforceQueryExecutionException, IOException, AsyncApiException { + BatchInfo batchInfo = null; + try (ByteArrayInputStream bout = new ByteArrayInputStream(query.getBytes())) { + batchInfo = bulkConnection.createBatchFromStream(job, bout); + } catch (AsyncApiException exception) { + LOG.warn("The bulk query job {} failed. Job State: {}", job.getId(), job.getState()); + if (SalesforceBulkRecordReader.RETRY_ON_REASON.contains(exception.getExceptionCode())) { + throw new SalesforceQueryExecutionException(exception); + } + throw exception; + } + return batchInfo; + } + + public ConnectorConfig getConfig() { + return bulkConnection.getConfig(); + } + + public BulkConnection getBukConnection() { + return bulkConnection; + } +} diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSplitUtil.java b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSplitUtil.java index 6223c292..80a48389 100644 --- a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSplitUtil.java +++ b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSplitUtil.java @@ -65,7 +65,7 @@ public final class SalesforceSplitUtil { * @param enablePKChunk indicates if pk chunking is enabled * @return list of salesforce splits */ - public static List getQuerySplits(String query, BulkConnection bulkConnection, + public static List getQuerySplits(String query, BulkConnectionRetryWrapper bulkConnection, boolean enablePKChunk, String operation, Long initialRetryDuration, Long maxRetryDuration, Integer maxRetryCount, Boolean retryOnBackendError) { @@ -85,7 +85,7 @@ public static List getQuerySplits(String query, BulkConnection * @param enablePKChunk enable PK Chunking * @return array of batch info */ - private static BatchInfo[] getBatches(String query, BulkConnection bulkConnection, + private static BatchInfo[] getBatches(String query, BulkConnectionRetryWrapper bulkConnection, boolean enablePKChunk, String operation, Long initialRetryDuration, Long maxRetryDuration, Integer maxRetryCount, Boolean retryOnBackendError) { @@ -114,7 +114,7 @@ private static BatchInfo[] getBatches(String query, BulkConnection bulkConnectio * @throws AsyncApiException if there is an issue creating the job * @throws IOException failed to close the query */ - private static BatchInfo[] runBulkQuery(BulkConnection bulkConnection, String query, + private static BatchInfo[] runBulkQuery(BulkConnectionRetryWrapper bulkConnection, String query, boolean enablePKChunk, String operation, Long initialRetryDuration, Long maxRetryDuration, Integer maxRetryCount, Boolean retryOnBackendError) @@ -123,17 +123,9 @@ private static BatchInfo[] runBulkQuery(BulkConnection bulkConnection, String qu SObjectDescriptor sObjectDescriptor = SObjectDescriptor.fromQuery(query); JobInfo job = SalesforceBulkUtil.createJob(bulkConnection, sObjectDescriptor.getName(), getOperationEnum(operation), null, ConcurrencyMode.Parallel, ContentType.CSV); - final BatchInfo batchInfo; + BatchInfo batchInfo; try { - if (retryOnBackendError) { - batchInfo = - Failsafe.with(getRetryPolicy(initialRetryDuration, maxRetryDuration, maxRetryCount)) - .get(() -> createBatchFromStream(bulkConnection, query, job)); - } else { - try (ByteArrayInputStream bout = new ByteArrayInputStream(query.getBytes())) { - batchInfo = bulkConnection.createBatchFromStream(job, bout); - } - } + batchInfo = bulkConnection.createBatchFromStream(query, job); if (enablePKChunk) { LOG.debug("PKChunking is enabled"); return waitForBatchChunks(bulkConnection, job.getId(), batchInfo.getId()); @@ -155,23 +147,11 @@ private static BatchInfo[] runBulkQuery(BulkConnection bulkConnection, String qu throw (AsyncApiException) e.getCause(); } throw e; + } catch (SalesforceQueryExecutionException e) { + throw new RuntimeException(e); } } - private static BatchInfo createBatchFromStream(BulkConnection bulkConnection, String query, JobInfo job) throws - SalesforceQueryExecutionException, IOException, AsyncApiException { - BatchInfo batchInfo = null; - try (ByteArrayInputStream bout = new ByteArrayInputStream(query.getBytes())) { - batchInfo = bulkConnection.createBatchFromStream(job, bout); - } catch (AsyncApiException exception) { - LOG.warn("The bulk query job {} failed. Job State: {}", job.getId(), job.getState()); - if (SalesforceBulkRecordReader.RETRY_ON_REASON.contains(exception.getExceptionCode())) { - throw new SalesforceQueryExecutionException(exception); - } - throw exception; - } - return batchInfo; - } /** * Initializes bulk connection based on given Hadoop credentials configuration. @@ -198,7 +178,8 @@ public static BulkConnection getBulkConnection(AuthenticatorCredentials authenti * @return Array with Batches created by Salesforce API * @throws AsyncApiException if there is an issue creating the job */ - private static BatchInfo[] waitForBatchChunks(BulkConnection bulkConnection, String jobId, String initialBatchId) + private static BatchInfo[] waitForBatchChunks(BulkConnectionRetryWrapper bulkConnection, + String jobId, String initialBatchId) throws AsyncApiException { BatchInfo initialBatchInfo = null; for (int i = 0; i < SalesforceSourceConstants.GET_BATCH_RESULTS_TRIES; i++) { diff --git a/widgets/Salesforce-batchsink.json b/widgets/Salesforce-batchsink.json index 4f338743..4c76f186 100644 --- a/widgets/Salesforce-batchsink.json +++ b/widgets/Salesforce-batchsink.json @@ -218,6 +218,49 @@ }, "default": "true" } + }, + { + "widget-type": "hidden", + "label": "Initial Retry Duration", + "name": "initialRetryDuration", + "widget-attributes": { + "min": "1", + "default": "5" + } + }, + { + "widget-type": "hidden", + "label": "Max Retry Duration", + "name": "maxRetryDuration", + "widget-attributes": { + "min": "6", + "default": "80" + } + }, + { + "widget-type": "hidden", + "label": "Max Retry Count", + "name": "maxRetryCount", + "widget-attributes": { + "min": "1", + "default": "5" + } + }, + { + "widget-type": "hidden", + "label": "Retry On Backend Error", + "name": "retryOnBackendError", + "widget-attributes": { + "on": { + "value": "true", + "label": "YES" + }, + "off": { + "value": "false", + "label": "NO" + }, + "default": "true" + } } ] }