-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdeepcode.py
More file actions
executable file
·1860 lines (1588 loc) · 76 KB
/
deepcode.py
File metadata and controls
executable file
·1860 lines (1588 loc) · 76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
"""
Deep Code - CLI tool powered by DeepSeek API
Exact replica of Claude Code CLI but using DeepSeek's API
"""
import os
import sys
import argparse
import json
import subprocess
import shlex
import re
import uuid
import sqlite3
import signal
import threading
import time
from pathlib import Path
from typing import List, Optional, Dict, Any, Tuple
from datetime import datetime
from dotenv import load_dotenv
# Try to import platform-specific modules for key detection
try:
import select
HAS_SELECT = True
except ImportError:
HAS_SELECT = False
try:
import msvcrt
IS_WINDOWS = True
except ImportError:
IS_WINDOWS = False
from rich.console import Console
from rich.markdown import Markdown
from rich.panel import Panel
from rich.syntax import Syntax
from rich.prompt import Prompt, Confirm
from rich.live import Live
from rich.text import Text
from rich.table import Table
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TimeElapsedColumn
from rich.layout import Layout
from rich.align import Align
from rich.box import ROUNDED, DOUBLE, HEAVY, SQUARE
from rich.rule import Rule
from rich.measure import Measurement
try:
import pyfiglet
HAS_FIGLET = True
except ImportError:
HAS_FIGLET = False
try:
import emojis
HAS_EMOJIS = True
except ImportError:
HAS_EMOJIS = False
# Fallback emoji function
def emojis_encode(text):
emoji_map = {
':file_folder:': '📁',
':information:': 'ℹ️',
':speech_balloon:': '💬',
':door:': '🚪',
':stop_sign:': '🛑',
':rocket:': '🚀',
':bust_in_silhouette:': '👤',
':robot_face:': '🤖',
':page_with_curl:': '📄',
':arrow_forward:': '▶️',
':wave:': '👋',
':white_check_mark:': '✅',
':brain:': '🧠',
}
for key, emoji in emoji_map.items():
text = text.replace(key, emoji)
return text
import openai
import requests
from bs4 import BeautifulSoup
from urllib.parse import urlparse
# Import utils for file editing and new advanced modules
try:
import sys
sys.path.insert(0, str(Path(__file__).parent))
from utils import extract_code_blocks, detect_file_edit_request, apply_code_changes
from tools import ToolRegistry, ToolResult
from security import SecurityValidator, SecurityConfig, PermissionManager
from context_manager import ContextManager, MessageBuilder
from ui import ModernUI
HAS_ADVANCED_FEATURES = True
except ImportError:
HAS_ADVANCED_FEATURES = False
ModernUI = None
# Fallback if utils.py not available - define inline
def extract_code_blocks(text: str):
"""Extract code blocks from markdown text"""
pattern = r'```(?:(?:(\w+))?(?::\s*(.+?))?\n)?(.*?)```'
matches = re.finditer(pattern, text, re.DOTALL | re.MULTILINE)
blocks = []
for match in matches:
language = match.group(1) or ""
file_path = match.group(2) or ""
code = match.group(3) or ""
if file_path:
file_path = file_path.strip()
for prefix in ['File:', 'file:', 'Path:', 'path:']:
if file_path.startswith(prefix):
file_path = file_path[len(prefix):].strip()
if code.strip():
blocks.append({'language': language, 'file_path': file_path, 'code': code})
return blocks
def detect_file_edit_request(user_input: str, response: str):
"""Detect if user wants to edit a file - only on explicit edit requests"""
# More specific edit keywords - must be clear edit intent
# Removed vague words like 'add', 'remove', 'insert', 'delete' that could match anywhere
edit_keywords = ['edit', 'modify', 'update', 'change', 'fix', 'write to', 'write', 'create', 'implement in']
# Check for explicit edit intent in user input
user_lower = user_input.lower()
has_edit_intent = any(keyword in user_lower for keyword in edit_keywords)
if not has_edit_intent:
return None
# Extract code blocks from response
code_blocks = extract_code_blocks(response)
# Only proceed if we have code blocks WITH file paths
# If code blocks exist but no file paths, it's just code examples, not an edit request
if not code_blocks:
return None
# Look for file path in code blocks first (most reliable indicator)
file_path = None
for block in code_blocks:
if block.get('file_path') and block['file_path'].strip():
file_path = block['file_path'].strip()
break
# If no file path in code blocks, try to extract from user input
# But be very specific - only match patterns that clearly indicate edit intent
if not file_path:
# More restrictive patterns - must have edit keyword near file path
file_patterns = [
r'(?:edit|modify|update|change|fix|write to|create|implement in)\s+["\']?([^\s"\'<>]+\.\w+)', # "edit file.py" or "edit 'file.py'"
r'(?:edit|modify|update|change|fix)\s+(?:file|the file|in)\s+["\']?([^\s"\'<>]+\.\w+)', # "edit the file.py"
]
for pattern in file_patterns:
match = re.search(pattern, user_input, re.IGNORECASE)
if match:
file_path = match.group(1)
break
# CRITICAL: Only return edit info if we have BOTH:
# 1. Code blocks with actual code content
# 2. A file path (either in code block or explicitly mentioned with edit intent)
# This prevents false positives when AI just shows code examples or discusses files
valid_code_blocks = [b for b in code_blocks if b.get('code', '').strip()]
if valid_code_blocks and file_path:
return {'file_path': file_path, 'code_blocks': valid_code_blocks, 'full_response': response}
return None
def apply_code_changes(file_path: str, new_code: str, mode: str = 'replace'):
"""Apply code changes to a file"""
try:
path = Path(file_path).expanduser().resolve()
path.write_text(new_code, encoding='utf-8')
return (True, f"✓ Updated {path}")
except Exception as e:
return (False, f"✗ Error updating file: {str(e)}")
# Load environment variables
load_dotenv()
console = Console(
force_terminal=True,
color_system="auto",
width=None,
emoji=True,
markup=True,
highlight=True
)
# Global interrupt flag
interrupt_flag = threading.Event()
esc_key_monitor_thread = None
esc_key_monitor_active = threading.Event()
def monitor_esc_key():
"""Monitor stdin for ESC key press and set interrupt_flag"""
# Save terminal settings
old_settings = None
fd = None
if not IS_WINDOWS and sys.stdin.isatty():
try:
import termios
import tty
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
tty.setraw(fd)
except (ImportError, AttributeError, termios.error, OSError):
# If we can't set raw mode, we'll use a simpler approach
old_settings = None
fd = None
try:
while not esc_key_monitor_active.is_set():
if esc_key_monitor_active.wait(0.1):
break
# Check for input
if IS_WINDOWS:
# Windows: use msvcrt
try:
if msvcrt.kbhit():
key = msvcrt.getch()
if key == b'\x1b': # ESC key
interrupt_flag.set()
break
except (IOError, OSError):
break
elif HAS_SELECT and sys.stdin.isatty():
# Unix: use select to check if stdin has data
try:
ready, _, _ = select.select([sys.stdin], [], [], 0.1)
if ready:
char = sys.stdin.read(1)
if char == '\x1b': # ESC key
interrupt_flag.set()
# Try to clear remaining input buffer (escape sequences)
try:
if select.select([sys.stdin], [], [], 0)[0]:
sys.stdin.read(1) # Read the '[' after ESC if it's an escape sequence
except (IOError, OSError):
pass
break
except (IOError, OSError, ValueError):
# If select fails, fall back to sleep
time.sleep(0.1)
else:
# Fallback: just sleep (can't detect ESC without select)
time.sleep(0.1)
finally:
# Restore terminal settings
if old_settings is not None and fd is not None:
try:
import termios
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
except (termios.error, OSError, ImportError):
pass
def start_esc_monitor():
"""Start monitoring for ESC key"""
global esc_key_monitor_thread
esc_key_monitor_active.clear()
interrupt_flag.clear()
if esc_key_monitor_thread is None or not esc_key_monitor_thread.is_alive():
esc_key_monitor_thread = threading.Thread(target=monitor_esc_key, daemon=True)
esc_key_monitor_thread.start()
def stop_esc_monitor():
"""Stop monitoring for ESC key"""
esc_key_monitor_active.set()
if esc_key_monitor_thread and esc_key_monitor_thread.is_alive():
esc_key_monitor_thread.join(timeout=0.5)
# Default DeepSeek API configuration
DEFAULT_API_BASE = "https://api.deepseek.com"
DEFAULT_MODEL = "deepseek-chat" # Chat model for conversational interactions
# Session storage
SESSION_DB = Path.home() / ".deepcode" / "sessions.db"
SESSION_DIR = Path.home() / ".deepcode"
SESSION_DIR.mkdir(parents=True, exist_ok=True)
class SessionManager:
"""Manage conversation sessions"""
def __init__(self):
self._init_db()
def _init_db(self):
"""Initialize session database"""
conn = sqlite3.connect(SESSION_DB)
conn.execute("""
CREATE TABLE IF NOT EXISTS sessions (
session_id TEXT PRIMARY KEY,
directory TEXT,
created_at TEXT,
updated_at TEXT,
messages TEXT
)
""")
conn.commit()
conn.close()
def get_recent_session(self, directory: str = None) -> Optional[str]:
"""Get most recent session ID"""
conn = sqlite3.connect(SESSION_DB)
cursor = conn.cursor()
if directory:
cursor.execute("""
SELECT session_id FROM sessions
WHERE directory = ?
ORDER BY updated_at DESC
LIMIT 1
""", (directory,))
else:
cursor.execute("""
SELECT session_id FROM sessions
ORDER BY updated_at DESC
LIMIT 1
""")
result = cursor.fetchone()
conn.close()
return result[0] if result else None
def save_session(self, session_id: str, directory: str, messages: List[Dict]):
"""Save session"""
conn = sqlite3.connect(SESSION_DB)
conn.execute("""
INSERT OR REPLACE INTO sessions (session_id, directory, created_at, updated_at, messages)
VALUES (?, ?, COALESCE((SELECT created_at FROM sessions WHERE session_id = ?), ?), ?, ?)
""", (session_id, directory, session_id, datetime.now().isoformat(), datetime.now().isoformat(), json.dumps(messages)))
conn.commit()
conn.close()
def load_session(self, session_id: str) -> Optional[List[Dict]]:
"""Load session messages"""
conn = sqlite3.connect(SESSION_DB)
cursor = conn.cursor()
cursor.execute("SELECT messages FROM sessions WHERE session_id = ?", (session_id,))
result = cursor.fetchone()
conn.close()
if result:
return json.loads(result[0])
return None
def update_session(self, session_id: str, messages: List[Dict]):
"""Update session"""
conn = sqlite3.connect(SESSION_DB)
conn.execute("""
UPDATE sessions SET updated_at = ?, messages = ? WHERE session_id = ?
""", (datetime.now().isoformat(), json.dumps(messages), session_id))
conn.commit()
conn.close()
class DeepSeekClient:
"""Client for interacting with DeepSeek API"""
def __init__(self, api_key: Optional[str] = None, api_base: Optional[str] = None, model: Optional[str] = None):
self.api_key = api_key or os.getenv("DEEPSEEK_API_KEY")
self.api_base = api_base or os.getenv("DEEPSEEK_API_BASE", DEFAULT_API_BASE)
self.model = model or os.getenv("DEEPSEEK_MODEL", DEFAULT_MODEL)
if not self.api_key:
console.print("[red]Error: DEEPSEEK_API_KEY not found. Please set it in your environment or .env file.[/red]")
sys.exit(1)
self.client = openai.OpenAI(
api_key=self.api_key,
base_url=self.api_base
)
def chat(
self,
messages: List[Dict[str, str]],
stream: bool = True,
temperature: float = 0.7,
max_tokens: Optional[int] = None
):
"""Send a chat request to DeepSeek API"""
try:
response = self.client.chat.completions.create(
model=self.model,
messages=messages,
stream=stream,
temperature=temperature,
max_tokens=max_tokens
)
return response
except Exception as e:
console.print(f"[red]Error calling DeepSeek API: {str(e)}[/red]")
sys.exit(1)
def load_file_context(file_path: str, max_lines: int = 10000) -> str:
"""Load file content for context"""
try:
path = Path(file_path).expanduser().resolve()
if not path.exists():
return ""
with open(path, 'r', encoding='utf-8', errors='ignore') as f:
lines = f.readlines()
if len(lines) > max_lines:
lines = lines[:max_lines]
content = ''.join(lines)
return f"File: {path}\n```\n{content}\n```"
except Exception as e:
return ""
def load_directory_context(dir_path: str = ".", max_files: int = 100, max_file_size: int = 100000) -> str:
"""Load directory structure and key files for context"""
try:
path = Path(dir_path).expanduser().resolve()
if not path.exists():
return ""
if not path.is_dir():
return load_file_context(str(path))
context_parts = [f"# Directory: {path}\n\n"]
# Get directory structure
try:
tree_output = subprocess.run(
["tree", "-L", "3", "-I", "__pycache__|*.pyc|node_modules|.git|venv|env|.venv|dist|build"],
cwd=path,
capture_output=True,
text=True,
timeout=5
)
if tree_output.returncode == 0:
context_parts.append("## Directory Structure\n```\n")
context_parts.append(tree_output.stdout[:3000])
context_parts.append("```\n\n")
except:
pass
# Get file structure
code_extensions = [
'*.py', '*.js', '*.ts', '*.jsx', '*.tsx', '*.java', '*.go', '*.rs',
'*.cpp', '*.c', '*.h', '*.hpp', '*.rb', '*.php', '*.swift', '*.kt',
'*.sh', '*.bash', '*.zsh', '*.fish', '*.ps1', '*.bat', '*.yml',
'*.yaml', '*.json', '*.xml', '*.html', '*.css', '*.scss', '*.sql',
'*.md', '*.txt', '*.env', '*.config', '*.conf', '*.toml'
]
files = []
for ext in code_extensions:
files.extend(path.rglob(ext))
# Filter out common ignore patterns
ignore_patterns = ['__pycache__', '.git', 'node_modules', 'venv', 'env', '.venv',
'.pytest_cache', 'dist', 'build', '.next', '.nuxt', 'target']
files = [f for f in files if not any(ignore in str(f) for ignore in ignore_patterns)]
files = files[:max_files]
# Key files first
key_files = [
'README.md', 'README.txt', 'README', 'package.json', 'requirements.txt',
'setup.py', 'pyproject.toml', 'Cargo.toml', 'go.mod', 'pom.xml', 'build.gradle',
'Makefile', 'docker-compose.yml', '.env.example', 'Dockerfile',
'composer.json', 'Gemfile', 'Pipfile'
]
context_parts.append("## Key Files\n\n")
for key_file in key_files:
key_path = path / key_file
if key_path.exists():
try:
size = key_path.stat().st_size
if size < max_file_size:
content = key_path.read_text(encoding='utf-8', errors='ignore')[:5000]
context_parts.append(f"### {key_file}\n```\n{content}\n```\n\n")
except:
pass
# Sample code files
code_files = [f for f in files if f.suffix in ['.py', '.js', '.ts', '.go', '.rs', '.java', '.cpp', '.c', '.rb', '.php']]
context_parts.append("## Sample Code Files\n\n")
for f in code_files[:15]:
try:
size = f.stat().st_size
if size < max_file_size:
content = f.read_text(encoding='utf-8', errors='ignore')[:5000]
rel_path = f.relative_to(path)
context_parts.append(f"### {rel_path}\n```\n{content}\n```\n\n")
except:
pass
return ''.join(context_parts)
except Exception as e:
return ""
def execute_bash(command: str, cwd: Optional[str] = None, timeout: int = 30) -> Tuple[str, str, int]:
"""Execute a bash command"""
try:
result = subprocess.run(
command,
shell=True,
capture_output=True,
text=True,
cwd=cwd or os.getcwd(),
timeout=timeout,
executable='/bin/bash'
)
return result.stdout, result.stderr, result.returncode
except subprocess.TimeoutExpired:
return "", "Command timed out", 124
except Exception as e:
return "", str(e), 1
def web_search(query: str, num_results: int = 5) -> str:
"""Perform a web search"""
try:
url = "https://html.duckduckgo.com/html/"
params = {"q": query}
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
}
response = requests.get(url, params=params, headers=headers, timeout=10)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
results = []
for result in soup.find_all('div', class_='result')[:num_results]:
title_elem = result.find('a', class_='result__a')
snippet_elem = result.find('a', class_='result__snippet')
if title_elem:
title = title_elem.get_text(strip=True)
link = title_elem.get('href', '')
snippet = snippet_elem.get_text(strip=True) if snippet_elem else ""
results.append(f"Title: {title}\nLink: {link}\nSnippet: {snippet}\n")
if results:
return "\n".join(results)
else:
return f"Web search performed for: {query}"
except Exception as e:
return f"Web search attempted for: {query} (Failed: {str(e)})"
def curl_request(url: str, method: str = "GET", headers: Dict[str, str] = None, data: str = None, json_data: Dict = None) -> str:
"""Perform an HTTP request"""
try:
request_headers = headers or {}
if json_data:
request_headers['Content-Type'] = 'application/json'
method = method.upper()
if method == "GET":
response = requests.get(url, headers=request_headers, timeout=30)
elif method == "POST":
if json_data:
response = requests.post(url, json=json_data, headers=request_headers, timeout=30)
else:
response = requests.post(url, data=data, headers=request_headers, timeout=30)
elif method == "PUT":
if json_data:
response = requests.put(url, json=json_data, headers=request_headers, timeout=30)
else:
response = requests.put(url, data=data, headers=request_headers, timeout=30)
elif method == "DELETE":
response = requests.delete(url, headers=request_headers, timeout=30)
else:
return f"Unsupported HTTP method: {method}"
result = f"Status Code: {response.status_code}\n"
result += f"Headers:\n{json.dumps(dict(response.headers), indent=2)}\n\n"
try:
result += f"Response Body (JSON):\n{json.dumps(response.json(), indent=2)}\n"
except:
text_response = response.text[:10000]
result += f"Response Body (Text):\n{text_response}\n"
if len(response.text) > 10000:
result += f"\n... (truncated, total {len(response.text)} characters)\n"
return result
except Exception as e:
return f"Error performing request: {str(e)}"
def format_response_with_syntax(text: str) -> None:
"""Format response for CLI - clean, structured, left-aligned, and readable"""
if not text.strip():
return
# Parse and format content properly for CLI
lines = text.split('\n')
i = 0
in_code_block = False
code_language = ""
code_lines = []
prev_was_heading = False
prev_was_list = False
prev_empty = False
while i < len(lines):
line = lines[i]
stripped = line.strip()
original_line = line
# Handle code blocks
if stripped.startswith('```'):
# End current code block if open
if in_code_block:
if code_lines:
code_content = '\n'.join(code_lines)
syntax = Syntax(code_content, code_language, theme="monokai", line_numbers=False, word_wrap=True)
console.print(syntax)
console.print() # Space after code block
in_code_block = False
code_lines = []
code_language = ""
prev_was_heading = False
prev_was_list = False
prev_empty = False
else:
# Start new code block
in_code_block = True
code_language = stripped[3:].strip() or "text"
# Add spacing before code block if needed
if i > 0 and lines[i-1].strip():
console.print()
i += 1
continue
if in_code_block:
code_lines.append(original_line) # Preserve original indentation
i += 1
continue
# Handle headings - left-align with bold, proper spacing
if stripped.startswith('#'):
# Add spacing before heading if needed
if i > 0 and lines[i-1].strip() and not prev_was_heading:
console.print()
level = 0
while level < len(stripped) and stripped[level] == '#':
level += 1
heading_text = stripped[level:].strip()
if heading_text:
# Left-aligned bold heading (no centering)
if level == 1:
console.print(f"[bold bright_white]{heading_text}[/bold bright_white]")
elif level == 2:
console.print(f"[bold cyan]{heading_text}[/bold cyan]")
elif level == 3:
console.print(f"[bold yellow]{heading_text}[/bold yellow]")
else:
console.print(f"[bold]{heading_text}[/bold]")
prev_was_heading = True
prev_was_list = False
prev_empty = False
i += 1
continue
# Handle lists - properly formatted and indented
numbered_match = re.match(r'^(\s*)(\d+)[\.\)]\s+(.+)', line)
bullet_match = re.match(r'^(\s*)[-*•]\s+(.+)', line)
if numbered_match:
indent, num, content = numbered_match.groups()
# Add spacing before list if needed
if not prev_was_list and i > 0 and lines[i-1].strip():
console.print()
# Print numbered list item - properly aligned
console.print(f"{indent}{num}. {content}")
prev_was_list = True
prev_was_heading = False
prev_empty = False
i += 1
continue
elif bullet_match:
indent, content = bullet_match.groups()
# Add spacing before list if needed
if not prev_was_list and i > 0 and lines[i-1].strip():
console.print()
# Print bullet list item - properly aligned
console.print(f"{indent}• {content}")
prev_was_list = True
prev_was_heading = False
prev_empty = False
i += 1
continue
# Reset list flag for non-list items
if prev_was_list:
prev_was_list = False
# Handle regular text
if stripped:
# Regular paragraph text - just print as-is
console.print(original_line)
prev_was_heading = False
prev_empty = False
i += 1
else:
# Empty line - only add one if we haven't already
if not prev_empty:
console.print()
prev_empty = True
prev_was_heading = False
i += 1
# Close any open code block
if in_code_block and code_lines:
code_content = '\n'.join(code_lines)
syntax = Syntax(code_content, code_language, theme="monokai", line_numbers=False, word_wrap=True)
console.print(syntax)
# Final spacing
console.print()
def stream_response(response, show_progress: bool = True) -> str:
"""Stream and display response from API with clean Claude-like formatting"""
collected_content = []
if hasattr(response, '__iter__'):
spinner_stop = threading.Event()
spinner_thread = None
spinner_stopped = threading.Event()
def show_spinner():
"""Show spinner in separate thread until stopped"""
spinner_chars = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]
i = 0
while not spinner_stop.is_set() and not interrupt_flag.is_set():
if spinner_stop.is_set():
break
char = spinner_chars[i % len(spinner_chars)]
sys.stdout.write(f"\r{char} Thinking...")
sys.stdout.flush()
i += 1
for _ in range(8):
if spinner_stop.is_set():
break
time.sleep(0.01)
if spinner_stop.is_set():
sys.stdout.write("\r\033[K")
sys.stdout.flush()
spinner_stopped.set()
if show_progress:
# Start spinner immediately
sys.stdout.write("\r⠋ Thinking...")
sys.stdout.flush()
spinner_thread = threading.Thread(target=show_spinner, daemon=True)
spinner_thread.start()
start_esc_monitor()
try:
# Collect all content - don't display yet
for chunk in response:
if interrupt_flag.is_set():
if show_progress:
spinner_stop.set()
console.print("\n[yellow]⚠️ Interrupted (ESC or Ctrl+C)[/yellow]")
break
if chunk.choices and len(chunk.choices) > 0:
delta = chunk.choices[0].delta
if hasattr(delta, 'content') and delta.content:
collected_content.append(delta.content)
except KeyboardInterrupt:
if show_progress:
spinner_stop.set()
console.print("\n[yellow]⚠️ Interrupted (ESC or Ctrl+C)[/yellow]")
interrupt_flag.set()
finally:
# Stop spinner ONLY when we're about to print content
if show_progress:
spinner_stop.set()
# Wait for spinner to clear
spinner_stopped.wait(timeout=0.2)
if not spinner_stopped.is_set():
if spinner_thread and spinner_thread.is_alive():
spinner_thread.join(timeout=0.1)
sys.stdout.write("\r\033[K")
sys.stdout.flush()
# Ensure spinner thread is completely done
if spinner_thread and spinner_thread.is_alive():
spinner_thread.join(timeout=0.15)
stop_esc_monitor()
full_content = ''.join(collected_content)
# Format and display the complete response AFTER spinner is cleared
if full_content.strip():
format_response_in_panel(full_content)
else:
console.print()
return full_content
else:
# Non-streaming response
content = response.choices[0].message.content
if content:
format_response_in_panel(content)
return content
def _get_emoji(key: str) -> str:
"""Helper to get emoji with fallback"""
if HAS_EMOJIS:
return emojis.encode(key)
else:
emoji_map = {
':file_folder:': '📁', ':information:': 'ℹ️', ':speech_balloon:': '💬',
':door:': '🚪', ':stop_sign:': '🛑', ':rocket:': '🚀',
':bust_in_silhouette:': '👤', ':robot_face:': '🤖', ':page_with_curl:': '📄',
':arrow_forward:': '▶️', ':wave:': '👋', ':white_check_mark:': '✅', ':brain:': '🧠',
}
return emoji_map.get(key, '')
def format_response_in_panel(text: str) -> None:
"""Format response in Claude-like clean minimal style"""
# Always use clean formatting (no heavy panels)
format_response_with_syntax(text)
def parse_tool_calls_from_response(response_text: str, current_dir: str = None) -> Dict[str, Any]:
"""Parse tool calls from assistant response text - ONLY explicit tool calls at start of lines"""
tools = {}
# IMPORTANT: Only match tool calls at the beginning of lines to avoid false positives
# This prevents matching when AI explains "you could use @bash git status"
# Only matches actual tool requests like:
# @bash git status
# @web search query
# Split into lines to check each one
lines = response_text.split('\n')
# Track if we're inside a code block
in_code_block = False
for line in lines:
line_stripped = line.strip()
# Check for code block markers
if line_stripped.startswith('```'):
in_code_block = not in_code_block
continue
# Skip lines inside code blocks
if in_code_block:
continue
# Skip lines that are clearly explanatory (common patterns)
if any(phrase in line.lower() for phrase in [
'you can use', 'you could use', 'try using', 'consider using',
'for example', 'like this', 'such as', 'you might', 'you should',
'would be', 'could be', 'can be', 'to use', 'using the',
'available:', 'syntax:', 'example:', 'usage:', 'command:',
'can run', 'could run', 'might want to', 'use `@'
]):
continue
# Skip lines with inline code (contains backticks)
if '`' in line:
continue
# Only match if tool call is at the start of the line (after stripping)
# This ensures it's an intentional tool request, not explanation
# Web search - must start with @web or @search
if line_stripped.startswith('@web ') or line_stripped.startswith('@search '):
match = re.match(r'^@(?:web|search)\s+(.+)', line_stripped, re.IGNORECASE)
if match and 'web_search' not in tools: # Only first match
tools['web_search'] = match.group(1).strip()
# HTTP request - must start with @curl or @request or @fetch
elif line_stripped.startswith('@curl ') or line_stripped.startswith('@request ') or line_stripped.startswith('@fetch '):
match = re.match(r'^@(?:curl|request|fetch)\s+(.+)', line_stripped, re.IGNORECASE)
if match and 'curl' not in tools: # Only first match
tools['curl'] = match.group(1).strip()
# Bash execution - must start with @bash, @exec, or @run
elif line_stripped.startswith('@bash ') or line_stripped.startswith('@exec ') or line_stripped.startswith('@run '):
match = re.match(r'^@(?:bash|exec|run)\s+(.+)', line_stripped, re.IGNORECASE)
if match and 'bash' not in tools: # Only first match
tools['bash'] = match.group(1).strip()
return tools
def parse_tool_calls(user_input: str, current_dir: str = None) -> Dict[str, Any]:
"""Parse user input for tool calls - both explicit and implicit"""
tools = {}
# Explicit tool calls
if '@web' in user_input.lower() or '@search' in user_input.lower():
match = re.search(r'@(?:web|search)\s+(.+)', user_input, re.IGNORECASE)
if match:
tools['web_search'] = match.group(1).strip()
if '@curl' in user_input.lower() or '@request' in user_input.lower():
match = re.search(r'@(?:curl|request)\s+(.+)', user_input, re.IGNORECASE)
if match:
tools['curl'] = match.group(1).strip()
if '@bash' in user_input.lower() or '@exec' in user_input.lower() or '@run' in user_input.lower():
match = re.search(r'@(?:bash|exec|run)\s+(.+)', user_input, re.IGNORECASE)
if match:
tools['bash'] = match.group(1).strip()
# Implicit file reading - detect file paths in query
file_patterns = [
r'file\s+([^\s]+)', # "file path/to/file"
r'read\s+([^\s]+)', # "read path/to/file"
r'analyze\s+([^\s]+\.\w+)', # "analyze file.py"
r'([^\s]+\.(py|js|ts|jsx|tsx|java|go|rs|cpp|c|h|rb|php|sh|md|txt|json|yml|yaml))\b', # File extensions
r'["\']([^"\']+\.\w+)["\']', # Quoted file paths
]
for pattern in file_patterns:
matches = re.findall(pattern, user_input, re.IGNORECASE)
for match in matches:
file_path = match if isinstance(match, str) else match[0] if match else None
if file_path:
# Resolve relative paths
if current_dir:
full_path = Path(current_dir) / file_path
if full_path.exists():
tools.setdefault('files', []).append(str(full_path))
else:
test_path = Path(file_path)
if test_path.exists():
tools.setdefault('files', []).append(file_path)
# Implicit bash commands - ONLY for very explicit direct command requests
# Don't try to parse natural language requests - let the AI handle those
# Only trigger for clear patterns like "run git status" or "execute npm install"
bash_patterns = [
# Explicit "run <command>" or "execute <command>" at start
r'^(?:run|execute)\s+(git\s+.+?)(?:\.|$|\?)',
# Direct command-like patterns: "git status", "npm install", etc. (but only at start, not embedded)
r'^(git|npm|pip|python|node|docker)\s+([a-z]+\s+.*?)(?:\.|$|\?|and\s+.*$)',
]
if not tools.get('bash'): # Only if no explicit @bash
# Skip if it's a natural language request (not a direct command)
if re.match(r'^(?:can you|please|will you|do|help me|i need|i want)', user_input, re.IGNORECASE):
pass # Let AI handle natural language requests
else:
for pattern in bash_patterns:
match = re.search(pattern, user_input, re.IGNORECASE)
if match:
# Extract the command - handle different pattern groups
if match.lastindex >= 1:
potential_cmd = match.group(1).strip()
if match.lastindex >= 2:
# For patterns with multiple groups, combine them
potential_cmd = f"{match.group(1)} {match.group(2)}".strip()
else:
potential_cmd = match.group(0).strip()
# Extract just the command part (remove "run" or "execute" prefix)
potential_cmd = re.sub(r'^(?:run|execute)\s+', '', potential_cmd, flags=re.IGNORECASE).strip()
# Don't auto-execute dangerous commands
dangerous = ['rm -rf', 'delete', 'format', 'mkfs']
if not any(d in potential_cmd.lower() for d in dangerous):
tools['bash'] = potential_cmd
break
# Implicit web search - ONLY for clear information-seeking questions
# Don't trigger on action requests or commands
web_search_contexts = [
'what is', 'what are', 'how to', 'how do', 'tutorial', 'documentation',
'guide', 'example', 'explain', 'information about', 'tell me about'
]
# Only trigger if:
# 1. It's clearly a question (ends with ?)
# 2. AND has question words/phrases
# 3. AND is NOT an action/command request
is_question = user_input.strip().endswith('?')
has_question_words = any(ctx in user_input.lower() for ctx in web_search_contexts)
is_action_request = any(cmd_word in user_input.lower() for cmd_word in [