From ef671f9b1148114e6ae5d5dc7eae1cec6afa4cdf Mon Sep 17 00:00:00 2001 From: Dimitri Glazkov Date: Wed, 8 Nov 2023 16:06:11 -0800 Subject: [PATCH] [breadboard] Start teaching Breadboard about streams. --- seeds/breadboard/src/index.ts | 1 + seeds/breadboard/src/stream.ts | 48 +++++++++++++++++++++++ seeds/breadboard/src/ui/output.ts | 21 ++++++++++ seeds/breadboard/src/worker/controller.ts | 8 +++- seeds/llm-starter/src/nodes/fetch.ts | 28 +++++++++---- 5 files changed, 97 insertions(+), 9 deletions(-) create mode 100644 seeds/breadboard/src/stream.ts diff --git a/seeds/breadboard/src/index.ts b/seeds/breadboard/src/index.ts index bd79eaa4..e2b15b64 100644 --- a/seeds/breadboard/src/index.ts +++ b/seeds/breadboard/src/index.ts @@ -56,3 +56,4 @@ export { toMermaid } from "./mermaid.js"; export type { Schema } from "jsonschema"; export { callHandler } from "./handler.js"; export { asRuntimeKit } from "./kits/ctors.js"; +export { StreamCapability } from "./stream.js"; diff --git a/seeds/breadboard/src/stream.ts b/seeds/breadboard/src/stream.ts new file mode 100644 index 00000000..02a9127e --- /dev/null +++ b/seeds/breadboard/src/stream.ts @@ -0,0 +1,48 @@ +/** + * @license + * Copyright 2023 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +import type { Capability, NodeValue } from "./types.js"; + +const STREAM_KIND = "stream" as const; + +export interface StreamCapabilityType extends Capability { + kind: typeof STREAM_KIND; + stream: ReadableStream; +} + +export class StreamCapability + implements StreamCapabilityType +{ + kind = STREAM_KIND; + stream: ReadableStream; + + constructor(stream: ReadableStream) { + this.stream = stream; + } +} + +const findStreams = (value: NodeValue, foundStreams: ReadableStream[]) => { + if (Array.isArray(value)) { + value.forEach((item: NodeValue) => { + findStreams(item, foundStreams); + }); + } else if (typeof value === "object") { + const maybeCapability = value as StreamCapabilityType; + if (maybeCapability.kind && maybeCapability.kind === STREAM_KIND) { + foundStreams.push(maybeCapability.stream); + } else { + Object.values(value as object).forEach((item) => { + findStreams(item, foundStreams); + }); + } + } +}; + +export const getStreams = (value: NodeValue) => { + const foundStreams: ReadableStream[] = []; + findStreams(value, foundStreams); + return foundStreams; +}; diff --git a/seeds/breadboard/src/ui/output.ts b/seeds/breadboard/src/ui/output.ts index dcc7861d..340580f2 100644 --- a/seeds/breadboard/src/ui/output.ts +++ b/seeds/breadboard/src/ui/output.ts @@ -5,6 +5,7 @@ */ import { type Schema } from "jsonschema"; +import { StreamCapabilityType } from "../stream.js"; export type OutputArgs = Record & { schema: Schema; @@ -30,9 +31,29 @@ export class Output extends HTMLElement { return; } Object.entries(schema.properties).forEach(([key, property]) => { + if (property.type === "object" && property.format === "stream") { + this.appendStream( + property, + (values[key] as StreamCapabilityType).stream + ); + return; + } const html = document.createElement("pre"); html.innerHTML = `${values[key]}`; root.append(`${property.title}: `, html, "\n"); }); } + + appendStream(property: Schema, stream: ReadableStream) { + const root = this.shadowRoot; + if (!root) return; + root.append(`${property.title}: `); + stream.pipeThrough(new TextDecoderStream()).pipeTo( + new WritableStream({ + write(chunk) { + root.append(chunk); + }, + }) + ); + } } diff --git a/seeds/breadboard/src/worker/controller.ts b/seeds/breadboard/src/worker/controller.ts index 62df18a3..3efcd941 100644 --- a/seeds/breadboard/src/worker/controller.ts +++ b/seeds/breadboard/src/worker/controller.ts @@ -4,6 +4,8 @@ * SPDX-License-Identifier: Apache-2.0 */ +import { getStreams } from "../stream.js"; +import { InputValues } from "../types.js"; import { type ControllerMessage, type RoundTripControllerMessage, @@ -40,11 +42,13 @@ export class WorkerTransport implements MessageControllerTransport { } sendRoundTripMessage(message: T) { - this.worker.postMessage(message); + const streams = getStreams(message.data as InputValues); + this.worker.postMessage(message, streams); } sendMessage(message: T) { - this.worker.postMessage(message); + const streams = getStreams(message.data as InputValues); + this.worker.postMessage(message, streams); } #onMessage(e: MessageEvent) { diff --git a/seeds/llm-starter/src/nodes/fetch.ts b/seeds/llm-starter/src/nodes/fetch.ts index 13960801..deaaed9e 100644 --- a/seeds/llm-starter/src/nodes/fetch.ts +++ b/seeds/llm-starter/src/nodes/fetch.ts @@ -4,10 +4,11 @@ * SPDX-License-Identifier: Apache-2.0 */ -import type { - InputValues, - NodeDescriberFunction, - NodeHandler, +import { + StreamCapability, + type InputValues, + type NodeDescriberFunction, + type NodeHandler, } from "@google-labs/breadboard"; export type FetchOutputs = { @@ -32,9 +33,14 @@ export type FetchInputs = { */ body?: string; /** - * Whether or not to return raw text (as opposed to parsing JSON) + * Whether or not to return raw text (as opposed to parsing JSON). Has no + * effect when `stream` is true. */ raw?: boolean; + /** + * Whether or not to return a stream + */ + stream?: boolean; }; export const fetchDescriber: NodeDescriberFunction = async () => { @@ -98,6 +104,7 @@ export default { body, headers = {}, raw, + stream, } = inputs as FetchInputs; if (!url) throw new Error("Fetch requires `url` input"); const init: RequestInit = { @@ -109,7 +116,14 @@ export default { init.body = JSON.stringify(body); } const data = await fetch(url, init); - const response = raw ? await data.text() : await data.json(); - return { response }; + if (stream) { + if (!data.body) { + throw new Error("Response is not streamable."); + } + return { response: new StreamCapability(data.body) }; + } else { + const response = raw ? await data.text() : await data.json(); + return { response }; + } }, } satisfies NodeHandler;