Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make index and global metadata upload timeout dynamic cluster settings #10814

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,8 @@ public void apply(Settings value, Settings current, Settings previous) {

// Remote cluster state settings
RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING,
RemoteClusterStateService.INDEX_METADATA_UPLOAD_TIMEOUT_SETTING,
RemoteClusterStateService.GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING,
RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING,
IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,23 @@ public class RemoteClusterStateService implements Closeable {

private static final Logger logger = LogManager.getLogger(RemoteClusterStateService.class);

// TODO make this two variable as dynamic setting [issue: #10688]
public static final int INDEX_METADATA_UPLOAD_WAIT_MILLIS = 20000;
public static final int GLOBAL_METADATA_UPLOAD_WAIT_MILLIS = 20000;
public static final TimeValue INDEX_METADATA_UPLOAD_TIMEOUT_DEFAULT = TimeValue.timeValueMillis(20000);

public static final TimeValue GLOBAL_METADATA_UPLOAD_TIMEOUT_DEFAULT = TimeValue.timeValueMillis(20000);

public static final Setting<TimeValue> INDEX_METADATA_UPLOAD_TIMEOUT_SETTING = Setting.timeSetting(
"cluster.remote_store.state.index_metadata.upload_timeout",
INDEX_METADATA_UPLOAD_TIMEOUT_DEFAULT,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

public static final Setting<TimeValue> GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING = Setting.timeSetting(
"cluster.remote_store.state.global_metadata.upload_timeout",
GLOBAL_METADATA_UPLOAD_TIMEOUT_DEFAULT,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

public static final ChecksumBlobStoreFormat<IndexMetadata> INDEX_METADATA_FORMAT = new ChecksumBlobStoreFormat<>(
"index-metadata",
Expand Down Expand Up @@ -141,6 +155,9 @@ public class RemoteClusterStateService implements Closeable {
private BlobStoreTransferService blobStoreTransferService;
private volatile TimeValue slowWriteLoggingThreshold;

private volatile TimeValue indexMetadataUploadTimeout;
private volatile TimeValue globalMetadataUploadTimeout;

private final AtomicBoolean deleteStaleMetadataRunning = new AtomicBoolean(false);

public static final int INDEX_METADATA_CURRENT_CODEC_VERSION = 1;
Expand Down Expand Up @@ -171,7 +188,11 @@ public RemoteClusterStateService(
this.relativeTimeNanosSupplier = relativeTimeNanosSupplier;
this.threadpool = threadPool;
this.slowWriteLoggingThreshold = clusterSettings.get(SLOW_WRITE_LOGGING_THRESHOLD);
this.indexMetadataUploadTimeout = clusterSettings.get(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING);
this.globalMetadataUploadTimeout = clusterSettings.get(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING);
clusterSettings.addSettingsUpdateConsumer(SLOW_WRITE_LOGGING_THRESHOLD, this::setSlowWriteLoggingThreshold);
clusterSettings.addSettingsUpdateConsumer(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING, this::setIndexMetadataUploadTimeout);
clusterSettings.addSettingsUpdateConsumer(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, this::setGlobalMetadataUploadTimeout);
}

private BlobStoreTransferService getBlobStoreTransferService() {
Expand Down Expand Up @@ -367,7 +388,7 @@ private String writeGlobalMetadata(ClusterState clusterState) throws IOException
);

try {
if (latch.await(GLOBAL_METADATA_UPLOAD_WAIT_MILLIS, TimeUnit.MILLISECONDS) == false) {
if (latch.await(getGlobalMetadataUploadTimeout().millis(), TimeUnit.MILLISECONDS) == false) {
// TODO: We should add metrics where transfer is timing out. [Issue: #10687]
GlobalMetadataTransferException ex = new GlobalMetadataTransferException(
String.format(Locale.ROOT, "Timed out waiting for transfer of global metadata to complete")
Expand Down Expand Up @@ -422,7 +443,7 @@ private List<UploadedIndexMetadata> writeIndexMetadataParallel(ClusterState clus
}

try {
if (latch.await(INDEX_METADATA_UPLOAD_WAIT_MILLIS, TimeUnit.MILLISECONDS) == false) {
if (latch.await(getIndexMetadataUploadTimeout().millis(), TimeUnit.MILLISECONDS) == false) {
IndexMetadataTransferException ex = new IndexMetadataTransferException(
String.format(
Locale.ROOT,
Expand Down Expand Up @@ -615,6 +636,22 @@ private void setSlowWriteLoggingThreshold(TimeValue slowWriteLoggingThreshold) {
this.slowWriteLoggingThreshold = slowWriteLoggingThreshold;
}

private void setIndexMetadataUploadTimeout(TimeValue newIndexMetadataUploadTimeout) {
this.indexMetadataUploadTimeout = newIndexMetadataUploadTimeout;
}

private void setGlobalMetadataUploadTimeout(TimeValue newGlobalMetadataUploadTimeout) {
this.globalMetadataUploadTimeout = newGlobalMetadataUploadTimeout;
}

public TimeValue getIndexMetadataUploadTimeout() {
return this.indexMetadataUploadTimeout;
}

public TimeValue getGlobalMetadataUploadTimeout() {
return this.globalMetadataUploadTimeout;
}

static String getManifestFileName(long term, long version, boolean committed) {
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest/manifest__<inverted_term>__<inverted_version>__C/P__<inverted__timestamp>__<codec_version>
return String.join(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
public class RemoteClusterStateServiceTests extends OpenSearchTestCase {

private RemoteClusterStateService remoteClusterStateService;
private ClusterSettings clusterSettings;
private Supplier<RepositoriesService> repositoriesServiceSupplier;
private RepositoriesService repositoriesService;
private BlobStoreRepository blobStoreRepository;
Expand Down Expand Up @@ -126,6 +127,7 @@ public void setup() {
.put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true)
.build();

clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
blobStoreRepository = mock(BlobStoreRepository.class);
blobStore = mock(BlobStore.class);
when(blobStoreRepository.blobStore()).thenReturn(blobStore);
Expand All @@ -135,7 +137,7 @@ public void setup() {
"test-node-id",
repositoriesServiceSupplier,
settings,
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
clusterSettings,
() -> 0L,
threadPool
);
Expand Down Expand Up @@ -1039,6 +1041,38 @@ public void testSingleConcurrentExecutionOfStaleManifestCleanup() throws Excepti
assertBusy(() -> assertEquals(1, callCount.get()));
}

public void testIndexMetadataUploadWaitTimeSetting() {
// verify default value
assertEquals(
RemoteClusterStateService.INDEX_METADATA_UPLOAD_TIMEOUT_DEFAULT,
remoteClusterStateService.getIndexMetadataUploadTimeout()
);

// verify update index metadata upload timeout
int indexMetadataUploadTimeout = randomIntBetween(1, 10);
Settings newSettings = Settings.builder()
.put("cluster.remote_store.state.index_metadata.upload_timeout", indexMetadataUploadTimeout + "s")
.build();
clusterSettings.applySettings(newSettings);
assertEquals(indexMetadataUploadTimeout, remoteClusterStateService.getIndexMetadataUploadTimeout().seconds());
}

public void testGlobalMetadataUploadWaitTimeSetting() {
// verify default value
assertEquals(
RemoteClusterStateService.GLOBAL_METADATA_UPLOAD_TIMEOUT_DEFAULT,
remoteClusterStateService.getGlobalMetadataUploadTimeout()
);

// verify update global metadata upload timeout
int globalMetadataUploadTimeout = randomIntBetween(1, 10);
Settings newSettings = Settings.builder()
.put("cluster.remote_store.state.global_metadata.upload_timeout", globalMetadataUploadTimeout + "s")
.build();
clusterSettings.applySettings(newSettings);
assertEquals(globalMetadataUploadTimeout, remoteClusterStateService.getGlobalMetadataUploadTimeout().seconds());
}

private void mockObjectsForGettingPreviousClusterUUID(Map<String, String> clusterUUIDsPointers) throws IOException {
final BlobPath blobPath = mock(BlobPath.class);
when((blobStoreRepository.basePath())).thenReturn(blobPath);
Expand Down
Loading