Skip to content

Commit

Permalink
DRILL-2575: FragmentExecutor.cancel() blasts through state transition…
Browse files Browse the repository at this point in the history
…s regardless of current state

FragmentExecutor:
- Changed cancel() to behave asynchronously, and for the cancelation request to
  be checked at an appropriate place in the run() loop.
  • Loading branch information
cwestin authored and jacques-n committed Mar 26, 2015
1 parent db2e032 commit 462e50c
Showing 1 changed file with 43 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class FragmentExecutor implements Runnable {
private final FragmentRoot rootOperator;
private final FragmentContext fragmentContext;
private final StatusReporter listener;
private volatile boolean canceled;
private volatile boolean closed;
private RootExec root;

Expand Down Expand Up @@ -88,15 +89,15 @@ public FragmentStatus getStatus() {
}

public void cancel() {
logger.debug("Cancelling fragment {}", fragmentContext.getHandle());

// Note this will be called outside of run(), from another thread
// Change state checked by main loop to terminate it (if not already done):
updateState(FragmentState.CANCELLED);

fragmentContext.cancel();

logger.debug("Cancelled fragment {}", fragmentContext.getHandle());
/*
* Note that this can be called from threads *other* than the one running this runnable(), so
* we need to be careful about the state transitions that can result. We set the canceled flag,
* and this is checked in the run() loop, where action will be taken as soon as possible.
*
* If the run loop has already exited, because we've already either completed or failed the query,
* then the request to cancel is a no-op anyway, so it doesn't matter that we won't see the flag.
*/
canceled = true;
}

public void receivingFragmentFinished(FragmentHandle handle) {
Expand Down Expand Up @@ -142,6 +143,23 @@ public void run() {
* alerting the user--the behavior then is to hang.
*/
while (state.get() == FragmentState.RUNNING_VALUE) {
if (canceled) {
logger.debug("Cancelling fragment {}", fragmentContext.getHandle());

// Change state checked by main loop to terminate it (if not already done):
updateState(FragmentState.CANCELLED);

fragmentContext.cancel();

logger.debug("Cancelled fragment {}", fragmentContext.getHandle());

/*
* The state will be altered because of the updateState(), which would cause
* us to fall out of the enclosing while loop; we just short-circuit that here
*/
break;
}

if (!root.next()) {
if (fragmentContext.isFailed()) {
internalFail(fragmentContext.getFailureCause());
Expand Down Expand Up @@ -180,19 +198,21 @@ private void closeOutResources() {
* be safe to call it more than once. We use this flag to bypass the body if it has
* been called before.
*/
if (closed) {
return;
}
synchronized(this) { // synchronize for the state of closed
if (closed) {
return;
}

final DeferredException deferredException = fragmentContext.getDeferredException();
try {
root.stop(); // TODO make this an AutoCloseable so we can detect lack of closure
} catch (RuntimeException e) {
logger.warn(CLOSE_FAILURE, e);
deferredException.addException(e);
}
final DeferredException deferredException = fragmentContext.getDeferredException();
try {
root.stop(); // TODO make this an AutoCloseable so we can detect lack of closure
} catch (RuntimeException e) {
logger.warn(CLOSE_FAILURE, e);
deferredException.addException(e);
}

closed = true;
closed = true;
}

/*
* This must be last, because this may throw deferred exceptions.
Expand Down Expand Up @@ -221,7 +241,7 @@ private void updateState(final FragmentState to) {
}

/**
* Updates the fragment state only if the current state matches the expected.
* Updates the fragment state only iff the current state matches the expected.
*
* @param expected expected current state
* @param to target state
Expand Down Expand Up @@ -258,7 +278,8 @@ private boolean isCompleted() {
private boolean updateStateOrFail(final FragmentState expected, final FragmentState to) {
final boolean updated = checkAndUpdateState(expected, to);
if (!updated && !isCompleted()) {
final String msg = "State was different than expected while attempting to update state from %s to %s however current state was %s.";
final String msg = "State was different than expected while attempting to update state from %s to %s"
+ "however current state was %s.";
internalFail(new StateTransitionException(
String.format(msg, expected.name(), to.name(), FragmentState.valueOf(state.get()))));
}
Expand Down

0 comments on commit 462e50c

Please sign in to comment.