From 9108fee7b6744358c85292f647a4e9f3fd44d656 Mon Sep 17 00:00:00 2001 From: priyabhatnagar Date: Fri, 15 Dec 2023 14:57:21 +0530 Subject: [PATCH 1/8] bq additional scenarios --- .../common/stepsdesign/TestSetupHooks.java | 31 ++++++++++++++++++- .../io/cdap/plugin/utils/PubSubClient.java | 21 +++++++++++++ 2 files changed, 51 insertions(+), 1 deletion(-) diff --git a/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java b/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java index 3fc9a35a94..f96842234d 100644 --- a/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java +++ b/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java @@ -62,6 +62,8 @@ public class TestSetupHooks { public static String bqSourceTable2 = StringUtils.EMPTY; public static String bqSourceView = StringUtils.EMPTY; public static String pubSubTargetTopic = StringUtils.EMPTY; + public static String pubSubSourceTopic = StringUtils.EMPTY; + public static String pubSubSourceSubscription = StringUtils.EMPTY; public static String spannerInstance = StringUtils.EMPTY; public static String spannerDatabase = StringUtils.EMPTY; public static String spannerSourceTable = StringUtils.EMPTY; @@ -486,6 +488,33 @@ public static void createTargetPubSubTopic() { BeforeActions.scenario.write("Target PubSub topic " + pubSubTargetTopic); } + @Before(order = 1, value = "@PUBSUB_SOURCE_TEST") + public static void createSourcePubSubTopic() throws IOException { + PubSubClient.createTopic(pubSubSourceTopic); + pubSubSourceTopic = "cdf-e2e-test-" + UUID.randomUUID(); + BeforeActions.scenario.write("Target PubSub topic " + pubSubSourceTopic); + } + + @Before(order = 1, value = "@PUBSUB_SUBSCRIPTION_TEST") + public static void createSubscriptionPubSubTopic() { + pubSubSourceSubscription = "cdf-e2e-test-" + UUID.randomUUID(); + BeforeActions.scenario.write("Source PubSub subscription " + pubSubSourceSubscription); + } + @After(order = 1, value = "@PUBSUB_SOURCE_TEST") + public static void deleteSourcePubSubTopic() { + try { + PubSubClient.deleteTopic(pubSubSourceTopic); + BeforeActions.scenario.write("Deleted target PubSub topic " + pubSubSourceTopic); + pubSubSourceTopic = StringUtils.EMPTY; + } catch (Exception e) { + if (e.getMessage().contains("Invalid resource name given") || e.getMessage().contains("Resource not found")) { + BeforeActions.scenario.write("Source PubSub topic " + pubSubSourceTopic + " does not exist."); + } else { + Assert.fail(e.getMessage()); + } + } + } + @After(order = 1, value = "@PUBSUB_SINK_TEST") public static void deleteTargetPubSubTopic() { try { @@ -1141,7 +1170,7 @@ public static void createSinkBQDedupeTable() throws IOException, InterruptedExce @Before(value = "@BQ_INSERT_INT_SOURCE_TEST") public static void createSourceBQTable() throws IOException, InterruptedException { - bqSourceTable = "E2E_TARGET_" + UUID.randomUUID().toString().replaceAll("-", "_"); + bqSourceTable = "E2E_SOURCE_" + UUID.randomUUID().toString().replaceAll("-", "_"); PluginPropertyUtils.addPluginProp("bqSourceTable", bqSourceTable); BeforeActions.scenario.write("BQ source table name - " + bqSourceTable); BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqSourceTable + "` " + diff --git a/src/e2e-test/java/io/cdap/plugin/utils/PubSubClient.java b/src/e2e-test/java/io/cdap/plugin/utils/PubSubClient.java index 0331627140..861e06111b 100644 --- a/src/e2e-test/java/io/cdap/plugin/utils/PubSubClient.java +++ b/src/e2e-test/java/io/cdap/plugin/utils/PubSubClient.java @@ -16,11 +16,17 @@ package io.cdap.plugin.utils; +import com.google.api.gax.rpc.AlreadyExistsException; +import com.google.cloud.pubsub.v1.SubscriptionAdminClient; +import com.google.cloud.pubsub.v1.SubscriptionAdminSettings; import com.google.cloud.pubsub.v1.TopicAdminClient; +import com.google.pubsub.v1.PushConfig; +import com.google.pubsub.v1.Subscription; import com.google.pubsub.v1.Topic; import com.google.pubsub.v1.TopicName; import io.cdap.e2e.utils.ConstantsUtil; import io.cdap.e2e.utils.PluginPropertyUtils; +import com.google.pubsub.v1.ProjectSubscriptionName; import java.io.IOException; @@ -36,6 +42,21 @@ public static Topic createTopic(String topicId) throws IOException { } } + // Create the subscription + public static Subscription createSubscription(String subscriptionId, String topicId) throws IOException { + ProjectSubscriptionName subscriptionName = null; + try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create( + SubscriptionAdminSettings.newBuilder().build())) { + TopicName topicName = TopicName.of(PluginPropertyUtils.pluginProp(ConstantsUtil.PROJECT_ID), topicId); + subscriptionName = ProjectSubscriptionName.of(ConstantsUtil.PROJECT_ID, subscriptionId); + subscriptionAdminClient.createSubscription(subscriptionName, topicName, PushConfig.getDefaultInstance(), 60); + System.out.println("Subscription created: " + subscriptionName.toString()); + } catch (AlreadyExistsException e) { + System.out.println("Subscription already exists: " + subscriptionName.toString()); + } + return null; + } + public static void deleteTopic(String topicId) throws IOException { try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) { TopicName topicName = TopicName.of(PluginPropertyUtils.pluginProp(ConstantsUtil.PROJECT_ID), topicId); From 678d82ad8bfc556e0b55bd2ffb8895a48974d8e5 Mon Sep 17 00:00:00 2001 From: neerajsinghal Date: Tue, 2 Jan 2024 16:35:50 +0530 Subject: [PATCH 2/8] pubsub cases --- .../features/pubsub/sink/BQToPubSub.feature | 38 +++++++ .../common/stepsdesign/TestSetupHooks.java | 10 +- .../plugin/pubsub/stepsdesign/PubSubSink.java | 6 ++ .../pubsub/stepsdesign/PubSubSource.java | 23 ++++- .../io/cdap/plugin/utils/PubSubClient.java | 99 ++++++++++++++++++- .../resources/pluginParameters.properties | 2 + 6 files changed, 168 insertions(+), 10 deletions(-) diff --git a/src/e2e-test/features/pubsub/sink/BQToPubSub.feature b/src/e2e-test/features/pubsub/sink/BQToPubSub.feature index 8a1b5aeabd..adb73090c1 100644 --- a/src/e2e-test/features/pubsub/sink/BQToPubSub.feature +++ b/src/e2e-test/features/pubsub/sink/BQToPubSub.feature @@ -40,3 +40,41 @@ Feature: PubSub-Sink - Verification of BigQuery to PubSub successful data transf Then Validate OUT record count is equal to IN record count Then Open and capture logs Then Validate the cmek key "cmekPubSub" of target PubSub topic if cmek is enabled + + @PUBSUB_SOURCE_TEST @PUBSUB_SINK_TEST @PUBSUB_SUBSCRIPTION_TEST + Scenario: pubsub flow + Given Open Datafusion Project to configure pipeline + When Select data pipeline type as: "Realtime" + When Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "Pub/Sub" from the plugins list as: "Source" + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "Pub/Sub" from the plugins list as: "Sink" + Then Connect plugins: "Pub/Sub" and "Pub/Sub2" to establish connection + Then Navigate to the properties page of plugin: "Pub/Sub" + Then Override Service account details if set in environment variables + Then Enter input plugin property: "referenceName" with value: "BQReferenceName" + Then Enter PubSub source property subscription name + Then Enter PubSub source property topic name + Then Validate "Pub/Sub" plugin properties + And Close the Plugin Properties page + Then Open the PubSub sink properties + Then Enter PubSub property projectId "projectId" + Then Override Service account details if set in environment variables + Then Enter PubSub property reference name + Then Enter PubSub source property topic name + Then Validate "Pub/Sub" plugin properties + And Close the Plugin Properties page + + Then Save the pipeline + Then Preview and run the pipeline + Then Wait till pipeline preview is in running state + Then Open and capture pipeline preview logs + Then Close the pipeline logs + Then Close the preview + Then Deploy the pipeline + Then Run the Pipeline in Runtime + # Then Wait till pipeline is in running state + #Then Open and capture logs + #Then Verify the pipeline status is "Succeeded" + Then Publish the message + Then Subscribe to the messages diff --git a/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java b/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java index f96842234d..0b564f587f 100644 --- a/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java +++ b/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java @@ -490,9 +490,9 @@ public static void createTargetPubSubTopic() { @Before(order = 1, value = "@PUBSUB_SOURCE_TEST") public static void createSourcePubSubTopic() throws IOException { - PubSubClient.createTopic(pubSubSourceTopic); pubSubSourceTopic = "cdf-e2e-test-" + UUID.randomUUID(); - BeforeActions.scenario.write("Target PubSub topic " + pubSubSourceTopic); + PubSubClient.createTopic(pubSubSourceTopic); + BeforeActions.scenario.write("Source PubSub topic " + pubSubSourceTopic); } @Before(order = 1, value = "@PUBSUB_SUBSCRIPTION_TEST") @@ -500,11 +500,11 @@ public static void createSubscriptionPubSubTopic() { pubSubSourceSubscription = "cdf-e2e-test-" + UUID.randomUUID(); BeforeActions.scenario.write("Source PubSub subscription " + pubSubSourceSubscription); } - @After(order = 1, value = "@PUBSUB_SOURCE_TEST") + @After(order = 1, value = "") public static void deleteSourcePubSubTopic() { try { PubSubClient.deleteTopic(pubSubSourceTopic); - BeforeActions.scenario.write("Deleted target PubSub topic " + pubSubSourceTopic); + BeforeActions.scenario.write("Deleted Source PubSub topic " + pubSubSourceTopic); pubSubSourceTopic = StringUtils.EMPTY; } catch (Exception e) { if (e.getMessage().contains("Invalid resource name given") || e.getMessage().contains("Resource not found")) { @@ -515,7 +515,7 @@ public static void deleteSourcePubSubTopic() { } } - @After(order = 1, value = "@PUBSUB_SINK_TEST") + @After(order = 1, value = "") public static void deleteTargetPubSubTopic() { try { PubSubClient.deleteTopic(pubSubTargetTopic); diff --git a/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSink.java b/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSink.java index 927b0d3c7d..1cb687e2f0 100644 --- a/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSink.java +++ b/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSink.java @@ -234,4 +234,10 @@ public void enterRuntimeArgumentValueForPubSubSinkPropertyTopicKey(String runtim public void clickOnPreviewDataForPubSubSink() { openSinkPluginPreviewData("GooglePublisher"); } + @Then("Subscribe to the messages") + public void subscribeToTheMessages() + { + PubSubClient.subscribeAsyncExample(PluginPropertyUtils.pluginProp(ConstantsUtil.PROJECT_ID),TestSetupHooks.pubSubSourceSubscription); + } + } diff --git a/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSource.java b/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSource.java index d18ef2ea9c..29cb952538 100644 --- a/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSource.java +++ b/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSource.java @@ -17,8 +17,14 @@ package io.cdap.plugin.pubsub.stepsdesign; import io.cdap.e2e.utils.CdfHelper; +import io.cdap.e2e.utils.ConstantsUtil; +import io.cdap.e2e.utils.PluginPropertyUtils; +import io.cdap.plugin.common.stepsdesign.TestSetupHooks; +import io.cdap.plugin.pubsub.actions.PubSubActions; +import io.cdap.plugin.utils.PubSubClient; import io.cucumber.java.en.Then; import io.cucumber.java.en.When; +import java.io.IOException; /** * PubSub Source Plugin related step design. @@ -33,6 +39,19 @@ public void sourceIsPubSub() { @Then("Open the PubSub source properties") public void openThePubSubSourceProperties() { - openSourcePluginProperties("GooglePublisher"); + openSourcePluginProperties("pubsub"); } -} + + @Then("Enter PubSub source property topic name") + public void enterPubSubSourcePropertyTopicName() { + PubSubActions.enterPubSubTopic(TestSetupHooks.pubSubSourceTopic); } + @Then("Enter PubSub source property subscription name") + public void enterPubSubSourcePropertySubscriptionName() { + PubSubActions.enterSubscription(TestSetupHooks.pubSubSourceSubscription); + } + + @Then("Publish the message") + public void publishTheMessage() throws IOException, InterruptedException { + PubSubClient.publishWithErrorHandlerExample(PluginPropertyUtils.pluginProp(ConstantsUtil.PROJECT_ID), TestSetupHooks.pubSubSourceTopic); + } +} \ No newline at end of file diff --git a/src/e2e-test/java/io/cdap/plugin/utils/PubSubClient.java b/src/e2e-test/java/io/cdap/plugin/utils/PubSubClient.java index 861e06111b..94a379a97d 100644 --- a/src/e2e-test/java/io/cdap/plugin/utils/PubSubClient.java +++ b/src/e2e-test/java/io/cdap/plugin/utils/PubSubClient.java @@ -16,12 +16,22 @@ package io.cdap.plugin.utils; +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; import com.google.api.gax.rpc.AlreadyExistsException; +import com.google.api.gax.rpc.ApiException; +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.cloud.pubsub.v1.Subscriber; import com.google.cloud.pubsub.v1.SubscriptionAdminClient; import com.google.cloud.pubsub.v1.SubscriptionAdminSettings; import com.google.cloud.pubsub.v1.TopicAdminClient; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.PushConfig; -import com.google.pubsub.v1.Subscription; import com.google.pubsub.v1.Topic; import com.google.pubsub.v1.TopicName; import io.cdap.e2e.utils.ConstantsUtil; @@ -29,6 +39,10 @@ import com.google.pubsub.v1.ProjectSubscriptionName; import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * Represents PubSub client. @@ -43,7 +57,7 @@ public static Topic createTopic(String topicId) throws IOException { } // Create the subscription - public static Subscription createSubscription(String subscriptionId, String topicId) throws IOException { + public static void createSubscription(String subscriptionId, String topicId) throws IOException { ProjectSubscriptionName subscriptionName = null; try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create( SubscriptionAdminSettings.newBuilder().build())) { @@ -52,9 +66,9 @@ public static Subscription createSubscription(String subscriptionId, String topi subscriptionAdminClient.createSubscription(subscriptionName, topicName, PushConfig.getDefaultInstance(), 60); System.out.println("Subscription created: " + subscriptionName.toString()); } catch (AlreadyExistsException e) { + assert subscriptionName != null; System.out.println("Subscription already exists: " + subscriptionName.toString()); } - return null; } public static void deleteTopic(String topicId) throws IOException { @@ -77,4 +91,83 @@ public static String getTopicCmekKey(String topicId) throws IOException { return getTopic(topicId).getKmsKeyName(); } + + public static void publishWithErrorHandlerExample(String projectId, String topicId) + throws IOException, InterruptedException { + TopicName topicName = TopicName.of(projectId, topicId); + Publisher publisher = null; + + try { + publisher = Publisher.newBuilder(topicName).build(); + + List messages = Arrays.asList("first message", "second message"); + + for (final String message : messages) { + ByteString data = ByteString.copyFromUtf8(message); + PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build(); + ApiFuture future = publisher.publish(pubsubMessage); + + // Adding an asynchronous callback to handle success / failure + ApiFutures.addCallback( + future, + new ApiFutureCallback() { + + @Override + public void onFailure(Throwable throwable) { + if (throwable instanceof ApiException) { + ApiException apiException = ((ApiException) throwable); + // details on the API exception + // System.out.println(apiException.getStatusCode().getCode()); + //System.out.println(apiException.isRetryable()); + } + System.out.println("Error publishing message : " + message); + } + + @Override + public void onSuccess(String messageId) { + // Once published, returns server-assigned message ids (unique within the topic) + System.out.println("Published message ID: " + messageId); + } + }, + MoreExecutors.directExecutor()); + } + } finally { + if (publisher != null) { + // When finished with the publisher, shutdown to free up resources. + publisher.shutdown(); + publisher.awaitTermination(1, TimeUnit.MINUTES); + } + } + } + + public static void subscribeAsyncExample(String projectId, String subscriptionId) { + ProjectSubscriptionName subscriptionName = + ProjectSubscriptionName.of(projectId, subscriptionId); + + // Instantiate an asynchronous message receiver. + MessageReceiver receiver = + (PubsubMessage message, AckReplyConsumer consumer) -> { + // Handle incoming message, then ack the received message. + System.out.println("Id: " + message.getMessageId()); + System.out.println("Data: " + message.getData().toStringUtf8()); + consumer.ack(); + }; + + Subscriber subscriber = null; + try { + subscriber = Subscriber.newBuilder(subscriptionName, receiver).build(); + // Start the subscriber. + subscriber.startAsync().awaitRunning(); + System.out.printf("Listening for messages on %s:\n", subscriptionName.toString()); + // Allow the subscriber to run for 30s unless an unrecoverable error occurs. + subscriber.awaitTerminated(300, TimeUnit.SECONDS); + } catch (TimeoutException timeoutException) { + // Shut down the subscriber after 30s. Stop receiving messages. + subscriber.stopAsync(); + } + } + + + + } diff --git a/src/e2e-test/resources/pluginParameters.properties b/src/e2e-test/resources/pluginParameters.properties index ae191f7c90..f6d091c865 100644 --- a/src/e2e-test/resources/pluginParameters.properties +++ b/src/e2e-test/resources/pluginParameters.properties @@ -181,6 +181,8 @@ bqUpdateTableSchemaTrue=True clusterValue=transaction_date TableKey=PersonID bqSourceTable=dummy +#pubSubSourceSubscription=dummy +#pubSubSourceTopic=dummy bqCreateTableQueryFile=testdata/BigQuery/BigQueryCreateTableQuery.txt bqInsertDataQueryFile=testdata/BigQuery/BigQueryInsertDataQuery.txt bqCreateViewQueryFile=testdata/BigQuery/BigQueryCreateViewQuery.txt From 53549c207f637b95e771b92952f9532006d048de Mon Sep 17 00:00:00 2001 From: psainics Date: Tue, 5 Dec 2023 10:51:52 +0530 Subject: [PATCH 3/8] Added BQ Execute Action job label support --- .../gcp/bigquery/action/BigQueryExecute.java | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java b/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java index fbb96c1f16..81fc4ffd14 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java @@ -393,6 +393,7 @@ public static final class Config extends AbstractBigQueryActionConfig { private Config(@Nullable String project, @Nullable String serviceAccountType, @Nullable String serviceFilePath, @Nullable String serviceAccountJson, @Nullable String dataset, @Nullable String table, @Nullable String location, @Nullable String cmekKey, @Nullable String dialect, @Nullable String sql, + @Nullable String mode, @Nullable Boolean storeResults, @Nullable String jobLabelKeyValue) { @Nullable String mode, @Nullable Boolean storeResults, @Nullable String jobLabelKeyValue, @Nullable String rowAsArguments, @Nullable Boolean retryOnBackendError, @Nullable Long initialRetryDuration, @Nullable Long maxRetryDuration, @@ -411,6 +412,7 @@ private Config(@Nullable String project, @Nullable String serviceAccountType, @N this.rowAsArguments = rowAsArguments; this.storeResults = storeResults; this.jobLabelKeyValue = jobLabelKeyValue; + this.jobLabelKeyValue = jobLabelKeyValue; this.retryOnBackendError = retryOnBackendError; this.initialRetryDuration = initialRetryDuration; this.maxRetryDuration = maxRetryDuration; @@ -481,6 +483,11 @@ public int getMaxRetryCount() { return maxRetryCount == null ? DEFAULT_MAX_RETRY_COUNT : maxRetryCount; } + @Nullable + public String getJobLabelKeyValue() { + return jobLabelKeyValue; + } + @Override public void validate(FailureCollector failureCollector) { validate(failureCollector, Collections.emptyMap()); @@ -533,6 +540,10 @@ public void validate(FailureCollector failureCollector, Map argu validateJobLabelKeyValue(failureCollector); } + if (!containsMacro(NAME_BQ_JOB_LABELS)) { + validateJobLabelKeyValue(failureCollector); + } + failureCollector.getOrThrowException(); } @@ -550,6 +561,10 @@ void validateJobLabelKeyValue(FailureCollector failureCollector) { failureCollector.getOrThrowException(); } + void validateJobLabelKeyValue(FailureCollector failureCollector) { + BigQueryUtil.validateJobLabelKeyValue(jobLabelKeyValue, failureCollector, NAME_BQ_JOB_LABELS); + } + void validateRetryConfiguration(FailureCollector failureCollector, Long initialRetryDuration, Long maxRetryDuration, Integer maxRetryCount, Double retryMultiplier) { if (initialRetryDuration != null && initialRetryDuration <= 0) { @@ -722,6 +737,11 @@ public Builder setJobLabelKeyValue(@Nullable String jobLabelKeyValue) { return this; } + public Builder setJobLabelKeyValue(@Nullable String jobLabelKeyValue) { + this.jobLabelKeyValue = jobLabelKeyValue; + return this; + } + public Builder setRetryOnBackendError(@Nullable Boolean retryOnBackendError) { this.retryOnBackendError = retryOnBackendError; return this; @@ -766,6 +786,8 @@ public Config build() { sql, mode, storeResults, + jobLabelKeyValue + storeResults, jobLabelKeyValue, rowAsArguments, retryOnBackendError, From 9148b65580d3c8874820eb7ddfed25e6cd7e4014 Mon Sep 17 00:00:00 2001 From: psainics Date: Fri, 27 Oct 2023 19:56:41 +0530 Subject: [PATCH 4/8] Added BQ Execute Retry Added BQ Retry --- .../gcp/bigquery/action/BigQueryExecute.java | 105 ++++++++++++++++++ 1 file changed, 105 insertions(+) diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java b/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java index 81fc4ffd14..c5e2f4a6a2 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java @@ -398,6 +398,10 @@ private Config(@Nullable String project, @Nullable String serviceAccountType, @N @Nullable String rowAsArguments, @Nullable Boolean retryOnBackendError, @Nullable Long initialRetryDuration, @Nullable Long maxRetryDuration, @Nullable Double retryMultiplier, @Nullable Integer maxRetryCount) { + @Nullable String mode, @Nullable Boolean storeResults, @Nullable String jobLabelKeyValue, + @Nullable String rowAsArguments, @Nullable Boolean retryOnBackendError, + @Nullable Long initialRetryDuration, @Nullable Long maxRetryDuration, + @Nullable Double retryMultiplier, @Nullable Integer maxRetryCount) { this.project = project; this.serviceAccountType = serviceAccountType; this.serviceFilePath = serviceFilePath; @@ -418,6 +422,11 @@ private Config(@Nullable String project, @Nullable String serviceAccountType, @N this.maxRetryDuration = maxRetryDuration; this.maxRetryCount = maxRetryCount; this.retryMultiplier = retryMultiplier; + this.retryOnBackendError = retryOnBackendError; + this.initialRetryDuration = initialRetryDuration; + this.maxRetryDuration = maxRetryDuration; + this.maxRetryCount = maxRetryCount; + this.retryMultiplier = retryMultiplier; } public boolean isLegacySQL() { @@ -488,6 +497,26 @@ public String getJobLabelKeyValue() { return jobLabelKeyValue; } + public boolean getRetryOnBackendError() { + return retryOnBackendError == null || retryOnBackendError; + } + + public long getInitialRetryDuration() { + return initialRetryDuration == null ? DEFAULT_INITIAL_RETRY_DURATION_SECONDS : initialRetryDuration; + } + + public long getMaxRetryDuration() { + return maxRetryDuration == null ? DEFULT_MAX_RETRY_DURATION_SECONDS : maxRetryDuration; + } + + public double getRetryMultiplier() { + return retryMultiplier == null ? DEFAULT_RETRY_MULTIPLIER : retryMultiplier; + } + + public int getMaxRetryCount() { + return maxRetryCount == null ? DEFAULT_MAX_RETRY_COUNT : maxRetryCount; + } + @Override public void validate(FailureCollector failureCollector) { validate(failureCollector, Collections.emptyMap()); @@ -565,6 +594,45 @@ void validateJobLabelKeyValue(FailureCollector failureCollector) { BigQueryUtil.validateJobLabelKeyValue(jobLabelKeyValue, failureCollector, NAME_BQ_JOB_LABELS); } + void validateRetryConfiguration(FailureCollector failureCollector, Long initialRetryDuration, + Long maxRetryDuration, Integer maxRetryCount, Double retryMultiplier) { + if (initialRetryDuration != null && initialRetryDuration <= 0) { + failureCollector.addFailure("Initial retry duration must be greater than 0.", + "Please specify a valid initial retry duration.") + .withConfigProperty(NAME_INITIAL_RETRY_DURATION); + } + if (maxRetryDuration != null && maxRetryDuration <= 0) { + failureCollector.addFailure("Max retry duration must be greater than 0.", + "Please specify a valid max retry duration.") + .withConfigProperty(NAME_MAX_RETRY_DURATION); + } + if (maxRetryCount != null && maxRetryCount <= 0) { + failureCollector.addFailure("Max retry count must be greater than 0.", + "Please specify a valid max retry count.") + .withConfigProperty(NAME_MAX_RETRY_COUNT); + } + if (retryMultiplier != null && retryMultiplier <= 1) { + failureCollector.addFailure("Retry multiplier must be strictly greater than 1.", + "Please specify a valid retry multiplier.") + .withConfigProperty(NAME_RETRY_MULTIPLIER); + } + if (maxRetryDuration != null && initialRetryDuration != null && maxRetryDuration <= initialRetryDuration) { + failureCollector.addFailure("Max retry duration must be greater than initial retry duration.", + "Please specify a valid max retry duration.") + .withConfigProperty(NAME_MAX_RETRY_DURATION); + } + // Verify retry configuration when retry on backend error is enabled and none of the retry configuration + // properties are macros. + if (!containsMacro(NAME_RETRY_ON_BACKEND_ERROR) && retryOnBackendError != null && retryOnBackendError && + !containsMacro(NAME_INITIAL_RETRY_DURATION) && !containsMacro(NAME_MAX_RETRY_DURATION) && + !containsMacro(NAME_MAX_RETRY_COUNT) && !containsMacro(NAME_RETRY_MULTIPLIER)) { + validateRetryConfiguration( + failureCollector, initialRetryDuration, maxRetryDuration, maxRetryCount, retryMultiplier + ); + } + failureCollector.getOrThrowException(); + } + void validateRetryConfiguration(FailureCollector failureCollector, Long initialRetryDuration, Long maxRetryDuration, Integer maxRetryCount, Double retryMultiplier) { if (initialRetryDuration != null && initialRetryDuration <= 0) { @@ -772,6 +840,36 @@ public Builder setRetryMultiplier(@Nullable Double retryMultiplier) { return this; } + public Builder setRetryOnBackendError(@Nullable Boolean retryOnBackendError) { + this.retryOnBackendError = retryOnBackendError; + return this; + } + + public Builder setStoreResults(@Nullable Boolean storeResults) { + this.storeResults = storeResults; + return this; + } + + public Builder setInitialRetryDuration(@Nullable Long initialRetryDuration) { + this.initialRetryDuration = initialRetryDuration; + return this; + } + + public Builder setMaxRetryDuration(@Nullable Long maxRetryDuration) { + this.maxRetryDuration = maxRetryDuration; + return this; + } + + public Builder setMaxRetryCount(@Nullable Integer maxRetryCount) { + this.maxRetryCount = maxRetryCount; + return this; + } + + public Builder setRetryMultiplier(@Nullable Double retryMultiplier) { + this.retryMultiplier = retryMultiplier; + return this; + } + public Config build() { return new Config( project, @@ -795,6 +893,13 @@ public Config build() { maxRetryDuration, retryMultiplier, maxRetryCount + jobLabelKeyValue, + rowAsArguments, + retryOnBackendError, + initialRetryDuration, + maxRetryDuration, + retryMultiplier, + maxRetryCount ); } } From db990e6ee3fdcefb548ad66bffd999e61cc49854 Mon Sep 17 00:00:00 2001 From: priyabhatnagar Date: Fri, 15 Dec 2023 14:57:21 +0530 Subject: [PATCH 5/8] pubsub to pubsub e2e scenarios --- .../PubSubToPubSub.feature | 166 ++++++++++++++++++ .../features/pubsub/sink/BQToPubSub.feature | 38 ---- .../common/stepsdesign/TestSetupHooks.java | 44 +++-- .../pubsub/locators/PubSubLocators.java | 15 ++ .../pubsub/runners/sinkrunner/TestRunner.java | 2 +- .../sinkrunner/TestRunnerRequired.java | 2 +- .../plugin/pubsub/stepsdesign/PubSubSink.java | 30 +++- .../pubsub/stepsdesign/PubSubSource.java | 32 +++- .../io/cdap/plugin/utils/PubSubClient.java | 136 +++++++------- .../resources/pluginParameters.properties | 6 +- 10 files changed, 348 insertions(+), 123 deletions(-) create mode 100644 src/e2e-test/features/pubsub/pubsubadditionalfeature/PubSubToPubSub.feature diff --git a/src/e2e-test/features/pubsub/pubsubadditionalfeature/PubSubToPubSub.feature b/src/e2e-test/features/pubsub/pubsubadditionalfeature/PubSubToPubSub.feature new file mode 100644 index 0000000000..423cda02c1 --- /dev/null +++ b/src/e2e-test/features/pubsub/pubsubadditionalfeature/PubSubToPubSub.feature @@ -0,0 +1,166 @@ +# Copyright © 2024 Cask Data, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +@PubSub_DataStream +Feature: PubSub - Verification of successful data transfer from DataStream PubSub source to PubSub sink using different file formats + + @PUBSUB_SOURCE_TEST @PUBSUB_SINK_TEST @PUBSUB_SUBSCRIPTION_TEST + Scenario: Validate successful transfer of records from PubSub(source) to PubSub(sink). + Given Open Datafusion Project to configure pipeline + When Select data pipeline type as: "Realtime" + When Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "Pub/Sub" from the plugins list as: "Source" + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "Pub/Sub" from the plugins list as: "Sink" + Then Connect plugins: "Pub/Sub" and "Pub/Sub2" to establish connection + Then Navigate to the properties page of plugin: "Pub/Sub" + Then Override Service account details if set in environment variables + Then Enter input plugin property: "referenceName" with value: "PubSubSourceReferenceName" + Then Enter PubSub source property subscription name + Then Enter PubSub source property topic name + Then Validate "Pub/Sub" plugin properties + And Close the Plugin Properties page + Then Navigate to the properties page of plugin: "Pub/Sub2" + Then Replace input plugin property: "project" with value: "projectId" + Then Override Service account details if set in environment variables + Then Enter input plugin property: "referenceName" with value: "PubSubSinkReferenceName" + Then Enter PubSub sink property topic name + Then Validate "Pub/Sub" plugin properties + And Close the Plugin Properties page + And Click on configure button + And Click on pipeline config + And Click on batch time and select format + Then Save the pipeline + Then Deploy the pipeline + Then Run the Pipeline in Runtime + Then Wait for pipeline to be in status: "Running" with a timeout of 240 seconds + Then Publish the messages + Then Subscribe to the messages + Then Validate OUT record count is equal to IN record count + And Stop the pipeline + Then Verify the pipeline status is "Stopped" + + @PUBSUB_SOURCE_TEST @PUBSUB_SINK_TEST @PUBSUB_SUBSCRIPTION_TEST + Scenario: Validate successful transfer of records from PubSub(source) to PubSub(sink) using macros. + Given Open Datafusion Project to configure pipeline + When Select data pipeline type as: "Realtime" + When Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "Pub/Sub" from the plugins list as: "Source" + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "Pub/Sub" from the plugins list as: "Sink" + Then Connect plugins: "Pub/Sub" and "Pub/Sub2" to establish connection + Then Navigate to the properties page of plugin: "Pub/Sub" + Then Override Service account details if set in environment variables + Then Enter input plugin property: "referenceName" with value: "PubSubSourceReferenceName" + Then Click on the Macro button of Property: "topic" and set the value to: "pubSubSourceTopic" + Then Click on the Macro button of Property: "subscription" and set the value to: "pubSubSourceSubscription" + Then Validate "Pub/Sub" plugin properties + And Close the Plugin Properties page + Then Navigate to the properties page of plugin: "Pub/Sub2" + Then Replace input plugin property: "project" with value: "projectId" + Then Override Service account details if set in environment variables + Then Enter input plugin property: "referenceName" with value: "PubSubSinkReferenceName" + Then Enter PubSub sink property topic name + Then Validate "Pub/Sub" plugin properties + And Close the Plugin Properties page + And Click on configure button + And Click on pipeline config + And Click on batch time and select format + Then Save the pipeline + Then Deploy the pipeline + Then Enter runtime argument value for PubSub source property topic key "pubSubSourceTopic" + Then Enter runtime argument value for PubSub source property subscription key "pubSubSourceSubscription" + Then Run the Pipeline in Runtime with runtime arguments + Then Wait for pipeline to be in status: "Running" with a timeout of 240 seconds + Then Publish the messages + Then Subscribe to the messages + Then Validate OUT record count is equal to IN record count + And Stop the pipeline + Then Verify the pipeline status is "Stopped" + + @PUBSUB_SOURCE_TEST @PUBSUB_SINK_TEST @PUBSUB_SUBSCRIPTION_TEST + Scenario: Validate the successful transfer of records from a pubSub source to a pubSub sink with format Text at both source and sink + Given Open Datafusion Project to configure pipeline + When Select data pipeline type as: "Realtime" + When Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "Pub/Sub" from the plugins list as: "Source" + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "Pub/Sub" from the plugins list as: "Sink" + Then Connect plugins: "Pub/Sub" and "Pub/Sub2" to establish connection + Then Navigate to the properties page of plugin: "Pub/Sub" + Then Override Service account details if set in environment variables + Then Enter input plugin property: "referenceName" with value: "PubSubSourceReferenceName" + Then Enter PubSub source property subscription name + Then Enter PubSub source property topic name + Then Select dropdown plugin property: "select-format" with option value: "text" + Then Validate "Pub/Sub" plugin properties + And Close the Plugin Properties page + Then Navigate to the properties page of plugin: "Pub/Sub2" + Then Replace input plugin property: "project" with value: "projectId" + Then Override Service account details if set in environment variables + Then Enter input plugin property: "referenceName" with value: "PubSubSinkReferenceName" + Then Enter PubSub sink property topic name + Then Select dropdown plugin property: "select-format" with option value: "text" + Then Validate "Pub/Sub" plugin properties + And Close the Plugin Properties page + And Click on configure button + And Click on pipeline config + And Click on batch time and select format + Then Save the pipeline + Then Deploy the pipeline + Then Run the Pipeline in Runtime + Then Wait for pipeline to be in status: "Running" with a timeout of 240 seconds + Then Publish the messages + Then Subscribe to the messages + Then Validate OUT record count is equal to IN record count + And Stop the pipeline + Then Verify the pipeline status is "Stopped" + + @PUBSUB_SOURCE_TEST @PUBSUB_SINK_TEST @PUBSUB_SUBSCRIPTION_TEST + Scenario: Validate the successful transfer of records from a pubSub source to a pubSub sink with format Text at source and Json at sink + Given Open Datafusion Project to configure pipeline + When Select data pipeline type as: "Realtime" + When Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "Pub/Sub" from the plugins list as: "Source" + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "Pub/Sub" from the plugins list as: "Sink" + Then Connect plugins: "Pub/Sub" and "Pub/Sub2" to establish connection + Then Navigate to the properties page of plugin: "Pub/Sub" + Then Override Service account details if set in environment variables + Then Enter input plugin property: "referenceName" with value: "PubSubSourceReferenceName" + Then Enter PubSub source property subscription name + Then Enter PubSub source property topic name + Then Select dropdown plugin property: "select-format" with option value: "text" + Then Validate "Pub/Sub" plugin properties + And Close the Plugin Properties page + Then Navigate to the properties page of plugin: "Pub/Sub2" + Then Replace input plugin property: "project" with value: "projectId" + Then Override Service account details if set in environment variables + Then Enter input plugin property: "referenceName" with value: "PubSubSinkReferenceName" + Then Enter PubSub sink property topic name + Then Select dropdown plugin property: "select-format" with option value: "json" + Then Validate "Pub/Sub" plugin properties + And Close the Plugin Properties page + And Click on configure button + And Click on pipeline config + And Click on batch time and select format + Then Save the pipeline + Then Deploy the pipeline + Then Run the Pipeline in Runtime + Then Wait for pipeline to be in status: "Running" with a timeout of 240 seconds + Then Publish the messages + Then Subscribe to the messages + Then Validate OUT record count is equal to IN record count + And Stop the pipeline + Then Verify the pipeline status is "Stopped" diff --git a/src/e2e-test/features/pubsub/sink/BQToPubSub.feature b/src/e2e-test/features/pubsub/sink/BQToPubSub.feature index adb73090c1..8a1b5aeabd 100644 --- a/src/e2e-test/features/pubsub/sink/BQToPubSub.feature +++ b/src/e2e-test/features/pubsub/sink/BQToPubSub.feature @@ -40,41 +40,3 @@ Feature: PubSub-Sink - Verification of BigQuery to PubSub successful data transf Then Validate OUT record count is equal to IN record count Then Open and capture logs Then Validate the cmek key "cmekPubSub" of target PubSub topic if cmek is enabled - - @PUBSUB_SOURCE_TEST @PUBSUB_SINK_TEST @PUBSUB_SUBSCRIPTION_TEST - Scenario: pubsub flow - Given Open Datafusion Project to configure pipeline - When Select data pipeline type as: "Realtime" - When Expand Plugin group in the LHS plugins list: "Source" - When Select plugin: "Pub/Sub" from the plugins list as: "Source" - When Expand Plugin group in the LHS plugins list: "Sink" - When Select plugin: "Pub/Sub" from the plugins list as: "Sink" - Then Connect plugins: "Pub/Sub" and "Pub/Sub2" to establish connection - Then Navigate to the properties page of plugin: "Pub/Sub" - Then Override Service account details if set in environment variables - Then Enter input plugin property: "referenceName" with value: "BQReferenceName" - Then Enter PubSub source property subscription name - Then Enter PubSub source property topic name - Then Validate "Pub/Sub" plugin properties - And Close the Plugin Properties page - Then Open the PubSub sink properties - Then Enter PubSub property projectId "projectId" - Then Override Service account details if set in environment variables - Then Enter PubSub property reference name - Then Enter PubSub source property topic name - Then Validate "Pub/Sub" plugin properties - And Close the Plugin Properties page - - Then Save the pipeline - Then Preview and run the pipeline - Then Wait till pipeline preview is in running state - Then Open and capture pipeline preview logs - Then Close the pipeline logs - Then Close the preview - Then Deploy the pipeline - Then Run the Pipeline in Runtime - # Then Wait till pipeline is in running state - #Then Open and capture logs - #Then Verify the pipeline status is "Succeeded" - Then Publish the message - Then Subscribe to the messages diff --git a/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java b/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java index 0b564f587f..af05fccfc9 100644 --- a/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java +++ b/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java @@ -21,6 +21,7 @@ import io.cdap.e2e.pages.actions.CdfConnectionActions; import io.cdap.e2e.pages.actions.CdfPluginPropertiesActions; import io.cdap.e2e.utils.BigQueryClient; +import io.cdap.e2e.utils.ConstantsUtil; import io.cdap.e2e.utils.PluginPropertyUtils; import io.cdap.e2e.utils.StorageClient; import io.cdap.plugin.utils.PubSubClient; @@ -482,12 +483,6 @@ private static String createGCSBucketWithFilesAndFolder(String folderPath) throw return bucketName; } - @Before(order = 1, value = "@PUBSUB_SINK_TEST") - public static void createTargetPubSubTopic() { - pubSubTargetTopic = "cdf-e2e-test-" + UUID.randomUUID(); - BeforeActions.scenario.write("Target PubSub topic " + pubSubTargetTopic); - } - @Before(order = 1, value = "@PUBSUB_SOURCE_TEST") public static void createSourcePubSubTopic() throws IOException { pubSubSourceTopic = "cdf-e2e-test-" + UUID.randomUUID(); @@ -496,18 +491,33 @@ public static void createSourcePubSubTopic() throws IOException { } @Before(order = 1, value = "@PUBSUB_SUBSCRIPTION_TEST") - public static void createSubscriptionPubSubTopic() { + public static void createSubscriptionPubSubTopic() throws IOException { pubSubSourceSubscription = "cdf-e2e-test-" + UUID.randomUUID(); + PubSubClient.createSubscription(pubSubSourceSubscription , pubSubSourceTopic); BeforeActions.scenario.write("Source PubSub subscription " + pubSubSourceSubscription); } - @After(order = 1, value = "") + + @After(order = 1, value = "@PUBSUB_SOURCE_TEST") public static void deleteSourcePubSubTopic() { try { PubSubClient.deleteTopic(pubSubSourceTopic); - BeforeActions.scenario.write("Deleted Source PubSub topic " + pubSubSourceTopic); + BeforeActions.scenario.write("Deleted target PubSub topic " + pubSubSourceTopic); pubSubSourceTopic = StringUtils.EMPTY; } catch (Exception e) { if (e.getMessage().contains("Invalid resource name given") || e.getMessage().contains("Resource not found")) { + + } + } + } + + @After(order = 1, value = "@PUBSUB_SUBSCRIPTION_TEST") + public static void deleteSourcePubSubSubscription() { + try { + PubSubClient.deleteTopic(pubSubSourceSubscription); + BeforeActions.scenario.write("Deleted target PubSub topic " + pubSubSourceSubscription); + pubSubSourceSubscription = StringUtils.EMPTY; + } catch (Exception e) { + if (e.getMessage().contains("Invalid resource name given") || e.getMessage().contains("Resource not found")) { BeforeActions.scenario.write("Source PubSub topic " + pubSubSourceTopic + " does not exist."); } else { Assert.fail(e.getMessage()); @@ -515,7 +525,21 @@ public static void deleteSourcePubSubTopic() { } } - @After(order = 1, value = "") + public static void publishMessage() throws IOException, InterruptedException { + String dataMessage1 = PluginPropertyUtils.pluginProp("firstMessage"); + String dataMessage2 = PluginPropertyUtils.pluginProp("secondMessage"); + List dataMessagesList = Arrays.asList(dataMessage1, dataMessage2); + PubSubClient.publishMessagesWithPubSub(PluginPropertyUtils.pluginProp + (ConstantsUtil.PROJECT_ID), pubSubSourceTopic, dataMessagesList); + } + + @Before(order = 1, value = "@PUBSUB_SINK_TEST") + public static void createTargetPubSubTopic() { + pubSubTargetTopic = "cdf-e2e-test-" + UUID.randomUUID(); + BeforeActions.scenario.write("Target PubSub topic " + pubSubTargetTopic); + } + + @After(order = 1, value = "@PUBSUB_SINK_TEST") public static void deleteTargetPubSubTopic() { try { PubSubClient.deleteTopic(pubSubTargetTopic); diff --git a/src/e2e-test/java/io/cdap/plugin/pubsub/locators/PubSubLocators.java b/src/e2e-test/java/io/cdap/plugin/pubsub/locators/PubSubLocators.java index 0019d5c3b1..a7589eb9db 100644 --- a/src/e2e-test/java/io/cdap/plugin/pubsub/locators/PubSubLocators.java +++ b/src/e2e-test/java/io/cdap/plugin/pubsub/locators/PubSubLocators.java @@ -74,4 +74,19 @@ public static WebElement selectedFormat(String format) { return SeleniumDriver.getDriver() .findElement(By.xpath("//*[@data-cy='select-format']/div[text()='" + format + "']")); } + + @FindBy(how = How.XPATH, using = "//span[contains(text(), \"Batch interval\")]//following-sibling::div//select[1]") + public static WebElement batchTime; + + @FindBy(how = How.XPATH, using = "//span[contains(text(), \"Batch interval\")]//following-sibling::div//select[2]") + public static WebElement timeSelect; + + @FindBy(how = How.XPATH, using = "//button[@data-testid='config-apply-close']") + public static WebElement saveButton; + + @FindBy(how = How.XPATH, using = "//*[@data-cy='pipeline-configure-modeless-btn']") + public static WebElement configButton; + + @FindBy(how = How.XPATH, using = "//*[@data-cy='tab-content-Pipeline config']") + public static WebElement pipelineConfig; } diff --git a/src/e2e-test/java/io/cdap/plugin/pubsub/runners/sinkrunner/TestRunner.java b/src/e2e-test/java/io/cdap/plugin/pubsub/runners/sinkrunner/TestRunner.java index c49b12f04c..e44c75b407 100644 --- a/src/e2e-test/java/io/cdap/plugin/pubsub/runners/sinkrunner/TestRunner.java +++ b/src/e2e-test/java/io/cdap/plugin/pubsub/runners/sinkrunner/TestRunner.java @@ -27,7 +27,7 @@ features = {"src/e2e-test/features"}, glue = {"io.cdap.plugin.pubsub.stepsdesign", "io.cdap.plugin.gcs.stepsdesign", "io.cdap.plugin.common.stepsdesign", "io.cdap.plugin.bigquery.stepsdesign", "stepsdesign"}, - tags = {"@PubSub_Sink"}, + tags = {"@PubSub_Sink","@PubSub_DataStream"}, monochrome = true, plugin = {"pretty", "html:target/cucumber-html-report/pubsub-sink", "json:target/cucumber-reports/cucumber-pubsub-sink.json", diff --git a/src/e2e-test/java/io/cdap/plugin/pubsub/runners/sinkrunner/TestRunnerRequired.java b/src/e2e-test/java/io/cdap/plugin/pubsub/runners/sinkrunner/TestRunnerRequired.java index 352d31a6c3..c9ee839731 100644 --- a/src/e2e-test/java/io/cdap/plugin/pubsub/runners/sinkrunner/TestRunnerRequired.java +++ b/src/e2e-test/java/io/cdap/plugin/pubsub/runners/sinkrunner/TestRunnerRequired.java @@ -27,7 +27,7 @@ features = {"src/e2e-test/features"}, glue = {"io.cdap.plugin.pubsub.stepsdesign", "io.cdap.plugin.gcs.stepsdesign", "io.cdap.plugin.common.stepsdesign", "io.cdap.plugin.bigquery.stepsdesign", "stepsdesign"}, - tags = {"@PubSub_Sink_Required"}, + tags = {"@PubSub_Sink_Required","@PubSub_DataStream_Required"}, monochrome = true, plugin = {"pretty", "html:target/cucumber-html-report/pubsub-sink-required", "json:target/cucumber-reports/cucumber-pubsub-sink-required.json", diff --git a/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSink.java b/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSink.java index 1cb687e2f0..17888b0fdf 100644 --- a/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSink.java +++ b/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSink.java @@ -27,13 +27,16 @@ import io.cdap.plugin.utils.E2EHelper; import io.cdap.plugin.utils.E2ETestConstants; import io.cdap.plugin.utils.PubSubClient; +import io.cucumber.java.en.And; import io.cucumber.java.en.Then; import io.cucumber.java.en.When; import org.apache.commons.lang3.StringUtils; import org.junit.Assert; +import org.openqa.selenium.support.ui.Select; import stepsdesign.BeforeActions; import java.io.IOException; +import java.util.concurrent.TimeUnit; /** * PubSub Sink Plugin related step design. @@ -234,10 +237,31 @@ public void enterRuntimeArgumentValueForPubSubSinkPropertyTopicKey(String runtim public void clickOnPreviewDataForPubSubSink() { openSinkPluginPreviewData("GooglePublisher"); } + @Then("Subscribe to the messages") - public void subscribeToTheMessages() - { - PubSubClient.subscribeAsyncExample(PluginPropertyUtils.pluginProp(ConstantsUtil.PROJECT_ID),TestSetupHooks.pubSubSourceSubscription); + public void subscribeToTheMessages() throws InterruptedException { + TimeUnit time = TimeUnit.SECONDS; + time.sleep(60); + PubSubClient.subscribeAsyncExample(PluginPropertyUtils.pluginProp(ConstantsUtil.PROJECT_ID), + TestSetupHooks.pubSubSourceSubscription); + } + + @And("Click on batch time and select format") + public void clickOnBatchTimeAndSelectFormat() { + Select select = new Select(PubSubLocators.batchTime); + select.selectByIndex(0); + Select selectformat = new Select(PubSubLocators.timeSelect); + selectformat.selectByIndex(1); + ElementHelper.clickOnElement(PubSubLocators.saveButton); } + @And("Click on configure button") + public void clickOnConfigureButton() { + ElementHelper.clickOnElement(PubSubLocators.configButton); + } + + @And("Click on pipeline config") + public void clickOnPipelineConfig() { + ElementHelper.clickOnElement(PubSubLocators.pipelineConfig); + } } diff --git a/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSource.java b/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSource.java index 29cb952538..11a5da516e 100644 --- a/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSource.java +++ b/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSource.java @@ -16,15 +16,16 @@ package io.cdap.plugin.pubsub.stepsdesign; +import io.cdap.e2e.pages.locators.CdfStudioLocators; import io.cdap.e2e.utils.CdfHelper; -import io.cdap.e2e.utils.ConstantsUtil; -import io.cdap.e2e.utils.PluginPropertyUtils; +import io.cdap.e2e.utils.ElementHelper; import io.cdap.plugin.common.stepsdesign.TestSetupHooks; import io.cdap.plugin.pubsub.actions.PubSubActions; -import io.cdap.plugin.utils.PubSubClient; import io.cucumber.java.en.Then; import io.cucumber.java.en.When; + import java.io.IOException; +import java.util.concurrent.TimeUnit; /** * PubSub Source Plugin related step design. @@ -39,19 +40,34 @@ public void sourceIsPubSub() { @Then("Open the PubSub source properties") public void openThePubSubSourceProperties() { - openSourcePluginProperties("pubsub"); + openSourcePluginProperties("GooglePublisher"); } @Then("Enter PubSub source property topic name") public void enterPubSubSourcePropertyTopicName() { - PubSubActions.enterPubSubTopic(TestSetupHooks.pubSubSourceTopic); } + PubSubActions.enterPubSubTopic(TestSetupHooks.pubSubSourceTopic); + } + @Then("Enter PubSub source property subscription name") public void enterPubSubSourcePropertySubscriptionName() { PubSubActions.enterSubscription(TestSetupHooks.pubSubSourceSubscription); } - @Then("Publish the message") + @Then("Publish the messages") public void publishTheMessage() throws IOException, InterruptedException { - PubSubClient.publishWithErrorHandlerExample(PluginPropertyUtils.pluginProp(ConstantsUtil.PROJECT_ID), TestSetupHooks.pubSubSourceTopic); + TimeUnit time = TimeUnit.SECONDS; + time.sleep(120); + TestSetupHooks.publishMessage(); + } + + @Then("Enter runtime argument value for PubSub source property topic key {string}") + public void enterRuntimeArgumentValueForPubSubSourcePropertyTopicKey(String runtimeArgumentKey) { + ElementHelper.sendKeys(CdfStudioLocators.runtimeArgsValue(runtimeArgumentKey), TestSetupHooks.pubSubSourceTopic); + } + + @Then("Enter runtime argument value for PubSub source property subscription key {string}") + public void enterRuntimeArgumentValueForPubSubSourcePropertySubscriptionKey(String runtimeArgumentKey) { + ElementHelper.sendKeys(CdfStudioLocators.runtimeArgsValue(runtimeArgumentKey), + TestSetupHooks.pubSubSourceSubscription); } -} \ No newline at end of file +} diff --git a/src/e2e-test/java/io/cdap/plugin/utils/PubSubClient.java b/src/e2e-test/java/io/cdap/plugin/utils/PubSubClient.java index 94a379a97d..6fb4f94a0c 100644 --- a/src/e2e-test/java/io/cdap/plugin/utils/PubSubClient.java +++ b/src/e2e-test/java/io/cdap/plugin/utils/PubSubClient.java @@ -1,5 +1,5 @@ /* - * Copyright © 2021 Cask Data, Inc. + * Copyright © 2024 Cask Data, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of @@ -19,7 +19,6 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; -import com.google.api.gax.rpc.AlreadyExistsException; import com.google.api.gax.rpc.ApiException; import com.google.cloud.pubsub.v1.AckReplyConsumer; import com.google.cloud.pubsub.v1.MessageReceiver; @@ -30,16 +29,18 @@ import com.google.cloud.pubsub.v1.TopicAdminClient; import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.ByteString; +import com.google.pubsub.v1.ProjectSubscriptionName; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.PushConfig; import com.google.pubsub.v1.Topic; import com.google.pubsub.v1.TopicName; import io.cdap.e2e.utils.ConstantsUtil; import io.cdap.e2e.utils.PluginPropertyUtils; -import com.google.pubsub.v1.ProjectSubscriptionName; +import io.grpc.StatusRuntimeException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.Arrays; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -49,6 +50,8 @@ */ public class PubSubClient { + private static final Logger logger = LoggerFactory.getLogger(PubSubClient.class); + public static Topic createTopic(String topicId) throws IOException { try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) { TopicName topicName = TopicName.of(PluginPropertyUtils.pluginProp(ConstantsUtil.PROJECT_ID), topicId); @@ -56,18 +59,24 @@ public static Topic createTopic(String topicId) throws IOException { } } - // Create the subscription + /** + * Create the subscription. + */ public static void createSubscription(String subscriptionId, String topicId) throws IOException { ProjectSubscriptionName subscriptionName = null; try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create( SubscriptionAdminSettings.newBuilder().build())) { TopicName topicName = TopicName.of(PluginPropertyUtils.pluginProp(ConstantsUtil.PROJECT_ID), topicId); - subscriptionName = ProjectSubscriptionName.of(ConstantsUtil.PROJECT_ID, subscriptionId); + subscriptionName = ProjectSubscriptionName.of + (PluginPropertyUtils.pluginProp(ConstantsUtil.PROJECT_ID), subscriptionId); subscriptionAdminClient.createSubscription(subscriptionName, topicName, PushConfig.getDefaultInstance(), 60); - System.out.println("Subscription created: " + subscriptionName.toString()); - } catch (AlreadyExistsException e) { - assert subscriptionName != null; - System.out.println("Subscription already exists: " + subscriptionName.toString()); + logger.info("Subscription created: " + subscriptionName.toString()); + } catch (StatusRuntimeException e) { + if ("ALREADY_EXISTS".equals(e.getStatus().getCode().name())) { + logger.info("Subscription already exists: {}", subscriptionName.toString()); + } else { + logger.info("Error creating subscription", e); + } } } @@ -92,48 +101,50 @@ public static String getTopicCmekKey(String topicId) throws IOException { } - public static void publishWithErrorHandlerExample(String projectId, String topicId) - throws IOException, InterruptedException { + public static void publishMessagesWithPubSub(String projectId, String topicId, List dataMessages) + throws IOException, InterruptedException { TopicName topicName = TopicName.of(projectId, topicId); Publisher publisher = null; - try { publisher = Publisher.newBuilder(topicName).build(); - - List messages = Arrays.asList("first message", "second message"); - - for (final String message : messages) { + for (final String message : dataMessages) { ByteString data = ByteString.copyFromUtf8(message); PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build(); ApiFuture future = publisher.publish(pubsubMessage); - // Adding an asynchronous callback to handle success / failure - ApiFutures.addCallback( - future, - new ApiFutureCallback() { - - @Override - public void onFailure(Throwable throwable) { - if (throwable instanceof ApiException) { - ApiException apiException = ((ApiException) throwable); - // details on the API exception - // System.out.println(apiException.getStatusCode().getCode()); - //System.out.println(apiException.isRetryable()); - } - System.out.println("Error publishing message : " + message); - } - - @Override - public void onSuccess(String messageId) { - // Once published, returns server-assigned message ids (unique within the topic) - System.out.println("Published message ID: " + messageId); - } - }, - MoreExecutors.directExecutor()); + /** + * Adding an asynchronous callback to handle success / failure. + */ + ApiFutures.addCallback( future, + new ApiFutureCallback() { + + @Override + public void onFailure(Throwable throwable) { + if (throwable instanceof ApiException) { + ApiException apiException = ((ApiException) throwable); + /** + * details on the API exception. + */ + logger.info(String.valueOf(apiException.getStatusCode().getCode())); + logger.info(String.valueOf(apiException.isRetryable())); + } + logger.info("Error publishing message : " + message); + } + @Override + public void onSuccess(String messageId) { + /** + * Once published, returns server-assigned message ids (unique within the topic). + */ + logger.info("Published message ID: " + messageId); + } + }, + MoreExecutors.directExecutor()); } } finally { if (publisher != null) { - // When finished with the publisher, shutdown to free up resources. + /** + * When finished with the publisher, shutdown to free up resources. + */ publisher.shutdown(); publisher.awaitTermination(1, TimeUnit.MINUTES); } @@ -141,33 +152,38 @@ public void onSuccess(String messageId) { } public static void subscribeAsyncExample(String projectId, String subscriptionId) { - ProjectSubscriptionName subscriptionName = - ProjectSubscriptionName.of(projectId, subscriptionId); - - // Instantiate an asynchronous message receiver. + ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(projectId, subscriptionId); + /** + * Instantiate an asynchronous message receiver. + */ MessageReceiver receiver = - (PubsubMessage message, AckReplyConsumer consumer) -> { - // Handle incoming message, then ack the received message. - System.out.println("Id: " + message.getMessageId()); - System.out.println("Data: " + message.getData().toStringUtf8()); - consumer.ack(); - }; + (PubsubMessage message, AckReplyConsumer consumer) -> { + /** + * Handle incoming message, then ack the received message. + */ + logger.info("Id: " + message.getMessageId()); + logger.info("Data: " + message.getData().toStringUtf8()); + consumer.ack(); + }; Subscriber subscriber = null; try { subscriber = Subscriber.newBuilder(subscriptionName, receiver).build(); - // Start the subscriber. + /** + * Start the subscriber. + */ subscriber.startAsync().awaitRunning(); - System.out.printf("Listening for messages on %s:\n", subscriptionName.toString()); - // Allow the subscriber to run for 30s unless an unrecoverable error occurs. - subscriber.awaitTerminated(300, TimeUnit.SECONDS); + logger.info("Listening for messages on %s:\n", subscriptionName); + /** + * Allow the subscriber to run for 30s unless an unrecoverable error occurs. + */ + subscriber.awaitTerminated(200, TimeUnit.SECONDS); } catch (TimeoutException timeoutException) { - // Shut down the subscriber after 30s. Stop receiving messages. + logger.error("Timeout exception: {e}"); + /** + * Shut down the subscriber after 30s. Stop receiving messages. + */ subscriber.stopAsync(); } } - - - - } diff --git a/src/e2e-test/resources/pluginParameters.properties b/src/e2e-test/resources/pluginParameters.properties index f6d091c865..29f6f16754 100644 --- a/src/e2e-test/resources/pluginParameters.properties +++ b/src/e2e-test/resources/pluginParameters.properties @@ -181,8 +181,6 @@ bqUpdateTableSchemaTrue=True clusterValue=transaction_date TableKey=PersonID bqSourceTable=dummy -#pubSubSourceSubscription=dummy -#pubSubSourceTopic=dummy bqCreateTableQueryFile=testdata/BigQuery/BigQueryCreateTableQuery.txt bqInsertDataQueryFile=testdata/BigQuery/BigQueryInsertDataQuery.txt bqCreateViewQueryFile=testdata/BigQuery/BigQueryCreateViewQuery.txt @@ -245,6 +243,10 @@ pubSubErrorThreshold=0 pubSubStringValue=one pubSubNegativeValue=-100 pubsubDelimiter=@ +pubSubSourceSubscription=dummy +pubSubSourceTopic=dummy +firstMessage=helloMessage +secondMessage=hiMessage ## PUBSUBSINK-PLUGIN-PROPERTIES-END ## GCSDELETE-PLUGIN-PROPERTIES-START From c8ffc8620e1bab9b6fd4002d4913dc052e6d0b77 Mon Sep 17 00:00:00 2001 From: psainics Date: Tue, 5 Dec 2023 10:51:52 +0530 Subject: [PATCH 6/8] Added BQ Execute Action job label support --- .../io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java b/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java index c5e2f4a6a2..7847bc3b36 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java @@ -121,6 +121,9 @@ public void run(ActionContext context) throws Exception { // Enable legacy SQL builder.setUseLegacySql(config.isLegacySQL()); + // Location must match that of the dataset(s) referenced in the query. + JobId jobId = JobId.newBuilder().setRandomJob().setLocation(config.getLocation()).build(); + // API request - starts the query. Credentials credentials = config.getServiceAccount() == null ? null : GCPUtils.loadServiceAccountCredentials(config.getServiceAccount(), From 3eea7c5761336f00da327639aa372a799545d522 Mon Sep 17 00:00:00 2001 From: psainics Date: Fri, 27 Oct 2023 19:56:41 +0530 Subject: [PATCH 7/8] Added BQ Execute Retry Added BQ Retry --- .../io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java b/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java index 7847bc3b36..c5e2f4a6a2 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java @@ -121,9 +121,6 @@ public void run(ActionContext context) throws Exception { // Enable legacy SQL builder.setUseLegacySql(config.isLegacySQL()); - // Location must match that of the dataset(s) referenced in the query. - JobId jobId = JobId.newBuilder().setRandomJob().setLocation(config.getLocation()).build(); - // API request - starts the query. Credentials credentials = config.getServiceAccount() == null ? null : GCPUtils.loadServiceAccountCredentials(config.getServiceAccount(), From f5807a544cd7b92a84c9efd359e160ef81cdf08f Mon Sep 17 00:00:00 2001 From: priyabhatnagar25 <112169140+priyabhatnagar25@users.noreply.github.com> Date: Tue, 16 Jan 2024 16:19:22 +0530 Subject: [PATCH 8/8] change file --- .../bigquery/sink/BigQueryRecordToJson.java | 4 ++ .../bigquery/BigQueryRecordToJsonTest.java | 52 ------------------- 2 files changed, 4 insertions(+), 52 deletions(-) diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryRecordToJson.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryRecordToJson.java index c397e52c74..ec2c70edb7 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryRecordToJson.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryRecordToJson.java @@ -250,10 +250,14 @@ private static void writeArray(JsonWriter writer, } if (element instanceof StructuredRecord) { StructuredRecord record = (StructuredRecord) element; + path.add(name); processRecord(writer, record, Objects.requireNonNull(record.getSchema().getFields()), path, jsonStringFieldsPaths); + path.remove(path.size() - 1); } else { + path.add(name); write(writer, name, true, element, componentSchema, path, jsonStringFieldsPaths); + path.remove(path.size() - 1); } } } diff --git a/src/test/java/io/cdap/plugin/gcp/bigquery/BigQueryRecordToJsonTest.java b/src/test/java/io/cdap/plugin/gcp/bigquery/BigQueryRecordToJsonTest.java index e7c1f82aee..962a44fd28 100644 --- a/src/test/java/io/cdap/plugin/gcp/bigquery/BigQueryRecordToJsonTest.java +++ b/src/test/java/io/cdap/plugin/gcp/bigquery/BigQueryRecordToJsonTest.java @@ -422,58 +422,6 @@ public void testJsonStringWithEmptyArray() throws IOException { } } - @Test - public void testJsonStringWithStringArray() throws IOException { - Schema recordSchema = Schema.recordOf("record", - Schema.Field.of("arrayOfString", Schema.arrayOf(Schema.of(Schema.Type.STRING)))); - List jsonStringList = ImmutableList.of("{\"arrayKey1\": \"arrayValue1\"}", - "{\"arrayKey2\": \"arrayValue2\"}"); - StructuredRecord record = StructuredRecord.builder(recordSchema).set("arrayOfString", jsonStringList).build(); - Set jsonStringFieldsPaths = ImmutableSet.of("arrayOfString"); - try (JsonTreeWriter writer = new JsonTreeWriter()) { - writer.beginObject(); - for (Schema.Field recordField : Objects.requireNonNull(record.getSchema().getFields())) { - if (recordSchema.getField(recordField.getName()) != null) { - BigQueryRecordToJson.write(writer, recordField.getName(), record.get(recordField.getName()), - recordField.getSchema(), jsonStringFieldsPaths); - } - } - writer.endObject(); - JsonObject actual = writer.get().getAsJsonObject(); - String actualJsonString = actual.get("arrayOfString").getAsJsonArray().toString(); - String expectedJsonString = "[{\"arrayKey1\":\"arrayValue1\"},{\"arrayKey2\":\"arrayValue2\"}]"; - Assert.assertEquals(expectedJsonString, actualJsonString); - } - } - - @Test - public void testJsonStringWithArrayAndNestedRecord() throws IOException { - Schema nestedRecordSchema = Schema.recordOf("nestedRecord", - Schema.Field.of("nestedJsonString", Schema.of(Schema.Type.STRING))); - StructuredRecord nestedRecord = StructuredRecord.builder(nestedRecordSchema) - .set("nestedJsonString", "{\"nestedKey1\":\"nestedValue1\"}").build(); - Schema recordSchema = Schema.recordOf("record", - Schema.Field.of("arrayOfNestedRecord", Schema.arrayOf(nestedRecordSchema))); - List nestedRecordList = ImmutableList.of(nestedRecord); - StructuredRecord record = StructuredRecord.builder(recordSchema).set("arrayOfNestedRecord", nestedRecordList) - .build(); - - Set jsonStringFieldsPaths = ImmutableSet.of("arrayOfNestedRecord.nestedJsonString"); - try (JsonTreeWriter writer = new JsonTreeWriter()) { - writer.beginObject(); - for (Schema.Field recordField : Objects.requireNonNull(record.getSchema().getFields())) { - if (recordSchema.getField(recordField.getName()) != null) { - BigQueryRecordToJson.write(writer, recordField.getName(), record.get(recordField.getName()), - recordField.getSchema(), jsonStringFieldsPaths); - } - } - writer.endObject(); - JsonObject actual = writer.get().getAsJsonObject(); - String actualJsonString = actual.get("arrayOfNestedRecord").toString(); - String expectedJsonString = "[{\"nestedJsonString\":{\"nestedKey1\":\"nestedValue1\"}}]"; - Assert.assertEquals(expectedJsonString, actualJsonString); - } - } /** * Empty JSON string is not a valid JSON string and should throw an exception.