From fcfd340a2473955cee1162eec8b1736ba041557f Mon Sep 17 00:00:00 2001 From: Jaychaware Date: Mon, 28 Jul 2025 13:38:29 +0530 Subject: [PATCH 01/11] Initial changes --- db_init.sql | 49 +- server/app/db/dbmodels.py | 27 +- server/app/db/dbqueries.py | 117 ++++- .../db/migrations/001_create_label_index.sql | 124 +++++ server/app/main.py | 263 +++++++++- server/app/utils.py | 492 ++++++++++++++++++ ui/src/components/ArtifactPTable/index.jsx | 28 +- 7 files changed, 1089 insertions(+), 11 deletions(-) create mode 100644 server/app/db/migrations/001_create_label_index.sql diff --git a/db_init.sql b/db_init.sql index cb67ee88b..63e8b9118 100644 --- a/db_init.sql +++ b/db_init.sql @@ -1,7 +1,46 @@ -CREATE TABLE IF NOT EXISTS registered_servers( - id SERIAL, - server_name VARCHAR(255) NOT NULL, - host_info VARCHAR(255) NOT NULL PRIMARY KEY, - last_sync_time BIGINT DEFAULT NULL +-- Label indexing table for full-text search of CSV label content +CREATE TABLE IF NOT EXISTS label_index ( + id SERIAL PRIMARY KEY, + file_name VARCHAR(255) NOT NULL, + file_path TEXT NOT NULL, + row_index INTEGER NOT NULL, + content TEXT NOT NULL, + -- PostgreSQL automatically uses EXTENDED strategy: + -- - Allows compression AND out-of-line storage + -- - Compresses first, then moves to TOAST if still large + metadata JSONB, + search_vector TSVECTOR, + created_at BIGINT NOT NULL, + updated_at BIGINT NOT NULL, + + -- Unique constraint to prevent duplicate entries + CONSTRAINT unique_label_file_row UNIQUE (file_name, row_index) ); +-- Create indexes for performance +CREATE INDEX IF NOT EXISTS idx_label_index_file_name ON label_index(file_name); +CREATE INDEX IF NOT EXISTS idx_label_index_created_at ON label_index(created_at); + +-- Create GIN index for full-text search (most important for performance) +CREATE INDEX IF NOT EXISTS idx_label_index_search_vector ON label_index USING gin(search_vector); + +-- Create a trigger to automatically update the search_vector column +CREATE OR REPLACE FUNCTION update_label_search_vector() RETURNS trigger AS $$ +BEGIN + NEW.search_vector := to_tsvector('english', COALESCE(NEW.content, '')); + NEW.updated_at := EXTRACT(EPOCH FROM NOW()) * 1000; + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +-- Create the trigger (only if table exists) +DO $$ +BEGIN + IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'label_index') THEN + DROP TRIGGER IF EXISTS trigger_update_label_search_vector ON label_index; + CREATE TRIGGER trigger_update_label_search_vector + BEFORE INSERT OR UPDATE ON label_index + FOR EACH ROW EXECUTE FUNCTION update_label_search_vector(); + END IF; +END $$; + diff --git a/server/app/db/dbmodels.py b/server/app/db/dbmodels.py index b11372746..bd833e134 100644 --- a/server/app/db/dbmodels.py +++ b/server/app/db/dbmodels.py @@ -11,8 +11,10 @@ Index, UniqueConstraint, MetaData, - SmallInteger + SmallInteger, + JSON ) +from sqlalchemy.dialects.postgresql import TSVECTOR metadata = MetaData() @@ -185,4 +187,27 @@ # Unique Constraint UniqueConstraint("artifact_id", "execution_id", "type", name="uniqueevent") +) + + +# Label indexing table for PostgreSQL full-text search +label_index = Table( + "label_index", metadata, + Column("id", Integer, primary_key=True, nullable=False), + Column("file_name", String(255), nullable=False), + Column("file_path", Text, nullable=False), + Column("row_index", Integer, nullable=False), + Column("content", Text, nullable=False), + Column("metadata", JSON), + Column("search_vector", TSVECTOR), + Column("created_at", BigInteger, 
nullable=False), + Column("updated_at", BigInteger, nullable=False), + + # Indexes for performance + Index("idx_label_index_file_name", "file_name"), + Index("idx_label_index_search_vector", "search_vector", postgresql_using="gin"), + Index("idx_label_index_created_at", "created_at"), + + # Unique constraint to prevent duplicate entries + UniqueConstraint("file_name", "row_index", name="unique_label_file_row") ) \ No newline at end of file diff --git a/server/app/db/dbqueries.py b/server/app/db/dbqueries.py index 792dd835b..eb7c68dd9 100644 --- a/server/app/db/dbqueries.py +++ b/server/app/db/dbqueries.py @@ -1,7 +1,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from fastapi import Depends from server.app.db.dbconfig import get_db -from sqlalchemy import select, func, text, String, bindparam, case, distinct +from sqlalchemy import select, func, String, distinct, text from server.app.db.dbmodels import ( artifact, artifactproperty, @@ -13,6 +13,7 @@ execution, executionproperty, event, + label_index, ) async def register_server_details(db: AsyncSession, server_name: str, host_info: str): @@ -312,3 +313,117 @@ async def fetch_executions( "total_items": total_record, "items": [dict(row) for row in rows] } + + +async def search_labels_in_artifacts(db: AsyncSession, filter_value: str, pipeline_name: str = None, limit: int = 50): + """ + Search for artifacts that have labels matching the filter value. + This function searches within label CSV content using PostgreSQL full-text search. + Works with or without explicit labels_uri properties. + """ + try: + # First, try to search labels directly and return any matching content + # This approach works even if artifacts don't have labels_uri properties + base_query = """ + SELECT DISTINCT + li.file_name as label_file, + li.content as matching_content, + li.metadata as label_metadata, + li.row_index, + ts_rank(li.search_vector, plainto_tsquery('english', :filter_value)) as relevance_score + FROM label_index li + WHERE li.search_vector @@ plainto_tsquery('english', :filter_value) + ORDER BY relevance_score DESC + LIMIT :limit + """ + + params = {"filter_value": filter_value, "limit": limit} + + result = await db.execute(text(base_query), params) + label_results = result.mappings().all() + + # Convert label results to a format compatible with artifact results + converted_results = [] + for label_result in label_results: + converted_results.append({ + 'artifact_id': None, # No specific artifact ID + 'name': f"Label Match: {label_result['label_file']}", + 'uri': f"label://{label_result['label_file']}#{label_result['row_index']}", + 'type_id': None, + 'create_time_since_epoch': None, + 'last_update_time_since_epoch': None, + 'label_file': label_result['label_file'], + 'matching_content': label_result['matching_content'], + 'label_metadata': label_result['label_metadata'], + 'relevance_score': float(label_result['relevance_score']) + }) + + return converted_results + + except Exception as e: + print(f"Label search error: {e}") + return [] + + +async def fetch_artifacts_with_label_search( + db: AsyncSession, + pipeline_name: str, + artifact_type: str, + filter_value: str, + active_page: int = 1, + page_size: int = 5, + sort_column: str = "name", + sort_order: str = "ASC" +): + """ + Enhanced artifact search that includes label content search. + This combines regular artifact search with label content search. 
+ """ + # First, get regular artifact search results + artifact_results = await fetch_artifacts( + db, pipeline_name, artifact_type, filter_value, + active_page, page_size, sort_column, sort_order + ) + + # If filter_value is provided, also search in labels + if filter_value and filter_value.strip(): + try: + label_results = await search_labels_in_artifacts(db, filter_value, pipeline_name, 50) + + # Add label search results as separate items (since they don't correspond to existing artifacts) + if active_page == 1 and label_results: # Only add on first page + added_count = 0 + max_additional = max(0, page_size - len(artifact_results['items'])) + + for label_result in label_results: + if added_count < max_additional: + # Create a pseudo-artifact item from label search result + # Make sure all fields have non-null values that frontend expects + enhanced_item = { + 'artifact_id': f"label_{label_result['label_file']}_{label_result.get('row_index', 0)}", + 'name': f"{label_result['label_file']} (Row {label_result.get('row_index', 0) + 1})", + 'uri': label_result.get('uri', f"label://{label_result['label_file']}"), + 'type_id': 'Label', + 'create_time_since_epoch': 0, # Use 0 instead of None + 'last_update_time_since_epoch': 0, # Use 0 instead of None + 'artifact_properties': [], # Empty array instead of None + 'execution': '', # Empty string instead of None + 'label_match': True, + 'matching_label_content': label_result['matching_content'], + 'label_file': label_result['label_file'], + 'label_metadata': label_result.get('label_metadata', '{}'), + 'relevance_score': float(label_result['relevance_score']) + } + artifact_results['items'].append(enhanced_item) + added_count += 1 + + # Update total count if we added items + if added_count > 0: + artifact_results['total_items'] += added_count + print(f"Added {added_count} label search results to artifacts") + + except Exception as e: + print(f"Error in label search integration: {e}") + # Continue with regular results if label search fails + + return artifact_results diff --git a/server/app/db/migrations/001_create_label_index.sql b/server/app/db/migrations/001_create_label_index.sql new file mode 100644 index 000000000..fcb302bea --- /dev/null +++ b/server/app/db/migrations/001_create_label_index.sql @@ -0,0 +1,124 @@ +-- Migration: Create label_index table for full-text search of CSV label content +-- This migration adds the label_index table to support searching CSV label content +-- through the existing artifact search functionality + +-- Create the label_index table +CREATE TABLE IF NOT EXISTS label_index ( + id SERIAL PRIMARY KEY, + file_name VARCHAR(255) NOT NULL, + file_path TEXT NOT NULL, + row_index INTEGER NOT NULL, + content TEXT NOT NULL, + metadata JSONB, + search_vector TSVECTOR, + created_at BIGINT NOT NULL, + updated_at BIGINT NOT NULL, + + -- Unique constraint to prevent duplicate entries + CONSTRAINT unique_label_file_row UNIQUE (file_name, row_index) +); + +-- Create indexes for performance +CREATE INDEX IF NOT EXISTS idx_label_index_file_name ON label_index(file_name); +CREATE INDEX IF NOT EXISTS idx_label_index_created_at ON label_index(created_at); + +-- Create GIN index for full-text search (most important for performance) +CREATE INDEX IF NOT EXISTS idx_label_index_search_vector ON label_index USING gin(search_vector); + +-- Create a trigger to automatically update the search_vector column +-- This trigger will populate the tsvector column whenever content is inserted or updated +CREATE OR REPLACE FUNCTION 
update_label_search_vector() RETURNS trigger AS $$ +BEGIN + NEW.search_vector := to_tsvector('english', COALESCE(NEW.content, '')); + NEW.updated_at := EXTRACT(EPOCH FROM NOW()) * 1000; + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +-- Create the trigger +DROP TRIGGER IF EXISTS trigger_update_label_search_vector ON label_index; +CREATE TRIGGER trigger_update_label_search_vector + BEFORE INSERT OR UPDATE ON label_index + FOR EACH ROW EXECUTE FUNCTION update_label_search_vector(); + +-- Create a function to search labels with ranking +CREATE OR REPLACE FUNCTION search_labels(search_query TEXT, result_limit INTEGER DEFAULT 10) +RETURNS TABLE ( + file_name VARCHAR(255), + row_index INTEGER, + content TEXT, + metadata JSONB, + relevance_score REAL +) AS $$ +BEGIN + RETURN QUERY + SELECT + li.file_name, + li.row_index, + li.content, + li.metadata, + ts_rank(li.search_vector, plainto_tsquery('english', search_query))::REAL as relevance_score + FROM label_index li + WHERE li.search_vector @@ plainto_tsquery('english', search_query) + ORDER BY relevance_score DESC + LIMIT result_limit; +END; +$$ LANGUAGE plpgsql; + +-- Create a function to find artifacts with matching labels +CREATE OR REPLACE FUNCTION find_artifacts_with_label_matches(search_query TEXT, result_limit INTEGER DEFAULT 10) +RETURNS TABLE ( + artifact_id INTEGER, + artifact_name VARCHAR(255), + artifact_uri TEXT, + label_file VARCHAR(255), + matching_content TEXT, + relevance_score REAL +) AS $$ +BEGIN + RETURN QUERY + SELECT DISTINCT + a.id as artifact_id, + a.name as artifact_name, + a.uri as artifact_uri, + li.file_name as label_file, + li.content as matching_content, + ts_rank(li.search_vector, plainto_tsquery('english', search_query))::REAL as relevance_score + FROM artifact a + JOIN artifactproperty ap ON a.id = ap.artifact_id + JOIN label_index li ON SPLIT_PART(ap.string_value, ':', 1) = li.file_name + WHERE ap.name = 'labels_uri' + AND li.search_vector @@ plainto_tsquery('english', search_query) + ORDER BY relevance_score DESC + LIMIT result_limit; +END; +$$ LANGUAGE plpgsql; + +-- Add comments for documentation +COMMENT ON TABLE label_index IS 'Stores indexed CSV label content for full-text search integration with artifact search'; +COMMENT ON COLUMN label_index.file_name IS 'Name of the CSV label file (without path)'; +COMMENT ON COLUMN label_index.file_path IS 'Full path to the CSV label file'; +COMMENT ON COLUMN label_index.row_index IS 'Row number within the CSV file (0-based)'; +COMMENT ON COLUMN label_index.content IS 'Concatenated content of the CSV row for search'; +COMMENT ON COLUMN label_index.metadata IS 'Additional metadata about the label entry (JSON format)'; +COMMENT ON COLUMN label_index.search_vector IS 'PostgreSQL tsvector for full-text search'; +COMMENT ON COLUMN label_index.created_at IS 'Timestamp when the record was created (milliseconds since epoch)'; +COMMENT ON COLUMN label_index.updated_at IS 'Timestamp when the record was last updated (milliseconds since epoch)'; + +-- Grant permissions (adjust as needed for your setup) +-- GRANT SELECT, INSERT, UPDATE, DELETE ON label_index TO your_app_user; +-- GRANT USAGE, SELECT ON SEQUENCE label_index_id_seq TO your_app_user; + +-- Example usage queries (for testing): +-- +-- 1. Search for labels containing "data": +-- SELECT * FROM search_labels('data', 5); +-- +-- 2. Find artifacts with labels containing "training": +-- SELECT * FROM find_artifacts_with_label_matches('training', 10); +-- +-- 3. 
Manual search with custom ranking: +-- SELECT file_name, content, ts_rank(search_vector, plainto_tsquery('english', 'your_search_term')) as rank +-- FROM label_index +-- WHERE search_vector @@ plainto_tsquery('english', 'your_search_term') +-- ORDER BY rank DESC; diff --git a/server/app/main.py b/server/app/main.py index 848e84dc7..298cc1dbb 100644 --- a/server/app/main.py +++ b/server/app/main.py @@ -2,6 +2,7 @@ import io import time import zipfile +import csv from fastapi import FastAPI, Request, HTTPException, Query, UploadFile, File, Depends from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import HTMLResponse, PlainTextResponse, StreamingResponse @@ -34,7 +35,9 @@ register_server_details, get_registered_server_details, get_sync_status, - update_sync_status + update_sync_status, + fetch_artifacts_with_label_search, + search_labels_in_artifacts ) from pathlib import Path import os @@ -49,12 +52,52 @@ ExecutionRequest, ) import httpx +import logging from jsonpath_ng.ext import parse from cmflib.cmf_federation import update_mlmd +from server.app.db.dbconfig import DATABASE_URL, async_session +from server.app.utils import ( + auto_reindex_if_needed, + search_labels, + get_label_stats, + index_csv_labels, + index_csv_labels_with_hash +) server_store_path = "/cmf-server/data/postgres_data" query = CmfQuery(is_server=True) +async def initialize_label_search(): + """Initialize label search functionality on startup""" + try: + logger = logging.getLogger(__name__) + logger.info("Initializing label search functionality...") + + # Check if labels directory exists and has CSV files + labels_dir = Path("/cmf-server/data/labels") + if labels_dir.exists(): + csv_files = list(labels_dir.glob("*.csv")) + if csv_files: + logger.info(f"Found {len(csv_files)} CSV label files") + + # Check if we need to index (if no records exist) + stats = await get_label_stats(DATABASE_URL) + if stats['total_records'] == 0: + logger.info("Indexing label files...") + result = await index_csv_labels(DATABASE_URL) + logger.info(f"Indexed {result.get('total_records', 0)} records from {result.get('total_files', 0)} files") + else: + logger.info(f"Label search ready: {stats['total_records']} records indexed") + else: + logger.info("No CSV label files found in /cmf-server/data/labels") + else: + logger.info("Labels directory not found, label search will be available when files are added") + + except Exception as e: + logger = logging.getLogger(__name__) + logger.warning(f"Label search initialization failed: {e}") + logger.info("Label search will be available once configured properly") + #global variables dict_of_art_ids = {} dict_of_exe_ids = {} @@ -71,6 +114,10 @@ async def lifespan(app: FastAPI): dict_of_exe_ids = await async_api(get_all_exe_ids, query) # loaded artifact ids into memory dict_of_art_ids = await async_api(get_all_artifact_ids, query, dict_of_exe_ids) + + # Initialize label search functionality + await initialize_label_search() + yield dict_of_art_ids.clear() dict_of_exe_ids.clear() @@ -133,6 +180,16 @@ async def mlmd_push(info: MLMDPushRequest): # async function await update_global_exe_dict(pipeline_name) await update_global_art_dict(pipeline_name) + + # Auto-reindex labels after artifact push + try: + logger = logging.getLogger(__name__) + reindex_result = await auto_reindex_if_needed(DATABASE_URL) + if reindex_result['status'] == 'reindexed': + logger.info(f"Auto-reindexed after artifact push: {reindex_result['message']}") + except Exception as e: + logger = logging.getLogger(__name__) 
+ logger.warning(f"Auto-reindex after push failed: {e}") finally: lock_counts[pipeline_name] -= 1 # Decrement the reference count after lock released if lock_counts[pipeline_name] == 0: #if lock_counts of pipeline is zero means lock is release from it @@ -171,6 +228,7 @@ async def get_artifacts( query_params: ArtifactRequest = Depends(), db: AsyncSession = Depends(get_db) ): + start_time = time.time() filter_value = query_params.filter_value active_page = query_params.active_page @@ -178,8 +236,38 @@ async def get_artifacts( sort_order = query_params.sort_order record_per_page = query_params.record_per_page - """Retrieve paginated artifacts with filtering, sorting, and full-text search.""" - return await fetch_artifacts(db, pipeline_name, artifact_type, filter_value, active_page, record_per_page, sort_field, sort_order) + """Retrieve paginated artifacts with filtering, sorting, and full-text search including label content.""" + + # Auto-reindex labels if needed (only on first page to avoid performance issues) + reindex_time = 0 + if active_page == 1: + try: + logger = logging.getLogger(__name__) + reindex_start = time.time() + reindex_result = await auto_reindex_if_needed(DATABASE_URL) + reindex_time = time.time() - reindex_start + if reindex_result['status'] == 'reindexed': + logger.info(f"{reindex_result['message']}") + except Exception as e: + logger = logging.getLogger(__name__) + logger.warning(f"Auto-reindex failed: {e}") + + query_start = time.time() + result = await fetch_artifacts_with_label_search(db, pipeline_name, artifact_type, filter_value, active_page, record_per_page, sort_field, sort_order) + query_time = time.time() - query_start + + total_time = time.time() - start_time + + # Add timing information to the response + result["timing"] = { + "total_time_ms": round(total_time * 1000, 2), + "query_time_ms": round(query_time * 1000, 2), + "reindex_time_ms": round(reindex_time * 1000, 2) if reindex_time > 0 else 0, + "filter_value": filter_value, + "has_label_search": bool(filter_value and filter_value.strip()) + } + + return result # api to display executions available in mlmd file[from postgres] @@ -796,3 +884,172 @@ async def artifact_lineage(request: Request, pipeline_name: str): else: return None """ + +# Label Search Management Endpoints +@app.post("/api/labels/reindex") +async def reindex_labels(): + """Reindex all label files - useful when CSV files are updated""" + try: + result = await index_csv_labels_with_hash(DATABASE_URL) + + return { + "status": "success", + "message": f"Reindexed {result['total_files']} files with {result.get('total_records', 0)} records", + "details": result + } + + except Exception as e: + raise HTTPException(status_code=500, detail=f"Reindexing failed: {str(e)}") + +@app.get("/api/labels/status") +async def get_label_search_status(): + """Get the current status of label search functionality""" + try: + # Check labels directory + labels_dir = Path("/cmf-server/data/labels") + csv_files = list(labels_dir.glob("*.csv")) if labels_dir.exists() else [] + + # Check database + stats = await get_label_stats(DATABASE_URL) + + return { + "status": "active" if stats['total_records'] > 0 else "inactive", + "labels_directory": str(labels_dir), + "csv_files_found": len(csv_files), + "indexed_files": stats['total_files'], + "indexed_records": stats['total_records'], + "files": [f.name for f in csv_files] + } + + except Exception as e: + return { + "status": "error", + "error": str(e) + } + +@app.post("/api/labels/test") +async def test_label_search(): + 
"""Test label search functionality with sample data""" + try: + # Create sample test data + sample_data = [ + {"id": 1, "category": "training", "type": "dataset", "accuracy": 0.95}, + {"id": 2, "category": "validation", "type": "dataset", "accuracy": 0.87}, + {"id": 3, "category": "test", "type": "model", "performance": "high"}, + ] + + # Create temporary CSV file + labels_dir = Path("/cmf-server/data/labels") + labels_dir.mkdir(parents=True, exist_ok=True) + + test_file = labels_dir / "test_labels.csv" + with open(test_file, 'w', newline='') as csvfile: + fieldnames = sample_data[0].keys() + writer = csv.DictWriter(csvfile, fieldnames=fieldnames) + writer.writeheader() + writer.writerows(sample_data) + + # Index the test file + result = await index_csv_labels(DATABASE_URL) + + # Test search + search_results = await search_labels(DATABASE_URL, "training", 5) + + return { + "status": "success", + "message": "Label search test completed successfully", + "indexing_result": result, + "search_results": search_results, + "test_file": str(test_file) + } + + except Exception as e: + return { + "status": "error", + "message": f"Label search test failed: {str(e)}" + } + +@app.get("/api/labels/search") +async def search_label_content(query: str = Query(..., description="Search query"), limit: int = Query(10, description="Maximum results")): + """Search label content using PostgreSQL full-text search""" + start_time = time.time() + + try: + + search_start = time.time() + results = await search_labels(DATABASE_URL, query, limit) + search_time = time.time() - search_start + + total_time = time.time() - start_time + + return { + "status": "success", + "query": query, + "results": results, + "total_results": len(results), + "timing": { + "total_time_ms": round(total_time * 1000, 2), + "search_time_ms": round(search_time * 1000, 2), + "query": query, + "limit": limit + } + } + + except Exception as e: + raise HTTPException(status_code=500, detail=f"Search failed: {str(e)}") + +@app.get("/api/labels/search-direct") +async def search_labels_direct(query: str = Query(..., description="Search query"), limit: int = Query(10, description="Maximum results")): + """Direct label search that returns label matches as pseudo-artifacts""" + start_time = time.time() + + try: + + db_start = time.time() + async with async_session() as db: + search_start = time.time() + results = await search_labels_in_artifacts(db, query, None, limit) + search_time = time.time() - search_start + db_time = time.time() - db_start + + total_time = time.time() - start_time + + return { + "status": "success", + "query": query, + "results": results, + "total_results": len(results), + "timing": { + "total_time_ms": round(total_time * 1000, 2), + "db_time_ms": round(db_time * 1000, 2), + "search_time_ms": round(search_time * 1000, 2), + "query": query, + "limit": limit, + "uses_sqlalchemy_core": True + } + } + + except Exception as e: + raise HTTPException(status_code=500, detail=f"Direct label search failed: {str(e)}") + +@app.get("/api/labels/health") +async def label_search_health(): + """Health check for label search functionality""" + try: + stats = await get_label_stats(DATABASE_URL) + + return { + "status": "healthy" if stats['status'] == 'success' else "unhealthy", + "service": "label-search-postgres", + "version": "1.0.0", + "database": "postgresql", + "indexed_files": stats['total_files'], + "indexed_records": stats['total_records'] + } + + except Exception as e: + return { + "status": "unhealthy", + "service": "label-search-postgres", 
+ "error": str(e) + } diff --git a/server/app/utils.py b/server/app/utils.py index c85d4a69d..bea2997e9 100644 --- a/server/app/utils.py +++ b/server/app/utils.py @@ -1,3 +1,23 @@ +# Standard library imports +import asyncio +import csv +import json +import time +import os +import hashlib +import re +import logging +from pathlib import Path +from typing import List, Dict, Any, Optional + +# Third-party imports +from sqlalchemy.ext.asyncio import create_async_engine +from sqlalchemy import text + +# Set up logger +logger = logging.getLogger(__name__) + + def modify_arti_name(arti_name, type): # artifact_name optimization based on artifact type.["Dataset","Model","Metrics"] try: @@ -64,4 +84,476 @@ def modify_arti_name(arti_name, type): print(f"Error parsing artifact name: {e}") name = arti_name # Fallback to the original arti_name in case of error return name + + +# Label Search Utility Functions + +async def index_csv_labels(database_url: str, labels_directory: str = "/cmf-server/data/labels") -> Dict[str, Any]: + """ + Index CSV label files into PostgreSQL for full-text search + """ + try: + + labels_dir = Path(labels_directory) + if not labels_dir.exists(): + return {"status": "error", "message": f"Labels directory not found: {labels_directory}"} + + # Look for both .csv files and files without extension (assuming they're CSV) + csv_files = list(labels_dir.glob("*.csv")) + + # Also include files without extension that might be CSV files + for file_path in labels_dir.iterdir(): + if file_path.is_file() and not file_path.suffix and file_path.name not in [f.stem for f in csv_files]: + # Try to detect if it's a CSV file by reading first few lines + try: + with open(file_path, 'r', encoding='utf-8') as f: + first_line = f.readline().strip() + # Simple heuristic: if it contains commas or common CSV patterns, treat as CSV + if ',' in first_line or any(keyword in first_line.lower() for keyword in ['file', 'name', 'id', 'type', 'size']): + csv_files.append(file_path) + logger.info(f"Detected CSV-like file without extension: {file_path.name}") + except: + pass # Skip files that can't be read + + if not csv_files: + return {"status": "warning", "message": "No CSV files found", "indexed_files": [], "total_files": 0} + + engine = create_async_engine(database_url, echo=False) + indexed_files = [] + total_records = 0 + + async with engine.begin() as conn: + for csv_file in csv_files: + try: + # Clear existing data for this file + await conn.execute( + text("DELETE FROM label_index WHERE file_name = :file_name"), + {"file_name": csv_file.name} + ) + + # Process CSV file + records = [] + with open(csv_file, 'r', encoding='utf-8') as csvfile: + # Detect CSV format with fallback + sample = csvfile.read(1024) + csvfile.seek(0) + + # Try to detect delimiter, with fallbacks + delimiter = ',' # Default fallback + try: + sniffer = csv.Sniffer() + detected_delimiter = sniffer.sniff(sample).delimiter + delimiter = detected_delimiter + except: + # Fallback: try common delimiters + for test_delimiter in [',', '\t', ';', '|']: + if test_delimiter in sample: + delimiter = test_delimiter + break + + reader = csv.DictReader(csvfile, delimiter=delimiter) + + for row_index, row in enumerate(reader): + # Create searchable content + content_parts = [] + metadata = {} + + for key, value in row.items(): + if value and value.strip(): + content_parts.append(f"{key}: {value.strip()}") + metadata[key] = value.strip() + + content = " | ".join(content_parts) + + if content.strip(): + records.append({ + 'file_name': csv_file.name, + 
'file_path': str(csv_file), + 'row_index': row_index, + 'content': content, + 'metadata': json.dumps(metadata), + 'created_at': int(time.time() * 1000), + 'updated_at': int(time.time() * 1000) + }) + + # Insert records + if records: + insert_query = text(""" + INSERT INTO label_index (file_name, file_path, row_index, content, metadata, created_at, updated_at) + VALUES (:file_name, :file_path, :row_index, :content, :metadata, :created_at, :updated_at) + """) + await conn.execute(insert_query, records) + + indexed_files.append({ + 'file_name': csv_file.name, + 'records_indexed': len(records), + 'status': 'success' + }) + total_records += len(records) + logger.info(f"Indexed {len(records)} records from {csv_file.name}") + else: + indexed_files.append({ + 'file_name': csv_file.name, + 'records_indexed': 0, + 'status': 'empty' + }) + + except Exception as e: + logger.error(f"Error indexing {csv_file.name}: {e}") + indexed_files.append({ + 'file_name': csv_file.name, + 'records_indexed': 0, + 'status': 'error', + 'error': str(e) + }) + + await engine.dispose() + + success_count = sum(1 for f in indexed_files if f['status'] == 'success') + return { + 'status': 'success', + 'message': f'Indexed {success_count}/{len(csv_files)} files successfully', + 'indexed_files': indexed_files, + 'total_files': len(csv_files), + 'total_records': total_records + } + + except Exception as e: + logger.error(f"Label indexing failed: {e}") + return {"status": "error", "message": str(e)} + +async def search_labels(database_url: str, query: str, limit: int = 10) -> List[Dict[str, Any]]: + """ + Search indexed labels using PostgreSQL full-text search + """ + try: + + engine = create_async_engine(database_url, echo=False) + + async with engine.begin() as conn: + result = await conn.execute(text(""" + SELECT file_name, row_index, content, metadata, + ts_rank(search_vector, plainto_tsquery('english', :query)) as relevance_score + FROM label_index + WHERE search_vector @@ plainto_tsquery('english', :query) + ORDER BY relevance_score DESC + LIMIT :limit + """), {"query": query, "limit": limit}) + + results = [ + { + "file_name": row[0], + "row_index": row[1], + "content": row[2], + "metadata": json.loads(row[3]) if row[3] else {}, + "relevance_score": float(row[4]) + } + for row in result.fetchall() + ] + + await engine.dispose() + return results + + except Exception as e: + logger.error(f"Label search failed: {e}") + return [] + +async def get_label_stats(database_url: str) -> Dict[str, Any]: + """ + Get statistics about indexed labels + """ + try: + + engine = create_async_engine(database_url, echo=False) + + async with engine.begin() as conn: + # Get total records and files + result = await conn.execute(text(""" + SELECT + COUNT(*) as total_records, + COUNT(DISTINCT file_name) as total_files + FROM label_index + """)) + stats = result.fetchone() + + # Get recent files + result = await conn.execute(text(""" + SELECT file_name, COUNT(*) as record_count, MAX(updated_at) as last_updated + FROM label_index + GROUP BY file_name + ORDER BY last_updated DESC + LIMIT 10 + """)) + recent_files = [ + { + 'file_name': row[0], + 'record_count': row[1], + 'last_updated': row[2] + } + for row in result.fetchall() + ] + + await engine.dispose() + + return { + 'total_records': stats[0] if stats else 0, + 'total_files': stats[1] if stats else 0, + 'recent_files': recent_files, + 'status': 'success' + } + + except Exception as e: + logger.error(f"Error getting label stats: {e}") + return { + 'status': 'error', + 'error': str(e), + 
'total_records': 0, + 'total_files': 0, + 'recent_files': [] + } + +async def auto_reindex_if_needed(database_url: str, labels_directory: str = "/cmf-server/data/labels") -> Dict[str, Any]: + """ + Automatically reindex labels if files have been modified or content has changed. + Handles MD5 hash filenames and detects content changes. + """ + try: + + labels_dir = Path(labels_directory) + if not labels_dir.exists(): + return {"status": "no_directory", "message": "Labels directory does not exist"} + + # Get all potential CSV files (with and without .csv extension) + csv_files = list(labels_dir.glob("*.csv")) + + # Also check files without extension (including MD5 hash names) + for file_path in labels_dir.iterdir(): + if file_path.is_file() and not file_path.suffix: + # Check if it's an MD5 hash (32 hex characters) + is_md5_hash = bool(re.match(r'^[a-f0-9]{32}$', file_path.name)) + + try: + with open(file_path, 'r', encoding='utf-8') as f: + first_line = f.readline().strip() + # For MD5 hash files, be more lenient in detection + if is_md5_hash or ',' in first_line or any(keyword in first_line.lower() for keyword in ['file', 'name', 'id', 'type', 'size']): + csv_files.append(file_path) + if is_md5_hash: + logger.info(f"Detected MD5 hash file: {file_path.name}") + except: + pass + + if not csv_files: + return {"status": "no_files", "message": "No CSV files found"} + + # Check if we need to reindex based on content changes + engine = create_async_engine(database_url, echo=False) + needs_reindex = False + files_to_reindex = [] + + async with engine.begin() as conn: + for csv_file in csv_files: + # Calculate content hash for change detection + try: + with open(csv_file, 'rb') as f: + content_hash = hashlib.md5(f.read()).hexdigest() + except: + continue + + # Check if file exists in index and get its content hash + result = await conn.execute(text(""" + SELECT + MAX(updated_at) as last_indexed, + COUNT(*) as record_count + FROM label_index + WHERE file_name = :file_name + """), {"file_name": csv_file.name}) + + row = result.fetchone() + last_indexed = row[0] if row else None + record_count = row[1] if row else 0 + + # Get stored content hash from metadata if available + result = await conn.execute(text(""" + SELECT metadata->>'content_hash' as stored_hash + FROM label_index + WHERE file_name = :file_name + LIMIT 1 + """), {"file_name": csv_file.name}) + + stored_hash_row = result.fetchone() + stored_hash = stored_hash_row[0] if stored_hash_row else None + + # Check if reindexing is needed + file_mtime = int(os.path.getmtime(csv_file) * 1000) + + if (not last_indexed or + record_count == 0 or + stored_hash != content_hash or + file_mtime > (last_indexed + 60000)): # 1 minute grace period + + needs_reindex = True + files_to_reindex.append({ + 'file': csv_file, + 'reason': 'new' if not last_indexed else 'content_changed' if stored_hash != content_hash else 'modified' + }) + logger.info(f"File {csv_file.name} needs reindexing - {files_to_reindex[-1]['reason']}") + + await engine.dispose() + + if needs_reindex: + logger.info(f"Auto-reindexing {len(files_to_reindex)} files...") + result = await index_csv_labels_with_hash(database_url, labels_directory) + return { + "status": "reindexed", + "message": f"Auto-reindexed {result.get('total_files', 0)} files", + "files_reindexed": [f['file'].name for f in files_to_reindex], + "details": result + } + else: + return { + "status": "up_to_date", + "message": "All label files are up to date" + } + + except Exception as e: + logger.error(f"Auto-reindex failed: 
{e}") + return {"status": "error", "message": str(e)} + +async def index_csv_labels_with_hash(database_url: str, labels_directory: str = "/cmf-server/data/labels") -> Dict[str, Any]: + """ + Enhanced version of index_csv_labels that stores content hashes for change detection + """ + try: + + labels_dir = Path(labels_directory) + if not labels_dir.exists(): + return {"status": "error", "message": f"Labels directory not found: {labels_directory}"} + + # Get all potential CSV files (including MD5 hash files) + csv_files = list(labels_dir.glob("*.csv")) + + # Also include files without extension + for file_path in labels_dir.iterdir(): + if file_path.is_file() and not file_path.suffix: + is_md5_hash = bool(re.match(r'^[a-f0-9]{32}$', file_path.name)) + try: + with open(file_path, 'r', encoding='utf-8') as f: + first_line = f.readline().strip() + if is_md5_hash or ',' in first_line or any(keyword in first_line.lower() for keyword in ['file', 'name', 'id', 'type', 'size']): + csv_files.append(file_path) + except: + pass + + if not csv_files: + return {"status": "warning", "message": "No CSV files found", "indexed_files": [], "total_files": 0} + + engine = create_async_engine(database_url, echo=False) + indexed_files = [] + total_records = 0 + + async with engine.begin() as conn: + for csv_file in csv_files: + try: + # Calculate content hash + with open(csv_file, 'rb') as f: + content_hash = hashlib.md5(f.read()).hexdigest() + + # Clear existing data for this file + await conn.execute( + text("DELETE FROM label_index WHERE file_name = :file_name"), + {"file_name": csv_file.name} + ) + + # Process CSV file + records = [] + with open(csv_file, 'r', encoding='utf-8') as csvfile: + # Detect CSV format with fallback + sample = csvfile.read(1024) + csvfile.seek(0) + + # Try to detect delimiter, with fallbacks + delimiter = ',' # Default fallback + try: + sniffer = csv.Sniffer() + detected_delimiter = sniffer.sniff(sample).delimiter + delimiter = detected_delimiter + except: + # Fallback: try common delimiters + for test_delimiter in [',', '\t', ';', '|']: + if test_delimiter in sample: + delimiter = test_delimiter + break + + reader = csv.DictReader(csvfile, delimiter=delimiter) + + for row_index, row in enumerate(reader): + # Create searchable content + content_parts = [] + metadata = {"content_hash": content_hash} # Store content hash + + for key, value in row.items(): + if value and value.strip(): + content_parts.append(f"{key}: {value.strip()}") + metadata[key] = value.strip() + + content = " | ".join(content_parts) + + if content.strip(): + records.append({ + 'file_name': csv_file.name, + 'file_path': str(csv_file), + 'row_index': row_index, + 'content': content, + 'metadata': json.dumps(metadata), + 'created_at': int(time.time() * 1000), + 'updated_at': int(time.time() * 1000) + }) + + # Insert records + if records: + insert_query = text(""" + INSERT INTO label_index (file_name, file_path, row_index, content, metadata, created_at, updated_at) + VALUES (:file_name, :file_path, :row_index, :content, :metadata, :created_at, :updated_at) + """) + await conn.execute(insert_query, records) + + indexed_files.append({ + 'file_name': csv_file.name, + 'records_indexed': len(records), + 'content_hash': content_hash, + 'status': 'success' + }) + total_records += len(records) + logger.info(f"Indexed {len(records)} records from {csv_file.name} (hash: {content_hash[:8]}...)") + else: + indexed_files.append({ + 'file_name': csv_file.name, + 'records_indexed': 0, + 'status': 'empty' + }) + + except Exception as 
e: + logger.error(f"Error indexing {csv_file.name}: {e}") + indexed_files.append({ + 'file_name': csv_file.name, + 'records_indexed': 0, + 'status': 'error', + 'error': str(e) + }) + + await engine.dispose() + + success_count = sum(1 for f in indexed_files if f['status'] == 'success') + return { + 'status': 'success', + 'message': f'Indexed {success_count}/{len(csv_files)} files successfully', + 'indexed_files': indexed_files, + 'total_files': len(csv_files), + 'total_records': total_records + } + + except Exception as e: + logger.error(f"Label indexing failed: {e}") + return {"status": "error", "message": str(e)} \ No newline at end of file diff --git a/ui/src/components/ArtifactPTable/index.jsx b/ui/src/components/ArtifactPTable/index.jsx index 00d098ce4..45d19da14 100644 --- a/ui/src/components/ArtifactPTable/index.jsx +++ b/ui/src/components/ArtifactPTable/index.jsx @@ -145,6 +145,9 @@ const ArtifactPTable = ({artifacts, artifactType, onsortOrder, onsortTimeOrder, {artifactType === "Dataset" && ( LABEL )} + {artifactType === "Label" && ( + LABEL + )} URI URL GIT REPO @@ -193,7 +196,7 @@ const ArtifactPTable = ({artifacts, artifactType, onsortOrder, onsortTimeOrder, {(getPropertyValue(artifact.artifact_properties, "labels_uri") || "") .split(",") .map((label_name) => label_name.trim()) - .filter((label_name) => label_name.length > 0) // Optional: skip empty strings + .filter((label_name) => label_name.length > 0) .map((label_name) => (
)} + {artifactType === "Label" && ( + +
+ { + e.preventDefault(); + getLabelData(artifact.name.split(":")[1] || artifact.name); + setShowPopup(true); + }} + > + {artifact.name} + + {showPopup && ( + + )} +
+ + )} From 68b1c1888421b15797654d024c6c85dd6687a843 Mon Sep 17 00:00:00 2001 From: Jaychaware Date: Thu, 31 Jul 2025 12:49:47 +0530 Subject: [PATCH 02/11] Changes for label content search with highlight --- db_init.sql | 7 + server/app/main.py | 149 +++++--- ui/src/client.js | 23 ++ ui/src/components/ArtifactPTable/index.jsx | 42 +-- ui/src/pages/artifacts_postgres/index.css | 142 +++++++ ui/src/pages/artifacts_postgres/index.jsx | 409 +++++++++++++++++++-- 6 files changed, 681 insertions(+), 91 deletions(-) diff --git a/db_init.sql b/db_init.sql index 63e8b9118..34d7a1db3 100644 --- a/db_init.sql +++ b/db_init.sql @@ -1,3 +1,10 @@ +CREATE TABLE IF NOT EXISTS registered_servers( + id SERIAL, + server_name VARCHAR(255) NOT NULL, + host_info VARCHAR(255) NOT NULL PRIMARY KEY, + last_sync_time BIGINT DEFAULT NULL +); + -- Label indexing table for full-text search of CSV label content CREATE TABLE IF NOT EXISTS label_index ( id SERIAL PRIMARY KEY, diff --git a/server/app/main.py b/server/app/main.py index 298cc1dbb..e5efdf3f1 100644 --- a/server/app/main.py +++ b/server/app/main.py @@ -228,7 +228,6 @@ async def get_artifacts( query_params: ArtifactRequest = Depends(), db: AsyncSession = Depends(get_db) ): - start_time = time.time() filter_value = query_params.filter_value active_page = query_params.active_page @@ -239,35 +238,121 @@ async def get_artifacts( """Retrieve paginated artifacts with filtering, sorting, and full-text search including label content.""" # Auto-reindex labels if needed (only on first page to avoid performance issues) - reindex_time = 0 if active_page == 1: try: logger = logging.getLogger(__name__) - reindex_start = time.time() reindex_result = await auto_reindex_if_needed(DATABASE_URL) - reindex_time = time.time() - reindex_start if reindex_result['status'] == 'reindexed': logger.info(f"{reindex_result['message']}") except Exception as e: logger = logging.getLogger(__name__) logger.warning(f"Auto-reindex failed: {e}") - query_start = time.time() result = await fetch_artifacts_with_label_search(db, pipeline_name, artifact_type, filter_value, active_page, record_per_page, sort_field, sort_order) - query_time = time.time() - query_start - total_time = time.time() - start_time + return result - # Add timing information to the response - result["timing"] = { - "total_time_ms": round(total_time * 1000, 2), - "query_time_ms": round(query_time * 1000, 2), - "reindex_time_ms": round(reindex_time * 1000, 2) if reindex_time > 0 else 0, - "filter_value": filter_value, - "has_label_search": bool(filter_value and filter_value.strip()) - } - return result +@app.get("/artifacts/{pipeline_name}/Label/search") +async def search_label_artifacts( + pipeline_name: str, + content_filter: str = Query(..., description="Search term to find in label content"), + sort_order: str = Query("asc", description="Sort order (asc or desc)"), + active_page: int = Query(1, gt=0, description="Page number"), + record_per_page: int = Query(5, gt=0, description="Number of records per page"), + db: AsyncSession = Depends(get_db) +): + """ + Search for label artifacts that contain the specified content in their CSV files. + Returns clean label artifacts with proper structure, not content search results. 
+ """ + + try: + # Step 1: Search for labels containing the content + label_search_results = await search_labels_in_artifacts(db, content_filter, pipeline_name, 100) + + if not label_search_results: + return { + "total_items": 0, + "items": [], + "search_metadata": { + "search_term": content_filter, + "search_type": "content_filter" + } + } + + # Step 2: Extract unique label names + unique_labels = set() + label_metadata = {} + + for result in label_search_results: + label_name = result['label_file'] + unique_labels.add(label_name) + + # Store metadata for each label + if label_name not in label_metadata: + label_metadata[label_name] = { + 'matching_rows': 0, + 'total_rows': result.get('total_rows', 0), + 'relevance_score': result.get('relevance_score', 0.0) + } + label_metadata[label_name]['matching_rows'] += 1 + + # Step 3: Fetch actual label artifacts + all_artifacts_result = await fetch_artifacts( + db, pipeline_name, "Label", "", 1, 1000, "name", sort_order + ) + + # Step 4: Filter to only labels that contain the search term + filtered_artifacts = [] + for artifact in all_artifacts_result['items']: + # Extract clean label name from artifact name + clean_name = artifact['name'] + if ':' in clean_name: + clean_name = clean_name.split(':', 1)[1] + + # Remove any (Row X) suffix if present + import re + clean_name = re.sub(r'\s*\(Row\s+\d+\)$', '', clean_name).strip() + + if clean_name in unique_labels: + # Create enhanced artifact with search context + enhanced_artifact = { + 'artifact_id': artifact['artifact_id'], + 'name': clean_name, # Clean name without prefixes/suffixes + 'uri': f"artifacts/labels.csv:{clean_name}", + 'type_id': 'Label', + 'execution': artifact.get('execution', 'N/A'), + 'create_time_since_epoch': artifact.get('create_time_since_epoch', 'N/A'), + 'last_update_time_since_epoch': artifact.get('last_update_time_since_epoch', 'N/A'), + 'artifact_properties': artifact.get('artifact_properties', []), + 'search_context': { + 'search_term': content_filter, + 'matching_rows': label_metadata[clean_name]['matching_rows'], + 'total_rows': label_metadata[clean_name]['total_rows'], + 'relevance_score': label_metadata[clean_name]['relevance_score'] + } + } + filtered_artifacts.append(enhanced_artifact) + + # Step 5: Apply pagination + total_items = len(filtered_artifacts) + start_idx = (active_page - 1) * record_per_page + end_idx = start_idx + record_per_page + paginated_items = filtered_artifacts[start_idx:end_idx] + + return { + "total_items": total_items, + "items": paginated_items, + "search_metadata": { + "search_term": content_filter, + "search_type": "content_filter", + "unique_labels_found": len(unique_labels) + } + } + + except Exception as e: + raise HTTPException(status_code=500, detail=f"Error searching label artifacts: {str(e)}") # api to display executions available in mlmd file[from postgres] @@ -972,27 +1057,15 @@ async def test_label_search(): @app.get("/api/labels/search") async def search_label_content(query: str = Query(..., description="Search query"), limit: int = Query(10, description="Maximum results")): """Search label content using PostgreSQL full-text search""" - start_time = time.time() try: - - search_start = time.time() results = await search_labels(DATABASE_URL, query, limit) - search_time = time.time() - search_start - - total_time = time.time() - start_time return { "status": "success", "query": query, "results": results, - "total_results": len(results), - "timing": { - "total_time_ms": round(total_time * 1000, 2), - "search_time_ms": 
round(search_time * 1000, 2), - "query": query, - "limit": limit - } + "total_results": len(results) } except Exception as e: @@ -1001,32 +1074,16 @@ async def search_label_content(query: str = Query(..., description="Search query @app.get("/api/labels/search-direct") async def search_labels_direct(query: str = Query(..., description="Search query"), limit: int = Query(10, description="Maximum results")): """Direct label search that returns label matches as pseudo-artifacts""" - start_time = time.time() try: - - db_start = time.time() async with async_session() as db: - search_start = time.time() results = await search_labels_in_artifacts(db, query, None, limit) - search_time = time.time() - search_start - db_time = time.time() - db_start - - total_time = time.time() - start_time return { "status": "success", "query": query, "results": results, - "total_results": len(results), - "timing": { - "total_time_ms": round(total_time * 1000, 2), - "db_time_ms": round(db_time * 1000, 2), - "search_time_ms": round(search_time * 1000, 2), - "query": query, - "limit": limit, - "uses_sqlalchemy_core": True - } + "total_results": len(results) } except Exception as e: diff --git a/ui/src/client.js b/ui/src/client.js index 9c52de55b..0fc19404f 100644 --- a/ui/src/client.js +++ b/ui/src/client.js @@ -54,6 +54,21 @@ class FastAPIClient { }); } + async searchLabelArtifacts(pipeline_name, content_filter, sort_order = "asc", active_page = 1, record_per_page = 5) { + return this.apiClient + .get(`/artifacts/${pipeline_name}/Label/search`, { + params: { + content_filter: content_filter, + sort_order: sort_order, + active_page: active_page, + record_per_page: record_per_page, + }, + }) + .then(({ data }) => { + return data; + }); + } + async getArtifactTypes() { return this.apiClient.get(`/artifact_types`).then(({ data }) => { return data; @@ -168,6 +183,14 @@ class FastAPIClient { }); } + async getLabelsList() { + return this.apiClient + .get(`/api/labels/status`) + .then(({ data }) => { + return data.files || []; + }); + } + async getServerRegistration(server_name, host_info){ return this.apiClient .post(`/register-server`, { diff --git a/ui/src/components/ArtifactPTable/index.jsx b/ui/src/components/ArtifactPTable/index.jsx index 45d19da14..d385f26b5 100644 --- a/ui/src/components/ArtifactPTable/index.jsx +++ b/ui/src/components/ArtifactPTable/index.jsx @@ -25,7 +25,7 @@ import LabelCardPopup from "../LabelCardPopup"; const client = new FastAPIClient(config); -const ArtifactPTable = ({artifacts, artifactType, onsortOrder, onsortTimeOrder, filterValue}) => { +const ArtifactPTable = ({artifacts, artifactType, onsortOrder, onsortTimeOrder, filterValue, onLabelClick}) => { const [data, setData] = useState([]); const [sortOrder, setSortOrder] = useState("asc"); const [sortTimeOrder, setSortTimeOrder] = useState("asc"); @@ -73,7 +73,6 @@ const ArtifactPTable = ({artifacts, artifactType, onsortOrder, onsortTimeOrder, // Ensure properties is now an array if (!Array.isArray(properties)) { - console.warn("Expected an array for properties, got:", properties); return "N/A"; } @@ -109,9 +108,7 @@ const ArtifactPTable = ({artifacts, artifactType, onsortOrder, onsortTimeOrder, }; const getLabelData = (label_name) => { - console.log(label_name) client.getLabelData(label_name).then((data) => { - console.log(data); setLabelData(data); }); } @@ -166,7 +163,22 @@ const ArtifactPTable = ({artifacts, artifactType, onsortOrder, onsortTimeOrder, { /* Convert artifact ID to string and render it with highlighted search term if it 
matches the filter value */} - + + {artifactType === "Label" && onLabelClick ? ( + + ) : ( + + )} + {artifactType === "Model" && ( @@ -196,7 +208,7 @@ const ArtifactPTable = ({artifacts, artifactType, onsortOrder, onsortTimeOrder, {(getPropertyValue(artifact.artifact_properties, "labels_uri") || "") .split(",") .map((label_name) => label_name.trim()) - .filter((label_name) => label_name.length > 0) + .filter((label_name) => label_name.length > 0) // Optional: skip empty strings .map((label_name) => (
)} diff --git a/ui/src/pages/artifacts_postgres/index.css b/ui/src/pages/artifacts_postgres/index.css index 87db4144d..f82b80eaa 100644 --- a/ui/src/pages/artifacts_postgres/index.css +++ b/ui/src/pages/artifacts_postgres/index.css @@ -71,3 +71,145 @@ body { background: rgb(88, 147, 241); } */ + +/* Loading spinner animation */ +@keyframes spin { + 0% { transform: rotate(0deg); } + 100% { transform: rotate(360deg); } +} + +.animate-spin { + animation: spin 1s linear infinite; +} + +/* Standard DataTable styling */ +.rdt_Table { + font-size: 14px; +} + +.rdt_TableHead { + background-color: #f8f9fa; + border-bottom: 1px solid #dee2e6; +} + +.rdt_TableHeadRow { + background-color: #f8f9fa; +} + +.rdt_TableRow { + border-bottom: 1px solid #dee2e6; +} + +.rdt_TableRow:hover { + background-color: #f8f9fa; +} + +.rdt_TableCell { + padding: 12px 16px; +} + +/* Horizontal scrolling for label properties table */ +.overflow-x-auto { + overflow-x: auto; + -webkit-overflow-scrolling: touch; +} + +.overflow-x-auto::-webkit-scrollbar { + height: 8px; +} + +.overflow-x-auto::-webkit-scrollbar-track { + background: #f1f1f1; + border-radius: 4px; +} + +.overflow-x-auto::-webkit-scrollbar-thumb { + background: #c1c1c1; + border-radius: 4px; +} + +.overflow-x-auto::-webkit-scrollbar-thumb:hover { + background: #a8a8a8; +} + +/* Ensure table cells don't wrap */ +.min-w-full td { + white-space: nowrap; +} + +/* Make expanded properties table scrollable too */ +.expanded-table { + width: 100%; + overflow-x: auto; +} + +/* Split pane styles */ +.split-pane-container { + display: flex; + height: 100%; + width: 100%; +} + +.split-pane-left { + overflow: auto; + border-right: 1px solid #e5e7eb; +} + +.split-pane-right { + overflow: auto; + background-color: #f9fafb; +} + +.split-pane-resizer { + width: 4px; + background-color: #e5e7eb; + cursor: col-resize; + transition: background-color 0.2s ease; + position: relative; + flex-shrink: 0; +} + +.split-pane-resizer:hover { + background-color: #3b82f6; +} + +.split-pane-resizer:active { + background-color: #2563eb; +} + +.split-pane-resizer::after { + content: ''; + position: absolute; + top: 50%; + left: 50%; + transform: translate(-50%, -50%); + width: 2px; + height: 20px; + background-color: #9ca3af; + border-radius: 1px; +} + +.split-pane-resizer:hover::after { + background-color: white; +} + +/* Label table clickable name styling */ +.label-name-clickable { + color: #2563eb; + cursor: pointer; + transition: color 0.2s ease; +} + +.label-name-clickable:hover { + color: #1d4ed8; + text-decoration: underline; +} + +/* Selected label highlighting */ +.selected-label-row { + background-color: #eff6ff !important; +} + +.selected-label-row td { + border-color: #3b82f6; +} diff --git a/ui/src/pages/artifacts_postgres/index.jsx b/ui/src/pages/artifacts_postgres/index.jsx index 270c3c32d..bee81a064 100644 --- a/ui/src/pages/artifacts_postgres/index.jsx +++ b/ui/src/pages/artifacts_postgres/index.jsx @@ -14,7 +14,7 @@ * limitations under the License. 
***/ -import React, { useEffect, useState } from "react"; +import React, { useEffect, useState, useCallback, useRef } from "react"; import FastAPIClient from "../../client"; import config from "../../config"; import DashboardHeader from "../../components/DashboardHeader"; @@ -23,6 +23,8 @@ import Footer from "../../components/Footer"; import "./index.css"; import Sidebar from "../../components/Sidebar"; import ArtifactTypeSidebar from "../../components/ArtifactTypeSidebar"; +import Papa from "papaparse"; +import Highlight from "../../components/Highlight"; const client = new FastAPIClient(config); @@ -38,9 +40,29 @@ const ArtifactsPostgres = () => { const [sortOrder, setSortOrder] = useState("asc"); const [totalItems, setTotalItems] = useState(0); const [activePage, setActivePage] = useState(1); - const [clickedButton, setClickedButton] = useState("page"); + const [clickedButton, setClickedButton] = useState("page"); const [selectedCol, setSelectedCol] = useState("name"); - + + // Label-specific state + const [selectedTableLabel, setSelectedTableLabel] = useState(null); + const [labelData, setLabelData] = useState(""); + const [parsedLabelData, setParsedLabelData] = useState([]); + const [labelColumns, setLabelColumns] = useState([]); + const [labelContentLoading, setLabelContentLoading] = useState(false); + const [currentPage, setCurrentPage] = useState(0); + const [rowsPerPage, setRowsPerPage] = useState(10); + + const clearLabelData = () => { + setLabelData(""); + setParsedLabelData([]); + setLabelColumns([]); + setLabelContentLoading(false); + setCurrentPage(0); + }; + + // Flag to prevent re-fetching artifacts when just loading label content + const [isLoadingLabelContent, setIsLoadingLabelContent] = useState(false); + useEffect(() => { fetchPipelines(); // Fetch pipelines and artifact types when the component mounts },[]); @@ -70,36 +92,71 @@ const ArtifactsPostgres = () => { }; useEffect(() => { - if ( selectedPipeline && selectedArtifactType ){ + if ( selectedPipeline && selectedArtifactType && !isLoadingLabelContent ){ fetchArtifacts(selectedPipeline, selectedArtifactType, sortOrder, activePage, filter, selectedCol); } - }, [selectedArtifactType, sortOrder, activePage, selectedCol, filter]); - - const fetchArtifacts = (pipelineName, artifactType, sortOrder, activePage, filter="", selectedCol) => { - client.getArtifacts(pipelineName, artifactType, sortOrder, activePage, filter, selectedCol) - .then((data) => { - setArtifacts(data.items); - setTotalItems(data.total_items); - }); - }; + }, [selectedArtifactType, sortOrder, activePage, selectedCol, filter, isLoadingLabelContent]); + + const fetchArtifacts = async (pipelineName, artifactType, sortOrder, activePage, filter="", selectedCol) => { + try { + // Handle Label search case + if (artifactType === "Label" && filter && filter.trim() !== "") { + try { + const searchData = await client.searchLabelArtifacts(pipelineName, filter, sortOrder, activePage, 5); + + // Add search context to artifacts + const processedItems = searchData.items.map(item => ({ + ...item, + isSearchResult: true, + searchFilter: filter + })); + + setArtifacts(processedItems); + setTotalItems(searchData.total_items); + return; // Early return + } catch (searchError) { + console.warn('Label search failed, falling back to regular fetch:', searchError); + // Fall through to regular fetch + } + } + + // Regular artifact fetching + const regularData = await client.getArtifacts(pipelineName, artifactType, sortOrder, activePage, filter, selectedCol); + 
setArtifacts(regularData.items); + setTotalItems(regularData.total_items); + + } catch (error) { + console.error('Failed to fetch artifacts:', error); + setArtifacts([]); + setTotalItems(0); + } + }; const handleArtifactTypeClick = (artifactType) => { if (selectedArtifactType !== artifactType) { // if same artifact type is not clicked, sets page as null until it retrieves data for that type. setArtifacts(null); - } + } setSelectedArtifactType(artifactType); setActivePage(1); - }; + + // Clear label-related state when switching artifact types + setSelectedTableLabel(null); + clearLabelData(); + }; const handlePipelineClick = (pipeline) => { if (selectedPipeline !== pipeline) { // this condition sets page as null. setArtifacts(null); - } + } setSelectedPipeline(pipeline); setActivePage(1); - }; + + // Clear label-related state when switching pipelines + setSelectedTableLabel(null); + clearLabelData(); + }; const handleFilter = (value) => { setFilter(value); // Update the filter string @@ -135,7 +192,285 @@ const ArtifactsPostgres = () => { setClickedButton("next"); handlePageClick(activePage + 1); } - }; + }; + + // Simple label content display component + const LabelContentPanel = () => { + return ( +
+ {selectedTableLabel ? ( +
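+          {/*
+            Data flow (sketch of the assumed client contract, not verified against client.js):
+              const csvText = await client.getLabelData(artifact.uri);        // raw CSV text for the label
+              const { data, meta } = Papa.parse(csvText, { header: true });   // rows + column names
+            handleTableLabelClick below stores that parse result in parsedLabelData / labelColumns,
+            and this panel paginates it purely client-side via currentPage / rowsPerPage.
+          */}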
+ {labelContentLoading ? ( +
+
+

Loading content...

+
+ ) : labelData ? ( +
+ {/* Header aligned with left table */} +
+

+ {(selectedTableLabel.name.split(":")[1] || selectedTableLabel.name).trim()} +

+
+ + {/* Fixed size table container */} +
+
+ + + + {labelColumns.map((column, index) => ( + + ))} + + + + {parsedLabelData.slice(currentPage * rowsPerPage, (currentPage + 1) * rowsPerPage).map((row, rowIndex) => ( + + {labelColumns.map((column, colIndex) => ( + + ))} + + ))} + +
+ {column.name} +
+ +
+
+ + {/* Pagination controls */} +
+
+ Rows per page: + +
+ +
+ + {currentPage * rowsPerPage + 1}-{Math.min((currentPage + 1) * rowsPerPage, parsedLabelData.length)} of {parsedLabelData.length} + + + +
+
+
+
+ ) : ( +
+

No content available

+
+ )} +
+ ) : ( +
+
+

+ Select a Label +

+

+ Click on a label name in the table to view its content +

+
+
+ )} +
+ ); + }; + + // Handle label click from table + const handleTableLabelClick = async (labelName, artifact) => { + // Prevent useEffect from triggering fetchArtifacts while loading label content + setIsLoadingLabelContent(true); + setSelectedTableLabel(artifact); + setLabelContentLoading(true); + setCurrentPage(0); // Reset pagination when new label is selected + + // Use the URI from the artifact for getLabelData, not just the label name + const fileNameForAPI = artifact.uri || `artifacts/labels.csv:${labelName}`; + + try { + // Clear old data first + setParsedLabelData([]); + setLabelColumns([]); + + // Helper function to try different URI formats + const tryGetLabelData = async (labelName, fileNameForAPI) => { + const uriFormatsToTry = [ + fileNameForAPI, // Original: artifacts/labels.csv:93951bf... + labelName, // Just the label name: 93951bf... + `artifacts/labels.csv/${labelName}`, // Alternative format: artifacts/labels.csv/93951bf... + `labels.csv:${labelName}`, // Without artifacts prefix: labels.csv:93951bf... + `${labelName}.csv` // As CSV file: 93951bf....csv + ]; + + for (const uriToTry of uriFormatsToTry) { + try { + const data = await client.getLabelData(uriToTry); + return data; // Success - return immediately + } catch (uriError) { + continue; // Try next URI format + } + } + + throw new Error(`All URI formats failed. Tried: ${uriFormatsToTry.join(', ')}`); + }; + + const labelData = await tryGetLabelData(labelName, fileNameForAPI); + + setLabelData(labelData); + + const parsed = Papa.parse(labelData, { header: true }); + + // Check if this is a search result - if so, filter to matching rows only + if (artifact.isSearchResult && artifact.searchFilter) { + const searchFilter = artifact.searchFilter; + + // Filter to show only rows that contain the search term + const matchingRows = parsed.data.filter((row) => { + const rowValues = Object.values(row); + + const hasMatch = rowValues.some(value => { + if (value && value.toString().toLowerCase().includes(searchFilter.toLowerCase())) { + return true; + } + return false; + }); + + return hasMatch; + }); + + setParsedLabelData(matchingRows); + } else { + // Normal label - show all data + setParsedLabelData(parsed.data); + } + + if (parsed.meta.fields) { + setLabelColumns( + parsed.meta.fields.map(field => ({ + name: field, + selector: row => row[field], + sortable: true, + })) + ); + } + } catch (error) { + // Clear data on error to prevent showing old content + setParsedLabelData([]); + setLabelColumns([]); + + // Show error message to user + setParsedLabelData([{ + error: "Failed to load label content", + message: error.message, + uri: fileNameForAPI + }]); + + } finally { + setLabelContentLoading(false); + setIsLoadingLabelContent(false); // Reset flag to allow normal useEffect behavior + } + }; + + // Resizable Split Pane Component + const ResizableSplitPane = ({ leftContent, rightContent, initialSplitPercentage = 50 }) => { + const [splitPercentage, setSplitPercentage] = useState(initialSplitPercentage); + const [isDragging, setIsDragging] = useState(false); + const containerRef = useRef(null); + + const handleMouseDown = (e) => { + setIsDragging(true); + e.preventDefault(); + }; + + const handleMouseMove = useCallback((e) => { + if (!isDragging || !containerRef.current) return; + + const containerRect = containerRef.current.getBoundingClientRect(); + const newPercentage = ((e.clientX - containerRect.left) / containerRect.width) * 100; + + // Limit between 20% and 80% + const clampedPercentage = Math.max(20, Math.min(80, 
newPercentage)); + setSplitPercentage(clampedPercentage); + }, [isDragging]); + + const handleMouseUp = useCallback(() => { + setIsDragging(false); + }, []); + + useEffect(() => { + if (isDragging) { + document.addEventListener('mousemove', handleMouseMove); + document.addEventListener('mouseup', handleMouseUp); + return () => { + document.removeEventListener('mousemove', handleMouseMove); + document.removeEventListener('mouseup', handleMouseUp); + }; + } + }, [isDragging, handleMouseMove, handleMouseUp]); + + return ( +
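+        {/*
+          Drag handling: handleMouseDown (wired to the resizer) sets isDragging; while dragging,
+          document-level mousemove updates splitPercentage, clamped between 20% and 80%, and mouseup
+          clears the flag, with the listeners removed again in the effect cleanup. The .split-pane-*
+          rules added to index.css above presumably provide the container, pane, and resizer styling.
+        */}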
+ {/* Left Pane */} +
+ {leftContent} +
+ + {/* Resizer */} +
+
+
+
+
+ + {/* Right Pane */} +
+ {rightContent} +
+
+ ); + }; + + return ( <> @@ -152,7 +487,8 @@ const ArtifactsPostgres = () => { className="flex-grow" />
-
+ +
{selectedPipeline !== null && ( { /> )}
-
+ {selectedArtifactType === "Label" ? ( +
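+            {/*
+              Label view: a resizable split pane with the label artifact table on the left and the
+              LabelContentPanel (the selected label's CSV rows) on the right; handleTableLabelClick
+              is what populates the right-hand panel when a label name is clicked in the table.
+            */}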
+ + {artifacts !== null && artifacts.length > 0 ? ( + + ) : ( +
+

No label artifacts available

+
+ )} +
+              }
+              rightContent={<LabelContentPanel />}
+              initialSplitPercentage={50}
+            />
+
+ ) : ( +
{artifacts !== null && artifacts.length > 0 ? ( - - + ) : (
No data available
// Display message when there are no artifacts )} @@ -251,7 +613,8 @@ const ArtifactsPostgres = () => { )} -
+
+ )}