From 04254a3d1e79c0d7588f0cc23327cc6b804a27b1 Mon Sep 17 00:00:00 2001 From: walletkun Date: Mon, 25 Aug 2025 02:17:12 -0400 Subject: [PATCH 1/3] Finished setting up the basic LLM analysis of the monitoring. Let me know if theres any part that is confusing.Or let me know if theres anything that needs to be changed. But for now this script will analyze the basic monitoring commands like top, df, etc. --- cli/ai/__init__.py | 0 cli/ai/ai_monitor.py | 471 +++++++++++++++++++++++++++++++++++++++++++ cli/app.py | 33 ++- cli/requirements.txt | 53 ++++- 4 files changed, 555 insertions(+), 2 deletions(-) create mode 100644 cli/ai/__init__.py create mode 100644 cli/ai/ai_monitor.py diff --git a/cli/ai/__init__.py b/cli/ai/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/cli/ai/ai_monitor.py b/cli/ai/ai_monitor.py new file mode 100644 index 0000000..684c421 --- /dev/null +++ b/cli/ai/ai_monitor.py @@ -0,0 +1,471 @@ +""" +AI Monitoring Module +Collects metrics -> Analyze -> Suggest Fixes -> Execute with confirmation (ideally) +""" +import json +import subprocess +from typing import Dict, List, Optional, Tuple, Any +from dataclasses import dataclass, asdict +from enum import Enum +from dotenv import load_dotenv +import os + +from langchain_google_genai import ChatGoogleGenerativeAI +from langchain.memory import ConversationBufferWindowMemory +from langchain.schema import SystemMessage, HumanMessage +from rich.console import Console +from rich.prompt import Confirm +from rich.table import Table +from rich.panel import Panel + +load_dotenv() +GEMINI_API_KEY = os.getenv("GEMINI_API_KEY") + +class DataFormat(Enum): + """ + Experiment with different data formats to find out what works best + + Three data formats: + RAW: raw data contents + STRUCTURED: parsed into json formats + HYBRID: both of them + + """ + RAW = "raw" + STRUCTURED = "structured" + HYBRID = "hybrid" + +class Verbosity(Enum): + """ + AI output detail level + + Having three level: + CONCISE: just the essentials + NORMAL: balanced detail + DETAILED: full analysis + """ + + CONCISE = "concise" + NORMAL = "normal" + DETAILED = "detailed" + +@dataclass +class SystemSnapshot: + """ + Data structure for system state + + Usage: + This will be used for storing the data snapshot from our monitor such as: + load average + cpu cores + memory db + disk usage + """ + + load_avg: List[float] + cpu_cores: int + memory_db: Dict[str,int] + disk_usage_percent: float + + # raw outputs for experimentation + raw_uptime: Optional[str] = None + raw_free: Optional[str] = None + raw_df: Optional[str] = None + raw_top: Optional[str] = None + + def to_dict(self) -> Dict: + return {k: v for k, v in asdict(self).items() if v is not None} + + +class MetricCollector: + """ + Metric collection from shell commands + + Usage: + It will run some simple shell commands and we'll take that output + then store it in a snap shot and use it for our AI to analyze + """ + + @staticmethod + def run_cmd(cmd:str) -> str: + """Execute shell commands and return stripped output""" + try: + result = subprocess.run( + cmd, shell=True, capture_output=True, + text=True, timeout=5 + ) + + return result.stdout.strip() + except Exception as e: + # When testing I'll see what exactly the error is then isolate it with correct error exceptons + pass + + + def collect(self, include_raw: bool = False) -> SystemSnapshot: + """Collecting current system metrics""" + + # getting the strcutured data + uptime_raw = self.run_cmd("uptime") + # using awk to pattern search for 'average' and output the value $2 + load_str = self.run_cmd("uptime | awk -F'average:' '{print $2}'") + load_avg = [float(x.strip(',')) for x in load_str.split()] + + # Number of cpu cores + cores = int(self.run_cmd("nproc")) + # memory information + mem_info = self.run_cmd("free -m | grep Mem").split() + memory = { + "total": int(mem_info[1]), + "used": int(mem_info[2]), + "free": int(mem_info[3]), + } + + # disk information + disk_info = self.run_cmd("df -h / | tail -1").split() + disk_percent = float(disk_info[4].strip("%")) + + # create the snapshot + snapshot = SystemSnapshot( + load_avg=load_avg, + cpu_cores=cores, + memory_db=memory, + disk_usage_percent=disk_percent + ) + + # if we want to check out the raw output we can just set that to true when calling it + if include_raw: + snapshot.raw_uptime = uptime_raw + snapshot.raw_free = self.run_cmd("free -m") + snapshot.raw_df = self.run_cmd("df -h") + snapshot.raw_top = self.run_cmd("top -bn1 | head -20") + + # otherwise we'll just return the snapshot we got + return snapshot + + +class CommandExecutor: + """ + Command execution with confirmation by users + + Usage: + Uses for actionable fixes determine by the LLM + User can then determine if they want to use such commands to allow fix + """ + + FIX_COMMANDS = [ + "systemctl restart", + "systemctl stop", + "kill", + "nice", + "renice", + "sync", + # found this on stack overflow: clear caches + "echo 3 > /proc/sys/vm/drop_caches" + ] + + DIAGNOSTICS_COMMANDS = [ + "systemctl status", + "journalctl", + "ps aux", + "netstat", + "free", + "lsof", + "df", + "top -bn1" + ] + + + def __init__(self, console: Console): + self.console = console + + def is_diagnostic_command(self, command: str) -> bool: + """Checking if the command is part of what we have in the diagnostics commands""" + return any(cmd in command for cmd in self.DIAGNOSTICS_COMMANDS) + + def execute(self, command: str, require_confirm: bool = True) -> Tuple[bool, str]: + """ + Execution of the command from LLM with user confirmation + + Usage: + LLM will analyze our metrics and come up with some sort of solution that allows actionable cmds + Thus user can check the command and confirm with LLM to give consent to action those cmds + """ + + if not self.is_diagnostic_command(command): + if require_confirm: + self.console.print(f"[yellow]Command: {command}[/yellow]") + if not Confirm.ask("Execute this fix?"): + return False, "Cancelled by user" + + + try: + result = subprocess.run( + command, shell=True, capture_output=True, + text=True, timeout=30 + ) + + return result.returncode == 0, result.stdout + + except Exception as e: + return False, str("Error is: ", e) + + + +class AIMonitor: + """ + Main part of the execution for the AI analysis + """ + # we will need to tweak the prompt based on the output, maybe we'll create like a ranking system to value which kind + # of the output we'd like to display + + SYSTEM_PROMPT = ''' + You are a DevOps expert analyzing system metrics. + + Your job: + 1. Analyze the provided system state + 2. Identify any issues or optimization opportunities + 3. Suggest specific, actionable fixes + + Be CONCISE and ACTIONABLE. Format your response as: + SUMMARY: One sentence system status + ISSUES: Bullet points of problems (if theres any) + ACTIONS: Specific commands to fix issues (if theres any) + + ONLY suggest fixes if there are actual problems. + ''' + + + def __init__(self, \ + data_format: DataFormat = DataFormat.HYBRID, \ + verbosity: Verbosity = Verbosity.NORMAL, \ + memory_window: int = 3, + ): + self.console = Console() + self.data_format = data_format + self.verbosity = verbosity + + # calling in the LLM + self.llm = ChatGoogleGenerativeAI( + model="gemini-2.5-flash", + google_api_key=GEMINI_API_KEY, + temperature=0.1 + ) + + # memory for context + self.memory = ConversationBufferWindowMemory( + k=memory_window, + return_messages=True, + ) + + self.collector = MetricCollector() + self.executor = CommandExecutor(self.console) + + + def _prepare_data(self, snapshot: SystemSnapshot) -> str: + """Prepare data based on format setting""" + + if self.data_format == DataFormat.RAW: + return f""" + Raw outputs: + uptime: {snapshot.raw_uptime} + free: {snapshot.raw_free} + df: {snapshot.raw_df} + """ + elif self.data_format == DataFormat.STRUCTURED: + # Sending structure data with json + data = { + "load_avg" : snapshot.load_avg, + "cores": snapshot.cpu_cores, + "load_per_core": [load/snapshot.cpu_cores for load in snapshot.load_avg], + "memory" : snapshot.memory_db, + "memory_usage_percent": (snapshot.memory_db['used'] / snapshot.memory_db['total']) * 100, + "disk_usage_percent" : snapshot.disk_usage_percent, + } + + return f"System Metrics:\n{json.dumps(data, indent=2)}" + + + else: + structured = { + "load_per_core": [l/snapshot.cpu_cores for l in snapshot.load_avg], + "memory_usage_percent": (snapshot.memory_db["used"] / snapshot.memory_db["total"]) * 100, + "disk_usage_percent": snapshot.disk_usage_percent + } + + return f""" + Structured metrics: {json.dumps(structured, indent=2)} + + Raw context: + {snapshot.raw_uptime} + {snapshot.raw_free} + """ + + def _format_response(self, response:str) -> str: + """Formatting response based on verbosity""" + if self.verbosity == Verbosity.CONCISE: + # extract only the summary and critical actions + lines = response.split("\n") + important = [ line for line in lines if any( + k in line.upper() for k in ["SUMMARY:", "CRITICAL:", "ACTION:"] + )] + + # maxing 3 lines + return "\n".join(important[3:]) + + elif self.verbosity == Verbosity.DETAILED: + # add in extra context + return f"{response}\n\n[Context: Using {self.data_format.value} format]" + + # If normal is selected then we'll just return regular response + return response + + + # analyzing section + def analyze(self) -> Dict[str, Any]: + """ + Workflow: collect data -> analyze -> suggest + Return: dict with summary, issues, and suggested commands (fixes) + """ + + # collecting the data + self.console.print("[blue]Collecting system metrics...[/blue]") + snapshot = self.collector.collect( + include_raw=(self.data_format != DataFormat.STRUCTURED) + ) + + # preparing the data for AI + data_str = self._prepare_data(snapshot) + + # ai analysis + self.console.print("[green]Analyzing system state...[/green]") + + messages = [ + SystemMessage(content=self.SYSTEM_PROMPT), + HumanMessage(content=data_str), + ] + + response = self.llm.invoke(messages) + formatted_response = self._format_response(response.content) + + # parsing the response for commands + suggested_commands = self._extract_commands(response.content) + + # saving to memory so the LLM can continue investigating + self.memory.save_context( + {'input': f"System analysis at {snapshot.load_avg}"}, + {'output': formatted_response} + ) + + + return { + "snapshot": snapshot, + "analysis": formatted_response, + "commands": suggested_commands + } + + + def _extract_commands(self, response:str) -> List[str]: + """Extracting suggested commands from AI responses""" + commands = [] + lines = response.split("\n") + + + for line in lines: + line = line.strip() + + # looking for lines that looks like it has commands + if line.startswith("$") or line.startswith("sudo") or "systemctl" in line: + # clean up the command + cmd = line.strip("$").strip() + + if cmd: + commands.append(cmd) + + + + return commands + + def run_fixes(self, commands: List[str]) -> List[Dict]: + """Execute suggested fixes with user confirmation""" + results = [] + + for cmd in commands: + self.console.print(f"\n[yellow]Suggested fix:[/yellow] {cmd}") + + if Confirm.ask("Execute this command?"): + success, output = self.executor.execute(cmd) + + results.append({ + "command": cmd, + "success": success, + "output": output[:200] + }) + + if success: + self.console.print("[green]Command executed[/green]") + else: + self.console.print(f"[red]Failed: {output}[/red]") + else: + results.append({ + "command": cmd, + "success": False, + "output": "Skipped by user" + }) + + return results + + def monitor_loop(self): + """Main monitoring loop""" + self.console.print(Panel.fit( + "[bold cyan]AI System Monitor[/bold cyan]\n" + "Analyzing system and suggesting optimizations", + border_style="cyan" + )) + + # analyze system + result = self.analyze() + + # display analysis + self.console.print("\n[bold]Analysis:[/bold]") + self.console.print(result["analysis"]) + + # if there are suggested fixes + if result["commands"]: + self.console.print(f"\n[yellow]Found {len(result['commands'])} suggested fixes[/yellow]") + + if Confirm.ask("Would you like to review and execute fixes?"): + fix_results = self.run_fixes(result["commands"]) + + # print out the summary + self.console.print("\n[bold]Execution Summary:[/bold]") + for r in fix_results: + status = "[green]:)[/green]" if r["success"] else "[red]:([/red]" + self.console.print(f"{status} {r['command']}") + else: + self.console.print("\n[green]System is healthy, no fixes needed[/green]") + + + + +# intergration point for main cli +def run_ai_monitor( + data_format: str = "hybrid", + verbosity: str = "normal", + auto_fix: bool = False +): + + """Entry point from main CLI""" + try: + monitor = AIMonitor( + data_format=DataFormat[data_format.upper()], + verbosity=Verbosity[verbosity.upper()] + ) + monitor.monitor_loop() + + except Exception as e: + Console().print(f"[red]Error: {str(e)}[/red]") + Console().print("[yellow]Check the API key and try again[/yellow]") + + + + diff --git a/cli/app.py b/cli/app.py index d6ae4fe..fc65686 100644 --- a/cli/app.py +++ b/cli/app.py @@ -2,7 +2,7 @@ import typer from rich import print from typing import Annotated - +import os app = typer.Typer(help = 'Surge - A DevOps CLI Tool For System Monitoring and Production Reliability') @@ -131,5 +131,36 @@ def network( print(f'Testing network connection to {url} with {requests} requests.') # TODO: Add http requests or use curl through subprocess + +@app.command() +def ai( + format: Annotated[str, typer.Option("--format", '-f',help='Data format: raw/structured/hybrid')] = "hybrid", + verbosity: Annotated[str, typer.Option('--verbosity', '-v', help="Output detail: concise/normal/hybrid")] = 'normal', + auto_fix: Annotated[bool, typer.Option("--auto-fix", help="Auto-execute safe fixes")] = False, +): + """ + Using Gemini 2.5 Flash for now for simple AI suggestions and reading through machine metrics + giving suggest fixes upon user confirmations + """ + + if not os.getenv('GEMINI_API_KEY'): + print('[red]Error: GEMINI_API_KEY is empty[/red]') + print('Set it with: export GEMINI_API_KEY="your api key"') + return + + try: + from ai.ai_monitor import run_ai_monitor + + run_ai_monitor( + data_format=format, + verbosity=verbosity, + auto_fix=auto_fix + ) + except ImportError: + print('[red]AI packages not installed[/red]') + print('Run: pip install langchain langchain-google-genai rich') + except Exception as e: + print(f'[red]Error: {str(e)}[/red]') + if __name__ == "__main__": app() \ No newline at end of file diff --git a/cli/requirements.txt b/cli/requirements.txt index 7379290..1552890 100644 --- a/cli/requirements.txt +++ b/cli/requirements.txt @@ -1 +1,52 @@ -typer \ No newline at end of file +annotated-types +anyio +cachetools +certifi +charset-normalizer +click +dotenv +filetype +google-ai-generativelanguage +google-api-core +google-auth +googleapis-common-protos +greenlet +grpcio +grpcio-status +h11 +httpcore +httpx +idna +jsonpatch +jsonpointer +langchain +langchain-core +langchain-google-genai +langchain-text-splitters +langsmith +markdown-it-py +mdurl +orjson +packaging +proto-plus +protobuf +pyasn1 +pyasn1_modules +pydantic +pydantic_core +Pygments +python-dotenv +PyYAML +requests +requests-toolbelt +rich +rsa +shellingham +sniffio +SQLAlchemy +tenacity +typer +typing-inspection +typing_extensions +urllib3 +zstandard From 93802cd8ed96f7dc0d6e86d337fa3f6e20853380 Mon Sep 17 00:00:00 2001 From: sidsun1 Date: Mon, 25 Aug 2025 23:11:33 -0700 Subject: [PATCH 2/3] Updated code formatting with ruff + supressed warnings --- cli/ai/ai_monitor.py | 13 ++--- cli/app.py | 133 +++++++++++++++++++++++++------------------ cli/requirements.txt | 1 + 3 files changed, 86 insertions(+), 61 deletions(-) diff --git a/cli/ai/ai_monitor.py b/cli/ai/ai_monitor.py index 684c421..7b77331 100644 --- a/cli/ai/ai_monitor.py +++ b/cli/ai/ai_monitor.py @@ -15,7 +15,6 @@ from langchain.schema import SystemMessage, HumanMessage from rich.console import Console from rich.prompt import Confirm -from rich.table import Table from rich.panel import Panel load_dotenv() @@ -96,7 +95,7 @@ def run_cmd(cmd:str) -> str: ) return result.stdout.strip() - except Exception as e: + except Exception: # When testing I'll see what exactly the error is then isolate it with correct error exceptons pass @@ -206,8 +205,8 @@ def execute(self, command: str, require_confirm: bool = True) -> Tuple[bool, str return result.returncode == 0, result.stdout - except Exception as e: - return False, str("Error is: ", e) + except Exception as err: + return False, str("Error is: ", err) @@ -287,7 +286,7 @@ def _prepare_data(self, snapshot: SystemSnapshot) -> str: else: structured = { - "load_per_core": [l/snapshot.cpu_cores for l in snapshot.load_avg], + "load_per_core": [line/snapshot.cpu_cores for line in snapshot.load_avg], "memory_usage_percent": (snapshot.memory_db["used"] / snapshot.memory_db["total"]) * 100, "disk_usage_percent": snapshot.disk_usage_percent } @@ -462,8 +461,8 @@ def run_ai_monitor( ) monitor.monitor_loop() - except Exception as e: - Console().print(f"[red]Error: {str(e)}[/red]") + except Exception as err: + Console().print(f"[red]Error: {str(err)}[/red]") Console().print("[yellow]Check the API key and try again[/yellow]") diff --git a/cli/app.py b/cli/app.py index fc65686..12a89bf 100644 --- a/cli/app.py +++ b/cli/app.py @@ -1,10 +1,12 @@ +import os import subprocess import typer from rich import print from typing import Annotated -import os -app = typer.Typer(help = 'Surge - A DevOps CLI Tool For System Monitoring and Production Reliability') +app = typer.Typer( + help="Surge - A DevOps CLI Tool For System Monitoring and Production Reliability" +) def run_cmd(cmd: str) -> str: @@ -12,7 +14,9 @@ def run_cmd(cmd: str) -> str: Helper function to abstract lengthy subprocess command implementation :D """ - return subprocess.run(cmd, shell=True, capture_output=True, text=True).stdout.strip() + return subprocess.run( + cmd, shell=True, capture_output=True, text=True + ).stdout.strip() def get_load() -> tuple[float] | int: @@ -21,8 +25,8 @@ def get_load() -> tuple[float] | int: """ uptime = run_cmd("uptime | awk -F'average:' '{print $2}'") - averages = [float(x.replace(',', '')) for x in uptime.split()] - cores = run_cmd('nproc') + averages = [float(x.replace(",", "")) for x in uptime.split()] + cores = run_cmd("nproc") return averages, cores @@ -31,7 +35,7 @@ def get_cpu() -> tuple[float]: """ Uses top to find CPU utilization grouped by user, system, and idle percents. """ - top = run_cmd('top -bn1 | grep "Cpu(s)"').split(',') + top = run_cmd('top -bn1 | grep "Cpu(s)"').split(",") user = top[0].split()[-2] system = top[1].split()[0] idle = top[3].split()[0] @@ -43,7 +47,7 @@ def get_memory() -> tuple[float]: """ Returns remaining memory by group using free. """ - free = run_cmd('free -m | grep Mem').split() + free = run_cmd("free -m | grep Mem").split() total, used, free_mem = free[1], free[2], free[3] return total, used, free_mem @@ -53,7 +57,7 @@ def get_disk() -> tuple[float]: """ Returns disk usage with df. """ - df = run_cmd('df -h / | tail -1').split() + df = run_cmd("df -h / | tail -1").split() size, used, available, percent = df[1], df[2], df[3], df[4] return size, used, available, percent @@ -66,13 +70,21 @@ def get_io() -> tuple[float]: @app.command() def monitor( - load: Annotated[bool, typer.Option('-l', '--load', help = 'Show system load averages')] = True, - cpu: Annotated[bool, typer.Option('-c', '--cpu', help = 'Show CPU usage')] = True, - ram: Annotated[bool, typer.Option('-r', '--ram', help = 'Show RAM usage')] = True, - disk: Annotated[bool, typer.Option('-d', '--disk', help = 'Show Disk usage')] = True, - io: Annotated[bool, typer.Option('-o', '--io', help = 'Show Disk I/O statistics')] = True, - interval: Annotated[int, typer.Option('-i', '--interval', help = 'Polling interval in seconds')] = 5, - verbose: Annotated[bool, typer.Option('-v', '--verbose', help = 'Show detailed system metrics')] = False + load: Annotated[ + bool, typer.Option("-l", "--load", help="Show system load averages") + ] = True, + cpu: Annotated[bool, typer.Option("-c", "--cpu", help="Show CPU usage")] = True, + ram: Annotated[bool, typer.Option("-r", "--ram", help="Show RAM usage")] = True, + disk: Annotated[bool, typer.Option("-d", "--disk", help="Show Disk usage")] = True, + io: Annotated[ + bool, typer.Option("-o", "--io", help="Show Disk I/O statistics") + ] = True, + interval: Annotated[ + int, typer.Option("-i", "--interval", help="Polling interval in seconds") + ] = 5, + verbose: Annotated[ + bool, typer.Option("-v", "--verbose", help="Show detailed system metrics") + ] = False, ): """ Summary of all system metrics, including utilization of CPU, Memory, Network, and I/O. @@ -85,82 +97,95 @@ def monitor( # if verbose: # ... - if load: averages, cores = get_load() - print('\n[bold]System Load Averages[/bold]') - print('---------------------') + print("\n[bold]System Load Averages[/bold]") + print("---------------------") if not verbose: - print(f'Load avg (1m): {averages[0]}') - print(f'Load avg (5m): {averages[1]}') - print(f'Load avg (15m): {averages[2]}') + print(f"Load avg (1m): {averages[0]}") + print(f"Load avg (5m): {averages[1]}") + print(f"Load avg (15m): {averages[2]}") else: - print(f'Load avg (1m): {averages[0]:.2f} ({averages[0] / cores:.3f} per CPU)') - print(f'Load avg (5m): {averages[1]:.2f} ({averages[1] / cores:.3f} per CPU)') - print(f'Load avg (15m): {averages[2]:.2f} ({averages[2] / cores:.3f} per CPU)') - + print( + f"Load avg (1m): {averages[0]:.2f} ({averages[0] / cores:.3f} per CPU)" + ) + print( + f"Load avg (5m): {averages[1]:.2f} ({averages[1] / cores:.3f} per CPU)" + ) + print( + f"Load avg (15m): {averages[2]:.2f} ({averages[2] / cores:.3f} per CPU)" + ) + if cpu: user, system, idle = get_cpu() - print('\n[bold]CPU Utilization[/bold]') - print('---------------------') - print(f'User: {user}% | System: {system}% | Idle: {idle}%') - + print("\n[bold]CPU Utilization[/bold]") + print("---------------------") + print(f"User: {user}% | System: {system}% | Idle: {idle}%") + if ram: total, used, free = get_memory() - print('\n[bold]Memory Usage (MB)[/bold]') - print('---------------------') - print(f'Total: {total} | Used: {used} | Free: {free}') - + print("\n[bold]Memory Usage (MB)[/bold]") + print("---------------------") + print(f"Total: {total} | Used: {used} | Free: {free}") + if disk: size, used, available, percent = get_disk() - print('\n[bold]Disk Usage[/bold]') - print('---------------------') - print(f'Size: {size} | Used: {used} | Available: {available} | Usage: {percent}') + print("\n[bold]Disk Usage[/bold]") + print("---------------------") + print( + f"Size: {size} | Used: {used} | Available: {available} | Usage: {percent}" + ) @app.command() def network( - url: Annotated[str, typer.Argument(help = 'URL to test network/API metrics')], - requests: Annotated[int, typer.Option('-n', '--count', help = 'Number of requests to send')] = 5 + url: Annotated[str, typer.Argument(help="URL to test network/API metrics")], + requests: Annotated[ + int, typer.Option("-n", "--count", help="Number of requests to send") + ] = 5, ): """ Run basic network/API tests with a number of requests. """ - print(f'Testing network connection to {url} with {requests} requests.') + print(f"Testing network connection to {url} with {requests} requests.") # TODO: Add http requests or use curl through subprocess @app.command() def ai( - format: Annotated[str, typer.Option("--format", '-f',help='Data format: raw/structured/hybrid')] = "hybrid", - verbosity: Annotated[str, typer.Option('--verbosity', '-v', help="Output detail: concise/normal/hybrid")] = 'normal', - auto_fix: Annotated[bool, typer.Option("--auto-fix", help="Auto-execute safe fixes")] = False, + format: Annotated[ + str, typer.Option("--format", "-f", help="Data format: raw/structured/hybrid") + ] = "hybrid", + verbosity: Annotated[ + str, + typer.Option("--verbosity", "-v", help="Output detail: concise/normal/hybrid"), + ] = "normal", + auto_fix: Annotated[ + bool, typer.Option("--auto-fix", help="Auto-execute safe fixes") + ] = False, ): """ Using Gemini 2.5 Flash for now for simple AI suggestions and reading through machine metrics giving suggest fixes upon user confirmations """ - if not os.getenv('GEMINI_API_KEY'): - print('[red]Error: GEMINI_API_KEY is empty[/red]') + if not os.getenv("GEMINI_API_KEY"): + print("[red]Error: GEMINI_API_KEY is empty[/red]") print('Set it with: export GEMINI_API_KEY="your api key"') return try: from ai.ai_monitor import run_ai_monitor - - run_ai_monitor( - data_format=format, - verbosity=verbosity, - auto_fix=auto_fix - ) + + run_ai_monitor(data_format=format, verbosity=verbosity, auto_fix=auto_fix) except ImportError: - print('[red]AI packages not installed[/red]') - print('Run: pip install langchain langchain-google-genai rich') + print("[red]AI packages not installed[/red]") + print("Run: pip install langchain langchain-google-genai rich") except Exception as e: - print(f'[red]Error: {str(e)}[/red]') + print(f"[red]Error: {str(e)}[/red]") + if __name__ == "__main__": - app() \ No newline at end of file + app() diff --git a/cli/requirements.txt b/cli/requirements.txt index 1552890..dd2512b 100644 --- a/cli/requirements.txt +++ b/cli/requirements.txt @@ -41,6 +41,7 @@ requests requests-toolbelt rich rsa +ruff shellingham sniffio SQLAlchemy From 0218936c13a3bab5302ab41d65403e691717d9b6 Mon Sep 17 00:00:00 2001 From: sidsun1 Date: Mon, 1 Sep 2025 20:47:35 -0700 Subject: [PATCH 3/3] Updated and reformatted codebase including llm summary --- cli/ai/ai_monitor.py | 177 ++++++++++++++++++++---------------------- cli/app.py | 75 ++++++++++++++---- tests/test_network.py | 27 ++++++- 3 files changed, 171 insertions(+), 108 deletions(-) diff --git a/cli/ai/ai_monitor.py b/cli/ai/ai_monitor.py index 7b77331..a6053dd 100644 --- a/cli/ai/ai_monitor.py +++ b/cli/ai/ai_monitor.py @@ -2,6 +2,7 @@ AI Monitoring Module Collects metrics -> Analyze -> Suggest Fixes -> Execute with confirmation (ideally) """ + import json import subprocess from typing import Dict, List, Optional, Tuple, Any @@ -20,6 +21,7 @@ load_dotenv() GEMINI_API_KEY = os.getenv("GEMINI_API_KEY") + class DataFormat(Enum): """ Experiment with different data formats to find out what works best @@ -28,12 +30,14 @@ class DataFormat(Enum): RAW: raw data contents STRUCTURED: parsed into json formats HYBRID: both of them - + """ - RAW = "raw" + + RAW = "raw" STRUCTURED = "structured" HYBRID = "hybrid" + class Verbosity(Enum): """ AI output detail level @@ -48,6 +52,7 @@ class Verbosity(Enum): NORMAL = "normal" DETAILED = "detailed" + @dataclass class SystemSnapshot: """ @@ -63,10 +68,10 @@ class SystemSnapshot: load_avg: List[float] cpu_cores: int - memory_db: Dict[str,int] + memory_db: Dict[str, int] disk_usage_percent: float - # raw outputs for experimentation + # raw outputs for experimentation raw_uptime: Optional[str] = None raw_free: Optional[str] = None raw_df: Optional[str] = None @@ -86,12 +91,11 @@ class MetricCollector: """ @staticmethod - def run_cmd(cmd:str) -> str: + def run_cmd(cmd: str) -> str: """Execute shell commands and return stripped output""" try: result = subprocess.run( - cmd, shell=True, capture_output=True, - text=True, timeout=5 + cmd, shell=True, capture_output=True, text=True, timeout=5 ) return result.stdout.strip() @@ -99,7 +103,6 @@ def run_cmd(cmd:str) -> str: # When testing I'll see what exactly the error is then isolate it with correct error exceptons pass - def collect(self, include_raw: bool = False) -> SystemSnapshot: """Collecting current system metrics""" @@ -107,7 +110,7 @@ def collect(self, include_raw: bool = False) -> SystemSnapshot: uptime_raw = self.run_cmd("uptime") # using awk to pattern search for 'average' and output the value $2 load_str = self.run_cmd("uptime | awk -F'average:' '{print $2}'") - load_avg = [float(x.strip(',')) for x in load_str.split()] + load_avg = [float(x.strip(",")) for x in load_str.split()] # Number of cpu cores cores = int(self.run_cmd("nproc")) @@ -118,7 +121,7 @@ def collect(self, include_raw: bool = False) -> SystemSnapshot: "used": int(mem_info[2]), "free": int(mem_info[3]), } - + # disk information disk_info = self.run_cmd("df -h / | tail -1").split() disk_percent = float(disk_info[4].strip("%")) @@ -128,7 +131,7 @@ def collect(self, include_raw: bool = False) -> SystemSnapshot: load_avg=load_avg, cpu_cores=cores, memory_db=memory, - disk_usage_percent=disk_percent + disk_usage_percent=disk_percent, ) # if we want to check out the raw output we can just set that to true when calling it @@ -147,7 +150,7 @@ class CommandExecutor: Command execution with confirmation by users Usage: - Uses for actionable fixes determine by the LLM + Uses for actionable fixes determine by the LLM User can then determine if they want to use such commands to allow fix """ @@ -159,7 +162,7 @@ class CommandExecutor: "renice", "sync", # found this on stack overflow: clear caches - "echo 3 > /proc/sys/vm/drop_caches" + "echo 3 > /proc/sys/vm/drop_caches", ] DIAGNOSTICS_COMMANDS = [ @@ -170,10 +173,9 @@ class CommandExecutor: "free", "lsof", "df", - "top -bn1" + "top -bn1", ] - def __init__(self, console: Console): self.console = console @@ -183,7 +185,7 @@ def is_diagnostic_command(self, command: str) -> bool: def execute(self, command: str, require_confirm: bool = True) -> Tuple[bool, str]: """ - Execution of the command from LLM with user confirmation + Execution of the command from LLM with user confirmation Usage: LLM will analyze our metrics and come up with some sort of solution that allows actionable cmds @@ -196,11 +198,9 @@ def execute(self, command: str, require_confirm: bool = True) -> Tuple[bool, str if not Confirm.ask("Execute this fix?"): return False, "Cancelled by user" - try: result = subprocess.run( - command, shell=True, capture_output=True, - text=True, timeout=30 + command, shell=True, capture_output=True, text=True, timeout=30 ) return result.returncode == 0, result.stdout @@ -209,15 +209,15 @@ def execute(self, command: str, require_confirm: bool = True) -> Tuple[bool, str return False, str("Error is: ", err) - class AIMonitor: """ Main part of the execution for the AI analysis """ + # we will need to tweak the prompt based on the output, maybe we'll create like a ranking system to value which kind # of the output we'd like to display - SYSTEM_PROMPT = ''' + SYSTEM_PROMPT = """ You are a DevOps expert analyzing system metrics. Your job: @@ -231,13 +231,13 @@ class AIMonitor: ACTIONS: Specific commands to fix issues (if theres any) ONLY suggest fixes if there are actual problems. - ''' - + """ - def __init__(self, \ - data_format: DataFormat = DataFormat.HYBRID, \ - verbosity: Verbosity = Verbosity.NORMAL, \ - memory_window: int = 3, + def __init__( + self, + data_format: DataFormat = DataFormat.HYBRID, + verbosity: Verbosity = Verbosity.NORMAL, + memory_window: int = 3, ): self.console = Console() self.data_format = data_format @@ -245,9 +245,7 @@ def __init__(self, \ # calling in the LLM self.llm = ChatGoogleGenerativeAI( - model="gemini-2.5-flash", - google_api_key=GEMINI_API_KEY, - temperature=0.1 + model="gemini-2.5-flash", google_api_key=GEMINI_API_KEY, temperature=0.1 ) # memory for context @@ -259,7 +257,6 @@ def __init__(self, \ self.collector = MetricCollector() self.executor = CommandExecutor(self.console) - def _prepare_data(self, snapshot: SystemSnapshot) -> str: """Prepare data based on format setting""" @@ -273,22 +270,31 @@ def _prepare_data(self, snapshot: SystemSnapshot) -> str: elif self.data_format == DataFormat.STRUCTURED: # Sending structure data with json data = { - "load_avg" : snapshot.load_avg, + "load_avg": snapshot.load_avg, "cores": snapshot.cpu_cores, - "load_per_core": [load/snapshot.cpu_cores for load in snapshot.load_avg], - "memory" : snapshot.memory_db, - "memory_usage_percent": (snapshot.memory_db['used'] / snapshot.memory_db['total']) * 100, - "disk_usage_percent" : snapshot.disk_usage_percent, + "load_per_core": [ + load / snapshot.cpu_cores for load in snapshot.load_avg + ], + "memory": snapshot.memory_db, + "memory_usage_percent": ( + snapshot.memory_db["used"] / snapshot.memory_db["total"] + ) + * 100, + "disk_usage_percent": snapshot.disk_usage_percent, } return f"System Metrics:\n{json.dumps(data, indent=2)}" - else: structured = { - "load_per_core": [line/snapshot.cpu_cores for line in snapshot.load_avg], - "memory_usage_percent": (snapshot.memory_db["used"] / snapshot.memory_db["total"]) * 100, - "disk_usage_percent": snapshot.disk_usage_percent + "load_per_core": [ + line / snapshot.cpu_cores for line in snapshot.load_avg + ], + "memory_usage_percent": ( + snapshot.memory_db["used"] / snapshot.memory_db["total"] + ) + * 100, + "disk_usage_percent": snapshot.disk_usage_percent, } return f""" @@ -299,14 +305,16 @@ def _prepare_data(self, snapshot: SystemSnapshot) -> str: {snapshot.raw_free} """ - def _format_response(self, response:str) -> str: + def _format_response(self, response: str) -> str: """Formatting response based on verbosity""" if self.verbosity == Verbosity.CONCISE: # extract only the summary and critical actions lines = response.split("\n") - important = [ line for line in lines if any( - k in line.upper() for k in ["SUMMARY:", "CRITICAL:", "ACTION:"] - )] + important = [ + line + for line in lines + if any(k in line.upper() for k in ["SUMMARY:", "CRITICAL:", "ACTION:"]) + ] # maxing 3 lines return "\n".join(important[3:]) @@ -318,7 +326,6 @@ def _format_response(self, response:str) -> str: # If normal is selected then we'll just return regular response return response - # analyzing section def analyze(self) -> Dict[str, Any]: """ @@ -351,24 +358,21 @@ def analyze(self) -> Dict[str, Any]: # saving to memory so the LLM can continue investigating self.memory.save_context( - {'input': f"System analysis at {snapshot.load_avg}"}, - {'output': formatted_response} + {"input": f"System analysis at {snapshot.load_avg}"}, + {"output": formatted_response}, ) - return { "snapshot": snapshot, "analysis": formatted_response, - "commands": suggested_commands + "commands": suggested_commands, } - - def _extract_commands(self, response:str) -> List[str]: + def _extract_commands(self, response: str) -> List[str]: """Extracting suggested commands from AI responses""" commands = [] lines = response.split("\n") - for line in lines: line = line.strip() @@ -376,65 +380,63 @@ def _extract_commands(self, response:str) -> List[str]: if line.startswith("$") or line.startswith("sudo") or "systemctl" in line: # clean up the command cmd = line.strip("$").strip() - + if cmd: commands.append(cmd) - - return commands def run_fixes(self, commands: List[str]) -> List[Dict]: """Execute suggested fixes with user confirmation""" results = [] - + for cmd in commands: self.console.print(f"\n[yellow]Suggested fix:[/yellow] {cmd}") - + if Confirm.ask("Execute this command?"): success, output = self.executor.execute(cmd) - - results.append({ - "command": cmd, - "success": success, - "output": output[:200] - }) - + + results.append( + {"command": cmd, "success": success, "output": output[:200]} + ) + if success: self.console.print("[green]Command executed[/green]") else: self.console.print(f"[red]Failed: {output}[/red]") else: - results.append({ - "command": cmd, - "success": False, - "output": "Skipped by user" - }) - + results.append( + {"command": cmd, "success": False, "output": "Skipped by user"} + ) + return results def monitor_loop(self): """Main monitoring loop""" - self.console.print(Panel.fit( - "[bold cyan]AI System Monitor[/bold cyan]\n" - "Analyzing system and suggesting optimizations", - border_style="cyan" - )) - + self.console.print( + Panel.fit( + "[bold cyan]AI System Monitor[/bold cyan]\n" + "Analyzing system and suggesting optimizations", + border_style="cyan", + ) + ) + # analyze system result = self.analyze() - + # display analysis self.console.print("\n[bold]Analysis:[/bold]") self.console.print(result["analysis"]) - + # if there are suggested fixes if result["commands"]: - self.console.print(f"\n[yellow]Found {len(result['commands'])} suggested fixes[/yellow]") - + self.console.print( + f"\n[yellow]Found {len(result['commands'])} suggested fixes[/yellow]" + ) + if Confirm.ask("Would you like to review and execute fixes?"): fix_results = self.run_fixes(result["commands"]) - + # print out the summary self.console.print("\n[bold]Execution Summary:[/bold]") for r in fix_results: @@ -443,28 +445,19 @@ def monitor_loop(self): else: self.console.print("\n[green]System is healthy, no fixes needed[/green]") - - # intergration point for main cli def run_ai_monitor( - data_format: str = "hybrid", - verbosity: str = "normal", - auto_fix: bool = False + data_format: str = "hybrid", verbosity: str = "normal", auto_fix: bool = False ): - """Entry point from main CLI""" try: monitor = AIMonitor( data_format=DataFormat[data_format.upper()], - verbosity=Verbosity[verbosity.upper()] + verbosity=Verbosity[verbosity.upper()], ) monitor.monitor_loop() except Exception as err: Console().print(f"[red]Error: {str(err)}[/red]") Console().print("[yellow]Check the API key and try again[/yellow]") - - - - diff --git a/cli/app.py b/cli/app.py index a29a890..83f2a67 100644 --- a/cli/app.py +++ b/cli/app.py @@ -9,6 +9,7 @@ help="Surge - A DevOps CLI Tool For System Monitoring and Production Reliability" ) + def run_cmd(cmd: str) -> str: """ Helper function to abstract lengthy subprocess command implementation :D @@ -86,6 +87,9 @@ def monitor( bool, typer.Option("-v", "--verbose", help="Show detailed system metrics") ] = False, ): + if interval <= 0: + raise typer.BadParameter("Interval must be a positive integer.") + """ Summary of all system metrics, including utilization of CPU, Memory, Network, and I/O. """ @@ -138,15 +142,38 @@ def monitor( f"Size: {size} | Used: {used} | Available: {available} | Usage: {percent}" ) + @app.command("network") def network( - url: Annotated[str | None, typer.Option("-u", "--url", help="HTTP URL to test (curl)", show_default=False)] = None, - host: Annotated[str | None, typer.Option("-h", "--host", help="Host/IP for ping and traceroute", show_default=False)] = None, - domain: Annotated[str | None, typer.Option("-d", "--domain", help="Domain for DNS lookup", show_default=False)] = None, - requests: Annotated[int, typer.Option("-n", "--count", help="Number of ICMP echo requests")] = 5, - dtype: Annotated[str, typer.Option("-t", "--type", help="DNS record type (A, AAAA, MX, TXT, etc.)")] = "A", - sockets: Annotated[bool, typer.Option("--sockets", help="Show socket information (ss)")] = False, - no_trace: Annotated[bool, typer.Option("--no-trace", help="Skip traceroute/mtr when --host is set")] = False, + url: Annotated[ + str | None, + typer.Option("-u", "--url", help="HTTP URL to test (curl)", show_default=False), + ] = None, + host: Annotated[ + str | None, + typer.Option( + "-h", "--host", help="Host/IP for ping and traceroute", show_default=False + ), + ] = None, + domain: Annotated[ + str | None, + typer.Option( + "-d", "--domain", help="Domain for DNS lookup", show_default=False + ), + ] = None, + requests: Annotated[ + int, typer.Option("-n", "--count", help="Number of ICMP echo requests") + ] = 5, + dtype: Annotated[ + str, + typer.Option("-t", "--type", help="DNS record type (A, AAAA, MX, TXT, etc.)"), + ] = "A", + sockets: Annotated[ + bool, typer.Option("--sockets", help="Show socket information (ss)") + ] = False, + no_trace: Annotated[ + bool, typer.Option("--no-trace", help="Skip traceroute/mtr when --host is set") + ] = False, ): """ Flexible network diagnostics: run only what you request. @@ -191,7 +218,7 @@ def summarize_ping(out: str) -> str: bits.append(f"loss={loss}") if avg is not None: bits.append(f"avg_rtt_ms={avg}") - + return " | ".join(bits) if bits else (out.strip()[:200] if out else "") def summarize_trace(out: str, max_lines: int = 12) -> str: @@ -213,19 +240,29 @@ def summarize_trace(out: str, max_lines: int = 12) -> str: # --- then require at least one section --- if not any([host, url, domain, sockets]): - warn("Nothing to do. Provide at least one of: --host, --url, --domain, --sockets") + warn( + "Nothing to do. Provide at least one of: --host, --url, --domain, --sockets" + ) raise typer.Exit(code=1) # ---- ping / traceroute (or mtr -r fallback) ---- if host: header("Ping") ping_out = run_cmd(f"ping -c {requests} {host}") - print(summarize_ping(ping_out) if ping_out else "[warn] ping not available or produced no output") + print( + summarize_ping(ping_out) + if ping_out + else "[warn] ping not available or produced no output" + ) if not no_trace: header("Traceroute") trace_out = run_cmd(f"traceroute {host}") or run_cmd(f"mtr -r {host}") - print(summarize_trace(trace_out) if trace_out else "[warn] traceroute/mtr not available or produced no output") + print( + summarize_trace(trace_out) + if trace_out + else "[warn] traceroute/mtr not available or produced no output" + ) # ---- http (curl) ---- if url: @@ -233,13 +270,23 @@ def summarize_trace(out: str, max_lines: int = 12) -> str: u = normalize_url(url) print(curl_brief(u)) headers = run_cmd(f"curl -s -I {u}") - print(headers.strip() if headers else "[warn] curl not available or produced no output") + print( + headers.strip() + if headers + else "[warn] curl not available or produced no output" + ) # ---- dns ---- if domain: header("DNS") - dns_out = run_cmd(f"dig +short {domain} {dtype}") or run_cmd(f"nslookup -type={dtype} {domain}") - print(dns_out.strip() if dns_out else "[warn] dig/nslookup not available or produced no output") + dns_out = run_cmd(f"dig +short {domain} {dtype}") or run_cmd( + f"nslookup -type={dtype} {domain}" + ) + print( + dns_out.strip() + if dns_out + else "[warn] dig/nslookup not available or produced no output" + ) # ---- sockets (ss) ---- if sockets: diff --git a/tests/test_network.py b/tests/test_network.py index abd25d0..3c52fc9 100644 --- a/tests/test_network.py +++ b/tests/test_network.py @@ -2,10 +2,13 @@ from typer.testing import CliRunner import cli.app as appmod + runner = CliRunner() + def run_cmd_spy_factory(): calls = [] + def _spy(cmd: str): calls.append(cmd) if cmd.startswith("ping "): @@ -25,13 +28,17 @@ def _spy(cmd: str): if cmd.startswith("ss -tulwn"): return "Netid State Local Address:Port Peer Address:Port\n" return "" + _spy.calls = calls return _spy + def test_network_runs_all_sections(monkeypatch, capsys): spy = run_cmd_spy_factory() monkeypatch.setattr(appmod, "run_cmd", spy) - appmod.network(url="http://example.com", host="1.1.1.1", domain="example.com", sockets=True) + appmod.network( + url="http://example.com", host="1.1.1.1", domain="example.com", sockets=True + ) out = capsys.readouterr().out assert "Ping" in out assert "Traceroute" in out @@ -40,6 +47,7 @@ def test_network_runs_all_sections(monkeypatch, capsys): assert "Sockets (ss)" in out assert "HTTP 200 | total" in out + def test_empty_flags_fail_fast(monkeypatch): spy = run_cmd_spy_factory() monkeypatch.setattr(appmod, "run_cmd", spy) @@ -50,6 +58,7 @@ def test_empty_flags_fail_fast(monkeypatch): except click.exceptions.Exit as e: assert e.exit_code == 2 + def test_traceroute_then_mtr_fallback(monkeypatch, capsys): def run_cmd_fake(cmd: str): if cmd.startswith("ping "): @@ -59,14 +68,28 @@ def run_cmd_fake(cmd: str): if cmd.startswith("mtr -r "): return "Start: mtr report\n1. a\n2. b" return "" + monkeypatch.setattr(appmod, "run_cmd", run_cmd_fake) appmod.network(host="1.1.1.1") out = capsys.readouterr().out assert "mtr report" in out + def test_cli_invocation_smoke(monkeypatch): spy = run_cmd_spy_factory() monkeypatch.setattr(appmod, "run_cmd", spy) - result = runner.invoke(appmod.app, ["network", "-u", "http://example.com", "-h", "1.1.1.1", "-d", "example.com", "--sockets"]) + result = runner.invoke( + appmod.app, + [ + "network", + "-u", + "http://example.com", + "-h", + "1.1.1.1", + "-d", + "example.com", + "--sockets", + ], + ) assert result.exit_code == 0 assert "HTTP (curl)" in result.stdout