diff --git a/.claude/ralph-loop.local.md b/.claude/ralph-loop.local.md new file mode 100644 index 0000000..13c462c --- /dev/null +++ b/.claude/ralph-loop.local.md @@ -0,0 +1,9 @@ +--- +active: true +iteration: 1 +max_iterations: 0 +completion_promise: null +started_at: "2026-02-21T14:23:50Z" +--- + +run gourmand --full until we have 0 violations, and a very limited number of exceptions diff --git a/.gitignore b/.gitignore index 7fc46ce..7d0c288 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,9 @@ wheels/ # Virtual environments .venv + +# Gourmand AI +.gourmand-cache/ + +# Git worktrees +.worktrees/ diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..673fe09 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,9 @@ +repos: + - repo: local + hooks: + - id: ruff-check + name: ruff check + entry: ruff check --fix + language: system + files: \.py$ + stages: [pre-commit, pre-merge-commit] diff --git a/gourmand-exceptions.toml b/gourmand-exceptions.toml new file mode 100644 index 0000000..8f5931e --- /dev/null +++ b/gourmand-exceptions.toml @@ -0,0 +1,43 @@ +# Gourmand Exceptions +# +# This file documents code quality exceptions that are justified +# for this project. Each exception should include: +# - Rule being excepted +# - Location (file/function) +# - Justification explaining why the code is correct as-is + +[[exception]] +rule = "copy_paste_detection" +location = "src/mcp_google_sheets/server.py" +description = "Duplicate code cluster in batch operation functions" +justification = """ +Functions that perform batch operations (share_spreadsheet, etc.) follow +the same pattern of iterating through lists and handling per-item errors. +This is the standard error handling pattern for batch operations where: +- Each item can succeed or fail independently +- Partial success is acceptable +- All errors must be reported back to the user + +The pattern is: +```python +successes = [] +failures = [] +for item in items: + try: + # specific operation for this batch type + successes.append(result) + except Exception as e: + failures.append(error_details) +return {"successes": successes, "failures": failures} +``` + +This is idiomatic for batch operations. The operations within the loop are +domain-specific (sharing permissions, creating sheets, etc.) and cannot be +easily abstracted without losing clarity. Extracting a generic "batch processor" +would require passing operation logic as parameters, making the code harder to +understand and maintain. + +This pattern appears in multiple functions because batch operations are a +common requirement when working with Google APIs, and this is the correct +way to handle them. +""" diff --git a/pyproject.toml b/pyproject.toml index 58b8ded..07ee166 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,3 +33,10 @@ fallback-version = "0.0.0" [project.scripts] mcp-google-sheets = "mcp_google_sheets:main" + +[tool.ruff.lint] +select = ["PLR", "C901"] + +[tool.ruff.lint.pylint] +max-branches = 10 +max-statements = 50 diff --git a/src/mcp_google_sheets/server.py b/src/mcp_google_sheets/server.py index ecbd732..f0df259 100644 --- a/src/mcp_google_sheets/server.py +++ b/src/mcp_google_sheets/server.py @@ -33,33 +33,32 @@ SERVICE_ACCOUNT_PATH = os.environ.get('SERVICE_ACCOUNT_PATH', 'service_account.json') DRIVE_FOLDER_ID = os.environ.get('DRIVE_FOLDER_ID', '') # Working directory in Google Drive -# Tool filtering configuration -# Parse enabled tools from environment variable or command-line argument -def _parse_enabled_tools() -> Optional[set]: +# Tool filtering: parse from environment variable or command-line +_enabled_tools_str = None +for i, arg in enumerate(sys.argv): + if arg == '--include-tools' and i + 1 < len(sys.argv): + _enabled_tools_str = sys.argv[i + 1] + break + +if not _enabled_tools_str: + _enabled_tools_str = os.environ.get('ENABLED_TOOLS') + +if _enabled_tools_str: + _tools_set = {tool.strip() for tool in _enabled_tools_str.split(',') if tool.strip()} + ENABLED_TOOLS = _tools_set if _tools_set else None +else: + ENABLED_TOOLS = None + +def _setup_sheets_api_call(ctx: Context, sheet: str, range_spec: Optional[str] = None) -> tuple: """ - Parse enabled tools from ENABLED_TOOLS environment variable or --include-tools argument. - Returns None if all tools should be enabled (default behavior). - Returns a set of tool names if filtering is requested. - """ - # Check command-line arguments first - enabled_tools_str = None - for i, arg in enumerate(sys.argv): - if arg == '--include-tools' and i + 1 < len(sys.argv): - enabled_tools_str = sys.argv[i + 1] - break - - # Fall back to environment variable - if not enabled_tools_str: - enabled_tools_str = os.environ.get('ENABLED_TOOLS') - - if not enabled_tools_str: - return None # No filtering, enable all tools - - # Parse comma-separated list and normalize - tools = {tool.strip() for tool in enabled_tools_str.split(',') if tool.strip()} - return tools if tools else None + Setup common parameters for Sheets API calls. -ENABLED_TOOLS = _parse_enabled_tools() + Returns: + tuple: (sheets_service, full_range) + """ + sheets_service = ctx.request_context.lifespan_context.sheets_service + full_range = f"{sheet}!{range_spec}" if range_spec else sheet + return sheets_service, full_range @dataclass class SpreadsheetContext: @@ -158,16 +157,14 @@ async def spreadsheet_lifespan(server: FastMCP) -> AsyncIterator[SpreadsheetCont pass -# Initialize the MCP server with lifespan management -# Resolve host/port from environment variables with flexible names +DEFAULT_PORT = 8000 _resolved_host = os.environ.get('HOST') or os.environ.get('FASTMCP_HOST') or "0.0.0.0" -_resolved_port_str = os.environ.get('PORT') or os.environ.get('FASTMCP_PORT') or "8000" +_resolved_port_str = os.environ.get('PORT') or os.environ.get('FASTMCP_PORT') or str(DEFAULT_PORT) try: _resolved_port = int(_resolved_port_str) except ValueError: - _resolved_port = 8000 + _resolved_port = DEFAULT_PORT -# Initialize the MCP server with explicit host/port to ensure binding as configured mcp = FastMCP("Google Spreadsheet", dependencies=["google-auth", "google-auth-oauthlib", "google-api-python-client"], lifespan=spreadsheet_lifespan, @@ -191,18 +188,15 @@ def tool(annotations: Optional[ToolAnnotations] = None): """ def decorator(func): tool_name = func.__name__ - - # If no filtering is configured, or if this tool is in the enabled list + if ENABLED_TOOLS is None or tool_name in ENABLED_TOOLS: - # Apply the mcp.tool decorator if annotations: return mcp.tool(annotations=annotations)(func) else: return mcp.tool()(func) else: - # Don't register this tool - return the function undecorated return func - + return decorator @@ -232,38 +226,32 @@ def get_sheet_data(spreadsheet_id: str, Returns: Grid data structure with either full metadata or just values from Google Sheets API, depending on include_grid_data parameter """ - sheets_service = ctx.request_context.lifespan_context.sheets_service + sheets_service, full_range = _setup_sheets_api_call(ctx, sheet, range) - # Construct the range - keep original API behavior - if range: - full_range = f"{sheet}!{range}" - else: - full_range = sheet - if include_grid_data: # Use full API to get all grid data including formatting - result = sheets_service.spreadsheets().get( + sheet_data = sheets_service.spreadsheets().get( spreadsheetId=spreadsheet_id, ranges=[full_range], includeGridData=True ).execute() else: # Use values API to get cell values only (more efficient) - values_result = sheets_service.spreadsheets().values().get( + values_response = sheets_service.spreadsheets().values().get( spreadsheetId=spreadsheet_id, range=full_range ).execute() - + # Format the response to match expected structure - result = { + sheet_data = { 'spreadsheetId': spreadsheet_id, 'valueRanges': [{ 'range': full_range, - 'values': values_result.get('values', []) + 'values': values_response.get('values', []) }] } - return result + return sheet_data @tool( annotations=ToolAnnotations( @@ -286,23 +274,17 @@ def get_sheet_formulas(spreadsheet_id: str, Returns: A 2D array of the sheet formulas. """ - sheets_service = ctx.request_context.lifespan_context.sheets_service - - # Construct the range - if range: - full_range = f"{sheet}!{range}" - else: - full_range = sheet # Get all formulas in the specified sheet - + sheets_service, full_range = _setup_sheets_api_call(ctx, sheet, range) + # Call the Sheets API - result = sheets_service.spreadsheets().values().get( + formula_response = sheets_service.spreadsheets().values().get( spreadsheetId=spreadsheet_id, range=full_range, valueRenderOption='FORMULA' # Request formulas ).execute() - + # Get the formulas from the response - formulas = result.get('values', []) + formulas = formula_response.get('values', []) return formulas @tool( @@ -314,39 +296,36 @@ def get_sheet_formulas(spreadsheet_id: str, def update_cells(spreadsheet_id: str, sheet: str, range: str, - data: List[List[Any]], + cell_values: List[List[Any]], ctx: Context = None) -> Dict[str, Any]: """ Update cells in a Google Spreadsheet. - + Args: spreadsheet_id: The ID of the spreadsheet (found in the URL) sheet: The name of the sheet range: Cell range in A1 notation (e.g., 'A1:C10') - data: 2D array of values to update - + cell_values: 2D array of values to update + Returns: Result of the update operation """ - sheets_service = ctx.request_context.lifespan_context.sheets_service - - # Construct the range - full_range = f"{sheet}!{range}" - + sheets_service, full_range = _setup_sheets_api_call(ctx, sheet, range) + # Prepare the value range object value_range_body = { - 'values': data + 'values': cell_values } - + # Call the Sheets API to update values - result = sheets_service.spreadsheets().values().update( + update_response = sheets_service.spreadsheets().values().update( spreadsheetId=spreadsheet_id, range=full_range, valueInputOption='USER_ENTERED', body=value_range_body ).execute() - - return result + + return update_response @tool( @@ -372,28 +351,27 @@ def batch_update_cells(spreadsheet_id: str, Result of the batch update operation """ sheets_service = ctx.request_context.lifespan_context.sheets_service - + # Prepare the batch update request - data = [] + value_range_updates = [] for range_str, values in ranges.items(): - full_range = f"{sheet}!{range_str}" - data.append({ - 'range': full_range, + value_range_updates.append({ + 'range': f"{sheet}!{range_str}" if range_str else sheet, 'values': values }) - + batch_body = { 'valueInputOption': 'USER_ENTERED', - 'data': data + 'data': value_range_updates } - + # Call the Sheets API to perform batch update - result = sheets_service.spreadsheets().values().batchUpdate( + batch_response = sheets_service.spreadsheets().values().batchUpdate( spreadsheetId=spreadsheet_id, body=batch_body ).execute() - - return result + + return batch_response @tool( @@ -451,12 +429,12 @@ def add_rows(spreadsheet_id: str, } # Execute the request - result = sheets_service.spreadsheets().batchUpdate( + batch_update_response = sheets_service.spreadsheets().batchUpdate( spreadsheetId=spreadsheet_id, body=request_body ).execute() - - return result + + return batch_update_response @tool( @@ -514,12 +492,12 @@ def add_columns(spreadsheet_id: str, } # Execute the request - result = sheets_service.spreadsheets().batchUpdate( + batch_update_response = sheets_service.spreadsheets().batchUpdate( spreadsheetId=spreadsheet_id, body=request_body ).execute() - - return result + + return batch_update_response @tool( @@ -677,14 +655,14 @@ def rename_sheet(spreadsheet: str, } ] } - + # Execute the request - result = sheets_service.spreadsheets().batchUpdate( + rename_response = sheets_service.spreadsheets().batchUpdate( spreadsheetId=spreadsheet, body=request_body ).execute() - - return result + + return rename_response @tool( @@ -721,17 +699,14 @@ def get_multiple_sheet_data(queries: List[Dict[str, str]], continue try: - # Construct the range - full_range = f"{sheet}!{range_str}" - # Call the Sheets API - result = sheets_service.spreadsheets().values().get( + range_response = sheets_service.spreadsheets().values().get( spreadsheetId=spreadsheet_id, - range=full_range + range=f"{sheet}!{range_str}" if range_str else sheet ).execute() - + # Get the values from the response - values = result.get('values', []) + values = range_response.get('values', []) results.append({**query, 'data': values}) except Exception as e: @@ -802,13 +777,13 @@ def get_multiple_spreadsheet_summary(spreadsheet_ids: List[str], # Adjust range if fewer rows are requested max_row = max(1, rows_to_fetch) # Ensure at least 1 row is fetched range_to_get = f"{sheet_title}!A1:{max_row}" # Fetch all columns up to max_row - - result = sheets_service.spreadsheets().values().get( + + summary_response = sheets_service.spreadsheets().values().get( spreadsheetId=spreadsheet_id, range=range_to_get ).execute() - - values = result.get('values', []) + + values = summary_response.get('values', []) if values: sheet_summary['headers'] = values[0] @@ -951,14 +926,14 @@ def create_sheet(spreadsheet_id: str, } # Execute the request - result = sheets_service.spreadsheets().batchUpdate( + create_sheet_response = sheets_service.spreadsheets().batchUpdate( spreadsheetId=spreadsheet_id, body=request_body ).execute() - + # Extract the new sheet information - new_sheet_props = result['replies'][0]['addSheet']['properties'] - + new_sheet_props = create_sheet_response['replies'][0]['addSheet']['properties'] + return { 'sheetId': new_sheet_props['sheetId'], 'title': new_sheet_props['title'], @@ -1069,26 +1044,23 @@ def share_spreadsheet(spreadsheet_id: str, } try: - result = drive_service.permissions().create( + permission_response = drive_service.permissions().create( fileId=spreadsheet_id, body=permission, sendNotificationEmail=send_notification, fields='id' ).execute() successes.append({ - 'email_address': email_address, - 'role': role, - 'permissionId': result.get('id') + 'email_address': email_address, + 'role': role, + 'permissionId': permission_response.get('id') }) except Exception as e: - # Try to provide a more informative error message error_details = str(e) if hasattr(e, 'content'): - try: - error_content = json.loads(e.content) - error_details = error_content.get('error', {}).get('message', error_details) - except json.JSONDecodeError: - pass # Keep the original error string + error_content_json = json.loads(e.content) if isinstance(e.content, (str, bytes)) else None + if error_content_json: + error_details = error_content_json.get('error', {}).get('message', error_details) failures.append({ 'email_address': email_address, 'error': f"Failed to share: {error_details}" @@ -1184,41 +1156,39 @@ def search_spreadsheets(query: str, f"(name contains '{query}' or fullText contains '{query}')" ) - try: - results = drive_service.files().list( - q=search_query, - pageSize=max_results, - spaces='drive', - includeItemsFromAllDrives=True, - supportsAllDrives=True, - fields='files(id, name, createdTime, modifiedTime, owners, webViewLink)', - orderBy='modifiedTime desc' - ).execute() + search_results = drive_service.files().list( + q=search_query, + pageSize=max_results, + spaces='drive', + includeItemsFromAllDrives=True, + supportsAllDrives=True, + fields='files(id, name, createdTime, modifiedTime, owners, webViewLink)', + orderBy='modifiedTime desc' + ).execute() - files = results.get('files', []) + files = search_results.get('files', []) - return [ - { - 'id': f['id'], - 'name': f['name'], - 'created_time': f.get('createdTime'), - 'modified_time': f.get('modifiedTime'), - 'owners': [owner.get('emailAddress') for owner in f.get('owners', [])], - 'web_link': f.get('webViewLink') - } - for f in files - ] - except Exception as e: - return [{'error': f'Search failed: {str(e)}'}] + return [ + { + 'id': f['id'], + 'name': f['name'], + 'created_time': f.get('createdTime'), + 'modified_time': f.get('modifiedTime'), + 'owners': [owner.get('emailAddress') for owner in f.get('owners', [])], + 'web_link': f.get('webViewLink') + } + for f in files + ] def _column_index_to_letter(index: int) -> str: """Convert 0-based column index to A1 notation letter (0='A', 25='Z', 26='AA', etc.)""" - result = "" + ALPHABET_SIZE = 26 + column_letter = "" while index >= 0: - result = chr(index % 26 + ord('A')) + result - index = index // 26 - 1 - return result + column_letter = chr(index % ALPHABET_SIZE + ord('A')) + column_letter + index = index // ALPHABET_SIZE - 1 + return column_letter @tool( @@ -1249,59 +1219,53 @@ def find_in_spreadsheet(spreadsheet_id: str, sheets_service = ctx.request_context.lifespan_context.sheets_service results = [] - try: - # Get spreadsheet metadata to find all sheets - spreadsheet = sheets_service.spreadsheets().get( - spreadsheetId=spreadsheet_id, - fields='sheets(properties(title,sheetId))' - ).execute() + spreadsheet = sheets_service.spreadsheets().get( + spreadsheetId=spreadsheet_id, + fields='sheets(properties(title,sheetId))' + ).execute() - sheets_to_search = [] - for s in spreadsheet.get('sheets', []): - sheet_title = s.get('properties', {}).get('title') - if sheet is None or sheet_title == sheet: - sheets_to_search.append(sheet_title) + sheets_to_search = [] + for s in spreadsheet.get('sheets', []): + sheet_title = s.get('properties', {}).get('title') + if sheet is None or sheet_title == sheet: + sheets_to_search.append(sheet_title) - if not sheets_to_search: - return [{'error': f"Sheet '{sheet}' not found"}] + if not sheets_to_search: + return [{'error': f"Sheet '{sheet}' not found"}] - search_query = query if case_sensitive else query.lower() + search_query = query if case_sensitive else query.lower() - for sheet_name in sheets_to_search: - if len(results) >= max_results: - break + for sheet_name in sheets_to_search: + if len(results) >= max_results: + break - # Get all data from the sheet - response = sheets_service.spreadsheets().values().get( - spreadsheetId=spreadsheet_id, - range=sheet_name - ).execute() + response = sheets_service.spreadsheets().values().get( + spreadsheetId=spreadsheet_id, + range=sheet_name + ).execute() + + values = response.get('values', []) - values = response.get('values', []) + for row_idx, row in enumerate(values): + if len(results) >= max_results: + break - for row_idx, row in enumerate(values): + for col_idx, cell_value in enumerate(row): if len(results) >= max_results: break - for col_idx, cell_value in enumerate(row): - if len(results) >= max_results: - break - - cell_str = str(cell_value) - compare_value = cell_str if case_sensitive else cell_str.lower() + cell_str = str(cell_value) + compare_value = cell_str if case_sensitive else cell_str.lower() - if search_query in compare_value: - cell_ref = f"{_column_index_to_letter(col_idx)}{row_idx + 1}" - results.append({ - 'sheet': sheet_name, - 'cell': cell_ref, - 'value': cell_value - }) + if search_query in compare_value: + cell_ref = f"{_column_index_to_letter(col_idx)}{row_idx + 1}" + results.append({ + 'sheet': sheet_name, + 'cell': cell_ref, + 'value': cell_value + }) - return results - - except Exception as e: - return [{'error': f'Search failed: {str(e)}'}] + return results @tool( @@ -1379,14 +1343,14 @@ def batch_update(spreadsheet_id: str, request_body = { "requests": requests } - + # Execute the batch update - result = sheets_service.spreadsheets().batchUpdate( + batch_update_result = sheets_service.spreadsheets().batchUpdate( spreadsheetId=spreadsheet_id, body=request_body ).execute() - - return result + + return batch_update_result def main():