diff --git a/pom.xml b/pom.xml index be93f89..4a592e5 100644 --- a/pom.xml +++ b/pom.xml @@ -21,15 +21,15 @@ slf4j-api ${slf4j.version} - - - - - + + org.slf4j + slf4j-simple + ${slf4j.version} + org.casbin jcasbin - 1.6.3 + 1.13.3 junit diff --git a/src/main/java/org/casbin/watcher/KafkaWatcher.java b/src/main/java/org/casbin/watcher/KafkaWatcher.java index c59a826..04ae677 100644 --- a/src/main/java/org/casbin/watcher/KafkaWatcher.java +++ b/src/main/java/org/casbin/watcher/KafkaWatcher.java @@ -16,9 +16,11 @@ import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.function.Consumer; public class KafkaWatcher implements Watcher{ private Runnable updateCallback; + private Consumer updateConsumerCallback; private final String localId; private final Map producerProps; private final Map consumerProps; @@ -38,6 +40,11 @@ public void setUpdateCallback(Runnable runnable) { this.updateCallback=runnable; } + @Override + public void setUpdateCallback(Consumer consumer) { + this.updateConsumerCallback = consumer; + } + @Override public void update() { KafkaProducer producer = new KafkaProducer<>(producerProps); @@ -72,6 +79,9 @@ public void startSub(){ log.info(String.format("topic: %s, partition: %s, offset: %s, key: %s, value: %s", record.topic(), record.partition(), record.offset(), record.key(), record.value())); updateCallback.run(); + if (updateConsumerCallback != null) + updateConsumerCallback.accept(String.format("topic: %s, partition: %s, offset: %s, key: %s, value: %s", + record.topic(), record.partition(), record.offset(), record.key(), record.value())); } } } diff --git a/src/test/java/KafkaWatcherTest.java b/src/test/java/KafkaWatcherTest.java index b9561f1..35f6369 100644 --- a/src/test/java/KafkaWatcherTest.java +++ b/src/test/java/KafkaWatcherTest.java @@ -47,4 +47,15 @@ public void testUpdate() throws InterruptedException { kafkaWatcher.update(); countDownLatch.await(1, TimeUnit.MINUTES); } + + @Test + public void testConsumerCallback() throws InterruptedException { + CountDownLatch countDownLatch = new CountDownLatch(1); + kafkaWatcher.setUpdateCallback((s)-> { + countDownLatch.countDown(); + System.out.println("[callback]" + s); + }); + kafkaWatcher.update(); + countDownLatch.await(1, TimeUnit.MINUTES); + } }