Pubsub new tests e2e #33

Closed · wants to merge 10 commits
13 changes: 5 additions & 8 deletions .github/workflows/e2e.yml
@@ -74,21 +74,18 @@ jobs:
      - name: Run all e2e tests
        if: github.event_name == 'workflow_dispatch' || github.event_name == 'push' || steps.filter.outputs.e2e-test == 'true'
        run: python3 e2e/src/main/scripts/run_e2e_test.py --testRunner **/${{ matrix.tests }}/**/TestRunner.java
      - name: Upload report
        uses: actions/upload-artifact@v3
        if: always()
        with:
          name: Cucumber report - ${{ matrix.tests }}
          path: ./plugin/target/cucumber-reports
      - name: Upload debug files
        uses: actions/upload-artifact@v3
        if: always()
        with:
          name: Debug files - ${{ matrix.tests }}
          path: ./**/target/e2e-debug
      - name: Upload files to GCS
-       uses: google-github-actions/upload-cloud-storage@v0
+       uses: google-github-actions/upload-cloud-storage@v2
        if: always()
        with:
          path: ./plugin/target/cucumber-reports
-         destination: e2e-tests-cucumber-reports/${{ github.event.repository.name }}/${{ github.ref }}
+         destination: e2e-tests-cucumber-reports/${{ github.event.repository.name }}/${{ github.ref }}/${{ matrix.tests }}
      - name: Cucumber Report URL
        if: always()
        run: echo "https://storage.googleapis.com/e2e-tests-cucumber-reports/${{ github.event.repository.name }}/${{ github.ref }}/${{ matrix.tests }}/cucumber-reports/advanced-reports/cucumber-html-reports/overview-features.html"
8 changes: 8 additions & 0 deletions docs/BigQueryExecute-action.md
@@ -30,6 +30,14 @@ write BigQuery data to this project.

**SQL**: SQL command to execute.

**BQ Job Labels:** Key-value pairs to be added as labels to the BigQuery job. Keys must be unique. (Macro Enabled)

`job_source` and `type` are system-defined labels used by CDAP for internal purposes and cannot be used as label keys.
Macro format is supported, for example `key1:val1,key2:val2`.

Keys and values can contain only lowercase letters, numeric characters, underscores, and dashes.
For more information about labels, see [Docs](https://cloud.google.com/bigquery/docs/labels-intro#requirements).
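
For illustration, a minimal sketch of how such labels end up on a BigQuery job through the Java client. This is hypothetical plugin-side code: the parsing of the `key1:val1,key2:val2` string and all names below are assumptions, not code from this PR.

```java
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.QueryJobConfiguration;

import java.util.HashMap;
import java.util.Map;

public class JobLabelSketch {
  public static void main(String[] args) throws InterruptedException {
    // Parse a "key1:val1,key2:val2" style property into a label map.
    Map<String, String> labels = new HashMap<>();
    for (String pair : "team:data-platform,env:e2e".split(",")) {
      String[] kv = pair.split(":", 2);
      labels.put(kv[0], kv[1]);
    }

    BigQuery bigQuery = BigQueryOptions.getDefaultInstance().getService();
    QueryJobConfiguration config = QueryJobConfiguration.newBuilder("SELECT 1")
        .setLabels(labels) // labels appear in job metadata and the billing export
        .build();
    bigQuery.query(config);
  }
}
```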

**Dialect**: Dialect of the SQL command. The value must be 'legacy' or 'standard'. If set to 'standard',
the query will use BigQuery's standard SQL: https://cloud.google.com/bigquery/sql-reference/.
If set to 'legacy', BigQuery's legacy SQL dialect will be used for this query.
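
Continuing the sketch above, the dialect switch maps onto the same builder (an assumption: `setUseLegacySql` is how the Java client selects the dialect, but whether this plugin calls it directly is not shown in this diff):

```java
// 'standard' -> setUseLegacySql(false); 'legacy' -> setUseLegacySql(true).
QueryJobConfiguration standardSql = QueryJobConfiguration.newBuilder("SELECT 1")
    .setUseLegacySql(false)
    .build();
```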
6 changes: 6 additions & 0 deletions pom.xml
@@ -98,6 +98,7 @@
<slf4j.version>1.7.5</slf4j.version>
<spark3.version>3.3.2</spark3.version>
<spark-bq-connector.version>0.23.1</spark-bq-connector.version>
<failsafe.version>3.3.2</failsafe.version>
<testSourceLocation>${project.basedir}/src/test/java/</testSourceLocation>
</properties>

@@ -840,6 +841,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>dev.failsafe</groupId>
<artifactId>failsafe</artifactId>
<version>${failsafe.version}</version>
</dependency>
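
For context on the new dependency: Failsafe provides retry and timeout policies for flaky operations, and the entry above sits in the Spark-BigQuery connector dependency block. A minimal sketch of its 3.x retry API follows; the usage is illustrative only, since this PR adds the dependency but contains no Failsafe code itself.

```java
import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;

import java.time.Duration;

public class RetrySketch {
  public static void main(String[] args) {
    RetryPolicy<Object> retryPolicy = RetryPolicy.builder()
        .handle(RuntimeException.class)   // retry only on these failures
        .withDelay(Duration.ofSeconds(1)) // fixed delay between attempts
        .withMaxRetries(3)
        .build();

    // Runs the task, retrying up to 3 times before rethrowing the failure.
    Failsafe.with(retryPolicy).run(() -> System.out.println("attempt"));
  }
}
```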
<!-- End: dependencies used by the Spark-BigQuery connector -->
<!-- Start: dependency used by the Dataplex connector -->
<dependency>
@@ -0,0 +1,166 @@
# Copyright © 2023 Cask Data, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.

@PubSub_Source
Feature: PubSub Source - Verification of PubSub to PubSub successful data transfer in different formats.

@PUBSUB_SOURCE_TEST @PUBSUB_SINK_TEST @PUBSUB_SUBSCRIPTION_TEST @PUBSUB_MESSAGE_TEST
Scenario: Validate successful transfer of records from PubSub(source) to PubSub(sink).
Given Open Datafusion Project to configure pipeline
When Select data pipeline type as: "Realtime"
When Expand Plugin group in the LHS plugins list: "Source"
When Select plugin: "Pub/Sub" from the plugins list as: "Source"
When Expand Plugin group in the LHS plugins list: "Sink"
When Select plugin: "Pub/Sub" from the plugins list as: "Sink"
Then Connect plugins: "Pub/Sub" and "Pub/Sub2" to establish connection
Then Navigate to the properties page of plugin: "Pub/Sub"
Then Override Service account details if set in environment variables
Then Enter input plugin property: "referenceName" with value: "BQReferenceName"
Then Enter PubSub source property subscription name
Then Enter PubSub source property topic name
Then Validate "Pub/Sub" plugin properties
And Close the Plugin Properties page
Then Open the PubSub sink properties
Then Enter PubSub property projectId "projectId"
Then Override Service account details if set in environment variables
Then Enter PubSub property reference name
Then Enter PubSub sink property topic name
Then Validate "Pub/Sub" plugin properties
And Close the Plugin Properties page
And Click on configure button
And Click on pipeline config
And Click on batch time and select format
Then Save the pipeline
Then Deploy the pipeline
Then Run the Pipeline in Runtime
Then Wait for pipeline to be in status: "Running" with a timeout of 240 seconds
Then Publish the messages
Then Subscribe to the messages
Then Validate OUT record count is equal to IN record count
And Stop the pipeline
Then Verify the pipeline status is "Stopped"

@PUBSUB_SOURCE_TEST @PUBSUB_SINK_TEST @PUBSUB_SUBSCRIPTION_TEST @PUBSUB_MESSAGE_TEST
Scenario: Validate successful transfer of records from PubSub(source) to PubSub(sink) using macros.
Given Open Datafusion Project to configure pipeline
When Select data pipeline type as: "Realtime"
When Expand Plugin group in the LHS plugins list: "Source"
When Select plugin: "Pub/Sub" from the plugins list as: "Source"
When Expand Plugin group in the LHS plugins list: "Sink"
When Select plugin: "Pub/Sub" from the plugins list as: "Sink"
Then Connect plugins: "Pub/Sub" and "Pub/Sub2" to establish connection
Then Navigate to the properties page of plugin: "Pub/Sub"
Then Override Service account details if set in environment variables
Then Enter input plugin property: "referenceName" with value: "BQReferenceName"
Then Click on the Macro button of Property: "topic" and set the value to: "PubSubSourceTopic"
Then Click on the Macro button of Property: "subscription" and set the value to: "PubSubSourceSubscription"
Then Validate "Pub/Sub" plugin properties
And Close the Plugin Properties page
Then Open the PubSub sink properties
Then Enter PubSub property projectId "projectId"
Then Override Service account details if set in environment variables
Then Enter PubSub property reference name
Then Enter PubSub sink property topic name
Then Validate "Pub/Sub" plugin properties
And Close the Plugin Properties page
And Click on configure button
And Click on pipeline config
And Click on batch time and select format
Then Save the pipeline
Then Deploy the pipeline
Then Enter runtime argument value for PubSub source property topic key "PubSubSourceTopic"
Then Enter runtime argument value for PubSub source property subscription key "PubSubSourceSubscription"
Then Run the Pipeline in Runtime with runtime arguments
Then Wait for pipeline to be in status: "Running" with a timeout of 240 seconds
Then Publish the messages
Then Subscribe to the messages
Then Validate OUT record count is equal to IN record count
And Stop the pipeline
Then Verify the pipeline status is "Stopped"

@PUBSUB_SOURCE_TEST @PUBSUB_SINK_TEST @PUBSUB_SUBSCRIPTION_TEST @PUBSUB_MESSAGE_TEST
Scenario: Validate successful transfer of records from PubSub(source) to PubSub(sink) having formats TEXT at both source and sink.
Given Open Datafusion Project to configure pipeline
When Select data pipeline type as: "Realtime"
When Expand Plugin group in the LHS plugins list: "Source"
When Select plugin: "Pub/Sub" from the plugins list as: "Source"
When Expand Plugin group in the LHS plugins list: "Sink"
When Select plugin: "Pub/Sub" from the plugins list as: "Sink"
Then Connect plugins: "Pub/Sub" and "Pub/Sub2" to establish connection
Then Navigate to the properties page of plugin: "Pub/Sub"
Then Override Service account details if set in environment variables
Then Enter input plugin property: "referenceName" with value: "BQReferenceName"
Then Enter PubSub source property subscription name
Then Enter PubSub source property topic name
Then Select dropdown plugin property: "select-format" with option value: "text"
Then Validate "Pub/Sub" plugin properties
And Close the Plugin Properties page
Then Open the PubSub sink properties
Then Enter PubSub property projectId "projectId"
Then Override Service account details if set in environment variables
Then Enter PubSub property reference name
Then Enter PubSub sink property topic name
Then Select dropdown plugin property: "select-format" with option value: "text"
Then Validate "Pub/Sub" plugin properties
And Close the Plugin Properties page
And Click on configure button
And Click on pipeline config
And Click on batch time and select format
Then Save the pipeline
Then Deploy the pipeline
Then Run the Pipeline in Runtime
Then Wait for pipeline to be in status: "Running" with a timeout of 240 seconds
Then Publish the messages
Then Subscribe to the messages
Then Validate OUT record count is equal to IN record count
And Stop the pipeline
Then Verify the pipeline status is "Stopped"

@PUBSUB_SOURCE_TEST @PUBSUB_SINK_TEST @PUBSUB_SUBSCRIPTION_TEST @PUBSUB_MESSAGE_TEST
Scenario: Validate successful transfer of records from PubSub(source) to PubSub(sink) having formats TEXT at source and JSON at sink.
Given Open Datafusion Project to configure pipeline
When Select data pipeline type as: "Realtime"
When Expand Plugin group in the LHS plugins list: "Source"
When Select plugin: "Pub/Sub" from the plugins list as: "Source"
When Expand Plugin group in the LHS plugins list: "Sink"
When Select plugin: "Pub/Sub" from the plugins list as: "Sink"
Then Connect plugins: "Pub/Sub" and "Pub/Sub2" to establish connection
Then Navigate to the properties page of plugin: "Pub/Sub"
Then Override Service account details if set in environment variables
Then Enter input plugin property: "referenceName" with value: "BQReferenceName"
Then Enter PubSub source property subscription name
Then Enter PubSub source property topic name
Then Select dropdown plugin property: "select-format" with option value: "text"
Then Validate "Pub/Sub" plugin properties
And Close the Plugin Properties page
Then Open the PubSub sink properties
Then Enter PubSub property projectId "projectId"
Then Override Service account details if set in environment variables
Then Enter PubSub property reference name
Then Enter PubSub sink property topic name
Then Select dropdown plugin property: "select-format" with option value: "json"
Then Validate "Pub/Sub" plugin properties
And Close the Plugin Properties page
And Click on configure button
And Click on pipeline config
And Click on batch time and select format
Then Save the pipeline
Then Deploy the pipeline
Then Run the Pipeline in Runtime
Then Wait for pipeline to be in status: "Running" with a timeout of 240 seconds
Then Publish the messages
Then Subscribe to the messages
Then Validate OUT record count is equal to IN record count
And Stop the pipeline
Then Verify the pipeline status is "Stopped"
36 changes: 36 additions & 0 deletions src/e2e-test/features/pubsub/sink/BQToPubSub.feature
@@ -40,3 +40,39 @@ Feature: PubSub-Sink - Verification of BigQuery to PubSub successful data transfer
Then Validate OUT record count is equal to IN record count
Then Open and capture logs
Then Validate the cmek key "cmekPubSub" of target PubSub topic if cmek is enabled

@PUBSUB_SOURCE_TEST @PUBSUB_SINK_TEST @PUBSUB_SUBSCRIPTION_TEST
Scenario: Validate successful transfer of records from PubSub(source) to PubSub(sink).
Given Open Datafusion Project to configure pipeline
When Select data pipeline type as: "Realtime"
When Expand Plugin group in the LHS plugins list: "Source"
When Select plugin: "Pub/Sub" from the plugins list as: "Source"
When Expand Plugin group in the LHS plugins list: "Sink"
When Select plugin: "Pub/Sub" from the plugins list as: "Sink"
Then Connect plugins: "Pub/Sub" and "Pub/Sub2" to establish connection
Then Navigate to the properties page of plugin: "Pub/Sub"
Then Override Service account details if set in environment variables
Then Enter input plugin property: "referenceName" with value: "BQReferenceName"
Then Enter PubSub source property subscription name
Then Enter PubSub source property topic name
Then Validate "Pub/Sub" plugin properties
And Close the Plugin Properties page
Then Open the PubSub sink properties
Then Enter PubSub property projectId "projectId"
Then Override Service account details if set in environment variables
Then Enter PubSub property reference name
Then Enter PubSub sink property topic name
Then Validate "Pub/Sub" plugin properties
And Close the Plugin Properties page
And Click on configure button
And Click on pipeline config
And Click on batch time and select format
Then Save the pipeline
Then Deploy the pipeline
Then Run the Pipeline in Runtime
Then Wait for pipeline to be in status: "Running" with a timeout of 240 seconds
Then Publish the messages
Then Subscribe to the messages
Then Validate OUT record count is equal to IN record count
And Stop the pipeline
Then Verify the pipeline status is "Stopped"
@@ -21,6 +21,7 @@
import io.cdap.e2e.pages.actions.CdfConnectionActions;
import io.cdap.e2e.pages.actions.CdfPluginPropertiesActions;
import io.cdap.e2e.utils.BigQueryClient;
import io.cdap.e2e.utils.ConstantsUtil;
import io.cdap.e2e.utils.PluginPropertyUtils;
import io.cdap.e2e.utils.StorageClient;
import io.cdap.plugin.utils.PubSubClient;
@@ -62,6 +63,8 @@ public class TestSetupHooks {
public static String bqSourceTable2 = StringUtils.EMPTY;
public static String bqSourceView = StringUtils.EMPTY;
public static String pubSubTargetTopic = StringUtils.EMPTY;
public static String pubSubSourceTopic = StringUtils.EMPTY;
public static String pubSubSourceSubscription = StringUtils.EMPTY;
public static String spannerInstance = StringUtils.EMPTY;
public static String spannerDatabase = StringUtils.EMPTY;
public static String spannerSourceTable = StringUtils.EMPTY;
@@ -480,6 +483,41 @@ private static String createGCSBucketWithFilesAndFolder(String folderPath) throws
return bucketName;
}

@Before(order = 1, value = "@PUBSUB_SOURCE_TEST")
public static void createSourcePubSubTopic() throws IOException {
pubSubSourceTopic = "cdf-e2e-test-" + UUID.randomUUID();
PubSubClient.createTopic(pubSubSourceTopic);
BeforeActions.scenario.write("Source PubSub topic " + pubSubSourceTopic);
}

@Before(order = 1, value = "@PUBSUB_SUBSCRIPTION_TEST")
public static void createSubscriptionPubSubTopic() throws IOException {
pubSubSourceSubscription = "cdf-e2e-test-" + UUID.randomUUID();
PubSubClient.createSubscription(pubSubSourceSubscription, pubSubSourceTopic);
BeforeActions.scenario.write("Source PubSub subscription " + pubSubSourceSubscription);
}
@After(order = 1, value = "@PUBSUB_SOURCE_TESTt")
public static void deleteSourcePubSubTopic() {
try {
PubSubClient.deleteTopic(pubSubSourceTopic);
BeforeActions.scenario.write("Deleted target PubSub topic " + pubSubSourceTopic);
pubSubSourceTopic = StringUtils.EMPTY;
} catch (Exception e) {
if (e.getMessage().contains("Invalid resource name given") || e.getMessage().contains("Resource not found")) {
// Topic was already deleted or never created - nothing to clean up.
} else {
throw new RuntimeException(e);
}
}
}

public static void publishMessage() throws IOException, InterruptedException {
String dataMessage1 = PluginPropertyUtils.pluginProp("firstMessage");
String dataMessage2 = PluginPropertyUtils.pluginProp("secondMessage");
List<String> dataMessagesList = Arrays.asList(dataMessage1, dataMessage2);
PubSubClient.publishWithErrorHandlerExample(PluginPropertyUtils.pluginProp(ConstantsUtil.PROJECT_ID),
pubSubSourceTopic, dataMessagesList);
}
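
Editor's note: `PubSubClient` is not part of this diff. For orientation, a minimal sketch of what the helpers called above (`createTopic`, `createSubscription`, and `publishWithErrorHandlerExample`) typically look like with the Google Cloud Pub/Sub Java client. Method names match the call sites above; the bodies and the project-resolution detail are assumptions, not code from this repository.

```java
import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.TopicName;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class PubSubClientSketch {
  private static final String PROJECT = "my-project"; // assumption: really read from plugin properties

  public static void createTopic(String topicId) throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      topicAdminClient.createTopic(TopicName.of(PROJECT, topicId));
    }
  }

  public static void createSubscription(String subscriptionId, String topicId) throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
      subscriptionAdminClient.createSubscription(
          ProjectSubscriptionName.of(PROJECT, subscriptionId),
          TopicName.of(PROJECT, topicId),
          PushConfig.getDefaultInstance(), // default push config, i.e. a pull subscription
          10);                             // ack deadline in seconds
    }
  }

  public static void publishWithErrorHandlerExample(String projectId, String topicId,
                                                    List<String> messages)
      throws IOException, InterruptedException {
    Publisher publisher = Publisher.newBuilder(TopicName.of(projectId, topicId)).build();
    try {
      for (String message : messages) {
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder()
            .setData(ByteString.copyFromUtf8(message))
            .build();
        ApiFuture<String> future = publisher.publish(pubsubMessage);
        // The real helper would attach an ApiFutureCallback here to log publish failures.
      }
    } finally {
      publisher.shutdown();
      publisher.awaitTermination(1, TimeUnit.MINUTES);
    }
  }
}
```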


@Before(order = 1, value = "@PUBSUB_SINK_TEST")
public static void createTargetPubSubTopic() {
pubSubTargetTopic = "cdf-e2e-test-" + UUID.randomUUID();
Expand Down Expand Up @@ -1141,7 +1179,7 @@ public static void createSinkBQDedupeTable() throws IOException, InterruptedExce

@Before(value = "@BQ_INSERT_INT_SOURCE_TEST")
public static void createSourceBQTable() throws IOException, InterruptedException {
bqSourceTable = "E2E_TARGET_" + UUID.randomUUID().toString().replaceAll("-", "_");
bqSourceTable = "E2E_SOURCE_" + UUID.randomUUID().toString().replaceAll("-", "_");
PluginPropertyUtils.addPluginProp("bqSourceTable", bqSourceTable);
BeforeActions.scenario.write("BQ source table name - " + bqSourceTable);
BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqSourceTable + "` " +
@@ -74,4 +74,19 @@ public static WebElement selectedFormat(String format) {
return SeleniumDriver.getDriver()
.findElement(By.xpath("//*[@data-cy='select-format']/div[text()='" + format + "']"));
}

@FindBy(how = How.XPATH, using = "//span[contains(text(), \"Batch interval\")]//following-sibling::div//select[1]")
public static WebElement batchTime;

@FindBy(how = How.XPATH, using = "//span[contains(text(), \"Batch interval\")]//following-sibling::div//select[2]")
public static WebElement timeSelect;

@FindBy(how = How.XPATH, using = "//button[@data-testid='config-apply-close']")
public static WebElement saveButton;

@FindBy(how = How.XPATH, using = "//*[@data-cy='pipeline-configure-modeless-btn']")
public static WebElement configButton;

@FindBy(how = How.XPATH, using = "//*[@data-cy='tab-content-Pipeline config']")
public static WebElement pipelineConfig;
}
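
For context, a short sketch of how these new locators might be driven from an actions/steps layer. The page class name `PubSubPipelinePage` is hypothetical (the class name is not visible in this hunk), and the sketch assumes `PageFactory.initElements` has already populated the `@FindBy` fields.

```java
import org.openqa.selenium.WebDriver;
import org.openqa.selenium.support.ui.ExpectedConditions;
import org.openqa.selenium.support.ui.Select;
import org.openqa.selenium.support.ui.WebDriverWait;

import java.time.Duration;

public class PubSubPipelineActions {
  public static void configureBatchInterval(WebDriver driver, String interval, String unit) {
    WebDriverWait wait = new WebDriverWait(driver, Duration.ofSeconds(30));

    // Open the pipeline configuration modal and switch to the "Pipeline config" tab.
    wait.until(ExpectedConditions.elementToBeClickable(PubSubPipelinePage.configButton)).click();
    wait.until(ExpectedConditions.elementToBeClickable(PubSubPipelinePage.pipelineConfig)).click();

    // Batch interval is rendered as two native <select> elements: value and unit.
    new Select(PubSubPipelinePage.batchTime).selectByVisibleText(interval);
    new Select(PubSubPipelinePage.timeSelect).selectByVisibleText(unit);

    // Apply and close the configuration modal.
    wait.until(ExpectedConditions.elementToBeClickable(PubSubPipelinePage.saveButton)).click();
  }
}
```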