Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-8745: Add RFT.shouldMarkSessionAsDirty() #8759

Merged
merged 3 commits into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -451,27 +451,42 @@ public <T> T execute(SessionCallback<F, T> callback) {
}
return callback.doInSession(session);
}
catch (Exception e) {
if (session != null) {
catch (Exception ex) {
if (session != null && shouldMarkSessionAsDirty(ex)) {
session.dirty();
}
if (e instanceof MessagingException) { // NOSONAR
throw (MessagingException) e;
if (ex instanceof MessagingException messagingException) { // NOSONAR
throw messagingException;
}
throw new MessagingException("Failed to execute on session", e);
throw new MessagingException("Failed to execute on session", ex);
}
finally {
if (!invokeScope && session != null) {
try {
session.close();
}
catch (Exception ignored) {
this.logger.debug("failed to close Session", ignored);
catch (Exception ex) {
this.logger.debug("failed to close Session", ex);
}
}
}
}

/**
* Determine whether {@link Session#dirty()} should be called
* in the {@link #execute(SessionCallback)} when an exception is thrown from the callback.
* By default, this method returns {@code true}.
* Remote file protocol extensions can override this method to provide
* a specific strategy against the thrown exception, e.g. {@code file not found} error
* is not a signal that session is broken.
* @param ex the exception to check if {@link Session} must be marked as dirty.
* @return true if {@link Session#dirty()} should be called.
* @since 6.0.8
*/
protected boolean shouldMarkSessionAsDirty(Exception ex) {
return true;
}

@Override
public <T> T invoke(OperationsCallback<F, T> action) {
Session<F> contextSession = this.contextSessions.get();
Expand Down Expand Up @@ -503,8 +518,7 @@ public <T, C> T executeWithClient(ClientCallback<C, T> callback) {
private StreamHolder payloadToInputStream(Message<?> message) throws MessageDeliveryException {
Object payload = message.getPayload();
try {
if (payload instanceof File) {
File inputFile = (File) payload;
if (payload instanceof File inputFile) {
if (inputFile.exists()) {
return new StreamHolder(
new BufferedInputStream(new FileInputStream(inputFile)), inputFile.getAbsolutePath());
Expand All @@ -526,8 +540,7 @@ else if (payload instanceof byte[] || payload instanceof String) {
else if (payload instanceof InputStream) {
return new StreamHolder((InputStream) payload, "InputStream payload");
}
else if (payload instanceof Resource) {
Resource resource = (Resource) payload;
else if (payload instanceof Resource resource) {
artembilan marked this conversation as resolved.
Show resolved Hide resolved
String filename = resource.getFilename();
return new StreamHolder(resource.getInputStream(), filename != null ? filename : "Resource payload");
}
Expand Down Expand Up @@ -619,16 +632,7 @@ else if (!directoryPath.endsWith(this.remoteFileSeparator)) {
}
}

private static final class StreamHolder {

private final InputStream stream;

private final String name;

StreamHolder(InputStream stream, String name) {
this.stream = stream;
this.name = name;
}
private record StreamHolder(InputStream stream, String name) {

artembilan marked this conversation as resolved.
Show resolved Hide resolved
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@

import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.commons.net.ftp.FTPReply;

import org.springframework.integration.file.remote.ClientCallback;
import org.springframework.integration.file.remote.RemoteFileTemplate;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.lang.Nullable;
import org.springframework.messaging.MessagingException;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
Expand All @@ -34,6 +36,7 @@
*
* @author Gary Russell
* @author Artem Bilan
*
* @since 4.1
*
*/
Expand Down Expand Up @@ -82,29 +85,50 @@ protected <T> T doExecuteWithClient(final ClientCallback<FTPClient, T> callback)
public boolean exists(final String path) {
return doExecuteWithClient(client -> {
try {
switch (FtpRemoteFileTemplate.this.existsMode) {

case STAT:
return client.getStatus(path) != null;

case NLST:
String[] names = client.listNames(path);
return !ObjectUtils.isEmpty(names);

case NLST_AND_DIRS:
return FtpRemoteFileTemplate.super.exists(path);

default:
throw new IllegalStateException("Unsupported 'existsMode': " +
FtpRemoteFileTemplate.this.existsMode);
}
return switch (FtpRemoteFileTemplate.this.existsMode) {
case STAT -> client.getStatus(path) != null;
case NLST -> !ObjectUtils.isEmpty(client.listNames(path));
case NLST_AND_DIRS -> FtpRemoteFileTemplate.super.exists(path);
artembilan marked this conversation as resolved.
Show resolved Hide resolved
};
}
catch (IOException e) {
throw new MessagingException("Failed to check the remote path for " + path, e);
}
});
}

@Override
protected boolean shouldMarkSessionAsDirty(Exception ex) {
IOException ftpException = findIoException(ex);
if (ftpException != null) {
return isStatusDirty(ftpException.getMessage());
}
else {
return super.shouldMarkSessionAsDirty(ex);
}
}

/**
* Check if {@link IOException#getMessage()} is treated as fatal.
* @param ftpErrorMessage the value from {@link IOException#getMessage()}.
* @return true if {@link IOException#getMessage()} is treated as fatal.
* @since 6.0.8
*/
protected boolean isStatusDirty(String ftpErrorMessage) {
return !ftpErrorMessage.contains("" + FTPReply.FILE_UNAVAILABLE)
&& !ftpErrorMessage.contains("" + FTPReply.FILE_NAME_NOT_ALLOWED);
}

@Nullable
private static IOException findIoException(Throwable ex) {
if (ex == null || ex instanceof IOException) {
return (IOException) ex;
}
else {
return findIoException(ex.getCause());
}
}

/**
* The {@link #exists(String)} operation mode.
* @since 4.1.9
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2022 the original author or authors.
* Copyright 2014-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -34,6 +34,7 @@
import org.springframework.integration.file.DefaultFileNameGenerator;
import org.springframework.integration.file.remote.ClientCallbackWithoutResult;
import org.springframework.integration.file.remote.SessionCallbackWithoutResult;
import org.springframework.integration.file.remote.session.Session;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.file.support.FileExistsMode;
import org.springframework.integration.ftp.FtpTestSupport;
Expand All @@ -53,9 +54,7 @@
/**
* @author Gary Russell
* @author Artem Bilan
*
* @since 4.1
*
*/
@SpringJUnitConfig
@DirtiesContext
Expand Down Expand Up @@ -142,6 +141,25 @@ public void testConnectionClosedAfterExists() throws Exception {
assertThat(pool.getActiveCount()).isEqualTo(0);
}

@Test
public void sessionIsNotDirtyOnNoSuchFileError() {
Session<FTPFile> session = this.sessionFactory.getSession();
session.close();

FtpRemoteFileTemplate template = new FtpRemoteFileTemplate(this.sessionFactory);

assertThatExceptionOfType(MessagingException.class)
.isThrownBy(() -> template.rename("No_such_file1", "No_such_file2"))
.withRootCauseInstanceOf(IOException.class)
.withStackTraceContaining("553 : No such file or directory");

Session<FTPFile> newSession = this.sessionFactory.getSession();
assertThat(TestUtils.getPropertyValue(newSession, "targetSession"))
.isSameAs(TestUtils.getPropertyValue(session, "targetSession"));

newSession.close();
}

@Configuration
public static class Config {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,16 @@

package org.springframework.integration.sftp.session;

import java.util.List;

import org.apache.sshd.sftp.client.SftpClient;
import org.apache.sshd.sftp.common.SftpConstants;
import org.apache.sshd.sftp.common.SftpException;

import org.springframework.integration.file.remote.ClientCallback;
import org.springframework.integration.file.remote.RemoteFileTemplate;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.lang.Nullable;

/**
* SFTP version of {@code RemoteFileTemplate} providing type-safe access to
Expand All @@ -34,6 +39,21 @@
*/
public class SftpRemoteFileTemplate extends RemoteFileTemplate<SftpClient.DirEntry> {

protected static final List<Integer> NOT_DIRTY_STATUSES = // NOSONAR
List.of(
SftpConstants.SSH_FX_NO_SUCH_FILE,
SftpConstants.SSH_FX_NO_SUCH_PATH,
SftpConstants.SSH_FX_INVALID_FILENAME,
SftpConstants.SSH_FX_INVALID_HANDLE,
SftpConstants.SSH_FX_FILE_ALREADY_EXISTS,
SftpConstants.SSH_FX_DIR_NOT_EMPTY,
SftpConstants.SSH_FX_NOT_A_DIRECTORY,
SftpConstants.SSH_FX_EOF,
SftpConstants.SSH_FX_CANNOT_DELETE,
SftpConstants.SSH_FX_FILE_IS_A_DIRECTORY,
SftpConstants.SSH_FX_FILE_CORRUPT
);

public SftpRemoteFileTemplate(SessionFactory<SftpClient.DirEntry> sessionFactory) {
super(sessionFactory);
}
Expand All @@ -48,4 +68,35 @@ protected <T> T doExecuteWithClient(final ClientCallback<SftpClient, T> callback
return execute(session -> callback.doWithClient((SftpClient) session.getClientInstance()));
}

@Override
protected boolean shouldMarkSessionAsDirty(Exception ex) {
SftpException sftpException = findSftpException(ex);
if (sftpException != null) {
return isStatusDirty(sftpException.getStatus());
}
else {
return super.shouldMarkSessionAsDirty(ex);
}
}

/**
* Check if {@link SftpException#getStatus()} is treated as fatal.
* @param status the value from {@link SftpException#getStatus()}.
* @return true if {@link SftpException#getStatus()} is treated as fatal.
* @since 6.0.8
*/
protected boolean isStatusDirty(int status) {
return !NOT_DIRTY_STATUSES.contains(status);
}

@Nullable
private static SftpException findSftpException(Throwable ex) {
if (ex == null || ex instanceof SftpException) {
return (SftpException) ex;
}
else {
return findSftpException(ex.getCause());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import org.apache.sshd.sftp.client.SftpClient;
import org.apache.sshd.sftp.client.SftpVersionSelector;
import org.apache.sshd.sftp.common.SftpException;
import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.BeanFactory;
Expand All @@ -38,6 +39,7 @@
import org.springframework.integration.file.support.FileExistsMode;
import org.springframework.integration.sftp.SftpTestSupport;
import org.springframework.integration.test.condition.LogLevels;
import org.springframework.integration.test.util.TestUtils;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.GenericMessage;
Expand All @@ -52,9 +54,7 @@
/**
* @author Gary Russell
* @author Artem Bilan
*
* @since 4.1
*
*/
@SpringJUnitConfig
@DirtiesContext
Expand All @@ -63,7 +63,7 @@ public class SftpRemoteFileTemplateTests extends SftpTestSupport {
@Autowired
private CachingSessionFactory<SftpClient.DirEntry> sessionFactory;

@LogLevels(level = "trace", categories = { "org.apache.sshd", "org.springframework.integration.sftp" })
@LogLevels(level = "trace", categories = {"org.apache.sshd", "org.springframework.integration.sftp"})
@Test
public void testINT3412AppendStatRmdir() {
SftpRemoteFileTemplate template = new SftpRemoteFileTemplate(sessionFactory);
Expand Down Expand Up @@ -164,6 +164,25 @@ public void renameWithOldSftpVersion() {
oldVersionSession.close();
}

@Test
public void sessionIsNotDirtyOnNoSuchFileError() {
Session<SftpClient.DirEntry> session = this.sessionFactory.getSession();
session.close();

SftpRemoteFileTemplate template = new SftpRemoteFileTemplate(this.sessionFactory);

assertThatExceptionOfType(MessagingException.class)
.isThrownBy(() -> template.list("No_such_dir"))
.withRootCauseInstanceOf(SftpException.class)
.withStackTraceContaining("(SSH_FX_NO_SUCH_FILE): No such file or directory");

Session<SftpClient.DirEntry> newSession = this.sessionFactory.getSession();
assertThat(TestUtils.getPropertyValue(newSession, "targetSession"))
.isSameAs(TestUtils.getPropertyValue(session, "targetSession"));

newSession.close();
}

@Configuration
public static class Config {

Expand Down
Loading
Loading