Skip to content

Commit

Permalink
Merge pull request #484 from lutovich/1.5-tx-conn-error
Browse files Browse the repository at this point in the history
Improve fatal error propagation in commit & rollback
  • Loading branch information
lutovich authored Apr 13, 2018
2 parents 364fca4 + f2eec3f commit ef48d78
Show file tree
Hide file tree
Showing 14 changed files with 314 additions and 231 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,6 @@ else if ( state == State.ROLLED_BACK )
{
return failedFuture( new ClientException( "Can't commit, transaction has been rolled back" ) );
}
else if ( state == State.TERMINATED )
{
transactionClosed( State.ROLLED_BACK );
return failedFuture( new ClientException( "Can't commit, transaction has been terminated" ) );
}
else
{
return resultCursors.retrieveNotConsumedError()
Expand All @@ -197,12 +192,6 @@ else if ( state == State.ROLLED_BACK )
{
return completedWithNull();
}
else if ( state == State.TERMINATED )
{
// no need for explicit rollback, transaction should've been rolled back by the database
transactionClosed( State.ROLLED_BACK );
return completedWithNull();
}
else
{
return resultCursors.retrieveNotConsumedError()
Expand Down Expand Up @@ -344,6 +333,11 @@ public void setBookmark( Bookmark bookmark )

private CompletionStage<Void> doCommitAsync()
{
if ( state == State.TERMINATED )
{
return failedFuture( new ClientException( "Can't commit, transaction has been terminated" ) );
}

CompletableFuture<Void> commitFuture = new CompletableFuture<>();
ResponseHandler pullAllHandler = new CommitTxResponseHandler( commitFuture, this );
connection.runAndFlush( COMMIT_QUERY, emptyMap(), NoOpResponseHandler.INSTANCE, pullAllHandler );
Expand All @@ -352,6 +346,11 @@ private CompletionStage<Void> doCommitAsync()

private CompletionStage<Void> doRollbackAsync()
{
if ( state == State.TERMINATED )
{
return completedWithNull();
}

CompletableFuture<Void> rollbackFuture = new CompletableFuture<>();
ResponseHandler pullAllHandler = new RollbackTxResponseHandler( rollbackFuture );
connection.runAndFlush( ROLLBACK_QUERY, emptyMap(), NoOpResponseHandler.INSTANCE, pullAllHandler );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,16 +95,8 @@ public void exceptionCaught( ChannelHandlerContext ctx, Throwable error )
else
{
failed = true;

Throwable cause = error instanceof DecoderException ? error.getCause() : error;
if ( cause instanceof SSLHandshakeException )
{
fail( ctx, new SecurityException( "Failed to establish secured connection with the server", cause ) );
}
else
{
fail( ctx, cause );
}
Throwable cause = transformError( error );
fail( ctx, cause );
}
}

Expand Down Expand Up @@ -161,4 +153,21 @@ private static Throwable protocolNoSupportedByDriverError( int suggestedProtocol
return new ClientException(
"Protocol error, server suggested unexpected protocol version: " + suggestedProtocolVersion );
}

private static Throwable transformError( Throwable error )
{
Throwable cause = error instanceof DecoderException ? error.getCause() : error;
if ( cause instanceof ServiceUnavailableException )
{
return cause;
}
else if ( cause instanceof SSLHandshakeException )
{
return new SecurityException( "Failed to establish secured connection with the server", cause );
}
else
{
return new ServiceUnavailableException( "Failed to establish connection with the server", cause );
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ private void fail( ChannelHandlerContext ctx, Throwable error )
ctx.close();
}

private Throwable transformError( Throwable error )
private static Throwable transformError( Throwable error )
{
if ( error instanceof CodecException )
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@
*/
package org.neo4j.driver.internal.cluster;

import static java.util.concurrent.TimeUnit.SECONDS;

public class RoutingSettings
{
public static final RoutingSettings DEFAULT = new RoutingSettings( 1, SECONDS.toMillis( 5 ) );

private final int maxRoutingFailures;
private final long retryTimeoutDelay;
private final RoutingContext routingContext;
Expand Down

This file was deleted.

4 changes: 2 additions & 2 deletions driver/src/main/java/org/neo4j/driver/v1/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,8 @@ public static class ConfigBuilder
private boolean encrypted = true;
private TrustStrategy trustStrategy = trustAllCertificates();
private LoadBalancingStrategy loadBalancingStrategy = LoadBalancingStrategy.LEAST_CONNECTED;
private int routingFailureLimit = 1;
private long routingRetryDelayMillis = TimeUnit.SECONDS.toMillis( 5 );
private int routingFailureLimit = RoutingSettings.DEFAULT.maxRoutingFailures();
private long routingRetryDelayMillis = RoutingSettings.DEFAULT.retryTimeoutDelay();
private int connectionTimeoutMillis = (int) TimeUnit.SECONDS.toMillis( 5 );
private RetrySettings retrySettings = RetrySettings.DEFAULT;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
import org.junit.Test;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

Expand All @@ -44,6 +46,7 @@
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
import org.neo4j.driver.v1.util.TestNeo4j;

import static java.util.concurrent.CompletableFuture.runAsync;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -186,6 +189,42 @@ public void shouldFailWhenTLSHandshakeTakesTooLong() throws Exception
testReadTimeoutOnConnect( SecurityPlan.forAllCertificates() );
}

@Test
public void shouldThrowServiceUnavailableExceptionOnFailureDuringConnect() throws Exception
{
ServerSocket server = new ServerSocket( 0 );
BoltServerAddress address = new BoltServerAddress( "localhost", server.getLocalPort() );

runAsync( () ->
{
try
{
// wait for a connection
Socket socket = server.accept();
// and terminate it immediately so that client gets a "reset by peer" IOException
socket.close();
server.close();
}
catch ( IOException e )
{
throw new UncheckedIOException( e );
}
} );

ChannelConnector connector = newConnector( neo4j.authToken() );
ChannelFuture channelFuture = connector.connect( address, bootstrap );

// connect operation should fail with ServiceUnavailableException
try
{
await( channelFuture );
fail( "Exception expected" );
}
catch ( ServiceUnavailableException ignore )
{
}
}

private void testReadTimeoutOnConnect( SecurityPlan securityPlan ) throws IOException
{
try ( ServerSocket server = new ServerSocket( 0 ) ) // server that accepts connections but does not reply
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static org.neo4j.driver.internal.async.ChannelAttributes.setMessageDispatcher;
import static org.neo4j.driver.internal.async.BoltProtocolV1Util.HTTP;
import static org.neo4j.driver.internal.async.BoltProtocolV1Util.NO_PROTOCOL_VERSION;
import static org.neo4j.driver.internal.async.BoltProtocolV1Util.PROTOCOL_VERSION_1;
import static org.neo4j.driver.internal.async.ChannelAttributes.setMessageDispatcher;
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
import static org.neo4j.driver.v1.util.TestUtil.await;

Expand Down Expand Up @@ -85,9 +85,34 @@ public void shouldFailGivenPromiseWhenExceptionCaught()
await( handshakeCompletedPromise );
fail( "Exception expected" );
}
catch ( Exception e )
catch ( ServiceUnavailableException e )
{
assertEquals( cause, e.getCause() );
}

// channel should be closed
assertNull( await( channel.closeFuture() ) );
}

@Test
public void shouldFailGivenPromiseWhenServiceUnavailableExceptionCaught()
{
ChannelPromise handshakeCompletedPromise = channel.newPromise();
HandshakeHandler handler = newHandler( handshakeCompletedPromise );
channel.pipeline().addLast( handler );

ServiceUnavailableException error = new ServiceUnavailableException( "Bad error" );
channel.pipeline().fireExceptionCaught( error );

try
{
assertEquals( cause, e );
// promise should fail
await( handshakeCompletedPromise );
fail( "Exception expected" );
}
catch ( ServiceUnavailableException e )
{
assertEquals( error, e );
}

// channel should be closed
Expand All @@ -112,9 +137,9 @@ public void shouldFailGivenPromiseWhenMultipleExceptionsCaught()
await( handshakeCompletedPromise );
fail( "Exception expected" );
}
catch ( RuntimeException e )
catch ( ServiceUnavailableException e )
{
assertEquals( error1, e );
assertEquals( error1, e.getCause() );
}

// channel should be closed
Expand Down Expand Up @@ -147,9 +172,9 @@ public void shouldUnwrapDecoderException()
await( handshakeCompletedPromise );
fail( "Exception expected" );
}
catch ( Exception e )
catch ( ServiceUnavailableException e )
{
assertEquals( cause, e );
assertEquals( cause, e.getCause() );
}

// channel should be closed
Expand Down
Loading

0 comments on commit ef48d78

Please sign in to comment.