Skip to content

Commit

Permalink
Tests and script validation/visibility
Browse files Browse the repository at this point in the history
  • Loading branch information
NicholasDCole committed Mar 9, 2024
1 parent 3f14edb commit 2e04673
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import io.confluent.connect.elasticsearch.util.ScriptParser;
import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
Expand All @@ -51,6 +53,7 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.script.Script;
import org.elasticsearch.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -185,11 +188,19 @@ public DocWriteRequest<?> convertRecord(SinkRecord record, String index) {
new IndexRequest(index).id(id).source(payload, XContentType.JSON).opType(opType),
record);
case SCRIPTED_UPSERT:
Script script = null;

try {
script = ScriptParser.parseScript(config.getScript());
} catch (JsonProcessingException jsonProcessingException) {
throw new RuntimeException(jsonProcessingException);
}

return new UpdateRequest(index, id)
.doc(payload, XContentType.JSON)
.upsert(payload, XContentType.JSON)
.retryOnConflict(Math.min(config.maxInFlightRequests(), 5))
.script(config.script())
.script(script)
.scriptedUpsert(true);
default:
return null; // shouldn't happen
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG;
import static org.apache.kafka.common.config.SslConfigs.addClientSslSupport;

import com.fasterxml.jackson.core.JsonProcessingException;
import io.confluent.connect.elasticsearch.util.ScriptParser;
import io.confluent.connect.elasticsearch.validator.DataStreamDatasetValidator;
import io.confluent.connect.elasticsearch.validator.FilePathValidator;
import io.confluent.connect.elasticsearch.validator.ScriptValidator;
Expand All @@ -46,7 +44,6 @@
import org.apache.kafka.common.config.ConfigDef.Width;
import org.apache.kafka.common.config.types.Password;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.script.Script;

@SuppressWarnings("checkstyle:LineLength")
public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
Expand Down Expand Up @@ -283,7 +280,7 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
private static final String WRITE_METHOD_DISPLAY = "Write Method";
private static final String WRITE_METHOD_DEFAULT = WriteMethod.INSERT.name();

public static final String UPSERT_SCRIPT_CONFIG = "upsert_script";
public static final String UPSERT_SCRIPT_CONFIG = "upsert.script";

private static final String UPSERT_SCRIPT_DOC = "Script used for upserting data to Elasticsearch. This script allows for"
+ " customizable behavior upon upserting a document. Please refer to"
Expand Down Expand Up @@ -637,8 +634,8 @@ private static void addConversionConfigs(ConfigDef configDef) {
DATA_CONVERSION_GROUP,
++order,
Width.SHORT,
IGNORE_KEY_DISPLAY
).define(
IGNORE_KEY_DISPLAY)
.define(
IGNORE_SCHEMA_CONFIG,
Type.BOOLEAN,
IGNORE_SCHEMA_DEFAULT,
Expand All @@ -647,8 +644,8 @@ private static void addConversionConfigs(ConfigDef configDef) {
DATA_CONVERSION_GROUP,
++order,
Width.SHORT,
IGNORE_SCHEMA_DISPLAY
).define(
IGNORE_SCHEMA_DISPLAY)
.define(
COMPACT_MAP_ENTRIES_CONFIG,
Type.BOOLEAN,
COMPACT_MAP_ENTRIES_DEFAULT,
Expand All @@ -657,8 +654,8 @@ private static void addConversionConfigs(ConfigDef configDef) {
DATA_CONVERSION_GROUP,
++order,
Width.SHORT,
COMPACT_MAP_ENTRIES_DISPLAY
).define(
COMPACT_MAP_ENTRIES_DISPLAY)
.define(
IGNORE_KEY_TOPICS_CONFIG,
Type.LIST,
IGNORE_KEY_TOPICS_DEFAULT,
Expand All @@ -667,8 +664,8 @@ private static void addConversionConfigs(ConfigDef configDef) {
DATA_CONVERSION_GROUP,
++order,
Width.LONG,
IGNORE_KEY_TOPICS_DISPLAY
).define(
IGNORE_KEY_TOPICS_DISPLAY)
.define(
IGNORE_SCHEMA_TOPICS_CONFIG,
Type.LIST,
IGNORE_SCHEMA_TOPICS_DEFAULT,
Expand All @@ -677,8 +674,8 @@ private static void addConversionConfigs(ConfigDef configDef) {
DATA_CONVERSION_GROUP,
++order,
Width.LONG,
IGNORE_SCHEMA_TOPICS_DISPLAY
).define(
IGNORE_SCHEMA_TOPICS_DISPLAY)
.define(
DROP_INVALID_MESSAGE_CONFIG,
Type.BOOLEAN,
DROP_INVALID_MESSAGE_DEFAULT,
Expand All @@ -687,8 +684,8 @@ private static void addConversionConfigs(ConfigDef configDef) {
DATA_CONVERSION_GROUP,
++order,
Width.LONG,
DROP_INVALID_MESSAGE_DISPLAY
).define(
DROP_INVALID_MESSAGE_DISPLAY)
.define(
BEHAVIOR_ON_NULL_VALUES_CONFIG,
Type.STRING,
BEHAVIOR_ON_NULL_VALUES_DEFAULT.name(),
Expand All @@ -699,8 +696,8 @@ private static void addConversionConfigs(ConfigDef configDef) {
++order,
Width.SHORT,
BEHAVIOR_ON_NULL_VALUES_DISPLAY,
new EnumRecommender<>(BehaviorOnNullValues.class)
).define(
new EnumRecommender<>(BehaviorOnNullValues.class))
.define(
BEHAVIOR_ON_MALFORMED_DOCS_CONFIG,
Type.STRING,
BEHAVIOR_ON_MALFORMED_DOCS_DEFAULT.name(),
Expand All @@ -711,8 +708,8 @@ private static void addConversionConfigs(ConfigDef configDef) {
++order,
Width.SHORT,
BEHAVIOR_ON_MALFORMED_DOCS_DISPLAY,
new EnumRecommender<>(BehaviorOnMalformedDoc.class)
).define(
new EnumRecommender<>(BehaviorOnMalformedDoc.class))
.define(
EXTERNAL_VERSION_HEADER_CONFIG,
Type.STRING,
EXTERNAL_VERSION_HEADER_DEFAULT,
Expand All @@ -721,8 +718,8 @@ private static void addConversionConfigs(ConfigDef configDef) {
DATA_CONVERSION_GROUP,
++order,
Width.SHORT,
EXTERNAL_VERSION_HEADER_DISPLAY
).define(
EXTERNAL_VERSION_HEADER_DISPLAY)
.define(
WRITE_METHOD_CONFIG,
Type.STRING,
WRITE_METHOD_DEFAULT,
Expand All @@ -732,18 +729,19 @@ private static void addConversionConfigs(ConfigDef configDef) {
++order,
Width.SHORT,
WRITE_METHOD_DISPLAY,
new EnumRecommender<>(WriteMethod.class)
).define(
new EnumRecommender<>(WriteMethod.class))
.define(
UPSERT_SCRIPT_CONFIG,
Type.STRING,
UPSERT_SCRIPT_DEFAULT,
null,
new ScriptValidator(),
Importance.LOW,
UPSERT_SCRIPT_DOC,
DATA_CONVERSION_GROUP,
++order,
Width.SHORT,
UPSERT_SCRIPT_DISPLAY);
UPSERT_SCRIPT_DISPLAY,
new ScriptValidator());
}

private static void addProxyConfigs(ConfigDef configDef) {
Expand Down Expand Up @@ -1102,23 +1100,8 @@ public WriteMethod writeMethod() {
return WriteMethod.valueOf(getString(WRITE_METHOD_CONFIG).toUpperCase());
}

public Script script() throws RuntimeException {

String scriptConfig = getString(UPSERT_SCRIPT_CONFIG);

Script script;

try {
Map<String, Object> map = ScriptParser.parseSchemaStringAsJson(scriptConfig);

script = Script.parse(map);
} catch (JsonProcessingException jsonProcessingException) {
throw new RuntimeException(jsonProcessingException);
}



return script;
/**
 * Returns the raw upsert script, as configured via {@code upsert.script}.
 *
 * <p>The value is the unparsed JSON string; callers (e.g. the record
 * converter) are responsible for parsing it into an Elasticsearch
 * {@code Script}. May be {@code null} when no script was configured —
 * presumably the default; TODO confirm against UPSERT_SCRIPT_DEFAULT.
 */
public String getScript() {
return getString(UPSERT_SCRIPT_CONFIG);
}

public static void main(String[] args) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public static Script parseScript(String scriptJson) throws JsonProcessingExcepti
return Script.parse(map);
}

public static Map<String, Object> parseSchemaStringAsJson(String scriptJson)
private static Map<String, Object> parseSchemaStringAsJson(String scriptJson)
throws JsonProcessingException {

ObjectMapper objectMapper = new ObjectMapper();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,24 @@

package io.confluent.connect.elasticsearch.validator;

import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.WRITE_METHOD_CONFIG;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.WriteMethod.SCRIPTED_UPSERT;

import com.fasterxml.jackson.core.JsonProcessingException;
import io.confluent.connect.elasticsearch.util.ScriptParser;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.elasticsearch.script.Script;

public class ScriptValidator implements ConfigDef.Validator {
public class ScriptValidator implements ConfigDef.Validator, ConfigDef.Recommender {

@Override
@SuppressWarnings("unchecked")
public void ensureValid(String name, Object value) {

if (value == null) {
return;
}
Expand All @@ -51,4 +58,17 @@ public void ensureValid(String name, Object value) {
// Human-readable description of this validator's constraint; ConfigDef
// surfaces it in generated documentation and in validation error messages.
public String toString() {
return "A valid script that is able to be parsed";
}

/**
 * Recommends valid values for the upsert-script config.
 *
 * <p>The script is free-form JSON, so there is no enumerable set of valid
 * values; an empty list tells the framework not to restrict the value
 * (visibility of the option is handled separately by {@link #visible}).
 *
 * <p>NOTE(review): the previous implementation compared the parsed String
 * config value against the enum constant {@code SCRIPTED_UPSERT} itself
 * (which can never be equal, unlike {@code visible}, which correctly uses
 * {@code SCRIPTED_UPSERT.name()}), and returned {@code null} in the
 * scripted-upsert branch — a null recommendation list is not handled
 * gracefully by ConfigDef's validation. Both branches therefore collapse
 * to an unconditional empty list, which matches the de-facto behavior.
 *
 * @param name the name of the config being validated
 * @param parsedConfig the parsed values of all configs
 * @return an empty list, meaning "no restriction on values"
 */
@Override
public List<Object> validValues(String name, Map<String, Object> parsedConfig) {
  return new ArrayList<>();
}

/**
 * Shows the upsert-script option only when the connector is configured for
 * scripted upserts.
 *
 * <p>Compares against the enum <em>name</em> because {@code parsedConfig}
 * holds the String form of the write method. The constant-first
 * {@code equals} guards against an absent/null write-method entry, which
 * would otherwise throw a {@code NullPointerException}.
 *
 * @param name the name of the config being validated
 * @param parsedConfig the parsed values of all configs
 * @return {@code true} iff the write method is {@code SCRIPTED_UPSERT}
 */
@Override
public boolean visible(String name, Map<String, Object> parsedConfig) {
  return SCRIPTED_UPSERT.name().equals(parsedConfig.get(WRITE_METHOD_CONFIG));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,9 @@ public void upsertScript() {
props.put(ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG, "false");
props.put(ElasticsearchSinkConnectorConfig.IGNORE_SCHEMA_CONFIG, "false");
props.put(ElasticsearchSinkConnectorConfig.WRITE_METHOD_CONFIG, ElasticsearchSinkConnectorConfig.WriteMethod.SCRIPTED_UPSERT.name());
props.put(
ElasticsearchSinkConnectorConfig.UPSERT_SCRIPT_CONFIG,
"{\"lang\":\"painless\",\"source\":\"if ( ctx.op == 'create' ) ctx._source.counter = params.count} else {ctx._source.counter += params.count}\",\"params\":{\"count\":4}}");
props.put(ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG, BehaviorOnNullValues.FAIL.name());
converter = new DataConverter(new ElasticsearchSinkConnectorConfig(props));
Schema preProcessedSchema = converter.preProcessSchema(schema);
Expand All @@ -584,6 +587,9 @@ public void upsertScript() {
UpdateRequest actualRecord = (UpdateRequest) converter.convertRecord(sinkRecord, index);

assertNotNull(actualRecord.script());
assertEquals("if ( ctx.op == 'create' ) ctx._source.counter = params.count} else {ctx._source.counter += params.count}", actualRecord.script().getIdOrCode());
assertEquals("painless", actualRecord.script().getLang());
assertEquals(4, actualRecord.script().getParams().getOrDefault("count", 0));
assertTrue(actualRecord.scriptedUpsert());
assertNotNull(actualRecord.doc());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.types.Password;
import org.elasticsearch.script.Script;
import org.junit.Before;
import org.junit.Test;

Expand Down Expand Up @@ -247,17 +246,4 @@ public void testLogSensitiveData(){
config = new ElasticsearchSinkConnectorConfig(props);
assertFalse(config.shouldLogSensitiveData());
}

@Test
public void upsertScriptIsValid(){
props.put(UPSERT_SCRIPT_CONFIG, "{\"script\":{\"lang\":\"...\",\"source\":\"if ( ctx.op == 'create' ) ctx._source.counter = params.count} else {ctx._source.counter += params.count}\",\"params\":{\"count\":4}}}");
props.put(WRITE_METHOD_CONFIG, WriteMethod.SCRIPTED_UPSERT.name());

ElasticsearchSinkConnectorConfig config = new ElasticsearchSinkConnectorConfig(props);

Script script = config.script();

assertNotNull(script);

}
}
62 changes: 38 additions & 24 deletions src/test/java/io/confluent/connect/elasticsearch/ValidatorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,7 @@

package io.confluent.connect.elasticsearch;

import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BATCH_SIZE_CONFIG;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_PASSWORD_CONFIG;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_USERNAME_CONFIG;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.DATA_STREAM_DATASET_CONFIG;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.DATA_STREAM_TIMESTAMP_CONFIG;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.DATA_STREAM_TYPE_CONFIG;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.FLUSH_TIMEOUT_MS_CONFIG;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.IGNORE_KEY_TOPICS_CONFIG;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.IGNORE_SCHEMA_CONFIG;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.IGNORE_SCHEMA_TOPICS_CONFIG;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.KERBEROS_KEYTAB_PATH_CONFIG;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.LINGER_MS_CONFIG;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.MAX_BUFFERED_RECORDS_CONFIG;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.MAX_IN_FLIGHT_REQUESTS_CONFIG;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.PROXY_HOST_CONFIG;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.PROXY_PASSWORD_CONFIG;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.PROXY_USERNAME_CONFIG;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.SECURITY_PROTOCOL_CONFIG;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.SSL_CONFIG_PREFIX;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.KERBEROS_PRINCIPAL_CONFIG;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.WRITE_METHOD_CONFIG;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.*;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
Expand All @@ -51,8 +28,11 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.common.config.SslConfigs;
Expand Down Expand Up @@ -106,6 +86,40 @@ public void testValidUpsertDeleteOnDefaultConfig() {
assertNoErrors(result);
}

@Test
public void testValidScriptedUpsert() {
  // Configure a scripted upsert together with a painless script and verify
  // that validation reports no errors and the script option is visible.
  props.put(BEHAVIOR_ON_NULL_VALUES_CONFIG, "delete");
  props.put(COMPACT_MAP_ENTRIES_CONFIG, "true");
  props.put(IGNORE_KEY_CONFIG, "false");
  props.put(IGNORE_SCHEMA_CONFIG, "false");
  props.put(WRITE_METHOD_CONFIG, ElasticsearchSinkConnectorConfig.WriteMethod.SCRIPTED_UPSERT.name());
  props.put(
      UPSERT_SCRIPT_CONFIG,
      "{\"lang\":\"painless\",\"source\":\"if ( ctx.op == 'create' ) ctx._source.counter = params.count} else {ctx._source.counter += params.count}\",\"params\":{\"count\":4}}");

  validator = new Validator(props, () -> mockClient);
  Config result = validator.validate();

  // Index the validation output by config name for easy lookup.
  Map<String, ConfigValue> valuesByName =
      result.configValues().stream().collect(Collectors.toMap(ConfigValue::name, value -> value));

  assertNoErrors(result);
  assertTrue(valuesByName.get(UPSERT_SCRIPT_CONFIG).visible());
}

@Test
public void testInvalidScriptedUpsert() {
  // A script may be supplied with a plain "upsert" write method; validation
  // still passes, but the script option must be hidden (not visible).
  props.put(BEHAVIOR_ON_NULL_VALUES_CONFIG, "delete");
  props.put(WRITE_METHOD_CONFIG, "upsert");
  props.put(
      UPSERT_SCRIPT_CONFIG,
      "{\"lang\":\"painless\",\"source\":\"if ( ctx.op == 'create' ) ctx._source.counter = params.count} else {ctx._source.counter += params.count}\",\"params\":{\"count\":4}}");

  validator = new Validator(props, () -> mockClient);
  Config result = validator.validate();

  // Index the validation output by config name for easy lookup.
  Map<String, ConfigValue> valuesByName =
      result.configValues().stream().collect(Collectors.toMap(ConfigValue::name, value -> value));

  assertNoErrors(result);
  assertFalse(valuesByName.get(UPSERT_SCRIPT_CONFIG).visible());
}

@Test
public void testInvalidCredentials() {
props.put(CONNECTION_USERNAME_CONFIG, "username");
Expand Down

0 comments on commit 2e04673

Please sign in to comment.