-
Notifications
You must be signed in to change notification settings - Fork 64
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add priority queueing for Mutex and Semaphore #75
Changes from 19 commits
b64a203
cdb2725
f40a465
f50ee83
3e34bcc
be76be2
fd097a7
b1a38fc
87f7238
2b11b72
90d1092
aeeae18
8711a6d
7aacdc7
6a55acb
c546c3f
2d13278
981554f
4350d80
285229f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,27 +1,46 @@ | ||
import { E_CANCELED } from './errors'; | ||
import SemaphoreInterface from './SemaphoreInterface'; | ||
|
||
|
||
interface Priority { | ||
priority: number; | ||
} | ||
|
||
interface QueueEntry { | ||
resolve(result: [number, SemaphoreInterface.Releaser]): void; | ||
reject(error: unknown): void; | ||
weight: number; | ||
priority: number; | ||
} | ||
|
||
interface Waiter { | ||
resolve(): void; | ||
priority: number; | ||
} | ||
|
||
class Semaphore implements SemaphoreInterface { | ||
constructor(private _value: number, private _cancelError: Error = E_CANCELED) {} | ||
|
||
acquire(weight = 1): Promise<[number, SemaphoreInterface.Releaser]> { | ||
acquire(weight = 1, priority = 0): Promise<[number, SemaphoreInterface.Releaser]> { | ||
if (weight <= 0) throw new Error(`invalid weight ${weight}: must be positive`); | ||
|
||
return new Promise((resolve, reject) => { | ||
if (!this._weightedQueues[weight - 1]) this._weightedQueues[weight - 1] = []; | ||
this._weightedQueues[weight - 1].push({ resolve, reject }); | ||
|
||
this._dispatch(); | ||
const task: QueueEntry = { resolve, reject, weight, priority }; | ||
const i = findIndexFromEnd(this._queue, (other) => priority <= other.priority); | ||
if (i === -1 && weight <= this._value) { | ||
// Needs immediate dispatch, skip the queue | ||
this._dispatchItem(task); | ||
} else if (i === -1) { | ||
this._queue.splice(0, 0, task); | ||
} else { | ||
this._queue.splice(i + 1, 0, task); | ||
} | ||
this._dispatchQueue(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder whether we actually still need the call to |
||
}); | ||
} | ||
|
||
async runExclusive<T>(callback: SemaphoreInterface.Worker<T>, weight = 1): Promise<T> { | ||
const [value, release] = await this.acquire(weight); | ||
async runExclusive<T>(callback: SemaphoreInterface.Worker<T>, weight = 1, priority = 0): Promise<T> { | ||
const [value, release] = await this.acquire(weight, priority); | ||
|
||
try { | ||
return await callback(value); | ||
|
@@ -30,15 +49,18 @@ class Semaphore implements SemaphoreInterface { | |
} | ||
} | ||
|
||
waitForUnlock(weight = 1): Promise<void> { | ||
waitForUnlock(weight = 1, priority = 0): Promise<void> { | ||
if (weight <= 0) throw new Error(`invalid weight ${weight}: must be positive`); | ||
|
||
return new Promise((resolve) => { | ||
if (!this._weightedWaiters[weight - 1]) this._weightedWaiters[weight - 1] = []; | ||
this._weightedWaiters[weight - 1].push(resolve); | ||
|
||
this._dispatch(); | ||
}); | ||
if (this._couldLockImmediately(weight, priority)) { | ||
return Promise.resolve(); | ||
} else { | ||
return new Promise((resolve) => { | ||
if (!this._weightedWaiters[weight - 1]) this._weightedWaiters[weight - 1] = []; | ||
insertSorted(this._weightedWaiters[weight - 1], { resolve, priority }); | ||
this._dispatchQueue(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Again, I am not really sure whether we need this anymore, I think that case is already handled by the first branch. |
||
}); | ||
} | ||
} | ||
|
||
isLocked(): boolean { | ||
|
@@ -51,36 +73,33 @@ class Semaphore implements SemaphoreInterface { | |
|
||
setValue(value: number): void { | ||
this._value = value; | ||
this._dispatch(); | ||
this._dispatchQueue(); | ||
} | ||
|
||
release(weight = 1): void { | ||
if (weight <= 0) throw new Error(`invalid weight ${weight}: must be positive`); | ||
|
||
this._value += weight; | ||
this._dispatch(); | ||
this._dispatchQueue(); | ||
} | ||
|
||
cancel(): void { | ||
this._weightedQueues.forEach((queue) => queue.forEach((entry) => entry.reject(this._cancelError))); | ||
this._weightedQueues = []; | ||
this._queue.forEach((entry) => entry.reject(this._cancelError)); | ||
this._queue = []; | ||
} | ||
|
||
private _dispatch(): void { | ||
for (let weight = this._value; weight > 0; weight--) { | ||
const queueEntry = this._weightedQueues[weight - 1]?.shift(); | ||
if (!queueEntry) continue; | ||
|
||
const previousValue = this._value; | ||
const previousWeight = weight; | ||
|
||
this._value -= weight; | ||
weight = this._value + 1; | ||
|
||
queueEntry.resolve([previousValue, this._newReleaser(previousWeight)]); | ||
private _dispatchQueue(): void { | ||
this._drainUnlockWaiters(); | ||
while (this._queue.length > 0 && this._queue[0].weight <= this._value) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This loop will stop scanning the queue if the next item has a weight that exceeds the current value, even if there are other items further up in the queue that could be scheduled. You need to keep scanning through the whole queue. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That would allow light low-priority items to crowd out heavier high-priority items. The queue could not guarantee eventual completion of a high-priority item. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hm, you have a point there. I still don't like that a single heavy task can forever block all lower priority tasks, but I don't see a good way out either. Let's leave it like that. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Opened a separate pull request with a test to clarify this issue. |
||
this._dispatchItem(this._queue.shift()!); | ||
this._drainUnlockWaiters(); | ||
} | ||
} | ||
|
||
this._drainUnlockWaiters(); | ||
private _dispatchItem(item: QueueEntry): void { | ||
const previousValue = this._value; | ||
this._value -= item.weight; | ||
item.resolve([previousValue, this._newReleaser(item.weight)]); | ||
} | ||
|
||
private _newReleaser(weight: number): () => void { | ||
|
@@ -95,16 +114,50 @@ class Semaphore implements SemaphoreInterface { | |
} | ||
|
||
private _drainUnlockWaiters(): void { | ||
for (let weight = this._value; weight > 0; weight--) { | ||
if (!this._weightedWaiters[weight - 1]) continue; | ||
|
||
this._weightedWaiters[weight - 1].forEach((waiter) => waiter()); | ||
this._weightedWaiters[weight - 1] = []; | ||
if (this._queue.length === 0) { | ||
for (let weight = this._value; weight > 0; weight--) { | ||
const waiters = this._weightedWaiters[weight - 1]; | ||
if (!waiters) continue; | ||
waiters.forEach((waiter) => waiter.resolve()); | ||
this._weightedWaiters[weight - 1] = []; | ||
} | ||
} else { | ||
const queuedPriority = this._queue[0].priority; | ||
for (let weight = this._value; weight > 0; weight--) { | ||
const waiters = this._weightedWaiters[weight - 1]; | ||
if (!waiters) continue; | ||
const i = waiters.findIndex((waiter) => waiter.priority <= queuedPriority); | ||
DirtyHairy marked this conversation as resolved.
Show resolved
Hide resolved
|
||
(i === -1 ? waiters : waiters.splice(0, i)) | ||
.forEach((waiter => waiter.resolve())); | ||
} | ||
} | ||
} | ||
|
||
private _weightedQueues: Array<Array<QueueEntry>> = []; | ||
private _weightedWaiters: Array<Array<() => void>> = []; | ||
private _couldLockImmediately(weight: number, priority: number) { | ||
return (this._queue.length === 0 || this._queue[0].priority < priority) && | ||
weight <= this._value; | ||
} | ||
|
||
private _queue: Array<QueueEntry> = []; | ||
private _weightedWaiters: Array<Array<Waiter>> = []; | ||
} | ||
|
||
function insertSorted<T extends Priority>(a: T[], v: T) { | ||
const i = findIndexFromEnd(a, (other) => v.priority <= other.priority); | ||
if (i === -1) { | ||
a.splice(0, 0, v); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Again, I think the case is redundant, as -1 + 1 = 0 |
||
} else { | ||
a.splice(i + 1, 0, v); | ||
} | ||
} | ||
|
||
function findIndexFromEnd<T>(a: T[], predicate: (e: T) => boolean): number { | ||
for (let i = a.length - 1; i >= 0; i--) { | ||
if (predicate(a[i])) { | ||
return i; | ||
} | ||
} | ||
return -1; | ||
} | ||
|
||
export default Semaphore; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This and the next case could be merged, as -1 + 1 = 0