This is page 2 of 20. Use http://codebase.md/genomoncology/biomcp?lines=true&page={x} to view the full context.
# Directory Structure
```
├── .github
│ ├── actions
│ │ └── setup-python-env
│ │ └── action.yml
│ ├── dependabot.yml
│ └── workflows
│ ├── ci.yml
│ ├── deploy-docs.yml
│ ├── main.yml.disabled
│ ├── on-release-main.yml
│ └── validate-codecov-config.yml
├── .gitignore
├── .pre-commit-config.yaml
├── BIOMCP_DATA_FLOW.md
├── CHANGELOG.md
├── CNAME
├── codecov.yaml
├── docker-compose.yml
├── Dockerfile
├── docs
│ ├── apis
│ │ ├── error-codes.md
│ │ ├── overview.md
│ │ └── python-sdk.md
│ ├── assets
│ │ ├── biomcp-cursor-locations.png
│ │ ├── favicon.ico
│ │ ├── icon.png
│ │ ├── logo.png
│ │ ├── mcp_architecture.txt
│ │ └── remote-connection
│ │ ├── 00_connectors.png
│ │ ├── 01_add_custom_connector.png
│ │ ├── 02_connector_enabled.png
│ │ ├── 03_connect_to_biomcp.png
│ │ ├── 04_select_google_oauth.png
│ │ └── 05_success_connect.png
│ ├── backend-services-reference
│ │ ├── 01-overview.md
│ │ ├── 02-biothings-suite.md
│ │ ├── 03-cbioportal.md
│ │ ├── 04-clinicaltrials-gov.md
│ │ ├── 05-nci-cts-api.md
│ │ ├── 06-pubtator3.md
│ │ └── 07-alphagenome.md
│ ├── blog
│ │ ├── ai-assisted-clinical-trial-search-analysis.md
│ │ ├── images
│ │ │ ├── deep-researcher-video.png
│ │ │ ├── researcher-announce.png
│ │ │ ├── researcher-drop-down.png
│ │ │ ├── researcher-prompt.png
│ │ │ ├── trial-search-assistant.png
│ │ │ └── what_is_biomcp_thumbnail.png
│ │ └── researcher-persona-resource.md
│ ├── changelog.md
│ ├── CNAME
│ ├── concepts
│ │ ├── 01-what-is-biomcp.md
│ │ ├── 02-the-deep-researcher-persona.md
│ │ └── 03-sequential-thinking-with-the-think-tool.md
│ ├── developer-guides
│ │ ├── 01-server-deployment.md
│ │ ├── 02-contributing-and-testing.md
│ │ ├── 03-third-party-endpoints.md
│ │ ├── 04-transport-protocol.md
│ │ ├── 05-error-handling.md
│ │ ├── 06-http-client-and-caching.md
│ │ ├── 07-performance-optimizations.md
│ │ └── generate_endpoints.py
│ ├── faq-condensed.md
│ ├── FDA_SECURITY.md
│ ├── genomoncology.md
│ ├── getting-started
│ │ ├── 01-quickstart-cli.md
│ │ ├── 02-claude-desktop-integration.md
│ │ └── 03-authentication-and-api-keys.md
│ ├── how-to-guides
│ │ ├── 01-find-articles-and-cbioportal-data.md
│ │ ├── 02-find-trials-with-nci-and-biothings.md
│ │ ├── 03-get-comprehensive-variant-annotations.md
│ │ ├── 04-predict-variant-effects-with-alphagenome.md
│ │ ├── 05-logging-and-monitoring-with-bigquery.md
│ │ └── 06-search-nci-organizations-and-interventions.md
│ ├── index.md
│ ├── policies.md
│ ├── reference
│ │ ├── architecture-diagrams.md
│ │ ├── quick-architecture.md
│ │ ├── quick-reference.md
│ │ └── visual-architecture.md
│ ├── robots.txt
│ ├── stylesheets
│ │ ├── announcement.css
│ │ └── extra.css
│ ├── troubleshooting.md
│ ├── tutorials
│ │ ├── biothings-prompts.md
│ │ ├── claude-code-biomcp-alphagenome.md
│ │ ├── nci-prompts.md
│ │ ├── openfda-integration.md
│ │ ├── openfda-prompts.md
│ │ ├── pydantic-ai-integration.md
│ │ └── remote-connection.md
│ ├── user-guides
│ │ ├── 01-command-line-interface.md
│ │ ├── 02-mcp-tools-reference.md
│ │ └── 03-integrating-with-ides-and-clients.md
│ └── workflows
│ └── all-workflows.md
├── example_scripts
│ ├── mcp_integration.py
│ └── python_sdk.py
├── glama.json
├── LICENSE
├── lzyank.toml
├── Makefile
├── mkdocs.yml
├── package-lock.json
├── package.json
├── pyproject.toml
├── README.md
├── scripts
│ ├── check_docs_in_mkdocs.py
│ ├── check_http_imports.py
│ └── generate_endpoints_doc.py
├── smithery.yaml
├── src
│ └── biomcp
│ ├── __init__.py
│ ├── __main__.py
│ ├── articles
│ │ ├── __init__.py
│ │ ├── autocomplete.py
│ │ ├── fetch.py
│ │ ├── preprints.py
│ │ ├── search_optimized.py
│ │ ├── search.py
│ │ └── unified.py
│ ├── biomarkers
│ │ ├── __init__.py
│ │ └── search.py
│ ├── cbioportal_helper.py
│ ├── circuit_breaker.py
│ ├── cli
│ │ ├── __init__.py
│ │ ├── articles.py
│ │ ├── biomarkers.py
│ │ ├── diseases.py
│ │ ├── health.py
│ │ ├── interventions.py
│ │ ├── main.py
│ │ ├── openfda.py
│ │ ├── organizations.py
│ │ ├── server.py
│ │ ├── trials.py
│ │ └── variants.py
│ ├── connection_pool.py
│ ├── constants.py
│ ├── core.py
│ ├── diseases
│ │ ├── __init__.py
│ │ ├── getter.py
│ │ └── search.py
│ ├── domain_handlers.py
│ ├── drugs
│ │ ├── __init__.py
│ │ └── getter.py
│ ├── exceptions.py
│ ├── genes
│ │ ├── __init__.py
│ │ └── getter.py
│ ├── http_client_simple.py
│ ├── http_client.py
│ ├── individual_tools.py
│ ├── integrations
│ │ ├── __init__.py
│ │ ├── biothings_client.py
│ │ └── cts_api.py
│ ├── interventions
│ │ ├── __init__.py
│ │ ├── getter.py
│ │ └── search.py
│ ├── logging_filter.py
│ ├── metrics_handler.py
│ ├── metrics.py
│ ├── oncokb_helper.py
│ ├── openfda
│ │ ├── __init__.py
│ │ ├── adverse_events_helpers.py
│ │ ├── adverse_events.py
│ │ ├── cache.py
│ │ ├── constants.py
│ │ ├── device_events_helpers.py
│ │ ├── device_events.py
│ │ ├── drug_approvals.py
│ │ ├── drug_labels_helpers.py
│ │ ├── drug_labels.py
│ │ ├── drug_recalls_helpers.py
│ │ ├── drug_recalls.py
│ │ ├── drug_shortages_detail_helpers.py
│ │ ├── drug_shortages_helpers.py
│ │ ├── drug_shortages.py
│ │ ├── exceptions.py
│ │ ├── input_validation.py
│ │ ├── rate_limiter.py
│ │ ├── utils.py
│ │ └── validation.py
│ ├── organizations
│ │ ├── __init__.py
│ │ ├── getter.py
│ │ └── search.py
│ ├── parameter_parser.py
│ ├── query_parser.py
│ ├── query_router.py
│ ├── rate_limiter.py
│ ├── render.py
│ ├── request_batcher.py
│ ├── resources
│ │ ├── __init__.py
│ │ ├── getter.py
│ │ ├── instructions.md
│ │ └── researcher.md
│ ├── retry.py
│ ├── router_handlers.py
│ ├── router.py
│ ├── shared_context.py
│ ├── thinking
│ │ ├── __init__.py
│ │ ├── sequential.py
│ │ └── session.py
│ ├── thinking_tool.py
│ ├── thinking_tracker.py
│ ├── trials
│ │ ├── __init__.py
│ │ ├── getter.py
│ │ ├── nci_getter.py
│ │ ├── nci_search.py
│ │ └── search.py
│ ├── utils
│ │ ├── __init__.py
│ │ ├── cancer_types_api.py
│ │ ├── cbio_http_adapter.py
│ │ ├── endpoint_registry.py
│ │ ├── gene_validator.py
│ │ ├── metrics.py
│ │ ├── mutation_filter.py
│ │ ├── query_utils.py
│ │ ├── rate_limiter.py
│ │ └── request_cache.py
│ ├── variants
│ │ ├── __init__.py
│ │ ├── alphagenome.py
│ │ ├── cancer_types.py
│ │ ├── cbio_external_client.py
│ │ ├── cbioportal_mutations.py
│ │ ├── cbioportal_search_helpers.py
│ │ ├── cbioportal_search.py
│ │ ├── constants.py
│ │ ├── external.py
│ │ ├── filters.py
│ │ ├── getter.py
│ │ ├── links.py
│ │ ├── oncokb_client.py
│ │ ├── oncokb_models.py
│ │ └── search.py
│ └── workers
│ ├── __init__.py
│ ├── worker_entry_stytch.js
│ ├── worker_entry.js
│ └── worker.py
├── tests
│ ├── bdd
│ │ ├── cli_help
│ │ │ ├── help.feature
│ │ │ └── test_help.py
│ │ ├── conftest.py
│ │ ├── features
│ │ │ └── alphagenome_integration.feature
│ │ ├── fetch_articles
│ │ │ ├── fetch.feature
│ │ │ └── test_fetch.py
│ │ ├── get_trials
│ │ │ ├── get.feature
│ │ │ └── test_get.py
│ │ ├── get_variants
│ │ │ ├── get.feature
│ │ │ └── test_get.py
│ │ ├── search_articles
│ │ │ ├── autocomplete.feature
│ │ │ ├── search.feature
│ │ │ ├── test_autocomplete.py
│ │ │ └── test_search.py
│ │ ├── search_trials
│ │ │ ├── search.feature
│ │ │ └── test_search.py
│ │ ├── search_variants
│ │ │ ├── search.feature
│ │ │ └── test_search.py
│ │ └── steps
│ │ └── test_alphagenome_steps.py
│ ├── config
│ │ └── test_smithery_config.py
│ ├── conftest.py
│ ├── data
│ │ ├── ct_gov
│ │ │ ├── clinical_trials_api_v2.yaml
│ │ │ ├── trials_NCT04280705.json
│ │ │ └── trials_NCT04280705.txt
│ │ ├── myvariant
│ │ │ ├── myvariant_api.yaml
│ │ │ ├── myvariant_field_descriptions.csv
│ │ │ ├── variants_full_braf_v600e.json
│ │ │ ├── variants_full_braf_v600e.txt
│ │ │ └── variants_part_braf_v600_multiple.json
│ │ ├── oncokb_mock_responses.json
│ │ ├── openfda
│ │ │ ├── drugsfda_detail.json
│ │ │ ├── drugsfda_search.json
│ │ │ ├── enforcement_detail.json
│ │ │ └── enforcement_search.json
│ │ └── pubtator
│ │ ├── pubtator_autocomplete.json
│ │ └── pubtator3_paper.txt
│ ├── integration
│ │ ├── test_oncokb_integration.py
│ │ ├── test_openfda_integration.py
│ │ ├── test_preprints_integration.py
│ │ ├── test_simple.py
│ │ └── test_variants_integration.py
│ ├── tdd
│ │ ├── articles
│ │ │ ├── test_autocomplete.py
│ │ │ ├── test_cbioportal_integration.py
│ │ │ ├── test_fetch.py
│ │ │ ├── test_preprints.py
│ │ │ ├── test_search.py
│ │ │ └── test_unified.py
│ │ ├── conftest.py
│ │ ├── drugs
│ │ │ ├── __init__.py
│ │ │ └── test_drug_getter.py
│ │ ├── openfda
│ │ │ ├── __init__.py
│ │ │ ├── test_adverse_events.py
│ │ │ ├── test_device_events.py
│ │ │ ├── test_drug_approvals.py
│ │ │ ├── test_drug_labels.py
│ │ │ ├── test_drug_recalls.py
│ │ │ ├── test_drug_shortages.py
│ │ │ └── test_security.py
│ │ ├── test_biothings_integration_real.py
│ │ ├── test_biothings_integration.py
│ │ ├── test_circuit_breaker.py
│ │ ├── test_concurrent_requests.py
│ │ ├── test_connection_pool.py
│ │ ├── test_domain_handlers.py
│ │ ├── test_drug_approvals.py
│ │ ├── test_drug_recalls.py
│ │ ├── test_drug_shortages.py
│ │ ├── test_endpoint_documentation.py
│ │ ├── test_error_scenarios.py
│ │ ├── test_europe_pmc_fetch.py
│ │ ├── test_mcp_integration.py
│ │ ├── test_mcp_tools.py
│ │ ├── test_metrics.py
│ │ ├── test_nci_integration.py
│ │ ├── test_nci_mcp_tools.py
│ │ ├── test_network_policies.py
│ │ ├── test_offline_mode.py
│ │ ├── test_openfda_unified.py
│ │ ├── test_pten_r173_search.py
│ │ ├── test_render.py
│ │ ├── test_request_batcher.py.disabled
│ │ ├── test_retry.py
│ │ ├── test_router.py
│ │ ├── test_shared_context.py.disabled
│ │ ├── test_unified_biothings.py
│ │ ├── thinking
│ │ │ ├── __init__.py
│ │ │ └── test_sequential.py
│ │ ├── trials
│ │ │ ├── test_backward_compatibility.py
│ │ │ ├── test_getter.py
│ │ │ └── test_search.py
│ │ ├── utils
│ │ │ ├── test_gene_validator.py
│ │ │ ├── test_mutation_filter.py
│ │ │ ├── test_rate_limiter.py
│ │ │ └── test_request_cache.py
│ │ ├── variants
│ │ │ ├── constants.py
│ │ │ ├── test_alphagenome_api_key.py
│ │ │ ├── test_alphagenome_comprehensive.py
│ │ │ ├── test_alphagenome.py
│ │ │ ├── test_cbioportal_mutations.py
│ │ │ ├── test_cbioportal_search.py
│ │ │ ├── test_external_integration.py
│ │ │ ├── test_external.py
│ │ │ ├── test_extract_gene_aa_change.py
│ │ │ ├── test_filters.py
│ │ │ ├── test_getter.py
│ │ │ ├── test_links.py
│ │ │ ├── test_oncokb_client.py
│ │ │ ├── test_oncokb_helper.py
│ │ │ └── test_search.py
│ │ └── workers
│ │ └── test_worker_sanitization.js
│ └── test_pydantic_ai_integration.py
├── THIRD_PARTY_ENDPOINTS.md
├── tox.ini
├── uv.lock
└── wrangler.toml
```
# Files
--------------------------------------------------------------------------------
/src/biomcp/cli/main.py:
--------------------------------------------------------------------------------
```python
1 | import importlib.metadata
2 | from typing import Annotated
3 |
4 | import typer
5 |
6 | from .articles import article_app
7 | from .biomarkers import biomarker_app
8 | from .diseases import disease_app
9 | from .health import health_app
10 | from .interventions import intervention_app
11 | from .openfda import openfda_app
12 | from .organizations import organization_app
13 | from .server import run_server
14 | from .trials import trial_app
15 | from .variants import variant_app
16 |
17 | # --- Get version from installed package metadata ---
18 | try:
19 | __version__ = importlib.metadata.version("biomcp-python")
20 | except importlib.metadata.PackageNotFoundError:
21 | __version__ = "unknown" # Fallback if package not installed properly
22 |
23 |
24 | # --- Callback for --version option ---
def version_callback(value: bool):
    """Echo the installed biomcp version and stop when the flag was given.

    Args:
        value: Truthy when ``--version`` is present; falsy values are ignored.

    Raises:
        typer.Exit: Raised after printing so no further processing happens.
    """
    if not value:
        return
    banner = f"biomcp version: {__version__}"
    typer.echo(banner)
    raise typer.Exit()
29 |
30 |
31 | # --- Main Typer App ---
app = typer.Typer(
    # Placeholder callback so top-level (eager) options such as --version can
    # be declared on the application itself.
    callback=lambda: None,
    no_args_is_help=True,
    help="BioMCP: Biomedical Model Context Protocol",
)

# Mount every domain sub-application under its command name. The tuple keeps
# the original registration order, and each group shows help when invoked
# without arguments.
for _sub_app, _sub_name in (
    (trial_app, "trial"),
    (article_app, "article"),
    (variant_app, "variant"),
    (health_app, "health"),
    (organization_app, "organization"),
    (intervention_app, "intervention"),
    (biomarker_app, "biomarker"),
    (disease_app, "disease"),
    (openfda_app, "openfda"),
):
    app.add_typer(_sub_app, name=_sub_name, no_args_is_help=True)
93 |
94 |
95 | # --- Add --version Option using Annotation ---
96 | # We add this directly to the app's callback invocation signature via annotation
97 | # Note: This relies on Typer magic linking Annotated options in the callback signature
98 | # This approach is cleaner than adding it to every subcommand.
@app.callback()
def main_callback(
    version: Annotated[
        bool | None,  # None when the flag is absent
        typer.Option(
            "--version",
            help="Show the application's version and exit.",
            is_eager=True,  # handled before any command is dispatched
            callback=version_callback,  # prints the version and exits
        ),
    ] = None,
):
    """
    BioMCP main application callback. Handles global options like --version.
    """
    # Nothing to execute here: the eager option's callback does the work.
116 |
117 |
118 | # --- Add Explicit 'version' Command ---
@app.command()
def version():
    """
    Display the installed biomcp version.
    """
    # Report the version resolved from package metadata at import time.
    message = f"biomcp version: {__version__}"
    typer.echo(message)
125 |
126 |
# Directly expose run_server as the 'run' command with all its options.
# run_server is imported from .server; registering the function object itself
# means its own signature defines the command's parameters.
app.command("run")(run_server)


# Allow invoking the CLI by executing this module as a script.
if __name__ == "__main__":
    app()
133 |
```
--------------------------------------------------------------------------------
/src/biomcp/openfda/drug_shortages_helpers.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Helper functions for drug shortage search to reduce complexity.
3 | """
4 |
5 | from datetime import datetime
6 | from typing import Any
7 |
8 |
9 | def matches_drug_filter(shortage: dict[str, Any], drug: str | None) -> bool:
10 | """Check if shortage matches drug name filter."""
11 | if not drug:
12 | return True
13 |
14 | drug_lower = drug.lower()
15 | generic = shortage.get("generic_name", "").lower()
16 | brands = [b.lower() for b in shortage.get("brand_names", [])]
17 |
18 | return drug_lower in generic or any(drug_lower in b for b in brands)
19 |
20 |
21 | def matches_status_filter(
22 | shortage: dict[str, Any], status: str | None
23 | ) -> bool:
24 | """Check if shortage matches status filter."""
25 | if not status:
26 | return True
27 |
28 | status_lower = status.lower()
29 | shortage_status = shortage.get("status", "").lower()
30 |
31 | if status_lower == "current":
32 | return "current" in shortage_status
33 | elif status_lower == "resolved":
34 | return "resolved" in shortage_status
35 |
36 | return False
37 |
38 |
39 | def matches_category_filter(
40 | shortage: dict[str, Any], therapeutic_category: str | None
41 | ) -> bool:
42 | """Check if shortage matches therapeutic category filter."""
43 | if not therapeutic_category:
44 | return True
45 |
46 | cat_lower = therapeutic_category.lower()
47 | shortage_cat = shortage.get("therapeutic_category", "").lower()
48 |
49 | return cat_lower in shortage_cat
50 |
51 |
52 | def filter_shortages(
53 | shortages: list[dict[str, Any]],
54 | drug: str | None,
55 | status: str | None,
56 | therapeutic_category: str | None,
57 | ) -> list[dict[str, Any]]:
58 | """Filter shortage list based on criteria."""
59 | filtered = []
60 |
61 | for shortage in shortages:
62 | if not matches_drug_filter(shortage, drug):
63 | continue
64 | if not matches_status_filter(shortage, status):
65 | continue
66 | if not matches_category_filter(shortage, therapeutic_category):
67 | continue
68 |
69 | filtered.append(shortage)
70 |
71 | return filtered
72 |
73 |
74 | def format_shortage_search_header(
75 | drug: str | None,
76 | status: str | None,
77 | therapeutic_category: str | None,
78 | last_updated: str | None,
79 | ) -> list[str]:
80 | """Format header for shortage search results."""
81 | output = []
82 |
83 | # Add last updated time
84 | if last_updated:
85 | try:
86 | updated_dt = datetime.fromisoformat(last_updated)
87 | output.append(
88 | f"*Last Updated: {updated_dt.strftime('%Y-%m-%d %H:%M')}*\n"
89 | )
90 | except (ValueError, TypeError):
91 | pass
92 |
93 | if drug:
94 | output.append(f"**Drug**: {drug}")
95 | if status:
96 | output.append(f"**Status Filter**: {status}")
97 | if therapeutic_category:
98 | output.append(f"**Category**: {therapeutic_category}")
99 |
100 | return output
101 |
102 |
103 | def format_cache_timestamp(data: dict[str, Any]) -> str | None:
104 | """Format cache timestamp from data."""
105 | last_updated = data.get("last_updated") or data.get("_fetched_at")
106 | if not last_updated:
107 | return None
108 |
109 | try:
110 | updated_dt = datetime.fromisoformat(last_updated)
111 | return f"*Data Updated: {updated_dt.strftime('%Y-%m-%d %H:%M')}*\n"
112 | except (ValueError, TypeError):
113 | return None
114 |
```
--------------------------------------------------------------------------------
/src/biomcp/utils/mutation_filter.py:
--------------------------------------------------------------------------------
```python
1 | """Mutation filtering utilities."""
2 |
3 | import re
4 | from collections.abc import Sequence
5 | from typing import Protocol
6 |
7 |
8 | class MutationHitProtocol(Protocol):
9 | """Protocol for mutation hit objects."""
10 |
11 | protein_change: str
12 |
13 |
14 | class MutationFilter:
15 | """Filter mutations based on specific mutation or pattern."""
16 |
17 | def __init__(
18 | self, specific_mutation: str | None = None, pattern: str | None = None
19 | ):
20 | """Initialize the filter.
21 |
22 | Args:
23 | specific_mutation: Exact mutation to match (e.g., "V600E")
24 | pattern: Pattern to match (e.g., "V600*" for any V600 mutation)
25 | """
26 | self.specific_mutation = specific_mutation
27 | self.pattern = pattern
28 |
29 | def matches(self, protein_change: str) -> bool:
30 | """Check if a protein change matches the filter criteria.
31 |
32 | Args:
33 | protein_change: The protein change to check
34 |
35 | Returns:
36 | True if matches, False otherwise
37 | """
38 | if not protein_change:
39 | return False
40 |
41 | if self.specific_mutation:
42 | return protein_change == self.specific_mutation
43 |
44 | if self.pattern:
45 | return self._matches_pattern(protein_change)
46 |
47 | # No filter specified, match all
48 | return True
49 |
50 | def _matches_pattern(self, protein_change: str) -> bool:
51 | """Check if protein change matches pattern.
52 |
53 | Args:
54 | protein_change: The protein change to check
55 |
56 | Returns:
57 | True if matches pattern, False otherwise
58 | """
59 | if not self.pattern:
60 | return False
61 |
62 | if self.pattern.endswith("*"):
63 | # Wildcard pattern (e.g., "V600*" matches "V600E", "V600K", etc.)
64 | prefix = self.pattern[:-1]
65 | return protein_change.startswith(prefix)
66 |
67 | # Try regex match
68 | try:
69 | # Escape special regex characters except *
70 | escaped_pattern = re.escape(self.pattern).replace(r"\*", ".*")
71 | return bool(re.match(f"^{escaped_pattern}$", protein_change))
72 | except re.error:
73 | # Fallback to simple prefix match
74 | return protein_change.startswith(self.pattern)
75 |
76 | def filter_mutations(
77 | self, mutations: Sequence[MutationHitProtocol]
78 | ) -> list[MutationHitProtocol]:
79 | """Filter a list of mutations.
80 |
81 | Args:
82 | mutations: List of mutation objects with protein_change attribute
83 |
84 | Returns:
85 | Filtered list of mutations
86 | """
87 | if not self.specific_mutation and not self.pattern:
88 | return list(mutations)
89 |
90 | return [mut for mut in mutations if self.matches(mut.protein_change)]
91 |
92 | def __str__(self) -> str:
93 | """String representation of the filter."""
94 | if self.specific_mutation:
95 | return f"MutationFilter(specific={self.specific_mutation})"
96 | elif self.pattern:
97 | return f"MutationFilter(pattern={self.pattern})"
98 | else:
99 | return "MutationFilter(no_filter)"
100 |
101 | def __repr__(self) -> str:
102 | """Detailed representation of the filter."""
103 | return f"MutationFilter(specific_mutation={self.specific_mutation!r}, pattern={self.pattern!r})"
104 |
```
--------------------------------------------------------------------------------
/docs/apis/overview.md:
--------------------------------------------------------------------------------
```markdown
1 | # API Reference Overview
2 |
3 | BioMCP provides multiple interfaces for programmatic access to biomedical data. This reference covers the Python SDK, MCP protocol implementation, and HTTP API endpoints.
4 |
5 | ## Available APIs
6 |
7 | ### 1. Python SDK
8 |
9 | The Python SDK provides async/await interfaces for all BioMCP functionality:
10 |
11 | - **Client API**: High-level client for all domains
12 | - **Domain-specific APIs**: Specialized interfaces for articles, trials, variants
13 | - **Streaming API**: For real-time data processing
14 | - **Batch API**: For bulk operations
15 |
16 | See [Python SDK Reference](python-sdk.md) for detailed documentation.
17 |
18 | ### 2. MCP Protocol
19 |
20 | BioMCP implements the Model Context Protocol for AI assistant integration:
21 |
22 | - **24 specialized tools** for biomedical research
23 | - **Unified search** across all domains
24 | - **Sequential thinking** for complex queries
25 | - **Streaming responses** for large datasets
26 |
27 | See [MCP Tools Reference](../user-guides/02-mcp-tools-reference.md) for implementation details.
28 |
29 | ### 3. HTTP REST API
30 |
31 | When running in HTTP mode, BioMCP exposes RESTful endpoints:
32 |
33 | - **Search endpoints** for each domain
34 | - **Fetch endpoints** for detailed records
35 | - **Health monitoring** endpoints
36 | - **WebSocket support** for streaming
37 |
38 | See [Transport Protocol Guide](../developer-guides/04-transport-protocol.md) for endpoint documentation.
39 |
40 | ## Common Patterns
41 |
42 | ### Authentication
43 |
44 | Most endpoints work without authentication. API keys enable enhanced features:
45 |
46 | ```python
47 | # Python SDK
48 | client = BioMCPClient(
49 | nci_api_key="your-key",
50 | alphagenome_api_key="your-key"
51 | )
52 |
53 | # HTTP API
54 | headers = {
55 | "X-NCI-API-Key": "your-key",
56 | "X-AlphaGenome-API-Key": "your-key"
57 | }
58 | ```
59 |
60 | ### Error Handling
61 |
62 | All APIs use consistent error codes:
63 |
64 | | Code | Meaning | Action |
65 | | ---- | ------------ | ------------------ |
66 | | 400 | Bad Request | Check parameters |
67 | | 401 | Unauthorized | Check API key |
68 | | 404 | Not Found | Verify ID exists |
69 | | 429 | Rate Limited | Retry with backoff |
70 | | 500 | Server Error | Retry later |
71 |
72 | ### Pagination
73 |
74 | Standard pagination across all APIs:
75 |
76 | ```python
77 | # Python SDK
78 | results = await client.search(
79 | domain="article",
80 | page=1,
81 | page_size=20
82 | )
83 |
84 | # HTTP API
85 | GET /api/articles?page=1&page_size=20
86 | ```
87 |
88 | ### Response Formats
89 |
90 | All APIs support multiple response formats:
91 |
92 | - **JSON**: Default, structured data
93 | - **JSONL**: Streaming line-delimited JSON
94 | - **Markdown**: Human-readable formatting
95 | - **CSV**: Tabular data export
96 |
97 | ## Rate Limits
98 |
99 | | API | Without Key | With Key |
100 | | ------------------ | ----------- | ------------ |
101 | | PubMed/PubTator3 | 3 req/sec | 10 req/sec |
102 | | ClinicalTrials.gov | 50 req/min | 50 req/min |
103 | | BioThings | 3 req/sec | 10 req/sec |
104 | | NCI | N/A | 1000 req/day |
105 | | AlphaGenome | N/A | 100 req/day |
106 |
107 | ## Next Steps
108 |
109 | - [Python SDK Reference](python-sdk.md) - Detailed Python API documentation
110 | - [MCP Tools Reference](../user-guides/02-mcp-tools-reference.md) - MCP implementation details
111 | - [Transport Protocol Guide](../developer-guides/04-transport-protocol.md) - REST endpoint documentation
112 | - [Error Codes Reference](error-codes.md) - Complete error code listing
113 |
```
--------------------------------------------------------------------------------
/example_scripts/python_sdk.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env -S uv --quiet run --script
2 | # /// script
3 | # requires-python = ">=3.11"
4 | # dependencies = [
5 | # "biomcp-python",
6 | # ]
7 | # ///
8 |
9 | # Scripts to reproduce this page:
10 | # https://biomcp.org/python_sdk/
11 |
12 | import asyncio
13 | import json
14 |
15 | from biomcp.trials.search import (
16 | RecruitingStatus,
17 | TrialPhase,
18 | TrialQuery,
19 | search_trials,
20 | )
21 | from biomcp.variants.getter import get_variant
22 | from biomcp.variants.search import VariantQuery, search_variants
23 |
24 |
async def find_pathogenic_tp53():
    """Search for pathogenic TP53 variants and sanity-check the JSON result.

    Raises:
        AssertionError: If the result does not hold exactly five hits or a
            ClinVar RCV entry is not labelled pathogenic.
    """
    # noinspection PyTypeChecker
    query = VariantQuery(gene="TP53", significance="pathogenic", size=5)
    # Request JSON output explicitly so the result can be parsed below
    json_output_str = await search_variants(query, output_json=True)
    data = json.loads(json_output_str)
    assert len(data) == 5
    for item in data:
        # A hit may lack ClinVar annotation (missing or null); there is then
        # nothing to verify for it.
        clinvar = item.get("clinvar") or {}
        for rcv in clinvar.get("rcv", []):
            assert "pathogenic" in rcv["clinical_significance"].lower()
36 |
37 |
async def get_braf_v600e_details():
    """Fetch the BRAF V600E variant as JSON and verify selected annotations.

    Raises:
        AssertionError: If no data comes back or any checked field differs
            from the expected value.
    """
    braf_v600e_id = "chr7:g.140453136A>T"  # BRAF V600E variant

    # Ask for JSON so the payload can be inspected programmatically
    payload = json.loads(await get_variant(braf_v600e_id, output_json=True))
    assert payload, "No data returned for BRAF V600E variant"

    # Pull out the annotation sections examined below
    record = payload[0]
    clinvar_block = record.get("clinvar", {})
    cosmic_block = record.get("cosmic", {})
    docm_block = record.get("docm", {})

    # Core identity checks
    assert clinvar_block.get("gene", {}).get("symbol") == "BRAF"
    assert clinvar_block.get("chrom") == "7"
    assert clinvar_block.get("cytogenic") == "7q34"
    assert cosmic_block.get("cosmic_id") == "COSM476"
    assert docm_block.get("aa_change") == "p.V600E"

    # HGVS coding representations
    coding_names = clinvar_block.get("hgvs", {}).get("coding", [])
    assert len(coding_names) >= 13
    assert "NM_004333.6:c.1799T>A" in coding_names
63 |
64 |
async def find_melanoma_trials():
    """Search open phase 3 melanoma/pembrolizumab trials and check two hits.

    Raises:
        AssertionError: If fewer than two trials are returned or the first
            two records differ from the expected values.
    """
    trial_query = TrialQuery(
        conditions=["Melanoma"],
        interventions=["Pembrolizumab"],
        recruiting_status=RecruitingStatus.OPEN,
        phase=TrialPhase.PHASE3,
    )

    # Ask for JSON so the records can be inspected programmatically
    raw_json = await search_trials(trial_query, output_json=True)
    trials = json.loads(raw_json)

    # There must be something to inspect
    assert trials, "No trials found"
    assert len(trials) >= 2, "Expected at least 2 melanoma trials"

    # First expected record (NCT05727904)
    first = trials[0]
    assert first["NCT Number"] == "NCT05727904"
    assert "lifileucel" in first["Study Title"].lower()
    assert first["Study Status"] == "RECRUITING"
    assert first["Phases"] == "PHASE3"
    assert int(first["Enrollment"]) == 670
    assert "Melanoma" in first["Conditions"]
    assert "Pembrolizumab" in first["Interventions"]

    # Second expected record (NCT06697301)
    second = trials[1]
    assert second["NCT Number"] == "NCT06697301"
    assert "EIK1001" in second["Study Title"]
    assert second["Study Status"] == "RECRUITING"
    assert "PHASE3" in second["Phases"]
    assert int(second["Enrollment"]) == 740
    assert second["Conditions"] == "Advanced Melanoma"
99 |
100 |
def run():
    """Run each example coroutine to completion, one after another."""
    examples = (
        find_pathogenic_tp53,
        get_braf_v600e_details,
        find_melanoma_trials,
    )
    for example in examples:
        # One asyncio.run call per example, as in a plain sequence of calls.
        asyncio.run(example())


if __name__ == "__main__":
    run()
109 |
```
--------------------------------------------------------------------------------
/src/biomcp/genes/getter.py:
--------------------------------------------------------------------------------
```python
1 | """Gene information retrieval from MyGene.info."""
2 |
3 | import json
4 | import logging
5 | from typing import Annotated
6 |
7 | from pydantic import Field
8 |
9 | from ..integrations import BioThingsClient
10 | from ..render import to_markdown
11 |
12 | logger = logging.getLogger(__name__)
13 |
14 |
async def get_gene(
    gene_id_or_symbol: str,
    output_json: bool = False,
) -> str:
    """
    Get gene information from MyGene.info.

    Failures are reported in the returned text rather than raised: an
    unknown gene or any exception during lookup/rendering produces an error
    payload in the requested output format.

    Args:
        gene_id_or_symbol: Gene ID (Entrez, Ensembl) or symbol (e.g., "TP53", "7157")
        output_json: Return as JSON instead of markdown

    Returns:
        Gene information as markdown or JSON string
    """

    def _render(payload: dict) -> str:
        """Serialize one payload in the caller's requested output format."""
        if output_json:
            return json.dumps(payload, indent=2)
        return to_markdown([payload])

    client = BioThingsClient()

    try:
        gene_info = await client.get_gene_info(gene_id_or_symbol)

        if not gene_info:
            return _render({
                "error": f"Gene '{gene_id_or_symbol}' not found",
                "suggestion": "Please check the gene symbol or ID",
            })

        # Convert to dict for rendering
        result = gene_info.model_dump(exclude_none=True)

        # Add helpful links
        if gene_info.entrezgene:
            result["_links"] = {
                "NCBI Gene": f"https://www.ncbi.nlm.nih.gov/gene/{gene_info.entrezgene}",
                "PubMed": f"https://pubmed.ncbi.nlm.nih.gov/?term={gene_info.symbol}",
            }

        # Format aliases nicely, limited to the first 10
        if gene_info.alias:
            result["alias"] = ", ".join(gene_info.alias[:10])
            if len(gene_info.alias) > 10:
                result["alias"] += f" (and {len(gene_info.alias) - 10} more)"

        return _render(result)

    except Exception as e:
        # Deliberate catch-all boundary: callers always get a string back.
        # logger.exception keeps the traceback that logger.error dropped,
        # and %-style arguments defer formatting to the logging framework.
        logger.exception(
            "Error fetching gene info for %s: %s", gene_id_or_symbol, e
        )
        return _render({
            "error": "Failed to retrieve gene information",
            "details": str(e),
        })
79 |
80 |
async def _gene_details(
    call_benefit: Annotated[
        str,
        "Define and summarize why this function is being called and the intended benefit",
    ],
    gene_id_or_symbol: Annotated[
        str,
        Field(description="Gene symbol (e.g., TP53, BRAF) or ID (e.g., 7157)"),
    ],
) -> str:
    """
    Retrieves detailed information for a single gene from MyGene.info.

    This tool provides real-time gene annotations including:
    - Official gene name and symbol
    - Gene summary/description
    - Aliases and alternative names
    - Gene type (protein-coding, etc.)
    - Links to external databases

    Parameters:
    - call_benefit: Define why this function is being called
    - gene_id_or_symbol: Gene symbol (e.g., "TP53") or Entrez ID (e.g., "7157")

    Process: Queries MyGene.info API for up-to-date gene annotations
    Output: Markdown formatted gene information with description and metadata

    Note: For variant information, use variant_searcher. For articles about genes, use article_searcher.
    """
    # `call_benefit` is accepted but not used in this body; only the gene
    # identifier is forwarded, and markdown output is always requested.
    # NOTE(review): the docstring above is presumably surfaced as the tool
    # description, so treat it as runtime text — confirm before rewording.
    return await get_gene(gene_id_or_symbol, output_json=False)
111 |
```
--------------------------------------------------------------------------------
/src/biomcp/openfda/drug_recalls_helpers.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Helper functions for drug recall search to reduce complexity.
3 | """
4 |
5 |
def build_drug_search_query(drug: str) -> str:
    """Return a parenthesised OR clause matching ``drug`` in three fields.

    The drug name is searched, as a quoted phrase, in the brand name,
    the generic name and the product description.
    """
    searched_fields = (
        "openfda.brand_name",
        "openfda.generic_name",
        "product_description",
    )
    alternatives = [f'{field}:"{drug}"' for field in searched_fields]
    return "(" + " OR ".join(alternatives) + ")"
13 |
14 |
15 | def build_class_search_query(recall_class: str) -> str | None:
16 | """Build search query for recall classification."""
17 | # Handle various input formats
18 | recall_class = recall_class.strip()
19 |
20 | # If already in "Class X" format, use it directly
21 | if recall_class.upper().startswith("CLASS "):
22 | return f'classification:"{recall_class.title()}"'
23 |
24 | # Map single digits/numerals to Class format
25 | class_map = {
26 | "1": "Class I",
27 | "I": "Class I",
28 | "2": "Class II",
29 | "II": "Class II",
30 | "3": "Class III",
31 | "III": "Class III",
32 | }
33 | if mapped_class := class_map.get(recall_class.upper()):
34 | return f'classification:"{mapped_class}"'
35 | return None
36 |
37 |
38 | def build_status_search_query(status: str) -> str | None:
39 | """Build search query for recall status."""
40 | status_lower = status.lower()
41 | if status_lower in ["ongoing", "completed", "terminated"]:
42 | return f'status:"{status_lower.capitalize()}"'
43 | return None
44 |
45 |
46 | def build_date_search_query(since_date: str) -> str | None:
47 | """Build search query for date range."""
48 | if len(since_date) == 8:
49 | formatted_date = f"{since_date[:4]}-{since_date[4:6]}-{since_date[6:]}"
50 | return f"recall_initiation_date:[{formatted_date} TO *]"
51 | return None
52 |
53 |
54 | def format_recall_search_header(
55 | drug: str | None,
56 | recall_class: str | None,
57 | status: str | None,
58 | since_date: str | None,
59 | total: int,
60 | ) -> list[str]:
61 | """Format header for recall search results."""
62 | output = []
63 |
64 | if drug:
65 | output.append(f"**Drug**: {drug}")
66 | if recall_class:
67 | output.append(f"**Classification**: Class {recall_class}")
68 | if status:
69 | output.append(f"**Status**: {status}")
70 | if since_date:
71 | output.append(f"**Since**: {since_date}")
72 |
73 | return output
74 |
75 |
76 | def build_recall_search_params(
77 | drug: str | None,
78 | recall_class: str | None,
79 | status: str | None,
80 | reason: str | None,
81 | since_date: str | None,
82 | limit: int,
83 | skip: int,
84 | ) -> dict:
85 | """Build search parameters for recall API."""
86 | # Build search query
87 | search_parts = []
88 |
89 | # Default to human drugs only (exclude veterinary)
90 | search_parts.append('product_type:"Human"')
91 |
92 | if drug:
93 | search_parts.append(build_drug_search_query(drug))
94 |
95 | if recall_class and (
96 | class_query := build_class_search_query(recall_class)
97 | ):
98 | search_parts.append(class_query)
99 |
100 | if status and (status_query := build_status_search_query(status)):
101 | search_parts.append(status_query)
102 |
103 | if reason:
104 | search_parts.append(f'reason_for_recall:"{reason}"')
105 |
106 | if since_date and (date_query := build_date_search_query(since_date)):
107 | search_parts.append(date_query)
108 |
109 | # Combine search parts
110 | search_params = {}
111 | if search_parts:
112 | search_params["search"] = " AND ".join(search_parts)
113 |
114 | # Add pagination
115 | search_params["limit"] = str(min(limit, 100))
116 | search_params["skip"] = str(skip)
117 |
118 | # Sort by recall date (most recent first)
119 | search_params["sort"] = "recall_initiation_date:desc"
120 |
121 | return search_params
122 |
```
--------------------------------------------------------------------------------
/src/biomcp/shared_context.py:
--------------------------------------------------------------------------------
```python
1 | """Shared context for search operations to avoid redundant validations.
2 |
3 | This module provides a context manager that maintains validated entities
4 | (genes, diseases, chemicals) across multiple search operations to improve
5 | performance by eliminating redundant API calls.
6 |
7 | Example:
8 | ```python
9 | from biomcp.shared_context import SearchContextManager
10 |
11 | with SearchContextManager() as context:
12 | # First validation hits the API
13 | is_valid = await context.validate_gene("BRAF")
14 |
15 | # Subsequent validation uses cache
16 | is_valid_again = await context.validate_gene("BRAF")
17 | ```
18 | """
19 |
20 | from typing import Any
21 |
22 |
23 | class SearchContext:
24 | """Shared context to avoid redundant operations across searches.
25 |
26 | This class maintains a cache of validated entities to prevent
27 | redundant API calls during a search session.
28 |
29 | Attributes:
30 | validated_genes: Cache of gene validation results
31 | validated_cache: General validation cache for other entities
32 | """
33 |
34 | def __init__(self):
35 | self.validated_genes: dict[str, bool] = {}
36 | self.gene_summaries: dict[str, Any] = {}
37 | self.cancer_types: dict[str, Any] | None = None
38 | self._validation_cache: dict[str, Any] = {}
39 |
40 | async def validate_gene(self, gene: str) -> bool:
41 | """Validate gene symbol with caching."""
42 | if gene in self.validated_genes:
43 | return self.validated_genes[gene]
44 |
45 | # Import here to avoid circular imports
46 | from .utils.gene_validator import is_valid_gene_symbol
47 |
48 | is_valid = is_valid_gene_symbol(gene)
49 | self.validated_genes[gene] = is_valid
50 | return is_valid
51 |
52 | def get_gene_summary(self, gene: str) -> Any | None:
53 | """Get cached gene summary if available."""
54 | return self.gene_summaries.get(gene)
55 |
56 | def set_gene_summary(self, gene: str, summary: Any):
57 | """Cache gene summary."""
58 | self.gene_summaries[gene] = summary
59 |
60 | def cache_validation(self, key: str, value: Any):
61 | """Cache arbitrary validation results."""
62 | self._validation_cache[key] = value
63 |
64 | def get_cached_validation(self, key: str) -> Any | None:
65 | """Get cached validation result."""
66 | return self._validation_cache.get(key)
67 |
68 |
69 | # Module-level context for the current search operation (a plain global, shared by all threads and tasks)
70 | _search_context: SearchContext | None = None
71 |
72 |
def get_search_context() -> SearchContext | None:
    """Return the active search context, or ``None`` when none is set."""
    active = _search_context
    return active
76 |
77 |
def set_search_context(context: SearchContext | None):
    """Replace the active search context; pass ``None`` to clear it."""
    # Rebinds the module-level variable read by get_search_context().
    global _search_context
    _search_context = context
82 |
83 |
class SearchContextManager:
    """Context manager that installs a shared ``SearchContext``.

    Nested ``with`` blocks all receive the same ``SearchContext``
    instance. The shared instance is discarded when a manager exits and
    the context it restores is ``None``.
    """

    # The SearchContext shared by all currently nested managers.
    _instance = None

    def __init__(self):
        self.context = None
        self.previous_context = None

    def __enter__(self):
        """Install the shared context and return it."""
        shared = SearchContextManager._instance
        if shared is None:
            # First manager to enter creates the shared context.
            shared = SearchContext()
            SearchContextManager._instance = shared

        self.context = shared
        # Remember what was active so __exit__ can restore it.
        self.previous_context = get_search_context()
        set_search_context(shared)
        return shared

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Restore the previous context; never suppress exceptions."""
        restored = self.previous_context
        set_search_context(restored)
        if restored is None:
            # Nothing was active before this manager: drop the singleton.
            SearchContextManager._instance = None
        return False
108 |
```
--------------------------------------------------------------------------------
/src/biomcp/utils/request_cache.py:
--------------------------------------------------------------------------------
```python
1 | """Simple request-level caching for API calls."""
2 |
3 | import asyncio
4 | import time
5 | from collections import OrderedDict
6 | from collections.abc import Awaitable, Callable
7 | from functools import wraps
8 | from typing import Any, TypeVar
9 |
10 |
11 | # LRU cache with size limit
12 | class LRUCache:
13 | """Simple LRU cache with TTL support."""
14 |
15 | def __init__(self, max_size: int = 1000):
16 | self.cache: OrderedDict[str, tuple[Any, float]] = OrderedDict()
17 | self.max_size = max_size
18 | self._lock = asyncio.Lock()
19 |
20 | async def get(self, key: str) -> Any | None:
21 | """Get item from cache if not expired."""
22 | async with self._lock:
23 | if key not in self.cache:
24 | return None
25 |
26 | value, expiry = self.cache[key]
27 | if time.time() > expiry:
28 | del self.cache[key]
29 | return None
30 |
31 | # Move to end (most recently used)
32 | self.cache.move_to_end(key)
33 | return value
34 |
35 | async def set(self, key: str, value: Any, ttl: float):
36 | """Set item in cache with TTL."""
37 | async with self._lock:
38 | # Remove oldest items if at capacity
39 | while len(self.cache) >= self.max_size:
40 | self.cache.popitem(last=False)
41 |
42 | expiry = time.time() + ttl
43 | self.cache[key] = (value, expiry)
44 |
45 |
46 | # Global LRU cache instance
47 | _cache = LRUCache(max_size=1000)
48 |
49 | # Default TTL in seconds (15 minutes)
50 | DEFAULT_TTL = 900
51 |
52 | # Named caches for different purposes
53 | _named_caches: dict[str, LRUCache] = {}
54 |
55 |
def get_cache(
    name: str, ttl_seconds: int = 300, max_size: int = 100
) -> LRUCache:
    """Return the cache registered under ``name``, creating it if needed.

    ``max_size`` only applies when the cache is created; a later call
    with a different value returns the existing cache unchanged.
    ``ttl_seconds`` is accepted but not used by this function.
    """
    try:
        return _named_caches[name]
    except KeyError:
        created = LRUCache(max_size=max_size)
        _named_caches[name] = created
        return created
63 |
64 |
65 | T = TypeVar("T")
66 |
67 |
def cache_key(*args, **kwargs) -> str:
    """Build a colon-separated cache key from call arguments.

    Positional arguments are rendered with ``str`` in call order,
    followed by ``name=value`` pairs for keyword arguments sorted by
    name.
    """
    positional = map(str, args)
    keyword = (f"{name}={value}" for name, value in sorted(kwargs.items()))
    return ":".join([*positional, *keyword])
73 |
74 |
async def get_cached(key: str) -> Any | None:
    """Look up ``key`` in the module-level default cache.

    Returns ``None`` when the key is missing or its entry has expired.
    """
    hit = await _cache.get(key)
    return hit
78 |
79 |
async def set_cached(key: str, value: Any, ttl: int = DEFAULT_TTL) -> None:
    """Store ``value`` under ``key`` in the module-level default cache.

    The entry expires ``ttl`` seconds after it is written.
    """
    await _cache.set(key=key, value=value, ttl=ttl)
83 |
84 |
def request_cache(ttl: int = DEFAULT_TTL) -> Callable:
    """Decorator that caches the results of an async function.

    The cache key is the function's module and name plus its call
    arguments. A ``skip_cache=True`` keyword argument bypasses the cache
    for that call and is not forwarded to the wrapped function.
    ``None`` results are never cached, so they are recomputed each call.

    Args:
        ttl: Time to live in seconds

    Returns:
        Decorated function with caching
    """

    def decorator(
        func: Callable[..., Awaitable[T]],
    ) -> Callable[..., Awaitable[T]]:
        @wraps(func)
        async def wrapper(*args, **kwargs) -> T:
            # Remove the control flag before it can reach func or the key.
            bypass = kwargs.pop("skip_cache", False)
            if bypass:
                return await func(*args, **kwargs)

            key = f"{func.__module__}.{func.__name__}:{cache_key(*args, **kwargs)}"

            hit = await get_cached(key)
            if hit is not None:
                return hit

            fresh = await func(*args, **kwargs)
            if fresh is None:
                # None doubles as the cache-miss marker, so it is not stored.
                return fresh

            await set_cached(key, fresh, ttl)
            return fresh

        return wrapper

    return decorator
122 |
123 |
async def clear_cache() -> None:
    """Remove every entry from the module-level default cache."""
    # Empties the cache's underlying OrderedDict directly, without taking
    # the cache lock. Caches obtained through get_cache() are not touched.
    entries = _cache.cache
    entries.clear()
128 |
```
--------------------------------------------------------------------------------
/src/biomcp/utils/cbio_http_adapter.py:
--------------------------------------------------------------------------------
```python
1 | """Adapter for using centralized HTTP client with cBioPortal API.
2 |
3 | This module provides a thin wrapper around the centralized HTTP client
4 | specifically for cBioPortal API calls. It handles:
5 | - Authorization header injection for authenticated requests
6 | - Consistent error handling and response formatting
7 | - Endpoint-specific caching and rate limiting
8 | - Seamless migration from direct httpx usage
9 |
10 | Example:
11 | adapter = CBioHTTPAdapter()
12 | data, error = await adapter.get("/genes/BRAF")
13 | if error:
14 | print(f"Failed to fetch gene: {error}")
15 | else:
16 | print(f"Gene ID: {data.get('entrezGeneId')}")
17 | """
18 |
19 | import json
20 | from typing import Any
21 |
22 | from ..http_client import RequestError, request_api
23 | from ..variants.constants import CBIO_BASE_URL, CBIO_TOKEN
24 |
25 |
class CBioHTTPAdapter:
    """Adapter for cBioPortal API calls using centralized HTTP client."""

    def __init__(self):
        self.base_url = CBIO_BASE_URL
        self.headers = self._build_headers()

    def _build_headers(self) -> dict[str, str]:
        """Build authorization headers if token is available.

        A token that does not already start with ``"Bearer "`` gets that
        prefix added. Without a token the result is an empty dict.
        """
        headers = {}
        if CBIO_TOKEN:
            if not CBIO_TOKEN.startswith("Bearer "):
                headers["Authorization"] = f"Bearer {CBIO_TOKEN}"
            else:
                headers["Authorization"] = CBIO_TOKEN
        return headers

    async def get(
        self,
        path: str,
        params: dict[str, Any] | None = None,
        endpoint_key: str = "cbioportal_api",
        cache_ttl: int = 900,  # 15 minutes default
    ) -> tuple[dict[str, Any] | None, RequestError | None]:
        """Make a GET request to cBioPortal API.

        The caller's ``params`` dict is never modified.

        Args:
            path: API path (e.g., "/genes/BRAF")
            params: Query parameters
            endpoint_key: Registry key for endpoint tracking
            cache_ttl: Cache time-to-live in seconds

        Returns:
            Tuple of (response_data, error)
        """
        url = f"{self.base_url}{path}"

        request_params = params or {}
        if self.headers:
            # Headers travel to the centralized client as a JSON string
            # under the "_headers" key. Build a new dict so that key is
            # not written into the caller's params.
            request_params = {
                **request_params,
                "_headers": json.dumps(self.headers),
            }

        result, error = await request_api(
            url=url,
            request=request_params,
            method="GET",
            domain="cbioportal",  # For rate limiting
            endpoint_key=endpoint_key,
            cache_ttl=cache_ttl,
            enable_retry=True,
        )

        return result, error

    async def post(
        self,
        path: str,
        data: dict[str, Any],
        endpoint_key: str = "cbioportal_api",
        cache_ttl: int = 0,  # No caching for POST by default
    ) -> tuple[dict[str, Any] | None, RequestError | None]:
        """Make a POST request to cBioPortal API.

        The caller's ``data`` dict is never modified.

        Args:
            path: API path
            data: Request body data
            endpoint_key: Registry key for endpoint tracking
            cache_ttl: Cache time-to-live in seconds

        Returns:
            Tuple of (response_data, error)
        """
        url = f"{self.base_url}{path}"

        request_body = data
        if self.headers:
            # Same "_headers" convention as get(); copy instead of
            # mutating the caller's body in place.
            request_body = {**data, "_headers": json.dumps(self.headers)}

        result, error = await request_api(
            url=url,
            request=request_body,
            method="POST",
            domain="cbioportal",
            endpoint_key=endpoint_key,
            cache_ttl=cache_ttl,
            enable_retry=True,
        )

        return result, error
116 |
```
--------------------------------------------------------------------------------
/tests/tdd/utils/test_gene_validator.py:
--------------------------------------------------------------------------------
```python
1 | """Tests for gene validation utilities."""
2 |
3 | from biomcp.utils.gene_validator import (
4 | is_valid_gene_symbol,
5 | sanitize_gene_symbol,
6 | )
7 |
8 |
class TestGeneValidator:
    """Checks for ``is_valid_gene_symbol`` and ``sanitize_gene_symbol``."""

    def test_valid_gene_symbols(self):
        """Well-formed symbols, including symbol+mutation forms, pass."""
        accepted = [
            "BRAF",
            "TP53",
            "KRAS",
            "EGFR",
            "PIK3CA",
            "BRCA1",
            "BRCA2",
            "MYC",
            "ERBB2",
            "CDKN2A",
            "VHL",
            "RB1",
            "PTEN",
            "APC",
            "MLH1",
            "MSH2",
            "MSH6",
            "PMS2",
            "ATM",
            "CHEK2",
            "PALB2",
            "RAD51C",
            "RAD51D",
            "BRIP1",
            "CDH1",
            "STK11",
            "MUTYH",
            "BMPR1A",
            "SMAD4",
            "ALK",
            "ROS1",
            "RET",
            "MET",
            "HER2",
            "FGFR1",
            "FGFR2",
            "FGFR3",
            "FGFR4",
            "IDH1",
            "IDH2",
            "TERT",
            "ATRX",
            "H3F3A",
            "HIST1H3B",
            "BRAFV600E",  # With mutation
            "KRASG12D",  # With mutation
            "EGFRL858R",  # With mutation
        ]

        for symbol in accepted:
            assert is_valid_gene_symbol(
                symbol
            ), f"Should accept valid gene: {symbol}"

    def test_invalid_gene_symbols(self):
        """Empty, malformed, placeholder and non-uppercase inputs fail."""
        rejected = [
            None,
            "",
            " ",
            " ",
            "123",  # Starts with number
            "A",  # Too short
            "INVALID_GENE_XYZ",  # Known invalid
            "TEST",
            "NULL",
            "NONE",
            "UNKNOWN",
            "gene",  # Lowercase
            "Braf",  # Mixed case
            "GENE-WITH-SPECIAL-CHARS!",
            "GENE WITH SPACES",
            "GENE/WITH/SLASHES",
            "GENE.WITH.DOTS",
            "VERYLONGGENENAMETHATEXCEEDSLIMIT",  # Too long
            "_GENE",  # Starts with underscore
            "-GENE",  # Starts with hyphen
        ]

        for symbol in rejected:
            assert not is_valid_gene_symbol(
                symbol
            ), f"Should reject invalid gene: {symbol}"

    def test_gene_symbols_with_version(self):
        """Hyphenated symbols (MT-*, HLA-*) are accepted."""
        hyphenated = [
            "MT-CO1",
            "MT-CO2",
            "MT-CO3",
            "HLA-A",
            "HLA-B",
            "HLA-C",
            "HLA-DRB1",
            "HLA-DQB1",
            "HLA-DPB1",
        ]

        for symbol in hyphenated:
            assert is_valid_gene_symbol(
                symbol
            ), f"Should accept versioned gene: {symbol}"

    def test_sanitize_gene_symbol(self):
        """Sanitizing upper-cases and strips surrounding whitespace."""
        cases = [
            # Uppercase conversion
            ("braf", "BRAF"),
            ("Tp53", "TP53"),
            ("kRaS", "KRAS"),
            # Whitespace stripping
            (" BRAF ", "BRAF"),
            ("\tTP53\n", "TP53"),
            (" KRAS ", "KRAS"),
            # Both at once
            (" braf ", "BRAF"),
            ("\ttp53\n", "TP53"),
        ]

        for raw, expected in cases:
            assert sanitize_gene_symbol(raw) == expected
133 |
```
--------------------------------------------------------------------------------
/src/biomcp/cli/server.py:
--------------------------------------------------------------------------------
```python
1 | from enum import Enum
2 | from typing import Annotated
3 |
4 | import typer
5 | from dotenv import load_dotenv
6 |
7 | from .. import logger, mcp_app # mcp_app is already instantiated in core.py
8 |
9 | # Load environment variables from .env file
10 | load_dotenv()
11 |
12 | server_app = typer.Typer(help="Server operations")
13 |
14 |
class ServerMode(str, Enum):
    """Transport modes selectable for the server.

    Members are also ``str`` instances, so they compare equal to their
    plain string values.
    """

    # Served directly by mcp_app.run(transport="stdio").
    STDIO = "stdio"
    # Served over HTTP with the app imported from the workers package.
    WORKER = "worker"
    # Served over HTTP with mcp_app.streamable_http_app().
    STREAMABLE_HTTP = "streamable_http"
19 |
20 |
def run_stdio_server():
    """Log the start-up message, then run the MCP app over STDIO."""
    transport = "stdio"
    logger.info("Starting MCP server with STDIO transport:")
    mcp_app.run(transport=transport)
25 |
26 |
def run_http_server(host: str, port: int, mode: ServerMode):
    """Run server in HTTP-based mode (worker or streamable_http).

    Worker mode serves the app imported from ``..workers.worker``.
    Streamable HTTP mode serves ``mcp_app.streamable_http_app()`` with an
    extra ``GET /health`` route. Both are run with uvicorn.

    Args:
        host: Interface to bind to.
        port: TCP port to bind to.
        mode: Any mode other than ``ServerMode.WORKER`` is treated as
            streamable HTTP.

    Raises:
        typer.Exit: With code 1 when a dependency cannot be imported or
            the server fails for any other reason.
    """
    try:
        from typing import Any

        import uvicorn

        app: Any  # Type will be either FastAPI or Starlette

        if mode == ServerMode.WORKER:
            logger.info("Starting MCP server with Worker/SSE transport")
            try:
                from ..workers.worker import app
            except ImportError as e:
                logger.error(
                    f"Failed to import worker mode dependencies: {e}\n"
                    "Please install with: pip install biomcp-python[worker]"
                )
                raise typer.Exit(1) from e
        else:  # STREAMABLE_HTTP
            logger.info(
                f"Starting MCP server with Streamable HTTP transport on {host}:{port}"
            )
            logger.info(f"Endpoint: http://{host}:{port}/mcp")
            logger.info("Using FastMCP's native Streamable HTTP support")

            try:
                from starlette.responses import JSONResponse
                from starlette.routing import Route
            except ImportError as e:
                logger.error(
                    f"Failed to import Starlette dependencies: {e}\n"
                    "Please install with: pip install biomcp-python[worker]"
                )
                raise typer.Exit(1) from e

            from .. import mcp_app

            # Get FastMCP's streamable_http_app
            app = mcp_app.streamable_http_app()

            # Add health endpoint to the Starlette app
            async def health_check(request):
                return JSONResponse({"status": "healthy"})

            health_route = Route("/health", health_check, methods=["GET"])
            app.routes.append(health_route)

        uvicorn.run(
            app,
            host=host,
            port=port,
            log_level="info",
        )
    except typer.Exit:
        # Raised above after the failure was already logged. typer.Exit
        # derives from Exception, so without this clause the generic
        # handler below would log it again as an "unexpected error"
        # with a traceback.
        raise
    except ImportError as e:
        logger.error(f"Failed to start {mode.value} mode: {e}")
        raise typer.Exit(1) from e
    except Exception as e:
        logger.error(f"An unexpected error occurred: {e}", exc_info=True)
        raise typer.Exit(1) from e
87 |
88 |
@server_app.command("run")
def run_server(
    mode: Annotated[
        ServerMode,
        typer.Option(
            help="Server mode: stdio (local), worker (legacy SSE), or streamable_http (MCP spec compliant)",
            case_sensitive=False,
        ),
    ] = ServerMode.STDIO,
    host: Annotated[
        str,
        typer.Option(
            help="Host to bind to (for HTTP modes)",
        ),
    ] = "0.0.0.0",  # noqa: S104 - Required for Docker container networking
    port: Annotated[
        int,
        typer.Option(
            help="Port to bind to (for HTTP modes)",
        ),
    ] = 8000,
):
    """Run the BioMCP server with selected transport mode."""
    # host and port only matter for the HTTP-based modes.
    if mode != ServerMode.STDIO:
        run_http_server(host, port, mode)
        return
    run_stdio_server()
116 |
```
--------------------------------------------------------------------------------
/src/biomcp/thinking/sequential.py:
--------------------------------------------------------------------------------
```python
1 | """Sequential thinking module for BioMCP."""
2 |
3 | from typing import Annotated
4 |
5 | from .session import ThoughtEntry, _session_manager
6 |
7 |
def get_current_timestamp() -> str:
    """Return the current local time as an ISO 8601 string.

    The value comes from a naive ``datetime.now()``, so it carries no
    timezone offset.
    """
    import datetime as _dt

    now = _dt.datetime.now()
    return now.isoformat()
13 |
14 |
15 | async def _sequential_thinking(
16 | thought: Annotated[
17 | str, "Current thinking step - be detailed and thorough"
18 | ],
19 | nextThoughtNeeded: Annotated[
20 | bool, "True if more thinking needed, False only when completely done"
21 | ],
22 | thoughtNumber: Annotated[int, "Current thought number (start at 1)"],
23 | totalThoughts: Annotated[
24 | int, "Best estimate of total thoughts (adjust as needed)"
25 | ],
26 | isRevision: Annotated[
27 | bool, "True when correcting/improving a previous thought"
28 | ] = False,
29 | revisesThought: Annotated[
30 | int | None, "The thought number being revised"
31 | ] = None,
32 | branchFromThought: Annotated[
33 | int | None, "Create alternative path from this thought number"
34 | ] = None,
35 | needsMoreThoughts: Annotated[
36 | bool | None,
37 | "True when problem is significantly larger than initially estimated",
38 | ] = None,
39 | ) -> str:
40 | """
41 | ALWAYS use this tool for complex reasoning, analysis, or problem-solving. This facilitates a detailed, step-by-step thinking process that helps break down problems systematically.
42 |
43 | Use this tool when:
44 | - Analyzing complex problems or questions
45 | - Planning multi-step solutions
46 | - Breaking down tasks into components
47 | - Reasoning through uncertainties
48 | - Exploring alternative approaches
49 |
50 | Start with thoughtNumber=1 and totalThoughts as your best estimate. Set nextThoughtNeeded=true to continue thinking, or false when done. You can revise earlier thoughts or branch into alternative paths as needed.
51 |
52 | This is your primary reasoning tool - USE IT LIBERALLY for any non-trivial thinking task.
53 | """
54 |
55 | # Validate inputs
56 | if thoughtNumber < 1:
57 | return "Error: thoughtNumber must be >= 1"
58 |
59 | if totalThoughts < 1:
60 | return "Error: totalThoughts must be >= 1"
61 |
62 | if isRevision and not revisesThought:
63 | return "Error: revisesThought must be specified when isRevision=True"
64 |
65 | # Get or create session
66 | session = _session_manager.get_or_create_session()
67 |
68 | # Create thought entry
69 | branch_id = f"branch_{branchFromThought}" if branchFromThought else None
70 |
71 | entry = ThoughtEntry(
72 | thought=thought,
73 | thought_number=thoughtNumber,
74 | total_thoughts=totalThoughts,
75 | next_thought_needed=nextThoughtNeeded,
76 | is_revision=isRevision,
77 | revises_thought=revisesThought,
78 | branch_from_thought=branchFromThought,
79 | branch_id=branch_id,
80 | metadata={"needsMoreThoughts": needsMoreThoughts}
81 | if needsMoreThoughts
82 | else {},
83 | )
84 |
85 | # Add thought to session
86 | session.add_thought(entry)
87 |
88 | # Generate status message
89 | if branchFromThought:
90 | status_msg = f"Added thought {thoughtNumber} to branch '{branch_id}'"
91 | elif isRevision and revisesThought:
92 | status_msg = (
93 | f"Revised thought {revisesThought} (now thought {thoughtNumber})"
94 | )
95 | else:
96 | status_msg = f"Added thought {thoughtNumber} to main sequence"
97 |
98 | # Generate progress information
99 | progress_msg = f"Progress: {thoughtNumber}/{totalThoughts} thoughts"
100 | next_msg = (
101 | "Next thought needed"
102 | if nextThoughtNeeded
103 | else "Thinking sequence complete"
104 | )
105 |
106 | return f"{status_msg}. {progress_msg}. {next_msg}."
107 |
```
--------------------------------------------------------------------------------
/docs/stylesheets/extra.css:
--------------------------------------------------------------------------------
```css
1 | /* Custom styles for BioMCP documentation */
2 |
3 | /* Style for main navigation tabs */
4 | .md-tabs__link {
5 | font-weight: 600;
6 | text-transform: uppercase;
7 | letter-spacing: 0.03em;
8 | }
9 |
10 | /* Bold section headers in sidebar */
11 | .md-nav__item--section > .md-nav__link {
12 | font-weight: 700 !important;
13 | font-size: 0.9rem !important;
14 | margin-top: 0.8rem;
15 | margin-bottom: 0.4rem;
16 | padding-bottom: 0.4rem;
17 | border-bottom: 1px solid var(--md-default-fg-color--lightest);
18 | display: block;
19 | }
20 |
21 | /* Nested section headers - slightly smaller */
22 | .md-nav__item--section .md-nav__item--section > .md-nav__link {
23 | font-weight: 600 !important;
24 | font-size: 0.85rem !important;
25 | margin-top: 0.4rem;
26 | margin-bottom: 0.2rem;
27 | }
28 |
29 | /* Regular navigation links */
30 | .md-nav__link {
31 | font-weight: 400;
32 | }
33 |
34 | /* Active/current page link */
35 | .md-nav__link--active {
36 | font-weight: 600 !important;
37 | color: var(--md-accent-fg-color) !important;
38 | }
39 |
40 | /* Table of contents header - make it lighter */
41 | .md-nav--secondary > .md-nav__title {
42 | font-weight: 600 !important;
43 | font-size: 0.8rem !important;
44 | text-transform: none !important;
45 | letter-spacing: normal !important;
46 | color: var(--md-default-fg-color--light) !important;
47 | background-color: transparent !important;
48 | box-shadow: none !important;
49 | border-bottom: 1px solid var(--md-default-fg-color--lightest);
50 | padding-bottom: 0.4rem;
51 | }
52 |
53 | /* Add visual separation between major sections */
54 | .md-nav--primary > .md-nav__list > .md-nav__item {
55 | margin-bottom: 0.5rem;
56 | }
57 |
58 | /* Improve readability of code blocks */
59 | .highlight pre {
60 | line-height: 1.5;
61 | overflow-x: auto;
62 | white-space: pre;
63 | }
64 |
65 | /* Fix code blocks in grid cards */
66 | .md-typeset .grid.cards code,
67 | .md-typeset .grid.cards pre {
68 | word-break: break-word;
69 | white-space: pre-wrap;
70 | overflow-wrap: break-word;
71 | }
72 |
73 | /* Specific fix for grid card code blocks */
74 | .md-typeset .grid.cards .highlight {
75 | margin: 0.5em 0;
76 | }
77 |
78 | .md-typeset .grid.cards .highlight pre {
79 | padding: 0.5em;
80 | font-size: 0.8em;
81 | }
82 |
83 | /* Prevent horizontal scroll for inline code */
84 | .md-typeset code {
85 | word-break: break-word;
86 | }
87 |
88 | /* Better spacing for admonitions */
89 | .admonition {
90 | margin: 1.5rem 0;
91 | }
92 |
93 | /* Improve table readability */
94 | .md-typeset table {
95 | font-size: 0.85rem;
96 | }
97 |
98 | /* Make external links more visible */
99 | .md-content a[href^="http"]:not(.md-button)::after {
100 | content: " ↗";
101 | font-size: 0.75em;
102 | vertical-align: super;
103 | opacity: 0.7;
104 | }
105 |
106 | /* Better spacing for navigation expansion arrows */
107 | .md-nav__icon {
108 | margin-left: 0.2rem;
109 | }
110 |
111 | /* Accessibility improvements */
112 | /* Ensure focus indicators are visible */
113 | a:focus,
114 | button:focus,
115 | input:focus,
116 | select:focus,
117 | textarea:focus {
118 | outline: 2px solid var(--md-accent-fg-color);
119 | outline-offset: 2px;
120 | }
121 |
122 | /* Skip to main content link */
123 | .md-skip {
124 | position: fixed;
125 | top: -40px;
126 | left: 0;
127 | background: var(--md-primary-fg-color);
128 | color: var(--md-primary-bg-color);
129 | padding: 8px;
130 | z-index: 100;
131 | text-decoration: none;
132 | }
133 |
134 | .md-skip:focus {
135 | top: 0;
136 | }
137 |
138 | /* Improve readability with better line height */
139 | .md-typeset {
140 | line-height: 1.6;
141 | }
142 |
143 | /* Keep code inside highlighted blocks at a consistent font size and line height */
144 | .highlight pre code {
145 | font-size: 0.85rem;
146 | line-height: 1.5;
147 | }
148 |
149 | /* Make interactive elements more obvious */
150 | .md-typeset .tabbed-set > input:checked + label {
151 | border-bottom: 2px solid var(--md-accent-fg-color);
152 | }
153 |
154 | /* Improve form accessibility */
155 | .md-search__input {
156 | font-size: 1rem;
157 | }
158 |
159 | /* Screen reader only text utility */
160 | .sr-only {
161 | position: absolute;
162 | width: 1px;
163 | height: 1px;
164 | padding: 0;
165 | margin: -1px;
166 | overflow: hidden;
167 | clip: rect(0, 0, 0, 0);
168 | white-space: nowrap;
169 | border: 0;
170 | }
171 |
```
--------------------------------------------------------------------------------
/tests/tdd/test_error_scenarios.py:
--------------------------------------------------------------------------------
```python
1 | """Tests for error scenarios and edge cases - fixed version."""
2 |
3 | import asyncio
4 | from unittest.mock import MagicMock, patch
5 |
6 | import pytest
7 |
8 | from biomcp.exceptions import (
9 | InvalidDomainError,
10 | )
11 | from biomcp.rate_limiter import RateLimiter
12 | from biomcp.router import format_results
13 |
14 |
@pytest.fixture(autouse=True)
def enable_metrics_for_concurrent_test(monkeypatch):
    """Turn metrics on for every test in this module.

    Sets ``BIOMCP_METRICS_ENABLED`` and then reloads ``biomcp.metrics``
    so the module is re-evaluated with the variable in place.
    """
    monkeypatch.setenv("BIOMCP_METRICS_ENABLED", "true")

    from importlib import reload

    import biomcp.metrics as metrics_module

    reload(metrics_module)
25 |
26 |
def test_format_results_invalid_domain():
    """An unknown domain raises InvalidDomainError naming the domain."""
    with pytest.raises(InvalidDomainError) as exc_info:
        format_results([], "invalid_domain", 1, 10, 100)

    message = str(exc_info.value)
    assert "invalid_domain" in message
    assert "Valid domains are:" in message
34 |
35 |
def test_format_results_handler_exception():
    """A result whose handler raises is dropped from the output."""
    unformattable = {"missing": "required_fields"}

    # Force the article formatter to fail on every result.
    with patch(
        "biomcp.domain_handlers.ArticleHandler.format_result",
        side_effect=KeyError("id"),
    ):
        result = format_results([unformattable], "article", 1, 10, 100)

    assert result["results"] == []  # Bad result is skipped
50 |
51 |
@pytest.mark.asyncio
async def test_rate_limiter_basic():
    """A burst no larger than burst_size passes through limit()."""
    burst = 5
    limiter = RateLimiter(requests_per_second=10, burst_size=burst)

    # Entering the context manager burst-many times must not raise.
    entered = 0
    while entered < burst:
        async with limiter.limit():
            entered += 1
62 |
63 |
@pytest.mark.asyncio
async def test_concurrent_operations():
    """Record 100 metrics concurrently and check the aggregated summary."""
    from biomcp.metrics import (
        _metrics_collector,
        get_metric_summary,
        record_metric,
    )

    metric_name = "concurrent_test"

    # Start from an empty collector
    await _metrics_collector.clear()

    async def record_operation(i):
        """Record one sample; every tenth sample is marked as a failure."""
        await record_metric(
            metric_name,
            duration=0.1 * (i % 5),
            success=i % 10 != 0,  # 10% failure rate
        )

    # Fire all 100 recordings at once
    await asyncio.gather(*(record_operation(i) for i in range(100)))

    summary = await get_metric_summary(metric_name)
    assert summary is not None
    assert summary.count == 100
    assert summary.error_rate == 0.1  # 10% errors
    # Durations cycle through 0.0, 0.1, 0.2, 0.3, 0.4, so the mean is 0.2
    assert 0.18 <= summary.avg_duration <= 0.22
96 |
97 |
def test_cache_corruption_handling():
    """A garbage cache entry is handed back unchanged by get_cached_response."""
    from biomcp.http_client import get_cached_response

    corrupted = "corrupted\x00data"  # Invalid data

    with patch("biomcp.http_client.get_cache") as get_cache_stub:
        # Stand-in cache whose .get() always yields the corrupted payload
        fake_cache = MagicMock()
        fake_cache.get.return_value = corrupted
        get_cache_stub.return_value = fake_cache

        fetched = get_cached_response("test_key")

    # Returned as-is; per the original note, parsing deals with it later
    assert fetched == corrupted
113 |
114 |
def test_exception_hierarchy():
    """InvalidDomainError's message mentions the bad domain and a valid one."""
    message = str(InvalidDomainError("bad_domain", ["article", "trial"]))

    for fragment in ("bad_domain", "article"):
        assert fragment in message
121 |
```
--------------------------------------------------------------------------------
/src/biomcp/thinking_tool.py:
--------------------------------------------------------------------------------
```python
1 | """Sequential thinking tool for structured problem-solving.
2 |
3 | This module provides a dedicated MCP tool for sequential thinking,
4 | separate from the main search functionality.
5 | """
6 |
7 | from typing import Annotated
8 |
9 | from pydantic import Field
10 |
11 | from biomcp.core import mcp_app
12 | from biomcp.metrics import track_performance
13 | from biomcp.thinking.sequential import _sequential_thinking
14 | from biomcp.thinking_tracker import mark_thinking_used
15 |
16 |
# Decorators apply bottom-up: the coroutine is wrapped by
# track_performance("biomcp.think") first, and the wrapped callable is then
# passed to mcp_app.tool().
@mcp_app.tool()
@track_performance("biomcp.think")
async def think(
    thought: Annotated[
        str,
        Field(description="Current thinking step for analysis"),
    ],
    thoughtNumber: Annotated[
        int,
        Field(
            description="Current thought number, starting at 1",
            ge=1,
        ),
    ],
    totalThoughts: Annotated[
        int,
        Field(
            description="Estimated total thoughts needed for complete analysis",
            ge=1,
        ),
    ],
    nextThoughtNeeded: Annotated[
        bool,
        Field(
            description="Whether more thinking steps are needed after this one",
        ),
    ] = True,
) -> dict:
    """REQUIRED FIRST STEP: Perform structured sequential thinking for ANY biomedical research task.

    🚨 IMPORTANT: You MUST use this tool BEFORE any search or fetch operations when:
    - Researching ANY biomedical topic (genes, diseases, variants, trials)
    - Planning to use multiple BioMCP tools
    - Answering questions that require analysis or synthesis
    - Comparing information from different sources
    - Making recommendations or drawing conclusions

    ⚠️ FAILURE TO USE THIS TOOL FIRST will result in:
    - Incomplete or poorly structured analysis
    - Missing important connections between data
    - Suboptimal search strategies
    - Overlooked critical information

    Sequential thinking ensures you:
    1. Fully understand the research question
    2. Plan an optimal search strategy
    3. Identify all relevant data sources
    4. Structure your analysis properly
    5. Deliver comprehensive, well-reasoned results

    ## Usage Pattern:
    1. Start with thoughtNumber=1 to initiate analysis
    2. Progress through numbered thoughts sequentially
    3. Adjust totalThoughts estimate as understanding develops
    4. Set nextThoughtNeeded=False only when analysis is complete

    ## Example:
    ```python
    # Initial analysis
    await think(
        thought="Breaking down the relationship between BRAF mutations and melanoma treatment resistance...",
        thoughtNumber=1,
        totalThoughts=5,
        nextThoughtNeeded=True
    )

    # Continue analysis
    await think(
        thought="Examining specific BRAF V600E mutation mechanisms...",
        thoughtNumber=2,
        totalThoughts=5,
        nextThoughtNeeded=True
    )

    # Final thought
    await think(
        thought="Synthesizing findings and proposing research directions...",
        thoughtNumber=5,
        totalThoughts=5,
        nextThoughtNeeded=False
    )
    ```

    ## Important Notes:
    - Each thought builds on previous ones within a session
    - State is maintained throughout the MCP session
    - Use thoughtful, detailed analysis in each step
    - Revisions and branching are supported through the underlying implementation
    """
    # NOTE(review): the docstring above is presumably surfaced to clients as
    # the tool description by mcp_app.tool() — confirm before rewording it.

    # Mark that thinking has been used
    mark_thinking_used()

    # Delegate the step to the sequential-thinking helper, forwarding all
    # four arguments unchanged.
    result = await _sequential_thinking(
        thought=thought,
        thoughtNumber=thoughtNumber,
        totalThoughts=totalThoughts,
        nextThoughtNeeded=nextThoughtNeeded,
    )

    # The helper's output goes under "result"; the caller's step number and
    # continuation flag are echoed back next to it.
    return {
        "domain": "thinking",
        "result": result,
        "thoughtNumber": thoughtNumber,
        "nextThoughtNeeded": nextThoughtNeeded,
    }
122 |
```
--------------------------------------------------------------------------------
/docs/developer-guides/07-performance-optimizations.md:
--------------------------------------------------------------------------------
```markdown
1 | # Performance Optimizations
2 |
3 | This document describes the performance optimizations implemented in BioMCP to improve response times and throughput.
4 |
5 | ## Overview
6 |
7 | BioMCP has been optimized for high-performance biomedical data retrieval through several key improvements:
8 |
9 | - **65% faster test execution** (from ~120s to ~42s)
10 | - **Reduced API calls** through intelligent caching and batching
11 | - **Lower latency** via connection pooling
12 | - **Better resource utilization** with parallel processing
13 |
14 | ## Key Optimizations
15 |
16 | ### 1. Connection Pooling
17 |
18 | HTTP connections are now reused across requests, eliminating connection establishment overhead.
19 |
20 | **Configuration:**
21 |
22 | - `BIOMCP_USE_CONNECTION_POOL` - Enable/disable pooling (default: "true")
23 | - Automatically manages pools per event loop
24 | - Graceful cleanup on shutdown
25 |
26 | **Impact:** ~30% reduction in request latency for sequential operations
27 |
28 | ### 2. Parallel Test Execution
29 |
30 | Tests now run in parallel using pytest-xdist, dramatically reducing test suite execution time.
31 |
32 | **Usage:**
33 |
34 | ```bash
35 | make test # Automatically uses parallel execution
36 | ```
37 |
38 | **Impact:** ~5x faster test execution
39 |
40 | ### 3. Request Batching
41 |
42 | Multiple API requests are batched together when possible, particularly for cBioPortal queries.
43 |
44 | **Features:**
45 |
46 | - Automatic batching based on size/time thresholds
47 | - Configurable batch size (default: 5 for cBioPortal)
48 | - Error isolation per request
49 |
50 | **Impact:** Up to 80% reduction in API calls for bulk operations
51 |
52 | ### 4. Smart Caching
53 |
54 | Multiple caching layers optimize repeated queries:
55 |
56 | - **LRU Cache:** Memory-bounded caching for recent requests
57 | - **Hash-based keys:** 10x faster cache key generation
58 | - **Shared validation context:** Eliminates redundant gene/entity validations
59 |
60 | **Configuration:**
61 |
62 | - Cache size: 1000 entries (configurable)
63 | - TTL: 5-30 minutes depending on data type
64 |
65 | ### 5. Pagination Support
66 |
67 | Europe PMC searches now use pagination for large result sets:
68 |
69 | - Optimal page size: 25 results
70 | - Progressive loading
71 | - Memory-efficient processing
72 |
73 | ### 6. Conditional Metrics
74 |
75 | Performance metrics are only collected when explicitly enabled, reducing overhead.
76 |
77 | **Configuration:**
78 |
79 | - `BIOMCP_METRICS_ENABLED` - Enable metrics (default: "false")
80 |
81 | ## Performance Benchmarks
82 |
83 | ### API Response Times
84 |
85 | | Operation | Before | After | Improvement |
86 | | ------------------------------ | ------ | ----- | ----------- |
87 | | Single gene search | 850ms | 320ms | 62% |
88 | | Bulk variant lookup | 4.2s | 1.1s | 74% |
89 | | Article search with cBioPortal | 2.1s | 780ms | 63% |
90 |
91 | ### Resource Usage
92 |
93 | | Metric | Before | After | Change |
94 | | ------------- | ------ | ----- | ----------- |
95 | | Memory (idle) | 145MB | 152MB | +5% |
96 | | Memory (peak) | 512MB | 385MB | -25% |
97 | | CPU (avg) | 35% | 28% | -20% |
98 |
99 | ## Best Practices
100 |
101 | 1. **Keep connection pooling enabled** unless experiencing issues
102 | 2. **Use the unified search** methods to benefit from parallel execution
103 | 3. **Batch operations** when performing multiple lookups
104 | 4. **Monitor cache hit rates** in production environments
105 |
106 | ## Troubleshooting
107 |
108 | ### Connection Pool Issues
109 |
110 | If experiencing connection errors:
111 |
112 | 1. Disable pooling: `export BIOMCP_USE_CONNECTION_POOL=false`
113 | 2. Check for firewall/proxy issues
114 | 3. Verify SSL certificates
115 |
116 | ### Memory Usage
117 |
118 | If memory usage is high:
119 |
120 | 1. Reduce cache size in `request_cache.py`
121 | 2. Lower connection pool limits
122 |
123 | ### Performance Regression
124 |
125 | To identify performance issues:
126 |
127 | 1. Enable metrics: `export BIOMCP_METRICS_ENABLED=true`
128 | 2. Check slow operations in logs
129 | 3. Profile with `py-spy` or similar tools
130 |
131 | ## Future Optimizations
132 |
133 | Planned improvements include:
134 |
135 | - GraphQL batching for complex queries
136 | - Redis integration for distributed caching
137 | - WebSocket support for real-time updates
138 | - GPU acceleration for variant analysis
139 |
```
--------------------------------------------------------------------------------
/tests/tdd/variants/test_search.py:
--------------------------------------------------------------------------------
```python
1 | import pytest
2 |
3 | from biomcp.variants.search import (
4 | ClinicalSignificance,
5 | PolyPhenPrediction,
6 | SiftPrediction,
7 | VariantQuery,
8 | build_query_string,
9 | search_variants,
10 | )
11 |
12 |
@pytest.fixture
def basic_query():
    """Provide a gene-only query for BRAF."""
    query = VariantQuery(gene="BRAF")
    return query
17 |
18 |
@pytest.fixture
def complex_query():
    """Provide a BRCA1 query with a significance and a frequency window."""
    params = {
        "gene": "BRCA1",
        "significance": ClinicalSignificance.PATHOGENIC,
        "min_frequency": 0.0001,
        "max_frequency": 0.01,
    }
    return VariantQuery(**params)
28 |
29 |
def test_query_validation():
    """Check which VariantQuery constructions are accepted or rejected."""
    # A gene on its own is accepted
    by_gene = VariantQuery(gene="BRAF")
    assert by_gene.gene == "BRAF"

    # An rsid on its own is accepted
    by_rsid = VariantQuery(rsid="rs113488022")
    assert by_rsid.rsid == "rs113488022"

    # Constructing with no arguments at all is rejected
    with pytest.raises(ValueError):
        VariantQuery()

    # Clinical significance enum supplied together with a gene
    pathogenic = VariantQuery(
        gene="BRCA1", significance=ClinicalSignificance.PATHOGENIC
    )
    assert pathogenic.significance == ClinicalSignificance.PATHOGENIC

    # Prediction enums supplied together with a gene
    predicted = VariantQuery(
        gene="TP53",
        polyphen=PolyPhenPrediction.PROBABLY_DAMAGING,
        sift=SiftPrediction.DELETERIOUS,
    )
    assert predicted.polyphen == PolyPhenPrediction.PROBABLY_DAMAGING
    assert predicted.sift == SiftPrediction.DELETERIOUS
58 |
59 |
def test_build_query_string():
    """Check the fragments build_query_string emits for each kind of filter."""
    # Single field
    gene_only = build_query_string(VariantQuery(gene="BRAF"))
    assert 'dbnsfp.genename:"BRAF"' in gene_only

    # Multiple fields
    gene_and_rsid = build_query_string(
        VariantQuery(gene="BRAF", rsid="rs113488022")
    )
    for fragment in ('dbnsfp.genename:"BRAF"', "rs113488022"):
        assert fragment in gene_and_rsid

    # Genomic region
    region = "chr7:140753300-140753400"
    assert region in build_query_string(VariantQuery(region=region))

    # Clinical significance
    by_significance = build_query_string(
        VariantQuery(significance=ClinicalSignificance.LIKELY_BENIGN)
    )
    assert (
        'clinvar.rcv.clinical_significance:"likely benign"' in by_significance
    )

    # Frequency filters
    by_frequency = build_query_string(
        VariantQuery(min_frequency=0.0001, max_frequency=0.01)
    )
    for fragment in (
        "gnomad_exome.af.af:>=0.0001",
        "gnomad_exome.af.af:<=0.01",
    ):
        assert fragment in by_frequency
88 |
89 |
async def test_search_variants_basic(basic_query, anyio_backend):
    """Search with the gene-only fixture and sanity-check the output.

    Per the original author's note this issues a real API query.
    NOTE(review): ``anyio_backend`` is requested but not used in the body;
    presumably it is what makes the anyio plugin run this coroutine — confirm.
    """
    output = await search_variants(basic_query)

    # The gene should be mentioned and the output must not be an error
    assert "BRAF" in output
    assert not output.startswith("Error")
98 |
99 |
async def test_search_variants_complex(complex_query, anyio_backend):
    """Run a TP53 search and check the output is not an error.

    The ``complex_query`` fixture is requested but the body does not use it;
    a plain TP53 gene query is searched instead.
    """
    # Use a simple common query that will return results
    tp53_query = VariantQuery(gene="TP53")
    output = await search_variants(tp53_query)

    assert not output.startswith("Error")
108 |
109 |
async def test_search_variants_no_results(anyio_backend):
    """An unmatched gene yields an empty JSON array when output_json is set."""
    unmatched = VariantQuery(gene="UNKNOWN_XYZ")
    output = await search_variants(unmatched, output_json=True)
    assert output == "[]"
115 |
116 |
async def test_search_variants_with_limit(anyio_backend):
    """A query carrying a small ``size`` still returns a non-error result."""
    limited = VariantQuery(gene="TP53", size=3)
    output = await search_variants(limited)

    # Only the absence of an error prefix is checked, not the row count
    assert not output.startswith("Error")
125 |
```
--------------------------------------------------------------------------------
/tests/tdd/test_offline_mode.py:
--------------------------------------------------------------------------------
```python
1 | """Tests for offline mode functionality."""
2 |
3 | import os
4 | from unittest.mock import patch
5 |
6 | import pytest
7 |
8 | from biomcp.http_client import RequestError, request_api
9 |
10 |
@pytest.mark.asyncio
async def test_offline_mode_blocks_requests():
    """With BIOMCP_OFFLINE=true, request_api returns a 503 RequestError."""
    offline_env = {"BIOMCP_OFFLINE": "true"}

    with patch.dict(os.environ, offline_env):
        payload, failure = await request_api(
            url="https://api.example.com/test",
            request={"test": "data"},
            cache_ttl=0,  # Disable caching for this test
        )

        # No payload, and an error describing offline mode
        assert payload is None
        assert failure is not None
        assert isinstance(failure, RequestError)
        assert failure.code == 503
        assert "Offline mode enabled" in failure.message
29 |
30 |
@pytest.mark.asyncio
async def test_offline_mode_allows_cached_responses():
    """A response cached while online is still served once offline."""
    # Identical arguments are used for the priming and the offline call
    call_args = {
        "url": "https://api.example.com/cached",
        "request": {"test": "data"},
        "cache_ttl": 3600,  # Cache for 1 hour
    }
    expected = {"data": "cached"}

    # Prime the cache with offline mode switched off and HTTP stubbed out
    with (
        patch.dict(os.environ, {"BIOMCP_OFFLINE": "false"}),
        patch("biomcp.http_client.call_http") as http_stub,
    ):
        http_stub.return_value = (200, '{"data": "cached"}')

        payload, failure = await request_api(**call_args)

        assert payload == expected
        assert failure is None

    # Repeat the same request with offline mode switched on
    with patch.dict(os.environ, {"BIOMCP_OFFLINE": "true"}):
        payload, failure = await request_api(**call_args)

        # The cached payload comes back and no error is reported
        assert payload == expected
        assert failure is None
63 |
64 |
@pytest.mark.asyncio
async def test_offline_mode_case_insensitive():
    """Several spellings of a truthy BIOMCP_OFFLINE value all block requests.

    Covers case variants of "true" and "yes" as well as "1".
    """
    truthy_spellings = ("TRUE", "True", "1", "yes", "YES", "Yes")

    for spelling in truthy_spellings:
        with patch.dict(os.environ, {"BIOMCP_OFFLINE": spelling}):
            payload, failure = await request_api(
                url="https://api.example.com/test",
                request={"test": "data"},
                cache_ttl=0,
            )

            assert payload is None
            assert failure is not None
            assert failure.code == 503
            assert "Offline mode enabled" in failure.message
82 |
83 |
@pytest.mark.asyncio
async def test_offline_mode_disabled_by_default():
    """With an empty environment the request goes out through call_http."""
    # clear=True empties os.environ for the duration of the block
    with (
        patch.dict(os.environ, {}, clear=True),
        patch("biomcp.http_client.call_http") as http_stub,
    ):
        http_stub.return_value = (200, '{"data": "response"}')

        payload, failure = await request_api(
            url="https://api.example.com/test",
            request={"test": "data"},
            cache_ttl=0,
        )

        # The stubbed response is parsed and returned; HTTP was hit once
        assert payload == {"data": "response"}
        assert failure is None
        http_stub.assert_called_once()
104 |
105 |
@pytest.mark.asyncio
async def test_offline_mode_with_endpoint_tracking():
    """Offline blocking also applies when an endpoint_key is supplied."""
    search_url = (
        "https://www.ncbi.nlm.nih.gov/research/pubtator3-api/search/"
    )

    with patch.dict(os.environ, {"BIOMCP_OFFLINE": "true"}):
        payload, failure = await request_api(
            url=search_url,
            request={"text": "BRAF"},
            endpoint_key="pubtator3_search",
            cache_ttl=0,
        )

        assert payload is None
        assert failure is not None
        assert failure.code == 503
        # The error message should mention the blocked URL path
        assert "pubtator3-api/search/" in failure.message
121 |
```
--------------------------------------------------------------------------------
/src/biomcp/variants/links.py:
--------------------------------------------------------------------------------
```python
1 | """Functions for adding database links to variant data."""
2 |
3 | from typing import Any
4 |
5 |
6 | def _calculate_vcf_end(variant: dict[str, Any]) -> int:
7 | """Calculate the end position for UCSC Genome Browser link."""
8 | if "vcf" not in variant:
9 | return 0
10 |
11 | vcf = variant["vcf"]
12 | pos = int(vcf.get("position", 0))
13 | ref = vcf.get("ref", "")
14 | alt = vcf.get("alt", "")
15 |
16 | # For insertions/deletions, handle special cases
17 | if not ref and alt: # insertion
18 | return pos + 1
19 | elif ref and not alt: # deletion
20 | return pos + len(ref)
21 | else: # substitution
22 | return pos + max(0, ((len(alt) + 1) - len(ref)))
23 |
24 |
25 | def _get_first_value(data: Any) -> Any:
26 | """Get the first value from a list or return the value itself."""
27 | if isinstance(data, list) and data:
28 | return data[0]
29 | return data
30 |
31 |
32 | def _ensure_url_section(variant: dict[str, Any]) -> None:
33 | """Ensure the URL section exists in the variant."""
34 | if "url" not in variant:
35 | variant["url"] = {}
36 |
37 |
def _add_dbsnp_links(variant: dict[str, Any]) -> None:
    """Attach dbSNP and Ensembl URLs when the variant carries an rsid.

    The dbSNP URL is stored inside the ``dbsnp`` section; the Ensembl URL is
    stored under the variant's top-level ``url`` section.
    """
    if "dbsnp" not in variant:
        return

    dbsnp = variant["dbsnp"]
    rsid = dbsnp.get("rsid")
    if not rsid:
        return

    dbsnp["url"] = f"https://www.ncbi.nlm.nih.gov/snp/{rsid}"
    _ensure_url_section(variant)
    variant["url"]["ensembl"] = (
        f"https://ensembl.org/Homo_sapiens/Variation/Explore?v={rsid}"
    )
48 |
49 |
50 | def _add_clinvar_link(variant: dict[str, Any]) -> None:
51 | """Add ClinVar link if variant_id is present."""
52 | if "clinvar" in variant and variant["clinvar"].get("variant_id"):
53 | variant["clinvar"]["url"] = (
54 | f"https://www.ncbi.nlm.nih.gov/clinvar/variation/{variant['clinvar']['variant_id']}/"
55 | )
56 |
57 |
58 | def _add_cosmic_link(variant: dict[str, Any]) -> None:
59 | """Add COSMIC link if cosmic_id is present."""
60 | if "cosmic" in variant and variant["cosmic"].get("cosmic_id"):
61 | variant["cosmic"]["url"] = (
62 | f"https://cancer.sanger.ac.uk/cosmic/mutation/overview?id={variant['cosmic']['cosmic_id']}"
63 | )
64 |
65 |
66 | def _add_civic_link(variant: dict[str, Any]) -> None:
67 | """Add CIViC link if id is present."""
68 | if "civic" in variant and variant["civic"].get("id"):
69 | variant["civic"]["url"] = (
70 | f"https://civicdb.org/variants/{variant['civic']['id']}/summary"
71 | )
72 |
73 |
def _add_ucsc_link(variant: dict[str, Any]) -> None:
    """Attach a UCSC Genome Browser URL when chrom and vcf.position exist.

    The link is stored under ``url.ucsc_genome_browser`` and spans from the
    VCF position to the end computed by ``_calculate_vcf_end``.
    """
    if "chrom" not in variant or "vcf" not in variant:
        return

    start = variant["vcf"].get("position")
    if not start:
        return

    end = _calculate_vcf_end(variant)
    _ensure_url_section(variant)
    variant["url"]["ucsc_genome_browser"] = (
        f"https://genome.ucsc.edu/cgi-bin/hgTracks?db=hg19&"
        f"position=chr{variant['chrom']}:{start}-{end}"
    )
87 |
88 |
def _add_hgnc_link(variant: dict[str, Any]) -> None:
    """Attach an HGNC symbol-report URL when dbnsfp.genename is present.

    If ``genename`` is a list, its first entry is used as the symbol.
    """
    if "dbnsfp" not in variant:
        return

    raw_name = variant["dbnsfp"].get("genename")
    if not raw_name:
        return

    symbol = _get_first_value(raw_name)
    if not symbol:
        return

    _ensure_url_section(variant)
    variant["url"]["hgnc"] = (
        f"https://www.genenames.org/data/gene-symbol-report/#!/symbol/{symbol}"
    )
98 |
99 |
def inject_links(variants: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """
    Add database URLs to each variant dictionary.

    Every variant is passed through each link helper in turn; the helpers
    modify the dictionaries in place.

    Args:
        variants: List of variant dictionaries from MyVariant.info API

    Returns:
        The same list that was passed in, with URL links added to its items
    """
    link_adders = (
        _add_dbsnp_links,
        _add_clinvar_link,
        _add_cosmic_link,
        _add_civic_link,
        _add_ucsc_link,
        _add_hgnc_link,
    )

    for variant in variants:
        for add_link in link_adders:
            add_link(variant)

    return variants
119 |
```
--------------------------------------------------------------------------------
/src/biomcp/organizations/getter.py:
--------------------------------------------------------------------------------
```python
1 | """Get specific organization details via NCI CTS API."""
2 |
3 | import logging
4 | from typing import Any
5 |
6 | from ..constants import NCI_ORGANIZATIONS_URL
7 | from ..integrations.cts_api import CTSAPIError, make_cts_request
8 |
9 | logger = logging.getLogger(__name__)
10 |
11 |
async def get_organization(
    org_id: str,
    api_key: str | None = None,
) -> dict[str, Any]:
    """
    Fetch the details of one organization.

    Args:
        org_id: Organization ID
        api_key: Optional API key (if not provided, uses NCI_API_KEY env var)

    Returns:
        Dictionary with organization details

    Raises:
        CTSAPIError: If the API request fails or organization not found
    """
    try:
        response = await make_cts_request(
            url=f"{NCI_ORGANIZATIONS_URL}/{org_id}",
            api_key=api_key,
        )

        # The payload may be wrapped under "data" or "organization";
        # unwrap the first wrapper found, otherwise return it whole.
        for wrapper_key in ("data", "organization"):
            if wrapper_key in response:
                return response[wrapper_key]
        return response

    except CTSAPIError:
        # Already the documented error type: let it through untouched
        raise
    except Exception as e:
        # Anything else is logged and re-raised as CTSAPIError
        logger.error(f"Failed to get organization {org_id}: {e}")
        raise CTSAPIError(f"Failed to retrieve organization: {e!s}") from e
51 |
52 |
53 | def _format_address_fields(org: dict[str, Any]) -> list[str]:
54 | """Extract and format address fields from organization data."""
55 | address_fields = []
56 |
57 | if org.get("address"):
58 | addr = org["address"]
59 | if isinstance(addr, dict):
60 | fields = [
61 | addr.get("street", ""),
62 | addr.get("city", ""),
63 | addr.get("state", ""),
64 | addr.get("zip", ""),
65 | ]
66 | address_fields = [f for f in fields if f]
67 |
68 | country = addr.get("country", "")
69 | if country and country != "United States":
70 | address_fields.append(country)
71 | else:
72 | # Try individual fields
73 | city = org.get("city", "")
74 | state = org.get("state", "")
75 | address_fields = [p for p in [city, state] if p]
76 |
77 | return address_fields
78 |
79 |
80 | def _format_contact_info(org: dict[str, Any]) -> list[str]:
81 | """Format contact information lines."""
82 | lines = []
83 | if org.get("phone"):
84 | lines.append(f"- **Phone**: {org['phone']}")
85 | if org.get("email"):
86 | lines.append(f"- **Email**: {org['email']}")
87 | if org.get("website"):
88 | lines.append(f"- **Website**: {org['website']}")
89 | return lines
90 |
91 |
def format_organization_details(org: dict[str, Any]) -> str:
    """
    Render an organization record as a markdown document.

    Args:
        org: Organization data dictionary

    Returns:
        Formatted markdown string
    """
    # Header values, each with an alternate key and a placeholder default
    identifier = org.get("id", org.get("org_id", "Unknown"))
    display_name = org.get("name", "Unknown Organization")
    kind = org.get("type", org.get("category", "Unknown"))

    output = [
        f"## Organization: {display_name}",
        "",
        "### Basic Information",
        f"- **ID**: {identifier}",
        f"- **Type**: {kind}",
    ]

    # Location bullet, only when at least one address part was found
    location_parts = _format_address_fields(org)
    if location_parts:
        output.append(f"- **Location**: {', '.join(location_parts)}")

    # Phone / email / website bullets
    output += _format_contact_info(org)

    # Optional free-text description section
    description = org.get("description")
    if description:
        output += ["", "### Description", description]

    # Optional parent-organization section
    parent = org.get("parent_org")
    if parent:
        output += [
            "",
            "### Parent Organization",
            f"- **Name**: {parent.get('name', 'Unknown')}",
            f"- **ID**: {parent.get('id', 'Unknown')}",
        ]

    return "\n".join(output)
142 |
```
--------------------------------------------------------------------------------
/tests/tdd/utils/test_request_cache.py:
--------------------------------------------------------------------------------
```python
1 | """Tests for request caching utilities."""
2 |
3 | import asyncio
4 |
5 | import pytest
6 |
7 | from biomcp.utils.request_cache import (
8 | clear_cache,
9 | get_cached,
10 | request_cache,
11 | set_cached,
12 | )
13 |
14 |
class TestRequestCache:
    """Exercise the get/set cache helpers and the request_cache decorator."""

    @pytest.fixture(autouse=True)
    async def clear_cache_before_test(self):
        """Empty the cache both before and after each test."""
        await clear_cache()
        yield
        await clear_cache()

    @pytest.mark.asyncio
    async def test_basic_caching(self):
        """A value that was set can be read back; an unset key gives None."""
        key, value = "test_key", "test_value"

        # Nothing stored yet
        assert (await get_cached(key)) is None

        # Store, then read back
        await set_cached(key, value, ttl=10)
        assert (await get_cached(key)) == value

    @pytest.mark.asyncio
    async def test_cache_expiry(self):
        """An entry disappears once its TTL has elapsed."""
        key, value = "test_key", "test_value"

        # Very short TTL
        await set_cached(key, value, ttl=0.1)

        # Readable straight away
        assert (await get_cached(key)) == value

        # Sleep past the TTL
        await asyncio.sleep(0.2)

        # Gone after expiry
        assert (await get_cached(key)) is None

    @pytest.mark.asyncio
    async def test_request_cache_decorator(self):
        """Repeated calls with equal args reuse the first result."""
        calls = []

        @request_cache(ttl=10)
        async def expensive_function(arg1, arg2):
            calls.append((arg1, arg2))
            return f"{arg1}-{arg2}-{len(calls)}"

        # First call runs the function body
        first = await expensive_function("a", "b")
        assert first == "a-b-1"
        assert len(calls) == 1

        # Same arguments: served from cache, body not run again
        second = await expensive_function("a", "b")
        assert second == "a-b-1"  # Same result
        assert len(calls) == 1  # Function not called again

        # Different arguments: body runs again
        third = await expensive_function("c", "d")
        assert third == "c-d-2"
        assert len(calls) == 2

    @pytest.mark.asyncio
    async def test_skip_cache_option(self):
        """skip_cache=True forces execution and leaves the cached value alone."""
        calls = []

        @request_cache(ttl=10)
        async def cached_function():
            calls.append(None)
            return len(calls)

        # Normal call - result gets cached
        assert (await cached_function()) == 1

        # Bypass the cache - the body runs a second time
        assert (await cached_function(skip_cache=True)) == 2

        # Normal call again - the originally cached value is still returned
        assert (await cached_function()) == 1

    @pytest.mark.asyncio
    async def test_none_values_not_cached(self):
        """A None result is never stored, so the function runs every time."""
        calls = []

        @request_cache(ttl=10)
        async def sometimes_none_function(return_none=False):
            calls.append(return_none)
            return None if return_none else len(calls)

        # None result - must not be cached
        assert (await sometimes_none_function(return_none=True)) is None
        assert len(calls) == 1

        # Same call again - body executes again
        assert (await sometimes_none_function(return_none=True)) is None
        assert len(calls) == 2

        # Real value - gets cached
        assert (await sometimes_none_function(return_none=False)) == 3
        assert len(calls) == 3

        # Same call again - served from cache
        assert (await sometimes_none_function(return_none=False)) == 3
        assert len(calls) == 3
135 |
```
--------------------------------------------------------------------------------
/docs/blog/ai-assisted-clinical-trial-search-analysis.md:
--------------------------------------------------------------------------------
```markdown
1 | # AI-Assisted Clinical Trial Search: How BioMCP Transforms Research
2 |
3 | Finding the right clinical trial for a research project has traditionally been
4 | a complex process requiring specialized knowledge of database syntax and
5 | medical terminology. BioMCP is changing this landscape by making clinical trial
6 | data accessible through natural language conversation.
7 |
8 | Video Link:
9 | [Watch the demo on YouTube](https://www.youtube.com/watch?v=jqGXXnVesjg&list=PLu1amIF_MEfPWhhEsXSuBi90S_xtmVJIW&index=2)
10 |
11 | ## Breaking Down the Barriers to Clinical Trial Information
12 |
13 | BioMCP serves as a specialized Model Context Protocol (MCP) server that
14 | empowers AI assistants and agents with tools to interact with critical
15 | biomedical resources. For clinical trials specifically, BioMCP connects to the
16 | ClinicalTrials.gov API, allowing researchers and clinicians to search and
17 | retrieve trial information through simple conversational queries.
18 |
19 | The power of this approach becomes apparent when we look at how it transforms a
20 | complex search requirement. Imagine needing to find active clinical trials for
21 | pembrolizumab (a cancer immunotherapy drug) specifically for non-small cell
22 | lung carcinoma near Cleveland, Ohio. Traditionally, this would require:
23 |
24 | 1. Navigating to ClinicalTrials.gov
25 | 2. Understanding the proper search fields and syntax
26 | 3. Creating multiple filters for intervention (pembrolizumab), condition
27 | (non-small cell lung carcinoma), status (recruiting), and location
28 | (Cleveland area)
29 | 4. Interpreting the results
30 |
31 | ## From Natural Language to Precise Database Queries
32 |
33 | With BioMCP, this entire process is streamlined into a simple natural language
34 | request. The underlying large language model (LLM) interprets the query,
35 | identifies the key entities (drug name, cancer type, location), and translates
36 | these into the precise parameters needed for the ClinicalTrials.gov API.
37 |
38 | The system returns relevant trials that match all criteria, presenting them in
39 | an easy-to-understand format. But the interaction doesn't end there—BioMCP
40 | maintains context throughout the conversation, enabling follow-up questions
41 | like:
42 |
43 | - Where exactly are these trials located and how far are they from downtown
44 | Cleveland?
45 | - What biomarker eligibility criteria do these trials require?
46 | - Are there exclusion criteria I should be aware of?
47 |
48 | For each of these questions, BioMCP calls the appropriate tool (trial
49 | locations, trial protocols) and processes the information to provide meaningful
50 | answers without requiring the user to navigate different interfaces or learn
51 | new query languages.
52 |
53 | ## Beyond Basic Search: Understanding Trial Details
54 |
55 | What truly sets BioMCP apart is its ability to go beyond simple listings. When
56 | asked about biomarker eligibility criteria, the system can extract this
57 | information from the full trial protocol, synthesize it, and present a clear
58 | summary of requirements. This capability transforms what would typically be
59 | hours of reading dense clinical documentation into a conversational exchange
60 | that delivers precisely what the researcher needs.
61 |
62 | ## Transforming Clinical Research Workflows
63 |
64 | The implications for clinical research are significant. By lowering the
65 | technical barriers to accessing trial information, BioMCP can help:
66 |
67 | - Researchers understand the landscape of current research in their field
68 | - Research teams identify promising studies more efficiently
69 | - Clinical research organizations track competing or complementary trials
70 | - Research coordinators identify potential recruitment sites based on location
71 |
72 | As part of the broader BioMCP ecosystem—which also includes access to genomic
73 | variant information and PubMed literature—this clinical trial search capability
74 | represents a fundamental shift in how we interact with biomedical information.
75 | By bringing the power of natural language processing to specialized databases,
76 | BioMCP is helping to democratize access to critical health information and
77 | accelerate the research process.
78 |
```
--------------------------------------------------------------------------------
/src/biomcp/utils/query_utils.py:
--------------------------------------------------------------------------------
```python
1 | """Utilities for query parsing and manipulation."""
2 |
3 | import re
4 | from typing import Any
5 |
6 |
def parse_or_query(query: str) -> list[str]:
    """Break an OR-joined query into its cleaned search terms.

    The operator is matched case-insensitively and must be surrounded by
    whitespace, so all of these are accepted:
        - "term1 OR term2"
        - 'term1 OR term2 OR "term with spaces"'
        - "TERM1 or term2 or term3"

    Args:
        query: Query string that may contain OR operators

    Returns:
        The terms in their original order, with surrounding whitespace and
        surrounding single/double quotes removed; empty terms are dropped

    Examples:
        >>> parse_or_query("PD-L1 OR CD274")
        ['PD-L1', 'CD274']

        >>> parse_or_query('BRAF OR "v-raf murine" OR ARAF')
        ['BRAF', 'v-raf murine', 'ARAF']
    """
    raw_terms = re.split(r"\s+OR\s+", query, flags=re.IGNORECASE)

    # Peel off whitespace, then double quotes, then single quotes, then any
    # whitespace that was sitting inside the quotes.
    unquoted = (
        raw.strip().strip('"').strip("'").strip() for raw in raw_terms
    )
    return [term for term in unquoted if term]
40 |
41 |
def contains_or_operator(query: str) -> bool:
    """Check if a query contains OR operators.

    Detection uses the same pattern that ``parse_or_query`` splits on
    (``OR`` in any letter case, with whitespace on both sides), so every
    query that can be split into several terms is also reported here.

    Args:
        query: Query string to check

    Returns:
        True if the query contains a whitespace-delimited OR operator
        (e.g. " OR ", " or ", " Or "), False otherwise
    """
    # Whitespace is required on both sides so words such as "ORANGE" or
    # "for" are not mistaken for the operator.
    return re.search(r"\s+OR\s+", query, flags=re.IGNORECASE) is not None
52 |
53 |
async def search_with_or_support(
    query: str,
    search_func: Any,
    search_params: dict[str, Any],
    id_field: str = "id",
    fallback_id_field: str | None = None,
) -> dict[str, Any]:
    """Run one search per OR term and merge the de-duplicated results.

    Args:
        query: Query string that may contain OR operators
        search_func: Async search function; it is awaited once per term with
            ``search_params`` plus the term passed as ``name``
        search_params: Base parameters to pass to search function (excluding the query term)
        id_field: Primary field name for deduplication (default: "id")
        fallback_id_field: Alternative field name if primary is missing

    Returns:
        Dict with the merged ``items``, their count as ``total``, the
        ``search_terms`` that were used, and ``total_found_across_terms``
        (the sum of each response's ``total``)
    """
    search_terms = (
        parse_or_query(query) if contains_or_operator(query) else [query]
    )

    # Keys under which a response may carry its result list, in lookup order.
    candidate_keys = (
        "biomarkers",
        "organizations",
        "interventions",
        "diseases",
        "data",
        "items",
    )

    merged: dict[Any, Any] = {}
    total_found = 0

    for term in search_terms:
        try:
            response = await search_func(**{**search_params, "name": term})

            items_key = next(
                (name for name in candidate_keys if name in response), None
            )
            if not items_key:
                # No recognised result list in this response.
                continue

            for entry in response.get(items_key, []):
                entry_id = entry.get(id_field)
                if not entry_id and fallback_id_field:
                    entry_id = entry.get(fallback_id_field)

                # Entries without a usable ID cannot be de-duplicated and are
                # left out; the first entry seen for an ID wins.
                if entry_id and entry_id not in merged:
                    merged[entry_id] = entry

            total_found += response.get("total", 0)

        except Exception as e:
            # Best effort: a failing term is logged and the rest still run.
            import logging

            logger = logging.getLogger(__name__)
            logger.warning(f"Failed to search for term '{term}': {e}")
            continue

    unique_items = list(merged.values())

    return {
        "items": unique_items,
        "total": len(unique_items),
        "search_terms": search_terms,
        "total_found_across_terms": total_found,
    }
137 |
```
--------------------------------------------------------------------------------
/tests/tdd/test_endpoint_documentation.py:
--------------------------------------------------------------------------------
```python
1 | """Test that endpoint documentation is kept up to date."""
2 |
3 | import subprocess
4 | import sys
5 | from pathlib import Path
6 |
7 |
class TestEndpointDocumentation:
    """Checks that THIRD_PARTY_ENDPOINTS.md agrees with the endpoint registry."""

    @staticmethod
    def _project_root() -> Path:
        """Return the directory three ``.parent`` steps up from this file."""
        return Path(__file__).parent.parent.parent

    def test_third_party_endpoints_file_exists(self):
        """The generated endpoints document must be present."""
        endpoints_file = self._project_root() / "THIRD_PARTY_ENDPOINTS.md"
        assert endpoints_file.exists(), "THIRD_PARTY_ENDPOINTS.md must exist"

    def test_endpoints_documentation_is_current(self):
        """The generation script must run to completion without errors."""
        script_path = (
            self._project_root() / "scripts" / "generate_endpoints_doc.py"
        )
        command = [sys.executable, str(script_path)]
        result = subprocess.run(  # noqa: S603
            command,
            capture_output=True,
            text=True,
            check=False,
        )

        assert result.returncode == 0, f"Script failed: {result.stderr}"

        # Either the script is silent or it reports that it generated the file.
        stdout = result.stdout
        assert (
            stdout == "" or "Generated" in stdout
        ), f"Unexpected output: {result.stdout}"

    def test_all_endpoints_documented(self):
        """Every registry key and its URL must appear in the document."""
        from biomcp.utils.endpoint_registry import get_registry

        endpoints = get_registry().get_all_endpoints()

        endpoints_file = self._project_root() / "THIRD_PARTY_ENDPOINTS.md"
        content = endpoints_file.read_text()

        for key, info in endpoints.items():
            assert key in content, f"Endpoint {key} not found in documentation"
            assert (
                info.url in content
            ), f"URL {info.url} not found in documentation"

    def test_documentation_contains_required_sections(self):
        """The document must contain each expected heading and keyword."""
        endpoints_file = self._project_root() / "THIRD_PARTY_ENDPOINTS.md"
        content = endpoints_file.read_text()

        required_sections = (
            "# Third-Party Endpoints Used by BioMCP",
            "## Overview",
            "## Endpoints by Category",
            "### Biomedical Literature",
            "### Clinical Trials",
            "### Variant Databases",
            "### Cancer Genomics",
            "## Domain Summary",
            "## Compliance and Privacy",
            "## Network Control",
            "BIOMCP_OFFLINE",
        )

        for section in required_sections:
            assert (
                section in content
            ), f"Required section '{section}' not found in documentation"

    def test_endpoint_counts_accurate(self):
        """The counts quoted in the overview must equal the registry sizes."""
        from biomcp.utils.endpoint_registry import get_registry

        registry = get_registry()
        endpoints = registry.get_all_endpoints()
        domains = registry.get_unique_domains()

        endpoints_file = self._project_root() / "THIRD_PARTY_ENDPOINTS.md"
        content = endpoints_file.read_text()

        import re

        # Group 1 is the domain count, group 2 the endpoint count.
        match = re.search(
            r"BioMCP connects to (\d+) external domains across (\d+) endpoints",
            content,
        )

        assert match, "Could not find endpoint counts in overview"

        doc_domains, doc_endpoints = (int(text) for text in match.groups())

        assert (
            doc_domains == len(domains)
        ), f"Document says {doc_domains} domains but registry has {len(domains)}"
        assert (
            doc_endpoints == len(endpoints)
        ), f"Document says {doc_endpoints} endpoints but registry has {len(endpoints)}"
118 |
```
--------------------------------------------------------------------------------
/src/biomcp/cli/organizations.py:
--------------------------------------------------------------------------------
```python
1 | """CLI commands for organization search and lookup."""
2 |
3 | import asyncio
4 | from typing import Annotated
5 |
6 | import typer
7 |
8 | from ..integrations.cts_api import CTSAPIError, get_api_key_instructions
9 | from ..organizations import get_organization, search_organizations
10 | from ..organizations.getter import format_organization_details
11 | from ..organizations.search import format_organization_results
12 |
# Typer sub-application holding the organization commands registered below
# ("search" and "get"). no_args_is_help makes a bare invocation show help.
organization_app = typer.Typer(
    no_args_is_help=True,
    help="Search and retrieve organization information from NCI CTS API",
)
17 |
18 |
@organization_app.command("search")
def search_organizations_cli(
    name: Annotated[
        str | None,
        typer.Argument(
            help="Organization name to search for (partial match supported)"
        ),
    ] = None,
    org_type: Annotated[
        str | None,
        typer.Option(
            "--type",
            help="Type of organization (e.g., industry, academic)",
        ),
    ] = None,
    city: Annotated[
        str | None,
        typer.Option(
            "--city",
            help="City location",
        ),
    ] = None,
    state: Annotated[
        str | None,
        typer.Option(
            "--state",
            help="State location (2-letter code)",
        ),
    ] = None,
    page_size: Annotated[
        int,
        typer.Option(
            "--page-size",
            help="Number of results per page",
            min=1,
            max=100,
        ),
    ] = 20,
    page: Annotated[
        int,
        typer.Option(
            "--page",
            help="Page number",
            min=1,
        ),
    ] = 1,
    api_key: Annotated[
        str | None,
        typer.Option(
            "--api-key",
            help="NCI API key (overrides NCI_API_KEY env var)",
            envvar="NCI_API_KEY",
        ),
    ] = None,
) -> None:
    """
    Search for organizations in the NCI Clinical Trials database.

    Examples:
        # Search by name
        biomcp organization search "MD Anderson"

        # Search by type
        biomcp organization search --type academic

        # Search by location
        biomcp organization search --city Boston --state MA

        # Combine filters
        biomcp organization search Cancer --type industry --state CA
    """
    # The docstring above doubles as the command's help text, so it is kept
    # verbatim; implementation notes live in comments instead.
    try:
        pending_search = search_organizations(
            name=name,
            org_type=org_type,
            city=city,
            state=state,
            page_size=page_size,
            page=page,
            api_key=api_key,
        )
        results = asyncio.run(pending_search)
        typer.echo(format_organization_results(results))

    except CTSAPIError as e:
        # A missing key gets setup instructions; any other API error is
        # reported on stderr. Both paths exit with status 1.
        missing_key = "API key required" in str(e)
        if missing_key:
            typer.echo(get_api_key_instructions())
        else:
            typer.echo(f"Error: {e}", err=True)
        raise typer.Exit(1) from e
    except Exception as e:
        typer.echo(f"Unexpected error: {e}", err=True)
        raise typer.Exit(1) from e
116 |
@organization_app.command("get")
def get_organization_cli(
    org_id: Annotated[
        str,
        typer.Argument(help="Organization ID"),
    ],
    api_key: Annotated[
        str | None,
        typer.Option(
            "--api-key",
            help="NCI API key (overrides NCI_API_KEY env var)",
            envvar="NCI_API_KEY",
        ),
    ] = None,
) -> None:
    """
    Get detailed information about a specific organization.

    Example:
        biomcp organization get ORG123456
    """
    # The docstring above doubles as the command's help text, so it is kept
    # verbatim; implementation notes live in comments instead.
    try:
        pending_lookup = get_organization(org_id=org_id, api_key=api_key)
        org_data = asyncio.run(pending_lookup)
        typer.echo(format_organization_details(org_data))

    except CTSAPIError as e:
        # A missing key gets setup instructions; any other API error is
        # reported on stderr. Both paths exit with status 1.
        missing_key = "API key required" in str(e)
        if missing_key:
            typer.echo(get_api_key_instructions())
        else:
            typer.echo(f"Error: {e}", err=True)
        raise typer.Exit(1) from e
    except Exception as e:
        typer.echo(f"Unexpected error: {e}", err=True)
        raise typer.Exit(1) from e
158 |
```
--------------------------------------------------------------------------------
/tests/bdd/search_variants/test_search.py:
--------------------------------------------------------------------------------
```python
1 | import json
2 | import shlex
3 | from typing import Any
4 |
5 | from assertpy import assert_that
6 | from pytest_bdd import parsers, scenarios, then, when
7 | from typer.testing import CliRunner
8 |
9 | from biomcp.cli import app
10 |
# Bind the scenarios declared in search.feature to the steps in this module.
scenarios("search.feature")

# Shared CLI runner used by the @when step to invoke the biomcp app.
runner = CliRunner()

# Field mapping - Updated chromosome key
# Each entry maps a short field name to the list of nested keys that
# get_value() walks, one level per key, to reach that field's value.
FIELD_MAP = {
    "chromosome": ["chrom"],
    "frequency": ["gnomad_exome", "af", "af"],
    "gene": ["dbnsfp", "genename"],
    "hgvsc": ["dbnsfp", "hgvsc"],
    "hgvsp": ["dbnsfp", "hgvsp"],
    "cadd": ["cadd", "phred"],
    "polyphen": ["dbnsfp", "polyphen2", "hdiv", "pred"],
    "position": ["vcf", "position"],
    "rsid": ["dbsnp", "rsid"],
    "sift": ["dbnsfp", "sift", "pred"],
    "significance": ["clinvar", "rcv", "clinical_significance"],
    "uniprot_id": ["mutdb", "uniprot_id"],
}
30 |
31 |
def get_value(data: dict, key: str) -> Any | None:
    """Extract a value from ``data["hits"]`` by following a key path.

    The path is ``FIELD_MAP[key]`` when the field is mapped, otherwise the
    one-element path ``[key]``. At each step a dict is indexed with the
    current path key, and a list is entered through its first element.

    Args:
        data: Response dictionary; the walk starts at its ``"hits"`` entry.
        key: Field name to look up.

    Returns:
        The value at the end of the path (the first element when that value
        is a non-empty list), or None when the path cannot be followed,
        including when an empty list is met along the way.
    """
    key_path = FIELD_MAP.get(key, [key])
    current_value = data.get("hits")

    # A distinct loop name keeps the ``key`` argument intact.
    for path_key in key_path:
        if isinstance(current_value, dict):
            current_value = current_value.get(path_key)
        elif isinstance(current_value, list):
            if not current_value:
                # Nothing to descend into; previously this raised IndexError.
                return None
            current_value = current_value[0].get(path_key)

    if current_value and isinstance(current_value, list):
        return current_value[0]
    return current_value
44 |
45 |
46 | # --- @when Step ---
@when(
    parsers.re(r'I run "(?P<command>.*?)"(?: #.*)?$'),
    target_fixture="variants_data",
)
def variants_data(command) -> dict:
    """Invoke the quoted CLI command in JSON mode and return the parsed output.

    The leading program name is dropped, ``--json`` is always appended, and
    ``--size 10`` is appended when the command does not set ``--size`` itself.
    """
    # Everything after the first token ('biomcp') is passed to the app.
    cli_args = [*shlex.split(command)[1:], "--json"]
    if "--size" not in cli_args:
        cli_args += ["--size", "10"]

    result = runner.invoke(app, cli_args, catch_exceptions=False)
    assert result.exit_code == 0, "CLI command failed"
    return json.loads(result.stdout)
62 |
63 |
def normalize(v):
    """Coerce a value into a comparable form for step assertions.

    Numeric input (numbers and numeric strings) becomes a ``float``. Text
    that is not numeric is lower-cased. Anything else, such as ``None`` or
    a list, is returned unchanged instead of raising.
    """
    try:
        return float(v)
    except (TypeError, ValueError):
        # TypeError covers non-string, non-numeric input such as None.
        # No int() fallback is needed: base-10 text that int() accepts is
        # also accepted by float(), so that branch could never succeed.
        if isinstance(v, (str, bytes)):
            return v.lower()
        return v
72 |
73 |
@then(
    parsers.re(
        r"each variant should have (?P<field>\w+) that (?P<operator>(?:is|equal|to|contains|greater|less|than|or|\s)+)\s+(?P<expected>.+)$"
    )
)
def check_variant_field(it, variants_data, field, operator, expected):
    """
    Assert that every variant satisfies an assertpy check on one field.

    The operator text from the step (e.g. "is equal to") is turned into the
    assertpy method name ("is_equal_to"). A variant passes as soon as any
    one of the values yielded for it satisfies the check.
    """
    operator = operator.strip().lower().replace(" ", "_")
    passed = set()
    failed = set()

    for v_num, value in it(FIELD_MAP, variants_data, field):
        value = normalize(value)
        expected = normalize(expected)
        check = getattr(assert_that(value), operator)
        try:
            check(expected)
        except AssertionError:
            failed.add(v_num)
        else:
            passed.add(v_num)

    # A variant that passed on any value is not counted as a failure.
    failed -= passed
    assert len(failed) == 0, f"Failure: {field} {operator} {expected}"
100 |
101 |
@then(
    parsers.re(
        r"the number of variants (?P<operator>(?:is|equal|to|contains|greater|less|than|or|\s)+)\s+(?P<expected>\d+)$"
    )
)
def number_of_variants_check(variants_data, operator, expected):
    """Compare the number of returned variants with the expected count.

    The count is taken from ``variants`` or ``hits`` when the payload is a
    dict, and from the list length when it is a list. A one-element list
    whose element contains "error" counts as zero variants.
    """
    if isinstance(variants_data, dict):
        if "variants" in variants_data:
            # New format with cBioPortal summary
            count = len(variants_data["variants"])
        elif "hits" in variants_data:
            # myvariant.info response format
            count = len(variants_data["hits"])
        else:
            count = 0
    elif isinstance(variants_data, list):
        is_error_response = (
            len(variants_data) == 1 and "error" in variants_data[0]
        )
        count = 0 if is_error_response else len(variants_data)
    else:
        count = 0

    method_name = operator.strip().lower().replace(" ", "_")
    check = getattr(assert_that(count), method_name)
    check(int(expected))
126 |
```
--------------------------------------------------------------------------------
/src/biomcp/cli/diseases.py:
--------------------------------------------------------------------------------
```python
1 | """CLI commands for disease information and search."""
2 |
3 | import asyncio
4 | from typing import Annotated
5 |
6 | import typer
7 |
8 | from ..diseases import get_disease
9 | from ..diseases.search import format_disease_results, search_diseases
10 | from ..integrations.cts_api import CTSAPIError, get_api_key_instructions
11 |
# Typer sub-application holding the disease commands registered below
# ("get" and "search"). no_args_is_help makes a bare invocation show help.
disease_app = typer.Typer(
    no_args_is_help=True,
    help="Search and retrieve disease information",
)
16 |
17 |
@disease_app.command("get")
def get_disease_cli(
    disease_name: Annotated[
        str,
        typer.Argument(help="Disease name or identifier"),
    ],
) -> None:
    """
    Get disease information from MyDisease.info.

    This returns detailed information including synonyms, definitions,
    and database cross-references.

    Examples:
        biomcp disease get melanoma
        biomcp disease get "lung cancer"
        biomcp disease get GIST
    """
    # The docstring above doubles as the command's help text, so it is kept
    # verbatim. Run the async lookup to completion and print what it returns.
    typer.echo(asyncio.run(get_disease(disease_name)))
38 |
39 |
@disease_app.command("search")
def search_diseases_cli(
    name: Annotated[
        str | None,
        typer.Argument(
            help="Disease name to search for (partial match supported)"
        ),
    ] = None,
    include_synonyms: Annotated[
        bool,
        typer.Option(
            "--synonyms/--no-synonyms",
            help="[Deprecated] This option is ignored - API always searches synonyms",
        ),
    ] = True,
    category: Annotated[
        str | None,
        typer.Option(
            "--category",
            help="Disease category/type filter",
        ),
    ] = None,
    page_size: Annotated[
        int,
        typer.Option(
            "--page-size",
            help="Number of results per page",
            min=1,
            max=100,
        ),
    ] = 20,
    page: Annotated[
        int,
        typer.Option(
            "--page",
            help="Page number",
            min=1,
        ),
    ] = 1,
    api_key: Annotated[
        str | None,
        typer.Option(
            "--api-key",
            help="NCI API key (overrides NCI_API_KEY env var)",
            envvar="NCI_API_KEY",
        ),
    ] = None,
    source: Annotated[
        str,
        typer.Option(
            "--source",
            help="Data source: 'mydisease' (default) or 'nci'",
            show_choices=True,
        ),
    ] = "mydisease",
) -> None:
    """
    Search for diseases in MyDisease.info or NCI CTS database.

    The NCI source provides controlled vocabulary of cancer conditions
    used in clinical trials, with official terms and synonyms.

    Examples:
        # Search MyDisease.info (default)
        biomcp disease search melanoma

        # Search NCI cancer terms
        biomcp disease search melanoma --source nci

        # Search without synonyms
        biomcp disease search "breast cancer" --no-synonyms --source nci

        # Filter by category
        biomcp disease search --category neoplasm --source nci
    """
    # The docstring above doubles as the command's help text, so it is kept
    # verbatim; implementation notes live in comments instead.

    # Compare the source name case-insensitively so that "--source NCI" is
    # not silently routed to the MyDisease.info branch.
    selected_source = source.strip().lower()

    if selected_source == "nci":
        # Use NCI CTS API
        try:
            results = asyncio.run(
                search_diseases(
                    name=name,
                    include_synonyms=include_synonyms,
                    category=category,
                    page_size=page_size,
                    page=page,
                    api_key=api_key,
                )
            )

            output = format_disease_results(results)
            typer.echo(output)

        except CTSAPIError as e:
            # A missing key gets setup instructions; any other API error is
            # reported on stderr. Both paths exit with status 1.
            if "API key required" in str(e):
                typer.echo(get_api_key_instructions())
            else:
                typer.echo(f"Error: {e}", err=True)
            raise typer.Exit(1) from e
        except Exception as e:
            typer.echo(f"Unexpected error: {e}", err=True)
            raise typer.Exit(1) from e
    else:
        # Default to MyDisease.info
        # For now, just search by name; the filter and paging options are
        # not used on this branch.
        if name:
            result = asyncio.run(get_disease(name))
            typer.echo(result)
        else:
            # Sent to stderr like the other error messages of this command.
            typer.echo(
                "Please provide a disease name to search for.", err=True
            )
            raise typer.Exit(1)
150 |
```
--------------------------------------------------------------------------------
/tests/tdd/test_mcp_tools.py:
--------------------------------------------------------------------------------
```python
1 | """Tests for MCP tool wrappers."""
2 |
3 | import json
4 | from unittest.mock import patch
5 |
6 | import pytest
7 |
8 | from biomcp.articles.search import _article_searcher
9 |
10 |
class TestArticleSearcherMCPTool:
    """Check which keyword arguments ``_article_searcher`` forwards.

    Every test replaces the optimized searcher with a mock and inspects the
    keyword arguments of the call it received.
    """

    # Dotted path of the function that each test replaces with a mock.
    _PATCH_TARGET = "biomcp.articles.search_optimized.article_searcher_optimized"

    @pytest.mark.asyncio
    async def test_article_searcher_with_all_params(self):
        """Comma-separated string filters are forwarded unchanged."""
        mock_results = [{"title": "Test Article", "pmid": 12345}]
        text_args = {
            "call_benefit": "Testing search functionality",
            "chemicals": "aspirin,ibuprofen",
            "diseases": "cancer,diabetes",
            "genes": "BRAF,TP53",
            "keywords": "mutation,therapy",
            "variants": "V600E,R175H",
        }

        with patch(self._PATCH_TARGET) as mock_search:
            mock_search.return_value = json.dumps(mock_results)

            await _article_searcher(**text_args, include_preprints=True)

            # Exactly one call reached the patched searcher.
            mock_search.assert_called_once()

            forwarded = mock_search.call_args.kwargs
            for param, sent in text_args.items():
                assert forwarded[param] == sent
            assert forwarded["include_preprints"] is True
            assert forwarded.get("include_cbioportal", True) is True

    @pytest.mark.asyncio
    async def test_article_searcher_with_lists(self):
        """List-valued filters are forwarded as the same lists."""
        list_args = {
            "call_benefit": "Testing with lists",
            "chemicals": ["drug1", "drug2"],
            "diseases": ["disease1"],
            "genes": ["GENE1"],
        }

        with patch(self._PATCH_TARGET) as mock_search:
            mock_search.return_value = "## Results"

            await _article_searcher(**list_args, include_preprints=False)

            forwarded = mock_search.call_args.kwargs
            for param, sent in list_args.items():
                assert forwarded[param] == sent
            assert forwarded["include_preprints"] is False

    @pytest.mark.asyncio
    async def test_article_searcher_minimal_params(self):
        """With only ``call_benefit`` given, every filter arrives as None or is absent."""
        with patch(self._PATCH_TARGET) as mock_search:
            mock_search.return_value = "## No results"

            await _article_searcher(call_benefit="Minimal test")

            forwarded = mock_search.call_args.kwargs
            assert forwarded["call_benefit"] == "Minimal test"
            for param in (
                "chemicals",
                "diseases",
                "genes",
                "keywords",
                "variants",
            ):
                assert forwarded.get(param) is None

    @pytest.mark.asyncio
    async def test_article_searcher_empty_strings(self):
        """Empty-string filters are passed through as empty strings."""
        empty_args = {
            "call_benefit": "Empty string test",
            "chemicals": "",
            "diseases": "",
            "genes": "",
        }

        with patch(self._PATCH_TARGET) as mock_search:
            mock_search.return_value = "## Results"

            await _article_searcher(**empty_args)

            forwarded = mock_search.call_args.kwargs
            for param, sent in empty_args.items():
                assert forwarded[param] == sent
112 |
```
--------------------------------------------------------------------------------
/docs/tutorials/remote-connection.md:
--------------------------------------------------------------------------------
```markdown
1 | # Connecting to Remote BioMCP
2 |
3 | This guide walks you through connecting Claude to the remote BioMCP server, providing instant access to biomedical research tools without any local installation.
4 |
5 | ## Overview
6 |
7 | The remote BioMCP server (https://remote.biomcp.org/mcp) provides cloud-hosted access to all BioMCP tools. This eliminates the need for local installation while maintaining full functionality.
8 |
9 | !!! success "Benefits of Remote Connection" - **No Installation Required**: Start using BioMCP immediately - **Always Up-to-Date**: Automatically receive the latest features and improvements - **Cloud-Powered**: Leverage server-side resources for faster searches - **Secure Authentication**: Uses Google OAuth for secure access
10 |
11 | !!! info "Privacy Notice"
12 | We log user emails and queries to improve the service. All data is handled according to our privacy policy.
13 |
14 | ## Step-by-Step Setup
15 |
16 | ### Step 1: Access Custom Connectors
17 |
18 | Navigate to the **Custom Connectors** section in your Claude interface. This is where you'll configure the connection to BioMCP.
19 |
20 | 
21 |
22 | ### Step 2: Add Custom Connector
23 |
24 | Click the **Add Custom Connector** button and enter the following details:
25 |
26 | - **Name**: BioMCP
27 | - **URL**: `https://remote.biomcp.org/mcp`
28 |
29 | 
30 |
31 | ### Step 3: Verify Connector is Enabled
32 |
33 | After adding, you should see BioMCP listed with an "Enabled" status. This confirms the connector was added successfully.
34 |
35 | 
36 |
37 | ### Step 4: Connect to BioMCP
38 |
39 | Return to the main Connectors section where you'll now see BioMCP available for connection. Click the **Connect** button.
40 |
41 | 
42 |
43 | ### Step 5: Authenticate with Google
44 |
45 | You'll be redirected to Google OAuth for authentication. Sign in with any valid Google account. This step ensures secure access to the service.
46 |
47 | 
48 |
49 | !!! note "Authentication" - Any valid Google account works - Your email is logged for service improvement - Authentication is handled securely through Google OAuth
50 |
51 | ### Step 6: Connection Success
52 |
53 | Once authenticated, you'll see a successful connection message displaying the available tool count. As of January 2025, there are 23 tools available (this number may increase as new features are added).
54 |
55 | 
56 |
57 | ## Verifying Your Connection
58 |
59 | After successful connection, you can verify BioMCP is working by asking Claude:
60 |
61 | ```
62 | What tools do you have available from BioMCP?
63 | ```
64 |
65 | Claude should list the available tools including:
66 |
67 | - Article search and retrieval (PubMed/PubTator3)
68 | - Clinical trials search (ClinicalTrials.gov and NCI)
69 | - Variant analysis (MyVariant.info)
70 | - Gene, drug, and disease information
71 | - Sequential thinking for complex research
72 |
73 | ## Troubleshooting
74 |
75 | ### Connection Failed
76 |
77 | - Ensure you entered the URL exactly as shown: `https://remote.biomcp.org/mcp`
78 | - Check your internet connection
79 | - Try disconnecting and reconnecting
80 |
81 | ### Authentication Issues
82 |
83 | - Make sure you're using a valid Google account
84 | - Clear your browser cache if authentication hangs
85 | - Try using a different browser if issues persist
86 |
87 | ### Tools Not Available
88 |
89 | - Disconnect and reconnect to BioMCP
90 | - Refresh your Claude session
91 | - Contact support if tools remain unavailable
92 |
93 | ## Next Steps
94 |
95 | Now that you're connected to BioMCP, you can:
96 |
97 | 1. **Search biomedical literature**: "Find recent papers on BRAF mutations in melanoma"
98 | 2. **Analyze clinical trials**: "What trials are recruiting for lung cancer with EGFR mutations?"
99 | 3. **Interpret variants**: "What is the clinical significance of TP53 p.R273H?"
100 | 4. **Explore drug information**: "Tell me about pembrolizumab's mechanism and indications"
101 |
102 | ## Support
103 |
104 | For issues or questions about the remote BioMCP connection:
105 |
106 | - GitHub Issues: [https://github.com/genomoncology/biomcp/issues](https://github.com/genomoncology/biomcp/issues)
107 | - Documentation: [https://biomcp.org](https://biomcp.org)
108 |
```
--------------------------------------------------------------------------------
/tests/config/test_smithery_config.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python
2 | """
3 | Test script to validate Smithery configuration against actual function implementations.
4 | This script checks that the schema definitions in smithery.yaml match the expected
5 | function parameters in your codebase.
6 | """
7 |
8 | import os
9 | from typing import Any
10 |
11 | import pytest
12 | import yaml
13 | from pydantic import BaseModel
14 |
15 | from biomcp.articles.search import PubmedRequest
16 |
17 | # Import the functions we want to test
18 | from biomcp.trials.search import TrialQuery
19 | from biomcp.variants.search import VariantQuery
20 |
21 |
@pytest.fixture
def smithery_config():
    """Parse ``smithery.yaml`` and return its contents.

    The file is looked up two directory levels above the directory that
    contains this test module.
    """
    tests_dir = os.path.dirname(__file__)
    root_dir = os.path.abspath(os.path.join(tests_dir, "../.."))
    yaml_path = os.path.join(root_dir, "smithery.yaml")

    with open(yaml_path) as handle:
        parsed = yaml.safe_load(handle)
    return parsed
33 |
34 |
def test_smithery_config(smithery_config):
    """Check every known tool schema in smithery.yaml against its expected parameter."""
    # (tool name, parameter name, expected parameter type)
    expectations = [
        ("trial_searcher", "query", TrialQuery),
        ("variant_searcher", "query", VariantQuery),
        ("article_searcher", "query", PubmedRequest),
        ("trial_protocol", "nct_id", str),
        ("trial_locations", "nct_id", str),
        ("trial_outcomes", "nct_id", str),
        ("trial_references", "nct_id", str),
        ("article_details", "pmid", str),
        ("variant_details", "variant_id", str),
    ]

    for tool, parameter, parameter_type in expectations:
        validate_tool_schema(
            smithery_config,
            tool,
            {"param_name": parameter, "expected_type": parameter_type},
        )
58 |
59 |
def validate_tool_schema(
    smithery_config, tool_name: str, param_info: dict[str, Any]
):
    """Assert that one tool's schema in smithery.yaml matches its expected parameter.

    Args:
        smithery_config: Parsed smithery.yaml contents (a mapping).
        tool_name: Key of the tool under the top-level ``tools`` mapping.
        param_info: Mapping with ``param_name`` (the parameter that must be
            listed as required) and ``expected_type`` (``str`` for simple
            parameters, or a pydantic ``BaseModel`` subclass).

    Raises:
        AssertionError: If the tool, its input schema, the required marker,
            or (for model-typed parameters) the ``$ref`` to the model's
            schema is missing or incorrect.
    """
    param_name = param_info["param_name"]
    expected_type = param_info["expected_type"]

    # ``or {}`` keeps an explicit ``tools: null`` from raising TypeError
    # instead of producing the assertion message below.
    tools = smithery_config.get("tools") or {}
    assert (
        tool_name in tools
    ), f"Tool '{tool_name}' is not defined in smithery.yaml"

    tool_config = tools[tool_name]
    assert (
        "input" in tool_config
    ), f"Tool '{tool_name}' does not have an input schema defined"

    input_schema = tool_config["input"].get("schema", {})

    # The required check is the same for simple and model-typed parameters.
    assert (
        "required" in input_schema
    ), f"Tool '{tool_name}' does not have required parameters specified"
    assert (
        param_name in (input_schema.get("required") or [])
    ), f"Parameter '{param_name}' for tool '{tool_name}' is not marked as required"

    # Only pydantic models need the additional $ref check. The isinstance
    # guard avoids a TypeError from issubclass() on non-class type hints.
    is_model = isinstance(expected_type, type) and issubclass(
        expected_type, BaseModel
    )
    if not is_model:
        return

    properties = input_schema.get("properties", {})
    assert (
        param_name in properties
    ), f"Tool '{tool_name}' does not have a '{param_name}' property defined"

    param_prop = properties[param_name]
    assert (
        "$ref" in param_prop
    ), f"Tool '{tool_name}' {param_name} property does not reference a schema"

    schema_ref = param_prop["$ref"]
    expected_schema_name = expected_type.__name__
    assert schema_ref.endswith(
        expected_schema_name
    ), f"Tool '{tool_name}' references incorrect schema: {schema_ref}, expected: {expected_schema_name}"
115 |
```
--------------------------------------------------------------------------------
/scripts/check_http_imports.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python3
2 | """Check for direct HTTP library imports outside of allowed files."""
3 |
4 | import ast
5 | import sys
6 | from pathlib import Path
7 |
# Top-level package names whose direct import is reported as a violation.
# The stdlib ``urllib`` is intentionally absent: it is allowed for URL parsing.
HTTP_LIBRARIES = {"httpx", "aiohttp", "requests", "urllib3"}

# File names (compared against the basename only) that may import the
# libraries above without being reported.
ALLOWED_FILES = {
    "http_client.py",
    "http_client_simple.py",
    "http_client_test.py",
    "test_http_client.py",
    # Connection pooling infrastructure
    "connection_pool.py",
}

# Per-file exceptions keyed by (file name, library); the value records why
# the import is tolerated.
ALLOWED_PATTERNS = {
    # httpx is imported there just for a version check
    ("health.py", "httpx"): "version check only",
}
30 |
31 |
def _check_import_node(
    node: ast.Import, file_name: str
) -> set[tuple[str, int]]:
    """Collect violations from a plain ``import a.b, c`` statement.

    Only the first dotted component of each imported name is compared with
    HTTP_LIBRARIES; a (file_name, library) entry in ALLOWED_PATTERNS
    suppresses the violation.

    Returns:
        Set of (library, line number) pairs.
    """
    top_level_names = (alias.name.split(".")[0] for alias in node.names)
    return {
        (library, node.lineno)
        for library in top_level_names
        if library in HTTP_LIBRARIES
        and (file_name, library) not in ALLOWED_PATTERNS
    }
44 |
45 |
def _check_import_from_node(
    node: ast.ImportFrom, file_name: str
) -> set[tuple[str, int]]:
    """Collect violations from a ``from x.y import z`` statement.

    Relative imports (``from .requests import x``) refer to modules inside
    the current package, not to third-party HTTP libraries, so they are
    never reported. For absolute imports, the first dotted component of the
    module is compared with HTTP_LIBRARIES; a (file_name, library) entry in
    ALLOWED_PATTERNS suppresses the violation.

    Returns:
        Set of (library, line number) pairs; at most one element.
    """
    violations: set[tuple[str, int]] = set()

    # node.level > 0 marks a relative import; node.module is None for
    # ``from . import x``.
    if node.level or not node.module:
        return violations

    module_name = node.module.split(".")[0]
    if (
        module_name in HTTP_LIBRARIES
        and (file_name, module_name) not in ALLOWED_PATTERNS
    ):
        violations.add((module_name, node.lineno))
    return violations
58 |
59 |
def check_imports(file_path: Path) -> set[tuple[str, int]]:
    """Scan one Python file for direct HTTP library imports.

    Files whose name is in ALLOWED_FILES are not opened at all. Any error
    while reading or parsing is reported on stderr and does not propagate;
    whatever was collected up to that point is returned.

    Returns:
        Set of (library, line number) pairs, one per violation.
    """
    found = set()
    base_name = file_path.name

    if base_name in ALLOWED_FILES:
        return found

    try:
        with open(file_path, encoding="utf-8") as source_file:
            tree = ast.parse(source_file.read())

        for node in ast.walk(tree):
            if isinstance(node, ast.Import):
                found |= _check_import_node(node, base_name)
            elif isinstance(node, ast.ImportFrom):
                found |= _check_import_from_node(node, base_name)

    except Exception as e:
        print(f"Error parsing {file_path}: {e}", file=sys.stderr)

    return found
89 |
90 |
def find_python_files(root_dir: Path) -> list[Path]:
    """Return every ``*.py`` file under *root_dir*, skipping tooling directories.

    A file is skipped when any path component *below* ``root_dir`` starts
    with a dot (hidden directories and files, e.g. ``.tox``) or is one of
    ``__pycache__``, ``venv`` or ``env``.

    Only the part of the path relative to ``root_dir`` is inspected. Looking
    at the full path would exclude every file whenever the project itself is
    checked out beneath a hidden or ``venv``-named directory, making the
    check pass without scanning anything.

    Args:
        root_dir: Directory to search recursively.

    Returns:
        The matching paths, in the order ``Path.rglob`` yields them.
    """
    excluded_names = {"__pycache__", "venv", "env"}
    python_files = []

    for path in root_dir.rglob("*.py"):
        relative_parts = path.relative_to(root_dir).parts
        if any(
            part.startswith(".") or part in excluded_names
            for part in relative_parts
        ):
            continue
        python_files.append(path)

    return python_files
106 |
107 |
def main():
    """Scan ``<project root>/src`` and report direct HTTP library imports.

    The project root is the parent of the directory containing this script.

    Returns:
        1 when at least one violation was printed, otherwise 0.
    """
    project_root = Path(__file__).parent.parent
    src_dir = project_root / "src"

    # Flatten per-file results into (path, library, line) records.
    all_violations = [
        (file_path, lib, line)
        for file_path in find_python_files(src_dir)
        for lib, line in check_imports(file_path)
    ]

    if not all_violations:
        print("✅ No direct HTTP library imports found outside allowed files.")
        return 0

    print("❌ Found direct HTTP library imports:\n")
    for file_path, lib, line in sorted(all_violations):
        rel_path = file_path.relative_to(project_root)
        print(f" {rel_path}:{line} - imports '{lib}'")

    print(f"\n❌ Total violations: {len(all_violations)}")
    print(
        "\nPlease use the centralized HTTP client (biomcp.http_client) instead."
    )
    print(
        "If you need to add an exception, update ALLOWED_FILES or ALLOWED_PATTERNS in this script."
    )
    return 1


if __name__ == "__main__":
    # Propagate main()'s result as the process exit status.
    sys.exit(main())
147 |
```
--------------------------------------------------------------------------------
/src/biomcp/variants/cbioportal_search_helpers.py:
--------------------------------------------------------------------------------
```python
1 | """Helper functions for cBioPortal search to reduce complexity."""
2 |
3 | import logging
4 | import re
5 | from typing import Any
6 |
7 | from .cbioportal_search import GeneHotspot
8 |
9 | logger = logging.getLogger(__name__)
10 |
11 |
async def process_mutation_results(
    mutation_results: list[tuple[Any, str]],
    cancer_types_lookup: dict[str, dict[str, Any]],
    client: Any,
) -> dict[str, Any]:
    """Aggregate per-study mutation results into summary statistics.

    Entries whose result is an exception are logged at debug level and
    skipped. Results that are empty, lack a "mutations" key, or carry an
    empty mutation list contribute nothing.

    Args:
        mutation_results: List of (result, study_id) tuples
        cancer_types_lookup: Cancer type lookup dictionary, passed through
            to ``client._get_study_cancer_type``
        client: Client instance used to resolve each study's cancer type

    Returns:
        Dictionary with keys "total_mutations" (distinct non-empty sampleId
        values, counted per study and summed), "total_samples" (sum of each
        contributing study's "sample_count"), "studies_with_data",
        "hotspot_counts" and "cancer_distribution".
    """
    mutated_sample_total = 0
    sample_total = 0
    studies_with_data = 0
    hotspot_counts: dict[str, dict[str, Any]] = {}
    cancer_distribution: dict[str, int] = {}

    for result, study_id in mutation_results:
        if isinstance(result, Exception):
            logger.debug(f"Failed to get mutations for {study_id}: {result}")
            continue

        if not result or "mutations" not in result:
            continue

        mutations = result["mutations"]
        sample_count = result["sample_count"]
        if not mutations:
            continue

        studies_with_data += 1

        # A sample is counted once per study, however many mutations it has.
        sample_ids = {
            sample_id
            for sample_id in (m.get("sampleId") for m in mutations)
            if sample_id
        }
        mutated_sample_total += len(sample_ids)
        sample_total += sample_count

        study_cancer_type = await client._get_study_cancer_type(
            study_id, cancer_types_lookup
        )
        _update_hotspot_counts(mutations, hotspot_counts, study_cancer_type)
        _update_cancer_distribution(
            mutations, cancer_distribution, study_cancer_type
        )

    return {
        "total_mutations": mutated_sample_total,
        "total_samples": sample_total,
        "studies_with_data": studies_with_data,
        "hotspot_counts": hotspot_counts,
        "cancer_distribution": cancer_distribution,
    }
69 |
70 |
71 | def _update_hotspot_counts(
72 | mutations: list[dict[str, Any]],
73 | hotspot_counts: dict[str, dict[str, Any]],
74 | cancer_type: str,
75 | ) -> None:
76 | """Update hotspot counts from mutations."""
77 | for mut in mutations:
78 | protein_change = mut.get("proteinChange", "")
79 | if protein_change:
80 | if protein_change not in hotspot_counts:
81 | hotspot_counts[protein_change] = {
82 | "count": 0,
83 | "cancer_types": set(),
84 | }
85 | hotspot_counts[protein_change]["count"] += 1
86 | hotspot_counts[protein_change]["cancer_types"].add(cancer_type)
87 |
88 |
89 | def _update_cancer_distribution(
90 | mutations: list[dict[str, Any]],
91 | cancer_distribution: dict[str, int],
92 | cancer_type: str,
93 | ) -> None:
94 | """Update cancer type distribution."""
95 | cancer_distribution[cancer_type] = cancer_distribution.get(
96 | cancer_type, 0
97 | ) + len({m.get("sampleId") for m in mutations if m.get("sampleId")})
98 |
99 |
def format_hotspots(
    hotspot_counts: dict[str, dict[str, Any]], total_mutations: int
) -> list[GeneHotspot]:
    """Build GeneHotspot objects for the five most frequent protein changes.

    Args:
        hotspot_counts: Mapping of protein change to a dict holding "count"
            and "cancer_types".
        total_mutations: Denominator for each hotspot's frequency; when it
            is not positive the frequency is reported as 0.0.

    Returns:
        Up to five GeneHotspot objects, highest count first.
    """
    ranked = sorted(
        hotspot_counts.items(),
        key=lambda item: item[1]["count"],
        reverse=True,
    )

    formatted = []
    for change, stats in ranked[:5]:  # Top 5 hotspots
        # Position is the first run of digits in the protein change; it
        # stays 0 when there is none.
        position = 0
        try:
            digits = re.search(r"(\d+)", change)
            if digits:
                position = int(digits.group(1))
        except Exception:
            logger.debug("Failed to extract position from protein change")

        occurrences = stats["count"]
        if total_mutations > 0:
            frequency = occurrences / total_mutations
        else:
            frequency = 0.0

        formatted.append(
            GeneHotspot(
                position=position,
                amino_acid_change=change,
                count=occurrences,
                frequency=frequency,
                cancer_types=list(stats["cancer_types"]),
            )
        )

    return formatted
131 |
```
--------------------------------------------------------------------------------
/tests/tdd/workers/test_worker_sanitization.js:
--------------------------------------------------------------------------------
```javascript
1 | /**
2 | * Tests for worker_entry_stytch.js sanitization functionality
3 | */
4 |
5 | const { test } = require("node:test");
6 | const assert = require("node:assert");
7 |
8 | // Mock the sanitizeObject function for testing
// Key fragments that mark a field as sensitive (matched case-insensitively,
// anywhere inside the key).
const SENSITIVE_FIELDS = [
  "api_key",
  "apiKey",
  "api-key",
  "token",
  "secret",
  "password",
];

/**
 * Return a copy of `obj` in which the value of every sensitive key is
 * replaced by "[REDACTED]". Arrays and nested objects are processed
 * recursively; primitives, null and undefined are returned unchanged.
 */
const sanitizeObject = (obj) => {
  if (!obj || typeof obj !== "object") return obj;

  if (Array.isArray(obj)) {
    return obj.map((element) => sanitizeObject(element));
  }

  const isSensitive = (key) => {
    const normalizedKey = key.toLowerCase();
    return SENSITIVE_FIELDS.some((fragment) =>
      normalizedKey.includes(fragment.toLowerCase()),
    );
  };

  const redacted = {};
  Object.entries(obj).forEach(([key, value]) => {
    // Non-object values come back from the recursive call untouched.
    redacted[key] = isSensitive(key) ? "[REDACTED]" : sanitizeObject(value);
  });
  return redacted;
};
44 |
45 | // Test cases
test("should redact api_key field", () => {
  // Tool-call shaped payload: only the api_key argument may be masked.
  const toolArguments = {
    api_key: "AIzaSyB1234567890",
    gene: "BRAF",
    position: 140753336,
  };

  const sanitized = sanitizeObject({ params: { arguments: toolArguments } });
  const sanitizedArguments = sanitized.params.arguments;

  assert.strictEqual(sanitizedArguments.api_key, "[REDACTED]");
  assert.strictEqual(sanitizedArguments.gene, "BRAF");
  assert.strictEqual(sanitizedArguments.position, 140753336);
});
62 |
test("should handle nested sensitive fields", () => {
  // Sensitive keys sit at two different nesting depths.
  const inner = {
    password: "my-password",
    apiKey: "another-key",
    safe_field: "visible",
  };
  const payload = { outer: { token: "secret-token", inner } };

  const { outer } = sanitizeObject(payload);

  assert.strictEqual(outer.token, "[REDACTED]");
  assert.strictEqual(outer.inner.password, "[REDACTED]");
  assert.strictEqual(outer.inner.apiKey, "[REDACTED]");
  assert.strictEqual(outer.inner.safe_field, "visible");
});
81 |
test("should handle arrays with sensitive data", () => {
  const payload = {
    requests: [
      { api_key: "key1", data: "safe" },
      { api_key: "key2", data: "also safe" },
    ],
  };

  // Every array element is sanitized independently.
  const [first, second] = sanitizeObject(payload).requests;

  assert.strictEqual(first.api_key, "[REDACTED]");
  assert.strictEqual(second.api_key, "[REDACTED]");
  assert.strictEqual(first.data, "safe");
  assert.strictEqual(second.data, "also safe");
});
96 |
test("should be case-insensitive for field names", () => {
  const sanitized = sanitizeObject({
    API_KEY: "uppercase",
    Api_Key: "mixed",
    "api-key": "hyphenated",
  });

  // Each spelling of the key must be masked.
  for (const key of ["API_KEY", "Api_Key", "api-key"]) {
    assert.strictEqual(sanitized[key], "[REDACTED]");
  }
});
109 |
test("should not modify non-sensitive fields", () => {
  // None of these keys contains a sensitive fragment.
  const variant = {
    gene: "TP53",
    chromosome: "chr17",
    position: 7577121,
    reference: "C",
    alternate: "T",
  };

  assert.deepStrictEqual(sanitizeObject(variant), variant);
});
122 |
test("should handle null and undefined values", () => {
  // Sensitive keys are masked even when they hold no value.
  const sanitized = sanitizeObject({
    api_key: null,
    token: undefined,
    valid: "data",
  });

  assert.strictEqual(sanitized.api_key, "[REDACTED]");
  assert.strictEqual(sanitized.token, "[REDACTED]");
  assert.strictEqual(sanitized.valid, "data");
});
135 |
test("should handle think tool detection", () => {
  // NOTE(review): this only reads back the literal built here; it does not
  // call into the worker code.
  const thinkArguments = {
    thought: "Analyzing the problem...",
    thoughtNumber: 1,
  };
  const thinkRequest = {
    params: { name: "think", arguments: thinkArguments },
  };

  assert.strictEqual(thinkRequest.params?.name, "think");
});
150 |
test("should handle domain-based filtering", () => {
  // NOTE(review): like the think-tool test, this asserts on literals built
  // here rather than on worker behavior.
  const buildSearchRequest = (domain) => ({
    params: {
      name: "search",
      arguments: {
        domain,
        query: "some query",
      },
    },
  });

  const requests = [
    buildSearchRequest("thinking"),
    buildSearchRequest("think"),
  ];

  for (const request of requests) {
    const domain = request.params?.arguments?.domain;
    assert.ok(domain === "thinking" || domain === "think");
  }
});
178 |
```
--------------------------------------------------------------------------------
/src/biomcp/cli/interventions.py:
--------------------------------------------------------------------------------
```python
1 | """CLI commands for intervention search and lookup."""
2 |
3 | import asyncio
4 | from typing import Annotated
5 |
6 | import typer
7 |
8 | from ..integrations.cts_api import CTSAPIError, get_api_key_instructions
9 | from ..interventions import get_intervention, search_interventions
10 | from ..interventions.getter import format_intervention_details
11 | from ..interventions.search import (
12 | INTERVENTION_TYPES,
13 | format_intervention_results,
14 | )
15 |
16 | intervention_app = typer.Typer(
17 | no_args_is_help=True,
18 | help="Search and retrieve intervention information from NCI CTS API",
19 | )
20 |
21 |
@intervention_app.command("search")
def search_interventions_cli(
    name: Annotated[
        str | None,
        typer.Argument(
            help="Intervention name to search for (partial match supported)"
        ),
    ] = None,
    intervention_type: Annotated[
        str | None,
        typer.Option(
            "--type",
            help=f"Type of intervention. Options: {', '.join(INTERVENTION_TYPES)}",
            show_choices=True,
        ),
    ] = None,
    synonyms: Annotated[
        bool,
        typer.Option(
            "--synonyms/--no-synonyms",
            help="Include synonym matches in search",
        ),
    ] = True,
    page_size: Annotated[
        int,
        typer.Option(
            "--page-size",
            help="Number of results per page",
            min=1,
            max=100,
        ),
    ] = 20,
    page: Annotated[
        int,
        typer.Option(
            "--page",
            help="Page number",
            min=1,
        ),
    ] = 1,
    api_key: Annotated[
        str | None,
        typer.Option(
            "--api-key",
            help="NCI API key (overrides NCI_API_KEY env var)",
            envvar="NCI_API_KEY",
        ),
    ] = None,
) -> None:
    """
    Search for interventions (drugs, devices, procedures) in the NCI database.

    Examples:
        # Search by drug name
        biomcp intervention search pembrolizumab

        # Search by type
        biomcp intervention search --type Drug

        # Search by name within the Biological type
        biomcp intervention search "CAR T" --type Biological

        # Search without synonyms
        biomcp intervention search imatinib --no-synonyms
    """
    try:
        # The search is a coroutine; run it to completion for the CLI.
        results = asyncio.run(
            search_interventions(
                name=name,
                intervention_type=intervention_type,
                synonyms=synonyms,
                page_size=page_size,
                page=page,
                api_key=api_key,
            )
        )

        output = format_intervention_results(results)
        typer.echo(output)

    except CTSAPIError as e:
        # A missing key gets setup instructions on stdout; any other API
        # failure is reported on stderr. Both exit with status 1.
        if "API key required" in str(e):
            typer.echo(get_api_key_instructions())
        else:
            typer.echo(f"Error: {e}", err=True)
        raise typer.Exit(1) from e
    except Exception as e:
        # CLI boundary: turn anything unexpected into a message and exit 1.
        typer.echo(f"Unexpected error: {e}", err=True)
        raise typer.Exit(1) from e
111 |
112 |
@intervention_app.command("get")
def get_intervention_cli(
    intervention_id: Annotated[
        str,
        typer.Argument(help="Intervention ID"),
    ],
    api_key: Annotated[
        str | None,
        typer.Option(
            "--api-key",
            help="NCI API key (overrides NCI_API_KEY env var)",
            envvar="NCI_API_KEY",
        ),
    ] = None,
) -> None:
    """
    Get detailed information about a specific intervention.

    Example:
        biomcp intervention get INT123456
    """
    try:
        lookup = get_intervention(
            intervention_id=intervention_id,
            api_key=api_key,
        )
        details = asyncio.run(lookup)
        typer.echo(format_intervention_details(details))

    except CTSAPIError as e:
        # Missing-key errors print setup instructions; other API errors go
        # to stderr. Either way the command exits with status 1.
        if "API key required" not in str(e):
            typer.echo(f"Error: {e}", err=True)
        else:
            typer.echo(get_api_key_instructions())
        raise typer.Exit(1) from e
    except Exception as e:
        typer.echo(f"Unexpected error: {e}", err=True)
        raise typer.Exit(1) from e
154 |
155 |
@intervention_app.command("types")
def list_intervention_types() -> None:
    """
    List all available intervention types.
    """
    # Heading, one bullet per type, then a usage hint, echoed line by line.
    lines = ["## Available Intervention Types\n"]
    lines.extend(f"- {int_type}" for int_type in INTERVENTION_TYPES)
    lines.append("\nUse these values with the --type option when searching.")

    for line in lines:
        typer.echo(line)
165 |
```