diff --git a/gdocs/docs_tools.py b/gdocs/docs_tools.py index c7f65afc..4bb0c472 100644 --- a/gdocs/docs_tools.py +++ b/gdocs/docs_tools.py @@ -28,6 +28,9 @@ create_bullet_list_request ) +# Import Drive helper functions +from gdrive.drive_helpers import find_or_create_folder + # Import document structure and table utilities from gdocs.docs_structure import ( parse_document_structure, @@ -48,6 +51,139 @@ logger = logging.getLogger(__name__) +@server.tool() +@handle_http_errors("create_drive_folder", service_type="drive") +@require_google_service("drive", "drive_file") +async def create_drive_folder( + service: Any, + user_google_email: str, + folder_name: str, + parent_folder_id: str = 'root', + drive_id: str = None, +) -> str: + """ + Creates a new folder in Google Drive, supporting both My Drive and Shared Drives. + + Args: + user_google_email: User's Google email address + folder_name: Name for the new folder + parent_folder_id: ID of the parent folder (defaults to 'root' for My Drive) + drive_id: ID of the shared drive (optional, for Shared Drives/Team Drives) + + Returns: + str: Confirmation message with folder ID and link. + """ + logger.info(f"[create_drive_folder] Invoked. Email: '{user_google_email}', Name='{folder_name}', parent={parent_folder_id}, drive_id={drive_id}") + + # Determine the correct parent folder + if drive_id and parent_folder_id == 'root': + # For shared drives, use the drive ID as parent + parent_folder_id = drive_id + + # Create folder metadata + folder_metadata = { + 'name': folder_name, + 'mimeType': 'application/vnd.google-apps.folder', + 'parents': [parent_folder_id] + } + + # Create the folder + created_folder = await asyncio.to_thread( + service.files().create( + body=folder_metadata, + fields='id, name, webViewLink', + supportsAllDrives=True + ).execute + ) + + folder_id = created_folder.get('id') + folder_link = created_folder.get('webViewLink', '#') + + parent_info = f" in folder {parent_folder_id}" if parent_folder_id != 'root' else " in root" + drive_info = f" (Shared Drive: {drive_id})" if drive_id else "" + + msg = f"Created folder '{folder_name}' (ID: {folder_id}) for {user_google_email}{parent_info}{drive_info}. Link: {folder_link}" + logger.info(f"Successfully created folder '{folder_name}' (ID: {folder_id}) for {user_google_email}. Link: {folder_link}") + return msg + +@server.tool() +@handle_http_errors("search_drive_folders", is_read_only=True, service_type="drive") +@require_google_service("drive", "drive_read") +async def search_drive_folders( + service: Any, + user_google_email: str, + folder_name: str = None, + parent_folder_id: str = 'root', + drive_id: str = None, + include_items_from_all_drives: bool = True, + page_size: int = 20, +) -> str: + """ + Searches for folders in Google Drive, supporting both My Drive and Shared Drives. + + Args: + user_google_email: User's Google email address + folder_name: Name of folder to search for (optional, searches all folders if not provided) + parent_folder_id: ID of parent folder to search within (defaults to 'root') + drive_id: ID of the shared drive (optional, for Shared Drives/Team Drives) + include_items_from_all_drives: Whether to include items from all drives when searching + page_size: Maximum number of folders to return + + Returns: + str: A formatted list of found folders with their details. + """ + logger.info(f"[search_drive_folders] Invoked. Email: '{user_google_email}', folder_name='{folder_name}', parent={parent_folder_id}, drive_id={drive_id}") + + # Build search query + query_parts = ["mimeType = 'application/vnd.google-apps.folder'", "trashed = false"] + + if folder_name: + escaped_name = folder_name.replace("'", "\\'") + query_parts.append(f"name contains '{escaped_name}'") + + if parent_folder_id != 'root': + query_parts.append(f"'{parent_folder_id}' in parents") + + query = " and ".join(query_parts) + + # Build search parameters + search_params = { + "q": query, + "pageSize": page_size, + "fields": "files(id, name, parents, modifiedTime, webViewLink)", + "supportsAllDrives": True, + "includeItemsFromAllDrives": include_items_from_all_drives + } + + # Add drive-specific parameters if drive_id is provided + if drive_id: + search_params["corpora"] = "drive" + search_params["driveId"] = drive_id + + results = await asyncio.to_thread( + service.files().list(**search_params).execute + ) + + folders = results.get('files', []) + if not folders: + search_desc = f"matching '{folder_name}'" if folder_name else "in the specified location" + return f"No folders found {search_desc}." + + output = [f"Found {len(folders)} folder(s):"] + for folder in folders: + folder_id = folder['id'] + folder_name = folder['name'] + modified_time = folder.get('modifiedTime', 'N/A') + web_link = folder.get('webViewLink', '#') + parents = folder.get('parents', []) + parent_info = f" (Parent: {parents[0]})" if parents else "" + + output.append( + f"- {folder_name} (ID: {folder_id}){parent_info} Modified: {modified_time} Link: {web_link}" + ) + + return "\n".join(output) + @server.tool() @handle_http_errors("search_docs", is_read_only=True, service_type="docs") @require_google_service("drive", "drive_read") @@ -278,29 +414,86 @@ async def list_docs_in_folder( @server.tool() @handle_http_errors("create_doc", service_type="docs") -@require_google_service("docs", "docs_write") +@require_multiple_services([ + {"service_type": "docs", "scopes": "docs_write", "param_name": "docs_service"}, + {"service_type": "drive", "scopes": "drive_file", "param_name": "drive_service"} +]) async def create_doc( - service: Any, + docs_service: Any, + drive_service: Any, user_google_email: str, title: str, content: str = '', + folder_id: str = None, + folder_name: str = None, + drive_id: str = None, + include_items_from_all_drives: bool = True, ) -> str: """ Creates a new Google Doc and optionally inserts initial content. + Supports creating documents in specific folders (My Drive or Shared Drives). + + Args: + user_google_email: User's Google email address + title: Title for the new document + content: Initial content to insert (optional) + folder_id: ID of the folder to create the document in (optional) + folder_name: Name of the folder to create the document in (optional, will create if doesn't exist) + drive_id: ID of the shared drive (optional, for Shared Drives/Team Drives) + include_items_from_all_drives: Whether to include items from all drives when searching for folders Returns: str: Confirmation message with document ID and link. """ - logger.info(f"[create_doc] Invoked. Email: '{user_google_email}', Title='{title}'") - - doc = await asyncio.to_thread(service.documents().create(body={'title': title}).execute) - doc_id = doc.get('documentId') + logger.info(f"[create_doc] Invoked. Email: '{user_google_email}', Title='{title}', folder_id={folder_id}, folder_name={folder_name}, drive_id={drive_id}") + + # Handle folder placement + target_folder_id = None + folder_info = "" + + if folder_id: + # Use provided folder ID + target_folder_id = folder_id + folder_info = f" in folder {folder_id}" + logger.info(f"[create_doc] Using provided folder_id: {folder_id}") + + elif folder_name: + # Search for folder by name + target_folder_id = await find_or_create_folder( + drive_service, folder_name, drive_id, include_items_from_all_drives + ) + if target_folder_id: + folder_info = f" in folder '{folder_name}' (ID: {target_folder_id})" + logger.info(f"[create_doc] Found/created folder '{folder_name}' with ID: {target_folder_id}") + else: + logger.warning(f"[create_doc] Could not find or create folder '{folder_name}', document will be in root") + + # Create the document using Drive API to specify folder location + file_metadata = { + 'name': title, + 'mimeType': 'application/vnd.google-apps.document' + } + if target_folder_id: + file_metadata['parents'] = [target_folder_id] + + # Create the document file in Drive + created_file = await asyncio.to_thread( + drive_service.files().create( + body=file_metadata, + fields='id, name', + supportsAllDrives=True + ).execute + ) + doc_id = created_file.get('id') + + # Insert content if provided if content: requests = [{'insertText': {'location': {'index': 1}, 'text': content}}] - await asyncio.to_thread(service.documents().batchUpdate(documentId=doc_id, body={'requests': requests}).execute) + await asyncio.to_thread(docs_service.documents().batchUpdate(documentId=doc_id, body={'requests': requests}).execute) + link = f"https://docs.google.com/document/d/{doc_id}/edit" - msg = f"Created Google Doc '{title}' (ID: {doc_id}) for {user_google_email}. Link: {link}" - logger.info(f"Successfully created Google Doc '{title}' (ID: {doc_id}) for {user_google_email}. Link: {link}") + msg = f"Created Google Doc '{title}' (ID: {doc_id}) for {user_google_email}{folder_info}. Link: {link}" + logger.info(f"Successfully created Google Doc '{title}' (ID: {doc_id}) for {user_google_email}{folder_info}. Link: {link}") return msg diff --git a/gdrive/drive_helpers.py b/gdrive/drive_helpers.py index 74de7c65..d6298723 100644 --- a/gdrive/drive_helpers.py +++ b/gdrive/drive_helpers.py @@ -4,8 +4,12 @@ Shared utilities for Google Drive operations including permission checking. """ import re +import asyncio +import logging from typing import List, Dict, Any, Optional +logger = logging.getLogger(__name__) + def check_public_link_permission(permissions: List[Dict[str, Any]]) -> bool: """ @@ -107,4 +111,90 @@ def build_drive_list_params( elif corpora: list_params["corpora"] = corpora - return list_params \ No newline at end of file + return list_params + + +async def find_or_create_folder( + drive_service: Any, + folder_name: str, + drive_id: str = None, + include_items_from_all_drives: bool = True +) -> str: + """ + Finds a folder by name or creates it if it doesn't exist. + Supports both My Drive and Shared Drives. + + Args: + drive_service: Google Drive service instance + folder_name: Name of the folder to find/create + drive_id: ID of the shared drive (optional) + include_items_from_all_drives: Whether to include items from all drives when searching + + Returns: + str: Folder ID if found/created, None if failed + """ + logger.info(f"[find_or_create_folder] Looking for folder '{folder_name}', drive_id={drive_id}") + + try: + # Search for existing folder + escaped_name = folder_name.replace("'", "\\'") + query = f"name = '{escaped_name}' and mimeType = 'application/vnd.google-apps.folder' and trashed = false" + + # Build search parameters + search_params = { + "q": query, + "pageSize": 10, + "fields": "files(id, name, parents)", + "supportsAllDrives": True, + "includeItemsFromAllDrives": include_items_from_all_drives + } + + # Add drive-specific parameters if drive_id is provided + if drive_id: + search_params["corpora"] = "drive" + search_params["driveId"] = drive_id + + results = await asyncio.to_thread( + drive_service.files().list(**search_params).execute + ) + + folders = results.get('files', []) + + if folders: + # Folder exists, return the first match + folder_id = folders[0]['id'] + logger.info(f"[find_or_create_folder] Found existing folder '{folder_name}' with ID: {folder_id}") + return folder_id + + # Folder doesn't exist, create it + logger.info(f"[find_or_create_folder] Folder '{folder_name}' not found, creating new folder") + + # Determine parent folder for the new folder + parent_folder = 'root' + if drive_id: + # For shared drives, the parent is the drive root + parent_folder = drive_id + + # Create folder metadata + folder_metadata = { + 'name': folder_name, + 'mimeType': 'application/vnd.google-apps.folder', + 'parents': [parent_folder] + } + + # Create the folder + created_folder = await asyncio.to_thread( + drive_service.files().create( + body=folder_metadata, + fields='id, name', + supportsAllDrives=True + ).execute + ) + + new_folder_id = created_folder.get('id') + logger.info(f"[find_or_create_folder] Successfully created folder '{folder_name}' with ID: {new_folder_id}") + return new_folder_id + + except Exception as e: + logger.error(f"[find_or_create_folder] Error finding/creating folder '{folder_name}': {e}") + return None \ No newline at end of file