Skip to content

Commit

Permalink
GH-8745: Add RFT.shouldMarkSessionAsDirty() (#8759)
Browse files Browse the repository at this point in the history
* GH-8745: Add RFT.shouldMarkSessionAsDirty()

Fixes #8745

Not all errors caught in the `RemoteFileTemplate.execute()`
are fatal to mark session as dirty and physically close the target session
in the cache

* Introduce a `RemoteFileTemplate.shouldMarkSessionAsDirty()`
to consult with an exception if it is really a fatal error to close
the session in the end.
* Override `shouldMarkSessionAsDirty()` in the `RemoteFileTemplate`
implementations to check statuses of respective protocol errors

**Cherry-pick to `6.1.x` & `6.0.x`**

* * Fix tests for pool interaction

* * Fix language in Javadocs
* Add more `not dirty` statuses to `SftpRemoteFileTemplate` & `SmbRemoteFileTemplate`
  • Loading branch information
artembilan authored Oct 11, 2023
1 parent b822853 commit 44433ed
Show file tree
Hide file tree
Showing 7 changed files with 239 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -451,27 +451,42 @@ public <T> T execute(SessionCallback<F, T> callback) {
}
return callback.doInSession(session);
}
catch (Exception e) {
if (session != null) {
catch (Exception ex) {
if (session != null && shouldMarkSessionAsDirty(ex)) {
session.dirty();
}
if (e instanceof MessagingException) { // NOSONAR
throw (MessagingException) e;
if (ex instanceof MessagingException messagingException) { // NOSONAR
throw messagingException;
}
throw new MessagingException("Failed to execute on session", e);
throw new MessagingException("Failed to execute on session", ex);
}
finally {
if (!invokeScope && session != null) {
try {
session.close();
}
catch (Exception ignored) {
this.logger.debug("failed to close Session", ignored);
catch (Exception ex) {
this.logger.debug("failed to close Session", ex);
}
}
}
}

/**
* Determine whether {@link Session#dirty()} should be called
* in the {@link #execute(SessionCallback)} when an exception is thrown from the callback.
* By default, this method returns {@code true}.
* Remote file protocol extensions can override this method to provide
* a specific strategy against the thrown exception, e.g. {@code file not found} error
* is not a signal that session is broken.
* @param ex the exception to check if {@link Session} must be marked as dirty.
* @return true if {@link Session#dirty()} should be called.
* @since 6.0.8
*/
protected boolean shouldMarkSessionAsDirty(Exception ex) {
return true;
}

@Override
public <T> T invoke(OperationsCallback<F, T> action) {
Session<F> contextSession = this.contextSessions.get();
Expand Down Expand Up @@ -503,8 +518,7 @@ public <T, C> T executeWithClient(ClientCallback<C, T> callback) {
private StreamHolder payloadToInputStream(Message<?> message) throws MessageDeliveryException {
Object payload = message.getPayload();
try {
if (payload instanceof File) {
File inputFile = (File) payload;
if (payload instanceof File inputFile) {
if (inputFile.exists()) {
return new StreamHolder(
new BufferedInputStream(new FileInputStream(inputFile)), inputFile.getAbsolutePath());
Expand All @@ -526,8 +540,7 @@ else if (payload instanceof byte[] || payload instanceof String) {
else if (payload instanceof InputStream) {
return new StreamHolder((InputStream) payload, "InputStream payload");
}
else if (payload instanceof Resource) {
Resource resource = (Resource) payload;
else if (payload instanceof Resource resource) {
String filename = resource.getFilename();
return new StreamHolder(resource.getInputStream(), filename != null ? filename : "Resource payload");
}
Expand Down Expand Up @@ -619,16 +632,7 @@ else if (!directoryPath.endsWith(this.remoteFileSeparator)) {
}
}

private static final class StreamHolder {

private final InputStream stream;

private final String name;

StreamHolder(InputStream stream, String name) {
this.stream = stream;
this.name = name;
}
private record StreamHolder(InputStream stream, String name) {

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@

import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.commons.net.ftp.FTPReply;

import org.springframework.integration.file.remote.ClientCallback;
import org.springframework.integration.file.remote.RemoteFileTemplate;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.lang.Nullable;
import org.springframework.messaging.MessagingException;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
Expand All @@ -34,6 +36,7 @@
*
* @author Gary Russell
* @author Artem Bilan
*
* @since 4.1
*
*/
Expand Down Expand Up @@ -82,29 +85,50 @@ protected <T> T doExecuteWithClient(final ClientCallback<FTPClient, T> callback)
public boolean exists(final String path) {
return doExecuteWithClient(client -> {
try {
switch (FtpRemoteFileTemplate.this.existsMode) {

case STAT:
return client.getStatus(path) != null;

case NLST:
String[] names = client.listNames(path);
return !ObjectUtils.isEmpty(names);

case NLST_AND_DIRS:
return FtpRemoteFileTemplate.super.exists(path);

default:
throw new IllegalStateException("Unsupported 'existsMode': " +
FtpRemoteFileTemplate.this.existsMode);
}
return switch (FtpRemoteFileTemplate.this.existsMode) {
case STAT -> client.getStatus(path) != null;
case NLST -> !ObjectUtils.isEmpty(client.listNames(path));
case NLST_AND_DIRS -> FtpRemoteFileTemplate.super.exists(path);
};
}
catch (IOException e) {
throw new MessagingException("Failed to check the remote path for " + path, e);
}
});
}

@Override
protected boolean shouldMarkSessionAsDirty(Exception ex) {
IOException ftpException = findIoException(ex);
if (ftpException != null) {
return isStatusDirty(ftpException.getMessage());
}
else {
return super.shouldMarkSessionAsDirty(ex);
}
}

/**
* Check if {@link IOException#getMessage()} is treated as fatal.
* @param ftpErrorMessage the value from {@link IOException#getMessage()}.
* @return true if {@link IOException#getMessage()} is treated as fatal.
* @since 6.0.8
*/
protected boolean isStatusDirty(String ftpErrorMessage) {
return !ftpErrorMessage.contains("" + FTPReply.FILE_UNAVAILABLE)
&& !ftpErrorMessage.contains("" + FTPReply.FILE_NAME_NOT_ALLOWED);
}

@Nullable
private static IOException findIoException(Throwable ex) {
if (ex == null || ex instanceof IOException) {
return (IOException) ex;
}
else {
return findIoException(ex.getCause());
}
}

/**
* The {@link #exists(String)} operation mode.
* @since 4.1.9
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2022 the original author or authors.
* Copyright 2014-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -34,6 +34,7 @@
import org.springframework.integration.file.DefaultFileNameGenerator;
import org.springframework.integration.file.remote.ClientCallbackWithoutResult;
import org.springframework.integration.file.remote.SessionCallbackWithoutResult;
import org.springframework.integration.file.remote.session.Session;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.file.support.FileExistsMode;
import org.springframework.integration.ftp.FtpTestSupport;
Expand All @@ -53,9 +54,7 @@
/**
* @author Gary Russell
* @author Artem Bilan
*
* @since 4.1
*
*/
@SpringJUnitConfig
@DirtiesContext
Expand Down Expand Up @@ -142,6 +141,25 @@ public void testConnectionClosedAfterExists() throws Exception {
assertThat(pool.getActiveCount()).isEqualTo(0);
}

@Test
public void sessionIsNotDirtyOnNoSuchFileError() {
Session<FTPFile> session = this.sessionFactory.getSession();
session.close();

FtpRemoteFileTemplate template = new FtpRemoteFileTemplate(this.sessionFactory);

assertThatExceptionOfType(MessagingException.class)
.isThrownBy(() -> template.rename("No_such_file1", "No_such_file2"))
.withRootCauseInstanceOf(IOException.class)
.withStackTraceContaining("553 : No such file or directory");

Session<FTPFile> newSession = this.sessionFactory.getSession();
assertThat(TestUtils.getPropertyValue(newSession, "targetSession"))
.isSameAs(TestUtils.getPropertyValue(session, "targetSession"));

newSession.close();
}

@Configuration
public static class Config {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,16 @@

package org.springframework.integration.sftp.session;

import java.util.List;

import org.apache.sshd.sftp.client.SftpClient;
import org.apache.sshd.sftp.common.SftpConstants;
import org.apache.sshd.sftp.common.SftpException;

import org.springframework.integration.file.remote.ClientCallback;
import org.springframework.integration.file.remote.RemoteFileTemplate;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.lang.Nullable;

/**
* SFTP version of {@code RemoteFileTemplate} providing type-safe access to
Expand All @@ -34,6 +39,21 @@
*/
public class SftpRemoteFileTemplate extends RemoteFileTemplate<SftpClient.DirEntry> {

protected static final List<Integer> NOT_DIRTY_STATUSES = // NOSONAR
List.of(
SftpConstants.SSH_FX_NO_SUCH_FILE,
SftpConstants.SSH_FX_NO_SUCH_PATH,
SftpConstants.SSH_FX_INVALID_FILENAME,
SftpConstants.SSH_FX_INVALID_HANDLE,
SftpConstants.SSH_FX_FILE_ALREADY_EXISTS,
SftpConstants.SSH_FX_DIR_NOT_EMPTY,
SftpConstants.SSH_FX_NOT_A_DIRECTORY,
SftpConstants.SSH_FX_EOF,
SftpConstants.SSH_FX_CANNOT_DELETE,
SftpConstants.SSH_FX_FILE_IS_A_DIRECTORY,
SftpConstants.SSH_FX_FILE_CORRUPT
);

public SftpRemoteFileTemplate(SessionFactory<SftpClient.DirEntry> sessionFactory) {
super(sessionFactory);
}
Expand All @@ -48,4 +68,35 @@ protected <T> T doExecuteWithClient(final ClientCallback<SftpClient, T> callback
return execute(session -> callback.doWithClient((SftpClient) session.getClientInstance()));
}

@Override
protected boolean shouldMarkSessionAsDirty(Exception ex) {
SftpException sftpException = findSftpException(ex);
if (sftpException != null) {
return isStatusDirty(sftpException.getStatus());
}
else {
return super.shouldMarkSessionAsDirty(ex);
}
}

/**
* Check if {@link SftpException#getStatus()} is treated as fatal.
* @param status the value from {@link SftpException#getStatus()}.
* @return true if {@link SftpException#getStatus()} is treated as fatal.
* @since 6.0.8
*/
protected boolean isStatusDirty(int status) {
return !NOT_DIRTY_STATUSES.contains(status);
}

@Nullable
private static SftpException findSftpException(Throwable ex) {
if (ex == null || ex instanceof SftpException) {
return (SftpException) ex;
}
else {
return findSftpException(ex.getCause());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import org.apache.sshd.sftp.client.SftpClient;
import org.apache.sshd.sftp.client.SftpVersionSelector;
import org.apache.sshd.sftp.common.SftpException;
import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.BeanFactory;
Expand All @@ -38,6 +39,7 @@
import org.springframework.integration.file.support.FileExistsMode;
import org.springframework.integration.sftp.SftpTestSupport;
import org.springframework.integration.test.condition.LogLevels;
import org.springframework.integration.test.util.TestUtils;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.GenericMessage;
Expand All @@ -52,9 +54,7 @@
/**
* @author Gary Russell
* @author Artem Bilan
*
* @since 4.1
*
*/
@SpringJUnitConfig
@DirtiesContext
Expand All @@ -63,7 +63,7 @@ public class SftpRemoteFileTemplateTests extends SftpTestSupport {
@Autowired
private CachingSessionFactory<SftpClient.DirEntry> sessionFactory;

@LogLevels(level = "trace", categories = { "org.apache.sshd", "org.springframework.integration.sftp" })
@LogLevels(level = "trace", categories = {"org.apache.sshd", "org.springframework.integration.sftp"})
@Test
public void testINT3412AppendStatRmdir() {
SftpRemoteFileTemplate template = new SftpRemoteFileTemplate(sessionFactory);
Expand Down Expand Up @@ -164,6 +164,25 @@ public void renameWithOldSftpVersion() {
oldVersionSession.close();
}

@Test
public void sessionIsNotDirtyOnNoSuchFileError() {
Session<SftpClient.DirEntry> session = this.sessionFactory.getSession();
session.close();

SftpRemoteFileTemplate template = new SftpRemoteFileTemplate(this.sessionFactory);

assertThatExceptionOfType(MessagingException.class)
.isThrownBy(() -> template.list("No_such_dir"))
.withRootCauseInstanceOf(SftpException.class)
.withStackTraceContaining("(SSH_FX_NO_SUCH_FILE): No such file or directory");

Session<SftpClient.DirEntry> newSession = this.sessionFactory.getSession();
assertThat(TestUtils.getPropertyValue(newSession, "targetSession"))
.isSameAs(TestUtils.getPropertyValue(session, "targetSession"));

newSession.close();
}

@Configuration
public static class Config {

Expand Down
Loading

0 comments on commit 44433ed

Please sign in to comment.