Skip to content

Commit

Permalink
preserve types for unknown objects
Browse files Browse the repository at this point in the history
  • Loading branch information
firestar committed Dec 30, 2023
1 parent 7dad05e commit c91f0c6
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public ConnectionUpdate(long version, Instant time, String changes, String uuid)
@JsonIgnore
public JsonPatch getChangesPatch() {
try {
return Serializer.getObjectMapper().getOm().readValue(changes, JsonPatch.class);
return Serializer.getObjectMapper().getOmNonType().readValue(changes, JsonPatch.class);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
Expand All @@ -51,7 +51,7 @@ public JsonPatch getChangesPatch() {
@JsonIgnore
public List<JsonOperations> getOperations() {
try {
return Serializer.getObjectMapper().getOm().readValue(changes, new TypeReference<List<JsonOperations>>(){});
return Serializer.getObjectMapper().getOmNonType().readValue(changes, new TypeReference<List<JsonOperations>>(){});
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,10 @@ public Update() {

}

public Update(String changeUUID, DataEntry entry, JsonPatch changes) {
public Update(String changeUUID, DataEntry entry, String changes) {
this.changeUUID = changeUUID;
this.key = entry.getKey();
try {
this.changes = Serializer.getObjectMapper().getOm().writeValueAsString(changes);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
this.changes = changes;
this.version = entry.getVersion();
this.time = Instant.now();
}
Expand All @@ -44,7 +40,7 @@ public void setKey(String key) {
@JsonIgnore
public JsonPatch getChangesPatch() {
try {
return Serializer.getObjectMapper().getOm().readValue(changes, JsonPatch.class);
return Serializer.getObjectMapper().getOmNonType().readValue(changes, JsonPatch.class);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
Expand All @@ -53,7 +49,7 @@ public JsonPatch getChangesPatch() {
@JsonIgnore
public List<JsonOperations> getOperations() {
try {
return Serializer.getObjectMapper().getOm().readValue(changes, new TypeReference<List<JsonOperations>>(){});
return Serializer.getObjectMapper().getOmNonType().readValue(changes, new TypeReference<List<JsonOperations>>(){});
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.github.fge.jsonpatch.JsonPatch;
import com.github.fge.jsonpatch.diff.JsonDiff;
import com.google.common.cache.Cache;
Expand Down Expand Up @@ -36,10 +37,12 @@
import org.apache.kafka.common.config.TopicConfig;

import java.beans.IntrospectionException;
import java.beans.PropertyDescriptor;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.LinkedList;
Expand Down Expand Up @@ -67,6 +70,7 @@ public class ConnectionHandler implements Serializable{
private transient Map<String, Set<Connection>> connections = new TreeMap<>();
@JsonIgnore
private transient Map<String, Set<Connection>> connectionsReverse = new TreeMap<>();
private transient Set<String> connectionFields = Arrays.stream(Connection.class.getDeclaredFields()).map(f -> f.getName()).collect(Collectors.toSet());
@JsonIgnore
private transient Map<String, Connection> connectionByUUID = new TreeMap<>();
private Map<Integer, Long> partitionOffsets = new TreeMap<>();
Expand Down Expand Up @@ -418,6 +422,31 @@ private boolean deleteInternal(Connection connection, String changeUUID) throws
return false;
}

JsonNode fromObject(Object o){
try {
System.out.println( Serializer.getObjectMapper().getOm().writeValueAsString(o));
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
try {
return Serializer.getObjectMapper().getOmNonType().readTree(
Serializer.getObjectMapper().getOm().writeValueAsString(o)
);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
Object fromJsonNode(JsonNode o, Class type){
try {
return Serializer.getObjectMapper().getOm().readValue(
Serializer.getObjectMapper().getOmNonType().writeValueAsString(o),
type
);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}

private boolean saveInternal(Connection connection, String changeUUID) {
if (!allConnections.contains(connection)) {
ConnectionCreate createEntry = new ConnectionCreate(changeUUID, connection);
Expand All @@ -427,7 +456,7 @@ private boolean saveInternal(Connection connection, String changeUUID) {
connection.versionIncrease();
List<JsonOperations> changes = null;
Connection oldConnection = connectionByUUID.get(connection.getUuid());
JsonPatch patch = JsonDiff.asJsonPatch(Serializer.getObjectMapper().getOm().valueToTree(oldConnection), Serializer.getObjectMapper().getOm().valueToTree(connection));
JsonPatch patch = JsonDiff.asJsonPatch(fromObject(oldConnection), fromObject(connection));
try {
String json = Serializer.getObjectMapper().getOm().writeValueAsString(patch);
changes = Serializer.getObjectMapper().getOm().readValue(json, List.class);
Expand All @@ -453,11 +482,11 @@ private boolean saveInternalSync(Connection connection, String changeUUID) throw
connection.versionIncrease();
List<JsonOperations> changes = null;
Connection oldConnection = connectionByUUID.get(connection.getUuid());
JsonPatch patch = JsonDiff.asJsonPatch(Serializer.getObjectMapper().getOm().valueToTree(oldConnection), Serializer.getObjectMapper().getOm().valueToTree(connection));
JsonPatch patch = JsonDiff.asJsonPatch(fromObject(oldConnection), fromObject(connection));
try {
String json = Serializer.getObjectMapper().getOm().writeValueAsString(patch);
String json = Serializer.getObjectMapper().getOmNonType().writeValueAsString(patch);
//Serializer.log(json);
changes = Serializer.getObjectMapper().getOm().readValue(json, List.class);
changes = Serializer.getObjectMapper().getOmNonType().readValue(json, List.class);
if (changes != null && changes.size() > 0) {
ConnectionUpdate updateEntry = new ConnectionUpdate(connection.getVersion(), json, changeUUID, connection.getUuid());
CountDownLatch countDownLatch = new CountDownLatch(1);
Expand Down Expand Up @@ -636,13 +665,29 @@ public void modify(Modification mod, Object modification) throws ExecutionExcept
}
} else {

Connection connectionTmp = (Connection) Utils.getOm().readValue(
u.getChangesPatch().apply(Utils.getOm().valueToTree(conn)).toString(),
config.getConnectionClass()
Connection connectionTmp = (Connection) fromJsonNode(
u.getChangesPatch().apply(fromObject(conn)),
config.getConnectionClass()
);
conn.setVersion(u.getVersion());
conn.setModified(u.getTime());
conn.setMetadata(connectionTmp.getMetadata());

Arrays.stream(config.getConnectionClass().getDeclaredFields()).map(f->f.getName()).filter(fName->!connectionFields.contains(fName)).forEach(
fName-> {
try {
PropertyDescriptor propertyDescriptor = new PropertyDescriptor(fName, config.getConnectionClass());
Object obj = propertyDescriptor.getReadMethod().invoke(connectionTmp);
propertyDescriptor.getWriteMethod().invoke(conn, obj);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
} catch (InvocationTargetException e) {
throw new RuntimeException(e);
} catch (IntrospectionException e) {
throw new RuntimeException(e);
}
}
);
this.changed = new Date().getTime();
consumerResponse(conn, u.getChangeUUID());
triggerEvent(u, conn);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,20 @@ public class DataEntry<T> implements Serializable, Comparable<DataEntry> {
private static final long serialVersionUID = 1;
public String key;
public long version = 0;
private JsonNode reference;
public T data;
private transient String tableName;
private Instant created;
private Instant modified;

public DataEntry(T obj) {
this.data = obj;
this.reference = Serializer.getObjectMapper().getOm().valueToTree(data);
this.key = UUID.randomUUID().toString();
this.created = Instant.now();
}

public DataEntry(Create create) throws ClassNotFoundException, JsonProcessingException {
this.data = (T) Serializer.getObjectMapper().getOm().readValue(create.getData(), Class.forName(create.getMasterClass()));
this.version = create.getVersion();
this.reference = Serializer.getObjectMapper().getOm().valueToTree(data);
this.key = create.getKey();
this.created = create.getTime();
}
Expand Down Expand Up @@ -82,18 +79,10 @@ public void setVersion(long version) {
this.version = version;
}

public JsonNode getReference() {
return reference;
}

public T getData() {
return data;
}

public void setReference(JsonNode reference) {
this.reference = reference;
}

public void setData(T data) {
this.data = data;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.github.fge.jsonpatch.diff.JsonDiff;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
Expand Down Expand Up @@ -509,6 +510,32 @@ private boolean saveInternalConsumer(DataEntry entry, Consumer<DataEntry> consum
return false;
}

// convert object to jsonnode without changing types
JsonNode fromObject(Object o){
try {
System.out.println( Serializer.getObjectMapper().getOm().writeValueAsString(o));
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
try {
return Serializer.getObjectMapper().getOmNonType().readTree(
Serializer.getObjectMapper().getOm().writeValueAsString(o)
);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
Object fromJsonNode(JsonNode o, Class type){
try {
return Serializer.getObjectMapper().getOm().readValue(
Serializer.getObjectMapper().getOmNonType().writeValueAsString(o),
type
);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}

private boolean saveInternal(DataEntry entry, String changeUUID) {
if (!entries.contains(entry)) {
try {
Expand All @@ -521,18 +548,29 @@ private boolean saveInternal(DataEntry entry, String changeUUID) {
} else {
entry.versionIncrease();
List<JsonOperations> changes = null;
JsonPatch patch = JsonDiff.asJsonPatch(entry.getReference(), Serializer.getObjectMapper().getOm().valueToTree(entry.getData()));
Set<DataEntry> dataEntrySet = get("id", entry.getKey());
if(dataEntrySet==null || dataEntrySet.isEmpty()){
try {
consumerResponse(null, changeUUID);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
return false;
}
DataEntry dataEntry = dataEntrySet.stream().findFirst().get();
JsonPatch patch = JsonDiff.asJsonPatch(fromObject(dataEntry.getData()), fromObject(entry.getData()));
try {
String json = Serializer.getObjectMapper().getOm().writeValueAsString(patch);
changes = Serializer.getObjectMapper().getOm().readValue(json, List.class);
String json = Serializer.getObjectMapper().getOmNonType().writeValueAsString(patch);
changes = Serializer.getObjectMapper().getOmNonType().readValue(json, List.class);

if (changes != null && changes.size() > 0) {
Update updateEntry = new Update(changeUUID, entry, json);
producer.push(updateEntry.getKey(), updateEntry.getVersion(), updateEntry, null);
return true;
}
} catch (JsonProcessingException e) {
e.printStackTrace();
}
if (changes != null && changes.size() > 0) {
Update updateEntry = new Update(changeUUID, entry, patch);
producer.push(updateEntry.getKey(), updateEntry.getVersion(), updateEntry, null);
return true;
}

}
return false;
Expand Down Expand Up @@ -710,11 +748,9 @@ public void modify(Modification mod, Object modification) {
modqueue.notifyAll();
}
} else {
de.setReference(u.getChangesPatch().apply(de.getReference()));
de.setData(fromJsonNode(u.getChangesPatch().apply(fromObject(de.getData())), de.getData().getClass()));
de.setVersion(u.getVersion());
de.setModified(u.getTime());
de.setData(Serializer.getObjectMapper().getOm().readValue(de.getReference().toString(), de.getData().getClass()));
//System.out.println(Serializer.getObjectMapper().getOm().writeValueAsString(de.getData()));
u.getOperations().forEach(op -> {
switch (op.getOp()) {
case "replace":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,16 @@ public class Serializer{
private static Serializer objectMapper = new Serializer();

ObjectMapper om;
ObjectMapper omNonType;

public Serializer() {
om = new ObjectMapper();
om.findAndRegisterModules();
om.enableDefaultTyping(ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.EVERYTHING);
om.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
omNonType = new ObjectMapper();
omNonType.findAndRegisterModules();
omNonType.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
}

public static void log(Object o){
Expand All @@ -27,6 +32,14 @@ public ObjectMapper getOm() {
return om;
}

public ObjectMapper getOmNonType() {
return omNonType;
}

public void setOmNonType(ObjectMapper omNonType) {
this.omNonType = omNonType;
}

public void setOm(ObjectMapper om) {
this.om = om;
}
Expand Down

0 comments on commit c91f0c6

Please sign in to comment.