Skip to content

Commit

Permalink
fix: refactored the ConsumerStreamResponseObserver name
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs committed Aug 5, 2024
1 parent 88733a3 commit 2deed16
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import static com.hedera.block.server.Constants.*;

import com.google.protobuf.Descriptors;
import com.hedera.block.server.consumer.ConsumerBlockItemObserver;
import com.hedera.block.server.consumer.ConsumerStreamResponseObserver;
import com.hedera.block.server.data.ObjectEvent;
import com.hedera.block.server.mediator.StreamMediator;
import com.hedera.block.server.persistence.storage.BlockReader;
Expand Down Expand Up @@ -130,7 +130,7 @@ void subscribeBlockStream(
// Return a custom StreamObserver to handle streaming blocks from the producer.
if (serviceStatus.isRunning()) {
final var streamObserver =
new ConsumerBlockItemObserver(
new ConsumerStreamResponseObserver(
timeoutThresholdMillis,
Clock.systemDefaultZone(),
streamMediator,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
* by Helidon). The ConsumerBlockItemObserver implements the EventHandler interface so the Disruptor
* can invoke the onEvent() method when a new SubscribeStreamResponse is available.
*/
public class ConsumerBlockItemObserver
public class ConsumerStreamResponseObserver
implements EventHandler<ObjectEvent<SubscribeStreamResponse>> {

private final System.Logger LOGGER = System.getLogger(getClass().getName());
Expand All @@ -56,7 +56,7 @@ public class ConsumerBlockItemObserver
* SubscribeStreamResponse events from the Disruptor and passing them to the downstream consumer
* via the subscribeStreamResponseObserver.
*/
public ConsumerBlockItemObserver(
public ConsumerStreamResponseObserver(
final long timeoutThresholdMillis,
final InstantSource producerLivenessClock,
final SubscriptionHandler<ObjectEvent<SubscribeStreamResponse>> subscriptionHandler,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(MockitoExtension.class)
public class ConsumerBlockItemObserverTest {
public class ConsumerStreamResponseObserverTest {

private final long TIMEOUT_THRESHOLD_MILLIS = 50L;
private final long TEST_TIME = 1_719_427_664_950L;
Expand All @@ -48,7 +48,7 @@ public void testProducerTimeoutWithinWindow() {
when(testClock.millis()).thenReturn(TEST_TIME, TEST_TIME + TIMEOUT_THRESHOLD_MILLIS);

final var consumerBlockItemObserver =
new ConsumerBlockItemObserver(
new ConsumerStreamResponseObserver(
TIMEOUT_THRESHOLD_MILLIS,
testClock,
streamMediator,
Expand Down Expand Up @@ -78,7 +78,7 @@ public void testProducerTimeoutOutsideWindow() throws InterruptedException {
when(testClock.millis()).thenReturn(TEST_TIME, TEST_TIME + TIMEOUT_THRESHOLD_MILLIS + 1);

final var consumerBlockItemObserver =
new ConsumerBlockItemObserver(
new ConsumerStreamResponseObserver(
TIMEOUT_THRESHOLD_MILLIS,
testClock,
streamMediator,
Expand All @@ -97,7 +97,7 @@ public void testHandlersSetOnObserver() throws InterruptedException {
// millis() calls. Here the second call will always be inside the timeout window.
when(testClock.millis()).thenReturn(TEST_TIME, TEST_TIME + TIMEOUT_THRESHOLD_MILLIS);

new ConsumerBlockItemObserver(
new ConsumerStreamResponseObserver(
TIMEOUT_THRESHOLD_MILLIS, testClock, streamMediator, serverCallStreamObserver);

verify(serverCallStreamObserver, timeout(50)).setOnCloseHandler(any());
Expand All @@ -112,7 +112,7 @@ public void testConsumerNotToSendBeforeBlockHeader() {
when(testClock.millis()).thenReturn(TEST_TIME, TEST_TIME + TIMEOUT_THRESHOLD_MILLIS);

final var consumerBlockItemObserver =
new ConsumerBlockItemObserver(
new ConsumerStreamResponseObserver(
TIMEOUT_THRESHOLD_MILLIS,
testClock,
streamMediator,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.*;

import com.hedera.block.server.consumer.ConsumerBlockItemObserver;
import com.hedera.block.server.consumer.ConsumerStreamResponseObserver;
import com.hedera.block.server.data.ObjectEvent;
import com.hedera.block.server.persistence.storage.BlockReader;
import com.hedera.block.server.persistence.storage.BlockWriter;
Expand Down Expand Up @@ -118,15 +118,15 @@ public void testMediatorPublishEventToSubscribers() throws IOException, Interrup
when(testClock.millis()).thenReturn(TEST_TIME, TEST_TIME + TIMEOUT_THRESHOLD_MILLIS);

final var concreteObserver1 =
new ConsumerBlockItemObserver(
new ConsumerStreamResponseObserver(
TIMEOUT_THRESHOLD_MILLIS, testClock, streamMediator, streamObserver1);

final var concreteObserver2 =
new ConsumerBlockItemObserver(
new ConsumerStreamResponseObserver(
TIMEOUT_THRESHOLD_MILLIS, testClock, streamMediator, streamObserver2);

final var concreteObserver3 =
new ConsumerBlockItemObserver(
new ConsumerStreamResponseObserver(
TIMEOUT_THRESHOLD_MILLIS, testClock, streamMediator, streamObserver3);

// Set up the subscribers
Expand Down Expand Up @@ -169,15 +169,15 @@ public void testSubAndUnsubHandling() {
when(testClock.millis()).thenReturn(TEST_TIME, TEST_TIME + TIMEOUT_THRESHOLD_MILLIS);

final var concreteObserver1 =
new ConsumerBlockItemObserver(
new ConsumerStreamResponseObserver(
TIMEOUT_THRESHOLD_MILLIS, testClock, streamMediator, streamObserver1);

final var concreteObserver2 =
new ConsumerBlockItemObserver(
new ConsumerStreamResponseObserver(
TIMEOUT_THRESHOLD_MILLIS, testClock, streamMediator, streamObserver2);

final var concreteObserver3 =
new ConsumerBlockItemObserver(
new ConsumerStreamResponseObserver(
TIMEOUT_THRESHOLD_MILLIS, testClock, streamMediator, streamObserver3);

// Set up the subscribers
Expand All @@ -198,7 +198,7 @@ public void testOnCancelSubscriptionHandling() throws IOException {
when(testClock.millis()).thenReturn(TEST_TIME, TEST_TIME + TIMEOUT_THRESHOLD_MILLIS);

final var testConsumerBlockItemObserver =
new TestConsumerBlockItemObserver(
new TestConsumerStreamResponseObserver(
TIMEOUT_THRESHOLD_MILLIS,
testClock,
streamMediator,
Expand Down Expand Up @@ -230,7 +230,7 @@ public void testOnCloseSubscriptionHandling() throws IOException {
when(testClock.millis()).thenReturn(TEST_TIME, TEST_TIME + TIMEOUT_THRESHOLD_MILLIS + 1);

final var testConsumerBlockItemObserver =
new TestConsumerBlockItemObserver(
new TestConsumerStreamResponseObserver(
TIMEOUT_THRESHOLD_MILLIS,
testClock,
streamMediator,
Expand Down Expand Up @@ -284,7 +284,7 @@ public void testMediatorBlocksPublishAfterException() throws IOException, Interr
public void testUnsubscribeWhenNotSubscribed() {
final var streamMediator = new LiveStreamMediatorImpl(blockWriter);
final var testConsumerBlockItemObserver =
new TestConsumerBlockItemObserver(
new TestConsumerStreamResponseObserver(
TIMEOUT_THRESHOLD_MILLIS,
testClock,
streamMediator,
Expand All @@ -300,8 +300,8 @@ public void testUnsubscribeWhenNotSubscribed() {
assertFalse(streamMediator.isSubscribed(testConsumerBlockItemObserver));
}

private static class TestConsumerBlockItemObserver extends ConsumerBlockItemObserver {
public TestConsumerBlockItemObserver(
private static class TestConsumerStreamResponseObserver extends ConsumerStreamResponseObserver {
public TestConsumerStreamResponseObserver(
long timeoutThresholdMillis,
final InstantSource producerLivenessClock,
final StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import com.hedera.block.protos.BlockStreamService;
import com.hedera.block.server.ServiceStatus;
import com.hedera.block.server.ServiceStatusImpl;
import com.hedera.block.server.consumer.ConsumerBlockItemObserver;
import com.hedera.block.server.consumer.ConsumerStreamResponseObserver;
import com.hedera.block.server.data.ObjectEvent;
import com.hedera.block.server.mediator.LiveStreamMediatorImpl;
import com.hedera.block.server.mediator.StreamMediator;
Expand Down Expand Up @@ -106,15 +106,15 @@ public void testProducerToManyConsumers() throws IOException, InterruptedExcepti
when(testClock.millis()).thenReturn(TEST_TIME, TEST_TIME + TIMEOUT_THRESHOLD_MILLIS);

final var concreteObserver1 =
new ConsumerBlockItemObserver(
new ConsumerStreamResponseObserver(
TIMEOUT_THRESHOLD_MILLIS, testClock, streamMediator, streamObserver1);

final var concreteObserver2 =
new ConsumerBlockItemObserver(
new ConsumerStreamResponseObserver(
TIMEOUT_THRESHOLD_MILLIS, testClock, streamMediator, streamObserver2);

final var concreteObserver3 =
new ConsumerBlockItemObserver(
new ConsumerStreamResponseObserver(
TIMEOUT_THRESHOLD_MILLIS, testClock, streamMediator, streamObserver3);

// Set up the subscribers
Expand Down

0 comments on commit 2deed16

Please sign in to comment.