Skip to content

Commit

Permalink
Explicitly close the FlowReceiver (apache#31982)
Browse files Browse the repository at this point in the history
  • Loading branch information
bzablocki authored Jul 25, 2024
1 parent 1e8c091 commit 8eb09bf
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class BasicAuthJcsmpSessionService extends SessionService {
private final String password;
private final String vpnName;
@Nullable private JCSMPSession jcsmpSession;
@Nullable private MessageReceiver messageReceiver;
private final RetryCallableManager retryCallableManager = RetryCallableManager.create();

/**
Expand Down Expand Up @@ -73,21 +74,25 @@ public void connect() {

@Override
public void close() {
if (isClosed()) {
return;
}
retryCallableManager.retryCallable(
() -> {
checkStateNotNull(jcsmpSession).closeSession();
if (messageReceiver != null) {
messageReceiver.close();
}
if (!isClosed()) {
checkStateNotNull(jcsmpSession).closeSession();
}
return 0;
},
ImmutableSet.of(IOException.class));
}

@Override
public MessageReceiver createReceiver() {
return retryCallableManager.retryCallable(
this::createFlowReceiver, ImmutableSet.of(JCSMPException.class));
this.messageReceiver =
retryCallableManager.retryCallable(
this::createFlowReceiver, ImmutableSet.of(JCSMPException.class));
return this.messageReceiver;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ public interface MessageReceiver {
*/
BytesXMLMessage receive() throws IOException;

/** Closes the message receiver. */
void close();

/**
* Test clients may return {@literal true} to signal that all expected messages have been pulled
* and the test may complete. Real clients should always return {@literal false}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,11 @@ public BytesXMLMessage receive() throws IOException {
throw new IOException(e);
}
}

@Override
public void close() {
if (!isClosed()) {
this.flowReceiver.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ public BytesXMLMessage receive() throws IOException {
return getRecordFn.apply(counter.getAndIncrement());
}

@Override
public void close() {}

@Override
public boolean isEOF() {
return counter.get() >= minMessagesReceived;
Expand Down

0 comments on commit 8eb09bf

Please sign in to comment.