diff --git a/README.md b/README.md index 2095172..95c30b8 100644 --- a/README.md +++ b/README.md @@ -291,10 +291,14 @@ the semaphore is released. `runExclusive` returns a promise that adopts the stat The semaphore is released and the result rejected if an exception occurs during execution of the callback. -`runExclusive` accepts an optional argument `weight`. Specifying a `weight` will decrement the +`runExclusive` accepts a first optional argument `weight`. Specifying a `weight` will decrement the semaphore by the specified value, and the callback will only be invoked once the semaphore's value greater or equal to `weight`. +`runExclusive` accepts a second optional argument `priority`. Specifying a greater value for `priority` +tells the scheduler to run this task before other tasks. `priority` can be any real number. The default +is zero. + ### Manual locking / releasing Promise style: @@ -328,10 +332,14 @@ has completed. The `release` callback is idempotent. likely deadlock the application. Make sure to call `release` under all circumstances and handle exceptions accordingly. -`runExclusive` accepts an optional argument `weight`. Specifying a `weight` will decrement the -semaphore by the specified value, and the semaphore will only be acquired once the its +`acquire` accepts a first optional argument `weight`. Specifying a `weight` will decrement the +semaphore by the specified value, and the semaphore will only be acquired once its value is greater or equal to `weight`. +`acquire` accepts a second optional argument `priority`. Specifying a greater value for `priority` +tells the scheduler to release the semaphore to the caller before other callers. `priority` can be +any real number. The default is zero. + ### Unscoped release As an alternative to calling the `release` callback returned by `acquire`, the semaphore @@ -444,8 +452,10 @@ await semaphore.waitForUnlock(); // ... ``` -`waitForUnlock` accepts an optional argument `weight`. If `weight` is specified the promise -will only resolve once the semaphore's value is greater or equal to `weight`; +`waitForUnlock` accepts optional arguments `weight` and `priority`. The promise will resolve as soon +as it is possible to `acquire` the semaphore with the given weight and priority. Scheduled tasks with +the greatest `priority` values execute first. + ## Limiting the time waiting for a mutex or semaphore to become available diff --git a/src/Mutex.ts b/src/Mutex.ts index 5298c22..20cffd6 100644 --- a/src/Mutex.ts +++ b/src/Mutex.ts @@ -6,22 +6,22 @@ class Mutex implements MutexInterface { this._semaphore = new Semaphore(1, cancelError); } - async acquire(): Promise { - const [, releaser] = await this._semaphore.acquire(); + async acquire(priority = 0): Promise { + const [, releaser] = await this._semaphore.acquire(1, priority); return releaser; } - runExclusive(callback: MutexInterface.Worker): Promise { - return this._semaphore.runExclusive(() => callback()); + runExclusive(callback: MutexInterface.Worker, priority = 0): Promise { + return this._semaphore.runExclusive(() => callback(), 1, priority); } isLocked(): boolean { return this._semaphore.isLocked(); } - waitForUnlock(): Promise { - return this._semaphore.waitForUnlock(); + waitForUnlock(priority = 0): Promise { + return this._semaphore.waitForUnlock(1, priority); } release(): void { diff --git a/src/MutexInterface.ts b/src/MutexInterface.ts index 31e1a3e..c09f22a 100644 --- a/src/MutexInterface.ts +++ b/src/MutexInterface.ts @@ -1,9 +1,9 @@ interface MutexInterface { - acquire(): Promise; + acquire(priority?: number): Promise; - runExclusive(callback: MutexInterface.Worker): Promise; + runExclusive(callback: MutexInterface.Worker, priority?: number): Promise; - waitForUnlock(): Promise; + waitForUnlock(priority?: number): Promise; isLocked(): boolean; diff --git a/src/Semaphore.ts b/src/Semaphore.ts index 90ae148..b912f2d 100644 --- a/src/Semaphore.ts +++ b/src/Semaphore.ts @@ -1,27 +1,43 @@ import { E_CANCELED } from './errors'; import SemaphoreInterface from './SemaphoreInterface'; + +interface Priority { + priority: number; +} + interface QueueEntry { resolve(result: [number, SemaphoreInterface.Releaser]): void; reject(error: unknown): void; + weight: number; + priority: number; +} + +interface Waiter { + resolve(): void; + priority: number; } class Semaphore implements SemaphoreInterface { constructor(private _value: number, private _cancelError: Error = E_CANCELED) {} - acquire(weight = 1): Promise<[number, SemaphoreInterface.Releaser]> { + acquire(weight = 1, priority = 0): Promise<[number, SemaphoreInterface.Releaser]> { if (weight <= 0) throw new Error(`invalid weight ${weight}: must be positive`); return new Promise((resolve, reject) => { - if (!this._weightedQueues[weight - 1]) this._weightedQueues[weight - 1] = []; - this._weightedQueues[weight - 1].push({ resolve, reject }); - - this._dispatch(); + const task: QueueEntry = { resolve, reject, weight, priority }; + const i = findIndexFromEnd(this._queue, (other) => priority <= other.priority); + if (i === -1 && weight <= this._value) { + // Needs immediate dispatch, skip the queue + this._dispatchItem(task); + } else { + this._queue.splice(i + 1, 0, task); + } }); } - async runExclusive(callback: SemaphoreInterface.Worker, weight = 1): Promise { - const [value, release] = await this.acquire(weight); + async runExclusive(callback: SemaphoreInterface.Worker, weight = 1, priority = 0): Promise { + const [value, release] = await this.acquire(weight, priority); try { return await callback(value); @@ -30,15 +46,17 @@ class Semaphore implements SemaphoreInterface { } } - waitForUnlock(weight = 1): Promise { + waitForUnlock(weight = 1, priority = 0): Promise { if (weight <= 0) throw new Error(`invalid weight ${weight}: must be positive`); - return new Promise((resolve) => { - if (!this._weightedWaiters[weight - 1]) this._weightedWaiters[weight - 1] = []; - this._weightedWaiters[weight - 1].push(resolve); - - this._dispatch(); - }); + if (this._couldLockImmediately(weight, priority)) { + return Promise.resolve(); + } else { + return new Promise((resolve) => { + if (!this._weightedWaiters[weight - 1]) this._weightedWaiters[weight - 1] = []; + insertSorted(this._weightedWaiters[weight - 1], { resolve, priority }); + }); + } } isLocked(): boolean { @@ -51,36 +69,33 @@ class Semaphore implements SemaphoreInterface { setValue(value: number): void { this._value = value; - this._dispatch(); + this._dispatchQueue(); } release(weight = 1): void { if (weight <= 0) throw new Error(`invalid weight ${weight}: must be positive`); this._value += weight; - this._dispatch(); + this._dispatchQueue(); } cancel(): void { - this._weightedQueues.forEach((queue) => queue.forEach((entry) => entry.reject(this._cancelError))); - this._weightedQueues = []; + this._queue.forEach((entry) => entry.reject(this._cancelError)); + this._queue = []; } - private _dispatch(): void { - for (let weight = this._value; weight > 0; weight--) { - const queueEntry = this._weightedQueues[weight - 1]?.shift(); - if (!queueEntry) continue; - - const previousValue = this._value; - const previousWeight = weight; - - this._value -= weight; - weight = this._value + 1; - - queueEntry.resolve([previousValue, this._newReleaser(previousWeight)]); + private _dispatchQueue(): void { + this._drainUnlockWaiters(); + while (this._queue.length > 0 && this._queue[0].weight <= this._value) { + this._dispatchItem(this._queue.shift()!); + this._drainUnlockWaiters(); } + } - this._drainUnlockWaiters(); + private _dispatchItem(item: QueueEntry): void { + const previousValue = this._value; + this._value -= item.weight; + item.resolve([previousValue, this._newReleaser(item.weight)]); } private _newReleaser(weight: number): () => void { @@ -95,16 +110,46 @@ class Semaphore implements SemaphoreInterface { } private _drainUnlockWaiters(): void { - for (let weight = this._value; weight > 0; weight--) { - if (!this._weightedWaiters[weight - 1]) continue; - - this._weightedWaiters[weight - 1].forEach((waiter) => waiter()); - this._weightedWaiters[weight - 1] = []; + if (this._queue.length === 0) { + for (let weight = this._value; weight > 0; weight--) { + const waiters = this._weightedWaiters[weight - 1]; + if (!waiters) continue; + waiters.forEach((waiter) => waiter.resolve()); + this._weightedWaiters[weight - 1] = []; + } + } else { + const queuedPriority = this._queue[0].priority; + for (let weight = this._value; weight > 0; weight--) { + const waiters = this._weightedWaiters[weight - 1]; + if (!waiters) continue; + const i = waiters.findIndex((waiter) => waiter.priority <= queuedPriority); + (i === -1 ? waiters : waiters.splice(0, i)) + .forEach((waiter => waiter.resolve())); + } } } - private _weightedQueues: Array> = []; - private _weightedWaiters: Array void>> = []; + private _couldLockImmediately(weight: number, priority: number) { + return (this._queue.length === 0 || this._queue[0].priority < priority) && + weight <= this._value; + } + + private _queue: Array = []; + private _weightedWaiters: Array> = []; +} + +function insertSorted(a: T[], v: T) { + const i = findIndexFromEnd(a, (other) => v.priority <= other.priority); + a.splice(i + 1, 0, v); +} + +function findIndexFromEnd(a: T[], predicate: (e: T) => boolean): number { + for (let i = a.length - 1; i >= 0; i--) { + if (predicate(a[i])) { + return i; + } + } + return -1; } export default Semaphore; diff --git a/src/SemaphoreInterface.ts b/src/SemaphoreInterface.ts index 6398c24..14e40f6 100644 --- a/src/SemaphoreInterface.ts +++ b/src/SemaphoreInterface.ts @@ -1,9 +1,9 @@ interface SemaphoreInterface { - acquire(weight?: number): Promise<[number, SemaphoreInterface.Releaser]>; + acquire(weight?: number, priority?: number): Promise<[number, SemaphoreInterface.Releaser]>; - runExclusive(callback: SemaphoreInterface.Worker, weight?: number): Promise; + runExclusive(callback: SemaphoreInterface.Worker, weight?: number, priority?: number): Promise; - waitForUnlock(weight?: number): Promise; + waitForUnlock(weight?: number, priority?: number): Promise; isLocked(): boolean; diff --git a/src/withTimeout.ts b/src/withTimeout.ts index 4e75352..77e232f 100644 --- a/src/withTimeout.ts +++ b/src/withTimeout.ts @@ -7,7 +7,14 @@ export function withTimeout(mutex: MutexInterface, timeout: number, timeoutError export function withTimeout(semaphore: SemaphoreInterface, timeout: number, timeoutError?: Error): SemaphoreInterface; export function withTimeout(sync: MutexInterface | SemaphoreInterface, timeout: number, timeoutError = E_TIMEOUT): any { return { - acquire: (weight?: number): Promise => { + acquire: (weightOrPriority?: number, priority?: number): Promise => { + let weight: number | undefined; + if (isSemaphore(sync)) { + weight = weightOrPriority; + } else { + weight = undefined; + priority = weightOrPriority; + } if (weight !== undefined && weight <= 0) { throw new Error(`invalid weight ${weight}: must be positive`); } @@ -21,8 +28,10 @@ export function withTimeout(sync: MutexInterface | SemaphoreInterface, timeout: }, timeout); try { - const ticket = await sync.acquire(weight); - + const ticket = await (isSemaphore(sync) + ? sync.acquire(weight, priority) + : sync.acquire(priority) + ); if (isTimeout) { const release = Array.isArray(ticket) ? ticket[1] : ticket; @@ -41,11 +50,11 @@ export function withTimeout(sync: MutexInterface | SemaphoreInterface, timeout: }); }, - async runExclusive(callback: (value?: number) => Promise | T, weight?: number): Promise { + async runExclusive(callback: (value?: number) => Promise | T, weight?: number, priority?: number): Promise { let release: () => void = () => undefined; try { - const ticket = await this.acquire(weight); + const ticket = await this.acquire(weight, priority); if (Array.isArray(ticket)) { release = ticket[1]; @@ -69,16 +78,26 @@ export function withTimeout(sync: MutexInterface | SemaphoreInterface, timeout: return sync.cancel(); }, - waitForUnlock: (weight?: number): Promise => { + waitForUnlock: (weightOrPriority?: number, priority?: number): Promise => { + let weight: number | undefined; + if (isSemaphore(sync)) { + weight = weightOrPriority; + } else { + weight = undefined; + priority = weightOrPriority; + } if (weight !== undefined && weight <= 0) { throw new Error(`invalid weight ${weight}: must be positive`); } return new Promise((resolve, reject) => { const handle = setTimeout(() => reject(timeoutError), timeout); - sync.waitForUnlock(weight).then(() => { - clearTimeout(handle); - resolve(); + (isSemaphore(sync) + ? sync.waitForUnlock(weight, priority) + : sync.waitForUnlock(priority) + ).then(() => { + clearTimeout(handle); + resolve(); }); }); }, @@ -90,3 +109,7 @@ export function withTimeout(sync: MutexInterface | SemaphoreInterface, timeout: setValue: (value: number) => (sync as SemaphoreInterface).setValue(value), }; } + +function isSemaphore(sync: SemaphoreInterface | MutexInterface): sync is SemaphoreInterface { + return (sync as SemaphoreInterface).getValue !== undefined; +} diff --git a/test/mutex.ts b/test/mutex.ts index 241aafc..d945209 100644 --- a/test/mutex.ts +++ b/test/mutex.ts @@ -36,6 +36,33 @@ export const mutexSuite = (factory: (cancelError?: Error) => MutexInterface): vo assert(flag); })); + test('acquire unblocks the highest-priority task first', () => + withTimer(clock, async () => { + const values: number[] = []; + + // Scheduled immediately + mutex.acquire(0).then((release) => { + values.push(0); + setTimeout(release, 100) + }); + + // Low priority task + mutex.acquire(-1).then((release) => { + values.push(-1); + setTimeout(release, 100) + }); + + // High priority task; jumps the queue + mutex.acquire(1).then((release) => { + values.push(1); + setTimeout(release, 100) + }); + + await clock.runAllAsync(); + assert.deepStrictEqual(values, [0, 1, -1]); + }) + ); + test('runExclusive passes result (immediate)', async () => { assert.strictEqual(await mutex.runExclusive(() => 10), 10); }); @@ -81,6 +108,15 @@ export const mutexSuite = (factory: (cancelError?: Error) => MutexInterface): vo assert(flag); })); + test('runExclusive unblocks the highest-priority task first', async () => { + const values: number[] = []; + mutex.runExclusive(() => { values.push(0); }, 0); + mutex.runExclusive(() => { values.push(-1); }, -1); + mutex.runExclusive(() => { values.push(+1); }, +1); + await clock.runAllAsync(); + assert.deepStrictEqual(values, [0, +1, -1]); + }); + test('exceptions during runExclusive do not leave mutex locked', async () => { let flag = false; @@ -265,6 +301,26 @@ export const mutexSuite = (factory: (cancelError?: Error) => MutexInterface): vo assert.strictEqual(flag, true); }); + + test('waitForUnlock unblocks high-priority waiters before low-priority queued tasks', async () => { + mutex.acquire(0); // Immediately scheduled + mutex.acquire(0); // Waiting + let flag = false; + mutex.waitForUnlock(1).then(() => { flag = true; }); + mutex.release(); + await clock.tickAsync(0); + assert.strictEqual(flag, true); + }); + + test('waitForUnlock unblocks low-priority waiters after high-priority queued tasks', async () => { + mutex.acquire(0); // Immediately scheduled + mutex.acquire(0); // Waiting + let flag = false; + mutex.waitForUnlock(-1).then(() => { flag = true; }); + mutex.release(); + await clock.tickAsync(0); + assert.strictEqual(flag, false); + }); }; suite('Mutex', () => mutexSuite((e) => new Mutex(e))); diff --git a/test/semaphoreSuite.ts b/test/semaphoreSuite.ts index 4c9bc6c..c75a179 100644 --- a/test/semaphoreSuite.ts +++ b/test/semaphoreSuite.ts @@ -50,6 +50,89 @@ export const semaphoreSuite = (factory: (maxConcurrency: number, err?: Error) => assert.deepStrictEqual(values.sort(), [2, 2]); }); + test('acquire unblocks high-priority tasks first', async () => { + const values: Array = []; + + // priority=0; runs first because nothing else is waiting + semaphore.acquire(2, 0).then(([, release]) => { + values.push(0); + setTimeout(release, 100); + }); + + // priority=-1; queues first + semaphore.acquire(2, -1).then(([, release]) => { + values.push(-1); + setTimeout(release, 100); + }); + + // priority=+1; jumps ahead of priority=-1 + semaphore.acquire(2, +1).then(([, release]) => { + values.push(+1); + setTimeout(release, 100); + }); + + await clock.runAllAsync(); + assert.deepStrictEqual(values, [0, +1, -1]); + }); + + test('acquire allows light high-priority tasks to skip the line', async () => { + let executed = false; + semaphore.acquire(3, 0); + semaphore.acquire(1, 1).then(([, release]) => { + executed = true; + setTimeout(release, 100); + }); + await clock.runAllAsync(); + assert.strictEqual(executed, true); + }); + + test('acquire prioritizes high-priority tasks even if they are heavier', async () => { + const values: Array = []; + + // two items with weight 1; runs first because nothing else is waiting + semaphore.acquire(1, 0).then(([, release]) => { + values.push(0); + setTimeout(release, 100); + }); + semaphore.acquire(1, 0).then(([, release]) => { + values.push(0); + setTimeout(release, 100); + }); + + // low-priority item with weight 1 + semaphore.acquire(1, -1).then(([, release]) => { + values.push(-1); + setTimeout(release, 100); + }); + + // high-priority item with weight 2; should run before the others + semaphore.acquire(2, +1).then(([, release]) => { + values.push(+1); + setTimeout(release, 100); + }); + + await clock.runAllAsync(); + assert.deepStrictEqual(values, [0, 0, +1, -1]); + }); + + test('acquire allows heavy items to run eventually', async () => { + let done = false; + async function lightLoop() { + while (!done) { + const [,release] = await semaphore.acquire(1); + await new Promise((resolve) => { setTimeout(resolve, 10); }); + release(); + } + } + lightLoop(); + await clock.tickAsync(5); + lightLoop(); + semaphore.acquire(2).then(() => { done = true; }); + await clock.tickAsync(10); + await clock.tickAsync(10); + assert.strictEqual(done, true); + }); + test('acquire blocks when the semaphore has reached zero until it is released again', async () => { const values: Array = []; @@ -232,6 +315,15 @@ export const semaphoreSuite = (factory: (maxConcurrency: number, err?: Error) => assert.strictEqual(semaphore.getValue(), 2); }); + test('runExclusive executes high-priority tasks first', async () => { + const values: number[] = []; + semaphore.runExclusive(() => { values.push(0) }, 2); + semaphore.runExclusive(() => { values.push(-1) }, 2, -1); + semaphore.runExclusive(() => { values.push(+1) }, 2, +1); + await clock.runAllAsync(); + assert.deepStrictEqual(values, [0, +1, -1]); + }); + test('new semaphore is unlocked', () => { assert(!semaphore.isLocked()); }); @@ -290,8 +382,8 @@ export const semaphoreSuite = (factory: (maxConcurrency: number, err?: Error) => test('setValue works fine with isolated weights', async () => { let flag = false; - semaphore.acquire(8); semaphore.acquire(4).then(() => (flag = true)); + semaphore.acquire(8); semaphore.setValue(4); await clock.tickAsync(1); @@ -441,6 +533,40 @@ export const semaphoreSuite = (factory: (maxConcurrency: number, err?: Error) => assert.deepStrictEqual([flag1, flag2], [true, true]); }); + test('waitForUnlock unblocks only high-priority waiters immediately', async () => { + const calledBack: number[] = []; + semaphore.acquire(3, 1); // A big heavy waiting task + semaphore.waitForUnlock(1, 0).then(() => { calledBack.push(0); }); // Low priority + semaphore.waitForUnlock(1, 2).then(() => { calledBack.push(2); }); // High priority + semaphore.waitForUnlock(1, 1).then(() => { calledBack.push(1); }); // Queued behind the heavy task + await clock.runAllAsync(); + assert.deepStrictEqual(calledBack, [2]); + }); + + test('waitForUnlock unblocks waiters of descending priority as the queue drains', async () => { + let calledBack = false; + let release: SemaphoreInterface.Releaser; + + semaphore.acquire(2, 2).then(([, r]) => { release = r; }); + semaphore.acquire(2, 0).then(([, r]) => { setTimeout(r, 100); }); + + semaphore.waitForUnlock(2, 1).then(() => { calledBack = true; }); + + await clock.tickAsync(0); + assert.strictEqual(calledBack, false); + release!(); + await clock.tickAsync(0); + assert.strictEqual(calledBack, true); + await clock.runAllAsync(); + }); + + test('waitForUnlock resolves immediately when the queue is empty', async () => { + let calledBack = false; + semaphore.waitForUnlock(1).then(() => { calledBack = true; }); + await clock.tickAsync(0); + assert.strictEqual(calledBack, true); + }); + test('waitForUnlock only unblocks when the semaphore can actually be acquired again', async () => { semaphore.acquire(2); semaphore.acquire(2);