Skip to content

Commit

Permalink
feat: allow batching by bucket (#31)
Browse files Browse the repository at this point in the history
  • Loading branch information
andogq committed Jul 5, 2024
2 parents 338e1fc + 2370529 commit 4cb1473
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 18 deletions.
5 changes: 5 additions & 0 deletions .changeset/olive-jokes-admire.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"windpipe": patch
---

feat: allow batching by bucket
77 changes: 59 additions & 18 deletions src/stream/transforms.ts
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,7 @@ export class StreamTransforms<T, E> extends StreamConsumption<T, E> {
timeout?: number;
yieldRemaining?: boolean;
yieldEmpty?: boolean;
byBucket?: (value: T) => string | number;
}): Stream<T[], E> {
return this.consume(async function* (it) {
const atoms = it[Symbol.asyncIterator]();
Expand All @@ -439,12 +440,15 @@ export class StreamTransforms<T, E> extends StreamConsumption<T, E> {
let nextAtom = atoms.next();
let heartbeat = newHeartbeat();

const batch: T[] = [];
const batches: Record<string | number, T[]> = {};
let totalBatchSize = 0;

const batchFilter = options?.byBucket ?? (() => "default");

let end = false;

while (!end) {
if (batch.length > 1000) {
if (totalBatchSize > 1000) {
// Producer may not be I/O bound, yield to the event loop to give other things a
// chance to run.
await new Promise((resolve) => {
Expand All @@ -461,7 +465,24 @@ export class StreamTransforms<T, E> extends StreamConsumption<T, E> {

if (atom) {
if (isOk(atom)) {
const key = batchFilter(atom.value);

let batch = batches[key];

// Add the bucket if it doesn't exist
if (!batch) {
batch = batches[key] = [];
}

batch.push(atom.value);
totalBatchSize += 1;

// Batch was modified, see if it's ready to emit
if (options?.n && batch.length >= options.n) {
yield ok<T[], E>(batch.splice(0, options.n));

totalBatchSize -= options.n;
}
} else {
// Immediately yield any errors
yield atom;
Expand All @@ -477,34 +498,54 @@ export class StreamTransforms<T, E> extends StreamConsumption<T, E> {
}
}

if (
(result === "timeout" &&
"timeout" in options &&
(batch.length > 0 || options.yieldEmpty)) ||
(options?.n && batch.length >= options.n)
) {
const end = ("n" in options && options?.n) || batch.length;
yield ok<T[], E>(batch.splice(0, end));
if (result === "timeout" && "timeout" in options) {
if (totalBatchSize > 0) {
// Work out which batches are ready
const ready = Object.values(batches).filter(
(batch) => batch.length >= (options?.n ?? 1),
);

for (const batch of ready) {
const items = batch.splice(0, options?.n ?? -1);
yield ok<T[], E>(items);
totalBatchSize -= items.length;
}
} else if (totalBatchSize === 0 && options?.yieldEmpty) {
yield ok([]);
}
}

if (result === "timeout") {
heartbeat = newHeartbeat();
}
}

if ("timeout" in options && (batch.length > 0 || options.yieldEmpty)) {
if ("timeout" in options && (totalBatchSize > 0 || options.yieldEmpty)) {
// Wait for heartbeat to finish
await heartbeat;

// Yield the rest of the batch
yield ok(batch);
} else if (options?.n) {
while (batch.length >= options.n) {
yield ok(batch.splice(0, options.n));
if (totalBatchSize > 0) {
// Yield the rest of the batches
for (const batch of Object.values(batches)) {
if (batch.length === 0) {
continue;
}

yield ok(batch);
}
} else if (options.yieldEmpty) {
// Yield the final empty batch
yield ok([]);
}
} else if (options?.n && totalBatchSize > 0) {
for (const batch of Object.values(batches)) {
while (batch.length >= options.n) {
yield ok(batch.splice(0, options.n));
}

if (batch.length > 0 && options.yieldRemaining) {
yield ok(batch);
if (batch.length > 0 && options.yieldRemaining) {
yield ok(batch);
}
}
}
});
Expand Down
13 changes: 13 additions & 0 deletions test/transforms.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -416,5 +416,18 @@ describe("stream transforms", () => {
expect(mapper).toHaveBeenCalledTimes(2);
expect(mapper).toHaveBeenNthCalledWith(2, [4, 5]);
});

test("with bucket", async ({ expect }) => {
expect.assertions(1);

const s = await $.from([1, 2, 3, 4, 5, 6])
.batch({ n: 2, byBucket: (n) => (n % 2 === 0 ? "even" : "odd") })
.toArray();

expect(s).toEqual([
[1, 3],
[2, 4],
]);
});
});
});

0 comments on commit 4cb1473

Please sign in to comment.