diff --git a/zuul-core/src/main/java/com/netflix/zuul/netty/connectionpool/DefaultClientChannelManager.java b/zuul-core/src/main/java/com/netflix/zuul/netty/connectionpool/DefaultClientChannelManager.java index daa9a9856f..9536706b23 100644 --- a/zuul-core/src/main/java/com/netflix/zuul/netty/connectionpool/DefaultClientChannelManager.java +++ b/zuul-core/src/main/java/com/netflix/zuul/netty/connectionpool/DefaultClientChannelManager.java @@ -63,7 +63,7 @@ public class DefaultClientChannelManager implements ClientChannelManager { public static final String METRIC_PREFIX = "connectionpool"; - private final Resolver dynamicServerResolver; + private final Resolver dynamicServerResolver; private final ConnectionPoolConfig connPoolConfig; private final IClientConfig clientConfig; private final Registry spectatorRegistry; @@ -100,40 +100,12 @@ public class DefaultClientChannelManager implements ClientChannelManager { public DefaultClientChannelManager( OriginName originName, IClientConfig clientConfig, Registry spectatorRegistry) { - this.originName = Objects.requireNonNull(originName, "originName"); - this.dynamicServerResolver = new DynamicServerResolver(clientConfig, new ServerPoolListener()); - - String metricId = originName.getMetricId(); - - this.clientConfig = clientConfig; - this.spectatorRegistry = spectatorRegistry; - this.perServerPools = new ConcurrentHashMap<>(200); - - this.connPoolConfig = new ConnectionPoolConfigImpl(originName, this.clientConfig); - - this.createNewConnCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_create", metricId); - this.createConnSucceededCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_create_success", metricId); - this.createConnFailedCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_create_fail", metricId); - - this.closeConnCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_close", metricId); - this.closeAbovePoolHighWaterMarkCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_closeAbovePoolHighWaterMark", metricId); - this.closeExpiredConnLifetimeCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_closeExpiredConnLifetime", metricId); - this.requestConnCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_request", metricId); - this.reuseConnCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_reuse", metricId); - this.releaseConnCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_release", metricId); - this.alreadyClosedCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_alreadyClosed", metricId); - this.connTakenFromPoolIsNotOpen = SpectatorUtils.newCounter(METRIC_PREFIX + "_fromPoolIsClosed", metricId); - this.maxConnsPerHostExceededCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_maxConnsPerHostExceeded", metricId); - this.closeWrtBusyConnCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_closeWrtBusyConnCounter", metricId); - this.connEstablishTimer = PercentileTimer.get(spectatorRegistry, spectatorRegistry.createId(METRIC_PREFIX + "_createTiming", "id", metricId)); - this.connsInPool = SpectatorUtils.newGauge(METRIC_PREFIX + "_inPool", metricId, new AtomicInteger()); - this.connsInUse = SpectatorUtils.newGauge(METRIC_PREFIX + "_inUse", metricId, new AtomicInteger()); + this(originName, clientConfig, new DynamicServerResolver(clientConfig), spectatorRegistry); } - @VisibleForTesting public DefaultClientChannelManager( OriginName originName, IClientConfig clientConfig, - Resolver resolver, Registry spectatorRegistry) { + Resolver resolver, Registry spectatorRegistry) { this.originName = Objects.requireNonNull(originName, "originName"); this.dynamicServerResolver = resolver; @@ -151,7 +123,7 @@ public DefaultClientChannelManager( this.closeConnCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_close", metricId); this.closeAbovePoolHighWaterMarkCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_closeAbovePoolHighWaterMark", metricId); - this.closeExpiredConnLifetimeCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "__closeExpiredConnLifetime", metricId); + this.closeExpiredConnLifetimeCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_closeExpiredConnLifetime", metricId); this.requestConnCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_request", metricId); this.reuseConnCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_reuse", metricId); this.releaseConnCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_release", metricId); @@ -167,6 +139,7 @@ public DefaultClientChannelManager( @Override public void init() { + dynamicServerResolver.setListener(new ServerPoolListener()); // Load channel initializer and conn factory. // We don't do this within the constructor because some subclass may not be initialized until post-construct. this.channelInitializer = createChannelInitializer(clientConfig, connPoolConfig, spectatorRegistry); @@ -413,7 +386,6 @@ protected IConnectionPool createConnectionPool( } final class ServerPoolListener implements ResolverListener { - @Override public void onChange(List removedSet) { if (!removedSet.isEmpty()) { @@ -427,7 +399,6 @@ public void onChange(List removedSet) { } } } - } @Override @@ -477,4 +448,5 @@ static SocketAddress pickAddressInternal(ResolverResult chosenServer, @Nullable protected SocketAddress pickAddress(DiscoveryResult chosenServer) { return pickAddressInternal(chosenServer, connPoolConfig.getOriginName()); } + } diff --git a/zuul-discovery/build.gradle b/zuul-discovery/build.gradle index f8ba8e4912..8d02e36938 100644 --- a/zuul-discovery/build.gradle +++ b/zuul-discovery/build.gradle @@ -5,7 +5,7 @@ dependencies { implementation libraries.guava implementation libraries.slf4j - implementation "com.netflix.ribbon:ribbon-loadbalancer:${versions_ribbon}" + api "com.netflix.ribbon:ribbon-loadbalancer:${versions_ribbon}" implementation "com.netflix.ribbon:ribbon-core:${versions_ribbon}" implementation "com.netflix.ribbon:ribbon-eureka:${versions_ribbon}" implementation "com.netflix.ribbon:ribbon-archaius:${versions_ribbon}" diff --git a/zuul-discovery/src/main/java/com/netflix/zuul/discovery/DynamicServerResolver.java b/zuul-discovery/src/main/java/com/netflix/zuul/discovery/DynamicServerResolver.java index 35761209c5..35860d8b86 100644 --- a/zuul-discovery/src/main/java/com/netflix/zuul/discovery/DynamicServerResolver.java +++ b/zuul-discovery/src/main/java/com/netflix/zuul/discovery/DynamicServerResolver.java @@ -28,9 +28,12 @@ import com.netflix.zuul.resolver.ResolverListener; import java.util.HashSet; import java.util.List; +import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; import javax.annotation.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * @author Argha C @@ -40,15 +43,37 @@ */ public class DynamicServerResolver implements Resolver { + private static final Logger LOG = LoggerFactory.getLogger(DynamicServerResolver.class); + private final DynamicServerListLoadBalancer loadBalancer; - ResolverListener listener; + private ResolverListener listener; + @Deprecated public DynamicServerResolver(IClientConfig clientConfig, ResolverListener listener) { this.loadBalancer = createLoadBalancer(clientConfig); this.loadBalancer.addServerListChangeListener(this::onUpdate); this.listener = listener; } + public DynamicServerResolver(IClientConfig clientConfig) { + this(createLoadBalancer(clientConfig)); + } + + public DynamicServerResolver(DynamicServerListLoadBalancer loadBalancer) { + this.loadBalancer = Objects.requireNonNull(loadBalancer); + } + + @Override + public void setListener(ResolverListener listener) { + if(this.listener != null) { + LOG.warn("Ignoring call to setListener, because a listener was already set"); + return; + } + + this.listener = Objects.requireNonNull(listener); + this.loadBalancer.addServerListChangeListener(this::onUpdate); + } + @Override public DiscoveryResult resolve(@Nullable Object key) { final Server server = loadBalancer.chooseServer(key); @@ -65,7 +90,7 @@ public void shutdown() { loadBalancer.shutdown(); } - private DynamicServerListLoadBalancer createLoadBalancer(IClientConfig clientConfig) { + private static DynamicServerListLoadBalancer createLoadBalancer(IClientConfig clientConfig) { //TODO(argha-c): Revisit this style of LB initialization post modularization. Ideally the LB should be pluggable. // Use a hard coded string for the LB default name to avoid a dependency on Ribbon classes. diff --git a/zuul-discovery/src/main/java/com/netflix/zuul/resolver/Resolver.java b/zuul-discovery/src/main/java/com/netflix/zuul/resolver/Resolver.java index 6559008562..7104b13f9e 100644 --- a/zuul-discovery/src/main/java/com/netflix/zuul/resolver/Resolver.java +++ b/zuul-discovery/src/main/java/com/netflix/zuul/resolver/Resolver.java @@ -42,4 +42,6 @@ public interface Resolver { * hook to perform activities on shutdown */ void shutdown(); + + default void setListener(ResolverListener listener) { } } diff --git a/zuul-discovery/src/test/java/com/netflix/zuul/discovery/DynamicServerResolverTest.java b/zuul-discovery/src/test/java/com/netflix/zuul/discovery/DynamicServerResolverTest.java index 69d5d1c3c2..2343f61cea 100644 --- a/zuul-discovery/src/test/java/com/netflix/zuul/discovery/DynamicServerResolverTest.java +++ b/zuul-discovery/src/test/java/com/netflix/zuul/discovery/DynamicServerResolverTest.java @@ -49,7 +49,8 @@ public List updatedList() { } final CustomListener listener = new CustomListener(); - final DynamicServerResolver resolver = new DynamicServerResolver(new DefaultClientConfigImpl(), listener); + final DynamicServerResolver resolver = new DynamicServerResolver(new DefaultClientConfigImpl()); + resolver.setListener(listener); final InstanceInfo first = Builder.newBuilder() .setAppName("zuul-discovery-1") @@ -73,11 +74,7 @@ public List updatedList() { @Test void properSentinelValueWhenServersUnavailable() { - final DynamicServerResolver resolver = new DynamicServerResolver(new DefaultClientConfigImpl(), new ResolverListener() { - @Override - public void onChange(List removedSet) { - } - }); + final DynamicServerResolver resolver = new DynamicServerResolver(new DefaultClientConfigImpl()); final DiscoveryResult nonExistentServer = resolver.resolve(null);