Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add priority queueing for Mutex and Semaphore #75

Merged
merged 20 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -291,10 +291,14 @@ the semaphore is released. `runExclusive` returns a promise that adopts the stat
The semaphore is released and the result rejected if an exception occurs during execution
of the callback.

`runExclusive` accepts an optional argument `weight`. Specifying a `weight` will decrement the
`runExclusive` accepts a first optional argument `weight`. Specifying a `weight` will decrement the
semaphore by the specified value, and the callback will only be invoked once the semaphore's
value greater or equal to `weight`.

`runExclusive` accepts a second optional argument `priority`. Specifying a greater value for `priority`
tells the scheduler to run this task before other tasks. `priority` can be any real number. The default
is zero.

### Manual locking / releasing

Promise style:
Expand Down Expand Up @@ -328,10 +332,14 @@ has completed. The `release` callback is idempotent.
likely deadlock the application. Make sure to call `release` under all circumstances
and handle exceptions accordingly.

`runExclusive` accepts an optional argument `weight`. Specifying a `weight` will decrement the
semaphore by the specified value, and the semaphore will only be acquired once the its
`acquire` accepts a first optional argument `weight`. Specifying a `weight` will decrement the
semaphore by the specified value, and the semaphore will only be acquired once its
value is greater or equal to `weight`.

`acquire` accepts a second optional argument `priority`. Specifying a greater value for `priority`
tells the scheduler to release the semaphore to the caller before other callers. `priority` can be
any real number. The default is zero.

### Unscoped release

As an alternative to calling the `release` callback returned by `acquire`, the semaphore
Expand Down Expand Up @@ -444,8 +452,10 @@ await semaphore.waitForUnlock();
// ...
```

`waitForUnlock` accepts an optional argument `weight`. If `weight` is specified the promise
will only resolve once the semaphore's value is greater or equal to `weight`;
`waitForUnlock` accepts optional arguments `weight` and `priority`. The promise will resolve as soon
as it is possible to `acquire` the semaphore with the given weight and priority. Scheduled tasks with
the greatest `priority` values execute first.


## Limiting the time waiting for a mutex or semaphore to become available

Expand Down
12 changes: 6 additions & 6 deletions src/Mutex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,22 @@ class Mutex implements MutexInterface {
this._semaphore = new Semaphore(1, cancelError);
}

async acquire(): Promise<MutexInterface.Releaser> {
const [, releaser] = await this._semaphore.acquire();
async acquire(priority = 0): Promise<MutexInterface.Releaser> {
const [, releaser] = await this._semaphore.acquire(1, priority);

return releaser;
}

runExclusive<T>(callback: MutexInterface.Worker<T>): Promise<T> {
return this._semaphore.runExclusive(() => callback());
runExclusive<T>(callback: MutexInterface.Worker<T>, priority = 0): Promise<T> {
return this._semaphore.runExclusive(() => callback(), 1, priority);
}

isLocked(): boolean {
return this._semaphore.isLocked();
}

waitForUnlock(): Promise<void> {
return this._semaphore.waitForUnlock();
waitForUnlock(priority = 0): Promise<void> {
return this._semaphore.waitForUnlock(1, priority);
}

release(): void {
Expand Down
6 changes: 3 additions & 3 deletions src/MutexInterface.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
interface MutexInterface {
acquire(): Promise<MutexInterface.Releaser>;
acquire(priority?: number): Promise<MutexInterface.Releaser>;

runExclusive<T>(callback: MutexInterface.Worker<T>): Promise<T>;
runExclusive<T>(callback: MutexInterface.Worker<T>, priority?: number): Promise<T>;

waitForUnlock(): Promise<void>;
waitForUnlock(priority?: number): Promise<void>;

isLocked(): boolean;

Expand Down
121 changes: 83 additions & 38 deletions src/Semaphore.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,43 @@
import { E_CANCELED } from './errors';
import SemaphoreInterface from './SemaphoreInterface';


interface Priority {
priority: number;
}

interface QueueEntry {
resolve(result: [number, SemaphoreInterface.Releaser]): void;
reject(error: unknown): void;
weight: number;
priority: number;
}

interface Waiter {
resolve(): void;
priority: number;
}

class Semaphore implements SemaphoreInterface {
constructor(private _value: number, private _cancelError: Error = E_CANCELED) {}

acquire(weight = 1): Promise<[number, SemaphoreInterface.Releaser]> {
acquire(weight = 1, priority = 0): Promise<[number, SemaphoreInterface.Releaser]> {
if (weight <= 0) throw new Error(`invalid weight ${weight}: must be positive`);

return new Promise((resolve, reject) => {
if (!this._weightedQueues[weight - 1]) this._weightedQueues[weight - 1] = [];
this._weightedQueues[weight - 1].push({ resolve, reject });

this._dispatch();
const task: QueueEntry = { resolve, reject, weight, priority };
const i = findIndexFromEnd(this._queue, (other) => priority <= other.priority);
if (i === -1 && weight <= this._value) {
// Needs immediate dispatch, skip the queue
this._dispatchItem(task);
} else {
this._queue.splice(i + 1, 0, task);
}
});
}

async runExclusive<T>(callback: SemaphoreInterface.Worker<T>, weight = 1): Promise<T> {
const [value, release] = await this.acquire(weight);
async runExclusive<T>(callback: SemaphoreInterface.Worker<T>, weight = 1, priority = 0): Promise<T> {
const [value, release] = await this.acquire(weight, priority);

try {
return await callback(value);
Expand All @@ -30,15 +46,17 @@ class Semaphore implements SemaphoreInterface {
}
}

waitForUnlock(weight = 1): Promise<void> {
waitForUnlock(weight = 1, priority = 0): Promise<void> {
if (weight <= 0) throw new Error(`invalid weight ${weight}: must be positive`);

return new Promise((resolve) => {
if (!this._weightedWaiters[weight - 1]) this._weightedWaiters[weight - 1] = [];
this._weightedWaiters[weight - 1].push(resolve);

this._dispatch();
});
if (this._couldLockImmediately(weight, priority)) {
return Promise.resolve();
} else {
return new Promise((resolve) => {
if (!this._weightedWaiters[weight - 1]) this._weightedWaiters[weight - 1] = [];
insertSorted(this._weightedWaiters[weight - 1], { resolve, priority });
});
}
}

isLocked(): boolean {
Expand All @@ -51,36 +69,33 @@ class Semaphore implements SemaphoreInterface {

setValue(value: number): void {
this._value = value;
this._dispatch();
this._dispatchQueue();
}

release(weight = 1): void {
if (weight <= 0) throw new Error(`invalid weight ${weight}: must be positive`);

this._value += weight;
this._dispatch();
this._dispatchQueue();
}

cancel(): void {
this._weightedQueues.forEach((queue) => queue.forEach((entry) => entry.reject(this._cancelError)));
this._weightedQueues = [];
this._queue.forEach((entry) => entry.reject(this._cancelError));
this._queue = [];
}

private _dispatch(): void {
for (let weight = this._value; weight > 0; weight--) {
const queueEntry = this._weightedQueues[weight - 1]?.shift();
if (!queueEntry) continue;

const previousValue = this._value;
const previousWeight = weight;

this._value -= weight;
weight = this._value + 1;

queueEntry.resolve([previousValue, this._newReleaser(previousWeight)]);
private _dispatchQueue(): void {
this._drainUnlockWaiters();
while (this._queue.length > 0 && this._queue[0].weight <= this._value) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This loop will stop scanning the queue if the next item has a weight that exceeds the current value, even if there are other items further up in the queue that could be scheduled. You need to keep scanning through the whole queue.

Copy link
Contributor Author

@dmurvihill dmurvihill Feb 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would allow light low-priority items to crowd out heavier high-priority items. The queue could not guarantee eventual completion of a high-priority item.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, you have a point there. I still don't like that a single heavy task can forever block all lower priority tasks, but I don't see a good way out either. Let's leave it like that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Opened a separate pull request with a test to clarify this issue.

this._dispatchItem(this._queue.shift()!);
this._drainUnlockWaiters();
}
}

this._drainUnlockWaiters();
private _dispatchItem(item: QueueEntry): void {
const previousValue = this._value;
this._value -= item.weight;
item.resolve([previousValue, this._newReleaser(item.weight)]);
}

private _newReleaser(weight: number): () => void {
Expand All @@ -95,16 +110,46 @@ class Semaphore implements SemaphoreInterface {
}

private _drainUnlockWaiters(): void {
for (let weight = this._value; weight > 0; weight--) {
if (!this._weightedWaiters[weight - 1]) continue;

this._weightedWaiters[weight - 1].forEach((waiter) => waiter());
this._weightedWaiters[weight - 1] = [];
if (this._queue.length === 0) {
for (let weight = this._value; weight > 0; weight--) {
const waiters = this._weightedWaiters[weight - 1];
if (!waiters) continue;
waiters.forEach((waiter) => waiter.resolve());
this._weightedWaiters[weight - 1] = [];
}
} else {
const queuedPriority = this._queue[0].priority;
for (let weight = this._value; weight > 0; weight--) {
const waiters = this._weightedWaiters[weight - 1];
if (!waiters) continue;
const i = waiters.findIndex((waiter) => waiter.priority <= queuedPriority);
DirtyHairy marked this conversation as resolved.
Show resolved Hide resolved
(i === -1 ? waiters : waiters.splice(0, i))
.forEach((waiter => waiter.resolve()));
}
}
}

private _weightedQueues: Array<Array<QueueEntry>> = [];
private _weightedWaiters: Array<Array<() => void>> = [];
private _couldLockImmediately(weight: number, priority: number) {
return (this._queue.length === 0 || this._queue[0].priority < priority) &&
weight <= this._value;
}

private _queue: Array<QueueEntry> = [];
private _weightedWaiters: Array<Array<Waiter>> = [];
}

function insertSorted<T extends Priority>(a: T[], v: T) {
const i = findIndexFromEnd(a, (other) => v.priority <= other.priority);
a.splice(i + 1, 0, v);
}

function findIndexFromEnd<T>(a: T[], predicate: (e: T) => boolean): number {
for (let i = a.length - 1; i >= 0; i--) {
if (predicate(a[i])) {
return i;
}
}
return -1;
}

export default Semaphore;
6 changes: 3 additions & 3 deletions src/SemaphoreInterface.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
interface SemaphoreInterface {
acquire(weight?: number): Promise<[number, SemaphoreInterface.Releaser]>;
acquire(weight?: number, priority?: number): Promise<[number, SemaphoreInterface.Releaser]>;

runExclusive<T>(callback: SemaphoreInterface.Worker<T>, weight?: number): Promise<T>;
runExclusive<T>(callback: SemaphoreInterface.Worker<T>, weight?: number, priority?: number): Promise<T>;

waitForUnlock(weight?: number): Promise<void>;
waitForUnlock(weight?: number, priority?: number): Promise<void>;

isLocked(): boolean;

Expand Down
41 changes: 32 additions & 9 deletions src/withTimeout.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,14 @@ export function withTimeout(mutex: MutexInterface, timeout: number, timeoutError
export function withTimeout(semaphore: SemaphoreInterface, timeout: number, timeoutError?: Error): SemaphoreInterface;
export function withTimeout(sync: MutexInterface | SemaphoreInterface, timeout: number, timeoutError = E_TIMEOUT): any {
return {
acquire: (weight?: number): Promise<MutexInterface.Releaser | [number, SemaphoreInterface.Releaser]> => {
acquire: (weightOrPriority?: number, priority?: number): Promise<MutexInterface.Releaser | [number, SemaphoreInterface.Releaser]> => {
let weight: number | undefined;
if (isSemaphore(sync)) {
weight = weightOrPriority;
} else {
weight = undefined;
priority = weightOrPriority;
}
if (weight !== undefined && weight <= 0) {
throw new Error(`invalid weight ${weight}: must be positive`);
}
Expand All @@ -21,8 +28,10 @@ export function withTimeout(sync: MutexInterface | SemaphoreInterface, timeout:
}, timeout);

try {
const ticket = await sync.acquire(weight);

const ticket = await (isSemaphore(sync)
? sync.acquire(weight, priority)
: sync.acquire(priority)
);
if (isTimeout) {
const release = Array.isArray(ticket) ? ticket[1] : ticket;

Expand All @@ -41,11 +50,11 @@ export function withTimeout(sync: MutexInterface | SemaphoreInterface, timeout:
});
},

async runExclusive<T>(callback: (value?: number) => Promise<T> | T, weight?: number): Promise<T> {
async runExclusive<T>(callback: (value?: number) => Promise<T> | T, weight?: number, priority?: number): Promise<T> {
let release: () => void = () => undefined;

try {
const ticket = await this.acquire(weight);
const ticket = await this.acquire(weight, priority);

if (Array.isArray(ticket)) {
release = ticket[1];
Expand All @@ -69,16 +78,26 @@ export function withTimeout(sync: MutexInterface | SemaphoreInterface, timeout:
return sync.cancel();
},

waitForUnlock: (weight?: number): Promise<void> => {
waitForUnlock: (weightOrPriority?: number, priority?: number): Promise<void> => {
let weight: number | undefined;
if (isSemaphore(sync)) {
weight = weightOrPriority;
} else {
weight = undefined;
priority = weightOrPriority;
}
if (weight !== undefined && weight <= 0) {
throw new Error(`invalid weight ${weight}: must be positive`);
}

return new Promise((resolve, reject) => {
const handle = setTimeout(() => reject(timeoutError), timeout);
sync.waitForUnlock(weight).then(() => {
clearTimeout(handle);
resolve();
(isSemaphore(sync)
? sync.waitForUnlock(weight, priority)
: sync.waitForUnlock(priority)
).then(() => {
clearTimeout(handle);
resolve();
});
});
},
Expand All @@ -90,3 +109,7 @@ export function withTimeout(sync: MutexInterface | SemaphoreInterface, timeout:
setValue: (value: number) => (sync as SemaphoreInterface).setValue(value),
};
}

function isSemaphore(sync: SemaphoreInterface | MutexInterface): sync is SemaphoreInterface {
return (sync as SemaphoreInterface).getValue !== undefined;
}
Loading
Loading