From 57310b20406021236c9bec3ba5f0b34a7533ecda Mon Sep 17 00:00:00 2001 From: mivanac Date: Sat, 11 Dec 2021 10:13:14 +0100 Subject: [PATCH] WIP ignore stopping --- .../cache/wan/AbstractGatewaySender.java | 10 ++++ .../parallel/ParallelGatewaySenderQueue.java | 5 ++ .../AlterGatewaySenderCommandDUnitTest.java | 50 +++++++++++++++++++ ...eDestroyGatewaySenderCommandDUnitTest.java | 6 +++ .../parallel/ParallelGatewaySenderImpl.java | 5 ++ 5 files changed, 76 insertions(+) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java index 4e9f7883d3d8..093f6dc101b6 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java @@ -238,6 +238,8 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di protected boolean enforceThreadsConnectSameReceiver; + private boolean shadowRegionCreated = false; + protected AbstractGatewaySender() { statisticsClock = disabledClock(); } @@ -1537,6 +1539,14 @@ public String getExpectedReceiverUniqueId() { return expectedReceiverUniqueId; } + public void setShadowRegionCreated() { + this.shadowRegionCreated = true; + } + + public boolean isShadowRegionCreated() { + return this.shadowRegionCreated; + } + /** * Has a reference to a GatewayEventImpl and has a timeout value. */ diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java index d19af61754aa..cdd17d618cb9 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java @@ -572,6 +572,11 @@ public void addShadowPartitionedRegionForUserPR(PartitionedRegion userPR, if (prQ == null) { return; } + + if (!sender.isShadowRegionCreated()) { + sender.setShadowRegionCreated(); + } + // TODO This should not be set on the PR but on the GatewaySender prQ.enableConflation(sender.isBatchConflationEnabled()); if (isAccessor) { diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/AlterGatewaySenderCommandDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/AlterGatewaySenderCommandDUnitTest.java index 14cc30bec75a..4f781f025246 100644 --- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/AlterGatewaySenderCommandDUnitTest.java +++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/AlterGatewaySenderCommandDUnitTest.java @@ -97,6 +97,7 @@ public void before() throws Exception { @After public void after() { + gfsh.executeAndAssertThat("destroy region --name=parentRegion").statusIsSuccess(); gfsh.executeAndAssertThat(DESTROY + " --if-exists").statusIsSuccess(); gfsh.executeAndAssertThat(DESTROY_PARALLEL + " --if-exists").statusIsSuccess(); exln.remove(); @@ -118,6 +119,10 @@ public void testCreateSerialGatewaySenderWithDefault() throws Exception { gfsh.executeAndAssertThat("list gateways").statusIsSuccess() .containsOutput("sender1"); + gfsh.executeAndAssertThat("create region" + + " --name=parentRegion" + + " --type=PARTITION" + + " --gateway-sender-id=sender1").statusIsSuccess(); // verify that server1's event queue has the default value server1.invoke(() -> { InternalCache cache = ClusterStartupRule.getCache(); @@ -142,6 +147,11 @@ public void testCreateSerialGatewaySenderAndAlterBatchSize() throws Exception { gfsh.executeAndAssertThat("list gateways").statusIsSuccess() .containsOutput("sender1"); + gfsh.executeAndAssertThat("create region" + + " --name=parentRegion" + + " --type=PARTITION" + + " --gateway-sender-id=sender1").statusIsSuccess(); + gfsh.executeAndAssertThat( "alter gateway-sender --id=sender1 --batch-size=200 --alert-threshold=100") .statusIsSuccess(); @@ -169,6 +179,11 @@ public void testCreateSerialGatewaySenderAndInvalidAlterBatchSize() throws Excep gfsh.executeAndAssertThat("list gateways").statusIsSuccess() .containsOutput("sender1"); + gfsh.executeAndAssertThat("create region" + + " --name=parentRegion" + + " --type=PARTITION" + + " --gateway-sender-id=sender1").statusIsSuccess(); + gfsh.executeAndAssertThat( "alter gateway-sender --id=sender1 --batch-size=-10 --alert-threshold=100") .statusIsError(); @@ -196,6 +211,11 @@ public void testCreateSerialGatewaySenderAndAlterBatchSizeCheckConfig() throws E gfsh.executeAndAssertThat("list gateways").statusIsSuccess() .containsOutput("sender1"); + gfsh.executeAndAssertThat("create region" + + " --name=parentRegion" + + " --type=PARTITION" + + " --gateway-sender-id=sender1").statusIsSuccess(); + gfsh.executeAndAssertThat( "alter gateway-sender --id=sender1 --batch-size=200 --alert-threshold=100") .statusIsSuccess(); @@ -243,6 +263,11 @@ public void testCreateSerialGatewaySenderAndChangeGroupTransaction() throws Exce gfsh.executeAndAssertThat("list gateways").statusIsSuccess() .containsOutput("sender1"); + gfsh.executeAndAssertThat("create region" + + " --name=parentRegion" + + " --type=PARTITION" + + " --gateway-sender-id=sender1").statusIsSuccess(); + gfsh.executeAndAssertThat("alter gateway-sender --id=sender1 --group-transaction-events=true") .statusIsError() .containsOutput("alter-gateway-sender cannot be performed for --group-transaction-events"); @@ -261,6 +286,11 @@ public void testCreateSerialGatewaySenderAndAlterBatchSizeServerDown() throws Ex gfsh.executeAndAssertThat("list gateways").statusIsSuccess() .containsOutput("sender1"); + gfsh.executeAndAssertThat("create region" + + " --name=parentRegion" + + " --type=PARTITION" + + " --gateway-sender-id=sender1").statusIsSuccess(); + server1.stop(false); gfsh.executeAndAssertThat( @@ -296,6 +326,11 @@ public void testCreateSerialGatewaySenderAndAlterEventFiters() throws Exception gfsh.executeAndAssertThat("list gateways").statusIsSuccess() .containsOutput("sender1"); + gfsh.executeAndAssertThat("create region" + + " --name=parentRegion" + + " --type=PARTITION" + + " --gateway-sender-id=sender1").statusIsSuccess(); + gfsh.executeAndAssertThat( "alter gateway-sender --id=sender1 --batch-size=200 --alert-threshold=100 --gateway-event-filter=" + MyGatewayEventFilter.class.getName()) @@ -365,6 +400,11 @@ public void testCreateSerialGatewaySenderAndAlterEventFitersAndRemove() throws E gfsh.executeAndAssertThat("list gateways").statusIsSuccess() .containsOutput("sender1"); + gfsh.executeAndAssertThat("create region" + + " --name=parentRegion" + + " --type=PARTITION" + + " --gateway-sender-id=sender1").statusIsSuccess(); + gfsh.executeAndAssertThat( "alter gateway-sender --id=sender1 --batch-size=200 --alert-threshold=100 --gateway-event-filter=" + MyGatewayEventFilter.class.getName()) @@ -432,6 +472,11 @@ public void testCreateParallelGatewaySenderAndAlterBatchSize() throws Exception gfsh.executeAndAssertThat("list gateways").statusIsSuccess() .containsOutput("sender1P"); + gfsh.executeAndAssertThat("create region" + + " --name=parentRegion" + + " --type=PARTITION" + + " --gateway-sender-id=sender1P").statusIsSuccess(); + gfsh.executeAndAssertThat( "alter gateway-sender --id=sender1P --batch-size=200 --alert-threshold=100") .statusIsSuccess(); @@ -459,6 +504,11 @@ public void testCreateParallelGatewaySenderAndChangeGroupTransaction() throws Ex gfsh.executeAndAssertThat("list gateways").statusIsSuccess() .containsOutput("sender1P"); + gfsh.executeAndAssertThat("create region" + + " --name=parentRegion" + + " --type=PARTITION" + + " --gateway-sender-id=sender1P").statusIsSuccess(); + gfsh.executeAndAssertThat("alter gateway-sender --id=sender1P --group-transaction-events=true") .statusIsSuccess(); diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/CreateDestroyGatewaySenderCommandDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/CreateDestroyGatewaySenderCommandDUnitTest.java index 854329ec507e..fa86b0fbebbc 100644 --- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/CreateDestroyGatewaySenderCommandDUnitTest.java +++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/CreateDestroyGatewaySenderCommandDUnitTest.java @@ -378,6 +378,12 @@ public void testCreateDestroyParallelGatewaySender() { "GatewaySender \"ln\" created on \"" + SERVER_4 + "\"", "GatewaySender \"ln\" created on \"" + SERVER_5 + "\""); + gfsh.executeAndAssertThat("create region" + + " --name=parentRegion" + + " --type=PARTITION" + + " --gateway-sender-id=ln").statusIsSuccess(); + + gfsh.executeAndAssertThat("destroy region --name=parentRegion").statusIsSuccess(); // destroy gateway sender and verify AEQs cleaned up gfsh.executeAndAssertThat(DESTROY).statusIsSuccess() .doesNotContainOutput("Did not complete waiting") diff --git a/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/parallel/ParallelGatewaySenderImpl.java b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/parallel/ParallelGatewaySenderImpl.java index 5d9241afe4c1..459e7bb71472 100644 --- a/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/parallel/ParallelGatewaySenderImpl.java +++ b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/parallel/ParallelGatewaySenderImpl.java @@ -115,6 +115,11 @@ public void stop() { if (!this.isRunning()) { return; } + + if (!isShadowRegionCreated()) { + return; + } + // Stop the dispatcher stopProcessing();