Skip to content

Commit

Permalink
Download block after adding cache entry (opensearch-project#7129)
Browse files Browse the repository at this point in the history
As detailed in opensearch-project#7031, we should not guard the download operations with
the cache lock, as it is too coarse-grained. This refactoring moves the
long-running download operation outside of the cache operations.

Resolves opensearch-project#7031

Signed-off-by: Andrew Ross <[email protected]>
  • Loading branch information
andrross authored Apr 13, 2023
1 parent 3cb2565 commit a9b4c45
Show file tree
Hide file tree
Showing 9 changed files with 271 additions and 169 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,27 +107,10 @@ public void setup() {
/**
* Stubbed out IndexInput that does nothing but report a fixed size
*/
private static class FixedSizeStubIndexInput extends CachedIndexInput {
private FixedSizeStubIndexInput() {
super(FixedSizeStubIndexInput.class.getSimpleName());
}

@Override
public boolean isClosed() {
return false;
}

@Override
public void close() {}

@Override
public long getFilePointer() {
throw new UnsupportedOperationException();
}

private static class FixedSizeStubIndexInput implements CachedIndexInput {
@Override
public void seek(long pos) {
throw new UnsupportedOperationException();
public IndexInput getIndexInput() {
return null;
}

@Override
Expand All @@ -136,18 +119,13 @@ public long length() {
}

@Override
public IndexInput slice(String sliceDescription, long offset, long length) {
throw new UnsupportedOperationException();
public boolean isClosed() {
return false;
}

@Override
public byte readByte() {
throw new UnsupportedOperationException();
}
public void close() throws Exception {

@Override
public void readBytes(byte[] b, int offset, int len) {
throw new UnsupportedOperationException();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,32 @@

package org.opensearch.index.store.remote.filecache;

import java.io.IOException;

import org.apache.lucene.store.IndexInput;

/**
* Base IndexInput whose instances will be maintained in cache.
* Interface for an entry in the {@link FileCache} that can return an
* {@link IndexInput}. Exactly how the IndexInput is created is determined by
* the implementations.
*
* @opensearch.internal
*/
public abstract class CachedIndexInput extends IndexInput {
public interface CachedIndexInput extends AutoCloseable {
/**
* Gets the {@link IndexInput} this cache entry represents.
* @return The IndexInput
* @throws IOException if any I/O error occurs
*/
IndexInput getIndexInput() throws IOException;

/**
* resourceDescription should be a non-null, opaque string
* describing this resource; it's returned from
* {@link #toString}.
* @return length in bytes
*/
protected CachedIndexInput(String resourceDescription) {
super(resourceDescription);
}
long length();

/**
* return true this index input is closed, false if not
* @return true this index input is closed, false if not
* @return true if the entry is closed, false otherwise
*/
public abstract boolean isClosed();
boolean isClosed();
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.index.store.remote.filecache;

import org.apache.lucene.store.IndexInput;
import org.opensearch.common.breaker.CircuitBreaker;
import org.opensearch.common.breaker.CircuitBreakingException;
import org.opensearch.index.store.remote.utils.cache.CacheUsage;
Expand Down Expand Up @@ -171,7 +172,7 @@ public void restoreFromDirectory(List<Path> fileCacheDataPaths) {
.filter(Files::isRegularFile)
.forEach(path -> {
try {
put(path.toAbsolutePath(), new FileCachedIndexInput.ClosedIndexInput(Files.size(path)));
put(path.toAbsolutePath(), new RestoredCachedIndexInput(Files.size(path)));
decRef(path.toAbsolutePath());
} catch (IOException e) {
throw new UncheckedIOException(
Expand Down Expand Up @@ -200,4 +201,39 @@ public FileCacheStats fileCacheStats() {
stats.missCount()
);
}

/**
 * Placeholder for the existing file blocks that are in the disk-based
 * local cache at node startup time. We can't open a file handle to these
 * blocks at this point, so we store this placeholder object in the cache.
 * If a block is needed, then these entries will be replaced with a proper
 * entry that will open the actual file handle to create the IndexInput.
 * These entries are eligible for eviction so if nothing needs to reference
 * them they will be deleted when the disk-based local cache fills up.
 * <p>
 * Instances are immutable: the only state is the length recorded at
 * construction time.
 */
private static class RestoredCachedIndexInput implements CachedIndexInput {
    /** Length in bytes reported for this entry; fixed at construction. */
    private final long length;

    /**
     * @param length the length in bytes to report for this entry
     */
    private RestoredCachedIndexInput(long length) {
        this.length = length;
    }

    /**
     * Always fails: a placeholder has no underlying IndexInput to hand out.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public IndexInput getIndexInput() {
        throw new UnsupportedOperationException(
            "RestoredCachedIndexInput is a placeholder and cannot provide an IndexInput"
        );
    }

    /**
     * @return the length in bytes supplied at construction
     */
    @Override
    public long length() {
        return length;
    }

    /**
     * @return always {@code true}; a placeholder is reported as closed
     */
    @Override
    public boolean isClosed() {
        return true;
    }

    /**
     * No-op: there is nothing to release. Declares no checked exception,
     * which is a permitted narrowing of {@link AutoCloseable#close()}.
     */
    @Override
    public void close() {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.opensearch.index.store.remote.filecache;

import org.apache.lucene.store.IndexInput;
import org.opensearch.common.breaker.CircuitBreaker;
import org.opensearch.common.cache.RemovalReason;
import org.opensearch.index.store.remote.utils.cache.SegmentedCache;
Expand Down Expand Up @@ -63,7 +62,7 @@ private static FileCache createFileCache(SegmentedCache<Path, CachedIndexInput>
private static SegmentedCache.Builder<Path, CachedIndexInput> createDefaultBuilder() {
return SegmentedCache.<Path, CachedIndexInput>builder()
// use length in bytes as the weight of the file item
.weigher(IndexInput::length)
.weigher(CachedIndexInput::length)
.listener((removalNotification) -> {
RemovalReason removalReason = removalNotification.getRemovalReason();
CachedIndexInput value = removalNotification.getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
*
* @opensearch.internal
*/
public class FileCachedIndexInput extends CachedIndexInput implements RandomAccessInput {
public class FileCachedIndexInput extends IndexInput implements RandomAccessInput {

protected final FileCache cache;

Expand Down Expand Up @@ -151,71 +151,4 @@ public void close() throws IOException {
closed = true;
}
}

/**
* Reports whether this IndexInput has been closed. Mainly used by the
* File Cache to detect whether this (origin) IndexInput is closed or not.
*
* @return {@code true} if this index input has been closed, {@code false} otherwise
*/
@Override
public boolean isClosed() {
return closed;
}

/**
 * An always-closed IndexInput that only knows its length. It lets the
 * File Cache obtain the length of an input without opening the
 * IndexInput itself; every positional or read operation is rejected
 * with an {@link UnsupportedOperationException}.
 */
public static class ClosedIndexInput extends CachedIndexInput {
    /** Length in bytes reported by {@link #length()}; fixed at construction. */
    private final long fixedLength;

    /**
     * @param length the length in bytes this input should report
     */
    public ClosedIndexInput(long length) {
        super("ClosedIndexInput");
        fixedLength = length;
    }

    // ---- supported queries ----

    /** @return the length in bytes supplied at construction */
    @Override
    public long length() {
        return fixedLength;
    }

    /** @return always {@code true} */
    @Override
    public boolean isClosed() {
        return true;
    }

    /** Does nothing; there is no underlying resource to release. */
    @Override
    public void close() throws IOException {
        // No-Op
    }

    // ---- unsupported operations: each one always throws ----

    @Override
    public long getFilePointer() {
        throw new UnsupportedOperationException("ClosedIndexInput doesn't support getFilePointer().");
    }

    @Override
    public void seek(long pos) throws IOException {
        throw new UnsupportedOperationException("ClosedIndexInput doesn't support seek().");
    }

    @Override
    public byte readByte() throws IOException {
        throw new UnsupportedOperationException("ClosedIndexInput doesn't support read.");
    }

    @Override
    public void readBytes(byte[] b, int offset, int len) throws IOException {
        throw new UnsupportedOperationException("ClosedIndexInput doesn't support read.");
    }

    @Override
    public IndexInput slice(String sliceDescription, long offset, long length) throws IOException {
        throw new UnsupportedOperationException("ClosedIndexInput couldn't be sliced.");
    }

    @Override
    public IndexInput clone() {
        throw new UnsupportedOperationException("ClosedIndexInput cannot be cloned.");
    }
}
}
Loading

0 comments on commit a9b4c45

Please sign in to comment.