From 6f361e37f1093fb3492a23289079e9352d074931 Mon Sep 17 00:00:00 2001 From: anilmahajan Date: Thu, 2 May 2024 11:14:11 +0530 Subject: [PATCH 1/2] Upsert Changes --- .../features/salesforcesink/RunTime.feature | 38 +++++++++++++++++++ .../plugin/tests/hooks/TestSetupHooks.java | 6 +++ .../cdap/plugin/utils/SalesforceClient.java | 16 ++++---- .../BigQueryCreateTableFailureQuery.txt | 1 + .../BigQuery/BigQueryCreateTableQuery.txt | 7 ++-- .../BigQueryInsertDataFailureQuery.txt | 8 ++++ .../BigQuery/BigQueryInsertDataQuery.txt | 6 +-- .../resources/errorMessage.properties | 2 + .../resources/pluginParameters.properties | 25 +++++++----- 9 files changed, 86 insertions(+), 23 deletions(-) create mode 100644 src/e2e-test/resources/BigQuery/BigQueryCreateTableFailureQuery.txt create mode 100644 src/e2e-test/resources/BigQuery/BigQueryInsertDataFailureQuery.txt diff --git a/src/e2e-test/features/salesforcesink/RunTime.feature b/src/e2e-test/features/salesforcesink/RunTime.feature index 90fc4938..0f32d215 100644 --- a/src/e2e-test/features/salesforcesink/RunTime.feature +++ b/src/e2e-test/features/salesforcesink/RunTime.feature @@ -132,3 +132,41 @@ Feature: Salesforce Sink - Run time Scenarios Then Verify the pipeline status is "Succeeded" Then Close the pipeline logs Then Validate the values of records transferred from Bigquery to Salesforce is equal + + @SINK-TS-SF-RNTM-04 @BQ_SOURCE_TEST @DELETE_TEST_DATA + Scenario: Verify user should be able to see ingest the records successfully using upsert operation + When Open Datafusion Project to configure pipeline + And Select plugin: "BigQuery" from the plugins list as: "Source" + And Navigate to the properties page of plugin: "BigQuery" + Then Replace input plugin property: "project" with value: "projectId" + Then Enter input plugin property: "datasetProject" with value: "projectId" + Then Enter input plugin property: "referenceName" with value: "BQReferenceName" + Then Enter input plugin property: "dataset" with value: "dataset" + Then Enter input plugin property: "table" with value: "bqSourceTable" + Then Validate "BigQuery" plugin properties + And Close the Plugin Properties page + And Select Sink plugin: "Salesforce" from the plugins list + And Connect plugins: "BigQuery" and "Salesforce" to establish connection + And Navigate to the properties page of plugin: "Salesforce" + And fill Authentication properties for Salesforce Admin user + Then Enter input plugin property: "referenceName" with value: "ReferenceName" + And Select radio button plugin property: "operation" with value: "upsert" + Then Enter input plugin property: "externalIdField" with value: "UpsertColumnvalue" + And Enter input plugin property: "sObject" with value: "sobject.account" + And Select dropdown plugin property: "errorHandling" with option value: "Skip on error" + Then Validate "Salesforce" plugin properties + And Close the Plugin Properties page + And Save the pipeline + And Preview and run the pipeline + Then Wait till pipeline preview is in running state + Then Open and capture pipeline preview logs + Then Verify the preview run status of pipeline in the logs is "succeeded" + Then Close the pipeline logs + Then Close the preview + Then Deploy the pipeline + Then Run the Pipeline in Runtime + Then Wait till pipeline is in running state + Then Open and capture logs + Then Verify the pipeline status is "Succeeded" + Then Close the pipeline logs + Then Validate the values of records transferred from Bigquery to Salesforce is equal diff --git a/src/e2e-test/java/io/cdap/plugin/tests/hooks/TestSetupHooks.java b/src/e2e-test/java/io/cdap/plugin/tests/hooks/TestSetupHooks.java index 69064110..906a441c 100644 --- a/src/e2e-test/java/io/cdap/plugin/tests/hooks/TestSetupHooks.java +++ b/src/e2e-test/java/io/cdap/plugin/tests/hooks/TestSetupHooks.java @@ -84,6 +84,12 @@ public static void createTempSourceBQTable() throws IOException, InterruptedExce PluginPropertyUtils.pluginProp("InsertBQDataQueryFile")); } + @Before(order = 1, value = "@BQ_FAILURE_TEST") + public static void createTempSourceInvalidBQTable() throws IOException, InterruptedException { + createSourceBQTableWithQueries(PluginPropertyUtils.pluginProp("BigQueryCreateTableFailureQuery"), + PluginPropertyUtils.pluginProp("BigQueryInsertDataFailureQuery")); + } + @After(order = 1, value = "@BQ_TEMP_CLEANUP") public static void deleteTemperoryCreatedBQTable() throws IOException, InterruptedException { String bqTargetTableName = PluginPropertyUtils.pluginProp("bqTargetTable") + "_v1"; diff --git a/src/e2e-test/java/io/cdap/plugin/utils/SalesforceClient.java b/src/e2e-test/java/io/cdap/plugin/utils/SalesforceClient.java index 466b714b..889150ad 100644 --- a/src/e2e-test/java/io/cdap/plugin/utils/SalesforceClient.java +++ b/src/e2e-test/java/io/cdap/plugin/utils/SalesforceClient.java @@ -178,14 +178,14 @@ public static List queryObject(String id, String objectName) { public static void deletePushTopic(String pushTopicName) { try { PartnerConnection partnerConnection = new PartnerConnection( - Authenticator.createConnectorConfig(AuthenticatorCredentials.fromParameters(USERNAME, PASSWORD + SECURITYTOKEN, - CLIENTID, CLIENTSECRET, PluginPropertyUtils. - pluginProp("login.url"), - 30000, 3600, ""))); + Authenticator.createConnectorConfig(AuthenticatorCredentials.fromParameters(USERNAME, PASSWORD + SECURITYTOKEN, + CLIENTID, CLIENTSECRET, PluginPropertyUtils. + pluginProp("login.url"), + 30000, 3600, ""))); QueryResult queryResult = SalesforceStreamingSourceConfig.runQuery( - partnerConnection, - String.format("SELECT Id FROM PushTopic WHERE Name = '%s'", pushTopicName) + partnerConnection, + String.format("SELECT Id FROM PushTopic WHERE Name = '%s'", pushTopicName) ); SObject sobject = queryResult.getRecords()[0]; @@ -204,7 +204,7 @@ public static void deletePushTopic(String pushTopicName) { } catch (ConnectionException e) { String message = SalesforceConnectionUtil.getSalesforceErrorMessageFromException(e); throw new InvalidStageException( - String.format("Cannot connect to Salesforce API with credentials specified due to error: %s", message), e); + String.format("Cannot connect to Salesforce API with credentials specified due to error: %s", message), e); } } @@ -269,7 +269,7 @@ public static String queryObjectId(String objectName) { return uniqueRecordId; } - public static void updateObject(String id, String objectName) { + public static void updateObject(String id, String objectName) { getAccessToken(); HttpClient httpClient = HttpClientBuilder.create().build(); String baseUri = loginInstanceUrl + REST_ENDPOINT + API_VERSION; diff --git a/src/e2e-test/resources/BigQuery/BigQueryCreateTableFailureQuery.txt b/src/e2e-test/resources/BigQuery/BigQueryCreateTableFailureQuery.txt new file mode 100644 index 00000000..65f51b11 --- /dev/null +++ b/src/e2e-test/resources/BigQuery/BigQueryCreateTableFailureQuery.txt @@ -0,0 +1 @@ +create table `DATASET.TABLE_NAME` (Id STRING,Name STRING) \ No newline at end of file diff --git a/src/e2e-test/resources/BigQuery/BigQueryCreateTableQuery.txt b/src/e2e-test/resources/BigQuery/BigQueryCreateTableQuery.txt index 17d217bd..33c636f0 100644 --- a/src/e2e-test/resources/BigQuery/BigQueryCreateTableQuery.txt +++ b/src/e2e-test/resources/BigQuery/BigQueryCreateTableQuery.txt @@ -1,3 +1,4 @@ -create table `DATASET.TABLE_NAME` (Name STRING, Col_Timestamp__c TIMESTAMP, Col_Date__c DATE, Col_Currency__c FLOAT64, - Col_Email__c STRING, Col_Number__c FLOAT64, Col_GeoLocation__Latitude__s FLOAT64, -Col_GeoLocation__Longitude__s FLOAT64, Col__c STRING, Col_Url__c STRING, Col_Time__c TIME, Col_Text__c STRING) +create table `DATASET.TABLE_NAME` (Id__c FLOAT64, Name STRING, Col_Timestamp__c TIMESTAMP, Col_Date__c DATE, Col_Currency__c FLOAT64, + Col_Email__c STRING, Col_Number__c FLOAT64, Col__c STRING, Col_Url__c STRING, Col_Time__c TIME, Col_Text__c STRING) + + diff --git a/src/e2e-test/resources/BigQuery/BigQueryInsertDataFailureQuery.txt b/src/e2e-test/resources/BigQuery/BigQueryInsertDataFailureQuery.txt new file mode 100644 index 00000000..0e8b3573 --- /dev/null +++ b/src/e2e-test/resources/BigQuery/BigQueryInsertDataFailureQuery.txt @@ -0,0 +1,8 @@ +insert into `DATASET.TABLE_NAME` (Id ,Name ) values +('abc','Test_PB' ); + +insert into `DATASET.TABLE_NAME` (Id ,Name ) values +('345','Test_PB1' ); + +insert into `DATASET.TABLE_NAME` (Id ,Name ) values +('768','Test_PB23' ); \ No newline at end of file diff --git a/src/e2e-test/resources/BigQuery/BigQueryInsertDataQuery.txt b/src/e2e-test/resources/BigQuery/BigQueryInsertDataQuery.txt index 6dd90956..7f7b62c5 100644 --- a/src/e2e-test/resources/BigQuery/BigQueryInsertDataQuery.txt +++ b/src/e2e-test/resources/BigQuery/BigQueryInsertDataQuery.txt @@ -1,5 +1,5 @@ -insert into `DATASET.TABLE_NAME` (Name, Col_Timestamp__c, Col_Date__c, Col_Currency__c, Col_Email__c, Col_Number__c, -Col_GeoLocation__Latitude__s, Col_GeoLocation__Longitude__s, Col__c, Col_Url__c, Col_Time__c, Col_Text__c) values -('adam','2019-03-10 04:50:01 UTC','2021-01-28',61.823765812,'skfdsfds@gmail.com',898365444,37.794116,-122.3432, +insert into `DATASET.TABLE_NAME` (Id__c, Name, Col_Timestamp__c, Col_Date__c, Col_Currency__c, Col_Email__c, Col_Number__c, + Col__c, Col_Url__c, Col_Time__c, Col_Text__c) values +(786777,'adam','2019-03-10 04:50:01 UTC','2021-01-28',61.823765812,'skfdsfds@gmail.com',-122.3432, '984746334','abc/123','20:26:34','find'); diff --git a/src/e2e-test/resources/errorMessage.properties b/src/e2e-test/resources/errorMessage.properties index 82e1fcbe..939b65e0 100644 --- a/src/e2e-test/resources/errorMessage.properties +++ b/src/e2e-test/resources/errorMessage.properties @@ -28,3 +28,5 @@ required.property.topicname=Required property 'pushTopicName' has no value. #Validation messages validationSuccessMessage=No errors found. + +errorMessagerecordprocessed= Total /// diff --git a/src/e2e-test/resources/pluginParameters.properties b/src/e2e-test/resources/pluginParameters.properties index 625132d1..6bd45a5a 100644 --- a/src/e2e-test/resources/pluginParameters.properties +++ b/src/e2e-test/resources/pluginParameters.properties @@ -18,7 +18,7 @@ invalid.admin.consumer.secret=lmnop891011 #SOQL Query simple.query=SELECT Id, Name, Phone FROM Account test.query=SELECT Id,Name,Col_Timestamp__c,Col_Date__c,Col_Currency__c,Col_Email__c,Col_Number__c,\ - Col_GeoLocation__Latitude__s,Col_GeoLocation__Longitude__s,Col__c,Col_Url__c,Col_Time__c,Col_Text__c FROM Automation_custom__c + Col__c,Col_Url__c,Col_Time__c,Col_Text__c FROM Automation_custom__c where.query=SELECT name FROM Opportunity WHERE StageName='Needs Analysis' groupby.query=SELECT CampaignId, AVG(Amount) FROM Opportunity GROUP BY CampaignId childtoparent.query=SELECT Id, Name, Account.Name FROM Contact WHERE Account.Industry = 'Chemicals' @@ -29,8 +29,9 @@ opportunity.query=SELECT Id,Name,StageName,AccountId,LastViewedDate,LastReferenc AccountId='0015j00000kN78QAAS' sobject.Automation_custom_c=Automation_custom__c + #SObjects -sobject.account=Account +sobject.account=Automation_custom__c Salesforce.sobjectName=Opportunity sobject.lead=Lead sobject.invalid=abc @@ -50,7 +51,7 @@ hundred.million.records=100000000 topic.name=StreamingSalesforce #Macros sfmultisource.listofsobjects=Lead,Opportunity -sfmultisource.listofsobject=Automation_custom__c,Automation_custom2__c +sfmultisource.listofsobject=Automation_custom__c sfmultisource.sobject.lead=Lead sfmultisource.sobject.customobject=Automation_custom__c sfmultisource.listofsobjectsforblacklist=Account,Contact @@ -107,7 +108,7 @@ account.schema= [{"key":"Id","value":"string"},{"key":"IsDeleted","value":"boole {"key":"ParentId","value":"string"},{"key":"BillingStreet","value":"string"},{"key":"BillingCity","value":"string"},\ {"key":"BillingState","value":"string"},{"key":"BillingPostalCode","value":"string"},\ {"key":"BillingCountry","value":"string"},\ - {"key":"BillingLatitude","value":"double"},{"key":"BillingLongitude","value":"double"},\ + {"key":"BillingLatitude","value":"double"},{"key":"Billingtest_automation_sfgitude","value":"double"},\ {"key":"BillingGeocodeAccuracy","value":"string"},{"key":"ShippingStreet","value":"string"},\ {"key":"ShippingCity","value":"string"},{"key":"ShippingState","value":"string"},\ {"key":"ShippingPostalCode","value":"string"},{"key":"ShippingCountry","value":"string"},\ @@ -126,15 +127,21 @@ account.schema= [{"key":"Id","value":"string"},{"key":"IsDeleted","value":"boole CreateBQTableQueryFile=BigQuery/BigQueryCreateTableQuery.txt InsertBQDataQueryFile=BigQuery/BigQueryInsertDataQuery.txt +BigQueryCreateTableFailureQuery=BigQuery/BigQueryCreateTableFailureQuery.txt +BigQueryInsertDataFailureQuery=BigQuery/BigQueryInsertDataFailureQuery.txt bigQueryDatatypesColumnsList=(Id,Name,Col_Timestamp__c,Col_Date__c,Col_Currency__c,Col_Email__c,Col_Number__c,\ - Col_GeoLocation__Latitude__s,Col_GeoLocation__Longitude__s,Col__c,Col_Url__c,Col_Time__c,Col_Text__c) + ,Col__c,Col_Url__c,Col_Time__c,Col_Text__c) -testData={"Name":"hello","Col_Timestamp__c":"2023-06-14T07:04:56.000+0000","Col_Date__c":"2023-06-14",\ - "Col_Currency__c":123.456,"Col_Email__c":"hello443@gmail.com","Col_Number__c":1008.0,\ - "Col_GeoLocation__Latitude__s":37.794016,"Col_GeoLocation__Longitude__s":-122.395016,"Col__c":"984746334",\ - "Col_Url__c":"abc/123","Col_Time__c":"05:00:00.000Z","Col_Text__c":"shsss"} +testData={"Id__c" : "123","Name":"hello","Col_Timestamp__c":"2023-06-14T07:04:56.000+0000","Col_Date__c":"2023-06-14","Col_Currency__c":567.789,"Col_Email__c":"dummy443@gmail.com","Col_Number__c":1008.0,\ + "Col__c":"984746334", "Col_Url__c":"abc/123","Col_Time__c":"05:00:00.000Z","Col_Text__c":"shsss"} testObjectData={"Name":"dummy","Col_Timestamp__c":"2023-06-14T07:04:56.000+0000","Col_Date__c":"2023-06-14",\ "Col_Currency__c":567.789,"Col_Email__c":"dummy443@gmail.com","Col_Number__c":1008.0,\ "Col_Url__c":"def/123","Col_Time__c":"05:00:00.000Z","Col_Text__c":"finder","Col_Phone__c":"123567"} +#testData={"Name":"hello","Col_Timestamp__c":"2023-06-14T07:04:56.000+0000","Col_Date__c":"2023-06-14",\ + "Col_Currency__c":123.456,"Col_Email__c":"hello443@gmail.com","Col_Number__c":1008.0,\ + "Col__c":"984746334","Col_Url__c":"abc/123","Col_Time__c":"05:00:00.000Z","Col_Text__c":"shsss"} + +url = http://10.132.0.45:3128 +UpsertColumnvalue=Id__c From aa01eb64509328c635c00e7a44c0553a60fdc41b Mon Sep 17 00:00:00 2001 From: anilmahajan Date: Thu, 3 Oct 2024 14:37:59 +0530 Subject: [PATCH 2/2] Addressed comments --- .../java/io/cdap/plugin/tests/hooks/TestSetupHooks.java | 6 ------ src/e2e-test/resources/pluginParameters.properties | 2 -- 2 files changed, 8 deletions(-) diff --git a/src/e2e-test/java/io/cdap/plugin/tests/hooks/TestSetupHooks.java b/src/e2e-test/java/io/cdap/plugin/tests/hooks/TestSetupHooks.java index 906a441c..69064110 100644 --- a/src/e2e-test/java/io/cdap/plugin/tests/hooks/TestSetupHooks.java +++ b/src/e2e-test/java/io/cdap/plugin/tests/hooks/TestSetupHooks.java @@ -84,12 +84,6 @@ public static void createTempSourceBQTable() throws IOException, InterruptedExce PluginPropertyUtils.pluginProp("InsertBQDataQueryFile")); } - @Before(order = 1, value = "@BQ_FAILURE_TEST") - public static void createTempSourceInvalidBQTable() throws IOException, InterruptedException { - createSourceBQTableWithQueries(PluginPropertyUtils.pluginProp("BigQueryCreateTableFailureQuery"), - PluginPropertyUtils.pluginProp("BigQueryInsertDataFailureQuery")); - } - @After(order = 1, value = "@BQ_TEMP_CLEANUP") public static void deleteTemperoryCreatedBQTable() throws IOException, InterruptedException { String bqTargetTableName = PluginPropertyUtils.pluginProp("bqTargetTable") + "_v1"; diff --git a/src/e2e-test/resources/pluginParameters.properties b/src/e2e-test/resources/pluginParameters.properties index 6bd45a5a..9eb21cf0 100644 --- a/src/e2e-test/resources/pluginParameters.properties +++ b/src/e2e-test/resources/pluginParameters.properties @@ -142,6 +142,4 @@ testObjectData={"Name":"dummy","Col_Timestamp__c":"2023-06-14T07:04:56.000+0000" #testData={"Name":"hello","Col_Timestamp__c":"2023-06-14T07:04:56.000+0000","Col_Date__c":"2023-06-14",\ "Col_Currency__c":123.456,"Col_Email__c":"hello443@gmail.com","Col_Number__c":1008.0,\ "Col__c":"984746334","Col_Url__c":"abc/123","Col_Time__c":"05:00:00.000Z","Col_Text__c":"shsss"} - -url = http://10.132.0.45:3128 UpsertColumnvalue=Id__c