This is page 2 of 15. Use http://codebase.md/genomoncology/biomcp?page={x} to view the full context. # Directory Structure ``` ├── .github │ ├── actions │ │ └── setup-python-env │ │ └── action.yml │ ├── dependabot.yml │ └── workflows │ ├── ci.yml │ ├── deploy-docs.yml │ ├── main.yml.disabled │ ├── on-release-main.yml │ └── validate-codecov-config.yml ├── .gitignore ├── .pre-commit-config.yaml ├── BIOMCP_DATA_FLOW.md ├── CHANGELOG.md ├── CNAME ├── codecov.yaml ├── docker-compose.yml ├── Dockerfile ├── docs │ ├── apis │ │ ├── error-codes.md │ │ ├── overview.md │ │ └── python-sdk.md │ ├── assets │ │ ├── biomcp-cursor-locations.png │ │ ├── favicon.ico │ │ ├── icon.png │ │ ├── logo.png │ │ ├── mcp_architecture.txt │ │ └── remote-connection │ │ ├── 00_connectors.png │ │ ├── 01_add_custom_connector.png │ │ ├── 02_connector_enabled.png │ │ ├── 03_connect_to_biomcp.png │ │ ├── 04_select_google_oauth.png │ │ └── 05_success_connect.png │ ├── backend-services-reference │ │ ├── 01-overview.md │ │ ├── 02-biothings-suite.md │ │ ├── 03-cbioportal.md │ │ ├── 04-clinicaltrials-gov.md │ │ ├── 05-nci-cts-api.md │ │ ├── 06-pubtator3.md │ │ └── 07-alphagenome.md │ ├── blog │ │ ├── ai-assisted-clinical-trial-search-analysis.md │ │ ├── images │ │ │ ├── deep-researcher-video.png │ │ │ ├── researcher-announce.png │ │ │ ├── researcher-drop-down.png │ │ │ ├── researcher-prompt.png │ │ │ ├── trial-search-assistant.png │ │ │ └── what_is_biomcp_thumbnail.png │ │ └── researcher-persona-resource.md │ ├── changelog.md │ ├── CNAME │ ├── concepts │ │ ├── 01-what-is-biomcp.md │ │ ├── 02-the-deep-researcher-persona.md │ │ └── 03-sequential-thinking-with-the-think-tool.md │ ├── developer-guides │ │ ├── 01-server-deployment.md │ │ ├── 02-contributing-and-testing.md │ │ ├── 03-third-party-endpoints.md │ │ ├── 04-transport-protocol.md │ │ ├── 05-error-handling.md │ │ ├── 06-http-client-and-caching.md │ │ ├── 07-performance-optimizations.md │ │ └── generate_endpoints.py │ ├── faq-condensed.md │ ├── 
FDA_SECURITY.md │ ├── genomoncology.md │ ├── getting-started │ │ ├── 01-quickstart-cli.md │ │ ├── 02-claude-desktop-integration.md │ │ └── 03-authentication-and-api-keys.md │ ├── how-to-guides │ │ ├── 01-find-articles-and-cbioportal-data.md │ │ ├── 02-find-trials-with-nci-and-biothings.md │ │ ├── 03-get-comprehensive-variant-annotations.md │ │ ├── 04-predict-variant-effects-with-alphagenome.md │ │ ├── 05-logging-and-monitoring-with-bigquery.md │ │ └── 06-search-nci-organizations-and-interventions.md │ ├── index.md │ ├── policies.md │ ├── reference │ │ ├── architecture-diagrams.md │ │ ├── quick-architecture.md │ │ ├── quick-reference.md │ │ └── visual-architecture.md │ ├── robots.txt │ ├── stylesheets │ │ ├── announcement.css │ │ └── extra.css │ ├── troubleshooting.md │ ├── tutorials │ │ ├── biothings-prompts.md │ │ ├── claude-code-biomcp-alphagenome.md │ │ ├── nci-prompts.md │ │ ├── openfda-integration.md │ │ ├── openfda-prompts.md │ │ ├── pydantic-ai-integration.md │ │ └── remote-connection.md │ ├── user-guides │ │ ├── 01-command-line-interface.md │ │ ├── 02-mcp-tools-reference.md │ │ └── 03-integrating-with-ides-and-clients.md │ └── workflows │ └── all-workflows.md ├── example_scripts │ ├── mcp_integration.py │ └── python_sdk.py ├── glama.json ├── LICENSE ├── lzyank.toml ├── Makefile ├── mkdocs.yml ├── package-lock.json ├── package.json ├── pyproject.toml ├── README.md ├── scripts │ ├── check_docs_in_mkdocs.py │ ├── check_http_imports.py │ └── generate_endpoints_doc.py ├── smithery.yaml ├── src │ └── biomcp │ ├── __init__.py │ ├── __main__.py │ ├── articles │ │ ├── __init__.py │ │ ├── autocomplete.py │ │ ├── fetch.py │ │ ├── preprints.py │ │ ├── search_optimized.py │ │ ├── search.py │ │ └── unified.py │ ├── biomarkers │ │ ├── __init__.py │ │ └── search.py │ ├── cbioportal_helper.py │ ├── circuit_breaker.py │ ├── cli │ │ ├── __init__.py │ │ ├── articles.py │ │ ├── biomarkers.py │ │ ├── diseases.py │ │ ├── health.py │ │ ├── interventions.py │ │ ├── main.py │ │ ├── 
openfda.py │ │ ├── organizations.py │ │ ├── server.py │ │ ├── trials.py │ │ └── variants.py │ ├── connection_pool.py │ ├── constants.py │ ├── core.py │ ├── diseases │ │ ├── __init__.py │ │ ├── getter.py │ │ └── search.py │ ├── domain_handlers.py │ ├── drugs │ │ ├── __init__.py │ │ └── getter.py │ ├── exceptions.py │ ├── genes │ │ ├── __init__.py │ │ └── getter.py │ ├── http_client_simple.py │ ├── http_client.py │ ├── individual_tools.py │ ├── integrations │ │ ├── __init__.py │ │ ├── biothings_client.py │ │ └── cts_api.py │ ├── interventions │ │ ├── __init__.py │ │ ├── getter.py │ │ └── search.py │ ├── logging_filter.py │ ├── metrics_handler.py │ ├── metrics.py │ ├── openfda │ │ ├── __init__.py │ │ ├── adverse_events_helpers.py │ │ ├── adverse_events.py │ │ ├── cache.py │ │ ├── constants.py │ │ ├── device_events_helpers.py │ │ ├── device_events.py │ │ ├── drug_approvals.py │ │ ├── drug_labels_helpers.py │ │ ├── drug_labels.py │ │ ├── drug_recalls_helpers.py │ │ ├── drug_recalls.py │ │ ├── drug_shortages_detail_helpers.py │ │ ├── drug_shortages_helpers.py │ │ ├── drug_shortages.py │ │ ├── exceptions.py │ │ ├── input_validation.py │ │ ├── rate_limiter.py │ │ ├── utils.py │ │ └── validation.py │ ├── organizations │ │ ├── __init__.py │ │ ├── getter.py │ │ └── search.py │ ├── parameter_parser.py │ ├── prefetch.py │ ├── query_parser.py │ ├── query_router.py │ ├── rate_limiter.py │ ├── render.py │ ├── request_batcher.py │ ├── resources │ │ ├── __init__.py │ │ ├── getter.py │ │ ├── instructions.md │ │ └── researcher.md │ ├── retry.py │ ├── router_handlers.py │ ├── router.py │ ├── shared_context.py │ ├── thinking │ │ ├── __init__.py │ │ ├── sequential.py │ │ └── session.py │ ├── thinking_tool.py │ ├── thinking_tracker.py │ ├── trials │ │ ├── __init__.py │ │ ├── getter.py │ │ ├── nci_getter.py │ │ ├── nci_search.py │ │ └── search.py │ ├── utils │ │ ├── __init__.py │ │ ├── cancer_types_api.py │ │ ├── cbio_http_adapter.py │ │ ├── endpoint_registry.py │ │ ├── gene_validator.py │ 
│ ├── metrics.py │ │ ├── mutation_filter.py │ │ ├── query_utils.py │ │ ├── rate_limiter.py │ │ └── request_cache.py │ ├── variants │ │ ├── __init__.py │ │ ├── alphagenome.py │ │ ├── cancer_types.py │ │ ├── cbio_external_client.py │ │ ├── cbioportal_mutations.py │ │ ├── cbioportal_search_helpers.py │ │ ├── cbioportal_search.py │ │ ├── constants.py │ │ ├── external.py │ │ ├── filters.py │ │ ├── getter.py │ │ ├── links.py │ │ └── search.py │ └── workers │ ├── __init__.py │ ├── worker_entry_stytch.js │ ├── worker_entry.js │ └── worker.py ├── tests │ ├── bdd │ │ ├── cli_help │ │ │ ├── help.feature │ │ │ └── test_help.py │ │ ├── conftest.py │ │ ├── features │ │ │ └── alphagenome_integration.feature │ │ ├── fetch_articles │ │ │ ├── fetch.feature │ │ │ └── test_fetch.py │ │ ├── get_trials │ │ │ ├── get.feature │ │ │ └── test_get.py │ │ ├── get_variants │ │ │ ├── get.feature │ │ │ └── test_get.py │ │ ├── search_articles │ │ │ ├── autocomplete.feature │ │ │ ├── search.feature │ │ │ ├── test_autocomplete.py │ │ │ └── test_search.py │ │ ├── search_trials │ │ │ ├── search.feature │ │ │ └── test_search.py │ │ ├── search_variants │ │ │ ├── search.feature │ │ │ └── test_search.py │ │ └── steps │ │ └── test_alphagenome_steps.py │ ├── config │ │ └── test_smithery_config.py │ ├── conftest.py │ ├── data │ │ ├── ct_gov │ │ │ ├── clinical_trials_api_v2.yaml │ │ │ ├── trials_NCT04280705.json │ │ │ └── trials_NCT04280705.txt │ │ ├── myvariant │ │ │ ├── myvariant_api.yaml │ │ │ ├── myvariant_field_descriptions.csv │ │ │ ├── variants_full_braf_v600e.json │ │ │ ├── variants_full_braf_v600e.txt │ │ │ └── variants_part_braf_v600_multiple.json │ │ ├── openfda │ │ │ ├── drugsfda_detail.json │ │ │ ├── drugsfda_search.json │ │ │ ├── enforcement_detail.json │ │ │ └── enforcement_search.json │ │ └── pubtator │ │ ├── pubtator_autocomplete.json │ │ └── pubtator3_paper.txt │ ├── integration │ │ ├── test_openfda_integration.py │ │ ├── test_preprints_integration.py │ │ ├── test_simple.py │ │ └── 
test_variants_integration.py │ ├── tdd │ │ ├── articles │ │ │ ├── test_autocomplete.py │ │ │ ├── test_cbioportal_integration.py │ │ │ ├── test_fetch.py │ │ │ ├── test_preprints.py │ │ │ ├── test_search.py │ │ │ └── test_unified.py │ │ ├── conftest.py │ │ ├── drugs │ │ │ ├── __init__.py │ │ │ └── test_drug_getter.py │ │ ├── openfda │ │ │ ├── __init__.py │ │ │ ├── test_adverse_events.py │ │ │ ├── test_device_events.py │ │ │ ├── test_drug_approvals.py │ │ │ ├── test_drug_labels.py │ │ │ ├── test_drug_recalls.py │ │ │ ├── test_drug_shortages.py │ │ │ └── test_security.py │ │ ├── test_biothings_integration_real.py │ │ ├── test_biothings_integration.py │ │ ├── test_circuit_breaker.py │ │ ├── test_concurrent_requests.py │ │ ├── test_connection_pool.py │ │ ├── test_domain_handlers.py │ │ ├── test_drug_approvals.py │ │ ├── test_drug_recalls.py │ │ ├── test_drug_shortages.py │ │ ├── test_endpoint_documentation.py │ │ ├── test_error_scenarios.py │ │ ├── test_europe_pmc_fetch.py │ │ ├── test_mcp_integration.py │ │ ├── test_mcp_tools.py │ │ ├── test_metrics.py │ │ ├── test_nci_integration.py │ │ ├── test_nci_mcp_tools.py │ │ ├── test_network_policies.py │ │ ├── test_offline_mode.py │ │ ├── test_openfda_unified.py │ │ ├── test_pten_r173_search.py │ │ ├── test_render.py │ │ ├── test_request_batcher.py.disabled │ │ ├── test_retry.py │ │ ├── test_router.py │ │ ├── test_shared_context.py.disabled │ │ ├── test_unified_biothings.py │ │ ├── thinking │ │ │ ├── __init__.py │ │ │ └── test_sequential.py │ │ ├── trials │ │ │ ├── test_backward_compatibility.py │ │ │ ├── test_getter.py │ │ │ └── test_search.py │ │ ├── utils │ │ │ ├── test_gene_validator.py │ │ │ ├── test_mutation_filter.py │ │ │ ├── test_rate_limiter.py │ │ │ └── test_request_cache.py │ │ ├── variants │ │ │ ├── constants.py │ │ │ ├── test_alphagenome_api_key.py │ │ │ ├── test_alphagenome_comprehensive.py │ │ │ ├── test_alphagenome.py │ │ │ ├── test_cbioportal_mutations.py │ │ │ ├── test_cbioportal_search.py │ │ │ ├── 
test_external_integration.py │ │ │ ├── test_external.py │ │ │ ├── test_extract_gene_aa_change.py │ │ │ ├── test_filters.py │ │ │ ├── test_getter.py │ │ │ ├── test_links.py │ │ │ └── test_search.py │ │ └── workers │ │ └── test_worker_sanitization.js │ └── test_pydantic_ai_integration.py ├── THIRD_PARTY_ENDPOINTS.md ├── tox.ini ├── uv.lock └── wrangler.toml
```

# Files

--------------------------------------------------------------------------------
/src/biomcp/thinking_tool.py:
--------------------------------------------------------------------------------

```python
"""Sequential thinking tool for structured problem-solving.

This module provides a dedicated MCP tool for sequential thinking,
separate from the main search functionality.
"""

from typing import Annotated

from pydantic import Field

from biomcp.core import mcp_app
from biomcp.metrics import track_performance
from biomcp.thinking.sequential import _sequential_thinking
from biomcp.thinking_tracker import mark_thinking_used


@mcp_app.tool()
@track_performance("biomcp.think")
async def think(
    thought: Annotated[
        str,
        Field(description="Current thinking step for analysis"),
    ],
    thoughtNumber: Annotated[
        int,
        Field(
            description="Current thought number, starting at 1",
            ge=1,
        ),
    ],
    totalThoughts: Annotated[
        int,
        Field(
            description="Estimated total thoughts needed for complete analysis",
            ge=1,
        ),
    ],
    nextThoughtNeeded: Annotated[
        bool,
        Field(
            description="Whether more thinking steps are needed after this one",
        ),
    ] = True,
) -> dict:
    """REQUIRED FIRST STEP: Perform structured sequential thinking for ANY biomedical research task.

    🚨 IMPORTANT: You MUST use this tool BEFORE any search or fetch operations when:
    - Researching ANY biomedical topic (genes, diseases, variants, trials)
    - Planning to use multiple BioMCP tools
    - Answering questions that require analysis or synthesis
    - Comparing information from different sources
    - Making recommendations or drawing conclusions

    ⚠️ FAILURE TO USE THIS TOOL FIRST will result in:
    - Incomplete or poorly structured analysis
    - Missing important connections between data
    - Suboptimal search strategies
    - Overlooked critical information

    Sequential thinking ensures you:
    1. Fully understand the research question
    2. Plan an optimal search strategy
    3. Identify all relevant data sources
    4. Structure your analysis properly
    5. Deliver comprehensive, well-reasoned results

    ## Usage Pattern:
    1. Start with thoughtNumber=1 to initiate analysis
    2. Progress through numbered thoughts sequentially
    3. Adjust totalThoughts estimate as understanding develops
    4. Set nextThoughtNeeded=False only when analysis is complete

    ## Example:
    ```python
    # Initial analysis
    await think(
        thought="Breaking down the relationship between BRAF mutations and melanoma treatment resistance...",
        thoughtNumber=1,
        totalThoughts=5,
        nextThoughtNeeded=True
    )

    # Continue analysis
    await think(
        thought="Examining specific BRAF V600E mutation mechanisms...",
        thoughtNumber=2,
        totalThoughts=5,
        nextThoughtNeeded=True
    )

    # Final thought
    await think(
        thought="Synthesizing findings and proposing research directions...",
        thoughtNumber=5,
        totalThoughts=5,
        nextThoughtNeeded=False
    )
    ```

    ## Important Notes:
    - Each thought builds on previous ones within a session
    - State is maintained throughout the MCP session
    - Use thoughtful, detailed analysis in each step
    - Revisions and branching are supported through the underlying implementation
    """
    # Mark that thinking has been used
    mark_thinking_used()

    result = await _sequential_thinking(
        thought=thought,
        thoughtNumber=thoughtNumber,
        totalThoughts=totalThoughts,
        nextThoughtNeeded=nextThoughtNeeded,
    )

    return {
        "domain": "thinking",
        "result": result,
        "thoughtNumber": thoughtNumber,
        "nextThoughtNeeded": nextThoughtNeeded,
    }
```

--------------------------------------------------------------------------------
/tests/tdd/variants/test_search.py:
--------------------------------------------------------------------------------

```python
import pytest

from biomcp.variants.search import (
    ClinicalSignificance,
    PolyPhenPrediction,
    SiftPrediction,
    VariantQuery,
    build_query_string,
    search_variants,
)


@pytest.fixture
def basic_query():
    """Create a basic gene query."""
    return VariantQuery(gene="BRAF")


@pytest.fixture
def complex_query():
    """Create a complex query with multiple parameters."""
    return VariantQuery(
        gene="BRCA1",
        significance=ClinicalSignificance.PATHOGENIC,
        min_frequency=0.0001,
        max_frequency=0.01,
    )


def test_query_validation():
    """Test VariantQuery model validation."""
    # Test basic query with gene
    query = VariantQuery(gene="BRAF")
    assert query.gene == "BRAF"

    # Test query with rsid
    query = VariantQuery(rsid="rs113488022")
    assert query.rsid == "rs113488022"

    # Test query requires at least one search parameter
    with pytest.raises(ValueError):
        VariantQuery()

    # Test query with clinical significance enum requires a search parameter
    query = VariantQuery(
        gene="BRCA1", significance=ClinicalSignificance.PATHOGENIC
    )
    assert query.significance == ClinicalSignificance.PATHOGENIC

    # Test query with prediction scores
    query = VariantQuery(
        gene="TP53",
        polyphen=PolyPhenPrediction.PROBABLY_DAMAGING,
        sift=SiftPrediction.DELETERIOUS,
    )
    assert query.polyphen == PolyPhenPrediction.PROBABLY_DAMAGING
    assert query.sift == SiftPrediction.DELETERIOUS


def test_build_query_string():
    """Test build_query_string function."""
    # Test single field
    query = VariantQuery(gene="BRAF")
    q_string = build_query_string(query)
    assert 'dbnsfp.genename:"BRAF"' in q_string

    # Test multiple fields
    query = VariantQuery(gene="BRAF", rsid="rs113488022")
    q_string = build_query_string(query)
    assert 'dbnsfp.genename:"BRAF"' in q_string
    assert "rs113488022" in q_string

    # Test genomic region
    query = VariantQuery(region="chr7:140753300-140753400")
    q_string = build_query_string(query)
    assert "chr7:140753300-140753400" in q_string

    # Test clinical significance
    query = VariantQuery(significance=ClinicalSignificance.LIKELY_BENIGN)
    q_string = build_query_string(query)
    assert 'clinvar.rcv.clinical_significance:"likely benign"' in q_string

    # Test frequency filters
    query = VariantQuery(min_frequency=0.0001, max_frequency=0.01)
    q_string = build_query_string(query)
    assert "gnomad_exome.af.af:>=0.0001" in q_string
    assert "gnomad_exome.af.af:<=0.01" in q_string


async def test_search_variants_basic(basic_query, anyio_backend):
    """Test search_variants function with a basic query."""
    # Use a real API query for a common gene
    result = await search_variants(basic_query)

    # Verify we got sensible results
    assert "BRAF" in result
    assert not result.startswith("Error")


async def test_search_variants_complex(complex_query, anyio_backend):
    """Test search_variants function with a complex query."""
    # Use a simple common query that will return results
    simple_query = VariantQuery(gene="TP53")
    result = await search_variants(simple_query)

    # Verify response formatting
    assert not result.startswith("Error")


async def test_search_variants_no_results(anyio_backend):
    """Test search_variants function with a query that returns no results."""
    query = VariantQuery(gene="UNKNOWN_XYZ")
    result = await search_variants(query, output_json=True)
    assert result == "[]"


async def test_search_variants_with_limit(anyio_backend):
    """Test search_variants function with size limit."""
    # Query with a small limit
    query = VariantQuery(gene="TP53", size=3)
    result = await search_variants(query)

    # Result should be valid but limited
    assert not result.startswith("Error")
```

--------------------------------------------------------------------------------
/tests/tdd/test_offline_mode.py:
--------------------------------------------------------------------------------

```python
"""Tests for offline mode functionality."""

import os
from unittest.mock import patch

import pytest

from biomcp.http_client import RequestError, request_api


@pytest.mark.asyncio
async def test_offline_mode_blocks_requests():
    """Test that offline mode prevents HTTP requests."""
    # Set offline mode
    with patch.dict(os.environ, {"BIOMCP_OFFLINE": "true"}):
        # Try to make a request
        result, error = await request_api(
            url="https://api.example.com/test",
            request={"test": "data"},
            cache_ttl=0,  # Disable caching for this test
        )

        # Should get an error
        assert result is None
        assert error is not None
        assert isinstance(error, RequestError)
        assert error.code == 503
        assert "Offline mode enabled" in error.message


@pytest.mark.asyncio
async def test_offline_mode_allows_cached_responses():
    """Test that offline mode still returns cached responses."""
    # First, cache a response (with offline mode disabled)
    with (
        patch.dict(os.environ, {"BIOMCP_OFFLINE": "false"}),
        patch("biomcp.http_client.call_http") as mock_call,
    ):
        mock_call.return_value = (200, '{"data": "cached"}')

        # Make a request to cache it
        result, error = await request_api(
            url="https://api.example.com/cached",
            request={"test": "data"},
            cache_ttl=3600,  # Cache for 1 hour
        )

        assert result == {"data": "cached"}
        assert error is None

    # Now enable offline mode
    with patch.dict(os.environ, {"BIOMCP_OFFLINE": "true"}):
        # Try to get the same request - should return cached result
        result, error = await request_api(
            url="https://api.example.com/cached",
            request={"test": "data"},
            cache_ttl=3600,
        )

        # Should get the cached response
        assert result == {"data": "cached"}
        assert error is None


@pytest.mark.asyncio
async def test_offline_mode_case_insensitive():
    """Test that offline mode environment variable is case insensitive."""
    test_values = ["TRUE", "True", "1", "yes", "YES", "Yes"]

    for value in test_values:
        with patch.dict(os.environ, {"BIOMCP_OFFLINE": value}):
            result, error = await request_api(
                url="https://api.example.com/test",
                request={"test": "data"},
                cache_ttl=0,
            )

            assert result is None
            assert error is not None
            assert error.code == 503
            assert "Offline mode enabled" in error.message


@pytest.mark.asyncio
async def test_offline_mode_disabled_by_default():
    """Test that offline mode is disabled by default."""
    # Clear the environment variable
    with (
        patch.dict(os.environ, {}, clear=True),
        patch("biomcp.http_client.call_http") as mock_call,
    ):
        mock_call.return_value = (200, '{"data": "response"}')

        result, error = await request_api(
            url="https://api.example.com/test",
            request={"test": "data"},
            cache_ttl=0,
        )

        # Should make the request successfully
        assert result == {"data": "response"}
        assert error is None
        mock_call.assert_called_once()


@pytest.mark.asyncio
async def test_offline_mode_with_endpoint_tracking():
    """Test that offline mode works with endpoint tracking."""
    with patch.dict(os.environ, {"BIOMCP_OFFLINE": "true"}):
        result, error = await request_api(
            url="https://www.ncbi.nlm.nih.gov/research/pubtator3-api/search/",
            request={"text": "BRAF"},
            endpoint_key="pubtator3_search",
            cache_ttl=0,
        )

        assert result is None
        assert error is not None
        assert error.code == 503
        assert "pubtator3-api/search/" in error.message
```

--------------------------------------------------------------------------------
/src/biomcp/variants/links.py:
--------------------------------------------------------------------------------

```python
"""Functions for adding database links to variant data."""

from typing import Any


def _calculate_vcf_end(variant: dict[str, Any]) -> int:
    """Calculate the end position for UCSC Genome Browser link."""
    if "vcf" not in variant:
        return 0

    vcf = variant["vcf"]
    pos = int(vcf.get("position", 0))
    ref = vcf.get("ref", "")
    alt = vcf.get("alt", "")

    # For insertions/deletions, handle special cases
    if not ref and alt:  # insertion
        return pos + 1
    elif ref and not alt:  # deletion
        return pos + len(ref)
    else:  # substitution
        return pos + max(0, ((len(alt) + 1) - len(ref)))


def _get_first_value(data: Any) -> Any:
    """Get the first value from a list or return the value itself."""
    if isinstance(data, list) and data:
        return data[0]
    return data


def _ensure_url_section(variant: dict[str, Any]) -> None:
    """Ensure the URL section exists in the variant."""
    if "url" not in variant:
        variant["url"] = {}


def _add_dbsnp_links(variant: dict[str, Any]) -> None:
    """Add dbSNP and Ensembl links if rsid is present."""
    if "dbsnp" in variant and variant["dbsnp"].get("rsid"):
        variant["dbsnp"]["url"] = (
            f"https://www.ncbi.nlm.nih.gov/snp/{variant['dbsnp']['rsid']}"
        )
        _ensure_url_section(variant)
        variant["url"]["ensembl"] = (
            f"https://ensembl.org/Homo_sapiens/Variation/Explore?v={variant['dbsnp']['rsid']}"
        )


def _add_clinvar_link(variant: dict[str, Any]) -> None:
    """Add ClinVar link if variant_id is present."""
    if "clinvar" in variant and variant["clinvar"].get("variant_id"):
        variant["clinvar"]["url"] = (
            f"https://www.ncbi.nlm.nih.gov/clinvar/variation/{variant['clinvar']['variant_id']}/"
        )


def _add_cosmic_link(variant: dict[str, Any]) -> None:
    """Add COSMIC link if cosmic_id is present."""
    if "cosmic" in variant and variant["cosmic"].get("cosmic_id"):
        variant["cosmic"]["url"] = (
            f"https://cancer.sanger.ac.uk/cosmic/mutation/overview?id={variant['cosmic']['cosmic_id']}"
        )


def _add_civic_link(variant: dict[str, Any]) -> None:
    """Add CIViC link if id is present."""
    if "civic" in variant and variant["civic"].get("id"):
        variant["civic"]["url"] = (
            f"https://civicdb.org/variants/{variant['civic']['id']}/summary"
        )


def _add_ucsc_link(variant: dict[str, Any]) -> None:
    """Add UCSC Genome Browser link if chromosome and position are present."""
    if (
        "chrom" in variant
        and "vcf" in variant
        and variant["vcf"].get("position")
    ):
        vcf_end = _calculate_vcf_end(variant)
        _ensure_url_section(variant)
        variant["url"]["ucsc_genome_browser"] = (
            f"https://genome.ucsc.edu/cgi-bin/hgTracks?db=hg19&"
            f"position=chr{variant['chrom']}:{variant['vcf']['position']}-{vcf_end}"
        )


def _add_hgnc_link(variant: dict[str, Any]) -> None:
    """Add HGNC link if gene name is present."""
    if "dbnsfp" in variant and variant["dbnsfp"].get("genename"):
        gene = _get_first_value(variant["dbnsfp"]["genename"])
        if gene:
            _ensure_url_section(variant)
            variant["url"]["hgnc"] = (
                f"https://www.genenames.org/data/gene-symbol-report/#!/symbol/{gene}"
            )


def inject_links(variants: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """
    Inject database links into variant data.

    Args:
        variants: List of variant dictionaries from MyVariant.info API

    Returns:
        List of variant dictionaries with added URL links in appropriate sections
    """
    for variant in variants:
        _add_dbsnp_links(variant)
        _add_clinvar_link(variant)
        _add_cosmic_link(variant)
        _add_civic_link(variant)
        _add_ucsc_link(variant)
        _add_hgnc_link(variant)
    return variants
```

--------------------------------------------------------------------------------
/src/biomcp/organizations/getter.py:
--------------------------------------------------------------------------------

```python
"""Get specific organization details via NCI CTS API."""

import logging
from typing import Any

from ..constants import NCI_ORGANIZATIONS_URL
from ..integrations.cts_api import CTSAPIError, make_cts_request

logger = logging.getLogger(__name__)


async def get_organization(
    org_id: str,
    api_key: str | None = None,
) -> dict[str, Any]:
    """
    Get detailed information about a specific organization.
Args: org_id: Organization ID api_key: Optional API key (if not provided, uses NCI_API_KEY env var) Returns: Dictionary with organization details Raises: CTSAPIError: If the API request fails or organization not found """ try: # Make API request url = f"{NCI_ORGANIZATIONS_URL}/{org_id}" response = await make_cts_request( url=url, api_key=api_key, ) # Return the organization data # Handle different possible response formats if "data" in response: return response["data"] elif "organization" in response: return response["organization"] else: return response except CTSAPIError: raise except Exception as e: logger.error(f"Failed to get organization {org_id}: {e}") raise CTSAPIError(f"Failed to retrieve organization: {e!s}") from e def _format_address_fields(org: dict[str, Any]) -> list[str]: """Extract and format address fields from organization data.""" address_fields = [] if org.get("address"): addr = org["address"] if isinstance(addr, dict): fields = [ addr.get("street", ""), addr.get("city", ""), addr.get("state", ""), addr.get("zip", ""), ] address_fields = [f for f in fields if f] country = addr.get("country", "") if country and country != "United States": address_fields.append(country) else: # Try individual fields city = org.get("city", "") state = org.get("state", "") address_fields = [p for p in [city, state] if p] return address_fields def _format_contact_info(org: dict[str, Any]) -> list[str]: """Format contact information lines.""" lines = [] if org.get("phone"): lines.append(f"- **Phone**: {org['phone']}") if org.get("email"): lines.append(f"- **Email**: {org['email']}") if org.get("website"): lines.append(f"- **Website**: {org['website']}") return lines def format_organization_details(org: dict[str, Any]) -> str: """ Format organization details as markdown. 
Args: org: Organization data dictionary Returns: Formatted markdown string """ # Extract fields with defaults org_id = org.get("id", org.get("org_id", "Unknown")) name = org.get("name", "Unknown Organization") org_type = org.get("type", org.get("category", "Unknown")) # Build markdown output lines = [ f"## Organization: {name}", "", "### Basic Information", f"- **ID**: {org_id}", f"- **Type**: {org_type}", ] # Add location if available address_fields = _format_address_fields(org) if address_fields: lines.append(f"- **Location**: {', '.join(address_fields)}") # Add contact info lines.extend(_format_contact_info(org)) # Add description if available if org.get("description"): lines.extend([ "", "### Description", org["description"], ]) # Add parent organization metadata if org.get("parent_org"): lines.extend([ "", "### Parent Organization", f"- **Name**: {org['parent_org'].get('name', 'Unknown')}", f"- **ID**: {org['parent_org'].get('id', 'Unknown')}", ]) return "\n".join(lines) ``` -------------------------------------------------------------------------------- /tests/tdd/utils/test_request_cache.py: -------------------------------------------------------------------------------- ```python """Tests for request caching utilities.""" import asyncio import pytest from biomcp.utils.request_cache import ( clear_cache, get_cached, request_cache, set_cached, ) class TestRequestCache: """Test request caching functionality.""" @pytest.fixture(autouse=True) async def clear_cache_before_test(self): """Clear cache before each test.""" await clear_cache() yield await clear_cache() @pytest.mark.asyncio async def test_basic_caching(self): """Test basic cache get/set operations.""" # Initially should be empty result = await get_cached("test_key") assert result is None # Set a value await set_cached("test_key", "test_value", ttl=10) # Should retrieve the value result = await get_cached("test_key") assert result == "test_value" @pytest.mark.asyncio async def test_cache_expiry(self): 
"""Test that cached values expire.""" # Set with very short TTL await set_cached("test_key", "test_value", ttl=0.1) # Should be available immediately result = await get_cached("test_key") assert result == "test_value" # Wait for expiry await asyncio.sleep(0.2) # Should be expired result = await get_cached("test_key") assert result is None @pytest.mark.asyncio async def test_request_cache_decorator(self): """Test the @request_cache decorator.""" call_count = 0 @request_cache(ttl=10) async def expensive_function(arg1, arg2): nonlocal call_count call_count += 1 return f"{arg1}-{arg2}-{call_count}" # First call should execute function result1 = await expensive_function("a", "b") assert result1 == "a-b-1" assert call_count == 1 # Second call with same args should use cache result2 = await expensive_function("a", "b") assert result2 == "a-b-1" # Same result assert call_count == 1 # Function not called again # Different args should execute function result3 = await expensive_function("c", "d") assert result3 == "c-d-2" assert call_count == 2 @pytest.mark.asyncio async def test_skip_cache_option(self): """Test that skip_cache bypasses caching.""" call_count = 0 @request_cache(ttl=10) async def cached_function(): nonlocal call_count call_count += 1 return call_count # Normal call - cached result1 = await cached_function() assert result1 == 1 # Skip cache - new execution result2 = await cached_function(skip_cache=True) assert result2 == 2 # Normal call again - still cached result3 = await cached_function() assert result3 == 1 @pytest.mark.asyncio async def test_none_values_not_cached(self): """Test that None return values are not cached.""" call_count = 0 @request_cache(ttl=10) async def sometimes_none_function(return_none=False): nonlocal call_count call_count += 1 return None if return_none else call_count # Return None - should not cache result1 = await sometimes_none_function(return_none=True) assert result1 is None assert call_count == 1 # Call again - should execute 
again (not cached) result2 = await sometimes_none_function(return_none=True) assert result2 is None assert call_count == 2 # Return value - should cache result3 = await sometimes_none_function(return_none=False) assert result3 == 3 assert call_count == 3 # Call again - should use cache result4 = await sometimes_none_function(return_none=False) assert result4 == 3 assert call_count == 3 ``` -------------------------------------------------------------------------------- /docs/blog/ai-assisted-clinical-trial-search-analysis.md: -------------------------------------------------------------------------------- ```markdown # AI-Assisted Clinical Trial Search: How BioMCP Transforms Research Finding the right clinical trial for a research project has traditionally been a complex process requiring specialized knowledge of database syntax and medical terminology. BioMCP is changing this landscape by making clinical trial data accessible through natural language conversation. Video Link: [](https://www.youtube.com/watch?v=jqGXXnVesjg&list=PLu1amIF_MEfPWhhEsXSuBi90S_xtmVJIW&index=2) ## Breaking Down the Barriers to Clinical Trial Information BioMCP serves as a specialized Model Context Protocol (MCP) server that empowers AI assistants and agents with tools to interact with critical biomedical resources. For clinical trials specifically, BioMCP connects to the ClinicalTrials.gov API, allowing researchers and clinicians to search and retrieve trial information through simple conversational queries. The power of this approach becomes apparent when we look at how it transforms a complex search requirement. Imagine needing to find active clinical trials for pembrolizumab (a cancer immunotherapy drug) specifically for non-small cell lung carcinoma near Cleveland, Ohio. Traditionally, this would require: 1. Navigating to ClinicalTrials.gov 2. Understanding the proper search fields and syntax 3. 
Creating multiple filters for intervention (pembrolizumab), condition (non-small cell lung carcinoma), status (recruiting), and location (Cleveland area) 4. Interpreting the results ## From Natural Language to Precise Database Queries With BioMCP, this entire process is streamlined into a simple natural language request. The underlying large language model (LLM) interprets the query, identifies the key entities (drug name, cancer type, location), and translates these into the precise parameters needed for the ClinicalTrials.gov API. The system returns relevant trials that match all criteria, presenting them in an easy-to-understand format. But the interaction doesn't end there—BioMCP maintains context throughout the conversation, enabling follow-up questions like: - Where exactly are these trials located and how far are they from downtown Cleveland? - What biomarker eligibility criteria do these trials require? - Are there exclusion criteria I should be aware of? For each of these questions, BioMCP calls the appropriate tool (trial locations, trial protocols) and processes the information to provide meaningful answers without requiring the user to navigate different interfaces or learn new query languages. ## Beyond Basic Search: Understanding Trial Details What truly sets BioMCP apart is its ability to go beyond simple listings. When asked about biomarker eligibility criteria, the system can extract this information from the full trial protocol, synthesize it, and present a clear summary of requirements. This capability transforms what would typically be hours of reading dense clinical documentation into a conversational exchange that delivers precisely what the researcher needs. ## Transforming Clinical Research Workflows The implications for clinical research are significant.
By lowering the technical barriers to accessing trial information, BioMCP can help: - Researchers understand the landscape of current research in their field - Research teams identify promising studies more efficiently - Clinical research organizations track competing or complementary trials - Research coordinators identify potential recruitment sites based on location As part of the broader BioMCP ecosystem—which also includes access to genomic variant information and PubMed literature—this clinical trial search capability represents a fundamental shift in how we interact with biomedical information. By bringing the power of natural language processing to specialized databases, BioMCP is helping to democratize access to critical health information and accelerate the research process. ``` -------------------------------------------------------------------------------- /src/biomcp/utils/query_utils.py: -------------------------------------------------------------------------------- ```python """Utilities for query parsing and manipulation.""" import re from typing import Any def parse_or_query(query: str) -> list[str]: """Parse OR query into individual search terms. 
Handles formats like: - "term1 OR term2" - 'term1 OR term2 OR "term with spaces"' - "TERM1 or term2 or term3" (case insensitive) Args: query: Query string that may contain OR operators Returns: List of individual search terms with quotes and whitespace cleaned Examples: >>> parse_or_query("PD-L1 OR CD274") ['PD-L1', 'CD274'] >>> parse_or_query('BRAF OR "v-raf murine" OR ARAF') ['BRAF', 'v-raf murine', 'ARAF'] """ # Split by OR (case insensitive) terms = re.split(r"\s+OR\s+", query, flags=re.IGNORECASE) # Clean up each term - remove quotes and extra whitespace cleaned_terms = [] for term in terms: # Remove surrounding quotes (both single and double) term = term.strip().strip('"').strip("'").strip() if term: cleaned_terms.append(term) return cleaned_terms def contains_or_operator(query: str) -> bool: """Check if a query contains OR operators. Args: query: Query string to check Returns: True if query contains " OR " or " or ", False otherwise """ return " OR " in query or " or " in query async def search_with_or_support( query: str, search_func: Any, search_params: dict[str, Any], id_field: str = "id", fallback_id_field: str | None = None, ) -> dict[str, Any]: """Generic OR query search handler. This function handles OR queries by making multiple API calls and combining results. 
Args: query: Query string that may contain OR operators search_func: Async search function to call for each term search_params: Base parameters to pass to search function (excluding the query term) id_field: Primary field name for deduplication (default: "id") fallback_id_field: Alternative field name if primary is missing Returns: Combined results from all searches with duplicates removed """ # Check if this is an OR query if contains_or_operator(query): search_terms = parse_or_query(query) else: search_terms = [query] # Collect all unique results all_results = {} total_found = 0 # Search for each term for term in search_terms: try: # Call the search function with the term results = await search_func(**{**search_params, "name": term}) # Extract results list (handle different response formats) items_key = None for key in [ "biomarkers", "organizations", "interventions", "diseases", "data", "items", ]: if key in results: items_key = key break if not items_key: continue # Add unique items (deduplicate by ID) for item in results.get(items_key, []): item_id = item.get(id_field) if not item_id and fallback_id_field: item_id = item.get(fallback_id_field) if item_id and item_id not in all_results: all_results[item_id] = item total_found += results.get("total", 0) except Exception as e: # Log the error and continue with other terms import logging logger = logging.getLogger(__name__) logger.warning(f"Failed to search for term '{term}': {e}") continue # Convert back to list unique_items = list(all_results.values()) # Return in standard format return { "items": unique_items, "total": len(unique_items), "search_terms": search_terms, "total_found_across_terms": total_found, } ``` -------------------------------------------------------------------------------- /tests/tdd/test_endpoint_documentation.py: -------------------------------------------------------------------------------- ```python """Test that endpoint documentation is kept up to date.""" import subprocess import sys 
from pathlib import Path class TestEndpointDocumentation: """Test the endpoint documentation generation.""" def test_third_party_endpoints_file_exists(self): """Test that THIRD_PARTY_ENDPOINTS.md exists.""" endpoints_file = ( Path(__file__).parent.parent.parent / "THIRD_PARTY_ENDPOINTS.md" ) assert endpoints_file.exists(), "THIRD_PARTY_ENDPOINTS.md must exist" def test_endpoints_documentation_is_current(self): """Test that the endpoints documentation can be generated without errors.""" # Run the generation script script_path = ( Path(__file__).parent.parent.parent / "scripts" / "generate_endpoints_doc.py" ) result = subprocess.run( # noqa: S603 [sys.executable, str(script_path)], capture_output=True, text=True, check=False, ) assert result.returncode == 0, f"Script failed: {result.stderr}" # The script should report that it generated the file assert ( "Generated" in result.stdout or result.stdout == "" ), f"Unexpected output: {result.stdout}" def test_all_endpoints_documented(self): """Test that all endpoints in the registry are documented.""" from biomcp.utils.endpoint_registry import get_registry registry = get_registry() endpoints = registry.get_all_endpoints() # Read the documentation endpoints_file = ( Path(__file__).parent.parent.parent / "THIRD_PARTY_ENDPOINTS.md" ) content = endpoints_file.read_text() # Check each endpoint is mentioned for key, info in endpoints.items(): assert key in content, f"Endpoint {key} not found in documentation" assert ( info.url in content ), f"URL {info.url} not found in documentation" def test_documentation_contains_required_sections(self): """Test that documentation contains all required sections.""" endpoints_file = ( Path(__file__).parent.parent.parent / "THIRD_PARTY_ENDPOINTS.md" ) content = endpoints_file.read_text() required_sections = [ "# Third-Party Endpoints Used by BioMCP", "## Overview", "## Endpoints by Category", "### Biomedical Literature", "### Clinical Trials", "### Variant Databases", "### Cancer Genomics", "## 
Domain Summary", "## Compliance and Privacy", "## Network Control", "BIOMCP_OFFLINE", ] for section in required_sections: assert ( section in content ), f"Required section '{section}' not found in documentation" def test_endpoint_counts_accurate(self): """Test that endpoint counts in the overview are accurate.""" from biomcp.utils.endpoint_registry import get_registry registry = get_registry() endpoints = registry.get_all_endpoints() domains = registry.get_unique_domains() endpoints_file = ( Path(__file__).parent.parent.parent / "THIRD_PARTY_ENDPOINTS.md" ) content = endpoints_file.read_text() # Extract counts from overview import re match = re.search( r"BioMCP connects to (\d+) external domains across (\d+) endpoints", content, ) assert match, "Could not find endpoint counts in overview" doc_domains = int(match.group(1)) doc_endpoints = int(match.group(2)) assert ( doc_domains == len(domains) ), f"Document says {doc_domains} domains but registry has {len(domains)}" assert ( doc_endpoints == len(endpoints) ), f"Document says {doc_endpoints} endpoints but registry has {len(endpoints)}" ``` -------------------------------------------------------------------------------- /src/biomcp/cli/organizations.py: -------------------------------------------------------------------------------- ```python """CLI commands for organization search and lookup.""" import asyncio from typing import Annotated import typer from ..integrations.cts_api import CTSAPIError, get_api_key_instructions from ..organizations import get_organization, search_organizations from ..organizations.getter import format_organization_details from ..organizations.search import format_organization_results organization_app = typer.Typer( no_args_is_help=True, help="Search and retrieve organization information from NCI CTS API", ) @organization_app.command("search") def search_organizations_cli( name: Annotated[ str | None, typer.Argument( help="Organization name to search for (partial match supported)" ), ] = 
None, org_type: Annotated[ str | None, typer.Option( "--type", help="Type of organization (e.g., industry, academic)", ), ] = None, city: Annotated[ str | None, typer.Option( "--city", help="City location", ), ] = None, state: Annotated[ str | None, typer.Option( "--state", help="State location (2-letter code)", ), ] = None, page_size: Annotated[ int, typer.Option( "--page-size", help="Number of results per page", min=1, max=100, ), ] = 20, page: Annotated[ int, typer.Option( "--page", help="Page number", min=1, ), ] = 1, api_key: Annotated[ str | None, typer.Option( "--api-key", help="NCI API key (overrides NCI_API_KEY env var)", envvar="NCI_API_KEY", ), ] = None, ) -> None: """ Search for organizations in the NCI Clinical Trials database. Examples: # Search by name biomcp organization search "MD Anderson" # Search by type biomcp organization search --type academic # Search by location biomcp organization search --city Boston --state MA # Combine filters biomcp organization search Cancer --type industry --state CA """ try: results = asyncio.run( search_organizations( name=name, org_type=org_type, city=city, state=state, page_size=page_size, page=page, api_key=api_key, ) ) output = format_organization_results(results) typer.echo(output) except CTSAPIError as e: if "API key required" in str(e): typer.echo(get_api_key_instructions()) else: typer.echo(f"Error: {e}", err=True) raise typer.Exit(1) from e except Exception as e: typer.echo(f"Unexpected error: {e}", err=True) raise typer.Exit(1) from e @organization_app.command("get") def get_organization_cli( org_id: Annotated[ str, typer.Argument(help="Organization ID"), ], api_key: Annotated[ str | None, typer.Option( "--api-key", help="NCI API key (overrides NCI_API_KEY env var)", envvar="NCI_API_KEY", ), ] = None, ) -> None: """ Get detailed information about a specific organization. 
Example: biomcp organization get ORG123456 """ try: org_data = asyncio.run( get_organization( org_id=org_id, api_key=api_key, ) ) output = format_organization_details(org_data) typer.echo(output) except CTSAPIError as e: if "API key required" in str(e): typer.echo(get_api_key_instructions()) else: typer.echo(f"Error: {e}", err=True) raise typer.Exit(1) from e except Exception as e: typer.echo(f"Unexpected error: {e}", err=True) raise typer.Exit(1) from e ``` -------------------------------------------------------------------------------- /tests/bdd/search_variants/test_search.py: -------------------------------------------------------------------------------- ```python import json import shlex from typing import Any from assertpy import assert_that from pytest_bdd import parsers, scenarios, then, when from typer.testing import CliRunner from biomcp.cli import app scenarios("search.feature") runner = CliRunner() # Field mapping - Updated chromosome key FIELD_MAP = { "chromosome": ["chrom"], "frequency": ["gnomad_exome", "af", "af"], "gene": ["dbnsfp", "genename"], "hgvsc": ["dbnsfp", "hgvsc"], "hgvsp": ["dbnsfp", "hgvsp"], "cadd": ["cadd", "phred"], "polyphen": ["dbnsfp", "polyphen2", "hdiv", "pred"], "position": ["vcf", "position"], "rsid": ["dbsnp", "rsid"], "sift": ["dbnsfp", "sift", "pred"], "significance": ["clinvar", "rcv", "clinical_significance"], "uniprot_id": ["mutdb", "uniprot_id"], } def get_value(data: dict, key: str) -> Any | None: """Extract value from nested dictionary using field mapping.""" key_path = FIELD_MAP.get(key, [key]) current_value = data.get("hits") for key in key_path: if isinstance(current_value, dict): current_value = current_value.get(key) elif isinstance(current_value, list): current_value = current_value[0].get(key) if current_value and isinstance(current_value, list): return current_value[0] return current_value # --- @when Step --- @when( parsers.re(r'I run "(?P<command>.*?)"(?: #.*)?$'), target_fixture="variants_data", ) def 
variants_data(command) -> dict: """Run variant search command with --json and return parsed results.""" args = shlex.split(command)[1:] # trim 'biomcp' args += ["--json"] if "--size" not in args: args.extend(["--size", "10"]) result = runner.invoke(app, args, catch_exceptions=False) assert result.exit_code == 0, "CLI command failed" data = json.loads(result.stdout) return data def normalize(v): try: return float(v) except ValueError: try: return int(v) except ValueError: return v.lower() @then( parsers.re( r"each variant should have (?P<field>\w+) that (?P<operator>(?:is|equal|to|contains|greater|less|than|or|\s)+)\s+(?P<expected>.+)$" ) ) def check_variant_field(it, variants_data, field, operator, expected): """ For each variant, apply an assertpy operator against a given field. Supports operator names with spaces (e.g. "is equal to") or underscores (e.g. "is_equal_to"). """ # Normalize operator: lower case and replace spaces with underscores. operator = operator.strip().lower().replace(" ", "_") successes = set() failures = set() for v_num, value in it(FIELD_MAP, variants_data, field): value = normalize(value) expected = normalize(expected) f = getattr(assert_that(value), operator) try: f(expected) successes.add(v_num) except AssertionError: failures.add(v_num) failures -= successes assert len(failures) == 0, f"Failure: {field} {operator} {expected}" @then( parsers.re( r"the number of variants (?P<operator>(?:is|equal|to|contains|greater|less|than|or|\s)+)\s+(?P<expected>\d+)$" ) ) def number_of_variants_check(variants_data, operator, expected): """Check the number of variants returned.""" if ( isinstance(variants_data, list) and len(variants_data) == 1 and "error" in variants_data[0] ): count = 0 # If we have an error response, count as 0 variants elif isinstance(variants_data, dict) and "variants" in variants_data: # Handle new format with cBioPortal summary count = len(variants_data["variants"]) elif isinstance(variants_data, dict) and "hits" in variants_data: 
# Handle myvariant.info response format count = len(variants_data["hits"]) else: count = len(variants_data) if isinstance(variants_data, list) else 0 operator = operator.strip().lower().replace(" ", "_") f = getattr(assert_that(count), operator) f(int(expected)) ``` -------------------------------------------------------------------------------- /src/biomcp/cli/diseases.py: -------------------------------------------------------------------------------- ```python """CLI commands for disease information and search.""" import asyncio from typing import Annotated import typer from ..diseases import get_disease from ..diseases.search import format_disease_results, search_diseases from ..integrations.cts_api import CTSAPIError, get_api_key_instructions disease_app = typer.Typer( no_args_is_help=True, help="Search and retrieve disease information", ) @disease_app.command("get") def get_disease_cli( disease_name: Annotated[ str, typer.Argument(help="Disease name or identifier"), ], ) -> None: """ Get disease information from MyDisease.info. This returns detailed information including synonyms, definitions, and database cross-references. 
Examples: biomcp disease get melanoma biomcp disease get "lung cancer" biomcp disease get GIST """ result = asyncio.run(get_disease(disease_name)) typer.echo(result) @disease_app.command("search") def search_diseases_cli( name: Annotated[ str | None, typer.Argument( help="Disease name to search for (partial match supported)" ), ] = None, include_synonyms: Annotated[ bool, typer.Option( "--synonyms/--no-synonyms", help="[Deprecated] This option is ignored - API always searches synonyms", ), ] = True, category: Annotated[ str | None, typer.Option( "--category", help="Disease category/type filter", ), ] = None, page_size: Annotated[ int, typer.Option( "--page-size", help="Number of results per page", min=1, max=100, ), ] = 20, page: Annotated[ int, typer.Option( "--page", help="Page number", min=1, ), ] = 1, api_key: Annotated[ str | None, typer.Option( "--api-key", help="NCI API key (overrides NCI_API_KEY env var)", envvar="NCI_API_KEY", ), ] = None, source: Annotated[ str, typer.Option( "--source", help="Data source: 'mydisease' (default) or 'nci'", show_choices=True, ), ] = "mydisease", ) -> None: """ Search for diseases in MyDisease.info or NCI CTS database. The NCI source provides controlled vocabulary of cancer conditions used in clinical trials, with official terms and synonyms. 
Examples: # Search MyDisease.info (default) biomcp disease search melanoma # Search NCI cancer terms biomcp disease search melanoma --source nci # Search without synonyms biomcp disease search "breast cancer" --no-synonyms --source nci # Filter by category biomcp disease search --category neoplasm --source nci """ if source == "nci": # Use NCI CTS API try: results = asyncio.run( search_diseases( name=name, include_synonyms=include_synonyms, category=category, page_size=page_size, page=page, api_key=api_key, ) ) output = format_disease_results(results) typer.echo(output) except CTSAPIError as e: if "API key required" in str(e): typer.echo(get_api_key_instructions()) else: typer.echo(f"Error: {e}", err=True) raise typer.Exit(1) from e except Exception as e: typer.echo(f"Unexpected error: {e}", err=True) raise typer.Exit(1) from e else: # Default to MyDisease.info # For now, just search by name if name: result = asyncio.run(get_disease(name)) typer.echo(result) else: typer.echo("Please provide a disease name to search for.") raise typer.Exit(1) ``` -------------------------------------------------------------------------------- /tests/tdd/test_mcp_tools.py: -------------------------------------------------------------------------------- ```python """Tests for MCP tool wrappers.""" import json from unittest.mock import patch import pytest from biomcp.articles.search import _article_searcher class TestArticleSearcherMCPTool: """Test the _article_searcher MCP tool.""" @pytest.mark.asyncio async def test_article_searcher_with_all_params(self): """Test article_searcher with all parameters.""" mock_results = [{"title": "Test Article", "pmid": 12345}] with patch( "biomcp.articles.search_optimized.article_searcher_optimized" ) as mock_search: mock_search.return_value = json.dumps(mock_results) await _article_searcher( call_benefit="Testing search functionality", chemicals="aspirin,ibuprofen", diseases="cancer,diabetes", genes="BRAF,TP53", keywords="mutation,therapy", 
variants="V600E,R175H", include_preprints=True, ) # Verify the function was called mock_search.assert_called_once() # Check the parameters were passed correctly kwargs = mock_search.call_args[1] assert kwargs["call_benefit"] == "Testing search functionality" assert kwargs["chemicals"] == "aspirin,ibuprofen" assert kwargs["diseases"] == "cancer,diabetes" assert kwargs["genes"] == "BRAF,TP53" assert kwargs["keywords"] == "mutation,therapy" assert kwargs["variants"] == "V600E,R175H" assert kwargs["include_preprints"] is True assert kwargs.get("include_cbioportal", True) is True @pytest.mark.asyncio async def test_article_searcher_with_lists(self): """Test article_searcher with list inputs.""" with patch( "biomcp.articles.search_optimized.article_searcher_optimized" ) as mock_search: mock_search.return_value = "## Results" await _article_searcher( call_benefit="Testing with lists", chemicals=["drug1", "drug2"], diseases=["disease1"], genes=["GENE1"], include_preprints=False, ) # Check list parameters were passed correctly kwargs = mock_search.call_args[1] assert kwargs["call_benefit"] == "Testing with lists" assert kwargs["chemicals"] == ["drug1", "drug2"] assert kwargs["diseases"] == ["disease1"] assert kwargs["genes"] == ["GENE1"] assert kwargs["include_preprints"] is False @pytest.mark.asyncio async def test_article_searcher_minimal_params(self): """Test article_searcher with minimal parameters.""" with patch( "biomcp.articles.search_optimized.article_searcher_optimized" ) as mock_search: mock_search.return_value = "## No results" await _article_searcher(call_benefit="Minimal test") # Should still work with no search parameters kwargs = mock_search.call_args[1] assert kwargs["call_benefit"] == "Minimal test" assert kwargs.get("chemicals") is None assert kwargs.get("diseases") is None assert kwargs.get("genes") is None assert kwargs.get("keywords") is None assert kwargs.get("variants") is None @pytest.mark.asyncio async def test_article_searcher_empty_strings(self): 
"""Test article_searcher with empty strings.""" with patch( "biomcp.articles.search_optimized.article_searcher_optimized" ) as mock_search: mock_search.return_value = "## Results" await _article_searcher( call_benefit="Empty string test", chemicals="", diseases="", genes="", ) # Empty strings are passed through kwargs = mock_search.call_args[1] assert kwargs["call_benefit"] == "Empty string test" assert kwargs["chemicals"] == "" assert kwargs["diseases"] == "" assert kwargs["genes"] == "" ``` -------------------------------------------------------------------------------- /docs/developer-guides/07-performance-optimizations.md: -------------------------------------------------------------------------------- ```markdown # Performance Optimizations This document describes the performance optimizations implemented in BioMCP to improve response times and throughput. ## Overview BioMCP has been optimized for high-performance biomedical data retrieval through several key improvements: - **65% faster test execution** (from ~120s to ~42s) - **Reduced API calls** through intelligent caching and batching - **Lower latency** via connection pooling and prefetching - **Better resource utilization** with parallel processing ## Key Optimizations ### 1. Connection Pooling HTTP connections are now reused across requests, eliminating connection establishment overhead. **Configuration:** - `BIOMCP_USE_CONNECTION_POOL` - Enable/disable pooling (default: "true") - Automatically manages pools per event loop - Graceful cleanup on shutdown **Impact:** ~30% reduction in request latency for sequential operations ### 2. Parallel Test Execution Tests now run in parallel using pytest-xdist, dramatically reducing test suite execution time. **Usage:** ```bash make test # Automatically uses parallel execution ``` **Impact:** ~5x faster test execution ### 3. Request Batching Multiple API requests are batched together when possible, particularly for cBioPortal queries. 
**Features:** - Automatic batching based on size/time thresholds - Configurable batch size (default: 5 for cBioPortal) - Error isolation per request **Impact:** Up to 80% reduction in API calls for bulk operations ### 4. Smart Caching Multiple caching layers optimize repeated queries: - **LRU Cache:** Memory-bounded caching for recent requests - **Hash-based keys:** 10x faster cache key generation - **Shared validation context:** Eliminates redundant gene/entity validations **Configuration:** - Cache size: 1000 entries (configurable) - TTL: 5-30 minutes depending on data type ### 5. Prefetching Common entities are prefetched on startup to warm caches: - Top genes: BRAF, EGFR, TP53, KRAS, etc. - Common diseases: lung cancer, breast cancer, etc. - Frequent chemicals: osimertinib, pembrolizumab, etc. **Impact:** First queries for common entities are instant ### 6. Pagination Support Europe PMC searches now use pagination for large result sets: - Optimal page size: 25 results - Progressive loading - Memory-efficient processing ### 7. Conditional Metrics Performance metrics are only collected when explicitly enabled, reducing overhead. **Configuration:** - `BIOMCP_METRICS_ENABLED` - Enable metrics (default: "false") ## Performance Benchmarks ### API Response Times | Operation | Before | After | Improvement | | ------------------------------ | ------ | ----- | ----------- | | Single gene search | 850ms | 320ms | 62% | | Bulk variant lookup | 4.2s | 1.1s | 74% | | Article search with cBioPortal | 2.1s | 780ms | 63% | ### Resource Usage | Metric | Before | After | Improvement | | ------------- | ------ | ----- | ----------- | | Memory (idle) | 145MB | 152MB | +5% | | Memory (peak) | 512MB | 385MB | -25% | | CPU (avg) | 35% | 28% | -20% | ## Best Practices 1. **Keep connection pooling enabled** unless experiencing issues 2. **Use the unified search** methods to benefit from parallel execution 3. **Batch operations** when performing multiple lookups 4. 
**Monitor cache hit rates** in production environments ## Troubleshooting ### Connection Pool Issues If experiencing connection errors: 1. Disable pooling: `export BIOMCP_USE_CONNECTION_POOL=false` 2. Check for firewall/proxy issues 3. Verify SSL certificates ### Memory Usage If memory usage is high: 1. Reduce cache size in `request_cache.py` 2. Lower connection pool limits 3. Disable prefetching by removing the lifespan hook ### Performance Regression To identify performance issues: 1. Enable metrics: `export BIOMCP_METRICS_ENABLED=true` 2. Check slow operations in logs 3. Profile with `py-spy` or similar tools ## Future Optimizations Planned improvements include: - GraphQL batching for complex queries - Redis integration for distributed caching - WebSocket support for real-time updates - GPU acceleration for variant analysis ``` -------------------------------------------------------------------------------- /docs/tutorials/remote-connection.md: -------------------------------------------------------------------------------- ```markdown # Connecting to Remote BioMCP This guide walks you through connecting Claude to the remote BioMCP server, providing instant access to biomedical research tools without any local installation. ## Overview The remote BioMCP server (https://remote.biomcp.org/mcp) provides cloud-hosted access to all BioMCP tools. This eliminates the need for local installation while maintaining full functionality. !!! success "Benefits of Remote Connection" - **No Installation Required**: Start using BioMCP immediately - **Always Up-to-Date**: Automatically receive the latest features and improvements - **Cloud-Powered**: Leverage server-side resources for faster searches - **Secure Authentication**: Uses Google OAuth for secure access !!! info "Privacy Notice" We log user emails and queries to improve the service. All data is handled according to our privacy policy. 
## Step-by-Step Setup ### Step 1: Access Custom Connectors Navigate to the **Custom Connectors** section in your Claude interface. This is where you'll configure the connection to BioMCP.  ### Step 2: Add Custom Connector Click the **Add Custom Connector** button and enter the following details: - **Name**: BioMCP - **URL**: `https://remote.biomcp.org/mcp`  ### Step 3: Verify Connector is Enabled After adding, you should see BioMCP listed with an "Enabled" status. This confirms the connector was added successfully.  ### Step 4: Connect to BioMCP Return to the main Connectors section where you'll now see BioMCP available for connection. Click the **Connect** button.  ### Step 5: Authenticate with Google You'll be redirected to Google OAuth for authentication. Sign in with any valid Google account. This step ensures secure access to the service.  !!! note "Authentication" - Any valid Google account works - Your email is logged for service improvement - Authentication is handled securely through Google OAuth ### Step 6: Connection Success Once authenticated, you'll see a successful connection message displaying the available tool count. As of January 2025, there are 23 tools available (this number may increase as new features are added).  ## Verifying Your Connection After successful connection, you can verify BioMCP is working by asking Claude: ``` What tools do you have available from BioMCP? 
``` Claude should list the available tools including: - Article search and retrieval (PubMed/PubTator3) - Clinical trials search (ClinicalTrials.gov and NCI) - Variant analysis (MyVariant.info) - Gene, drug, and disease information - Sequential thinking for complex research ## Troubleshooting ### Connection Failed - Ensure you entered the URL exactly as shown: `https://remote.biomcp.org/mcp` - Check your internet connection - Try disconnecting and reconnecting ### Authentication Issues - Make sure you're using a valid Google account - Clear your browser cache if authentication hangs - Try using a different browser if issues persist ### Tools Not Available - Disconnect and reconnect to BioMCP - Refresh your Claude session - Contact support if tools remain unavailable ## Next Steps Now that you're connected to BioMCP, you can: 1. **Search biomedical literature**: "Find recent papers on BRAF mutations in melanoma" 2. **Analyze clinical trials**: "What trials are recruiting for lung cancer with EGFR mutations?" 3. **Interpret variants**: "What is the clinical significance of TP53 p.R273H?" 4. **Explore drug information**: "Tell me about pembrolizumab's mechanism and indications" ## Support For issues or questions about the remote BioMCP connection: - GitHub Issues: [https://github.com/genomoncology/biomcp/issues](https://github.com/genomoncology/biomcp/issues) - Documentation: [https://biomcp.org](https://biomcp.org) ``` -------------------------------------------------------------------------------- /tests/config/test_smithery_config.py: -------------------------------------------------------------------------------- ```python #!/usr/bin/env python """ Test script to validate Smithery configuration against actual function implementations. This script checks that the schema definitions in smithery.yaml match the expected function parameters in your codebase. 
""" import os from typing import Any import pytest import yaml from pydantic import BaseModel from biomcp.articles.search import PubmedRequest # Import the functions we want to test from biomcp.trials.search import TrialQuery from biomcp.variants.search import VariantQuery @pytest.fixture def smithery_config(): """Load the Smithery configuration.""" # Get the project root directory project_root = os.path.abspath( os.path.join(os.path.dirname(__file__), "../..") ) config_path = os.path.join(project_root, "smithery.yaml") with open(config_path) as f: return yaml.safe_load(f) def test_smithery_config(smithery_config): """Test that all tool schemas in smithery.yaml match the expected function parameters.""" # Functions to test and their expected parameter types functions_to_test = { "trial_searcher": {"param_name": "query", "expected_type": TrialQuery}, "variant_searcher": { "param_name": "query", "expected_type": VariantQuery, }, "article_searcher": { "param_name": "query", "expected_type": PubmedRequest, }, "trial_protocol": {"param_name": "nct_id", "expected_type": str}, "trial_locations": {"param_name": "nct_id", "expected_type": str}, "trial_outcomes": {"param_name": "nct_id", "expected_type": str}, "trial_references": {"param_name": "nct_id", "expected_type": str}, "article_details": {"param_name": "pmid", "expected_type": str}, "variant_details": {"param_name": "variant_id", "expected_type": str}, } for tool_name, param_info in functions_to_test.items(): validate_tool_schema(smithery_config, tool_name, param_info) def validate_tool_schema( smithery_config, tool_name: str, param_info: dict[str, Any] ): """Validate that the tool schema in smithery.yaml matches the expected function parameter.""" param_name = param_info["param_name"] expected_type = param_info["expected_type"] # Check if the tool is defined in the smithery.yaml assert tool_name in smithery_config.get( "tools", {} ), f"Tool '{tool_name}' is not defined in smithery.yaml" tool_config = 
smithery_config["tools"][tool_name] # Check if the tool has an input schema assert ( "input" in tool_config ), f"Tool '{tool_name}' does not have an input schema defined" input_schema = tool_config["input"].get("schema", {}) # Check if the parameter is required if issubclass(expected_type, BaseModel): # For complex types like TrialQuery, check if 'query' is required assert ( "required" in input_schema ), f"Tool '{tool_name}' does not have required parameters specified" assert ( "query" in input_schema.get("required", []) ), f"Parameter 'query' for tool '{tool_name}' is not marked as required" else: assert ( "required" in input_schema ), f"Tool '{tool_name}' does not have required parameters specified" assert ( param_name in input_schema.get("required", []) ), f"Parameter '{param_name}' for tool '{tool_name}' is not marked as required" # For complex types (Pydantic models), check if the schema references the correct type if issubclass(expected_type, BaseModel): properties = input_schema.get("properties", {}) assert ( "query" in properties ), f"Tool '{tool_name}' does not have a 'query' property defined" query_prop = properties["query"] assert ( "$ref" in query_prop ), f"Tool '{tool_name}' query property does not reference a schema" schema_ref = query_prop["$ref"] expected_schema_name = expected_type.__name__ assert schema_ref.endswith( expected_schema_name ), f"Tool '{tool_name}' references incorrect schema: {schema_ref}, expected: {expected_schema_name}" ``` -------------------------------------------------------------------------------- /scripts/check_http_imports.py: -------------------------------------------------------------------------------- ```python #!/usr/bin/env python3 """Check for direct HTTP library imports outside of allowed files.""" import ast import sys from pathlib import Path # HTTP libraries to check for HTTP_LIBRARIES = { "httpx", "aiohttp", "requests", "urllib3", } # Note: urllib is allowed for URL parsing # Files allowed to import HTTP 
libraries ALLOWED_FILES = { "http_client.py", "http_client_simple.py", "http_client_test.py", "test_http_client.py", "connection_pool.py", # Connection pooling infrastructure } # Additional allowed patterns (for version checks, etc.) ALLOWED_PATTERNS = { # Allow httpx import just for version check ("health.py", "httpx"): "version check only", } def _check_import_node( node: ast.Import, file_name: str ) -> set[tuple[str, int]]: """Check ast.Import node for violations.""" violations = set() for alias in node.names: module_name = alias.name.split(".")[0] if module_name in HTTP_LIBRARIES: pattern_key = (file_name, module_name) if pattern_key not in ALLOWED_PATTERNS: violations.add((module_name, node.lineno)) return violations def _check_import_from_node( node: ast.ImportFrom, file_name: str ) -> set[tuple[str, int]]: """Check ast.ImportFrom node for violations.""" violations = set() if node.module: module_name = node.module.split(".")[0] if module_name in HTTP_LIBRARIES: pattern_key = (file_name, module_name) if pattern_key not in ALLOWED_PATTERNS: violations.add((module_name, node.lineno)) return violations def check_imports(file_path: Path) -> set[tuple[str, int]]: """Check a Python file for HTTP library imports. Returns set of (library, line_number) tuples for violations. 
""" violations = set() # Check if this file is allowed if file_path.name in ALLOWED_FILES: return violations try: with open(file_path, encoding="utf-8") as f: content = f.read() tree = ast.parse(content) for node in ast.walk(tree): if isinstance(node, ast.Import): violations.update(_check_import_node(node, file_path.name)) elif isinstance(node, ast.ImportFrom): violations.update( _check_import_from_node(node, file_path.name) ) except Exception as e: print(f"Error parsing {file_path}: {e}", file=sys.stderr) return violations def find_python_files(root_dir: Path) -> list[Path]: """Find all Python files in the project.""" python_files = [] for path in root_dir.rglob("*.py"): # Skip virtual environments, cache, etc. if any( part.startswith(".") or part in ["__pycache__", "venv", "env", ".tox"] for part in path.parts ): continue python_files.append(path) return python_files def main(): """Main function to check all Python files.""" # Get project root (parent of scripts directory) script_dir = Path(__file__).parent project_root = script_dir.parent src_dir = project_root / "src" # Find all Python files python_files = find_python_files(src_dir) all_violations = [] for file_path in python_files: violations = check_imports(file_path) if violations: for lib, line in violations: all_violations.append((file_path, lib, line)) if all_violations: print("❌ Found direct HTTP library imports:\n") for file_path, lib, line in sorted(all_violations): rel_path = file_path.relative_to(project_root) print(f" {rel_path}:{line} - imports '{lib}'") print(f"\n❌ Total violations: {len(all_violations)}") print( "\nPlease use the centralized HTTP client (biomcp.http_client) instead." ) print( "If you need to add an exception, update ALLOWED_FILES or ALLOWED_PATTERNS in this script." 
) return 1 else: print("✅ No direct HTTP library imports found outside allowed files.") return 0 if __name__ == "__main__": sys.exit(main()) ``` -------------------------------------------------------------------------------- /src/biomcp/variants/cbioportal_search_helpers.py: -------------------------------------------------------------------------------- ```python """Helper functions for cBioPortal search to reduce complexity.""" import logging import re from typing import Any from .cbioportal_search import GeneHotspot logger = logging.getLogger(__name__) async def process_mutation_results( mutation_results: list[tuple[Any, str]], cancer_types_lookup: dict[str, dict[str, Any]], client: Any, ) -> dict[str, Any]: """Process mutation results from multiple studies. Args: mutation_results: List of (result, study_id) tuples cancer_types_lookup: Cancer type lookup dictionary client: Client instance for API calls Returns: Dictionary with aggregated mutation data """ total_mutations = 0 total_samples = 0 hotspot_counts: dict[str, dict[str, Any]] = {} cancer_distribution: dict[str, int] = {} studies_with_data = 0 for result, study_id in mutation_results: if isinstance(result, Exception): logger.debug(f"Failed to get mutations for {study_id}: {result}") continue if result and "mutations" in result: mutations = result["mutations"] sample_count = result["sample_count"] if mutations: studies_with_data += 1 # Count unique samples with mutations unique_samples = { m.get("sampleId") for m in mutations if m.get("sampleId") } total_mutations += len(unique_samples) total_samples += sample_count # Process mutations for hotspots and cancer types study_cancer_type = await client._get_study_cancer_type( study_id, cancer_types_lookup ) _update_hotspot_counts( mutations, hotspot_counts, study_cancer_type ) _update_cancer_distribution( mutations, cancer_distribution, study_cancer_type ) return { "total_mutations": total_mutations, "total_samples": total_samples, "studies_with_data": 
studies_with_data, "hotspot_counts": hotspot_counts, "cancer_distribution": cancer_distribution, } def _update_hotspot_counts( mutations: list[dict[str, Any]], hotspot_counts: dict[str, dict[str, Any]], cancer_type: str, ) -> None: """Update hotspot counts from mutations.""" for mut in mutations: protein_change = mut.get("proteinChange", "") if protein_change: if protein_change not in hotspot_counts: hotspot_counts[protein_change] = { "count": 0, "cancer_types": set(), } hotspot_counts[protein_change]["count"] += 1 hotspot_counts[protein_change]["cancer_types"].add(cancer_type) def _update_cancer_distribution( mutations: list[dict[str, Any]], cancer_distribution: dict[str, int], cancer_type: str, ) -> None: """Update cancer type distribution.""" cancer_distribution[cancer_type] = cancer_distribution.get( cancer_type, 0 ) + len({m.get("sampleId") for m in mutations if m.get("sampleId")}) def format_hotspots( hotspot_counts: dict[str, dict[str, Any]], total_mutations: int ) -> list[GeneHotspot]: """Format hotspot counts into GeneHotspot objects.""" hotspots = [] for protein_change, data in sorted( hotspot_counts.items(), key=lambda x: x[1]["count"], reverse=True )[:5]: # Top 5 hotspots # Try to extract position from protein change position = 0 try: match = re.search(r"(\d+)", protein_change) if match: position = int(match.group(1)) except Exception: logger.debug("Failed to extract position from protein change") hotspots.append( GeneHotspot( position=position, amino_acid_change=protein_change, count=data["count"], frequency=data["count"] / total_mutations if total_mutations > 0 else 0.0, cancer_types=list(data["cancer_types"]), ) ) return hotspots ``` -------------------------------------------------------------------------------- /tests/tdd/workers/test_worker_sanitization.js: -------------------------------------------------------------------------------- ```javascript /** * Tests for worker_entry_stytch.js sanitization functionality */ const { test } = 
require("node:test"); const assert = require("node:assert"); // Mock the sanitizeObject function for testing const SENSITIVE_FIELDS = [ "api_key", "apiKey", "api-key", "token", "secret", "password", ]; const sanitizeObject = (obj) => { if (!obj || typeof obj !== "object") return obj; // Handle arrays if (Array.isArray(obj)) { return obj.map((item) => sanitizeObject(item)); } // Handle objects const sanitized = {}; for (const [key, value] of Object.entries(obj)) { // Check if this key is sensitive const lowerKey = key.toLowerCase(); if ( SENSITIVE_FIELDS.some((field) => lowerKey.includes(field.toLowerCase())) ) { sanitized[key] = "[REDACTED]"; } else if (typeof value === "object" && value !== null) { // Recursively sanitize nested objects sanitized[key] = sanitizeObject(value); } else { sanitized[key] = value; } } return sanitized; }; // Test cases test("should redact api_key field", () => { const input = { params: { arguments: { api_key: "AIzaSyB1234567890", gene: "BRAF", position: 140753336, }, }, }; const result = sanitizeObject(input); assert.strictEqual(result.params.arguments.api_key, "[REDACTED]"); assert.strictEqual(result.params.arguments.gene, "BRAF"); assert.strictEqual(result.params.arguments.position, 140753336); }); test("should handle nested sensitive fields", () => { const input = { outer: { token: "secret-token", inner: { password: "my-password", apiKey: "another-key", safe_field: "visible", }, }, }; const result = sanitizeObject(input); assert.strictEqual(result.outer.token, "[REDACTED]"); assert.strictEqual(result.outer.inner.password, "[REDACTED]"); assert.strictEqual(result.outer.inner.apiKey, "[REDACTED]"); assert.strictEqual(result.outer.inner.safe_field, "visible"); }); test("should handle arrays with sensitive data", () => { const input = { requests: [ { api_key: "key1", data: "safe" }, { api_key: "key2", data: "also safe" }, ], }; const result = sanitizeObject(input); assert.strictEqual(result.requests[0].api_key, "[REDACTED]"); 
assert.strictEqual(result.requests[1].api_key, "[REDACTED]"); assert.strictEqual(result.requests[0].data, "safe"); assert.strictEqual(result.requests[1].data, "also safe"); }); test("should be case-insensitive for field names", () => { const input = { API_KEY: "uppercase", Api_Key: "mixed", "api-key": "hyphenated", }; const result = sanitizeObject(input); assert.strictEqual(result.API_KEY, "[REDACTED]"); assert.strictEqual(result.Api_Key, "[REDACTED]"); assert.strictEqual(result["api-key"], "[REDACTED]"); }); test("should not modify non-sensitive fields", () => { const input = { gene: "TP53", chromosome: "chr17", position: 7577121, reference: "C", alternate: "T", }; const result = sanitizeObject(input); assert.deepStrictEqual(result, input); }); test("should handle null and undefined values", () => { const input = { api_key: null, token: undefined, valid: "data", }; const result = sanitizeObject(input); assert.strictEqual(result.api_key, "[REDACTED]"); assert.strictEqual(result.token, "[REDACTED]"); assert.strictEqual(result.valid, "data"); }); test("should handle think tool detection", () => { const thinkRequest = { params: { name: "think", arguments: { thought: "Analyzing the problem...", thoughtNumber: 1, }, }, }; const toolName = thinkRequest.params?.name; assert.strictEqual(toolName, "think"); }); test("should handle domain-based filtering", () => { const searchRequest1 = { params: { name: "search", arguments: { domain: "thinking", query: "some query", }, }, }; const searchRequest2 = { params: { name: "search", arguments: { domain: "think", query: "some query", }, }, }; const domain1 = searchRequest1.params?.arguments?.domain; const domain2 = searchRequest2.params?.arguments?.domain; assert.ok(domain1 === "thinking" || domain1 === "think"); assert.ok(domain2 === "thinking" || domain2 === "think"); }); ``` -------------------------------------------------------------------------------- /src/biomcp/cli/interventions.py: 
--------------------------------------------------------------------------------
```python
"""CLI commands for intervention search and lookup."""

import asyncio
from typing import Annotated

import typer

from ..integrations.cts_api import CTSAPIError, get_api_key_instructions
from ..interventions import get_intervention, search_interventions
from ..interventions.getter import format_intervention_details
from ..interventions.search import (
    INTERVENTION_TYPES,
    format_intervention_results,
)

# Typer sub-application mounted under the main biomcp CLI as `intervention`.
intervention_app = typer.Typer(
    no_args_is_help=True,
    help="Search and retrieve intervention information from NCI CTS API",
)


@intervention_app.command("search")
def search_interventions_cli(
    name: Annotated[
        str | None,
        typer.Argument(
            help="Intervention name to search for (partial match supported)"
        ),
    ] = None,
    intervention_type: Annotated[
        str | None,
        typer.Option(
            "--type",
            help=f"Type of intervention. Options: {', '.join(INTERVENTION_TYPES)}",
            show_choices=True,
        ),
    ] = None,
    synonyms: Annotated[
        bool,
        typer.Option(
            "--synonyms/--no-synonyms",
            help="Include synonym matches in search",
        ),
    ] = True,
    page_size: Annotated[
        int,
        typer.Option(
            "--page-size",
            help="Number of results per page",
            min=1,
            max=100,
        ),
    ] = 20,
    page: Annotated[
        int,
        typer.Option(
            "--page",
            help="Page number",
            min=1,
        ),
    ] = 1,
    api_key: Annotated[
        str | None,
        typer.Option(
            "--api-key",
            help="NCI API key (overrides NCI_API_KEY env var)",
            envvar="NCI_API_KEY",
        ),
    ] = None,
) -> None:
    """
    Search for interventions (drugs, devices, procedures) in the NCI database.

    Examples:
        # Search by drug name
        biomcp intervention search pembrolizumab

        # Search by type
        biomcp intervention search --type Drug

        # Search for devices
        biomcp intervention search "CAR T" --type Biological

        # Search without synonyms
        biomcp intervention search imatinib --no-synonyms
    """
    # The CLI is synchronous, so drive the async search with asyncio.run().
    try:
        results = asyncio.run(
            search_interventions(
                name=name,
                intervention_type=intervention_type,
                synonyms=synonyms,
                page_size=page_size,
                page=page,
                api_key=api_key,
            )
        )

        output = format_intervention_results(results)
        typer.echo(output)

    except CTSAPIError as e:
        # A missing API key gets friendly setup instructions instead of a
        # raw error; everything else is reported on stderr.
        if "API key required" in str(e):
            typer.echo(get_api_key_instructions())
        else:
            typer.echo(f"Error: {e}", err=True)
        raise typer.Exit(1) from e
    except Exception as e:
        typer.echo(f"Unexpected error: {e}", err=True)
        raise typer.Exit(1) from e


@intervention_app.command("get")
def get_intervention_cli(
    intervention_id: Annotated[
        str,
        typer.Argument(help="Intervention ID"),
    ],
    api_key: Annotated[
        str | None,
        typer.Option(
            "--api-key",
            help="NCI API key (overrides NCI_API_KEY env var)",
            envvar="NCI_API_KEY",
        ),
    ] = None,
) -> None:
    """
    Get detailed information about a specific intervention.

    Example:
        biomcp intervention get INT123456
    """
    # Same pattern as the search command: run the async fetch, format, echo.
    try:
        intervention_data = asyncio.run(
            get_intervention(
                intervention_id=intervention_id,
                api_key=api_key,
            )
        )

        output = format_intervention_details(intervention_data)
        typer.echo(output)

    except CTSAPIError as e:
        # Mirror the search command's error handling for consistency.
        if "API key required" in str(e):
            typer.echo(get_api_key_instructions())
        else:
            typer.echo(f"Error: {e}", err=True)
        raise typer.Exit(1) from e
    except Exception as e:
        typer.echo(f"Unexpected error: {e}", err=True)
        raise typer.Exit(1) from e


@intervention_app.command("types")
def list_intervention_types() -> None:
    """
    List all available intervention types.
""" typer.echo("## Available Intervention Types\n") for int_type in INTERVENTION_TYPES: typer.echo(f"- {int_type}") typer.echo("\nUse these values with the --type option when searching.") ``` -------------------------------------------------------------------------------- /tests/tdd/test_pten_r173_search.py: -------------------------------------------------------------------------------- ```python """Test case demonstrating PTEN R173 search limitations.""" import asyncio import json import pytest from biomcp.articles.search import PubmedRequest, search_articles @pytest.mark.asyncio async def test_pten_r173_search_limitations(): """Demonstrate that current AND logic is too restrictive for finding PTEN R173 papers.""" # Test 1: Current approach with multiple keywords request_restrictive = PubmedRequest( genes=["PTEN"], keywords=["R173", "Arg173"] ) result_restrictive = await search_articles( request_restrictive, output_json=True ) data_restrictive = json.loads(result_restrictive) # Test 2: Less restrictive approach request_less_restrictive = PubmedRequest(genes=["PTEN"], keywords=["R173"]) result_less_restrictive = await search_articles( request_less_restrictive, output_json=True ) data_less_restrictive = json.loads(result_less_restrictive) # Test 3: Alternative variant notations request_notation = PubmedRequest(genes=["PTEN"], keywords=["p.R173C"]) result_notation = await search_articles(request_notation, output_json=True) data_notation = json.loads(result_notation) print("\nPTEN R173 Search Results:") print( f"1. PTEN + R173 + Arg173 (AND logic): {len(data_restrictive)} articles" ) print(f"2. PTEN + R173 only: {len(data_less_restrictive)} articles") print(f"3. 
PTEN + p.R173C: {len(data_notation)} articles") # The restrictive search should find fewer results assert len(data_restrictive) <= len(data_less_restrictive) # Show some example articles found if data_less_restrictive: print("\nExample articles found with 'PTEN + R173':") for i, article in enumerate(data_less_restrictive[:5]): title = article.get("title", "No title") pmid = article.get("pmid", "N/A") year = article.get("pub_year", article.get("date", "N/A")) print(f"{i + 1}. {title[:80]}... (PMID: {pmid}, Year: {year[:4]})") @pytest.mark.asyncio async def test_specific_pten_papers_not_found(): """Test that specific PTEN R173 papers mentioned by user are not found.""" # Papers mentioned by user that should be found expected_papers = [ "Mester et al 2018 Human Mutation", "Mighell et al 2020 AJHG", "Smith et al 2016 Proteins", "Smith et al 2019 AJHG", "Smith et al 2023 JPCB", ] # Search for Smith IN papers on PTEN request = PubmedRequest(keywords=["Smith IN", "PTEN"]) result = await search_articles(request, output_json=True) data = json.loads(result) print(f"\nSmith IN + PTEN search found {len(data)} articles") # Check if any contain R173 in title/abstract r173_papers = [] for article in data: title = article.get("title", "") abstract = article.get("abstract", "") if ( "R173" in title or "R173" in abstract or "Arg173" in title or "Arg173" in abstract ): r173_papers.append(article) print(f"Papers mentioning R173/Arg173: {len(r173_papers)}") # The issue: R173 might only be in full text, not abstract assert len(r173_papers) < len( expected_papers ), "Not all expected R173 papers are found" def test_and_logic_explanation(): """Document why AND logic causes issues for variant searches.""" explanation = """ Current search behavior: - Query: genes=['PTEN'], keywords=['R173', 'Arg173'] - Translates to: "@GENE_PTEN AND R173 AND Arg173" - This requires ALL terms to be present Issues: 1. Papers may use either "R173" OR "Arg173", not both 2. 
Variant notations vary: "R173C", "p.R173C", "c.517C>T", etc. 3. Specific mutation details may only be in full text, not abstract 4. AND logic is too restrictive for synonym/variant searches Potential solutions: 1. Implement OR logic within variant/keyword groups 2. Add variant notation normalization 3. Support multiple search strategies (AND vs OR) 4. Consider full-text search capabilities """ print(explanation) assert True # This test is for documentation if __name__ == "__main__": # Run the tests to demonstrate the issue asyncio.run(test_pten_r173_search_limitations()) asyncio.run(test_specific_pten_papers_not_found()) test_and_logic_explanation() ``` -------------------------------------------------------------------------------- /src/biomcp/interventions/getter.py: -------------------------------------------------------------------------------- ```python """Get specific intervention details via NCI CTS API.""" import logging from typing import Any from ..constants import NCI_INTERVENTIONS_URL from ..integrations.cts_api import CTSAPIError, make_cts_request logger = logging.getLogger(__name__) async def get_intervention( intervention_id: str, api_key: str | None = None, ) -> dict[str, Any]: """ Get detailed information about a specific intervention. 
Args: intervention_id: Intervention ID api_key: Optional API key (if not provided, uses NCI_API_KEY env var) Returns: Dictionary with intervention details Raises: CTSAPIError: If the API request fails or intervention not found """ try: # Make API request url = f"{NCI_INTERVENTIONS_URL}/{intervention_id}" response = await make_cts_request( url=url, api_key=api_key, ) # Return the intervention data if "data" in response: return response["data"] elif "intervention" in response: return response["intervention"] else: return response except CTSAPIError: raise except Exception as e: logger.error(f"Failed to get intervention {intervention_id}: {e}") raise CTSAPIError(f"Failed to retrieve intervention: {e!s}") from e def _format_intervention_header(intervention: dict[str, Any]) -> list[str]: """Format intervention header and basic info.""" int_id = intervention.get( "id", intervention.get("intervention_id", "Unknown") ) name = intervention.get("name", "Unknown Intervention") int_type = intervention.get( "type", intervention.get("category", "Unknown") ) return [ f"## Intervention: {name}", "", "### Basic Information", f"- **ID**: {int_id}", f"- **Type**: {int_type}", ] def _format_intervention_synonyms(synonyms: Any) -> list[str]: """Format intervention synonyms section.""" if not synonyms: return [] lines = ["", "### Synonyms"] if isinstance(synonyms, list): for syn in synonyms: lines.append(f"- {syn}") else: lines.append(f"- {synonyms}") return lines def _format_intervention_regulatory(intervention: dict[str, Any]) -> list[str]: """Format regulatory information section.""" if not intervention.get("fda_approved"): return [] lines = [ "", "### Regulatory Status", f"- **FDA Approved**: {'Yes' if intervention['fda_approved'] else 'No'}", ] if intervention.get("approval_date"): lines.append(f"- **Approval Date**: {intervention['approval_date']}") return lines def _format_intervention_indications(indications: Any) -> list[str]: """Format clinical indications section.""" if not 
indications: return [] lines = ["", "### Clinical Indications"] if isinstance(indications, list): for indication in indications: lines.append(f"- {indication}") else: lines.append(f"- {indications}") return lines def format_intervention_details(intervention: dict[str, Any]) -> str: """ Format intervention details as markdown. Args: intervention: Intervention data dictionary Returns: Formatted markdown string """ lines = _format_intervention_header(intervention) # Add synonyms lines.extend( _format_intervention_synonyms(intervention.get("synonyms", [])) ) # Add description if intervention.get("description"): lines.extend([ "", "### Description", intervention["description"], ]) # Add mechanism of action for drugs if intervention.get("mechanism_of_action"): lines.extend([ "", "### Mechanism of Action", intervention["mechanism_of_action"], ]) # Add regulatory info lines.extend(_format_intervention_regulatory(intervention)) # Add clinical indications lines.extend( _format_intervention_indications(intervention.get("indications")) ) # Add related trials count if available if intervention.get("trial_count"): lines.extend([ "", "### Clinical Trial Activity", f"- **Number of Trials**: {intervention['trial_count']}", ]) return "\n".join(lines) ``` -------------------------------------------------------------------------------- /src/biomcp/thinking/session.py: -------------------------------------------------------------------------------- ```python """Session management for sequential thinking.""" import uuid from collections import defaultdict from dataclasses import dataclass, field from datetime import datetime from typing import Any @dataclass class ThoughtEntry: """Represents a single thought in the thinking process.""" thought: str thought_number: int total_thoughts: int next_thought_needed: bool timestamp: datetime = field(default_factory=datetime.now) is_revision: bool = False revises_thought: int | None = None branch_from_thought: int | None = None branch_id: str | 
None = None metadata: dict[str, Any] = field(default_factory=dict) @dataclass class ThinkingSession: """Manages state for a thinking session.""" session_id: str = field(default_factory=lambda: str(uuid.uuid4())) created_at: datetime = field(default_factory=datetime.now) thought_history: list[ThoughtEntry] = field(default_factory=list) thought_branches: dict[str, list[ThoughtEntry]] = field( default_factory=lambda: defaultdict(list) ) metadata: dict[str, Any] = field(default_factory=dict) def add_thought(self, entry: ThoughtEntry) -> None: """Add a thought to the session.""" # If this is a revision, replace the original thought if entry.is_revision and entry.revises_thought: for i, thought in enumerate(self.thought_history): if thought.thought_number == entry.revises_thought: self.thought_history[i] = entry return # Add to appropriate collection if entry.branch_id: self.thought_branches[entry.branch_id].append(entry) else: self.thought_history.append(entry) def get_thought(self, thought_number: int) -> ThoughtEntry | None: """Get a specific thought by number.""" for thought in self.thought_history: if thought.thought_number == thought_number: return thought return None def get_branch_thoughts(self, branch_id: str) -> list[ThoughtEntry]: """Get all thoughts in a specific branch.""" return self.thought_branches.get(branch_id, []) def get_all_thoughts(self) -> list[ThoughtEntry]: """Get all thoughts across main history and branches.""" all_thoughts = list(self.thought_history) for branch_thoughts in self.thought_branches.values(): all_thoughts.extend(branch_thoughts) return sorted(all_thoughts, key=lambda t: t.timestamp) class SessionManager: """Manages multiple thinking sessions.""" def __init__(self): self.sessions: dict[str, ThinkingSession] = {} self._current_session_id: str | None = None def create_session(self) -> ThinkingSession: """Create a new thinking session.""" session = ThinkingSession() self.sessions[session.session_id] = session self._current_session_id 
= session.session_id return session def get_session( self, session_id: str | None = None ) -> ThinkingSession | None: """Get a session by ID or the current session.""" if session_id: return self.sessions.get(session_id) elif self._current_session_id: return self.sessions.get(self._current_session_id) return None def get_or_create_session( self, session_id: str | None = None ) -> ThinkingSession: """Get existing session or create new one.""" if session_id and session_id in self.sessions: self._current_session_id = session_id return self.sessions[session_id] session = self.get_session() if not session: session = self.create_session() return session def clear_session(self, session_id: str | None = None) -> None: """Clear a specific session or the current session.""" if session_id: self.sessions.pop(session_id, None) if self._current_session_id == session_id: self._current_session_id = None elif self._current_session_id: self.sessions.pop(self._current_session_id, None) self._current_session_id = None def clear_all_sessions(self) -> None: """Clear all sessions.""" self.sessions.clear() self._current_session_id = None # Global session manager instance _session_manager = SessionManager() ``` -------------------------------------------------------------------------------- /.github/workflows/ci.yml: -------------------------------------------------------------------------------- ```yaml name: CI on: push: branches: [main, develop] pull_request: branches: [main] workflow_dispatch: env: PYTHON_VERSION: "3.12" UV_VERSION: "0.4.29" jobs: # Quality check from main.yml - uses make check quality: runs-on: ubuntu-latest name: Quality steps: - name: Check out uses: actions/checkout@v5 - uses: actions/cache@v4 with: path: ~/.cache/pre-commit key: pre-commit-${{ hashFiles('.pre-commit-config.yaml') }} - name: Set up Python uses: actions/setup-python@v6 with: python-version: ${{ env.PYTHON_VERSION }} - name: Install uv uses: astral-sh/setup-uv@v6 with: version: ${{ env.UV_VERSION }} - 
name: Install dependencies run: | uv sync --group dev - name: Run checks run: make check # Tests and type check specifically on Python 3.11 tests-and-type-check: runs-on: ubuntu-latest name: Tests and Type Check (Python 3.11) steps: - name: Check out uses: actions/checkout@v5 - name: Set up Python uses: actions/setup-python@v6 with: python-version: "3.11" - name: Install uv uses: astral-sh/setup-uv@v6 with: version: ${{ env.UV_VERSION }} - name: Install dependencies run: | uv sync --group dev - name: Run tests run: uv run python -m pytest tests -m "not integration" --cov --cov-config=pyproject.toml --cov-report=xml - name: Check typing run: uv run mypy - name: Upload coverage reports to Codecov with GitHub Action on Python 3.11 uses: codecov/codecov-action@v5 # Documentation check from main.yml check-docs: runs-on: ubuntu-latest name: Check Docs steps: - name: Check out uses: actions/checkout@v5 - name: Set up Python uses: actions/setup-python@v6 with: python-version: ${{ env.PYTHON_VERSION }} - name: Install uv uses: astral-sh/setup-uv@v6 with: version: ${{ env.UV_VERSION }} - name: Install dependencies run: | uv sync --group dev - name: Check if documentation can be built run: uv run mkdocs build -s # Build package check build-package: runs-on: ubuntu-latest name: Build Package steps: - uses: actions/checkout@v5 - name: Set up Python uses: actions/setup-python@v6 with: python-version: ${{ env.PYTHON_VERSION }} - name: Install uv uses: astral-sh/setup-uv@v6 with: version: ${{ env.UV_VERSION }} - name: Build package run: | uvx --from build pyproject-build --installer uv - name: Check package run: | uvx twine check dist/* - name: Upload artifacts uses: actions/upload-artifact@v4 with: name: dist path: dist/ # MCP integration test - quick check test-mcp: runs-on: ubuntu-latest name: Test MCP Integration steps: - uses: actions/checkout@v5 - name: Set up Python uses: actions/setup-python@v6 with: python-version: ${{ env.PYTHON_VERSION }} - name: Install uv uses: 
astral-sh/setup-uv@v6 with: version: ${{ env.UV_VERSION }} - name: Install dependencies run: | uv sync --group dev - name: Test MCP server startup run: | timeout 10s uv run biomcp run || code=$?; if [[ $code -ne 124 && $code -ne 0 ]]; then exit $code; fi - name: Run MCP integration tests run: | uv run python -m pytest tests/tdd/test_mcp_integration.py -v # Run integration tests separately - allowed to fail integration-tests: runs-on: ubuntu-latest name: Integration Tests (Optional) continue-on-error: true steps: - name: Check out uses: actions/checkout@v5 - name: Set up Python uses: actions/setup-python@v6 with: python-version: "3.11" - name: Install uv uses: astral-sh/setup-uv@v6 with: version: ${{ env.UV_VERSION }} - name: Install dependencies run: | uv sync --group dev - name: Run integration tests run: | uv run python -m pytest tests -m "integration" -v --tb=short continue-on-error: true ``` -------------------------------------------------------------------------------- /docs/backend-services-reference/03-cbioportal.md: -------------------------------------------------------------------------------- ```markdown # cBioPortal Integration BioMCP integrates with [cBioPortal](https://www.cbioportal.org/), a comprehensive cancer genomics portal that provides visualization and analysis tools for large-scale cancer genomics datasets. ## Overview The cBioPortal integration enhances article searches by automatically including relevant cancer genomics data when searching for genes. This integration provides: 1. **Gene-level summaries** - Mutation frequency and distribution across cancer studies 2. **Mutation-specific searches** - Find studies containing specific mutations (e.g., BRAF V600E) 3. 
**Cancer type resolution** - Accurate cancer type categorization using cBioPortal's API ## How It Works ### Automatic Integration When you search for articles with a gene parameter, BioMCP automatically queries cBioPortal to provide additional context: ```python # Basic gene search includes cBioPortal summary search(domain="article", genes=["BRAF"], diseases=["melanoma"]) ``` This returns: - Standard PubMed/PubTator3 article results - cBioPortal summary showing mutation frequency across cancer studies - Top cancer types where the gene is mutated ### Mutation-Specific Searches To search for specific mutations, include the mutation notation in keywords: ```python # Search for BRAF V600E mutation search(domain="article", genes=["BRAF"], keywords=["V600E"]) # Search for SRSF2 F57Y mutation search(domain="article", genes=["SRSF2"], keywords=["F57Y"]) # Use wildcards for mutation patterns (e.g., any amino acid at position 57) search(domain="article", genes=["SRSF2"], keywords=["F57*"]) ``` Mutation-specific searches return: - Total number of studies in cBioPortal - Number of studies containing the mutation - Top studies ranked by mutation count - Cancer type distribution ## Example Output ### Gene-Level Summary ``` ### cBioPortal Summary for BRAF - **Mutation Frequency**: 76.7% (368 mutations in 480 samples) - **Top Cancer Types**: Melanoma (45%), Thyroid (23%), Colorectal (18%) - **Top Mutations**: V600E (89%), V600K (7%), G469A (2%) ``` ### Mutation-Specific Results ``` ### cBioPortal Mutation Search: BRAF **Specific Mutation**: V600E - **Total Studies**: 2340 - **Studies with Mutation**: 170 - **Total Mutations Found**: 5780 **Top Studies by Mutation Count:** | Count | Study ID | Cancer Type | Study Name | |-------|----------|-------------|------------| | 804 | msk_met_2021 | Mixed Cancer Types | MSK MetTropism (MSK, Cell 2021) | | 555 | msk_chord_2024 | Mixed Cancer Types | MSK-CHORD (MSK, Nature 2024) | | 295 | msk_impact_2017 | Mixed Cancer Types | MSK-IMPACT 
Clinical Sequencing Cohort | ``` ## Supported Mutation Notations The integration recognizes standard protein change notation: - **Specific mutations**: `V600E`, `F57Y`, `T790M` - **Wildcard patterns**: `F57*` (matches F57Y, F57L, etc.) - **Multiple mutations**: Include multiple keywords for OR search ## API Details ### Endpoints Used 1. **Gene Information**: `/api/genes/{gene}` 2. **Cancer Types**: `/api/cancer-types` 3. **Mutation Data**: `/api/mutations/fetch` 4. **Study Information**: `/api/studies` ### Rate Limiting - Conservative rate limit of 5 requests/second - Results cached for 15-30 minutes (mutations) or 24 hours (cancer types) ### Authentication Optional authentication via environment variable: ```bash export CBIO_TOKEN="your-api-token" ``` Public cBioPortal instance works without authentication but may have rate limits. ## CLI Usage For detailed command-line options for searching articles with cBioPortal integration, see the [CLI User Guide](../user-guides/01-command-line-interface.md#article-commands). ## Performance Considerations 1. **Caching**: Results are cached to minimize API calls - Gene summaries: 15 minutes - Mutation searches: 30 minutes - Cancer types: 24 hours 2. **Graceful Degradation**: If cBioPortal is unavailable, searches continue without the additional data 3. **Parallel Processing**: API calls are made in parallel with article searches for optimal performance ## Limitations 1. Only works with valid HUGO gene symbols 2. Mutation searches require exact protein change notation 3. Limited to mutations in cBioPortal's curated studies 4. 
Rate limits may apply for high-volume usage ## Error Handling The integration handles various error scenarios: - Invalid gene symbols are validated before API calls - Network timeouts fall back to article-only results - API errors are logged but don't block search results ``` -------------------------------------------------------------------------------- /src/biomcp/utils/cancer_types_api.py: -------------------------------------------------------------------------------- ```python """Cancer type utilities using cBioPortal API.""" import logging from ..utils.cbio_http_adapter import CBioHTTPAdapter from ..utils.request_cache import request_cache logger = logging.getLogger(__name__) class CancerTypeAPIClient: """Client for fetching cancer types from cBioPortal API.""" def __init__(self): """Initialize the cancer type API client.""" self.http_adapter = CBioHTTPAdapter() # Cache for cancer types self._cancer_types_cache: dict[str, str] | None = None @request_cache(ttl=86400) # Cache for 24 hours async def get_all_cancer_types(self) -> dict[str, str]: """Fetch all cancer types from cBioPortal API. 
Returns: Dictionary mapping cancer type IDs to display names """ if self._cancer_types_cache is not None: return self._cancer_types_cache try: cancer_types, error = await self.http_adapter.get( "/cancer-types", endpoint_key="cbioportal_cancer_types", cache_ttl=86400, # 24 hours ) if error: logger.error(f"Failed to fetch cancer types: {error.message}") return {} if cancer_types: # Build mapping from ID to name result = {} for ct in cancer_types: cancer_type_id = ct.get("cancerTypeId", "") name = ct.get("name", "") if cancer_type_id and name: result[cancer_type_id.lower()] = name # Also add common abbreviations short_name = ct.get("shortName", "") if short_name and short_name != cancer_type_id: result[short_name.lower()] = name self._cancer_types_cache = result logger.info(f"Loaded {len(result)} cancer types from API") return result return {} except Exception as e: logger.error(f"Error fetching cancer types: {e}") return {} async def get_cancer_type_name(self, cancer_type_id: str) -> str: """Get the display name for a cancer type ID. Args: cancer_type_id: The cancer type identifier Returns: Display name or the original ID if not found """ if not cancer_type_id: return "Unknown" cancer_types = await self.get_all_cancer_types() # Try exact match (case-insensitive) normalized_id = cancer_type_id.lower() if normalized_id in cancer_types: return cancer_types[normalized_id] # If not found, return the original ID with title case if cancer_type_id == cancer_type_id.lower(): return cancer_type_id.title() return cancer_type_id @request_cache(ttl=3600) # Cache for 1 hour async def get_study_cancer_type(self, study_id: str) -> str: """Get cancer type for a specific study. 
Args: study_id: The study identifier Returns: Cancer type name or "Unknown" """ try: study_data, error = await self.http_adapter.get( f"/studies/{study_id}", endpoint_key="cbioportal_studies", cache_ttl=3600, # 1 hour ) if error or not study_data: logger.debug(f"Study {study_id} not found") return "Unknown" cancer_type_id = study_data.get("cancerType", {}).get( "cancerTypeId", "" ) if cancer_type_id and cancer_type_id != "unknown": return await self.get_cancer_type_name(cancer_type_id) # Fallback to the cancer type name directly cancer_type_name = study_data.get("cancerType", {}).get("name", "") if cancer_type_name: return cancer_type_name return "Unknown" except Exception as e: logger.debug(f"Error fetching study {study_id}: {e}") return "Unknown" # Global instance for reuse _cancer_type_client: CancerTypeAPIClient | None = None def get_cancer_type_client() -> CancerTypeAPIClient: """Get or create the global cancer type client.""" global _cancer_type_client if _cancer_type_client is None: _cancer_type_client = CancerTypeAPIClient() return _cancer_type_client ``` -------------------------------------------------------------------------------- /tests/tdd/utils/test_mutation_filter.py: -------------------------------------------------------------------------------- ```python """Tests for mutation filter utility.""" from biomcp.utils.mutation_filter import MutationFilter class MockMutation: """Mock mutation object for testing.""" def __init__(self, protein_change: str): self.protein_change = protein_change class TestMutationFilter: """Test mutation filtering functionality.""" def test_specific_mutation_filter(self): """Test filtering for specific mutations.""" mutation_filter = MutationFilter(specific_mutation="V600E") assert mutation_filter.matches("V600E") assert not mutation_filter.matches("V600K") assert not mutation_filter.matches("V600") assert not mutation_filter.matches("") def test_wildcard_pattern_filter(self): """Test filtering with wildcard patterns.""" 
mutation_filter = MutationFilter(pattern="V600*") assert mutation_filter.matches("V600E") assert mutation_filter.matches("V600K") assert mutation_filter.matches("V600D") assert not mutation_filter.matches("V601E") assert not mutation_filter.matches("K600E") def test_pattern_without_wildcard(self): """Test pattern matching without wildcard.""" # Pattern does exact match via regex (no prefix matching without *) mutation_filter = MutationFilter(pattern="F57") # Exact match works assert mutation_filter.matches("F57") # No prefix matching without wildcard assert not mutation_filter.matches("F57Y") assert not mutation_filter.matches("F57L") assert not mutation_filter.matches("F58Y") def test_no_filter(self): """Test when no filter is specified.""" mutation_filter = MutationFilter() assert mutation_filter.matches("V600E") assert mutation_filter.matches("anything") # Empty protein change returns False even with no filter assert not mutation_filter.matches("") def test_filter_mutations_list(self): """Test filtering a list of mutations.""" mutations = [ MockMutation("V600E"), MockMutation("V600K"), MockMutation("V600D"), MockMutation("T790M"), MockMutation("L858R"), ] # Test specific mutation mutation_filter1 = MutationFilter(specific_mutation="V600E") filtered1 = mutation_filter1.filter_mutations(mutations) assert len(filtered1) == 1 assert filtered1[0].protein_change == "V600E" # Test pattern mutation_filter2 = MutationFilter(pattern="V600*") filtered2 = mutation_filter2.filter_mutations(mutations) assert len(filtered2) == 3 assert all(m.protein_change.startswith("V600") for m in filtered2) # Test no filter mutation_filter3 = MutationFilter() filtered3 = mutation_filter3.filter_mutations(mutations) assert len(filtered3) == 5 def test_string_representations(self): """Test string representations of filters.""" mutation_filter1 = MutationFilter(specific_mutation="V600E") assert str(mutation_filter1) == "MutationFilter(specific=V600E)" assert ( repr(mutation_filter1) == 
"MutationFilter(specific_mutation='V600E', pattern=None)" ) mutation_filter2 = MutationFilter(pattern="V600*") assert str(mutation_filter2) == "MutationFilter(pattern=V600*)" mutation_filter3 = MutationFilter() assert str(mutation_filter3) == "MutationFilter(no_filter)" def test_edge_cases(self): """Test edge cases in mutation matching.""" # Empty protein change mutation_filter = MutationFilter(specific_mutation="V600E") assert not mutation_filter.matches("") assert not mutation_filter.matches(None) # Complex patterns mutation_filter2 = MutationFilter(pattern="[VL]600*") # This will use regex escaping, so won't work as expected # But should not crash assert not mutation_filter2.matches("V600E") # Because [ is escaped def test_filter_mutations_preserves_type(self): """Test that filter preserves the original list type.""" mutations = [ MockMutation("V600E"), MockMutation("V600K"), ] mutation_filter = MutationFilter(pattern="V600*") result = mutation_filter.filter_mutations(mutations) # Result should be a list assert isinstance(result, list) assert len(result) == 2 ``` -------------------------------------------------------------------------------- /src/biomcp/variants/getter.py: -------------------------------------------------------------------------------- ```python """Getter module for retrieving variant details.""" import json import logging from typing import Annotated from .. import ensure_list, http_client, render from ..constants import DEFAULT_ASSEMBLY, MYVARIANT_GET_URL from .external import ExternalVariantAggregator, format_enhanced_annotations from .filters import filter_variants from .links import inject_links logger = logging.getLogger(__name__) async def get_variant( variant_id: str, output_json: bool = False, include_external: bool = False, assembly: str = DEFAULT_ASSEMBLY, ) -> str: """ Get variant details from MyVariant.info using the variant identifier. The identifier can be a full HGVS-style string (e.g. "chr7:g.140453136A>T") or an rsID (e.g. 
"rs113488022"). The API response is expected to include a "hits" array; this function extracts the first hit. Args: variant_id: Variant identifier (HGVS or rsID) output_json: Return JSON format if True, else Markdown include_external: Include external annotations (TCGA, 1000 Genomes, cBioPortal) assembly: Genome assembly (hg19 or hg38), defaults to hg19 Returns: Formatted variant data as JSON or Markdown string If output_json is True, the result is returned as a formatted JSON string; otherwise, it is rendered as Markdown. """ response, error = await http_client.request_api( url=f"{MYVARIANT_GET_URL}/{variant_id}", request={"fields": "all", "assembly": assembly}, method="GET", domain="myvariant", ) data_to_return: list = ensure_list(response) # Inject database links into the variant data if not error: data_to_return = inject_links(data_to_return) data_to_return = filter_variants(data_to_return) # Add external annotations if requested if include_external and data_to_return: logger.info( f"Adding external annotations for {len(data_to_return)} variants" ) aggregator = ExternalVariantAggregator() for _i, variant_data in enumerate(data_to_return): logger.info( f"Processing variant {_i}: keys={list(variant_data.keys())}" ) # Get enhanced annotations enhanced = await aggregator.get_enhanced_annotations( variant_id, include_tcga=True, include_1000g=True, include_cbioportal=True, variant_data=variant_data, ) # Add formatted annotations to the variant data formatted = format_enhanced_annotations(enhanced) logger.info( f"Formatted external annotations: {formatted['external_annotations'].keys()}" ) variant_data.update(formatted["external_annotations"]) if error: data_to_return = [{"error": f"Error {error.code}: {error.message}"}] if output_json: return json.dumps(data_to_return, indent=2) else: return render.to_markdown(data_to_return) async def _variant_details( call_benefit: Annotated[ str, "Define and summarize why this function is being called and the intended benefit", ], 
variant_id: str, include_external: Annotated[ bool, "Include annotations from external sources (TCGA, 1000 Genomes, cBioPortal)", ] = True, assembly: Annotated[ str, "Genome assembly (hg19 or hg38). Default: hg19", ] = DEFAULT_ASSEMBLY, ) -> str: """ Retrieves detailed information for a *single* genetic variant. Parameters: - call_benefit: Define and summarize why this function is being called and the intended benefit - variant_id: A variant identifier ("chr7:g.140453136A>T") - include_external: Include annotations from TCGA, 1000 Genomes, cBioPortal, and Mastermind - assembly: Genome assembly (hg19 or hg38). Default: hg19 Process: Queries the MyVariant.info GET endpoint, optionally fetching additional annotations from external databases Output: A Markdown formatted string containing comprehensive variant annotations (genomic context, frequencies, predictions, clinical data, external annotations). Returns error if invalid. Note: Use the variant_searcher to find the variant id first. """ return await get_variant( variant_id, output_json=False, include_external=include_external, assembly=assembly, ) ``` -------------------------------------------------------------------------------- /src/biomcp/integrations/cts_api.py: -------------------------------------------------------------------------------- ```python """NCI Clinical Trials Search API integration helper.""" import json import logging import os from typing import Any, Literal from ..constants import NCI_API_KEY_ENV from ..http_client import request_api logger = logging.getLogger(__name__) class CTSAPIError(Exception): """Error raised when CTS API requests fail.""" pass def _validate_api_key(api_key: str | None) -> str: """Validate and return API key.""" if not api_key: api_key = os.getenv(NCI_API_KEY_ENV) if not api_key: raise CTSAPIError( f"NCI API key required. 
Please set {NCI_API_KEY_ENV} environment " "variable or provide api_key parameter.\n" "Get a free API key at: https://www.cancer.gov/research/participate/" "clinical-trials-search/developers" ) return api_key def _prepare_request_data( method: str, params: dict[str, Any] | None, json_data: dict[str, Any] | None, headers: dict[str, str], ) -> dict[str, Any]: """Prepare request data based on method.""" if method == "GET": request_data = params or {} logger.debug(f"CTS API GET request with params: {params}") else: request_data = json_data or {} if method == "POST": logger.debug(f"CTS API POST request with data: {json_data}") # Add headers to request data if headers: request_data["_headers"] = json.dumps(headers) return request_data def _handle_api_error(error: Any) -> None: """Handle API errors with appropriate messages.""" if error.code == 401: raise CTSAPIError( f"Invalid API key. Please check your {NCI_API_KEY_ENV} " "environment variable or api_key parameter." ) elif error.code == 403: raise CTSAPIError( "Access forbidden. Your API key may not have permission " "to access this resource." ) else: raise CTSAPIError(f"CTS API error: {error.message}") async def make_cts_request( url: str, method: Literal["GET", "POST"] = "GET", params: dict[str, Any] | None = None, json_data: dict[str, Any] | None = None, api_key: str | None = None, ) -> dict[str, Any]: """ Make a request to the NCI CTS API with proper authentication. 
Args: url: Full URL to the CTS API endpoint method: HTTP method (GET or POST) params: Query parameters json_data: JSON data for POST requests api_key: Optional API key (if not provided, uses NCI_API_KEY env var) Returns: JSON response from the API Raises: CTSAPIError: If the request fails or API key is missing """ # Validate API key api_key = _validate_api_key(api_key) # Prepare headers headers = {"x-api-key": api_key, "Accept": "application/json"} try: # Prepare request data request_data = _prepare_request_data( method, params, json_data, headers ) # Make API request response, error = await request_api( url=url, request=request_data, method=method, cache_ttl=0, # Disable caching for NCI API to ensure fresh results ) # Handle errors if error: _handle_api_error(error) if response is None: raise CTSAPIError("No response received from NCI CTS API") return response except Exception as e: # Re-raise CTSAPIError as-is if isinstance(e, CTSAPIError): raise # Wrap other exceptions logger.error(f"CTS API request failed: {e}") raise CTSAPIError(f"Failed to connect to NCI CTS API: {e!s}") from e def get_api_key_instructions() -> str: """ Get user-friendly instructions for obtaining and setting the API key. Returns: Formatted string with instructions """ return ( "## NCI Clinical Trials API Key Required\n\n" "To use NCI's Clinical Trials Search API, you need an API key.\n\n" "**Option 1: Set environment variable (recommended)**\n" "```bash\n" f"export {NCI_API_KEY_ENV}='your-api-key'\n" "```\n\n" "**Option 2: Provide via CLI**\n" "```bash\n" "biomcp trial search --api-key YOUR_KEY --condition melanoma\n" "```\n\n" "**Get your free API key:**\n" "Visit https://www.cancer.gov/research/participate/clinical-trials-search/developers\n\n" "The API key provides access to NCI's comprehensive cancer clinical trials " "database with advanced search capabilities." 
) ``` -------------------------------------------------------------------------------- /tests/tdd/variants/test_alphagenome_api_key.py: -------------------------------------------------------------------------------- ```python """Test AlphaGenome per-request API key functionality.""" import os from unittest.mock import MagicMock, patch import pandas as pd import pytest from biomcp.variants.alphagenome import predict_variant_effects @pytest.mark.asyncio async def test_api_key_parameter_overrides_env_var(): """Test that api_key parameter takes precedence over environment variable.""" # Set up environment variable with patch.dict("os.environ", {"ALPHAGENOME_API_KEY": "env-key"}): # Mock AlphaGenome modules mock_genome = MagicMock() mock_client = MagicMock() mock_scorers = MagicMock() # Mock successful prediction test_scores_df = pd.DataFrame({ "output_type": ["RNA_SEQ"], "raw_score": [1.5], "gene_name": ["BRAF"], "track_name": [None], }) # Track which API key was used api_keys_used = [] def track_create(api_key): api_keys_used.append(api_key) mock_model = MagicMock() mock_model.score_variant.return_value = test_scores_df return mock_model mock_client.create.side_effect = track_create mock_scorers.tidy_scores.return_value = test_scores_df mock_scorers.get_recommended_scorers.return_value = [] # Create a mock module with the correct attributes mock_models = MagicMock() mock_models.dna_client = mock_client mock_models.variant_scorers = mock_scorers mock_data = MagicMock() mock_data.genome = mock_genome with patch.dict( "sys.modules", { "alphagenome.data": mock_data, "alphagenome.models": mock_models, }, ): # Test with parameter API key result = await predict_variant_effects( "chr7", 140753336, "A", "T", api_key="param-key" ) # Verify the parameter key was used, not the env var assert len(api_keys_used) == 1 assert api_keys_used[0] == "param-key" assert "BRAF" in result @pytest.mark.asyncio async def test_no_api_key_shows_instructions(): """Test that missing API key shows 
helpful instructions.""" # Ensure no environment variable is set with patch.dict("os.environ", {}, clear=True): # Remove ALPHAGENOME_API_KEY if it exists os.environ.pop("ALPHAGENOME_API_KEY", None) result = await predict_variant_effects( "chr7", 140753336, "A", "T", skip_cache=True ) # Check for instructions assert "AlphaGenome API key required" in result assert "My AlphaGenome API key is" in result assert "ACTION REQUIRED" in result assert "https://deepmind.google.com/science/alphagenome" in result @pytest.mark.asyncio async def test_env_var_used_when_no_parameter(): """Test that environment variable is used when no parameter is provided.""" # Set up environment variable with patch.dict("os.environ", {"ALPHAGENOME_API_KEY": "env-key"}): # Mock AlphaGenome modules mock_genome = MagicMock() mock_client = MagicMock() mock_scorers = MagicMock() # Mock successful prediction test_scores_df = pd.DataFrame({ "output_type": ["RNA_SEQ"], "raw_score": [1.5], "gene_name": ["BRAF"], "track_name": [None], }) # Track which API key was used api_keys_used = [] def track_create(api_key): api_keys_used.append(api_key) mock_model = MagicMock() mock_model.score_variant.return_value = test_scores_df return mock_model mock_client.create.side_effect = track_create mock_scorers.tidy_scores.return_value = test_scores_df mock_scorers.get_recommended_scorers.return_value = [] # Create a mock module with the correct attributes mock_models = MagicMock() mock_models.dna_client = mock_client mock_models.variant_scorers = mock_scorers mock_data = MagicMock() mock_data.genome = mock_genome with patch.dict( "sys.modules", { "alphagenome.data": mock_data, "alphagenome.models": mock_models, }, ): # Test without parameter API key result = await predict_variant_effects("chr7", 140753336, "A", "T") # Verify the env var key was used assert len(api_keys_used) == 1 assert api_keys_used[0] == "env-key" assert "BRAF" in result ``` 
-------------------------------------------------------------------------------- /src/biomcp/request_batcher.py: -------------------------------------------------------------------------------- ```python """Request batching utility for combining multiple small requests. This module provides a request batcher that accumulates multiple requests and processes them together in batches, reducing the number of API calls and improving performance for bulk operations. Key Features: - Automatic batching based on size or time threshold - Configurable batch size and timeout - Thread-safe request accumulation - Error propagation to individual requests Example: ```python async def batch_api_call(params_list): # Make a single API call with multiple parameters return await api.bulk_request(params_list) batcher = RequestBatcher( batch_func=batch_api_call, batch_size=10, batch_timeout=0.1 ) # Individual requests are automatically batched result1 = await batcher.request({"id": 1}) result2 = await batcher.request({"id": 2}) ``` """ import asyncio from collections.abc import Callable, Coroutine from typing import Any, TypeVar T = TypeVar("T") class RequestBatcher: """Batches multiple requests together to reduce overhead. This is particularly useful for APIs that support batch operations or when network latency dominates over processing time. The batcher accumulates requests until either: 1. The batch size threshold is reached 2. The batch timeout expires At which point all accumulated requests are processed together. """ def __init__( self, batch_func: Callable[[list[Any]], Coroutine[Any, Any, list[Any]]], batch_size: int = 10, batch_timeout: float = 0.05, # 50ms ): """Initialize the batcher. 
Args: batch_func: Async function that processes a batch of requests batch_size: Maximum number of requests to batch together batch_timeout: Maximum time to wait for batch to fill (seconds) """ self.batch_func = batch_func self.batch_size = batch_size self.batch_timeout = batch_timeout self.pending_requests: list[tuple[Any, asyncio.Future]] = [] self.batch_task: asyncio.Task | None = None self._lock = asyncio.Lock() async def request(self, params: Any) -> Any: """Add a request to the batch and wait for result.""" future: asyncio.Future[Any] = asyncio.Future() async with self._lock: self.pending_requests.append((params, future)) # Check if we should flush immediately if len(self.pending_requests) >= self.batch_size: await self._flush_batch() elif not self.batch_task or self.batch_task.done(): # Start a timer to flush the batch self.batch_task = asyncio.create_task(self._batch_timer()) return await future async def _batch_timer(self): """Timer that flushes the batch after timeout.""" await asyncio.sleep(self.batch_timeout) async with self._lock: await self._flush_batch() async def _flush_batch(self): """Process all pending requests as a batch.""" if not self.pending_requests: return # Extract current batch batch = self.pending_requests.copy() self.pending_requests.clear() # Cancel timer if running if self.batch_task and not self.batch_task.done(): self.batch_task.cancel() # Process batch try: params_list = [params for params, _ in batch] results = await self.batch_func(params_list) # Distribute results to futures for i, (_, future) in enumerate(batch): if not future.done(): if i < len(results): future.set_result(results[i]) else: future.set_exception( Exception(f"No result for request at index {i}") ) except Exception as e: # Propagate error to all futures for _, future in batch: if not future.done(): future.set_exception(e) # Example usage for autocomplete batching async def batch_autocomplete_requests(requests: list[dict]) -> list[Any]: """Process multiple 
autocomplete requests in parallel. This is an example implementation that could be used to batch autocomplete requests more efficiently. """ from .articles.autocomplete import EntityRequest, autocomplete tasks = [] for req in requests: entity_req = EntityRequest(**req) tasks.append(autocomplete(entity_req)) return await asyncio.gather(*tasks) ``` -------------------------------------------------------------------------------- /pyproject.toml: -------------------------------------------------------------------------------- ```toml [project] name = "biomcp-python" version = "0.4.6" description = "Biomedical Model Context Protocol Server" authors = [{ name = "Ian Maurer", email = "[email protected]" }] readme = "README.md" keywords = ['python'] requires-python = ">=3.10,<4.0" classifiers = [ "Intended Audience :: Developers", "Programming Language :: Python", "Programming Language :: Python :: 3", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", "Programming Language :: Python :: 3.13", "Topic :: Software Development :: Libraries :: Python Modules", ] dependencies = [ "certifi>=2025.1.31", "diskcache>=5.6.3", "httpx>=0.28.1", "mcp[cli]>=1.12.3,<2.0.0", "platformdirs>=4.3.6", "psutil>=7.0.0", "pydantic>=2.10.6", "python-dotenv>=1.0.0", "rich>=14.0.0", "typer>=0.15.2", "uvicorn>=0.34.2", "alphagenome>=0.1.0", ] [project.urls] Homepage = "https://genomoncology.com/biomcp/" Repository = "https://github.com/genomoncology/biomcp" Documentation = "https://genomoncology.com/biomcp/" [dependency-groups] dev = [ "pytest>=7.2.0", "pytest-xdist>=3.5.0", "pre-commit>=2.20.0", "tox-uv>=1.11.3", "deptry>=0.22.0", "mypy>=0.991", "pytest-cov>=4.0.0", "pytest-asyncio>=0.24.0", "ruff>=0.9.2", "mkdocs>=1.4.2", "mkdocs-material>=8.5.10", "mkdocstrings[python]>=0.26.1", "anyio>=4.8.0", # "ipython>=9.0.2", "pytest-bdd>=8.1.0", "tomlkit>=0.13.2", "assertpy>=1.1", "twine>=4.0.0", "pandas>=2.0.0", # Used for mocking 
AlphaGenome responses in tests "PyYAML>=6.0.0", # Used for mkdocs.yml parsing in scripts "pydantic-ai>=0.0.14", # For testing Pydantic AI integration ] [project.optional-dependencies] api = [ ] worker = [ "fastapi>=0.110.0", "starlette>=0.36.0", "uvicorn>=0.28.0", ] [build-system] requires = ["setuptools >= 61.0"] build-backend = "setuptools.build_meta" [tool.setuptools.package-data] biomcp = ["resources/*.md"] [project.scripts] biomcp = "biomcp.__main__:main" [tool.mypy] files = ["src"] ignore_missing_imports = true disallow_untyped_defs = false disallow_any_unimported = false no_implicit_optional = true check_untyped_defs = false warn_return_any = false warn_unused_ignores = true show_error_codes = true plugins = [ "pydantic.mypy" ] disable_error_code = [ "union-attr", "prop-decorator", ] [tool.pytest.ini_options] testpaths = ["tests"] addopts = "--import-mode=importlib" asyncio_mode = "auto" asyncio_default_fixture_loop_scope = "function" markers = [ "integration: marks tests as integration tests (deselect with '-m \"not integration\"')", ] filterwarnings = [ # Ignore protobuf version warnings from AlphaGenome "ignore:Protobuf gencode version.*is exactly one major version older.*:UserWarning", # Ignore false positive warning from pytest-xdist about coroutines # This occurs during parallel test execution when mock objects are cleaned up "ignore:coroutine 'search_trials_unified' was never awaited:RuntimeWarning", ] [tool.ruff] target-version = "py310" line-length = 79 fix = true unsafe-fixes = true [tool.ruff.lint] select = [ # flake8-2020 "YTT", # flake8-bandit "S", # flake8-bugbear "B", # flake8-builtins "A", # flake8-comprehensions "C4", # flake8-debugger "T10", # flake8-simplify "SIM", # isort "I", # mccabe "C90", # pycodestyle "E", "W", # pyflakes "F", # pygrep-hooks "PGH", # pyupgrade "UP", # ruff "RUF", ] ignore = [ # LineTooLong "E501", # DoNotAssignLambda "E731", # Consider unpacking "RUF005", # Union for type annotations "UP007", # Asserts are ok when I 
say they are ok. "S101", ] [tool.ruff.lint.per-file-ignores] "tests/*" = ["S101"] "__init__.py" = ["I001"] "src/biomcp/variants/external.py" = ["C901"] # Complex API interactions are acceptable [tool.ruff.format] preview = true [tool.ruff.lint.flake8-bugbear] extend-immutable-calls = [ "fastapi.Depends", "fastapi.Query", "typer.Argument", "typer.Option", ] [tool.coverage.report] skip_empty = true [tool.coverage.run] branch = true source = ["src"] omit = [ "src/*/__main__.py", "src/*/server.py", "src/*/http_client.py", ] [tool.deptry] exclude = [ "example_scripts/python_sdk.py", "venv", ".venv", ".direnv", "tests", ".git", "build", "dist", "scripts", "spike", ] [tool.deptry.per_rule_ignores] DEP001 = ["alphagenome"] # Optional dependency, must be installed manually DEP002 = ["uvicorn"] DEP003 = ["biomcp", "alphagenome"] ``` -------------------------------------------------------------------------------- /docs/getting-started/01-quickstart-cli.md: -------------------------------------------------------------------------------- ```markdown # Quickstart: BioMCP CLI Get started with BioMCP in under 5 minutes! This guide walks you through installation and your first biomedical search. 
## Prerequisites - Python 3.10 or higher - [uv](https://docs.astral.sh/uv/) package manager (recommended) or pip ## Installation ### Option 1: Using uv (Recommended) ```bash # Install uv if you haven't already curl -LsSf https://astral.sh/uv/install.sh | sh # Install BioMCP uv tool install biomcp ``` ### Option 2: Using pip ```bash pip install biomcp ``` ## Your First Search Let's search for recent articles about BRAF mutations in melanoma: ```bash biomcp article search \ --gene BRAF --disease melanoma --limit 5 ``` This command: - Searches PubMed/PubTator3 for articles - Filters by BRAF gene and melanoma disease - Returns the 5 most recent results - Automatically includes cBioPortal cancer genomics data - Includes preprints from bioRxiv/medRxiv by default ## Understanding the Output The search returns: 1. **cBioPortal Summary** (if gene specified): Cancer genomics data showing mutation frequencies and hotspots 2. **Article Results**: Each result includes: - Title and authors - Journal and publication date - PubMed ID and direct link - Abstract snippet - Annotated entities (genes, diseases, chemicals) ## Essential Commands ### Search Clinical Trials Find active trials for lung cancer: ```bash biomcp trial search \ --condition "lung cancer" \ --status RECRUITING --limit 5 ``` ### Get Gene Information Retrieve details about the TP53 tumor suppressor: ```bash biomcp gene get TP53 ``` ### Look Up Drug Information Get details about imatinib (Gleevec): ```bash biomcp drug get imatinib ``` ### Search for Genetic Variants Find pathogenic variants in the BRCA1 gene: ```bash biomcp variant search \ --gene BRCA1 --significance pathogenic \ --limit 5 ``` ## Next Steps ### Set Up API Keys (Optional but Recommended) Some features require API keys for enhanced functionality: ```bash # For NCI clinical trials database export NCI_API_KEY="your-key-here" # For AlphaGenome variant predictions export ALPHAGENOME_API_KEY="your-key-here" # For additional cBioPortal features export 
CBIO_TOKEN="your-token-here" ``` See [Authentication and API Keys](03-authentication-and-api-keys.md) for detailed setup. ### Explore Advanced Features - **Combine Multiple Filters**: ```bash biomcp article search \ --gene EGFR --disease "lung cancer" \ --chemical erlotinib ``` - **Use OR Logic in Keywords**: ```bash biomcp article search --gene BRAF --keyword "V600E|p.V600E|c.1799T>A" ``` - **Exclude Preprints**: ```bash biomcp article search --gene TP53 --no-preprints ``` ### Get Help View all available commands: ```bash biomcp --help ``` Get help for a specific command: ```bash biomcp article search --help ``` ## Common Use Cases ### 1. Research a Specific Mutation ```bash # Find articles about EGFR T790M resistance mutation biomcp article search --gene EGFR \ --keyword "T790M|p.T790M" \ --disease "lung cancer" ``` ### 2. Find Trials for a Patient ```bash # Active trials for HER2-positive breast cancer biomcp trial search \ --condition "breast cancer" \ --keyword "HER2 positive" \ --status RECRUITING ``` ### 3. Investigate Drug Mechanisms ```bash # Get information about pembrolizumab biomcp drug get pembrolizumab # Find articles about its use in melanoma biomcp article search --chemical pembrolizumab --disease melanoma ``` ## Troubleshooting ### Command Not Found If `biomcp` is not recognized: - Ensure your PATH includes the installation directory - Try running with full path: `~/.local/bin/biomcp` - Restart your terminal after installation ### No Results Found If searches return no results: - Check spelling of gene names (use official symbols) - Try broader search terms - Remove filters one by one to identify the constraint ### API Rate Limits If you encounter rate limit errors: - Add delays between requests - Consider setting up API keys for higher limits - Use the `--limit` parameter to reduce result count ## Next Steps Now that you've run your first searches, explore these resources: 1. 
**[Complete CLI Reference](../user-guides/01-command-line-interface.md)** - Comprehensive documentation for all commands and options 2. **[Claude Desktop Integration](02-claude-desktop-integration.md)** - Use BioMCP with AI assistants 3. **[Set up API Keys](03-authentication-and-api-keys.md)** - Enable advanced features with NCI, AlphaGenome, and cBioPortal 4. **[How-to Guides](../how-to-guides/01-find-articles-and-cbioportal-data.md)** - Step-by-step tutorials for complex research workflows 5. **[Deep Researcher Persona](../concepts/02-the-deep-researcher-persona.md)** - Learn about BioMCP's philosophy and methodology Happy researching! 🧬🔬 ``` -------------------------------------------------------------------------------- /tests/integration/test_preprints_integration.py: -------------------------------------------------------------------------------- ```python """Integration tests for preprint search functionality.""" import asyncio import pytest from biomcp.articles.preprints import ( BiorxivClient, EuropePMCClient, PreprintSearcher, ) from biomcp.articles.search import PubmedRequest from biomcp.core import PublicationState class TestBiorxivIntegration: """Integration tests for bioRxiv API.""" @pytest.mark.asyncio async def test_biorxiv_real_search(self): """Test real bioRxiv API search.""" client = BiorxivClient() # Try multiple search terms to find one with results search_terms = ["cancer", "gene", "cell", "protein", "RNA", "DNA"] results = [] successful_term = None for term in search_terms: results = await client.search(term) if len(results) > 0: successful_term = term break # If no results with any term, the API might be down or have no recent articles if len(results) == 0: pytest.skip( "No results found with any search term - API may be down or have no matching recent articles" ) # Check the structure of results first_result = results[0] assert first_result.doi is not None assert first_result.title is not None assert first_result.publication_state == 
PublicationState.PREPRINT assert "preprint" in first_result.journal.lower() print( f"Found {len(results)} bioRxiv results for term '{successful_term}'" ) print(f"First result: {first_result.title}") class TestEuropePMCIntegration: """Integration tests for Europe PMC API.""" @pytest.mark.asyncio async def test_europe_pmc_real_search(self): """Test real Europe PMC API search for preprints.""" client = EuropePMCClient() # Try multiple search terms to find one with results search_terms = [ "cancer", "gene", "cell", "protein", "SARS-CoV-2", "COVID", ] results = [] successful_term = None for term in search_terms: results = await client.search(term) if len(results) > 0: successful_term = term break # If no results with any term, the API might be down if len(results) == 0: pytest.skip( "No results found with any search term - Europe PMC API may be down" ) # Check the structure first_result = results[0] assert first_result.title is not None assert first_result.publication_state == PublicationState.PREPRINT print( f"Found {len(results)} Europe PMC preprint results for term '{successful_term}'" ) print(f"First result: {first_result.title}") if first_result.doi: print(f"DOI: {first_result.doi}") class TestPreprintSearcherIntegration: """Integration tests for combined preprint search.""" @pytest.mark.asyncio async def test_combined_search_real(self): """Test searching across both preprint sources.""" searcher = PreprintSearcher() # Try different search combinations search_configs = [ {"genes": ["TP53"], "diseases": ["cancer"]}, {"keywords": ["protein", "structure"]}, {"genes": ["BRAF"], "diseases": ["melanoma"]}, {"keywords": ["gene", "expression"]}, ] response = None successful_config = None for config in search_configs: request = PubmedRequest(**config) response = await searcher.search(request) if response.count > 0: successful_config = config break print(f"Total results: {response.count if response else 0}") # Check if we got any results if response and response.count > 0: # 
Check result structure first = response.results[0] assert first.title is not None assert first.publication_state == PublicationState.PREPRINT print(f"Successful search config: {successful_config}") print(f"First result: {first.title}") print(f"Date: {first.date}") print(f"Journal: {first.journal}") else: pytest.skip( "No results found with any search configuration - APIs may be down" ) if __name__ == "__main__": # Run the tests directly asyncio.run(TestBiorxivIntegration().test_biorxiv_real_search()) print("\n" + "=" * 50 + "\n") asyncio.run(TestEuropePMCIntegration().test_europe_pmc_real_search()) print("\n" + "=" * 50 + "\n") asyncio.run(TestPreprintSearcherIntegration().test_combined_search_real()) ``` -------------------------------------------------------------------------------- /docs/developer-guides/05-error-handling.md: -------------------------------------------------------------------------------- ```markdown # Error Handling Guide ## Overview BioMCP uses a consistent error handling pattern across all HTTP operations. This guide explains the error types, when they occur, and how to handle them. ## Error Structure All HTTP operations return a tuple: `(data, error)` where one is always `None`. ```python data, error = await http_client.request_api(...) 
if error: # Handle error logger.error(f"Request failed: {error.code} - {error.message}") else: # Process data process_result(data) ``` ## Error Types ### Network Errors - **When**: Connection timeout, DNS resolution failure, network unreachable - **Error Code**: Various HTTP client exceptions - **Handling**: Retry with exponential backoff or fail gracefully ### HTTP Status Errors - **When**: Server returns 4xx or 5xx status codes - **Error Codes**: - `400-499`: Client errors (bad request, unauthorized, not found) - `500-599`: Server errors (internal error, service unavailable) - **Handling**: - 4xx: Fix request parameters or authentication - 5xx: Retry with backoff or use cached data ### Circuit Breaker Errors - **When**: Too many consecutive failures to a domain - **Error**: Circuit breaker opens to prevent cascading failures - **Handling**: Wait for recovery timeout or use alternative data source ### Offline Mode Errors - **When**: `BIOMCP_OFFLINE=true` and no cached data available - **Error**: Request blocked in offline mode - **Handling**: Use cached data only or inform user about offline status ### Parse Errors - **When**: Response is not valid JSON or doesn't match expected schema - **Error**: JSON decode error or validation error - **Handling**: Log error and treat as service issue ## Best Practices ### 1. Always Check Errors ```python # ❌ Bad - ignoring error data, _ = await http_client.request_api(...) process(data) # data might be None! # ✅ Good - checking error data, error = await http_client.request_api(...) if error: logger.warning(f"Failed to fetch data: {error}") return None process(data) ``` ### 2. Provide Context in Error Messages ```python # ❌ Bad - generic error if error: logger.error("Request failed") # ✅ Good - contextual error if error: logger.error(f"Failed to fetch gene {gene_id} from cBioPortal: {error.message}") ``` ### 3. 
Graceful Degradation ```python async def get_variant_with_fallback(variant_id: str): # Try primary source data, error = await primary_source.get_variant(variant_id) if not error: return data logger.warning(f"Primary source failed: {error}, trying secondary") # Try secondary source data, error = await secondary_source.get_variant(variant_id) if not error: return data # Use cached data as last resort return get_cached_variant(variant_id) ``` ### 4. User-Friendly Error Messages ```python def format_error_for_user(error: RequestError) -> str: if error.code >= 500: return "The service is temporarily unavailable. Please try again later." elif error.code == 404: return "The requested data was not found." elif error.code == 401: return "Authentication required. Please check your credentials." elif "OFFLINE" in str(error): return "You are in offline mode. Only cached data is available." else: return "An error occurred while fetching data. Please try again." ``` ## Testing Error Conditions ### 1. Simulate Network Errors ```python with patch("biomcp.http_client.call_http") as mock: mock.side_effect = Exception("Network error") data, error = await client.fetch_data() assert error is not None assert data is None ``` ### 2. Test Circuit Breaker ```python # Simulate multiple failures for _ in range(5): with patch("biomcp.http_client.call_http") as mock: mock.return_value = (500, "Server Error") await client.fetch_data() # Circuit should be open data, error = await client.fetch_data() assert error is not None assert "circuit" in error.message.lower() ``` ### 3. 
Test Offline Mode ```python with patch.dict(os.environ, {"BIOMCP_OFFLINE": "true"}): data, error = await client.fetch_data() # Should only return cached data or error ``` ## Common Patterns ### Retry with Backoff The centralized HTTP client automatically retries with exponential backoff for: - Network errors - 5xx server errors - Rate limit errors (429) ### Caching Failed requests don't overwrite cached data, ensuring availability during outages. ### Rate Limiting Requests are automatically rate-limited per domain to prevent overwhelming services. ## Debugging Enable debug logging to see all HTTP requests and errors: ```python import logging logging.getLogger("biomcp.http_client").setLevel(logging.DEBUG) ``` This will show: - All HTTP requests with URLs and methods - Response status codes and times - Error details and retry attempts - Circuit breaker state changes ``` -------------------------------------------------------------------------------- /src/biomcp/openfda/cache.py: -------------------------------------------------------------------------------- ```python """ Simple in-memory caching for OpenFDA API responses. This module provides a time-based cache to reduce API calls and improve performance. Cache entries expire after a configurable TTL (time-to-live). """ import hashlib import json import logging import os from datetime import datetime, timedelta from typing import Any logger = logging.getLogger(__name__) # Cache configuration CACHE_TTL_MINUTES = int(os.environ.get("BIOMCP_FDA_CACHE_TTL", "15")) MAX_CACHE_SIZE = int(os.environ.get("BIOMCP_FDA_MAX_CACHE_SIZE", "100")) MAX_RESPONSE_SIZE = int( os.environ.get("BIOMCP_FDA_MAX_RESPONSE_SIZE", str(1024 * 1024)) ) # 1MB default # Global cache dictionary _cache: dict[str, tuple[Any, datetime]] = {} def _generate_cache_key(endpoint: str, params: dict[str, Any]) -> str: """ Generate a unique cache key for an API request. 
Args: endpoint: The API endpoint URL params: Query parameters Returns: A unique hash key for the request """ # Remove sensitive parameters before hashing safe_params = { k: v for k, v in params.items() if k.lower() not in ["api_key", "apikey", "key", "token", "secret"] } # Sort params for consistent hashing sorted_params = json.dumps(safe_params, sort_keys=True) combined = f"{endpoint}:{sorted_params}" # Use SHA256 for cache key return hashlib.sha256(combined.encode()).hexdigest() def get_cached_response( endpoint: str, params: dict[str, Any] ) -> dict[str, Any] | None: """ Retrieve a cached response if available and not expired. Args: endpoint: The API endpoint URL params: Query parameters Returns: Cached response data or None if not found/expired """ cache_key = _generate_cache_key(endpoint, params) if cache_key in _cache: data, timestamp = _cache[cache_key] # Check if cache entry is still valid age = datetime.now() - timestamp if age < timedelta(minutes=CACHE_TTL_MINUTES): logger.debug( f"Cache hit for {endpoint} (age: {age.total_seconds():.1f}s)" ) return data else: # Remove expired entry del _cache[cache_key] logger.debug(f"Cache expired for {endpoint}") return None def set_cached_response( endpoint: str, params: dict[str, Any], response: dict[str, Any] ) -> None: """ Store a response in the cache. 
Args: endpoint: The API endpoint URL params: Query parameters response: Response data to cache """ # Check response size limit import json import sys # Better size estimation using JSON serialization try: response_json = json.dumps(response) response_size = len(response_json.encode("utf-8")) except (TypeError, ValueError): # If can't serialize, use sys.getsizeof response_size = sys.getsizeof(response) if response_size > MAX_RESPONSE_SIZE: logger.warning( f"Response too large to cache: {response_size} bytes > {MAX_RESPONSE_SIZE} bytes" ) return # Check cache size limit if len(_cache) >= MAX_CACHE_SIZE: # Remove oldest entries (simple FIFO) oldest_keys = sorted(_cache.keys(), key=lambda k: _cache[k][1])[ : len(_cache) - MAX_CACHE_SIZE + 1 ] for key in oldest_keys: del _cache[key] logger.debug( f"Cache size limit reached, removed {len(oldest_keys)} entries" ) cache_key = _generate_cache_key(endpoint, params) _cache[cache_key] = (response, datetime.now()) logger.debug(f"Cached response for {endpoint} (cache size: {len(_cache)})") def clear_cache() -> None: """Clear all cached responses.""" global _cache size = len(_cache) _cache = {} logger.info(f"Cleared FDA cache ({size} entries)") def get_cache_stats() -> dict[str, Any]: """ Get cache statistics. Returns: Dictionary with cache statistics """ now = datetime.now() valid_count = 0 total_age = 0.0 for _data, timestamp in _cache.values(): age = (now - timestamp).total_seconds() if age < CACHE_TTL_MINUTES * 60: valid_count += 1 total_age += age avg_age = total_age / valid_count if valid_count > 0 else 0 return { "total_entries": len(_cache), "valid_entries": valid_count, "expired_entries": len(_cache) - valid_count, "average_age_seconds": avg_age, "ttl_minutes": CACHE_TTL_MINUTES, "max_size": MAX_CACHE_SIZE, } def is_cacheable_request(endpoint: str, params: dict[str, Any]) -> bool: """ Determine if a request should be cached. 
Args: endpoint: The API endpoint URL params: Query parameters Returns: True if the request should be cached """ # Don't cache if caching is disabled if CACHE_TTL_MINUTES <= 0: return False # Don't cache very large requests return params.get("limit", 0) <= 100 ``` -------------------------------------------------------------------------------- /tests/tdd/drugs/test_drug_getter.py: -------------------------------------------------------------------------------- ```python """Unit tests for drug information retrieval.""" import json import pytest from biomcp.drugs.getter import get_drug class TestDrugGetter: """Test drug information retrieval.""" @pytest.fixture def mock_drug_response(self): """Mock drug response from MyChem.info.""" return { "_id": "CHEMBL941", "name": "Imatinib", "drugbank": { "id": "DB00619", "name": "Imatinib", "description": "Imatinib is a tyrosine kinase inhibitor...", "indication": "Treatment of chronic myeloid leukemia...", "mechanism_of_action": "Inhibits BCR-ABL tyrosine kinase...", "products": {"name": ["Gleevec", "Glivec"]}, }, "chembl": { "molecule_chembl_id": "CHEMBL941", "pref_name": "IMATINIB", }, "pubchem": {"cid": 5291}, "chebi": {"id": "CHEBI:45783", "name": "imatinib"}, "inchikey": "KTUFNOKKBVMGRW-UHFFFAOYSA-N", "formula": "C29H31N7O", } @pytest.mark.asyncio async def test_get_drug_by_name(self, monkeypatch, mock_drug_response): """Test getting drug by name.""" # Mock the API call call_count = 0 responses = [ # Query response ({"hits": [{"_id": "CHEMBL941"}]}, None), # Get response (mock_drug_response, None), ] async def mock_request_api(url, request, method, domain): nonlocal call_count result = responses[call_count] call_count += 1 return result monkeypatch.setattr("biomcp.http_client.request_api", mock_request_api) result = await get_drug("imatinib") assert "## Drug: Imatinib" in result assert "DrugBank ID**: DB00619" in result assert "ChEMBL ID**: CHEMBL941" in result assert "Formula**: C29H31N7O" in result assert "Trade 
Names**: Gleevec, Glivec" in result assert "External Links" in result assert "DrugBank](https://www.drugbank.ca/drugs/DB00619)" in result @pytest.mark.asyncio async def test_get_drug_by_id(self, monkeypatch, mock_drug_response): """Test getting drug by DrugBank ID.""" # Mock the API call async def mock_request_api(url, request, method, domain): return (mock_drug_response, None) monkeypatch.setattr("biomcp.http_client.request_api", mock_request_api) result = await get_drug("DB00619") assert "## Drug: Imatinib" in result assert "DrugBank ID**: DB00619" in result @pytest.mark.asyncio async def test_get_drug_json_output(self, monkeypatch, mock_drug_response): """Test getting drug with JSON output.""" # Mock the API call async def mock_request_api(url, request, method, domain): return (mock_drug_response, None) monkeypatch.setattr("biomcp.http_client.request_api", mock_request_api) result = await get_drug("DB00619", output_json=True) data = json.loads(result) assert data["drug_id"] == "CHEMBL941" assert data["name"] == "Imatinib" assert data["drugbank_id"] == "DB00619" assert ( data["_links"]["DrugBank"] == "https://www.drugbank.ca/drugs/DB00619" ) @pytest.mark.asyncio async def test_drug_not_found(self, monkeypatch): """Test drug not found.""" # Mock the API call async def mock_request_api(url, request, method, domain): return ({"hits": []}, None) monkeypatch.setattr("biomcp.http_client.request_api", mock_request_api) result = await get_drug("INVALID_DRUG_XYZ") assert "Drug 'INVALID_DRUG_XYZ' not found" in result @pytest.mark.asyncio async def test_drug_with_description_truncation(self, monkeypatch): """Test drug with long description gets truncated.""" long_desc = "A" * 600 mock_response = { "_id": "TEST001", "name": "TestDrug", "drugbank": {"id": "DB99999", "description": long_desc}, } async def mock_request_api(url, request, method, domain): return (mock_response, None) monkeypatch.setattr("biomcp.http_client.request_api", mock_request_api) result = await 
get_drug("DB99999") assert "Description" in result assert "A" * 500 in result assert "..." in result # Truncation indicator @pytest.mark.asyncio async def test_drug_error_handling(self, monkeypatch): """Test error handling.""" # Mock the API call to raise an exception async def mock_request_api(url, request, method, domain): raise Exception("API error") monkeypatch.setattr("biomcp.http_client.request_api", mock_request_api) result = await get_drug("imatinib") # When an exception occurs, it's caught and the drug is reported as not found assert "Drug 'imatinib' not found in MyChem.info" in result ``` -------------------------------------------------------------------------------- /src/biomcp/drugs/getter.py: -------------------------------------------------------------------------------- ```python """Drug information retrieval from MyChem.info.""" import json import logging from ..integrations import BioThingsClient logger = logging.getLogger(__name__) def _add_drug_links(drug_info, result: dict) -> None: """Add external database links for the drug.""" links = {} if drug_info.drugbank_id: links["DrugBank"] = ( f"https://www.drugbank.ca/drugs/{drug_info.drugbank_id}" ) if drug_info.chembl_id: links["ChEMBL"] = ( f"https://www.ebi.ac.uk/chembl/compound_report_card/{drug_info.chembl_id}/" ) if drug_info.pubchem_cid: links["PubChem"] = ( f"https://pubchem.ncbi.nlm.nih.gov/compound/{drug_info.pubchem_cid}" ) if drug_info.chebi_id: chebi_id = drug_info.chebi_id.replace("CHEBI:", "") links["ChEBI"] = ( f"https://www.ebi.ac.uk/chebi/searchId.do?chebiId=CHEBI:{chebi_id}" ) if links: result["_links"] = links def _format_basic_info(drug_info, output_lines: list[str]) -> None: """Format basic drug information.""" if drug_info.formula: output_lines.append(f"- **Formula**: {drug_info.formula}") if drug_info.drugbank_id: output_lines.append(f"- **DrugBank ID**: {drug_info.drugbank_id}") if drug_info.chembl_id: output_lines.append(f"- **ChEMBL ID**: {drug_info.chembl_id}") if 
drug_info.pubchem_cid: output_lines.append(f"- **PubChem CID**: {drug_info.pubchem_cid}") if drug_info.chebi_id: output_lines.append(f"- **ChEBI ID**: {drug_info.chebi_id}") if drug_info.inchikey: output_lines.append(f"- **InChIKey**: {drug_info.inchikey}") def _format_clinical_info(drug_info, output_lines: list[str]) -> None: """Format clinical drug information.""" if drug_info.tradename: names = drug_info.tradename[:5] # Limit to first 5 output_lines.append(f"- **Trade Names**: {', '.join(names)}") if len(drug_info.tradename) > 5: output_lines.append(f" (and {len(drug_info.tradename) - 5} more)") if drug_info.description: desc = drug_info.description[:500] if len(drug_info.description) > 500: desc += "..." output_lines.append(f"\n### Description\n{desc}") if drug_info.indication: ind = drug_info.indication[:500] if len(drug_info.indication) > 500: ind += "..." output_lines.append(f"\n### Indication\n{ind}") if drug_info.mechanism_of_action: moa = drug_info.mechanism_of_action[:500] if len(drug_info.mechanism_of_action) > 500: moa += "..." output_lines.append(f"\n### Mechanism of Action\n{moa}") def _format_drug_output(drug_info, result: dict) -> None: """Format drug information for text output.""" output_lines = [f"## Drug: {drug_info.name or 'Unknown'}"] _format_basic_info(drug_info, output_lines) _format_clinical_info(drug_info, output_lines) if result.get("_links"): output_lines.append("\n### External Links") for name, url in result["_links"].items(): output_lines.append(f"- [{name}]({url})") result["_formatted"] = "\n".join(output_lines) async def get_drug(drug_id_or_name: str, output_json: bool = False) -> str: """Get drug information from MyChem.info. Args: drug_id_or_name: Drug ID (DrugBank, ChEMBL, etc.) 
or name output_json: Return JSON instead of formatted text Returns: Formatted drug information or JSON string """ try: client = BioThingsClient() drug_info = await client.get_drug_info(drug_id_or_name) if not drug_info: error_msg = f"Drug '{drug_id_or_name}' not found in MyChem.info" if output_json: return json.dumps({"error": error_msg}, indent=2) return error_msg # Build result dictionary result = drug_info.model_dump(by_alias=False, exclude_none=True) # Add external links _add_drug_links(drug_info, result) if output_json: return json.dumps(result, indent=2) # Format for text output _format_drug_output(drug_info, result) return result["_formatted"] except Exception as e: logger.error(f"Error getting drug info: {e}") error_msg = f"Error retrieving drug information: {e!s}" if output_json: return json.dumps({"error": error_msg}, indent=2) return error_msg # MCP tool function async def _drug_details(drug_id_or_name: str) -> str: """Get drug/chemical information from MyChem.info. This tool retrieves comprehensive drug information including: - Drug identifiers (DrugBank, ChEMBL, PubChem, etc.) - Chemical properties (formula, InChIKey) - Trade names and synonyms - Clinical indications - Mechanism of action - Links to external databases Args: drug_id_or_name: Drug name (e.g., "aspirin") or ID (e.g., "DB00945", "CHEMBL25") Returns: Formatted drug information with external database links """ return await get_drug(drug_id_or_name, output_json=False) ``` -------------------------------------------------------------------------------- /src/biomcp/prefetch.py: -------------------------------------------------------------------------------- ```python """Prefetching system for common queries to improve performance. This module implements a prefetching mechanism that warms up caches with commonly searched biomedical entities during startup. This significantly improves response times for frequent queries. 
Key Features: - Prefetches common genes, diseases, and chemicals on startup - Runs asynchronously to avoid blocking server initialization - Includes timeout to prevent startup delays - Graceful error handling if prefetching fails The prefetching runs automatically when the MCP server starts via the lifespan hook in core.py. Configuration: The lists of entities to prefetch can be customized by modifying the COMMON_GENES, COMMON_DISEASES, and COMMON_CHEMICALS constants. """ import asyncio import logging from .constants import ( PREFETCH_TIMEOUT, PREFETCH_TOP_CHEMICALS, PREFETCH_TOP_DISEASES, PREFETCH_TOP_GENES, ) logger = logging.getLogger(__name__) # Common genes that are frequently searched COMMON_GENES = [ "BRAF", "EGFR", "TP53", "KRAS", "ALK", "ROS1", "MET", "RET", "NTRK1", "NTRK2", "NTRK3", ] # Common cancer types COMMON_DISEASES = [ "lung cancer", "breast cancer", "colorectal cancer", "melanoma", "non-small cell lung cancer", "small cell lung cancer", ] # Common drug names COMMON_CHEMICALS = [ "osimertinib", "pembrolizumab", "nivolumab", "dabrafenib", "trametinib", "crizotinib", "alectinib", ] class PrefetchManager: """Manages prefetching of common queries.""" def __init__(self): self._prefetch_task: asyncio.Task | None = None self._is_prefetching = False self._prefetch_complete = False async def start_prefetching(self): """Start prefetching common queries in the background.""" if self._is_prefetching or self._prefetch_complete: return self._is_prefetching = True try: # Start prefetch task self._prefetch_task = asyncio.create_task( self._prefetch_common_queries() ) except Exception as e: logger.warning(f"Failed to start prefetching: {e}") self._is_prefetching = False async def _prefetch_common_queries(self): """Prefetch common queries to warm up the cache.""" try: # Import here to avoid circular imports from .articles.autocomplete import EntityRequest, autocomplete from .variants.cbioportal_search import CBioPortalSearchClient tasks = [] # Prefetch gene 
autocomplete for gene in COMMON_GENES[ :PREFETCH_TOP_GENES ]: # Limit to avoid overload request = EntityRequest(concept="gene", query=gene, limit=1) tasks.append(autocomplete(request)) # Prefetch disease autocomplete for disease in COMMON_DISEASES[:PREFETCH_TOP_DISEASES]: request = EntityRequest( concept="disease", query=disease, limit=1 ) tasks.append(autocomplete(request)) # Prefetch chemical autocomplete for chemical in COMMON_CHEMICALS[:PREFETCH_TOP_CHEMICALS]: request = EntityRequest( concept="chemical", query=chemical, limit=1 ) tasks.append(autocomplete(request)) # Execute all autocomplete prefetches if tasks: await asyncio.gather(*tasks, return_exceptions=True) # Prefetch cBioPortal summaries for common genes cbio_client = CBioPortalSearchClient() cbio_tasks = [] for gene in COMMON_GENES[:PREFETCH_TOP_GENES]: # Top genes cbio_tasks.append( cbio_client.get_gene_search_summary(gene, max_studies=5) ) if cbio_tasks: await asyncio.gather(*cbio_tasks, return_exceptions=True) logger.info("Prefetching completed successfully") except Exception as e: logger.warning(f"Error during prefetching: {e}") finally: self._is_prefetching = False self._prefetch_complete = True async def wait_for_prefetch(self, timeout: float = PREFETCH_TIMEOUT): """Wait for prefetch to complete with timeout.""" if not self._prefetch_task: return try: await asyncio.wait_for(self._prefetch_task, timeout=timeout) except asyncio.TimeoutError: # Prefetch taking too long, continue without waiting logger.debug("Prefetch timeout - continuing without waiting") except Exception as e: # Ignore prefetch errors logger.debug(f"Prefetch error ignored: {e}") # Global prefetch manager _prefetch_manager = PrefetchManager() async def start_prefetching(): """Start the prefetching process.""" await _prefetch_manager.start_prefetching() async def wait_for_prefetch(timeout: float = PREFETCH_TIMEOUT): """Wait for prefetch to complete.""" await _prefetch_manager.wait_for_prefetch(timeout) ``` 
-------------------------------------------------------------------------------- /docs/backend-services-reference/01-overview.md: -------------------------------------------------------------------------------- ```markdown # Backend Services Reference Overview BioMCP integrates with multiple biomedical databases and services to provide comprehensive research capabilities. This reference documents the underlying APIs and their capabilities. ## Service Categories ### Literature and Publications - **[PubTator3](06-pubtator3.md)**: Biomedical literature with entity annotations - **Europe PMC**: Preprints from bioRxiv and medRxiv ### Clinical Trials - **[ClinicalTrials.gov](04-clinicaltrials-gov.md)**: U.S. and international clinical trials registry - **[NCI CTS API](05-nci-cts-api.md)**: National Cancer Institute's enhanced trial search ### Biomedical Annotations - **[BioThings Suite](02-biothings-suite.md)**: - MyGene.info - Gene annotations - MyVariant.info - Variant annotations - MyDisease.info - Disease ontology - MyChem.info - Drug/chemical data ### Cancer Genomics - **[cBioPortal](03-cbioportal.md)**: Cancer genomics portal with mutation data - **TCGA**: The Cancer Genome Atlas (via MyVariant.info) ### Variant Effect Prediction - **[AlphaGenome](07-alphagenome.md)**: Google DeepMind's AI for regulatory predictions ## API Authentication | Service | Authentication Required | Type | Rate Limits | | ------------------ | ----------------------- | ------- | ------------------- | | PubTator3 | No | Public | 3 requests/second | | ClinicalTrials.gov | No | Public | 50,000 requests/day | | NCI CTS API | Yes | API Key | 1,000 requests/day | | BioThings APIs | No | Public | 1,000 requests/hour | | cBioPortal | Optional | Token | Higher with token | | AlphaGenome | Yes | API Key | Contact provider | ## Data Flow Architecture ``` User Query → BioMCP Tools → Backend APIs → Unified Response Example Flow: 1. User: "Find articles about BRAF mutations" 2. 
BioMCP: article_searcher tool 3. APIs Called: - PubTator3 (articles) - cBioPortal (mutation data) - Europe PMC (preprints) 4. Response: Integrated results with citations ``` ## Service Reliability ### Primary Services - **PubTator3**: 99.9% uptime, updated daily - **ClinicalTrials.gov**: 99.5% uptime, updated daily - **BioThings APIs**: 99.9% uptime, real-time data ### Fallback Strategies - Cache frequently accessed data - Implement exponential backoff - Use alternative endpoints when available ## Common Integration Patterns ### 1. Entity Recognition Enhancement ``` PubTator3 → Extract entities → BioThings → Get detailed annotations ``` ### 2. Variant to Trial Pipeline ``` MyVariant.info → Get gene → ClinicalTrials.gov → Find relevant trials ``` ### 3. Comprehensive Gene Analysis ``` MyGene.info → Basic info cBioPortal → Cancer mutations PubTator3 → Literature AlphaGenome → Predictions ``` ## Performance Considerations ### Response Times (typical) - PubTator3: 200-500ms - ClinicalTrials.gov: 300-800ms - BioThings APIs: 100-300ms - cBioPortal: 200-600ms - AlphaGenome: 1-3 seconds ### Optimization Strategies 1. **Batch requests** when APIs support it 2. **Cache static data** (gene names, ontologies) 3. **Parallelize independent** API calls 4. 
**Use pagination** for large result sets ## Error Handling ### Common Error Types - **Rate Limiting**: 429 errors, implement backoff - **Invalid Parameters**: 400 errors, validate inputs - **Service Unavailable**: 503 errors, retry with delay - **Authentication**: 401 errors, check API keys ### Error Response Format ```json { "error": { "code": "RATE_LIMIT_EXCEEDED", "message": "API rate limit exceeded", "retry_after": 3600 } } ``` ## Data Formats ### Input Formats - **Identifiers**: HGNC symbols, rsIDs, NCT numbers, PMIDs - **Coordinates**: GRCh38 genomic positions - **Terms**: MeSH, MONDO, HPO ontologies ### Output Formats - **JSON**: Primary format for all APIs - **XML**: Available for some services - **TSV/CSV**: Export options for bulk data ## Update Frequencies | Service | Update Frequency | Data Lag | | ------------------ | ---------------- | ---------- | | PubTator3 | Daily | 1-2 days | | ClinicalTrials.gov | Daily | Real-time | | NCI CTS | Daily | 1 day | | BioThings | Real-time | Minutes | | cBioPortal | Quarterly | 3-6 months | ## Best Practices ### 1. API Key Management - Store keys securely - Rotate keys periodically - Monitor usage against limits ### 2. Error Recovery - Implement retry logic - Log failed requests - Provide fallback data ### 3. Data Validation - Verify gene symbols - Validate genomic coordinates - Check identifier formats ### 4. Performance - Cache when appropriate - Batch similar requests - Use appropriate page sizes ## Getting Started 1. Review individual service documentation 2. Obtain necessary API keys 3. Test endpoints with sample data 4. Implement error handling 5. 
Monitor usage and performance ## Support Resources - **PubTator3**: [Support Forum](https://www.ncbi.nlm.nih.gov/research/pubtator3/) - **ClinicalTrials.gov**: [Help Desk](https://clinicaltrials.gov/help) - **BioThings**: [Documentation](https://docs.biothings.io/) - **cBioPortal**: [User Guide](https://docs.cbioportal.org/) - **NCI**: [API Support](https://api.cancer.gov/support) ``` -------------------------------------------------------------------------------- /tests/tdd/test_concurrent_requests.py: -------------------------------------------------------------------------------- ```python """Test concurrent request handling in the HTTP client.""" import asyncio from unittest.mock import AsyncMock, patch import pytest from biomcp import http_client class TestConcurrentRequests: """Test concurrent request handling.""" @pytest.mark.asyncio async def test_concurrent_requests_same_domain(self): """Test multiple concurrent requests to the same domain.""" # Use patch instead of direct replacement with patch( "biomcp.http_client.call_http", new_callable=AsyncMock ) as mock_call: # Configure mock to return success mock_call.return_value = (200, '{"data": "response"}') # Make 10 concurrent requests with different URLs to avoid caching # and disable caching explicitly tasks = [ http_client.request_api( url=f"https://api.example.com/resource/{i}", request={}, domain="example", cache_ttl=0, # Disable caching ) for i in range(10) ] results = await asyncio.gather(*tasks) # All requests should succeed assert len(results) == 10 for data, error in results: assert error is None assert data == {"data": "response"} # Check that rate limiting was applied assert mock_call.call_count == 10 @pytest.mark.asyncio async def test_concurrent_requests_different_domains(self): """Test concurrent requests to different domains.""" with patch( "biomcp.http_client.call_http", new_callable=AsyncMock ) as mock_call: # Return different responses based on URL async def side_effect(method, url, *args, 
**kwargs): if "domain1" in url: return (200, '{"source": "domain1"}') elif "domain2" in url: return (200, '{"source": "domain2"}') else: return (200, '{"source": "other"}') mock_call.side_effect = side_effect # Make requests to different domains tasks = [ http_client.request_api( "https://domain1.com/api", {}, domain="domain1" ), http_client.request_api( "https://domain2.com/api", {}, domain="domain2" ), http_client.request_api( "https://domain3.com/api", {}, domain="domain3" ), ] results = await asyncio.gather(*tasks) # Check results assert results[0][0] == {"source": "domain1"} assert results[1][0] == {"source": "domain2"} assert results[2][0] == {"source": "other"} @pytest.mark.asyncio async def test_concurrent_cache_access(self): """Test that concurrent requests properly use cache.""" with patch( "biomcp.http_client.call_http", new_callable=AsyncMock ) as mock_call: mock_call.return_value = (200, '{"data": "cached"}') # First request to populate cache await http_client.request_api( url="https://api.example.com/data", request={}, domain="example", cache_ttl=60, ) # Reset call count initial_calls = mock_call.call_count # Make 5 concurrent requests to same URL tasks = [ http_client.request_api( url="https://api.example.com/data", request={}, domain="example", cache_ttl=60, ) for _ in range(5) ] results = await asyncio.gather(*tasks) # All should get cached response assert len(results) == 5 for data, _error in results: assert data == {"data": "cached"} # No additional HTTP calls should have been made assert mock_call.call_count == initial_calls @pytest.mark.asyncio async def test_concurrent_circuit_breaker(self): """Test circuit breaker behavior with concurrent failures.""" with patch( "biomcp.http_client.call_http", new_callable=AsyncMock ) as mock_call: # Simulate failures mock_call.return_value = (500, "Internal Server Error") # Make concurrent failing requests tasks = [ http_client.request_api( url=f"https://failing.com/api/{i}", request={}, domain="failing", ) 
for i in range(10) ] results = await asyncio.gather(*tasks, return_exceptions=True) # All should fail error_count = sum(1 for _, error in results if error is not None) assert error_count == 10 # Circuit should be open now # Additional requests should fail immediately _, error = await http_client.request_api( url="https://failing.com/api/test", request={}, domain="failing", ) assert error is not None # Check that circuit breaker is preventing calls # (exact behavior depends on implementation details) ``` -------------------------------------------------------------------------------- /tests/tdd/test_connection_pool.py: -------------------------------------------------------------------------------- ```python """Tests for connection pool management.""" import asyncio import ssl import weakref from unittest.mock import patch import httpx import pytest from biomcp.connection_pool import ( EventLoopConnectionPools, close_all_pools, get_connection_pool, ) @pytest.fixture def pool_manager(): """Create a fresh pool manager for testing.""" return EventLoopConnectionPools() @pytest.mark.asyncio async def test_get_pool_creates_new_pool(pool_manager): """Test that get_pool creates a new pool when none exists.""" timeout = httpx.Timeout(30) pool = await pool_manager.get_pool(verify=True, timeout=timeout) assert pool is not None assert isinstance(pool, httpx.AsyncClient) assert not pool.is_closed @pytest.mark.asyncio async def test_get_pool_reuses_existing_pool(pool_manager): """Test that get_pool reuses existing pools.""" timeout = httpx.Timeout(30) pool1 = await pool_manager.get_pool(verify=True, timeout=timeout) pool2 = await pool_manager.get_pool(verify=True, timeout=timeout) assert pool1 is pool2 @pytest.mark.asyncio async def test_get_pool_different_verify_settings(pool_manager): """Test that different verify settings create different pools.""" timeout = httpx.Timeout(30) pool1 = await pool_manager.get_pool(verify=True, timeout=timeout) pool2 = await 
pool_manager.get_pool(verify=False, timeout=timeout) assert pool1 is not pool2 @pytest.mark.asyncio async def test_get_pool_ssl_context(pool_manager): """Test pool creation with SSL context.""" ssl_context = ssl.create_default_context() timeout = httpx.Timeout(30) pool = await pool_manager.get_pool(verify=ssl_context, timeout=timeout) assert pool is not None assert isinstance(pool, httpx.AsyncClient) @pytest.mark.asyncio async def test_pool_cleanup_on_close_all(pool_manager): """Test that close_all properly closes all pools.""" timeout = httpx.Timeout(30) await pool_manager.get_pool(verify=True, timeout=timeout) await pool_manager.get_pool(verify=False, timeout=timeout) await pool_manager.close_all() # After close_all, pools should be cleared assert len(pool_manager._loop_pools) == 0 @pytest.mark.asyncio async def test_no_event_loop_returns_single_use_client(pool_manager): """Test behavior when no event loop is running.""" with patch("asyncio.get_running_loop", side_effect=RuntimeError): timeout = httpx.Timeout(30) pool = await pool_manager.get_pool(verify=True, timeout=timeout) assert pool is not None # Single-use client should have no keepalive # Note: httpx client internal structure may vary @pytest.mark.asyncio async def test_pool_recreation_after_close(pool_manager): """Test that a new pool is created after the old one is closed.""" timeout = httpx.Timeout(30) pool1 = await pool_manager.get_pool(verify=True, timeout=timeout) await pool1.aclose() pool2 = await pool_manager.get_pool(verify=True, timeout=timeout) assert pool1 is not pool2 assert pool1.is_closed assert not pool2.is_closed @pytest.mark.asyncio async def test_weak_reference_cleanup(): """Test that weak references are used for event loops.""" pool_manager = EventLoopConnectionPools() # Verify that the pool manager uses weak references assert isinstance(pool_manager._loop_pools, weakref.WeakKeyDictionary) # Create a pool timeout = httpx.Timeout(30) pool = await pool_manager.get_pool(verify=True, 
timeout=timeout) # Verify pool was created assert pool is not None # The current event loop should be in the weak key dict current_loop = asyncio.get_running_loop() assert current_loop in pool_manager._loop_pools @pytest.mark.asyncio async def test_global_get_connection_pool(): """Test the global get_connection_pool function.""" with patch.dict("os.environ", {"BIOMCP_USE_CONNECTION_POOL": "true"}): timeout = httpx.Timeout(30) pool = await get_connection_pool(verify=True, timeout=timeout) assert pool is not None assert isinstance(pool, httpx.AsyncClient) @pytest.mark.asyncio async def test_global_close_all_pools(): """Test the global close_all_pools function.""" # Create some pools timeout = httpx.Timeout(30) await get_connection_pool(verify=True, timeout=timeout) await get_connection_pool(verify=False, timeout=timeout) # Close all pools await close_all_pools() # Verify cleanup (this is implementation-specific) from biomcp.connection_pool import _pool_manager assert len(_pool_manager._loop_pools) == 0 @pytest.mark.asyncio async def test_concurrent_pool_creation(pool_manager): """Test thread-safe pool creation under concurrent access.""" timeout = httpx.Timeout(30) async def get_pool(): return await pool_manager.get_pool(verify=True, timeout=timeout) # Create 10 concurrent requests for the same pool pools = await asyncio.gather(*[get_pool() for _ in range(10)]) # All should return the same pool instance assert all(pool is pools[0] for pool in pools) @pytest.mark.asyncio async def test_connection_pool_limits(): """Test that connection pools have proper limits set.""" pool_manager = EventLoopConnectionPools() timeout = httpx.Timeout(30) pool = await pool_manager.get_pool(verify=True, timeout=timeout) # Verify pool was created (actual limits are internal to httpx) assert pool is not None assert isinstance(pool, httpx.AsyncClient) ``` -------------------------------------------------------------------------------- 
/tests/data/myvariant/variants_part_braf_v600_multiple.json: -------------------------------------------------------------------------------- ```json [ { "_id": "chr7:g.140453136A>G", "_score": 19.419012, "cadd": { "_license": "http://bit.ly/2TIuab9", "phred": 21.2 }, "chrom": "7", "clinvar": { "_license": "http://bit.ly/2SQdcI0", "rcv": { "clinical_significance": "Likely pathogenic" }, "variant_id": 376288 }, "cosmic": { "_license": "http://bit.ly/2VMkY7R", "cosmic_id": "COSM18443" }, "dbnsfp": { "_license": "http://bit.ly/2VLnQBz", "genename": ["BRAF", "BRAF", "BRAF", "BRAF"], "hgvsc": ["c.620T>C", "c.1919T>C", "c.1799T>C"], "hgvsp": ["p.V600A", "p.Val600Ala", "p.Val640Ala", "p.Val207Ala"], "polyphen2": { "hdiv": { "pred": "B", "score": 0.207 } } }, "dbsnp": { "_license": "http://bit.ly/2AqoLOc", "rsid": "rs113488022" }, "vcf": { "alt": "G", "position": "140453136", "ref": "A" } }, { "_id": "chr7:g.140453136A>T", "_score": 18.693962, "cadd": { "_license": "http://bit.ly/2TIuab9", "phred": 32 }, "chrom": "7", "civic": { "_license": "http://bit.ly/2FqS871", "id": 12, "openCravatUrl": "https://run.opencravat.org/webapps/variantreport/index.html?alt_base=T&chrom=chr7&pos=140753336&ref_base=A" }, "clinvar": { "_license": "http://bit.ly/2SQdcI0", "rcv": [ { "clinical_significance": "Pathogenic" }, { "clinical_significance": "Pathogenic" }, { "clinical_significance": "Pathogenic" }, { "clinical_significance": "Pathogenic" }, { "clinical_significance": "Pathogenic" }, { "clinical_significance": "Pathogenic" }, { "clinical_significance": "Pathogenic" }, { "clinical_significance": "not provided" }, { "clinical_significance": "Likely pathogenic" }, { "clinical_significance": "Likely pathogenic" }, { "clinical_significance": "Likely pathogenic" }, { "clinical_significance": "Likely pathogenic" }, { "clinical_significance": "Likely pathogenic" }, { "clinical_significance": "Likely pathogenic" }, { "clinical_significance": "Likely pathogenic" }, { "clinical_significance": 
"Pathogenic" }, { "clinical_significance": "Pathogenic" }, { "clinical_significance": "Likely pathogenic" }, { "clinical_significance": "Pathogenic" }, { "clinical_significance": "Likely pathogenic" }, { "clinical_significance": "Likely pathogenic" }, { "clinical_significance": "Pathogenic" }, { "clinical_significance": "Pathogenic" }, { "clinical_significance": "Pathogenic" }, { "clinical_significance": "Pathogenic" }, { "clinical_significance": "Likely pathogenic" }, { "clinical_significance": "Pathogenic" }, { "clinical_significance": "Pathogenic" }, { "clinical_significance": "Likely pathogenic" } ], "variant_id": 13961 }, "cosmic": { "_license": "http://bit.ly/2VMkY7R", "cosmic_id": "COSM476" }, "dbnsfp": { "_license": "http://bit.ly/2VLnQBz", "genename": ["BRAF", "BRAF", "BRAF", "BRAF"], "hgvsc": ["c.620T>A", "c.1919T>A", "c.1799T>A"], "hgvsp": ["p.Val640Glu", "p.Val207Glu", "p.Val600Glu", "p.V600E"], "polyphen2": { "hdiv": { "pred": "D", "score": 0.971 } } }, "dbsnp": { "_license": "http://bit.ly/2AqoLOc", "rsid": "rs113488022" }, "exac": { "_license": "http://bit.ly/2H9c4hg", "af": 1.647e-5 }, "gnomad_exome": { "_license": "http://bit.ly/2I1cl1I", "af": { "af": 3.97994e-6 } }, "vcf": { "alt": "T", "position": "140453136", "ref": "A" } }, { "_id": "chr7:g.140453136A>C", "_score": 18.476965, "cadd": { "_license": "http://bit.ly/2TIuab9", "phred": 26.0 }, "chrom": "7", "clinvar": { "_license": "http://bit.ly/2SQdcI0", "rcv": [ { "clinical_significance": "not provided" }, { "clinical_significance": "Pathogenic" }, { "clinical_significance": "Pathogenic" }, { "clinical_significance": "Uncertain significance" } ], "variant_id": 40389 }, "cosmic": { "_license": "http://bit.ly/2VMkY7R", "cosmic_id": "COSM6137" }, "dbnsfp": { "_license": "http://bit.ly/2VLnQBz", "genename": ["BRAF", "BRAF", "BRAF", "BRAF"], "hgvsc": ["c.1919T>G", "c.1799T>G", "c.620T>G"], "hgvsp": ["p.Val640Gly", "p.Val207Gly", "p.Val600Gly", "p.V600G"], "polyphen2": { "hdiv": { "pred": "P", 
"score": 0.822 } } }, "dbsnp": { "_license": "http://bit.ly/2AqoLOc", "rsid": "rs113488022" }, "vcf": { "alt": "C", "position": "140453136", "ref": "A" } } ] ```