diff --git a/src/main/java/com/ibm/etcd/client/EtcdClient.java b/src/main/java/com/ibm/etcd/client/EtcdClient.java index 8787668..8652e76 100644 --- a/src/main/java/com/ibm/etcd/client/EtcdClient.java +++ b/src/main/java/com/ibm/etcd/client/EtcdClient.java @@ -210,7 +210,7 @@ public EtcdClient build() { } private static int defaultThreadCount() { - return Math.min(4, Runtime.getRuntime().availableProcessors()); + return Math.min(8, Runtime.getRuntime().availableProcessors()); } public static Builder forEndpoint(String host, int port) { @@ -342,6 +342,7 @@ public void close() { }); } + // used only in clean shutdown logic final Iterable eventLoops; /** diff --git a/src/main/java/com/ibm/etcd/client/GrpcClient.java b/src/main/java/com/ibm/etcd/client/GrpcClient.java index 8a590ef..c8a7247 100644 --- a/src/main/java/com/ibm/etcd/client/GrpcClient.java +++ b/src/main/java/com/ibm/etcd/client/GrpcClient.java @@ -322,7 +322,7 @@ class ResilientBiDiStream { * * @param method * @param respStream - * @param responseExecutor must be serialized + * @param responseExecutor */ public ResilientBiDiStream(MethodDescriptor method, ResilientResponseObserver respStream, @@ -330,8 +330,8 @@ public ResilientBiDiStream(MethodDescriptor method, this.method = method; this.respStream = respStream; this.responseExecutor = serialized(responseExecutor != null - ? responseExecutor : userExecutor, 0); - this.requestExecutor = !sendViaEventLoop ? null : serialized(ses, 0); + ? responseExecutor : userExecutor); + this.requestExecutor = !sendViaEventLoop ? null : serialized(ses); } // must only be called once - enforcement logic omitted since private @@ -629,6 +629,10 @@ public static I sentinel(Class intface) { }); } + public static Executor serialized(Executor parent) { + return serialized(parent, 0); + } + public static Executor serialized(Executor parent, int bufferSize) { return parent instanceof SerializingExecutor || parent instanceof io.grpc.internal.SerializingExecutor diff --git a/src/main/java/com/ibm/etcd/client/SerializingExecutor.java b/src/main/java/com/ibm/etcd/client/SerializingExecutor.java index 245a1af..6211eed 100644 --- a/src/main/java/com/ibm/etcd/client/SerializingExecutor.java +++ b/src/main/java/com/ibm/etcd/client/SerializingExecutor.java @@ -16,9 +16,10 @@ package com.ibm.etcd.client; import java.util.Queue; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.locks.ReentrantLock; import org.slf4j.Logger; @@ -35,7 +36,6 @@ public class SerializingExecutor implements Executor { private final Executor sharedPool; private final Queue workQueue; - private final boolean bounded; private volatile boolean scheduled = false; public SerializingExecutor(Executor parentPool) { @@ -45,8 +45,7 @@ public SerializingExecutor(Executor parentPool) { public SerializingExecutor(Executor parentPool, int capacity) { if(parentPool == null) throw new NullPointerException(); this.sharedPool = parentPool; - this.bounded = capacity > 0; - this.workQueue = bounded ? new ArrayBlockingQueue<>(capacity) + this.workQueue = capacity > 0 ? new LinkedBlockingQueue<>(capacity) : new ConcurrentLinkedQueue<>(); } @@ -89,12 +88,9 @@ class TaskRun extends ReentrantLock implements Runnable { @Override public void execute(Runnable command) { - if(bounded) try { - ((ArrayBlockingQueue)workQueue).put(command); - } catch (InterruptedException e) { - throw new RuntimeException(e); //TODO TBD - } - else workQueue.offer(command); + if(!workQueue.offer(command)) { + throw new RejectedExecutionException("SerializingExecutor work queue full"); + } if(!scheduled) { boolean doit = false; diff --git a/src/main/java/com/ibm/etcd/client/lease/EtcdLeaseClient.java b/src/main/java/com/ibm/etcd/client/lease/EtcdLeaseClient.java index 1ceb35c..7b8b1e8 100644 --- a/src/main/java/com/ibm/etcd/client/lease/EtcdLeaseClient.java +++ b/src/main/java/com/ibm/etcd/client/lease/EtcdLeaseClient.java @@ -86,8 +86,8 @@ public class EtcdLeaseClient implements LeaseClient, Closeable { public EtcdLeaseClient(GrpcClient client) { this.client = client; this.ses = client.getInternalExecutor(); - this.kaReqExecutor = GrpcClient.serialized(ses, 0); - this.respExecutor = GrpcClient.serialized(ses, 0); + this.kaReqExecutor = GrpcClient.serialized(ses); + this.respExecutor = GrpcClient.serialized(ses); } // ------ simple lease APIs @@ -305,7 +305,7 @@ public LeaseRecord(long leaseId, this.observers = observer == null ? new CopyOnWriteArrayList<>() : new CopyOnWriteArrayList<>(Collections.singletonList(observer)); this.eventLoop = GrpcClient.serialized(executor != null ? executor - : client.getResponseExecutor(), 0); + : client.getResponseExecutor()); } @Override diff --git a/src/main/java/com/ibm/etcd/client/utils/PersistentLeaseKey.java b/src/main/java/com/ibm/etcd/client/utils/PersistentLeaseKey.java index 6756a7b..e597084 100644 --- a/src/main/java/com/ibm/etcd/client/utils/PersistentLeaseKey.java +++ b/src/main/java/com/ibm/etcd/client/utils/PersistentLeaseKey.java @@ -80,7 +80,7 @@ public class PersistentLeaseKey extends AbstractFuture implements Au public PersistentLeaseKey(EtcdClient client, PersistentLease lease, ByteString key, ByteString defaultValue, RangeCache rangeCache) { this.client = client; - //TODO if rangeCache != null, verify key lies within it's range + //TODO if rangeCache != null, verify key lies within its range this.rangeCache = rangeCache; this.lease = lease; this.key = key; @@ -108,7 +108,7 @@ protected boolean isActive() { * @param client * @param key * @param defaultValue - * @param rangeCache + * @param rangeCache optional, may be null */ public PersistentLeaseKey(EtcdClient client, ByteString key, ByteString defaultValue, RangeCache rangeCache) { @@ -119,7 +119,7 @@ public synchronized void start() { if(executor != null) throw new IllegalStateException("already started"); if(closeFuture != null) throw new IllegalStateException("closed"); //TODO TBD or have lease expose its response executor - executor = GrpcClient.serialized(client.getExecutor(), 0); + executor = GrpcClient.serialized(client.getExecutor()); if(lease == null) lease = client.getSessionLease(); lease.addStateObserver(stateObserver, true); } diff --git a/src/main/java/com/ibm/etcd/client/utils/RangeCache.java b/src/main/java/com/ibm/etcd/client/utils/RangeCache.java index 6d6a552..bd8277d 100644 --- a/src/main/java/com/ibm/etcd/client/utils/RangeCache.java +++ b/src/main/java/com/ibm/etcd/client/utils/RangeCache.java @@ -137,7 +137,7 @@ public RangeCache(EtcdClient client, ByteString fromKey, ByteString toKey, boole int diff = Long.compare(kv1.getModRevision(), kv2.getModRevision()); return diff != 0 ? diff : KeyUtils.compareByteStrings(kv1.getKey(), kv2.getKey()); }); - this.listenerExecutor = GrpcClient.serialized(client.getExecutor(), 0); + this.listenerExecutor = GrpcClient.serialized(client.getExecutor()); } /** @@ -211,8 +211,8 @@ protected ListenableFuture fullRefreshCache() { Watch newWatch = kvClient.watch(fromKey).rangeEnd(toKey) //.prevKv() //TODO TBD .progressNotify().startRevision(snapshotRev + 1).executor(listenerExecutor) .start(new StreamObserver() { - - @Override public void onNext(WatchUpdate update) { + @Override + public void onNext(WatchUpdate update) { List events = update.getEvents(); int eventCount = events != null ? events.size() : 0; if(eventCount > 0) for(Event event : events) { @@ -234,7 +234,8 @@ protected ListenableFuture fullRefreshCache() { revisionUpdate(eventCount == 0 ? update.getHeader().getRevision() - 1L : events.get(eventCount-1).getKv().getModRevision()); } - @Override public void onCompleted() { + @Override + public void onCompleted() { // should only happen after external close() if(!closed) { if(!client.isClosed()) { @@ -243,7 +244,8 @@ protected ListenableFuture fullRefreshCache() { close(); } } - @Override public void onError(Throwable t) { + @Override + public void onError(Throwable t) { logger.error("Watch failed with exception ", t); if(t instanceof RevisionCompactedException) synchronized(RangeCache.this) { // fail if happens during start, otherwise refresh diff --git a/src/main/java/com/ibm/etcd/client/watch/EtcdWatchClient.java b/src/main/java/com/ibm/etcd/client/watch/EtcdWatchClient.java index fc2aa84..4ea9722 100644 --- a/src/main/java/com/ibm/etcd/client/watch/EtcdWatchClient.java +++ b/src/main/java/com/ibm/etcd/client/watch/EtcdWatchClient.java @@ -93,7 +93,7 @@ public EtcdWatchClient(GrpcClient client) { public EtcdWatchClient(GrpcClient client, Executor executor) { this.client = client; this.observerExecutor = executor != null ? executor : ForkJoinPool.commonPool(); - this.eventLoop = GrpcClient.serialized(client.getInternalExecutor(), 128); // bounded for back-pressure + this.eventLoop = GrpcClient.serialized(client.getInternalExecutor()); } /** @@ -120,7 +120,7 @@ public WatcherRecord(WatchCreateRequest request, long rev = request.getStartRevision(); this.upToRevision = rev - 1; // bounded for back-pressure - this.watcherExecutor = GrpcClient.serialized(parentExecutor, 64); + this.watcherExecutor = GrpcClient.serialized(parentExecutor); } // null => cancelled (non-error)