Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 37 additions & 23 deletions src/mcp_google_sheets/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"""

import base64
import logging
import os
import sys
from typing import List, Dict, Any, Optional, Union
Expand All @@ -25,6 +26,8 @@
from googleapiclient.discovery import build
import google.auth

logger = logging.getLogger(__name__)

# Constants
SCOPES = ['https://www.googleapis.com/auth/spreadsheets', 'https://www.googleapis.com/auth/drive']
CREDENTIALS_CONFIG = os.environ.get('CREDENTIALS_CONFIG')
Expand All @@ -33,6 +36,15 @@
SERVICE_ACCOUNT_PATH = os.environ.get('SERVICE_ACCOUNT_PATH', 'service_account.json')
DRIVE_FOLDER_ID = os.environ.get('DRIVE_FOLDER_ID', '') # Working directory in Google Drive


def _configure_logging() -> None:
"""Configure CLI logging without overriding host application logging."""
level_name = os.environ.get("LOG_LEVEL") or ("DEBUG" if os.environ.get("DEBUG") else "INFO")
level = getattr(logging, level_name.upper(), logging.INFO)
if not logging.getLogger().handlers:
logging.basicConfig(level=level, stream=sys.stderr, format="%(message)s")
logger.setLevel(level)

# Tool filtering configuration
# Parse enabled tools from environment variable or command-line argument
def _parse_enabled_tools() -> Optional[set]:
Expand Down Expand Up @@ -86,15 +98,15 @@ async def spreadsheet_lifespan(server: FastMCP) -> AsyncIterator[SpreadsheetCont
SERVICE_ACCOUNT_PATH,
scopes=SCOPES
)
print("Using service account authentication")
print(f"Working with Google Drive folder ID: {DRIVE_FOLDER_ID or 'Not specified'}")
logger.info("Using service account authentication")
logger.info("Working with Google Drive folder ID: %s", DRIVE_FOLDER_ID or "Not specified")
except Exception as e:
print(f"Error using service account authentication: {e}")
logger.error("Error using service account authentication: %s", e)
creds = None

# Fall back to OAuth flow if service account auth failed or not configured
if not creds:
print("Trying OAuth authentication flow")
logger.info("Trying OAuth authentication flow")
if os.path.exists(TOKEN_PATH):
with open(TOKEN_PATH, 'r') as token:
creds = Credentials.from_authorized_user_info(json.load(token), SCOPES)
Expand All @@ -103,15 +115,15 @@ async def spreadsheet_lifespan(server: FastMCP) -> AsyncIterator[SpreadsheetCont
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
try:
print("Attempting to refresh expired token...")
logger.info("Attempting to refresh expired token...")
creds.refresh(Request())
print("Token refreshed successfully")
logger.info("Token refreshed successfully")
# Save the refreshed token
with open(TOKEN_PATH, 'w') as token:
token.write(creds.to_json())
except Exception as refresh_error:
print(f"Token refresh failed: {refresh_error}")
print("Triggering reauthentication flow...")
logger.error("Token refresh failed: %s", refresh_error)
logger.info("Triggering reauthentication flow...")
creds = None # Clear creds to trigger OAuth flow below

# If refresh failed or creds don't exist, run OAuth flow
Expand All @@ -123,28 +135,28 @@ async def spreadsheet_lifespan(server: FastMCP) -> AsyncIterator[SpreadsheetCont
# Save the credentials for the next run
with open(TOKEN_PATH, 'w') as token:
token.write(creds.to_json())
print("Successfully authenticated using OAuth flow")
logger.info("Successfully authenticated using OAuth flow")
except Exception as e:
print(f"Error with OAuth flow: {e}")
logger.error("Error with OAuth flow: %s", e)
creds = None

# Try Application Default Credentials if no creds thus far
# This will automatically check GOOGLE_APPLICATION_CREDENTIALS, gcloud auth, and metadata service
if not creds:
try:
print("Attempting to use Application Default Credentials (ADC)")
print("ADC will check: GOOGLE_APPLICATION_CREDENTIALS, gcloud auth, and metadata service")
logger.info("Attempting to use Application Default Credentials (ADC)")
logger.info("ADC will check: GOOGLE_APPLICATION_CREDENTIALS, gcloud auth, and metadata service")
creds, project = google.auth.default(
scopes=SCOPES
)
print(f"Successfully authenticated using ADC for project: {project}")
logger.info("Successfully authenticated using ADC for project: %s", project)
except Exception as e:
print(f"Error using Application Default Credentials: {e}")
logger.error("Error using Application Default Credentials: %s", e)
raise Exception("All authentication methods failed. Please configure credentials.")

# Build the services
sheets_service = build('sheets', 'v4', credentials=creds)
drive_service = build('drive', 'v3', credentials=creds)
sheets_service = build('sheets', 'v4', credentials=creds, cache_discovery=False)
drive_service = build('drive', 'v3', credentials=creds, cache_discovery=False)

try:
# Provide the service in the context
Expand Down Expand Up @@ -907,7 +919,7 @@ def create_spreadsheet(title: str, folder_id: Optional[str] = None, ctx: Context
spreadsheet_id = spreadsheet.get('id')
parents = spreadsheet.get('parents')
folder_info = f" in folder {target_folder_id}" if target_folder_id else " in root"
print(f"Spreadsheet created with ID: {spreadsheet_id}{folder_info}")
logger.info("Spreadsheet created with ID: %s%s", spreadsheet_id, folder_info)

return {
'spreadsheetId': spreadsheet_id,
Expand Down Expand Up @@ -994,9 +1006,9 @@ def list_spreadsheets(folder_id: Optional[str] = None, ctx: Context = None) -> L
# If a specific folder is provided or configured, search only in that folder
if target_folder_id:
query += f" and '{target_folder_id}' in parents"
print(f"Searching for spreadsheets in folder: {target_folder_id}")
logger.info("Searching for spreadsheets in folder: %s", target_folder_id)
else:
print("Searching for spreadsheets in 'My Drive'")
logger.info("Searching for spreadsheets in 'My Drive'")

# List spreadsheets
results = drive_service.files().list(
Expand Down Expand Up @@ -1122,11 +1134,11 @@ def list_folders(parent_folder_id: Optional[str] = None, ctx: Context = None) ->
# If a specific parent folder is provided, search only within that folder
if parent_folder_id:
query += f" and '{parent_folder_id}' in parents"
print(f"Searching for folders in parent folder: {parent_folder_id}")
logger.info("Searching for folders in parent folder: %s", parent_folder_id)
else:
# Search in root of My Drive (folders that don't have any parent folders)
query += " and 'root' in parents"
print("Searching for folders in 'My Drive' root")
logger.info("Searching for folders in 'My Drive' root")

# List folders
results = drive_service.files().list(
Expand Down Expand Up @@ -1716,11 +1728,13 @@ def add_chart(spreadsheet_id: str,


def main():
_configure_logging()

# Log tool filtering configuration if enabled
if ENABLED_TOOLS is not None:
print(f"Tool filtering enabled. Active tools: {', '.join(sorted(ENABLED_TOOLS))}")
logger.info("Tool filtering enabled. Active tools: %s", ', '.join(sorted(ENABLED_TOOLS)))
else:
print("Tool filtering disabled. All tools are enabled.")
logger.info("Tool filtering disabled. All tools are enabled.")

# Run the server
transport = "stdio"
Expand Down
41 changes: 40 additions & 1 deletion tests/test_server_unit.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import ast
import os
import sys
import unittest
Expand Down Expand Up @@ -148,6 +149,44 @@ def test_empty_configuration_enables_all_tools(self):
self.assertIsNone(server._parse_enabled_tools())


class StdioSafetyTests(unittest.TestCase):
def test_server_module_does_not_call_print(self):
source_path = os.path.abspath(server.__file__)
with open(source_path, "r", encoding="utf-8") as source_file:
tree = ast.parse(source_file.read(), filename=source_path)

print_calls = [
node.lineno
for node in ast.walk(tree)
if isinstance(node, ast.Call)
and isinstance(node.func, ast.Name)
and node.func.id == "print"
]

self.assertEqual(print_calls, [])

def test_main_writes_no_diagnostics_to_stdout(self):
with patch.object(server.mcp, "run") as run:
with patch.object(server, "_configure_logging"):
with patch.object(server.logger, "info"):
with patch.object(sys, "argv", ["mcp-google-sheets"]):
with redirect_stdout(StringIO()) as stdout:
server.main()

self.assertEqual(stdout.getvalue(), "")
run.assert_called_once_with(transport="stdio")

def test_main_configures_logging(self):
with patch.object(server.mcp, "run") as run:
with patch.object(server, "_configure_logging") as configure_logging:
with patch.object(server.logger, "info"):
with patch.object(sys, "argv", ["mcp-google-sheets"]):
server.main()

configure_logging.assert_called_once_with()
run.assert_called_once_with(transport="stdio")


class A1HelperTests(unittest.TestCase):
def test_column_index_to_letter(self):
self.assertEqual(server._column_index_to_letter(0), "A")
Expand Down Expand Up @@ -343,7 +382,7 @@ def test_add_rows_returns_error_for_missing_sheet(self):
def test_create_spreadsheet_targets_requested_folder(self):
drive_service = RecordingDriveService()

with redirect_stdout(StringIO()):
with patch.object(server.logger, "info"):
result = server.create_spreadsheet(
"Created Sheet",
folder_id="folder-id",
Expand Down
Loading