import asyncio
import time
import requests
from bs4 import BeautifulSoup, Comment
from urllib.parse import urljoin, urlparse, parse_qs
import os
import sys
import json
import tiktoken
import re
import shlex
import pyperclip
from pathlib import Path
import wget
from rich import print
from rich.console import Console
from rich.panel import Panel
from rich.text import Text
from rich.prompt import Prompt
from rich.progress import Progress, TextColumn, BarColumn, TimeRemainingColumn
import xml.etree.ElementTree as ET # Keep for preprocess_text if needed
from typing import Union, List, Dict, Optional, Set, Tuple
from dotenv import load_dotenv
from urllib.robotparser import RobotFileParser
from readability import Document
import io
import argparse
import tempfile
# Import utility functions
from utils import (
safe_file_read, read_from_clipboard, read_from_stdin,
detect_text_format, parse_as_plaintext, parse_as_markdown,
parse_as_json, parse_as_html, parse_as_yaml,
download_file, is_same_domain, is_within_depth,
is_excluded_file, is_allowed_filetype, escape_xml
)
# Try to import yaml, but don't fail if not available
try:
import yaml
except ImportError:
yaml = None
print("[Warning] PyYAML module not found. YAML format detection/parsing will be limited.", file=sys.stderr)
# --- Configuration Flags ---
ENABLE_COMPRESSION_AND_NLTK = False # Set to True to enable NLTK download, stopword removal, and compressed output
TOKEN_ESTIMATE_MULTIPLIER = 1.37 # Multiplier to estimate model token usage from tiktoken count
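# Illustrative arithmetic: a tiktoken count of 10,000 would be reported as
# roughly 10,000 * 1.37 = 13,700 estimated model tokens.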
# Default token placeholder used when GITHUB_TOKEN is absent
DEFAULT_GITHUB_TOKEN = "default_token_here"
# Flag to suppress repeating missing-token warnings
_WARNED_ABOUT_TOKEN = False
# Global flag to disable network operations when set; populated by load_configuration()
OFFLINE_MODE = False
# Placeholder for the GitHub token used for authenticated requests
TOKEN = DEFAULT_GITHUB_TOKEN
# Prepared headers for GitHub requests; populated by load_configuration()
headers: Dict[str, str] = {}
def load_configuration() -> None:
"""Load environment-driven configuration values."""
global OFFLINE_MODE, TOKEN, headers, _WARNED_ABOUT_TOKEN
load_dotenv()
OFFLINE_MODE = os.getenv("OFFLINE_MODE", "").lower() in ("1", "true", "yes")
token_from_env = os.getenv("GITHUB_TOKEN")
if token_from_env is None:
TOKEN = DEFAULT_GITHUB_TOKEN
if not _WARNED_ABOUT_TOKEN:
print(
"[bold red]Warning:[/bold red] GITHUB_TOKEN environment variable not set. "
"GitHub API requests may fail or be rate-limited."
)
_WARNED_ABOUT_TOKEN = True
else:
TOKEN = token_from_env
_WARNED_ABOUT_TOKEN = False
headers = {"Authorization": f"token {TOKEN}"} if TOKEN != DEFAULT_GITHUB_TOKEN else {}
# Path to cached tiktoken encoding for offline use
LOCAL_TIKTOKEN_PATH = os.path.join(os.path.dirname(__file__), "cl100k_base.tiktoken")
# Cached Encoding instance to avoid repeated downloads
_TIKTOKEN_ENCODING = None
load_configuration()
# --- End Configuration Flags ---
# --- Output Format Notes ---
# This script produces output wrapped in XML-like tags for structure (e.g., <source>, <file>).
# However, the *content* within these tags (especially code) is NOT XML-escaped.
# This means characters like < > & within code blocks are preserved as-is for readability
# and correct interpretation by LLMs. The escape_xml function currently returns text unchanged.
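# Illustrative output shape (hypothetical repository and file path; the file
# contents are raw, so < > & inside code survive unescaped):
#
#   <onefilellm_output>
#     <source type="github_repository" url="https://github.com/user/repo">
#       <file path="src/app.py">
#       ...raw file contents...
#       </file>
#     </source>
#   </onefilellm_output>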
# --- End Output Format Notes ---
# --- Configuration Directories ---
EXCLUDED_DIRS = ["dist", "node_modules", ".git", "__pycache__"]
# Extensions to skip when processing direct file URLs
DISALLOWED_EXTENSIONS = {'.pdf'}
# --- Alias Configuration ---
ALIAS_DIR_NAME = ".onefilellm_aliases" # Re-use existing constant
ALIAS_DIR = Path.home() / ALIAS_DIR_NAME
# Backwards compatibility: some tests and code may reference ALIAS_CONFIG_DIR
ALIAS_CONFIG_DIR = ALIAS_DIR
USER_ALIASES_PATH = ALIAS_DIR / "aliases.json"
CORE_ALIASES = {
"ofl_readme": "https://github.com/jimmc414/onefilellm/blob/main/readme.md",
"ofl_repo": "https://github.com/jimmc414/onefilellm",
"gh_search": "https://github.com/search?q={}", # Example alias expecting a placeholder
"arxiv_search": "https://arxiv.org/search/?query={}&searchtype=all&source=header",
# Consider adding more common aliases
}
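# Illustrative expansion (assuming the CLI substitutes "{}" with the user's
# argument when an alias is invoked): "gh_search rust" would resolve to
# "https://github.com/search?q=rust".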
# --- End Alias Configuration ---
def ensure_alias_dir_exists():
"""Ensures the alias directory exists, creating it if necessary."""
ALIAS_DIR.mkdir(parents=True, exist_ok=True)
class AliasManager:
def __init__(self, console, core_aliases_dict, user_aliases_file_path):
self.console = console
self.core_aliases_map = core_aliases_dict.copy() # Store the original core aliases
self.user_aliases_file_path = user_aliases_file_path
self.user_aliases_map = {}
self.effective_aliases_map = {} # Merged view: user takes precedence
self._ensure_alias_dir()
def _ensure_alias_dir(self):
"""Ensures the alias directory exists."""
try:
self.user_aliases_file_path.parent.mkdir(parents=True, exist_ok=True)
except OSError as e:
self.console.print(f"[bold red]Error:[/bold red] Could not create alias directory {self.user_aliases_file_path.parent}: {e}")
def load_aliases(self):
"""Loads user aliases from file and merges with core aliases."""
self._load_user_aliases()
self.effective_aliases_map = self.core_aliases_map.copy()
self.effective_aliases_map.update(self.user_aliases_map) # User aliases override core
def _load_user_aliases(self):
"""Loads user aliases from the JSON file."""
self.user_aliases_map = {}
if self.user_aliases_file_path.exists():
try:
with open(self.user_aliases_file_path, "r", encoding="utf-8") as f:
data = json.load(f)
if isinstance(data, dict):
self.user_aliases_map = data
else:
self.console.print(f"[bold yellow]Warning:[/bold yellow] Alias file {self.user_aliases_file_path} is not a valid JSON object. Ignoring.")
except json.JSONDecodeError:
self.console.print(f"[bold yellow]Warning:[/bold yellow] Could not parse alias file {self.user_aliases_file_path}. It may be corrupt. Please check or remove it.")
except IOError as e:
self.console.print(f"[bold red]Error:[/bold red] Could not read alias file {self.user_aliases_file_path}: {e}")
# If file doesn't exist, user_aliases_map remains empty, which is fine.
def _save_user_aliases(self):
"""Saves the current user aliases to the JSON file."""
self._ensure_alias_dir() # Ensure directory exists before writing
try:
with open(self.user_aliases_file_path, "w", encoding="utf-8") as f:
json.dump(self.user_aliases_map, f, indent=2)
except IOError as e:
self.console.print(f"[bold red]Error:[/bold red] Could not write to alias file {self.user_aliases_file_path}: {e}")
return False
return True
def get_command(self, alias_name: str) -> Optional[str]:
"""Gets the command string for a given alias name from the effective list."""
return self.effective_aliases_map.get(alias_name)
def _is_valid_alias_name(self, name: str) -> bool:
if not name or name.startswith("--"):
return False
# Basic check for path-like characters or other problematic chars.
# Allows alphanumeric, underscore, hyphen.
if not re.fullmatch(r"[a-zA-Z0-9_-]+", name):
return False
return True
def add_or_update_alias(self, name: str, command_string: str) -> bool:
"""Adds or updates a user-defined alias."""
if not self._is_valid_alias_name(name):
self.console.print(f"[bold red]Error:[/bold red] Invalid alias name '{name}'. Names must be alphanumeric and can include '-' or '_'. They cannot start with '--'.")
return False
self.user_aliases_map[name] = command_string
if self._save_user_aliases():
self.effective_aliases_map[name] = command_string # Update effective map
self.console.print(f"Alias '{name}' set to: \"{command_string}\"")
return True
return False
def remove_alias(self, name: str) -> bool:
"""Removes a user-defined alias."""
if name in self.user_aliases_map:
del self.user_aliases_map[name]
if self._save_user_aliases():
# Update effective map: if core alias was shadowed, it's now active
if name in self.core_aliases_map:
self.effective_aliases_map[name] = self.core_aliases_map[name]
else: # No core alias with this name, so it's gone from effective too
if name in self.effective_aliases_map:
del self.effective_aliases_map[name]
self.console.print(f"User alias '{name}' removed.")
return True
return False # Save failed
else:
self.console.print(f"User alias '{name}' not found.")
return False
def list_aliases_formatted(self, list_user=True, list_core=True) -> str:
"""Returns a formatted string of aliases for display."""
output_lines = []
# Determine combined keys for proper ordering and precedence display
all_names = sorted(list(set(self.core_aliases_map.keys()) | set(self.user_aliases_map.keys())))
if not all_names and (list_user or list_core):
return "No aliases defined."
for name in all_names:
command_str = ""
source_type = ""
is_user = name in self.user_aliases_map
is_core = name in self.core_aliases_map
if is_user and list_user:
command_str = self.user_aliases_map[name]
source_type = "(user)" + (" (overrides core)" if is_core else "")
elif is_core and list_core:
command_str = self.core_aliases_map[name]
source_type = "(core)" + (" (overridden by user)" if is_user else "")
if command_str: # If we have something to show based on filters
output_lines.append(f"- [cyan]{name}[/cyan] {source_type}: \"{command_str}\"")
if not output_lines:
if list_user and not list_core: return "No user aliases defined."
if list_core and not list_user: return "No core aliases defined."
return "No aliases to display with current filters."
return "\n".join(output_lines)
# --- Placeholders for custom formats ---
def parse_as_doculing(text_content: str) -> str:
"""Placeholder for Doculing parsing. Returns text as is for V1."""
# TODO: Implement actual Doculing parsing logic when specifications are available.
return text_content
def parse_as_markitdown(text_content: str) -> str:
"""Placeholder for Markitdown parsing. Returns text as is for V1."""
# TODO: Implement actual Markitdown parsing logic when specifications are available.
return text_content
def get_parser_for_format(format_name: str) -> callable:
"""
Returns the appropriate parser function based on the format name.
Defaults to parse_as_plaintext if format is unknown.
"""
parsers = {
"text": parse_as_plaintext,
"markdown": parse_as_markdown,
"json": parse_as_json,
"html": parse_as_html,
"yaml": parse_as_yaml,
"doculing": parse_as_doculing, # Placeholder
"markitdown": parse_as_markitdown, # Placeholder
}
return parsers.get(format_name, parse_as_plaintext) # Default to plaintext parser
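# e.g. get_parser_for_format("json") is parse_as_json, while an unrecognized
# name such as "csv" (hypothetical) falls back to parse_as_plaintext.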
def process_text_stream(raw_text_content: str, source_info: dict, console: Console, format_override: str | None = None) -> str | None:
"""
Processes text from a stream (stdin or clipboard).
Detects format, parses, and builds the XML structure.
Args:
raw_text_content (str): The raw text from the input stream.
source_info (dict): Information about the source, e.g., {'type': 'stdin'}.
console (Console): The Rich console object for printing messages.
format_override (str | None): User-specified format, if any.
Returns:
str | None: The XML structured output string, or None if processing fails.
"""
actual_format = ""
parsed_content = ""
if format_override:
actual_format = format_override.lower()
console.print(f"[green]Processing input as [bold]{actual_format}[/bold] (user override).[/green]")
else:
actual_format = detect_text_format(raw_text_content)
console.print(f"[green]Detected format: [bold]{actual_format}[/bold][/green]")
if actual_format == "yaml" and yaml is None:
console.print(
"[bold yellow]Warning:[/bold yellow] PyYAML is not installed; "
"falling back to plain text parsing for YAML input."
)
actual_format = "text"
parser_function = get_parser_for_format(actual_format)
try:
parsed_content = parser_function(raw_text_content)
except json.JSONDecodeError as e:
console.print(f"[bold red]Error:[/bold red] Input specified or detected as JSON, but it's not valid JSON. Details: {e}")
return None
except Exception as e: # Catch-all for other parsing errors
if yaml is not None and isinstance(e, yaml.YAMLError):
console.print(
"[bold red]Error:[/bold red] Input specified or detected as YAML, "
f"but it's not valid YAML. Details: {e}"
)
return None
console.print(f"[bold red]Error:[/bold red] Failed to parse content as {actual_format}. Details: {e}")
return None
# XML generation for the stream.
# The structure mirrors the <source> tags used for files/URLs, but with
# type="stdin" or type="clipboard". escape_xml currently returns text
# unchanged, which is correct for content; attributes *should* be escaped if
# they could contain special characters, but 'stdin', 'clipboard', and the
# format names are safe.
source_type_attr = escape_xml(source_info.get('type', 'unknown_stream'))
format_attr = escape_xml(actual_format)
# This function emits a single <source> element for the stream. main() wraps
# it in <onefilellm_output> when it is the only input, or combines it with
# other sources via combine_xml_outputs() if multiple inputs are supported
# later. The parsed content itself is NOT XML-escaped, preserving its raw form.
xml_parts = [
f'<source type="{source_type_attr}" processed_as_format="{format_attr}">',
f'<content>{escape_xml(parsed_content)}</content>', # escape_xml does nothing to parsed_content
f'</source>'
]
final_xml_for_stream = "\n".join(xml_parts)
return final_xml_for_stream
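# Illustrative result for markdown piped via stdin (hypothetical content):
#
#   <source type="stdin" processed_as_format="markdown">
#   <content># Title
#   Some body text.</content>
#   </source>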
def process_ipynb_file(temp_file):
try:
import nbformat
from nbconvert import PythonExporter
with open(temp_file, "r", encoding='utf-8', errors='ignore') as f:
notebook_content = f.read()
exporter = PythonExporter()
python_code, _ = exporter.from_notebook_node(nbformat.reads(notebook_content, as_version=4))
return python_code
except Exception as e:
print(f"[bold red]Error processing notebook {temp_file}: {e}[/bold red]")
# Return error message instead of raising, wrapped in comments
return f"# ERROR PROCESSING NOTEBOOK: {e}\n"
def process_github_repo(repo_url):
"""
Processes a GitHub repository, extracting file contents and wrapping them in XML structure.
"""
if OFFLINE_MODE:
msg = "Offline mode enabled; skipping GitHub repository fetch"
print(f"[bold yellow]{msg}[/bold yellow]")
return f'<source type="github_repository" url="{escape_xml(repo_url)}"><error>{escape_xml(msg)}</error></source>'
api_base_url = "https://api.github.com/repos/"
repo_url_parts = repo_url.split("https://github.com/")[-1].split("/")
repo_name = "/".join(repo_url_parts[:2])
branch_or_tag = ""
subdirectory = ""
if len(repo_url_parts) > 2 and repo_url_parts[2] == "tree":
if len(repo_url_parts) > 3:
branch_or_tag = repo_url_parts[3]
if len(repo_url_parts) > 4:
subdirectory = "/".join(repo_url_parts[4:])
contents_url = f"{api_base_url}{repo_name}/contents"
if subdirectory:
contents_url = f"{contents_url}/{subdirectory}"
if branch_or_tag:
contents_url = f"{contents_url}?ref={branch_or_tag}"
# Start XML structure
repo_content = [f'<source type="github_repository" url="{escape_xml(repo_url)}">']
def process_directory_recursive(url, repo_content_list):
try:
response = requests.get(url, headers=headers, timeout=30)
response.raise_for_status()
files = response.json()
for file_info in files:
if file_info["type"] == "dir" and file_info["name"] in EXCLUDED_DIRS:
continue
if file_info["type"] == "file" and is_allowed_filetype(file_info["name"]):
print(f"Processing {file_info['path']}...")
with tempfile.TemporaryDirectory() as temp_dir:
temp_file = os.path.join(temp_dir, file_info["name"])
try:
download_file(file_info["download_url"], temp_file, headers=headers)
repo_content_list.append(f'\n<file path="{escape_xml(file_info["path"])}">')
if file_info["name"].endswith(".ipynb"):
# Append raw code - escape_xml not needed as it does nothing
repo_content_list.append(process_ipynb_file(temp_file))
else:
# Append raw code - escape_xml not needed here
repo_content_list.append(safe_file_read(temp_file))
repo_content_list.append('</file>')
except Exception as e:
print(f"[bold red]Error processing file {file_info['path']}: {e}[/bold red]")
repo_content_list.append(f'\n<file path="{escape_xml(file_info["path"])}">')
repo_content_list.append(f'<error>Failed to download or process: {escape_xml(str(e))}</error>')
repo_content_list.append('</file>')
elif file_info["type"] == "dir":
process_directory_recursive(file_info["url"], repo_content_list)
except requests.exceptions.RequestException as e:
print(f"[bold red]Error fetching directory {url}: {e}[/bold red]")
repo_content_list.append(f'<error>Failed to fetch directory {escape_xml(url)}: {escape_xml(str(e))}</error>')
except Exception as e: # Catch other potential errors like JSON parsing
print(f"[bold red]Error processing directory {url}: {e}[/bold red]")
repo_content_list.append(f'<error>Failed processing directory {escape_xml(url)}: {escape_xml(str(e))}</error>')
process_directory_recursive(contents_url, repo_content)
repo_content.append('\n</source>') # Close source tag
print("GitHub repository processing finished.")
return "\n".join(repo_content)
def process_local_folder(local_path, console: Console):
"""
Processes a local directory, extracting file contents and wrapping them in XML structure.
Args:
local_path: Path to the local directory to process.
console: Rich Console instance for displaying progress and errors.
"""
def process_local_directory_recursive(current_path, content_list, console):
try:
for item in os.listdir(current_path):
item_path = os.path.join(current_path, item)
relative_path = os.path.relpath(item_path, local_path)
if os.path.isdir(item_path):
if item not in EXCLUDED_DIRS:
process_local_directory_recursive(item_path, content_list, console)
elif os.path.isfile(item_path):
if is_allowed_filetype(item):
console.print(f"Processing {item_path}...")
content_list.append(f'\n<file path="{escape_xml(relative_path)}">')
try:
if item.lower().endswith(".ipynb"): # Case-insensitive check
content_list.append(process_ipynb_file(item_path))
elif item.lower().endswith(".pdf"): # Case-insensitive check
content_list.append(_process_pdf_content_from_path(item_path))
elif item.lower().endswith(('.xls', '.xlsx')): # Case-insensitive check for Excel files
# Need to pop the opening file tag we already added
content_list.pop() # Remove the <file> tag
# Generate Markdown for each sheet
try:
for sheet, md in excel_to_markdown(item_path).items():
virtual_name = f"{os.path.splitext(relative_path)[0]}_{sheet}.md"
content_list.append(f'\n<file path="{escape_xml(virtual_name)}">')
content_list.append(md) # raw Markdown table
content_list.append('</file>')
except Exception as e:
console.print(f"[bold red]Error processing Excel file {item_path}: {e}[/bold red]")
# Re-add the original file tag for the error message
content_list.append(f'\n<file path="{escape_xml(relative_path)}">')
content_list.append(f'<error>Failed to process Excel file: {escape_xml(str(e))}</error>')
content_list.append('</file>')
continue # Skip the final </file> for Excel files
else:
content_list.append(safe_file_read(item_path))
except Exception as e:
console.print(f"[bold red]Error reading file {item_path}: {e}[/bold red]")
content_list.append(f'<error>Failed to read file: {escape_xml(str(e))}</error>')
content_list.append('</file>')
except Exception as e:
console.print(f"[bold red]Error reading directory {current_path}: {e}[/bold red]")
content_list.append(f'<error>Failed reading directory {escape_xml(current_path)}: {escape_xml(str(e))}</error>')
# Start XML structure
content = [f'<source type="local_folder" path="{escape_xml(local_path)}">']
process_local_directory_recursive(local_path, content, console)
content.append('\n</source>') # Close source tag
console.print("Local folder processing finished.")
return '\n'.join(content)
def _process_pdf_content_from_path(file_path):
"""
Extracts text content from a local PDF file.
Returns the extracted text or an error message string.
"""
print(f" Extracting text from local PDF: {file_path}")
text_list = []
try:
from PyPDF2 import PdfReader
with open(file_path, 'rb') as pdf_file_obj:
pdf_reader = PdfReader(pdf_file_obj)
if not pdf_reader.pages:
print(f" [bold yellow]Warning:[/bold yellow] PDF file has no pages or is encrypted: {file_path}")
return "<e>PDF file has no pages or could not be read (possibly encrypted).</e>"
for i, page_obj in enumerate(pdf_reader.pages):
try:
page_text = page_obj.extract_text()
if page_text:
text_list.append(page_text)
except Exception as page_e: # Catch error extracting from a specific page
print(f" [bold yellow]Warning:[/bold yellow] Could not extract text from page {i+1} of {file_path}: {page_e}")
text_list.append(f"\n<e>Could not extract text from page {i+1}.</e>\n")
if not text_list:
print(f" [bold yellow]Warning:[/bold yellow] No text could be extracted from PDF: {file_path}")
return "<e>No text could be extracted from PDF.</e>"
return ' '.join(text_list)
except Exception as e:
print(f"[bold red]Error reading PDF file {file_path}: {e}[/bold red]")
return f"<e>Failed to read or process PDF file: {escape_xml(str(e))}</e>"
def _download_and_read_file(url):
"""
Downloads and reads the content of a file from a URL.
Returns the content as text or an error message string.
"""
if OFFLINE_MODE:
msg = "Offline mode enabled; skipping download"
print(f"[bold yellow]{msg}[/bold yellow]")
return f"<e>{escape_xml(msg)}</e>"
print(f" Downloading and reading content from: {url}")
try:
# Add headers conditionally
response = requests.get(url, headers=headers if TOKEN != DEFAULT_GITHUB_TOKEN else None, timeout=30)
response.raise_for_status()
# Use apparent_encoding (chardet-based) to avoid RFC 2616 ISO-8859-1 default
encoding = response.apparent_encoding or 'utf-8'
try:
# Try to decode as text
content = response.content.decode(encoding)
return content
except UnicodeDecodeError:
# If that fails, try a fallback encoding
try:
content = response.content.decode('latin-1')
return content
except Exception as decode_err:
print(f" [bold yellow]Warning:[/bold yellow] Could not decode content: {decode_err}")
return f"<e>Failed to decode content: {escape_xml(str(decode_err))}</e>"
except requests.RequestException as e:
print(f"[bold red]Error downloading file from {url}: {e}[/bold red]")
return f"<e>Failed to download file: {escape_xml(str(e))}</e>"
except Exception as e:
print(f"[bold red]Unexpected error processing file from {url}: {e}[/bold red]")
return f"<e>Unexpected error: {escape_xml(str(e))}</e>"
def excel_to_markdown(
file_path: Union[str, Path],
*,
skip_rows: int = 0, # Changed from 3 to 0 to not skip potential headers
min_header_cells: int = 2,
sheet_filter: List[str] | None = None,
) -> Dict[str, str]:
"""
Convert an Excel workbook (.xls / .xlsx) to Markdown.
Parameters
----------
file_path :
Path to the workbook.
skip_rows :
How many leading rows to ignore before we start hunting for a header row.
Default is 0 to ensure we don't miss any potential headers.
min_header_cells :
Minimum number of non-NA cells that makes a row "look like" a header.
sheet_filter :
Optional list of sheet names to include (exact match, case-sensitive).
Returns
-------
Dict[str, str]
Mapping of ``sheet_name → markdown_table``.
Empty dict means the workbook had no usable sheets by the above rules.
Raises
------
ValueError
If the file extension is not .xls or .xlsx.
RuntimeError
If *none* of the sheets meet the header-detection criteria.
"""
import pandas as pd
file_path = Path(file_path).expanduser().resolve()
if file_path.suffix.lower() not in {".xls", ".xlsx"}:
raise ValueError("Only .xls/.xlsx files are supported")
print(f"Processing Excel file: {file_path}")
# For simple Excel files, it's often better to use header=0 directly
# Try both approaches: first with automatic header detection, then fallback to header=0
try:
# Let pandas pick the right engine (openpyxl for xlsx, xlrd/pyxlsb if installed for xls)
wb = pd.read_excel(file_path, sheet_name=None, header=None)
md_tables: Dict[str, str] = {}
for name, df in wb.items():
if sheet_filter and name not in sheet_filter:
continue
df = df.iloc[skip_rows:].reset_index(drop=True)
try:
# Try to find a header row
header_idx = next(i for i, row in df.iterrows() if row.count() >= min_header_cells)
# Use ffill instead of deprecated method parameter
header = df.loc[header_idx].copy()
header = header.ffill() # Forward-fill NaN values
body = df.loc[header_idx + 1:].copy()
body.columns = header
body.dropna(how="all", inplace=True)
# Convert to markdown
md_tables[name] = body.to_markdown(index=False)
print(f" Processed sheet '{name}' with detected header")
except StopIteration:
# No row looked like a header - skip for now, we'll try again with header=0
print(f" No header detected in sheet '{name}', will try fallback")
continue
# If no headers were found with our heuristic, try again with header=0
if not md_tables:
print(" No headers detected with heuristic, trying with fixed header row")
wb = pd.read_excel(file_path, sheet_name=None, header=0)
for name, df in wb.items():
if sheet_filter and name not in sheet_filter:
continue
# Drop rows that are all NaN
df = df.dropna(how="all")
# Convert to markdown
md_tables[name] = df.to_markdown(index=False)
print(f" Processed sheet '{name}' with fixed header")
if not md_tables:
raise RuntimeError("Workbook contained no sheets with usable data")
return md_tables
except Exception as e:
print(f"Error processing Excel file: {e}")
# Last resort: try with the most basic approach
wb = pd.read_excel(file_path, sheet_name=None)
md_tables = {name: df.to_markdown(index=False) for name, df in wb.items()
if not (sheet_filter and name not in sheet_filter)}
if not md_tables:
raise RuntimeError(f"Failed to extract any usable data from Excel file: {e}")
return md_tables
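# Usage sketch (illustrative path; sheet names come from the workbook):
#   for sheet, md in excel_to_markdown("data/report.xlsx").items():
#       print(f"## {sheet}\n{md}")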
def excel_to_markdown_from_url(
url: str,
*,
skip_rows: int = 0, # Changed from 3 to 0 to not skip potential headers
min_header_cells: int = 2,
sheet_filter: List[str] | None = None,
) -> Dict[str, str]:
"""
Download an Excel workbook from a URL and convert it to Markdown.
This function downloads the Excel file from the URL to a BytesIO buffer
and then processes it using excel_to_markdown.
Parameters are the same as excel_to_markdown.
Returns
-------
Dict[str, str]
Mapping of ``sheet_name → markdown_table``.
Raises
------
ValueError, RuntimeError, RequestException
Various errors that might occur during downloading or processing.
"""
if OFFLINE_MODE:
msg = "Offline mode enabled; skipping Excel download"
print(f"[bold yellow]{msg}[/bold yellow]")
raise RuntimeError(msg)
import pandas as pd
print(f" Downloading Excel file from URL: {url}")
try:
# Add headers conditionally
response = requests.get(url, headers=headers if TOKEN != DEFAULT_GITHUB_TOKEN else None, timeout=30)
response.raise_for_status()
# Create a BytesIO buffer from the downloaded content
excel_buffer = io.BytesIO(response.content)
# For simple Excel files, it's often better to use header=0 directly
# Try both approaches: first with automatic header detection, then fallback to header=0
try:
# Let pandas read from the buffer
wb = pd.read_excel(excel_buffer, sheet_name=None, header=None)
md_tables: Dict[str, str] = {}
for name, df in wb.items():
if sheet_filter and name not in sheet_filter:
continue
df = df.iloc[skip_rows:].reset_index(drop=True)
try:
# Try to find a header row
header_idx = next(i for i, row in df.iterrows() if row.count() >= min_header_cells)
# Use ffill instead of deprecated method parameter
header = df.loc[header_idx].copy()
header = header.ffill() # Forward-fill NaN values
body = df.loc[header_idx + 1:].copy()
body.columns = header
body.dropna(how="all", inplace=True)
# Convert to markdown
md_tables[name] = body.to_markdown(index=False)
print(f" Processed sheet '{name}' with detected header")
except StopIteration:
# No row looked like a header - skip for now, we'll try again with header=0
print(f" No header detected in sheet '{name}', will try fallback")
continue
# If no headers were found with our heuristic, try again with header=0
if not md_tables:
print(" No headers detected with heuristic, trying with fixed header row")
excel_buffer.seek(0) # Reset the buffer position
wb = pd.read_excel(excel_buffer, sheet_name=None, header=0)
for name, df in wb.items():
if sheet_filter and name not in sheet_filter:
continue
# Drop rows that are all NaN
df = df.dropna(how="all")
# Convert to markdown
md_tables[name] = df.to_markdown(index=False)
print(f" Processed sheet '{name}' with fixed header")
if not md_tables:
raise RuntimeError("Workbook contained no sheets with usable data")
return md_tables
except Exception as e:
print(f"Error processing Excel file: {e}")
# Last resort: try with the most basic approach
excel_buffer.seek(0) # Reset the buffer position
wb = pd.read_excel(excel_buffer, sheet_name=None)
md_tables = {name: df.to_markdown(index=False) for name, df in wb.items()
if not (sheet_filter and name not in sheet_filter)}
if not md_tables:
raise RuntimeError(f"Failed to extract any usable data from Excel file: {e}")
return md_tables
except requests.RequestException as e:
print(f"[bold red]Error downloading Excel file from {url}: {e}[/bold red]")
raise
except Exception as e:
print(f"[bold red]Error processing Excel file from {url}: {e}[/bold red]")
raise
def process_arxiv_pdf(arxiv_abs_url):
"""
Downloads and extracts text from an ArXiv PDF, wrapped in XML.
"""
# Strip query parameters before constructing PDF URL
arxiv_clean_url = arxiv_abs_url.split('?')[0].split('#')[0]
pdf_url = arxiv_clean_url.replace("/abs/", "/pdf/") + ".pdf"
if OFFLINE_MODE:
msg = "Offline mode enabled; skipping ArXiv download"
print(f"[bold yellow]{msg}[/bold yellow]")
return f'<source type="arxiv" url="{escape_xml(arxiv_abs_url)}"><error>{escape_xml(msg)}</error></source>'
try:
print(f"Downloading ArXiv PDF from {pdf_url}...")
response = requests.get(pdf_url, timeout=30)
response.raise_for_status()
with tempfile.NamedTemporaryFile(suffix='.pdf') as temp_pdf:
temp_pdf.write(response.content)
temp_pdf.flush()
print("Extracting text from PDF...")
text_list = []
from PyPDF2 import PdfReader
temp_pdf.seek(0)
pdf_reader = PdfReader(temp_pdf)
for i, page in enumerate(pdf_reader.pages):
print(f" Processing page {i+1}/{len(pdf_reader.pages)}")
page_text = page.extract_text()
if page_text: # Add text only if extraction was successful
text_list.append(page_text)
# Use XML structure
formatted_text = f'<source type="arxiv" url="{escape_xml(arxiv_abs_url)}">\n'
formatted_text += ' '.join(text_list) # Append raw extracted text
formatted_text += '\n</source>' # Close source tag
print("ArXiv paper processed successfully.")
return formatted_text
except requests.exceptions.ProxyError as e:
print(f"[bold red]Proxy error downloading ArXiv PDF {pdf_url}: {e}[/bold red]")
return f'<source type="arxiv" url="{escape_xml(arxiv_abs_url)}"><error>Proxy error: {escape_xml(str(e))}</error></source>'
except requests.RequestException as e:
print(f"[bold red]Error downloading ArXiv PDF {pdf_url}: {e}[/bold red]")
return f'<source type="arxiv" url="{escape_xml(arxiv_abs_url)}"><error>Failed to download PDF: {escape_xml(str(e))}</error></source>'
except Exception as e: # Catch PdfReader errors or others
print(f"[bold red]Error processing ArXiv PDF {arxiv_abs_url}: {e}[/bold red]")
return f'<source type="arxiv" url="{escape_xml(arxiv_abs_url)}"><error>Failed to process PDF: {escape_xml(str(e))}</error></source>'
def fetch_youtube_transcript(url):
"""
Fetches YouTube transcript using yt-dlp with fallback to youtube_transcript_api, wrapped in XML.
"""
import subprocess
def extract_video_id(url):
"""Extract a YouTube video ID from a variety of URL formats."""
def is_valid(video_id):
return bool(video_id and re.fullmatch(r"[a-zA-Z0-9_-]{11}", video_id))
try:
parsed = urlparse(url)
except Exception:
parsed = None
if parsed and parsed.hostname:
hostname = parsed.hostname.lower()
if hostname.endswith("youtu.be"):
candidate = parsed.path.strip("/").split("/")[0]
if is_valid(candidate):
return candidate
if hostname.endswith("youtube.com"):
query_params = parse_qs(parsed.query)
if "v" in query_params:
candidate = query_params["v"][0]
if is_valid(candidate):
return candidate
path_parts = [part for part in parsed.path.split("/") if part]
if len(path_parts) >= 2 and path_parts[0] in {"embed", "v", "shorts", "live"}:
candidate = path_parts[1]
if is_valid(candidate):
return candidate
for part in reversed(path_parts):
if is_valid(part):
return part
pattern = r'(?:https?:\/\/)?(?:www\.)?(?:youtube\.com\/(?:[^\/\n\s]+\/\S+\/|(?:v|e(?:mbed)?)\/|\S*?[?&]v=)|youtu\.be\/)([a-zA-Z0-9_-]{11})'
match = re.search(pattern, url)
return match.group(1) if match else None
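# Illustrative URL forms handled above (all yield the same 11-character ID,
# shown here with a well-known public video ID):
#   https://youtu.be/dQw4w9WgXcQ
#   https://www.youtube.com/watch?v=dQw4w9WgXcQ
#   https://www.youtube.com/embed/dQw4w9WgXcQ
#   https://www.youtube.com/shorts/dQw4w9WgXcQ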
video_id = extract_video_id(url)
if not video_id:
print(f"[bold red]Could not extract YouTube video ID from URL: {url}[/bold red]")
return f'<source type="youtube_transcript" url="{escape_xml(url)}">\n<error>Could not extract video ID from URL.</error>\n</source>'
if OFFLINE_MODE:
msg = "Offline mode enabled; skipping YouTube transcript fetch"
print(f"[bold yellow]{msg}[/bold yellow]")
return f'<source type="youtube_transcript" url="{escape_xml(url)}">\n<error>{escape_xml(msg)}</error>\n</source>'
transcript_text = None
error_msg = None
# Try Method 1: Use yt-dlp (most reliable)
try:
print(f"Fetching transcript for YouTube video ID: {video_id} using yt-dlp...")
# Create a temporary directory for subtitle files
with tempfile.TemporaryDirectory() as temp_dir:
output_template = os.path.join(temp_dir, '%(id)s.%(ext)s')
# yt-dlp command to download subtitles only
cmd = [
'yt-dlp',
'--write-auto-sub', # Get automatic subtitles if available
'--write-sub', # Get manual subtitles if available
'--sub-lang', 'en', # Prefer English, but will get others if not available
'--skip-download', # Don't download the video
'--quiet', # Reduce output
'--no-warnings',
'-o', output_template,
url
]
# Run yt-dlp
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
# Capture stderr for more informative error messages
stderr = result.stderr.strip() if result.stderr else ""
error_msg = (
f"yt-dlp failed with exit code {result.returncode}: {stderr}"
if stderr
else f"yt-dlp failed with exit code {result.returncode}"
)
else:
# Look for subtitle files
subtitle_files = []
for ext in ['.en.vtt', '.en.srt', '.vtt', '.srt']:
subtitle_path = os.path.join(temp_dir, f"{video_id}{ext}")
if os.path.exists(subtitle_path):
subtitle_files.append(subtitle_path)
if subtitle_files:
# Read the first available subtitle file
with open(subtitle_files[0], 'r', encoding='utf-8') as f:
content = f.read()
# Parse VTT or SRT format to extract just the text
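# Illustrative WEBVTT fragment (hypothetical) and what the loop below keeps:
#   WEBVTT
#   00:00:01.000 --> 00:00:03.500
#   <c>Hello</c> world
# The header and timestamp lines are dropped and tags stripped -> "Hello world"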
lines = content.split('\n')
transcript_lines = []
for line in lines:
# Skip timestamp lines and empty lines
if '-->' not in line and line.strip() and not line.strip().isdigit() and not line.startswith('WEBVTT'):
# Remove HTML tags if present
clean_line = re.sub(r'<[^>]+>', '', line)
if clean_line.strip():
transcript_lines.append(clean_line.strip())
transcript_text = ' '.join(transcript_lines)
print(f"Transcript fetched successfully using yt-dlp. Got {len(transcript_lines)} lines.")
else:
error_msg = "No subtitle files found"
except FileNotFoundError:
error_msg = "yt-dlp not found. Please install it with: pip install yt-dlp"
except Exception as e:
error_msg = f"yt-dlp failed: {str(e)}"
print(f"yt-dlp method failed: {error_msg}")
# Try Method 2: Fallback to youtube_transcript_api