Skip to content

Commit

Permalink
addressed comments
Browse files Browse the repository at this point in the history
  • Loading branch information
AnkitCLI committed Mar 29, 2024
1 parent f5b8722 commit 3a50ad0
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 39 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.bigquery.stepsdesign;

import io.cdap.e2e.utils.PluginPropertyUtils;
Expand All @@ -8,6 +24,9 @@
import java.util.Arrays;
import java.util.List;

/**
* BigQuery MultiTable related common stepDesigns.
*/
public class BigQueryMultiTable {
@Then("Validate data transferred from BigQuery To BigQueryMultiTable is equal")
public void validateDataTransferredFromBigQueryToBigQueryMultiTableIsEqual() throws IOException, InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.bigquery.stepsdesign;

import com.google.cloud.bigquery.FieldValue;
Expand All @@ -14,8 +30,12 @@
import java.util.List;
import java.util.stream.Collectors;

/**
* BQValidation.
*/
public class BigQueryMultiTableValidation {
static Gson gson = new Gson();

public static boolean validateBQToBigQueryMultiTableInTwoTables(List<String> sourceTables) throws IOException, InterruptedException {
List<String> targetTables = getTableByName().stream().sorted().collect(Collectors.toList());
if (sourceTables.size() != targetTables.size()) {
Expand All @@ -24,51 +44,39 @@ public static boolean validateBQToBigQueryMultiTableInTwoTables(List<String> sou
for (int i = 0; i < sourceTables.size(); i++) {
String currentSourceTable = sourceTables.get(i);
String currentTargetTable = targetTables.get(i);
List<Object> bigQueryRows = new ArrayList<>();
getBigQueryTableData(currentSourceTable, bigQueryRows);
List<JsonObject> bigQueryResponse = new ArrayList<>();
for (Object row : bigQueryRows) {
JsonObject jsonData = gson.fromJson(String.valueOf(row), JsonObject.class);
bigQueryResponse.add(jsonData);
}
List<Object> bigQueryRows2 = new ArrayList<>();
getBigQueryTableData(currentTargetTable, bigQueryRows2);

List<JsonObject> bigQueryResponse2 = new ArrayList<>();
for (Object row : bigQueryRows2) {
JsonObject jsonData = gson.fromJson(String.valueOf(row), JsonObject.class);
bigQueryResponse2.add(jsonData);
}
boolean isValid = compareBigQueryDataAndBQMT(bigQueryResponse, bigQueryResponse2);
if (!isValid) {
if (!validateBQToBQSingleTable(currentSourceTable, currentTargetTable)) {
return false; // Return false if validation fails for any table
}
}
return true; // Return true if validation passes for all tables
// Return true if validation passes for all tables
return true;
}

public static boolean validateBQToBigQueryMultiTableInOneTable(String sourceTable) throws IOException, InterruptedException {
String currentTargetTable = PluginPropertyUtils.pluginProp("bqTargetTable");
String currentSourceTable = sourceTable;
List<Object> bigQueryRows = new ArrayList<>();
getBigQueryTableData(currentSourceTable, bigQueryRows);
List<JsonObject> bigQueryResponse = new ArrayList<>();
for (Object row : bigQueryRows) {
JsonObject jsonData = gson.fromJson(String.valueOf(row), JsonObject.class);
bigQueryResponse.add(jsonData);
}
List<Object> bigQueryRows2 = new ArrayList<>();
getBigQueryTableData(currentTargetTable, bigQueryRows2);
List<JsonObject> bigQueryResponse2 = new ArrayList<>();
for (Object row : bigQueryRows2) {
JsonObject jsonData = gson.fromJson(String.valueOf(row), JsonObject.class);
bigQueryResponse2.add(jsonData);
}
boolean isValid = compareBigQueryDataAndBQMT(bigQueryResponse, bigQueryResponse2);
if (!isValid) {
return false; // Return false if validation fails for any table
}
return true; // Return true if validation passes for all tables
return validateBQToBQSingleTable(sourceTable, currentTargetTable);
}

public static boolean validateBQToBQSingleTable(String sourceTable, String targetTable) throws IOException, InterruptedException {
// Fetch data from the source BigQuery table
List<Object> bigQueryRows = new ArrayList<>();
getBigQueryTableData(sourceTable, bigQueryRows);

// Convert fetched data into JSON objects
List<JsonObject> bigQueryResponse = new ArrayList<>();
for (Object row : bigQueryRows) {
JsonObject jsonData = gson.fromJson(String.valueOf(row), JsonObject.class);
bigQueryResponse.add(jsonData);
}
// Fetch data from the target BigQuery table
List<Object> bigQueryRows2 = new ArrayList<>();
getBigQueryTableData(targetTable, bigQueryRows2);
List<JsonObject> bigQueryResponse2 = new ArrayList<>();
for (Object row : bigQueryRows2) {
JsonObject jsonData = gson.fromJson(String.valueOf(row), JsonObject.class);
bigQueryResponse2.add(jsonData);
}
return compareBigQueryDataAndBQMT(bigQueryResponse, bigQueryResponse2);
}

private static void getBigQueryTableData(String table, List<Object> bigQueryRows) throws IOException,
Expand Down Expand Up @@ -107,6 +115,7 @@ public static List<String> getTableByName() throws IOException, InterruptedExcep
}
return tableNames;
}

private static boolean compareBigQueryDataAndBQMT(List<JsonObject> bigQueryResponse, List<JsonObject> bqmtData) throws NullPointerException {
if (bigQueryResponse.size() != bqmtData.size()) {
return false;
Expand Down
2 changes: 1 addition & 1 deletion src/e2e-test/resources/pluginParameters.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
projectId=cdf-athena
datasetprojectId=cdf-athena
dataset=ankitdataset
dataset=test_bqmt_automation
wrongSourcePath=gs://00000000-e2e-0014a44f-81be-4501-8360-0ddca192492
serviceAccountType=filePath
serviceAccount=auto-detect
Expand Down

0 comments on commit 3a50ad0

Please sign in to comment.