Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: replace sleep with time and duration #221

Merged
merged 17 commits into from
Jan 15, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions apps/test-app-runtime/src/slack-bot.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { Slack, SlackCredentials } from "@eventual/integrations-slack";
import { AWSSecret } from "@eventual/aws-client";
import {
duration,
expectSignal,
JsonSecret,
sendSignal,
sleepFor,
workflow,
} from "@eventual/core";
import ms from "ms";
Expand Down Expand Up @@ -47,7 +47,7 @@ const remindMe = workflow(
message: string;
waitSeconds: number;
}) => {
await sleepFor(request.waitSeconds);
await duration(request.waitSeconds);

await slack.client.chat.postMessage({
channel: request.channel,
Expand Down
42 changes: 24 additions & 18 deletions apps/tests/aws-runtime/test/test-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ import {
expectSignal,
asyncResult,
sendSignal,
sleepFor,
sleepUntil,
time,
workflow,
sendActivityHeartbeat,
HeartbeatTimeout,
EventualError,
signal,
duration,
} from "@eventual/core";
import { SendMessageCommand, SQSClient } from "@aws-sdk/client-sqs";
import { AsyncWriterTestEvent } from "./async-writer-handler.js";
Expand Down Expand Up @@ -62,8 +62,8 @@ export const workflow2 = workflow("my-parent-workflow", async () => {
});

export const workflow3 = workflow("sleepy", async () => {
await sleepFor(2);
await sleepUntil(new Date(new Date().getTime() + 1000 * 2));
await duration(2);
await time(new Date(new Date().getTime() + 1000 * 2));
return `done!`;
});

Expand All @@ -81,7 +81,7 @@ export const workflow4 = workflow("parallel", async () => {
return Promise.allSettled([greetings, greetings2, greetings3, any, race]);

async function sayHelloInSeconds(seconds: number) {
await sleepFor(seconds);
await duration(seconds);
return await hello("sam");
}
});
Expand All @@ -95,7 +95,7 @@ const doneSignal = signal("done");
export const parentWorkflow = workflow("parentWorkflow", async () => {
const child = childWorkflow({ name: "child" });
while (true) {
const n = await mySignal.expectSignal({ timeoutSeconds: 10 });
const n = await mySignal.expectSignal({ timeout: duration(10, "seconds") });

console.log(n);

Expand Down Expand Up @@ -142,7 +142,9 @@ export const childWorkflow = workflow(
while (!done) {
sendSignal(parentId, mySignal, last + 1);
block = true;
if (!(await condition({ timeoutSeconds: 10 }, () => !block))) {
if (
!(await condition({ timeout: duration(10, "seconds") }, () => !block))
) {
throw new Error("timed out!");
}
}
Expand All @@ -153,27 +155,31 @@ export const childWorkflow = workflow(

const slowActivity = activity(
"slowAct",
{ timeoutSeconds: 5 },
{ timeout: duration(5, "seconds") },
() => new Promise((resolve) => setTimeout(resolve, 10 * 1000))
);

const slowWf = workflow("slowWorkflow", { timeoutSeconds: 5 }, () =>
sleepFor(10)
const slowWf = workflow(
"slowWorkflow",
{ timeout: duration(5, "seconds") },
() => duration(10)
);

export const timedOutWorkflow = workflow(
"timedOut",
{ timeoutSeconds: 100 },
{ timeout: duration(100, "seconds") },
async () => {
// chains to be able to run in parallel.
const timedOutFunctions = {
condition: async () => {
if (!(await condition({ timeoutSeconds: 2 }, () => false))) {
if (
!(await condition({ timeout: duration(2, "seconds") }, () => false))
) {
throw new Error("Timed Out!");
}
},
signal: async () => {
await mySignal.expectSignal({ timeoutSeconds: 2 });
await mySignal.expectSignal({ timeout: duration(2, "seconds") });
},
activity: slowActivity,
workflow: () => slowWf(undefined),
Expand All @@ -196,7 +202,7 @@ export const timedOutWorkflow = workflow(

export const asyncWorkflow = workflow(
"asyncWorkflow",
{ timeoutSeconds: 100 }, // timeout eventually
{ timeout: duration(100, "seconds") }, // timeout eventually
async () => {
const result = await asyncActivity("complete");

Expand All @@ -211,7 +217,7 @@ export const asyncWorkflow = workflow(

const activityWithHeartbeat = activity(
"activityWithHeartbeat",
{ heartbeatSeconds: 2 },
{ heartbeatTimeout: duration(2, "seconds") },
async (n: number, type: "success" | "no-heartbeat" | "some-heartbeat") => {
const delay = (s: number) =>
new Promise((resolve) => {
Expand All @@ -234,7 +240,7 @@ const activityWithHeartbeat = activity(

export const heartbeatWorkflow = workflow(
"heartbeatWorkflow",
{ timeoutSeconds: 100 }, // timeout eventually
{ timeout: duration(100, "seconds") }, // timeout eventually
async (n: number) => {
return await Promise.allSettled([
activityWithHeartbeat(n, "success"),
Expand Down Expand Up @@ -291,13 +297,13 @@ export const eventDrivenWorkflow = workflow(

// wait for the event to come back around and wake this workflow
const { value } = await expectSignal("start", {
timeoutSeconds: 30,
timeout: duration(30, "seconds"),
});

await sendFinishEvent(ctx.execution.id);

await expectSignal("finish", {
timeoutSeconds: 30,
timeout: duration(30, "seconds"),
});

return value;
Expand Down
3 changes: 1 addition & 2 deletions packages/@eventual/aws-cdk/src/service-function.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ServiceType, SERVICE_TYPE_FLAG } from "@eventual/core";
import { ServiceType } from "@eventual/core";
import {
Architecture,
Code,
Expand Down Expand Up @@ -28,7 +28,6 @@ export class ServiceFunction extends Function {
environment: {
...props.environment,
NODE_OPTIONS: "--enable-source-maps",
[SERVICE_TYPE_FLAG]: props.serviceType,
},
});
}
Expand Down
11 changes: 7 additions & 4 deletions packages/@eventual/aws-runtime/src/clients/workflow-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import {
lookupWorkflow,
SortOrder,
isExecutionStatus,
computeDurationDate,
} from "@eventual/core";
import { ulid } from "ulidx";
import { inspect } from "util";
Expand Down Expand Up @@ -56,7 +57,7 @@ export class AWSWorkflowClient extends WorkflowClient {
executionName = ulid(),
workflow,
input,
timeoutSeconds,
timeout,
...request
}: StartExecutionRequest<W> | StartChildExecutionRequest<W>) {
if (typeof workflow === "string" && !lookupWorkflow(workflow)) {
Expand Down Expand Up @@ -107,9 +108,11 @@ export class AWSWorkflowClient extends WorkflowClient {
workflowName,
// generate the time for the workflow to timeout based on when it was started.
// the timer will be started by the orchestrator so the client does not need to have access to the timer client.
timeoutTime: timeoutSeconds
? new Date(
new Date().getTime() + timeoutSeconds * 1000
timeoutTime: timeout
? computeDurationDate(
new Date(),
timeout.dur,
timeout.unit
).toISOString()
: undefined,
context: {
Expand Down
39 changes: 29 additions & 10 deletions packages/@eventual/aws-runtime/src/handlers/api/executions/new.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
// startWorkflow uses the global workflows() to validate the workflow name.
import "@eventual/entry/injected";

import type { StartExecutionResponse } from "@eventual/core";
import {
DurationUnit,
DURATION_UNITS,
isDurationUnit,
StartExecutionResponse,
} from "@eventual/core";
import type {
APIGatewayProxyEventV2,
APIGatewayProxyHandlerV2,
Expand All @@ -20,21 +25,30 @@ const workflowClient = createWorkflowClient({
* * workflowName - name of the workflow to start
*
* Query Parameters:
* * timeoutSeconds - Number of seconds the workflow should run before it times out. Default: use the configured timeout or no timeout.
* * timeout - Number of `timeoutUnit` (default seconds) the workflow should run before it times out. Default: use the configured timeout or no timeout.
* * timeoutUnit - "seconds" | "minutes" | "hours" | "days" | "years". Units to use for the timeout, default: "seconds".
* * executionName - name to give the workflow. Default: auto generated UUID.
*/
export const handler: APIGatewayProxyHandlerV2<StartExecutionResponse> =
withErrorMiddleware(async (event: APIGatewayProxyEventV2) => {
const { timeoutSeconds: timeoutSecondsString, executionName } =
event.queryStringParameters ?? {};
const {
timeout: timeoutString,
timeoutUnit,
executionName,
} = event.queryStringParameters ?? {};

const timeout = timeoutString ? parseInt(timeoutString) : undefined;

const timeoutSeconds = timeoutSecondsString
? parseInt(timeoutSecondsString)
: undefined;
if (timeout !== undefined && isNaN(timeout)) {
throw new Error(
"Expected optional parameter timeout to be a valid number"
);
}

if (timeoutSeconds !== undefined && isNaN(timeoutSeconds)) {
if (timeoutUnit && !isDurationUnit(timeoutUnit)) {
throw new Error(
"Expected optional parameter timeoutSeconds to be a valid number"
"Expected optional parameter timeoutUnit to be one of: " +
DURATION_UNITS.join()
);
}

Expand All @@ -47,6 +61,11 @@ export const handler: APIGatewayProxyHandlerV2<StartExecutionResponse> =
workflow: workflowName,
input: event.body && JSON.parse(event.body),
executionName,
timeoutSeconds,
timeout: timeout
? {
dur: timeout,
unit: (timeoutUnit as DurationUnit) ?? "seconds",
}
: undefined,
});
});
15 changes: 14 additions & 1 deletion packages/@eventual/cli/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import {
isWorkflowSucceeded,
isWorkflowFailed,
ExecutionEventsResponse,
DURATION_UNITS,
DurationUnit,
} from "@eventual/core";
import { Argv } from "yargs";
import { serviceAction, setServiceOptions } from "../service-action.js";
Expand Down Expand Up @@ -49,6 +51,12 @@ export const start = (yargs: Argv) =>
type: "number",
defaultDescription:
"Configured on the workflow definition or no timeout.",
})
.option("timeoutUnit", {
describe: "Number of seconds until the execution times out.",
type: "string",
choices: DURATION_UNITS,
default: "seconds",
}),
(args) => {
return serviceAction(
Expand Down Expand Up @@ -94,7 +102,12 @@ export const start = (yargs: Argv) =>
workflow: args.workflow,
input: inputJSON,
executionName: args.name,
timeoutSeconds: args.timeout,
timeout: args.timeout
? {
dur: args.timeout,
unit: args.timeoutUnit as DurationUnit,
}
: undefined,
});
}
}
Expand Down
3 changes: 2 additions & 1 deletion packages/@eventual/client/src/http-service-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ export class HttpServiceClient implements EventualServiceClient {
: request.workflow.workflowName;

const queryString = formatQueryString({
timeoutSeconds: request.timeoutSeconds,
timeout: request.timeout?.dur,
timeoutUnit: request.timeout?.unit,
executionName: request.executionName,
});

Expand Down
2 changes: 1 addition & 1 deletion packages/@eventual/compiler/test-files/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,6 @@ export const workflow3 = workflow("timeoutFlow", async () => {
await callMe();

async function callMe() {
await sleepFor(20);
await duration(20, "seconds");
}
});
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ exports[`esbuild-plugin ts workflow 1`] = `
});
var workflow3 = workflow("timeoutFlow", function* () {
const callMe = $eventual(function* () {
yield sleepFor(20);
yield duration(20, "seconds");
});
yield callMe();
});
Expand Down
18 changes: 14 additions & 4 deletions packages/@eventual/core/src/activity.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import { DurationSpec } from "./await-time.js";
import { createActivityCall } from "./calls/activity-call.js";
import { createAwaitDurationCall } from "./calls/await-time-call.js";
import {
callableActivities,
getActivityContext,
getServiceClient,
} from "./global.js";
import { computeDurationSeconds } from "./index.js";
import { isActivityWorker, isOrchestratorWorker } from "./runtime/flags.js";
import {
EventualServiceClient,
Expand All @@ -19,7 +22,7 @@ export interface ActivityOptions {
*
* @default - workflow will run forever.
*/
timeoutSeconds?: number;
timeout?: DurationSpec;
/**
* For long running activities, it is suggested that they report back that they
* are still in progress to avoid waiting forever or until a long timeout when
Expand All @@ -30,7 +33,7 @@ export interface ActivityOptions {
*
* If it fails to do so, the workflow will cancel the activity and throw an error.
*/
heartbeatSeconds?: number;
heartbeatTimeout?: DurationSpec;
}

export interface ActivityFunction<Arguments extends any[], Output = any> {
Expand Down Expand Up @@ -225,8 +228,15 @@ export function activity<Arguments extends any[], Output = any>(
return createActivityCall(
activityID,
args,
opts?.timeoutSeconds,
opts?.heartbeatSeconds
opts?.timeout
? createAwaitDurationCall(opts.timeout.dur, opts.timeout.unit)
: undefined,
opts?.heartbeatTimeout
? computeDurationSeconds(
opts.heartbeatTimeout.dur,
opts.heartbeatTimeout.unit
) / 1000
: undefined
) as any;
} else {
// calling the activity from outside the orchestrator just calls the handler
Expand Down
Loading