diff --git a/src/main/java/io/cdap/plugin/salesforce/SalesforceBulkUtil.java b/src/main/java/io/cdap/plugin/salesforce/SalesforceBulkUtil.java index 0b841087..bc95e4d5 100644 --- a/src/main/java/io/cdap/plugin/salesforce/SalesforceBulkUtil.java +++ b/src/main/java/io/cdap/plugin/salesforce/SalesforceBulkUtil.java @@ -32,6 +32,7 @@ import com.sforce.ws.ConnectionException; import com.sforce.ws.ConnectorConfig; import com.sforce.ws.SessionRenewer; +import io.cdap.plugin.salesforce.plugin.source.batch.util.BulkConnectionRetryWrapper; import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSourceConstants; import org.awaitility.Awaitility; import org.slf4j.Logger; @@ -55,7 +56,7 @@ public final class SalesforceBulkUtil { /** * Create a new job using the Bulk API. * - * @param bulkConnection BulkConnection object that will connect to salesforce server using bulk APIs. + * @param bulkConnectionRetryWrapper bulk connection instance with retry logic * @param sObject sObject name * @param operationEnum Operation that need to be performed on sObject * @param externalIdField externalIdField will be used in case of update/upsert operation. 
@@ -65,8 +66,8 @@ public final class SalesforceBulkUtil { * @throws AsyncApiException if there is an issue creating the job */ - public static JobInfo createJob(BulkConnection bulkConnection, String sObject, OperationEnum operationEnum, - @Nullable String externalIdField, + public static JobInfo createJob(BulkConnectionRetryWrapper bulkConnectionRetryWrapper, String sObject, + OperationEnum operationEnum, @Nullable String externalIdField, ConcurrencyMode concurrencyMode, ContentType contentType) throws AsyncApiException { JobInfo job = new JobInfo(); job.setObject(sObject); @@ -77,46 +78,46 @@ public static JobInfo createJob(BulkConnection bulkConnection, String sObject, O job.setExternalIdFieldName(externalIdField); } - job = bulkConnection.createJob(job); + job = bulkConnectionRetryWrapper.createJob(job); Preconditions.checkState(job.getId() != null, "Couldn't get job ID. There was a problem in creating the " + "batch job"); - return bulkConnection.getJobStatus(job.getId()); + return bulkConnectionRetryWrapper.getJobStatus(job.getId()); } /** * Close a job in Salesforce * - * @param bulkConnection bulk connection instance + * @param bulkConnectionRetryWrapper bulk connection instance with retry logic * @param jobId a job id * @throws AsyncApiException if there is an issue creating the job */ - public static void closeJob(BulkConnection bulkConnection, String jobId) throws AsyncApiException { + public static void closeJob(BulkConnectionRetryWrapper bulkConnectionRetryWrapper, String jobId) + throws AsyncApiException { JobInfo job = new JobInfo(); job.setId(jobId); job.setState(JobStateEnum.Closed); - bulkConnection.updateJob(job); + bulkConnectionRetryWrapper.updateJob(job); } /** * Gets the results of the insert operation for every batch and checks them for errors. * - * @param bulkConnection bulk connection instance + * @param bulkConnectionRetryWrapper bulk connection instance with retry logic. 
* @param job a Salesforce job * @param batchInfoList a list of batches to check * @param ignoreFailures if true, unsuccessful row insertions do not cause an exception * @throws AsyncApiException if there is an issue checking for batch results * @throws IOException reading csv from Salesforce failed */ - public static void checkResults(BulkConnection bulkConnection, JobInfo job, + public static void checkResults(BulkConnectionRetryWrapper bulkConnectionRetryWrapper, JobInfo job, List batchInfoList, boolean ignoreFailures) - throws AsyncApiException, IOException { - + throws AsyncApiException, IOException { for (BatchInfo batchInfo : batchInfoList) { /* The response is a CSV with the following headers: Id,Success,Created,Error */ - CSVReader rdr = new CSVReader(bulkConnection.getBatchResultStream(job.getId(), batchInfo.getId())); + CSVReader rdr = new CSVReader(bulkConnectionRetryWrapper.getBatchResultStream(job.getId(), batchInfo.getId())); List resultHeader = rdr.nextRecord(); int successRowId = resultHeader.indexOf("Success"); int errorRowId = resultHeader.indexOf("Error"); @@ -127,7 +128,7 @@ public static void checkResults(BulkConnection bulkConnection, JobInfo job, if (!success) { String error = row.get(errorRowId); String errorMessage = String.format("Failed to create row with error: '%s'. BatchId='%s'", - error, batchInfo.getId()); + error, batchInfo.getId()); if (ignoreFailures) { LOG.error(errorMessage); } else { @@ -141,12 +142,12 @@ public static void checkResults(BulkConnection bulkConnection, JobInfo job, /** * Wait for a job to complete by polling the Bulk API. * - * @param bulkConnection BulkConnection used to check results. + * @param bulkConnectionRetryWrapper bulk connection instance with retry logic. * @param job The job awaiting completion. * @param batchInfoList List of batches for this job. 
* @param ignoreFailures if true, unsuccessful row insertions do not cause an exception */ - public static void awaitCompletion(BulkConnection bulkConnection, JobInfo job, + public static void awaitCompletion(BulkConnectionRetryWrapper bulkConnectionRetryWrapper, JobInfo job, List batchInfoList, boolean ignoreFailures) { Set incomplete = batchInfoList .stream() @@ -164,7 +165,7 @@ public static void awaitCompletion(BulkConnection bulkConnection, JobInfo job, .until(() -> { try { BatchInfo[] statusList = - bulkConnection.getBatchInfoList(job.getId()).getBatchInfo(); + bulkConnectionRetryWrapper.getBatchInfoList(job.getId()).getBatchInfo(); for (BatchInfo b : statusList) { if (b.getState() == BatchStateEnum.Failed) { @@ -187,7 +188,7 @@ public static void awaitCompletion(BulkConnection bulkConnection, JobInfo job, } } catch (AsyncApiException e) { if (AsyncExceptionCode.InvalidSessionId == e.getExceptionCode()) { - renewSession(bulkConnection, e); + renewSession(bulkConnectionRetryWrapper, e); } else if (AsyncExceptionCode.ClientInputError == e.getExceptionCode() && failures.get() < SalesforceSourceConstants.MAX_RETRIES_ON_API_FAILURE) { // This error can occur when server is not responding with proper error message due to network glitch. @@ -205,12 +206,13 @@ public static void awaitCompletion(BulkConnection bulkConnection, JobInfo job, /** * Renew session if bulk connection resets * - * @param connection Bulk Connection + * @param bulkConnectionRetryWrapper bulk connection instance with retry logic. 
* @param e AsyncApiException * @throws AsyncApiException */ - private static void renewSession(BulkConnection connection, AsyncApiException e) throws AsyncApiException { - ConnectorConfig config = connection.getConfig(); + private static void renewSession(BulkConnectionRetryWrapper bulkConnectionRetryWrapper, AsyncApiException e) + throws AsyncApiException { + ConnectorConfig config = bulkConnectionRetryWrapper.getConfig(); try { SessionRenewer.SessionRenewalHeader sessionHeader = config.getSessionRenewer().renewSession(config); config.setSessionId(((SessionHeader_element) sessionHeader.headerElement).getSessionId()); diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceOutputFormat.java b/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceOutputFormat.java index b5d138a5..ebfe402c 100644 --- a/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceOutputFormat.java +++ b/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceOutputFormat.java @@ -22,6 +22,7 @@ import io.cdap.plugin.salesforce.SalesforceConnectionUtil; import io.cdap.plugin.salesforce.authenticator.Authenticator; import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials; +import io.cdap.plugin.salesforce.plugin.source.batch.util.BulkConnectionRetryWrapper; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.JobContext; @@ -77,8 +78,9 @@ public void commitJob(JobContext jobContext) { try { BulkConnection bulkConnection = new BulkConnection(Authenticator.createConnectorConfig(credentials)); + BulkConnectionRetryWrapper bulkConnectionRetryWrapper = new BulkConnectionRetryWrapper(bulkConnection); String jobId = conf.get(SalesforceSinkConstants.CONFIG_JOB_ID); - SalesforceBulkUtil.closeJob(bulkConnection, jobId); + SalesforceBulkUtil.closeJob(bulkConnectionRetryWrapper, jobId); } catch (AsyncApiException e) { throw new RuntimeException( 
String.format("Failed to commit a Salesforce bulk job: %s", e.getMessage()), diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceOutputFormatProvider.java b/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceOutputFormatProvider.java index 653026c3..ab5013ea 100644 --- a/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceOutputFormatProvider.java +++ b/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceOutputFormatProvider.java @@ -27,6 +27,7 @@ import io.cdap.plugin.salesforce.authenticator.Authenticator; import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials; import io.cdap.plugin.salesforce.plugin.OAuthInfo; +import io.cdap.plugin.salesforce.plugin.source.batch.util.BulkConnectionRetryWrapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -83,9 +84,9 @@ public SalesforceOutputFormatProvider(SalesforceSinkConfig config) { try { BulkConnection bulkConnection = new BulkConnection(Authenticator.createConnectorConfig(credentials)); - JobInfo job = SalesforceBulkUtil.createJob(bulkConnection, config.getSObject(), config.getOperationEnum(), - config.getExternalIdField(), config.getConcurrencyModeEnum(), - ContentType.ZIP_CSV); + BulkConnectionRetryWrapper bulkConnectionRetryWrapper = new BulkConnectionRetryWrapper(bulkConnection); + JobInfo job = SalesforceBulkUtil.createJob(bulkConnectionRetryWrapper, config.getSObject(), + config.getOperationEnum(), config.getExternalIdField(), config.getConcurrencyModeEnum(), ContentType.ZIP_CSV); configBuilder.put(SalesforceSinkConstants.CONFIG_JOB_ID, job.getId()); LOG.info("Started Salesforce job with jobId='{}'", job.getId()); } catch (AsyncApiException e) { diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceRecordWriter.java b/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceRecordWriter.java index 981f7d85..60102609 100644 --- 
a/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceRecordWriter.java +++ b/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceRecordWriter.java @@ -25,6 +25,7 @@ import io.cdap.plugin.salesforce.SalesforceConnectionUtil; import io.cdap.plugin.salesforce.authenticator.Authenticator; import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials; +import io.cdap.plugin.salesforce.plugin.source.batch.util.BulkConnectionRetryWrapper; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.RecordWriter; @@ -50,6 +51,7 @@ public class SalesforceRecordWriter extends RecordWriter LOG.info("Failed while creating job")) + .get(() -> { + try { + return bulkConnection.createJob(jobInfo); + } catch (AsyncApiException e) { + throw new SalesforceQueryExecutionException(e.getMessage()); + } + }); + return (JobInfo) resultJobInfo; + } + + public JobInfo getJobStatus(String jobId) { + Object resultJobInfo = Failsafe.with(retryPolicy) + .onFailure(event -> LOG.info("Failed while getting job status")) + .get(() -> { + try { + return bulkConnection.getJobStatus(jobId); + } catch (AsyncApiException e) { + throw new SalesforceQueryExecutionException(e.getMessage()); + } + }); + return (JobInfo) resultJobInfo; + } + + public void updateJob(JobInfo jobInfo) { + Failsafe.with(retryPolicy) + .onFailure(event -> LOG.info("Failed while updating job.")) + .get(() -> { + try { + return bulkConnection.updateJob(jobInfo); + } catch (AsyncApiException e) { + throw new SalesforceQueryExecutionException(e.getMessage()); + } + }); + } + + public BatchInfoList getBatchInfoList(String jobId) throws AsyncApiException { + Object batchInfoList = Failsafe.with(retryPolicy) + .onFailure(event -> LOG.info("Failed while getting batch info list")) + .get(() -> { + try { + return bulkConnection.getBatchInfoList(jobId); + } catch (AsyncApiException e) { + throw new 
SalesforceQueryExecutionException(e.getMessage()); + } + }); + return (BatchInfoList) batchInfoList; + } + + public BatchInfo getBatchInfo(String jobId, String batchId) throws AsyncApiException { + Object batchInfo = Failsafe.with(retryPolicy) + .onFailure(event -> LOG.info("Failed while getting batch status")) + .get(() -> { + try { + return bulkConnection.getBatchInfo(jobId, batchId); + } catch (AsyncApiException e) { + throw new SalesforceQueryExecutionException(e.getMessage()); + } + }); + return (BatchInfo) batchInfo; + } + + public InputStream getBatchResultStream(String jobId, String batchId) throws AsyncApiException { + Object inputStream = Failsafe.with(retryPolicy) + .onFailure(event -> LOG.info("Failed while getting batch result stream")) + .get(() -> { + try { + return bulkConnection.getBatchResultStream(jobId, batchId); + } catch (AsyncApiException e) { + throw new SalesforceQueryExecutionException(e.getMessage()); + } + }); + return (InputStream) inputStream; + } + + public ConnectorConfig getConfig() { + return bulkConnection.getConfig(); + } + +} diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSplitUtil.java b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSplitUtil.java index 6223c292..74c4250c 100644 --- a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSplitUtil.java +++ b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSplitUtil.java @@ -119,10 +119,10 @@ private static BatchInfo[] runBulkQuery(BulkConnection bulkConnection, String qu Long initialRetryDuration, Long maxRetryDuration, Integer maxRetryCount, Boolean retryOnBackendError) throws AsyncApiException, IOException, InterruptedException { - + BulkConnectionRetryWrapper bulkConnectionRetryWrapper = new BulkConnectionRetryWrapper(bulkConnection); SObjectDescriptor sObjectDescriptor = SObjectDescriptor.fromQuery(query); - JobInfo job =
SalesforceBulkUtil.createJob(bulkConnection, sObjectDescriptor.getName(), getOperationEnum(operation), - null, ConcurrencyMode.Parallel, ContentType.CSV); + JobInfo job = SalesforceBulkUtil.createJob(bulkConnectionRetryWrapper, sObjectDescriptor.getName(), + getOperationEnum(operation), null, ConcurrencyMode.Parallel, ContentType.CSV); final BatchInfo batchInfo; try { if (retryOnBackendError) { @@ -139,8 +139,8 @@ private static BatchInfo[] runBulkQuery(BulkConnection bulkConnection, String qu return waitForBatchChunks(bulkConnection, job.getId(), batchInfo.getId()); } LOG.debug("PKChunking is not enabled"); - BatchInfo[] batchInfos = bulkConnection.getBatchInfoList(job.getId()).getBatchInfo(); - LOG.info("Job id {}, status: {}", job.getId(), bulkConnection.getJobStatus(job.getId()).getState()); + BatchInfo[] batchInfos = bulkConnectionRetryWrapper.getBatchInfoList(job.getId()).getBatchInfo(); + LOG.info("Job id {}, status: {}", job.getId(), bulkConnectionRetryWrapper.getJobStatus(job.getId()).getState()); if (batchInfos.length > 0) { LOG.info("Batch size {}, state {}", batchInfos.length, batchInfos[0].getState()); } @@ -201,14 +201,15 @@ public static BulkConnection getBulkConnection(AuthenticatorCredentials authenti private static BatchInfo[] waitForBatchChunks(BulkConnection bulkConnection, String jobId, String initialBatchId) throws AsyncApiException { BatchInfo initialBatchInfo = null; + BulkConnectionRetryWrapper bulkConnectionRetryWrapper = new BulkConnectionRetryWrapper(bulkConnection); for (int i = 0; i < SalesforceSourceConstants.GET_BATCH_RESULTS_TRIES; i++) { //check if the job is aborted - if (bulkConnection.getJobStatus(jobId).getState() == JobStateEnum.Aborted) { + if (bulkConnectionRetryWrapper.getJobStatus(jobId).getState() == JobStateEnum.Aborted) { LOG.info(String.format("Job with Id: '%s' is aborted", jobId)); return new BatchInfo[0]; } try { - initialBatchInfo = bulkConnection.getBatchInfo(jobId, initialBatchId); + initialBatchInfo = 
bulkConnectionRetryWrapper.getBatchInfo(jobId, initialBatchId); } catch (AsyncApiException e) { if (i == SalesforceSourceConstants.GET_BATCH_RESULTS_TRIES - 1) { throw e; @@ -218,7 +219,7 @@ private static BatchInfo[] waitForBatchChunks(BulkConnection bulkConnection, Str } if (initialBatchInfo.getState() == BatchStateEnum.NotProcessed) { - BatchInfo[] result = bulkConnection.getBatchInfoList(jobId).getBatchInfo(); + BatchInfo[] result = bulkConnectionRetryWrapper.getBatchInfoList(jobId).getBatchInfo(); return Arrays.stream(result).filter(batchInfo -> batchInfo.getState() != BatchStateEnum.NotProcessed) .toArray(BatchInfo[]::new); } else if (initialBatchInfo.getState() == BatchStateEnum.Failed) { @@ -236,10 +237,11 @@ private static BatchInfo[] waitForBatchChunks(BulkConnection bulkConnection, Str public static void closeJobs(Set jobIds, AuthenticatorCredentials authenticatorCredentials) { BulkConnection bulkConnection = SalesforceSplitUtil.getBulkConnection(authenticatorCredentials); + BulkConnectionRetryWrapper bulkConnectionRetryWrapper = new BulkConnectionRetryWrapper(bulkConnection); RuntimeException runtimeException = null; for (String jobId : jobIds) { try { - SalesforceBulkUtil.closeJob(bulkConnection, jobId); + SalesforceBulkUtil.closeJob(bulkConnectionRetryWrapper, jobId); } catch (AsyncApiException e) { if (runtimeException == null) { runtimeException = new RuntimeException(e); diff --git a/src/test/java/io/cdap/plugin/salesforce/SalesforceBulkUtilTest.java b/src/test/java/io/cdap/plugin/salesforce/SalesforceBulkUtilTest.java index 4857f5c6..be8e7dbd 100644 --- a/src/test/java/io/cdap/plugin/salesforce/SalesforceBulkUtilTest.java +++ b/src/test/java/io/cdap/plugin/salesforce/SalesforceBulkUtilTest.java @@ -23,6 +23,7 @@ import com.sforce.async.BulkConnection; import com.sforce.async.ConcurrencyMode; import com.sforce.async.JobInfo; +import io.cdap.plugin.salesforce.plugin.source.batch.util.BulkConnectionRetryWrapper; import org.junit.Before; import 
org.junit.Test; @@ -61,7 +62,8 @@ public void testAwaitCompletionBatchFailed() throws Exception { when(batchInfoList.getBatchInfo()).thenReturn(batchInfoDetail); // Call the awaitCompletion method and verify that it throws a BulkAPIBatchException when the batch fails - SalesforceBulkUtil.awaitCompletion(bulkConnection, job, Collections.singletonList(batchInfo), false); + BulkConnectionRetryWrapper wrapper = new BulkConnectionRetryWrapper(bulkConnection); + SalesforceBulkUtil.awaitCompletion(wrapper, job, Collections.singletonList(batchInfo), false); } @Test(expected = AsyncApiException.class) @@ -69,7 +71,8 @@ public void testAwaitCompletionAsyncApiException() throws Exception { when(bulkConnection.getBatchInfoList(job.getId())).thenThrow( new AsyncApiException("Failed to get batch info list ", AsyncExceptionCode.ClientInputError)); // Call the awaitCompletion method and verify that it throws a AsyncApiException after retrying 10 times. - SalesforceBulkUtil.awaitCompletion(bulkConnection, job, Collections.singletonList(batchInfo), true); + BulkConnectionRetryWrapper wrapper = new BulkConnectionRetryWrapper(bulkConnection); + SalesforceBulkUtil.awaitCompletion(wrapper, job, Collections.singletonList(batchInfo), true); } @Test @@ -81,6 +84,7 @@ public void testAwaitCompletionBatchCompleted() throws Exception { when(batchInfoList.getBatchInfo()).thenReturn(batchInfoDetail); // Call the awaitCompletion method and verify that it completes successfully - SalesforceBulkUtil.awaitCompletion(bulkConnection, job, Collections.singletonList(batchInfo), true); + BulkConnectionRetryWrapper wrapper = new BulkConnectionRetryWrapper(bulkConnection); + SalesforceBulkUtil.awaitCompletion(wrapper, job, Collections.singletonList(batchInfo), true); } }