Skip to content

Commit

Permalink
fix(kubernetes-operator): Fix SavePoint path logic and adjust the con…
Browse files Browse the repository at this point in the history
…figuration method of Flink configuration acquisition (#3732)

Co-authored-by: zhangmz <[email protected]>
  • Loading branch information
soulmz and zhangmz authored Aug 27, 2024
1 parent 6f19d9d commit 7d7a64b
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.dinky.gateway.kubernetes.operator.api.FlinkDeployment;
import org.dinky.gateway.result.GatewayResult;
import org.dinky.gateway.result.KubernetesResult;
import org.dinky.utils.JsonUtils;
import org.dinky.utils.LogUtil;

import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
Expand Down Expand Up @@ -75,9 +76,10 @@ public GatewayResult submitJar(FlinkUdfPathContextHolder udfPathContextHolder) {

kubernetesClient.resource(flinkDeployment).delete();
kubernetesClient.resource(flinkDeployment).waitUntilCondition(Objects::isNull, 1, TimeUnit.MINUTES);
logger.debug("flinkDeployment => {}", JsonUtils.toJsonString(flinkDeployment));
kubernetesClient.resource(flinkDeployment).createOrReplace();

FlinkDeployment flinkDeploymentResult = kubernetesClient
kubernetesClient
.resource(flinkDeployment)
.waitUntilCondition(
flinkDeployment1 -> {
Expand Down Expand Up @@ -123,11 +125,7 @@ public GatewayResult submitJar(FlinkUdfPathContextHolder udfPathContextHolder) {
TimeUnit.MINUTES);

// sleep for a moment, because sometimes the service cannot be found right away
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Thread.sleep(3000);

// get jobmanager addr by service
ListOptions options = new ListOptions();
Expand All @@ -136,7 +134,7 @@ public GatewayResult submitJar(FlinkUdfPathContextHolder udfPathContextHolder) {
ServiceList list = kubernetesClient
.services()
// fixed bug can't find service list #3700
.inNamespace(configuration.getString(KubernetesConfigOptions.NAMESPACE))
.inNamespace(configuration.get(KubernetesConfigOptions.NAMESPACE))
.list(options);
if (Objects.nonNull(list) && list.getItems().isEmpty()) {
throw new RuntimeException("service list is empty, please check svc list is exists");
Expand All @@ -145,7 +143,7 @@ public GatewayResult submitJar(FlinkUdfPathContextHolder udfPathContextHolder) {
result.setWebURL("http://" + ipPort);
result.setId(result.getJids().get(0) + System.currentTimeMillis());
result.success();
} catch (KubernetesClientException e) {
} catch (KubernetesClientException | InterruptedException e) {
// some error while connecting to kube cluster
result.fail(LogUtil.getError(e));
logger.error("kubernetes client ex", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,22 @@
import org.dinky.gateway.result.TestResult;

import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Lists;

import io.fabric8.kubernetes.api.model.ObjectMeta;
import lombok.Data;
import lombok.EqualsAndHashCode;
Expand Down Expand Up @@ -124,13 +132,27 @@ private void initJob() {
.args(userJarParas)
.parallelism(Integer.parseInt(parallelism));

if (Asserts.isNotNull(config.getFlinkConfig().getSavePoint())) {
String savePointPath = config.getFlinkConfig().getSavePoint();
jobSpecBuilder.initialSavepointPath(savePointPath);
// config.getFlinkConfig().getSavePoint() is always null
// flinkDeployment spec.flinkConfiguration => subJob flinkConfig > kubernetes Config
// note: the flink operator can't read some of the job config, e.g. savepoint & kubernetes.operator config
String savePointPath = configuration.get(SavepointConfigOptions.SAVEPOINT_PATH);
logger.info("savePointPath: {}", savePointPath);

// When the flink operator upgradeMode is used to recover from a savepointPath, it needs to be
// combined with the savepointRedeployNonce parameter.
if (Asserts.isNotNull(savePointPath)) {
/*
* It is possible to redeploy a FlinkDeployment or FlinkSessionJob resource from a target savepoint
* by using the combination of savepointRedeployNonce and initialSavepointPath in the job spec
* {@see https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/custom-resource/job-management/#redeploy-using-the-savepointredeploynonce}
* jobSpecBuilder.initialSavepointPath(savePointPath);
* jobSpecBuilder.savepointRedeployNonce(1);
*/
jobSpecBuilder.upgradeMode(UpgradeMode.SAVEPOINT);

logger.info("find save point config, the path is : {}", savePointPath);
} else {
// Officially not recommended for production configuration
jobSpecBuilder.upgradeMode(UpgradeMode.STATELESS);
logger.info("no save point config");
}
Expand All @@ -141,13 +163,28 @@ private void initJob() {
private void initResource() {
AbstractPodSpec jobManagerSpec = new AbstractPodSpec();
AbstractPodSpec taskManagerSpec = new AbstractPodSpec();
String jbcpu = kubernetesConfiguration.getOrDefault("kubernetes.jobmanager.cpu", "1");
String jbmem = flinkConfig.getConfiguration().getOrDefault("jobmanager.memory.process.size", "1G");
String jbcpu = configuration.getString("kubernetes.jobmanager.cpu", "1");
// default 1G
String jbmem = configuration.getString(JobManagerOptions.TOTAL_PROCESS_MEMORY.key(), "1G");
logger.info("jobmanager resource is : cpu-->{}, mem-->{}", jbcpu, jbmem);
// if enable job high-availability
Integer replicas = configuration.get(KubernetesConfigOptions.KUBERNETES_JOBMANAGER_REPLICAS);
if (replicas > KubernetesConfigOptions.KUBERNETES_JOBMANAGER_REPLICAS.defaultValue()
&& !Objects.equals(
HighAvailabilityOptions.HA_MODE.defaultValue(),
configuration.get(HighAvailabilityOptions.HA_MODE))) {
// jm ha kubernetes.jobmanager.replicas or job flinkConfig
replicas = configuration.get(KubernetesConfigOptions.KUBERNETES_JOBMANAGER_REPLICAS);
} else {
logger.info(
"If you need to enable high availability mode, please set the high-availability.* and kubernetes.jobmanager.replicas > 1 parameters first.");
}

jobManagerSpec.setReplicas(replicas);
jobManagerSpec.setResource(new Resource(Double.parseDouble(jbcpu), jbmem));

String tmcpu = kubernetesConfiguration.getOrDefault("kubernetes.taskmanager.cpu", "1");
String tmmem = flinkConfig.getConfiguration().getOrDefault("taskmanager.memory.process.size", "1G");
String tmcpu = configuration.getString("kubernetes.taskmanager.cpu", "1");
String tmmem = configuration.getString(TaskManagerOptions.TOTAL_PROCESS_MEMORY.key(), "1G");
logger.info("taskmanager resource is : cpu-->{}, mem-->{}", tmcpu, tmmem);
taskManagerSpec.setResource(new Resource(Double.parseDouble(tmcpu), tmmem));

Expand All @@ -158,10 +195,14 @@ private void initResource() {
flinkDeploymentSpec.setTaskManager(taskManagerSpec);
}

// config keys that are removed from the combined configuration before it is set as the spec's flinkConfiguration
private final List<String> flinkConfigDefinedByFlink =
Lists.newArrayList("kubernetes.namespace", "kubernetes.cluster-id");

private void initSpec() {
String flinkVersion = flinkConfig.getFlinkVersion();
String image = kubernetesConfiguration.get("kubernetes.container.image");
String serviceAccount = kubernetesConfiguration.get("kubernetes.service-account");
String image = configuration.get(KubernetesConfigOptions.CONTAINER_IMAGE);
String serviceAccount = configuration.get(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT);

logger.info("\nflinkVersion is : {} \n image is : {}", flinkVersion, image);

Expand All @@ -170,10 +211,13 @@ private void initSpec() {
} else {
throw new IllegalArgumentException("Flink version are not Set!!use Operator must be set!");
}
Map<String, String> combinedConfiguration = configuration.toMap();
// note: remove the keys listed in flinkConfigDefinedByFlink from the flinkConfiguration
flinkConfigDefinedByFlink.forEach(combinedConfiguration::remove);
logger.info("combinedConfiguration: {}", combinedConfiguration);
flinkDeploymentSpec.setFlinkConfiguration(combinedConfiguration);

flinkDeploymentSpec.setImage(image);

flinkDeploymentSpec.setFlinkConfiguration(flinkConfig.getConfiguration());
flinkDeployment.setSpec(flinkDeploymentSpec);

if (Asserts.isNotNull(serviceAccount)) {
Expand All @@ -197,7 +241,7 @@ private void initMetadata() {
String jobName = config.getFlinkConfig().getJobName();
String nameSpace = kubernetesConfiguration.get("kubernetes.namespace");

logger.info("\njobName is :{} \n namespce is : {}", jobName, nameSpace);
logger.info("\njobName is :{} \n namespace is : {}", jobName, nameSpace);

// set Meta info , include pod name, namespace conf
ObjectMeta objectMeta = new ObjectMeta();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@
public class FlinkDeploymentSpec {
private JobSpec job;
private Long restartNonce;
/** Ingress specs. */
private IngressSpec ingress;

private Map<String, String> flinkConfiguration;
private String image;
private String imagePullPolicy;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.gateway.kubernetes.operator.api;

import org.apache.flink.annotation.Experimental;

import java.util.List;
import java.util.Map;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

import io.fabric8.kubernetes.api.model.networking.v1.IngressTLS;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Ingress spec of a Flink deployment; it is the type of the {@code ingress} field of
 * {@code FlinkDeploymentSpec}.
 *
 * <p>Lombok generates the accessors, the no-args and all-args constructors and the builder.
 * Unknown JSON properties are ignored by Jackson when this class is deserialized.
 *
 * @see <a href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/operations/ingress/">
 *     Flink Kubernetes Operator ingress documentation</a>
 */
@Experimental
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
@JsonIgnoreProperties(ignoreUnknown = true)
public class IngressSpec {

    /** Ingress template for the JobManager service. */
    private String template;

    /** Ingress className for the Flink deployment. */
    private String className;

    /** Ingress annotations, as key/value pairs. */
    private Map<String, String> annotations;

    /** Ingress labels, as key/value pairs. */
    private Map<String, String> labels;

    /** Ingress TLS entries (fabric8 {@code networking.v1} model objects). */
    private List<IngressTLS> tls;
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ public Optional<Deployment> getJobService(String clusterId) {
Deployment deployment = kubernetesClient
.apps()
.deployments()
.inNamespace(configuration.getString(KubernetesConfigOptions.NAMESPACE))
.withName(configuration.getString(KubernetesConfigOptions.CLUSTER_ID))
.inNamespace(configuration.get(KubernetesConfigOptions.NAMESPACE))
.withName(configuration.get(KubernetesConfigOptions.CLUSTER_ID))
.get();
if (deployment == null) {
log.debug("Service {} does not exist", serviceName);
Expand Down Expand Up @@ -116,8 +116,8 @@ public Deployment createDinkyResource() {
Deployment deployment = kubernetesClient
.apps()
.deployments()
.inNamespace(configuration.getString(KubernetesConfigOptions.NAMESPACE))
.withName(configuration.getString(KubernetesConfigOptions.CLUSTER_ID))
.inNamespace(configuration.get(KubernetesConfigOptions.NAMESPACE))
.withName(configuration.get(KubernetesConfigOptions.CLUSTER_ID))
.get();
List<HasMetadata> resources = getSqlFileDecorate().buildResources();
// set owner reference
Expand Down

0 comments on commit 7d7a64b

Please sign in to comment.