diff --git a/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/operator/KubernetesApplicationOperatorGateway.java b/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/operator/KubernetesApplicationOperatorGateway.java index 03b9d90ed4..50ac313ce9 100644 --- a/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/operator/KubernetesApplicationOperatorGateway.java +++ b/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/operator/KubernetesApplicationOperatorGateway.java @@ -26,6 +26,7 @@ import org.dinky.gateway.kubernetes.operator.api.FlinkDeployment; import org.dinky.gateway.result.GatewayResult; import org.dinky.gateway.result.KubernetesResult; +import org.dinky.utils.JsonUtils; import org.dinky.utils.LogUtil; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; @@ -75,9 +76,10 @@ public GatewayResult submitJar(FlinkUdfPathContextHolder udfPathContextHolder) { kubernetesClient.resource(flinkDeployment).delete(); kubernetesClient.resource(flinkDeployment).waitUntilCondition(Objects::isNull, 1, TimeUnit.MINUTES); + logger.debug("flinkDeployment => {}", JsonUtils.toJsonString(flinkDeployment)); kubernetesClient.resource(flinkDeployment).createOrReplace(); - FlinkDeployment flinkDeploymentResult = kubernetesClient + kubernetesClient .resource(flinkDeployment) .waitUntilCondition( flinkDeployment1 -> { @@ -123,11 +125,7 @@ public GatewayResult submitJar(FlinkUdfPathContextHolder udfPathContextHolder) { TimeUnit.MINUTES); // sleep a time ,because some time the service will not be found - try { - Thread.sleep(3000); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + Thread.sleep(3000); // get jobmanager addr by service ListOptions options = new ListOptions(); @@ -136,7 +134,7 @@ public GatewayResult submitJar(FlinkUdfPathContextHolder udfPathContextHolder) { ServiceList list = kubernetesClient .services() // fixed bug can't find service list #3700 - .inNamespace(configuration.getString(KubernetesConfigOptions.NAMESPACE)) + .inNamespace(configuration.get(KubernetesConfigOptions.NAMESPACE)) .list(options); if (Objects.nonNull(list) && list.getItems().isEmpty()) { throw new RuntimeException("service list is empty, please check svc list is exists"); @@ -145,7 +143,7 @@ public GatewayResult submitJar(FlinkUdfPathContextHolder udfPathContextHolder) { result.setWebURL("http://" + ipPort); result.setId(result.getJids().get(0) + System.currentTimeMillis()); result.success(); - } catch (KubernetesClientException e) { + } catch (KubernetesClientException | InterruptedException e) { // some error while connecting to kube cluster result.fail(LogUtil.getError(e)); logger.error("kubernetes client ex", e); diff --git a/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/operator/KubernetesOperatorGateway.java b/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/operator/KubernetesOperatorGateway.java index f7280c50df..91136ce3b5 100644 --- a/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/operator/KubernetesOperatorGateway.java +++ b/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/operator/KubernetesOperatorGateway.java @@ -32,14 +32,22 @@ import org.dinky.gateway.result.TestResult; import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; +import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.UUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.Lists; + import io.fabric8.kubernetes.api.model.ObjectMeta; import lombok.Data; import lombok.EqualsAndHashCode; @@ -124,13 +132,27 @@ private void initJob() { .args(userJarParas) .parallelism(Integer.parseInt(parallelism)); - if (Asserts.isNotNull(config.getFlinkConfig().getSavePoint())) { - String savePointPath = config.getFlinkConfig().getSavePoint(); - jobSpecBuilder.initialSavepointPath(savePointPath); + // config.getFlinkConfig().getSavePoint() always is null + // flinkDeployment spec.flinkConfiguration => subJob flinkConfig > kubernetes Config + // note: flink operator can't read job some config. ex: savepoint & kubernetes.operator config + String savePointPath = configuration.get(SavepointConfigOptions.SAVEPOINT_PATH); + logger.info("savePointPath: {}", savePointPath); + + // flink operator upgradeMode specifies savepointPath recovery and needs to be matched + // with savepointRedeployNonce this parameter. + if (Asserts.isNotNull(savePointPath)) { + /* + * It is possible to redeploy a FlinkDeployment or FlinkSessionJob resource from a target savepoint + * by using the combination of savepointRedeployNonce and initialSavepointPath in the job spec + * {@see https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/custom-resource/job-management/#redeploy-using-the-savepointredeploynonce} + * jobSpecBuilder.initialSavepointPath(savePointPath); + * jobSpecBuilder.savepointRedeployNonce(1); + */ jobSpecBuilder.upgradeMode(UpgradeMode.SAVEPOINT); logger.info("find save point config, the path is : {}", savePointPath); } else { + // Officially not recommended for production configuration jobSpecBuilder.upgradeMode(UpgradeMode.STATELESS); logger.info("no save point config"); } @@ -141,13 +163,28 @@ private void initJob() { private void initResource() { AbstractPodSpec jobManagerSpec = new AbstractPodSpec(); AbstractPodSpec taskManagerSpec = new AbstractPodSpec(); - String jbcpu = kubernetesConfiguration.getOrDefault("kubernetes.jobmanager.cpu", "1"); - String jbmem = flinkConfig.getConfiguration().getOrDefault("jobmanager.memory.process.size", "1G"); + String jbcpu = configuration.getString("kubernetes.jobmanager.cpu", "1"); + // default 1G + String jbmem = configuration.getString(JobManagerOptions.TOTAL_PROCESS_MEMORY.key(), "1G"); logger.info("jobmanager resource is : cpu-->{}, mem-->{}", jbcpu, jbmem); + // if enable job high-availability + Integer replicas = configuration.get(KubernetesConfigOptions.KUBERNETES_JOBMANAGER_REPLICAS); + if (replicas > KubernetesConfigOptions.KUBERNETES_JOBMANAGER_REPLICAS.defaultValue() + && !Objects.equals( + HighAvailabilityOptions.HA_MODE.defaultValue(), + configuration.get(HighAvailabilityOptions.HA_MODE))) { + // jm ha kubernetes.jobmanager.replicas or job flinkConfig + replicas = configuration.get(KubernetesConfigOptions.KUBERNETES_JOBMANAGER_REPLICAS); + } else { + logger.info( + "If you need to enable high availability mode, please set the high-availability.* and kubernetes.jobmanager.replicas > 1 parameters first."); + } + + jobManagerSpec.setReplicas(replicas); jobManagerSpec.setResource(new Resource(Double.parseDouble(jbcpu), jbmem)); - String tmcpu = kubernetesConfiguration.getOrDefault("kubernetes.taskmanager.cpu", "1"); - String tmmem = flinkConfig.getConfiguration().getOrDefault("taskmanager.memory.process.size", "1G"); + String tmcpu = configuration.getString("kubernetes.taskmanager.cpu", "1"); + String tmmem = configuration.getString(TaskManagerOptions.TOTAL_PROCESS_MEMORY.key(), "1G"); logger.info("taskmanager resource is : cpu-->{}, mem-->{}", tmcpu, tmmem); taskManagerSpec.setResource(new Resource(Double.parseDouble(tmcpu), tmmem)); @@ -158,10 +195,14 @@ private void initResource() { flinkDeploymentSpec.setTaskManager(taskManagerSpec); } + // flink config defined key + private final List flinkConfigDefinedByFlink = + Lists.newArrayList("kubernetes.namespace", "kubernetes.cluster-id"); + private void initSpec() { String flinkVersion = flinkConfig.getFlinkVersion(); - String image = kubernetesConfiguration.get("kubernetes.container.image"); - String serviceAccount = kubernetesConfiguration.get("kubernetes.service-account"); + String image = configuration.get(KubernetesConfigOptions.CONTAINER_IMAGE); + String serviceAccount = configuration.get(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT); logger.info("\nflinkVersion is : {} \n image is : {}", flinkVersion, image); @@ -170,10 +211,13 @@ private void initSpec() { } else { throw new IllegalArgumentException("Flink version are not Set!!use Operator must be set!"); } + Map combinedConfiguration = configuration.toMap(); + // note: rm flinkConfiguration by flinkDefined + flinkConfigDefinedByFlink.forEach(combinedConfiguration::remove); + logger.info("combinedConfiguration: {}", combinedConfiguration); + flinkDeploymentSpec.setFlinkConfiguration(combinedConfiguration); flinkDeploymentSpec.setImage(image); - - flinkDeploymentSpec.setFlinkConfiguration(flinkConfig.getConfiguration()); flinkDeployment.setSpec(flinkDeploymentSpec); if (Asserts.isNotNull(serviceAccount)) { @@ -197,7 +241,7 @@ private void initMetadata() { String jobName = config.getFlinkConfig().getJobName(); String nameSpace = kubernetesConfiguration.get("kubernetes.namespace"); - logger.info("\njobName is :{} \n namespce is : {}", jobName, nameSpace); + logger.info("\njobName is :{} \n namespace is : {}", jobName, nameSpace); // set Meta info , include pod name, namespace conf ObjectMeta objectMeta = new ObjectMeta(); diff --git a/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/operator/api/FlinkDeploymentSpec.java b/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/operator/api/FlinkDeploymentSpec.java index 0a8cf81d37..e007481c56 100644 --- a/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/operator/api/FlinkDeploymentSpec.java +++ b/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/operator/api/FlinkDeploymentSpec.java @@ -42,6 +42,9 @@ public class FlinkDeploymentSpec { private JobSpec job; private Long restartNonce; + /** Ingress specs. */ + private IngressSpec ingress; + private Map flinkConfiguration; private String image; private String imagePullPolicy; diff --git a/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/operator/api/IngressSpec.java b/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/operator/api/IngressSpec.java new file mode 100644 index 0000000000..4a853e3c26 --- /dev/null +++ b/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/operator/api/IngressSpec.java @@ -0,0 +1,62 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.gateway.kubernetes.operator.api; + +import org.apache.flink.annotation.Experimental; + +import java.util.List; +import java.util.Map; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +import io.fabric8.kubernetes.api.model.networking.v1.IngressTLS; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * Ingress spec. + * {@see https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/operations/ingress/} + * + */ +@Experimental +@Data +@NoArgsConstructor +@AllArgsConstructor +@Builder +@JsonIgnoreProperties(ignoreUnknown = true) +public class IngressSpec { + + /** Ingress template for the JobManager service. */ + private String template; + + /** Ingress className for the Flink deployment. */ + private String className; + + /** Ingress annotations. */ + private Map annotations; + + /** Ingress labels. */ + private Map labels; + + /** Ingress tls. */ + private List tls; +} diff --git a/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/utils/K8sClientHelper.java b/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/utils/K8sClientHelper.java index 1bf099dfbb..8e4eab48a0 100644 --- a/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/utils/K8sClientHelper.java +++ b/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/utils/K8sClientHelper.java @@ -76,8 +76,8 @@ public Optional getJobService(String clusterId) { Deployment deployment = kubernetesClient .apps() .deployments() - .inNamespace(configuration.getString(KubernetesConfigOptions.NAMESPACE)) - .withName(configuration.getString(KubernetesConfigOptions.CLUSTER_ID)) + .inNamespace(configuration.get(KubernetesConfigOptions.NAMESPACE)) + .withName(configuration.get(KubernetesConfigOptions.CLUSTER_ID)) .get(); if (deployment == null) { log.debug("Service {} does not exist", serviceName); @@ -116,8 +116,8 @@ public Deployment createDinkyResource() { Deployment deployment = kubernetesClient .apps() .deployments() - .inNamespace(configuration.getString(KubernetesConfigOptions.NAMESPACE)) - .withName(configuration.getString(KubernetesConfigOptions.CLUSTER_ID)) + .inNamespace(configuration.get(KubernetesConfigOptions.NAMESPACE)) + .withName(configuration.get(KubernetesConfigOptions.CLUSTER_ID)) .get(); List resources = getSqlFileDecorate().buildResources(); // set owner reference