Skip to content

Commit

Permalink
fix: remove caching
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs committed Jun 27, 2024
1 parent 4c46aa3 commit 6ff6a8d
Show file tree
Hide file tree
Showing 9 changed files with 24 additions and 270 deletions.
5 changes: 1 addition & 4 deletions server/src/main/java/com/hedera/block/server/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
import com.hedera.block.protos.BlockStreamServiceGrpcProto;
import com.hedera.block.server.mediator.LiveStreamMediatorImpl;
import com.hedera.block.server.persistence.WriteThroughCacheHandler;
import com.hedera.block.server.persistence.cache.BlockCache;
import com.hedera.block.server.persistence.cache.LRUCache;
import com.hedera.block.server.persistence.storage.BlockStorage;
import com.hedera.block.server.persistence.storage.FileSystemBlockStorage;
import io.grpc.stub.ServerCalls;
Expand Down Expand Up @@ -63,9 +61,8 @@ public static void main(final String[] args) {

// Initialize the block storage, cache, and service
final BlockStorage<BlockStreamServiceGrpcProto.Block> blockStorage = new FileSystemBlockStorage(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config);
final BlockCache<BlockStreamServiceGrpcProto.Block> blockCache = new LRUCache(1000);
final BlockStreamService blockStreamService = new BlockStreamService(1500,
new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage, blockCache)));
new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage)));

// Start the web server
WebServer.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.hedera.block.server.persistence;

import com.hedera.block.protos.BlockStreamServiceGrpcProto;
import com.hedera.block.server.persistence.cache.BlockCache;
import com.hedera.block.server.persistence.storage.BlockStorage;

import java.util.ArrayDeque;
Expand All @@ -31,18 +30,14 @@
public class WriteThroughCacheHandler implements BlockPersistenceHandler<BlockStreamServiceGrpcProto.Block> {

private final BlockStorage<BlockStreamServiceGrpcProto.Block> blockStorage;
private final BlockCache<BlockStreamServiceGrpcProto.Block> blockCache;

/**
* Constructor for the WriteThroughCacheHandler class.
*
* @param blockStorage the block storage
* @param blockCache the block cache
*/
public WriteThroughCacheHandler(final BlockStorage<BlockStreamServiceGrpcProto.Block> blockStorage,
final BlockCache<BlockStreamServiceGrpcProto.Block> blockCache) {
public WriteThroughCacheHandler(final BlockStorage<BlockStreamServiceGrpcProto.Block> blockStorage) {
this.blockStorage = blockStorage;
this.blockCache = blockCache;
}

/**
Expand All @@ -56,7 +51,7 @@ public Long persist(final BlockStreamServiceGrpcProto.Block block) {

// Write-Through cache
blockStorage.write(block);
return blockCache.insert(block);
return block.getId();
}

/**
Expand Down Expand Up @@ -91,15 +86,6 @@ public Queue<BlockStreamServiceGrpcProto.Block> readRange(final long startBlockI
*/
@Override
public Optional<BlockStreamServiceGrpcProto.Block> read(final long id) {

if (blockCache.contains(id)) {
return Optional.of(blockCache.get(id));
} else {
// Update the cache with the block from storage
Optional<BlockStreamServiceGrpcProto.Block> block = blockStorage.read(id);
block.ifPresent(blockCache::insert);

return block;
}
return blockStorage.read(id);
}
}

This file was deleted.

This file was deleted.

This file was deleted.

1 change: 0 additions & 1 deletion server/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,4 @@
requires io.helidon.config;
requires io.helidon.webserver.grpc;
requires io.helidon.webserver;
requires jakarta.inject;
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.hedera.block.protos.BlockStreamServiceGrpcProto;
import com.hedera.block.server.consumer.LiveStreamObserver;
import com.hedera.block.server.persistence.WriteThroughCacheHandler;
import com.hedera.block.server.persistence.cache.BlockCache;
import com.hedera.block.server.persistence.storage.BlockStorage;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand All @@ -46,14 +45,11 @@ public class LiveStreamMediatorImplTest {
@Mock
private BlockStorage<BlockStreamServiceGrpcProto.Block> blockStorage;

@Mock
private BlockCache<BlockStreamServiceGrpcProto.Block> blockCache;

@Test
public void testUnsubscribeEach() {

final StreamMediator<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> streamMediator =
new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage, blockCache));
new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage));

// Set up the subscribers
streamMediator.subscribe(liveStreamObserver1);
Expand All @@ -78,24 +74,23 @@ public void testUnsubscribeEach() {
public void testMediatorPersistenceWithoutSubscribers() {

final StreamMediator<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> streamMediator =
new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage, blockCache));
new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage));

final BlockStreamServiceGrpcProto.Block newBlock = BlockStreamServiceGrpcProto.Block.newBuilder().build();

// Acting as a producer, notify the mediator of a new block
streamMediator.notifyAll(newBlock);

// Confirm the block was persisted to storage and cache
// Confirm the block was persisted to storage
// even though there are no subscribers
verify(blockStorage).write(newBlock);
verify(blockCache).insert(newBlock);
}

@Test
public void testMediatorNotifyAll() {

final StreamMediator<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> streamMediator =
new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage, blockCache));
new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage));

// Set up the subscribers
streamMediator.subscribe(liveStreamObserver1);
Expand All @@ -118,7 +113,6 @@ public void testMediatorNotifyAll() {

// Confirm the block was persisted to storage and cache
verify(blockStorage).write(newBlock);
verify(blockCache).insert(newBlock);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,10 @@
package com.hedera.block.server.persistence;

import com.hedera.block.protos.BlockStreamServiceGrpcProto;
import com.hedera.block.server.persistence.cache.BlockCache;
import com.hedera.block.server.persistence.cache.LRUCache;
import com.hedera.block.server.persistence.storage.BlockStorage;
import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.*;

import static com.hedera.block.server.persistence.PersistTestUtils.generateBlocks;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -100,20 +96,26 @@ private static void verifyReadRange(
private static BlockPersistenceHandler<BlockStreamServiceGrpcProto.Block> generateInMemoryTestBlockPersistenceHandler(int maxEntries) {
// Mock up a simple, in-memory persistence handler
BlockStorage<BlockStreamServiceGrpcProto.Block> blockStorage = new NoOpTestBlockStorage();
BlockCache<BlockStreamServiceGrpcProto.Block> blockCache = new LRUCache(maxEntries);
return new WriteThroughCacheHandler(blockStorage, blockCache);
return new WriteThroughCacheHandler(blockStorage);
}

private static class NoOpTestBlockStorage implements BlockStorage<BlockStreamServiceGrpcProto.Block> {

private final Map<Long, BlockStreamServiceGrpcProto.Block> cache;

public NoOpTestBlockStorage() {
this.cache = new HashMap<>();
}

@Override
public Optional<Long> write(BlockStreamServiceGrpcProto.Block block) {
cache.put(block.getId(), block);
return Optional.of(block.getId());
}

@Override
public Optional<BlockStreamServiceGrpcProto.Block> read(Long blockId) {
return Optional.empty();
return Optional.ofNullable(cache.get(blockId));
}
}
}
Loading

0 comments on commit 6ff6a8d

Please sign in to comment.