Skip to content

Commit

Permalink
feat: replace sleep with time and duration (#221)
Browse files Browse the repository at this point in the history
  • Loading branch information
thantos authored Jan 15, 2023
1 parent 71efeab commit 27fc1fa
Show file tree
Hide file tree
Showing 56 changed files with 1,239 additions and 1,367 deletions.
4 changes: 2 additions & 2 deletions apps/test-app-runtime/src/slack-bot.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { Slack, SlackCredentials } from "@eventual/integrations-slack";
import { AWSSecret } from "@eventual/aws-client";
import {
duration,
expectSignal,
JsonSecret,
sendSignal,
sleepFor,
workflow,
} from "@eventual/core";
import ms from "ms";
Expand Down Expand Up @@ -47,7 +47,7 @@ const remindMe = workflow(
message: string;
waitSeconds: number;
}) => {
await sleepFor(request.waitSeconds);
await duration(request.waitSeconds);

await slack.client.chat.postMessage({
channel: request.channel,
Expand Down
42 changes: 24 additions & 18 deletions apps/tests/aws-runtime/test/test-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ import {
expectSignal,
asyncResult,
sendSignal,
sleepFor,
sleepUntil,
time,
workflow,
sendActivityHeartbeat,
HeartbeatTimeout,
EventualError,
signal,
duration,
} from "@eventual/core";
import { SendMessageCommand, SQSClient } from "@aws-sdk/client-sqs";
import { AsyncWriterTestEvent } from "./async-writer-handler.js";
Expand Down Expand Up @@ -62,8 +62,8 @@ export const workflow2 = workflow("my-parent-workflow", async () => {
});

export const workflow3 = workflow("sleepy", async () => {
await sleepFor(2);
await sleepUntil(new Date(new Date().getTime() + 1000 * 2));
await duration(2);
await time(new Date(new Date().getTime() + 1000 * 2));
return `done!`;
});

Expand All @@ -81,7 +81,7 @@ export const workflow4 = workflow("parallel", async () => {
return Promise.allSettled([greetings, greetings2, greetings3, any, race]);

async function sayHelloInSeconds(seconds: number) {
await sleepFor(seconds);
await duration(seconds);
return await hello("sam");
}
});
Expand All @@ -95,7 +95,7 @@ const doneSignal = signal("done");
export const parentWorkflow = workflow("parentWorkflow", async () => {
const child = childWorkflow({ name: "child" });
while (true) {
const n = await mySignal.expectSignal({ timeoutSeconds: 10 });
const n = await mySignal.expectSignal({ timeout: duration(10, "seconds") });

console.log(n);

Expand Down Expand Up @@ -142,7 +142,9 @@ export const childWorkflow = workflow(
while (!done) {
sendSignal(parentId, mySignal, last + 1);
block = true;
if (!(await condition({ timeoutSeconds: 10 }, () => !block))) {
if (
!(await condition({ timeout: duration(10, "seconds") }, () => !block))
) {
throw new Error("timed out!");
}
}
Expand All @@ -153,27 +155,31 @@ export const childWorkflow = workflow(

const slowActivity = activity(
"slowAct",
{ timeoutSeconds: 5 },
{ timeout: duration(5, "seconds") },
() => new Promise((resolve) => setTimeout(resolve, 10 * 1000))
);

const slowWf = workflow("slowWorkflow", { timeoutSeconds: 5 }, () =>
sleepFor(10)
const slowWf = workflow(
"slowWorkflow",
{ timeout: duration(5, "seconds") },
() => duration(10)
);

export const timedOutWorkflow = workflow(
"timedOut",
{ timeoutSeconds: 100 },
{ timeout: duration(100, "seconds") },
async () => {
// chains to be able to run in parallel.
const timedOutFunctions = {
condition: async () => {
if (!(await condition({ timeoutSeconds: 2 }, () => false))) {
if (
!(await condition({ timeout: duration(2, "seconds") }, () => false))
) {
throw new Error("Timed Out!");
}
},
signal: async () => {
await mySignal.expectSignal({ timeoutSeconds: 2 });
await mySignal.expectSignal({ timeout: duration(2, "seconds") });
},
activity: slowActivity,
workflow: () => slowWf(undefined),
Expand All @@ -196,7 +202,7 @@ export const timedOutWorkflow = workflow(

export const asyncWorkflow = workflow(
"asyncWorkflow",
{ timeoutSeconds: 100 }, // timeout eventually
{ timeout: duration(100, "seconds") }, // timeout eventually
async () => {
const result = await asyncActivity("complete");

Expand All @@ -211,7 +217,7 @@ export const asyncWorkflow = workflow(

const activityWithHeartbeat = activity(
"activityWithHeartbeat",
{ heartbeatSeconds: 2 },
{ heartbeatTimeout: duration(2, "seconds") },
async (n: number, type: "success" | "no-heartbeat" | "some-heartbeat") => {
const delay = (s: number) =>
new Promise((resolve) => {
Expand All @@ -234,7 +240,7 @@ const activityWithHeartbeat = activity(

export const heartbeatWorkflow = workflow(
"heartbeatWorkflow",
{ timeoutSeconds: 100 }, // timeout eventually
{ timeout: duration(100, "seconds") }, // timeout eventually
async (n: number) => {
return await Promise.allSettled([
activityWithHeartbeat(n, "success"),
Expand Down Expand Up @@ -291,13 +297,13 @@ export const eventDrivenWorkflow = workflow(

// wait for the event to come back around and wake this workflow
const { value } = await expectSignal("start", {
timeoutSeconds: 30,
timeout: duration(30, "seconds"),
});

await sendFinishEvent(ctx.execution.id);

await expectSignal("finish", {
timeoutSeconds: 30,
timeout: duration(30, "seconds"),
});

return value;
Expand Down
4 changes: 2 additions & 2 deletions packages/@eventual/aws-cdk/src/scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ export interface SchedulerProps {
}

/**
* Subsystem that orchestrates long running timers. Used to orchestrate timeouts, sleep
* Subsystem that orchestrates long running timers. Used to orchestrate timeouts, timers
* and heartbeats.
*/
export class Scheduler extends Construct implements IScheduler, IGrantable {
Expand All @@ -48,7 +48,7 @@ export class Scheduler extends Construct implements IScheduler, IGrantable {
*/
public readonly schedulerRole: IRole;
/**
* Timer (standard) queue which helps orchestrate scheduled things like sleep and dynamic retries.
* Timer (standard) queue which helps orchestrate scheduled things like timers, heartbeat, and dynamic retries.
*
* Worths in tandem with the {@link CfnSchedulerGroup} to create millisecond latency, long running timers.
*/
Expand Down
3 changes: 1 addition & 2 deletions packages/@eventual/aws-cdk/src/service-function.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ServiceType, SERVICE_TYPE_FLAG } from "@eventual/core";
import { ServiceType } from "@eventual/core";
import {
Architecture,
Code,
Expand Down Expand Up @@ -28,7 +28,6 @@ export class ServiceFunction extends Function {
environment: {
...props.environment,
NODE_OPTIONS: "--enable-source-maps",
[SERVICE_TYPE_FLAG]: props.serviceType,
},
});
}
Expand Down
4 changes: 2 additions & 2 deletions packages/@eventual/aws-cdk/src/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,11 @@ export class Service extends Construct implements IGrantable {
*/
public readonly workflows: Workflows;
/**
* The subsystem for schedules and sleep timers.
* The subsystem for schedules and timers.
*/
public readonly scheduler: Scheduler;
/**
* The Resources for schedules and sleep timers.
* The Resources for schedules and timers.
*/
public readonly cliRole: Role;
/**
Expand Down
2 changes: 1 addition & 1 deletion packages/@eventual/aws-cdk/src/workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ export class Workflows extends Construct implements IWorkflows, IGrantable {
this.configureRecordHistory(this.orchestrator);
// allows the orchestrator to directly invoke the activity worker lambda function (async)
this.props.activities.configureScheduleActivity(this.orchestrator);
// allows allows the orchestrator to start timeout and sleep timers
// allows allows the orchestrator to start timeout and timers
this.props.scheduler.configureScheduleTimer(this.orchestrator);
// allows the orchestrator to send events to the workflow queue,
// write events to the execution table, and start other workflows
Expand Down
2 changes: 1 addition & 1 deletion packages/@eventual/aws-runtime/src/clients/create.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ export const createTimerClient = /* @__PURE__ */ memoize(
schedulerRoleArn: props.schedulerRoleArn ?? env.schedulerRoleArn(),
schedulerDlqArn: props.schedulerDlqArn ?? env.schedulerDlqArn(),
schedulerGroup: props.schedulerGroup ?? env.schedulerGroup(),
sleepQueueThresholdSeconds: props.sleepQueueThresholdSeconds ?? 15 * 60,
timerQueueThresholdSeconds: props.timerQueueThresholdSeconds ?? 15 * 60,
sqs: props.sqs ?? sqs(),
timerQueueUrl: props.timerQueueUrl ?? env.timerQueueUrl(),
scheduleForwarderArn:
Expand Down
55 changes: 33 additions & 22 deletions packages/@eventual/aws-runtime/src/clients/timer-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ import {
ScheduleForwarderRequest,
TimerRequest,
isActivityHeartbeatMonitorRequest,
computeUntilTime,
computeScheduleDate,
Schedule,
isTimeSchedule,
computeDurationSeconds,
} from "@eventual/core";
import { ulid } from "ulidx";

Expand All @@ -25,18 +28,18 @@ export interface AWSTimerClientProps {
readonly schedulerDlqArn: string;
readonly schedulerGroup: string;
/**
* If a sleep has a longer duration (in seconds) than this threshold,
* If a timer has a longer duration (in seconds) than this threshold,
* create an Event Bus Scheduler before sending it to the TimerQueue
*/
readonly sleepQueueThresholdSeconds: number;
readonly timerQueueThresholdSeconds: number;
readonly timerQueueUrl: string;
readonly sqs: SQSClient;
readonly scheduleForwarderArn: string;
}

export class AWSTimerClient extends TimerClient {
constructor(private props: AWSTimerClientProps) {
super();
super(() => new Date());
}

/**
Expand All @@ -52,7 +55,10 @@ export class AWSTimerClient extends TimerClient {
* the {@link TimerRequest} provided.
*/
public async startShortTimer(timerRequest: TimerRequest) {
const delaySeconds = computeTimerSeconds(timerRequest.schedule);
const delaySeconds = computeTimerSeconds(
timerRequest.schedule,
this.baseTime()
);

if (delaySeconds > 15 * 60) {
throw new Error(
Expand All @@ -74,8 +80,8 @@ export class AWSTimerClient extends TimerClient {
/**
* Starts a timer of any (positive) length.
*
* If the timer is longer than 15 minutes (configurable via `props.sleepQueueThresholdMillis`),
* the timer will create a EventBridge schedule until the untilTime - props.sleepQueueThresholdMillis
* If the timer is longer than 15 minutes (configurable via `props.timerQueueThresholdMillis`),
* the timer will create a EventBridge schedule until the untilTime - props.timerQueueThresholdMillis
* when the timer will be moved to the SQS queue.
*
* The SQS Queue will delay for floor(untilTime - currentTime) seconds until the timer handler can pick up the message.
Expand All @@ -84,22 +90,27 @@ export class AWSTimerClient extends TimerClient {
* the {@link TimerRequest} provided.
*/
public async startTimer(timerRequest: TimerRequest) {
const untilTimeIso = computeUntilTime(timerRequest.schedule);
const untilTime = new Date(untilTimeIso);
const sleepDuration = computeTimerSeconds(timerRequest.schedule);
const untilTime = computeScheduleDate(
timerRequest.schedule,
this.baseTime()
);
const timerDuration = computeTimerSeconds(
timerRequest.schedule,
this.baseTime()
);

/**
* If the sleep is longer than 15 minutes, create an EventBridge schedule first.
* If the timer is longer than 15 minutes, create an EventBridge schedule first.
* The Schedule will trigger a lambda which will re-compute the delay time and
* create a message in the timerQueue.
*
* The timerQueue ultimately will pick up the event and forward the {@link SleepComplete} to the workflow queue.
* The timerQueue ultimately will pick up the event and forward the {@link TimerComplete} to the workflow queue.
*/
if (sleepDuration > this.props.sleepQueueThresholdSeconds) {
// wait for utilTime - sleepQueueThresholdMillis and then forward the event to
if (timerDuration > this.props.timerQueueThresholdSeconds) {
// wait for utilTime - timerQueueThresholdMillis and then forward the event to
// the timerQueue
const scheduleTime =
untilTime.getTime() - this.props.sleepQueueThresholdSeconds;
untilTime.getTime() - this.props.timerQueueThresholdSeconds;
// EventBridge Scheduler only supports HH:MM:SS, strip off the milliseconds and `Z`.
const formattedSchedulerTime = new Date(scheduleTime)
.toISOString()
Expand All @@ -112,7 +123,7 @@ export class AWSTimerClient extends TimerClient {
scheduleName,
timerRequest,
forwardTime: "<aws.scheduler.scheduled-time>",
untilTime: untilTimeIso,
untilTime: untilTime.toISOString(),
};

try {
Expand Down Expand Up @@ -145,7 +156,7 @@ export class AWSTimerClient extends TimerClient {
}
} else {
/**
* When the sleep is less than 15 minutes, send the timer directly to the
* When the timer is less than 15 minutes, send the timer directly to the
* timer queue. The timer queue will pass the event on to the workflow queue
* once delaySeconds have passed.
*/
Expand All @@ -159,7 +170,7 @@ export class AWSTimerClient extends TimerClient {
* Use this method to clean the schedule.
*
* The provided schedule-forwarder function will call this method in Eventual when
* the timer is transferred from EventBridge to SQS at `props.sleepQueueThresholdMillis`.
* the timer is transferred from EventBridge to SQS at `props.timerQueueThresholdMillis`.
*/
public async clearSchedule(scheduleName: string) {
try {
Expand Down Expand Up @@ -203,16 +214,16 @@ function safeScheduleName(name: string) {
return name.replaceAll(/[^0-9a-zA-Z-_.]/g, "");
}

export function computeTimerSeconds(schedule: TimerRequest["schedule"]) {
return "untilTime" in schedule
function computeTimerSeconds(schedule: Schedule, baseTime: Date) {
return isTimeSchedule(schedule)
? Math.max(
// Compute the number of seconds (floored)
// subtract 1 because the maxBatchWindow is set to 1s on the lambda event source.
// this allows for more events to be sent at once while not adding extra latency
Math.ceil(
(new Date(schedule.untilTime).getTime() - new Date().getTime()) / 1000
(new Date(schedule.isoDate).getTime() - baseTime.getTime()) / 1000
),
0
)
: schedule.timerSeconds;
: computeDurationSeconds(schedule.dur, schedule.unit);
}
9 changes: 4 additions & 5 deletions packages/@eventual/aws-runtime/src/clients/workflow-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import {
lookupWorkflow,
SortOrder,
isExecutionStatus,
computeScheduleDate,
} from "@eventual/core";
import { ulid } from "ulidx";
import { inspect } from "util";
Expand Down Expand Up @@ -56,7 +57,7 @@ export class AWSWorkflowClient extends WorkflowClient {
executionName = ulid(),
workflow,
input,
timeoutSeconds,
timeout,
...request
}: StartExecutionRequest<W> | StartChildExecutionRequest<W>) {
if (typeof workflow === "string" && !lookupWorkflow(workflow)) {
Expand Down Expand Up @@ -107,10 +108,8 @@ export class AWSWorkflowClient extends WorkflowClient {
workflowName,
// generate the time for the workflow to timeout based on when it was started.
// the timer will be started by the orchestrator so the client does not need to have access to the timer client.
timeoutTime: timeoutSeconds
? new Date(
new Date().getTime() + timeoutSeconds * 1000
).toISOString()
timeoutTime: timeout
? computeScheduleDate(timeout, this.baseTime()).toISOString()
: undefined,
context: {
name: executionName,
Expand Down
Loading

0 comments on commit 27fc1fa

Please sign in to comment.