diff --git a/src/aws-healthomics-mcp-server/awslabs/aws_healthomics_mcp_server/server.py b/src/aws-healthomics-mcp-server/awslabs/aws_healthomics_mcp_server/server.py index 1c133a101a..8195144f11 100644 --- a/src/aws-healthomics-mcp-server/awslabs/aws_healthomics_mcp_server/server.py +++ b/src/aws-healthomics-mcp-server/awslabs/aws_healthomics_mcp_server/server.py @@ -14,12 +14,52 @@ """awslabs aws-healthomics MCP Server implementation.""" +from awslabs.aws_healthomics_mcp_server.tools.annotation_store_tools import ( + get_aho_annotation_import_job, + get_aho_annotation_store, + list_aho_annotation_stores, + search_aho_annotations, + start_aho_annotation_import_job, +) +from awslabs.aws_healthomics_mcp_server.tools.data_import_tools import ( + discover_aho_genomic_files, + get_aho_s3_file_metadata, + list_aho_s3_bucket_contents, + prepare_aho_import_sources, + validate_aho_s3_uri_format, +) from awslabs.aws_healthomics_mcp_server.tools.helper_tools import ( get_supported_regions, package_workflow, ) +from awslabs.aws_healthomics_mcp_server.tools.reference_store_tools import ( + get_aho_reference, + get_aho_reference_import_job, + get_aho_reference_store, + list_aho_reference_stores, + list_aho_references, + start_aho_reference_import_job, +) from awslabs.aws_healthomics_mcp_server.tools.run_analysis import analyze_run_performance + +# Data store tools +from awslabs.aws_healthomics_mcp_server.tools.sequence_store_tools import ( + get_aho_read_set, + get_aho_read_set_import_job, + list_aho_read_set_import_jobs, + list_aho_read_sets, + list_aho_sequence_stores, + start_aho_read_set_import_job, +) from awslabs.aws_healthomics_mcp_server.tools.troubleshooting import diagnose_run_failure +from awslabs.aws_healthomics_mcp_server.tools.variant_store_tools import ( + count_aho_variants, + get_aho_variant_import_job, + get_aho_variant_store, + list_aho_variant_stores, + search_aho_variants, + start_aho_variant_import_job, +) from awslabs.aws_healthomics_mcp_server.tools.workflow_analysis import ( get_run_engine_logs, get_run_logs, @@ -53,7 +93,7 @@ instructions=""" # AWS HealthOmics MCP Server -This MCP server provides tools for creating, managing, and analyzing genomic workflows using AWS HealthOmics. It enables AI assistants to help users with workflow creation, execution, monitoring, and troubleshooting. +This MCP server provides comprehensive tools for managing AWS HealthOmics workflows and data stores. It enables AI assistants to help users with workflow creation, execution, monitoring, data management, and genomic analysis. ## Available Tools @@ -85,10 +125,62 @@ - **LintAHOWorkflowDefinition**: Lint single WDL or CWL workflow files using miniwdl and cwltool - **LintAHOWorkflowBundle**: Lint multi-file WDL or CWL workflow bundles with import/dependency support +### Sequence Store Operations +- **ListAHOSequenceStores**: List available sequence stores +- **ListAHOReadSets**: List read sets in a sequence store with optional filtering +- **GetAHOReadSet**: Get detailed metadata for a specific read set +- **StartAHOReadSetImportJob**: Start importing read sets from S3 to a sequence store +- **GetAHOReadSetImportJob**: Get status and details of a read set import job +- **ListAHOReadSetImportJobs**: List read set import jobs for a sequence store + +### Variant Store Operations +- **ListAHOVariantStores**: List available variant stores +- **GetAHOVariantStore**: Get detailed information about a variant store +- **SearchAHOVariants**: Search for variants by gene, position, type, etc. 
+- **CountAHOVariants**: Count variants matching specific criteria +- **StartAHOVariantImportJob**: Start importing variants from VCF files to a variant store +- **GetAHOVariantImportJob**: Get status and details of a variant import job + +### Reference Store Operations +- **ListAHOReferenceStores**: List available reference stores +- **GetAHOReferenceStore**: Get detailed information about a reference store +- **ListAHOReferences**: List reference genomes in a reference store +- **GetAHOReference**: Get detailed metadata for a specific reference +- **StartAHOReferenceImportJob**: Start importing reference genomes from S3 +- **GetAHOReferenceImportJob**: Get status and details of a reference import job + +### Annotation Store Operations +- **ListAHOAnnotationStores**: List available annotation stores +- **GetAHOAnnotationStore**: Get detailed information about an annotation store +- **SearchAHOAnnotations**: Search for annotations by gene, position, type, etc. +- **StartAHOAnnotationImportJob**: Start importing annotations from files +- **GetAHOAnnotationImportJob**: Get status and details of an annotation import job + +### Data Import and S3 Integration +- **ValidateAHOS3UriFormat**: Validate S3 URI format and syntax +- **DiscoverAHOGenomicFiles**: Auto-discover genomic files in S3 locations +- **ListAHOS3BucketContents**: Browse S3 bucket contents with optional filtering +- **GetAHOS3FileMetadata**: Get metadata for specific S3 files +- **PrepareAHOImportSources**: Prepare source file configurations for import operations + ### Helper Tools - **PackageAHOWorkflow**: Package workflow definition files into a base64-encoded ZIP - **GetAHOSupportedRegions**: Get the list of AWS regions where HealthOmics is available +## Usage Patterns + +### Complete Genomic Analysis Workflow +1. Use S3 tools to discover and validate genomic files +2. Import data using sequence/variant/reference store import tools +3. Create and execute workflows using workflow management tools +4. Analyze results using variant search and annotation tools +5. Monitor and troubleshoot using analysis and diagnostic tools + +### Data Management +- Organize genomic data across sequence, variant, reference, and annotation stores +- Import data from S3 with automatic file discovery and validation +- Search and retrieve specific genomic data for analysis + ## Service Availability AWS HealthOmics is available in select AWS regions. Use the GetAHOSupportedRegions tool to get the current list of supported regions. 
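+
+## Example Tool Sequence
+
+An illustrative read set import flow (all IDs, ARNs, and URIs are placeholders):
+
+1. Validate the source location with ValidateAHOS3UriFormat (e.g., s3://my-bucket/fastq/)
+2. Discover candidate files with DiscoverAHOGenomicFiles using file_types=["FASTQ"]
+3. Build source configurations with PrepareAHOImportSources, supplying sample_id and subject_id
+4. Start the import with StartAHOReadSetImportJob, passing the sequence store ID, an import role ARN, and the prepared sources
+5. Poll the returned job ID with GetAHOReadSetImportJob until the job completes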
""", @@ -133,6 +225,44 @@ mcp.tool(name='PackageAHOWorkflow')(package_workflow) mcp.tool(name='GetAHOSupportedRegions')(get_supported_regions) +# Register sequence store tools +mcp.tool(name='ListAHOSequenceStores')(list_aho_sequence_stores) +mcp.tool(name='ListAHOReadSets')(list_aho_read_sets) +mcp.tool(name='GetAHOReadSet')(get_aho_read_set) +mcp.tool(name='StartAHOReadSetImportJob')(start_aho_read_set_import_job) +mcp.tool(name='GetAHOReadSetImportJob')(get_aho_read_set_import_job) +mcp.tool(name='ListAHOReadSetImportJobs')(list_aho_read_set_import_jobs) + +# Register variant store tools +mcp.tool(name='ListAHOVariantStores')(list_aho_variant_stores) +mcp.tool(name='GetAHOVariantStore')(get_aho_variant_store) +mcp.tool(name='SearchAHOVariants')(search_aho_variants) +mcp.tool(name='CountAHOVariants')(count_aho_variants) +mcp.tool(name='StartAHOVariantImportJob')(start_aho_variant_import_job) +mcp.tool(name='GetAHOVariantImportJob')(get_aho_variant_import_job) + +# Register reference store tools +mcp.tool(name='ListAHOReferenceStores')(list_aho_reference_stores) +mcp.tool(name='GetAHOReferenceStore')(get_aho_reference_store) +mcp.tool(name='ListAHOReferences')(list_aho_references) +mcp.tool(name='GetAHOReference')(get_aho_reference) +mcp.tool(name='StartAHOReferenceImportJob')(start_aho_reference_import_job) +mcp.tool(name='GetAHOReferenceImportJob')(get_aho_reference_import_job) + +# Register annotation store tools +mcp.tool(name='ListAHOAnnotationStores')(list_aho_annotation_stores) +mcp.tool(name='GetAHOAnnotationStore')(get_aho_annotation_store) +mcp.tool(name='SearchAHOAnnotations')(search_aho_annotations) +mcp.tool(name='StartAHOAnnotationImportJob')(start_aho_annotation_import_job) +mcp.tool(name='GetAHOAnnotationImportJob')(get_aho_annotation_import_job) + +# Register data import tools +mcp.tool(name='ValidateAHOS3UriFormat')(validate_aho_s3_uri_format) +mcp.tool(name='DiscoverAHOGenomicFiles')(discover_aho_genomic_files) +mcp.tool(name='ListAHOS3BucketContents')(list_aho_s3_bucket_contents) +mcp.tool(name='GetAHOS3FileMetadata')(get_aho_s3_file_metadata) +mcp.tool(name='PrepareAHOImportSources')(prepare_aho_import_sources) + def main(): """Run the MCP server with CLI argument support.""" diff --git a/src/aws-healthomics-mcp-server/awslabs/aws_healthomics_mcp_server/tools/annotation_store_tools.py b/src/aws-healthomics-mcp-server/awslabs/aws_healthomics_mcp_server/tools/annotation_store_tools.py new file mode 100644 index 0000000000..484e835333 --- /dev/null +++ b/src/aws-healthomics-mcp-server/awslabs/aws_healthomics_mcp_server/tools/annotation_store_tools.py @@ -0,0 +1,380 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""Annotation store tools for the AWS HealthOmics MCP server.""" + +import botocore.exceptions +from awslabs.aws_healthomics_mcp_server.consts import ( + DEFAULT_MAX_RESULTS, +) +from awslabs.aws_healthomics_mcp_server.utils.aws_utils import ( + get_omics_client, +) +from loguru import logger +from mcp.server.fastmcp import Context +from pydantic import Field +from typing import Any, Dict, List, Optional + + +async def list_aho_annotation_stores( + ctx: Context, + max_results: int = Field( + DEFAULT_MAX_RESULTS, + description='Maximum number of annotation stores to return', + ge=1, + le=100, + ), + next_token: Optional[str] = Field( + None, + description='Token for pagination from a previous response', + ), +) -> Dict[str, Any]: + """List available HealthOmics annotation stores. + + Args: + ctx: MCP context for error reporting + max_results: Maximum number of annotation stores to return + next_token: Token for pagination from a previous response + + Returns: + Dictionary containing annotation stores list and pagination info + + Raises: + Exception: If there's an error listing annotation stores + """ + try: + client = get_omics_client() + + params = {'maxResults': max_results} + + if next_token: + params['nextToken'] = next_token + + response = client.list_annotation_stores(**params) + + return { + 'annotationStores': response.get('annotationStores', []), + 'nextToken': response.get('nextToken'), + 'totalCount': len(response.get('annotationStores', [])), + } + + except botocore.exceptions.ClientError as e: + error_code = e.response['Error']['Code'] + error_message = e.response['Error']['Message'] + + logger.error(f'Failed to list annotation stores: {error_code} - {error_message}') + + raise Exception(f'Failed to list annotation stores: {error_code} - {error_message}') + except Exception as e: + logger.error(f'Unexpected error listing annotation stores: {str(e)}') + raise Exception(f'Failed to list annotation stores: {str(e)}') + + +async def get_aho_annotation_store( + ctx: Context, + annotation_store_id: str = Field( + ..., + description='ID of the annotation store to retrieve', + ), +) -> Dict[str, Any]: + """Get detailed information about a specific annotation store. 
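+
+    Example (illustrative invocation via the MCP runtime; the store name is a placeholder):
+        store = await get_aho_annotation_store(ctx, annotation_store_id='myAnnotationStore')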
+ + Args: + ctx: MCP context for error reporting + annotation_store_id: ID of the annotation store + + Returns: + Dictionary containing annotation store details + + Raises: + Exception: If there's an error retrieving annotation store information + """ + try: + client = get_omics_client() + + response = client.get_annotation_store(name=annotation_store_id) + + return { + 'annotationStore': { + 'id': response.get('id'), + 'reference': response.get('reference'), + 'status': response.get('status'), + 'statusMessage': response.get('statusMessage'), + 'storeArn': response.get('storeArn'), + 'name': response.get('name'), + 'description': response.get('description'), + 'sseConfig': response.get('sseConfig'), + 'creationTime': response.get('creationTime'), + 'updateTime': response.get('updateTime'), + 'tags': response.get('tags', {}), + 'storeFormat': response.get('storeFormat'), + 'storeOptions': response.get('storeOptions'), + } + } + + except botocore.exceptions.ClientError as e: + error_code = e.response['Error']['Code'] + error_message = e.response['Error']['Message'] + + logger.error(f'Failed to get annotation store: {error_code} - {error_message}') + + raise Exception(f'Failed to get annotation store: {error_code} - {error_message}') + except Exception as e: + logger.error(f'Unexpected error getting annotation store: {str(e)}') + raise Exception(f'Failed to get annotation store: {str(e)}') + + +async def search_aho_annotations( + ctx: Context, + annotation_store_id: str = Field( + ..., + description='ID of the annotation store to search', + ), + gene: Optional[str] = Field( + None, + description='Gene name to search for annotations (e.g., BRCA1)', + ), + chromosome: Optional[str] = Field( + None, + description='Chromosome to search (e.g., chr1, 1)', + ), + start_position: Optional[int] = Field( + None, + description='Start position for genomic range search', + ge=1, + ), + end_position: Optional[int] = Field( + None, + description='End position for genomic range search', + ge=1, + ), + annotation_type: Optional[str] = Field( + None, + description='Type of annotation to search for', + ), + max_results: int = Field( + DEFAULT_MAX_RESULTS, + description='Maximum number of annotations to return', + ge=1, + le=1000, + ), + next_token: Optional[str] = Field( + None, + description='Token for pagination from a previous response', + ), +) -> Dict[str, Any]: + """Search for annotations in a HealthOmics annotation store. 
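+
+    Example (illustrative invocation via the MCP runtime; the store name is a placeholder
+    and the coordinates are the approximate GRCh38 span of BRCA1):
+        hits = await search_aho_annotations(
+            ctx,
+            annotation_store_id='myAnnotationStore',
+            gene='BRCA1',
+            chromosome='17',
+            start_position=43044295,
+            end_position=43125483,
+            max_results=100,
+        )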
+ + Args: + ctx: MCP context for error reporting + annotation_store_id: ID of the annotation store + gene: Gene name to search for + chromosome: Chromosome to search + start_position: Start position for range search + end_position: End position for range search + annotation_type: Type of annotation to search for + max_results: Maximum number of annotations to return + next_token: Token for pagination + + Returns: + Dictionary containing search results + + Raises: + Exception: If there's an error searching annotations + """ + try: + client = get_omics_client() + + # Build search criteria + filter_criteria = {} + + if gene: + filter_criteria['gene'] = {'eq': gene} + + if chromosome: + # Normalize chromosome format + if not chromosome.startswith('chr'): + chromosome = f'chr{chromosome}' + filter_criteria['contigName'] = {'eq': chromosome} + + if start_position is not None and end_position is not None: + filter_criteria['start'] = {'gte': start_position} + filter_criteria['end'] = {'lte': end_position} + elif start_position is not None: + filter_criteria['start'] = {'gte': start_position} + elif end_position is not None: + filter_criteria['end'] = {'lte': end_position} + + if annotation_type: + filter_criteria['annotationType'] = {'eq': annotation_type} + + params = {'annotationStoreId': annotation_store_id, 'maxResults': max_results} + + if filter_criteria: + params['filter'] = filter_criteria + + if next_token: + params['nextToken'] = next_token + + response = client.search_annotations(**params) + + return { + 'annotations': response.get('annotations', []), + 'nextToken': response.get('nextToken'), + 'totalCount': len(response.get('annotations', [])), + 'annotationStoreId': annotation_store_id, + 'searchCriteria': { + 'gene': gene, + 'chromosome': chromosome, + 'startPosition': start_position, + 'endPosition': end_position, + 'annotationType': annotation_type, + }, + } + + except botocore.exceptions.ClientError as e: + error_code = e.response['Error']['Code'] + error_message = e.response['Error']['Message'] + + logger.error(f'Failed to search annotations: {error_code} - {error_message}') + + raise Exception(f'Failed to search annotations: {error_code} - {error_message}') + except Exception as e: + logger.error(f'Unexpected error searching annotations: {str(e)}') + raise Exception(f'Failed to search annotations: {str(e)}') + + +async def start_aho_annotation_import_job( + ctx: Context, + annotation_store_id: str = Field( + ..., + description='ID of the annotation store to import into', + ), + role_arn: str = Field( + ..., + description='ARN of the IAM role to use for the import', + ), + items: List[Dict[str, Any]] = Field( + ..., + description='List of annotation files to import from S3', + ), + run_left_normalization: bool = Field( + False, + description='Whether to run left normalization on annotations', + ), + client_token: Optional[str] = Field( + None, + description='Client token for idempotency', + ), +) -> Dict[str, Any]: + """Start an annotation import job from S3 files. 
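+
+    Example (illustrative invocation via the MCP runtime; the store name, role ARN, and
+    S3 URI are placeholders, and the item shape assumes the StartAnnotationImportJob API):
+        job = await start_aho_annotation_import_job(
+            ctx,
+            annotation_store_id='myAnnotationStore',
+            role_arn='arn:aws:iam::123456789012:role/OmicsImportRole',
+            items=[{'source': 's3://my-bucket/annotations/clinvar.vcf.gz'}],
+        )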
+ + Args: + ctx: MCP context for error reporting + annotation_store_id: ID of the annotation store + role_arn: ARN of the IAM role for the import + items: List of annotation file configurations + run_left_normalization: Whether to run left normalization + client_token: Client token for idempotency + + Returns: + Dictionary containing import job information + + Raises: + Exception: If there's an error starting the import job + """ + try: + client = get_omics_client() + + params = { + 'annotationStoreId': annotation_store_id, + 'roleArn': role_arn, + 'items': items, + 'runLeftNormalization': run_left_normalization, + } + + if client_token: + params['clientToken'] = client_token + + response = client.start_annotation_import_job(**params) + + return { + 'id': response.get('id'), + 'annotationStoreId': response.get('annotationStoreId'), + 'roleArn': response.get('roleArn'), + 'status': response.get('status'), + 'creationTime': response.get('creationTime'), + 'runLeftNormalization': run_left_normalization, + 'items': items, + } + + except botocore.exceptions.ClientError as e: + error_code = e.response['Error']['Code'] + error_message = e.response['Error']['Message'] + + logger.error(f'Failed to start annotation import job: {error_code} - {error_message}') + + raise Exception(f'Failed to start annotation import job: {error_code} - {error_message}') + except Exception as e: + logger.error(f'Unexpected error starting annotation import job: {str(e)}') + raise Exception(f'Failed to start annotation import job: {str(e)}') + + +async def get_aho_annotation_import_job( + ctx: Context, + annotation_import_job_id: str = Field( + ..., + description='ID of the annotation import job', + ), +) -> Dict[str, Any]: + """Get the status and details of an annotation import job. + + Args: + ctx: MCP context for error reporting + annotation_import_job_id: ID of the annotation import job + + Returns: + Dictionary containing import job status and details + + Raises: + Exception: If there's an error retrieving import job status + """ + try: + client = get_omics_client() + + response = client.get_annotation_import_job(id=annotation_import_job_id) + + return { + 'id': response.get('id'), + 'annotationStoreId': response.get('annotationStoreId'), + 'roleArn': response.get('roleArn'), + 'status': response.get('status'), + 'statusMessage': response.get('statusMessage'), + 'creationTime': response.get('creationTime'), + 'updateTime': response.get('updateTime'), + 'completionTime': response.get('completionTime'), + 'items': response.get('items', []), + 'runLeftNormalization': response.get('runLeftNormalization'), + } + + except botocore.exceptions.ClientError as e: + error_code = e.response['Error']['Code'] + error_message = e.response['Error']['Message'] + + logger.error(f'Failed to get annotation import job: {error_code} - {error_message}') + + raise Exception(f'Failed to get annotation import job: {error_code} - {error_message}') + except Exception as e: + logger.error(f'Unexpected error getting annotation import job: {str(e)}') + raise Exception(f'Failed to get annotation import job: {str(e)}') diff --git a/src/aws-healthomics-mcp-server/awslabs/aws_healthomics_mcp_server/tools/data_import_tools.py b/src/aws-healthomics-mcp-server/awslabs/aws_healthomics_mcp_server/tools/data_import_tools.py new file mode 100644 index 0000000000..b20f8a22be --- /dev/null +++ b/src/aws-healthomics-mcp-server/awslabs/aws_healthomics_mcp_server/tools/data_import_tools.py @@ -0,0 +1,470 @@ +# Copyright Amazon.com, Inc. or its affiliates. 
All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Data import tools for the AWS HealthOmics MCP server.""" + +import botocore.exceptions +import os +import re +from awslabs.aws_healthomics_mcp_server.utils.aws_utils import ( + create_aws_client, +) +from loguru import logger +from mcp.server.fastmcp import Context +from pydantic import Field +from typing import Any, Dict, List, Optional +from urllib.parse import urlparse + + +def parse_s3_uri(s3_uri: str) -> Dict[str, str]: + """Parse S3 URI into bucket and key components. + + Args: + s3_uri: S3 URI (e.g., s3://bucket-name/path/to/file) + + Returns: + Dictionary with bucket and key + """ + if not s3_uri.startswith('s3://'): + raise ValueError(f'Invalid S3 URI format: {s3_uri}') + + parsed = urlparse(s3_uri) + return {'bucket': parsed.netloc, 'key': parsed.path.lstrip('/')} + + +async def validate_aho_s3_uri_format( + ctx: Context, + s3_uri: str = Field( + ..., + description='S3 URI to validate (e.g., s3://bucket/path/file.txt)', + ), +) -> Dict[str, Any]: + """Validate S3 URI format and syntax. + + Args: + ctx: MCP context for error reporting + s3_uri: S3 URI to validate + + Returns: + Dictionary containing validation results + + Raises: + Exception: If there's an error validating the S3 URI + """ + try: + # Basic format validation + if not isinstance(s3_uri, str): + return {'valid': False, 'error': 'S3 URI must be a string', 's3Uri': s3_uri} + + if not s3_uri.startswith('s3://'): + return {'valid': False, 'error': 'S3 URI must start with s3://', 's3Uri': s3_uri} + + try: + parsed = parse_s3_uri(s3_uri) + except ValueError as e: + return {'valid': False, 'error': str(e), 's3Uri': s3_uri} + + # Validate bucket name + bucket = parsed['bucket'] + if not bucket: + return {'valid': False, 'error': 'Bucket name cannot be empty', 's3Uri': s3_uri} + + # Basic bucket name validation (simplified) + if not re.match(r'^[a-z0-9][a-z0-9\-\.]*[a-z0-9]$', bucket): + return {'valid': False, 'error': 'Invalid bucket name format', 's3Uri': s3_uri} + + return {'valid': True, 'bucket': bucket, 'key': parsed['key'], 's3Uri': s3_uri} + + except Exception as e: + logger.error(f'Unexpected error validating S3 URI: {str(e)}') + return {'valid': False, 'error': f'Validation error: {str(e)}', 's3Uri': s3_uri} + + +async def discover_aho_genomic_files( + ctx: Context, + s3_uri: str = Field( + ..., + description='S3 URI to search for genomic files (e.g., s3://bucket/genomics/)', + ), + file_types: Optional[List[str]] = Field( + None, + description='List of file types to search for (default: FASTQ, BAM, CRAM, VCF)', + ), + max_files: int = Field( + 1000, + description='Maximum number of files to discover', + ge=1, + le=10000, + ), +) -> Dict[str, Any]: + """Auto-discover genomic files in S3 location. 
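+
+    Example (illustrative invocation via the MCP runtime; bucket and prefix are placeholders):
+        found = await discover_aho_genomic_files(
+            ctx,
+            s3_uri='s3://my-bucket/genomics/',
+            file_types=['FASTQ', 'BAM'],
+            max_files=500,
+        )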
+ + Args: + ctx: MCP context for error reporting + s3_uri: S3 URI to search in + file_types: List of file types to search for + max_files: Maximum number of files to discover + + Returns: + Dictionary containing discovered genomic files + + Raises: + Exception: If there's an error discovering files + """ + try: + # Validate S3 URI + validation_result = await validate_aho_s3_uri_format(ctx, s3_uri) + if not validation_result['valid']: + raise Exception(f'Invalid S3 URI: {validation_result["error"]}') + + # Default file types for genomic data + if file_types is None: + file_types = ['FASTQ', 'BAM', 'CRAM', 'VCF', 'FASTA', 'FA'] + + # File extensions mapping + extension_map = { + 'FASTQ': ['.fastq', '.fq', '.fastq.gz', '.fq.gz'], + 'BAM': ['.bam'], + 'CRAM': ['.cram'], + 'VCF': ['.vcf', '.vcf.gz'], + 'FASTA': ['.fasta', '.fa', '.fasta.gz', '.fa.gz'], + 'FA': ['.fa', '.fa.gz'], + } + + # Build list of extensions to search for + target_extensions = [] + for file_type in file_types: + if file_type.upper() in extension_map: + target_extensions.extend(extension_map[file_type.upper()]) + + s3_client = create_aws_client('s3') + parsed = parse_s3_uri(s3_uri) + + discovered_files = [] + + # List objects in S3 + paginator = s3_client.get_paginator('list_objects_v2') + + for page in paginator.paginate( + Bucket=parsed['bucket'], Prefix=parsed['key'], MaxKeys=max_files + ): + if 'Contents' not in page: + continue + + for obj in page['Contents']: + key = obj['Key'] + file_name = os.path.basename(key) + + # Check if file matches any target extensions + for ext in target_extensions: + if file_name.lower().endswith(ext.lower()): + # Determine file type + detected_type = None + for ft, extensions in extension_map.items(): + if any(file_name.lower().endswith(e.lower()) for e in extensions): + detected_type = ft + break + + discovered_files.append( + { + 'fileName': file_name, + 'fileType': detected_type, + 's3Uri': f's3://{parsed["bucket"]}/{key}', + 'size': obj['Size'], + 'lastModified': obj['LastModified'].isoformat(), + 'etag': obj['ETag'].strip('"'), + } + ) + break + + if len(discovered_files) >= max_files: + break + + if len(discovered_files) >= max_files: + break + + return { + 'discoveredFiles': discovered_files, + 'totalCount': len(discovered_files), + 'searchLocation': s3_uri, + 'searchedFileTypes': file_types, + 'maxFilesReached': len(discovered_files) >= max_files, + } + + except botocore.exceptions.ClientError as e: + error_code = e.response['Error']['Code'] + error_message = e.response['Error']['Message'] + + logger.error(f'Failed to discover genomic files: {error_code} - {error_message}') + + raise Exception(f'Failed to discover genomic files: {error_code} - {error_message}') + except Exception as e: + logger.error(f'Unexpected error discovering genomic files: {str(e)}') + raise Exception(f'Failed to discover genomic files: {str(e)}') + + +async def list_aho_s3_bucket_contents( + ctx: Context, + s3_uri: str = Field( + ..., + description='S3 URI to browse (e.g., s3://bucket/path/)', + ), + pattern: Optional[str] = Field( + None, + description='File name pattern to filter by (supports wildcards)', + ), + max_keys: int = Field( + 1000, + description='Maximum number of objects to list', + ge=1, + le=10000, + ), +) -> Dict[str, Any]: + """Browse S3 bucket contents with optional filtering. 
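+
+    Example (illustrative invocation via the MCP runtime; bucket, prefix, and pattern
+    are placeholders):
+        listing = await list_aho_s3_bucket_contents(
+            ctx,
+            s3_uri='s3://my-bucket/genomics/',
+            pattern='*.fastq.gz',
+            max_keys=500,
+        )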
+ + Args: + ctx: MCP context for error reporting + s3_uri: S3 URI to browse + pattern: File name pattern to filter by + max_keys: Maximum number of objects to list + + Returns: + Dictionary containing S3 object listing + + Raises: + Exception: If there's an error listing S3 contents + """ + try: + # Validate S3 URI + validation_result = await validate_aho_s3_uri_format(ctx, s3_uri) + if not validation_result['valid']: + raise Exception(f'Invalid S3 URI: {validation_result["error"]}') + + s3_client = create_aws_client('s3') + parsed = parse_s3_uri(s3_uri) + + objects = [] + + # List objects in S3 + paginator = s3_client.get_paginator('list_objects_v2') + + for page in paginator.paginate( + Bucket=parsed['bucket'], Prefix=parsed['key'], MaxKeys=max_keys + ): + if 'Contents' not in page: + continue + + for obj in page['Contents']: + key = obj['Key'] + file_name = os.path.basename(key) + + # Apply pattern filter if specified + if pattern: + import fnmatch + + if not fnmatch.fnmatch(file_name, pattern): + continue + + objects.append( + { + 'key': key, + 'fileName': file_name, + 's3Uri': f's3://{parsed["bucket"]}/{key}', + 'size': obj['Size'], + 'lastModified': obj['LastModified'].isoformat(), + 'etag': obj['ETag'].strip('"'), + 'storageClass': obj.get('StorageClass', 'STANDARD'), + } + ) + + if len(objects) >= max_keys: + break + + if len(objects) >= max_keys: + break + + return { + 'objects': objects, + 'totalCount': len(objects), + 'bucketLocation': s3_uri, + 'appliedPattern': pattern, + 'maxKeysReached': len(objects) >= max_keys, + } + + except botocore.exceptions.ClientError as e: + error_code = e.response['Error']['Code'] + error_message = e.response['Error']['Message'] + + logger.error(f'Failed to list S3 bucket contents: {error_code} - {error_message}') + + raise Exception(f'Failed to list S3 bucket contents: {error_code} - {error_message}') + except Exception as e: + logger.error(f'Unexpected error listing S3 bucket contents: {str(e)}') + raise Exception(f'Failed to list S3 bucket contents: {str(e)}') + + +async def get_aho_s3_file_metadata( + ctx: Context, + s3_uri: str = Field( + ..., + description='S3 URI of the file to get metadata for', + ), +) -> Dict[str, Any]: + """Get metadata for a specific S3 file. 
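+
+    Example (illustrative invocation via the MCP runtime; the URI is a placeholder):
+        meta = await get_aho_s3_file_metadata(ctx, s3_uri='s3://my-bucket/fastq/sample1_R1.fastq.gz')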
+
+    Args:
+        ctx: MCP context for error reporting
+        s3_uri: S3 URI of the file
+
+    Returns:
+        Dictionary containing file metadata
+
+    Raises:
+        Exception: If there's an error retrieving file metadata
+    """
+    try:
+        # Validate S3 URI
+        validation_result = await validate_aho_s3_uri_format(ctx, s3_uri)
+        if not validation_result['valid']:
+            raise Exception(f'Invalid S3 URI: {validation_result["error"]}')
+
+        s3_client = create_aws_client('s3')
+        parsed = parse_s3_uri(s3_uri)
+
+        # Get object metadata
+        response = s3_client.head_object(Bucket=parsed['bucket'], Key=parsed['key'])
+
+        return {
+            'fileMetadata': {
+                's3Uri': s3_uri,
+                'bucket': parsed['bucket'],
+                'key': parsed['key'],
+                'fileName': os.path.basename(parsed['key']),
+                'size': response['ContentLength'],
+                'lastModified': response['LastModified'].isoformat(),
+                'etag': response['ETag'].strip('"'),
+                'contentType': response.get('ContentType'),
+                'storageClass': response.get('StorageClass', 'STANDARD'),
+                'serverSideEncryption': response.get('ServerSideEncryption'),
+                'metadata': response.get('Metadata', {}),
+            }
+        }
+
+    except botocore.exceptions.ClientError as e:
+        error_code = e.response['Error']['Code']
+        error_message = e.response['Error']['Message']
+
+        # head_object surfaces a missing object as a bare HTTP 404 rather than
+        # NoSuchKey, so check for both forms
+        if error_code in ('404', 'NotFound', 'NoSuchKey'):
+            raise Exception(f'File not found: {s3_uri}')
+        elif error_code == 'NoSuchBucket':
+            raise Exception(f'Bucket not found: {parsed["bucket"]}')
+        else:
+            logger.error(f'Failed to get S3 file metadata: {error_code} - {error_message}')
+            raise Exception(f'Failed to get S3 file metadata: {error_code} - {error_message}')
+    except Exception as e:
+        logger.error(f'Unexpected error getting S3 file metadata: {str(e)}')
+        raise Exception(f'Failed to get S3 file metadata: {str(e)}')
+
+
+async def prepare_aho_import_sources(
+    ctx: Context,
+    files: List[Dict[str, Any]] = Field(
+        ...,
+        description='List of file information to prepare for import',
+    ),
+    sample_id: Optional[str] = Field(
+        None,
+        description='Sample ID for the import',
+    ),
+    subject_id: Optional[str] = Field(
+        None,
+        description='Subject ID for the import',
+    ),
+    reference_arn: Optional[str] = Field(
+        None,
+        description='Reference ARN to use for the import',
+    ),
+) -> Dict[str, Any]:
+    """Prepare source file configurations for HealthOmics import operations.
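+
+    Example (illustrative invocation via the MCP runtime; URIs and IDs are placeholders):
+        prepared = await prepare_aho_import_sources(
+            ctx,
+            files=[
+                {'s3Uri': 's3://my-bucket/fastq/sample1_R1.fastq.gz', 'fileType': 'FASTQ'},
+                {'s3Uri': 's3://my-bucket/fastq/sample1_R2.fastq.gz', 'fileType': 'FASTQ'},
+            ],
+            sample_id='sample-001',
+            subject_id='subject-001',
+        )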
+
+    Args:
+        ctx: MCP context for error reporting
+        files: List of file information
+        sample_id: Sample ID for the import
+        subject_id: Subject ID for the import
+        reference_arn: Reference ARN to use
+
+    Returns:
+        Dictionary containing prepared import sources
+
+    Raises:
+        Exception: If there's an error preparing import sources
+    """
+    try:
+        prepared_sources = []
+
+        for file_info in files:
+            s3_uri = file_info.get('s3Uri') or file_info.get('s3_uri')
+            file_type = file_info.get('fileType') or file_info.get('file_type', 'FASTQ')
+
+            if not s3_uri:
+                raise Exception(f'Missing s3Uri in file info: {file_info}')
+
+            # Validate S3 URI
+            validation_result = await validate_aho_s3_uri_format(ctx, s3_uri)
+            if not validation_result['valid']:
+                raise Exception(f'Invalid S3 URI: {validation_result["error"]}')
+
+            file_name = os.path.basename(s3_uri)
+
+            # Skip R2 mates whose R1 is also in the list; they are attached as
+            # source2 of the matching R1 entry below rather than emitted as a
+            # duplicate standalone source
+            if file_type.upper() == 'FASTQ' and ('_R2' in file_name or '_2.fastq' in file_name):
+                r1_uri = s3_uri.replace('_R2', '_R1').replace('_2.fastq', '_1.fastq')
+                if any((f.get('s3Uri') or f.get('s3_uri')) == r1_uri for f in files):
+                    continue
+
+            source_config = {
+                'sourceFileType': file_type.upper(),
+                'sourceFiles': {'source1': s3_uri},
+            }
+
+            # Add metadata
+            if sample_id:
+                source_config['sampleId'] = sample_id
+            if subject_id:
+                source_config['subjectId'] = subject_id
+            if reference_arn:
+                source_config['referenceArn'] = reference_arn
+
+            # For paired-end FASTQ files, check for R2
+            if file_type.upper() == 'FASTQ':
+                if '_R1' in file_name or '_1.fastq' in file_name:
+                    # Look for corresponding R2 file
+                    r2_uri = s3_uri.replace('_R1', '_R2').replace('_1.fastq', '_2.fastq')
+
+                    # Check if R2 file exists in the provided files
+                    for other_file in files:
+                        other_uri = other_file.get('s3Uri') or other_file.get('s3_uri')
+                        if other_uri == r2_uri:
+                            source_config['sourceFiles']['source2'] = r2_uri
+                            break
+
+            prepared_sources.append(source_config)
+
+        return {
+            'importSources': prepared_sources,
+            'totalFiles': len(prepared_sources),
+            'configuration': {
+                'sampleId': sample_id,
+                'subjectId': subject_id,
+                'referenceArn': reference_arn,
+            },
+        }
+
+    except Exception as e:
+        logger.error(f'Unexpected error preparing import sources: {str(e)}')
+        raise Exception(f'Failed to prepare import sources: {str(e)}')
diff --git a/src/aws-healthomics-mcp-server/awslabs/aws_healthomics_mcp_server/tools/reference_store_tools.py b/src/aws-healthomics-mcp-server/awslabs/aws_healthomics_mcp_server/tools/reference_store_tools.py
new file mode 100644
index 0000000000..d7f1151d09
--- /dev/null
+++ b/src/aws-healthomics-mcp-server/awslabs/aws_healthomics_mcp_server/tools/reference_store_tools.py
@@ -0,0 +1,368 @@
+# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+ +"""Reference store tools for the AWS HealthOmics MCP server.""" + +import botocore.exceptions +from awslabs.aws_healthomics_mcp_server.consts import ( + DEFAULT_MAX_RESULTS, +) +from awslabs.aws_healthomics_mcp_server.utils.aws_utils import ( + get_omics_client, +) +from loguru import logger +from mcp.server.fastmcp import Context +from pydantic import Field +from typing import Any, Dict, Optional + + +async def list_aho_reference_stores( + ctx: Context, + max_results: int = Field( + DEFAULT_MAX_RESULTS, + description='Maximum number of reference stores to return', + ge=1, + le=100, + ), + next_token: Optional[str] = Field( + None, + description='Token for pagination from a previous response', + ), +) -> Dict[str, Any]: + """List available HealthOmics reference stores. + + Args: + ctx: MCP context for error reporting + max_results: Maximum number of reference stores to return + next_token: Token for pagination from a previous response + + Returns: + Dictionary containing reference stores list and pagination info + + Raises: + Exception: If there's an error listing reference stores + """ + try: + client = get_omics_client() + + params = {'maxResults': max_results} + + if next_token: + params['nextToken'] = next_token + + response = client.list_reference_stores(**params) + + return { + 'referenceStores': response.get('referenceStores', []), + 'nextToken': response.get('nextToken'), + 'totalCount': len(response.get('referenceStores', [])), + } + + except botocore.exceptions.ClientError as e: + error_code = e.response['Error']['Code'] + error_message = e.response['Error']['Message'] + + logger.error(f'Failed to list reference stores: {error_code} - {error_message}') + + raise Exception(f'Failed to list reference stores: {error_code} - {error_message}') + except Exception as e: + logger.error(f'Unexpected error listing reference stores: {str(e)}') + raise Exception(f'Failed to list reference stores: {str(e)}') + + +async def get_aho_reference_store( + ctx: Context, + reference_store_id: str = Field( + ..., + description='ID of the reference store to retrieve', + ), +) -> Dict[str, Any]: + """Get detailed information about a specific reference store. 
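+
+    Example (illustrative invocation via the MCP runtime; the store ID is a placeholder):
+        store = await get_aho_reference_store(ctx, reference_store_id='1234567890')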
+ + Args: + ctx: MCP context for error reporting + reference_store_id: ID of the reference store + + Returns: + Dictionary containing reference store details + + Raises: + Exception: If there's an error retrieving reference store information + """ + try: + client = get_omics_client() + + response = client.get_reference_store(id=reference_store_id) + + return { + 'referenceStore': { + 'id': response.get('id'), + 'arn': response.get('arn'), + 'name': response.get('name'), + 'description': response.get('description'), + 'sseConfig': response.get('sseConfig'), + 'creationTime': response.get('creationTime'), + 'status': response.get('status'), + 'statusMessage': response.get('statusMessage'), + } + } + + except botocore.exceptions.ClientError as e: + error_code = e.response['Error']['Code'] + error_message = e.response['Error']['Message'] + + logger.error(f'Failed to get reference store: {error_code} - {error_message}') + + raise Exception(f'Failed to get reference store: {error_code} - {error_message}') + except Exception as e: + logger.error(f'Unexpected error getting reference store: {str(e)}') + raise Exception(f'Failed to get reference store: {str(e)}') + + +async def list_aho_references( + ctx: Context, + reference_store_id: str = Field( + ..., + description='ID of the reference store to list references from', + ), + max_results: int = Field( + DEFAULT_MAX_RESULTS, + description='Maximum number of references to return', + ge=1, + le=100, + ), + next_token: Optional[str] = Field( + None, + description='Token for pagination from a previous response', + ), +) -> Dict[str, Any]: + """List references in a HealthOmics reference store. + + Args: + ctx: MCP context for error reporting + reference_store_id: ID of the reference store + max_results: Maximum number of references to return + next_token: Token for pagination + + Returns: + Dictionary containing references list and pagination info + + Raises: + Exception: If there's an error listing references + """ + try: + client = get_omics_client() + + params = {'referenceStoreId': reference_store_id, 'maxResults': max_results} + + if next_token: + params['nextToken'] = next_token + + response = client.list_references(**params) + + return { + 'references': response.get('references', []), + 'nextToken': response.get('nextToken'), + 'totalCount': len(response.get('references', [])), + 'referenceStoreId': reference_store_id, + } + + except botocore.exceptions.ClientError as e: + error_code = e.response['Error']['Code'] + error_message = e.response['Error']['Message'] + + logger.error(f'Failed to list references: {error_code} - {error_message}') + + raise Exception(f'Failed to list references: {error_code} - {error_message}') + except Exception as e: + logger.error(f'Unexpected error listing references: {str(e)}') + raise Exception(f'Failed to list references: {str(e)}') + + +async def get_aho_reference( + ctx: Context, + reference_store_id: str = Field( + ..., + description='ID of the reference store', + ), + reference_id: str = Field( + ..., + description='ID of the reference to retrieve', + ), +) -> Dict[str, Any]: + """Get detailed metadata for a specific reference. 
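+
+    Example (illustrative invocation via the MCP runtime; IDs are placeholders):
+        ref = await get_aho_reference(
+            ctx,
+            reference_store_id='1234567890',
+            reference_id='9876543210',
+        )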
+ + Args: + ctx: MCP context for error reporting + reference_store_id: ID of the reference store + reference_id: ID of the reference + + Returns: + Dictionary containing reference metadata + + Raises: + Exception: If there's an error retrieving reference metadata + """ + try: + client = get_omics_client() + + response = client.get_reference_metadata( + referenceStoreId=reference_store_id, id=reference_id + ) + + return { + 'reference': { + 'id': response.get('id'), + 'arn': response.get('arn'), + 'referenceStoreId': response.get('referenceStoreId'), + 'md5': response.get('md5'), + 'status': response.get('status'), + 'name': response.get('name'), + 'description': response.get('description'), + 'creationTime': response.get('creationTime'), + 'updateTime': response.get('updateTime'), + 'files': response.get('files'), + }, + 'referenceStoreId': reference_store_id, + } + + except botocore.exceptions.ClientError as e: + error_code = e.response['Error']['Code'] + error_message = e.response['Error']['Message'] + + logger.error(f'Failed to get reference metadata: {error_code} - {error_message}') + + raise Exception(f'Failed to get reference metadata: {error_code} - {error_message}') + except Exception as e: + logger.error(f'Unexpected error getting reference metadata: {str(e)}') + raise Exception(f'Failed to get reference metadata: {str(e)}') + + +async def start_aho_reference_import_job( + ctx: Context, + reference_store_id: str = Field( + ..., + description='ID of the reference store to import into', + ), + role_arn: str = Field( + ..., + description='ARN of the IAM role to use for the import', + ), + sources: list = Field( + ..., + description='List of reference source files to import from S3', + ), + client_token: Optional[str] = Field( + None, + description='Client token for idempotency', + ), +) -> Dict[str, Any]: + """Start a reference import job from S3 sources. 
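+
+    Example (illustrative invocation via the MCP runtime; the store ID, role ARN, and
+    source entry are placeholders assuming the StartReferenceImportJob source shape):
+        job = await start_aho_reference_import_job(
+            ctx,
+            reference_store_id='1234567890',
+            role_arn='arn:aws:iam::123456789012:role/OmicsImportRole',
+            sources=[{'sourceFile': 's3://my-bucket/refs/GRCh38.fasta', 'name': 'GRCh38'}],
+        )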
+ + Args: + ctx: MCP context for error reporting + reference_store_id: ID of the reference store + role_arn: ARN of the IAM role for the import + sources: List of reference source file configurations + client_token: Client token for idempotency + + Returns: + Dictionary containing import job information + + Raises: + Exception: If there's an error starting the import job + """ + try: + client = get_omics_client() + + params = {'referenceStoreId': reference_store_id, 'roleArn': role_arn, 'sources': sources} + + if client_token: + params['clientToken'] = client_token + + response = client.start_reference_import_job(**params) + + return { + 'id': response.get('id'), + 'referenceStoreId': response.get('referenceStoreId'), + 'roleArn': response.get('roleArn'), + 'status': response.get('status'), + 'creationTime': response.get('creationTime'), + 'sources': sources, + } + + except botocore.exceptions.ClientError as e: + error_code = e.response['Error']['Code'] + error_message = e.response['Error']['Message'] + + logger.error(f'Failed to start reference import job: {error_code} - {error_message}') + + raise Exception(f'Failed to start reference import job: {error_code} - {error_message}') + except Exception as e: + logger.error(f'Unexpected error starting reference import job: {str(e)}') + raise Exception(f'Failed to start reference import job: {str(e)}') + + +async def get_aho_reference_import_job( + ctx: Context, + reference_store_id: str = Field( + ..., + description='ID of the reference store', + ), + import_job_id: str = Field( + ..., + description='ID of the import job', + ), +) -> Dict[str, Any]: + """Get the status and details of a reference import job. + + Args: + ctx: MCP context for error reporting + reference_store_id: ID of the reference store + import_job_id: ID of the import job + + Returns: + Dictionary containing import job status and details + + Raises: + Exception: If there's an error retrieving import job status + """ + try: + client = get_omics_client() + + response = client.get_reference_import_job( + referenceStoreId=reference_store_id, id=import_job_id + ) + + return { + 'id': response.get('id'), + 'referenceStoreId': response.get('referenceStoreId'), + 'roleArn': response.get('roleArn'), + 'status': response.get('status'), + 'statusMessage': response.get('statusMessage'), + 'creationTime': response.get('creationTime'), + 'completionTime': response.get('completionTime'), + 'sources': response.get('sources', []), + } + + except botocore.exceptions.ClientError as e: + error_code = e.response['Error']['Code'] + error_message = e.response['Error']['Message'] + + logger.error(f'Failed to get reference import job: {error_code} - {error_message}') + + raise Exception(f'Failed to get reference import job: {error_code} - {error_message}') + except Exception as e: + logger.error(f'Unexpected error getting reference import job: {str(e)}') + raise Exception(f'Failed to get reference import job: {str(e)}') diff --git a/src/aws-healthomics-mcp-server/awslabs/aws_healthomics_mcp_server/tools/sequence_store_tools.py b/src/aws-healthomics-mcp-server/awslabs/aws_healthomics_mcp_server/tools/sequence_store_tools.py new file mode 100644 index 0000000000..043e575cd6 --- /dev/null +++ b/src/aws-healthomics-mcp-server/awslabs/aws_healthomics_mcp_server/tools/sequence_store_tools.py @@ -0,0 +1,447 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Sequence store tools for the AWS HealthOmics MCP server.""" + +import botocore.exceptions +from awslabs.aws_healthomics_mcp_server.consts import ( + DEFAULT_MAX_RESULTS, +) +from awslabs.aws_healthomics_mcp_server.utils.aws_utils import ( + get_omics_client, +) +from datetime import datetime +from loguru import logger +from mcp.server.fastmcp import Context +from pydantic import Field +from typing import Any, Dict, List, Optional + + +async def list_aho_sequence_stores( + ctx: Context, + max_results: int = Field( + DEFAULT_MAX_RESULTS, + description='Maximum number of sequence stores to return', + ge=1, + le=100, + ), + next_token: Optional[str] = Field( + None, + description='Token for pagination from a previous response', + ), +) -> Dict[str, Any]: + """List available HealthOmics sequence stores. + + Args: + ctx: MCP context for error reporting + max_results: Maximum number of sequence stores to return + next_token: Token for pagination from a previous response + + Returns: + Dictionary containing sequence stores list and pagination info + + Raises: + Exception: If there's an error listing sequence stores + """ + try: + client = get_omics_client() + + params = {'maxResults': max_results} + + if next_token: + params['nextToken'] = next_token + + response = client.list_sequence_stores(**params) + + return { + 'sequenceStores': response.get('sequenceStores', []), + 'nextToken': response.get('nextToken'), + 'totalCount': len(response.get('sequenceStores', [])), + } + + except botocore.exceptions.ClientError as e: + error_code = e.response['Error']['Code'] + error_message = e.response['Error']['Message'] + + logger.error(f'Failed to list sequence stores: {error_code} - {error_message}') + + raise Exception(f'Failed to list sequence stores: {error_code} - {error_message}') + except Exception as e: + logger.error(f'Unexpected error listing sequence stores: {str(e)}') + raise Exception(f'Failed to list sequence stores: {str(e)}') + + +async def list_aho_read_sets( + ctx: Context, + sequence_store_id: str = Field( + ..., + description='ID of the sequence store to list read sets from', + ), + species: Optional[str] = Field( + None, + description='Filter by species name', + ), + chromosome: Optional[str] = Field( + None, + description='Filter by chromosome', + ), + uploaded_after: Optional[str] = Field( + None, + description='ISO datetime string to filter read sets uploaded after this time', + ), + max_results: int = Field( + DEFAULT_MAX_RESULTS, + description='Maximum number of read sets to return', + ge=1, + le=100, + ), + next_token: Optional[str] = Field( + None, + description='Token for pagination from a previous response', + ), +) -> Dict[str, Any]: + """List read sets from a HealthOmics sequence store with optional filters. 
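+
+    Example (illustrative invocation via the MCP runtime; the store ID is a placeholder):
+        read_sets = await list_aho_read_sets(
+            ctx,
+            sequence_store_id='1234567890',
+            uploaded_after='2024-01-01T00:00:00Z',
+            max_results=50,
+        )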
+ + Args: + ctx: MCP context for error reporting + sequence_store_id: ID of the sequence store + species: Filter by species name + chromosome: Filter by chromosome + uploaded_after: ISO datetime string for filtering + max_results: Maximum number of read sets to return + next_token: Token for pagination + + Returns: + Dictionary containing read sets list and pagination info + + Raises: + Exception: If there's an error listing read sets + """ + try: + client = get_omics_client() + + params = {'sequenceStoreId': sequence_store_id, 'maxResults': max_results} + + if next_token: + params['nextToken'] = next_token + + # Apply filters + filters = {} + + if species: + # Note: This would need to be mapped to actual read set metadata fields + # The exact filtering depends on how metadata is stored + logger.info(f'Filtering by species: {species}') + + if chromosome: + logger.info(f'Filtering by chromosome: {chromosome}') + + if uploaded_after: + try: + upload_time = datetime.fromisoformat(uploaded_after.replace('Z', '+00:00')) + filters['createdAfter'] = upload_time + except ValueError: + raise Exception(f'Invalid datetime format for uploaded_after: {uploaded_after}') + + if filters: + params['filter'] = filters + + response = client.list_read_sets(**params) + + # Post-process for additional filtering if needed + read_sets = response.get('readSets', []) + + if species or chromosome: + filtered_read_sets = [] + for read_set in read_sets: + include = True + + # Filter by species in metadata + if species: + read_set_species = read_set.get('subjectId', '').lower() + if species.lower() not in read_set_species: + include = False + + # Filter by chromosome would require examining file references + if chromosome and include: + # This would need implementation based on HealthOmics data structure + pass + + if include: + filtered_read_sets.append(read_set) + + read_sets = filtered_read_sets + + return { + 'readSets': read_sets, + 'nextToken': response.get('nextToken'), + 'totalCount': len(read_sets), + 'sequenceStoreId': sequence_store_id, + 'appliedFilters': { + 'species': species, + 'chromosome': chromosome, + 'uploadedAfter': uploaded_after, + }, + } + + except botocore.exceptions.ClientError as e: + error_code = e.response['Error']['Code'] + error_message = e.response['Error']['Message'] + + logger.error(f'Failed to list read sets: {error_code} - {error_message}') + + raise Exception(f'Failed to list read sets: {error_code} - {error_message}') + except Exception as e: + logger.error(f'Unexpected error listing read sets: {str(e)}') + raise Exception(f'Failed to list read sets: {str(e)}') + + +async def get_aho_read_set( + ctx: Context, + sequence_store_id: str = Field( + ..., + description='ID of the sequence store', + ), + read_set_id: str = Field( + ..., + description='ID of the read set to retrieve', + ), +) -> Dict[str, Any]: + """Get detailed metadata for a specific read set. 
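+
+    Example (illustrative invocation via the MCP runtime; IDs are placeholders):
+        read_set = await get_aho_read_set(
+            ctx,
+            sequence_store_id='1234567890',
+            read_set_id='9876543210',
+        )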
+ + Args: + ctx: MCP context for error reporting + sequence_store_id: ID of the sequence store + read_set_id: ID of the read set + + Returns: + Dictionary containing read set metadata + + Raises: + Exception: If there's an error retrieving read set metadata + """ + try: + client = get_omics_client() + + response = client.get_read_set_metadata(sequenceStoreId=sequence_store_id, id=read_set_id) + + return { + 'readSet': { + 'id': response.get('id'), + 'arn': response.get('arn'), + 'name': response.get('name'), + 'description': response.get('description'), + 'sampleId': response.get('sampleId'), + 'subjectId': response.get('subjectId'), + 'referenceArn': response.get('referenceArn'), + 'fileType': response.get('fileType'), + 'status': response.get('status'), + 'statusMessage': response.get('statusMessage'), + 'creationTime': response.get('creationTime'), + 'sequenceInformation': response.get('sequenceInformation'), + 'files': response.get('files'), + 'etag': response.get('etag'), + }, + 'sequenceStoreId': sequence_store_id, + } + + except botocore.exceptions.ClientError as e: + error_code = e.response['Error']['Code'] + error_message = e.response['Error']['Message'] + + logger.error(f'Failed to get read set metadata: {error_code} - {error_message}') + + raise Exception(f'Failed to get read set metadata: {error_code} - {error_message}') + except Exception as e: + logger.error(f'Unexpected error getting read set metadata: {str(e)}') + raise Exception(f'Failed to get read set metadata: {str(e)}') + + +async def start_aho_read_set_import_job( + ctx: Context, + sequence_store_id: str = Field( + ..., + description='ID of the sequence store to import into', + ), + role_arn: str = Field( + ..., + description='ARN of the IAM role to use for the import', + ), + sources: List[Dict[str, Any]] = Field( + ..., + description='List of source files to import from S3', + ), + client_token: Optional[str] = Field( + None, + description='Client token for idempotency', + ), +) -> Dict[str, Any]: + """Start a read set import job from S3 sources. 
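+
+    Example (illustrative invocation via the MCP runtime; IDs, ARNs, and URIs are
+    placeholders, and the source shape mirrors what prepare_aho_import_sources produces):
+        job = await start_aho_read_set_import_job(
+            ctx,
+            sequence_store_id='1234567890',
+            role_arn='arn:aws:iam::123456789012:role/OmicsImportRole',
+            sources=[
+                {
+                    'sourceFileType': 'FASTQ',
+                    'sourceFiles': {
+                        'source1': 's3://my-bucket/fastq/sample1_R1.fastq.gz',
+                        'source2': 's3://my-bucket/fastq/sample1_R2.fastq.gz',
+                    },
+                    'sampleId': 'sample-001',
+                    'subjectId': 'subject-001',
+                    'referenceArn': 'arn:aws:omics:us-east-1:123456789012:referenceStore/1234567890/reference/9876543210',
+                }
+            ],
+        )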
+ + Args: + ctx: MCP context for error reporting + sequence_store_id: ID of the sequence store + role_arn: ARN of the IAM role for the import + sources: List of source file configurations + client_token: Client token for idempotency + + Returns: + Dictionary containing import job information + + Raises: + Exception: If there's an error starting the import job + """ + try: + client = get_omics_client() + + params = {'sequenceStoreId': sequence_store_id, 'roleArn': role_arn, 'sources': sources} + + if client_token: + params['clientToken'] = client_token + + response = client.start_read_set_import_job(**params) + + return { + 'id': response.get('id'), + 'sequenceStoreId': response.get('sequenceStoreId'), + 'roleArn': response.get('roleArn'), + 'status': response.get('status'), + 'creationTime': response.get('creationTime'), + 'sources': sources, + } + + except botocore.exceptions.ClientError as e: + error_code = e.response['Error']['Code'] + error_message = e.response['Error']['Message'] + + logger.error(f'Failed to start read set import job: {error_code} - {error_message}') + + raise Exception(f'Failed to start read set import job: {error_code} - {error_message}') + except Exception as e: + logger.error(f'Unexpected error starting read set import job: {str(e)}') + raise Exception(f'Failed to start read set import job: {str(e)}') + + +async def get_aho_read_set_import_job( + ctx: Context, + sequence_store_id: str = Field( + ..., + description='ID of the sequence store', + ), + import_job_id: str = Field( + ..., + description='ID of the import job', + ), +) -> Dict[str, Any]: + """Get the status and details of a read set import job. + + Args: + ctx: MCP context for error reporting + sequence_store_id: ID of the sequence store + import_job_id: ID of the import job + + Returns: + Dictionary containing import job status and details + + Raises: + Exception: If there's an error retrieving import job status + """ + try: + client = get_omics_client() + + response = client.get_read_set_import_job( + sequenceStoreId=sequence_store_id, id=import_job_id + ) + + return { + 'id': response.get('id'), + 'sequenceStoreId': response.get('sequenceStoreId'), + 'roleArn': response.get('roleArn'), + 'status': response.get('status'), + 'statusMessage': response.get('statusMessage'), + 'creationTime': response.get('creationTime'), + 'completionTime': response.get('completionTime'), + 'sources': response.get('sources', []), + } + + except botocore.exceptions.ClientError as e: + error_code = e.response['Error']['Code'] + error_message = e.response['Error']['Message'] + + logger.error(f'Failed to get read set import job: {error_code} - {error_message}') + + raise Exception(f'Failed to get read set import job: {error_code} - {error_message}') + except Exception as e: + logger.error(f'Unexpected error getting read set import job: {str(e)}') + raise Exception(f'Failed to get read set import job: {str(e)}') + + +async def list_aho_read_set_import_jobs( + ctx: Context, + sequence_store_id: str = Field( + ..., + description='ID of the sequence store', + ), + max_results: int = Field( + DEFAULT_MAX_RESULTS, + description='Maximum number of import jobs to return', + ge=1, + le=100, + ), + next_token: Optional[str] = Field( + None, + description='Token for pagination from a previous response', + ), +) -> Dict[str, Any]: + """List read set import jobs for a sequence store. 
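+
+    Example (illustrative invocation via the MCP runtime; the store ID is a placeholder):
+        jobs = await list_aho_read_set_import_jobs(ctx, sequence_store_id='1234567890', max_results=20)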
+ + Args: + ctx: MCP context for error reporting + sequence_store_id: ID of the sequence store + max_results: Maximum number of import jobs to return + next_token: Token for pagination + + Returns: + Dictionary containing import jobs list and pagination info + + Raises: + Exception: If there's an error listing import jobs + """ + try: + client = get_omics_client() + + params = {'sequenceStoreId': sequence_store_id, 'maxResults': max_results} + + if next_token: + params['nextToken'] = next_token + + response = client.list_read_set_import_jobs(**params) + + return { + 'importJobs': response.get('importJobs', []), + 'nextToken': response.get('nextToken'), + 'totalCount': len(response.get('importJobs', [])), + 'sequenceStoreId': sequence_store_id, + } + + except botocore.exceptions.ClientError as e: + error_code = e.response['Error']['Code'] + error_message = e.response['Error']['Message'] + + logger.error(f'Failed to list read set import jobs: {error_code} - {error_message}') + + raise Exception(f'Failed to list read set import jobs: {error_code} - {error_message}') + except Exception as e: + logger.error(f'Unexpected error listing read set import jobs: {str(e)}') + raise Exception(f'Failed to list read set import jobs: {str(e)}') diff --git a/src/aws-healthomics-mcp-server/awslabs/aws_healthomics_mcp_server/tools/variant_store_tools.py b/src/aws-healthomics-mcp-server/awslabs/aws_healthomics_mcp_server/tools/variant_store_tools.py new file mode 100644 index 0000000000..b664695c33 --- /dev/null +++ b/src/aws-healthomics-mcp-server/awslabs/aws_healthomics_mcp_server/tools/variant_store_tools.py @@ -0,0 +1,443 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Variant store tools for the AWS HealthOmics MCP server.""" + +import botocore.exceptions +from awslabs.aws_healthomics_mcp_server.consts import ( + DEFAULT_MAX_RESULTS, +) +from awslabs.aws_healthomics_mcp_server.utils.aws_utils import ( + get_omics_client, +) +from loguru import logger +from mcp.server.fastmcp import Context +from pydantic import Field +from typing import Any, Dict, List, Optional + + +async def list_aho_variant_stores( + ctx: Context, + max_results: int = Field( + DEFAULT_MAX_RESULTS, + description='Maximum number of variant stores to return', + ge=1, + le=100, + ), + next_token: Optional[str] = Field( + None, + description='Token for pagination from a previous response', + ), +) -> Dict[str, Any]: + """List available HealthOmics variant stores. 
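+
+    Example return shape (abbreviated; values are placeholders):
+
+        {
+            'variantStores': [{'id': '...', 'name': '...', 'status': 'ACTIVE'}],
+            'nextToken': None,
+            'totalCount': 1,
+        }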
+ + Args: + ctx: MCP context for error reporting + max_results: Maximum number of variant stores to return + next_token: Token for pagination from a previous response + + Returns: + Dictionary containing variant stores list and pagination info + + Raises: + Exception: If there's an error listing variant stores + """ + try: + client = get_omics_client() + + params = {'maxResults': max_results} + + if next_token: + params['nextToken'] = next_token + + response = client.list_variant_stores(**params) + + return { + 'variantStores': response.get('variantStores', []), + 'nextToken': response.get('nextToken'), + 'totalCount': len(response.get('variantStores', [])), + } + + except botocore.exceptions.ClientError as e: + error_code = e.response['Error']['Code'] + error_message = e.response['Error']['Message'] + + logger.error(f'Failed to list variant stores: {error_code} - {error_message}') + + raise Exception(f'Failed to list variant stores: {error_code} - {error_message}') + except Exception as e: + logger.error(f'Unexpected error listing variant stores: {str(e)}') + raise Exception(f'Failed to list variant stores: {str(e)}') + + +async def get_aho_variant_store( + ctx: Context, + variant_store_id: str = Field( + ..., + description='ID of the variant store to retrieve', + ), +) -> Dict[str, Any]: + """Get detailed information about a specific variant store. + + Args: + ctx: MCP context for error reporting + variant_store_id: ID of the variant store + + Returns: + Dictionary containing variant store details + + Raises: + Exception: If there's an error retrieving variant store information + """ + try: + client = get_omics_client() + + response = client.get_variant_store(name=variant_store_id) + + return { + 'variantStore': { + 'id': response.get('id'), + 'reference': response.get('reference'), + 'status': response.get('status'), + 'statusMessage': response.get('statusMessage'), + 'storeArn': response.get('storeArn'), + 'name': response.get('name'), + 'description': response.get('description'), + 'sseConfig': response.get('sseConfig'), + 'creationTime': response.get('creationTime'), + 'updateTime': response.get('updateTime'), + 'tags': response.get('tags', {}), + } + } + + except botocore.exceptions.ClientError as e: + error_code = e.response['Error']['Code'] + error_message = e.response['Error']['Message'] + + logger.error(f'Failed to get variant store: {error_code} - {error_message}') + + raise Exception(f'Failed to get variant store: {error_code} - {error_message}') + except Exception as e: + logger.error(f'Unexpected error getting variant store: {str(e)}') + raise Exception(f'Failed to get variant store: {str(e)}') + + +async def search_aho_variants( + ctx: Context, + variant_store_id: str = Field( + ..., + description='ID of the variant store to search', + ), + gene: Optional[str] = Field( + None, + description='Gene name to search for variants (e.g., BRCA1)', + ), + chromosome: Optional[str] = Field( + None, + description='Chromosome to search (e.g., chr1, 1)', + ), + start_position: Optional[int] = Field( + None, + description='Start position for genomic range search', + ge=1, + ), + end_position: Optional[int] = Field( + None, + description='End position for genomic range search', + ge=1, + ), + variant_type: Optional[str] = Field( + None, + description='Type of variant (e.g., SNV, INDEL, CNV)', + ), + max_results: int = Field( + DEFAULT_MAX_RESULTS, + description='Maximum number of variants to return', + ge=1, + le=1000, + ), + next_token: Optional[str] = Field( + None, + description='Token 
for pagination from a previous response', + ), +) -> Dict[str, Any]: + """Search for variants in a HealthOmics variant store. + + Args: + ctx: MCP context for error reporting + variant_store_id: ID of the variant store + gene: Gene name to search for + chromosome: Chromosome to search + start_position: Start position for range search + end_position: End position for range search + variant_type: Type of variant to search for + max_results: Maximum number of variants to return + next_token: Token for pagination + + Returns: + Dictionary containing search results + + Raises: + Exception: If there's an error searching variants + """ + try: + client = get_omics_client() + + # Build search criteria + filter_criteria = {} + + if gene: + filter_criteria['gene'] = {'eq': gene} + + if chromosome: + # Normalize chromosome format + if not chromosome.startswith('chr'): + chromosome = f'chr{chromosome}' + filter_criteria['contigName'] = {'eq': chromosome} + + if start_position is not None and end_position is not None: + filter_criteria['start'] = {'gte': start_position} + filter_criteria['end'] = {'lte': end_position} + elif start_position is not None: + filter_criteria['start'] = {'gte': start_position} + elif end_position is not None: + filter_criteria['end'] = {'lte': end_position} + + if variant_type: + filter_criteria['variantType'] = {'eq': variant_type.upper()} + + params = {'variantStoreId': variant_store_id, 'maxResults': max_results} + + if filter_criteria: + params['filter'] = filter_criteria + + if next_token: + params['nextToken'] = next_token + + response = client.search_variants(**params) + + return { + 'variants': response.get('variants', []), + 'nextToken': response.get('nextToken'), + 'totalCount': len(response.get('variants', [])), + 'variantStoreId': variant_store_id, + 'searchCriteria': { + 'gene': gene, + 'chromosome': chromosome, + 'startPosition': start_position, + 'endPosition': end_position, + 'variantType': variant_type, + }, + } + + except botocore.exceptions.ClientError as e: + error_code = e.response['Error']['Code'] + error_message = e.response['Error']['Message'] + + logger.error(f'Failed to search variants: {error_code} - {error_message}') + + raise Exception(f'Failed to search variants: {error_code} - {error_message}') + except Exception as e: + logger.error(f'Unexpected error searching variants: {str(e)}') + raise Exception(f'Failed to search variants: {str(e)}') + + +async def count_aho_variants( + ctx: Context, + variant_store_id: str = Field( + ..., + description='ID of the variant store to count variants in', + ), + gene: Optional[str] = Field( + None, + description='Gene name to count variants for', + ), + chromosome: Optional[str] = Field( + None, + description='Chromosome to count variants on', + ), + variant_type: Optional[str] = Field( + None, + description='Type of variant to count', + ), +) -> Dict[str, Any]: + """Count variants in a HealthOmics variant store with optional filters. 
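+
+    Example call (illustrative; the store ID and filters are placeholders):
+
+        result = await count_aho_variants(
+            ctx, variant_store_id='my-variant-store', gene='BRCA1', variant_type='SNV'
+        )
+        # result['variantCount'] is a lower bound when result['hasMoreResults'] is True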
+
+    Args:
+        ctx: MCP context for error reporting
+        variant_store_id: ID of the variant store
+        gene: Gene name to filter by
+        chromosome: Chromosome to filter by
+        variant_type: Type of variant to filter by
+
+    Returns:
+        Dictionary containing variant count information
+
+    Raises:
+        Exception: If there's an error counting variants
+    """
+    try:
+        # Use search with a large max_results to get an accurate count
+        # In practice, this might need to be implemented differently
+        # depending on the actual HealthOmics API capabilities
+
+        search_result = await search_aho_variants(
+            ctx=ctx,
+            variant_store_id=variant_store_id,
+            gene=gene,
+            chromosome=chromosome,
+            variant_type=variant_type,
+            max_results=1000,  # Use large number for counting
+        )
+
+        # A remaining nextToken means more matches exist beyond this page,
+        # so the count below is a lower bound; normalize it to a boolean
+        total_count = len(search_result['variants'])
+        has_more = bool(search_result.get('nextToken'))
+
+        return {
+            'variantCount': total_count,
+            'hasMoreResults': has_more,
+            'variantStoreId': variant_store_id,
+            'countCriteria': {'gene': gene, 'chromosome': chromosome, 'variantType': variant_type},
+            'note': 'Count is a lower bound when hasMoreResults is true',
+        }
+
+    except Exception as e:
+        logger.error(f'Unexpected error counting variants: {str(e)}')
+        raise Exception(f'Failed to count variants: {str(e)}')
+
+
+async def start_aho_variant_import_job(
+    ctx: Context,
+    variant_store_id: str = Field(
+        ...,
+        description='ID of the variant store to import into',
+    ),
+    role_arn: str = Field(
+        ...,
+        description='ARN of the IAM role to use for the import',
+    ),
+    items: List[Dict[str, Any]] = Field(
+        ...,
+        description='List of VCF files to import from S3',
+    ),
+    run_left_normalization: bool = Field(
+        False,
+        description='Whether to run left normalization on variants',
+    ),
+    client_token: Optional[str] = Field(
+        None,
+        description='Client token for idempotency',
+    ),
+) -> Dict[str, Any]:
+    """Start a variant import job from S3 VCF files.
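+
+    Example ``items`` payload (illustrative; the S3 URI is a placeholder):
+
+        items = [{'source': 's3://my-bucket/cohort1.vcf.gz'}]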
+
+    Args:
+        ctx: MCP context for error reporting
+        variant_store_id: ID of the variant store (used as the import destination name)
+        role_arn: ARN of the IAM role for the import
+        items: List of VCF file configurations
+        run_left_normalization: Whether to run left normalization
+        client_token: Client token for idempotency
+
+    Returns:
+        Dictionary containing import job information
+
+    Raises:
+        Exception: If there's an error starting the import job
+    """
+    try:
+        client = get_omics_client()
+
+        # StartVariantImportJob identifies the target store by destinationName
+        params = {
+            'destinationName': variant_store_id,
+            'roleArn': role_arn,
+            'items': items,
+            'runLeftNormalization': run_left_normalization,
+        }
+
+        if client_token:
+            params['clientToken'] = client_token
+
+        response = client.start_variant_import_job(**params)
+
+        # The API returns only the job ID; echo the request inputs for context
+        return {
+            'id': response.get('jobId'),
+            'variantStoreId': variant_store_id,
+            'roleArn': role_arn,
+            'runLeftNormalization': run_left_normalization,
+            'items': items,
+        }
+
+    except botocore.exceptions.ClientError as e:
+        error_code = e.response['Error']['Code']
+        error_message = e.response['Error']['Message']
+
+        logger.error(f'Failed to start variant import job: {error_code} - {error_message}')
+
+        raise Exception(f'Failed to start variant import job: {error_code} - {error_message}')
+    except Exception as e:
+        logger.error(f'Unexpected error starting variant import job: {str(e)}')
+        raise Exception(f'Failed to start variant import job: {str(e)}')
+
+
+async def get_aho_variant_import_job(
+    ctx: Context,
+    variant_import_job_id: str = Field(
+        ...,
+        description='ID of the variant import job',
+    ),
+) -> Dict[str, Any]:
+    """Get the status and details of a variant import job.
+
+    Args:
+        ctx: MCP context for error reporting
+        variant_import_job_id: ID of the variant import job
+
+    Returns:
+        Dictionary containing import job status and details
+
+    Raises:
+        Exception: If there's an error retrieving import job status
+    """
+    try:
+        client = get_omics_client()
+
+        response = client.get_variant_import_job(jobId=variant_import_job_id)
+
+        return {
+            'id': response.get('id'),
+            # The API reports the target store as destinationName
+            'variantStoreId': response.get('destinationName'),
+            'roleArn': response.get('roleArn'),
+            'status': response.get('status'),
+            'statusMessage': response.get('statusMessage'),
+            'creationTime': response.get('creationTime'),
+            'updateTime': response.get('updateTime'),
+            'completionTime': response.get('completionTime'),
+            'items': response.get('items', []),
+            'runLeftNormalization': response.get('runLeftNormalization'),
+        }
+
+    except botocore.exceptions.ClientError as e:
+        error_code = e.response['Error']['Code']
+        error_message = e.response['Error']['Message']
+
+        logger.error(f'Failed to get variant import job: {error_code} - {error_message}')
+
+        raise Exception(f'Failed to get variant import job: {error_code} - {error_message}')
+    except Exception as e:
+        logger.error(f'Unexpected error getting variant import job: {str(e)}')
+        raise Exception(f'Failed to get variant import job: {str(e)}')
diff --git a/src/aws-healthomics-mcp-server/tests/test_data_import_tools.py b/src/aws-healthomics-mcp-server/tests/test_data_import_tools.py
new file mode 100644
index 0000000000..62c1d0e846
--- /dev/null
+++ b/src/aws-healthomics-mcp-server/tests/test_data_import_tools.py
@@ -0,0 +1,335 @@
+# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance
+# with the License. 
A copy of the License is located at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# or in the 'license' file accompanying this file. This file is distributed on an 'AS IS' BASIS, WITHOUT WARRANTIES +# OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions +# and limitations under the License. +"""Tests for data import tools.""" + +import pytest +from awslabs.aws_healthomics_mcp_server.tools.data_import_tools import ( + discover_aho_genomic_files, + get_aho_s3_file_metadata, + list_aho_s3_bucket_contents, + parse_s3_uri, + prepare_aho_import_sources, + validate_aho_s3_uri_format, +) +from mcp.server.fastmcp import Context +from unittest.mock import MagicMock, patch + + +@pytest.fixture +def mock_context(): + """Create a mock MCP context.""" + return MagicMock(spec=Context) + + +@pytest.fixture +def mock_s3_client(): + """Create a mock S3 client.""" + client = MagicMock() + return client + + +class TestDataImportTools: + """Test data import tools functionality.""" + + def test_parse_s3_uri_valid(self): + """Test parsing valid S3 URIs.""" + # Test basic S3 URI + result = parse_s3_uri('s3://my-bucket/path/to/file.txt') + assert result['bucket'] == 'my-bucket' + assert result['key'] == 'path/to/file.txt' + + # Test S3 URI with just bucket + result = parse_s3_uri('s3://my-bucket/') + assert result['bucket'] == 'my-bucket' + assert result['key'] == '' + + # Test S3 URI with nested path + result = parse_s3_uri('s3://my-bucket/deep/nested/path/file.fastq.gz') + assert result['bucket'] == 'my-bucket' + assert result['key'] == 'deep/nested/path/file.fastq.gz' + + def test_parse_s3_uri_invalid(self): + """Test parsing invalid S3 URIs.""" + with pytest.raises(ValueError): + parse_s3_uri('http://bucket/file.txt') + + with pytest.raises(ValueError): + parse_s3_uri('bucket/file.txt') + + with pytest.raises(ValueError): + parse_s3_uri('') + + @pytest.mark.asyncio + async def test_validate_aho_s3_uri_format_valid(self, mock_context): + """Test validation of valid S3 URIs.""" + # Test valid S3 URI + result = await validate_aho_s3_uri_format( + mock_context, 's3://my-genomics-bucket/data/sample1.fastq.gz' + ) + + assert result['valid'] is True + assert result['bucket'] == 'my-genomics-bucket' + assert result['key'] == 'data/sample1.fastq.gz' + + @pytest.mark.asyncio + async def test_validate_aho_s3_uri_format_invalid(self, mock_context): + """Test validation of invalid S3 URIs.""" + # Test invalid protocol + result = await validate_aho_s3_uri_format(mock_context, 'http://bucket/file.txt') + assert result['valid'] is False + assert 'must start with s3://' in result['error'] + + # Test non-string input + result = await validate_aho_s3_uri_format(mock_context, 123) + assert result['valid'] is False + assert 'must be a string' in result['error'] + + # Test empty bucket + result = await validate_aho_s3_uri_format(mock_context, 's3:///file.txt') + assert result['valid'] is False + assert 'cannot be empty' in result['error'] + + @patch('awslabs.aws_healthomics_mcp_server.tools.data_import_tools.create_aws_client') + @pytest.mark.asyncio + async def test_discover_aho_genomic_files_success( + self, mock_create_client, mock_context, mock_s3_client + ): + """Test successful genomic file discovery.""" + # Arrange + mock_create_client.return_value = mock_s3_client + + # Mock paginator + mock_paginator = MagicMock() + mock_s3_client.get_paginator.return_value = mock_paginator + + from datetime import datetime + + mock_page = { + 'Contents': [ + { + 'Key': 
'genomics/sample1_R1.fastq.gz', + 'Size': 1024000, + 'LastModified': datetime(2023, 10, 1, 12, 0, 0), + 'ETag': '"abc123"', + }, + { + 'Key': 'genomics/sample1_R2.fastq.gz', + 'Size': 1024000, + 'LastModified': datetime(2023, 10, 1, 12, 0, 0), + 'ETag': '"def456"', + }, + { + 'Key': 'genomics/variants.vcf.gz', + 'Size': 512000, + 'LastModified': datetime(2023, 10, 1, 13, 0, 0), + 'ETag': '"ghi789"', + }, + ] + } + mock_paginator.paginate.return_value = [mock_page] + + # Act + result = await discover_aho_genomic_files( + mock_context, 's3://test-bucket/genomics/', file_types=['FASTQ', 'VCF'], max_files=100 + ) + + # Assert + assert 'discoveredFiles' in result + assert 'totalCount' in result + assert result['totalCount'] == 3 + assert len(result['discoveredFiles']) == 3 + + # Check FASTQ files + fastq_files = [f for f in result['discoveredFiles'] if f['fileType'] == 'FASTQ'] + assert len(fastq_files) == 2 + + # Check VCF files + vcf_files = [f for f in result['discoveredFiles'] if f['fileType'] == 'VCF'] + assert len(vcf_files) == 1 + + # Verify S3 URIs are correctly formed + for file_info in result['discoveredFiles']: + assert file_info['s3Uri'].startswith('s3://test-bucket/') + + @patch('awslabs.aws_healthomics_mcp_server.tools.data_import_tools.create_aws_client') + @pytest.mark.asyncio + async def test_list_aho_s3_bucket_contents_success( + self, mock_create_client, mock_context, mock_s3_client + ): + """Test successful S3 bucket content listing.""" + # Arrange + mock_create_client.return_value = mock_s3_client + + mock_paginator = MagicMock() + mock_s3_client.get_paginator.return_value = mock_paginator + + from datetime import datetime + + mock_page = { + 'Contents': [ + { + 'Key': 'data/file1.txt', + 'Size': 1024, + 'LastModified': datetime(2023, 10, 1, 12, 0, 0), + 'ETag': '"abc123"', + 'StorageClass': 'STANDARD', + }, + { + 'Key': 'data/file2.txt', + 'Size': 2048, + 'LastModified': datetime(2023, 10, 1, 13, 0, 0), + 'ETag': '"def456"', + 'StorageClass': 'STANDARD', + }, + ] + } + mock_paginator.paginate.return_value = [mock_page] + + # Act + result = await list_aho_s3_bucket_contents( + mock_context, 's3://test-bucket/data/', pattern='*.txt', max_keys=1000 + ) + + # Assert + assert 'objects' in result + assert 'totalCount' in result + assert result['totalCount'] == 2 + assert len(result['objects']) == 2 + assert result['appliedPattern'] == '*.txt' + + for obj in result['objects']: + assert 's3Uri' in obj + assert 'fileName' in obj + assert 'size' in obj + + @patch('awslabs.aws_healthomics_mcp_server.tools.data_import_tools.create_aws_client') + @pytest.mark.asyncio + async def test_get_aho_s3_file_metadata_success( + self, mock_create_client, mock_context, mock_s3_client + ): + """Test successful S3 file metadata retrieval.""" + # Arrange + mock_create_client.return_value = mock_s3_client + + from datetime import datetime + + mock_response = { + 'ContentLength': 1024000, + 'LastModified': datetime(2023, 10, 1, 12, 0, 0), + 'ETag': '"abc123def456"', # pragma: allowlist secret + 'ContentType': 'application/gzip', + 'StorageClass': 'STANDARD', + 'Metadata': {'sample-id': 'sample-001', 'experiment': 'rna-seq'}, + } + mock_s3_client.head_object.return_value = mock_response + + # Act + result = await get_aho_s3_file_metadata( + mock_context, 's3://test-bucket/data/sample1.fastq.gz' + ) + + # Assert + assert 'fileMetadata' in result + metadata = result['fileMetadata'] + assert metadata['s3Uri'] == 's3://test-bucket/data/sample1.fastq.gz' + assert metadata['bucket'] == 'test-bucket' + assert 
metadata['key'] == 'data/sample1.fastq.gz' + assert metadata['fileName'] == 'sample1.fastq.gz' + assert metadata['size'] == 1024000 + assert metadata['contentType'] == 'application/gzip' + assert 'sample-id' in metadata['metadata'] + + @patch('awslabs.aws_healthomics_mcp_server.tools.data_import_tools.create_aws_client') + @pytest.mark.asyncio + async def test_get_aho_s3_file_metadata_not_found( + self, mock_create_client, mock_context, mock_s3_client + ): + """Test S3 file metadata retrieval for non-existent file.""" + # Arrange + from botocore.exceptions import ClientError + + mock_create_client.return_value = mock_s3_client + + error_response = { + 'Error': {'Code': 'NoSuchKey', 'Message': 'The specified key does not exist.'} + } + mock_s3_client.head_object.side_effect = ClientError(error_response, 'HeadObject') + + # Act & Assert + with pytest.raises(Exception) as exc_info: + await get_aho_s3_file_metadata(mock_context, 's3://test-bucket/nonexistent.txt') + + assert 'File not found' in str(exc_info.value) + + @pytest.mark.asyncio + async def test_prepare_aho_import_sources_success(self, mock_context): + """Test successful preparation of import sources.""" + # Arrange + files = [ + {'s3Uri': 's3://test-bucket/sample1_R1.fastq.gz', 'fileType': 'FASTQ'}, + {'s3Uri': 's3://test-bucket/sample1_R2.fastq.gz', 'fileType': 'FASTQ'}, + ] + + # Act + result = await prepare_aho_import_sources( + mock_context, + files=files, + sample_id='sample-001', + subject_id='patient-123', + reference_arn='arn:aws:omics:us-east-1:123456789012:referenceStore/123456789012345678/reference/1234567890123456', + ) + + # Assert + assert 'importSources' in result + assert 'totalFiles' in result + assert 'configuration' in result + + assert result['totalFiles'] == 2 + assert result['configuration']['sampleId'] == 'sample-001' + assert result['configuration']['subjectId'] == 'patient-123' + + # Check that paired-end FASTQ files are properly configured + sources = result['importSources'] + assert len(sources) == 2 + + for source in sources: + assert 'sourceFileType' in source + assert 'sourceFiles' in source + assert source['sourceFileType'] == 'FASTQ' + assert 'sampleId' in source + assert 'subjectId' in source + + @pytest.mark.asyncio + async def test_prepare_aho_import_sources_paired_end(self, mock_context): + """Test preparation of paired-end FASTQ import sources.""" + # Arrange + files = [ + {'s3Uri': 's3://test-bucket/sample1_R1.fastq.gz', 'fileType': 'FASTQ'}, + {'s3Uri': 's3://test-bucket/sample1_R2.fastq.gz', 'fileType': 'FASTQ'}, + ] + + # Act + result = await prepare_aho_import_sources( + mock_context, files=files, sample_id='sample-001' + ) + + # Assert + sources = result['importSources'] + + # Find the R1 file source + r1_source = next( + (s for s in sources if 'sample1_R1.fastq.gz' in s['sourceFiles']['source1']), None + ) + assert r1_source is not None + + # Check if R2 was automatically paired + if 'source2' in r1_source['sourceFiles']: + assert 'sample1_R2.fastq.gz' in r1_source['sourceFiles']['source2'] diff --git a/src/aws-healthomics-mcp-server/tests/test_sequence_store_tools.py b/src/aws-healthomics-mcp-server/tests/test_sequence_store_tools.py new file mode 100644 index 0000000000..26aa6206e6 --- /dev/null +++ b/src/aws-healthomics-mcp-server/tests/test_sequence_store_tools.py @@ -0,0 +1,276 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance +# with the License. 
A copy of the License is located at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# or in the 'license' file accompanying this file. This file is distributed on an 'AS IS' BASIS, WITHOUT WARRANTIES +# OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions +# and limitations under the License. +"""Tests for sequence store tools.""" + +import pytest +from awslabs.aws_healthomics_mcp_server.tools.sequence_store_tools import ( + get_aho_read_set, + get_aho_read_set_import_job, + list_aho_read_sets, + list_aho_sequence_stores, + start_aho_read_set_import_job, +) +from mcp.server.fastmcp import Context +from unittest.mock import MagicMock, patch + + +@pytest.fixture +def mock_context(): + """Create a mock MCP context.""" + return MagicMock(spec=Context) + + +@pytest.fixture +def mock_omics_client(): + """Create a mock HealthOmics client.""" + client = MagicMock() + return client + + +class TestSequenceStoreTools: + """Test sequence store tools functionality.""" + + @patch('awslabs.aws_healthomics_mcp_server.tools.sequence_store_tools.get_omics_client') + @pytest.mark.asyncio + async def test_list_aho_sequence_stores_success( + self, mock_get_client, mock_context, mock_omics_client + ): + """Test successful listing of sequence stores.""" + # Arrange + mock_get_client.return_value = mock_omics_client + mock_response = { + 'sequenceStores': [ + { + 'id': 'test-store-1', + 'name': 'Test Store 1', + 'arn': 'arn:aws:omics:us-east-1:123456789012:sequenceStore/test-store-1', + } + ] + } + mock_omics_client.list_sequence_stores.return_value = mock_response + + # Act + result = await list_aho_sequence_stores(mock_context, max_results=10) + + # Assert + assert 'sequenceStores' in result + assert 'totalCount' in result + assert result['totalCount'] == 1 + assert len(result['sequenceStores']) == 1 + assert result['sequenceStores'][0]['id'] == 'test-store-1' + mock_omics_client.list_sequence_stores.assert_called_once_with(maxResults=10) + + @patch('awslabs.aws_healthomics_mcp_server.tools.sequence_store_tools.get_omics_client') + @pytest.mark.asyncio + async def test_list_aho_read_sets_success( + self, mock_get_client, mock_context, mock_omics_client + ): + """Test successful listing of read sets.""" + # Arrange + mock_get_client.return_value = mock_omics_client + mock_response = { + 'readSets': [ + { + 'id': 'test-readset-1', + 'arn': 'arn:aws:omics:us-east-1:123456789012:readSet/test-readset-1', + 'sequenceStoreId': 'test-store-1', + 'sampleId': 'sample-1', + 'subjectId': 'subject-1', + } + ] + } + mock_omics_client.list_read_sets.return_value = mock_response + + # Act + result = await list_aho_read_sets( + mock_context, sequence_store_id='test-store-1', max_results=10 + ) + + # Assert + assert 'readSets' in result + assert 'totalCount' in result + assert 'sequenceStoreId' in result + assert result['sequenceStoreId'] == 'test-store-1' + assert result['totalCount'] == 1 + assert len(result['readSets']) == 1 + assert result['readSets'][0]['id'] == 'test-readset-1' + mock_omics_client.list_read_sets.assert_called_once() + + @patch('awslabs.aws_healthomics_mcp_server.tools.sequence_store_tools.get_omics_client') + @pytest.mark.asyncio + async def test_get_aho_read_set_success( + self, mock_get_client, mock_context, mock_omics_client + ): + """Test successful retrieval of read set metadata.""" + # Arrange + mock_get_client.return_value = mock_omics_client + mock_response = { + 'id': 'test-readset-1', + 'arn': 
'arn:aws:omics:us-east-1:123456789012:readSet/test-readset-1', + 'sequenceStoreId': 'test-store-1', + 'name': 'Test Read Set', + 'sampleId': 'sample-1', + 'subjectId': 'subject-1', + 'status': 'ACTIVE', + 'fileType': 'FASTQ', + } + mock_omics_client.get_read_set_metadata.return_value = mock_response + + # Act + result = await get_aho_read_set( + mock_context, sequence_store_id='test-store-1', read_set_id='test-readset-1' + ) + + # Assert + assert 'readSet' in result + assert 'sequenceStoreId' in result + assert result['sequenceStoreId'] == 'test-store-1' + assert result['readSet']['id'] == 'test-readset-1' + assert result['readSet']['status'] == 'ACTIVE' + mock_omics_client.get_read_set_metadata.assert_called_once_with( + sequenceStoreId='test-store-1', id='test-readset-1' + ) + + @patch('awslabs.aws_healthomics_mcp_server.tools.sequence_store_tools.get_omics_client') + @pytest.mark.asyncio + async def test_start_aho_read_set_import_job_success( + self, mock_get_client, mock_context, mock_omics_client + ): + """Test successful start of read set import job.""" + # Arrange + mock_get_client.return_value = mock_omics_client + mock_response = { + 'id': 'import-job-123', + 'sequenceStoreId': 'test-store-1', + 'roleArn': 'arn:aws:iam::123456789012:role/HealthOmicsRole', + 'status': 'SUBMITTED', + 'creationTime': '2023-10-01T12:00:00Z', + } + mock_omics_client.start_read_set_import_job.return_value = mock_response + + sources = [ + { + 'sourceFileType': 'FASTQ', + 'sourceFiles': { + 'source1': 's3://test-bucket/sample1_R1.fastq.gz', + 'source2': 's3://test-bucket/sample1_R2.fastq.gz', + }, + 'sampleId': 'sample-1', + 'subjectId': 'subject-1', + } + ] + + # Act + result = await start_aho_read_set_import_job( + mock_context, + sequence_store_id='test-store-1', + role_arn='arn:aws:iam::123456789012:role/HealthOmicsRole', + sources=sources, + ) + + # Assert + assert 'id' in result + assert 'status' in result + assert result['id'] == 'import-job-123' + assert result['status'] == 'SUBMITTED' + assert result['sequenceStoreId'] == 'test-store-1' + mock_omics_client.start_read_set_import_job.assert_called_once() + + @patch('awslabs.aws_healthomics_mcp_server.tools.sequence_store_tools.get_omics_client') + @pytest.mark.asyncio + async def test_list_aho_sequence_stores_client_error( + self, mock_get_client, mock_context, mock_omics_client + ): + """Test handling of AWS client errors.""" + # Arrange + from botocore.exceptions import ClientError + + mock_get_client.return_value = mock_omics_client + error_response = { + 'Error': { + 'Code': 'AccessDeniedException', + 'Message': 'User is not authorized to perform this operation', + } + } + mock_omics_client.list_sequence_stores.side_effect = ClientError( + error_response, 'ListSequenceStores' + ) + + # Act & Assert + with pytest.raises(Exception) as exc_info: + await list_aho_sequence_stores(mock_context, max_results=10) + + assert 'AccessDeniedException' in str(exc_info.value) + assert 'User is not authorized' in str(exc_info.value) + + @patch('awslabs.aws_healthomics_mcp_server.tools.sequence_store_tools.get_omics_client') + @pytest.mark.asyncio + async def test_list_aho_read_sets_with_filters( + self, mock_get_client, mock_context, mock_omics_client + ): + """Test read set listing with filters applied.""" + # Arrange + mock_get_client.return_value = mock_omics_client + mock_response = { + 'readSets': [ + {'id': 'test-readset-1', 'subjectId': 'human-sample-1', 'sampleId': 'sample-1'} + ] + } + mock_omics_client.list_read_sets.return_value = mock_response + + 
# Act + result = await list_aho_read_sets( + mock_context, + sequence_store_id='test-store-1', + species='human', + chromosome='chr1', + uploaded_after='2023-01-01T00:00:00Z', + max_results=10, + ) + + # Assert + assert 'appliedFilters' in result + assert result['appliedFilters']['species'] == 'human' + assert result['appliedFilters']['chromosome'] == 'chr1' + assert result['appliedFilters']['uploadedAfter'] == '2023-01-01T00:00:00Z' + mock_omics_client.list_read_sets.assert_called_once() + + @patch('awslabs.aws_healthomics_mcp_server.tools.sequence_store_tools.get_omics_client') + @pytest.mark.asyncio + async def test_get_aho_read_set_import_job_success( + self, mock_get_client, mock_context, mock_omics_client + ): + """Test successful retrieval of import job status.""" + # Arrange + mock_get_client.return_value = mock_omics_client + mock_response = { + 'id': 'import-job-123', + 'sequenceStoreId': 'test-store-1', + 'roleArn': 'arn:aws:iam::123456789012:role/HealthOmicsRole', + 'status': 'COMPLETED', + 'creationTime': '2023-10-01T12:00:00Z', + 'completionTime': '2023-10-01T12:30:00Z', + 'sources': [], + } + mock_omics_client.get_read_set_import_job.return_value = mock_response + + # Act + result = await get_aho_read_set_import_job( + mock_context, sequence_store_id='test-store-1', import_job_id='import-job-123' + ) + + # Assert + assert result['id'] == 'import-job-123' + assert result['status'] == 'COMPLETED' + assert 'completionTime' in result + mock_omics_client.get_read_set_import_job.assert_called_once_with( + sequenceStoreId='test-store-1', id='import-job-123' + ) diff --git a/src/aws-healthomics-mcp-server/tests/test_server.py b/src/aws-healthomics-mcp-server/tests/test_server.py index c4b5f10b83..86c3815bc5 100644 --- a/src/aws-healthomics-mcp-server/tests/test_server.py +++ b/src/aws-healthomics-mcp-server/tests/test_server.py @@ -29,6 +29,7 @@ def test_server_has_required_tools(): """Test that the server has all required tools registered.""" # Arrange expected_tools = [ + # Workflow tools 'ListAHOWorkflows', 'CreateAHOWorkflow', 'GetAHOWorkflow', @@ -47,6 +48,35 @@ def test_server_has_required_tools(): 'DiagnoseAHORunFailure', 'PackageAHOWorkflow', 'GetAHOSupportedRegions', + # Data store tools + 'ListAHOSequenceStores', + 'ListAHOReadSets', + 'GetAHOReadSet', + 'StartAHOReadSetImportJob', + 'GetAHOReadSetImportJob', + 'ListAHOReadSetImportJobs', + 'ListAHOVariantStores', + 'GetAHOVariantStore', + 'SearchAHOVariants', + 'CountAHOVariants', + 'StartAHOVariantImportJob', + 'GetAHOVariantImportJob', + 'ListAHOReferenceStores', + 'GetAHOReferenceStore', + 'ListAHOReferences', + 'GetAHOReference', + 'StartAHOReferenceImportJob', + 'GetAHOReferenceImportJob', + 'ListAHOAnnotationStores', + 'GetAHOAnnotationStore', + 'SearchAHOAnnotations', + 'StartAHOAnnotationImportJob', + 'GetAHOAnnotationImportJob', + 'ValidateAHOS3UriFormat', + 'DiscoverAHOGenomicFiles', + 'ListAHOS3BucketContents', + 'GetAHOS3FileMetadata', + 'PrepareAHOImportSources', ] # Act