Skip to content

Commit

Permalink
Merge branch 'dev' of https://github.com/DataLinkDC/dinky into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
actions-user committed Dec 3, 2024
2 parents 8069560 + 1ffe49f commit d1aec78
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import alluxio.shaded.client.org.apache.commons.lang3.StringUtils;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.text.StrFormatter;
Expand Down Expand Up @@ -84,10 +87,10 @@ public void init() {
protected void initConfig() {
flinkConfigPath = config.getClusterConfig().getFlinkConfigPath();
flinkConfig = config.getFlinkConfig();
String jobName = flinkConfig.getJobName();
if (null != jobName && jobName.contains("_")) {
jobName = jobName.replace("_", "-");
flinkConfig.setJobName(jobName);
if (!isValidTaskName(flinkConfig.getJobName())) {
throw new GatewayException(
"In Kubernetes mode, task names must start and end with a lowercase letter or a digit, "
+ "and can contain lowercase letters, digits, dots, and hyphens in between.");
}
k8sConfig = config.getKubernetesConfig();

Expand Down Expand Up @@ -135,6 +138,21 @@ protected void initConfig() {
}
}

/**
 * Validate a Flink job name against the Kubernetes resource-name rules
 * (DNS-1123 subdomain style): it must start and end with a lowercase
 * letter or digit, and may contain lowercase letters, digits, dots and
 * hyphens in between. A single lowercase letter or digit is also valid.
 *
 * @param jobName the job name to check; may be {@code null}
 * @return {@code true} if the jobName is a valid Kubernetes task name
 */
boolean isValidTaskName(String jobName) {
    // Reject null/blank up front, before any regex work.
    if (jobName == null || jobName.trim().isEmpty()) {
        return false;
    }
    // The trailing group is optional so single-character names such as "a"
    // or "1" (valid in Kubernetes) are accepted as well.
    String jobNamePattern = "^[a-z0-9]([a-z0-9.-]*[a-z0-9])?$";
    Pattern pattern = Pattern.compile(jobNamePattern);
    Matcher matcher = pattern.matcher(jobName);
    return matcher.matches();
}

protected void preparPodTemplate(String podTemplate, ConfigOption<String> option) {
if (!TextUtil.isEmpty(podTemplate)) {
String filePath = String.format("%s/%s.yaml", tmpConfDir, option.key());
Expand All @@ -155,7 +173,7 @@ public SavePointResult savepointCluster(String savePoint) {
String clusterId = clusterClientFactory.getClusterId(configuration);
if (Asserts.isNull(clusterId)) {
throw new GatewayException(
"No cluster id was specified. Please specify a cluster to which you would like" + " to connect.");
"No cluster id was specified. Please specify a cluster to which you would like to connect.");
}

KubernetesClusterDescriptor clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration);
Expand All @@ -167,7 +185,7 @@ public SavePointResult savepointJob(String savePoint) {
initConfig();
if (Asserts.isNull(config.getFlinkConfig().getJobId())) {
throw new GatewayException(
"No job id was specified. Please specify a job to which you would like to" + " savepont.");
"No job id was specified. Please specify a job to which you would like to savepoint.");
}

addConfigParas(
Expand All @@ -176,7 +194,7 @@ public SavePointResult savepointJob(String savePoint) {
String clusterId = clusterClientFactory.getClusterId(configuration);
if (Asserts.isNull(clusterId)) {
throw new GatewayException(
"No cluster id was specified. Please specify a cluster to which you would like" + " to connect.");
"No cluster id was specified. Please specify a cluster to which you would like to connect.");
}
KubernetesClusterDescriptor clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration);

Expand Down Expand Up @@ -218,7 +236,7 @@ public void killCluster() {
String clusterId = clusterClientFactory.getClusterId(configuration);
if (Asserts.isNull(clusterId)) {
throw new GatewayException(
"No cluster id was specified. Please specify a cluster to which you would like" + " to connect.");
"No cluster id was specified. Please specify a cluster to which you would like to connect.");
}
if (k8sClientHelper.getClusterIsPresent(clusterId)) {
try (KubernetesClusterDescriptor clusterDescriptor =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.dinky.gateway.kubernetes.utils;

import org.dinky.gateway.kubernetes.decorate.DinkySqlConfigMapDecorate;
import org.dinky.gateway.kubernetes.watcher.DeploymentStatusWatcher;
import org.dinky.utils.TextUtil;

import org.apache.flink.configuration.Configuration;
Expand Down Expand Up @@ -49,6 +50,7 @@
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.RollableScalableResource;
import io.fabric8.kubernetes.client.utils.Serialization;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -65,9 +67,11 @@ public class K8sClientHelper {
private KubernetesClient kubernetesClient;
protected Configuration configuration;
private DinkySqlConfigMapDecorate sqlFileDecorate;
private DeploymentStatusWatcher deploymentStatusWatch;

public K8sClientHelper(Configuration configuration, String kubeConfig) {
this.configuration = configuration;
deploymentStatusWatch = new DeploymentStatusWatcher();
initKubeClient(kubeConfig);
}

Expand Down Expand Up @@ -113,12 +117,12 @@ private void initKubeClient(String kubeConfig) {
*/
public Deployment createDinkyResource() {
log.info("createDinkyResource");
Deployment deployment = kubernetesClient
RollableScalableResource<Deployment> deploymentRollableScalableResource = kubernetesClient
.apps()
.deployments()
.inNamespace(configuration.get(KubernetesConfigOptions.NAMESPACE))
.withName(configuration.get(KubernetesConfigOptions.CLUSTER_ID))
.get();
.withName(configuration.get(KubernetesConfigOptions.CLUSTER_ID));
Deployment deployment = deploymentRollableScalableResource.get();
List<HasMetadata> resources = getSqlFileDecorate().buildResources();
// set owner reference
OwnerReference deploymentOwnerReference = new OwnerReferenceBuilder()
Expand All @@ -134,13 +138,15 @@ public Deployment createDinkyResource() {
resource.getMetadata().setOwnerReferences(Collections.singletonList(deploymentOwnerReference)));
// create resources
resources.forEach(resource -> log.info(Serialization.asYaml(resource)));
deploymentRollableScalableResource.watch(deploymentStatusWatch);
kubernetesClient.resourceList(resources).createOrReplace();
return deployment;
}

/**
* initPodTemplate
* Preprocess the pod template
*
* @param sqlStatement
* @return
*/
Expand All @@ -166,8 +172,7 @@ public Pod decoratePodTemplate(String sqlStatement, String podTemplate) {

/**
* dumpPod2Str
*
* */
*/
public String dumpPod2Str(Pod pod) {
// use snakyaml to serialize the pod
Representer representer = new IgnoreNullRepresenter();
Expand All @@ -179,9 +184,11 @@ public String dumpPod2Str(Pod pod) {
Yaml yaml = new Yaml(representer, options);
return yaml.dump(pod);
}

/**
* close
* delete the temporary directory and close the client
*
* @return
*/
public boolean close() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.gateway.kubernetes.watcher;

import org.apache.hadoop.util.StringUtils;

import java.util.List;

import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.ObjectUtil;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.api.model.apps.DeploymentCondition;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class DeploymentStatusWatcher implements Watcher<Deployment> {

@Override
public void eventReceived(Action action, Deployment deployment) {
String deploymentName = deployment.getMetadata().getName();
log.info("deployment name: {}, deployment action: {}", deploymentName, action);
if (ObjectUtil.isNotNull(deployment.getStatus())
&& CollectionUtil.isNotEmpty(deployment.getStatus().getConditions())) {
List<DeploymentCondition> conditions = deployment.getStatus().getConditions();
conditions.forEach(condition -> {
if (StringUtils.equalsIgnoreCase(condition.getStatus(), "true")) {
log.info(
"deployment name: {}, deployment status: {}, message: {}",
deploymentName,
condition.getStatus(),
condition.getMessage());
} else {
log.warn(
"deployment name: {}, deployment status: {}, message: {}",
deploymentName,
condition.getStatus(),
condition.getMessage());
}
});
}
}

@Override
public void onClose(WatcherException cause) {
if (cause != null) {
log.error("Watcher closed due to exception: {}", cause.getMessage());
} else {
log.info("Watcher closed gracefully.");
}
}
}

0 comments on commit d1aec78

Please sign in to comment.