Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: replace sleep with time and duration #221

Merged
merged 17 commits into from
Jan 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions apps/test-app-runtime/src/slack-bot.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { Slack, SlackCredentials } from "@eventual/integrations-slack";
import { AWSSecret } from "@eventual/aws-client";
import {
duration,
expectSignal,
JsonSecret,
sendSignal,
sleepFor,
workflow,
} from "@eventual/core";
import ms from "ms";
Expand Down Expand Up @@ -47,7 +47,7 @@ const remindMe = workflow(
message: string;
waitSeconds: number;
}) => {
await sleepFor(request.waitSeconds);
await duration(request.waitSeconds);

await slack.client.chat.postMessage({
channel: request.channel,
Expand Down
42 changes: 24 additions & 18 deletions apps/tests/aws-runtime/test/test-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ import {
expectSignal,
asyncResult,
sendSignal,
sleepFor,
sleepUntil,
time,
workflow,
sendActivityHeartbeat,
HeartbeatTimeout,
EventualError,
signal,
duration,
} from "@eventual/core";
import { SendMessageCommand, SQSClient } from "@aws-sdk/client-sqs";
import { AsyncWriterTestEvent } from "./async-writer-handler.js";
Expand Down Expand Up @@ -62,8 +62,8 @@ export const workflow2 = workflow("my-parent-workflow", async () => {
});

export const workflow3 = workflow("sleepy", async () => {
await sleepFor(2);
await sleepUntil(new Date(new Date().getTime() + 1000 * 2));
await duration(2);
await time(new Date(new Date().getTime() + 1000 * 2));
return `done!`;
});

Expand All @@ -81,7 +81,7 @@ export const workflow4 = workflow("parallel", async () => {
return Promise.allSettled([greetings, greetings2, greetings3, any, race]);

async function sayHelloInSeconds(seconds: number) {
await sleepFor(seconds);
await duration(seconds);
return await hello("sam");
}
});
Expand All @@ -95,7 +95,7 @@ const doneSignal = signal("done");
export const parentWorkflow = workflow("parentWorkflow", async () => {
const child = childWorkflow({ name: "child" });
while (true) {
const n = await mySignal.expectSignal({ timeoutSeconds: 10 });
const n = await mySignal.expectSignal({ timeout: duration(10, "seconds") });

console.log(n);

Expand Down Expand Up @@ -142,7 +142,9 @@ export const childWorkflow = workflow(
while (!done) {
sendSignal(parentId, mySignal, last + 1);
block = true;
if (!(await condition({ timeoutSeconds: 10 }, () => !block))) {
if (
!(await condition({ timeout: duration(10, "seconds") }, () => !block))
) {
throw new Error("timed out!");
}
}
Expand All @@ -153,27 +155,31 @@ export const childWorkflow = workflow(

const slowActivity = activity(
"slowAct",
{ timeoutSeconds: 5 },
{ timeout: duration(5, "seconds") },
() => new Promise((resolve) => setTimeout(resolve, 10 * 1000))
);

const slowWf = workflow("slowWorkflow", { timeoutSeconds: 5 }, () =>
sleepFor(10)
const slowWf = workflow(
"slowWorkflow",
{ timeout: duration(5, "seconds") },
() => duration(10)
);

export const timedOutWorkflow = workflow(
"timedOut",
{ timeoutSeconds: 100 },
{ timeout: duration(100, "seconds") },
async () => {
// chains to be able to run in parallel.
const timedOutFunctions = {
condition: async () => {
if (!(await condition({ timeoutSeconds: 2 }, () => false))) {
if (
!(await condition({ timeout: duration(2, "seconds") }, () => false))
) {
throw new Error("Timed Out!");
}
},
signal: async () => {
await mySignal.expectSignal({ timeoutSeconds: 2 });
await mySignal.expectSignal({ timeout: duration(2, "seconds") });
},
activity: slowActivity,
workflow: () => slowWf(undefined),
Expand All @@ -196,7 +202,7 @@ export const timedOutWorkflow = workflow(

export const asyncWorkflow = workflow(
"asyncWorkflow",
{ timeoutSeconds: 100 }, // timeout eventually
{ timeout: duration(100, "seconds") }, // timeout eventually
async () => {
const result = await asyncActivity("complete");

Expand All @@ -211,7 +217,7 @@ export const asyncWorkflow = workflow(

const activityWithHeartbeat = activity(
"activityWithHeartbeat",
{ heartbeatSeconds: 2 },
{ heartbeatTimeout: duration(2, "seconds") },
async (n: number, type: "success" | "no-heartbeat" | "some-heartbeat") => {
const delay = (s: number) =>
new Promise((resolve) => {
Expand All @@ -234,7 +240,7 @@ const activityWithHeartbeat = activity(

export const heartbeatWorkflow = workflow(
"heartbeatWorkflow",
{ timeoutSeconds: 100 }, // timeout eventually
{ timeout: duration(100, "seconds") }, // timeout eventually
async (n: number) => {
return await Promise.allSettled([
activityWithHeartbeat(n, "success"),
Expand Down Expand Up @@ -291,13 +297,13 @@ export const eventDrivenWorkflow = workflow(

// wait for the event to come back around and wake this workflow
const { value } = await expectSignal("start", {
timeoutSeconds: 30,
timeout: duration(30, "seconds"),
});

await sendFinishEvent(ctx.execution.id);

await expectSignal("finish", {
timeoutSeconds: 30,
timeout: duration(30, "seconds"),
});

return value;
Expand Down
4 changes: 2 additions & 2 deletions packages/@eventual/aws-cdk/src/scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ export interface SchedulerProps {
}

/**
* Subsystem that orchestrates long running timers. Used to orchestrate timeouts, sleep
* Subsystem that orchestrates long running timers. Used to orchestrate timeouts, timers
* and heartbeats.
*/
export class Scheduler extends Construct implements IScheduler, IGrantable {
Expand All @@ -48,7 +48,7 @@ export class Scheduler extends Construct implements IScheduler, IGrantable {
*/
public readonly schedulerRole: IRole;
/**
* Timer (standard) queue which helps orchestrate scheduled things like sleep and dynamic retries.
* Timer (standard) queue which helps orchestrate scheduled things like timers, heartbeat, and dynamic retries.
*
* Worths in tandem with the {@link CfnSchedulerGroup} to create millisecond latency, long running timers.
*/
Expand Down
3 changes: 1 addition & 2 deletions packages/@eventual/aws-cdk/src/service-function.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ServiceType, SERVICE_TYPE_FLAG } from "@eventual/core";
import { ServiceType } from "@eventual/core";
import {
Architecture,
Code,
Expand Down Expand Up @@ -28,7 +28,6 @@ export class ServiceFunction extends Function {
environment: {
...props.environment,
NODE_OPTIONS: "--enable-source-maps",
[SERVICE_TYPE_FLAG]: props.serviceType,
},
});
}
Expand Down
4 changes: 2 additions & 2 deletions packages/@eventual/aws-cdk/src/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,11 @@ export class Service extends Construct implements IGrantable {
*/
public readonly workflows: Workflows;
/**
* The subsystem for schedules and sleep timers.
* The subsystem for schedules and timers.
*/
public readonly scheduler: Scheduler;
/**
* The Resources for schedules and sleep timers.
* The Resources for schedules and timers.
*/
public readonly cliRole: Role;
/**
Expand Down
2 changes: 1 addition & 1 deletion packages/@eventual/aws-cdk/src/workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ export class Workflows extends Construct implements IWorkflows, IGrantable {
this.configureRecordHistory(this.orchestrator);
// allows the orchestrator to directly invoke the activity worker lambda function (async)
this.props.activities.configureScheduleActivity(this.orchestrator);
// allows allows the orchestrator to start timeout and sleep timers
// allows allows the orchestrator to start timeout and timers
this.props.scheduler.configureScheduleTimer(this.orchestrator);
// allows the orchestrator to send events to the workflow queue,
// write events to the execution table, and start other workflows
Expand Down
2 changes: 1 addition & 1 deletion packages/@eventual/aws-runtime/src/clients/create.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ export const createTimerClient = /* @__PURE__ */ memoize(
schedulerRoleArn: props.schedulerRoleArn ?? env.schedulerRoleArn(),
schedulerDlqArn: props.schedulerDlqArn ?? env.schedulerDlqArn(),
schedulerGroup: props.schedulerGroup ?? env.schedulerGroup(),
sleepQueueThresholdSeconds: props.sleepQueueThresholdSeconds ?? 15 * 60,
timerQueueThresholdSeconds: props.timerQueueThresholdSeconds ?? 15 * 60,
sqs: props.sqs ?? sqs(),
timerQueueUrl: props.timerQueueUrl ?? env.timerQueueUrl(),
scheduleForwarderArn:
Expand Down
55 changes: 33 additions & 22 deletions packages/@eventual/aws-runtime/src/clients/timer-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ import {
ScheduleForwarderRequest,
TimerRequest,
isActivityHeartbeatMonitorRequest,
computeUntilTime,
computeScheduleDate,
Schedule,
isTimeSchedule,
computeDurationSeconds,
} from "@eventual/core";
import { ulid } from "ulidx";

Expand All @@ -25,18 +28,18 @@ export interface AWSTimerClientProps {
readonly schedulerDlqArn: string;
readonly schedulerGroup: string;
/**
* If a sleep has a longer duration (in seconds) than this threshold,
* If a timer has a longer duration (in seconds) than this threshold,
* create an Event Bus Scheduler before sending it to the TimerQueue
*/
readonly sleepQueueThresholdSeconds: number;
readonly timerQueueThresholdSeconds: number;
readonly timerQueueUrl: string;
readonly sqs: SQSClient;
readonly scheduleForwarderArn: string;
}

export class AWSTimerClient extends TimerClient {
constructor(private props: AWSTimerClientProps) {
super();
super(() => new Date());
}

/**
Expand All @@ -52,7 +55,10 @@ export class AWSTimerClient extends TimerClient {
* the {@link TimerRequest} provided.
*/
public async startShortTimer(timerRequest: TimerRequest) {
const delaySeconds = computeTimerSeconds(timerRequest.schedule);
const delaySeconds = computeTimerSeconds(
timerRequest.schedule,
this.baseTime()
);

if (delaySeconds > 15 * 60) {
throw new Error(
Expand All @@ -74,8 +80,8 @@ export class AWSTimerClient extends TimerClient {
/**
* Starts a timer of any (positive) length.
*
* If the timer is longer than 15 minutes (configurable via `props.sleepQueueThresholdMillis`),
* the timer will create a EventBridge schedule until the untilTime - props.sleepQueueThresholdMillis
* If the timer is longer than 15 minutes (configurable via `props.timerQueueThresholdMillis`),
* the timer will create a EventBridge schedule until the untilTime - props.timerQueueThresholdMillis
* when the timer will be moved to the SQS queue.
*
* The SQS Queue will delay for floor(untilTime - currentTime) seconds until the timer handler can pick up the message.
Expand All @@ -84,22 +90,27 @@ export class AWSTimerClient extends TimerClient {
* the {@link TimerRequest} provided.
*/
public async startTimer(timerRequest: TimerRequest) {
const untilTimeIso = computeUntilTime(timerRequest.schedule);
const untilTime = new Date(untilTimeIso);
const sleepDuration = computeTimerSeconds(timerRequest.schedule);
const untilTime = computeScheduleDate(
timerRequest.schedule,
this.baseTime()
);
const timerDuration = computeTimerSeconds(
timerRequest.schedule,
this.baseTime()
);

/**
* If the sleep is longer than 15 minutes, create an EventBridge schedule first.
* If the timer is longer than 15 minutes, create an EventBridge schedule first.
* The Schedule will trigger a lambda which will re-compute the delay time and
* create a message in the timerQueue.
*
* The timerQueue ultimately will pick up the event and forward the {@link SleepComplete} to the workflow queue.
* The timerQueue ultimately will pick up the event and forward the {@link TimerComplete} to the workflow queue.
*/
if (sleepDuration > this.props.sleepQueueThresholdSeconds) {
// wait for utilTime - sleepQueueThresholdMillis and then forward the event to
if (timerDuration > this.props.timerQueueThresholdSeconds) {
// wait for utilTime - timerQueueThresholdMillis and then forward the event to
// the timerQueue
const scheduleTime =
untilTime.getTime() - this.props.sleepQueueThresholdSeconds;
untilTime.getTime() - this.props.timerQueueThresholdSeconds;
// EventBridge Scheduler only supports HH:MM:SS, strip off the milliseconds and `Z`.
const formattedSchedulerTime = new Date(scheduleTime)
.toISOString()
Expand All @@ -112,7 +123,7 @@ export class AWSTimerClient extends TimerClient {
scheduleName,
timerRequest,
forwardTime: "<aws.scheduler.scheduled-time>",
untilTime: untilTimeIso,
untilTime: untilTime.toISOString(),
};

try {
Expand Down Expand Up @@ -145,7 +156,7 @@ export class AWSTimerClient extends TimerClient {
}
} else {
/**
* When the sleep is less than 15 minutes, send the timer directly to the
* When the timer is less than 15 minutes, send the timer directly to the
* timer queue. The timer queue will pass the event on to the workflow queue
* once delaySeconds have passed.
*/
Expand All @@ -159,7 +170,7 @@ export class AWSTimerClient extends TimerClient {
* Use this method to clean the schedule.
*
* The provided schedule-forwarder function will call this method in Eventual when
* the timer is transferred from EventBridge to SQS at `props.sleepQueueThresholdMillis`.
* the timer is transferred from EventBridge to SQS at `props.timerQueueThresholdMillis`.
*/
public async clearSchedule(scheduleName: string) {
try {
Expand Down Expand Up @@ -203,16 +214,16 @@ function safeScheduleName(name: string) {
return name.replaceAll(/[^0-9a-zA-Z-_.]/g, "");
}

export function computeTimerSeconds(schedule: TimerRequest["schedule"]) {
return "untilTime" in schedule
function computeTimerSeconds(schedule: Schedule, baseTime: Date) {
return isTimeSchedule(schedule)
? Math.max(
// Compute the number of seconds (floored)
// subtract 1 because the maxBatchWindow is set to 1s on the lambda event source.
// this allows for more events to be sent at once while not adding extra latency
Math.ceil(
(new Date(schedule.untilTime).getTime() - new Date().getTime()) / 1000
(new Date(schedule.isoDate).getTime() - baseTime.getTime()) / 1000
),
0
)
: schedule.timerSeconds;
: computeDurationSeconds(schedule.dur, schedule.unit);
}
9 changes: 4 additions & 5 deletions packages/@eventual/aws-runtime/src/clients/workflow-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import {
lookupWorkflow,
SortOrder,
isExecutionStatus,
computeScheduleDate,
} from "@eventual/core";
import { ulid } from "ulidx";
import { inspect } from "util";
Expand Down Expand Up @@ -56,7 +57,7 @@ export class AWSWorkflowClient extends WorkflowClient {
executionName = ulid(),
workflow,
input,
timeoutSeconds,
timeout,
...request
}: StartExecutionRequest<W> | StartChildExecutionRequest<W>) {
if (typeof workflow === "string" && !lookupWorkflow(workflow)) {
Expand Down Expand Up @@ -107,10 +108,8 @@ export class AWSWorkflowClient extends WorkflowClient {
workflowName,
// generate the time for the workflow to timeout based on when it was started.
// the timer will be started by the orchestrator so the client does not need to have access to the timer client.
timeoutTime: timeoutSeconds
? new Date(
new Date().getTime() + timeoutSeconds * 1000
).toISOString()
timeoutTime: timeout
? computeScheduleDate(timeout, this.baseTime()).toISOString()
: undefined,
context: {
name: executionName,
Expand Down
Loading