diff --git a/src/main/java/com/uber/cadence/internal/replay/DecisionContext.java b/src/main/java/com/uber/cadence/internal/replay/DecisionContext.java index a0d5c3e91..1cff751a5 100644 --- a/src/main/java/com/uber/cadence/internal/replay/DecisionContext.java +++ b/src/main/java/com/uber/cadence/internal/replay/DecisionContext.java @@ -179,4 +179,10 @@ Optional mutableSideEffect( /** @return replay safe UUID */ UUID randomUUID(); + + /** @return schedule time of the workflow execution. */ + long getScheduleTimeMillis(); + + /** @return start time of the workflow execution. */ + long getExecutionTimeMillis(); } diff --git a/src/main/java/com/uber/cadence/internal/replay/DecisionContextImpl.java b/src/main/java/com/uber/cadence/internal/replay/DecisionContextImpl.java index 94a4cdcc3..75b8196b1 100644 --- a/src/main/java/com/uber/cadence/internal/replay/DecisionContextImpl.java +++ b/src/main/java/com/uber/cadence/internal/replay/DecisionContextImpl.java @@ -85,6 +85,16 @@ public UUID randomUUID() { return workflowClient.randomUUID(); } + @Override + public long getScheduleTimeMillis() { + return workflowClient.getScheduleTimeMillis(); + } + + @Override + public long getExecutionTimeMillis() { + return workflowClient.getExecutionTimeMillis(); + } + @Override public Random newRandom() { return workflowClient.newRandom(); diff --git a/src/main/java/com/uber/cadence/internal/replay/WorkflowContext.java b/src/main/java/com/uber/cadence/internal/replay/WorkflowContext.java index abbca67f4..6f5fd3372 100644 --- a/src/main/java/com/uber/cadence/internal/replay/WorkflowContext.java +++ b/src/main/java/com/uber/cadence/internal/replay/WorkflowContext.java @@ -17,7 +17,12 @@ package com.uber.cadence.internal.replay; -import com.uber.cadence.*; +import com.uber.cadence.ChildPolicy; +import com.uber.cadence.PollForDecisionTaskResponse; +import com.uber.cadence.WorkflowExecution; +import com.uber.cadence.WorkflowExecutionStartedEventAttributes; +import com.uber.cadence.WorkflowType; +import java.util.concurrent.TimeUnit; final class WorkflowContext { @@ -131,4 +136,14 @@ void setCurrentRunId(String currentRunId) { String getCurrentRunId() { return currentRunId; } + + long getScheduleTimeMillis() { + return TimeUnit.NANOSECONDS.toMillis( + decisionTask.getHistory().getEvents().get(0).getTimestamp()); + } + + long getExecutionTimeMillis() { + return TimeUnit.NANOSECONDS.toMillis( + decisionTask.getHistory().getEvents().get(1).getTimestamp()); + } } diff --git a/src/main/java/com/uber/cadence/internal/replay/WorkflowDecisionContext.java b/src/main/java/com/uber/cadence/internal/replay/WorkflowDecisionContext.java index 128d5ad7a..5960f49c1 100644 --- a/src/main/java/com/uber/cadence/internal/replay/WorkflowDecisionContext.java +++ b/src/main/java/com/uber/cadence/internal/replay/WorkflowDecisionContext.java @@ -375,4 +375,12 @@ void handleExternalWorkflowExecutionSignaled(HistoryEvent event) { } } } + + long getScheduleTimeMillis() { + return workflowContext.getScheduleTimeMillis(); + } + + long getExecutionTimeMillis() { + return workflowContext.getExecutionTimeMillis(); + } } diff --git a/src/main/java/com/uber/cadence/internal/sync/DeterministicRunnerImpl.java b/src/main/java/com/uber/cadence/internal/sync/DeterministicRunnerImpl.java index 73135fbb6..a0b5b0b9e 100644 --- a/src/main/java/com/uber/cadence/internal/sync/DeterministicRunnerImpl.java +++ b/src/main/java/com/uber/cadence/internal/sync/DeterministicRunnerImpl.java @@ -648,5 +648,15 @@ public boolean getEnableLoggingInReplay() { public UUID randomUUID() { return UUID.randomUUID(); } + + @Override + public long getScheduleTimeMillis() { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public long getExecutionTimeMillis() { + throw new UnsupportedOperationException("not implemented"); + } } } diff --git a/src/main/java/com/uber/cadence/internal/sync/WorkflowInfoImpl.java b/src/main/java/com/uber/cadence/internal/sync/WorkflowInfoImpl.java index 94b7eae2d..a955d251e 100644 --- a/src/main/java/com/uber/cadence/internal/sync/WorkflowInfoImpl.java +++ b/src/main/java/com/uber/cadence/internal/sync/WorkflowInfoImpl.java @@ -64,4 +64,14 @@ public Duration getExecutionStartToCloseTimeout() { public ChildPolicy getChildPolicy() { return context.getChildPolicy(); } + + @Override + public long getScheduleTimeMillis() { + return context.getScheduleTimeMillis(); + } + + @Override + public long getExecutionTimeMillis() { + return context.getExecutionTimeMillis(); + } } diff --git a/src/main/java/com/uber/cadence/internal/testservice/StateMachines.java b/src/main/java/com/uber/cadence/internal/testservice/StateMachines.java index 77b4f943d..2a490bd17 100644 --- a/src/main/java/com/uber/cadence/internal/testservice/StateMachines.java +++ b/src/main/java/com/uber/cadence/internal/testservice/StateMachines.java @@ -485,9 +485,13 @@ private static void addStartChildTask( .execute( () -> { try { + int backoffIntervalSecs = + TestWorkflowMutableStateImpl.getBackoffIntervalSeconds( + data.service::currentTimeMillis, startChild.getCronSchedule()); + data.service.startWorkflowExecutionImpl( startChild, - 0, + backoffIntervalSecs, Optional.of(ctx.getWorkflowMutableState()), OptionalLong.of(data.initiatedEventId), Optional.empty()); diff --git a/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowMutableStateImpl.java b/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowMutableStateImpl.java index a952110bd..7128ca84a 100644 --- a/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowMutableStateImpl.java +++ b/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowMutableStateImpl.java @@ -888,21 +888,8 @@ private void startNewCronRun( WorkflowData data, byte[] lastCompletionResult) throws InternalServiceError, BadRequestError { - CronDefinition cronDefinition = CronDefinitionBuilder.instanceDefinitionFor(CronType.UNIX); - CronParser parser = new CronParser(cronDefinition); - Cron cron = parser.parse(data.cronSchedule); - - Instant i = Instant.ofEpochMilli(store.currentTimeMillis()); - ZonedDateTime now = ZonedDateTime.ofInstant(i, ZoneOffset.UTC); - - ExecutionTime executionTime = ExecutionTime.forCron(cron); - Optional backoff = executionTime.timeToNextExecution(now); - int backoffIntervalSeconds = (int) backoff.get().getSeconds(); - - if (backoffIntervalSeconds == 0) { - backoff = executionTime.timeToNextExecution(now.plusSeconds(1)); - backoffIntervalSeconds = (int) backoff.get().getSeconds() + 1; - } + int backoffIntervalSeconds = + getBackoffIntervalSeconds(store::currentTimeMillis, data.cronSchedule); ContinueAsNewWorkflowExecutionDecisionAttributes continueAsNewAttr = new ContinueAsNewWorkflowExecutionDecisionAttributes() @@ -932,6 +919,29 @@ private void startNewCronRun( continuedAsNewEventAttributes.setNewExecutionRunId(runId); } + static int getBackoffIntervalSeconds(LongSupplier currTimeMillis, String cronSchedule) { + if (Strings.isNullOrEmpty(cronSchedule)) { + return 0; + } + + CronDefinition cronDefinition = CronDefinitionBuilder.instanceDefinitionFor(CronType.UNIX); + CronParser parser = new CronParser(cronDefinition); + Cron cron = parser.parse(cronSchedule); + + Instant i = Instant.ofEpochMilli(currTimeMillis.getAsLong()); + ZonedDateTime now = ZonedDateTime.ofInstant(i, ZoneOffset.UTC); + + ExecutionTime executionTime = ExecutionTime.forCron(cron); + Optional backoff = executionTime.timeToNextExecution(now); + int backoffIntervalSeconds = (int) backoff.get().getSeconds(); + + if (backoffIntervalSeconds == 0) { + backoff = executionTime.timeToNextExecution(now.plusSeconds(1)); + backoffIntervalSeconds = (int) backoff.get().getSeconds() + 1; + } + return backoffIntervalSeconds; + } + private void processCancelWorkflowExecution( RequestContext ctx, CancelWorkflowExecutionDecisionAttributes d, long decisionTaskCompletedId) throws InternalServiceError, BadRequestError { diff --git a/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowService.java b/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowService.java index abfe73b2d..e585fad64 100644 --- a/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowService.java +++ b/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowService.java @@ -207,8 +207,16 @@ public void DeprecateDomain(DeprecateDomainRequest deprecateRequest) @Override public StartWorkflowExecutionResponse StartWorkflowExecution( StartWorkflowExecutionRequest startRequest) throws TException { + int backoffIntervalSecs = + TestWorkflowMutableStateImpl.getBackoffIntervalSeconds( + store::currentTimeMillis, startRequest.getCronSchedule()); + return startWorkflowExecutionImpl( - startRequest, 0, Optional.empty(), OptionalLong.empty(), Optional.empty()); + startRequest, + backoffIntervalSecs, + Optional.empty(), + OptionalLong.empty(), + Optional.empty()); } StartWorkflowExecutionResponse startWorkflowExecutionImpl( @@ -501,10 +509,7 @@ public void SignalWorkflowExecution(SignalWorkflowExecutionRequest signalRequest @Override public StartWorkflowExecutionResponse SignalWithStartWorkflowExecution( - SignalWithStartWorkflowExecutionRequest r) - throws BadRequestError, InternalServiceError, EntityNotExistsError, ServiceBusyError, - DomainNotActiveError, LimitExceededError, WorkflowExecutionAlreadyStartedError, - TException { + SignalWithStartWorkflowExecutionRequest r) throws TException { ExecutionId executionId = new ExecutionId(r.getDomain(), r.getWorkflowId(), null); TestWorkflowMutableState mutableState = getMutableState(executionId, false); SignalWorkflowExecutionRequest signalRequest = @@ -535,8 +540,16 @@ public StartWorkflowExecutionResponse SignalWithStartWorkflowExecution( .setCronSchedule(r.getCronSchedule()) .setRequestId(r.getRequestId()) .setIdentity(r.getIdentity()); + + int backoffIntervalSecs = + TestWorkflowMutableStateImpl.getBackoffIntervalSeconds( + store::currentTimeMillis, startRequest.getCronSchedule()); return startWorkflowExecutionImpl( - startRequest, 0, Optional.empty(), OptionalLong.empty(), Optional.of(signalRequest)); + startRequest, + backoffIntervalSecs, + Optional.empty(), + OptionalLong.empty(), + Optional.of(signalRequest)); } // TODO: https://github.com/uber/cadence-java-client/issues/359 diff --git a/src/main/java/com/uber/cadence/workflow/WorkflowInfo.java b/src/main/java/com/uber/cadence/workflow/WorkflowInfo.java index 04a0ab5a0..ab2a00e44 100644 --- a/src/main/java/com/uber/cadence/workflow/WorkflowInfo.java +++ b/src/main/java/com/uber/cadence/workflow/WorkflowInfo.java @@ -35,4 +35,8 @@ public interface WorkflowInfo { Duration getExecutionStartToCloseTimeout(); ChildPolicy getChildPolicy(); + + long getScheduleTimeMillis(); + + long getExecutionTimeMillis(); } diff --git a/src/test/java/com/uber/cadence/workflow/WorkflowTest.java b/src/test/java/com/uber/cadence/workflow/WorkflowTest.java index 1d2fb346f..035d71e52 100644 --- a/src/test/java/com/uber/cadence/workflow/WorkflowTest.java +++ b/src/test/java/com/uber/cadence/workflow/WorkflowTest.java @@ -3249,20 +3249,24 @@ public String execute(String testName) { lastCompletionResult = Workflow.getLastCompletionResult(String.class); + SimpleDateFormat sdf = new SimpleDateFormat("MMM dd,yyyy HH:mm:ss.SSS"); + + Date scheduleTime = new Date(Workflow.getWorkflowInfo().getScheduleTimeMillis()); + log.info("TestWorkflowWithCronScheduleImpl scheduled at " + sdf.format(scheduleTime)); + + Date now = new Date(Workflow.currentTimeMillis()); + log.info("TestWorkflowWithCronScheduleImpl run at " + sdf.format(now)); + AtomicInteger count = retryCount.get(testName); if (count == null) { count = new AtomicInteger(); retryCount.put(testName, count); } int c = count.incrementAndGet(); - if (c == 3) { throw new RuntimeException("simulated error"); } - SimpleDateFormat sdf = new SimpleDateFormat("MMM dd,yyyy HH:mm:ss.SSS"); - Date now = new Date(Workflow.currentTimeMillis()); - log.debug("TestWorkflowWithCronScheduleImpl run at " + sdf.format(now)); return "run " + c; } }