diff --git a/src/e2e-test/features/pubsub/pubsubadditionalfeature/PubSubToPubSub.feature b/src/e2e-test/features/pubsub/pubsubadditionalfeature/PubSubToPubSub.feature new file mode 100644 index 0000000000..423cda02c1 --- /dev/null +++ b/src/e2e-test/features/pubsub/pubsubadditionalfeature/PubSubToPubSub.feature @@ -0,0 +1,166 @@ +# Copyright © 2024 Cask Data, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +@PubSub_DataStream +Feature: PubSub - Verification of successful data transfer from DataStream PubSub source to PubSub sink using different file formats + + @PUBSUB_SOURCE_TEST @PUBSUB_SINK_TEST @PUBSUB_SUBSCRIPTION_TEST + Scenario: Validate successful transfer of records from PubSub(source) to PubSub(sink). 
+ Given Open Datafusion Project to configure pipeline + When Select data pipeline type as: "Realtime" + When Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "Pub/Sub" from the plugins list as: "Source" + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "Pub/Sub" from the plugins list as: "Sink" + Then Connect plugins: "Pub/Sub" and "Pub/Sub2" to establish connection + Then Navigate to the properties page of plugin: "Pub/Sub" + Then Override Service account details if set in environment variables + Then Enter input plugin property: "referenceName" with value: "PubSubSourceReferenceName" + Then Enter PubSub source property subscription name + Then Enter PubSub source property topic name + Then Validate "Pub/Sub" plugin properties + And Close the Plugin Properties page + Then Navigate to the properties page of plugin: "Pub/Sub2" + Then Replace input plugin property: "project" with value: "projectId" + Then Override Service account details if set in environment variables + Then Enter input plugin property: "referenceName" with value: "PubSubSinkReferenceName" + Then Enter PubSub sink property topic name + Then Validate "Pub/Sub" plugin properties + And Close the Plugin Properties page + And Click on configure button + And Click on pipeline config + And Click on batch time and select format + Then Save the pipeline + Then Deploy the pipeline + Then Run the Pipeline in Runtime + Then Wait for pipeline to be in status: "Running" with a timeout of 240 seconds + Then Publish the messages + Then Subscribe to the messages + Then Validate OUT record count is equal to IN record count + And Stop the pipeline + Then Verify the pipeline status is "Stopped" + + @PUBSUB_SOURCE_TEST @PUBSUB_SINK_TEST @PUBSUB_SUBSCRIPTION_TEST + Scenario: Validate successful transfer of records from PubSub(source) to PubSub(sink) using macros. 
+ Given Open Datafusion Project to configure pipeline + When Select data pipeline type as: "Realtime" + When Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "Pub/Sub" from the plugins list as: "Source" + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "Pub/Sub" from the plugins list as: "Sink" + Then Connect plugins: "Pub/Sub" and "Pub/Sub2" to establish connection + Then Navigate to the properties page of plugin: "Pub/Sub" + Then Override Service account details if set in environment variables + Then Enter input plugin property: "referenceName" with value: "PubSubSourceReferenceName" + Then Click on the Macro button of Property: "topic" and set the value to: "pubSubSourceTopic" + Then Click on the Macro button of Property: "subscription" and set the value to: "pubSubSourceSubscription" + Then Validate "Pub/Sub" plugin properties + And Close the Plugin Properties page + Then Navigate to the properties page of plugin: "Pub/Sub2" + Then Replace input plugin property: "project" with value: "projectId" + Then Override Service account details if set in environment variables + Then Enter input plugin property: "referenceName" with value: "PubSubSinkReferenceName" + Then Enter PubSub sink property topic name + Then Validate "Pub/Sub" plugin properties + And Close the Plugin Properties page + And Click on configure button + And Click on pipeline config + And Click on batch time and select format + Then Save the pipeline + Then Deploy the pipeline + Then Enter runtime argument value for PubSub source property topic key "pubSubSourceTopic" + Then Enter runtime argument value for PubSub source property subscription key "pubSubSourceSubscription" + Then Run the Pipeline in Runtime with runtime arguments + Then Wait for pipeline to be in status: "Running" with a timeout of 240 seconds + Then Publish the messages + Then Subscribe to the messages + Then Validate OUT record count is equal to IN record count + And Stop the 
pipeline + Then Verify the pipeline status is "Stopped" + + @PUBSUB_SOURCE_TEST @PUBSUB_SINK_TEST @PUBSUB_SUBSCRIPTION_TEST + Scenario: Validate the successful transfer of records from a pubSub source to a pubSub sink with format Text at both source and sink + Given Open Datafusion Project to configure pipeline + When Select data pipeline type as: "Realtime" + When Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "Pub/Sub" from the plugins list as: "Source" + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "Pub/Sub" from the plugins list as: "Sink" + Then Connect plugins: "Pub/Sub" and "Pub/Sub2" to establish connection + Then Navigate to the properties page of plugin: "Pub/Sub" + Then Override Service account details if set in environment variables + Then Enter input plugin property: "referenceName" with value: "PubSubSourceReferenceName" + Then Enter PubSub source property subscription name + Then Enter PubSub source property topic name + Then Select dropdown plugin property: "select-format" with option value: "text" + Then Validate "Pub/Sub" plugin properties + And Close the Plugin Properties page + Then Navigate to the properties page of plugin: "Pub/Sub2" + Then Replace input plugin property: "project" with value: "projectId" + Then Override Service account details if set in environment variables + Then Enter input plugin property: "referenceName" with value: "PubSubSinkReferenceName" + Then Enter PubSub sink property topic name + Then Select dropdown plugin property: "select-format" with option value: "text" + Then Validate "Pub/Sub" plugin properties + And Close the Plugin Properties page + And Click on configure button + And Click on pipeline config + And Click on batch time and select format + Then Save the pipeline + Then Deploy the pipeline + Then Run the Pipeline in Runtime + Then Wait for pipeline to be in status: "Running" with a timeout of 240 seconds + Then Publish the messages + Then 
Subscribe to the messages + Then Validate OUT record count is equal to IN record count + And Stop the pipeline + Then Verify the pipeline status is "Stopped" + + @PUBSUB_SOURCE_TEST @PUBSUB_SINK_TEST @PUBSUB_SUBSCRIPTION_TEST + Scenario: Validate the successful transfer of records from a pubSub source to a pubSub sink with format Text at source and Json at sink + Given Open Datafusion Project to configure pipeline + When Select data pipeline type as: "Realtime" + When Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "Pub/Sub" from the plugins list as: "Source" + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "Pub/Sub" from the plugins list as: "Sink" + Then Connect plugins: "Pub/Sub" and "Pub/Sub2" to establish connection + Then Navigate to the properties page of plugin: "Pub/Sub" + Then Override Service account details if set in environment variables + Then Enter input plugin property: "referenceName" with value: "PubSubSourceReferenceName" + Then Enter PubSub source property subscription name + Then Enter PubSub source property topic name + Then Select dropdown plugin property: "select-format" with option value: "text" + Then Validate "Pub/Sub" plugin properties + And Close the Plugin Properties page + Then Navigate to the properties page of plugin: "Pub/Sub2" + Then Replace input plugin property: "project" with value: "projectId" + Then Override Service account details if set in environment variables + Then Enter input plugin property: "referenceName" with value: "PubSubSinkReferenceName" + Then Enter PubSub sink property topic name + Then Select dropdown plugin property: "select-format" with option value: "json" + Then Validate "Pub/Sub" plugin properties + And Close the Plugin Properties page + And Click on configure button + And Click on pipeline config + And Click on batch time and select format + Then Save the pipeline + Then Deploy the pipeline + Then Run the Pipeline in Runtime + Then Wait for 
pipeline to be in status: "Running" with a timeout of 240 seconds + Then Publish the messages + Then Subscribe to the messages + Then Validate OUT record count is equal to IN record count + And Stop the pipeline + Then Verify the pipeline status is "Stopped" diff --git a/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java b/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java index 3fc9a35a94..af05fccfc9 100644 --- a/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java +++ b/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java @@ -21,6 +21,7 @@ import io.cdap.e2e.pages.actions.CdfConnectionActions; import io.cdap.e2e.pages.actions.CdfPluginPropertiesActions; import io.cdap.e2e.utils.BigQueryClient; +import io.cdap.e2e.utils.ConstantsUtil; import io.cdap.e2e.utils.PluginPropertyUtils; import io.cdap.e2e.utils.StorageClient; import io.cdap.plugin.utils.PubSubClient; @@ -62,6 +63,8 @@ public class TestSetupHooks { public static String bqSourceTable2 = StringUtils.EMPTY; public static String bqSourceView = StringUtils.EMPTY; public static String pubSubTargetTopic = StringUtils.EMPTY; + public static String pubSubSourceTopic = StringUtils.EMPTY; + public static String pubSubSourceSubscription = StringUtils.EMPTY; public static String spannerInstance = StringUtils.EMPTY; public static String spannerDatabase = StringUtils.EMPTY; public static String spannerSourceTable = StringUtils.EMPTY; @@ -480,6 +483,62 @@ private static String createGCSBucketWithFilesAndFolder(String folderPath) throw return bucketName; } + @Before(order = 1, value = "@PUBSUB_SOURCE_TEST") + public static void createSourcePubSubTopic() throws IOException { + pubSubSourceTopic = "cdf-e2e-test-" + UUID.randomUUID(); + PubSubClient.createTopic(pubSubSourceTopic); + BeforeActions.scenario.write("Source PubSub topic " + pubSubSourceTopic); + } + + // Runs after createSourcePubSubTopic (order = 1): the subscription needs the source topic to exist. + @Before(order = 2, value = "@PUBSUB_SUBSCRIPTION_TEST") + public static void
createSubscriptionPubSubTopic() throws IOException { + pubSubSourceSubscription = "cdf-e2e-test-" + UUID.randomUUID(); + PubSubClient.createSubscription(pubSubSourceSubscription, pubSubSourceTopic); + BeforeActions.scenario.write("Source PubSub subscription " + pubSubSourceSubscription); + } + + @After(order = 1, value = "@PUBSUB_SOURCE_TEST") + public static void deleteSourcePubSubTopic() { + try { + PubSubClient.deleteTopic(pubSubSourceTopic); + BeforeActions.scenario.write("Deleted source PubSub topic " + pubSubSourceTopic); + pubSubSourceTopic = StringUtils.EMPTY; + } catch (Exception e) { + if (e.getMessage().contains("Invalid resource name given") || e.getMessage().contains("Resource not found")) { + BeforeActions.scenario.write("Source PubSub topic " + pubSubSourceTopic + " does not exist."); + } else { + Assert.fail(e.getMessage()); + } + } + } + + // After hooks run highest order first, so the subscription is removed before its topic (order = 1). + @After(order = 2, value = "@PUBSUB_SUBSCRIPTION_TEST") + public static void deleteSourcePubSubSubscription() { + try (com.google.cloud.pubsub.v1.SubscriptionAdminClient client = + com.google.cloud.pubsub.v1.SubscriptionAdminClient.create()) { + client.deleteSubscription(com.google.pubsub.v1.ProjectSubscriptionName.format( + PluginPropertyUtils.pluginProp(ConstantsUtil.PROJECT_ID), pubSubSourceSubscription)); + BeforeActions.scenario.write("Deleted source PubSub subscription " + pubSubSourceSubscription); + pubSubSourceSubscription = StringUtils.EMPTY; + } catch (Exception e) { + if (e.getMessage().contains("Invalid resource name given") || e.getMessage().contains("Resource not found")) { + BeforeActions.scenario.write("Source PubSub subscription " + pubSubSourceSubscription + " does not exist."); + } else { + Assert.fail(e.getMessage()); + } + } + } + + public static void publishMessage() throws IOException, InterruptedException { + String dataMessage1 = PluginPropertyUtils.pluginProp("firstMessage"); + String dataMessage2 = PluginPropertyUtils.pluginProp("secondMessage"); + List dataMessagesList = Arrays.asList(dataMessage1, dataMessage2); + PubSubClient.publishMessagesWithPubSub(PluginPropertyUtils.pluginProp + (ConstantsUtil.PROJECT_ID), pubSubSourceTopic, dataMessagesList); + } + @Before(order = 1, value = "@PUBSUB_SINK_TEST") public static void createTargetPubSubTopic() { pubSubTargetTopic = "cdf-e2e-test-" + UUID.randomUUID(); @@ -1141,7 +1200,7 @@ public static void createSinkBQDedupeTable()
throws IOException, InterruptedExce @Before(value = "@BQ_INSERT_INT_SOURCE_TEST") public static void createSourceBQTable() throws IOException, InterruptedException { - bqSourceTable = "E2E_TARGET_" + UUID.randomUUID().toString().replaceAll("-", "_"); + bqSourceTable = "E2E_SOURCE_" + UUID.randomUUID().toString().replaceAll("-", "_"); PluginPropertyUtils.addPluginProp("bqSourceTable", bqSourceTable); BeforeActions.scenario.write("BQ source table name - " + bqSourceTable); BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqSourceTable + "` " + diff --git a/src/e2e-test/java/io/cdap/plugin/pubsub/locators/PubSubLocators.java b/src/e2e-test/java/io/cdap/plugin/pubsub/locators/PubSubLocators.java index 0019d5c3b1..a7589eb9db 100644 --- a/src/e2e-test/java/io/cdap/plugin/pubsub/locators/PubSubLocators.java +++ b/src/e2e-test/java/io/cdap/plugin/pubsub/locators/PubSubLocators.java @@ -74,4 +74,19 @@ public static WebElement selectedFormat(String format) { return SeleniumDriver.getDriver() .findElement(By.xpath("//*[@data-cy='select-format']/div[text()='" + format + "']")); } + + @FindBy(how = How.XPATH, using = "//span[contains(text(), \"Batch interval\")]//following-sibling::div//select[1]") + public static WebElement batchTime; + + @FindBy(how = How.XPATH, using = "//span[contains(text(), \"Batch interval\")]//following-sibling::div//select[2]") + public static WebElement timeSelect; + + @FindBy(how = How.XPATH, using = "//button[@data-testid='config-apply-close']") + public static WebElement saveButton; + + @FindBy(how = How.XPATH, using = "//*[@data-cy='pipeline-configure-modeless-btn']") + public static WebElement configButton; + + @FindBy(how = How.XPATH, using = "//*[@data-cy='tab-content-Pipeline config']") + public static WebElement pipelineConfig; } diff --git a/src/e2e-test/java/io/cdap/plugin/pubsub/runners/sinkrunner/TestRunner.java b/src/e2e-test/java/io/cdap/plugin/pubsub/runners/sinkrunner/TestRunner.java index 
c49b12f04c..e44c75b407 100644 --- a/src/e2e-test/java/io/cdap/plugin/pubsub/runners/sinkrunner/TestRunner.java +++ b/src/e2e-test/java/io/cdap/plugin/pubsub/runners/sinkrunner/TestRunner.java @@ -27,7 +27,7 @@ features = {"src/e2e-test/features"}, glue = {"io.cdap.plugin.pubsub.stepsdesign", "io.cdap.plugin.gcs.stepsdesign", "io.cdap.plugin.common.stepsdesign", "io.cdap.plugin.bigquery.stepsdesign", "stepsdesign"}, - tags = {"@PubSub_Sink"}, + tags = {"@PubSub_Sink or @PubSub_DataStream"}, monochrome = true, plugin = {"pretty", "html:target/cucumber-html-report/pubsub-sink", "json:target/cucumber-reports/cucumber-pubsub-sink.json", diff --git a/src/e2e-test/java/io/cdap/plugin/pubsub/runners/sinkrunner/TestRunnerRequired.java b/src/e2e-test/java/io/cdap/plugin/pubsub/runners/sinkrunner/TestRunnerRequired.java index 352d31a6c3..c9ee839731 100644 --- a/src/e2e-test/java/io/cdap/plugin/pubsub/runners/sinkrunner/TestRunnerRequired.java +++ b/src/e2e-test/java/io/cdap/plugin/pubsub/runners/sinkrunner/TestRunnerRequired.java @@ -27,7 +27,7 @@ features = {"src/e2e-test/features"}, glue = {"io.cdap.plugin.pubsub.stepsdesign", "io.cdap.plugin.gcs.stepsdesign", "io.cdap.plugin.common.stepsdesign", "io.cdap.plugin.bigquery.stepsdesign", "stepsdesign"}, - tags = {"@PubSub_Sink_Required"}, + tags = {"@PubSub_Sink_Required or @PubSub_DataStream_Required"}, monochrome = true, plugin = {"pretty", "html:target/cucumber-html-report/pubsub-sink-required", "json:target/cucumber-reports/cucumber-pubsub-sink-required.json", diff --git a/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSink.java b/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSink.java index 927b0d3c7d..17888b0fdf 100644 --- a/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSink.java +++ b/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSink.java @@ -27,13 +27,16 @@ import io.cdap.plugin.utils.E2EHelper; import io.cdap.plugin.utils.E2ETestConstants; import
io.cdap.plugin.utils.PubSubClient; +import io.cucumber.java.en.And; import io.cucumber.java.en.Then; import io.cucumber.java.en.When; import org.apache.commons.lang3.StringUtils; import org.junit.Assert; +import org.openqa.selenium.support.ui.Select; import stepsdesign.BeforeActions; import java.io.IOException; +import java.util.concurrent.TimeUnit; /** * PubSub Sink Plugin related step design. @@ -234,4 +237,31 @@ public void enterRuntimeArgumentValueForPubSubSinkPropertyTopicKey(String runtim public void clickOnPreviewDataForPubSubSink() { openSinkPluginPreviewData("GooglePublisher"); } + + @Then("Subscribe to the messages") + public void subscribeToTheMessages() throws InterruptedException { + TimeUnit time = TimeUnit.SECONDS; + time.sleep(60); + PubSubClient.subscribeAsyncExample(PluginPropertyUtils.pluginProp(ConstantsUtil.PROJECT_ID), + TestSetupHooks.pubSubSourceSubscription); + } + + @And("Click on batch time and select format") + public void clickOnBatchTimeAndSelectFormat() { + Select select = new Select(PubSubLocators.batchTime); + select.selectByIndex(0); + Select selectformat = new Select(PubSubLocators.timeSelect); + selectformat.selectByIndex(1); + ElementHelper.clickOnElement(PubSubLocators.saveButton); + } + + @And("Click on configure button") + public void clickOnConfigureButton() { + ElementHelper.clickOnElement(PubSubLocators.configButton); + } + + @And("Click on pipeline config") + public void clickOnPipelineConfig() { + ElementHelper.clickOnElement(PubSubLocators.pipelineConfig); + } } diff --git a/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSource.java b/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSource.java index d18ef2ea9c..11a5da516e 100644 --- a/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSource.java +++ b/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSource.java @@ -16,10 +16,17 @@ package io.cdap.plugin.pubsub.stepsdesign; +import io.cdap.e2e.pages.locators.CdfStudioLocators; import 
io.cdap.e2e.utils.CdfHelper; +import io.cdap.e2e.utils.ElementHelper; +import io.cdap.plugin.common.stepsdesign.TestSetupHooks; +import io.cdap.plugin.pubsub.actions.PubSubActions; import io.cucumber.java.en.Then; import io.cucumber.java.en.When; +import java.io.IOException; +import java.util.concurrent.TimeUnit; + /** * PubSub Source Plugin related step design. */ @@ -35,4 +42,32 @@ public void sourceIsPubSub() { public void openThePubSubSourceProperties() { openSourcePluginProperties("GooglePublisher"); } + + @Then("Enter PubSub source property topic name") + public void enterPubSubSourcePropertyTopicName() { + PubSubActions.enterPubSubTopic(TestSetupHooks.pubSubSourceTopic); + } + + @Then("Enter PubSub source property subscription name") + public void enterPubSubSourcePropertySubscriptionName() { + PubSubActions.enterSubscription(TestSetupHooks.pubSubSourceSubscription); + } + + @Then("Publish the messages") + public void publishTheMessage() throws IOException, InterruptedException { + TimeUnit time = TimeUnit.SECONDS; + time.sleep(120); + TestSetupHooks.publishMessage(); + } + + @Then("Enter runtime argument value for PubSub source property topic key {string}") + public void enterRuntimeArgumentValueForPubSubSourcePropertyTopicKey(String runtimeArgumentKey) { + ElementHelper.sendKeys(CdfStudioLocators.runtimeArgsValue(runtimeArgumentKey), TestSetupHooks.pubSubSourceTopic); + } + + @Then("Enter runtime argument value for PubSub source property subscription key {string}") + public void enterRuntimeArgumentValueForPubSubSourcePropertySubscriptionKey(String runtimeArgumentKey) { + ElementHelper.sendKeys(CdfStudioLocators.runtimeArgsValue(runtimeArgumentKey), + TestSetupHooks.pubSubSourceSubscription); + } } diff --git a/src/e2e-test/java/io/cdap/plugin/utils/PubSubClient.java b/src/e2e-test/java/io/cdap/plugin/utils/PubSubClient.java index 0331627140..6fb4f94a0c 100644 --- a/src/e2e-test/java/io/cdap/plugin/utils/PubSubClient.java +++ 
b/src/e2e-test/java/io/cdap/plugin/utils/PubSubClient.java @@ -1,5 +1,5 @@ /* - * Copyright © 2021 Cask Data, Inc. + * Copyright © 2024 Cask Data, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of @@ -16,19 +16,42 @@ package io.cdap.plugin.utils; +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.api.gax.rpc.ApiException; +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.cloud.pubsub.v1.SubscriptionAdminClient; +import com.google.cloud.pubsub.v1.SubscriptionAdminSettings; import com.google.cloud.pubsub.v1.TopicAdminClient; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.PushConfig; import com.google.pubsub.v1.Topic; import com.google.pubsub.v1.TopicName; import io.cdap.e2e.utils.ConstantsUtil; import io.cdap.e2e.utils.PluginPropertyUtils; +import io.grpc.StatusRuntimeException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * Represents PubSub client. 
*/ public class PubSubClient { + private static final Logger logger = LoggerFactory.getLogger(PubSubClient.class); + public static Topic createTopic(String topicId) throws IOException { try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) { TopicName topicName = TopicName.of(PluginPropertyUtils.pluginProp(ConstantsUtil.PROJECT_ID), topicId); @@ -36,6 +59,27 @@ public static Topic createTopic(String topicId) throws IOException { } } + /** + * Create the subscription. + */ + public static void createSubscription(String subscriptionId, String topicId) throws IOException { + ProjectSubscriptionName subscriptionName = null; + try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create( + SubscriptionAdminSettings.newBuilder().build())) { + TopicName topicName = TopicName.of(PluginPropertyUtils.pluginProp(ConstantsUtil.PROJECT_ID), topicId); + subscriptionName = ProjectSubscriptionName.of + (PluginPropertyUtils.pluginProp(ConstantsUtil.PROJECT_ID), subscriptionId); + subscriptionAdminClient.createSubscription(subscriptionName, topicName, PushConfig.getDefaultInstance(), 60); + logger.info("Subscription created: " + subscriptionName.toString()); + } catch (StatusRuntimeException e) { + if ("ALREADY_EXISTS".equals(e.getStatus().getCode().name())) { + logger.info("Subscription already exists: {}", subscriptionName.toString()); + } else { + logger.info("Error creating subscription", e); + } + } + } + public static void deleteTopic(String topicId) throws IOException { try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) { TopicName topicName = TopicName.of(PluginPropertyUtils.pluginProp(ConstantsUtil.PROJECT_ID), topicId); @@ -56,4 +100,90 @@ public static String getTopicCmekKey(String topicId) throws IOException { return getTopic(topicId).getKmsKeyName(); } + + public static void publishMessagesWithPubSub(String projectId, String topicId, List dataMessages) + throws IOException, InterruptedException { + TopicName 
topicName = TopicName.of(projectId, topicId); + Publisher publisher = null; + try { + publisher = Publisher.newBuilder(topicName).build(); + for (final String message : dataMessages) { + ByteString data = ByteString.copyFromUtf8(message); + PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build(); + ApiFuture future = publisher.publish(pubsubMessage); + + /** + * Adding an asynchronous callback to handle success / failure. + */ + ApiFutures.addCallback( future, + new ApiFutureCallback() { + + @Override + public void onFailure(Throwable throwable) { + if (throwable instanceof ApiException) { + ApiException apiException = ((ApiException) throwable); + /** + * details on the API exception. + */ + logger.info(String.valueOf(apiException.getStatusCode().getCode())); + logger.info(String.valueOf(apiException.isRetryable())); + } + logger.info("Error publishing message : " + message); + } + @Override + public void onSuccess(String messageId) { + /** + * Once published, returns server-assigned message ids (unique within the topic). + */ + logger.info("Published message ID: " + messageId); + } + }, + MoreExecutors.directExecutor()); + } + } finally { + if (publisher != null) { + /** + * When finished with the publisher, shutdown to free up resources. + */ + publisher.shutdown(); + publisher.awaitTermination(1, TimeUnit.MINUTES); + } + } + } + + public static void subscribeAsyncExample(String projectId, String subscriptionId) { + ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(projectId, subscriptionId); + /** + * Instantiate an asynchronous message receiver. + */ + MessageReceiver receiver = + (PubsubMessage message, AckReplyConsumer consumer) -> { + /** + * Handle incoming message, then ack the received message. 
+ */ + logger.info("Id: " + message.getMessageId()); + logger.info("Data: " + message.getData().toStringUtf8()); + consumer.ack(); + }; + + Subscriber subscriber = null; + try { + subscriber = Subscriber.newBuilder(subscriptionName, receiver).build(); + /** + * Start the subscriber. + */ + subscriber.startAsync().awaitRunning(); + logger.info("Listening for messages on {}", subscriptionName); + /** + * Allow the subscriber to run for 200s unless an unrecoverable error occurs. + */ + subscriber.awaitTerminated(200, TimeUnit.SECONDS); + } catch (TimeoutException timeoutException) { + logger.error("Timeout exception: ", timeoutException); + /** + * Shut down the subscriber after 200s. Stop receiving messages. + */ + subscriber.stopAsync(); + } + } } diff --git a/src/e2e-test/resources/pluginParameters.properties b/src/e2e-test/resources/pluginParameters.properties index ae191f7c90..29f6f16754 100644 --- a/src/e2e-test/resources/pluginParameters.properties +++ b/src/e2e-test/resources/pluginParameters.properties @@ -243,6 +243,10 @@ pubSubErrorThreshold=0 pubSubStringValue=one pubSubNegativeValue=-100 pubsubDelimiter=@ +pubSubSourceSubscription=dummy +pubSubSourceTopic=dummy +firstMessage=helloMessage +secondMessage=hiMessage ## PUBSUBSINK-PLUGIN-PROPERTIES-END ## GCSDELETE-PLUGIN-PROPERTIES-START diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java b/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java index fbb96c1f16..c5e2f4a6a2 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java @@ -393,6 +393,11 @@ public static final class Config extends AbstractBigQueryActionConfig { private Config(@Nullable String project, @Nullable String serviceAccountType, @Nullable String serviceFilePath, @Nullable String serviceAccountJson, @Nullable String dataset, @Nullable String table, @Nullable String location, @Nullable String cmekKey, @Nullable
String dialect, @Nullable String sql, + @Nullable String mode, @Nullable Boolean storeResults, @Nullable String jobLabelKeyValue) { + @Nullable String mode, @Nullable Boolean storeResults, @Nullable String jobLabelKeyValue, + @Nullable String rowAsArguments, @Nullable Boolean retryOnBackendError, + @Nullable Long initialRetryDuration, @Nullable Long maxRetryDuration, + @Nullable Double retryMultiplier, @Nullable Integer maxRetryCount) { @Nullable String mode, @Nullable Boolean storeResults, @Nullable String jobLabelKeyValue, @Nullable String rowAsArguments, @Nullable Boolean retryOnBackendError, @Nullable Long initialRetryDuration, @Nullable Long maxRetryDuration, @@ -411,6 +416,12 @@ private Config(@Nullable String project, @Nullable String serviceAccountType, @N this.rowAsArguments = rowAsArguments; this.storeResults = storeResults; this.jobLabelKeyValue = jobLabelKeyValue; + this.jobLabelKeyValue = jobLabelKeyValue; + this.retryOnBackendError = retryOnBackendError; + this.initialRetryDuration = initialRetryDuration; + this.maxRetryDuration = maxRetryDuration; + this.maxRetryCount = maxRetryCount; + this.retryMultiplier = retryMultiplier; this.retryOnBackendError = retryOnBackendError; this.initialRetryDuration = initialRetryDuration; this.maxRetryDuration = maxRetryDuration; @@ -481,6 +492,31 @@ public int getMaxRetryCount() { return maxRetryCount == null ? DEFAULT_MAX_RETRY_COUNT : maxRetryCount; } + @Nullable + public String getJobLabelKeyValue() { + return jobLabelKeyValue; + } + + public boolean getRetryOnBackendError() { + return retryOnBackendError == null || retryOnBackendError; + } + + public long getInitialRetryDuration() { + return initialRetryDuration == null ? DEFAULT_INITIAL_RETRY_DURATION_SECONDS : initialRetryDuration; + } + + public long getMaxRetryDuration() { + return maxRetryDuration == null ? DEFULT_MAX_RETRY_DURATION_SECONDS : maxRetryDuration; + } + + public double getRetryMultiplier() { + return retryMultiplier == null ? 
DEFAULT_RETRY_MULTIPLIER : retryMultiplier; + } + + public int getMaxRetryCount() { + return maxRetryCount == null ? DEFAULT_MAX_RETRY_COUNT : maxRetryCount; + } + @Override public void validate(FailureCollector failureCollector) { validate(failureCollector, Collections.emptyMap()); @@ -533,6 +569,10 @@ public void validate(FailureCollector failureCollector, Map argu validateJobLabelKeyValue(failureCollector); } + if (!containsMacro(NAME_BQ_JOB_LABELS)) { + validateJobLabelKeyValue(failureCollector); + } + failureCollector.getOrThrowException(); } @@ -550,6 +590,49 @@ void validateJobLabelKeyValue(FailureCollector failureCollector) { failureCollector.getOrThrowException(); } + void validateJobLabelKeyValue(FailureCollector failureCollector) { + BigQueryUtil.validateJobLabelKeyValue(jobLabelKeyValue, failureCollector, NAME_BQ_JOB_LABELS); + } + + void validateRetryConfiguration(FailureCollector failureCollector, Long initialRetryDuration, + Long maxRetryDuration, Integer maxRetryCount, Double retryMultiplier) { + if (initialRetryDuration != null && initialRetryDuration <= 0) { + failureCollector.addFailure("Initial retry duration must be greater than 0.", + "Please specify a valid initial retry duration.") + .withConfigProperty(NAME_INITIAL_RETRY_DURATION); + } + if (maxRetryDuration != null && maxRetryDuration <= 0) { + failureCollector.addFailure("Max retry duration must be greater than 0.", + "Please specify a valid max retry duration.") + .withConfigProperty(NAME_MAX_RETRY_DURATION); + } + if (maxRetryCount != null && maxRetryCount <= 0) { + failureCollector.addFailure("Max retry count must be greater than 0.", + "Please specify a valid max retry count.") + .withConfigProperty(NAME_MAX_RETRY_COUNT); + } + if (retryMultiplier != null && retryMultiplier <= 1) { + failureCollector.addFailure("Retry multiplier must be strictly greater than 1.", + "Please specify a valid retry multiplier.") + .withConfigProperty(NAME_RETRY_MULTIPLIER); + } + if (maxRetryDuration != 
null && initialRetryDuration != null && maxRetryDuration <= initialRetryDuration) { + failureCollector.addFailure("Max retry duration must be greater than initial retry duration.", + "Please specify a valid max retry duration.") + .withConfigProperty(NAME_MAX_RETRY_DURATION); + } + // Verify retry configuration when retry on backend error is enabled and none of the retry configuration + // properties are macros. + if (!containsMacro(NAME_RETRY_ON_BACKEND_ERROR) && retryOnBackendError != null && retryOnBackendError && + !containsMacro(NAME_INITIAL_RETRY_DURATION) && !containsMacro(NAME_MAX_RETRY_DURATION) && + !containsMacro(NAME_MAX_RETRY_COUNT) && !containsMacro(NAME_RETRY_MULTIPLIER)) { + validateRetryConfiguration( + failureCollector, initialRetryDuration, maxRetryDuration, maxRetryCount, retryMultiplier + ); + } + failureCollector.getOrThrowException(); + } + void validateRetryConfiguration(FailureCollector failureCollector, Long initialRetryDuration, Long maxRetryDuration, Integer maxRetryCount, Double retryMultiplier) { if (initialRetryDuration != null && initialRetryDuration <= 0) { @@ -722,6 +805,41 @@ public Builder setJobLabelKeyValue(@Nullable String jobLabelKeyValue) { return this; } + public Builder setJobLabelKeyValue(@Nullable String jobLabelKeyValue) { + this.jobLabelKeyValue = jobLabelKeyValue; + return this; + } + + public Builder setRetryOnBackendError(@Nullable Boolean retryOnBackendError) { + this.retryOnBackendError = retryOnBackendError; + return this; + } + + public Builder setStoreResults(@Nullable Boolean storeResults) { + this.storeResults = storeResults; + return this; + } + + public Builder setInitialRetryDuration(@Nullable Long initialRetryDuration) { + this.initialRetryDuration = initialRetryDuration; + return this; + } + + public Builder setMaxRetryDuration(@Nullable Long maxRetryDuration) { + this.maxRetryDuration = maxRetryDuration; + return this; + } + + public Builder setMaxRetryCount(@Nullable Integer maxRetryCount) { + 
this.maxRetryCount = maxRetryCount; + return this; + } + + public Builder setRetryMultiplier(@Nullable Double retryMultiplier) { + this.retryMultiplier = retryMultiplier; + return this; + } + public Builder setRetryOnBackendError(@Nullable Boolean retryOnBackendError) { this.retryOnBackendError = retryOnBackendError; return this; @@ -766,6 +884,15 @@ public Config build() { sql, mode, storeResults, + jobLabelKeyValue + storeResults, + jobLabelKeyValue, + rowAsArguments, + retryOnBackendError, + initialRetryDuration, + maxRetryDuration, + retryMultiplier, + maxRetryCount jobLabelKeyValue, rowAsArguments, retryOnBackendError, diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryRecordToJson.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryRecordToJson.java index c397e52c74..ec2c70edb7 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryRecordToJson.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryRecordToJson.java @@ -250,10 +250,14 @@ private static void writeArray(JsonWriter writer, } if (element instanceof StructuredRecord) { StructuredRecord record = (StructuredRecord) element; + path.add(name); processRecord(writer, record, Objects.requireNonNull(record.getSchema().getFields()), path, jsonStringFieldsPaths); + path.remove(path.size() - 1); } else { + path.add(name); write(writer, name, true, element, componentSchema, path, jsonStringFieldsPaths); + path.remove(path.size() - 1); } } } diff --git a/src/test/java/io/cdap/plugin/gcp/bigquery/BigQueryRecordToJsonTest.java b/src/test/java/io/cdap/plugin/gcp/bigquery/BigQueryRecordToJsonTest.java index e7c1f82aee..962a44fd28 100644 --- a/src/test/java/io/cdap/plugin/gcp/bigquery/BigQueryRecordToJsonTest.java +++ b/src/test/java/io/cdap/plugin/gcp/bigquery/BigQueryRecordToJsonTest.java @@ -422,58 +422,6 @@ public void testJsonStringWithEmptyArray() throws IOException { } } - @Test - public void testJsonStringWithStringArray() throws IOException { - Schema 
recordSchema = Schema.recordOf("record", - Schema.Field.of("arrayOfString", Schema.arrayOf(Schema.of(Schema.Type.STRING)))); - List jsonStringList = ImmutableList.of("{\"arrayKey1\": \"arrayValue1\"}", - "{\"arrayKey2\": \"arrayValue2\"}"); - StructuredRecord record = StructuredRecord.builder(recordSchema).set("arrayOfString", jsonStringList).build(); - Set jsonStringFieldsPaths = ImmutableSet.of("arrayOfString"); - try (JsonTreeWriter writer = new JsonTreeWriter()) { - writer.beginObject(); - for (Schema.Field recordField : Objects.requireNonNull(record.getSchema().getFields())) { - if (recordSchema.getField(recordField.getName()) != null) { - BigQueryRecordToJson.write(writer, recordField.getName(), record.get(recordField.getName()), - recordField.getSchema(), jsonStringFieldsPaths); - } - } - writer.endObject(); - JsonObject actual = writer.get().getAsJsonObject(); - String actualJsonString = actual.get("arrayOfString").getAsJsonArray().toString(); - String expectedJsonString = "[{\"arrayKey1\":\"arrayValue1\"},{\"arrayKey2\":\"arrayValue2\"}]"; - Assert.assertEquals(expectedJsonString, actualJsonString); - } - } - - @Test - public void testJsonStringWithArrayAndNestedRecord() throws IOException { - Schema nestedRecordSchema = Schema.recordOf("nestedRecord", - Schema.Field.of("nestedJsonString", Schema.of(Schema.Type.STRING))); - StructuredRecord nestedRecord = StructuredRecord.builder(nestedRecordSchema) - .set("nestedJsonString", "{\"nestedKey1\":\"nestedValue1\"}").build(); - Schema recordSchema = Schema.recordOf("record", - Schema.Field.of("arrayOfNestedRecord", Schema.arrayOf(nestedRecordSchema))); - List nestedRecordList = ImmutableList.of(nestedRecord); - StructuredRecord record = StructuredRecord.builder(recordSchema).set("arrayOfNestedRecord", nestedRecordList) - .build(); - - Set jsonStringFieldsPaths = ImmutableSet.of("arrayOfNestedRecord.nestedJsonString"); - try (JsonTreeWriter writer = new JsonTreeWriter()) { - writer.beginObject(); - for 
(Schema.Field recordField : Objects.requireNonNull(record.getSchema().getFields())) { - if (recordSchema.getField(recordField.getName()) != null) { - BigQueryRecordToJson.write(writer, recordField.getName(), record.get(recordField.getName()), - recordField.getSchema(), jsonStringFieldsPaths); - } - } - writer.endObject(); - JsonObject actual = writer.get().getAsJsonObject(); - String actualJsonString = actual.get("arrayOfNestedRecord").toString(); - String expectedJsonString = "[{\"nestedJsonString\":{\"nestedKey1\":\"nestedValue1\"}}]"; - Assert.assertEquals(expectedJsonString, actualJsonString); - } - } /** * Empty JSON string is not a valid JSON string and should throw an exception.