diff --git a/src/e2e-test/features/gcs/source/GCSToGCSAdditonalTests.feature b/src/e2e-test/features/gcs/source/GCSToGCSAdditonalTests.feature index 5693837bfa..ff33b390ec 100644 --- a/src/e2e-test/features/gcs/source/GCSToGCSAdditonalTests.feature +++ b/src/e2e-test/features/gcs/source/GCSToGCSAdditonalTests.feature @@ -85,3 +85,310 @@ Feature: GCS source - Verification of GCS to GCS Additional Tests successful Then Open and capture logs Then Verify the pipeline status is "Succeeded" Then Validate the data transferred from GCS Source to GCS Sink with Expected avro file and target data in GCS bucket + + @GCS_CSV @GCS_SINK_TEST @EXISTING_GCS_CONNECTION + Scenario: To verify data is getting transferred from GCS Source to GCS Sink using test Schema Detection On Single File with connection + Given Open Datafusion Project to configure pipeline + When Select plugin: "GCS" from the plugins list as: "Source" + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "GCS" from the plugins list as: "Sink" + Then Connect plugins: "GCS" and "GCS2" to establish connection + Then Navigate to the properties page of plugin: "GCS" + Then Select dropdown plugin property: "select-schema-actions-dropdown" with option value: "clear" + Then Click plugin property: "switch-useConnection" + Then Click on the Browse Connections button + Then Select connection: "gcsConnectionName" + Then Enter input plugin property: "referenceName" with value: "sourceRef" + Then Enter GCS source property path "gcsCsvDataFile" + Then Select GCS property format "delimited" + Then Enter input plugin property: "delimiter" with value: "delimiterValue" + Then Toggle GCS source property skip header to true + Then Validate "GCS" plugin properties + Then Verify the Output Schema matches the Expected Schema: "gcsSingleFileDataSchema" + Then Close the Plugin Properties page + Then Navigate to the properties page of plugin: "GCS2" + Then Click plugin property: "switch-useConnection" + Then Click on the Browse Connections button + Then Select connection: "gcsConnectionName" + Then Enter input plugin property: "referenceName" with value: "sourceRef" + Then Enter GCS sink property path + Then Select dropdown plugin property: "select-format" with option value: "csv" + Then Validate "GCS" plugin properties + Then Close the Plugin Properties page + Then Save the pipeline + Then Preview and run the pipeline + Then Wait till pipeline preview is in running state + Then Open and capture pipeline preview logs + Then Verify the preview run status of pipeline in the logs is "succeeded" + Then Close the pipeline logs + Then Click on preview data for GCS sink + Then Verify preview output schema matches the outputSchema captured in properties + Then Close the preview data + Then Deploy the pipeline + Then Run the Pipeline in Runtime + Then Wait till pipeline is in running state + Then Open and capture logs + Then Verify the pipeline status is "Succeeded" + Then Validate the data from GCS Source to GCS Sink with expected csv file and target data in GCS bucket + + @GCS_CSV @GCS_SINK_TEST + Scenario: To verify data is getting transferred from GCS Source to GCS Sink using test Schema Detection On Single File without connection + Given Open Datafusion Project to configure pipeline + When Select plugin: "GCS" from the plugins list as: "Source" + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "GCS" from the plugins list as: "Sink" + Then Connect plugins: "GCS" and "GCS2" to establish connection + Then Navigate to the properties page of plugin: "GCS" + Then Select dropdown plugin property: "select-schema-actions-dropdown" with option value: "clear" + Then Replace input plugin property: "project" with value: "projectId" + Then Override Service account details if set in environment variables + Then Enter input plugin property: "referenceName" with value: "sourceRef" + Then Enter GCS source property path "gcsCsvDataFile" + Then Select GCS property format "delimited" + Then Enter input plugin property: "delimiter" with value: "delimiterValue" + Then Toggle GCS source property skip header to true + Then Validate "GCS" plugin properties + Then Verify the Output Schema matches the Expected Schema: "gcsSingleFileDataSchema" + Then Close the Plugin Properties page + Then Navigate to the properties page of plugin: "GCS2" + Then Replace input plugin property: "project" with value: "projectId" + Then Override Service account details if set in environment variables + Then Enter input plugin property: "referenceName" with value: "sourceRef" + Then Enter GCS sink property path + Then Select dropdown plugin property: "select-format" with option value: "csv" + Then Validate "GCS" plugin properties + Then Close the Plugin Properties page + Then Save the pipeline + Then Preview and run the pipeline + Then Wait till pipeline preview is in running state + Then Open and capture pipeline preview logs + Then Verify the preview run status of pipeline in the logs is "succeeded" + Then Close the pipeline logs + Then Click on preview data for GCS sink + Then Verify preview output schema matches the outputSchema captured in properties + Then Close the preview data + Then Deploy the pipeline + Then Run the Pipeline in Runtime + Then Wait till pipeline is in running state + Then Open and capture logs + Then Verify the pipeline status is "Succeeded" + Then Validate the data from GCS Source to GCS Sink with expected csv file and target data in GCS bucket + + @GCS_CSV @GCS_SINK_TEST + Scenario: To verify the pipeline is getting failed from GCS to GCS when Schema is not cleared in GCS source On Single File + Given Open Datafusion Project to configure pipeline + When Select plugin: "GCS" from the plugins list as: "Source" + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "GCS" from the plugins list as: "Sink" + Then Connect plugins: "GCS" and "GCS2" to establish connection + Then Navigate to the properties page of plugin: "GCS" + Then Replace input plugin property: "project" with value: "projectId" + Then Override Service account details if set in environment variables + Then Enter input plugin property: "referenceName" with value: "sourceRef" + Then Enter GCS source property path "gcsCsvDataFile" + Then Select GCS property format "csv" + Then Validate "GCS" plugin properties + Then Close the Plugin Properties page + Then Navigate to the properties page of plugin: "GCS2" + Then Replace input plugin property: "project" with value: "projectId" + Then Override Service account details if set in environment variables + Then Enter input plugin property: "referenceName" with value: "sourceRef" + Then Enter GCS sink property path + Then Select dropdown plugin property: "select-format" with option value: "csv" + Then Validate "GCS" plugin properties + Then Close the Plugin Properties page + Then Save the pipeline + Then Deploy the pipeline + Then Run the Pipeline in Runtime + Then Wait till pipeline is in running state + Then Open and capture logs + Then Verify the pipeline status is "Failed" + + @GCS_MULTIPLE_FILES_TEST @GCS_SINK_TEST @EXISTING_GCS_CONNECTION + Scenario: To verify the pipeline is getting failed from GCS Source to GCS Sink On Multiple File with connection + Given Open Datafusion Project to configure pipeline + When Select plugin: "GCS" from the plugins list as: "Source" + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "GCS" from the plugins list as: "Sink" + Then Connect plugins: "GCS" and "GCS2" to establish connection + Then Navigate to the properties page of plugin: "GCS" + Then Select dropdown plugin property: "select-schema-actions-dropdown" with option value: "clear" + Then Click plugin property: "switch-useConnection" + Then Click on the Browse Connections button + Then Select connection: "gcsConnectionName" + Then Enter input plugin property: "referenceName" with value: "sourceRef" + Then Enter GCS source property path "gcsMultipleFilesPath" + Then Select GCS property format "delimited" + Then Enter input plugin property: "delimiter" with value: "delimiterValue" + Then Validate "GCS" plugin properties + Then Close the Plugin Properties page + Then Navigate to the properties page of plugin: "GCS2" + Then Click plugin property: "switch-useConnection" + Then Click on the Browse Connections button + Then Select connection: "gcsConnectionName" + Then Enter input plugin property: "referenceName" with value: "sinkRef" + Then Enter GCS sink property path + Then Select dropdown plugin property: "select-format" with option value: "json" + Then Validate "GCS" plugin properties + Then Close the Plugin Properties page + Then Save the pipeline + Then Deploy the pipeline + Then Run the Pipeline in Runtime + Then Wait till pipeline is in running state + And Verify the pipeline status is "Failed" + Then Open Pipeline logs and verify Log entries having below listed Level and Message: + | Level | Message | + | ERROR | errorMessageMultipleFileWithFirstRowAsHeaderDisabled | + + @GCS_MULTIPLE_FILES_TEST @GCS_SINK_TEST + Scenario: To verify the pipeline is getting failed from GCS Source to GCS Sink On Multiple File without connection + Given Open Datafusion Project to configure pipeline + When Select plugin: "GCS" from the plugins list as: "Source" + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "GCS" from the plugins list as: "Sink" + Then Connect plugins: "GCS" and "GCS2" to establish connection + Then Navigate to the properties page of plugin: "GCS" + Then Select dropdown plugin property: "select-schema-actions-dropdown" with option value: "clear" + Then Replace input plugin property: "project" with value: "projectId" + Then Override Service account details if set in environment variables + Then Enter input plugin property: "referenceName" with value: "sourceRef" + Then Enter GCS source property path "gcsMultipleFilesPath" + Then Select GCS property format "delimited" + Then Enter input plugin property: "delimiter" with value: "delimiterValue" + Then Toggle GCS source property skip header to true + Then Validate "GCS" plugin properties + Then Close the Plugin Properties page + Then Navigate to the properties page of plugin: "GCS2" + Then Replace input plugin property: "project" with value: "projectId" + Then Override Service account details if set in environment variables + Then Enter input plugin property: "referenceName" with value: "sourceRef" + Then Enter GCS sink property path + Then Select dropdown plugin property: "select-format" with option value: "json" + Then Validate "GCS" plugin properties + Then Close the Plugin Properties page + Then Save the pipeline + Then Deploy the pipeline + Then Run the Pipeline in Runtime + Then Wait till pipeline is in running state + And Verify the pipeline status is "Failed" + Then Open Pipeline logs and verify Log entries having below listed Level and Message: + | Level | Message | + | ERROR | errorMessageMultipleFileWithFirstRowAsHeaderEnabled | + + @GCS_MULTIPLE_FILES_TEST @GCS_SINK_TEST + Scenario: To verify the pipeline is getting failed from GCS to GCS when Schema is not cleared in GCS source On Multiple File + Given Open Datafusion Project to configure pipeline + When Select plugin: "GCS" from the plugins list as: "Source" + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "GCS" from the plugins list as: "Sink" + Then Connect plugins: "GCS" and "GCS2" to establish connection + Then Navigate to the properties page of plugin: "GCS" + Then Replace input plugin property: "project" with value: "projectId" + Then Override Service account details if set in environment variables + Then Enter input plugin property: "referenceName" with value: "sourceRef" + Then Enter GCS source property path "gcsMultipleFilesPath" + Then Select GCS property format "delimited" + Then Enter input plugin property: "delimiter" with value: "delimiterValue" + Then Toggle GCS source property skip header to true + Then Validate "GCS" plugin properties + Then Close the Plugin Properties page + Then Navigate to the properties page of plugin: "GCS2" + Then Replace input plugin property: "project" with value: "projectId" + Then Override Service account details if set in environment variables + Then Enter input plugin property: "referenceName" with value: "sourceRef" + Then Enter GCS sink property path + Then Select dropdown plugin property: "select-format" with option value: "json" + Then Validate "GCS" plugin properties + Then Close the Plugin Properties page + Then Save the pipeline + Then Deploy the pipeline + Then Run the Pipeline in Runtime + Then Wait till pipeline is in running state + And Verify the pipeline status is "Failed" + Then Open Pipeline logs and verify Log entries having below listed Level and Message: + | Level | Message | + | ERROR | errorMessageMultipleFileWithoutClearDefaultSchema | + + @GCS_MULTIPLE_FILES_REGEX_TEST @GCS_SINK_TEST @EXISTING_GCS_CONNECTION + Scenario: To verify the pipeline is getting failed from GCS to GCS On Multiple File with filter regex using connection + Given Open Datafusion Project to configure pipeline + When Select plugin: "GCS" from the plugins list as: "Source" + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "GCS" from the plugins list as: "Sink" + Then Connect plugins: "GCS" and "GCS2" to establish connection + Then Navigate to the properties page of plugin: "GCS" + Then Select dropdown plugin property: "select-schema-actions-dropdown" with option value: "clear" + Then Click plugin property: "switch-useConnection" + Then Click on the Browse Connections button + Then Select connection: "gcsConnectionName" + Then Enter input plugin property: "referenceName" with value: "sourceRef" + Then Enter GCS source property path "gcsMultipleFilesFilterRegexPath" + Then Select GCS property format "delimited" + Then Enter input plugin property: "delimiter" with value: "delimiterValue" + Then Toggle GCS source property skip header to true + Then Enter input plugin property: "fileRegex" with value: "fileRegexValue" + Then Validate "GCS" plugin properties + Then Close the Plugin Properties page + Then Navigate to the properties page of plugin: "GCS2" + Then Click plugin property: "useConnection" + Then Click on the Browse Connections button + Then Select connection: "gcsConnectionName" + Then Override Service account details if set in environment variables + Then Enter input plugin property: "referenceName" with value: "sourceRef" + Then Enter GCS sink property path + Then Select dropdown plugin property: "select-format" with option value: "json" + Then Validate "GCS" plugin properties + Then Close the Plugin Properties page + Then Save the pipeline + Then Preview and run the pipeline + Then Wait till pipeline preview is in running state + Then Open and capture pipeline preview logs + Then Verify the preview run status of pipeline in the logs is "succeeded" + Then Close the pipeline logs + Then Close the preview + Then Deploy the pipeline + Then Run the Pipeline in Runtime + Then Wait till pipeline is in running state + Then Open and capture logs + Then Verify the pipeline status is "Succeeded" + Then Validate the data from GCS Source to GCS Sink with expected json file and target data in GCS bucket + + @GCS_MULTIPLE_FILES_REGEX_TEST @GCS_SINK_TEST + Scenario: To verify the pipeline is getting failed from GCS to GCS On Multiple File with filter regex without using connection + Given Open Datafusion Project to configure pipeline + When Select plugin: "GCS" from the plugins list as: "Source" + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "GCS" from the plugins list as: "Sink" + Then Connect plugins: "GCS" and "GCS2" to establish connection + Then Navigate to the properties page of plugin: "GCS" + Then Select dropdown plugin property: "select-schema-actions-dropdown" with option value: "clear" + Then Enter input plugin property: "referenceName" with value: "sourceRef" + Then Replace input plugin property: "projectId" with value: "projectId" + Then Enter GCS source property path "gcsMultipleFilesFilterRegexPath" + Then Select GCS property format "delimited" + Then Enter input plugin property: "delimiter" with value: "delimiterValue" + Then Toggle GCS source property skip header to true + Then Enter input plugin property: "fileRegex" with value: "fileRegexValue" + Then Validate "GCS" plugin properties + Then Close the Plugin Properties page + Then Navigate to the properties page of plugin: "GCS2" + Then Replace input plugin property: "project" with value: "projectId" + Then Override Service account details if set in environment variables + Then Enter input plugin property: "referenceName" with value: "sourceRef" + Then Enter GCS sink property path + Then Select dropdown plugin property: "select-format" with option value: "json" + Then Validate "GCS" plugin properties + Then Close the Plugin Properties page + Then Save the pipeline + Then Preview and run the pipeline + Then Wait till pipeline preview is in running state + Then Open and capture pipeline preview logs + Then Verify the preview run status of pipeline in the logs is "succeeded" + Then Close the pipeline logs + Then Close the preview + Then Deploy the pipeline + Then Run the Pipeline in Runtime + Then Wait till pipeline is in running state + Then Open and capture logs + Then Verify the pipeline status is "Succeeded" + Then Validate the data from GCS Source to GCS Sink with expected json file and target data in GCS bucket diff --git a/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java b/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java index 4d5d4de646..8b36ad9a3b 100644 --- a/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java +++ b/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java @@ -178,7 +178,8 @@ public static void createBucketWithAvroTestFile() throws IOException, URISyntaxE @After(order = 1, value = "@GCS_CSV_TEST or @GCS_TSV_TEST or @GCS_BLOB_TEST " + "or @GCS_DELIMITED_TEST or @GCS_TEXT_TEST or @GCS_OUTPUT_FIELD_TEST or @GCS_DATATYPE_1_TEST or " + "@GCS_DATATYPE_2_TEST or @GCS_READ_RECURSIVE_TEST or @GCS_DELETE_WILDCARD_TEST or @GCS_CSV_RANGE_TEST or" + - " @GCS_PARQUET_TEST or @GCS_AVRO_TEST or @GCS_DATATYPE_TEST or @GCS_AVRO_FILE") + " @GCS_PARQUET_TEST or @GCS_AVRO_TEST or @GCS_DATATYPE_TEST or @GCS_CSV or @GCS_MULTIPLE_FILES_TEST or \" +\n" + + " \"@GCS_MULTIPLE_FILES_REGEX_TEST or or @GCS_AVRO_FILE") public static void deleteSourceBucketWithFile() { deleteGCSBucket(gcsSourceBucketName); PluginPropertyUtils.removePluginProp("gcsSourceBucketName"); @@ -920,6 +921,22 @@ public static void createSourceBQUpdateTable() throws IOException, InterruptedEx PluginPropertyUtils.addPluginProp(" bqTargetTable", bqTargetTable); BeforeActions.scenario.write("BQ Target Table " + bqTargetTable + " updated successfully"); } + + @Before(order = 1, value = "@GCS_CSV") + public static void createGcsBucketWithCsv() throws IOException, URISyntaxException { + gcsSourceBucketName = createGCSBucketWithFile(PluginPropertyUtils.pluginProp("gcsCsvDataFile")); + } + + @Before(order = 1, value = "@GCS_MULTIPLE_FILES_TEST") + public static void createBucketWithMultipleTestFiles() throws IOException, URISyntaxException { + gcsSourceBucketName = createGCSBucketWithMultipleFiles(PluginPropertyUtils.pluginProp("gcsMultipleFilesPath")); + } + + @Before(order = 1, value = "@GCS_MULTIPLE_FILES_REGEX_TEST") + public static void createBucketWithMultipleTestFilesWithRegex() throws IOException, URISyntaxException { + gcsSourceBucketName = createGCSBucketWithMultipleFiles(PluginPropertyUtils.pluginProp( + "gcsMultipleFilesFilterRegexPath")); + } @Before(order = 1, value = "@GCS_AVRO_FILE") public static void createGcsBucketWithAvro() throws IOException, URISyntaxException { gcsSourceBucketName = createGCSBucketWithFile(PluginPropertyUtils.pluginProp("gcsAvroAllDataFile")); diff --git a/src/e2e-test/java/io/cdap/plugin/gcs/GCSValidationHelper.java b/src/e2e-test/java/io/cdap/plugin/gcs/GCSValidationHelper.java index 3b74890317..dcbdfc0960 100644 --- a/src/e2e-test/java/io/cdap/plugin/gcs/GCSValidationHelper.java +++ b/src/e2e-test/java/io/cdap/plugin/gcs/GCSValidationHelper.java @@ -16,19 +16,26 @@ package io.cdap.plugin.gcs; +import au.com.bytecode.opencsv.CSVReader; +import com.esotericsoftware.minlog.Log; import com.google.api.gax.paging.Page; import com.google.cloud.storage.Blob; import com.google.cloud.storage.Storage; import com.google.cloud.storage.StorageOptions; import com.google.gson.Gson; +import com.google.gson.JsonElement; import com.google.gson.JsonObject; import io.cdap.e2e.utils.PluginPropertyUtils; +import io.cdap.plugin.utils.DataFileFormat; import org.apache.avro.file.DataFileReader; import org.apache.avro.file.FileReader; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.BufferedReader; import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -44,8 +51,40 @@ */ public class GCSValidationHelper { private static final String avroFilePath = PluginPropertyUtils.pluginProp("gcsAvroExpectedFilePath"); - private static final String projectId = PluginPropertyUtils.pluginProp("projectId"); + private static final String csvFilePath = PluginPropertyUtils.pluginProp("gcsCsvExpectedFilePath"); + private static final String jsonFilePath = PluginPropertyUtils.pluginProp("gcsMultipleFilesRegexFilePath"); private static final Gson gson = new Gson(); + private static final Logger LOG = LoggerFactory.getLogger(GCSValidationHelper.class); + + /** + * Validates data in a Google Cloud Storage (GCS) bucket against expected JSON content. + * + * @param bucketName The name of the GCS bucket to validate. + * @return true if the GCS bucket's content matches the expected JSON data, false otherwise. + */ + public static boolean validateGCSSourceToGCSSinkWithJsonFormat(String bucketName) { + Map expectedTextJsonData = new HashMap<>(); + getFileData(jsonFilePath, expectedTextJsonData); + Map actualGcsCsvData = listBucketObjects(bucketName, DataFileFormat.JSON); + boolean isMatched = actualGcsCsvData.equals(expectedTextJsonData); + return isMatched; + } + + /** + * Validates if the data in a (GCS) bucket matches the expected CSV data in JSON format. + * + * @param bucketName The name of the GCS bucket to validate. + * @return True if the GCS CSV data matches the expected data, false otherwise. + * @throws IOException If an IO error occurs during data retrieval. + */ + public static boolean validateGCSSourceToGCSSinkWithCSVFormat(String bucketName) { + Map expectedCSVData = readCsvFileDataAndConvertToJson(csvFilePath); + Map actualGcsCsvData = listBucketObjects(bucketName, DataFileFormat.CSV); + + boolean isMatched = actualGcsCsvData.equals(expectedCSVData); + + return isMatched; + } /** * Validates if the data in a (GCS) bucket matches the data @@ -55,15 +94,19 @@ public class GCSValidationHelper { * @return True if the GCS data matches the Avro data, false otherwise. * @throws IOException If an IO error occurs during data retrieval. */ - public static boolean validateGCSSourceToGCSSink(String bucketName) throws IOException { - Map expectedAvroData = convertAvroToJsonWithKeys(); - Map actualGcsData = listBucketObjects(bucketName); + public static boolean validateGCSSourceToGCSSinkWithAVROFormat(String bucketName) throws IOException { + Map expectedAvroData = convertAvroToJsonWithKeys(avroFilePath); + Map actualGcsData = listBucketObjects(bucketName, DataFileFormat.JSON); + boolean isMatched = actualGcsData.equals(expectedAvroData); + return isMatched; } - public static Map listBucketObjects(String bucketName) { + + public static Map listBucketObjects(String bucketName, DataFileFormat dataFormat) { Map bucketObjectData = new HashMap<>(); + String projectId = PluginPropertyUtils.pluginProp("projectId"); Storage storage = StorageOptions.newBuilder().setProjectId(projectId).build().getService(); Page blobs = storage.list(bucketName); @@ -76,50 +119,107 @@ public static Map listBucketObjects(String bucketName) { if (!bucketObjectNames.isEmpty()) { String objectName = bucketObjectNames.get(0); - if (objectName.contains("part-r")) { - Map dataMap = fetchObjectData(projectId, bucketName, objectName); - bucketObjectData.putAll(dataMap); + if (objectName.contains("part-r-")) { + Map dataMap2 = fetchObjectData(projectId, bucketName, objectName, dataFormat); + bucketObjectData.putAll(dataMap2); } } + return bucketObjectData; } /** - * Fetches the data of a specific object from a GCS bucket - * and converts it to a map of JSON objects. + * Fetches and parses data from a specified object in a GCS bucket. * - * @param projectId The ID of the GCP project. - * @param bucketName The name of the GCS bucket containing the object. - * @param objectName The name of the object to fetch. - * @return A map of object data where keys are IDs and values are JSON objects. + * @param projectId The ID of the GCP project where the GCS bucket is located. + * @param bucketName The name of the GCS bucket containing the object to fetch. + * @param objectName The name of the object to fetch from the GCS bucket. + * @param format The format of the object data (JSON or CSV). + * @return A Map containing the parsed data from the object, with string keys and JSON objects as values. */ - private static Map fetchObjectData(String projectId, String bucketName, String objectName) { + public static Map fetchObjectData(String projectId, String bucketName, String objectName, + DataFileFormat format) { Map dataMap = new HashMap<>(); Storage storage = StorageOptions.newBuilder().setProjectId(projectId).build().getService(); byte[] objectData = storage.readAllBytes(bucketName, objectName); String objectDataAsString = new String(objectData, StandardCharsets.UTF_8); - // Splitting using the delimiter as a File can have more than one record. - String[] lines = objectDataAsString.split("\n"); + switch (format) { + case JSON: + parseDataToJson(objectDataAsString, dataMap); + break; + case CSV: + parseCsvDataToJson(objectDataAsString, dataMap); + break; + default: + LOG.error("Unsupported File Format"); + break; + } + return dataMap; + } + + private static void parseDataToJson(String data, Map dataMap) { + String[] lines = data.split("\n"); for (String line : lines) { JsonObject json = gson.fromJson(line, JsonObject.class); - String id = json.get("id").getAsString(); + String id = json.get("ID").getAsString(); dataMap.put(id, json); } - return dataMap; + } + + private static void parseCsvDataToJson(String data, Map dataMap) { + String[] lines = data.split("\n"); + String[] headers = lines[0].split(","); + + for (int lineCount = 1; lineCount < lines.length; lineCount++) { + String[] values = lines[lineCount].split(","); + JsonObject jsonObject = new JsonObject(); + for (int headerCount = 0; headerCount < headers.length; headerCount++) { + jsonObject.addProperty(headers[headerCount], values[headerCount]); + } + String id = values[0]; + dataMap.put(id, jsonObject); + } + } + + /** + * Converts data from a CSV filePath to a map of JSON objects. + * + * @return A map with identifiers (e.g., ID from the first column) as keys and JSON objects as values. + * @throws IOException If there's an error reading the CSV file. + */ + public static Map readCsvFileDataAndConvertToJson(String filePath) { + Map csvDataMap = new HashMap<>(); + try (CSVReader csvReader = new CSVReader(new java.io.FileReader(filePath))) { + // Read the header line to get column names + String[] headers = csvReader.readNext(); + + String[] line; + while ((line = csvReader.readNext()) != null) { + JsonObject jsonObject = new JsonObject(); + + for (int j = 0; j < headers.length; j++) { + jsonObject.addProperty(headers[j], line[j]); + } + String id = line[0]; + csvDataMap.put(id, jsonObject); + } + } catch (IOException e) { + e.printStackTrace(); + } + return csvDataMap; } /** - * Converts Avro files to JSON objects with keys and stores them in a map. + * Converts Avro filePath to JSON objects with keys and stores them in a map. * * @return A map of keys to JSON objects representing the Avro data. * @throws IOException If an IO error occurs during Avro to JSON conversion. */ - public static Map convertAvroToJsonWithKeys() throws IOException { - File avroFile = new File(avroFilePath); + public static Map convertAvroToJsonWithKeys(String filePath) throws IOException { + File avroFile = new File(filePath); DatumReader datumReader = new GenericDatumReader<>(); Map avroDataMap = new HashMap<>(); - try (FileReader dataFileReader = DataFileReader.openReader(avroFile, datumReader)) { int keyCounter = 1; while (dataFileReader.hasNext()) { @@ -132,4 +232,31 @@ public static Map convertAvroToJsonWithKeys() throws IOExcep } return avroDataMap; } + + /** + * Reads data from a JSON file, parses each line into JSON objects, and populates a provided + * map with these objects, using the "ID" field as the key. + * + * @param fileName The name of the JSON file to read. + * @param fileMap A map where parsed JSON objects will be stored with their "ID" field as the key. + */ + public static void getFileData(String fileName, Map fileMap) { + try (BufferedReader br = new BufferedReader(new java.io.FileReader(fileName))) { + String line; + while ((line = br.readLine()) != null) { + JsonObject json = gson.fromJson(line, JsonObject.class); + if (json.has("ID")) { // Check if the JSON object has the "id" key + JsonElement idElement = json.get("ID"); + if (idElement.isJsonPrimitive()) { + String idKey = idElement.getAsString(); + fileMap.put(idKey, json); + } else { + Log.error("ID key not found"); + } + } + } + } catch (IOException e) { + System.err.println("Error reading the file: " + e.getMessage()); + } + } } diff --git a/src/e2e-test/java/io/cdap/plugin/gcs/stepsdesign/GCSSource.java b/src/e2e-test/java/io/cdap/plugin/gcs/stepsdesign/GCSSource.java index 030d87cbb6..b5d87989d7 100644 --- a/src/e2e-test/java/io/cdap/plugin/gcs/stepsdesign/GCSSource.java +++ b/src/e2e-test/java/io/cdap/plugin/gcs/stepsdesign/GCSSource.java @@ -134,6 +134,15 @@ public void selectGCSSourcePropertyPathFilenameOnlyAs(String value) { "GCS bucket") public void validateTheDataTransferredFromGCSSourceToGCSSinkWithExpectedAvroFileAndTargetDataInGCSBucket() throws IOException { - GCSValidationHelper.validateGCSSourceToGCSSink(TestSetupHooks.gcsTargetBucketName); + GCSValidationHelper.validateGCSSourceToGCSSinkWithAVROFormat(TestSetupHooks.gcsTargetBucketName); + } + + @Then("Validate the data from GCS Source to GCS Sink with expected csv file and target data in GCS bucket") + public void validateTheDataFromGCSSourceToGCSSinkWithExpectedCsvFileAndTargetDataInGCSBucket() { + GCSValidationHelper.validateGCSSourceToGCSSinkWithCSVFormat(TestSetupHooks.gcsTargetBucketName); + } + @Then("Validate the data from GCS Source to GCS Sink with expected json file and target data in GCS bucket") + public void validateTheDataFromGCSSourceToGCSSinkWithExpectedJsonFileAndTargetDataInGCSBucket() { + GCSValidationHelper.validateGCSSourceToGCSSinkWithJsonFormat(TestSetupHooks.gcsTargetBucketName); } } diff --git a/src/e2e-test/java/io/cdap/plugin/utils/DataFileFormat.java b/src/e2e-test/java/io/cdap/plugin/utils/DataFileFormat.java new file mode 100644 index 0000000000..037f6544b9 --- /dev/null +++ b/src/e2e-test/java/io/cdap/plugin/utils/DataFileFormat.java @@ -0,0 +1,29 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package io.cdap.plugin.utils; + + +/** + * Enumeration representing different data file formats that can be used + * for storing and retrieving data, such as JSON and CSV. + */ + +public enum DataFileFormat { + + JSON, // Represents JSON format + CSV // Represents CSV format +} + diff --git a/src/e2e-test/resources/errorMessage.properties b/src/e2e-test/resources/errorMessage.properties index deadc4b7dc..51885b81d2 100644 --- a/src/e2e-test/resources/errorMessage.properties +++ b/src/e2e-test/resources/errorMessage.properties @@ -27,4 +27,6 @@ errorMessageInvalidReferenceName=Invalid reference name errorMessageInvalidBucketName=Invalid bucket name in path errorMessageInvalidFormat=Input has multi-level structure that cannot be represented appropriately as csv. \ Consider using json, avro or parquet to write data. - +errorMessageMultipleFileWithFirstRowAsHeaderDisabled=Spark program 'phase-1' failed with error: Found a row with 6 fields when the schema only contains 4 fields. Check that the schema contains the right number of fields.. Please check the system logs for more details. +errorMessageMultipleFileWithFirstRowAsHeaderEnabled=Spark program 'phase-1' failed with error: For input string: +errorMessageMultipleFileWithoutClearDefaultSchema=Spark program 'phase-1' failed with error: Found a row with 4 fields when the schema only contains 2 fields. diff --git a/src/e2e-test/resources/pluginParameters.properties b/src/e2e-test/resources/pluginParameters.properties index e0704d4ede..5f00a88098 100644 --- a/src/e2e-test/resources/pluginParameters.properties +++ b/src/e2e-test/resources/pluginParameters.properties @@ -133,6 +133,15 @@ gcsAvroAllTypeDataSchema=[{"key":"id","value":"long"},{"key":"array","value":"st {"key":"time","value":"timestamp"}] gcsAvroAllDataFile=testdata/GCS_ALL_TYPES.avro gcsAvroExpectedFilePath=src/e2e-test/resources/testdata/GCS_ALL_TYPES.avro +gcsCsvDataFile=testdata/GCS_CSV.csv +gcsMultipleFilesFilterRegexPath=testdata/GCS_MULTIPLEFILE_REGEX_TEST +fileRegexValue=.+vehicle_inventory.* +gcsMultipleFilesRegexFilePath=testdata/Multiple_File_Regex_Test +delimiterValue=; +gcsMultipleFilesPath=testdata/GCS_MULTIPLEFILE_TEST +gcsCsvExpectedFilePath=src/e2e-test/resources/testdata/GCS_CSV.csv +gcsSingleFileDataSchema=[{"key":"ID","value":"int"},{"key":"Name","value":"string"},\ + {"key":"Age","value":"int"},{"key":"City","value":"string"},{"key":"town","value":"string"}] gcsOutputFilePrefix=Bigdata gcsPathSuffix=2022-02-28-13-22 gcsCreateObject1=gcscreatetestfolder1 diff --git a/src/e2e-test/resources/testdata/GCS_CSV.csv b/src/e2e-test/resources/testdata/GCS_CSV.csv new file mode 100644 index 0000000000..669d2b644e --- /dev/null +++ b/src/e2e-test/resources/testdata/GCS_CSV.csv @@ -0,0 +1,5 @@ +ID;Name;Age;City;town +1;Alex;33;Sydney;New South Wales +2;Johnson;43;Miami;Florida +3;Smith;28;Canberra;Australian Capital Territory +4;Steve;30;Washington;DC \ No newline at end of file diff --git a/src/e2e-test/resources/testdata/GCS_MULTIPLEFILE_REGEX_TEST/book.csv b/src/e2e-test/resources/testdata/GCS_MULTIPLEFILE_REGEX_TEST/book.csv new file mode 100644 index 0000000000..41191f1964 --- /dev/null +++ b/src/e2e-test/resources/testdata/GCS_MULTIPLEFILE_REGEX_TEST/book.csv @@ -0,0 +1,3 @@ +ID;Title;Author;Year;Genre;Price +50001;To Kill a Mockingbird;Harper Lee;1960;Fiction;12.99 +60002;The Great Gatsby;F. Scott Fitzgerald;1925;Classics;10.99 \ No newline at end of file diff --git a/src/e2e-test/resources/testdata/GCS_MULTIPLEFILE_REGEX_TEST/vehicle_inventory-part-1.csv b/src/e2e-test/resources/testdata/GCS_MULTIPLEFILE_REGEX_TEST/vehicle_inventory-part-1.csv new file mode 100644 index 0000000000..b6733dc2fb --- /dev/null +++ b/src/e2e-test/resources/testdata/GCS_MULTIPLEFILE_REGEX_TEST/vehicle_inventory-part-1.csv @@ -0,0 +1,3 @@ +ID;Make;Model;Year;Color;Price;Mileage +10012;Toyota;Camry;2022;Blue;25000;15000 +20034;Honda;CR-V;2023;Red;28000;12000 \ No newline at end of file diff --git a/src/e2e-test/resources/testdata/GCS_MULTIPLEFILE_REGEX_TEST/vehicle_inventory-part-2.csv b/src/e2e-test/resources/testdata/GCS_MULTIPLEFILE_REGEX_TEST/vehicle_inventory-part-2.csv new file mode 100644 index 0000000000..bdcd727955 --- /dev/null +++ b/src/e2e-test/resources/testdata/GCS_MULTIPLEFILE_REGEX_TEST/vehicle_inventory-part-2.csv @@ -0,0 +1,3 @@ +ID;Make;Model;Year;Color;Price;Mileage +30045;Ford;Fusion;2022;Silver;22000;18000 +40056;Chevrolet;Equinox;2023;White;27000;14000 \ No newline at end of file diff --git a/src/e2e-test/resources/testdata/GCS_MULTIPLEFILE_TEST/authors-part-1.csv b/src/e2e-test/resources/testdata/GCS_MULTIPLEFILE_TEST/authors-part-1.csv new file mode 100644 index 0000000000..6b2e7b5efd --- /dev/null +++ b/src/e2e-test/resources/testdata/GCS_MULTIPLEFILE_TEST/authors-part-1.csv @@ -0,0 +1,3 @@ +id;name;age;yearOfBirth +1;Albert Einstein;76;1879 +2;Isaac Newton;84;1643 \ No newline at end of file diff --git a/src/e2e-test/resources/testdata/GCS_MULTIPLEFILE_TEST/authors-part-2.csv b/src/e2e-test/resources/testdata/GCS_MULTIPLEFILE_TEST/authors-part-2.csv new file mode 100644 index 0000000000..798ec6fc28 --- /dev/null +++ b/src/e2e-test/resources/testdata/GCS_MULTIPLEFILE_TEST/authors-part-2.csv @@ -0,0 +1,3 @@ +id;Make;Model;Year;Color,Price;Mileage +10012;Toyota;Camry;2022;Blue;25000;15000 +20034;Honda;CR-V;2023;Red;28000;12000 \ No newline at end of file diff --git a/src/e2e-test/resources/testdata/Multiple_File_Regex_Test b/src/e2e-test/resources/testdata/Multiple_File_Regex_Test new file mode 100644 index 0000000000..141f37c0e6 --- /dev/null +++ b/src/e2e-test/resources/testdata/Multiple_File_Regex_Test @@ -0,0 +1,4 @@ +{"ID":10012,"Make":"Toyota","Model":"Camry","Year":2022,"Color":"Blue","Price":25000,"Mileage":15000} +{"ID":20034,"Make":"Honda","Model":"CR-V","Year":2023,"Color":"Red","Price":28000,"Mileage":12000} +{"ID":30045,"Make":"Ford","Model":"Fusion","Year":2022,"Color":"Silver","Price":22000,"Mileage":18000} +{"ID":40056,"Make":"Chevrolet","Model":"Equinox","Year":2023,"Color":"White","Price":27000,"Mileage":14000} \ No newline at end of file