diff --git a/.jules/bolt.md b/.jules/bolt.md index 61511e5..0e99867 100644 --- a/.jules/bolt.md +++ b/.jules/bolt.md @@ -6,3 +6,6 @@ ## 2025-05-20 - Pre-compiling Regex in Loops **Learning:** `re.findall(pattern, string)` recompiles (or retrieves from cache) the pattern on every call. In high-frequency functions called inside loops (like complexity estimation), this overhead adds up. **Action:** Always pre-compile regexes (`re.compile`) into module-level or class-level constants if they are used repeatedly, especially in tight loops or recursive functions. +## 2025-05-20 - O(N*M) Lookup Optimization +**Learning:** Re-scanning a list of M items for each of N lookups costs O(N*M); building a dictionary once makes every subsequent lookup O(1) on average. +**Action:** When finding corresponding items in lists by an ID, pre-compute a dictionary mapping the ID to the item, turning O(N*M) time complexity into O(N+M). If IDs can repeat, keep only the first occurrence so the result matches a linear scan that stops at the first match. diff --git a/evolutia/material_extractor.py b/evolutia/material_extractor.py index 28b1756..0048df7 100644 --- a/evolutia/material_extractor.py +++ b/evolutia/material_extractor.py @@ -2,32 +2,33 @@ Extractor de materiales didácticos. Lee y parsea archivos Markdown de lecturas, prácticas y tareas. 
""" + from pathlib import Path -from typing import Dict, List, Optional, Union +from typing import Dict, List, Union import logging import time - -try: - from utils.markdown_parser import ( - read_markdown_file, - extract_frontmatter, - extract_exercise_blocks, - extract_solution_blocks, - resolve_include_path - ) -except ImportError: - from .utils.markdown_parser import ( - read_markdown_file, - extract_frontmatter, - extract_exercise_blocks, - extract_solution_blocks, - resolve_include_path - ) - - -logger = logging.getLogger(__name__) - - + +try: + from utils.markdown_parser import ( + read_markdown_file, + extract_frontmatter, + extract_exercise_blocks, + extract_solution_blocks, + resolve_include_path, + ) +except ImportError: + from .utils.markdown_parser import ( + read_markdown_file, + extract_frontmatter, + extract_exercise_blocks, + extract_solution_blocks, + resolve_include_path, + ) + + +logger = logging.getLogger(__name__) + + class MaterialExtractor: """Extrae ejercicios y soluciones de materiales didácticos.""" @@ -47,7 +48,7 @@ def __init__(self, base_path: Union[Path, str]): self._last_scan_timestamp: float = 0 # TTL del caché en segundos (5 minutos) self._cache_ttl = 300 - + def extract_from_file(self, file_path: Path, use_cache: bool = True) -> Dict: """ Extrae ejercicios y soluciones de un archivo Markdown. 
@@ -62,84 +63,92 @@ def extract_from_file(self, file_path: Path, use_cache: bool = True) -> Dict: # Verificar caché primero if use_cache and self._is_cache_valid(file_path): logger.debug(f"[MaterialExtractor] Usando caché para {file_path.name}") - return self._file_cache[file_path]['data'] + return self._file_cache[file_path]["data"] try: - content = read_markdown_file(file_path) - frontmatter, content_body = extract_frontmatter(content) - - exercises = extract_exercise_blocks(content_body) - solutions = extract_solution_blocks(content_body) - - # Resolver includes de ejercicios - for exercise in exercises: - if exercise['include_path']: - include_path = resolve_include_path( - exercise['include_path'], - file_path.parent - ) + content = read_markdown_file(file_path) + frontmatter, content_body = extract_frontmatter(content) + + exercises = extract_exercise_blocks(content_body) + solutions = extract_solution_blocks(content_body) + + # Resolver includes de ejercicios + for exercise in exercises: + if exercise["include_path"]: + include_path = resolve_include_path( + exercise["include_path"], file_path.parent + ) if include_path.exists(): - exercise['resolved_content'] = read_markdown_file(include_path) + exercise["resolved_content"] = read_markdown_file(include_path) else: - logger.warning(f"[MaterialExtractor] Include no encontrado en ejercicio: {include_path} (archivo: {file_path})") - exercise['resolved_content'] = exercise['content'] + logger.warning( + f"[MaterialExtractor] Include no encontrado en ejercicio: {include_path} (archivo: {file_path})" + ) + exercise["resolved_content"] = exercise["content"] else: - exercise['resolved_content'] = exercise['content'] + exercise["resolved_content"] = exercise["content"] # Resolver includes de soluciones for solution in solutions: resolved_content_parts = [] - for include_path_str in solution['include_paths']: + for include_path_str in solution["include_paths"]: include_path = resolve_include_path( - include_path_str, 
- file_path.parent + include_path_str, file_path.parent ) if include_path.exists(): resolved_content_parts.append(read_markdown_file(include_path)) else: - logger.warning(f"[MaterialExtractor] Include no encontrado en solución: {include_path} (archivo: {file_path})") + logger.warning( + f"[MaterialExtractor] Include no encontrado en solución: {include_path} (archivo: {file_path})" + ) if resolved_content_parts: - solution['resolved_content'] = '\n\n---\n\n'.join(resolved_content_parts) + solution["resolved_content"] = "\n\n---\n\n".join( + resolved_content_parts + ) else: - solution['resolved_content'] = solution['content'] - + solution["resolved_content"] = solution["content"] + return { - 'file_path': file_path, - 'frontmatter': frontmatter, - 'exercises': exercises, - 'solutions': solutions, - 'content_body': content_body # Exponer contenido para indexación de lecturas + "file_path": file_path, + "frontmatter": frontmatter, + "exercises": exercises, + "solutions": solutions, + "content_body": content_body, # Exponer contenido para indexación de lecturas } # Guardar en caché if use_cache: self._file_cache[file_path] = { - 'data': result, - 'timestamp': file_path.stat().st_mtime + "data": result, + "timestamp": file_path.stat().st_mtime, } - self._last_scan_timestamp = max(self._last_scan_timestamp, file_path.stat().st_mtime) + self._last_scan_timestamp = max( + self._last_scan_timestamp, file_path.stat().st_mtime + ) return result except Exception as e: logger.error(f"[MaterialExtractor] Error extrayendo de {file_path}: {e}") error_result = { - 'file_path': file_path, - 'frontmatter': {}, - 'exercises': [], - 'solutions': [] + "file_path": file_path, + "frontmatter": {}, + "exercises": [], + "solutions": [], } # Guardar incluso errores en caché para evitar reintentos fallidos if use_cache: self._file_cache[file_path] = { - 'data': error_result, - 'timestamp': time.time() # Usar tiempo actual para archivos que no existen + "data": error_result, + "timestamp": 
time.time(), # Usar tiempo actual para archivos que no existen } return error_result - def extract_from_directory(self, directory: Path, pattern: str = "*.md") -> List[Dict]: + def extract_from_directory( + self, directory: Path, pattern: str = "*.md" + ) -> List[Dict]: """ Extrae materiales de todos los archivos .md en un directorio. @@ -153,118 +162,146 @@ def extract_from_directory(self, directory: Path, pattern: str = "*.md") -> List directory = Path(directory) if not directory.exists(): logger.warning(f"[MaterialExtractor] Directorio no existe: {directory}") - return [] - - materials = [] - for md_file in directory.rglob(pattern): - # Ignorar archivos en _build y otros directorios temporales - if '_build' in md_file.parts or 'node_modules' in md_file.parts: - continue - - material = self.extract_from_file(md_file) - # Incluirlos si tienen ejercicios/soluciones O si parecen ser materiales de lectura/teoría - if material['exercises'] or material['solutions'] or 'lectura' in md_file.name.lower() or 'teoria' in md_file.name.lower(): - materials.append(material) - - return materials - - def extract_by_topic(self, topic: str) -> List[Dict]: - """ - Extrae materiales de un tema específico. 
- - Busca en: - - {topic}/semana*_practica.md - - {topic}/semana*_lectura.md - - tareas/tarea*/tarea*.md - - Args: - topic: Nombre del tema (ej: "analisis_vectorial") - - Returns: - Lista de materiales extraídos - """ - materials = [] - - # Buscar en directorio del tema - topic_dir = self.base_path / topic - if topic_dir.exists(): - # Buscar prácticas - practice_files = list(topic_dir.glob("*practica*.md")) - for file in practice_files: - materials.append(self.extract_from_file(file)) - - # Buscar lecturas (pueden tener ejercicios) - reading_files = list(topic_dir.glob("*lectura*.md")) - for file in reading_files: - materials.append(self.extract_from_file(file)) - - # Buscar en tareas (pueden ser de múltiples temas) - tareas_dir = self.base_path / "tareas" - if tareas_dir.exists(): - for tarea_dir in tareas_dir.iterdir(): - if tarea_dir.is_dir(): - tarea_file = tarea_dir / f"{tarea_dir.name}.md" - if tarea_file.exists(): - material = self.extract_from_file(tarea_file) - # Filtrar por tema si es relevante (checking subject or tags) - subject_match = material['frontmatter'].get('subject', '').lower().find(topic.lower()) != -1 - tags_match = any(topic.lower() in tag.lower() for tag in material['frontmatter'].get('tags', [])) - if subject_match or tags_match: - materials.append(material) - - # Buscar en examenes (pueden ser de múltiples temas) - examenes_dir = self.base_path / "examenes" - if examenes_dir.exists(): - for examen_dir in examenes_dir.iterdir(): - if examen_dir.is_dir(): - examen_file = examen_dir / f"{examen_dir.name}.md" - if examen_file.exists(): - material = self.extract_from_file(examen_file) - # Filtrar por tema si es relevante - subject_match = material['frontmatter'].get('subject', '').lower().find(topic.lower()) != -1 - tags_match = any(topic.lower() in tag.lower() for tag in material['frontmatter'].get('tags', [])) - - # Si es examen, a veces no tiene subject especifico o tiene "Examen X". 
- # Si no hay match explícito, tal vez incluirlo si no se encontraron otros materiales? - # Para seguridad, requerimos algún match en subject, tags o keywords - keywords_match = any(topic.lower() in kw.lower() for kw in material['frontmatter'].get('keywords', [])) - - if subject_match or tags_match or keywords_match: - materials.append(material) - - return materials - - def get_all_exercises(self, materials: List[Dict]) -> List[Dict]: - """ - Obtiene todos los ejercicios de una lista de materiales. - - Args: - materials: Lista de materiales extraídos - - Returns: - Lista de ejercicios con sus metadatos - """ - all_exercises = [] - - for material in materials: - for exercise in material['exercises']: - # Buscar solución correspondiente - solution = None - for sol in material['solutions']: - if sol['exercise_label'] == exercise['label']: - solution = sol - break - - exercise_data = { - 'label': exercise['label'], - 'content': exercise['resolved_content'], - 'source_file': material['file_path'], - 'frontmatter': material['frontmatter'], - 'solution': solution['resolved_content'] if solution else None, - 'solution_label': solution['label'] if solution else None - } - all_exercises.append(exercise_data) - + return [] + + materials = [] + for md_file in directory.rglob(pattern): + # Ignorar archivos en _build y otros directorios temporales + if "_build" in md_file.parts or "node_modules" in md_file.parts: + continue + + material = self.extract_from_file(md_file) + # Incluirlos si tienen ejercicios/soluciones O si parecen ser materiales de lectura/teoría + if ( + material["exercises"] + or material["solutions"] + or "lectura" in md_file.name.lower() + or "teoria" in md_file.name.lower() + ): + materials.append(material) + + return materials + + def extract_by_topic(self, topic: str) -> List[Dict]: + """ + Extrae materiales de un tema específico. 
+ + Busca en: + - {topic}/semana*_practica.md + - {topic}/semana*_lectura.md + - tareas/tarea*/tarea*.md + + Args: + topic: Nombre del tema (ej: "analisis_vectorial") + + Returns: + Lista de materiales extraídos + """ + materials = [] + + # Buscar en directorio del tema + topic_dir = self.base_path / topic + if topic_dir.exists(): + # Buscar prácticas + practice_files = list(topic_dir.glob("*practica*.md")) + for file in practice_files: + materials.append(self.extract_from_file(file)) + + # Buscar lecturas (pueden tener ejercicios) + reading_files = list(topic_dir.glob("*lectura*.md")) + for file in reading_files: + materials.append(self.extract_from_file(file)) + + # Buscar en tareas (pueden ser de múltiples temas) + tareas_dir = self.base_path / "tareas" + if tareas_dir.exists(): + for tarea_dir in tareas_dir.iterdir(): + if tarea_dir.is_dir(): + tarea_file = tarea_dir / f"{tarea_dir.name}.md" + if tarea_file.exists(): + material = self.extract_from_file(tarea_file) + # Filtrar por tema si es relevante (checking subject or tags) + subject_match = ( + material["frontmatter"] + .get("subject", "") + .lower() + .find(topic.lower()) + != -1 + ) + tags_match = any( + topic.lower() in tag.lower() + for tag in material["frontmatter"].get("tags", []) + ) + if subject_match or tags_match: + materials.append(material) + + # Buscar en examenes (pueden ser de múltiples temas) + examenes_dir = self.base_path / "examenes" + if examenes_dir.exists(): + for examen_dir in examenes_dir.iterdir(): + if examen_dir.is_dir(): + examen_file = examen_dir / f"{examen_dir.name}.md" + if examen_file.exists(): + material = self.extract_from_file(examen_file) + # Filtrar por tema si es relevante + subject_match = ( + material["frontmatter"] + .get("subject", "") + .lower() + .find(topic.lower()) + != -1 + ) + tags_match = any( + topic.lower() in tag.lower() + for tag in material["frontmatter"].get("tags", []) + ) + + # Si es examen, a veces no tiene subject especifico o tiene "Examen X". 
+ # Si no hay match explícito, tal vez incluirlo si no se encontraron otros materiales? + # Para seguridad, requerimos algún match en subject, tags o keywords + keywords_match = any( + topic.lower() in kw.lower() + for kw in material["frontmatter"].get("keywords", []) + ) + + if subject_match or tags_match or keywords_match: + materials.append(material) + + return materials + + def get_all_exercises(self, materials: List[Dict]) -> List[Dict]: + """ + Obtiene todos los ejercicios de una lista de materiales. + + Args: + materials: Lista de materiales extraídos + + Returns: + Lista de ejercicios con sus metadatos + """ + all_exercises = [] + + for material in materials: + # Pre-compute solutions dictionary for O(1) lookup + solutions_dict = {} + for sol in material["solutions"]: + if sol["exercise_label"] not in solutions_dict: + solutions_dict[sol["exercise_label"]] = sol + + for exercise in material["exercises"]: + # Buscar solución correspondiente + solution = solutions_dict.get(exercise["label"]) + + exercise_data = { + "label": exercise["label"], + "content": exercise["resolved_content"], + "source_file": material["file_path"], + "frontmatter": material["frontmatter"], + "solution": solution["resolved_content"] if solution else None, + "solution_label": solution["label"] if solution else None, + } + all_exercises.append(exercise_data) + return all_exercises def clear_cache(self): @@ -307,9 +344,7 @@ def get_cache_stats(self) -> Dict: Diccionario con estadísticas del caché """ return { - 'cached_files': len(self._file_cache), - 'last_scan_timestamp': self._last_scan_timestamp, - 'cache_ttl': self._cache_ttl + "cached_files": len(self._file_cache), + "last_scan_timestamp": self._last_scan_timestamp, + "cache_ttl": self._cache_ttl, } - - diff --git a/evolutia/rag/rag_indexer.py b/evolutia/rag/rag_indexer.py index 21ac78d..37a490d 100644 --- a/evolutia/rag/rag_indexer.py +++ b/evolutia/rag/rag_indexer.py @@ -1,57 +1,61 @@ -""" -RAG Indexer: Indexa materiales 
didácticos en un vector store. -""" -import os -import logging -from pathlib import Path -from typing import Dict, List, Optional, Any -import hashlib - -try: - import chromadb - from chromadb.config import Settings - CHROMADB_AVAILABLE = True -except ImportError: - CHROMADB_AVAILABLE = False - -try: - from sentence_transformers import SentenceTransformer - SENTENCE_TRANSFORMERS_AVAILABLE = True -except ImportError: - SENTENCE_TRANSFORMERS_AVAILABLE = False - -try: - from openai import OpenAI - OPENAI_AVAILABLE = True -except ImportError: - OPENAI_AVAILABLE = False - -from dotenv import load_dotenv - -load_dotenv() - -logger = logging.getLogger(__name__) - - -class RAGIndexer: - """Indexa materiales didácticos en un vector store.""" - - def __init__(self, config: Dict[str, Any], base_path: Path, chroma_client=None): - """ - Inicializa el indexador. - - Args: - config: Configuración de RAG desde config.yaml - base_path: Ruta base del proyecto - chroma_client: Cliente ChromaDB compartido (opcional) - """ +""" +RAG Indexer: Indexa materiales didácticos en un vector store. +""" + +import os +import logging +from pathlib import Path +from typing import Dict, List, Any +import hashlib + +try: + import chromadb + from chromadb.config import Settings + + CHROMADB_AVAILABLE = True +except ImportError: + CHROMADB_AVAILABLE = False + +try: + from sentence_transformers import SentenceTransformer + + SENTENCE_TRANSFORMERS_AVAILABLE = True +except ImportError: + SENTENCE_TRANSFORMERS_AVAILABLE = False + +try: + from openai import OpenAI + + OPENAI_AVAILABLE = True +except ImportError: + OPENAI_AVAILABLE = False + +from dotenv import load_dotenv + +load_dotenv() + +logger = logging.getLogger(__name__) + + +class RAGIndexer: + """Indexa materiales didácticos en un vector store.""" + + def __init__(self, config: Dict[str, Any], base_path: Path, chroma_client=None): + """ + Inicializa el indexador. 
+ + Args: + config: Configuración de RAG desde config.yaml + base_path: Ruta base del proyecto + chroma_client: Cliente ChromaDB compartido (opcional) + """ self.config = config self.base_path = Path(base_path) self.vector_store = None self.embedding_model = None self.embedding_client = None self.embedding_model_name = None - self.embedding_provider = config.get('embeddings', {}).get('provider', 'openai') + self.embedding_provider = config.get("embeddings", {}).get("provider", "openai") self.chroma_client = chroma_client self._embeddings_initialized = False self._setup_vector_store() @@ -63,13 +67,15 @@ def _ensure_embeddings_initialized(self): if self._embeddings_initialized: return - embeddings_config = self.config.get('embeddings', {}) - provider = embeddings_config.get('provider', 'openai') - model_name = embeddings_config.get('model', 'text-embedding-3-small') + embeddings_config = self.config.get("embeddings", {}) + provider = embeddings_config.get("provider", "openai") + model_name = embeddings_config.get("model", "text-embedding-3-small") - if provider == 'openai': + if provider == "openai": if not OPENAI_AVAILABLE: - raise ImportError("openai no está instalado. Instala con: pip install openai") + raise ImportError( + "openai no está instalado. Instala con: pip install openai" + ) api_key = os.getenv("OPENAI_API_KEY") if not api_key: @@ -77,11 +83,15 @@ def _ensure_embeddings_initialized(self): self.embedding_client = OpenAI(api_key=api_key) self.embedding_model_name = model_name - logger.info(f"[RAGIndexer] Inicializados embeddings de OpenAI: {model_name}") + logger.info( + f"[RAGIndexer] Inicializados embeddings de OpenAI: {model_name}" + ) - elif provider == 'sentence-transformers': + elif provider == "sentence-transformers": if not SENTENCE_TRANSFORMERS_AVAILABLE: - raise ImportError("sentence-transformers no está instalado. Instala con: pip install sentence-transformers") + raise ImportError( + "sentence-transformers no está instalado. 
Instala con: pip install sentence-transformers" + ) self.embedding_model = SentenceTransformer(model_name) logger.info(f"[RAGIndexer] Inicializados embeddings locales: {model_name}") @@ -93,40 +103,41 @@ def _ensure_embeddings_initialized(self): def _setup_embeddings(self): """Configura el modelo de embeddings (mantenido para compatibilidad).""" self._ensure_embeddings_initialized() - - def _setup_vector_store(self): - """Configura el vector store.""" - if not CHROMADB_AVAILABLE: - raise ImportError("chromadb no está instalado. Instala con: pip install chromadb") - - vs_config = self.config.get('vector_store', {}) - persist_dir = Path(vs_config.get('persist_directory', './storage/vector_store')) - collection_name = vs_config.get('collection_name', 'ejercicios_mmfi') - - # Crear directorio si no existe - persist_dir.mkdir(parents=True, exist_ok=True) - - # Usar cliente compartido si está disponible, sino crear uno nuevo - if self.chroma_client is not None: - self.client = self.chroma_client - else: - # Inicializar ChromaDB - self.client = chromadb.PersistentClient( - path=str(persist_dir.resolve()), - settings=Settings(anonymized_telemetry=False) - ) - - # Obtener o crear colección - try: - self.collection = self.client.get_collection(name=collection_name) - logger.info(f"Colección existente cargada: {collection_name}") - except Exception: - self.collection = self.client.create_collection( - name=collection_name, - metadata={"hnsw:space": "cosine"} - ) - logger.info(f"Nueva colección creada: {collection_name}") - + + def _setup_vector_store(self): + """Configura el vector store.""" + if not CHROMADB_AVAILABLE: + raise ImportError( + "chromadb no está instalado. 
Instala con: pip install chromadb" + ) + + vs_config = self.config.get("vector_store", {}) + persist_dir = Path(vs_config.get("persist_directory", "./storage/vector_store")) + collection_name = vs_config.get("collection_name", "ejercicios_mmfi") + + # Crear directorio si no existe + persist_dir.mkdir(parents=True, exist_ok=True) + + # Usar cliente compartido si está disponible, sino crear uno nuevo + if self.chroma_client is not None: + self.client = self.chroma_client + else: + # Inicializar ChromaDB + self.client = chromadb.PersistentClient( + path=str(persist_dir.resolve()), + settings=Settings(anonymized_telemetry=False), + ) + + # Obtener o crear colección + try: + self.collection = self.client.get_collection(name=collection_name) + logger.info(f"Colección existente cargada: {collection_name}") + except Exception: + self.collection = self.client.create_collection( + name=collection_name, metadata={"hnsw:space": "cosine"} + ) + logger.info(f"Nueva colección creada: {collection_name}") + def _generate_embedding(self, text: str) -> List[float]: """ Genera embedding para un texto. @@ -139,16 +150,15 @@ def _generate_embedding(self, text: str) -> List[float]: """ self._ensure_embeddings_initialized() - if self.embedding_provider == 'openai': + if self.embedding_provider == "openai": response = self.embedding_client.embeddings.create( - model=self.embedding_model_name, - input=text + model=self.embedding_model_name, input=text ) return response.data[0].embedding - elif self.embedding_provider == 'sentence-transformers': + elif self.embedding_provider == "sentence-transformers": return self.embedding_model.encode(text, show_progress_bar=False).tolist() - + def _generate_embeddings_batch(self, texts: List[str]) -> List[List[float]]: """ Genera embeddings para múltiples textos en batch. 
@@ -161,8 +171,8 @@ def _generate_embeddings_batch(self, texts: List[str]) -> List[List[float]]: """ self._ensure_embeddings_initialized() - if self.embedding_provider == 'openai': - batch_size = self.config.get('embeddings', {}).get('batch_size', 100) + if self.embedding_provider == "openai": + batch_size = self.config.get("embeddings", {}).get("batch_size", 100) embeddings = [] # Filtrar textos vacíos para evitar error 400 de OpenAI @@ -171,11 +181,10 @@ def _generate_embeddings_batch(self, texts: List[str]) -> List[List[float]]: return [] for i in range(0, len(valid_texts), batch_size): - batch = valid_texts[i:i + batch_size] + batch = valid_texts[i : i + batch_size] try: response = self.embedding_client.embeddings.create( - model=self.embedding_model_name, - input=batch + model=self.embedding_model_name, input=batch ) embeddings.extend([item.embedding for item in response.data]) except Exception as e: @@ -185,257 +194,271 @@ def _generate_embeddings_batch(self, texts: List[str]) -> List[List[float]]: return embeddings - elif self.embedding_provider == 'sentence-transformers': - return self.embedding_model.encode(texts, show_progress_bar=True, batch_size=32).tolist() - - def _chunk_text(self, text: str, chunk_size: int = 1000, overlap: int = 100) -> List[str]: - """ - Divide un texto en chunks con overlap. 
- - Args: - text: Texto a dividir - chunk_size: Tamaño de cada chunk (en caracteres aproximados) - overlap: Overlap entre chunks - - Returns: - Lista de chunks - """ - if len(text) <= chunk_size: - return [text] - - chunks = [] - start = 0 - - while start < len(text): - end = start + chunk_size - chunk = text[start:end] - - # Intentar cortar en un punto razonable (espacio o salto de línea) - if end < len(text): - last_newline = chunk.rfind('\n') - last_space = chunk.rfind(' ') - cut_point = max(last_newline, last_space) - - if cut_point > chunk_size * 0.5: # Si encontramos un buen punto de corte - chunk = chunk[:cut_point] - end = start + cut_point - - chunks.append(chunk.strip()) - start = end - overlap - - return chunks - - def _create_chunk_id(self, source: str, chunk_index: int) -> str: - """Crea un ID único para un chunk.""" - content = f"{source}_{chunk_index}" - return hashlib.md5(content.encode()).hexdigest() - - def index_exercise(self, exercise: Dict, analysis: Dict, metadata: Dict = None) -> List[str]: - """ - Indexa un ejercicio en el vector store. 
- - Args: - exercise: Información del ejercicio - analysis: Análisis de complejidad - metadata: Metadatos adicionales - - Returns: - Lista de IDs de chunks creados - """ - content = exercise.get('content', '') - solution = exercise.get('solution', '') - - # Combinar ejercicio y solución - full_text = f"EJERCICIO:\n{content}\n\n" - if solution: - full_text += f"SOLUCIÓN:\n{solution}\n" - - # Para ejercicios, usar un solo chunk (son relativamente cortos) - chunks = [full_text] if len(full_text) < 2000 else self._chunk_text(full_text) - - # Preparar metadatos - chunk_metadata = { - 'type': 'exercise', - 'exercise_type': analysis.get('type', 'desconocido'), - 'complexity': str(analysis.get('total_complexity', 0)), - 'num_variables': str(analysis.get('num_variables', 0)), - 'num_concepts': str(analysis.get('num_concepts', 0)), - 'concepts': ','.join(analysis.get('concepts', [])), - 'source_file': str(exercise.get('source_file', '')), - 'label': exercise.get('label', ''), - } - - if metadata: - chunk_metadata.update(metadata) - - # Generar embeddings - embeddings = self._generate_embeddings_batch(chunks) - - # Sincronizar chunks con embeddings (por si se filtraron vacíos en _generate_embeddings_batch) - # Aunque aquí preferimos filtrar antes para mantener consistencia - valid_indices = [i for i, chunk in enumerate(chunks) if chunk and chunk.strip()] - chunks = [chunks[i] for i in valid_indices] - - if not chunks: - logger.warning(f"Ejercicio {exercise.get('label', 'unknown')} no tiene contenido válido para indexar") - return [] - - # Crear IDs y documentos - chunk_ids = [] - documents = [] - metadatas = [] - - for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)): - chunk_id = self._create_chunk_id(f"{exercise.get('label', 'exercise')}_{i}", i) - chunk_ids.append(chunk_id) - documents.append(chunk) - metadatas.append({**chunk_metadata, 'chunk_index': str(i)}) - - # Agregar a la colección - self.collection.add( - ids=chunk_ids, - embeddings=embeddings, - 
documents=documents, - metadatas=metadatas - ) - - logger.info(f"Indexado ejercicio {exercise.get('label', 'unknown')}: {len(chunks)} chunks") - return chunk_ids - - def index_reading(self, content: str, metadata: Dict) -> List[str]: - """ - Indexa una lectura en el vector store. - - Args: - content: Contenido de la lectura - metadata: Metadatos (tema, título, etc.) - - Returns: - Lista de IDs de chunks creados - """ - chunking_config = self.config.get('chunking', {}) - chunk_size = chunking_config.get('chunk_size', 1000) - chunk_overlap = chunking_config.get('chunk_overlap', 100) - - chunks = self._chunk_text(content, chunk_size, chunk_overlap) - - # Preparar metadatos - chunk_metadata = { - 'type': 'reading', - **metadata - } - - # Generar embeddings - embeddings = self._generate_embeddings_batch(chunks) - - # Sincronizar chunks con embeddings - valid_indices = [i for i, chunk in enumerate(chunks) if chunk and chunk.strip()] - chunks = [chunks[i] for i in valid_indices] - - if not chunks: - logger.warning(f"Lectura {metadata.get('title', 'unknown')} no tiene contenido válido para indexar") - return [] - - # Crear IDs y documentos - chunk_ids = [] - documents = [] - metadatas = [] - - source = metadata.get('source_file', 'reading') - - for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)): - chunk_id = self._create_chunk_id(f"{source}_{i}", i) - chunk_ids.append(chunk_id) - documents.append(chunk) - metadatas.append({**chunk_metadata, 'chunk_index': str(i)}) - - # Agregar a la colección - self.collection.add( - ids=chunk_ids, - embeddings=embeddings, - documents=documents, - metadatas=metadatas - ) - - logger.info(f"Indexada lectura {metadata.get('title', 'unknown')}: {len(chunks)} chunks") - return chunk_ids - - def index_materials(self, materials: List[Dict], analyzer) -> Dict[str, int]: - """ - Indexa una lista de materiales. 
- - Args: - materials: Lista de materiales extraídos - analyzer: ExerciseAnalyzer para analizar ejercicios - - Returns: - Diccionario con estadísticas de indexación - """ - stats = { - 'exercises': 0, - 'readings': 0, - 'chunks': 0 - } - - for material in materials: - # Indexar ejercicios - exercises = material.get('exercises', []) - for exercise_data in exercises: - # Buscar solución correspondiente - solution = None - for sol in material.get('solutions', []): - if sol['exercise_label'] == exercise_data['label']: - solution = sol - break - - exercise = { - 'label': exercise_data['label'], - 'content': exercise_data.get('resolved_content', ''), - 'source_file': material['file_path'], - 'solution': solution['resolved_content'] if solution else None - } - - # Analizar ejercicio - analysis = analyzer.analyze(exercise) - - # Indexar - metadata = { - 'topic': material.get('frontmatter', {}).get('subject', ''), - 'file_path': str(material['file_path']) - } - - chunk_ids = self.index_exercise(exercise, analysis, metadata) - stats['exercises'] += 1 - stats['chunks'] += len(chunk_ids) - - # Indexar lecturas (si hay contenido de lectura) - content_body = material.get('content_body', '') - filename = str(material.get('file_path', '')) - - # Heurística: Indexar como lectura si tiene "lectura" o "teoria" en el nombre - # y tiene contenido sustancial (> 200 chars) - if ('lectura' in filename.lower() or 'teoria' in filename.lower()) and len(content_body) > 200: - metadata = { - 'title': material.get('frontmatter', {}).get('title', ''), - 'subject': material.get('frontmatter', {}).get('subject', ''), - 'tags': ','.join(material.get('frontmatter', {}).get('tags', [])), - 'source_file': filename - } - chunk_ids = self.index_reading(content_body, metadata) - stats['readings'] += 1 - stats['chunks'] += len(chunk_ids) - - logger.info(f"Indexación completada: {stats}") - return stats - - def clear_collection(self): - """Limpia la colección (útil para re-indexar).""" - collection_name = 
self.collection.name - self.client.delete_collection(name=collection_name) - vs_config = self.config.get('vector_store', {}) - self.collection = self.client.create_collection( - name=collection_name, - metadata={"hnsw:space": "cosine"} - ) - logger.info(f"Colección {collection_name} limpiada") - + elif self.embedding_provider == "sentence-transformers": + return self.embedding_model.encode( + texts, show_progress_bar=True, batch_size=32 + ).tolist() + + def _chunk_text( + self, text: str, chunk_size: int = 1000, overlap: int = 100 + ) -> List[str]: + """ + Divide un texto en chunks con overlap. + + Args: + text: Texto a dividir + chunk_size: Tamaño de cada chunk (en caracteres aproximados) + overlap: Overlap entre chunks + + Returns: + Lista de chunks + """ + if len(text) <= chunk_size: + return [text] + + chunks = [] + start = 0 + + while start < len(text): + end = start + chunk_size + chunk = text[start:end] + + # Intentar cortar en un punto razonable (espacio o salto de línea) + if end < len(text): + last_newline = chunk.rfind("\n") + last_space = chunk.rfind(" ") + cut_point = max(last_newline, last_space) + + if ( + cut_point > chunk_size * 0.5 + ): # Si encontramos un buen punto de corte + chunk = chunk[:cut_point] + end = start + cut_point + + chunks.append(chunk.strip()) + start = end - overlap + + return chunks + + def _create_chunk_id(self, source: str, chunk_index: int) -> str: + """Crea un ID único para un chunk.""" + content = f"{source}_{chunk_index}" + return hashlib.md5(content.encode()).hexdigest() + + def index_exercise( + self, exercise: Dict, analysis: Dict, metadata: Dict = None + ) -> List[str]: + """ + Indexa un ejercicio en el vector store. 
def index_exercise(
    self, exercise: Dict, analysis: Dict, metadata: Dict = None
) -> List[str]:
    """
    Index one exercise, together with its solution when present.

    The statement and the solution are combined into a single document. It
    is stored as one chunk when shorter than 2000 characters and split with
    ``_chunk_text`` otherwise. Blank chunks are discarded before embeddings
    are generated, so every stored chunk is paired with its own embedding.

    Args:
        exercise: Exercise data; the keys read are ``content``,
            ``solution``, ``source_file`` and ``label``.
        analysis: Complexity analysis; the keys read are ``type``,
            ``total_complexity``, ``num_variables``, ``num_concepts`` and
            ``concepts``.
        metadata: Optional extra metadata; its entries override the
            generated ones on key clashes.

    Returns:
        IDs of the chunks added to the collection, or an empty list when
        there is no non-blank content to index.
    """
    content = exercise.get("content", "")
    solution = exercise.get("solution", "")

    # Combine statement and solution into one document.
    full_text = f"EJERCICIO:\n{content}\n\n"
    if solution:
        full_text += f"SOLUCIÓN:\n{solution}\n"

    # Exercises are relatively short: keep them as a single chunk when possible.
    chunks = [full_text] if len(full_text) < 2000 else self._chunk_text(full_text)

    # Filter blank chunks BEFORE embedding. Filtering afterwards could leave
    # fewer chunks than embeddings and pair chunks with the wrong vectors.
    chunks = [chunk for chunk in chunks if chunk and chunk.strip()]
    if not chunks:
        logger.warning(
            f"Ejercicio {exercise.get('label', 'unknown')} no tiene contenido válido para indexar"
        )
        return []

    # Metadata shared by every chunk of this exercise.
    chunk_metadata = {
        "type": "exercise",
        "exercise_type": analysis.get("type", "desconocido"),
        "complexity": str(analysis.get("total_complexity", 0)),
        "num_variables": str(analysis.get("num_variables", 0)),
        "num_concepts": str(analysis.get("num_concepts", 0)),
        "concepts": ",".join(analysis.get("concepts", [])),
        "source_file": str(exercise.get("source_file", "")),
        "label": exercise.get("label", ""),
    }
    if metadata:
        chunk_metadata.update(metadata)

    embeddings = self._generate_embeddings_batch(chunks)

    # NOTE(review): IDs derive only from the label and the chunk index, so
    # two exercises sharing a label map to the same IDs - confirm labels are
    # unique across source files.
    id_seed = exercise.get("label", "exercise")
    chunk_ids = [
        self._create_chunk_id(f"{id_seed}_{i}", i) for i in range(len(chunks))
    ]
    metadatas = [
        {**chunk_metadata, "chunk_index": str(i)} for i in range(len(chunks))
    ]

    self.collection.add(
        ids=chunk_ids,
        embeddings=embeddings,
        documents=chunks,
        metadatas=metadatas,
    )

    logger.info(
        f"Indexado ejercicio {exercise.get('label', 'unknown')}: {len(chunks)} chunks"
    )
    return chunk_ids
def index_reading(self, content: str, metadata: Dict) -> List[str]:
    """
    Index a reading in the vector store.

    The content is split with ``_chunk_text`` using the ``chunk_size`` and
    ``chunk_overlap`` values of the ``chunking`` section of ``self.config``
    (defaults: 1000 and 100). Blank chunks are discarded before embeddings
    are generated, so every stored chunk is paired with its own embedding.

    Args:
        content: Text of the reading.
        metadata: Metadata stored with every chunk. ``source_file`` seeds
            the chunk IDs and ``title`` is used in log messages.

    Returns:
        IDs of the chunks added to the collection, or an empty list when
        there is no non-blank content to index.
    """
    chunking_config = self.config.get("chunking", {})
    chunk_size = chunking_config.get("chunk_size", 1000)
    chunk_overlap = chunking_config.get("chunk_overlap", 100)

    chunks = self._chunk_text(content, chunk_size, chunk_overlap)

    # Filter blank chunks BEFORE embedding. Filtering afterwards could leave
    # fewer chunks than embeddings and pair chunks with the wrong vectors.
    chunks = [chunk for chunk in chunks if chunk and chunk.strip()]
    if not chunks:
        logger.warning(
            f"Lectura {metadata.get('title', 'unknown')} no tiene contenido válido para indexar"
        )
        return []

    # Metadata shared by every chunk; caller values override the type tag.
    chunk_metadata = {"type": "reading", **metadata}

    embeddings = self._generate_embeddings_batch(chunks)

    source = metadata.get("source_file", "reading")
    chunk_ids = [
        self._create_chunk_id(f"{source}_{i}", i) for i in range(len(chunks))
    ]
    metadatas = [
        {**chunk_metadata, "chunk_index": str(i)} for i in range(len(chunks))
    ]

    self.collection.add(
        ids=chunk_ids,
        embeddings=embeddings,
        documents=chunks,
        metadatas=metadatas,
    )

    logger.info(
        f"Indexada lectura {metadata.get('title', 'unknown')}: {len(chunks)} chunks"
    )
    return chunk_ids
def index_materials(self, materials: List[Dict], analyzer) -> Dict[str, int]:
    """
    Index a list of extracted materials.

    For every material, each exercise is paired with the first solution
    carrying the same label, analyzed with ``analyzer.analyze`` and indexed
    through ``index_exercise``. The material body is additionally indexed
    as a reading when its file name contains "lectura" or "teoria" and the
    body is longer than 200 characters.

    Args:
        materials: Extracted materials; the keys read are ``exercises``,
            ``solutions``, ``frontmatter``, ``file_path`` and
            ``content_body``.
        analyzer: Object whose ``analyze(exercise)`` result is passed to
            ``index_exercise`` as the analysis.

    Returns:
        Counters with the keys ``exercises``, ``readings`` and ``chunks``.
    """
    stats = {"exercises": 0, "readings": 0, "chunks": 0}

    for material in materials:
        # A frontmatter key holding None must behave like a missing one.
        frontmatter = material.get("frontmatter") or {}

        # Map label -> solution once per material; the first solution wins.
        solutions_by_label = {}
        for sol in material.get("solutions", []):
            solutions_by_label.setdefault(sol["exercise_label"], sol)

        for exercise_data in material.get("exercises", []):
            solution = solutions_by_label.get(exercise_data["label"])

            exercise = {
                "label": exercise_data["label"],
                "content": exercise_data.get("resolved_content", ""),
                "source_file": material["file_path"],
                "solution": solution["resolved_content"] if solution else None,
            }

            analysis = analyzer.analyze(exercise)

            metadata = {
                "topic": frontmatter.get("subject", ""),
                "file_path": str(material["file_path"]),
            }

            chunk_ids = self.index_exercise(exercise, analysis, metadata)
            stats["exercises"] += 1
            stats["chunks"] += len(chunk_ids)

        # Index the body as a reading when the heuristic below matches.
        content_body = material.get("content_body", "")
        filename = str(material.get("file_path", ""))
        lowered_name = filename.lower()
        looks_like_reading = "lectura" in lowered_name or "teoria" in lowered_name

        if looks_like_reading and len(content_body) > 200:
            # Tags may be missing, None, a single scalar or hold non-string
            # items; normalize them so the join below cannot fail or split a
            # lone string into characters.
            tags = frontmatter.get("tags") or []
            if not isinstance(tags, (list, tuple, set)):
                tags = [tags]

            metadata = {
                "title": frontmatter.get("title", ""),
                "subject": frontmatter.get("subject", ""),
                "tags": ",".join(str(tag) for tag in tags),
                "source_file": filename,
            }
            chunk_ids = self.index_reading(content_body, metadata)
            stats["readings"] += 1
            stats["chunks"] += len(chunk_ids)

    logger.info(f"Indexación completada: {stats}")
    return stats
def clear_collection(self):
    """
    Empty the collection by deleting and recreating it (useful to re-index).

    The collection is recreated under the same name with the metadata
    ``{"hnsw:space": "cosine"}`` and stored back in ``self.collection``.
    """
    collection_name = self.collection.name
    self.client.delete_collection(name=collection_name)
    # NOTE(review): only the name and the hnsw:space setting are passed here;
    # confirm this matches how the collection is created initially.
    self.collection = self.client.create_collection(
        name=collection_name, metadata={"hnsw:space": "cosine"}
    )
    logger.info(f"Colección {collection_name} limpiada")