Commit 23e33fb: add working export, save resume
firestar committed Jun 24, 2024
1 parent f6f7dd3
Showing 18 changed files with 734 additions and 315 deletions.
47 changes: 42 additions & 5 deletions build.gradle
@@ -1,12 +1,13 @@
 plugins {
     id 'java'
+    id 'jvm-test-suite'
     id 'java-library'
     id 'maven-publish'
     id 'jacoco'
 }

 group = 'com.nucleodb'
-version = '1.16.1'
+version = '1.17.0'

 repositories {
     mavenCentral()
@@ -53,10 +54,11 @@ dependencies {
     api group: 'com.fasterxml.jackson.datatype', name:'jackson-datatype-jsr310', version:'2.15.2'
     api 'com.github.jsqlparser:jsqlparser:4.6'

-    dependencies {
-        testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.1'
-        testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.1'
-    }
+    testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.1'
+    testImplementation 'org.junit.jupiter:junit-jupiter-engine:5.8.1'
+    testImplementation "org.mockito:mockito-core:5.12.0"
+    testImplementation 'io.cucumber:cucumber-java:7.18.0'
+
 }

task sourceJar(type: Jar) {
@@ -68,6 +70,41 @@ task javadocJar(type: Jar) {
     archiveClassifier.set('javadoc')
 }

+testing {
+    suites {
+        test {
+            sources {
+                java {
+                    srcDirs = ['src/test/java']
+                }
+            }
+            useJUnitJupiter()
+        }
+        integrationTest(JvmTestSuite) {
+            sources {
+                java {
+                    srcDirs = ['src/integrationTest/java']
+                }
+            }
+            dependencies {
+                implementation project()
+            }
+            targets {
+                all {
+                    testTask.configure {
+                        shouldRunAfter(test)
+                    }
+                }
+            }
+        }
+    }
+}
+
+
+tasks.named("check") {
+    dependsOn(testing.suites.integrationTest)
+}
+
 publishing {
     publications {
         mavenJava(MavenPublication) {
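A note on the testing block above: the jvm-test-suite plugin turns the integrationTest suite into a Gradle task of the same name, and the tasks.named("check") wiring makes it part of normal verification, with shouldRunAfter(test) keeping the unit suite first. Assuming stock task names, ./gradlew integrationTest runs the new suite alone, and ./gradlew check now runs both.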
5 changes: 5 additions & 0 deletions docker/kafka/docker-compose.yml
@@ -5,6 +5,7 @@ networks:
     driver: bridge
 services:
   zookeeper:
+    restart: always
     image: docker.io/bitnami/zookeeper:3.7
     ports:
       - "2181:2181"
@@ -13,6 +14,7 @@ services:
     networks:
       - nucleodb-network
   ui:
+    restart: always
     networks:
       - nucleodb-network
     image: provectuslabs/kafka-ui
@@ -28,6 +30,7 @@
       - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka1:9092
       - DYNAMIC_CONFIG_ENABLED='true'
   kafka1:
+    restart: always
     image: bitnami/kafka:latest
     ports:
       - 29092:29092
@@ -43,6 +46,7 @@
     networks:
       - nucleodb-network
   kafka2:
+    restart: always
     image: bitnami/kafka:latest
     ports:
       - 29093:29093
@@ -59,6 +63,7 @@
       - nucleodb-network
   kafka3:
     image: bitnami/kafka:latest
+    restart: always
     ports:
       - 29094:29094
     environment:
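For context on the compose change: restart: always makes Docker bring a container back up whenever it exits or the daemon restarts, so ZooKeeper, all three Kafka brokers, and the UI recover without manual intervention; presumably this keeps the cluster available for the Kafka-backed integration tests added in this commit.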
106 changes: 106 additions & 0 deletions src/integrationTest/java/com/nucleodb/library/ExportTest.java
@@ -0,0 +1,106 @@
package com.nucleodb.library;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.nucleodb.library.database.tables.table.DataEntry;
import com.nucleodb.library.database.tables.table.DataEntryProjection;
import com.nucleodb.library.database.tables.table.DataTable;
import com.nucleodb.library.database.utils.Serializer;
import com.nucleodb.library.database.utils.exceptions.IncorrectDataEntryClassException;
import com.nucleodb.library.database.utils.exceptions.IncorrectDataEntryObjectException;
import com.nucleodb.library.database.utils.exceptions.MissingDataEntryConstructorsException;
import com.nucleodb.library.models.Author;
import com.nucleodb.library.models.AuthorDE;
import com.nucleodb.library.mqs.kafka.KafkaConfiguration;
import com.nucleodb.library.mqs.local.LocalConfiguration;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.beans.IntrospectionException;
import java.lang.reflect.InvocationTargetException;
import java.util.Set;
import java.util.stream.Collectors;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

class ExportTest {
NucleoDB nucleoDB;
DataTable<AuthorDE> table;

@BeforeEach
public void createLocalDB() throws IncorrectDataEntryClassException, MissingDataEntryConstructorsException, IntrospectionException, InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException, IncorrectDataEntryObjectException, InterruptedException {
nucleoDB = new NucleoDB(
NucleoDB.DBType.ALL,
c -> {
c.getConnectionConfig().setMqsConfiguration(new KafkaConfiguration());

c.getConnectionConfig().setLoadSaved(true);
c.getConnectionConfig().setJsonExport(true);
c.getConnectionConfig().setSaveChanges(true);
c.getConnectionConfig().setConnectionFileName("./data/connection.dat");
c.getConnectionConfig().setExportInterval(50);
c.getConnectionConfig().setSaveInterval(50);
},
c -> {
c.getDataTableConfig().setMqsConfiguration(new KafkaConfiguration());
c.getDataTableConfig().setLoadSave(true);
c.getDataTableConfig().setSaveChanges(true);
c.getDataTableConfig().setJsonExport(true);
c.getDataTableConfig().setTableFileName("./data/datatable.dat");
c.getDataTableConfig().setExportInterval(50);
c.getDataTableConfig().setSaveInterval(50);
},
c -> {
c.setMqsConfiguration(new KafkaConfiguration());
},
"com.nucleodb.library.models"
);
table = nucleoDB.getTable(Author.class);
System.out.println("STARTED");
table.saveSync(new AuthorDE(new Author("George Orwell", "science-fiction")));
}

public AuthorDE copy(AuthorDE authorDE) throws JsonProcessingException {
return Serializer.getObjectMapper().getOm().readValue(Serializer.getObjectMapper().getOm().writeValueAsString(authorDE), AuthorDE.class);
}
@AfterEach
public void deleteEntries() {
System.out.println("DONE");
table.getEntries().stream().map(author -> {
try {
return copy(author);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}).collect(Collectors.toSet()).forEach(author-> {
try {
table.deleteSync(author);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

@Test
public void checkSaving() throws IncorrectDataEntryObjectException, InterruptedException {
AuthorDE edgarAllenPoe = new AuthorDE(new Author("Edgar Allen Poe", "fiction"));
table.saveSync(edgarAllenPoe);
assertEquals(
1,
table.get(
"id",
edgarAllenPoe.getKey(),
null
).size()
);
Thread.sleep(5000);
}

}
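ExportTest imports LocalConfiguration but wires every component to Kafka, so it depends on the compose cluster above. A minimal sketch of the same flow on the in-memory queue, assuming LocalConfiguration is a drop-in MQSConfiguration replacement for KafkaConfiguration; the test name and the Mary Shelley entry are illustrative, not part of this commit:

@Test
public void savesWithLocalQueue() throws Exception { // hypothetical test
    // Same constructor shape as createLocalDB(), but each component uses the in-memory MQS.
    NucleoDB localDB = new NucleoDB(
        NucleoDB.DBType.ALL,
        c -> c.getConnectionConfig().setMqsConfiguration(new LocalConfiguration()),
        c -> c.getDataTableConfig().setMqsConfiguration(new LocalConfiguration()),
        c -> c.setMqsConfiguration(new LocalConfiguration()),
        "com.nucleodb.library.models"
    );
    DataTable<AuthorDE> authors = localDB.getTable(Author.class);
    AuthorDE maryShelley = new AuthorDE(new Author("Mary Shelley", "gothic")); // illustrative entry
    authors.saveSync(maryShelley);
    // Lookup by primary key, mirroring the "id" query used in checkSaving().
    assertEquals(1, authors.get("id", maryShelley.getKey(), null).size());
}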
42 changes: 42 additions & 0 deletions src/integrationTest/java/com/nucleodb/library/models/Author.java
@@ -0,0 +1,42 @@
package com.nucleodb.library.models;

import com.nucleodb.library.database.index.TrieIndex;
import com.nucleodb.library.database.index.annotation.Index;
import com.nucleodb.library.database.tables.annotation.Table;

import java.io.Serializable;
import java.util.UUID;

@Table(tableName = "authorIT", dataEntryClass = AuthorDE.class)
public class Author implements Serializable {
private static final long serialVersionUID = 1;
@Index(type = TrieIndex.class)
String name;

@Index
String areaOfInterest;

public Author() {
}

public Author(String name, String areaOfInterest) {
this.name = name;
this.areaOfInterest = areaOfInterest;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public String getAreaOfInterest() {
return areaOfInterest;
}

public void setAreaOfInterest(String areaOfInterest) {
this.areaOfInterest = areaOfInterest;
}
}
22 changes: 22 additions & 0 deletions src/integrationTest/java/com/nucleodb/library/models/AuthorDE.java
@@ -0,0 +1,22 @@
package com.nucleodb.library.models;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.nucleodb.library.database.modifications.Create;
import com.nucleodb.library.database.tables.table.DataEntry;

public class AuthorDE extends DataEntry<Author>{
public AuthorDE(Author obj) {
super(obj);
}

public AuthorDE(Create create) throws ClassNotFoundException, JsonProcessingException {
super(create);
}

public AuthorDE() {
}

public AuthorDE(String key) {
super(key);
}
}
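Author and AuthorDE together show the pattern a new table needs: a Serializable model whose @Table annotation names its DataEntry subclass, and a subclass that passes through the four DataEntry constructors. A sketch of the same pairing for a hypothetical Book table; every name below is illustrative, not part of this commit:

package com.nucleodb.library.models;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.nucleodb.library.database.index.TrieIndex;
import com.nucleodb.library.database.index.annotation.Index;
import com.nucleodb.library.database.modifications.Create;
import com.nucleodb.library.database.tables.annotation.Table;
import com.nucleodb.library.database.tables.table.DataEntry;

import java.io.Serializable;

// Hypothetical model following the Author pattern above.
@Table(tableName = "bookIT", dataEntryClass = BookDE.class)
class Book implements Serializable {
    private static final long serialVersionUID = 1;

    @Index(type = TrieIndex.class)
    String title;   // prefix-searchable via the trie index

    @Index
    String genre;   // default index type

    public Book() {
    }

    public Book(String title, String genre) {
        this.title = title;
        this.genre = genre;
    }

    public String getTitle() { return title; }
    public void setTitle(String title) { this.title = title; }
    public String getGenre() { return genre; }
    public void setGenre(String genre) { this.genre = genre; }
}

// The matching entry class mirrors AuthorDE's four pass-through constructors.
class BookDE extends DataEntry<Book> {
    public BookDE(Book obj) { super(obj); }
    public BookDE(Create create) throws ClassNotFoundException, JsonProcessingException { super(create); }
    public BookDE() {}
    public BookDE(String key) { super(key); }
}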
src/main/java/com/nucleodb/library/database/tables/connection/ConnectionConfig.java
@@ -1,10 +1,8 @@
 package com.nucleodb.library.database.tables.connection;

 import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.nucleodb.library.database.tables.table.DataEntry;
 import com.nucleodb.library.database.utils.StartupRun;
 import com.nucleodb.library.event.ConnectionEventListener;
-import com.nucleodb.library.event.DataTableEventListener;
 import com.nucleodb.library.mqs.config.MQSConfiguration;
 import com.nucleodb.library.mqs.kafka.KafkaConfiguration;
@@ -15,25 +13,24 @@

public class ConnectionConfig implements Serializable{
private static final long serialVersionUID = 1;

Instant readToTime = null;
boolean write = true;
boolean read = true;
boolean loadSaved = true;
boolean jsonExport = false;
long exportInterval = 5000;
boolean saveChanges = true;
long saveInterval = 5000;
String topic;
String label;

Class connectionClass;
Class toTable;
Class fromTable;

ConnectionEventListener<Connection> eventListener = null;

MQSConfiguration mqsConfiguration = new KafkaConfiguration();

Map<String, Object> settingsMap = new TreeMap<>();

String connectionFileName;
@JsonIgnore
private transient StartupRun startupRun = null;

@@ -161,4 +158,28 @@ public ConnectionEventListener<Connection> getEventListener() {
   public void setEventListener(ConnectionEventListener<Connection> eventListener) {
     this.eventListener = eventListener;
   }
+
+  public long getSaveInterval() {
+    return saveInterval;
+  }
+
+  public void setSaveInterval(long saveInterval) {
+    this.saveInterval = saveInterval;
+  }
+
+  public long getExportInterval() {
+    return exportInterval;
+  }
+
+  public void setExportInterval(long exportInterval) {
+    this.exportInterval = exportInterval;
+  }
+
+  public String getConnectionFileName() {
+    return connectionFileName;
+  }
+
+  public void setConnectionFileName(String connectionFileName) {
+    this.connectionFileName = connectionFileName;
+  }
 }
src/main/java/com/nucleodb/library/database/tables/connection/ConnectionHandler.java
Expand Up @@ -79,6 +79,8 @@ public class ConnectionHandler implements Serializable{
private String consumerId = UUID.randomUUID().toString();
private Set<Connection> allConnections = new TreeSetExt<>();
@JsonIgnore
private transient ExportHandler exportHandler;
@JsonIgnore
private transient NucleoDB nucleoDB;
private ConnectionConfig config;
@JsonIgnore
@@ -128,7 +130,8 @@ public ConnectionHandler(NucleoDB nucleoDB, ConnectionConfig config) throws Intr
       new Thread(new SaveHandler(this)).start();
     }
     if (config.isJsonExport()) {
-      new Thread(new ExportHandler(this)).start();
+      exportHandler = new ExportHandler(this);
+      new Thread(exportHandler).start();
     }
   }

@@ -152,7 +155,8 @@ public ConnectionHandler(NucleoDB nucleoDB, String bootstrap) throws Introspecti
     }

     if (config.isJsonExport()) {
-      new Thread(new ExportHandler(this)).start();
+      exportHandler = new ExportHandler(this);
+      new Thread(exportHandler).start();
     }
   }

@@ -799,4 +803,12 @@ public AtomicBoolean getStartupPhase() {
   public void setStartupPhase(AtomicBoolean startupPhase) {
     this.startupPhase = startupPhase;
   }
+
+  public ExportHandler getExportHandler() {
+    return exportHandler;
+  }
+
+  public void setExportHandler(ExportHandler exportHandler) {
+    this.exportHandler = exportHandler;
+  }
 }