dav: improve pipelining

Signed-off-by: Varun Patil <radialapps@gmail.com>
pull/783/head
Varun Patil 2023-08-17 14:04:25 -07:00
parent 9fe1c02076
commit 6c430d54f9
1 changed files with 34 additions and 5 deletions

View File

@ -150,14 +150,43 @@ async function getFilesInternal(fileIds: number[]): Promise<IFileInfo[]> {
* Run promises in parallel, but only n at a time * Run promises in parallel, but only n at a time
* @param promises Array of promise generator funnction (async functions) * @param promises Array of promise generator funnction (async functions)
* @param n Number of promises to run in parallel * @param n Number of promises to run in parallel
* @details Each promise returned MUST resolve and not throw an error
* @returns Generator of lists of results. Each list is of length n.
*/ */
export async function* runInParallel<T>(promises: (() => Promise<T>)[], n: number) { export async function* runInParallel<T>(promises: (() => Promise<T>)[], n: number) {
while (promises.length > 0) { promises.reverse(); // reverse so we can use pop() efficiently
const promisesToRun = promises.splice(0, n);
const resultsForThisBatch = await Promise.all(promisesToRun.map((p) => p())); const results: T[] = [];
yield resultsForThisBatch; const running: Promise<void>[] = [];
while (true) {
// add one promise per iteration
if (promises.length) {
let task!: Promise<void>;
running.push(
(task = (async () => {
// run the promise
results.push(await promises.pop()!());
// remove the promise from the running list
running.splice(running.indexOf(task), 1);
})())
);
}
// wait for one of the promises to finish
if (running.length >= n || !promises.length) {
await Promise.race(running);
}
// yield the results if the threshold is reached
if (results.length >= n || !running.length) {
yield results.splice(0, results.length);
}
// stop when all promises are done
if (!running.length) break;
} }
return;
} }
/** /**