Skip to content

Commit

Permalink
feat: implement setUpdateCallback(Consumer func) for Watcher interface (
Browse files Browse the repository at this point in the history
  • Loading branch information
seriouszyx authored Nov 5, 2023
1 parent 75a6a50 commit 660ae42
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 6 deletions.
12 changes: 6 additions & 6 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.slf4j</groupId>-->
<!-- <artifactId>slf4j-simple</artifactId>-->
<!-- <version>${slf4j.version}</version>-->
<!-- </dependency>-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.casbin</groupId>
<artifactId>jcasbin</artifactId>
<version>1.6.3</version>
<version>1.13.3</version>
</dependency>
<dependency>
<groupId>junit</groupId>
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/org/casbin/watcher/KafkaWatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Consumer;

public class KafkaWatcher implements Watcher{
private Runnable updateCallback;
private Consumer<String> updateConsumerCallback;
private final String localId;
private final Map<String,Object> producerProps;
private final Map<String,Object> consumerProps;
Expand All @@ -38,6 +40,11 @@ public void setUpdateCallback(Runnable runnable) {
this.updateCallback=runnable;
}

@Override
public void setUpdateCallback(Consumer<String> consumer) {
this.updateConsumerCallback = consumer;
}

@Override
public void update() {
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
Expand Down Expand Up @@ -72,6 +79,9 @@ public void startSub(){
log.info(String.format("topic: %s, partition: %s, offset: %s, key: %s, value: %s",
record.topic(), record.partition(), record.offset(), record.key(), record.value()));
updateCallback.run();
if (updateConsumerCallback != null)
updateConsumerCallback.accept(String.format("topic: %s, partition: %s, offset: %s, key: %s, value: %s",
record.topic(), record.partition(), record.offset(), record.key(), record.value()));
}
}
}
Expand Down
11 changes: 11 additions & 0 deletions src/test/java/KafkaWatcherTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,15 @@ public void testUpdate() throws InterruptedException {
kafkaWatcher.update();
countDownLatch.await(1, TimeUnit.MINUTES);
}

@Test
public void testConsumerCallback() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
kafkaWatcher.setUpdateCallback((s)-> {
countDownLatch.countDown();
System.out.println("[callback]" + s);
});
kafkaWatcher.update();
countDownLatch.await(1, TimeUnit.MINUTES);
}
}

0 comments on commit 660ae42

Please sign in to comment.