diff --git a/server/src/main/java/org/opensearch/action/admin/indices/recovery/TransportRecoveryAction.java b/server/src/main/java/org/opensearch/action/admin/indices/recovery/TransportRecoveryAction.java index dd5ae31c01e56..7c3666e44f093 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/recovery/TransportRecoveryAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/recovery/TransportRecoveryAction.java @@ -87,7 +87,7 @@ public TransportRecoveryAction( @Override protected RecoveryState readShardResult(StreamInput in) throws IOException { - return RecoveryState.readRecoveryState(in); + return new RecoveryState(in); } @Override diff --git a/server/src/main/java/org/opensearch/common/concurrent/GatedAutoCloseable.java b/server/src/main/java/org/opensearch/common/concurrent/AutoCloseableRefCounted.java similarity index 57% rename from server/src/main/java/org/opensearch/common/concurrent/GatedAutoCloseable.java rename to server/src/main/java/org/opensearch/common/concurrent/AutoCloseableRefCounted.java index cb819c0320e91..795d352542881 100644 --- a/server/src/main/java/org/opensearch/common/concurrent/GatedAutoCloseable.java +++ b/server/src/main/java/org/opensearch/common/concurrent/AutoCloseableRefCounted.java @@ -13,20 +13,19 @@ package org.opensearch.common.concurrent; +import org.opensearch.common.util.concurrent.RefCounted; + /** - * Decorator class that wraps an object reference with a {@link Runnable} that is - * invoked when {@link #close()} is called. The internal {@link OneWayGate} instance ensures - * that this is invoked only once. See also {@link GatedCloseable} + * Adapter class that enables a {@link RefCounted} implementation to function like an {@link AutoCloseable}. + * The {@link #close()} API invokes {@link RefCounted#decRef()} and ensures idempotency using a {@link OneWayGate}. */ -public class GatedAutoCloseable implements AutoCloseable { +public class AutoCloseableRefCounted implements AutoCloseable { private final T ref; - private final Runnable onClose; private final OneWayGate gate; - public GatedAutoCloseable(T ref, Runnable onClose) { + public AutoCloseableRefCounted(T ref) { this.ref = ref; - this.onClose = onClose; gate = new OneWayGate(); } @@ -37,7 +36,7 @@ public T get() { @Override public void close() { if (gate.close()) { - onClose.run(); + ref.decRef(); } } } diff --git a/server/src/main/java/org/opensearch/common/concurrent/GatedCloseable.java b/server/src/main/java/org/opensearch/common/concurrent/GatedCloseable.java index d98e4cca8d561..467b5e4cfb3ea 100644 --- a/server/src/main/java/org/opensearch/common/concurrent/GatedCloseable.java +++ b/server/src/main/java/org/opensearch/common/concurrent/GatedCloseable.java @@ -21,7 +21,7 @@ /** * Decorator class that wraps an object reference with a {@link CheckedRunnable} that is * invoked when {@link #close()} is called. The internal {@link OneWayGate} instance ensures - * that this is invoked only once. See also {@link GatedAutoCloseable} + * that this is invoked only once. See also {@link AutoCloseableRefCounted} */ public class GatedCloseable implements Closeable { diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java index d7c3421b1de93..9348988f8edcc 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java @@ -70,6 +70,7 @@ import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.TranslogCorruptedException; import org.opensearch.indices.recovery.RecoveriesCollection.RecoveryRef; +import org.opensearch.indices.replication.common.ReplicationTimer; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.ConnectTransportException; @@ -215,7 +216,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi final String actionName; final TransportRequest requestToSend; final StartRecoveryRequest startRequest; - final RecoveryState.Timer timer; + final ReplicationTimer timer; try (RecoveryRef recoveryRef = onGoingRecoveries.getRecovery(recoveryId)) { if (recoveryRef == null) { logger.trace("not running recovery with id [{}] - can not find it (probably finished)", recoveryId); @@ -622,9 +623,9 @@ private class RecoveryResponseHandler implements TransportResponseHandler { + public static class RecoveryRef extends AutoCloseableRefCounted { /** * Important: {@link RecoveryTarget#tryIncRef()} should * be *successfully* called on status before */ public RecoveryRef(RecoveryTarget status) { - super(status, status::decRef); + super(status); status.setLastAccessTime(); } } diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryState.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryState.java index d89d59e2f2c1b..9f57a0ebd4d0f 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryState.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryState.java @@ -50,6 +50,7 @@ import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; import org.opensearch.index.store.StoreStats; +import org.opensearch.indices.replication.common.ReplicationTimer; import java.io.IOException; import java.util.ArrayList; @@ -122,7 +123,7 @@ public static Stage fromId(byte id) { private final Index index; private final Translog translog; private final VerifyIndex verifyIndex; - private final Timer timer; + private final ReplicationTimer timer; private RecoverySource recoverySource; private ShardId shardId; @@ -149,12 +150,12 @@ public RecoveryState(ShardRouting shardRouting, DiscoveryNode targetNode, @Nulla this.index = index; translog = new Translog(); verifyIndex = new VerifyIndex(); - timer = new Timer(); + timer = new ReplicationTimer(); timer.start(); } public RecoveryState(StreamInput in) throws IOException { - timer = new Timer(in); + timer = new ReplicationTimer(in); stage = Stage.fromId(in.readByte()); shardId = new ShardId(in); recoverySource = RecoverySource.readFrom(in); @@ -256,7 +257,7 @@ public Translog getTranslog() { return translog; } - public Timer getTimer() { + public ReplicationTimer getTimer() { return timer; } @@ -280,10 +281,6 @@ public boolean getPrimary() { return primary; } - public static RecoveryState readRecoveryState(StreamInput in) throws IOException { - return new RecoveryState(in); - } - @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { @@ -291,9 +288,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(Fields.TYPE, recoverySource.getType()); builder.field(Fields.STAGE, stage.toString()); builder.field(Fields.PRIMARY, primary); - builder.timeField(Fields.START_TIME_IN_MILLIS, Fields.START_TIME, timer.startTime); - if (timer.stopTime > 0) { - builder.timeField(Fields.STOP_TIME_IN_MILLIS, Fields.STOP_TIME, timer.stopTime); + builder.timeField(Fields.START_TIME_IN_MILLIS, Fields.START_TIME, timer.startTime()); + if (timer.stopTime() > 0) { + builder.timeField(Fields.STOP_TIME_IN_MILLIS, Fields.STOP_TIME, timer.stopTime()); } builder.humanReadableField(Fields.TOTAL_TIME_IN_MILLIS, Fields.TOTAL_TIME, new TimeValue(timer.time())); @@ -375,78 +372,7 @@ static final class Fields { static final String TARGET_THROTTLE_TIME_IN_MILLIS = "target_throttle_time_in_millis"; } - public static class Timer implements Writeable { - protected long startTime = 0; - protected long startNanoTime = 0; - protected long time = -1; - protected long stopTime = 0; - - public Timer() {} - - public Timer(StreamInput in) throws IOException { - startTime = in.readVLong(); - startNanoTime = in.readVLong(); - stopTime = in.readVLong(); - time = in.readVLong(); - } - - @Override - public synchronized void writeTo(StreamOutput out) throws IOException { - out.writeVLong(startTime); - out.writeVLong(startNanoTime); - out.writeVLong(stopTime); - // write a snapshot of current time, which is not per se the time field - out.writeVLong(time()); - } - - public synchronized void start() { - assert startTime == 0 : "already started"; - startTime = System.currentTimeMillis(); - startNanoTime = System.nanoTime(); - } - - /** Returns start time in millis */ - public synchronized long startTime() { - return startTime; - } - - /** Returns elapsed time in millis, or 0 if timer was not started */ - public synchronized long time() { - if (startNanoTime == 0) { - return 0; - } - if (time >= 0) { - return time; - } - return Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startNanoTime)); - } - - /** Returns stop time in millis */ - public synchronized long stopTime() { - return stopTime; - } - - public synchronized void stop() { - assert stopTime == 0 : "already stopped"; - stopTime = Math.max(System.currentTimeMillis(), startTime); - time = TimeValue.nsecToMSec(System.nanoTime() - startNanoTime); - assert time >= 0; - } - - public synchronized void reset() { - startTime = 0; - startNanoTime = 0; - time = -1; - stopTime = 0; - } - - // for tests - public long getStartNanoTime() { - return startNanoTime; - } - } - - public static class VerifyIndex extends Timer implements ToXContentFragment, Writeable { + public static class VerifyIndex extends ReplicationTimer implements ToXContentFragment, Writeable { private volatile long checkIndexTime; public VerifyIndex() {} @@ -483,7 +409,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } } - public static class Translog extends Timer implements ToXContentFragment, Writeable { + public static class Translog extends ReplicationTimer implements ToXContentFragment, Writeable { public static final int UNKNOWN = -1; private int recovered; @@ -819,7 +745,7 @@ public boolean isComplete() { } } - public static class Index extends Timer implements ToXContentFragment, Writeable { + public static class Index extends ReplicationTimer implements ToXContentFragment, Writeable { private final RecoveryFilesDetails fileDetails; public static final long UNKNOWN = -1L; diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTimer.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTimer.java new file mode 100644 index 0000000000000..976df28265d9a --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTimer.java @@ -0,0 +1,97 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication.common; + +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.common.unit.TimeValue; + +import java.io.IOException; + +/** + * A serializable timer that is used to measure the time taken for + * file replication operations like recovery. + */ +public class ReplicationTimer implements Writeable { + private long startTime = 0; + private long startNanoTime = 0; + private long time = -1; + private long stopTime = 0; + + public ReplicationTimer() {} + + public ReplicationTimer(StreamInput in) throws IOException { + startTime = in.readVLong(); + startNanoTime = in.readVLong(); + stopTime = in.readVLong(); + time = in.readVLong(); + } + + @Override + public synchronized void writeTo(StreamOutput out) throws IOException { + out.writeVLong(startTime); + out.writeVLong(startNanoTime); + out.writeVLong(stopTime); + // write a snapshot of current time, which is not per se the time field + out.writeVLong(time()); + } + + public synchronized void start() { + assert startTime == 0 : "already started"; + startTime = System.currentTimeMillis(); + startNanoTime = System.nanoTime(); + } + + /** + * Returns start time in millis + */ + public synchronized long startTime() { + return startTime; + } + + /** + * Returns elapsed time in millis, or 0 if timer was not started + */ + public synchronized long time() { + if (startNanoTime == 0) { + return 0; + } + if (time >= 0) { + return time; + } + return Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startNanoTime)); + } + + /** + * Returns stop time in millis + */ + public synchronized long stopTime() { + return stopTime; + } + + public synchronized void stop() { + assert stopTime == 0 : "already stopped"; + stopTime = Math.max(System.currentTimeMillis(), startTime); + time = TimeValue.nsecToMSec(System.nanoTime() - startNanoTime); + assert time >= 0; + } + + public synchronized void reset() { + startTime = 0; + startNanoTime = 0; + time = -1; + stopTime = 0; + } + + // only used in tests + public long getStartNanoTime() { + return startNanoTime; + } +} diff --git a/server/src/test/java/org/opensearch/common/concurrent/GatedAutoCloseableTests.java b/server/src/test/java/org/opensearch/common/concurrent/AutoCloseableRefCountedTests.java similarity index 50% rename from server/src/test/java/org/opensearch/common/concurrent/GatedAutoCloseableTests.java rename to server/src/test/java/org/opensearch/common/concurrent/AutoCloseableRefCountedTests.java index 63058da8f163a..344368988f5ff 100644 --- a/server/src/test/java/org/opensearch/common/concurrent/GatedAutoCloseableTests.java +++ b/server/src/test/java/org/opensearch/common/concurrent/AutoCloseableRefCountedTests.java @@ -14,33 +14,36 @@ package org.opensearch.common.concurrent; import org.junit.Before; +import org.opensearch.common.util.concurrent.RefCounted; import org.opensearch.test.OpenSearchTestCase; -import java.util.concurrent.atomic.AtomicInteger; +import static org.mockito.Mockito.atMostOnce; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; -public class GatedAutoCloseableTests extends OpenSearchTestCase { +public class AutoCloseableRefCountedTests extends OpenSearchTestCase { - private AtomicInteger testRef; - private GatedAutoCloseable testObject; + private RefCounted mockRefCounted; + private AutoCloseableRefCounted testObject; @Before public void setup() { - testRef = new AtomicInteger(0); - testObject = new GatedAutoCloseable<>(testRef, testRef::incrementAndGet); + mockRefCounted = mock(RefCounted.class); + testObject = new AutoCloseableRefCounted<>(mockRefCounted); } public void testGet() { - assertEquals(0, testObject.get().get()); + assertEquals(mockRefCounted, testObject.get()); } public void testClose() { testObject.close(); - assertEquals(1, testObject.get().get()); + verify(mockRefCounted, atMostOnce()).decRef(); } public void testIdempotent() { testObject.close(); testObject.close(); - assertEquals(1, testObject.get().get()); + verify(mockRefCounted, atMostOnce()).decRef(); } } diff --git a/server/src/test/java/org/opensearch/indices/recovery/RecoveryTargetTests.java b/server/src/test/java/org/opensearch/indices/recovery/RecoveryTargetTests.java index 5d0d9bca8b3fb..dd4b17fbac5de 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RecoveryTargetTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RecoveryTargetTests.java @@ -44,9 +44,9 @@ import org.opensearch.indices.recovery.RecoveryState.FileDetail; import org.opensearch.indices.recovery.RecoveryState.Index; import org.opensearch.indices.recovery.RecoveryState.Stage; -import org.opensearch.indices.recovery.RecoveryState.Timer; import org.opensearch.indices.recovery.RecoveryState.Translog; import org.opensearch.indices.recovery.RecoveryState.VerifyIndex; +import org.opensearch.indices.replication.common.ReplicationTimer; import org.opensearch.test.OpenSearchTestCase; import java.io.IOException; @@ -63,9 +63,7 @@ import static org.hamcrest.Matchers.closeTo; import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.startsWith; @@ -124,72 +122,81 @@ public void run() { } } - public void testTimers() throws Throwable { - final Timer timer; - Streamer streamer; + public void testTimer() throws Throwable { AtomicBoolean stop = new AtomicBoolean(); - if (randomBoolean()) { - timer = new Timer(); - streamer = new Streamer(stop, timer) { - @Override - Timer createObj(StreamInput in) throws IOException { - return new Timer(in); - } - }; - } else if (randomBoolean()) { - timer = new Index(); - streamer = new Streamer(stop, timer) { - @Override - Timer createObj(StreamInput in) throws IOException { - return new Index(in); - } - }; - } else if (randomBoolean()) { - timer = new VerifyIndex(); - streamer = new Streamer(stop, timer) { - @Override - Timer createObj(StreamInput in) throws IOException { - return new VerifyIndex(in); - } - }; - } else { - timer = new Translog(); - streamer = new Streamer(stop, timer) { - @Override - Timer createObj(StreamInput in) throws IOException { - return new Translog(in); - } - }; - } + final ReplicationTimer timer = new ReplicationTimer(); + Streamer streamer = new Streamer<>(stop, timer) { + @Override + ReplicationTimer createObj(StreamInput in) throws IOException { + return new ReplicationTimer(in); + } + }; + doTimerTest(timer, streamer); + } + + public void testIndexTimer() throws Throwable { + AtomicBoolean stop = new AtomicBoolean(); + Index index = new Index(); + Streamer streamer = new Streamer<>(stop, index) { + @Override + Index createObj(StreamInput in) throws IOException { + return new Index(in); + } + }; + doTimerTest(index, streamer); + } + public void testVerifyIndexTimer() throws Throwable { + AtomicBoolean stop = new AtomicBoolean(); + VerifyIndex verifyIndex = new VerifyIndex(); + Streamer streamer = new Streamer<>(stop, verifyIndex) { + @Override + VerifyIndex createObj(StreamInput in) throws IOException { + return new VerifyIndex(in); + } + }; + doTimerTest(verifyIndex, streamer); + } + + public void testTranslogTimer() throws Throwable { + AtomicBoolean stop = new AtomicBoolean(); + Translog translog = new Translog(); + Streamer streamer = new Streamer<>(stop, translog) { + @Override + Translog createObj(StreamInput in) throws IOException { + return new Translog(in); + } + }; + doTimerTest(translog, streamer); + } + + private void doTimerTest(ReplicationTimer timer, Streamer streamer) throws Exception { timer.start(); - assertThat(timer.startTime(), greaterThan(0L)); - assertThat(timer.stopTime(), equalTo(0L)); - Timer lastRead = streamer.serializeDeserialize(); + assertTrue(timer.startTime() > 0); + assertEquals(0, timer.stopTime()); + ReplicationTimer lastRead = streamer.serializeDeserialize(); final long time = lastRead.time(); - assertThat(time, lessThanOrEqualTo(timer.time())); - assertBusy(() -> assertThat("timer timer should progress compared to captured one ", time, lessThan(timer.time()))); - assertThat("captured time shouldn't change", lastRead.time(), equalTo(time)); + assertBusy(() -> assertTrue("timer timer should progress compared to captured one ", time < timer.time())); + assertEquals("captured time shouldn't change", time, lastRead.time()); - if (randomBoolean()) { - timer.stop(); - assertThat(timer.stopTime(), greaterThanOrEqualTo(timer.startTime())); - assertThat(timer.time(), greaterThan(0L)); - lastRead = streamer.serializeDeserialize(); - assertThat(lastRead.startTime(), equalTo(timer.startTime())); - assertThat(lastRead.time(), equalTo(timer.time())); - assertThat(lastRead.stopTime(), equalTo(timer.stopTime())); - } + timer.stop(); + assertTrue(timer.stopTime() >= timer.startTime()); + assertTrue(timer.time() > 0); + // validate captured time + lastRead = streamer.serializeDeserialize(); + assertEquals(timer.startTime(), lastRead.startTime()); + assertEquals(timer.time(), lastRead.time()); + assertEquals(timer.stopTime(), lastRead.stopTime()); timer.reset(); - assertThat(timer.startTime(), equalTo(0L)); - assertThat(timer.time(), equalTo(0L)); - assertThat(timer.stopTime(), equalTo(0L)); + assertEquals(0, timer.startTime()); + assertEquals(0, timer.time()); + assertEquals(0, timer.stopTime()); + // validate captured time lastRead = streamer.serializeDeserialize(); - assertThat(lastRead.startTime(), equalTo(0L)); - assertThat(lastRead.time(), equalTo(0L)); - assertThat(lastRead.stopTime(), equalTo(0L)); - + assertEquals(0, lastRead.startTime()); + assertEquals(0, lastRead.time()); + assertEquals(0, lastRead.stopTime()); } public void testIndex() throws Throwable { diff --git a/server/src/test/java/org/opensearch/rest/action/cat/RestRecoveryActionTests.java b/server/src/test/java/org/opensearch/rest/action/cat/RestRecoveryActionTests.java index 7966d2961c29a..e7eb9cbf24015 100644 --- a/server/src/test/java/org/opensearch/rest/action/cat/RestRecoveryActionTests.java +++ b/server/src/test/java/org/opensearch/rest/action/cat/RestRecoveryActionTests.java @@ -45,6 +45,7 @@ import org.opensearch.index.Index; import org.opensearch.index.shard.ShardId; import org.opensearch.indices.recovery.RecoveryState; +import org.opensearch.indices.replication.common.ReplicationTimer; import org.opensearch.test.OpenSearchTestCase; import java.util.ArrayList; @@ -72,7 +73,7 @@ public void testRestRecoveryAction() { for (int i = 0; i < successfulShards; i++) { final RecoveryState state = mock(RecoveryState.class); when(state.getShardId()).thenReturn(new ShardId(new Index("index", "_na_"), i)); - final RecoveryState.Timer timer = mock(RecoveryState.Timer.class); + final ReplicationTimer timer = mock(ReplicationTimer.class); final long startTime = randomLongBetween(0, new Date().getTime()); when(timer.startTime()).thenReturn(startTime); final long time = randomLongBetween(1000000, 10 * 1000000);