Skip to content

Commit

Permalink
[breadboard] Start teaching Breadboard about streams.
Browse files Browse the repository at this point in the history
  • Loading branch information
dglazkov committed Nov 9, 2023
1 parent 4690878 commit ef671f9
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 9 deletions.
1 change: 1 addition & 0 deletions seeds/breadboard/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,4 @@ export { toMermaid } from "./mermaid.js";
export type { Schema } from "jsonschema";
export { callHandler } from "./handler.js";
export { asRuntimeKit } from "./kits/ctors.js";
export { StreamCapability } from "./stream.js";
48 changes: 48 additions & 0 deletions seeds/breadboard/src/stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/**
* @license
* Copyright 2023 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/

import type { Capability, NodeValue } from "./types.js";

const STREAM_KIND = "stream" as const;

export interface StreamCapabilityType<ChunkType = object> extends Capability {
kind: typeof STREAM_KIND;
stream: ReadableStream<ChunkType>;
}

export class StreamCapability<ChunkType>
implements StreamCapabilityType<ChunkType>
{
kind = STREAM_KIND;
stream: ReadableStream<ChunkType>;

constructor(stream: ReadableStream<ChunkType>) {
this.stream = stream;
}
}

const findStreams = (value: NodeValue, foundStreams: ReadableStream[]) => {
if (Array.isArray(value)) {
value.forEach((item: NodeValue) => {
findStreams(item, foundStreams);
});
} else if (typeof value === "object") {
const maybeCapability = value as StreamCapabilityType;
if (maybeCapability.kind && maybeCapability.kind === STREAM_KIND) {
foundStreams.push(maybeCapability.stream);
} else {
Object.values(value as object).forEach((item) => {
findStreams(item, foundStreams);
});
}
}
};

export const getStreams = (value: NodeValue) => {
const foundStreams: ReadableStream[] = [];
findStreams(value, foundStreams);
return foundStreams;
};
21 changes: 21 additions & 0 deletions seeds/breadboard/src/ui/output.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/

import { type Schema } from "jsonschema";
import { StreamCapabilityType } from "../stream.js";

export type OutputArgs = Record<string, unknown> & {
schema: Schema;
Expand All @@ -30,9 +31,29 @@ export class Output extends HTMLElement {
return;
}
Object.entries(schema.properties).forEach(([key, property]) => {
if (property.type === "object" && property.format === "stream") {
this.appendStream(
property,
(values[key] as StreamCapabilityType).stream
);
return;
}
const html = document.createElement("pre");
html.innerHTML = `${values[key]}`;
root.append(`${property.title}: `, html, "\n");
});
}

appendStream(property: Schema, stream: ReadableStream) {
const root = this.shadowRoot;
if (!root) return;
root.append(`${property.title}: `);
stream.pipeThrough(new TextDecoderStream()).pipeTo(
new WritableStream({
write(chunk) {
root.append(chunk);
},
})
);
}
}
8 changes: 6 additions & 2 deletions seeds/breadboard/src/worker/controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
* SPDX-License-Identifier: Apache-2.0
*/

import { getStreams } from "../stream.js";
import { InputValues } from "../types.js";
import {
type ControllerMessage,
type RoundTripControllerMessage,
Expand Down Expand Up @@ -40,11 +42,13 @@ export class WorkerTransport implements MessageControllerTransport {
}

sendRoundTripMessage<T extends RoundTripControllerMessage>(message: T) {
this.worker.postMessage(message);
const streams = getStreams(message.data as InputValues);
this.worker.postMessage(message, streams);
}

sendMessage<T extends ControllerMessage>(message: T) {
this.worker.postMessage(message);
const streams = getStreams(message.data as InputValues);
this.worker.postMessage(message, streams);
}

#onMessage(e: MessageEvent) {
Expand Down
28 changes: 21 additions & 7 deletions seeds/llm-starter/src/nodes/fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
* SPDX-License-Identifier: Apache-2.0
*/

import type {
InputValues,
NodeDescriberFunction,
NodeHandler,
import {
StreamCapability,
type InputValues,
type NodeDescriberFunction,
type NodeHandler,
} from "@google-labs/breadboard";

export type FetchOutputs = {
Expand All @@ -32,9 +33,14 @@ export type FetchInputs = {
*/
body?: string;
/**
* Whether or not to return raw text (as opposed to parsing JSON)
* Whether or not to return raw text (as opposed to parsing JSON). Has no
* effect when `stream` is true.
*/
raw?: boolean;
/**
* Whether or not to return a stream
*/
stream?: boolean;
};

export const fetchDescriber: NodeDescriberFunction = async () => {
Expand Down Expand Up @@ -98,6 +104,7 @@ export default {
body,
headers = {},
raw,
stream,
} = inputs as FetchInputs;
if (!url) throw new Error("Fetch requires `url` input");
const init: RequestInit = {
Expand All @@ -109,7 +116,14 @@ export default {
init.body = JSON.stringify(body);
}
const data = await fetch(url, init);
const response = raw ? await data.text() : await data.json();
return { response };
if (stream) {
if (!data.body) {
throw new Error("Response is not streamable.");
}
return { response: new StreamCapability(data.body) };
} else {
const response = raw ? await data.text() : await data.json();
return { response };
}
},
} satisfies NodeHandler;

0 comments on commit ef671f9

Please sign in to comment.