You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

211 lines
5.5 KiB
TypeScript

import Emittery from "emittery";
import { promisify } from "util";
// Needed so that closures can tell whether the promise has been aborted without relying on 'this'.
class AbortableEmittery extends Emittery {
  /** Becomes `true` once `abort()` has been called; it is never reset. */
  aborted: boolean = false;

  /**
   * Records the abort request on the instance first, then emits the "abort"
   * event. The value returned by `emit` is intentionally not awaited.
   */
  abort(): void {
    this.aborted = true;
    void this.emit("abort");
  }
}
/**
 * A promise-like wrapper around an async generator whose execution can be
 * cancelled from the outside with `abort()`.
 *
 * The generator is driven one step at a time. Before every step the driver
 * checks whether an abort was requested, so each `yield` in the generator is a
 * checkpoint at which the run can stop. An aborted run settles with `null`
 * instead of a value, which is why the class implements `Promise<T | null>`.
 */
export default class AbortablePromise<T> implements Promise<T | null> {
  private readonly emitter: AbortableEmittery;
  private readonly result_promise: Promise<T>;
  readonly [Symbol.toStringTag] = "AbortablePromise";

  /**
   * @param make_generator Factory for the async generator that does the work.
   *   It receives `await_or_abort`, a helper that waits for a promise but
   *   resolves early with `null` (after invoking the abort handler) when this
   *   AbortablePromise is aborted. The value the generator returns becomes the
   *   fulfilment value; an error thrown by it becomes the rejection reason.
   */
  constructor(
    make_generator: (
      await_or_abort: <L>(
        promise: Promise<L> | AbortablePromise<L>,
        on_abort?: () => Promise<unknown> | void
      ) => Promise<L>,
      resolve?: Function
    ) => AsyncGenerator
  ) {
    const emitter = new AbortableEmittery();
    this.emitter = emitter;

    /**
     * Waits for `promise`, or gives up as soon as an abort is requested.
     *
     * @param promise The operation to wait for.
     * @param on_abort Cleanup to run when the wait is cut short. Optional only
     *   when `promise` is an AbortablePromise, in which case it defaults to
     *   aborting that promise.
     * @returns The promise's value, or `null` when the wait was aborted.
     * @throws Error when `on_abort` is missing for a plain promise.
     */
    function await_or_abort<L>(
      promise: Promise<L> | AbortablePromise<L>,
      on_abort?: () => Promise<unknown> | void
    ): Promise<L> {
      let abort_handler = on_abort;
      if (!abort_handler && promise instanceof AbortablePromise) {
        const abortable = promise;
        abort_handler = async () => abortable.abort();
      }
      if (abort_handler === undefined) {
        throw new Error(
          "on_abort is required when the first argument is not AbortablePromise"
        );
      }
      const handler = abort_handler;
      // Both accepted input types are thenables that may deliver `null`.
      const source: PromiseLike<L | null> = promise;

      const outcome = new Promise<L | null>((resolve, reject) => {
        let settled = false;

        const handle_abort = () => {
          if (settled) return;
          settled = true;
          unsubscribe();
          try {
            void handler();
          } catch (err) {
            // A handler that throws synchronously must not leave the waiter hanging.
            reject(err);
            return;
          }
          resolve(null);
        };

        // Keep the unsubscribe function so the listener does not outlive the wait.
        const unsubscribe = emitter.on("abort", handle_abort);

        source.then(
          (result) => {
            if (settled) return;
            settled = true;
            unsubscribe();
            resolve(result);
          },
          (err: unknown) => {
            settled = true;
            unsubscribe();
            reject(err);
          }
        );

        // An abort requested before this call has already emitted its event,
        // so the listener registered above would never fire; honour it now.
        if (emitter.aborted) handle_abort();
      });

      // The declared type stays Promise<L> for backward compatibility with
      // existing generators, even though an aborted wait yields null.
      return outcome as Promise<L>;
    }

    // Drive the generator from a plain async function (not an async Promise
    // executor) so that anything it throws rejects result_promise.
    const run = async (): Promise<T | null> => {
      const generator = make_generator(await_or_abort);
      for (;;) {
        if (emitter.aborted) return null;
        const step = await generator.next();
        if (step.done) return step.value;
      }
    };
    // Stored as Promise<T>; a null result only occurs after an abort.
    this.result_promise = run() as Promise<T>;
  }

  /** Requests cancellation; the run stops at its next checkpoint. */
  abort(): void {
    this.emitter.abort();
  }

  /**
   * Attaches fulfilment and rejection callbacks, like `Promise.prototype.then`.
   *
   * The returned promise settles with the invoked callback's result. Without a
   * matching callback the value or rejection passes through unchanged.
   * Aborting the returned promise also aborts this one.
   */
  then<TResult1 = T, TResult2 = never>(
    onfulfilled?:
      | ((value: T) => TResult1 | PromiseLike<TResult1>)
      | undefined
      | null,
    onrejected?:
      | ((reason: any) => TResult2 | PromiseLike<TResult2>)
      | undefined
      | null
  ): AbortablePromise<TResult1 | TResult2> {
    const self = this;
    return new AbortablePromise<TResult1 | TResult2>(async function* (
      await_or_abort
    ) {
      let value: T;
      try {
        value = await await_or_abort(self.result_promise, () => {
          self.abort();
        });
      } catch (reason) {
        if (!onrejected) throw reason;
        return onrejected(reason);
      }
      // Checkpoint: lets the driver notice an abort before the callback runs.
      yield;
      return onfulfilled ? onfulfilled(value) : value;
    });
  }

  /**
   * Attaches a rejection callback, like `Promise.prototype.catch`. The returned
   * promise carries either the original value or the callback's result.
   */
  catch<TResult = never>(
    onrejected?:
      | ((reason: any) => TResult | PromiseLike<TResult>)
      | undefined
      | null
  ): AbortablePromise<T | TResult> {
    return this.then(undefined, onrejected);
  }

  /**
   * Runs `onfinally` once this promise settles, then passes the original value
   * or rejection through, like `Promise.prototype.finally`.
   */
  finally(onfinally?: (() => void) | undefined | null): AbortablePromise<T> {
    return this.then(
      async (value) => {
        if (onfinally) await onfinally();
        return value;
      },
      async (reason) => {
        if (onfinally) await onfinally();
        throw reason;
      }
    );
  }

  /**
   * Races the given promises and aborts all of them once a winner is accepted.
   *
   * @param promises Contenders. `null` results (aborted contenders) are ignored.
   * @param should_kill_others Decides whether a finished contender's value
   *   wins the race. Defaults to accepting the first value.
   * @returns A promise for the winning value. It rejects when a contender or
   *   `should_kill_others` rejects, and stays pending when no value is ever
   *   accepted. Aborting it aborts every contender.
   */
  static deadlyRace<L>(
    promises: AbortablePromise<L>[],
    should_kill_others: (arg: L) => Promise<boolean> = async () => true
  ): Promise<L> {
    return new AbortablePromise<L>(async function* (await_or_abort) {
      const abort_all = () => {
        for (const promise of promises) {
          promise.abort();
        }
      };
      const winner = new Promise<L>((resolve, reject) => {
        let decided = false;
        const consider = async (arg: L): Promise<void> => {
          if (decided || arg === null) return;
          try {
            const kill = await should_kill_others(arg);
            // Re-check after the await: another contender may have won while
            // this decision was pending, and a "no" must not undo that win.
            if (!kill || decided) return;
            decided = true;
            abort_all();
            resolve(arg);
          } catch (err) {
            reject(err);
          }
        };
        for (const promise of promises) {
          promise.then(consider, reject);
        }
      });
      return await_or_abort(winner, abort_all);
    });
  }
}
/**
 * Promise-returning timer built by promisifying a callback-style wrapper
 * around `setTimeout`. The timer invokes the callback without arguments, so
 * the promise resolves with no value once `timeout` milliseconds have passed.
 */
const sleep = promisify(function (
  timeout: number,
  callback: (...args: any[]) => void
): void {
  setTimeout(callback, timeout);
});
// const a = new AbortablePromise(async function* () {
// yield await sleep(1000);
// console.log("awaited 100");
// yield await sleep(2000);
// console.log("awaited 200");
// yield await sleep(3000);
// console.log("awaited 300");
// });
//
// const a = new AbortablePromise(async function* () {
// yield await sleep(1000);
// console.log("awaited 100");
// yield await sleep(2000);
// console.log("awaited 200");
// yield await sleep(3000);
// console.log("awaited 300");
// });
// function abortableSleep(ms: number): AbortablePromise<void> {
// return new AbortablePromise(async function* () {
// await sleep(ms);
// yield;
// console.log(`Slept ${ms}.`);
// });
// }
// const b = AbortablePromise.deadlyRace([
// abortableSleep(1000),
// abortableSleep(2000),
// abortableSleep(3000),
// ]);
// setTimeout(() => a.abort(), 1500);
// const b = new AbortablePromise(async function* (await_or_abort) {
// const ping = await deferedSpawn("ping", ["8.8.8.8"]);
// while (true) {
// console.log(
// await await_or_abort(ping.waitForNextData(), () => {
// ping.kill();
// })
// );
// yield;
// }
// });
// setTimeout(() => b.abort(), 5000);