From a56490af94edc3fee040526c3fb09809e76a0672 Mon Sep 17 00:00:00 2001 From: lutovich Date: Thu, 14 Dec 2017 14:29:35 +0100 Subject: [PATCH] Use connect timeout in Bolt and TLS handshake Previously configured connection timeout has only been used to limit amount of time it takes to establish a TCP connection. This is not the only thing driver does to establish a logical connection. It also executes TLS handshake (if configured to use encryption) and Bolt handshake to negotiate protocol version with the database. Later two steps perform reads without any timeout. This could be a problem when database does not respond in time. Also driver might be simply connecting to something that is not the database and never responds. This commit makes driver use configured value of connect timeout as read timeout for TLS and Bolt handshakes. So both will not hang forever when other side does not respond. Default value of connection timeout is 5 seconds. With this commit driver will wait up to 5 seconds for TLS and Bolt handshakes. Timeout is enforced by `ConnectTimeoutHandler` which is added to the channel pipeline when connection is established and removed from it when Bolt handshake completes. --- .../internal/async/ChannelConnectorImpl.java | 37 +++++++- .../async/NettyChannelInitializer.java | 9 +- .../async/inbound/ConnectTimeoutHandler.java | 58 ++++++++++++ .../async/ChannelConnectorImplTest.java | 88 ++++++++++++++++--- .../async/NettyChannelInitializerTest.java | 39 ++++++-- .../inbound/ConnectTimeoutHandlerTest.java | 88 +++++++++++++++++++ .../neo4j/driver/v1/GraphDatabaseTest.java | 52 +++++++++++ .../driver/v1/integration/SessionIT.java | 49 ++++++++++- 8 files changed, 395 insertions(+), 25 deletions(-) create mode 100644 driver/src/main/java/org/neo4j/driver/internal/async/inbound/ConnectTimeoutHandler.java create mode 100644 driver/src/test/java/org/neo4j/driver/internal/async/inbound/ConnectTimeoutHandlerTest.java diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/ChannelConnectorImpl.java b/driver/src/main/java/org/neo4j/driver/internal/async/ChannelConnectorImpl.java index 5f7b244288..12c41c80f4 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/ChannelConnectorImpl.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/ChannelConnectorImpl.java @@ -22,12 +22,14 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; import java.util.Map; import org.neo4j.driver.internal.BoltServerAddress; import org.neo4j.driver.internal.ConnectionSettings; +import org.neo4j.driver.internal.async.inbound.ConnectTimeoutHandler; import org.neo4j.driver.internal.security.InternalAuthToken; import org.neo4j.driver.internal.security.SecurityPlan; import org.neo4j.driver.internal.util.Clock; @@ -71,7 +73,7 @@ public ChannelConnectorImpl( ConnectionSettings connectionSettings, SecurityPlan public ChannelFuture connect( BoltServerAddress address, Bootstrap bootstrap ) { bootstrap.option( ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMillis ); - bootstrap.handler( new NettyChannelInitializer( address, securityPlan, clock, logging ) ); + bootstrap.handler( new NettyChannelInitializer( address, securityPlan, connectTimeoutMillis, clock, logging ) ); ChannelFuture channelConnected = bootstrap.connect( address.toSocketAddress() ); @@ -79,12 +81,39 @@ public ChannelFuture connect( BoltServerAddress address, Bootstrap bootstrap ) ChannelPromise handshakeCompleted = channel.newPromise(); ChannelPromise connectionInitialized = channel.newPromise(); + installChannelConnectedListeners( address, channelConnected, handshakeCompleted ); + installHandshakeCompletedListeners( handshakeCompleted, connectionInitialized ); + + return connectionInitialized; + } + + private void installChannelConnectedListeners( BoltServerAddress address, ChannelFuture channelConnected, + ChannelPromise handshakeCompleted ) + { + ChannelPipeline pipeline = channelConnected.channel().pipeline(); + + // add timeout handler to the pipeline when channel is connected. it's needed to limit amount of time code + // spends in TLS and Bolt handshakes. prevents infinite waiting when database does not respond + channelConnected.addListener( future -> + pipeline.addFirst( new ConnectTimeoutHandler( connectTimeoutMillis ) ) ); + + // add listener that sends Bolt handshake bytes when channel is connected channelConnected.addListener( new ChannelConnectedListener( address, pipelineBuilder, handshakeCompleted, logging ) ); - handshakeCompleted.addListener( - new HandshakeCompletedListener( userAgent, authToken, connectionInitialized ) ); + } - return connectionInitialized; + private void installHandshakeCompletedListeners( ChannelPromise handshakeCompleted, + ChannelPromise connectionInitialized ) + { + ChannelPipeline pipeline = handshakeCompleted.channel().pipeline(); + + // remove timeout handler from the pipeline once TLS and Bolt handshakes are completed. regular protocol + // messages will flow next and we do not want to have read timeout for them + handshakeCompleted.addListener( future -> pipeline.remove( ConnectTimeoutHandler.class ) ); + + // add listener that sends an INIT message. connection is now fully established. channel pipeline if fully + // set to send/receive messages for a selected protocol version + handshakeCompleted.addListener( new HandshakeCompletedListener( userAgent, authToken, connectionInitialized ) ); } private static Map tokenAsMap( AuthToken token ) diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/NettyChannelInitializer.java b/driver/src/main/java/org/neo4j/driver/internal/async/NettyChannelInitializer.java index c025cb0ba5..233464cbdc 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/NettyChannelInitializer.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/NettyChannelInitializer.java @@ -39,13 +39,16 @@ public class NettyChannelInitializer extends ChannelInitializer { private final BoltServerAddress address; private final SecurityPlan securityPlan; + private final int connectTimeoutMillis; private final Clock clock; private final Logging logging; - public NettyChannelInitializer( BoltServerAddress address, SecurityPlan securityPlan, Clock clock, Logging logging ) + public NettyChannelInitializer( BoltServerAddress address, SecurityPlan securityPlan, int connectTimeoutMillis, + Clock clock, Logging logging ) { this.address = address; this.securityPlan = securityPlan; + this.connectTimeoutMillis = connectTimeoutMillis; this.clock = clock; this.logging = logging; } @@ -65,7 +68,9 @@ protected void initChannel( Channel channel ) private SslHandler createSslHandler() { SSLEngine sslEngine = createSslEngine(); - return new SslHandler( sslEngine ); + SslHandler sslHandler = new SslHandler( sslEngine ); + sslHandler.setHandshakeTimeoutMillis( connectTimeoutMillis ); + return sslHandler; } private SSLEngine createSslEngine() diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/ConnectTimeoutHandler.java b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/ConnectTimeoutHandler.java new file mode 100644 index 0000000000..2980cb814b --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/ConnectTimeoutHandler.java @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.async.inbound; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.timeout.ReadTimeoutHandler; + +import java.util.concurrent.TimeUnit; + +import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; + +/** + * Handler needed to limit amount of time connection performs TLS and Bolt handshakes. + * It should only be used when connection is established and removed from the pipeline afterwards. + * Otherwise it will make long running queries fail. + */ +public class ConnectTimeoutHandler extends ReadTimeoutHandler +{ + private final long timeoutMillis; + private boolean triggered; + + public ConnectTimeoutHandler( long timeoutMillis ) + { + super( timeoutMillis, TimeUnit.MILLISECONDS ); + this.timeoutMillis = timeoutMillis; + } + + @Override + protected void readTimedOut( ChannelHandlerContext ctx ) + { + if ( !triggered ) + { + triggered = true; + ctx.fireExceptionCaught( unableToConnectError() ); + } + } + + private ServiceUnavailableException unableToConnectError() + { + return new ServiceUnavailableException( "Unable to establish connection in " + timeoutMillis + "ms" ); + } +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/ChannelConnectorImplTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/ChannelConnectorImplTest.java index 3aaf496c50..3780f311c9 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/ChannelConnectorImplTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/ChannelConnectorImplTest.java @@ -21,17 +21,24 @@ import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelPipeline; +import io.netty.handler.ssl.SslHandler; import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.RuleChain; +import org.junit.rules.Timeout; +import java.io.IOException; import java.net.ConnectException; +import java.net.ServerSocket; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import org.neo4j.driver.internal.BoltServerAddress; import org.neo4j.driver.internal.ConnectionSettings; +import org.neo4j.driver.internal.async.inbound.ConnectTimeoutHandler; import org.neo4j.driver.internal.security.SecurityPlan; import org.neo4j.driver.internal.util.FakeClock; import org.neo4j.driver.v1.AuthToken; @@ -42,7 +49,9 @@ import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.startsWith; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -52,19 +61,20 @@ public class ChannelConnectorImplTest { + private final TestNeo4j neo4j = new TestNeo4j(); @Rule - public final TestNeo4j neo4j = new TestNeo4j(); + public final RuleChain ruleChain = RuleChain.outerRule( Timeout.seconds( 20 ) ).around( neo4j ); private Bootstrap bootstrap; @Before - public void setUp() throws Exception + public void setUp() { bootstrap = BootstrapFactory.newBootstrap( 1 ); } @After - public void tearDown() throws Exception + public void tearDown() { if ( bootstrap != null ) { @@ -75,7 +85,7 @@ public void tearDown() throws Exception @Test public void shouldConnect() throws Exception { - ChannelConnectorImpl connector = newConnector( neo4j.authToken() ); + ChannelConnector connector = newConnector( neo4j.authToken() ); ChannelFuture channelFuture = connector.connect( neo4j.address(), bootstrap ); assertTrue( channelFuture.await( 10, TimeUnit.SECONDS ) ); @@ -85,10 +95,26 @@ public void shouldConnect() throws Exception assertTrue( channel.isActive() ); } + @Test + public void shouldSetupHandlers() throws Exception + { + ChannelConnector connector = newConnector( neo4j.authToken(), SecurityPlan.forAllCertificates(), 10_000 ); + + ChannelFuture channelFuture = connector.connect( neo4j.address(), bootstrap ); + assertTrue( channelFuture.await( 10, TimeUnit.SECONDS ) ); + + Channel channel = channelFuture.channel(); + ChannelPipeline pipeline = channel.pipeline(); + assertTrue( channel.isActive() ); + + assertNotNull( pipeline.get( SslHandler.class ) ); + assertNull( pipeline.get( ConnectTimeoutHandler.class ) ); + } + @Test public void shouldFailToConnectToWrongAddress() throws Exception { - ChannelConnectorImpl connector = newConnector( neo4j.authToken() ); + ChannelConnector connector = newConnector( neo4j.authToken() ); ChannelFuture channelFuture = connector.connect( new BoltServerAddress( "wrong-localhost" ), bootstrap ); assertTrue( channelFuture.await( 10, TimeUnit.SECONDS ) ); @@ -112,7 +138,7 @@ public void shouldFailToConnectToWrongAddress() throws Exception public void shouldFailToConnectWithWrongCredentials() throws Exception { AuthToken authToken = AuthTokens.basic( "neo4j", "wrong-password" ); - ChannelConnectorImpl connector = newConnector( authToken ); + ChannelConnector connector = newConnector( authToken ); ChannelFuture channelFuture = connector.connect( neo4j.address(), bootstrap ); assertTrue( channelFuture.await( 10, TimeUnit.SECONDS ) ); @@ -131,10 +157,10 @@ public void shouldFailToConnectWithWrongCredentials() throws Exception assertFalse( channel.isActive() ); } - @Test( timeout = 10000 ) + @Test public void shouldEnforceConnectTimeout() throws Exception { - ChannelConnectorImpl connector = newConnector( neo4j.authToken(), 1000 ); + ChannelConnector connector = newConnector( neo4j.authToken(), 1000 ); // try connect to a non-routable ip address 10.0.0.0, it will never respond ChannelFuture channelFuture = connector.connect( new BoltServerAddress( "10.0.0.0" ), bootstrap ); @@ -151,6 +177,41 @@ public void shouldEnforceConnectTimeout() throws Exception } } + @Test + public void shouldFailWhenProtocolNegotiationTakesTooLong() throws Exception + { + // run without TLS so that Bolt handshake is the very first operation after connection is established + testReadTimeoutOnConnect( SecurityPlan.insecure() ); + } + + @Test + public void shouldFailWhenTLSHandshakeTakesTooLong() throws Exception + { + // run with TLS so that TLS handshake is the very first operation after connection is established + testReadTimeoutOnConnect( SecurityPlan.forAllCertificates() ); + } + + private void testReadTimeoutOnConnect( SecurityPlan securityPlan ) throws IOException + { + try ( ServerSocket server = new ServerSocket( 0 ) ) // server that accepts connections but does not reply + { + int timeoutMillis = 1_000; + BoltServerAddress address = new BoltServerAddress( "localhost", server.getLocalPort() ); + ChannelConnector connector = newConnector( neo4j.authToken(), securityPlan, timeoutMillis ); + + ChannelFuture channelFuture = connector.connect( address, bootstrap ); + try + { + await( channelFuture ); + fail( "Exception expected" ); + } + catch ( ServiceUnavailableException e ) + { + assertEquals( e.getMessage(), "Unable to establish connection in " + timeoutMillis + "ms" ); + } + } + } + private ChannelConnectorImpl newConnector( AuthToken authToken ) throws Exception { return newConnector( authToken, Integer.MAX_VALUE ); @@ -158,8 +219,13 @@ private ChannelConnectorImpl newConnector( AuthToken authToken ) throws Exceptio private ChannelConnectorImpl newConnector( AuthToken authToken, int connectTimeoutMillis ) throws Exception { - ConnectionSettings settings = new ConnectionSettings( authToken, 1000 ); - return new ChannelConnectorImpl( settings, SecurityPlan.forAllCertificates(), DEV_NULL_LOGGING, - new FakeClock() ); + return newConnector( authToken, SecurityPlan.forAllCertificates(), connectTimeoutMillis ); + } + + private ChannelConnectorImpl newConnector( AuthToken authToken, SecurityPlan securityPlan, + int connectTimeoutMillis ) + { + ConnectionSettings settings = new ConnectionSettings( authToken, connectTimeoutMillis ); + return new ChannelConnectorImpl( settings, securityPlan, DEV_NULL_LOGGING, new FakeClock() ); } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/NettyChannelInitializerTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/NettyChannelInitializerTest.java index 0bd4ee8b8f..d2d770842d 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/NettyChannelInitializerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/NettyChannelInitializerTest.java @@ -52,8 +52,7 @@ public void tearDown() public void shouldAddSslHandlerWhenRequiresEncryption() throws Exception { SecurityPlan security = SecurityPlan.forAllCertificates(); - NettyChannelInitializer initializer = new NettyChannelInitializer( LOCAL_DEFAULT, security, new FakeClock(), - DEV_NULL_LOGGING ); + NettyChannelInitializer initializer = newInitializer( security ); initializer.initChannel( channel ); @@ -64,21 +63,34 @@ public void shouldAddSslHandlerWhenRequiresEncryption() throws Exception public void shouldNotAddSslHandlerWhenDoesNotRequireEncryption() { SecurityPlan security = SecurityPlan.insecure(); - NettyChannelInitializer initializer = new NettyChannelInitializer( LOCAL_DEFAULT, security, new FakeClock(), - DEV_NULL_LOGGING ); + NettyChannelInitializer initializer = newInitializer( security ); initializer.initChannel( channel ); assertNull( channel.pipeline().get( SslHandler.class ) ); } + @Test + public void shouldAddSslHandlerWithHandshakeTimeout() throws Exception + { + int timeoutMillis = 424242; + SecurityPlan security = SecurityPlan.forAllCertificates(); + NettyChannelInitializer initializer = newInitializer( security, timeoutMillis ); + + initializer.initChannel( channel ); + + SslHandler sslHandler = channel.pipeline().get( SslHandler.class ); + assertNotNull( sslHandler ); + assertEquals( timeoutMillis, sslHandler.getHandshakeTimeoutMillis() ); + } + @Test public void shouldUpdateChannelAttributes() { Clock clock = mock( Clock.class ); when( clock.millis() ).thenReturn( 42L ); SecurityPlan security = SecurityPlan.insecure(); - NettyChannelInitializer initializer = new NettyChannelInitializer( LOCAL_DEFAULT, security, clock, DEV_NULL_LOGGING ); + NettyChannelInitializer initializer = newInitializer( security, Integer.MAX_VALUE, clock ); initializer.initChannel( channel ); @@ -86,4 +98,21 @@ public void shouldUpdateChannelAttributes() assertEquals( 42L, creationTimestamp( channel ) ); assertNotNull( messageDispatcher( channel ) ); } + + private static NettyChannelInitializer newInitializer( SecurityPlan securityPlan ) + { + return newInitializer( securityPlan, Integer.MAX_VALUE ); + } + + private static NettyChannelInitializer newInitializer( SecurityPlan securityPlan, int connectTimeoutMillis ) + { + return newInitializer( securityPlan, connectTimeoutMillis, new FakeClock() ); + } + + private static NettyChannelInitializer newInitializer( SecurityPlan securityPlan, int connectTimeoutMillis, + Clock clock ) + { + return new NettyChannelInitializer( LOCAL_DEFAULT, securityPlan, connectTimeoutMillis, clock, + DEV_NULL_LOGGING ); + } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/inbound/ConnectTimeoutHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/inbound/ConnectTimeoutHandlerTest.java new file mode 100644 index 0000000000..ed7ab5227f --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/async/inbound/ConnectTimeoutHandlerTest.java @@ -0,0 +1,88 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.async.inbound; + +import io.netty.channel.embedded.EmbeddedChannel; +import org.junit.After; +import org.junit.Test; + +import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class ConnectTimeoutHandlerTest +{ + private final EmbeddedChannel channel = new EmbeddedChannel(); + + @After + public void tearDown() + { + channel.finishAndReleaseAll(); + } + + @Test + public void shouldFireExceptionOnTimeout() throws Exception + { + int timeoutMillis = 100; + channel.pipeline().addLast( new ConnectTimeoutHandler( timeoutMillis ) ); + + // sleep for more than the timeout value + Thread.sleep( timeoutMillis * 4 ); + channel.runPendingTasks(); + + try + { + channel.checkException(); + fail( "Exception expected" ); + } + catch ( ServiceUnavailableException e ) + { + assertEquals( e.getMessage(), "Unable to establish connection in " + timeoutMillis + "ms" ); + } + } + + @Test + public void shouldNotFireExceptionMultipleTimes() throws Exception + { + int timeoutMillis = 70; + channel.pipeline().addLast( new ConnectTimeoutHandler( timeoutMillis ) ); + + // sleep for more than the timeout value + Thread.sleep( timeoutMillis * 4 ); + channel.runPendingTasks(); + + try + { + channel.checkException(); + fail( "Exception expected" ); + } + catch ( ServiceUnavailableException e ) + { + assertEquals( e.getMessage(), "Unable to establish connection in " + timeoutMillis + "ms" ); + } + + // sleep even more + Thread.sleep( timeoutMillis * 4 ); + channel.runPendingTasks(); + + // no more exceptions should occur + channel.checkException(); + } +} diff --git a/driver/src/test/java/org/neo4j/driver/v1/GraphDatabaseTest.java b/driver/src/test/java/org/neo4j/driver/v1/GraphDatabaseTest.java index b31ebd61c7..eb6374d9cc 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/GraphDatabaseTest.java +++ b/driver/src/test/java/org/neo4j/driver/v1/GraphDatabaseTest.java @@ -21,6 +21,7 @@ import org.junit.Test; import java.io.File; +import java.io.IOException; import java.net.ServerSocket; import java.net.URI; import java.util.List; @@ -30,6 +31,7 @@ import org.neo4j.driver.v1.util.TestUtil; import static java.util.Arrays.asList; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.hamcrest.Matchers.is; import static org.hamcrest.core.IsEqual.equalTo; import static org.hamcrest.core.IsInstanceOf.instanceOf; @@ -42,6 +44,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; import static org.neo4j.driver.internal.util.Matchers.clusterDriver; import static org.neo4j.driver.internal.util.Matchers.directDriver; import static org.neo4j.driver.v1.Config.TrustStrategy.trustOnFirstUse; @@ -182,4 +185,53 @@ public void shouldRespondToInterruptsWhenConnectingToUnresponsiveServer() throws } } } + + @Test + public void shouldFailToCreateUnencryptedDriverWhenServerDoesNotRespond() throws IOException + { + testFailureWhenServerDoesNotRespond( false ); + } + + @Test + public void shouldFailToCreateEncryptedDriverWhenServerDoesNotRespond() throws IOException + { + testFailureWhenServerDoesNotRespond( true ); + } + + private static void testFailureWhenServerDoesNotRespond( boolean encrypted ) throws IOException + { + try ( ServerSocket server = new ServerSocket( 0 ) ) // server that accepts connections but does not reply + { + int connectionTimeoutMillis = 1_000; + Config config = createConfig( encrypted, connectionTimeoutMillis ); + + try + { + GraphDatabase.driver( URI.create( "bolt://localhost:" + server.getLocalPort() ), config ); + fail( "Exception expected" ); + } + catch ( ServiceUnavailableException e ) + { + assertEquals( e.getMessage(), "Unable to establish connection in " + connectionTimeoutMillis + "ms" ); + } + } + } + + private static Config createConfig( boolean encrypted, int timeoutMillis ) + { + Config.ConfigBuilder configBuilder = Config.build() + .withConnectionTimeout( timeoutMillis, MILLISECONDS ) + .withLogging( DEV_NULL_LOGGING ); + + if ( encrypted ) + { + configBuilder.withEncryption(); + } + else + { + configBuilder.withoutEncryption(); + } + + return configBuilder.toConfig(); + } } diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java index 6b9ef62f1f..6826369508 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java @@ -42,7 +42,6 @@ import org.neo4j.driver.internal.DriverFactory; import org.neo4j.driver.internal.cluster.RoutingContext; import org.neo4j.driver.internal.cluster.RoutingSettings; -import org.neo4j.driver.internal.logging.DevNullLogging; import org.neo4j.driver.internal.retry.RetrySettings; import org.neo4j.driver.internal.util.DriverFactoryWithFixedRetryLogic; import org.neo4j.driver.internal.util.DriverFactoryWithOneEventLoopThread; @@ -92,6 +91,7 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; import static org.neo4j.driver.internal.util.Matchers.arithmeticError; import static org.neo4j.driver.internal.util.Matchers.connectionAcquisitionTimeoutError; import static org.neo4j.driver.internal.util.ServerVersion.v3_1_0; @@ -1486,6 +1486,49 @@ public void shouldBeResponsiveToThreadInterruptWhenWaitingForResult() throws Exc } } + @Test + public void shouldAllowLongRunningQueryWithConnectTimeout() throws Exception + { + int connectionTimeoutMs = 3_000; + Config config = Config.build() + .withLogging( DEV_NULL_LOGGING ) + .withConnectionTimeout( connectionTimeoutMs, TimeUnit.MILLISECONDS ) + .toConfig(); + + try ( Driver driver = GraphDatabase.driver( neo4j.uri(), neo4j.authToken(), config ) ) + { + Session session1 = driver.session(); + Session session2 = driver.session(); + + session1.run( "CREATE (:Avenger {name: 'Hulk'})" ).consume(); + + Transaction tx = session1.beginTransaction(); + tx.run( "MATCH (a:Avenger {name: 'Hulk'}) SET a.power = 100 RETURN a" ).consume(); + + // Hulk node is now locked + + CountDownLatch latch = new CountDownLatch( 1 ); + Future updateFuture = executeInDifferentThread( () -> + { + latch.countDown(); + return session2.run( "MATCH (a:Avenger {name: 'Hulk'}) SET a.weight = 1000 RETURN a.power" ) + .single().get( 0 ).asLong(); + } ); + + latch.await(); + // sleep more than connection timeout + Thread.sleep( connectionTimeoutMs + 1_000 ); + // verify that query is still executing and has not failed because of the read timeout + assertFalse( updateFuture.isDone() ); + + tx.success(); + tx.close(); + + long hulkPower = updateFuture.get( 10, TimeUnit.SECONDS ); + assertEquals( 100, hulkPower ); + } + } + private void assumeServerIs31OrLater() { ServerVersion serverVersion = ServerVersion.version( neo4j.driver() ); @@ -1634,7 +1677,7 @@ private Driver newDriverWithFixedRetries( int maxRetriesCount ) private Driver newDriverWithLimitedRetries( int maxTxRetryTime, TimeUnit unit ) { Config config = Config.build() - .withLogging( DevNullLogging.DEV_NULL_LOGGING ) + .withLogging( DEV_NULL_LOGGING ) .withMaxTransactionRetryTime( maxTxRetryTime, unit ) .toConfig(); return GraphDatabase.driver( neo4j.uri(), neo4j.authToken(), config ); @@ -1642,7 +1685,7 @@ private Driver newDriverWithLimitedRetries( int maxTxRetryTime, TimeUnit unit ) private static Config noLoggingConfig() { - return Config.build().withLogging( DevNullLogging.DEV_NULL_LOGGING ).toConfig(); + return Config.build().withLogging( DEV_NULL_LOGGING ).toConfig(); } private static ThrowingWork newThrowingWorkSpy( String query, int failures )