diff --git a/maab/agents/mlzero_default/mlzero_default.sh b/maab/agents/mlzero_default/mlzero_default.sh
index 0ce7cac6..86829e44 100644
--- a/maab/agents/mlzero_default/mlzero_default.sh
+++ b/maab/agents/mlzero_default/mlzero_default.sh
@@ -57,9 +57,9 @@ fi
 mlzero \
     -i "$TRAINING_PATH" \
     -o "$OUTPUT_DIR" \
-    -n 10 \
+    -n 5 \
     -v 1 \
-    -u "complete the task in 10 minutes"
+    -u "Use models in Huggingface."
 
 # Check if the process was successful
 if [ $? -ne 0 ]; then
diff --git a/src/autogluon/assistant/agents/reranker_agent.py b/src/autogluon/assistant/agents/reranker_agent.py
index 69805658..e17cd2bd 100644
--- a/src/autogluon/assistant/agents/reranker_agent.py
+++ b/src/autogluon/assistant/agents/reranker_agent.py
@@ -37,9 +37,6 @@ def __call__(self):
         """Select and rerank relevant tutorials from retrieved candidates."""
         self.manager.log_agent_start("RerankerAgent: reranking and selecting top tutorials from retrieved candidates.")
 
-        # Get retrieved tutorials from manager
-        retrieved_tutorials = self.manager.tutorial_retrieval
-
         # Build prompt for tutorial reranking
         prompt = self.reranker_prompt.build()
 
@@ -56,7 +53,7 @@ def __call__(self):
         # Fallback: if parsing fails or returns empty, use top tutorials by score
         if not selected_tutorials:
             logger.warning("Tutorial reranking failed, falling back to top tutorials by retrieval score.")
-            selected_tutorials = self._select_top_by_score(retrieved_tutorials)
+            selected_tutorials = self._select_top_by_score(self.reranker_prompt.tutorials)
 
         # Generate tutorial prompt using selected tutorials
         tutorial_prompt = self._generate_tutorial_prompt(selected_tutorials)
diff --git a/src/autogluon/assistant/agents/tool_selector_agent.py b/src/autogluon/assistant/agents/tool_selector_agent.py
index 7fc6ada1..b25cd6b2 100644
--- a/src/autogluon/assistant/agents/tool_selector_agent.py
+++ b/src/autogluon/assistant/agents/tool_selector_agent.py
@@ -56,6 +56,9 @@ def __call__(self) -> Tuple[str, str]:
 
         selected_tool = self.tool_selector_prompt.parse(response)
 
+        # Override the parsed selection so the HuggingFace tool is always used
+        selected_tool = "huggingface"
+
         self.manager.log_agent_end("ToolSelectorAgent: selected tool and recorded justification.")
 
         return selected_tool
diff --git a/src/autogluon/assistant/configs/default.yaml b/src/autogluon/assistant/configs/default.yaml
index 33219463..13ac531b 100644
--- a/src/autogluon/assistant/configs/default.yaml
+++ b/src/autogluon/assistant/configs/default.yaml
@@ -1,18 +1,18 @@
 # Tutorial Prompt Generator Configuration
-per_execution_timeout: 86400
+per_execution_timeout: 7200
 
 # Data Perception
 max_file_group_size_to_show: 5
 num_example_files_to_show: 1
 max_chars_per_file: 1024
 
-num_tutorial_retrievals: 20
-max_num_tutorials: 5
+num_tutorial_retrievals: 50
+max_num_tutorials: 1
 max_user_input_length: 2048
 max_error_message_length: 2048
 max_tutorial_length: 8192
 
-create_venv: false
+create_venv: True
 condense_tutorials: True
 use_tutorial_summary: True
 
diff --git a/src/autogluon/assistant/prompts/reranker_prompt.py b/src/autogluon/assistant/prompts/reranker_prompt.py
index dddf61bd..4ee51465 100644
--- a/src/autogluon/assistant/prompts/reranker_prompt.py
+++ b/src/autogluon/assistant/prompts/reranker_prompt.py
@@ -84,8 +84,8 @@ def build(self) -> str:
         condense_tutorials = self.manager.config.condense_tutorials
         use_tutorial_summary = self.manager.config.use_tutorial_summary
 
-        # Get all available tutorials
-        self.tutorials = get_all_tutorials(selected_tool, condensed=condense_tutorials)
+        # Get retrieved tutorials from manager
+        self.tutorials = self.manager.tutorial_retrieval
 
         if not self.tutorials:
             logger.warning(f"No tutorials found for {selected_tool}")
diff --git a/src/autogluon/assistant/tools_registry/_common/catalog.json b/src/autogluon/assistant/tools_registry/_common/catalog.json
index 41935648..0fc5b8ce 100644
--- a/src/autogluon/assistant/tools_registry/_common/catalog.json
+++ b/src/autogluon/assistant/tools_registry/_common/catalog.json
@@ -24,6 +24,11 @@
             "path": "machine learning",
             "version": "0.1.0",
             "description": "You should select this as a general reference of machine learning or deep learning algorithms in case other tools are not helpful."
+        },
+        "huggingface": {
+            "path": "huggingface",
+            "version": "1.0.0",
+            "description": "Here we collect top liked/downloaded models from huggingface for each task."
         }
     }
 }
\ No newline at end of file
diff --git a/src/autogluon/assistant/tools_registry/register_huggingface.py b/src/autogluon/assistant/tools_registry/register_huggingface.py
new file mode 100644
index 00000000..afe82876
--- /dev/null
+++ b/src/autogluon/assistant/tools_registry/register_huggingface.py
@@ -0,0 +1,574 @@
+#!/usr/bin/env python3
+"""
+Hugging Face Tool Registration Script
+
+This script registers Hugging Face as an ML library tool in the registry by:
+1. Fetching top models across all tasks
+2. Extracting detailed model descriptions and documentation
+3. Creating organized documentation files
+4. Registering the tool with the registry
+"""
+
+import logging
+import re
+from pathlib import Path
+from typing import Dict, List, Optional
+from urllib.parse import urlparse
+
+import requests
+from omegaconf import OmegaConf
+
+from .registry import ToolsRegistry
+from .utils import HuggingFaceModelScraper, HuggingFaceModelsFetcher
+
+# Configure logging
+logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
+logger = logging.getLogger(__name__)
+
+
+class HuggingFaceToolRegistrar:
+    def __init__(self, output_dir: str = "hf_tutorials", top_models_per_task: int = 3):
+        """
+        Initialize the Hugging Face tool registrar.
+
+        Args:
+            output_dir: Directory to save tutorial files
+            top_models_per_task: Number of top models to fetch per task
+        """
+        self.output_dir = Path(output_dir)
+        self.top_models_per_task = top_models_per_task
+        self.models_fetcher = HuggingFaceModelsFetcher()
+        self.model_scraper = HuggingFaceModelScraper(delay=0.5)  # Reduced delay for faster processing
+        self.registry = ToolsRegistry()
+
+        # Create output directory
+        self.output_dir.mkdir(exist_ok=True)
+
+    def fetch_top_models(self) -> Dict[str, List[Dict]]:
+        """
+        Fetch top models for all tasks from Hugging Face.
+
+        Returns:
+            Dictionary with task names as keys and model lists as values
+        """
+        logger.info(f"Fetching top {self.top_models_per_task} models for each task...")
+
+        # Get top liked models
+        top_liked = self.models_fetcher.get_top_models_all_tasks(n=self.top_models_per_task, sort_by="likes")
+
+        # Get top downloaded models
+        top_downloaded = self.models_fetcher.get_top_models_all_tasks(n=self.top_models_per_task, sort_by="downloads")
+
+        # Merge the results
+        merged_models = self.models_fetcher.merge_top_models(
+            top_liked, top_downloaded, max_per_task=self.top_models_per_task * 2
+        )
+
+        logger.info(f"Successfully fetched models for {len(merged_models)} tasks")
+        return merged_models
+
+    def create_model_documentation(self, models_by_task: Dict[str, List[Dict]]) -> None:
+        """
+        Create detailed documentation files for models organized by task.
+
+        Args:
+            models_by_task: Dictionary of models organized by task
+        """
+        logger.info("Creating model documentation files...")
+
+        # Create individual model tutorial files directly in output directory
+        for task, models in models_by_task.items():
+            logger.info(f"Processing {len(models)} models for task: {task}")
+
+            for model in models:
+                self.create_model_tutorial(model, task)
+
+        # Create master index file
+        self.create_master_index(models_by_task)
+
+    def create_model_tutorial(self, model: Dict, task: str) -> None:
+        """
+        Create a detailed tutorial file for a specific model using scraped content.
+        Checks for GitHub links and appends GitHub README content if found.
+
+        Args:
+            model: Model information dictionary
+            task: Task name for this model
+        """
+        model_id = model["model_id"]
+        safe_model_name = self.sanitize_filename(model_id.replace("/", "_"))
+
+        # Create filename: [task] model_identifier.md
+        filename = f"[{task}] {safe_model_name}.md"
+        tutorial_file = self.output_dir / filename
+
+        logger.info(f"Creating tutorial for {model_id}...")
+
+        # Try to get detailed model information using scraper
+        detailed_info = None
+        if model.get("url"):
+            try:
+                logger.info(f"Fetching detailed info for {model_id}...")
+                detailed_info = self.model_scraper.extract_model_content(model["url"])
+            except Exception as e:
+                logger.warning(f"Could not fetch detailed info for {model_id}: {e}")
+
+        # Format counts defensively: likes/downloads may be missing, and the
+        # thousands-separator format spec only applies to integers
+        likes = model.get("likes")
+        downloads = model.get("downloads")
+        likes_str = f"{likes:,}" if isinstance(likes, int) else "N/A"
+        downloads_str = f"{downloads:,}" if isinstance(downloads, int) else "N/A"
+
+        # Create comprehensive model tutorial content
+        content = f"""# {model_id} - {task.replace('-', ' ').title()}
+
+## Model Overview
+
+**Model ID**: `{model_id}`
+**Task**: {task}
+**URL**: {model.get('url', 'N/A')}
+**Likes**: {likes_str}
+**Downloads**: {downloads_str}
+**Library**: {model.get('library_name', 'N/A')}
+**Pipeline Tag**: {model.get('pipeline_tag', 'N/A')}
+**Source Ranking**: {model.get('source', 'N/A')}
+
+"""
+
+        if detailed_info and not detailed_info.get("error"):
+            # Add description
+            if detailed_info.get("description"):
+                content += f"""## Description
+
+{detailed_info['description']}
+
+"""
+
+            # Add tags
+            if detailed_info.get("tags"):
+                content += f"""## Tags
+
+`{' | '.join(detailed_info['tags'][:15])}`
+
+"""
+
+            # Add the full README content if available
+            if detailed_info.get("readme_content"):
+                content += f"""## Model Documentation
+
+{detailed_info['readme_content']}
+
+"""
+
+            # Add metadata if available
+            if detailed_info.get("metadata"):
+                content += """## Additional Metadata
+
+"""
+                for key, value in detailed_info["metadata"].items():
+                    content += f"- **{key.title()}**: {value}\n"
+                content += "\n"
+
+            github_links = self._extract_github_links(content, max_repos=3)
+            if github_links:
+                # Append the README of each linked repository to the tutorial
+                for github_link in github_links:
+                    logger.info(f"Found GitHub link for {model_id}: {github_link}")
+                    github_readme_content = self._fetch_github_readme(github_link)
+                    if not github_readme_content:
+                        continue
+                    content += f"""## GitHub Repository Documentation at {github_link}
+
+{github_readme_content}
+
+"""
+
+        else:
+            # Fallback content if scraping failed
+            content += f"""## Description
+
+This is a {task.replace('-', ' ')} model from Hugging Face. Detailed documentation may be available at the model's Hugging Face page.
+
+"""
+
+        # Write the tutorial file
+        with open(tutorial_file, "w", encoding="utf-8") as f:
+            f.write(content)
+
+        logger.info(f"Created tutorial: {filename}")
+
+    def _extract_github_links(self, content: str, max_repos: int = 3) -> list:
+        """
+        Extract GitHub repository links from content.
+
+        Args:
+            content: Text content to search for GitHub links
+            max_repos: Maximum number of unique repositories to return
+
+        Returns:
+            List of unique GitHub repository URLs (limited by max_repos)
+        """
+        if not content:
+            return []
+
+        # Patterns to match GitHub repository URLs, excluding common punctuation at the end
+        github_patterns = [
+            r"https?://github\.com/[^/\s]+/[^/\s]+(?:/[^/\s)]*)?",
+            r"github\.com/[^/\s]+/[^/\s]+(?:/[^/\s)]*)?",
+        ]
+
+        github_links = []
+
+        for pattern in github_patterns:
+            matches = re.findall(pattern, content, re.IGNORECASE)
+            for match in matches:
+                # Clean up the URL by removing trailing punctuation
+                match = match.rstrip(".,;:!?)")
+
+                # Normalize the URL
+                if not match.startswith("http"):
+                    match = "https://" + match
+
+                # Extract just the repository part (owner/repo)
+                parsed = urlparse(match)
+                path_parts = [p for p in parsed.path.split("/") if p]
+
+                if len(path_parts) >= 2:
+                    repo_url = f"https://github.com/{path_parts[0]}/{path_parts[1]}"
+                    if repo_url not in github_links:
+                        github_links.append(repo_url)
+                        # Stop if we've reached the maximum number of repos
+                        if len(github_links) >= max_repos:
+                            return github_links
+
+        return github_links
+
+    def _fetch_github_readme(self, github_url: str) -> Optional[str]:
+        """
+        Fetch README content from a GitHub repository.
+
+        Args:
+            github_url: GitHub repository URL
+
+        Returns:
+            README content as string, or None if not found
+        """
+        try:
+            # Parse the GitHub URL to get owner and repo
+            parsed = urlparse(github_url)
+            path_parts = [p for p in parsed.path.split("/") if p]
+
+            if len(path_parts) < 2:
+                return None
+
+            owner, repo = path_parts[0], path_parts[1]
+
+            # Try different README file names
+            readme_files = ["README.md", "readme.md", "README.rst", "readme.rst", "README.txt", "readme.txt"]
+
+            for readme_file in readme_files:
+                # Use GitHub's raw content URL, trying the 'main' branch first
+                raw_url = f"https://raw.githubusercontent.com/{owner}/{repo}/main/{readme_file}"
+                logger.debug(f"Trying {raw_url}")
+                try:
+                    response = requests.get(raw_url, timeout=10)
+                    if response.status_code == 200:
+                        logger.info(f"Successfully fetched {readme_file} from {github_url}")
+                        return response.text
+                except requests.RequestException:
+                    continue
+
+                # Fall back to the 'master' branch if 'main' doesn't work
+                raw_url = f"https://raw.githubusercontent.com/{owner}/{repo}/master/{readme_file}"
+                logger.debug(f"Trying {raw_url}")
+                try:
+                    response = requests.get(raw_url, timeout=10)
+                    if response.status_code == 200:
+                        logger.info(f"Successfully fetched {readme_file} from {github_url} (master branch)")
+                        return response.text
+                except requests.RequestException:
+                    continue
+
+            logger.warning(f"Could not find README file for {github_url}")
+            return None
+
+        except Exception as e:
+            logger.warning(f"Error fetching GitHub README from {github_url}: {e}")
+            return None
+
+    def create_master_index(self, models_by_task: Dict[str, List[Dict]]) -> None:
+        """
+        Create a master index file for all tasks and models.
+
+        Args:
+            models_by_task: Dictionary of models organized by task
+        """
+        content = """# Hugging Face Models Documentation
+
+This documentation provides comprehensive tutorials for top Hugging Face models across different tasks.
+ +## Available Models by Task + +""" + + # Group models by task and create links + for task, models in sorted(models_by_task.items()): + content += f"### {task.replace('-', ' ').title()} ({len(models)} models)\n\n" + + for model in models: + safe_model_name = self.sanitize_filename(model["model_id"].replace("/", "_")) + filename = f"[{task}] {safe_model_name}.md" + content += f"- [{model['model_id']}](./{filename})\n" + content += "\n" + + content += """ + +## Quick Navigation by Category + +### Natural Language Processing +""" + + nlp_tasks = [ + task + for task in models_by_task.keys() + if any( + nlp_term in task.lower() + for nlp_term in [ + "text", + "language", + "translation", + "question", + "summarization", + "classification", + "generation", + "fill-mask", + "token-classification", + ] + ) + ] + for task in sorted(nlp_tasks): + models = models_by_task[task] + content += f"- **{task}**: {len(models)} models\n" + + content += """ +### Computer Vision +""" + + cv_tasks = [ + task + for task in models_by_task.keys() + if any( + cv_term in task.lower() + for cv_term in [ + "image", + "vision", + "detection", + "segmentation", + "depth", + "keypoint", + "object", + "video", + "unconditional", + ] + ) + ] + for task in sorted(cv_tasks): + models = models_by_task[task] + content += f"- **{task}**: {len(models)} models\n" + + content += """ +### Audio Processing +""" + + audio_tasks = [ + task + for task in models_by_task.keys() + if any( + audio_term in task.lower() + for audio_term in ["audio", "speech", "voice", "automatic-speech-recognition"] + ) + ] + for task in sorted(audio_tasks): + models = models_by_task[task] + content += f"- **{task}**: {len(models)} models\n" + + content += """ +### Multimodal +""" + + multimodal_tasks = [ + task + for task in models_by_task.keys() + if any( + mm_term in task.lower() + for mm_term in [ + "any-to-any", + "visual-question", + "document-question", + "image-text", + "video-text", + "audio-text", + ] + ) + ] + for task in sorted(multimodal_tasks): + models = models_by_task[task] + content += f"- **{task}**: {len(models)} models\n" + + content += f""" + +## Statistics + +- **Total Tasks**: {len(models_by_task)} +- **Total Models**: {sum(len(models) for models in models_by_task.values())} +- **Tutorial Files**: {sum(len(models) for models in models_by_task.values())} + +## How to Use These Tutorials + +Each tutorial file contains: + +1. **Model Overview**: Basic information and statistics +2. **Description**: Detailed model description from Hugging Face +3. **Full Documentation**: Complete README content from the model page +4. **Quick Start**: Ready-to-use code examples +5. **Integration Tips**: Best practices and optimization suggestions + +## Common Usage Patterns + +### For Text Tasks +```python +from transformers import pipeline + +# Quick pipeline approach +pipe = pipeline("task-name", model="model-id") +result = pipe("Your input text") +``` + +### For Vision Tasks +```python +from transformers import pipeline + +# Image processing pipeline +pipe = pipeline("image-classification", model="model-id") +result = pipe("path/to/image.jpg") +``` + +### For Audio Tasks +```python +from transformers import pipeline + +# Audio processing pipeline +pipe = pipeline("automatic-speech-recognition", model="model-id") +result = pipe("path/to/audio.wav") +``` + +## Installation Requirements + +```bash +pip install transformers torch torchvision torchaudio +``` + +For specific models, additional dependencies might be required. 
Check individual tutorial files for model-specific requirements. + +--- + +*This documentation was automatically generated from the top Hugging Face models across all tasks.* +""" + + index_file = self.output_dir / "README.md" + with open(index_file, "w", encoding="utf-8") as f: + f.write(content) + + def register_tool(self) -> None: + """ + Register Hugging Face as a tool in the registry. + """ + logger.info("Registering Hugging Face tool...") + + # Define tool information + tool_name = "huggingface" + version = "1.0.0" + description = "Here we collect top liked/downloaded models from huggingface for each task." + + features = ["All tasks supported in huggingface are available."] + + requirements = [] + + prompt_template = [] + + mlzero_dir = Path(__file__).parent.parent + + # Always load default config first + default_config_path = mlzero_dir / "configs" / "default.yaml" + if not default_config_path.exists(): + raise FileNotFoundError(f"Default config file not found: {default_config_path}") + + config = OmegaConf.load(default_config_path) + + # Register the tool + self.registry.register_tool( + name=tool_name, + version=version, + description=description, + features=features, + requirements=requirements, + prompt_template=prompt_template, + tutorials_path=self.output_dir, + condense=True, + max_length=16384, # Reasonable length for condensed tutorials + llm_config=config.llm, # TODO: add customizable config + ) + + logger.info(f"Successfully registered {tool_name} tool with {len(features)} features") + + @staticmethod + def sanitize_filename(filename: str) -> str: + """ + Sanitize filename for cross-platform compatibility. + + Args: + filename: Original filename + + Returns: + Sanitized filename + """ + # Replace problematic characters + filename = filename.replace("/", "_").replace("\\", "_") + filename = filename.replace(":", "_").replace("*", "_") + filename = filename.replace("?", "_").replace('"', "_") + filename = filename.replace("<", "_").replace(">", "_") + filename = filename.replace("|", "_").replace(" ", "_") + + # Remove consecutive underscores + while "__" in filename: + filename = filename.replace("__", "_") + + return filename.strip("_") + + def run_registration_process(self) -> None: + """ + Execute the complete registration process. 
+ """ + try: + logger.info("Starting Hugging Face tool registration process...") + + # Step 1: Fetch top models + models_by_task = self.fetch_top_models() + + # Step 2: Create documentation + self.create_model_documentation(models_by_task) + + # Step 3: Register the tool + self.register_tool() + + logger.info("Hugging Face tool registration completed successfully!") + logger.info(f"Documentation created in: {self.output_dir.absolute()}") + + # Print summary + total_models = sum(len(models) for models in models_by_task.values()) + logger.info("Summary:") + logger.info(f" - Tasks processed: {len(models_by_task)}") + logger.info(f" - Models documented: {total_models}") + logger.info( + f" - Documentation files created: {sum(len(models) + 1 for models in models_by_task.values()) + 1}" + ) + + except Exception as e: + logger.error(f"Registration process failed: {e}") + raise diff --git a/src/autogluon/assistant/tools_registry/registry.py b/src/autogluon/assistant/tools_registry/registry.py index 7349b0c0..30f80fc3 100644 --- a/src/autogluon/assistant/tools_registry/registry.py +++ b/src/autogluon/assistant/tools_registry/registry.py @@ -155,7 +155,7 @@ def add_tool_tutorials( condense: bool = True, llm_config=None, max_length: int = 9999, - chunk_size: int = 8192, # Size of chunks for processing + chunk_size: int = 16384, # Size of chunks for processing ) -> None: """ Add tutorials to a registered tool, with option to condense them using LLM. @@ -213,17 +213,24 @@ def add_tool_tutorials( for i, chunk in enumerate(chunks): context = "This is a continuation of the previous chunk. " if i > 0 else "" - chunk_prompt = f"""{context}Condense this portion of the tutorial while preserving essential implementation details, code samples, and key concepts. Focus on: + chunk_prompt = f"""{context}Condense this portion of the tutorial while preserving essential implementation details, code samples, and key concepts. Remove unnecessary information that is not helpful for using the model, such as benchmarking results, citations, performance comparisons, research paper references, and promotional content. Focus on: 1. Implementation details and techniques 2. Code snippets with necessary context 3. Critical configurations and parameters 4. 
Important warnings and best practices +Remove or minimize: +- Benchmarking results and performance metrics +- Academic citations and paper references +- Marketing language and promotional content +- Historical context unless directly relevant to implementation +- Theoretical background that doesn't impact practical usage + Chunk {i+1}/{len(chunks)}: {chunk} -Provide the condensed content in markdown format.""" +Provide the condensed content in markdown format, focusing on actionable information for practitioners.""" condensed_chunk = llm.assistant_chat(chunk_prompt) condensed_chunks.append(condensed_chunk) diff --git a/src/autogluon/assistant/tools_registry/utils.py b/src/autogluon/assistant/tools_registry/utils.py index 0cda47bd..b0e8ac9d 100644 --- a/src/autogluon/assistant/tools_registry/utils.py +++ b/src/autogluon/assistant/tools_registry/utils.py @@ -1,4 +1,13 @@ -from typing import List +import json +import re +import time +from collections import defaultdict +from typing import Dict, List +from urllib.parse import urljoin + +import pandas as pd +import requests +from bs4 import BeautifulSoup def split_markdown_into_chunks(content: str, max_chunk_size: int = 4000) -> List[str]: @@ -122,3 +131,645 @@ def _split_large_section(section: str, max_chunk_size: int) -> List[str]: chunks.append("\n".join(current_chunk)) return chunks + + +class HuggingFaceModelsFetcher: + def __init__(self): + self.base_url = "https://huggingface.co/api" + self.session = requests.Session() + self.session.headers.update({"User-Agent": "HF-Models-Fetcher/1.0"}) + + def get_models_by_task(self, task: str, sort_by: str = "likes", limit: int = 10) -> List[Dict]: + """Get models filtered by specific task""" + url = f"{self.base_url}/models" + params = {"pipeline_tag": task, "sort": sort_by, "limit": limit, "full": "true"} + + try: + response = self.session.get(url, params=params) + response.raise_for_status() + return response.json() + except requests.RequestException as e: + print(f"Error fetching models for task {task}: {e}") + return [] + + def get_all_pipeline_tags(self) -> List[str]: + """Get all available pipeline tags/tasks from Hugging Face""" + return [ + # Multimodal + "any-to-any", + "audio-text-to-text", + "document-question-answering", + "visual-document-retrieval", + "image-text-to-text", + "video-text-to-text", + "visual-question-answering", + # Natural Language Processing + "feature-extraction", + "fill-mask", + "question-answering", + "sentence-similarity", + "summarization", + "table-question-answering", + "text-classification", + "text-generation", + "text-ranking", + "token-classification", + "translation", + "zero-shot-classification", + # Computer Vision + "depth-estimation", + "image-classification", + "image-feature-extraction", + "image-segmentation", + "image-to-image", + "image-to-text", + "image-to-video", + "keypoint-detection", + "mask-generation", + "object-detection", + "video-classification", + "text-to-image", + "text-to-video", + "unconditional-image-generation", + "zero-shot-image-classification", + "zero-shot-object-detection", + "text-to-3d", + "image-to-3d", + # Audio + "audio-classification", + "audio-to-audio", + "automatic-speech-recognition", + "text-to-speech", + # Tabular + "tabular-classification", + "tabular-regression", + # Reinforcement Learning + "reinforcement-learning", + # Additional common tasks that might use different naming conventions + "conversational", + "text2text-generation", + "voice-activity-detection", + "time-series-forecasting", + "robotics", + 
"other", + ] + + def extract_model_info(self, model: Dict) -> Dict: + """Extract relevant information from model data""" + model_id = model.get("id", "") + return { + "model_id": model_id, + "url": f"https://huggingface.co/{model_id}" if model_id else "", + "author": model.get("author", ""), + "likes": model.get("likes", 0), + "downloads": model.get("downloads", 0), + "created_at": model.get("createdAt", ""), + "last_modified": model.get("lastModified", ""), + "pipeline_tag": model.get("pipeline_tag", ""), + "library_name": model.get("library_name", ""), + "tags": model.get("tags", []), + "model_size": model.get("safetensors", {}).get("total", 0) if model.get("safetensors") else 0, + } + + def get_top_models_all_tasks( + self, n: int = 10, sort_by: str = "likes", include_downloads: bool = True + ) -> Dict[str, List[Dict]]: + """ + Get top N models for each task + + Args: + n: Number of top models to fetch per task + sort_by: Sort criteria ('likes', 'downloads', 'modified', 'created') + include_downloads: Whether to also fetch top downloaded models + + Returns: + Dictionary with task names as keys and list of model info as values + """ + all_tasks_models = defaultdict(list) + pipeline_tags = self.get_all_pipeline_tags() + + print(f"Fetching top {n} models for {len(pipeline_tags)} tasks...") + + for i, task in enumerate(pipeline_tags): + print(f"Processing task {i+1}/{len(pipeline_tags)}: {task}") + + # Get top models by specified criteria (likes by default) + models = self.get_models_by_task(task, sort_by=sort_by, limit=n) + + if models: + task_models = [] + for model in models[:n]: # Ensure we only get top N + model_info = self.extract_model_info(model) + model_info["sort_criteria"] = sort_by + task_models.append(model_info) + + all_tasks_models[task] = task_models + print(f" Found {len(task_models)} models for {task}") + else: + print(f" No models found for {task}") + + # Add small delay to be respectful to the API + time.sleep(0.1) + + return dict(all_tasks_models) + + def merge_top_models( + self, liked_models: Dict[str, List[Dict]], downloaded_models: Dict[str, List[Dict]], max_per_task: int = 15 + ) -> Dict[str, List[Dict]]: + """ + Merge liked and downloaded models, removing duplicates and keeping top models + + Args: + liked_models: Dictionary of models sorted by likes + downloaded_models: Dictionary of models sorted by downloads + max_per_task: Maximum number of models to keep per task + + Returns: + Dictionary of merged top models per task + """ + merged_models = defaultdict(list) + + # Get all unique tasks + all_tasks = set(liked_models.keys()) | set(downloaded_models.keys()) + + for task in all_tasks: + liked_list = liked_models.get(task, []) + downloaded_list = downloaded_models.get(task, []) + + # Create a dictionary to track unique models by model_id + unique_models = {} + + # Add liked models first + for model in liked_list: + model_id = model["model_id"] + if model_id not in unique_models: + model_copy = model.copy() + model_copy["source"] = "liked" + unique_models[model_id] = model_copy + + # Add downloaded models, updating existing entries + for model in downloaded_list: + model_id = model["model_id"] + if model_id in unique_models: + # Model exists in both lists, mark as both + unique_models[model_id]["source"] = "both" + else: + # New model from downloads + model_copy = model.copy() + model_copy["source"] = "downloaded" + unique_models[model_id] = model_copy + + # Convert back to list and sort by a composite score + models_list = list(unique_models.values()) + + # Sort 
by composite score: prioritize models that appear in both lists, + # then by likes + normalized downloads + def composite_score(model): + base_score = model.get("likes", 0) + (model.get("downloads", 0) / 1000) # Normalize downloads + if model["source"] == "both": + base_score *= 1.5 # Bonus for appearing in both lists + return base_score + + models_list.sort(key=composite_score, reverse=True) + + # Keep only top models per task + merged_models[task] = models_list[:max_per_task] + + print(f"Task {task}: {len(models_list)} unique models merged, keeping top {len(merged_models[task])}") + + return dict(merged_models) + + def save_to_json(self, data: Dict, filename: str = "top_hf_models.json"): + """Save results to JSON file""" + try: + with open(filename, "w", encoding="utf-8") as f: + json.dump(data, f, indent=2, ensure_ascii=False) + print(f"Results saved to {filename}") + except Exception as e: + print(f"Error saving to JSON: {e}") + + def save_to_csv(self, data: Dict, filename: str = "top_hf_models.csv"): + """Save results to CSV file""" + try: + # Flatten the data for CSV format + rows = [] + for task, models in data.items(): + for model in models: + row = model.copy() + row["task"] = task + rows.append(row) + + df = pd.DataFrame(rows) + df.to_csv(filename, index=False) + print(f"Results saved to {filename}") + except Exception as e: + print(f"Error saving to CSV: {e}") + + def print_summary(self, data: Dict, title: str = "TOP MODELS BY TASK"): + """Print a summary of the results""" + print("\n" + "=" * 60) + print(title) + print("=" * 60) + + total_models = sum(len(models) for models in data.values()) + print(f"Total tasks processed: {len(data)}") + print(f"Total models found: {total_models}") + + print("\nTop 3 most liked models overall:") + all_models = [] + for task, models in data.items(): + for model in models: + model_copy = model.copy() + model_copy["task"] = task + all_models.append(model_copy) + + # Sort by likes + top_overall = sorted(all_models, key=lambda x: x.get("likes", 0), reverse=True)[:3] + for i, model in enumerate(top_overall, 1): + source_info = f" ({model.get('source', 'unknown')})" if "source" in model else "" + print(f"{i}. {model['model_id']} ({model['task']}) - {model['likes']} likes{source_info}") + + print("\nTasks with most models available:") + task_counts = [(task, len(models)) for task, models in data.items()] + task_counts.sort(key=lambda x: x[1], reverse=True) + for task, count in task_counts[:5]: + print(f" {task}: {count} models") + + # Show source distribution if available + if all_models and "source" in all_models[0]: + source_counts = defaultdict(int) + for model in all_models: + source_counts[model.get("source", "unknown")] += 1 + + print("\nModel source distribution:") + for source, count in source_counts.items(): + print(f" {source}: {count} models") + + +class HuggingFaceModelScraper: + def __init__(self, delay: float = 1.0): + """ + Initialize the scraper with optional delay between requests. + + Args: + delay: Delay in seconds between requests to be respectful + """ + self.session = requests.Session() + self.session.headers.update( + { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" + } + ) + self.delay = delay + self.base_url = "https://huggingface.co" + + def extract_model_content(self, url: str) -> Dict: + """ + Extract all relevant content from a Hugging Face model page. 
+ + Args: + url: The Hugging Face model page URL + + Returns: + Dictionary containing extracted model information + """ + try: + # Add delay to be respectful + time.sleep(self.delay) + + response = self.session.get(url) + response.raise_for_status() + + soup = BeautifulSoup(response.content, "html.parser") + + model_data = { + "url": url, + "model_name": self._extract_model_name(soup, url), + "description": self._extract_description(soup), + "readme_content": self._extract_readme(soup), + "metadata": self._extract_metadata(soup), + "tags": self._extract_tags(soup), + "model_card": self._extract_model_card(soup), + "files": self._extract_files_info(soup), + "pipeline_tag": self._extract_pipeline_tag(soup), + "library_name": self._extract_library_name(soup), + } + + return model_data + + except requests.RequestException as e: + print(f"Error fetching URL {url}: {e}") + return {"error": str(e), "url": url} + except Exception as e: + print(f"Error processing content: {e}") + return {"error": str(e), "url": url} + + def _extract_model_name(self, soup: BeautifulSoup, url: str) -> str: + """Extract model name from the page.""" + # Try to get from title tag + title_tag = soup.find("title") + if title_tag: + title = title_tag.get_text().strip() + # Remove " · Hugging Face" suffix if present + if " · Hugging Face" in title: + return title.replace(" · Hugging Face", "").strip() + + # Fallback: extract from URL + return url.split("/")[-1] if url.split("/") else "Unknown" + + def _extract_description(self, soup: BeautifulSoup) -> str: + """Extract model description.""" + # Look for meta description + meta_desc = soup.find("meta", attrs={"name": "description"}) + if meta_desc and meta_desc.get("content"): + return meta_desc.get("content").strip() + + # Look for description in various possible locations + desc_selectors = [ + 'div[data-target="ModelHeader"] p', + ".model-card-description", + "div.text-gray-700", + "p.text-gray-600", + ] + + for selector in desc_selectors: + desc_elem = soup.select_one(selector) + if desc_elem: + return desc_elem.get_text().strip() + + return "" + + def _extract_readme(self, soup: BeautifulSoup) -> str: + """Extract README/model card content in original markdown format.""" + # First try to get raw markdown from API endpoint + raw_markdown = self._get_raw_markdown_from_api(soup) + if raw_markdown: + return raw_markdown + + # Fallback: Try to extract from page source + raw_markdown = self._extract_markdown_from_page(soup) + if raw_markdown: + return raw_markdown + + # Last resort: Convert HTML back to approximate markdown + return self._convert_html_to_markdown(soup) + + def _get_raw_markdown_from_api(self, soup: BeautifulSoup) -> str: + """Try to get raw markdown content from HuggingFace API.""" + try: + # Extract model path from current URL + current_url = soup.find("link", {"rel": "canonical"}) + if not current_url: + return "" + + url = current_url.get("href", "") + if not url: + return "" + + # Parse model owner/name from URL + parts = url.replace("https://huggingface.co/", "").split("/") + if len(parts) < 2: + return "" + + model_path = f"{parts[0]}/{parts[1]}" + + # Try to fetch raw README.md from the API + api_url = f"https://huggingface.co/{model_path}/raw/main/README.md" + + time.sleep(self.delay) # Be respectful + response = self.session.get(api_url) + + if response.status_code == 200: + return response.text + + except Exception as e: + print(f"Could not fetch raw markdown from API: {e}") + + return "" + + def _extract_markdown_from_page(self, soup: 
BeautifulSoup) -> str: + """Try to extract markdown from script tags or data attributes.""" + # Look for script tags that might contain markdown + scripts = soup.find_all("script") + for script in scripts: + if script.string: + # Look for markdown content in various formats + if "README.md" in script.string or "# " in script.string: + # Try to extract markdown from JSON data + try: + import json + + # Look for JSON that might contain markdown + json_matches = re.findall(r"\{.*?\}", script.string, re.DOTALL) + for match in json_matches: + try: + data = json.loads(match) + if isinstance(data, dict): + # Look for markdown in various keys + for key in ["content", "markdown", "readme", "text"]: + if key in data and isinstance(data[key], str): + if len(data[key]) > 100 and "#" in data[key]: + return data[key] + except json.JSONDecodeError: + continue + except: + pass + + return "" + + def _convert_html_to_markdown(self, soup: BeautifulSoup) -> str: + """Convert HTML content back to approximate markdown format.""" + # Look for the main content area + readme_selectors = [ + 'div[data-target="ModelHeader"] + div', + ".prose", + "div.markdown", + "article", + 'div[class*="readme"]', + "div.model-card", + ] + + content_elem = None + for selector in readme_selectors: + content_elem = soup.select_one(selector) + if content_elem: + break + + if not content_elem: + return "" + + # Convert HTML elements to markdown + markdown_content = [] + + for element in content_elem.find_all( + [ + "h1", + "h2", + "h3", + "h4", + "h5", + "h6", + "p", + "ul", + "ol", + "li", + "pre", + "code", + "blockquote", + "a", + "strong", + "em", + ] + ): + if element.name in ["h1", "h2", "h3", "h4", "h5", "h6"]: + level = int(element.name[1]) + markdown_content.append(f"{'#' * level} {element.get_text().strip()}\n") + + elif element.name == "p": + text = element.get_text().strip() + if text: + markdown_content.append(f"{text}\n") + + elif element.name == "pre": + code_text = element.get_text() + # Check if it's a code block + if element.find("code"): + markdown_content.append(f"```\n{code_text}\n```\n") + else: + markdown_content.append(f"```\n{code_text}\n```\n") + + elif element.name == "code" and element.parent.name != "pre": + markdown_content.append(f"`{element.get_text()}`") + + elif element.name == "ul": + for li in element.find_all("li", recursive=False): + markdown_content.append(f"- {li.get_text().strip()}\n") + markdown_content.append("\n") + + elif element.name == "ol": + for i, li in enumerate(element.find_all("li", recursive=False), 1): + markdown_content.append(f"{i}. 
{li.get_text().strip()}\n") + markdown_content.append("\n") + + elif element.name == "blockquote": + quote_text = element.get_text().strip() + for line in quote_text.split("\n"): + if line.strip(): + markdown_content.append(f"> {line.strip()}\n") + markdown_content.append("\n") + + elif element.name == "a": + href = element.get("href", "") + text = element.get_text().strip() + if href and text: + markdown_content.append(f"[{text}]({href})") + + elif element.name == "strong": + markdown_content.append(f"**{element.get_text()}**") + + elif element.name == "em": + markdown_content.append(f"*{element.get_text()}*") + + # Join and clean up the markdown + markdown_text = "".join(markdown_content) + + # Clean up extra newlines + markdown_text = re.sub(r"\n{3,}", "\n\n", markdown_text) + + return markdown_text.strip() + + def _extract_metadata(self, soup: BeautifulSoup) -> Dict: + """Extract model metadata like downloads, likes, etc.""" + metadata = {} + + # Look for download count + download_elem = soup.find(text=re.compile(r"\d+\s*downloads?")) + if download_elem: + downloads = re.search(r"([\d,]+)\s*downloads?", download_elem, re.I) + if downloads: + metadata["downloads"] = downloads.group(1).replace(",", "") + + # Look for likes + like_elem = soup.find(text=re.compile(r"\d+\s*likes?")) + if like_elem: + likes = re.search(r"(\d+)\s*likes?", like_elem, re.I) + if likes: + metadata["likes"] = likes.group(1) + + # Look for model size + size_elem = soup.find(text=re.compile(r"\d+\.?\d*\s*[KMGT]?B")) + if size_elem: + size = re.search(r"(\d+\.?\d*\s*[KMGT]?B)", size_elem) + if size: + metadata["model_size"] = size.group(1) + + return metadata + + def _extract_tags(self, soup: BeautifulSoup) -> List[str]: + """Extract model tags.""" + tags = [] + + # Look for tag elements + tag_selectors = ["span.tag", ".badge", "[data-tag]", 'span[class*="tag"]'] + + for selector in tag_selectors: + tag_elems = soup.select(selector) + for elem in tag_elems: + tag_text = elem.get_text().strip() + if tag_text and tag_text not in tags: + tags.append(tag_text) + + return tags + + def _extract_model_card(self, soup: BeautifulSoup) -> Dict: + """Extract structured model card information.""" + model_card = {} + + # Look for JSON-LD structured data + json_scripts = soup.find_all("script", type="application/ld+json") + for script in json_scripts: + try: + data = json.loads(script.string) + if isinstance(data, dict): + model_card.update(data) + except json.JSONDecodeError: + continue + + return model_card + + def _extract_files_info(self, soup: BeautifulSoup) -> List[Dict]: + """Extract information about model files.""" + files = [] + + # Look for file listings + file_elems = soup.select('a[href*="/blob/"], a[href*="/resolve/"]') + for elem in file_elems: + href = elem.get("href", "") + filename = href.split("/")[-1] if href else "" + if filename: + files.append({"filename": filename, "url": urljoin(self.base_url, href)}) + + return files + + def _extract_pipeline_tag(self, soup: BeautifulSoup) -> str: + """Extract the pipeline tag/task type.""" + # Look for pipeline tag in various locations + pipeline_selectors = ["[data-pipeline-tag]", 'span[class*="pipeline"]', 'div[class*="task"]'] + + for selector in pipeline_selectors: + elem = soup.select_one(selector) + if elem: + return elem.get("data-pipeline-tag") or elem.get_text().strip() + + return "" + + def _extract_library_name(self, soup: BeautifulSoup) -> str: + """Extract the library name (e.g., transformers, sentence-transformers).""" + # Look for library information + 
lib_elem = soup.find(text=re.compile(r"(transformers|sentence-transformers|diffusers|timm)", re.I)) + if lib_elem: + match = re.search(r"(transformers|sentence-transformers|diffusers|timm)", lib_elem, re.I) + if match: + return match.group(1).lower() + + return "" diff --git a/tools/get_hf_model_descriptions.py b/tools/get_hf_model_descriptions.py deleted file mode 100644 index 5fc1efe4..00000000 --- a/tools/get_hf_model_descriptions.py +++ /dev/null @@ -1,444 +0,0 @@ -#!/usr/bin/env python3 -""" -Hugging Face Model Page Content Extractor - -This script extracts content from Hugging Face model pages including: -- Model name and description -- README content -- Model metadata (downloads, likes, etc.) -- Model card information -- Tags and task information -""" - -import json -import re -import time -from typing import Dict, List -from urllib.parse import urljoin - -import requests -from bs4 import BeautifulSoup - - -class HuggingFaceModelScraper: - def __init__(self, delay: float = 1.0): - """ - Initialize the scraper with optional delay between requests. - - Args: - delay: Delay in seconds between requests to be respectful - """ - self.session = requests.Session() - self.session.headers.update( - { - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" - } - ) - self.delay = delay - self.base_url = "https://huggingface.co" - - def extract_model_content(self, url: str) -> Dict: - """ - Extract all relevant content from a Hugging Face model page. - - Args: - url: The Hugging Face model page URL - - Returns: - Dictionary containing extracted model information - """ - try: - # Add delay to be respectful - time.sleep(self.delay) - - response = self.session.get(url) - response.raise_for_status() - - soup = BeautifulSoup(response.content, "html.parser") - - model_data = { - "url": url, - "model_name": self._extract_model_name(soup, url), - "description": self._extract_description(soup), - "readme_content": self._extract_readme(soup), - "metadata": self._extract_metadata(soup), - "tags": self._extract_tags(soup), - "model_card": self._extract_model_card(soup), - "files": self._extract_files_info(soup), - "pipeline_tag": self._extract_pipeline_tag(soup), - "library_name": self._extract_library_name(soup), - } - - return model_data - - except requests.RequestException as e: - print(f"Error fetching URL {url}: {e}") - return {"error": str(e), "url": url} - except Exception as e: - print(f"Error processing content: {e}") - return {"error": str(e), "url": url} - - def _extract_model_name(self, soup: BeautifulSoup, url: str) -> str: - """Extract model name from the page.""" - # Try to get from title tag - title_tag = soup.find("title") - if title_tag: - title = title_tag.get_text().strip() - # Remove " · Hugging Face" suffix if present - if " · Hugging Face" in title: - return title.replace(" · Hugging Face", "").strip() - - # Fallback: extract from URL - return url.split("/")[-1] if url.split("/") else "Unknown" - - def _extract_description(self, soup: BeautifulSoup) -> str: - """Extract model description.""" - # Look for meta description - meta_desc = soup.find("meta", attrs={"name": "description"}) - if meta_desc and meta_desc.get("content"): - return meta_desc.get("content").strip() - - # Look for description in various possible locations - desc_selectors = [ - 'div[data-target="ModelHeader"] p', - ".model-card-description", - "div.text-gray-700", - "p.text-gray-600", - ] - - for selector in desc_selectors: - desc_elem = 
soup.select_one(selector) - if desc_elem: - return desc_elem.get_text().strip() - - return "" - - def _extract_readme(self, soup: BeautifulSoup) -> str: - """Extract README/model card content in original markdown format.""" - # First try to get raw markdown from API endpoint - raw_markdown = self._get_raw_markdown_from_api(soup) - if raw_markdown: - return raw_markdown - - # Fallback: Try to extract from page source - raw_markdown = self._extract_markdown_from_page(soup) - if raw_markdown: - return raw_markdown - - # Last resort: Convert HTML back to approximate markdown - return self._convert_html_to_markdown(soup) - - def _get_raw_markdown_from_api(self, soup: BeautifulSoup) -> str: - """Try to get raw markdown content from HuggingFace API.""" - try: - # Extract model path from current URL - current_url = soup.find("link", {"rel": "canonical"}) - if not current_url: - return "" - - url = current_url.get("href", "") - if not url: - return "" - - # Parse model owner/name from URL - parts = url.replace("https://huggingface.co/", "").split("/") - if len(parts) < 2: - return "" - - model_path = f"{parts[0]}/{parts[1]}" - - # Try to fetch raw README.md from the API - api_url = f"https://huggingface.co/{model_path}/raw/main/README.md" - - time.sleep(self.delay) # Be respectful - response = self.session.get(api_url) - - if response.status_code == 200: - return response.text - - except Exception as e: - print(f"Could not fetch raw markdown from API: {e}") - - return "" - - def _extract_markdown_from_page(self, soup: BeautifulSoup) -> str: - """Try to extract markdown from script tags or data attributes.""" - # Look for script tags that might contain markdown - scripts = soup.find_all("script") - for script in scripts: - if script.string: - # Look for markdown content in various formats - if "README.md" in script.string or "# " in script.string: - # Try to extract markdown from JSON data - try: - import json - - # Look for JSON that might contain markdown - json_matches = re.findall(r"\{.*?\}", script.string, re.DOTALL) - for match in json_matches: - try: - data = json.loads(match) - if isinstance(data, dict): - # Look for markdown in various keys - for key in ["content", "markdown", "readme", "text"]: - if key in data and isinstance(data[key], str): - if len(data[key]) > 100 and "#" in data[key]: - return data[key] - except json.JSONDecodeError: - continue - except: - pass - - return "" - - def _convert_html_to_markdown(self, soup: BeautifulSoup) -> str: - """Convert HTML content back to approximate markdown format.""" - # Look for the main content area - readme_selectors = [ - 'div[data-target="ModelHeader"] + div', - ".prose", - "div.markdown", - "article", - 'div[class*="readme"]', - "div.model-card", - ] - - content_elem = None - for selector in readme_selectors: - content_elem = soup.select_one(selector) - if content_elem: - break - - if not content_elem: - return "" - - # Convert HTML elements to markdown - markdown_content = [] - - for element in content_elem.find_all( - [ - "h1", - "h2", - "h3", - "h4", - "h5", - "h6", - "p", - "ul", - "ol", - "li", - "pre", - "code", - "blockquote", - "a", - "strong", - "em", - ] - ): - if element.name in ["h1", "h2", "h3", "h4", "h5", "h6"]: - level = int(element.name[1]) - markdown_content.append(f"{'#' * level} {element.get_text().strip()}\n") - - elif element.name == "p": - text = element.get_text().strip() - if text: - markdown_content.append(f"{text}\n") - - elif element.name == "pre": - code_text = element.get_text() - # Check if it's a code block 
- if element.find("code"): - markdown_content.append(f"```\n{code_text}\n```\n") - else: - markdown_content.append(f"```\n{code_text}\n```\n") - - elif element.name == "code" and element.parent.name != "pre": - markdown_content.append(f"`{element.get_text()}`") - - elif element.name == "ul": - for li in element.find_all("li", recursive=False): - markdown_content.append(f"- {li.get_text().strip()}\n") - markdown_content.append("\n") - - elif element.name == "ol": - for i, li in enumerate(element.find_all("li", recursive=False), 1): - markdown_content.append(f"{i}. {li.get_text().strip()}\n") - markdown_content.append("\n") - - elif element.name == "blockquote": - quote_text = element.get_text().strip() - for line in quote_text.split("\n"): - if line.strip(): - markdown_content.append(f"> {line.strip()}\n") - markdown_content.append("\n") - - elif element.name == "a": - href = element.get("href", "") - text = element.get_text().strip() - if href and text: - markdown_content.append(f"[{text}]({href})") - - elif element.name == "strong": - markdown_content.append(f"**{element.get_text()}**") - - elif element.name == "em": - markdown_content.append(f"*{element.get_text()}*") - - # Join and clean up the markdown - markdown_text = "".join(markdown_content) - - # Clean up extra newlines - markdown_text = re.sub(r"\n{3,}", "\n\n", markdown_text) - - return markdown_text.strip() - - def _extract_metadata(self, soup: BeautifulSoup) -> Dict: - """Extract model metadata like downloads, likes, etc.""" - metadata = {} - - # Look for download count - download_elem = soup.find(text=re.compile(r"\d+\s*downloads?")) - if download_elem: - downloads = re.search(r"([\d,]+)\s*downloads?", download_elem, re.I) - if downloads: - metadata["downloads"] = downloads.group(1).replace(",", "") - - # Look for likes - like_elem = soup.find(text=re.compile(r"\d+\s*likes?")) - if like_elem: - likes = re.search(r"(\d+)\s*likes?", like_elem, re.I) - if likes: - metadata["likes"] = likes.group(1) - - # Look for model size - size_elem = soup.find(text=re.compile(r"\d+\.?\d*\s*[KMGT]?B")) - if size_elem: - size = re.search(r"(\d+\.?\d*\s*[KMGT]?B)", size_elem) - if size: - metadata["model_size"] = size.group(1) - - return metadata - - def _extract_tags(self, soup: BeautifulSoup) -> List[str]: - """Extract model tags.""" - tags = [] - - # Look for tag elements - tag_selectors = ["span.tag", ".badge", "[data-tag]", 'span[class*="tag"]'] - - for selector in tag_selectors: - tag_elems = soup.select(selector) - for elem in tag_elems: - tag_text = elem.get_text().strip() - if tag_text and tag_text not in tags: - tags.append(tag_text) - - return tags - - def _extract_model_card(self, soup: BeautifulSoup) -> Dict: - """Extract structured model card information.""" - model_card = {} - - # Look for JSON-LD structured data - json_scripts = soup.find_all("script", type="application/ld+json") - for script in json_scripts: - try: - data = json.loads(script.string) - if isinstance(data, dict): - model_card.update(data) - except json.JSONDecodeError: - continue - - return model_card - - def _extract_files_info(self, soup: BeautifulSoup) -> List[Dict]: - """Extract information about model files.""" - files = [] - - # Look for file listings - file_elems = soup.select('a[href*="/blob/"], a[href*="/resolve/"]') - for elem in file_elems: - href = elem.get("href", "") - filename = href.split("/")[-1] if href else "" - if filename: - files.append({"filename": filename, "url": urljoin(self.base_url, href)}) - - return files - - def 
_extract_pipeline_tag(self, soup: BeautifulSoup) -> str: - """Extract the pipeline tag/task type.""" - # Look for pipeline tag in various locations - pipeline_selectors = ["[data-pipeline-tag]", 'span[class*="pipeline"]', 'div[class*="task"]'] - - for selector in pipeline_selectors: - elem = soup.select_one(selector) - if elem: - return elem.get("data-pipeline-tag") or elem.get_text().strip() - - return "" - - def _extract_library_name(self, soup: BeautifulSoup) -> str: - """Extract the library name (e.g., transformers, sentence-transformers).""" - # Look for library information - lib_elem = soup.find(text=re.compile(r"(transformers|sentence-transformers|diffusers|timm)", re.I)) - if lib_elem: - match = re.search(r"(transformers|sentence-transformers|diffusers|timm)", lib_elem, re.I) - if match: - return match.group(1).lower() - - return "" - - -def main(): - """Example usage of the scraper.""" - scraper = HuggingFaceModelScraper(delay=1.0) - - # Example URLs - test_urls = [ - "https://huggingface.co/sentence-transformers/all-MiniLM-L6-v2", - "https://huggingface.co/bert-base-uncased", - "https://huggingface.co/gpt2", - ] - - for url in test_urls: - print(f"\n{'='*60}") - print(f"Extracting content from: {url}") - print("=" * 60) - - model_data = scraper.extract_model_content(url) - - if "error" in model_data: - print(f"Error: {model_data['error']}") - continue - - # Print extracted information - print(f"Model Name: {model_data['model_name']}") - print(f"Description: {model_data['description'][:200]}...") - print(f"Pipeline Tag: {model_data['pipeline_tag']}") - print(f"Library: {model_data['library_name']}") - print(f"Tags: {', '.join(model_data['tags'][:5])}") # First 5 tags - print(f"Metadata: {model_data['metadata']}") - print(f"Files: {len(model_data['files'])} files found") - - if model_data["readme_content"]: - print(f"README (first 300 chars): {model_data['readme_content'][:300]}...") - - # Save to JSON file - safe_model_name = model_data["model_name"].replace("/", "_").replace("\\", "_") - output_filename = f"{safe_model_name}_data.json" - with open(output_filename, "w", encoding="utf-8") as f: - json.dump(model_data, f, indent=2, ensure_ascii=False) - print(f"Data saved to: {output_filename}") - - # Save README to separate .md file - if model_data["readme_content"]: - readme_filename = f"{safe_model_name}_README.md" - with open(readme_filename, "w", encoding="utf-8") as f: - f.write(model_data["readme_content"]) - print(f"README saved to: {readme_filename}") - else: - print("No README content found to save") - - -if __name__ == "__main__": - main() diff --git a/tools/get_hf_models.py b/tools/get_hf_models.py deleted file mode 100644 index a849d809..00000000 --- a/tools/get_hf_models.py +++ /dev/null @@ -1,326 +0,0 @@ -#!/usr/bin/env python3 -""" -Hugging Face Top Models Fetcher -Retrieves top K trending models and most downloaded models for each task -""" -import json -import time -from collections import defaultdict -from typing import Dict, List - -import pandas as pd -import requests - - -class HuggingFaceModelsFetcher: - def __init__(self): - self.base_url = "https://huggingface.co/api" - self.session = requests.Session() - self.session.headers.update({"User-Agent": "HF-Models-Fetcher/1.0"}) - - def get_models_by_task(self, task: str, sort_by: str = "likes", limit: int = 10) -> List[Dict]: - """Get models filtered by specific task""" - url = f"{self.base_url}/models" - params = {"pipeline_tag": task, "sort": sort_by, "limit": limit, "full": "true"} - - try: - response = 
self.session.get(url, params=params) - response.raise_for_status() - return response.json() - except requests.RequestException as e: - print(f"Error fetching models for task {task}: {e}") - return [] - - def get_all_pipeline_tags(self) -> List[str]: - """Get all available pipeline tags/tasks from Hugging Face""" - return [ - # Multimodal - "any-to-any", - "audio-text-to-text", - "document-question-answering", - "visual-document-retrieval", - "image-text-to-text", - "video-text-to-text", - "visual-question-answering", - # Natural Language Processing - "feature-extraction", - "fill-mask", - "question-answering", - "sentence-similarity", - "summarization", - "table-question-answering", - "text-classification", - "text-generation", - "text-ranking", - "token-classification", - "translation", - "zero-shot-classification", - # Computer Vision - "depth-estimation", - "image-classification", - "image-feature-extraction", - "image-segmentation", - "image-to-image", - "image-to-text", - "image-to-video", - "keypoint-detection", - "mask-generation", - "object-detection", - "video-classification", - "text-to-image", - "text-to-video", - "unconditional-image-generation", - "zero-shot-image-classification", - "zero-shot-object-detection", - "text-to-3d", - "image-to-3d", - # Audio - "audio-classification", - "audio-to-audio", - "automatic-speech-recognition", - "text-to-speech", - # Tabular - "tabular-classification", - "tabular-regression", - # Reinforcement Learning - "reinforcement-learning", - # Additional common tasks that might use different naming conventions - "conversational", - "text2text-generation", - "voice-activity-detection", - "time-series-forecasting", - "robotics", - "other", - ] - - def extract_model_info(self, model: Dict) -> Dict: - """Extract relevant information from model data""" - model_id = model.get("id", "") - return { - "model_id": model_id, - "url": f"https://huggingface.co/{model_id}" if model_id else "", - "author": model.get("author", ""), - "likes": model.get("likes", 0), - "downloads": model.get("downloads", 0), - "created_at": model.get("createdAt", ""), - "last_modified": model.get("lastModified", ""), - "pipeline_tag": model.get("pipeline_tag", ""), - "library_name": model.get("library_name", ""), - "tags": model.get("tags", []), - "model_size": model.get("safetensors", {}).get("total", 0) if model.get("safetensors") else 0, - } - - def get_top_models_all_tasks( - self, n: int = 10, sort_by: str = "likes", include_downloads: bool = True - ) -> Dict[str, List[Dict]]: - """ - Get top N models for each task - - Args: - n: Number of top models to fetch per task - sort_by: Sort criteria ('likes', 'downloads', 'modified', 'created') - include_downloads: Whether to also fetch top downloaded models - - Returns: - Dictionary with task names as keys and list of model info as values - """ - all_tasks_models = defaultdict(list) - pipeline_tags = self.get_all_pipeline_tags() - - print(f"Fetching top {n} models for {len(pipeline_tags)} tasks...") - - for i, task in enumerate(pipeline_tags): - print(f"Processing task {i+1}/{len(pipeline_tags)}: {task}") - - # Get top models by specified criteria (likes by default) - models = self.get_models_by_task(task, sort_by=sort_by, limit=n) - - if models: - task_models = [] - for model in models[:n]: # Ensure we only get top N - model_info = self.extract_model_info(model) - model_info["sort_criteria"] = sort_by - task_models.append(model_info) - - all_tasks_models[task] = task_models - print(f" Found {len(task_models)} models for {task}") - 
-
-    def merge_top_models(
-        self, liked_models: Dict[str, List[Dict]], downloaded_models: Dict[str, List[Dict]], max_per_task: int = 15
-    ) -> Dict[str, List[Dict]]:
-        """
-        Merge liked and downloaded models, removing duplicates and keeping top models
-
-        Args:
-            liked_models: Dictionary of models sorted by likes
-            downloaded_models: Dictionary of models sorted by downloads
-            max_per_task: Maximum number of models to keep per task
-
-        Returns:
-            Dictionary of merged top models per task
-        """
-        merged_models = defaultdict(list)
-
-        # Get all unique tasks
-        all_tasks = set(liked_models.keys()) | set(downloaded_models.keys())
-
-        for task in all_tasks:
-            liked_list = liked_models.get(task, [])
-            downloaded_list = downloaded_models.get(task, [])
-
-            # Create a dictionary to track unique models by model_id
-            unique_models = {}
-
-            # Add liked models first
-            for model in liked_list:
-                model_id = model["model_id"]
-                if model_id not in unique_models:
-                    model_copy = model.copy()
-                    model_copy["source"] = "liked"
-                    unique_models[model_id] = model_copy
-
-            # Add downloaded models, updating existing entries
-            for model in downloaded_list:
-                model_id = model["model_id"]
-                if model_id in unique_models:
-                    # Model exists in both lists, mark as both
-                    unique_models[model_id]["source"] = "both"
-                else:
-                    # New model from downloads
-                    model_copy = model.copy()
-                    model_copy["source"] = "downloaded"
-                    unique_models[model_id] = model_copy
-
-            # Convert back to list and sort by a composite score
-            models_list = list(unique_models.values())
-
-            # Sort by composite score: prioritize models that appear in both lists,
-            # then by likes + normalized downloads
-            def composite_score(model):
-                base_score = model.get("likes", 0) + (model.get("downloads", 0) / 1000)  # Normalize downloads
-                if model["source"] == "both":
-                    base_score *= 1.5  # Bonus for appearing in both lists
-                return base_score
-
-            models_list.sort(key=composite_score, reverse=True)
-
-            # Keep only top models per task
-            merged_models[task] = models_list[:max_per_task]
-
-            print(f"Task {task}: {len(models_list)} unique models merged, keeping top {len(merged_models[task])}")
-
-        return dict(merged_models)
-
-    def save_to_json(self, data: Dict, filename: str = "top_hf_models.json"):
-        """Save results to JSON file"""
-        try:
-            with open(filename, "w", encoding="utf-8") as f:
-                json.dump(data, f, indent=2, ensure_ascii=False)
-            print(f"Results saved to {filename}")
-        except Exception as e:
-            print(f"Error saving to JSON: {e}")
-
-    def save_to_csv(self, data: Dict, filename: str = "top_hf_models.csv"):
-        """Save results to CSV file"""
-        try:
-            # Flatten the data for CSV format
-            rows = []
-            for task, models in data.items():
-                for model in models:
-                    row = model.copy()
-                    row["task"] = task
-                    rows.append(row)
-
-            df = pd.DataFrame(rows)
-            df.to_csv(filename, index=False)
-            print(f"Results saved to {filename}")
-        except Exception as e:
-            print(f"Error saving to CSV: {e}")
-
-    def print_summary(self, data: Dict, title: str = "TOP MODELS BY TASK"):
-        """Print a summary of the results"""
-        print("\n" + "=" * 60)
-        print(title)
-        print("=" * 60)
-
-        total_models = sum(len(models) for models in data.values())
-        print(f"Total tasks processed: {len(data)}")
-        print(f"Total models found: {total_models}")
-
-        print("\nTop 3 most liked models overall:")
-        all_models = []
-        for task, models in data.items():
-            for model in models:
-                model_copy = model.copy()
-                model_copy["task"] = task
-                all_models.append(model_copy)
-
-        # Sort by likes
-        top_overall = sorted(all_models, key=lambda x: x.get("likes", 0), reverse=True)[:3]
-        for i, model in enumerate(top_overall, 1):
-            source_info = f" ({model.get('source', 'unknown')})" if "source" in model else ""
-            print(f"{i}. {model['model_id']} ({model['task']}) - {model['likes']} likes{source_info}")
-
-        print("\nTasks with most models available:")
-        task_counts = [(task, len(models)) for task, models in data.items()]
-        task_counts.sort(key=lambda x: x[1], reverse=True)
-        for task, count in task_counts[:5]:
-            print(f"  {task}: {count} models")
-
-        # Show source distribution if available
-        if all_models and "source" in all_models[0]:
-            source_counts = defaultdict(int)
-            for model in all_models:
-                source_counts[model.get("source", "unknown")] += 1
-
-            print("\nModel source distribution:")
-            for source, count in source_counts.items():
-                print(f"  {source}: {count} models")
-
-
-def main():
-    """Main function to demonstrate usage"""
-    fetcher = HuggingFaceModelsFetcher()
-
-    # Get top models for each task
-    k = 3
-    print(f"Fetching top {k} most liked models for each task...")
-    top_liked_models = fetcher.get_top_models_all_tasks(n=k, sort_by="likes")
-
-    print(f"\nFetching top {k} most downloaded models for each task...")
-    top_downloaded_models = fetcher.get_top_models_all_tasks(n=k, sort_by="downloads")
-
-    # Merge the results
-    print("\nMerging liked and downloaded models...")
-    merged_top_models = fetcher.merge_top_models(
-        top_liked_models,
-        top_downloaded_models,
-        max_per_task=2 * k,
-    )
-
-    # Print summaries
-    fetcher.print_summary(top_liked_models, "TOP LIKED MODELS BY TASK")
-    fetcher.print_summary(top_downloaded_models, "TOP DOWNLOADED MODELS BY TASK")
-    fetcher.print_summary(merged_top_models, "MERGED TOP MODELS BY TASK")
-
-    # Save results
-    # fetcher.save_to_json(merged_top_models, f'top_merged_models_by_task.json')
-    fetcher.save_to_csv(merged_top_models, f"top_{k}_merged_models_by_task.csv")
-
-    print(f"\n{'='*60}")
-    print("PROCESSING COMPLETE")
-    print(f"{'='*60}")
-    print("Files saved:")
-    # print("- top_merged_models_by_task.json")
-    print(f"- top_{k}_merged_models_by_task.csv")
-
-
-if __name__ == "__main__":
-    main()
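The composite ranking removed above is easiest to see on toy data. The following standalone sketch (illustrative values, not fetched from the Hub) reproduces the dedup-and-score rule: normalized downloads are added to likes, and models appearing in both rankings get a 1.5x bonus, so heavily downloaded models can outrank heavily liked ones:

    # Standalone illustration of merge_top_models' dedup-and-rank rule (toy values).
    liked = [{"model_id": "bert-like", "likes": 900, "downloads": 1_000}]
    downloaded = [
        {"model_id": "bert-like", "likes": 900, "downloads": 1_000},
        {"model_id": "workhorse", "likes": 10, "downloads": 2_000_000},
    ]

    unique = {}
    for m in liked:
        unique.setdefault(m["model_id"], {**m, "source": "liked"})
    for m in downloaded:
        if m["model_id"] in unique:
            unique[m["model_id"]]["source"] = "both"  # present in both rankings
        else:
            unique[m["model_id"]] = {**m, "source": "downloaded"}

    def composite_score(m):
        score = m.get("likes", 0) + m.get("downloads", 0) / 1000  # downloads normalized
        return score * 1.5 if m["source"] == "both" else score  # bonus for "both"

    ranked = sorted(unique.values(), key=composite_score, reverse=True)
    print([m["model_id"] for m in ranked])
    # ['workhorse', 'bert-like']: 10 + 2000 = 2010 beats (900 + 1) * 1.5 = 1351.5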
diff --git a/tools/register_huggingface.py b/tools/register_huggingface.py
new file mode 100644
index 00000000..ca60972a
--- /dev/null
+++ b/tools/register_huggingface.py
@@ -0,0 +1,33 @@
+import logging  # needed for the --debug flag below
+
+from autogluon.assistant.tools_registry.register_huggingface import HuggingFaceToolRegistrar
+
+
+def main():
+    """
+    Main function to run the registration process.
+    """
+    import argparse
+
+    parser = argparse.ArgumentParser(description="Register Hugging Face as an ML tool")
+    parser.add_argument(
+        "--output-dir", default="hf_tutorials", help="Directory to save tutorial files (default: hf_tutorials)"
+    )
+    parser.add_argument(
+        "--top-models", type=int, default=2, help="Number of top models to fetch per task (default: 2)"
+    )
+    parser.add_argument("--debug", action="store_true", help="Enable debug logging")
+
+    args = parser.parse_args()
+
+    if args.debug:
+        logging.getLogger().setLevel(logging.DEBUG)
+
+    # Create registrar and run the process
+    registrar = HuggingFaceToolRegistrar(output_dir=args.output_dir, top_models_per_task=args.top_models)
+
+    registrar.run_registration_process()
+
+
+if __name__ == "__main__":
+    main()
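As a usage sketch, assuming the package layout introduced by this patch is installed, the CLI above is equivalent to driving the registrar directly:

    # Programmatic equivalent of:
    #   python tools/register_huggingface.py --output-dir hf_tutorials --top-models 2
    from autogluon.assistant.tools_registry.register_huggingface import HuggingFaceToolRegistrar

    registrar = HuggingFaceToolRegistrar(output_dir="hf_tutorials", top_models_per_task=2)
    registrar.run_registration_process()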