From 179362e3001bc83d9982573ef4475867e326bc37 Mon Sep 17 00:00:00 2001 From: priyabhatnagar Date: Fri, 15 Dec 2023 14:57:21 +0530 Subject: [PATCH 01/10] bq additional scenarios --- .../common/stepsdesign/TestSetupHooks.java | 31 ++++++++++++++++++- .../io/cdap/plugin/utils/PubSubClient.java | 21 +++++++++++++ 2 files changed, 51 insertions(+), 1 deletion(-) diff --git a/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java b/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java index 3fc9a35a94..f96842234d 100644 --- a/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java +++ b/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java @@ -62,6 +62,8 @@ public class TestSetupHooks { public static String bqSourceTable2 = StringUtils.EMPTY; public static String bqSourceView = StringUtils.EMPTY; public static String pubSubTargetTopic = StringUtils.EMPTY; + public static String pubSubSourceTopic = StringUtils.EMPTY; + public static String pubSubSourceSubscription = StringUtils.EMPTY; public static String spannerInstance = StringUtils.EMPTY; public static String spannerDatabase = StringUtils.EMPTY; public static String spannerSourceTable = StringUtils.EMPTY; @@ -486,6 +488,33 @@ public static void createTargetPubSubTopic() { BeforeActions.scenario.write("Target PubSub topic " + pubSubTargetTopic); } + @Before(order = 1, value = "@PUBSUB_SOURCE_TEST") + public static void createSourcePubSubTopic() throws IOException { + PubSubClient.createTopic(pubSubSourceTopic); + pubSubSourceTopic = "cdf-e2e-test-" + UUID.randomUUID(); + BeforeActions.scenario.write("Target PubSub topic " + pubSubSourceTopic); + } + + @Before(order = 1, value = "@PUBSUB_SUBSCRIPTION_TEST") + public static void createSubscriptionPubSubTopic() { + pubSubSourceSubscription = "cdf-e2e-test-" + UUID.randomUUID(); + BeforeActions.scenario.write("Source PubSub subscription " + pubSubSourceSubscription); + } + @After(order = 1, value = 
"@PUBSUB_SOURCE_TEST") + public static void deleteSourcePubSubTopic() { + try { + PubSubClient.deleteTopic(pubSubSourceTopic); + BeforeActions.scenario.write("Deleted target PubSub topic " + pubSubSourceTopic); + pubSubSourceTopic = StringUtils.EMPTY; + } catch (Exception e) { + if (e.getMessage().contains("Invalid resource name given") || e.getMessage().contains("Resource not found")) { + BeforeActions.scenario.write("Source PubSub topic " + pubSubSourceTopic + " does not exist."); + } else { + Assert.fail(e.getMessage()); + } + } + } + @After(order = 1, value = "@PUBSUB_SINK_TEST") public static void deleteTargetPubSubTopic() { try { @@ -1141,7 +1170,7 @@ public static void createSinkBQDedupeTable() throws IOException, InterruptedExce @Before(value = "@BQ_INSERT_INT_SOURCE_TEST") public static void createSourceBQTable() throws IOException, InterruptedException { - bqSourceTable = "E2E_TARGET_" + UUID.randomUUID().toString().replaceAll("-", "_"); + bqSourceTable = "E2E_SOURCE_" + UUID.randomUUID().toString().replaceAll("-", "_"); PluginPropertyUtils.addPluginProp("bqSourceTable", bqSourceTable); BeforeActions.scenario.write("BQ source table name - " + bqSourceTable); BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." 
+ bqSourceTable + "` " + diff --git a/src/e2e-test/java/io/cdap/plugin/utils/PubSubClient.java b/src/e2e-test/java/io/cdap/plugin/utils/PubSubClient.java index 0331627140..861e06111b 100644 --- a/src/e2e-test/java/io/cdap/plugin/utils/PubSubClient.java +++ b/src/e2e-test/java/io/cdap/plugin/utils/PubSubClient.java @@ -16,11 +16,17 @@ package io.cdap.plugin.utils; +import com.google.api.gax.rpc.AlreadyExistsException; +import com.google.cloud.pubsub.v1.SubscriptionAdminClient; +import com.google.cloud.pubsub.v1.SubscriptionAdminSettings; import com.google.cloud.pubsub.v1.TopicAdminClient; +import com.google.pubsub.v1.PushConfig; +import com.google.pubsub.v1.Subscription; import com.google.pubsub.v1.Topic; import com.google.pubsub.v1.TopicName; import io.cdap.e2e.utils.ConstantsUtil; import io.cdap.e2e.utils.PluginPropertyUtils; +import com.google.pubsub.v1.ProjectSubscriptionName; import java.io.IOException; @@ -36,6 +42,21 @@ public static Topic createTopic(String topicId) throws IOException { } } + // Create the subscription + public static Subscription createSubscription(String subscriptionId, String topicId) throws IOException { + ProjectSubscriptionName subscriptionName = null; + try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create( + SubscriptionAdminSettings.newBuilder().build())) { + TopicName topicName = TopicName.of(PluginPropertyUtils.pluginProp(ConstantsUtil.PROJECT_ID), topicId); + subscriptionName = ProjectSubscriptionName.of(ConstantsUtil.PROJECT_ID, subscriptionId); + subscriptionAdminClient.createSubscription(subscriptionName, topicName, PushConfig.getDefaultInstance(), 60); + System.out.println("Subscription created: " + subscriptionName.toString()); + } catch (AlreadyExistsException e) { + System.out.println("Subscription already exists: " + subscriptionName.toString()); + } + return null; + } + public static void deleteTopic(String topicId) throws IOException { try (TopicAdminClient topicAdminClient = 
TopicAdminClient.create()) { TopicName topicName = TopicName.of(PluginPropertyUtils.pluginProp(ConstantsUtil.PROJECT_ID), topicId); From 3f034c0c930d51813e450e88ec2010830dc9e048 Mon Sep 17 00:00:00 2001 From: neerajsinghal Date: Tue, 2 Jan 2024 16:35:50 +0530 Subject: [PATCH 02/10] pubsub cases --- .../features/pubsub/sink/BQToPubSub.feature | 38 +++++++ .../common/stepsdesign/TestSetupHooks.java | 10 +- .../plugin/pubsub/stepsdesign/PubSubSink.java | 6 ++ .../pubsub/stepsdesign/PubSubSource.java | 23 ++++- .../io/cdap/plugin/utils/PubSubClient.java | 99 ++++++++++++++++++- .../resources/pluginParameters.properties | 2 + 6 files changed, 168 insertions(+), 10 deletions(-) diff --git a/src/e2e-test/features/pubsub/sink/BQToPubSub.feature b/src/e2e-test/features/pubsub/sink/BQToPubSub.feature index 8a1b5aeabd..adb73090c1 100644 --- a/src/e2e-test/features/pubsub/sink/BQToPubSub.feature +++ b/src/e2e-test/features/pubsub/sink/BQToPubSub.feature @@ -40,3 +40,41 @@ Feature: PubSub-Sink - Verification of BigQuery to PubSub successful data transf Then Validate OUT record count is equal to IN record count Then Open and capture logs Then Validate the cmek key "cmekPubSub" of target PubSub topic if cmek is enabled + + @PUBSUB_SOURCE_TEST @PUBSUB_SINK_TEST @PUBSUB_SUBSCRIPTION_TEST + Scenario: pubsub flow + Given Open Datafusion Project to configure pipeline + When Select data pipeline type as: "Realtime" + When Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "Pub/Sub" from the plugins list as: "Source" + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "Pub/Sub" from the plugins list as: "Sink" + Then Connect plugins: "Pub/Sub" and "Pub/Sub2" to establish connection + Then Navigate to the properties page of plugin: "Pub/Sub" + Then Override Service account details if set in environment variables + Then Enter input plugin property: "referenceName" with value: "BQReferenceName" + Then Enter PubSub source property 
subscription name + Then Enter PubSub source property topic name + Then Validate "Pub/Sub" plugin properties + And Close the Plugin Properties page + Then Open the PubSub sink properties + Then Enter PubSub property projectId "projectId" + Then Override Service account details if set in environment variables + Then Enter PubSub property reference name + Then Enter PubSub source property topic name + Then Validate "Pub/Sub" plugin properties + And Close the Plugin Properties page + + Then Save the pipeline + Then Preview and run the pipeline + Then Wait till pipeline preview is in running state + Then Open and capture pipeline preview logs + Then Close the pipeline logs + Then Close the preview + Then Deploy the pipeline + Then Run the Pipeline in Runtime + # Then Wait till pipeline is in running state + #Then Open and capture logs + #Then Verify the pipeline status is "Succeeded" + Then Publish the message + Then Subscribe to the messages diff --git a/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java b/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java index f96842234d..0b564f587f 100644 --- a/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java +++ b/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java @@ -490,9 +490,9 @@ public static void createTargetPubSubTopic() { @Before(order = 1, value = "@PUBSUB_SOURCE_TEST") public static void createSourcePubSubTopic() throws IOException { - PubSubClient.createTopic(pubSubSourceTopic); pubSubSourceTopic = "cdf-e2e-test-" + UUID.randomUUID(); - BeforeActions.scenario.write("Target PubSub topic " + pubSubSourceTopic); + PubSubClient.createTopic(pubSubSourceTopic); + BeforeActions.scenario.write("Source PubSub topic " + pubSubSourceTopic); } @Before(order = 1, value = "@PUBSUB_SUBSCRIPTION_TEST") @@ -500,11 +500,11 @@ public static void createSubscriptionPubSubTopic() { pubSubSourceSubscription = "cdf-e2e-test-" + UUID.randomUUID(); 
BeforeActions.scenario.write("Source PubSub subscription " + pubSubSourceSubscription); } - @After(order = 1, value = "@PUBSUB_SOURCE_TEST") + @After(order = 1, value = "") public static void deleteSourcePubSubTopic() { try { PubSubClient.deleteTopic(pubSubSourceTopic); - BeforeActions.scenario.write("Deleted target PubSub topic " + pubSubSourceTopic); + BeforeActions.scenario.write("Deleted Source PubSub topic " + pubSubSourceTopic); pubSubSourceTopic = StringUtils.EMPTY; } catch (Exception e) { if (e.getMessage().contains("Invalid resource name given") || e.getMessage().contains("Resource not found")) { @@ -515,7 +515,7 @@ public static void deleteSourcePubSubTopic() { } } - @After(order = 1, value = "@PUBSUB_SINK_TEST") + @After(order = 1, value = "") public static void deleteTargetPubSubTopic() { try { PubSubClient.deleteTopic(pubSubTargetTopic); diff --git a/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSink.java b/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSink.java index 927b0d3c7d..1cb687e2f0 100644 --- a/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSink.java +++ b/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSink.java @@ -234,4 +234,10 @@ public void enterRuntimeArgumentValueForPubSubSinkPropertyTopicKey(String runtim public void clickOnPreviewDataForPubSubSink() { openSinkPluginPreviewData("GooglePublisher"); } + @Then("Subscribe to the messages") + public void subscribeToTheMessages() + { + PubSubClient.subscribeAsyncExample(PluginPropertyUtils.pluginProp(ConstantsUtil.PROJECT_ID),TestSetupHooks.pubSubSourceSubscription); + } + } diff --git a/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSource.java b/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSource.java index d18ef2ea9c..29cb952538 100644 --- a/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSource.java +++ b/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSource.java @@ -17,8 +17,14 @@ package 
io.cdap.plugin.pubsub.stepsdesign; import io.cdap.e2e.utils.CdfHelper; +import io.cdap.e2e.utils.ConstantsUtil; +import io.cdap.e2e.utils.PluginPropertyUtils; +import io.cdap.plugin.common.stepsdesign.TestSetupHooks; +import io.cdap.plugin.pubsub.actions.PubSubActions; +import io.cdap.plugin.utils.PubSubClient; import io.cucumber.java.en.Then; import io.cucumber.java.en.When; +import java.io.IOException; /** * PubSub Source Plugin related step design. @@ -33,6 +39,19 @@ public void sourceIsPubSub() { @Then("Open the PubSub source properties") public void openThePubSubSourceProperties() { - openSourcePluginProperties("GooglePublisher"); + openSourcePluginProperties("pubsub"); } -} + + @Then("Enter PubSub source property topic name") + public void enterPubSubSourcePropertyTopicName() { + PubSubActions.enterPubSubTopic(TestSetupHooks.pubSubSourceTopic); } + @Then("Enter PubSub source property subscription name") + public void enterPubSubSourcePropertySubscriptionName() { + PubSubActions.enterSubscription(TestSetupHooks.pubSubSourceSubscription); + } + + @Then("Publish the message") + public void publishTheMessage() throws IOException, InterruptedException { + PubSubClient.publishWithErrorHandlerExample(PluginPropertyUtils.pluginProp(ConstantsUtil.PROJECT_ID), TestSetupHooks.pubSubSourceTopic); + } +} \ No newline at end of file diff --git a/src/e2e-test/java/io/cdap/plugin/utils/PubSubClient.java b/src/e2e-test/java/io/cdap/plugin/utils/PubSubClient.java index 861e06111b..94a379a97d 100644 --- a/src/e2e-test/java/io/cdap/plugin/utils/PubSubClient.java +++ b/src/e2e-test/java/io/cdap/plugin/utils/PubSubClient.java @@ -16,12 +16,22 @@ package io.cdap.plugin.utils; +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; import com.google.api.gax.rpc.AlreadyExistsException; +import com.google.api.gax.rpc.ApiException; +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import 
com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.cloud.pubsub.v1.Subscriber; import com.google.cloud.pubsub.v1.SubscriptionAdminClient; import com.google.cloud.pubsub.v1.SubscriptionAdminSettings; import com.google.cloud.pubsub.v1.TopicAdminClient; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.PushConfig; -import com.google.pubsub.v1.Subscription; import com.google.pubsub.v1.Topic; import com.google.pubsub.v1.TopicName; import io.cdap.e2e.utils.ConstantsUtil; @@ -29,6 +39,10 @@ import com.google.pubsub.v1.ProjectSubscriptionName; import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * Represents PubSub client. @@ -43,7 +57,7 @@ public static Topic createTopic(String topicId) throws IOException { } // Create the subscription - public static Subscription createSubscription(String subscriptionId, String topicId) throws IOException { + public static void createSubscription(String subscriptionId, String topicId) throws IOException { ProjectSubscriptionName subscriptionName = null; try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create( SubscriptionAdminSettings.newBuilder().build())) { @@ -52,9 +66,9 @@ public static Subscription createSubscription(String subscriptionId, String topi subscriptionAdminClient.createSubscription(subscriptionName, topicName, PushConfig.getDefaultInstance(), 60); System.out.println("Subscription created: " + subscriptionName.toString()); } catch (AlreadyExistsException e) { + assert subscriptionName != null; System.out.println("Subscription already exists: " + subscriptionName.toString()); } - return null; } public static void deleteTopic(String topicId) throws IOException { @@ -77,4 +91,83 @@ public static String 
getTopicCmekKey(String topicId) throws IOException { return getTopic(topicId).getKmsKeyName(); } + + public static void publishWithErrorHandlerExample(String projectId, String topicId) + throws IOException, InterruptedException { + TopicName topicName = TopicName.of(projectId, topicId); + Publisher publisher = null; + + try { + publisher = Publisher.newBuilder(topicName).build(); + + List messages = Arrays.asList("first message", "second message"); + + for (final String message : messages) { + ByteString data = ByteString.copyFromUtf8(message); + PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build(); + ApiFuture future = publisher.publish(pubsubMessage); + + // Adding an asynchronous callback to handle success / failure + ApiFutures.addCallback( + future, + new ApiFutureCallback() { + + @Override + public void onFailure(Throwable throwable) { + if (throwable instanceof ApiException) { + ApiException apiException = ((ApiException) throwable); + // details on the API exception + // System.out.println(apiException.getStatusCode().getCode()); + //System.out.println(apiException.isRetryable()); + } + System.out.println("Error publishing message : " + message); + } + + @Override + public void onSuccess(String messageId) { + // Once published, returns server-assigned message ids (unique within the topic) + System.out.println("Published message ID: " + messageId); + } + }, + MoreExecutors.directExecutor()); + } + } finally { + if (publisher != null) { + // When finished with the publisher, shutdown to free up resources. + publisher.shutdown(); + publisher.awaitTermination(1, TimeUnit.MINUTES); + } + } + } + + public static void subscribeAsyncExample(String projectId, String subscriptionId) { + ProjectSubscriptionName subscriptionName = + ProjectSubscriptionName.of(projectId, subscriptionId); + + // Instantiate an asynchronous message receiver. 
+ MessageReceiver receiver = + (PubsubMessage message, AckReplyConsumer consumer) -> { + // Handle incoming message, then ack the received message. + System.out.println("Id: " + message.getMessageId()); + System.out.println("Data: " + message.getData().toStringUtf8()); + consumer.ack(); + }; + + Subscriber subscriber = null; + try { + subscriber = Subscriber.newBuilder(subscriptionName, receiver).build(); + // Start the subscriber. + subscriber.startAsync().awaitRunning(); + System.out.printf("Listening for messages on %s:\n", subscriptionName.toString()); + // Allow the subscriber to run for 30s unless an unrecoverable error occurs. + subscriber.awaitTerminated(300, TimeUnit.SECONDS); + } catch (TimeoutException timeoutException) { + // Shut down the subscriber after 30s. Stop receiving messages. + subscriber.stopAsync(); + } + } + + + + } diff --git a/src/e2e-test/resources/pluginParameters.properties b/src/e2e-test/resources/pluginParameters.properties index ae191f7c90..f6d091c865 100644 --- a/src/e2e-test/resources/pluginParameters.properties +++ b/src/e2e-test/resources/pluginParameters.properties @@ -181,6 +181,8 @@ bqUpdateTableSchemaTrue=True clusterValue=transaction_date TableKey=PersonID bqSourceTable=dummy +#pubSubSourceSubscription=dummy +#pubSubSourceTopic=dummy bqCreateTableQueryFile=testdata/BigQuery/BigQueryCreateTableQuery.txt bqInsertDataQueryFile=testdata/BigQuery/BigQueryInsertDataQuery.txt bqCreateViewQueryFile=testdata/BigQuery/BigQueryCreateViewQuery.txt From 7b7718b25cbc5fe2b4f45ae160ecfd57683d08f2 Mon Sep 17 00:00:00 2001 From: psainics Date: Tue, 5 Dec 2023 10:51:52 +0530 Subject: [PATCH 03/10] Added BQ Execute Action job label support --- docs/BigQueryExecute-action.md | 8 + .../gcp/bigquery/action/BigQueryExecute.java | 35 +++- .../sink/AbstractBigQuerySinkConfig.java | 100 +-------- .../gcp/bigquery/util/BigQueryUtil.java | 104 ++++++++++ .../bigquery/sink/BigQuerySinkConfigTest.java | 186 ----------------- 
.../gcp/bigquery/util/BigQueryUtilTest.java | 193 ++++++++++++++++++ widgets/BigQueryExecute-action.json | 11 + widgets/BigQueryMultiTable-batchsink.json | 2 +- 8 files changed, 350 insertions(+), 289 deletions(-) diff --git a/docs/BigQueryExecute-action.md b/docs/BigQueryExecute-action.md index 961b59504b..b7129c9da2 100644 --- a/docs/BigQueryExecute-action.md +++ b/docs/BigQueryExecute-action.md @@ -30,6 +30,14 @@ write BigQuery data to this project. **SQL**: SQL command to execute. +**BQ Job Labels:** Key value pairs to be added as labels to the BigQuery job. Keys must be unique. (Macro Enabled) + +[job_source, type] are system defined labels used by CDAP for internal purpose and cannot be used as label keys. +Macro format is supported. example `key1:val1,key2:val2` + +Keys and values can contain only lowercase letters, numeric characters, underscores, and dashes. +For more information about labels, see [Docs](https://cloud.google.com/bigquery/docs/labels-intro#requirements). + **Dialect**: Dialect of the SQL command. The value must be 'legacy' or 'standard'. If set to 'standard', the query will use BigQuery's standard SQL: https://cloud.google.com/bigquery/sql-reference/. If set to 'legacy', BigQuery's legacy SQL dialect will be used for this query. diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java b/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java index 78455fdb4d..87d778a89e 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java @@ -125,7 +125,7 @@ public void run(ActionContext context) throws Exception { } // Add labels for the BigQuery Execute job. 
- builder.setLabels(BigQueryUtil.getJobLabels(BigQueryUtil.BQ_JOB_TYPE_EXECUTE_TAG)); + builder.setLabels(BigQueryUtil.getJobLabels(BigQueryUtil.BQ_JOB_TYPE_EXECUTE_TAG, config.getJobLabelKeyValue())); QueryJobConfiguration queryConfig = builder.build(); @@ -205,6 +205,7 @@ public static final class Config extends AbstractBigQueryActionConfig { private static final String DATASET = "dataset"; private static final String TABLE = "table"; private static final String NAME_LOCATION = "location"; + public static final String NAME_BQ_JOB_LABELS = "jobLabels"; private static final int ERROR_CODE_NOT_FOUND = 404; private static final String STORE_RESULTS = "storeResults"; @@ -272,10 +273,17 @@ public static final class Config extends AbstractBigQueryActionConfig { @Description("Whether to store results in a BigQuery Table.") private Boolean storeResults; + @Name(NAME_BQ_JOB_LABELS) + @Macro + @Nullable + @Description("Key value pairs to be added as labels to the BigQuery job. Keys must be unique. 
[job_source, type] " + + "are reserved keys and cannot be used as label keys.") + protected String jobLabelKeyValue; + private Config(@Nullable String project, @Nullable String serviceAccountType, @Nullable String serviceFilePath, @Nullable String serviceAccountJson, @Nullable String dataset, @Nullable String table, @Nullable String location, @Nullable String cmekKey, @Nullable String dialect, @Nullable String sql, - @Nullable String mode, @Nullable Boolean storeResults) { + @Nullable String mode, @Nullable Boolean storeResults, @Nullable String jobLabelKeyValue) { this.project = project; this.serviceAccountType = serviceAccountType; this.serviceFilePath = serviceFilePath; @@ -288,6 +296,7 @@ private Config(@Nullable String project, @Nullable String serviceAccountType, @N this.sql = sql; this.mode = mode; this.storeResults = storeResults; + this.jobLabelKeyValue = jobLabelKeyValue; } public boolean isLegacySQL() { @@ -328,6 +337,11 @@ public String getTable() { return table; } + @Nullable + public String getJobLabelKeyValue() { + return jobLabelKeyValue; + } + @Override public void validate(FailureCollector failureCollector) { validate(failureCollector, Collections.emptyMap()); @@ -376,9 +390,17 @@ public void validate(FailureCollector failureCollector, Map argu validateCmekKey(failureCollector, arguments); } + if (!containsMacro(NAME_BQ_JOB_LABELS)) { + validateJobLabelKeyValue(failureCollector); + } + failureCollector.getOrThrowException(); } + void validateJobLabelKeyValue(FailureCollector failureCollector) { + BigQueryUtil.validateJobLabelKeyValue(jobLabelKeyValue, failureCollector, NAME_BQ_JOB_LABELS); + } + void validateCmekKey(FailureCollector failureCollector, Map arguments) { CryptoKeyName cmekKeyName = CmekUtils.getCmekKey(cmekKey, arguments, failureCollector); //these fields are needed to check if bucket exists or not and for location validation @@ -449,6 +471,7 @@ public static class Builder { private String sql; private String mode; private Boolean 
storeResults; + private String jobLabelKeyValue; public Builder setProject(@Nullable String project) { this.project = project; @@ -505,6 +528,11 @@ public Builder setSql(@Nullable String sql) { return this; } + public Builder setJobLabelKeyValue(@Nullable String jobLabelKeyValue) { + this.jobLabelKeyValue = jobLabelKeyValue; + return this; + } + public Config build() { return new Config( project, @@ -518,7 +546,8 @@ public Config build() { dialect, sql, mode, - storeResults + storeResults, + jobLabelKeyValue ); } diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySinkConfig.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySinkConfig.java index bc894cfeac..b888ea4dee 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySinkConfig.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySinkConfig.java @@ -187,106 +187,8 @@ void validateCmekKey(FailureCollector failureCollector, Map argu validateCmekKeyLocation(cmekKeyName, null, location, failureCollector); } - /** - * Validates job label key value pairs, as per the following rules: - * Keys and values can contain only lowercase letters, numeric characters, underscores, and dashes. 
- * Defined in the following link: - * Docs - * @param failureCollector failure collector - */ void validateJobLabelKeyValue(FailureCollector failureCollector) { - Set reservedKeys = BigQueryUtil.BQ_JOB_LABEL_SYSTEM_KEYS; - int maxLabels = 64 - reservedKeys.size(); - int maxKeyLength = 63; - int maxValueLength = 63; - - String validLabelKeyRegex = "^[\\p{L}][a-z0-9-_\\p{L}]+$"; - String validLabelValueRegex = "^[a-z0-9-_\\p{L}]+$"; - String capitalLetterRegex = ".*[A-Z].*"; - String labelKeyValue = getJobLabelKeyValue(); - - if (Strings.isNullOrEmpty(labelKeyValue)) { - return; - } - - String[] keyValuePairs = labelKeyValue.split(","); - Set uniqueKeys = new HashSet<>(); - - for (String keyValuePair : keyValuePairs) { - - // Adding a label without a value is valid behavior - // Read more here: https://cloud.google.com/bigquery/docs/adding-labels#adding_a_label_without_a_value - String[] keyValue = keyValuePair.trim().split(":"); - boolean isKeyPresent = keyValue.length == 1 || keyValue.length == 2; - boolean isValuePresent = keyValue.length == 2; - - - if (!isKeyPresent) { - failureCollector.addFailure(String.format("Invalid job label key value pair '%s'.", keyValuePair), - "Job label key value pair should be in the format 'key:value'.") - .withConfigProperty(NAME_BQ_JOB_LABELS); - continue; - } - - // Check if key is reserved - if (reservedKeys.contains(keyValue[0])) { - failureCollector.addFailure(String.format("Invalid job label key '%s'.", keyValue[0]), - "A system label already exists with same name.").withConfigProperty(NAME_BQ_JOB_LABELS); - continue; - } - - String key = keyValue[0]; - String value = isValuePresent ? 
keyValue[1] : ""; - boolean isKeyValid = true; - boolean isValueValid = true; - - // Key cannot be empty - if (Strings.isNullOrEmpty(key)) { - failureCollector.addFailure(String.format("Invalid job label key '%s'.", key), - "Job label key cannot be empty.").withConfigProperty(NAME_BQ_JOB_LABELS); - isKeyValid = false; - } - - // Key cannot be longer than 63 characters - if (key.length() > maxKeyLength) { - failureCollector.addFailure(String.format("Invalid job label key '%s'.", key), - "Job label key cannot be longer than 63 characters.").withConfigProperty(NAME_BQ_JOB_LABELS); - isKeyValid = false; - } - - // Value cannot be longer than 63 characters - if (value.length() > maxValueLength) { - failureCollector.addFailure(String.format("Invalid job label value '%s'.", value), - "Job label value cannot be longer than 63 characters.").withConfigProperty(NAME_BQ_JOB_LABELS); - isValueValid = false; - } - - if (isKeyValid && (!key.matches(validLabelKeyRegex) || key.matches(capitalLetterRegex))) { - failureCollector.addFailure(String.format("Invalid job label key '%s'.", key), - "Job label key can only contain lowercase letters, numeric characters, " + - "underscores, and dashes. 
Check docs for more details.") - .withConfigProperty(NAME_BQ_JOB_LABELS); - isKeyValid = false; - } - - if (isValuePresent && isValueValid && - (!value.matches(validLabelValueRegex) || value.matches(capitalLetterRegex))) { - failureCollector.addFailure(String.format("Invalid job label value '%s'.", value), - "Job label value can only contain lowercase letters, numeric characters, " + - "underscores, and dashes.").withConfigProperty(NAME_BQ_JOB_LABELS); - } - - if (isKeyValid && !uniqueKeys.add(key)) { - failureCollector.addFailure(String.format("Duplicate job label key '%s'.", key), - "Job label key should be unique.").withConfigProperty(NAME_BQ_JOB_LABELS); - } - } - // Check if number of labels is greater than 64 - reserved keys - if (uniqueKeys.size() > maxLabels) { - failureCollector.addFailure("Number of job labels exceeds the limit.", - String.format("Number of job labels cannot be greater than %d.", maxLabels)) - .withConfigProperty(NAME_BQ_JOB_LABELS); - } + BigQueryUtil.validateJobLabelKeyValue(jobLabelKeyValue, failureCollector, NAME_BQ_JOB_LABELS); } public String getDatasetProject() { diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java b/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java index cbde42cf04..fbcdb6398a 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java @@ -39,6 +39,7 @@ import io.cdap.cdap.etl.api.validation.InvalidConfigPropertyException; import io.cdap.cdap.etl.api.validation.InvalidStageException; import io.cdap.cdap.etl.api.validation.ValidationFailure; +import io.cdap.plugin.gcp.bigquery.sink.AbstractBigQuerySinkConfig; import io.cdap.plugin.gcp.bigquery.sink.BigQuerySink; import io.cdap.plugin.gcp.bigquery.source.BigQuerySource; import io.cdap.plugin.gcp.bigquery.source.BigQuerySourceConfig; @@ -60,6 +61,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; +import 
java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -923,4 +925,106 @@ public static String getStagingBucketName(Map arguments, @Nullab } return bucket; } + + /** + * Validates job label key value pairs, as per the following rules: + * Keys and values can contain only lowercase letters, numeric characters, underscores, and dashes. + * Defined in the following link: + * Docs + * @param failureCollector failure collector + */ + public static void validateJobLabelKeyValue(String labelKeyValue, FailureCollector failureCollector, + String stageConfigProperty) { + Set reservedKeys = BQ_JOB_LABEL_SYSTEM_KEYS; + int maxLabels = 64 - reservedKeys.size(); + int maxKeyLength = 63; + int maxValueLength = 63; + + String validLabelKeyRegex = "^[\\p{L}][a-z0-9-_\\p{L}]+$"; + String validLabelValueRegex = "^[a-z0-9-_\\p{L}]+$"; + String capitalLetterRegex = ".*[A-Z].*"; + + if (com.google.api.client.util.Strings.isNullOrEmpty(labelKeyValue)) { + return; + } + + String[] keyValuePairs = labelKeyValue.split(","); + Set uniqueKeys = new HashSet<>(); + + for (String keyValuePair : keyValuePairs) { + + // Adding a label without a value is valid behavior + // Read more here: https://cloud.google.com/bigquery/docs/adding-labels#adding_a_label_without_a_value + String[] keyValue = keyValuePair.trim().split(":"); + boolean isKeyPresent = keyValue.length == 1 || keyValue.length == 2; + boolean isValuePresent = keyValue.length == 2; + + + if (!isKeyPresent) { + failureCollector.addFailure(String.format("Invalid job label key value pair '%s'.", keyValuePair), + "Job label key value pair should be in the format 'key:value'.") + .withConfigProperty(stageConfigProperty); + continue; + } + + // Check if key is reserved + if (reservedKeys.contains(keyValue[0])) { + failureCollector.addFailure(String.format("Invalid job label key '%s'.", keyValue[0]), + "A system label already exists with same name.").withConfigProperty(stageConfigProperty); + continue; + } + + 
String key = keyValue[0]; + String value = isValuePresent ? keyValue[1] : ""; + boolean isKeyValid = true; + boolean isValueValid = true; + + // Key cannot be empty + if (com.google.api.client.util.Strings.isNullOrEmpty(key)) { + failureCollector.addFailure(String.format("Invalid job label key '%s'.", key), + "Job label key cannot be empty.").withConfigProperty(stageConfigProperty); + isKeyValid = false; + } + + // Key cannot be longer than 63 characters + if (key.length() > maxKeyLength) { + failureCollector.addFailure(String.format("Invalid job label key '%s'.", key), + "Job label key cannot be longer than 63 characters.").withConfigProperty(stageConfigProperty); + isKeyValid = false; + } + + // Value cannot be longer than 63 characters + if (value.length() > maxValueLength) { + failureCollector.addFailure(String.format("Invalid job label value '%s'.", value), + "Job label value cannot be longer than 63 characters.").withConfigProperty(stageConfigProperty); + isValueValid = false; + } + + if (isKeyValid && (!key.matches(validLabelKeyRegex) || key.matches(capitalLetterRegex))) { + failureCollector.addFailure(String.format("Invalid job label key '%s'.", key), + "Job label key can only contain lowercase letters, numeric characters, " + + "underscores, and dashes. 
Check docs for more details.") + .withConfigProperty(stageConfigProperty); + isKeyValid = false; + } + + if (isValuePresent && isValueValid && + (!value.matches(validLabelValueRegex) || value.matches(capitalLetterRegex))) { + failureCollector.addFailure(String.format("Invalid job label value '%s'.", value), + "Job label value can only contain lowercase letters, numeric characters, " + + "underscores, and dashes.").withConfigProperty(stageConfigProperty); + } + + if (isKeyValid && !uniqueKeys.add(key)) { + failureCollector.addFailure(String.format("Duplicate job label key '%s'.", key), + "Job label key should be unique.").withConfigProperty(stageConfigProperty); + } + } + // Check if number of labels is greater than 64 - reserved keys + if (uniqueKeys.size() > maxLabels) { + failureCollector.addFailure("Number of job labels exceeds the limit.", + String.format("Number of job labels cannot be greater than %d.", maxLabels)) + .withConfigProperty(stageConfigProperty); + } + } } diff --git a/src/test/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfigTest.java b/src/test/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfigTest.java index dcfb182779..ebaa553df3 100644 --- a/src/test/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfigTest.java +++ b/src/test/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfigTest.java @@ -129,192 +129,6 @@ public void testValidateTimePartitioningColumnWithNullAndDate() throws Assert.assertEquals(0, collector.getValidationFailures().size()); } - @Test - public void testJobLabelWithDuplicateKeys() { - config.jobLabelKeyValue = "key1:value1,key2:value2,key1:value3"; - config.validate(collector); - Assert.assertEquals(1, collector.getValidationFailures().size()); - Assert.assertEquals("Duplicate job label key 'key1'.", - collector.getValidationFailures().get(0).getMessage()); - } - - @Test - public void testJobLabelWithDuplicateValues() { - config.jobLabelKeyValue = "key1:value1,key2:value2,key3:value1"; - 
config.validate(collector); - Assert.assertEquals(0, collector.getValidationFailures().size()); - } - - @Test - public void testJobLabelWithCapitalLetters() { - config.jobLabelKeyValue = "keY1:value1,key2:value2,key3:value1"; - config.validate(collector); - Assert.assertEquals(1, collector.getValidationFailures().size()); - Assert.assertEquals("Invalid job label key 'keY1'.", - collector.getValidationFailures().get(0).getMessage()); - } - - @Test - public void testJobLabelStartingWithCapitalLetters() { - config.jobLabelKeyValue = "Key1:value1,key2:value2,key3:value1"; - config.validate(collector); - Assert.assertEquals(1, collector.getValidationFailures().size()); - Assert.assertEquals("Invalid job label key 'Key1'.", - collector.getValidationFailures().get(0).getMessage()); - } - - @Test - public void testJobLabelWithInvalidCharacters() { - config.jobLabelKeyValue = "key1:value1,key2:value2,key3:value1@"; - config.validate(collector); - Assert.assertEquals(1, collector.getValidationFailures().size()); - Assert.assertEquals("Invalid job label value 'value1@'.", - collector.getValidationFailures().get(0).getMessage()); - } - - @Test - public void testJobLabelWithEmptyKey() { - config.jobLabelKeyValue = ":value1,key2:value2,key3:value1"; - config.validate(collector); - Assert.assertEquals(1, collector.getValidationFailures().size()); - Assert.assertEquals("Invalid job label key ''.", - collector.getValidationFailures().get(0).getMessage()); - } - - @Test - public void testJobLabelWithEmptyValue() { - config.jobLabelKeyValue = "key1:,key2:value2,key3:value1"; - config.validate(collector); - Assert.assertEquals(0, collector.getValidationFailures().size()); - } - - @Test - public void testJobLabelWithWrongFormat() { - config.jobLabelKeyValue = "key1=value1"; - config.validate(collector); - Assert.assertEquals(1, collector.getValidationFailures().size()); - Assert.assertEquals("Invalid job label key 'key1=value1'.", - 
collector.getValidationFailures().get(0).getMessage()); - } - - @Test - public void testJobLabelWithNull() { - config.jobLabelKeyValue = null; - config.validate(collector); - Assert.assertEquals(0, collector.getValidationFailures().size()); - } - - @Test - public void testJobLabelWithReservedKeys() { - config.jobLabelKeyValue = "job_source:value1,type:value2"; - config.validate(collector); - Assert.assertEquals(2, collector.getValidationFailures().size()); - Assert.assertEquals("Invalid job label key 'job_source'.", - collector.getValidationFailures().get(0).getMessage()); - } - - @Test - public void testJobLabelWith65Keys() { - StringBuilder sb = new StringBuilder(); - for (int i = 1; i <= 65; i++) { - String key = "key" + i; - String value = "value" + i; - sb.append(key).append(":").append(value).append(","); - } - // remove the last comma - sb.deleteCharAt(sb.length() - 1); - Assert.assertEquals(65, sb.toString().split(",").length); - config.jobLabelKeyValue = sb.toString(); - config.validate(collector); - Assert.assertEquals(1, collector.getValidationFailures().size()); - Assert.assertEquals("Number of job labels exceeds the limit.", - collector.getValidationFailures().get(0).getMessage()); - } - - @Test - public void testJobLabelWithKeyLength64() { - String key64 = "1234567890123456789012345678901234567890123456789012345678901234"; - config.jobLabelKeyValue = key64 + ":value1"; - config.validate(collector); - Assert.assertEquals(1, collector.getValidationFailures().size()); - Assert.assertEquals("Invalid job label key '" + key64 + "'.", - collector.getValidationFailures().get(0).getMessage()); - } - - @Test - public void testJobLabelWithValueLength64() { - String value64 = "1234567890123456789012345678901234567890123456789012345678901234"; - config.jobLabelKeyValue = "key1:" + value64; - config.validate(collector); - Assert.assertEquals(1, collector.getValidationFailures().size()); - Assert.assertEquals("Invalid job label value '" + value64 + "'.", - 
collector.getValidationFailures().get(0).getMessage()); - } - - @Test - public void testJobLabelWithKeyStartingWithNumber() { - config.jobLabelKeyValue = "1key:value1"; - config.validate(collector); - Assert.assertEquals(1, collector.getValidationFailures().size()); - Assert.assertEquals("Invalid job label key '1key'.", - collector.getValidationFailures().get(0).getMessage()); - } - - @Test - public void testJobLabelWithKeyStartingWithDash() { - config.jobLabelKeyValue = "-key:value1"; - config.validate(collector); - Assert.assertEquals(1, collector.getValidationFailures().size()); - Assert.assertEquals("Invalid job label key '-key'.", - collector.getValidationFailures().get(0).getMessage()); - } - - @Test - public void testJobLabelWithKeyStartingWithHyphen() { - config.jobLabelKeyValue = "_key:value1"; - config.validate(collector); - Assert.assertEquals(1, collector.getValidationFailures().size()); - Assert.assertEquals("Invalid job label key '_key'.", - collector.getValidationFailures().get(0).getMessage()); - } - - @Test - public void testJobLabelWithKeyWithChineseCharacter() { - config.jobLabelKeyValue = "中文:value1"; - config.validate(collector); - Assert.assertEquals(0, collector.getValidationFailures().size()); - } - - @Test - public void testJobLabelWithKeyWithJapaneseCharacter() { - config.jobLabelKeyValue = "日本語:value1"; - config.validate(collector); - Assert.assertEquals(0, collector.getValidationFailures().size()); - } - - @Test - public void testJobLabelWithValueStartingWithNumber() { - config.jobLabelKeyValue = "key:1value"; - config.validate(collector); - Assert.assertEquals(0, collector.getValidationFailures().size()); - } - - @Test - public void testJobLabelWithValueStartingWithDash() { - config.jobLabelKeyValue = "key:-value"; - config.validate(collector); - Assert.assertEquals(0, collector.getValidationFailures().size()); - } - - @Test - public void testJobLabelWithValueStartingWithCaptialLetter() { - config.jobLabelKeyValue = "key:Value"; - 
config.validate(collector); - Assert.assertEquals(1, collector.getValidationFailures().size()); - Assert.assertEquals("Invalid job label value 'Value'.", - collector.getValidationFailures().get(0).getMessage()); - } - @Test public void testValidateColumnNameWithValidColumnName() { String columnName = "test"; diff --git a/src/test/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtilTest.java b/src/test/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtilTest.java index 4bc7aeb299..41d9c73a46 100644 --- a/src/test/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtilTest.java +++ b/src/test/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtilTest.java @@ -21,10 +21,12 @@ import io.cdap.cdap.api.data.schema.Schema; import io.cdap.cdap.etl.api.FailureCollector; import io.cdap.cdap.etl.api.validation.ValidationFailure; +import io.cdap.cdap.etl.mock.validation.MockFailureCollector; import io.cdap.plugin.gcp.bigquery.util.BigQueryTypeSize.BigNumeric; import io.cdap.plugin.gcp.bigquery.util.BigQueryTypeSize.Numeric; import io.cdap.plugin.gcp.common.GCPUtils; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mockito; @@ -44,6 +46,12 @@ public class BigQueryUtilTest { private static final String BUCKET_PREFIX_ARG = "gcp.bigquery.bucket.prefix"; + MockFailureCollector collector; + + @Before + public void setUp() { + collector = new MockFailureCollector(); + } @Test public void testGetTableSchema() { @@ -277,4 +285,189 @@ public void testCRC32LocationDoesNotCollide() { } } + @Test + public void testJobLabelWithDuplicateKeys() { + String jobLabelKeyValue = "key1:value1,key2:value2,key1:value3"; + BigQueryUtil.validateJobLabelKeyValue(jobLabelKeyValue, collector, "test"); + Assert.assertEquals(1, collector.getValidationFailures().size()); + Assert.assertEquals("Duplicate job label key 'key1'.", + collector.getValidationFailures().get(0).getMessage()); + } + @Test + public void testJobLabelWithDuplicateValues() { 
+ String jobLabelKeyValue = "key1:value1,key2:value2,key3:value1"; + BigQueryUtil.validateJobLabelKeyValue(jobLabelKeyValue, collector, "test"); + Assert.assertEquals(0, collector.getValidationFailures().size()); + } + + @Test + public void testJobLabelWithCapitalLetters() { + String jobLabelKeyValue = "keY1:value1,key2:value2,key3:value1"; + BigQueryUtil.validateJobLabelKeyValue(jobLabelKeyValue, collector, "test"); + Assert.assertEquals(1, collector.getValidationFailures().size()); + Assert.assertEquals("Invalid job label key 'keY1'.", + collector.getValidationFailures().get(0).getMessage()); + } + + @Test + public void testJobLabelStartingWithCapitalLetters() { + String jobLabelKeyValue = "Key1:value1,key2:value2,key3:value1"; + BigQueryUtil.validateJobLabelKeyValue(jobLabelKeyValue, collector, "test"); + Assert.assertEquals(1, collector.getValidationFailures().size()); + Assert.assertEquals("Invalid job label key 'Key1'.", + collector.getValidationFailures().get(0).getMessage()); + } + + @Test + public void testJobLabelWithInvalidCharacters() { + String jobLabelKeyValue = "key1:value1,key2:value2,key3:value1@"; + BigQueryUtil.validateJobLabelKeyValue(jobLabelKeyValue, collector, "test"); + Assert.assertEquals(1, collector.getValidationFailures().size()); + Assert.assertEquals("Invalid job label value 'value1@'.", + collector.getValidationFailures().get(0).getMessage()); + } + + @Test + public void testJobLabelWithEmptyKey() { + String jobLabelKeyValue = ":value1,key2:value2,key3:value1"; + BigQueryUtil.validateJobLabelKeyValue(jobLabelKeyValue, collector, "test"); + Assert.assertEquals(1, collector.getValidationFailures().size()); + Assert.assertEquals("Invalid job label key ''.", + collector.getValidationFailures().get(0).getMessage()); + } + + @Test + public void testJobLabelWithEmptyValue() { + String jobLabelKeyValue = "key1:,key2:value2,key3:value1"; + BigQueryUtil.validateJobLabelKeyValue(jobLabelKeyValue, collector, "test"); + Assert.assertEquals(0, 
collector.getValidationFailures().size()); + } + + @Test + public void testJobLabelWithWrongFormat() { + String jobLabelKeyValue = "key1=value1"; + BigQueryUtil.validateJobLabelKeyValue(jobLabelKeyValue, collector, "test"); + Assert.assertEquals(1, collector.getValidationFailures().size()); + Assert.assertEquals("Invalid job label key 'key1=value1'.", + collector.getValidationFailures().get(0).getMessage()); + } + + @Test + public void testJobLabelWithNull() { + String jobLabelKeyValue = null; + BigQueryUtil.validateJobLabelKeyValue(jobLabelKeyValue, collector, "test"); + Assert.assertEquals(0, collector.getValidationFailures().size()); + } + + @Test + public void testJobLabelWithReservedKeys() { + String jobLabelKeyValue = "job_source:value1,type:value2"; + BigQueryUtil.validateJobLabelKeyValue(jobLabelKeyValue, collector, "test"); + Assert.assertEquals(2, collector.getValidationFailures().size()); + Assert.assertEquals("Invalid job label key 'job_source'.", + collector.getValidationFailures().get(0).getMessage()); + } + + @Test + public void testJobLabelWith65Keys() { + StringBuilder sb = new StringBuilder(); + for (int i = 1; i <= 65; i++) { + String key = "key" + i; + String value = "value" + i; + sb.append(key).append(":").append(value).append(","); + } + // remove the last comma + sb.deleteCharAt(sb.length() - 1); + Assert.assertEquals(65, sb.toString().split(",").length); + String jobLabelKeyValue = sb.toString(); + BigQueryUtil.validateJobLabelKeyValue(jobLabelKeyValue, collector, "test"); + Assert.assertEquals(1, collector.getValidationFailures().size()); + Assert.assertEquals("Number of job labels exceeds the limit.", + collector.getValidationFailures().get(0).getMessage()); + } + + @Test + public void testJobLabelWithKeyLength64() { + String key64 = "1234567890123456789012345678901234567890123456789012345678901234"; + String jobLabelKeyValue = key64 + ":value1"; + BigQueryUtil.validateJobLabelKeyValue(jobLabelKeyValue, collector, "test"); + 
Assert.assertEquals(1, collector.getValidationFailures().size()); + Assert.assertEquals("Invalid job label key '" + key64 + "'.", + collector.getValidationFailures().get(0).getMessage()); + } + + @Test + public void testJobLabelWithValueLength64() { + String value64 = "1234567890123456789012345678901234567890123456789012345678901234"; + String jobLabelKeyValue = "key1:" + value64; + BigQueryUtil.validateJobLabelKeyValue(jobLabelKeyValue, collector, "test"); + Assert.assertEquals(1, collector.getValidationFailures().size()); + Assert.assertEquals("Invalid job label value '" + value64 + "'.", + collector.getValidationFailures().get(0).getMessage()); + } + + @Test + public void testJobLabelWithKeyStartingWithNumber() { + String jobLabelKeyValue = "1key:value1"; + BigQueryUtil.validateJobLabelKeyValue(jobLabelKeyValue, collector, "test"); + Assert.assertEquals(1, collector.getValidationFailures().size()); + Assert.assertEquals("Invalid job label key '1key'.", + collector.getValidationFailures().get(0).getMessage()); + } + + @Test + public void testJobLabelWithKeyStartingWithDash() { + String jobLabelKeyValue = "-key:value1"; + BigQueryUtil.validateJobLabelKeyValue(jobLabelKeyValue, collector, "test"); + Assert.assertEquals(1, collector.getValidationFailures().size()); + Assert.assertEquals("Invalid job label key '-key'.", + collector.getValidationFailures().get(0).getMessage()); + } + + @Test + public void testJobLabelWithKeyStartingWithUnderscore() { + String jobLabelKeyValue = "_key:value1"; + BigQueryUtil.validateJobLabelKeyValue(jobLabelKeyValue, collector, "test"); + Assert.assertEquals(1, collector.getValidationFailures().size()); + Assert.assertEquals("Invalid job label key '_key'.", + collector.getValidationFailures().get(0).getMessage()); + } + + @Test + public void testJobLabelWithKeyWithChineseCharacter() { + String jobLabelKeyValue = "中文:value1"; + BigQueryUtil.validateJobLabelKeyValue(jobLabelKeyValue, collector, "test"); + Assert.assertEquals(0, 
collector.getValidationFailures().size()); + } + + @Test + public void testJobLabelWithKeyWithJapaneseCharacter() { + String jobLabelKeyValue = "日本語:value1"; + BigQueryUtil.validateJobLabelKeyValue(jobLabelKeyValue, collector, "test"); + Assert.assertEquals(0, collector.getValidationFailures().size()); + } + + @Test + public void testJobLabelWithValueStartingWithNumber() { + String jobLabelKeyValue = "key:1value"; + BigQueryUtil.validateJobLabelKeyValue(jobLabelKeyValue, collector, "test"); + Assert.assertEquals(0, collector.getValidationFailures().size()); + } + + @Test + public void testJobLabelWithValueStartingWithDash() { + String jobLabelKeyValue = "key:-value"; + BigQueryUtil.validateJobLabelKeyValue(jobLabelKeyValue, collector, "test"); + Assert.assertEquals(0, collector.getValidationFailures().size()); + } + + @Test + public void testJobLabelWithValueStartingWithCapitalLetter() { + String jobLabelKeyValue = "key:Value"; + BigQueryUtil.validateJobLabelKeyValue(jobLabelKeyValue, collector, "test"); + Assert.assertEquals(1, collector.getValidationFailures().size()); + Assert.assertEquals("Invalid job label value 'Value'.", + collector.getValidationFailures().get(0).getMessage()); + } + } diff --git a/widgets/BigQueryExecute-action.json b/widgets/BigQueryExecute-action.json index 9e8942e813..ffba65ca53 100644 --- a/widgets/BigQueryExecute-action.json +++ b/widgets/BigQueryExecute-action.json @@ -28,6 +28,17 @@ { "label": "Advanced", "properties": [ + { + "name": "jobLabels", + "label": "BQ Job Labels", + "widget-type": "keyvalue", + "widget-attributes": { + "delimiter": ",", + "kv-delimiter": ":", + "key-placeholder": "Label key", + "value-placeholder": "Label value" + } + }, { "name": "dialect", "label": "Dialect", diff --git a/widgets/BigQueryMultiTable-batchsink.json b/widgets/BigQueryMultiTable-batchsink.json index d0f3c5f1a1..be6e3f0c41 100644 --- a/widgets/BigQueryMultiTable-batchsink.json +++ b/widgets/BigQueryMultiTable-batchsink.json @@ -131,7 +131,7 
@@ "label": "Advanced", "properties": [ { - "name": "jobLabel", + "name": "jobLabels", "label": "BQ Job Labels", "widget-type": "keyvalue", "widget-attributes": { From a57e9f99b6a15f9f277834df3514e4608f2fd0a7 Mon Sep 17 00:00:00 2001 From: psainics Date: Fri, 27 Oct 2023 19:56:41 +0530 Subject: [PATCH 04/10] Added BQ Execute Retry Added BQ Retry --- pom.xml | 6 + .../gcp/bigquery/action/BigQueryExecute.java | 254 ++++++++++++++++-- .../BigQueryJobExecutionException.java | 38 +++ .../bigquery/action/BigQueryExecuteTest.java | 210 +++++++++++++++ widgets/BigQueryExecute-action.json | 52 ++++ 5 files changed, 545 insertions(+), 15 deletions(-) create mode 100644 src/main/java/io/cdap/plugin/gcp/bigquery/exception/BigQueryJobExecutionException.java create mode 100644 src/test/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecuteTest.java diff --git a/pom.xml b/pom.xml index 0096ddcd84..fc710f9c03 100644 --- a/pom.xml +++ b/pom.xml @@ -98,6 +98,7 @@ 1.7.5 3.3.2 0.23.1 + 3.3.2 ${project.basedir}/src/test/java/ @@ -840,6 +841,11 @@ + + dev.failsafe + failsafe + ${failsafe.version} + diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java b/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java index 87d778a89e..fbb96c1f16 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java @@ -33,8 +33,13 @@ import com.google.cloud.bigquery.TableId; import com.google.cloud.bigquery.TableResult; import com.google.cloud.kms.v1.CryptoKeyName; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import dev.failsafe.Failsafe; +import dev.failsafe.FailsafeException; +import dev.failsafe.RetryPolicy; import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Macro; import 
io.cdap.cdap.api.annotation.Name; @@ -43,6 +48,7 @@ import io.cdap.cdap.etl.api.action.Action; import io.cdap.cdap.etl.api.action.ActionContext; import io.cdap.cdap.etl.common.Constants; +import io.cdap.plugin.gcp.bigquery.exception.BigQueryJobExecutionException; import io.cdap.plugin.gcp.bigquery.sink.BigQuerySinkUtils; import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil; import io.cdap.plugin.gcp.common.CmekUtils; @@ -51,8 +57,10 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.time.Duration; import java.util.Collections; import java.util.Map; +import java.util.Set; import javax.annotation.Nullable; /** @@ -69,8 +77,18 @@ public final class BigQueryExecute extends AbstractBigQueryAction { private static final Logger LOG = LoggerFactory.getLogger(BigQueryExecute.class); public static final String NAME = "BigQueryExecute"; private static final String RECORDS_PROCESSED = "records.processed"; - private Config config; + private static final String JOB_BACKEND_ERROR = "jobBackendError"; + private static final String JOB_INTERNAL_ERROR = "jobInternalError"; + private static final Set RETRY_ON_REASON = ImmutableSet.of(JOB_BACKEND_ERROR, JOB_INTERNAL_ERROR); + + BigQueryExecute() { + // no args constructor + } + @VisibleForTesting + BigQueryExecute(Config config) { + this.config = config; + } @Override public void run(ActionContext context) throws Exception { @@ -103,9 +121,6 @@ public void run(ActionContext context) throws Exception { // Enable legacy SQL builder.setUseLegacySql(config.isLegacySQL()); - // Location must match that of the dataset(s) referenced in the query. - JobId jobId = JobId.newBuilder().setRandomJob().setLocation(config.getLocation()).build(); - // API request - starts the query. Credentials credentials = config.getServiceAccount() == null ? 
null : GCPUtils.loadServiceAccountCredentials(config.getServiceAccount(), @@ -129,19 +144,74 @@ public void run(ActionContext context) throws Exception { QueryJobConfiguration queryConfig = builder.build(); - Job queryJob = bigQuery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build()); + // Exponential backoff + if (config.getRetryOnBackendError()) { + try { + executeQueryWithExponentialBackoff(bigQuery, queryConfig, context); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } else { + executeQuery(bigQuery, queryConfig, context); + } + } + + protected void executeQueryWithExponentialBackoff(BigQuery bigQuery, + QueryJobConfiguration queryConfig, ActionContext context) + throws Throwable { + try { + Failsafe.with(getRetryPolicy()).run(() -> executeQuery(bigQuery, queryConfig, context)); + } catch (FailsafeException e) { + if (e.getCause() != null) { + throw e.getCause(); + } + throw e; + } + } + + private RetryPolicy getRetryPolicy() { + return RetryPolicy.builder() + .handle(BigQueryJobExecutionException.class) + .withBackoff(Duration.ofSeconds(config.getInitialRetryDuration()), + Duration.ofSeconds(config.getMaxRetryDuration()), config.getRetryMultiplier()) + .withMaxRetries(config.getMaxRetryCount()) + .onRetry(event -> LOG.debug("Retrying BigQuery Execute job. Retry count: {}", event.getAttemptCount())) + .onSuccess(event -> LOG.debug("BigQuery Execute job executed successfully.")) + .onRetriesExceeded(event -> LOG.error("Retry limit reached for BigQuery Execute job.")) + .build(); + } + + private void executeQuery(BigQuery bigQuery, QueryJobConfiguration queryConfig, ActionContext context) + throws InterruptedException, BigQueryJobExecutionException { + // Location must match that of the dataset(s) referenced in the query. 
+ JobId jobId = JobId.newBuilder().setRandomJob().setLocation(config.getLocation()).build(); + Job queryJob; + + try { + queryJob = bigQuery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build()); - LOG.info("Executing SQL as job {}.", jobId.getJob()); - LOG.debug("The BigQuery SQL is {}", config.getSql()); + LOG.info("Executing SQL as job {}.", jobId.getJob()); + LOG.debug("The BigQuery SQL is {}", config.getSql()); - // Wait for the query to complete - queryJob = queryJob.waitFor(); + // Wait for the query to complete + queryJob = queryJob.waitFor(); + } catch (BigQueryException e) { + LOG.error("The query job {} failed. Error: {}", jobId.getJob(), e.getError().getMessage()); + if (RETRY_ON_REASON.contains(e.getError().getReason())) { + throw new BigQueryJobExecutionException(e.getError().getMessage(), e); + } + throw new RuntimeException(e); + } // Check for errors if (queryJob.getStatus().getError() != null) { // You can also look at queryJob.getStatus().getExecutionErrors() for all // errors, not just the latest one. - throw new RuntimeException(queryJob.getStatus().getExecutionErrors().toString()); + LOG.error("The query job {} failed. 
Error: {}", jobId.getJob(), queryJob.getStatus().getError()); + if (RETRY_ON_REASON.contains(queryJob.getStatus().getError().getReason())) { + throw new BigQueryJobExecutionException(queryJob.getStatus().getError().getMessage()); + } + throw new RuntimeException(queryJob.getStatus().getError().getMessage()); } TableResult queryResults = queryJob.getQueryResults(); @@ -181,14 +251,14 @@ public void run(ActionContext context) throws Exception { private void recordBytesProcessedMetric(ActionContext context, Job queryJob) { long processedBytes = - ((JobStatistics.QueryStatistics) queryJob.getStatistics()).getTotalBytesProcessed(); + ((JobStatistics.QueryStatistics) queryJob.getStatistics()).getTotalBytesProcessed(); LOG.info("Job {} processed {} bytes", queryJob.getJobId(), processedBytes); Map tags = new ImmutableMap.Builder() .put(Constants.Metrics.Tag.APP_ENTITY_TYPE, Action.PLUGIN_TYPE) .put(Constants.Metrics.Tag.APP_ENTITY_TYPE_NAME, BigQueryExecute.NAME) .build(); context.getMetrics().child(tags).countLong(BigQuerySinkUtils.BYTES_PROCESSED_METRIC, - processedBytes); + processedBytes); } @Override @@ -208,6 +278,16 @@ public static final class Config extends AbstractBigQueryActionConfig { public static final String NAME_BQ_JOB_LABELS = "jobLabels"; private static final int ERROR_CODE_NOT_FOUND = 404; private static final String STORE_RESULTS = "storeResults"; + private static final String NAME_RETRY_ON_BACKEND_ERROR = "retryOnBackendError"; + private static final String NAME_INITIAL_RETRY_DURATION = "initialRetryDuration"; + private static final String NAME_MAX_RETRY_DURATION = "maxRetryDuration"; + private static final String NAME_RETRY_MULTIPLIER = "retryMultiplier"; + private static final String NAME_MAX_RETRY_COUNT = "maxRetryCount"; + public static final long DEFAULT_INITIAL_RETRY_DURATION_SECONDS = 1L; + public static final double DEFAULT_RETRY_MULTIPLIER = 2.0; + public static final int DEFAULT_MAX_RETRY_COUNT = 5; + // Sn = a * (r^n - 1) / (r - 1) + public 
static final long DEFULT_MAX_RETRY_DURATION_SECONDS = 63L; @Description("Dialect of the SQL command. The value must be 'legacy' or 'standard'. " + "If set to 'standard', the query will use BigQuery's standard SQL: " + @@ -268,6 +348,36 @@ public static final class Config extends AbstractBigQueryActionConfig { @Macro private String rowAsArguments; + @Name(NAME_RETRY_ON_BACKEND_ERROR) + @Description("Whether to retry on backend error. Default is true.") + @Macro + @Nullable + private Boolean retryOnBackendError; + + @Name(NAME_INITIAL_RETRY_DURATION) + @Description("Time taken for the first retry. Default is 1 second.") + @Nullable + @Macro + private Long initialRetryDuration; + + @Name(NAME_MAX_RETRY_DURATION) + @Description("Maximum time in seconds retries can take. Default is 63 seconds.") + @Nullable + @Macro + private Long maxRetryDuration; + + @Name(NAME_MAX_RETRY_COUNT) + @Description("Maximum number of retries allowed. Default is 5.") + @Nullable + @Macro + private Integer maxRetryCount; + + @Name(NAME_RETRY_MULTIPLIER) + @Description("Multiplier for exponential backoff. 
Default is 2.") + @Nullable + @Macro + private Double retryMultiplier; + @Name(STORE_RESULTS) @Nullable @Description("Whether to store results in a BigQuery Table.") @@ -283,7 +393,10 @@ public static final class Config extends AbstractBigQueryActionConfig { private Config(@Nullable String project, @Nullable String serviceAccountType, @Nullable String serviceFilePath, @Nullable String serviceAccountJson, @Nullable String dataset, @Nullable String table, @Nullable String location, @Nullable String cmekKey, @Nullable String dialect, @Nullable String sql, - @Nullable String mode, @Nullable Boolean storeResults, @Nullable String jobLabelKeyValue) { + @Nullable String mode, @Nullable Boolean storeResults, @Nullable String jobLabelKeyValue, + @Nullable String rowAsArguments, @Nullable Boolean retryOnBackendError, + @Nullable Long initialRetryDuration, @Nullable Long maxRetryDuration, + @Nullable Double retryMultiplier, @Nullable Integer maxRetryCount) { this.project = project; this.serviceAccountType = serviceAccountType; this.serviceFilePath = serviceFilePath; @@ -295,8 +408,14 @@ private Config(@Nullable String project, @Nullable String serviceAccountType, @N this.dialect = dialect; this.sql = sql; this.mode = mode; + this.rowAsArguments = rowAsArguments; this.storeResults = storeResults; this.jobLabelKeyValue = jobLabelKeyValue; + this.retryOnBackendError = retryOnBackendError; + this.initialRetryDuration = initialRetryDuration; + this.maxRetryDuration = maxRetryDuration; + this.maxRetryCount = maxRetryCount; + this.retryMultiplier = retryMultiplier; } public boolean isLegacySQL() { @@ -342,6 +461,26 @@ public String getJobLabelKeyValue() { return jobLabelKeyValue; } + public boolean getRetryOnBackendError() { + return retryOnBackendError == null || retryOnBackendError; + } + + public long getInitialRetryDuration() { + return initialRetryDuration == null ? 
DEFAULT_INITIAL_RETRY_DURATION_SECONDS : initialRetryDuration; + } + + public long getMaxRetryDuration() { + return maxRetryDuration == null ? DEFULT_MAX_RETRY_DURATION_SECONDS : maxRetryDuration; + } + + public double getRetryMultiplier() { + return retryMultiplier == null ? DEFAULT_RETRY_MULTIPLIER : retryMultiplier; + } + + public int getMaxRetryCount() { + return maxRetryCount == null ? DEFAULT_MAX_RETRY_COUNT : maxRetryCount; + } + @Override public void validate(FailureCollector failureCollector) { validate(failureCollector, Collections.emptyMap()); @@ -399,6 +538,45 @@ public void validate(FailureCollector failureCollector, Map argu void validateJobLabelKeyValue(FailureCollector failureCollector) { BigQueryUtil.validateJobLabelKeyValue(jobLabelKeyValue, failureCollector, NAME_BQ_JOB_LABELS); + // Verify retry configuration when retry on backend error is enabled and none of the retry configuration + // properties are macros. + if (!containsMacro(NAME_RETRY_ON_BACKEND_ERROR) && retryOnBackendError != null && retryOnBackendError && + !containsMacro(NAME_INITIAL_RETRY_DURATION) && !containsMacro(NAME_MAX_RETRY_DURATION) && + !containsMacro(NAME_MAX_RETRY_COUNT) && !containsMacro(NAME_RETRY_MULTIPLIER)) { + validateRetryConfiguration( + failureCollector, initialRetryDuration, maxRetryDuration, maxRetryCount, retryMultiplier + ); + } + failureCollector.getOrThrowException(); + } + + void validateRetryConfiguration(FailureCollector failureCollector, Long initialRetryDuration, + Long maxRetryDuration, Integer maxRetryCount, Double retryMultiplier) { + if (initialRetryDuration != null && initialRetryDuration <= 0) { + failureCollector.addFailure("Initial retry duration must be greater than 0.", + "Please specify a valid initial retry duration.") + .withConfigProperty(NAME_INITIAL_RETRY_DURATION); + } + if (maxRetryDuration != null && maxRetryDuration <= 0) { + failureCollector.addFailure("Max retry duration must be greater than 0.", + "Please specify a valid max retry 
duration.") + .withConfigProperty(NAME_MAX_RETRY_DURATION); + } + if (maxRetryCount != null && maxRetryCount <= 0) { + failureCollector.addFailure("Max retry count must be greater than 0.", + "Please specify a valid max retry count.") + .withConfigProperty(NAME_MAX_RETRY_COUNT); + } + if (retryMultiplier != null && retryMultiplier <= 1) { + failureCollector.addFailure("Retry multiplier must be strictly greater than 1.", + "Please specify a valid retry multiplier.") + .withConfigProperty(NAME_RETRY_MULTIPLIER); + } + if (maxRetryDuration != null && initialRetryDuration != null && maxRetryDuration <= initialRetryDuration) { + failureCollector.addFailure("Max retry duration must be greater than initial retry duration.", + "Please specify a valid max retry duration.") + .withConfigProperty(NAME_MAX_RETRY_DURATION); + } } void validateCmekKey(FailureCollector failureCollector, Map arguments) { @@ -470,8 +648,14 @@ public static class Builder { private String dialect; private String sql; private String mode; + private String rowAsArguments; private Boolean storeResults; private String jobLabelKeyValue; + private Boolean retryOnBackendError; + private Long initialRetryDuration; + private Long maxRetryDuration; + private Integer maxRetryCount; + private Double retryMultiplier; public Builder setProject(@Nullable String project) { this.project = project; @@ -523,6 +707,11 @@ public Builder setMode(@Nullable String mode) { return this; } + public Builder setRowAsArguments(@Nullable String rowAsArguments) { + this.rowAsArguments = rowAsArguments; + return this; + } + public Builder setSql(@Nullable String sql) { this.sql = sql; return this; @@ -533,6 +722,36 @@ public Builder setJobLabelKeyValue(@Nullable String jobLabelKeyValue) { return this; } + public Builder setRetryOnBackendError(@Nullable Boolean retryOnBackendError) { + this.retryOnBackendError = retryOnBackendError; + return this; + } + + public Builder setStoreResults(@Nullable Boolean storeResults) { + 
this.storeResults = storeResults; + return this; + } + + public Builder setInitialRetryDuration(@Nullable Long initialRetryDuration) { + this.initialRetryDuration = initialRetryDuration; + return this; + } + + public Builder setMaxRetryDuration(@Nullable Long maxRetryDuration) { + this.maxRetryDuration = maxRetryDuration; + return this; + } + + public Builder setMaxRetryCount(@Nullable Integer maxRetryCount) { + this.maxRetryCount = maxRetryCount; + return this; + } + + public Builder setRetryMultiplier(@Nullable Double retryMultiplier) { + this.retryMultiplier = retryMultiplier; + return this; + } + public Config build() { return new Config( project, @@ -547,10 +766,15 @@ public Config build() { sql, mode, storeResults, - jobLabelKeyValue + jobLabelKeyValue, + rowAsArguments, + retryOnBackendError, + initialRetryDuration, + maxRetryDuration, + retryMultiplier, + maxRetryCount ); } - } } } diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/exception/BigQueryJobExecutionException.java b/src/main/java/io/cdap/plugin/gcp/bigquery/exception/BigQueryJobExecutionException.java new file mode 100644 index 0000000000..26861535cf --- /dev/null +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/exception/BigQueryJobExecutionException.java @@ -0,0 +1,38 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package io.cdap.plugin.gcp.bigquery.exception; + +/** + * Custom exception class for handling errors related to BigQuery job execution. + * This exception should be thrown when an issue occurs during the execution of a BigQuery job, + * and the calling code should consider retrying the operation. + */ +public class BigQueryJobExecutionException extends Exception { + /** + * Constructs a new BigQueryJobExecutionException with the specified detail message. + * + * @param message The detail message that describes the exception. + */ + public BigQueryJobExecutionException(String message) { + super(message); + } + + public BigQueryJobExecutionException(String message, Throwable cause) { + super(message, cause); + } +} + diff --git a/src/test/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecuteTest.java b/src/test/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecuteTest.java new file mode 100644 index 0000000000..42bce3590d --- /dev/null +++ b/src/test/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecuteTest.java @@ -0,0 +1,210 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package io.cdap.plugin.gcp.bigquery.action; + +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryError; +import com.google.cloud.bigquery.Job; +import com.google.cloud.bigquery.JobId; +import com.google.cloud.bigquery.JobInfo; +import com.google.cloud.bigquery.JobStatistics; +import com.google.cloud.bigquery.JobStatus; +import com.google.cloud.bigquery.QueryJobConfiguration; +import com.google.cloud.bigquery.TableResult; +import io.cdap.cdap.api.metrics.Metrics; +import io.cdap.cdap.etl.api.StageMetrics; +import io.cdap.cdap.etl.api.action.ActionContext; + +import io.cdap.cdap.etl.mock.validation.MockFailureCollector; +import io.cdap.plugin.gcp.bigquery.exception.BigQueryJobExecutionException; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +public class BigQueryExecuteTest { + @Mock + BigQuery bigQuery; + @Mock + Job queryJob; + @Mock + JobStatus jobStatus; + @Mock + BigQueryError bigQueryError; + @Mock + TableResult queryResults; + @Mock + JobStatistics.QueryStatistics queryStatistics; + @Mock + ActionContext context; + @Mock + StageMetrics stageMetrics; + @Mock + Metrics metrics; + QueryJobConfiguration queryJobConfiguration; + BigQueryExecute.Config config; + JobInfo jobInfo; + JobId jobId; + BigQueryExecute bq; + MockFailureCollector failureCollector; + // Mock error message that will be returned by BigQuery when job fails to execute + String mockErrorMessageNoRetry = "Job execution failed with error: $error"; + String errorMessageRetryExhausted = "Failed to execute BigQuery job. 
Reason: Retries exhausted."; + + @Before + public void setUp() throws InterruptedException, NoSuchMethodException { + MockitoAnnotations.initMocks(this); + failureCollector = new MockFailureCollector(); + queryJobConfiguration = QueryJobConfiguration.newBuilder("select * from test").build(); + config = BigQueryExecute.Config.builder() + .setLocation("US").setProject("testProject").setRowAsArguments("false") + .setInitialRetryDuration(1L).setMaxRetryDuration(5L) + .setMaxRetryCount(1).setRetryMultiplier(2.0).build(); + jobId = JobId.newBuilder().setRandomJob().setLocation(config.getLocation()).build(); + jobInfo = JobInfo.newBuilder(queryJobConfiguration).setJobId(jobId).build(); + bq = new BigQueryExecute(config); + + // Mock Job Creation + Mockito.when(bigQuery.create((JobInfo) Mockito.any())).thenReturn(queryJob); + Mockito.when(queryJob.waitFor()).thenReturn(queryJob); + Mockito.when(queryJob.getStatus()).thenReturn(jobStatus); + Mockito.when(jobStatus.getError()).thenReturn(bigQueryError); + Mockito.when(bigQueryError.getMessage()).thenReturn(mockErrorMessageNoRetry); + + // Mock Successful Query + Mockito.when(queryJob.getQueryResults()).thenReturn(queryResults); + Mockito.when(queryResults.getTotalRows()).thenReturn(1L); + Mockito.when(queryJob.getStatistics()).thenReturn(queryStatistics); + Mockito.when(queryStatistics.getTotalBytesProcessed()).thenReturn(1L); + + // Mock context + Mockito.when(context.getMetrics()).thenReturn(stageMetrics); + Mockito.doNothing().when(stageMetrics).gauge(Mockito.anyString(), Mockito.anyLong()); + Mockito.when(stageMetrics.child(Mockito.any())).thenReturn(metrics); + Mockito.doNothing().when(metrics).countLong(Mockito.anyString(), Mockito.anyLong()); + + } + + @Test + public void testExecuteQueryWithExponentialBackoffFailsWithNonRetryError() { + Mockito.when(bigQueryError.getReason()).thenReturn("accessDenied"); + Exception exception = Assert.assertThrows(java.lang.RuntimeException.class, () -> { + 
bq.executeQueryWithExponentialBackoff(bigQuery, queryJobConfiguration, context); + }); + String actualMessage = exception.getMessage(); + Assert.assertEquals(mockErrorMessageNoRetry, actualMessage); + } + @Test + public void testExecuteQueryWithExponentialBackoffFailsRetryError() { + Mockito.when(bigQueryError.getReason()).thenReturn("jobBackendError"); + Mockito.when(bigQueryError.getMessage()).thenReturn(errorMessageRetryExhausted); + Exception exception = Assert.assertThrows(BigQueryJobExecutionException.class, () -> { + bq.executeQueryWithExponentialBackoff(bigQuery, queryJobConfiguration, context); + }); + String actualMessage = exception.getMessage(); + Assert.assertEquals(errorMessageRetryExhausted, actualMessage); + } + + @Test + public void testExecuteQueryWithExponentialBackoffSuccess() + throws Throwable { + Mockito.when(jobStatus.getError()).thenReturn(null); + Mockito.when(queryJob.getQueryResults()).thenReturn(queryResults); + bq.executeQueryWithExponentialBackoff(bigQuery, queryJobConfiguration, context); + } + + @Test + public void testValidateRetryConfigurationWithDefaultValues() { + config.validateRetryConfiguration(failureCollector, + BigQueryExecute.Config.DEFAULT_INITIAL_RETRY_DURATION_SECONDS, + BigQueryExecute.Config.DEFULT_MAX_RETRY_DURATION_SECONDS, + BigQueryExecute.Config.DEFAULT_MAX_RETRY_COUNT, + BigQueryExecute.Config.DEFAULT_RETRY_MULTIPLIER); + Assert.assertEquals(0, failureCollector.getValidationFailures().size()); + } + + @Test + public void testValidateRetryConfigurationWithInvalidInitialRetryDuration() { + config.validateRetryConfiguration(failureCollector, -1L, + BigQueryExecute.Config.DEFULT_MAX_RETRY_DURATION_SECONDS, + BigQueryExecute.Config.DEFAULT_MAX_RETRY_COUNT, + BigQueryExecute.Config.DEFAULT_RETRY_MULTIPLIER); + Assert.assertEquals(1, failureCollector.getValidationFailures().size()); + Assert.assertEquals("Initial retry duration must be greater than 0.", + failureCollector.getValidationFailures().get(0).getMessage()); 
+ } + + @Test + public void testValidateRetryConfigurationWithInvalidMaxRetryDuration() { + config.validateRetryConfiguration(failureCollector, + BigQueryExecute.Config.DEFAULT_INITIAL_RETRY_DURATION_SECONDS, -1L, + BigQueryExecute.Config.DEFAULT_MAX_RETRY_COUNT, + BigQueryExecute.Config.DEFAULT_RETRY_MULTIPLIER); + Assert.assertEquals(2, failureCollector.getValidationFailures().size()); + Assert.assertEquals("Max retry duration must be greater than 0.", + failureCollector.getValidationFailures().get(0).getMessage()); + Assert.assertEquals("Max retry duration must be greater than initial retry duration.", + failureCollector.getValidationFailures().get(1).getMessage()); + } + + @Test + public void testValidateRetryConfigurationWithInvalidRetryMultiplier() { + config.validateRetryConfiguration(failureCollector, + BigQueryExecute.Config.DEFAULT_INITIAL_RETRY_DURATION_SECONDS, + BigQueryExecute.Config.DEFULT_MAX_RETRY_DURATION_SECONDS, + BigQueryExecute.Config.DEFAULT_MAX_RETRY_COUNT, -1.0); + Assert.assertEquals(1, failureCollector.getValidationFailures().size()); + Assert.assertEquals("Retry multiplier must be strictly greater than 1.", + failureCollector.getValidationFailures().get(0).getMessage()); + } + + @Test + public void testValidateRetryConfigurationWithInvalidRetryMultiplierAndMaxRetryCount() { + config.validateRetryConfiguration(failureCollector, + BigQueryExecute.Config.DEFAULT_INITIAL_RETRY_DURATION_SECONDS, + BigQueryExecute.Config.DEFULT_MAX_RETRY_DURATION_SECONDS, -1, + BigQueryExecute.Config.DEFAULT_RETRY_MULTIPLIER); + Assert.assertEquals(1, failureCollector.getValidationFailures().size()); + Assert.assertEquals("Max retry count must be greater than 0.", + failureCollector.getValidationFailures().get(0).getMessage()); + } + + @Test + public void testValidateRetryConfigurationWithMultiplierOne() { + config.validateRetryConfiguration(failureCollector, + BigQueryExecute.Config.DEFAULT_INITIAL_RETRY_DURATION_SECONDS, + 
BigQueryExecute.Config.DEFULT_MAX_RETRY_DURATION_SECONDS, + BigQueryExecute.Config.DEFAULT_MAX_RETRY_COUNT, 1.0); + Assert.assertEquals(1, failureCollector.getValidationFailures().size()); + Assert.assertEquals("Retry multiplier must be strictly greater than 1.", + failureCollector.getValidationFailures().get(0).getMessage()); + } + + @Test + public void testValidateRetryConfigurationWithMaxRetryLessThanInitialRetry() { + config.validateRetryConfiguration(failureCollector, 10L, 5L, + BigQueryExecute.Config.DEFAULT_MAX_RETRY_COUNT, + BigQueryExecute.Config.DEFAULT_RETRY_MULTIPLIER); + Assert.assertEquals(1, failureCollector.getValidationFailures().size()); + Assert.assertEquals("Max retry duration must be greater than initial retry duration.", + failureCollector.getValidationFailures().get(0).getMessage()); + } + +} + diff --git a/widgets/BigQueryExecute-action.json b/widgets/BigQueryExecute-action.json index ffba65ca53..67dccbf8a1 100644 --- a/widgets/BigQueryExecute-action.json +++ b/widgets/BigQueryExecute-action.json @@ -202,6 +202,58 @@ "widget-type": "textbox", "label": "Service Account JSON", "name": "serviceAccountJSON" + }, + { + "widget-type": "hidden", + "label": "Retry On Backend Error", + "name": "retryOnBackendError", + "widget-attributes": { + "on": { + "value": "true", + "label": "YES" + }, + "off": { + "value": "false", + "label": "NO" + }, + "default": "true" + } + }, + { + "widget-type": "hidden", + "label": "Initial Retry Duration (Seconds)", + "name": "initialRetryDuration", + "widget-attributes": { + "default": "1", + "minimum": "1" + } + }, + { + "widget-type": "hidden", + "label": "Max Retry Duration (Seconds)", + "name": "maxRetryDuration", + "widget-attributes": { + "default": "32", + "minimum": "1" + } + }, + { + "widget-type": "hidden", + "label": "Max Retry Count", + "name": "maxRetryCount", + "widget-attributes": { + "default": "5", + "minimum": "1" + } + }, + { + "widget-type": "hidden", + "label": "Retry Multiplier", + "name": 
"retryMultiplier", + "widget-attributes": { + "default": "2", + "placeholder": "The multiplier to use on retry attempts." + } } ] } From 79e431688869163dfd8197e1fcf752736ddfe558 Mon Sep 17 00:00:00 2001 From: priyabhatnagar Date: Fri, 15 Dec 2023 14:57:21 +0530 Subject: [PATCH 05/10] pubsub additional scenarios --- .../PubSubToPubSub.feature | 29 +++++++++++++++++++ .../common/stepsdesign/TestSetupHooks.java | 29 +------------------ 2 files changed, 30 insertions(+), 28 deletions(-) create mode 100644 src/e2e-test/features/pubsub/pubsubadditionalfeature/PubSubToPubSub.feature diff --git a/src/e2e-test/features/pubsub/pubsubadditionalfeature/PubSubToPubSub.feature b/src/e2e-test/features/pubsub/pubsubadditionalfeature/PubSubToPubSub.feature new file mode 100644 index 0000000000..f1303b5788 --- /dev/null +++ b/src/e2e-test/features/pubsub/pubsubadditionalfeature/PubSubToPubSub.feature @@ -0,0 +1,29 @@ +# Copyright © 2023 Cask Data, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +@PubSub_Source +Feature: PubSub Source - Verification of PubSub to PubSub successful data transfer in different formats. + + @PUBSUB_SOURCE_TEST @PUBSUB_SUBSCRIPTION_TEST + Scenario: Validate successful transfer of records from PubSub(source) to PubSub(sink) having parquet format in both source and sink plugins. 
+ Given Open Datafusion Project to configure pipeline + When Select data pipeline type as: "Data Pipeline - Realtime" + When Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "Pub/Sub" from the plugins list as: "Source" + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "Pub/Sub" from the plugins list as: "Sink" + Then Enter input plugin property: "referenceName" with value: "PubsubReferenceName" + And Replace input plugin property: "project" with value: "projectId" + Then Enter input plugin property: "subscription" with value: "pubSubSourceSubscription" + Then Enter input plugin property: "topic" with value: "pubSubSourceTopic" diff --git a/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java b/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java index 0b564f587f..665b888c69 100644 --- a/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java +++ b/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java @@ -488,34 +488,7 @@ public static void createTargetPubSubTopic() { BeforeActions.scenario.write("Target PubSub topic " + pubSubTargetTopic); } - @Before(order = 1, value = "@PUBSUB_SOURCE_TEST") - public static void createSourcePubSubTopic() throws IOException { - pubSubSourceTopic = "cdf-e2e-test-" + UUID.randomUUID(); - PubSubClient.createTopic(pubSubSourceTopic); - BeforeActions.scenario.write("Source PubSub topic " + pubSubSourceTopic); - } - - @Before(order = 1, value = "@PUBSUB_SUBSCRIPTION_TEST") - public static void createSubscriptionPubSubTopic() { - pubSubSourceSubscription = "cdf-e2e-test-" + UUID.randomUUID(); - BeforeActions.scenario.write("Source PubSub subscription " + pubSubSourceSubscription); - } - @After(order = 1, value = "") - public static void deleteSourcePubSubTopic() { - try { - PubSubClient.deleteTopic(pubSubSourceTopic); - BeforeActions.scenario.write("Deleted Source PubSub topic " + pubSubSourceTopic); - 
pubSubSourceTopic = StringUtils.EMPTY; - } catch (Exception e) { - if (e.getMessage().contains("Invalid resource name given") || e.getMessage().contains("Resource not found")) { - BeforeActions.scenario.write("Source PubSub topic " + pubSubSourceTopic + " does not exist."); - } else { - Assert.fail(e.getMessage()); - } - } - } - - @After(order = 1, value = "") + @After(order = 1, value = "@PUBSUB_SINK_TEST") public static void deleteTargetPubSubTopic() { try { PubSubClient.deleteTopic(pubSubTargetTopic); From 9506fba2a95d8624050957b43504b87671ce5c39 Mon Sep 17 00:00:00 2001 From: priyabhatnagar Date: Thu, 4 Jan 2024 15:42:15 +0530 Subject: [PATCH 06/10] pubsub scenarios --- .../features/pubsub/sink/BQToPubSub.feature | 18 +++++------ .../pubsub/locators/PubSubLocators.java | 15 ++++++++++ .../plugin/pubsub/stepsdesign/PubSubSink.java | 30 +++++++++++++++++-- .../pubsub/stepsdesign/PubSubSource.java | 8 +++-- .../io/cdap/plugin/utils/PubSubClient.java | 25 ++++++---------- 5 files changed, 65 insertions(+), 31 deletions(-) diff --git a/src/e2e-test/features/pubsub/sink/BQToPubSub.feature b/src/e2e-test/features/pubsub/sink/BQToPubSub.feature index adb73090c1..3fdab2d644 100644 --- a/src/e2e-test/features/pubsub/sink/BQToPubSub.feature +++ b/src/e2e-test/features/pubsub/sink/BQToPubSub.feature @@ -61,20 +61,18 @@ Feature: PubSub-Sink - Verification of BigQuery to PubSub successful data transf Then Enter PubSub property projectId "projectId" Then Override Service account details if set in environment variables Then Enter PubSub property reference name - Then Enter PubSub source property topic name + Then Enter PubSub sink property topic name Then Validate "Pub/Sub" plugin properties And Close the Plugin Properties page - + And Click on configure button + And Click on pipeline config + And Click on batch time and select format Then Save the pipeline - Then Preview and run the pipeline - Then Wait till pipeline preview is in running state - Then Open and capture 
pipeline preview logs - Then Close the pipeline logs - Then Close the preview Then Deploy the pipeline Then Run the Pipeline in Runtime - # Then Wait till pipeline is in running state - #Then Open and capture logs - #Then Verify the pipeline status is "Succeeded" + Then Wait for pipeline to be in status: "Running" with a timeout of 240 seconds Then Publish the message Then Subscribe to the messages + Then Validate OUT record count is equal to IN record count + And Stop the pipeline + Then Verify the pipeline status is "Stopped" diff --git a/src/e2e-test/java/io/cdap/plugin/pubsub/locators/PubSubLocators.java b/src/e2e-test/java/io/cdap/plugin/pubsub/locators/PubSubLocators.java index 0019d5c3b1..a7589eb9db 100644 --- a/src/e2e-test/java/io/cdap/plugin/pubsub/locators/PubSubLocators.java +++ b/src/e2e-test/java/io/cdap/plugin/pubsub/locators/PubSubLocators.java @@ -74,4 +74,19 @@ public static WebElement selectedFormat(String format) { return SeleniumDriver.getDriver() .findElement(By.xpath("//*[@data-cy='select-format']/div[text()='" + format + "']")); } + + @FindBy(how = How.XPATH, using = "//span[contains(text(), \"Batch interval\")]//following-sibling::div//select[1]") + public static WebElement batchTime; + + @FindBy(how = How.XPATH, using = "//span[contains(text(), \"Batch interval\")]//following-sibling::div//select[2]") + public static WebElement timeSelect; + + @FindBy(how = How.XPATH, using = "//button[@data-testid='config-apply-close']") + public static WebElement saveButton; + + @FindBy(how = How.XPATH, using = "//*[@data-cy='pipeline-configure-modeless-btn']") + public static WebElement configButton; + + @FindBy(how = How.XPATH, using = "//*[@data-cy='tab-content-Pipeline config']") + public static WebElement pipelineConfig; } diff --git a/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSink.java b/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSink.java index 1cb687e2f0..17888b0fdf 100644 --- 
a/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSink.java +++ b/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSink.java @@ -27,13 +27,16 @@ import io.cdap.plugin.utils.E2EHelper; import io.cdap.plugin.utils.E2ETestConstants; import io.cdap.plugin.utils.PubSubClient; +import io.cucumber.java.en.And; import io.cucumber.java.en.Then; import io.cucumber.java.en.When; import org.apache.commons.lang3.StringUtils; import org.junit.Assert; +import org.openqa.selenium.support.ui.Select; import stepsdesign.BeforeActions; import java.io.IOException; +import java.util.concurrent.TimeUnit; /** * PubSub Sink Plugin related step design. @@ -234,10 +237,31 @@ public void enterRuntimeArgumentValueForPubSubSinkPropertyTopicKey(String runtim public void clickOnPreviewDataForPubSubSink() { openSinkPluginPreviewData("GooglePublisher"); } + @Then("Subscribe to the messages") - public void subscribeToTheMessages() - { - PubSubClient.subscribeAsyncExample(PluginPropertyUtils.pluginProp(ConstantsUtil.PROJECT_ID),TestSetupHooks.pubSubSourceSubscription); + public void subscribeToTheMessages() throws InterruptedException { + TimeUnit time = TimeUnit.SECONDS; + time.sleep(60); + PubSubClient.subscribeAsyncExample(PluginPropertyUtils.pluginProp(ConstantsUtil.PROJECT_ID), + TestSetupHooks.pubSubSourceSubscription); + } + + @And("Click on batch time and select format") + public void clickOnBatchTimeAndSelectFormat() { + Select select = new Select(PubSubLocators.batchTime); + select.selectByIndex(0); + Select selectformat = new Select(PubSubLocators.timeSelect); + selectformat.selectByIndex(1); + ElementHelper.clickOnElement(PubSubLocators.saveButton); } + @And("Click on configure button") + public void clickOnConfigureButton() { + ElementHelper.clickOnElement(PubSubLocators.configButton); + } + + @And("Click on pipeline config") + public void clickOnPipelineConfig() { + ElementHelper.clickOnElement(PubSubLocators.pipelineConfig); + } } diff --git 
a/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSource.java b/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSource.java index 29cb952538..97b8eef2a4 100644 --- a/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSource.java +++ b/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSource.java @@ -25,6 +25,7 @@ import io.cucumber.java.en.Then; import io.cucumber.java.en.When; import java.io.IOException; +import java.util.concurrent.TimeUnit; /** * PubSub Source Plugin related step design. @@ -52,6 +53,9 @@ public void enterPubSubSourcePropertySubscriptionName() { @Then("Publish the message") public void publishTheMessage() throws IOException, InterruptedException { - PubSubClient.publishWithErrorHandlerExample(PluginPropertyUtils.pluginProp(ConstantsUtil.PROJECT_ID), TestSetupHooks.pubSubSourceTopic); + TimeUnit time = TimeUnit.SECONDS; + time.sleep(120); + PubSubClient.publishWithErrorHandlerExample(PluginPropertyUtils.pluginProp(ConstantsUtil.PROJECT_ID), + TestSetupHooks.pubSubSourceTopic); } -} \ No newline at end of file +} diff --git a/src/e2e-test/java/io/cdap/plugin/utils/PubSubClient.java b/src/e2e-test/java/io/cdap/plugin/utils/PubSubClient.java index 94a379a97d..96f0d0acab 100644 --- a/src/e2e-test/java/io/cdap/plugin/utils/PubSubClient.java +++ b/src/e2e-test/java/io/cdap/plugin/utils/PubSubClient.java @@ -30,14 +30,14 @@ import com.google.cloud.pubsub.v1.TopicAdminClient; import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.ByteString; +import com.google.pubsub.v1.ProjectSubscriptionName; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.PushConfig; +import com.google.pubsub.v1.Subscription; import com.google.pubsub.v1.Topic; import com.google.pubsub.v1.TopicName; import io.cdap.e2e.utils.ConstantsUtil; import io.cdap.e2e.utils.PluginPropertyUtils; -import com.google.pubsub.v1.ProjectSubscriptionName; - import java.io.IOException; import java.util.Arrays; import 
java.util.List; @@ -62,7 +62,7 @@ public static void createSubscription(String subscriptionId, String topicId) thr try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create( SubscriptionAdminSettings.newBuilder().build())) { TopicName topicName = TopicName.of(PluginPropertyUtils.pluginProp(ConstantsUtil.PROJECT_ID), topicId); - subscriptionName = ProjectSubscriptionName.of(ConstantsUtil.PROJECT_ID, subscriptionId); + subscriptionName = ProjectSubscriptionName.of(PluginPropertyUtils.pluginProp(ConstantsUtil.PROJECT_ID), subscriptionId); subscriptionAdminClient.createSubscription(subscriptionName, topicName, PushConfig.getDefaultInstance(), 60); System.out.println("Subscription created: " + subscriptionName.toString()); } catch (AlreadyExistsException e) { @@ -94,14 +94,12 @@ public static String getTopicCmekKey(String topicId) throws IOException { public static void publishWithErrorHandlerExample(String projectId, String topicId) throws IOException, InterruptedException { - TopicName topicName = TopicName.of(projectId, topicId); + TopicName topicName = TopicName.of(PluginPropertyUtils.pluginProp(ConstantsUtil.PROJECT_ID), topicId); Publisher publisher = null; - try { publisher = Publisher.newBuilder(topicName).build(); List messages = Arrays.asList("first message", "second message"); - for (final String message : messages) { ByteString data = ByteString.copyFromUtf8(message); PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build(); @@ -116,9 +114,9 @@ public static void publishWithErrorHandlerExample(String projectId, String topic public void onFailure(Throwable throwable) { if (throwable instanceof ApiException) { ApiException apiException = ((ApiException) throwable); - // details on the API exception - // System.out.println(apiException.getStatusCode().getCode()); - //System.out.println(apiException.isRetryable()); + // details on the API exception + System.out.println(apiException.getStatusCode().getCode()); + 
System.out.println(apiException.isRetryable()); } System.out.println("Error publishing message : " + message); } @@ -141,9 +139,8 @@ public void onSuccess(String messageId) { } public static void subscribeAsyncExample(String projectId, String subscriptionId) { - ProjectSubscriptionName subscriptionName = - ProjectSubscriptionName.of(projectId, subscriptionId); - + ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(PluginPropertyUtils.pluginProp + (ConstantsUtil.PROJECT_ID), subscriptionId); // Instantiate an asynchronous message receiver. MessageReceiver receiver = (PubsubMessage message, AckReplyConsumer consumer) -> { @@ -166,8 +163,4 @@ public static void subscribeAsyncExample(String projectId, String subscriptionId subscriber.stopAsync(); } } - - - - } From 722acc411e3ca846c512cb3fa9032cf781d7789b Mon Sep 17 00:00:00 2001 From: itsankit-google Date: Thu, 4 Jan 2024 09:12:50 +0000 Subject: [PATCH 07/10] add scopes in sandbox mode --- src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java | 2 +- .../gcp/gcs/ServiceAccountAccessTokenProvider.java | 9 +++++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java b/src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java index 7351b628b3..e5d87fae83 100644 --- a/src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java +++ b/src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java @@ -66,7 +66,7 @@ public class GCPUtils { private static final Type SCOPES_TYPE = new TypeToken>() { }.getType(); private static final String SERVICE_ACCOUNT_TYPE = "cdap.auth.service.account.type"; private static final String SERVICE_ACCOUNT = "cdap.auth.service.account"; - private static final String SERVICE_ACCOUNT_SCOPES = "cdap.auth.service.account.scopes"; + public static final String SERVICE_ACCOUNT_SCOPES = "cdap.auth.service.account.scopes"; private static final String SERVICE_ACCOUNT_TYPE_FILE_PATH = "filePath"; // fs.gs prefix is for 
GoogleHadoopFileSystemBase.getCredential(), used by the GCS connector. private static final String GCS_PREFIX = "fs.gs"; diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/ServiceAccountAccessTokenProvider.java b/src/main/java/io/cdap/plugin/gcp/gcs/ServiceAccountAccessTokenProvider.java index 35b96d57a5..0a64c52dd9 100644 --- a/src/main/java/io/cdap/plugin/gcp/gcs/ServiceAccountAccessTokenProvider.java +++ b/src/main/java/io/cdap/plugin/gcp/gcs/ServiceAccountAccessTokenProvider.java @@ -18,13 +18,17 @@ package io.cdap.plugin.gcp.gcs; import com.google.auth.oauth2.GoogleCredentials; +import com.google.bigtable.repackaged.com.google.gson.Gson; import com.google.cloud.hadoop.util.AccessTokenProvider; +import com.google.cloud.hadoop.util.CredentialFactory; import io.cdap.plugin.gcp.common.GCPUtils; import org.apache.hadoop.conf.Configuration; import java.io.IOException; import java.time.Instant; import java.util.Date; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** * An AccessTokenProvider that uses the newer GoogleCredentials library to get the credentials. This is used instead @@ -34,6 +38,7 @@ public class ServiceAccountAccessTokenProvider implements AccessTokenProvider { private Configuration conf; private GoogleCredentials credentials; + private static final Gson GSON = new Gson(); @Override public AccessToken getAccessToken() { @@ -61,6 +66,10 @@ private GoogleCredentials getCredentials() throws IOException { // config to {@link ServiceAccountAccessTokenProvider} which causes NPE when // initializing {@link ForwardingBigQueryFileOutputCommitter because conf is null. conf = new Configuration(); + // Add scopes information which is lost when running in sandbox mode. 
+ conf.set(GCPUtils.SERVICE_ACCOUNT_SCOPES, GSON.toJson( + Stream.concat(CredentialFactory.DEFAULT_SCOPES.stream(), + GCPUtils.BIGQUERY_SCOPES.stream()).collect(Collectors.toList()))); } credentials = GCPUtils.loadCredentialsFromConf(conf); } From 7633582d2b1891c5f52d627ec912e2405c2cfe61 Mon Sep 17 00:00:00 2001 From: itsankit-google Date: Thu, 4 Jan 2024 10:09:22 +0000 Subject: [PATCH 08/10] Add cucumber report url to the workflow --- .github/workflows/e2e.yml | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml index 1db5015306..e10d21f28f 100644 --- a/.github/workflows/e2e.yml +++ b/.github/workflows/e2e.yml @@ -74,12 +74,6 @@ jobs: - name: Run all e2e tests if: github.event_name == 'workflow_dispatch' || github.event_name == 'push' || steps.filter.outputs.e2e-test == 'true' run: python3 e2e/src/main/scripts/run_e2e_test.py --testRunner **/${{ matrix.tests }}/**/TestRunner.java - - name: Upload report - uses: actions/upload-artifact@v3 - if: always() - with: - name: Cucumber report - ${{ matrix.tests }} - path: ./plugin/target/cucumber-reports - name: Upload debug files uses: actions/upload-artifact@v3 if: always() @@ -87,8 +81,11 @@ jobs: name: Debug files - ${{ matrix.tests }} path: ./**/target/e2e-debug - name: Upload files to GCS - uses: google-github-actions/upload-cloud-storage@v0 + uses: google-github-actions/upload-cloud-storage@v2 if: always() with: path: ./plugin/target/cucumber-reports - destination: e2e-tests-cucumber-reports/${{ github.event.repository.name }}/${{ github.ref }} + destination: e2e-tests-cucumber-reports/${{ github.event.repository.name }}/${{ github.ref }}/${{ matrix.tests }} + - name: Cucumber Report URL + if: always() + run: echo "https://storage.googleapis.com/e2e-tests-cucumber-reports/${{ github.event.repository.name }}/${{ github.ref }}/${{ matrix.tests }}/cucumber-reports/advanced-reports/cucumber-html-reports/overview-features.html" From 
d5c57ecdce91c5094e79bfc3fd958f80c5291e61 Mon Sep 17 00:00:00 2001 From: priyabhatnagar25 <112169140+priyabhatnagar25@users.noreply.github.com> Date: Mon, 8 Jan 2024 15:28:35 +0530 Subject: [PATCH 09/10] pubsub scenarios --- .../PubSubToPubSub.feature | 151 +++++++++++++++++- .../features/pubsub/sink/BQToPubSub.feature | 2 +- .../common/stepsdesign/TestSetupHooks.java | 26 +++ .../pubsub/stepsdesign/PubSubSource.java | 14 +- .../io/cdap/plugin/utils/PubSubClient.java | 5 +- 5 files changed, 187 insertions(+), 11 deletions(-) diff --git a/src/e2e-test/features/pubsub/pubsubadditionalfeature/PubSubToPubSub.feature b/src/e2e-test/features/pubsub/pubsubadditionalfeature/PubSubToPubSub.feature index f1303b5788..3940652e2e 100644 --- a/src/e2e-test/features/pubsub/pubsubadditionalfeature/PubSubToPubSub.feature +++ b/src/e2e-test/features/pubsub/pubsubadditionalfeature/PubSubToPubSub.feature @@ -15,15 +15,152 @@ @PubSub_Source Feature: PubSub Source - Verification of PubSub to PubSub successful data transfer in different formats. - @PUBSUB_SOURCE_TEST @PUBSUB_SUBSCRIPTION_TEST - Scenario: Validate successful transfer of records from PubSub(source) to PubSub(sink) having parquet format in both source and sink plugins. + @PUBSUB_SOURCE_TEST @PUBSUB_SINK_TEST @PUBSUB_SUBSCRIPTION_TEST + Scenario: Validate successful transfer of records from PubSub(source) to PubSub(sink). 
Given Open Datafusion Project to configure pipeline - When Select data pipeline type as: "Data Pipeline - Realtime" + When Select data pipeline type as: "Realtime" When Expand Plugin group in the LHS plugins list: "Source" When Select plugin: "Pub/Sub" from the plugins list as: "Source" When Expand Plugin group in the LHS plugins list: "Sink" When Select plugin: "Pub/Sub" from the plugins list as: "Sink" - Then Enter input plugin property: "referenceName" with value: "PubsubReferenceName" - And Replace input plugin property: "project" with value: "projectId" - Then Enter input plugin property: "subscription" with value: "pubSubSourceSubscription" - Then Enter input plugin property: "topic" with value: "pubSubSourceTopic" + Then Connect plugins: "Pub/Sub" and "Pub/Sub2" to establish connection + Then Navigate to the properties page of plugin: "Pub/Sub" + Then Override Service account details if set in environment variables + Then Enter input plugin property: "referenceName" with value: "BQReferenceName" + Then Enter PubSub source property subscription name + Then Enter PubSub source property topic name + Then Validate "Pub/Sub" plugin properties + And Close the Plugin Properties page + Then Open the PubSub sink properties + Then Enter PubSub property projectId "projectId" + Then Override Service account details if set in environment variables + Then Enter PubSub property reference name + Then Enter PubSub sink property topic name + Then Validate "Pub/Sub" plugin properties + And Close the Plugin Properties page + And Click on configure button + And Click on pipeline config + And Click on batch time and select format + Then Save the pipeline + Then Deploy the pipeline + Then Run the Pipeline in Runtime + Then Wait for pipeline to be in status: "Running" with a timeout of 240 seconds + Then Publish the messages + Then Subscribe to the messages + Then Validate OUT record count is equal to IN record count + And Stop the pipeline + Then Verify the pipeline status is 
"Stopped" + + @PUBSUB_SOURCE_TEST @PUBSUB_SINK_TEST @PUBSUB_SUBSCRIPTION_TEST + Scenario: Validate successful transfer of records from PubSub(source) to PubSub(sink) using macros. + Given Open Datafusion Project to configure pipeline + When Select data pipeline type as: "Realtime" + When Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "Pub/Sub" from the plugins list as: "Source" + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "Pub/Sub" from the plugins list as: "Sink" + Then Connect plugins: "Pub/Sub" and "Pub/Sub2" to establish connection + Then Navigate to the properties page of plugin: "Pub/Sub" + Then Override Service account details if set in environment variables + Then Enter input plugin property: "referenceName" with value: "BQReferenceName" + Then Click on the Macro button of Property: "topic" and set the value in textarea: "PubSubSourceTopic" + Then Click on the Macro button of Property: "subscription" and set the value in textarea: "PubSubSourceSubscription" + Then Validate "Pub/Sub" plugin properties + And Close the Plugin Properties page + Then Open the PubSub sink properties + Then Enter PubSub property projectId "projectId" + Then Override Service account details if set in environment variables + Then Enter PubSub property reference name + Then Enter PubSub sink property topic name + Then Validate "Pub/Sub" plugin properties + And Close the Plugin Properties page + And Click on configure button + And Click on pipeline config + And Click on batch time and select format + Then Save the pipeline + Then Enter runtime argument value for PubSub source property topic key "PubSubSourceTopic" + Then Enter runtime argument value for PubSub source property subscription key "PubSubSourceSubscription" + Then Deploy the pipeline + Then Run the Pipeline in Runtime with runtime arguments + Then Wait for pipeline to be in status: "Running" with a timeout of 240 seconds + Then Publish the messages + Then 
Subscribe to the messages + Then Validate OUT record count is equal to IN record count + And Stop the pipeline + Then Verify the pipeline status is "Stopped" + + @PUBSUB_SOURCE_TEST @PUBSUB_SINK_TEST @PUBSUB_SUBSCRIPTION_TEST + Scenario: Validate successful transfer of records from PubSub(source) to PubSub(sink) having formats TEXT at both source and sink. + Given Open Datafusion Project to configure pipeline + When Select data pipeline type as: "Realtime" + When Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "Pub/Sub" from the plugins list as: "Source" + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "Pub/Sub" from the plugins list as: "Sink" + Then Connect plugins: "Pub/Sub" and "Pub/Sub2" to establish connection + Then Navigate to the properties page of plugin: "Pub/Sub" + Then Override Service account details if set in environment variables + Then Enter input plugin property: "referenceName" with value: "BQReferenceName" + Then Enter PubSub source property subscription name + Then Enter PubSub source property topic name + Then Select dropdown plugin property: "select-format" with option value: "text" + Then Validate "Pub/Sub" plugin properties + And Close the Plugin Properties page + Then Open the PubSub sink properties + Then Enter PubSub property projectId "projectId" + Then Override Service account details if set in environment variables + Then Enter PubSub property reference name + Then Enter PubSub sink property topic name + Then Select dropdown plugin property: "select-format" with option value: "text" + Then Validate "Pub/Sub" plugin properties + And Close the Plugin Properties page + And Click on configure button + And Click on pipeline config + And Click on batch time and select format + Then Save the pipeline + Then Deploy the pipeline + Then Run the Pipeline in Runtime + Then Wait for pipeline to be in status: "Running" with a timeout of 240 seconds + Then Publish the messages + Then 
Subscribe to the messages + Then Validate OUT record count is equal to IN record count + And Stop the pipeline + Then Verify the pipeline status is "Stopped" + + @PUBSUB_SOURCE_TEST @PUBSUB_SINK_TEST @PUBSUB_SUBSCRIPTION_TEST + Scenario: Validate successful transfer of records from PubSub(source) to PubSub(sink) having formats TEXT at both source and sink. + Given Open Datafusion Project to configure pipeline + When Select data pipeline type as: "Realtime" + When Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "Pub/Sub" from the plugins list as: "Source" + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "Pub/Sub" from the plugins list as: "Sink" + Then Connect plugins: "Pub/Sub" and "Pub/Sub2" to establish connection + Then Navigate to the properties page of plugin: "Pub/Sub" + Then Override Service account details if set in environment variables + Then Enter input plugin property: "referenceName" with value: "BQReferenceName" + Then Enter PubSub source property subscription name + Then Enter PubSub source property topic name + Then Select dropdown plugin property: "select-format" with option value: "text" + Then Validate "Pub/Sub" plugin properties + And Close the Plugin Properties page + Then Open the PubSub sink properties + Then Enter PubSub property projectId "projectId" + Then Override Service account details if set in environment variables + Then Enter PubSub property reference name + Then Enter PubSub sink property topic name + Then Select dropdown plugin property: "select-format" with option value: "text" + Then Validate "Pub/Sub" plugin properties + And Close the Plugin Properties page + And Click on configure button + And Click on pipeline config + And Click on batch time and select format + Then Save the pipeline + Then Deploy the pipeline + Then Run the Pipeline in Runtime + Then Wait for pipeline to be in status: "Running" with a timeout of 240 seconds + Then Publish the messages + Then 
Subscribe to the messages + Then Validate OUT record count is equal to IN record count + And Stop the pipeline + Then Verify the pipeline status is "Stopped" \ No newline at end of file diff --git a/src/e2e-test/features/pubsub/sink/BQToPubSub.feature b/src/e2e-test/features/pubsub/sink/BQToPubSub.feature index 3fdab2d644..5abf6c001d 100644 --- a/src/e2e-test/features/pubsub/sink/BQToPubSub.feature +++ b/src/e2e-test/features/pubsub/sink/BQToPubSub.feature @@ -71,7 +71,7 @@ Feature: PubSub-Sink - Verification of BigQuery to PubSub successful data transf Then Deploy the pipeline Then Run the Pipeline in Runtime Then Wait for pipeline to be in status: "Running" with a timeout of 240 seconds - Then Publish the message + Then Publish the messages Then Subscribe to the messages Then Validate OUT record count is equal to IN record count And Stop the pipeline diff --git a/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java b/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java index 665b888c69..2a283feca3 100644 --- a/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java +++ b/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java @@ -482,6 +482,32 @@ private static String createGCSBucketWithFilesAndFolder(String folderPath) throw return bucketName; } + @Before(order = 1, value = "@PUBSUB_SOURCE_TEST") + public static void createSourcePubSubTopic() throws IOException { + pubSubSourceTopic = "cdf-e2e-test-" + UUID.randomUUID(); + PubSubClient.createTopic(pubSubSourceTopic); + BeforeActions.scenario.write("Source PubSub topic " + pubSubSourceTopic); + } + + @Before(order = 1, value = "@PUBSUB_SUBSCRIPTION_TEST") + public static void createSubscriptionPubSubTopic() throws IOException { + pubSubSourceSubscription = "cdf-e2e-test-" + UUID.randomUUID(); + PubSubClient.createSubscription(pubSubSourceSubscription,pubSubSourceTopic); + BeforeActions.scenario.write("Source PubSub subscription " + 
pubSubSourceSubscription); + } + @After(order = 1, value = "@PUBSUB_SOURCE_TESTt") + public static void deleteSourcePubSubTopic() { + try { + PubSubClient.deleteTopic(pubSubSourceTopic); + BeforeActions.scenario.write("Deleted target PubSub topic " + pubSubSourceTopic); + pubSubSourceTopic = StringUtils.EMPTY; + } catch (Exception e) { + if (e.getMessage().contains("Invalid resource name given") || e.getMessage().contains("Resource not found")) { + + } + } + } + @Before(order = 1, value = "@PUBSUB_SINK_TEST") public static void createTargetPubSubTopic() { pubSubTargetTopic = "cdf-e2e-test-" + UUID.randomUUID(); diff --git a/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSource.java b/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSource.java index 97b8eef2a4..612e0b0562 100644 --- a/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSource.java +++ b/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSource.java @@ -16,8 +16,10 @@ package io.cdap.plugin.pubsub.stepsdesign; +import io.cdap.e2e.pages.locators.CdfStudioLocators; import io.cdap.e2e.utils.CdfHelper; import io.cdap.e2e.utils.ConstantsUtil; +import io.cdap.e2e.utils.ElementHelper; import io.cdap.e2e.utils.PluginPropertyUtils; import io.cdap.plugin.common.stepsdesign.TestSetupHooks; import io.cdap.plugin.pubsub.actions.PubSubActions; @@ -51,11 +53,21 @@ public void enterPubSubSourcePropertySubscriptionName() { PubSubActions.enterSubscription(TestSetupHooks.pubSubSourceSubscription); } - @Then("Publish the message") + @Then("Publish the messages") public void publishTheMessage() throws IOException, InterruptedException { TimeUnit time = TimeUnit.SECONDS; time.sleep(120); PubSubClient.publishWithErrorHandlerExample(PluginPropertyUtils.pluginProp(ConstantsUtil.PROJECT_ID), TestSetupHooks.pubSubSourceTopic); } + + @Then("Enter runtime argument value for PubSub source property topic key {string}") + public void enterRuntimeArgumentValueForPubSubSourcePropertyTopicKey(String 
runtimeArgumentKey) { + ElementHelper.sendKeys(CdfStudioLocators.runtimeArgsValue(runtimeArgumentKey), TestSetupHooks.pubSubSourceTopic); + } + + @Then("Enter runtime argument value for PubSub source property subscription key {string}") + public void enterRuntimeArgumentValueForPubSubSourcePropertySubscriptionKey(String runtimeArgumentKey) { + ElementHelper.sendKeys(CdfStudioLocators.runtimeArgsValue(runtimeArgumentKey), TestSetupHooks.pubSubSourceSubscription); + } } diff --git a/src/e2e-test/java/io/cdap/plugin/utils/PubSubClient.java b/src/e2e-test/java/io/cdap/plugin/utils/PubSubClient.java index 96f0d0acab..f00a64265b 100644 --- a/src/e2e-test/java/io/cdap/plugin/utils/PubSubClient.java +++ b/src/e2e-test/java/io/cdap/plugin/utils/PubSubClient.java @@ -62,7 +62,8 @@ public static void createSubscription(String subscriptionId, String topicId) thr try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create( SubscriptionAdminSettings.newBuilder().build())) { TopicName topicName = TopicName.of(PluginPropertyUtils.pluginProp(ConstantsUtil.PROJECT_ID), topicId); - subscriptionName = ProjectSubscriptionName.of(PluginPropertyUtils.pluginProp(ConstantsUtil.PROJECT_ID), subscriptionId); + subscriptionName = ProjectSubscriptionName.of + (PluginPropertyUtils.pluginProp(ConstantsUtil.PROJECT_ID), subscriptionId); subscriptionAdminClient.createSubscription(subscriptionName, topicName, PushConfig.getDefaultInstance(), 60); System.out.println("Subscription created: " + subscriptionName.toString()); } catch (AlreadyExistsException e) { @@ -163,4 +164,4 @@ public static void subscribeAsyncExample(String projectId, String subscriptionId subscriber.stopAsync(); } } -} +} \ No newline at end of file From 692e6932782d2b107d5a7c3b7fdde313f5726a47 Mon Sep 17 00:00:00 2001 From: priyabhatnagar25 <112169140+priyabhatnagar25@users.noreply.github.com> Date: Tue, 9 Jan 2024 19:22:41 +0530 Subject: [PATCH 10/10] pubsub to pubsub scenarios --- 
.../PubSubToPubSub.feature | 20 +++++++++---------- .../common/stepsdesign/TestSetupHooks.java | 12 ++++++++++- .../pubsub/stepsdesign/PubSubSource.java | 6 +++--- .../io/cdap/plugin/utils/PubSubClient.java | 10 +++++----- .../resources/pluginParameters.properties | 6 ++++-- 5 files changed, 33 insertions(+), 21 deletions(-) diff --git a/src/e2e-test/features/pubsub/pubsubadditionalfeature/PubSubToPubSub.feature b/src/e2e-test/features/pubsub/pubsubadditionalfeature/PubSubToPubSub.feature index 3940652e2e..d21aecd535 100644 --- a/src/e2e-test/features/pubsub/pubsubadditionalfeature/PubSubToPubSub.feature +++ b/src/e2e-test/features/pubsub/pubsubadditionalfeature/PubSubToPubSub.feature @@ -15,7 +15,7 @@ @PubSub_Source Feature: PubSub Source - Verification of PubSub to PubSub successful data transfer in different formats. - @PUBSUB_SOURCE_TEST @PUBSUB_SINK_TEST @PUBSUB_SUBSCRIPTION_TEST + @PUBSUB_SOURCE_TEST @PUBSUB_SINK_TEST @PUBSUB_SUBSCRIPTION_TEST @PUBSUB_MESSAGE_TEST Scenario: Validate successful transfer of records from PubSub(source) to PubSub(sink). Given Open Datafusion Project to configure pipeline When Select data pipeline type as: "Realtime" @@ -51,7 +51,7 @@ Feature: PubSub Source - Verification of PubSub to PubSub successful data transf And Stop the pipeline Then Verify the pipeline status is "Stopped" - @PUBSUB_SOURCE_TEST @PUBSUB_SINK_TEST @PUBSUB_SUBSCRIPTION_TEST + @PUBSUB_SOURCE_TEST @PUBSUB_SINK_TEST @PUBSUB_SUBSCRIPTION_TEST @PUBSUB_MESSAGE_TEST Scenario: Validate successful transfer of records from PubSub(source) to PubSub(sink) using macros. 
Given Open Datafusion Project to configure pipeline When Select data pipeline type as: "Realtime" @@ -63,8 +63,8 @@ Feature: PubSub Source - Verification of PubSub to PubSub successful data transf Then Navigate to the properties page of plugin: "Pub/Sub" Then Override Service account details if set in environment variables Then Enter input plugin property: "referenceName" with value: "BQReferenceName" - Then Click on the Macro button of Property: "topic" and set the value in textarea: "PubSubSourceTopic" - Then Click on the Macro button of Property: "subscription" and set the value in textarea: "PubSubSourceSubscription" + Then Click on the Macro button of Property: "topic" and set the value to: "PubSubSourceTopic" + Then Click on the Macro button of Property: "subscription" and set the value to: "PubSubSourceSubscription" Then Validate "Pub/Sub" plugin properties And Close the Plugin Properties page Then Open the PubSub sink properties @@ -78,9 +78,9 @@ Feature: PubSub Source - Verification of PubSub to PubSub successful data transf And Click on pipeline config And Click on batch time and select format Then Save the pipeline + Then Deploy the pipeline Then Enter runtime argument value for PubSub source property topic key "PubSubSourceTopic" Then Enter runtime argument value for PubSub source property subscription key "PubSubSourceSubscription" - Then Deploy the pipeline Then Run the Pipeline in Runtime with runtime arguments Then Wait for pipeline to be in status: "Running" with a timeout of 240 seconds Then Publish the messages @@ -89,7 +89,7 @@ Feature: PubSub Source - Verification of PubSub to PubSub successful data transf And Stop the pipeline Then Verify the pipeline status is "Stopped" - @PUBSUB_SOURCE_TEST @PUBSUB_SINK_TEST @PUBSUB_SUBSCRIPTION_TEST + @PUBSUB_SOURCE_TEST @PUBSUB_SINK_TEST @PUBSUB_SUBSCRIPTION_TEST @PUBSUB_MESSAGE_TEST Scenario: Validate successful transfer of records from PubSub(source) to PubSub(sink) having formats TEXT at both source and 
sink. Given Open Datafusion Project to configure pipeline When Select data pipeline type as: "Realtime" @@ -127,8 +127,8 @@ Feature: PubSub Source - Verification of PubSub to PubSub successful data transf And Stop the pipeline Then Verify the pipeline status is "Stopped" - @PUBSUB_SOURCE_TEST @PUBSUB_SINK_TEST @PUBSUB_SUBSCRIPTION_TEST - Scenario: Validate successful transfer of records from PubSub(source) to PubSub(sink) having formats TEXT at both source and sink. + @PUBSUB_SOURCE_TEST @PUBSUB_SINK_TEST @PUBSUB_SUBSCRIPTION_TEST @PUBSUB_MESSAGE_TEST + Scenario: Validate successful transfer of records from PubSub(source) to PubSub(sink) having formats TEXT at source and JSON at sink. Given Open Datafusion Project to configure pipeline When Select data pipeline type as: "Realtime" When Expand Plugin group in the LHS plugins list: "Source" @@ -149,7 +149,7 @@ Feature: PubSub Source - Verification of PubSub to PubSub successful data transf Then Override Service account details if set in environment variables Then Enter PubSub property reference name Then Enter PubSub sink property topic name - Then Select dropdown plugin property: "select-format" with option value: "text" + Then Select dropdown plugin property: "select-format" with option value: "json" Then Validate "Pub/Sub" plugin properties And Close the Plugin Properties page And Click on configure button @@ -163,4 +163,4 @@ Feature: PubSub Source - Verification of PubSub to PubSub successful data transf Then Subscribe to the messages Then Validate OUT record count is equal to IN record count And Stop the pipeline - Then Verify the pipeline status is "Stopped" \ No newline at end of file + Then Verify the pipeline status is "Stopped" diff --git a/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java b/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java index 2a283feca3..17652261a9 100644 --- a/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java +++ 
b/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java @@ -21,6 +21,7 @@ import io.cdap.e2e.pages.actions.CdfConnectionActions; import io.cdap.e2e.pages.actions.CdfPluginPropertiesActions; import io.cdap.e2e.utils.BigQueryClient; +import io.cdap.e2e.utils.ConstantsUtil; import io.cdap.e2e.utils.PluginPropertyUtils; import io.cdap.e2e.utils.StorageClient; import io.cdap.plugin.utils.PubSubClient; @@ -492,7 +493,7 @@ public static void createSourcePubSubTopic() throws IOException { @Before(order = 1, value = "@PUBSUB_SUBSCRIPTION_TEST") public static void createSubscriptionPubSubTopic() throws IOException { pubSubSourceSubscription = "cdf-e2e-test-" + UUID.randomUUID(); - PubSubClient.createSubscription(pubSubSourceSubscription,pubSubSourceTopic); + PubSubClient.createSubscription(pubSubSourceSubscription , pubSubSourceTopic); BeforeActions.scenario.write("Source PubSub subscription " + pubSubSourceSubscription); } @After(order = 1, value = "@PUBSUB_SOURCE_TESTt") @@ -508,6 +509,15 @@ public static void deleteSourcePubSubTopic() { } } + public static void publishMessage() throws IOException, InterruptedException { + String dataMessage1 = PluginPropertyUtils.pluginProp("firstMessage"); + String dataMessage2 = PluginPropertyUtils.pluginProp("secondMessage"); + List dataMessagesList = Arrays.asList(dataMessage1, dataMessage2); + PubSubClient.publishWithErrorHandlerExample(PluginPropertyUtils.pluginProp(ConstantsUtil.PROJECT_ID), + pubSubSourceTopic, dataMessagesList); + } + + @Before(order = 1, value = "@PUBSUB_SINK_TEST") public static void createTargetPubSubTopic() { pubSubTargetTopic = "cdf-e2e-test-" + UUID.randomUUID(); diff --git a/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSource.java b/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSource.java index 612e0b0562..d49d9fb33b 100644 --- a/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSource.java +++ 
b/src/e2e-test/java/io/cdap/plugin/pubsub/stepsdesign/PubSubSource.java @@ -57,8 +57,7 @@ public void enterPubSubSourcePropertySubscriptionName() { public void publishTheMessage() throws IOException, InterruptedException { TimeUnit time = TimeUnit.SECONDS; time.sleep(120); - PubSubClient.publishWithErrorHandlerExample(PluginPropertyUtils.pluginProp(ConstantsUtil.PROJECT_ID), - TestSetupHooks.pubSubSourceTopic); + TestSetupHooks.publishMessage(); } @Then("Enter runtime argument value for PubSub source property topic key {string}") @@ -68,6 +67,7 @@ public void enterRuntimeArgumentValueForPubSubSourcePropertyTopicKey(String runt @Then("Enter runtime argument value for PubSub source property subscription key {string}") public void enterRuntimeArgumentValueForPubSubSourcePropertySubscriptionKey(String runtimeArgumentKey) { - ElementHelper.sendKeys(CdfStudioLocators.runtimeArgsValue(runtimeArgumentKey), TestSetupHooks.pubSubSourceSubscription); + ElementHelper.sendKeys(CdfStudioLocators.runtimeArgsValue(runtimeArgumentKey), + TestSetupHooks.pubSubSourceSubscription); } } diff --git a/src/e2e-test/java/io/cdap/plugin/utils/PubSubClient.java b/src/e2e-test/java/io/cdap/plugin/utils/PubSubClient.java index f00a64265b..313c202447 100644 --- a/src/e2e-test/java/io/cdap/plugin/utils/PubSubClient.java +++ b/src/e2e-test/java/io/cdap/plugin/utils/PubSubClient.java @@ -93,15 +93,15 @@ public static String getTopicCmekKey(String topicId) throws IOException { } - public static void publishWithErrorHandlerExample(String projectId, String topicId) + public static void publishWithErrorHandlerExample(String projectId, String topicId, List dataMessages) throws IOException, InterruptedException { TopicName topicName = TopicName.of(PluginPropertyUtils.pluginProp(ConstantsUtil.PROJECT_ID), topicId); Publisher publisher = null; try { publisher = Publisher.newBuilder(topicName).build(); - List messages = Arrays.asList("first message", "second message"); - for (final String message : 
messages) { +// List messages = Arrays.asList("first message", "second message"); + for (final String message : dataMessages) { ByteString data = ByteString.copyFromUtf8(message); PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build(); ApiFuture future = publisher.publish(pubsubMessage); @@ -158,10 +158,10 @@ public static void subscribeAsyncExample(String projectId, String subscriptionId subscriber.startAsync().awaitRunning(); System.out.printf("Listening for messages on %s:\n", subscriptionName.toString()); // Allow the subscriber to run for 30s unless an unrecoverable error occurs. - subscriber.awaitTerminated(300, TimeUnit.SECONDS); + subscriber.awaitTerminated(200, TimeUnit.SECONDS); } catch (TimeoutException timeoutException) { // Shut down the subscriber after 30s. Stop receiving messages. subscriber.stopAsync(); } } -} \ No newline at end of file +} diff --git a/src/e2e-test/resources/pluginParameters.properties b/src/e2e-test/resources/pluginParameters.properties index f6d091c865..8495e68d14 100644 --- a/src/e2e-test/resources/pluginParameters.properties +++ b/src/e2e-test/resources/pluginParameters.properties @@ -181,8 +181,6 @@ bqUpdateTableSchemaTrue=True clusterValue=transaction_date TableKey=PersonID bqSourceTable=dummy -#pubSubSourceSubscription=dummy -#pubSubSourceTopic=dummy bqCreateTableQueryFile=testdata/BigQuery/BigQueryCreateTableQuery.txt bqInsertDataQueryFile=testdata/BigQuery/BigQueryInsertDataQuery.txt bqCreateViewQueryFile=testdata/BigQuery/BigQueryCreateViewQuery.txt @@ -245,6 +243,10 @@ pubSubErrorThreshold=0 pubSubStringValue=one pubSubNegativeValue=-100 pubsubDelimiter=@ +pubSubSourceSubscription=dummy +pubSubSourceTopic=dummy +firstMessage=hellomessage +secondMessage=himessage ## PUBSUBSINK-PLUGIN-PROPERTIES-END ## GCSDELETE-PLUGIN-PROPERTIES-START