diff --git a/src/main/java/com/nucleodb/library/database/modifications/ConnectionUpdate.java b/src/main/java/com/nucleodb/library/database/modifications/ConnectionUpdate.java index c2571d7..415e4de 100644 --- a/src/main/java/com/nucleodb/library/database/modifications/ConnectionUpdate.java +++ b/src/main/java/com/nucleodb/library/database/modifications/ConnectionUpdate.java @@ -42,7 +42,7 @@ public ConnectionUpdate(long version, Instant time, String changes, String uuid) @JsonIgnore public JsonPatch getChangesPatch() { try { - return Serializer.getObjectMapper().getOm().readValue(changes, JsonPatch.class); + return Serializer.getObjectMapper().getOmNonType().readValue(changes, JsonPatch.class); } catch (JsonProcessingException e) { throw new RuntimeException(e); } @@ -51,7 +51,7 @@ public JsonPatch getChangesPatch() { @JsonIgnore public List getOperations() { try { - return Serializer.getObjectMapper().getOm().readValue(changes, new TypeReference>(){}); + return Serializer.getObjectMapper().getOmNonType().readValue(changes, new TypeReference>(){}); } catch (JsonProcessingException e) { throw new RuntimeException(e); } diff --git a/src/main/java/com/nucleodb/library/database/modifications/Update.java b/src/main/java/com/nucleodb/library/database/modifications/Update.java index 38b3043..309b5ae 100644 --- a/src/main/java/com/nucleodb/library/database/modifications/Update.java +++ b/src/main/java/com/nucleodb/library/database/modifications/Update.java @@ -22,14 +22,10 @@ public Update() { } - public Update(String changeUUID, DataEntry entry, JsonPatch changes) { + public Update(String changeUUID, DataEntry entry, String changes) { this.changeUUID = changeUUID; this.key = entry.getKey(); - try { - this.changes = Serializer.getObjectMapper().getOm().writeValueAsString(changes); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } + this.changes = changes; this.version = entry.getVersion(); this.time = Instant.now(); } @@ -44,7 +40,7 @@ public void setKey(String key) { @JsonIgnore public JsonPatch getChangesPatch() { try { - return Serializer.getObjectMapper().getOm().readValue(changes, JsonPatch.class); + return Serializer.getObjectMapper().getOmNonType().readValue(changes, JsonPatch.class); } catch (JsonProcessingException e) { throw new RuntimeException(e); } @@ -53,7 +49,7 @@ public JsonPatch getChangesPatch() { @JsonIgnore public List getOperations() { try { - return Serializer.getObjectMapper().getOm().readValue(changes, new TypeReference>(){}); + return Serializer.getObjectMapper().getOmNonType().readValue(changes, new TypeReference>(){}); } catch (JsonProcessingException e) { throw new RuntimeException(e); } diff --git a/src/main/java/com/nucleodb/library/database/tables/connection/ConnectionHandler.java b/src/main/java/com/nucleodb/library/database/tables/connection/ConnectionHandler.java index b78fe1e..95b8f01 100644 --- a/src/main/java/com/nucleodb/library/database/tables/connection/ConnectionHandler.java +++ b/src/main/java/com/nucleodb/library/database/tables/connection/ConnectionHandler.java @@ -2,6 +2,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; import com.github.fge.jsonpatch.JsonPatch; import com.github.fge.jsonpatch.diff.JsonDiff; import com.google.common.cache.Cache; @@ -36,10 +37,12 @@ import org.apache.kafka.common.config.TopicConfig; import java.beans.IntrospectionException; +import java.beans.PropertyDescriptor; import java.io.File; import java.io.IOException; import java.io.Serializable; import java.lang.reflect.InvocationTargetException; +import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.LinkedList; @@ -67,6 +70,7 @@ public class ConnectionHandler implements Serializable{ private transient Map> connections = new TreeMap<>(); @JsonIgnore private transient Map> connectionsReverse = new TreeMap<>(); + private transient Set connectionFields = Arrays.stream(Connection.class.getDeclaredFields()).map(f -> f.getName()).collect(Collectors.toSet()); @JsonIgnore private transient Map connectionByUUID = new TreeMap<>(); private Map partitionOffsets = new TreeMap<>(); @@ -418,6 +422,31 @@ private boolean deleteInternal(Connection connection, String changeUUID) throws return false; } + JsonNode fromObject(Object o){ + try { + System.out.println( Serializer.getObjectMapper().getOm().writeValueAsString(o)); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + try { + return Serializer.getObjectMapper().getOmNonType().readTree( + Serializer.getObjectMapper().getOm().writeValueAsString(o) + ); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + Object fromJsonNode(JsonNode o, Class type){ + try { + return Serializer.getObjectMapper().getOm().readValue( + Serializer.getObjectMapper().getOmNonType().writeValueAsString(o), + type + ); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + private boolean saveInternal(Connection connection, String changeUUID) { if (!allConnections.contains(connection)) { ConnectionCreate createEntry = new ConnectionCreate(changeUUID, connection); @@ -427,7 +456,7 @@ private boolean saveInternal(Connection connection, String changeUUID) { connection.versionIncrease(); List changes = null; Connection oldConnection = connectionByUUID.get(connection.getUuid()); - JsonPatch patch = JsonDiff.asJsonPatch(Serializer.getObjectMapper().getOm().valueToTree(oldConnection), Serializer.getObjectMapper().getOm().valueToTree(connection)); + JsonPatch patch = JsonDiff.asJsonPatch(fromObject(oldConnection), fromObject(connection)); try { String json = Serializer.getObjectMapper().getOm().writeValueAsString(patch); changes = Serializer.getObjectMapper().getOm().readValue(json, List.class); @@ -453,11 +482,11 @@ private boolean saveInternalSync(Connection connection, String changeUUID) throw connection.versionIncrease(); List changes = null; Connection oldConnection = connectionByUUID.get(connection.getUuid()); - JsonPatch patch = JsonDiff.asJsonPatch(Serializer.getObjectMapper().getOm().valueToTree(oldConnection), Serializer.getObjectMapper().getOm().valueToTree(connection)); + JsonPatch patch = JsonDiff.asJsonPatch(fromObject(oldConnection), fromObject(connection)); try { - String json = Serializer.getObjectMapper().getOm().writeValueAsString(patch); + String json = Serializer.getObjectMapper().getOmNonType().writeValueAsString(patch); //Serializer.log(json); - changes = Serializer.getObjectMapper().getOm().readValue(json, List.class); + changes = Serializer.getObjectMapper().getOmNonType().readValue(json, List.class); if (changes != null && changes.size() > 0) { ConnectionUpdate updateEntry = new ConnectionUpdate(connection.getVersion(), json, changeUUID, connection.getUuid()); CountDownLatch countDownLatch = new CountDownLatch(1); @@ -636,13 +665,29 @@ public void modify(Modification mod, Object modification) throws ExecutionExcept } } else { - Connection connectionTmp = (Connection) Utils.getOm().readValue( - u.getChangesPatch().apply(Utils.getOm().valueToTree(conn)).toString(), - config.getConnectionClass() + Connection connectionTmp = (Connection) fromJsonNode( + u.getChangesPatch().apply(fromObject(conn)), + config.getConnectionClass() ); conn.setVersion(u.getVersion()); conn.setModified(u.getTime()); conn.setMetadata(connectionTmp.getMetadata()); + + Arrays.stream(config.getConnectionClass().getDeclaredFields()).map(f->f.getName()).filter(fName->!connectionFields.contains(fName)).forEach( + fName-> { + try { + PropertyDescriptor propertyDescriptor = new PropertyDescriptor(fName, config.getConnectionClass()); + Object obj = propertyDescriptor.getReadMethod().invoke(connectionTmp); + propertyDescriptor.getWriteMethod().invoke(conn, obj); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } catch (InvocationTargetException e) { + throw new RuntimeException(e); + } catch (IntrospectionException e) { + throw new RuntimeException(e); + } + } + ); this.changed = new Date().getTime(); consumerResponse(conn, u.getChangeUUID()); triggerEvent(u, conn); diff --git a/src/main/java/com/nucleodb/library/database/tables/table/DataEntry.java b/src/main/java/com/nucleodb/library/database/tables/table/DataEntry.java index 4a625d1..eed0619 100644 --- a/src/main/java/com/nucleodb/library/database/tables/table/DataEntry.java +++ b/src/main/java/com/nucleodb/library/database/tables/table/DataEntry.java @@ -19,7 +19,6 @@ public class DataEntry implements Serializable, Comparable { private static final long serialVersionUID = 1; public String key; public long version = 0; - private JsonNode reference; public T data; private transient String tableName; private Instant created; @@ -27,7 +26,6 @@ public class DataEntry implements Serializable, Comparable { public DataEntry(T obj) { this.data = obj; - this.reference = Serializer.getObjectMapper().getOm().valueToTree(data); this.key = UUID.randomUUID().toString(); this.created = Instant.now(); } @@ -35,7 +33,6 @@ public DataEntry(T obj) { public DataEntry(Create create) throws ClassNotFoundException, JsonProcessingException { this.data = (T) Serializer.getObjectMapper().getOm().readValue(create.getData(), Class.forName(create.getMasterClass())); this.version = create.getVersion(); - this.reference = Serializer.getObjectMapper().getOm().valueToTree(data); this.key = create.getKey(); this.created = create.getTime(); } @@ -82,18 +79,10 @@ public void setVersion(long version) { this.version = version; } - public JsonNode getReference() { - return reference; - } - public T getData() { return data; } - public void setReference(JsonNode reference) { - this.reference = reference; - } - public void setData(T data) { this.data = data; } diff --git a/src/main/java/com/nucleodb/library/database/tables/table/DataTable.java b/src/main/java/com/nucleodb/library/database/tables/table/DataTable.java index 2f2a8c3..99386ca 100644 --- a/src/main/java/com/nucleodb/library/database/tables/table/DataTable.java +++ b/src/main/java/com/nucleodb/library/database/tables/table/DataTable.java @@ -3,6 +3,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; import com.github.fge.jsonpatch.diff.JsonDiff; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; @@ -509,6 +510,32 @@ private boolean saveInternalConsumer(DataEntry entry, Consumer consum return false; } + // convert object to jsonnode without changing types + JsonNode fromObject(Object o){ + try { + System.out.println( Serializer.getObjectMapper().getOm().writeValueAsString(o)); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + try { + return Serializer.getObjectMapper().getOmNonType().readTree( + Serializer.getObjectMapper().getOm().writeValueAsString(o) + ); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + Object fromJsonNode(JsonNode o, Class type){ + try { + return Serializer.getObjectMapper().getOm().readValue( + Serializer.getObjectMapper().getOmNonType().writeValueAsString(o), + type + ); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + private boolean saveInternal(DataEntry entry, String changeUUID) { if (!entries.contains(entry)) { try { @@ -521,18 +548,29 @@ private boolean saveInternal(DataEntry entry, String changeUUID) { } else { entry.versionIncrease(); List changes = null; - JsonPatch patch = JsonDiff.asJsonPatch(entry.getReference(), Serializer.getObjectMapper().getOm().valueToTree(entry.getData())); + Set dataEntrySet = get("id", entry.getKey()); + if(dataEntrySet==null || dataEntrySet.isEmpty()){ + try { + consumerResponse(null, changeUUID); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + return false; + } + DataEntry dataEntry = dataEntrySet.stream().findFirst().get(); + JsonPatch patch = JsonDiff.asJsonPatch(fromObject(dataEntry.getData()), fromObject(entry.getData())); try { - String json = Serializer.getObjectMapper().getOm().writeValueAsString(patch); - changes = Serializer.getObjectMapper().getOm().readValue(json, List.class); + String json = Serializer.getObjectMapper().getOmNonType().writeValueAsString(patch); + changes = Serializer.getObjectMapper().getOmNonType().readValue(json, List.class); + + if (changes != null && changes.size() > 0) { + Update updateEntry = new Update(changeUUID, entry, json); + producer.push(updateEntry.getKey(), updateEntry.getVersion(), updateEntry, null); + return true; + } } catch (JsonProcessingException e) { e.printStackTrace(); } - if (changes != null && changes.size() > 0) { - Update updateEntry = new Update(changeUUID, entry, patch); - producer.push(updateEntry.getKey(), updateEntry.getVersion(), updateEntry, null); - return true; - } } return false; @@ -710,11 +748,9 @@ public void modify(Modification mod, Object modification) { modqueue.notifyAll(); } } else { - de.setReference(u.getChangesPatch().apply(de.getReference())); + de.setData(fromJsonNode(u.getChangesPatch().apply(fromObject(de.getData())), de.getData().getClass())); de.setVersion(u.getVersion()); de.setModified(u.getTime()); - de.setData(Serializer.getObjectMapper().getOm().readValue(de.getReference().toString(), de.getData().getClass())); - //System.out.println(Serializer.getObjectMapper().getOm().writeValueAsString(de.getData())); u.getOperations().forEach(op -> { switch (op.getOp()) { case "replace": diff --git a/src/main/java/com/nucleodb/library/database/utils/Serializer.java b/src/main/java/com/nucleodb/library/database/utils/Serializer.java index 346ba8a..3c5bf3d 100644 --- a/src/main/java/com/nucleodb/library/database/utils/Serializer.java +++ b/src/main/java/com/nucleodb/library/database/utils/Serializer.java @@ -8,11 +8,16 @@ public class Serializer{ private static Serializer objectMapper = new Serializer(); ObjectMapper om; + ObjectMapper omNonType; + public Serializer() { om = new ObjectMapper(); om.findAndRegisterModules(); - om.enableDefaultTyping(ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT); + om.enableDefaultTyping(ObjectMapper.DefaultTyping.EVERYTHING); om.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + omNonType = new ObjectMapper(); + omNonType.findAndRegisterModules(); + omNonType.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); } public static void log(Object o){ @@ -27,6 +32,14 @@ public ObjectMapper getOm() { return om; } + public ObjectMapper getOmNonType() { + return omNonType; + } + + public void setOmNonType(ObjectMapper omNonType) { + this.omNonType = omNonType; + } + public void setOm(ObjectMapper om) { this.om = om; }