diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/ChannelConnectorImpl.java b/driver/src/main/java/org/neo4j/driver/internal/async/ChannelConnectorImpl.java index 5f7b244288..12c41c80f4 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/ChannelConnectorImpl.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/ChannelConnectorImpl.java @@ -22,12 +22,14 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; import java.util.Map; import org.neo4j.driver.internal.BoltServerAddress; import org.neo4j.driver.internal.ConnectionSettings; +import org.neo4j.driver.internal.async.inbound.ConnectTimeoutHandler; import org.neo4j.driver.internal.security.InternalAuthToken; import org.neo4j.driver.internal.security.SecurityPlan; import org.neo4j.driver.internal.util.Clock; @@ -71,7 +73,7 @@ public ChannelConnectorImpl( ConnectionSettings connectionSettings, SecurityPlan public ChannelFuture connect( BoltServerAddress address, Bootstrap bootstrap ) { bootstrap.option( ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMillis ); - bootstrap.handler( new NettyChannelInitializer( address, securityPlan, clock, logging ) ); + bootstrap.handler( new NettyChannelInitializer( address, securityPlan, connectTimeoutMillis, clock, logging ) ); ChannelFuture channelConnected = bootstrap.connect( address.toSocketAddress() ); @@ -79,12 +81,39 @@ public ChannelFuture connect( BoltServerAddress address, Bootstrap bootstrap ) ChannelPromise handshakeCompleted = channel.newPromise(); ChannelPromise connectionInitialized = channel.newPromise(); + installChannelConnectedListeners( address, channelConnected, handshakeCompleted ); + installHandshakeCompletedListeners( handshakeCompleted, connectionInitialized ); + + return connectionInitialized; + } + + private void installChannelConnectedListeners( BoltServerAddress address, ChannelFuture channelConnected, + ChannelPromise handshakeCompleted ) + { + ChannelPipeline pipeline = channelConnected.channel().pipeline(); + + // add timeout handler to the pipeline when channel is connected. it's needed to limit amount of time code + // spends in TLS and Bolt handshakes. prevents infinite waiting when database does not respond + channelConnected.addListener( future -> + pipeline.addFirst( new ConnectTimeoutHandler( connectTimeoutMillis ) ) ); + + // add listener that sends Bolt handshake bytes when channel is connected channelConnected.addListener( new ChannelConnectedListener( address, pipelineBuilder, handshakeCompleted, logging ) ); - handshakeCompleted.addListener( - new HandshakeCompletedListener( userAgent, authToken, connectionInitialized ) ); + } - return connectionInitialized; + private void installHandshakeCompletedListeners( ChannelPromise handshakeCompleted, + ChannelPromise connectionInitialized ) + { + ChannelPipeline pipeline = handshakeCompleted.channel().pipeline(); + + // remove timeout handler from the pipeline once TLS and Bolt handshakes are completed. regular protocol + // messages will flow next and we do not want to have read timeout for them + handshakeCompleted.addListener( future -> pipeline.remove( ConnectTimeoutHandler.class ) ); + + // add listener that sends an INIT message. connection is now fully established. channel pipeline if fully + // set to send/receive messages for a selected protocol version + handshakeCompleted.addListener( new HandshakeCompletedListener( userAgent, authToken, connectionInitialized ) ); } private static Map tokenAsMap( AuthToken token ) diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/NettyChannelInitializer.java b/driver/src/main/java/org/neo4j/driver/internal/async/NettyChannelInitializer.java index c025cb0ba5..233464cbdc 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/NettyChannelInitializer.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/NettyChannelInitializer.java @@ -39,13 +39,16 @@ public class NettyChannelInitializer extends ChannelInitializer { private final BoltServerAddress address; private final SecurityPlan securityPlan; + private final int connectTimeoutMillis; private final Clock clock; private final Logging logging; - public NettyChannelInitializer( BoltServerAddress address, SecurityPlan securityPlan, Clock clock, Logging logging ) + public NettyChannelInitializer( BoltServerAddress address, SecurityPlan securityPlan, int connectTimeoutMillis, + Clock clock, Logging logging ) { this.address = address; this.securityPlan = securityPlan; + this.connectTimeoutMillis = connectTimeoutMillis; this.clock = clock; this.logging = logging; } @@ -65,7 +68,9 @@ protected void initChannel( Channel channel ) private SslHandler createSslHandler() { SSLEngine sslEngine = createSslEngine(); - return new SslHandler( sslEngine ); + SslHandler sslHandler = new SslHandler( sslEngine ); + sslHandler.setHandshakeTimeoutMillis( connectTimeoutMillis ); + return sslHandler; } private SSLEngine createSslEngine() diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/ConnectTimeoutHandler.java b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/ConnectTimeoutHandler.java new file mode 100644 index 0000000000..2980cb814b --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/ConnectTimeoutHandler.java @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.async.inbound; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.timeout.ReadTimeoutHandler; + +import java.util.concurrent.TimeUnit; + +import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; + +/** + * Handler needed to limit amount of time connection performs TLS and Bolt handshakes. + * It should only be used when connection is established and removed from the pipeline afterwards. + * Otherwise it will make long running queries fail. + */ +public class ConnectTimeoutHandler extends ReadTimeoutHandler +{ + private final long timeoutMillis; + private boolean triggered; + + public ConnectTimeoutHandler( long timeoutMillis ) + { + super( timeoutMillis, TimeUnit.MILLISECONDS ); + this.timeoutMillis = timeoutMillis; + } + + @Override + protected void readTimedOut( ChannelHandlerContext ctx ) + { + if ( !triggered ) + { + triggered = true; + ctx.fireExceptionCaught( unableToConnectError() ); + } + } + + private ServiceUnavailableException unableToConnectError() + { + return new ServiceUnavailableException( "Unable to establish connection in " + timeoutMillis + "ms" ); + } +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/ChannelConnectorImplTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/ChannelConnectorImplTest.java index 3aaf496c50..3780f311c9 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/ChannelConnectorImplTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/ChannelConnectorImplTest.java @@ -21,17 +21,24 @@ import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelPipeline; +import io.netty.handler.ssl.SslHandler; import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.RuleChain; +import org.junit.rules.Timeout; +import java.io.IOException; import java.net.ConnectException; +import java.net.ServerSocket; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import org.neo4j.driver.internal.BoltServerAddress; import org.neo4j.driver.internal.ConnectionSettings; +import org.neo4j.driver.internal.async.inbound.ConnectTimeoutHandler; import org.neo4j.driver.internal.security.SecurityPlan; import org.neo4j.driver.internal.util.FakeClock; import org.neo4j.driver.v1.AuthToken; @@ -42,7 +49,9 @@ import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.startsWith; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -52,19 +61,20 @@ public class ChannelConnectorImplTest { + private final TestNeo4j neo4j = new TestNeo4j(); @Rule - public final TestNeo4j neo4j = new TestNeo4j(); + public final RuleChain ruleChain = RuleChain.outerRule( Timeout.seconds( 20 ) ).around( neo4j ); private Bootstrap bootstrap; @Before - public void setUp() throws Exception + public void setUp() { bootstrap = BootstrapFactory.newBootstrap( 1 ); } @After - public void tearDown() throws Exception + public void tearDown() { if ( bootstrap != null ) { @@ -75,7 +85,7 @@ public void tearDown() throws Exception @Test public void shouldConnect() throws Exception { - ChannelConnectorImpl connector = newConnector( neo4j.authToken() ); + ChannelConnector connector = newConnector( neo4j.authToken() ); ChannelFuture channelFuture = connector.connect( neo4j.address(), bootstrap ); assertTrue( channelFuture.await( 10, TimeUnit.SECONDS ) ); @@ -85,10 +95,26 @@ public void shouldConnect() throws Exception assertTrue( channel.isActive() ); } + @Test + public void shouldSetupHandlers() throws Exception + { + ChannelConnector connector = newConnector( neo4j.authToken(), SecurityPlan.forAllCertificates(), 10_000 ); + + ChannelFuture channelFuture = connector.connect( neo4j.address(), bootstrap ); + assertTrue( channelFuture.await( 10, TimeUnit.SECONDS ) ); + + Channel channel = channelFuture.channel(); + ChannelPipeline pipeline = channel.pipeline(); + assertTrue( channel.isActive() ); + + assertNotNull( pipeline.get( SslHandler.class ) ); + assertNull( pipeline.get( ConnectTimeoutHandler.class ) ); + } + @Test public void shouldFailToConnectToWrongAddress() throws Exception { - ChannelConnectorImpl connector = newConnector( neo4j.authToken() ); + ChannelConnector connector = newConnector( neo4j.authToken() ); ChannelFuture channelFuture = connector.connect( new BoltServerAddress( "wrong-localhost" ), bootstrap ); assertTrue( channelFuture.await( 10, TimeUnit.SECONDS ) ); @@ -112,7 +138,7 @@ public void shouldFailToConnectToWrongAddress() throws Exception public void shouldFailToConnectWithWrongCredentials() throws Exception { AuthToken authToken = AuthTokens.basic( "neo4j", "wrong-password" ); - ChannelConnectorImpl connector = newConnector( authToken ); + ChannelConnector connector = newConnector( authToken ); ChannelFuture channelFuture = connector.connect( neo4j.address(), bootstrap ); assertTrue( channelFuture.await( 10, TimeUnit.SECONDS ) ); @@ -131,10 +157,10 @@ public void shouldFailToConnectWithWrongCredentials() throws Exception assertFalse( channel.isActive() ); } - @Test( timeout = 10000 ) + @Test public void shouldEnforceConnectTimeout() throws Exception { - ChannelConnectorImpl connector = newConnector( neo4j.authToken(), 1000 ); + ChannelConnector connector = newConnector( neo4j.authToken(), 1000 ); // try connect to a non-routable ip address 10.0.0.0, it will never respond ChannelFuture channelFuture = connector.connect( new BoltServerAddress( "10.0.0.0" ), bootstrap ); @@ -151,6 +177,41 @@ public void shouldEnforceConnectTimeout() throws Exception } } + @Test + public void shouldFailWhenProtocolNegotiationTakesTooLong() throws Exception + { + // run without TLS so that Bolt handshake is the very first operation after connection is established + testReadTimeoutOnConnect( SecurityPlan.insecure() ); + } + + @Test + public void shouldFailWhenTLSHandshakeTakesTooLong() throws Exception + { + // run with TLS so that TLS handshake is the very first operation after connection is established + testReadTimeoutOnConnect( SecurityPlan.forAllCertificates() ); + } + + private void testReadTimeoutOnConnect( SecurityPlan securityPlan ) throws IOException + { + try ( ServerSocket server = new ServerSocket( 0 ) ) // server that accepts connections but does not reply + { + int timeoutMillis = 1_000; + BoltServerAddress address = new BoltServerAddress( "localhost", server.getLocalPort() ); + ChannelConnector connector = newConnector( neo4j.authToken(), securityPlan, timeoutMillis ); + + ChannelFuture channelFuture = connector.connect( address, bootstrap ); + try + { + await( channelFuture ); + fail( "Exception expected" ); + } + catch ( ServiceUnavailableException e ) + { + assertEquals( e.getMessage(), "Unable to establish connection in " + timeoutMillis + "ms" ); + } + } + } + private ChannelConnectorImpl newConnector( AuthToken authToken ) throws Exception { return newConnector( authToken, Integer.MAX_VALUE ); @@ -158,8 +219,13 @@ private ChannelConnectorImpl newConnector( AuthToken authToken ) throws Exceptio private ChannelConnectorImpl newConnector( AuthToken authToken, int connectTimeoutMillis ) throws Exception { - ConnectionSettings settings = new ConnectionSettings( authToken, 1000 ); - return new ChannelConnectorImpl( settings, SecurityPlan.forAllCertificates(), DEV_NULL_LOGGING, - new FakeClock() ); + return newConnector( authToken, SecurityPlan.forAllCertificates(), connectTimeoutMillis ); + } + + private ChannelConnectorImpl newConnector( AuthToken authToken, SecurityPlan securityPlan, + int connectTimeoutMillis ) + { + ConnectionSettings settings = new ConnectionSettings( authToken, connectTimeoutMillis ); + return new ChannelConnectorImpl( settings, securityPlan, DEV_NULL_LOGGING, new FakeClock() ); } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/NettyChannelInitializerTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/NettyChannelInitializerTest.java index 0bd4ee8b8f..d2d770842d 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/NettyChannelInitializerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/NettyChannelInitializerTest.java @@ -52,8 +52,7 @@ public void tearDown() public void shouldAddSslHandlerWhenRequiresEncryption() throws Exception { SecurityPlan security = SecurityPlan.forAllCertificates(); - NettyChannelInitializer initializer = new NettyChannelInitializer( LOCAL_DEFAULT, security, new FakeClock(), - DEV_NULL_LOGGING ); + NettyChannelInitializer initializer = newInitializer( security ); initializer.initChannel( channel ); @@ -64,21 +63,34 @@ public void shouldAddSslHandlerWhenRequiresEncryption() throws Exception public void shouldNotAddSslHandlerWhenDoesNotRequireEncryption() { SecurityPlan security = SecurityPlan.insecure(); - NettyChannelInitializer initializer = new NettyChannelInitializer( LOCAL_DEFAULT, security, new FakeClock(), - DEV_NULL_LOGGING ); + NettyChannelInitializer initializer = newInitializer( security ); initializer.initChannel( channel ); assertNull( channel.pipeline().get( SslHandler.class ) ); } + @Test + public void shouldAddSslHandlerWithHandshakeTimeout() throws Exception + { + int timeoutMillis = 424242; + SecurityPlan security = SecurityPlan.forAllCertificates(); + NettyChannelInitializer initializer = newInitializer( security, timeoutMillis ); + + initializer.initChannel( channel ); + + SslHandler sslHandler = channel.pipeline().get( SslHandler.class ); + assertNotNull( sslHandler ); + assertEquals( timeoutMillis, sslHandler.getHandshakeTimeoutMillis() ); + } + @Test public void shouldUpdateChannelAttributes() { Clock clock = mock( Clock.class ); when( clock.millis() ).thenReturn( 42L ); SecurityPlan security = SecurityPlan.insecure(); - NettyChannelInitializer initializer = new NettyChannelInitializer( LOCAL_DEFAULT, security, clock, DEV_NULL_LOGGING ); + NettyChannelInitializer initializer = newInitializer( security, Integer.MAX_VALUE, clock ); initializer.initChannel( channel ); @@ -86,4 +98,21 @@ public void shouldUpdateChannelAttributes() assertEquals( 42L, creationTimestamp( channel ) ); assertNotNull( messageDispatcher( channel ) ); } + + private static NettyChannelInitializer newInitializer( SecurityPlan securityPlan ) + { + return newInitializer( securityPlan, Integer.MAX_VALUE ); + } + + private static NettyChannelInitializer newInitializer( SecurityPlan securityPlan, int connectTimeoutMillis ) + { + return newInitializer( securityPlan, connectTimeoutMillis, new FakeClock() ); + } + + private static NettyChannelInitializer newInitializer( SecurityPlan securityPlan, int connectTimeoutMillis, + Clock clock ) + { + return new NettyChannelInitializer( LOCAL_DEFAULT, securityPlan, connectTimeoutMillis, clock, + DEV_NULL_LOGGING ); + } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/inbound/ConnectTimeoutHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/inbound/ConnectTimeoutHandlerTest.java new file mode 100644 index 0000000000..ed7ab5227f --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/async/inbound/ConnectTimeoutHandlerTest.java @@ -0,0 +1,88 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.async.inbound; + +import io.netty.channel.embedded.EmbeddedChannel; +import org.junit.After; +import org.junit.Test; + +import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class ConnectTimeoutHandlerTest +{ + private final EmbeddedChannel channel = new EmbeddedChannel(); + + @After + public void tearDown() + { + channel.finishAndReleaseAll(); + } + + @Test + public void shouldFireExceptionOnTimeout() throws Exception + { + int timeoutMillis = 100; + channel.pipeline().addLast( new ConnectTimeoutHandler( timeoutMillis ) ); + + // sleep for more than the timeout value + Thread.sleep( timeoutMillis * 4 ); + channel.runPendingTasks(); + + try + { + channel.checkException(); + fail( "Exception expected" ); + } + catch ( ServiceUnavailableException e ) + { + assertEquals( e.getMessage(), "Unable to establish connection in " + timeoutMillis + "ms" ); + } + } + + @Test + public void shouldNotFireExceptionMultipleTimes() throws Exception + { + int timeoutMillis = 70; + channel.pipeline().addLast( new ConnectTimeoutHandler( timeoutMillis ) ); + + // sleep for more than the timeout value + Thread.sleep( timeoutMillis * 4 ); + channel.runPendingTasks(); + + try + { + channel.checkException(); + fail( "Exception expected" ); + } + catch ( ServiceUnavailableException e ) + { + assertEquals( e.getMessage(), "Unable to establish connection in " + timeoutMillis + "ms" ); + } + + // sleep even more + Thread.sleep( timeoutMillis * 4 ); + channel.runPendingTasks(); + + // no more exceptions should occur + channel.checkException(); + } +} diff --git a/driver/src/test/java/org/neo4j/driver/v1/GraphDatabaseTest.java b/driver/src/test/java/org/neo4j/driver/v1/GraphDatabaseTest.java index b31ebd61c7..eb6374d9cc 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/GraphDatabaseTest.java +++ b/driver/src/test/java/org/neo4j/driver/v1/GraphDatabaseTest.java @@ -21,6 +21,7 @@ import org.junit.Test; import java.io.File; +import java.io.IOException; import java.net.ServerSocket; import java.net.URI; import java.util.List; @@ -30,6 +31,7 @@ import org.neo4j.driver.v1.util.TestUtil; import static java.util.Arrays.asList; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.hamcrest.Matchers.is; import static org.hamcrest.core.IsEqual.equalTo; import static org.hamcrest.core.IsInstanceOf.instanceOf; @@ -42,6 +44,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; import static org.neo4j.driver.internal.util.Matchers.clusterDriver; import static org.neo4j.driver.internal.util.Matchers.directDriver; import static org.neo4j.driver.v1.Config.TrustStrategy.trustOnFirstUse; @@ -182,4 +185,53 @@ public void shouldRespondToInterruptsWhenConnectingToUnresponsiveServer() throws } } } + + @Test + public void shouldFailToCreateUnencryptedDriverWhenServerDoesNotRespond() throws IOException + { + testFailureWhenServerDoesNotRespond( false ); + } + + @Test + public void shouldFailToCreateEncryptedDriverWhenServerDoesNotRespond() throws IOException + { + testFailureWhenServerDoesNotRespond( true ); + } + + private static void testFailureWhenServerDoesNotRespond( boolean encrypted ) throws IOException + { + try ( ServerSocket server = new ServerSocket( 0 ) ) // server that accepts connections but does not reply + { + int connectionTimeoutMillis = 1_000; + Config config = createConfig( encrypted, connectionTimeoutMillis ); + + try + { + GraphDatabase.driver( URI.create( "bolt://localhost:" + server.getLocalPort() ), config ); + fail( "Exception expected" ); + } + catch ( ServiceUnavailableException e ) + { + assertEquals( e.getMessage(), "Unable to establish connection in " + connectionTimeoutMillis + "ms" ); + } + } + } + + private static Config createConfig( boolean encrypted, int timeoutMillis ) + { + Config.ConfigBuilder configBuilder = Config.build() + .withConnectionTimeout( timeoutMillis, MILLISECONDS ) + .withLogging( DEV_NULL_LOGGING ); + + if ( encrypted ) + { + configBuilder.withEncryption(); + } + else + { + configBuilder.withoutEncryption(); + } + + return configBuilder.toConfig(); + } } diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java index 6b9ef62f1f..6826369508 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java @@ -42,7 +42,6 @@ import org.neo4j.driver.internal.DriverFactory; import org.neo4j.driver.internal.cluster.RoutingContext; import org.neo4j.driver.internal.cluster.RoutingSettings; -import org.neo4j.driver.internal.logging.DevNullLogging; import org.neo4j.driver.internal.retry.RetrySettings; import org.neo4j.driver.internal.util.DriverFactoryWithFixedRetryLogic; import org.neo4j.driver.internal.util.DriverFactoryWithOneEventLoopThread; @@ -92,6 +91,7 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; import static org.neo4j.driver.internal.util.Matchers.arithmeticError; import static org.neo4j.driver.internal.util.Matchers.connectionAcquisitionTimeoutError; import static org.neo4j.driver.internal.util.ServerVersion.v3_1_0; @@ -1486,6 +1486,49 @@ public void shouldBeResponsiveToThreadInterruptWhenWaitingForResult() throws Exc } } + @Test + public void shouldAllowLongRunningQueryWithConnectTimeout() throws Exception + { + int connectionTimeoutMs = 3_000; + Config config = Config.build() + .withLogging( DEV_NULL_LOGGING ) + .withConnectionTimeout( connectionTimeoutMs, TimeUnit.MILLISECONDS ) + .toConfig(); + + try ( Driver driver = GraphDatabase.driver( neo4j.uri(), neo4j.authToken(), config ) ) + { + Session session1 = driver.session(); + Session session2 = driver.session(); + + session1.run( "CREATE (:Avenger {name: 'Hulk'})" ).consume(); + + Transaction tx = session1.beginTransaction(); + tx.run( "MATCH (a:Avenger {name: 'Hulk'}) SET a.power = 100 RETURN a" ).consume(); + + // Hulk node is now locked + + CountDownLatch latch = new CountDownLatch( 1 ); + Future updateFuture = executeInDifferentThread( () -> + { + latch.countDown(); + return session2.run( "MATCH (a:Avenger {name: 'Hulk'}) SET a.weight = 1000 RETURN a.power" ) + .single().get( 0 ).asLong(); + } ); + + latch.await(); + // sleep more than connection timeout + Thread.sleep( connectionTimeoutMs + 1_000 ); + // verify that query is still executing and has not failed because of the read timeout + assertFalse( updateFuture.isDone() ); + + tx.success(); + tx.close(); + + long hulkPower = updateFuture.get( 10, TimeUnit.SECONDS ); + assertEquals( 100, hulkPower ); + } + } + private void assumeServerIs31OrLater() { ServerVersion serverVersion = ServerVersion.version( neo4j.driver() ); @@ -1634,7 +1677,7 @@ private Driver newDriverWithFixedRetries( int maxRetriesCount ) private Driver newDriverWithLimitedRetries( int maxTxRetryTime, TimeUnit unit ) { Config config = Config.build() - .withLogging( DevNullLogging.DEV_NULL_LOGGING ) + .withLogging( DEV_NULL_LOGGING ) .withMaxTransactionRetryTime( maxTxRetryTime, unit ) .toConfig(); return GraphDatabase.driver( neo4j.uri(), neo4j.authToken(), config ); @@ -1642,7 +1685,7 @@ private Driver newDriverWithLimitedRetries( int maxTxRetryTime, TimeUnit unit ) private static Config noLoggingConfig() { - return Config.build().withLogging( DevNullLogging.DEV_NULL_LOGGING ).toConfig(); + return Config.build().withLogging( DEV_NULL_LOGGING ).toConfig(); } private static ThrowingWork newThrowingWorkSpy( String query, int failures )