Skip to content

Commit

Permalink
Unbound SerializingExecutors used for watch events
Browse files Browse the repository at this point in the history
Fixes possible blocking/timeouts on network threads which break watch
streams

Also:
- Increase max shared event loop group size from 4 to 8
- Change SerializingExecutor to throw
RejectedExecutionException instead of blocking when bounded and full
- Minor tweaks to comments
  • Loading branch information
njhill committed Jul 9, 2018
1 parent ad9a66f commit 20e5aba
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 27 deletions.
3 changes: 2 additions & 1 deletion src/main/java/com/ibm/etcd/client/EtcdClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ public EtcdClient build() {
}

private static int defaultThreadCount() {
return Math.min(4, Runtime.getRuntime().availableProcessors());
return Math.min(8, Runtime.getRuntime().availableProcessors());
}

public static Builder forEndpoint(String host, int port) {
Expand Down Expand Up @@ -342,6 +342,7 @@ public void close() {
});
}

// used only in clean shutdown logic
final Iterable<SingleThreadEventLoop> eventLoops;

/**
Expand Down
10 changes: 7 additions & 3 deletions src/main/java/com/ibm/etcd/client/GrpcClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -322,16 +322,16 @@ class ResilientBiDiStream<ReqT,RespT> {
*
* @param method
* @param respStream
* @param responseExecutor must be serialized
* @param responseExecutor
*/
public ResilientBiDiStream(MethodDescriptor<ReqT,RespT> method,
ResilientResponseObserver<ReqT,RespT> respStream,
Executor responseExecutor) {
this.method = method;
this.respStream = respStream;
this.responseExecutor = serialized(responseExecutor != null
? responseExecutor : userExecutor, 0);
this.requestExecutor = !sendViaEventLoop ? null : serialized(ses, 0);
? responseExecutor : userExecutor);
this.requestExecutor = !sendViaEventLoop ? null : serialized(ses);
}

// must only be called once - enforcement logic omitted since private
Expand Down Expand Up @@ -629,6 +629,10 @@ public static <I> I sentinel(Class<I> intface) {
});
}

public static Executor serialized(Executor parent) {
return serialized(parent, 0);
}

public static Executor serialized(Executor parent, int bufferSize) {
return parent instanceof SerializingExecutor
|| parent instanceof io.grpc.internal.SerializingExecutor
Expand Down
16 changes: 6 additions & 10 deletions src/main/java/com/ibm/etcd/client/SerializingExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@
package com.ibm.etcd.client;

import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.locks.ReentrantLock;

import org.slf4j.Logger;
Expand All @@ -35,7 +36,6 @@ public class SerializingExecutor implements Executor {

private final Executor sharedPool;
private final Queue<Runnable> workQueue;
private final boolean bounded;
private volatile boolean scheduled = false;

public SerializingExecutor(Executor parentPool) {
Expand All @@ -45,8 +45,7 @@ public SerializingExecutor(Executor parentPool) {
public SerializingExecutor(Executor parentPool, int capacity) {
if(parentPool == null) throw new NullPointerException();
this.sharedPool = parentPool;
this.bounded = capacity > 0;
this.workQueue = bounded ? new ArrayBlockingQueue<>(capacity)
this.workQueue = capacity > 0 ? new LinkedBlockingQueue<>(capacity)
: new ConcurrentLinkedQueue<>();
}

Expand Down Expand Up @@ -89,12 +88,9 @@ class TaskRun extends ReentrantLock implements Runnable {

@Override
public void execute(Runnable command) {
if(bounded) try {
((ArrayBlockingQueue<Runnable>)workQueue).put(command);
} catch (InterruptedException e) {
throw new RuntimeException(e); //TODO TBD
}
else workQueue.offer(command);
if(!workQueue.offer(command)) {
throw new RejectedExecutionException("SerializingExecutor work queue full");
}

if(!scheduled) {
boolean doit = false;
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/com/ibm/etcd/client/lease/EtcdLeaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ public class EtcdLeaseClient implements LeaseClient, Closeable {
public EtcdLeaseClient(GrpcClient client) {
this.client = client;
this.ses = client.getInternalExecutor();
this.kaReqExecutor = GrpcClient.serialized(ses, 0);
this.respExecutor = GrpcClient.serialized(ses, 0);
this.kaReqExecutor = GrpcClient.serialized(ses);
this.respExecutor = GrpcClient.serialized(ses);
}

// ------ simple lease APIs
Expand Down Expand Up @@ -305,7 +305,7 @@ public LeaseRecord(long leaseId,
this.observers = observer == null ? new CopyOnWriteArrayList<>()
: new CopyOnWriteArrayList<>(Collections.singletonList(observer));
this.eventLoop = GrpcClient.serialized(executor != null ? executor
: client.getResponseExecutor(), 0);
: client.getResponseExecutor());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public class PersistentLeaseKey extends AbstractFuture<ByteString> implements Au
public PersistentLeaseKey(EtcdClient client, PersistentLease lease,
ByteString key, ByteString defaultValue, RangeCache rangeCache) {
this.client = client;
//TODO if rangeCache != null, verify key lies within it's range
//TODO if rangeCache != null, verify key lies within its range
this.rangeCache = rangeCache;
this.lease = lease;
this.key = key;
Expand Down Expand Up @@ -108,7 +108,7 @@ protected boolean isActive() {
* @param client
* @param key
* @param defaultValue
* @param rangeCache
* @param rangeCache optional, may be null
*/
public PersistentLeaseKey(EtcdClient client,
ByteString key, ByteString defaultValue, RangeCache rangeCache) {
Expand All @@ -119,7 +119,7 @@ public synchronized void start() {
if(executor != null) throw new IllegalStateException("already started");
if(closeFuture != null) throw new IllegalStateException("closed");
//TODO TBD or have lease expose its response executor
executor = GrpcClient.serialized(client.getExecutor(), 0);
executor = GrpcClient.serialized(client.getExecutor());
if(lease == null) lease = client.getSessionLease();
lease.addStateObserver(stateObserver, true);
}
Expand Down
12 changes: 7 additions & 5 deletions src/main/java/com/ibm/etcd/client/utils/RangeCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public RangeCache(EtcdClient client, ByteString fromKey, ByteString toKey, boole
int diff = Long.compare(kv1.getModRevision(), kv2.getModRevision());
return diff != 0 ? diff : KeyUtils.compareByteStrings(kv1.getKey(), kv2.getKey());
});
this.listenerExecutor = GrpcClient.serialized(client.getExecutor(), 0);
this.listenerExecutor = GrpcClient.serialized(client.getExecutor());
}

/**
Expand Down Expand Up @@ -211,8 +211,8 @@ protected ListenableFuture<Boolean> fullRefreshCache() {
Watch newWatch = kvClient.watch(fromKey).rangeEnd(toKey) //.prevKv() //TODO TBD
.progressNotify().startRevision(snapshotRev + 1).executor(listenerExecutor)
.start(new StreamObserver<WatchUpdate>() {

@Override public void onNext(WatchUpdate update) {
@Override
public void onNext(WatchUpdate update) {
List<Event> events = update.getEvents();
int eventCount = events != null ? events.size() : 0;
if(eventCount > 0) for(Event event : events) {
Expand All @@ -234,7 +234,8 @@ protected ListenableFuture<Boolean> fullRefreshCache() {
revisionUpdate(eventCount == 0 ? update.getHeader().getRevision() - 1L
: events.get(eventCount-1).getKv().getModRevision());
}
@Override public void onCompleted() {
@Override
public void onCompleted() {
// should only happen after external close()
if(!closed) {
if(!client.isClosed()) {
Expand All @@ -243,7 +244,8 @@ protected ListenableFuture<Boolean> fullRefreshCache() {
close();
}
}
@Override public void onError(Throwable t) {
@Override
public void onError(Throwable t) {
logger.error("Watch failed with exception ", t);
if(t instanceof RevisionCompactedException) synchronized(RangeCache.this) {
// fail if happens during start, otherwise refresh
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/ibm/etcd/client/watch/EtcdWatchClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public EtcdWatchClient(GrpcClient client) {
public EtcdWatchClient(GrpcClient client, Executor executor) {
this.client = client;
this.observerExecutor = executor != null ? executor : ForkJoinPool.commonPool();
this.eventLoop = GrpcClient.serialized(client.getInternalExecutor(), 128); // bounded for back-pressure
this.eventLoop = GrpcClient.serialized(client.getInternalExecutor());
}

/**
Expand All @@ -120,7 +120,7 @@ public WatcherRecord(WatchCreateRequest request,
long rev = request.getStartRevision();
this.upToRevision = rev - 1;
// bounded for back-pressure
this.watcherExecutor = GrpcClient.serialized(parentExecutor, 64);
this.watcherExecutor = GrpcClient.serialized(parentExecutor);
}

// null => cancelled (non-error)
Expand Down

0 comments on commit 20e5aba

Please sign in to comment.