Skip to content

Commit

Permalink
Create separate listener to track local segments file after refresh
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 committed Jul 20, 2023
1 parent 6cc8da5 commit 3910aab
Show file tree
Hide file tree
Showing 8 changed files with 185 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@

package org.opensearch.index.remote;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.common.CheckedFunction;
import org.opensearch.common.logging.Loggers;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
Expand All @@ -17,21 +21,26 @@
import org.opensearch.core.index.shard.ShardId;

import java.io.IOException;
import java.util.HashMap;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import static org.opensearch.index.shard.RemoteStoreRefreshListener.EXCLUDE_FILES;

/**
* Keeps track of remote refresh which happens in {@link org.opensearch.index.shard.RemoteStoreRefreshListener}. This consists of multiple critical metrics.
*
* @opensearch.internal
*/
public class RemoteRefreshSegmentTracker {

private final Logger logger;

/**
* ShardId for which this instance tracks the remote segment upload metadata.
*/
Expand Down Expand Up @@ -123,14 +132,14 @@ public class RemoteRefreshSegmentTracker {
private final Map<String, AtomicLong> rejectionCountMap = ConcurrentCollections.newConcurrentMap();

/**
* Map of name to size of the segment files created as part of the most recent refresh.
* Keeps track of segment files and their size in bytes which are part of the most recent refresh.
*/
private volatile Map<String, Long> latestLocalFileNameLengthMap;
private final Map<String, Long> latestLocalFileNameLengthMap = ConcurrentCollections.newConcurrentMap();

/**
* Set of names of segment files that were uploaded as part of the most recent remote refresh.
*/
private final Set<String> latestUploadedFiles = new HashSet<>();
private final Set<String> latestUploadedFiles = ConcurrentCollections.newConcurrentSet();

/**
* Keeps the bytes lag computed so that we do not compute it for every request.
Expand Down Expand Up @@ -175,6 +184,7 @@ public RemoteRefreshSegmentTracker(
int uploadBytesPerSecMovingAverageWindowSize,
int uploadTimeMsMovingAverageWindowSize
) {
logger = Loggers.getLogger(getClass(), shardId);
this.shardId = shardId;
// Both the local refresh time and remote refresh time are set with current time to give consistent view of time lag when it arises.
long currentClockTimeMs = System.currentTimeMillis();
Expand All @@ -186,8 +196,6 @@ public RemoteRefreshSegmentTracker(
uploadBytesMovingAverageReference = new AtomicReference<>(new MovingAverage(uploadBytesMovingAverageWindowSize));
uploadBytesPerSecMovingAverageReference = new AtomicReference<>(new MovingAverage(uploadBytesPerSecMovingAverageWindowSize));
uploadTimeMsMovingAverageReference = new AtomicReference<>(new MovingAverage(uploadTimeMsMovingAverageWindowSize));

latestLocalFileNameLengthMap = new HashMap<>();
}

ShardId getShardId() {
Expand Down Expand Up @@ -361,12 +369,36 @@ long getRejectionCount(String rejectionReason) {
return rejectionCountMap.get(rejectionReason).get();
}

Map<String, Long> getLatestLocalFileNameLengthMap() {
return latestLocalFileNameLengthMap;
public Map<String, Long> getLatestLocalFileNameLengthMap() {
return Collections.unmodifiableMap(latestLocalFileNameLengthMap);
}

public void setLatestLocalFileNameLengthMap(Map<String, Long> latestLocalFileNameLengthMap) {
this.latestLocalFileNameLengthMap = latestLocalFileNameLengthMap;
/**
 * Brings {@code latestLocalFileNameLengthMap} in line with the segment files of the most recent local refresh.
 * <p>
 * Every file in {@code segmentFiles} that is not listed in {@code EXCLUDE_FILES} and that is either missing from the map
 * or currently recorded with a size of {@code 0} has its size resolved through {@code fileSizeFunction} and stored. When
 * the size cannot be read, the failure is logged and the file is recorded with size {@code 0}, which makes it eligible
 * for another size lookup on a later call. Entries whose file name is not part of {@code segmentFiles} are then removed
 * from the map, and finally the bytes lag is recomputed.
 *
 * @param segmentFiles     names of the segment files that belong to the latest local refresh
 * @param fileSizeFunction function used to determine the size of a file in bytes from its name
 */
public void updateLatestLocalFileNameLengthMap(
    Collection<String> segmentFiles,
    CheckedFunction<String, Long, IOException> fileSizeFunction
) {
    for (String file : segmentFiles) {
        if (EXCLUDE_FILES.contains(file)) {
            continue;
        }
        // Read the entry once. A separate containsKey() + get() pair costs two lookups and, if the entry is removed
        // in between, get() returns null and the unboxing comparison against 0 throws a NullPointerException.
        Long knownSize = latestLocalFileNameLengthMap.get(file);
        if (knownSize != null && knownSize != 0) {
            // A non-zero size is already recorded for this file; keep it.
            continue;
        }
        long fileSize = 0;
        try {
            fileSize = fileSizeFunction.apply(file);
        } catch (IOException e) {
            // Best effort: keep tracking the file with size 0 so that the size is looked up again next time.
            logger.warn(new ParameterizedMessage("Exception while reading the fileLength of file={}", file), e);
        }
        latestLocalFileNameLengthMap.put(file, fileSize);
    }
    Set<String> fileSet = new HashSet<>(segmentFiles);
    // Remove keys from the fileSizeMap that do not exist in the latest segment files
    latestLocalFileNameLengthMap.entrySet().removeIf(entry -> fileSet.contains(entry.getKey()) == false);
    computeBytesLag();
}

Expand All @@ -382,7 +414,7 @@ public void setLatestUploadedFiles(Set<String> files) {
}

private void computeBytesLag() {
if (latestLocalFileNameLengthMap == null || latestLocalFileNameLengthMap.isEmpty()) {
if (latestLocalFileNameLengthMap.isEmpty()) {
return;
}
Set<String> filesNotYetUploaded = latestLocalFileNameLengthMap.keySet()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3675,6 +3675,9 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
}

if (isRemoteStoreEnabled()) {
internalRefreshListener.add(
new RemoteSegmentTrackerListener(this, remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(shardId()))
);
internalRefreshListener.add(
new RemoteStoreRefreshListener(
this,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.shard;

import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.logging.Loggers;
import org.opensearch.index.remote.RemoteRefreshSegmentTracker;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;

import java.io.IOException;
import java.util.Collection;

/**
 * Refresh listener that reports the outcome of a local refresh to the {@link RemoteRefreshSegmentTracker}: the refresh
 * timestamps, the local refresh sequence number and the segment files that make up the most recent refresh. The tracker
 * uses this to determine the lag, and hence to apply rejection on lagging remote uploads.
 *
 * @opensearch.internal
 */
public class RemoteSegmentTrackerListener implements ReferenceManager.RefreshListener {

    private final Logger logger;
    private final IndexShard indexShard;
    private final RemoteRefreshSegmentTracker segmentTracker;
    private final RemoteSegmentStoreDirectory remoteDirectory;
    private final Directory storeDirectory;
    private long primaryTerm;

    /**
     * Creates the listener for the given shard. The remote segment store directory is obtained by unwrapping two
     * {@link FilterDirectory} layers of the shard's remote store directory. On a primary shard the remote directory is
     * initialised here; a failure to do so is logged and construction continues.
     *
     * @param indexShard     shard whose refreshes are observed
     * @param segmentTracker tracker that receives the refresh updates
     */
    public RemoteSegmentTrackerListener(IndexShard indexShard, RemoteRefreshSegmentTracker segmentTracker) {
        this.indexShard = indexShard;
        this.segmentTracker = segmentTracker;
        this.logger = Loggers.getLogger(getClass(), indexShard.shardId());
        this.storeDirectory = indexShard.store().directory();

        final FilterDirectory outerWrapper = (FilterDirectory) indexShard.remoteStore().directory();
        final FilterDirectory innerWrapper = (FilterDirectory) outerWrapper.getDelegate();
        this.remoteDirectory = (RemoteSegmentStoreDirectory) innerWrapper.getDelegate();

        final boolean isPrimary = indexShard.routingEntry().primary();
        if (isPrimary) {
            try {
                remoteDirectory.init();
            } catch (IOException e) {
                logger.error("Exception while initialising RemoteSegmentStoreDirectory", e);
            }
        }
        this.primaryTerm = remoteDirectory.getPrimaryTermAtInit();
    }

    /**
     * Intentionally empty; all work happens in {@link #afterRefresh(boolean)}.
     */
    @Override
    public void beforeRefresh() throws IOException {}

    /**
     * Updates the tracker when the refresh changed something, when the operation primary term differs from the one
     * last seen by this listener, or when no segments have been uploaded to the remote store yet. Any failure while
     * updating the primary term or the file size map is logged and not propagated.
     *
     * @param didRefresh whether the refresh actually happened
     */
    @Override
    public void afterRefresh(boolean didRefresh) throws IOException {
        if (shouldUpdateTracker(didRefresh) == false) {
            return;
        }
        updateLocalRefreshTimeAndSeqNo();
        try {
            syncPrimaryTerm();
            try (GatedCloseable<SegmentInfos> snapshot = indexShard.getSegmentInfosSnapshot()) {
                final Collection<String> refreshedFiles = snapshot.get().files(true);
                // File sizes are resolved by the tracker through storeDirectory.fileLength(file).
                segmentTracker.updateLatestLocalFileNameLengthMap(refreshedFiles, storeDirectory::fileLength);
            }
        } catch (Throwable t) {
            logger.error("Exception in RemoteSegmentTrackerListener.afterRefresh()", t);
        }
    }

    /**
     * Decides whether this refresh needs to be reported to the tracker. The checks are evaluated in order and stop at
     * the first one that holds.
     */
    private boolean shouldUpdateTracker(boolean didRefresh) {
        if (didRefresh) {
            return true;
        }
        if (primaryTerm != indexShard.getOperationPrimaryTerm()) {
            return true;
        }
        return remoteDirectory.getSegmentsUploadedToRemoteStore().isEmpty();
    }

    /**
     * Adopts the shard's operation primary term when it differs from the one held by this listener.
     */
    private void syncPrimaryTerm() {
        if (primaryTerm != indexShard.getOperationPrimaryTerm()) {
            primaryTerm = indexShard.getOperationPrimaryTerm();
        }
    }

    /**
     * Records the local refresh in the tracker: the wall-clock time, a {@link System#nanoTime()} based time in
     * milliseconds, and the local refresh sequence number incremented by one.
     */
    private void updateLocalRefreshTimeAndSeqNo() {
        final long wallClockMs = System.currentTimeMillis();
        segmentTracker.updateLocalRefreshClockTimeMs(wallClockMs);
        final long nanoTimeAsMs = System.nanoTime() / 1_000_000L;
        segmentTracker.updateLocalRefreshTimeMs(nanoTimeAsMs);
        final long nextSeqNo = segmentTracker.getLocalRefreshSeqNo() + 1;
        segmentTracker.updateLocalRefreshSeqNo(nextSeqNo);
    }
}
Loading

0 comments on commit 3910aab

Please sign in to comment.