From 3a89cfa8844ae2c2c9ec2c7c9c13602be23521ac Mon Sep 17 00:00:00 2001 From: Finn Carroll Date: Thu, 6 Jun 2024 11:19:11 -0700 Subject: [PATCH] Test RefTrackedOnDemandBlockSnapshotIndexInput solution. --- .../directory/RemoteSnapshotDirectory.java | 49 ++++++++++++++++++- .../file/OnDemandBlockSnapshotIndexInput.java | 4 ++ 2 files changed, 52 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/index/store/remote/directory/RemoteSnapshotDirectory.java b/server/src/main/java/org/opensearch/index/store/remote/directory/RemoteSnapshotDirectory.java index 19ecee67bdb96..167c190c57b7a 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/directory/RemoteSnapshotDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/remote/directory/RemoteSnapshotDirectory.java @@ -24,10 +24,12 @@ import org.opensearch.repositories.blobstore.BlobStoreRepository; import java.io.IOException; +import java.lang.ref.WeakReference; import java.util.Collection; import java.util.Collections; import java.util.Map; import java.util.Set; +import java.util.ArrayList; import java.util.stream.Collectors; /** @@ -74,7 +76,7 @@ public IndexInput openInput(String name, IOContext context) throws IOException { if (fileInfo.name().startsWith(VIRTUAL_FILE_PREFIX)) { return new ByteArrayIndexInput(fileInfo.physicalName(), fileInfo.metadata().hash().bytes); } - return new OnDemandBlockSnapshotIndexInput(fileInfo, localStoreDir, transferManager); + return new RefTrackedOnDemandBlockSnapshotIndexInput(fileInfo, localStoreDir, transferManager); } @Override @@ -145,3 +147,48 @@ public void writeBytes(byte[] b, int offset, int length) throws IOException { } } } + +/** + * Maintains a list of weak references to all clones. On close ensure all clones are closed as well. + * Per {@link IndexInput} cloned IndexInputs can be closed with the parent. + */ +class RefTrackedOnDemandBlockSnapshotIndexInput extends OnDemandBlockSnapshotIndexInput { + + private final ArrayList> cloneRefs; + + public RefTrackedOnDemandBlockSnapshotIndexInput(OnDemandBlockSnapshotIndexInput parentObject, ArrayList> parentRefList) { + super(parentObject); + this.cloneRefs = parentRefList; + } + + public RefTrackedOnDemandBlockSnapshotIndexInput(BlobStoreIndexShardSnapshot.FileInfo fileInfo, FSDirectory directory, TransferManager transferManager) { + super(fileInfo, directory, transferManager); + this.cloneRefs = new ArrayList<>(); + } + + public RefTrackedOnDemandBlockSnapshotIndexInput(String resourceDescription, BlobStoreIndexShardSnapshot.FileInfo fileInfo, long offset, long length, boolean isClone, FSDirectory directory, TransferManager transferManager) { + super(resourceDescription, fileInfo, offset, length, isClone, directory, transferManager); + this.cloneRefs = new ArrayList<>(); + } + + @Override + public RefTrackedOnDemandBlockSnapshotIndexInput clone() { + OnDemandBlockSnapshotIndexInput parentClone = super.clone(); + cloneRefs.add(new WeakReference<>(parentClone)); + return new RefTrackedOnDemandBlockSnapshotIndexInput(parentClone, cloneRefs); + } + + @Override + public void close() throws IOException { + if (!isClone) { + for (WeakReference ref : cloneRefs) { + IndexInput input = ref.get(); + if (input != null) { + input.close(); + } + // cloneRefs.remove(ref); // For now don't remove for debugging + } + } + super.close(); + } +} diff --git a/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInput.java b/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInput.java index 8097fd08da50a..20ad158fa14e4 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInput.java +++ b/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInput.java @@ -55,6 +55,10 @@ public class OnDemandBlockSnapshotIndexInput extends OnDemandBlockIndexInput { */ protected final long originalFileSize; + public OnDemandBlockSnapshotIndexInput(OnDemandBlockSnapshotIndexInput sourceObject) { + this(sourceObject.fileInfo, sourceObject.directory, sourceObject.transferManager); + } + public OnDemandBlockSnapshotIndexInput(FileInfo fileInfo, FSDirectory directory, TransferManager transferManager) { this( "BlockedSnapshotIndexInput(path=\""