Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Addition of Upsert operation in Salesforce sink Plugin #17

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions src/e2e-test/features/salesforcesink/RunTime.feature
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,41 @@ Feature: Salesforce Sink - Run time Scenarios
Then Verify the pipeline status is "Succeeded"
Then Close the pipeline logs
Then Validate the values of records transferred from Bigquery to Salesforce is equal

@SINK-TS-SF-RNTM-04 @BQ_SOURCE_TEST @DELETE_TEST_DATA
Scenario: Verify user should be able to see ingest the records successfully using upsert operation
When Open Datafusion Project to configure pipeline
And Select plugin: "BigQuery" from the plugins list as: "Source"
And Navigate to the properties page of plugin: "BigQuery"
Then Replace input plugin property: "project" with value: "projectId"
Then Enter input plugin property: "datasetProject" with value: "projectId"
Then Enter input plugin property: "referenceName" with value: "BQReferenceName"
Then Enter input plugin property: "dataset" with value: "dataset"
Then Enter input plugin property: "table" with value: "bqSourceTable"
Then Validate "BigQuery" plugin properties
And Close the Plugin Properties page
And Select Sink plugin: "Salesforce" from the plugins list
And Connect plugins: "BigQuery" and "Salesforce" to establish connection
And Navigate to the properties page of plugin: "Salesforce"
And fill Authentication properties for Salesforce Admin user
Then Enter input plugin property: "referenceName" with value: "ReferenceName"
And Select radio button plugin property: "operation" with value: "upsert"
Then Enter input plugin property: "externalIdField" with value: "UpsertColumnvalue"
And Enter input plugin property: "sObject" with value: "sobject.account"
And Select dropdown plugin property: "errorHandling" with option value: "Skip on error"
Then Validate "Salesforce" plugin properties
And Close the Plugin Properties page
And Save the pipeline
And Preview and run the pipeline
Then Wait till pipeline preview is in running state
Then Open and capture pipeline preview logs
Then Verify the preview run status of pipeline in the logs is "succeeded"
Then Close the pipeline logs
Then Close the preview
Then Deploy the pipeline
Then Run the Pipeline in Runtime
Then Wait till pipeline is in running state
Then Open and capture logs
Then Verify the pipeline status is "Succeeded"
Then Close the pipeline logs
Then Validate the values of records transferred from Bigquery to Salesforce is equal
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ public static void createTempSourceBQTable() throws IOException, InterruptedExce
PluginPropertyUtils.pluginProp("InsertBQDataQueryFile"));
}

@Before(order = 1, value = "@BQ_FAILURE_TEST")
public static void createTempSourceInvalidBQTable() throws IOException, InterruptedException {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where are we using this hook?

createSourceBQTableWithQueries(PluginPropertyUtils.pluginProp("BigQueryCreateTableFailureQuery"),
PluginPropertyUtils.pluginProp("BigQueryInsertDataFailureQuery"));
}

@After(order = 1, value = "@BQ_TEMP_CLEANUP")
public static void deleteTemperoryCreatedBQTable() throws IOException, InterruptedException {
String bqTargetTableName = PluginPropertyUtils.pluginProp("bqTargetTable") + "_v1";
Expand Down
16 changes: 8 additions & 8 deletions src/e2e-test/java/io/cdap/plugin/utils/SalesforceClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -178,14 +178,14 @@ public static List<JsonObject> queryObject(String id, String objectName) {
public static void deletePushTopic(String pushTopicName) {
try {
PartnerConnection partnerConnection = new PartnerConnection(
Authenticator.createConnectorConfig(AuthenticatorCredentials.fromParameters(USERNAME, PASSWORD + SECURITYTOKEN,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove the unnecessary/non-targeted changes

CLIENTID, CLIENTSECRET, PluginPropertyUtils.
pluginProp("login.url"),
30000, 3600, "")));
Authenticator.createConnectorConfig(AuthenticatorCredentials.fromParameters(USERNAME, PASSWORD + SECURITYTOKEN,
CLIENTID, CLIENTSECRET, PluginPropertyUtils.
pluginProp("login.url"),
30000, 3600, "")));

QueryResult queryResult = SalesforceStreamingSourceConfig.runQuery(
partnerConnection,
String.format("SELECT Id FROM PushTopic WHERE Name = '%s'", pushTopicName)
partnerConnection,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove the unnecessary/non-targeted changes

String.format("SELECT Id FROM PushTopic WHERE Name = '%s'", pushTopicName)
);

SObject sobject = queryResult.getRecords()[0];
Expand All @@ -204,7 +204,7 @@ public static void deletePushTopic(String pushTopicName) {
} catch (ConnectionException e) {
String message = SalesforceConnectionUtil.getSalesforceErrorMessageFromException(e);
throw new InvalidStageException(
String.format("Cannot connect to Salesforce API with credentials specified due to error: %s", message), e);
String.format("Cannot connect to Salesforce API with credentials specified due to error: %s", message), e);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove the unnecessary/non-targeted changes

}
}

Expand Down Expand Up @@ -269,7 +269,7 @@ public static String queryObjectId(String objectName) {
return uniqueRecordId;
}

public static void updateObject(String id, String objectName) {
public static void updateObject(String id, String objectName) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove the unnecessary/non-targeted changes

getAccessToken();
HttpClient httpClient = HttpClientBuilder.create().build();
String baseUri = loginInstanceUrl + REST_ENDPOINT + API_VERSION;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
create table `DATASET.TABLE_NAME` (Id STRING,Name STRING)
7 changes: 4 additions & 3 deletions src/e2e-test/resources/BigQuery/BigQueryCreateTableQuery.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
create table `DATASET.TABLE_NAME` (Name STRING, Col_Timestamp__c TIMESTAMP, Col_Date__c DATE, Col_Currency__c FLOAT64,
Col_Email__c STRING, Col_Number__c FLOAT64, Col_GeoLocation__Latitude__s FLOAT64,
Col_GeoLocation__Longitude__s FLOAT64, Col__c STRING, Col_Url__c STRING, Col_Time__c TIME, Col_Text__c STRING)
create table `DATASET.TABLE_NAME` (Id__c FLOAT64, Name STRING, Col_Timestamp__c TIMESTAMP, Col_Date__c DATE, Col_Currency__c FLOAT64,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this change done to fix the broken environment dependent tests?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we have done this change to fix the broken environment dependent tests.

Col_Email__c STRING, Col_Number__c FLOAT64, Col__c STRING, Col_Url__c STRING, Col_Time__c TIME, Col_Text__c STRING)


Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
insert into `DATASET.TABLE_NAME` (Id ,Name ) values
('abc','Test_PB' );

insert into `DATASET.TABLE_NAME` (Id ,Name ) values
('345','Test_PB1' );

insert into `DATASET.TABLE_NAME` (Id ,Name ) values
('768','Test_PB23' );
6 changes: 3 additions & 3 deletions src/e2e-test/resources/BigQuery/BigQueryInsertDataQuery.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
insert into `DATASET.TABLE_NAME` (Name, Col_Timestamp__c, Col_Date__c, Col_Currency__c, Col_Email__c, Col_Number__c,
Col_GeoLocation__Latitude__s, Col_GeoLocation__Longitude__s, Col__c, Col_Url__c, Col_Time__c, Col_Text__c) values
('adam','2019-03-10 04:50:01 UTC','2021-01-28',61.823765812,'[email protected]',898365444,37.794116,-122.3432,
insert into `DATASET.TABLE_NAME` (Id__c, Name, Col_Timestamp__c, Col_Date__c, Col_Currency__c, Col_Email__c, Col_Number__c,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this change done to fix the broken environment dependent tests?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we have done this change to fix the broken environment dependent tests.

Col__c, Col_Url__c, Col_Time__c, Col_Text__c) values
(786777,'adam','2019-03-10 04:50:01 UTC','2021-01-28',61.823765812,'[email protected]',-122.3432,
'984746334','abc/123','20:26:34','find');

2 changes: 2 additions & 0 deletions src/e2e-test/resources/errorMessage.properties
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,5 @@ required.property.topicname=Required property 'pushTopicName' has no value.

#Validation messages
validationSuccessMessage=No errors found.

errorMessagerecordprocessed= Total ///
25 changes: 16 additions & 9 deletions src/e2e-test/resources/pluginParameters.properties
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ invalid.admin.consumer.secret=lmnop891011
#SOQL Query
simple.query=SELECT Id, Name, Phone FROM Account
test.query=SELECT Id,Name,Col_Timestamp__c,Col_Date__c,Col_Currency__c,Col_Email__c,Col_Number__c,\
Col_GeoLocation__Latitude__s,Col_GeoLocation__Longitude__s,Col__c,Col_Url__c,Col_Time__c,Col_Text__c FROM Automation_custom__c
Col__c,Col_Url__c,Col_Time__c,Col_Text__c FROM Automation_custom__c

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this change done to fix the broken environment dependent tests?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we have done this change to fix the broken environment dependent tests.

where.query=SELECT name FROM Opportunity WHERE StageName='Needs Analysis'
groupby.query=SELECT CampaignId, AVG(Amount) FROM Opportunity GROUP BY CampaignId
childtoparent.query=SELECT Id, Name, Account.Name FROM Contact WHERE Account.Industry = 'Chemicals'
Expand All @@ -29,8 +29,9 @@ opportunity.query=SELECT Id,Name,StageName,AccountId,LastViewedDate,LastReferenc
AccountId='0015j00000kN78QAAS'

sobject.Automation_custom_c=Automation_custom__c

#SObjects
sobject.account=Account
sobject.account=Automation_custom__c
Salesforce.sobjectName=Opportunity
sobject.lead=Lead
sobject.invalid=abc
Expand All @@ -50,7 +51,7 @@ hundred.million.records=100000000
topic.name=StreamingSalesforce
#Macros
sfmultisource.listofsobjects=Lead,Opportunity
sfmultisource.listofsobject=Automation_custom__c,Automation_custom2__c
sfmultisource.listofsobject=Automation_custom__c
sfmultisource.sobject.lead=Lead
sfmultisource.sobject.customobject=Automation_custom__c
sfmultisource.listofsobjectsforblacklist=Account,Contact
Expand Down Expand Up @@ -107,7 +108,7 @@ account.schema= [{"key":"Id","value":"string"},{"key":"IsDeleted","value":"boole
{"key":"ParentId","value":"string"},{"key":"BillingStreet","value":"string"},{"key":"BillingCity","value":"string"},\
{"key":"BillingState","value":"string"},{"key":"BillingPostalCode","value":"string"},\
{"key":"BillingCountry","value":"string"},\
{"key":"BillingLatitude","value":"double"},{"key":"BillingLongitude","value":"double"},\
{"key":"BillingLatitude","value":"double"},{"key":"Billingtest_automation_sfgitude","value":"double"},\
{"key":"BillingGeocodeAccuracy","value":"string"},{"key":"ShippingStreet","value":"string"},\
{"key":"ShippingCity","value":"string"},{"key":"ShippingState","value":"string"},\
{"key":"ShippingPostalCode","value":"string"},{"key":"ShippingCountry","value":"string"},\
Expand All @@ -126,15 +127,21 @@ account.schema= [{"key":"Id","value":"string"},{"key":"IsDeleted","value":"boole

CreateBQTableQueryFile=BigQuery/BigQueryCreateTableQuery.txt
InsertBQDataQueryFile=BigQuery/BigQueryInsertDataQuery.txt
BigQueryCreateTableFailureQuery=BigQuery/BigQueryCreateTableFailureQuery.txt
BigQueryInsertDataFailureQuery=BigQuery/BigQueryInsertDataFailureQuery.txt

bigQueryDatatypesColumnsList=(Id,Name,Col_Timestamp__c,Col_Date__c,Col_Currency__c,Col_Email__c,Col_Number__c,\
Col_GeoLocation__Latitude__s,Col_GeoLocation__Longitude__s,Col__c,Col_Url__c,Col_Time__c,Col_Text__c)
,Col__c,Col_Url__c,Col_Time__c,Col_Text__c)

testData={"Name":"hello","Col_Timestamp__c":"2023-06-14T07:04:56.000+0000","Col_Date__c":"2023-06-14",\
"Col_Currency__c":123.456,"Col_Email__c":"[email protected]","Col_Number__c":1008.0,\
"Col_GeoLocation__Latitude__s":37.794016,"Col_GeoLocation__Longitude__s":-122.395016,"Col__c":"984746334",\
"Col_Url__c":"abc/123","Col_Time__c":"05:00:00.000Z","Col_Text__c":"shsss"}
testData={"Id__c" : "123","Name":"hello","Col_Timestamp__c":"2023-06-14T07:04:56.000+0000","Col_Date__c":"2023-06-14","Col_Currency__c":567.789,"Col_Email__c":"[email protected]","Col_Number__c":1008.0,\
"Col__c":"984746334", "Col_Url__c":"abc/123","Col_Time__c":"05:00:00.000Z","Col_Text__c":"shsss"}

testObjectData={"Name":"dummy","Col_Timestamp__c":"2023-06-14T07:04:56.000+0000","Col_Date__c":"2023-06-14",\
"Col_Currency__c":567.789,"Col_Email__c":"[email protected]","Col_Number__c":1008.0,\
"Col_Url__c":"def/123","Col_Time__c":"05:00:00.000Z","Col_Text__c":"finder","Col_Phone__c":"123567"}
#testData={"Name":"hello","Col_Timestamp__c":"2023-06-14T07:04:56.000+0000","Col_Date__c":"2023-06-14",\
"Col_Currency__c":123.456,"Col_Email__c":"[email protected]","Col_Number__c":1008.0,\
"Col__c":"984746334","Col_Url__c":"abc/123","Col_Time__c":"05:00:00.000Z","Col_Text__c":"shsss"}

url = http://10.132.0.45:3128

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this property used for?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Property removed

UpsertColumnvalue=Id__c