Skip to content

Commit

Permalink
Add status acquisition for Spark k8s operator tasks (apache#4889)
Browse files Browse the repository at this point in the history
* Add status acquisition for Spark k8s operator tasks

* Spark: obtain the k8s operator task status via the k8s list-watch mechanism

* Spark: obtain the k8s operator task status via the k8s list-watch mechanism
  • Loading branch information
ChengJie1053 authored Sep 1, 2023
1 parent b5453a2 commit 4169e70
Showing 1 changed file with 36 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@
import io.fabric8.kubernetes.api.model.apiextensions.v1.CustomResourceDefinitionList;
import io.fabric8.kubernetes.client.CustomResource;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;
import io.fabric8.kubernetes.client.dsl.MixedOperation;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
import org.slf4j.Logger;
Expand Down Expand Up @@ -152,32 +155,44 @@ public void deployCluster(String mainClass, String args, Map<String, String> con
}

public boolean initJobId() {
SparkApplicationStatus sparkApplicationStatus = getKubernetesOperatorState();

if (Objects.nonNull(sparkApplicationStatus)) {
this.applicationId = sparkApplicationStatus.getSparkApplicationId();
this.jobState =
kubernetesOperatorStateConvertSparkState(
sparkApplicationStatus.getApplicationState().getState());
try {
getKubernetesOperatorState();
} catch (Exception e) {
try {
// Prevent watch interruption due to network interruption.Restart Watcher.
Thread.sleep(5000);
getKubernetesOperatorState();
} catch (InterruptedException interruptedException) {
logger.error("Use k8s watch obtain the status failed");
}
}

// When the job is not finished, the appId is monitored; otherwise, the status is
// monitored(当任务没结束时,监控appId,反之,则监控状态,这里主要防止任务过早结束,导致一直等待)
return null != getApplicationId() || (jobState != null && jobState.isFinal());
}

private SparkApplicationStatus getKubernetesOperatorState() {
List<SparkApplication> sparkApplicationList =
getSparkApplicationClient(client).list().getItems();
if (CollectionUtils.isNotEmpty(sparkApplicationList)) {
for (SparkApplication sparkApplication : sparkApplicationList) {
if (sparkApplication.getMetadata().getNamespace().equals(this.sparkConfig.getK8sNamespace())
&& sparkApplication.getMetadata().getName().equals(this.sparkConfig.getAppName())) {
return sparkApplication.getStatus();
}
}
}
return null;
private void getKubernetesOperatorState() {
getSparkApplicationClient(client)
.inNamespace(this.sparkConfig.getK8sNamespace())
.withName(this.sparkConfig.getAppName())
.watch(
new Watcher<SparkApplication>() {
@Override
public void eventReceived(Action action, SparkApplication sparkApplication) {
// todo get status
applicationId = sparkApplication.getStatus().getSparkApplicationId();
jobState =
kubernetesOperatorStateConvertSparkState(
sparkApplication.getStatus().getApplicationState().getState());
}

@Override
public void onClose(WatcherException e) {
// Invoked when the watcher closes due to an Exception.Restart Watcher.
logger.error("Use k8s watch obtain the status failed", e);
getKubernetesOperatorState();
}
});
}

public SparkAppHandle.State kubernetesOperatorStateConvertSparkState(String kubernetesState) {
Expand Down Expand Up @@ -216,8 +231,7 @@ public void close() {
client.close();
}

public static NonNamespaceOperation<
SparkApplication, SparkApplicationList, Resource<SparkApplication>>
public static MixedOperation<SparkApplication, SparkApplicationList, Resource<SparkApplication>>
getSparkApplicationClient(KubernetesClient client) {
return client.customResources(SparkApplication.class, SparkApplicationList.class);
}
Expand Down

0 comments on commit 4169e70

Please sign in to comment.