From 9712fd61d02b760233b58e81c50b4b318499e189 Mon Sep 17 00:00:00 2001 From: Nathaniel Davidson Date: Sun, 7 Jul 2024 14:05:05 -0700 Subject: [PATCH] cleanup logs, add unit tests for connections, properly load file for saved state --- build.gradle | 2 +- .../java/com/nucleodb/library/ExportTest.java | 122 +++++++++++---- .../com/nucleodb/library/models/Book.java | 30 ++++ .../com/nucleodb/library/models/BookDE.java | 22 +++ .../library/models/WroteConnection.java | 21 +++ .../java/com/nucleodb/library/NucleoDB.java | 1 + .../library/database/lock/LockManager.java | 4 +- .../tables/connection/Connection.java | 4 +- .../tables/connection/ConnectionHandler.java | 139 ++++++++++-------- .../connection/ConnectionProjection.java | 32 ++-- .../tables/connection/ExportHandler.java | 5 +- .../tables/connection/SaveHandler.java | 34 +++-- .../database/tables/table/DataEntry.java | 7 +- .../tables/table/DataEntryProjection.java | 9 +- .../database/tables/table/ExportHandler.java | 5 +- .../tables/table/ModQueueHandler.java | 2 - .../database/tables/table/SaveHandler.java | 6 +- .../exceptions/ObjectNotSavedException.java | 9 ++ .../database/utils/sql/SQLHandler.java | 1 + .../mqs/kafka/KafkaConsumerHandler.java | 75 +++++----- .../nucleodb/library/utils/Serializer.java | 4 +- .../java/com/nucleodb/library/CoreTest.java | 9 +- 22 files changed, 364 insertions(+), 179 deletions(-) create mode 100644 src/integrationTest/java/com/nucleodb/library/models/Book.java create mode 100644 src/integrationTest/java/com/nucleodb/library/models/BookDE.java create mode 100644 src/integrationTest/java/com/nucleodb/library/models/WroteConnection.java create mode 100644 src/main/java/com/nucleodb/library/database/utils/exceptions/ObjectNotSavedException.java diff --git a/build.gradle b/build.gradle index d410cee..1b231ab 100644 --- a/build.gradle +++ b/build.gradle @@ -7,7 +7,7 @@ plugins { } group = 'com.nucleodb' -version = '1.17.1' +version = '1.17.2' repositories { mavenCentral() diff --git a/src/integrationTest/java/com/nucleodb/library/ExportTest.java b/src/integrationTest/java/com/nucleodb/library/ExportTest.java index 380b197..688418f 100644 --- a/src/integrationTest/java/com/nucleodb/library/ExportTest.java +++ b/src/integrationTest/java/com/nucleodb/library/ExportTest.java @@ -2,15 +2,17 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonMappingException; +import com.nucleodb.library.database.tables.connection.ConnectionHandler; import com.nucleodb.library.database.tables.table.DataEntry; import com.nucleodb.library.database.tables.table.DataEntryProjection; import com.nucleodb.library.database.tables.table.DataTable; +import com.nucleodb.library.database.utils.InvalidConnectionException; import com.nucleodb.library.database.utils.Serializer; import com.nucleodb.library.database.utils.exceptions.IncorrectDataEntryClassException; import com.nucleodb.library.database.utils.exceptions.IncorrectDataEntryObjectException; import com.nucleodb.library.database.utils.exceptions.MissingDataEntryConstructorsException; -import com.nucleodb.library.models.Author; -import com.nucleodb.library.models.AuthorDE; +import com.nucleodb.library.database.utils.exceptions.ObjectNotSavedException; +import com.nucleodb.library.models.*; import com.nucleodb.library.mqs.kafka.KafkaConfiguration; import com.nucleodb.library.mqs.local.LocalConfiguration; import org.junit.jupiter.api.AfterEach; @@ -18,8 +20,10 @@ import org.junit.jupiter.api.Test; import java.beans.IntrospectionException; +import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.util.Set; +import java.util.UUID; import java.util.stream.Collectors; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -27,19 +31,20 @@ class ExportTest { NucleoDB nucleoDB; - DataTable table; + DataTable authorTable; + DataTable bookTable; + ConnectionHandler wroteConnections; @BeforeEach - public void createLocalDB() throws IncorrectDataEntryClassException, MissingDataEntryConstructorsException, IntrospectionException, InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException, IncorrectDataEntryObjectException, InterruptedException { + public void createLocalDB() throws IncorrectDataEntryClassException, MissingDataEntryConstructorsException, IntrospectionException, InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException, IncorrectDataEntryObjectException, InterruptedException, InvalidConnectionException { nucleoDB = new NucleoDB( NucleoDB.DBType.ALL, c -> { c.getConnectionConfig().setMqsConfiguration(new KafkaConfiguration()); - c.getConnectionConfig().setLoadSaved(true); c.getConnectionConfig().setJsonExport(true); c.getConnectionConfig().setSaveChanges(true); - c.getConnectionConfig().setConnectionFileName("./data/connection.dat"); + c.getConnectionConfig().setConnectionFileName("./data/"+ c.getConnectionConfig().getLabel()+".dat"); c.getConnectionConfig().setExportInterval(50); c.getConnectionConfig().setSaveInterval(50); }, @@ -48,7 +53,7 @@ public void createLocalDB() throws IncorrectDataEntryClassException, MissingData c.getDataTableConfig().setLoadSave(true); c.getDataTableConfig().setSaveChanges(true); c.getDataTableConfig().setJsonExport(true); - c.getDataTableConfig().setTableFileName("./data/datatable.dat"); + c.getDataTableConfig().setTableFileName("./data/"+ c.getDataTableConfig().getTable()+".dat"); c.getDataTableConfig().setExportInterval(50); c.getDataTableConfig().setSaveInterval(50); }, @@ -57,30 +62,74 @@ public void createLocalDB() throws IncorrectDataEntryClassException, MissingData }, "com.nucleodb.library.models" ); - table = nucleoDB.getTable(Author.class); - System.out.println("STARTED"); - table.saveSync(new AuthorDE(new Author("George Orwell", "science-fiction"))); - } + authorTable = nucleoDB.getTable(Author.class); + bookTable = nucleoDB.getTable(Book.class); + wroteConnections = nucleoDB.getConnectionHandler(WroteConnection.class); + + AuthorDE georgeOrwell = new AuthorDE(new Author("George Orwell", "science-fiction")); + BookDE bookDE = new BookDE(new Book("Nineteen Eighty-Four")); + authorTable.saveSync(georgeOrwell); + bookTable.saveSync(bookDE); + AuthorDE author = authorTable.get("name", "George Orwell").stream().findFirst().get(); + BookDE book = bookTable.get("name", "Nineteen Eighty-Four").stream().findFirst().get(); + wroteConnections.saveSync(new WroteConnection(author, book)); - public AuthorDE copy(AuthorDE authorDE) throws JsonProcessingException { - return Serializer.getObjectMapper().getOm().readValue(Serializer.getObjectMapper().getOm().writeValueAsString(authorDE), AuthorDE.class); } + @AfterEach public void deleteEntries() { - System.out.println("DONE"); - table.getEntries().stream().map(author -> { - try { - return copy(author); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - }).collect(Collectors.toSet()).forEach(author-> { - try { - table.deleteSync(author); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - }); + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + authorTable + .getEntries() + .stream() + .map(author -> { + try { + return author.copy(AuthorDE.class, true); + } catch (ObjectNotSavedException e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toSet()).forEach(author -> { + try { + authorTable.deleteSync(author); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + bookTable + .getEntries() + .stream() + .map(book -> { + try { + return book.copy(BookDE.class, true); + } catch (ObjectNotSavedException e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toSet()).forEach(book -> { + try { + bookTable.deleteSync(book); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + wroteConnections + .getAllConnections() + .stream() + .map(c->c.copy(WroteConnection.class,true)) + .collect(Collectors.toSet()).forEach(c -> { + try { + wroteConnections.deleteSync(c); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); try { Thread.sleep(2000); } catch (InterruptedException e) { @@ -91,10 +140,25 @@ public void deleteEntries() { @Test public void checkSaving() throws IncorrectDataEntryObjectException, InterruptedException { AuthorDE edgarAllenPoe = new AuthorDE(new Author("Edgar Allen Poe", "fiction")); - table.saveSync(edgarAllenPoe); + authorTable.saveSync(edgarAllenPoe); + assertEquals( + 1, + authorTable.get( + "id", + edgarAllenPoe.getKey(), + null + ).size() + ); + assertEquals(2, authorTable.getEntries().size()); + Thread.sleep(5000); + } + @Test + public void fileSaving() throws IncorrectDataEntryObjectException, InterruptedException { + AuthorDE edgarAllenPoe = new AuthorDE(new Author("Edgar Allen Poe", "fiction")); + authorTable.saveSync(edgarAllenPoe); assertEquals( 1, - table.get( + authorTable.get( "id", edgarAllenPoe.getKey(), null diff --git a/src/integrationTest/java/com/nucleodb/library/models/Book.java b/src/integrationTest/java/com/nucleodb/library/models/Book.java new file mode 100644 index 0000000..57b85b8 --- /dev/null +++ b/src/integrationTest/java/com/nucleodb/library/models/Book.java @@ -0,0 +1,30 @@ +package com.nucleodb.library.models; + +import com.nucleodb.library.database.index.TrieIndex; +import com.nucleodb.library.database.index.annotation.Index; +import com.nucleodb.library.database.tables.annotation.Table; + +import java.io.Serializable; + +@Table(tableName = "bookIT", dataEntryClass = BookDE.class) +public class Book implements Serializable { + private static final long serialVersionUID = 1; + @Index(type = TrieIndex.class) + String name; + + public Book() { + } + + public Book(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + +} \ No newline at end of file diff --git a/src/integrationTest/java/com/nucleodb/library/models/BookDE.java b/src/integrationTest/java/com/nucleodb/library/models/BookDE.java new file mode 100644 index 0000000..b23c60d --- /dev/null +++ b/src/integrationTest/java/com/nucleodb/library/models/BookDE.java @@ -0,0 +1,22 @@ +package com.nucleodb.library.models; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.nucleodb.library.database.modifications.Create; +import com.nucleodb.library.database.tables.table.DataEntry; + +public class BookDE extends DataEntry { + public BookDE(Book obj) { + super(obj); + } + + public BookDE(Create create) throws ClassNotFoundException, JsonProcessingException { + super(create); + } + + public BookDE() { + } + + public BookDE(String key) { + super(key); + } +} diff --git a/src/integrationTest/java/com/nucleodb/library/models/WroteConnection.java b/src/integrationTest/java/com/nucleodb/library/models/WroteConnection.java new file mode 100644 index 0000000..111d44c --- /dev/null +++ b/src/integrationTest/java/com/nucleodb/library/models/WroteConnection.java @@ -0,0 +1,21 @@ +package com.nucleodb.library.models; + +import com.nucleodb.library.database.tables.annotation.Conn; +import com.nucleodb.library.database.tables.connection.Connection; + +import java.util.Map; + +@Conn("IT-WROTE") +public class WroteConnection extends Connection { + public WroteConnection() { + } + + public WroteConnection(AuthorDE from, BookDE to) { + super(from, to); + } + + public WroteConnection(AuthorDE from, BookDE to, Map metadata) { + super(from, to, metadata); + } +} + diff --git a/src/main/java/com/nucleodb/library/NucleoDB.java b/src/main/java/com/nucleodb/library/NucleoDB.java index 28b3c1d..14995fb 100644 --- a/src/main/java/com/nucleodb/library/NucleoDB.java +++ b/src/main/java/com/nucleodb/library/NucleoDB.java @@ -83,6 +83,7 @@ public NucleoDB(DBType dbType, String readToTime, String... packagesToScan) thro public NucleoDB(DBType dbType, String readToTime, Consumer connectionCustomizer, Consumer dataTableCustomizer, Consumer lockCustomizer, String... packagesToScan) throws IncorrectDataEntryClassException, MissingDataEntryConstructorsException, IntrospectionException, InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException { startLockManager(lockCustomizer); startTables(packagesToScan, dbType, readToTime, dataTableCustomizer); + startConnections(packagesToScan, dbType, readToTime, connectionCustomizer); } private void startLockManager(Consumer customizer) throws IntrospectionException, InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException { diff --git a/src/main/java/com/nucleodb/library/database/lock/LockManager.java b/src/main/java/com/nucleodb/library/database/lock/LockManager.java index 51f6b37..11e7f28 100644 --- a/src/main/java/com/nucleodb/library/database/lock/LockManager.java +++ b/src/main/java/com/nucleodb/library/database/lock/LockManager.java @@ -103,8 +103,8 @@ public Object put(@NotNull Object key, @NotNull Object value) { executorPool.schedule(()->{ LockReference lockReference = (LockReference)this.get(key); if(lockReference!=null && value instanceof LockReference && lockReference.getRequest().equals(((LockReference)value).getRequest())) { - logger.info( "EXPIRED"); - logger.info((String) key); + logger.log(Level.FINEST,"EXPIRED"); + logger.log(Level.FINEST,(String) key); try { LockReference activeLock = Serializer.getObjectMapper().getOm().readValue(Serializer.getObjectMapper().getOm().writeValueAsString(value), LockReference.class); activeLock.setLock(false); diff --git a/src/main/java/com/nucleodb/library/database/tables/connection/Connection.java b/src/main/java/com/nucleodb/library/database/tables/connection/Connection.java index 4063932..3c2f83c 100644 --- a/src/main/java/com/nucleodb/library/database/tables/connection/Connection.java +++ b/src/main/java/com/nucleodb/library/database/tables/connection/Connection.java @@ -19,7 +19,7 @@ import java.util.TreeMap; import java.util.UUID; -public class Connection implements Serializable, Comparable{ +public class Connection implements Serializable, Comparable{ @SkipCopy private static final long serialVersionUID = 1; @@ -60,7 +60,6 @@ public T copy(Class clazz, boolean lock) { this.connectionHandler.getConfig().getLabel(), uuid ); - System.out.println("unlocked "+lockReference.getRequest()); try { T obj = Serializer.getObjectMapper().getOm().readValue(Serializer.getObjectMapper().getOm().writeValueAsString(this), clazz); obj.setRequest(lockReference.getRequest()); @@ -70,6 +69,7 @@ public T copy(Class clazz, boolean lock) { e.printStackTrace(); } } catch (InterruptedException e) { + e.printStackTrace(); throw new RuntimeException(e); } }else{ diff --git a/src/main/java/com/nucleodb/library/database/tables/connection/ConnectionHandler.java b/src/main/java/com/nucleodb/library/database/tables/connection/ConnectionHandler.java index 60a9c47..520d068 100644 --- a/src/main/java/com/nucleodb/library/database/tables/connection/ConnectionHandler.java +++ b/src/main/java/com/nucleodb/library/database/tables/connection/ConnectionHandler.java @@ -65,24 +65,24 @@ import java.util.logging.Logger; import java.util.stream.Collectors; -public class ConnectionHandler implements Serializable{ +public class ConnectionHandler implements Serializable{ private static Logger logger = Logger.getLogger(ConnectionHandler.class.getName()); private static final long serialVersionUID = 1; @JsonIgnore - private transient Map> connections = new TreeMap<>(); + private transient Map> connections = new TreeMap<>(); @JsonIgnore - private transient Map> connectionsReverse = new TreeMap<>(); - private transient Set connectionFields = Arrays.stream(Connection.class.getDeclaredFields()).map(f -> f.getName()).collect(Collectors.toSet()); + private transient Map> connectionsReverse = new TreeMap<>(); + private transient Set connectionFields; @JsonIgnore - private transient Map connectionByUUID = new TreeMap<>(); + private transient Map connectionByUUID = new TreeMap<>(); private Map partitionOffsets = new TreeMap<>(); private String consumerId = UUID.randomUUID().toString(); - private Set allConnections = new TreeSetExt<>(); + private Set allConnections = new TreeSetExt<>(); @JsonIgnore private transient ExportHandler exportHandler; @JsonIgnore private transient NucleoDB nucleoDB; - private ConnectionConfig config; + private transient ConnectionConfig config; @JsonIgnore private transient ProducerHandler producer = null; @JsonIgnore @@ -91,7 +91,7 @@ public class ConnectionHandler implements Serializable{ @JsonIgnore private transient boolean inStartup = true; @JsonIgnore - private transient Cache> consumers = CacheBuilder.newBuilder() + private transient Cache> consumers = CacheBuilder.newBuilder() .maximumSize(10000) .softValues() .expireAfterWrite(5, TimeUnit.SECONDS) @@ -99,7 +99,7 @@ public class ConnectionHandler implements Serializable{ if (e.getCause().name().equals("EXPIRED")) { logger.info("EXPIRED " + e.getKey()); System.exit(1); - new Thread(() -> ((Consumer) e.getValue()).accept(null)).start(); + new Thread(() -> ((Consumer) e.getValue()).accept(null)).start(); ; } }) @@ -115,6 +115,8 @@ public ConnectionHandler(NucleoDB nucleoDB, ConnectionConfig config) throws Intr this.nucleoDB = nucleoDB; this.config = config; + this.connectionFields = Arrays.stream(config.getConnectionClass().getDeclaredFields()).map(f -> f.getName()).collect(Collectors.toSet()); + if (config.isLoadSaved()) { loadSavedData(); } @@ -139,6 +141,8 @@ public ConnectionHandler(NucleoDB nucleoDB, String bootstrap) throws Introspecti this.nucleoDB = nucleoDB; this.config = new ConnectionConfig(); + this.connectionFields = Arrays.stream(config.getConnectionClass().getDeclaredFields()).map(f -> f.getName()).collect(Collectors.toSet()); + if (config.isLoadSaved()) { loadSavedData(); } @@ -162,87 +166,89 @@ public ConnectionHandler(NucleoDB nucleoDB, String bootstrap) throws Introspecti public void loadSavedData() { - if (new File("./data/" + getConfig().getTopic() + ".dat").exists()) { + if (new File(config.getConnectionFileName()).exists()) { try { - ConnectionHandler tmpConnections = (ConnectionHandler) new ObjectFileReader().readObjectFromFile("./data/" + getConfig().getTopic() + ".dat"); - tmpConnections.allConnections.forEach(c -> this.addConnection(c)); + ConnectionHandler tmpConnections = (ConnectionHandler) new ObjectFileReader().readObjectFromFile(config.getConnectionFileName()); + tmpConnections.allConnections.forEach(c -> this.addConnection((C)c)); this.changed = tmpConnections.changed; this.consumerId = tmpConnections.getConsumerId(); this.partitionOffsets = tmpConnections.partitionOffsets; } catch (IOException e) { + e.printStackTrace(); throw new RuntimeException(e); } catch (ClassNotFoundException e) { + e.printStackTrace(); throw new RuntimeException(e); } } } - public Set getByFrom(DataEntry de, ConnectionProjection connectionProjection) { + public Set getByFrom(DataEntry de, ConnectionProjection connectionProjection) { if(connectionProjection ==null){ - connectionProjection = new ConnectionProjection(); + connectionProjection = new ConnectionProjection(); } - Set tmp = connections.get(de.getKey()); + Set tmp = connections.get(de.getKey()); if (tmp != null) { return connectionProjection.process(tmp.stream(), this.getConfig().getConnectionClass()); } return new TreeSetExt<>(); } - public Set getByFromAndTo(DataEntry from, DataEntry to, ConnectionProjection connectionProjection) { + public Set getByFromAndTo(DataEntry from, DataEntry to, ConnectionProjection connectionProjection) { if(connectionProjection == null){ - connectionProjection = new ConnectionProjection(); + connectionProjection = new ConnectionProjection(); } - Set tmp = connections.get(from.getKey() + to.getKey()); + Set tmp = connections.get(from.getKey() + to.getKey()); if (tmp != null) { return connectionProjection.process(tmp.stream(), this.getConfig().getConnectionClass()); } return new TreeSetExt<>(); } - public Set get(ConnectionProjection connectionProjection) { + public Set get(ConnectionProjection connectionProjection) { if(connectionProjection ==null){ - connectionProjection = new ConnectionProjection(); + connectionProjection = new ConnectionProjection(); } return connectionProjection.process(allConnections.stream(), this.getConfig().getConnectionClass()); } - public Set getReverseByTo(DataEntry to, ConnectionProjection connectionProjection) { + public Set getReverseByTo(DataEntry to, ConnectionProjection connectionProjection) { if(connectionProjection ==null){ - connectionProjection = new ConnectionProjection(); + connectionProjection = new ConnectionProjection(); } - Set tmp = connectionsReverse.get(to.getKey()); + Set tmp = connectionsReverse.get(to.getKey()); if (tmp != null) { return connectionProjection.process(tmp.stream(), this.getConfig().getConnectionClass()); } return new TreeSetExt<>(); } - public Set getReverseByFromAndTo(DataEntry de, DataEntry toDe, ConnectionProjection connectionProjection) { + public Set getReverseByFromAndTo(DataEntry de, DataEntry toDe, ConnectionProjection connectionProjection) { if(connectionProjection ==null){ - connectionProjection = new ConnectionProjection(); + connectionProjection = new ConnectionProjection(); } - Set tmp = connectionsReverse.get(de.getKey() + toDe.getKey()); + Set tmp = connectionsReverse.get(de.getKey() + toDe.getKey()); if (tmp != null) { return connectionProjection.process(tmp.stream(), this.getConfig().getConnectionClass()); } return new TreeSetExt<>(); } - private void putConnectionInKey(String key, Connection connection) { + private void putConnectionInKey(String key, C connection) { if (!connections.containsKey(key)) { connections.put(key, new TreeSetExt<>()); } connections.get(key).add(connection); } - private void putReverseConnectionInKey(String key, Connection connection) { + private void putReverseConnectionInKey(String key, C connection) { if (!connectionsReverse.containsKey(key)) { connectionsReverse.put(key, new TreeSetExt<>()); } connectionsReverse.get(key).add(connection); } - private void addConnection(Connection connection) { + private void addConnection(C connection) { synchronized (connections) { connection.connectionHandler = this; connectionByUUID.put(connection.getUuid(), connection); @@ -254,7 +260,7 @@ private void addConnection(Connection connection) { } } - private void removeByKey(String key, Connection connection) { + private void removeByKey(String key, C connection) { if (connections.containsKey(key)) { connections.get(key).remove(connection); if (connections.get(key).size() == 0) { @@ -263,7 +269,7 @@ private void removeByKey(String key, Connection connection) { } } - private void removeReverseByKey(String key, Connection connection) { + private void removeReverseByKey(String key, C connection) { if (connectionsReverse.containsKey(key)) { connectionsReverse.get(key).remove(connection); if (connectionsReverse.get(key).size() == 0) { @@ -272,7 +278,7 @@ private void removeReverseByKey(String key, Connection connection) { } } - private void removeConnection(Connection connection) { + private void removeConnection(C connection) { synchronized (connections) { connectionByUUID.remove(connection.getUuid()); this.removeByKey(connection.getFromKey(), connection); @@ -305,31 +311,31 @@ public void removeConnectionTo(DataEntry dataEntry) { allConnections.stream().filter(conn -> conn.getToKey().equals(dataEntry.getKey())).collect(Collectors.toSet()).forEach((c) -> removeConnection(c)); } - public Map> getConnections() { + public Map> getConnections() { return connections; } - public void setConnections(Map> connections) { + public void setConnections(Map> connections) { this.connections = connections; } - public Set getAllConnections() { + public Set getAllConnections() { return allConnections; } - public void setAllConnections(Set allConnections) { + public void setAllConnections(Set allConnections) { this.allConnections = allConnections; } - public boolean deleteAndForget(Connection connection) throws IOException { + public boolean deleteAndForget(C connection) throws IOException { return deleteInternalConsumer(connection, null); } - public void deleteAsync(Connection connection, Consumer consumer) throws IOException { + public void deleteAsync(C connection, Consumer consumer) throws IOException { deleteInternalConsumer(connection, consumer); } - public boolean deleteSync(Connection connection) throws IOException, InterruptedException { + public boolean deleteSync(C connection) throws IOException, InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(1); deleteInternalConsumer(connection, (c) -> { countDownLatch.countDown(); @@ -338,14 +344,14 @@ public boolean deleteSync(Connection connection) throws IOException, Interrupted return true; } - public List invalidConnection(Connection c) { + public List invalidConnection(C c) { List invalids = new LinkedList<>(); if (c.getFromKey() == null) invalids.add("[FromKey]"); if (c.getToKey() == null) invalids.add("[ToKey]"); return invalids; } - public boolean saveAndForget(Connection connection) throws InvalidConnectionException { + public boolean saveAndForget(C connection) throws InvalidConnectionException { List invalids = this.invalidConnection(connection); if (invalids.size() > 0) { throw new InvalidConnectionException(invalids.stream().collect(Collectors.joining(", "))); @@ -353,7 +359,7 @@ public boolean saveAndForget(Connection connection) throws InvalidConnectionExce return saveInternalConsumer(connection, null); } - public boolean saveAsync(Connection connection, Consumer consumer) throws InvalidConnectionException { + public boolean saveAsync(C connection, Consumer consumer) throws InvalidConnectionException { List invalids = this.invalidConnection(connection); if (invalids.size() > 0) { throw new InvalidConnectionException(invalids.stream().collect(Collectors.joining(", "))); @@ -361,7 +367,7 @@ public boolean saveAsync(Connection connection, Consumer consumer) t return saveInternalConsumer(connection, consumer); } - public boolean saveSync(Connection connection) throws InvalidConnectionException, InterruptedException { + public boolean saveSync(C connection) throws InvalidConnectionException, InterruptedException { List invalids = this.invalidConnection(connection); if (invalids.size() > 0) { throw new InvalidConnectionException(invalids.stream().collect(Collectors.joining(", "))); @@ -375,7 +381,7 @@ public boolean saveSync(Connection connection) throws InvalidConnectionException } - private boolean deleteInternalConsumer(Connection connection, Consumer consumer) throws IOException { + private boolean deleteInternalConsumer(C connection, Consumer consumer) throws IOException { String changeUUID = UUID.randomUUID().toString(); if (consumer != null) { consumers.put(changeUUID, consumer); @@ -386,7 +392,7 @@ private boolean deleteInternalConsumer(Connection connection, Consumer consumer) { + private boolean saveInternalConsumer(C connection, Consumer consumer) { String changeUUID = UUID.randomUUID().toString(); if (consumer != null) { consumers.put(changeUUID, consumer); @@ -397,7 +403,7 @@ private boolean saveInternalConsumer(Connection connection, Consumer return false; } - private boolean saveInternalConsumerSync(Connection connection) throws InterruptedException { + private boolean saveInternalConsumerSync(C connection) throws InterruptedException { if (!this.config.isWrite()) { return false; } @@ -418,7 +424,7 @@ public void startup() { } } - private boolean deleteInternal(Connection connection, String changeUUID) throws IOException { + private boolean deleteInternal(C connection, String changeUUID) throws IOException { if (allConnections.contains(connection)) { connection.versionIncrease(); ConnectionDelete deleteEntry = new ConnectionDelete(changeUUID, connection); @@ -434,6 +440,7 @@ JsonNode fromObject(Object o){ Serializer.getObjectMapper().getOm().writeValueAsString(o) ); } catch (JsonProcessingException e) { + e.printStackTrace(); throw new RuntimeException(e); } } @@ -444,11 +451,12 @@ Object fromJsonNode(JsonNode o, Class type){ type ); } catch (JsonProcessingException e) { + e.printStackTrace(); throw new RuntimeException(e); } } - private boolean saveInternal(Connection connection, String changeUUID) { + private boolean saveInternal(C connection, String changeUUID) { if (!allConnections.contains(connection)) { ConnectionCreate createEntry = new ConnectionCreate(changeUUID, connection); producer.push(createEntry.getUuid(), createEntry.getVersion(), createEntry, null); @@ -456,11 +464,12 @@ private boolean saveInternal(Connection connection, String changeUUID) { } else { connection.versionIncrease(); List changes = null; - Connection oldConnection = connectionByUUID.get(connection.getUuid()); + C oldConnection = connectionByUUID.get(connection.getUuid()); if(oldConnection==null){ try { consumerResponse(null, changeUUID); } catch (ExecutionException e) { + e.printStackTrace(); throw new RuntimeException(e); } } @@ -476,10 +485,12 @@ private boolean saveInternal(Connection connection, String changeUUID) { try { consumerResponse(oldConnection, changeUUID); } catch (ExecutionException e) { + e.printStackTrace(); throw new RuntimeException(e); } } } catch (JsonProcessingException e) { + e.printStackTrace(); throw new RuntimeException(e); } } @@ -502,22 +513,23 @@ private void itemRequeue() { if (this.startupPhase.get()) this.startupLoadCount.incrementAndGet(); } - private void consumerResponse(Connection connection, String changeUUID) throws ExecutionException { + private void consumerResponse(C connection, String changeUUID) throws ExecutionException { try { getNucleoDB().getLockManager().releaseLock(this.config.getLabel(), connection.getUuid(), connection.getRequest()); if(changeUUID!=null) { - Consumer connectionConsumer = consumers.getIfPresent(changeUUID); + Consumer connectionConsumer = consumers.getIfPresent(changeUUID); if (connectionConsumer != null) { new Thread(() -> connectionConsumer.accept(connection)).start(); consumers.invalidate(changeUUID); } } } catch (CacheLoader.InvalidCacheLoadException e) { + e.printStackTrace(); } this.changed = new Date().getTime(); } - private void triggerEvent(Modify modify, Connection connection) { + private void triggerEvent(Modify modify, C connection) { ConnectionEventListener eventListener = config.getEventListener(); if(eventListener!=null) { if(modify instanceof ConnectionCreate){ @@ -550,7 +562,7 @@ public void modify(Modification mod, Object modification) throws ExecutionExcept } - Connection connection = (Connection) Serializer.getObjectMapper().getOm().readValue(c.getConnectionData(), getConfig().getConnectionClass()); + C connection = (C) Serializer.getObjectMapper().getOm().readValue(c.getConnectionData(), getConfig().getConnectionClass()); this.addConnection(connection); //Serializer.log("Connection added to db"); @@ -575,7 +587,7 @@ public void modify(Modification mod, Object modification) throws ExecutionExcept //System.exit(1); return; } - Connection conn = connectionByUUID.get(d.getUuid()); + C conn = connectionByUUID.get(d.getUuid()); if (conn != null) { if (conn.getVersion() >= d.getVersion()) { //logger.info("Ignore already saved change."); @@ -635,7 +647,7 @@ public void modify(Modification mod, Object modification) throws ExecutionExcept return; } try { - Connection conn = connectionByUUID.get(u.getUuid()); + C conn = connectionByUUID.get(u.getUuid()); if (conn != null) { if (conn.getVersion() >= u.getVersion()) { //logger.info("Ignore already saved change."); @@ -651,7 +663,7 @@ public void modify(Modification mod, Object modification) throws ExecutionExcept } } else { - Connection connectionTmp = (Connection) fromJsonNode( + C connectionTmp = (C) fromJsonNode( u.getChangesPatch().apply(fromObject(conn)), config.getConnectionClass() ); @@ -666,10 +678,13 @@ public void modify(Modification mod, Object modification) throws ExecutionExcept Object obj = propertyDescriptor.getReadMethod().invoke(connectionTmp); propertyDescriptor.getWriteMethod().invoke(conn, obj); } catch (IllegalAccessException e) { + e.printStackTrace(); throw new RuntimeException(e); } catch (InvocationTargetException e) { + e.printStackTrace(); throw new RuntimeException(e); } catch (IntrospectionException e) { + e.printStackTrace(); throw new RuntimeException(e); } } @@ -700,11 +715,11 @@ public void modify(Modification mod, Object modification) throws ExecutionExcept } } - public Map getConnectionByUUID() { + public Map getConnectionByUUID() { return connectionByUUID; } - public void setConnectionByUUID(Map connectionByUUID) { + public void setConnectionByUUID(Map connectionByUUID) { this.connectionByUUID = connectionByUUID; } @@ -756,11 +771,11 @@ public void setModqueue(Queue modqueue) { this.modqueue = modqueue; } - public Cache> getConsumers() { + public Cache> getConsumers() { return consumers; } - public void setConsumers(Cache> consumers) { + public void setConsumers(Cache> consumers) { this.consumers = consumers; } @@ -807,8 +822,4 @@ public void setStartupPhase(AtomicBoolean startupPhase) { public ExportHandler getExportHandler() { return exportHandler; } - - public void setExportHandler(ExportHandler exportHandler) { - this.exportHandler = exportHandler; - } } diff --git a/src/main/java/com/nucleodb/library/database/tables/connection/ConnectionProjection.java b/src/main/java/com/nucleodb/library/database/tables/connection/ConnectionProjection.java index b74364d..0aedad9 100644 --- a/src/main/java/com/nucleodb/library/database/tables/connection/ConnectionProjection.java +++ b/src/main/java/com/nucleodb/library/database/tables/connection/ConnectionProjection.java @@ -8,21 +8,21 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -public class ConnectionProjection { +public class ConnectionProjection { Pagination pagination = null; - Predicate filter = null; + Predicate filter = null; - Comparator sort = null; + Comparator sort = null; boolean write = false; boolean lock = false; - public ConnectionProjection(Pagination pagination, Predicate filter) { + public ConnectionProjection(Pagination pagination, Predicate filter) { this.pagination = pagination; this.filter = filter; } - public ConnectionProjection(Pagination pagination, Predicate filter, Comparator sort) { + public ConnectionProjection(Pagination pagination, Predicate filter, Comparator sort) { this.pagination = pagination; this.filter = filter; this.sort = sort; @@ -32,14 +32,14 @@ public ConnectionProjection(Pagination pagination) { this.pagination = pagination; } - public ConnectionProjection(Predicate filter) { + public ConnectionProjection(Predicate filter) { this.filter = filter; } public ConnectionProjection() { } - public ConnectionProjection(Pagination pagination, Predicate filter, boolean write) { + public ConnectionProjection(Pagination pagination, Predicate filter, boolean write) { this.pagination = pagination; this.filter = filter; this.write = write; @@ -50,19 +50,19 @@ public ConnectionProjection(Pagination pagination, boolean write) { this.write = write; } - public ConnectionProjection(Predicate filter, boolean write) { + public ConnectionProjection(Predicate filter, boolean write) { this.filter = filter; this.write = write; } - public ConnectionProjection(Predicate filter, boolean write, boolean lock) { + public ConnectionProjection(Predicate filter, boolean write, boolean lock) { this.filter = filter; this.write = write; this.lock = lock; } - public Set process(Stream connectionStream, Class clazz){ - Stream connectionStreamTmp = connectionStream; + public Set process(Stream connectionStream, Class clazz){ + Stream connectionStreamTmp = connectionStream; if(this.filter!=null){ connectionStreamTmp = connectionStreamTmp.filter(this.filter); } @@ -73,14 +73,14 @@ public Set process(Stream connectionStream, Class(Connection)c.copy(clazz, lock)); + connectionStreamTmp = connectionStreamTmp.map(c->(C)c.copy(clazz, lock)); } return connectionStreamTmp.collect(Collectors.toSet()); } public void setPagination(Pagination pagination) { this.pagination = pagination; } - public void setFilter(Predicate filter) { + public void setFilter(Predicate filter) { this.filter = filter; } @@ -88,11 +88,11 @@ public void setWrite(boolean write) { this.write = write; } - public Comparator getSort() { + public Comparator getSort() { return sort; } - public void setSort(Comparator sort) { + public void setSort(Comparator sort) { this.sort = sort; } @@ -100,7 +100,7 @@ public Pagination getPagination() { return pagination; } - public Predicate getFilter() { + public Predicate getFilter() { return filter; } diff --git a/src/main/java/com/nucleodb/library/database/tables/connection/ExportHandler.java b/src/main/java/com/nucleodb/library/database/tables/connection/ExportHandler.java index b835559..316431d 100644 --- a/src/main/java/com/nucleodb/library/database/tables/connection/ExportHandler.java +++ b/src/main/java/com/nucleodb/library/database/tables/connection/ExportHandler.java @@ -11,9 +11,12 @@ import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.util.Queue; +import java.util.logging.Level; +import java.util.logging.Logger; import java.util.stream.Collectors; public class ExportHandler implements Runnable { + private static Logger logger = Logger.getLogger(ExportHandler.class.getName()); ConnectionHandler connectionHandler; Queue modifications = Queues.newConcurrentLinkedQueue(); @@ -28,7 +31,7 @@ public void run() { while (true) { try { if (this.connectionHandler.getChanged() > changedSaved) { - System.out.println("datatable export saved "+this.connectionHandler.getConfig().getLabel() ); + logger.log(Level.FINEST, "datatable export saved "+this.connectionHandler.getConfig().getLabel() ); OutputStream os = new FileOutputStream("./export/" + this.connectionHandler.getConfig().getLabel() + ".txt", true); String entry; while ((entry = modifications.poll()) != null) { diff --git a/src/main/java/com/nucleodb/library/database/tables/connection/SaveHandler.java b/src/main/java/com/nucleodb/library/database/tables/connection/SaveHandler.java index 75aedc9..0ea0f60 100644 --- a/src/main/java/com/nucleodb/library/database/tables/connection/SaveHandler.java +++ b/src/main/java/com/nucleodb/library/database/tables/connection/SaveHandler.java @@ -2,29 +2,33 @@ import com.nucleodb.library.database.utils.ObjectFileWriter; -public class SaveHandler implements Runnable{ +import java.util.logging.Level; +import java.util.logging.Logger; + +public class SaveHandler implements Runnable { + private static Logger logger = Logger.getLogger(SaveHandler.class.getName()); ConnectionHandler connectionHandler; public SaveHandler(ConnectionHandler connectionHandler) { - this.connectionHandler = connectionHandler; + this.connectionHandler = connectionHandler; } @Override public void run() { - long changedSaved = this.connectionHandler.getChanged(); + long changedSaved = this.connectionHandler.getChanged(); - while (true) { - try { - if (this.connectionHandler.getChanged() > changedSaved) { - System.out.println("Saved "+connectionHandler.getConfig().getConnectionFileName()); - new ObjectFileWriter().writeObjectToFile(this.connectionHandler, connectionHandler.getConfig().getConnectionFileName()); - changedSaved = this.connectionHandler.getChanged(); - } - Thread.sleep(this.connectionHandler.getConfig().getSaveInterval()); - } catch (Exception e) { - e.printStackTrace(); + while (true) { + try { + if (this.connectionHandler.getChanged() > changedSaved) { + logger.log(Level.FINEST,"Saved " + connectionHandler.getConfig().getConnectionFileName()); + new ObjectFileWriter().writeObjectToFile(this.connectionHandler, connectionHandler.getConfig().getConnectionFileName()); + changedSaved = this.connectionHandler.getChanged(); + } + Thread.sleep(this.connectionHandler.getConfig().getSaveInterval()); + } catch (Exception e) { + e.printStackTrace(); + } } - } } - } \ No newline at end of file +} \ No newline at end of file diff --git a/src/main/java/com/nucleodb/library/database/tables/table/DataEntry.java b/src/main/java/com/nucleodb/library/database/tables/table/DataEntry.java index 5534dba..87da0ed 100644 --- a/src/main/java/com/nucleodb/library/database/tables/table/DataEntry.java +++ b/src/main/java/com/nucleodb/library/database/tables/table/DataEntry.java @@ -11,6 +11,8 @@ import com.nucleodb.library.database.utils.Serializer; import com.nucleodb.library.database.utils.SkipCopy; import com.nucleodb.library.database.utils.Utils; +import com.nucleodb.library.database.utils.exceptions.ObjectNotSavedException; +import org.dizitart.no2.exceptions.ObjectMappingException; import org.jetbrains.annotations.NotNull; import java.io.Serializable; @@ -47,7 +49,10 @@ public DataEntry(Create create) throws ClassNotFoundException, JsonProcessingExc this.created = create.getTime(); } - public T copy(Class clazz, boolean lock) { + public T copy(Class clazz, boolean lock) throws ObjectNotSavedException { + if(this.dataTable==null & lock){ + throw new ObjectNotSavedException(this); + } if (lock){ // get lock try { diff --git a/src/main/java/com/nucleodb/library/database/tables/table/DataEntryProjection.java b/src/main/java/com/nucleodb/library/database/tables/table/DataEntryProjection.java index b5caf3e..e3aaec2 100644 --- a/src/main/java/com/nucleodb/library/database/tables/table/DataEntryProjection.java +++ b/src/main/java/com/nucleodb/library/database/tables/table/DataEntryProjection.java @@ -1,6 +1,7 @@ package com.nucleodb.library.database.tables.table; import com.nucleodb.library.database.utils.Pagination; +import com.nucleodb.library.database.utils.exceptions.ObjectNotSavedException; import javax.xml.crypto.Data; import java.util.Comparator; @@ -70,7 +71,13 @@ public Set process(Stream DataEntryStream, Class claz dataEntryStream = dataEntryStream.skip(this.pagination.getSkip()).limit(this.pagination.getLimit()); } if(isWritable()){ - dataEntryStream = dataEntryStream.map(de->(T) de.copy(clazz, lockUntilWrite)); + dataEntryStream = dataEntryStream.map(de-> { + try { + return (T) de.copy(clazz, lockUntilWrite); + } catch (ObjectNotSavedException e) { + throw new RuntimeException(e); + } + }); } return dataEntryStream.collect(Collectors.toSet()); } diff --git a/src/main/java/com/nucleodb/library/database/tables/table/ExportHandler.java b/src/main/java/com/nucleodb/library/database/tables/table/ExportHandler.java index d08cce9..3616f96 100644 --- a/src/main/java/com/nucleodb/library/database/tables/table/ExportHandler.java +++ b/src/main/java/com/nucleodb/library/database/tables/table/ExportHandler.java @@ -10,9 +10,12 @@ import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.util.Queue; +import java.util.logging.Level; +import java.util.logging.Logger; import java.util.stream.Collectors; public class ExportHandler implements Runnable { + private static Logger logger = Logger.getLogger(ExportHandler.class.getName()); DataTable dataTable; Queue modifications = Queues.newConcurrentLinkedQueue(); @@ -27,7 +30,7 @@ public void run() { while (true) { try { if (this.dataTable.getChanged() > changedSaved) { - System.out.println("datatable export saved "+this.dataTable.getConfig().getTable() ); + logger.log(Level.FINEST,"datatable export saved "+this.dataTable.getConfig().getTable() ); OutputStream os = new FileOutputStream("./export/" + this.dataTable.getConfig().getTable() + ".txt", true); String entry; while ((entry = modifications.poll()) != null) { diff --git a/src/main/java/com/nucleodb/library/database/tables/table/ModQueueHandler.java b/src/main/java/com/nucleodb/library/database/tables/table/ModQueueHandler.java index c73fe13..4ad9eda 100644 --- a/src/main/java/com/nucleodb/library/database/tables/table/ModQueueHandler.java +++ b/src/main/java/com/nucleodb/library/database/tables/table/ModQueueHandler.java @@ -30,8 +30,6 @@ public void run() { overkillCheck = false; } - System.out.println(leftTmp); - System.out.println(left); left = leftTmp; } try { diff --git a/src/main/java/com/nucleodb/library/database/tables/table/SaveHandler.java b/src/main/java/com/nucleodb/library/database/tables/table/SaveHandler.java index 39b4e88..9321cd3 100644 --- a/src/main/java/com/nucleodb/library/database/tables/table/SaveHandler.java +++ b/src/main/java/com/nucleodb/library/database/tables/table/SaveHandler.java @@ -2,7 +2,11 @@ import com.nucleodb.library.database.utils.ObjectFileWriter; +import java.util.logging.Level; +import java.util.logging.Logger; + public class SaveHandler implements Runnable{ + private static Logger logger = Logger.getLogger(SaveHandler.class.getName()); DataTable dataTable; @@ -16,7 +20,7 @@ public void run() { while (true) { try { if (this.dataTable.getChanged() > changedSaved) { - System.out.println("Saved " + this.dataTable.getConfig().getTableFileName()); + logger.log(Level.FINEST,"Saved " + this.dataTable.getConfig().getTableFileName()); new ObjectFileWriter().writeObjectToFile(this.dataTable, this.dataTable.getConfig().getTableFileName()); changedSaved = this.dataTable.getChanged(); } diff --git a/src/main/java/com/nucleodb/library/database/utils/exceptions/ObjectNotSavedException.java b/src/main/java/com/nucleodb/library/database/utils/exceptions/ObjectNotSavedException.java new file mode 100644 index 0000000..1acf97c --- /dev/null +++ b/src/main/java/com/nucleodb/library/database/utils/exceptions/ObjectNotSavedException.java @@ -0,0 +1,9 @@ +package com.nucleodb.library.database.utils.exceptions; + +import com.nucleodb.library.database.tables.table.DataEntry; + +public class ObjectNotSavedException extends Exception { + public ObjectNotSavedException(DataEntry e) { + super("Object Not Saved: "+e.key); + } +} diff --git a/src/main/java/com/nucleodb/library/database/utils/sql/SQLHandler.java b/src/main/java/com/nucleodb/library/database/utils/sql/SQLHandler.java index 9f57404..2f826b5 100644 --- a/src/main/java/com/nucleodb/library/database/utils/sql/SQLHandler.java +++ b/src/main/java/com/nucleodb/library/database/utils/sql/SQLHandler.java @@ -139,6 +139,7 @@ public static Function createComparatorFunction(String column){ columnField = new PropertyDescriptor(next, o.getClass()); o = columnField.getReadMethod().invoke(o); }catch (Exception e){ + e.printStackTrace(); o=null; break; } diff --git a/src/main/java/com/nucleodb/library/mqs/kafka/KafkaConsumerHandler.java b/src/main/java/com/nucleodb/library/mqs/kafka/KafkaConsumerHandler.java index a55e892..6a7ad6e 100644 --- a/src/main/java/com/nucleodb/library/mqs/kafka/KafkaConsumerHandler.java +++ b/src/main/java/com/nucleodb/library/mqs/kafka/KafkaConsumerHandler.java @@ -22,6 +22,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Collectors; @@ -55,29 +56,25 @@ public void createTopics() { CountDownLatch countDownLatch = new CountDownLatch(1); try { ListTopicsResult listTopicsResult = client.listTopics(); - listTopicsResult.names().whenComplete((names, f) -> { - if (f != null) { - f.printStackTrace(); - } - if (names.stream().filter(name -> name.equals(topic)).count() == 0) { - logger.info(String.format("kafka topic not found for %s", topic)); - final NewTopic newTopic = new NewTopic(topic, ((KafkaSettings) this.getSettings()).getPartitions(), (short) ((KafkaSettings) this.getSettings()).getReplicas()); - newTopic.configs(new TreeMap<>() {{ - put(TopicConfig.RETENTION_MS_CONFIG, "-1"); - put(TopicConfig.RETENTION_MS_CONFIG, "-1"); - put(TopicConfig.RETENTION_BYTES_CONFIG, "-1"); - }}); - CreateTopicsResult createTopicsResult = client.createTopics(Collections.singleton(newTopic)); - createTopicsResult.all().whenComplete((c, e) -> { - if (e != null) { - e.printStackTrace(); - } - countDownLatch.countDown(); - }); - } else { + Set names = listTopicsResult.names().get(500, TimeUnit.MILLISECONDS); + if (names.stream().filter(name -> name.equals(topic)).count() == 0) { + logger.log(Level.FINEST,String.format("kafka topic not found for %s", topic)); + final NewTopic newTopic = new NewTopic(topic, ((KafkaSettings) this.getSettings()).getPartitions(), (short) ((KafkaSettings) this.getSettings()).getReplicas()); + newTopic.configs(new TreeMap<>() {{ + put(TopicConfig.RETENTION_MS_CONFIG, "-1"); + put(TopicConfig.RETENTION_MS_CONFIG, "-1"); + put(TopicConfig.RETENTION_BYTES_CONFIG, "-1"); + }}); + CreateTopicsResult createTopicsResult = client.createTopics(Collections.singleton(newTopic)); + createTopicsResult.all().whenComplete((c, e) -> { + if (e != null) { + e.printStackTrace(); + } countDownLatch.countDown(); - } - }); + }); + } else { + countDownLatch.countDown(); + } } catch (Exception e) { e.printStackTrace(); System.exit(-1); @@ -108,7 +105,7 @@ public void createTopics() { public void start(int queueHandlers) { this.threads = queueHandlers; kafkaConsumingThread = new Thread(this); - this.subscribe(new String[]{table.toLowerCase()}); + this.subscribe(new String[]{this.getSettings().getTable().toLowerCase()}); kafkaConsumingThread.start(); super.start(queueHandlers); } @@ -127,16 +124,18 @@ private boolean initialLoad() { return startupMap.size() == startupMap.entrySet().stream().filter(s -> getConsumer().position(s.getKey()) >= s.getValue()).count(); } - public void seek(String tableName, Map offsetMap) { + public void seek(Map offsetMap) { if (offsetMap.size() > 0) { offsetMap.entrySet().forEach(e -> { - TopicPartition tp = new TopicPartition(tableName.toLowerCase(), e.getKey()); - System.out.println(tp.toString() +" = "+ e.getValue()); + TopicPartition tp = new TopicPartition(this.getSettings().getTable().toLowerCase(), e.getKey()); + logger.log(Level.FINEST,tp + " = " + e.getValue()); consumer.seek(tp, e.getValue()); }); } } + Set assigned = new HashSet<>(); + @Override public void run() { @@ -150,7 +149,7 @@ public void run() { Map offsets = new HashMap<>(); if (databaseType) { offsets = getDatabase().getPartitionOffsets(); - while(assigned.size()(); super.setStartupLoadCount(new AtomicInteger(0)); } - Map offsetMetaMap = new HashMap<>(); try { do { @@ -187,10 +187,9 @@ public void run() { Map finalOffsets = offsets; rs.iterator().forEachRemaining(action -> { Long offsetAtPartition = finalOffsets.get(action.partition()); - if(offsetAtPartition != null && action.offset()<= offsetAtPartition) return; + if (offsetAtPartition != null && action.offset() <= offsetAtPartition) return; if (getStartupPhaseConsume().get()) getStartupLoadCount().incrementAndGet(); String pop = action.value(); - System.out.println(pop); //System.out.println("Change added to queue."); getQueue().add(pop); getLeftToRead().incrementAndGet(); @@ -211,9 +210,7 @@ public void run() { } //logger.info("consumed: "+leftToRead.get()); if (getStartupPhaseConsume().get() && initialLoad()) { - System.out.println("In initial load"); getStartupPhaseConsume().set(false); - System.out.println(getStartupLoadCount().get()); if (getStartupLoadCount().get() == 0) { if (connectionType) { getConnectionHandler().getStartupPhase().set(false); @@ -229,7 +226,7 @@ public void run() { } } } while (!Thread.interrupted()); - logger.info("Consumer interrupted " + (databaseType ? this.getDatabase().getConfig().getTable() : "connections")); + logger.log(Level.FINEST,"Consumer interrupted " + (databaseType ? this.getDatabase().getConfig().getTable() : "connections")); } catch (Exception e) { e.printStackTrace(); } @@ -255,13 +252,13 @@ public void subscribe(String[] topics) { consumer.subscribe(Arrays.asList(topics), new ConsumerRebalanceListener() { @Override public void onPartitionsRevoked(Collection collection) { - System.out.println("revoked: " + collection.stream().map(c -> c.topic() + c.partition()).collect(Collectors.joining(", "))); + logger.log(Level.FINEST,"revoked: " + collection.stream().map(c -> c.topic() + c.partition()).collect(Collectors.joining(", "))); } @Override public void onPartitionsAssigned(Collection collection) { - assigned = collection.stream().map(c->c.toString()).collect(Collectors.toSet()); - System.out.println("assigned: " + assigned.stream().collect(Collectors.joining(", "))); + assigned = collection.stream().map(c -> c.toString()).collect(Collectors.toSet()); + logger.log(Level.FINEST,"assigned: " + assigned.stream().collect(Collectors.joining(", "))); } }); diff --git a/src/main/java/com/nucleodb/library/utils/Serializer.java b/src/main/java/com/nucleodb/library/utils/Serializer.java index 7564df3..c5188a0 100644 --- a/src/main/java/com/nucleodb/library/utils/Serializer.java +++ b/src/main/java/com/nucleodb/library/utils/Serializer.java @@ -41,9 +41,9 @@ public static T read(byte[] data) { ois = new ObjectInputStream(bis); return (T) ois.readObject(); }catch (IOException e){ - //e.printStackTrace(); + e.printStackTrace(); } catch (ClassNotFoundException e) { - //e.printStackTrace(); + e.printStackTrace(); } finally { if(ois!=null) { try { diff --git a/src/test/java/com/nucleodb/library/CoreTest.java b/src/test/java/com/nucleodb/library/CoreTest.java index afaaacf..9e543c4 100644 --- a/src/test/java/com/nucleodb/library/CoreTest.java +++ b/src/test/java/com/nucleodb/library/CoreTest.java @@ -6,6 +6,7 @@ import com.nucleodb.library.database.utils.exceptions.IncorrectDataEntryClassException; import com.nucleodb.library.database.utils.exceptions.IncorrectDataEntryObjectException; import com.nucleodb.library.database.utils.exceptions.MissingDataEntryConstructorsException; +import com.nucleodb.library.database.utils.exceptions.ObjectNotSavedException; import com.nucleodb.library.helpers.models.Author; import com.nucleodb.library.helpers.models.AuthorDE; import com.nucleodb.library.mqs.local.LocalConfiguration; @@ -73,8 +74,12 @@ public void checkSavingWithoutChanges() throws IncorrectDataEntryObjectException assertEquals(1, dataEntrySet.size()); if(dataEntrySet.size()>0){ DataEntry dataEntry = dataEntrySet.iterator().next(); - dataEntry.copy(AuthorDE.class, false); - assertTrue(true); + try { + dataEntry.copy(AuthorDE.class, false); + } catch (ObjectNotSavedException e) { + throw new RuntimeException(e); + } + assertTrue(true); } }