From 27fc1faaed20ec7d65bbd5c0c2bf4fb2a6745e48 Mon Sep 17 00:00:00 2001 From: Sam Sussman Date: Sun, 15 Jan 2023 03:36:11 -0600 Subject: [PATCH] feat: replace sleep with time and duration (#221) --- apps/test-app-runtime/src/slack-bot.ts | 4 +- apps/tests/aws-runtime/test/test-service.ts | 42 +- packages/@eventual/aws-cdk/src/scheduler.ts | 4 +- .../@eventual/aws-cdk/src/service-function.ts | 3 +- packages/@eventual/aws-cdk/src/service.ts | 4 +- packages/@eventual/aws-cdk/src/workflows.ts | 2 +- .../aws-runtime/src/clients/create.ts | 2 +- .../aws-runtime/src/clients/timer-client.ts | 55 ++- .../src/clients/workflow-client.ts | 9 +- .../src/handlers/api/executions/new.ts | 37 +- packages/@eventual/cli/src/commands/replay.ts | 3 - packages/@eventual/cli/src/commands/start.ts | 13 +- .../@eventual/cli/src/replay/orchestrator.ts | 24 +- .../client/src/http-service-client.ts | 3 +- .../@eventual/compiler/test-files/workflow.ts | 4 +- .../__snapshots__/esbuild-plugin.test.ts.snap | 4 +- packages/@eventual/core/src/activity.ts | 17 +- packages/@eventual/core/src/await-time.ts | 99 ++++ .../@eventual/core/src/calls/activity-call.ts | 10 +- .../core/src/calls/await-time-call.ts | 50 ++ .../core/src/calls/condition-call.ts | 7 +- .../core/src/calls/expect-signal-call.ts | 7 +- packages/@eventual/core/src/calls/index.ts | 3 +- .../@eventual/core/src/calls/sleep-call.ts | 44 -- packages/@eventual/core/src/command.ts | 60 +-- packages/@eventual/core/src/condition.ts | 12 +- packages/@eventual/core/src/error.ts | 2 +- packages/@eventual/core/src/eventual.ts | 45 +- packages/@eventual/core/src/heartbeat.ts | 2 +- packages/@eventual/core/src/index.ts | 3 +- packages/@eventual/core/src/interpret.ts | 111 +++-- .../core/src/runtime/clients/timer-client.ts | 56 +-- .../core/src/runtime/command-executor.ts | 130 +----- packages/@eventual/core/src/runtime/flags.ts | 28 ++ .../src/runtime/handlers/activity-worker.ts | 372 +++++++-------- .../core/src/runtime/handlers/api-handler.ts | 20 +- .../src/runtime/handlers/event-handler.ts | 20 +- .../core/src/runtime/handlers/orchestrator.ts | 130 +++--- .../src/runtime/handlers/timer-handler.ts | 4 +- .../core/src/runtime/metrics/constants.ts | 4 +- packages/@eventual/core/src/schedule.ts | 81 ++++ packages/@eventual/core/src/signals.ts | 54 ++- packages/@eventual/core/src/sleep.ts | 34 -- .../@eventual/core/src/workflow-events.ts | 106 +---- packages/@eventual/core/src/workflow.ts | 43 +- packages/@eventual/core/test/command-util.ts | 126 +----- .../core/test/commend-executor.test.ts | 207 +-------- .../@eventual/core/test/interpret.test.ts | 426 +++++++++++------- .../testing/src/clients/event-client.ts | 6 +- .../testing/src/clients/timer-client.ts | 11 +- .../testing/src/clients/workflow-client.ts | 7 +- .../src/clients/workflow-runtime-client.ts | 14 +- packages/@eventual/testing/src/environment.ts | 5 +- packages/@eventual/testing/src/utils.ts | 15 - packages/@eventual/testing/test/env.test.ts | 2 +- packages/@eventual/testing/test/workflow.ts | 20 +- 56 files changed, 1239 insertions(+), 1367 deletions(-) create mode 100644 packages/@eventual/core/src/await-time.ts create mode 100644 packages/@eventual/core/src/calls/await-time-call.ts delete mode 100644 packages/@eventual/core/src/calls/sleep-call.ts create mode 100644 packages/@eventual/core/src/schedule.ts delete mode 100644 packages/@eventual/core/src/sleep.ts delete mode 100644 packages/@eventual/testing/src/utils.ts diff --git a/apps/test-app-runtime/src/slack-bot.ts b/apps/test-app-runtime/src/slack-bot.ts index 865b34e65..7133527b4 100644 --- a/apps/test-app-runtime/src/slack-bot.ts +++ b/apps/test-app-runtime/src/slack-bot.ts @@ -1,10 +1,10 @@ import { Slack, SlackCredentials } from "@eventual/integrations-slack"; import { AWSSecret } from "@eventual/aws-client"; import { + duration, expectSignal, JsonSecret, sendSignal, - sleepFor, workflow, } from "@eventual/core"; import ms from "ms"; @@ -47,7 +47,7 @@ const remindMe = workflow( message: string; waitSeconds: number; }) => { - await sleepFor(request.waitSeconds); + await duration(request.waitSeconds); await slack.client.chat.postMessage({ channel: request.channel, diff --git a/apps/tests/aws-runtime/test/test-service.ts b/apps/tests/aws-runtime/test/test-service.ts index de0192702..53cb95940 100644 --- a/apps/tests/aws-runtime/test/test-service.ts +++ b/apps/tests/aws-runtime/test/test-service.ts @@ -5,13 +5,13 @@ import { expectSignal, asyncResult, sendSignal, - sleepFor, - sleepUntil, + time, workflow, sendActivityHeartbeat, HeartbeatTimeout, EventualError, signal, + duration, } from "@eventual/core"; import { SendMessageCommand, SQSClient } from "@aws-sdk/client-sqs"; import { AsyncWriterTestEvent } from "./async-writer-handler.js"; @@ -62,8 +62,8 @@ export const workflow2 = workflow("my-parent-workflow", async () => { }); export const workflow3 = workflow("sleepy", async () => { - await sleepFor(2); - await sleepUntil(new Date(new Date().getTime() + 1000 * 2)); + await duration(2); + await time(new Date(new Date().getTime() + 1000 * 2)); return `done!`; }); @@ -81,7 +81,7 @@ export const workflow4 = workflow("parallel", async () => { return Promise.allSettled([greetings, greetings2, greetings3, any, race]); async function sayHelloInSeconds(seconds: number) { - await sleepFor(seconds); + await duration(seconds); return await hello("sam"); } }); @@ -95,7 +95,7 @@ const doneSignal = signal("done"); export const parentWorkflow = workflow("parentWorkflow", async () => { const child = childWorkflow({ name: "child" }); while (true) { - const n = await mySignal.expectSignal({ timeoutSeconds: 10 }); + const n = await mySignal.expectSignal({ timeout: duration(10, "seconds") }); console.log(n); @@ -142,7 +142,9 @@ export const childWorkflow = workflow( while (!done) { sendSignal(parentId, mySignal, last + 1); block = true; - if (!(await condition({ timeoutSeconds: 10 }, () => !block))) { + if ( + !(await condition({ timeout: duration(10, "seconds") }, () => !block)) + ) { throw new Error("timed out!"); } } @@ -153,27 +155,31 @@ export const childWorkflow = workflow( const slowActivity = activity( "slowAct", - { timeoutSeconds: 5 }, + { timeout: duration(5, "seconds") }, () => new Promise((resolve) => setTimeout(resolve, 10 * 1000)) ); -const slowWf = workflow("slowWorkflow", { timeoutSeconds: 5 }, () => - sleepFor(10) +const slowWf = workflow( + "slowWorkflow", + { timeout: duration(5, "seconds") }, + () => duration(10) ); export const timedOutWorkflow = workflow( "timedOut", - { timeoutSeconds: 100 }, + { timeout: duration(100, "seconds") }, async () => { // chains to be able to run in parallel. const timedOutFunctions = { condition: async () => { - if (!(await condition({ timeoutSeconds: 2 }, () => false))) { + if ( + !(await condition({ timeout: duration(2, "seconds") }, () => false)) + ) { throw new Error("Timed Out!"); } }, signal: async () => { - await mySignal.expectSignal({ timeoutSeconds: 2 }); + await mySignal.expectSignal({ timeout: duration(2, "seconds") }); }, activity: slowActivity, workflow: () => slowWf(undefined), @@ -196,7 +202,7 @@ export const timedOutWorkflow = workflow( export const asyncWorkflow = workflow( "asyncWorkflow", - { timeoutSeconds: 100 }, // timeout eventually + { timeout: duration(100, "seconds") }, // timeout eventually async () => { const result = await asyncActivity("complete"); @@ -211,7 +217,7 @@ export const asyncWorkflow = workflow( const activityWithHeartbeat = activity( "activityWithHeartbeat", - { heartbeatSeconds: 2 }, + { heartbeatTimeout: duration(2, "seconds") }, async (n: number, type: "success" | "no-heartbeat" | "some-heartbeat") => { const delay = (s: number) => new Promise((resolve) => { @@ -234,7 +240,7 @@ const activityWithHeartbeat = activity( export const heartbeatWorkflow = workflow( "heartbeatWorkflow", - { timeoutSeconds: 100 }, // timeout eventually + { timeout: duration(100, "seconds") }, // timeout eventually async (n: number) => { return await Promise.allSettled([ activityWithHeartbeat(n, "success"), @@ -291,13 +297,13 @@ export const eventDrivenWorkflow = workflow( // wait for the event to come back around and wake this workflow const { value } = await expectSignal("start", { - timeoutSeconds: 30, + timeout: duration(30, "seconds"), }); await sendFinishEvent(ctx.execution.id); await expectSignal("finish", { - timeoutSeconds: 30, + timeout: duration(30, "seconds"), }); return value; diff --git a/packages/@eventual/aws-cdk/src/scheduler.ts b/packages/@eventual/aws-cdk/src/scheduler.ts index 9b9096f87..5531fde9e 100644 --- a/packages/@eventual/aws-cdk/src/scheduler.ts +++ b/packages/@eventual/aws-cdk/src/scheduler.ts @@ -39,7 +39,7 @@ export interface SchedulerProps { } /** - * Subsystem that orchestrates long running timers. Used to orchestrate timeouts, sleep + * Subsystem that orchestrates long running timers. Used to orchestrate timeouts, timers * and heartbeats. */ export class Scheduler extends Construct implements IScheduler, IGrantable { @@ -48,7 +48,7 @@ export class Scheduler extends Construct implements IScheduler, IGrantable { */ public readonly schedulerRole: IRole; /** - * Timer (standard) queue which helps orchestrate scheduled things like sleep and dynamic retries. + * Timer (standard) queue which helps orchestrate scheduled things like timers, heartbeat, and dynamic retries. * * Worths in tandem with the {@link CfnSchedulerGroup} to create millisecond latency, long running timers. */ diff --git a/packages/@eventual/aws-cdk/src/service-function.ts b/packages/@eventual/aws-cdk/src/service-function.ts index e0ba86516..d0fe9759a 100644 --- a/packages/@eventual/aws-cdk/src/service-function.ts +++ b/packages/@eventual/aws-cdk/src/service-function.ts @@ -1,4 +1,4 @@ -import { ServiceType, SERVICE_TYPE_FLAG } from "@eventual/core"; +import { ServiceType } from "@eventual/core"; import { Architecture, Code, @@ -28,7 +28,6 @@ export class ServiceFunction extends Function { environment: { ...props.environment, NODE_OPTIONS: "--enable-source-maps", - [SERVICE_TYPE_FLAG]: props.serviceType, }, }); } diff --git a/packages/@eventual/aws-cdk/src/service.ts b/packages/@eventual/aws-cdk/src/service.ts index 582c3f116..bb59e2901 100644 --- a/packages/@eventual/aws-cdk/src/service.ts +++ b/packages/@eventual/aws-cdk/src/service.ts @@ -122,11 +122,11 @@ export class Service extends Construct implements IGrantable { */ public readonly workflows: Workflows; /** - * The subsystem for schedules and sleep timers. + * The subsystem for schedules and timers. */ public readonly scheduler: Scheduler; /** - * The Resources for schedules and sleep timers. + * The Resources for schedules and timers. */ public readonly cliRole: Role; /** diff --git a/packages/@eventual/aws-cdk/src/workflows.ts b/packages/@eventual/aws-cdk/src/workflows.ts index ff2ca0af2..c3ec40311 100644 --- a/packages/@eventual/aws-cdk/src/workflows.ts +++ b/packages/@eventual/aws-cdk/src/workflows.ts @@ -200,7 +200,7 @@ export class Workflows extends Construct implements IWorkflows, IGrantable { this.configureRecordHistory(this.orchestrator); // allows the orchestrator to directly invoke the activity worker lambda function (async) this.props.activities.configureScheduleActivity(this.orchestrator); - // allows allows the orchestrator to start timeout and sleep timers + // allows allows the orchestrator to start timeout and timers this.props.scheduler.configureScheduleTimer(this.orchestrator); // allows the orchestrator to send events to the workflow queue, // write events to the execution table, and start other workflows diff --git a/packages/@eventual/aws-runtime/src/clients/create.ts b/packages/@eventual/aws-runtime/src/clients/create.ts index 95ce80b68..62ccb720d 100644 --- a/packages/@eventual/aws-runtime/src/clients/create.ts +++ b/packages/@eventual/aws-runtime/src/clients/create.ts @@ -117,7 +117,7 @@ export const createTimerClient = /* @__PURE__ */ memoize( schedulerRoleArn: props.schedulerRoleArn ?? env.schedulerRoleArn(), schedulerDlqArn: props.schedulerDlqArn ?? env.schedulerDlqArn(), schedulerGroup: props.schedulerGroup ?? env.schedulerGroup(), - sleepQueueThresholdSeconds: props.sleepQueueThresholdSeconds ?? 15 * 60, + timerQueueThresholdSeconds: props.timerQueueThresholdSeconds ?? 15 * 60, sqs: props.sqs ?? sqs(), timerQueueUrl: props.timerQueueUrl ?? env.timerQueueUrl(), scheduleForwarderArn: diff --git a/packages/@eventual/aws-runtime/src/clients/timer-client.ts b/packages/@eventual/aws-runtime/src/clients/timer-client.ts index 220f84dc9..1cc071308 100644 --- a/packages/@eventual/aws-runtime/src/clients/timer-client.ts +++ b/packages/@eventual/aws-runtime/src/clients/timer-client.ts @@ -15,7 +15,10 @@ import { ScheduleForwarderRequest, TimerRequest, isActivityHeartbeatMonitorRequest, - computeUntilTime, + computeScheduleDate, + Schedule, + isTimeSchedule, + computeDurationSeconds, } from "@eventual/core"; import { ulid } from "ulidx"; @@ -25,10 +28,10 @@ export interface AWSTimerClientProps { readonly schedulerDlqArn: string; readonly schedulerGroup: string; /** - * If a sleep has a longer duration (in seconds) than this threshold, + * If a timer has a longer duration (in seconds) than this threshold, * create an Event Bus Scheduler before sending it to the TimerQueue */ - readonly sleepQueueThresholdSeconds: number; + readonly timerQueueThresholdSeconds: number; readonly timerQueueUrl: string; readonly sqs: SQSClient; readonly scheduleForwarderArn: string; @@ -36,7 +39,7 @@ export interface AWSTimerClientProps { export class AWSTimerClient extends TimerClient { constructor(private props: AWSTimerClientProps) { - super(); + super(() => new Date()); } /** @@ -52,7 +55,10 @@ export class AWSTimerClient extends TimerClient { * the {@link TimerRequest} provided. */ public async startShortTimer(timerRequest: TimerRequest) { - const delaySeconds = computeTimerSeconds(timerRequest.schedule); + const delaySeconds = computeTimerSeconds( + timerRequest.schedule, + this.baseTime() + ); if (delaySeconds > 15 * 60) { throw new Error( @@ -74,8 +80,8 @@ export class AWSTimerClient extends TimerClient { /** * Starts a timer of any (positive) length. * - * If the timer is longer than 15 minutes (configurable via `props.sleepQueueThresholdMillis`), - * the timer will create a EventBridge schedule until the untilTime - props.sleepQueueThresholdMillis + * If the timer is longer than 15 minutes (configurable via `props.timerQueueThresholdMillis`), + * the timer will create a EventBridge schedule until the untilTime - props.timerQueueThresholdMillis * when the timer will be moved to the SQS queue. * * The SQS Queue will delay for floor(untilTime - currentTime) seconds until the timer handler can pick up the message. @@ -84,22 +90,27 @@ export class AWSTimerClient extends TimerClient { * the {@link TimerRequest} provided. */ public async startTimer(timerRequest: TimerRequest) { - const untilTimeIso = computeUntilTime(timerRequest.schedule); - const untilTime = new Date(untilTimeIso); - const sleepDuration = computeTimerSeconds(timerRequest.schedule); + const untilTime = computeScheduleDate( + timerRequest.schedule, + this.baseTime() + ); + const timerDuration = computeTimerSeconds( + timerRequest.schedule, + this.baseTime() + ); /** - * If the sleep is longer than 15 minutes, create an EventBridge schedule first. + * If the timer is longer than 15 minutes, create an EventBridge schedule first. * The Schedule will trigger a lambda which will re-compute the delay time and * create a message in the timerQueue. * - * The timerQueue ultimately will pick up the event and forward the {@link SleepComplete} to the workflow queue. + * The timerQueue ultimately will pick up the event and forward the {@link TimerComplete} to the workflow queue. */ - if (sleepDuration > this.props.sleepQueueThresholdSeconds) { - // wait for utilTime - sleepQueueThresholdMillis and then forward the event to + if (timerDuration > this.props.timerQueueThresholdSeconds) { + // wait for utilTime - timerQueueThresholdMillis and then forward the event to // the timerQueue const scheduleTime = - untilTime.getTime() - this.props.sleepQueueThresholdSeconds; + untilTime.getTime() - this.props.timerQueueThresholdSeconds; // EventBridge Scheduler only supports HH:MM:SS, strip off the milliseconds and `Z`. const formattedSchedulerTime = new Date(scheduleTime) .toISOString() @@ -112,7 +123,7 @@ export class AWSTimerClient extends TimerClient { scheduleName, timerRequest, forwardTime: "", - untilTime: untilTimeIso, + untilTime: untilTime.toISOString(), }; try { @@ -145,7 +156,7 @@ export class AWSTimerClient extends TimerClient { } } else { /** - * When the sleep is less than 15 minutes, send the timer directly to the + * When the timer is less than 15 minutes, send the timer directly to the * timer queue. The timer queue will pass the event on to the workflow queue * once delaySeconds have passed. */ @@ -159,7 +170,7 @@ export class AWSTimerClient extends TimerClient { * Use this method to clean the schedule. * * The provided schedule-forwarder function will call this method in Eventual when - * the timer is transferred from EventBridge to SQS at `props.sleepQueueThresholdMillis`. + * the timer is transferred from EventBridge to SQS at `props.timerQueueThresholdMillis`. */ public async clearSchedule(scheduleName: string) { try { @@ -203,16 +214,16 @@ function safeScheduleName(name: string) { return name.replaceAll(/[^0-9a-zA-Z-_.]/g, ""); } -export function computeTimerSeconds(schedule: TimerRequest["schedule"]) { - return "untilTime" in schedule +function computeTimerSeconds(schedule: Schedule, baseTime: Date) { + return isTimeSchedule(schedule) ? Math.max( // Compute the number of seconds (floored) // subtract 1 because the maxBatchWindow is set to 1s on the lambda event source. // this allows for more events to be sent at once while not adding extra latency Math.ceil( - (new Date(schedule.untilTime).getTime() - new Date().getTime()) / 1000 + (new Date(schedule.isoDate).getTime() - baseTime.getTime()) / 1000 ), 0 ) - : schedule.timerSeconds; + : computeDurationSeconds(schedule.dur, schedule.unit); } diff --git a/packages/@eventual/aws-runtime/src/clients/workflow-client.ts b/packages/@eventual/aws-runtime/src/clients/workflow-client.ts index 33ed90fbc..4a8dad199 100644 --- a/packages/@eventual/aws-runtime/src/clients/workflow-client.ts +++ b/packages/@eventual/aws-runtime/src/clients/workflow-client.ts @@ -25,6 +25,7 @@ import { lookupWorkflow, SortOrder, isExecutionStatus, + computeScheduleDate, } from "@eventual/core"; import { ulid } from "ulidx"; import { inspect } from "util"; @@ -56,7 +57,7 @@ export class AWSWorkflowClient extends WorkflowClient { executionName = ulid(), workflow, input, - timeoutSeconds, + timeout, ...request }: StartExecutionRequest | StartChildExecutionRequest) { if (typeof workflow === "string" && !lookupWorkflow(workflow)) { @@ -107,10 +108,8 @@ export class AWSWorkflowClient extends WorkflowClient { workflowName, // generate the time for the workflow to timeout based on when it was started. // the timer will be started by the orchestrator so the client does not need to have access to the timer client. - timeoutTime: timeoutSeconds - ? new Date( - new Date().getTime() + timeoutSeconds * 1000 - ).toISOString() + timeoutTime: timeout + ? computeScheduleDate(timeout, this.baseTime()).toISOString() : undefined, context: { name: executionName, diff --git a/packages/@eventual/aws-runtime/src/handlers/api/executions/new.ts b/packages/@eventual/aws-runtime/src/handlers/api/executions/new.ts index 16d8e03f5..fa13917c0 100644 --- a/packages/@eventual/aws-runtime/src/handlers/api/executions/new.ts +++ b/packages/@eventual/aws-runtime/src/handlers/api/executions/new.ts @@ -1,7 +1,13 @@ // startWorkflow uses the global workflows() to validate the workflow name. import "@eventual/entry/injected"; -import type { StartExecutionResponse } from "@eventual/core"; +import { + DurationUnit, + DURATION_UNITS, + isDurationUnit, + Schedule, + StartExecutionResponse, +} from "@eventual/core"; import type { APIGatewayProxyEventV2, APIGatewayProxyHandlerV2, @@ -20,21 +26,30 @@ const workflowClient = createWorkflowClient({ * * workflowName - name of the workflow to start * * Query Parameters: - * * timeoutSeconds - Number of seconds the workflow should run before it times out. Default: use the configured timeout or no timeout. + * * timeout - Number of `timeoutUnit` (default seconds) the workflow should run before it times out. Default: use the configured timeout or no timeout. + * * timeoutUnit - "seconds" | "minutes" | "hours" | "days" | "years". Units to use for the timeout, default: "seconds". * * executionName - name to give the workflow. Default: auto generated UUID. */ export const handler: APIGatewayProxyHandlerV2 = withErrorMiddleware(async (event: APIGatewayProxyEventV2) => { - const { timeoutSeconds: timeoutSecondsString, executionName } = - event.queryStringParameters ?? {}; + const { + timeout: timeoutString, + timeoutUnit, + executionName, + } = event.queryStringParameters ?? {}; + + const timeout = timeoutString ? parseInt(timeoutString) : undefined; - const timeoutSeconds = timeoutSecondsString - ? parseInt(timeoutSecondsString) - : undefined; + if (timeout !== undefined && isNaN(timeout)) { + throw new Error( + "Expected optional parameter timeout to be a valid number" + ); + } - if (timeoutSeconds !== undefined && isNaN(timeoutSeconds)) { + if (timeoutUnit && !isDurationUnit(timeoutUnit)) { throw new Error( - "Expected optional parameter timeoutSeconds to be a valid number" + "Expected optional parameter timeoutUnit to be one of: " + + DURATION_UNITS.join() ); } @@ -47,6 +62,8 @@ export const handler: APIGatewayProxyHandlerV2 = workflow: workflowName, input: event.body && JSON.parse(event.body), executionName, - timeoutSeconds, + timeout: timeout + ? Schedule.duration(timeout, timeoutUnit as DurationUnit) + : undefined, }); }); diff --git a/packages/@eventual/cli/src/commands/replay.ts b/packages/@eventual/cli/src/commands/replay.ts index 8a6955707..cab081cd1 100644 --- a/packages/@eventual/cli/src/commands/replay.ts +++ b/packages/@eventual/cli/src/commands/replay.ts @@ -2,8 +2,6 @@ import { encodeExecutionId, ExecutionID, parseWorkflowName, - ServiceType, - SERVICE_TYPE_FLAG, workflows, } from "@eventual/core"; import { Argv } from "yargs"; @@ -30,7 +28,6 @@ export const replay = (yargs: Argv) => }), serviceAction( async (spinner, serviceClient, { entry, service, execution }) => { - process.env[SERVICE_TYPE_FLAG] = ServiceType.OrchestratorWorker; spinner.start("Constructing replay..."); const [, { events }] = await Promise.all([ loadService(service, encodeExecutionId(execution), entry), diff --git a/packages/@eventual/cli/src/commands/start.ts b/packages/@eventual/cli/src/commands/start.ts index 33e364131..47ea0bf8a 100644 --- a/packages/@eventual/cli/src/commands/start.ts +++ b/packages/@eventual/cli/src/commands/start.ts @@ -4,6 +4,9 @@ import { isWorkflowSucceeded, isWorkflowFailed, ExecutionEventsResponse, + DURATION_UNITS, + DurationUnit, + Schedule, } from "@eventual/core"; import { Argv } from "yargs"; import { serviceAction, setServiceOptions } from "../service-action.js"; @@ -49,6 +52,12 @@ export const start = (yargs: Argv) => type: "number", defaultDescription: "Configured on the workflow definition or no timeout.", + }) + .option("timeoutUnit", { + describe: "Number of seconds until the execution times out.", + type: "string", + choices: DURATION_UNITS, + default: "seconds", }), (args) => { return serviceAction( @@ -94,7 +103,9 @@ export const start = (yargs: Argv) => workflow: args.workflow, input: inputJSON, executionName: args.name, - timeoutSeconds: args.timeout, + timeout: args.timeout + ? Schedule.duration(args.timeout, args.timeoutUnit as DurationUnit) + : undefined, }); } } diff --git a/packages/@eventual/cli/src/replay/orchestrator.ts b/packages/@eventual/cli/src/replay/orchestrator.ts index 9417d7f7e..1e0432db0 100644 --- a/packages/@eventual/cli/src/replay/orchestrator.ts +++ b/packages/@eventual/cli/src/replay/orchestrator.ts @@ -5,6 +5,8 @@ import { isWorkflowStarted, isHistoryEvent, runWorkflowDefinition, + ServiceType, + serviceTypeScopeSync, } from "@eventual/core"; export type Orchestrator = typeof orchestrator; @@ -24,15 +26,17 @@ export function orchestrator( throw new Error("Missing start event"); } const interpretEvents = historyEvents.filter(isHistoryEvent); - return interpret( - runWorkflowDefinition(workflow, startEvent.input, { - workflow: { name: workflow.name }, - execution: { - ...startEvent.context, - startTime: startEvent.timestamp, - id: executionId, - }, - }), - interpretEvents + return serviceTypeScopeSync(ServiceType.OrchestratorWorker, () => + interpret( + runWorkflowDefinition(workflow, startEvent.input, { + workflow: { name: workflow.name }, + execution: { + ...startEvent.context, + startTime: startEvent.timestamp, + id: executionId, + }, + }), + interpretEvents + ) ); } diff --git a/packages/@eventual/client/src/http-service-client.ts b/packages/@eventual/client/src/http-service-client.ts index bf54e62b5..3a0db90fd 100644 --- a/packages/@eventual/client/src/http-service-client.ts +++ b/packages/@eventual/client/src/http-service-client.ts @@ -75,7 +75,8 @@ export class HttpServiceClient implements EventualServiceClient { : request.workflow.workflowName; const queryString = formatQueryString({ - timeoutSeconds: request.timeoutSeconds, + timeout: request.timeout?.dur, + timeoutUnit: request.timeout?.unit, executionName: request.executionName, }); diff --git a/packages/@eventual/compiler/test-files/workflow.ts b/packages/@eventual/compiler/test-files/workflow.ts index f395b541d..f882e963d 100644 --- a/packages/@eventual/compiler/test-files/workflow.ts +++ b/packages/@eventual/compiler/test-files/workflow.ts @@ -59,7 +59,7 @@ export default workflow("workflow", async (input) => { export const workflow2 = workflow( "timeoutFlow", - { timeoutSeconds: 100 }, + { timeout: duration(100, "seconds") }, async () => { await doWork("something"); } @@ -69,6 +69,6 @@ export const workflow3 = workflow("timeoutFlow", async () => { await callMe(); async function callMe() { - await sleepFor(20); + await duration(20, "seconds"); } }); diff --git a/packages/@eventual/compiler/test/__snapshots__/esbuild-plugin.test.ts.snap b/packages/@eventual/compiler/test/__snapshots__/esbuild-plugin.test.ts.snap index a1d91ee7f..7188a34e5 100644 --- a/packages/@eventual/compiler/test/__snapshots__/esbuild-plugin.test.ts.snap +++ b/packages/@eventual/compiler/test/__snapshots__/esbuild-plugin.test.ts.snap @@ -111,13 +111,13 @@ exports[`esbuild-plugin ts workflow 1`] = ` yield func2(); }); var workflow2 = workflow("timeoutFlow", { - timeoutSeconds: 100 + timeout: duration(100, "seconds") }, function* () { yield doWork("something"); }); var workflow3 = workflow("timeoutFlow", function* () { const callMe = $eventual(function* () { - yield sleepFor(20); + yield duration(20, "seconds"); }); yield callMe(); }); diff --git a/packages/@eventual/core/src/activity.ts b/packages/@eventual/core/src/activity.ts index 61c9234ed..4378c8df3 100644 --- a/packages/@eventual/core/src/activity.ts +++ b/packages/@eventual/core/src/activity.ts @@ -1,9 +1,11 @@ import { createActivityCall } from "./calls/activity-call.js"; +import { createAwaitDurationCall } from "./calls/await-time-call.js"; import { callableActivities, getActivityContext, getServiceClient, } from "./global.js"; +import { computeDurationSeconds, DurationSchedule } from "./index.js"; import { isActivityWorker, isOrchestratorWorker } from "./runtime/flags.js"; import { EventualServiceClient, @@ -19,7 +21,7 @@ export interface ActivityOptions { * * @default - workflow will run forever. */ - timeoutSeconds?: number; + timeout?: DurationSchedule; /** * For long running activities, it is suggested that they report back that they * are still in progress to avoid waiting forever or until a long timeout when @@ -30,7 +32,7 @@ export interface ActivityOptions { * * If it fails to do so, the workflow will cancel the activity and throw an error. */ - heartbeatSeconds?: number; + heartbeatTimeout?: DurationSchedule; } export interface ActivityFunction { @@ -225,8 +227,15 @@ export function activity( return createActivityCall( activityID, args, - opts?.timeoutSeconds, - opts?.heartbeatSeconds + opts?.timeout + ? createAwaitDurationCall(opts.timeout.dur, opts.timeout.unit) + : undefined, + opts?.heartbeatTimeout + ? computeDurationSeconds( + opts.heartbeatTimeout.dur, + opts.heartbeatTimeout.unit + ) + : undefined ) as any; } else { // calling the activity from outside the orchestrator just calls the handler diff --git a/packages/@eventual/core/src/await-time.ts b/packages/@eventual/core/src/await-time.ts new file mode 100644 index 000000000..2d05d6861 --- /dev/null +++ b/packages/@eventual/core/src/await-time.ts @@ -0,0 +1,99 @@ +import { + createAwaitDurationCall, + createAwaitTimeCall, +} from "./calls/await-time-call.js"; +import { isOrchestratorWorker } from "./runtime/flags.js"; +import { DurationSchedule, DurationUnit, TimeSchedule } from "./schedule.js"; + +/** + * Represents a time duration. + * + * Within a workflow, awaiting a duration can be used to resume in relative period of time. + * + * ```ts + * workflow("myWorkflow", async () => { + * await duration(10, "minutes"); // sleep for 10 minutes + * return "DONE!"; + * }) + * ``` + * + * It behaves like any other promises, able to be aggregated with other promises. + * + * ```ts + * workflow("myWorkflow", async () => { + * const minTime = duration(10, "minutes"); + * // wait for 10 minutes OR the duration of myActivity, whichever is longer. + * await Promise.all([minTime, myActivity()]); + * return "DONE"; + * }) + * ``` + * + * A `duration` can be used to configure relative timeouts within a workflow or outside of it. + * + * ```ts + * // workflow that will timeout after an hour + * workflow("myWorkflow", { timeout: duration(1, "hour") }, async () => { + * // if the signal is not received within 30 minutes, the line will throw a Timeout error. + * await expectSignal("mySignal", { timeout: duration(30, "minutes"); }); + * }); + * ``` + * + * Durations are computing using a simple computation of the number of standard milliseconds in a + * period of time, not relative to the point in time, added to the milliseconds of the current execution time. + * + * duration(dur, unit): + * + * second(s) - dur * 1000 + * minute(s) - dur * 1000 * 60 + * hour(s) - dur * 1000 * 60 * 60 + * day(s) - dur * 1000 * 60 * 60 * 24 + * year(s) - dur * 1000 * 60 * 60 * 24 * 365.25 + */ +export function duration( + dur: number, + unit: DurationUnit = "seconds" +): Promise & DurationSchedule { + if (!isOrchestratorWorker()) { + return { dur, unit } as Promise & DurationSchedule; + } + + // register an await duration command and return it (to be yielded) + return createAwaitDurationCall(dur, unit) as any; +} + +/** + * Represents a point in time. + * + * Awaiting a duration can be used to resume at a point in time. + * + * ```ts + * workflow("myWorkflow", async () => { + * await time("2024-01-03T12:00:00Z"); // wait until this date + * return "DONE!"; + * }) + * ``` + * + * It behaves like any other promises, able to be aggregated with other promises. + * + * ```ts + * workflow("myWorkflow", async ({ endTime }) => { + * const goalTime = time(endTime); // sleep for 10 minutes + * // wait until the given time or until the activity is completed. + * await Promise.race([goalTime, await myActivity()]); + * return "DONE"; + * }) + * ``` + */ +export function time(isoDate: string): Promise & TimeSchedule; +export function time(date: Date): Promise & TimeSchedule; +export function time(date: Date | string): Promise & TimeSchedule { + const d = new Date(date); + const iso = d.toISOString(); + + if (!isOrchestratorWorker()) { + return { isoDate: iso } as Promise & TimeSchedule; + } + + // register an await time command and return it (to be yielded) + return createAwaitTimeCall(iso) as any; +} diff --git a/packages/@eventual/core/src/calls/activity-call.ts b/packages/@eventual/core/src/calls/activity-call.ts index 841ed3434..55708af2e 100644 --- a/packages/@eventual/core/src/calls/activity-call.ts +++ b/packages/@eventual/core/src/calls/activity-call.ts @@ -3,6 +3,7 @@ import { EventualBase, isEventualOfKind, createEventual, + Eventual, } from "../eventual.js"; import { registerEventual } from "../global.js"; import { Resolved, Failed } from "../result.js"; @@ -17,20 +18,23 @@ export interface ActivityCall name: string; args: any[]; heartbeatSeconds?: number; - timeoutSeconds?: number; + /** + * Timeout can be any Eventual (promise). When the promise resolves, the activity is considered to be timed out. + */ + timeout?: Eventual; } export function createActivityCall( name: string, args: any[], - timeoutSeconds?: number, + timeout?: Eventual, heartbeatSeconds?: number ): ActivityCall { return registerEventual( createEventual(EventualKind.ActivityCall, { name, args, - timeoutSeconds, + timeout, heartbeatSeconds, }) ); diff --git a/packages/@eventual/core/src/calls/await-time-call.ts b/packages/@eventual/core/src/calls/await-time-call.ts new file mode 100644 index 000000000..8e0cba955 --- /dev/null +++ b/packages/@eventual/core/src/calls/await-time-call.ts @@ -0,0 +1,50 @@ +import { + EventualKind, + EventualBase, + isEventualOfKind, + createEventual, +} from "../eventual.js"; +import { registerEventual } from "../global.js"; +import { Resolved } from "../result.js"; +import { DurationUnit } from "../schedule.js"; + +export function isAwaitDurationCall(a: any): a is AwaitDurationCall { + return isEventualOfKind(EventualKind.AwaitDurationCall, a); +} + +export function isAwaitTimeCall(a: any): a is AwaitTimeCall { + return isEventualOfKind(EventualKind.AwaitTimeCall, a); +} + +export interface AwaitDurationCall + extends EventualBase> { + seq?: number; + dur: number; + unit: DurationUnit; +} + +export interface AwaitTimeCall + extends EventualBase> { + seq?: number; + isoDate: string; +} + +export function createAwaitDurationCall( + dur: number, + unit: DurationUnit +): AwaitDurationCall { + return registerEventual( + createEventual(EventualKind.AwaitDurationCall, { + dur, + unit, + }) + ); +} + +export function createAwaitTimeCall(isoDate: string): AwaitTimeCall { + return registerEventual( + createEventual(EventualKind.AwaitTimeCall, { + isoDate, + }) + ); +} diff --git a/packages/@eventual/core/src/calls/condition-call.ts b/packages/@eventual/core/src/calls/condition-call.ts index 5a8c9fe07..ea5a1aa72 100644 --- a/packages/@eventual/core/src/calls/condition-call.ts +++ b/packages/@eventual/core/src/calls/condition-call.ts @@ -1,6 +1,7 @@ import { ConditionPredicate } from "../condition.js"; import { createEventual, + Eventual, EventualBase, EventualKind, isEventualOfKind, @@ -16,17 +17,17 @@ export interface ConditionCall extends EventualBase | Failed> { seq?: number; predicate: ConditionPredicate; - timeoutSeconds?: number; + timeout?: Eventual; } export function createConditionCall( predicate: ConditionPredicate, - timeoutSeconds?: number + timeout?: Eventual ) { return registerEventual( createEventual(EventualKind.ConditionCall, { predicate, - timeoutSeconds, + timeout, }) ); } diff --git a/packages/@eventual/core/src/calls/expect-signal-call.ts b/packages/@eventual/core/src/calls/expect-signal-call.ts index f15a0a661..0f63d8239 100644 --- a/packages/@eventual/core/src/calls/expect-signal-call.ts +++ b/packages/@eventual/core/src/calls/expect-signal-call.ts @@ -3,6 +3,7 @@ import { EventualBase, isEventualOfKind, createEventual, + Eventual, } from "../eventual.js"; import { registerEventual } from "../global.js"; import { Failed, Resolved } from "../result.js"; @@ -15,16 +16,16 @@ export interface ExpectSignalCall extends EventualBase | Failed> { seq?: number; signalId: string; - timeoutSeconds?: number; + timeout?: Eventual; } export function createExpectSignalCall( signalId: string, - timeoutSeconds?: number + timeout?: Eventual ): ExpectSignalCall { return registerEventual( createEventual(EventualKind.ExpectSignalCall, { - timeoutSeconds, + timeout, signalId, }) ); diff --git a/packages/@eventual/core/src/calls/index.ts b/packages/@eventual/core/src/calls/index.ts index 8da9398f4..876a7995f 100644 --- a/packages/@eventual/core/src/calls/index.ts +++ b/packages/@eventual/core/src/calls/index.ts @@ -1,3 +1,4 @@ export * from "./activity-call.js"; -export * from "./sleep-call.js"; +export * from "./condition-call.js"; +export * from "./await-time-call.js"; export * from "./expect-signal-call.js"; diff --git a/packages/@eventual/core/src/calls/sleep-call.ts b/packages/@eventual/core/src/calls/sleep-call.ts deleted file mode 100644 index 19d649565..000000000 --- a/packages/@eventual/core/src/calls/sleep-call.ts +++ /dev/null @@ -1,44 +0,0 @@ -import { - EventualKind, - EventualBase, - isEventualOfKind, - createEventual, -} from "../eventual.js"; -import { registerEventual } from "../global.js"; -import { Resolved } from "../result.js"; - -export function isSleepForCall(a: any): a is SleepForCall { - return isEventualOfKind(EventualKind.SleepForCall, a); -} - -export function isSleepUntilCall(a: any): a is SleepUntilCall { - return isEventualOfKind(EventualKind.SleepUntilCall, a); -} - -export interface SleepForCall - extends EventualBase> { - seq?: number; - durationSeconds: number; -} - -export interface SleepUntilCall - extends EventualBase> { - seq?: number; - isoDate: string; -} - -export function createSleepForCall(durationSeconds: number): SleepForCall { - return registerEventual( - createEventual(EventualKind.SleepForCall, { - durationSeconds, - }) - ); -} - -export function createSleepUntilCall(isoDate: string): SleepUntilCall { - return registerEventual( - createEventual(EventualKind.SleepUntilCall, { - isoDate, - }) - ); -} diff --git a/packages/@eventual/core/src/command.ts b/packages/@eventual/core/src/command.ts index 99e2c9957..c8c22fe43 100644 --- a/packages/@eventual/core/src/command.ts +++ b/packages/@eventual/core/src/command.ts @@ -1,16 +1,14 @@ import { EventEnvelope } from "./event.js"; +import { Schedule } from "./schedule.js"; import { SignalTarget } from "./signals.js"; import { WorkflowOptions } from "./workflow.js"; export type Command = - | ExpectSignalCommand + | StartTimerCommand | ScheduleActivityCommand | ScheduleWorkflowCommand | PublishEventsCommand - | SendSignalCommand - | SleepForCommand - | SleepUntilCommand - | StartConditionCommand; + | SendSignalCommand; interface CommandBase { kind: T; @@ -18,13 +16,10 @@ interface CommandBase { } export enum CommandType { - ExpectSignal = "ExpectSignal", PublishEvents = "PublishEvents", SendSignal = "SendSignal", - SleepFor = "SleepFor", - SleepUntil = "SleepUntil", StartActivity = "StartActivity", - StartCondition = "StartCondition", + StartTimer = "StartTimer", StartWorkflow = "StartWorkflow", } @@ -38,7 +33,6 @@ export interface ScheduleActivityCommand extends CommandBase { name: string; args: any[]; - timeoutSeconds?: number; heartbeatSeconds?: number; } @@ -62,42 +56,17 @@ export function isScheduleWorkflowCommand( return a.kind === CommandType.StartWorkflow; } -export interface SleepUntilCommand extends CommandBase { +export interface StartTimerCommand extends CommandBase { /** * Minimum time (in ISO 8601) where the machine should wake up. */ - untilTime: string; + schedule: Schedule; } -export function isSleepUntilCommand( +export function isStartTimerCommand( command: Command -): command is SleepUntilCommand { - return command.kind === CommandType.SleepUntil; -} - -export interface SleepForCommand extends CommandBase { - /** - * Number of seconds from the time the command is executed until the machine should wake up. - */ - durationSeconds: number; -} - -export function isSleepForCommand( - command: Command -): command is SleepForCommand { - return command.kind === CommandType.SleepFor; -} - -export interface ExpectSignalCommand - extends CommandBase { - signalId: string; - timeoutSeconds?: number; -} - -export function isExpectSignalCommand( - command: Command -): command is ExpectSignalCommand { - return command.kind === CommandType.ExpectSignal; +): command is StartTimerCommand { + return command.kind === CommandType.StartTimer; } export interface SendSignalCommand extends CommandBase { @@ -112,17 +81,6 @@ export function isSendSignalCommand( return command.kind === CommandType.SendSignal; } -export interface StartConditionCommand - extends CommandBase { - timeoutSeconds?: number; -} - -export function isStartConditionCommand( - command: Command -): command is StartConditionCommand { - return command.kind === CommandType.StartCondition; -} - export interface PublishEventsCommand extends CommandBase { events: EventEnvelope[]; diff --git a/packages/@eventual/core/src/condition.ts b/packages/@eventual/core/src/condition.ts index c16159a83..94d1db98f 100644 --- a/packages/@eventual/core/src/condition.ts +++ b/packages/@eventual/core/src/condition.ts @@ -1,10 +1,11 @@ import { createConditionCall } from "./calls/condition-call.js"; +import { isEventual } from "./eventual.js"; import { isOrchestratorWorker } from "./runtime/flags.js"; export type ConditionPredicate = () => boolean; export interface ConditionOptions { - timeoutSeconds?: number; + timeout?: Promise; } /** @@ -32,7 +33,7 @@ export interface ConditionOptions { * onSignal("mySignal", () => { n++ }); * * // after 5 mySignals, this promise will be resolved. - * if(!(await condition({ timeoutSeconds: 5 * 60 }, () => n === 5))) { + * if(!(await condition({ timeout: duration(5, "minutes") }, () => n === 5))) { * return "did not get 5 in 5 minutes." * } * @@ -55,5 +56,10 @@ export function condition( } const [opts, predicate] = args.length === 1 ? [undefined, args[0]] : args; - return createConditionCall(predicate, opts?.timeoutSeconds) as any; + const timeout = opts?.timeout; + if (timeout && !isEventual(timeout)) { + throw new Error("Timeout promise must be an Eventual."); + } + + return createConditionCall(predicate, timeout) as any; } diff --git a/packages/@eventual/core/src/error.ts b/packages/@eventual/core/src/error.ts index 749305e63..f32820d71 100644 --- a/packages/@eventual/core/src/error.ts +++ b/packages/@eventual/core/src/error.ts @@ -23,7 +23,7 @@ export class DeterminismError extends EventualError { * Thrown from within a workflow when any set timeout expires. * * ```ts - * const myAct = new activity("myAct", {timeoutSeconds: 100}, async () => { ... }); + * const myAct = new activity("myAct", {timeout: duration(100, "seconds") }, async () => { ... }); * workflow("myWorkflow", async () => { * try { * await myAct(); diff --git a/packages/@eventual/core/src/eventual.ts b/packages/@eventual/core/src/eventual.ts index b7dec1bde..69aafe491 100644 --- a/packages/@eventual/core/src/eventual.ts +++ b/packages/@eventual/core/src/eventual.ts @@ -4,22 +4,15 @@ import { chain, Chain } from "./chain.js"; import type { Program } from "./interpret.js"; import { Result } from "./result.js"; import { - isSleepForCall, - isSleepUntilCall, - SleepForCall, - SleepUntilCall, -} from "./calls/sleep-call.js"; -import { - isExpectSignalCall, - ExpectSignalCall, -} from "./calls/expect-signal-call.js"; -import { - isRegisterSignalHandlerCall, - RegisterSignalHandlerCall, -} from "./calls/signal-handler-call.js"; + isAwaitDurationCall, + isAwaitTimeCall, + AwaitDurationCall, + AwaitTimeCall, +} from "./calls/await-time-call.js"; +import { RegisterSignalHandlerCall } from "./calls/signal-handler-call.js"; import { isSendSignalCall, SendSignalCall } from "./calls/send-signal-call.js"; import { isWorkflowCall, WorkflowCall } from "./calls/workflow-call.js"; -import { ConditionCall, isConditionCall } from "./calls/condition-call.js"; +import { ConditionCall } from "./calls/condition-call.js"; import { isOrchestratorWorker } from "./runtime/flags.js"; import { AwaitAny, createAwaitAny } from "./await-any.js"; import { AwaitAllSettled, createAwaitAllSettled } from "./await-all-settled.js"; @@ -28,6 +21,7 @@ import { isPublishEventsCall, PublishEventsCall, } from "./calls/send-events-call.js"; +import { ExpectSignalCall } from "./calls/expect-signal-call.js"; export type AwaitedEventual = T extends Promise ? Awaited @@ -49,6 +43,8 @@ export enum EventualKind { AwaitAll = 0, AwaitAllSettled = 12, AwaitAny = 10, + AwaitDurationCall = 3, + AwaitTimeCall = 4, Chain = 2, ConditionCall = 9, ExpectSignalCall = 6, @@ -56,8 +52,6 @@ export enum EventualKind { Race = 11, RegisterSignalHandlerCall = 7, SendSignalCall = 8, - SleepForCall = 3, - SleepUntilCall = 4, WorkflowCall = 5, } @@ -86,32 +80,29 @@ export type Eventual = | AwaitAny | Chain | CommandCall - | Race; + | ConditionCall + | ExpectSignalCall + | Race + | RegisterSignalHandlerCall; /** * Calls which emit commands. */ export type CommandCall = | ActivityCall - | ConditionCall - | ExpectSignalCall - | RegisterSignalHandlerCall + | AwaitDurationCall + | AwaitTimeCall | PublishEventsCall | SendSignalCall - | SleepForCall - | SleepUntilCall | WorkflowCall; export function isCommandCall(call: Eventual): call is CommandCall { return ( isActivityCall(call) || - isConditionCall(call) || - isExpectSignalCall(call) || isPublishEventsCall(call) || - isRegisterSignalHandlerCall(call) || isSendSignalCall(call) || - isSleepForCall(call) || - isSleepUntilCall(call) || + isAwaitDurationCall(call) || + isAwaitTimeCall(call) || isWorkflowCall(call) ); } diff --git a/packages/@eventual/core/src/heartbeat.ts b/packages/@eventual/core/src/heartbeat.ts index d1be8710c..b5d939aa0 100644 --- a/packages/@eventual/core/src/heartbeat.ts +++ b/packages/@eventual/core/src/heartbeat.ts @@ -7,7 +7,7 @@ import { SendActivityHeartbeatResponse } from "./service-client.js"; * * If called from outside of an {@link activity}, the activity token must be provided. * - * If the activity has a heartbeatTimeout set and the workflow has not received a heartbeat in heartbeatTimeoutSeconds, + * If the activity has a heartbeatTimeout set and the workflow has not received a heartbeat within the set duration, * the workflow will throw a {@link HeartbeatTimeout} and cancel the activity. * * @returns {@link HeartbeatResponse} which has response.cancelled if the activity was cancelled for any reason (ex: workflow succeeded, failed, or the activity timed out). diff --git a/packages/@eventual/core/src/index.ts b/packages/@eventual/core/src/index.ts index 975f50b83..955cd7ce8 100644 --- a/packages/@eventual/core/src/index.ts +++ b/packages/@eventual/core/src/index.ts @@ -15,11 +15,12 @@ export * from "./heartbeat.js"; export * from "./interpret.js"; export * from "./result.js"; export * from "./runtime/index.js"; +export * from "./schedule.js"; export * from "./secret.js"; export * from "./service-client.js"; export * from "./service-type.js"; export * from "./signals.js"; -export * from "./sleep.js"; +export * from "./await-time.js"; export * from "./tasks.js"; export * from "./util.js"; export * from "./workflow-events.js"; diff --git a/packages/@eventual/core/src/interpret.ts b/packages/@eventual/core/src/interpret.ts index 22d70241b..4fe961351 100644 --- a/packages/@eventual/core/src/interpret.ts +++ b/packages/@eventual/core/src/interpret.ts @@ -25,16 +25,11 @@ import { isSignalReceived, isFailedEvent, isScheduledEvent, - isSleepCompleted, - isSleepScheduled, - isExpectSignalStarted, - isExpectSignalTimedOut, + isTimerCompleted, + isTimerScheduled, ScheduledEvent, isSignalSent, - isConditionStarted, - isConditionTimedOut, isWorkflowTimedOut, - isActivityTimedOut, isActivityHeartbeatTimedOut, isEventsPublished, WorkflowEvent, @@ -51,7 +46,10 @@ import { import { createChain, isChain, Chain } from "./chain.js"; import { assertNever, _Iterator, iterator, or } from "./util.js"; import { Command, CommandType } from "./command.js"; -import { isSleepForCall, isSleepUntilCall } from "./calls/sleep-call.js"; +import { + isAwaitDurationCall, + isAwaitTimeCall, +} from "./calls/await-time-call.js"; import { isExpectSignalCall, ExpectSignalCall, @@ -68,6 +66,7 @@ import { isAwaitAllSettled } from "./await-all-settled.js"; import { isAwaitAny } from "./await-any.js"; import { isRace } from "./race.js"; import { isPublishEventsCall } from "./calls/send-events-call.js"; +import { Schedule } from "./schedule.js"; export interface WorkflowResult { /** @@ -211,21 +210,16 @@ export function interpret( kind: CommandType.StartActivity, args: call.args, name: call.name, - timeoutSeconds: call.timeoutSeconds, heartbeatSeconds: call.heartbeatSeconds, seq: call.seq!, }; - } else if (isSleepUntilCall(call)) { - return { - kind: CommandType.SleepUntil, - seq: call.seq!, - untilTime: call.isoDate, - }; - } else if (isSleepForCall(call)) { + } else if (isAwaitTimeCall(call) || isAwaitDurationCall(call)) { return { - kind: CommandType.SleepFor, + kind: CommandType.StartTimer, seq: call.seq!, - durationSeconds: call.durationSeconds, + schedule: isAwaitTimeCall(call) + ? Schedule.time(call.isoDate) + : Schedule.duration(call.dur, call.unit), }; } else if (isWorkflowCall(call)) { return { @@ -235,13 +229,6 @@ export function interpret( name: call.name, opts: call.opts, }; - } else if (isExpectSignalCall(call)) { - return { - kind: CommandType.ExpectSignal, - signalId: call.signalId, - seq: call.seq!, - timeoutSeconds: call.timeoutSeconds, - }; } else if (isSendSignalCall(call)) { return { kind: CommandType.SendSignal, @@ -250,12 +237,6 @@ export function interpret( seq: call.seq!, payload: call.payload, }; - } else if (isConditionCall(call)) { - return { - kind: CommandType.StartCondition, - seq: call.seq!, - timeoutSeconds: call.timeoutSeconds, - }; } else if (isRegisterSignalHandlerCall(call)) { return []; } else if (isPublishEventsCall(call)) { @@ -291,19 +272,6 @@ export function interpret( */ pushEventual(activity) { if (isCommandCall(activity)) { - if (isExpectSignalCall(activity)) { - subscribeToSignal(activity.signalId, activity); - } else if (isConditionCall(activity)) { - // if the condition is resolvable, don't add it to the calls. - const result = tryResolveResult(activity); - if (result) { - return activity; - } - } else if (isRegisterSignalHandlerCall(activity)) { - subscribeToSignal(activity.signalId, activity); - // signal handler does not emit a call/command. It is only internal. - return activity; - } activity.seq = nextSeq(); callTable[activity.seq!] = activity; calls.push(activity); @@ -316,10 +284,19 @@ export function interpret( isAwaitAll(activity) || isAwaitAllSettled(activity) || isAwaitAny(activity) || + isConditionCall(activity) || isRace(activity) ) { return activity; + } else if (isRegisterSignalHandlerCall(activity)) { + subscribeToSignal(activity.signalId, activity); + // signal handler does not emit a call/command. It is only internal. + return activity; + } else if (isExpectSignalCall(activity)) { + subscribeToSignal(activity.signalId, activity); + return activity; } + return assertNever(activity); }, }; @@ -453,6 +430,13 @@ export function interpret( */ function resolveResult(activity: Eventual & { result: undefined }) { if (isConditionCall(activity)) { + // first check the state of the condition's timeout + if (activity.timeout) { + const timeoutResult = tryResolveResult(activity.timeout); + if (isResolved(timeoutResult) || isFailed(timeoutResult)) { + return Result.resolved(false); + } + } // try to evaluate the condition's result. const predicateResult = activity.predicate(); if (isGenerator(predicateResult)) { @@ -464,7 +448,27 @@ export function interpret( } else if (predicateResult) { return Result.resolved(true); } - } else if (isChain(activity) || isCommandCall(activity)) { + } else if (isActivityCall(activity) || isExpectSignalCall(activity)) { + if (activity.timeout) { + const timeoutResult = tryResolveResult(activity.timeout); + if (isResolved(timeoutResult) || isFailed(timeoutResult)) { + return Result.failed( + new Timeout( + isActivityCall(activity) + ? "Activity Timed Out" + : isExpectSignalCall(activity) + ? "Expect Signal Timed Out" + : assertNever(activity) + ) + ); + } + } + return undefined; + } else if ( + isChain(activity) || + isCommandCall(activity) || + isRegisterSignalHandlerCall(activity) + ) { // chain and most commands will be resolved elsewhere (ex: commitCompletionEvent or commitSignal) return undefined; } else if (isAwaitAll(activity)) { @@ -547,15 +551,8 @@ export function interpret( } call.result = isSucceededEvent(event) ? Result.resolved(event.result) - : isSleepCompleted(event) + : isTimerCompleted(event) ? Result.resolved(undefined) - : isExpectSignalTimedOut(event) - ? Result.failed(new Timeout("Expect Signal Timed Out")) - : isConditionTimedOut(event) - ? // a timed out condition returns false - Result.resolved(false) - : isActivityTimedOut(event) - ? Result.failed(new Timeout("Activity Timed Out")) : isActivityHeartbeatTimedOut(event) ? Result.failed(new HeartbeatTimeout("Activity Heartbeat TimedOut")) : Result.failed(new EventualError(event.error, event.message)); @@ -569,14 +566,10 @@ function isCorresponding(event: ScheduledEvent, call: CommandCall) { return isActivityCall(call) && call.name === event.name; } else if (isChildWorkflowScheduled(event)) { return isWorkflowCall(call) && call.name === event.name; - } else if (isSleepScheduled(event)) { - return isSleepUntilCall(call) || isSleepForCall(call); - } else if (isExpectSignalStarted(event)) { - return isExpectSignalCall(call) && event.signalId === call.signalId; + } else if (isTimerScheduled(event)) { + return isAwaitTimeCall(call) || isAwaitDurationCall(call); } else if (isSignalSent(event)) { return isSendSignalCall(call) && event.signalId === call.signalId; - } else if (isConditionStarted(event)) { - return isConditionCall(call); } else if (isEventsPublished(event)) { return isPublishEventsCall(call); } diff --git a/packages/@eventual/core/src/runtime/clients/timer-client.ts b/packages/@eventual/core/src/runtime/clients/timer-client.ts index 7d5edc002..696eaf575 100644 --- a/packages/@eventual/core/src/runtime/clients/timer-client.ts +++ b/packages/@eventual/core/src/runtime/clients/timer-client.ts @@ -1,6 +1,9 @@ +import { computeScheduleDate, Schedule } from "../../schedule.js"; import { HistoryStateEvent } from "../../workflow-events.js"; export abstract class TimerClient { + constructor(protected baseTime: () => Date) {} + /** * Starts a timer using SQS's message delay. * @@ -18,8 +21,8 @@ export abstract class TimerClient { /** * Starts a timer of any (positive) length. * - * If the timer is longer than 15 minutes (configurable via `props.sleepQueueThresholdMillis`), - * the timer will create a EventBridge schedule until the untilTime - props.sleepQueueThresholdMillis + * If the timer is longer than 15 minutes (configurable via `props.timerQueueThresholdMillis`), + * the timer will create a EventBridge schedule until the untilTime - props.timerQueueThresholdMillis * when the timer will be moved to the SQS queue. * * The SQS Queue will delay for floor(untilTime - currentTime) seconds until the timer handler can pick up the message. @@ -35,7 +38,7 @@ export abstract class TimerClient { * Use this method to clean the schedule. * * The provided schedule-forwarder function will call this method in Eventual when - * the timer is transferred from EventBridge to SQS at `props.sleepQueueThresholdMillis`. + * the timer is transferred from EventBridge to SQS at `props.timerQueueThresholdMillis`. */ public abstract clearSchedule(scheduleName: string): Promise; @@ -47,7 +50,10 @@ export abstract class TimerClient { public async scheduleEvent( request: ScheduleEventRequest ): Promise { - const untilTime = computeUntilTime(request.schedule); + const untilTime = computeScheduleDate( + request.schedule, + this.baseTime() + ).toISOString(); const event = { ...request.event, @@ -58,7 +64,7 @@ export abstract class TimerClient { event, executionId: request.executionId, type: TimerRequestType.ScheduleEvent, - schedule: Schedule.absolute(untilTime), + schedule: request.schedule, }); } } @@ -72,38 +78,6 @@ export enum TimerRequestType { ActivityHeartbeatMonitor = "CheckHeartbeat", } -export interface RelativeSchedule { - type: "Relative"; - timerSeconds: number; - baseTime: Date; -} - -export interface AbsoluteSchedule { - type: "Absolute"; - untilTime: string; -} - -export type Schedule = RelativeSchedule | AbsoluteSchedule; - -export const Schedule = { - relative( - timerSeconds: number, - baseTime: Date = new Date() - ): RelativeSchedule { - return { - type: "Relative", - timerSeconds, - baseTime, - }; - }, - absolute(untilTime: string): AbsoluteSchedule { - return { - type: "Absolute", - untilTime, - }; - }, -}; - export type TimerRequestBase = { type: T; schedule: Schedule; @@ -155,11 +129,3 @@ export interface ScheduleEventRequest extends Omit { event: Omit; } - -export function computeUntilTime(schedule: TimerRequest["schedule"]): string { - return "untilTime" in schedule - ? schedule.untilTime - : new Date( - schedule.baseTime.getTime() + schedule.timerSeconds * 1000 - ).toISOString(); -} diff --git a/packages/@eventual/core/src/runtime/command-executor.ts b/packages/@eventual/core/src/runtime/command-executor.ts index 98a456d32..8e0b46438 100644 --- a/packages/@eventual/core/src/runtime/command-executor.ts +++ b/packages/@eventual/core/src/runtime/command-executor.ts @@ -1,35 +1,24 @@ import { Command, - ExpectSignalCommand, - isExpectSignalCommand, isPublishEventsCommand, isScheduleActivityCommand, isScheduleWorkflowCommand, isSendSignalCommand, - isSleepForCommand, - isSleepUntilCommand, - isStartConditionCommand, + isStartTimerCommand, PublishEventsCommand, ScheduleActivityCommand, ScheduleWorkflowCommand, SendSignalCommand, - SleepForCommand, - SleepUntilCommand, - StartConditionCommand, + StartTimerCommand, } from "../command.js"; import { - ActivityTimedOut, WorkflowEventType, createEvent, ActivityScheduled, ChildWorkflowScheduled, - SleepScheduled, - SleepCompleted, - ExpectSignalStarted, - ExpectSignalTimedOut, + TimerScheduled, + TimerCompleted, HistoryStateEvent, - ConditionStarted, - ConditionTimedOut, SignalSent, EventsPublished, } from "../workflow-events.js"; @@ -37,11 +26,12 @@ import { assertNever } from "../util.js"; import { Workflow } from "../workflow.js"; import { formatChildExecutionName, formatExecutionId } from "./execution-id.js"; import { ActivityWorkerRequest } from "./handlers/activity-worker.js"; -import { Schedule, TimerClient } from "./clients/timer-client.js"; +import { TimerClient } from "./clients/timer-client.js"; import { WorkflowRuntimeClient } from "./clients/workflow-runtime-client.js"; import { WorkflowClient } from "./clients/workflow-client.js"; import { EventClient } from "./clients/event-client.js"; import { isChildExecutionTarget } from "../signals.js"; +import { computeScheduleDate } from "../schedule.js"; interface CommandExecutorProps { workflowRuntimeClient: WorkflowRuntimeClient; @@ -71,16 +61,11 @@ export class CommandExecutor { ); } else if (isScheduleWorkflowCommand(command)) { return this.scheduleChildWorkflow(executionId, command, baseTime); - } else if (isSleepForCommand(command) || isSleepUntilCommand(command)) { - // all sleep times are computed using the start time of the WorkflowTaskStarted - return this.scheduleSleep(executionId, command, baseTime); - } else if (isExpectSignalCommand(command)) { - // should the timeout command be generic (ex: StartTimeout) or specific (ex: ExpectSignal)? - return this.executeExpectSignal(executionId, command, baseTime); + } else if (isStartTimerCommand(command)) { + // all timers are computed using the start time of the WorkflowTaskStarted + return this.startTimer(executionId, command, baseTime); } else if (isSendSignalCommand(command)) { return this.sendSignal(executionId, command, baseTime); - } else if (isStartConditionCommand(command)) { - return this.startCondition(executionId, command, baseTime); } else if (isPublishEventsCommand(command)) { return this.publishEvents(command, baseTime); } else { @@ -102,21 +87,7 @@ export class CommandExecutor { retry: 0, }; - const timeoutStarter = command.timeoutSeconds - ? await this.props.timerClient.scheduleEvent({ - schedule: Schedule.relative(command.timeoutSeconds, baseTime), - event: { - type: WorkflowEventType.ActivityTimedOut, - seq: command.seq, - }, - executionId, - }) - : undefined; - - const activityStarter = - this.props.workflowRuntimeClient.startActivity(request); - - await Promise.all([activityStarter, timeoutStarter]); + await this.props.workflowRuntimeClient.startActivity(request); return createEvent( { @@ -153,61 +124,29 @@ export class CommandExecutor { ); } - private async scheduleSleep( + private async startTimer( executionId: string, - - command: SleepForCommand | SleepUntilCommand, + command: StartTimerCommand, baseTime: Date - ): Promise { + ): Promise { // TODO validate - const untilTime = isSleepUntilCommand(command) - ? new Date(command.untilTime) - : new Date(baseTime.getTime() + command.durationSeconds * 1000); - const untilTimeIso = untilTime.toISOString(); - - await this.props.timerClient.scheduleEvent({ + await this.props.timerClient.scheduleEvent({ event: { - type: WorkflowEventType.SleepCompleted, + type: WorkflowEventType.TimerCompleted, seq: command.seq, }, - schedule: Schedule.absolute(untilTimeIso), + schedule: command.schedule, executionId, }); - return createEvent( + return createEvent( { - type: WorkflowEventType.SleepScheduled, + type: WorkflowEventType.TimerScheduled, seq: command.seq, - untilTime: untilTime.toISOString(), - }, - baseTime - ); - } - - private async executeExpectSignal( - executionId: string, - - command: ExpectSignalCommand, - baseTime: Date - ): Promise { - if (command.timeoutSeconds) { - await this.props.timerClient.scheduleEvent({ - event: { - signalId: command.signalId, - seq: command.seq, - type: WorkflowEventType.ExpectSignalTimedOut, - }, - schedule: Schedule.relative(command.timeoutSeconds, baseTime), - executionId, - }); - } - - return createEvent( - { - signalId: command.signalId, - seq: command.seq, - type: WorkflowEventType.ExpectSignalStarted, - timeoutSeconds: command.timeoutSeconds, + untilTime: computeScheduleDate( + command.schedule, + baseTime + ).toISOString(), }, baseTime ); @@ -244,31 +183,6 @@ export class CommandExecutor { ); } - private async startCondition( - executionId: string, - command: StartConditionCommand, - baseTime: Date - ) { - if (command.timeoutSeconds) { - await this.props.timerClient.scheduleEvent({ - event: { - type: WorkflowEventType.ConditionTimedOut, - seq: command.seq, - }, - executionId, - schedule: Schedule.relative(command.timeoutSeconds, baseTime), - }); - } - - return createEvent( - { - type: WorkflowEventType.ConditionStarted, - seq: command.seq!, - }, - baseTime - ); - } - private async publishEvents(command: PublishEventsCommand, baseTime: Date) { await this.props.eventClient.publishEvents(...command.events); return createEvent( diff --git a/packages/@eventual/core/src/runtime/flags.ts b/packages/@eventual/core/src/runtime/flags.ts index 225e735f7..9bbdb3c05 100644 --- a/packages/@eventual/core/src/runtime/flags.ts +++ b/packages/@eventual/core/src/runtime/flags.ts @@ -17,3 +17,31 @@ export function isOrchestratorWorker() { export function isEventHandler() { return process.env[SERVICE_TYPE_FLAG] === ServiceType.EventHandler; } + +export async function serviceTypeScope( + serviceType: ServiceType, + handler: () => Output +): Promise> { + const back = process.env[SERVICE_TYPE_FLAG]; + try { + process.env[SERVICE_TYPE_FLAG] = serviceType; + // await before return so that the promise is completed before the finally call. + return await handler(); + } finally { + process.env[SERVICE_TYPE_FLAG] = back; + } +} + +export function serviceTypeScopeSync( + serviceType: ServiceType, + handler: () => Output +): Output { + const back = process.env[SERVICE_TYPE_FLAG]; + try { + process.env[SERVICE_TYPE_FLAG] = serviceType; + // await before return so that the promise is completed before the finally call. + return handler(); + } finally { + process.env[SERVICE_TYPE_FLAG] = back; + } +} diff --git a/packages/@eventual/core/src/runtime/handlers/activity-worker.ts b/packages/@eventual/core/src/runtime/handlers/activity-worker.ts index fb2dc89f0..8bc017a3e 100644 --- a/packages/@eventual/core/src/runtime/handlers/activity-worker.ts +++ b/packages/@eventual/core/src/runtime/handlers/activity-worker.ts @@ -22,11 +22,7 @@ import { timed } from "../metrics/utils.js"; import { ActivityProvider } from "../providers/activity-provider.js"; import { ActivityNotFoundError } from "../../error.js"; import { extendsError } from "../../util.js"; -import { - Schedule, - TimerClient, - TimerRequestType, -} from "../clients/timer-client.js"; +import { TimerClient, TimerRequestType } from "../clients/timer-client.js"; import { RuntimeServiceClient } from "../clients/runtime-service-clients.js"; import { ActivityLogContext, @@ -35,6 +31,9 @@ import { LogLevel, } from "../log-agent.js"; import { EventClient } from "../clients/event-client.js"; +import { serviceTypeScope } from "../flags.js"; +import { ServiceType } from "../../service-type.js"; +import { Schedule } from "../../schedule.js"; export interface CreateActivityWorkerProps { activityRuntimeClient: ActivityRuntimeClient; @@ -92,203 +91,210 @@ export function createActivityWorker({ request: ActivityWorkerRequest, baseTime: Date = new Date(), getEndTime = () => new Date() - ) => { - const activityHandle = logAgent.isLogLevelSatisfied(LogLevel.DEBUG) - ? `${request.command.name}:${request.command.seq} for execution ${request.executionId} on retry ${request.retry}` - : request.command.name; - metrics.resetDimensions(false); - metrics.setNamespace(MetricsCommon.EventualNamespace); - metrics.putDimensions({ - ActivityName: request.command.name, - WorkflowName: request.workflowName, - }); - // the time from the workflow emitting the activity scheduled command - // to the request being seen. - const activityLogContext: ActivityLogContext = { - type: LogContextType.Activity, - activityName: request.command.name, - executionId: request.executionId, - seq: request.command.seq, - }; - const start = baseTime; - const recordAge = - start.getTime() - new Date(request.scheduledTime).getTime(); - metrics.putMetric( - ActivityMetrics.ActivityRequestAge, - recordAge, - Unit.Milliseconds - ); - if ( - !(await timed(metrics, ActivityMetrics.ClaimDuration, () => - activityRuntimeClient.claimActivity( + ) => + await serviceTypeScope(ServiceType.ActivityWorker, async () => { + const activityHandle = logAgent.isLogLevelSatisfied(LogLevel.DEBUG) + ? `${request.command.name}:${request.command.seq} for execution ${request.executionId} on retry ${request.retry}` + : request.command.name; + metrics.resetDimensions(false); + metrics.setNamespace(MetricsCommon.EventualNamespace); + metrics.putDimensions({ + ActivityName: request.command.name, + WorkflowName: request.workflowName, + }); + // the time from the workflow emitting the activity scheduled command + // to the request being seen. + const activityLogContext: ActivityLogContext = { + type: LogContextType.Activity, + activityName: request.command.name, + executionId: request.executionId, + seq: request.command.seq, + }; + const start = baseTime; + const recordAge = + start.getTime() - new Date(request.scheduledTime).getTime(); + metrics.putMetric( + ActivityMetrics.ActivityRequestAge, + recordAge, + Unit.Milliseconds + ); + if ( + !(await timed(metrics, ActivityMetrics.ClaimDuration, () => + activityRuntimeClient.claimActivity( + request.executionId, + request.command.seq, + request.retry + ) + )) + ) { + metrics.putMetric(ActivityMetrics.ClaimRejected, 1, Unit.Count); + console.info(`Activity ${activityHandle} already claimed.`); + return; + } + if (request.command.heartbeatSeconds) { + await timerClient.startTimer({ + activitySeq: request.command.seq, + type: TimerRequestType.ActivityHeartbeatMonitor, + executionId: request.executionId, + heartbeatSeconds: request.command.heartbeatSeconds, + schedule: Schedule.duration(request.command.heartbeatSeconds), + }); + } + setActivityContext({ + activityToken: createActivityToken( request.executionId, - request.command.seq, - request.retry - ) - )) - ) { - metrics.putMetric(ActivityMetrics.ClaimRejected, 1, Unit.Count); - console.info(`Activity ${activityHandle} already claimed.`); - return; - } - if (request.command.heartbeatSeconds) { - await timerClient.startTimer({ - activitySeq: request.command.seq, - type: TimerRequestType.ActivityHeartbeatMonitor, + request.command.seq + ), executionId: request.executionId, - heartbeatSeconds: request.command.heartbeatSeconds, - schedule: Schedule.relative(request.command.heartbeatSeconds), + scheduledTime: request.scheduledTime, + workflowName: request.workflowName, }); - } - setActivityContext({ - activityToken: createActivityToken( - request.executionId, - request.command.seq - ), - executionId: request.executionId, - scheduledTime: request.scheduledTime, - workflowName: request.workflowName, - }); - metrics.putMetric(ActivityMetrics.ClaimRejected, 0, Unit.Count); + metrics.putMetric(ActivityMetrics.ClaimRejected, 0, Unit.Count); - logAgent.logWithContext( - activityLogContext, - LogLevel.DEBUG, - `Processing ${activityHandle}.` - ); + logAgent.logWithContext( + activityLogContext, + LogLevel.DEBUG, + `Processing ${activityHandle}.` + ); + + const activity = activityProvider.getActivityHandler( + request.command.name + ); + try { + if (!activity) { + metrics.putMetric(ActivityMetrics.NotFoundError, 1, Unit.Count); + throw new ActivityNotFoundError( + request.command.name, + activityProvider.getActivityIds() + ); + } - const activity = activityProvider.getActivityHandler( - request.command.name - ); - try { - if (!activity) { - metrics.putMetric(ActivityMetrics.NotFoundError, 1, Unit.Count); - throw new ActivityNotFoundError( - request.command.name, - activityProvider.getActivityIds() + const result = await logAgent.logContextScope( + activityLogContext, + async () => { + return await timed( + metrics, + ActivityMetrics.OperationDuration, + () => activity(...request.command.args) + ); + } ); - } - const result = await logAgent.logContextScope( - activityLogContext, - async () => { - return await timed( + if (isAsyncResult(result)) { + metrics.setProperty(ActivityMetrics.HasResult, 0); + metrics.setProperty(ActivityMetrics.AsyncResult, 1); + + // TODO: Send heartbeat on sync activity completion. + + /** + * The activity has declared that it is async, other than logging, there is nothing left to do here. + * The activity should call {@link WorkflowClient.sendActivitySuccess} or {@link WorkflowClient.sendActivityFailure} when it is done. + */ + return timed( metrics, - ActivityMetrics.OperationDuration, - () => activity(...request.command.args) + ActivityMetrics.ActivityLogWriteDuration, + () => logAgent.flush() ); + } else if (result) { + metrics.setProperty(ActivityMetrics.HasResult, 1); + metrics.setProperty(ActivityMetrics.AsyncResult, 0); + metrics.putMetric( + ActivityMetrics.ResultBytes, + JSON.stringify(result).length, + Unit.Bytes + ); + } else { + metrics.setProperty(ActivityMetrics.HasResult, 0); + metrics.setProperty(ActivityMetrics.AsyncResult, 0); } - ); - if (isAsyncResult(result)) { - metrics.setProperty(ActivityMetrics.HasResult, 0); - metrics.setProperty(ActivityMetrics.AsyncResult, 1); - - // TODO: Send heartbeat on sync activity completion. - - /** - * The activity has declared that it is async, other than logging, there is nothing left to do here. - * The activity should call {@link WorkflowClient.sendActivitySuccess} or {@link WorkflowClient.sendActivityFailure} when it is done. - */ - return timed( - metrics, - ActivityMetrics.ActivityLogWriteDuration, - () => logAgent.flush() - ); - } else if (result) { - metrics.setProperty(ActivityMetrics.HasResult, 1); - metrics.setProperty(ActivityMetrics.AsyncResult, 0); - metrics.putMetric( - ActivityMetrics.ResultBytes, - JSON.stringify(result).length, - Unit.Bytes + logAgent.logWithContext( + activityLogContext, + LogLevel.INFO, + `Activity ${activityHandle} succeeded, reporting back to execution.` ); - } else { - metrics.setProperty(ActivityMetrics.HasResult, 0); - metrics.setProperty(ActivityMetrics.AsyncResult, 0); - } - - logAgent.logWithContext( - activityLogContext, - LogLevel.INFO, - `Activity ${activityHandle} succeeded, reporting back to execution.` - ); - const endTime = getEndTime(start); - const event = createEvent( - { - type: WorkflowEventType.ActivitySucceeded, - seq: request.command.seq, - result, - }, - endTime - ); + const endTime = getEndTime(start); + const event = createEvent( + { + type: WorkflowEventType.ActivitySucceeded, + seq: request.command.seq, + result, + }, + endTime + ); - await finishActivity( - event, - recordAge + (endTime.getTime() - start.getTime()) - ); - } catch (err) { - const [error, message] = extendsError(err) - ? [err.name, err.message] - : ["Error", JSON.stringify(err)]; + await finishActivity( + event, + recordAge + (endTime.getTime() - start.getTime()) + ); + } catch (err) { + const [error, message] = extendsError(err) + ? [err.name, err.message] + : ["Error", JSON.stringify(err)]; - logAgent.logWithContext( - activityLogContext, - LogLevel.DEBUG, - `Activity ${activityHandle} failed, reporting failure back to execution: ${error}: ${message}` - ); + logAgent.logWithContext( + activityLogContext, + LogLevel.DEBUG, + `Activity ${activityHandle} failed, reporting failure back to execution: ${error}: ${message}` + ); - const endTime = getEndTime(start); - const event = createEvent( - { - type: WorkflowEventType.ActivityFailed, - seq: request.command.seq, - error, - message, - }, - endTime - ); + const endTime = getEndTime(start); + const event = createEvent( + { + type: WorkflowEventType.ActivityFailed, + seq: request.command.seq, + error, + message, + }, + endTime + ); - await finishActivity( - event, - recordAge + (endTime.getTime() - start.getTime()) - ); - } finally { - clearActivityContext(); - } + await finishActivity( + event, + recordAge + (endTime.getTime() - start.getTime()) + ); + } finally { + clearActivityContext(); + } - function logActivityCompleteMetrics(failed: boolean, duration: number) { - metrics.putMetric( - ActivityMetrics.ActivityFailed, - failed ? 1 : 0, - Unit.Count - ); - metrics.putMetric( - ActivityMetrics.ActivitySucceeded, - failed ? 0 : 1, - Unit.Count - ); - // The total time from the activity being scheduled until it's result is send to the workflow. - metrics.putMetric(ActivityMetrics.TotalDuration, duration); - } + function logActivityCompleteMetrics( + failed: boolean, + duration: number + ) { + metrics.putMetric( + ActivityMetrics.ActivityFailed, + failed ? 1 : 0, + Unit.Count + ); + metrics.putMetric( + ActivityMetrics.ActivitySucceeded, + failed ? 0 : 1, + Unit.Count + ); + // The total time from the activity being scheduled until it's result is send to the workflow. + metrics.putMetric(ActivityMetrics.TotalDuration, duration); + } - async function finishActivity( - event: ActivitySucceeded | ActivityFailed, - duration: number - ) { - const logFlush = timed( - metrics, - ActivityMetrics.ActivityLogWriteDuration, - () => logAgent.flush() - ); - await timed(metrics, ActivityMetrics.SubmitWorkflowTaskDuration, () => - workflowClient.submitWorkflowTask(request.executionId, event) - ); - await logFlush; + async function finishActivity( + event: ActivitySucceeded | ActivityFailed, + duration: number + ) { + const logFlush = timed( + metrics, + ActivityMetrics.ActivityLogWriteDuration, + () => logAgent.flush() + ); + await timed( + metrics, + ActivityMetrics.SubmitWorkflowTaskDuration, + () => + workflowClient.submitWorkflowTask(request.executionId, event) + ); + await logFlush; - logActivityCompleteMetrics(isWorkflowFailed(event), duration); - } - } + logActivityCompleteMetrics(isWorkflowFailed(event), duration); + } + }) ); } diff --git a/packages/@eventual/core/src/runtime/handlers/api-handler.ts b/packages/@eventual/core/src/runtime/handlers/api-handler.ts index a19a05458..51cc910b7 100644 --- a/packages/@eventual/core/src/runtime/handlers/api-handler.ts +++ b/packages/@eventual/core/src/runtime/handlers/api-handler.ts @@ -1,6 +1,8 @@ import { api } from "../../api.js"; import { registerServiceClient } from "../../global.js"; import { EventualServiceClient } from "../../service-client.js"; +import { ServiceType } from "../../service-type.js"; +import { serviceTypeScope } from "../flags.js"; export interface ApiHandlerDependencies { serviceClient: EventualServiceClient; @@ -25,13 +27,15 @@ export function createApiHandler({ serviceClient }: ApiHandlerDependencies) { * then handles the request. */ return async function processRequest(request: Request): Promise { - try { - return api.handle(request); - } catch (err) { - console.error(err); - return new Response("Internal Server Error", { - status: 500, - }); - } + return await serviceTypeScope(ServiceType.ApiHandler, async () => { + try { + return api.handle(request); + } catch (err) { + console.error(err); + return new Response("Internal Server Error", { + status: 500, + }); + } + }); }; } diff --git a/packages/@eventual/core/src/runtime/handlers/event-handler.ts b/packages/@eventual/core/src/runtime/handlers/event-handler.ts index 700bdbf68..d16feff91 100644 --- a/packages/@eventual/core/src/runtime/handlers/event-handler.ts +++ b/packages/@eventual/core/src/runtime/handlers/event-handler.ts @@ -2,6 +2,8 @@ import { registerServiceClient } from "../../global.js"; import type { EventEnvelope } from "../../event.js"; import { EventHandlerProvider } from "../providers/event-handler-provider.js"; import { EventualServiceClient } from "../../service-client.js"; +import { serviceTypeScope } from "../flags.js"; +import { ServiceType } from "../../service-type.js"; /** * The dependencies of {@link createEventHandlerWorker}. @@ -38,14 +40,16 @@ export function createEventHandlerWorker({ } return async function (events) { - await Promise.allSettled( - events.map((event) => - Promise.allSettled( - eventHandlerProvider - .getEventHandlersForEvent(event.name) - .map((handler) => handler(event.event)) + return await serviceTypeScope(ServiceType.EventHandler, async () => { + await Promise.allSettled( + events.map((event) => + Promise.allSettled( + eventHandlerProvider + .getEventHandlersForEvent(event.name) + .map((handler) => handler(event.event)) + ) ) - ) - ); + ); + }); }; } diff --git a/packages/@eventual/core/src/runtime/handlers/orchestrator.ts b/packages/@eventual/core/src/runtime/handlers/orchestrator.ts index f6c75be4f..b3d46be5e 100644 --- a/packages/@eventual/core/src/runtime/handlers/orchestrator.ts +++ b/packages/@eventual/core/src/runtime/handlers/orchestrator.ts @@ -6,7 +6,7 @@ import { getEventId, HistoryStateEvent, isHistoryEvent, - isSleepCompleted, + isTimerCompleted, isWorkflowSucceeded, isWorkflowFailed, isWorkflowStarted, @@ -55,11 +55,14 @@ import { interpret } from "../../interpret.js"; import { clearEventualCollector } from "../../global.js"; import { DeterminismError } from "../../error.js"; import { ExecutionHistoryClient } from "../clients/execution-history-client.js"; -import { Schedule, TimerClient } from "../clients/timer-client.js"; +import { TimerClient } from "../clients/timer-client.js"; import { WorkflowRuntimeClient } from "../clients/workflow-runtime-client.js"; import { WorkflowClient } from "../clients/workflow-client.js"; import { MetricsClient } from "../clients/metrics-client.js"; import { EventClient } from "../clients/event-client.js"; +import { serviceTypeScope } from "../flags.js"; +import { ServiceType } from "../../service-type.js"; +import { Schedule } from "../../schedule.js"; /** * The Orchestrator's client dependencies. @@ -107,62 +110,63 @@ export function createOrchestrator({ eventClient, }); - return async (workflowTasks, baseTime = new Date()) => { - const tasksByExecutionId = groupBy( - workflowTasks, - (task) => task.executionId - ); + return async (workflowTasks, baseTime = new Date()) => + await serviceTypeScope(ServiceType.OrchestratorWorker, async () => { + const tasksByExecutionId = groupBy( + workflowTasks, + (task) => task.executionId + ); - const eventsByExecutionId = Object.fromEntries( - Object.entries(tasksByExecutionId).map(([executionId, records]) => [ - executionId, - records.flatMap((e) => e.events), - ]) - ); + const eventsByExecutionId = Object.fromEntries( + Object.entries(tasksByExecutionId).map(([executionId, records]) => [ + executionId, + records.flatMap((e) => e.events), + ]) + ); - console.info( - "Found execution ids: " + Object.keys(eventsByExecutionId).join(", ") - ); + console.info( + "Found execution ids: " + Object.keys(eventsByExecutionId).join(", ") + ); - // for each execution id - const results = await promiseAllSettledPartitioned( - Object.entries(eventsByExecutionId), - async ([executionId, records]) => { - if (!isExecutionId(executionId)) { - throw new Error(`invalid ExecutionID: '${executionId}'`); - } - const workflowName = parseWorkflowName(executionId); - if (workflowName === undefined) { - throw new Error(`execution ID '${executionId}' does not exist`); + // for each execution id + const results = await promiseAllSettledPartitioned( + Object.entries(eventsByExecutionId), + async ([executionId, records]) => { + if (!isExecutionId(executionId)) { + throw new Error(`invalid ExecutionID: '${executionId}'`); + } + const workflowName = parseWorkflowName(executionId); + if (workflowName === undefined) { + throw new Error(`execution ID '${executionId}' does not exist`); + } + // TODO: get workflow from execution id + return orchestrateExecution( + workflowName, + executionId, + records, + baseTime + ); } - // TODO: get workflow from execution id - return orchestrateExecution( - workflowName, - executionId, - records, - baseTime - ); - } - ); - - console.debug( - "Executions succeeded: " + - results.fulfilled.map(([[executionId]]) => executionId).join(",") - ); + ); - if (results.rejected.length > 0) { - console.error( - "Executions failed: \n" + - results.rejected - .map(([[executionId], error]) => `${executionId}: ${error}`) - .join("\n") + console.debug( + "Executions succeeded: " + + results.fulfilled.map(([[executionId]]) => executionId).join(",") ); - } - return { - failedExecutionIds: results.rejected.map((rejected) => rejected[0][0]), - }; - }; + if (results.rejected.length > 0) { + console.error( + "Executions failed: \n" + + results.rejected + .map(([[executionId], error]) => `${executionId}: ${error}`) + .join("\n") + ); + } + + return { + failedExecutionIds: results.rejected.map((rejected) => rejected[0][0]), + }; + }); async function orchestrateExecution( workflowName: string, @@ -290,7 +294,7 @@ export function createOrchestrator({ OrchestratorMetrics.TimeoutStartedDuration, () => timerClient.scheduleEvent({ - schedule: Schedule.absolute(newWorkflowStart.timeoutTime!), + schedule: Schedule.time(newWorkflowStart.timeoutTime!), event: createEvent( { type: WorkflowEventType.WorkflowTimedOut, @@ -369,12 +373,16 @@ export function createOrchestrator({ logAgent.logWithContext( executionLogContext, LogLevel.DEBUG, - "Workflow terminated with: " + JSON.stringify(result) + result + ? "Workflow returned a result with: " + JSON.stringify(result) + : "Workflow did not return a result." ); logAgent.logWithContext( executionLogContext, LogLevel.DEBUG, - `Found ${newCommands.length} new commands.` + `Found ${newCommands.length} new commands. ${JSON.stringify( + newCommands + )}` ); yield* await timed( @@ -453,7 +461,7 @@ export function createOrchestrator({ const inputEvents = [...historyEvents, ...uniqueTaskEvents]; - // Generates events that are time sensitive, like sleep completed events. + // Generates events that are time sensitive, like timer completed events. const syntheticEvents = generateSyntheticEvents(inputEvents, baseTime); const allEvents = [...inputEvents, ...syntheticEvents]; @@ -717,14 +725,14 @@ function logEventMetrics( events: WorkflowEvent[], now: Date ) { - const sleepCompletedEvents = events.filter(isSleepCompleted); - if (sleepCompletedEvents.length > 0) { - const sleepCompletedVariance = sleepCompletedEvents.map( + const timerCompletedEvents = events.filter(isTimerCompleted); + if (timerCompletedEvents.length > 0) { + const timerCompletedVariance = timerCompletedEvents.map( (s) => now.getTime() - new Date(s.timestamp).getTime() ); const avg = - sleepCompletedVariance.reduce((t, n) => t + n, 0) / - sleepCompletedVariance.length; - metrics.setProperty(OrchestratorMetrics.SleepVarianceMillis, avg); + timerCompletedVariance.reduce((t, n) => t + n, 0) / + timerCompletedVariance.length; + metrics.setProperty(OrchestratorMetrics.TimerVarianceMillis, avg); } } diff --git a/packages/@eventual/core/src/runtime/handlers/timer-handler.ts b/packages/@eventual/core/src/runtime/handlers/timer-handler.ts index 4b56e36f9..f02ffc223 100644 --- a/packages/@eventual/core/src/runtime/handlers/timer-handler.ts +++ b/packages/@eventual/core/src/runtime/handlers/timer-handler.ts @@ -7,7 +7,6 @@ import { assertNever } from "../../util.js"; import { isActivityHeartbeatMonitorRequest, isTimerScheduleEventRequest, - Schedule, TimerClient, TimerRequest, TimerRequestType, @@ -15,6 +14,7 @@ import { import type { WorkflowClient } from "../clients/workflow-client.js"; import { ActivityRuntimeClient } from "../clients/activity-runtime-client.js"; import { LogAgent, LogContextType, LogLevel } from "../log-agent.js"; +import { Schedule } from "../../schedule.js"; interface TimerHandlerProps { workflowClient: WorkflowClient; @@ -76,7 +76,7 @@ export function createTimerHandler({ activitySeq: request.activitySeq, executionId: request.executionId, heartbeatSeconds: request.heartbeatSeconds, - schedule: Schedule.relative(request.heartbeatSeconds), + schedule: Schedule.duration(request.heartbeatSeconds), }); } } else { diff --git a/packages/@eventual/core/src/runtime/metrics/constants.ts b/packages/@eventual/core/src/runtime/metrics/constants.ts index 9b05790ff..977e7e923 100644 --- a/packages/@eventual/core/src/runtime/metrics/constants.ts +++ b/packages/@eventual/core/src/runtime/metrics/constants.ts @@ -100,9 +100,9 @@ export namespace OrchestratorMetrics { */ export const ExecutionResultBytes = "ExecutionResultBytes"; /** - * Number of milliseconds between the expected sleep wakeup time and the actual incoming {@link SleepCompleted} event. + * Number of milliseconds between the expected timer wakeup time and the actual incoming {@link TimerCompleted} event. */ - export const SleepVarianceMillis = "SleepVarianceMillis"; + export const TimerVarianceMillis = "TimerVarianceMillis"; /** * Number of milliseconds it takes to send execution logs to where ever they are persisted. */ diff --git a/packages/@eventual/core/src/schedule.ts b/packages/@eventual/core/src/schedule.ts new file mode 100644 index 000000000..4bffb9fc5 --- /dev/null +++ b/packages/@eventual/core/src/schedule.ts @@ -0,0 +1,81 @@ +import { assertNever } from "./util.js"; + +export const DURATION_UNITS = [ + "second", + "seconds", + "minute", + "minutes", + "hour", + "hours", + "day", + "days", + "year", + "years", +] as const; +export type DurationUnit = typeof DURATION_UNITS[number]; + +export function isDurationUnit(u: string): u is DurationUnit { + return DURATION_UNITS.includes(u as any); +} + +export interface DurationSchedule { + type: "Duration"; + dur: number; + unit: DurationUnit; +} + +export interface TimeSchedule { + type: "Time"; + isoDate: string; +} + +export type Schedule = DurationSchedule | TimeSchedule; + +export const Schedule = { + duration(dur: number, unit: DurationUnit = "seconds"): DurationSchedule { + return { + type: "Duration", + dur, + unit, + }; + }, + time(isoDate: string | Date): TimeSchedule { + return { + type: "Time", + isoDate: typeof isoDate === "string" ? isoDate : isoDate.toISOString(), + }; + }, +}; + +export function isDurationSchedule( + schedule: Schedule +): schedule is DurationSchedule { + return schedule.type === "Duration"; +} + +export function isTimeSchedule(schedule: Schedule): schedule is TimeSchedule { + return schedule.type === "Time"; +} + +export function computeScheduleDate(schedule: Schedule, baseTime: Date): Date { + return isTimeSchedule(schedule) + ? new Date(schedule.isoDate) + : new Date( + baseTime.getTime() + + computeDurationSeconds(schedule.dur, schedule.unit) * 1000 + ); +} + +export function computeDurationSeconds(dur: number, unit: DurationUnit) { + return unit === "seconds" || unit === "second" + ? dur + : unit === "minutes" || unit === "minute" + ? dur * 60 + : unit === "hours" || unit === "hour" + ? dur * 60 * 60 + : unit === "days" || unit === "day" + ? dur * 60 * 60 * 24 + : unit === "years" || unit === "year" + ? dur * 60 * 60 * 24 * 365.25 + : assertNever(unit); +} diff --git a/packages/@eventual/core/src/signals.ts b/packages/@eventual/core/src/signals.ts index 882aacf7f..f3c6b4cb8 100644 --- a/packages/@eventual/core/src/signals.ts +++ b/packages/@eventual/core/src/signals.ts @@ -4,6 +4,7 @@ import { createExpectSignalCall } from "./calls/expect-signal-call.js"; import { isOrchestratorWorker } from "./runtime/flags.js"; import { getServiceClient } from "./global.js"; import { ulid } from "ulidx"; +import { isEventual } from "./eventual.js"; /** * A reference to a created signal handler. @@ -40,7 +41,7 @@ export class Signal { * workflow("wf", () => { * let done = false; * mySignal.onSignal(async () => { - * await sleepFor(10); + * await duration(10, "seconds"); * done = true; * }); * @@ -53,7 +54,7 @@ export class Signal { * ```ts * const handler = mySignal.onSignal(() => {}); * - * await sleepFor(10); + * await duration(10, "seconds"); * * handler.dispose(); * ``` @@ -76,14 +77,14 @@ export class Signal { * }); * ``` * - * Use `opts.timeoutSeconds` to stop waiting after the provided time. The Promise will reject + * Use `opts.timeout` to stop waiting after the provided time. The Promise will reject * when the provided time has elapsed. * * ```ts * const mySignal = signal("MySignal"); * workflow("wf", async () => { * try { - * const payload = await mySignal.expectSignal({ timeoutSecond: 10 * 60 }); + * const payload = await mySignal.expectSignal({ timeout: duration(10, "minutes) }); * * return payload; * } catch { @@ -120,11 +121,24 @@ export type SignalPayload> = E extends Signal export interface ExpectSignalOptions { /** - * Optional. Seconds to wait for the signal to be received. + * Optional. A promise that determines when to timeout a signal. * - * After the provided seconds, the promise will reject. + * Can be used together with {@link time} or {@link duration} or any other promise. + * + * ```ts + * await expectSignal(signal, { timeout: duration(10, "seconds") }) + * ``` + * + * After the provided promise resolves or rejects, the {@link expectSignal} will reject. + * + * You can also chain an expect signal with other promises. + * + * ```ts + * const abortSignal = expectSignal(abortSignal); + * expectSignal(signal, { timeout: abortSignal }); + * ``` */ - timeoutSeconds: number; + timeout: Promise; } /** @@ -140,8 +154,19 @@ export interface ExpectSignalOptions { * }); * ``` * - * Use `opts.timeoutSeconds` to stop waiting after the provided time. The Promise will reject - * when the provided time has elapsed. + * Use `opts.timeout` to stop waiting after some condition. The Promise will reject + * when the provided promise resolves. + * + * ```ts + * // timeout after 10 seconds + * await expectSignal(signal, { timeout: duration(10, "seconds") }) + * ``` + * + * ```ts + * // timeout after receiving a signal + * const abortSignal = expectSignal(abortSignal); + * await expectSignal(signal, { timeout: abortSignal }); + * ``` */ export function expectSignal( signal: Signal | string, @@ -151,9 +176,14 @@ export function expectSignal( throw new Error("expectSignal is only valid in a workflow"); } + const timeout = opts?.timeout; + if (timeout && !isEventual(timeout)) { + throw new Error("Timeout promise must be an Eventual."); + } + return createExpectSignalCall( typeof signal === "string" ? signal : signal.id, - opts?.timeoutSeconds + timeout ) as any; } @@ -168,7 +198,7 @@ export function expectSignal( * workflow("wf", () => { * let done = false; * onSignal("MySignal", async () => { - * await sleepFor(10); + * await duration(10, "seconds"); * done = true; * }); * @@ -181,7 +211,7 @@ export function expectSignal( * ```ts * const handler = onSignal("MySignal", () => {}); * - * await sleepFor(10); + * await duration(10, "seconds"); * * handler.dispose(); * ``` diff --git a/packages/@eventual/core/src/sleep.ts b/packages/@eventual/core/src/sleep.ts deleted file mode 100644 index f1a6af14d..000000000 --- a/packages/@eventual/core/src/sleep.ts +++ /dev/null @@ -1,34 +0,0 @@ -import { - createSleepForCall, - createSleepUntilCall, -} from "./calls/sleep-call.js"; -import { isOrchestratorWorker } from "./runtime/flags.js"; - -/** - * ```ts - * eventual(async () => { - * await sleepFor(10 * 60); // sleep for 10 minutes - * return "DONE!"; - * }) - * ``` - */ -export function sleepFor(seconds: number): Promise { - if (!isOrchestratorWorker()) { - throw new Error("sleepFor is only valid in a workflow"); - } - - // register a sleep command and return it (to be yielded) - return createSleepForCall(seconds) as any; -} - -export function sleepUntil(isoDate: string): Promise; -export function sleepUntil(date: Date): Promise; -export function sleepUntil(date: Date | string): Promise { - if (!isOrchestratorWorker()) { - throw new Error("sleepUntil is only valid in a workflow"); - } - - const d = new Date(date); - // register a sleep command and return it (to be yielded) - return createSleepUntilCall(d.toISOString()) as any; -} diff --git a/packages/@eventual/core/src/workflow-events.ts b/packages/@eventual/core/src/workflow-events.ts index b23a690e6..890fdd1e0 100644 --- a/packages/@eventual/core/src/workflow-events.ts +++ b/packages/@eventual/core/src/workflow-events.ts @@ -21,19 +21,14 @@ export enum WorkflowEventType { ActivityFailed = "ActivityFailed", ActivityHeartbeatTimedOut = "ActivityHeartbeatTimedOut", ActivityScheduled = "ActivityScheduled", - ActivityTimedOut = "ActivityTimedOut", ChildWorkflowSucceeded = "ChildWorkflowSucceeded", ChildWorkflowFailed = "ChildWorkflowFailed", ChildWorkflowScheduled = "ChildWorkflowScheduled", - ConditionStarted = "ConditionStarted", - ConditionTimedOut = "ConditionTimedOut", EventsPublished = "EventsPublished", - ExpectSignalStarted = "ExpectSignalStarted", - ExpectSignalTimedOut = "ExpectSignalTimedOut", SignalReceived = "SignalReceived", SignalSent = "SignalSent", - SleepCompleted = "SleepCompleted", - SleepScheduled = "SleepScheduled", + TimerCompleted = "TimerCompleted", + TimerScheduled = "TimerScheduled", WorkflowSucceeded = "WorkflowSucceeded", WorkflowFailed = "WorkflowFailed", WorkflowStarted = "WorkflowStarted", @@ -55,25 +50,20 @@ export type WorkflowEvent = export type ScheduledEvent = | ActivityScheduled + | TimerScheduled | ChildWorkflowScheduled - | ConditionStarted | EventsPublished - | ExpectSignalStarted - | SignalSent - | SleepScheduled; + | SignalSent; export type SucceededEvent = | ActivitySucceeded - | ChildWorkflowSucceeded - | SleepCompleted; + | TimerCompleted + | ChildWorkflowSucceeded; export type FailedEvent = | ActivityFailed | ActivityHeartbeatTimedOut - | ActivityTimedOut - | ChildWorkflowFailed - | ConditionTimedOut - | ExpectSignalTimedOut; + | ChildWorkflowFailed; /** * Events used by the workflow to replay an execution. @@ -228,19 +218,19 @@ export function isActivityHeartbeatTimedOut( return event.type === WorkflowEventType.ActivityHeartbeatTimedOut; } -export interface SleepScheduled extends HistoryEventBase { - type: WorkflowEventType.SleepScheduled; +export interface TimerScheduled extends HistoryEventBase { + type: WorkflowEventType.TimerScheduled; untilTime: string; } -export function isSleepScheduled( +export function isTimerScheduled( event: WorkflowEvent -): event is SleepScheduled { - return event.type === WorkflowEventType.SleepScheduled; +): event is TimerScheduled { + return event.type === WorkflowEventType.TimerScheduled; } -export interface SleepCompleted extends HistoryEventBase { - type: WorkflowEventType.SleepCompleted; +export interface TimerCompleted extends HistoryEventBase { + type: WorkflowEventType.TimerCompleted; result?: undefined; } @@ -278,10 +268,10 @@ export function isChildWorkflowFailed( return event.type === WorkflowEventType.ChildWorkflowFailed; } -export function isSleepCompleted( +export function isTimerCompleted( event: WorkflowEvent -): event is SleepCompleted { - return event.type === WorkflowEventType.SleepCompleted; +): event is TimerCompleted { + return event.type === WorkflowEventType.TimerCompleted; } export const isWorkflowCompletedEvent = or( @@ -289,35 +279,12 @@ export const isWorkflowCompletedEvent = or( isWorkflowSucceeded ); -export interface ExpectSignalStarted extends HistoryEventBase { - type: WorkflowEventType.ExpectSignalStarted; - signalId: string; - timeoutSeconds?: number; -} - -export interface ExpectSignalTimedOut extends HistoryEventBase { - type: WorkflowEventType.ExpectSignalTimedOut; - signalId: string; -} - export interface SignalReceived extends BaseEvent { type: WorkflowEventType.SignalReceived; signalId: string; payload?: Payload; } -export function isExpectSignalStarted( - event: WorkflowEvent -): event is ExpectSignalStarted { - return event.type === WorkflowEventType.ExpectSignalStarted; -} - -export function isExpectSignalTimedOut( - event: WorkflowEvent -): event is ExpectSignalTimedOut { - return event.type === WorkflowEventType.ExpectSignalTimedOut; -} - export function isSignalReceived( event: WorkflowEvent ): event is SignalReceived { @@ -346,40 +313,10 @@ export function isEventsPublished( return event.type === WorkflowEventType.EventsPublished; } -export interface ConditionStarted extends HistoryEventBase { - type: WorkflowEventType.ConditionStarted; -} - -export function isConditionStarted( - event: WorkflowEvent -): event is ConditionStarted { - return event.type === WorkflowEventType.ConditionStarted; -} - -export interface ConditionTimedOut extends HistoryEventBase { - type: WorkflowEventType.ConditionTimedOut; -} - -export function isConditionTimedOut( - event: WorkflowEvent -): event is ConditionTimedOut { - return event.type === WorkflowEventType.ConditionTimedOut; -} - -export interface ActivityTimedOut extends HistoryEventBase { - type: WorkflowEventType.ActivityTimedOut; -} - export interface WorkflowTimedOut extends BaseEvent { type: WorkflowEventType.WorkflowTimedOut; } -export function isActivityTimedOut( - event: WorkflowEvent -): event is ActivityTimedOut { - return event.type === WorkflowEventType.ActivityTimedOut; -} - export function isWorkflowTimedOut( event: WorkflowEvent ): event is WorkflowTimedOut { @@ -389,26 +326,21 @@ export function isWorkflowTimedOut( export const isScheduledEvent = or( isActivityScheduled, isChildWorkflowScheduled, - isConditionStarted, isEventsPublished, - isExpectSignalStarted, isSignalSent, - isSleepScheduled + isTimerScheduled ); export const isSucceededEvent = or( isActivitySucceeded, isChildWorkflowSucceeded, - isSleepCompleted + isTimerCompleted ); export const isFailedEvent = or( isActivityFailed, - isActivityTimedOut, isActivityHeartbeatTimedOut, isChildWorkflowFailed, - isConditionTimedOut, - isExpectSignalTimedOut, isWorkflowTimedOut ); diff --git a/packages/@eventual/core/src/workflow.ts b/packages/@eventual/core/src/workflow.ts index 1b1715ee5..6e429a384 100644 --- a/packages/@eventual/core/src/workflow.ts +++ b/packages/@eventual/core/src/workflow.ts @@ -3,10 +3,10 @@ import type { Program } from "./interpret.js"; import type { Context } from "./context.js"; import { HistoryStateEvent, - isSleepCompleted, - isSleepScheduled, - SleepCompleted, - SleepScheduled, + isTimerCompleted, + isTimerScheduled, + TimerCompleted, + TimerScheduled, WorkflowEventType, } from "./workflow-events.js"; import { createWorkflowCall } from "./calls/workflow-call.js"; @@ -15,6 +15,7 @@ import { isOrchestratorWorker } from "./runtime/flags.js"; import { isChain } from "./chain.js"; import { ChildExecution, ExecutionHandle } from "./execution.js"; import { StartExecutionRequest } from "./service-client.js"; +import { DurationSchedule } from "./schedule.js"; export type WorkflowHandler = ( input: Input, @@ -32,7 +33,7 @@ export interface WorkflowOptions { * * @default - workflow will never timeout. */ - timeoutSeconds?: number; + timeout?: DurationSchedule; } export type WorkflowOutput = W extends Workflow< @@ -156,7 +157,7 @@ export function workflow( workflow: name, executionName: input.executionName, input: input.input, - timeoutSeconds: input.timeoutSeconds, + timeout: input.timeout, ...opts, }); }; @@ -181,29 +182,29 @@ export function runWorkflowDefinition( } /** - * Generates synthetic events, for example, {@link SleepCompleted} events when the time has passed, but a real completed event has not come in yet. + * Generates synthetic events, for example, {@link TimerCompleted} events when the time has passed, but a real completed event has not come in yet. */ export function generateSyntheticEvents( events: HistoryStateEvent[], baseTime: Date -): SleepCompleted[] { - const unresolvedSleep: Record = {}; +): TimerCompleted[] { + const unresolvedTimers: Record = {}; - const sleepEvents = events.filter( - (event): event is SleepScheduled | SleepCompleted => - isSleepScheduled(event) || isSleepCompleted(event) + const timerEvents = events.filter( + (event): event is TimerScheduled | TimerCompleted => + isTimerScheduled(event) || isTimerCompleted(event) ); - for (const event of sleepEvents) { - if (isSleepScheduled(event)) { - unresolvedSleep[event.seq] = event; + for (const event of timerEvents) { + if (isTimerScheduled(event)) { + unresolvedTimers[event.seq] = event; } else { - delete unresolvedSleep[event.seq]; + delete unresolvedTimers[event.seq]; } } - const syntheticSleepComplete: SleepCompleted[] = Object.values( - unresolvedSleep + const syntheticTimerComplete: TimerCompleted[] = Object.values( + unresolvedTimers ) .filter( (event) => new Date(event.untilTime).getTime() <= baseTime.getTime() @@ -211,11 +212,11 @@ export function generateSyntheticEvents( .map( (e) => ({ - type: WorkflowEventType.SleepCompleted, + type: WorkflowEventType.TimerCompleted, seq: e.seq, timestamp: baseTime.toISOString(), - } satisfies SleepCompleted) + } satisfies TimerCompleted) ); - return syntheticSleepComplete; + return syntheticTimerComplete; } diff --git a/packages/@eventual/core/test/command-util.ts b/packages/@eventual/core/test/command-util.ts index 3a3fc65a6..0f40b6e8b 100644 --- a/packages/@eventual/core/test/command-util.ts +++ b/packages/@eventual/core/test/command-util.ts @@ -1,57 +1,45 @@ import { ulid } from "ulidx"; import { + StartTimerCommand, CommandType, - ExpectSignalCommand, PublishEventsCommand, ScheduleActivityCommand, ScheduleWorkflowCommand, SendSignalCommand, - SleepForCommand, - SleepUntilCommand, - StartConditionCommand, } from "../src/command.js"; import { EventEnvelope } from "../src/event.js"; import { ActivitySucceeded, ActivityFailed, ActivityScheduled, - ActivityTimedOut, ChildWorkflowSucceeded, ChildWorkflowFailed, ChildWorkflowScheduled, - ConditionStarted, - ConditionTimedOut, EventsPublished, - ExpectSignalStarted, - ExpectSignalTimedOut, SignalReceived, SignalSent, - SleepCompleted, - SleepScheduled, WorkflowEventType, WorkflowTimedOut, ActivityHeartbeatTimedOut, + TimerCompleted, + TimerScheduled, } from "../src/workflow-events.js"; import { SignalTarget } from "../src/signals.js"; +import { Schedule } from "../src/index.js"; -export function createSleepUntilCommand( - untilTime: string, +export function createStartTimerCommand( + schedule: Schedule, seq: number -): SleepUntilCommand { +): StartTimerCommand; +export function createStartTimerCommand(seq: number): StartTimerCommand; +export function createStartTimerCommand( + ...args: [schedule: Schedule, seq: number] | [seq: number] +): StartTimerCommand { + const [schedule, seq] = + args.length === 1 ? [Schedule.time("then"), args[0]] : args; return { - kind: CommandType.SleepUntil, - untilTime, - seq, - }; -} - -export function createSleepForCommand( - durationSeconds: number, - seq: number -): SleepForCommand { - return { - kind: CommandType.SleepFor, - durationSeconds, + kind: CommandType.StartTimer, + schedule, seq, }; } @@ -82,19 +70,6 @@ export function createScheduledWorkflowCommand( }; } -export function createExpectSignalCommand( - signalId: string, - seq: number, - timeoutSeconds?: number -): ExpectSignalCommand { - return { - kind: CommandType.ExpectSignal, - signalId, - seq, - timeoutSeconds, - }; -} - export function createSendSignalCommand( target: SignalTarget, signalId: string, @@ -119,17 +94,6 @@ export function createPublishEventCommand( }; } -export function createStartConditionCommand( - seq: number, - timeoutSeconds?: number -): StartConditionCommand { - return { - kind: CommandType.StartCondition, - seq, - timeoutSeconds, - }; -} - export function activitySucceeded(result: any, seq: number): ActivitySucceeded { return { type: WorkflowEventType.ActivitySucceeded, @@ -161,14 +125,6 @@ export function activityFailed(error: any, seq: number): ActivityFailed { }; } -export function activityTimedOut(seq: number): ActivityTimedOut { - return { - type: WorkflowEventType.ActivityTimedOut, - seq, - timestamp: new Date(0).toISOString(), - }; -} - export function workflowFailed(error: any, seq: number): ChildWorkflowFailed { return { type: WorkflowEventType.ChildWorkflowFailed, @@ -224,49 +180,23 @@ export function workflowScheduled( }; } -export function scheduledSleep(untilTime: string, seq: number): SleepScheduled { +export function timerScheduled(seq: number): TimerScheduled { return { - type: WorkflowEventType.SleepScheduled, - untilTime, + type: WorkflowEventType.TimerScheduled, + untilTime: "", seq, timestamp: new Date(0).toISOString(), }; } -export function completedSleep(seq: number): SleepCompleted { +export function timerCompleted(seq: number): TimerCompleted { return { - type: WorkflowEventType.SleepCompleted, + type: WorkflowEventType.TimerCompleted, seq, timestamp: new Date(0).toISOString(), }; } -export function timedOutExpectSignal( - signalId: string, - seq: number -): ExpectSignalTimedOut { - return { - type: WorkflowEventType.ExpectSignalTimedOut, - timestamp: new Date().toISOString(), - seq, - signalId, - }; -} - -export function startedExpectSignal( - signalId: string, - seq: number, - timeoutSeconds?: number -): ExpectSignalStarted { - return { - type: WorkflowEventType.ExpectSignalStarted, - signalId, - timestamp: new Date().toISOString(), - seq, - timeoutSeconds, - }; -} - export function signalReceived( signalId: string, payload?: any @@ -296,22 +226,6 @@ export function signalSent( }; } -export function conditionStarted(seq: number): ConditionStarted { - return { - type: WorkflowEventType.ConditionStarted, - seq, - timestamp: new Date().toISOString(), - }; -} - -export function conditionTimedOut(seq: number): ConditionTimedOut { - return { - type: WorkflowEventType.ConditionTimedOut, - timestamp: new Date().toISOString(), - seq, - }; -} - export function eventsPublished( events: EventEnvelope[], seq: number diff --git a/packages/@eventual/core/test/commend-executor.test.ts b/packages/@eventual/core/test/commend-executor.test.ts index 7dc0f0620..530222231 100644 --- a/packages/@eventual/core/test/commend-executor.test.ts +++ b/packages/@eventual/core/test/commend-executor.test.ts @@ -2,16 +2,11 @@ import { jest } from "@jest/globals"; import { CommandType } from "../src/command.js"; import { ActivityScheduled, - ActivityTimedOut, ChildWorkflowScheduled, - ConditionStarted, - ConditionTimedOut, EventsPublished, - ExpectSignalStarted, - ExpectSignalTimedOut, SignalSent, - SleepCompleted, - SleepScheduled, + TimerCompleted, + TimerScheduled, WorkflowEventType, } from "../src/workflow-events.js"; import { @@ -20,13 +15,13 @@ import { formatChildExecutionName, formatExecutionId, INTERNAL_EXECUTION_ID_PREFIX, + Schedule, SendSignalRequest, SignalTargetType, WorkflowClient, WorkflowRuntimeClient, } from "../src/index.js"; import { - Schedule, ScheduleEventRequest, TimerClient, } from "../src/runtime/clients/timer-client.js"; @@ -65,67 +60,34 @@ afterEach(() => { jest.resetAllMocks(); }); -describe("sleep", () => { - test("sleep for", async () => { - const event = await testExecutor.executeCommand( - workflow, - executionId, - { - kind: CommandType.SleepFor, - durationSeconds: 10, - seq: 0, - }, - baseTime - ); - - const untilTime = new Date(baseTime.getTime() + 10 * 1000).toISOString(); - - expect(mockTimerClient.scheduleEvent).toHaveBeenCalledWith< - [ScheduleEventRequest] - >({ - event: { - type: WorkflowEventType.SleepCompleted, - seq: 0, - }, - schedule: Schedule.absolute(untilTime), - executionId, - }); - - expect(event).toMatchObject({ - seq: 0, - timestamp: expect.stringContaining("Z"), - type: WorkflowEventType.SleepScheduled, - untilTime, - }); - }); - - test("sleep until", async () => { +describe("await times", () => { + test("await time", async () => { const event = await testExecutor.executeCommand( workflow, executionId, { - kind: CommandType.SleepUntil, - untilTime: baseTime.toISOString(), + kind: CommandType.StartTimer, + schedule: Schedule.time(baseTime), seq: 0, }, baseTime ); expect(mockTimerClient.scheduleEvent).toHaveBeenCalledWith< - [ScheduleEventRequest] + [ScheduleEventRequest] >({ event: { - type: WorkflowEventType.SleepCompleted, + type: WorkflowEventType.TimerCompleted, seq: 0, }, - schedule: Schedule.absolute(baseTime.toISOString()), + schedule: Schedule.time(baseTime.toISOString()), executionId, }); - expect(event).toMatchObject({ + expect(event).toMatchObject({ seq: 0, timestamp: expect.stringContaining("Z"), - type: WorkflowEventType.SleepScheduled, + type: WorkflowEventType.TimerScheduled, untilTime: baseTime.toISOString(), }); }); @@ -156,41 +118,6 @@ describe("activity", () => { name: "activity", }); }); - - test("start with timeout", async () => { - const event = await testExecutor.executeCommand( - workflow, - executionId, - { - kind: CommandType.StartActivity, - args: [], - name: "activity", - seq: 0, - timeoutSeconds: 100, - }, - baseTime - ); - - expect(mockTimerClient.scheduleEvent).toHaveBeenCalledWith< - [ScheduleEventRequest] - >({ - event: { - type: WorkflowEventType.ActivityTimedOut, - seq: 0, - }, - schedule: Schedule.relative(100, baseTime), - executionId, - }); - - expect(mockWorkflowRuntimeClient.startActivity).toHaveBeenCalledTimes(1); - - expect(event).toMatchObject({ - seq: 0, - timestamp: expect.stringContaining("Z"), - type: WorkflowEventType.ActivityScheduled, - name: "activity", - }); - }); }); describe("workflow", () => { @@ -227,64 +154,6 @@ describe("workflow", () => { }); }); -describe("expect signal", () => { - test("start", async () => { - const event = await testExecutor.executeCommand( - workflow, - executionId, - { - kind: CommandType.ExpectSignal, - signalId: "signal", - seq: 0, - }, - baseTime - ); - - expect(mockTimerClient.scheduleEvent).not.toHaveBeenCalled(); - - expect(event).toMatchObject({ - seq: 0, - timestamp: expect.stringContaining("Z"), - type: WorkflowEventType.ExpectSignalStarted, - signalId: "signal", - }); - }); - - test("start", async () => { - const event = await testExecutor.executeCommand( - workflow, - executionId, - { - kind: CommandType.ExpectSignal, - signalId: "signal", - seq: 0, - timeoutSeconds: 100, - }, - baseTime - ); - - expect(mockTimerClient.scheduleEvent).toHaveBeenCalledWith< - [ScheduleEventRequest] - >({ - event: { - signalId: "signal", - seq: 0, - type: WorkflowEventType.ExpectSignalTimedOut, - }, - schedule: Schedule.relative(100, baseTime), - executionId, - }); - - expect(event).toMatchObject({ - seq: 0, - timestamp: expect.stringContaining("Z"), - type: WorkflowEventType.ExpectSignalStarted, - signalId: "signal", - timeoutSeconds: 100, - }); - }); -}); - describe("send signal", () => { test("send", async () => { const event = await testExecutor.executeCommand( @@ -353,58 +222,6 @@ describe("send signal", () => { }); }); -describe("condition", () => { - test("send", async () => { - const event = await testExecutor.executeCommand( - workflow, - executionId, - { - kind: CommandType.StartCondition, - seq: 0, - }, - baseTime - ); - - expect(mockTimerClient.scheduleEvent).not.toHaveBeenCalled(); - - expect(event).toMatchObject({ - seq: 0, - type: WorkflowEventType.ConditionStarted, - timestamp: expect.stringContaining("Z"), - }); - }); - - test("send with timeout", async () => { - const event = await testExecutor.executeCommand( - workflow, - executionId, - { - kind: CommandType.StartCondition, - seq: 0, - timeoutSeconds: 100, - }, - baseTime - ); - - expect(mockTimerClient.scheduleEvent).toHaveBeenCalledWith< - [ScheduleEventRequest] - >({ - event: { - type: WorkflowEventType.ConditionTimedOut, - seq: 0, - }, - executionId, - schedule: Schedule.relative(100, baseTime), - }); - - expect(event).toMatchObject({ - seq: 0, - type: WorkflowEventType.ConditionStarted, - timestamp: expect.stringContaining("Z"), - }); - }); -}); - describe("public events", () => { test("send", async () => { const event = await testExecutor.executeCommand( diff --git a/packages/@eventual/core/test/interpret.test.ts b/packages/@eventual/core/test/interpret.test.ts index e0203f3c2..4f800a71b 100644 --- a/packages/@eventual/core/test/interpret.test.ts +++ b/packages/@eventual/core/test/interpret.test.ts @@ -2,30 +2,21 @@ import { createActivityCall } from "../src/calls/activity-call.js"; import { chain } from "../src/chain.js"; import { EventualError, HeartbeatTimeout, Timeout } from "../src/error.js"; -import { createSleepUntilCall } from "../src/calls/sleep-call.js"; import { activitySucceeded, activityFailed, activityHeartbeatTimedOut, activityScheduled, - activityTimedOut, - completedSleep, - conditionStarted, - conditionTimedOut, - createExpectSignalCommand, createPublishEventCommand, createScheduledActivityCommand, createScheduledWorkflowCommand, createSendSignalCommand, - createSleepForCommand, - createSleepUntilCommand, - createStartConditionCommand, + createStartTimerCommand, eventsPublished, - scheduledSleep, signalReceived, signalSent, - startedExpectSignal, - timedOutExpectSignal, + timerCompleted, + timerScheduled, workflowSucceeded, workflowFailed, workflowScheduled, @@ -45,12 +36,18 @@ import { Context } from "../src/context.js"; import { createAwaitAll } from "../src/await-all.js"; import { Result } from "../src/result.js"; import { signal, SignalTargetType } from "../src/signals.js"; -import { sleepFor, sleepUntil } from "../src/sleep.js"; import { WorkflowHandler, Workflow, workflow as _workflow, } from "../src/workflow.js"; +import { createAwaitAllSettled } from "../src/await-all-settled.js"; +import { duration, time } from "../src/await-time.js"; +import { + createAwaitTimeCall, + createAwaitDurationCall, +} from "../src/calls/await-time-call.js"; +import { Schedule } from "../src/schedule.js"; beforeAll(() => { process.env[SERVICE_TYPE_FLAG] = ServiceType.OrchestratorWorker; @@ -68,7 +65,7 @@ function* myWorkflow(event: any): Program { createActivityCall("my-activity-0", [event]); const all = yield Eventual.all([ - createSleepUntilCall("then"), + createAwaitTimeCall("then"), createActivityCall("my-activity-2", [event]), ]) as any; return [a, all]; @@ -115,7 +112,7 @@ test("should continue with result of completed Activity", () => { ).toMatchObject({ commands: [ createScheduledActivityCommand("my-activity-0", [event], 1), - createSleepUntilCommand("then", 2), + createStartTimerCommand(2), createScheduledActivityCommand("my-activity-2", [event], 3), ], }); @@ -164,32 +161,130 @@ test("should catch error of failed Activity", () => { }); test("should catch error of timing out Activity", () => { + function* myWorkflow(event: any): Program { + try { + const a: any = yield createActivityCall( + "my-activity", + [event], + createAwaitTimeCall("") + ); + + return a; + } catch (err) { + yield createActivityCall("handle-error", [err]); + return []; + } + } + expect( interpret(myWorkflow(event), [ - activityScheduled("my-activity", 0), - activityTimedOut(0), + timerScheduled(0), + timerCompleted(0), + activityScheduled("my-activity", 1), ]) ).toMatchObject({ commands: [ createScheduledActivityCommand( "handle-error", [new Timeout("Activity Timed Out")], - 1 + 2 ), ], }); }); +test("immediately abort activity on invalid timeout", () => { + function* myWorkflow(event: any): Program { + return createActivityCall( + "my-activity", + [event], + "not an awaitable" as any + ); + } + + expect( + interpret(myWorkflow(event), [activityScheduled("my-activity", 0)]) + ).toMatchObject({ + result: Result.failed(new Timeout("Activity Timed Out")), + }); +}); + +test("timeout multiple activities at once", () => { + function* myWorkflow(event: any): Program { + const time = createAwaitTimeCall(""); + const a = createActivityCall("my-activity", [event], time); + const b = createActivityCall("my-activity", [event], time); + + return yield createAwaitAllSettled([a, b]); + } + + expect( + interpret(myWorkflow(event), [ + timerScheduled(0), + activityScheduled("my-activity", 1), + activityScheduled("my-activity", 2), + timerCompleted(0), + ]) + ).toMatchObject({ + result: Result.resolved([ + { + status: "rejected", + reason: new Timeout("Activity Timed Out").toJSON(), + }, + { + status: "rejected", + reason: new Timeout("Activity Timed Out").toJSON(), + }, + ]), + commands: [], + }); +}); + +test("activity times out activity", () => { + function* myWorkflow(event: any): Program { + const z = createActivityCall("my-activity", [event]); + const a = createActivityCall("my-activity", [event], z); + const b = createActivityCall("my-activity", [event], a); + + return yield createAwaitAllSettled([z, a, b]); + } + + expect( + interpret(myWorkflow(event), [ + activityScheduled("my-activity", 0), + activityScheduled("my-activity", 1), + activityScheduled("my-activity", 2), + activitySucceeded("woo", 0), + ]) + ).toMatchObject({ + result: Result.resolved([ + { + status: "fulfilled", + value: "woo", + }, + { + status: "rejected", + reason: new Timeout("Activity Timed Out").toJSON(), + }, + { + status: "rejected", + reason: new Timeout("Activity Timed Out").toJSON(), + }, + ]), + commands: [], + }); +}); + test("should return final result", () => { expect( interpret(myWorkflow(event), [ activityScheduled("my-activity", 0), activitySucceeded("result", 0), activityScheduled("my-activity-0", 1), - scheduledSleep("then", 2), + timerScheduled(2), activityScheduled("my-activity-2", 3), activitySucceeded("result-0", 1), - completedSleep(2), + timerCompleted(2), activitySucceeded("result-2", 3), ]) ).toMatchObject({ @@ -205,7 +300,7 @@ test("should handle missing blocks", () => { commands: [ createScheduledActivityCommand("my-activity", [event], 0), createScheduledActivityCommand("my-activity-0", [event], 1), - createSleepUntilCommand("then", 2), + createStartTimerCommand(2), createScheduledActivityCommand("my-activity-2", [event], 3), ], }); @@ -220,7 +315,7 @@ test("should handle partial blocks", () => { ]) ).toMatchObject({ commands: [ - createSleepUntilCommand("then", 2), + createStartTimerCommand(2), createScheduledActivityCommand("my-activity-2", [event], 3), ], }); @@ -236,7 +331,7 @@ test("should handle partial blocks with partial completes", () => { ]) ).toMatchObject({ commands: [ - createSleepUntilCommand("then", 2), + createStartTimerCommand(2), createScheduledActivityCommand("my-activity-2", [event], 3), ], }); @@ -318,7 +413,7 @@ describe("activity", () => { test("should throw when scheduled does not correspond to call", () => { expect( - interpret(myWorkflow(event), [scheduledSleep("result", 0)]) + interpret(myWorkflow(event), [timerScheduled(0)]) ).toMatchObject({ result: Result.failed({ name: "DeterminismError" }), commands: [], @@ -396,10 +491,10 @@ test("should wait if partial results", () => { activityScheduled("my-activity", 0), activitySucceeded("result", 0), activityScheduled("my-activity-0", 1), - scheduledSleep("then", 2), + timerScheduled(2), activityScheduled("my-activity-2", 3), activitySucceeded("result-0", 1), - completedSleep(2), + timerCompleted(2), ]) ).toMatchObject({ commands: [], @@ -420,84 +515,78 @@ test("should return result of inner function", () => { }); }); -test("should schedule sleep for", () => { +test("should schedule duration", () => { function* workflow() { - yield sleepFor(10); + yield duration(10); } expect(interpret(workflow() as any, [])).toMatchObject({ - commands: [createSleepForCommand(10, 0)], + commands: [createStartTimerCommand(Schedule.duration(10), 0)], }); }); -test("should not re-schedule sleep for", () => { +test("should not re-schedule duration", () => { function* workflow() { - yield sleepFor(10); + yield duration(10); } - expect( - interpret(workflow() as any, [scheduledSleep("anything", 0)]) - ).toMatchObject({ + expect(interpret(workflow() as any, [timerScheduled(0)])).toMatchObject(< + WorkflowResult + >{ commands: [], }); }); -test("should complete sleep for", () => { +test("should complete duration", () => { function* workflow() { - yield sleepFor(10); + yield duration(10); return "done"; } expect( - interpret(workflow() as any, [ - scheduledSleep("anything", 0), - completedSleep(0), - ]) + interpret(workflow() as any, [timerScheduled(0), timerCompleted(0)]) ).toMatchObject({ result: Result.resolved("done"), commands: [], }); }); -test("should schedule sleep until", () => { +test("should schedule time", () => { const now = new Date(); function* workflow() { - yield sleepUntil(now); + yield time(now); } expect(interpret(workflow() as any, [])).toMatchObject({ - commands: [createSleepUntilCommand(now.toISOString(), 0)], + commands: [createStartTimerCommand(Schedule.time(now.toISOString()), 0)], }); }); -test("should not re-schedule sleep until", () => { +test("should not re-schedule time", () => { const now = new Date(); function* workflow() { - yield sleepUntil(now); + yield time(now); } - expect( - interpret(workflow() as any, [scheduledSleep("anything", 0)]) - ).toMatchObject({ + expect(interpret(workflow() as any, [timerScheduled(0)])).toMatchObject(< + WorkflowResult + >{ commands: [], }); }); -test("should complete sleep until", () => { +test("should complete time", () => { const now = new Date(); function* workflow() { - yield sleepUntil(now); + yield time(now); return "done"; } expect( - interpret(workflow() as any, [ - scheduledSleep("anything", 0), - completedSleep(0), - ]) + interpret(workflow() as any, [timerScheduled(0), timerCompleted(0)]) ).toMatchObject({ result: Result.resolved("done"), commands: [], @@ -508,13 +597,13 @@ describe("temple of doom", () => { /** * In our game, the player wants to get to the end of a hallway with traps. * The trap starts above the player and moves to a space in front of them - * after a sleepUntil("X"). + * after a time("X"). * * If the trap has moved (X time), the player may jump to avoid it. * If the player jumps when then trap has not moved, they will beheaded. * If the player runs when the trap has been triggered without jumping, they will have their legs cut off. * - * The trap is represented by a sleep command for X time. + * The trap is represented by a timer command for X time. * The player starts running by returning the "run" activity. * The player jumps be returning the "jump" activity. * (this would be better modeled with signals and conditions, but the effect is the same, wait, complete) @@ -526,7 +615,7 @@ describe("temple of doom", () => { let jump = false; const startTrap = chain(function* () { - yield createSleepUntilCall("X"); + yield createAwaitTimeCall("then"); trapDown = true; }); const waitForJump = chain(function* () { @@ -556,7 +645,7 @@ describe("temple of doom", () => { test("run until blocked", () => { expect(interpret(workflow() as any, [])).toMatchObject({ commands: [ - createSleepUntilCommand("X", 0), + createStartTimerCommand(0), createScheduledActivityCommand("jump", [], 1), createScheduledActivityCommand("run", [], 2), ], @@ -566,7 +655,7 @@ describe("temple of doom", () => { test("waiting", () => { expect( interpret(workflow() as any, [ - scheduledSleep("X", 0), + timerScheduled(0), activityScheduled("jump", 1), activityScheduled("run", 2), ]) @@ -576,13 +665,13 @@ describe("temple of doom", () => { }); test("trap triggers, player has not started, nothing happens", () => { - // complete sleep, nothing happens + // complete timer, nothing happens expect( interpret(workflow() as any, [ - scheduledSleep("X", 0), + timerScheduled(0), activityScheduled("jump", 1), activityScheduled("run", 2), - completedSleep(0), + timerCompleted(0), ]) ).toMatchObject({ commands: [], @@ -590,13 +679,13 @@ describe("temple of doom", () => { }); test("trap triggers and then the player starts, player is dead", () => { - // complete sleep, turn on, release the player, dead + // complete timer, turn on, release the player, dead expect( interpret(workflow() as any, [ - scheduledSleep("X", 0), + timerScheduled(0), activityScheduled("jump", 1), activityScheduled("run", 2), - completedSleep(0), + timerCompleted(0), activitySucceeded("anything", 2), ]) ).toMatchObject({ @@ -606,12 +695,12 @@ describe("temple of doom", () => { }); test("trap triggers and then the player starts, player is dead, commands are out of order", () => { - // complete sleep, turn on, release the player, dead + // complete timer, turn on, release the player, dead expect( interpret(workflow() as any, [ - completedSleep(0), + timerCompleted(0), activitySucceeded("anything", 2), - scheduledSleep("X", 0), + timerScheduled(0), activityScheduled("jump", 1), activityScheduled("run", 2), ]) @@ -625,7 +714,7 @@ describe("temple of doom", () => { // release the player, not on, alive expect( interpret(workflow() as any, [ - scheduledSleep("X", 0), + timerScheduled(0), activityScheduled("jump", 1), activityScheduled("run", 2), activitySucceeded("anything", 2), @@ -640,7 +729,7 @@ describe("temple of doom", () => { // release the player, not on, alive expect( interpret(workflow() as any, [ - scheduledSleep("X", 0), + timerScheduled(0), activitySucceeded("anything", 2), activityScheduled("jump", 1), activityScheduled("run", 2), @@ -656,7 +745,7 @@ describe("temple of doom", () => { expect( interpret(workflow() as any, [ activitySucceeded("anything", 2), - scheduledSleep("X", 0), + timerScheduled(0), activityScheduled("jump", 1), activityScheduled("run", 2), ]) @@ -669,11 +758,11 @@ describe("temple of doom", () => { test("release the player before the trap triggers, player lives", () => { expect( interpret(workflow() as any, [ - scheduledSleep("X", 0), + timerScheduled(0), activityScheduled("jump", 1), activityScheduled("run", 2), activitySucceeded("anything", 2), - completedSleep(0), + timerCompleted(0), ]) ).toMatchObject({ result: Result.resolved("alive"), @@ -1403,7 +1492,10 @@ test("workflow calling other workflow", () => { describe("signals", () => { describe("expect signal", () => { const wf = workflow(function* (): any { - const result = yield createExpectSignalCall("MySignal", 100 * 1000); + const result = yield createExpectSignalCall( + "MySignal", + createAwaitDurationCall(100 * 1000, "seconds") + ); return result ?? "done"; }); @@ -1412,15 +1504,13 @@ describe("signals", () => { expect(interpret(wf.definition(undefined, context), [])).toMatchObject(< WorkflowResult >{ - commands: [createExpectSignalCommand("MySignal", 0, 100 * 1000)], + commands: [createStartTimerCommand(Schedule.duration(100 * 1000), 0)], }); }); test("no signal", () => { expect( - interpret(wf.definition(undefined, context), [ - startedExpectSignal("MySignal", 0, 100 * 1000), - ]) + interpret(wf.definition(undefined, context), [timerScheduled(0)]) ).toMatchObject({ commands: [], }); @@ -1429,7 +1519,7 @@ describe("signals", () => { test("match signal", () => { expect( interpret(wf.definition(undefined, context), [ - startedExpectSignal("MySignal", 0, 100 * 1000), + timerScheduled(0), signalReceived("MySignal"), ]) ).toMatchObject({ @@ -1441,7 +1531,7 @@ describe("signals", () => { test("match signal with payload", () => { expect( interpret(wf.definition(undefined, context), [ - startedExpectSignal("MySignal", 0, 100 * 1000), + timerScheduled(0), signalReceived("MySignal", { done: true }), ]) ).toMatchObject({ @@ -1453,8 +1543,8 @@ describe("signals", () => { test("timed out", () => { expect( interpret(wf.definition(undefined, context), [ - startedExpectSignal("MySignal", 0, 100 * 1000), - timedOutExpectSignal("MySignal", 0), + timerScheduled(0), + timerCompleted(0), ]) ).toMatchObject({ result: Result.failed(new Timeout("Expect Signal Timed Out")), @@ -1465,8 +1555,8 @@ describe("signals", () => { test("timed out then signal", () => { expect( interpret(wf.definition(undefined, context), [ - startedExpectSignal("MySignal", 0, 100 * 1000), - timedOutExpectSignal("MySignal", 0), + timerScheduled(0), + timerCompleted(0), signalReceived("MySignal", { done: true }), ]) ).toMatchObject({ @@ -1478,9 +1568,9 @@ describe("signals", () => { test("match signal then timeout", () => { expect( interpret(wf.definition(undefined, context), [ - startedExpectSignal("MySignal", 0, 100 * 1000), + timerScheduled(0), signalReceived("MySignal"), - timedOutExpectSignal("MySignal", 0), + timerCompleted(0), ]) ).toMatchObject({ result: Result.resolved("done"), @@ -1491,7 +1581,7 @@ describe("signals", () => { test("match signal twice", () => { expect( interpret(wf.definition(undefined, context), [ - startedExpectSignal("MySignal", 0, 100 * 1000), + timerScheduled(0), signalReceived("MySignal"), signalReceived("MySignal"), ]) @@ -1503,16 +1593,22 @@ describe("signals", () => { test("multiple of the same signal", () => { const wf = workflow(function* () { - const wait1 = createExpectSignalCall("MySignal", 100 * 1000); - const wait2 = createExpectSignalCall("MySignal", 100 * 1000); + const wait1 = createExpectSignalCall( + "MySignal", + createAwaitDurationCall(100 * 1000, "seconds") + ); + const wait2 = createExpectSignalCall( + "MySignal", + createAwaitDurationCall(100 * 1000, "seconds") + ); return Eventual.all([wait1, wait2]); }); expect( interpret(wf.definition(undefined, context), [ - startedExpectSignal("MySignal", 0, 100 * 1000), - startedExpectSignal("MySignal", 1, 100 * 1000), + timerScheduled(0), + timerScheduled(1), signalReceived("MySignal", "done!!!"), ]) ).toMatchObject({ @@ -1523,14 +1619,20 @@ describe("signals", () => { test("expect then timeout", () => { const wf = workflow(function* (): any { - yield createExpectSignalCall("MySignal", 100 * 1000); - yield createExpectSignalCall("MySignal", 100 * 1000); + yield createExpectSignalCall( + "MySignal", + createAwaitDurationCall(100 * 1000, "seconds") + ); + yield createExpectSignalCall( + "MySignal", + createAwaitDurationCall(100 * 1000, "seconds") + ); }); expect( interpret(wf.definition(undefined, context), [ - startedExpectSignal("MySignal", 0, 100 * 1000), - timedOutExpectSignal("MySignal", 0), + timerScheduled(0), + timerCompleted(0), ]) ).toMatchObject({ result: Result.failed({ name: "Timeout" }), @@ -1540,15 +1642,21 @@ describe("signals", () => { test("expect random signal then timeout", () => { const wf = workflow(function* (): any { - yield createExpectSignalCall("MySignal", 100 * 1000); - yield createExpectSignalCall("MySignal", 100 * 1000); + yield createExpectSignalCall( + "MySignal", + createAwaitDurationCall(100 * 1000, "seconds") + ); + yield createExpectSignalCall( + "MySignal", + createAwaitDurationCall(100 * 1000, "seconds") + ); }); expect( interpret(wf.definition(undefined, context), [ - startedExpectSignal("MySignal", 0, 100 * 1000), + timerScheduled(0), signalReceived("SomethingElse"), - timedOutExpectSignal("MySignal", 0), + timerCompleted(0), ]) ).toMatchObject({ result: Result.failed({ name: "Timeout" }), @@ -1578,12 +1686,12 @@ describe("signals", () => { } ); - yield createSleepUntilCall(""); + yield createAwaitTimeCall("then"); mySignalHandler.dispose(); myOtherSignalHandler.dispose(); - yield createSleepUntilCall(""); + yield createAwaitTimeCall("then"); return { mySignalHappened, @@ -1596,7 +1704,7 @@ describe("signals", () => { expect(interpret(wf.definition(undefined, context), [])).toMatchObject(< WorkflowResult >{ - commands: [createSleepUntilCommand("", 0)], + commands: [createStartTimerCommand(0)], }); }); @@ -1606,7 +1714,7 @@ describe("signals", () => { signalReceived("MySignal"), ]) ).toMatchObject({ - commands: [createSleepUntilCommand("", 0)], + commands: [createStartTimerCommand(0)], }); }); @@ -1614,10 +1722,10 @@ describe("signals", () => { expect( interpret(wf.definition(undefined, context), [ signalReceived("MySignal"), - scheduledSleep("", 0), - completedSleep(0), - scheduledSleep("", 1), - completedSleep(1), + timerScheduled(0), + timerCompleted(0), + timerScheduled(1), + timerCompleted(1), ]) ).toMatchObject({ result: Result.resolved({ @@ -1635,10 +1743,10 @@ describe("signals", () => { signalReceived("MySignal"), signalReceived("MySignal"), signalReceived("MySignal"), - scheduledSleep("", 0), - completedSleep(0), - scheduledSleep("", 1), - completedSleep(1), + timerScheduled(0), + timerCompleted(0), + timerScheduled(1), + timerCompleted(1), ]) ).toMatchObject({ result: Result.resolved({ @@ -1653,13 +1761,13 @@ describe("signals", () => { test("send signal after dispose", () => { expect( interpret(wf.definition(undefined, context), [ - scheduledSleep("", 0), - completedSleep(0), + timerScheduled(0), + timerCompleted(0), signalReceived("MySignal"), signalReceived("MySignal"), signalReceived("MySignal"), - scheduledSleep("", 1), - completedSleep(1), + timerScheduled(1), + timerCompleted(1), ]) ).toMatchObject({ result: Result.resolved({ @@ -1678,7 +1786,7 @@ describe("signals", () => { ]) ).toMatchObject({ commands: [ - createSleepUntilCommand("", 0), + createStartTimerCommand(0), createScheduledActivityCommand("act1", ["hi"], 1), ], }); @@ -1692,22 +1800,22 @@ describe("signals", () => { ]) ).toMatchObject({ commands: [ - createSleepUntilCommand("", 0), + createStartTimerCommand(0), createScheduledActivityCommand("act1", ["hi"], 1), createScheduledActivityCommand("act1", ["hi2"], 2), ], }); }); - test("send other signal, wake sleep, with act scheduled", () => { + test("send other signal, wake timer, with act scheduled", () => { expect( interpret(wf.definition(undefined, context), [ signalReceived("MyOtherSignal", "hi"), - scheduledSleep("", 0), - completedSleep(0), + timerScheduled(0), + timerCompleted(0), activityScheduled("act1", 1), - scheduledSleep("", 2), - completedSleep(2), + timerScheduled(2), + timerCompleted(2), ]) ).toMatchObject({ result: Result.resolved({ @@ -1719,16 +1827,16 @@ describe("signals", () => { }); }); - test("send other signal, wake sleep, complete activity", () => { + test("send other signal, wake timer, complete activity", () => { expect( interpret(wf.definition(undefined, context), [ signalReceived("MyOtherSignal", "hi"), - scheduledSleep("", 0), + timerScheduled(0), activityScheduled("act1", 1), activitySucceeded("act1", 1), - completedSleep(0), - scheduledSleep("", 2), - completedSleep(2), + timerCompleted(0), + timerScheduled(2), + timerCompleted(2), ]) ).toMatchObject({ result: Result.resolved({ @@ -1740,16 +1848,16 @@ describe("signals", () => { }); }); - test("send other signal, wake sleep, complete activity after dispose", () => { + test("send other signal, wake timer, complete activity after dispose", () => { expect( interpret(wf.definition(undefined, context), [ signalReceived("MyOtherSignal", "hi"), - scheduledSleep("", 0), - completedSleep(0), + timerScheduled(0), + timerCompleted(0), activityScheduled("act1", 1), activitySucceeded("act1", 1), - scheduledSleep("", 2), - completedSleep(2), + timerScheduled(2), + timerCompleted(2), ]) ).toMatchObject({ result: Result.resolved({ @@ -1764,11 +1872,11 @@ describe("signals", () => { test("send other signal after dispose", () => { expect( interpret(wf.definition(undefined, context), [ - scheduledSleep("", 0), - completedSleep(0), + timerScheduled(0), + timerCompleted(0), signalReceived("MyOtherSignal", "hi"), - scheduledSleep("", 1), - completedSleep(1), + timerScheduled(1), + timerCompleted(1), ]) ).toMatchObject({ result: Result.resolved({ @@ -1920,29 +2028,35 @@ describe("condition", () => { expect( interpret(wf.definition(undefined, context), []) ).toMatchObject({ - commands: [createStartConditionCommand(0)], + commands: [], }); }); test("false condition emits events with timeout", () => { const wf = workflow(function* (): any { - yield createConditionCall(() => false, 100); + yield createConditionCall( + () => false, + createAwaitDurationCall(100, "seconds") + ); }); expect( interpret(wf.definition(undefined, context), []) ).toMatchObject({ - commands: [createStartConditionCommand(0, 100)], + commands: [createStartTimerCommand(Schedule.duration(100), 0)], }); }); test("false condition does not re-emit", () => { const wf = workflow(function* (): any { - yield createConditionCall(() => false, 100); + yield createConditionCall( + () => false, + createAwaitDurationCall(100, "seconds") + ); }); expect( - interpret(wf.definition(undefined, context), [conditionStarted(0)]) + interpret(wf.definition(undefined, context), [timerScheduled(0)]) ).toMatchObject({ commands: [], }); @@ -1953,7 +2067,12 @@ describe("condition", () => { createRegisterSignalHandlerCall("Yes", () => { yes = true; }); - if (!(yield createConditionCall(() => yes) as any)) { + if ( + !(yield createConditionCall( + () => yes, + createAwaitDurationCall(100, "seconds") + ) as any) + ) { return "timed out"; } return "done"; @@ -1962,7 +2081,7 @@ describe("condition", () => { test("trigger success", () => { expect( interpret(signalConditionFlow.definition(undefined, context), [ - conditionStarted(0), + timerScheduled(0), signalReceived("Yes"), ]) ).toMatchObject({ @@ -1974,7 +2093,7 @@ describe("condition", () => { test("trigger success eventually", () => { expect( interpret(signalConditionFlow.definition(undefined, context), [ - conditionStarted(0), + timerScheduled(0), signalReceived("No"), signalReceived("No"), signalReceived("No"), @@ -2002,7 +2121,6 @@ describe("condition", () => { expect( interpret(signalConditionOnAndOffFlow.definition(undefined, context), [ - conditionStarted(0), signalReceived("Yes"), ]) ).toMatchObject({ @@ -2013,8 +2131,8 @@ describe("condition", () => { test("trigger timeout", () => { expect( interpret(signalConditionFlow.definition(undefined, context), [ - conditionStarted(0), - conditionTimedOut(0), + timerScheduled(0), + timerCompleted(0), ]) ).toMatchObject({ result: Result.resolved("timed out"), @@ -2025,9 +2143,9 @@ describe("condition", () => { test("trigger success before timeout", () => { expect( interpret(signalConditionFlow.definition(undefined, context), [ - conditionStarted(0), + timerScheduled(0), signalReceived("Yes"), - conditionTimedOut(0), + timerCompleted(0), ]) ).toMatchObject({ result: Result.resolved("done"), @@ -2038,8 +2156,8 @@ describe("condition", () => { test("trigger timeout before success", () => { expect( interpret(signalConditionFlow.definition(undefined, context), [ - conditionStarted(0), - conditionTimedOut(0), + timerScheduled(0), + timerCompleted(0), signalReceived("Yes"), ]) ).toMatchObject({ @@ -2057,7 +2175,7 @@ describe("condition", () => { expect( interpret(wf.definition(undefined, context), []) ).toMatchObject({ - commands: [createStartConditionCommand(0)], + commands: [], }); }); }); @@ -2066,7 +2184,7 @@ test("nestedChains", () => { const wf = workflow(function* () { const funcs = { a: chain(function* () { - yield createSleepUntilCall(""); + yield createAwaitTimeCall("then"); }), }; @@ -2084,7 +2202,7 @@ test("nestedChains", () => { expect( interpret(wf.definition(undefined, context), []) ).toMatchObject({ - commands: [createSleepUntilCommand("", 0)], + commands: [createStartTimerCommand(0)], }); }); diff --git a/packages/@eventual/testing/src/clients/event-client.ts b/packages/@eventual/testing/src/clients/event-client.ts index 8ecd732f8..7aabfe55e 100644 --- a/packages/@eventual/testing/src/clients/event-client.ts +++ b/packages/@eventual/testing/src/clients/event-client.ts @@ -3,9 +3,7 @@ import { EventEnvelope, EventHandlerWorker, EventPayload, - ServiceType, } from "@eventual/core"; -import { serviceTypeScope } from "../utils.js"; export class TestEventClient implements EventClient { constructor(private eventHandlerWorker: EventHandlerWorker) {} @@ -13,8 +11,6 @@ export class TestEventClient implements EventClient { public async publishEvents( ...event: EventEnvelope[] ): Promise { - return serviceTypeScope(ServiceType.EventHandler, async () => { - await this.eventHandlerWorker(event); - }); + return await this.eventHandlerWorker(event); } } diff --git a/packages/@eventual/testing/src/clients/timer-client.ts b/packages/@eventual/testing/src/clients/timer-client.ts index 461f53905..b263be7f9 100644 --- a/packages/@eventual/testing/src/clients/timer-client.ts +++ b/packages/@eventual/testing/src/clients/timer-client.ts @@ -1,5 +1,6 @@ import { assertNever, + computeScheduleDate, isActivityHeartbeatMonitorRequest, isTimerScheduleEventRequest, TimerClient, @@ -9,17 +10,11 @@ import { TimeConnector } from "../environment.js"; export class TestTimerClient extends TimerClient { constructor(private timeConnector: TimeConnector) { - super(); + super(() => timeConnector.getTime()); } public async startShortTimer(timerRequest: TimerRequest): Promise { - const time = - timerRequest.schedule.type === "Absolute" - ? new Date(timerRequest.schedule.untilTime) - : new Date( - timerRequest.schedule.baseTime.getTime() + - timerRequest.schedule.timerSeconds * 1000 - ); + const time = computeScheduleDate(timerRequest.schedule, this.baseTime()); const seconds = (time.getTime() - this.timeConnector.getTime().getTime()) / 1000; diff --git a/packages/@eventual/testing/src/clients/workflow-client.ts b/packages/@eventual/testing/src/clients/workflow-client.ts index e470f6cbc..73e2a04bb 100644 --- a/packages/@eventual/testing/src/clients/workflow-client.ts +++ b/packages/@eventual/testing/src/clients/workflow-client.ts @@ -1,5 +1,6 @@ import { ActivityRuntimeClient, + computeScheduleDate, createEvent, Execution, ExecutionStatus, @@ -72,10 +73,8 @@ export class TestWorkflowClient extends WorkflowClient { }, workflowName, input: request.input, - timeoutTime: request.timeoutSeconds - ? new Date( - baseTime.getTime() + request.timeoutSeconds * 1000 - ).toISOString() + timeoutTime: request.timeout + ? computeScheduleDate(request.timeout, baseTime).toISOString() : undefined, }, baseTime diff --git a/packages/@eventual/testing/src/clients/workflow-runtime-client.ts b/packages/@eventual/testing/src/clients/workflow-runtime-client.ts index 52cff5f3f..b2086c03f 100644 --- a/packages/@eventual/testing/src/clients/workflow-runtime-client.ts +++ b/packages/@eventual/testing/src/clients/workflow-runtime-client.ts @@ -7,12 +7,10 @@ import { FailedExecution, FailExecutionRequest, HistoryStateEvent, - ServiceType, UpdateHistoryRequest, WorkflowClient, WorkflowRuntimeClient, } from "@eventual/core"; -import { serviceTypeScope } from "../utils.js"; import { TimeConnector } from "../environment.js"; import { ExecutionStore } from "../execution-store.js"; @@ -77,13 +75,11 @@ export class TestWorkflowRuntimeClient extends WorkflowRuntimeClient { } public async startActivity(request: ActivityWorkerRequest): Promise { - return serviceTypeScope(ServiceType.ActivityWorker, () => - this.activityWorker( - request, - this.timeConnector.getTime(), - // end time is the start time plus one second - (start) => new Date(start.getTime() + 1000) - ) + return this.activityWorker( + request, + this.timeConnector.getTime(), + // end time is the start time plus one second + (start) => new Date(start.getTime() + 1000) ); } } diff --git a/packages/@eventual/testing/src/environment.ts b/packages/@eventual/testing/src/environment.ts index 49d779730..fed7b4b32 100644 --- a/packages/@eventual/testing/src/environment.ts +++ b/packages/@eventual/testing/src/environment.ts @@ -43,7 +43,6 @@ import { TestActivityRuntimeClient } from "./clients/activity-runtime-client.js" import { TestTimerClient } from "./clients/timer-client.js"; import { TimeController } from "./time-controller.js"; import { ExecutionStore } from "./execution-store.js"; -import { serviceTypeScope } from "./utils.js"; import { MockableActivityProvider, MockActivity, @@ -437,9 +436,7 @@ export class TestEnvironment extends RuntimeServiceClient { throw new Error("Unknown event types in the TimerController."); } - await serviceTypeScope(ServiceType.OrchestratorWorker, () => - this.orchestrator(events, this.time) - ); + await this.orchestrator(events, this.time); } } diff --git a/packages/@eventual/testing/src/utils.ts b/packages/@eventual/testing/src/utils.ts deleted file mode 100644 index e3fc636c8..000000000 --- a/packages/@eventual/testing/src/utils.ts +++ /dev/null @@ -1,15 +0,0 @@ -import { ServiceType, SERVICE_TYPE_FLAG } from "@eventual/core"; - -export async function serviceTypeScope( - serviceType: ServiceType, - handler: () => Output -): Promise> { - const back = process.env[SERVICE_TYPE_FLAG]; - try { - process.env[SERVICE_TYPE_FLAG] = serviceType; - // await before return so that the promise is completed before the finally call. - return await handler(); - } finally { - process.env[SERVICE_TYPE_FLAG] = back; - } -} diff --git a/packages/@eventual/testing/test/env.test.ts b/packages/@eventual/testing/test/env.test.ts index 5c7f52e1b..741a6860e 100644 --- a/packages/@eventual/testing/test/env.test.ts +++ b/packages/@eventual/testing/test/env.test.ts @@ -52,7 +52,7 @@ beforeAll(async () => { entry: path.resolve( url.fileURLToPath(new URL(".", import.meta.url)), "./workflow.ts" - ) + ), }); await env.initialize(); diff --git a/packages/@eventual/testing/test/workflow.ts b/packages/@eventual/testing/test/workflow.ts index d8e3d833b..47b50b820 100644 --- a/packages/@eventual/testing/test/workflow.ts +++ b/packages/@eventual/testing/test/workflow.ts @@ -1,11 +1,11 @@ import { activity, asyncResult, + duration, event, sendSignal, signal, - sleepFor, - sleepUntil, + time, workflow, } from "@eventual/core"; import { SendMessageCommand, SQSClient } from "@aws-sdk/client-sqs"; @@ -24,9 +24,9 @@ export const sleepWorkflow = workflow( "sleepWorkflow", async (relative: boolean) => { if (relative) { - await sleepFor(10); + await duration(10); } else { - await sleepUntil("2022-01-02T12:00:00Z"); + await time("2022-01-02T12:00:00Z"); } return "hello"; } @@ -140,7 +140,7 @@ export const orchestrateWorkflow = workflow( await throwWorkflow(undefined); } const execution = signalWorkflow(undefined); - await sleepFor(1); + await duration(1); await execution.sendSignal(dataSignal, "hello from a workflow"); await execution.sendSignal(dataDoneSignal); await execution.sendSignal(continueSignal); @@ -151,7 +151,7 @@ export const orchestrateWorkflow = workflow( export const actWithTimeout = activity( "actWithTimeout", - { timeoutSeconds: 30 }, + { timeout: duration(30, "seconds") }, async () => { return "hi"; } @@ -159,17 +159,17 @@ export const actWithTimeout = activity( export const workflow2WithTimeouts = workflow( "wf2", - { timeoutSeconds: 50 }, + { timeout: duration(50, "seconds") }, async () => actWithTimeout() ); export const workflowWithTimeouts = workflow( "wf1", - { timeoutSeconds: 100 }, + { timeout: duration(100, "seconds") }, async () => { return Promise.allSettled([ actWithTimeout(), workflow2WithTimeouts(undefined), - dataSignal.expectSignal({ timeoutSeconds: 30 }), + dataSignal.expectSignal({ timeout: duration(30, "seconds") }), ]); } ); @@ -195,7 +195,7 @@ export const longRunningWorkflow = workflow("longRunningWf", async () => { const result = Promise.race([ act, (async () => { - await sleepFor(60 * 60); + await duration(60 * 60); return "sleep"; })(), ]);