diff --git a/README.md b/README.md index a80d2bfe..4bf42894 100644 --- a/README.md +++ b/README.md @@ -700,6 +700,7 @@ cp .env.oauth21 .env | `get_drive_file_content` | **Core** | Read file content (Office formats) | | `list_drive_items` | Extended | List folder contents | | `create_drive_file` | **Core** | Create files or fetch from URLs | +| `move_drive_file` | Extended | Move files between folders | diff --git a/auth/service_decorator.py b/auth/service_decorator.py index 8df59ca2..302eb8a5 100644 --- a/auth/service_decorator.py +++ b/auth/service_decorator.py @@ -18,6 +18,7 @@ GMAIL_COMPOSE_SCOPE, GMAIL_MODIFY_SCOPE, GMAIL_LABELS_SCOPE, + DRIVE_SCOPE, DRIVE_READONLY_SCOPE, DRIVE_FILE_SCOPE, DOCS_READONLY_SCOPE, @@ -343,6 +344,7 @@ def _remove_user_email_arg_from_docstring(docstring: str) -> str: "gmail_modify": GMAIL_MODIFY_SCOPE, "gmail_labels": GMAIL_LABELS_SCOPE, # Drive scopes + "drive": DRIVE_SCOPE, "drive_read": DRIVE_READONLY_SCOPE, "drive_file": DRIVE_FILE_SCOPE, # Docs scopes diff --git a/gdrive/drive_tools.py b/gdrive/drive_tools.py index ba103e68..ce61a6b1 100644 --- a/gdrive/drive_tools.py +++ b/gdrive/drive_tools.py @@ -3,6 +3,7 @@ This module provides MCP tools for interacting with Google Drive API. """ + import logging import asyncio from typing import Optional @@ -18,6 +19,7 @@ logger = logging.getLogger(__name__) + @server.tool() @handle_http_errors("search_drive_files", is_read_only=True, service_type="drive") @require_google_service("drive", "drive_read") @@ -46,7 +48,9 @@ async def search_drive_files( Returns: str: A formatted list of found files/folders with their details (ID, name, type, size, modified time, link). """ - logger.info(f"[search_drive_files] Invoked. Email: '{user_google_email}', Query: '{query}'") + logger.info( + f"[search_drive_files] Invoked. Email: '{user_google_email}', Query: '{query}'" + ) # Check if the query looks like a structured Drive query or free text # Look for Drive API operators and structured query patterns @@ -54,12 +58,16 @@ async def search_drive_files( if is_structured_query: final_query = query - logger.info(f"[search_drive_files] Using structured query as-is: '{final_query}'") + logger.info( + f"[search_drive_files] Using structured query as-is: '{final_query}'" + ) else: # For free text queries, wrap in fullText contains escaped_query = query.replace("'", "\\'") final_query = f"fullText contains '{escaped_query}'" - logger.info(f"[search_drive_files] Reformatting free text query '{query}' to '{final_query}'") + logger.info( + f"[search_drive_files] Reformatting free text query '{query}' to '{final_query}'" + ) list_params = build_drive_list_params( query=final_query, @@ -69,22 +77,23 @@ async def search_drive_files( corpora=corpora, ) - results = await asyncio.to_thread( - service.files().list(**list_params).execute - ) - files = results.get('files', []) + results = await asyncio.to_thread(service.files().list(**list_params).execute) + files = results.get("files", []) if not files: return f"No files found for '{query}'." - formatted_files_text_parts = [f"Found {len(files)} files for {user_google_email} matching '{query}':"] + formatted_files_text_parts = [ + f"Found {len(files)} files for {user_google_email} matching '{query}':" + ] for item in files: - size_str = f", Size: {item.get('size', 'N/A')}" if 'size' in item else "" + size_str = f", Size: {item.get('size', 'N/A')}" if "size" in item else "" formatted_files_text_parts.append( f"- Name: \"{item['name']}\" (ID: {item['id']}, Type: {item['mimeType']}{size_str}, Modified: {item.get('modifiedTime', 'N/A')}) Link: {item.get('webViewLink', '#')}" ) text_output = "\n".join(formatted_files_text_parts) return text_output + @server.tool() @handle_http_errors("get_drive_file_content", is_read_only=True, service_type="drive") @require_google_service("drive", "drive_read") @@ -111,9 +120,13 @@ async def get_drive_file_content( logger.info(f"[get_drive_file_content] Invoked. File ID: '{file_id}'") file_metadata = await asyncio.to_thread( - service.files().get( - fileId=file_id, fields="id, name, mimeType, webViewLink", supportsAllDrives=True - ).execute + service.files() + .get( + fileId=file_id, + fields="id, name, mimeType, webViewLink", + supportsAllDrives=True, + ) + .execute ) mime_type = file_metadata.get("mimeType", "") file_name = file_metadata.get("name", "Unknown File") @@ -141,7 +154,7 @@ async def get_drive_file_content( office_mime_types = { "application/vnd.openxmlformats-officedocument.wordprocessingml.document", "application/vnd.openxmlformats-officedocument.presentationml.presentation", - "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" + "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", } if mime_type in office_mime_types: @@ -181,7 +194,7 @@ async def get_drive_file_content( async def list_drive_items( service, user_google_email: str, - folder_id: str = 'root', + folder_id: str = "root", page_size: int = 100, drive_id: Optional[str] = None, include_items_from_all_drives: bool = True, @@ -203,7 +216,9 @@ async def list_drive_items( Returns: str: A formatted list of files/folders in the specified folder. """ - logger.info(f"[list_drive_items] Invoked. Email: '{user_google_email}', Folder ID: '{folder_id}'") + logger.info( + f"[list_drive_items] Invoked. Email: '{user_google_email}', Folder ID: '{folder_id}'" + ) final_query = f"'{folder_id}' in parents and trashed=false" @@ -215,22 +230,23 @@ async def list_drive_items( corpora=corpora, ) - results = await asyncio.to_thread( - service.files().list(**list_params).execute - ) - files = results.get('files', []) + results = await asyncio.to_thread(service.files().list(**list_params).execute) + files = results.get("files", []) if not files: return f"No items found in folder '{folder_id}'." - formatted_items_text_parts = [f"Found {len(files)} items in folder '{folder_id}' for {user_google_email}:"] + formatted_items_text_parts = [ + f"Found {len(files)} items in folder '{folder_id}' for {user_google_email}:" + ] for item in files: - size_str = f", Size: {item.get('size', 'N/A')}" if 'size' in item else "" + size_str = f", Size: {item.get('size', 'N/A')}" if "size" in item else "" formatted_items_text_parts.append( f"- Name: \"{item['name']}\" (ID: {item['id']}, Type: {item['mimeType']}{size_str}, Modified: {item.get('modifiedTime', 'N/A')}) Link: {item.get('webViewLink', '#')}" ) text_output = "\n".join(formatted_items_text_parts) return text_output + @server.tool() @handle_http_errors("create_drive_file", service_type="drive") @require_google_service("drive", "drive_file") @@ -239,8 +255,8 @@ async def create_drive_file( user_google_email: str, file_name: str, content: Optional[str] = None, # Now explicitly Optional - folder_id: str = 'root', - mime_type: str = 'text/plain', + folder_id: str = "root", + mime_type: str = "text/plain", fileUrl: Optional[str] = None, # Now explicitly Optional ) -> str: """ @@ -258,7 +274,9 @@ async def create_drive_file( Returns: str: Confirmation message of the successful file creation with file link. """ - logger.info(f"[create_drive_file] Invoked. Email: '{user_google_email}', File Name: {file_name}, Folder ID: {folder_id}, fileUrl: {fileUrl}") + logger.info( + f"[create_drive_file] Invoked. Email: '{user_google_email}', File Name: {file_name}, Folder ID: {folder_id}, fileUrl: {fileUrl}" + ) if not content and not fileUrl: raise Exception("You must provide either 'content' or 'fileUrl'.") @@ -270,39 +288,44 @@ async def create_drive_file( async with httpx.AsyncClient() as client: resp = await client.get(fileUrl) if resp.status_code != 200: - raise Exception(f"Failed to fetch file from URL: {fileUrl} (status {resp.status_code})") + raise Exception( + f"Failed to fetch file from URL: {fileUrl} (status {resp.status_code})" + ) file_data = await resp.aread() # Try to get MIME type from Content-Type header content_type = resp.headers.get("Content-Type") if content_type and content_type != "application/octet-stream": mime_type = content_type - logger.info(f"[create_drive_file] Using MIME type from Content-Type header: {mime_type}") + logger.info( + f"[create_drive_file] Using MIME type from Content-Type header: {mime_type}" + ) elif content: - file_data = content.encode('utf-8') + file_data = content.encode("utf-8") - file_metadata = { - 'name': file_name, - 'parents': [folder_id], - 'mimeType': mime_type - } + file_metadata = {"name": file_name, "parents": [folder_id], "mimeType": mime_type} media = io.BytesIO(file_data) created_file = await asyncio.to_thread( - service.files().create( + service.files() + .create( body=file_metadata, media_body=MediaIoBaseUpload(media, mimetype=mime_type, resumable=True), - fields='id, name, webViewLink', - supportsAllDrives=True - ).execute + fields="id, name, webViewLink", + supportsAllDrives=True, + ) + .execute ) - link = created_file.get('webViewLink', 'No link available') + link = created_file.get("webViewLink", "No link available") confirmation_message = f"Successfully created file '{created_file.get('name', file_name)}' (ID: {created_file.get('id', 'N/A')}) in folder '{folder_id}' for {user_google_email}. Link: {link}" logger.info(f"Successfully created file. Link: {link}") return confirmation_message + @server.tool() -@handle_http_errors("get_drive_file_permissions", is_read_only=True, service_type="drive") +@handle_http_errors( + "get_drive_file_permissions", is_read_only=True, service_type="drive" +) @require_google_service("drive", "drive_read") async def get_drive_file_permissions( service, @@ -311,27 +334,31 @@ async def get_drive_file_permissions( ) -> str: """ Gets detailed metadata about a Google Drive file including sharing permissions. - + Args: user_google_email (str): The user's Google email address. Required. file_id (str): The ID of the file to check permissions for. - + Returns: str: Detailed file metadata including sharing status and URLs. """ - logger.info(f"[get_drive_file_permissions] Checking file {file_id} for {user_google_email}") - + logger.info( + f"[get_drive_file_permissions] Checking file {file_id} for {user_google_email}" + ) + try: # Get comprehensive file metadata including permissions file_metadata = await asyncio.to_thread( - service.files().get( + service.files() + .get( fileId=file_id, fields="id, name, mimeType, size, modifiedTime, owners, permissions, " - "webViewLink, webContentLink, shared, sharingUser, viewersCanCopyContent", - supportsAllDrives=True - ).execute + "webViewLink, webContentLink, shared, sharingUser, viewersCanCopyContent", + supportsAllDrives=True, + ) + .execute ) - + # Format the response output_parts = [ f"File: {file_metadata.get('name', 'Unknown')}", @@ -343,74 +370,85 @@ async def get_drive_file_permissions( "Sharing Status:", f" Shared: {file_metadata.get('shared', False)}", ] - + # Add sharing user if available - sharing_user = file_metadata.get('sharingUser') + sharing_user = file_metadata.get("sharingUser") if sharing_user: - output_parts.append(f" Shared by: {sharing_user.get('displayName', 'Unknown')} ({sharing_user.get('emailAddress', 'Unknown')})") - + output_parts.append( + f" Shared by: {sharing_user.get('displayName', 'Unknown')} ({sharing_user.get('emailAddress', 'Unknown')})" + ) + # Process permissions - permissions = file_metadata.get('permissions', []) + permissions = file_metadata.get("permissions", []) if permissions: output_parts.append(f" Number of permissions: {len(permissions)}") output_parts.append(" Permissions:") for perm in permissions: - perm_type = perm.get('type', 'unknown') - role = perm.get('role', 'unknown') - - if perm_type == 'anyone': + perm_type = perm.get("type", "unknown") + role = perm.get("role", "unknown") + + if perm_type == "anyone": output_parts.append(f" - Anyone with the link ({role})") - elif perm_type == 'user': - email = perm.get('emailAddress', 'unknown') + elif perm_type == "user": + email = perm.get("emailAddress", "unknown") output_parts.append(f" - User: {email} ({role})") - elif perm_type == 'domain': - domain = perm.get('domain', 'unknown') + elif perm_type == "domain": + domain = perm.get("domain", "unknown") output_parts.append(f" - Domain: {domain} ({role})") - elif perm_type == 'group': - email = perm.get('emailAddress', 'unknown') + elif perm_type == "group": + email = perm.get("emailAddress", "unknown") output_parts.append(f" - Group: {email} ({role})") else: output_parts.append(f" - {perm_type} ({role})") else: output_parts.append(" No additional permissions (private file)") - + # Add URLs - output_parts.extend([ - "", - "URLs:", - f" View Link: {file_metadata.get('webViewLink', 'N/A')}", - ]) - + output_parts.extend( + [ + "", + "URLs:", + f" View Link: {file_metadata.get('webViewLink', 'N/A')}", + ] + ) + # webContentLink is only available for files that can be downloaded - web_content_link = file_metadata.get('webContentLink') + web_content_link = file_metadata.get("webContentLink") if web_content_link: output_parts.append(f" Direct Download Link: {web_content_link}") - + # Check if file has "anyone with link" permission from gdrive.drive_helpers import check_public_link_permission + has_public_link = check_public_link_permission(permissions) - + if has_public_link: - output_parts.extend([ - "", - "✅ This file is shared with 'Anyone with the link' - it can be inserted into Google Docs" - ]) + output_parts.extend( + [ + "", + "✅ This file is shared with 'Anyone with the link' - it can be inserted into Google Docs", + ] + ) else: - output_parts.extend([ - "", - "❌ This file is NOT shared with 'Anyone with the link' - it cannot be inserted into Google Docs", - " To fix: Right-click the file in Google Drive → Share → Anyone with the link → Viewer" - ]) - + output_parts.extend( + [ + "", + "❌ This file is NOT shared with 'Anyone with the link' - it cannot be inserted into Google Docs", + " To fix: Right-click the file in Google Drive → Share → Anyone with the link → Viewer", + ] + ) + return "\n".join(output_parts) - + except Exception as e: logger.error(f"Error getting file permissions: {e}") return f"Error getting file permissions: {e}" @server.tool() -@handle_http_errors("check_drive_file_public_access", is_read_only=True, service_type="drive") +@handle_http_errors( + "check_drive_file_public_access", is_read_only=True, service_type="drive" +) @require_google_service("drive", "drive_read") async def check_drive_file_public_access( service, @@ -419,20 +457,20 @@ async def check_drive_file_public_access( ) -> str: """ Searches for a file by name and checks if it has public link sharing enabled. - + Args: user_google_email (str): The user's Google email address. Required. file_name (str): The name of the file to check. - + Returns: str: Information about the file's sharing status and whether it can be used in Google Docs. """ logger.info(f"[check_drive_file_public_access] Searching for {file_name}") - + # Search for the file escaped_name = file_name.replace("'", "\\'") query = f"name = '{escaped_name}'" - + list_params = { "q": query, "pageSize": 10, @@ -440,15 +478,13 @@ async def check_drive_file_public_access( "supportsAllDrives": True, "includeItemsFromAllDrives": True, } - - results = await asyncio.to_thread( - service.files().list(**list_params).execute - ) - - files = results.get('files', []) + + results = await asyncio.to_thread(service.files().list(**list_params).execute) + + files = results.get("files", []) if not files: return f"No file found with name '{file_name}'" - + if len(files) > 1: output_parts = [f"Found {len(files)} files with name '{file_name}':"] for f in files: @@ -457,40 +493,183 @@ async def check_drive_file_public_access( output_parts.append("") else: output_parts = [] - + # Check permissions for the first file - file_id = files[0]['id'] - + file_id = files[0]["id"] + # Get detailed permissions file_metadata = await asyncio.to_thread( - service.files().get( + service.files() + .get( fileId=file_id, fields="id, name, mimeType, permissions, webViewLink, webContentLink, shared", - supportsAllDrives=True - ).execute + supportsAllDrives=True, + ) + .execute ) - - permissions = file_metadata.get('permissions', []) + + permissions = file_metadata.get("permissions", []) from gdrive.drive_helpers import check_public_link_permission, get_drive_image_url + has_public_link = check_public_link_permission(permissions) - - output_parts.extend([ - f"File: {file_metadata['name']}", - f"ID: {file_id}", - f"Type: {file_metadata['mimeType']}", - f"Shared: {file_metadata.get('shared', False)}", - "" - ]) - + + output_parts.extend( + [ + f"File: {file_metadata['name']}", + f"ID: {file_id}", + f"Type: {file_metadata['mimeType']}", + f"Shared: {file_metadata.get('shared', False)}", + "", + ] + ) + if has_public_link: - output_parts.extend([ - "✅ PUBLIC ACCESS ENABLED - This file can be inserted into Google Docs", - f"Use with insert_doc_image_url: {get_drive_image_url(file_id)}" - ]) + output_parts.extend( + [ + "✅ PUBLIC ACCESS ENABLED - This file can be inserted into Google Docs", + f"Use with insert_doc_image_url: {get_drive_image_url(file_id)}", + ] + ) + else: + output_parts.extend( + [ + "❌ NO PUBLIC ACCESS - Cannot insert into Google Docs", + "Fix: Drive → Share → 'Anyone with the link' → 'Viewer'", + ] + ) + + return "\n".join(output_parts) + + +@server.tool() +@handle_http_errors("move_drive_file", service_type="drive") +@require_google_service("drive", "drive") +async def move_drive_file( + service, + user_google_email: str, + file_id: str, + destination_folder_id: str, + keep_in_original: bool = False, +) -> str: + """ + Moves a Google Drive file from its current location(s) to a new folder. + + By default, this removes the file from all current parent folders and places it in the destination folder. + If keep_in_original is True, the file will be copied to the new location while staying in the original. + + Args: + user_google_email (str): The user's Google email address. Required. + file_id (str): The ID of the file to move. + destination_folder_id (str): The ID of the destination folder. Use 'root' for the root Drive folder. + keep_in_original (bool): If True, keeps the file in its current location(s) while adding it to the new folder. Defaults to False (move). + + Returns: + str: Confirmation message with file and folder details. + """ + logger.info( + f"[move_drive_file] Invoked. Email: '{user_google_email}', File ID: {file_id}, Destination: {destination_folder_id}" + ) + + # First, get the current file metadata including its current parents + file_metadata = await asyncio.to_thread( + service.files() + .get( + fileId=file_id, + fields="id, name, parents, mimeType, webViewLink", + supportsAllDrives=True, + ) + .execute + ) + + current_parents = file_metadata.get("parents", []) + file_name = file_metadata.get("name", "Unknown") + file_type = file_metadata.get("mimeType", "Unknown") + + logger.info( + f"[move_drive_file] File '{file_name}' currently has {len(current_parents)} parent(s)" + ) + + # Validate destination folder exists + try: + destination_metadata = await asyncio.to_thread( + service.files() + .get( + fileId=destination_folder_id, + fields="id, name, mimeType", + supportsAllDrives=True, + ) + .execute + ) + + # Check if destination is actually a folder + if ( + destination_metadata.get("mimeType") != "application/vnd.google-apps.folder" + and destination_folder_id != "root" + ): + return f"Error: Destination ID '{destination_folder_id}' is not a folder. It's a {destination_metadata.get('mimeType', 'unknown type')}." + + except Exception as e: + if "not found" in str(e).lower(): + return f"Error: Destination folder with ID '{destination_folder_id}' not found or not accessible." + raise e + + destination_name = ( + destination_metadata.get("name", "Root") + if destination_folder_id != "root" + else "Root" + ) + + # Check if file is already in the destination folder + if destination_folder_id in current_parents: + return f"File '{file_name}' is already in folder '{destination_name}'. No action needed." + + # Prepare the update parameters + update_params = { + "fileId": file_id, + "addParents": destination_folder_id, + "fields": "id, name, parents", + "supportsAllDrives": True, + } + + # If not keeping in original locations, remove from current parents + if not keep_in_original and current_parents: + update_params["removeParents"] = ",".join(current_parents) + action_verb = "moved" + action_description = ( + f"removed from {len(current_parents)} previous location(s) and placed" + ) + else: + action_verb = "copied" if keep_in_original else "moved" + action_description = "added" + + # Perform the move/copy operation + updated_file = await asyncio.to_thread( + service.files().update(**update_params).execute + ) + + new_parents = updated_file.get("parents", []) + + # Build response message + response_parts = [ + f"Successfully {action_verb} file '{file_name}' to folder '{destination_name}'", + f"File ID: {file_id}", + f"File Type: {file_type}", + f"Action: File {action_description} in '{destination_name}'", + ] + + if keep_in_original: + response_parts.append( + f"Note: File remains in {len(current_parents)} original location(s)" + ) + response_parts.append(f"Total locations now: {len(new_parents)}") else: - output_parts.extend([ - "❌ NO PUBLIC ACCESS - Cannot insert into Google Docs", - "Fix: Drive → Share → 'Anyone with the link' → 'Viewer'" - ]) - - return "\n".join(output_parts) \ No newline at end of file + response_parts.append(f"Previous locations: {len(current_parents)}") + response_parts.append(f"Current locations: {len(new_parents)}") + + if file_metadata.get("webViewLink"): + response_parts.append(f"View file: {file_metadata['webViewLink']}") + + logger.info( + f"[move_drive_file] Successfully {action_verb} '{file_name}' for {user_google_email}" + ) + return "\n".join(response_parts)