forked from data-integrations/google-cloud
Showing 8 changed files with 498 additions and 1 deletion.
219 changes: 219 additions & 0 deletions
src/e2e-test/features/bigquery/multitablesink/BigQueryMultiTableAdditional.feature
@@ -0,0 +1,219 @@
# Copyright © 2024 Cask Data, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.

@BigQueryMultiTable_Sink
Feature: BigQueryMultiTable sink - Verification of successful data transfer from BigQuery to BigQueryMultiTable

  @BQ_SOURCE_BQMT_TEST @BQ_SOURCE2_BQMT_TEST @BQ_DELETE_TEST
  Scenario: Verify data successfully transferred from BigQuery To BigQueryMultiTable in two new tables
    Given Open Datafusion Project to configure pipeline
    When Expand Plugin group in the LHS plugins list: "Source"
    When Select plugin: "BigQuery" from the plugins list as: "Source"
    Then Navigate to the properties page of plugin: "BigQuery"
    Then Replace input plugin property: "project" with value: "projectId"
    Then Replace input plugin property: "dataset" with value: "dataset"
    Then Replace input plugin property: "table" with value: "bqSourceTable"
    Then Click on the Get Schema button
    Then Validate "BigQuery" plugin properties
    Then Close the Plugin Properties page
    When Expand Plugin group in the LHS plugins list: "Source"
    When Select plugin: "BigQuery" from the plugins list as: "Source"
    Then Navigate to the properties page of plugin: "BigQuery2"
    Then Replace input plugin property: "project" with value: "projectId"
    Then Replace input plugin property: "dataset" with value: "dataset"
    Then Replace input plugin property: "table" with value: "bqSourceTable2"
    Then Click on the Get Schema button
    Then Validate "BigQuery2" plugin properties
    Then Close the Plugin Properties page
    When Expand Plugin group in the LHS plugins list: "Sink"
    When Select plugin: "BigQuery Multi Table" from the plugins list as: "Sink"
    Then Connect plugins: "BigQuery" and "BigQuery Multi Table" to establish connection
    Then Connect plugins: "BigQuery2" and "BigQuery Multi Table" to establish connection
    Then Navigate to the properties page of plugin: "BigQuery Multi Table"
    And Enter input plugin property: "referenceName" with value: "Reference"
    And Replace input plugin property: "project" with value: "projectId"
    And Enter input plugin property: "datasetProject" with value: "datasetprojectId"
    And Enter input plugin property: "dataset" with value: "dataset"
    Then Override Service account details if set in environment variables
    Then Click plugin property: "flexibleSchema"
    Then Validate "BigQuery Multi Table" plugin properties
    And Close the Plugin Properties page
    Then Save the pipeline
    Then Deploy the pipeline
    Then Run the Pipeline in Runtime
    Then Wait till pipeline is in running state
    Then Open and capture logs
    Then Verify the pipeline status is "Succeeded"
    Then Validate data transferred from BigQuery To BigQueryMultiTable is equal

  @BQ_SOURCE_BQMT_TEST @BQ_SINK_BQMT_TEST
  Scenario: Verify data successfully transferred from BigQuery To BigQueryMultiTable in one table
    Given Open Datafusion Project to configure pipeline
    When Expand Plugin group in the LHS plugins list: "Source"
    When Select plugin: "BigQuery" from the plugins list as: "Source"
    Then Navigate to the properties page of plugin: "BigQuery"
    Then Replace input plugin property: "project" with value: "projectId"
    Then Replace input plugin property: "dataset" with value: "dataset"
    Then Replace input plugin property: "table" with value: "bqSourceTable"
    Then Click on the Get Schema button
    Then Validate "BigQuery" plugin properties
    Then Close the Plugin Properties page
    When Expand Plugin group in the LHS plugins list: "Sink"
    When Select plugin: "BigQuery Multi Table" from the plugins list as: "Sink"
    Then Connect plugins: "BigQuery" and "BigQuery Multi Table" to establish connection
    Then Navigate to the properties page of plugin: "BigQuery Multi Table"
    And Enter input plugin property: "referenceName" with value: "Reference"
    And Replace input plugin property: "project" with value: "projectId"
    And Enter input plugin property: "datasetProject" with value: "datasetprojectId"
    And Enter input plugin property: "dataset" with value: "dataset"
    Then Override Service account details if set in environment variables
    Then Click plugin property: "flexibleSchema"
    Then Validate "BigQuery Multi Table" plugin properties
    And Close the Plugin Properties page
    Then Save the pipeline
    Then Deploy the pipeline
    Then Run the Pipeline in Runtime
    Then Wait till pipeline is in running state
    Then Open and capture logs
    Then Verify the pipeline status is "Succeeded"
    Then Validate data transferred from BigQuery To BigQueryMultiTable in one table is equal

  @BQ_SOURCE2_BQMT_TEST @BQ_SOURCE_BQMT_TEST @BQ_EXISTING_TEST @BQ_DELETE_TEST
  Scenario: Verify data successfully transferred from BigQuery To BigQueryMultiTable in two existing tables
    Given Open Datafusion Project to configure pipeline
    When Expand Plugin group in the LHS plugins list: "Source"
    When Select plugin: "BigQuery" from the plugins list as: "Source"
    Then Navigate to the properties page of plugin: "BigQuery"
    Then Replace input plugin property: "project" with value: "projectId"
    Then Replace input plugin property: "dataset" with value: "dataset"
    Then Replace input plugin property: "table" with value: "bqSourceTable"
    Then Click on the Get Schema button
    Then Validate "BigQuery" plugin properties
    Then Close the Plugin Properties page
    When Expand Plugin group in the LHS plugins list: "Source"
    When Select plugin: "BigQuery" from the plugins list as: "Source"
    Then Navigate to the properties page of plugin: "BigQuery2"
    Then Replace input plugin property: "project" with value: "projectId"
    Then Replace input plugin property: "dataset" with value: "dataset"
    Then Replace input plugin property: "table" with value: "bqSourceTable2"
    Then Click on the Get Schema button
    Then Validate "BigQuery2" plugin properties
    Then Close the Plugin Properties page
    When Expand Plugin group in the LHS plugins list: "Sink"
    When Select plugin: "BigQuery Multi Table" from the plugins list as: "Sink"
    Then Connect plugins: "BigQuery" and "BigQuery Multi Table" to establish connection
    Then Connect plugins: "BigQuery2" and "BigQuery Multi Table" to establish connection
    Then Navigate to the properties page of plugin: "BigQuery Multi Table"
    And Enter input plugin property: "referenceName" with value: "Reference"
    And Replace input plugin property: "project" with value: "projectId"
    And Enter input plugin property: "datasetProject" with value: "datasetprojectId"
    And Enter input plugin property: "dataset" with value: "dataset"
    Then Override Service account details if set in environment variables
    Then Click plugin property: "flexibleSchema"
    Then Validate "BigQuery Multi Table" plugin properties
    And Close the Plugin Properties page
    Then Save the pipeline
    Then Deploy the pipeline
    Then Run the Pipeline in Runtime
    Then Wait till pipeline is in running state
    Then Open and capture logs
    Then Verify the pipeline status is "Succeeded"
    Then Validate data transferred from BigQuery To BigQueryMultiTable is equal

  @BQ_SOURCE2_BQMT_TEST @BQ_SOURCE_BQMT_TEST @BQ_EXISTING_TEST @BQ_DELETE_TEST
  Scenario: Verify data successfully transferred from BigQuery To BigQueryMultiTable in two existing tables using truncate
    Given Open Datafusion Project to configure pipeline
    When Expand Plugin group in the LHS plugins list: "Source"
    When Select plugin: "BigQuery" from the plugins list as: "Source"
    Then Navigate to the properties page of plugin: "BigQuery"
    Then Replace input plugin property: "project" with value: "projectId"
    Then Replace input plugin property: "dataset" with value: "dataset"
    Then Replace input plugin property: "table" with value: "bqSourceTable"
    Then Click on the Get Schema button
    Then Validate "BigQuery" plugin properties
    Then Close the Plugin Properties page
    When Expand Plugin group in the LHS plugins list: "Source"
    When Select plugin: "BigQuery" from the plugins list as: "Source"
    Then Navigate to the properties page of plugin: "BigQuery2"
    Then Replace input plugin property: "project" with value: "projectId"
    Then Replace input plugin property: "dataset" with value: "dataset"
    Then Replace input plugin property: "table" with value: "bqSourceTable2"
    Then Click on the Get Schema button
    Then Validate "BigQuery2" plugin properties
    Then Close the Plugin Properties page
    When Expand Plugin group in the LHS plugins list: "Sink"
    When Select plugin: "BigQuery Multi Table" from the plugins list as: "Sink"
    Then Connect plugins: "BigQuery" and "BigQuery Multi Table" to establish connection
    Then Connect plugins: "BigQuery2" and "BigQuery Multi Table" to establish connection
    Then Navigate to the properties page of plugin: "BigQuery Multi Table"
    And Enter input plugin property: "referenceName" with value: "Reference"
    And Replace input plugin property: "project" with value: "projectId"
    And Enter input plugin property: "datasetProject" with value: "datasetprojectId"
    And Enter input plugin property: "dataset" with value: "dataset"
    Then Override Service account details if set in environment variables
    Then Click plugin property: "flexibleSchema"
    Then Toggle BigQuery sink property truncateTable to true
    Then Validate "BigQuery Multi Table" plugin properties
    And Close the Plugin Properties page
    Then Save the pipeline
    Then Deploy the pipeline
    Then Run the Pipeline in Runtime
    Then Wait till pipeline is in running state
    Then Open and capture logs
    Then Verify the pipeline status is "Succeeded"
    Then Validate data transferred from BigQuery To BigQueryMultiTable is equal

  @BQ_SOURCE_UPDATE_TEST @BQ_EXISTING_TEST @BQ_DELETE_TEST
  Scenario: Verify data successfully transferred from BigQuery To BigQueryMultiTable in two existing tables after updating schema
    Given Open Datafusion Project to configure pipeline
    When Expand Plugin group in the LHS plugins list: "Source"
    When Select plugin: "BigQuery" from the plugins list as: "Source"
    Then Navigate to the properties page of plugin: "BigQuery"
    Then Replace input plugin property: "project" with value: "projectId"
    Then Replace input plugin property: "dataset" with value: "dataset"
    Then Replace input plugin property: "table" with value: "bqSourceTable"
    Then Click on the Get Schema button
    Then Validate "BigQuery" plugin properties
    Then Close the Plugin Properties page
    When Expand Plugin group in the LHS plugins list: "Source"
    When Select plugin: "BigQuery" from the plugins list as: "Source"
    Then Navigate to the properties page of plugin: "BigQuery2"
    Then Replace input plugin property: "project" with value: "projectId"
    Then Replace input plugin property: "dataset" with value: "dataset"
    Then Replace input plugin property: "table" with value: "bqSourceTable2"
    Then Click on the Get Schema button
    Then Validate "BigQuery2" plugin properties
    Then Close the Plugin Properties page
    When Expand Plugin group in the LHS plugins list: "Sink"
    When Select plugin: "BigQuery Multi Table" from the plugins list as: "Sink"
    Then Connect plugins: "BigQuery" and "BigQuery Multi Table" to establish connection
    Then Connect plugins: "BigQuery2" and "BigQuery Multi Table" to establish connection
    Then Navigate to the properties page of plugin: "BigQuery Multi Table"
    And Enter input plugin property: "referenceName" with value: "Reference"
    And Replace input plugin property: "project" with value: "projectId"
    And Enter input plugin property: "datasetProject" with value: "datasetprojectId"
    And Enter input plugin property: "dataset" with value: "dataset"
    Then Override Service account details if set in environment variables
    Then Click plugin property: "flexibleSchema"
    Then Toggle BigQuery sink property truncateTable to true
    Then Select radio button plugin property: "updateSchema" with value: "true"
    Then Validate "BigQuery Multi Table" plugin properties
    And Close the Plugin Properties page
    Then Save the pipeline
    Then Deploy the pipeline
    Then Run the Pipeline in Runtime
    Then Wait till pipeline is in running state
    Then Open and capture logs
    Then Verify the pipeline status is "Succeeded"
    Then Validate data transferred from BigQuery To BigQueryMultiTable is equal
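
Note on the quoted values in these steps: strings such as "projectId", "dataset", "bqSourceTable", "bqSourceTable2" and "datasetprojectId" are property keys that the e2e framework resolves through PluginPropertyUtils at runtime, not literal values. As a minimal, hypothetical sketch only (the real key/value pairs live in the repository's plugin parameter properties file, which is not shown in this diff and may differ), the backing entries could look like:

# Illustrative plugin parameter entries; names and values are placeholders, not taken from this commit.
projectId=my-gcp-project
datasetprojectId=my-gcp-project
dataset=bqmt_automation_test
bqSourceTable=bqmt_source_table1
bqSourceTable2=bqmt_source_table2
bqTargetTable=bqmt_target_table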
28 changes: 28 additions & 0 deletions
src/e2e-test/java/io/cdap/plugin/bigquery/stepsdesign/BigQueryMultiTable.java
@@ -0,0 +1,28 @@
package io.cdap.plugin.bigquery.stepsdesign;

import io.cdap.e2e.utils.PluginPropertyUtils;
import io.cucumber.java.en.Then;
import org.junit.Assert;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

public class BigQueryMultiTable {
  @Then("Validate data transferred from BigQuery To BigQueryMultiTable is equal")
  public void validateDataTransferredFromBigQueryToBigQueryMultiTableIsEqual() throws IOException, InterruptedException {
    List<String> tables = Arrays.asList(PluginPropertyUtils.pluginProp("bqSourceTable"),
                                        PluginPropertyUtils.pluginProp("bqSourceTable2"));
    boolean recordsMatched = BigQueryMultiTableValidation.validateBQToBigQueryMultiTableInTwoTables(tables);
    Assert.assertTrue("Value of records transferred to the BQ sink should be equal to the value " +
                        "of the records in the source table", recordsMatched);
  }

  @Then("Validate data transferred from BigQuery To BigQueryMultiTable in one table is equal")
  public void validateDataTransferredFromBigQueryToBigQueryMultiTableInOneTableIsEqual() throws IOException, InterruptedException {
    boolean recordsMatched = BigQueryMultiTableValidation.validateBQToBigQueryMultiTableInOneTable(
      PluginPropertyUtils.pluginProp("bqSourceTable"));
    Assert.assertTrue("Value of records transferred to the BQ sink should be equal to the value " +
                        "of the records in the source table", recordsMatched);
  }
}
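
The @Then texts above are matched verbatim against the step lines in the feature file, so the two must stay in sync. How these step definitions are discovered depends on the project's existing Cucumber runner configuration, which is not shown in this view. Purely as an illustrative sketch (class name, paths, and options are assumptions, not taken from this commit, and assume a Cucumber 6+/JUnit 4 setup), a runner wiring the tagged scenarios to this package might look roughly like:

package io.cdap.plugin.bigquery.runners; // hypothetical package, for illustration only

import io.cucumber.junit.Cucumber;
import io.cucumber.junit.CucumberOptions;
import org.junit.runner.RunWith;

// Runs the scenarios tagged @BigQueryMultiTable_Sink and binds them to the step definitions
// in io.cdap.plugin.bigquery.stepsdesign plus any shared framework step packages.
@RunWith(Cucumber.class)
@CucumberOptions(
  features = {"src/e2e-test/features/bigquery/multitablesink"},
  glue = {"io.cdap.plugin.bigquery.stepsdesign", "stepsdesign"},
  tags = "@BigQueryMultiTable_Sink",
  plugin = {"pretty"}
)
public class BigQueryMultiTableRunner {
}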
124 changes: 124 additions & 0 deletions
src/e2e-test/java/io/cdap/plugin/bigquery/stepsdesign/BigQueryMultiTableValidation.java
@@ -0,0 +1,124 @@
package io.cdap.plugin.bigquery.stepsdesign;

import com.google.cloud.bigquery.FieldValue;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.TableResult;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import io.cdap.e2e.utils.BigQueryClient;
import io.cdap.e2e.utils.PluginPropertyUtils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class BigQueryMultiTableValidation {
  static Gson gson = new Gson();
  public static boolean validateBQToBigQueryMultiTableInTwoTables(List<String> sourceTables) throws IOException, InterruptedException {
    List<String> targetTables = getTableByName().stream().sorted().collect(Collectors.toList());
    if (sourceTables.size() != targetTables.size()) {
      throw new IllegalArgumentException("Number of source tables and target tables must be the same.");
    }
    for (int i = 0; i < sourceTables.size(); i++) {
      String currentSourceTable = sourceTables.get(i);
      String currentTargetTable = targetTables.get(i);
      List<Object> bigQueryRows = new ArrayList<>();
      getBigQueryTableData(currentSourceTable, bigQueryRows);
      List<JsonObject> bigQueryResponse = new ArrayList<>();
      for (Object row : bigQueryRows) {
        JsonObject jsonData = gson.fromJson(String.valueOf(row), JsonObject.class);
        bigQueryResponse.add(jsonData);
      }
      List<Object> bigQueryRows2 = new ArrayList<>();
      getBigQueryTableData(currentTargetTable, bigQueryRows2);

      List<JsonObject> bigQueryResponse2 = new ArrayList<>();
      for (Object row : bigQueryRows2) {
        JsonObject jsonData = gson.fromJson(String.valueOf(row), JsonObject.class);
        bigQueryResponse2.add(jsonData);
      }
      boolean isValid = compareBigQueryDataAndBQMT(bigQueryResponse, bigQueryResponse2);
      if (!isValid) {
        return false; // Return false if validation fails for any table
      }
    }
    return true; // Return true if validation passes for all tables
  }

  public static boolean validateBQToBigQueryMultiTableInOneTable(String sourceTable) throws IOException, InterruptedException {
    String currentTargetTable = PluginPropertyUtils.pluginProp("bqTargetTable");
    String currentSourceTable = sourceTable;
    List<Object> bigQueryRows = new ArrayList<>();
    getBigQueryTableData(currentSourceTable, bigQueryRows);
    List<JsonObject> bigQueryResponse = new ArrayList<>();
    for (Object row : bigQueryRows) {
      JsonObject jsonData = gson.fromJson(String.valueOf(row), JsonObject.class);
      bigQueryResponse.add(jsonData);
    }
    List<Object> bigQueryRows2 = new ArrayList<>();
    getBigQueryTableData(currentTargetTable, bigQueryRows2);
    List<JsonObject> bigQueryResponse2 = new ArrayList<>();
    for (Object row : bigQueryRows2) {
      JsonObject jsonData = gson.fromJson(String.valueOf(row), JsonObject.class);
      bigQueryResponse2.add(jsonData);
    }
    boolean isValid = compareBigQueryDataAndBQMT(bigQueryResponse, bigQueryResponse2);
    if (!isValid) {
      return false; // Return false if validation fails for the table
    }
    return true; // Return true if validation passes
  }

  private static void getBigQueryTableData(String table, List<Object> bigQueryRows) throws IOException,
    InterruptedException {
    String projectId = PluginPropertyUtils.pluginProp("projectId");
    String dataset = PluginPropertyUtils.pluginProp("dataset");
    String selectQuery = "SELECT TO_JSON(t) FROM `" + projectId + "." + dataset + "." + table + "` AS t";
    TableResult result = BigQueryClient.getQueryResult(selectQuery);
    result.iterateAll().forEach(value -> bigQueryRows.add(value.get(0).getValue()));
  }

  public static TableResult getTableNamesFromDataSet() throws IOException, InterruptedException {
    String projectId = PluginPropertyUtils.pluginProp("projectId");
    String dataset = PluginPropertyUtils.pluginProp("dataset");
    String selectQuery = "SELECT table_name FROM `" + projectId + "." + dataset + "`.INFORMATION_SCHEMA.TABLES";

    return BigQueryClient.getQueryResult(selectQuery);
  }

  public static List<String> getTableByName() throws IOException, InterruptedException {
    List<String> tableNames = new ArrayList<>();
    List<String> targetTableNames = Arrays.asList("tabA", "tabB");
    TableResult tableResult = getTableNamesFromDataSet();
    Iterable<FieldValueList> rows = tableResult.iterateAll();

    for (FieldValueList row : rows) {
      FieldValue fieldValue = row.get(0);
      String currentTableName = fieldValue.getStringValue();

      if (targetTableNames.contains(currentTableName)) {
        tableNames.add(currentTableName);
      }
    }
    if (tableNames.isEmpty()) {
      throw new IllegalStateException("Tables not found."); // Throw an exception if no tables are found
    }
    return tableNames;
  }
  private static boolean compareBigQueryDataAndBQMT(List<JsonObject> bigQueryResponse, List<JsonObject> bqmtData) {
    if (bigQueryResponse.size() != bqmtData.size()) {
      return false;
    }
    // Compare individual elements
    for (int i = 0; i < bigQueryResponse.size(); i++) {
      JsonObject obj1 = bigQueryResponse.get(i);
      JsonObject obj2 = bqmtData.get(i);
      if (!obj1.equals(obj2)) {
        return false;
      }
    }
    return true;
  }
}
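
One design note: compareBigQueryDataAndBQMT compares rows positionally, so it relies on both TO_JSON queries returning rows in the same order, which BigQuery does not guarantee without an ORDER BY. If ordering ever makes this check flaky, an order-insensitive variant is possible; the following is only a hypothetical sketch, not part of this commit, and reuses the Collectors import already present in the class (java.util.Map is fully qualified to avoid a new import):

  // Hypothetical alternative: compare the two result sets as multisets of JsonObject
  // (Gson's JsonObject implements equals/hashCode), so row order does not affect the outcome.
  private static boolean compareIgnoringOrder(List<JsonObject> source, List<JsonObject> target) {
    java.util.Map<JsonObject, Long> sourceCounts = source.stream()
      .collect(Collectors.groupingBy(row -> row, Collectors.counting()));
    java.util.Map<JsonObject, Long> targetCounts = target.stream()
      .collect(Collectors.groupingBy(row -> row, Collectors.counting()));
    return sourceCounts.equals(targetCounts);
  }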