diff --git a/examples/video_understanding/agent/video_preprocessor/webpage_vp.py b/examples/video_understanding/agent/video_preprocessor/webpage_vp.py
index ff70d96d..cbf81b67 100644
--- a/examples/video_understanding/agent/video_preprocessor/webpage_vp.py
+++ b/examples/video_understanding/agent/video_preprocessor/webpage_vp.py
@@ -1,4 +1,5 @@
 import hashlib
+import logging
 import pickle
 import time
 from pathlib import Path
@@ -42,7 +43,7 @@ class WebpageVideoPreprocessor(BaseLLMBackend, BaseWorker):
     kernel_size: Optional[int] = None
     show_progress: bool = True
 
-    use_cache: bool = False
+    use_cache: bool = True
     cache_dir: str = "./video_cache"
 
     @field_validator("stt", mode="before")
@@ -87,7 +88,7 @@ def _run(self, video_path: str, *args, **kwargs):
 
         cache_path = (
             Path(self.cache_dir)
-            .joinpath(video_path.replace("/", "-"))
+            .joinpath(Path(video_path).name.replace("/", "-"))
             .joinpath("video_cache.pkl")
         )
         # Load video from cache if available
@@ -174,7 +175,8 @@ def _run(self, video_path: str, *args, **kwargs):
                     pickle.dump(video.scenes, f)
 
         # Process video if not loaded from cache
-        if not self.stm(self.workflow_instance_id).get("video", None):
+        # if not self.stm(self.workflow_instance_id).get("video", None):
+        if not cache_path.exists():
            video = VideoScenes.load(
                video_path=video_path,
                threshold=self.scene_detect_threshold,
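
Note on the cache-path change above: the old key flattened the whole path (video_path.replace("/", "-")), while the new key keeps only the basename, so two different videos that share a filename now collide on the same cache entry. Also note that use_cache now defaults to True, so existing deployments start writing to ./video_cache without opting in. A collision-safe variant could mix the full path into the key -- an illustrative sketch only, not part of this patch (hashlib is already imported at the top of webpage_vp.py):

    import hashlib
    from pathlib import Path

    def cache_path_for(cache_dir: str, video_path: str) -> Path:
        # Hash the full path so same-named files in different directories
        # map to distinct cache directories.
        digest = hashlib.md5(video_path.encode("utf-8")).hexdigest()[:12]
        return Path(cache_dir) / f"{Path(video_path).stem}-{digest}" / "video_cache.pkl"
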
diff --git a/omagent-core/src/omagent_core/clients/devices/app/schemas.py b/omagent-core/src/omagent_core/clients/devices/app/schemas.py
index 111442b2..128838fc 100644
--- a/omagent-core/src/omagent_core/clients/devices/app/schemas.py
+++ b/omagent-core/src/omagent_core/clients/devices/app/schemas.py
@@ -77,3 +77,6 @@ class MessageType(str, Enum):
     TEXT = "text"
     IMAGE_URL = "image_url"
     IMAGE_BASE64 = "image_base64"
+    VIDEO_URL = "video_url"
+    VIDEO_PATH = "video_path"
+    
\ No newline at end of file
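
The two new members let callbacks tag video results alongside the existing text/image types; client.py below branches on them when rendering the chat history. A minimal consumer sketch (the item shape mirrors the message_item dicts handled in client.py):

    from omagent_core.clients.devices.app.schemas import MessageType

    def is_video_item(item: dict) -> bool:
        # Matches the {"type": ..., "content": ...} items consumed in client.py.
        return item.get("type") in (MessageType.VIDEO_URL.value, MessageType.VIDEO_PATH.value)
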
diff --git a/omagent-core/src/omagent_core/clients/devices/webpage/client.py b/omagent-core/src/omagent_core/clients/devices/webpage/client.py
index 8afa07e6..9948be13 100644
--- a/omagent-core/src/omagent_core/clients/devices/webpage/client.py
+++ b/omagent-core/src/omagent_core/clients/devices/webpage/client.py
@@ -1,8 +1,21 @@
 import html
 import json
+import os
+import shutil
+import tempfile
+import uuid
+from pathlib import Path
 from time import sleep
+from urllib.parse import unquote_plus, urlparse
 
-import gradio as gr
+import requests
+from PIL import Image
+
+os.environ['GRADIO_TEMP_DIR'] = os.getcwd()
+video_root_path = os.path.join(os.getcwd(), 'video_root')
+callback_root_path = os.path.join(os.getcwd(), 'callback_root')
+os.makedirs(video_root_path, exist_ok=True)
+os.makedirs(callback_root_path, exist_ok=True)
 from omagent_core.clients.devices.app.callback import AppCallback
 from omagent_core.clients.devices.app.input import AppInput
 from omagent_core.clients.devices.app.schemas import ContentStatus, MessageType
@@ -22,14 +35,16 @@
 container.register_callback(callback=AppCallback)
 container.register_input(input=AppInput)
 
+import gradio as gr
+
 
 class WebpageClient:
     def __init__(
-        self,
-        interactor: ConductorWorkflow = None,
-        processor: ConductorWorkflow = None,
-        config_path: str = "./config",
-        workers: list = [],
+            self,
+            interactor: ConductorWorkflow = None,
+            processor: ConductorWorkflow = None,
+            config_path: str = "./config",
+            workers: list = [],
     ) -> None:
         self._interactor = interactor
         self._processor = processor
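
The import block now configures Gradio's temp directory and the two working directories before gradio itself is imported (the import is deferred until after container registration), presumably because some Gradio versions read GRADIO_TEMP_DIR when the module is first imported. The pattern in isolation, with an illustrative directory name:

    import os

    # Set the override before Gradio is imported, so it takes effect even on
    # versions that snapshot the variable at import time.
    os.environ['GRADIO_TEMP_DIR'] = os.path.join(os.getcwd(), 'gradio_tmp')

    import gradio as gr  # noqa: E402 -- deliberately after the environment setup
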
@@ -45,7 +60,7 @@ def __init__(
                 max-height: calc(100vh - 190px) !important;
                 overflow-y: auto;
             }
-            
+
             .running-message {
                 margin: 0;
                 padding: 2px 4px;
@@ -54,16 +69,6 @@ def __init__(
                 font-family: inherit;
             }
 
-            .error-message {
-                background-color: #f8d7da;
-                color: #721c24;
-                margin: 0;
-                padding: 2px 4px;
-                white-space: pre-wrap;
-                word-wrap: break-word;
-                font-family: inherit;
-            }
-
             /* Remove the background and border of the message box */
             .message-wrap {
                 background: none !important;
@@ -71,7 +76,7 @@ def __init__(
                 padding: 0 !important;
                 margin: 0 !important;
             }
-            
+
             /* Remove the bubble style of the running message */
             .message:has(.running-message) {
                 background: none !important;
@@ -79,46 +84,132 @@ def __init__(
                 padding: 0 !important;
                 box-shadow: none !important;
             }
-            
-            .message:has(.error-message) {
-                background: none !important;
-                border: none !important;
-                padding: 0 !important;
-                box-shadow: none !important;
-            }
         """
-        
+
+    def gradio_app(self):
+
+        with gr.Blocks() as demo:
+
+            def load_local_video() -> dict:
+                result = {}
+                for root, _, files in os.walk(video_root_path):
+                    for file in filter(lambda x: x.split('.')[-1].lower() in (
+                            'mp4', 'avi', 'mov', 'wmv', 'flv', 'mkv', 'webm', 'm4v'), files):
+                        file_obs_path = os.path.join(root, file)
+                        result[Path(file_obs_path).name] = file_obs_path
+                return result
+
+            video_dict = load_local_video()
+            current_video = None
+            state = gr.State(value={
+                'video_dict': video_dict,
+                'current_video': current_video
+            })
+            with gr.Row():
+                with gr.Column():
+                    with gr.Column():
+                        def display_video_map(video_title):
+                            # change the displayed video
+                            video_path = state.value.get('video_dict', {}).get(video_title)
+                            processor_workflow_instance_id = self._processor.start_workflow_with_input(
+                                workflow_input={'video_path': video_path})
+                            while True:
+                                status = self._processor.get_workflow(
+                                    workflow_id=processor_workflow_instance_id).status
+                                if status in terminal_status:
+                                    break
+                                sleep(1)
+
+                            meta = self._processor.get_workflow(workflow_id=processor_workflow_instance_id)
+                            state.value['video_dict'] = load_local_video()
+                            state.value.update(current_video=video_path)
+                            state.value.update(processor_result=meta.output)
+                            state.value.update(processor_workflow_instance_id=processor_workflow_instance_id)
+
+                            return video_path, state
+
+                        def update_video_map(video_path: str):
+                            # move the uploaded video, update state, display it
+                            target_video_path = os.path.join(video_root_path, video_path.split(os.sep)[-1])
+                            shutil.move(video_path, target_video_path)
+                            shutil.rmtree(os.path.join(os.getcwd(), video_path.split(os.sep)[-2]), ignore_errors=True)
+
+                            processor_workflow_instance_id = self._processor.start_workflow_with_input(
+                                workflow_input={'video_path': target_video_path})
+                            while True:
+                                status = self._processor.get_workflow(
+                                    workflow_id=processor_workflow_instance_id).status
+                                if status in terminal_status:
+                                    break
+                                sleep(1)
+
+                            meta = self._processor.get_workflow(workflow_id=processor_workflow_instance_id)
+                            state.value['video_dict'] = load_local_video()
+                            state.value.update(current_video=target_video_path)
+                            state.value.update(processor_result=meta.output)
+                            state.value.update(processor_workflow_instance_id=processor_workflow_instance_id)
+                            return target_video_path, state, gr.Dropdown(state.value.get('video_dict', {}).keys())
+
+                        select_video = gr.Dropdown(
+                            state.value['video_dict'].keys(),
+                            value=None
+                        )
+                        upload_video = gr.UploadButton(
+                            file_types=['video']
+                        )
+                        display_video = gr.Video(
+                            state.value['current_video'],
+                        )
+                        upload_video.upload(
+                            fn=update_video_map,
+                            inputs=[upload_video],
+                            outputs=[display_video, state, select_video]
+                        )
+                        select_video.change(
+                            fn=display_video_map,
+                            inputs=[select_video],
+                            outputs=[display_video, state]
+                        )
+
+                with gr.Column():
+                    with gr.Blocks(title="OmAgent"):
+                        chatbot = gr.Chatbot(
+                            type="messages",
+                        )
+
+                        chat_input = gr.MultimodalTextbox(
+                            interactive=True,
+                            file_count="multiple",
+                            placeholder="Enter message...",
+                            show_label=False,
+                        )
+
+                        chat_msg = chat_input.submit(
+                            self.add_message,
+                            [chatbot, chat_input, state],
+                            [chatbot, chat_input]
+                        )
+                        bot_msg = chat_msg.then(
+                            self.bot, [chatbot, state], chatbot, api_name="bot_response"
+                        )
+                        bot_msg.then(
+                            lambda: gr.MultimodalTextbox(interactive=True), None, [chat_input]
+                        )
+
+        demo.launch(
+            max_file_size='1gb',
+            # server_name='0.0.0.0',
+            server_port=7860,
+            share=True,
+        )
+
     def start_interactor(self):
         self._task_handler_interactor = TaskHandler(
             worker_config=self._worker_config, workers=self._workers, task_to_domain=self._task_to_domain
         )
         self._task_handler_interactor.start_processes()
         try:
-            with gr.Blocks(title="OmAgent", css=self._custom_css) as chat_interface:
-                chatbot = gr.Chatbot(
-                    elem_id="OmAgent",
-                    bubble_full_width=False,
-                    type="messages",
-                    height="100%",
-                )
-
-                chat_input = gr.MultimodalTextbox(
-                    interactive=True,
-                    file_count="multiple",
-                    placeholder="Enter message or upload file...",
-                    show_label=False,
-                )
-
-                chat_msg = chat_input.submit(
-                    self.add_message, [chatbot, chat_input], [chatbot, chat_input]
-                )
-                bot_msg = chat_msg.then(
-                    self.bot, chatbot, chatbot, api_name="bot_response"
-                )
-                bot_msg.then(
-                    lambda: gr.MultimodalTextbox(interactive=True), None, [chat_input]
-                )
-                chat_interface.launch()
+            self.gradio_app()
         except KeyboardInterrupt:
             logging.info("\nDetected Ctrl+C, stopping workflow...")
             if self._workflow_instance_id is not None:
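
display_video_map and update_video_map repeat the same poll-until-terminal loop. A shared helper would keep the two paths in sync (sketch only; sleep and terminal_status are already imported by client.py, and the helper name is invented here):

    def wait_for_workflow(processor, workflow_instance_id, interval: float = 1.0):
        # Poll the Conductor workflow until it reaches a terminal status and
        # return the final workflow object, whose .output both handlers read.
        while True:
            workflow = processor.get_workflow(workflow_id=workflow_instance_id)
            if workflow.status in terminal_status:
                return workflow
            sleep(interval)
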
question, "type": "text"}) + for idx, x in enumerate(message["files"]): + # history.append({"role": "user", "content": {"path": x}}) + contents.append( + {"type": "image_url", "resource_id": str(idx), "data": str(x)} + ) result = { "agent_id": self._workflow_instance_id, "messages": [{"role": "user", "content": contents}], @@ -197,7 +351,7 @@ def add_message(self, history, message): {"payload": json.dumps(result, ensure_ascii=False)}, ) return history, gr.MultimodalTextbox(value=None, interactive=False) - + def add_processor_message(self, history, message): if self._workflow_instance_id is None: self._workflow_instance_id = self._processor.start_workflow_with_input( @@ -214,14 +368,24 @@ def add_processor_message(self, history, message): f"image_process", {"payload": json.dumps(result, ensure_ascii=False)} ) return history, gr.MultimodalTextbox(value=None, interactive=False) - - def bot(self, history: list): + + def bot(self, history, state): + if isinstance(state, gr.State): + if state.value.get('current_video') is None: + yield history + return + else: + if state.get('current_video') is None: + yield history + return + stream_name = f"{self._workflow_instance_id}_output" consumer_name = f"{self._workflow_instance_id}_agent" # consumer name group_name = "omappagent" # replace with your consumer group name running_stream_name = f"{self._workflow_instance_id}_running" self._check_redis_stream_exist(stream_name, group_name) self._check_redis_stream_exist(running_stream_name, group_name) + while True: # read running stream running_messages = self._get_redis_stream_message( @@ -239,7 +403,7 @@ def bot(self, history: list): ) history.append({"role": "assistant", "content": formatted_message}) yield history - + container.get_connector("redis_stream_client")._client.xack( running_stream_name, group_name, message_id ) @@ -248,7 +412,7 @@ def bot(self, history: list): group_name, consumer_name, stream_name ) finish_flag = False - + for stream, message_list in messages: for message_id, message in message_list: incomplete_flag = False @@ -266,10 +430,30 @@ def bot(self, history: list): "content": {"path": message_item["content"]}, } ) + elif message_item["type"] == MessageType.VIDEO_URL.value: + filename = os.path.basename(urlparse(unquote_plus(message_item["content"])).path) + video_file = os.path.join(callback_root_path, filename) + res = requests.get(message_item["content"], stream=True) + with open(video_file, 'wb') as f: + for chunk in res.iter_content(chunk_size=102400): + f.write(chunk) + history.append( + { + "role": "assistant", + "content": gr.Video(video_file), + } + ) + elif message_item["type"] == MessageType.VIDEO_PATH.value: + history.append( + { + "role": "assistant", + "content": gr.Video(message_item["content"]), + } + ) else: if incomplete_flag: self._incomplete_message = ( - self._incomplete_message + message_item["content"] + self._incomplete_message + message_item["content"] ) if history and history[-1]["role"] == "assistant": history[-1]["content"] = self._incomplete_message @@ -283,7 +467,7 @@ def bot(self, history: list): else: if self._incomplete_message != "": self._incomplete_message = ( - self._incomplete_message + message_item["content"] + self._incomplete_message + message_item["content"] ) if history and history[-1]["role"] == "assistant": history[-1]["content"] = self._incomplete_message @@ -307,35 +491,35 @@ def bot(self, history: list): f'
{payload_data["error_info"]}
                                    )
                                    history.append(
-                                        {
-                                            "role": "assistant",
-                                            "content": formatted_message,
-                                        }
-                                    )
+                                            {
+                                                "role": "assistant",
+                                                "content": formatted_message,
+                                            }
+                                        )
                                    yield history
-                    
+
                    container.get_connector("redis_stream_client")._client.xack(
                        stream_name, group_name, message_id
                    )
-                    
+
                    # check finish flag
                    if (
-                        "interaction_type" in payload_data
-                        and payload_data["interaction_type"] == 1
+                            "interaction_type" in payload_data
+                            and payload_data["interaction_type"] == 1
                    ):
                        finish_flag = True
                    if (
-                        "content_status" in payload_data
-                        and payload_data["content_status"]
-                        == ContentStatus.END_ANSWER.value
+                            "content_status" in payload_data
+                            and payload_data["content_status"]
+                            == ContentStatus.END_ANSWER.value
                    ):
                        self._workflow_instance_id = None
                        finish_flag = True
-            
+
            if finish_flag:
                break
            sleep(0.01)
-    
+
    def processor_bot(self, history: list):
        history.append({"role": "assistant", "content": f"processing..."})
        yield history
@@ -349,9 +495,9 @@
                self._workflow_instance_id = None
                break
            sleep(0.01)
-    
+
    def _get_redis_stream_message(
-        self, group_name: str, consumer_name: str, stream_name: str
+            self, group_name: str, consumer_name: str, stream_name: str
    ):
        messages = container.get_connector("redis_stream_client")._client.xreadgroup(
            group_name, consumer_name, {stream_name: ">"}, count=1
@@ -373,7 +519,7 @@
            for stream, message_list in messages
        ]
        return messages
-    
+
    def _check_redis_stream_exist(self, stream_name: str, group_name: str):
        try:
            container.get_connector("redis_stream_client")._client.xgroup_create(
@@ -381,7 +527,7 @@
            )
        except Exception as e:
            logging.debug(f"Consumer group may already exist: {e}")
-    
+
    def _get_message_payload(self, message: dict):
        logging.info(f"Received running message: {message}")
        payload = message.get("payload")
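
For reference, the Redis plumbing used throughout bot: _check_redis_stream_exist creates the consumer group up front (swallowing the error when it already exists), and each processed entry is acknowledged with xack. The same cycle with plain redis-py, stripped of the container plumbing (stream, group, and consumer names are illustrative):

    import redis

    r = redis.Redis()
    stream, group, consumer = "wf-123_output", "omappagent", "wf-123_agent"
    try:
        r.xgroup_create(stream, group, id="0", mkstream=True)  # mkstream also creates the stream
    except redis.ResponseError:
        pass  # BUSYGROUP: the group already exists
    for _, message_list in r.xreadgroup(group, consumer, {stream: ">"}, count=1):
        for message_id, fields in message_list:
            r.xack(stream, group, message_id)  # mark the entry as processed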