Skip to content

Commit

Permalink
Retry Wrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
psainics committed Aug 21, 2024
1 parent 735cdf6 commit ac5b95c
Show file tree
Hide file tree
Showing 9 changed files with 328 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.sforce.ws.ConnectionException;
import com.sforce.ws.ConnectorConfig;
import com.sforce.ws.SessionRenewer;
import io.cdap.plugin.salesforce.plugin.source.batch.util.BulkConnectionRetryWrapper;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSourceConstants;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
Expand Down Expand Up @@ -65,8 +66,8 @@ public final class SalesforceBulkUtil {
* @throws AsyncApiException if there is an issue creating the job
*/

public static JobInfo createJob(BulkConnection bulkConnection, String sObject, OperationEnum operationEnum,
@Nullable String externalIdField,
public static JobInfo createJob(BulkConnectionRetryWrapper bulkConnection, String sObject,
OperationEnum operationEnum, @Nullable String externalIdField,
ConcurrencyMode concurrencyMode, ContentType contentType) throws AsyncApiException {
JobInfo job = new JobInfo();
job.setObject(sObject);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import io.cdap.plugin.salesforce.authenticator.Authenticator;
import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials;
import io.cdap.plugin.salesforce.plugin.OAuthInfo;
import io.cdap.plugin.salesforce.plugin.source.batch.util.BulkConnectionRetryWrapper;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSourceConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -55,7 +57,11 @@ public SalesforceOutputFormatProvider(SalesforceSinkConfig config) {
.put(SalesforceSinkConstants.CONFIG_MAX_BYTES_PER_BATCH, config.getMaxBytesPerBatch().toString())
.put(SalesforceSinkConstants.CONFIG_MAX_RECORDS_PER_BATCH, config.getMaxRecordsPerBatch().toString())
.put(SalesforceConstants.CONFIG_CONNECT_TIMEOUT, config.getConnection().getConnectTimeout().toString())
.put(SalesforceConstants.CONFIG_READ_TIMEOUT, config.getConnection().getReadTimeout().toString());
.put(SalesforceConstants.CONFIG_READ_TIMEOUT, config.getConnection().getReadTimeout().toString())
.put(SalesforceSourceConstants.CONFIG_INITIAL_RETRY_DURATION, Long.toString(config.getInitialRetryDuration()))
.put(SalesforceSourceConstants.CONFIG_MAX_RETRY_DURATION, Long.toString(config.getMaxRetryDuration()))
.put(SalesforceSourceConstants.CONFIG_MAX_RETRY_COUNT, Integer.toString(config.getMaxRetryCount()))
.put(SalesforceSourceConstants.CONFIG_RETRY_REQUIRED, Boolean.toString(config.isRetryRequired()));

if (!Strings.isNullOrEmpty(config.getConnection().getProxyUrl())) {
configBuilder.put(SalesforceConstants.CONFIG_PROXY_URL, config.getConnection().getProxyUrl());
Expand Down Expand Up @@ -83,7 +89,9 @@ public SalesforceOutputFormatProvider(SalesforceSinkConfig config) {

try {
BulkConnection bulkConnection = new BulkConnection(Authenticator.createConnectorConfig(credentials));
JobInfo job = SalesforceBulkUtil.createJob(bulkConnection, config.getSObject(), config.getOperationEnum(),
BulkConnectionRetryWrapper retryWrapper = new BulkConnectionRetryWrapper(bulkConnection, config.isRetryRequired(),
config.getInitialRetryDuration(), config.getMaxRetryDuration(), config.getMaxRetryCount());
JobInfo job = SalesforceBulkUtil.createJob(retryWrapper, config.getSObject(), config.getOperationEnum(),
config.getExternalIdField(), config.getConcurrencyModeEnum(),
ContentType.ZIP_CSV);
configBuilder.put(SalesforceSinkConstants.CONFIG_JOB_ID, job.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import io.cdap.plugin.salesforce.plugin.SalesforceConnectorBaseConfig;
import io.cdap.plugin.salesforce.plugin.SalesforceConnectorInfo;
import io.cdap.plugin.salesforce.plugin.connector.SalesforceConnectorConfig;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSourceConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -152,6 +153,26 @@ public class SalesforceSinkConfig extends ReferencePluginConfig {
@Description("Whether to validate the field data types of the input schema as per Salesforce specific data types")
private final Boolean datatypeValidation;

@Name(SalesforceSourceConstants.PROPERTY_INITIAL_RETRY_DURATION)
@Description("Time taken for the first retry. Default is 5 seconds.")
@Nullable
private Long initialRetryDuration;

@Name(SalesforceSourceConstants.PROPERTY_MAX_RETRY_DURATION)
@Description("Maximum time in seconds retries can take. Default is 80 seconds.")
@Nullable
private Long maxRetryDuration;

@Name(SalesforceSourceConstants.PROPERTY_MAX_RETRY_COUNT)
@Description("Maximum number of retries allowed. Default is 5.")
@Nullable
private Integer maxRetryCount;

@Name(SalesforceSourceConstants.PROPERTY_RETRY_REQUIRED)
@Description("Retry is required or not for some of the internal call failures")
@Nullable
private Boolean retryOnBackendError;

public SalesforceSinkConfig(String referenceName,
@Nullable String clientId,
@Nullable String clientSecret,
Expand Down Expand Up @@ -277,6 +298,23 @@ public String getOrgId(OAuthInfo oAuthInfo) throws ConnectionException {
return partnerConnection.getUserInfo().getOrganizationId();
}

public boolean isRetryRequired() {
return retryOnBackendError == null || retryOnBackendError;
}

public long getInitialRetryDuration() {
return initialRetryDuration == null ? SalesforceSourceConstants.DEFAULT_INITIAL_RETRY_DURATION_SECONDS :
initialRetryDuration;
}

public long getMaxRetryDuration() {
return maxRetryDuration == null ? SalesforceSourceConstants.DEFULT_MAX_RETRY_DURATION_SECONDS : maxRetryDuration;
}

public int getMaxRetryCount() {
return maxRetryCount == null ? SalesforceSourceConstants.DEFAULT_MAX_RETRY_COUNT : maxRetryCount;
}

public void validate(Schema schema, FailureCollector collector, @Nullable OAuthInfo oAuthInfo) {
if (connection != null) {
getConnection().validate(collector, oAuthInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import io.cdap.plugin.salesforce.SalesforceConstants;
import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials;
import io.cdap.plugin.salesforce.plugin.OAuthInfo;
import io.cdap.plugin.salesforce.plugin.source.batch.util.BulkConnectionRetryWrapper;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSplitUtil;

import java.util.ArrayList;
Expand Down Expand Up @@ -103,8 +104,11 @@ public void prepareRun(BatchSourceContext context) throws ConnectionException {
String sObjectNameField = config.getSObjectNameField();
authenticatorCredentials = config.getConnection().getAuthenticatorCredentials();
BulkConnection bulkConnection = SalesforceSplitUtil.getBulkConnection(authenticatorCredentials);
BulkConnectionRetryWrapper bulkConnectionRetryWrapper = new BulkConnectionRetryWrapper(bulkConnection,
config.isRetryRequired(), config.getInitialRetryDuration(), config.getMaxRetryDuration(),
config.getMaxRetryCount());
List<SalesforceSplit> querySplits = queries.parallelStream()
.map(query -> SalesforceSplitUtil.getQuerySplits(query, bulkConnection, false, config.getOperation(),
.map(query -> SalesforceSplitUtil.getQuerySplits(query, bulkConnectionRetryWrapper, false, config.getOperation(),
config.getInitialRetryDuration(), config.getMaxRetryDuration(),
config.getMaxRetryCount(), config.isRetryRequired()))
.flatMap(Collection::stream).collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import io.cdap.plugin.salesforce.SalesforceSchemaUtil;
import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials;
import io.cdap.plugin.salesforce.plugin.OAuthInfo;
import io.cdap.plugin.salesforce.plugin.source.batch.util.BulkConnectionRetryWrapper;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSourceConstants;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSplitUtil;

Expand Down Expand Up @@ -164,7 +165,10 @@ public static List<SalesforceSplit> getSplits(
bulkConnection.addHeader(SalesforceSourceConstants.HEADER_ENABLE_PK_CHUNK,
String.join(";", chunkHeaderValues));
}
List<SalesforceSplit> querySplits = SalesforceSplitUtil.getQuerySplits(query, bulkConnection,
BulkConnectionRetryWrapper bulkConnectionRetryWrapper = new BulkConnectionRetryWrapper(bulkConnection,
config.isRetryRequired(), config.getInitialRetryDuration(), config.getMaxRetryDuration(),
config.getMaxRetryCount());
List<SalesforceSplit> querySplits = SalesforceSplitUtil.getQuerySplits(query, bulkConnectionRetryWrapper,
enablePKChunk, config.getOperation(), config.getInitialRetryDuration(), config.getMaxRetryDuration(),
config.getMaxRetryCount(), config.isRetryRequired());
return querySplits;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.plugin.salesforce.BulkAPIBatchException;
import io.cdap.plugin.salesforce.SalesforceConnectionUtil;
import io.cdap.plugin.salesforce.SalesforceConstants;
import io.cdap.plugin.salesforce.authenticator.Authenticator;
import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials;
import io.cdap.plugin.salesforce.plugin.source.batch.util.BulkConnectionRetryWrapper;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceQueryExecutionException;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSourceConstants;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSplitUtil;
Expand Down Expand Up @@ -78,6 +78,7 @@ public class SalesforceBulkRecordReader extends RecordReader<Schema, Map<String,
private String batchId;
private String[] resultIds;
private int resultIdIndex;
private BulkConnectionRetryWrapper bulkConnectionRetryWrapper;

public SalesforceBulkRecordReader(Schema schema) {
this(schema, null, null, null);
Expand Down Expand Up @@ -115,6 +116,8 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
maxRetryCount = Integer.valueOf(conf.get(SalesforceSourceConstants.CONFIG_MAX_RETRY_COUNT,
String.valueOf(SalesforceSourceConstants.DEFAULT_MAX_RETRY_COUNT)));
isRetryRequired = Boolean.valueOf(conf.get(SalesforceSourceConstants.CONFIG_RETRY_REQUIRED, String.valueOf(true)));
bulkConnectionRetryWrapper = new BulkConnectionRetryWrapper(bulkConnection, isRetryRequired, initialRetryDuration,
maxRetryDuration, maxRetryCount);
AuthenticatorCredentials credentials = SalesforceConnectionUtil.getAuthenticatorCredentials(conf);
initialize(inputSplit, credentials);
}
Expand Down Expand Up @@ -205,15 +208,8 @@ void setupParser() throws IOException, AsyncApiException, InterruptedException {
resultIdIndex, resultIds.length));
}
try {
final InputStream queryResponseStream;
if (isRetryRequired) {
queryResponseStream =
Failsafe.with(SalesforceSplitUtil.getRetryPolicy(initialRetryDuration, maxRetryDuration, maxRetryCount))
.get(() -> getQueryResultStream(bulkConnection));
} else {
queryResponseStream = bulkConnection.getQueryResultStream(jobId, batchId, resultIds[resultIdIndex]);
}

final InputStream queryResponseStream = bulkConnectionRetryWrapper
.getQueryResultStream(jobId, batchId, resultIds[resultIdIndex]);
CSVFormat csvFormat = CSVFormat.DEFAULT
.withHeader()
.withQuoteMode(QuoteMode.ALL)
Expand Down
Loading

0 comments on commit ac5b95c

Please sign in to comment.