Skip to content

Commit

Permalink
Merge pull request #91 from hnarimiya/k8s-requestconfig
Browse files Browse the repository at this point in the history
K8s requestconfig
  • Loading branch information
hnarimiya authored Jan 11, 2024
2 parents 328b4c6 + 7646259 commit 14b5e22
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@
import com.google.common.base.Optional;
import io.digdag.client.config.Config;
import io.digdag.client.config.ConfigException;
import io.digdag.client.config.ConfigFactory;
import io.digdag.core.storage.StorageManager;

import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

import static io.digdag.client.DigdagClient.objectMapper;

public class KubernetesClientConfig
{
private static final String KUBERNETES_CLIENT_PARAMS_PREFIX = "agent.command_executor.kubernetes.";
Expand All @@ -19,42 +22,41 @@ public static KubernetesClientConfig create(final Optional<String> name,
final Config systemConfig,
final Config requestConfig)
{
if (requestConfig.has("kubernetes")) {
// from task request config
return KubernetesClientConfig.createFromTaskRequestConfig(name, requestConfig.getNested("kubernetes"));
if (!systemConfig.get("agent.command_executor.type", String.class, "").equals("kubernetes")) {
throw new ConfigException("agent.command_executor.type: is not 'kubernetes'");
}
else {
// from system config
return KubernetesClientConfig.createFromSystemConfig(name, systemConfig);
String clusterName;
if (!name.isPresent()) {
clusterName = systemConfig.get(KUBERNETES_CLIENT_PARAMS_PREFIX + "name", String.class); // ConfigException
} else {
clusterName = name.get();
}
final String keyPrefix = KUBERNETES_CLIENT_PARAMS_PREFIX + clusterName + ".";
Config extractedSystemConfig = StorageManager.extractKeyPrefix(systemConfig, keyPrefix);;
Config extractedRequestConfig;
if (systemConfig.get(KUBERNETES_CLIENT_PARAMS_PREFIX + "allow_configure_workflow_definition", Boolean.class, false)
&& requestConfig != null
&& requestConfig.has("kubernetes")) {
if (clusterName == null) {
clusterName = requestConfig.get("name", String.class); // ConfigException
}
extractedRequestConfig = requestConfig.getNested("kubernetes");
} else {
extractedRequestConfig = newEmptyConfig();
}

// Create a config that merges RequestConfig with SystemConfig
final Config config = extractedSystemConfig.merge(extractedRequestConfig);
return KubernetesClientConfig.createKubeConfig(clusterName, config);
}

private static KubernetesClientConfig createFromTaskRequestConfig(final Optional<String> name,
final Config config)
{
// TODO
// We'd better to customize cluster config by task request config??
throw new UnsupportedOperationException("Not support yet");
private static Config newEmptyConfig() {
return new ConfigFactory(objectMapper()).create();
}

@VisibleForTesting
static KubernetesClientConfig createFromSystemConfig(final Optional<String> name,
final io.digdag.client.config.Config systemConfig)
{
final String clusterName;
if (!name.isPresent()) {
if (!systemConfig.get("agent.command_executor.type", String.class, "").equals("kubernetes")) {
throw new ConfigException("agent.command_executor.type: is not 'kubernetes'");
}
clusterName = systemConfig.get(KUBERNETES_CLIENT_PARAMS_PREFIX + "name", String.class);
}
else {
clusterName = name.get();
}
final String keyPrefix = KUBERNETES_CLIENT_PARAMS_PREFIX + clusterName + ".";
final Config extracted = StorageManager.extractKeyPrefix(systemConfig, keyPrefix);
if (extracted.has("kube_config_path")) {
String kubeConfigPath = extracted.get("kube_config_path", String.class);
private static KubernetesClientConfig createKubeConfig(final String clusterName, final Config config){
if (config.has("kube_config_path")) {
String kubeConfigPath = config.get("kube_config_path", String.class);
io.fabric8.kubernetes.client.Config validatedKubeConfig;
validatedKubeConfig = validateKubeConfig(getKubeConfigFromPath(kubeConfigPath));
return create(clusterName,
Expand All @@ -63,7 +65,7 @@ static KubernetesClientConfig createFromSystemConfig(final Optional<String> name
validatedKubeConfig.getOauthToken(),
validatedKubeConfig.getNamespace());
} else {
final Config validatedConfig = validateConfig(extracted);
final Config validatedConfig = validateConfig(config);
return create(clusterName,
validatedConfig.get("master", String.class),
validatedConfig.get("certs_ca_data", String.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,16 @@ public void testCreateFromSystemConfig()
.set(KUBERNETES_CLIENT_PARAMS_PREFIX+"test.oauth_token", "test=")
.set(KUBERNETES_CLIENT_PARAMS_PREFIX+"test.namespace", "default");

KubernetesClientConfig kubernetesClientConfig = KubernetesClientConfig.createFromSystemConfig(clusterName, systemConfig);
final Config kubernetesConfig = cf.create()
.set("test.master", "https://127.0.0.1")
.set("test.certs_ca_data", "test=")
.set("test.oauth_token", "test=")
.set("test.namespace", "default");

final Config requestConfig = cf.create()
.setNested("kubernetes", kubernetesConfig);

KubernetesClientConfig kubernetesClientConfig = KubernetesClientConfig.create(clusterName, systemConfig, null);

String masterUrl = "https://127.0.0.1";
String namespace = "default";
Expand All @@ -79,7 +88,7 @@ public void testCreateFromSystemConfigWithKubeConfig()
.set("agent.command_executor.type", "kubernetes")
.set(KUBERNETES_CLIENT_PARAMS_PREFIX+"test.kube_config_path", kubeConfigPath);

KubernetesClientConfig kubernetesClientConfig = KubernetesClientConfig.createFromSystemConfig(clusterName, systemConfig);
KubernetesClientConfig kubernetesClientConfig = KubernetesClientConfig.create(clusterName, systemConfig, null);

String masterUrl = "https://127.0.0.1";
String namespace = "default";
Expand All @@ -90,4 +99,99 @@ public void testCreateFromSystemConfigWithKubeConfig()
assertThat(oauthToken, is(kubernetesClientConfig.getOauthToken()));
assertThat(namespace, is(kubernetesClientConfig.getNamespace()));
}

@Test
public void testCreateFromRequestConfig()
throws Exception
{
final Config systemConfig = cf.create()
.set("agent.command_executor.type", "kubernetes")
.set("agent.command_executor.kubernetes.allow_configure_workflow_definition", true);

final Config kubernetesConfig = cf.create()
.set("master", "https://127.0.0.1")
.set("certs_ca_data", "test=")
.set("oauth_token", "test=")
.set("namespace", "default");

final Config requestConfig = cf.create()
.setNested("kubernetes", kubernetesConfig);

KubernetesClientConfig kubernetesClientConfig = KubernetesClientConfig.create(clusterName, systemConfig, requestConfig);

String masterUrl = "https://127.0.0.1";
String namespace = "default";
String caCertData = "test=";
String oauthToken = "test=";
assertThat(masterUrl, is(kubernetesClientConfig.getMaster()));
assertThat(caCertData, is(kubernetesClientConfig.getCertsCaData()));
assertThat(oauthToken, is(kubernetesClientConfig.getOauthToken()));
assertThat(namespace, is(kubernetesClientConfig.getNamespace()));
}

@Test
public void testCreateFromRequestConfigAndSystemConfigMerge()
throws Exception
{

final Config systemConfig = cf.create()
.set("agent.command_executor.type", "kubernetes")
.set(KUBERNETES_CLIENT_PARAMS_PREFIX+"test.master", "https://127.0.0.1")
.set(KUBERNETES_CLIENT_PARAMS_PREFIX+"test.certs_ca_data", "test=")
.set(KUBERNETES_CLIENT_PARAMS_PREFIX+"test.oauth_token", "test=")
.set(KUBERNETES_CLIENT_PARAMS_PREFIX+"test.namespace", "default")
.set("agent.command_executor.kubernetes.allow_configure_workflow_definition", true);

final Config kubernetesConfig = cf.create()
.set("master", "https://localhost")
// .set("certs_ca_data", "test=")
// .set("oauth_token", "test=")
.set("namespace", "request");

final Config requestConfig = cf.create()
.setNested("kubernetes", kubernetesConfig);

KubernetesClientConfig kubernetesClientConfig = KubernetesClientConfig.create(clusterName, systemConfig, requestConfig);

String masterUrl = "https://localhost";
String namespace = "request";
String caCertData = "test=";
String oauthToken = "test=";
assertThat(masterUrl, is(kubernetesClientConfig.getMaster()));
assertThat(caCertData, is(kubernetesClientConfig.getCertsCaData()));
assertThat(oauthToken, is(kubernetesClientConfig.getOauthToken()));
assertThat(namespace, is(kubernetesClientConfig.getNamespace()));
}

@Test
public void testCreateFromRequestConfigAndSystemConfigNotMerge()
throws Exception
{

final Config systemConfig = cf.create()
.set("agent.command_executor.type", "kubernetes")
.set(KUBERNETES_CLIENT_PARAMS_PREFIX+"test.master", "https://127.0.0.1")
.set(KUBERNETES_CLIENT_PARAMS_PREFIX+"test.certs_ca_data", "test=")
.set(KUBERNETES_CLIENT_PARAMS_PREFIX+"test.oauth_token", "test=")
.set(KUBERNETES_CLIENT_PARAMS_PREFIX+"test.namespace", "default");

final Config kubernetesConfig = cf.create()
.set("master", "https://localhost")
.set("namespace", "request");

final Config requestConfig = cf.create()
.setNested("kubernetes", kubernetesConfig);

KubernetesClientConfig kubernetesClientConfig = KubernetesClientConfig.create(clusterName, systemConfig, requestConfig);

String masterUrl = "https://127.0.0.1";
String namespace = "default";
String caCertData = "test=";
String oauthToken = "test=";
assertThat(masterUrl, is(kubernetesClientConfig.getMaster()));
assertThat(caCertData, is(kubernetesClientConfig.getCertsCaData()));
assertThat(oauthToken, is(kubernetesClientConfig.getOauthToken()));
assertThat(namespace, is(kubernetesClientConfig.getNamespace()));
}

}

0 comments on commit 14b5e22

Please sign in to comment.