diff --git a/src/main/java/org/casbin/watcher/lettuce/LettuceRedisWatcher.java b/src/main/java/org/casbin/watcher/lettuce/LettuceRedisWatcher.java index 2776049..7ac0f4d 100644 --- a/src/main/java/org/casbin/watcher/lettuce/LettuceRedisWatcher.java +++ b/src/main/java/org/casbin/watcher/lettuce/LettuceRedisWatcher.java @@ -89,7 +89,11 @@ public void update() { if (statefulRedisPubSubConnection.isOpen()) { String msg = "Casbin policy has a new version from redis watcher: ".concat(this.localId); statefulRedisPubSubConnection.async().publish(this.redisChannelName, msg); + + Thread.sleep(100); } + } catch (InterruptedException e) { + throw new RuntimeException("Publish error! The localId: " + this.localId, e); } } @@ -110,8 +114,6 @@ private void startSub() { * @return AbstractRedisClient */ private AbstractRedisClient getLettuceRedisClient(String host, Integer port, String nodes, String password, int timeout, String type) { - // todo default standalone ? - // type = StringUtils.isEmpty(type) ? WatcherConstant.LETTUCE_REDIS_TYPE_STANDALONE : type; if (StringUtils.isNotEmpty(type) && StringUtils.equalsAnyIgnoreCase(type, WatcherConstant.LETTUCE_REDIS_TYPE_STANDALONE, WatcherConstant.LETTUCE_REDIS_TYPE_CLUSTER)) { ClientResources clientResources = DefaultClientResources.builder() @@ -146,9 +148,9 @@ private AbstractRedisClient getLettuceRedisClient(String host, Integer port, Str // cluster TimeoutOptions timeoutOptions = TimeoutOptions.builder().fixedTimeout(Duration.of(timeout, ChronoUnit.SECONDS)).build(); ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder() - .enablePeriodicRefresh(Duration.of(10, ChronoUnit.MINUTES)) - .enableAdaptiveRefreshTrigger(ClusterTopologyRefreshOptions.RefreshTrigger.MOVED_REDIRECT, ClusterTopologyRefreshOptions.RefreshTrigger.PERSISTENT_RECONNECTS) - .adaptiveRefreshTriggersTimeout(Duration.of(30, ChronoUnit.SECONDS)) + .enablePeriodicRefresh(Duration.ofMinutes(10)) + .enableAdaptiveRefreshTrigger(ClusterTopologyRefreshOptions.RefreshTrigger.values()) + .adaptiveRefreshTriggersTimeout(Duration.ofSeconds(30)) .build(); ClusterClientOptions clusterClientOptions = ClusterClientOptions.builder() .autoReconnect(true) @@ -163,7 +165,7 @@ private AbstractRedisClient getLettuceRedisClient(String host, Integer port, Str WatcherConstant.REDIS_URI_PREFIX.concat(nodes); logger.info("Redis Cluster Uri: {}", redisUri); List redisURIList = RedisClusterURIUtil.toRedisURIs(URI.create(redisUri)); - RedisClusterClient redisClusterClient = RedisClusterClient.create(clientResources, redisURIList); + RedisClusterClient redisClusterClient = RedisClusterClient.create(clientResources, redisURIList.get(0)); redisClusterClient.setOptions(clusterClientOptions); return redisClusterClient; } diff --git a/src/test/java/org/casbin/test/LettuceRedisWatcherTest.java b/src/test/java/org/casbin/test/LettuceRedisWatcherTest.java index 71ee15d..e3069ab 100644 --- a/src/test/java/org/casbin/test/LettuceRedisWatcherTest.java +++ b/src/test/java/org/casbin/test/LettuceRedisWatcherTest.java @@ -25,7 +25,7 @@ public void initWatcher() { public void initClusterWatcher() { String redisTopic = "jcasbin-topic"; - // modify your cluster nodes + // modify your cluster nodes. any one of these nodes. this.lettuceRedisWatcher = new LettuceRedisWatcher("192.168.1.234:6380,192.168.1.234:6381,192.168.1.234:6382", redisTopic, 2000, "123456"); Enforcer enforcer = new Enforcer(); enforcer.setWatcher(this.lettuceRedisWatcher); @@ -35,7 +35,6 @@ public void initClusterWatcher() { public void testUpdate() throws InterruptedException { // this.initClusterWatcher(); this.lettuceRedisWatcher.update(); - Thread.sleep(100); } @Test @@ -44,7 +43,6 @@ public void testConsumerCallback() throws InterruptedException { // while (true) { this.lettuceRedisWatcher.setUpdateCallback((s) -> System.out.println(s)); this.lettuceRedisWatcher.update(); - Thread.sleep(100); // } }