Skip to content

Commit

Permalink
fixed junit tests
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs committed Jul 17, 2024
1 parent 0ed0c9a commit 0889c17
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ public ConsumerBlockItemObserver(
final StreamObserver<SubscribeStreamResponse> subscribeStreamResponseObserver) {

this.timeoutThresholdMillis = timeoutThresholdMillis;
this.producerLivenessClock = producerLivenessClock;
this.streamMediator = streamMediator;

// The ServerCallStreamObserver can be configured with a Runnable to
Expand All @@ -80,45 +79,31 @@ public ConsumerBlockItemObserver(
}

this.subscribeStreamResponseObserver = subscribeStreamResponseObserver;
this.producerLivenessClock = producerLivenessClock;
this.producerLivenessMillis = producerLivenessClock.millis();
}

/** Pass the block to the observer provided by Helidon */
@Override
public void onEvent(final ObjectEvent<BlockItem> event, final long l, final boolean b) {

if (producerLivenessClock.millis() - producerLivenessMillis > timeoutThresholdMillis) {

// if (isThresholdExceeded(producerLivenessMillis)) {
final long currentMillis = producerLivenessClock.millis();
if (currentMillis - producerLivenessMillis > timeoutThresholdMillis) {
streamMediator.unsubscribe(this);
LOGGER.log(System.Logger.Level.DEBUG, "Unsubscribed handler");
} else {

// Refresh the producer liveness and pass the block to the observer.
producerLivenessMillis = producerLivenessClock.millis();
// Refresh the producer liveness and pass the BlockItem to the downstream observer.
producerLivenessMillis = currentMillis;

final BlockItem blockItem = event.get();
// Construct a response
final SubscribeStreamResponse subscribeStreamResponse =
SubscribeStreamResponse.newBuilder().setBlockItem(blockItem).build();
SubscribeStreamResponse.newBuilder().setBlockItem(event.get()).build();

subscribeStreamResponseObserver.onNext(subscribeStreamResponse);
}
}

// private boolean isThresholdExceeded(long livenessMillis) {
// final long currentTimeMillis = Clock.systemDefaultZone().millis();
// final long elapsedMillis = currentTimeMillis - livenessMillis;
// if (elapsedMillis > timeoutThresholdMillis) {
// LOGGER.log(
// System.Logger.Level.INFO,
// "Elapsed milliseconds: "
// + elapsedMillis
// + ", timeout threshold: "
// + timeoutThresholdMillis);
// return true;
// }
//
// return false;
// }

@Override
public void awaitShutdown() throws InterruptedException {
shutdownLatch.await();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,20 @@

import static com.hedera.block.protos.BlockStreamService.BlockItem;
import static com.hedera.block.protos.BlockStreamService.SubscribeStreamResponse;
import static com.hedera.block.server.util.TestClock.buildClockInsideWindow;
import static com.hedera.block.server.util.TestClock.buildClockOutsideWindow;
import static org.mockito.Mockito.*;

import com.hedera.block.server.data.ObjectEvent;
import com.hedera.block.server.mediator.StreamMediator;
import io.grpc.stub.StreamObserver;
import java.time.Instant;
import java.time.InstantSource;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(MockitoExtension.class)
public class LiveStreamObserverImplTest {
public class ConsumerBlockItemObserverTest {

private final long TIMEOUT_THRESHOLD_MILLIS = 50L;
private final long TEST_TIME = 1_719_427_664_950L;
Expand All @@ -44,8 +43,7 @@ public class LiveStreamObserverImplTest {
@Mock private ObjectEvent<BlockItem> objectEvent;

@Test
@Disabled
public void testConsumerTimeoutWithinWindow() {
public void testProducerTimeoutWithinWindow() {
final var consumerBlockItemObserver =
new ConsumerBlockItemObserver(
TIMEOUT_THRESHOLD_MILLIS,
Expand All @@ -61,37 +59,8 @@ public void testConsumerTimeoutWithinWindow() {

consumerBlockItemObserver.onEvent(objectEvent, 0, true);

// verify the observer is called with the next
// block and the stream mediator is not unsubscribed
// verify the observer is called with the next BlockItem
verify(responseStreamObserver).onNext(subscribeStreamResponse);
verify(streamMediator, never()).unsubscribe(consumerBlockItemObserver);
}

@Test
public void testConsumerTimeoutOutsideWindow() {

final var consumerBlockItemObserver =
new ConsumerBlockItemObserver(
TIMEOUT_THRESHOLD_MILLIS,
buildClockOutsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS),
streamMediator,
responseStreamObserver);

consumerBlockItemObserver.onEvent(objectEvent, 1, true);
verify(streamMediator).unsubscribe(consumerBlockItemObserver);
}

@Test
@Disabled
public void testProducerTimeoutWithinWindow() {
final var consumerBlockItemObserver =
new ConsumerBlockItemObserver(
TIMEOUT_THRESHOLD_MILLIS,
buildClockOutsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS),
streamMediator,
responseStreamObserver);

consumerBlockItemObserver.onEvent(objectEvent, 0, true);

// verify the mediator is NOT called to unsubscribe the observer
verify(streamMediator, never()).unsubscribe(consumerBlockItemObserver);
Expand All @@ -111,39 +80,4 @@ public void testProducerTimeoutOutsideWindow() throws InterruptedException {
consumerBlockItemObserver.onEvent(objectEvent, 0, true);
verify(streamMediator).unsubscribe(consumerBlockItemObserver);
}

private static InstantSource buildClockInsideWindow(
long testTime, long timeoutThresholdMillis) {
return new TestClock(testTime, testTime + timeoutThresholdMillis - 1);
}

private static InstantSource buildClockOutsideWindow(
long testTime, long timeoutThresholdMillis) {
return new TestClock(testTime, testTime + timeoutThresholdMillis + 1);
}

static class TestClock implements InstantSource {

private int index;
private final Long[] millis;

TestClock(Long... millis) {
this.millis = millis;
}

@Override
public long millis() {
long value = millis[index];

// cycle through the provided millis
// and wrap around if necessary
index = index > millis.length - 1 ? 0 : index + 1;
return value;
}

@Override
public Instant instant() {
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@

package com.hedera.block.server.mediator;

import static com.hedera.block.protos.BlockStreamService.Block;
import static com.hedera.block.protos.BlockStreamService.BlockItem;
import static com.hedera.block.protos.BlockStreamService.SubscribeStreamResponse;
import static com.hedera.block.protos.BlockStreamService.*;
import static com.hedera.block.server.util.TestClock.buildClockInsideWindow;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.times;
Expand All @@ -30,8 +29,6 @@
import com.hedera.block.server.persistence.WriteThroughCacheHandler;
import com.hedera.block.server.persistence.storage.BlockStorage;
import io.grpc.stub.StreamObserver;
import java.time.Clock;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
Expand All @@ -48,7 +45,9 @@ public class LiveStreamMediatorImplTest {

@Mock private BlockStorage<Block, BlockItem> blockStorage;

@Mock private StreamObserver<SubscribeStreamResponse> streamObserver;
@Mock private StreamObserver<SubscribeStreamResponse> streamObserver1;
@Mock private StreamObserver<SubscribeStreamResponse> streamObserver2;
@Mock private StreamObserver<SubscribeStreamResponse> streamObserver3;

@Test
public void testUnsubscribeEach() {
Expand Down Expand Up @@ -104,24 +103,33 @@ public void testMediatorPersistenceWithoutSubscribers() {
}

@Test
@Disabled
public void testMediatorPublishEventToSubscribers() {
public void testMediatorPublishEventToSubscribers() throws InterruptedException {

final long TEST_TIMEOUT = 10000;
final long TIMEOUT_THRESHOLD_MILLIS = 100L;
final long TEST_TIME = 1_719_427_664_950L;
final var streamMediator =
new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage));

final var concreteObserver1 =
new ConsumerBlockItemObserver(
TEST_TIMEOUT, Clock.systemDefaultZone(), streamMediator, streamObserver);
TIMEOUT_THRESHOLD_MILLIS,
buildClockInsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS),
streamMediator,
streamObserver1);

final var concreteObserver2 =
new ConsumerBlockItemObserver(
TEST_TIMEOUT, Clock.systemDefaultZone(), streamMediator, streamObserver);
TIMEOUT_THRESHOLD_MILLIS,
buildClockInsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS),
streamMediator,
streamObserver2);

final var concreteObserver3 =
new ConsumerBlockItemObserver(
TEST_TIMEOUT, Clock.systemDefaultZone(), streamMediator, streamObserver);
TIMEOUT_THRESHOLD_MILLIS,
buildClockInsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS),
streamMediator,
streamObserver3);

// Set up the subscribers
streamMediator.subscribe(concreteObserver1);
Expand All @@ -139,13 +147,21 @@ public void testMediatorPublishEventToSubscribers() {
"Expected the mediator to have observer3 subscribed");

final BlockItem blockItem = BlockItem.newBuilder().build();
final SubscribeStreamResponse subscribeStreamResponse =
SubscribeStreamResponse.newBuilder().setBlockItem(blockItem).build();

// Acting as a producer, notify the mediator of a new block
streamMediator.publishEvent(blockItem);

// TODO: Is there a better way?
synchronized (streamObserver1) {
streamObserver1.wait(2000);
}

// Confirm each subscriber was notified of the new block
verify(streamObserver, times(3))
.onNext(SubscribeStreamResponse.newBuilder().setBlockItem(blockItem).build());
verify(streamObserver1, times(1)).onNext(subscribeStreamResponse);
verify(streamObserver2, times(1)).onNext(subscribeStreamResponse);
verify(streamObserver3, times(1)).onNext(subscribeStreamResponse);

// Confirm the BlockStorage write method was
// called despite the absence of subscribers
Expand Down
61 changes: 61 additions & 0 deletions server/src/test/java/com/hedera/block/server/util/TestClock.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.block.server.util;

import java.time.Instant;
import java.time.InstantSource;

public class TestClock implements InstantSource {

private int index;
private final Long[] millis;
private final System.Logger LOGGER = System.getLogger(getClass().getName());

public TestClock(Long... millis) {
if (millis.length == 0) {
throw new IllegalArgumentException("At least 1 argument is required");
}

this.millis = millis;
}

@Override
public long millis() {
LOGGER.log(System.Logger.Level.INFO, "millis() called");
long value = millis[index];

// cycle through the provided millis
// and wrap around if necessary
int temp = index + 1;
index = (temp % millis.length == 0) ? 0 : temp;
return value;
}

@Override
public Instant instant() {
return null;
}

public static InstantSource buildClockInsideWindow(long testTime, long timeoutThresholdMillis) {
return new TestClock(testTime, testTime + timeoutThresholdMillis - 1);
}

public static InstantSource buildClockOutsideWindow(
long testTime, long timeoutThresholdMillis) {
return new TestClock(testTime, testTime + timeoutThresholdMillis + 1);
}
}

0 comments on commit 0889c17

Please sign in to comment.