diff --git a/src/main/java/io/confluent/connect/elasticsearch/DataConverter.java b/src/main/java/io/confluent/connect/elasticsearch/DataConverter.java index 640fb0e2e..809a9d456 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/DataConverter.java +++ b/src/main/java/io/confluent/connect/elasticsearch/DataConverter.java @@ -28,6 +28,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; + +import com.fasterxml.jackson.core.JsonProcessingException; +import io.confluent.connect.elasticsearch.util.ScriptParser; import org.apache.kafka.connect.data.ConnectSchema; import org.apache.kafka.connect.data.Date; import org.apache.kafka.connect.data.Decimal; @@ -51,6 +53,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.index.VersionType; +import org.elasticsearch.script.Script; import org.elasticsearch.xcontent.XContentType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -185,11 +188,19 @@ public DocWriteRequest convertRecord(SinkRecord record, String index) { new IndexRequest(index).id(id).source(payload, XContentType.JSON).opType(opType), record); case SCRIPTED_UPSERT: + Script script = null; + + try { + script = ScriptParser.parseScript(config.getScript()); + } catch (JsonProcessingException jsonProcessingException) { + throw new DataException("Failed to parse upsert script", jsonProcessingException); + } + return new UpdateRequest(index, id) .doc(payload, XContentType.JSON) .upsert(payload, XContentType.JSON) .retryOnConflict(Math.min(config.maxInFlightRequests(), 5)) - .script(config.script()) + .script(script) + .scriptedUpsert(true); default: return null; // shouldn't happen diff --git a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java index 2f0b05bbe..f9b902ada 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java +++ 
b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java @@ -26,8 +26,6 @@ import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG; import static org.apache.kafka.common.config.SslConfigs.addClientSslSupport; -import com.fasterxml.jackson.core.JsonProcessingException; -import io.confluent.connect.elasticsearch.util.ScriptParser; import io.confluent.connect.elasticsearch.validator.DataStreamDatasetValidator; import io.confluent.connect.elasticsearch.validator.FilePathValidator; import io.confluent.connect.elasticsearch.validator.ScriptValidator; @@ -46,7 +44,6 @@ import org.apache.kafka.common.config.ConfigDef.Width; import org.apache.kafka.common.config.types.Password; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.script.Script; @SuppressWarnings("checkstyle:LineLength") public class ElasticsearchSinkConnectorConfig extends AbstractConfig { @@ -283,7 +280,7 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig { private static final String WRITE_METHOD_DISPLAY = "Write Method"; private static final String WRITE_METHOD_DEFAULT = WriteMethod.INSERT.name(); - public static final String UPSERT_SCRIPT_CONFIG = "upsert_script"; + public static final String UPSERT_SCRIPT_CONFIG = "upsert.script"; private static final String UPSERT_SCRIPT_DOC = "Script used for upserting data to Elasticsearch. This script allows for" + " customizable behavior upon upserting a document. 
Please refer to" @@ -637,8 +634,8 @@ private static void addConversionConfigs(ConfigDef configDef) { DATA_CONVERSION_GROUP, ++order, Width.SHORT, - IGNORE_KEY_DISPLAY - ).define( + IGNORE_KEY_DISPLAY) + .define( IGNORE_SCHEMA_CONFIG, Type.BOOLEAN, IGNORE_SCHEMA_DEFAULT, @@ -647,8 +644,8 @@ private static void addConversionConfigs(ConfigDef configDef) { DATA_CONVERSION_GROUP, ++order, Width.SHORT, - IGNORE_SCHEMA_DISPLAY - ).define( + IGNORE_SCHEMA_DISPLAY) + .define( COMPACT_MAP_ENTRIES_CONFIG, Type.BOOLEAN, COMPACT_MAP_ENTRIES_DEFAULT, @@ -657,8 +654,8 @@ private static void addConversionConfigs(ConfigDef configDef) { DATA_CONVERSION_GROUP, ++order, Width.SHORT, - COMPACT_MAP_ENTRIES_DISPLAY - ).define( + COMPACT_MAP_ENTRIES_DISPLAY) + .define( IGNORE_KEY_TOPICS_CONFIG, Type.LIST, IGNORE_KEY_TOPICS_DEFAULT, @@ -667,8 +664,8 @@ private static void addConversionConfigs(ConfigDef configDef) { DATA_CONVERSION_GROUP, ++order, Width.LONG, - IGNORE_KEY_TOPICS_DISPLAY - ).define( + IGNORE_KEY_TOPICS_DISPLAY) + .define( IGNORE_SCHEMA_TOPICS_CONFIG, Type.LIST, IGNORE_SCHEMA_TOPICS_DEFAULT, @@ -677,8 +674,8 @@ private static void addConversionConfigs(ConfigDef configDef) { DATA_CONVERSION_GROUP, ++order, Width.LONG, - IGNORE_SCHEMA_TOPICS_DISPLAY - ).define( + IGNORE_SCHEMA_TOPICS_DISPLAY) + .define( DROP_INVALID_MESSAGE_CONFIG, Type.BOOLEAN, DROP_INVALID_MESSAGE_DEFAULT, @@ -687,8 +684,8 @@ private static void addConversionConfigs(ConfigDef configDef) { DATA_CONVERSION_GROUP, ++order, Width.LONG, - DROP_INVALID_MESSAGE_DISPLAY - ).define( + DROP_INVALID_MESSAGE_DISPLAY) + .define( BEHAVIOR_ON_NULL_VALUES_CONFIG, Type.STRING, BEHAVIOR_ON_NULL_VALUES_DEFAULT.name(), @@ -699,8 +696,8 @@ private static void addConversionConfigs(ConfigDef configDef) { ++order, Width.SHORT, BEHAVIOR_ON_NULL_VALUES_DISPLAY, - new EnumRecommender<>(BehaviorOnNullValues.class) - ).define( + new EnumRecommender<>(BehaviorOnNullValues.class)) + .define( BEHAVIOR_ON_MALFORMED_DOCS_CONFIG, Type.STRING, 
BEHAVIOR_ON_MALFORMED_DOCS_DEFAULT.name(), @@ -711,8 +708,8 @@ private static void addConversionConfigs(ConfigDef configDef) { ++order, Width.SHORT, BEHAVIOR_ON_MALFORMED_DOCS_DISPLAY, - new EnumRecommender<>(BehaviorOnMalformedDoc.class) - ).define( + new EnumRecommender<>(BehaviorOnMalformedDoc.class)) + .define( EXTERNAL_VERSION_HEADER_CONFIG, Type.STRING, EXTERNAL_VERSION_HEADER_DEFAULT, @@ -721,8 +718,8 @@ private static void addConversionConfigs(ConfigDef configDef) { DATA_CONVERSION_GROUP, ++order, Width.SHORT, - EXTERNAL_VERSION_HEADER_DISPLAY - ).define( + EXTERNAL_VERSION_HEADER_DISPLAY) + .define( WRITE_METHOD_CONFIG, Type.STRING, WRITE_METHOD_DEFAULT, @@ -732,18 +729,19 @@ private static void addConversionConfigs(ConfigDef configDef) { ++order, Width.SHORT, WRITE_METHOD_DISPLAY, - new EnumRecommender<>(WriteMethod.class) - ).define( + new EnumRecommender<>(WriteMethod.class)) + .define( UPSERT_SCRIPT_CONFIG, Type.STRING, - UPSERT_SCRIPT_DEFAULT, + null, new ScriptValidator(), Importance.LOW, UPSERT_SCRIPT_DOC, DATA_CONVERSION_GROUP, ++order, Width.SHORT, - UPSERT_SCRIPT_DISPLAY); + UPSERT_SCRIPT_DISPLAY, + new ScriptValidator()); } private static void addProxyConfigs(ConfigDef configDef) { @@ -1102,23 +1100,8 @@ public WriteMethod writeMethod() { return WriteMethod.valueOf(getString(WRITE_METHOD_CONFIG).toUpperCase()); } - public Script script() throws RuntimeException { - - String scriptConfig = getString(UPSERT_SCRIPT_CONFIG); - - Script script; - - try { - Map map = ScriptParser.parseSchemaStringAsJson(scriptConfig); - - script = Script.parse(map); - } catch (JsonProcessingException jsonProcessingException) { - throw new RuntimeException(jsonProcessingException); - } - - - - return script; + public String getScript() { + return getString(UPSERT_SCRIPT_CONFIG); } public static void main(String[] args) { diff --git a/src/main/java/io/confluent/connect/elasticsearch/util/ScriptParser.java 
b/src/main/java/io/confluent/connect/elasticsearch/util/ScriptParser.java index b1fa3d03e..700e9ff71 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/util/ScriptParser.java +++ b/src/main/java/io/confluent/connect/elasticsearch/util/ScriptParser.java @@ -30,7 +30,7 @@ public static Script parseScript(String scriptJson) throws JsonProcessingExcepti return Script.parse(map); } - public static Map parseSchemaStringAsJson(String scriptJson) + private static Map parseSchemaStringAsJson(String scriptJson) throws JsonProcessingException { ObjectMapper objectMapper = new ObjectMapper(); diff --git a/src/main/java/io/confluent/connect/elasticsearch/validator/ScriptValidator.java b/src/main/java/io/confluent/connect/elasticsearch/validator/ScriptValidator.java index c70b7137e..760beab4b 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/validator/ScriptValidator.java +++ b/src/main/java/io/confluent/connect/elasticsearch/validator/ScriptValidator.java @@ -15,17 +15,24 @@ package io.confluent.connect.elasticsearch.validator; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.WRITE_METHOD_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.WriteMethod.SCRIPTED_UPSERT; + import com.fasterxml.jackson.core.JsonProcessingException; import io.confluent.connect.elasticsearch.util.ScriptParser; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigException; import org.elasticsearch.script.Script; -public class ScriptValidator implements ConfigDef.Validator { +public class ScriptValidator implements ConfigDef.Validator, ConfigDef.Recommender { @Override @SuppressWarnings("unchecked") public void ensureValid(String name, Object value) { + if (value == null) { return; } @@ -51,4 +58,17 @@ public void ensureValid(String name, Object value) { public String toString() { return "A 
valid script that is able to be parsed"; } + + @Override + public List<Object> validValues(String name, Map<String, Object> parsedConfig) { + // A script may be any parseable string, so there is no fixed set of values to + // recommend. Never return null: callers iterate the returned list (NPE risk). + return new ArrayList<>(); + } + + @Override + public boolean visible(String name, Map<String, Object> parsedConfig) { + Object writeMethod = parsedConfig.get(WRITE_METHOD_CONFIG); + return SCRIPTED_UPSERT.name().equalsIgnoreCase(String.valueOf(writeMethod)); + } } diff --git a/src/test/java/io/confluent/connect/elasticsearch/DataConverterTest.java b/src/test/java/io/confluent/connect/elasticsearch/DataConverterTest.java index 24d44b11a..6096fdcf6 100644 --- a/src/test/java/io/confluent/connect/elasticsearch/DataConverterTest.java +++ b/src/test/java/io/confluent/connect/elasticsearch/DataConverterTest.java @@ -575,6 +575,9 @@ public void upsertScript() { props.put(ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG, "false"); props.put(ElasticsearchSinkConnectorConfig.IGNORE_SCHEMA_CONFIG, "false"); props.put(ElasticsearchSinkConnectorConfig.WRITE_METHOD_CONFIG, ElasticsearchSinkConnectorConfig.WriteMethod.SCRIPTED_UPSERT.name()); + props.put( + ElasticsearchSinkConnectorConfig.UPSERT_SCRIPT_CONFIG, + "{\"lang\":\"painless\",\"source\":\"if ( ctx.op == 'create' ) ctx._source.counter = params.count} else {ctx._source.counter += params.count}\",\"params\":{\"count\":4}}"); props.put(ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG, BehaviorOnNullValues.FAIL.name()); converter = new DataConverter(new ElasticsearchSinkConnectorConfig(props)); Schema preProcessedSchema = converter.preProcessSchema(schema); @@ -584,6 +587,9 @@ public void upsertScript() { UpdateRequest actualRecord = (UpdateRequest) converter.convertRecord(sinkRecord, index); assertNotNull(actualRecord.script()); + assertEquals("if ( ctx.op == 'create' ) ctx._source.counter = params.count} else {ctx._source.counter += params.count}", actualRecord.script().getIdOrCode()); + assertEquals("painless", actualRecord.script().getLang()); + assertEquals(4, 
actualRecord.script().getParams().getOrDefault("count", 0)); assertTrue(actualRecord.scriptedUpsert()); assertNotNull(actualRecord.doc()); } diff --git a/src/test/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfigTest.java b/src/test/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfigTest.java index 18f113bc1..9e1123b5f 100644 --- a/src/test/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfigTest.java +++ b/src/test/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfigTest.java @@ -16,7 +16,6 @@ import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.config.types.Password; -import org.elasticsearch.script.Script; import org.junit.Before; import org.junit.Test; @@ -247,17 +246,4 @@ public void testLogSensitiveData(){ config = new ElasticsearchSinkConnectorConfig(props); assertFalse(config.shouldLogSensitiveData()); } - - @Test - public void upsertScriptIsValid(){ - props.put(UPSERT_SCRIPT_CONFIG, "{\"script\":{\"lang\":\"...\",\"source\":\"if ( ctx.op == 'create' ) ctx._source.counter = params.count} else {ctx._source.counter += params.count}\",\"params\":{\"count\":4}}}"); - props.put(WRITE_METHOD_CONFIG, WriteMethod.SCRIPTED_UPSERT.name()); - - ElasticsearchSinkConnectorConfig config = new ElasticsearchSinkConnectorConfig(props); - - Script script = config.script(); - - assertNotNull(script); - - } } diff --git a/src/test/java/io/confluent/connect/elasticsearch/ValidatorTest.java b/src/test/java/io/confluent/connect/elasticsearch/ValidatorTest.java index 3dba9ec8a..adf9ba2e9 100644 --- a/src/test/java/io/confluent/connect/elasticsearch/ValidatorTest.java +++ b/src/test/java/io/confluent/connect/elasticsearch/ValidatorTest.java @@ -16,30 +16,7 @@ package io.confluent.connect.elasticsearch; -import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BATCH_SIZE_CONFIG; -import static 
io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG; -import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_PASSWORD_CONFIG; -import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG; -import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_USERNAME_CONFIG; -import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.DATA_STREAM_DATASET_CONFIG; -import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.DATA_STREAM_TIMESTAMP_CONFIG; -import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.DATA_STREAM_TYPE_CONFIG; -import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.FLUSH_TIMEOUT_MS_CONFIG; -import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG; -import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.IGNORE_KEY_TOPICS_CONFIG; -import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.IGNORE_SCHEMA_CONFIG; -import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.IGNORE_SCHEMA_TOPICS_CONFIG; -import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.KERBEROS_KEYTAB_PATH_CONFIG; -import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.LINGER_MS_CONFIG; -import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.MAX_BUFFERED_RECORDS_CONFIG; -import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.MAX_IN_FLIGHT_REQUESTS_CONFIG; -import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.PROXY_HOST_CONFIG; -import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.PROXY_PASSWORD_CONFIG; -import static 
io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.PROXY_USERNAME_CONFIG; -import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.SECURITY_PROTOCOL_CONFIG; -import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.SSL_CONFIG_PREFIX; -import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.KERBEROS_PRINCIPAL_CONFIG; -import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.WRITE_METHOD_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.*; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; @@ -51,8 +28,11 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.stream.Collectors; + import org.apache.kafka.common.config.Config; import org.apache.kafka.common.config.ConfigValue; import org.apache.kafka.common.config.SslConfigs; @@ -106,6 +86,40 @@ public void testValidUpsertDeleteOnDefaultConfig() { assertNoErrors(result); } + @Test + public void testValidScriptedUpsert() { + props.put(BEHAVIOR_ON_NULL_VALUES_CONFIG, "delete"); + props.put(ElasticsearchSinkConnectorConfig.COMPACT_MAP_ENTRIES_CONFIG, "true"); + props.put(ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG, "false"); + props.put(ElasticsearchSinkConnectorConfig.IGNORE_SCHEMA_CONFIG, "false"); + props.put(ElasticsearchSinkConnectorConfig.WRITE_METHOD_CONFIG, ElasticsearchSinkConnectorConfig.WriteMethod.SCRIPTED_UPSERT.name()); + props.put( + ElasticsearchSinkConnectorConfig.UPSERT_SCRIPT_CONFIG, + "{\"lang\":\"painless\",\"source\":\"if ( ctx.op == 'create' ) ctx._source.counter = params.count} else {ctx._source.counter += params.count}\",\"params\":{\"count\":4}}"); + validator = new Validator(props, () -> mockClient); + Config result = 
validator.validate(); + + Map map = result.configValues().stream().collect(Collectors.toMap(x -> x.name(), x -> x)); + + assertNoErrors(result); + assertTrue(map.get(UPSERT_SCRIPT_CONFIG).visible()); + } + + @Test + public void testInvalidScriptedUpsert() { + props.put(BEHAVIOR_ON_NULL_VALUES_CONFIG, "delete"); + props.put(WRITE_METHOD_CONFIG, "upsert"); + props.put( + ElasticsearchSinkConnectorConfig.UPSERT_SCRIPT_CONFIG, + "{\"lang\":\"painless\",\"source\":\"if ( ctx.op == 'create' ) ctx._source.counter = params.count} else {ctx._source.counter += params.count}\",\"params\":{\"count\":4}}"); + validator = new Validator(props, () -> mockClient); + Config result = validator.validate(); + Map map = result.configValues().stream().collect(Collectors.toMap(x -> x.name(), x -> x)); + + assertNoErrors(result); + assertFalse(map.get(UPSERT_SCRIPT_CONFIG).visible()); + } + @Test public void testInvalidCredentials() { props.put(CONNECTION_USERNAME_CONFIG, "username");