Skip to content

Commit

Permalink
Test strict cache does not overflow
Browse files Browse the repository at this point in the history
Signed-off-by: Finn Carroll <[email protected]>
  • Loading branch information
finnegancarroll committed Aug 14, 2024
1 parent 06e7569 commit d5b4adf
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.index.store.remote.utils;

import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.index.store.remote.filecache.FileCache;

import java.io.ByteArrayInputStream;
import java.io.IOException;
Expand All @@ -24,10 +25,10 @@ public class TransferManagerBlobContainerReaderTests extends TransferManagerTest
private BlobContainer blobContainer;

@Override
protected void initializeTransferManager() throws IOException {
protected TransferManager initializeTransferManager(FileCache cache) throws IOException {
blobContainer = mock(BlobContainer.class);
doAnswer(i -> new ByteArrayInputStream(createData())).when(blobContainer).readBlob(eq("blob"), anyLong(), anyLong());
transferManager = new TransferManager(blobContainer::readBlob, fileCache);
return new TransferManager(blobContainer::readBlob, cache);
}

protected void mockExceptionWhileReading() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.common.lucene.store.InputStreamIndexInput;
import org.opensearch.index.store.RemoteDirectory;
import org.opensearch.index.store.remote.filecache.FileCache;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
Expand All @@ -26,15 +27,15 @@ public class TransferManagerRemoteDirectoryReaderTests extends TransferManagerTe
private RemoteDirectory remoteDirectory;

@Override
protected void initializeTransferManager() throws IOException {
protected TransferManager initializeTransferManager(FileCache cache) throws IOException {
remoteDirectory = mock(RemoteDirectory.class);
doAnswer(i -> new ByteArrayIndexInput("blob", createData())).when(remoteDirectory).openInput(eq("blob"), any());
transferManager = new TransferManager(
return new TransferManager(
(name, position, length) -> new InputStreamIndexInput(
remoteDirectory.openInput(name, new BlockIOContext(IOContext.DEFAULT, position, length)),
length
),
fileCache
cache
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ public abstract class TransferManagerTestCase extends OpenSearchTestCase {
@Before
public void setUp() throws Exception {
super.setUp();
fileCache.enableOverflow(true);
directory = new MMapDirectory(createTempDir(), SimpleFSLockFactory.INSTANCE);
initializeTransferManager();
transferManager = initializeTransferManager(fileCache);
}

@After
Expand All @@ -64,6 +65,25 @@ protected static byte[] createData() {
return data;
}

public void testOverflowDisabled() throws Exception {
FileCache strictCache = FileCacheFactory.createConcurrentLRUFileCache(
EIGHT_MB * 2,
1,
new NoopCircuitBreaker(CircuitBreaker.REQUEST)
);
strictCache.enableOverflow(false);
TransferManager tm = initializeTransferManager(strictCache);

assertThrows(IOException.class, () -> {
IndexInput i1 = fetchBlobWithName(tm, "1");
IndexInput i2 = fetchBlobWithName(tm, "2");
IndexInput i3 = fetchBlobWithName(tm, "3");
assertIndexInputIsFunctional(i1);
assertIndexInputIsFunctional(i2);
assertIndexInputIsFunctional(i3);
});
}

public void testSingleAccess() throws Exception {
try (IndexInput i = fetchBlobWithName("file")) {
assertIndexInputIsFunctional(i);
Expand Down Expand Up @@ -193,7 +213,7 @@ public void testFetchesToDifferentBlobsDoNotBlockOnEachOther() throws Exception
assertFalse(blockingThread.isAlive());
}

protected abstract void initializeTransferManager() throws IOException;
protected abstract TransferManager initializeTransferManager(FileCache cache) throws IOException;

protected abstract void mockExceptionWhileReading() throws IOException;

Expand All @@ -205,6 +225,12 @@ private IndexInput fetchBlobWithName(String blobname) throws IOException {
return transferManager.fetchBlob(BlobFetchRequest.builder().fileName(blobname).directory(directory).blobParts(blobParts).build());
}

private IndexInput fetchBlobWithName(TransferManager tm, String blobname) throws IOException {
List<BlobFetchRequest.BlobPart> blobParts = new ArrayList<>();
blobParts.add(new BlobFetchRequest.BlobPart("blob", 0, EIGHT_MB));
return tm.fetchBlob(BlobFetchRequest.builder().fileName(blobname).directory(directory).blobParts(blobParts).build());
}

private static void assertIndexInputIsFunctional(IndexInput indexInput) throws IOException {
indexInput.seek(EIGHT_MB - 1);
MatcherAssert.assertThat(indexInput.readByte(), equalTo((byte) 7));
Expand Down

0 comments on commit d5b4adf

Please sign in to comment.