From 8b42db87e0d99501ca0830ab54ba05fb47266212 Mon Sep 17 00:00:00 2001
From: Nathaniel Davidson
Date: Wed, 2 Oct 2024 00:21:39 -0700
Subject: [PATCH] add reload() API and a dedicated reload consumer

---
 build.gradle                                       |   2 +-
 .../tables/connection/ConnectionHandler.java       |  15 +-
 .../database/tables/table/DataTable.java           |  42 ++-
 .../nucleodb/library/mqs/ConsumerHandler.java      |  49 +--
 .../nucleodb/library/mqs/ProducerHandler.java      |  13 +-
 .../nucleodb/library/mqs/QueueHandler.java         |  59 +--
 .../library/mqs/config/MQSSettings.java            |   8 +-
 .../library/mqs/kafka/KafkaConfiguration.java      |   8 +-
 .../mqs/kafka/KafkaConsumerHandler.java            | 335 ++++++++++++------
 .../mqs/kafka/KafkaProducerHandler.java            |  10 +-
 .../library/mqs/local/LocalConfiguration.java      |   8 +-
 .../mqs/local/LocalConsumerHandler.java            |  10 +-
 .../mqs/local/LocalProducerHandler.java            |   6 +-
 13 files changed, 345 insertions(+), 220 deletions(-)

diff --git a/build.gradle b/build.gradle
index 0854f27..35ca034 100644
--- a/build.gradle
+++ b/build.gradle
@@ -7,7 +7,7 @@ plugins {
 }
 
 group = 'com.nucleodb'
-version = '1.19.1'
+version = '1.19.2'
 
 repositories {
     mavenCentral()

diff --git a/src/main/java/com/nucleodb/library/database/tables/connection/ConnectionHandler.java b/src/main/java/com/nucleodb/library/database/tables/connection/ConnectionHandler.java
index 09c716d..1d89ce9 100644
--- a/src/main/java/com/nucleodb/library/database/tables/connection/ConnectionHandler.java
+++ b/src/main/java/com/nucleodb/library/database/tables/connection/ConnectionHandler.java
@@ -50,9 +50,7 @@
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -377,6 +375,17 @@ public boolean saveSync(C connection) throws InvalidConnectionException, Interru
     return true;
   }
 
+  @JsonIgnore
+  private transient ExecutorService reloadExecutor = Executors.newSingleThreadExecutor();
+  @JsonIgnore
+  private transient Future<?> runningReload = null;
+
+  public boolean reload(Consumer<Map<Integer, Long>> reloadComplete) {
+    if (runningReload != null && !runningReload.isDone()) return false;
+    runningReload = reloadExecutor.submit(this.consumer.reload(reloadComplete));
+    return true;
+  }
+
   private boolean deleteInternalConsumer(C connection, Consumer<C> consumer) throws IOException {
     String changeUUID = UUID.randomUUID().toString();
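
Note on the API added above: reload() is single-flight. A second call while a reload is still running returns false rather than queuing another pass, and the callback receives the reload consumer's final per-partition offsets. A minimal caller sketch, assuming an already-constructed ConnectionHandler named `connections` (the name is illustrative, not part of this patch):

    boolean started = connections.reload(offsets -> {
        // runs once the reload consumer has caught up to the end offsets
        // captured at partition assignment; key = partition, value = last offset read
        System.out.println("connection reload complete: " + offsets);
    });
    if (!started) {
        System.out.println("a reload is already in flight; request rejected");
    }
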
diff --git a/src/main/java/com/nucleodb/library/database/tables/table/DataTable.java b/src/main/java/com/nucleodb/library/database/tables/table/DataTable.java
index 8e3494a..2e18bcd 100644
--- a/src/main/java/com/nucleodb/library/database/tables/table/DataTable.java
+++ b/src/main/java/com/nucleodb/library/database/tables/table/DataTable.java
@@ -2,7 +2,6 @@
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.github.fge.jsonpatch.diff.JsonDiff;
 import com.google.common.cache.Cache;
@@ -31,17 +30,13 @@
 import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.util.*;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.logging.Logger;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 public class DataTable<T extends DataEntry> implements Serializable{
   private static final long serialVersionUID = 1;
@@ -66,12 +61,13 @@ public class DataTable<T extends DataEntry> implements Serializable{
   private transient Queue indexQueue = Queues.newLinkedBlockingQueue();
   @JsonIgnore
   private transient ProducerHandler producer = null;
+  @JsonIgnore
   private transient ConsumerHandler consumer = null;
   private transient AtomicInteger startupLoadCount = new AtomicInteger(0);
   private transient AtomicBoolean startupPhase = new AtomicBoolean(true);
-  private transient Cache<String, Consumer<T>> consumers = CacheBuilder.newBuilder()
+  private transient Cache<String, Consumer<T>> changeListeners = CacheBuilder.newBuilder()
       .maximumSize(10000)
       .softValues()
       .expireAfterWrite(5, TimeUnit.SECONDS)
@@ -165,7 +161,7 @@ public DataTable(DataTableConfig config) throws IntrospectionException, Invocati
       addAll(Arrays.asList(config.getClazz().getSuperclass().getFields()));
     }};
 
-    consume();
+    startRootConsumer();
 
     if (config.isRead()) {
       new Thread(new ModQueueHandler(this)).start();
@@ -180,7 +176,7 @@ public DataTable(DataTableConfig config) throws IntrospectionException, Invocati
     }
   }
 
-  public void consume() throws IntrospectionException, InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException {
+  public void startRootConsumer() throws IntrospectionException, InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException {
     this.consumer = this.getConfig()
         .getMqsConfiguration()
         .createConsumerHandler(this.config.getSettingsMap());
@@ -194,6 +190,16 @@ public void consume() throws IntrospectionException, InvocationTargetException,
     }
   }
 
+  @JsonIgnore
+  private transient ExecutorService reloadExecutor = Executors.newSingleThreadExecutor();
+  @JsonIgnore
+  private transient Future<?> runningReload = null;
+  public boolean reload(Consumer<Map<Integer, Long>> reloadComplete) {
+    if (runningReload != null && !runningReload.isDone()) return false;
+    runningReload = reloadExecutor.submit(this.consumer.reload(reloadComplete));
+    return true;
+  }
+
   public void exportTo(DataTable tb) throws IncorrectDataEntryObjectException {
     for (T de : this.entries) {
       //Serializer.log("INSERTING " + de.getKey());
@@ -211,7 +217,7 @@ public void flush() {
     } catch (Exception e) {
       //e.printStackTrace();
     }
-    consumers.cleanUp();
+    changeListeners.cleanUp();
     listeners = new HashMap<>();
     System.gc();
   }
@@ -391,7 +397,7 @@ private boolean deleteInternalConsumer(T entry, Consumer<T> consumer) {
     entry.versionIncrease();
     Delete delete = new Delete(changeUUID, entry);
     if (consumer != null) {
-      consumers.put(changeUUID, consumer);
+      changeListeners.put(changeUUID, consumer);
     }
     producer.push(delete.getKey(), delete.getVersion(), delete, null);
     return true;
@@ -443,7 +449,7 @@ private boolean saveInternalConsumer(T entry, Consumer<T> consumer) {
     }
     String changeUUID = UUID.randomUUID().toString();
     if (consumer != null) {
-      consumers.put(changeUUID, consumer);
+      changeListeners.put(changeUUID, consumer);
     }
     if (saveInternal(entry, changeUUID)) {
       return true;
@@ -784,10 +790,10 @@ private void consumerResponse(T T, String changeUUID) throws ExecutionException
     }
 
     if (changeUUID != null) {
-      Consumer<T> TConsumer = consumers.getIfPresent(changeUUID);
+      Consumer<T> TConsumer = 
changeListeners.getIfPresent(changeUUID); if (TConsumer != null) { new Thread(() -> TConsumer.accept(T)).start(); - consumers.invalidate(changeUUID); + changeListeners.invalidate(changeUUID); } } } catch (CacheLoader.InvalidCacheLoadException e) { @@ -903,12 +909,12 @@ public void setConsumer(ConsumerHandler consumer) { this.consumer = consumer; } - public Cache> getConsumers() { - return consumers; + public Cache> getChangeListeners() { + return changeListeners; } - public void setConsumers(Cache> consumers) { - this.consumers = consumers; + public void setChangeListeners(Cache> changeListeners) { + this.changeListeners = changeListeners; } public Map>> getListeners() { diff --git a/src/main/java/com/nucleodb/library/mqs/ConsumerHandler.java b/src/main/java/com/nucleodb/library/mqs/ConsumerHandler.java index 4823dbe..caff599 100644 --- a/src/main/java/com/nucleodb/library/mqs/ConsumerHandler.java +++ b/src/main/java/com/nucleodb/library/mqs/ConsumerHandler.java @@ -2,10 +2,12 @@ import com.google.common.collect.Queues; import com.nucleodb.library.database.lock.LockManager; +import com.nucleodb.library.database.modifications.Modify; import com.nucleodb.library.database.tables.connection.ConnectionHandler; import com.nucleodb.library.database.tables.table.DataTable; import com.nucleodb.library.mqs.config.MQSSettings; import com.nucleodb.library.mqs.exceptions.RequiredMethodNotImplementedException; +import org.apache.kafka.clients.consumer.Consumer; import java.util.*; import java.util.concurrent.ExecutorService; @@ -24,12 +26,13 @@ public class ConsumerHandler implements Runnable{ private ConnectionHandler connectionHandler = null; private LockManager lockManager = null; - private String table; + private String topic; int startupItems = -1; private Queue queue = Queues.newConcurrentLinkedQueue(); private transient AtomicInteger leftToRead = new AtomicInteger(0); private AtomicInteger startupLoadCount = new AtomicInteger(0); private AtomicBoolean startupPhaseConsume = new AtomicBoolean(true); + boolean reloadConsumer = false; private MQSSettings settings; @@ -37,10 +40,14 @@ public class ConsumerHandler implements Runnable{ private ExecutorService queueTasks = Executors.newFixedThreadPool(60); - public ConsumerHandler(MQSSettings settings, String table) { - this.table = table; + public ConsumerHandler(MQSSettings settings) { this.settings = settings; } + public ConsumerHandler reload(java.util.function.Consumer completeCallback) { + ConsumerHandler reloadConsumerHandler = new ConsumerHandler(settings); + reloadConsumerHandler.setReloadConsumer(true); + return reloadConsumerHandler; + } public void start(int queues){ for (int x = 0; x < queues; x++) { Thread queueThread = new Thread(new QueueHandler(this)); @@ -48,17 +55,6 @@ public void start(int queues){ } } - public void readFromStart() throws InterruptedException { - startupLoadCount.set(0); - queue.clear(); - startupPhaseConsume.set(true); - leftToRead.set(0); - startupItems = -1; - queueTasks.shutdownNow(); - queueTasks.awaitTermination(4, TimeUnit.SECONDS); - queueTasks = Executors.newFixedThreadPool(60); - } - @Override public void run() { try { @@ -92,15 +88,6 @@ public ConnectionHandler getConnectionHandler() { public void setConnectionHandler(ConnectionHandler connectionHandler) { this.connectionHandler = connectionHandler; } - - public String getTable() { - return table; - } - - public void setTable(String table) { - this.table = table; - } - public int getStartupItems() { return startupItems; } @@ -153,7 +140,23 @@ public 
LockManager getLockManager() { return lockManager; } + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + public void setLockManager(LockManager lockManager) { this.lockManager = lockManager; } + + public boolean isReloadConsumer() { + return reloadConsumer; + } + + public void setReloadConsumer(boolean reloadConsumer) { + this.reloadConsumer = reloadConsumer; + } } diff --git a/src/main/java/com/nucleodb/library/mqs/ProducerHandler.java b/src/main/java/com/nucleodb/library/mqs/ProducerHandler.java index 6e9f46a..03b4b84 100644 --- a/src/main/java/com/nucleodb/library/mqs/ProducerHandler.java +++ b/src/main/java/com/nucleodb/library/mqs/ProducerHandler.java @@ -6,11 +6,10 @@ import org.apache.kafka.clients.producer.Callback; public class ProducerHandler { - private String table; + private String topic; private MQSSettings settings; - public ProducerHandler(MQSSettings settings, String table) { - this.table = table; + public ProducerHandler(MQSSettings settings) { this.settings = settings; } @@ -33,12 +32,12 @@ public void push(String key, String message){ System.exit(1); } - public String getTable() { - return table; + public String getTopic() { + return topic; } - public void setTable(String table) { - this.table = table; + public void setTopic(String topic) { + this.topic = topic; } public MQSSettings getSettings() { diff --git a/src/main/java/com/nucleodb/library/mqs/QueueHandler.java b/src/main/java/com/nucleodb/library/mqs/QueueHandler.java index 54f1ad7..a06321d 100644 --- a/src/main/java/com/nucleodb/library/mqs/QueueHandler.java +++ b/src/main/java/com/nucleodb/library/mqs/QueueHandler.java @@ -1,13 +1,12 @@ package com.nucleodb.library.mqs; +import com.fasterxml.jackson.core.JsonProcessingException; import com.nucleodb.library.database.lock.LockReference; import com.nucleodb.library.database.modifications.Modification; import com.nucleodb.library.database.modifications.Modify; import com.nucleodb.library.database.utils.Serializer; -import com.nucleodb.library.mqs.kafka.KafkaConsumerHandler; -import java.io.Serial; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.ExecutionException; import java.util.logging.Logger; class QueueHandler implements Runnable{ @@ -30,30 +29,9 @@ public void run() { this.consumerHandler.getLeftToRead().decrementAndGet(); try { if (databaseType) { - String type = entry.substring(0, 6); - String data = entry.substring(6); - Modification mod = Modification.get(type); - if (mod != null) { - if (this.consumerHandler.getDatabase()!=null && this.consumerHandler.getDatabase().getConfig() != null && this.consumerHandler.getDatabase().getConfig().isJsonExport()) { - this.consumerHandler.getDatabase().getExportHandler().getModifications().add(entry); - } - this.consumerHandler.getDatabase().modify(mod, Serializer.getObjectMapper().getOm().readValue(data, mod.getModification())); - } + dataTableType(entry); } else if (connectionType) { - String type = entry.substring(0, 16); - String data = entry.substring(16); - Modification mod = Modification.get(type); - if (mod != null) { - try { - Modify modifiedEntry = (Modify) Serializer.getObjectMapper().getOm().readValue(data, mod.getModification()); - if (this.consumerHandler.getConnectionHandler()!=null && this.consumerHandler.getConnectionHandler().getConfig() != null && this.consumerHandler.getConnectionHandler().getConfig().isJsonExport()) { - this.consumerHandler.getConnectionHandler().getExportHandler().getModifications().add(entry); - 
} - this.consumerHandler.getConnectionHandler().modify(mod, modifiedEntry); - } catch (Exception e) { - e.printStackTrace(); - } - } + connectionType(entry); }else if(lockdownType){ logger.info("processing lockdown"); this.consumerHandler.getLockManager().lockAction( @@ -83,4 +61,33 @@ public void run() { } } } + + private void dataTableType(String entry) throws ExecutionException, JsonProcessingException { + String type = entry.substring(0, 6); + String data = entry.substring(6); + Modification mod = Modification.get(type); + if (mod != null) { + if (this.consumerHandler.getDatabase()!=null && this.consumerHandler.getDatabase().getConfig() != null && this.consumerHandler.getDatabase().getConfig().isJsonExport()) { + this.consumerHandler.getDatabase().getExportHandler().getModifications().add(entry); + } + this.consumerHandler.getDatabase().modify(mod, Serializer.getObjectMapper().getOm().readValue(data, mod.getModification())); + } + } + + private void connectionType(String entry) { + String type = entry.substring(0, 16); + String data = entry.substring(16); + Modification mod = Modification.get(type); + if (mod != null) { + try { + Modify modifiedEntry = (Modify) Serializer.getObjectMapper().getOm().readValue(data, mod.getModification()); + if (this.consumerHandler.getConnectionHandler()!=null && this.consumerHandler.getConnectionHandler().getConfig() != null && this.consumerHandler.getConnectionHandler().getConfig().isJsonExport()) { + this.consumerHandler.getConnectionHandler().getExportHandler().getModifications().add(entry); + } + this.consumerHandler.getConnectionHandler().modify(mod, modifiedEntry); + } catch (Exception e) { + e.printStackTrace(); + } + } + } } \ No newline at end of file diff --git a/src/main/java/com/nucleodb/library/mqs/config/MQSSettings.java b/src/main/java/com/nucleodb/library/mqs/config/MQSSettings.java index 52f8894..4088daf 100644 --- a/src/main/java/com/nucleodb/library/mqs/config/MQSSettings.java +++ b/src/main/java/com/nucleodb/library/mqs/config/MQSSettings.java @@ -8,10 +8,10 @@ public class MQSSettings{ ConsumerHandler consumerHandler; - String table; + String topic; public MQSSettings(Map objs) { - this.table = (String) objs.get("table"); + this.topic = (String) objs.get("topic"); this.consumerHandler = (ConsumerHandler) objs.get("consumerHandler"); } @@ -25,10 +25,10 @@ public void setConsumerHandler(ConsumerHandler consumerHandler) { } public String getTable() { - return table; + return topic; } public void setTable(String table) { - this.table = table; + this.topic = table; } } diff --git a/src/main/java/com/nucleodb/library/mqs/kafka/KafkaConfiguration.java b/src/main/java/com/nucleodb/library/mqs/kafka/KafkaConfiguration.java index fbb4e48..d0efa49 100644 --- a/src/main/java/com/nucleodb/library/mqs/kafka/KafkaConfiguration.java +++ b/src/main/java/com/nucleodb/library/mqs/kafka/KafkaConfiguration.java @@ -11,13 +11,13 @@ public KafkaConfiguration() { super( new MQSConstructorSettings<>( KafkaConsumerHandler.class, - new String[]{"servers", "groupName", "table"}, - new Class[]{MQSSettings.class, String.class, String.class, String.class} + new String[]{"servers", "groupName"}, + new Class[]{MQSSettings.class, String.class, String.class} ), new MQSConstructorSettings<>( KafkaProducerHandler.class, - new String[]{"servers", "table"}, - new Class[]{MQSSettings.class, String.class, String.class} + new String[]{"servers"}, + new Class[]{MQSSettings.class, String.class} ), KafkaSettings.class ); diff --git 
a/src/main/java/com/nucleodb/library/mqs/kafka/KafkaConsumerHandler.java b/src/main/java/com/nucleodb/library/mqs/kafka/KafkaConsumerHandler.java
index 2ba66af..aae5d1a 100644
--- a/src/main/java/com/nucleodb/library/mqs/kafka/KafkaConsumerHandler.java
+++ b/src/main/java/com/nucleodb/library/mqs/kafka/KafkaConsumerHandler.java
@@ -1,6 +1,7 @@
 package com.nucleodb.library.mqs.kafka;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.nucleodb.library.database.index.trie.Entry;
 import com.nucleodb.library.database.tables.connection.ConnectionHandler;
 import com.nucleodb.library.database.tables.table.DataTable;
 import com.nucleodb.library.database.utils.Serializer;
@@ -30,27 +31,80 @@ public class KafkaConsumerHandler extends ConsumerHandler {
 
   private static Logger logger = Logger.getLogger(KafkaConsumerHandler.class.getName());
 
+  private KafkaConsumer<String, String> consumer = null;
   private ConnectionHandler connectionHandler = null;
-  private ExecutorService thread = Executors.newFixedThreadPool(1);
+  private ExecutorService thread = Executors.newFixedThreadPool(5);
+  private String groupName;
+  private String servers;
+  private java.util.function.Consumer<Map<Integer, Long>> completeCallback;
   private int threads = 36;
-  private String table;
 
-  public KafkaConsumerHandler(MQSSettings settings, String servers, String groupName, String table) {
-    super(settings, table);
+  public KafkaConsumerHandler(MQSSettings settings, String servers, String groupName) {
+    super(settings);
     createTopics();
+    this.servers = servers;
+    this.groupName = groupName;
     logger.info(servers + " using group id " + groupName);
     this.consumer = createConsumer(servers, groupName);
-    this.subscribe(new String[]{this.getSettings().getTable().toLowerCase()});
-    this.table = table;
+  }
+
+  public KafkaConsumerHandler(KafkaConsumerHandler kafkaConsumerHandler) {
+    super(kafkaConsumerHandler.getSettings());
+    this.servers = kafkaConsumerHandler.getServers();
+    // fresh group id: the reload consumer reads the topic from the beginning
+    // without touching the offsets committed by the original group
+    this.groupName = UUID.randomUUID().toString();
+    super.setDatabase(kafkaConsumerHandler.getDatabase());
+    super.setConnectionHandler(kafkaConsumerHandler.getConnectionHandler());
+    super.setQueue(kafkaConsumerHandler.getQueue());
+    super.setTopic(kafkaConsumerHandler.getTopic());
+    super.setLockManager(kafkaConsumerHandler.getLockManager());
+    logger.info(servers + " using group id " + groupName);
+    this.consumer = createConsumer(servers, groupName);
+  }
+
+  public KafkaConsumerHandler reload(java.util.function.Consumer<Map<Integer, Long>> completeCallback) {
+    KafkaConsumerHandler kafkaConsumerHandler = new KafkaConsumerHandler(this);
+    kafkaConsumerHandler.setReloadConsumer(true);
+    // the callback belongs to the new reload consumer, not to this handler
+    kafkaConsumerHandler.completeCallback = completeCallback;
+    return kafkaConsumerHandler;
+  }
+
+  private KafkaConsumer<String, String> createConsumer(String bootstrap, String groupName) {
+    Properties props = new Properties();
+    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
+    //System.out.println(groupName);
+    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupName);
+    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+    //props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
+    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, ((KafkaSettings) getSettings()).getOffsetReset());
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
+    return consumer;
+  }
+
+  public void subscribe(String[] topics) {
+    //System.out.println("Subscribed to topic 
" + Arrays.asList(topics).toString()); + consumer.subscribe(Arrays.asList(topics), new ConsumerRebalanceListener() { + @Override + public void onPartitionsRevoked(Collection collection) { + logger.log(Level.FINEST,"revoked: " + collection.stream().map(c -> c.topic() + c.partition()).collect(Collectors.joining(", "))); + } + + @Override + public void onPartitionsAssigned(Collection collection) { + assigned = collection.stream().map(c -> c.toString()).collect(Collectors.toSet()); + logger.log(Level.FINEST,"assigned: " + assigned.stream().collect(Collectors.joining(", "))); + + } + }); + } public void createTopics() { Properties props = new Properties(); KafkaSettings settings = (KafkaSettings) getSettings(); @@ -113,28 +167,6 @@ public void start(int queueHandlers) { super.start(queueHandlers); } - @Override - public synchronized void readFromStart() throws InterruptedException { - thread.shutdownNow(); - thread.awaitTermination(4, TimeUnit.SECONDS); - thread = Executors.newFixedThreadPool(1); - - boolean connectionType = this.getConnectionHandler() != null; - boolean databaseType = this.getDatabase() != null; - - if(connectionType){ - getConnectionHandler().setPartitionOffsets(new TreeMap<>()); - } - if(databaseType){ - getDatabase().setPartitionOffsets(new TreeMap<>()); - } - if(connectionType || databaseType) { - consumer.seekToBeginning(consumer.assignment()); - super.readFromStart(); - this.start(this.threads); - } - } - private Map startupMap = null; private boolean initialLoad() { @@ -160,15 +192,13 @@ public void seek(Map offsetMap) { Set assigned = new HashSet<>(); - @Override - public void run() { + + private void regularConsumer(){ boolean connectionType = this.getConnectionHandler() != null; boolean databaseType = this.getDatabase() != null; boolean lockManagerType = this.getLockManager() != null; boolean saveConnection = connectionType && this.getConnectionHandler().getConfig().isSaveChanges(); boolean saveDatabase = databaseType && this.getDatabase().getConfig().isSaveChanges(); - - Map offsets = new HashMap<>(); try { @@ -199,112 +229,168 @@ public void run() { super.setStartupLoadCount(new AtomicInteger(0)); } }catch (Exception e){} - Map offsetMetaMap = new HashMap<>(); - try { - do { - ConsumerRecords rs = getConsumer().poll(Duration.ofMillis(1000)); - if (rs.count() > 0) { - Map finalOffsets = offsets; - rs.iterator().forEachRemaining(action -> { - Long offsetAtPartition = finalOffsets.get(action.partition()); - if (offsetAtPartition != null && action.offset() <= offsetAtPartition) return; - if (getStartupPhaseConsume().get()) getStartupLoadCount().incrementAndGet(); - String pop = action.value(); - //System.out.println("Change added to queue."); - if (connectionType) { - if (this.getConnectionHandler().getConfig().getNodeFilter().accept(action.key())) { - getQueue().add(pop); - getLeftToRead().incrementAndGet(); - synchronized (getQueue()) { - getQueue().notifyAll(); - } - } - } - if (databaseType) { - if (this.getDatabase().getConfig().getNodeFilter().accept(action.key())) { - getQueue().add(pop); - getLeftToRead().incrementAndGet(); - synchronized (getQueue()) { - getQueue().notifyAll(); - } + Map offsetMetaMap = new HashMap<>(); + try { + do { + ConsumerRecords rs = getConsumer().poll(Duration.ofMillis(1000)); + if (rs.count() > 0) { + Map finalOffsets = offsets; + rs.iterator().forEachRemaining(action -> { + Long offsetAtPartition = finalOffsets.get(action.partition()); + if (offsetAtPartition != null && action.offset() <= offsetAtPartition) return; + if 
(getStartupPhaseConsume().get()) getStartupLoadCount().incrementAndGet(); + String pop = action.value(); + //System.out.println("Change added to queue."); + if (connectionType) { + if (this.getConnectionHandler().getConfig().getNodeFilter().accept(action.key())) { + getQueue().add(pop); + getLeftToRead().incrementAndGet(); + synchronized (getQueue()) { + getQueue().notifyAll(); } } - - if (lockManagerType) { + } + if (databaseType) { + if (this.getDatabase().getConfig().getNodeFilter().accept(action.key())) { getQueue().add(pop); getLeftToRead().incrementAndGet(); synchronized (getQueue()) { getQueue().notifyAll(); } } + } - if (saveConnection) - this.getConnectionHandler().getPartitionOffsets().put(action.partition(), action.offset()); - if (saveDatabase) - this.getDatabase().getPartitionOffsets().put(action.partition(), action.offset()); - offsetMetaMap.put(action.partition(), new OffsetAndMetadata(action.offset())); - }); - consumer.commitAsync(); - } + if (lockManagerType) { + getQueue().add(pop); + getLeftToRead().incrementAndGet(); + synchronized (getQueue()) { + getQueue().notifyAll(); + } + } - while (getStartupPhaseConsume().get() && getLeftToRead().get() > 50000) { - Thread.sleep(1000); + if (saveConnection) + this.getConnectionHandler().getPartitionOffsets().put(action.partition(), action.offset()); + if (saveDatabase) + this.getDatabase().getPartitionOffsets().put(action.partition(), action.offset()); + offsetMetaMap.put(action.partition(), new OffsetAndMetadata(action.offset())); + }); + consumer.commitAsync(); + } + + while (getStartupPhaseConsume().get() && getLeftToRead().get() > 50000) { + Thread.sleep(1000); + } + //logger.info("consumed: "+leftToRead.get()); + if (getStartupPhaseConsume().get() && initialLoad()) { + getStartupPhaseConsume().set(false); + if (getStartupLoadCount().get() == 0) { + if (connectionType) { + getConnectionHandler().getStartupPhase().set(false); + new Thread(() -> getConnectionHandler().startup()).start(); + } + if (databaseType) { + getDatabase().getStartupPhase().set(false); + new Thread(() -> getDatabase().startup()).start(); + } + if (lockManagerType) { + new Thread(() -> getLockManager().startup()).start(); + } } - //logger.info("consumed: "+leftToRead.get()); - if (getStartupPhaseConsume().get() && initialLoad()) { - getStartupPhaseConsume().set(false); - if (getStartupLoadCount().get() == 0) { - if (connectionType) { - getConnectionHandler().getStartupPhase().set(false); - new Thread(() -> getConnectionHandler().startup()).start(); + } + } while (!Thread.interrupted()); + logger.log(Level.FINEST, "Consumer interrupted " + (databaseType ? 
this.getDatabase().getConfig().getTable() : "connections"));
+    } catch (Exception e) {
+      //e.printStackTrace();
+    }
+  }
+
+  private void reloadConsumer(){
+
+    boolean connectionType = this.getConnectionHandler() != null;
+    boolean databaseType = this.getDatabase() != null;
+    boolean lockManagerType = this.getLockManager() != null;
+
+    int partitions = getConsumer().partitionsFor(getTopic()).size();
+    // poll until the coordinator has assigned every partition to this consumer
+    while (getConsumer().assignment().size() < partitions) {
+      getConsumer().poll(Duration.ofMillis(100));
+    }
+    Set<TopicPartition> assignments = getConsumer().assignment();
+    Map<Integer, Long> currentOffsets = new HashMap<>();
+    Map<Integer, Long> endOffsets = new HashMap<>();
+    Map<TopicPartition, Long> endOffsetMap = getConsumer().endOffsets(assignments);
+    for (Map.Entry<TopicPartition, Long> entry : endOffsetMap.entrySet()) {
+      endOffsets.put(entry.getKey().partition(), entry.getValue());
+    }
+    boolean completedLoad = false;
+    try {
+      do {
+        ConsumerRecords<String, String> rs = getConsumer().poll(Duration.ofMillis(1000));
+        if (rs.count() > 0) {
+          for (ConsumerRecord<String, String> action : rs) {
+            // track the last offset seen per partition; records written after the
+            // end-offset snapshot still advance the counter but are not re-queued
+            currentOffsets.put(action.partition(), action.offset());
+            if (endOffsets.get(action.partition()) <= action.offset()) continue;
+            String pop = action.value();
+            if (connectionType) {
+              if (this.getConnectionHandler().getConfig().getNodeFilter().accept(action.key())) {
+                getQueue().add(pop);
+                getLeftToRead().incrementAndGet();
+                synchronized (getQueue()) {
+                  getQueue().notifyAll();
+                }
+              }
+            }
+            if (databaseType) {
+              if (this.getDatabase().getConfig().getNodeFilter().accept(action.key())) {
+                getQueue().add(pop);
+                getLeftToRead().incrementAndGet();
+                synchronized (getQueue()) {
+                  getQueue().notifyAll();
+                }
+              }
+            }
+            if (lockManagerType) {
+              getQueue().add(pop);
+              getLeftToRead().incrementAndGet();
+              synchronized (getQueue()) {
+                getQueue().notifyAll();
+              }
+            }
+          }
+          consumer.commitAsync();
+        }
+        // complete once every partition has been read up to its snapshot end offset
+        completedLoad = true;
+        for (Map.Entry<Integer, Long> integerLongEntry : endOffsets.entrySet()) {
+          Long current = currentOffsets.get(integerLongEntry.getKey());
+          long end = integerLongEntry.getValue();
+          if (end == 0) continue; // empty partition: nothing to read
+          if (current == null || current < end - 1) completedLoad = false;
+        }
+      } while (!Thread.interrupted() && !completedLoad);
+    } catch (Exception e) {
+      //e.printStackTrace();
+    }
+    if (completeCallback != null) completeCallback.accept(currentOffsets);
+  }
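
The loop above treats the reload as finished once every partition's last-seen offset reaches the end offset snapshotted at assignment time. Kafka's endOffsets() returns the offset of the next record to be written, so a partition is caught up at endOffset - 1, and empty partitions (endOffset == 0) are complete immediately. The same check extracted as a standalone sketch (hypothetical helper, not part of the patch):

    import java.util.Map;

    static boolean caughtUp(Map<Integer, Long> lastSeen, Map<Integer, Long> endOffsets) {
      for (Map.Entry<Integer, Long> e : endOffsets.entrySet()) {
        if (e.getValue() == 0) continue;              // empty partition: nothing to read
        Long seen = lastSeen.get(e.getKey());
        if (seen == null || seen < e.getValue() - 1) return false;
      }
      return true;
    }
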
-  private KafkaConsumer<String, String> createConsumer(String bootstrap, String groupName) {
-    Properties props = new Properties();
-    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
-    //System.out.println(groupName);
-    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupName);
-    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-    //props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
-    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, ((KafkaSettings) getSettings()).getOffsetReset());
-    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
-    return consumer;
-  }
-
-  public void subscribe(String[] topics) {
-    //System.out.println("Subscribed to topic " + Arrays.asList(topics).toString());
-    consumer.subscribe(Arrays.asList(topics), new ConsumerRebalanceListener() {
-      @Override
-      public void onPartitionsRevoked(Collection<TopicPartition> collection) {
-        logger.log(Level.FINEST,"revoked: " + collection.stream().map(c -> c.topic() + c.partition()).collect(Collectors.joining(", ")));
-      }
-
-      @Override
-      public void onPartitionsAssigned(Collection<TopicPartition> collection) {
-        assigned = collection.stream().map(c -> c.toString()).collect(Collectors.toSet());
-        logger.log(Level.FINEST,"assigned: " + assigned.stream().collect(Collectors.joining(", ")));
-      }
-    });
-  }
+  @Override
+  public void run() {
+
+    this.subscribe(new String[]{this.getSettings().getTable().toLowerCase()});
+
+    if (isReloadConsumer()) {
+      // a reload consumer re-reads the topic once, fires its callback, and stops
+      reloadConsumer();
+      return;
+    }
+
+    regularConsumer();
+  }
 
   public KafkaConsumer<String, String> getConsumer() {
@@ -323,4 +409,19 @@ public void setConnectionHandler(ConnectionHandler connectionHandler) {
     this.connectionHandler = connectionHandler;
   }
 
+  public String getGroupName() {
+    return groupName;
+  }
+
+  public void setGroupName(String groupName) {
+    this.groupName = groupName;
+  }
+
+  public String getServers() {
+    return servers;
+  }
+
+  public void setServers(String servers) {
+    this.servers = servers;
+  }
 }
\ No newline at end of file
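
Design note: the copy constructor above gives the reload consumer a fresh UUID group id, so auto.offset.reset=earliest makes it start from offset 0 without disturbing the primary group's committed offsets; the group is simply abandoned once the reload completes. The same pattern in isolation (broker address and class name are placeholders, not part of this patch):

    import java.util.Properties;
    import java.util.UUID;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    class ThrowawayGroupExample {
      static KafkaConsumer<String, String> snapshotReader(String servers) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); // throwaway group
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");          // no committed offsets -> start at 0
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // subscribe, poll until caught up (see the sketch above), then close and discard
        return new KafkaConsumer<>(props);
      }
    }
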
diff --git a/src/main/java/com/nucleodb/library/mqs/kafka/KafkaProducerHandler.java b/src/main/java/com/nucleodb/library/mqs/kafka/KafkaProducerHandler.java
index a83830b..3bfe2be 100644
--- a/src/main/java/com/nucleodb/library/mqs/kafka/KafkaProducerHandler.java
+++ b/src/main/java/com/nucleodb/library/mqs/kafka/KafkaProducerHandler.java
@@ -28,8 +28,8 @@ public class KafkaProducerHandler extends ProducerHandler{
 
   private KafkaProducer<String, String> producer;
 
-  public KafkaProducerHandler(MQSSettings settings, String servers, String table) {
-    super(settings, table);
+  public KafkaProducerHandler(MQSSettings settings, String servers) {
+    super(settings);
     createTopics();
     producer = createProducer(servers);
   }
@@ -56,7 +56,7 @@ public void createTopics() {
     props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, settings.getServers());
     AdminClient client = KafkaAdminClient.create(props);
 
-    String topic = getSettings().getTable().toLowerCase();
+    // the handler's topic is not assigned yet when createTopics() runs from the constructor
+    String topic = super.getTopic() != null ? super.getTopic() : getSettings().getTable().toLowerCase();
     CountDownLatch countDownLatch = new CountDownLatch(1);
     try {
       ListTopicsResult listTopicsResult = client.listTopics();
@@ -118,7 +118,7 @@ public void push(String key, long version, Modify modify, Callback callback){
     new Thread(()-> {
       try {
         ProducerRecord<String, String> record = new ProducerRecord<>(
-            getTable().toLowerCase(),
+            super.getTopic(),
             key,
             modify.getClass().getSimpleName() + Serializer.getObjectMapper().getOm().writeValueAsString(modify)
         );
@@ -143,7 +143,7 @@ public void push(String key, String message){
     try {
       ProducerRecord<String, String> record = new ProducerRecord<>(
-          getTable().toLowerCase(),
+          super.getTopic(),
           key,
           message
       );

diff --git a/src/main/java/com/nucleodb/library/mqs/local/LocalConfiguration.java b/src/main/java/com/nucleodb/library/mqs/local/LocalConfiguration.java
index c5844c8..6b968ab 100644
--- a/src/main/java/com/nucleodb/library/mqs/local/LocalConfiguration.java
+++ b/src/main/java/com/nucleodb/library/mqs/local/LocalConfiguration.java
@@ -14,13 +14,13 @@ public LocalConfiguration() {
     super(
         new MQSConstructorSettings<>(
             LocalConsumerHandler.class,
-            new String[]{"table"},
-            new Class[]{MQSSettings.class, String.class}
+            new String[]{},
+            new Class[]{MQSSettings.class}
         ),
         new MQSConstructorSettings<>(
             LocalProducerHandler.class,
-            new String[]{"consumerHandler", "table"},
-            new Class[]{MQSSettings.class, LocalConsumerHandler.class, String.class}
+            new String[]{"consumerHandler"},
+            new Class[]{MQSSettings.class, LocalConsumerHandler.class}
        ),
        MQSSettings.class
     );

diff --git a/src/main/java/com/nucleodb/library/mqs/local/LocalConsumerHandler.java b/src/main/java/com/nucleodb/library/mqs/local/LocalConsumerHandler.java
index 269a125..7dbd485 100644
--- a/src/main/java/com/nucleodb/library/mqs/local/LocalConsumerHandler.java
+++ b/src/main/java/com/nucleodb/library/mqs/local/LocalConsumerHandler.java
@@ -11,21 +11,21 @@ public class LocalConsumerHandler extends ConsumerHandler{
 
   private static Logger logger = Logger.getLogger(LocalConsumerHandler.class.getName());
 
-  public LocalConsumerHandler(MQSSettings settings, String table) {
-    super(settings, table);
-    logger.info("local consumer handler started for "+table);
+  public LocalConsumerHandler(MQSSettings settings) {
+    super(settings);
+    // the topic is not assigned to the handler yet during construction; read it from settings
+    logger.info("local consumer handler started for "+settings.getTable());
   }
 
   @Override
   public void start(int queueHandlers) {
     super.getStartupPhaseConsume().set(false);
     if(getConnectionHandler()!=null){
-      logger.info("startup for connection for "+getTable());
+      logger.info("startup for connection for "+super.getTopic());
       getConnectionHandler().getStartupPhase().set(false);
       new Thread(() -> getConnectionHandler().startup()).start();
     }
     if(getDatabase()!=null){
-      logger.info("startup for data table for "+getTable());
+      logger.info("startup for data table for "+super.getTopic());
       getDatabase().getStartupPhase().set(false);
       new Thread(() -> getDatabase().startup()).start();
     }

diff --git a/src/main/java/com/nucleodb/library/mqs/local/LocalProducerHandler.java b/src/main/java/com/nucleodb/library/mqs/local/LocalProducerHandler.java
index 6912137..55e6005 100644
--- a/src/main/java/com/nucleodb/library/mqs/local/LocalProducerHandler.java
+++ b/src/main/java/com/nucleodb/library/mqs/local/LocalProducerHandler.java
@@ -11,10 +11,10 @@ public class LocalProducerHandler extends ProducerHandler{
   private static Logger logger = Logger.getLogger(LocalProducerHandler.class.getName());
   LocalConsumerHandler localConsumerHandler;
 
-  public LocalProducerHandler(MQSSettings settings, LocalConsumerHandler localConsumerHandler, String table) {
-    super(settings, table);
+  public LocalProducerHandler(MQSSettings settings, LocalConsumerHandler localConsumerHandler) {
+    super(settings);
     this.localConsumerHandler = localConsumerHandler;
-    logger.info("local producer handler started for "+table);
+    // topic is not assigned yet during construction; read it from settings
+    logger.info("local producer handler started for "+settings.getTable());
   }
 
   @Override
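
One caveat on the reload(...) guard added in ConnectionHandler and DataTable: the isDone() check and the submit() are not atomic, so two threads calling reload() at the same instant could both pass the check and both submit. If that matters, a synchronized variant closes the gap; a self-contained sketch of the same guard (illustrative, not part of the patch):

    import java.util.concurrent.*;

    class SingleFlight {
      private final ExecutorService executor = Executors.newSingleThreadExecutor();
      private Future<?> running;

      synchronized boolean submit(Runnable task) {
        if (running != null && !running.isDone()) return false; // still in flight
        running = executor.submit(task);
        return true;
      }
    }
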