Skip to content

Commit

Permalink
Added ReadTimeout in all salesforce plugins.
Browse files Browse the repository at this point in the history
  • Loading branch information
vikasrathee-cs committed Nov 15, 2023
1 parent cef543f commit ce92447
Show file tree
Hide file tree
Showing 33 changed files with 246 additions and 97 deletions.
7 changes: 5 additions & 2 deletions docs/Salesforce-batchsink.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ You also can use the macro function ${conn(connection-name)}.
**Security Token:** Salesforce security token. If the password does not contain the security token, the plugin
will append the token before authenticating with Salesforce.

**Consumer Key:** Application Consumer Key. This is also known as the OAuth client ID.
**Consumer Key:** Application Consumer Key. This is also known as the OAuth client ID.
A Salesforce connected application must be created in order to get a consumer key.

**Consumer Secret:** Application Consumer Secret. This is also known as the OAuth client secret.
Expand All @@ -33,12 +33,15 @@ A Salesforce connected application must be created in order to get a client secr

**Connect Timeout:** Maximum time in milliseconds to wait for connection initialization before it times out.

**Read Timeout:** Maximum time in seconds to fetch data from the server before time out.

**Proxy URL:** Proxy URL. Must contain a protocol, address and port.

**SObject Name:** Salesforce object name to insert records into.

There are also **sObjects** that are not supported in the Bulk API of Salesforce.
When a job is created using an object that is not supported in the Bulk API, "_Entity is not supported by the Bulk API_" is thrown.
When a job is created using an object that is not supported in the Bulk API, "_Entity is not supported by the Bulk API_"
is thrown.
These objects are also not supported by _Einstein Analytics_ as it also uses Bulk API for querying data.

Below is a non-comprehensive list of **sObjects** that are not currently available in the Bulk API:
Expand Down
3 changes: 2 additions & 1 deletion docs/Salesforce-batchsource.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,12 @@ A Salesforce connected application must be created in order to get a client secr

**Connect Timeout:** Maximum time in milliseconds to wait for connection initialization before it times out.

**Read Timeout:** Maximum time in seconds to fetch data from the server before time out.

**Proxy URL:** Proxy URL. Must contain a protocol, address and port.

**SOQL Query:** A SOQL query to fetch data into source.


Examples:

``SELECT Id, Name, BillingCity FROM Account``
Expand Down
4 changes: 3 additions & 1 deletion docs/Salesforce-connector.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@ A Salesforce connected application must be created in order to get a client secr

**Connect Timeout:** Maximum time in milliseconds to wait for connection initialization before it times out.

**Read Timeout:** Maximum time in seconds to fetch data from the server before time out.

**Proxy URL:** Proxy URL. Must contain a protocol, address and port.

Path of the connection
----------------------
To browse, get a sample from, or get the specification for this connection. (Not supported in the Salesforce Streaming
To browse, get a sample from, or get the specification for this connection. (Not supported in the Salesforce Streaming
source and Salesforce Multi Object batch source.).
/{object} This path indicates a Salesforce object. An object is the only one that can be sampled.
6 changes: 3 additions & 3 deletions docs/Salesforce-streamingsource.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,18 @@ A Salesforce connected application must be created in order to get a client secr

**Login URL:** Salesforce OAuth2 login URL.

**Connect Timeout:** Maximum time in milliseconds to wait for connection initialization before it times out.
**Connect Timeout:** Maximum time in milliseconds to wait for connection initialization before it times out.

**Read Timeout:** Maximum time in seconds to fetch data from the server before time out.

**Proxy URL:** Proxy URL. Must contain a protocol, address and port.

**Topic Name:** Salesforce push topic name. Plugin will track updates from this topic. If the topic does
not exist, the plugin creates it. To manually create pushTopic use Salesforce workbench, Apex code or API.


**Query:** Salesforce push topic query. The query is used by Salesforce to send updates to push topic.
This field not required if you are using an existing push topic.


**Notify On Create:** Push topic property, which specifies if a create operation should generate a record.


Expand Down
8 changes: 4 additions & 4 deletions docs/SalesforceMultiObjects-batchsource.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,19 @@ A Salesforce connected application must be created in order to get a client secr

**Login URL:** Salesforce OAuth2 login URL.

**Connect Timeout:** Maximum time in milliseconds to wait for connection initialization before it times out.

**Connect Timeout:** Maximum time in milliseconds to wait for connection initialization before time out.

**Read Timeout:** Maximum time in seconds to fetch data from the server before time out.

**Proxy URL:** Proxy URL. Must contain a protocol, address and port.

**White List**: List of SObjects to read from. By default all SObjects will be white listed.
**White List**: List of SObjects to read from. By default all SObjects will be white listed.
For each white listed SObject, a SOQL query will be generated of the form:
`select <FIELD_1, FIELD_2, ..., FIELD_N> from ${sObjectName}`.

**Black List**: List of SObjects to avoid reading from. By default NONE of SObjects will be black listed.

**Last Modified After:** Filter data to only include records where the system field `LastModifiedDate` is greater than
**Last Modified After:** Filter data to only include records where the system field `LastModifiedDate` is greater than
or equal to the specified date. The date must be provided in the Salesforce date format:

| Format | Format Syntax | Example |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,22 @@ public static AuthenticatorCredentials getAuthenticatorCredentials(Configuration
if (conf.get(SalesforceConstants.CONFIG_CONNECT_TIMEOUT) != null) {
connectTimeout = Integer.parseInt(conf.get(SalesforceConstants.CONFIG_CONNECT_TIMEOUT));
}
Integer readTimeout = SalesforceConstants.DEFAULT_READ_TIMEOUT_SEC * 1000;
if (conf.get(SalesforceConstants.CONFIG_READ_TIMEOUT) != null) {
readTimeout = Integer.parseInt(conf.get(SalesforceConstants.CONFIG_READ_TIMEOUT));
}
String proxyUrl = conf.get(SalesforceConstants.CONFIG_PROXY_URL);
if (oAuthToken != null && instanceURL != null) {
return new AuthenticatorCredentials(new OAuthInfo(oAuthToken, instanceURL), connectTimeout, proxyUrl);
return new AuthenticatorCredentials(new OAuthInfo(oAuthToken, instanceURL), connectTimeout, readTimeout,
proxyUrl);
}

return new AuthenticatorCredentials(conf.get(SalesforceConstants.CONFIG_USERNAME),
conf.get(SalesforceConstants.CONFIG_PASSWORD),
conf.get(SalesforceConstants.CONFIG_CONSUMER_KEY),
conf.get(SalesforceConstants.CONFIG_CONSUMER_SECRET),
conf.get(SalesforceConstants.CONFIG_LOGIN_URL),
connectTimeout, proxyUrl);
connectTimeout, readTimeout, proxyUrl);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,11 @@ public class SalesforceConstants {
public static final int SOQL_MAX_LENGTH = 20000;

public static final int DEFAULT_CONNECTION_TIMEOUT_MS = 30000;
public static final int DEFAULT_READ_TIMEOUT_SEC = 3600;
public static final String PROPERTY_CONNECT_TIMEOUT = "connectTimeout";
public static final String PROPERTY_READ_TIMEOUT = "readTimeout";
public static final String CONFIG_CONNECT_TIMEOUT = "mapred.salesforce.connectTimeout";
public static final String CONFIG_READ_TIMEOUT = "mapred.salesforce.readTimeout";

public static final String PROPERTY_PROXY_URL = "proxyUrl";
public static final String CONFIG_PROXY_URL = "mapred.salesforce.proxyUrl";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public static ConnectorConfig createConnectorConfig(AuthenticatorCredentials cre
// Set this to true to see HTTP requests and responses on stdout
connectorConfig.setTraceMessage(false);
connectorConfig.setConnectionTimeout(credentials.getConnectTimeout());
connectorConfig.setReadTimeout(credentials.getReadTimeout());
return connectorConfig;
} catch (Exception e) {
String errorMessage = SalesforceConnectionUtil.getSalesforceErrorMessageFromException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,19 @@ public class AuthenticatorCredentials implements Serializable {
private final String consumerSecret;
private final String loginUrl;
private final Integer connectTimeout;
private final Integer readTimeout;
private final String proxyUrl;

public AuthenticatorCredentials(OAuthInfo oAuthInfo, Integer connectTimeout, String proxyUrl) {
this(Objects.requireNonNull(oAuthInfo), null, null, null, null, null, connectTimeout, proxyUrl);
public AuthenticatorCredentials(OAuthInfo oAuthInfo, Integer connectTimeout, Integer readTimeout, String proxyUrl) {
this(Objects.requireNonNull(oAuthInfo), null, null, null, null, null, connectTimeout, readTimeout, proxyUrl);
}

public AuthenticatorCredentials(String username, String password,
String consumerKey, String consumerSecret, String loginUrl,
Integer connectTimeout, String proxyUrl) {
Integer connectTimeout, Integer readTimeout, String proxyUrl) {
this(null, Objects.requireNonNull(username), Objects.requireNonNull(password), Objects.requireNonNull(consumerKey),
Objects.requireNonNull(consumerSecret), Objects.requireNonNull(loginUrl),
Objects.requireNonNull(connectTimeout), proxyUrl);
Objects.requireNonNull(connectTimeout), Objects.requireNonNull(readTimeout), proxyUrl);
}

private AuthenticatorCredentials(@Nullable OAuthInfo oAuthInfo,
Expand All @@ -55,6 +56,7 @@ private AuthenticatorCredentials(@Nullable OAuthInfo oAuthInfo,
@Nullable String consumerSecret,
@Nullable String loginUrl,
@Nullable Integer connectTimeout,
@Nullable Integer readTimeout,
@Nullable String proxyUrl) {
this.oAuthInfo = oAuthInfo;
this.username = username;
Expand All @@ -63,6 +65,7 @@ private AuthenticatorCredentials(@Nullable OAuthInfo oAuthInfo,
this.consumerSecret = consumerSecret;
this.loginUrl = loginUrl;
this.connectTimeout = connectTimeout;
this.readTimeout = readTimeout;
this.proxyUrl = proxyUrl;
}

Expand Down Expand Up @@ -101,6 +104,10 @@ public Integer getConnectTimeout() {
return connectTimeout;
}

public Integer getReadTimeout() {
return readTimeout;
}

@Nullable
public String getProxyUrl() {
return proxyUrl;
Expand All @@ -123,11 +130,13 @@ public boolean equals(Object o) {
Objects.equals(consumerSecret, that.consumerSecret) &&
Objects.equals(loginUrl, that.loginUrl) &&
Objects.equals(connectTimeout, that.connectTimeout) &&
Objects.equals(readTimeout, that.readTimeout) &&
Objects.equals(proxyUrl, that.proxyUrl);
}

@Override
public int hashCode() {
return Objects.hash(username, password, consumerKey, consumerSecret, loginUrl, connectTimeout, proxyUrl);
return Objects.hash(username, password, consumerKey, consumerSecret, loginUrl, connectTimeout, readTimeout,
proxyUrl);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ public class SalesforceConnectorBaseConfig extends PluginConfig {
@Nullable
private final Integer connectTimeout;

@Name(SalesforceConstants.PROPERTY_READ_TIMEOUT)
@Description("Maximum time in seconds to fetch data from the server before time out.")
@Macro
@Nullable
private final Integer readTimeout;

@Name(SalesforceConstants.PROPERTY_CONSUMER_KEY)
@Description("Salesforce connected app's consumer key")
@Macro
Expand Down Expand Up @@ -87,6 +93,7 @@ public SalesforceConnectorBaseConfig(@Nullable String consumerKey,
@Nullable String loginUrl,
@Nullable String securityToken,
@Nullable Integer connectTimeout,
@Nullable Integer readTimeout,
@Nullable String proxyUrl) {
this.consumerKey = consumerKey;
this.consumerSecret = consumerSecret;
Expand All @@ -95,6 +102,7 @@ public SalesforceConnectorBaseConfig(@Nullable String consumerKey,
this.loginUrl = loginUrl;
this.securityToken = securityToken;
this.connectTimeout = connectTimeout;
this.readTimeout = readTimeout;
this.proxyUrl = proxyUrl;
}

Expand Down Expand Up @@ -131,6 +139,14 @@ public Integer getConnectTimeout() {
return connectTimeout;
}

@Nullable
public Integer getReadTimeoutInMillis() {
if (readTimeout == null) {
return SalesforceConstants.DEFAULT_READ_TIMEOUT_SEC * 1000;
}
return readTimeout * 1000;
}

public void validate(FailureCollector collector, @Nullable OAuthInfo oAuthInfo) {
try {
validateConnection(oAuthInfo);
Expand All @@ -149,6 +165,7 @@ private void validateConnection(@Nullable OAuthInfo oAuthInfo) {

try {
SalesforceConnectionUtil.getPartnerConnection(new AuthenticatorCredentials(oAuthInfo, this.getConnectTimeout(),
this.getReadTimeoutInMillis(),
getProxyUrl()));
} catch (ConnectionException e) {
String message = SalesforceConnectionUtil.getSalesforceErrorMessageFromException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ public Integer getConnectTimeout() {
return config.getConnectTimeout();
}

@Nullable
public Integer getReadTimeout() {
return config.getReadTimeoutInMillis();
}

@Nullable
public String getProxyUrl() {
return config.getProxyUrl();
Expand All @@ -92,12 +97,12 @@ public void validate(FailureCollector collector, @Nullable OAuthInfo oAuthInfo)
public AuthenticatorCredentials getAuthenticatorCredentials() {
OAuthInfo oAuthInfo = getOAuthInfo();
if (oAuthInfo != null) {
return new AuthenticatorCredentials(oAuthInfo, config.getConnectTimeout(),
return new AuthenticatorCredentials(oAuthInfo, config.getConnectTimeout(), config.getReadTimeoutInMillis(),
config.getProxyUrl());
}
return new AuthenticatorCredentials(config.getUsername(), config.getPassword(), config.getConsumerKey(),
config.getConsumerSecret(), config.getLoginUrl(), config.getConnectTimeout(),
config.getProxyUrl());
config.getReadTimeoutInMillis(), config.getProxyUrl());
}

/**
Expand All @@ -123,7 +128,8 @@ public boolean canAttemptToEstablishConnection() {
|| config.containsMacro(SalesforceConstants.PROPERTY_PASSWORD)
|| config.containsMacro(SalesforceConstants.PROPERTY_LOGIN_URL)
|| config.containsMacro(SalesforceConstants.PROPERTY_SECURITY_TOKEN)
|| config.containsMacro(SalesforceConstants.PROPERTY_CONNECT_TIMEOUT));
|| config.containsMacro(SalesforceConstants.PROPERTY_CONNECT_TIMEOUT)
|| config.containsMacro(SalesforceConstants.PROPERTY_READ_TIMEOUT));
}

private void validateConnection(@Nullable OAuthInfo oAuthInfo) {
Expand All @@ -133,6 +139,7 @@ private void validateConnection(@Nullable OAuthInfo oAuthInfo) {

try {
SalesforceConnectionUtil.getPartnerConnection(new AuthenticatorCredentials(oAuthInfo, config.getConnectTimeout(),
config.getReadTimeoutInMillis(),
config.getProxyUrl()));
} catch (ConnectionException e) {
String message = SalesforceConnectionUtil.getSalesforceErrorMessageFromException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public BrowseDetail browse() throws IOException {
config.getConsumerSecret(),
config.getLoginUrl(),
config.getConnectTimeout(),
config.getReadTimeoutInMillis(),
config.getProxyUrl());
BrowseDetail.Builder browseDetailBuilder = BrowseDetail.builder();
int count = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,11 @@ public SalesforceConnectorConfig(@Nullable String consumerKey,
@Nullable String loginUrl,
@Nullable String securityToken,
@Nullable Integer connectTimeout,
@Nullable Integer readTimeout,
@Nullable OAuthInfo oAuthInfo,
@Nullable String proxyUrl) {
super(consumerKey, consumerSecret, username, password, loginUrl, securityToken, connectTimeout, proxyUrl);
super(consumerKey, consumerSecret, username, password, loginUrl, securityToken, connectTimeout, readTimeout,
proxyUrl);
this.oAuthInfo = oAuthInfo;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ public SalesforceOutputFormatProvider(SalesforceSinkConfig config) {
.put(SalesforceSinkConstants.CONFIG_ERROR_HANDLING, config.getErrorHandling().getValue())
.put(SalesforceSinkConstants.CONFIG_MAX_BYTES_PER_BATCH, config.getMaxBytesPerBatch().toString())
.put(SalesforceSinkConstants.CONFIG_MAX_RECORDS_PER_BATCH, config.getMaxRecordsPerBatch().toString())
.put(SalesforceConstants.CONFIG_CONNECT_TIMEOUT, config.getConnection().getConnectTimeout().toString());
.put(SalesforceConstants.CONFIG_CONNECT_TIMEOUT, config.getConnection().getConnectTimeout().toString())
.put(SalesforceConstants.CONFIG_READ_TIMEOUT, config.getConnection().getReadTimeout().toString());

if (!Strings.isNullOrEmpty(config.getConnection().getProxyUrl())) {
configBuilder.put(SalesforceConstants.CONFIG_PROXY_URL, config.getConnection().getProxyUrl());
Expand Down
Loading

0 comments on commit ce92447

Please sign in to comment.