Skip to content

Commit

Permalink
add reload and reload consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
firestar committed Oct 2, 2024
1 parent 877fc01 commit 8b42db8
Show file tree
Hide file tree
Showing 13 changed files with 345 additions and 220 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ plugins {
}

group = 'com.nucleodb'
version = '1.19.1'
version = '1.19.2'

repositories {
mavenCentral()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,7 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -377,6 +375,17 @@ public boolean saveSync(C connection) throws InvalidConnectionException, Interru
return true;
}

@JsonIgnore
private transient ExecutorService reloadExecutor = Executors.newSingleThreadExecutor();
@JsonIgnore
private transient Future<?> runningReload = null;

public boolean reload(Consumer reloadComplete) {
if(runningReload!=null && !runningReload.isDone()) return false;
runningReload = reloadExecutor.submit(this.consumer.reload(reloadComplete));
return true;
}


private boolean deleteInternalConsumer(C connection, Consumer<C> consumer) throws IOException {
String changeUUID = UUID.randomUUID().toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.github.fge.jsonpatch.diff.JsonDiff;
import com.google.common.cache.Cache;
Expand Down Expand Up @@ -31,17 +30,13 @@
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class DataTable<T extends DataEntry> implements Serializable{
private static final long serialVersionUID = 1;
Expand All @@ -66,12 +61,13 @@ public class DataTable<T extends DataEntry> implements Serializable{
private transient Queue<Object[]> indexQueue = Queues.newLinkedBlockingQueue();
@JsonIgnore
private transient ProducerHandler producer = null;

@JsonIgnore
private transient ConsumerHandler consumer = null;
private transient AtomicInteger startupLoadCount = new AtomicInteger(0);
private transient AtomicBoolean startupPhase = new AtomicBoolean(true);

private transient Cache<String, Consumer<T>> consumers = CacheBuilder.newBuilder()
private transient Cache<String, Consumer<T>> changeListeners = CacheBuilder.newBuilder()
.maximumSize(10000)
.softValues()
.expireAfterWrite(5, TimeUnit.SECONDS)
Expand Down Expand Up @@ -165,7 +161,7 @@ public DataTable(DataTableConfig config) throws IntrospectionException, Invocati
addAll(Arrays.asList(config.getClazz().getSuperclass().getFields()));
}};

consume();
startRootConsumer();

if (config.isRead()) {
new Thread(new ModQueueHandler(this)).start();
Expand All @@ -180,7 +176,7 @@ public DataTable(DataTableConfig config) throws IntrospectionException, Invocati
}
}

public void consume() throws IntrospectionException, InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException {
public void startRootConsumer() throws IntrospectionException, InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException {
this.consumer = this.getConfig()
.getMqsConfiguration()
.createConsumerHandler(this.config.getSettingsMap());
Expand All @@ -194,6 +190,16 @@ public void consume() throws IntrospectionException, InvocationTargetException,
}
}

@JsonIgnore
private transient ExecutorService reloadExecutor = Executors.newSingleThreadExecutor();
@JsonIgnore
private transient Future<?> runningReload = null;
public boolean reload(Consumer reloadComplete) {
if(runningReload!=null && !runningReload.isDone()) return false;
runningReload = reloadExecutor.submit(this.consumer.reload(reloadComplete));
return true;
}

public void exportTo(DataTable tb) throws IncorrectDataEntryObjectException {
for (T de : this.entries) {
//Serializer.log("INSERTING " + de.getKey());
Expand All @@ -211,7 +217,7 @@ public void flush() {
} catch (Exception e) {
//e.printStackTrace();
}
consumers.cleanUp();
changeListeners.cleanUp();
listeners = new HashMap<>();
System.gc();
}
Expand Down Expand Up @@ -391,7 +397,7 @@ private boolean deleteInternalConsumer(T entry, Consumer<T> consumer) {
entry.versionIncrease();
Delete delete = new Delete(changeUUID, entry);
if (consumer != null) {
consumers.put(changeUUID, consumer);
changeListeners.put(changeUUID, consumer);
}
producer.push(delete.getKey(), delete.getVersion(), delete, null);
return true;
Expand Down Expand Up @@ -443,7 +449,7 @@ private boolean saveInternalConsumer(T entry, Consumer<T> consumer) {
}
String changeUUID = UUID.randomUUID().toString();
if (consumer != null) {
consumers.put(changeUUID, consumer);
changeListeners.put(changeUUID, consumer);
}
if (saveInternal(entry, changeUUID)) {
return true;
Expand Down Expand Up @@ -784,10 +790,10 @@ private void consumerResponse(T T, String changeUUID) throws ExecutionException
}
if (changeUUID != null) {

Consumer<T> TConsumer = consumers.getIfPresent(changeUUID);
Consumer<T> TConsumer = changeListeners.getIfPresent(changeUUID);
if (TConsumer != null) {
new Thread(() -> TConsumer.accept(T)).start();
consumers.invalidate(changeUUID);
changeListeners.invalidate(changeUUID);
}
}
} catch (CacheLoader.InvalidCacheLoadException e) {
Expand Down Expand Up @@ -903,12 +909,12 @@ public void setConsumer(ConsumerHandler consumer) {
this.consumer = consumer;
}

public Cache<String, Consumer<T>> getConsumers() {
return consumers;
public Cache<String, Consumer<T>> getChangeListeners() {
return changeListeners;
}

public void setConsumers(Cache<String, Consumer<T>> consumers) {
this.consumers = consumers;
public void setChangeListeners(Cache<String, Consumer<T>> changeListeners) {
this.changeListeners = changeListeners;
}

public Map<Modification, Set<Consumer<T>>> getListeners() {
Expand Down
49 changes: 26 additions & 23 deletions src/main/java/com/nucleodb/library/mqs/ConsumerHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@

import com.google.common.collect.Queues;
import com.nucleodb.library.database.lock.LockManager;
import com.nucleodb.library.database.modifications.Modify;
import com.nucleodb.library.database.tables.connection.ConnectionHandler;
import com.nucleodb.library.database.tables.table.DataTable;
import com.nucleodb.library.mqs.config.MQSSettings;
import com.nucleodb.library.mqs.exceptions.RequiredMethodNotImplementedException;
import org.apache.kafka.clients.consumer.Consumer;

import java.util.*;
import java.util.concurrent.ExecutorService;
Expand All @@ -24,41 +26,35 @@ public class ConsumerHandler implements Runnable{
private ConnectionHandler connectionHandler = null;

private LockManager lockManager = null;
private String table;
private String topic;
int startupItems = -1;
private Queue<String> queue = Queues.newConcurrentLinkedQueue();
private transient AtomicInteger leftToRead = new AtomicInteger(0);
private AtomicInteger startupLoadCount = new AtomicInteger(0);
private AtomicBoolean startupPhaseConsume = new AtomicBoolean(true);
boolean reloadConsumer = false;


private MQSSettings settings;

private ExecutorService queueTasks = Executors.newFixedThreadPool(60);


public ConsumerHandler(MQSSettings settings, String table) {
this.table = table;
public ConsumerHandler(MQSSettings settings) {
this.settings = settings;
}
public ConsumerHandler reload(java.util.function.Consumer completeCallback) {
ConsumerHandler reloadConsumerHandler = new ConsumerHandler(settings);
reloadConsumerHandler.setReloadConsumer(true);
return reloadConsumerHandler;
}
public void start(int queues){
for (int x = 0; x < queues; x++) {
Thread queueThread = new Thread(new QueueHandler(this));
queueTasks.submit(queueThread);
}
}

public void readFromStart() throws InterruptedException {
startupLoadCount.set(0);
queue.clear();
startupPhaseConsume.set(true);
leftToRead.set(0);
startupItems = -1;
queueTasks.shutdownNow();
queueTasks.awaitTermination(4, TimeUnit.SECONDS);
queueTasks = Executors.newFixedThreadPool(60);
}

@Override
public void run() {
try {
Expand Down Expand Up @@ -92,15 +88,6 @@ public ConnectionHandler getConnectionHandler() {
public void setConnectionHandler(ConnectionHandler connectionHandler) {
this.connectionHandler = connectionHandler;
}

public String getTable() {
return table;
}

public void setTable(String table) {
this.table = table;
}

public int getStartupItems() {
return startupItems;
}
Expand Down Expand Up @@ -153,7 +140,23 @@ public LockManager getLockManager() {
return lockManager;
}

public String getTopic() {
return topic;
}

public void setTopic(String topic) {
this.topic = topic;
}

public void setLockManager(LockManager lockManager) {
this.lockManager = lockManager;
}

public boolean isReloadConsumer() {
return reloadConsumer;
}

public void setReloadConsumer(boolean reloadConsumer) {
this.reloadConsumer = reloadConsumer;
}
}
13 changes: 6 additions & 7 deletions src/main/java/com/nucleodb/library/mqs/ProducerHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@
import org.apache.kafka.clients.producer.Callback;

public class ProducerHandler {
private String table;
private String topic;
private MQSSettings settings;

public ProducerHandler(MQSSettings settings, String table) {
this.table = table;
public ProducerHandler(MQSSettings settings) {
this.settings = settings;
}

Expand All @@ -33,12 +32,12 @@ public void push(String key, String message){
System.exit(1);
}

public String getTable() {
return table;
public String getTopic() {
return topic;
}

public void setTable(String table) {
this.table = table;
public void setTopic(String topic) {
this.topic = topic;
}

public MQSSettings getSettings() {
Expand Down
59 changes: 33 additions & 26 deletions src/main/java/com/nucleodb/library/mqs/QueueHandler.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package com.nucleodb.library.mqs;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.nucleodb.library.database.lock.LockReference;
import com.nucleodb.library.database.modifications.Modification;
import com.nucleodb.library.database.modifications.Modify;
import com.nucleodb.library.database.utils.Serializer;
import com.nucleodb.library.mqs.kafka.KafkaConsumerHandler;

import java.io.Serial;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ExecutionException;
import java.util.logging.Logger;

class QueueHandler implements Runnable{
Expand All @@ -30,30 +29,9 @@ public void run() {
this.consumerHandler.getLeftToRead().decrementAndGet();
try {
if (databaseType) {
String type = entry.substring(0, 6);
String data = entry.substring(6);
Modification mod = Modification.get(type);
if (mod != null) {
if (this.consumerHandler.getDatabase()!=null && this.consumerHandler.getDatabase().getConfig() != null && this.consumerHandler.getDatabase().getConfig().isJsonExport()) {
this.consumerHandler.getDatabase().getExportHandler().getModifications().add(entry);
}
this.consumerHandler.getDatabase().modify(mod, Serializer.getObjectMapper().getOm().readValue(data, mod.getModification()));
}
dataTableType(entry);
} else if (connectionType) {
String type = entry.substring(0, 16);
String data = entry.substring(16);
Modification mod = Modification.get(type);
if (mod != null) {
try {
Modify modifiedEntry = (Modify) Serializer.getObjectMapper().getOm().readValue(data, mod.getModification());
if (this.consumerHandler.getConnectionHandler()!=null && this.consumerHandler.getConnectionHandler().getConfig() != null && this.consumerHandler.getConnectionHandler().getConfig().isJsonExport()) {
this.consumerHandler.getConnectionHandler().getExportHandler().getModifications().add(entry);
}
this.consumerHandler.getConnectionHandler().modify(mod, modifiedEntry);
} catch (Exception e) {
e.printStackTrace();
}
}
connectionType(entry);
}else if(lockdownType){
logger.info("processing lockdown");
this.consumerHandler.getLockManager().lockAction(
Expand Down Expand Up @@ -83,4 +61,33 @@ public void run() {
}
}
}

private void dataTableType(String entry) throws ExecutionException, JsonProcessingException {
String type = entry.substring(0, 6);
String data = entry.substring(6);
Modification mod = Modification.get(type);
if (mod != null) {
if (this.consumerHandler.getDatabase()!=null && this.consumerHandler.getDatabase().getConfig() != null && this.consumerHandler.getDatabase().getConfig().isJsonExport()) {
this.consumerHandler.getDatabase().getExportHandler().getModifications().add(entry);
}
this.consumerHandler.getDatabase().modify(mod, Serializer.getObjectMapper().getOm().readValue(data, mod.getModification()));
}
}

private void connectionType(String entry) {
String type = entry.substring(0, 16);
String data = entry.substring(16);
Modification mod = Modification.get(type);
if (mod != null) {
try {
Modify modifiedEntry = (Modify) Serializer.getObjectMapper().getOm().readValue(data, mod.getModification());
if (this.consumerHandler.getConnectionHandler()!=null && this.consumerHandler.getConnectionHandler().getConfig() != null && this.consumerHandler.getConnectionHandler().getConfig().isJsonExport()) {
this.consumerHandler.getConnectionHandler().getExportHandler().getModifications().add(entry);
}
this.consumerHandler.getConnectionHandler().modify(mod, modifiedEntry);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
Loading

0 comments on commit 8b42db8

Please sign in to comment.