Skip to content

Commit

Permalink
feat: support async handling and add CronJob status tracking (#894)
Browse files Browse the repository at this point in the history
## Description
This PR improves async handling in the CronJob class and adds status
tracking functionality.

- Modified the `fireOnTick()` method to return a Promise for better
async callback handling
- Added an `isCallbackRunning` flag to track the running state of
CronJob instances
- Updated the test suite to use the new async behavior and track the
job's running state
- Added `waitForCompletion` functionality to the `job.stop()` method
- waits for running jobs to complete before executing the `onComplete`
callback

During test case writing, I encountered a type error with sinon.
To resolve this, added `sinon.restore()` to the `afterEach` block.

Reference:
https://stackoverflow.com/questions/73232999/sinon-cant-install-fake-timers-twice-on-the-same-global-object

<img width="811" alt="스크린샷 2024-09-03 오후 7 10 58"
src="https://github.com/user-attachments/assets/b87deee7-14b2-4407-8fea-a1cb469ef44b">

## Related Issue
Closes #713
Closes #556

## Motivation and Context
These changes allow the CronJob class to handle asynchronous callbacks
more effectively and provide a way to track the running state of jobs.

## How Has This Been Tested?
- Updated existing test suite to verify the new async behavior
- Adjusted test timeouts to use the `tickAsync` method
- Added new test cases to check for proper waiting of running callbacks
before stopping

## Types of changes
- [ ] Bug fix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)

## Checklist:
- [x] My code follows the code style of this project.
- [x] My change requires a change to the documentation.
- [ ] I have updated the documentation accordingly.
- [x] I have added tests to cover my changes.
- [x] All new and existing tests passed.
- [ ] If my change introduces a breaking change, I have added a `!`
after the type/scope in the title (see the Conventional Commits
standard).

---------

Co-authored-by: Brandon der Blätter <[email protected]>
  • Loading branch information
Zamoca42 and intcreator authored Dec 10, 2024
1 parent 94465ae commit b58fb6b
Show file tree
Hide file tree
Showing 4 changed files with 251 additions and 131 deletions.
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,8 @@ day of week 0-7 (0 or 7 is Sunday, or use names)

- `unrefTimeout`: [OPTIONAL] - Useful for controlling event loop behavior. More details [here](https://nodejs.org/api/timers.html#timers_timeout_unref).

- `waitForCompletion`: [OPTIONAL] - If `true`, no additional instances of the `onTick` callback function will run until the current onTick callback has completed. Any new scheduled executions that occur while the current callback is running will be skipped entirely. Default is `false`.

#### Methods

- `from` (static): Create a new CronJob object providing arguments as an object. See argument names and descriptions above.
Expand All @@ -214,6 +216,23 @@ day of week 0-7 (0 or 7 is Sunday, or use names)

- `addCallback`: Permits addition of `onTick` callbacks.

#### Properties

- `isCallbackRunning`: [READ-ONLY] Indicates if a callback is currently executing.

```javascript
const job = new CronJob('* * * * * *', async () => {
console.log(job.isCallbackRunning); // true during callback execution
await someAsyncTask();
console.log(job.isCallbackRunning); // still true until callback completes
});

console.log(job.isCallbackRunning); // false
job.start();
console.log(job.running); // true (schedule is active)
console.log(job.isCallbackRunning); // false (no callback executing)
```

### CronTime Class

#### Constructor
Expand Down
92 changes: 68 additions & 24 deletions src/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,16 @@ export class CronJob<OC extends CronOnCompleteCommand | null = null, C = null> {
onComplete?: WithOnComplete<OC> extends true
? CronOnCompleteCallback
: undefined;
waitForCompletion = false;

private _isCallbackRunning = false;
private _timeout?: NodeJS.Timeout;
private _callbacks: CronCallback<C, WithOnComplete<OC>>[] = [];

get isCallbackRunning() {
return this._isCallbackRunning;
}

constructor(
cronTime: CronJobParams<OC, C>['cronTime'],
onTick: CronJobParams<OC, C>['onTick'],
Expand All @@ -34,7 +40,8 @@ export class CronJob<OC extends CronOnCompleteCommand | null = null, C = null> {
context?: CronJobParams<OC, C>['context'],
runOnInit?: CronJobParams<OC, C>['runOnInit'],
utcOffset?: null,
unrefTimeout?: CronJobParams<OC, C>['unrefTimeout']
unrefTimeout?: CronJobParams<OC, C>['unrefTimeout'],
waitForCompletion?: CronJobParams<OC, C>['waitForCompletion']
);
constructor(
cronTime: CronJobParams<OC, C>['cronTime'],
Expand All @@ -45,7 +52,8 @@ export class CronJob<OC extends CronOnCompleteCommand | null = null, C = null> {
context?: CronJobParams<OC, C>['context'],
runOnInit?: CronJobParams<OC, C>['runOnInit'],
utcOffset?: CronJobParams<OC, C>['utcOffset'],
unrefTimeout?: CronJobParams<OC, C>['unrefTimeout']
unrefTimeout?: CronJobParams<OC, C>['unrefTimeout'],
waitForCompletion?: CronJobParams<OC, C>['waitForCompletion']
);
constructor(
cronTime: CronJobParams<OC, C>['cronTime'],
Expand All @@ -56,9 +64,11 @@ export class CronJob<OC extends CronOnCompleteCommand | null = null, C = null> {
context?: CronJobParams<OC, C>['context'],
runOnInit?: CronJobParams<OC, C>['runOnInit'],
utcOffset?: CronJobParams<OC, C>['utcOffset'],
unrefTimeout?: CronJobParams<OC, C>['unrefTimeout']
unrefTimeout?: CronJobParams<OC, C>['unrefTimeout'],
waitForCompletion?: CronJobParams<OC, C>['waitForCompletion']
) {
this.context = (context ?? this) as CronContext<C>;
this.waitForCompletion = Boolean(waitForCompletion);

// runtime check for JS users
if (timeZone != null && utcOffset != null) {
Expand Down Expand Up @@ -92,7 +102,7 @@ export class CronJob<OC extends CronOnCompleteCommand | null = null, C = null> {

if (runOnInit) {
this.lastExecution = new Date();
this.fireOnTick();
void this.fireOnTick();
}

if (start) this.start();
Expand All @@ -117,7 +127,8 @@ export class CronJob<OC extends CronOnCompleteCommand | null = null, C = null> {
params.context,
params.runOnInit,
params.utcOffset,
params.unrefTimeout
params.unrefTimeout,
params.waitForCompletion
);
} else if (params.utcOffset != null) {
return new CronJob<OC, C>(
Expand All @@ -129,7 +140,8 @@ export class CronJob<OC extends CronOnCompleteCommand | null = null, C = null> {
params.context,
params.runOnInit,
params.utcOffset,
params.unrefTimeout
params.unrefTimeout,
params.waitForCompletion
);
} else {
return new CronJob<OC, C>(
Expand All @@ -141,7 +153,8 @@ export class CronJob<OC extends CronOnCompleteCommand | null = null, C = null> {
params.context,
params.runOnInit,
params.utcOffset,
params.unrefTimeout
params.unrefTimeout,
params.waitForCompletion
);
}
}
Expand Down Expand Up @@ -193,14 +206,26 @@ export class CronJob<OC extends CronOnCompleteCommand | null = null, C = null> {
return this.cronTime.sendAt();
}

fireOnTick() {
for (const callback of this._callbacks) {
void callback.call(
this.context,
this.onComplete as WithOnComplete<OC> extends true
? CronOnCompleteCallback
: never
);
async fireOnTick() {
if (!this.waitForCompletion && this._isCallbackRunning) return;

this._isCallbackRunning = true;

try {
for (const callback of this._callbacks) {
const result = callback.call(
this.context,
this.onComplete as WithOnComplete<OC> extends true
? CronOnCompleteCallback
: never
);

if (this.waitForCompletion) await result;
}
} catch (error) {
console.error('[Cron] error in callback', error);
} finally {
this._isCallbackRunning = false;
}
}

Expand All @@ -209,9 +234,7 @@ export class CronJob<OC extends CronOnCompleteCommand | null = null, C = null> {
}

start() {
if (this.running) {
return;
}
if (this.running) return;

const MAXDELAY = 2147483647; // The maximum number of milliseconds setTimeout will wait.
let timeout = this.cronTime.getTimeout();
Expand Down Expand Up @@ -262,11 +285,9 @@ export class CronJob<OC extends CronOnCompleteCommand | null = null, C = null> {
this.running = false;

// start before calling back so the callbacks have the ability to stop the cron job
if (!this.runOnce) {
this.start();
}
if (!this.runOnce) this.start();

this.fireOnTick();
void this.fireOnTick();
}
};

Expand All @@ -290,14 +311,37 @@ export class CronJob<OC extends CronOnCompleteCommand | null = null, C = null> {
return this.lastExecution;
}

private async _executeOnComplete() {
if (typeof this.onComplete !== 'function') return;

try {
await this.onComplete.call(this.context);
} catch (error) {
console.error('[Cron] error in onComplete callback:', error);
}
}

private async _waitForJobCompletion() {
while (this._isCallbackRunning) {
await new Promise(resolve => setTimeout(resolve, 100));
}
}

/**
* Stop the cronjob.
*/
stop() {
if (this._timeout) clearTimeout(this._timeout);
this.running = false;
if (typeof this.onComplete === 'function') {
void this.onComplete.call(this.context);

if (!this.waitForCompletion) {
void this._executeOnComplete();
return;
}

void Promise.resolve().then(async () => {
await this._waitForJobCompletion();
await this._executeOnComplete();
});
}
}
1 change: 1 addition & 0 deletions src/types/cron.types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ interface BaseCronJobParams<
context?: C;
runOnInit?: boolean | null;
unrefTimeout?: boolean | null;
waitForCompletion?: boolean | null;
}

export type CronJobParams<
Expand Down
Loading

0 comments on commit b58fb6b

Please sign in to comment.