Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Downgrade write lock to read lock before translog upload to remote store #10135

Merged
merged 2 commits into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.common.SetOnce;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lease.Releasables;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.util.concurrent.ReleasableLock;
import org.opensearch.common.util.io.IOUtils;
Expand Down Expand Up @@ -60,7 +59,6 @@ public class RemoteFsTranslog extends Translog {
private final BooleanSupplier primaryModeSupplier;
private final RemoteTranslogTransferTracker remoteTranslogTransferTracker;
private volatile long maxRemoteTranslogGenerationUploaded;
private final Object uploadMutex = new Object();

private volatile long minSeqNoToKeep;

Expand Down Expand Up @@ -239,19 +237,10 @@ public static TranslogTransferManager buildTranslogTransferManager(
@Override
public boolean ensureSynced(Location location) throws IOException {
try {
boolean shouldUpload = false;
try (ReleasableLock ignored = writeLock.acquire()) {
assert location.generation <= current.getGeneration();
if (location.generation == current.getGeneration()) {
ensureOpen();
if (prepareForUpload(location.generation) == false) {
return false;
}
shouldUpload = true;
}
}
if (shouldUpload) {
return performUpload(primaryTermSupplier.getAsLong(), location.generation);
assert location.generation <= current.getGeneration();
if (location.generation == current.getGeneration()) {
ensureOpen();
return prepareAndUpload(primaryTermSupplier.getAsLong(), location.generation);
}
} catch (final Exception ex) {
closeOnTragicEvent(ex);
Expand All @@ -266,12 +255,10 @@ public void rollGeneration() throws IOException {
if (current.totalOperations() == 0 && primaryTermSupplier.getAsLong() == current.getPrimaryTerm()) {
return;
}
if (prepareForUpload(null)) {
performUpload(primaryTermSupplier.getAsLong(), null);
}
prepareAndUpload(primaryTermSupplier.getAsLong(), null);
}

private boolean prepareForUpload(Long generation) throws IOException {
private boolean prepareAndUpload(Long primaryTerm, Long generation) throws IOException {
try (Releasable ignored = writeLock.acquire()) {
if (generation == null || generation == current.getGeneration()) {
try {
Expand All @@ -282,46 +269,36 @@ private boolean prepareForUpload(Long generation) throws IOException {
logger.trace("Creating new writer for gen: [{}]", current.getGeneration() + 1);
current = createWriter(current.getGeneration() + 1);
}
assert writeLock.isHeldByCurrentThread() : "Write lock must be held before we acquire the read lock";
// Here we are downgrading the write lock by acquiring the read lock and releasing the write lock
// This ensures that other threads can still acquire the read locks while also protecting the
// readers and writer to not be mutated any further.
gbbafna marked this conversation as resolved.
Show resolved Hide resolved
readLock.acquire();
} catch (final Exception e) {
tragedy.setTragicException(e);
closeOnTragicEvent(e);
throw e;
}
return true;
} else return generation >= current.getGeneration();
} else if (generation < current.getGeneration()) {
return false;
}
}
}

/**
* This method does the remote store upload by first acquiring the lock on the uploadMutex monitor. The synchronized
* is required to restrict multiple uploads happening concurrently. The read lock is required to ensure that the
* underlying translog readers are not deleted and the current writer is not converted to a reader at the time of
* upload.
*
* @param primaryTerm current primary term
* @param generation current generation
* @return true if upload is successful
* @throws IOException if the upload fails due to any underlying exceptions.
*/
private boolean performUpload(Long primaryTerm, Long generation) throws IOException {
synchronized (uploadMutex) {
try (Releasable ignored = readLock.acquire()) {
// Do we need remote writes in sync fashion ?
// If we don't , we should swallow FileAlreadyExistsException while writing to remote store
// and also verify for same during primary-primary relocation
// Writing remote in sync fashion doesn't hurt as global ckp update
// is not updated in remote translog except in primary to primary recovery.
long generationToUpload;
if (generation == null) {
if (closed.get() == false) {
generationToUpload = current.getGeneration() - 1;
} else {
generationToUpload = current.getGeneration();
}
assert readLock.isHeldByCurrentThread() == true;
try (Releasable ignored = readLock; Releasable ignoredGenLock = deletionPolicy.acquireTranslogGen(getMinFileGeneration())) {
// Do we need remote writes in sync fashion ?
// If we don't , we should swallow FileAlreadyExistsException while writing to remote store
// and also verify for same during primary-primary relocation
// Writing remote in sync fashion doesn't hurt as global ckp update
// is not updated in remote translog except in primary to primary recovery.
if (generation == null) {
if (closed.get() == false) {
return upload(primaryTerm, current.getGeneration() - 1);
} else {
generationToUpload = generation;
return upload(primaryTerm, current.getGeneration());
}
return upload(primaryTerm, generationToUpload);
} else {
return upload(primaryTerm, generation);
}
}
}
Expand All @@ -347,10 +324,9 @@ private boolean upload(Long primaryTerm, Long generation) throws IOException {
Translog::getCommitCheckpointFileName
).build()
) {
Releasable transferReleasable = Releasables.wrap(deletionPolicy.acquireTranslogGen(getMinFileGeneration()));
return translogTransferManager.transferSnapshot(
transferSnapshotProvider,
new RemoteFsTranslogTransferListener(transferReleasable, generation, primaryTerm)
new RemoteFsTranslogTransferListener(generation, primaryTerm)
);
}

Expand All @@ -373,8 +349,8 @@ private boolean syncToDisk() throws IOException {
@Override
public void sync() throws IOException {
try {
if ((syncToDisk() || syncNeeded()) && prepareForUpload(null)) {
performUpload(primaryTermSupplier.getAsLong(), null);
if (syncToDisk() || syncNeeded()) {
prepareAndUpload(primaryTermSupplier.getAsLong(), null);
}
} catch (final Exception e) {
tragedy.setTragicException(e);
Expand Down Expand Up @@ -529,16 +505,17 @@ protected void onDelete() {
translogTransferManager.delete();
}

// Visible for testing
boolean isRemoteGenerationDeletionPermitsAvailable() {
return remoteGenerationDeletionPermits.availablePermits() == REMOTE_DELETION_PERMITS;
}

/**
* TranslogTransferListener implementation for RemoteFsTranslog
*
* @opensearch.internal
*/
private class RemoteFsTranslogTransferListener implements TranslogTransferListener {
/**
* Releasable instance for the translog
*/
private final Releasable transferReleasable;

/**
* Generation for the translog
Expand All @@ -550,8 +527,7 @@ private class RemoteFsTranslogTransferListener implements TranslogTransferListen
*/
private final Long primaryTerm;

RemoteFsTranslogTransferListener(Releasable transferReleasable, Long generation, Long primaryTerm) {
this.transferReleasable = transferReleasable;
RemoteFsTranslogTransferListener(Long generation, Long primaryTerm) {
this.generation = generation;
this.primaryTerm = primaryTerm;
}
Expand All @@ -571,10 +547,5 @@ public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) thro
throw (RuntimeException) ex;
}
}

@Override
public void close() {
transferReleasable.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ public TranslogCheckpointTransferSnapshot build() throws IOException {
translogTransferSnapshot.setMinTranslogGeneration(highestGenMinTranslogGeneration);

assert this.primaryTerm == highestGenPrimaryTerm : "inconsistent primary term";
assert this.generation == highestGeneration : " inconsistent generation ";
final long finalHighestGeneration = highestGeneration;
assert LongStream.iterate(lowestGeneration, i -> i + 1)
.limit(highestGeneration)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans
long prevUploadBytesSucceeded = remoteTranslogTransferTracker.getUploadBytesSucceeded();
long prevUploadTimeInMillis = remoteTranslogTransferTracker.getTotalUploadTimeInMillis();

try (translogTransferListener) {
try {
toUpload.addAll(fileTransferTracker.exclusionFilter(transferSnapshot.getTranslogFileSnapshots()));
toUpload.addAll(fileTransferTracker.exclusionFilter((transferSnapshot.getCheckpointFileSnapshots())));
if (toUpload.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*
* @opensearch.internal
*/
public interface TranslogTransferListener extends AutoCloseable {
public interface TranslogTransferListener {
/**
* Invoked when the transfer of {@link TransferSnapshot} succeeds
* @param transferSnapshot the transfer snapshot
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,7 @@ public void testSimpleOperationsUpload() throws Exception {

translog.setMinSeqNoToKeep(2);

assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable()));
translog.trimUnreferencedReaders();
assertEquals(1, translog.readers.size());
assertEquals(1, translog.stats().estimatedNumberOfOperations());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,6 @@ public void onUploadComplete(TransferSnapshot transferSnapshot) {
public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) {
translogTransferFailed.incrementAndGet();
}

@Override
public void close() {}
}));
assertEquals(4, fileTransferSucceeded.get());
assertEquals(0, fileTransferFailed.get());
Expand Down