Skip to content

Commit

Permalink
fix: removed annotations for local variable assignment
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs committed Aug 22, 2024
1 parent 5d366db commit 121f52c
Show file tree
Hide file tree
Showing 10 changed files with 34 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,10 @@ void protocSubscribeBlockStream(
subscribeStreamResponseObserver) {
LOGGER.log(
System.Logger.Level.DEBUG,
"Executing Server Streaming subscribeBlockStream gRPC Service");
"Executing Server Streaming subscribeBlockStream gRPC method");

// Return a custom StreamObserver to handle streaming blocks from the producer.
if (serviceStatus.isRunning()) {
@NonNull
final var streamObserver =
new ConsumerStreamResponseObserver(
blockNodeContext,
Expand All @@ -176,7 +175,6 @@ void protocSingleBlock(
LOGGER.log(System.Logger.Level.DEBUG, "Executing Unary singleBlock gRPC method");

try {
@NonNull
final SingleBlockRequest pbjSingleBlockRequest =
toPbjSingleBlockRequest(singleBlockRequest);
singleBlock(pbjSingleBlockRequest, singleBlockResponseStreamObserver);
Expand All @@ -200,7 +198,7 @@ private void singleBlock(
if (serviceStatus.isRunning()) {
final long blockNumber = singleBlockRequest.blockNumber();
try {
@NonNull final Optional<Block> blockOpt = blockReader.read(blockNumber);
final Optional<Block> blockOpt = blockReader.read(blockNumber);
if (blockOpt.isPresent()) {
LOGGER.log(
System.Logger.Level.DEBUG,
Expand All @@ -209,7 +207,6 @@ private void singleBlock(
singleBlockResponseStreamObserver.onNext(
toProtocSingleBlockResponse(blockOpt.get()));

@NonNull
final MetricsService metricsService = blockNodeContext.metricsService();
metricsService.singleBlocksRetrieved.increment();
} else {
Expand Down Expand Up @@ -238,7 +235,6 @@ private void singleBlock(
@NonNull
static com.hedera.hapi.block.protoc.SubscribeStreamResponse
buildSubscribeStreamNotAvailableResponse() {
@NonNull
final SubscribeStreamResponse response =
SubscribeStreamResponse.newBuilder()
.status(SubscribeStreamResponseCode.READ_STREAM_SUCCESS)
Expand All @@ -249,7 +245,6 @@ private void singleBlock(

@NonNull
static com.hedera.hapi.block.protoc.SingleBlockResponse buildSingleBlockNotAvailableResponse() {
@NonNull
final SingleBlockResponse response =
SingleBlockResponse.newBuilder()
.status(SingleBlockResponseCode.READ_BLOCK_NOT_AVAILABLE)
Expand All @@ -261,7 +256,6 @@ static com.hedera.hapi.block.protoc.SingleBlockResponse buildSingleBlockNotAvail
@NonNull
static com.hedera.hapi.block.protoc.SingleBlockResponse buildSingleBlockNotFoundResponse()
throws InvalidProtocolBufferException {
@NonNull
final SingleBlockResponse response =
SingleBlockResponse.newBuilder()
.status(SingleBlockResponseCode.READ_BLOCK_NOT_FOUND)
Expand All @@ -275,8 +269,6 @@ private static com.hedera.hapi.block.SingleBlockRequest toPbjSingleBlockRequest(
@NonNull final com.hedera.hapi.block.protoc.SingleBlockRequest singleBlockRequest)
throws ParseException {

@NonNull final byte[] protocBytes = singleBlockRequest.toByteArray();
@NonNull final Bytes bytes = Bytes.wrap(protocBytes);
return SingleBlockRequest.PROTOBUF.parse(bytes);
return SingleBlockRequest.PROTOBUF.parse(Bytes.wrap(singleBlockRequest.toByteArray()));
}
}
15 changes: 4 additions & 11 deletions server/src/main/java/com/hedera/block/server/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,46 +55,39 @@ public static void main(final String[] args) {

try {
// init context, metrics, and configuration.
@NonNull final BlockNodeContext blockNodeContext = BlockNodeContextFactory.create();
final BlockNodeContext blockNodeContext = BlockNodeContextFactory.create();
final ServiceStatus serviceStatus = new ServiceStatusImpl();

@NonNull final ServiceStatus serviceStatus = new ServiceStatusImpl();

@NonNull
final BlockWriter<BlockItem> blockWriter =
BlockAsDirWriterBuilder.newBuilder(blockNodeContext).build();
@NonNull

final StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> streamMediator =
LiveStreamMediatorBuilder.newBuilder(
blockWriter, blockNodeContext, serviceStatus)
.build();

@NonNull
final BlockReader<Block> blockReader =
BlockAsDirReaderBuilder.newBuilder(
blockNodeContext
.configuration()
.getConfigData(PersistenceStorageConfig.class))
.build();

@NonNull
final BlockStreamService blockStreamService =
buildBlockStreamService(
streamMediator, blockReader, serviceStatus, blockNodeContext);

@NonNull
final GrpcRouting.Builder grpcRouting =
GrpcRouting.builder().service(blockStreamService);

@NonNull final HealthService healthService = new HealthServiceImpl(serviceStatus);
final HealthService healthService = new HealthServiceImpl(serviceStatus);

@NonNull
final HttpRouting.Builder httpRouting =
HttpRouting.builder()
.register(healthService.getHealthRootPath(), healthService);

// Build the web server
// TODO: make port server a configurable value.
@NonNull
final WebServer webServer =
WebServer.builder()
.port(8080)
Expand Down
6 changes: 0 additions & 6 deletions server/src/main/java/com/hedera/block/server/Translator.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ private Translator() {}
public static com.hedera.hapi.block.protoc.SingleBlockResponse toProtocSingleBlockResponse(
@NonNull final SingleBlockResponse singleBlockResponse) {
try {
@NonNull
final byte[] pbjBytes = asBytes(SingleBlockResponse.PROTOBUF, singleBlockResponse);
return com.hedera.hapi.block.protoc.SingleBlockResponse.parseFrom(pbjBytes);
} catch (InvalidProtocolBufferException e) {
Expand All @@ -77,7 +76,6 @@ public static com.hedera.hapi.block.protoc.SingleBlockResponse toProtocSingleBlo
@NonNull
public static com.hedera.hapi.block.protoc.SingleBlockResponse toProtocSingleBlockResponse(
@NonNull final Block block) {
@NonNull
final SingleBlockResponse singleBlockResponse =
SingleBlockResponse.newBuilder()
.status(SingleBlockResponseCode.READ_BLOCK_SUCCESS)
Expand All @@ -100,7 +98,6 @@ public static com.hedera.hapi.block.protoc.SingleBlockResponse toProtocSingleBlo
public static com.hedera.hapi.block.protoc.PublishStreamResponse toProtocPublishStreamResponse(
@NonNull final com.hedera.hapi.block.PublishStreamResponse publishStreamResponse) {
try {
@NonNull
final byte[] pbjBytes = asBytes(PublishStreamResponse.PROTOBUF, publishStreamResponse);
return com.hedera.hapi.block.protoc.PublishStreamResponse.parseFrom(pbjBytes);
} catch (InvalidProtocolBufferException e) {
Expand All @@ -125,7 +122,6 @@ public static com.hedera.hapi.block.protoc.PublishStreamResponse toProtocPublish
public static com.hedera.hapi.block.protoc.PublishStreamRequest toProtocPublishStreamRequest(
@NonNull final com.hedera.hapi.block.PublishStreamRequest publishStreamRequest) {
try {
@NonNull
final byte[] pbjBytes = asBytes(PublishStreamRequest.PROTOBUF, publishStreamRequest);
return com.hedera.hapi.block.protoc.PublishStreamRequest.parseFrom(pbjBytes);
} catch (InvalidProtocolBufferException e) {
Expand Down Expand Up @@ -153,7 +149,6 @@ public static com.hedera.hapi.block.protoc.PublishStreamRequest toProtocPublishS
final com.hedera.hapi.block.SubscribeStreamResponse
subscribeStreamResponse) {
try {
@NonNull
final byte[] pbjBytes =
asBytes(SubscribeStreamResponse.PROTOBUF, subscribeStreamResponse);
return com.hedera.hapi.block.protoc.SubscribeStreamResponse.parseFrom(pbjBytes);
Expand Down Expand Up @@ -183,7 +178,6 @@ public static com.hedera.hapi.block.protoc.PublishStreamRequest toProtocPublishS
final com.hedera.hapi.block.SubscribeStreamRequest
subscribeStreamRequest) {
try {
@NonNull
final byte[] pbjBytes =
asBytes(SubscribeStreamRequest.PROTOBUF, subscribeStreamRequest);
return com.hedera.hapi.block.protoc.SubscribeStreamRequest.parseFrom(pbjBytes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.hedera.pbj.runtime.OneOf;
import com.lmax.disruptor.EventHandler;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.time.InstantSource;
Expand Down Expand Up @@ -162,8 +161,7 @@ public void onEvent(
// Refresh the producer liveness and pass the BlockItem to the downstream observer.
producerLivenessMillis = currentMillis;

@NonNull final SubscribeStreamResponse subscribeStreamResponse = event.get();
@NonNull
final SubscribeStreamResponse subscribeStreamResponse = event.get();
final ResponseSender responseSender = getResponseSender(subscribeStreamResponse);
responseSender.send(subscribeStreamResponse);
}
Expand Down Expand Up @@ -195,7 +193,7 @@ public void send(@NonNull final SubscribeStreamResponse subscribeStreamResponse)

// Only start sending BlockItems after we've reached
// the beginning of a block.
@Nullable final BlockItem blockItem = subscribeStreamResponse.blockItem();
final BlockItem blockItem = subscribeStreamResponse.blockItem();
if (blockItem == null) {
LOGGER.log(
System.Logger.Level.ERROR,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ class LiveStreamMediatorImpl
this.blockWriter = blockWriter;

// Initialize and start the disruptor
@NonNull
final Disruptor<ObjectEvent<SubscribeStreamResponse>> disruptor =
// TODO: replace ring buffer size with a configurable value, create a MediatorConfig
new Disruptor<>(ObjectEvent::new, 2048, DaemonThreadFactory.INSTANCE);
Expand All @@ -113,7 +112,6 @@ public void publish(@NonNull final BlockItem blockItem) throws IOException {

// Publish the block for all subscribers to receive
LOGGER.log(System.Logger.Level.DEBUG, "Publishing BlockItem: {0}", blockItem);
@NonNull
final var subscribeStreamResponse =
SubscribeStreamResponse.newBuilder().blockItem(blockItem).build();
ringBuffer.publishEvent((event, sequence) -> event.set(subscribeStreamResponse));
Expand All @@ -136,11 +134,11 @@ public void publish(@NonNull final BlockItem blockItem) throws IOException {
LOGGER.log(System.Logger.Level.DEBUG, "Send a response to end the stream");

// Publish the block for all subscribers to receive
@NonNull final SubscribeStreamResponse endStreamResponse = buildEndStreamResponse();
final SubscribeStreamResponse endStreamResponse = buildEndStreamResponse();
ringBuffer.publishEvent((event, sequence) -> event.set(endStreamResponse));

// Unsubscribe all downstream consumers
for (@NonNull final var subscriber : subscribers.keySet()) {
for (final var subscriber : subscribers.keySet()) {
LOGGER.log(System.Logger.Level.DEBUG, "Unsubscribing: {0}", subscriber);
unsubscribe(subscriber);
}
Expand All @@ -157,7 +155,6 @@ public void subscribe(
@NonNull final EventHandler<ObjectEvent<SubscribeStreamResponse>> handler) {

// Initialize the batch event processor and set it on the ring buffer
@NonNull
final var batchEventProcessor =
new BatchEventProcessorBuilder()
.build(ringBuffer, ringBuffer.newBarrier(), handler);
Expand All @@ -176,7 +173,7 @@ public void unsubscribe(
@NonNull final EventHandler<ObjectEvent<SubscribeStreamResponse>> handler) {

// Remove the subscriber
@NonNull final var batchEventProcessor = subscribers.remove(handler);
final var batchEventProcessor = subscribers.remove(handler);
if (batchEventProcessor == null) {
LOGGER.log(System.Logger.Level.ERROR, "Subscriber not found: {0}", handler);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class BlockAsDirReader implements BlockReader<Block> {

LOGGER.log(System.Logger.Level.INFO, "Initializing FileSystemBlockReader");

@NonNull final Path blockNodeRootPath = Path.of(config.rootPath());
final Path blockNodeRootPath = Path.of(config.rootPath());

LOGGER.log(System.Logger.Level.INFO, config.toString());
LOGGER.log(System.Logger.Level.INFO, "Block Node Root Path: " + blockNodeRootPath);
Expand Down Expand Up @@ -89,7 +89,7 @@ public Optional<Block> read(final long blockNumber) throws IOException {

// Verify path attributes of the block directory within the
// block node root path
@NonNull final Path blockPath = blockNodeRootPath.resolve(String.valueOf(blockNumber));
final Path blockPath = blockNodeRootPath.resolve(String.valueOf(blockNumber));
if (isPathDisqualified(blockPath)) {
return Optional.empty();
}
Expand All @@ -105,11 +105,10 @@ public Optional<Block> read(final long blockNumber) throws IOException {
// 10.blk), the loop will directly fetch the BlockItems in order based on
// their file names. The loop will exit when it attempts to read a
// BlockItem file that does not exist (e.g., 11.blk).
@NonNull final Block.Builder builder = Block.newBuilder();
@NonNull final List<BlockItem> blockItems = new ArrayList<>();
final Block.Builder builder = Block.newBuilder();
final List<BlockItem> blockItems = new ArrayList<>();
for (int i = 1; ; i++) {
@NonNull final Path blockItemPath = blockPath.resolve(i + BLOCK_FILE_EXTENSION);
@NonNull
final Path blockItemPath = blockPath.resolve(i + BLOCK_FILE_EXTENSION);
final Optional<BlockItem> blockItemOpt = readBlockItem(blockItemPath.toString());
if (blockItemOpt.isPresent()) {
blockItems.add(blockItemOpt.get());
Expand All @@ -133,7 +132,7 @@ public Optional<Block> read(final long blockNumber) throws IOException {
private Optional<BlockItem> readBlockItem(@NonNull final String blockItemPath)
throws IOException {

try (@NonNull final FileInputStream fis = new FileInputStream(blockItemPath)) {
try (final FileInputStream fis = new FileInputStream(blockItemPath)) {

BlockItem blockItem = BlockItem.PROTOBUF.parse(Bytes.wrap(fis.readAllBytes()));
return Optional.of(blockItem);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.hedera.block.server.persistence.storage.remove;

import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
Expand Down Expand Up @@ -61,7 +60,7 @@ public void remove(final long id) throws IOException {

// Calculate the block path and proactively set the permissions
// for removal
@NonNull final Path blockPath = blockNodeRootPath.resolve(String.valueOf(id));
final Path blockPath = blockNodeRootPath.resolve(String.valueOf(id));
if (Files.notExists(blockPath)) {
LOGGER.log(System.Logger.Level.ERROR, "Block does not exist: {0}", id);
return;
Expand All @@ -80,9 +79,9 @@ private static boolean delete(@NonNull final File file) {
// Recursively delete the contents
// of the directory
if (file.isDirectory()) {
@Nullable final File[] files = file.listFiles();
final File[] files = file.listFiles();
if (files != null) {
for (@NonNull final File f : files) {
for (final File f : files) {
delete(f);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void write(@NonNull final BlockItem blockItem) throws IOException {
resetState(blockItem);
}

@NonNull final Path blockItemFilePath = calculateBlockItemPath();
final Path blockItemFilePath = calculateBlockItemPath();
for (int retries = 0; ; retries++) {
try {
write(blockItemFilePath, blockItem);
Expand Down Expand Up @@ -137,8 +137,7 @@ public void write(@NonNull final BlockItem blockItem) throws IOException {
*/
protected void write(@NonNull final Path blockItemFilePath, @NonNull final BlockItem blockItem)
throws IOException {
try (@NonNull
final FileOutputStream fos = new FileOutputStream(blockItemFilePath.toString())) {
try (final FileOutputStream fos = new FileOutputStream(blockItemFilePath.toString())) {

BlockItem.PROTOBUF.toBytes(blockItem).writeTo(fos);
LOGGER.log(
Expand Down Expand Up @@ -170,7 +169,7 @@ private void resetState(@NonNull final BlockItem blockItem) throws IOException {
blockNodeFileNameIndex = 0;

// Increment the block counter
@NonNull final MetricsService metricsService = blockNodeContext.metricsService();
final MetricsService metricsService = blockNodeContext.metricsService();
metricsService.blocksPersisted.increment();
}

Expand Down Expand Up @@ -198,7 +197,7 @@ private void repairPermissions(@NonNull final Path path) throws IOException {
@NonNull
private Path calculateBlockItemPath() {
// Build the path to a .blk file
@NonNull final Path blockPath = calculateBlockPath();
final Path blockPath = calculateBlockPath();
blockNodeFileNameIndex++;
return blockPath.resolve(blockNodeFileNameIndex + BLOCK_FILE_EXTENSION);
}
Expand Down
Loading

0 comments on commit 121f52c

Please sign in to comment.