Skip to content

Commit

Permalink
fix:repaired tests
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs committed Jul 29, 2024
1 parent 0fed3db commit fe67e8e
Show file tree
Hide file tree
Showing 10 changed files with 51 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,8 @@ StreamObserver<PublishStreamRequest> publishBlockStream(
System.Logger.Level.DEBUG,
"Executing bidirectional publishBlockStream gRPC method");

if (serviceStatus.isRunning()) {
return new ProducerBlockItemObserver(
streamMediator, publishStreamResponseObserver, itemAckBuilder, serviceStatus);
}

return null;
return new ProducerBlockItemObserver(
streamMediator, publishStreamResponseObserver, itemAckBuilder, serviceStatus);
}

void subscribeBlockStream(
Expand Down
4 changes: 2 additions & 2 deletions server/src/main/java/com/hedera/block/server/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ public static void main(final String[] args) {
serviceStatus.setWebServer(webServer);
streamMediator.register(serviceStatus);


// Start the web server
webServer.start();
} catch (IOException e) {
Expand All @@ -87,7 +86,8 @@ public static void main(final String[] args) {
}

private static StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>>
buildStreamMediator(final Config config, final ServiceStatus serviceStatus) throws IOException {
buildStreamMediator(final Config config, final ServiceStatus serviceStatus)
throws IOException {
return new LiveStreamMediatorImpl(
new ConcurrentHashMap<>(32),
new FileSystemPersistenceHandler(
Expand Down
16 changes: 16 additions & 0 deletions server/src/main/java/com/hedera/block/server/ServiceStatus.java
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.block.server;

import io.helidon.webserver.WebServer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.hedera.block.server;

import io.helidon.webserver.WebServer;

import java.util.concurrent.atomic.AtomicBoolean;

public class ServiceStatusImpl implements ServiceStatus {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.hedera.block.server.mediator;

import static com.hedera.block.protos.BlockStreamService.*;

import com.hedera.block.server.ServiceStatus;
import com.hedera.block.server.ServiceStatusImpl;
import com.hedera.block.server.consumer.BlockItemEventHandler;
Expand All @@ -26,15 +28,12 @@
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static com.hedera.block.protos.BlockStreamService.*;

/**
* LiveStreamMediatorImpl is the implementation of the StreamMediator interface. It is responsible
* for managing the subscribe and unsubscribe operations of downstream consumers. It also proxies
Expand Down Expand Up @@ -87,6 +86,12 @@ public LiveStreamMediatorImpl(
this(new ConcurrentHashMap<>(), blockPersistenceHandler, new ServiceStatusImpl());
}

public LiveStreamMediatorImpl(
final BlockPersistenceHandler<BlockItem, Block> blockPersistenceHandler,
final ServiceStatus serviceStatus) {
this(new ConcurrentHashMap<>(), blockPersistenceHandler, serviceStatus);
}

@Override
public void publishEvent(final BlockItem blockItem) throws IOException {

Expand Down Expand Up @@ -130,11 +135,6 @@ public void publishEvent(final BlockItem blockItem) throws IOException {
}
}

@Override
public boolean isPublishing() {
return serviceStatus.isRunning();
}

@Override
public void subscribe(
final BlockItemEventHandler<ObjectEvent<SubscribeStreamResponse>> handler) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import com.hedera.block.server.ServiceStatus;
import com.hedera.block.server.consumer.BlockItemEventHandler;

import java.io.IOException;

/**
Expand All @@ -36,8 +35,6 @@ public interface StreamMediator<U, V> {

void publishEvent(final U blockItem) throws IOException;

boolean isPublishing();

void subscribe(final BlockItemEventHandler<V> handler);

void unsubscribe(final BlockItemEventHandler<V> handler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,16 @@

package com.hedera.block.server.producer;

import static com.hedera.block.protos.BlockStreamService.*;
import static com.hedera.block.protos.BlockStreamService.PublishStreamResponse.*;

import com.hedera.block.server.ServiceStatus;
import com.hedera.block.server.data.ObjectEvent;
import com.hedera.block.server.mediator.StreamMediator;
import io.grpc.stub.StreamObserver;

import java.io.IOException;
import java.security.NoSuchAlgorithmException;

import static com.hedera.block.protos.BlockStreamService.*;
import static com.hedera.block.protos.BlockStreamService.PublishStreamResponse.*;

/**
* The ProducerBlockStreamObserver class plugs into Helidon's server-initiated bidirectional gRPC
* service implementation. Helidon calls methods on this class as networking events occur with the
Expand Down Expand Up @@ -71,7 +70,7 @@ public void onNext(final PublishStreamRequest publishStreamRequest) {
try {
// Publish the block to all the subscribers unless
// there's an issue with the StreamMediator.
if (streamMediator.isPublishing()) {
if (serviceStatus.isRunning()) {

// Publish the block to the mediator
streamMediator.publishEvent(blockItem);
Expand Down Expand Up @@ -106,6 +105,7 @@ public void onNext(final PublishStreamRequest publishStreamRequest) {
}

private static PublishStreamResponse buildErrorStreamResponse() {
// TODO: Replace this with a real error enum.
final EndOfStream endOfStream =
EndOfStream.newBuilder()
.setStatus(PublishStreamResponseCode.STREAM_ITEMS_UNKNOWN)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@
import static com.hedera.block.protos.BlockStreamService.*;
import static com.hedera.block.protos.BlockStreamService.PublishStreamResponse.*;
import static com.hedera.block.server.util.PersistTestUtils.generateBlockItems;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.*;

import com.hedera.block.server.consumer.BlockItemEventHandler;
Expand Down Expand Up @@ -73,6 +72,7 @@ public class BlockStreamServiceIT {
@Mock private StreamObserver<SubscribeStreamResponse> subscribeStreamObserver6;

@Mock private WebServer webServer;
@Mock private ServiceStatus serviceStatus;

@Mock private BlockReader<Block> blockReader;
@Mock private BlockWriter<BlockItem> blockWriter;
Expand Down Expand Up @@ -104,8 +104,7 @@ public void testPublishBlockStreamRegistrationAndExecution()

final BlockStreamService blockStreamService =
new BlockStreamService(50L, new ItemAckBuilder(), streamMediator);

when(streamMediator.isPublishing()).thenReturn(true);
blockStreamService.register(webServer);

final StreamObserver<PublishStreamRequest> streamObserver =
blockStreamService.publishBlockStream(publishStreamResponseObserver);
Expand Down Expand Up @@ -420,9 +419,14 @@ public void testMediatorExceptionHandlingWhenPersistenceFailure()
final List<BlockItem> blockItems = generateBlockItems(1);
final PublishStreamRequest publishStreamRequest =
PublishStreamRequest.newBuilder().setBlockItem(blockItems.getFirst()).build();

streamObserver.onNext(publishStreamRequest);

// Simulate another producer attempting to connect to the Block Node.
// Later, verify they received a response indicating the stream is closed.
final StreamObserver<PublishStreamRequest> expectedNoOpStreamObserver =
blockStreamService.publishBlockStream(publishStreamResponseObserver);
expectedNoOpStreamObserver.onNext(publishStreamRequest);

synchronized (lock) {
lock.wait(50);
}
Expand Down Expand Up @@ -453,7 +457,7 @@ public void testMediatorExceptionHandlingWhenPersistenceFailure()
.build();
final var endOfStreamResponse =
PublishStreamResponse.newBuilder().setStatus(endOfStream).build();
verify(publishStreamResponseObserver, times(1)).onNext(endOfStreamResponse);
verify(publishStreamResponseObserver, times(2)).onNext(endOfStreamResponse);
verify(webServer, times(1)).stop();

// Now verify the block was removed from the file system.
Expand Down Expand Up @@ -529,7 +533,8 @@ private StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> buildStr
final ServiceStatus serviceStatus = new ServiceStatusImpl();
serviceStatus.setWebServer(webServer);
return new LiveStreamMediatorImpl(
subscribers, new FileSystemPersistenceHandler(blockReader, blockWriter),
subscribers,
new FileSystemPersistenceHandler(blockReader, blockWriter),
serviceStatus);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ public void testPartialBlockRemoval() throws IOException {
JUNIT, testConfig, blockRemover, Util.defaultPerms, 23);

// Write all the block items for 2 blocks
for (int i = 0;i < 20;i++) {
for (int i = 0; i < 20; i++) {
blockWriter.write(blockItems.get(i));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.protobuf.ByteString;
import com.hedera.block.protos.BlockStreamService;
import com.hedera.block.server.ServiceStatus;
import com.hedera.block.server.ServiceStatusImpl;
import com.hedera.block.server.consumer.ConsumerBlockItemObserver;
import com.hedera.block.server.data.ObjectEvent;
import com.hedera.block.server.mediator.LiveStreamMediatorImpl;
Expand Down Expand Up @@ -73,7 +74,7 @@ public void testProducerOnNext()
new ItemAckBuilder(),
serviceStatus);

when(streamMediator.isPublishing()).thenReturn(true);
when(serviceStatus.isRunning()).thenReturn(true);

BlockItem blockHeader = blockItems.getFirst();
PublishStreamRequest publishStreamRequest =
Expand Down Expand Up @@ -106,9 +107,10 @@ public void testProducerOnNext()
public void testProducerToManyConsumers() throws IOException, InterruptedException {
final long TIMEOUT_THRESHOLD_MILLIS = 100L;
final long TEST_TIME = 1_719_427_664_950L;
final ServiceStatus serviceStatus = new ServiceStatusImpl();
final var streamMediator =
new LiveStreamMediatorImpl(
new FileSystemPersistenceHandler(blockReader, blockWriter));
new FileSystemPersistenceHandler(blockReader, blockWriter), serviceStatus);

final var concreteObserver1 =
new ConsumerBlockItemObserver(
Expand Down Expand Up @@ -201,7 +203,7 @@ public void testItemAckBuilderExceptionTest()
itemAckBuilder,
serviceStatus);

when(streamMediator.isPublishing()).thenReturn(true);
when(serviceStatus.isRunning()).thenReturn(true);
when(itemAckBuilder.buildAck(any()))
.thenThrow(new NoSuchAlgorithmException("Test exception"));

Expand Down

0 comments on commit fe67e8e

Please sign in to comment.