diff --git a/src/e2e-test/features/bigquery/multitablesink/BigQueryMultiTableAdditional.feature b/src/e2e-test/features/bigquery/multitablesink/BigQueryMultiTableAdditional.feature new file mode 100644 index 0000000000..063e228368 --- /dev/null +++ b/src/e2e-test/features/bigquery/multitablesink/BigQueryMultiTableAdditional.feature @@ -0,0 +1,219 @@ +# Copyright © 2024 Cask Data, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +@BigQueryMultiTable_Sink +Feature: BigQueryMultiTable sink -Verification of BigQuery to BigQueryMultiTable successful data transfer + + @BQ_SOURCE_BQMT_TEST @BQ_SOURCE2_BQMT_TEST @BQ_DELETE_TEST + Scenario: Verify data successfully transferred from BigQuery To BigQueryMultiTable in two new tables + Given Open Datafusion Project to configure pipeline + When Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "BigQuery" from the plugins list as: "Source" + Then Navigate to the properties page of plugin: "BigQuery" + Then Replace input plugin property: "project" with value: "projectId" + Then Replace input plugin property: "dataset" with value: "dataset" + Then Replace input plugin property: "table" with value: "bqSourceTable" + Then Click on the Get Schema button + Then Validate "BigQuery" plugin properties + Then Close the Plugin Properties page + When Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "BigQuery" from the plugins list as: "Source" + Then Navigate to the properties page of 
plugin: "BigQuery2" + Then Replace input plugin property: "project" with value: "projectId" + Then Replace input plugin property: "dataset" with value: "dataset" + Then Replace input plugin property: "table" with value: "bqSourceTable2" + Then Click on the Get Schema button + Then Validate "BigQuery2" plugin properties + Then Close the Plugin Properties page + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "BigQuery Multi Table" from the plugins list as: "Sink" + Then Connect plugins: "BigQuery" and "BigQuery Multi Table" to establish connection + Then Connect plugins: "BigQuery2" and "BigQuery Multi Table" to establish connection + Then Navigate to the properties page of plugin: "BigQuery Multi Table" + And Enter input plugin property: "referenceName" with value: "Reference" + And Replace input plugin property: "project" with value: "projectId" + And Enter input plugin property: "datasetProject" with value: "datasetprojectId" + And Enter input plugin property: "dataset" with value: "dataset" + Then Override Service account details if set in environment variables + Then Click plugin property: "flexibleSchema" + Then Validate "BigQuery Multi Table" plugin properties + And Close the Plugin Properties page + Then Save the pipeline + Then Deploy the pipeline + Then Run the Pipeline in Runtime + Then Wait till pipeline is in running state + Then Open and capture logs + Then Verify the pipeline status is "Succeeded" + Then Validate data transferred from BigQuery To BigQueryMultiTable is equal + + @BQ_SOURCE_BQMT_TEST @BQ_SINK_BQMT_TEST + Scenario: Verify data successfully transferred from BigQuery To BigQueryMultiTable in one table + Given Open Datafusion Project to configure pipeline + When Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "BigQuery" from the plugins list as: "Source" + Then Navigate to the properties page of plugin: "BigQuery" + Then Replace input plugin property: "project" with value: 
"projectId" + Then Replace input plugin property: "dataset" with value: "dataset" + Then Replace input plugin property: "table" with value: "bqSourceTable" + Then Click on the Get Schema button + Then Validate "BigQuery" plugin properties + Then Close the Plugin Properties page + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "BigQuery Multi Table" from the plugins list as: "Sink" + Then Connect plugins: "BigQuery" and "BigQuery Multi Table" to establish connection + Then Navigate to the properties page of plugin: "BigQuery Multi Table" + And Enter input plugin property: "referenceName" with value: "Reference" + And Replace input plugin property: "project" with value: "projectId" + And Enter input plugin property: "datasetProject" with value: "datasetprojectId" + And Enter input plugin property: "dataset" with value: "dataset" + Then Override Service account details if set in environment variables + Then Click plugin property: "flexibleSchema" + Then Validate "BigQuery Multi Table" plugin properties + And Close the Plugin Properties page + Then Save the pipeline + Then Deploy the pipeline + Then Run the Pipeline in Runtime + Then Wait till pipeline is in running state + Then Open and capture logs + Then Verify the pipeline status is "Succeeded" + Then Validate data transferred from BigQuery To BigQueryMultiTable in one table is equal + + @BQ_SOURCE2_BQMT_TEST @BQ_SOURCE_BQMT_TEST @BQ_EXISTING_TEST @BQ_DELETE_TEST + Scenario: Verify data successfully transferred from BigQuery To BigQueryMultiTable in two existing tables + Given Open Datafusion Project to configure pipeline + When Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "BigQuery" from the plugins list as: "Source" + Then Navigate to the properties page of plugin: "BigQuery" + Then Replace input plugin property: "project" with value: "projectId" + Then Replace input plugin property: "dataset" with value: "dataset" + Then Replace input plugin 
property: "table" with value: "bqSourceTable" + Then Click on the Get Schema button + Then Validate "BigQuery" plugin properties + Then Close the Plugin Properties page + When Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "BigQuery" from the plugins list as: "Source" + Then Navigate to the properties page of plugin: "BigQuery2" + Then Replace input plugin property: "project" with value: "projectId" + Then Replace input plugin property: "dataset" with value: "dataset" + Then Replace input plugin property: "table" with value: "bqSourceTable2" + Then Click on the Get Schema button + Then Validate "BigQuery2" plugin properties + Then Close the Plugin Properties page + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "BigQuery Multi Table" from the plugins list as: "Sink" + Then Connect plugins: "BigQuery" and "BigQuery Multi Table" to establish connection + Then Connect plugins: "BigQuery2" and "BigQuery Multi Table" to establish connection + Then Navigate to the properties page of plugin: "BigQuery Multi Table" + And Enter input plugin property: "referenceName" with value: "Reference" + And Replace input plugin property: "project" with value: "projectId" + And Enter input plugin property: "datasetProject" with value: "datasetprojectId" + And Enter input plugin property: "dataset" with value: "dataset" + Then Override Service account details if set in environment variables + Then Click plugin property: "flexibleSchema" + Then Validate "BigQuery Multi Table" plugin properties + And Close the Plugin Properties page + Then Save the pipeline + Then Deploy the pipeline + Then Run the Pipeline in Runtime + Then Wait till pipeline is in running state + Then Open and capture logs + Then Verify the pipeline status is "Succeeded" + Then Validate data transferred from BigQuery To BigQueryMultiTable is equal + + @BQ_SOURCE2_BQMT_TEST @BQ_SOURCE_BQMT_TEST @BQ_EXISTING_TEST @BQ_DELETE_TEST + Scenario: Verify data 
successfully transferred from BigQuery To BigQueryMultiTable in two existing tables using truncate + Given Open Datafusion Project to configure pipeline + When Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "BigQuery" from the plugins list as: "Source" + Then Navigate to the properties page of plugin: "BigQuery" + Then Replace input plugin property: "project" with value: "projectId" + Then Replace input plugin property: "dataset" with value: "dataset" + Then Replace input plugin property: "table" with value: "bqSourceTable" + Then Click on the Get Schema button + Then Validate "BigQuery" plugin properties + Then Close the Plugin Properties page + When Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "BigQuery" from the plugins list as: "Source" + Then Navigate to the properties page of plugin: "BigQuery2" + Then Replace input plugin property: "project" with value: "projectId" + Then Replace input plugin property: "dataset" with value: "dataset" + Then Replace input plugin property: "table" with value: "bqSourceTable2" + Then Click on the Get Schema button + Then Validate "BigQuery2" plugin properties + Then Close the Plugin Properties page + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "BigQuery Multi Table" from the plugins list as: "Sink" + Then Connect plugins: "BigQuery" and "BigQuery Multi Table" to establish connection + Then Connect plugins: "BigQuery2" and "BigQuery Multi Table" to establish connection + Then Navigate to the properties page of plugin: "BigQuery Multi Table" + And Enter input plugin property: "referenceName" with value: "Reference" + And Replace input plugin property: "project" with value: "projectId" + And Enter input plugin property: "datasetProject" with value: "datasetprojectId" + And Enter input plugin property: "dataset" with value: "dataset" + Then Override Service account details if set in environment variables + Then Click plugin property: 
"flexibleSchema" + Then Toggle BigQuery sink property truncateTable to true + Then Validate "BigQuery Multi Table" plugin properties + And Close the Plugin Properties page + Then Save the pipeline + Then Deploy the pipeline + Then Run the Pipeline in Runtime + Then Wait till pipeline is in running state + Then Open and capture logs + Then Verify the pipeline status is "Succeeded" + Then Validate data transferred from BigQuery To BigQueryMultiTable is equal + + @BQ_SOURCE_UPDATE_TEST @BQ_EXISTING_TEST @BQ_DELETE_TEST + Scenario: Verify data successfully transferred from BigQuery To BigQueryMultiTable in two existing tables after updating schema + Given Open Datafusion Project to configure pipeline + When Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "BigQuery" from the plugins list as: "Source" + Then Navigate to the properties page of plugin: "BigQuery" + Then Replace input plugin property: "project" with value: "projectId" + Then Replace input plugin property: "dataset" with value: "dataset" + Then Replace input plugin property: "table" with value: "bqSourceTable" + Then Click on the Get Schema button + Then Validate "BigQuery" plugin properties + Then Close the Plugin Properties page + When Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "BigQuery" from the plugins list as: "Source" + Then Navigate to the properties page of plugin: "BigQuery2" + Then Replace input plugin property: "project" with value: "projectId" + Then Replace input plugin property: "dataset" with value: "dataset" + Then Replace input plugin property: "table" with value: "bqSourceTable2" + Then Click on the Get Schema button + Then Validate "BigQuery2" plugin properties + Then Close the Plugin Properties page + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "BigQuery Multi Table" from the plugins list as: "Sink" + Then Connect plugins: "BigQuery" and "BigQuery Multi Table" to establish connection + Then 
Connect plugins: "BigQuery2" and "BigQuery Multi Table" to establish connection + Then Navigate to the properties page of plugin: "BigQuery Multi Table" + And Enter input plugin property: "referenceName" with value: "Reference" + And Replace input plugin property: "project" with value: "projectId" + And Enter input plugin property: "datasetProject" with value: "datasetprojectId" + And Enter input plugin property: "dataset" with value: "dataset" + Then Override Service account details if set in environment variables + Then Click plugin property: "flexibleSchema" + Then Toggle BigQuery sink property truncateTable to true + Then Select radio button plugin property: "updateSchema" with value: "true" + Then Validate "BigQuery Multi Table" plugin properties + And Close the Plugin Properties page + Then Save the pipeline + Then Deploy the pipeline + Then Run the Pipeline in Runtime + Then Wait till pipeline is in running state + Then Open and capture logs + Then Verify the pipeline status is "Succeeded" + Then Validate data transferred from BigQuery To BigQueryMultiTable is equal diff --git a/src/e2e-test/java/io/cdap/plugin/bigquery/stepsdesign/BigQueryMultiTable.java b/src/e2e-test/java/io/cdap/plugin/bigquery/stepsdesign/BigQueryMultiTable.java new file mode 100644 index 0000000000..38936c82c5 --- /dev/null +++ b/src/e2e-test/java/io/cdap/plugin/bigquery/stepsdesign/BigQueryMultiTable.java @@ -0,0 +1,28 @@ +package io.cdap.plugin.bigquery.stepsdesign; + +import io.cdap.e2e.utils.PluginPropertyUtils; +import io.cucumber.java.en.Then; +import org.junit.Assert; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +public class BigQueryMultiTable { + @Then("Validate data transferred from BigQuery To BigQueryMultiTable is equal") + public void validateDataTransferredFromBigQueryToBigQueryMultiTableIsEqual() throws IOException, InterruptedException { + List tables = Arrays.asList(PluginPropertyUtils.pluginProp("bqSourceTable"), + 
PluginPropertyUtils.pluginProp("bqSourceTable2")); + boolean recordsMatched = BigQueryMultiTableValidation.validateBQToBigQueryMultiTableInTwoTables(tables); + Assert.assertTrue("Value of records transferred to the BQ sink should be equal to the value " + + "of the records in the source table", recordsMatched); + } + + @Then("Validate data transferred from BigQuery To BigQueryMultiTable in one table is equal") + public void validateDataTransferredFromBigQueryToBigQueryMultiTableInOneTableIsEqual() throws IOException, InterruptedException { + boolean recordsMatched = BigQueryMultiTableValidation.validateBQToBigQueryMultiTableInOneTable + (PluginPropertyUtils.pluginProp("bqSourceTable")); + Assert.assertTrue("Value of records transferred to the BQ sink should be equal to the value " + + "of the records in the source table", recordsMatched); + } +} diff --git a/src/e2e-test/java/io/cdap/plugin/bigquery/stepsdesign/BigQueryMultiTableValidation.java b/src/e2e-test/java/io/cdap/plugin/bigquery/stepsdesign/BigQueryMultiTableValidation.java new file mode 100644 index 0000000000..722d95820e --- /dev/null +++ b/src/e2e-test/java/io/cdap/plugin/bigquery/stepsdesign/BigQueryMultiTableValidation.java @@ -0,0 +1,124 @@ +package io.cdap.plugin.bigquery.stepsdesign; + +import com.google.cloud.bigquery.FieldValue; +import com.google.cloud.bigquery.FieldValueList; +import com.google.cloud.bigquery.TableResult; +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import io.cdap.e2e.utils.BigQueryClient; +import io.cdap.e2e.utils.PluginPropertyUtils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +public class BigQueryMultiTableValidation { + static Gson gson = new Gson(); + public static boolean validateBQToBigQueryMultiTableInTwoTables(List sourceTables) throws IOException, InterruptedException { + List targetTables = 
getTableByName().stream().sorted().collect(Collectors.toList()); + if (sourceTables.size() != targetTables.size()) { + throw new IllegalArgumentException("Number of source tables and target tables must be the same."); + } + for (int i = 0; i < sourceTables.size(); i++) { + String currentSourceTable = sourceTables.get(i); + String currentTargetTable = targetTables.get(i); + List bigQueryRows = new ArrayList<>(); + getBigQueryTableData(currentSourceTable, bigQueryRows); + List bigQueryResponse = new ArrayList<>(); + for (Object row : bigQueryRows) { + JsonObject jsonData = gson.fromJson(String.valueOf(row), JsonObject.class); + bigQueryResponse.add(jsonData); + } + List bigQueryRows2 = new ArrayList<>(); + getBigQueryTableData(currentTargetTable, bigQueryRows2); + + List bigQueryResponse2 = new ArrayList<>(); + for (Object row : bigQueryRows2) { + JsonObject jsonData = gson.fromJson(String.valueOf(row), JsonObject.class); + bigQueryResponse2.add(jsonData); + } + boolean isValid = compareBigQueryDataAndBQMT(bigQueryResponse, bigQueryResponse2); + if (!isValid) { + return false; // Return false if validation fails for any table + } + } + return true; // Return true if validation passes for all tables + } + + public static boolean validateBQToBigQueryMultiTableInOneTable(String sourceTable) throws IOException, InterruptedException { + String currentTargetTable = PluginPropertyUtils.pluginProp("bqTargetTable"); + String currentSourceTable = sourceTable; + List bigQueryRows = new ArrayList<>(); + getBigQueryTableData(currentSourceTable, bigQueryRows); + List bigQueryResponse = new ArrayList<>(); + for (Object row : bigQueryRows) { + JsonObject jsonData = gson.fromJson(String.valueOf(row), JsonObject.class); + bigQueryResponse.add(jsonData); + } + List bigQueryRows2 = new ArrayList<>(); + getBigQueryTableData(currentTargetTable, bigQueryRows2); + List bigQueryResponse2 = new ArrayList<>(); + for (Object row : bigQueryRows2) { + JsonObject jsonData = 
gson.fromJson(String.valueOf(row), JsonObject.class); + bigQueryResponse2.add(jsonData); + } + boolean isValid = compareBigQueryDataAndBQMT(bigQueryResponse, bigQueryResponse2); + if (!isValid) { + return false; // Return false if validation fails for any table + } + return true; // Return true if validation passes for all tables + } + + private static void getBigQueryTableData(String table, List bigQueryRows) throws IOException, + InterruptedException { + String projectId = PluginPropertyUtils.pluginProp("projectId"); + String dataset = PluginPropertyUtils.pluginProp("dataset"); + String selectQuery = "SELECT TO_JSON(t) FROM `" + projectId + "." + dataset + "." + table + "` AS t"; + TableResult result = BigQueryClient.getQueryResult(selectQuery); + result.iterateAll().forEach(value -> bigQueryRows.add(value.get(0).getValue())); + } + + public static TableResult getTableNamesFromDataSet() throws IOException, InterruptedException { + String projectId = PluginPropertyUtils.pluginProp("projectId"); + String dataset = PluginPropertyUtils.pluginProp("dataset"); + String selectQuery = "SELECT table_name FROM `" + projectId + "." 
+ dataset + "`.INFORMATION_SCHEMA.TABLES "; + + return BigQueryClient.getQueryResult(selectQuery); + } + + public static List getTableByName() throws IOException, InterruptedException { + List tableNames = new ArrayList<>(); + List targetTableNames = Arrays.asList("tabA", "tabB"); + TableResult tableResult = getTableNamesFromDataSet(); + Iterable rows = tableResult.iterateAll(); + + for (FieldValueList row : rows) { + FieldValue fieldValue = row.get(0); + String currentTableName = fieldValue.getStringValue(); + + if (targetTableNames.contains(currentTableName)) { + tableNames.add(currentTableName); + } + } + if (tableNames.isEmpty()) { + throw new IllegalStateException("Tables not found."); // Throw an exception if no tables are found + } + return tableNames; + } + private static boolean compareBigQueryDataAndBQMT(List bigQueryResponse, List bqmtData) throws NullPointerException { + if (bigQueryResponse.size() != bqmtData.size()) { + return false; + } + // Compare individual elements + for (int i = 0; i < bigQueryResponse.size(); i++) { + JsonObject obj1 = bigQueryResponse.get(i); + JsonObject obj2 = bqmtData.get(i); + if (!obj1.equals(obj2)) { + return false; + } + } + return true; + } +} diff --git a/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java b/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java index fc92c2e6eb..686cddc9f6 100644 --- a/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java +++ b/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java @@ -60,6 +60,7 @@ public class TestSetupHooks { public static String bqTargetTable = StringUtils.EMPTY; public static String bqSourceTable = StringUtils.EMPTY; public static String bqSourceTable2 = StringUtils.EMPTY; + public static String bqTargetTable2 = StringUtils.EMPTY; public static String bqSourceView = StringUtils.EMPTY; public static String pubSubTargetTopic = StringUtils.EMPTY; public static String spannerInstance = 
StringUtils.EMPTY; @@ -1361,5 +1362,117 @@ public static void makeExistingTargetSpannerDBAndTableName() { e.printStackTrace(); } } + @Before(order = 1, value = "@BQ_SOURCE_BQMT_TEST") + public static void createSourceBQTableForBqmt() throws IOException, InterruptedException { + createSourceBQTableWithQueries(PluginPropertyUtils.pluginProp("bqmtCreateTableQueryFile"), + PluginPropertyUtils.pluginProp("bqmtInsertDataQueryFile")); + } + + @Before(order = 1, value = "@BQ_SOURCE2_BQMT_TEST") + public static void createSourceBQSecondTable() throws IOException, InterruptedException { + bqSourceTable2 = "E2E_SOURCE_" + UUID.randomUUID().toString().replaceAll("-", "_"); + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqSourceTable2 + "` " + + "(ID INT64, tablename STRING," + + "Price FLOAT64, Customer_Exists BOOL ) "); + try { + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("INSERT INTO `" + datasetName + "." + bqSourceTable2 + "` " + + "(ID, tablename, Price, Customer_Exists)" + + "VALUES" + "(3, 'tabB', 0.5, true )"); + } catch (NoSuchElementException e) { + // Insert query does not return any record. + // Iterator on TableResult values in getSoleQueryResult method throws NoSuchElementException + BeforeActions.scenario.write("Error inserting the record in the table" + e.getStackTrace()); + } + PluginPropertyUtils.addPluginProp("bqSourceTable2", bqSourceTable2); + BeforeActions.scenario.write("BQ Source Table2 " + bqSourceTable2 + " created successfully"); + } + + + @Before(order = 1, value = "@BQ_EXISTING_TEST") + public static void createSinkTables() throws IOException, InterruptedException { + bqTargetTable = PluginPropertyUtils.pluginProp("bqTargetTable"); + bqTargetTable2 = PluginPropertyUtils.pluginProp("bqTargetTable2"); + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." 
+ bqTargetTable + "` " + + "(ID INT64, tablename STRING," + + "Price FLOAT64, Customer_Exists BOOL ) "); + + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqTargetTable2 + "` " + + "(ID INT64, tablename STRING," + + "Price FLOAT64, Customer_Exists BOOL ) "); + + PluginPropertyUtils.addPluginProp("bqTargetTable", bqTargetTable); + PluginPropertyUtils.addPluginProp("bqTargetTable2", bqTargetTable2); + } + @Before(order = 1, value = "@BQ_SOURCE_UPDATE_TEST") + public static void createSourceTables() throws IOException, InterruptedException { + bqSourceTable = "E2E_SOURCE_" + UUID.randomUUID().toString().replaceAll("-", "_"); + bqSourceTable2 = "E2E_SOURCE_" + UUID.randomUUID().toString().replaceAll("-", "_"); + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqSourceTable + "` " + + "(ID INT64, tablename STRING," + + "Price FLOAT64, Customer_Exists BOOL, Address STRING ) "); + try { + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("INSERT INTO `" + datasetName + "." + bqSourceTable + "` " + + "(ID, tablename, Price, Customer_Exists, Address)" + + "VALUES" + "(8, 'tabA', 0.5, true, 'GGN')"); + + } catch (NoSuchElementException e) { + // Insert query does not return any record. + // Iterator on TableResult values in getSoleQueryResult method throws NoSuchElementException + BeforeActions.scenario.write("Error inserting the record in the table" + e.getStackTrace()); + } + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqSourceTable2 + "` " + + "(ID INT64, tablename STRING," + + "Price FLOAT64, Customer_Exists BOOL, Address STRING ) "); + try { + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("INSERT INTO `" + datasetName + "." 
+ bqSourceTable2 + "` " + + "(ID, tablename, Price, Customer_Exists, Address)" + + "VALUES" + "(10, 'tabB', 1.0, true, 'PPU')"); + + } + catch (NoSuchElementException e) { + // Insert query does not return any record. + // Iterator on TableResult values in getSoleQueryResult method throws NoSuchElementException + BeforeActions.scenario.write("Error inserting the record in the table" + e.getStackTrace()); + } + + PluginPropertyUtils.addPluginProp("bqSourceTable", bqSourceTable); + PluginPropertyUtils.addPluginProp("bqSourceTable2", bqSourceTable2); + } + + @After(order = 1, value = "@BQ_DELETE_TEST") + public static void deleteAllBqTables() throws IOException, InterruptedException { + BigQueryClient.dropBqQuery(bqSourceTable); + BigQueryClient.dropBqQuery(bqSourceTable2); + BigQueryClient.dropBqQuery(PluginPropertyUtils.pluginProp("bqTargetTable")); + BigQueryClient.dropBqQuery(PluginPropertyUtils.pluginProp("bqTargetTable2")); + PluginPropertyUtils.removePluginProp("bqSourceTable"); + PluginPropertyUtils.removePluginProp("bqSourceTable2"); + BeforeActions.scenario.write("BQ source Table " + bqSourceTable + " deleted successfully"); + BeforeActions.scenario.write("BQ source Table2 " + bqSourceTable2 + " deleted successfully"); + BeforeActions.scenario.write("BQ target Table " + bqTargetTable + " deleted successfully"); + BeforeActions.scenario.write("BQ target Table2 " + bqTargetTable2 + " deleted successfully"); + bqSourceTable = StringUtils.EMPTY; + bqSourceTable2 = StringUtils.EMPTY; + bqTargetTable = StringUtils.EMPTY; + bqTargetTable2 = StringUtils.EMPTY; + } + @After(order = 1, value = "@BQ_SINK_BQMT_TEST") + public static void deleteTargetBqmtTable() throws IOException, InterruptedException { + try { + bqTargetTable = PluginPropertyUtils.pluginProp("bqTargetTable"); + BigQueryClient.dropBqQuery(bqTargetTable); + BigQueryClient.dropBqQuery(bqSourceTable); + BeforeActions.scenario.write("BQ Target table - " + bqTargetTable + " deleted successfully"); + 
BeforeActions.scenario.write("BQ Source table - " + bqSourceTable + " deleted successfully"); + bqTargetTable = StringUtils.EMPTY; + } catch (BigQueryException e) { + if (e.getMessage().contains("Not found: Table")) { + BeforeActions.scenario.write("BQ Target Table " + bqTargetTable + " does not exist"); + BeforeActions.scenario.write("BQ Source Table " + bqSourceTable + " does not exist"); + } else { + Assert.fail(e.getMessage()); + } + } + } } diff --git a/src/e2e-test/resources/pluginDataCyAttributes.properties b/src/e2e-test/resources/pluginDataCyAttributes.properties index 2f321c0333..d3890e916b 100644 --- a/src/e2e-test/resources/pluginDataCyAttributes.properties +++ b/src/e2e-test/resources/pluginDataCyAttributes.properties @@ -31,6 +31,8 @@ serviceAccountType=serviceAccountType serviceAccountFilePath=serviceFilePath serviceAccountJSON=serviceAccountJSON outputSchemaMacroInput=Output Schema-macro-input +flexibleSchema=switch-allowFlexibleSchema +updateSchema=allowSchemaRelaxation ## CONNECTION-MANAGEMENT-START connection=connection diff --git a/src/e2e-test/resources/pluginParameters.properties b/src/e2e-test/resources/pluginParameters.properties index 5f37646dc0..4d93a865ee 100644 --- a/src/e2e-test/resources/pluginParameters.properties +++ b/src/e2e-test/resources/pluginParameters.properties @@ -1,6 +1,6 @@ projectId=cdf-athena datasetprojectId=cdf-athena -dataset=bq_automation +dataset=bq_automation wrongSourcePath=gs://00000000-e2e-0014a44f-81be-4501-8360-0ddca192492 serviceAccountType=filePath serviceAccount=auto-detect @@ -335,3 +335,10 @@ bqExecuteCountDMLUpsertInsert=SELECT COUNT(*) FROM `PROJECT_NAME.DATASET.TABLENA bqExecuteCountDMLUpsertUpdate=SELECT COUNT(*) FROM `PROJECT_NAME.DATASET.TABLENAME` WHERE Id=101 AND Value=5000 AND UID='UPDATED RECORD' bqExecuteInsertFile=testdata/BQExecute/BQExecuteInsertFile ## BQEXECUTE-PLUGIN-PROPERTIES-END + +## BQMT-PLUGIN-PROPERTIES-START +bqmtCreateTableQueryFile=testdata/BigQuery/BqmtCreateTableQuery.txt 
+bqmtInsertDataQueryFile=testdata/BigQuery/BqmtInsertDataQuery.txt +bqTargetTable=tabA +bqTargetTable2=tabB +## BQMT-PLUGIN-PROPERTIES-END \ No newline at end of file diff --git a/src/e2e-test/resources/testdata/BigQuery/BqmtCreateTableQuery.txt b/src/e2e-test/resources/testdata/BigQuery/BqmtCreateTableQuery.txt new file mode 100644 index 0000000000..fb92043ff6 --- /dev/null +++ b/src/e2e-test/resources/testdata/BigQuery/BqmtCreateTableQuery.txt @@ -0,0 +1 @@ +create table `DATASET.TABLE_NAME` (ID INTEGER, tablename STRING, Price Float64, Customer_Exists BOOLEAN ) \ No newline at end of file diff --git a/src/e2e-test/resources/testdata/BigQuery/BqmtInsertDataQuery.txt b/src/e2e-test/resources/testdata/BigQuery/BqmtInsertDataQuery.txt new file mode 100644 index 0000000000..852e7e8069 --- /dev/null +++ b/src/e2e-test/resources/testdata/BigQuery/BqmtInsertDataQuery.txt @@ -0,0 +1,3 @@ +INSERT INTO DATASET.TABLE_NAME (ID, tablename, Price, Customer_Exists) +VALUES +(1, 'tabA', 2.5, true); \ No newline at end of file