diff --git a/README.md b/README.md index b4802ae..f787201 100644 --- a/README.md +++ b/README.md @@ -162,7 +162,6 @@ When filtering, use these exact tool names (comma-separated, no spaces): - `create_sheet` - `create_spreadsheet` - `find_in_spreadsheet` -- `get_multiple_sheet_data` - `get_multiple_spreadsheet_summary` - `get_sheet_data` - `get_sheet_formulas` @@ -193,17 +192,57 @@ _Refer to the [ID Reference Guide](#-id-reference-guide) for more information ab * `title` (string): The desired title for the spreadsheet. Example: "Quarterly Report Q4". * `folder_id` (optional string): Google Drive folder ID where the spreadsheet should be created. Get from its URL. If omitted, uses configured default or root. * _Returns:_ Object with spreadsheet info, including `spreadsheetId`, `title`, and `folder`. -* **`get_sheet_data`**: Reads data from a range in a sheet/tab. +* **`get_sheet_data`**: Reads data from one or more ranges in a SINGLE spreadsheet. **Supports batching** - multiple ranges are fetched in a single API call (5-10x faster than separate calls). * `spreadsheet_id` (string): The spreadsheet ID (from its URL). - * `sheet` (string): Name of the sheet/tab (e.g., "Sheet1"). - * `range` (optional string): A1 notation (e.g., `'A1:C10'`, `'Sheet1!B2:D'`). If omitted, reads the whole sheet/tab specified by `sheet`. - * `include_grid_data` (optional boolean, default `False`): If `True`, returns full grid data including formatting and metadata (much larger). If `False`, returns values only (more efficient). - * _Returns:_ If `include_grid_data=True`, full grid data with metadata ([`get` response](https://developers.google.com/workspace/sheets/api/reference/rest/v4/spreadsheets/get#response-body)). If `False`, a values result object from the Values API ([`values.get` response](https://developers.google.com/workspace/sheets/api/reference/rest/v4/spreadsheets.values/get#response-body)). -* **`get_sheet_formulas`**: Reads formulas from a range in a sheet/tab. + * `ranges` (string or array of strings): Either a single range or multiple ranges in A1 notation with sheet name. + * Single range: `"Sheet1!A1:B10"` or just `"Sheet1"` for entire sheet + * Multiple ranges (batched): `["Sheet1!A1:B10", "Sheet2!C1:D20", "Data!E:F"]` + * **Note:** All ranges must be from the SAME spreadsheet. For multiple different spreadsheets, call this tool separately for each. + * _Returns:_ Dictionary with `spreadsheetId` and `valueRanges` (array of results). Each result has `range` and `values` (2D array). + + **Examples:** + ```python + # Single range + data = get_sheet_data("spreadsheet-id", "Sheet1!A1:B10") + + # Multiple ranges - BATCHED in one API call (recommended for multiple ranges) + data = get_sheet_data("spreadsheet-id", ["Sheet1!A1:B10", "Sheet2!C1:D20"]) + + # Entire sheets + data = get_sheet_data("spreadsheet-id", ["Sales", "Inventory", "Reports"]) + ``` +* **`get_sheet_formulas`**: Reads formulas from one or more ranges in a SINGLE spreadsheet. **Supports batching** - multiple ranges are fetched in a single API call (5-10x faster than separate calls). * `spreadsheet_id` (string): The spreadsheet ID (from its URL). - * `sheet` (string): Name of the sheet/tab (e.g., "Sheet1"). - * `range` (optional string): A1 notation (e.g., `'A1:C10'`, `'Sheet1!B2:D'`). If omitted, reads all formulas in the sheet/tab specified by `sheet`. - * _Returns:_ 2D array of cell formulas (array of arrays) ([`values.get` response](https://developers.google.com/workspace/sheets/api/reference/rest/v4/spreadsheets.values/get#response-body)). + * `ranges` (string or array of strings): Either a single range or multiple ranges in A1 notation with sheet name. + * Single range: `"Sheet1!A1:C10"` or just `"Sheet1"` for entire sheet + * Multiple ranges (batched): `["Sheet1!B:B", "Sheet2!C:C", "Sheet3!D:D"]` + * **Note:** All ranges must be from the SAME spreadsheet. For multiple different spreadsheets, call this tool separately for each. + * `format` (optional string, default `'A1'`): Formula notation format. + * `'A1'`: Returns formulas in A1 notation (e.g., `=SUM(A1:A3)`, `=B2*2`) + * `'R1C1'`: Returns formulas in R1C1 notation (e.g., `=SUM(R[-2]C:RC)`, `=RC[-1]*2`) + * R1C1 format is useful for identifying unique formula patterns across ranges, as relative references normalize to the same pattern regardless of cell position. + * _Returns:_ Dictionary with `spreadsheetId` and `valueRanges` (array of results). Each result has `range` and `values` (2D array of formulas). + + **Examples:** + ```python + # Single range - formulas in A1 notation (default) + data = get_sheet_formulas('spreadsheet-id', 'Sheet1!B1:B10') + # Returns: {valueRanges: [{range: "Sheet1!B1:B10", values: [['=SUM(A1:A3)'], ...]}]} + + # Multiple ranges - BATCHED, with R1C1 for pattern analysis + data = get_sheet_formulas('spreadsheet-id', ['Sheet1!B:B', 'Sheet2!C:C'], format='R1C1') + # Returns: {valueRanges: [{range: "Sheet1!B:B", values: [['=SUM(RC[-1]:R[2]C[-1])'], ...]}, ...]} + + # Use R1C1 to identify unique formula patterns across sheets + from collections import defaultdict + formula_patterns = defaultdict(list) + for vr in data['valueRanges']: + for row_idx, row in enumerate(vr['values']): + for col_idx, formula in enumerate(row): + if formula.startswith('='): + formula_patterns[formula].append((vr['range'], row_idx, col_idx)) + # Now formula_patterns maps unique formulas to their locations across all sheets + ``` * **`update_cells`**: Writes data to a specific range. Overwrites existing data. * `spreadsheet_id` (string): The spreadsheet ID (from its URL). * `sheet` (string): Name of the sheet/tab (e.g., "Sheet1"). @@ -228,9 +267,6 @@ _Refer to the [ID Reference Guide](#-id-reference-guide) for more information ab * `spreadsheet_id` (string): The spreadsheet ID (from its URL). * `title` (string): Name for the new sheet/tab. * _Returns:_ New sheet properties object. -* **`get_multiple_sheet_data`**: Fetches data from multiple ranges across potentially different spreadsheets in one call. - * `queries` (array of objects): Each object needs `spreadsheet_id`, `sheet`, and `range`. Example: `[{"spreadsheet_id": "abc", "sheet": "Sheet1", "range": "A1:B2"}, ...]`. - * _Returns:_ List of objects, each containing the query params and fetched `data` or an `error`. Each `data` is a [`values.get` response](https://developers.google.com/workspace/sheets/api/reference/rest/v4/spreadsheets.values/get#response-body). * **`get_multiple_spreadsheet_summary`**: Gets titles, sheet/tab names, headers, and first few rows for multiple spreadsheets. * `spreadsheet_ids` (array of strings): IDs of the spreadsheets (from their URLs). * `rows_to_fetch` (optional integer, default `5`): How many rows (including header) to preview. Example: `5`. @@ -314,6 +350,18 @@ _Refer to the [ID Reference Guide](#-id-reference-guide) for more information ab * `CREDENTIALS_PATH`: Path to the downloaded OAuth credentials JSON file (default: `credentials.json`). * `TOKEN_PATH`: Path to store the user's refresh token after first login (default: `token.json`). Must be writable. +### Method B2: Token Relay Mode (OpenShift/Kubernetes + End User Auth) ðŸšĒ + +* **Why?** Perfect for containerized deployments (OpenShift, Kubernetes) where you need end-user authentication but can't use interactive browser flows. Your frontend handles OAuth, and passes tokens to the MCP server. +* **Use Case:** OpenShift/Kubernetes deployments with frontend applications that need to relay end-user identity. +* **Steps:** + 1. **Frontend OAuth:** Your frontend app (React, Vue, Angular, etc.) handles the OAuth flow and obtains user access tokens. + 2. **Pass Token to MCP:** Frontend passes the access token to MCP server via environment variable. + 3. **Set Environment Variable:** + * `USER_ACCESS_TOKEN`: OAuth access token obtained by your frontend application. + 4. **Token Refresh:** Your frontend is responsible for refreshing tokens (they expire in ~1 hour). +* **📖 Full Guide:** See [Token Relay Mode Documentation](docs/TOKEN_RELAY_MODE.md) for complete setup instructions, frontend examples (JavaScript, Python), OpenShift deployment manifests, and security best practices. + ### Method C: Direct Credential Injection (Advanced) 🔒 * **Why?** Useful in environments like Docker, Kubernetes, or CI/CD where managing files is hard, but environment variables are easy/secure. Avoids file system access. @@ -359,15 +407,17 @@ _Refer to the [ID Reference Guide](#-id-reference-guide) for more information ab The server checks for credentials in this order: -1. `CREDENTIALS_CONFIG` (Base64 content) -2. `SERVICE_ACCOUNT_PATH` (Path to Service Account JSON) -3. `CREDENTIALS_PATH` (Path to OAuth JSON) - triggers interactive flow if token is missing/expired -4. **Application Default Credentials (ADC)** - automatic fallback +1. `USER_ACCESS_TOKEN` (External OAuth token - **Token Relay Mode**) +2. `CREDENTIALS_CONFIG` (Base64 content) +3. `SERVICE_ACCOUNT_PATH` (Path to Service Account JSON) +4. `CREDENTIALS_PATH` (Path to OAuth JSON) - triggers interactive flow if token is missing/expired +5. **Application Default Credentials (ADC)** - automatic fallback **Environment Variable Summary:** | Variable | Method(s) | Description | Default | |:---------------------------------|:----------------------------|:-----------------------------------------------------------------|:-------------------| +| `USER_ACCESS_TOKEN` | Token Relay | OAuth access token from external source (frontend app). Enables end-user auth in containers. | - | | `SERVICE_ACCOUNT_PATH` | Service Account | Path to the Service Account JSON key file (MCP server specific). | - | | `GOOGLE_APPLICATION_CREDENTIALS` | ADC | Path to service account key (Google's standard variable). | - | | `DRIVE_FOLDER_ID` | Service Account | ID of the Google Drive folder shared with the Service Account. | - | diff --git a/docs/LOCAL_TESTING.md b/docs/LOCAL_TESTING.md new file mode 100644 index 0000000..9ca6082 --- /dev/null +++ b/docs/LOCAL_TESTING.md @@ -0,0 +1,227 @@ +# Local Testing Guide + +This guide explains how to test the MCP Google Sheets server on your local machine. + +## Prerequisites + +### 1. Python 3.10 or Higher + +Check your Python version: +```bash +python --version +``` + +If you need to upgrade, visit [python.org](https://www.python.org/downloads/). + +### 2. Install `uv` (Python Package Manager) + +```bash +# Linux/macOS +curl -LsSf https://astral.sh/uv/install.sh | sh + +# Or using pip +pip install uv +``` + +Follow the installer instructions to add `uv` to your PATH if needed. + +### 3. Google Cloud Platform Setup + +You must configure Google Cloud Platform credentials and enable the necessary APIs: + +1. **Create/Select a GCP Project** + - Go to [Google Cloud Console](https://console.cloud.google.com/) + - Create a new project or select an existing one + +2. **Enable Required APIs** + - Navigate to "APIs & Services" → "Library" + - Search for and enable: + - **Google Sheets API** + - **Google Drive API** + +### 4. Set Up Authentication + +We recommend using a **Service Account** for local testing. + +#### Create a Service Account + +1. In GCP Console → "IAM & Admin" → "Service Accounts" +2. Click "+ CREATE SERVICE ACCOUNT" +3. Name it (e.g., `mcp-sheets-service`) +4. Grant **Editor** role +5. Click "Done" +6. Find the account, click Actions (â‹Ū) → "Manage keys" +7. Click "ADD KEY" → "Create new key" → **JSON** → "CREATE" +8. **Download and securely store** the JSON key file + +#### Create & Share a Google Drive Folder + +1. In [Google Drive](https://drive.google.com/), create a folder (e.g., "MCP Test Sheets") +2. Note the **Folder ID** from the URL: + ``` + https://drive.google.com/drive/folders/1xcRQCU9xrNVBPTeNzHqx4hrG7yR91WIa + └────────── Folder ID ──────────┘ + ``` +3. Right-click the folder → "Share" +4. Enter the Service Account's email (from the JSON file `client_email`) +5. Grant **Editor** access +6. Uncheck "Notify people" +7. Click "Share" + +#### Set Environment Variables + +**Linux/macOS:** +```bash +export SERVICE_ACCOUNT_PATH="/path/to/your/service-account-key.json" +export DRIVE_FOLDER_ID="YOUR_DRIVE_FOLDER_ID" +``` + +**Windows CMD:** +```cmd +set SERVICE_ACCOUNT_PATH="C:\path\to\your\service-account-key.json" +set DRIVE_FOLDER_ID="YOUR_DRIVE_FOLDER_ID" +``` + +**Windows PowerShell:** +```powershell +$env:SERVICE_ACCOUNT_PATH = "C:\path\to\your\service-account-key.json" +$env:DRIVE_FOLDER_ID = "YOUR_DRIVE_FOLDER_ID" +``` + +## Running the Server + +### Option 1: Development Mode (From Cloned Repo) + +If you've cloned the repository and want to test local changes: + +```bash +# Navigate to the project directory +cd /path/to/mcp-google-sheets + +# Run using uv +uv run mcp-google-sheets +``` + +The server will start and print logs indicating it's ready. + +### Option 2: Using `uvx` (Test Published Package) + +To test the server as end-users would experience it: + +```bash +uvx mcp-google-sheets@latest +``` + +This downloads and runs the latest published version. + +## Testing with an MCP Client + +You need an MCP-compatible client to test the functionality. The most common option is **Claude Desktop**. + +### Configure Claude Desktop + +Add the server configuration to your `claude_desktop_config.json`: + +**For Development Testing (local code):** +```json +{ + "mcpServers": { + "mcp-google-sheets-local": { + "command": "uv", + "args": [ + "run", + "--directory", + "/home/jfenal/dev/gh/mcp-google-sheets", + "mcp-google-sheets" + ], + "env": { + "SERVICE_ACCOUNT_PATH": "/path/to/service-account.json", + "DRIVE_FOLDER_ID": "your_folder_id" + } + } + } +} +``` + +**For Production Testing (published package):** +```json +{ + "mcpServers": { + "google-sheets": { + "command": "uvx", + "args": ["mcp-google-sheets@latest"], + "env": { + "SERVICE_ACCOUNT_PATH": "/path/to/service-account.json", + "DRIVE_FOLDER_ID": "your_folder_id" + } + } + } +} +``` + +**macOS Note:** If you encounter a `spawn uvx ENOENT` error, use the full path: +```json +"command": "/Users/yourusername/.local/bin/uvx" +``` + +## Testing the Functionality + +Once the server is running and connected to Claude Desktop, try these test prompts: + +### Basic Operations +- "List all spreadsheets I have access to." +- "Create a new spreadsheet titled 'Test Spreadsheet'." +- "List the sheets in spreadsheet ID ``." + +### Data Operations +- "Get data from range A1:C10 in Sheet1 of spreadsheet ``." +- "Update cell B2 to 'Test Value' in Sheet1 of spreadsheet ``." +- "Add a new sheet named 'Testing' to spreadsheet ``." + +### Advanced Operations +- "Get summaries of multiple spreadsheets." +- "Share spreadsheet `` with test@example.com as a reader." +- "Copy Sheet1 from spreadsheet `` to spreadsheet ``." + +## Testing Token Relay Mode (OpenShift Feature) + +If you're testing the token relay functionality for containerized deployments, see the [Token Relay Mode Documentation](TOKEN_RELAY_MODE.md) for detailed setup instructions. + +## Troubleshooting + +### Authentication Errors +- Verify your service account JSON file path is correct +- Ensure the folder is shared with the service account's email +- Check that both Google Sheets API and Google Drive API are enabled in GCP + +### Connection Issues +- Verify environment variables are set correctly +- Check that Claude Desktop config JSON is valid +- Look for errors in the server logs +- Restart Claude Desktop after config changes + +### Permission Errors +- Ensure the service account has Editor access to the shared folder +- Verify the folder ID is correct +- Check that spreadsheets are within the shared folder (for service accounts) + +## Alternative Authentication Methods + +### OAuth 2.0 (Interactive Login) +See the main [README.md](../README.md#method-b-oauth-20-interactive--personal-use-) for OAuth setup instructions. + +### Application Default Credentials (ADC) +For Google Cloud environments or local development with `gcloud`: + +```bash +gcloud auth application-default login --scopes=https://www.googleapis.com/auth/cloud-platform,https://www.googleapis.com/auth/spreadsheets,https://www.googleapis.com/auth/drive +gcloud auth application-default set-quota-project +``` + +Then run the server without explicit credential environment variables. + +## Next Steps + +- Review the [README.md](../README.md) for complete feature documentation +- Check [TOKEN_RELAY_MODE.md](TOKEN_RELAY_MODE.md) for containerized deployment testing +- Explore all 19 available tools and their parameters diff --git a/scripts/benchmark_a1_to_r1c1.py b/scripts/benchmark_a1_to_r1c1.py new file mode 100644 index 0000000..305ca45 --- /dev/null +++ b/scripts/benchmark_a1_to_r1c1.py @@ -0,0 +1,231 @@ +#!/usr/bin/env python +""" +Benchmark different approaches for converting A1 notation to R1C1 notation. +""" + +import re +import time +from typing import Tuple, Optional + +# ============================================================================ +# Approach 1: Computed conversion (parse column letters on the fly) +# ============================================================================ + +def column_letter_to_number_computed(col: str) -> int: + """Convert column letter(s) to number (A=1, Z=26, AA=27, etc.)""" + num = 0 + for char in col: + num = num * 26 + (ord(char) - ord('A') + 1) + return num + +def a1_to_r1c1_computed(formula: str, current_row: int, current_col: int) -> str: + """Convert A1 notation to R1C1 using computed column conversion.""" + + def replace_cell_ref(match): + sheet_prefix = match.group(1) or '' + col_abs = match.group(2) # $ before column + col_letters = match.group(3) + row_abs = match.group(4) # $ before row + row_num = match.group(5) + + # Convert column letters to number + col_num = column_letter_to_number_computed(col_letters) + row = int(row_num) + + # Build R1C1 notation + if row_abs: + row_part = f"R{row}" + else: + offset = row - current_row + row_part = f"R[{offset}]" if offset != 0 else "R" + + if col_abs: + col_part = f"C{col_num}" + else: + offset = col_num - current_col + col_part = f"C[{offset}]" if offset != 0 else "C" + + return f"{sheet_prefix}{row_part}{col_part}" + + # Regex to match cell references + # Group 1: Optional sheet name with ! + # Group 2: Optional $ before column + # Group 3: Column letters + # Group 4: Optional $ before row + # Group 5: Row number + pattern = r"(?:([^!]+!))?(\$)?([A-Z]+)(\$)?(\d+)" + + return re.sub(pattern, replace_cell_ref, formula) + + +# ============================================================================ +# Approach 2: LUT for column letters (pre-computed lookup table) +# ============================================================================ + +# Pre-build lookup table for columns A-ZZZ (up to column 18278) +def build_column_lut(max_col: int = 1000) -> dict: + """Build lookup table for column letters to numbers.""" + lut = {} + for i in range(1, max_col + 1): + col_letter = "" + num = i + while num > 0: + num -= 1 + col_letter = chr(num % 26 + ord('A')) + col_letter + num //= 26 + lut[col_letter] = i + return lut + +COLUMN_LUT = build_column_lut(1000) + +def a1_to_r1c1_lut(formula: str, current_row: int, current_col: int) -> str: + """Convert A1 notation to R1C1 using lookup table for columns.""" + + def replace_cell_ref(match): + sheet_prefix = match.group(1) or '' + col_abs = match.group(2) + col_letters = match.group(3) + row_abs = match.group(4) + row_num = match.group(5) + + # Use LUT for column conversion + col_num = COLUMN_LUT.get(col_letters) + if col_num is None: + # Fallback for columns beyond LUT + col_num = column_letter_to_number_computed(col_letters) + + row = int(row_num) + + # Build R1C1 notation + if row_abs: + row_part = f"R{row}" + else: + offset = row - current_row + row_part = f"R[{offset}]" if offset != 0 else "R" + + if col_abs: + col_part = f"C{col_num}" + else: + offset = col_num - current_col + col_part = f"C[{offset}]" if offset != 0 else "C" + + return f"{sheet_prefix}{row_part}{col_part}" + + pattern = r"(?:([^!]+!))?(\$)?([A-Z]+)(\$)?(\d+)" + return re.sub(pattern, replace_cell_ref, formula) + + +# ============================================================================ +# Approach 3: Compiled regex (pre-compile the pattern) +# ============================================================================ + +CELL_REF_PATTERN = re.compile(r"(?:([^!]+!))?(\$)?([A-Z]+)(\$)?(\d+)") + +def a1_to_r1c1_compiled(formula: str, current_row: int, current_col: int) -> str: + """Convert A1 notation to R1C1 using pre-compiled regex and LUT.""" + + def replace_cell_ref(match): + sheet_prefix = match.group(1) or '' + col_abs = match.group(2) + col_letters = match.group(3) + row_abs = match.group(4) + row_num = match.group(5) + + col_num = COLUMN_LUT.get(col_letters, column_letter_to_number_computed(col_letters)) + row = int(row_num) + + # Build R1C1 notation + if row_abs: + row_part = f"R{row}" + else: + offset = row - current_row + row_part = f"R[{offset}]" if offset != 0 else "R" + + if col_abs: + col_part = f"C{col_num}" + else: + offset = col_num - current_col + col_part = f"C[{offset}]" if offset != 0 else "C" + + return f"{sheet_prefix}{row_part}{col_part}" + + return CELL_REF_PATTERN.sub(replace_cell_ref, formula) + + +# ============================================================================ +# Benchmark +# ============================================================================ + +def benchmark(): + """Run benchmarks on different conversion approaches.""" + + # Test formulas (realistic examples) + test_cases = [ + ("=SUM(A1:A10)", 5, 2), + ("=A2*2", 3, 2), + ("=IF($A$1>0,B2*C2,0)", 2, 4), + ("=VLOOKUP(A5,Sheet1!$A$1:$D$100,2,FALSE)", 5, 3), + ("=INDEX($A$1:$Z$1000,MATCH(B10,A:A,0),3)", 10, 5), + ("=SUMIF(Data!A:A,\">100\",Data!B:B)", 1, 1), + ("=A1+B1+C1+D1+E1+F1", 1, 1), + ("=$A1*B$2+C3", 3, 3), + ] + + # Repeat test cases to simulate processing a large sheet + large_test_set = test_cases * 1000 # 8,000 formulas + + methods = [ + ("Computed (no LUT)", a1_to_r1c1_computed), + ("LUT for columns", a1_to_r1c1_lut), + ("Compiled regex + LUT", a1_to_r1c1_compiled), + ] + + print("="*80) + print("A1 to R1C1 Conversion Benchmark") + print("="*80) + print(f"\nTest set: {len(test_cases)} unique formulas × 1000 = {len(large_test_set)} conversions\n") + + # Verify all methods produce same results + print("Verifying correctness...") + for formula, row, col in test_cases: + results = [method(formula, row, col) for name, method in methods] + if len(set(results)) != 1: + print(f"ERROR: Methods produce different results for {formula}") + for i, (name, _) in enumerate(methods): + print(f" {name}: {results[i]}") + else: + print(f"✓ {formula} → {results[0]}") + + print("\n" + "="*80) + print("Performance Benchmark") + print("="*80 + "\n") + + results = [] + + for name, method in methods: + start = time.perf_counter() + + for formula, row, col in large_test_set: + _ = method(formula, row, col) + + end = time.perf_counter() + elapsed = end - start + per_conversion = (elapsed / len(large_test_set)) * 1_000_000 # microseconds + + results.append((name, elapsed, per_conversion)) + print(f"{name:25s}: {elapsed:.4f}s total, {per_conversion:.2f}Ξs per conversion") + + # Show relative performance + print("\n" + "-"*80) + print("Relative Performance (vs fastest):") + print("-"*80) + + fastest_time = min(r[1] for r in results) + for name, elapsed, per_conv in results: + speedup = elapsed / fastest_time + print(f"{name:25s}: {speedup:.2f}x {'(FASTEST)' if speedup == 1.0 else ''}") + + print("\n" + "="*80) + +if __name__ == '__main__': + benchmark() diff --git a/scripts/test_formula_notation.py b/scripts/test_formula_notation.py new file mode 100644 index 0000000..37f4afd --- /dev/null +++ b/scripts/test_formula_notation.py @@ -0,0 +1,164 @@ +#!/usr/bin/env python +""" +Test script to determine what format formulas are returned in by the Google Sheets API. +Tests both spreadsheets.values.get and spreadsheets.get methods. +""" + +import os +import json +from google.oauth2 import service_account +from googleapiclient.discovery import build + +# Use service account from environment +SERVICE_ACCOUNT_PATH = os.environ.get('SERVICE_ACCOUNT_PATH') +SCOPES = ['https://www.googleapis.com/auth/spreadsheets', 'https://www.googleapis.com/auth/drive'] + +def test_formula_formats(spreadsheet_id=None): + """Test what format formulas are returned in.""" + + # Authenticate + if not SERVICE_ACCOUNT_PATH or not os.path.exists(SERVICE_ACCOUNT_PATH): + print("ERROR: SERVICE_ACCOUNT_PATH not set or file doesn't exist") + print("Please set SERVICE_ACCOUNT_PATH environment variable") + return + + creds = service_account.Credentials.from_service_account_file( + SERVICE_ACCOUNT_PATH, + scopes=SCOPES + ) + + sheets_service = build('sheets', 'v4', credentials=creds) + drive_service = build('drive', 'v3', credentials=creds) + + created_new = False + + if not spreadsheet_id: + # Create a test spreadsheet + print("Creating test spreadsheet...") + file_body = { + 'name': 'R1C1 Formula Test', + 'mimeType': 'application/vnd.google-apps.spreadsheet', + } + + spreadsheet = drive_service.files().create( + body=file_body, + fields='id, name' + ).execute() + + spreadsheet_id = spreadsheet.get('id') + created_new = True + print(f"Created spreadsheet: {spreadsheet_id}") + else: + print(f"Using existing spreadsheet: {spreadsheet_id}") + + # Get first sheet name + spreadsheet_metadata = sheets_service.spreadsheets().get(spreadsheetId=spreadsheet_id).execute() + first_sheet = spreadsheet_metadata['sheets'][0]['properties']['title'] + print(f"Using sheet: {first_sheet}") + + if created_new: + # Add some test formulas + # Put values in A1:A3 and a formula in B1 + print("\nAdding test data and formulas...") + sheets_service.spreadsheets().values().update( + spreadsheetId=spreadsheet_id, + range=f'{first_sheet}!A1:A3', + valueInputOption='USER_ENTERED', + body={'values': [[10], [20], [30]]} + ).execute() + + # Add a formula that references cells above + sheets_service.spreadsheets().values().update( + spreadsheetId=spreadsheet_id, + range=f'{first_sheet}!B1', + valueInputOption='USER_ENTERED', + body={'values': [['=SUM(A1:A3)']]} + ).execute() + + # Add another formula with relative reference + sheets_service.spreadsheets().values().update( + spreadsheetId=spreadsheet_id, + range=f'{first_sheet}!B2', + valueInputOption='USER_ENTERED', + body={'values': [['=A2*2']]} + ).execute() + + test_range = f'{first_sheet}!B1:B2' + else: + # Use existing sheet - find some cells with formulas + print("\nScanning for existing formulas...") + test_range = f'{first_sheet}!A1:Z100' # Scan a reasonable range + + print("\n" + "="*80) + print("TEST 1: Using spreadsheets.values.get with valueRenderOption='FORMULA'") + print("="*80) + + # Test 1: values.get with FORMULA option + result = sheets_service.spreadsheets().values().get( + spreadsheetId=spreadsheet_id, + range=test_range, + valueRenderOption='FORMULA' + ).execute() + + print(f"\nResult from values.get (first 10 formulas found):") + values = result.get('values', []) + formula_count = 0 + for row_idx, row in enumerate(values): + for col_idx, cell in enumerate(row): + if isinstance(cell, str) and cell.startswith('='): + print(f" Row {row_idx+1}, Col {col_idx+1}: {cell[:100]}") + formula_count += 1 + if formula_count >= 10: + break + if formula_count >= 10: + break + print(f"\nTotal formulas found: {formula_count}") + + print("\n" + "="*80) + print("TEST 2: Using spreadsheets.get with includeGridData=True") + print("="*80) + + # Test 2: spreadsheets.get with grid data + result2 = sheets_service.spreadsheets().get( + spreadsheetId=spreadsheet_id, + ranges=[test_range], + includeGridData=True + ).execute() + + print(f"\nResult from spreadsheets.get (full structure):") + print(json.dumps(result2, indent=2)) + + # Extract just the formula values for clarity + if 'sheets' in result2: + for sheet in result2['sheets']: + if 'data' in sheet: + for grid_data in sheet['data']: + if 'rowData' in grid_data: + print("\n" + "-"*80) + print("Extracted formula values from CellData:") + print("-"*80) + for row_idx, row in enumerate(grid_data['rowData']): + if 'values' in row: + for col_idx, cell in enumerate(row['values']): + if 'userEnteredValue' in cell: + user_val = cell['userEnteredValue'] + if 'formulaValue' in user_val: + print(f"Cell B{row_idx+1}: {user_val['formulaValue']}") + + # Clean up - delete the test spreadsheet if we created it + print("\n" + "="*80) + if created_new: + delete = input("\nDelete test spreadsheet? (y/n): ") + if delete.lower() == 'y': + drive_service.files().delete(fileId=spreadsheet_id).execute() + print(f"Deleted spreadsheet {spreadsheet_id}") + else: + print(f"\nTest spreadsheet URL: https://docs.google.com/spreadsheets/d/{spreadsheet_id}/edit") + else: + print(f"\nUsed existing spreadsheet: https://docs.google.com/spreadsheets/d/{spreadsheet_id}/edit") + +if __name__ == '__main__': + import sys + # Allow passing spreadsheet ID as argument + spreadsheet_id = sys.argv[1] if len(sys.argv) > 1 else None + test_formula_formats(spreadsheet_id) diff --git a/src/mcp_google_sheets/server.py b/src/mcp_google_sheets/server.py index ecbd732..ecdf7e3 100644 --- a/src/mcp_google_sheets/server.py +++ b/src/mcp_google_sheets/server.py @@ -5,14 +5,33 @@ """ import base64 +import functools +import logging import os +import re import sys +import time +import random +import warnings +from datetime import datetime from typing import List, Dict, Any, Optional, Union import json from dataclasses import dataclass from contextlib import asynccontextmanager from collections.abc import AsyncIterator +# Suppress deprecation warning from Google auth libraries about file_cache +# The google-auth-oauthlib library prints this warning to stderr which bypasses our logging +warnings.filterwarnings('ignore', message='file_cache is only supported with oauth2client<4.0.0') + +# Configure logging BEFORE importing MCP to suppress its verbose messages +# MCP logs "Processing request of type..." at INFO level - we need to block those +logging.basicConfig(level=logging.CRITICAL, format='%(message)s', force=True) +logging.getLogger('mcp').setLevel(logging.CRITICAL) +logging.getLogger('mcp.server').setLevel(logging.CRITICAL) +logging.getLogger('mcp.server.lowlevel').setLevel(logging.CRITICAL) +logging.getLogger('mcp.server.lowlevel.server').setLevel(logging.CRITICAL) + # MCP imports from mcp.server.fastmcp import FastMCP, Context from mcp.types import ToolAnnotations @@ -23,6 +42,7 @@ from google.auth.transport.requests import Request from google_auth_oauthlib.flow import InstalledAppFlow from googleapiclient.discovery import build +from googleapiclient.errors import HttpError import google.auth # Constants @@ -32,34 +52,212 @@ CREDENTIALS_PATH = os.environ.get('CREDENTIALS_PATH', 'credentials.json') SERVICE_ACCOUNT_PATH = os.environ.get('SERVICE_ACCOUNT_PATH', 'service_account.json') DRIVE_FOLDER_ID = os.environ.get('DRIVE_FOLDER_ID', '') # Working directory in Google Drive +USER_ACCESS_TOKEN = os.environ.get('USER_ACCESS_TOKEN') # External OAuth token for token relay mode + +# Tool filtering: parse from environment variable or command-line +_enabled_tools_str = None +for i, arg in enumerate(sys.argv): + if arg == '--include-tools' and i + 1 < len(sys.argv): + _enabled_tools_str = sys.argv[i + 1] + break + +if not _enabled_tools_str: + _enabled_tools_str = os.environ.get('ENABLED_TOOLS') + +if _enabled_tools_str: + _tools_set = {tool.strip() for tool in _enabled_tools_str.split(',') if tool.strip()} + ENABLED_TOOLS = _tools_set if _tools_set else None +else: + ENABLED_TOOLS = None + +# API Configuration +API_TIMEOUT = 180 # Google Sheets API timeout limit in seconds +MAX_RETRIES = 5 +MAX_BACKOFF = 64 # Maximum backoff time in seconds +HTTP_TOO_MANY_REQUESTS = 429 # HTTP status code for rate limiting +ALPHABET_SIZE = 26 # Number of letters in the alphabet (A-Z) + +# Logging configuration +LOG_LEVELS = {'DEBUG': 10, 'INFO': 20, 'WARN': 30, 'ERROR': 40} +_LOG_LEVEL = LOG_LEVELS.get(os.environ.get('LOG_LEVEL', 'INFO').upper(), LOG_LEVELS['INFO']) + +def log(message: str, level: str = 'INFO'): + """Log message with timestamp and level to stderr""" + level_value = LOG_LEVELS.get(level.upper(), LOG_LEVELS['INFO']) + if level_value >= _LOG_LEVEL: + timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] + print(f"[{timestamp}] [{level:5s}] {message}", file=sys.stderr, flush=True) + +def execute_with_retry(request, operation_name: str = "API call"): + """ + Execute a Google API request with exponential backoff retry on 429 rate limit errors. + + Implements Google's recommended retry strategy: + - Exponential backoff with randomized delays + - Maximum backoff of 32-64 seconds + - Up to 5 retries + + Args: + request: A Google API request object (from sheets_service.spreadsheets().get() etc.) + operation_name: Human-readable name of the operation for logging + + Returns: + The result of the API call -# Tool filtering configuration -# Parse enabled tools from environment variable or command-line argument -def _parse_enabled_tools() -> Optional[set]: + Raises: + HttpError: If the error is not a rate limit or max retries exceeded """ - Parse enabled tools from ENABLED_TOOLS environment variable or --include-tools argument. - Returns None if all tools should be enabled (default behavior). - Returns a set of tool names if filtering is requested. + last_exception = None + start_time = time.time() + log(f"Starting: {operation_name}", 'DEBUG') + + for attempt in range(MAX_RETRIES): + try: + api_response = request.execute() + elapsed = time.time() - start_time + log(f"✓ {operation_name} ({elapsed:.2f}s)", 'DEBUG') + return api_response + except HttpError as e: + if e.resp.status == HTTP_TOO_MANY_REQUESTS: + if attempt == MAX_RETRIES - 1: + elapsed = time.time() - start_time + log(f"✗ Rate limit max retries ({MAX_RETRIES}) after {elapsed:.2f}s: {operation_name}", 'ERROR') + raise + + # Calculate exponential backoff with jitter + base_delay = min(2 ** attempt, MAX_BACKOFF) + jitter = random.uniform(0, 1) # Up to 1 second randomization + delay = base_delay + jitter + + log(f"Rate limit hit, retry {attempt + 1}/{MAX_RETRIES} after {delay:.2f}s: {operation_name}", 'WARN') + time.sleep(delay) + last_exception = e + else: + # Not a rate limit error, log and raise immediately + elapsed = time.time() - start_time + log(f"✗ HTTP {e.resp.status} after {elapsed:.2f}s: {operation_name} - {e}", 'ERROR') + raise + except Exception as e: + # Other exceptions, log and raise immediately + elapsed = time.time() - start_time + log(f"✗ {type(e).__name__} after {elapsed:.2f}s: {operation_name} - {e}", 'ERROR') + raise + + # Should not reach here, but if we do, raise the last exception + if last_exception: + raise last_exception + +def _setup_sheets_api_call(ctx: Context, sheet: str, range_spec: Optional[str] = None) -> tuple: """ - # Check command-line arguments first - enabled_tools_str = None - for i, arg in enumerate(sys.argv): - if arg == '--include-tools' and i + 1 < len(sys.argv): - enabled_tools_str = sys.argv[i + 1] - break - - # Fall back to environment variable - if not enabled_tools_str: - enabled_tools_str = os.environ.get('ENABLED_TOOLS') - - if not enabled_tools_str: - return None # No filtering, enable all tools - - # Parse comma-separated list and normalize - tools = {tool.strip() for tool in enabled_tools_str.split(',') if tool.strip()} - return tools if tools else None + Setup common parameters for Sheets API calls. + + Returns: + tuple: (sheets_service, full_range) + """ + sheets_service = ctx.request_context.lifespan_context.sheets_service + full_range = f"{sheet}!{range_spec}" if range_spec else sheet + return sheets_service, full_range + +# ============================================================================ +# A1 to R1C1 Conversion Utilities +# ============================================================================ + +def _column_letter_to_number(col: str) -> int: + """ + Convert column letter(s) to number (A=1, Z=26, AA=27, etc.). + + Args: + col: Column letter(s) in uppercase (e.g., 'A', 'Z', 'AA') + + Returns: + Column number (1-indexed) + """ + num = 0 + for char in col: + num = num * ALPHABET_SIZE + (ord(char) - ord('A') + 1) + return num + + +def _build_column_lut(max_col: int = 1000) -> dict: + """ + Build lookup table for column letters to numbers. + + Args: + max_col: Maximum column number to include in LUT + + Returns: + Dictionary mapping column letters to numbers + """ + lut = {} + for i in range(1, max_col + 1): + col_letter = "" + num = i + while num > 0: + num -= 1 + col_letter = chr(num % ALPHABET_SIZE + ord('A')) + col_letter + num //= ALPHABET_SIZE + lut[col_letter] = i + return lut + -ENABLED_TOOLS = _parse_enabled_tools() +# Pre-build column lookup table at module load (covers columns A-ALL = 1000 columns) +COLUMN_LUT = _build_column_lut(1000) + +# Pre-compile regex pattern for cell references (performance optimization) +CELL_REF_PATTERN = re.compile(r"(?:([^!]+!))?(\$)?([A-Z]+)(\$)?(\d+)") + + +def _a1_to_r1c1(formula: str, current_row: int, current_col: int) -> str: + """ + Convert formula from A1 notation to R1C1 notation. + + Args: + formula: Formula string in A1 notation (e.g., "=SUM(A1:B5)") + current_row: 1-based row number of the cell containing this formula + current_col: 1-based column number of the cell containing this formula + + Returns: + Formula in R1C1 notation (e.g., "=SUM(R[-4]C[-1]:RC[0])") + + Examples: + >>> _a1_to_r1c1('=A1', 2, 2) + '=R[-1]C[-1]' + >>> _a1_to_r1c1('=$A$1', 2, 2) + '=R1C1' + """ + def replace_cell_ref(match): + sheet_prefix = match.group(1) or '' + col_abs = match.group(2) # $ before column + col_letters = match.group(3) + row_abs = match.group(4) # $ before row + row_num = match.group(5) + + # Use LUT for column conversion (fallback to computed for ultra-wide sheets) + col_num = COLUMN_LUT.get(col_letters, _column_letter_to_number(col_letters)) + row = int(row_num) + + # Build R1C1 notation + if row_abs: + row_part = f"R{row}" + else: + offset = row - current_row + if offset == 0: + row_part = "R" + else: + row_part = f"R[{offset}]" + + if col_abs: + col_part = f"C{col_num}" + else: + offset = col_num - current_col + if offset == 0: + col_part = "C" + else: + col_part = f"C[{offset}]" + + return f"{sheet_prefix}{row_part}{col_part}" + + return CELL_REF_PATTERN.sub(replace_cell_ref, formula) @dataclass class SpreadsheetContext: @@ -75,7 +273,20 @@ async def spreadsheet_lifespan(server: FastMCP) -> AsyncIterator[SpreadsheetCont # Authenticate and build the service creds = None - if CREDENTIALS_CONFIG: + # Check for external OAuth token (Token Relay Mode) - highest priority + # This allows end-user authentication in containerized/OpenShift environments + if USER_ACCESS_TOKEN: + try: + log("Using external OAuth token (Token Relay Mode)") + # Create credentials from the provided access token + creds = Credentials(token=USER_ACCESS_TOKEN) + # Note: Token validation happens on first API call + # The caller is responsible for token refresh + except Exception as e: + log(f"Error using external access token: {e}") + creds = None + + if not creds and CREDENTIALS_CONFIG: creds = service_account.Credentials.from_service_account_info(json.loads(base64.b64decode(CREDENTIALS_CONFIG)), scopes=SCOPES) # Check for explicit service account authentication first (custom SERVICE_ACCOUNT_PATH) @@ -86,15 +297,15 @@ async def spreadsheet_lifespan(server: FastMCP) -> AsyncIterator[SpreadsheetCont SERVICE_ACCOUNT_PATH, scopes=SCOPES ) - print("Using service account authentication") - print(f"Working with Google Drive folder ID: {DRIVE_FOLDER_ID or 'Not specified'}") + log("Using service account authentication", 'INFO') + log(f"Google Drive folder ID: {DRIVE_FOLDER_ID or 'Not specified'}", 'DEBUG') except Exception as e: - print(f"Error using service account authentication: {e}") + log(f"Service account authentication error: {e}", 'WARN') creds = None - + # Fall back to OAuth flow if service account auth failed or not configured if not creds: - print("Trying OAuth authentication flow") + log("Trying OAuth authentication flow", 'DEBUG') if os.path.exists(TOKEN_PATH): with open(TOKEN_PATH, 'r') as token: creds = Credentials.from_authorized_user_info(json.load(token), SCOPES) @@ -103,15 +314,15 @@ async def spreadsheet_lifespan(server: FastMCP) -> AsyncIterator[SpreadsheetCont if not creds or not creds.valid: if creds and creds.expired and creds.refresh_token: try: - print("Attempting to refresh expired token...") + log("Refreshing expired OAuth token...", 'DEBUG') creds.refresh(Request()) - print("Token refreshed successfully") + log("Token refreshed successfully", 'INFO') # Save the refreshed token with open(TOKEN_PATH, 'w') as token: token.write(creds.to_json()) except Exception as refresh_error: - print(f"Token refresh failed: {refresh_error}") - print("Triggering reauthentication flow...") + log(f"Token refresh failed: {refresh_error}", 'WARN') + log("Triggering reauthentication flow...", 'DEBUG') creds = None # Clear creds to trigger OAuth flow below # If refresh failed or creds don't exist, run OAuth flow @@ -123,26 +334,29 @@ async def spreadsheet_lifespan(server: FastMCP) -> AsyncIterator[SpreadsheetCont # Save the credentials for the next run with open(TOKEN_PATH, 'w') as token: token.write(creds.to_json()) - print("Successfully authenticated using OAuth flow") + log("Successfully authenticated using OAuth flow", 'INFO') except Exception as e: - print(f"Error with OAuth flow: {e}") + log(f"OAuth flow error: {e}", 'ERROR') creds = None - + # Try Application Default Credentials if no creds thus far # This will automatically check GOOGLE_APPLICATION_CREDENTIALS, gcloud auth, and metadata service if not creds: try: - print("Attempting to use Application Default Credentials (ADC)") - print("ADC will check: GOOGLE_APPLICATION_CREDENTIALS, gcloud auth, and metadata service") + log("Attempting Application Default Credentials (ADC)", 'DEBUG') + log("ADC checks: GOOGLE_APPLICATION_CREDENTIALS, gcloud auth, metadata service", 'DEBUG') creds, project = google.auth.default( scopes=SCOPES ) - print(f"Successfully authenticated using ADC for project: {project}") + log(f"Successfully authenticated using ADC for project: {project}", 'INFO') except Exception as e: - print(f"Error using Application Default Credentials: {e}") + log(f"ADC authentication error: {e}", 'ERROR') raise Exception("All authentication methods failed. Please configure credentials.") - + # Build the services + # Note: static_discovery=False forces fetching fresh API discovery docs which can add latency + # on first request but ensures compatibility. For production, consider caching discovery docs. + log(f"Building Google API services (API timeout limit: {API_TIMEOUT}s)", 'INFO') sheets_service = build('sheets', 'v4', credentials=creds) drive_service = build('drive', 'v3', credentials=creds) @@ -158,51 +372,66 @@ async def spreadsheet_lifespan(server: FastMCP) -> AsyncIterator[SpreadsheetCont pass -# Initialize the MCP server with lifespan management -# Resolve host/port from environment variables with flexible names +DEFAULT_PORT = 8000 _resolved_host = os.environ.get('HOST') or os.environ.get('FASTMCP_HOST') or "0.0.0.0" -_resolved_port_str = os.environ.get('PORT') or os.environ.get('FASTMCP_PORT') or "8000" +_resolved_port_str = os.environ.get('PORT') or os.environ.get('FASTMCP_PORT') or str(DEFAULT_PORT) try: _resolved_port = int(_resolved_port_str) except ValueError: - _resolved_port = 8000 + _resolved_port = DEFAULT_PORT -# Initialize the MCP server with explicit host/port to ensure binding as configured mcp = FastMCP("Google Spreadsheet", dependencies=["google-auth", "google-auth-oauthlib", "google-api-python-client"], lifespan=spreadsheet_lifespan, host=_resolved_host, - port=_resolved_port) + port=_resolved_port, + log_level='WARNING') # Suppress "Processing request..." messages def tool(annotations: Optional[ToolAnnotations] = None): """ Conditional tool decorator that only registers tools if they're enabled. - + This wrapper checks ENABLED_TOOLS configuration and only applies the @mcp.tool decorator if the tool should be enabled. If ENABLED_TOOLS is None (default), all tools are enabled. - + Args: annotations: Optional ToolAnnotations for the tool - + Returns: Decorator function """ def decorator(func): tool_name = func.__name__ - - # If no filtering is configured, or if this tool is in the enabled list + if ENABLED_TOOLS is None or tool_name in ENABLED_TOOLS: - # Apply the mcp.tool decorator + # Wrap the function to add logging using functools.wraps to preserve signature + @functools.wraps(func) + def logged_func(*args, **kwargs): + start_time = time.time() + + # Filter out ctx parameter for cleaner logging + log_kwargs = {k: v for k, v in kwargs.items() if k != 'ctx'} + params_str = ', '.join(f'{k}={repr(v)[:50]}' for k, v in log_kwargs.items()) + log(f"→ {tool_name}({params_str})", 'INFO') + try: + tool_response = func(*args, **kwargs) + elapsed = time.time() - start_time + log(f"✓ {tool_name} ({elapsed:.2f}s)", 'INFO') + return tool_response + except Exception as e: + elapsed = time.time() - start_time + log(f"✗ {tool_name} failed ({elapsed:.2f}s): {type(e).__name__}: {e}", 'ERROR') + raise + if annotations: - return mcp.tool(annotations=annotations)(func) + return mcp.tool(annotations=annotations)(logged_func) else: - return mcp.tool()(func) + return mcp.tool()(logged_func) else: - # Don't register this tool - return the function undecorated return func - + return decorator @@ -212,58 +441,85 @@ def decorator(func): readOnlyHint=True, ), ) -def get_sheet_data(spreadsheet_id: str, - sheet: str, - range: Optional[str] = None, - include_grid_data: bool = False, - ctx: Context = None) -> Dict[str, Any]: +def get_sheet_data( + spreadsheet_id: str, + ranges: Union[str, List[str]], + ctx: Context = None +) -> Dict[str, Any]: """ - Get data from a specific sheet in a Google Spreadsheet. - + Get data from one or more ranges in a SINGLE Google Spreadsheet. + + IMPORTANT - MULTIPLE RANGES ARE BATCHED: + You can fetch multiple ranges from the SAME spreadsheet in a SINGLE API call by passing + an array instead of a string. This is 5-10x faster than making separate calls. + + WHEN TO USE SINGLE STRING: + - One range from one spreadsheet: "Sheet1!A1:B10" + + WHEN TO USE ARRAY: + - Multiple ranges from SAME spreadsheet: ["Sheet1!A1:B10", "Sheet2!C1:D20"] + - Multiple sheets from same spreadsheet: ["Sales!A:D", "Inventory!A:Z"] + - Different ranges from same sheet: ["Data!A1:A10", "Data!C1:C10"] + → All batched into ONE API call (~10 seconds total, not per range) + + MULTIPLE DIFFERENT SPREADSHEETS: + There is no single tool call that reads from multiple spreadsheets. + Call this tool separately for each spreadsheet (one call per spreadsheet): + get_sheet_data("spreadsheet-1-id", ranges) # Call 1 + get_sheet_data("spreadsheet-2-id", ranges) # Call 2 + Args: spreadsheet_id: The ID of the spreadsheet (found in the URL) - sheet: The name of the sheet - range: Optional cell range in A1 notation (e.g., 'A1:C10'). If not provided, gets all data. - include_grid_data: If True, includes cell formatting and other metadata in the response. - Note: Setting this to True will significantly increase the response size and token usage - when parsing the response, as it includes detailed cell formatting information. - Default is False (returns values only, more efficient). - + ranges: String for single range OR array of strings for multiple ranges. + All ranges must be from the same spreadsheet. + Format: "SheetName!A1:B10" or just "SheetName" for entire sheet. + Returns: - Grid data structure with either full metadata or just values from Google Sheets API, depending on include_grid_data parameter + Dictionary containing: + - spreadsheetId: The spreadsheet ID + - valueRanges: Array of results (always an array, even for single range) + - range: The A1 notation of the fetched range + - values: 2D array of cell values [[row1], [row2], ...] + + Examples: + # Single range + get_sheet_data("1abc...", "Sheet1!A1:B10") + → Returns: {valueRanges: [{range: "Sheet1!A1:B10", values: [...]}]} + + # Multiple ranges - BATCHED in one API call (recommended!) + get_sheet_data("1abc...", ["Sheet1!A1:B10", "Sheet2!C1:D20", "Sheet3!E:F"]) + → Returns: {valueRanges: [{range: "Sheet1!A1:B10", values: [...]}, ...]} + + # Entire sheets + get_sheet_data("1abc...", ["Sales", "Inventory", "Reports"]) + + SEE ALSO: get_sheet_formulas - for fetching formulas instead of values + + Performance: ~10 seconds per API call, regardless of how many ranges (1 or 100). """ sheets_service = ctx.request_context.lifespan_context.sheets_service - # Construct the range - keep original API behavior - if range: - full_range = f"{sheet}!{range}" - else: - full_range = sheet - - if include_grid_data: - # Use full API to get all grid data including formatting - result = sheets_service.spreadsheets().get( - spreadsheetId=spreadsheet_id, - ranges=[full_range], - includeGridData=True - ).execute() - else: - # Use values API to get cell values only (more efficient) - values_result = sheets_service.spreadsheets().values().get( - spreadsheetId=spreadsheet_id, - range=full_range - ).execute() - - # Format the response to match expected structure - result = { - 'spreadsheetId': spreadsheet_id, - 'valueRanges': [{ - 'range': full_range, - 'values': values_result.get('values', []) - }] - } + # Normalize to list for consistent handling + is_single_range = isinstance(ranges, str) + range_list = [ranges] if is_single_range else ranges + + log(f"Fetching {len(range_list)} range(s) from spreadsheet {spreadsheet_id}", 'DEBUG') + + # Always use batchGet for consistency - works for single or multiple ranges + request = sheets_service.spreadsheets().values().batchGet( + spreadsheetId=spreadsheet_id, + ranges=range_list + ) + + batch_response = execute_with_retry( + request, + f"get_sheet_data {spreadsheet_id} ({len(range_list)} range{'s' if len(range_list) > 1 else ''})" + ) + + log(f"✓ Successfully fetched {len(range_list)} range(s)", 'DEBUG') + + return batch_response - return result @tool( annotations=ToolAnnotations( @@ -271,39 +527,129 @@ def get_sheet_data(spreadsheet_id: str, readOnlyHint=True, ), ) -def get_sheet_formulas(spreadsheet_id: str, - sheet: str, - range: Optional[str] = None, - ctx: Context = None) -> List[List[Any]]: +def get_sheet_formulas( + spreadsheet_id: str, + ranges: Union[str, List[str]], + format: str = 'A1', + ctx: Context = None +) -> Dict[str, Any]: """ - Get formulas from a specific sheet in a Google Spreadsheet. - + Get formulas from one or more ranges in a SINGLE Google Spreadsheet. + + IMPORTANT - MULTIPLE RANGES ARE BATCHED: + You can fetch formulas from multiple ranges in the SAME spreadsheet in a SINGLE API call + by passing an array instead of a string. This is 5-10x faster than making separate calls. + + WHEN TO USE SINGLE STRING: + - One range from one spreadsheet: "Sheet1!A1:B10" + + WHEN TO USE ARRAY: + - Multiple formula columns from same spreadsheet: ["Sheet1!B:B", "Sheet2!C:C", "Sheet3!D:D"] + - Multiple sheets from same spreadsheet: ["Calculations!A:Z", "Analysis!A:Z"] + - Different ranges from same sheet: ["Data!A1:A100", "Data!C1:C100"] + → All batched into ONE API call (~10 seconds total, not per range) + + MULTIPLE DIFFERENT SPREADSHEETS: + There is no single tool call that reads formulas from multiple spreadsheets. + Call this tool separately for each spreadsheet (one call per spreadsheet): + get_sheet_formulas("spreadsheet-1-id", ranges, format) # Call 1 + get_sheet_formulas("spreadsheet-2-id", ranges, format) # Call 2 + Args: spreadsheet_id: The ID of the spreadsheet (found in the URL) - sheet: The name of the sheet - range: Optional cell range in A1 notation (e.g., 'A1:C10'). If not provided, gets all formulas from the sheet. - + ranges: String for single range OR array of strings for multiple ranges. + All ranges must be from the same spreadsheet. + Format: "SheetName!A1:B10" or just "SheetName" for entire sheet. + format: Formula notation format. Either 'A1' (default) or 'R1C1'. + - 'A1': Returns formulas like =SUM(A1:A3) + - 'R1C1': Returns formulas like =SUM(R[-2]C:RC) for pattern analysis + Returns: - A 2D array of the sheet formulas. + Dictionary containing: + - spreadsheetId: The spreadsheet ID + - valueRanges: Array of results (always an array, even for single range) + - range: The A1 notation of the fetched range + - values: 2D array of formulas [[row1], [row2], ...] + + Examples: + # Single range + get_sheet_formulas("1abc...", "Sheet1!B:B", format="A1") + → Returns formulas in A1 notation: =SUM(A1:A10) + + # Multiple ranges - BATCHED in one API call (recommended!) + get_sheet_formulas("1abc...", ["Sheet1!B:B", "Sheet2!C:C"], format="R1C1") + → Returns formulas in R1C1 notation: =SUM(R[-9]C[-1]:RC[-1]) + + # Analyze formula patterns across sheets + get_sheet_formulas("1abc...", ["Sales!D:D", "Costs!D:D", "Profit!D:D"], format="R1C1") + + SEE ALSO: get_sheet_data - for fetching cell values instead of formulas + + Performance: ~10 seconds per API call, regardless of how many ranges (1 or 100). """ + # Validate format parameter + if format not in ('A1', 'R1C1'): + raise ValueError(f"format must be 'A1' or 'R1C1', got '{format}'") + sheets_service = ctx.request_context.lifespan_context.sheets_service - - # Construct the range - if range: - full_range = f"{sheet}!{range}" - else: - full_range = sheet # Get all formulas in the specified sheet - - # Call the Sheets API - result = sheets_service.spreadsheets().values().get( + + # Normalize to list for consistent handling + is_single_range = isinstance(ranges, str) + range_list = [ranges] if is_single_range else ranges + + log(f"Fetching formulas from {len(range_list)} range(s) in spreadsheet {spreadsheet_id}", 'DEBUG') + + # Use batchGet to fetch formulas from all ranges + request = sheets_service.spreadsheets().values().batchGet( spreadsheetId=spreadsheet_id, - range=full_range, - valueRenderOption='FORMULA' # Request formulas - ).execute() - - # Get the formulas from the response - formulas = result.get('values', []) - return formulas + ranges=range_list, + valueRenderOption='FORMULA' # Request formulas instead of values + ) + + batch_response = execute_with_retry( + request, + f"get_sheet_formulas {spreadsheet_id} ({len(range_list)} range{'s' if len(range_list) > 1 else ''})" + ) + + # Convert to R1C1 if requested + if format == 'R1C1': + for value_range in batch_response.get('valueRanges', []): + full_range = value_range['range'] + formulas = value_range.get('values', []) + + # Parse the range to determine starting row/column + # Format: "Sheet1!B1:B10" or "Sheet1!B1" or "Sheet1" + range_match = re.search(r'!([A-Z]+)(\d+)', full_range) + if range_match: + start_col_letter = range_match.group(1) + start_row = int(range_match.group(2)) + start_col = COLUMN_LUT.get(start_col_letter, _column_letter_to_number(start_col_letter)) + else: + # If no range specified, assume starting at A1 + start_row = 1 + start_col = 1 + + # Convert each formula + converted_formulas = [] + for row_idx, row in enumerate(formulas): + converted_row = [] + for col_idx, cell_value in enumerate(row): + if isinstance(cell_value, str) and cell_value.startswith('='): + current_row = start_row + row_idx + current_col = start_col + col_idx + converted_formula = _a1_to_r1c1(cell_value, current_row, current_col) + converted_row.append(converted_formula) + else: + # Not a formula, keep as-is + converted_row.append(cell_value) + converted_formulas.append(converted_row) + + # Update the value range with converted formulas + value_range['values'] = converted_formulas + + log(f"✓ Successfully fetched formulas from {len(range_list)} range(s)", 'DEBUG') + + return batch_response @tool( annotations=ToolAnnotations( @@ -314,39 +660,37 @@ def get_sheet_formulas(spreadsheet_id: str, def update_cells(spreadsheet_id: str, sheet: str, range: str, - data: List[List[Any]], + cell_values: List[List[Any]], ctx: Context = None) -> Dict[str, Any]: """ Update cells in a Google Spreadsheet. - + Args: spreadsheet_id: The ID of the spreadsheet (found in the URL) sheet: The name of the sheet range: Cell range in A1 notation (e.g., 'A1:C10') - data: 2D array of values to update - + cell_values: 2D array of values to update + Returns: Result of the update operation """ - sheets_service = ctx.request_context.lifespan_context.sheets_service - - # Construct the range - full_range = f"{sheet}!{range}" - + sheets_service, full_range = _setup_sheets_api_call(ctx, sheet, range) + # Prepare the value range object value_range_body = { - 'values': data + 'values': cell_values } - + # Call the Sheets API to update values - result = sheets_service.spreadsheets().values().update( + request = sheets_service.spreadsheets().values().update( spreadsheetId=spreadsheet_id, range=full_range, valueInputOption='USER_ENTERED', body=value_range_body - ).execute() - - return result + ) + update_response = execute_with_retry(request, f"update_cells {spreadsheet_id}:{full_range}") + + return update_response @tool( @@ -372,28 +716,28 @@ def batch_update_cells(spreadsheet_id: str, Result of the batch update operation """ sheets_service = ctx.request_context.lifespan_context.sheets_service - + # Prepare the batch update request - data = [] + value_range_updates = [] for range_str, values in ranges.items(): - full_range = f"{sheet}!{range_str}" - data.append({ - 'range': full_range, + value_range_updates.append({ + 'range': f"{sheet}!{range_str}" if range_str else sheet, 'values': values }) - + batch_body = { 'valueInputOption': 'USER_ENTERED', - 'data': data + 'data': value_range_updates } - + # Call the Sheets API to perform batch update - result = sheets_service.spreadsheets().values().batchUpdate( + request = sheets_service.spreadsheets().values().batchUpdate( spreadsheetId=spreadsheet_id, body=batch_body - ).execute() - - return result + ) + batch_response = execute_with_retry(request, f"batch_update_cells {spreadsheet_id}") + + return batch_response @tool( @@ -422,17 +766,18 @@ def add_rows(spreadsheet_id: str, sheets_service = ctx.request_context.lifespan_context.sheets_service # Get sheet ID - spreadsheet = sheets_service.spreadsheets().get(spreadsheetId=spreadsheet_id).execute() + request = sheets_service.spreadsheets().get(spreadsheetId=spreadsheet_id) + spreadsheet = execute_with_retry(request, f"add_rows:get_sheet {spreadsheet_id}") sheet_id = None - + for s in spreadsheet['sheets']: if s['properties']['title'] == sheet: sheet_id = s['properties']['sheetId'] break - + if sheet_id is None: return {"error": f"Sheet '{sheet}' not found"} - + # Prepare the insert rows request request_body = { "requests": [ @@ -449,14 +794,15 @@ def add_rows(spreadsheet_id: str, } ] } - + # Execute the request - result = sheets_service.spreadsheets().batchUpdate( + request = sheets_service.spreadsheets().batchUpdate( spreadsheetId=spreadsheet_id, body=request_body - ).execute() - - return result + ) + batch_update_response = execute_with_retry(request, f"add_rows {spreadsheet_id}:{sheet}") + + return batch_update_response @tool( @@ -483,19 +829,20 @@ def add_columns(spreadsheet_id: str, Result of the operation """ sheets_service = ctx.request_context.lifespan_context.sheets_service - + # Get sheet ID - spreadsheet = sheets_service.spreadsheets().get(spreadsheetId=spreadsheet_id).execute() + request = sheets_service.spreadsheets().get(spreadsheetId=spreadsheet_id) + spreadsheet = execute_with_retry(request, f"add_columns:get_sheet {spreadsheet_id}") sheet_id = None - + for s in spreadsheet['sheets']: if s['properties']['title'] == sheet: sheet_id = s['properties']['sheetId'] break - + if sheet_id is None: return {"error": f"Sheet '{sheet}' not found"} - + # Prepare the insert columns request request_body = { "requests": [ @@ -512,14 +859,15 @@ def add_columns(spreadsheet_id: str, } ] } - + # Execute the request - result = sheets_service.spreadsheets().batchUpdate( + request = sheets_service.spreadsheets().batchUpdate( spreadsheetId=spreadsheet_id, body=request_body - ).execute() - - return result + ) + batch_update_response = execute_with_retry(request, f"add_columns {spreadsheet_id}:{sheet}") + + return batch_update_response @tool( @@ -539,13 +887,14 @@ def list_sheets(spreadsheet_id: str, ctx: Context = None) -> List[str]: List of sheet names """ sheets_service = ctx.request_context.lifespan_context.sheets_service - + # Get spreadsheet metadata - spreadsheet = sheets_service.spreadsheets().get(spreadsheetId=spreadsheet_id).execute() - + request = sheets_service.spreadsheets().get(spreadsheetId=spreadsheet_id) + spreadsheet = execute_with_retry(request, f"list_sheets {spreadsheet_id}") + # Extract sheet names sheet_names = [sheet['properties']['title'] for sheet in spreadsheet['sheets']] - + return sheet_names @@ -573,33 +922,35 @@ def copy_sheet(src_spreadsheet: str, Result of the operation """ sheets_service = ctx.request_context.lifespan_context.sheets_service - + # Get source sheet ID - src = sheets_service.spreadsheets().get(spreadsheetId=src_spreadsheet).execute() + request = sheets_service.spreadsheets().get(spreadsheetId=src_spreadsheet) + src = execute_with_retry(request, f"copy_sheet:get_source {src_spreadsheet}") src_sheet_id = None - + for s in src['sheets']: if s['properties']['title'] == src_sheet: src_sheet_id = s['properties']['sheetId'] break - + if src_sheet_id is None: return {"error": f"Source sheet '{src_sheet}' not found"} - + # Copy the sheet to destination spreadsheet - copy_result = sheets_service.spreadsheets().sheets().copyTo( + request = sheets_service.spreadsheets().sheets().copyTo( spreadsheetId=src_spreadsheet, sheetId=src_sheet_id, body={ "destinationSpreadsheetId": dst_spreadsheet } - ).execute() - + ) + copy_result = execute_with_retry(request, f"copy_sheet {src_spreadsheet}:{src_sheet} -> {dst_spreadsheet}") + # If destination sheet name is different from the default copied name, rename it if 'title' in copy_result and copy_result['title'] != dst_sheet: # Get the ID of the newly copied sheet copy_sheet_id = copy_result['sheetId'] - + # Rename the copied sheet rename_request = { "requests": [ @@ -614,11 +965,12 @@ def copy_sheet(src_spreadsheet: str, } ] } - - rename_result = sheets_service.spreadsheets().batchUpdate( + + request = sheets_service.spreadsheets().batchUpdate( spreadsheetId=dst_spreadsheet, body=rename_request - ).execute() + ) + rename_result = execute_with_retry(request, f"copy_sheet:rename {dst_spreadsheet}:{dst_sheet}") return { "copy": copy_result, @@ -650,19 +1002,20 @@ def rename_sheet(spreadsheet: str, Result of the operation """ sheets_service = ctx.request_context.lifespan_context.sheets_service - + # Get sheet ID - spreadsheet_data = sheets_service.spreadsheets().get(spreadsheetId=spreadsheet).execute() + request = sheets_service.spreadsheets().get(spreadsheetId=spreadsheet) + spreadsheet_data = execute_with_retry(request, f"rename_sheet:get_sheet {spreadsheet}") sheet_id = None - + for s in spreadsheet_data['sheets']: if s['properties']['title'] == sheet: sheet_id = s['properties']['sheetId'] break - + if sheet_id is None: return {"error": f"Sheet '{sheet}' not found"} - + # Prepare the rename request request_body = { "requests": [ @@ -677,67 +1030,15 @@ def rename_sheet(spreadsheet: str, } ] } - + # Execute the request - result = sheets_service.spreadsheets().batchUpdate( + request = sheets_service.spreadsheets().batchUpdate( spreadsheetId=spreadsheet, body=request_body - ).execute() - - return result - - -@tool( - annotations=ToolAnnotations( - title="Get Multiple Sheet Data", - readOnlyHint=True, - ), -) -def get_multiple_sheet_data(queries: List[Dict[str, str]], - ctx: Context = None) -> List[Dict[str, Any]]: - """ - Get data from multiple specific ranges in Google Spreadsheets. - - Args: - queries: A list of dictionaries, each specifying a query. - Each dictionary should have 'spreadsheet_id', 'sheet', and 'range' keys. - Example: [{'spreadsheet_id': 'abc', 'sheet': 'Sheet1', 'range': 'A1:B5'}, - {'spreadsheet_id': 'xyz', 'sheet': 'Data', 'range': 'C1:C10'}] - - Returns: - A list of dictionaries, each containing the original query parameters - and the fetched 'data' or an 'error'. - """ - sheets_service = ctx.request_context.lifespan_context.sheets_service - results = [] - - for query in queries: - spreadsheet_id = query.get('spreadsheet_id') - sheet = query.get('sheet') - range_str = query.get('range') - - if not all([spreadsheet_id, sheet, range_str]): - results.append({**query, 'error': 'Missing required keys (spreadsheet_id, sheet, range)'}) - continue - - try: - # Construct the range - full_range = f"{sheet}!{range_str}" - - # Call the Sheets API - result = sheets_service.spreadsheets().values().get( - spreadsheetId=spreadsheet_id, - range=full_range - ).execute() - - # Get the values from the response - values = result.get('values', []) - results.append({**query, 'data': values}) + ) + rename_response = execute_with_retry(request, f"rename_sheet {spreadsheet}:{sheet}->{new_name}") - except Exception as e: - results.append({**query, 'error': str(e)}) - - return results + return rename_response @tool( @@ -773,13 +1074,14 @@ def get_multiple_spreadsheet_summary(spreadsheet_ids: List[str], } try: # Get spreadsheet metadata - spreadsheet = sheets_service.spreadsheets().get( + request = sheets_service.spreadsheets().get( spreadsheetId=spreadsheet_id, fields='properties.title,sheets(properties(title,sheetId))' - ).execute() - + ) + spreadsheet = execute_with_retry(request, f"get_multiple_summary:get_metadata {spreadsheet_id}") + summary_data['title'] = spreadsheet.get('properties', {}).get('title', 'Unknown Title') - + sheet_summaries = [] for sheet in spreadsheet.get('sheets', []): sheet_title = sheet.get('properties', {}).get('title') @@ -791,24 +1093,25 @@ def get_multiple_spreadsheet_summary(spreadsheet_ids: List[str], 'first_rows': [], 'error': None } - + if not sheet_title: sheet_summary['error'] = 'Sheet title not found' sheet_summaries.append(sheet_summary) continue - + try: # Fetch the first few rows (e.g., A1:Z5) # Adjust range if fewer rows are requested max_row = max(1, rows_to_fetch) # Ensure at least 1 row is fetched range_to_get = f"{sheet_title}!A1:{max_row}" # Fetch all columns up to max_row - - result = sheets_service.spreadsheets().values().get( + + request = sheets_service.spreadsheets().values().get( spreadsheetId=spreadsheet_id, range=range_to_get - ).execute() - - values = result.get('values', []) + ) + summary_response = execute_with_retry(request, f"get_multiple_summary:get_rows {spreadsheet_id}:{sheet_title}") + + values = summary_response.get('values', []) if values: sheet_summary['headers'] = values[0] @@ -848,9 +1151,10 @@ def get_spreadsheet_info(spreadsheet_id: str) -> str: # Access the context through mcp.get_lifespan_context() for resources context = mcp.get_lifespan_context() sheets_service = context.sheets_service - + # Get spreadsheet metadata - spreadsheet = sheets_service.spreadsheets().get(spreadsheetId=spreadsheet_id).execute() + request = sheets_service.spreadsheets().get(spreadsheetId=spreadsheet_id) + spreadsheet = execute_with_retry(request, f"get_spreadsheet_info {spreadsheet_id}") # Extract relevant information info = { @@ -897,17 +1201,18 @@ def create_spreadsheet(title: str, folder_id: Optional[str] = None, ctx: Context } if target_folder_id: file_body['parents'] = [target_folder_id] - - spreadsheet = drive_service.files().create( + + request = drive_service.files().create( supportsAllDrives=True, body=file_body, fields='id, name, parents' - ).execute() + ) + spreadsheet = execute_with_retry(request, f"create_spreadsheet {title}") spreadsheet_id = spreadsheet.get('id') parents = spreadsheet.get('parents') folder_info = f" in folder {target_folder_id}" if target_folder_id else " in root" - print(f"Spreadsheet created with ID: {spreadsheet_id}{folder_info}") + log(f"Spreadsheet created with ID: {spreadsheet_id}{folder_info}", 'INFO') return { 'spreadsheetId': spreadsheet_id, @@ -951,14 +1256,15 @@ def create_sheet(spreadsheet_id: str, } # Execute the request - result = sheets_service.spreadsheets().batchUpdate( + request = sheets_service.spreadsheets().batchUpdate( spreadsheetId=spreadsheet_id, body=request_body - ).execute() - + ) + create_sheet_response = execute_with_retry(request, f"create_sheet {spreadsheet_id}:{title}") + # Extract the new sheet information - new_sheet_props = result['replies'][0]['addSheet']['properties'] - + new_sheet_props = create_sheet_response['replies'][0]['addSheet']['properties'] + return { 'sheetId': new_sheet_props['sheetId'], 'title': new_sheet_props['title'], @@ -994,19 +1300,20 @@ def list_spreadsheets(folder_id: Optional[str] = None, ctx: Context = None) -> L # If a specific folder is provided or configured, search only in that folder if target_folder_id: query += f" and '{target_folder_id}' in parents" - print(f"Searching for spreadsheets in folder: {target_folder_id}") + log(f"Searching for spreadsheets in folder: {target_folder_id}", 'DEBUG') else: - print("Searching for spreadsheets in 'My Drive'") + log("Searching for spreadsheets in 'My Drive'", 'DEBUG') # List spreadsheets - results = drive_service.files().list( + request = drive_service.files().list( q=query, spaces='drive', includeItemsFromAllDrives=True, supportsAllDrives=True, fields='files(id, name)', orderBy='modifiedTime desc' - ).execute() + ) + results = execute_with_retry(request, f"list_spreadsheets folder:{target_folder_id or 'My Drive'}") spreadsheets = results.get('files', []) @@ -1069,26 +1376,24 @@ def share_spreadsheet(spreadsheet_id: str, } try: - result = drive_service.permissions().create( + request = drive_service.permissions().create( fileId=spreadsheet_id, body=permission, sendNotificationEmail=send_notification, fields='id' - ).execute() + ) + permission_response = execute_with_retry(request, f"share_spreadsheet {spreadsheet_id} with {email_address}") successes.append({ - 'email_address': email_address, - 'role': role, - 'permissionId': result.get('id') + 'email_address': email_address, + 'role': role, + 'permissionId': permission_response.get('id') }) except Exception as e: - # Try to provide a more informative error message error_details = str(e) if hasattr(e, 'content'): - try: - error_content = json.loads(e.content) - error_details = error_content.get('error', {}).get('message', error_details) - except json.JSONDecodeError: - pass # Keep the original error string + error_content_json = json.loads(e.content) if isinstance(e.content, (str, bytes)) else None + if error_content_json: + error_details = error_content_json.get('error', {}).get('message', error_details) failures.append({ 'email_address': email_address, 'error': f"Failed to share: {error_details}" @@ -1122,21 +1427,22 @@ def list_folders(parent_folder_id: Optional[str] = None, ctx: Context = None) -> # If a specific parent folder is provided, search only within that folder if parent_folder_id: query += f" and '{parent_folder_id}' in parents" - print(f"Searching for folders in parent folder: {parent_folder_id}") + log(f"Searching for folders in parent folder: {parent_folder_id}", 'DEBUG') else: # Search in root of My Drive (folders that don't have any parent folders) query += " and 'root' in parents" - print("Searching for folders in 'My Drive' root") + log("Searching for folders in 'My Drive' root", 'DEBUG') # List folders - results = drive_service.files().list( + request = drive_service.files().list( q=query, spaces='drive', includeItemsFromAllDrives=True, supportsAllDrives=True, fields='files(id, name, parents)', orderBy='name' - ).execute() + ) + results = execute_with_retry(request, f"list_folders parent:{parent_folder_id or 'root'}") folders = results.get('files', []) @@ -1184,41 +1490,40 @@ def search_spreadsheets(query: str, f"(name contains '{query}' or fullText contains '{query}')" ) - try: - results = drive_service.files().list( - q=search_query, - pageSize=max_results, - spaces='drive', - includeItemsFromAllDrives=True, - supportsAllDrives=True, - fields='files(id, name, createdTime, modifiedTime, owners, webViewLink)', - orderBy='modifiedTime desc' - ).execute() - - files = results.get('files', []) - - return [ - { - 'id': f['id'], - 'name': f['name'], - 'created_time': f.get('createdTime'), - 'modified_time': f.get('modifiedTime'), - 'owners': [owner.get('emailAddress') for owner in f.get('owners', [])], - 'web_link': f.get('webViewLink') - } - for f in files - ] - except Exception as e: - return [{'error': f'Search failed: {str(e)}'}] + request = drive_service.files().list( + q=search_query, + pageSize=max_results, + spaces='drive', + includeItemsFromAllDrives=True, + supportsAllDrives=True, + fields='files(id, name, createdTime, modifiedTime, owners, webViewLink)', + orderBy='modifiedTime desc' + ) + search_results = execute_with_retry(request, f"search_spreadsheets query:{query}") + + files = search_results.get('files', []) + + return [ + { + 'id': f['id'], + 'name': f['name'], + 'created_time': f.get('createdTime'), + 'modified_time': f.get('modifiedTime'), + 'owners': [owner.get('emailAddress') for owner in f.get('owners', [])], + 'web_link': f.get('webViewLink') + } + for f in files + ] def _column_index_to_letter(index: int) -> str: """Convert 0-based column index to A1 notation letter (0='A', 25='Z', 26='AA', etc.)""" - result = "" + ALPHABET_SIZE = 26 + column_letter = "" while index >= 0: - result = chr(index % 26 + ord('A')) + result - index = index // 26 - 1 - return result + column_letter = chr(index % ALPHABET_SIZE + ord('A')) + column_letter + index = index // ALPHABET_SIZE - 1 + return column_letter @tool( @@ -1249,59 +1554,55 @@ def find_in_spreadsheet(spreadsheet_id: str, sheets_service = ctx.request_context.lifespan_context.sheets_service results = [] - try: - # Get spreadsheet metadata to find all sheets - spreadsheet = sheets_service.spreadsheets().get( - spreadsheetId=spreadsheet_id, - fields='sheets(properties(title,sheetId))' - ).execute() + request = sheets_service.spreadsheets().get( + spreadsheetId=spreadsheet_id, + fields='sheets(properties(title,sheetId))' + ) + spreadsheet = execute_with_retry(request, f"find_in_spreadsheet:get_sheets {spreadsheet_id}") - sheets_to_search = [] - for s in spreadsheet.get('sheets', []): - sheet_title = s.get('properties', {}).get('title') - if sheet is None or sheet_title == sheet: - sheets_to_search.append(sheet_title) + sheets_to_search = [] + for s in spreadsheet.get('sheets', []): + sheet_title = s.get('properties', {}).get('title') + if sheet is None or sheet_title == sheet: + sheets_to_search.append(sheet_title) - if not sheets_to_search: - return [{'error': f"Sheet '{sheet}' not found"}] + if not sheets_to_search: + return [{'error': f"Sheet '{sheet}' not found"}] - search_query = query if case_sensitive else query.lower() + search_query = query if case_sensitive else query.lower() - for sheet_name in sheets_to_search: - if len(results) >= max_results: - break + for sheet_name in sheets_to_search: + if len(results) >= max_results: + break - # Get all data from the sheet - response = sheets_service.spreadsheets().values().get( - spreadsheetId=spreadsheet_id, - range=sheet_name - ).execute() + request = sheets_service.spreadsheets().values().get( + spreadsheetId=spreadsheet_id, + range=sheet_name + ) + response = execute_with_retry(request, f"find_in_spreadsheet:search {spreadsheet_id}:{sheet_name}") + + values = response.get('values', []) - values = response.get('values', []) + for row_idx, row in enumerate(values): + if len(results) >= max_results: + break - for row_idx, row in enumerate(values): + for col_idx, cell_value in enumerate(row): if len(results) >= max_results: break - for col_idx, cell_value in enumerate(row): - if len(results) >= max_results: - break - - cell_str = str(cell_value) - compare_value = cell_str if case_sensitive else cell_str.lower() - - if search_query in compare_value: - cell_ref = f"{_column_index_to_letter(col_idx)}{row_idx + 1}" - results.append({ - 'sheet': sheet_name, - 'cell': cell_ref, - 'value': cell_value - }) + cell_str = str(cell_value) + compare_value = cell_str if case_sensitive else cell_str.lower() - return results + if search_query in compare_value: + cell_ref = f"{_column_index_to_letter(col_idx)}{row_idx + 1}" + results.append({ + 'sheet': sheet_name, + 'cell': cell_ref, + 'value': cell_value + }) - except Exception as e: - return [{'error': f'Search failed: {str(e)}'}] + return results @tool( @@ -1379,23 +1680,39 @@ def batch_update(spreadsheet_id: str, request_body = { "requests": requests } - + # Execute the batch update - result = sheets_service.spreadsheets().batchUpdate( + request = sheets_service.spreadsheets().batchUpdate( spreadsheetId=spreadsheet_id, body=request_body - ).execute() - - return result + ) + batch_update_result = execute_with_retry(request, f"batch_update {spreadsheet_id}") + + return batch_update_result def main(): + log("=" * 60, 'INFO') + log("MCP Google Sheets Server Starting", 'INFO') + log("=" * 60, 'INFO') + log(f"Python: {sys.version.split()[0]}", 'INFO') + log(f"Working directory: {os.getcwd()}", 'INFO') + + # Show log level + level_name = [k for k, v in LOG_LEVELS.items() if v == _LOG_LEVEL][0] + log(f"Log level: {level_name} (set LOG_LEVEL env var to change: DEBUG/INFO/WARN/ERROR)", 'INFO') + # Log tool filtering configuration if enabled if ENABLED_TOOLS is not None: - print(f"Tool filtering enabled. Active tools: {', '.join(sorted(ENABLED_TOOLS))}") + log(f"Tool filtering: ENABLED ({len(ENABLED_TOOLS)} tools)", 'INFO') + log(f" Active tools: {', '.join(sorted(ENABLED_TOOLS))}", 'INFO') else: - print("Tool filtering disabled. All tools are enabled.") - + log("Tool filtering: DISABLED (all tools enabled)", 'INFO') + + log(f"API timeout limit: {API_TIMEOUT}s", 'INFO') + log(f"Rate limit retries: {MAX_RETRIES} (max backoff: {MAX_BACKOFF}s)", 'INFO') + log("=" * 60, 'INFO') + # Run the server transport = "stdio" for i, arg in enumerate(sys.argv): @@ -1403,4 +1720,5 @@ def main(): transport = sys.argv[i + 1] break + log(f"Starting MCP server with transport: {transport}", 'INFO') mcp.run(transport=transport)