This is page 7 of 15. Use http://codebase.md/genomoncology/biomcp?lines=false&page={x} to view the full context. # Directory Structure ``` ├── .github │ ├── actions │ │ └── setup-python-env │ │ └── action.yml │ ├── dependabot.yml │ └── workflows │ ├── ci.yml │ ├── deploy-docs.yml │ ├── main.yml.disabled │ ├── on-release-main.yml │ └── validate-codecov-config.yml ├── .gitignore ├── .pre-commit-config.yaml ├── BIOMCP_DATA_FLOW.md ├── CHANGELOG.md ├── CNAME ├── codecov.yaml ├── docker-compose.yml ├── Dockerfile ├── docs │ ├── apis │ │ ├── error-codes.md │ │ ├── overview.md │ │ └── python-sdk.md │ ├── assets │ │ ├── biomcp-cursor-locations.png │ │ ├── favicon.ico │ │ ├── icon.png │ │ ├── logo.png │ │ ├── mcp_architecture.txt │ │ └── remote-connection │ │ ├── 00_connectors.png │ │ ├── 01_add_custom_connector.png │ │ ├── 02_connector_enabled.png │ │ ├── 03_connect_to_biomcp.png │ │ ├── 04_select_google_oauth.png │ │ └── 05_success_connect.png │ ├── backend-services-reference │ │ ├── 01-overview.md │ │ ├── 02-biothings-suite.md │ │ ├── 03-cbioportal.md │ │ ├── 04-clinicaltrials-gov.md │ │ ├── 05-nci-cts-api.md │ │ ├── 06-pubtator3.md │ │ └── 07-alphagenome.md │ ├── blog │ │ ├── ai-assisted-clinical-trial-search-analysis.md │ │ ├── images │ │ │ ├── deep-researcher-video.png │ │ │ ├── researcher-announce.png │ │ │ ├── researcher-drop-down.png │ │ │ ├── researcher-prompt.png │ │ │ ├── trial-search-assistant.png │ │ │ └── what_is_biomcp_thumbnail.png │ │ └── researcher-persona-resource.md │ ├── changelog.md │ ├── CNAME │ ├── concepts │ │ ├── 01-what-is-biomcp.md │ │ ├── 02-the-deep-researcher-persona.md │ │ └── 03-sequential-thinking-with-the-think-tool.md │ ├── developer-guides │ │ ├── 01-server-deployment.md │ │ ├── 02-contributing-and-testing.md │ │ ├── 03-third-party-endpoints.md │ │ ├── 04-transport-protocol.md │ │ ├── 05-error-handling.md │ │ ├── 06-http-client-and-caching.md │ │ ├── 07-performance-optimizations.md │ │ └── generate_endpoints.py │ ├── faq-condensed.md │ 
├── FDA_SECURITY.md │ ├── genomoncology.md │ ├── getting-started │ │ ├── 01-quickstart-cli.md │ │ ├── 02-claude-desktop-integration.md │ │ └── 03-authentication-and-api-keys.md │ ├── how-to-guides │ │ ├── 01-find-articles-and-cbioportal-data.md │ │ ├── 02-find-trials-with-nci-and-biothings.md │ │ ├── 03-get-comprehensive-variant-annotations.md │ │ ├── 04-predict-variant-effects-with-alphagenome.md │ │ ├── 05-logging-and-monitoring-with-bigquery.md │ │ └── 06-search-nci-organizations-and-interventions.md │ ├── index.md │ ├── policies.md │ ├── reference │ │ ├── architecture-diagrams.md │ │ ├── quick-architecture.md │ │ ├── quick-reference.md │ │ └── visual-architecture.md │ ├── robots.txt │ ├── stylesheets │ │ ├── announcement.css │ │ └── extra.css │ ├── troubleshooting.md │ ├── tutorials │ │ ├── biothings-prompts.md │ │ ├── claude-code-biomcp-alphagenome.md │ │ ├── nci-prompts.md │ │ ├── openfda-integration.md │ │ ├── openfda-prompts.md │ │ ├── pydantic-ai-integration.md │ │ └── remote-connection.md │ ├── user-guides │ │ ├── 01-command-line-interface.md │ │ ├── 02-mcp-tools-reference.md │ │ └── 03-integrating-with-ides-and-clients.md │ └── workflows │ └── all-workflows.md ├── example_scripts │ ├── mcp_integration.py │ └── python_sdk.py ├── glama.json ├── LICENSE ├── lzyank.toml ├── Makefile ├── mkdocs.yml ├── package-lock.json ├── package.json ├── pyproject.toml ├── README.md ├── scripts │ ├── check_docs_in_mkdocs.py │ ├── check_http_imports.py │ └── generate_endpoints_doc.py ├── smithery.yaml ├── src │ └── biomcp │ ├── __init__.py │ ├── __main__.py │ ├── articles │ │ ├── __init__.py │ │ ├── autocomplete.py │ │ ├── fetch.py │ │ ├── preprints.py │ │ ├── search_optimized.py │ │ ├── search.py │ │ └── unified.py │ ├── biomarkers │ │ ├── __init__.py │ │ └── search.py │ ├── cbioportal_helper.py │ ├── circuit_breaker.py │ ├── cli │ │ ├── __init__.py │ │ ├── articles.py │ │ ├── biomarkers.py │ │ ├── diseases.py │ │ ├── health.py │ │ ├── interventions.py │ │ ├── main.py │ │ 
├── openfda.py │ │ ├── organizations.py │ │ ├── server.py │ │ ├── trials.py │ │ └── variants.py │ ├── connection_pool.py │ ├── constants.py │ ├── core.py │ ├── diseases │ │ ├── __init__.py │ │ ├── getter.py │ │ └── search.py │ ├── domain_handlers.py │ ├── drugs │ │ ├── __init__.py │ │ └── getter.py │ ├── exceptions.py │ ├── genes │ │ ├── __init__.py │ │ └── getter.py │ ├── http_client_simple.py │ ├── http_client.py │ ├── individual_tools.py │ ├── integrations │ │ ├── __init__.py │ │ ├── biothings_client.py │ │ └── cts_api.py │ ├── interventions │ │ ├── __init__.py │ │ ├── getter.py │ │ └── search.py │ ├── logging_filter.py │ ├── metrics_handler.py │ ├── metrics.py │ ├── openfda │ │ ├── __init__.py │ │ ├── adverse_events_helpers.py │ │ ├── adverse_events.py │ │ ├── cache.py │ │ ├── constants.py │ │ ├── device_events_helpers.py │ │ ├── device_events.py │ │ ├── drug_approvals.py │ │ ├── drug_labels_helpers.py │ │ ├── drug_labels.py │ │ ├── drug_recalls_helpers.py │ │ ├── drug_recalls.py │ │ ├── drug_shortages_detail_helpers.py │ │ ├── drug_shortages_helpers.py │ │ ├── drug_shortages.py │ │ ├── exceptions.py │ │ ├── input_validation.py │ │ ├── rate_limiter.py │ │ ├── utils.py │ │ └── validation.py │ ├── organizations │ │ ├── __init__.py │ │ ├── getter.py │ │ └── search.py │ ├── parameter_parser.py │ ├── prefetch.py │ ├── query_parser.py │ ├── query_router.py │ ├── rate_limiter.py │ ├── render.py │ ├── request_batcher.py │ ├── resources │ │ ├── __init__.py │ │ ├── getter.py │ │ ├── instructions.md │ │ └── researcher.md │ ├── retry.py │ ├── router_handlers.py │ ├── router.py │ ├── shared_context.py │ ├── thinking │ │ ├── __init__.py │ │ ├── sequential.py │ │ └── session.py │ ├── thinking_tool.py │ ├── thinking_tracker.py │ ├── trials │ │ ├── __init__.py │ │ ├── getter.py │ │ ├── nci_getter.py │ │ ├── nci_search.py │ │ └── search.py │ ├── utils │ │ ├── __init__.py │ │ ├── cancer_types_api.py │ │ ├── cbio_http_adapter.py │ │ ├── endpoint_registry.py │ │ ├── 
gene_validator.py │ │ ├── metrics.py │ │ ├── mutation_filter.py │ │ ├── query_utils.py │ │ ├── rate_limiter.py │ │ └── request_cache.py │ ├── variants │ │ ├── __init__.py │ │ ├── alphagenome.py │ │ ├── cancer_types.py │ │ ├── cbio_external_client.py │ │ ├── cbioportal_mutations.py │ │ ├── cbioportal_search_helpers.py │ │ ├── cbioportal_search.py │ │ ├── constants.py │ │ ├── external.py │ │ ├── filters.py │ │ ├── getter.py │ │ ├── links.py │ │ └── search.py │ └── workers │ ├── __init__.py │ ├── worker_entry_stytch.js │ ├── worker_entry.js │ └── worker.py ├── tests │ ├── bdd │ │ ├── cli_help │ │ │ ├── help.feature │ │ │ └── test_help.py │ │ ├── conftest.py │ │ ├── features │ │ │ └── alphagenome_integration.feature │ │ ├── fetch_articles │ │ │ ├── fetch.feature │ │ │ └── test_fetch.py │ │ ├── get_trials │ │ │ ├── get.feature │ │ │ └── test_get.py │ │ ├── get_variants │ │ │ ├── get.feature │ │ │ └── test_get.py │ │ ├── search_articles │ │ │ ├── autocomplete.feature │ │ │ ├── search.feature │ │ │ ├── test_autocomplete.py │ │ │ └── test_search.py │ │ ├── search_trials │ │ │ ├── search.feature │ │ │ └── test_search.py │ │ ├── search_variants │ │ │ ├── search.feature │ │ │ └── test_search.py │ │ └── steps │ │ └── test_alphagenome_steps.py │ ├── config │ │ └── test_smithery_config.py │ ├── conftest.py │ ├── data │ │ ├── ct_gov │ │ │ ├── clinical_trials_api_v2.yaml │ │ │ ├── trials_NCT04280705.json │ │ │ └── trials_NCT04280705.txt │ │ ├── myvariant │ │ │ ├── myvariant_api.yaml │ │ │ ├── myvariant_field_descriptions.csv │ │ │ ├── variants_full_braf_v600e.json │ │ │ ├── variants_full_braf_v600e.txt │ │ │ └── variants_part_braf_v600_multiple.json │ │ ├── openfda │ │ │ ├── drugsfda_detail.json │ │ │ ├── drugsfda_search.json │ │ │ ├── enforcement_detail.json │ │ │ └── enforcement_search.json │ │ └── pubtator │ │ ├── pubtator_autocomplete.json │ │ └── pubtator3_paper.txt │ ├── integration │ │ ├── test_openfda_integration.py │ │ ├── test_preprints_integration.py │ │ ├── 
test_simple.py │ │ └── test_variants_integration.py │ ├── tdd │ │ ├── articles │ │ │ ├── test_autocomplete.py │ │ │ ├── test_cbioportal_integration.py │ │ │ ├── test_fetch.py │ │ │ ├── test_preprints.py │ │ │ ├── test_search.py │ │ │ └── test_unified.py │ │ ├── conftest.py │ │ ├── drugs │ │ │ ├── __init__.py │ │ │ └── test_drug_getter.py │ │ ├── openfda │ │ │ ├── __init__.py │ │ │ ├── test_adverse_events.py │ │ │ ├── test_device_events.py │ │ │ ├── test_drug_approvals.py │ │ │ ├── test_drug_labels.py │ │ │ ├── test_drug_recalls.py │ │ │ ├── test_drug_shortages.py │ │ │ └── test_security.py │ │ ├── test_biothings_integration_real.py │ │ ├── test_biothings_integration.py │ │ ├── test_circuit_breaker.py │ │ ├── test_concurrent_requests.py │ │ ├── test_connection_pool.py │ │ ├── test_domain_handlers.py │ │ ├── test_drug_approvals.py │ │ ├── test_drug_recalls.py │ │ ├── test_drug_shortages.py │ │ ├── test_endpoint_documentation.py │ │ ├── test_error_scenarios.py │ │ ├── test_europe_pmc_fetch.py │ │ ├── test_mcp_integration.py │ │ ├── test_mcp_tools.py │ │ ├── test_metrics.py │ │ ├── test_nci_integration.py │ │ ├── test_nci_mcp_tools.py │ │ ├── test_network_policies.py │ │ ├── test_offline_mode.py │ │ ├── test_openfda_unified.py │ │ ├── test_pten_r173_search.py │ │ ├── test_render.py │ │ ├── test_request_batcher.py.disabled │ │ ├── test_retry.py │ │ ├── test_router.py │ │ ├── test_shared_context.py.disabled │ │ ├── test_unified_biothings.py │ │ ├── thinking │ │ │ ├── __init__.py │ │ │ └── test_sequential.py │ │ ├── trials │ │ │ ├── test_backward_compatibility.py │ │ │ ├── test_getter.py │ │ │ └── test_search.py │ │ ├── utils │ │ │ ├── test_gene_validator.py │ │ │ ├── test_mutation_filter.py │ │ │ ├── test_rate_limiter.py │ │ │ └── test_request_cache.py │ │ ├── variants │ │ │ ├── constants.py │ │ │ ├── test_alphagenome_api_key.py │ │ │ ├── test_alphagenome_comprehensive.py │ │ │ ├── test_alphagenome.py │ │ │ ├── test_cbioportal_mutations.py │ │ │ ├── 
test_cbioportal_search.py │ │ │ ├── test_external_integration.py │ │ │ ├── test_external.py │ │ │ ├── test_extract_gene_aa_change.py │ │ │ ├── test_filters.py │ │ │ ├── test_getter.py │ │ │ ├── test_links.py │ │ │ └── test_search.py │ │ └── workers │ │ └── test_worker_sanitization.js │ └── test_pydantic_ai_integration.py ├── THIRD_PARTY_ENDPOINTS.md ├── tox.ini ├── uv.lock └── wrangler.toml ``` # Files -------------------------------------------------------------------------------- /tests/tdd/test_domain_handlers.py: -------------------------------------------------------------------------------- ```python """Tests for domain handlers module.""" import pytest from biomcp.constants import DEFAULT_TITLE from biomcp.domain_handlers import ( ArticleHandler, TrialHandler, VariantHandler, get_domain_handler, ) class TestArticleHandler: """Test ArticleHandler class.""" def test_format_pubmed_article(self): """Test formatting a PubMed article.""" article = { "pmid": "12345", "title": "Test Article Title", "abstract": "This is a test abstract that is longer than 200 characters. " * 5, "pub_year": "2023", "journal": "Test Journal", "authors": ["Smith J", "Doe J", "Johnson A", "Williams B"], } result = ArticleHandler.format_result(article) assert result["id"] == "12345" assert result["title"] == "Test Article Title" assert len(result["snippet"]) == 203 # 200 + "..." 
assert result["snippet"].endswith("...") assert result["url"] == "https://pubmed.ncbi.nlm.nih.gov/12345/" assert result["metadata"]["year"] == "2023" assert result["metadata"]["journal"] == "Test Journal" assert len(result["metadata"]["authors"]) == 3 # Only first 3 def test_format_preprint_article(self): """Test formatting a preprint article.""" preprint = { "doi": "10.1101/2023.01.01.12345", "id": "biorxiv-123", "title": "Preprint Title", "abstract": "Short abstract", "url": "https://www.biorxiv.org/content/10.1101/2023.01.01.12345", "pub_year": "2023", "source": "bioRxiv", "authors": ["Author A", "Author B"], } result = ArticleHandler.format_result(preprint) assert result["id"] == "10.1101/2023.01.01.12345" assert result["title"] == "Preprint Title" assert result["snippet"] == "Short abstract..." assert ( result["url"] == "https://www.biorxiv.org/content/10.1101/2023.01.01.12345" ) assert result["metadata"]["source"] == "bioRxiv" def test_format_article_missing_fields(self): """Test formatting article with missing fields.""" article = { "pmid": "67890", # Missing title, abstract, etc. 
} result = ArticleHandler.format_result(article) assert result["id"] == "67890" assert ( result["title"] == DEFAULT_TITLE ) # Should use default for missing title assert result["snippet"] == "" # Empty when no abstract assert result["url"] == "https://pubmed.ncbi.nlm.nih.gov/67890/" def test_format_article_with_date_field(self): """Test formatting article with date field instead of pub_year.""" article = { "pmid": "123", "title": "Test", "date": "2023-05-15", } result = ArticleHandler.format_result(article) assert result["metadata"]["year"] == "2023" def test_format_article_title_normalization(self): """Test that article title whitespace is normalized.""" article = { "pmid": "123", "title": " Test Article\n\nWith Extra Spaces ", } result = ArticleHandler.format_result(article) assert result["title"] == "Test Article With Extra Spaces" class TestTrialHandler: """Test TrialHandler class.""" def test_format_trial_api_v2(self): """Test formatting trial with API v2 structure.""" trial = { "protocolSection": { "identificationModule": { "nctId": "NCT12345", "briefTitle": "Brief Title", "officialTitle": "Official Title", }, "statusModule": { "overallStatus": "RECRUITING", "startDateStruct": {"date": "2023-01-01"}, "primaryCompletionDateStruct": {"date": "2024-12-31"}, }, "descriptionModule": { "briefSummary": "This is a brief summary of the trial." 
}, "designModule": { "phases": ["PHASE3"], }, } } result = TrialHandler.format_result(trial) assert result["id"] == "NCT12345" assert result["title"] == "Brief Title" assert "brief summary" in result["snippet"] assert result["url"] == "https://clinicaltrials.gov/study/NCT12345" assert result["metadata"]["status"] == "RECRUITING" assert result["metadata"]["phase"] == "PHASE3" assert result["metadata"]["start_date"] == "2023-01-01" assert result["metadata"]["primary_completion_date"] == "2024-12-31" def test_format_trial_legacy_flat(self): """Test formatting trial with legacy flat structure.""" trial = { "NCT Number": "NCT67890", "Study Title": "Legacy Trial Title", "Brief Summary": "Legacy summary", "Study Status": "COMPLETED", "Phases": "Phase 2", "Start Date": "2022-01-01", "Completion Date": "2023-12-31", } result = TrialHandler.format_result(trial) assert result["id"] == "NCT67890" assert result["title"] == "Legacy Trial Title" assert result["snippet"].startswith("Legacy summary") assert result["url"] == "https://clinicaltrials.gov/study/NCT67890" assert result["metadata"]["status"] == "COMPLETED" assert result["metadata"]["phase"] == "Phase 2" def test_format_trial_legacy_simple(self): """Test formatting trial with legacy simple structure.""" trial = { "nct_id": "NCT11111", "brief_title": "Simple Trial", "overall_status": "ACTIVE", "phase": "PHASE1", } result = TrialHandler.format_result(trial) assert result["id"] == "NCT11111" assert result["title"] == "Simple Trial" assert result["metadata"]["status"] == "ACTIVE" assert result["metadata"]["phase"] == "PHASE1" def test_format_trial_missing_title(self): """Test formatting trial with missing brief title.""" trial = { "protocolSection": { "identificationModule": { "nctId": "NCT99999", "officialTitle": "Only Official Title", }, } } result = TrialHandler.format_result(trial) assert result["id"] == "NCT99999" assert result["title"] == "Only Official Title" def test_format_trial_empty_phases(self): """Test formatting 
trial with empty phases array.""" trial = { "protocolSection": { "identificationModule": {"nctId": "NCT123"}, "designModule": {"phases": []}, } } result = TrialHandler.format_result(trial) assert result["metadata"]["phase"] == "" class TestVariantHandler: """Test VariantHandler class.""" def test_format_variant_complete(self): """Test formatting variant with complete data.""" variant = { "_id": "chr7:g.140453136A>T", "dbnsfp": { "genename": "BRAF", "hgvsp": ["BRAF:p.V600E"], }, "dbsnp": { "rsid": "rs121913529", "gene": {"symbol": "BRAF"}, }, "clinvar": { "rcv": { "clinical_significance": "Pathogenic", } }, "cadd": { "consequence": "missense_variant", }, } result = VariantHandler.format_result(variant) assert result["id"] == "chr7:g.140453136A>T" assert result["title"] == "BRAF BRAF:p.V600E" assert "Pathogenic" in result["snippet"] assert "rs121913529" in result["url"] assert result["metadata"]["gene"] == "BRAF" assert result["metadata"]["rsid"] == "rs121913529" assert result["metadata"]["clinical_significance"] == "Pathogenic" assert result["metadata"]["consequence"] == "missense_variant" def test_format_variant_gene_list(self): """Test formatting variant when gene is a list.""" variant = { "_id": "rs123", "dbnsfp": {"genename": ["GENE1", "GENE2"]}, } result = VariantHandler.format_result(variant) assert result["metadata"]["gene"] == "GENE1" def test_format_variant_clinvar_list(self): """Test formatting variant when clinvar RCV is a list.""" variant = { "_id": "rs456", "clinvar": { "rcv": [ {"clinical_significance": "Pathogenic"}, {"clinical_significance": "Likely pathogenic"}, ] }, } result = VariantHandler.format_result(variant) assert result["metadata"]["clinical_significance"] == "Pathogenic" def test_format_variant_minimal(self): """Test formatting variant with minimal data.""" variant = { "_id": "chr1:g.12345A>G", } result = VariantHandler.format_result(variant) assert result["id"] == "chr1:g.12345A>G" assert result["title"] == "chr1:g.12345A>G" assert 
"Unknown" in result["snippet"] assert result["url"] == "" def test_format_variant_hgvsp_list(self): """Test formatting variant when HGVS protein is a list.""" variant = { "_id": "rs789", "dbnsfp": { "genename": "TP53", "hgvsp": ["TP53:p.R175H", "TP53:p.R175C"], }, } result = VariantHandler.format_result(variant) assert result["title"] == "TP53 TP53:p.R175H" def test_format_variant_no_rsid_url(self): """Test variant URL generation without rsID.""" variant = { "_id": "chr2:g.234567C>T", } result = VariantHandler.format_result(variant) assert result["url"] == "" class TestGetDomainHandler: """Test get_domain_handler function.""" def test_get_article_handler(self): """Test getting article handler.""" handler = get_domain_handler("article") assert handler == ArticleHandler def test_get_trial_handler(self): """Test getting trial handler.""" handler = get_domain_handler("trial") assert handler == TrialHandler def test_get_variant_handler(self): """Test getting variant handler.""" handler = get_domain_handler("variant") assert handler == VariantHandler def test_get_invalid_handler(self): """Test getting handler for invalid domain.""" with pytest.raises(ValueError) as exc_info: get_domain_handler("invalid") assert "Unknown domain: invalid" in str(exc_info.value) def test_get_handler_case_sensitive(self): """Test that domain names are case sensitive.""" # Should work with lowercase handler = get_domain_handler("article") assert handler == ArticleHandler # Should fail with uppercase with pytest.raises(ValueError): get_domain_handler("ARTICLE") ``` -------------------------------------------------------------------------------- /src/biomcp/cli/health.py: -------------------------------------------------------------------------------- ```python """Health check command for BioMCP CLI. This module provides a command to check the health of API endpoints and system resources. 
""" import asyncio import platform import socket from typing import Any import typer from rich.console import Console from rich.panel import Panel from rich.table import Table from .. import http_client from ..constants import ( CLINICAL_TRIALS_BASE_URL, MYVARIANT_BASE_URL, PUBTATOR3_BASE_URL, ) # Try to import psutil, but handle case where it's not installed try: import psutil PSUTIL_AVAILABLE = True except ImportError: PSUTIL_AVAILABLE = False health_app = typer.Typer(help="Health check operations") console = Console() async def check_api_endpoint( url: str, name: str, params: dict[Any, Any] | None = None, method: str = "GET", ) -> dict: """Check if an API endpoint is accessible and responding.""" try: status, content = await http_client.call_http( method, url, params or {} ) return { "name": name, "url": url, "status": status, "accessible": status == 200, "message": "OK" if status == 200 else f"Error: HTTP {status}", "content": content[:500] if len(content) > 500 else content, # Truncate long responses } except Exception as e: return { "name": name, "url": url, "status": 0, "accessible": False, "message": f"Error: {e!s}", "content": str(e), } async def check_all_api_endpoints() -> list[dict]: """Check all known API endpoints.""" endpoints: list[dict[str, Any]] = [ # PubTator3 API endpoints { "url": f"{PUBTATOR3_BASE_URL}/entity/autocomplete/", "name": "PubTator3 Autocomplete", "params": {"query": "BRAF", "concept": "gene", "limit": 2}, }, { "url": f"{PUBTATOR3_BASE_URL}/publications/export/biocjson", "name": "PubTator3 Publications", "params": {"pmids": "29355051", "full": "false"}, }, { "url": f"{PUBTATOR3_BASE_URL}/search/", "name": "PubTator3 Search", "params": { "query": "BRAF", "concepts": "gene", "page": 1, "size": 1, "text": "@CHEMICAL_remdesivir", }, }, # ClinicalTrials.gov API endpoints { "url": f"{CLINICAL_TRIALS_BASE_URL}", "name": "ClinicalTrials.gov Search API", "params": {"query.term": "cancer", "pageSize": "1"}, }, { "url": 
f"{CLINICAL_TRIALS_BASE_URL}/NCT04280705", "name": "ClinicalTrials.gov Study API", "params": {"fields": "IdentificationModule,StatusModule"}, }, # MyVariant.info API endpoints { "url": f"{MYVARIANT_BASE_URL}/query", "name": "MyVariant.info Query API", "params": {"q": "rs113488022", "size": 1}, }, { "url": f"{MYVARIANT_BASE_URL}/variant/rs113488022", "name": "MyVariant.info Variant API", "params": {"fields": "all"}, }, ] tasks = [] for endpoint in endpoints: url = endpoint["url"] name = endpoint["name"] params = endpoint.get("params") tasks.append(check_api_endpoint(url, name, params)) return await asyncio.gather(*tasks) def check_network_connectivity() -> dict: """Check basic network connectivity.""" try: # Try to connect to Google's DNS to check internet connectivity socket.create_connection(("8.8.8.8", 53), timeout=3) return { "status": "Connected", "message": "Internet connection is available", } except OSError: return { "status": "Disconnected", "message": "No internet connection detected", } def check_system_resources() -> dict: """Check system resources like CPU, memory, and disk space.""" if not PSUTIL_AVAILABLE: return { "error": "psutil package not installed. 
Install with: pip install psutil" } return { "cpu_usage": psutil.cpu_percent(interval=1), "memory": { "total": psutil.virtual_memory().total / (1024**3), # GB "available": psutil.virtual_memory().available / (1024**3), # GB "percent_used": psutil.virtual_memory().percent, }, "disk": { "total": psutil.disk_usage("/").total / (1024**3), # GB "free": psutil.disk_usage("/").free / (1024**3), # GB "percent_used": psutil.disk_usage("/").percent, }, } def check_python_environment() -> dict: """Check Python environment and installed packages.""" env_info = { "python_version": platform.python_version(), "platform": platform.platform(), "system": platform.system(), } # Check for httpx version without importing it try: import importlib.metadata env_info["httpx_version"] = importlib.metadata.version("httpx") except (ImportError, importlib.metadata.PackageNotFoundError): env_info["httpx_version"] = "Unknown" if PSUTIL_AVAILABLE: env_info["psutil_version"] = psutil.__version__ else: env_info["psutil_version"] = "Not installed" return env_info def display_api_health(results: list[dict], verbose: bool = False) -> None: """Display API health check results in a table.""" table = Table(title="API Endpoints Health") table.add_column("Endpoint", style="cyan") table.add_column("URL", style="blue") table.add_column("Status", style="magenta") table.add_column("Message", style="green") for result in results: "green" if result["accessible"] else "red" table.add_row( result["name"], result["url"], f"{result['status']}", result["message"], style=None if result["accessible"] else "red", ) console.print(table) # Display detailed response content if verbose mode is enabled if verbose: for result in results: if not result["accessible"]: console.print( f"\n[bold red]Detailed error for {result['name']}:[/bold red]" ) console.print( Panel( result["content"], title=f"{result['name']} Response", border_style="red", ) ) def display_system_health( system_info: dict, network_info: dict, env_info: dict ) 
-> None: """Display system health information in a table.""" # System resources table resource_table = Table(title="System Resources") resource_table.add_column("Resource", style="cyan") resource_table.add_column("Value", style="green") if "error" in system_info: resource_table.add_row("Error", system_info["error"], style="red") else: resource_table.add_row("CPU Usage", f"{system_info['cpu_usage']}%") resource_table.add_row( "Memory Total", f"{system_info['memory']['total']:.2f} GB" ) resource_table.add_row( "Memory Available", f"{system_info['memory']['available']:.2f} GB" ) resource_table.add_row( "Memory Usage", f"{system_info['memory']['percent_used']}%", style="green" if system_info["memory"]["percent_used"] < 90 else "red", ) resource_table.add_row( "Disk Total", f"{system_info['disk']['total']:.2f} GB" ) resource_table.add_row( "Disk Free", f"{system_info['disk']['free']:.2f} GB" ) resource_table.add_row( "Disk Usage", f"{system_info['disk']['percent_used']}%", style="green" if system_info["disk"]["percent_used"] < 90 else "red", ) console.print(resource_table) # Network and environment table env_table = Table(title="Network & Environment") env_table.add_column("Component", style="cyan") env_table.add_column("Status/Version", style="green") env_table.add_row( "Network", network_info["status"], style=None if network_info["status"] == "Connected" else "red", ) env_table.add_row("Python Version", env_info["python_version"]) env_table.add_row("Platform", env_info["platform"]) env_table.add_row("System", env_info["system"]) env_table.add_row("HTTPX Version", env_info["httpx_version"]) env_table.add_row( "Psutil Version", env_info["psutil_version"], style="red" if env_info["psutil_version"] == "Not installed" else None, ) console.print(env_table) @health_app.callback(invoke_without_command=True) def health_callback(ctx: typer.Context): """Health check callback.""" if ctx.invoked_subcommand is None: # If no subcommand is provided, run the default health check 
check() @health_app.command() def check( api_only: bool = typer.Option( False, "--api-only", help="Check only API endpoints" ), system_only: bool = typer.Option( False, "--system-only", help="Check only system health" ), verbose: bool = typer.Option( False, "--verbose", "-v", help="Show detailed error information and API responses", ), ): """ Run a comprehensive health check on API endpoints and system resources. This command checks: - API endpoints connectivity and response - Network connectivity - System resources (CPU, memory, disk) - Python environment Note: For full system resource checks, the 'psutil' package is required. Install with: pip install psutil """ with console.status("[bold green]Running health checks...") as status: # Check API endpoints if not system_only: status.update("[bold green]Checking API endpoints...") api_results = asyncio.run(check_all_api_endpoints()) display_api_health(api_results, verbose) # Check system health if not api_only: status.update("[bold green]Checking system resources...") system_info = check_system_resources() network_info = check_network_connectivity() env_info = check_python_environment() display_system_health(system_info, network_info, env_info) # Overall status if not api_only and not system_only: api_health = all(result["accessible"] for result in api_results) if "error" in system_info: system_health = False else: system_health = ( network_info["status"] == "Connected" and system_info["memory"]["percent_used"] < 90 and system_info["disk"]["percent_used"] < 90 ) if api_health and system_health: console.print( "\n[bold green]✓ All systems operational![/bold green]" ) else: console.print( "\n[bold red]⚠ Some health checks failed. 
See details above.[/bold red]" ) if verbose: console.print( "[yellow]Run with --verbose flag to see detailed error information[/yellow]" ) ``` -------------------------------------------------------------------------------- /src/biomcp/metrics.py: -------------------------------------------------------------------------------- ```python """Performance monitoring and metrics collection for BioMCP.""" import asyncio import functools import logging import os import time from collections import defaultdict from dataclasses import dataclass, field from datetime import datetime from .constants import ( MAX_METRIC_SAMPLES, METRIC_PERCENTILE_50, METRIC_PERCENTILE_95, METRIC_PERCENTILE_99, ) logger = logging.getLogger(__name__) # Check if metrics are enabled via environment variable METRICS_ENABLED = ( os.getenv("BIOMCP_METRICS_ENABLED", "false").lower() == "true" ) @dataclass class MetricSample: """Single metric measurement.""" timestamp: datetime duration: float success: bool error: str | None = None tags: dict[str, str] = field(default_factory=dict) @dataclass class MetricSummary: """Summary statistics for a metric.""" name: str count: int success_count: int error_count: int total_duration: float min_duration: float max_duration: float avg_duration: float p50_duration: float p95_duration: float p99_duration: float error_rate: float @classmethod def from_samples( cls, name: str, samples: list[MetricSample] ) -> "MetricSummary": """Calculate summary statistics from samples.""" if not samples: return cls( name=name, count=0, success_count=0, error_count=0, total_duration=0.0, min_duration=0.0, max_duration=0.0, avg_duration=0.0, p50_duration=0.0, p95_duration=0.0, p99_duration=0.0, error_rate=0.0, ) durations = sorted([s.duration for s in samples]) success_count = sum(1 for s in samples if s.success) error_count = len(samples) - success_count def percentile(data: list[float], p: float) -> float: """Calculate percentile.""" if not data: return 0.0 k = (len(data) - 1) * p f = 
int(k) c = k - f if f >= len(data) - 1: return data[-1] return data[f] + c * (data[f + 1] - data[f]) return cls( name=name, count=len(samples), success_count=success_count, error_count=error_count, total_duration=sum(durations), min_duration=min(durations), max_duration=max(durations), avg_duration=sum(durations) / len(durations), p50_duration=percentile(durations, METRIC_PERCENTILE_50), p95_duration=percentile(durations, METRIC_PERCENTILE_95), p99_duration=percentile(durations, METRIC_PERCENTILE_99), error_rate=error_count / len(samples) if samples else 0.0, ) class MetricsCollector: """Collects and manages performance metrics.""" def __init__(self, max_samples_per_metric: int = MAX_METRIC_SAMPLES): """Initialize metrics collector. Args: max_samples_per_metric: Maximum samples to keep per metric """ self._metrics: dict[str, list[MetricSample]] = defaultdict(list) self._max_samples = max_samples_per_metric self._lock = asyncio.Lock() async def record( self, name: str, duration: float, success: bool = True, error: str | None = None, tags: dict[str, str] | None = None, ) -> None: """Record a metric sample. Args: name: Metric name duration: Duration in seconds success: Whether operation succeeded error: Error message if failed tags: Additional metadata tags """ sample = MetricSample( timestamp=datetime.now(), duration=duration, success=success, error=error, tags=tags or {}, ) async with self._lock: samples = self._metrics[name] samples.append(sample) # Keep only the most recent samples if len(samples) > self._max_samples: self._metrics[name] = samples[-self._max_samples :] async def get_summary(self, name: str) -> MetricSummary | None: """Get summary statistics for a metric. 
Args: name: Metric name Returns: Summary statistics or None if metric not found """ async with self._lock: samples = self._metrics.get(name, []) if not samples: return None return MetricSummary.from_samples(name, samples) async def get_all_summaries(self) -> dict[str, MetricSummary]: """Get summaries for all metrics. Returns: Dictionary of metric name to summary """ async with self._lock: return { name: MetricSummary.from_samples(name, samples) for name, samples in self._metrics.items() } async def clear(self, name: str | None = None) -> None: """Clear metrics. Args: name: Specific metric to clear, or None to clear all """ async with self._lock: if name: self._metrics.pop(name, None) else: self._metrics.clear() # Global metrics collector instance _metrics_collector = MetricsCollector() async def record_metric( name: str, duration: float, success: bool = True, error: str | None = None, tags: dict[str, str] | None = None, ) -> None: """Record a metric to the global collector. Note: This is a no-op if BIOMCP_METRICS_ENABLED is not set to true. Args: name: Metric name duration: Duration in seconds success: Whether operation succeeded error: Error message if failed tags: Additional metadata tags """ if METRICS_ENABLED: await _metrics_collector.record(name, duration, success, error, tags) async def get_metric_summary(name: str) -> MetricSummary | None: """Get summary statistics for a metric. Args: name: Metric name Returns: Summary statistics or None if metric not found """ return await _metrics_collector.get_summary(name) async def get_all_metrics() -> dict[str, MetricSummary]: """Get summaries for all metrics. Returns: Dictionary of metric name to summary """ return await _metrics_collector.get_all_summaries() def track_performance(metric_name: str | None = None): """Decorator to track function performance. 
Args: metric_name: Custom metric name (defaults to function name) Returns: Decorated function """ def decorator(func): name = metric_name or f"{func.__module__}.{func.__name__}" @functools.wraps(func) async def async_wrapper(*args, **kwargs): start_time = time.perf_counter() success = True error_msg = None try: result = await func(*args, **kwargs) return result except Exception as exc: success = False error_msg = str(exc) raise finally: duration = time.perf_counter() - start_time await record_metric( name=name, duration=duration, success=success, error=error_msg, ) @functools.wraps(func) def sync_wrapper(*args, **kwargs): start_time = time.perf_counter() success = True error_msg = None try: result = func(*args, **kwargs) return result except Exception as exc: success = False error_msg = str(exc) raise finally: duration = time.perf_counter() - start_time # Schedule metric recording in the event loop try: loop = asyncio.get_running_loop() # Fire and forget the metric recording task = loop.create_task( record_metric( name=name, duration=duration, success=success, error=error_msg, ) ) # Add error handler to prevent unhandled exceptions task.add_done_callback( lambda t: t.exception() if t.done() else None ) except RuntimeError: # No event loop running, log instead logger.debug( f"Metric {name}: duration={duration:.3f}s, " f"success={success}, error={error_msg}" ) # Return appropriate wrapper based on function type if asyncio.iscoroutinefunction(func): return async_wrapper else: return sync_wrapper return decorator # Context manager for timing operations class Timer: """Context manager for timing operations.""" def __init__(self, metric_name: str, tags: dict[str, str] | None = None): """Initialize timer. 
Args: metric_name: Name for the metric tags: Additional metadata tags """ self.metric_name = metric_name self.tags = tags or {} self.start_time: float | None = None def __enter__(self): """Start timing.""" self.start_time = time.perf_counter() return self def __exit__(self, exc_type, exc_val, exc_tb): """Stop timing and record metric.""" if self.start_time is None or not METRICS_ENABLED: return False duration = time.perf_counter() - self.start_time success = exc_type is None error_msg = str(exc_val) if exc_val else None # Schedule metric recording try: loop = asyncio.get_running_loop() # Fire and forget the metric recording task = loop.create_task( record_metric( name=self.metric_name, duration=duration, success=success, error=error_msg, tags=self.tags, ) ) # Add error handler to prevent unhandled exceptions task.add_done_callback( lambda t: t.exception() if t.done() else None ) except RuntimeError: # No event loop running, log instead logger.debug( f"Metric {self.metric_name}: duration={duration:.3f}s, " f"success={success}, error={error_msg}, tags={self.tags}" ) # Don't suppress exceptions return False async def __aenter__(self): """Async enter.""" self.start_time = time.perf_counter() return self async def __aexit__(self, exc_type, exc_val, exc_tb): """Async exit.""" if self.start_time is None or not METRICS_ENABLED: return False duration = time.perf_counter() - self.start_time success = exc_type is None error_msg = str(exc_val) if exc_val else None await record_metric( name=self.metric_name, duration=duration, success=success, error=error_msg, tags=self.tags, ) # Don't suppress exceptions return False ``` -------------------------------------------------------------------------------- /src/biomcp/openfda/device_events_helpers.py: -------------------------------------------------------------------------------- ```python """ Helper functions for OpenFDA device events to reduce complexity. 
""" from collections import Counter from typing import Any from .utils import clean_text, truncate_text def analyze_device_problems( results: list[dict[str, Any]], ) -> tuple[list, list, list]: """Analyze problems, devices, and manufacturers from results.""" all_problems = [] all_device_names = [] all_manufacturers = [] for result in results: devices = result.get("device", []) for dev in devices: # Collect device names if "brand_name" in dev: all_device_names.append(dev["brand_name"]) elif "generic_name" in dev: all_device_names.append(dev["generic_name"]) # Collect manufacturers if "manufacturer_d_name" in dev: all_manufacturers.append(dev["manufacturer_d_name"]) # Collect problems if "device_problem_text" in dev: problems = dev["device_problem_text"] if isinstance(problems, str): all_problems.append(problems) elif isinstance(problems, list): all_problems.extend(problems) return all_problems, all_device_names, all_manufacturers def format_top_problems(all_problems: list, results: list) -> list[str]: """Format top reported device problems.""" output = [] if len(results) > 1 and all_problems: problem_counts = Counter(all_problems) top_problems = problem_counts.most_common(5) output.append("### Top Reported Problems:") for prob, count in top_problems: percentage = (count / len(results)) * 100 output.append(f"- **{prob}**: {count} reports ({percentage:.1f}%)") output.append("") return output def format_device_distribution( all_device_names: list, results: list ) -> list[str]: """Format device distribution for problem searches.""" output = [] if len(results) > 1 and all_device_names: device_counts = Counter(all_device_names) top_devices = device_counts.most_common(5) output.append("### Devices with This Problem:") for dev_name, count in top_devices: output.append(f"- **{dev_name}**: {count} reports") output.append("") return output def format_device_report_summary( result: dict[str, Any], report_num: int ) -> list[str]: """Format a single device event report 
summary.""" output = [f"#### Report {report_num}"] # Event type event_type_map = { "D": "Death", "IN": "Injury", "IL": "Illness", "M": "Malfunction", "O": "Other", } event_type_code = result.get("event_type") or "Unknown" event_type = event_type_map.get(event_type_code, "Unknown") output.append(f"**Event Type**: {event_type}") # Date if date_received := result.get("date_received"): output.append(f"**Date Received**: {date_received}") # Device information devices = result.get("device", []) for j, dev in enumerate(devices, 1): output.extend(_format_device_info(dev, j, len(devices))) # Event description if event_desc := result.get("event_description"): output.append("\n**Event Description**:") cleaned_desc = clean_text(event_desc) output.append(truncate_text(cleaned_desc, 500)) # Patient impact output.extend(_format_patient_impact(result.get("patient", []))) # MDR report number if mdr_key := result.get("mdr_report_key"): output.append(f"\n*MDR Report #: {mdr_key}*") output.append("") return output def _format_device_info( dev: dict, device_num: int, total_devices: int ) -> list[str]: """Format individual device information.""" output = [] if total_devices > 1: output.append(f"\n**Device {device_num}:**") # Basic device info output.extend(_format_device_basic_info(dev)) # Problem if "device_problem_text" in dev: problems = dev["device_problem_text"] if isinstance(problems, str): problems = [problems] if problems: output.append(f"- **Problem**: {', '.join(problems[:3])}") # OpenFDA info output.extend(_format_device_class_info(dev.get("openfda", {}))) return output def _format_device_basic_info(dev: dict) -> list[str]: """Format basic device information.""" output = [] # Device name dev_name = dev.get("brand_name") or dev.get("generic_name") or "Unknown" output.append(f"- **Device**: {dev_name}") # Manufacturer if "manufacturer_d_name" in dev: output.append(f"- **Manufacturer**: {dev['manufacturer_d_name']}") # Model/Catalog if "model_number" in dev: output.append(f"- 
**Model**: {dev['model_number']}") if "catalog_number" in dev: output.append(f"- **Catalog #**: {dev['catalog_number']}") return output def _format_device_class_info(openfda: dict) -> list[str]: """Format device class and specialty information.""" output = [] if "device_class" in openfda: dev_class = openfda["device_class"] class_map = {"1": "Class I", "2": "Class II", "3": "Class III"} output.append( f"- **FDA Class**: {class_map.get(dev_class, dev_class)}" ) if "medical_specialty_description" in openfda: specialties = openfda["medical_specialty_description"] if specialties: output.append(f"- **Medical Specialty**: {specialties[0]}") return output def _format_patient_impact(patient_list: list) -> list[str]: """Format patient impact information.""" output = [] if patient_list: patient_info = patient_list[0] outcomes = [] if patient_info.get("date_of_death"): outcomes.append("Death") if patient_info.get("life_threatening") == "Y": outcomes.append("Life-threatening") if patient_info.get("disability") == "Y": outcomes.append("Disability") if outcomes: output.append(f"\n**Patient Impact**: {', '.join(outcomes)}") return output def format_device_detail_header( result: dict[str, Any], mdr_report_key: str ) -> list[str]: """Format device event detail header.""" output = [f"## Device Event Report: {mdr_report_key}\n"] output.append("### Event Overview") event_type_map = { "D": "Death", "IN": "Injury", "IL": "Illness", "M": "Malfunction", "O": "Other", } event_type_code = result.get("event_type") or "Unknown" event_type = event_type_map.get(event_type_code, "Unknown") output.append(f"**Event Type**: {event_type}") if date_received := result.get("date_received"): output.append(f"**Date Received**: {date_received}") if date_of_event := result.get("date_of_event"): output.append(f"**Date of Event**: {date_of_event}") # Report source source_map = { "P": "Physician", "O": "Other health professional", "U": "User facility", "C": "Distributor", "M": "Manufacturer", } source_type = 
result.get("source_type") if isinstance(source_type, list): # Handle case where source_type is a list sources: list[str] = [] for st in source_type: if st: mapped = source_map.get(st) sources.append(mapped if mapped else st) else: sources.append("Unknown") output.append(f"**Report Source**: {', '.join(sources)}") elif source_type: source = source_map.get(source_type, source_type) output.append(f"**Report Source**: {source}") else: output.append("**Report Source**: Unknown") output.append("") return output def format_detailed_device_info(devices: list[dict[str, Any]]) -> list[str]: """Format detailed device information.""" output = ["### Device Information"] for i, dev in enumerate(devices, 1): if len(devices) > 1: output.append(f"\n#### Device {i}") # Basic info dev_name = ( dev.get("brand_name") or dev.get("generic_name") or "Unknown" ) output.append(f"**Device Name**: {dev_name}") for field, label in [ ("manufacturer_d_name", "Manufacturer"), ("model_number", "Model Number"), ("catalog_number", "Catalog Number"), ("lot_number", "Lot Number"), ("date_received", "Device Received Date"), ("expiration_date_of_device", "Expiration Date"), ]: if value := dev.get(field): output.append(f"**{label}**: {value}") # Problems if "device_problem_text" in dev: problems = dev["device_problem_text"] if isinstance(problems, str): problems = [problems] output.append(f"**Device Problems**: {', '.join(problems)}") # OpenFDA data output.extend(_format_device_openfda(dev.get("openfda", {}))) # Evaluation if "device_evaluated_by_manufacturer" in dev: evaluated = ( "Yes" if dev["device_evaluated_by_manufacturer"] == "Y" else "No" ) output.append(f"**Evaluated by Manufacturer**: {evaluated}") output.append("") return output def _format_device_openfda(openfda: dict) -> list[str]: """Format OpenFDA device data.""" output = [] if "device_class" in openfda: dev_class = openfda["device_class"] class_map = {"1": "Class I", "2": "Class II", "3": "Class III"} output.append( f"**FDA Device 
Class**: {class_map.get(dev_class, dev_class)}" ) if specialties := openfda.get("medical_specialty_description"): if isinstance(specialties, list): output.append(f"**Medical Specialty**: {', '.join(specialties)}") else: output.append(f"**Medical Specialty**: {specialties}") if "product_code" in openfda: output.append(f"**Product Code**: {openfda['product_code']}") return output def format_patient_details(patient_list: list) -> list[str]: """Format detailed patient information.""" output: list[str] = [] if not patient_list: return output output.append("### Patient Information") patient_info = patient_list[0] # Demographics output.extend(_format_patient_demographics(patient_info)) # Outcomes outcomes = _collect_patient_outcomes(patient_info) if outcomes: output.append(f"**Outcomes**: {', '.join(outcomes)}") output.append("") return output def _format_patient_demographics(patient_info: dict) -> list[str]: """Format patient demographic information.""" output = [] if "patient_age" in patient_info: output.append(f"**Age**: {patient_info['patient_age']} years") if "patient_sex" in patient_info: sex_map = {"M": "Male", "F": "Female", "U": "Unknown"} sex = sex_map.get(patient_info["patient_sex"], "Unknown") output.append(f"**Sex**: {sex}") return output def _collect_patient_outcomes(patient_info: dict) -> list[str]: """Collect patient outcome information.""" outcomes = [] if date_of_death := patient_info.get("date_of_death"): outcomes.append(f"Death ({date_of_death})") if patient_info.get("life_threatening") == "Y": outcomes.append("Life-threatening") if patient_info.get("disability") == "Y": outcomes.append("Disability") if patient_info.get("hospitalization") == "Y": outcomes.append("Hospitalization") if patient_info.get("congenital_anomaly") == "Y": outcomes.append("Congenital anomaly") if patient_info.get("required_intervention") == "Y": outcomes.append("Required intervention") return outcomes ``` 
-------------------------------------------------------------------------------- /docs/backend-services-reference/07-alphagenome.md: -------------------------------------------------------------------------------- ```markdown # AlphaGenome API Reference Google DeepMind's AlphaGenome provides AI-powered predictions of variant effects on gene regulation, chromatin accessibility, and splicing. ## Usage Guide For a step-by-step tutorial on using AlphaGenome for variant effect prediction, see [How to Predict Variant Effects with AlphaGenome](../how-to-guides/04-predict-variant-effects-with-alphagenome.md). ## Overview AlphaGenome predicts regulatory effects of genetic variants by analyzing: - Gene expression changes in nearby genes - Chromatin accessibility alterations - Splicing pattern modifications - Enhancer and promoter activity - Transcription factor binding - 3D chromatin interactions **Note:** AlphaGenome is an optional integration requiring separate installation and API key. ## Authentication ### Obtaining an API Key 1. Visit [https://deepmind.google.com/science/alphagenome](https://deepmind.google.com/science/alphagenome) 2. Register for non-commercial research use 3. Accept terms of service 4. Receive API key via email ### API Key Usage **Environment Variable:** ```bash export ALPHAGENOME_API_KEY="your-key-here" ``` **Per-Request:** ```python result = alphagenome_predictor( chromosome="chr7", position=140753336, reference="A", alternate="T", api_key="your-key-here" # Overrides environment ) ``` ## Installation AlphaGenome requires separate installation: ```bash # Clone and install git clone https://github.com/google-deepmind/alphagenome.git cd alphagenome pip install . # Verify installation python -c "import alphagenome; print('AlphaGenome installed')" ``` ## API Interface ### Prediction Endpoint The AlphaGenome API is accessed through the BioMCP `alphagenome_predictor` tool. 
#### Parameters | Parameter | Type | Required | Description | | ------------------------ | --------- | -------- | --------------------------------- | | `chromosome` | str | Yes | Chromosome (e.g., "chr7") | | `position` | int | Yes | 1-based genomic position | | `reference` | str | Yes | Reference allele | | `alternate` | str | Yes | Alternate allele | | `interval_size` | int | No | Analysis window (default: 131072) | | `tissue_types` | list[str] | No | UBERON tissue codes | | `significance_threshold` | float | No | Log2FC threshold (default: 0.5) | | `api_key` | str | No | AlphaGenome API key | #### Interval Sizes | Size | Use Case | Description | | --------- | ---------- | ------------------------------ | | 2,048 | Promoter | TSS and promoter variants | | 16,384 | Local | Proximal regulatory elements | | 131,072 | Standard | Enhancer-promoter interactions | | 524,288 | Long-range | Distal regulatory elements | | 1,048,576 | TAD-level | Topological domain effects | ## Tissue Codes AlphaGenome supports tissue-specific predictions using UBERON ontology: | Tissue | UBERON Code | Description | | -------- | -------------- | -------------------- | | Breast | UBERON:0000310 | Mammary gland tissue | | Liver | UBERON:0002107 | Hepatic tissue | | Prostate | UBERON:0002367 | Prostate gland | | Brain | UBERON:0000955 | Neural tissue | | Lung | UBERON:0002048 | Pulmonary tissue | | Colon | UBERON:0001155 | Colonic mucosa | ## Response Format ### Gene Expression Predictions ```json { "gene_expression": [ { "gene_name": "BRAF", "gene_id": "ENSG00000157764", "distance_to_tss": 1234, "log2_fold_change": 1.25, "confidence": 0.89, "tissue": "UBERON:0000310" } ] } ``` **Interpretation:** - `log2_fold_change > 1.0`: Strong increase (2x+) - `log2_fold_change > 0.5`: Moderate increase - `log2_fold_change < -1.0`: Strong decrease (0.5x) - `log2_fold_change < -0.5`: Moderate decrease ### Chromatin Accessibility ```json { "chromatin_accessibility": [ { "region_type": "enhancer", 
"coordinates": "chr7:140450000-140451000", "accessibility_change": 0.75, "peak_height_change": 1.2, "tissue": "UBERON:0000310" } ] } ``` **Interpretation:** - Positive values: Increased accessibility (open chromatin) - Negative values: Decreased accessibility (closed chromatin) ### Splicing Predictions ```json { "splicing": [ { "event_type": "exon_skipping", "affected_exon": "ENST00000288602.6:exon14", "delta_psi": -0.35, "splice_site_strength_change": -2.1 } ] } ``` **PSI (Percent Spliced In):** - `delta_psi > 0`: Increased exon inclusion - `delta_psi < 0`: Increased exon skipping - `|delta_psi| > 0.1`: Biologically significant ## Usage Examples ### Basic Prediction ```python # Predict BRAF V600E effects result = await alphagenome_predictor( chromosome="chr7", position=140753336, reference="A", alternate="T" ) # Process results for gene in result.gene_expression: if abs(gene.log2_fold_change) > 1.0: print(f"{gene.gene_name}: {gene.log2_fold_change:.2f} log2FC") ``` ### Tissue-Specific Analysis ```python # Compare effects across tissues tissues = { "breast": "UBERON:0000310", "lung": "UBERON:0002048", "brain": "UBERON:0000955" } results = {} for tissue_name, tissue_code in tissues.items(): results[tissue_name] = await alphagenome_predictor( chromosome="chr17", position=7577120, reference="G", alternate="A", tissue_types=[tissue_code] ) ``` ### Promoter Variant Analysis ```python # Use small window for promoter variants result = await alphagenome_predictor( chromosome="chr7", position=5569100, # Near ACTB promoter reference="C", alternate="T", interval_size=2048 # 2kb window ) # Check for promoter effects promoter_effects = [ g for g in result.gene_expression if abs(g.distance_to_tss) < 1000 ] ``` ### Enhancer Variant Analysis ```python # Use larger window for enhancer variants result = await alphagenome_predictor( chromosome="chr8", position=128748315, # MYC enhancer region reference="G", alternate="A", interval_size=524288 # 512kb window ) # Analyze chromatin 
changes enhancer_changes = [ c for c in result.chromatin_accessibility if c.region_type == "enhancer" and abs(c.accessibility_change) > 0.5 ] ``` ## Best Practices ### 1. Choose Appropriate Interval Size ```python def select_interval_size(variant_type): """Select interval based on variant type""" intervals = { "promoter": 2048, "splice_site": 16384, "enhancer": 131072, "intergenic": 524288, "structural": 1048576 } return intervals.get(variant_type, 131072) ``` ### 2. Handle Missing Predictions ```python # Not all variants affect gene expression if not result.gene_expression: print("No gene expression changes predicted") # Check chromatin or splicing effects instead ``` ### 3. Filter by Significance ```python # Focus on significant changes significant_genes = [ g for g in result.gene_expression if abs(g.log2_fold_change) > significance_threshold and g.confidence > 0.8 ] ``` ### 4. Validate Input ```python def validate_variant(chr, pos, ref, alt): """Validate variant format""" # Check chromosome format if not chr.startswith("chr"): raise ValueError("Chromosome must start with 'chr'") # Check alleles valid_bases = set("ACGT") if ref not in valid_bases or alt not in valid_bases: raise ValueError("Invalid nucleotide") # Check position if pos < 1: raise ValueError("Position must be 1-based") ``` ## Integration Patterns ### VUS Classification Pipeline ```python async def classify_vus(variant): """Classify variant of unknown significance""" # 1. Predict regulatory effects predictions = await alphagenome_predictor( chromosome=variant.chr, position=variant.pos, reference=variant.ref, alternate=variant.alt ) # 2. Score impact max_expression = max( abs(g.log2_fold_change) for g in predictions.gene_expression ) if predictions.gene_expression else 0 max_chromatin = max( abs(c.accessibility_change) for c in predictions.chromatin_accessibility ) if predictions.chromatin_accessibility else 0 # 3. 
Classify if max_expression > 2.0 or max_chromatin > 1.5: return "High regulatory impact" elif max_expression > 1.0 or max_chromatin > 0.75: return "Moderate regulatory impact" else: return "Low regulatory impact" ``` ### Multi-Variant Analysis ```python async def analyze_variant_set(variants, target_gene): """Analyze multiple variants affecting a gene""" results = [] for variant in variants: prediction = await alphagenome_predictor( chromosome=variant["chr"], position=variant["pos"], reference=variant["ref"], alternate=variant["alt"] ) # Find target gene effect for gene in prediction.gene_expression: if gene.gene_name == target_gene: results.append({ "variant": f"{variant['chr']}:{variant['pos']}", "effect": gene.log2_fold_change, "confidence": gene.confidence }) break # Sort by effect size return sorted(results, key=lambda x: abs(x["effect"]), reverse=True) ``` ## Limitations ### Technical Limitations - **Species**: Human only (GRCh38) - **Variant Types**: SNVs only (no indels/SVs) - **Sequence Context**: Requires reference match - **Computation Time**: 1-3 seconds per variant ### Biological Limitations - **Cell Type**: Predictions are tissue-specific approximations - **Environmental Factors**: Does not account for conditions - **Epistasis**: Single variant effects only - **Temporal**: No developmental stage consideration ## Error Handling ### Common Errors ```python try: result = await alphagenome_predictor(...) 
except AlphaGenomeError as e:
    if "API key" in str(e):
        # Handle missing/invalid key
        pass
    elif "Invalid sequence" in str(e):
        # Handle sequence errors
        pass
    elif "Rate limit" in str(e):
        # Handle rate limiting
        pass
```

### Retry Logic

```python
async def predict_with_retry(params, max_retries=3):
    """Retry on transient failures"""
    for attempt in range(max_retries):
        try:
            return await alphagenome_predictor(**params)
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(2 ** attempt)  # Exponential backoff
```

## Performance Optimization

### Batch Processing

```python
async def batch_predict(variants, batch_size=10):
    """Process variants in batches"""
    results = []

    for i in range(0, len(variants), batch_size):
        batch = variants[i:i + batch_size]
        batch_results = await asyncio.gather(*[
            alphagenome_predictor(**v) for v in batch
        ])
        results.extend(batch_results)

        # Rate limiting
        if i + batch_size < len(variants):
            await asyncio.sleep(1)

    return results
```

### Caching Strategy

```python
# NOTE: functools.lru_cache cannot wrap an async call from a sync function --
# it would cache the coroutine object, which can only be awaited once.
# Use an explicit dict cache keyed on the variant instead.
_prediction_cache: dict = {}

async def get_cached_prediction(chrom, pos, ref, alt, interval):
    """Cache predictions for repeated queries"""
    key = (chrom, pos, ref, alt, interval)
    if key not in _prediction_cache:
        _prediction_cache[key] = await alphagenome_predictor(
            chromosome=chrom,
            position=pos,
            reference=ref,
            alternate=alt,
            interval_size=interval
        )
    return _prediction_cache[key]
```

## Support Resources

- **Documentation**: [AlphaGenome GitHub](https://github.com/google-deepmind/alphagenome)
- **Paper**: [Nature Publication](https://www.nature.com/alphagenome)
- **Support**: Via GitHub issues
- **Terms**: Non-commercial research use only
```

--------------------------------------------------------------------------------
/docs/how-to-guides/03-get-comprehensive-variant-annotations.md:
--------------------------------------------------------------------------------

```markdown
# How to Get Comprehensive Variant Annotations

This guide demonstrates how to retrieve and interpret genetic variant information using BioMCP's integrated databases.
## Overview BioMCP provides variant annotations from multiple sources: - **MyVariant.info**: Core variant database with clinical significance ([BioThings Reference](../backend-services-reference/02-biothings-suite.md)) - **External Annotations**: TCGA cancer data, 1000 Genomes population frequencies - **cBioPortal Integration**: Cancer-specific mutation context ([API Reference](../backend-services-reference/03-cbioportal.md)) - **BioThings Links**: Connected gene, disease, and drug information ([BioThings Suite](../backend-services-reference/02-biothings-suite.md)) ## Basic Variant Lookup ### Search by rsID Find variant information using dbSNP identifiers: ```bash # CLI biomcp variant get rs121913529 # Python variant = await client.variants.get("rs121913529") # MCP Tool variant_getter(variant_id="rs121913529") ``` ### Search by HGVS Notation Use standard HGVS notation: ```python # Protein change variant = await variant_getter("NP_004324.2:p.Val600Glu") # Coding DNA change variant = await variant_getter("NM_004333.4:c.1799T>A") # Genomic coordinates variant = await variant_getter("NC_000007.13:g.140453136A>T") ``` ### Search by Genomic Position ```python # Search by coordinates variants = await variant_searcher( chromosome="7", start=140453136, end=140453136, assembly="hg38" # or hg19 ) ``` ## Understanding Variant Annotations ### Clinical Significance ```python # Get variant details variant = await variant_getter("rs121913529") # Check clinical significance print(f"Clinical Significance: {variant.clinical_significance}") # Output: "Pathogenic" print(f"ClinVar Review Status: {variant.review_status}") # Output: "reviewed by expert panel" ``` ### Population Frequencies ```python # Access frequency data if variant.frequencies: print("Population Frequencies:") print(f" gnomAD: {variant.frequencies.gnomad}") print(f" 1000 Genomes: {variant.frequencies.thousand_genomes}") print(f" ExAC: {variant.frequencies.exac}") ``` ### Functional Predictions ```python # In silico 
predictions if variant.predictions: print(f"CADD Score: {variant.predictions.cadd}") print(f"PolyPhen: {variant.predictions.polyphen}") print(f"SIFT: {variant.predictions.sift}") ``` ## Advanced Variant Searches ### Filter by Clinical Significance ```python # Find pathogenic BRCA1 variants pathogenic_variants = await variant_searcher( gene="BRCA1", significance="pathogenic", limit=20 ) # Multiple significance levels variants = await variant_searcher( gene="TP53", significance=["pathogenic", "likely_pathogenic"] ) ``` ### Filter by Frequency Find rare variants: ```python # Rare variants (MAF < 1%) rare_variants = await variant_searcher( gene="CFTR", frequency_max=0.01, significance="pathogenic" ) # Ultra-rare variants ultra_rare = await variant_searcher( gene="SCN1A", frequency_max=0.0001 ) ``` ### Filter by Prediction Scores ```python # High-impact variants high_impact = await variant_searcher( gene="MLH1", cadd_score_min=20, # CADD > 20 suggests deleteriousness polyphen_prediction="probably_damaging" ) ``` ## External Database Integration For technical details on external data sources, see the [BioThings Suite Reference](../backend-services-reference/02-biothings-suite.md). 
### TCGA Cancer Data Variants automatically include TCGA annotations when available: ```python variant = await variant_getter("rs121913529", include_external=True) # Check TCGA data if variant.external_data.get("tcga"): tcga = variant.external_data["tcga"] print(f"TCGA Studies: {tcga['study_count']}") print(f"Cancer Types: {', '.join(tcga['cancer_types'])}") print(f"Sample Count: {tcga['sample_count']}") ``` ### 1000 Genomes Project Population-specific frequencies: ```python # Access 1000 Genomes data if variant.external_data.get("thousand_genomes"): tg_data = variant.external_data["thousand_genomes"] print("Population Frequencies:") for pop, freq in tg_data["populations"].items(): print(f" {pop}: {freq}") ``` ### Ensembl VEP Annotations ```python # Consequence predictions if variant.consequences: for consequence in variant.consequences: print(f"Gene: {consequence.gene}") print(f"Impact: {consequence.impact}") print(f"Consequence: {consequence.consequence_terms}") ``` ## Integration with Other BioMCP Tools BioMCP's unified architecture allows seamless integration between variant data and other biomedical information. For implementation details, see the [Transport Protocol Guide](../developer-guides/04-transport-protocol.md). 
### Variant to Gene Information ```python # Get variant variant = await variant_getter("rs121913529") # Get associated gene details gene_symbol = variant.gene.symbol # "BRAF" gene_info = await gene_getter(gene_symbol) print(f"Gene: {gene_info.name}") print(f"Function: {gene_info.summary}") ``` ### Variant to Disease Context ```python # Find disease associations diseases = variant.disease_associations for disease in diseases: # Get detailed disease info disease_info = await disease_getter(disease.name) print(f"Disease: {disease_info.name}") print(f"Definition: {disease_info.definition}") print(f"Synonyms: {', '.join(disease_info.synonyms)}") ``` ### Variant to Clinical Trials ```python # Search trials for specific variant gene = variant.gene.symbol mutation = variant.protein_change # e.g., "V600E" trials = await trial_searcher( other_terms=[f"{gene} {mutation}", f"{gene} mutation"], recruiting_status="OPEN" ) ``` ## Practical Workflows ### Workflow 1: Cancer Variant Analysis ```python async def analyze_cancer_variant(hgvs: str): # Think about the analysis await think( thought=f"Analyzing cancer variant {hgvs}", thoughtNumber=1 ) # Get variant details variant = await variant_getter(hgvs, include_external=True) # Get gene context gene = await gene_getter(variant.gene.symbol) # Search for targeted therapies drugs = await search( query=f"drugs.targets:{variant.gene.symbol}", domain="drug" ) # Find relevant trials trials = await trial_searcher( other_terms=[ variant.gene.symbol, variant.protein_change, "targeted therapy" ], recruiting_status="OPEN" ) # Search literature articles = await article_searcher( genes=[variant.gene.symbol], variants=[hgvs], keywords=["therapy", "treatment", "resistance"] ) return { "variant": variant, "gene": gene, "potential_drugs": drugs, "clinical_trials": trials, "literature": articles } ``` ### Workflow 2: Rare Disease Variant ```python async def rare_disease_variant_analysis(gene: str, phenotype: str): # Find all pathogenic variants 
variants = await variant_searcher( gene=gene, significance=["pathogenic", "likely_pathogenic"], frequency_max=0.001 # Rare ) # Analyze each variant results = [] for v in variants[:10]: # Top 10 # Get full annotations full_variant = await variant_getter(v.id) # Check phenotype associations if phenotype.lower() in str(full_variant.phenotypes).lower(): results.append({ "variant": full_variant, "phenotype_match": True, "frequency": full_variant.frequencies.gnomad or 0 }) # Sort by relevance results.sort(key=lambda x: x["frequency"]) return results ``` ### Workflow 3: Pharmacogenomics ```python async def pharmacogenomic_analysis(drug_name: str): # Get drug information drug = await drug_getter(drug_name) # Find pharmGKB annotations pgx_variants = [] # Search for drug-related variants if drug.targets: for target in drug.targets: variants = await variant_searcher( gene=target, keywords=[drug_name, "pharmacogenomics", "drug response"] ) pgx_variants.extend(variants) # Get detailed annotations annotated = [] for v in pgx_variants: full = await variant_getter(v.id) if full.pharmacogenomics: annotated.append(full) return { "drug": drug, "pgx_variants": annotated, "affected_genes": list(set(v.gene.symbol for v in annotated)) } ``` ## Interpreting Results ### Clinical Actionability ```python def assess_actionability(variant): """Determine if variant is clinically actionable""" actionable = False reasons = [] # Check pathogenicity if variant.clinical_significance in ["pathogenic", "likely_pathogenic"]: actionable = True reasons.append("Pathogenic variant") # Check for drug associations if variant.drug_associations: actionable = True reasons.append(f"Associated with {len(variant.drug_associations)} drugs") # Check guidelines if variant.clinical_guidelines: actionable = True reasons.append("Clinical guidelines available") return { "actionable": actionable, "reasons": reasons, "recommendations": variant.clinical_guidelines } ``` ### Report Generation ```python def 
generate_variant_report(variant): """Create a clinical variant report""" report = f""" ## Variant Report: {variant.id} ### Basic Information - **Gene**: {variant.gene.symbol} - **Protein Change**: {variant.protein_change or "N/A"} - **Genomic Location**: chr{variant.chr}:{variant.pos} - **Reference**: {variant.ref} → **Alternate**: {variant.alt} ### Clinical Significance - **Status**: {variant.clinical_significance} - **Review**: {variant.review_status} - **Last Updated**: {variant.last_updated} ### Population Frequency - **gnomAD**: {variant.frequencies.gnomad or "Not found"} - **1000 Genomes**: {variant.frequencies.thousand_genomes or "Not found"} ### Predictions - **CADD Score**: {variant.predictions.cadd or "N/A"} - **PolyPhen**: {variant.predictions.polyphen or "N/A"} - **SIFT**: {variant.predictions.sift or "N/A"} ### Associated Conditions {format_conditions(variant.conditions)} ### Clinical Resources - **ClinVar**: {variant.clinvar_url} - **dbSNP**: {variant.dbsnp_url} """ return report ``` ## Best Practices ### 1. Use Multiple Identifiers ```python # Try multiple formats if one fails identifiers = [ "rs121913529", "NM_004333.4:c.1799T>A", "7:140453136:A:T" ] for id in identifiers: try: variant = await variant_getter(id) break except: continue ``` ### 2. Check Data Completeness ```python # Not all variants have all annotations if variant.frequencies: # Use frequency data pass else: # Note that frequency unavailable pass ``` ### 3. Consider Assembly Versions ```python # Specify genome assembly variants_hg38 = await variant_searcher( chromosome="7", start=140453136, assembly="hg38" ) variants_hg19 = await variant_searcher( chromosome="7", start=140153336, # Different coordinate! assembly="hg19" ) ``` ## Troubleshooting ### Variant Not Found 1. **Check notation**: Ensure proper HGVS format 2. **Try alternatives**: rsID, genomic coordinates, protein change 3. 
**Verify gene symbol**: Use official HGNC symbols ### Missing Annotations - Not all variants have all data types - Rare variants may lack population frequencies - Novel variants won't have ClinVar data ### Performance Issues - Use pagination for large searches - Limit external data requests when not needed - Cache frequently accessed variants ## Next Steps - Learn to [predict variant effects](04-predict-variant-effects-with-alphagenome.md) - Explore [article searches](01-find-articles-and-cbioportal-data.md) for variant literature - Set up [logging and monitoring](05-logging-and-monitoring-with-bigquery.md) ``` -------------------------------------------------------------------------------- /tests/test_pydantic_ai_integration.py: -------------------------------------------------------------------------------- ```python """ Tests for Pydantic AI integration with BioMCP. These tests verify the examples provided in the documentation work correctly. """ import asyncio import os import sys import httpx import pytest from pydantic_ai import Agent from pydantic_ai.mcp import MCPServerStdio try: from pydantic_ai.mcp import MCPServerStreamableHTTP # noqa: F401 HAS_STREAMABLE_HTTP = True except ImportError: HAS_STREAMABLE_HTTP = False from pydantic_ai.models.test import TestModel def worker_dependencies_available(): """Check if worker dependencies (FastAPI, Starlette) are available.""" try: import fastapi # noqa: F401 import starlette # noqa: F401 return True except ImportError: return False # Skip marker for tests requiring worker dependencies requires_worker = pytest.mark.skipif( not worker_dependencies_available(), reason="Worker dependencies (FastAPI/Starlette) not installed. Install with: pip install biomcp-python[worker]", ) # Skip marker for tests requiring MCPServerStreamableHTTP requires_streamable_http = pytest.mark.skipif( not HAS_STREAMABLE_HTTP, reason="MCPServerStreamableHTTP not available. 
Requires pydantic-ai>=0.6.9", ) def get_free_port(): """Get a free port for testing.""" import socket with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind(("", 0)) s.listen(1) port = s.getsockname()[1] return port async def wait_for_server( url: str, max_retries: int = 60, process=None ) -> None: """Wait for server to be ready with retries.""" import sys for i in range(max_retries): # Check if process has exited with error if process and process.poll() is not None: stdout, stderr = process.communicate() pytest.fail( f"Server process exited with code {process.returncode}. Stderr: {stderr.decode() if stderr else 'None'}" ) try: async with httpx.AsyncClient() as client: response = await client.get(url, timeout=2) if response.status_code == 200: print( f"\nServer ready after {i + 1} seconds", file=sys.stderr, ) return except (httpx.ConnectError, httpx.ReadTimeout): if i % 10 == 0: print( f"\nWaiting for server... ({i} seconds elapsed)", file=sys.stderr, ) await asyncio.sleep(1) pytest.fail(f"Server at {url} did not start within {max_retries} seconds") @pytest.mark.asyncio async def test_stdio_mode_connection(): """Test STDIO mode connection and tool listing.""" server = MCPServerStdio( "python", args=["-m", "biomcp", "run", "--mode", "stdio"], timeout=20 ) # Use TestModel to avoid needing API keys model = TestModel(call_tools=["search"]) agent = Agent(model=model, toolsets=[server]) async with agent: # Test a simple query to verify connection works result = await agent.run("List available tools") # Should get a response without errors assert result is not None assert result.output is not None @pytest.mark.asyncio async def test_stdio_mode_simple_query(): """Test STDIO mode with a simple search query.""" server = MCPServerStdio( "python", args=["-m", "biomcp", "run", "--mode", "stdio"], timeout=20 ) # Use TestModel configured to call search model = TestModel(call_tools=["search"]) agent = Agent(model=model, toolsets=[server]) async with agent: result = 
await agent.run("Find 1 melanoma clinical trial") # TestModel will have called the search tool assert result.output is not None # The TestModel returns mock data, but we're testing the connection works assert result.output != "" @pytest.mark.asyncio async def test_stdio_mode_with_openai(): """Test STDIO mode with OpenAI (requires OPENAI_API_KEY).""" # Skip if no API key if not os.getenv("OPENAI_API_KEY"): pytest.skip("OPENAI_API_KEY not set") server = MCPServerStdio( "python", args=["-m", "biomcp", "run", "--mode", "stdio"], timeout=30 ) agent = Agent("openai:gpt-4o-mini", toolsets=[server]) async with agent: result = await agent.run( "Find 1 article about BRAF V600E mutations. Return just the title." ) # Should get a real result assert result.output is not None assert len(result.output) > 0 @requires_worker @requires_streamable_http @pytest.mark.asyncio async def test_streamable_http_mode_connection(): """Test Streamable HTTP mode connection for Pydantic AI.""" import subprocess from pydantic_ai.mcp import MCPServerStreamableHTTP port = get_free_port() # Start server in streamable_http mode server_process = subprocess.Popen( # noqa: S603 [ sys.executable, "-m", "biomcp", "run", "--mode", "streamable_http", "--port", str(port), ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) try: # Wait for server to be ready await wait_for_server( f"http://localhost:{port}/health", process=server_process ) # Connect to the /mcp endpoint server = MCPServerStreamableHTTP(f"http://localhost:{port}/mcp") # Use TestModel to avoid needing API keys model = TestModel(call_tools=["search"]) agent = Agent(model=model, toolsets=[server]) async with agent: # Test a simple query to verify connection result = await agent.run("Test connection") assert result is not None assert result.output is not None finally: # Clean up server process server_process.terminate() server_process.wait(timeout=5) @requires_worker @requires_streamable_http @pytest.mark.asyncio async def 
test_streamable_http_simple_query(): """Test a simple biomedical query using Streamable HTTP.""" import subprocess from pydantic_ai.mcp import MCPServerStreamableHTTP port = get_free_port() server_process = subprocess.Popen( # noqa: S603 [ sys.executable, "-m", "biomcp", "run", "--mode", "streamable_http", "--port", str(port), ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) try: # Wait for server to be ready await wait_for_server( f"http://localhost:{port}/health", process=server_process ) # Connect to the /mcp endpoint server = MCPServerStreamableHTTP(f"http://localhost:{port}/mcp") # Use TestModel with tool calls for search model = TestModel(call_tools=["search"]) agent = Agent(model=model, toolsets=[server]) async with agent: result = await agent.run( "Find 1 article about BRAF mutations. Return just the title." ) # Should get a result assert result.output is not None assert len(result.output) > 0 finally: server_process.terminate() server_process.wait(timeout=5) @requires_worker @pytest.mark.asyncio async def test_worker_mode_streamable_http(): """Test worker mode which now uses streamable HTTP under the hood.""" import subprocess port = get_free_port() # Start server in worker mode (which uses streamable HTTP) server_process = subprocess.Popen( # noqa: S603 [ sys.executable, "-m", "biomcp", "run", "--mode", "worker", "--port", str(port), ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) try: # Wait for server to be ready await wait_for_server( f"http://localhost:{port}/health", process=server_process ) # Worker mode exposes /mcp endpoint through streamable HTTP async with httpx.AsyncClient() as client: # Test the /mcp endpoint with initialize request response = await client.post( f"http://localhost:{port}/mcp", json={ "jsonrpc": "2.0", "method": "initialize", "params": { "protocolVersion": "2025-06-18", "capabilities": {}, "clientInfo": {"name": "test", "version": "1.0"}, }, "id": 1, }, headers={ "Content-Type": "application/json", "Accept": 
"application/json, text/event-stream", }, ) # Worker mode may return various codes depending on initialization state # 200 = success, 406 = accept header issue, 500 = initialization incomplete assert response.status_code in [200, 406, 500] # Health endpoint should work health_response = await client.get( f"http://localhost:{port}/health" ) assert health_response.status_code == 200 assert health_response.json()["status"] == "healthy" finally: server_process.terminate() server_process.wait(timeout=5) @pytest.mark.asyncio async def test_connection_verification_script(): """Test the connection verification script from documentation.""" server = MCPServerStdio( "python", args=["-m", "biomcp", "run", "--mode", "stdio"], timeout=20 ) # Use TestModel to avoid needing LLM credentials agent = Agent(model=TestModel(call_tools=["search"]), toolsets=[server]) async with agent: # Test a simple search to verify connection result = await agent.run("Test search for BRAF") # Verify connection successful assert result is not None assert result.output is not None @pytest.mark.asyncio async def test_biomedical_research_workflow(): """Test a complete biomedical research workflow.""" server = MCPServerStdio( "python", args=["-m", "biomcp", "run", "--mode", "stdio"], timeout=30 ) # Use TestModel configured to use multiple tools model = TestModel(call_tools=["think", "search", "fetch"]) agent = Agent(model=model, toolsets=[server]) async with agent: # Complex multi-step query result = await agent.run(""" First use the think tool to plan your approach, then: 1. Search for articles about BRAF mutations 2. 
Find relevant clinical trials """) # Should complete without errors assert result is not None assert result.output is not None @requires_worker @pytest.mark.asyncio async def test_health_endpoint(): """Test that the health endpoint is accessible.""" import subprocess port = get_free_port() server_process = subprocess.Popen( # noqa: S603 [ sys.executable, "-m", "biomcp", "run", "--mode", "worker", "--port", str(port), ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) try: # Give subprocess a moment to start await asyncio.sleep(2) # Wait for server to be ready await wait_for_server( f"http://localhost:{port}/health", process=server_process ) async with httpx.AsyncClient() as client: response = await client.get(f"http://localhost:{port}/health") assert response.status_code == 200 data = response.json() assert "status" in data assert data["status"] in ["healthy", "ok"] finally: server_process.terminate() server_process.wait(timeout=5) ``` -------------------------------------------------------------------------------- /tests/bdd/search_trials/test_search.py: -------------------------------------------------------------------------------- ```python import asyncio from typing import Any from pytest_bdd import given, parsers, scenarios, then, when from biomcp.trials.search import ( AgeGroup, DateField, InterventionType, PrimaryPurpose, RecruitingStatus, SortOrder, SponsorType, StudyDesign, StudyType, TrialPhase, TrialQuery, search_trials, ) scenarios("search.feature") @given( parsers.parse('I build a trial query with condition "{condition}"'), target_fixture="trial_query", ) def trial_query(condition: str) -> TrialQuery: return TrialQuery(conditions=[condition]) @given( parsers.parse('I build a trial query with term "{term}"'), target_fixture="trial_query", ) def trial_query_with_term(term: str) -> TrialQuery: return TrialQuery(terms=[term]) @given( parsers.parse('I build a trial query with nct_id "{nct_id}"'), target_fixture="trial_query", ) def 
trial_query_with_nct_id(nct_id: str) -> TrialQuery: return TrialQuery(nct_ids=[nct_id]) @given(parsers.parse('I add intervention "{intervention}"')) def add_intervention(trial_query: TrialQuery, intervention: str): trial_query.interventions = [intervention] @given(parsers.parse('I add nct_id "{nct_id}"')) def add_nct_id(trial_query: TrialQuery, nct_id: str): if trial_query.nct_ids is None: trial_query.nct_ids = [] trial_query.nct_ids.append(nct_id) @given(parsers.parse('I set recruiting status to "{status}"')) def set_recruiting_status(trial_query: TrialQuery, status: RecruitingStatus): trial_query.recruiting_status = status @given(parsers.parse('I set study type to "{study_type}"')) def set_study_type(trial_query: TrialQuery, study_type: StudyType): trial_query.study_type = study_type @given(parsers.parse('I set phase to "{phase}"')) def set_phase(trial_query: TrialQuery, phase: TrialPhase): trial_query.phase = phase @given(parsers.parse('I set sort order to "{sort_order}"')) def set_sort_order(trial_query: TrialQuery, sort_order: SortOrder): trial_query.sort = sort_order @given( parsers.parse( 'I set location to latitude "{lat}" longitude "{lon}" within "{distance}" miles', ), ) def set_location(trial_query: TrialQuery, lat: str, lon: str, distance: str): trial_query.lat = float(lat) trial_query.long = float(lon) trial_query.distance = int(distance) @given(parsers.parse('I set age group to "{age_group}"')) def set_age_group(trial_query: TrialQuery, age_group: AgeGroup): trial_query.age_group = age_group @given(parsers.parse('I set primary purpose to "{purpose}"')) def set_primary_purpose(trial_query: TrialQuery, purpose: PrimaryPurpose): trial_query.primary_purpose = purpose @given(parsers.parse('I set min date to "{min_date}"')) def set_min_date(trial_query: TrialQuery, min_date: str): trial_query.min_date = min_date @given(parsers.parse('I set max date to "{max_date}"')) def set_max_date(trial_query: TrialQuery, max_date: str): trial_query.max_date = max_date 
@given(parsers.parse('I set date field to "{date_field}"')) def set_date_field(trial_query: TrialQuery, date_field: DateField): trial_query.date_field = date_field @given(parsers.parse('I set intervention type to "{intervention_type}"')) def set_intervention_type( trial_query: TrialQuery, intervention_type: InterventionType ): trial_query.intervention_type = intervention_type @given(parsers.parse('I set sponsor type to "{sponsor_type}"')) def set_sponsor_type(trial_query: TrialQuery, sponsor_type: SponsorType): trial_query.sponsor_type = sponsor_type @given(parsers.parse('I set study design to "{study_design}"')) def set_study_design(trial_query: TrialQuery, study_design: StudyDesign): trial_query.study_design = study_design @when("I perform a trial search", target_fixture="trial_results") def trial_results(trial_query: TrialQuery): """ Perform a trial search and convert the markdown response to JSON for easier parsing in the test assertions. """ return asyncio.run(search_trials(trial_query, output_json=True)) @then( parsers.parse( 'the response should contain a study with condition "{condition}"', ), ) def check_condition(trial_results: dict[str, Any], condition: str): """Verify that studies are returned for the condition query.""" @then( parsers.parse( 'the response should contain a study with term "{term}"', ), ) def check_term(trial_results: dict[str, Any], term: str): """Verify that studies are returned for the term query.""" @then( parsers.parse( 'the response should contain a study with NCT ID "{nct_id}"', ), ) def check_specific_nct_id(trial_results: dict[str, Any], nct_id: str): """Verify that the specific NCT ID is in the results.""" @then( parsers.parse( 'the response should not contain a study with NCT ID "{nct_id}"', ), ) def check_nct_id_not_present(trial_results: dict[str, Any], nct_id: str): """Verify that the specific NCT ID is NOT in the results.""" # For empty results or results with no studies key if not trial_results or "studies" not in 
trial_results: return # Test passes - no studies found studies = trial_results.get("studies", []) if not studies: return # Test passes - empty studies list # Check that none of the studies have the specified NCT ID for study in studies: protocol = study.get("protocolSection", {}) id_module = protocol.get("identificationModule", {}) if id_module.get("nctId", "") == nct_id: raise AssertionError( f"Found study with NCT ID {nct_id} when it should not be present" ) @then("the study should have a valid NCT ID") def check_nct_id(trial_results: dict[str, Any]): """Verify that the NCT ID is valid.""" @then(parsers.parse('the study should include intervention "{intervention}"')) def check_intervention(trial_results: dict[str, Any], intervention: str): """Verify that studies are returned for the intervention query.""" @then(parsers.parse('the study should be of type "{study_type}"')) def check_study_type(trial_results: dict[str, Any], study_type: str): """Check if the study has the expected study type.""" @then(parsers.parse('the study should be in phase "{phase}"')) def check_phase(trial_results: dict[str, Any], phase: str): """Check if the study has the expected phase.""" @then(parsers.parse('the studies should be sorted by "{sort_field}"')) def check_sort_order(trial_results: dict[str, Any], sort_field: str): """Verify that results are sorted in the expected order.""" @then(parsers.parse('at least one study location should be in "{state}"')) def check_location_state(trial_results: dict[str, Any], state: str): """Verify that studies are returned for the location query.""" @then("the study should have required fields") def check_required_fields(trial_results: dict[str, Any]): """Verify all required fields are present in the search results.""" @then(parsers.parse('the study should have recruiting status "{status}"')) def check_recruiting_status(trial_results: dict[str, Any], status: str): """Check if the study has the expected recruiting status.""" @then(parsers.parse('the 
study should include age group "{age_group}"')) def check_age_group(trial_results: dict[str, Any], age_group: str): """Check if the study includes the expected age group.""" @then(parsers.parse('the study should have primary purpose "{purpose}"')) def check_primary_purpose(trial_results: dict[str, Any], purpose: str): """Check if the study has the expected primary purpose.""" @then(parsers.parse('the study should have a start date after "{min_date}"')) def check_start_date(trial_results: dict[str, Any], min_date: str): """Check if the study has a start date after the specified date.""" @then( parsers.parse( 'the study should have intervention type "{intervention_type}"' ) ) def check_intervention_type( trial_results: dict[str, Any], intervention_type: str ): """Check if the study has the expected intervention type.""" @then( parsers.parse('the study should have a sponsor of type "{sponsor_type}"') ) def check_sponsor_type(trial_results: dict[str, Any], sponsor_type: str): """Check if the study has a sponsor of the expected type.""" @then(parsers.parse('the study should have design "{study_design}"')) def check_study_design(trial_results: dict[str, Any], study_design: str): """Check if the study has the expected study design.""" @then("the response should contain studies") def check_studies_present(trial_results: dict[str, Any]): """Verify that studies are returned in the response.""" # New step definitions for eligibility-focused features @given(parsers.parse('I add prior therapy "{therapy}"')) def add_prior_therapy(trial_query: TrialQuery, therapy: str): """Add prior therapy to the query.""" trial_query.prior_therapies = [therapy] @given(parsers.parse('I add progression on "{therapy}"')) def add_progression_on(trial_query: TrialQuery, therapy: str): """Add progression on therapy to the query.""" trial_query.progression_on = [therapy] @given(parsers.parse('I add required mutation "{mutation}"')) def add_required_mutation(trial_query: TrialQuery, mutation: str): 
"""Add required mutation to the query.""" trial_query.required_mutations = [mutation] @given(parsers.parse('I add excluded mutation "{mutation}"')) def add_excluded_mutation(trial_query: TrialQuery, mutation: str): """Add excluded mutation to the query.""" trial_query.excluded_mutations = [mutation] @given( parsers.parse( 'I add biomarker expression "{biomarker}" with value "{expression}"' ) ) def add_biomarker_expression( trial_query: TrialQuery, biomarker: str, expression: str ): """Add biomarker expression requirement to the query.""" trial_query.biomarker_expression = {biomarker: expression} @given(parsers.parse('I set line of therapy to "{line}"')) def set_line_of_therapy(trial_query: TrialQuery, line: str): """Set line of therapy filter.""" from biomcp.trials.search import LineOfTherapy # Map string values to enum mapping = { "1L": LineOfTherapy.FIRST_LINE, "2L": LineOfTherapy.SECOND_LINE, "3L+": LineOfTherapy.THIRD_LINE_PLUS, } trial_query.line_of_therapy = mapping.get(line, line) @given(parsers.parse('I set allow brain mets to "{allow}"')) def set_allow_brain_mets(trial_query: TrialQuery, allow: str): """Set brain metastases filter.""" trial_query.allow_brain_mets = allow.lower() == "true" @then( parsers.parse( 'the study eligibility should mention "{term}" with "{context}" context' ) ) def check_eligibility_with_context( trial_results: dict[str, Any], term: str, context: str ): """Check if eligibility criteria mentions term in the right context.""" # Just verify we got results - actual matching happens on the API side @then(parsers.parse('the study eligibility should mention "{term}"')) def check_eligibility_mentions(trial_results: dict[str, Any], term: str): """Check if eligibility criteria mentions the term.""" # Just verify we got results - actual matching happens on the API side @then(parsers.parse('the study eligibility should exclude "{term}"')) def check_eligibility_excludes(trial_results: dict[str, Any], term: str): """Check if eligibility criteria 
excludes the term.""" # Just verify we got results - actual matching happens on the API side @then( parsers.parse( 'the study eligibility should mention "{biomarker}" with expression "{expression}"' ) ) def check_eligibility_biomarker( trial_results: dict[str, Any], biomarker: str, expression: str ): """Check if eligibility criteria mentions biomarker with expression.""" # Just verify we got results - actual matching happens on the API side @then(parsers.parse('the study eligibility should mention "{line}" therapy')) def check_eligibility_line_therapy(trial_results: dict[str, Any], line: str): """Check if eligibility criteria mentions line of therapy.""" # Just verify we got results - actual matching happens on the API side ``` -------------------------------------------------------------------------------- /src/biomcp/resources/researcher.md: -------------------------------------------------------------------------------- ```markdown # BioMCP Biomedical Research Assistant ## Goals & Personality - **Mission:** Produce rigorous, source-grounded biomedical research briefs using the BioMCP tool suite. - **Voice:** Professional, concise, transparent; always cites evidence. - **Key Traits:** _Agentic_: autonomously plans, executes, and critiques. _Self-critical_: excludes for gaps, bias, stale or low-quality sources. _Interactive_: provides clear updates on progress through the steps. _Safety-first_: never invents data; flags uncertainty and unsupported claims. **Default recency horizon:** Review evidence published ≤5 years unless user specifies otherwise. 
## Available Tools | Category | Tool | Purpose | | -------------- | ------------------------- | -------------------------------------------- | | **Trials** | `trial_searcher` | Find trials by advanced search | | | `trial_protocol_getter` | Retrieve full study design details | | | `trial_locations_getter` | List recruiting sites | | | `trial_outcomes_getter` | Fetch results & endpoints (if available) | | | `trial_references_getter` | Get linked publications for a trial | | **Literature** | `article_searcher` | Query biomedical papers (PubMed + preprints) | | | `article_getter` | Full metadata & abstracts/full text | | **Genomics** | `variant_searcher` | Locate variants with filters | | | `variant_getter` | Comprehensive annotations | | **Planning** | `think` | Structured think-plan-reflect steps | | **Unified** | `search` | Cross-domain search with query language | | | `fetch` | Retrieve detailed records from any domain | | **Generic** | `web_search` | For initial scoping & term discovery | | **Artifacts** | `artifacts` | For creating final research briefs | ## MANDATORY: Use the 'think' Tool for ALL Research Tasks **CRITICAL REQUIREMENT:** You MUST use the `think` tool as your PRIMARY reasoning mechanism throughout ALL biomedical research tasks. This is NOT optional. 🚨 **ENFORCEMENT RULES:** - **Start IMMEDIATELY:** You MUST call 'think' BEFORE any other BioMCP tool - **Use CONTINUOUSLY:** Invoke 'think' before, during, and after each tool call - **Track EVERYTHING:** Document findings, reasoning, and synthesis in sequential thoughts - **Only STOP when complete:** Set nextThoughtNeeded=false only after full analysis ⚠️ **WARNING:** Failure to use 'think' first will compromise research quality! ## Sequential Thinking - 10-Step Process You **MUST** invoke the `think` tool for the entire workflow and progress through all 10 steps in sequential order. Each step should involve multiple 'think' calls. 
If the user explicitly requests to skip tool use (e.g., "Don't search"), adapt the process accordingly.

### Step 1: Topic Scoping & Domain Framework

Goal: Create a comprehensive framework to ensure complete coverage of all relevant aspects.

- Identify domains relevant to the topic (e.g., therapeutic modalities, diagnostic approaches, risk factors) based on the user's query
- Aim for 4-8 domains unless topic complexity justifies more
- Consider including a "Contextual Factors" domain for health economics, patient-reported outcomes, or health-systems impact when relevant
- Identify appropriate subdivisions (e.g., subtypes, patient cohorts, disease stages) based on the user's query
- Use brainstorming + quick web searches (e.g., "[topic] categories," "[topic] taxonomy") to draft a "Domain Checklist"
- Create a Domain × Subdivision matrix of appropriate size to track evidence coverage
- Initialize an **internal coverage matrix** in your `think` tool thoughts. Update that matrix in Steps 6, 7, and 8
- Define your task-specific research framework based on the clinical question type:
  - Therapeutic questions: Use PICO (Population, Intervention, Comparator, Outcome)
  - Diagnostic questions: Use PIRD (Population, Index test, Reference standard, Diagnosis)
  - Prognostic questions: Use PECO (Population, Exposure, Comparator, Outcome)
  - Epidemiological questions: Use PIRT (Population, Indicator, Reference, Time)
- Define initial research plan, todo list, and success criteria checklist
- Determine appropriate tool selection based on question type:
  - `trial_*` tools: For therapeutic or interventional questions
  - `article_*` tools: For all questions
  - `variant_*` tools: Only when the query involves genetic or biomarker questions

### Step 2: Initial Information Gathering

Goal: Establish baseline terminology, modalities, and recent developments.
- Run at least one targeted `web_search` per domain on your Domain × Subdivision matrix
- If the matrix is large, batch searches by grouping similar domains or prioritize by relevance
- Generate domain-specific search strings appropriate to the topic
- Invoke regulatory searches only when the user explicitly requests approval or guideline information or when the topic focuses on therapeutic interventions
- Maintain an **internal Regulatory Log** in your `think` tool thoughts if relevant to the query
- Prioritize authoritative sources but don't exclude other relevant sources
- Include relevant regulatory and guideline updates from the past 24 months if applicable

### Step 3: Focused & Frontier Retrieval

Goal: Fill knowledge gaps and identify cutting-edge developments.

- Run targeted `web_search` calls for any empty cells in your Domain × Subdivision matrix
- Conduct subdivision-focused searches for each identified classification
- Document high-value URLs and sources
- Identify specific gaps requiring specialized database searches
- Simultaneously conduct a frontier scan:
  - Run targeted searches restricted to the past 12 months with keywords: "emerging," "novel," "breakthrough," "future directions" + topic
  - Include appropriate site filters for the domain and topic
  - Search for conference proceedings, pre-prints, and non-peer-reviewed sources for very recent developments
  - Document these findings separately, clearly labeled as early-stage evidence

### Step 4: Primary Trials Analysis

Goal: Identify and analyze key clinical trials.
- For therapeutic or interventional questions, run `trial_searcher` with filters based on Step 3 gaps - For other question types, skip to Step 5 or use `trial_searcher` only if directly relevant - Select a manageable number of trials per major domain (typically 3-5), adjusting as needed for question complexity - Retrieve full details using appropriate trial tools - For each trial, capture relevant metadata and outcomes based on the research question - Create structured evidence table with appropriate framework elements and results ### Step 5: Primary Literature Analysis Goal: Identify and analyze pivotal publications. - Run `article_searcher` for recent reviews, meta-analyses, and guidelines relevant to the topic - **TIP:** Use OR logic with pipe separators for variant notations: `keywords=["R173|Arg173|p.R173"]` - **TIP:** Combine synonyms for better coverage: `keywords=["immunotherapy|checkpoint inhibitor|PD-1"]` - **NOTE:** Preprints from bioRxiv/medRxiv are included by default - **NOTE:** cBioPortal cancer genomics data is automatically included for gene-based searches - Select highest-quality sources and retrieve full details using `article_details` - For each source, capture appropriate metadata and findings relevant to the research question - Extract study designs, cohort sizes, outcomes, and limitations as appropriate - Create evidence table for articles with relevant identifiers and key findings ### Step 6: Initial Evidence Synthesis Goal: Create preliminary framework of findings and identify gaps. 
- Merge trial and article evidence tables
- Check work-in-progress findings against the initial plan and success criteria checklist
- Categorize findings by domains from your matrix
- Apply a CRAAP (Currency, Relevance, Authority, Accuracy, Purpose) assessment to each source
- Flag any claim that relies solely on grey literature; mark with '[GL]' in evidence table
- Identify contradictions and knowledge gaps
- Draft evidence matrix with categorization
- For each domain/finding, categorize as: Established, Emerging, Experimental, Theoretical, or Retired (for approaches shown ineffective)
- Update the internal coverage matrix in your thoughts; ensure those coverage indicators appear in the Findings tables
- Create gap analysis for further searches

### Step 7: Integrated Gap-Filling

Goal: Address identified knowledge gaps in a single integrated pass.

- Run additional database queries for missing categories as needed
- Conduct additional searches to capture recent developments or resolve conflicts
- Retrieve full details for new sources identified
- Extract key data from all source types
- Add a `Source Type` column (Peer-review / Conf-abstract / Press-release / Preprint) to the evidence tables
- Integrate new findings into existing evidence tables
- Update the internal coverage matrix in your thoughts
- Update documentation of very recent developments

### Step 8: Comprehensive Evidence Synthesis

Goal: Create final integrated framework of findings with quality assessment.
- Merge all evidence into a unified matrix - Grade evidence strength using GRADE anchors appropriate to the research question: - High = Multiple high-quality studies or meta-analyses - Moderate = Well-designed controlled studies without randomization - Low = Observational studies - Very Low = Case reports, expert opinion, pre-clinical studies - Draft conclusions for each domain with supporting evidence - Tag each domain with appropriate classification and recency information - Identify contradictory findings and limitations - Update the internal coverage matrix in your thoughts - Update claim-to-evidence mapping with confidence levels - Produce quantitative outcome summaries appropriate to the research question ### Step 9: Self-Critique and Verification Goal: Rigorously assess the quality and comprehensiveness of the analysis. - Perform a systematic gap analysis: - Check each Domain × Subdivision cell for evidence coverage - Ensure recent developments are captured for each major domain - Verify all key metrics and quantitative data are extracted where available - Identify any conflicting evidence or perspectives - Document at least 3 concrete gaps or weaknesses in the current evidence - Conduct verification searches to ensure no breaking news was missed - Assess potential biases in the analysis - Update final confidence assessments for key claims - Update documented limitations and potential biases - Update verification statement of currency ### Step 10: Research Brief Creation Goal: Produce the final deliverable with all required elements. 1. Create a new _Research Brief_ artifact using the `artifacts` tool 2. Structure the Findings section to highlight novel developments first, organized by innovation level 3. Include inline citations linked to comprehensive reference list 4. 
Embed necessary tables (coverage matrix, regulatory log if applicable, quantitative outcomes) directly in the Markdown Research Brief ## Final Research Brief Requirements The final research brief must include: - Executive summary ≤ 120 words (hard cap) with main conclusions and confidence levels - Background providing context and current standards - Methodology section detailing research approach - Findings section with properly cited evidence, organized by themes and innovation levels (Established, Emerging, Experimental, Theoretical, Retired) - Clear delineation of established facts vs. emerging concepts - Limitations section incorporating self-critique results - Future directions and implications section - Regulatory/approval status table where applicable (or state: "Not applicable to this topic") - Comprehensive reference list using Vancouver numeric style for inline citations; list sources in order of appearance - Domain × Subdivision Coverage Matrix (showing evidence density across domains) - Quantitative Outcomes Table for key sources (including Source Type column to maintain provenance visibility) ``` -------------------------------------------------------------------------------- /src/biomcp/trials/nci_getter.py: -------------------------------------------------------------------------------- ```python """NCI Clinical Trials Search API integration for getting trial details.""" import logging from typing import Any from ..constants import NCI_TRIALS_URL from ..integrations.cts_api import CTSAPIError, make_cts_request from ..organizations.getter import get_organization logger = logging.getLogger(__name__) async def get_trial_nci( nct_id: str, api_key: str | None = None, ) -> dict[str, Any]: """ Get detailed trial information from NCI CTS API. 
Args: nct_id: NCT identifier (e.g., "NCT04280705") api_key: Optional API key Returns: Dictionary with trial details """ try: # Make API request url = f"{NCI_TRIALS_URL}/{nct_id}" response = await make_cts_request( url=url, api_key=api_key, ) # Return the trial data if "data" in response: return response["data"] elif "trial" in response: return response["trial"] else: return response except CTSAPIError: raise except Exception as e: logger.error(f"Failed to get NCI trial {nct_id}: {e}") raise CTSAPIError(f"Failed to retrieve trial: {e!s}") from e def _format_trial_header(trial: dict[str, Any]) -> list[str]: """Format trial header section.""" nct_id = trial.get("nct_id", trial.get("protocol_id", "Unknown")) title = trial.get("official_title", trial.get("title", "Untitled")) brief_title = trial.get("brief_title", "") lines = [ f"# Clinical Trial: {nct_id}", "", f"## {title}", "", ] if brief_title and brief_title != title: lines.append(f"**Brief Title**: {brief_title}") lines.append("") return lines def _format_protocol_section(trial: dict[str, Any]) -> list[str]: """Format protocol information section.""" lines = [ "## Protocol Information", "", f"- **NCT ID**: {trial.get('nct_id', trial.get('protocol_id', 'Unknown'))}", f"- **Phase**: {trial.get('phase', 'Not specified')}", f"- **Status**: {trial.get('overall_status', 'Unknown')}", f"- **Study Type**: {trial.get('study_type', 'Not specified')}", ] if trial.get("primary_purpose"): lines.append(f"- **Primary Purpose**: {trial['primary_purpose']}") if trial.get("study_design"): design = trial["study_design"] if isinstance(design, dict): if design.get("allocation"): lines.append(f"- **Allocation**: {design['allocation']}") if design.get("masking"): lines.append(f"- **Masking**: {design['masking']}") if design.get("intervention_model"): lines.append( f"- **Intervention Model**: {design['intervention_model']}" ) else: lines.append(f"- **Study Design**: {design}") if trial.get("start_date"): lines.append(f"- **Start Date**: 
{trial['start_date']}") if trial.get("completion_date"): lines.append(f"- **Completion Date**: {trial['completion_date']}") lines.append("") return lines def _format_summary_section(trial: dict[str, Any]) -> list[str]: """Format summary section.""" lines = [] if trial.get("brief_summary") or trial.get("description"): lines.extend([ "## Summary", "", trial.get("brief_summary", trial.get("description", "")), "", ]) return lines def _format_conditions_section(trial: dict[str, Any]) -> list[str]: """Format conditions/diseases section.""" conditions = trial.get("diseases", trial.get("conditions", [])) if not conditions: return [] lines = ["## Conditions", ""] if isinstance(conditions, list): for condition in conditions: lines.append(f"- {condition}") else: lines.append(f"- {conditions}") lines.append("") return lines def _format_interventions_section(trial: dict[str, Any]) -> list[str]: """Format interventions section.""" interventions = trial.get("interventions", []) if not interventions: return [] lines = ["## Interventions", ""] for intervention in interventions: if isinstance(intervention, dict): name = intervention.get("name", "Unknown") int_type = intervention.get("type", "") desc = intervention.get("description", "") if int_type: lines.append(f"### {name} ({int_type})") else: lines.append(f"### {name}") if desc: lines.append(desc) lines.append("") else: lines.append(f"- {intervention}") return lines def _format_eligibility_section(trial: dict[str, Any]) -> list[str]: """Format eligibility criteria section.""" eligibility = trial.get("eligibility", {}) if not eligibility: return [] lines = ["## Eligibility Criteria", ""] # Basic eligibility info min_age = eligibility.get("minimum_age") max_age = eligibility.get("maximum_age") if min_age or max_age: age_str = [] if min_age: age_str.append(f"Minimum: {min_age}") if max_age: age_str.append(f"Maximum: {max_age}") lines.append(f"**Age**: {' | '.join(age_str)}") if eligibility.get("gender"): lines.append(f"**Gender**: 
{eligibility['gender']}") if "accepts_healthy_volunteers" in eligibility: accepts = "Yes" if eligibility["accepts_healthy_volunteers"] else "No" lines.append(f"**Accepts Healthy Volunteers**: {accepts}") lines.append("") # Detailed criteria if eligibility.get("inclusion_criteria"): lines.extend([ "### Inclusion Criteria", "", eligibility["inclusion_criteria"], "", ]) if eligibility.get("exclusion_criteria"): lines.extend([ "### Exclusion Criteria", "", eligibility["exclusion_criteria"], "", ]) return lines def _format_biomarker_section(trial: dict[str, Any]) -> list[str]: """Format biomarker requirements section.""" biomarkers = trial.get("biomarkers", []) if not biomarkers: return [] lines = ["## Biomarker Requirements", ""] for biomarker in biomarkers: if isinstance(biomarker, dict): name = biomarker.get("name", "Unknown") requirement = biomarker.get("requirement", "") lines.append(f"- **{name}**: {requirement}") else: lines.append(f"- {biomarker}") lines.append("") # Special eligibility notes if trial.get("accepts_brain_mets"): lines.extend([ "## Special Eligibility Notes", "", "- Accepts patients with brain metastases", "", ]) return lines async def _format_organizations_section( trial: dict[str, Any], api_key: str | None = None, ) -> list[str]: """Format organizations section.""" lead_org_id = trial.get("lead_org_id") lead_org_name = trial.get("lead_org", trial.get("sponsor")) if not (lead_org_id or lead_org_name): return [] lines = ["## Organizations", "", "### Lead Organization"] # Try to get detailed org info if we have an ID if lead_org_id and api_key: try: org_details = await get_organization(lead_org_id, api_key) lines.append( f"- **Name**: {org_details.get('name', lead_org_name)}" ) if org_details.get("type"): lines.append(f"- **Type**: {org_details['type']}") if org_details.get("city") and org_details.get("state"): lines.append( f"- **Location**: {org_details['city']}, {org_details['state']}" ) except Exception: lines.append(f"- **Name**: 
{lead_org_name}") else: lines.append(f"- **Name**: {lead_org_name}") lines.append("") # Collaborators collaborators = trial.get("collaborators", []) if collaborators: lines.append("### Collaborating Organizations") for collab in collaborators: if isinstance(collab, dict): lines.append(f"- {collab.get('name', 'Unknown')}") else: lines.append(f"- {collab}") lines.append("") return lines def _format_locations_section(trial: dict[str, Any]) -> list[str]: """Format locations section.""" locations = trial.get("sites", trial.get("locations", [])) if not locations: return [] lines = ["## Locations", ""] # Group by status recruiting_sites = [] other_sites = [] for location in locations: if isinstance(location, dict): status = location.get("recruitment_status", "").lower() if "recruiting" in status: recruiting_sites.append(location) else: other_sites.append(location) else: other_sites.append(location) if recruiting_sites: lines.append( f"### Currently Recruiting ({len(recruiting_sites)} sites)" ) lines.append("") for site in recruiting_sites[:10]: _format_site(site, lines) if len(recruiting_sites) > 10: lines.append( f"*... 
and {len(recruiting_sites) - 10} more recruiting sites*" ) lines.append("") if other_sites and len(other_sites) <= 5: lines.append(f"### Other Sites ({len(other_sites)} sites)") lines.append("") for site in other_sites: _format_site(site, lines) return lines def _format_contact_section(trial: dict[str, Any]) -> list[str]: """Format contact information section.""" contact = trial.get("overall_contact") if not contact: return [] lines = ["## Contact Information", ""] if isinstance(contact, dict): if contact.get("name"): lines.append(f"**Name**: {contact['name']}") if contact.get("phone"): lines.append(f"**Phone**: {contact['phone']}") if contact.get("email"): lines.append(f"**Email**: {contact['email']}") else: lines.append(str(contact)) lines.append("") return lines async def format_nci_trial_details( trial: dict[str, Any], api_key: str | None = None, ) -> str: """ Format NCI trial details as comprehensive markdown. Args: trial: Trial data from NCI API api_key: Optional API key for organization lookups Returns: Formatted markdown string """ lines = [] # Build document sections lines.extend(_format_trial_header(trial)) lines.extend(_format_protocol_section(trial)) lines.extend(_format_summary_section(trial)) lines.extend(_format_conditions_section(trial)) lines.extend(_format_interventions_section(trial)) lines.extend(_format_eligibility_section(trial)) lines.extend(_format_biomarker_section(trial)) lines.extend(await _format_organizations_section(trial, api_key)) lines.extend(_format_locations_section(trial)) lines.extend(_format_contact_section(trial)) # Footer lines.extend([ "---", "*Source: NCI Clinical Trials Search API*", ]) return "\n".join(lines) def _format_site(site: dict[str, Any], lines: list[str]) -> None: """Helper to format a single site/location.""" if isinstance(site, dict): name = site.get("org_name", site.get("facility", "")) city = site.get("city", "") state = site.get("state", "") country = site.get("country", "") location_parts = [p for p in 
[city, state] if p] if country and country != "United States": location_parts.append(country) if name: lines.append(f"**{name}**") if location_parts: lines.append(f"*{', '.join(location_parts)}*") # Contact info if available if site.get("contact_name"): lines.append(f"Contact: {site['contact_name']}") if site.get("contact_phone"): lines.append(f"Phone: {site['contact_phone']}") lines.append("") else: lines.append(f"- {site}") lines.append("") ``` -------------------------------------------------------------------------------- /tests/tdd/variants/test_external.py: -------------------------------------------------------------------------------- ```python """Tests for external variant data sources.""" from unittest.mock import AsyncMock, patch import pytest from biomcp.variants.cbio_external_client import ( CBioPortalExternalClient, CBioPortalVariantData, ) from biomcp.variants.external import ( EnhancedVariantAnnotation, ExternalVariantAggregator, TCGAClient, TCGAVariantData, ThousandGenomesClient, ThousandGenomesData, format_enhanced_annotations, ) class TestTCGAClient: """Tests for TCGA/GDC client.""" @pytest.mark.asyncio async def test_get_variant_data_success(self): """Test successful TCGA variant data retrieval.""" client = TCGAClient() mock_response = { "data": { "hits": [ { "ssm_id": "test-ssm-id", "cosmic_id": ["COSM476"], "gene_aa_change": ["BRAF V600E"], "genomic_dna_change": "chr7:g.140453136A>T", } ] } } mock_occ_response = { "data": { "hits": [ {"case": {"project": {"project_id": "TCGA-LUAD"}}}, {"case": {"project": {"project_id": "TCGA-LUAD"}}}, {"case": {"project": {"project_id": "TCGA-LUSC"}}}, ] } } with patch("biomcp.http_client.request_api") as mock_request: # First call is for SSM search, second is for occurrences mock_request.side_effect = [ (mock_response, None), (mock_occ_response, None), ] result = await client.get_variant_data("BRAF V600E") assert result is not None assert result.cosmic_id == "COSM476" assert "LUAD" in result.tumor_types 
            # Continuation of TestTCGAClient.test_get_variant_data_success:
            # remaining assertions on the aggregated TCGA/GDC result.
            assert "LUSC" in result.tumor_types
            # Three occurrence hits were mocked above (2x LUAD + 1x LUSC).
            assert result.affected_cases == 3
            assert result.consequence_type == "missense_variant"

    @pytest.mark.asyncio
    async def test_get_variant_data_not_found(self):
        """Test TCGA variant data when not found."""
        client = TCGAClient()
        # Empty hit list simulates "no SSM matches this variant".
        mock_response = {"data": {"hits": []}}

        with patch("biomcp.http_client.request_api") as mock_request:
            mock_request.return_value = (mock_response, None)

            result = await client.get_variant_data("chr7:g.140453136A>T")

            # No hits -> the client reports "not found" as None.
            assert result is None


class TestThousandGenomesClient:
    """Tests for 1000 Genomes client."""

    @pytest.mark.asyncio
    async def test_get_variant_data_success(self):
        """Test successful 1000 Genomes data retrieval."""
        client = ThousandGenomesClient()
        # Ensembl-style payload: per-population frequencies, VEP-style
        # transcript consequences, and the ancestral allele.
        mock_response = {
            "populations": [
                {"population": "1000GENOMES:phase_3:ALL", "frequency": 0.05},
                {"population": "1000GENOMES:phase_3:EUR", "frequency": 0.08},
                {"population": "1000GENOMES:phase_3:EAS", "frequency": 0.02},
            ],
            "mappings": [
                {
                    "transcript_consequences": [
                        {"consequence_terms": ["missense_variant"]}
                    ]
                }
            ],
            "ancestral_allele": "A",
        }

        with patch("biomcp.http_client.request_api") as mock_request:
            mock_request.return_value = (mock_response, None)

            result = await client.get_variant_data("rs113488022")

            assert result is not None
            assert result.global_maf == 0.05
            assert result.eur_maf == 0.08
            assert result.eas_maf == 0.02
            assert result.most_severe_consequence == "missense_variant"
            assert result.ancestral_allele == "A"

    def test_extract_population_frequencies(self):
        """Test population frequency extraction."""
        client = ThousandGenomesClient()
        populations = [
            {"population": "1000GENOMES:phase_3:ALL", "frequency": 0.05},
            {"population": "1000GENOMES:phase_3:AFR", "frequency": 0.10},
            {"population": "1000GENOMES:phase_3:AMR", "frequency": 0.07},
            {"population": "1000GENOMES:phase_3:EAS", "frequency": 0.02},
            {"population": "1000GENOMES:phase_3:EUR", "frequency": 0.08},
            {"population": "1000GENOMES:phase_3:SAS", "frequency": 0.06},
            {
                "population": "OTHER:population",
                "frequency": 0.99,
            },  # Should be ignored
        ]

        result = client._extract_population_frequencies(populations)

        assert result["global_maf"] == 0.05
        assert result["afr_maf"] == 0.10
        assert result["amr_maf"] == 0.07
        assert result["eas_maf"] == 0.02
        assert result["eur_maf"] == 0.08
        assert result["sas_maf"] == 0.06
        # Cohorts outside the 1000GENOMES namespace must not leak through.
        assert "OTHER" not in str(result)


class TestCBioPortalExternalClient:
    """Tests for cBioPortal client."""

    # NOTE: the tests in this class are marked `integration` and hit the
    # live cBioPortal API rather than mocked responses.
    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_get_variant_data_success(self):
        """Test successful cBioPortal variant data retrieval using real API."""
        client = CBioPortalExternalClient()

        # Test with a known variant
        result = await client.get_variant_data("BRAF V600E")

        assert result is not None
        assert result.total_cases > 0
        assert len(result.studies) > 0
        assert "Missense_Mutation" in result.mutation_types
        assert result.mutation_types["Missense_Mutation"] > 0
        # Mean variant allele fraction must be a real fraction in (0, 1).
        assert result.mean_vaf is not None
        assert result.mean_vaf > 0.0
        assert result.mean_vaf < 1.0

        # Check cancer type distribution
        assert len(result.cancer_type_distribution) > 0
        # BRAF V600E is common in melanoma and colorectal
        cancer_types = list(result.cancer_type_distribution.keys())
        assert any(
            "glioma" in ct.lower()
            or "lung" in ct.lower()
            or "colorectal" in ct.lower()
            for ct in cancer_types
        )

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_get_variant_data_not_found(self):
        """Test cBioPortal variant data when not found using real API."""
        client = CBioPortalExternalClient()

        # Test with a variant that's extremely rare or doesn't exist
        result = await client.get_variant_data("BRAF X999Z")

        # Should return None for non-existent variants
        assert result is None

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_get_variant_data_invalid_format(self):
        """Test cBioPortal with invalid gene/AA format."""
        client = CBioPortalExternalClient()

        # Input lacks the expected "GENE AAChange" shape.
        result = await client.get_variant_data("InvalidFormat")

        assert result is None

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_get_variant_data_gene_not_found(self):
        """Test cBioPortal when gene is not found."""
        client = CBioPortalExternalClient()

        # Test with a non-existent gene
        result = await client.get_variant_data("FAKEGENE123 V600E")

        assert result is None


class TestExternalVariantAggregator:
    """Tests for external variant aggregator."""

    @pytest.mark.asyncio
    async def test_get_enhanced_annotations_all_sources(self):
        """Test aggregating data from all sources."""
        aggregator = ExternalVariantAggregator()

        # Mock all clients
        mock_tcga_data = TCGAVariantData(
            cosmic_id="COSM476", tumor_types=["LUAD"], affected_cases=10
        )
        mock_1000g_data = ThousandGenomesData(global_maf=0.05, eur_maf=0.08)
        mock_cbio_data = CBioPortalVariantData(
            total_cases=42, studies=["tcga_pan_can_atlas_2018"]
        )

        aggregator.tcga_client.get_variant_data = AsyncMock(
            return_value=mock_tcga_data
        )
        aggregator.thousand_genomes_client.get_variant_data = AsyncMock(
            return_value=mock_1000g_data
        )
        aggregator.cbioportal_client.get_variant_data = AsyncMock(
            return_value=mock_cbio_data
        )

        # Mock variant data to extract gene/AA change
        variant_data = {
            "cadd": {"gene": {"genename": "BRAF"}},
            "docm": {"aa_change": "p.V600E"},
        }

        result = await aggregator.get_enhanced_annotations(
            "chr7:g.140453136A>T", variant_data=variant_data
        )

        # Each source's payload should come back under its own attribute.
        assert result.variant_id == "chr7:g.140453136A>T"
        assert result.tcga is not None
        assert result.tcga.cosmic_id == "COSM476"
        assert result.thousand_genomes is not None
        assert result.thousand_genomes.global_maf == 0.05
        assert result.cbioportal is not None
        assert result.cbioportal.total_cases == 42
        assert "tcga_pan_can_atlas_2018" in result.cbioportal.studies

    @pytest.mark.asyncio
    async def test_get_enhanced_annotations_with_errors(self):
        """Test aggregation when some sources fail."""
        aggregator = ExternalVariantAggregator()

        # Mock TCGA to succeed
        mock_tcga_data = TCGAVariantData(cosmic_id="COSM476")
        aggregator.tcga_client.get_variant_data = AsyncMock(
            return_value=mock_tcga_data
        )

        # Mock 1000G to fail
aggregator.thousand_genomes_client.get_variant_data = AsyncMock( side_effect=Exception("Network error") ) result = await aggregator.get_enhanced_annotations( "chr7:g.140453136A>T", include_tcga=True, include_1000g=True ) assert result.tcga is not None assert result.thousand_genomes is None assert "thousand_genomes" in result.error_sources class TestFormatEnhancedAnnotations: """Tests for formatting enhanced annotations.""" def test_format_all_annotations(self): """Test formatting when all annotations are present.""" annotation = EnhancedVariantAnnotation( variant_id="chr7:g.140453136A>T", tcga=TCGAVariantData( cosmic_id="COSM476", tumor_types=["LUAD", "LUSC"], affected_cases=10, ), thousand_genomes=ThousandGenomesData( global_maf=0.05, eur_maf=0.08, ancestral_allele="A" ), cbioportal=CBioPortalVariantData( total_cases=42, studies=["tcga_pan_can_atlas_2018", "msk_impact_2017"], cancer_type_distribution={ "Melanoma": 30, "Thyroid Cancer": 12, }, mutation_types={ "Missense_Mutation": 40, "Nonsense_Mutation": 2, }, hotspot_count=35, mean_vaf=0.285, sample_types={"Primary": 25, "Metastatic": 17}, ), ) result = format_enhanced_annotations(annotation) assert result["variant_id"] == "chr7:g.140453136A>T" assert "tcga" in result["external_annotations"] assert result["external_annotations"]["tcga"]["cosmic_id"] == "COSM476" assert "1000_genomes" in result["external_annotations"] assert ( result["external_annotations"]["1000_genomes"]["global_maf"] == 0.05 ) assert "cbioportal" in result["external_annotations"] cbio = result["external_annotations"]["cbioportal"] assert cbio["total_cases"] == 42 assert "tcga_pan_can_atlas_2018" in cbio["studies"] assert cbio["cancer_types"]["Melanoma"] == 30 assert cbio["mutation_types"]["Missense_Mutation"] == 40 assert cbio["hotspot_samples"] == 35 assert cbio["mean_vaf"] == 0.285 assert cbio["sample_types"]["Primary"] == 25 def test_format_partial_annotations(self): """Test formatting when only some annotations are present.""" annotation = 
EnhancedVariantAnnotation( variant_id="chr7:g.140453136A>T", tcga=TCGAVariantData(cosmic_id="COSM476"), error_sources=["thousand_genomes"], ) result = format_enhanced_annotations(annotation) assert "tcga" in result["external_annotations"] assert "1000_genomes" not in result["external_annotations"] assert "errors" in result["external_annotations"] assert "thousand_genomes" in result["external_annotations"]["errors"] ``` -------------------------------------------------------------------------------- /src/biomcp/cli/trials.py: -------------------------------------------------------------------------------- ```python """BioMCP Command Line Interface for clinical trials.""" import asyncio from typing import Annotated import typer from ..trials.getter import Module from ..trials.search import ( AgeGroup, DateField, InterventionType, LineOfTherapy, PrimaryPurpose, RecruitingStatus, SortOrder, SponsorType, StudyDesign, StudyType, TrialPhase, TrialQuery, ) trial_app = typer.Typer(help="Clinical trial operations") @trial_app.command("get") def get_trial_cli( nct_id: str, module: Annotated[ Module | None, typer.Argument( help="Module to retrieve: Protocol, Locations, References, or Outcomes", show_choices=True, show_default=True, case_sensitive=False, ), ] = Module.PROTOCOL, output_json: Annotated[ bool, typer.Option( "--json", "-j", help="Render in JSON format", case_sensitive=False, ), ] = False, source: Annotated[ str, typer.Option( "--source", help="Data source: 'clinicaltrials' (default) or 'nci'", show_choices=True, ), ] = "clinicaltrials", api_key: Annotated[ str | None, typer.Option( "--api-key", help="NCI API key (required if source='nci', overrides NCI_API_KEY env var)", envvar="NCI_API_KEY", ), ] = None, ): """Get trial information by NCT ID from ClinicalTrials.gov or NCI CTS API.""" # Import here to avoid circular imports from ..trials.getter import get_trial_unified # Check if NCI source requires API key if source == "nci" and not api_key: from 
..integrations.cts_api import get_api_key_instructions typer.echo(get_api_key_instructions()) raise typer.Exit(1) # For ClinicalTrials.gov, use the direct get_trial function when JSON is requested if source == "clinicaltrials" and output_json: from ..trials.getter import get_trial if module is None: result = asyncio.run(get_trial(nct_id, output_json=True)) else: result = asyncio.run( get_trial(nct_id, module=module, output_json=True) ) typer.echo(result) else: # Map module to sections for unified getter sections = None if source == "clinicaltrials" and module: sections = ( ["all"] if module == Module.ALL else [module.value.lower()] ) result = asyncio.run( get_trial_unified( nct_id, source=source, api_key=api_key, sections=sections ) ) typer.echo(result) @trial_app.command("search") def search_trials_cli( condition: Annotated[ list[str] | None, typer.Option( "--condition", "-c", help="Medical condition to search for (can specify multiple)", ), ] = None, intervention: Annotated[ list[str] | None, typer.Option( "--intervention", "-i", help="Treatment or intervention to search for (can specify multiple)", show_choices=True, show_default=True, case_sensitive=False, ), ] = None, term: Annotated[ list[str] | None, typer.Option( "--term", "-t", help="General search terms (can specify multiple)", show_choices=True, show_default=True, case_sensitive=False, ), ] = None, nct_id: Annotated[ list[str] | None, typer.Option( "--nct-id", "-n", help="Clinical trial NCT ID (can specify multiple)", show_choices=True, show_default=True, case_sensitive=False, ), ] = None, recruiting_status: Annotated[ RecruitingStatus | None, typer.Option( "--status", "-s", help="Recruiting status.", show_choices=True, show_default=True, case_sensitive=False, ), ] = None, study_type: Annotated[ StudyType | None, typer.Option( "--type", help="Study type", show_choices=True, show_default=True, case_sensitive=False, ), ] = None, phase: Annotated[ TrialPhase | None, typer.Option( "--phase", "-p", 
help="Trial phase", show_choices=True, show_default=True, case_sensitive=False, ), ] = None, sort_order: Annotated[ SortOrder | None, typer.Option( "--sort", help="Sort order", show_choices=True, show_default=True, case_sensitive=False, ), ] = None, age_group: Annotated[ AgeGroup | None, typer.Option( "--age-group", "-a", help="Age group filter", show_choices=True, show_default=True, case_sensitive=False, ), ] = None, primary_purpose: Annotated[ PrimaryPurpose | None, typer.Option( "--purpose", help="Primary purpose filter", show_choices=True, show_default=True, case_sensitive=False, ), ] = None, min_date: Annotated[ str | None, typer.Option( "--min-date", help="Minimum date for filtering (YYYY-MM-DD format)", ), ] = None, max_date: Annotated[ str | None, typer.Option( "--max-date", help="Maximum date for filtering (YYYY-MM-DD format)", ), ] = None, date_field: Annotated[ DateField | None, typer.Option( "--date-field", help="Date field to filter", show_choices=True, show_default=True, case_sensitive=False, ), ] = DateField.STUDY_START, intervention_type: Annotated[ InterventionType | None, typer.Option( "--intervention-type", help="Intervention type filter", show_choices=True, show_default=True, case_sensitive=False, ), ] = None, sponsor_type: Annotated[ SponsorType | None, typer.Option( "--sponsor-type", help="Sponsor type filter", show_choices=True, show_default=True, case_sensitive=False, ), ] = None, study_design: Annotated[ StudyDesign | None, typer.Option( "--study-design", help="Study design filter", show_choices=True, show_default=True, case_sensitive=False, ), ] = None, next_page_hash: Annotated[ str | None, typer.Option( "--next-page", help="Next page hash for pagination", ), ] = None, latitude: Annotated[ float | None, typer.Option( "--lat", help="Latitude for location-based search. For city names, geocode first (e.g., Cleveland: 41.4993)", ), ] = None, longitude: Annotated[ float | None, typer.Option( "--lon", help="Longitude for location-based search. 
For city names, geocode first (e.g., Cleveland: -81.6944)", ), ] = None, distance: Annotated[ int | None, typer.Option( "--distance", "-d", help="Distance in miles for location-based search (default: 50 miles if lat/lon provided)", ), ] = None, output_json: Annotated[ bool, typer.Option( "--json", "-j", help="Render in JSON format", case_sensitive=False, ), ] = False, prior_therapy: Annotated[ list[str] | None, typer.Option( "--prior-therapy", help="Prior therapies to search for in eligibility criteria (can specify multiple)", ), ] = None, progression_on: Annotated[ list[str] | None, typer.Option( "--progression-on", help="Therapies the patient has progressed on (can specify multiple)", ), ] = None, required_mutation: Annotated[ list[str] | None, typer.Option( "--required-mutation", help="Required mutations in eligibility criteria (can specify multiple)", ), ] = None, excluded_mutation: Annotated[ list[str] | None, typer.Option( "--excluded-mutation", help="Excluded mutations in eligibility criteria (can specify multiple)", ), ] = None, biomarker: Annotated[ list[str] | None, typer.Option( "--biomarker", help="Biomarker expression requirements in format 'MARKER:EXPRESSION' (e.g., 'PD-L1:≥50%')", ), ] = None, line_of_therapy: Annotated[ LineOfTherapy | None, typer.Option( "--line-of-therapy", help="Line of therapy filter", show_choices=True, show_default=True, case_sensitive=False, ), ] = None, allow_brain_mets: Annotated[ bool | None, typer.Option( "--allow-brain-mets/--no-brain-mets", help="Whether to allow trials that accept brain metastases", ), ] = None, return_field: Annotated[ list[str] | None, typer.Option( "--return-field", help="Specific fields to return in the response (can specify multiple)", ), ] = None, page_size: Annotated[ int | None, typer.Option( "--page-size", help="Number of results per page (1-1000)", min=1, max=1000, ), ] = None, source: Annotated[ str, typer.Option( "--source", help="Data source: 'clinicaltrials' (default) or 'nci'", 
show_choices=True, ), ] = "clinicaltrials", api_key: Annotated[ str | None, typer.Option( "--api-key", help="NCI API key (required if source='nci', overrides NCI_API_KEY env var)", envvar="NCI_API_KEY", ), ] = None, ): """Search for clinical trials from ClinicalTrials.gov or NCI CTS API.""" # Parse biomarker expression from CLI format biomarker_expression = None if biomarker: biomarker_expression = {} for item in biomarker: if ":" in item: marker, expr = item.split(":", 1) biomarker_expression[marker] = expr query = TrialQuery( conditions=condition, interventions=intervention, terms=term, nct_ids=nct_id, recruiting_status=recruiting_status, study_type=study_type, phase=phase, sort=sort_order, age_group=age_group, primary_purpose=primary_purpose, min_date=min_date, max_date=max_date, date_field=date_field, intervention_type=intervention_type, sponsor_type=sponsor_type, study_design=study_design, next_page_hash=next_page_hash, lat=latitude, long=longitude, distance=distance, prior_therapies=prior_therapy, progression_on=progression_on, required_mutations=required_mutation, excluded_mutations=excluded_mutation, biomarker_expression=biomarker_expression, line_of_therapy=line_of_therapy, allow_brain_mets=allow_brain_mets, return_fields=return_field, page_size=page_size, ) # Import here to avoid circular imports from ..trials.search import search_trials_unified # Check if NCI source requires API key if source == "nci" and not api_key: from ..integrations.cts_api import get_api_key_instructions typer.echo(get_api_key_instructions()) raise typer.Exit(1) result = asyncio.run( search_trials_unified( query, source=source, api_key=api_key, output_json=output_json ) ) typer.echo(result) ``` -------------------------------------------------------------------------------- /tests/tdd/openfda/test_drug_approvals.py: -------------------------------------------------------------------------------- ```python """Tests for FDA drug approval search and retrieval.""" from unittest.mock 
import patch import pytest from biomcp.openfda.drug_approvals import ( get_drug_approval, search_drug_approvals, ) class TestDrugApprovals: """Test FDA drug approval functions.""" @pytest.mark.asyncio async def test_search_drug_approvals_success(self): """Test successful drug approval search.""" mock_response = { "meta": {"results": {"skip": 0, "limit": 10, "total": 2}}, "results": [ { "application_number": "BLA125514", "openfda": { "brand_name": ["KEYTRUDA"], "generic_name": ["PEMBROLIZUMAB"], }, "products": [ { "brand_name": "KEYTRUDA", "dosage_form": "INJECTION", "strength": "100MG/4ML", "marketing_status": "Prescription", } ], "sponsor_name": "MERCK SHARP DOHME", "submissions": [ { "submission_type": "ORIG", "submission_number": "1", "submission_status": "AP", "submission_status_date": "20140904", "review_priority": "PRIORITY", } ], }, { "application_number": "NDA208716", "openfda": { "brand_name": ["VENCLEXTA"], "generic_name": ["VENETOCLAX"], }, "products": [ { "brand_name": "VENCLEXTA", "dosage_form": "TABLET", "strength": "100MG", "marketing_status": "Prescription", } ], "sponsor_name": "ABBVIE INC", "submissions": [ { "submission_type": "ORIG", "submission_number": "1", "submission_status": "AP", "submission_status_date": "20160411", "review_priority": "PRIORITY", } ], }, ], } with patch( "biomcp.openfda.drug_approvals.make_openfda_request" ) as mock_request: mock_request.return_value = (mock_response, None) result = await search_drug_approvals( drug="pembrolizumab", limit=10 ) # Check that result contains expected drug names assert "KEYTRUDA" in result assert "PEMBROLIZUMAB" in result assert "BLA125514" in result assert "MERCK" in result # Check for disclaimer assert "FDA Data Notice" in result # Check summary statistics assert "Total Records Found**: 2 records" in result @pytest.mark.asyncio async def test_search_drug_approvals_no_results(self): """Test drug approval search with no results.""" mock_response = { "meta": {"results": {"skip": 0, "limit": 
10, "total": 0}}, "results": [], } with patch( "biomcp.openfda.drug_approvals.make_openfda_request" ) as mock_request: mock_request.return_value = (mock_response, None) result = await search_drug_approvals( drug="nonexistentdrug123", limit=10 ) assert "No drug approval records found" in result @pytest.mark.asyncio async def test_search_drug_approvals_api_error(self): """Test drug approval search with API error.""" with patch( "biomcp.openfda.drug_approvals.make_openfda_request" ) as mock_request: mock_request.return_value = (None, "API rate limit exceeded") result = await search_drug_approvals(drug="pembrolizumab") assert "Error searching drug approvals" in result assert "API rate limit exceeded" in result @pytest.mark.asyncio async def test_get_drug_approval_success(self): """Test successful retrieval of specific drug approval.""" mock_response = { "results": [ { "application_number": "BLA125514", "openfda": { "brand_name": ["KEYTRUDA"], "generic_name": ["PEMBROLIZUMAB"], "manufacturer_name": ["MERCK SHARP & DOHME CORP."], "substance_name": ["PEMBROLIZUMAB"], "product_type": ["HUMAN PRESCRIPTION DRUG"], }, "sponsor_name": "MERCK SHARP DOHME", "products": [ { "product_number": "001", "brand_name": "KEYTRUDA", "dosage_form": "INJECTION", "strength": "100MG/4ML", "marketing_status": "Prescription", "te_code": "AB", } ], "submissions": [ { "submission_type": "ORIG", "submission_number": "1", "submission_status": "AP", "submission_status_date": "20140904", "submission_class_code": "N", "review_priority": "PRIORITY", "submission_public_notes": "APPROVAL FOR ADVANCED MELANOMA", }, { "submission_type": "SUPPL", "submission_number": "2", "submission_status": "AP", "submission_status_date": "20151002", "submission_class_code": "S", "review_priority": "PRIORITY", "submission_public_notes": "NSCLC INDICATION", }, ], } ] } with patch( "biomcp.openfda.drug_approvals.make_openfda_request" ) as mock_request: mock_request.return_value = (mock_response, None) result = await 
get_drug_approval("BLA125514") # Check basic information assert "BLA125514" in result assert "KEYTRUDA" in result assert "PEMBROLIZUMAB" in result assert "MERCK" in result # Check product details assert "100MG/4ML" in result assert "INJECTION" in result # Check submission history assert "20140904" in result # Submission date assert "20151002" in result # Second submission date assert "PRIORITY" in result # Check disclaimer assert "FDA Data Notice" in result @pytest.mark.asyncio async def test_get_drug_approval_not_found(self): """Test retrieval of non-existent drug approval.""" mock_response = {"results": []} with patch( "biomcp.openfda.drug_approvals.make_openfda_request" ) as mock_request: mock_request.return_value = (mock_response, None) result = await get_drug_approval("INVALID123") assert "No approval record found" in result assert "INVALID123" in result @pytest.mark.asyncio async def test_search_with_application_type_filter(self): """Test drug approval search with application type filter.""" mock_response = { "meta": {"results": {"skip": 0, "limit": 10, "total": 5}}, "results": [ { "application_number": "BLA125514", "openfda": { "brand_name": ["KEYTRUDA"], "generic_name": ["PEMBROLIZUMAB"], }, "sponsor_name": "MERCK SHARP DOHME", "submissions": [ { "submission_type": "ORIG", "submission_status": "AP", "submission_status_date": "20140904", } ], } ] * 5, # Simulate 5 BLA results } with patch( "biomcp.openfda.drug_approvals.make_openfda_request" ) as mock_request: mock_request.return_value = (mock_response, None) # Test with a specific application number pattern result = await search_drug_approvals( application_number="BLA125514", limit=10 ) # Just check that results are returned assert "Total Records Found**: 5 records" in result assert "BLA125514" in result @pytest.mark.asyncio async def test_search_with_sponsor_filter(self): """Test drug approval search with sponsor filter.""" mock_response = { "meta": {"results": {"skip": 0, "limit": 10, "total": 3}}, 
"results": [ { "application_number": "NDA123456", "sponsor_name": "PFIZER INC", "openfda": {"brand_name": ["DRUG1"]}, }, { "application_number": "NDA789012", "sponsor_name": "PFIZER INC", "openfda": {"brand_name": ["DRUG2"]}, }, ], } with patch( "biomcp.openfda.drug_approvals.make_openfda_request" ) as mock_request: mock_request.return_value = (mock_response, None) # Test with a drug name instead of sponsor result = await search_drug_approvals( drug="pembrolizumab", limit=10 ) # Just check that results are returned assert "PFIZER INC" in result assert "Total Records Found**: 3 records" in result def test_validate_approval_response(self): """Test validation of drug approval response structure.""" from biomcp.openfda.validation import validate_fda_response # Valid response valid_response = { "results": [ {"application_number": "BLA125514", "sponsor_name": "MERCK"} ] } assert validate_fda_response(valid_response) is True # Invalid response (not a dict) from biomcp.openfda.exceptions import OpenFDAValidationError with pytest.raises(OpenFDAValidationError): validate_fda_response("not a dict") # Response missing results empty_response = {} assert ( validate_fda_response(empty_response) is True ) # Should handle gracefully @pytest.mark.asyncio async def test_rate_limit_handling(self): """Test handling of FDA API rate limits.""" with patch( "biomcp.openfda.drug_approvals.make_openfda_request" ) as mock_request: # First call returns rate limit error mock_request.side_effect = [ (None, "429 Too Many Requests"), ( { # Second call succeeds after retry "meta": {"results": {"total": 1}}, "results": [{"application_number": "NDA123456"}], }, None, ), ] result = await search_drug_approvals(drug="test") # Should retry and eventually succeed assert mock_request.call_count >= 1 # Result should be from successful retry if "NDA123456" in result: assert "NDA123456" in result else: # Or should show rate limit error if retries exhausted assert "429" in result.lower() or "too many" in 
result.lower() ``` -------------------------------------------------------------------------------- /src/biomcp/variants/cbioportal_mutations.py: -------------------------------------------------------------------------------- ```python """cBioPortal mutation-specific search functionality.""" import logging from collections import Counter, defaultdict from typing import Any, cast from pydantic import BaseModel, Field from ..utils.cancer_types_api import get_cancer_type_client from ..utils.cbio_http_adapter import CBioHTTPAdapter from ..utils.gene_validator import is_valid_gene_symbol, sanitize_gene_symbol from ..utils.metrics import track_api_call from ..utils.mutation_filter import MutationFilter from ..utils.request_cache import request_cache logger = logging.getLogger(__name__) class MutationHit(BaseModel): """A specific mutation occurrence in a study.""" study_id: str molecular_profile_id: str protein_change: str mutation_type: str start_position: int | None = None end_position: int | None = None reference_allele: str | None = None variant_allele: str | None = None sample_id: str | None = None class StudyMutationSummary(BaseModel): """Summary of mutations in a specific study.""" study_id: str study_name: str cancer_type: str mutation_count: int sample_count: int = 0 mutations: list[str] = Field(default_factory=list) class MutationSearchResult(BaseModel): """Result of a mutation-specific search.""" gene: str specific_mutation: str | None = None pattern: str | None = None total_studies: int = 0 studies_with_mutation: int = 0 total_mutations: int = 0 top_studies: list[StudyMutationSummary] = Field(default_factory=list) mutation_types: dict[str, int] = Field(default_factory=dict) class CBioPortalMutationClient: """Client for mutation-specific searches in cBioPortal.""" def __init__(self): """Initialize the mutation search client.""" self.http_adapter = CBioHTTPAdapter() async def __aenter__(self): """Async context manager entry.""" return self async def 
__aexit__(self, exc_type, exc_val, exc_tb): """Async context manager exit.""" pass # No cleanup needed with centralized client @request_cache(ttl=1800) # Cache for 30 minutes @track_api_call("cbioportal_mutation_search") async def search_specific_mutation( self, gene: str, mutation: str | None = None, pattern: str | None = None, max_studies: int = 20, ) -> MutationSearchResult | None: """Search for specific mutations across all cBioPortal studies. Args: gene: Gene symbol (e.g., "SRSF2") mutation: Specific mutation (e.g., "F57Y") pattern: Pattern to match (e.g., "F57" for F57*) max_studies: Maximum number of top studies to return Returns: Detailed mutation search results or None if not found """ # Validate gene if not is_valid_gene_symbol(gene): logger.warning(f"Invalid gene symbol: {gene}") return None gene = sanitize_gene_symbol(gene) try: return await self._search_mutations_with_adapter( gene, mutation, pattern, max_studies ) except TimeoutError: logger.error(f"Timeout searching mutations for {gene}") return None except Exception as e: logger.error(f"Error searching mutations for {gene}: {e}") return None async def _search_mutations_with_adapter( self, gene: str, mutation: str | None, pattern: str | None, max_studies: int, ) -> MutationSearchResult | None: """Perform the actual mutation search with the adapter.""" # Get gene info gene_data, error = await self.http_adapter.get( f"/genes/{gene}", endpoint_key="cbioportal_genes" ) if error or not gene_data: logger.warning(f"Gene {gene} not found in cBioPortal") return None entrez_id = gene_data.get("entrezGeneId") if not entrez_id: logger.warning(f"No Entrez ID found for gene {gene}") return None # Get all mutation profiles logger.info(f"Fetching mutation profiles for {gene}") all_profiles, prof_error = await self.http_adapter.get( "/molecular-profiles", params={"molecularAlterationType": "MUTATION_EXTENDED"}, endpoint_key="cbioportal_molecular_profiles", ) if prof_error or not all_profiles: logger.error("Failed to 
fetch molecular profiles") return None profile_ids = [p["molecularProfileId"] for p in all_profiles] # Batch fetch mutations (this is the slow part) logger.info( f"Fetching mutations for {gene} across {len(profile_ids)} profiles" ) mutations = await self._fetch_all_mutations(profile_ids, entrez_id) if not mutations: logger.info(f"No mutations found for {gene}") return MutationSearchResult(gene=gene) # Filter mutations based on criteria mutation_filter = MutationFilter(mutation, pattern) filtered_mutations = mutation_filter.filter_mutations(mutations) # Get study information studies_info = await self._get_studies_info() # Aggregate results by study study_mutations = self._aggregate_by_study( cast(list[MutationHit], filtered_mutations), studies_info ) # Sort by mutation count and take top studies top_studies = sorted( study_mutations.values(), key=lambda x: x.mutation_count, reverse=True, )[:max_studies] # Count mutation types mutation_types = Counter(m.protein_change for m in filtered_mutations) return MutationSearchResult( gene=gene, specific_mutation=mutation, pattern=pattern, total_studies=len(all_profiles), studies_with_mutation=len(study_mutations), total_mutations=len(filtered_mutations), top_studies=top_studies, mutation_types=dict(mutation_types.most_common(10)), ) @track_api_call("cbioportal_fetch_mutations") async def _fetch_all_mutations( self, profile_ids: list[str], entrez_id: int, ) -> list[MutationHit]: """Fetch all mutations for a gene across all profiles.""" try: raw_mutations, error = await self.http_adapter.post( "/mutations/fetch", data={ "molecularProfileIds": profile_ids, "entrezGeneIds": [entrez_id], }, endpoint_key="cbioportal_mutations", cache_ttl=1800, # Cache for 30 minutes ) if error or not raw_mutations: logger.error(f"Failed to fetch mutations: {error}") return [] # Convert to MutationHit objects mutations = [] for mut in raw_mutations: try: # Extract study ID from molecular profile ID study_id = mut.get("molecularProfileId", 
"").replace( "_mutations", "" ) mutations.append( MutationHit( study_id=study_id, molecular_profile_id=mut.get( "molecularProfileId", "" ), protein_change=mut.get("proteinChange", ""), mutation_type=mut.get("mutationType", ""), start_position=mut.get("startPosition"), end_position=mut.get("endPosition"), reference_allele=mut.get("referenceAllele"), variant_allele=mut.get("variantAllele"), sample_id=mut.get("sampleId"), ) ) except Exception as e: logger.debug(f"Failed to parse mutation: {e}") continue return mutations except Exception as e: logger.error(f"Error fetching mutations: {e}") return [] async def _get_studies_info(self) -> dict[str, dict[str, Any]]: """Get information about all studies.""" try: studies, error = await self.http_adapter.get( "/studies", endpoint_key="cbioportal_studies", cache_ttl=3600, # Cache for 1 hour ) if error or not studies: return {} study_info = {} cancer_type_client = get_cancer_type_client() for s in studies: cancer_type_id = s.get("cancerTypeId", "") if cancer_type_id and cancer_type_id != "unknown": # Use the API to get the proper display name cancer_type = ( await cancer_type_client.get_cancer_type_name( cancer_type_id ) ) else: # Try to get from full study info cancer_type = ( await cancer_type_client.get_study_cancer_type( s["studyId"] ) ) study_info[s["studyId"]] = { "name": s.get("name", ""), "cancer_type": cancer_type, } return study_info except Exception as e: logger.error(f"Error fetching studies: {e}") return {} def _aggregate_by_study( self, mutations: list[MutationHit], studies_info: dict[str, dict[str, Any]], ) -> dict[str, StudyMutationSummary]: """Aggregate mutations by study.""" study_mutations = defaultdict(list) study_samples = defaultdict(set) for mut in mutations: study_id = mut.study_id study_mutations[study_id].append(mut.protein_change) if mut.sample_id: study_samples[study_id].add(mut.sample_id) # Create summaries summaries = {} for study_id, mutations_list in study_mutations.items(): info = 
studies_info.get(study_id, {}) summaries[study_id] = StudyMutationSummary( study_id=study_id, study_name=info.get("name", study_id), cancer_type=info.get("cancer_type", "unknown"), mutation_count=len(mutations_list), sample_count=len(study_samples[study_id]), mutations=list(set(mutations_list))[ :5 ], # Top 5 unique mutations ) return summaries def format_mutation_search_result(result: MutationSearchResult) -> str: """Format mutation search results as markdown.""" lines = [f"### cBioPortal Mutation Search: {result.gene}"] if result.specific_mutation: lines.append(f"**Specific Mutation**: {result.specific_mutation}") elif result.pattern: lines.append(f"**Pattern**: {result.pattern}") lines.extend([ f"- **Total Studies**: {result.total_studies}", f"- **Studies with Mutation**: {result.studies_with_mutation}", f"- **Total Mutations Found**: {result.total_mutations}", ]) if result.top_studies: lines.append("\n**Top Studies by Mutation Count:**") lines.append("| Count | Study ID | Cancer Type | Study Name |") lines.append("|-------|----------|-------------|------------|") for study in result.top_studies[:10]: study_id = ( study.study_id[:20] + "..." if len(study.study_id) > 20 else study.study_id ) study_name = ( study.study_name[:40] + "..." 
if len(study.study_name) > 40 else study.study_name ) lines.append( f"| {study.mutation_count:5d} | {study_id:<20} | " f"{study.cancer_type:<11} | {study_name} |" ) if result.mutation_types and len(result.mutation_types) > 1: lines.append("\n**Mutation Types Found:**") for mut_type, count in list(result.mutation_types.items())[:5]: lines.append(f"- {mut_type}: {count} occurrences") return "\n".join(lines) ``` -------------------------------------------------------------------------------- /src/biomcp/router_handlers.py: -------------------------------------------------------------------------------- ```python """Domain-specific search handlers for the router module.""" import json import logging from typing import Any from .exceptions import ( InvalidParameterError, ResultParsingError, SearchExecutionError, ) from .parameter_parser import ParameterParser logger = logging.getLogger(__name__) async def handle_article_search( genes: list[str] | None, diseases: list[str] | None, variants: list[str] | None, chemicals: list[str] | None, keywords: list[str] | None, page: int, page_size: int, ) -> tuple[list[dict], int]: """Handle article domain search.""" logger.info("Executing article search") try: from biomcp.articles.search import PubmedRequest from biomcp.articles.unified import search_articles_unified request = PubmedRequest( chemicals=chemicals or [], diseases=diseases or [], genes=genes or [], keywords=keywords or [], variants=variants or [], ) result_str = await search_articles_unified( request, include_pubmed=True, include_preprints=True, # Changed to match individual tool default output_json=True, ) except Exception as e: logger.error(f"Article search failed: {e}") raise SearchExecutionError("article", e) from e # Parse the JSON results try: parsed_result = json.loads(result_str) # Handle unified search format (may include cBioPortal data) if isinstance(parsed_result, dict) and "articles" in parsed_result: all_results = parsed_result["articles"] # Log if 
cBioPortal data was included if "cbioportal_summary" in parsed_result: logger.info("Article search included cBioPortal summary data") elif isinstance(parsed_result, list): all_results = parsed_result else: # Handle unexpected format logger.warning( f"Unexpected article result format: {type(parsed_result)}" ) all_results = [] except (json.JSONDecodeError, TypeError) as e: logger.error(f"Failed to parse article results: {e}") raise ResultParsingError("article", e) from e # Manual pagination start = (page - 1) * page_size end = start + page_size items = all_results[start:end] total = len(all_results) logger.info( f"Article search returned {total} total results, showing {len(items)}" ) return items, total def _parse_trial_results(result_str: str) -> tuple[list[dict], int]: """Parse trial search results from JSON.""" try: result_dict = json.loads(result_str) # Handle both API v2 structure and flat structure if isinstance(result_dict, dict) and "studies" in result_dict: all_results = result_dict["studies"] elif isinstance(result_dict, list): all_results = result_dict else: all_results = [result_dict] except (json.JSONDecodeError, TypeError) as e: logger.error(f"Failed to parse trial results: {e}") raise ResultParsingError("trial", e) from e return all_results, len(all_results) async def handle_trial_search( conditions: list[str] | None, interventions: list[str] | None, keywords: list[str] | None, recruiting_status: str | None, phase: str | None, genes: list[str] | None, page: int, page_size: int, ) -> tuple[list[dict], int]: """Handle trial domain search.""" logger.info("Executing trial search") # Build the trial search parameters search_params: dict[str, Any] = {} if conditions: search_params["conditions"] = conditions if interventions: search_params["interventions"] = interventions if recruiting_status: search_params["recruiting_status"] = recruiting_status if phase: try: search_params["phase"] = ParameterParser.normalize_phase(phase) except InvalidParameterError: 
raise if keywords: search_params["keywords"] = keywords # Add gene support for trials if genes: # Convert genes to keywords for trial search if "keywords" in search_params: search_params["keywords"].extend(genes) else: search_params["keywords"] = genes try: from biomcp.trials.search import TrialQuery, search_trials # Convert search_params to TrialQuery trial_query = TrialQuery(**search_params, page_size=page_size) result_str = await search_trials(trial_query, output_json=True) except Exception as e: logger.error(f"Trial search failed: {e}") raise SearchExecutionError("trial", e) from e # Parse the JSON results all_results, total = _parse_trial_results(result_str) # Manual pagination start = (page - 1) * page_size end = start + page_size items = all_results[start:end] logger.info( f"Trial search returned {total} total results, showing {len(items)}" ) return items, total async def handle_variant_search( genes: list[str] | None, significance: str | None, keywords: list[str] | None, page: int, page_size: int, ) -> tuple[list[dict], int]: """Handle variant domain search.""" logger.info("Executing variant search") try: from biomcp.variants.search import VariantQuery, search_variants # Build query queries = [] if genes: queries.extend(genes) if keywords: queries.extend(keywords) if not queries: raise InvalidParameterError( "genes or keywords", None, "at least one search term for variant search", ) request = VariantQuery( gene=genes[0] if genes else None, size=page_size, significance=significance, ) result_str = await search_variants(request, output_json=True) except Exception as e: logger.error(f"Variant search failed: {e}") raise SearchExecutionError("variant", e) from e # Parse the JSON results try: all_results = json.loads(result_str) except (json.JSONDecodeError, TypeError) as e: logger.error(f"Failed to parse variant results: {e}") raise ResultParsingError("variant", e) from e # Variants API returns paginated results total = len(all_results) logger.info(f"Variant 
search returned {total} results") return all_results, total async def handle_nci_organization_search( name: str | None, organization_type: str | None, city: str | None, state: str | None, api_key: str | None, page: int, page_size: int, ) -> tuple[list[dict], int]: """Handle NCI organization domain search.""" logger.info("Executing NCI organization search") try: from biomcp.organizations import ( search_organizations, search_organizations_with_or, ) # Check if name contains OR query if name and (" OR " in name or " or " in name): results = await search_organizations_with_or( name_query=name, org_type=organization_type, city=city, state=state, page_size=page_size, page=page, api_key=api_key, ) else: results = await search_organizations( name=name, org_type=organization_type, city=city, state=state, page_size=page_size, page=page, api_key=api_key, ) items = results.get("organizations", []) total = results.get("total", len(items)) except Exception as e: logger.error(f"NCI organization search failed: {e}") raise SearchExecutionError("nci_organization", e) from e logger.info(f"NCI organization search returned {total} results") return items, total async def handle_nci_intervention_search( name: str | None, intervention_type: str | None, synonyms: bool, api_key: str | None, page: int, page_size: int, ) -> tuple[list[dict], int]: """Handle NCI intervention domain search.""" logger.info("Executing NCI intervention search") try: from biomcp.interventions import ( search_interventions, search_interventions_with_or, ) # Check if name contains OR query if name and (" OR " in name or " or " in name): results = await search_interventions_with_or( name_query=name, intervention_type=intervention_type, synonyms=synonyms, page_size=page_size, page=page, api_key=api_key, ) else: results = await search_interventions( name=name, intervention_type=intervention_type, synonyms=synonyms, page_size=page_size, page=page, api_key=api_key, ) items = results.get("interventions", []) total = 
results.get("total", len(items)) except Exception as e: logger.error(f"NCI intervention search failed: {e}") raise SearchExecutionError("nci_intervention", e) from e logger.info(f"NCI intervention search returned {total} results") return items, total async def handle_nci_biomarker_search( name: str | None, gene: str | None, biomarker_type: str | None, assay_type: str | None, api_key: str | None, page: int, page_size: int, ) -> tuple[list[dict], int]: """Handle NCI biomarker domain search.""" logger.info("Executing NCI biomarker search") try: from biomcp.biomarkers import ( search_biomarkers, search_biomarkers_with_or, ) # Check if name contains OR query if name and (" OR " in name or " or " in name): results = await search_biomarkers_with_or( name_query=name, eligibility_criterion=gene, # Map gene to eligibility_criterion biomarker_type=biomarker_type, assay_purpose=assay_type, # Map assay_type to assay_purpose page_size=page_size, page=page, api_key=api_key, ) else: results = await search_biomarkers( name=name, eligibility_criterion=gene, # Map gene to eligibility_criterion biomarker_type=biomarker_type, assay_purpose=assay_type, # Map assay_type to assay_purpose page_size=page_size, page=page, api_key=api_key, ) items = results.get("biomarkers", []) total = results.get("total", len(items)) except Exception as e: logger.error(f"NCI biomarker search failed: {e}") raise SearchExecutionError("nci_biomarker", e) from e logger.info(f"NCI biomarker search returned {total} results") return items, total async def handle_nci_disease_search( name: str | None, include_synonyms: bool, category: str | None, api_key: str | None, page: int, page_size: int, ) -> tuple[list[dict], int]: """Handle NCI disease domain search.""" logger.info("Executing NCI disease search") try: from biomcp.diseases import search_diseases, search_diseases_with_or # Check if name contains OR query if name and (" OR " in name or " or " in name): results = await search_diseases_with_or( name_query=name, 
include_synonyms=include_synonyms, category=category, page_size=page_size, page=page, api_key=api_key, ) else: results = await search_diseases( name=name, include_synonyms=include_synonyms, category=category, page_size=page_size, page=page, api_key=api_key, ) items = results.get("diseases", []) total = results.get("total", len(items)) except Exception as e: logger.error(f"NCI disease search failed: {e}") raise SearchExecutionError("nci_disease", e) from e logger.info(f"NCI disease search returned {total} results") return items, total ``` -------------------------------------------------------------------------------- /docs/apis/python-sdk.md: -------------------------------------------------------------------------------- ```markdown # Python Package Reference The BioMCP Python package provides direct access to biomedical data search and retrieval functions through modular domain-specific APIs. ## Installation ```bash pip install biomcp-python ``` ## Quick Start ```python import asyncio from biomcp.variants.search import search_variants, VariantQuery, ClinicalSignificance from biomcp.articles.search import search_articles, PubmedRequest from biomcp.trials.search import search_trials, TrialQuery async def main(): # Search for pathogenic variants variant_query = VariantQuery( gene="BRAF", significance=ClinicalSignificance.PATHOGENIC ) variants_result = await search_variants(variant_query) # Search articles article_request = PubmedRequest( genes=["BRAF"], diseases=["melanoma"] ) articles_result = await search_articles(article_request) # Search clinical trials trial_query = TrialQuery( conditions=["melanoma"], recruiting_status="RECRUITING" ) trials_result = await search_trials(trial_query) asyncio.run(main()) ``` ## API Structure The BioMCP package is organized into domain-specific modules that you import directly: ### Available Modules - **Variants**: `biomcp.variants.search` - Search genetic variants - **Articles**: `biomcp.articles.search` - Search biomedical literature -
**Trials**: `biomcp.trials.search` - Search clinical trials - **Genes**: `biomcp.genes` - Get gene information - **Diseases**: `biomcp.diseases` - Get disease information - **Drugs**: `biomcp.drugs` - Get drug information ### Import Patterns ```python # Variants from biomcp.variants.search import search_variants, VariantQuery, ClinicalSignificance from biomcp.variants.getter import get_variant from biomcp.variants.alphagenome import predict_variant_effects # Articles from biomcp.articles.search import search_articles, PubmedRequest # Trials from biomcp.trials.search import search_trials, TrialQuery, TrialPhase # Direct functions from biomcp.genes import get_gene from biomcp.diseases import get_disease from biomcp.drugs import get_drug ``` ## Articles API ### search_articles() Search PubMed/PubTator3 for biomedical literature. ```python from biomcp.articles.search import search_articles, PubmedRequest async def search_articles( request: PubmedRequest, output_json: bool = False ) -> str: ``` **PubmedRequest Parameters:** - `genes`: List of gene symbols (e.g., ["BRAF", "KRAS"]) - `diseases`: List of disease/condition terms - `chemicals`: List of drug/chemical names - `variants`: List of variant notations - `keywords`: Additional search keywords (supports OR with |) **Example:** ```python from biomcp.articles.search import search_articles, PubmedRequest # Basic search request = PubmedRequest( genes=["EGFR"], diseases=["lung cancer"] ) results = await search_articles(request) # Advanced search with keywords request = PubmedRequest( genes=["BRAF"], keywords=["V600E|p.V600E|resistance"], chemicals=["vemurafenib", "dabrafenib"] ) results = await search_articles(request) ``` ## Trials API ### search_trials() Search clinical trials from ClinicalTrials.gov. 
```python from biomcp.trials.search import search_trials, TrialQuery, TrialPhase, RecruitingStatus async def search_trials( query: TrialQuery, output_json: bool = False ) -> str: ``` **TrialQuery Parameters:** - `conditions`: Disease/condition terms - `interventions`: Treatment/intervention terms - `other_terms`: Additional search terms - `recruiting_status`: Trial recruiting status (use RecruitingStatus enum) - `phase`: Trial phase (use TrialPhase enum) - `study_type`: INTERVENTIONAL or OBSERVATIONAL - `lat`, `long`, `distance`: Geographic search parameters **Available Enums:** - `TrialPhase`: EARLY_PHASE1, PHASE1, PHASE2, PHASE3, PHASE4, NOT_APPLICABLE - `RecruitingStatus`: OPEN, CLOSED, ANY - `StudyType`: INTERVENTIONAL, OBSERVATIONAL, EXPANDED_ACCESS **Example:** ```python from biomcp.trials.search import search_trials, TrialQuery, TrialPhase # Basic search query = TrialQuery( conditions=["melanoma"], phase=TrialPhase.PHASE3, recruiting_status="RECRUITING" ) results = await search_trials(query) # Location-based search query = TrialQuery( conditions=["breast cancer"], lat=40.7128, long=-74.0060, distance=50 ) results = await search_trials(query) ``` ## Variants API ### search_variants() Search genetic variants in MyVariant.info. ```python from biomcp.variants.search import search_variants, VariantQuery, ClinicalSignificance async def search_variants( query: VariantQuery, output_json: bool = False, include_cbioportal: bool = True ) -> str: ``` **VariantQuery Parameters:** - `gene`: Gene symbol (e.g. BRAF, TP53) - `hgvsp`: Protein change notation (e.g., p.V600E, p.Arg557His) - `hgvsc`: cDNA notation (e.g., c.1799T>A) - `rsid`: dbSNP rsID (e.g., rs113488022) - `region`: Genomic region as chr:start-end (e.g.
chr1:12345-67890) - `significance`: ClinVar clinical significance (use ClinicalSignificance enum) - `min_frequency`, `max_frequency`: Allele frequency filters - `cadd`: Minimum CADD phred score - `polyphen`: PolyPhen-2 prediction (use PolyPhenPrediction enum) - `sift`: SIFT prediction (use SiftPrediction enum) - `sources`: Include only specific data sources - `size`: Number of results to return - `offset`: Result offset for pagination **Available Enums:** - `ClinicalSignificance`: PATHOGENIC, LIKELY_PATHOGENIC, UNCERTAIN_SIGNIFICANCE, LIKELY_BENIGN, BENIGN - `PolyPhenPrediction`: PROBABLY_DAMAGING, POSSIBLY_DAMAGING, BENIGN - `SiftPrediction`: DELETERIOUS, TOLERATED **Example:** ```python from biomcp.variants.search import search_variants, VariantQuery, ClinicalSignificance # Search pathogenic variants query = VariantQuery( gene="BRCA1", significance=ClinicalSignificance.PATHOGENIC, max_frequency=0.01 ) results = await search_variants(query) # Search by genomic region query = VariantQuery( region="chr7:140453136-140453137" ) results = await search_variants(query) # Search by protein change query = VariantQuery( gene="BRAF", hgvsp="p.V600E" ) results = await search_variants(query) ``` ### get_variant() Get detailed variant information. ```python from biomcp.variants.getter import get_variant async def get_variant( variant_id: str, output_json: bool = False, include_external: bool = False ) -> str: ``` **Parameters:** - `variant_id`: Variant identifier (HGVS, rsID, or genomic like "chr7:g.140453136A>T") - `output_json`: Return JSON format instead of markdown - `include_external`: Include external database annotations **Example:** ```python # Get by HGVS variant_info = await get_variant("chr7:g.140453136A>T") # Get by rsID variant_info = await get_variant("rs113488022") ``` ### predict_variant_effects() Predict variant effects using AlphaGenome AI. 
```python from biomcp.variants.alphagenome import predict_variant_effects async def predict_variant_effects( chromosome: str, position: int, reference: str, alternate: str, interval_size: int = 131_072, tissue_types: list[str] | None = None, significance_threshold: float = 0.5, api_key: str | None = None ) -> str: ``` **Parameters:** - `chromosome`: Chromosome (e.g., 'chr7') - `position`: 1-based genomic position - `reference`: Reference allele(s) - `alternate`: Alternate allele(s) - `interval_size`: Size of genomic context window (max 1,000,000) - `tissue_types`: UBERON tissue ontology terms for tissue-specific predictions - `significance_threshold`: Threshold for significant log2 fold changes - `api_key`: AlphaGenome API key (or set ALPHAGENOME_API_KEY env var) **Example:** ```python # Predict effects of BRAF V600E mutation prediction = await predict_variant_effects( chromosome="chr7", position=140753336, reference="A", alternate="T", api_key="your-alphagenome-api-key" ) ``` ## Direct Data APIs ### get_gene() Get gene information from MyGene.info. ```python from biomcp.genes import get_gene async def get_gene( gene_id_or_symbol: str, output_json: bool = False ) -> str: ``` **Example:** ```python gene_info = await get_gene("BRCA1") ``` ### get_disease() Get disease information from MyDisease.info. ```python from biomcp.diseases import get_disease async def get_disease( disease_id_or_name: str, output_json: bool = False ) -> str: ``` **Example:** ```python disease_info = await get_disease("melanoma") ``` ### get_drug() Get drug information from MyChem.info. 
```python from biomcp.drugs import get_drug async def get_drug( drug_id_or_name: str, output_json: bool = False ) -> str: ``` **Example:** ```python drug_info = await get_drug("imatinib") ``` ## Complete Analysis Example ```python import asyncio from biomcp.variants.search import search_variants, VariantQuery, ClinicalSignificance from biomcp.articles.search import search_articles, PubmedRequest from biomcp.trials.search import search_trials, TrialQuery, TrialPhase from biomcp.genes import get_gene async def analyze_gene_variants(gene_symbol: str, disease: str): """Complete gene variant analysis workflow.""" # 1. Get gene information gene_info = await get_gene(gene_symbol) print(f"Gene: {gene_symbol}") # 2. Search for pathogenic variants variant_query = VariantQuery( gene=gene_symbol, significance=ClinicalSignificance.PATHOGENIC, max_frequency=0.01 # Rare variants ) variants_result = await search_variants(variant_query) print(f"Found pathogenic variants for {gene_symbol}") # 3. Search related literature article_request = PubmedRequest( genes=[gene_symbol], diseases=[disease], keywords=["therapy", "treatment", "prognosis"] ) articles_result = await search_articles(article_request) print(f"Found literature on {gene_symbol} and {disease}") # 4. 
Find clinical trials trial_query = TrialQuery( conditions=[disease], other_terms=[gene_symbol, f"{gene_symbol} mutation"], phase=TrialPhase.PHASE3, recruiting_status="OPEN" ) trials_result = await search_trials(trial_query) print(f"Found trials for {disease} with {gene_symbol}") return { "gene_info": gene_info, "variants": variants_result, "articles": articles_result, "trials": trials_result } # Run the analysis results = asyncio.run(analyze_gene_variants("BRAF", "melanoma")) ``` ## LangChain Integration ```python from langchain.tools import tool from biomcp.variants.search import search_variants, VariantQuery, ClinicalSignificance from biomcp.articles.search import search_articles, PubmedRequest @tool def search_pathogenic_variants(gene: str) -> str: """Search for pathogenic variants in a specific gene.""" import asyncio async def _search(): query = VariantQuery( gene=gene, significance=ClinicalSignificance.PATHOGENIC ) return await search_variants(query) return asyncio.run(_search()) @tool def search_gene_literature(gene: str, disease: str | None = None) -> str: """Search for scientific literature about a gene and optionally a disease.""" import asyncio async def _search(): request = PubmedRequest( genes=[gene], diseases=[disease] if disease else [] ) return await search_articles(request) return asyncio.run(_search()) # Use with your LLM/agent framework tools = [search_pathogenic_variants, search_gene_literature] ``` ## Key Differences from Other Documentation ❌ **Does NOT work:** ```python from biomcp import BioMCPClient # This class doesn't exist ``` ✅ **Actually works:** ```python from biomcp.variants.search import search_variants, VariantQuery from biomcp.articles.search import search_articles, PubmedRequest from biomcp.trials.search import search_trials, TrialQuery ``` ## Summary The BioMCP package provides powerful biomedical data access through: - **Direct async functions** for each domain (variants, articles, trials, genes, diseases, drugs) - **Pydantic 
models** for type-safe queries and responses - **Comprehensive enums** for standardized values - **No unified client** - use individual domain modules directly This modular approach works well for building tools and integrating with frameworks like LangChain, as it provides direct access to specific functionality without the overhead of a unified client interface. ## Additional Resources - [MCP Tools Reference](../mcp-tools/) - [CLI Commands](../cli/) - [How-to Guides](../how-to-guides/01-find-articles-and-cbioportal-data.md) ```