Skip to content

Commit

Permalink
Retry loading of external validators if failed
Browse files Browse the repository at this point in the history
  • Loading branch information
courtneyeh committed May 5, 2024
1 parent 80687ba commit 6494265
Show file tree
Hide file tree
Showing 7 changed files with 266 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,10 @@ public void reconstructedHistoricalBlocks(
totalToRecord);
}

public void failedToLoadPublicKeysFromUrl(final String url) {
log.error("Failed to load public keys from URL: {}", url);
}

public void failedToStartValidatorClient(final String message) {
log.fatal(
"An error was encountered during validator client service start up. Error: {}", message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,9 @@ private void initialise() {
new RejectingSlashingProtector(),
slashingProtectionLogger,
new PublicKeyLoader(
externalSignerHttpClientFactory, validatorConfig.getValidatorExternalSignerUrl()),
externalSignerHttpClientFactory,
validatorConfig.getValidatorExternalSignerUrl(),
asyncRunner),
asyncRunner,
metricsSystem,
dataDirLayout,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,8 @@ private static ValidatorLoader createValidatorLoader(
slashingProtectionLogger,
new PublicKeyLoader(
externalSignerHttpClientFactory,
config.getValidatorConfig().getValidatorExternalSignerUrl()),
config.getValidatorConfig().getValidatorExternalSignerUrl(),
asyncRunner),
asyncRunner,
services.getMetricsSystem(),
config.getValidatorRestApiConfig().isRestApiEnabled()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright Consensys Software Inc., 2024
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.validator.client.loader;

import static tech.pegasys.teku.infrastructure.logging.StatusLogger.STATUS_LOG;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.time.Duration;
import java.util.Arrays;
import java.util.stream.Stream;
import org.apache.tuweni.bytes.Bytes;
import tech.pegasys.teku.bls.BLSPublicKey;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.exceptions.InvalidConfigurationException;

public class ExternalUrlKeyReader {
private static final Duration FIXED_DELAY = Duration.ofSeconds(5);

private final String url;
private final ObjectMapper mapper;
private final AsyncRunner asyncRunner;

public ExternalUrlKeyReader(
final String url, final ObjectMapper mapper, final AsyncRunner asyncRunner) {
this.url = url;
this.mapper = mapper;
this.asyncRunner = asyncRunner;
}

public Stream<BLSPublicKey> readKeys() {
final String[] keys =
readUrl()
.exceptionallyCompose(
ex -> {
if (ex instanceof MalformedURLException) {
return SafeFuture.failedFuture(
new InvalidConfigurationException(
"Failed to load public keys from invalid URL: " + url, ex));
}
return retry();
})
.join();
return Arrays.stream(keys).map(key -> BLSPublicKey.fromSSZBytes(Bytes.fromHexString(key)));
}

public SafeFuture<String[]> readUrl() {
try {
return SafeFuture.completedFuture(mapper.readValue(new URL(url), String[].class));
} catch (IOException e) {
return SafeFuture.failedFuture(e);
}
}

public SafeFuture<String[]> retry() {
return asyncRunner
.runWithRetry(
() -> {
STATUS_LOG.failedToLoadPublicKeysFromUrl(url);
return readUrl();
},
FIXED_DELAY,
59)
.exceptionallyCompose(
ex ->
SafeFuture.failedFuture(
new InvalidConfigurationException(
"Failed to load public keys from URL: " + url, ex)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.tuweni.bytes.Bytes;
import tech.pegasys.teku.bls.BLSPublicKey;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.infrastructure.exceptions.InvalidConfigurationException;
import tech.pegasys.teku.infrastructure.http.HttpStatusCodes;

Expand All @@ -42,6 +44,7 @@ public class PublicKeyLoader {
final ObjectMapper objectMapper;
final Supplier<HttpClient> externalSignerHttpClientFactory;
final URL externalSignerUrl;
final AsyncRunner asyncRunner;

@VisibleForTesting
PublicKeyLoader() {
Expand All @@ -50,24 +53,35 @@ public class PublicKeyLoader {
() -> {
throw new UnsupportedOperationException();
},
null,
null);
}

public PublicKeyLoader(
final Supplier<HttpClient> externalSignerHttpClientFactory, final URL externalSignerUrl) {
this(new ObjectMapper(), externalSignerHttpClientFactory, externalSignerUrl);
final Supplier<HttpClient> externalSignerHttpClientFactory,
final URL externalSignerUrl,
final AsyncRunner asyncRunner) {
this(new ObjectMapper(), externalSignerHttpClientFactory, externalSignerUrl, asyncRunner);
}

public PublicKeyLoader(
final ObjectMapper objectMapper,
final Supplier<HttpClient> externalSignerHttpClientFactory,
final URL externalSignerUrl) {
final URL externalSignerUrl,
final AsyncRunner asyncRunner) {
this.objectMapper = objectMapper;
this.externalSignerHttpClientFactory = externalSignerHttpClientFactory;
this.externalSignerUrl = externalSignerUrl;
this.asyncRunner = asyncRunner;
}

public List<BLSPublicKey> getPublicKeys(final List<String> sources) {
return getPublicKeys(
sources, (url) -> new ExternalUrlKeyReader(url, objectMapper, asyncRunner));
}

public List<BLSPublicKey> getPublicKeys(
final List<String> sources, final Function<String, ExternalUrlKeyReader> urlReader) {
if (sources == null || sources.isEmpty()) {
return Collections.emptyList();
}
Expand All @@ -81,7 +95,7 @@ public List<BLSPublicKey> getPublicKeys(final List<String> sources) {
return readKeysFromExternalSigner();
}
if (key.contains(":")) {
return readKeysFromUrl(key);
return urlReader.apply(key).readKeys();
}

return Stream.of(BLSPublicKey.fromSSZBytes(Bytes.fromHexString(key)));
Expand All @@ -95,15 +109,6 @@ public List<BLSPublicKey> getPublicKeys(final List<String> sources) {
}
}

private Stream<BLSPublicKey> readKeysFromUrl(final String url) {
try {
final String[] keys = objectMapper.readValue(new URL(url), String[].class);
return Arrays.stream(keys).map(key -> BLSPublicKey.fromSSZBytes(Bytes.fromHexString(key)));
} catch (IOException ex) {
throw new InvalidConfigurationException("Failed to load public keys from URL " + url, ex);
}
}

private Stream<BLSPublicKey> readKeysFromExternalSigner() {
try {
final URI uri = externalSignerUrl.toURI().resolve(EXTERNAL_SIGNER_PUBKEYS_ENDPOINT);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Copyright Consensys Software Inc., 2024
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.validator.client.loader;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.net.ConnectException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.UnknownHostException;
import java.time.Duration;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;
import org.junit.jupiter.api.Test;
import tech.pegasys.teku.bls.BLSPublicKey;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.async.StubAsyncRunner;
import tech.pegasys.teku.infrastructure.exceptions.InvalidConfigurationException;
import tech.pegasys.teku.infrastructure.time.StubTimeProvider;
import tech.pegasys.teku.spec.TestSpecFactory;
import tech.pegasys.teku.spec.util.DataStructureUtil;

class ExternalUrlKeyReaderTest {
private static final Duration DELAY = Duration.ofSeconds(5);
private static final String VALID_URL = "http://test:0000/api/v1/eth2/publicKeys";
private final ObjectMapper mapper = mock(ObjectMapper.class);

private final StubTimeProvider timeProvider = StubTimeProvider.withTimeInMillis(0);
private final StubAsyncRunner asyncRunner = new StubAsyncRunner(timeProvider);

private final DataStructureUtil dataStructureUtil =
new DataStructureUtil(TestSpecFactory.createDefault());

final BLSPublicKey publicKey1 = dataStructureUtil.randomPublicKey();
final BLSPublicKey publicKey2 = dataStructureUtil.randomPublicKey();
final String[] expectedKeys = new String[] {publicKey1.toHexString(), publicKey2.toHexString()};

@Test
void readKeys_validUrlReturnsValidKeys() throws IOException {
when(mapper.readValue(any(URL.class), eq(String[].class))).thenReturn(expectedKeys);
final ExternalUrlKeyReader reader = new ExternalUrlKeyReader(VALID_URL, mapper, asyncRunner);

assertThat(reader.readKeys()).contains(publicKey1, publicKey2);
verify(mapper).readValue(any(URL.class), eq(String[].class));
}

@Test
void readKeys_validUrlReturnsEmptyKeys() throws IOException {
when(mapper.readValue(any(URL.class), eq(String[].class))).thenReturn(new String[] {});
final ExternalUrlKeyReader reader = new ExternalUrlKeyReader(VALID_URL, mapper, asyncRunner);

assertThat(reader.readKeys()).isEmpty();
verify(mapper).readValue(any(URL.class), eq(String[].class));
}

@Test
void readKeys_validUrlReturnsInvalidKeys() throws IOException {
when(mapper.readValue(any(URL.class), eq(String[].class)))
.thenReturn(new String[] {"invalid", "keys"});
final ExternalUrlKeyReader reader = new ExternalUrlKeyReader(VALID_URL, mapper, asyncRunner);

assertThatThrownBy(() -> reader.readKeys().collect(Collectors.toSet()))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid odd-length hex binary representation");
verify(mapper).readValue(any(URL.class), eq(String[].class));
}

@Test
void readKeys_malformedUrlString() {
final String invalidUrl = "invalid:url";
final ExternalUrlKeyReader reader = new ExternalUrlKeyReader(invalidUrl, mapper, asyncRunner);

assertThatThrownBy(reader::readKeys)
.isInstanceOf(CompletionException.class)
.hasCauseInstanceOf(InvalidConfigurationException.class)
.hasMessageContaining("Failed to load public keys from invalid URL: " + invalidUrl)
.hasRootCauseInstanceOf(MalformedURLException.class);
verifyNoInteractions(mapper);
}

@Test
void readKeysWithRetry_unreachableUrlRetryUntilReachable() throws IOException {
final UnknownHostException exception = new UnknownHostException("Unknown host");
when(mapper.readValue(any(URL.class), eq(String[].class)))
.thenThrow(exception, exception, exception)
.thenReturn(expectedKeys);
final ExternalUrlKeyReader reader = new ExternalUrlKeyReader(VALID_URL, mapper, asyncRunner);

final SafeFuture<String[]> keys = reader.retry();
for (int i = 0; i < 3; i++) {
assertThat(keys).isNotCompleted();
timeProvider.advanceTimeBy(DELAY);
asyncRunner.executeQueuedActions();
}
assertThat(keys).isCompletedWithValue(expectedKeys);
verify(mapper, times(4)).readValue(any(URL.class), eq(String[].class));
}

@Test
void readKeysWithRetry_unreachableUrlRetryUntilMaxRetries() throws IOException {
final IOException exception = new ConnectException("Connection refused");
when(mapper.readValue(any(URL.class), eq(String[].class))).thenThrow(exception);
final ExternalUrlKeyReader reader = new ExternalUrlKeyReader(VALID_URL, mapper, asyncRunner);

final SafeFuture<String[]> keys = reader.retry();
for (int i = 0; i < 59; i++) {
assertThat(keys).isNotCompleted();
timeProvider.advanceTimeBy(DELAY);
asyncRunner.executeQueuedActions();
}
assertThat(keys).isCompletedExceptionally();
assertThatThrownBy(keys::join)
.isInstanceOf(CompletionException.class)
.hasCauseInstanceOf(InvalidConfigurationException.class)
.hasRootCause(exception);
verify(mapper, times(60)).readValue(any(URL.class), eq(String[].class));
}
}
Loading

0 comments on commit 6494265

Please sign in to comment.