Skip to content

Commit

Permalink
Refactoring GatedAutoCloseable and moving RecoveryState.Timer (#2965) (
Browse files Browse the repository at this point in the history
…#3014)

* Refactoring GatedAutoCloseable to AutoCloseableRefCounted

This is a part of the process of merging our feature branch - feature/segment-replication - back into main by re-PRing our changes from the feature branch.
GatedAutoCloseable currently wraps a subclass of RefCounted. Segment replication adds another subclass, but this also wraps RefCounted. Both subclasses have the same shutdown hook - decRef. This change makes the superclass less generic to increase code convergence.

The breakdown of the plan to merge segment-replication to main is detailed in #2355
Segment replication design proposal - #2229

Signed-off-by: Kartik Ganesh <[email protected]>

* Minor refactoring in RecoveryState

This change makes two minor updates to RecoveryState -
1. The readRecoveryState API is removed because it can be replaced by an invocation of the constructor
2. The class members of the Timer inner class are changed to private, and accesses are only through the public APIs

Signed-off-by: Kartik Ganesh <[email protected]>

* Update RecoveryTargetTests to test Timer subclasses deterministically

This change removes the use of RandomBoolean in testing the Timer classes and creates a dedicated unit test for each. The common test logic is shared via a private method.

Signed-off-by: Kartik Ganesh <[email protected]>

* Move the RecoveryState.Timer class to a top-level class

This will eventually be reused across both replication use-cases - peer recovery and segment replication.

Signed-off-by: Kartik Ganesh <[email protected]>

* Further update of timer tests in RecoveryTargetTests

Removes a non-deterministic code path around stopping the timer, and avoids assertThat (deprecated)

Signed-off-by: Kartik Ganesh <[email protected]>

* Rename to ReplicationTimer

Signed-off-by: Kartik Ganesh <[email protected]>

* Remove RecoveryTargetTests assert on a running timer

Trying to serialize and deserialize a running Timer instance, and then checking for equality leads to flaky test failures when the ser/deser takes time.

Signed-off-by: Kartik Ganesh <[email protected]>
(cherry picked from commit c7c410a)

Co-authored-by: Kartik Ganesh <[email protected]>
  • Loading branch information
opensearch-trigger-bot[bot] and kartg authored Apr 21, 2022
1 parent 6567da6 commit 40edd54
Show file tree
Hide file tree
Showing 10 changed files with 206 additions and 172 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public TransportRecoveryAction(

@Override
protected RecoveryState readShardResult(StreamInput in) throws IOException {
return RecoveryState.readRecoveryState(in);
return new RecoveryState(in);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,19 @@

package org.opensearch.common.concurrent;

import org.opensearch.common.util.concurrent.RefCounted;

/**
* Decorator class that wraps an object reference with a {@link Runnable} that is
* invoked when {@link #close()} is called. The internal {@link OneWayGate} instance ensures
* that this is invoked only once. See also {@link GatedCloseable}
* Adapter class that enables a {@link RefCounted} implementation to function like an {@link AutoCloseable}.
* The {@link #close()} API invokes {@link RefCounted#decRef()} and ensures idempotency using a {@link OneWayGate}.
*/
public class GatedAutoCloseable<T> implements AutoCloseable {
public class AutoCloseableRefCounted<T extends RefCounted> implements AutoCloseable {

private final T ref;
private final Runnable onClose;
private final OneWayGate gate;

public GatedAutoCloseable(T ref, Runnable onClose) {
public AutoCloseableRefCounted(T ref) {
this.ref = ref;
this.onClose = onClose;
gate = new OneWayGate();
}

Expand All @@ -37,7 +36,7 @@ public T get() {
@Override
public void close() {
if (gate.close()) {
onClose.run();
ref.decRef();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
/**
* Decorator class that wraps an object reference with a {@link CheckedRunnable} that is
* invoked when {@link #close()} is called. The internal {@link OneWayGate} instance ensures
* that this is invoked only once. See also {@link GatedAutoCloseable}
* that this is invoked only once. See also {@link AutoCloseableRefCounted}
*/
public class GatedCloseable<T> implements Closeable {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogCorruptedException;
import org.opensearch.indices.recovery.RecoveriesCollection.RecoveryRef;
import org.opensearch.indices.replication.common.ReplicationTimer;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.ConnectTransportException;
Expand Down Expand Up @@ -215,7 +216,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
final String actionName;
final TransportRequest requestToSend;
final StartRecoveryRequest startRequest;
final RecoveryState.Timer timer;
final ReplicationTimer timer;
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecovery(recoveryId)) {
if (recoveryRef == null) {
logger.trace("not running recovery with id [{}] - can not find it (probably finished)", recoveryId);
Expand Down Expand Up @@ -622,9 +623,9 @@ private class RecoveryResponseHandler implements TransportResponseHandler<Recove

private final long recoveryId;
private final StartRecoveryRequest request;
private final RecoveryState.Timer timer;
private final ReplicationTimer timer;

private RecoveryResponseHandler(final StartRecoveryRequest request, final RecoveryState.Timer timer) {
private RecoveryResponseHandler(final StartRecoveryRequest request, final ReplicationTimer timer) {
this.recoveryId = request.recoveryId();
this.request = request;
this.timer = timer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchTimeoutException;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.concurrent.GatedAutoCloseable;
import org.opensearch.common.concurrent.AutoCloseableRefCounted;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
Expand Down Expand Up @@ -273,14 +273,14 @@ public boolean cancelRecoveriesForShard(ShardId shardId, String reason) {
* causes {@link RecoveryTarget#decRef()} to be called. This makes sure that the underlying resources
* will not be freed until {@link RecoveryRef#close()} is called.
*/
public static class RecoveryRef extends GatedAutoCloseable<RecoveryTarget> {
public static class RecoveryRef extends AutoCloseableRefCounted<RecoveryTarget> {

/**
* Important: {@link RecoveryTarget#tryIncRef()} should
* be *successfully* called on status before
*/
public RecoveryRef(RecoveryTarget status) {
super(status, status::decRef);
super(status);
status.setLastAccessTime();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.StoreStats;
import org.opensearch.indices.replication.common.ReplicationTimer;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -122,7 +123,7 @@ public static Stage fromId(byte id) {
private final Index index;
private final Translog translog;
private final VerifyIndex verifyIndex;
private final Timer timer;
private final ReplicationTimer timer;

private RecoverySource recoverySource;
private ShardId shardId;
Expand All @@ -149,12 +150,12 @@ public RecoveryState(ShardRouting shardRouting, DiscoveryNode targetNode, @Nulla
this.index = index;
translog = new Translog();
verifyIndex = new VerifyIndex();
timer = new Timer();
timer = new ReplicationTimer();
timer.start();
}

public RecoveryState(StreamInput in) throws IOException {
timer = new Timer(in);
timer = new ReplicationTimer(in);
stage = Stage.fromId(in.readByte());
shardId = new ShardId(in);
recoverySource = RecoverySource.readFrom(in);
Expand Down Expand Up @@ -256,7 +257,7 @@ public Translog getTranslog() {
return translog;
}

public Timer getTimer() {
public ReplicationTimer getTimer() {
return timer;
}

Expand All @@ -280,20 +281,16 @@ public boolean getPrimary() {
return primary;
}

public static RecoveryState readRecoveryState(StreamInput in) throws IOException {
return new RecoveryState(in);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {

builder.field(Fields.ID, shardId.id());
builder.field(Fields.TYPE, recoverySource.getType());
builder.field(Fields.STAGE, stage.toString());
builder.field(Fields.PRIMARY, primary);
builder.timeField(Fields.START_TIME_IN_MILLIS, Fields.START_TIME, timer.startTime);
if (timer.stopTime > 0) {
builder.timeField(Fields.STOP_TIME_IN_MILLIS, Fields.STOP_TIME, timer.stopTime);
builder.timeField(Fields.START_TIME_IN_MILLIS, Fields.START_TIME, timer.startTime());
if (timer.stopTime() > 0) {
builder.timeField(Fields.STOP_TIME_IN_MILLIS, Fields.STOP_TIME, timer.stopTime());
}
builder.humanReadableField(Fields.TOTAL_TIME_IN_MILLIS, Fields.TOTAL_TIME, new TimeValue(timer.time()));

Expand Down Expand Up @@ -375,78 +372,7 @@ static final class Fields {
static final String TARGET_THROTTLE_TIME_IN_MILLIS = "target_throttle_time_in_millis";
}

public static class Timer implements Writeable {
protected long startTime = 0;
protected long startNanoTime = 0;
protected long time = -1;
protected long stopTime = 0;

public Timer() {}

public Timer(StreamInput in) throws IOException {
startTime = in.readVLong();
startNanoTime = in.readVLong();
stopTime = in.readVLong();
time = in.readVLong();
}

@Override
public synchronized void writeTo(StreamOutput out) throws IOException {
out.writeVLong(startTime);
out.writeVLong(startNanoTime);
out.writeVLong(stopTime);
// write a snapshot of current time, which is not per se the time field
out.writeVLong(time());
}

public synchronized void start() {
assert startTime == 0 : "already started";
startTime = System.currentTimeMillis();
startNanoTime = System.nanoTime();
}

/** Returns start time in millis */
public synchronized long startTime() {
return startTime;
}

/** Returns elapsed time in millis, or 0 if timer was not started */
public synchronized long time() {
if (startNanoTime == 0) {
return 0;
}
if (time >= 0) {
return time;
}
return Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startNanoTime));
}

/** Returns stop time in millis */
public synchronized long stopTime() {
return stopTime;
}

public synchronized void stop() {
assert stopTime == 0 : "already stopped";
stopTime = Math.max(System.currentTimeMillis(), startTime);
time = TimeValue.nsecToMSec(System.nanoTime() - startNanoTime);
assert time >= 0;
}

public synchronized void reset() {
startTime = 0;
startNanoTime = 0;
time = -1;
stopTime = 0;
}

// for tests
public long getStartNanoTime() {
return startNanoTime;
}
}

public static class VerifyIndex extends Timer implements ToXContentFragment, Writeable {
public static class VerifyIndex extends ReplicationTimer implements ToXContentFragment, Writeable {
private volatile long checkIndexTime;

public VerifyIndex() {}
Expand Down Expand Up @@ -483,7 +409,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}
}

public static class Translog extends Timer implements ToXContentFragment, Writeable {
public static class Translog extends ReplicationTimer implements ToXContentFragment, Writeable {
public static final int UNKNOWN = -1;

private int recovered;
Expand Down Expand Up @@ -819,7 +745,7 @@ public boolean isComplete() {
}
}

public static class Index extends Timer implements ToXContentFragment, Writeable {
public static class Index extends ReplicationTimer implements ToXContentFragment, Writeable {
private final RecoveryFilesDetails fileDetails;

public static final long UNKNOWN = -1L;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.replication.common;

import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.common.unit.TimeValue;

import java.io.IOException;

/**
* A serializable timer that is used to measure the time taken for
* file replication operations like recovery.
*/
public class ReplicationTimer implements Writeable {
private long startTime = 0;
private long startNanoTime = 0;
private long time = -1;
private long stopTime = 0;

public ReplicationTimer() {}

public ReplicationTimer(StreamInput in) throws IOException {
startTime = in.readVLong();
startNanoTime = in.readVLong();
stopTime = in.readVLong();
time = in.readVLong();
}

@Override
public synchronized void writeTo(StreamOutput out) throws IOException {
out.writeVLong(startTime);
out.writeVLong(startNanoTime);
out.writeVLong(stopTime);
// write a snapshot of current time, which is not per se the time field
out.writeVLong(time());
}

public synchronized void start() {
assert startTime == 0 : "already started";
startTime = System.currentTimeMillis();
startNanoTime = System.nanoTime();
}

/**
* Returns start time in millis
*/
public synchronized long startTime() {
return startTime;
}

/**
* Returns elapsed time in millis, or 0 if timer was not started
*/
public synchronized long time() {
if (startNanoTime == 0) {
return 0;
}
if (time >= 0) {
return time;
}
return Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startNanoTime));
}

/**
* Returns stop time in millis
*/
public synchronized long stopTime() {
return stopTime;
}

public synchronized void stop() {
assert stopTime == 0 : "already stopped";
stopTime = Math.max(System.currentTimeMillis(), startTime);
time = TimeValue.nsecToMSec(System.nanoTime() - startNanoTime);
assert time >= 0;
}

public synchronized void reset() {
startTime = 0;
startNanoTime = 0;
time = -1;
stopTime = 0;
}

// only used in tests
public long getStartNanoTime() {
return startNanoTime;
}
}
Loading

0 comments on commit 40edd54

Please sign in to comment.