Skip to content

Commit

Permalink
pubsub to pubsub scenarios
Browse files Browse the repository at this point in the history
  • Loading branch information
priyabhatnagar25 committed Jan 9, 2024
1 parent d5c57ec commit 692e693
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
@PubSub_Source
Feature: PubSub Source - Verification of PubSub to PubSub successful data transfer in different formats.

@PUBSUB_SOURCE_TEST @PUBSUB_SINK_TEST @PUBSUB_SUBSCRIPTION_TEST
@PUBSUB_SOURCE_TEST @PUBSUB_SINK_TEST @PUBSUB_SUBSCRIPTION_TEST @PUBSUB_MESSAGE_TEST
Scenario: Validate successful transfer of records from PubSub(source) to PubSub(sink).
Given Open Datafusion Project to configure pipeline
When Select data pipeline type as: "Realtime"
Expand Down Expand Up @@ -51,7 +51,7 @@ Feature: PubSub Source - Verification of PubSub to PubSub successful data transf
And Stop the pipeline
Then Verify the pipeline status is "Stopped"

@PUBSUB_SOURCE_TEST @PUBSUB_SINK_TEST @PUBSUB_SUBSCRIPTION_TEST
@PUBSUB_SOURCE_TEST @PUBSUB_SINK_TEST @PUBSUB_SUBSCRIPTION_TEST @PUBSUB_MESSAGE_TEST
Scenario: Validate successful transfer of records from PubSub(source) to PubSub(sink) using macros.
Given Open Datafusion Project to configure pipeline
When Select data pipeline type as: "Realtime"
Expand All @@ -63,8 +63,8 @@ Feature: PubSub Source - Verification of PubSub to PubSub successful data transf
Then Navigate to the properties page of plugin: "Pub/Sub"
Then Override Service account details if set in environment variables
Then Enter input plugin property: "referenceName" with value: "BQReferenceName"
Then Click on the Macro button of Property: "topic" and set the value in textarea: "PubSubSourceTopic"
Then Click on the Macro button of Property: "subscription" and set the value in textarea: "PubSubSourceSubscription"
Then Click on the Macro button of Property: "topic" and set the value to: "PubSubSourceTopic"
Then Click on the Macro button of Property: "subscription" and set the value to: "PubSubSourceSubscription"
Then Validate "Pub/Sub" plugin properties
And Close the Plugin Properties page
Then Open the PubSub sink properties
Expand All @@ -78,9 +78,9 @@ Feature: PubSub Source - Verification of PubSub to PubSub successful data transf
And Click on pipeline config
And Click on batch time and select format
Then Save the pipeline
Then Deploy the pipeline
Then Enter runtime argument value for PubSub source property topic key "PubSubSourceTopic"
Then Enter runtime argument value for PubSub source property subscription key "PubSubSourceSubscription"
Then Deploy the pipeline
Then Run the Pipeline in Runtime with runtime arguments
Then Wait for pipeline to be in status: "Running" with a timeout of 240 seconds
Then Publish the messages
Expand All @@ -89,7 +89,7 @@ Feature: PubSub Source - Verification of PubSub to PubSub successful data transf
And Stop the pipeline
Then Verify the pipeline status is "Stopped"

@PUBSUB_SOURCE_TEST @PUBSUB_SINK_TEST @PUBSUB_SUBSCRIPTION_TEST
@PUBSUB_SOURCE_TEST @PUBSUB_SINK_TEST @PUBSUB_SUBSCRIPTION_TEST @PUBSUB_MESSAGE_TEST
Scenario: Validate successful transfer of records from PubSub(source) to PubSub(sink) having formats TEXT at both source and sink.
Given Open Datafusion Project to configure pipeline
When Select data pipeline type as: "Realtime"
Expand Down Expand Up @@ -127,8 +127,8 @@ Feature: PubSub Source - Verification of PubSub to PubSub successful data transf
And Stop the pipeline
Then Verify the pipeline status is "Stopped"

@PUBSUB_SOURCE_TEST @PUBSUB_SINK_TEST @PUBSUB_SUBSCRIPTION_TEST
Scenario: Validate successful transfer of records from PubSub(source) to PubSub(sink) having formats TEXT at both source and sink.
@PUBSUB_SOURCE_TEST @PUBSUB_SINK_TEST @PUBSUB_SUBSCRIPTION_TEST @PUBSUB_MESSAGE_TEST
Scenario: Validate successful transfer of records from PubSub(source) to PubSub(sink) having formats TEXT at source and JSON at sink.
Given Open Datafusion Project to configure pipeline
When Select data pipeline type as: "Realtime"
When Expand Plugin group in the LHS plugins list: "Source"
Expand All @@ -149,7 +149,7 @@ Feature: PubSub Source - Verification of PubSub to PubSub successful data transf
Then Override Service account details if set in environment variables
Then Enter PubSub property reference name
Then Enter PubSub sink property topic name
Then Select dropdown plugin property: "select-format" with option value: "text"
Then Select dropdown plugin property: "select-format" with option value: "json"
Then Validate "Pub/Sub" plugin properties
And Close the Plugin Properties page
And Click on configure button
Expand All @@ -163,4 +163,4 @@ Feature: PubSub Source - Verification of PubSub to PubSub successful data transf
Then Subscribe to the messages
Then Validate OUT record count is equal to IN record count
And Stop the pipeline
Then Verify the pipeline status is "Stopped"
Then Verify the pipeline status is "Stopped"
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.cdap.e2e.pages.actions.CdfConnectionActions;
import io.cdap.e2e.pages.actions.CdfPluginPropertiesActions;
import io.cdap.e2e.utils.BigQueryClient;
import io.cdap.e2e.utils.ConstantsUtil;
import io.cdap.e2e.utils.PluginPropertyUtils;
import io.cdap.e2e.utils.StorageClient;
import io.cdap.plugin.utils.PubSubClient;
Expand Down Expand Up @@ -492,7 +493,7 @@ public static void createSourcePubSubTopic() throws IOException {
@Before(order = 1, value = "@PUBSUB_SUBSCRIPTION_TEST")
public static void createSubscriptionPubSubTopic() throws IOException {
pubSubSourceSubscription = "cdf-e2e-test-" + UUID.randomUUID();
PubSubClient.createSubscription(pubSubSourceSubscription,pubSubSourceTopic);
PubSubClient.createSubscription(pubSubSourceSubscription , pubSubSourceTopic);
BeforeActions.scenario.write("Source PubSub subscription " + pubSubSourceSubscription);
}
@After(order = 1, value = "@PUBSUB_SOURCE_TESTt")
Expand All @@ -508,6 +509,15 @@ public static void deleteSourcePubSubTopic() {
}
}

public static void publishMessage() throws IOException, InterruptedException {
String dataMessage1 = PluginPropertyUtils.pluginProp("firstMessage");
String dataMessage2 = PluginPropertyUtils.pluginProp("secondMessage");
List<String> dataMessagesList = Arrays.asList(dataMessage1, dataMessage2);
PubSubClient.publishWithErrorHandlerExample(PluginPropertyUtils.pluginProp(ConstantsUtil.PROJECT_ID),
pubSubSourceTopic, dataMessagesList);
}


@Before(order = 1, value = "@PUBSUB_SINK_TEST")
public static void createTargetPubSubTopic() {
pubSubTargetTopic = "cdf-e2e-test-" + UUID.randomUUID();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ public void enterPubSubSourcePropertySubscriptionName() {
public void publishTheMessage() throws IOException, InterruptedException {
TimeUnit time = TimeUnit.SECONDS;
time.sleep(120);
PubSubClient.publishWithErrorHandlerExample(PluginPropertyUtils.pluginProp(ConstantsUtil.PROJECT_ID),
TestSetupHooks.pubSubSourceTopic);
TestSetupHooks.publishMessage();
}

@Then("Enter runtime argument value for PubSub source property topic key {string}")
Expand All @@ -68,6 +67,7 @@ public void enterRuntimeArgumentValueForPubSubSourcePropertyTopicKey(String runt

@Then("Enter runtime argument value for PubSub source property subscription key {string}")
public void enterRuntimeArgumentValueForPubSubSourcePropertySubscriptionKey(String runtimeArgumentKey) {
ElementHelper.sendKeys(CdfStudioLocators.runtimeArgsValue(runtimeArgumentKey), TestSetupHooks.pubSubSourceSubscription);
ElementHelper.sendKeys(CdfStudioLocators.runtimeArgsValue(runtimeArgumentKey),
TestSetupHooks.pubSubSourceSubscription);
}
}
10 changes: 5 additions & 5 deletions src/e2e-test/java/io/cdap/plugin/utils/PubSubClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,15 @@ public static String getTopicCmekKey(String topicId) throws IOException {
}


public static void publishWithErrorHandlerExample(String projectId, String topicId)
public static void publishWithErrorHandlerExample(String projectId, String topicId, List<String> dataMessages)
throws IOException, InterruptedException {
TopicName topicName = TopicName.of(PluginPropertyUtils.pluginProp(ConstantsUtil.PROJECT_ID), topicId);
Publisher publisher = null;
try {
publisher = Publisher.newBuilder(topicName).build();

List<String> messages = Arrays.asList("first message", "second message");
for (final String message : messages) {
// List<String> messages = Arrays.asList("first message", "second message");
for (final String message : dataMessages) {
ByteString data = ByteString.copyFromUtf8(message);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
ApiFuture<String> future = publisher.publish(pubsubMessage);
Expand Down Expand Up @@ -158,10 +158,10 @@ public static void subscribeAsyncExample(String projectId, String subscriptionId
subscriber.startAsync().awaitRunning();
System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
// Allow the subscriber to run for 30s unless an unrecoverable error occurs.
subscriber.awaitTerminated(300, TimeUnit.SECONDS);
subscriber.awaitTerminated(200, TimeUnit.SECONDS);
} catch (TimeoutException timeoutException) {
// Shut down the subscriber after 30s. Stop receiving messages.
subscriber.stopAsync();
}
}
}
}
6 changes: 4 additions & 2 deletions src/e2e-test/resources/pluginParameters.properties
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,6 @@ bqUpdateTableSchemaTrue=True
clusterValue=transaction_date
TableKey=PersonID
bqSourceTable=dummy
#pubSubSourceSubscription=dummy
#pubSubSourceTopic=dummy
bqCreateTableQueryFile=testdata/BigQuery/BigQueryCreateTableQuery.txt
bqInsertDataQueryFile=testdata/BigQuery/BigQueryInsertDataQuery.txt
bqCreateViewQueryFile=testdata/BigQuery/BigQueryCreateViewQuery.txt
Expand Down Expand Up @@ -245,6 +243,10 @@ pubSubErrorThreshold=0
pubSubStringValue=one
pubSubNegativeValue=-100
pubsubDelimiter=@
pubSubSourceSubscription=dummy
pubSubSourceTopic=dummy
firstMessage=hellomessage
secondMessage=himessage
## PUBSUBSINK-PLUGIN-PROPERTIES-END

## GCSDELETE-PLUGIN-PROPERTIES-START
Expand Down

0 comments on commit 692e693

Please sign in to comment.