class OpenAIStreamingFunction(Protocol)
A Protocol that represents a function that can be used with OpenAI Streaming.
openai_schema: The OpenAI Schema for the function.
def openai_streaming_function(func: F) -> OpenAIStreamingFunction
Decorator that creates an OpenAI Schema for your function, while supporting the use of Generators for streaming.
To document your function (so the model will know how to use it), simply use a docstring. Using standard docstring styles will also allow you to document your arguments' descriptions.
Arguments:
func
: The function to convert
Returns:
Your function, with an additional openai_schema attribute
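To illustrate the idea, here is a minimal sketch of what such a schema-generating decorator might do (this is NOT the library's actual implementation): it maps the function's type hints to JSON-schema types, takes the description from the docstring, and attaches the result as an openai_schema attribute.

```python
import inspect
from typing import get_type_hints

# Hypothetical type mapping for the sketch; the real library is more thorough.
_TYPE_MAP = {str: "string", int: "integer", float: "number", bool: "boolean"}

def openai_streaming_function(func):
    # Build a simplified OpenAI-style function schema from hints + docstring.
    hints = get_type_hints(func)
    hints.pop("return", None)
    params = {name: {"type": _TYPE_MAP.get(tp, "string")} for name, tp in hints.items()}
    func.openai_schema = {
        "name": func.__name__,
        "description": inspect.getdoc(func) or "",
        "parameters": {
            "type": "object",
            "properties": params,
            "required": list(params),
        },
    }
    return func

@openai_streaming_function
async def error_message(typ: str, description: str):
    """Send an error message to the user."""

print(error_message.openai_schema["name"])  # error_message
```

In the real library the decorated function would typically be an async generator, so it can stream its output; the sketch above only shows how the schema ends up on the function object.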
async def stream_to_log(
response: Union[Iterator[OAIResponse], AsyncIterator[OAIResponse]]
) -> List[OAIResponse]
A utility function to convert a stream to a log.
Arguments:
response
: The response stream from OpenAI
Returns:
A list of the response stream
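A minimal sketch of how such a utility might look (assumed, not the library's actual source): it drains either a sync or an async iterator into a plain list, so the stream can be replayed later.

```python
import asyncio
from typing import AsyncIterator, Iterator, List, Union

async def stream_to_log(response: Union[Iterator, AsyncIterator]) -> List:
    # Collect every chunk of the stream into a list for later replay.
    log = []
    if hasattr(response, "__aiter__"):
        async for chunk in response:
            log.append(chunk)
    else:
        for chunk in response:
            log.append(chunk)
    return log

log = asyncio.run(stream_to_log(iter(["chunk-1", "chunk-2"])))
print(log)  # ['chunk-1', 'chunk-2']
```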
async def print_stream_log(log: List[OAIResponse])
A utility function to print the log of a stream nicely.
This is useful for debugging, when you first save the stream to a list and then replay it.
Arguments:
log
: The log of the stream, as returned by stream_to_log
class ContentFuncDef()
A class that represents a Content Function definition: function name, and argument name.
class DiffPreprocessor()
Preprocessor that returns only the difference between the current dictionary and the previous one. It is used to convert the parsed JSON stream to a dictionary of the changes, so we can stream the changes to the function calls.
def preprocess(key, current_dict)
Preprocesses the current dictionary by returning only the difference between the current dictionary and the
previous one.
Arguments:
key
: The key of the current dictionary; this is usually the function name
current_dict
: The current dictionary value to preprocess
Returns:
The difference between the current dictionary and the previous one
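A sketch of the diffing idea (an assumed implementation, not the library's source): keep the previously seen dictionary per key and return only the entries whose value changed since the last call.

```python
class DiffPreprocessor:
    def __init__(self):
        # Last-seen dictionary per key (usually the function name).
        self._prev = {}

    def preprocess(self, key, current_dict):
        # Return only the entries that differ from the previous snapshot.
        prev = self._prev.get(key, {})
        diff = {k: v for k, v in current_dict.items() if prev.get(k) != v}
        self._prev[key] = dict(current_dict)
        return diff

pp = DiffPreprocessor()
first = pp.preprocess("my_func", {"a": 1})
second = pp.preprocess("my_func", {"a": 1, "b": 2})
print(first, second)  # {'a': 1} {'b': 2}
```

This lets the parsed JSON stream be forwarded as incremental changes rather than as the full accumulated object on every chunk.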
async def process_response(
response: OAIResponse,
content_func: Optional[Callable[[AsyncGenerator[str, None]],
Awaitable[None]]] = None,
funcs: Optional[List[Callable[[], Awaitable[None]]]] = None,
self: Optional = None) -> Tuple[Set[str], Dict[str, Any]]
Processes an OpenAI response stream and returns a set of function names that were invoked, and a dictionary containing the results of the functions (to be used as part of the message history for the next API request).
Arguments:
response
: The response stream from OpenAI
content_func
: The function to use for the assistant's text message
funcs
: The functions to use when called by the assistant
self
: An optional self argument to pass to the functions
Raises:
ValueError
: If the arguments are invalid
LookupError
: If the response does not contain a delta
Returns:
A tuple of the set of function names that were invoked and a dictionary of the results of the functions
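A toy illustration of the flow described above (the chunk shapes and routing are assumptions, not the real OAIResponse; in the actual API the content function receives an async generator of text rather than individual fragments): text content goes to the content callback, while function-call fragments are dispatched to the matching function and their results collected.

```python
import asyncio

async def fake_stream():
    # Hand-written stand-in for an OpenAI response stream.
    yield {"content": "Hello "}
    yield {"content": "world"}
    yield {"function_call": ("report", {"ok": True})}

async def process_response(response, content_func=None, funcs=None):
    invoked, results = set(), {}
    funcs = {f.__name__: f for f in (funcs or [])}
    async for chunk in response:
        if "content" in chunk and content_func is not None:
            await content_func(chunk["content"])
        elif "function_call" in chunk:
            name, args = chunk["function_call"]
            if name not in funcs:
                raise LookupError(f"unknown function: {name}")
            results[name] = await funcs[name](**args)
            invoked.add(name)
    return invoked, results

async def report(ok: bool):
    return {"status": "ok" if ok else "error"}

parts = []
async def on_content(text):
    parts.append(text)

invoked, results = asyncio.run(process_response(fake_stream(), on_content, [report]))
print(invoked, results)  # {'report'} {'report': {'status': 'ok'}}
```

The returned dictionary is what you would fold back into the message history before the next API request.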
def o_func(func)
Returns the original function from a function that has been wrapped by a decorator (that preserves the original
function in the func attribute).
Arguments:
func
: The wrapped function
async def dispatch_yielded_functions_with_args(
gen: Callable[[], AsyncGenerator[Tuple[str, Dict], None]],
funcs: Union[List[Callable], Dict[str, Callable]],
dict_preprocessor: Optional[Callable[[str, Dict], Dict]],
self: Optional = None) -> Set[str]
Dispatches function calls from a generator that yields function names and arguments to the functions.
Arguments:
gen
: The generator that yields function names and arguments
funcs
: The functions to dispatch to
dict_preprocessor
: A function that takes a function name and a dictionary of arguments and returns a new dictionary of arguments
self
: An optional self argument to pass to the functions
Returns:
A set of function names that were invoked
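A minimal sketch of the dispatch loop (assumed behaviour, omitting the self handling): each (name, args) pair yielded by the generator is optionally run through dict_preprocessor, then routed to the function with that name.

```python
import asyncio

async def dispatch_yielded_functions_with_args(gen, funcs, dict_preprocessor=None):
    # Accept either a list of functions (keyed by __name__) or a dict.
    if not isinstance(funcs, dict):
        funcs = {f.__name__: f for f in funcs}
    invoked = set()
    async for name, args in gen():
        if name not in funcs:
            raise ValueError(f"no function named {name!r}")
        if dict_preprocessor is not None:
            args = dict_preprocessor(name, args)
        await funcs[name](**args)
        invoked.add(name)
    return invoked

greetings = []

async def greet(name: str):
    greetings.append(f"hi {name}")

async def calls():
    yield ("greet", {"name": "ada"})

# Hypothetical preprocessor, standing in for something like DiffPreprocessor.
def capitalize_args(func_name, args):
    return {k: v.capitalize() if isinstance(v, str) else v for k, v in args.items()}

invoked = asyncio.run(dispatch_yielded_functions_with_args(calls, [greet], capitalize_args))
print(invoked, greetings)  # {'greet'} ['hi Ada']
```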
class BaseHandler(Protocol[TModel])
The base handler for the structured response from OpenAI.
def model() -> Type[TModel]
The Pydantic Data Model that we parse
Returns:
type of the Pydantic model
async def handle_partially_parsed(data: TModel) -> Optional[Terminate]
Handle partially parsed model
Arguments:
data
: The partially parsed object
Returns:
None or Terminate if we want to terminate the parsing
async def terminated()
Called when the parsing was terminated
async def process_struct_response(
response: OAIResponse,
handler: BaseHandler,
output_serialization: OutputSerialization = "json"
) -> Tuple[Optional[Union[TModel, Terminate]], Dict[str, Any]]
Process the structured response from OpenAI.
This is useful when we want to parse a structured response from OpenAI in streaming mode. For example: our response contains reasoning and content, but we want to stream only the content to the user.
Arguments:
response
: The response from OpenAI
handler
: The handler for the response. It should be a subclass of BaseHandler
output_serialization
: The output serialization of the response. It should be either "json" or "yaml"
Returns:
A tuple of the last parsed response, and a dictionary containing the OpenAI response
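A toy illustration of the handler contract (assumed shapes: a plain dataclass stands in for the Pydantic model, and the partial parses are hand-written rather than produced from a real OpenAI stream): the handler inspects each partially parsed object and may return Terminate to stop consuming the stream early.

```python
import asyncio
from dataclasses import dataclass, field
from typing import List, Optional

class Terminate:
    # Sentinel signalling that parsing should stop.
    pass

@dataclass
class MathAnswer:
    reasoning: List[str] = field(default_factory=list)
    answer: Optional[int] = None

class AnswerHandler:
    def model(self):
        return MathAnswer

    async def handle_partially_parsed(self, data: MathAnswer) -> Optional[Terminate]:
        # Stop as soon as the answer field has been parsed; the
        # reasoning never needs to be streamed to the user.
        if data.answer is not None:
            return Terminate()

    async def terminated(self):
        print("parsing terminated")

async def demo():
    handler = AnswerHandler()
    # Hand-written stand-ins for progressively richer partial parses.
    partials = [
        MathAnswer(reasoning=["1 + 1"]),
        MathAnswer(reasoning=["1 + 1", "= 2"], answer=2),
    ]
    last = None
    for partial in partials:
        last = partial
        if isinstance(await handler.handle_partially_parsed(partial), Terminate):
            await handler.terminated()
            break
    return last

last = asyncio.run(demo())
print(last.answer)  # 2
```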
class YamlParser(Parser)
Parse partial YAML