Skip to content

Commit

Permalink
fix: ENV VAR kv parsing to handle json values
Browse files Browse the repository at this point in the history
- JOB_KUBE_ANNOTATIONS can have JSON as value
- This fixes the parsin of such values
  • Loading branch information
bHacklv committed Nov 21, 2024
1 parent 70f3fb3 commit 8decfab
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 29 deletions.
1 change: 1 addition & 0 deletions airbyte-commons-with-dependencies/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ dependencies {

testImplementation(libs.mockito.core)
testImplementation(libs.bundles.micronaut.test)
testImplementation(libs.assertj.core)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package io.airbyte.commons.workers.config;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
class EnvUtils {

private static final Pattern KEY_JSON_VALUE_PATTERN = Pattern.compile("(?<key>[^=]+)\\s*?=\\s*?(?<value>\\{[^=]+})\\s*,?\\s*");
private static final Pattern KEY_VALUE_PATTERN = Pattern.compile("(?<key>[^=]+)=(?<value>[^=,]+),?");
private static final String KEY_GROUP_ALIAS = "key";
private static final String VALUE_GROUP_ALIAS = "value";

/**
* Splits key value pairs from the input string into a map. Each kv-pair is separated by a ','.
* </br>
* The key and the value are separated by '='.
* <p>
* For example:- The following represents two map entries
* </p>
* - key1=value1,key2=value2
* </br>
* - key1={key11: value1},key2={key22: value2}
* </br>
* - key1={key11: value11, key12: value12},key2={key21: value21, key22: value22}
*
* @param input string
* @return map containing kv pairs
*/
public static Map<String, String> splitKVPairsFromEnvString(final String input) {
if (input == null || input.isBlank()) {
return Map.of();
}
final Map<String, String> jsonValuesMatchResult = match(input, KEY_JSON_VALUE_PATTERN);
return jsonValuesMatchResult.isEmpty()
? getKVPairsMatchedWithSimplePattern(input)
: jsonValuesMatchResult;
}

private static Map<String, String> match(final String input, final Pattern pattern) {
final Matcher matcher = pattern.matcher(input);
final Map<String, String> kvResult = new HashMap<>();
while (matcher.find()) {
kvResult.put(matcher.group(KEY_GROUP_ALIAS).trim(), matcher.group(VALUE_GROUP_ALIAS).trim());
}
return kvResult;
}

private static Map<String, String> getKVPairsMatchedWithSimplePattern(final String input) {
final Map<String, String> stringMatchResult = match(input, KEY_VALUE_PATTERN);
if (stringMatchResult.isEmpty()) {
log.warn("No valid key value pairs found in the input string: {}", input);
return Collections.emptyMap();
}
return stringMatchResult;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

package io.airbyte.commons.workers.config;

import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.config.ResourceRequirementsType;
Expand All @@ -24,7 +23,6 @@
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;

/**
Expand Down Expand Up @@ -210,25 +208,25 @@ private WorkerConfigs getConfig(final KubeResourceKey key) {
.orElseThrow(() -> new NoSuchElementException(String.format("Unable to find config: {variant:%s, type:%s, subtype:%s}",
key.variant, key.type, key.subType)));

final Map<String, String> isolatedNodeSelectors = splitKVPairsFromEnvString(workerConfigsDefaults.isolatedNodeSelectors);
final Map<String, String> isolatedNodeSelectors = EnvUtils.splitKVPairsFromEnvString(workerConfigsDefaults.isolatedNodeSelectors);
validateIsolatedPoolConfigInitialization(workerConfigsDefaults.useCustomNodeSelector(), isolatedNodeSelectors);

// if annotations are not defined for this specific resource, then fallback to the default
// resource's annotations
final Map<String, String> annotations;
if (Strings.isNullOrEmpty(kubeResourceConfig.getAnnotations())) {
annotations = splitKVPairsFromEnvString(workerConfigsDefaults.defaultKubeResourceConfig.getAnnotations());
annotations = EnvUtils.splitKVPairsFromEnvString(workerConfigsDefaults.defaultKubeResourceConfig.getAnnotations());
} else {
annotations = splitKVPairsFromEnvString(kubeResourceConfig.getAnnotations());
annotations = EnvUtils.splitKVPairsFromEnvString(kubeResourceConfig.getAnnotations());
}

return new WorkerConfigs(
getResourceRequirementsFrom(kubeResourceConfig, workerConfigsDefaults.defaultKubeResourceConfig()),
TolerationPOJO.getJobKubeTolerations(workerConfigsDefaults.jobKubeTolerations()),
splitKVPairsFromEnvString(kubeResourceConfig.getNodeSelectors()),
EnvUtils.splitKVPairsFromEnvString(kubeResourceConfig.getNodeSelectors()),
workerConfigsDefaults.useCustomNodeSelector() ? Optional.of(isolatedNodeSelectors) : Optional.empty(),
annotations,
splitKVPairsFromEnvString(kubeResourceConfig.getLabels()),
EnvUtils.splitKVPairsFromEnvString(kubeResourceConfig.getLabels()),
workerConfigsDefaults.mainContainerImagePullSecret(),
workerConfigsDefaults.mainContainerImagePullPolicy());
}
Expand Down Expand Up @@ -323,28 +321,6 @@ private Optional<KubeResourceKey> parseKubeResourceKey(final String value) {
return Optional.empty();
}

/**
* Splits key value pairs from the input string into a map. Each kv-pair is separated by a ','. The
* key and the value are separated by '='.
* <p>
* For example:- The following represents two map entries
* </p>
* key1=value1,key2=value2
*
* @param input string
* @return map containing kv pairs
*/
private Map<String, String> splitKVPairsFromEnvString(final String input) {
if (input == null || input.isBlank()) {
return Map.of();
}
return Splitter.on(",")
.splitToStream(input)
.filter(s -> !Strings.isNullOrEmpty(s) && s.contains("="))
.map(s -> s.split("="))
.collect(Collectors.toMap(s -> s[0].trim(), s -> s[1].trim()));
}

private ResourceRequirements getResourceRequirementsFrom(final KubeResourceConfig kubeResourceConfig, final KubeResourceConfig defaultConfig) {
return new ResourceRequirements()
.withCpuLimit(useDefaultIfEmpty(kubeResourceConfig.getCpuLimit(), defaultConfig.getCpuLimit()))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package io.airbyte.commons.workers.config;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.Collections;
import java.util.Map;
import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;

class EnvUtilsTest {

@ParameterizedTest
@MethodSource("splitKVPairsFromEnvString")
void splitKVPairsFromEnvString(String input, Map<String, String> expected) {
final Map<String, String> result = EnvUtils.splitKVPairsFromEnvString(input);
assertThat(result).isEqualTo(expected);
}

private static Stream<Arguments> splitKVPairsFromEnvString() {
return Stream.of(
// unmatched
Arguments.of("key1", Collections.emptyMap()),
Arguments.of("key1,value", Collections.emptyMap()),
Arguments.of("key1-value", Collections.emptyMap()),
Arguments.of("key1:value", Collections.emptyMap()),
// matched k:v pairs
Arguments.of("key1=value1", Map.of("key1", "value1")),
Arguments.of("key1 = value1", Map.of("key1", "value1")),
Arguments.of("key1=value1,key2=value2", Map.of("key1", "value1", "key2", "value2")),
Arguments.of("key1 = value1, key2 = value2", Map.of("key1", "value1", "key2", "value2")),
// matched k:jsonV pairs
Arguments.of("key1={value1}", Map.of("key1", "{value1}")),
Arguments.of("key1={ value1 }", Map.of("key1", "{ value1 }")),
Arguments.of("key1 = { value1 }", Map.of("key1", "{ value1 }")),
Arguments.of("key1={value1},key2={value2}", Map.of("key1", "{value1}", "key2", "{value2}")),
Arguments.of("key1= {value1} , key2={value2}", Map.of("key1", "{value1}", "key2", "{value2}")),
Arguments.of("key1= {value1 } , key2= { value2}", Map.of("key1", "{value1 }", "key2", "{ value2}")),
Arguments.of("key1={key11: value11},key2={key22: value22}", Map.of(
"key1", "{key11: value11}",
"key2", "{key22: value22}"
)),
Arguments.of("key1={key11: value11, key12: value12},key2={key21: value21, key22: value22}", Map.of(
"key1", "{key11: value11, key12: value12}",
"key2", "{key21: value21, key22: value22}"
)),
Arguments.of("key1={key11: value11, key12: value12, key13: {key131: value131}},"
+ "key2={key21: value21, key22: value22, key23: {key231: value231}}", Map.of(
"key1", "{key11: value11, key12: value12, key13: {key131: value131}}",
"key2", "{key21: value21, key22: value22, key23: {key231: value231}}"
))
);
}

}

0 comments on commit 8decfab

Please sign in to comment.