From 0a187896cf824ee17546bacc9206ce9703c675ce Mon Sep 17 00:00:00 2001 From: priyabhatnagar Date: Fri, 15 Dec 2023 14:57:21 +0530 Subject: [PATCH] pubsub e2e test --- pom.xml | 154 +++--- .../PubSubToPubSub.feature | 456 ++++++++++++++++++ .../common/stepsdesign/TestSetupHooks.java | 98 +++- .../plugin/pubsub/actions/PubSubActions.java | 9 + .../pubsub/locators/PubSubLocators.java | 19 +- .../plugin/pubsub/stepsdesign/PubSubSink.java | 30 ++ .../pubsub/stepsdesign/PubSubSource.java | 69 ++- .../io/cdap/plugin/utils/PubSubClient.java | 233 +++++++++ .../java/io/cdap/plugin/utils/State.java | 419 ++++++++++++++++ .../pluginDataCyAttributes.properties | 1 + .../resources/pluginParameters.properties | 6 + .../testdata/pubsubavrofile/avrofile.avsc | 18 + .../BigQueryWindowsAggregationSQLBuilder.java | 1 - 13 files changed, 1431 insertions(+), 82 deletions(-) create mode 100644 src/e2e-test/features/pubsub/pubsubadditionalfeature/PubSubToPubSub.feature create mode 100644 src/e2e-test/java/io/cdap/plugin/utils/State.java create mode 100644 src/e2e-test/resources/testdata/pubsubavrofile/avrofile.avsc diff --git a/pom.xml b/pom.xml index fc710f9c03..4258ddaf3a 100644 --- a/pom.xml +++ b/pom.xml @@ -69,7 +69,7 @@ 7 UTF-8 - 1.8.2 + 1.11.3 hadoop2-1.2.0 1.4 6.9.1 @@ -80,7 +80,7 @@ 1.17.1 1.137.1 2.0.2 - 1.108.1 + 1.112.1 6.10.1 1.24.7 2.3.0 @@ -638,7 +638,7 @@ org.apache.avro avro-mapred - hadoop2 + ${avro.version} @@ -1157,82 +1157,82 @@ src/e2e-test/resources - - - org.apache.maven.plugins - maven-surefire-plugin - 2.18.1 - - true - - - - - org.apache.maven.plugins - maven-failsafe-plugin - 3.0.0-M5 - - - ${TEST_RUNNER} - - - classes - 2 - 2 - true - - - - ${GOOGLE_APPLICATION_CREDENTIALS} - - - ${SERVICE_ACCOUNT_TYPE} - - - ${SERVICE_ACCOUNT_FILE_PATH} - - - ${SERVICE_ACCOUNT_JSON} - - - - - - - integration-test - - - - - - - net.masterthought - maven-cucumber-reporting - 5.5.0 + + + org.apache.maven.plugins + maven-surefire-plugin + 2.18.1 + + true + + - - - execution - 
verify - - generate - + + org.apache.maven.plugins + maven-failsafe-plugin + 3.0.0-M5 - Cucumber Reports - target/cucumber-reports/advanced-reports - 1 - false - ${project.build.directory}/cucumber-reports - - **/*.json - - ${project.build.directory}/cucumber-reports - true + + ${TEST_RUNNER} + + + classes + 2 + 2 + true + + + + ${GOOGLE_APPLICATION_CREDENTIALS} + + + ${SERVICE_ACCOUNT_TYPE} + + + ${SERVICE_ACCOUNT_FILE_PATH} + + + ${SERVICE_ACCOUNT_JSON} + + - - - - + + + + integration-test + + + + + + + net.masterthought + maven-cucumber-reporting + 5.5.0 + + + + execution + verify + + generate + + + Cucumber Reports + target/cucumber-reports/advanced-reports + 1 + false + ${project.build.directory}/cucumber-reports + + **/*.json + + ${project.build.directory}/cucumber-reports + true + + + + + @@ -1257,4 +1257,4 @@ - + \ No newline at end of file diff --git a/src/e2e-test/features/pubsub/pubsubadditionalfeature/PubSubToPubSub.feature b/src/e2e-test/features/pubsub/pubsubadditionalfeature/PubSubToPubSub.feature new file mode 100644 index 0000000000..f650e6478a --- /dev/null +++ b/src/e2e-test/features/pubsub/pubsubadditionalfeature/PubSubToPubSub.feature @@ -0,0 +1,456 @@ +# Copyright © 2023 Cask Data, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +@PubSub_Sink +Feature: PubSub Source - Verification of PubSub to PubSub successful data transfer in different formats. 
+ + @PUBSUB_SCHEMA_TEST @PUBSUB_SCHEMA_TOPIC_TEST @PUBSUB_SINK_TEST @PUBSUB_SUBSCRIPTION_TEST + Scenario: Verify User is able to transfer messages from PubSub to PubSub in parquet format + Given Open Datafusion Project to configure pipeline + When Select data pipeline type as: "Realtime" + When Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "Pub/Sub" from the plugins list as: "Source" + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "Pub/Sub" from the plugins list as: "Sink" + Then Connect plugins: "Pub/Sub" and "Pub/Sub2" to establish connection + Then Navigate to the properties page of plugin: "Pub/Sub" + Then Enter input plugin property: "referenceName" with value: "PubSubSourceReferenceName" + Then Replace input plugin property: "project" with value: "projectId" + Then Enter PubSub source property subscription name + Then Enter PubSub source property topic name + Then Select dropdown plugin property: "format" with option value: "parquet" + Then Add schema for the message + Then Validate "Pub/Sub" plugin properties + And Close the Plugin Properties page + Then Open the PubSub sink properties + Then Enter PubSub property projectId "projectId" + Then Enter PubSub property reference name + Then Enter PubSub sink property topic name + Then Select dropdown plugin property: "format" with option value: "parquet" + Then Validate "Pub/Sub" plugin properties + And Close the Plugin Properties page + And Click on configure button + And Click on pipeline config + And Click on batch time and select format + Then Save the pipeline + Then Deploy the pipeline + Then Run the Pipeline in Runtime + Then Wait for pipeline to be in status: "Running" with a timeout of 240 seconds + Then Publish the messages with schema + Then Validate OUT record count is equal to IN record count + And Stop the pipeline + Then Verify the pipeline status is "Stopped" + + @PUBSUB_SOURCE_TEST @PUBSUB_SINK_TEST @PUBSUB_SUBSCRIPTION_TEST + 
Scenario: Validate the successful transfer of records from a pubSub source to a pubSub sink with format Json at source and Parquet at sink + Given Open Datafusion Project to configure pipeline + When Select data pipeline type as: "Realtime" + When Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "Pub/Sub" from the plugins list as: "Source" + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "Pub/Sub" from the plugins list as: "Sink" + Then Connect plugins: "Pub/Sub" and "Pub/Sub2" to establish connection + Then Navigate to the properties page of plugin: "Pub/Sub" + Then Enter input plugin property: "referenceName" with value: "PubSubSourceReferenceName" + Then Replace input plugin property: "project" with value: "projectId" + Then Enter PubSub source property subscription name + Then Enter PubSub source property topic name + Then Select dropdown plugin property: "format" with option value: "json" + Then Add schema for the message + Then Validate "Pub/Sub" plugin properties + And Close the Plugin Properties page + Then Open the PubSub sink properties + Then Enter PubSub property projectId "projectId" + Then Enter PubSub property reference name + Then Enter PubSub sink property topic name + Then Select dropdown plugin property: "format" with option value: "parquet" + Then Validate "Pub/Sub" plugin properties + And Close the Plugin Properties page + And Click on configure button + And Click on pipeline config + And Click on batch time and select format + Then Save the pipeline + Then Deploy the pipeline + Then Run the Pipeline in Runtime + Then Wait for pipeline to be in status: "Running" with a timeout of 240 seconds + Then Publish the messages + Then Validate OUT record count is equal to IN record count + And Stop the pipeline + Then Verify the pipeline status is "Stopped" + + @PUBSUB_SOURCE_TEST @PUBSUB_SINK_TEST @PUBSUB_SUBSCRIPTION_TEST + Scenario: Validate the successful transfer of records from a pubSub 
source to a pubSub sink with format Blob at both source and sink + Given Open Datafusion Project to configure pipeline + When Select data pipeline type as: "Realtime" + When Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "Pub/Sub" from the plugins list as: "Source" + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "Pub/Sub" from the plugins list as: "Sink" + Then Connect plugins: "Pub/Sub" and "Pub/Sub2" to establish connection + Then Navigate to the properties page of plugin: "Pub/Sub" + Then Replace input plugin property: "project" with value: "projectId" + Then Enter input plugin property: "referenceName" with value: "PubSubSourceReferenceName" + Then Enter PubSub source property subscription name + Then Enter PubSub source property topic name + Then Select dropdown plugin property: "format" with option value: "blob" + Then Validate "Pub/Sub" plugin properties + And Close the Plugin Properties page + Then Navigate to the properties page of plugin: "Pub/Sub2" + Then Replace input plugin property: "project" with value: "projectId" + Then Enter input plugin property: "referenceName" with value: "PubSubSinkReferenceName" + Then Enter PubSub sink property topic name + Then Select dropdown plugin property: "format" with option value: "blob" + Then Validate "Pub/Sub" plugin properties + And Close the Plugin Properties page + And Click on configure button + And Click on pipeline config + And Click on batch time and select format + Then Save the pipeline + Then Deploy the pipeline + Then Run the Pipeline in Runtime + Then Wait for pipeline to be in status: "Running" with a timeout of 240 seconds + Then Publish the messages for text format + Then Validate OUT record count is equal to IN record count + And Stop the pipeline + Then Verify the pipeline status is "Stopped" + + @PUBSUB_SOURCE_TEST @PUBSUB_SINK_TEST @PUBSUB_SUBSCRIPTION_TEST + Scenario: Validate the successful transfer of records from a pubSub source 
to a pubSub sink with format Json at source and Avro at sink + Given Open Datafusion Project to configure pipeline + When Select data pipeline type as: "Realtime" + When Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "Pub/Sub" from the plugins list as: "Source" + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "Pub/Sub" from the plugins list as: "Sink" + Then Connect plugins: "Pub/Sub" and "Pub/Sub2" to establish connection + Then Navigate to the properties page of plugin: "Pub/Sub" + Then Enter input plugin property: "referenceName" with value: "PubSubSourceReferenceName" + Then Replace input plugin property: "project" with value: "projectId" + Then Enter PubSub source property subscription name + Then Enter PubSub source property topic name + Then Select dropdown plugin property: "format" with option value: "json" + Then Add schema for the message + Then Validate "Pub/Sub" plugin properties + And Close the Plugin Properties page + Then Open the PubSub sink properties + Then Enter PubSub property projectId "projectId" + Then Enter PubSub property reference name + Then Enter PubSub sink property topic name + Then Select dropdown plugin property: "format" with option value: "avro" + Then Validate "Pub/Sub" plugin properties + And Close the Plugin Properties page + And Click on configure button + And Click on pipeline config + And Click on batch time and select format + Then Save the pipeline + Then Deploy the pipeline + Then Run the Pipeline in Runtime + Then Wait for pipeline to be in status: "Running" with a timeout of 240 seconds + Then Publish the messages + Then Validate OUT record count is equal to IN record count + And Stop the pipeline + Then Verify the pipeline status is "Stopped" + + @PUBSUB_SOURCE_TEST @PUBSUB_SINK_TEST @PUBSUB_SUBSCRIPTION_TEST + Scenario: Validate the successful transfer of records from a pubSub source to a pubSub sink with format Blob at source and Json at sink + Given Open 
Datafusion Project to configure pipeline + When Select data pipeline type as: "Realtime" + When Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "Pub/Sub" from the plugins list as: "Source" + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "Pub/Sub" from the plugins list as: "Sink" + Then Connect plugins: "Pub/Sub" and "Pub/Sub2" to establish connection + Then Navigate to the properties page of plugin: "Pub/Sub" + Then Replace input plugin property: "project" with value: "projectId" + Then Enter input plugin property: "referenceName" with value: "PubSubSourceReferenceName" + Then Enter PubSub source property subscription name + Then Enter PubSub source property topic name + Then Select dropdown plugin property: "format" with option value: "blob" + Then Validate "Pub/Sub" plugin properties + And Close the Plugin Properties page + Then Navigate to the properties page of plugin: "Pub/Sub2" + Then Replace input plugin property: "project" with value: "projectId" + Then Enter input plugin property: "referenceName" with value: "PubSubSinkReferenceName" + Then Enter PubSub sink property topic name + Then Select dropdown plugin property: "format" with option value: "json" + Then Validate "Pub/Sub" plugin properties + And Close the Plugin Properties page + And Click on configure button + And Click on pipeline config + And Click on batch time and select format + Then Save the pipeline + Then Deploy the pipeline + Then Run the Pipeline in Runtime + Then Wait for pipeline to be in status: "Running" with a timeout of 240 seconds + Then Publish the messages for text format + Then Validate OUT record count is equal to IN record count + And Stop the pipeline + Then Verify the pipeline status is "Stopped" + + @PUBSUB_SCHEMA_TEST @PUBSUB_SCHEMA_TOPIC_TEST @PUBSUB_SINK_TEST @PUBSUB_SUBSCRIPTION_TEST + Scenario: Validate the successful transfer of records from a pubSub source to a pubSub sink with format Avro at source and Text 
at sink + Given Open Datafusion Project to configure pipeline + When Select data pipeline type as: "Realtime" + When Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "Pub/Sub" from the plugins list as: "Source" + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "Pub/Sub" from the plugins list as: "Sink" + Then Connect plugins: "Pub/Sub" and "Pub/Sub2" to establish connection + Then Navigate to the properties page of plugin: "Pub/Sub" + Then Replace input plugin property: "project" with value: "projectId" + Then Enter input plugin property: "referenceName" with value: "PubSubSourceReferenceName" + Then Enter PubSub source property subscription name + Then Enter PubSub source property topic name + Then Select dropdown plugin property: "format" with option value: "avro" + Then Add schema for the message + Then Validate "Pub/Sub" plugin properties + And Close the Plugin Properties page + Then Navigate to the properties page of plugin: "Pub/Sub2" + Then Replace input plugin property: "project" with value: "projectId" + Then Enter input plugin property: "referenceName" with value: "PubSubSinkReferenceName" + Then Enter PubSub sink property topic name + Then Select dropdown plugin property: "format" with option value: "text" + Then Validate "Pub/Sub" plugin properties + And Close the Plugin Properties page + And Click on configure button + And Click on pipeline config + And Click on batch time and select format + Then Save the pipeline + Then Deploy the pipeline + Then Run the Pipeline in Runtime + Then Wait for pipeline to be in status: "Running" with a timeout of 240 seconds + Then Publish the messages with schema + Then Validate OUT record count is equal to IN record count + And Stop the pipeline + Then Verify the pipeline status is "Stopped" + + @PUBSUB_SCHEMA_TEST @PUBSUB_SCHEMA_TOPIC_TEST @PUBSUB_SINK_TEST @PUBSUB_SUBSCRIPTION_TEST + Scenario: Verify User is able to transfer messages from PubSub to PubSub in 
parquet format + Given Open Datafusion Project to configure pipeline + When Select data pipeline type as: "Realtime" + When Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "Pub/Sub" from the plugins list as: "Source" + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "Pub/Sub" from the plugins list as: "Sink" + Then Connect plugins: "Pub/Sub" and "Pub/Sub2" to establish connection + Then Navigate to the properties page of plugin: "Pub/Sub" + Then Enter input plugin property: "referenceName" with value: "PubSubSourceReferenceName" + Then Replace input plugin property: "project" with value: "projectId" + Then Enter PubSub source property subscription name + Then Enter PubSub source property topic name + Then Select dropdown plugin property: "format" with option value: "parquet" + Then Add schema for the message + Then Validate "Pub/Sub" plugin properties + And Close the Plugin Properties page + Then Open the PubSub sink properties + Then Enter PubSub property projectId "projectId" + Then Enter PubSub property reference name + Then Enter PubSub sink property topic name + Then Select dropdown plugin property: "format" with option value: "parquet" + Then Validate "Pub/Sub" plugin properties + And Close the Plugin Properties page + And Click on configure button + And Click on pipeline config + And Click on batch time and select format + Then Save the pipeline + Then Deploy the pipeline + Then Run the Pipeline in Runtime + Then Wait for pipeline to be in status: "Running" with a timeout of 240 seconds + Then Publish the messages with schema + Then Validate OUT record count is equal to IN record count + And Stop the pipeline + Then Verify the pipeline status is "Stopped" + + @PUBSUB_SOURCE_TEST @PUBSUB_SINK_TEST @PUBSUB_SUBSCRIPTION_TEST + Scenario: Validate the successful transfer of records from a pubSub source to a pubSub sink with format Json at source and Parquet at sink + Given Open Datafusion Project to 
configure pipeline + When Select data pipeline type as: "Realtime" + When Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "Pub/Sub" from the plugins list as: "Source" + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "Pub/Sub" from the plugins list as: "Sink" + Then Connect plugins: "Pub/Sub" and "Pub/Sub2" to establish connection + Then Navigate to the properties page of plugin: "Pub/Sub" + Then Enter input plugin property: "referenceName" with value: "PubSubSourceReferenceName" + Then Replace input plugin property: "project" with value: "projectId" + Then Enter PubSub source property subscription name + Then Enter PubSub source property topic name + Then Select dropdown plugin property: "format" with option value: "json" + Then Add schema for the message + Then Validate "Pub/Sub" plugin properties + And Close the Plugin Properties page + Then Open the PubSub sink properties + Then Enter PubSub property projectId "projectId" + Then Enter PubSub property reference name + Then Enter PubSub sink property topic name + Then Select dropdown plugin property: "format" with option value: "parquet" + Then Validate "Pub/Sub" plugin properties + And Close the Plugin Properties page + And Click on configure button + And Click on pipeline config + And Click on batch time and select format + Then Save the pipeline + Then Deploy the pipeline + Then Run the Pipeline in Runtime + Then Wait for pipeline to be in status: "Running" with a timeout of 240 seconds + Then Publish the messages + Then Validate OUT record count is equal to IN record count + And Stop the pipeline + Then Verify the pipeline status is "Stopped" + + @PUBSUB_SOURCE_TEST @PUBSUB_SINK_TEST @PUBSUB_SUBSCRIPTION_TEST + Scenario: Validate the successful transfer of records from a pubSub source to a pubSub sink with format Blob at both source and sink + Given Open Datafusion Project to configure pipeline + When Select data pipeline type as: "Realtime" + When 
Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "Pub/Sub" from the plugins list as: "Source" + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "Pub/Sub" from the plugins list as: "Sink" + Then Connect plugins: "Pub/Sub" and "Pub/Sub2" to establish connection + Then Navigate to the properties page of plugin: "Pub/Sub" + Then Replace input plugin property: "project" with value: "projectId" + Then Enter input plugin property: "referenceName" with value: "PubSubSourceReferenceName" + Then Enter PubSub source property subscription name + Then Enter PubSub source property topic name + Then Select dropdown plugin property: "format" with option value: "blob" + Then Validate "Pub/Sub" plugin properties + And Close the Plugin Properties page + Then Navigate to the properties page of plugin: "Pub/Sub2" + Then Replace input plugin property: "project" with value: "projectId" + Then Enter input plugin property: "referenceName" with value: "PubSubSinkReferenceName" + Then Enter PubSub sink property topic name + Then Select dropdown plugin property: "format" with option value: "blob" + Then Validate "Pub/Sub" plugin properties + And Close the Plugin Properties page + And Click on configure button + And Click on pipeline config + And Click on batch time and select format + Then Save the pipeline + Then Deploy the pipeline + Then Run the Pipeline in Runtime + Then Wait for pipeline to be in status: "Running" with a timeout of 240 seconds + Then Publish the messages for text format + Then Validate OUT record count is equal to IN record count + And Stop the pipeline + Then Verify the pipeline status is "Stopped" + + @PUBSUB_SOURCE_TEST @PUBSUB_SINK_TEST @PUBSUB_SUBSCRIPTION_TEST + Scenario: Validate the successful transfer of records from a pubSub source to a pubSub sink with format Json at source and Avro at sink + Given Open Datafusion Project to configure pipeline + When Select data pipeline type as: "Realtime" + When 
Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "Pub/Sub" from the plugins list as: "Source" + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "Pub/Sub" from the plugins list as: "Sink" + Then Connect plugins: "Pub/Sub" and "Pub/Sub2" to establish connection + Then Navigate to the properties page of plugin: "Pub/Sub" + Then Enter input plugin property: "referenceName" with value: "PubSubSourceReferenceName" + Then Replace input plugin property: "project" with value: "projectId" + Then Enter PubSub source property subscription name + Then Enter PubSub source property topic name + Then Select dropdown plugin property: "format" with option value: "json" + Then Add schema for the message + Then Validate "Pub/Sub" plugin properties + And Close the Plugin Properties page + Then Open the PubSub sink properties + Then Enter PubSub property projectId "projectId" + Then Enter PubSub property reference name + Then Enter PubSub sink property topic name + Then Select dropdown plugin property: "format" with option value: "avro" + Then Validate "Pub/Sub" plugin properties + And Close the Plugin Properties page + And Click on configure button + And Click on pipeline config + And Click on batch time and select format + Then Save the pipeline + Then Deploy the pipeline + Then Run the Pipeline in Runtime + Then Wait for pipeline to be in status: "Running" with a timeout of 240 seconds + Then Publish the messages + Then Validate OUT record count is equal to IN record count + And Stop the pipeline + Then Verify the pipeline status is "Stopped" + + @PUBSUB_SOURCE_TEST @PUBSUB_SINK_TEST @PUBSUB_SUBSCRIPTION_TEST + Scenario: Validate the successful transfer of records from a pubSub source to a pubSub sink with format Blob at source and Json at sink + Given Open Datafusion Project to configure pipeline + When Select data pipeline type as: "Realtime" + When Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: 
"Pub/Sub" from the plugins list as: "Source" + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "Pub/Sub" from the plugins list as: "Sink" + Then Connect plugins: "Pub/Sub" and "Pub/Sub2" to establish connection + Then Navigate to the properties page of plugin: "Pub/Sub" + Then Replace input plugin property: "project" with value: "projectId" + Then Enter input plugin property: "referenceName" with value: "PubSubSourceReferenceName" + Then Enter PubSub source property subscription name + Then Enter PubSub source property topic name + Then Select dropdown plugin property: "format" with option value: "blob" + Then Validate "Pub/Sub" plugin properties + And Close the Plugin Properties page + Then Navigate to the properties page of plugin: "Pub/Sub2" + Then Replace input plugin property: "project" with value: "projectId" + Then Enter input plugin property: "referenceName" with value: "PubSubSinkReferenceName" + Then Enter PubSub sink property topic name + Then Select dropdown plugin property: "format" with option value: "json" + Then Validate "Pub/Sub" plugin properties + And Close the Plugin Properties page + And Click on configure button + And Click on pipeline config + And Click on batch time and select format + Then Save the pipeline + Then Deploy the pipeline + Then Run the Pipeline in Runtime + Then Wait for pipeline to be in status: "Running" with a timeout of 240 seconds + Then Publish the messages for text format + Then Validate OUT record count is equal to IN record count + And Stop the pipeline + Then Verify the pipeline status is "Stopped" + + @PUBSUB_SCHEMA_TEST @PUBSUB_SCHEMA_TOPIC_TEST @PUBSUB_SINK_TEST @PUBSUB_SUBSCRIPTION_TEST + Scenario: Validate the successful transfer of records from a pubSub source to a pubSub sink with format Avro at source and Text at sink + Given Open Datafusion Project to configure pipeline + When Select data pipeline type as: "Realtime" + When Expand Plugin group in the LHS plugins list: "Source" + 
When Select plugin: "Pub/Sub" from the plugins list as: "Source" + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "Pub/Sub" from the plugins list as: "Sink" + Then Connect plugins: "Pub/Sub" and "Pub/Sub2" to establish connection + Then Navigate to the properties page of plugin: "Pub/Sub" + Then Replace input plugin property: "project" with value: "projectId" + Then Enter input plugin property: "referenceName" with value: "PubSubSourceReferenceName" + Then Enter PubSub source property subscription name + Then Enter PubSub source property topic name + Then Select dropdown plugin property: "format" with option value: "avro" + Then Add schema for the message + Then Validate "Pub/Sub" plugin properties + And Close the Plugin Properties page + Then Navigate to the properties page of plugin: "Pub/Sub2" + Then Replace input plugin property: "project" with value: "projectId" + Then Enter input plugin property: "referenceName" with value: "PubSubSinkReferenceName" + Then Enter PubSub sink property topic name + Then Select dropdown plugin property: "format" with option value: "text" + Then Validate "Pub/Sub" plugin properties + And Close the Plugin Properties page + And Click on configure button + And Click on pipeline config + And Click on batch time and select format + Then Save the pipeline + Then Deploy the pipeline + Then Run the Pipeline in Runtime + Then Wait for pipeline to be in status: "Running" with a timeout of 240 seconds + Then Publish the messages with schema + Then Validate OUT record count is equal to IN record count + And Stop the pipeline + Then Verify the pipeline status is "Stopped" diff --git a/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java b/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java index 3fc9a35a94..e799066776 100644 --- a/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java +++ 
b/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java @@ -18,6 +18,7 @@ import com.google.cloud.bigquery.BigQueryException; import com.google.cloud.storage.Blob; import com.google.cloud.storage.StorageException; +import com.google.pubsub.v1.Encoding; import io.cdap.e2e.pages.actions.CdfConnectionActions; import io.cdap.e2e.pages.actions.CdfPluginPropertiesActions; import io.cdap.e2e.utils.BigQueryClient; @@ -62,6 +63,9 @@ public class TestSetupHooks { public static String bqSourceTable2 = StringUtils.EMPTY; public static String bqSourceView = StringUtils.EMPTY; public static String pubSubTargetTopic = StringUtils.EMPTY; + public static String pubSubSourceTopic = StringUtils.EMPTY; + public static String pubSubSourceSubscription = StringUtils.EMPTY; + public static String pubSubSchemaId = StringUtils.EMPTY; public static String spannerInstance = StringUtils.EMPTY; public static String spannerDatabase = StringUtils.EMPTY; public static String spannerSourceTable = StringUtils.EMPTY; @@ -480,12 +484,102 @@ private static String createGCSBucketWithFilesAndFolder(String folderPath) throw return bucketName; } + @Before(order = 1, value = "@PUBSUB_SOURCE_TEST") + public static void createSourcePubSubTopic() throws IOException { + pubSubSourceTopic = "cdf-e2e-test-" + UUID.randomUUID(); + PubSubClient.createTopic(pubSubSourceTopic); + BeforeActions.scenario.write("Source PubSub topic " + pubSubSourceTopic); + } + + @Before(order = 1, value = "@PUBSUB_SCHEMA_TEST") + public static void createSourcePubSubSchema() throws IOException { + pubSubSchemaId = "cdf-e2e-test-" + UUID.randomUUID(); + PubSubClient.createAvroSchema(pubSubSchemaId, PluginPropertyUtils.pluginProp("avrofile")); + BeforeActions.scenario.write("Source Schema " + pubSubSchemaId); + } + + @Before(order = 2, value = "@PUBSUB_SCHEMA_TOPIC_TEST") + public static void createSourcePubSubSchemaTopic() throws IOException, InterruptedException { + pubSubSourceTopic = "cdf-e2e-test-" + 
UUID.randomUUID(); + PubSubClient.createTopicWithSchema(pubSubSourceTopic, pubSubSchemaId, Encoding.BINARY); + BeforeActions.scenario.write("Schema Topic " + pubSubSourceTopic); + } + + @Before(order = 3, value = "@PUBSUB_SUBSCRIPTION_TEST") + public static void createSubscriptionPubSubTopic() throws IOException { + pubSubSourceSubscription = "cdf-e2e-test-" + UUID.randomUUID(); + PubSubClient.createSubscription(pubSubSourceSubscription, pubSubSourceTopic); + BeforeActions.scenario.write("Source PubSub subscription " + pubSubSourceSubscription); + } + + @After(order = 1, value = "@PUBSUB_SOURCE_TEST") + public static void deleteSourcePubSubTopic() { + try { + PubSubClient.deleteTopic(pubSubSourceTopic); + BeforeActions.scenario.write("Deleted source PubSub topic " + pubSubSourceTopic); + pubSubSourceTopic = StringUtils.EMPTY; + } catch (Exception e) { + if (e.getMessage().contains("Invalid resource name given") || e.getMessage().contains("Resource not found")) { + BeforeActions.scenario.write("Source PubSub topic " + pubSubSourceTopic + " does not exist."); + } else { + Assert.fail(e.getMessage()); + } + } + } + + @After(order = 2, value = "@PUBSUB_SCHEMA_TOPIC_TEST") + public static void deleteSourcePubSubSchemaTopic() { + try { + PubSubClient.deleteTopic(pubSubSourceTopic); + BeforeActions.scenario.write("Deleted source PubSub topic " + pubSubSourceTopic); + pubSubSourceTopic = StringUtils.EMPTY; + } catch (Exception e) { + if (e.getMessage().contains("Invalid resource name given") || e.getMessage().contains("Resource not found")) { + BeforeActions.scenario.write("Source PubSub topic " + pubSubSourceTopic + " does not exist."); + } else { + Assert.fail(e.getMessage()); + } + } + } + @Before(order = 1, value = "@PUBSUB_SINK_TEST") public static void createTargetPubSubTopic() { pubSubTargetTopic = "cdf-e2e-test-" + UUID.randomUUID(); BeforeActions.scenario.write("Target PubSub topic " + pubSubTargetTopic); } + @After(order = 1, value = "@PUBSUB_SCHEMA_TEST") + public static void
deletePubSubSchema() throws IOException { + PubSubClient.deleteSchema(PluginPropertyUtils.pluginProp("projectId"), pubSubSchemaId); + BeforeActions.scenario.write("Deleted PubSub schema " + pubSubSchemaId); + } + + @After(order = 2, value = "@PUBSUB_SUBSCRIPTION_TEST") + public static void deletePubSubSubscription() throws IOException { + PubSubClient.deleteSubscription(PluginPropertyUtils.pluginProp("projectId"), pubSubSourceSubscription); + BeforeActions.scenario.write("Deleted PubSub subscription " + pubSubSourceSubscription); + } + + public static void publishMessageJsonFormat() throws IOException, InterruptedException { + String jsonMessage = PluginPropertyUtils.pluginProp("message"); + String jsonMessage2 = PluginPropertyUtils.pluginProp("message2"); + List jsonMessagesList = Arrays.asList(jsonMessage, jsonMessage2); + PubSubClient.publishMessagesWithPubsub(PluginPropertyUtils.pluginProp("projectId"), + pubSubSourceTopic, jsonMessagesList); + } + + public static void publishMessageAvroFormat() throws IOException, InterruptedException, ExecutionException { + PubSubClient.publishAvroRecords(PluginPropertyUtils.pluginProp("projectId"), pubSubSourceTopic); + } + + public static void publishMessage() throws IOException, InterruptedException { + String dataMessage1 = PluginPropertyUtils.pluginProp("firstMessage"); + String dataMessage2 = PluginPropertyUtils.pluginProp("secondMessage"); + List dataMessagesList = Arrays.asList(dataMessage1, dataMessage2); + PubSubClient.publishMessagesWithPubsub(PluginPropertyUtils.pluginProp + ("projectId"), pubSubSourceTopic, dataMessagesList); + } + @After(order = 1, value = "@PUBSUB_SINK_TEST") public static void deleteTargetPubSubTopic() { try { @@ -790,7 +884,7 @@ public static void deleteBQTableForBQExecuteTest() throws IOException, Interrupt } else { Assert.fail(e.getMessage()); } - } + } } @Before(order = 2, value = "@BQ_EXECUTE_ROW_AS_ARG_SQL") @@ -1006,7 +1100,7 @@ public static void createSourceBQExistingDatatypeTable () 
throws IOException, In } PluginPropertyUtils.addPluginProp("bqSourceTable", bqSourceTable); BeforeActions.scenario.write("BQ Source Table " + bqSourceTable + " created successfully"); - } + } @Before(order = 1, value = "@BQ_EXISTING_SINK_DATATYPE_TEST") public static void createSinkBQExistingDatatypeTable() throws IOException, InterruptedException { diff --git a/src/e2e-test/java/io/cdap/plugin/pubsub/actions/PubSubActions.java b/src/e2e-test/java/io/cdap/plugin/pubsub/actions/PubSubActions.java index f82befc91d..2d0ff533fc 100644 --- a/src/e2e-test/java/io/cdap/plugin/pubsub/actions/PubSubActions.java +++ b/src/e2e-test/java/io/cdap/plugin/pubsub/actions/PubSubActions.java @@ -16,12 +16,16 @@ package io.cdap.plugin.pubsub.actions; import io.cdap.e2e.pages.locators.CdfPluginPropertiesLocators; +import io.cdap.e2e.pages.locators.CdfSchemaLocators; import io.cdap.e2e.pages.locators.CdfStudioLocators; import io.cdap.e2e.utils.ElementHelper; +import io.cdap.e2e.utils.PluginPropertyUtils; import io.cdap.e2e.utils.SeleniumHelper; import io.cdap.plugin.pubsub.locators.PubSubLocators; +import org.openqa.selenium.support.ui.Select; import java.util.UUID; +import java.util.concurrent.TimeUnit; /** * PubSub plugin step actions. 
@@ -85,4 +89,9 @@ public static void enterSubscription(String subscription) { public static void enterNumberOfReaders(String numberOfReaders) { ElementHelper.replaceElementValue(PubSubLocators.numberOfReaders, numberOfReaders); } + + public static void selectDataType() { + Select select = new Select(PubSubLocators.messageDataType); + select.selectByIndex(9); + } } diff --git a/src/e2e-test/java/io/cdap/plugin/pubsub/locators/PubSubLocators.java b/src/e2e-test/java/io/cdap/plugin/pubsub/locators/PubSubLocators.java index 0019d5c3b1..5306d86123 100644 --- a/src/e2e-test/java/io/cdap/plugin/pubsub/locators/PubSubLocators.java +++ b/src/e2e-test/java/io/cdap/plugin/pubsub/locators/PubSubLocators.java @@ -59,11 +59,28 @@ public class PubSubLocators { @FindBy(how = How.XPATH, using = "//input[@data-cy='cmekKey']") public static WebElement cmekKey; - @FindBy(how = How.XPATH, using = "//input[@data-cy='subscription']") + @FindBy(how = How.XPATH, using = "//input[@data-testid='subscription']") public static WebElement subscription; @FindBy(how = How.XPATH, using = "//input[@data-cy='numberOfReaders']") public static WebElement numberOfReaders; + @FindBy(how = How.XPATH, using = "//input[@placeholder='Field name' and @value='']") + public static WebElement addField; + @FindBy(how = How.XPATH, using = "//span[contains(text(), \"Batch interval\")]//following-sibling::div//select[1]") + public static WebElement batchTime; + @FindBy(how = How.XPATH, using = "//span[contains(text(), \"Batch interval\")]//following-sibling::div//select[2]") + public static WebElement timeSelect; + @FindBy(how = How.XPATH, using = "//button[@data-testid='config-apply-close']") + public static WebElement saveButton; + @FindBy(how = How.XPATH, using = "//*[@data-cy='pipeline-configure-modeless-btn']") + public static WebElement configButton; + @FindBy(how = How.XPATH, using = "//*[@data-testid='tab-content-Pipeline config']") + public static WebElement pipelineConfig; + @FindBy(how = How.XPATH, using 
= "//select[@title='bytes']") + public static WebElement messageDataType; + + @FindBy(how = How.XPATH, using = "//div[@class='btn pipeline-action-btn pipeline-run-btn']//*[name()='svg']") + public static WebElement clickOnRuntimeArgumentButton; public static WebElement formatType(String formatType) { return SeleniumDriver.getDriver() diff --git a/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSink.java b/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSink.java index 927b0d3c7d..f8f69a958c 100644 --- a/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSink.java +++ b/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSink.java @@ -27,13 +27,16 @@ import io.cdap.plugin.utils.E2EHelper; import io.cdap.plugin.utils.E2ETestConstants; import io.cdap.plugin.utils.PubSubClient; +import io.cucumber.java.en.And; import io.cucumber.java.en.Then; import io.cucumber.java.en.When; import org.apache.commons.lang3.StringUtils; import org.junit.Assert; +import org.openqa.selenium.support.ui.Select; import stepsdesign.BeforeActions; import java.io.IOException; +import java.util.concurrent.TimeUnit; /** * PubSub Sink Plugin related step design. 
@@ -234,4 +237,31 @@ public void enterRuntimeArgumentValueForPubSubSinkPropertyTopicKey(String runtim public void clickOnPreviewDataForPubSubSink() { openSinkPluginPreviewData("GooglePublisher"); } + + @Then("Subscribe to the Pub/Sub messages") + public void subscribeToTheMessages() throws InterruptedException { + TimeUnit time = TimeUnit.SECONDS; + time.sleep(60); + PubSubClient.subscribeAsync(PluginPropertyUtils.pluginProp(ConstantsUtil.PROJECT_ID), + TestSetupHooks.pubSubSourceSubscription); + } + + @And("Click on batch time and select format") + public void clickOnBatchTimeAndSelectFormat() { + Select select = new Select(PubSubLocators.batchTime); + select.selectByIndex(0); + Select selectformat = new Select(PubSubLocators.timeSelect); + selectformat.selectByIndex(1); + ElementHelper.clickOnElement(PubSubLocators.saveButton); + } + + @And("Click on configure button") + public void clickOnConfigureButton() { + ElementHelper.clickOnElement(PubSubLocators.configButton); + } + + @And("Click on pipeline config") + public void clickOnPipelineConfig() { + ElementHelper.clickOnElement(PubSubLocators.pipelineConfig); + } } diff --git a/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSource.java b/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSource.java index d18ef2ea9c..346baee7bf 100644 --- a/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSource.java +++ b/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSource.java @@ -16,9 +16,21 @@ package io.cdap.plugin.pubsub.stepsdesign; +import io.cdap.e2e.pages.locators.CdfStudioLocators; import io.cdap.e2e.utils.CdfHelper; +import io.cdap.e2e.utils.ElementHelper; +import io.cdap.e2e.utils.SeleniumDriver; +import io.cdap.plugin.common.stepsdesign.TestSetupHooks; +import io.cdap.plugin.pubsub.actions.PubSubActions; +import io.cdap.plugin.pubsub.locators.PubSubLocators; import io.cucumber.java.en.Then; import io.cucumber.java.en.When; +import org.openqa.selenium.Keys; +import 
org.openqa.selenium.interactions.Actions; + +import java.io.IOException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; /** * PubSub Source Plugin related step design. @@ -33,6 +45,61 @@ public void sourceIsPubSub() { @Then("Open the PubSub source properties") public void openThePubSubSourceProperties() { - openSourcePluginProperties("GooglePublisher"); + openSourcePluginProperties("pubsub"); + } + + @Then("Enter PubSub source property topic name") + public void enterPubSubSourcePropertyTopicName() { + PubSubActions.enterPubSubTopic(TestSetupHooks.pubSubSourceTopic); + } + + @Then("Enter PubSub source property subscription name") + public void enterPubSubSourcePropertySubscriptionName() { + PubSubActions.enterSubscription(TestSetupHooks.pubSubSourceSubscription); + } + + @Then("Publish the messages") + public void publishTheMessage() throws IOException, InterruptedException { + TimeUnit time = TimeUnit.SECONDS; + time.sleep(60); + TestSetupHooks.publishMessageJsonFormat(); + time.sleep(20); + } + + @Then("Publish the messages for text format") + public void publishTheMessageTextFormat() throws IOException, InterruptedException, IOException { + TimeUnit time = TimeUnit.SECONDS; + time.sleep(60); + TestSetupHooks.publishMessage(); + time.sleep(20); + } + + @Then("Enter runtime argument value for PubSub source property topic key {string}") + public void enterRuntimeArgumentValueForPubSubSourcePropertyTopicKey(String runtimeArgumentKey) { + ElementHelper.clickOnElement(PubSubLocators.clickOnRuntimeArgumentButton); + ElementHelper.sendKeys(CdfStudioLocators.runtimeArgsValue(runtimeArgumentKey), TestSetupHooks.pubSubSourceTopic); + } + + @Then("Enter runtime argument value for PubSub source property subscription key {string}") + public void enterRuntimeArgumentValueForPubSubSourcePropertySubscriptionKey(String runtimeArgumentKey) { + ElementHelper.sendKeys(CdfStudioLocators.runtimeArgsValue(runtimeArgumentKey), + 
TestSetupHooks.pubSubSourceSubscription); + } + + @Then("Add schema for the message") + public void addSchemaForTheMessageWithOptionValue() { + PubSubActions.selectDataType(); + ElementHelper.sendKeys(PubSubLocators.addField, "name"); + Actions act = new Actions(SeleniumDriver.getDriver()); + act.sendKeys(Keys.ENTER).perform(); + ElementHelper.sendKeys(PubSubLocators.addField, "postabbr"); + } + + @Then("Publish the messages with schema") + public void publishTheMessagesWithSchema() throws InterruptedException, IOException, ExecutionException { + TimeUnit time = TimeUnit.SECONDS; + time.sleep(120); + TestSetupHooks.publishMessageAvroFormat(); + time.sleep(30); } } diff --git a/src/e2e-test/java/io/cdap/plugin/utils/PubSubClient.java b/src/e2e-test/java/io/cdap/plugin/utils/PubSubClient.java index 0331627140..4c0365f9d6 100644 --- a/src/e2e-test/java/io/cdap/plugin/utils/PubSubClient.java +++ b/src/e2e-test/java/io/cdap/plugin/utils/PubSubClient.java @@ -16,19 +16,56 @@ package io.cdap.plugin.utils; +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.api.gax.rpc.AlreadyExistsException; +import com.google.api.gax.rpc.ApiException; +import com.google.api.gax.rpc.NotFoundException; +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.cloud.pubsub.v1.SchemaServiceClient; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.cloud.pubsub.v1.SubscriptionAdminClient; +import com.google.cloud.pubsub.v1.SubscriptionAdminSettings; import com.google.cloud.pubsub.v1.TopicAdminClient; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.Encoding; +import com.google.pubsub.v1.ProjectName; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.PubsubMessage; 
+import com.google.pubsub.v1.PushConfig; +import com.google.pubsub.v1.Schema; +import com.google.pubsub.v1.SchemaName; +import com.google.pubsub.v1.SchemaSettings; import com.google.pubsub.v1.Topic; import com.google.pubsub.v1.TopicName; import io.cdap.e2e.utils.ConstantsUtil; import io.cdap.e2e.utils.PluginPropertyUtils; +import org.apache.avro.io.Encoder; +import org.apache.avro.io.EncoderFactory; +import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * Represents PubSub client. */ public class PubSubClient { + private static final Logger logger = LoggerFactory.getLogger(PubSubClient.class); + public static Topic createTopic(String topicId) throws IOException { try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) { TopicName topicName = TopicName.of(PluginPropertyUtils.pluginProp(ConstantsUtil.PROJECT_ID), topicId); @@ -36,6 +73,29 @@ public static Topic createTopic(String topicId) throws IOException { } } + /** + * Creates a subscription for the specified subscription ID and topic ID. + * + * @param subscriptionId The ID of the subscription to be created. + * @param topicId The ID of the topic to which the subscription is associated. + * @throws IOException If an I/O error occurs while interacting with the Subscription Admin API. 
+ */ + public static void createSubscription(String subscriptionId, String topicId) throws IOException { + ProjectSubscriptionName subscriptionName = null; + try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient + .create(SubscriptionAdminSettings.newBuilder().build())) { + TopicName topicName = TopicName.of(PluginPropertyUtils.pluginProp(ConstantsUtil.PROJECT_ID), topicId); + subscriptionName = ProjectSubscriptionName.of(PluginPropertyUtils. + pluginProp(ConstantsUtil.PROJECT_ID), subscriptionId); + subscriptionAdminClient.createSubscription(subscriptionName, topicName, PushConfig.getDefaultInstance(), 60); + logger.info("Subscription created: " + subscriptionName); + + } catch (AlreadyExistsException e) { + Assert.assertTrue("Subscription is null", subscriptionName != null); + logger.info("Subscription already exists: " + subscriptionName); + } + } + public static void deleteTopic(String topicId) throws IOException { try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) { TopicName topicName = TopicName.of(PluginPropertyUtils.pluginProp(ConstantsUtil.PROJECT_ID), topicId); @@ -56,4 +116,177 @@ public static String getTopicCmekKey(String topicId) throws IOException { return getTopic(topicId).getKmsKeyName(); } + /** + * Publishes messages to a specified Pub/Sub topic with an error handler to handle success or failure asynchronously. + * + * @param projectId The ID of the Google Cloud Platform project. + * @param topicId The ID of the Pub/Sub topic to which messages are published. + * @throws IOException If an I/O error occurs during the publishing process. 
+ */ + public static void publishMessagesWithPubsub(String projectId, String topicId, List messages) + throws IOException, InterruptedException { + TopicName topicName = TopicName.of(PluginPropertyUtils.pluginProp(ConstantsUtil.PROJECT_ID), topicId); + Publisher publisher = null; + try { + publisher = Publisher.newBuilder(topicName).build(); + for (final String message : messages) { + ByteString data = ByteString.copyFromUtf8(message); + PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build(); + ApiFuture future = publisher.publish(pubsubMessage); + // Adding an asynchronous callback to handle success / failure + ApiFutures.addCallback(future, new ApiFutureCallback() { + + @Override + public void onFailure(Throwable throwable) { + if (throwable instanceof ApiException) { + ApiException apiException = ((ApiException) throwable); + // details on the API exception + logger.info(apiException.getStatusCode().getCode().toString()); + logger.info(Boolean.toString(apiException.isRetryable())); + } + logger.info("Error publishing message : " + message); + } + + @Override + public void onSuccess(String messageId) { + // Once published, returns server-assigned message ids (unique within the topic) + logger.info("Published message ID: " + messageId); + } + }, MoreExecutors.directExecutor()); + } + } finally { + if (publisher != null) { + // When finished with the publisher, shutdown to free up resources. + publisher.shutdown(); + publisher.awaitTermination(1, TimeUnit.MINUTES); + } + } + } + + public static void subscribeAsync(String projectId, String subscriptionId) { + ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of + (PluginPropertyUtils.pluginProp(ConstantsUtil.PROJECT_ID), subscriptionId); + // Instantiate an asynchronous message receiver. + MessageReceiver receiver = (PubsubMessage message, AckReplyConsumer consumer) -> { + // Handle incoming message, then ack the received message. 
+ logger.info("Id: " + message.getMessageId()); + logger.info("Data: " + message.getData().toStringUtf8()); + consumer.ack(); + }; + Subscriber subscriber = null; + try { + subscriber = Subscriber.newBuilder(subscriptionName, receiver).build(); + // Start the subscriber. + subscriber.startAsync().awaitRunning(); + logger.info("Listening for messages on %s:\n", subscriptionName); + // Allow the subscriber to run for 300s unless an unrecoverable error occurs. + subscriber.awaitTerminated(300, TimeUnit.SECONDS); + } catch (TimeoutException timeoutException) { + // Shut down the subscriber after 300s. Stop receiving messages. + subscriber.stopAsync(); + } + } + + public static Schema createAvroSchema(String schemaId, String avscFile) throws IOException { + ProjectName projectName = ProjectName.of(PluginPropertyUtils.pluginProp("projectId")); + SchemaName schemaName = SchemaName.of(PluginPropertyUtils.pluginProp("projectId"), schemaId); + + // Read an Avro schema file formatted in JSON as a string. + String avscSource = new String(Files.readAllBytes(Paths.get(avscFile))); + try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) { + Schema schema = schemaServiceClient.createSchema(projectName, Schema.newBuilder(). 
+ setName(schemaName.toString()).setType(Schema.Type.AVRO).setDefinition(avscSource).build(), schemaId); + logger.info("Created a schema using an Avro schema:\n" + schema); + return schema; + } catch (AlreadyExistsException e) { + logger.info(schemaName + "already exists."); + return null; + } + } + + public static void createTopicWithSchema(String topicId, String schemaId, Encoding encoding) + throws IOException, InterruptedException { + TimeUnit time = TimeUnit.SECONDS; + time.sleep(10); + TopicName topicName = TopicName.of(PluginPropertyUtils.pluginProp("projectId"), topicId); + SchemaName schemaName = SchemaName.of(PluginPropertyUtils.pluginProp("projectId"), schemaId); + SchemaSettings schemaSettings = SchemaSettings.newBuilder().setSchema + (schemaName.toString()).setEncoding(encoding).build(); + try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) { + Topic topic = topicAdminClient.createTopic(Topic.newBuilder().setName + (topicName.toString()).setSchemaSettings(schemaSettings).build()); + logger.info("Created topic with schema: " + topic.getName()); + } catch (AlreadyExistsException e) { + logger.info(schemaName + "already exists."); + } + } + + public static void publishAvroRecords(String projectId, String topicId) throws + IOException, ExecutionException, InterruptedException { + Encoding encoding = null; + TopicName topicName = TopicName.of(projectId, topicId); + // Get the topic encoding type. + try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) { + encoding = topicAdminClient.getTopic(topicName).getSchemaSettings().getEncoding(); + } + // Instantiate an avro-tools-generated class defined in `us-states.avsc`. + State state = State.newBuilder().setName("Alaska").setPostAbbr("AK").build(); + Publisher publisher = null; + block: + try { + publisher = Publisher.newBuilder(topicName).build(); + // Prepare to serialize the object to the output stream. 
+ ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); + Encoder encoder = null; + // Prepare an appropriate encoder for publishing to the topic. + switch (encoding) { + case BINARY: + logger.info("Preparing a BINARY encoder..."); + encoder = EncoderFactory.get().directBinaryEncoder(byteStream, /*reuse=*/ null); + break; + case JSON: + logger.info("Preparing a JSON encoder..."); + encoder = EncoderFactory.get().jsonEncoder(State.getClassSchema(), byteStream); + break; + default: + break block; + } + // Encode the object and write it to the output stream. + state.customEncode(encoder); + encoder.flush(); + // Publish the encoded object as a Pub/Sub message. + ByteString data = ByteString.copyFrom(byteStream.toByteArray()); + PubsubMessage message = PubsubMessage.newBuilder().setData(data).build(); + logger.info("Publishing message: " + message); + + ApiFuture future = publisher.publish(message); + logger.info("Published message ID: " + future.get()); + + } finally { + if (publisher != null) { + publisher.shutdown(); + publisher.awaitTermination(1, TimeUnit.MINUTES); + } + } + } + + public static void deleteSchema(String projectId, String schemaId) throws IOException { + SchemaName schemaName = SchemaName.of(projectId, schemaId); + try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) { + schemaServiceClient.deleteSchema(schemaName); + } catch (NotFoundException e) { + logger.info(schemaName + "not found."); + } + } + + public static void deleteSubscription(String projectId, String subscriptionId) throws IOException { + try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) { + ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(projectId, subscriptionId); + try { + subscriptionAdminClient.deleteSubscription(subscriptionName); + } catch (NotFoundException e) { + logger.info(e.getMessage()); + } + } + } } diff --git a/src/e2e-test/java/io/cdap/plugin/utils/State.java 
b/src/e2e-test/java/io/cdap/plugin/utils/State.java new file mode 100644 index 0000000000..e4ff240553 --- /dev/null +++ b/src/e2e-test/java/io/cdap/plugin/utils/State.java @@ -0,0 +1,419 @@ +/** + * Autogenerated by Avro + *

+ * DO NOT EDIT DIRECTLY + */ +package io.cdap.plugin.utils; + +import org.apache.avro.message.BinaryMessageDecoder; +import org.apache.avro.message.BinaryMessageEncoder; +import org.apache.avro.message.SchemaStore; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.util.Utf8; + +/** A list of states in the United States of America. */ +@org.apache.avro.specific.AvroGenerated +public class State extends org.apache.avro.specific.SpecificRecordBase + implements org.apache.avro.specific.SpecificRecord { + public static final org.apache.avro.Schema SCHEMA = new org.apache.avro.Schema.Parser() + .parse("{\"type\":\"record\",\"name\":\"State\",\"namespace\":\"utilities\",\"doc\":\"A list of states in the" + + " United States of America.\",\"fields\":[{\"name\":\"name\",\"type\":\"string\",\"doc\":\"The common " + + "name of the state.\"},{\"name\":\"post_abbr\",\"type\":\"string\",\"doc\":\"The postal code " + + "abbreviation of the state.\"}]}"); + private static final long serialVersionUID = -6098929419967278282L; + private static final SpecificData MODEL$ = new SpecificData(); + private static final BinaryMessageEncoder ENCODER = + new BinaryMessageEncoder<>(MODEL$, SCHEMA); + private static final BinaryMessageDecoder DECODER = + new BinaryMessageDecoder<>(MODEL$, SCHEMA); + @SuppressWarnings("unchecked") + private static final org.apache.avro.io.DatumWriter + WRITER$ = (org.apache.avro.io.DatumWriter) MODEL$.createDatumWriter(SCHEMA); + @SuppressWarnings("unchecked") + private static final org.apache.avro.io.DatumReader + READER$ = (org.apache.avro.io.DatumReader) MODEL$.createDatumReader(SCHEMA); + /** The common name of the state. */ + private CharSequence name; + /** The postal code abbreviation of the state. */ + private CharSequence postabbr; + + /** + * Default constructor. Note that this does not initialize fields + * to their default values from the schema. If that is desired then + * one should use newBuilder(). 
+ */ + public State() { + } + + /** + * All-args constructor. + * @param name The common name of the state. + * @param postabbr The postal code abbreviation of the state. + */ + public State(CharSequence name, CharSequence postabbr) { + this.name = name; + this.postabbr = postabbr; + } + + public static org.apache.avro.Schema getClassSchema() { + return SCHEMA; + } + + /** + * Return the BinaryMessageEncoder instance used by this class. + * @return the message encoder used by this class + */ + public static BinaryMessageEncoder getEncoder() { + return ENCODER; + } + + /** + * Return the BinaryMessageDecoder instance used by this class. + * @return the message decoder used by this class + */ + public static BinaryMessageDecoder getDecoder() { + return DECODER; + } + + /** + * Create a new BinaryMessageDecoder instance for this class that uses the specified {@link SchemaStore}. + * @param resolver a {@link SchemaStore} used to find schemas by fingerprint + * @return a BinaryMessageDecoder instance for this class backed by the given SchemaStore + */ + public static BinaryMessageDecoder createDecoder(SchemaStore resolver) { + return new BinaryMessageDecoder<>(MODEL$, SCHEMA, resolver); + } + + /** + * Deserializes a State from a ByteBuffer. + * @param b a byte buffer holding serialized data for an instance of this class + * @return a State instance decoded from the given buffer + * @throws java.io.IOException if the given bytes could not be deserialized into an instance of this class + */ + public static State fromByteBuffer( + java.nio.ByteBuffer b) throws java.io.IOException { + return DECODER.decode(b); + } + + /** + * Creates a new State RecordBuilder. + * @return A new State RecordBuilder + */ + public static State.Builder newBuilder() { + return new State.Builder(); + } + + /** + * Creates a new State RecordBuilder by copying an existing Builder. + * @param other The existing builder to copy. 
+ * @return A new State RecordBuilder + */ + public static State.Builder newBuilder(State.Builder other) { + if (other == null) { + return new State.Builder(); + } else { + return new State.Builder(other); + } + } + + /** + * Creates a new State RecordBuilder by copying an existing State instance. + * @param other The existing instance to copy. + * @return A new State RecordBuilder + */ + public static State.Builder newBuilder(State other) { + if (other == null) { + return new State.Builder(); + } else { + return new State.Builder(other); + } + } + + /** + * Serializes this State to a ByteBuffer. + * @return a buffer holding the serialized data for this instance + * @throws java.io.IOException if this instance could not be serialized + */ + public java.nio.ByteBuffer toByteBuffer() throws java.io.IOException { + return ENCODER.encode(this); + } + + @Override + public SpecificData getSpecificData() { + return MODEL$; + } + + @Override + public org.apache.avro.Schema getSchema() { + return SCHEMA; + } + + // Used by DatumWriter. Applications should not call. + @Override + public Object get(int field) { + switch (field) { + case 0: + return name; + case 1: + return postabbr; + default: + throw new IndexOutOfBoundsException("Invalid index: " + field); + } + } + + // Used by DatumReader. Applications should not call. + @Override + @SuppressWarnings(value = "unchecked") + public void put(int field, Object value) { + switch (field) { + case 0: + name = (CharSequence) value; + break; + case 1: + postabbr = (CharSequence) value; + break; + default: + throw new IndexOutOfBoundsException("Invalid index: " + field); + } + } + + /** + * Gets the value of the 'name' field. + * @return The common name of the state. + */ + public CharSequence getName() { + return name; + } + + /** + * Sets the value of the 'name' field. + * The common name of the state. + * @param value the value to set. 
+ */ + public void setName(CharSequence value) { + this.name = value; + } + + /** + * Gets the value of the 'post_abbr' field. + * @return The postal code abbreviation of the state. + */ + public CharSequence getPostAbbr() { + return postabbr; + } + + /** + * Sets the value of the 'post_abbr' field. + * The postal code abbreviation of the state. + * @param value the value to set. + */ + public void setPostAbbr(CharSequence value) { + this.postabbr = value; + } + + @Override + public void writeExternal(java.io.ObjectOutput out) + throws java.io.IOException { + WRITER$.write(this, SpecificData.getEncoder(out)); + } + + @Override + public void readExternal(java.io.ObjectInput in) + throws java.io.IOException { + READER$.read(this, SpecificData.getDecoder(in)); + } + + @Override + protected boolean hasCustomCoders() { + return true; + } + + @Override + public void customEncode(org.apache.avro.io.Encoder out) + throws java.io.IOException { + out.writeString(this.name); + + out.writeString(this.postabbr); + + } + + @Override + public void customDecode(org.apache.avro.io.ResolvingDecoder in) + throws java.io.IOException { + org.apache.avro.Schema.Field[] fieldOrder = in.readFieldOrderIfDiff(); + if (fieldOrder == null) { + this.name = in.readString(this.name instanceof Utf8 ? (Utf8) this.name : null); + + this.postabbr = in.readString(this.postabbr instanceof Utf8 ? (Utf8) this.postabbr : null); + + } else { + for (int i = 0; i < 2; i++) { + switch (fieldOrder[i].pos()) { + case 0: + this.name = in.readString(this.name instanceof Utf8 ? (Utf8) this.name : null); + break; + + case 1: + this.postabbr = in.readString(this.postabbr instanceof Utf8 ? (Utf8) this.postabbr : null); + break; + + default: + throw new java.io.IOException("Corrupt ResolvingDecoder."); + } + } + } + } + + /** + * RecordBuilder for State instances. 
+ */ + @org.apache.avro.specific.AvroGenerated + public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase + implements org.apache.avro.data.RecordBuilder { + + /** The common name of the state. */ + private CharSequence name; + /** The postal code abbreviation of the state. */ + private CharSequence postabbr; + + /** Creates a new Builder. */ + private Builder() { + super(SCHEMA, MODEL$); + } + + /** + * Creates a Builder by copying an existing Builder. + * @param other The existing Builder to copy. + */ + private Builder(State.Builder other) { + super(other); + if (isValidValue(fields()[0], other.name)) { + this.name = data().deepCopy(fields()[0].schema(), other.name); + fieldSetFlags()[0] = other.fieldSetFlags()[0]; + } + if (isValidValue(fields()[1], other.postabbr)) { + this.postabbr = data().deepCopy(fields()[1].schema(), other.postabbr); + fieldSetFlags()[1] = other.fieldSetFlags()[1]; + } + } + + /** + * Creates a Builder by copying an existing State instance. + * @param other The existing instance to copy. + */ + private Builder(State other) { + super(SCHEMA, MODEL$); + if (isValidValue(fields()[0], other.name)) { + this.name = data().deepCopy(fields()[0].schema(), other.name); + fieldSetFlags()[0] = true; + } + if (isValidValue(fields()[1], other.postabbr)) { + this.postabbr = data().deepCopy(fields()[1].schema(), other.postabbr); + fieldSetFlags()[1] = true; + } + } + + /** + * Gets the value of the 'name' field. + * The common name of the state. + * @return The value. + */ + public CharSequence getName() { + return name; + } + + + /** + * Sets the value of the 'name' field. + * The common name of the state. + * @param value The value of 'name'. + * @return This builder. + */ + public State.Builder setName(CharSequence value) { + validate(fields()[0], value); + this.name = value; + fieldSetFlags()[0] = true; + return this; + } + + /** + * Checks whether the 'name' field has been set. + * The common name of the state. 
+ * @return True if the 'name' field has been set, false otherwise. + */ + public boolean hasName() { + return fieldSetFlags()[0]; + } + + + /** + * Clears the value of the 'name' field. + * The common name of the state. + * @return This builder. + */ + public State.Builder clearName() { + name = null; + fieldSetFlags()[0] = false; + return this; + } + + /** + * Gets the value of the 'post_abbr' field. + * The postal code abbreviation of the state. + * @return The value. + */ + public CharSequence getPostAbbr() { + return postabbr; + } + + + /** + * Sets the value of the 'post_abbr' field. + * The postal code abbreviation of the state. + * @param value The value of 'post_abbr'. + * @return This builder. + */ + public State.Builder setPostAbbr(CharSequence value) { + validate(fields()[1], value); + this.postabbr = value; + fieldSetFlags()[1] = true; + return this; + } + + /** + * Checks whether the 'post_abbr' field has been set. + * The postal code abbreviation of the state. + * @return True if the 'post_abbr' field has been set, false otherwise. + */ + public boolean hasPostAbbr() { + return fieldSetFlags()[1]; + } + + + /** + * Clears the value of the 'post_abbr' field. + * The postal code abbreviation of the state. + * @return This builder. + */ + public State.Builder clearPostAbbr() { + postabbr = null; + fieldSetFlags()[1] = false; + return this; + } + + @Override + @SuppressWarnings("unchecked") + public State build() { + try { + State record = new State(); + record.name = fieldSetFlags()[0] ? this.name : (CharSequence) defaultValue(fields()[0]); + record.postabbr = fieldSetFlags()[1] ? 
this.postabbr : (CharSequence) defaultValue(fields()[1]); + return record; + } catch (org.apache.avro.AvroMissingFieldException e) { + throw e; + } catch (Exception e) { + throw new org.apache.avro.AvroRuntimeException(e); + } + } + } +} diff --git a/src/e2e-test/resources/pluginDataCyAttributes.properties b/src/e2e-test/resources/pluginDataCyAttributes.properties index 2f321c0333..420d52c89a 100644 --- a/src/e2e-test/resources/pluginDataCyAttributes.properties +++ b/src/e2e-test/resources/pluginDataCyAttributes.properties @@ -5,6 +5,7 @@ table=table tableKey=relationTableKey clusterOrder=clusteringOrder dataset=dataset +datatype=select-undefined skipHeader=switch-skipHeader path=path name=name diff --git a/src/e2e-test/resources/pluginParameters.properties b/src/e2e-test/resources/pluginParameters.properties index ae191f7c90..a8ab7f3957 100644 --- a/src/e2e-test/resources/pluginParameters.properties +++ b/src/e2e-test/resources/pluginParameters.properties @@ -181,6 +181,9 @@ bqUpdateTableSchemaTrue=True clusterValue=transaction_date TableKey=PersonID bqSourceTable=dummy +message={"name":"adam","postabbr":"AK"} +message2={"name":"kamal","postabbr":"ZS"} +avrofile=src/e2e-test/resources/testdata/pubsubavrofile/avrofile.avsc bqCreateTableQueryFile=testdata/BigQuery/BigQueryCreateTableQuery.txt bqInsertDataQueryFile=testdata/BigQuery/BigQueryInsertDataQuery.txt bqCreateViewQueryFile=testdata/BigQuery/BigQueryCreateViewQuery.txt @@ -242,7 +245,10 @@ pubSubRetryTimeOut=30 pubSubErrorThreshold=0 pubSubStringValue=one pubSubNegativeValue=-100 +record=option-record pubsubDelimiter=@ +firstMessage=helloMessage +secondMessage=hiMessage ## PUBSUBSINK-PLUGIN-PROPERTIES-END ## GCSDELETE-PLUGIN-PROPERTIES-START diff --git a/src/e2e-test/resources/testdata/pubsubavrofile/avrofile.avsc b/src/e2e-test/resources/testdata/pubsubavrofile/avrofile.avsc new file mode 100644 index 0000000000..b64ecf8d37 --- /dev/null +++ b/src/e2e-test/resources/testdata/pubsubavrofile/avrofile.avsc @@ 
-0,0 +1,18 @@ +{ + "type":"record", + "name":"State", + "namespace":"utilities", + "doc":"A list of states in the United States of America.", + "fields":[ + { + "name":"name", + "type":"string", + "doc":"The common name of the state." + }, + { + "name":"postabbr", + "type":"string", + "doc":"The postal code abbreviation of the state." + } + ] +} \ No newline at end of file diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/builder/BigQueryWindowsAggregationSQLBuilder.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/builder/BigQueryWindowsAggregationSQLBuilder.java index 38e06ea649..f2e7d08813 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/builder/BigQueryWindowsAggregationSQLBuilder.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/builder/BigQueryWindowsAggregationSQLBuilder.java @@ -21,7 +21,6 @@ import io.cdap.cdap.etl.api.relational.Expression; import io.cdap.plugin.gcp.bigquery.relational.SQLExpression; import io.cdap.plugin.gcp.bigquery.sqlengine.util.BigQuerySQLEngineUtils; -import org.mortbay.log.Log; import org.slf4j.Logger; import org.slf4j.LoggerFactory;