feat: [server] Add Human in the Loop example with FastAPI integration #630
base: main
Conversation
- Introduced a new example in `main.py` demonstrating the use of LlamaIndexServer with a human in the loop.
- Added a README.md file to explain the example.
- Enhanced chat API to support resuming workflows with human responses.
- Implemented `WorkflowService` for context management in `workflow.py`.
```diff
@@ -129,6 +179,25 @@ async def _text_stream(
         elif hasattr(event, "to_response"):
             event_response = event.to_response()
             yield VercelStreamResponse.convert_data(event_response)
         elif isinstance(event, InputRequiredEvent):
             run_id = handler.workflow_handler.run_id
```
what is this run_id - how is it guaranteed that it's not the same between requests?
A run_id is generated each time we trigger a workflow run, so it is unique for every request.
see: https://github.com/run-llama/llama_index/blob/c09c6ae228e6436a00568ad6b03b582307668b66/llama-index-core/llama_index/core/workflow/workflow.py#L239
That makes it a good key for storing the workflow context.
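A minimal sketch of how such a `WorkflowService` could persist contexts keyed by the run id. The class shape, storage path, and file format below are assumptions for illustration, not the exact code in this PR:

```python
import json
from pathlib import Path

from llama_index.core.workflow import Context, JsonSerializer, Workflow

STORAGE_DIR = Path("storage/contexts")  # assumed location, not taken from this PR


class WorkflowService:
    @staticmethod
    def save_context(run_id: str, ctx: Context) -> None:
        # Serialize the workflow context so a later request can resume it.
        STORAGE_DIR.mkdir(parents=True, exist_ok=True)
        data = ctx.to_dict(serializer=JsonSerializer())
        (STORAGE_DIR / f"{run_id}.json").write_text(json.dumps(data))

    @staticmethod
    def load_context(run_id: str, workflow: Workflow) -> Context:
        # Rebuild the context against the same workflow definition.
        data = json.loads((STORAGE_DIR / f"{run_id}.json").read_text())
        return Context.from_dict(workflow, data, serializer=JsonSerializer())
```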
python/llama-index-server/llama_index/server/api/routers/chat.py (outdated, resolved)
```python
import json
from pathlib import Path

from llama_index.core.workflow import Context, JsonSerializer, Workflow
```
```python
from llama_index.server.services.llamacloud import LlamaCloudFileService


class ResumeRequest(BaseModel):
    chat_id: str
    response: str
```
users might respond with a more complex answer (e.g. an image and text) - in TS we're using MessageContent
```diff
-    response: str
+    response: MessageContent
```
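For illustration, a hedged Python-side sketch of a richer `response` field. The `ContentBlock` model below is an assumption standing in for whatever content type the server ends up adopting (the reviewer's reference is the TS `MessageContent` type):

```python
from typing import List, Literal, Optional, Union

from pydantic import BaseModel


class ContentBlock(BaseModel):
    # Assumed shape for mixed content: plain text and/or an image URL.
    type: Literal["text", "image"]
    text: Optional[str] = None
    url: Optional[str] = None


class ResumeRequest(BaseModel):
    chat_id: str
    # Accept either plain text or a list of content blocks.
    response: Union[str, List[ContentBlock]]
```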
```python
        logger.error(e)
        raise HTTPException(status_code=500, detail=str(e))


@router.post("/resume")
```
to ensure this endpoint works with the UI, it should have the same contract as the chat endpoint.
how about:
```python
class ChatRequest(BaseModel):
    chat_id: Optional[str] = None
    messages: List[ChatAPIMessage]
    data: Optional[Any] = None
    config: Optional[ChatConfig] = ChatConfig()

    @field_validator("messages")
    def validate_messages(
        cls, v: List[ChatAPIMessage], info: "ValidationInfo"
    ) -> List[ChatAPIMessage]:
        if not v:
            raise ValueError("Messages list cannot be empty.")
        chat_id_value = info.data.get("chat_id")
        if chat_id_value is not None:
            if len(v) != 1:
                raise ValueError(
                    "If chat_id is provided, messages list must contain exactly one message."
                )
        if v[-1].role != MessageRole.USER:
            raise ValueError("Last message must be from user.")
        return v
```
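Under that contract, the two request shapes would look roughly like this (values are illustrative only):

```python
# Fresh chat: no chat_id, the full message history is sent.
new_chat_payload = {
    "messages": [
        {"role": "user", "content": "Book me a flight to Tokyo"},
    ],
}

# Resume: chat_id identifies the paused workflow, and the single
# user message carries the human response.
resume_payload = {
    "chat_id": "run-1234",  # illustrative id
    "messages": [
        {"role": "user", "content": "Yes, go ahead"},
    ],
}
```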
```python
                run_id=run_id,
                ctx=ctx,
            )
            yield VercelStreamResponse.convert_data(
```
this shouldn't be in the streaming code, just extend to_response of InputRequiredEvent
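A hedged sketch of that approach: subclass the event so the existing `hasattr(event, "to_response")` branch picks it up. The payload shape and class name are assumptions:

```python
from llama_index.core.workflow import InputRequiredEvent


class UIInputRequiredEvent(InputRequiredEvent):
    """Hypothetical event that knows how to render itself for the frontend."""

    def to_response(self) -> dict:
        # The stream loop already handles events exposing to_response(),
        # so no InputRequiredEvent-specific branch is needed there.
        return {
            "type": "human_input_required",
            "data": {"prefix": self.prefix},
        }
```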
question = "Where is your place now?" | ||
event = await ctx.wait_for_event( | ||
HumanResponseEvent, | ||
waiter_id=question, |
why is the question an id?
Just a simple UI to demonstrate FE integration
ok, keep in mind that we can't add workflow-specific components to the server package (they must be in the dynamic componentsDir), so this is only for testing
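For reference, a hedged sketch of decoupling the waiter id from the question text, wrapped in a minimal workflow so it runs standalone (names are illustrative):

```python
from llama_index.core.workflow import (
    Context,
    HumanResponseEvent,
    InputRequiredEvent,
    StartEvent,
    StopEvent,
    Workflow,
    step,
)


class AskLocationWorkflow(Workflow):
    @step
    async def ask(self, ctx: Context, ev: StartEvent) -> StopEvent:
        # Emit the question to the stream, then wait on a stable waiter id
        # instead of using the question text itself as the id.
        question = "Where is your place now?"
        ctx.write_event_to_stream(InputRequiredEvent(prefix=question))
        response = await ctx.wait_for_event(
            HumanResponseEvent,
            waiter_id="ask_location",  # stable id, independent of the UI copy
        )
        return StopEvent(result=response.response)
```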
```python
class ChatRequest(BaseModel):
    id: str  # provided by FE
```
is it the same as https://ai-sdk.dev/docs/reference/ai-sdk-ui/use-chat#id ? when does it update? e.g. if the user reloads the UI page?
```python
            human_response = last_message.human_response
            if human_response:
                previous_ctx = WorkflowService.load_context(request.id, workflow)
                # send a new human response event then resume the workflow with the previous context
                previous_ctx.send_event(
                    HumanResponseEvent(
                        response=human_response,
                    )
                )
```
try to separate the persistence concern from HITL - if request.id is sent, we should re-create the workflow
```diff
-            human_response = last_message.human_response
-            if human_response:
-                previous_ctx = WorkflowService.load_context(request.id, workflow)
-                # send a new human response event then resume the workflow with the previous context
-                previous_ctx.send_event(
-                    HumanResponseEvent(
-                        response=human_response,
-                    )
-                )
+            if request.id:
+                previous_ctx = WorkflowService.load_context(request.id, workflow)
+                # send a new human response event then resume the workflow with the previous context
```
then we need to run the workflow with a specific event: either the start event as is, or the human response event
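A hedged fragment of what that branching could look like in the chat handler. `request`, `last_message`, `workflow`, and `WorkflowService` mirror names used elsewhere in this PR, and the start kwargs for a fresh run depend on the workflow's own StartEvent:

```python
from llama_index.core.workflow import HumanResponseEvent

# Assumed handler body fragment, not the exact code in this PR.
if request.id and last_message.human_response:
    # Resume: restore the saved context and feed the human response into it.
    ctx = WorkflowService.load_context(request.id, workflow)
    ctx.send_event(HumanResponseEvent(response=last_message.human_response))
    handler = workflow.run(ctx=ctx)
else:
    # Fresh run: start the workflow normally with the latest user message.
    handler = workflow.run(user_msg=last_message.content)
```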