Skip to content

Commit

Permalink
Merge branch '2.x' into backport-9622-to-2.x
Browse files Browse the repository at this point in the history
Signed-off-by: Jay Deng <[email protected]>
  • Loading branch information
jed326 authored Aug 31, 2023
2 parents 141a5f5 + a3548ba commit 3cac1e5
Show file tree
Hide file tree
Showing 40 changed files with 2,312 additions and 805 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add support for reading partial files to HDFS repository ([#9513](https://github.com/opensearch-project/OpenSearch/issues/9513))
- [Remote Store] Rate limiter integration for remote store uploads and downloads([#9448](https://github.com/opensearch-project/OpenSearch/pull/9448/))
- Add support for extensions to search responses using SearchExtBuilder ([#9379](https://github.com/opensearch-project/OpenSearch/pull/9379))
- [Remote State] Create service to publish cluster state to remote store ([#9160](https://github.com/opensearch-project/OpenSearch/pull/9160))
- Add concurrent segment search related metrics to node and index stats ([#9622](https://github.com/opensearch-project/OpenSearch/issues/9622))

### Dependencies
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.benchmark.index.mapper;

import org.apache.lucene.util.BytesRef;
import org.opensearch.index.mapper.BinaryFieldMapper;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

/**
 * JMH micro-benchmark for {@link BinaryFieldMapper.CustomBinaryDocValuesField}.
 * <p>
 * Two operations are measured, each across every combination of the {@code maximumNumberOfEntries} and
 * {@code entrySize} parameters declared in {@link BenchmarkParameters}:
 * <ul>
 *   <li>{@link #add}: building a fresh field and adding {@code maximumNumberOfEntries} random entries to it;</li>
 *   <li>{@link #binaryValue}: calling {@code binaryValue()} on a field that was pre-populated during setup.</li>
 * </ul>
 */
@Warmup(iterations = 1)
@Measurement(iterations = 1)
@Fork(1)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Thread)
@SuppressWarnings("unused") // invoked by benchmarking framework
public class CustomBinaryDocValuesFieldBenchmark {

    static final String FIELD_NAME = "dummy";
    static final String SEED_VALUE = "seed";

    /**
     * Measures the cost of populating a brand-new field with {@code maximumNumberOfEntries} entries.
     *
     * @param parameters benchmark state supplying the entry count and the reusable entry buffer
     * @param blackhole  JMH sink that receives the populated field so the work cannot be optimised away
     */
    @Benchmark
    public void add(CustomBinaryDocValuesFieldBenchmark.BenchmarkParameters parameters, Blackhole blackhole) {
        // Don't use the parameter binary doc values object.
        // Start with a fresh object every call and add maximum number of entries
        // NOTE(review): BytesRef#bytes is the backing array and may be longer than the encoded seed - confirm this is intended.
        BinaryFieldMapper.CustomBinaryDocValuesField customBinaryDocValuesField = new BinaryFieldMapper.CustomBinaryDocValuesField(
            FIELD_NAME,
            new BytesRef(SEED_VALUE).bytes
        );
        for (int i = 0; i < parameters.maximumNumberOfEntries; ++i) {
            ThreadLocalRandom.current().nextBytes(parameters.bytes);
            customBinaryDocValuesField.add(parameters.bytes);
        }
        // The field never escapes this method otherwise; hand it to the Blackhole so the JIT cannot treat the
        // allocation and the add() calls as dead code.
        blackhole.consume(customBinaryDocValuesField);
    }

    /**
     * Measures {@code binaryValue()} on the field that {@link BenchmarkParameters#setup()} pre-populated.
     *
     * @param parameters benchmark state holding the pre-populated field
     * @param blackhole  JMH sink that receives the returned value
     */
    @Benchmark
    public void binaryValue(CustomBinaryDocValuesFieldBenchmark.BenchmarkParameters parameters, Blackhole blackhole) {
        blackhole.consume(parameters.customBinaryDocValuesField.binaryValue());
    }

    /**
     * Parameterised benchmark state: a pre-populated field for {@link #binaryValue} and a reusable
     * {@code entrySize}-byte buffer for {@link #add}.
     */
    @State(Scope.Benchmark)
    public static class BenchmarkParameters {
        /** Number of entries added to a field. */
        @Param({ "8", "32", "128", "512" })
        int maximumNumberOfEntries;

        /** Size in bytes of each entry. */
        @Param({ "8", "32", "128", "512" })
        int entrySize;

        BinaryFieldMapper.CustomBinaryDocValuesField customBinaryDocValuesField;
        byte[] bytes;

        /**
         * Allocates the entry buffer and fills {@link #customBinaryDocValuesField} with
         * {@link #maximumNumberOfEntries} random entries.
         */
        @Setup
        public void setup() {
            customBinaryDocValuesField = new BinaryFieldMapper.CustomBinaryDocValuesField(FIELD_NAME, new BytesRef(SEED_VALUE).bytes);
            bytes = new byte[entrySize];
            for (int i = 0; i < maximumNumberOfEntries; ++i) {
                ThreadLocalRandom.current().nextBytes(bytes);
                customBinaryDocValuesField.add(bytes);
            }
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,4 @@ public void setup() {
public void teardown() {
// Delete the repository named by REPOSITORY_NAME and assert that the delete request was acknowledged.
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
}

/**
* Runs the inherited {@code testPressureServiceStats} unchanged. The override exists only to attach the
* {@code @AwaitsFix} annotation referencing issue 7592.
* NOTE(review): presumably this mutes the test until that issue is resolved - confirm against the test framework.
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/7592")
@Override
public void testPressureServiceStats() throws Exception {
super.testPressureServiceStats();
}

/**
* Runs the inherited {@code testDropPrimaryDuringReplication} unchanged. The override exists only to attach the
* {@code @AwaitsFix} annotation referencing issue 8059.
* NOTE(review): presumably this mutes the test until that issue is resolved - confirm against the test framework.
*/
@Override
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8059")
public void testDropPrimaryDuringReplication() throws Exception {
super.testDropPrimaryDuringReplication();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.index.IndexService;
import org.opensearch.index.remote.RemoteSegmentTransferTracker;
import org.opensearch.index.remote.RemoteStorePressureService;
import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardNotFoundException;
import org.opensearch.indices.IndicesService;
Expand All @@ -50,7 +50,7 @@ public class TransportRemoteStoreStatsAction extends TransportBroadcastByNodeAct

private final IndicesService indicesService;

private final RemoteStorePressureService remoteStorePressureService;
private final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory;

@Inject
public TransportRemoteStoreStatsAction(
Expand All @@ -59,7 +59,7 @@ public TransportRemoteStoreStatsAction(
IndicesService indicesService,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
RemoteStorePressureService remoteStorePressureService
RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory
) {
super(
RemoteStoreStatsAction.NAME,
Expand All @@ -71,7 +71,7 @@ public TransportRemoteStoreStatsAction(
ThreadPool.Names.MANAGEMENT
);
this.indicesService = indicesService;
this.remoteStorePressureService = remoteStorePressureService;
this.remoteStoreStatsTrackerFactory = remoteStoreStatsTrackerFactory;
}

/**
Expand Down Expand Up @@ -153,7 +153,7 @@ protected RemoteStoreStats shardOperation(RemoteStoreStatsRequest request, Shard
throw new ShardNotFoundException(indexShard.shardId());
}

RemoteSegmentTransferTracker remoteSegmentTransferTracker = remoteStorePressureService.getRemoteRefreshSegmentTracker(
RemoteSegmentTransferTracker remoteSegmentTransferTracker = remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(
indexShard.shardId()
);
assert Objects.nonNull(remoteSegmentTransferTracker);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
import org.opensearch.gateway.DanglingIndicesState;
import org.opensearch.gateway.GatewayService;
import org.opensearch.gateway.PersistedClusterStateService;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.http.HttpTransportSettings;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
Expand All @@ -106,6 +107,7 @@
import org.opensearch.index.ShardIndexingPressureSettings;
import org.opensearch.index.ShardIndexingPressureStore;
import org.opensearch.index.remote.RemoteStorePressureSettings;
import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.indices.IndexingMemoryController;
import org.opensearch.indices.IndicesQueryCache;
Expand Down Expand Up @@ -657,13 +659,17 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteStorePressureSettings.BYTES_LAG_VARIANCE_FACTOR,
RemoteStorePressureSettings.UPLOAD_TIME_LAG_VARIANCE_FACTOR,
RemoteStorePressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT,
RemoteStorePressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE,
RemoteStorePressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE,
RemoteStorePressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE,

// Settings related to Remote Store stats
RemoteStoreStatsTrackerFactory.MOVING_AVERAGE_WINDOW_SIZE,

// Related to monitoring of task cancellation
TaskCancellationMonitoringSettings.IS_ENABLED_SETTING,
TaskCancellationMonitoringSettings.DURATION_MILLIS_SETTING
TaskCancellationMonitoringSettings.DURATION_MILLIS_SETTING,

// Remote cluster state settings
RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING,
RemoteClusterStateService.REMOTE_CLUSTER_STATE_REPOSITORY_SETTING
)
)
);
Expand Down
138 changes: 129 additions & 9 deletions server/src/main/java/org/opensearch/gateway/GatewayMetaState.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,11 @@
import org.opensearch.common.util.concurrent.OpenSearchThreadPoolExecutor;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.env.NodeMetadata;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.node.Node;
import org.opensearch.plugins.MetadataUpgrader;
import org.opensearch.repositories.RepositoryMissingException;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

Expand All @@ -85,19 +88,19 @@
/**
* Loads (and maybe upgrades) cluster metadata at startup, and persistently stores cluster metadata for future restarts.
*
* When started, ensures that this version is compatible with the state stored on disk, and performs a state upgrade if necessary. Note that
* the state being loaded when constructing the instance of this class is not necessarily the state that will be used as {@link
* ClusterState#metadata()} because it might be stale or incomplete. Cluster-manager-eligible nodes must perform an election to find a complete and
* non-stale state, and cluster-manager-ineligible nodes receive the real cluster state from the elected cluster-manager after joining the cluster.
* When started, ensures that this version is compatible with the state stored on disk, and performs a state upgrade if necessary. Note that the state being
* loaded when constructing the instance of this class is not necessarily the state that will be used as {@link ClusterState#metadata()} because it might be
* stale or incomplete. Cluster-manager-eligible nodes must perform an election to find a complete and non-stale state, and cluster-manager-ineligible nodes
* receive the real cluster state from the elected cluster-manager after joining the cluster.
*
* @opensearch.internal
*/
public class GatewayMetaState implements Closeable {

/**
* Fake node ID for a voting configuration written by a cluster-manager-ineligible data node to indicate that its on-disk state is potentially
* stale (since it is written asynchronously after application, rather than before acceptance). This node ID means that if the node is
* restarted as a cluster-manager-eligible node then it does not win any elections until it has received a fresh cluster state.
* Fake node ID for a voting configuration written by a cluster-manager-ineligible data node to indicate that its on-disk state is potentially stale (since
* it is written asynchronously after application, rather than before acceptance). This node ID means that if the node is restarted as a
* cluster-manager-eligible node then it does not win any elections until it has received a fresh cluster state.
*/
public static final String STALE_STATE_CONFIG_NODE_ID = "STALE_STATE_CONFIG";

Expand Down Expand Up @@ -235,8 +238,8 @@ Metadata upgradeMetadataForNode(
}

/**
* This method calls {@link MetadataIndexUpgradeService} to make sure that indices are compatible with the current
* version. The MetadataIndexUpgradeService might also update obsolete settings if needed.
* This method calls {@link MetadataIndexUpgradeService} to make sure that indices are compatible with the current version. The MetadataIndexUpgradeService
* might also update obsolete settings if needed.
*
* @return input <code>metadata</code> if no upgrade is needed or an upgraded metadata
*/
Expand Down Expand Up @@ -600,4 +603,121 @@ public void close() throws IOException {
IOUtils.close(persistenceWriter.getAndSet(null));
}
}

/**
* Encapsulates the writing of metadata to a remote store using {@link RemoteClusterStateService}.
*/
public static class RemotePersistedState implements PersistedState {

private static final Logger logger = LogManager.getLogger(RemotePersistedState.class);

private ClusterState lastAcceptedState;
private ClusterMetadataManifest lastAcceptedManifest;
private final RemoteClusterStateService remoteClusterStateService;

public RemotePersistedState(final RemoteClusterStateService remoteClusterStateService) {
this.remoteClusterStateService = remoteClusterStateService;
}

@Override
public long getCurrentTerm() {
return lastAcceptedState != null ? lastAcceptedState.term() : 0L;
}

@Override
public ClusterState getLastAcceptedState() {
return lastAcceptedState;
}

@Override
public void setCurrentTerm(long currentTerm) {
// no-op
// For LucenePersistedState, setCurrentTerm is used only while handling StartJoinRequest by all follower nodes.
// But for RemotePersistedState, the state is only pushed by the active cluster. So this method is not required.
}

@Override
public void setLastAcceptedState(ClusterState clusterState) {
try {
if (lastAcceptedState == null || lastAcceptedState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
// On the initial bootstrap, repository will not be available. So we do not persist the cluster state and bail out.
logger.info("Cluster is not yet ready to publish state to remote store");
lastAcceptedState = clusterState;
return;
}
final ClusterMetadataManifest manifest;
if (shouldWriteFullClusterState(clusterState)) {
manifest = remoteClusterStateService.writeFullMetadata(clusterState);
} else {
assert verifyManifestAndClusterState(lastAcceptedManifest, lastAcceptedState) == true
: "Previous manifest and previous ClusterState are not in sync";
manifest = remoteClusterStateService.writeIncrementalMetadata(lastAcceptedState, clusterState, lastAcceptedManifest);
}
assert verifyManifestAndClusterState(manifest, clusterState) == true : "Manifest and ClusterState are not in sync";
lastAcceptedManifest = manifest;
lastAcceptedState = clusterState;
} catch (RepositoryMissingException e) {
// TODO This logic needs to be modified once PR for repo registration during bootstrap is pushed
// https://github.com/opensearch-project/OpenSearch/pull/9105/
// After the above PR is pushed, we can remove this silent failure and throw the exception instead.
logger.error("Remote repository is not yet registered");
lastAcceptedState = clusterState;
} catch (Exception e) {
handleExceptionOnWrite(e);
}
}

private boolean verifyManifestAndClusterState(ClusterMetadataManifest manifest, ClusterState clusterState) {
assert manifest != null : "ClusterMetadataManifest is null";
assert clusterState != null : "ClusterState is null";
assert clusterState.metadata().indices().size() == manifest.getIndices().size()
: "Number of indices in last accepted state and manifest are different";
manifest.getIndices().stream().forEach(md -> {
assert clusterState.metadata().indices().containsKey(md.getIndexName())
: "Last accepted state does not contain the index : " + md.getIndexName();
assert clusterState.metadata().indices().get(md.getIndexName()).getIndexUUID().equals(md.getIndexUUID())
: "Last accepted state and manifest do not have same UUID for index : " + md.getIndexName();
});
return true;
}

private boolean shouldWriteFullClusterState(ClusterState clusterState) {
if (lastAcceptedState == null
|| lastAcceptedManifest == null
|| lastAcceptedState.term() != clusterState.term()
|| lastAcceptedManifest.getOpensearchVersion() != Version.CURRENT) {
return true;
}
return false;
}

@Override
public void markLastAcceptedStateAsCommitted() {
try {
if (lastAcceptedState == null
|| lastAcceptedManifest == null
|| lastAcceptedState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
// On the initial bootstrap, repository will not be available. So we do not persist the cluster state and bail out.
logger.trace("Cluster is not yet ready to publish state to remote store");
return;
}
final ClusterMetadataManifest committedManifest = remoteClusterStateService.markLastStateAsCommitted(
lastAcceptedState,
lastAcceptedManifest
);
lastAcceptedManifest = committedManifest;
} catch (Exception e) {
handleExceptionOnWrite(e);
}
}

@Override
public void close() throws IOException {
remoteClusterStateService.close();
}

private void handleExceptionOnWrite(Exception e) {
throw ExceptionsHelper.convertToRuntime(e);
}
}
}
Loading

0 comments on commit 3cac1e5

Please sign in to comment.