Skip to content

Commit

Permalink
add local mqs for unit testing
Browse files Browse the repository at this point in the history
  • Loading branch information
firestar committed Dec 19, 2023
1 parent b60f825 commit 9a64a2a
Show file tree
Hide file tree
Showing 29 changed files with 1,311 additions and 565 deletions.
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ plugins {
}

group = 'com.nucleodb'
version = '1.13.13'
version = '1.13.14'

repositories {
mavenCentral()
Expand All @@ -29,6 +29,7 @@ compileJava {

test {
useJUnitPlatform()
environment "TEST2", "CHECK"
}

dependencies {
Expand Down
117 changes: 75 additions & 42 deletions src/main/java/com/nucleodb/library/NucleoDB.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.nucleodb.library;

import com.nucleodb.library.database.index.TreeIndex;
import com.nucleodb.library.database.modifications.Create;
import com.nucleodb.library.database.tables.annotation.Conn;
import com.nucleodb.library.database.tables.connection.ConnectionConfig;
Expand All @@ -24,14 +23,17 @@
import net.sf.jsqlparser.statement.select.Select;
import net.sf.jsqlparser.statement.update.Update;
import org.reflections.Reflections;

import java.beans.IntrospectionException;
import java.lang.annotation.Annotation;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.time.Instant;
import java.time.format.DateTimeParseException;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.logging.Logger;

import static com.nucleodb.library.utils.EnvReplace.replaceEnvVariables;
Expand All @@ -47,26 +49,25 @@ public class NucleoDB{
public NucleoDB() {
}


public enum DBType{
NO_LOCAL,
READ_ONLY,
EXPORT,
ALL;
}

public NucleoDB(String bootstrap, String... packagesToScan) throws IncorrectDataEntryClassException, MissingDataEntryConstructorsException {
this(bootstrap, DBType.ALL, packagesToScan);
public NucleoDB(Consumer<ConnectionConfig> connectionCustomizer, Consumer<DataTableConfig> dataTableCustomizer, String... packagesToScan) throws IncorrectDataEntryClassException, MissingDataEntryConstructorsException, IntrospectionException, InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException {
this(DBType.ALL, connectionCustomizer, dataTableCustomizer, packagesToScan);
}

public NucleoDB(String bootstrap, DBType dbType, String... packagesToScan) throws IncorrectDataEntryClassException, MissingDataEntryConstructorsException {
this(bootstrap, dbType, null, packagesToScan);
public NucleoDB(DBType dbType, Consumer<ConnectionConfig> connectionCustomizer, Consumer<DataTableConfig> dataTableCustomizer, String... packagesToScan) throws IncorrectDataEntryClassException, MissingDataEntryConstructorsException, IntrospectionException, InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException {
this(dbType, null, connectionCustomizer, dataTableCustomizer, packagesToScan);
}
public NucleoDB(String bootstrap, DBType dbType, String upToTime, String... packagesToScan) throws IncorrectDataEntryClassException, MissingDataEntryConstructorsException {
startTables(replaceEnvVariables(bootstrap), packagesToScan, dbType, upToTime);
startConnections(replaceEnvVariables(bootstrap), packagesToScan, dbType, upToTime);
public NucleoDB(DBType dbType, String readToTime, Consumer<ConnectionConfig> connectionCustomizer, Consumer<DataTableConfig> dataTableCustomizer, String... packagesToScan) throws IncorrectDataEntryClassException, MissingDataEntryConstructorsException, IntrospectionException, InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException {
startTables(packagesToScan, dbType, readToTime, dataTableCustomizer);
startConnections(packagesToScan, dbType, readToTime, connectionCustomizer);
}
private void startConnections(String bootstrap, String[] packagesToScan, DBType dbType, String readToTime){
private void startConnections(String[] packagesToScan, DBType dbType, String readToTime, Consumer<ConnectionConfig> customizer) throws IntrospectionException, InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException {
logger.info("NucleoDB Connections Starting");
Optional<Set<Class<?>>> connectionTypesOptional = Arrays.stream(packagesToScan).map(packageToScan->new Reflections(replaceEnvVariables(packageToScan)).getTypesAnnotatedWith(Conn.class)).reduce((a, b)->{
a.addAll(b);
Expand All @@ -81,7 +82,6 @@ private void startConnections(String bootstrap, String[] packagesToScan, DBType
Conn connectionType = type.getAnnotation(Conn.class);
String topic = String.format("%ss",connectionType.value().toLowerCase());
ConnectionConfig config = new ConnectionConfig();
config.setBootstrap(bootstrap);
config.setTopic(topic);

Type[] actualTypeArguments = ((ParameterizedType) type.getGenericSuperclass()).getActualTypeArguments();
Expand All @@ -108,6 +108,7 @@ private void startConnections(String bootstrap, String[] packagesToScan, DBType
}
}
config.setLabel(connectionType.value().toUpperCase());
customizer.accept(config);
config.setStartupRun(new StartupRun(){
public void run(ConnectionHandler connectionHandler) {
latch.countDown();
Expand All @@ -134,7 +135,7 @@ public void run(ConnectionHandler connectionHandler) {
throw new RuntimeException(e);
}
}
private void startTables(String bootstrap, String[] packagesToScan, DBType dbType, String readToTime) throws IncorrectDataEntryClassException, MissingDataEntryConstructorsException {
private void startTables(String[] packagesToScan, DBType dbType, String readToTime, Consumer<DataTableConfig> customizer) throws IncorrectDataEntryClassException, MissingDataEntryConstructorsException {
logger.info("NucleoDB Tables Starting");
Optional<Set<Class<?>>> tableTypesOptional = Arrays.stream(packagesToScan).map(packageToScan->new Reflections(replaceEnvVariables(packageToScan)).getTypesAnnotatedWith(Table.class)).reduce((a, b)->{
a.addAll(b);
Expand Down Expand Up @@ -172,26 +173,26 @@ private void startTables(String bootstrap, String[] packagesToScan, DBType dbTyp
indexes.put(tableName, processIndexListForClass(type));

switch (dbType) {
case ALL -> tables.add(launchTable(bootstrap, tableName, dataEntryClass, type, new StartupRun(){
case ALL -> tables.add(launchTable(tableName, dataEntryClass, type, new StartupRun(){
public void run(DataTable table) {
latch.countDown();
}
}));
case NO_LOCAL -> tables.add(launchLocalOnlyTable(bootstrap, tableName, dataEntryClass, type, new StartupRun(){
}, customizer));
case NO_LOCAL -> tables.add(launchLocalOnlyTable(tableName, dataEntryClass, type, new StartupRun(){
public void run(DataTable table) {
latch.countDown();
}
}));
case READ_ONLY -> tables.add(launchReadOnlyTable(bootstrap, tableName, dataEntryClass, type, new StartupRun(){
}, customizer));
case READ_ONLY -> tables.add(launchReadOnlyTable(tableName, dataEntryClass, type, new StartupRun(){
public void run(DataTable table) {
latch.countDown();
}
}));
case EXPORT -> tables.add(launchExportOnlyTable(bootstrap, tableName, dataEntryClass, type, new StartupRun(){
}, customizer));
case EXPORT -> tables.add(launchExportOnlyTable(tableName, dataEntryClass, type, new StartupRun(){
public void run(DataTable table) {
latch.countDown();
}
}));
}, customizer));
}
}
tables.stream().forEach(table -> {
Expand All @@ -203,7 +204,19 @@ public void run(DataTable table) {
}
}
table.addIndexes(indexes.get(table.getConfig().getTable()));
table.build();
try {
table.build();
} catch (IntrospectionException e) {
throw new RuntimeException(e);
} catch (InvocationTargetException e) {
throw new RuntimeException(e);
} catch (NoSuchMethodException e) {
throw new RuntimeException(e);
} catch (InstantiationException e) {
throw new RuntimeException(e);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
});
try {

Expand Down Expand Up @@ -317,43 +330,63 @@ public DataTable getTable(Class clazz) {
return null;
}

public DataTableBuilder launchTable(String bootstrap, String table, Class dataEntryClass, Class clazz) {
return DataTableBuilder.create(bootstrap, table, clazz).setDataEntryClass(dataEntryClass).setDb(this);
public DataTableBuilder launchTable(String table, Class dataEntryClass, Class clazz, Consumer<DataTableConfig> customizer) {
DataTableBuilder dataTableBuilder = DataTableBuilder.create(table, clazz).setDataEntryClass(dataEntryClass).setDb(this);
customizer.accept(dataTableBuilder.getConfig());
return dataTableBuilder;
}

public DataTableBuilder launchTable(String bootstrap, String table, Class dataEntryClass, Class clazz, StartupRun runnable) {
return DataTableBuilder.create(bootstrap, table, clazz).setDataEntryClass(dataEntryClass).setDb(this).setStartupRun(runnable);
public DataTableBuilder launchTable(String table, Class dataEntryClass, Class clazz, StartupRun runnable, Consumer<DataTableConfig> customizer) {
DataTableBuilder dataTableBuilder = DataTableBuilder.create(table, clazz).setDataEntryClass(dataEntryClass).setDb(this).setStartupRun(runnable);
customizer.accept(dataTableBuilder.getConfig());
return dataTableBuilder;
}

public DataTableBuilder launchReadOnlyTable(String bootstrap, String table, Class dataEntryClass, Class clazz) {
return DataTableBuilder.createReadOnly(bootstrap, table, clazz).setDataEntryClass(dataEntryClass).setDb(this);
public DataTableBuilder launchReadOnlyTable(String table, Class dataEntryClass, Class clazz, Consumer<DataTableConfig> customizer) {
DataTableBuilder dataTableBuilder = DataTableBuilder.createReadOnly(table, clazz).setDataEntryClass(dataEntryClass).setDb(this);
customizer.accept(dataTableBuilder.getConfig());
return dataTableBuilder;
}

public DataTableBuilder launchReadOnlyTable(String bootstrap, String table, Class dataEntryClass, Class clazz, StartupRun startupRun) {
return DataTableBuilder.createReadOnly(bootstrap, table, clazz).setDataEntryClass(dataEntryClass).setDb(this).setStartupRun(startupRun);
public DataTableBuilder launchReadOnlyTable(String table, Class dataEntryClass, Class clazz, StartupRun startupRun, Consumer<DataTableConfig> customizer) {
DataTableBuilder dataTableBuilder = DataTableBuilder.createReadOnly(table, clazz).setDataEntryClass(dataEntryClass).setDb(this).setStartupRun(startupRun);
customizer.accept(dataTableBuilder.getConfig());
return dataTableBuilder;
}

public DataTableBuilder launchLocalOnlyTable(String bootstrap, String table, Class dataEntryClass, Class clazz) {
return DataTableBuilder.create(bootstrap, table, clazz).setDataEntryClass(dataEntryClass).setLoadSave(false).setSaveChanges(false).setDb(this);
public DataTableBuilder launchLocalOnlyTable(String table, Class dataEntryClass, Class clazz, Consumer<DataTableConfig> customizer) {
DataTableBuilder dataTableBuilder = DataTableBuilder.create(table, clazz).setDataEntryClass(dataEntryClass).setLoadSave(false).setSaveChanges(false).setDb(this);
customizer.accept(dataTableBuilder.getConfig());
return dataTableBuilder;
}

public DataTableBuilder launchLocalOnlyTable(String bootstrap, String table, Class dataEntryClass, Class clazz, StartupRun startupRun) {
return DataTableBuilder.create(bootstrap, table, clazz).setDataEntryClass(dataEntryClass).setLoadSave(false).setSaveChanges(false).setDb(this).setStartupRun(startupRun);
public DataTableBuilder launchLocalOnlyTable(String table, Class dataEntryClass, Class clazz, StartupRun startupRun, Consumer<DataTableConfig> customizer) {
DataTableBuilder dataTableBuilder = DataTableBuilder.create(table, clazz).setDataEntryClass(dataEntryClass).setLoadSave(false).setSaveChanges(false).setDb(this).setStartupRun(startupRun);
customizer.accept(dataTableBuilder.getConfig());
return dataTableBuilder;
}

public DataTableBuilder launchExportOnlyTable(String bootstrap, String table, Class dataEntryClass, Class clazz) {
return DataTableBuilder.create(bootstrap, table, clazz).setDataEntryClass(dataEntryClass).setJSONExport(true).setDb(this);
public DataTableBuilder launchExportOnlyTable(String table, Class dataEntryClass, Class clazz, Consumer<DataTableConfig> customizer) {
DataTableBuilder dataTableBuilder = DataTableBuilder.create(table, clazz).setDataEntryClass(dataEntryClass).setJSONExport(true).setDb(this);
customizer.accept(dataTableBuilder.getConfig());
return dataTableBuilder;
}
public DataTableBuilder launchExportOnlyTable(String bootstrap, String table, Class dataEntryClass, Class clazz, StartupRun startupRun) {
return DataTableBuilder.create(bootstrap, table, clazz).setDataEntryClass(dataEntryClass).setJSONExport(true).setDb(this).setStartupRun(startupRun);
public DataTableBuilder launchExportOnlyTable(String table, Class dataEntryClass, Class clazz, StartupRun startupRun, Consumer<DataTableConfig> customizer) {
DataTableBuilder dataTableBuilder = DataTableBuilder.create(table, clazz).setDataEntryClass(dataEntryClass).setJSONExport(true).setDb(this).setStartupRun(startupRun);
customizer.accept(dataTableBuilder.getConfig());
return dataTableBuilder;
}

public DataTableBuilder launchWriteOnlyTable(String bootstrap, String table, Class dataEntryClass, Class clazz) {
return DataTableBuilder.createWriteOnly(bootstrap, table, clazz).setDataEntryClass(dataEntryClass).setLoadSave(false).setDb(this);
public DataTableBuilder launchWriteOnlyTable(String table, Class dataEntryClass, Class clazz, Consumer<DataTableConfig> customizer) {
DataTableBuilder dataTableBuilder = DataTableBuilder.createWriteOnly(table, clazz).setDataEntryClass(dataEntryClass).setLoadSave(false).setDb(this);
customizer.accept(dataTableBuilder.getConfig());
return dataTableBuilder;
}

public DataTableBuilder launchWriteOnlyTable(String bootstrap, String table, Class dataEntryClass, Class clazz, StartupRun startupRun) {
return DataTableBuilder.createWriteOnly(bootstrap, table, clazz).setDataEntryClass(dataEntryClass).setLoadSave(false).setDb(this).setStartupRun(startupRun);
public DataTableBuilder launchWriteOnlyTable(String table, Class dataEntryClass, Class clazz, StartupRun startupRun, Consumer<DataTableConfig> customizer) {
DataTableBuilder dataTableBuilder = DataTableBuilder.createWriteOnly(table, clazz).setDataEntryClass(dataEntryClass).setLoadSave(false).setDb(this).setStartupRun(startupRun);
customizer.accept(dataTableBuilder.getConfig());
return dataTableBuilder;
}

public ConnectionHandler getConnectionHandler(Class clazz) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@
@Target(ElementType.TYPE)
public @interface Table {
String tableName();
Class dataEntryClass() default DataEntry.class;
Class<? extends DataEntry> dataEntryClass() default DataEntry.class;
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.nucleodb.library.database.utils.StartupRun;
import com.nucleodb.library.mqs.config.MQSConfiguration;
import com.nucleodb.library.mqs.kafka.KafkaConfiguration;

import java.io.Serializable;
import java.time.Instant;
import java.util.Map;
import java.util.TreeMap;

public class ConnectionConfig implements Serializable{
private static final long serialVersionUID = 1;
Expand All @@ -16,11 +20,15 @@ public class ConnectionConfig implements Serializable{
boolean saveChanges = true;
String topic;
String label;
String bootstrap = "127.0.0.1:19092";

Class connectionClass;
Class toTable;
Class fromTable;

MQSConfiguration mqsConfiguration = new KafkaConfiguration();

Map<String, Object> settingsMap = new TreeMap<>();

@JsonIgnore
private transient StartupRun startupRun = null;

Expand All @@ -43,14 +51,6 @@ public void setWrite(boolean write) {
this.write = write;
}

public String getBootstrap() {
return bootstrap;
}

public void setBootstrap(String bootstrap) {
this.bootstrap = bootstrap;
}

public StartupRun getStartupRun() {
return startupRun;
}
Expand Down Expand Up @@ -116,11 +116,13 @@ public void setFromTable(Class fromTable) {
}

public String getTopic() {

return topic;
}

public void setTopic(String topic) {
this.topic = topic;
this.settingsMap.put("table", topic);
}

public String getLabel() {
Expand All @@ -130,4 +132,20 @@ public String getLabel() {
public void setLabel(String label) {
this.label = label;
}

public MQSConfiguration getMqsConfiguration() {
return mqsConfiguration;
}

public void setMqsConfiguration(MQSConfiguration mqsConfiguration) {
this.mqsConfiguration = mqsConfiguration;
}

public Map<String, Object> getSettingsMap() {
return settingsMap;
}

public void setSettingsMap(Map<String, Object> settingsMap) {
this.settingsMap = settingsMap;
}
}
Loading

0 comments on commit 9a64a2a

Please sign in to comment.