Advanced TypeScript for Harness Builders
15. Generic Classes with `AsyncIterable`
Build a push-based event stream that works with for await...of.
Building a push-based event stream that implements AsyncIterable for for await...of consumption.
File: packages/ai/src/utils/event-stream.ts L4-66
export class EventStream<T, R = T> implements AsyncIterable<T> {
private queue: T[] = [];
private waiting: ((value: IteratorResult<T>) => void)[] = [];
private done = false;
private finalResultPromise: Promise<R>;
private resolveFinalResult!: (result: R) => void;
constructor(
private isComplete: (event: T) => boolean,
private extractResult: (event: T) => R
) {
/* ... */
}
push(event: T): void {
/* ... */
}
async *[Symbol.asyncIterator](): AsyncIterator<T> {
while (true) {
if (this.queue.length > 0) {
yield this.queue.shift()!;
} else if (this.done) {
return;
} else {
const result = await new Promise<IteratorResult<T>>((resolve) => this.waiting.push(resolve));
if (result.done) return;
yield result.value;
}
}
}
result(): Promise<R> {
return this.finalResultPromise;
}
}
Key patterns:
- Two type parameters:
Tfor events,Rfor the final result -
Symbol.asyncIterator: makes the class work withfor await...of -
async *: generator syntax inside a class method - Non-null assertion
!:this.queue.shift()!when we’ve checkedlength > 0
Specialized subclass:
File: packages/ai/src/utils/event-stream.ts L68-82
export class AssistantMessageEventStream extends EventStream<AssistantMessageEvent, AssistantMessage> {
constructor() {
super(
(event) => event.type === "done" || event.type === "error",
(event) => {
if (event.type === "done") return event.message;
else if (event.type === "error") return event.error;
throw new Error("Unexpected event type for final result");
}
);
}
}