Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

retry functionality for all bulk APIs #18

Open
wants to merge 6 commits into
base: release/1.6
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 23 additions & 21 deletions src/main/java/io/cdap/plugin/salesforce/SalesforceBulkUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.sforce.ws.ConnectionException;
import com.sforce.ws.ConnectorConfig;
import com.sforce.ws.SessionRenewer;
import io.cdap.plugin.salesforce.plugin.source.batch.util.BulkConnectionRetryWrapper;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSourceConstants;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
Expand All @@ -55,7 +56,7 @@ public final class SalesforceBulkUtil {
/**
* Create a new job using the Bulk API.
*
* @param bulkConnection BulkConnection object that will connect to salesforce server using bulk APIs.
* @param bulkConnectionRetryWrapper bulk connection instance with retry logic
* @param sObject sObject name
* @param operationEnum Operation that need to be performed on sObject
* @param externalIdField externalIdField will be used in case of update/upsert operation.
Expand All @@ -65,8 +66,8 @@ public final class SalesforceBulkUtil {
* @throws AsyncApiException if there is an issue creating the job
*/

public static JobInfo createJob(BulkConnection bulkConnection, String sObject, OperationEnum operationEnum,
@Nullable String externalIdField,
public static JobInfo createJob(BulkConnectionRetryWrapper bulkConnectionRetryWrapper, String sObject,
OperationEnum operationEnum, @Nullable String externalIdField,
ConcurrencyMode concurrencyMode, ContentType contentType) throws AsyncApiException {
JobInfo job = new JobInfo();
job.setObject(sObject);
Expand All @@ -77,46 +78,46 @@ public static JobInfo createJob(BulkConnection bulkConnection, String sObject, O
job.setExternalIdFieldName(externalIdField);
}

job = bulkConnection.createJob(job);
job = bulkConnectionRetryWrapper.createJob(job);
Preconditions.checkState(job.getId() != null, "Couldn't get job ID. There was a problem in creating the " +
"batch job");
return bulkConnection.getJobStatus(job.getId());
return bulkConnectionRetryWrapper.getJobStatus(job.getId());
}

/**
* Close a job in Salesforce
*
* @param bulkConnection bulk connection instance
* @param bulkConnectionRetryWrapper bulk connection instance with retry logic
* @param jobId a job id
* @throws AsyncApiException if there is an issue creating the job
*/
public static void closeJob(BulkConnection bulkConnection, String jobId) throws AsyncApiException {
public static void closeJob(BulkConnectionRetryWrapper bulkConnectionRetryWrapper, String jobId)
throws AsyncApiException {
JobInfo job = new JobInfo();
job.setId(jobId);
job.setState(JobStateEnum.Closed);
bulkConnection.updateJob(job);
bulkConnectionRetryWrapper.updateJob(job);
}

/**
* Gets the results of the insert operation for every batch and checks them for errors.
*
* @param bulkConnection bulk connection instance
* @param bulkConnectionRetryWrapper bulk connection instance with retry logic.
* @param job a Salesforce job
* @param batchInfoList a list of batches to check
* @param ignoreFailures if true, unsuccessful row insertions do not cause an exception
* @throws AsyncApiException if there is an issue checking for batch results
* @throws IOException reading csv from Salesforce failed
*/
public static void checkResults(BulkConnection bulkConnection, JobInfo job,
public static void checkResults(BulkConnectionRetryWrapper bulkConnectionRetryWrapper, JobInfo job,
List<BatchInfo> batchInfoList, boolean ignoreFailures)
throws AsyncApiException, IOException {

throws AsyncApiException, IOException {
for (BatchInfo batchInfo : batchInfoList) {
/*
The response is a CSV with the following headers:
Id,Success,Created,Error
*/
CSVReader rdr = new CSVReader(bulkConnection.getBatchResultStream(job.getId(), batchInfo.getId()));
CSVReader rdr = new CSVReader(bulkConnectionRetryWrapper.getBatchResultStream(job.getId(), batchInfo.getId()));
List<String> resultHeader = rdr.nextRecord();
int successRowId = resultHeader.indexOf("Success");
int errorRowId = resultHeader.indexOf("Error");
Expand All @@ -127,7 +128,7 @@ public static void checkResults(BulkConnection bulkConnection, JobInfo job,
if (!success) {
String error = row.get(errorRowId);
String errorMessage = String.format("Failed to create row with error: '%s'. BatchId='%s'",
error, batchInfo.getId());
error, batchInfo.getId());
if (ignoreFailures) {
LOG.error(errorMessage);
} else {
Expand All @@ -141,12 +142,12 @@ public static void checkResults(BulkConnection bulkConnection, JobInfo job,
/**
* Wait for a job to complete by polling the Bulk API.
*
* @param bulkConnection BulkConnection used to check results.
* @param bulkConnectionRetryWrapper bulk connection instance with retry logic.
* @param job The job awaiting completion.
* @param batchInfoList List of batches for this job.
* @param ignoreFailures if true, unsuccessful row insertions do not cause an exception
*/
public static void awaitCompletion(BulkConnection bulkConnection, JobInfo job,
public static void awaitCompletion(BulkConnectionRetryWrapper bulkConnectionRetryWrapper, JobInfo job,
List<BatchInfo> batchInfoList, boolean ignoreFailures) {
Set<String> incomplete = batchInfoList
.stream()
Expand All @@ -164,7 +165,7 @@ public static void awaitCompletion(BulkConnection bulkConnection, JobInfo job,
.until(() -> {
try {
BatchInfo[] statusList =
bulkConnection.getBatchInfoList(job.getId()).getBatchInfo();
bulkConnectionRetryWrapper.getBatchInfoList(job.getId()).getBatchInfo();

for (BatchInfo b : statusList) {
if (b.getState() == BatchStateEnum.Failed) {
Expand All @@ -187,7 +188,7 @@ public static void awaitCompletion(BulkConnection bulkConnection, JobInfo job,
}
} catch (AsyncApiException e) {
if (AsyncExceptionCode.InvalidSessionId == e.getExceptionCode()) {
renewSession(bulkConnection, e);
renewSession(bulkConnectionRetryWrapper, e);
} else if (AsyncExceptionCode.ClientInputError == e.getExceptionCode() &&
failures.get() < SalesforceSourceConstants.MAX_RETRIES_ON_API_FAILURE) {
// This error can occur when server is not responding with proper error message due to network glitch.
Expand All @@ -205,12 +206,13 @@ public static void awaitCompletion(BulkConnection bulkConnection, JobInfo job,
/**
* Renew session if bulk connection resets
*
* @param connection Bulk Connection
* @param bulkConnectionRetryWrapper bulk connection instance with retry logic.
* @param e AsyncApiException
* @throws AsyncApiException
*/
private static void renewSession(BulkConnection connection, AsyncApiException e) throws AsyncApiException {
ConnectorConfig config = connection.getConfig();
private static void renewSession(BulkConnectionRetryWrapper bulkConnectionRetryWrapper, AsyncApiException e)
throws AsyncApiException {
ConnectorConfig config = bulkConnectionRetryWrapper.getConfig();
try {
SessionRenewer.SessionRenewalHeader sessionHeader = config.getSessionRenewer().renewSession(config);
config.setSessionId(((SessionHeader_element) sessionHeader.headerElement).getSessionId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.cdap.plugin.salesforce.SalesforceConnectionUtil;
import io.cdap.plugin.salesforce.authenticator.Authenticator;
import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials;
import io.cdap.plugin.salesforce.plugin.source.batch.util.BulkConnectionRetryWrapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.JobContext;
Expand Down Expand Up @@ -77,8 +78,9 @@ public void commitJob(JobContext jobContext) {

try {
BulkConnection bulkConnection = new BulkConnection(Authenticator.createConnectorConfig(credentials));
BulkConnectionRetryWrapper bulkConnectionRetryWrapper = new BulkConnectionRetryWrapper(bulkConnection);
String jobId = conf.get(SalesforceSinkConstants.CONFIG_JOB_ID);
SalesforceBulkUtil.closeJob(bulkConnection, jobId);
SalesforceBulkUtil.closeJob(bulkConnectionRetryWrapper, jobId);
} catch (AsyncApiException e) {
throw new RuntimeException(
String.format("Failed to commit a Salesforce bulk job: %s", e.getMessage()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.cdap.plugin.salesforce.authenticator.Authenticator;
import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials;
import io.cdap.plugin.salesforce.plugin.OAuthInfo;
import io.cdap.plugin.salesforce.plugin.source.batch.util.BulkConnectionRetryWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -83,9 +84,9 @@ public SalesforceOutputFormatProvider(SalesforceSinkConfig config) {

try {
BulkConnection bulkConnection = new BulkConnection(Authenticator.createConnectorConfig(credentials));
JobInfo job = SalesforceBulkUtil.createJob(bulkConnection, config.getSObject(), config.getOperationEnum(),
config.getExternalIdField(), config.getConcurrencyModeEnum(),
ContentType.ZIP_CSV);
BulkConnectionRetryWrapper bulkConnectionRetryWrapper = new BulkConnectionRetryWrapper(bulkConnection);
JobInfo job = SalesforceBulkUtil.createJob(bulkConnectionRetryWrapper, config.getSObject(),
config.getOperationEnum(), config.getExternalIdField(), config.getConcurrencyModeEnum(), ContentType.ZIP_CSV);
configBuilder.put(SalesforceSinkConstants.CONFIG_JOB_ID, job.getId());
LOG.info("Started Salesforce job with jobId='{}'", job.getId());
} catch (AsyncApiException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.cdap.plugin.salesforce.SalesforceConnectionUtil;
import io.cdap.plugin.salesforce.authenticator.Authenticator;
import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials;
import io.cdap.plugin.salesforce.plugin.source.batch.util.BulkConnectionRetryWrapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
Expand All @@ -50,6 +51,7 @@ public class SalesforceRecordWriter extends RecordWriter<NullWritable, Structure
private static final Logger LOG = LoggerFactory.getLogger(SalesforceRecordWriter.class);

private BulkConnection bulkConnection;
private BulkConnectionRetryWrapper bulkConnectionRetryWrapper;
private JobInfo jobInfo;
private ErrorHandling errorHandling;
private Long maxBytesPerBatch;
Expand Down Expand Up @@ -79,6 +81,7 @@ public SalesforceRecordWriter(TaskAttemptContext taskAttemptContext) throws IOEx

AuthenticatorCredentials credentials = SalesforceConnectionUtil.getAuthenticatorCredentials(conf);
bulkConnection = new BulkConnection(Authenticator.createConnectorConfig(credentials));
bulkConnectionRetryWrapper = new BulkConnectionRetryWrapper(bulkConnection);
jobInfo = bulkConnection.getJobStatus(jobId);
isFileUploadObject = FileUploadSobject.isFileUploadSobject(jobInfo.getObject());
if (isFileUploadObject) {
Expand Down Expand Up @@ -160,9 +163,10 @@ public void close(TaskAttemptContext taskAttemptContext) throws IOException {
submitCurrentBatch();

try {
SalesforceBulkUtil.awaitCompletion(bulkConnection, jobInfo, batchInfoList,
SalesforceBulkUtil.awaitCompletion(bulkConnectionRetryWrapper, jobInfo, batchInfoList,
errorHandling.equals(ErrorHandling.SKIP));
SalesforceBulkUtil.checkResults(bulkConnection, jobInfo, batchInfoList, errorHandling.equals(ErrorHandling.SKIP));
SalesforceBulkUtil.checkResults(bulkConnectionRetryWrapper, jobInfo, batchInfoList,
errorHandling.equals(ErrorHandling.SKIP));
} catch (AsyncApiException | ConditionTimeoutException e) {
throw new RuntimeException(String.format("Failed to check the result of a batch for writes: %s",
e.getMessage()), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,12 @@
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.plugin.salesforce.BulkAPIBatchException;
import io.cdap.plugin.salesforce.SalesforceConnectionUtil;
import io.cdap.plugin.salesforce.SalesforceConstants;
import io.cdap.plugin.salesforce.authenticator.Authenticator;
import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials;
import io.cdap.plugin.salesforce.plugin.source.batch.util.BulkConnectionRetryWrapper;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceQueryExecutionException;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSourceConstants;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSplitUtil;

import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some of the methods are still not called through the retry wrapper (e.g. line 208). Move those to the retry wrapper class as well.

Expand Down Expand Up @@ -261,10 +260,11 @@ public InputStream getQueryResultStream(BulkConnection bulkConnection)
*/
private String[] waitForBatchResults(BulkConnection bulkConnection)
throws AsyncApiException, InterruptedException, SalesforceQueryExecutionException {
BulkConnectionRetryWrapper bulkConnectionRetryWrapper = new BulkConnectionRetryWrapper(bulkConnection);
BatchInfo info = null;
for (int i = 0; i < SalesforceSourceConstants.GET_BATCH_RESULTS_TRIES; i++) {
try {
info = bulkConnection.getBatchInfo(jobId, batchId);
info = bulkConnectionRetryWrapper.getBatchInfo(jobId, batchId);
} catch (AsyncApiException e) {
if (i == SalesforceSourceConstants.GET_BATCH_RESULTS_TRIES - 1) {
throw e;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package io.cdap.plugin.salesforce.plugin.source.batch.util;

import com.sforce.async.AsyncApiException;
import com.sforce.async.BatchInfo;
import com.sforce.async.BatchInfoList;
import com.sforce.async.BulkConnection;
import com.sforce.async.JobInfo;
import com.sforce.ws.ConnectorConfig;
import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import dev.failsafe.function.CheckedSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.InputStream;
import java.time.Duration;

/**
* BulkConnectionRetryWrapper class to retry all the salesforce api calls in case of failure.
*/
public class BulkConnectionRetryWrapper {

private final BulkConnection bulkConnection;
private final RetryPolicy retryPolicy;
private static final Logger LOG = LoggerFactory.getLogger(BulkConnectionRetryWrapper.class);

public BulkConnectionRetryWrapper(BulkConnection bulkConnection) {
this.bulkConnection = bulkConnection;
this.retryPolicy = SalesforceSplitUtil.getRetryPolicy(5L, 80L, 5);
}

public JobInfo createJob(JobInfo jobInfo) {
Object resultJobInfo = Failsafe.with(retryPolicy)
.onFailure(event -> LOG.info("Failed while creating job"))
.get(() -> {
try {
return bulkConnection.createJob(jobInfo);
} catch (AsyncApiException e) {
throw new SalesforceQueryExecutionException(e.getMessage());
}
});
return (JobInfo) resultJobInfo;
}

public JobInfo getJobStatus(String jobId) {
Object resultJobInfo = Failsafe.with(retryPolicy)
.onFailure(event -> LOG.info("Failed while getting job status"))
.get(() -> {
try {
return bulkConnection.getJobStatus(jobId);
} catch (AsyncApiException e) {
throw new SalesforceQueryExecutionException(e.getMessage());
}
});
return (JobInfo) resultJobInfo;
}

public void updateJob(JobInfo jobInfo) {
Failsafe.with(retryPolicy)
.onFailure(event -> LOG.info("Failed while updating job."))
.get(() -> {
try {
return bulkConnection.updateJob(jobInfo);
} catch (AsyncApiException e) {
throw new SalesforceQueryExecutionException(e.getMessage());
}
});
}

public BatchInfoList getBatchInfoList(String jobId) throws AsyncApiException {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add try catch to each method and in catch add a method that handles asyncapiException and handle retry as retry will be happen based on SalesforceQueryExecutionException.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added !

Object batchInfoList = Failsafe.with(retryPolicy)
.onFailure(event -> LOG.info("Failed while getting batch info list"))
.get(() -> {
try {
return bulkConnection.getBatchInfoList(jobId);
} catch (AsyncApiException e) {
throw new SalesforceQueryExecutionException(e.getMessage());
}
});
return (BatchInfoList) batchInfoList;
}

public BatchInfo getBatchInfo(String jobId, String batchId) throws AsyncApiException {
Object batchInfo = Failsafe.with(retryPolicy)
.onFailure(event -> LOG.info("Failed while getting batc status"))
.get(() -> {
try {
return bulkConnection.getBatchInfo(jobId, batchId);
} catch (AsyncApiException e) {
throw new SalesforceQueryExecutionException(e.getMessage());
}
});
return (BatchInfo) batchInfo;
}

public InputStream getBatchResultStream(String jobId, String batchId) throws AsyncApiException {
Object inputStream = Failsafe.with(retryPolicy)
.onFailure(event -> LOG.info("Failed while getting batch result stream"))
.get(() -> {
try {
return bulkConnection.getBatchResultStream(jobId, batchId);
} catch (AsyncApiException e) {
throw new SalesforceQueryExecutionException(e.getMessage());
}
});
return (InputStream) inputStream;
}

public ConnectorConfig getConfig() {
return bulkConnection.getConfig();
}

}
Loading