Skip to content

Commit

Permalink
initial commits
Browse files Browse the repository at this point in the history
Signed-off-by: Sandeep Kumawat <[email protected]>
  • Loading branch information
skumawat2025 committed May 13, 2024
1 parent 0282e64 commit 6724043
Show file tree
Hide file tree
Showing 7 changed files with 200 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public class RemoteFsTranslog extends Translog {
private static final int SYNC_PERMIT = 1;
private final Semaphore syncPermit = new Semaphore(SYNC_PERMIT);
private final AtomicBoolean pauseSync = new AtomicBoolean(false);
boolean ckpAsMetadata;

public RemoteFsTranslog(
TranslogConfig config,
Expand All @@ -110,14 +111,16 @@ public RemoteFsTranslog(
this.startedPrimarySupplier = startedPrimarySupplier;
this.remoteTranslogTransferTracker = remoteTranslogTransferTracker;
fileTransferTracker = new FileTransferTracker(shardId, remoteTranslogTransferTracker);
ckpAsMetadata = true;
this.translogTransferManager = buildTranslogTransferManager(
blobStoreRepository,
threadPool,
shardId,
fileTransferTracker,
remoteTranslogTransferTracker,
indexSettings().getRemoteStorePathStrategy(),
remoteStoreSettings
remoteStoreSettings,
ckpAsMetadata
);
try {
download(translogTransferManager, location, logger);
Expand Down Expand Up @@ -288,7 +291,8 @@ public static TranslogTransferManager buildTranslogTransferManager(
FileTransferTracker fileTransferTracker,
RemoteTranslogTransferTracker tracker,
RemoteStorePathStrategy pathStrategy,
RemoteStoreSettings remoteStoreSettings
RemoteStoreSettings remoteStoreSettings,
boolean ckpAsMetadata
) {
assert Objects.nonNull(pathStrategy);
String indexUUID = shardId.getIndex().getUUID();
Expand All @@ -310,7 +314,16 @@ public static TranslogTransferManager buildTranslogTransferManager(
.build();
BlobPath mdPath = pathStrategy.generatePath(mdPathInput);
BlobStoreTransferService transferService = new BlobStoreTransferService(blobStoreRepository.blobStore(), threadPool);
return new TranslogTransferManager(shardId, transferService, dataPath, mdPath, fileTransferTracker, tracker, remoteStoreSettings);
return new TranslogTransferManager(
shardId,
transferService,
dataPath,
mdPath,
fileTransferTracker,
tracker,
remoteStoreSettings,
ckpAsMetadata
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,12 @@
import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot;
import org.opensearch.threadpool.ThreadPool;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.*;

import static org.opensearch.common.blobstore.BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC;

Expand Down Expand Up @@ -90,29 +88,52 @@ public void uploadBlob(final TransferFileSnapshot fileSnapshot, Iterable<String>
public void uploadBlobs(
Set<TransferFileSnapshot> fileSnapshots,
final Map<Long, BlobPath> blobPaths,
final Map<TransferFileSnapshot, InputStream> fileMetadataMap,
ActionListener<TransferFileSnapshot> listener,
WritePriority writePriority
) {
fileSnapshots.forEach(fileSnapshot -> {
BlobPath blobPath = blobPaths.get(fileSnapshot.getPrimaryTerm());
InputStream fileMetadata = fileMetadataMap.get(fileSnapshot);
if (!(blobStore.blobContainer(blobPath) instanceof AsyncMultiStreamBlobContainer)) {
uploadBlob(ThreadPool.Names.TRANSLOG_TRANSFER, fileSnapshot, blobPath, listener, writePriority);
} else {
uploadBlob(fileSnapshot, listener, blobPath, writePriority);
uploadBlob(fileSnapshot, fileMetadata, listener, blobPath, writePriority);
}
});

}

public Map<String, String> buildFileMetadata(InputStream fileMetadata) throws IOException {
Map<String, String> metadata = new HashMap<>();
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
byte[] buffer = new byte[4096];
int bytesRead;

while ((bytesRead = fileMetadata.read(buffer)) != -1) {
byteArrayOutputStream.write(buffer, 0, bytesRead);
}

byte[] bytes = byteArrayOutputStream.toByteArray();
String metadataString = Base64.getEncoder().encodeToString(bytes);
metadata.put("ckp-data", metadataString);
return metadata;
}

private void uploadBlob(
TransferFileSnapshot fileSnapshot,
InputStream fileMetadata,
ActionListener<TransferFileSnapshot> listener,
BlobPath blobPath,
WritePriority writePriority
) {

try {
ChannelFactory channelFactory = FileChannel::open;
Map<String, String> metadata = null;
if(fileMetadata != null){
metadata = buildFileMetadata(fileMetadata);
}
long contentLength;
try (FileChannel channel = channelFactory.open(fileSnapshot.getPath(), StandardOpenOption.READ)) {
contentLength = channel.size();
Expand All @@ -130,7 +151,8 @@ private void uploadBlob(
writePriority,
(size, position) -> new OffsetRangeFileInputStream(fileSnapshot.getPath(), size, position),
Objects.requireNonNull(fileSnapshot.getChecksum()),
remoteIntegrityEnabled
remoteIntegrityEnabled,
metadata
);
ActionListener<Void> completionListener = ActionListener.wrap(resp -> listener.onResponse(fileSnapshot), ex -> {
logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", fileSnapshot.getName()), ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;

/**
Expand Down Expand Up @@ -108,6 +109,7 @@ public static class TransferFileSnapshot extends FileSnapshot {

private final long primaryTerm;
private Long checksum;
private Map<String, String> metadata;

public TransferFileSnapshot(Path path, long primaryTerm, Long checksum) throws IOException {
super(path);
Expand All @@ -128,6 +130,14 @@ public long getPrimaryTerm() {
return primaryTerm;
}

public void setMetadata(Map<String, String> metadata) {
this.metadata = metadata;
}

public Map<String, String> getMetadata() {
return metadata;
}

@Override
public int hashCode() {
return Objects.hash(primaryTerm, super.hashCode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,20 @@ void uploadBlobs(
WritePriority writePriority
) throws Exception;

/**
* Uploads multiple {@link TransferFileSnapshot}, once the upload is complete the callback is invoked
* @param fileSnapshots the file snapshots to upload
* @param blobPaths Primary term to {@link BlobPath} map
* @param listener the callback to be invoked once uploads complete successfully/fail
*/
void uploadBlobs(
Set<TransferFileSnapshot> fileSnapshots,
final Map<Long, BlobPath> blobPaths,
final Map<TransferFileSnapshot, InputStream> fileMetadataMap,
ActionListener<TransferFileSnapshot> listener,
WritePriority writePriority
) throws Exception;

/**
* Uploads the {@link TransferFileSnapshot} blob
* @param fileSnapshot the file snapshot to upload
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot;
import org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot;

import java.io.IOException;
import java.util.Map;
import java.util.Set;

/**
Expand Down Expand Up @@ -39,4 +41,6 @@ public interface TransferSnapshot {
* @return the translog transfer metadata
*/
TranslogTransferMetadata getTranslogTransferMetadata();

Map<TranslogFileSnapshot, CheckpointFileSnapshot> getTranslogCheckpointSnapshotMap();
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,10 @@

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
Expand Down Expand Up @@ -64,6 +62,14 @@ public Set<TransferFileSnapshot> getTranslogFileSnapshots() {
return translogCheckpointFileInfoTupleSet.stream().map(Tuple::v1).collect(Collectors.toSet());
}

public Map<TranslogFileSnapshot, CheckpointFileSnapshot> getTranslogCheckpointSnapshotMap() {
Map<TranslogFileSnapshot, CheckpointFileSnapshot> tlogCkpSnapshots = new HashMap<>();
translogCheckpointFileInfoTupleSet.forEach(tuple -> {
tlogCkpSnapshots.put(tuple.v1(), tuple.v2());
});
return tlogCkpSnapshots;
}

@Override
public TranslogTransferMetadata getTranslogTransferMetadata() {
return new TranslogTransferMetadata(
Expand Down
Loading

0 comments on commit 6724043

Please sign in to comment.