Skip to content

Commit

Permalink
[Segment Replication] [Backport] Fix timeout issue by calculating tim…
Browse files Browse the repository at this point in the history
…e needed to process getSegmentFiles. (opensearch-project#4434)

* Fix timeout issue by calculating time needed to process getSegmentFiles.

Signed-off-by: Rishikesh1159 <[email protected]>

* Fix PR link in change log.

Signed-off-by: Rishikesh1159 <[email protected]>

* Addressing comments from PR.

Signed-off-by: Rishikesh1159 <[email protected]>

Signed-off-by: Rishikesh1159 <[email protected]>
  • Loading branch information
Rishikesh1159 authored Sep 7, 2022
1 parent 32258cf commit 4de6932
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- [Segment Replication] Update flaky testOnNewCheckpointFromNewPrimaryCancelOngoingReplication unit test ([#4414](https://github.com/opensearch-project/OpenSearch/pull/4414))
- [Segment Replication] Extend FileChunkWriter to allow cancel on transport client ([#4386](https://github.com/opensearch-project/OpenSearch/pull/4386))
- [Segment Replication] Fix NoSuchFileExceptions with segment replication when computing primary metadata snapshots ([#4366](https://github.com/opensearch-project/OpenSearch/pull/4366))
- [Segment Replication] Fix timeout issue by calculating time needed to process getSegmentFiles ([#4434](https://github.com/opensearch-project/OpenSearch/pull/4434))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
import org.opensearch.action.ActionListener;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.recovery.RetryableTransportClient;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.transport.TransportRequestOptions;
import org.opensearch.transport.TransportService;

import java.util.List;
Expand Down Expand Up @@ -78,14 +80,28 @@ public void getSegmentFiles(
) {
final Writeable.Reader<GetSegmentFilesResponse> reader = GetSegmentFilesResponse::new;
final ActionListener<GetSegmentFilesResponse> responseListener = ActionListener.map(listener, r -> r);
// Few of the below assumptions and calculations are added for experimental release of segment replication feature in 2.3
// version. These can change in upcoming releases.

// Storing the size of files to fetch in bytes.
final long sizeOfSegmentFiles = filesToFetch.stream().mapToLong(file -> file.length()).sum();

// Maximum size of files to fetch (segment files) in bytes, that can be processed in 1 minute for a m5.xlarge machine.
long baseSegmentFilesSize = 100000000;

// Formula for calculating time needed to process a replication event's files to fetch process
final long timeToGetSegmentFiles = 1 + (sizeOfSegmentFiles / baseSegmentFilesSize);
final GetSegmentFilesRequest request = new GetSegmentFilesRequest(
replicationId,
targetAllocationId,
targetNode,
filesToFetch,
checkpoint
);
transportClient.executeRetryableAction(GET_SEGMENT_FILES, request, responseListener, reader);
final TransportRequestOptions options = TransportRequestOptions.builder()
.withTimeout(TimeValue.timeValueMinutes(timeToGetSegmentFiles))
.build();
transportClient.executeRetryableAction(GET_SEGMENT_FILES, request, options, responseListener, reader);
}

@Override
Expand Down

0 comments on commit 4de6932

Please sign in to comment.