Skip to content

Commit

Permalink
Merge pull request #3532 from ingef/feature/health-shard-heartbeat
Browse files Browse the repository at this point in the history
Adds timeout check to ClusterHealthCheck
  • Loading branch information
thoniTUB authored Aug 22, 2024
2 parents 4cb743b + 9d5022e commit 15c6180
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ public ClusterManager provideManager(ConqueryConfig config, Environment environm
delegate =
new DelegateManager<>(config, environment, datasetRegistry, storage, importHandler, extension, nodeProvider, adminTasks, internalMapperFactory, jobManager);

environment.healthChecks().register("cluster", new ClusterHealthCheck(clusterState));
environment.healthChecks()
.register("cluster", new ClusterHealthCheck(clusterState, nodeProvider, config.getCluster().getHeartbeatTimeout().toJavaDuration()));

return new ClusterManager(delegate, connectionManager);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public class ClusterConfig extends Configuration {
private int entityBucketSize = 1000;

private Duration idleTimeOut = Duration.minutes(5);
private Duration heartbeatTimeout = Duration.minutes(1);
private Duration connectRetryTimeout = Duration.seconds(30);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;

import javax.annotation.Nullable;
import jakarta.validation.constraints.NotNull;

Expand Down Expand Up @@ -40,11 +39,15 @@ public int size() {
return jobs.size();
}

/**
 * Returns the time elapsed between {@code timestamp} and the current local date-time.
 * The value is recomputed on every call and the accessor is marked {@code @JsonIgnore}.
 *
 * @return the duration from {@code timestamp} until now
 */
@JsonIgnore
public Duration getAge() {
    final LocalDateTime currentTime = LocalDateTime.now();
    return Duration.between(timestamp, currentTime);
}

// Used in AdminUIResource/jobs
/**
 * Renders the age of this status (see {@link #getAge()}) as human-readable words.
 * Leading and trailing zero-valued elements are suppressed (both flags are {@code true}).
 *
 * @return the formatted age
 */
@JsonIgnore
public String getAgeString() {
    // Single source of truth for the age computation: delegate to getAge() instead of
    // recomputing the duration locally (the superseded local computation and its
    // duplicate return statement have been removed).
    return DurationFormatUtils.formatDurationWords(getAge().toMillis(), true, true);
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
package com.bakdata.conquery.models.worker;

import java.net.SocketAddress;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Collection;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import com.bakdata.conquery.io.mina.NetworkSession;
import com.bakdata.conquery.mode.cluster.ClusterState;
import com.codahale.metrics.health.HealthCheck;
import lombok.Data;
Expand All @@ -13,6 +19,8 @@ public class ClusterHealthCheck extends HealthCheck {

public static final String HEALTHY_MESSAGE_FMT = "All %d known shards are connected.";
private final ClusterState clusterState;
private final Supplier<Collection<ShardNodeInformation>> nodeProvider;
private final Duration heartbeatTimeout;

@Override
protected Result check() throws Exception {
Expand All @@ -24,10 +32,23 @@ protected Result check() throws Exception {
.map(ShardNodeInformation::toString)
.toList();

if (disconnectedWorkers.isEmpty()){
return Result.healthy(HEALTHY_MESSAGE_FMT, knownShards.size());
if (!disconnectedWorkers.isEmpty()) {
return Result.unhealthy("The shard(s) %s are no longer connected.".formatted(String.join(",", disconnectedWorkers)));
}

return Result.unhealthy("The shard(s) %s are no longer connected.".formatted(String.join(",", disconnectedWorkers)));
// Collect every shard whose last recorded status time lies further in the past than heartbeatTimeout.
// The elapsed time has to be measured from the last status time up to now:
// Duration.between(now, lastStatusTime) is negative for any past timestamp, so subtracting it
// from the timeout could never produce a negative value and no shard would ever be flagged.
final LocalDateTime now = LocalDateTime.now();
final List<ShardNodeInformation> timeoutShards = nodeProvider.get().stream()
        .filter(status -> Duration.between(status.getLastStatusTime(), now).compareTo(heartbeatTimeout) > 0)
        .toList();

if (!timeoutShards.isEmpty()) {
return Result.unhealthy("Shards timed out:%s".formatted(timeoutShards.stream()
.map(ShardNodeInformation::getSession)
.map(NetworkSession::getRemoteAddress)
.map(SocketAddress::toString)
.collect(Collectors.joining(", "))));
}

return Result.healthy(HEALTHY_MESSAGE_FMT, knownShards.size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class ShardNodeInformation extends MessageSender.Simple<MessageToShardNod
@Getter
private final Set<JobManagerStatus> jobManagerStatus = new HashSet<>();
private final AtomicBoolean full = new AtomicBoolean(false);
@Getter
private LocalDateTime lastStatusTime = LocalDateTime.now();

public ShardNodeInformation(NetworkSession session, int backpressure) {
Expand Down

0 comments on commit 15c6180

Please sign in to comment.