Skip to content

Commit

Permalink
fix consuming
Browse files Browse the repository at this point in the history
  • Loading branch information
firestar committed Sep 20, 2024
1 parent 9654701 commit 7652741
Show file tree
Hide file tree
Showing 10 changed files with 130 additions and 72 deletions.
10 changes: 7 additions & 3 deletions src/integrationTest/java/com/nucleodb/library/ExportTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.lang.reflect.InvocationTargetException;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;

import static org.junit.jupiter.api.Assertions.assertEquals;
Expand All @@ -40,7 +41,7 @@ public void createLocalDB() throws IncorrectDataEntryClassException, MissingData
nucleoDB = new NucleoDB(
NucleoDB.DBType.ALL,
c -> {
c.getConnectionConfig().setMqsConfiguration(new KafkaConfiguration());
c.getConnectionConfig().setMqsConfiguration(new LocalConfiguration());
c.getConnectionConfig().setLoadSaved(true);
c.getConnectionConfig().setJsonExport(true);
c.getConnectionConfig().setSaveChanges(true);
Expand All @@ -49,7 +50,7 @@ public void createLocalDB() throws IncorrectDataEntryClassException, MissingData
c.getConnectionConfig().setSaveInterval(50);
},
c -> {
c.getDataTableConfig().setMqsConfiguration(new KafkaConfiguration());
c.getDataTableConfig().setMqsConfiguration(new LocalConfiguration());
c.getDataTableConfig().setLoadSave(true);
c.getDataTableConfig().setSaveChanges(true);
c.getDataTableConfig().setJsonExport(true);
Expand All @@ -58,10 +59,13 @@ public void createLocalDB() throws IncorrectDataEntryClassException, MissingData
c.getDataTableConfig().setSaveInterval(50);
},
c -> {
c.setMqsConfiguration(new KafkaConfiguration());
c.setMqsConfiguration(new LocalConfiguration());
},
"com.nucleodb.library.models"
);
nucleoDB.startConsuming();
nucleoDB.waitTillReady();

authorTable = nucleoDB.getTable(Author.class);
bookTable = nucleoDB.getTable(Book.class);
wroteConnections = nucleoDB.getConnectionHandler(WroteConnection.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.nucleodb.library.database.utils.exceptions.ObjectNotSavedException;
import com.nucleodb.library.models.*;
import com.nucleodb.library.mqs.kafka.KafkaConfiguration;
import com.nucleodb.library.mqs.local.LocalConfiguration;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -22,6 +23,7 @@
import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;

import static org.junit.jupiter.api.Assertions.assertEquals;
Expand All @@ -37,7 +39,7 @@ public void createLocalDB() throws IncorrectDataEntryClassException, MissingData
nucleoDB = new NucleoDB(
NucleoDB.DBType.ALL,
c -> {
c.getConnectionConfig().setMqsConfiguration(new KafkaConfiguration());
c.getConnectionConfig().setMqsConfiguration(new LocalConfiguration());
c.getConnectionConfig().setLoadSaved(true);
c.getConnectionConfig().setJsonExport(true);
c.getConnectionConfig().setSaveChanges(true);
Expand Down Expand Up @@ -68,7 +70,7 @@ public <C extends Connection> boolean accept(String key) {
});
},
c -> {
c.getDataTableConfig().setMqsConfiguration(new KafkaConfiguration());
c.getDataTableConfig().setMqsConfiguration(new LocalConfiguration());
c.getDataTableConfig().setLoadSave(true);
c.getDataTableConfig().setSaveChanges(true);
c.getDataTableConfig().setJsonExport(true);
Expand Down Expand Up @@ -99,10 +101,12 @@ public <T extends DataEntry> boolean accept(String key) {
});
},
c -> {
c.setMqsConfiguration(new KafkaConfiguration());
c.setMqsConfiguration(new LocalConfiguration());
},
"com.nucleodb.library.models"
);
nucleoDB.startConsuming();
nucleoDB.waitTillReady();
authorTable = nucleoDB.getTable(Author.class);
bookTable = nucleoDB.getTable(Book.class);
wroteConnections = nucleoDB.getConnectionHandler(WroteConnection.class);
Expand Down Expand Up @@ -161,11 +165,6 @@ public void deleteEntries() {
throw new RuntimeException(e);
}
});
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@Test
public void checkSaving() throws IncorrectDataEntryObjectException, InterruptedException {
Expand All @@ -180,7 +179,7 @@ public void checkSaving() throws IncorrectDataEntryObjectException, InterruptedE
null
).size()
);
assertEquals(2, authorTable.getEntries().size());
assertEquals(1, authorTable.getEntries().size());
}

}
129 changes: 86 additions & 43 deletions src/main/java/com/nucleodb/library/NucleoDB.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.nucleodb.library;

import com.google.common.collect.Queues;
import com.nucleodb.library.database.lock.LockConfig;
import com.nucleodb.library.database.lock.LockManager;
import com.nucleodb.library.database.modifications.Create;
Expand Down Expand Up @@ -56,6 +57,8 @@ public class NucleoDB {
private List<Consumer<DataTable>> tableEvents = new LinkedList<>();
private List<Consumer<ConnectionHandler>> connectionEvents = new LinkedList<>();

private Queue<CountDownLatch> latches = Queues.newArrayBlockingQueue(25);

public NucleoDB() {
}

Expand Down Expand Up @@ -88,15 +91,26 @@ public NucleoDB(DBType dbType, Consumer<ConnectionConsumer> connectionCustomizer

public NucleoDB(DBType dbType, String readToTime, String... packagesToScan) throws IncorrectDataEntryClassException, MissingDataEntryConstructorsException, IntrospectionException, InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException {
startLockManager(null);
startTables(packagesToScan, dbType, readToTime, null);
startConnections(packagesToScan, dbType, readToTime, null);
CountDownLatch latch = startTables(packagesToScan, dbType, readToTime, null);
if(latch!=null){
latches.add(latch);
}
latch = startConnections(packagesToScan, dbType, readToTime, null);
if(latch!=null){
latches.add(latch);
}
}

public NucleoDB(DBType dbType, String readToTime, Consumer<ConnectionConsumer> connectionCustomizer, Consumer<DataTableConsumer> dataTableCustomizer, Consumer<LockConfig> lockCustomizer, String... packagesToScan) throws IncorrectDataEntryClassException, MissingDataEntryConstructorsException, IntrospectionException, InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException {
startLockManager(lockCustomizer);
startTables(packagesToScan, dbType, readToTime, dataTableCustomizer);

startConnections(packagesToScan, dbType, readToTime, connectionCustomizer);
CountDownLatch latch = startTables(packagesToScan, dbType, readToTime, dataTableCustomizer);
if(latch!=null){
latches.add(latch);
}
latch = startConnections(packagesToScan, dbType, readToTime, connectionCustomizer);
if(latch!=null){
latches.add(latch);
}
}

public void startLockManager(Consumer<LockConfig> customizer) throws IntrospectionException, InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException {
Expand Down Expand Up @@ -125,7 +139,7 @@ public Optional<Set<Class<?>>> getConnectionClasses(String[] packagesToScan) {
});
}

public void startConnection(Class<?> type, DBType dbType, String readToTime, Consumer<ConnectionConsumer> customizer) throws IntrospectionException, InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException {
public CountDownLatch startConnection(Class<?> type, DBType dbType, String readToTime, Consumer<ConnectionConsumer> customizer) throws IntrospectionException, InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException {

CountDownLatch latch = new CountDownLatch(1);
Conn connectionType = type.getAnnotation(Conn.class);
Expand Down Expand Up @@ -165,6 +179,7 @@ public void startConnection(Class<?> type, DBType dbType, String readToTime, Con
config.setStartupRun(new StartupRun() {
public void run(ConnectionHandler connectionHandler) {
latch.countDown();
connectionEvents.forEach(eventListener->eventListener.accept(connectionHandler));
}
});
switch (dbType) {
Expand All @@ -180,21 +195,15 @@ public void run(ConnectionHandler connectionHandler) {
ConnectionHandler connectionHandler = new ConnectionHandler(this, config);
connectionHandler.setName(connectionType.value().toUpperCase());
connections.put(connectionHandler.getName(), connectionHandler);

try {
latch.await();
connectionEvents.forEach(eventListener->eventListener.accept(connectionHandler));
logger.info("NucleoDB Connection[" + connectionType.value() + "] Started");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
logger.info("NucleoDB Connection[" + connectionType.value() + "] ready to consume.");
return latch;
}

private void startConnections(String[] packagesToScan, DBType dbType, String readToTime, Consumer<ConnectionConsumer> customizer) throws IntrospectionException, InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException {
private CountDownLatch startConnections(String[] packagesToScan, DBType dbType, String readToTime, Consumer<ConnectionConsumer> customizer) throws IntrospectionException, InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException {
logger.info("NucleoDB Connections Starting");
Optional<Set<Class<?>>> connectionTypesOptional = getConnectionClasses(packagesToScan);
if (!connectionTypesOptional.isPresent()) {
return;
return null;
}
Set<Class<?>> connectionTypes = connectionTypesOptional.get();
CountDownLatch latch = new CountDownLatch(connectionTypes.size());
Expand Down Expand Up @@ -236,6 +245,7 @@ private void startConnections(String[] packagesToScan, DBType dbType, String rea
config.setStartupRun(new StartupRun() {
public void run(ConnectionHandler connectionHandler) {
latch.countDown();
connectionEvents.forEach(eventListener -> eventListener.accept(connectionHandler));
}
});
switch (dbType) {
Expand All @@ -253,18 +263,11 @@ public void run(ConnectionHandler connectionHandler) {
handlers.add(connectionHandler);
connections.put(connectionHandler.getName(), connectionHandler);
}

try {

latch.await();
handlers.forEach(handler->connectionEvents.forEach(eventListener->eventListener.accept(handler)));
logger.info("NucleoDB Connections Started");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
logger.info("NucleoDB connections ready to consume.");
return latch;
}

public void startTable(Class<?> type, DBType dbType, String readToTime, Consumer<DataTableConsumer> customizer) throws IncorrectDataEntryClassException, MissingDataEntryConstructorsException {
public CountDownLatch startTable(Class<?> type, DBType dbType, String readToTime, Consumer<DataTableConsumer> customizer) throws IncorrectDataEntryClassException, MissingDataEntryConstructorsException {
Table tableAnnotation = type.getAnnotation(Table.class);
CountDownLatch latch = new CountDownLatch(1);
String tableName = tableAnnotation.tableName();
Expand Down Expand Up @@ -294,21 +297,25 @@ public void startTable(Class<?> type, DBType dbType, String readToTime, Consumer
case ALL -> launchTable(tableName, dataEntryClass, type, new StartupRun() {
public void run(DataTable table) {
latch.countDown();
tableEvents.forEach(eventListener->eventListener.accept(table));
}
}, customizer);
case NO_LOCAL -> launchLocalOnlyTable(tableName, dataEntryClass, type, new StartupRun() {
public void run(DataTable table) {
latch.countDown();
tableEvents.forEach(eventListener->eventListener.accept(table));
}
}, customizer);
case READ_ONLY -> launchReadOnlyTable(tableName, dataEntryClass, type, new StartupRun() {
public void run(DataTable table) {
latch.countDown();
tableEvents.forEach(eventListener->eventListener.accept(table));
}
}, customizer);
case EXPORT -> launchExportOnlyTable(tableName, dataEntryClass, type, new StartupRun() {
public void run(DataTable table) {
latch.countDown();
tableEvents.forEach(eventListener->eventListener.accept(table));
}
}, customizer);
};
Expand Down Expand Up @@ -337,14 +344,8 @@ public void run(DataTable table) {
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
try {

latch.await();
logger.info("NucleoDB " + tableName + " Started");
tableEvents.forEach(eventListener->eventListener.accept(builtTable));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
logger.info("NucleoDB " + tableName + " ready to consume.");
return latch;
}

public Optional<Set<Class<?>>> getTableClasses(String[] packagesToScan) {
Expand All @@ -354,11 +355,11 @@ public Optional<Set<Class<?>>> getTableClasses(String[] packagesToScan) {
});
}

private void startTables(String[] packagesToScan, DBType dbType, String readToTime, Consumer<DataTableConsumer> customizer) throws IncorrectDataEntryClassException, MissingDataEntryConstructorsException {
private CountDownLatch startTables(String[] packagesToScan, DBType dbType, String readToTime, Consumer<DataTableConsumer> customizer) throws IncorrectDataEntryClassException, MissingDataEntryConstructorsException {
logger.info("NucleoDB Tables Starting");
Optional<Set<Class<?>>> tableTypesOptional = getTableClasses(packagesToScan);
if (!tableTypesOptional.isPresent()) {
return;
return null;
}

Set<Class<?>> tableTypes = tableTypesOptional.get();
Expand Down Expand Up @@ -392,21 +393,25 @@ private void startTables(String[] packagesToScan, DBType dbType, String readToTi
case ALL -> tables.add(launchTable(tableName, dataEntryClass, type, new StartupRun() {
public void run(DataTable table) {
latch.countDown();
tableEvents.forEach(eventListener->eventListener.accept(table));
}
}, customizer));
case NO_LOCAL -> tables.add(launchLocalOnlyTable(tableName, dataEntryClass, type, new StartupRun() {
public void run(DataTable table) {
latch.countDown();
tableEvents.forEach(eventListener->eventListener.accept(table));
}
}, customizer));
case READ_ONLY -> tables.add(launchReadOnlyTable(tableName, dataEntryClass, type, new StartupRun() {
public void run(DataTable table) {
latch.countDown();
tableEvents.forEach(eventListener->eventListener.accept(table));
}
}, customizer));
case EXPORT -> tables.add(launchExportOnlyTable(tableName, dataEntryClass, type, new StartupRun() {
public void run(DataTable table) {
latch.countDown();
tableEvents.forEach(eventListener->eventListener.accept(table));
}
}, customizer));
}
Expand Down Expand Up @@ -439,13 +444,8 @@ public void run(DataTable table) {
throw new RuntimeException(e);
}
});
try {
latch.await();
handlers.forEach(handler->tableEvents.forEach(eventListener->eventListener.accept(handler)));
logger.info("NucleoDB Tables Started");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
logger.info("NucleoDB Tables ready to consume.");
return latch;
}

private Set<DataTableConfig.IndexConfig> processIndexListForClass(Class<?> clazz) {
Expand Down Expand Up @@ -637,4 +637,47 @@ public void addConnectionEvent(Consumer<ConnectionHandler> connectionHandler){
public void addTableEvent(Consumer<DataTable> dataTableConsumer){
tableEvents.add(dataTableConsumer);
}

public void startConsuming(){
getConnections().values().forEach(c-> {
try {
c.consume();
} catch (IntrospectionException e) {
throw new RuntimeException(e);
} catch (InvocationTargetException e) {
throw new RuntimeException(e);
} catch (NoSuchMethodException e) {
throw new RuntimeException(e);
} catch (InstantiationException e) {
throw new RuntimeException(e);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
});
getTables().values().forEach(de-> {
try {
de.consume();
} catch (IntrospectionException e) {
throw new RuntimeException(e);
} catch (InvocationTargetException e) {
throw new RuntimeException(e);
} catch (NoSuchMethodException e) {
throw new RuntimeException(e);
} catch (InstantiationException e) {
throw new RuntimeException(e);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
});
}
public void waitTillReady() throws InterruptedException {
CountDownLatch c;
while((c=getLatches().poll())!=null){
c.await();
}
}

public Queue<CountDownLatch> getLatches() {
return latches;
}
}
Loading

0 comments on commit 7652741

Please sign in to comment.