Skip to content

Commit

Permalink
Upsert Changes
Browse files Browse the repository at this point in the history
  • Loading branch information
anilm386 committed Aug 20, 2024
1 parent 735cdf6 commit 6f361e3
Show file tree
Hide file tree
Showing 9 changed files with 86 additions and 23 deletions.
38 changes: 38 additions & 0 deletions src/e2e-test/features/salesforcesink/RunTime.feature
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,41 @@ Feature: Salesforce Sink - Run time Scenarios
Then Verify the pipeline status is "Succeeded"
Then Close the pipeline logs
Then Validate the values of records transferred from Bigquery to Salesforce is equal

@SINK-TS-SF-RNTM-04 @BQ_SOURCE_TEST @DELETE_TEST_DATA
Scenario: Verify user should be able to see ingest the records successfully using upsert operation
When Open Datafusion Project to configure pipeline
And Select plugin: "BigQuery" from the plugins list as: "Source"
And Navigate to the properties page of plugin: "BigQuery"
Then Replace input plugin property: "project" with value: "projectId"
Then Enter input plugin property: "datasetProject" with value: "projectId"
Then Enter input plugin property: "referenceName" with value: "BQReferenceName"
Then Enter input plugin property: "dataset" with value: "dataset"
Then Enter input plugin property: "table" with value: "bqSourceTable"
Then Validate "BigQuery" plugin properties
And Close the Plugin Properties page
And Select Sink plugin: "Salesforce" from the plugins list
And Connect plugins: "BigQuery" and "Salesforce" to establish connection
And Navigate to the properties page of plugin: "Salesforce"
And fill Authentication properties for Salesforce Admin user
Then Enter input plugin property: "referenceName" with value: "ReferenceName"
And Select radio button plugin property: "operation" with value: "upsert"
Then Enter input plugin property: "externalIdField" with value: "UpsertColumnvalue"
And Enter input plugin property: "sObject" with value: "sobject.account"
And Select dropdown plugin property: "errorHandling" with option value: "Skip on error"
Then Validate "Salesforce" plugin properties
And Close the Plugin Properties page
And Save the pipeline
And Preview and run the pipeline
Then Wait till pipeline preview is in running state
Then Open and capture pipeline preview logs
Then Verify the preview run status of pipeline in the logs is "succeeded"
Then Close the pipeline logs
Then Close the preview
Then Deploy the pipeline
Then Run the Pipeline in Runtime
Then Wait till pipeline is in running state
Then Open and capture logs
Then Verify the pipeline status is "Succeeded"
Then Close the pipeline logs
Then Validate the values of records transferred from Bigquery to Salesforce is equal
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ public static void createTempSourceBQTable() throws IOException, InterruptedExce
PluginPropertyUtils.pluginProp("InsertBQDataQueryFile"));
}

@Before(order = 1, value = "@BQ_FAILURE_TEST")
public static void createTempSourceInvalidBQTable() throws IOException, InterruptedException {
createSourceBQTableWithQueries(PluginPropertyUtils.pluginProp("BigQueryCreateTableFailureQuery"),
PluginPropertyUtils.pluginProp("BigQueryInsertDataFailureQuery"));
}

@After(order = 1, value = "@BQ_TEMP_CLEANUP")
public static void deleteTemperoryCreatedBQTable() throws IOException, InterruptedException {
String bqTargetTableName = PluginPropertyUtils.pluginProp("bqTargetTable") + "_v1";
Expand Down
16 changes: 8 additions & 8 deletions src/e2e-test/java/io/cdap/plugin/utils/SalesforceClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -178,14 +178,14 @@ public static List<JsonObject> queryObject(String id, String objectName) {
public static void deletePushTopic(String pushTopicName) {
try {
PartnerConnection partnerConnection = new PartnerConnection(
Authenticator.createConnectorConfig(AuthenticatorCredentials.fromParameters(USERNAME, PASSWORD + SECURITYTOKEN,
CLIENTID, CLIENTSECRET, PluginPropertyUtils.
pluginProp("login.url"),
30000, 3600, "")));
Authenticator.createConnectorConfig(AuthenticatorCredentials.fromParameters(USERNAME, PASSWORD + SECURITYTOKEN,
CLIENTID, CLIENTSECRET, PluginPropertyUtils.
pluginProp("login.url"),
30000, 3600, "")));

QueryResult queryResult = SalesforceStreamingSourceConfig.runQuery(
partnerConnection,
String.format("SELECT Id FROM PushTopic WHERE Name = '%s'", pushTopicName)
partnerConnection,
String.format("SELECT Id FROM PushTopic WHERE Name = '%s'", pushTopicName)
);

SObject sobject = queryResult.getRecords()[0];
Expand All @@ -204,7 +204,7 @@ public static void deletePushTopic(String pushTopicName) {
} catch (ConnectionException e) {
String message = SalesforceConnectionUtil.getSalesforceErrorMessageFromException(e);
throw new InvalidStageException(
String.format("Cannot connect to Salesforce API with credentials specified due to error: %s", message), e);
String.format("Cannot connect to Salesforce API with credentials specified due to error: %s", message), e);
}
}

Expand Down Expand Up @@ -269,7 +269,7 @@ public static String queryObjectId(String objectName) {
return uniqueRecordId;
}

public static void updateObject(String id, String objectName) {
public static void updateObject(String id, String objectName) {
getAccessToken();
HttpClient httpClient = HttpClientBuilder.create().build();
String baseUri = loginInstanceUrl + REST_ENDPOINT + API_VERSION;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
create table `DATASET.TABLE_NAME` (Id STRING,Name STRING)
7 changes: 4 additions & 3 deletions src/e2e-test/resources/BigQuery/BigQueryCreateTableQuery.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
create table `DATASET.TABLE_NAME` (Name STRING, Col_Timestamp__c TIMESTAMP, Col_Date__c DATE, Col_Currency__c FLOAT64,
Col_Email__c STRING, Col_Number__c FLOAT64, Col_GeoLocation__Latitude__s FLOAT64,
Col_GeoLocation__Longitude__s FLOAT64, Col__c STRING, Col_Url__c STRING, Col_Time__c TIME, Col_Text__c STRING)
create table `DATASET.TABLE_NAME` (Id__c FLOAT64, Name STRING, Col_Timestamp__c TIMESTAMP, Col_Date__c DATE, Col_Currency__c FLOAT64,
Col_Email__c STRING, Col_Number__c FLOAT64, Col__c STRING, Col_Url__c STRING, Col_Time__c TIME, Col_Text__c STRING)


Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
insert into `DATASET.TABLE_NAME` (Id ,Name ) values
('abc','Test_PB' );

insert into `DATASET.TABLE_NAME` (Id ,Name ) values
('345','Test_PB1' );

insert into `DATASET.TABLE_NAME` (Id ,Name ) values
('768','Test_PB23' );
6 changes: 3 additions & 3 deletions src/e2e-test/resources/BigQuery/BigQueryInsertDataQuery.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
insert into `DATASET.TABLE_NAME` (Name, Col_Timestamp__c, Col_Date__c, Col_Currency__c, Col_Email__c, Col_Number__c,
Col_GeoLocation__Latitude__s, Col_GeoLocation__Longitude__s, Col__c, Col_Url__c, Col_Time__c, Col_Text__c) values
('adam','2019-03-10 04:50:01 UTC','2021-01-28',61.823765812,'[email protected]',898365444,37.794116,-122.3432,
insert into `DATASET.TABLE_NAME` (Id__c, Name, Col_Timestamp__c, Col_Date__c, Col_Currency__c, Col_Email__c, Col_Number__c,
Col__c, Col_Url__c, Col_Time__c, Col_Text__c) values
(786777,'adam','2019-03-10 04:50:01 UTC','2021-01-28',61.823765812,'[email protected]',-122.3432,
'984746334','abc/123','20:26:34','find');

2 changes: 2 additions & 0 deletions src/e2e-test/resources/errorMessage.properties
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,5 @@ required.property.topicname=Required property 'pushTopicName' has no value.

#Validation messages
validationSuccessMessage=No errors found.

errorMessagerecordprocessed= Total ///
25 changes: 16 additions & 9 deletions src/e2e-test/resources/pluginParameters.properties
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ invalid.admin.consumer.secret=lmnop891011
#SOQL Query
simple.query=SELECT Id, Name, Phone FROM Account
test.query=SELECT Id,Name,Col_Timestamp__c,Col_Date__c,Col_Currency__c,Col_Email__c,Col_Number__c,\
Col_GeoLocation__Latitude__s,Col_GeoLocation__Longitude__s,Col__c,Col_Url__c,Col_Time__c,Col_Text__c FROM Automation_custom__c
Col__c,Col_Url__c,Col_Time__c,Col_Text__c FROM Automation_custom__c
where.query=SELECT name FROM Opportunity WHERE StageName='Needs Analysis'
groupby.query=SELECT CampaignId, AVG(Amount) FROM Opportunity GROUP BY CampaignId
childtoparent.query=SELECT Id, Name, Account.Name FROM Contact WHERE Account.Industry = 'Chemicals'
Expand All @@ -29,8 +29,9 @@ opportunity.query=SELECT Id,Name,StageName,AccountId,LastViewedDate,LastReferenc
AccountId='0015j00000kN78QAAS'

sobject.Automation_custom_c=Automation_custom__c

#SObjects
sobject.account=Account
sobject.account=Automation_custom__c
Salesforce.sobjectName=Opportunity
sobject.lead=Lead
sobject.invalid=abc
Expand All @@ -50,7 +51,7 @@ hundred.million.records=100000000
topic.name=StreamingSalesforce
#Macros
sfmultisource.listofsobjects=Lead,Opportunity
sfmultisource.listofsobject=Automation_custom__c,Automation_custom2__c
sfmultisource.listofsobject=Automation_custom__c
sfmultisource.sobject.lead=Lead
sfmultisource.sobject.customobject=Automation_custom__c
sfmultisource.listofsobjectsforblacklist=Account,Contact
Expand Down Expand Up @@ -107,7 +108,7 @@ account.schema= [{"key":"Id","value":"string"},{"key":"IsDeleted","value":"boole
{"key":"ParentId","value":"string"},{"key":"BillingStreet","value":"string"},{"key":"BillingCity","value":"string"},\
{"key":"BillingState","value":"string"},{"key":"BillingPostalCode","value":"string"},\
{"key":"BillingCountry","value":"string"},\
{"key":"BillingLatitude","value":"double"},{"key":"BillingLongitude","value":"double"},\
{"key":"BillingLatitude","value":"double"},{"key":"Billingtest_automation_sfgitude","value":"double"},\
{"key":"BillingGeocodeAccuracy","value":"string"},{"key":"ShippingStreet","value":"string"},\
{"key":"ShippingCity","value":"string"},{"key":"ShippingState","value":"string"},\
{"key":"ShippingPostalCode","value":"string"},{"key":"ShippingCountry","value":"string"},\
Expand All @@ -126,15 +127,21 @@ account.schema= [{"key":"Id","value":"string"},{"key":"IsDeleted","value":"boole

CreateBQTableQueryFile=BigQuery/BigQueryCreateTableQuery.txt
InsertBQDataQueryFile=BigQuery/BigQueryInsertDataQuery.txt
BigQueryCreateTableFailureQuery=BigQuery/BigQueryCreateTableFailureQuery.txt
BigQueryInsertDataFailureQuery=BigQuery/BigQueryInsertDataFailureQuery.txt

bigQueryDatatypesColumnsList=(Id,Name,Col_Timestamp__c,Col_Date__c,Col_Currency__c,Col_Email__c,Col_Number__c,\
Col_GeoLocation__Latitude__s,Col_GeoLocation__Longitude__s,Col__c,Col_Url__c,Col_Time__c,Col_Text__c)
,Col__c,Col_Url__c,Col_Time__c,Col_Text__c)

testData={"Name":"hello","Col_Timestamp__c":"2023-06-14T07:04:56.000+0000","Col_Date__c":"2023-06-14",\
"Col_Currency__c":123.456,"Col_Email__c":"[email protected]","Col_Number__c":1008.0,\
"Col_GeoLocation__Latitude__s":37.794016,"Col_GeoLocation__Longitude__s":-122.395016,"Col__c":"984746334",\
"Col_Url__c":"abc/123","Col_Time__c":"05:00:00.000Z","Col_Text__c":"shsss"}
testData={"Id__c" : "123","Name":"hello","Col_Timestamp__c":"2023-06-14T07:04:56.000+0000","Col_Date__c":"2023-06-14","Col_Currency__c":567.789,"Col_Email__c":"[email protected]","Col_Number__c":1008.0,\
"Col__c":"984746334", "Col_Url__c":"abc/123","Col_Time__c":"05:00:00.000Z","Col_Text__c":"shsss"}

testObjectData={"Name":"dummy","Col_Timestamp__c":"2023-06-14T07:04:56.000+0000","Col_Date__c":"2023-06-14",\
"Col_Currency__c":567.789,"Col_Email__c":"[email protected]","Col_Number__c":1008.0,\
"Col_Url__c":"def/123","Col_Time__c":"05:00:00.000Z","Col_Text__c":"finder","Col_Phone__c":"123567"}
#testData={"Name":"hello","Col_Timestamp__c":"2023-06-14T07:04:56.000+0000","Col_Date__c":"2023-06-14",\
"Col_Currency__c":123.456,"Col_Email__c":"[email protected]","Col_Number__c":1008.0,\
"Col__c":"984746334","Col_Url__c":"abc/123","Col_Time__c":"05:00:00.000Z","Col_Text__c":"shsss"}

url = http://10.132.0.45:3128
UpsertColumnvalue=Id__c

0 comments on commit 6f361e3

Please sign in to comment.