Skip to content

Commit

Permalink
WIP ignore stopping
Browse files Browse the repository at this point in the history
  • Loading branch information
mivanac committed Dec 13, 2021
1 parent 7a03e80 commit 57310b2
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,8 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di

protected boolean enforceThreadsConnectSameReceiver;

private boolean shadowRegionCreated = false;

protected AbstractGatewaySender() {
statisticsClock = disabledClock();
}
Expand Down Expand Up @@ -1537,6 +1539,14 @@ public String getExpectedReceiverUniqueId() {
return expectedReceiverUniqueId;
}

public void setShadowRegionCreated() {
this.shadowRegionCreated = true;
}

public boolean isShadowRegionCreated() {
return this.shadowRegionCreated;
}

/**
* Has a reference to a GatewayEventImpl and has a timeout value.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,11 @@ public void addShadowPartitionedRegionForUserPR(PartitionedRegion userPR,
if (prQ == null) {
return;
}

if (!sender.isShadowRegionCreated()) {
sender.setShadowRegionCreated();
}

// TODO This should not be set on the PR but on the GatewaySender
prQ.enableConflation(sender.isBatchConflationEnabled());
if (isAccessor) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public void before() throws Exception {

@After
public void after() {
gfsh.executeAndAssertThat("destroy region --name=parentRegion").statusIsSuccess();
gfsh.executeAndAssertThat(DESTROY + " --if-exists").statusIsSuccess();
gfsh.executeAndAssertThat(DESTROY_PARALLEL + " --if-exists").statusIsSuccess();
exln.remove();
Expand All @@ -118,6 +119,10 @@ public void testCreateSerialGatewaySenderWithDefault() throws Exception {
gfsh.executeAndAssertThat("list gateways").statusIsSuccess()
.containsOutput("sender1");

gfsh.executeAndAssertThat("create region"
+ " --name=parentRegion"
+ " --type=PARTITION"
+ " --gateway-sender-id=sender1").statusIsSuccess();
// verify that server1's event queue has the default value
server1.invoke(() -> {
InternalCache cache = ClusterStartupRule.getCache();
Expand All @@ -142,6 +147,11 @@ public void testCreateSerialGatewaySenderAndAlterBatchSize() throws Exception {
gfsh.executeAndAssertThat("list gateways").statusIsSuccess()
.containsOutput("sender1");

gfsh.executeAndAssertThat("create region"
+ " --name=parentRegion"
+ " --type=PARTITION"
+ " --gateway-sender-id=sender1").statusIsSuccess();

gfsh.executeAndAssertThat(
"alter gateway-sender --id=sender1 --batch-size=200 --alert-threshold=100")
.statusIsSuccess();
Expand Down Expand Up @@ -169,6 +179,11 @@ public void testCreateSerialGatewaySenderAndInvalidAlterBatchSize() throws Excep
gfsh.executeAndAssertThat("list gateways").statusIsSuccess()
.containsOutput("sender1");

gfsh.executeAndAssertThat("create region"
+ " --name=parentRegion"
+ " --type=PARTITION"
+ " --gateway-sender-id=sender1").statusIsSuccess();

gfsh.executeAndAssertThat(
"alter gateway-sender --id=sender1 --batch-size=-10 --alert-threshold=100")
.statusIsError();
Expand Down Expand Up @@ -196,6 +211,11 @@ public void testCreateSerialGatewaySenderAndAlterBatchSizeCheckConfig() throws E
gfsh.executeAndAssertThat("list gateways").statusIsSuccess()
.containsOutput("sender1");

gfsh.executeAndAssertThat("create region"
+ " --name=parentRegion"
+ " --type=PARTITION"
+ " --gateway-sender-id=sender1").statusIsSuccess();

gfsh.executeAndAssertThat(
"alter gateway-sender --id=sender1 --batch-size=200 --alert-threshold=100")
.statusIsSuccess();
Expand Down Expand Up @@ -243,6 +263,11 @@ public void testCreateSerialGatewaySenderAndChangeGroupTransaction() throws Exce
gfsh.executeAndAssertThat("list gateways").statusIsSuccess()
.containsOutput("sender1");

gfsh.executeAndAssertThat("create region"
+ " --name=parentRegion"
+ " --type=PARTITION"
+ " --gateway-sender-id=sender1").statusIsSuccess();

gfsh.executeAndAssertThat("alter gateway-sender --id=sender1 --group-transaction-events=true")
.statusIsError()
.containsOutput("alter-gateway-sender cannot be performed for --group-transaction-events");
Expand All @@ -261,6 +286,11 @@ public void testCreateSerialGatewaySenderAndAlterBatchSizeServerDown() throws Ex
gfsh.executeAndAssertThat("list gateways").statusIsSuccess()
.containsOutput("sender1");

gfsh.executeAndAssertThat("create region"
+ " --name=parentRegion"
+ " --type=PARTITION"
+ " --gateway-sender-id=sender1").statusIsSuccess();

server1.stop(false);

gfsh.executeAndAssertThat(
Expand Down Expand Up @@ -296,6 +326,11 @@ public void testCreateSerialGatewaySenderAndAlterEventFiters() throws Exception
gfsh.executeAndAssertThat("list gateways").statusIsSuccess()
.containsOutput("sender1");

gfsh.executeAndAssertThat("create region"
+ " --name=parentRegion"
+ " --type=PARTITION"
+ " --gateway-sender-id=sender1").statusIsSuccess();

gfsh.executeAndAssertThat(
"alter gateway-sender --id=sender1 --batch-size=200 --alert-threshold=100 --gateway-event-filter="
+ MyGatewayEventFilter.class.getName())
Expand Down Expand Up @@ -365,6 +400,11 @@ public void testCreateSerialGatewaySenderAndAlterEventFitersAndRemove() throws E
gfsh.executeAndAssertThat("list gateways").statusIsSuccess()
.containsOutput("sender1");

gfsh.executeAndAssertThat("create region"
+ " --name=parentRegion"
+ " --type=PARTITION"
+ " --gateway-sender-id=sender1").statusIsSuccess();

gfsh.executeAndAssertThat(
"alter gateway-sender --id=sender1 --batch-size=200 --alert-threshold=100 --gateway-event-filter="
+ MyGatewayEventFilter.class.getName())
Expand Down Expand Up @@ -432,6 +472,11 @@ public void testCreateParallelGatewaySenderAndAlterBatchSize() throws Exception
gfsh.executeAndAssertThat("list gateways").statusIsSuccess()
.containsOutput("sender1P");

gfsh.executeAndAssertThat("create region"
+ " --name=parentRegion"
+ " --type=PARTITION"
+ " --gateway-sender-id=sender1P").statusIsSuccess();

gfsh.executeAndAssertThat(
"alter gateway-sender --id=sender1P --batch-size=200 --alert-threshold=100")
.statusIsSuccess();
Expand Down Expand Up @@ -459,6 +504,11 @@ public void testCreateParallelGatewaySenderAndChangeGroupTransaction() throws Ex
gfsh.executeAndAssertThat("list gateways").statusIsSuccess()
.containsOutput("sender1P");

gfsh.executeAndAssertThat("create region"
+ " --name=parentRegion"
+ " --type=PARTITION"
+ " --gateway-sender-id=sender1P").statusIsSuccess();

gfsh.executeAndAssertThat("alter gateway-sender --id=sender1P --group-transaction-events=true")
.statusIsSuccess();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,12 @@ public void testCreateDestroyParallelGatewaySender() {
"GatewaySender \"ln\" created on \"" + SERVER_4 + "\"",
"GatewaySender \"ln\" created on \"" + SERVER_5 + "\"");

gfsh.executeAndAssertThat("create region"
+ " --name=parentRegion"
+ " --type=PARTITION"
+ " --gateway-sender-id=ln").statusIsSuccess();

gfsh.executeAndAssertThat("destroy region --name=parentRegion").statusIsSuccess();
// destroy gateway sender and verify AEQs cleaned up
gfsh.executeAndAssertThat(DESTROY).statusIsSuccess()
.doesNotContainOutput("Did not complete waiting")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ public void stop() {
if (!this.isRunning()) {
return;
}

if (!isShadowRegionCreated()) {
return;
}

// Stop the dispatcher
stopProcessing();

Expand Down

0 comments on commit 57310b2

Please sign in to comment.