Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for concurrency mode #404

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/CommonLibrariesForNET/IAuthenticationClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@ public interface IAuthenticationClient : IDisposable
string AccessToken { get; set; }
string Id { get; set; }
string ApiVersion { get; set; }

Task UsernamePasswordAsync(string clientId, string clientSecret, string username, string password);
Task UsernamePasswordAsync(string clientId, string clientSecret, string username, string password, string tokenRequestEndpointUrl);
Task WebServerAsync(string clientId, string clientSecret, string redirectUri, string code);
Task WebServerAsync(string clientId, string clientSecret, string redirectUri, string code, string tokenRequestEndpointUrl);
Task TokenRefreshAsync(string clientId, string refreshToken, string clientSecret = "");
Task TokenRefreshAsync(string clientId, string refreshToken, string clientSecret, string tokenRequestEndpointUrl);
Task GetLatestVersionAsync();
}
}
2 changes: 2 additions & 0 deletions src/CommonLibrariesForNET/Models/Xml/JobInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ public class JobInfo

[XmlElement(ElementName = "object")]
public string Object { get; set; }
[XmlElement(ElementName = "concurrencyMode")]
public string ConcurrencyMode { get; set; }

[XmlElement(ElementName = "externalIdFieldName")]
public string ExternalIdFieldName { get; set; }
Expand Down
3 changes: 0 additions & 3 deletions src/CommonLibrariesForNET/Models/Xml/JobInfoResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@ public class JobInfoResult : JobInfo
[XmlElement(ElementName = "state")]
public string State { get; set; }

[XmlElement(ElementName = "concurrencyMode")]
public string ConcurrencyMode { get; set; }

[XmlElement(ElementName = "numberBatchesQueued")]
public int NumberBatchesQueued { get; set; }

Expand Down
18 changes: 18 additions & 0 deletions src/ForceToolkitForNET/BulkConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,24 @@ public string Value()
}
}

public sealed class ConcurrencyMode
{
public static readonly ConcurrencyMode Parallel = new ConcurrencyMode("Parallel");
public static readonly ConcurrencyMode Serial = new ConcurrencyMode("Serial");

private readonly string _value;

private ConcurrencyMode(string value)
{
_value = value;
}

public string Value()
{
return _value;
}
}

public sealed class BatchState
{
public static readonly BatchState Queued = new BatchState("Queued");
Expand Down
35 changes: 19 additions & 16 deletions src/ForceToolkitForNET/ForceClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public class ForceClient : IForceClient, IDisposable
protected readonly XmlHttpClient _xmlHttpClient;
protected readonly JsonHttpClient _jsonHttpClient;

public ISelectListResolver SelectListResolver { get; set; }
public ISelectListResolver SelectListResolver { get; set; }

public ForceClient(string instanceUrl, string accessToken, string apiVersion)
: this(instanceUrl, accessToken, apiVersion, new HttpClient(), new HttpClient())
Expand All @@ -37,7 +37,7 @@ public ForceClient(string instanceUrl, string accessToken, string apiVersion, Ht

_jsonHttpClient = new JsonHttpClient(instanceUrl, apiVersion, accessToken, httpClientForJson, callerWillDisposeHttpClients);
_xmlHttpClient = new XmlHttpClient(instanceUrl, apiVersion, accessToken, httpClientForXml, callerWillDisposeHttpClients);

SelectListResolver = new DataMemberSelectListResolver();
}

Expand Down Expand Up @@ -272,11 +272,11 @@ public async Task<string> GetFieldsCommaSeparatedListAsync(string objectName)
List<string> objectDescription = new List<string>();
foreach (ExpandoObject field in fields as IEnumerable)
{
objectDescription.Add((field).FirstOrDefault(x => x.Key == "name").Value.ToString());
objectDescription.Add((field).FirstOrDefault(x => x.Key == "name").Value.ToString());
}
return string.Join(", ", objectDescription);
}

public Task<T> ExecuteAnonymousAsync<T>(string apex)
{
if (string.IsNullOrEmpty(apex)) throw new ArgumentNullException("apex");
Expand All @@ -286,19 +286,19 @@ public Task<T> ExecuteAnonymousAsync<T>(string apex)
// BULK METHODS

public async Task<List<BatchInfoResult>> RunJobAsync<T>(string objectName, BulkConstants.OperationType operationType,
IEnumerable<ISObjectList<T>> recordsLists)
IEnumerable<ISObjectList<T>> recordsLists, BulkConstants.ConcurrencyMode concurrencyMode = null)
{
return await RunJobAsync(objectName, null, operationType, recordsLists);
return await RunJobAsync(objectName, null, operationType, recordsLists, concurrencyMode);
}

public async Task<List<BatchInfoResult>> RunJobAsync<T>(string objectName, string externalIdFieldName,
BulkConstants.OperationType operationType, IEnumerable<ISObjectList<T>> recordsLists)
BulkConstants.OperationType operationType, IEnumerable<ISObjectList<T>> recordsLists, BulkConstants.ConcurrencyMode concurrencyMode = null)
{
if (recordsLists == null) throw new ArgumentNullException("recordsLists");

if (operationType == BulkConstants.OperationType.Upsert && string.IsNullOrEmpty(externalIdFieldName)) throw new ArgumentNullException(nameof(externalIdFieldName));

var jobInfoResult = await CreateJobAsync(objectName, externalIdFieldName, operationType);
var jobInfoResult = await CreateJobAsync(objectName, externalIdFieldName, operationType, concurrencyMode);
var batchResults = new List<BatchInfoResult>();
foreach (var recordList in recordsLists)
{
Expand All @@ -309,18 +309,18 @@ public async Task<List<BatchInfoResult>> RunJobAsync<T>(string objectName, strin
}

public async Task<List<BatchResultList>> RunJobAndPollAsync<T>(string objectName, BulkConstants.OperationType operationType,
IEnumerable<ISObjectList<T>> recordsLists)
IEnumerable<ISObjectList<T>> recordsLists, BulkConstants.ConcurrencyMode concurrencyMode = null)
{
return await RunJobAndPollAsync(objectName, null, operationType, recordsLists);
return await RunJobAndPollAsync(objectName, null, operationType, recordsLists, concurrencyMode);
}

public async Task<List<BatchResultList>> RunJobAndPollAsync<T>(string objectName, string externalIdFieldName, BulkConstants.OperationType operationType,
IEnumerable<ISObjectList<T>> recordsLists)
IEnumerable<ISObjectList<T>> recordsLists, BulkConstants.ConcurrencyMode concurrencyMode = null)
{
const float pollingStart = 1000;
const float pollingIncrease = 2.0f;

var batchInfoResults = await RunJobAsync(objectName, externalIdFieldName, operationType, recordsLists);
var batchInfoResults = await RunJobAsync(objectName, externalIdFieldName, operationType, recordsLists, concurrencyMode);

var currentPoll = pollingStart;
var finishedBatchInfoResults = new List<BatchInfoResult>();
Expand Down Expand Up @@ -356,23 +356,26 @@ public async Task<List<BatchResultList>> RunJobAndPollAsync<T>(string objectName
return batchResults;
}

public async Task<JobInfoResult> CreateJobAsync(string objectName, BulkConstants.OperationType operationType)
public async Task<JobInfoResult> CreateJobAsync(string objectName, BulkConstants.OperationType operationType, BulkConstants.ConcurrencyMode concurrencyMode = null)
{
return await CreateJobAsync(objectName, null, operationType);
return await CreateJobAsync(objectName, null, operationType, concurrencyMode);
}

public async Task<JobInfoResult> CreateJobAsync(string objectName, string externalIdFieldName, BulkConstants.OperationType operationType)
public async Task<JobInfoResult> CreateJobAsync(string objectName, string externalIdFieldName, BulkConstants.OperationType operationType, BulkConstants.ConcurrencyMode concurrencyMode = null)
{
if (string.IsNullOrEmpty(objectName)) throw new ArgumentNullException(nameof(objectName));

if (operationType == BulkConstants.OperationType.Upsert && string.IsNullOrEmpty(externalIdFieldName)) throw new ArgumentNullException(nameof(externalIdFieldName));

if (concurrencyMode == null) concurrencyMode = BulkConstants.ConcurrencyMode.Parallel;

var jobInfo = new JobInfo
{
ContentType = "XML",
Object = objectName,
ExternalIdFieldName = externalIdFieldName,
Operation = operationType.Value()
Operation = operationType.Value(),
ConcurrencyMode = concurrencyMode.Value()
};

return await _xmlHttpClient.HttpPostAsync<JobInfoResult>(jobInfo, "/services/async/{0}/job");
Expand Down
12 changes: 7 additions & 5 deletions src/ForceToolkitForNET/IForceClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public interface IForceClient : IDisposable
Task<QueryResult<T>> QueryAllAsync<T>(string query);
Task<T> QueryByIdAsync<T>(string objectName, string recordId);
Task<T> QueryAllFieldsByIdAsync<T>(string objectName, string recordId);
Task<T> QueryAllFieldsByExternalIdAsync<T>(string objectName, string externalIdFieldName, string externalId);
Task<T> QueryAllFieldsByExternalIdAsync<T>(string objectName, string externalIdFieldName, string externalId);
Task<T> ExecuteRestApiAsync<T>(string apiName);
Task<T> ExecuteRestApiAsync<T>(string apiName, object inputObject);
Task<SuccessResponse> CreateAsync(string objectName, object record);
Expand All @@ -37,12 +37,14 @@ public interface IForceClient : IDisposable
Task<T> UserInfo<T>(string url);
Task<System.IO.Stream> GetBlobAsync(String objectName, String objectId, String fieldName);
Task<string> GetFieldsCommaSeparatedListAsync(string objectName);
Task<T> ExecuteAnonymousAsync<T>(string apex);
Task<T> ExecuteAnonymousAsync<T>(string apex);

// BULK
Task<List<BatchInfoResult>> RunJobAsync<T>(string objectName, BulkConstants.OperationType operationType, IEnumerable<ISObjectList<T>> recordsLists);
Task<List<BatchResultList>> RunJobAndPollAsync<T>(string objectName, BulkConstants.OperationType operationType, IEnumerable<ISObjectList<T>> recordsLists);
Task<JobInfoResult> CreateJobAsync(string objectName, BulkConstants.OperationType operationType);
Task<List<BatchInfoResult>> RunJobAsync<T>(string objectName, BulkConstants.OperationType operationType,IEnumerable<ISObjectList<T>> recordsLists, BulkConstants.ConcurrencyMode concurrencyMode = null);
Task<List<BatchInfoResult>> RunJobAsync<T>(string objectName, string externalIdFieldName, BulkConstants.OperationType operationType, IEnumerable<ISObjectList<T>> recordsLists, BulkConstants.ConcurrencyMode concurrencyMode = null);
Task<List<BatchResultList>> RunJobAndPollAsync<T>(string objectName, BulkConstants.OperationType operationType, IEnumerable<ISObjectList<T>> recordsLists, BulkConstants.ConcurrencyMode concurrencyMode = null);
Task<List<BatchResultList>> RunJobAndPollAsync<T>(string objectName, string externalIdFieldName, BulkConstants.OperationType operationType, IEnumerable<ISObjectList<T>> recordsLists, BulkConstants.ConcurrencyMode concurrencyMode = null);
Task<JobInfoResult> CreateJobAsync(string objectName, BulkConstants.OperationType operationType, BulkConstants.ConcurrencyMode concurrencyMode = null);
Task<BatchInfoResult> CreateJobBatchAsync<T>(JobInfoResult jobInfo, ISObjectList<T> recordsObject);
Task<BatchInfoResult> CreateJobBatchAsync<T>(string jobId, ISObjectList<T> recordsObject);
Task<JobInfoResult> CloseJobAsync(JobInfoResult jobInfo);
Expand Down