diff --git a/src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java b/src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java index 8d4ebec07..90b2290fc 100644 --- a/src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java +++ b/src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java @@ -26,6 +26,7 @@ import com.uber.cadence.activity.LocalActivityOptions; import com.uber.cadence.common.RetryOptions; import com.uber.cadence.converter.DataConverter; +import com.uber.cadence.converter.DataConverterException; import com.uber.cadence.internal.common.RetryParameters; import com.uber.cadence.internal.replay.ActivityTaskFailedException; import com.uber.cadence.internal.replay.ActivityTaskTimeoutException; @@ -83,7 +84,12 @@ final class SyncDecisionContext implements WorkflowInterceptor { private final Map> queryCallbacks = new HashMap<>(); private final byte[] lastCompletionResult; - public SyncDecisionContext( + @FunctionalInterface + public interface ActivityExecutor { + Promise getResult(byte[] input); + } + + SyncDecisionContext( DecisionContext context, DataConverter converter, Function interceptorFactory, @@ -127,19 +133,51 @@ public Promise executeActivity( if (retryOptions != null && !context.isServerSideActivityRetry()) { return WorkflowRetryerInternal.retryAsync( retryOptions, - () -> executeActivityOnce(activityName, options, args, resultClass, resultType)); - } - return executeActivityOnce(activityName, options, args, resultClass, resultType); + () -> + executeActivityOnce( + activityName, + args, + resultClass, + resultType, + input -> executeActivityOnce(activityName, options, input))); + } + return executeActivityOnce( + activityName, + args, + resultClass, + resultType, + input -> executeActivityOnce(activityName, options, input)); } private Promise executeActivityOnce( - String name, ActivityOptions options, Object[] args, Class returnClass, Type returnType) { - byte[] input = converter.toData(args); - Promise binaryResult = executeActivityOnce(name, options, input); + String name, + Object[] args, + Class returnClass, + Type returnType, + ActivityExecutor executor) { + byte[] input; + try { + input = converter.toData(args); + } catch (DataConverterException e) { + CompletablePromise result = Workflow.newPromise(); + result.completeExceptionally( + new ActivityFailureException(new ActivityType().setName(name), e)); + return result; + } + + Promise binaryResult = executor.getResult(input); + if (returnClass == Void.TYPE) { return binaryResult.thenApply((r) -> null); } - return binaryResult.thenApply((r) -> converter.fromData(r, returnClass, returnType)); + return binaryResult.thenApply( + (r) -> { + try { + return converter.fromData(r, returnClass, returnType); + } catch (DataConverterException e) { + throw new ActivityFailureException(new ActivityType().setName(name), e); + } + }); } private Promise executeActivityOnce(String name, ActivityOptions options, byte[] input) { @@ -237,34 +275,18 @@ public Promise executeLocalActivity( long startTime = WorkflowInternal.currentTimeMillis(); return WorkflowRetryerInternal.retryAsync( (attempt, currentStart) -> - executeLocalActivityOnce( + executeActivityOnce( activityName, - options, args, resultClass, resultType, - currentStart - startTime, - attempt), + input -> + executeLocalActivityOnce( + activityName, options, input, currentStart - startTime, attempt)), 1, startTime); } - private Promise executeLocalActivityOnce( - String name, - LocalActivityOptions options, - Object[] args, - Class returnClass, - Type returnType, - long elapsed, - int attempt) { - byte[] input = converter.toData(args); - Promise binaryResult = executeLocalActivityOnce(name, options, input, elapsed, attempt); - if (returnClass == Void.TYPE) { - return binaryResult.thenApply((r) -> null); - } - return binaryResult.thenApply((r) -> converter.fromData(r, returnClass, returnType)); - } - private Promise executeLocalActivityOnce( String name, LocalActivityOptions options, byte[] input, long elapsed, int attempt) { ActivityCallback callback = new ActivityCallback(); diff --git a/src/main/java/com/uber/cadence/workflow/ActivityFailureException.java b/src/main/java/com/uber/cadence/workflow/ActivityFailureException.java index eef366eb1..3beef8e5a 100644 --- a/src/main/java/com/uber/cadence/workflow/ActivityFailureException.java +++ b/src/main/java/com/uber/cadence/workflow/ActivityFailureException.java @@ -29,6 +29,11 @@ public final class ActivityFailureException extends ActivityException { private int attempt; private Duration backoff; + public ActivityFailureException(ActivityType activityType, Throwable cause) { + super("ActivityFailureException", -1, activityType, ""); + initCause(cause); + } + public ActivityFailureException( long eventId, ActivityType activityType, String activityId, Throwable cause) { super("ActivityFailureException", eventId, activityType, activityId);