Skip to content

Commit

Permalink
Merge pull request data-integrations#266 from cloudsufi/refactorRecordReaders
Browse files Browse the repository at this point in the history

Fix: TaskAttemptContext is null
  • Loading branch information
Vipinofficial11 authored Oct 23, 2024
2 parents 7f4a8ab + 41ccf76 commit 6165ee5
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,10 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
public SalesforceWideRecordReader initialize(
InputSplit inputSplit, AuthenticatorCredentials credentials)
throws IOException, InterruptedException {
List<Map<String, ?>> fetchedIdList = fetchBulkQueryIds(inputSplit, null);
// Use default configurations of BulkRecordReader.
super.initialize(inputSplit, credentials);

List<Map<String, ?>> fetchedIdList = fetchBulkQueryIds();
LOG.debug("Number of records received from batch job for wide object: '{}'", fetchedIdList.size());

try {
Expand All @@ -84,12 +87,18 @@ public SalesforceWideRecordReader initialize(
Lists.partition(fetchedIdList, SalesforceSourceConstants.WIDE_QUERY_MAX_BATCH_COUNT);
LOG.debug("Number of partitions to be fetched for wide object: '{}'", partitions.size());

// Process partitions with batches sized to adhere to API limits and optimize memory usage.
// [CDAP]TODO: Address issues while handling large datasets.
results = partitions.parallelStream()
.map(this::getSObjectIds)
.map(sObjectIds -> fetchPartition(partnerConnection, fields, sObjectName, sObjectIds))
.flatMap(Arrays::stream)
.map(sObject -> transformer.transformToMap(sObject, sObjectDescriptor))
.collect(Collectors.toList());
.flatMap(partition -> processPartition(partnerConnection, fields, sObjectName,
partition, sObjectDescriptor).stream())
.collect(Collectors.toList());

if (results == null) {
LOG.warn("Result list is null after processing partitions.");
results = new ArrayList<>();
}

return this;
} catch (ConnectionException e) {
String errorMessage = SalesforceConnectionUtil.getSalesforceErrorMessageFromException(e);
Expand Down Expand Up @@ -123,15 +132,10 @@ public float getProgress() {
/**
* Fetches single entry map (Id -> SObjectId_value) values received from Bulk API.
*
* @param inputSplit specifies batch details
* @param taskAttemptContext task context
* @return list of single entry Map
* @throws IOException can be due error during reading query
* @throws InterruptedException interrupted sleep while waiting for batch results
*/
private List<Map<String, ?>> fetchBulkQueryIds(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
throws IOException, InterruptedException {
super.initialize(inputSplit, taskAttemptContext);
private List<Map<String, ?>> fetchBulkQueryIds()
throws IOException {
List<Map<String, ?>> fetchedIdList = new ArrayList<>();
while (super.nextKeyValue()) {
fetchedIdList.add(super.getCurrentValue());
Expand Down Expand Up @@ -181,4 +185,37 @@ private SObject[] fetchPartition(PartnerConnection partnerConnection, String fie
e);
}
}

/**
* Processes a partition of SObject records by dividing the IDs into smaller batches,
* retrieving the corresponding records from Salesforce, and transforming them into maps.
*
* @param partnerConnection the Salesforce partner connection used for retrieving data.
* @param fields the fields to be retrieved for each SObject.
* @param sObjectName the name of the Salesforce object (e.g., Account, Lead).
* @param partition the partition containing the ID records to be processed.
* @param sObjectDescriptor descriptor containing the structure of the SObject.
* @return result from partitions
*/
/**
 * Transforms one partition of Id records into row maps.
 *
 * <p>The partition's SObject Ids are first split into batches that respect the
 * Salesforce retrieve() Id limit; each batch is then fetched over the given
 * connection and every returned SObject is converted to a map via the transformer.
 * See https://developer.salesforce.com/docs/atlas.en-us.salesforce_app_limits_cheatsheet.meta/
 * salesforce_app_limits_cheatsheet/salesforce_app_limits_platform_apicalls.htm
 *
 * @param partnerConnection the Salesforce partner connection used for retrieving data.
 * @param fields the fields to be retrieved for each SObject.
 * @param sObjectName the name of the Salesforce object (e.g., Account, Lead).
 * @param partition the partition containing the ID records to be processed.
 * @param sObjectDescriptor descriptor containing the structure of the SObject.
 * @return result from partitions
 */
private List<Map<String, ?>> processPartition(PartnerConnection partnerConnection, String fields, String sObjectName,
                                              List<Map<String, ?>> partition, SObjectDescriptor sObjectDescriptor) {
  // Batch the Ids so a single retrieve() call never exceeds the API's Id limit.
  List<List<String>> batches = Lists.partition(
      Arrays.asList(getSObjectIds(partition)), SalesforceSourceConstants.RETRIEVE_MAX_BATCH_COUNT);

  List<Map<String, ?>> records = new ArrayList<>();
  for (List<String> batch : batches) {
    String[] batchIds = batch.toArray(new String[0]);
    // Fetch this batch and map each returned SObject into a row map.
    for (SObject fetched : fetchPartition(partnerConnection, fields, sObjectName, batchIds)) {
      records.add(transformer.transformToMap(fetched, sObjectDescriptor));
    }
  }
  return records;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public class SalesforceSourceConstants {
public static final String CONFIG_RETRY_REQUIRED = "mapred.salesforce.retryOnBackendError";

public static final int WIDE_QUERY_MAX_BATCH_COUNT = 2000;
public static final int RETRIEVE_MAX_BATCH_COUNT = 2000;
// https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/
// async_api_headers_enable_pk_chunking.htm
public static final int MAX_PK_CHUNK_SIZE = 250000;
Expand Down

0 comments on commit 6165ee5

Please sign in to comment.