22import json
33import logging
44import os
5+ import shutil
56import sys
67import tempfile
78import uuid
8- from pathlib import Path
99from typing import Annotated , Any
1010
1111import typer
12+ from pathvalidate import sanitize_filepath
1213from pydantic import Field
1314
1415from cycode .cli .cli_types import McpTransportOption , ScanTypeOption
2627
2728_logger = get_logger ('Cycode MCP' )
2829
29- _DEFAULT_RUN_COMMAND_TIMEOUT = 5 * 60
30+ _DEFAULT_RUN_COMMAND_TIMEOUT = 10 * 60
3031
3132_FILES_TOOL_FIELD = Field (description = 'Files to scan, mapping file paths to their content' )
3233
@@ -91,48 +92,76 @@ async def _run_cycode_command(*args: str, timeout: int = _DEFAULT_RUN_COMMAND_TI
9192 return {'error' : f'Failed to run command: { e !s} ' }
9293
9394
94- def _create_temp_files (files_content : dict [str , str ]) -> list [str ]:
95- """Create temporary files from content and return their paths."""
96- temp_dir = tempfile .mkdtemp (prefix = 'cycode_mcp_' )
97- temp_files = []
95+ def _sanitize_file_path (file_path : str ) -> str :
96+ """Sanitize file path to prevent path traversal and other security issues.
9897
99- _logger .debug ('Creating temporary files in directory: %s' , temp_dir )
98+ Args:
99+ file_path: The file path to sanitize
100100
101- for file_path , content in files_content .items ():
102- safe_filename = f'{ _gen_random_id ()} _{ Path (file_path ).name } '
103- temp_file_path = os .path .join (temp_dir , safe_filename )
101+ Returns:
102+ Sanitized file path safe for use in temporary directory
104103
105- os .makedirs (os .path .dirname (temp_file_path ), exist_ok = True )
104+ Raises:
105+ ValueError: If the path is invalid or potentially dangerous
106+ """
107+ if not file_path or not isinstance (file_path , str ):
108+ raise ValueError ('File path must be a non-empty string' )
106109
107- _logger .debug ('Creating temp file: %s' , temp_file_path )
108- with open (temp_file_path , 'w' , encoding = 'UTF-8' ) as f :
109- f .write (content )
110+ return sanitize_filepath (file_path , platform = 'auto' , validate_after_sanitize = True )
110111
111- temp_files .append (temp_file_path )
112112
113- return temp_files
113+ class _TempFilesManager :
114+ """Context manager for creating and cleaning up temporary files.
114115
116+ Creates a temporary directory structure that preserves original file paths
117+ inside a call_id as a suffix. Automatically cleans up all files and directories
118+ when exiting the context.
119+ """
115120
116- def _cleanup_temp_files (temp_files : list [str ]) -> None :
117- """Clean up temporary files and directories."""
121+ def __init__ (self , files_content : dict [str , str ], call_id : str ) -> None :
122+ self .files_content = files_content
123+ self .call_id = call_id
124+ self .temp_base_dir = None
125+ self .temp_files = []
118126
119- temp_dirs = set ()
120- for temp_file in temp_files :
121- try :
122- if os .path .exists (temp_file ):
123- _logger .debug ('Removing temp file: %s' , temp_file )
124- os .remove (temp_file )
125- temp_dirs .add (os .path .dirname (temp_file ))
126- except OSError as e :
127- _logger .warning ('Failed to remove temp file %s: %s' , temp_file , e )
128-
129- for temp_dir in temp_dirs :
130- try :
131- if os .path .exists (temp_dir ) and not os .listdir (temp_dir ):
132- _logger .debug ('Removing temp directory: %s' , temp_dir )
133- os .rmdir (temp_dir )
134- except OSError as e :
135- _logger .warning ('Failed to remove temp directory %s: %s' , temp_dir , e )
127+ def __enter__ (self ) -> list [str ]:
128+ self .temp_base_dir = tempfile .mkdtemp (prefix = 'cycode_mcp_' , suffix = self .call_id )
129+ _logger .debug ('Creating temporary files in directory: %s' , self .temp_base_dir )
130+
131+ for file_path , content in self .files_content .items ():
132+ try :
133+ sanitized_path = _sanitize_file_path (file_path )
134+ temp_file_path = os .path .join (self .temp_base_dir , sanitized_path )
135+
136+ # Ensure the normalized path is still within our temp directory
137+ normalized_temp_path = os .path .normpath (temp_file_path )
138+ normalized_base_path = os .path .normpath (self .temp_base_dir )
139+ if not normalized_temp_path .startswith (normalized_base_path + os .sep ):
140+ raise ValueError (f'Path escapes temporary directory: { file_path } ' )
141+
142+ os .makedirs (os .path .dirname (temp_file_path ), exist_ok = True )
143+
144+ _logger .debug ('Creating temp file: %s (from: %s)' , temp_file_path , file_path )
145+ with open (temp_file_path , 'w' , encoding = 'UTF-8' ) as f :
146+ f .write (content )
147+
148+ self .temp_files .append (temp_file_path )
149+ except ValueError as e :
150+ _logger .error ('Invalid file path rejected: %s - %s' , file_path , str (e ))
151+ continue
152+ except Exception as e :
153+ _logger .error ('Failed to create temp file for %s: %s' , file_path , str (e ))
154+ continue
155+
156+ if not self .temp_files :
157+ raise ValueError ('No valid files provided after sanitization' )
158+
159+ return self .temp_files
160+
161+ def __exit__ (self , * _ ) -> None :
162+ if self .temp_base_dir and os .path .exists (self .temp_base_dir ):
163+ _logger .debug ('Removing temp directory recursively: %s' , self .temp_base_dir )
164+ shutil .rmtree (self .temp_base_dir , ignore_errors = True )
136165
137166
138167async def _run_cycode_scan (scan_type : ScanTypeOption , temp_files : list [str ]) -> dict [str , Any ]:
@@ -153,21 +182,36 @@ async def _cycode_scan_tool(scan_type: ScanTypeOption, files: dict[str, str] = _
153182 _logger .error ('No files provided for scan' )
154183 return json .dumps ({'error' : 'No files provided' })
155184
156- temp_files = _create_temp_files (files )
157-
158185 try :
159- _logger .info (
160- 'Running Cycode scan, %s' ,
161- {'scan_type' : scan_type , 'files_count' : len (temp_files ), 'call_id' : _tool_call_id },
162- )
163- result = await _run_cycode_scan (scan_type , temp_files )
164- _logger .info ('Scan completed, %s' , {'scan_type' : scan_type , 'call_id' : _tool_call_id })
165- return json .dumps (result , indent = 2 )
186+ with _TempFilesManager (files , _tool_call_id ) as temp_files :
187+ original_count = len (files )
188+ processed_count = len (temp_files )
189+
190+ if processed_count < original_count :
191+ _logger .warning (
192+ 'Some files were rejected during sanitization, %s' ,
193+ {
194+ 'scan_type' : scan_type ,
195+ 'original_count' : original_count ,
196+ 'processed_count' : processed_count ,
197+ 'call_id' : _tool_call_id ,
198+ },
199+ )
200+
201+ _logger .info (
202+ 'Running Cycode scan, %s' ,
203+ {'scan_type' : scan_type , 'files_count' : processed_count , 'call_id' : _tool_call_id },
204+ )
205+ result = await _run_cycode_scan (scan_type , temp_files )
206+
207+ _logger .info ('Scan completed, %s' , {'scan_type' : scan_type , 'call_id' : _tool_call_id })
208+ return json .dumps (result , indent = 2 )
209+ except ValueError as e :
210+ _logger .error ('Invalid input files, %s' , {'scan_type' : scan_type , 'call_id' : _tool_call_id , 'error' : str (e )})
211+ return json .dumps ({'error' : f'Invalid input files: { e !s} ' }, indent = 2 )
166212 except Exception as e :
167213 _logger .error ('Scan failed, %s' , {'scan_type' : scan_type , 'call_id' : _tool_call_id , 'error' : str (e )})
168214 return json .dumps ({'error' : f'Scan failed: { e !s} ' }, indent = 2 )
169- finally :
170- _cleanup_temp_files (temp_files )
171215
172216
173217async def cycode_secret_scan (files : dict [str , str ] = _FILES_TOOL_FIELD ) -> str :
@@ -197,6 +241,10 @@ async def cycode_sca_scan(files: dict[str, str] = _FILES_TOOL_FIELD) -> str:
197241 - verify software supply chain security
198242 - review package.json, requirements.txt, pom.xml and other dependency files
199243
244+ Important:
245+ You must also include lock files (like package-lock.json, Pipfile.lock, etc.) to get accurate results.
246+ You must provide manifest and lock files together.
247+
200248 Args:
201249 files: Dictionary mapping file paths to their content
202250
0 commit comments