diff --git a/src/e2e-test/features/bigquery/sink/BigQueryToBigQueryAdditional.feature b/src/e2e-test/features/bigquery/sink/BigQueryToBigQueryAdditional.feature new file mode 100644 index 0000000000..3328fff842 --- /dev/null +++ b/src/e2e-test/features/bigquery/sink/BigQueryToBigQueryAdditional.feature @@ -0,0 +1,502 @@ +# Copyright © 2023 Cask Data, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +@BigQuery_Sink +Feature: BigQuery sink - Verification of BigQuery to BigQuery successful data transfer + +@BQ_UPSERT_SOURCE_TEST @BQ_UPSERT_SINK_TEST @EXISTING_BQ_CONNECTION +Scenario: Validate successful record transfer from BigQuery source to BigQuery sink with Upsert operation, updating the destination table schema, when the destination table already exists with records in it +Given Open Datafusion Project to configure pipeline +When Expand Plugin group in the LHS plugins list: "Source" +When Select plugin: "BigQuery" from the plugins list as: "Source" +When Expand Plugin group in the LHS plugins list: "Sink" +When Select plugin: "BigQuery" from the plugins list as: "Sink" +Then Connect plugins: "BigQuery" and "BigQuery2" to establish connection +Then Navigate to the properties page of plugin: "BigQuery" +Then Click plugin property: "switch-useConnection" +Then Click on the Browse Connections button +Then Select connection: "bqConnectionName" +Then Click on the Browse button inside plugin properties +Then Select connection data row with name: "dataset" +Then Select connection data row with name: "bqSourceTable" +Then Wait till connection data loading completes with a timeout of 60 seconds +Then Verify input plugin property: "dataset" contains value: "dataset" +Then Verify input plugin property: "table" contains value: "bqSourceTable" +Then Click on the Get Schema button +Then Validate "BigQuery" plugin properties +And Close the Plugin Properties page +Then Navigate to the properties page of plugin: "BigQuery2" +Then Click plugin property: "useConnection" +Then Click on the Browse Connections button +Then Select connection: "bqConnectionName" +Then Enter input plugin property: "referenceName" with value: "BQSinkReferenceName" +Then Click on the Browse button inside plugin properties +Then Click SELECT button inside connection data row with name: "dataset" +Then Wait till connection data loading completes with a timeout of 60 seconds +Then Verify input plugin property: "dataset" contains value: "dataset" +Then Enter input plugin property: "table" with value: "bqTargetTable" +And Select radio button plugin property: "operation" with value: "upsert" +Then Click on the Add Button of the property: "relationTableKey" with value: +| TableKeyUpsert | +Then Click plugin property: "updateTableSchema" +Then Validate "BigQuery" plugin properties +And Close the Plugin Properties page +Then Save the pipeline +Then Preview and run the pipeline +Then Wait till pipeline preview is in running state +Then Open and capture pipeline preview logs +Then Verify the preview run status of pipeline in the logs is "succeeded" +Then 
Close the pipeline logs +Then Close the preview +Then Deploy the pipeline +Then Run the Pipeline in Runtime +Then Wait till pipeline is in running state +Then Open and capture logs +Then Close the pipeline logs +Then Verify the pipeline status is "Succeeded" +Then Validate the data transferred from BigQuery to BigQuery with actual And expected file for: "bqUpsertExpectedFile" + +@BQ_NULL_MODE_SOURCE_TEST @BQ_SINK_TEST @EXISTING_BQ_CONNECTION +Scenario: Validate successful record transfer from BigQuery source plugin to BigQuery sink plugin having all null values in one column and a few null values in another column in the source table +Given Open Datafusion Project to configure pipeline +When Expand Plugin group in the LHS plugins list: "Source" +When Select plugin: "BigQuery" from the plugins list as: "Source" +When Expand Plugin group in the LHS plugins list: "Sink" +When Select plugin: "BigQuery" from the plugins list as: "Sink" +Then Connect plugins: "BigQuery" and "BigQuery2" to establish connection +Then Navigate to the properties page of plugin: "BigQuery" +Then Click plugin property: "switch-useConnection" +Then Click on the Browse Connections button +Then Select connection: "bqConnectionName" +Then Click on the Browse button inside plugin properties +Then Select connection data row with name: "dataset" +Then Select connection data row with name: "bqSourceTable" +Then Wait till connection data loading completes with a timeout of 60 seconds +Then Verify input plugin property: "dataset" contains value: "dataset" +Then Verify input plugin property: "table" contains value: "bqSourceTable" +Then Click on the Get Schema button +Then Validate "BigQuery" plugin properties +And Close the Plugin Properties page +Then Navigate to the properties page of plugin: "BigQuery2" +Then Click plugin property: "useConnection" +Then Click on the Browse Connections button +Then Select connection: "bqConnectionName" +Then Enter input plugin property: "referenceName" with value: "BQSinkReferenceName" +Then Click on the Browse button inside plugin properties +Then Click SELECT button inside connection data row with name: "dataset" +Then Wait till connection data loading completes with a timeout of 60 seconds +Then Verify input plugin property: "dataset" contains value: "dataset" +Then Enter input plugin property: "table" with value: "bqTargetTable" +Then Validate "BigQuery" plugin properties +Then Close the BigQuery properties +Then Save the pipeline +Then Preview and run the pipeline +Then Wait till pipeline preview is in running state +Then Open and capture pipeline preview logs +Then Verify the preview run status of pipeline in the logs is "succeeded" +Then Close the pipeline logs +Then Close the preview +Then Deploy the pipeline +Then Run the Pipeline in Runtime +Then Wait till pipeline is in running state +Then Open and capture logs +Then Close the pipeline logs +Then Verify the pipeline status is "Succeeded" +Then Validate the values of records transferred to BQ sink is equal to the values from source BigQuery table + +@BQ_UPDATE_SOURCE_DEDUPE_TEST @BQ_UPDATE_SINK_DEDUPE_TEST @EXISTING_BQ_CONNECTION +Scenario: Verify successful record transfer from BigQuery source to BigQuery sink using the advanced Update operation with the Dedupe By property 
+Given Open Datafusion Project to configure pipeline +When Expand Plugin group in the LHS plugins list: "Source" +When Select plugin: "BigQuery" from the plugins list as: "Source" +When Expand Plugin group in the LHS plugins list: "Sink" +When Select plugin: "BigQuery" from the plugins list as: "Sink" +Then Connect plugins: "BigQuery" and "BigQuery2" to establish connection +Then Navigate to the properties page of plugin: "BigQuery" +Then Click plugin property: "switch-useConnection" +Then Click on the Browse Connections button +Then Select connection: "bqConnectionName" +Then Click on the Browse button inside plugin properties +Then Select connection data row with name: "dataset" +Then Select connection data row with name: "bqSourceTable" +Then Wait till connection data loading completes with a timeout of 60 seconds +Then Verify input plugin property: "dataset" contains value: "dataset" +Then Verify input plugin property: "table" contains value: "bqSourceTable" +Then Click on the Get Schema button +Then Validate "BigQuery" plugin properties +And Close the Plugin Properties page +Then Navigate to the properties page of plugin: "BigQuery2" +Then Click plugin property: "useConnection" +Then Click on the Browse Connections button +Then Select connection: "bqConnectionName" +Then Enter input plugin property: "referenceName" with value: "BQSinkReferenceName" +Then Click on the Browse button inside plugin properties +Then Click SELECT button inside connection data row with name: "dataset" +Then Wait till connection data loading completes with a timeout of 60 seconds +Then Verify input plugin property: "dataset" contains value: "dataset" +Then Enter input plugin property: "table" with value: "bqTargetTable" +And Select radio button plugin property: "operation" with value: "update" +Then Enter Value for plugin property table key : "relationTableKey" with values: "relationTableKeyValue" +Then Select dropdown plugin property: "dedupeBy" with option value: "dedupeByOrder" +Then Enter key for plugin property: "dedupeBy" with values: "dedupeByValue" +Then Validate "BigQuery" plugin properties +Then Close the BigQuery properties +Then Save the pipeline +Then Preview and run the pipeline +Then Wait till pipeline preview is in running state +Then Open and capture pipeline preview logs +Then Verify the preview run status of pipeline in the logs is "succeeded" +Then Close the pipeline logs +Then Close the preview +Then Deploy the pipeline +Then Run the Pipeline in Runtime +Then Wait till pipeline is in running state +Then Open and capture logs +Then Close the pipeline logs +Then Verify the pipeline status is "Succeeded" +Then Validate the data transferred from BigQuery to BigQuery with actual And expected file for: "bqUpdateDedupeExpectedFile" + +@BQ_INSERT_INT_SOURCE_TEST @BQ_EXISTING_SINK_TEST @EXISTING_BQ_CONNECTION +Scenario: Verify successful record transfer for the Insert operation with partition type Integer and destination table is existing already. 
+Given Open Datafusion Project to configure pipeline +When Expand Plugin group in the LHS plugins list: "Source" +When Select plugin: "BigQuery" from the plugins list as: "Source" +When Expand Plugin group in the LHS plugins list: "Sink" +When Select plugin: "BigQuery" from the plugins list as: "Sink" +Then Connect plugins: "BigQuery" and "BigQuery2" to establish connection +Then Navigate to the properties page of plugin: "BigQuery" +Then Click plugin property: "switch-useConnection" +Then Click on the Browse Connections button +Then Select connection: "bqConnectionName" +Then Click on the Browse button inside plugin properties +Then Select connection data row with name: "dataset" +Then Select connection data row with name: "bqSourceTable" +Then Wait till connection data loading completes with a timeout of 60 seconds +Then Verify input plugin property: "dataset" contains value: "dataset" +Then Verify input plugin property: "table" contains value: "bqSourceTable" +Then Click on the Get Schema button +Then Validate "BigQuery" plugin properties +And Close the Plugin Properties page +Then Navigate to the properties page of plugin: "BigQuery2" +Then Click plugin property: "switch-useConnection" +Then Click on the Browse Connections button +Then Select connection: "bqConnectionName" +Then Click on the Browse button inside plugin properties +Then Select connection data row with name: "dataset" +Then Select connection data row with name: "bqTargetTable" +Then Wait till connection data loading completes with a timeout of 60 seconds +Then Verify input plugin property: "dataset" contains value: "dataset" +Then Verify input plugin property: "table" contains value: "bqTargetTable" +Then Select BigQuery sink property partitioning type as "INTEGER" +Then Enter input plugin property: "rangeStart" with value: "rangeStartValue" +Then Enter input plugin property: "rangeEnd" with value: "rangeEndValue" +Then Enter input plugin property: "rangeInterval" with value: "rangeIntervalValue" +Then Enter input plugin property: "partitionByField" with value: "partitionByFieldValue" +Then Validate "BigQuery" plugin properties +Then Close the BigQuery properties +Then Save the pipeline +Then Preview and run the pipeline +Then Wait till pipeline preview is in running state +Then Open and capture pipeline preview logs +Then Verify the preview run status of pipeline in the logs is "succeeded" +Then Close the pipeline logs +Then Close the preview +Then Deploy the pipeline +Then Run the Pipeline in Runtime +Then Wait till pipeline is in running state +Then Open and capture logs +Then Close the pipeline logs +Then Verify the pipeline status is "Succeeded" +Then Validate the data transferred from BigQuery to BigQuery with actual And expected file for: "bqInsertExpectedFile" + +@BQ_TIME_SOURCE_TEST @BQ_SINK_TEST @EXISTING_BQ_CONNECTION +Scenario: Verify successful record transfer for the Insert operation from BigQuery source plugin to BigQuery sink with partition type Time and partition field is date. 
+Given Open Datafusion Project to configure pipeline +When Expand Plugin group in the LHS plugins list: "Source" +When Select plugin: "BigQuery" from the plugins list as: "Source" +When Expand Plugin group in the LHS plugins list: "Sink" +When Select plugin: "BigQuery" from the plugins list as: "Sink" +Then Connect plugins: "BigQuery" and "BigQuery2" to establish connection +Then Navigate to the properties page of plugin: "BigQuery" +Then Click plugin property: "switch-useConnection" +Then Click on the Browse Connections button +Then Select connection: "bqConnectionName" +Then Click on the Browse button inside plugin properties +Then Select connection data row with name: "dataset" +Then Select connection data row with name: "bqSourceTable" +Then Wait till connection data loading completes with a timeout of 60 seconds +Then Verify input plugin property: "dataset" contains value: "dataset" +Then Verify input plugin property: "table" contains value: "bqSourceTable" +Then Click on the Get Schema button +Then Validate "BigQuery" plugin properties +And Close the Plugin Properties page +Then Navigate to the properties page of plugin: "BigQuery2" +Then Click plugin property: "useConnection" +Then Click on the Browse Connections button +Then Select connection: "bqConnectionName" +Then Enter input plugin property: "referenceName" with value: "BQSinkReferenceName" +Then Click on the Browse button inside plugin properties +Then Click SELECT button inside connection data row with name: "dataset" +Then Wait till connection data loading completes with a timeout of 60 seconds +Then Verify input plugin property: "dataset" contains value: "dataset" +Then Enter input plugin property: "table" with value: "bqTargetTable" +Then Enter input plugin property: "partitionByField" with value: "bqPartitionFieldDate" +Then Click plugin property: "updateTableSchema" +Then Validate "BigQuery" plugin properties +Then Close the BigQuery properties +Then Save the pipeline +Then Preview and run the pipeline +Then Wait till pipeline preview is in running state +Then Open and capture pipeline preview logs +Then Verify the preview run status of pipeline in the logs is "succeeded" +Then Close the pipeline logs +Then Close the preview +Then Deploy the pipeline +Then Run the Pipeline in Runtime +Then Wait till pipeline is in running state +Then Open and capture logs +Then Close the pipeline logs +Then Verify the pipeline status is "Succeeded" +Then Validate the data transferred from BigQuery to BigQuery with actual And expected file for: "bqDateExpectedFile" + +@BQ_TIME_SOURCE_TEST @BQ_SINK_TEST @EXISTING_BQ_CONNECTION +Scenario: Verify successful record transfer for the Insert operation from BigQuery source plugin to BigQuery sink with partition type Time and partition field is datetime. 
+Given Open Datafusion Project to configure pipeline +When Expand Plugin group in the LHS plugins list: "Source" +When Select plugin: "BigQuery" from the plugins list as: "Source" +When Expand Plugin group in the LHS plugins list: "Sink" +When Select plugin: "BigQuery" from the plugins list as: "Sink" +Then Connect plugins: "BigQuery" and "BigQuery2" to establish connection +Then Navigate to the properties page of plugin: "BigQuery" +Then Click plugin property: "switch-useConnection" +Then Click on the Browse Connections button +Then Select connection: "bqConnectionName" +Then Click on the Browse button inside plugin properties +Then Select connection data row with name: "dataset" +Then Select connection data row with name: "bqSourceTable" +Then Wait till connection data loading completes with a timeout of 60 seconds +Then Verify input plugin property: "dataset" contains value: "dataset" +Then Verify input plugin property: "table" contains value: "bqSourceTable" +Then Click on the Get Schema button +Then Validate "BigQuery" plugin properties +And Close the Plugin Properties page +Then Navigate to the properties page of plugin: "BigQuery2" +Then Click plugin property: "useConnection" +Then Click on the Browse Connections button +Then Select connection: "bqConnectionName" +Then Enter input plugin property: "referenceName" with value: "BQSinkReferenceName" +Then Click on the Browse button inside plugin properties +Then Click SELECT button inside connection data row with name: "dataset" +Then Wait till connection data loading completes with a timeout of 60 seconds +Then Verify input plugin property: "dataset" contains value: "dataset" +Then Enter input plugin property: "table" with value: "bqTargetTable" +Then Enter input plugin property: "partitionByField" with value: "bqPartitionFieldDateTime" +Then Click plugin property: "updateTableSchema" +Then Validate "BigQuery" plugin properties +Then Close the BigQuery properties +Then Save the pipeline +Then Preview and run the pipeline +Then Wait till pipeline preview is in running state +Then Open and capture pipeline preview logs +Then Verify the preview run status of pipeline in the logs is "succeeded" +Then Close the pipeline logs +Then Close the preview +Then Deploy the pipeline +Then Run the Pipeline in Runtime +Then Wait till pipeline is in running state +Then Open and capture logs +Then Close the pipeline logs +Then Verify the pipeline status is "Succeeded" +Then Validate the data transferred from BigQuery to BigQuery with actual And expected file for: "bqDateTimeExpectedFile" + +@BQ_TIME_SOURCE_TEST @BQ_SINK_TEST @EXISTING_BQ_CONNECTION +Scenario: Verify successful record transfer for the Insert operation from BigQuery source plugin to BigQuery sink with partition type Time and partition field is timestamp. 
+Given Open Datafusion Project to configure pipeline +When Expand Plugin group in the LHS plugins list: "Source" +When Select plugin: "BigQuery" from the plugins list as: "Source" +When Expand Plugin group in the LHS plugins list: "Sink" +When Select plugin: "BigQuery" from the plugins list as: "Sink" +Then Connect plugins: "BigQuery" and "BigQuery2" to establish connection +Then Navigate to the properties page of plugin: "BigQuery" +Then Click plugin property: "switch-useConnection" +Then Click on the Browse Connections button +Then Select connection: "bqConnectionName" +Then Click on the Browse button inside plugin properties +Then Select connection data row with name: "dataset" +Then Select connection data row with name: "bqSourceTable" +Then Wait till connection data loading completes with a timeout of 60 seconds +Then Verify input plugin property: "dataset" contains value: "dataset" +Then Verify input plugin property: "table" contains value: "bqSourceTable" +Then Click on the Get Schema button +Then Validate "BigQuery" plugin properties +And Close the Plugin Properties page +Then Navigate to the properties page of plugin: "BigQuery2" +Then Click plugin property: "useConnection" +Then Click on the Browse Connections button +Then Select connection: "bqConnectionName" +Then Enter input plugin property: "referenceName" with value: "BQSinkReferenceName" +Then Click on the Browse button inside plugin properties +Then Click SELECT button inside connection data row with name: "dataset" +Then Wait till connection data loading completes with a timeout of 60 seconds +Then Verify input plugin property: "dataset" contains value: "dataset" +Then Enter input plugin property: "table" with value: "bqTargetTable" +Then Enter input plugin property: "partitionByField" with value: "bqPartitionFieldTimeStamp" +Then Click plugin property: "updateTableSchema" +Then Validate "BigQuery" plugin properties +Then Close the BigQuery properties +Then Save the pipeline +Then Preview and run the pipeline +Then Wait till pipeline preview is in running state +Then Open and capture pipeline preview logs +Then Verify the preview run status of pipeline in the logs is "succeeded" +Then Close the pipeline logs +Then Close the preview +Then Deploy the pipeline +Then Run the Pipeline in Runtime +Then Wait till pipeline is in running state +Then Open and capture logs +Then Close the pipeline logs +Then Verify the pipeline status is "Succeeded" +Then Validate the data transferred from BigQuery to BigQuery with actual And expected file for: "bqTimeStampExpectedFile" + +@BQ_UPSERT_DEDUPE_SOURCE_TEST @BQ_UPSERT_DEDUPE_SINK_TEST @EXISTING_BQ_CONNECTION +Scenario:Validate successful records transfer from BigQuery source to BigQuery sink with Upsert operation with dedupe source data and existing destination table where update table schema is set to false +Given Open Datafusion Project to configure pipeline +When Expand Plugin group in the LHS plugins list: "Source" +When Select plugin: "BigQuery" from the plugins list as: "Source" +When Expand Plugin group in the LHS plugins list: "Sink" +When Select plugin: "BigQuery" from the plugins list as: "Sink" +Then Connect plugins: "BigQuery" and "BigQuery2" to establish connection +Then Navigate to the properties page of plugin: "BigQuery" +Then Click plugin property: "switch-useConnection" +Then Click on the Browse Connections button +Then Select connection: "bqConnectionName" +Then Click on the Browse button inside plugin properties +Then Select connection data row with name: "dataset" 
+Then Select connection data row with name: "bqSourceTable" +Then Wait till connection data loading completes with a timeout of 60 seconds +Then Verify input plugin property: "dataset" contains value: "dataset" +Then Verify input plugin property: "table" contains value: "bqSourceTable" +Then Click on the Get Schema button +Then Validate "BigQuery" plugin properties +And Close the Plugin Properties page +Then Navigate to the properties page of plugin: "BigQuery2" +Then Click plugin property: "useConnection" +Then Click on the Browse Connections button +Then Select connection: "bqConnectionName" +Then Enter input plugin property: "referenceName" with value: "BQSinkReferenceName" +Then Click on the Browse button inside plugin properties +Then Click SELECT button inside connection data row with name: "dataset" +Then Wait till connection data loading completes with a timeout of 60 seconds +Then Verify input plugin property: "dataset" contains value: "dataset" +Then Enter input plugin property: "table" with value: "bqTargetTable" +And Select radio button plugin property: "operation" with value: "upsert" +Then Click on the Add Button of the property: "relationTableKey" with value: +| TableKeyDedupe | +Then Select dropdown plugin property: "dedupeBy" with option value: "dedupeBy" +Then Enter key for plugin property: "dedupeBy" with values: "dedupeByValueUpsert" +Then Validate "BigQuery" plugin properties +And Close the Plugin Properties page +Then Save the pipeline +Then Preview and run the pipeline +Then Wait till pipeline preview is in running state +Then Open and capture pipeline preview logs +Then Verify the preview run status of pipeline in the logs is "succeeded" +Then Close the pipeline logs +Then Close the preview +Then Deploy the pipeline +Then Run the Pipeline in Runtime +Then Wait till pipeline is in running state +Then Open and capture logs +Then Close the pipeline logs +Then Verify the pipeline status is "Succeeded" +Then Validate the data transferred from BigQuery to BigQuery with actual And expected file for: "bqUpsertDedupeFile" + +@BQ_RECORD_SOURCE_TEST @BQ_SECOND_RECORD_SOURCE_TEST @BQ_SINK_TEST +Scenario: Validate successful record transfer from two BigQuery source plugins with different schema record names, taking one extra column in BigQuery source plugin 1,and +using wrangler transformation plugin for removing the extra column and transferring the data in BigQuery sink plugin containing all the columns from both the source plugin. 
+Given Open Datafusion Project to configure pipeline +Then Click on the Plus Green Button to import the pipelines +Then Select the file for importing the pipeline for the plugin "Directive_Drop" +Then Navigate to the properties page of plugin: "BigQuery" +Then Replace input plugin property: "project" with value: "projectId" +Then Replace input plugin property: "dataset" with value: "dataset" +Then Replace input plugin property: "table" with value: "bqSourceTable" +Then Click on the Get Schema button +Then Click on the Validate button +Then Close the Plugin Properties page +Then Navigate to the properties page of plugin: "BigQuery2" +Then Replace input plugin property: "project" with value: "projectId" +Then Replace input plugin property: "dataset" with value: "dataset" +Then Replace input plugin property: "table" with value: "bqSourceTable2" +Then Click on the Get Schema button +Then Click on the Validate button +Then Close the Plugin Properties page +Then Navigate to the properties page of plugin: "BigQuery3" +Then Replace input plugin property: "project" with value: "projectId" +Then Replace input plugin property: "table" with value: "bqTargetTable" +Then Replace input plugin property: "dataset" with value: "dataset" +Then Click on the Validate button +Then Close the Plugin Properties page +Then Rename the pipeline +Then Deploy the pipeline +Then Run the Pipeline in Runtime +Then Wait till pipeline is in running state +Then Open and capture logs +Then Verify the pipeline status is "Succeeded" +Then Close the pipeline logs +Then Validate the data transferred from BigQuery to BigQuery with actual And expected file for: "bqDifferentRecordFile" + +@BQ_INSERT_INT_SOURCE_TEST @BQ_INSERT_SINK_TEST @CDAP-20830 +Scenario: Validate successful record transfer from BigQuery to BigQuery with the advanced Insert operation, with the table existing in both source and sink plugins and update table schema set to true 
+Given Open Datafusion Project to configure pipeline +When Expand Plugin group in the LHS plugins list: "Source" +When Select plugin: "BigQuery" from the plugins list as: "Source" +When Expand Plugin group in the LHS plugins list: "Sink" +When Select plugin: "BigQuery" from the plugins list as: "Sink" +Then Connect plugins: "BigQuery" and "BigQuery2" to establish connection +Then Navigate to the properties page of plugin: "BigQuery" +And Replace input plugin property: "project" with value: "projectId" +Then Override Service account details if set in environment variables +And Replace input plugin property: "datasetProject" with value: "datasetprojectId" +And Replace input plugin property: "referenceName" with value: "reference" +And Replace input plugin property: "dataset" with value: "dataset" +And Replace input plugin property: "table" with value: "bqSourceTable" +Then Click on the Get Schema button +Then Validate "BigQuery" plugin properties +And Close the Plugin Properties page +Then Navigate to the properties page of plugin: "BigQuery2" +Then Replace input plugin property: "project" with value: "projectId" +Then Override Service account details if set in environment variables +Then Enter input plugin property: "datasetProject" with value: "projectId" +Then Enter input plugin property: "referenceName" with value: "BQReferenceName" +Then Enter input plugin property: "dataset" with value: "dataset" +Then Enter input plugin property: "table" with value: "bqTargetTable" +And Select radio button plugin property: "operation" with value: "insert" +Then Click plugin property: "updateTableSchema" +Then Validate "BigQuery" plugin properties +And Close the Plugin Properties page +Then Save the pipeline +Then Preview and run the pipeline +Then Wait till pipeline preview is in running state +Then Open and capture pipeline preview logs +Then Verify the preview run status of pipeline in the logs is "succeeded" +Then Close the pipeline logs +Then Close the preview +Then Deploy the pipeline +Then Run the Pipeline in Runtime +Then Wait till pipeline is in running state +Then Open and capture logs +Then Verify the pipeline status is "Succeeded" +Then Close the pipeline logs diff --git a/src/e2e-test/java/io/cdap/plugin/bigquery/runners/sinkrunner/TestRunner.java b/src/e2e-test/java/io/cdap/plugin/bigquery/runners/sinkrunner/TestRunner.java index 2c07f9c50c..b6085ccb1e 100644 --- a/src/e2e-test/java/io/cdap/plugin/bigquery/runners/sinkrunner/TestRunner.java +++ b/src/e2e-test/java/io/cdap/plugin/bigquery/runners/sinkrunner/TestRunner.java @@ -27,7 +27,8 @@ features = {"src/e2e-test/features"}, glue = {"io.cdap.plugin.bigquery.stepsdesign", "io.cdap.plugin.gcs.stepsdesign", "stepsdesign", "io.cdap.plugin.common.stepsdesign"}, - tags = {"@BigQuery_Sink"}, + tags = {"@BigQuery_Sink and not @CDAP-20830"}, + //TODO: Enable test once issue is fixed https://cdap.atlassian.net/browse/CDAP-20830 monochrome = true, plugin = {"pretty", "html:target/cucumber-html-report/bigquery-sink", "json:target/cucumber-reports/cucumber-bigquery-sink.json", diff --git a/src/e2e-test/java/io/cdap/plugin/bigquery/stepsdesign/BQValidationExistingTables.java b/src/e2e-test/java/io/cdap/plugin/bigquery/stepsdesign/BQValidationExistingTables.java new file mode 100644 index 0000000000..257f731c97 --- /dev/null +++ b/src/e2e-test/java/io/cdap/plugin/bigquery/stepsdesign/BQValidationExistingTables.java @@ -0,0 +1,116 @@ +package io.cdap.plugin.bigquery.stepsdesign; + +import com.esotericsoftware.minlog.Log; +import 
com.google.cloud.bigquery.FieldValueList; +import com.google.cloud.bigquery.TableResult; +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import io.cdap.e2e.utils.BigQueryClient; +import io.cdap.e2e.utils.PluginPropertyUtils; +import io.cucumber.core.logging.Logger; +import io.cucumber.core.logging.LoggerFactory; + +import java.io.BufferedReader; +import java.io.FileReader; +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.Map; + +/** + * BigQuery Plugin Existing Table validation. + */ +public class BQValidationExistingTables { + + private static final Logger LOG = LoggerFactory.getLogger(BQValidationExistingTables.class); + private static final Gson gson = new Gson(); + + /** + * Validates the actual data in a BigQuery table against the expected data in a JSON file. + * @param table The name of the BigQuery table to retrieve data from. + * @param fileName The name of the JSON file containing the expected data. + * @return True if the actual data matches the expected data, false otherwise. + */ + public static boolean validateActualDataToExpectedData(String table, String fileName) throws IOException, + InterruptedException, URISyntaxException { + Map<String, JsonObject> bigQueryMap = new HashMap<>(); + Map<String, JsonObject> fileMap = new HashMap<>(); + Path bqExpectedFilePath = Paths.get(BQValidationExistingTables.class.getResource("/" + fileName).toURI()); + + getBigQueryTableData(table, bigQueryMap); + getFileData(bqExpectedFilePath.toString(), fileMap); + return bigQueryMap.equals(fileMap); + } + + /** + * Reads a JSON file line by line and populates a map with JSON objects using a specified ID key. + * @param fileName The path to the JSON file to be read. + * @param fileMap A map where the extracted JSON objects will be stored with their ID values as keys. + */ + public static void getFileData(String fileName, Map<String, JsonObject> fileMap) { + try (BufferedReader br = new BufferedReader(new FileReader(fileName))) { + String line; + while ((line = br.readLine()) != null) { + JsonObject json = gson.fromJson(line, JsonObject.class); + String idKey = getIdKey(json); + if (idKey != null) { + JsonElement idElement = json.get(idKey); + if (idElement.isJsonPrimitive()) { + String idValue = idElement.getAsString(); + fileMap.put(idValue, json); + } + } else { + Log.error("ID key not found"); + } + } + } catch (IOException e) { + Log.error("Error reading the file: " + e.getMessage()); + } + } + + private static void getBigQueryTableData(String targetTable, Map<String, JsonObject> bigQueryMap) + throws IOException, InterruptedException { + String dataset = PluginPropertyUtils.pluginProp("dataset"); + String projectId = PluginPropertyUtils.pluginProp("projectId"); + String selectQuery = "SELECT TO_JSON(t) FROM `" + projectId + "." + dataset + "." + targetTable + "` AS t"; + TableResult result = BigQueryClient.getQueryResult(selectQuery); + + for (FieldValueList row : result.iterateAll()) { + JsonObject json = gson.fromJson(row.get(0).getStringValue(), JsonObject.class); + String idKey = getIdKey(json); // Get the actual ID key from the JSON object + if (idKey != null) { + JsonElement idElement = json.get(idKey); + if (idElement.isJsonPrimitive()) { + String id = idElement.getAsString(); + bigQueryMap.put(id, json); + } else { + Log.error("ID value in the BigQuery row is not a primitive"); + } + } + } + } + + /** + * Retrieves the key for the ID element in the provided JSON object. 
+ * + * @param json The JSON object to search for the ID key. + * @return The name of the ID key if one of the known keys is present, or null otherwise. + */ + private static String getIdKey(JsonObject json) { + if (json.has("ID")) { + return "ID"; + } else if (json.has("Name")) { + return "Name"; + } else if (json.has("Price")) { + return "Price"; + } else if (json.has("Customer_Exists")) { + return "Customer_Exists"; + } else { + return null; + } + } +} diff --git a/src/e2e-test/java/io/cdap/plugin/bigquery/stepsdesign/BigQuery.java b/src/e2e-test/java/io/cdap/plugin/bigquery/stepsdesign/BigQuery.java index 215886662d..b6a3c0be95 100644 --- a/src/e2e-test/java/io/cdap/plugin/bigquery/stepsdesign/BigQuery.java +++ b/src/e2e-test/java/io/cdap/plugin/bigquery/stepsdesign/BigQuery.java @@ -25,6 +25,7 @@ import stepsdesign.BeforeActions; import java.io.IOException; +import java.net.URISyntaxException; /** * BigQuery Plugin validation common step design. @@ -44,4 +45,13 @@ public void validateTheValuesOfRecordsTransferredToBQsinkIsEqualToTheValuesFromS Assert.assertTrue("Value of records transferred to the BQ sink should be equal to the value " + "of the records in the source table", recordsMatched); } + + @Then("Validate the data transferred from BigQuery to BigQuery with actual And expected file for: {string}") + public void validateTheDataFromBQToBQWithActualAndExpectedFileFor(String expectedFile) throws IOException, + InterruptedException, URISyntaxException { + boolean recordsMatched = BQValidationExistingTables.validateActualDataToExpectedData( + PluginPropertyUtils.pluginProp("bqTargetTable"), + PluginPropertyUtils.pluginProp(expectedFile)); + Assert.assertTrue("Value of records in actual and expected file should be equal", recordsMatched); + } } diff --git a/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java b/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java index 233f154b5b..72d01507fb 100644 --- a/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java +++ b/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java @@ -59,6 +59,7 @@ public class TestSetupHooks { public static String gcsTargetBucketName = StringUtils.EMPTY; public static String bqTargetTable = StringUtils.EMPTY; public static String bqSourceTable = StringUtils.EMPTY; + public static String bqSourceTable2 = StringUtils.EMPTY; public static String bqSourceView = StringUtils.EMPTY; public static String pubSubTargetTopic = StringUtils.EMPTY; public static String spannerInstance = StringUtils.EMPTY; @@ -270,7 +271,11 @@ public static void createTempSourceBQTable() throws IOException, InterruptedExce } @After(order = 1, value = "@BQ_SOURCE_TEST or @BQ_PARTITIONED_SOURCE_TEST or @BQ_SOURCE_DATATYPE_TEST or " + - "@BQ_INSERT_SOURCE_TEST or @BQ_UPDATE_SINK_TEST") + "@BQ_INSERT_SOURCE_TEST or @BQ_UPDATE_SINK_TEST or @BQ_UPSERT_SOURCE_TEST or @BQ_UPSERT_SINK_TEST or " + + "@BQ_NULL_MODE_SOURCE_TEST or @BQ_UPDATE_SOURCE_DEDUPE_TEST or @BQ_UPDATE_SINK_DEDUPE_TEST or " + + "@BQ_INSERT_INT_SOURCE_TEST or @BQ_EXISTING_SINK_TEST or @BQ_TIME_SOURCE_TEST or " + + "@BQ_UPSERT_DEDUPE_SOURCE_TEST or @BQ_UPSERT_DEDUPE_SINK_TEST or @BQ_RECORD_SOURCE_TEST or " + + "@BQ_SECOND_RECORD_SOURCE_TEST or @BQ_INSERT_SINK_TEST") public static void deleteTempSourceBQTable() throws IOException, InterruptedException { BigQueryClient.dropBqQuery(bqSourceTable); PluginPropertyUtils.removePluginProp("bqSourceTable"); @@ -941,4 +946,281 @@ public static void createBucketWithMultipleTestFilesWithRegex() throws IOExcepti gcsSourceBucketName = 
createGCSBucketWithMultipleFiles(PluginPropertyUtils.pluginProp( "gcsMultipleFilesFilterRegexPath")); } + + @Before(order = 1, value = "@BQ_UPSERT_SOURCE_TEST") + public static void createSourceBQUpsertTable() throws IOException, InterruptedException { + bqSourceTable = "E2E_SOURCE_" + UUID.randomUUID().toString().replaceAll("-", "_"); + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqSourceTable + "` " + + "(ID INT64, Name STRING, " + "Price FLOAT64," + + "Customer_Exists BOOL ) "); + try { + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("INSERT INTO `" + datasetName + "." + bqSourceTable + "` " + + "(ID, Name, Price, Customer_Exists)" + + "VALUES" + "(5, 'Raja', 500.0, true)," + + "(6, 'Tom', 100.0, false)"); + } catch (NoSuchElementException e) { + // Insert query does not return any record. + // Iterator on TableResult values in getSoleQueryResult method throws NoSuchElementException + BeforeActions.scenario.write("Error inserting the record in the table" + e.getStackTrace()); + } + PluginPropertyUtils.addPluginProp("bqSourceTable", bqSourceTable); + BeforeActions.scenario.write("BQ Source Table " + bqSourceTable + " created successfully"); + } + + @Before(order = 1, value = "@BQ_UPSERT_SINK_TEST") + public static void createSinkBQUpsertTable() throws IOException, InterruptedException { + bqTargetTable = "E2E_SOURCE_" + UUID.randomUUID().toString().replaceAll("-", "_"); + PluginPropertyUtils.addPluginProp("bqTargetTable", bqTargetTable); + BeforeActions.scenario.write("BQ target table name - " + bqTargetTable); + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqTargetTable + "` " + + "(ID INT64, Name STRING, " + "Price FLOAT64," + + "Customer_Exists BOOL ) "); + try { + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("INSERT INTO `" + datasetName + "." + bqTargetTable + "` " + + "(ID, Name, Price, Customer_Exists)" + + "VALUES" + "(5, 'Rakesh', 500.0, true)"); + } catch (NoSuchElementException e) { + // Insert query does not return any record. + // Iterator on TableResult values in getSoleQueryResult method throws NoSuchElementException + BeforeActions.scenario.write("Error inserting the record in the table" + e.getStackTrace()); + } + PluginPropertyUtils.addPluginProp("bqTargetTable", bqTargetTable); + BeforeActions.scenario.write("BQ Target Table " + bqTargetTable + " created successfully"); + } + + @Before(value = "@BQ_NULL_MODE_SOURCE_TEST") + public static void createNullSourceBQTable() throws IOException, InterruptedException { + bqSourceTable = "E2E_TARGET_" + UUID.randomUUID().toString().replaceAll("-", "_"); + PluginPropertyUtils.addPluginProp("bqSourceTable", bqSourceTable); + BeforeActions.scenario.write("BQ source table name - " + bqSourceTable); + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqSourceTable + "` " + + "(Address STRING, id INT64, Firstname STRING," + + "LastName STRING)"); + try { + BigQueryClient.getSoleQueryResult("INSERT INTO `" + datasetName + "." + bqSourceTable + "` " + + "(Address, id, Firstname, LastName)" + + "VALUES" + "('Agra', 1, 'Harry','')," + + "('Noida', 2, '','')"); + } catch (NoSuchElementException e) { + // Insert query does not return any record. 
+ // Iterator on TableResult values in getSoleQueryResult method throws NoSuchElementException + BeforeActions.scenario.write("Error inserting the record in the table" + e.getStackTrace()); + } + PluginPropertyUtils.addPluginProp("bqSourceTable", bqSourceTable); + BeforeActions.scenario.write("BQ Source Table " + bqSourceTable + " created successfully"); + } + + @Before(value = "@BQ_UPDATE_SOURCE_DEDUPE_TEST") + public static void createSourceBQDedupeTable() throws IOException, InterruptedException { + bqSourceTable = "E2E_TARGET_" + UUID.randomUUID().toString().replaceAll("-", "_"); + PluginPropertyUtils.addPluginProp("bqSourceTable", bqSourceTable); + BeforeActions.scenario.write("BQ source table name - " + bqSourceTable); + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqSourceTable + "` " + + "(ID INT64, Name STRING, Price FLOAT64, " + + "Customer_Exists BOOL)"); + try { + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("INSERT INTO `" + datasetName + "." + bqSourceTable + "` " + + "(Name, ID, Price,Customer_Exists)" + + "VALUES" + "('string_1', 1, 0.1,true)," + + "('string_1', 2, 0.2,false)"); + } catch (NoSuchElementException e) { + // Insert query does not return any record. + // Iterator on TableResult values in getSoleQueryResult method throws NoSuchElementException + BeforeActions.scenario.write("Error inserting the record in the table" + e.getStackTrace()); + } + PluginPropertyUtils.addPluginProp("bqSourceTable", bqSourceTable); + BeforeActions.scenario.write("BQ Source Table " + bqSourceTable + " created successfully"); + } + + @Before(value = "@BQ_UPDATE_SINK_DEDUPE_TEST") + public static void createSinkBQDedupeTable() throws IOException, InterruptedException { + bqTargetTable = "E2E_TARGET_" + UUID.randomUUID().toString().replaceAll("-", "_"); + PluginPropertyUtils.addPluginProp("bqTargetTable", bqTargetTable); + BeforeActions.scenario.write("BQ target table name - " + bqTargetTable); + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqTargetTable + "` " + + "(ID INT64, Name STRING, Price FLOAT64, " + + "Customer_Exists BOOL)"); + try { + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("INSERT INTO `" + datasetName + "." + bqTargetTable + "` " + + "(Name, ID, Price,Customer_Exists)" + + "VALUES" + "('string_0', 0, 0,true)," + + "('string_1', 10, 1.1,false)"); + } catch (NoSuchElementException e) { + // Insert query does not return any record. + // Iterator on TableResult values in getSoleQueryResult method throws NoSuchElementException + BeforeActions.scenario.write("Error inserting the record in the table" + e.getStackTrace()); + } + PluginPropertyUtils.addPluginProp("bqTargetTable", bqTargetTable); + BeforeActions.scenario.write("BQ Target Table " + bqTargetTable + " created successfully"); + } + + @Before(value = "@BQ_INSERT_INT_SOURCE_TEST") + public static void createSourceBQTable() throws IOException, InterruptedException { + bqSourceTable = "E2E_TARGET_" + UUID.randomUUID().toString().replaceAll("-", "_"); + PluginPropertyUtils.addPluginProp("bqSourceTable", bqSourceTable); + BeforeActions.scenario.write("BQ source table name - " + bqSourceTable); + BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqSourceTable + "` " + + "(ID INT64, Name STRING, Price FLOAT64, Customer_Exists BOOL)"); + try { + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("INSERT INTO `" + datasetName + "." 
+ bqSourceTable + "` " + + "(ID, Name, Price,Customer_Exists)" + + "VALUES" + "(3, 'Rajan Kumar', 100.0, true)"); + } catch (NoSuchElementException e) { + // Insert query does not return any record. + // Iterator on TableResult values in getSoleQueryResult method throws NoSuchElementException + BeforeActions.scenario.write("Error inserting the record in the table" + e.getStackTrace()); + } + PluginPropertyUtils.addPluginProp("bqSourceTable", bqSourceTable); + BeforeActions.scenario.write("BQ Source Table " + bqSourceTable + " created successfully"); + } + + @Before(order = 1, value = "@BQ_EXISTING_SINK_TEST") + public static void createSinkBQExistingTable() throws IOException, InterruptedException { + + bqTargetTable = "E2E_TARGET_" + UUID.randomUUID().toString().replaceAll("-", "_"); + PluginPropertyUtils.addPluginProp("bqTargetTable", bqTargetTable); + BeforeActions.scenario.write("BQ Target table name - " + bqTargetTable); + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqTargetTable + "` " + + "(ID INT64,Name STRING," + + "Price FLOAT64, Customer_Exists BOOL ) "); + try { + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("INSERT INTO `" + datasetName + "." + bqTargetTable + "` " + + "(ID, Name, Price, Customer_Exists)" + + "VALUES" + "(3, 'Rajan Kumar', 100.0, true)"); + } catch (NoSuchElementException e) { + // Insert query does not return any record. + // Iterator on TableResult values in getSoleQueryResult method throws NoSuchElementException + BeforeActions.scenario.write("Error inserting the record in the table" + e.getStackTrace()); + } + PluginPropertyUtils.addPluginProp("bqTargetTable", bqTargetTable); + BeforeActions.scenario.write("BQ Target Table " + bqTargetTable + " created successfully"); + } + + @Before(order = 1, value = "@BQ_TIME_SOURCE_TEST") + public static void createTimeStampBQTable() throws IOException, InterruptedException { + bqSourceTable = "E2E_SOURCE_" + UUID.randomUUID().toString().replaceAll("-", "_"); + PluginPropertyUtils.addPluginProp("bqSourceTable", bqSourceTable); + BeforeActions.scenario.write("BQ source table name - " + bqSourceTable); + BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqSourceTable + "` " + + "(ID STRING, transaction_date DATE, Firstname STRING," + + " transaction_dt DATETIME, updated_on TIMESTAMP )"); + try { + BigQueryClient.getSoleQueryResult("INSERT INTO `" + datasetName + "." + bqSourceTable + "` " + + "(ID, transaction_date, Firstname, transaction_dt, updated_on )" + + "VALUES" + "('Agra', '2021-02-20', 'Neera','2019-07-07 11:24:00', " + + "'2019-03-10 04:50:01 UTC')," + + "('Noida', '2021-02-21','', '2019-07-07 11:24:00', " + + "'2019-03-10 04:50:01 UTC')," + + "('Gurgaon', '2021-02-22', 'singh', '2019-07-07 11:24:00', " + + "'2019-03-10 04:50:01 UTC' )"); + } catch (NoSuchElementException e) { + // Insert query does not return any record. 
+ // Iterator on TableResult values in getSoleQueryResult method throws NoSuchElementException + BeforeActions.scenario.write("Error inserting the record in the table" + e.getStackTrace()); + } + PluginPropertyUtils.addPluginProp("bqSourceTable", bqSourceTable); + BeforeActions.scenario.write("BQ Source Table " + bqSourceTable + " created successfully"); + } + + @Before(order = 1, value = "@BQ_UPSERT_DEDUPE_SOURCE_TEST") + public static void createSourceBQDedupeUpsertTable() throws IOException, InterruptedException { + bqSourceTable = "E2E_SOURCE_" + UUID.randomUUID().toString().replaceAll("-", "_"); + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqSourceTable + "` " + + "(ID INT64, Name STRING, " + "Price FLOAT64," + + "Customer_Exists BOOL ) "); + try { + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("INSERT INTO `" + datasetName + "." + bqSourceTable + "` " + + "(ID, Name, Price, Customer_Exists)" + + "VALUES" + "(1, 'string_1', 0.1, true)," + + "(2, 'string_1', 0.2, false)," + + "(3, 'string_3', 0.3, false)"); + } catch (NoSuchElementException e) { + // Insert query does not return any record. + // Iterator on TableResult values in getSoleQueryResult method throws NoSuchElementException + BeforeActions.scenario.write("Error inserting the record in the table" + e.getStackTrace()); + } + PluginPropertyUtils.addPluginProp("bqSourceTable", bqSourceTable); + BeforeActions.scenario.write("BQ Source Table " + bqSourceTable + " created successfully"); + } + + @Before(order = 1, value = "@BQ_UPSERT_DEDUPE_SINK_TEST") + public static void createSinkBQDedupeUpsertTable() throws IOException, InterruptedException { + bqTargetTable = "E2E_SOURCE_" + UUID.randomUUID().toString().replaceAll("-", "_"); + PluginPropertyUtils.addPluginProp("bqTargetTable", bqTargetTable); + BeforeActions.scenario.write("BQ target table name - " + bqTargetTable); + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqTargetTable + "` " + + "(ID INT64, Name STRING, " + "Price FLOAT64," + + "Customer_Exists BOOL ) "); + try { + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("INSERT INTO `" + datasetName + "." + bqTargetTable + "` " + + "(ID, Name, Price, Customer_Exists)" + + "VALUES" + "(0, 'string_0', 0, true)," + + "(10, 'string_1', 1.1, false)"); + } catch (NoSuchElementException e) { + // Insert query does not return any record. + // Iterator on TableResult values in getSoleQueryResult method throws NoSuchElementException + BeforeActions.scenario.write("Error inserting the record in the table" + e.getStackTrace()); + } + PluginPropertyUtils.addPluginProp("bqTargetTable", bqTargetTable); + BeforeActions.scenario.write("BQ Target Table " + bqTargetTable + " created successfully"); + } + + @Before(order = 1, value = "@BQ_RECORD_SOURCE_TEST") + public static void createSourceBQRecordTable() throws IOException, InterruptedException { + bqSourceTable = "E2E_SOURCE_" + UUID.randomUUID().toString().replaceAll("-", "_"); + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqSourceTable + "` " + + "(ID INT64, Name STRING, " + "Price FLOAT64," + + "TableName STRING ) "); + try { + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("INSERT INTO `" + datasetName + "." + bqSourceTable + "` " + + "(ID, Name, Price, TableName)" + + "VALUES" + "(1, 'string_1', 0.1, 'Test')"); + } catch (NoSuchElementException e) { + // Insert query does not return any record. 
+ // Iterator on TableResult values in getSoleQueryResult method throws NoSuchElementException + BeforeActions.scenario.write("Error inserting the record in the table" + e.getStackTrace()); + } + PluginPropertyUtils.addPluginProp("bqSourceTable", bqSourceTable); + BeforeActions.scenario.write("BQ Source Table " + bqSourceTable + " created successfully"); + } + + @Before(order = 1, value = "@BQ_SECOND_RECORD_SOURCE_TEST") + public static void createSourceBQSecondRecordTable() throws IOException, InterruptedException { + bqSourceTable2 = "E2E_SOURCE_" + UUID.randomUUID().toString().replaceAll("-", "_"); + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqSourceTable2 + "` " + + "(ID INT64, Name STRING, " + "Price FLOAT64 ) "); + try { + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("INSERT INTO `" + datasetName + "." + bqSourceTable2 + "` " + + "(ID, Name, Price)" + + "VALUES" + "(1, 'string_1', 0.1)"); + } catch (NoSuchElementException e) { + // Insert query does not return any record. + // Iterator on TableResult values in getSoleQueryResult method throws NoSuchElementException + BeforeActions.scenario.write("Error inserting the record in the table" + e.getStackTrace()); + } + PluginPropertyUtils.addPluginProp("bqSourceTable2", bqSourceTable2); + BeforeActions.scenario.write("BQ Source Table " + bqSourceTable2 + " created successfully"); + } + + @Before(order = 1, value = "@BQ_INSERT_SINK_TEST") + public static void createSinkBQInsertTable() throws IOException, InterruptedException { + + bqTargetTable = "E2E_TARGET_" + UUID.randomUUID().toString().replaceAll("-", "_"); + PluginPropertyUtils.addPluginProp("bqTargetTable", bqTargetTable); + BeforeActions.scenario.write("BQ Target table name - " + bqTargetTable); + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqTargetTable + "` " + + "(ID INT64,Name STRING," + + "id_Value INT64, Customer_Exists BOOL ) "); + try { + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("INSERT INTO `" + datasetName + "." + bqTargetTable + "` " + + "(ID, Name, id_Value, Customer_Exists)" + + "VALUES" + "(3, 'Rajan Kumar', 100, true)"); + } catch (NoSuchElementException e) { + // Insert query does not return any record. 
+ // Iterator on TableResult values in getSoleQueryResult method throws NoSuchElementException + BeforeActions.scenario.write("Error inserting the record in the table" + e.getStackTrace()); + } + PluginPropertyUtils.addPluginProp("bqTargetTable", bqTargetTable); + BeforeActions.scenario.write("BQ Target Table " + bqTargetTable + " created successfully"); + } } diff --git a/src/e2e-test/resources/pluginDataCyAttributes.properties b/src/e2e-test/resources/pluginDataCyAttributes.properties index 2f321c0333..6df9c9bf34 100644 --- a/src/e2e-test/resources/pluginDataCyAttributes.properties +++ b/src/e2e-test/resources/pluginDataCyAttributes.properties @@ -42,4 +42,6 @@ spannerConnectionRow=connector-Spanner testConnection=connection-test-button connectionCreate=connection-submit-button parsingOptionConfirm=parsing-config-confirm +dedupeBy=dedupeBy +relationTableKey=relationTableKey ## CONNECTION-MANAGEMENT-END diff --git a/src/e2e-test/resources/pluginParameters.properties b/src/e2e-test/resources/pluginParameters.properties index eda2da060f..4edc2a2559 100644 --- a/src/e2e-test/resources/pluginParameters.properties +++ b/src/e2e-test/resources/pluginParameters.properties @@ -205,6 +205,31 @@ bqInvalidRefName=invalidRef&^*&&* bqDatatypeChange1=[{"key":"Id","value":"long"},{"key":"Value","value":"long"}] bqDataTypeTestFileSchema1=[{"key":"Id","value":"long"},{"key":"Value","value":"long"},\ {"key":"UID","value":"string"}] +TableKeyUpsert=ID +TableKeyInsert=ID +bqUpsertExpectedFile=testdata/BigQuery/BQUpsertTableFile +bqUpdateDedupeExpectedFile=testdata/BigQuery/BQUpdateDedupeFile +bqInsertExpectedFile=testdata/BigQuery/BQInsertIntFile +relationTableKeyValue=Name +dedupeByOrder=ASC +dedupeByValue=ID +dedupeByValueUpsert=Price +rangeStartValue=2 +rangeEndValue=3 +rangeIntervalValue=1 +partitionByFieldValue=ID +bqPartitionFieldDateTime=transaction_dt +bqPartitionFieldTimeStamp=updated_on +bqSourceTable2=dummy +dedupeBy=DESC +TableKeyDedupe=Name +Directive_Drop=testdata/BigQuery/test_diffschema_record-cdap-data-pipeline.json +bqUpsertDedupeFile=testdata/BigQuery/BQUpsertDedupeFile +bqDifferentRecordFile=testdata/BigQuery/BQDifferentRecordNameFile +bqDateExpectedFile=testdata/BigQuery/BQDateFile +bqDateTimeExpectedFile=testdata/BigQuery/BQDateTimeFile +bqTimeStampExpectedFile=testdata/BigQuery/BQTimeStampFile +bqPartitionFieldDate=transaction_date ## BIGQUERY-PLUGIN-PROPERTIES-END ## PUBSUBSINK-PLUGIN-PROPERTIES-START diff --git a/src/e2e-test/resources/testdata/BigQuery/BQDateFile b/src/e2e-test/resources/testdata/BigQuery/BQDateFile new file mode 100644 index 0000000000..9f24705f5c --- /dev/null +++ b/src/e2e-test/resources/testdata/BigQuery/BQDateFile @@ -0,0 +1,3 @@ +{"Firstname":"singh","ID":"Gurgaon","transaction_date":"2021-02-22","transaction_dt":"2019-07-07T11:24:00","updated_on":"2019-03-10T04:50:01Z"} +{"Firstname":"Neera","ID":"Agra","transaction_date":"2021-02-20","transaction_dt":"2019-07-07T11:24:00","updated_on":"2019-03-10T04:50:01Z"} +{"Firstname":"","ID":"Noida","transaction_date":"2021-02-21","transaction_dt":"2019-07-07T11:24:00","updated_on":"2019-03-10T04:50:01Z"} \ No newline at end of file diff --git a/src/e2e-test/resources/testdata/BigQuery/BQDateTimeFile b/src/e2e-test/resources/testdata/BigQuery/BQDateTimeFile new file mode 100644 index 0000000000..9f24705f5c --- /dev/null +++ b/src/e2e-test/resources/testdata/BigQuery/BQDateTimeFile @@ -0,0 +1,3 @@ 
+{"Firstname":"singh","ID":"Gurgaon","transaction_date":"2021-02-22","transaction_dt":"2019-07-07T11:24:00","updated_on":"2019-03-10T04:50:01Z"} +{"Firstname":"Neera","ID":"Agra","transaction_date":"2021-02-20","transaction_dt":"2019-07-07T11:24:00","updated_on":"2019-03-10T04:50:01Z"} +{"Firstname":"","ID":"Noida","transaction_date":"2021-02-21","transaction_dt":"2019-07-07T11:24:00","updated_on":"2019-03-10T04:50:01Z"} \ No newline at end of file diff --git a/src/e2e-test/resources/testdata/BigQuery/BQDifferentRecordNameFile b/src/e2e-test/resources/testdata/BigQuery/BQDifferentRecordNameFile new file mode 100644 index 0000000000..18336cbbd0 --- /dev/null +++ b/src/e2e-test/resources/testdata/BigQuery/BQDifferentRecordNameFile @@ -0,0 +1,2 @@ +{"ID":1,"Name":"string_1","Price":0.1} +{"ID":1,"Name":"string_1","Price":0.1} \ No newline at end of file diff --git a/src/e2e-test/resources/testdata/BigQuery/BQInsertIntFile b/src/e2e-test/resources/testdata/BigQuery/BQInsertIntFile new file mode 100644 index 0000000000..aeed733779 --- /dev/null +++ b/src/e2e-test/resources/testdata/BigQuery/BQInsertIntFile @@ -0,0 +1,2 @@ +{"Customer_Exists":true,"ID":3,"Name":"Rajan Kumar","Price":100.0} +{"Customer_Exists":true,"ID":3,"Name":"Rajan Kumar","Price":100.0} \ No newline at end of file diff --git a/src/e2e-test/resources/testdata/BigQuery/BQTimeStampFile b/src/e2e-test/resources/testdata/BigQuery/BQTimeStampFile new file mode 100644 index 0000000000..9f24705f5c --- /dev/null +++ b/src/e2e-test/resources/testdata/BigQuery/BQTimeStampFile @@ -0,0 +1,3 @@ +{"Firstname":"singh","ID":"Gurgaon","transaction_date":"2021-02-22","transaction_dt":"2019-07-07T11:24:00","updated_on":"2019-03-10T04:50:01Z"} +{"Firstname":"Neera","ID":"Agra","transaction_date":"2021-02-20","transaction_dt":"2019-07-07T11:24:00","updated_on":"2019-03-10T04:50:01Z"} +{"Firstname":"","ID":"Noida","transaction_date":"2021-02-21","transaction_dt":"2019-07-07T11:24:00","updated_on":"2019-03-10T04:50:01Z"} \ No newline at end of file diff --git a/src/e2e-test/resources/testdata/BigQuery/BQUpdateDedupeFile b/src/e2e-test/resources/testdata/BigQuery/BQUpdateDedupeFile new file mode 100644 index 0000000000..6e2a839642 --- /dev/null +++ b/src/e2e-test/resources/testdata/BigQuery/BQUpdateDedupeFile @@ -0,0 +1,2 @@ +{"Customer_Exists":true,"ID":0,"Name":"string_0","Price":0.0} +{"Customer_Exists":true,"ID":1,"Name":"string_1","Price":0.1} \ No newline at end of file diff --git a/src/e2e-test/resources/testdata/BigQuery/BQUpsertDedupeFile b/src/e2e-test/resources/testdata/BigQuery/BQUpsertDedupeFile new file mode 100644 index 0000000000..550a80c916 --- /dev/null +++ b/src/e2e-test/resources/testdata/BigQuery/BQUpsertDedupeFile @@ -0,0 +1,3 @@ +{"Customer_Exists":false,"ID":3,"Name":"string_3","Price":0.3} +{"Customer_Exists":false,"ID":2,"Name":"string_1","Price":0.2} +{"Customer_Exists":true,"ID":0,"Name":"string_0","Price":0.0} \ No newline at end of file diff --git a/src/e2e-test/resources/testdata/BigQuery/BQUpsertTableFile b/src/e2e-test/resources/testdata/BigQuery/BQUpsertTableFile new file mode 100644 index 0000000000..0b48d57e4d --- /dev/null +++ b/src/e2e-test/resources/testdata/BigQuery/BQUpsertTableFile @@ -0,0 +1,2 @@ +{"Customer_Exists":true,"ID":5,"Name":"Raja","Price":500.0} +{"Customer_Exists":false,"ID":6,"Name":"Tom","Price":100.0} \ No newline at end of file diff --git a/src/e2e-test/resources/testdata/BigQuery/test_diffschema_record-cdap-data-pipeline.json 
diff --git a/src/e2e-test/resources/testdata/BigQuery/test_diffschema_record-cdap-data-pipeline.json b/src/e2e-test/resources/testdata/BigQuery/test_diffschema_record-cdap-data-pipeline.json new file mode 100644 index 0000000000..e1b1966d3f --- /dev/null +++ b/src/e2e-test/resources/testdata/BigQuery/test_diffschema_record-cdap-data-pipeline.json
@@ -0,0 +1,183 @@
+{
+  "name": "test_diffschema_record",
+  "description": "Data Pipeline Application",
+  "artifact": {
+    "name": "cdap-data-pipeline",
+    "version": "6.10.0-SNAPSHOT",
+    "scope": "SYSTEM"
+  },
+  "config": {
+    "resources": {
+      "memoryMB": 2048,
+      "virtualCores": 1
+    },
+    "driverResources": {
+      "memoryMB": 2048,
+      "virtualCores": 1
+    },
+    "connections": [
+      {
+        "from": "BigQuery",
+        "to": "Wrangler"
+      },
+      {
+        "from": "Wrangler",
+        "to": "BigQuery3"
+      },
+      {
+        "from": "BigQuery2",
+        "to": "BigQuery3"
+      }
+    ],
+    "comments": [],
+    "postActions": [],
+    "properties": {},
+    "processTimingEnabled": true,
+    "stageLoggingEnabled": false,
+    "stages": [
+      {
+        "name": "BigQuery",
+        "plugin": {
+          "name": "BigQueryTable",
+          "type": "batchsource",
+          "label": "BigQuery",
+          "artifact": {
+            "name": "google-cloud",
+            "version": "0.22.0-SNAPSHOT",
+            "scope": "SYSTEM"
+          },
+          "properties": {
+            "useConnection": "false",
+            "project": "cdf-athena",
+            "serviceAccountType": "filePath",
+            "serviceFilePath": "auto-detect",
+            "referenceName": "bq_ref",
+            "dataset": "bq_automation",
+            "table": "bqSourceTableMore",
+            "enableQueryingViews": "false",
+            "schema": "{\"type\":\"record\",\"name\":\"output\",\"fields\":[{\"name\":\"ID\",\"type\":[\"long\",\"null\"]},{\"name\":\"Name\",\"type\":[\"string\",\"null\"]},{\"name\":\"Price\",\"type\":[\"double\",\"null\"]},{\"name\":\"TableName\",\"type\":[\"string\",\"null\"]}]}"
+          }
+        },
+        "outputSchema": "{\"type\":\"record\",\"name\":\"output\",\"fields\":[{\"name\":\"ID\",\"type\":[\"long\",\"null\"]},{\"name\":\"Name\",\"type\":[\"string\",\"null\"]},{\"name\":\"Price\",\"type\":[\"double\",\"null\"]},{\"name\":\"TableName\",\"type\":[\"string\",\"null\"]}]}",
+        "id": "BigQuery",
+        "type": "batchsource",
+        "label": "BigQuery",
+        "icon": "fa-plug"
+      },
+      {
+        "name": "Wrangler",
+        "plugin": {
+          "name": "Wrangler",
+          "type": "transform",
+          "label": "Wrangler",
+          "artifact": {
+            "name": "wrangler-transform",
+            "version": "4.10.0-SNAPSHOT",
+            "scope": "SYSTEM"
+          },
+          "properties": {
+            "field": "*",
+            "precondition": "false",
+            "directives": "drop :TableName",
+            "on-error": "fail-pipeline",
+            "schema": "{\"type\":\"record\",\"name\":\"record\",\"fields\":[{\"name\":\"ID\",\"type\":[\"long\",\"null\"]},{\"name\":\"Name\",\"type\":[\"string\",\"null\"]},{\"name\":\"Price\",\"type\":[\"double\",\"null\"]}]}",
+            "workspaceId": "7038fc39-732e-4d75-8d3f-db6cfe5a11d8"
+          }
+        },
+        "outputSchema": "{\"type\":\"record\",\"name\":\"record\",\"fields\":[{\"name\":\"ID\",\"type\":[\"long\",\"null\"]},{\"name\":\"Name\",\"type\":[\"string\",\"null\"]},{\"name\":\"Price\",\"type\":[\"double\",\"null\"]}]}",
+        "inputSchema": [
+          {
+            "name": "BigQuery",
+            "schema": "{\"type\":\"record\",\"name\":\"output\",\"fields\":[{\"name\":\"ID\",\"type\":[\"long\",\"null\"]},{\"name\":\"Name\",\"type\":[\"string\",\"null\"]},{\"name\":\"Price\",\"type\":[\"double\",\"null\"]},{\"name\":\"TableName\",\"type\":[\"string\",\"null\"]}]}"
+          }
+        ],
+        "id": "Wrangler",
+        "type": "transform",
+        "label": "Wrangler",
+        "icon": "icon-DataPreparation"
+      },
+      {
+        "name": "BigQuery3",
+        "plugin": {
+          "name": "BigQueryTable",
+          "type": "batchsink",
+          "label": "BigQuery3",
+          "artifact": {
+            "name": "google-cloud",
+            "version": "0.22.0-SNAPSHOT",
+            "scope": "SYSTEM"
+          },
+          "properties": {
+            "useConnection": "false",
+            "project": "auto-detect",
+            "serviceAccountType": "filePath",
+            "serviceFilePath": "auto-detect",
+            "dataset": "bq_automation",
+            "table": "New_target_table_combine",
+            "operation": "insert",
+            "truncateTable": "false",
+            "allowSchemaRelaxation": "false",
+            "location": "US",
+            "createPartitionedTable": "false",
+            "partitioningType": "TIME",
+            "partitionFilterRequired": "false",
+            "schema": "{\"type\":\"record\",\"name\":\"record\",\"fields\":[{\"name\":\"ID\",\"type\":[\"long\",\"null\"]},{\"name\":\"Name\",\"type\":[\"string\",\"null\"]},{\"name\":\"Price\",\"type\":[\"double\",\"null\"]}]}"
+          }
+        },
+        "outputSchema": "{\"type\":\"record\",\"name\":\"record\",\"fields\":[{\"name\":\"ID\",\"type\":[\"long\",\"null\"]},{\"name\":\"Name\",\"type\":[\"string\",\"null\"]},{\"name\":\"Price\",\"type\":[\"double\",\"null\"]}]}",
+        "inputSchema": [
+          {
+            "name": "Wrangler",
+            "schema": "{\"type\":\"record\",\"name\":\"record\",\"fields\":[{\"name\":\"ID\",\"type\":[\"long\",\"null\"]},{\"name\":\"Name\",\"type\":[\"string\",\"null\"]},{\"name\":\"Price\",\"type\":[\"double\",\"null\"]}]}"
+          },
+          {
+            "name": "BigQuery2",
+            "schema": "{\"type\":\"record\",\"name\":\"output\",\"fields\":[{\"name\":\"ID\",\"type\":[\"long\",\"null\"]},{\"name\":\"Name\",\"type\":[\"string\",\"null\"]},{\"name\":\"Price\",\"type\":[\"double\",\"null\"]}]}"
+          }
+        ],
+        "id": "BigQuery3",
+        "type": "batchsink",
+        "label": "BigQuery3",
+        "icon": "fa-plug"
+      },
+      {
+        "name": "BigQuery2",
+        "plugin": {
+          "name": "BigQueryTable",
+          "type": "batchsource",
+          "label": "BigQuery2",
+          "artifact": {
+            "name": "google-cloud",
+            "version": "0.22.0-SNAPSHOT",
+            "scope": "SYSTEM"
+          },
+          "properties": {
+            "useConnection": "false",
+            "project": "cdf-athena",
+            "serviceAccountType": "filePath",
+            "serviceFilePath": "auto-detect",
+            "referenceName": "bq_test",
+            "dataset": "bq_automation",
+            "table": "bqSourceTableLess",
+            "enableQueryingViews": "false",
+            "schema": "{\"type\":\"record\",\"name\":\"output\",\"fields\":[{\"name\":\"ID\",\"type\":[\"long\",\"null\"]},{\"name\":\"Name\",\"type\":[\"string\",\"null\"]},{\"name\":\"Price\",\"type\":[\"double\",\"null\"]}]}"
+          }
+        },
+        "outputSchema": "{\"type\":\"record\",\"name\":\"output\",\"fields\":[{\"name\":\"ID\",\"type\":[\"long\",\"null\"]},{\"name\":\"Name\",\"type\":[\"string\",\"null\"]},{\"name\":\"Price\",\"type\":[\"double\",\"null\"]}]}",
+        "id": "BigQuery2",
+        "type": "batchsource",
+        "label": "BigQuery2",
+        "icon": "fa-plug"
+      }
+    ],
+    "schedule": "0 1 */1 * *",
+    "engine": "spark",
+    "numOfRecordsPreview": 100,
+    "rangeRecordsPreview": {
+      "min": 1,
+      "max": "5000"
+    },
+    "maxConcurrentRuns": 1
+  },
+  "version": "fe4ee1e3-6380-11ee-8217-0000003390c8"
+}
\ No newline at end of file