Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pubsub new tests e2e #35

Open
wants to merge 8 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
# Copyright © 2024 Cask Data, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.

@PubSub_DataStream
Feature: PubSub - Verification of successful data transfer from DataStream PubSub source to PubSub sink using different file formats

@PUBSUB_SOURCE_TEST @PUBSUB_SINK_TEST @PUBSUB_SUBSCRIPTION_TEST
Scenario: Validate successful transfer of records from PubSub(source) to PubSub(sink).
Given Open Datafusion Project to configure pipeline
When Select data pipeline type as: "Realtime"
When Expand Plugin group in the LHS plugins list: "Source"
When Select plugin: "Pub/Sub" from the plugins list as: "Source"
When Expand Plugin group in the LHS plugins list: "Sink"
When Select plugin: "Pub/Sub" from the plugins list as: "Sink"
Then Connect plugins: "Pub/Sub" and "Pub/Sub2" to establish connection
Then Navigate to the properties page of plugin: "Pub/Sub"
Then Override Service account details if set in environment variables
Then Enter input plugin property: "referenceName" with value: "PubSubSourceReferenceName"
Then Enter PubSub source property subscription name
Then Enter PubSub source property topic name
Then Validate "Pub/Sub" plugin properties
And Close the Plugin Properties page
Then Navigate to the properties page of plugin: "Pub/Sub2"
Then Replace input plugin property: "project" with value: "projectId"
Then Override Service account details if set in environment variables
Then Enter input plugin property: "referenceName" with value: "PubSubSinkReferenceName"
Then Enter PubSub sink property topic name
Then Validate "Pub/Sub" plugin properties
And Close the Plugin Properties page
And Click on configure button
And Click on pipeline config
And Click on batch time and select format
Then Save the pipeline
Then Deploy the pipeline
Then Run the Pipeline in Runtime
Then Wait for pipeline to be in status: "Running" with a timeout of 240 seconds
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need the timeout of 240 seconds for runtime pipelines?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes we do after the pipeline runs for this much time only then we are sending the messages

Then Publish the messages
Then Subscribe to the messages
Then Validate OUT record count is equal to IN record count
Copy link
Collaborator

@itsmekumari itsmekumari Jan 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can add validation step to validate the message content. Common step to be added for all the scenarios.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No we cannot add and validation step to check the message content, this is asynchronous transfer and it will not always send messages properly as already discussed with rahul

And Stop the pipeline
Then Verify the pipeline status is "Stopped"

@PUBSUB_SOURCE_TEST @PUBSUB_SINK_TEST @PUBSUB_SUBSCRIPTION_TEST
Scenario: Validate successful transfer of records from PubSub(source) to PubSub(sink) using macros.
Given Open Datafusion Project to configure pipeline
When Select data pipeline type as: "Realtime"
When Expand Plugin group in the LHS plugins list: "Source"
When Select plugin: "Pub/Sub" from the plugins list as: "Source"
When Expand Plugin group in the LHS plugins list: "Sink"
When Select plugin: "Pub/Sub" from the plugins list as: "Sink"
Then Connect plugins: "Pub/Sub" and "Pub/Sub2" to establish connection
Then Navigate to the properties page of plugin: "Pub/Sub"
Then Override Service account details if set in environment variables
Then Enter input plugin property: "referenceName" with value: "PubSubSourceReferenceName"
Then Click on the Macro button of Property: "topic" and set the value to: "pubSubSourceTopic"
Then Click on the Macro button of Property: "subscription" and set the value to: "pubSubSourceSubscription"
Then Validate "Pub/Sub" plugin properties
And Close the Plugin Properties page
Then Navigate to the properties page of plugin: "Pub/Sub2"
Then Replace input plugin property: "project" with value: "projectId"
Then Override Service account details if set in environment variables
Then Enter input plugin property: "referenceName" with value: "PubSubSinkReferenceName"
Then Enter PubSub sink property topic name
Then Validate "Pub/Sub" plugin properties
And Close the Plugin Properties page
And Click on configure button
And Click on pipeline config
And Click on batch time and select format
Then Save the pipeline
Then Deploy the pipeline
Then Enter runtime argument value for PubSub source property topic key "pubSubSourceTopic"
Then Enter runtime argument value for PubSub source property subscription key "pubSubSourceSubscription"
Then Run the Pipeline in Runtime with runtime arguments
Then Wait for pipeline to be in status: "Running" with a timeout of 240 seconds
Then Publish the messages
Then Subscribe to the messages
Then Validate OUT record count is equal to IN record count
And Stop the pipeline
Then Verify the pipeline status is "Stopped"

@PUBSUB_SOURCE_TEST @PUBSUB_SINK_TEST @PUBSUB_SUBSCRIPTION_TEST
Scenario: Validate the successful transfer of records from a pubSub source to a pubSub sink with format Text at both source and sink
Given Open Datafusion Project to configure pipeline
When Select data pipeline type as: "Realtime"
When Expand Plugin group in the LHS plugins list: "Source"
When Select plugin: "Pub/Sub" from the plugins list as: "Source"
When Expand Plugin group in the LHS plugins list: "Sink"
When Select plugin: "Pub/Sub" from the plugins list as: "Sink"
Then Connect plugins: "Pub/Sub" and "Pub/Sub2" to establish connection
Then Navigate to the properties page of plugin: "Pub/Sub"
Then Override Service account details if set in environment variables
Then Enter input plugin property: "referenceName" with value: "PubSubSourceReferenceName"
Then Enter PubSub source property subscription name
Then Enter PubSub source property topic name
Then Select dropdown plugin property: "select-format" with option value: "text"
Then Validate "Pub/Sub" plugin properties
And Close the Plugin Properties page
Then Navigate to the properties page of plugin: "Pub/Sub2"
Then Replace input plugin property: "project" with value: "projectId"
Then Override Service account details if set in environment variables
Then Enter input plugin property: "referenceName" with value: "PubSubSinkReferenceName"
Then Enter PubSub sink property topic name
Then Select dropdown plugin property: "select-format" with option value: "text"
Then Validate "Pub/Sub" plugin properties
And Close the Plugin Properties page
And Click on configure button
And Click on pipeline config
And Click on batch time and select format
Then Save the pipeline
Then Deploy the pipeline
Then Run the Pipeline in Runtime
Then Wait for pipeline to be in status: "Running" with a timeout of 240 seconds
Then Publish the messages
Then Subscribe to the messages
Then Validate OUT record count is equal to IN record count
And Stop the pipeline
Then Verify the pipeline status is "Stopped"

@PUBSUB_SOURCE_TEST @PUBSUB_SINK_TEST @PUBSUB_SUBSCRIPTION_TEST
Scenario: Validate the successful transfer of records from a pubSub source to a pubSub sink with format Text at source and Json at sink
Given Open Datafusion Project to configure pipeline
When Select data pipeline type as: "Realtime"
When Expand Plugin group in the LHS plugins list: "Source"
When Select plugin: "Pub/Sub" from the plugins list as: "Source"
When Expand Plugin group in the LHS plugins list: "Sink"
When Select plugin: "Pub/Sub" from the plugins list as: "Sink"
Then Connect plugins: "Pub/Sub" and "Pub/Sub2" to establish connection
Then Navigate to the properties page of plugin: "Pub/Sub"
Then Override Service account details if set in environment variables
Then Enter input plugin property: "referenceName" with value: "PubSubSourceReferenceName"
Then Enter PubSub source property subscription name
Then Enter PubSub source property topic name
Then Select dropdown plugin property: "select-format" with option value: "text"
Then Validate "Pub/Sub" plugin properties
And Close the Plugin Properties page
Then Navigate to the properties page of plugin: "Pub/Sub2"
Then Replace input plugin property: "project" with value: "projectId"
Then Override Service account details if set in environment variables
Then Enter input plugin property: "referenceName" with value: "PubSubSinkReferenceName"
Then Enter PubSub sink property topic name
Then Select dropdown plugin property: "select-format" with option value: "json"
Then Validate "Pub/Sub" plugin properties
And Close the Plugin Properties page
And Click on configure button
And Click on pipeline config
And Click on batch time and select format
Then Save the pipeline
Then Deploy the pipeline
Then Run the Pipeline in Runtime
Then Wait for pipeline to be in status: "Running" with a timeout of 240 seconds
Then Publish the messages
Then Subscribe to the messages
Then Validate OUT record count is equal to IN record count
And Stop the pipeline
Then Verify the pipeline status is "Stopped"
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.cdap.e2e.pages.actions.CdfConnectionActions;
import io.cdap.e2e.pages.actions.CdfPluginPropertiesActions;
import io.cdap.e2e.utils.BigQueryClient;
import io.cdap.e2e.utils.ConstantsUtil;
import io.cdap.e2e.utils.PluginPropertyUtils;
import io.cdap.e2e.utils.StorageClient;
import io.cdap.plugin.utils.PubSubClient;
Expand Down Expand Up @@ -62,6 +63,8 @@ public class TestSetupHooks {
public static String bqSourceTable2 = StringUtils.EMPTY;
public static String bqSourceView = StringUtils.EMPTY;
public static String pubSubTargetTopic = StringUtils.EMPTY;
public static String pubSubSourceTopic = StringUtils.EMPTY;
public static String pubSubSourceSubscription = StringUtils.EMPTY;
public static String spannerInstance = StringUtils.EMPTY;
public static String spannerDatabase = StringUtils.EMPTY;
public static String spannerSourceTable = StringUtils.EMPTY;
Expand Down Expand Up @@ -480,6 +483,56 @@ private static String createGCSBucketWithFilesAndFolder(String folderPath) throw
return bucketName;
}

@Before(order = 1, value = "@PUBSUB_SOURCE_TEST")
public static void createSourcePubSubTopic() throws IOException {
pubSubSourceTopic = "cdf-e2e-test-" + UUID.randomUUID();
PubSubClient.createTopic(pubSubSourceTopic);
BeforeActions.scenario.write("Source PubSub topic " + pubSubSourceTopic);
}

@Before(order = 1, value = "@PUBSUB_SUBSCRIPTION_TEST")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add the After hook for the Subscription as well.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added hook

public static void createSubscriptionPubSubTopic() throws IOException {
pubSubSourceSubscription = "cdf-e2e-test-" + UUID.randomUUID();
PubSubClient.createSubscription(pubSubSourceSubscription , pubSubSourceTopic);
BeforeActions.scenario.write("Source PubSub subscription " + pubSubSourceSubscription);
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a one line space.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added


@After(order = 1, value = "@PUBSUB_SOURCE_TEST")
public static void deleteSourcePubSubTopic() {
try {
PubSubClient.deleteTopic(pubSubSourceTopic);
BeforeActions.scenario.write("Deleted target PubSub topic " + pubSubSourceTopic);
pubSubSourceTopic = StringUtils.EMPTY;
} catch (Exception e) {
if (e.getMessage().contains("Invalid resource name given") || e.getMessage().contains("Resource not found")) {

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Complete the catch block

  catch (Exception e) {
  if (e.getMessage().contains("Invalid resource name given") || e.getMessage().contains("Resource not found")) {
    BeforeActions.scenario.write("Source PubSub topic " + pubSubSourceTopic + " does not exist.");
  } else {
    Assert.fail(e.getMessage());
  }
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

completed the catch block

}
}
}

@After(order = 1, value = "@PUBSUB_SUBSCRIPTION_TEST")
public static void deleteSourcePubSubSubscription() {
try {
PubSubClient.deleteTopic(pubSubSourceSubscription);
BeforeActions.scenario.write("Deleted target PubSub topic " + pubSubSourceSubscription);
pubSubSourceSubscription = StringUtils.EMPTY;
} catch (Exception e) {
if (e.getMessage().contains("Invalid resource name given") || e.getMessage().contains("Resource not found")) {
BeforeActions.scenario.write("Source PubSub topic " + pubSubSourceTopic + " does not exist.");
} else {
Assert.fail(e.getMessage());
}
}
}

public static void publishMessage() throws IOException, InterruptedException {
String dataMessage1 = PluginPropertyUtils.pluginProp("firstMessage");
String dataMessage2 = PluginPropertyUtils.pluginProp("secondMessage");
List<String> dataMessagesList = Arrays.asList(dataMessage1, dataMessage2);
PubSubClient.publishMessagesWithPubSub(PluginPropertyUtils.pluginProp
(ConstantsUtil.PROJECT_ID), pubSubSourceTopic, dataMessagesList);
}

@Before(order = 1, value = "@PUBSUB_SINK_TEST")
public static void createTargetPubSubTopic() {
pubSubTargetTopic = "cdf-e2e-test-" + UUID.randomUUID();
Expand Down Expand Up @@ -1141,7 +1194,7 @@ public static void createSinkBQDedupeTable() throws IOException, InterruptedExce

@Before(value = "@BQ_INSERT_INT_SOURCE_TEST")
public static void createSourceBQTable() throws IOException, InterruptedException {
bqSourceTable = "E2E_TARGET_" + UUID.randomUUID().toString().replaceAll("-", "_");
bqSourceTable = "E2E_SOURCE_" + UUID.randomUUID().toString().replaceAll("-", "_");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we changing BQ related in Pubsub

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this change regarding ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reverted

PluginPropertyUtils.addPluginProp("bqSourceTable", bqSourceTable);
BeforeActions.scenario.write("BQ source table name - " + bqSourceTable);
BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqSourceTable + "` " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,19 @@ public static WebElement selectedFormat(String format) {
return SeleniumDriver.getDriver()
.findElement(By.xpath("//*[@data-cy='select-format']/div[text()='" + format + "']"));
}

@FindBy(how = How.XPATH, using = "//span[contains(text(), \"Batch interval\")]//following-sibling::div//select[1]")
public static WebElement batchTime;

@FindBy(how = How.XPATH, using = "//span[contains(text(), \"Batch interval\")]//following-sibling::div//select[2]")
public static WebElement timeSelect;

@FindBy(how = How.XPATH, using = "//button[@data-testid='config-apply-close']")
public static WebElement saveButton;

@FindBy(how = How.XPATH, using = "//*[@data-cy='pipeline-configure-modeless-btn']")
public static WebElement configButton;

@FindBy(how = How.XPATH, using = "//*[@data-cy='tab-content-Pipeline config']")
public static WebElement pipelineConfig;
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
features = {"src/e2e-test/features"},
glue = {"io.cdap.plugin.pubsub.stepsdesign", "io.cdap.plugin.gcs.stepsdesign",
"io.cdap.plugin.common.stepsdesign", "io.cdap.plugin.bigquery.stepsdesign", "stepsdesign"},
tags = {"@PubSub_Sink"},
tags = {"@PubSub_Sink","@PubSub_DataStream"},
monochrome = true,
plugin = {"pretty", "html:target/cucumber-html-report/pubsub-sink",
"json:target/cucumber-reports/cucumber-pubsub-sink.json",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
features = {"src/e2e-test/features"},
glue = {"io.cdap.plugin.pubsub.stepsdesign", "io.cdap.plugin.gcs.stepsdesign",
"io.cdap.plugin.common.stepsdesign", "io.cdap.plugin.bigquery.stepsdesign", "stepsdesign"},
tags = {"@PubSub_Sink_Required"},
tags = {"@PubSub_Sink_Required","@PubSub_DataStream_Required"},
monochrome = true,
plugin = {"pretty", "html:target/cucumber-html-report/pubsub-sink-required",
"json:target/cucumber-reports/cucumber-pubsub-sink-required.json",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,16 @@
import io.cdap.plugin.utils.E2EHelper;
import io.cdap.plugin.utils.E2ETestConstants;
import io.cdap.plugin.utils.PubSubClient;
import io.cucumber.java.en.And;
import io.cucumber.java.en.Then;
import io.cucumber.java.en.When;
import org.apache.commons.lang3.StringUtils;
import org.junit.Assert;
import org.openqa.selenium.support.ui.Select;
import stepsdesign.BeforeActions;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
* PubSub Sink Plugin related step design.
Expand Down Expand Up @@ -234,4 +237,31 @@ public void enterRuntimeArgumentValueForPubSubSinkPropertyTopicKey(String runtim
public void clickOnPreviewDataForPubSubSink() {
openSinkPluginPreviewData("GooglePublisher");
}

@Then("Subscribe to the messages")
public void subscribeToTheMessages() throws InterruptedException {
TimeUnit time = TimeUnit.SECONDS;
time.sleep(60);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it will hault the code execution , can we use implicit wait ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have to use it, otherwise steps run very fast and they fail

PubSubClient.subscribeAsyncExample(PluginPropertyUtils.pluginProp(ConstantsUtil.PROJECT_ID),
TestSetupHooks.pubSubSourceSubscription);
}

@And("Click on batch time and select format")
public void clickOnBatchTimeAndSelectFormat() {
Select select = new Select(PubSubLocators.batchTime);
select.selectByIndex(0);
Select selectformat = new Select(PubSubLocators.timeSelect);
selectformat.selectByIndex(1);
ElementHelper.clickOnElement(PubSubLocators.saveButton);
}

@And("Click on configure button")
public void clickOnConfigureButton() {
ElementHelper.clickOnElement(PubSubLocators.configButton);
}

@And("Click on pipeline config")
public void clickOnPipelineConfig() {
ElementHelper.clickOnElement(PubSubLocators.pipelineConfig);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,17 @@

package io.cdap.plugin.pubsub.stepsdesign;

import io.cdap.e2e.pages.locators.CdfStudioLocators;
import io.cdap.e2e.utils.CdfHelper;
import io.cdap.e2e.utils.ElementHelper;
import io.cdap.plugin.common.stepsdesign.TestSetupHooks;
import io.cdap.plugin.pubsub.actions.PubSubActions;
import io.cucumber.java.en.Then;
import io.cucumber.java.en.When;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
* PubSub Source Plugin related step design.
*/
Expand All @@ -35,4 +42,32 @@ public void sourceIsPubSub() {
public void openThePubSubSourceProperties() {
openSourcePluginProperties("GooglePublisher");
}

@Then("Enter PubSub source property topic name")
public void enterPubSubSourcePropertyTopicName() {
PubSubActions.enterPubSubTopic(TestSetupHooks.pubSubSourceTopic);
}

@Then("Enter PubSub source property subscription name")
public void enterPubSubSourcePropertySubscriptionName() {
PubSubActions.enterSubscription(TestSetupHooks.pubSubSourceSubscription);
}

@Then("Publish the messages")
public void publishTheMessage() throws IOException, InterruptedException {
TimeUnit time = TimeUnit.SECONDS;
time.sleep(120);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

time.sleep will hault the program ,

TestSetupHooks.publishMessage();
}

@Then("Enter runtime argument value for PubSub source property topic key {string}")
public void enterRuntimeArgumentValueForPubSubSourcePropertyTopicKey(String runtimeArgumentKey) {
ElementHelper.sendKeys(CdfStudioLocators.runtimeArgsValue(runtimeArgumentKey), TestSetupHooks.pubSubSourceTopic);
}

@Then("Enter runtime argument value for PubSub source property subscription key {string}")
public void enterRuntimeArgumentValueForPubSubSourcePropertySubscriptionKey(String runtimeArgumentKey) {
ElementHelper.sendKeys(CdfStudioLocators.runtimeArgsValue(runtimeArgumentKey),
TestSetupHooks.pubSubSourceSubscription);
}
}
Loading