Commit
Initial Changes
NicholasDCole committed Mar 8, 2024
1 parent f0c057f commit 3f14edb
Showing 10 changed files with 416 additions and 189 deletions.
148 changes: 73 additions & 75 deletions src/main/java/io/confluent/connect/elasticsearch/DataConverter.java
@@ -20,6 +14 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BehaviorOnNullValues;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
@@ -47,40 +55,29 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DataConverter {

private static final Logger log = LoggerFactory.getLogger(DataConverter.class);

private static final Converter JSON_CONVERTER;
private static final HeaderConverter HEADER_CONVERTER = new SimpleHeaderConverter();
protected static final String MAP_KEY = "key";
protected static final String MAP_VALUE = "value";
protected static final String TIMESTAMP_FIELD = "@timestamp";

private ObjectMapper objectMapper;
private static final Logger log = LoggerFactory.getLogger(DataConverter.class);
private static final Converter JSON_CONVERTER;
private static final HeaderConverter HEADER_CONVERTER = new SimpleHeaderConverter();

static {
JSON_CONVERTER = new JsonConverter();
JSON_CONVERTER.configure(Collections.singletonMap("schemas.enable", "false"), false);
}

private final ElasticsearchSinkConnectorConfig config;
private final ObjectMapper objectMapper;

/**
* Create a DataConverter, specifying how map entries with string keys within record
* values should be written to JSON. Compact map entries are written as
* <code>"entryKey": "entryValue"</code>, while the non-compact form are written as a nested
* document such as <code>{"key": "entryKey", "value": "entryValue"}</code>. All map entries
* with non-string keys are always written as nested documents.
* Create a DataConverter, specifying how map entries with string keys within record values should
* be written to JSON. Compact map entries are written as <code>"entryKey": "entryValue"</code>,
* while the non-compact form are written as a nested document such as <code>
* {"key": "entryKey", "value": "entryValue"}</code>. All map entries with non-string keys are
* always written as nested documents.
*
* @param config connector config
*/
@@ -89,6 +86,12 @@ public DataConverter(ElasticsearchSinkConnectorConfig config) {
this.objectMapper = new ObjectMapper();
}

private static String recordString(SinkRecord record) {
return String.format(
"record from topic=%s partition=%s offset=%s",
record.topic(), record.kafkaPartition(), record.kafkaOffset());
}

private String convertKey(Schema keySchema, Object key) {
if (key == null) {
throw new DataException("Key is used as document id and can not be null.");
@@ -102,8 +105,7 @@ private String convertKey(Schema keySchema, Object key) {
schemaType = ConnectSchema.schemaType(key.getClass());
if (schemaType == null) {
throw new DataException(
"Java class " + key.getClass() + " does not have corresponding schema type."
);
"Java class " + key.getClass() + " does not have corresponding schema type.");
}
} else {
schemaType = keySchema.type();
@@ -138,8 +140,7 @@ public DocWriteRequest<?> convertRecord(SinkRecord record, String index) {
// index present in ES to delete anyways.
log.trace(
"Ignoring {} with null key, since the record key is used as the ID of the index",
recordString(record)
);
recordString(record));
return null;
}
// Will proceed as normal, ultimately creating a DeleteRequest
@@ -154,15 +155,15 @@ public DocWriteRequest<?> convertRecord(SinkRecord record, String index) {
recordString(record),
ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG,
BehaviorOnNullValues.FAIL,
BehaviorOnNullValues.IGNORE
)
);
BehaviorOnNullValues.IGNORE));
}
}

final String id = config.shouldIgnoreKey(record.topic())
? String.format("%s+%d+%d", record.topic(), record.kafkaPartition(), record.kafkaOffset())
: convertKey(record.keySchema(), record.key());
final String id =
config.shouldIgnoreKey(record.topic())
? String.format(
"%s+%d+%d", record.topic(), record.kafkaPartition(), record.kafkaOffset())
: convertKey(record.keySchema(), record.key());

// delete
if (record.value() == null) {
@@ -172,7 +173,6 @@ public DocWriteRequest<?> convertRecord(SinkRecord record, String index) {
String payload = getPayload(record);
payload = maybeAddTimestamp(payload, record.timestamp());

// index
switch (config.writeMethod()) {
case UPSERT:
return new UpdateRequest(index, id)
@@ -183,8 +183,14 @@ public DocWriteRequest<?> convertRecord(SinkRecord record, String index) {
OpType opType = config.isDataStream() ? OpType.CREATE : OpType.INDEX;
return maybeAddExternalVersioning(
new IndexRequest(index).id(id).source(payload, XContentType.JSON).opType(opType),
record
);
record);
case SCRIPTED_UPSERT:
return new UpdateRequest(index, id)
.doc(payload, XContentType.JSON)
.upsert(payload, XContentType.JSON)
.retryOnConflict(Math.min(config.maxInFlightRequests(), 5))
.script(config.script())
.scriptedUpsert(true);
default:
return null; // shouldn't happen
}
@@ -195,12 +201,14 @@ private String getPayload(SinkRecord record) {
return null;
}

Schema schema = config.shouldIgnoreSchema(record.topic())
? record.valueSchema()
: preProcessSchema(record.valueSchema());
Object value = config.shouldIgnoreSchema(record.topic())
? record.value()
: preProcessValue(record.value(), record.valueSchema(), schema);
Schema schema =
config.shouldIgnoreSchema(record.topic())
? record.valueSchema()
: preProcessSchema(record.valueSchema());
Object value =
config.shouldIgnoreSchema(record.topic())
? record.value()
: preProcessValue(record.value(), record.valueSchema(), schema);

byte[] rawJsonPayload = JSON_CONVERTER.fromConnectData(record.topic(), schema, value);
return new String(rawJsonPayload, StandardCharsets.UTF_8);
@@ -214,8 +222,10 @@ private String maybeAddTimestamp(String payload, Long timestamp) {
JsonNode jsonNode = objectMapper.readTree(payload);

if (!jsonNode.isObject()) {
throw new DataException("Top level payload contains data of Json type "
+ jsonNode.getNodeType() + ". Required Json object.");
throw new DataException(
"Top level payload contains data of Json type "
+ jsonNode.getNodeType()
+ ". Required Json object.");
}

if (!config.dataStreamTimestampField().isEmpty()) {
@@ -224,7 +234,8 @@ private String maybeAddTimestamp(String payload, Long timestamp) {
((ObjectNode) jsonNode).put(TIMESTAMP_FIELD, jsonNode.get(timestampField).asText());
return objectMapper.writeValueAsString(jsonNode);
} else {
log.debug("Timestamp field {} is not present in payload. This record may fail or "
log.debug(
"Timestamp field {} is not present in payload. This record may fail or "
+ "be skipped",
timestampField);
}
@@ -240,34 +251,29 @@ private String maybeAddTimestamp(String payload, Long timestamp) {
}

/**
* In many cases, we explicitly set the record version using the topic's offset.
* This version will, in turn, be checked by Elasticsearch and will throw a versioning
* error if the request represents an equivalent or older version of the record.
* In many cases, we explicitly set the record version using the topic's offset. This version
* will, in turn, be checked by Elasticsearch and will throw a versioning error if the request
* represents an equivalent or older version of the record.
*
* @param request the request currently being constructed for `record`
* @param record the record to be processed
* @return the (possibly modified) request which was passed in
*/
private DocWriteRequest<?> maybeAddExternalVersioning(
DocWriteRequest<?> request,
SinkRecord record
) {
DocWriteRequest<?> request, SinkRecord record) {
if (!config.isDataStream() && !config.shouldIgnoreKey(record.topic())) {
request.versionType(VersionType.EXTERNAL);
if (config.hasExternalVersionHeader()) {
final Header versionHeader = record.headers().lastWithName(config.externalVersionHeader());
final byte[] versionValue = HEADER_CONVERTER.fromConnectHeader(
record.topic(),
versionHeader.key(),
versionHeader.schema(),
versionHeader.value()
);
final byte[] versionValue =
HEADER_CONVERTER.fromConnectHeader(
record.topic(), versionHeader.key(), versionHeader.schema(), versionHeader.value());
try {
//fromConnectHeader byte output is UTF_8
// fromConnectHeader byte output is UTF_8
request.version(Long.parseLong(new String(versionValue, StandardCharsets.UTF_8)));
} catch (NumberFormatException e) {
throw new ConnectException("Error converting to long: "
+ new String(versionValue, StandardCharsets.UTF_8), e);
throw new ConnectException(
"Error converting to long: " + new String(versionValue, StandardCharsets.UTF_8), e);
}
} else {
request.version(record.kafkaOffset());
@@ -332,10 +338,12 @@ private Schema preProcessMapSchema(Schema schema) {
SchemaBuilder result = SchemaBuilder.map(preprocessedKeySchema, preprocessedValueSchema);
return copySchemaBasics(schema, result).build();
}
Schema elementSchema = SchemaBuilder.struct().name(keyName + "-" + valueName)
.field(MAP_KEY, preprocessedKeySchema)
.field(MAP_VALUE, preprocessedValueSchema)
.build();
Schema elementSchema =
SchemaBuilder.struct()
.name(keyName + "-" + valueName)
.field(MAP_KEY, preprocessedKeySchema)
.field(MAP_VALUE, preprocessedValueSchema)
.build();
return copySchemaBasics(schema, SchemaBuilder.array(elementSchema)).build();
}

@@ -419,7 +427,7 @@ private Object preProcessLogicalValue(String schemaName, Object value) {
private Object preProcessArrayValue(Object value, Schema schema, Schema newSchema) {
Collection<?> collection = (Collection<?>) value;
List<Object> result = new ArrayList<>();
for (Object element: collection) {
for (Object element : collection) {
result.add(preProcessValue(element, schema.valueSchema(), newSchema.valueSchema()));
}
return result;
@@ -432,16 +440,15 @@ private Object preProcessMapValue(Object value, Schema schema, Schema newSchema)
Map<?, ?> map = (Map<?, ?>) value;
if (config.useCompactMapEntries() && keySchema.type() == Schema.Type.STRING) {
Map<Object, Object> processedMap = new HashMap<>();
for (Map.Entry<?, ?> entry: map.entrySet()) {
for (Map.Entry<?, ?> entry : map.entrySet()) {
processedMap.put(
preProcessValue(entry.getKey(), keySchema, newSchema.keySchema()),
preProcessValue(entry.getValue(), valueSchema, newValueSchema)
);
preProcessValue(entry.getValue(), valueSchema, newValueSchema));
}
return processedMap;
}
List<Struct> mapStructs = new ArrayList<>();
for (Map.Entry<?, ?> entry: map.entrySet()) {
for (Map.Entry<?, ?> entry : map.entrySet()) {
Struct mapStruct = new Struct(newValueSchema);
Schema mapKeySchema = newValueSchema.field(MAP_KEY).schema();
Schema mapValueSchema = newValueSchema.field(MAP_VALUE).schema();
Expand All @@ -462,13 +469,4 @@ private Object preProcessStructValue(Object value, Schema schema, Schema newSche
}
return newStruct;
}

private static String recordString(SinkRecord record) {
return String.format(
"record from topic=%s partition=%s offset=%s",
record.topic(),
record.kafkaPartition(),
record.kafkaOffset()
);
}
}
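
Editor's sketch (not part of this commit): the constructor Javadoc in the diff above describes two JSON shapes for map entries. The snippet below illustrates the difference using only kafka-connect-api classes; the class name and the "size"/"large" values are made up for illustration, and the expected JSON in the comments follows from the Javadoc rather than from running the connector.

import java.util.Collections;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class MapEntryFormsSketch {
  public static void main(String[] args) {
    // A map value with string keys, e.g. a record value containing {"size": "large"}.
    Map<String, String> mapValue = Collections.singletonMap("size", "large");

    // Compact form (string keys and compact map entries enabled): the entry is written
    // directly into the document:
    //   {"size": "large"}

    // Non-compact form (compact disabled, or non-string keys): preProcessMapSchema turns
    // the map into an array of {key, value} structs, so the same entry is written as:
    //   [{"key": "size", "value": "large"}]
    Schema entrySchema =
        SchemaBuilder.struct()
            .field("key", Schema.STRING_SCHEMA)   // MAP_KEY
            .field("value", Schema.STRING_SCHEMA) // MAP_VALUE
            .build();
    Struct entry = new Struct(entrySchema).put("key", "size").put("value", "large");

    System.out.println(mapValue + " -> " + entry);
  }
}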
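
Editor's sketch (not part of this commit): the maybeAddExternalVersioning Javadoc above explains that, outside data streams and with key ignoring off, the record's Kafka offset becomes an external document version, so Elasticsearch rejects an equivalent or older replay. A minimal sketch of the request that path effectively produces; the index name, document id, payload, and offset are hypothetical, and the import paths assume the same Elasticsearch client generation already used in this file (they may differ in other client versions).

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;

public class ExternalVersioningSketch {
  public static void main(String[] args) {
    IndexRequest request =
        new IndexRequest("orders")                 // hypothetical index
            .id("order-123")                       // id derived from the record key via convertKey
            .source("{\"qty\":1}", XContentType.JSON)
            .versionType(VersionType.EXTERNAL)     // set by maybeAddExternalVersioning
            .version(42L);                         // record.kafkaOffset()

    // Replaying the same or an older offset (version <= 42) now fails with a version
    // conflict in Elasticsearch instead of silently overwriting newer data.
    System.out.println(request);
  }
}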