diff --git a/src/main/java/io/confluent/connect/elasticsearch/DataConverter.java b/src/main/java/io/confluent/connect/elasticsearch/DataConverter.java
index fc91eb741..aeddd701e 100644
--- a/src/main/java/io/confluent/connect/elasticsearch/DataConverter.java
+++ b/src/main/java/io/confluent/connect/elasticsearch/DataConverter.java
@@ -20,6 +20,7 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BehaviorOnNullValues;
+import io.confluent.connect.elasticsearch.util.ScriptParser;
 import org.apache.kafka.connect.data.ConnectSchema;
 import org.apache.kafka.connect.data.Date;
 import org.apache.kafka.connect.data.Decimal;
@@ -43,6 +44,7 @@
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.index.VersionType;
+import org.elasticsearch.script.Script;
 import org.elasticsearch.xcontent.XContentType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -185,11 +187,48 @@ public DocWriteRequest convertRecord(SinkRecord record, String index) {
             new IndexRequest(index).id(id).source(payload, XContentType.JSON).opType(opType),
             record
         );
+      case SCRIPTED_UPSERT:
+        try {
+
+          if (config.getIsPayloadAsParams()) {
+            return buildUpdateRequestWithParams(index, payload, id);
+          }
+
+          Script script = ScriptParser.parseScript(config.getScript());
+
+          return new UpdateRequest(index, id)
+              .doc(payload, XContentType.JSON)
+              .upsert(payload, XContentType.JSON)
+              .retryOnConflict(Math.min(config.maxInFlightRequests(), 5))
+              .script(script)
+              .scriptedUpsert(true);
+
+        } catch (JsonProcessingException jsonProcessingException) {
+          throw new RuntimeException(jsonProcessingException);
+        }
       default:
         return null; // shouldn't happen
     }
   }
 
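+  /**
+   * Builds the scripted-upsert request used when payload.as.params is enabled: the
+   * record payload is handed to the script as its params rather than as doc content.
+   */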
+  private UpdateRequest buildUpdateRequestWithParams(String index, String payload, String id)
+      throws JsonProcessingException {
+
+    Script script = ScriptParser.parseScriptWithParams(config.getScript(), payload);
+
+    UpdateRequest updateRequest =
+        new UpdateRequest(index, id)
+            .retryOnConflict(Math.min(config.maxInFlightRequests(), 5))
+            .script(script)
+            .scriptedUpsert(true);
+
+    return updateRequest;
+  }
+
   private String getPayload(SinkRecord record) {
     if (record.value() == null) {
       return null;
diff --git a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java
index 51c7d4c85..d715e02c4 100644
--- a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java
+++ b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java
@@ -24,6 +24,8 @@
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.concurrent.TimeUnit;
+
+import io.confluent.connect.elasticsearch.validator.ScriptValidator;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
@@ -277,6 +279,26 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
   );
   private static final String WRITE_METHOD_DISPLAY = "Write Method";
   private static final String WRITE_METHOD_DEFAULT = WriteMethod.INSERT.name();
+
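+  // The value is the JSON form of an Elasticsearch script object, e.g.
+  // {"lang":"painless","source":"ctx._source.counter += params.count","params":{"count":1}}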
+  public static final String UPSERT_SCRIPT_CONFIG = "upsert.script";
+
+  private static final String UPSERT_SCRIPT_DOC = "Script used when upserting"
+      + " data to Elasticsearch. The script customizes how a document is created"
+      + " or updated on upsert. See the Elasticsearch scripted upsert"
+      + " documentation for the expected script format.";
+
+  private static final String UPSERT_SCRIPT_DISPLAY = "Upsert Script";
+
+  public static final String PAYLOAD_AS_PARAMS_CONFIG = "payload.as.params";
+
+  private static final String PAYLOAD_AS_PARAMS_DOC = "If true, the Kafka record payload is"
+      + " injected into the upsert.script script as its params object";
+
+  private static final String PAYLOAD_AS_PARAMS_DISPLAY = "Payload as Params";
+
+
   public static final String LOG_SENSITIVE_DATA_CONFIG = "log.sensitive.data";
   private static final String LOG_SENSITIVE_DATA_DISPLAY = "Log Sensitive data";
   private static final String LOG_SENSITIVE_DATA_DOC = "If true, logs sensitive data "
@@ -408,7 +430,8 @@ public enum SecurityProtocol {
 
   public enum WriteMethod {
     INSERT,
-    UPSERT
+    UPSERT,
+    SCRIPTED_UPSERT
   }
 
   protected static ConfigDef baseConfigDef() {
@@ -622,8 +645,8 @@ private static void addConversionConfigs(ConfigDef configDef) {
         DATA_CONVERSION_GROUP,
         ++order,
         Width.SHORT,
-        IGNORE_KEY_DISPLAY
-    ).define(
+        IGNORE_KEY_DISPLAY)
+        .define(
         IGNORE_SCHEMA_CONFIG,
         Type.BOOLEAN,
         IGNORE_SCHEMA_DEFAULT,
@@ -632,8 +655,8 @@ private static void addConversionConfigs(ConfigDef configDef) {
         DATA_CONVERSION_GROUP,
         ++order,
         Width.SHORT,
-        IGNORE_SCHEMA_DISPLAY
-    ).define(
+        IGNORE_SCHEMA_DISPLAY)
+        .define(
         COMPACT_MAP_ENTRIES_CONFIG,
         Type.BOOLEAN,
         COMPACT_MAP_ENTRIES_DEFAULT,
@@ -642,8 +665,8 @@ private static void addConversionConfigs(ConfigDef configDef) {
         DATA_CONVERSION_GROUP,
         ++order,
         Width.SHORT,
-        COMPACT_MAP_ENTRIES_DISPLAY
-    ).define(
+        COMPACT_MAP_ENTRIES_DISPLAY)
+        .define(
         IGNORE_KEY_TOPICS_CONFIG,
         Type.LIST,
         IGNORE_KEY_TOPICS_DEFAULT,
@@ -652,8 +675,8 @@ private static void addConversionConfigs(ConfigDef configDef) {
         DATA_CONVERSION_GROUP,
         ++order,
         Width.LONG,
-        IGNORE_KEY_TOPICS_DISPLAY
-    ).define(
+        IGNORE_KEY_TOPICS_DISPLAY)
+        .define(
         IGNORE_SCHEMA_TOPICS_CONFIG,
         Type.LIST,
         IGNORE_SCHEMA_TOPICS_DEFAULT,
@@ -662,8 +685,8 @@ private static void addConversionConfigs(ConfigDef configDef) {
         DATA_CONVERSION_GROUP,
         ++order,
         Width.LONG,
-        IGNORE_SCHEMA_TOPICS_DISPLAY
-    ).define(
+        IGNORE_SCHEMA_TOPICS_DISPLAY)
+        .define(
         DROP_INVALID_MESSAGE_CONFIG,
         Type.BOOLEAN,
         DROP_INVALID_MESSAGE_DEFAULT,
@@ -672,8 +695,8 @@ private static void addConversionConfigs(ConfigDef configDef) {
         DATA_CONVERSION_GROUP,
         ++order,
         Width.LONG,
-        DROP_INVALID_MESSAGE_DISPLAY
-    ).define(
+        DROP_INVALID_MESSAGE_DISPLAY)
+        .define(
         BEHAVIOR_ON_NULL_VALUES_CONFIG,
         Type.STRING,
         BEHAVIOR_ON_NULL_VALUES_DEFAULT.name(),
@@ -684,8 +707,8 @@ private static void addConversionConfigs(ConfigDef configDef) {
         ++order,
         Width.SHORT,
         BEHAVIOR_ON_NULL_VALUES_DISPLAY,
-        new EnumRecommender<>(BehaviorOnNullValues.class)
-    ).define(
+        new EnumRecommender<>(BehaviorOnNullValues.class))
+        .define(
         BEHAVIOR_ON_MALFORMED_DOCS_CONFIG,
         Type.STRING,
         BEHAVIOR_ON_MALFORMED_DOCS_DEFAULT.name(),
@@ -696,8 +719,8 @@ private static void addConversionConfigs(ConfigDef configDef) {
         ++order,
         Width.SHORT,
         BEHAVIOR_ON_MALFORMED_DOCS_DISPLAY,
-        new EnumRecommender<>(BehaviorOnMalformedDoc.class)
-    ).define(
+        new EnumRecommender<>(BehaviorOnMalformedDoc.class))
+        .define(
         EXTERNAL_VERSION_HEADER_CONFIG,
         Type.STRING,
         EXTERNAL_VERSION_HEADER_DEFAULT,
@@ -706,8 +729,8 @@ private static void addConversionConfigs(ConfigDef configDef) {
         DATA_CONVERSION_GROUP,
         ++order,
         Width.SHORT,
-        EXTERNAL_VERSION_HEADER_DISPLAY
-    ).define(
+        EXTERNAL_VERSION_HEADER_DISPLAY)
+        .define(
         WRITE_METHOD_CONFIG,
         Type.STRING,
         WRITE_METHOD_DEFAULT,
@@ -718,8 +741,29 @@ private static void addConversionConfigs(ConfigDef configDef) {
         ++order,
         Width.SHORT,
         WRITE_METHOD_DISPLAY,
-        new EnumRecommender<>(WriteMethod.class)
-    );
+        new EnumRecommender<>(WriteMethod.class))
+        .define(
+        UPSERT_SCRIPT_CONFIG,
+        Type.STRING,
+        null,
+        new ScriptValidator(),
+        Importance.LOW,
+        UPSERT_SCRIPT_DOC,
+        DATA_CONVERSION_GROUP,
+        ++order,
+        Width.SHORT,
+        UPSERT_SCRIPT_DISPLAY,
+        new ScriptValidator())
+        .define(
+        PAYLOAD_AS_PARAMS_CONFIG,
+        Type.BOOLEAN,
+        false,
+        Importance.LOW,
+        PAYLOAD_AS_PARAMS_DOC,
+        DATA_CONVERSION_GROUP,
+        ++order,
+        Width.SHORT,
+        PAYLOAD_AS_PARAMS_DISPLAY);
   }
 
   private static void addProxyConfigs(ConfigDef configDef) {
@@ -1078,6 +1122,14 @@ public WriteMethod writeMethod() {
     return WriteMethod.valueOf(getString(WRITE_METHOD_CONFIG).toUpperCase());
   }
 
+  public String getScript() {
+    return getString(UPSERT_SCRIPT_CONFIG);
+  }
+
+  public Boolean getIsPayloadAsParams() {
+    return getBoolean(PAYLOAD_AS_PARAMS_CONFIG);
+  }
+
   private static class DataStreamDatasetValidator implements Validator {
 
     @Override
diff --git a/src/main/java/io/confluent/connect/elasticsearch/util/ScriptParser.java b/src/main/java/io/confluent/connect/elasticsearch/util/ScriptParser.java
new file mode 100644
index 000000000..f3b43b14f
--- /dev/null
+++ b/src/main/java/io/confluent/connect/elasticsearch/util/ScriptParser.java
+/*
+ * Copyright 2018 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.connect.elasticsearch.util;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.elasticsearch.script.Script;
+
+import java.util.Map;
+
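+/**
+ * Utility for converting the JSON value of the {@code upsert.script} config into an
+ * Elasticsearch {@link Script} (a map with keys such as {@code lang}, {@code source}
+ * and {@code params}).
+ */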
+public class ScriptParser {
+
+  private static final ObjectMapper objectMapper = new ObjectMapper();
+
+  public static Script parseScript(String scriptJson) throws JsonProcessingException {
+
+    Map<String, Object> map = ScriptParser.parseSchemaStringAsJson(scriptJson);
+
+    return Script.parse(map);
+  }
+
+  private static Map<String, Object> parseSchemaStringAsJson(String scriptJson)
+      throws JsonProcessingException {
+
+    return objectMapper.readValue(scriptJson, new TypeReference<Map<String, Object>>() {});
+  }
+
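+  /**
+   * Parses the configured script and injects the record payload as the script's
+   * {@code params} object, replacing any params supplied in the config itself.
+   */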
+  public static Script parseScriptWithParams(String scriptJson, String jsonPayload)
+      throws JsonProcessingException {
+
+    Map<String, Object> map = ScriptParser.parseSchemaStringAsJson(scriptJson);
+
+    Map<String, Object> fields =
+        objectMapper.readValue(jsonPayload, new TypeReference<Map<String, Object>>() {});
+
+    map.put("params", fields);
+
+    return Script.parse(map);
+  }
+}
diff --git a/src/main/java/io/confluent/connect/elasticsearch/validator/ScriptValidator.java b/src/main/java/io/confluent/connect/elasticsearch/validator/ScriptValidator.java
new file mode 100644
index 000000000..760beab4b
--- /dev/null
+++ b/src/main/java/io/confluent/connect/elasticsearch/validator/ScriptValidator.java
+/*
+ * Copyright 2018 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.connect.elasticsearch.validator;
+
+import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.WRITE_METHOD_CONFIG;
+import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.WriteMethod.SCRIPTED_UPSERT;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import io.confluent.connect.elasticsearch.util.ScriptParser;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.elasticsearch.script.Script;
+
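+/**
+ * Validates that {@code upsert.script} parses as an Elasticsearch script. Also acts
+ * as a {@link ConfigDef.Recommender} so the config is only visible when
+ * {@code write.method} is {@code SCRIPTED_UPSERT}.
+ */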
+public class ScriptValidator implements ConfigDef.Validator, ConfigDef.Recommender {
+
+  @Override
+  public void ensureValid(String name, Object value) {
+
+    if (value == null) {
+      return;
+    }
+
+    String script = (String) value;
+
+    try {
+      Script parsedScript = ScriptParser.parseScript(script);
+
+      if (parsedScript.getIdOrCode() == null) {
+        throw new ConfigException(name, script, "The specified script is missing code");
+      } else if (parsedScript.getLang() == null) {
+        throw new ConfigException(name, script, "The specified script is missing lang");
+      }
+
+    } catch (JsonProcessingException jsonProcessingException) {
+      throw new ConfigException(
+          name, script, "The specified script is not valid JSON for an Elasticsearch script");
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "A valid script that is able to be parsed";
+  }
+
+  @Override
+  public List<Object> validValues(String name, Map<String, Object> parsedConfig) {
+    // A script has no fixed set of recommended values; visibility is handled in visible().
+    return new ArrayList<>();
+  }
+
+  @Override
+  public boolean visible(String name, Map<String, Object> parsedConfig) {
+    return parsedConfig.get(WRITE_METHOD_CONFIG).equals(SCRIPTED_UPSERT.name());
+  }
+}
diff --git a/src/test/java/io/confluent/connect/elasticsearch/DataConverterTest.java b/src/test/java/io/confluent/connect/elasticsearch/DataConverterTest.java
index 03e1b092b..3a6c85e02 100644
--- a/src/test/java/io/confluent/connect/elasticsearch/DataConverterTest.java
+++ b/src/test/java/io/confluent/connect/elasticsearch/DataConverterTest.java
@@ -22,6 +22,7 @@
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.index.VersionType;
 import org.junit.Before;
 import org.junit.Test;
@@ -39,8 +40,10 @@
 import static io.confluent.connect.elasticsearch.DataConverter.TIMESTAMP_FIELD;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class DataConverterTest {
@@ -505,6 +508,58 @@ public void testInjectPayloadTimestampEvenIfAlreadyExistsAndTimestampMapNotSet() {
     assertEquals(recordTimestamp, actualRecord.sourceAsMap().get(TIMESTAMP_FIELD));
   }
 
+  @Test
+  public void upsertScript() {
+    props.put(ElasticsearchSinkConnectorConfig.COMPACT_MAP_ENTRIES_CONFIG, "true");
+    props.put(ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG, "false");
+    props.put(ElasticsearchSinkConnectorConfig.IGNORE_SCHEMA_CONFIG, "false");
+    props.put(ElasticsearchSinkConnectorConfig.WRITE_METHOD_CONFIG,
+        ElasticsearchSinkConnectorConfig.WriteMethod.SCRIPTED_UPSERT.name());
+    props.put(
+        ElasticsearchSinkConnectorConfig.UPSERT_SCRIPT_CONFIG,
+        "{\"lang\":\"painless\",\"source\":\"if ( ctx.op == 'create' ) { ctx._source.counter = params.count } else { ctx._source.counter += params.count }\",\"params\":{\"count\":4}}");
+    props.put(ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG, BehaviorOnNullValues.FAIL.name());
+    converter = new DataConverter(new ElasticsearchSinkConnectorConfig(props));
+    Schema preProcessedSchema = converter.preProcessSchema(schema);
+    Struct struct = new Struct(preProcessedSchema).put("string", "myValue");
+    SinkRecord sinkRecord = createSinkRecordWithValue(struct);
+
+    UpdateRequest actualRecord = (UpdateRequest) converter.convertRecord(sinkRecord, index);
+
+    assertNotNull(actualRecord.script());
+    assertEquals("if ( ctx.op == 'create' ) { ctx._source.counter = params.count } else { ctx._source.counter += params.count }", actualRecord.script().getIdOrCode());
+    assertEquals("painless", actualRecord.script().getLang());
+    assertEquals(4, actualRecord.script().getParams().getOrDefault("count", 0));
+    assertTrue(actualRecord.scriptedUpsert());
+    assertNotNull(actualRecord.doc());
+  }
+
+  @Test
+  public void upsertScriptWithParamPayload() {
+
+    props.put(ElasticsearchSinkConnectorConfig.COMPACT_MAP_ENTRIES_CONFIG, "true");
+    props.put(ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG, "false");
+    props.put(ElasticsearchSinkConnectorConfig.WRITE_METHOD_CONFIG, ElasticsearchSinkConnectorConfig.WriteMethod.SCRIPTED_UPSERT.name());
+    props.put(ElasticsearchSinkConnectorConfig.PAYLOAD_AS_PARAMS_CONFIG, "true");
+    props.put(
+        ElasticsearchSinkConnectorConfig.UPSERT_SCRIPT_CONFIG,
+        "{\"lang\":\"painless\",\"source\":\"def paramAnswerList = params['answers']; def paramAnswerMap = new HashMap(); for (int i = 0; i < paramAnswerList.length; i++) { def answer = paramAnswerList[i]; paramAnswerMap[answer.questionId] = answer;} if (ctx._source.answers == null) { ctx._source.answers = [];} for (int i = 0; i < ctx._source.answers.length; i++) { if (paramAnswerMap.get(ctx._source.answers[i].questionId) != null) { ctx._source.answers[i].putAll(paramAnswerMap.get(ctx._source.answers[i].questionId)); } }\"}");
+    props.put(ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG, BehaviorOnNullValues.FAIL.name());
+    props.put(ElasticsearchSinkConnectorConfig.IGNORE_SCHEMA_CONFIG, "true");
+    converter = new DataConverter(new ElasticsearchSinkConnectorConfig(props));
+
+    Schema preProcessedSchema = converter.preProcessSchema(schema);
+    Struct struct = new Struct(preProcessedSchema).put("string", "myValue");
+    SinkRecord sinkRecord = createSinkRecordWithValue(struct);
+
+    UpdateRequest actualRecord = (UpdateRequest) converter.convertRecord(sinkRecord, index);
+
+    Map<String, Object> recordParams = actualRecord.script().getParams();
+
+    assertEquals("myValue", recordParams.get("string"));
+
+  }
+
   @Test
   public void testMapPayloadTimestampIfDataStreamSetAndOneTimestampMapSet() {
     String timestampFieldMap = "onefield";
diff --git a/src/test/java/io/confluent/connect/elasticsearch/ValidatorTest.java b/src/test/java/io/confluent/connect/elasticsearch/ValidatorTest.java
index 3dba9ec8a..8b80e077b 100644
--- a/src/test/java/io/confluent/connect/elasticsearch/ValidatorTest.java
+++ b/src/test/java/io/confluent/connect/elasticsearch/ValidatorTest.java
@@ -39,6 +39,7 @@
 import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.SECURITY_PROTOCOL_CONFIG;
 import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.SSL_CONFIG_PREFIX;
 import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.KERBEROS_PRINCIPAL_CONFIG;
+import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.UPSERT_SCRIPT_CONFIG;
 import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.WRITE_METHOD_CONFIG;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -53,6 +54,9 @@
 import java.nio.file.Path;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.stream.Collectors;
+
+import io.confluent.connect.elasticsearch.validator.ScriptValidator;
 import org.apache.kafka.common.config.Config;
 import org.apache.kafka.common.config.ConfigValue;
 import org.apache.kafka.common.config.SslConfigs;
@@ -148,6 +152,44 @@ public void testValidCredentials() {
     assertNoErrors(result);
   }
 
+  @Test
+  public void testValidScriptedUpsert() {
+    props.put(BEHAVIOR_ON_NULL_VALUES_CONFIG, "delete");
+    props.put(ElasticsearchSinkConnectorConfig.COMPACT_MAP_ENTRIES_CONFIG, "true");
+    props.put(ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG, "false");
+    props.put(ElasticsearchSinkConnectorConfig.IGNORE_SCHEMA_CONFIG, "false");
+    props.put(ElasticsearchSinkConnectorConfig.WRITE_METHOD_CONFIG, ElasticsearchSinkConnectorConfig.WriteMethod.SCRIPTED_UPSERT.name());
+    props.put(
+        UPSERT_SCRIPT_CONFIG,
+        "{\"lang\":\"painless\",\"source\":\"def paramAnswerList = params['answers']; def paramAnswerMap = new HashMap(); for (int i = 0; i < paramAnswerList.length; i++) { def answer = paramAnswerList[i]; paramAnswerMap[answer.questionId] = answer;} if (ctx._source.answers == null) { ctx._source.answers = [];} for (int i = 0; i < ctx._source.answers.length; i++) { if (paramAnswerMap.get(ctx._source.answers[i].questionId) != null) { ctx._source.answers[i].putAll(paramAnswerMap.get(ctx._source.answers[i].questionId)); } }\"}");
+    validator = new Validator(props, () -> mockClient);
+    Config result = validator.validate();
+
+    ScriptValidator scriptValidator = new ScriptValidator();
+
+    scriptValidator.ensureValid("script", props.get(UPSERT_SCRIPT_CONFIG));
+
+    Map<String, ConfigValue> map = result.configValues().stream().collect(Collectors.toMap(x -> x.name(), x -> x));
+
+    assertNoErrors(result);
+    assertTrue(map.get(UPSERT_SCRIPT_CONFIG).visible());
+  }
+
+  @Test
+  public void testInvalidScriptedUpsert() {
+    props.put(BEHAVIOR_ON_NULL_VALUES_CONFIG, "delete");
+    props.put(WRITE_METHOD_CONFIG, "upsert");
+    props.put(
+        UPSERT_SCRIPT_CONFIG,
+        "{\"lang\":\"painless\",\"source\":\"if ( ctx.op == 'create' ) { ctx._source.counter = params.count } else { ctx._source.counter += params.count }\",\"params\":{\"count\":4}}");
+    validator = new Validator(props, () -> mockClient);
+    Config result = validator.validate();
+    Map<String, ConfigValue> map = result.configValues().stream().collect(Collectors.toMap(x -> x.name(), x -> x));
+
+    assertNoErrors(result);
+    assertFalse(map.get(UPSERT_SCRIPT_CONFIG).visible());
+  }
+
   @Test
   public void testInvalidMissingOneDataStreamConfig() {
     props.put(DATA_STREAM_DATASET_CONFIG, "a_valid_dataset");