From 3b1019782ed332fdfc3e7ee23d0f0bfb497b5f36 Mon Sep 17 00:00:00 2001 From: Kevin Xiao Date: Tue, 17 Oct 2023 23:18:35 +0000 Subject: [PATCH 1/3] Add kit and loader to pythonboard --- seeds/pythonboard/requirements.txt | 3 +- seeds/pythonboard/src/breadboard.py | 70 ++++++++++++++++- seeds/pythonboard/src/kit.py | 34 +++++++++ seeds/pythonboard/src/loader.py | 113 ++++++++++++++++++++++++++++ seeds/pythonboard/src/main.py | 60 ++++++--------- 5 files changed, 238 insertions(+), 42 deletions(-) create mode 100644 seeds/pythonboard/src/kit.py create mode 100644 seeds/pythonboard/src/loader.py diff --git a/seeds/pythonboard/requirements.txt b/seeds/pythonboard/requirements.txt index 0d8b5884..769bceda 100644 --- a/seeds/pythonboard/requirements.txt +++ b/seeds/pythonboard/requirements.txt @@ -1,3 +1,4 @@ javascript aiofiles -dataclasses_json \ No newline at end of file +dataclasses_json +aiohttp \ No newline at end of file diff --git a/seeds/pythonboard/src/breadboard.py b/seeds/pythonboard/src/breadboard.py index 787b1e9a..653b3ca2 100644 --- a/seeds/pythonboard/src/breadboard.py +++ b/seeds/pythonboard/src/breadboard.py @@ -1,3 +1,4 @@ +import os from typing import Any, Dict, List, Optional, Self from traversal.traversal_types import ( Edge, @@ -9,6 +10,8 @@ GraphMetadata, ) #from node import Node +from loader import BoardLoader +from kit import KitLoader from breadboard_types import ( Breadboard, BreadboardSlotSpec, @@ -37,8 +40,6 @@ def addHandler(self, type: str, handler: NodeHandler): @dataclass class Board(GraphDescriptor): - #edges: List[Edge] - #nodes: List[NodeDescriptor] kits: List[Kit] = field(default_factory=list) ctr: List[Kit] = field(default_factory=list) handlers: Dict[str, Any] = field(default_factory=dict) @@ -179,6 +180,71 @@ async def await_js(func, args): result.outputsPromise = outputsPromise + @staticmethod + def _loadGraph(graph: GraphDescriptor): + edges = [] + for edge in graph['edges']: + if 'from' in edge: + edge['previous'] = edge['from'] + edge.pop('from') + if 'to' in edge: + edge['next'] = edge['to'] + edge.pop('to') + if 'in' in edge: + edge['input'] = edge['in'] + edge.pop('in') + edges.append(Edge(**edge)) + edges = edges + nodes = [NodeDescriptor(**node) for node in graph['nodes']] + return edges, nodes + + @staticmethod + async def fromGraphDescriptor(graph: GraphDescriptor) -> Self: + """Creates a new board from JSON. If you have a serialized board, you can + use this method to turn it into into a new Board instance. + + @param graph - the JSON representation of the board. + @returns - a new `Board` instance. + """ + edges, nodes = Board._loadGraph(graph) + breadboard = Board( + edges=edges, + nodes=nodes, + url=graph["url"], + title=graph["title"], + description=graph["description"], + version=graph["version"], + ) + loader = KitLoader(graph["kits"]) + kits = await loader.load() + for kit in kits: + breadboard.addKit(kit) + return breadboard + + @staticmethod + async def load( + url: str, + slotted: Optional[BreadboardSlotSpec] = None, + base: Optional[str] = None, + outerGraph: Optional[GraphDescriptor] = None, + ) -> Self: + """Loads a board from a URL or a file path. + + @param url - the URL or a file path to the board. + @param slots - optional slots to provide to the board. + @returns - a new `Board` instance. + """ + loader = BoardLoader( + url=base or "file://" + os.getcwd() + "/", + graphs=outerGraph.graphs if outerGraph else None, + ) + graph, isSubgraph = await loader.load(url) + board = await Board.fromGraphDescriptor(graph) + if isSubgraph: + board._parent = outerGraph + board._slots = slotted or {} + return board + def handlersFromBoard( self, probe: Optional[EventListener] = None, diff --git a/seeds/pythonboard/src/kit.py b/seeds/pythonboard/src/kit.py new file mode 100644 index 00000000..92769c42 --- /dev/null +++ b/seeds/pythonboard/src/kit.py @@ -0,0 +1,34 @@ +from typing import List, Optional +from traversal.traversal_types import ( + KitDescriptor, NodeHandlers +) +from javascript import require +from breadboard_types import ( + Kit, + KitConstructor, + NodeFactory, +) + +def urlToNpmSpec(url: str) -> str: + if "npm:" not in url: + raise Exception('URL protocol must be "npm:"') + return url[4:] + +class KitLoader(): + _kits: List[KitDescriptor] + def __init__(self, kits: Optional[List[KitDescriptor]] = None): + self._kits = kits or [] + + async def load(self) -> KitConstructor: + kit_constructors = [] + for kit in self._kits: + # TODO: Support `using` property. + url = kit["url"] + # TODO: Support protocols other than `npm:`. + if (url == "."): + return None + spec = urlToNpmSpec(url) + kit_constructor = require(spec) + # TODO: Check to see if this import is actually a Kit class. + kit_constructors.append(kit_constructor) + return kit_constructors \ No newline at end of file diff --git a/seeds/pythonboard/src/loader.py b/seeds/pythonboard/src/loader.py new file mode 100644 index 00000000..925b72b5 --- /dev/null +++ b/seeds/pythonboard/src/loader.py @@ -0,0 +1,113 @@ +from enum import Enum +from dataclasses import dataclass +import json +import aiohttp +from urllib.parse import urlparse +import aiofiles +from typing import Awaitable, Dict, List, Optional, Tuple +from traversal.traversal_types import ( + GraphDescriptor, + SubGraphs, +) + +class BoardLoaderType(Enum): + FILE = "file" + FETCH = "fetch" + HASH = "hash" + UNKNOWN = "unknown" + +BoardLoaders = Dict[ + BoardLoaderType, + Awaitable[str], +] + +@dataclass +class ResolverResult(): + type: BoardLoaderType + location: str + href: str + +def resolveURL ( + base: str, + urlString: str, + results: List[ResolverResult], +) -> bool: + url = base + urlString + url_tuple = urlparse(url) + hash = url_tuple.fragment + href = url + path = url_tuple.netloc + url_tuple.path if url_tuple.scheme == "file" else None + baseWithoutHash = base.replace(urlparse(base).fragment, "") + hrefWithoutHash = href.replace(hash, "") + if baseWithoutHash == hrefWithoutHash and hash: + results.append(ResolverResult(type="hash", location=hash.substring(1), href=href)) + return True + if path: + result = ResolverResult(type="file", location=path, href=href) + elif href: + result = ResolverResult(type="fetch", location=hrefWithoutHash, href=href) + else: + result = ResolverResult(type="unknown", location="", href=href) + results.append(result) + return not hash + +async def loadFromFile(path: str): + async with aiofiles.open(path, 'r') as handle: + data = await handle.read() + return json.loads(data) + +async def loadWithFetch(url: str): + async with aiohttp.ClientSession() as session: + async with session.get(url) as resp: + return await resp.json() + +class BoardLoadingStep(): + loaders: BoardLoaders + graphs: Optional[SubGraphs] = None + + def __init__(self, graphs: Optional[SubGraphs] = None): + async def get_hash(hash: str): + if not graphs: + raise Exception("No sub-graphs to load from") + return graphs[hash] + async def get_unknown(_x): + raise Exception("Unable to determine Board loader type") + self.loaders = { + "file": loadFromFile, + "fetch": loadWithFetch, + "hash": get_hash, + "unknown": get_unknown, + } + + async def load(self, result: ResolverResult) -> GraphDescriptor: + graph = await self.loaders[result.type](result.location) + graph["url"] = result.href + return graph + +class BoardLoader(): + _base: str + _graphs: Optional[SubGraphs] = None + + def __init__(self, url: str, graphs: Optional[SubGraphs] = None): + self._base = url + self._graphs = graphs + + async def load(self, urlString: str) -> Tuple[GraphDescriptor, bool]: + results = [] + base = self._base + while not resolveURL(base, urlString, results): + base = results[results.length - 1].href + graph = None + subgraphs = self._graphs + isSubgraph = True + for result in results: + if result.type == "file" or result.type == "fetch": + isSubgraph = False + step = BoardLoadingStep(subgraphs) + graph = await step.load(result) + subgraphs = graph.get("graphs") + if not graph: + raise Exception( + "BoardLoader failed to load a graph. This error likely indicates a bug in the BoardLoader." + ) + return graph, isSubgraph diff --git a/seeds/pythonboard/src/main.py b/seeds/pythonboard/src/main.py index 207345d9..d10e8d0f 100644 --- a/seeds/pythonboard/src/main.py +++ b/seeds/pythonboard/src/main.py @@ -1,61 +1,43 @@ -from javascript import require, AsyncTask -import sys import asyncio +from javascript import require, AsyncTask import json -from traversal.traversal_types import Edge, NodeDescriptor -import breadboard as breadboard1 - - - -graph_path = sys.argv[1] -with open(graph_path) as f: - graph = json.load(f) -print(graph) -print(type(graph)) -edges = [] -for edge in graph['edges']: - if 'from' in edge: - edge['previous'] = edge['from'] - edge.pop('from') - if 'to' in edge: - edge['next'] = edge['to'] - edge.pop('to') - if 'in' in edge: - edge['input'] = edge['in'] - edge.pop('in') - edges.append(Edge(**edge)) -edges = edges -nodes = [NodeDescriptor(**node) for node in graph['nodes']] - +import sys +from breadboard import Board async def main(): + graph_path = sys.argv[1] + breadboard = await Board.load(graph_path) - breadboard = breadboard1.Board(edges=edges, nodes=nodes) - print(graph['kits']) - for kit in graph['kits']: - print(kit['url'][4:]) - kit_constructor = require(kit['url'][4:]) - breadboard.addKit(kit_constructor) - - print("Running") + print("Let's traverse a graph!") running = True try: - print("Running after try") async for next_step in breadboard.run(None): - print("stepping!") if not running: return res = next_step if res: if res.type == "input": - message = res.inputArguments['message'] if res.inputArguments and res.inputArguments.get('message') else "Enter some text." + # Message can be found in any(inputArgument["schema"]["properties"])["description"] + message = "Enter some text." + if res.inputArguments and res.inputArguments.get("schema", {}).get("properties"): + props = res.inputArguments["schema"]["properties"] + if len(props) > 0: + first_prop = next(iter(props.values())) + message = first_prop.get("description", message) res.inputs = {"text": input(message+ "\n")} elif res.type == "output": if res.outputs and res.outputs['text']: - print(str(res.outputs['text']) + "\n") + for key in res.outputs: + if key == "schema": + continue + title = "" if key == "text" else f"{key}: " + if type(res.outputs[key]) == str: + print(f"{title}{res.outputs[key]}") + else: + print(f"{title}{json.dumps(res.outputs[key])}") else: print(f"All done! {next_step}") running = False From 7ced9cc192369a1e9a2294f2418e9366832e81e1 Mon Sep 17 00:00:00 2001 From: Kevin Xiao Date: Tue, 31 Oct 2023 11:59:56 +0800 Subject: [PATCH 2/3] Support handler invoke/describe for pythonboard --- seeds/pythonboard/src/breadboard.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/seeds/pythonboard/src/breadboard.py b/seeds/pythonboard/src/breadboard.py index 653b3ca2..02478591 100644 --- a/seeds/pythonboard/src/breadboard.py +++ b/seeds/pythonboard/src/breadboard.py @@ -156,7 +156,10 @@ async def run( if type(handler) == javascript.proxy.Proxy: # This can possibly be wrapped in an AsyncTask instead of a trivial coroutine async def await_js(func, args): - res = func(args) + if func.invoke: + res = func.invoke(args) + else: + res = func(args) # Outputs can be a javascript proxy, so convert to dict try: res = {k: res[k] for k in res} From 4bace4392213b71261d8af2cb751c125f2bd003f Mon Sep 17 00:00:00 2001 From: Kevin Xiao Date: Tue, 31 Oct 2023 12:01:32 +0800 Subject: [PATCH 3/3] Re-raise python exceptions in pythonboard for more information --- seeds/pythonboard/src/traversal/iterator.py | 26 ++++++++++++++------- 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/seeds/pythonboard/src/traversal/iterator.py b/seeds/pythonboard/src/traversal/iterator.py index c9c82d4a..cd2b0685 100644 --- a/seeds/pythonboard/src/traversal/iterator.py +++ b/seeds/pythonboard/src/traversal/iterator.py @@ -45,13 +45,22 @@ def _processCompletedNode( if outputs.get("$error") and not any(e.out == "$error" for e in newOpportunities): # If the node threw an exception and it wasn't routed via $error, # throw it again. This will cause the traversal to stop. - raise Exception( - "Uncaught exception in node handler. " + - "Catch by wiring up the $error output.", - { - "cause": outputs["$error"], - } - ) + if isinstance(outputs.get("$error").error, Exception): + raise Exception( + "Uncaught exception in node handler. " + + "Catch by wiring up the $error output.", + { + "cause": outputs["$error"], + } + ) from outputs.get("$error").error + else: + raise Exception( + "Uncaught exception in node handler. " + + "Catch by wiring up the $error output.", + { + "cause": outputs["$error"], + } + ) @staticmethod async def processAllPendingNodes( @@ -89,7 +98,8 @@ async def _promise(): # If not already present, add inputs and descriptor along for # context and to support retries. if "$error" in outputs: - outputs["$error"] = outputs["$error"] | { + error_dict = {k: outputs["$error"][k] for k in outputs["$error"]} + outputs["$error"] = error_dict | { "descriptor": descriptor, "inputs": inputs, }