From 546bc08072a7f9920c39d4f2b892f49e5ac21cf0 Mon Sep 17 00:00:00 2001 From: Nathaniel Date: Wed, 15 Nov 2023 12:24:08 -0700 Subject: [PATCH] export current state of database for importing into topics --- .gitignore | 2 +- .../src/main/groovy/nucleodb.library.gradle | 2 +- .../modifications/ConnectionCreate.java | 2 +- .../database/modifications/Create.java | 8 +++++++- .../tables/connection/ExportHandler.java | 20 ++++++++++++++++++- .../database/tables/table/ExportHandler.java | 19 +++++++++++++++++- .../library/kafkaLedger/ProducerHandler.java | 8 ++++---- 7 files changed, 51 insertions(+), 10 deletions(-) diff --git a/.gitignore b/.gitignore index 7364669..3670730 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,4 @@ build .idea NucleoCore.iml data/*.dat -export/*.json \ No newline at end of file +export/*.txt \ No newline at end of file diff --git a/buildSrc/src/main/groovy/nucleodb.library.gradle b/buildSrc/src/main/groovy/nucleodb.library.gradle index b3182ae..3333cde 100644 --- a/buildSrc/src/main/groovy/nucleodb.library.gradle +++ b/buildSrc/src/main/groovy/nucleodb.library.gradle @@ -3,7 +3,7 @@ plugins { } group = 'com.nucleodb' -version = '1.9.42' +version = '1.10.0' repositories { mavenCentral() diff --git a/library/src/main/java/com/nucleocore/library/database/modifications/ConnectionCreate.java b/library/src/main/java/com/nucleocore/library/database/modifications/ConnectionCreate.java index fcdc28b..70b9cf3 100644 --- a/library/src/main/java/com/nucleocore/library/database/modifications/ConnectionCreate.java +++ b/library/src/main/java/com/nucleocore/library/database/modifications/ConnectionCreate.java @@ -18,7 +18,7 @@ public ConnectionCreate() { public ConnectionCreate(Connection connection) { this.connection = connection; - this.time = Instant.now(); + this.time = connection.getDate(); } public ConnectionCreate(String changeUUID, Connection connection) { diff --git a/library/src/main/java/com/nucleocore/library/database/modifications/Create.java b/library/src/main/java/com/nucleocore/library/database/modifications/Create.java index 8ccfb20..655fdcb 100644 --- a/library/src/main/java/com/nucleocore/library/database/modifications/Create.java +++ b/library/src/main/java/com/nucleocore/library/database/modifications/Create.java @@ -18,7 +18,13 @@ public class Create extends Modify{ public Create() { } - + public Create(DataEntry entry) throws IOException{ + this.key = entry.getKey(); + this.masterClass = entry.getData().getClass().getName(); + this.data = Serializer.getObjectMapper().getOm().writeValueAsString(entry.getData()); + this.version = entry.getVersion(); + this.time = entry.getCreated(); + } public Create(String changeUUID, DataEntry entry) throws IOException{ this.changeUUID = changeUUID; this.key = entry.getKey(); diff --git a/library/src/main/java/com/nucleocore/library/database/tables/connection/ExportHandler.java b/library/src/main/java/com/nucleocore/library/database/tables/connection/ExportHandler.java index 135e603..4d94091 100644 --- a/library/src/main/java/com/nucleocore/library/database/tables/connection/ExportHandler.java +++ b/library/src/main/java/com/nucleocore/library/database/tables/connection/ExportHandler.java @@ -1,9 +1,17 @@ package com.nucleocore.library.database.tables.connection; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.nucleocore.library.database.modifications.ConnectionCreate; +import com.nucleocore.library.database.modifications.Create; import com.nucleocore.library.database.tables.table.DataTable; import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.stream.Collectors; public class ExportHandler implements Runnable{ static ObjectMapper om = new ObjectMapper().findAndRegisterModules(); @@ -21,7 +29,17 @@ public void run() { try { if (this.connectionHandler.getChanged() > changedSaved) { //System.out.println("Saved connections"); - om.writeValue(new File("./export/connections.json"), this.connectionHandler.getAllConnections()); + OutputStream os = new FileOutputStream("./export/connections.txt", false); + this.connectionHandler.getAllConnections().stream().collect(Collectors.toSet()).stream().forEach(de->{ + try { + os.write((ConnectionCreate.class.getSimpleName() + om.writeValueAsString(new ConnectionCreate(de))+"\n").getBytes(StandardCharsets.UTF_8)); + } catch (JsonProcessingException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } + }); + os.close(); changedSaved = this.connectionHandler.getChanged(); } Thread.sleep(5000); diff --git a/library/src/main/java/com/nucleocore/library/database/tables/table/ExportHandler.java b/library/src/main/java/com/nucleocore/library/database/tables/table/ExportHandler.java index bf3e3a1..2e4b208 100644 --- a/library/src/main/java/com/nucleocore/library/database/tables/table/ExportHandler.java +++ b/library/src/main/java/com/nucleocore/library/database/tables/table/ExportHandler.java @@ -1,8 +1,15 @@ package com.nucleocore.library.database.tables.table; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.nucleocore.library.database.modifications.Create; import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.stream.Collectors; public class ExportHandler implements Runnable{ DataTable dataTable; @@ -19,7 +26,17 @@ public void run() { try { if (this.dataTable.getChanged() > changedSaved) { //System.out.println("Saved " + this.dataTable.getConfig().getTable()); - om.writeValue(new File("./export/"+this.dataTable.getConfig().getTable()+".json"), this.dataTable.getEntries()); + OutputStream os = new FileOutputStream("./export/"+this.dataTable.getConfig().getTable()+".txt", false); + this.dataTable.getEntries().stream().collect(Collectors.toSet()).stream().forEach(de->{ + try { + os.write((Create.class.getSimpleName() + om.writeValueAsString(new Create(de))+"\n").getBytes(StandardCharsets.UTF_8)); + } catch (JsonProcessingException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } + }); + os.close(); changedSaved = this.dataTable.getChanged(); } Thread.sleep(5000); diff --git a/library/src/main/java/com/nucleocore/library/kafkaLedger/ProducerHandler.java b/library/src/main/java/com/nucleocore/library/kafkaLedger/ProducerHandler.java index d45bafb..72f8012 100644 --- a/library/src/main/java/com/nucleocore/library/kafkaLedger/ProducerHandler.java +++ b/library/src/main/java/com/nucleocore/library/kafkaLedger/ProducerHandler.java @@ -31,9 +31,9 @@ private KafkaProducer createProducer(String bootstrap, String groupId) { props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.RETRIES_CONFIG, 25); - props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 1500); - props.put(ProducerConfig.LINGER_MS_CONFIG, 500); - props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 500); +// props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 1500); +// props.put(ProducerConfig.LINGER_MS_CONFIG, 200); +// props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 200); return new KafkaProducer(props); } public KafkaProducer getProducer() { @@ -49,7 +49,7 @@ public void push(Modify modify, Callback callback){ ); Future data = getProducer().send(record); while(!data.isDone() && !data.isCancelled()){ - Thread.sleep(30); + Thread.sleep(1L); } //logger.info("produced"); } catch (Exception e) {