diff --git a/libs/python/agent/agent/adapters/models/internvl.py b/libs/python/agent/agent/adapters/models/internvl.py index bb2de42e8..053112856 100644 --- a/libs/python/agent/agent/adapters/models/internvl.py +++ b/libs/python/agent/agent/adapters/models/internvl.py @@ -25,6 +25,16 @@ class InternVLModel: """ def __init__(self, model_name: str, device: str = "auto", trust_remote_code: bool = False) -> None: + """Initialize the InternVL model with specified configuration. + + Args: + model_name: The name or path of the InternVL model to load + device: Device to load the model on, defaults to "auto" + trust_remote_code: Whether to trust remote code when loading the model + + Raises: + ImportError: If InternVL dependencies are not available + """ if not HF_AVAILABLE: raise ImportError( "InternVL dependencies not found. Install with: pip install \"cua-agent[internvl-hf]\"" @@ -37,6 +47,7 @@ def __init__(self, model_name: str, device: str = "auto", trust_remote_code: boo self._load() def _load(self) -> None: + """Load the model and tokenizer from the specified model name.""" # Load model self.model = AutoModel.from_pretrained( self.model_name, @@ -58,6 +69,15 @@ def _load(self) -> None: IMAGENET_STD = (0.229, 0.224, 0.225) def _build_transform(self, input_size: int) -> T.Compose: + """Build image transformation pipeline for preprocessing. + + Args: + input_size: Target size for image resizing + + Returns: + Composed transformation pipeline that converts images to RGB, resizes, + converts to tensor, and normalizes with ImageNet statistics + """ MEAN, STD = self.IMAGENET_MEAN, self.IMAGENET_STD transform = T.Compose([ T.Lambda(lambda img: img.convert('RGB') if img.mode != 'RGB' else img), @@ -68,6 +88,18 @@ def _build_transform(self, input_size: int) -> T.Compose: return transform def _find_closest_aspect_ratio(self, aspect_ratio: float, target_ratios: List[tuple], width: int, height: int, image_size: int): + """Find the target aspect ratio that best matches the input image. + + Args: + aspect_ratio: Original aspect ratio of the image + target_ratios: List of possible target aspect ratios as (width, height) tuples + width: Original image width + height: Original image height + image_size: Base image size for calculations + + Returns: + Best matching aspect ratio tuple from target_ratios + """ best_ratio_diff = float('inf') best_ratio = (1, 1) area = width * height @@ -83,6 +115,18 @@ def _find_closest_aspect_ratio(self, aspect_ratio: float, target_ratios: List[tu return best_ratio def _dynamic_preprocess(self, image: Image.Image, min_num: int = 1, max_num: int = 12, image_size: int = 448, use_thumbnail: bool = True) -> List[Image.Image]: + """Preprocess image by splitting it into tiles based on aspect ratio. + + Args: + image: Input PIL image to preprocess + min_num: Minimum number of tiles to generate + max_num: Maximum number of tiles to generate + image_size: Size of each tile + use_thumbnail: Whether to add a thumbnail version of the full image + + Returns: + List of processed image tiles, optionally including a thumbnail + """ orig_width, orig_height = image.size aspect_ratio = orig_width / orig_height @@ -116,7 +160,17 @@ def _dynamic_preprocess(self, image: Image.Image, min_num: int = 1, max_num: int return processed_images def _load_image_from_source(self, src: str) -> Image.Image: - """Load PIL image from various sources: data URL, http(s), or local path.""" + """Load PIL image from various sources: data URL, http(s), or local path. + + Args: + src: Image source - can be a data URL, HTTP(S) URL, or local file path + + Returns: + PIL Image object converted to RGB format + + Raises: + Various exceptions depending on source type (network errors, file errors, etc.) + """ if src.startswith("data:image/"): # data URL base64 header, b64data = src.split(",", 1) @@ -130,6 +184,17 @@ def _load_image_from_source(self, src: str) -> Image.Image: return Image.open(src).convert('RGB') def _images_to_pixel_values(self, images: List[Image.Image], input_size: int = 448, max_num: int = 12): + """Convert list of PIL images to tensor pixel values for model input. + + Args: + images: List of PIL images to convert + input_size: Target size for image preprocessing + max_num: Maximum number of tiles per image + + Returns: + Tuple of (pixel_values tensor, list of patch counts per image). + Returns (None, []) if no images provided. + """ transform = self._build_transform(input_size=input_size) pixel_values_list = [] num_patches_list: List[int] = [] @@ -151,6 +216,14 @@ def generate(self, messages: List[Dict[str, Any]], max_new_tokens: int = 128) -> This implementation constructs InternVL-compatible inputs and uses `model.chat(tokenizer, pixel_values, question, history=...)` to avoid relying on AutoProcessor (which fails for some tokenizers). + + Args: + messages: List of message dictionaries with role and content fields. + Content can contain text and image items. + max_new_tokens: Maximum number of new tokens to generate + + Returns: + Generated text response from the model, or empty string if generation fails """ assert self.model is not None and self.tokenizer is not None diff --git a/libs/python/agent/agent/adapters/models/opencua.py b/libs/python/agent/agent/adapters/models/opencua.py index 32c73134a..8c0e56c78 100644 --- a/libs/python/agent/agent/adapters/models/opencua.py +++ b/libs/python/agent/agent/adapters/models/opencua.py @@ -17,6 +17,16 @@ class OpenCUAModel: """OpenCUA model handler using AutoTokenizer, AutoModel and AutoImageProcessor.""" def __init__(self, model_name: str, device: str = "auto", trust_remote_code: bool = False) -> None: + """Initialize the OpenCUA model with specified configuration. + + Args: + model_name: The name or path of the model to load + device: Device to run the model on, defaults to "auto" + trust_remote_code: Whether to trust remote code when loading the model + + Raises: + ImportError: If OpenCUA requirements are not installed + """ if not OPENCUA_AVAILABLE: raise ImportError( "OpenCUA requirements not found. Install with: pip install \"cua-agent[opencua-hf]\"" @@ -30,6 +40,7 @@ def __init__(self, model_name: str, device: str = "auto", trust_remote_code: boo self._load() def _load(self) -> None: + """Load the tokenizer, model, and image processor from the specified model name.""" self.tokenizer = AutoTokenizer.from_pretrained( self.model_name, trust_remote_code=self.trust_remote_code ) @@ -46,6 +57,14 @@ def _load(self) -> None: @staticmethod def _extract_last_image_b64(messages: List[Dict[str, Any]]) -> str: + """Extract the base64 encoded image data from the last image in the message list. + + Args: + messages: List of message dictionaries in HF format with content items + + Returns: + Base64 encoded image data string, or empty string if no image found + """ # Expect HF-format messages with content items type: "image" with data URL for msg in reversed(messages): for item in reversed(msg.get("content", [])): @@ -56,6 +75,15 @@ def _extract_last_image_b64(messages: List[Dict[str, Any]]) -> str: return "" def generate(self, messages: List[Dict[str, Any]], max_new_tokens: int = 512) -> str: + """Generate text response from the model using the provided messages. + + Args: + messages: List of message dictionaries containing conversation history + max_new_tokens: Maximum number of new tokens to generate + + Returns: + Generated text response as a string + """ assert self.model is not None and self.tokenizer is not None and self.image_processor is not None # Tokenize text side using chat template diff --git a/libs/python/agent/agent/integrations/hud/proxy.py b/libs/python/agent/agent/integrations/hud/proxy.py index 9087d1c96..c017499a9 100644 --- a/libs/python/agent/agent/integrations/hud/proxy.py +++ b/libs/python/agent/agent/integrations/hud/proxy.py @@ -35,6 +35,12 @@ def _map_agent_output_to_openai_blocks(output_items: List[Dict[str, Any]]) -> Li Only a subset is supported: computer_call, assistant message (text), and reasoning. Unknown types are ignored. + + Args: + output_items: List of agent output items to convert + + Returns: + List of OpenAI ResponseOutputItem objects """ blocks: List[ResponseOutputItem] = [] for item in output_items or []: @@ -81,6 +87,14 @@ def _map_agent_output_to_openai_blocks(output_items: List[Dict[str, Any]]) -> Li return blocks def _to_plain_dict_list(items: Any) -> List[Dict[str, Any]]: + """Convert items to a list of plain dictionaries. + + Args: + items: Items to convert, can be objects with model_dump method or dictionaries + + Returns: + List of dictionaries representing the items + """ out: List[Dict[str, Any]] = [] for it in list(items): if hasattr(it, "model_dump"): @@ -100,11 +114,23 @@ class FakeAsyncOpenAI: """ def __init__(self, computer_agent: BaseComputerAgent) -> None: + """Initialize the fake OpenAI client. + + Args: + computer_agent: The ComputerAgent instance to use for generating responses + """ self._agent = computer_agent self.responses = self._Responses(self) class _Responses: + """Internal responses handler for the fake OpenAI client.""" + def __init__(self, parent: "FakeAsyncOpenAI") -> None: + """Initialize the responses handler. + + Args: + parent: The parent FakeAsyncOpenAI instance + """ # Caches for cross-call context when using previous_response_id self.blocks_cache: Dict[str, ResponseInputParam | ResponseOutputItem] = {} self.context_cache: Dict[str, List[str]] = {} @@ -121,6 +147,23 @@ async def create( max_retries: int = 5, **_: Any, ) -> Any: + """Create a response using the computer agent. + + Args: + model: The model name to use + input: The input parameters for the response + tools: Optional list of tools to use + instructions: Optional instructions to prepend + previous_response_id: Optional ID of previous response for context + max_retries: Maximum number of retry attempts + **_: Additional keyword arguments (ignored) + + Returns: + OpenAI Response object with agent output + + Raises: + Exception: If all retry attempts fail + """ for attempt in range(max_retries): # Prepend cached blocks from previous_response_id to input full_input = input @@ -217,6 +260,25 @@ def __init__( telemetry_enabled: bool | None = True, **kwargs: Any, ) -> None: + """Initialize the proxy operator agent. + + Args: + model: Model name to use, defaults to "computer-use-preview" + allowed_tools: List of allowed tool names, defaults to ["openai_computer"] + trajectory_dir: Directory for storing trajectories + tools: Additional tools to include + custom_loop: Custom loop implementation + only_n_most_recent_images: Limit on recent images to keep + callbacks: List of callback functions + instructions: Instructions to prepend to prompts + verbosity: Logging verbosity level + max_retries: Maximum retry attempts + screenshot_delay: Delay between screenshots + use_prompt_caching: Whether to use prompt caching + max_trajectory_budget: Budget limit for trajectories + telemetry_enabled: Whether telemetry is enabled + **kwargs: Additional arguments passed to OperatorAgent + """ model = model or "computer-use-preview" allowed_tools = allowed_tools or ["openai_computer"] diff --git a/libs/python/agent/agent/responses.py b/libs/python/agent/agent/responses.py index 34318bce8..3b4531b43 100644 --- a/libs/python/agent/agent/responses.py +++ b/libs/python/agent/agent/responses.py @@ -31,10 +31,23 @@ from openai.types.responses.response_input_image_param import ResponseInputImageParam def random_id(): + """Generate a random UUID string. + + Returns: + str: A random UUID4 string + """ return str(uuid.uuid4()) # User message items def make_input_image_item(image_data: Union[str, bytes]) -> EasyInputMessageParam: + """Create an input image message item for user messages. + + Args: + image_data: Image data as base64 string or raw bytes + + Returns: + EasyInputMessageParam: A user message containing the image + """ return EasyInputMessageParam( content=[ ResponseInputImageParam( @@ -48,6 +61,14 @@ def make_input_image_item(image_data: Union[str, bytes]) -> EasyInputMessagePara # Text items def make_reasoning_item(reasoning: str) -> ResponseReasoningItemParam: + """Create a reasoning item containing summary text. + + Args: + reasoning: The reasoning text content + + Returns: + ResponseReasoningItemParam: A reasoning item with the provided text + """ return ResponseReasoningItemParam( id=random_id(), summary=[ @@ -57,6 +78,14 @@ def make_reasoning_item(reasoning: str) -> ResponseReasoningItemParam: ) def make_output_text_item(content: str) -> ResponseOutputMessageParam: + """Create an output text message item for assistant responses. + + Args: + content: The text content to include in the message + + Returns: + ResponseOutputMessageParam: An assistant message containing the text + """ return ResponseOutputMessageParam( id=random_id(), content=[ @@ -73,6 +102,16 @@ def make_output_text_item(content: str) -> ResponseOutputMessageParam: # Function call items def make_function_call_item(function_name: str, arguments: Dict[str, Any], call_id: Optional[str] = None) -> ResponseFunctionToolCallParam: + """Create a function call item with the specified name and arguments. + + Args: + function_name: Name of the function to call + arguments: Dictionary of arguments to pass to the function + call_id: Optional call ID, generates random ID if not provided + + Returns: + ResponseFunctionToolCallParam: A completed function call item + """ return ResponseFunctionToolCallParam( id=random_id(), call_id=call_id if call_id else random_id(), @@ -84,6 +123,17 @@ def make_function_call_item(function_name: str, arguments: Dict[str, Any], call_ # Computer tool call items def make_click_item(x: int, y: int, button: Literal["left", "right", "wheel", "back", "forward"] = "left", call_id: Optional[str] = None) -> ResponseComputerToolCallParam: + """Create a computer click action item. + + Args: + x: X coordinate for the click + y: Y coordinate for the click + button: Mouse button to click (default: "left") + call_id: Optional call ID, generates random ID if not provided + + Returns: + ResponseComputerToolCallParam: A completed click action item + """ return ResponseComputerToolCallParam( id=random_id(), call_id=call_id if call_id else random_id(), @@ -99,6 +149,16 @@ def make_click_item(x: int, y: int, button: Literal["left", "right", "wheel", "b ) def make_double_click_item(x: int, y: int, call_id: Optional[str] = None) -> ResponseComputerToolCallParam: + """Create a computer double-click action item. + + Args: + x: X coordinate for the double-click + y: Y coordinate for the double-click + call_id: Optional call ID, generates random ID if not provided + + Returns: + ResponseComputerToolCallParam: A completed double-click action item + """ return ResponseComputerToolCallParam( id=random_id(), call_id=call_id if call_id else random_id(), @@ -113,6 +173,15 @@ def make_double_click_item(x: int, y: int, call_id: Optional[str] = None) -> Res ) def make_drag_item(path: List[Dict[str, int]], call_id: Optional[str] = None) -> ResponseComputerToolCallParam: + """Create a computer drag action item with a path of coordinates. + + Args: + path: List of dictionaries containing "x" and "y" coordinates for the drag path + call_id: Optional call ID, generates random ID if not provided + + Returns: + ResponseComputerToolCallParam: A completed drag action item + """ drag_path = [ActionDragPath(x=point["x"], y=point["y"]) for point in path] return ResponseComputerToolCallParam( id=random_id(), @@ -127,6 +196,15 @@ def make_drag_item(path: List[Dict[str, int]], call_id: Optional[str] = None) -> ) def make_keypress_item(keys: List[str], call_id: Optional[str] = None) -> ResponseComputerToolCallParam: + """Create a computer keypress action item. + + Args: + keys: List of key names to press + call_id: Optional call ID, generates random ID if not provided + + Returns: + ResponseComputerToolCallParam: A completed keypress action item + """ return ResponseComputerToolCallParam( id=random_id(), call_id=call_id if call_id else random_id(), @@ -140,6 +218,16 @@ def make_keypress_item(keys: List[str], call_id: Optional[str] = None) -> Respon ) def make_move_item(x: int, y: int, call_id: Optional[str] = None) -> ResponseComputerToolCallParam: + """Create a computer mouse move action item. + + Args: + x: X coordinate to move to + y: Y coordinate to move to + call_id: Optional call ID, generates random ID if not provided + + Returns: + ResponseComputerToolCallParam: A completed move action item + """ return ResponseComputerToolCallParam( id=random_id(), call_id=call_id if call_id else random_id(), @@ -154,6 +242,14 @@ def make_move_item(x: int, y: int, call_id: Optional[str] = None) -> ResponseCom ) def make_screenshot_item(call_id: Optional[str] = None) -> ResponseComputerToolCallParam: + """Create a computer screenshot action item. + + Args: + call_id: Optional call ID, generates random ID if not provided + + Returns: + ResponseComputerToolCallParam: A completed screenshot action item + """ return ResponseComputerToolCallParam( id=random_id(), call_id=call_id if call_id else random_id(), @@ -166,6 +262,18 @@ def make_screenshot_item(call_id: Optional[str] = None) -> ResponseComputerToolC ) def make_scroll_item(x: int, y: int, scroll_x: int, scroll_y: int, call_id: Optional[str] = None) -> ResponseComputerToolCallParam: + """Create a computer scroll action item. + + Args: + x: X coordinate where scrolling occurs + y: Y coordinate where scrolling occurs + scroll_x: Horizontal scroll amount + scroll_y: Vertical scroll amount + call_id: Optional call ID, generates random ID if not provided + + Returns: + ResponseComputerToolCallParam: A completed scroll action item + """ return ResponseComputerToolCallParam( id=random_id(), call_id=call_id if call_id else random_id(), @@ -182,6 +290,15 @@ def make_scroll_item(x: int, y: int, scroll_x: int, scroll_y: int, call_id: Opti ) def make_type_item(text: str, call_id: Optional[str] = None) -> ResponseComputerToolCallParam: + """Create a computer text typing action item. + + Args: + text: Text to type + call_id: Optional call ID, generates random ID if not provided + + Returns: + ResponseComputerToolCallParam: A completed type action item + """ return ResponseComputerToolCallParam( id=random_id(), call_id=call_id if call_id else random_id(), @@ -195,6 +312,14 @@ def make_type_item(text: str, call_id: Optional[str] = None) -> ResponseComputer ) def make_wait_item(call_id: Optional[str] = None) -> ResponseComputerToolCallParam: + """Create a computer wait action item. + + Args: + call_id: Optional call ID, generates random ID if not provided + + Returns: + ResponseComputerToolCallParam: A completed wait action item + """ return ResponseComputerToolCallParam( id=random_id(), call_id=call_id if call_id else random_id(), @@ -208,6 +333,16 @@ def make_wait_item(call_id: Optional[str] = None) -> ResponseComputerToolCallPar # Extra anthropic computer calls def make_left_mouse_down_item(x: Optional[int] = None, y: Optional[int] = None, call_id: Optional[str] = None) -> Dict[str, Any]: + """Create a left mouse button down action item. + + Args: + x: Optional X coordinate for the mouse down action + y: Optional Y coordinate for the mouse down action + call_id: Optional call ID, generates random ID if not provided + + Returns: + Dict[str, Any]: A completed left mouse down action item + """ return { "id": random_id(), "call_id": call_id if call_id else random_id(), @@ -222,6 +357,16 @@ def make_left_mouse_down_item(x: Optional[int] = None, y: Optional[int] = None, } def make_left_mouse_up_item(x: Optional[int] = None, y: Optional[int] = None, call_id: Optional[str] = None) -> Dict[str, Any]: + """Create a left mouse button up action item. + + Args: + x: Optional X coordinate for the mouse up action + y: Optional Y coordinate for the mouse up action + call_id: Optional call ID, generates random ID if not provided + + Returns: + Dict[str, Any]: A completed left mouse up action item + """ return { "id": random_id(), "call_id": call_id if call_id else random_id(), @@ -236,6 +381,17 @@ def make_left_mouse_up_item(x: Optional[int] = None, y: Optional[int] = None, ca } def make_failed_tool_call_items(tool_name: str, tool_kwargs: Dict[str, Any], error_message: str, call_id: Optional[str] = None) -> List[Dict[str, Any]]: + """Create a pair of items representing a failed tool call and its error output. + + Args: + tool_name: Name of the tool that failed + tool_kwargs: Arguments that were passed to the tool + error_message: Error message describing the failure + call_id: Optional call ID, generates random ID if not provided + + Returns: + List[Dict[str, Any]]: A list containing the function call and error output items + """ call_id = call_id if call_id else random_id() return [ { @@ -253,6 +409,15 @@ def make_failed_tool_call_items(tool_name: str, tool_kwargs: Dict[str, Any], err ] def make_tool_error_item(error_message: str, call_id: Optional[str] = None) -> Dict[str, Any]: + """Create a tool error output item. + + Args: + error_message: Error message to include in the output + call_id: Optional call ID, generates random ID if not provided + + Returns: + Dict[str, Any]: A function call output item containing the error + """ call_id = call_id if call_id else random_id() return { "type": "function_call_output", @@ -268,6 +433,9 @@ def replace_failed_computer_calls_with_function_calls(messages: List[Dict[str, A Args: messages: List of message items to process + + Returns: + List[Dict[str, Any]]: Modified list with computer calls replaced by function calls where appropriate """ messages = messages.copy() @@ -309,7 +477,7 @@ def convert_computer_calls_desc2xy(responses_items: List[Dict[str, Any]], desc2x desc2xy: Dictionary mapping element descriptions to (x, y) coordinate tuples Returns: - List of response items with element_description replaced by x,y coordinates + List[Dict[str, Any]]: List of response items with element_description replaced by x,y coordinates """ converted_items = [] @@ -356,7 +524,7 @@ def convert_computer_calls_xy2desc(responses_items: List[Dict[str, Any]], desc2x desc2xy: Dictionary mapping element descriptions to (x, y) coordinate tuples Returns: - List of response items with x,y coordinates replaced by element_description + List[Dict[str, Any]]: List of response items with x,y coordinates replaced by element_description """ # Create reverse mapping from coordinates to descriptions xy2desc = {coords: desc for desc, coords in desc2xy.items()} @@ -408,7 +576,7 @@ def get_all_element_descriptions(responses_items: List[Dict[str, Any]]) -> List[ responses_items: List of response items containing computer calls Returns: - List of unique element descriptions found in computer calls + List[str]: List of unique element descriptions found in computer calls """ descriptions = set() @@ -438,6 +606,9 @@ def convert_responses_items_to_completion_messages(messages: List[Dict[str, Any] messages: List of responses_items format messages allow_images_in_tool_results: If True, include images in tool role messages. If False, send tool message + separate user message with image. + + Returns: + List[Dict[str, Any]]: List of messages in completion format """ completion_messages = [] @@ -602,7 +773,14 @@ def convert_responses_items_to_completion_messages(messages: List[Dict[str, Any] def convert_completion_messages_to_responses_items(completion_messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]: - """Convert completion messages format to responses_items message format.""" + """Convert completion messages format to responses_items message format. + + Args: + completion_messages: List of messages in completion format + + Returns: + List[Dict[str, Any]]: List of messages in responses_items format + """ responses_items = [] skip_next = False diff --git a/libs/python/computer-server/computer_server/main.py b/libs/python/computer-server/computer_server/main.py index ad0b0edeb..22a668214 100644 --- a/libs/python/computer-server/computer_server/main.py +++ b/libs/python/computer-server/computer_server/main.py @@ -1,3 +1,11 @@ +""" +FastAPI server for computer automation and control. + +This module provides a web API for controlling computer interactions including +mouse, keyboard, file system operations, and accessibility features. It supports +both WebSocket and HTTP endpoints with optional authentication for cloud deployments. +""" + from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request, HTTPException, Header from fastapi.responses import StreamingResponse, JSONResponse from typing import List, Dict, Any, Optional, Union, Literal, cast @@ -112,17 +120,42 @@ class AuthenticationManager: + """ + Manages authentication for cloud deployments using container names and API keys. + + Provides session caching to reduce API calls and handles authentication + against the TryCUA API service. + """ + def __init__(self): + """Initialize the authentication manager with empty session cache.""" self.sessions: Dict[str, Dict[str, Any]] = {} self.container_name = os.environ.get("CONTAINER_NAME") def _hash_credentials(self, container_name: str, api_key: str) -> str: - """Create a hash of container name and API key for session identification""" + """ + Create a hash of container name and API key for session identification. + + Args: + container_name: The container name to hash + api_key: The API key to hash + + Returns: + SHA256 hash of the combined credentials + """ combined = f"{container_name}:{api_key}" return hashlib.sha256(combined.encode()).hexdigest() def _is_session_valid(self, session_data: Dict[str, Any]) -> bool: - """Check if a session is still valid based on expiration time""" + """ + Check if a session is still valid based on expiration time. + + Args: + session_data: Dictionary containing session validity and expiration data + + Returns: + True if session is valid and not expired, False otherwise + """ if not session_data.get('valid', False): return False @@ -130,7 +163,16 @@ def _is_session_valid(self, session_data: Dict[str, Any]) -> bool: return time.time() < expires_at async def auth(self, container_name: str, api_key: str) -> bool: - """Authenticate container name and API key, using cached sessions when possible""" + """ + Authenticate container name and API key, using cached sessions when possible. + + Args: + container_name: The container name to authenticate + api_key: The API key for authentication + + Returns: + True if authentication succeeds, False otherwise + """ # If no CONTAINER_NAME is set, always allow access (local development) if not self.container_name: logger.info("No CONTAINER_NAME set in environment. Allowing access (local development mode)") @@ -201,14 +243,33 @@ async def auth(self, container_name: str, api_key: str) -> bool: class ConnectionManager: + """ + Manages WebSocket connections for the server. + + Handles connecting and disconnecting WebSocket clients. + """ + def __init__(self): + """Initialize with empty connection list.""" self.active_connections: List[WebSocket] = [] async def connect(self, websocket: WebSocket): + """ + Accept and register a new WebSocket connection. + + Args: + websocket: The WebSocket connection to accept + """ await websocket.accept() self.active_connections.append(websocket) def disconnect(self, websocket: WebSocket): + """ + Remove a WebSocket connection from active connections. + + Args: + websocket: The WebSocket connection to remove + """ self.active_connections.remove(websocket) @@ -217,6 +278,12 @@ def disconnect(self, websocket: WebSocket): @app.get("/status") async def status(): + """ + Get server status including OS type and available features. + + Returns: + Dict containing status, OS type, and available features + """ sys = platform.system().lower() # get os type if "darwin" in sys or sys == "macos" or sys == "mac": @@ -233,6 +300,15 @@ async def status(): @app.websocket("/ws", name="websocket_endpoint") async def websocket_endpoint(websocket: WebSocket): + """ + Main WebSocket endpoint for handling computer automation commands. + + Supports authentication for cloud deployments and processes commands + through registered handlers. + + Args: + websocket: The WebSocket connection + """ global handlers # WebSocket message size is configured at the app or endpoint level, not on the instance @@ -374,15 +450,23 @@ async def cmd_endpoint( Backup endpoint for when WebSocket connections fail. Accepts commands via HTTP POST with streaming response. + Args: + request: The HTTP request object + container_name: Container name for cloud authentication (from header) + api_key: API key for cloud authentication (from header) + Headers: - - X-Container-Name: Container name for cloud authentication - - X-API-Key: API key for cloud authentication + X-Container-Name: Container name for cloud authentication + X-API-Key: API key for cloud authentication Body: - { - "command": "command_name", - "params": {...} - } + { + "command": "command_name", + "params": {...} + } + + Returns: + StreamingResponse with command execution results """ global handlers @@ -420,7 +504,12 @@ async def cmd_endpoint( raise HTTPException(status_code=400, detail=f"Unknown command: {command}") async def generate_response(): - """Generate streaming response for the command execution""" + """ + Generate streaming response for the command execution. + + Yields: + JSON-formatted response data as server-sent events + """ try: # Filter params to only include those accepted by the handler function handler_func = handlers[command] @@ -463,17 +552,24 @@ async def agent_response_endpoint( """ Minimal proxy to run ComputerAgent for up to 2 turns. + Args: + request: The HTTP request object + api_key: API key for authentication (from header) + Security: - - If CONTAINER_NAME is set on the server, require X-API-Key - and validate using AuthenticationManager unless CUA_ENABLE_PUBLIC_PROXY is true. + If CONTAINER_NAME is set on the server, require X-API-Key + and validate using AuthenticationManager unless CUA_ENABLE_PUBLIC_PROXY is true. Body JSON: - { - "model": "...", # required - "input": "... or messages[]", # required - "agent_kwargs": { ... }, # optional, passed directly to ComputerAgent - "env": { ... } # optional env overrides for agent - } + { + "model": "...", # required + "input": "... or messages[]", # required + "agent_kwargs": { ... }, # optional, passed directly to ComputerAgent + "env": { ... } # optional env overrides for agent + } + + Returns: + JSONResponse with agent execution results """ if not HAS_AGENT: raise HTTPException(status_code=501, detail="ComputerAgent not available") @@ -505,14 +601,26 @@ async def agent_response_endpoint( # Simple env override context class _EnvOverride: + """Context manager for temporarily overriding environment variables.""" + def __init__(self, overrides: Dict[str, str]): + """ + Initialize with environment variable overrides. + + Args: + overrides: Dictionary of environment variables to override + """ self.overrides = overrides self._original: Dict[str, Optional[str]] = {} + def __enter__(self): + """Apply environment variable overrides.""" for k, v in (self.overrides or {}).items(): self._original[k] = os.environ.get(k) os.environ[k] = str(v) + def __exit__(self, exc_type, exc, tb): + """Restore original environment variables.""" for k, old in self._original.items(): if old is None: os.environ.pop(k, None) @@ -521,6 +629,15 @@ def __exit__(self, exc_type, exc, tb): # Convert input to messages def _to_messages(data: Union[str, List[Dict[str, Any]]]) -> List[Dict[str, Any]]: + """ + Convert string or message list to standard message format. + + Args: + data: Input data as string or list of message dictionaries + + Returns: + List of message dictionaries in standard format + """ if isinstance(data, str): return [{"role": "user", "content": data}] if isinstance(data, list): @@ -533,13 +650,26 @@ def _to_messages(data: Union[str, List[Dict[str, Any]]]) -> List[Dict[str, Any]] from agent.computers import AsyncComputerHandler # runtime-checkable Protocol class DirectComputer(AsyncComputerHandler): + """ + Direct computer interface that delegates to existing handlers. + + Implements the AsyncComputerHandler protocol for agent integration. + """ + def __init__(self): + """Initialize with module-scope handler singletons.""" # use module-scope handler singletons created by HandlerFactory self._auto = automation_handler self._file = file_handler self._access = accessibility_handler async def get_environment(self) -> Literal["windows", "mac", "linux", "browser"]: + """ + Get the current operating system environment. + + Returns: + The operating system type as a string literal + """ sys = platform.system().lower() if "darwin" in sys or sys in ("macos", "mac"): return "mac" @@ -548,14 +678,34 @@ async def get_environment(self) -> Literal["windows", "mac", "linux", "browser"] return "linux" async def get_dimensions(self) -> tuple[int, int]: + """ + Get screen dimensions as width, height tuple. + + Returns: + Tuple of (width, height) in pixels + """ size = await self._auto.get_screen_size() return size["width"], size["height"] async def screenshot(self) -> str: + """ + Take a screenshot and return as base64 encoded string. + + Returns: + Base64 encoded screenshot image data + """ img_b64 = await self._auto.screenshot() return img_b64["image_data"] async def click(self, x: int, y: int, button: str = "left") -> None: + """ + Click at the specified coordinates with the given button. + + Args: + x: X coordinate for the click + y: Y coordinate for the click + button: Mouse button to use ("left" or "right") + """ if button == "left": await self._auto.left_click(x, y) elif button == "right": @@ -564,22 +714,63 @@ async def click(self, x: int, y: int, button: str = "left") -> None: await self._auto.left_click(x, y) async def double_click(self, x: int, y: int) -> None: + """ + Double-click at the specified coordinates. + + Args: + x: X coordinate for the double-click + y: Y coordinate for the double-click + """ await self._auto.double_click(x, y) async def scroll(self, x: int, y: int, scroll_x: int, scroll_y: int) -> None: + """ + Scroll at the specified position with given scroll amounts. + + Args: + x: X coordinate where to scroll + y: Y coordinate where to scroll + scroll_x: Horizontal scroll amount + scroll_y: Vertical scroll amount + """ await self._auto.move_cursor(x, y) await self._auto.scroll(scroll_x, scroll_y) async def type(self, text: str) -> None: + """ + Type the specified text. + + Args: + text: Text to type + """ await self._auto.type_text(text) async def wait(self, ms: int = 1000) -> None: + """ + Wait for the specified number of milliseconds. + + Args: + ms: Number of milliseconds to wait + """ await asyncio.sleep(ms / 1000.0) async def move(self, x: int, y: int) -> None: + """ + Move cursor to the specified coordinates. + + Args: + x: X coordinate to move to + y: Y coordinate to move to + """ await self._auto.move_cursor(x, y) async def keypress(self, keys: Union[List[str], str]) -> None: + """ + Press the specified key or key combination. + + Args: + keys: Single key or list of keys to press + """ if isinstance(keys, str): parts = keys.replace("-", "+").split("+") if len(keys) > 1 else [keys] else: @@ -590,6 +781,12 @@ async def keypress(self, keys: Union[List[str], str]) -> None: await self._auto.hotkey(parts) async def drag(self, path: List[Dict[str, int]]) -> None: + """ + Drag along the specified path of coordinates. + + Args: + path: List of coordinate dictionaries with "x" and "y" keys + """ if not path: return start = path[0] @@ -600,13 +797,33 @@ async def drag(self, path: List[Dict[str, int]]) -> None: await self._auto.mouse_up(end["x"], end["y"]) async def get_current_url(self) -> str: + """ + Get current URL (not available in this server context). + + Returns: + Empty string as URLs are not available in desktop context + """ # Not available in this server context return "" async def left_mouse_down(self, x: Optional[int] = None, y: Optional[int] = None) -> None: + """ + Press left mouse button down at specified coordinates. + + Args: + x: X coordinate for mouse down (optional) + y: Y coordinate for mouse down (optional) + """ await self._auto.mouse_down(x, y, button="left") async def left_mouse_up(self, x: Optional[int] = None, y: Optional[int] = None) -> None: + """ + Release left mouse button at specified coordinates. + + Args: + x: X coordinate for mouse up (optional) + y: Y coordinate for mouse up (optional) + """ await self._auto.mouse_up(x, y, button="left") # # Inline image URLs to base64 @@ -703,4 +920,5 @@ async def left_mouse_up(self, x: Optional[int] = None, y: Optional[int] = None) if __name__ == "__main__": + """Run the FastAPI server when executed directly.""" uvicorn.run(app, host="0.0.0.0", port=8000)