Skip to content

Commit

Permalink
Added UT for urgentWriteAsync
Browse files Browse the repository at this point in the history
Signed-off-by: Shivansh Arora <[email protected]>
  • Loading branch information
shiv0408 committed Oct 22, 2023
1 parent ead15a8 commit fef26e5
Showing 1 changed file with 69 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

package org.opensearch.snapshots;

import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.opensearch.OpenSearchCorruptionException;
import org.opensearch.OpenSearchParseException;
import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
Expand All @@ -43,6 +45,8 @@
import org.opensearch.common.blobstore.fs.FsBlobStore;
import org.opensearch.common.blobstore.stream.read.ReadContext;
import org.opensearch.common.blobstore.stream.write.WriteContext;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.blobstore.transfer.RemoteTransferContainer;
import org.opensearch.common.compress.DeflateCompressor;
import org.opensearch.common.io.Streams;
import org.opensearch.common.io.stream.BytesStreamOutput;
Expand All @@ -54,10 +58,12 @@
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.index.translog.BufferedChecksumStreamOutput;
import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat;
import org.opensearch.test.OpenSearchTestCase;

import javax.swing.*;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
Expand All @@ -67,6 +73,7 @@

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.greaterThan;
import static org.mockito.Mockito.*;

public class BlobStoreFormatTests extends OpenSearchTestCase {

Expand Down Expand Up @@ -128,44 +135,37 @@ public void testBlobStoreAsyncOperations() throws IOException, InterruptedExcept
BlobPath.cleanPath(),
null
);
MockFsVerifyingBlobContainer spyContainer = spy(mockBlobContainer);
ChecksumBlobStoreFormat<BlobObj> checksumSMILE = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent);

ArgumentCaptor<ActionListener<Void>> actionListenerArgumentCaptor = ArgumentCaptor.forClass(ActionListener.class);
ArgumentCaptor<WriteContext> writeContextArgumentCaptor = ArgumentCaptor.forClass(WriteContext.class);
CountDownLatch latch = new CountDownLatch(2);

ActionListener<Void> actionListener = new ActionListener<>() {
@Override
public void onResponse(Void unused) {
logger.info("---> Async write succeeded");
latch.countDown();
}

@Override
public void onFailure(Exception e) {
logger.info("---> Failure in async write");
throw new RuntimeException("async write should not fail");
}
};

// Write blobs in different formats
checksumSMILE.writeAsync(
new BlobObj("checksum smile"),
mockBlobContainer,
spyContainer,
"check-smile",
CompressorRegistry.none(),
actionListener,
getVoidActionListener(latch),
ChecksumBlobStoreFormat.SNAPSHOT_ONLY_FORMAT_PARAMS
);
checksumSMILE.writeAsync(
new BlobObj("checksum smile compressed"),
mockBlobContainer,
spyContainer,
"check-smile-comp",
CompressorRegistry.getCompressor(DeflateCompressor.NAME),
actionListener,
getVoidActionListener(latch),
ChecksumBlobStoreFormat.SNAPSHOT_ONLY_FORMAT_PARAMS
);

latch.await();

verify(spyContainer, times(2)).asyncBlobUpload(writeContextArgumentCaptor.capture(), actionListenerArgumentCaptor.capture());
assertEquals(2, writeContextArgumentCaptor.getAllValues().size());
writeContextArgumentCaptor.getAllValues().forEach(
writeContext -> assertEquals(WritePriority.NORMAL, writeContext.getWritePriority())
);
// Assert that all checksum blobs can be read
assertEquals(checksumSMILE.read(mockBlobContainer.getDelegate(), "check-smile", xContentRegistry()).getText(), "checksum smile");
assertEquals(
Expand All @@ -174,6 +174,39 @@ public void onFailure(Exception e) {
);
}

public void testBlobStorePriorityAsyncOperation() throws IOException, InterruptedException {
BlobStore blobStore = createTestBlobStore();
MockFsVerifyingBlobContainer mockBlobContainer = new MockFsVerifyingBlobContainer(
(FsBlobStore) blobStore,
BlobPath.cleanPath(),
null
);
MockFsVerifyingBlobContainer spyContainer = spy(mockBlobContainer);
ChecksumBlobStoreFormat<BlobObj> checksumSMILE = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent);

ArgumentCaptor<ActionListener<Void>> actionListenerArgumentCaptor = ArgumentCaptor.forClass(ActionListener.class);
ArgumentCaptor<WriteContext> writeContextArgumentCaptor = ArgumentCaptor.forClass(WriteContext.class);
CountDownLatch latch = new CountDownLatch(1);

// Write blobs in different formats
checksumSMILE.urgentWriteAsync(
new BlobObj("cluster state diff"),
spyContainer,
"cluster-state-diff",
CompressorRegistry.none(),
getVoidActionListener(latch),
ChecksumBlobStoreFormat.SNAPSHOT_ONLY_FORMAT_PARAMS
);
latch.await();

verify(spyContainer).asyncBlobUpload(writeContextArgumentCaptor.capture(), actionListenerArgumentCaptor.capture());
assertEquals(WritePriority.URGENT, writeContextArgumentCaptor.getValue().getWritePriority());
assertEquals(
checksumSMILE.read(mockBlobContainer.getDelegate(), "cluster-state-diff", xContentRegistry()).getText(),
"cluster state diff"
);
}

public void testBlobStoreOperations() throws IOException {
BlobStore blobStore = createTestBlobStore();
BlobContainer blobContainer = blobStore.blobContainer(BlobPath.cleanPath());
Expand Down Expand Up @@ -228,6 +261,23 @@ public void testBlobCorruption() throws IOException {
}
}

private ActionListener<Void> getVoidActionListener(CountDownLatch latch) {
ActionListener<Void> actionListener = new ActionListener<>() {
@Override
public void onResponse(Void unused) {
logger.info("---> Async write succeeded");
latch.countDown();
}

@Override
public void onFailure(Exception e) {
logger.info("---> Failure in async write");
throw new RuntimeException("async write should not fail");
}
};

return actionListener;
}
protected BlobStore createTestBlobStore() throws IOException {
return new FsBlobStore(randomIntBetween(1, 8) * 1024, createTempDir(), false);
}
Expand Down

0 comments on commit fef26e5

Please sign in to comment.