-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: activity override and cancellation #101
base: main
Are you sure you want to change the base?
Changes from all commits
8d60e3b
0871669
073d789
bffc121
b7ce508
046ff4f
d879d04
555df09
398c653
d8a5175
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,6 +11,8 @@ import { | |
workflow, | ||
heartbeat, | ||
HeartbeatTimeout, | ||
completeActivity, | ||
failActivity, | ||
} from "@eventual/core"; | ||
import { SendMessageCommand, SQSClient } from "@aws-sdk/client-sqs"; | ||
import { AsyncWriterTestEvent } from "./async-writer-handler.js"; | ||
|
@@ -148,11 +150,13 @@ export const childWorkflow = workflow( | |
} | ||
); | ||
|
||
const slowActivity = activity( | ||
"slowAct", | ||
{ timeoutSeconds: 5 }, | ||
() => new Promise((resolve) => setTimeout(resolve, 10 * 1000)) | ||
); | ||
const delay = (seconds: number) => | ||
new Promise((resolve) => setTimeout(resolve, seconds * 1000)); | ||
|
||
const slowActivity = activity("slowAct", { timeoutSeconds: 5 }, async () => { | ||
await delay(10); | ||
return "done finally"; | ||
}); | ||
|
||
const slowWf = workflow("slowWorkflow", { timeoutSeconds: 5 }, () => | ||
sleepFor(10) | ||
|
@@ -332,3 +336,144 @@ const sendFinishEvent = activity("sendFinish", async (executionId: string) => { | |
proxy: true, | ||
}); | ||
}); | ||
|
||
/** | ||
* An event which is raised by the activityOverrideActivity. | ||
* Can provide a value to the activity via activity token or defer the responsibility to the workflow via signal. | ||
*/ | ||
const activityOverrideEvent = event<{ | ||
executionId: string; | ||
token: string; | ||
location: "handler" | "signal"; | ||
type: "complete" | "fail"; | ||
}>("activityOverrideEvent"); | ||
|
||
/** | ||
* An async activity which will be cancelled using it's activity token. | ||
*/ | ||
const activityOverrideActivity = activity( | ||
"eventPublish", | ||
async ({ | ||
type, | ||
location, | ||
executionId, | ||
}: { | ||
executionId: string; | ||
location: "handler" | "signal"; | ||
type: "complete" | "fail"; | ||
}) => { | ||
return asyncResult( | ||
async (token) => | ||
await activityOverrideEvent.publish({ | ||
token, | ||
location, | ||
type, | ||
executionId, | ||
}) | ||
); | ||
} | ||
); | ||
|
||
/** | ||
* A signal called by the activityOverrideEvent handler to pass the activity token to the workflow. | ||
* Used to test activity completion from the workflow. | ||
*/ | ||
const activityOverrideSignal = new Signal<{ | ||
token: string; | ||
type: "complete" | "fail"; | ||
}>("activityOverrideSignal"); | ||
|
||
activityOverrideEvent.on(async ({ token, location, type, executionId }) => { | ||
if (location === "handler") { | ||
if (type === "complete") { | ||
await completeActivity(token, "from the event handler!"); | ||
} else { | ||
await failActivity(token, new Error("WHY!!!")); | ||
} | ||
} else { | ||
await activityOverrideSignal.send(executionId, { token, type }); | ||
} | ||
}); | ||
|
||
/** | ||
* Activity which waits to be closed/cancelled. | ||
* If it is closed, it signals the workflow with "complete", if not it signals with "fail". | ||
*/ | ||
const activityOverrideAwareActivity = activity( | ||
"overrideAware", | ||
async ({ executionId }: { executionId: string }) => { | ||
let n = 0; | ||
while (n < 10) { | ||
await delay(1); | ||
const { closed } = await heartbeat(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. does heartbeat throw if it's been cancelled? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, the activity chooses what to do with this information, it can keep running if the developer wants. (ex: fire and forget activities). |
||
if (closed) { | ||
await activityOverrideSignal.send(executionId, { | ||
token: "", | ||
type: "complete", | ||
}); | ||
return; | ||
} | ||
} | ||
|
||
await activityOverrideSignal.send(executionId, { token: "", type: "fail" }); | ||
} | ||
); | ||
|
||
/** | ||
* Activity Override Tests. | ||
* | ||
* 1. cancel an activity from the workflow and record the result | ||
* 2. Use the activity token to cancel, fail, or complete an async token from within an event handler or workflow | ||
* 3. Cancel an activity and use a signal to signify that it knows it was cancelled and record the result. | ||
*/ | ||
export const overrideWorkflow = workflow( | ||
"override", | ||
async (_, { execution: { id: executionId } }) => { | ||
const act = slowActivity(); | ||
act.cancel("because"); | ||
|
||
const results1 = await Promise.allSettled([act]); | ||
|
||
const signalHandler = activityOverrideSignal.on(async ({ token, type }) => { | ||
if (type === "complete") { | ||
await completeActivity(token, "from the signal handler!"); | ||
} else { | ||
await failActivity(token, new Error("BECAUSE!!!")); | ||
} | ||
}); | ||
|
||
const results2 = await Promise.allSettled([ | ||
activityOverrideActivity({ | ||
location: "handler", | ||
type: "complete", | ||
executionId, | ||
}), | ||
activityOverrideActivity({ | ||
location: "handler", | ||
type: "fail", | ||
executionId, | ||
}), | ||
activityOverrideActivity({ | ||
location: "signal", | ||
type: "complete", | ||
executionId, | ||
}), | ||
activityOverrideActivity({ | ||
location: "signal", | ||
type: "fail", | ||
executionId, | ||
}), | ||
]); | ||
|
||
const aware = activityOverrideAwareActivity({ executionId }); | ||
await sleepFor(1); | ||
aware.cancel("because"); | ||
|
||
signalHandler.dispose(); | ||
|
||
// the override activity SHOULD send a signal when it realizes it is cancelled. | ||
const result = await expectSignal(activityOverrideSignal); | ||
|
||
return [results1, results2, result]; | ||
} | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think its a good time to add comments about what each of the elements in this service are, and what they're used for in the tests. Its starting to become a lot to piece together.