This is page 2 of 16. Use http://codebase.md/genomoncology/biomcp?page={x} to view the full context.
# Directory Structure
```
├── .github
│ ├── actions
│ │ └── setup-python-env
│ │ └── action.yml
│ ├── dependabot.yml
│ └── workflows
│ ├── ci.yml
│ ├── deploy-docs.yml
│ ├── main.yml.disabled
│ ├── on-release-main.yml
│ └── validate-codecov-config.yml
├── .gitignore
├── .pre-commit-config.yaml
├── BIOMCP_DATA_FLOW.md
├── CHANGELOG.md
├── CNAME
├── codecov.yaml
├── docker-compose.yml
├── Dockerfile
├── docs
│ ├── apis
│ │ ├── error-codes.md
│ │ ├── overview.md
│ │ └── python-sdk.md
│ ├── assets
│ │ ├── biomcp-cursor-locations.png
│ │ ├── favicon.ico
│ │ ├── icon.png
│ │ ├── logo.png
│ │ ├── mcp_architecture.txt
│ │ └── remote-connection
│ │ ├── 00_connectors.png
│ │ ├── 01_add_custom_connector.png
│ │ ├── 02_connector_enabled.png
│ │ ├── 03_connect_to_biomcp.png
│ │ ├── 04_select_google_oauth.png
│ │ └── 05_success_connect.png
│ ├── backend-services-reference
│ │ ├── 01-overview.md
│ │ ├── 02-biothings-suite.md
│ │ ├── 03-cbioportal.md
│ │ ├── 04-clinicaltrials-gov.md
│ │ ├── 05-nci-cts-api.md
│ │ ├── 06-pubtator3.md
│ │ └── 07-alphagenome.md
│ ├── blog
│ │ ├── ai-assisted-clinical-trial-search-analysis.md
│ │ ├── images
│ │ │ ├── deep-researcher-video.png
│ │ │ ├── researcher-announce.png
│ │ │ ├── researcher-drop-down.png
│ │ │ ├── researcher-prompt.png
│ │ │ ├── trial-search-assistant.png
│ │ │ └── what_is_biomcp_thumbnail.png
│ │ └── researcher-persona-resource.md
│ ├── changelog.md
│ ├── CNAME
│ ├── concepts
│ │ ├── 01-what-is-biomcp.md
│ │ ├── 02-the-deep-researcher-persona.md
│ │ └── 03-sequential-thinking-with-the-think-tool.md
│ ├── developer-guides
│ │ ├── 01-server-deployment.md
│ │ ├── 02-contributing-and-testing.md
│ │ ├── 03-third-party-endpoints.md
│ │ ├── 04-transport-protocol.md
│ │ ├── 05-error-handling.md
│ │ ├── 06-http-client-and-caching.md
│ │ ├── 07-performance-optimizations.md
│ │ └── generate_endpoints.py
│ ├── faq-condensed.md
│ ├── FDA_SECURITY.md
│ ├── genomoncology.md
│ ├── getting-started
│ │ ├── 01-quickstart-cli.md
│ │ ├── 02-claude-desktop-integration.md
│ │ └── 03-authentication-and-api-keys.md
│ ├── how-to-guides
│ │ ├── 01-find-articles-and-cbioportal-data.md
│ │ ├── 02-find-trials-with-nci-and-biothings.md
│ │ ├── 03-get-comprehensive-variant-annotations.md
│ │ ├── 04-predict-variant-effects-with-alphagenome.md
│ │ ├── 05-logging-and-monitoring-with-bigquery.md
│ │ └── 06-search-nci-organizations-and-interventions.md
│ ├── index.md
│ ├── policies.md
│ ├── reference
│ │ ├── architecture-diagrams.md
│ │ ├── quick-architecture.md
│ │ ├── quick-reference.md
│ │ └── visual-architecture.md
│ ├── robots.txt
│ ├── stylesheets
│ │ ├── announcement.css
│ │ └── extra.css
│ ├── troubleshooting.md
│ ├── tutorials
│ │ ├── biothings-prompts.md
│ │ ├── claude-code-biomcp-alphagenome.md
│ │ ├── nci-prompts.md
│ │ ├── openfda-integration.md
│ │ ├── openfda-prompts.md
│ │ ├── pydantic-ai-integration.md
│ │ └── remote-connection.md
│ ├── user-guides
│ │ ├── 01-command-line-interface.md
│ │ ├── 02-mcp-tools-reference.md
│ │ └── 03-integrating-with-ides-and-clients.md
│ └── workflows
│ └── all-workflows.md
├── example_scripts
│ ├── mcp_integration.py
│ └── python_sdk.py
├── glama.json
├── LICENSE
├── lzyank.toml
├── Makefile
├── mkdocs.yml
├── package-lock.json
├── package.json
├── pyproject.toml
├── README.md
├── scripts
│ ├── check_docs_in_mkdocs.py
│ ├── check_http_imports.py
│ └── generate_endpoints_doc.py
├── smithery.yaml
├── src
│ └── biomcp
│ ├── __init__.py
│ ├── __main__.py
│ ├── articles
│ │ ├── __init__.py
│ │ ├── autocomplete.py
│ │ ├── fetch.py
│ │ ├── preprints.py
│ │ ├── search_optimized.py
│ │ ├── search.py
│ │ └── unified.py
│ ├── biomarkers
│ │ ├── __init__.py
│ │ └── search.py
│ ├── cbioportal_helper.py
│ ├── circuit_breaker.py
│ ├── cli
│ │ ├── __init__.py
│ │ ├── articles.py
│ │ ├── biomarkers.py
│ │ ├── diseases.py
│ │ ├── health.py
│ │ ├── interventions.py
│ │ ├── main.py
│ │ ├── openfda.py
│ │ ├── organizations.py
│ │ ├── server.py
│ │ ├── trials.py
│ │ └── variants.py
│ ├── connection_pool.py
│ ├── constants.py
│ ├── core.py
│ ├── diseases
│ │ ├── __init__.py
│ │ ├── getter.py
│ │ └── search.py
│ ├── domain_handlers.py
│ ├── drugs
│ │ ├── __init__.py
│ │ └── getter.py
│ ├── exceptions.py
│ ├── genes
│ │ ├── __init__.py
│ │ └── getter.py
│ ├── http_client_simple.py
│ ├── http_client.py
│ ├── individual_tools.py
│ ├── integrations
│ │ ├── __init__.py
│ │ ├── biothings_client.py
│ │ └── cts_api.py
│ ├── interventions
│ │ ├── __init__.py
│ │ ├── getter.py
│ │ └── search.py
│ ├── logging_filter.py
│ ├── metrics_handler.py
│ ├── metrics.py
│ ├── oncokb_helper.py
│ ├── openfda
│ │ ├── __init__.py
│ │ ├── adverse_events_helpers.py
│ │ ├── adverse_events.py
│ │ ├── cache.py
│ │ ├── constants.py
│ │ ├── device_events_helpers.py
│ │ ├── device_events.py
│ │ ├── drug_approvals.py
│ │ ├── drug_labels_helpers.py
│ │ ├── drug_labels.py
│ │ ├── drug_recalls_helpers.py
│ │ ├── drug_recalls.py
│ │ ├── drug_shortages_detail_helpers.py
│ │ ├── drug_shortages_helpers.py
│ │ ├── drug_shortages.py
│ │ ├── exceptions.py
│ │ ├── input_validation.py
│ │ ├── rate_limiter.py
│ │ ├── utils.py
│ │ └── validation.py
│ ├── organizations
│ │ ├── __init__.py
│ │ ├── getter.py
│ │ └── search.py
│ ├── parameter_parser.py
│ ├── query_parser.py
│ ├── query_router.py
│ ├── rate_limiter.py
│ ├── render.py
│ ├── request_batcher.py
│ ├── resources
│ │ ├── __init__.py
│ │ ├── getter.py
│ │ ├── instructions.md
│ │ └── researcher.md
│ ├── retry.py
│ ├── router_handlers.py
│ ├── router.py
│ ├── shared_context.py
│ ├── thinking
│ │ ├── __init__.py
│ │ ├── sequential.py
│ │ └── session.py
│ ├── thinking_tool.py
│ ├── thinking_tracker.py
│ ├── trials
│ │ ├── __init__.py
│ │ ├── getter.py
│ │ ├── nci_getter.py
│ │ ├── nci_search.py
│ │ └── search.py
│ ├── utils
│ │ ├── __init__.py
│ │ ├── cancer_types_api.py
│ │ ├── cbio_http_adapter.py
│ │ ├── endpoint_registry.py
│ │ ├── gene_validator.py
│ │ ├── metrics.py
│ │ ├── mutation_filter.py
│ │ ├── query_utils.py
│ │ ├── rate_limiter.py
│ │ └── request_cache.py
│ ├── variants
│ │ ├── __init__.py
│ │ ├── alphagenome.py
│ │ ├── cancer_types.py
│ │ ├── cbio_external_client.py
│ │ ├── cbioportal_mutations.py
│ │ ├── cbioportal_search_helpers.py
│ │ ├── cbioportal_search.py
│ │ ├── constants.py
│ │ ├── external.py
│ │ ├── filters.py
│ │ ├── getter.py
│ │ ├── links.py
│ │ ├── oncokb_client.py
│ │ ├── oncokb_models.py
│ │ └── search.py
│ └── workers
│ ├── __init__.py
│ ├── worker_entry_stytch.js
│ ├── worker_entry.js
│ └── worker.py
├── tests
│ ├── bdd
│ │ ├── cli_help
│ │ │ ├── help.feature
│ │ │ └── test_help.py
│ │ ├── conftest.py
│ │ ├── features
│ │ │ └── alphagenome_integration.feature
│ │ ├── fetch_articles
│ │ │ ├── fetch.feature
│ │ │ └── test_fetch.py
│ │ ├── get_trials
│ │ │ ├── get.feature
│ │ │ └── test_get.py
│ │ ├── get_variants
│ │ │ ├── get.feature
│ │ │ └── test_get.py
│ │ ├── search_articles
│ │ │ ├── autocomplete.feature
│ │ │ ├── search.feature
│ │ │ ├── test_autocomplete.py
│ │ │ └── test_search.py
│ │ ├── search_trials
│ │ │ ├── search.feature
│ │ │ └── test_search.py
│ │ ├── search_variants
│ │ │ ├── search.feature
│ │ │ └── test_search.py
│ │ └── steps
│ │ └── test_alphagenome_steps.py
│ ├── config
│ │ └── test_smithery_config.py
│ ├── conftest.py
│ ├── data
│ │ ├── ct_gov
│ │ │ ├── clinical_trials_api_v2.yaml
│ │ │ ├── trials_NCT04280705.json
│ │ │ └── trials_NCT04280705.txt
│ │ ├── myvariant
│ │ │ ├── myvariant_api.yaml
│ │ │ ├── myvariant_field_descriptions.csv
│ │ │ ├── variants_full_braf_v600e.json
│ │ │ ├── variants_full_braf_v600e.txt
│ │ │ └── variants_part_braf_v600_multiple.json
│ │ ├── oncokb_mock_responses.json
│ │ ├── openfda
│ │ │ ├── drugsfda_detail.json
│ │ │ ├── drugsfda_search.json
│ │ │ ├── enforcement_detail.json
│ │ │ └── enforcement_search.json
│ │ └── pubtator
│ │ ├── pubtator_autocomplete.json
│ │ └── pubtator3_paper.txt
│ ├── integration
│ │ ├── test_oncokb_integration.py
│ │ ├── test_openfda_integration.py
│ │ ├── test_preprints_integration.py
│ │ ├── test_simple.py
│ │ └── test_variants_integration.py
│ ├── tdd
│ │ ├── articles
│ │ │ ├── test_autocomplete.py
│ │ │ ├── test_cbioportal_integration.py
│ │ │ ├── test_fetch.py
│ │ │ ├── test_preprints.py
│ │ │ ├── test_search.py
│ │ │ └── test_unified.py
│ │ ├── conftest.py
│ │ ├── drugs
│ │ │ ├── __init__.py
│ │ │ └── test_drug_getter.py
│ │ ├── openfda
│ │ │ ├── __init__.py
│ │ │ ├── test_adverse_events.py
│ │ │ ├── test_device_events.py
│ │ │ ├── test_drug_approvals.py
│ │ │ ├── test_drug_labels.py
│ │ │ ├── test_drug_recalls.py
│ │ │ ├── test_drug_shortages.py
│ │ │ └── test_security.py
│ │ ├── test_biothings_integration_real.py
│ │ ├── test_biothings_integration.py
│ │ ├── test_circuit_breaker.py
│ │ ├── test_concurrent_requests.py
│ │ ├── test_connection_pool.py
│ │ ├── test_domain_handlers.py
│ │ ├── test_drug_approvals.py
│ │ ├── test_drug_recalls.py
│ │ ├── test_drug_shortages.py
│ │ ├── test_endpoint_documentation.py
│ │ ├── test_error_scenarios.py
│ │ ├── test_europe_pmc_fetch.py
│ │ ├── test_mcp_integration.py
│ │ ├── test_mcp_tools.py
│ │ ├── test_metrics.py
│ │ ├── test_nci_integration.py
│ │ ├── test_nci_mcp_tools.py
│ │ ├── test_network_policies.py
│ │ ├── test_offline_mode.py
│ │ ├── test_openfda_unified.py
│ │ ├── test_pten_r173_search.py
│ │ ├── test_render.py
│ │ ├── test_request_batcher.py.disabled
│ │ ├── test_retry.py
│ │ ├── test_router.py
│ │ ├── test_shared_context.py.disabled
│ │ ├── test_unified_biothings.py
│ │ ├── thinking
│ │ │ ├── __init__.py
│ │ │ └── test_sequential.py
│ │ ├── trials
│ │ │ ├── test_backward_compatibility.py
│ │ │ ├── test_getter.py
│ │ │ └── test_search.py
│ │ ├── utils
│ │ │ ├── test_gene_validator.py
│ │ │ ├── test_mutation_filter.py
│ │ │ ├── test_rate_limiter.py
│ │ │ └── test_request_cache.py
│ │ ├── variants
│ │ │ ├── constants.py
│ │ │ ├── test_alphagenome_api_key.py
│ │ │ ├── test_alphagenome_comprehensive.py
│ │ │ ├── test_alphagenome.py
│ │ │ ├── test_cbioportal_mutations.py
│ │ │ ├── test_cbioportal_search.py
│ │ │ ├── test_external_integration.py
│ │ │ ├── test_external.py
│ │ │ ├── test_extract_gene_aa_change.py
│ │ │ ├── test_filters.py
│ │ │ ├── test_getter.py
│ │ │ ├── test_links.py
│ │ │ ├── test_oncokb_client.py
│ │ │ ├── test_oncokb_helper.py
│ │ │ └── test_search.py
│ │ └── workers
│ │ └── test_worker_sanitization.js
│ └── test_pydantic_ai_integration.py
├── THIRD_PARTY_ENDPOINTS.md
├── tox.ini
├── uv.lock
└── wrangler.toml
```
# Files
--------------------------------------------------------------------------------
/src/biomcp/thinking_tool.py:
--------------------------------------------------------------------------------
```python
"""Sequential thinking tool for structured problem-solving.
This module provides a dedicated MCP tool for sequential thinking,
separate from the main search functionality.
"""
from typing import Annotated
from pydantic import Field
from biomcp.core import mcp_app
from biomcp.metrics import track_performance
from biomcp.thinking.sequential import _sequential_thinking
from biomcp.thinking_tracker import mark_thinking_used
@mcp_app.tool()
@track_performance("biomcp.think")
async def think(
    thought: Annotated[
        str,
        Field(description="Current thinking step for analysis"),
    ],
    thoughtNumber: Annotated[
        int,
        Field(
            description="Current thought number, starting at 1",
            ge=1,
        ),
    ],
    totalThoughts: Annotated[
        int,
        Field(
            description="Estimated total thoughts needed for complete analysis",
            ge=1,
        ),
    ],
    nextThoughtNeeded: Annotated[
        bool,
        Field(
            description="Whether more thinking steps are needed after this one",
        ),
    ] = True,
) -> dict:
    """REQUIRED FIRST STEP: Perform structured sequential thinking for ANY biomedical research task.

    🚨 IMPORTANT: You MUST use this tool BEFORE any search or fetch operations when:

    - Researching ANY biomedical topic (genes, diseases, variants, trials)
    - Planning to use multiple BioMCP tools
    - Answering questions that require analysis or synthesis
    - Comparing information from different sources
    - Making recommendations or drawing conclusions

    ⚠️ FAILURE TO USE THIS TOOL FIRST will result in:

    - Incomplete or poorly structured analysis
    - Missing important connections between data
    - Suboptimal search strategies
    - Overlooked critical information

    Sequential thinking ensures you:

    1. Fully understand the research question
    2. Plan an optimal search strategy
    3. Identify all relevant data sources
    4. Structure your analysis properly
    5. Deliver comprehensive, well-reasoned results

    ## Usage Pattern:

    1. Start with thoughtNumber=1 to initiate analysis
    2. Progress through numbered thoughts sequentially
    3. Adjust totalThoughts estimate as understanding develops
    4. Set nextThoughtNeeded=False only when analysis is complete

    ## Example:

    ```python
    # Initial analysis
    await think(
        thought="Breaking down the relationship between BRAF mutations and melanoma treatment resistance...",
        thoughtNumber=1,
        totalThoughts=5,
        nextThoughtNeeded=True
    )

    # Continue analysis
    await think(
        thought="Examining specific BRAF V600E mutation mechanisms...",
        thoughtNumber=2,
        totalThoughts=5,
        nextThoughtNeeded=True
    )

    # Final thought
    await think(
        thought="Synthesizing findings and proposing research directions...",
        thoughtNumber=5,
        totalThoughts=5,
        nextThoughtNeeded=False
    )
    ```

    ## Important Notes:

    - Each thought builds on previous ones within a session
    - State is maintained throughout the MCP session
    - Use thoughtful, detailed analysis in each step
    - Revisions and branching are supported through the underlying implementation
    """
    # NOTE(review): the docstring above is served to MCP clients as the tool
    # description, so its exact wording is behaviorally significant.
    # Record that thinking happened so other tools can detect a skipped step.
    mark_thinking_used()
    # Delegate to the shared sequential-thinking engine; kwargs intentionally
    # use camelCase to mirror the public tool parameter names.
    result = await _sequential_thinking(
        thought=thought,
        thoughtNumber=thoughtNumber,
        totalThoughts=totalThoughts,
        nextThoughtNeeded=nextThoughtNeeded,
    )
    # Wrap the result in the envelope shape shared by BioMCP tool responses.
    return {
        "domain": "thinking",
        "result": result,
        "thoughtNumber": thoughtNumber,
        "nextThoughtNeeded": nextThoughtNeeded,
    }
```
--------------------------------------------------------------------------------
/docs/developer-guides/07-performance-optimizations.md:
--------------------------------------------------------------------------------
```markdown
# Performance Optimizations
This document describes the performance optimizations implemented in BioMCP to improve response times and throughput.
## Overview
BioMCP has been optimized for high-performance biomedical data retrieval through several key improvements:
- **65% faster test execution** (from ~120s to ~42s)
- **Reduced API calls** through intelligent caching and batching
- **Lower latency** via connection pooling
- **Better resource utilization** with parallel processing
## Key Optimizations
### 1. Connection Pooling
HTTP connections are now reused across requests, eliminating connection establishment overhead.
**Configuration:**
- `BIOMCP_USE_CONNECTION_POOL` - Enable/disable pooling (default: "true")
- Automatically manages pools per event loop
- Graceful cleanup on shutdown
**Impact:** ~30% reduction in request latency for sequential operations
### 2. Parallel Test Execution
Tests now run in parallel using pytest-xdist, dramatically reducing test suite execution time.
**Usage:**
```bash
make test # Automatically uses parallel execution
```
**Impact:** ~3x faster test execution (~120s → ~42s, consistent with the 65% figure above)
### 3. Request Batching
Multiple API requests are batched together when possible, particularly for cBioPortal queries.
**Features:**
- Automatic batching based on size/time thresholds
- Configurable batch size (default: 5 for cBioPortal)
- Error isolation per request
**Impact:** Up to 80% reduction in API calls for bulk operations
### 4. Smart Caching
Multiple caching layers optimize repeated queries:
- **LRU Cache:** Memory-bounded caching for recent requests
- **Hash-based keys:** 10x faster cache key generation
- **Shared validation context:** Eliminates redundant gene/entity validations
**Configuration:**
- Cache size: 1000 entries (configurable)
- TTL: 5-30 minutes depending on data type
### 5. Pagination Support
Europe PMC searches now use pagination for large result sets:
- Optimal page size: 25 results
- Progressive loading
- Memory-efficient processing
### 6. Conditional Metrics
Performance metrics are only collected when explicitly enabled, reducing overhead.
**Configuration:**
- `BIOMCP_METRICS_ENABLED` - Enable metrics (default: "false")
## Performance Benchmarks
### API Response Times
| Operation | Before | After | Improvement |
| ------------------------------ | ------ | ----- | ----------- |
| Single gene search | 850ms | 320ms | 62% |
| Bulk variant lookup | 4.2s | 1.1s | 74% |
| Article search with cBioPortal | 2.1s | 780ms | 63% |
### Resource Usage
| Metric | Before | After | Improvement |
| ------------- | ------ | ----- | ----------- |
| Memory (idle) | 145MB | 152MB | +5% |
| Memory (peak) | 512MB | 385MB | -25% |
| CPU (avg) | 35% | 28% | -20% |
## Best Practices
1. **Keep connection pooling enabled** unless experiencing issues
2. **Use the unified search** methods to benefit from parallel execution
3. **Batch operations** when performing multiple lookups
4. **Monitor cache hit rates** in production environments
## Troubleshooting
### Connection Pool Issues
If experiencing connection errors:
1. Disable pooling: `export BIOMCP_USE_CONNECTION_POOL=false`
2. Check for firewall/proxy issues
3. Verify SSL certificates
### Memory Usage
If memory usage is high:
1. Reduce cache size in `request_cache.py`
2. Lower connection pool limits
### Performance Regression
To identify performance issues:
1. Enable metrics: `export BIOMCP_METRICS_ENABLED=true`
2. Check slow operations in logs
3. Profile with `py-spy` or similar tools
## Future Optimizations
Planned improvements include:
- GraphQL batching for complex queries
- Redis integration for distributed caching
- WebSocket support for real-time updates
- GPU acceleration for variant analysis
```
--------------------------------------------------------------------------------
/tests/tdd/variants/test_search.py:
--------------------------------------------------------------------------------
```python
import pytest
from biomcp.variants.search import (
ClinicalSignificance,
PolyPhenPrediction,
SiftPrediction,
VariantQuery,
build_query_string,
search_variants,
)
@pytest.fixture
def basic_query():
    """Provide a minimal single-gene query (BRAF)."""
    query = VariantQuery(gene="BRAF")
    return query
@pytest.fixture
def complex_query():
    """Provide a multi-criteria query: gene, significance, and a frequency band."""
    params = {
        "gene": "BRCA1",
        "significance": ClinicalSignificance.PATHOGENIC,
        "min_frequency": 0.0001,
        "max_frequency": 0.01,
    }
    return VariantQuery(**params)
def test_query_validation():
    """Exercise VariantQuery model validation paths."""
    # A single gene symbol is a valid query on its own.
    gene_query = VariantQuery(gene="BRAF")
    assert gene_query.gene == "BRAF"

    # An rsid alone is also a valid search parameter.
    rsid_query = VariantQuery(rsid="rs113488022")
    assert rsid_query.rsid == "rs113488022"

    # At least one search parameter is mandatory.
    with pytest.raises(ValueError):
        VariantQuery()

    # Clinical significance may accompany a search parameter.
    sig_query = VariantQuery(
        gene="BRCA1", significance=ClinicalSignificance.PATHOGENIC
    )
    assert sig_query.significance == ClinicalSignificance.PATHOGENIC

    # Prediction-score enums round-trip unchanged.
    pred_query = VariantQuery(
        gene="TP53",
        polyphen=PolyPhenPrediction.PROBABLY_DAMAGING,
        sift=SiftPrediction.DELETERIOUS,
    )
    assert pred_query.polyphen == PolyPhenPrediction.PROBABLY_DAMAGING
    assert pred_query.sift == SiftPrediction.DELETERIOUS
def test_build_query_string():
    """Check build_query_string output for each supported filter."""

    def q(**kwargs):
        # Small local helper: build a query and render it in one step.
        return build_query_string(VariantQuery(**kwargs))

    # Single field
    assert 'dbnsfp.genename:"BRAF"' in q(gene="BRAF")

    # Multiple fields combine into one query string
    combined = q(gene="BRAF", rsid="rs113488022")
    assert 'dbnsfp.genename:"BRAF"' in combined
    assert "rs113488022" in combined

    # Genomic region passes through verbatim
    assert "chr7:140753300-140753400" in q(region="chr7:140753300-140753400")

    # Clinical significance maps to the ClinVar field
    assert 'clinvar.rcv.clinical_significance:"likely benign"' in q(
        significance=ClinicalSignificance.LIKELY_BENIGN
    )

    # Frequency bounds become gnomAD range filters
    freq = q(min_frequency=0.0001, max_frequency=0.01)
    assert "gnomad_exome.af.af:>=0.0001" in freq
    assert "gnomad_exome.af.af:<=0.01" in freq
async def test_search_variants_basic(basic_query, anyio_backend):
    """search_variants returns BRAF hits for a plain gene query (live API)."""
    result = await search_variants(basic_query)
    # A successful search mentions the gene and is not an error payload.
    assert "BRAF" in result
    assert not result.startswith("Error")
async def test_search_variants_complex(complex_query, anyio_backend):
    """Test search_variants function with a complex query."""
    # NOTE(review): the complex_query fixture is requested but never used —
    # a plain TP53 query is substituted so the live API reliably returns
    # results. Consider dropping the fixture parameter, or exercising
    # complex_query if deterministic test data becomes available.
    # Use a simple common query that will return results
    simple_query = VariantQuery(gene="TP53")
    result = await search_variants(simple_query)
    # Verify response formatting
    assert not result.startswith("Error")
async def test_search_variants_no_results(anyio_backend):
    """A nonexistent gene symbol yields an empty JSON array."""
    no_hit_query = VariantQuery(gene="UNKNOWN_XYZ")
    result = await search_variants(no_hit_query, output_json=True)
    assert result == "[]"
async def test_search_variants_with_limit(anyio_backend):
    """A size-limited query still returns a well-formed (non-error) result."""
    limited_query = VariantQuery(gene="TP53", size=3)
    result = await search_variants(limited_query)
    assert not result.startswith("Error")
```
--------------------------------------------------------------------------------
/tests/tdd/test_offline_mode.py:
--------------------------------------------------------------------------------
```python
"""Tests for offline mode functionality."""
import os
from unittest.mock import patch
import pytest
from biomcp.http_client import RequestError, request_api
@pytest.mark.asyncio
async def test_offline_mode_blocks_requests():
    """Offline mode must short-circuit outbound HTTP requests."""
    with patch.dict(os.environ, {"BIOMCP_OFFLINE": "true"}):
        result, error = await request_api(
            url="https://api.example.com/test",
            request={"test": "data"},
            cache_ttl=0,  # disable caching so the request path is exercised
        )

    # The request is refused with a 503-style RequestError.
    assert result is None
    assert error is not None
    assert isinstance(error, RequestError)
    assert error.code == 503
    assert "Offline mode enabled" in error.message
@pytest.mark.asyncio
async def test_offline_mode_allows_cached_responses():
    """Cached responses remain available even when offline mode is on."""
    # Populate the cache while online, with the HTTP layer mocked out.
    with (
        patch.dict(os.environ, {"BIOMCP_OFFLINE": "false"}),
        patch("biomcp.http_client.call_http") as http_mock,
    ):
        http_mock.return_value = (200, '{"data": "cached"}')
        result, error = await request_api(
            url="https://api.example.com/cached",
            request={"test": "data"},
            cache_ttl=3600,  # cache for one hour
        )
        assert error is None
        assert result == {"data": "cached"}

    # With offline mode enabled, the identical request is served from cache.
    with patch.dict(os.environ, {"BIOMCP_OFFLINE": "true"}):
        result, error = await request_api(
            url="https://api.example.com/cached",
            request={"test": "data"},
            cache_ttl=3600,
        )
        assert error is None
        assert result == {"data": "cached"}
@pytest.mark.asyncio
async def test_offline_mode_case_insensitive():
    """Any truthy spelling of BIOMCP_OFFLINE should activate offline mode."""
    for flag in ("TRUE", "True", "1", "yes", "YES", "Yes"):
        with patch.dict(os.environ, {"BIOMCP_OFFLINE": flag}):
            result, error = await request_api(
                url="https://api.example.com/test",
                request={"test": "data"},
                cache_ttl=0,
            )
        assert result is None
        assert error is not None
        assert error.code == 503
        assert "Offline mode enabled" in error.message
@pytest.mark.asyncio
async def test_offline_mode_disabled_by_default():
    """Without BIOMCP_OFFLINE set, requests go through normally."""
    with (
        patch.dict(os.environ, {}, clear=True),
        patch("biomcp.http_client.call_http") as http_mock,
    ):
        http_mock.return_value = (200, '{"data": "response"}')
        result, error = await request_api(
            url="https://api.example.com/test",
            request={"test": "data"},
            cache_ttl=0,
        )
        # The mocked HTTP call runs exactly once and its payload is returned.
        assert result == {"data": "response"}
        assert error is None
        http_mock.assert_called_once()
@pytest.mark.asyncio
async def test_offline_mode_with_endpoint_tracking():
    """Offline errors still name the attempted endpoint for diagnostics."""
    with patch.dict(os.environ, {"BIOMCP_OFFLINE": "true"}):
        result, error = await request_api(
            url="https://www.ncbi.nlm.nih.gov/research/pubtator3-api/search/",
            request={"text": "BRAF"},
            endpoint_key="pubtator3_search",
            cache_ttl=0,
        )
    assert result is None
    assert error is not None
    assert error.code == 503
    assert "pubtator3-api/search/" in error.message
```
--------------------------------------------------------------------------------
/src/biomcp/variants/links.py:
--------------------------------------------------------------------------------
```python
"""Functions for adding database links to variant data."""
from typing import Any
def _calculate_vcf_end(variant: dict[str, Any]) -> int:
"""Calculate the end position for UCSC Genome Browser link."""
if "vcf" not in variant:
return 0
vcf = variant["vcf"]
pos = int(vcf.get("position", 0))
ref = vcf.get("ref", "")
alt = vcf.get("alt", "")
# For insertions/deletions, handle special cases
if not ref and alt: # insertion
return pos + 1
elif ref and not alt: # deletion
return pos + len(ref)
else: # substitution
return pos + max(0, ((len(alt) + 1) - len(ref)))
def _get_first_value(data: Any) -> Any:
"""Get the first value from a list or return the value itself."""
if isinstance(data, list) and data:
return data[0]
return data
def _ensure_url_section(variant: dict[str, Any]) -> None:
"""Ensure the URL section exists in the variant."""
if "url" not in variant:
variant["url"] = {}
def _add_dbsnp_links(variant: dict[str, Any]) -> None:
    """Attach dbSNP and Ensembl URLs when the variant carries an rsid."""
    if "dbsnp" not in variant:
        return
    rsid = variant["dbsnp"].get("rsid")
    if not rsid:
        return
    variant["dbsnp"]["url"] = f"https://www.ncbi.nlm.nih.gov/snp/{rsid}"
    _ensure_url_section(variant)
    variant["url"]["ensembl"] = (
        f"https://ensembl.org/Homo_sapiens/Variation/Explore?v={rsid}"
    )
def _add_clinvar_link(variant: dict[str, Any]) -> None:
"""Add ClinVar link if variant_id is present."""
if "clinvar" in variant and variant["clinvar"].get("variant_id"):
variant["clinvar"]["url"] = (
f"https://www.ncbi.nlm.nih.gov/clinvar/variation/{variant['clinvar']['variant_id']}/"
)
def _add_cosmic_link(variant: dict[str, Any]) -> None:
"""Add COSMIC link if cosmic_id is present."""
if "cosmic" in variant and variant["cosmic"].get("cosmic_id"):
variant["cosmic"]["url"] = (
f"https://cancer.sanger.ac.uk/cosmic/mutation/overview?id={variant['cosmic']['cosmic_id']}"
)
def _add_civic_link(variant: dict[str, Any]) -> None:
"""Add CIViC link if id is present."""
if "civic" in variant and variant["civic"].get("id"):
variant["civic"]["url"] = (
f"https://civicdb.org/variants/{variant['civic']['id']}/summary"
)
def _add_ucsc_link(variant: dict[str, Any]) -> None:
    """Attach a UCSC Genome Browser (hg19) URL when coordinates are known."""
    has_coords = (
        "chrom" in variant
        and "vcf" in variant
        and variant["vcf"].get("position")
    )
    if not has_coords:
        return
    end = _calculate_vcf_end(variant)
    _ensure_url_section(variant)
    variant["url"]["ucsc_genome_browser"] = (
        f"https://genome.ucsc.edu/cgi-bin/hgTracks?db=hg19&"
        f"position=chr{variant['chrom']}:{variant['vcf']['position']}-{end}"
    )
def _add_hgnc_link(variant: dict[str, Any]) -> None:
    """Attach an HGNC gene-symbol report URL when a gene name is present."""
    if "dbnsfp" not in variant:
        return
    genename = variant["dbnsfp"].get("genename")
    if not genename:
        return
    # genename may be a list of symbols; link the first one only.
    symbol = _get_first_value(genename)
    if not symbol:
        return
    _ensure_url_section(variant)
    variant["url"]["hgnc"] = (
        f"https://www.genenames.org/data/gene-symbol-report/#!/symbol/{symbol}"
    )
def inject_links(variants: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """
    Inject database links into variant data.

    Args:
        variants: List of variant dictionaries from MyVariant.info API

    Returns:
        The same list of variant dictionaries, with URL links added in the
        appropriate sections (mutated in place).
    """
    link_builders = (
        _add_dbsnp_links,
        _add_clinvar_link,
        _add_cosmic_link,
        _add_civic_link,
        _add_ucsc_link,
        _add_hgnc_link,
    )
    for variant in variants:
        for add_link in link_builders:
            add_link(variant)
    return variants
```
--------------------------------------------------------------------------------
/src/biomcp/organizations/getter.py:
--------------------------------------------------------------------------------
```python
"""Get specific organization details via NCI CTS API."""
import logging
from typing import Any
from ..constants import NCI_ORGANIZATIONS_URL
from ..integrations.cts_api import CTSAPIError, make_cts_request
logger = logging.getLogger(__name__)
async def get_organization(
    org_id: str,
    api_key: str | None = None,
) -> dict[str, Any]:
    """
    Get detailed information about a specific organization.

    Args:
        org_id: Organization ID
        api_key: Optional API key (if not provided, uses NCI_API_KEY env var)

    Returns:
        Dictionary with organization details

    Raises:
        CTSAPIError: If the API request fails or organization not found
    """
    try:
        response = await make_cts_request(
            url=f"{NCI_ORGANIZATIONS_URL}/{org_id}",
            api_key=api_key,
        )
        # Unwrap the payload — the API may nest it under "data" or
        # "organization", or return the organization object directly.
        for wrapper_key in ("data", "organization"):
            if wrapper_key in response:
                return response[wrapper_key]
        return response
    except CTSAPIError:
        # Structured API errors pass through untouched.
        raise
    except Exception as e:
        logger.error(f"Failed to get organization {org_id}: {e}")
        raise CTSAPIError(f"Failed to retrieve organization: {e!s}") from e
def _format_address_fields(org: dict[str, Any]) -> list[str]:
"""Extract and format address fields from organization data."""
address_fields = []
if org.get("address"):
addr = org["address"]
if isinstance(addr, dict):
fields = [
addr.get("street", ""),
addr.get("city", ""),
addr.get("state", ""),
addr.get("zip", ""),
]
address_fields = [f for f in fields if f]
country = addr.get("country", "")
if country and country != "United States":
address_fields.append(country)
else:
# Try individual fields
city = org.get("city", "")
state = org.get("state", "")
address_fields = [p for p in [city, state] if p]
return address_fields
def _format_contact_info(org: dict[str, Any]) -> list[str]:
"""Format contact information lines."""
lines = []
if org.get("phone"):
lines.append(f"- **Phone**: {org['phone']}")
if org.get("email"):
lines.append(f"- **Email**: {org['email']}")
if org.get("website"):
lines.append(f"- **Website**: {org['website']}")
return lines
def format_organization_details(org: dict[str, Any]) -> str:
    """
    Format organization details as markdown.

    Args:
        org: Organization data dictionary

    Returns:
        Formatted markdown string
    """
    # Tolerate both naming schemes the API uses for id and type.
    lines = [
        f"## Organization: {org.get('name', 'Unknown Organization')}",
        "",
        "### Basic Information",
        f"- **ID**: {org.get('id', org.get('org_id', 'Unknown'))}",
        f"- **Type**: {org.get('type', org.get('category', 'Unknown'))}",
    ]

    location_parts = _format_address_fields(org)
    if location_parts:
        lines.append(f"- **Location**: {', '.join(location_parts)}")

    lines.extend(_format_contact_info(org))

    description = org.get("description")
    if description:
        lines += ["", "### Description", description]

    parent = org.get("parent_org")
    if parent:
        lines += [
            "",
            "### Parent Organization",
            f"- **Name**: {parent.get('name', 'Unknown')}",
            f"- **ID**: {parent.get('id', 'Unknown')}",
        ]

    return "\n".join(lines)
```
--------------------------------------------------------------------------------
/tests/tdd/utils/test_request_cache.py:
--------------------------------------------------------------------------------
```python
"""Tests for request caching utilities."""
import asyncio
import pytest
from biomcp.utils.request_cache import (
clear_cache,
get_cached,
request_cache,
set_cached,
)
class TestRequestCache:
    """Test request caching functionality."""

    @pytest.fixture(autouse=True)
    async def clear_cache_before_test(self):
        """Start and finish every test with an empty cache."""
        await clear_cache()
        yield
        await clear_cache()

    @pytest.mark.asyncio
    async def test_basic_caching(self):
        """Test basic cache get/set operations."""
        # Cache starts empty for this key.
        assert await get_cached("test_key") is None

        await set_cached("test_key", "test_value", ttl=10)

        # The stored value round-trips.
        assert await get_cached("test_key") == "test_value"

    @pytest.mark.asyncio
    async def test_cache_expiry(self):
        """Test that cached values expire."""
        await set_cached("test_key", "test_value", ttl=0.1)

        # Readable right away...
        assert await get_cached("test_key") == "test_value"

        # ...but gone once the TTL has elapsed.
        await asyncio.sleep(0.2)
        assert await get_cached("test_key") is None

    @pytest.mark.asyncio
    async def test_request_cache_decorator(self):
        """Test the @request_cache decorator."""
        invocations = 0

        @request_cache(ttl=10)
        async def expensive_function(arg1, arg2):
            nonlocal invocations
            invocations += 1
            return f"{arg1}-{arg2}-{invocations}"

        # A fresh argument pair executes the wrapped function.
        assert await expensive_function("a", "b") == "a-b-1"
        assert invocations == 1

        # Repeating the call serves the cached value without re-running.
        assert await expensive_function("a", "b") == "a-b-1"
        assert invocations == 1

        # Different arguments miss the cache and execute again.
        assert await expensive_function("c", "d") == "c-d-2"
        assert invocations == 2

    @pytest.mark.asyncio
    async def test_skip_cache_option(self):
        """Test that skip_cache bypasses caching."""
        invocations = 0

        @request_cache(ttl=10)
        async def cached_function():
            nonlocal invocations
            invocations += 1
            return invocations

        # First call populates the cache.
        assert await cached_function() == 1
        # skip_cache forces a fresh execution.
        assert await cached_function(skip_cache=True) == 2
        # The value cached by the first call is still served.
        assert await cached_function() == 1

    @pytest.mark.asyncio
    async def test_none_values_not_cached(self):
        """Test that None return values are not cached."""
        invocations = 0

        @request_cache(ttl=10)
        async def sometimes_none_function(return_none=False):
            nonlocal invocations
            invocations += 1
            return None if return_none else invocations

        # A None result is not stored...
        assert await sometimes_none_function(return_none=True) is None
        assert invocations == 1

        # ...so the next identical call executes again.
        assert await sometimes_none_function(return_none=True) is None
        assert invocations == 2

        # A real value is cached as usual.
        assert await sometimes_none_function(return_none=False) == 3
        assert invocations == 3

        assert await sometimes_none_function(return_none=False) == 3
        assert invocations == 3
```
--------------------------------------------------------------------------------
/docs/blog/ai-assisted-clinical-trial-search-analysis.md:
--------------------------------------------------------------------------------
```markdown
# AI-Assisted Clinical Trial Search: How BioMCP Transforms Research
Finding the right clinical trial for a research project has traditionally been
a complex process requiring specialized knowledge of database syntax and
medical terminology. BioMCP is changing this landscape by making clinical trial
data accessible through natural language conversation.
Video Link:
[](https://www.youtube.com/watch?v=jqGXXnVesjg&list=PLu1amIF_MEfPWhhEsXSuBi90S_xtmVJIW&index=2)
## Breaking Down the Barriers to Clinical Trial Information
BioMCP serves as a specialized Model Context Protocol (MCP) server that
empowers AI assistants and agents with tools to interact with critical
biomedical resources. For clinical trials specifically, BioMCP connects to the
ClinicalTrials.gov API, allowing researchers and clinicians to search and
retrieve trial information through simple conversational queries.
The power of this approach becomes apparent when we look at how it transforms a
complex search requirement. Imagine needing to find active clinical trials for
pembrolizumab (a cancer immunotherapy drug) specifically for non-small cell
lung carcinoma near Cleveland, Ohio. Traditionally, this would require:
1. Navigating to ClinicalTrials.gov
2. Understanding the proper search fields and syntax
3. Creating multiple filters for intervention (pembrolizumab), condition (
non-small cell lung carcinoma), status (recruiting), and location (Cleveland
area)
4. Interpreting the results
## From Natural Language to Precise Database Queries
With BioMCP, this entire process is streamlined into a simple natural language
request. The underlying large language model (LLM) interprets the query,
identifies the key entities (drug name, cancer type, location), and translates
these into the precise parameters needed for the ClinicalTrials.gov API.
The system returns relevant trials that match all criteria, presenting them in
an easy-to-understand format. But the interaction doesn't end there—BioMCP
maintains context throughout the conversation, enabling follow-up questions
like:
- Where exactly are these trials located and how far are they from downtown
Cleveland?
- What biomarker eligibility criteria do these trials require?
- Are there exclusion criteria I should be aware of?
For each of these questions, BioMCP calls the appropriate tool (trial
locations, trial protocols) and processes the information to provide meaningful
answers without requiring the user to navigate different interfaces or learn
new query languages.
## Beyond Basic Search: Understanding Trial Details
What truly sets BioMCP apart is its ability to go beyond simple listings. When
asked about biomarker eligibility criteria, the system can extract this
information from the full trial protocol, synthesize it, and present a clear
summary of requirements. This capability transforms what would typically be
hours of reading dense clinical documentation into a conversational exchange
that delivers precisely what the researcher needs.
## Transforming Clinical Research Workflows
The implications for clinical research are significant. By lowering the
technical barriers to accessing trial information, BioMCP can help:
- Researchers understand the landscape of current research in their field
- Research teams identify promising studies more efficiently
- Clinical research organizations track competing or complementary trials
- Research coordinators identify potential recruitment sites based on location
As part of the broader BioMCP ecosystem—which also includes access to genomic
variant information and PubMed literature—this clinical trial search capability
represents a fundamental shift in how we interact with biomedical information.
By bringing the power of natural language processing to specialized databases,
BioMCP is helping to democratize access to critical health information and
accelerate the research process.
```
--------------------------------------------------------------------------------
/src/biomcp/utils/query_utils.py:
--------------------------------------------------------------------------------
```python
"""Utilities for query parsing and manipulation."""
import re
from typing import Any
def parse_or_query(query: str) -> list[str]:
    """Parse OR query into individual search terms.

    Handles formats like:
    - "term1 OR term2"
    - 'term1 OR term2 OR "term with spaces"'
    - "TERM1 or term2 or term3" (case insensitive)

    Args:
        query: Query string that may contain OR operators

    Returns:
        List of individual search terms with quotes and whitespace cleaned

    Examples:
        >>> parse_or_query("PD-L1 OR CD274")
        ['PD-L1', 'CD274']
        >>> parse_or_query('BRAF OR "v-raf murine" OR ARAF')
        ['BRAF', 'v-raf murine', 'ARAF']
    """
    # Split on whitespace-delimited OR, any letter case.
    raw_terms = re.split(r"\s+OR\s+", query, flags=re.IGNORECASE)
    # Strip whitespace and any surrounding single/double quotes.
    stripped = (t.strip().strip('"').strip("'").strip() for t in raw_terms)
    # Drop terms that collapsed to nothing.
    return [t for t in stripped if t]
def contains_or_operator(query: str) -> bool:
    """Check if a query contains OR operators.

    Uses the same case-insensitive, whitespace-delimited pattern that
    ``parse_or_query`` splits on, so any query this function accepts is
    guaranteed to split into multiple terms. (The previous literal check
    for " OR "/" or " missed mixed-case operators like " Or " and
    tab-separated operators, which ``parse_or_query`` would have split.)

    Args:
        query: Query string to check

    Returns:
        True if the query contains an OR operator surrounded by
        whitespace (any letter case), False otherwise
    """
    return re.search(r"\s+OR\s+", query, flags=re.IGNORECASE) is not None
async def search_with_or_support(
    query: str,
    search_func: Any,
    search_params: dict[str, Any],
    id_field: str = "id",
    fallback_id_field: str | None = None,
) -> dict[str, Any]:
    """Generic OR query search handler.

    Runs one search per OR-term and merges the results, dropping
    duplicates by ID.

    Args:
        query: Query string that may contain OR operators
        search_func: Async search function to call for each term
        search_params: Base parameters to pass to search function (excluding the query term)
        id_field: Primary field name for deduplication (default: "id")
        fallback_id_field: Alternative field name if primary is missing

    Returns:
        Combined results from all searches with duplicates removed
    """
    # Split into terms only when an OR operator is actually present.
    terms = parse_or_query(query) if contains_or_operator(query) else [query]

    deduped: dict[Any, Any] = {}
    total_found = 0

    # Response shapes differ per API; probe these keys in priority order.
    known_keys = (
        "biomarkers",
        "organizations",
        "interventions",
        "diseases",
        "data",
        "items",
    )

    for term in terms:
        try:
            response = await search_func(**{**search_params, "name": term})

            items_key = next((k for k in known_keys if k in response), None)
            if items_key is None:
                continue

            # Keep the first occurrence of each ID.
            for entry in response.get(items_key, []):
                entry_id = entry.get(id_field)
                if not entry_id and fallback_id_field:
                    entry_id = entry.get(fallback_id_field)
                if entry_id and entry_id not in deduped:
                    deduped[entry_id] = entry

            total_found += response.get("total", 0)
        except Exception as e:
            # Log the error and continue with the remaining terms.
            import logging

            logging.getLogger(__name__).warning(
                f"Failed to search for term '{term}': {e}"
            )
            continue

    unique_items = list(deduped.values())

    return {
        "items": unique_items,
        "total": len(unique_items),
        "search_terms": terms,
        "total_found_across_terms": total_found,
    }
```
--------------------------------------------------------------------------------
/tests/tdd/test_endpoint_documentation.py:
--------------------------------------------------------------------------------
```python
"""Test that endpoint documentation is kept up to date."""
import subprocess
import sys
from pathlib import Path
class TestEndpointDocumentation:
    """Test the endpoint documentation generation."""

    @staticmethod
    def _repo_root() -> Path:
        """Repository root: three levels above this test module."""
        return Path(__file__).parent.parent.parent

    def test_third_party_endpoints_file_exists(self):
        """Test that THIRD_PARTY_ENDPOINTS.md exists."""
        endpoints_file = self._repo_root() / "THIRD_PARTY_ENDPOINTS.md"
        assert endpoints_file.exists(), "THIRD_PARTY_ENDPOINTS.md must exist"

    def test_endpoints_documentation_is_current(self):
        """Test that the endpoints documentation can be generated without errors."""
        script_path = self._repo_root() / "scripts" / "generate_endpoints_doc.py"
        result = subprocess.run(  # noqa: S603
            [sys.executable, str(script_path)],
            capture_output=True,
            text=True,
            check=False,
        )
        assert result.returncode == 0, f"Script failed: {result.stderr}"
        # A successful run either reports the generated file or prints nothing.
        assert (
            "Generated" in result.stdout or result.stdout == ""
        ), f"Unexpected output: {result.stdout}"

    def test_all_endpoints_documented(self):
        """Test that all endpooints in the registry appear in the docs."""
        from biomcp.utils.endpoint_registry import get_registry

        endpoints = get_registry().get_all_endpoints()
        content = (self._repo_root() / "THIRD_PARTY_ENDPOINTS.md").read_text()

        # Every registry key and its URL must be present in the markdown.
        for key, info in endpoints.items():
            assert key in content, f"Endpoint {key} not found in documentation"
            assert (
                info.url in content
            ), f"URL {info.url} not found in documentation"

    def test_documentation_contains_required_sections(self):
        """Test that documentation contains all required sections."""
        content = (self._repo_root() / "THIRD_PARTY_ENDPOINTS.md").read_text()
        required_sections = [
            "# Third-Party Endpoints Used by BioMCP",
            "## Overview",
            "## Endpoints by Category",
            "### Biomedical Literature",
            "### Clinical Trials",
            "### Variant Databases",
            "### Cancer Genomics",
            "## Domain Summary",
            "## Compliance and Privacy",
            "## Network Control",
            "BIOMCP_OFFLINE",
        ]
        for section in required_sections:
            assert (
                section in content
            ), f"Required section '{section}' not found in documentation"

    def test_endpoint_counts_accurate(self):
        """Test that endpoint counts in the overview are accurate."""
        import re

        from biomcp.utils.endpoint_registry import get_registry

        registry = get_registry()
        endpoints = registry.get_all_endpoints()
        domains = registry.get_unique_domains()
        content = (self._repo_root() / "THIRD_PARTY_ENDPOINTS.md").read_text()

        # Parse "connects to N external domains across M endpoints".
        match = re.search(
            r"BioMCP connects to (\d+) external domains across (\d+) endpoints",
            content,
        )
        assert match, "Could not find endpoint counts in overview"
        doc_domains = int(match.group(1))
        doc_endpoints = int(match.group(2))
        assert (
            doc_domains == len(domains)
        ), f"Document says {doc_domains} domains but registry has {len(domains)}"
        assert (
            doc_endpoints == len(endpoints)
        ), f"Document says {doc_endpoints} endpoints but registry has {len(endpoints)}"
```
--------------------------------------------------------------------------------
/src/biomcp/cli/organizations.py:
--------------------------------------------------------------------------------
```python
"""CLI commands for organization search and lookup."""
import asyncio
from typing import Annotated
import typer
from ..integrations.cts_api import CTSAPIError, get_api_key_instructions
from ..organizations import get_organization, search_organizations
from ..organizations.getter import format_organization_details
from ..organizations.search import format_organization_results
# Typer sub-application exposing the `biomcp organization ...` commands.
organization_app = typer.Typer(
    no_args_is_help=True,
    help="Search and retrieve organization information from NCI CTS API",
)
@organization_app.command("search")
def search_organizations_cli(
name: Annotated[
str | None,
typer.Argument(
help="Organization name to search for (partial match supported)"
),
] = None,
org_type: Annotated[
str | None,
typer.Option(
"--type",
help="Type of organization (e.g., industry, academic)",
),
] = None,
city: Annotated[
str | None,
typer.Option(
"--city",
help="City location",
),
] = None,
state: Annotated[
str | None,
typer.Option(
"--state",
help="State location (2-letter code)",
),
] = None,
page_size: Annotated[
int,
typer.Option(
"--page-size",
help="Number of results per page",
min=1,
max=100,
),
] = 20,
page: Annotated[
int,
typer.Option(
"--page",
help="Page number",
min=1,
),
] = 1,
api_key: Annotated[
str | None,
typer.Option(
"--api-key",
help="NCI API key (overrides NCI_API_KEY env var)",
envvar="NCI_API_KEY",
),
] = None,
) -> None:
"""
Search for organizations in the NCI Clinical Trials database.
Examples:
# Search by name
biomcp organization search "MD Anderson"
# Search by type
biomcp organization search --type academic
# Search by location
biomcp organization search --city Boston --state MA
# Combine filters
biomcp organization search Cancer --type industry --state CA
"""
try:
results = asyncio.run(
search_organizations(
name=name,
org_type=org_type,
city=city,
state=state,
page_size=page_size,
page=page,
api_key=api_key,
)
)
output = format_organization_results(results)
typer.echo(output)
except CTSAPIError as e:
if "API key required" in str(e):
typer.echo(get_api_key_instructions())
else:
typer.echo(f"Error: {e}", err=True)
raise typer.Exit(1) from e
except Exception as e:
typer.echo(f"Unexpected error: {e}", err=True)
raise typer.Exit(1) from e
@organization_app.command("get")
def get_organization_cli(
org_id: Annotated[
str,
typer.Argument(help="Organization ID"),
],
api_key: Annotated[
str | None,
typer.Option(
"--api-key",
help="NCI API key (overrides NCI_API_KEY env var)",
envvar="NCI_API_KEY",
),
] = None,
) -> None:
"""
Get detailed information about a specific organization.
Example:
biomcp organization get ORG123456
"""
try:
org_data = asyncio.run(
get_organization(
org_id=org_id,
api_key=api_key,
)
)
output = format_organization_details(org_data)
typer.echo(output)
except CTSAPIError as e:
if "API key required" in str(e):
typer.echo(get_api_key_instructions())
else:
typer.echo(f"Error: {e}", err=True)
raise typer.Exit(1) from e
except Exception as e:
typer.echo(f"Unexpected error: {e}", err=True)
raise typer.Exit(1) from e
```
--------------------------------------------------------------------------------
/tests/bdd/search_variants/test_search.py:
--------------------------------------------------------------------------------
```python
import json
import shlex
from typing import Any
from assertpy import assert_that
from pytest_bdd import parsers, scenarios, then, when
from typer.testing import CliRunner
from biomcp.cli import app
scenarios("search.feature")
runner = CliRunner()
# Field mapping - Updated chromosome key
FIELD_MAP = {
"chromosome": ["chrom"],
"frequency": ["gnomad_exome", "af", "af"],
"gene": ["dbnsfp", "genename"],
"hgvsc": ["dbnsfp", "hgvsc"],
"hgvsp": ["dbnsfp", "hgvsp"],
"cadd": ["cadd", "phred"],
"polyphen": ["dbnsfp", "polyphen2", "hdiv", "pred"],
"position": ["vcf", "position"],
"rsid": ["dbsnp", "rsid"],
"sift": ["dbnsfp", "sift", "pred"],
"significance": ["clinvar", "rcv", "clinical_significance"],
"uniprot_id": ["mutdb", "uniprot_id"],
}
def get_value(data: dict, key: str) -> Any | None:
"""Extract value from nested dictionary using field mapping."""
key_path = FIELD_MAP.get(key, [key])
current_value = data.get("hits")
for key in key_path:
if isinstance(current_value, dict):
current_value = current_value.get(key)
elif isinstance(current_value, list):
current_value = current_value[0].get(key)
if current_value and isinstance(current_value, list):
return current_value[0]
return current_value
# --- @when Step ---
@when(
    parsers.re(r'I run "(?P<command>.*?)"(?: #.*)?$'),
    target_fixture="variants_data",
)
def variants_data(command) -> dict:
    """Run variant search command with --json and return parsed results."""
    cli_args = shlex.split(command)[1:]  # trim 'biomcp'
    cli_args.append("--json")
    # Keep result sets small unless the scenario specified a size.
    if "--size" not in cli_args:
        cli_args += ["--size", "10"]

    outcome = runner.invoke(app, cli_args, catch_exceptions=False)
    assert outcome.exit_code == 0, "CLI command failed"
    return json.loads(outcome.stdout)
def normalize(v):
    """Coerce a string to float, then int, falling back to lowercasing."""
    for cast in (float, int):
        try:
            return cast(v)
        except ValueError:
            continue
    return v.lower()
@then(
    parsers.re(
        r"each variant should have (?P<field>\w+) that (?P<operator>(?:is|equal|to|contains|greater|less|than|or|\s)+)\s+(?P<expected>.+)$"
    )
)
def check_variant_field(it, variants_data, field, operator, expected):
    """
    For each variant, apply an assertpy operator against a given field.

    Supports operator names with spaces (e.g. "is equal to") or underscores (e.g. "is_equal_to").
    """
    # Canonicalize the operator into an assertpy method name.
    operator = operator.strip().lower().replace(" ", "_")

    passed: set = set()
    failed: set = set()
    for v_num, value in it(FIELD_MAP, variants_data, field):
        expected = normalize(expected)
        assertion = getattr(assert_that(normalize(value)), operator)
        try:
            assertion(expected)
            passed.add(v_num)
        except AssertionError:
            failed.add(v_num)

    # A variant passes if any one of its values satisfied the check.
    failed -= passed
    assert not failed, f"Failure: {field} {operator} {expected}"
@then(
    parsers.re(
        r"the number of variants (?P<operator>(?:is|equal|to|contains|greater|less|than|or|\s)+)\s+(?P<expected>\d+)$"
    )
)
def number_of_variants_check(variants_data, operator, expected):
    """Check the number of variants returned."""
    is_error_payload = (
        isinstance(variants_data, list)
        and len(variants_data) == 1
        and "error" in variants_data[0]
    )
    if is_error_payload:
        # An error response carries no variants.
        count = 0
    elif isinstance(variants_data, dict) and "variants" in variants_data:
        # New format with cBioPortal summary.
        count = len(variants_data["variants"])
    elif isinstance(variants_data, dict) and "hits" in variants_data:
        # myvariant.info response format.
        count = len(variants_data["hits"])
    elif isinstance(variants_data, list):
        count = len(variants_data)
    else:
        count = 0

    op_name = operator.strip().lower().replace(" ", "_")
    getattr(assert_that(count), op_name)(int(expected))
--------------------------------------------------------------------------------
/src/biomcp/cli/diseases.py:
--------------------------------------------------------------------------------
```python
"""CLI commands for disease information and search."""
import asyncio
from typing import Annotated
import typer
from ..diseases import get_disease
from ..diseases.search import format_disease_results, search_diseases
from ..integrations.cts_api import CTSAPIError, get_api_key_instructions
# Typer sub-application exposing the `biomcp disease ...` commands.
disease_app = typer.Typer(
    no_args_is_help=True,
    help="Search and retrieve disease information",
)
@disease_app.command("get")
def get_disease_cli(
disease_name: Annotated[
str,
typer.Argument(help="Disease name or identifier"),
],
) -> None:
"""
Get disease information from MyDisease.info.
This returns detailed information including synonyms, definitions,
and database cross-references.
Examples:
biomcp disease get melanoma
biomcp disease get "lung cancer"
biomcp disease get GIST
"""
result = asyncio.run(get_disease(disease_name))
typer.echo(result)
@disease_app.command("search")
def search_diseases_cli(
name: Annotated[
str | None,
typer.Argument(
help="Disease name to search for (partial match supported)"
),
] = None,
include_synonyms: Annotated[
bool,
typer.Option(
"--synonyms/--no-synonyms",
help="[Deprecated] This option is ignored - API always searches synonyms",
),
] = True,
category: Annotated[
str | None,
typer.Option(
"--category",
help="Disease category/type filter",
),
] = None,
page_size: Annotated[
int,
typer.Option(
"--page-size",
help="Number of results per page",
min=1,
max=100,
),
] = 20,
page: Annotated[
int,
typer.Option(
"--page",
help="Page number",
min=1,
),
] = 1,
api_key: Annotated[
str | None,
typer.Option(
"--api-key",
help="NCI API key (overrides NCI_API_KEY env var)",
envvar="NCI_API_KEY",
),
] = None,
source: Annotated[
str,
typer.Option(
"--source",
help="Data source: 'mydisease' (default) or 'nci'",
show_choices=True,
),
] = "mydisease",
) -> None:
"""
Search for diseases in MyDisease.info or NCI CTS database.
The NCI source provides controlled vocabulary of cancer conditions
used in clinical trials, with official terms and synonyms.
Examples:
# Search MyDisease.info (default)
biomcp disease search melanoma
# Search NCI cancer terms
biomcp disease search melanoma --source nci
# Search without synonyms
biomcp disease search "breast cancer" --no-synonyms --source nci
# Filter by category
biomcp disease search --category neoplasm --source nci
"""
if source == "nci":
# Use NCI CTS API
try:
results = asyncio.run(
search_diseases(
name=name,
include_synonyms=include_synonyms,
category=category,
page_size=page_size,
page=page,
api_key=api_key,
)
)
output = format_disease_results(results)
typer.echo(output)
except CTSAPIError as e:
if "API key required" in str(e):
typer.echo(get_api_key_instructions())
else:
typer.echo(f"Error: {e}", err=True)
raise typer.Exit(1) from e
except Exception as e:
typer.echo(f"Unexpected error: {e}", err=True)
raise typer.Exit(1) from e
else:
# Default to MyDisease.info
# For now, just search by name
if name:
result = asyncio.run(get_disease(name))
typer.echo(result)
else:
typer.echo("Please provide a disease name to search for.")
raise typer.Exit(1)
```
--------------------------------------------------------------------------------
/tests/tdd/test_mcp_tools.py:
--------------------------------------------------------------------------------
```python
"""Tests for MCP tool wrappers."""
import json
from unittest.mock import patch
import pytest
from biomcp.articles.search import _article_searcher
class TestArticleSearcherMCPTool:
    """Test the _article_searcher MCP tool."""

    # Dotted path of the optimized implementation the MCP wrapper delegates to.
    _TARGET = "biomcp.articles.search_optimized.article_searcher_optimized"

    @pytest.mark.asyncio
    async def test_article_searcher_with_all_params(self):
        """Test article_searcher with all parameters."""
        mock_results = [{"title": "Test Article", "pmid": 12345}]

        with patch(self._TARGET) as mock_search:
            mock_search.return_value = json.dumps(mock_results)

            await _article_searcher(
                call_benefit="Testing search functionality",
                chemicals="aspirin,ibuprofen",
                diseases="cancer,diabetes",
                genes="BRAF,TP53",
                keywords="mutation,therapy",
                variants="V600E,R175H",
                include_preprints=True,
            )

            # Exactly one delegated call, with every parameter forwarded.
            mock_search.assert_called_once()
            kwargs = mock_search.call_args[1]
            forwarded = {
                "call_benefit": "Testing search functionality",
                "chemicals": "aspirin,ibuprofen",
                "diseases": "cancer,diabetes",
                "genes": "BRAF,TP53",
                "keywords": "mutation,therapy",
                "variants": "V600E,R175H",
            }
            for param, value in forwarded.items():
                assert kwargs[param] == value
            assert kwargs["include_preprints"] is True
            assert kwargs.get("include_cbioportal", True) is True

    @pytest.mark.asyncio
    async def test_article_searcher_with_lists(self):
        """Test article_searcher with list inputs."""
        with patch(self._TARGET) as mock_search:
            mock_search.return_value = "## Results"

            await _article_searcher(
                call_benefit="Testing with lists",
                chemicals=["drug1", "drug2"],
                diseases=["disease1"],
                genes=["GENE1"],
                include_preprints=False,
            )

            # List-valued parameters are forwarded unchanged.
            kwargs = mock_search.call_args[1]
            assert kwargs["call_benefit"] == "Testing with lists"
            assert kwargs["chemicals"] == ["drug1", "drug2"]
            assert kwargs["diseases"] == ["disease1"]
            assert kwargs["genes"] == ["GENE1"]
            assert kwargs["include_preprints"] is False

    @pytest.mark.asyncio
    async def test_article_searcher_minimal_params(self):
        """Test article_searcher with minimal parameters."""
        with patch(self._TARGET) as mock_search:
            mock_search.return_value = "## No results"

            await _article_searcher(call_benefit="Minimal test")

            # Only the call benefit is set; every search field stays None.
            kwargs = mock_search.call_args[1]
            assert kwargs["call_benefit"] == "Minimal test"
            for param in ("chemicals", "diseases", "genes", "keywords", "variants"):
                assert kwargs.get(param) is None

    @pytest.mark.asyncio
    async def test_article_searcher_empty_strings(self):
        """Test article_searcher with empty strings."""
        with patch(self._TARGET) as mock_search:
            mock_search.return_value = "## Results"

            await _article_searcher(
                call_benefit="Empty string test",
                chemicals="",
                diseases="",
                genes="",
            )

            # Empty strings pass through untouched rather than being dropped.
            kwargs = mock_search.call_args[1]
            assert kwargs["call_benefit"] == "Empty string test"
            assert kwargs["chemicals"] == ""
            assert kwargs["diseases"] == ""
            assert kwargs["genes"] == ""
--------------------------------------------------------------------------------
/docs/tutorials/remote-connection.md:
--------------------------------------------------------------------------------
```markdown
# Connecting to Remote BioMCP
This guide walks you through connecting Claude to the remote BioMCP server, providing instant access to biomedical research tools without any local installation.
## Overview
The remote BioMCP server (https://remote.biomcp.org/mcp) provides cloud-hosted access to all BioMCP tools. This eliminates the need for local installation while maintaining full functionality.
!!! success "Benefits of Remote Connection"

    - **No Installation Required**: Start using BioMCP immediately
    - **Always Up-to-Date**: Automatically receive the latest features and improvements
    - **Cloud-Powered**: Leverage server-side resources for faster searches
    - **Secure Authentication**: Uses Google OAuth for secure access
!!! info "Privacy Notice"
We log user emails and queries to improve the service. All data is handled according to our privacy policy.
## Step-by-Step Setup
### Step 1: Access Custom Connectors
Navigate to the **Custom Connectors** section in your Claude interface. This is where you'll configure the connection to BioMCP.

### Step 2: Add Custom Connector
Click the **Add Custom Connector** button and enter the following details:
- **Name**: BioMCP
- **URL**: `https://remote.biomcp.org/mcp`

### Step 3: Verify Connector is Enabled
After adding, you should see BioMCP listed with an "Enabled" status. This confirms the connector was added successfully.

### Step 4: Connect to BioMCP
Return to the main Connectors section where you'll now see BioMCP available for connection. Click the **Connect** button.

### Step 5: Authenticate with Google
You'll be redirected to Google OAuth for authentication. Sign in with any valid Google account. This step ensures secure access to the service.

!!! note "Authentication"

    - Any valid Google account works
    - Your email is logged for service improvement
    - Authentication is handled securely through Google OAuth
### Step 6: Connection Success
Once authenticated, you'll see a successful connection message displaying the available tool count. As of January 2025, there are 23 tools available (this number may increase as new features are added).

## Verifying Your Connection
After successful connection, you can verify BioMCP is working by asking Claude:
```
What tools do you have available from BioMCP?
```
Claude should list the available tools including:
- Article search and retrieval (PubMed/PubTator3)
- Clinical trials search (ClinicalTrials.gov and NCI)
- Variant analysis (MyVariant.info)
- Gene, drug, and disease information
- Sequential thinking for complex research
## Troubleshooting
### Connection Failed
- Ensure you entered the URL exactly as shown: `https://remote.biomcp.org/mcp`
- Check your internet connection
- Try disconnecting and reconnecting
### Authentication Issues
- Make sure you're using a valid Google account
- Clear your browser cache if authentication hangs
- Try using a different browser if issues persist
### Tools Not Available
- Disconnect and reconnect to BioMCP
- Refresh your Claude session
- Contact support if tools remain unavailable
## Next Steps
Now that you're connected to BioMCP, you can:
1. **Search biomedical literature**: "Find recent papers on BRAF mutations in melanoma"
2. **Analyze clinical trials**: "What trials are recruiting for lung cancer with EGFR mutations?"
3. **Interpret variants**: "What is the clinical significance of TP53 p.R273H?"
4. **Explore drug information**: "Tell me about pembrolizumab's mechanism and indications"
## Support
For issues or questions about the remote BioMCP connection:
- GitHub Issues: [https://github.com/genomoncology/biomcp/issues](https://github.com/genomoncology/biomcp/issues)
- Documentation: [https://biomcp.org](https://biomcp.org)
```
--------------------------------------------------------------------------------
/tests/config/test_smithery_config.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python
"""
Test script to validate Smithery configuration against actual function implementations.
This script checks that the schema definitions in smithery.yaml match the expected
function parameters in your codebase.
"""
import os
from typing import Any
import pytest
import yaml
from pydantic import BaseModel
from biomcp.articles.search import PubmedRequest
# Import the functions we want to test
from biomcp.trials.search import TrialQuery
from biomcp.variants.search import VariantQuery
@pytest.fixture
def smithery_config():
    """Parse and return the project's smithery.yaml as a dict."""
    # Walk two levels up from this test module to reach the repo root.
    here = os.path.dirname(__file__)
    repo_root = os.path.abspath(os.path.join(here, "..", ".."))
    with open(os.path.join(repo_root, "smithery.yaml")) as handle:
        return yaml.safe_load(handle)
def test_smithery_config(smithery_config):
    """Every tool schema in smithery.yaml must match its function's parameters."""
    # (tool name, parameter name, expected parameter type)
    expected = [
        ("trial_searcher", "query", TrialQuery),
        ("variant_searcher", "query", VariantQuery),
        ("article_searcher", "query", PubmedRequest),
        ("trial_protocol", "nct_id", str),
        ("trial_locations", "nct_id", str),
        ("trial_outcomes", "nct_id", str),
        ("trial_references", "nct_id", str),
        ("article_details", "pmid", str),
        ("variant_details", "variant_id", str),
    ]
    for tool_name, param_name, param_type in expected:
        validate_tool_schema(
            smithery_config,
            tool_name,
            {"param_name": param_name, "expected_type": param_type},
        )
def validate_tool_schema(
    smithery_config, tool_name: str, param_info: dict[str, Any]
):
    """Validate that a tool's schema in smithery.yaml matches the expected parameter.

    Args:
        smithery_config: Parsed smithery.yaml contents.
        tool_name: Name of the tool entry to check.
        param_info: Mapping with "param_name" and "expected_type" keys.

    Raises:
        AssertionError: If the tool, its input schema, or its required
            parameters do not match the expected definition.
    """
    param_name = param_info["param_name"]
    expected_type = param_info["expected_type"]
    is_model = issubclass(expected_type, BaseModel)
    # Pydantic-model parameters are always exposed under the name "query".
    required_param = "query" if is_model else param_name

    # Check if the tool is defined in the smithery.yaml
    assert tool_name in smithery_config.get(
        "tools", {}
    ), f"Tool '{tool_name}' is not defined in smithery.yaml"
    tool_config = smithery_config["tools"][tool_name]

    # Check if the tool has an input schema
    assert (
        "input" in tool_config
    ), f"Tool '{tool_name}' does not have an input schema defined"
    input_schema = tool_config["input"].get("schema", {})

    # The parameter must be listed as required. This check was previously
    # duplicated across two branches that differed only in the hardcoded
    # "query" name; unified here without changing behavior.
    assert (
        "required" in input_schema
    ), f"Tool '{tool_name}' does not have required parameters specified"
    assert (
        required_param in input_schema.get("required", [])
    ), f"Parameter '{required_param}' for tool '{tool_name}' is not marked as required"

    # For complex types (Pydantic models), the schema must reference the
    # correct model by name via $ref.
    if is_model:
        properties = input_schema.get("properties", {})
        assert (
            "query" in properties
        ), f"Tool '{tool_name}' does not have a 'query' property defined"
        query_prop = properties["query"]
        assert (
            "$ref" in query_prop
        ), f"Tool '{tool_name}' query property does not reference a schema"
        schema_ref = query_prop["$ref"]
        expected_schema_name = expected_type.__name__
        assert schema_ref.endswith(
            expected_schema_name
        ), f"Tool '{tool_name}' references incorrect schema: {schema_ref}, expected: {expected_schema_name}"
```
--------------------------------------------------------------------------------
/scripts/check_http_imports.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""Check for direct HTTP library imports outside of allowed files."""
import ast
import sys
from pathlib import Path
# HTTP libraries to check for
HTTP_LIBRARIES = {
"httpx",
"aiohttp",
"requests",
"urllib3",
} # Note: urllib is allowed for URL parsing
# Files allowed to import HTTP libraries
ALLOWED_FILES = {
"http_client.py",
"http_client_simple.py",
"http_client_test.py",
"test_http_client.py",
"connection_pool.py", # Connection pooling infrastructure
}
# Additional allowed patterns (for version checks, etc.)
ALLOWED_PATTERNS = {
# Allow httpx import just for version check
("health.py", "httpx"): "version check only",
}
def _check_import_node(
    node: ast.Import, file_name: str
) -> set[tuple[str, int]]:
    """Return (library, line) violations found in a plain ``import`` node."""
    found: set[tuple[str, int]] = set()
    for alias in node.names:
        top_level = alias.name.partition(".")[0]
        if top_level in HTTP_LIBRARIES and (
            (file_name, top_level) not in ALLOWED_PATTERNS
        ):
            found.add((top_level, node.lineno))
    return found
def _check_import_from_node(
    node: ast.ImportFrom, file_name: str
) -> set[tuple[str, int]]:
    """Return (library, line) violations found in a ``from ... import`` node."""
    # Relative imports ("from . import x") have node.module == None.
    if not node.module:
        return set()
    top_level = node.module.partition(".")[0]
    if top_level in HTTP_LIBRARIES and (
        (file_name, top_level) not in ALLOWED_PATTERNS
    ):
        return {(top_level, node.lineno)}
    return set()
def check_imports(file_path: Path) -> set[tuple[str, int]]:
    """Scan one Python file for direct HTTP-library imports.

    Returns a set of (library, line_number) tuples; empty when the file is
    on the allow-list or cannot be read/parsed.
    """
    # Allow-listed files may import HTTP libraries directly.
    if file_path.name in ALLOWED_FILES:
        return set()

    found: set[tuple[str, int]] = set()
    try:
        source = file_path.read_text(encoding="utf-8")
        for node in ast.walk(ast.parse(source)):
            if isinstance(node, ast.Import):
                found |= _check_import_node(node, file_path.name)
            elif isinstance(node, ast.ImportFrom):
                found |= _check_import_from_node(node, file_path.name)
    except Exception as e:
        # Best effort: report the parse failure but keep linting other files.
        print(f"Error parsing {file_path}: {e}", file=sys.stderr)
    return found
def find_python_files(root_dir: Path) -> list[Path]:
    """Collect project Python files under *root_dir*.

    Hidden directories, ``__pycache__``, and virtualenv folders are skipped.
    """
    skip_names = {"__pycache__", "venv", "env", ".tox"}

    def _excluded(path: Path) -> bool:
        # Exclude when any path component is hidden or a known junk dir.
        return any(
            part.startswith(".") or part in skip_names for part in path.parts
        )

    return [p for p in root_dir.rglob("*.py") if not _excluded(p)]
def main():
    """Lint all files under src/ for direct HTTP imports; return exit code."""
    # Project root is the parent of the scripts/ directory.
    project_root = Path(__file__).parent.parent
    src_dir = project_root / "src"

    all_violations = [
        (file_path, lib, line)
        for file_path in find_python_files(src_dir)
        for lib, line in check_imports(file_path)
    ]

    if not all_violations:
        print("✅ No direct HTTP library imports found outside allowed files.")
        return 0

    print("❌ Found direct HTTP library imports:\n")
    for file_path, lib, line in sorted(all_violations):
        rel_path = file_path.relative_to(project_root)
        print(f" {rel_path}:{line} - imports '{lib}'")
    print(f"\n❌ Total violations: {len(all_violations)}")
    print(
        "\nPlease use the centralized HTTP client (biomcp.http_client) instead."
    )
    print(
        "If you need to add an exception, update ALLOWED_FILES or ALLOWED_PATTERNS in this script."
    )
    return 1


if __name__ == "__main__":
    sys.exit(main())
```
--------------------------------------------------------------------------------
/src/biomcp/variants/cbioportal_search_helpers.py:
--------------------------------------------------------------------------------
```python
"""Helper functions for cBioPortal search to reduce complexity."""
import logging
import re
from typing import Any
from .cbioportal_search import GeneHotspot
logger = logging.getLogger(__name__)
async def process_mutation_results(
    mutation_results: list[tuple[Any, str]],
    cancer_types_lookup: dict[str, dict[str, Any]],
    client: Any,
) -> dict[str, Any]:
    """Process mutation results from multiple studies.

    Args:
        mutation_results: List of (result, study_id) tuples; a result may be
            an Exception when the per-study fetch failed.
        cancer_types_lookup: Cancer type lookup dictionary
        client: Client instance for API calls

    Returns:
        Dictionary with aggregated mutation data
    """
    total_mutations = 0
    total_samples = 0
    hotspot_counts: dict[str, dict[str, Any]] = {}
    cancer_distribution: dict[str, int] = {}
    studies_with_data = 0

    for result, study_id in mutation_results:
        # Per-study failures arrive as exceptions; skip them quietly.
        if isinstance(result, Exception):
            logger.debug(f"Failed to get mutations for {study_id}: {result}")
            continue
        if not result or "mutations" not in result:
            continue

        mutations = result["mutations"]
        # Robustness fix: a result lacking "sample_count" previously raised
        # KeyError and aborted the entire aggregation; default to 0 instead.
        sample_count = result.get("sample_count", 0)
        if not mutations:
            continue

        studies_with_data += 1
        # A sample may carry several mutations; count unique samples only.
        unique_samples = {
            m.get("sampleId") for m in mutations if m.get("sampleId")
        }
        total_mutations += len(unique_samples)
        total_samples += sample_count

        # Process mutations for hotspots and cancer types
        study_cancer_type = await client._get_study_cancer_type(
            study_id, cancer_types_lookup
        )
        _update_hotspot_counts(mutations, hotspot_counts, study_cancer_type)
        _update_cancer_distribution(
            mutations, cancer_distribution, study_cancer_type
        )

    return {
        "total_mutations": total_mutations,
        "total_samples": total_samples,
        "studies_with_data": studies_with_data,
        "hotspot_counts": hotspot_counts,
        "cancer_distribution": cancer_distribution,
    }
def _update_hotspot_counts(
mutations: list[dict[str, Any]],
hotspot_counts: dict[str, dict[str, Any]],
cancer_type: str,
) -> None:
"""Update hotspot counts from mutations."""
for mut in mutations:
protein_change = mut.get("proteinChange", "")
if protein_change:
if protein_change not in hotspot_counts:
hotspot_counts[protein_change] = {
"count": 0,
"cancer_types": set(),
}
hotspot_counts[protein_change]["count"] += 1
hotspot_counts[protein_change]["cancer_types"].add(cancer_type)
def _update_cancer_distribution(
mutations: list[dict[str, Any]],
cancer_distribution: dict[str, int],
cancer_type: str,
) -> None:
"""Update cancer type distribution."""
cancer_distribution[cancer_type] = cancer_distribution.get(
cancer_type, 0
) + len({m.get("sampleId") for m in mutations if m.get("sampleId")})
def format_hotspots(
    hotspot_counts: dict[str, dict[str, Any]], total_mutations: int
) -> list[GeneHotspot]:
    """Build GeneHotspot objects for the five most frequent protein changes."""
    top_five = sorted(
        hotspot_counts.items(), key=lambda item: item[1]["count"], reverse=True
    )[:5]

    hotspots: list[GeneHotspot] = []
    for protein_change, data in top_five:
        # Best-effort parse of the residue position (e.g. 600 from "V600E").
        position = 0
        try:
            digits = re.search(r"(\d+)", protein_change)
            if digits:
                position = int(digits.group(1))
        except Exception:
            logger.debug("Failed to extract position from protein change")

        frequency = (
            data["count"] / total_mutations if total_mutations > 0 else 0.0
        )
        hotspots.append(
            GeneHotspot(
                position=position,
                amino_acid_change=protein_change,
                count=data["count"],
                frequency=frequency,
                cancer_types=list(data["cancer_types"]),
            )
        )
    return hotspots
```
--------------------------------------------------------------------------------
/tests/tdd/workers/test_worker_sanitization.js:
--------------------------------------------------------------------------------
```javascript
/**
* Tests for worker_entry_stytch.js sanitization functionality
*/
const { test } = require("node:test");
const assert = require("node:assert");
// Mock the sanitizeObject function for testing
const SENSITIVE_FIELDS = [
"api_key",
"apiKey",
"api-key",
"token",
"secret",
"password",
];
const sanitizeObject = (obj) => {
if (!obj || typeof obj !== "object") return obj;
// Handle arrays
if (Array.isArray(obj)) {
return obj.map((item) => sanitizeObject(item));
}
// Handle objects
const sanitized = {};
for (const [key, value] of Object.entries(obj)) {
// Check if this key is sensitive
const lowerKey = key.toLowerCase();
if (
SENSITIVE_FIELDS.some((field) => lowerKey.includes(field.toLowerCase()))
) {
sanitized[key] = "[REDACTED]";
} else if (typeof value === "object" && value !== null) {
// Recursively sanitize nested objects
sanitized[key] = sanitizeObject(value);
} else {
sanitized[key] = value;
}
}
return sanitized;
};
// Test cases

// A top-level/nested key matching a sensitive name is redacted; siblings stay intact.
test("should redact api_key field", () => {
  const input = {
    params: {
      arguments: {
        api_key: "AIzaSyB1234567890",
        gene: "BRAF",
        position: 140753336,
      },
    },
  };
  const result = sanitizeObject(input);
  assert.strictEqual(result.params.arguments.api_key, "[REDACTED]");
  assert.strictEqual(result.params.arguments.gene, "BRAF");
  assert.strictEqual(result.params.arguments.position, 140753336);
});

// Redaction applies at every nesting depth, not just the top level.
test("should handle nested sensitive fields", () => {
  const input = {
    outer: {
      token: "secret-token",
      inner: {
        password: "my-password",
        apiKey: "another-key",
        safe_field: "visible",
      },
    },
  };
  const result = sanitizeObject(input);
  assert.strictEqual(result.outer.token, "[REDACTED]");
  assert.strictEqual(result.outer.inner.password, "[REDACTED]");
  assert.strictEqual(result.outer.inner.apiKey, "[REDACTED]");
  assert.strictEqual(result.outer.inner.safe_field, "visible");
});

// Arrays are sanitized element-wise; each object element is recursed into.
test("should handle arrays with sensitive data", () => {
  const input = {
    requests: [
      { api_key: "key1", data: "safe" },
      { api_key: "key2", data: "also safe" },
    ],
  };
  const result = sanitizeObject(input);
  assert.strictEqual(result.requests[0].api_key, "[REDACTED]");
  assert.strictEqual(result.requests[1].api_key, "[REDACTED]");
  assert.strictEqual(result.requests[0].data, "safe");
  assert.strictEqual(result.requests[1].data, "also safe");
});

// Matching is done on the lowercased key, so casing variants are all caught.
test("should be case-insensitive for field names", () => {
  const input = {
    API_KEY: "uppercase",
    Api_Key: "mixed",
    "api-key": "hyphenated",
  };
  const result = sanitizeObject(input);
  assert.strictEqual(result.API_KEY, "[REDACTED]");
  assert.strictEqual(result.Api_Key, "[REDACTED]");
  assert.strictEqual(result["api-key"], "[REDACTED]");
});

// Objects with no sensitive keys come back deeply equal to the input.
test("should not modify non-sensitive fields", () => {
  const input = {
    gene: "TP53",
    chromosome: "chr17",
    position: 7577121,
    reference: "C",
    alternate: "T",
  };
  const result = sanitizeObject(input);
  assert.deepStrictEqual(result, input);
});

// The key name alone triggers redaction — even null/undefined values.
test("should handle null and undefined values", () => {
  const input = {
    api_key: null,
    token: undefined,
    valid: "data",
  };
  const result = sanitizeObject(input);
  assert.strictEqual(result.api_key, "[REDACTED]");
  assert.strictEqual(result.token, "[REDACTED]");
  assert.strictEqual(result.valid, "data");
});

// The worker special-cases the "think" tool by request name.
test("should handle think tool detection", () => {
  const thinkRequest = {
    params: {
      name: "think",
      arguments: {
        thought: "Analyzing the problem...",
        thoughtNumber: 1,
      },
    },
  };
  const toolName = thinkRequest.params?.name;
  assert.strictEqual(toolName, "think");
});

// Search requests can route thinking via domain "think" or "thinking".
test("should handle domain-based filtering", () => {
  const searchRequest1 = {
    params: {
      name: "search",
      arguments: {
        domain: "thinking",
        query: "some query",
      },
    },
  };
  const searchRequest2 = {
    params: {
      name: "search",
      arguments: {
        domain: "think",
        query: "some query",
      },
    },
  };
  const domain1 = searchRequest1.params?.arguments?.domain;
  const domain2 = searchRequest2.params?.arguments?.domain;
  assert.ok(domain1 === "thinking" || domain1 === "think");
  assert.ok(domain2 === "thinking" || domain2 === "think");
});
```
--------------------------------------------------------------------------------
/src/biomcp/cli/interventions.py:
--------------------------------------------------------------------------------
```python
"""CLI commands for intervention search and lookup."""
import asyncio
from typing import Annotated
import typer
from ..integrations.cts_api import CTSAPIError, get_api_key_instructions
from ..interventions import get_intervention, search_interventions
from ..interventions.getter import format_intervention_details
from ..interventions.search import (
INTERVENTION_TYPES,
format_intervention_results,
)
intervention_app = typer.Typer(
no_args_is_help=True,
help="Search and retrieve intervention information from NCI CTS API",
)
@intervention_app.command("search")
def search_interventions_cli(
name: Annotated[
str | None,
typer.Argument(
help="Intervention name to search for (partial match supported)"
),
] = None,
intervention_type: Annotated[
str | None,
typer.Option(
"--type",
help=f"Type of intervention. Options: {', '.join(INTERVENTION_TYPES)}",
show_choices=True,
),
] = None,
synonyms: Annotated[
bool,
typer.Option(
"--synonyms/--no-synonyms",
help="Include synonym matches in search",
),
] = True,
page_size: Annotated[
int,
typer.Option(
"--page-size",
help="Number of results per page",
min=1,
max=100,
),
] = 20,
page: Annotated[
int,
typer.Option(
"--page",
help="Page number",
min=1,
),
] = 1,
api_key: Annotated[
str | None,
typer.Option(
"--api-key",
help="NCI API key (overrides NCI_API_KEY env var)",
envvar="NCI_API_KEY",
),
] = None,
) -> None:
"""
Search for interventions (drugs, devices, procedures) in the NCI database.
Examples:
# Search by drug name
biomcp intervention search pembrolizumab
# Search by type
biomcp intervention search --type Drug
# Search for devices
biomcp intervention search "CAR T" --type Biological
# Search without synonyms
biomcp intervention search imatinib --no-synonyms
"""
try:
results = asyncio.run(
search_interventions(
name=name,
intervention_type=intervention_type,
synonyms=synonyms,
page_size=page_size,
page=page,
api_key=api_key,
)
)
output = format_intervention_results(results)
typer.echo(output)
except CTSAPIError as e:
if "API key required" in str(e):
typer.echo(get_api_key_instructions())
else:
typer.echo(f"Error: {e}", err=True)
raise typer.Exit(1) from e
except Exception as e:
typer.echo(f"Unexpected error: {e}", err=True)
raise typer.Exit(1) from e
@intervention_app.command("get")
def get_intervention_cli(
intervention_id: Annotated[
str,
typer.Argument(help="Intervention ID"),
],
api_key: Annotated[
str | None,
typer.Option(
"--api-key",
help="NCI API key (overrides NCI_API_KEY env var)",
envvar="NCI_API_KEY",
),
] = None,
) -> None:
"""
Get detailed information about a specific intervention.
Example:
biomcp intervention get INT123456
"""
try:
intervention_data = asyncio.run(
get_intervention(
intervention_id=intervention_id,
api_key=api_key,
)
)
output = format_intervention_details(intervention_data)
typer.echo(output)
except CTSAPIError as e:
if "API key required" in str(e):
typer.echo(get_api_key_instructions())
else:
typer.echo(f"Error: {e}", err=True)
raise typer.Exit(1) from e
except Exception as e:
typer.echo(f"Unexpected error: {e}", err=True)
raise typer.Exit(1) from e
@intervention_app.command("types")
def list_intervention_types() -> None:
"""
List all available intervention types.
"""
typer.echo("## Available Intervention Types\n")
for int_type in INTERVENTION_TYPES:
typer.echo(f"- {int_type}")
typer.echo("\nUse these values with the --type option when searching.")
```
--------------------------------------------------------------------------------
/tests/tdd/test_pten_r173_search.py:
--------------------------------------------------------------------------------
```python
"""Test case demonstrating PTEN R173 search limitations."""
import asyncio
import json
import pytest
from biomcp.articles.search import PubmedRequest, search_articles
@pytest.mark.asyncio
async def test_pten_r173_search_limitations():
    """Demonstrate that current AND logic is too restrictive for finding PTEN R173 papers."""
    # Test 1: Current approach with multiple keywords
    request_restrictive = PubmedRequest(
        genes=["PTEN"], keywords=["R173", "Arg173"]
    )
    result_restrictive = await search_articles(
        request_restrictive, output_json=True
    )
    data_restrictive = json.loads(result_restrictive)

    # Test 2: Less restrictive approach
    request_less_restrictive = PubmedRequest(genes=["PTEN"], keywords=["R173"])
    result_less_restrictive = await search_articles(
        request_less_restrictive, output_json=True
    )
    data_less_restrictive = json.loads(result_less_restrictive)

    # Test 3: Alternative variant notations
    request_notation = PubmedRequest(genes=["PTEN"], keywords=["p.R173C"])
    result_notation = await search_articles(request_notation, output_json=True)
    data_notation = json.loads(result_notation)

    print("\nPTEN R173 Search Results:")
    print(
        f"1. PTEN + R173 + Arg173 (AND logic): {len(data_restrictive)} articles"
    )
    print(f"2. PTEN + R173 only: {len(data_less_restrictive)} articles")
    print(f"3. PTEN + p.R173C: {len(data_notation)} articles")

    # The restrictive search should find fewer results
    assert len(data_restrictive) <= len(data_less_restrictive)

    # Show some example articles found
    if data_less_restrictive:
        print("\nExample articles found with 'PTEN + R173':")
        for i, article in enumerate(data_less_restrictive[:5]):
            title = article.get("title", "No title")
            pmid = article.get("pmid", "N/A")
            year = article.get("pub_year", article.get("date", "N/A"))
            # Bug fix: pub_year may be an int; slicing an int raises
            # TypeError, so coerce to str before taking the 4-char prefix.
            print(
                f"{i + 1}. {title[:80]}... (PMID: {pmid}, Year: {str(year)[:4]})"
            )
@pytest.mark.asyncio
async def test_specific_pten_papers_not_found():
    """Test that specific PTEN R173 papers mentioned by user are not found."""
    # Papers mentioned by user that should be found
    expected_papers = [
        "Mester et al 2018 Human Mutation",
        "Mighell et al 2020 AJHG",
        "Smith et al 2016 Proteins",
        "Smith et al 2019 AJHG",
        "Smith et al 2023 JPCB",
    ]

    # Search for Smith IN papers on PTEN
    request = PubmedRequest(keywords=["Smith IN", "PTEN"])
    result = await search_articles(request, output_json=True)
    data = json.loads(result)

    print(f"\nSmith IN + PTEN search found {len(data)} articles")

    # Keep articles whose title or abstract mentions the variant.
    markers = ("R173", "Arg173")
    r173_papers = [
        article
        for article in data
        if any(
            marker in article.get(field, "")
            for marker in markers
            for field in ("title", "abstract")
        )
    ]

    print(f"Papers mentioning R173/Arg173: {len(r173_papers)}")

    # The issue: R173 might only be in full text, not abstract
    assert len(r173_papers) < len(
        expected_papers
    ), "Not all expected R173 papers are found"
def test_and_logic_explanation():
    """Document why AND logic causes issues for variant searches."""
    explanation = """
    Current search behavior:
    - Query: genes=['PTEN'], keywords=['R173', 'Arg173']
    - Translates to: "@GENE_PTEN AND R173 AND Arg173"
    - This requires ALL terms to be present
    Issues:
    1. Papers may use either "R173" OR "Arg173", not both
    2. Variant notations vary: "R173C", "p.R173C", "c.517C>T", etc.
    3. Specific mutation details may only be in full text, not abstract
    4. AND logic is too restrictive for synonym/variant searches
    Potential solutions:
    1. Implement OR logic within variant/keyword groups
    2. Add variant notation normalization
    3. Support multiple search strategies (AND vs OR)
    4. Consider full-text search capabilities
    """
    # Emit the rationale so it appears in captured test output.
    print(explanation)
    assert True  # This test is for documentation
if __name__ == "__main__":
# Run the tests to demonstrate the issue
asyncio.run(test_pten_r173_search_limitations())
asyncio.run(test_specific_pten_papers_not_found())
test_and_logic_explanation()
```
--------------------------------------------------------------------------------
/src/biomcp/interventions/getter.py:
--------------------------------------------------------------------------------
```python
"""Get specific intervention details via NCI CTS API."""
import logging
from typing import Any
from ..constants import NCI_INTERVENTIONS_URL
from ..integrations.cts_api import CTSAPIError, make_cts_request
logger = logging.getLogger(__name__)
async def get_intervention(
    intervention_id: str,
    api_key: str | None = None,
) -> dict[str, Any]:
    """
    Get detailed information about a specific intervention.

    Args:
        intervention_id: Intervention ID
        api_key: Optional API key (if not provided, uses NCI_API_KEY env var)

    Returns:
        Dictionary with intervention details

    Raises:
        CTSAPIError: If the API request fails or intervention not found
    """
    try:
        url = f"{NCI_INTERVENTIONS_URL}/{intervention_id}"
        response = await make_cts_request(url=url, api_key=api_key)
        # Different API versions wrap the payload differently; unwrap
        # whichever envelope key is present, else return the raw response.
        for wrapper_key in ("data", "intervention"):
            if wrapper_key in response:
                return response[wrapper_key]
        return response
    except CTSAPIError:
        # Already a domain error; let it propagate untouched.
        raise
    except Exception as e:
        logger.error(f"Failed to get intervention {intervention_id}: {e}")
        raise CTSAPIError(f"Failed to retrieve intervention: {e!s}") from e
def _format_intervention_header(intervention: dict[str, Any]) -> list[str]:
"""Format intervention header and basic info."""
int_id = intervention.get(
"id", intervention.get("intervention_id", "Unknown")
)
name = intervention.get("name", "Unknown Intervention")
int_type = intervention.get(
"type", intervention.get("category", "Unknown")
)
return [
f"## Intervention: {name}",
"",
"### Basic Information",
f"- **ID**: {int_id}",
f"- **Type**: {int_type}",
]
def _format_intervention_synonyms(synonyms: Any) -> list[str]:
"""Format intervention synonyms section."""
if not synonyms:
return []
lines = ["", "### Synonyms"]
if isinstance(synonyms, list):
for syn in synonyms:
lines.append(f"- {syn}")
else:
lines.append(f"- {synonyms}")
return lines
def _format_intervention_regulatory(intervention: dict[str, Any]) -> list[str]:
"""Format regulatory information section."""
if not intervention.get("fda_approved"):
return []
lines = [
"",
"### Regulatory Status",
f"- **FDA Approved**: {'Yes' if intervention['fda_approved'] else 'No'}",
]
if intervention.get("approval_date"):
lines.append(f"- **Approval Date**: {intervention['approval_date']}")
return lines
def _format_intervention_indications(indications: Any) -> list[str]:
"""Format clinical indications section."""
if not indications:
return []
lines = ["", "### Clinical Indications"]
if isinstance(indications, list):
for indication in indications:
lines.append(f"- {indication}")
else:
lines.append(f"- {indications}")
return lines
def format_intervention_details(intervention: dict[str, Any]) -> str:
    """
    Format intervention details as markdown.

    Args:
        intervention: Intervention data dictionary

    Returns:
        Formatted markdown string
    """
    sections: list[str] = []
    sections.extend(_format_intervention_header(intervention))
    sections.extend(
        _format_intervention_synonyms(intervention.get("synonyms", []))
    )

    # Free-text sections rendered only when the field is present and truthy.
    for key, heading in (
        ("description", "### Description"),
        ("mechanism_of_action", "### Mechanism of Action"),
    ):
        if intervention.get(key):
            sections.extend(["", heading, intervention[key]])

    sections.extend(_format_intervention_regulatory(intervention))
    sections.extend(
        _format_intervention_indications(intervention.get("indications"))
    )

    # Related trials count, if available.
    if intervention.get("trial_count"):
        sections.extend([
            "",
            "### Clinical Trial Activity",
            f"- **Number of Trials**: {intervention['trial_count']}",
        ])

    return "\n".join(sections)
```
--------------------------------------------------------------------------------
/src/biomcp/thinking/session.py:
--------------------------------------------------------------------------------
```python
"""Session management for sequential thinking."""
import uuid
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
@dataclass
class ThoughtEntry:
    """Represents a single thought in the thinking process."""

    thought: str  # free-text content of the thought
    thought_number: int  # position of this thought in the sequence
    total_thoughts: int  # expected total number of thoughts
    next_thought_needed: bool  # whether the thinker intends to continue
    timestamp: datetime = field(default_factory=datetime.now)  # creation time (local, naive)
    is_revision: bool = False  # True when this entry replaces an earlier thought
    revises_thought: int | None = None  # thought_number being revised, if any
    branch_from_thought: int | None = None  # originating thought for a branch
    branch_id: str | None = None  # identifier of the branch this entry belongs to
    metadata: dict[str, Any] = field(default_factory=dict)  # arbitrary extra data
@dataclass
class ThinkingSession:
    """Manages state for a thinking session."""

    session_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    created_at: datetime = field(default_factory=datetime.now)
    thought_history: list[ThoughtEntry] = field(default_factory=list)
    thought_branches: dict[str, list[ThoughtEntry]] = field(
        default_factory=lambda: defaultdict(list)
    )
    metadata: dict[str, Any] = field(default_factory=dict)

    def add_thought(self, entry: ThoughtEntry) -> None:
        """Record *entry*, replacing the original thought when it is a revision."""
        if entry.is_revision and entry.revises_thought:
            for index, existing in enumerate(self.thought_history):
                if existing.thought_number == entry.revises_thought:
                    self.thought_history[index] = entry
                    return
        # Branch thoughts live in their own list; everything else is mainline.
        target = (
            self.thought_branches[entry.branch_id]
            if entry.branch_id
            else self.thought_history
        )
        target.append(entry)

    def get_thought(self, thought_number: int) -> ThoughtEntry | None:
        """Return the mainline thought with the given number, if any."""
        return next(
            (
                entry
                for entry in self.thought_history
                if entry.thought_number == thought_number
            ),
            None,
        )

    def get_branch_thoughts(self, branch_id: str) -> list[ThoughtEntry]:
        """Return all thoughts recorded under *branch_id* (empty if unknown)."""
        return self.thought_branches.get(branch_id, [])

    def get_all_thoughts(self) -> list[ThoughtEntry]:
        """Return every thought (mainline and branches) ordered by timestamp."""
        combined = list(self.thought_history)
        for branch in self.thought_branches.values():
            combined.extend(branch)
        combined.sort(key=lambda entry: entry.timestamp)
        return combined
class SessionManager:
    """Manages multiple thinking sessions."""

    def __init__(self):
        # session_id -> ThinkingSession registry, plus a current-session pointer.
        self.sessions: dict[str, ThinkingSession] = {}
        self._current_session_id: str | None = None

    def create_session(self) -> ThinkingSession:
        """Create, register, and make current a brand-new session."""
        session = ThinkingSession()
        self.sessions[session.session_id] = session
        self._current_session_id = session.session_id
        return session

    def get_session(
        self, session_id: str | None = None
    ) -> ThinkingSession | None:
        """Look up a session by ID, falling back to the current session."""
        lookup_id = session_id or self._current_session_id
        return self.sessions.get(lookup_id) if lookup_id else None

    def get_or_create_session(
        self, session_id: str | None = None
    ) -> ThinkingSession:
        """Return the requested/current session, creating one if needed."""
        if session_id and session_id in self.sessions:
            self._current_session_id = session_id
            return self.sessions[session_id]
        return self.get_session() or self.create_session()

    def clear_session(self, session_id: str | None = None) -> None:
        """Drop one session (by ID or the current one) from the registry."""
        target_id = session_id or self._current_session_id
        if target_id is None:
            return
        self.sessions.pop(target_id, None)
        # Reset the pointer only when the current session was removed.
        if self._current_session_id == target_id:
            self._current_session_id = None

    def clear_all_sessions(self) -> None:
        """Forget every session and reset the current-session pointer."""
        self.sessions.clear()
        self._current_session_id = None
# Global session manager instance
_session_manager = SessionManager()
```
--------------------------------------------------------------------------------
/.github/workflows/ci.yml:
--------------------------------------------------------------------------------
```yaml
name: CI
on:
push:
branches: [main, develop]
pull_request:
branches: [main]
workflow_dispatch:
env:
PYTHON_VERSION: "3.12"
UV_VERSION: "0.4.29"
jobs:
# Quality check from main.yml - uses make check
quality:
runs-on: ubuntu-latest
name: Quality
steps:
- name: Check out
uses: actions/checkout@v6
- uses: actions/cache@v5
with:
path: ~/.cache/pre-commit
key: pre-commit-${{ hashFiles('.pre-commit-config.yaml') }}
- name: Set up Python
uses: actions/setup-python@v6
with:
python-version: ${{ env.PYTHON_VERSION }}
- name: Install uv
uses: astral-sh/setup-uv@v7
with:
version: ${{ env.UV_VERSION }}
- name: Install dependencies
run: |
uv sync --group dev
- name: Run checks
run: make check
# Tests and type check specifically on Python 3.11
tests-and-type-check:
runs-on: ubuntu-latest
name: Tests and Type Check (Python 3.11)
steps:
- name: Check out
uses: actions/checkout@v6
- name: Set up Python
uses: actions/setup-python@v6
with:
python-version: "3.11"
- name: Install uv
uses: astral-sh/setup-uv@v7
with:
version: ${{ env.UV_VERSION }}
- name: Install dependencies
run: |
uv sync --group dev
- name: Run tests
run: uv run python -m pytest tests -m "not integration" --cov --cov-config=pyproject.toml --cov-report=xml
- name: Check typing
run: uv run mypy
- name: Upload coverage reports to Codecov with GitHub Action on Python 3.11
uses: codecov/codecov-action@v5
# Documentation check from main.yml
check-docs:
runs-on: ubuntu-latest
name: Check Docs
steps:
- name: Check out
uses: actions/checkout@v6
- name: Set up Python
uses: actions/setup-python@v6
with:
python-version: ${{ env.PYTHON_VERSION }}
- name: Install uv
uses: astral-sh/setup-uv@v7
with:
version: ${{ env.UV_VERSION }}
- name: Install dependencies
run: |
uv sync --group dev
- name: Check if documentation can be built
run: uv run mkdocs build -s
# Build package check
build-package:
runs-on: ubuntu-latest
name: Build Package
steps:
- uses: actions/checkout@v6
- name: Set up Python
uses: actions/setup-python@v6
with:
python-version: ${{ env.PYTHON_VERSION }}
- name: Install uv
uses: astral-sh/setup-uv@v7
with:
version: ${{ env.UV_VERSION }}
- name: Build package
run: |
uvx --from build pyproject-build --installer uv
- name: Check package
run: |
uvx twine check dist/*
- name: Upload artifacts
uses: actions/upload-artifact@v6
with:
name: dist
path: dist/
# MCP integration test - quick check
test-mcp:
runs-on: ubuntu-latest
name: Test MCP Integration
steps:
- uses: actions/checkout@v6
- name: Set up Python
uses: actions/setup-python@v6
with:
python-version: ${{ env.PYTHON_VERSION }}
- name: Install uv
uses: astral-sh/setup-uv@v7
with:
version: ${{ env.UV_VERSION }}
- name: Install dependencies
run: |
uv sync --group dev
- name: Test MCP server startup
run: |
code=0
timeout 10s uv run biomcp run || code=$?
if [[ $code -ne 124 && $code -ne 0 ]]; then exit $code; fi
- name: Run MCP integration tests
run: |
uv run python -m pytest tests/tdd/test_mcp_integration.py -v
# Run integration tests separately - allowed to fail
integration-tests:
runs-on: ubuntu-latest
name: Integration Tests (Optional)
continue-on-error: true
steps:
- name: Check out
uses: actions/checkout@v6
- name: Set up Python
uses: actions/setup-python@v6
with:
python-version: "3.11"
- name: Install uv
uses: astral-sh/setup-uv@v7
with:
version: ${{ env.UV_VERSION }}
- name: Install dependencies
run: |
uv sync --group dev
- name: Run integration tests
run: |
uv run python -m pytest tests -m "integration" -v --tb=short
continue-on-error: true
```
--------------------------------------------------------------------------------
/docs/backend-services-reference/03-cbioportal.md:
--------------------------------------------------------------------------------
```markdown
# cBioPortal Integration
BioMCP integrates with [cBioPortal](https://www.cbioportal.org/), a comprehensive cancer genomics portal that provides visualization and analysis tools for large-scale cancer genomics datasets.
## Overview
The cBioPortal integration enhances article searches by automatically including relevant cancer genomics data when searching for genes. This integration provides:
1. **Gene-level summaries** - Mutation frequency and distribution across cancer studies
2. **Mutation-specific searches** - Find studies containing specific mutations (e.g., BRAF V600E)
3. **Cancer type resolution** - Accurate cancer type categorization using cBioPortal's API
## How It Works
### Automatic Integration
When you search for articles with a gene parameter, BioMCP automatically queries cBioPortal to provide additional context:
```python
# Basic gene search includes cBioPortal summary
search(domain="article", genes=["BRAF"], diseases=["melanoma"])
```
This returns:
- Standard PubMed/PubTator3 article results
- cBioPortal summary showing mutation frequency across cancer studies
- Top cancer types where the gene is mutated
### Mutation-Specific Searches
To search for specific mutations, include the mutation notation in keywords:
```python
# Search for BRAF V600E mutation
search(domain="article", genes=["BRAF"], keywords=["V600E"])
# Search for SRSF2 F57Y mutation
search(domain="article", genes=["SRSF2"], keywords=["F57Y"])
# Use wildcards for mutation patterns (e.g., any amino acid at position 57)
search(domain="article", genes=["SRSF2"], keywords=["F57*"])
```
Mutation-specific searches return:
- Total number of studies in cBioPortal
- Number of studies containing the mutation
- Top studies ranked by mutation count
- Cancer type distribution
## Example Output
### Gene-Level Summary
```
### cBioPortal Summary for BRAF
- **Mutation Frequency**: 76.7% (368 mutations in 480 samples)
- **Top Cancer Types**: Melanoma (45%), Thyroid (23%), Colorectal (18%)
- **Top Mutations**: V600E (89%), V600K (7%), G469A (2%)
```
### Mutation-Specific Results
```
### cBioPortal Mutation Search: BRAF
**Specific Mutation**: V600E
- **Total Studies**: 2340
- **Studies with Mutation**: 170
- **Total Mutations Found**: 5780
**Top Studies by Mutation Count:**
| Count | Study ID | Cancer Type | Study Name |
|-------|----------|-------------|------------|
| 804 | msk_met_2021 | Mixed Cancer Types | MSK MetTropism (MSK, Cell 2021) |
| 555 | msk_chord_2024 | Mixed Cancer Types | MSK-CHORD (MSK, Nature 2024) |
| 295 | msk_impact_2017 | Mixed Cancer Types | MSK-IMPACT Clinical Sequencing Cohort |
```
## Supported Mutation Notations
The integration recognizes standard protein change notation:
- **Specific mutations**: `V600E`, `F57Y`, `T790M`
- **Wildcard patterns**: `F57*` (matches F57Y, F57L, etc.)
- **Multiple mutations**: Include multiple keywords for OR search
## API Details
### Endpoints Used
1. **Gene Information**: `/api/genes/{gene}`
2. **Cancer Types**: `/api/cancer-types`
3. **Mutation Data**: `/api/mutations/fetch`
4. **Study Information**: `/api/studies`
### Rate Limiting
- Conservative rate limit of 5 requests/second
- Results cached for 15-30 minutes (mutations) or 24 hours (cancer types)
### Authentication
Optional authentication via environment variable:
```bash
export CBIO_TOKEN="your-api-token"
```
Public cBioPortal instance works without authentication but may have rate limits.
## CLI Usage
For detailed command-line options for searching articles with cBioPortal integration, see the [CLI User Guide](../user-guides/01-command-line-interface.md#article-commands).
## Performance Considerations
1. **Caching**: Results are cached to minimize API calls
- Gene summaries: 15 minutes
- Mutation searches: 30 minutes
- Cancer types: 24 hours
2. **Graceful Degradation**: If cBioPortal is unavailable, searches continue without the additional data
3. **Parallel Processing**: API calls are made in parallel with article searches for optimal performance
## Limitations
1. Only works with valid HUGO gene symbols
2. Mutation searches require exact protein change notation
3. Limited to mutations in cBioPortal's curated studies
4. Rate limits may apply for high-volume usage
## Error Handling
The integration handles various error scenarios:
- Invalid gene symbols are validated before API calls
- Network timeouts fall back to article-only results
- API errors are logged but don't block search results
```
--------------------------------------------------------------------------------
/src/biomcp/utils/cancer_types_api.py:
--------------------------------------------------------------------------------
```python
"""Cancer type utilities using cBioPortal API."""
import logging
from ..utils.cbio_http_adapter import CBioHTTPAdapter
from ..utils.request_cache import request_cache
logger = logging.getLogger(__name__)
class CancerTypeAPIClient:
    """Client for fetching cancer types from cBioPortal API."""

    def __init__(self):
        """Initialize the cancer type API client."""
        self.http_adapter = CBioHTTPAdapter()
        # Lazily populated mapping of cancer-type id/abbreviation -> name.
        self._cancer_types_cache: dict[str, str] | None = None

    @request_cache(ttl=86400)  # Cache for 24 hours
    async def get_all_cancer_types(self) -> dict[str, str]:
        """Fetch all cancer types from cBioPortal API.

        Returns:
            Dictionary mapping cancer type IDs to display names
        """
        if self._cancer_types_cache is not None:
            return self._cancer_types_cache
        try:
            payload, error = await self.http_adapter.get(
                "/cancer-types",
                endpoint_key="cbioportal_cancer_types",
                cache_ttl=86400,  # 24 hours
            )
            if error:
                logger.error(f"Failed to fetch cancer types: {error.message}")
                return {}
            if not payload:
                return {}
            mapping: dict[str, str] = {}
            for entry in payload:
                type_id = entry.get("cancerTypeId", "")
                display_name = entry.get("name", "")
                if type_id and display_name:
                    mapping[type_id.lower()] = display_name
                # Also index common abbreviations under the same name.
                abbreviation = entry.get("shortName", "")
                if abbreviation and abbreviation != type_id:
                    mapping[abbreviation.lower()] = display_name
            self._cancer_types_cache = mapping
            logger.info(f"Loaded {len(mapping)} cancer types from API")
            return mapping
        except Exception as e:
            logger.error(f"Error fetching cancer types: {e}")
            return {}

    async def get_cancer_type_name(self, cancer_type_id: str) -> str:
        """Get the display name for a cancer type ID.

        Args:
            cancer_type_id: The cancer type identifier

        Returns:
            Display name or the original ID if not found
        """
        if not cancer_type_id:
            return "Unknown"
        known_types = await self.get_all_cancer_types()
        display = known_types.get(cancer_type_id.lower())
        if display is not None:
            return display
        # Unknown ID: title-case it only when it carried no casing info.
        if cancer_type_id == cancer_type_id.lower():
            return cancer_type_id.title()
        return cancer_type_id

    @request_cache(ttl=3600)  # Cache for 1 hour
    async def get_study_cancer_type(self, study_id: str) -> str:
        """Get cancer type for a specific study.

        Args:
            study_id: The study identifier

        Returns:
            Cancer type name or "Unknown"
        """
        try:
            study, error = await self.http_adapter.get(
                f"/studies/{study_id}",
                endpoint_key="cbioportal_studies",
                cache_ttl=3600,  # 1 hour
            )
            if error or not study:
                logger.debug(f"Study {study_id} not found")
                return "Unknown"
            cancer_type = study.get("cancerType", {})
            type_id = cancer_type.get("cancerTypeId", "")
            if type_id and type_id != "unknown":
                return await self.get_cancer_type_name(type_id)
            # Fallback to the cancer type name directly
            return cancer_type.get("name", "") or "Unknown"
        except Exception as e:
            logger.debug(f"Error fetching study {study_id}: {e}")
            return "Unknown"
# Global instance for reuse
_cancer_type_client: CancerTypeAPIClient | None = None


def get_cancer_type_client() -> CancerTypeAPIClient:
    """Get or create the global cancer type client (lazy singleton)."""
    global _cancer_type_client
    if _cancer_type_client is None:
        # First call: build the shared client instance.
        _cancer_type_client = CancerTypeAPIClient()
    return _cancer_type_client
```
--------------------------------------------------------------------------------
/tests/tdd/utils/test_mutation_filter.py:
--------------------------------------------------------------------------------
```python
"""Tests for mutation filter utility."""
from biomcp.utils.mutation_filter import MutationFilter
class MockMutation:
    """Mock mutation object for testing.

    Exposes only the ``protein_change`` attribute, which is what the
    filter reads from each entry (see the list-filtering tests below).
    """

    def __init__(self, protein_change: str):
        # Stored verbatim, e.g. "V600E".
        self.protein_change = protein_change
class TestMutationFilter:
    """Test mutation filtering functionality."""

    def test_specific_mutation_filter(self):
        """An exact mutation filter matches only that protein change."""
        flt = MutationFilter(specific_mutation="V600E")
        assert flt.matches("V600E")
        assert not flt.matches("V600K")
        assert not flt.matches("V600")
        assert not flt.matches("")

    def test_wildcard_pattern_filter(self):
        """A trailing * matches any residue substitution at the position."""
        flt = MutationFilter(pattern="V600*")
        for hit in ("V600E", "V600K", "V600D"):
            assert flt.matches(hit)
        for miss in ("V601E", "K600E"):
            assert not flt.matches(miss)

    def test_pattern_without_wildcard(self):
        """A pattern without * is an exact regex match, not a prefix."""
        flt = MutationFilter(pattern="F57")
        # Exact match works
        assert flt.matches("F57")
        # No prefix matching without wildcard
        assert not flt.matches("F57Y")
        assert not flt.matches("F57L")
        assert not flt.matches("F58Y")

    def test_no_filter(self):
        """With no criteria, every non-empty protein change matches."""
        flt = MutationFilter()
        assert flt.matches("V600E")
        assert flt.matches("anything")
        # Empty protein change returns False even with no filter
        assert not flt.matches("")

    def test_filter_mutations_list(self):
        """filter_mutations keeps only the entries accepted by the filter."""
        pool = [
            MockMutation("V600E"),
            MockMutation("V600K"),
            MockMutation("V600D"),
            MockMutation("T790M"),
            MockMutation("L858R"),
        ]
        # Specific mutation: exactly one survivor.
        exact = MutationFilter(specific_mutation="V600E").filter_mutations(pool)
        assert len(exact) == 1
        assert exact[0].protein_change == "V600E"
        # Wildcard: the three V600x entries survive.
        wild = MutationFilter(pattern="V600*").filter_mutations(pool)
        assert len(wild) == 3
        assert all(m.protein_change.startswith("V600") for m in wild)
        # No filter: everything survives.
        everything = MutationFilter().filter_mutations(pool)
        assert len(everything) == 5

    def test_string_representations(self):
        """__str__ and __repr__ reflect the configured criteria."""
        exact = MutationFilter(specific_mutation="V600E")
        assert str(exact) == "MutationFilter(specific=V600E)"
        assert (
            repr(exact)
            == "MutationFilter(specific_mutation='V600E', pattern=None)"
        )
        wild = MutationFilter(pattern="V600*")
        assert str(wild) == "MutationFilter(pattern=V600*)"
        assert str(MutationFilter()) == "MutationFilter(no_filter)"

    def test_edge_cases(self):
        """Empty/None input and regex metacharacters must not crash."""
        flt = MutationFilter(specific_mutation="V600E")
        assert not flt.matches("")
        assert not flt.matches(None)
        # Bracket characters are escaped before regex compilation, so this
        # behaves as a literal "[VL]600*" and matches nothing -- but it
        # must not raise.
        bracketed = MutationFilter(pattern="[VL]600*")
        assert not bracketed.matches("V600E")  # Because [ is escaped

    def test_filter_mutations_preserves_type(self):
        """filter_mutations always returns a plain list."""
        pool = [
            MockMutation("V600E"),
            MockMutation("V600K"),
        ]
        kept = MutationFilter(pattern="V600*").filter_mutations(pool)
        # Result should be a list
        assert isinstance(kept, list)
        assert len(kept) == 2
```
--------------------------------------------------------------------------------
/src/biomcp/integrations/cts_api.py:
--------------------------------------------------------------------------------
```python
"""NCI Clinical Trials Search API integration helper."""
import json
import logging
import os
from typing import Any, Literal
from ..constants import NCI_API_KEY_ENV
from ..http_client import request_api
logger = logging.getLogger(__name__)
class CTSAPIError(Exception):
    """Error raised when CTS API requests fail."""
def _validate_api_key(api_key: str | None) -> str:
    """Validate and return API key.

    Uses the explicit argument when given; otherwise falls back to the
    environment variable named by NCI_API_KEY_ENV. Raises CTSAPIError
    when neither source supplies a key.
    """
    resolved = api_key or os.getenv(NCI_API_KEY_ENV)
    if not resolved:
        raise CTSAPIError(
            f"NCI API key required. Please set {NCI_API_KEY_ENV} environment "
            "variable or provide api_key parameter.\n"
            "Get a free API key at: https://clinicaltrialsapi.cancer.gov"
        )
    return resolved
def _prepare_request_data(
    method: str,
    params: dict[str, Any] | None,
    json_data: dict[str, Any] | None,
    headers: dict[str, str],
) -> dict[str, Any]:
    """Prepare request data based on method.

    Builds the payload for the HTTP layer from either the query params
    (GET) or the JSON body (other methods), and serializes the headers
    into it under the internal "_headers" key.

    Returns a NEW dict: the previous implementation aliased the caller's
    ``params``/``json_data`` dict and then injected "_headers" into it,
    silently mutating the caller's data. Copying avoids that side effect.
    """
    if method == "GET":
        # Copy instead of aliasing -- "_headers" is added below and must
        # not leak into the dict the caller passed in.
        request_data = dict(params) if params else {}
        logger.debug(f"CTS API GET request with params: {params}")
    else:
        request_data = dict(json_data) if json_data else {}
        if method == "POST":
            logger.debug(f"CTS API POST request with data: {json_data}")
    # Add headers to request data
    if headers:
        request_data["_headers"] = json.dumps(headers)
    return request_data
def _handle_api_error(error: Any) -> None:
    """Translate an API error object into a CTSAPIError with a clear message."""
    if error.code == 401:
        message = (
            f"Invalid API key. Please check your {NCI_API_KEY_ENV} "
            "environment variable or api_key parameter."
        )
    elif error.code == 403:
        message = (
            "Access forbidden. Your API key may not have permission "
            "to access this resource."
        )
    else:
        message = f"CTS API error: {error.message}"
    raise CTSAPIError(message)
async def make_cts_request(
    url: str,
    method: Literal["GET", "POST"] = "GET",
    params: dict[str, Any] | None = None,
    json_data: dict[str, Any] | None = None,
    api_key: str | None = None,
) -> dict[str, Any]:
    """
    Make a request to the NCI CTS API with proper authentication.

    Args:
        url: Full URL to the CTS API endpoint
        method: HTTP method (GET or POST)
        params: Query parameters
        json_data: JSON data for POST requests
        api_key: Optional API key (if not provided, uses NCI_API_KEY env var)

    Returns:
        JSON response from the API

    Raises:
        CTSAPIError: If the request fails or API key is missing
    """
    # Resolve the key first so a missing key fails before any network work.
    api_key = _validate_api_key(api_key)
    headers = {"x-api-key": api_key, "Accept": "application/json"}
    try:
        payload = _prepare_request_data(method, params, json_data, headers)
        response, error = await request_api(
            url=url,
            request=payload,
            method=method,
            cache_ttl=0,  # Disable caching for NCI API to ensure fresh results
        )
        if error:
            _handle_api_error(error)
        if response is None:
            raise CTSAPIError("No response received from NCI CTS API")
        return response
    except CTSAPIError:
        # Already a domain error -- propagate unchanged.
        raise
    except Exception as e:
        # Wrap anything unexpected (network failures, decoding, ...).
        logger.error(f"CTS API request failed: {e}")
        raise CTSAPIError(f"Failed to connect to NCI CTS API: {e!s}") from e
def get_api_key_instructions() -> str:
    """
    Get user-friendly instructions for obtaining and setting the API key.

    Returns:
        Formatted string with instructions
    """
    # Assembled from segments for readability; output is a single string.
    sections = (
        "## NCI Clinical Trials API Key Required\n\n",
        "To use NCI's Clinical Trials Search API, you need an API key.\n\n",
        "**Option 1: Set environment variable (recommended)**\n",
        "```bash\n",
        f"export {NCI_API_KEY_ENV}='your-api-key'\n",
        "```\n\n",
        "**Option 2: Provide via CLI**\n",
        "```bash\n",
        "biomcp trial search --api-key YOUR_KEY --condition melanoma\n",
        "```\n\n",
        "**Get your free API key:**\n",
        "Visit https://clinicaltrialsapi.cancer.gov\n\n",
        "The API key provides access to NCI's comprehensive cancer clinical trials "
        "database with advanced search capabilities.",
    )
    return "".join(sections)
```
--------------------------------------------------------------------------------
/tests/tdd/variants/test_alphagenome_api_key.py:
--------------------------------------------------------------------------------
```python
"""Test AlphaGenome per-request API key functionality."""
import os
from unittest.mock import MagicMock, patch
import pandas as pd
import pytest
from biomcp.variants.alphagenome import predict_variant_effects
@pytest.mark.asyncio
async def test_api_key_parameter_overrides_env_var():
    """Test that api_key parameter takes precedence over environment variable."""
    # Set up environment variable
    with patch.dict("os.environ", {"ALPHAGENOME_API_KEY": "env-key"}):
        # Mock AlphaGenome modules
        mock_genome = MagicMock()
        mock_client = MagicMock()
        mock_scorers = MagicMock()
        # Mock successful prediction
        test_scores_df = pd.DataFrame({
            "output_type": ["RNA_SEQ"],
            "raw_score": [1.5],
            "gene_name": ["BRAF"],
            "track_name": [None],
        })
        # Track which API key was used
        api_keys_used = []

        def track_create(api_key):
            # Records every key passed to dna_client.create so the test can
            # assert which credential source actually won.
            api_keys_used.append(api_key)
            mock_model = MagicMock()
            mock_model.score_variant.return_value = test_scores_df
            return mock_model

        mock_client.create.side_effect = track_create
        mock_scorers.tidy_scores.return_value = test_scores_df
        mock_scorers.get_recommended_scorers.return_value = []
        # Create a mock module with the correct attributes
        mock_models = MagicMock()
        mock_models.dna_client = mock_client
        mock_models.variant_scorers = mock_scorers
        mock_data = MagicMock()
        mock_data.genome = mock_genome
        # Patching sys.modules makes the lazy `alphagenome` imports inside
        # predict_variant_effects resolve to these mocks instead of the
        # real (optional) dependency.
        with patch.dict(
            "sys.modules",
            {
                "alphagenome.data": mock_data,
                "alphagenome.models": mock_models,
            },
        ):
            # Test with parameter API key
            result = await predict_variant_effects(
                "chr7", 140753336, "A", "T", api_key="param-key"
            )
            # Verify the parameter key was used, not the env var
            assert len(api_keys_used) == 1
            assert api_keys_used[0] == "param-key"
            assert "BRAF" in result
@pytest.mark.asyncio
async def test_no_api_key_shows_instructions():
    """Missing API key should produce setup instructions, not an exception."""
    # Run with a fully scrubbed environment so no key can be found.
    with patch.dict("os.environ", {}, clear=True):
        os.environ.pop("ALPHAGENOME_API_KEY", None)
        outcome = await predict_variant_effects(
            "chr7", 140753336, "A", "T", skip_cache=True
        )
        # The returned text must walk the user through obtaining a key.
        expected_snippets = (
            "AlphaGenome API key required",
            "My AlphaGenome API key is",
            "ACTION REQUIRED",
            "https://deepmind.google.com/science/alphagenome",
        )
        for snippet in expected_snippets:
            assert snippet in outcome
@pytest.mark.asyncio
async def test_env_var_used_when_no_parameter():
    """Test that environment variable is used when no parameter is provided."""
    # Set up environment variable
    with patch.dict("os.environ", {"ALPHAGENOME_API_KEY": "env-key"}):
        # Mock AlphaGenome modules
        mock_genome = MagicMock()
        mock_client = MagicMock()
        mock_scorers = MagicMock()
        # Mock successful prediction
        test_scores_df = pd.DataFrame({
            "output_type": ["RNA_SEQ"],
            "raw_score": [1.5],
            "gene_name": ["BRAF"],
            "track_name": [None],
        })
        # Track which API key was used
        api_keys_used = []

        def track_create(api_key):
            # Captures the key handed to dna_client.create for assertion.
            api_keys_used.append(api_key)
            mock_model = MagicMock()
            mock_model.score_variant.return_value = test_scores_df
            return mock_model

        mock_client.create.side_effect = track_create
        mock_scorers.tidy_scores.return_value = test_scores_df
        mock_scorers.get_recommended_scorers.return_value = []
        # Create a mock module with the correct attributes
        mock_models = MagicMock()
        mock_models.dna_client = mock_client
        mock_models.variant_scorers = mock_scorers
        mock_data = MagicMock()
        mock_data.genome = mock_genome
        # sys.modules patch routes the lazy alphagenome imports to the mocks.
        with patch.dict(
            "sys.modules",
            {
                "alphagenome.data": mock_data,
                "alphagenome.models": mock_models,
            },
        ):
            # Test without parameter API key
            result = await predict_variant_effects("chr7", 140753336, "A", "T")
            # Verify the env var key was used
            assert len(api_keys_used) == 1
            assert api_keys_used[0] == "env-key"
            assert "BRAF" in result
```
--------------------------------------------------------------------------------
/src/biomcp/request_batcher.py:
--------------------------------------------------------------------------------
```python
"""Request batching utility for combining multiple small requests.
This module provides a request batcher that accumulates multiple requests
and processes them together in batches, reducing the number of API calls
and improving performance for bulk operations.
Key Features:
- Automatic batching based on size or time threshold
- Configurable batch size and timeout
- Thread-safe request accumulation
- Error propagation to individual requests
Example:
```python
async def batch_api_call(params_list):
# Make a single API call with multiple parameters
return await api.bulk_request(params_list)
batcher = RequestBatcher(
batch_func=batch_api_call,
batch_size=10,
batch_timeout=0.1
)
# Individual requests are automatically batched
result1 = await batcher.request({"id": 1})
result2 = await batcher.request({"id": 2})
```
"""
import asyncio
from collections.abc import Callable, Coroutine
from typing import Any, TypeVar
T = TypeVar("T")
class RequestBatcher:
    """Batches multiple requests together to reduce overhead.

    This is particularly useful for APIs that support batch operations
    or when network latency dominates over processing time.

    The batcher accumulates requests until either:
    1. The batch size threshold is reached
    2. The batch timeout expires

    At which point all accumulated requests are processed together.
    """

    def __init__(
        self,
        batch_func: Callable[[list[Any]], Coroutine[Any, Any, list[Any]]],
        batch_size: int = 10,
        batch_timeout: float = 0.05,  # 50ms
    ):
        """Initialize the batcher.

        Args:
            batch_func: Async function that processes a batch of requests
            batch_size: Maximum number of requests to batch together
            batch_timeout: Maximum time to wait for batch to fill (seconds)
        """
        self.batch_func = batch_func
        self.batch_size = batch_size
        self.batch_timeout = batch_timeout
        # (params, future) pairs awaiting the next flush.
        self.pending_requests: list[tuple[Any, asyncio.Future]] = []
        # Timer task that flushes a partially-filled batch after timeout.
        self.batch_task: asyncio.Task | None = None
        self._lock = asyncio.Lock()

    async def request(self, params: Any) -> Any:
        """Add a request to the batch and wait for result."""
        future: asyncio.Future[Any] = asyncio.Future()
        async with self._lock:
            self.pending_requests.append((params, future))
            # Check if we should flush immediately
            if len(self.pending_requests) >= self.batch_size:
                await self._flush_batch()
            elif not self.batch_task or self.batch_task.done():
                # Start a timer to flush the batch
                self.batch_task = asyncio.create_task(self._batch_timer())
        return await future

    async def _batch_timer(self):
        """Timer that flushes the batch after timeout."""
        await asyncio.sleep(self.batch_timeout)
        async with self._lock:
            await self._flush_batch()

    async def _flush_batch(self):
        """Process all pending requests as a batch.

        Must be called while holding ``self._lock``.
        """
        if not self.pending_requests:
            return
        # Extract current batch
        batch = self.pending_requests.copy()
        self.pending_requests.clear()
        # Cancel a pending timer -- but never the task we are currently
        # running in. Previously the timer cancelled itself here, which
        # delivered a CancelledError at the `await self.batch_func(...)`
        # below (CancelledError is a BaseException, so the `except
        # Exception` never caught it), leaving every future unresolved
        # and callers of request() hanging forever.
        if (
            self.batch_task
            and not self.batch_task.done()
            and self.batch_task is not asyncio.current_task()
        ):
            self.batch_task.cancel()
        # Process batch
        try:
            params_list = [params for params, _ in batch]
            results = await self.batch_func(params_list)
            # Distribute results to futures positionally.
            for i, (_, future) in enumerate(batch):
                if not future.done():
                    if i < len(results):
                        future.set_result(results[i])
                    else:
                        future.set_exception(
                            Exception(f"No result for request at index {i}")
                        )
        except Exception as e:
            # Propagate error to all futures
            for _, future in batch:
                if not future.done():
                    future.set_exception(e)
# Example usage for autocomplete batching
async def batch_autocomplete_requests(requests: list[dict]) -> list[Any]:
    """Process multiple autocomplete requests in parallel.

    This is an example implementation that could be used to batch
    autocomplete requests more efficiently.
    """
    from .articles.autocomplete import EntityRequest, autocomplete

    pending = [autocomplete(EntityRequest(**payload)) for payload in requests]
    return await asyncio.gather(*pending)
```
--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------
```toml
[project]
name = "biomcp-python"
version = "0.4.6"
description = "Biomedical Model Context Protocol Server"
authors = [{ name = "Ian Maurer", email = "[email protected]" }]
readme = "README.md"
keywords = ['python']
requires-python = ">=3.10,<4.0"
classifiers = [
"Intended Audience :: Developers",
"Programming Language :: Python",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"Topic :: Software Development :: Libraries :: Python Modules",
]
dependencies = [
"certifi>=2025.1.31",
"diskcache>=5.6.3",
"httpx>=0.28.1",
"mcp[cli]>=1.12.3,<2.0.0",
"platformdirs>=4.3.6",
"psutil>=7.0.0",
"pydantic>=2.10.6",
"python-dotenv>=1.0.0",
"rich>=14.0.0",
"typer>=0.15.2",
"uvicorn>=0.34.2",
"alphagenome>=0.1.0",
]
[project.urls]
Homepage = "https://genomoncology.com/biomcp/"
Repository = "https://github.com/genomoncology/biomcp"
Documentation = "https://genomoncology.com/biomcp/"
[dependency-groups]
dev = [
"pytest>=7.2.0",
"pytest-xdist>=3.5.0",
"pre-commit>=2.20.0",
"tox-uv>=1.11.3",
"deptry>=0.22.0",
"mypy>=0.991",
"pytest-cov>=4.0.0",
"pytest-asyncio>=0.24.0",
"ruff>=0.9.2",
"mkdocs>=1.4.2",
"mkdocs-material>=8.5.10",
"mkdocstrings[python]>=0.26.1",
"anyio>=4.8.0",
# "ipython>=9.0.2",
"pytest-bdd>=8.1.0",
"tomlkit>=0.13.2",
"assertpy>=1.1",
"twine>=4.0.0",
"pandas>=2.0.0", # Used for mocking AlphaGenome responses in tests
"PyYAML>=6.0.0", # Used for mkdocs.yml parsing in scripts
"pydantic-ai>=0.0.14", # For testing Pydantic AI integration
]
[project.optional-dependencies]
api = [
]
worker = [
"fastapi>=0.110.0",
"starlette>=0.36.0",
"uvicorn>=0.28.0",
]
[build-system]
requires = ["setuptools >= 61.0"]
build-backend = "setuptools.build_meta"
[tool.setuptools.package-data]
biomcp = ["resources/*.md"]
[project.scripts]
biomcp = "biomcp.__main__:main"
[tool.mypy]
files = ["src"]
ignore_missing_imports = true
disallow_untyped_defs = false
disallow_any_unimported = false
no_implicit_optional = true
check_untyped_defs = false
warn_return_any = false
warn_unused_ignores = true
show_error_codes = true
plugins = [
"pydantic.mypy"
]
disable_error_code = [
"union-attr",
"prop-decorator",
]
[tool.pytest.ini_options]
testpaths = ["tests"]
addopts = "--import-mode=importlib"
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "function"
markers = [
"integration: marks tests as integration tests (deselect with '-m \"not integration\"')",
]
filterwarnings = [
# Ignore protobuf version warnings from AlphaGenome
"ignore:Protobuf gencode version.*is exactly one major version older.*:UserWarning",
# Ignore false positive warning from pytest-xdist about coroutines
# This occurs during parallel test execution when mock objects are cleaned up
"ignore:coroutine 'search_trials_unified' was never awaited:RuntimeWarning",
]
[tool.ruff]
target-version = "py310"
line-length = 79
fix = true
unsafe-fixes = true
[tool.ruff.lint]
select = [
# flake8-2020
"YTT",
# flake8-bandit
"S",
# flake8-bugbear
"B",
# flake8-builtins
"A",
# flake8-comprehensions
"C4",
# flake8-debugger
"T10",
# flake8-simplify
"SIM",
# isort
"I",
# mccabe
"C90",
# pycodestyle
"E", "W",
# pyflakes
"F",
# pygrep-hooks
"PGH",
# pyupgrade
"UP",
# ruff
"RUF",
]
ignore = [
# LineTooLong
"E501",
# DoNotAssignLambda
"E731",
# Consider unpacking
"RUF005",
# Union for type annotations
"UP007",
# Asserts are ok when I say they are ok.
"S101",
]
[tool.ruff.lint.per-file-ignores]
"tests/*" = ["S101"]
"__init__.py" = ["I001"]
"src/biomcp/variants/external.py" = ["C901"] # Complex API interactions are acceptable
[tool.ruff.format]
preview = true
[tool.ruff.lint.flake8-bugbear]
extend-immutable-calls = [
"fastapi.Depends",
"fastapi.Query",
"typer.Argument",
"typer.Option",
]
[tool.coverage.report]
skip_empty = true
[tool.coverage.run]
branch = true
source = ["src"]
omit = [
"src/*/__main__.py",
"src/*/server.py",
"src/*/http_client.py",
]
[tool.deptry]
exclude = [
"example_scripts/python_sdk.py",
"venv",
".venv",
".direnv",
"tests",
".git",
"build",
"dist",
"scripts",
"spike",
]
[tool.deptry.per_rule_ignores]
DEP001 = ["alphagenome"] # Optional dependency, must be installed manually
DEP002 = ["uvicorn"]
DEP003 = ["biomcp", "alphagenome"]
```
--------------------------------------------------------------------------------
/tests/integration/test_preprints_integration.py:
--------------------------------------------------------------------------------
```python
"""Integration tests for preprint search functionality."""
import asyncio
import pytest
from biomcp.articles.preprints import (
BiorxivClient,
EuropePMCClient,
PreprintSearcher,
)
from biomcp.articles.search import PubmedRequest
from biomcp.core import PublicationState
class TestBiorxivIntegration:
    """Integration tests for bioRxiv API.

    These tests hit the live bioRxiv service; they skip (rather than fail)
    when the API is unreachable or returns no recent matching articles.
    """

    @pytest.mark.asyncio
    async def test_biorxiv_real_search(self):
        """Test real bioRxiv API search."""
        client = BiorxivClient()
        # Try multiple broad search terms to find one with results
        search_terms = ["cancer", "gene", "cell", "protein", "RNA", "DNA"]
        results = []
        successful_term = None
        for term in search_terms:
            results = await client.search(term)
            if len(results) > 0:
                successful_term = term
                break
        # If no results with any term, the API might be down or have no recent articles
        if len(results) == 0:
            pytest.skip(
                "No results found with any search term - API may be down or have no matching recent articles"
            )
        # Check the structure of results
        first_result = results[0]
        assert first_result.doi is not None
        assert first_result.title is not None
        assert first_result.publication_state == PublicationState.PREPRINT
        # bioRxiv results are expected to carry a "preprint"-style journal label
        assert "preprint" in first_result.journal.lower()
        print(
            f"Found {len(results)} bioRxiv results for term '{successful_term}'"
        )
        print(f"First result: {first_result.title}")
class TestEuropePMCIntegration:
    """Integration tests for Europe PMC API.

    Hits the live Europe PMC preprint search; skips when the service
    is unavailable instead of failing the suite.
    """

    @pytest.mark.asyncio
    async def test_europe_pmc_real_search(self):
        """Test real Europe PMC API search for preprints."""
        client = EuropePMCClient()
        # Try multiple search terms to find one with results
        search_terms = [
            "cancer",
            "gene",
            "cell",
            "protein",
            "SARS-CoV-2",
            "COVID",
        ]
        results = []
        successful_term = None
        for term in search_terms:
            results = await client.search(term)
            if len(results) > 0:
                successful_term = term
                break
        # If no results with any term, the API might be down
        if len(results) == 0:
            pytest.skip(
                "No results found with any search term - Europe PMC API may be down"
            )
        # Check the structure of the first result
        first_result = results[0]
        assert first_result.title is not None
        assert first_result.publication_state == PublicationState.PREPRINT
        print(
            f"Found {len(results)} Europe PMC preprint results for term '{successful_term}'"
        )
        print(f"First result: {first_result.title}")
        # DOI is optional on Europe PMC records, so only print when present
        if first_result.doi:
            print(f"DOI: {first_result.doi}")
class TestPreprintSearcherIntegration:
    """Integration tests for combined preprint search.

    Exercises PreprintSearcher, which aggregates bioRxiv and Europe PMC;
    skips when no configuration returns results (both backends down).
    """

    @pytest.mark.asyncio
    async def test_combined_search_real(self):
        """Test searching across both preprint sources."""
        searcher = PreprintSearcher()
        # Try different search combinations until one yields results
        search_configs = [
            {"genes": ["TP53"], "diseases": ["cancer"]},
            {"keywords": ["protein", "structure"]},
            {"genes": ["BRAF"], "diseases": ["melanoma"]},
            {"keywords": ["gene", "expression"]},
        ]
        response = None
        successful_config = None
        for config in search_configs:
            request = PubmedRequest(**config)
            response = await searcher.search(request)
            if response.count > 0:
                successful_config = config
                break
        print(f"Total results: {response.count if response else 0}")
        # Check if we got any results
        if response and response.count > 0:
            # Check result structure
            first = response.results[0]
            assert first.title is not None
            assert first.publication_state == PublicationState.PREPRINT
            print(f"Successful search config: {successful_config}")
            print(f"First result: {first.title}")
            print(f"Date: {first.date}")
            print(f"Journal: {first.journal}")
        else:
            pytest.skip(
                "No results found with any search configuration - APIs may be down"
            )
if __name__ == "__main__":
    # Run the tests directly (outside pytest), separated by divider lines.
    # NOTE(review): pytest.skip() raises a Skipped exception even outside a
    # pytest run, so a skipped test surfaces here as an error — confirm intended.
    asyncio.run(TestBiorxivIntegration().test_biorxiv_real_search())
    print("\n" + "=" * 50 + "\n")
    asyncio.run(TestEuropePMCIntegration().test_europe_pmc_real_search())
    print("\n" + "=" * 50 + "\n")
    asyncio.run(TestPreprintSearcherIntegration().test_combined_search_real())
```
--------------------------------------------------------------------------------
/docs/developer-guides/05-error-handling.md:
--------------------------------------------------------------------------------
```markdown
# Error Handling Guide
## Overview
BioMCP uses a consistent error handling pattern across all HTTP operations. This guide explains the error types, when they occur, and how to handle them.
## Error Structure
All HTTP operations return a tuple: `(data, error)` where one is always `None`.
```python
data, error = await http_client.request_api(...)
if error:
# Handle error
logger.error(f"Request failed: {error.code} - {error.message}")
else:
# Process data
process_result(data)
```
## Error Types
### Network Errors
- **When**: Connection timeout, DNS resolution failure, network unreachable
- **Error Code**: Various HTTP client exceptions
- **Handling**: Retry with exponential backoff or fail gracefully
### HTTP Status Errors
- **When**: Server returns 4xx or 5xx status codes
- **Error Codes**:
- `400-499`: Client errors (bad request, unauthorized, not found)
- `500-599`: Server errors (internal error, service unavailable)
- **Handling**:
- 4xx: Fix request parameters or authentication
- 5xx: Retry with backoff or use cached data
### Circuit Breaker Errors
- **When**: Too many consecutive failures to a domain
- **Error**: Circuit breaker opens to prevent cascading failures
- **Handling**: Wait for recovery timeout or use alternative data source
### Offline Mode Errors
- **When**: `BIOMCP_OFFLINE=true` and no cached data available
- **Error**: Request blocked in offline mode
- **Handling**: Use cached data only or inform user about offline status
### Parse Errors
- **When**: Response is not valid JSON or doesn't match expected schema
- **Error**: JSON decode error or validation error
- **Handling**: Log error and treat as service issue
## Best Practices
### 1. Always Check Errors
```python
# ❌ Bad - ignoring error
data, _ = await http_client.request_api(...)
process(data) # data might be None!
# ✅ Good - checking error
data, error = await http_client.request_api(...)
if error:
logger.warning(f"Failed to fetch data: {error}")
return None
process(data)
```
### 2. Provide Context in Error Messages
```python
# ❌ Bad - generic error
if error:
logger.error("Request failed")
# ✅ Good - contextual error
if error:
logger.error(f"Failed to fetch gene {gene_id} from cBioPortal: {error.message}")
```
### 3. Graceful Degradation
```python
async def get_variant_with_fallback(variant_id: str):
# Try primary source
data, error = await primary_source.get_variant(variant_id)
if not error:
return data
logger.warning(f"Primary source failed: {error}, trying secondary")
# Try secondary source
data, error = await secondary_source.get_variant(variant_id)
if not error:
return data
# Use cached data as last resort
return get_cached_variant(variant_id)
```
### 4. User-Friendly Error Messages
```python
def format_error_for_user(error: RequestError) -> str:
if error.code >= 500:
return "The service is temporarily unavailable. Please try again later."
elif error.code == 404:
return "The requested data was not found."
elif error.code == 401:
return "Authentication required. Please check your credentials."
elif "OFFLINE" in str(error):
return "You are in offline mode. Only cached data is available."
else:
return "An error occurred while fetching data. Please try again."
```
## Testing Error Conditions
### 1. Simulate Network Errors
```python
with patch("biomcp.http_client.call_http") as mock:
mock.side_effect = Exception("Network error")
data, error = await client.fetch_data()
assert error is not None
assert data is None
```
### 2. Test Circuit Breaker
```python
# Simulate multiple failures
for _ in range(5):
with patch("biomcp.http_client.call_http") as mock:
mock.return_value = (500, "Server Error")
await client.fetch_data()
# Circuit should be open
data, error = await client.fetch_data()
assert error is not None
assert "circuit" in error.message.lower()
```
### 3. Test Offline Mode
```python
with patch.dict(os.environ, {"BIOMCP_OFFLINE": "true"}):
data, error = await client.fetch_data()
# Should only return cached data or error
```
## Common Patterns
### Retry with Backoff
The centralized HTTP client automatically retries with exponential backoff for:
- Network errors
- 5xx server errors
- Rate limit errors (429)
### Caching
Failed requests don't overwrite cached data, ensuring availability during outages.
### Rate Limiting
Requests are automatically rate-limited per domain to prevent overwhelming services.
## Debugging
Enable debug logging to see all HTTP requests and errors:
```python
import logging
logging.getLogger("biomcp.http_client").setLevel(logging.DEBUG)
```
This will show:
- All HTTP requests with URLs and methods
- Response status codes and times
- Error details and retry attempts
- Circuit breaker state changes
```
--------------------------------------------------------------------------------
/src/biomcp/openfda/cache.py:
--------------------------------------------------------------------------------
```python
"""
Simple in-memory caching for OpenFDA API responses.
This module provides a time-based cache to reduce API calls and improve performance.
Cache entries expire after a configurable TTL (time-to-live).
"""
import hashlib
import json
import logging
import os
from datetime import datetime, timedelta
from typing import Any
logger = logging.getLogger(__name__)
# Cache configuration
CACHE_TTL_MINUTES = int(os.environ.get("BIOMCP_FDA_CACHE_TTL", "15"))
MAX_CACHE_SIZE = int(os.environ.get("BIOMCP_FDA_MAX_CACHE_SIZE", "100"))
MAX_RESPONSE_SIZE = int(
os.environ.get("BIOMCP_FDA_MAX_RESPONSE_SIZE", str(1024 * 1024))
) # 1MB default
# Global cache dictionary
_cache: dict[str, tuple[Any, datetime]] = {}
def _generate_cache_key(endpoint: str, params: dict[str, Any]) -> str:
"""
Generate a unique cache key for an API request.
Args:
endpoint: The API endpoint URL
params: Query parameters
Returns:
A unique hash key for the request
"""
# Remove sensitive parameters before hashing
safe_params = {
k: v
for k, v in params.items()
if k.lower() not in ["api_key", "apikey", "key", "token", "secret"]
}
# Sort params for consistent hashing
sorted_params = json.dumps(safe_params, sort_keys=True)
combined = f"{endpoint}:{sorted_params}"
# Use SHA256 for cache key
return hashlib.sha256(combined.encode()).hexdigest()
def get_cached_response(
    endpoint: str, params: dict[str, Any]
) -> dict[str, Any] | None:
    """Return the cached response for a request, or None if absent/expired.

    Expired entries are evicted as a side effect of the lookup.

    Args:
        endpoint: The API endpoint URL
        params: Query parameters

    Returns:
        Cached response data or None if not found/expired
    """
    key = _generate_cache_key(endpoint, params)
    entry = _cache.get(key)
    if entry is None:
        return None
    payload, stored_at = entry
    age = datetime.now() - stored_at
    if age >= timedelta(minutes=CACHE_TTL_MINUTES):
        # Stale — drop it so the cache does not accumulate dead entries.
        del _cache[key]
        logger.debug(f"Cache expired for {endpoint}")
        return None
    logger.debug(
        f"Cache hit for {endpoint} (age: {age.total_seconds():.1f}s)"
    )
    return payload
def set_cached_response(
    endpoint: str, params: dict[str, Any], response: dict[str, Any]
) -> None:
    """Store a response in the cache, enforcing size limits.

    Oversized responses are skipped entirely, and when the cache is full
    the oldest entries (by stored timestamp) are evicted to make room.

    Args:
        endpoint: The API endpoint URL
        params: Query parameters
        response: Response data to cache
    """
    # Local import: sys is only needed for the rare non-serializable fallback.
    import sys

    # Estimate the payload size via JSON serialization (json is imported at
    # module level); fall back to a shallow sys.getsizeof if the response
    # cannot be serialized.
    try:
        response_size = len(json.dumps(response).encode("utf-8"))
    except (TypeError, ValueError):
        response_size = sys.getsizeof(response)

    if response_size > MAX_RESPONSE_SIZE:
        logger.warning(
            f"Response too large to cache: {response_size} bytes > {MAX_RESPONSE_SIZE} bytes"
        )
        return

    # Evict the oldest entries (FIFO by stored timestamp) when at capacity,
    # leaving room for exactly one new entry.
    if len(_cache) >= MAX_CACHE_SIZE:
        oldest_keys = sorted(_cache.keys(), key=lambda k: _cache[k][1])[
            : len(_cache) - MAX_CACHE_SIZE + 1
        ]
        for key in oldest_keys:
            del _cache[key]
        logger.debug(
            f"Cache size limit reached, removed {len(oldest_keys)} entries"
        )

    cache_key = _generate_cache_key(endpoint, params)
    _cache[cache_key] = (response, datetime.now())
    logger.debug(f"Cached response for {endpoint} (cache size: {len(_cache)})")
def clear_cache() -> None:
    """Drop every cached response and log how many entries were removed."""
    global _cache
    removed = len(_cache)
    _cache = {}
    logger.info(f"Cleared FDA cache ({removed} entries)")
def get_cache_stats() -> dict[str, Any]:
    """Summarize cache health.

    Returns:
        Dict with entry counts (total/valid/expired), the mean age in
        seconds of valid entries, and the configured TTL/size limits.
    """
    now = datetime.now()
    ttl_seconds = CACHE_TTL_MINUTES * 60
    # Ages (in seconds) of entries that have not yet expired.
    valid_ages = [
        (now - stored_at).total_seconds()
        for _payload, stored_at in _cache.values()
        if (now - stored_at).total_seconds() < ttl_seconds
    ]
    valid_count = len(valid_ages)
    avg_age = sum(valid_ages) / valid_count if valid_count > 0 else 0
    return {
        "total_entries": len(_cache),
        "valid_entries": valid_count,
        "expired_entries": len(_cache) - valid_count,
        "average_age_seconds": avg_age,
        "ttl_minutes": CACHE_TTL_MINUTES,
        "max_size": MAX_CACHE_SIZE,
    }
def is_cacheable_request(endpoint: str, params: dict[str, Any]) -> bool:
    """Decide whether a request's response should be cached.

    Args:
        endpoint: The API endpoint URL
        params: Query parameters

    Returns:
        True if the request should be cached
    """
    # A non-positive TTL means caching is disabled via configuration.
    if CACHE_TTL_MINUTES <= 0:
        return False
    # Skip caching for large result pages.
    limit = params.get("limit", 0)
    return limit <= 100
```
--------------------------------------------------------------------------------
/tests/tdd/drugs/test_drug_getter.py:
--------------------------------------------------------------------------------
```python
"""Unit tests for drug information retrieval."""
import json
import pytest
from biomcp.drugs.getter import get_drug
class TestDrugGetter:
    """Test drug information retrieval (the MyChem.info HTTP layer is monkeypatched)."""

    @pytest.fixture
    def mock_drug_response(self):
        """Mock drug response from MyChem.info."""
        return {
            "_id": "CHEMBL941",
            "name": "Imatinib",
            "drugbank": {
                "id": "DB00619",
                "name": "Imatinib",
                "description": "Imatinib is a tyrosine kinase inhibitor...",
                "indication": "Treatment of chronic myeloid leukemia...",
                "mechanism_of_action": "Inhibits BCR-ABL tyrosine kinase...",
                "products": {"name": ["Gleevec", "Glivec"]},
            },
            "chembl": {
                "molecule_chembl_id": "CHEMBL941",
                "pref_name": "IMATINIB",
            },
            "pubchem": {"cid": 5291},
            "chebi": {"id": "CHEBI:45783", "name": "imatinib"},
            "inchikey": "KTUFNOKKBVMGRW-UHFFFAOYSA-N",
            "formula": "C29H31N7O",
        }

    @pytest.mark.asyncio
    async def test_get_drug_by_name(self, monkeypatch, mock_drug_response):
        """Test getting drug by name."""
        # Mock the API call: name lookups issue a query first, then a fetch.
        call_count = 0
        responses = [
            # Query response
            ({"hits": [{"_id": "CHEMBL941"}]}, None),
            # Get response
            (mock_drug_response, None),
        ]

        async def mock_request_api(url, request, method, domain):
            # Serve the canned responses in order.
            nonlocal call_count
            result = responses[call_count]
            call_count += 1
            return result

        monkeypatch.setattr("biomcp.http_client.request_api", mock_request_api)
        result = await get_drug("imatinib")
        assert "## Drug: Imatinib" in result
        assert "DrugBank ID**: DB00619" in result
        assert "ChEMBL ID**: CHEMBL941" in result
        assert "Formula**: C29H31N7O" in result
        assert "Trade Names**: Gleevec, Glivec" in result
        assert "External Links" in result
        assert "DrugBank](https://www.drugbank.ca/drugs/DB00619)" in result

    @pytest.mark.asyncio
    async def test_get_drug_by_id(self, monkeypatch, mock_drug_response):
        """Test getting drug by DrugBank ID."""

        # Mock the API call — ID lookups fetch directly, no query step.
        async def mock_request_api(url, request, method, domain):
            return (mock_drug_response, None)

        monkeypatch.setattr("biomcp.http_client.request_api", mock_request_api)
        result = await get_drug("DB00619")
        assert "## Drug: Imatinib" in result
        assert "DrugBank ID**: DB00619" in result

    @pytest.mark.asyncio
    async def test_get_drug_json_output(self, monkeypatch, mock_drug_response):
        """Test getting drug with JSON output."""

        # Mock the API call
        async def mock_request_api(url, request, method, domain):
            return (mock_drug_response, None)

        monkeypatch.setattr("biomcp.http_client.request_api", mock_request_api)
        result = await get_drug("DB00619", output_json=True)
        data = json.loads(result)
        assert data["drug_id"] == "CHEMBL941"
        assert data["name"] == "Imatinib"
        assert data["drugbank_id"] == "DB00619"
        assert (
            data["_links"]["DrugBank"]
            == "https://www.drugbank.ca/drugs/DB00619"
        )

    @pytest.mark.asyncio
    async def test_drug_not_found(self, monkeypatch):
        """Test drug not found."""

        # Mock the API call — empty hit list signals an unknown drug.
        async def mock_request_api(url, request, method, domain):
            return ({"hits": []}, None)

        monkeypatch.setattr("biomcp.http_client.request_api", mock_request_api)
        result = await get_drug("INVALID_DRUG_XYZ")
        assert "Drug 'INVALID_DRUG_XYZ' not found" in result

    @pytest.mark.asyncio
    async def test_drug_with_description_truncation(self, monkeypatch):
        """Test drug with long description gets truncated."""
        long_desc = "A" * 600
        mock_response = {
            "_id": "TEST001",
            "name": "TestDrug",
            "drugbank": {"id": "DB99999", "description": long_desc},
        }

        async def mock_request_api(url, request, method, domain):
            return (mock_response, None)

        monkeypatch.setattr("biomcp.http_client.request_api", mock_request_api)
        result = await get_drug("DB99999")
        assert "Description" in result
        assert "A" * 500 in result
        assert "..." in result  # Truncation indicator

    @pytest.mark.asyncio
    async def test_drug_error_handling(self, monkeypatch):
        """Test error handling."""

        # Mock the API call to raise an exception
        async def mock_request_api(url, request, method, domain):
            raise Exception("API error")

        monkeypatch.setattr("biomcp.http_client.request_api", mock_request_api)
        result = await get_drug("imatinib")
        # When an exception occurs, it's caught and the drug is reported as not found
        assert "Drug 'imatinib' not found in MyChem.info" in result
```
--------------------------------------------------------------------------------
/src/biomcp/drugs/getter.py:
--------------------------------------------------------------------------------
```python
"""Drug information retrieval from MyChem.info."""
import json
import logging
from ..integrations import BioThingsClient
logger = logging.getLogger(__name__)
def _add_drug_links(drug_info, result: dict) -> None:
"""Add external database links for the drug."""
links = {}
if drug_info.drugbank_id:
links["DrugBank"] = (
f"https://www.drugbank.ca/drugs/{drug_info.drugbank_id}"
)
if drug_info.chembl_id:
links["ChEMBL"] = (
f"https://www.ebi.ac.uk/chembl/compound_report_card/{drug_info.chembl_id}/"
)
if drug_info.pubchem_cid:
links["PubChem"] = (
f"https://pubchem.ncbi.nlm.nih.gov/compound/{drug_info.pubchem_cid}"
)
if drug_info.chebi_id:
chebi_id = drug_info.chebi_id.replace("CHEBI:", "")
links["ChEBI"] = (
f"https://www.ebi.ac.uk/chebi/searchId.do?chebiId=CHEBI:{chebi_id}"
)
if links:
result["_links"] = links
def _format_basic_info(drug_info, output_lines: list[str]) -> None:
"""Format basic drug information."""
if drug_info.formula:
output_lines.append(f"- **Formula**: {drug_info.formula}")
if drug_info.drugbank_id:
output_lines.append(f"- **DrugBank ID**: {drug_info.drugbank_id}")
if drug_info.chembl_id:
output_lines.append(f"- **ChEMBL ID**: {drug_info.chembl_id}")
if drug_info.pubchem_cid:
output_lines.append(f"- **PubChem CID**: {drug_info.pubchem_cid}")
if drug_info.chebi_id:
output_lines.append(f"- **ChEBI ID**: {drug_info.chebi_id}")
if drug_info.inchikey:
output_lines.append(f"- **InChIKey**: {drug_info.inchikey}")
def _format_clinical_info(drug_info, output_lines: list[str]) -> None:
"""Format clinical drug information."""
if drug_info.tradename:
names = drug_info.tradename[:5] # Limit to first 5
output_lines.append(f"- **Trade Names**: {', '.join(names)}")
if len(drug_info.tradename) > 5:
output_lines.append(f" (and {len(drug_info.tradename) - 5} more)")
if drug_info.description:
desc = drug_info.description[:500]
if len(drug_info.description) > 500:
desc += "..."
output_lines.append(f"\n### Description\n{desc}")
if drug_info.indication:
ind = drug_info.indication[:500]
if len(drug_info.indication) > 500:
ind += "..."
output_lines.append(f"\n### Indication\n{ind}")
if drug_info.mechanism_of_action:
moa = drug_info.mechanism_of_action[:500]
if len(drug_info.mechanism_of_action) > 500:
moa += "..."
output_lines.append(f"\n### Mechanism of Action\n{moa}")
def _format_drug_output(drug_info, result: dict) -> None:
    """Assemble the markdown text output and store it under "_formatted"."""
    lines = [f"## Drug: {drug_info.name or 'Unknown'}"]
    _format_basic_info(drug_info, lines)
    _format_clinical_info(drug_info, lines)
    links = result.get("_links")
    if links:
        lines.append("\n### External Links")
        lines.extend(f"- [{name}]({url})" for name, url in links.items())
    result["_formatted"] = "\n".join(lines)
async def get_drug(drug_id_or_name: str, output_json: bool = False) -> str:
    """Get drug information from MyChem.info.

    Args:
        drug_id_or_name: Drug ID (DrugBank, ChEMBL, etc.) or name
        output_json: Return JSON instead of formatted text

    Returns:
        Formatted drug information or JSON string
    """
    try:
        drug_info = await BioThingsClient().get_drug_info(drug_id_or_name)
        if not drug_info:
            message = f"Drug '{drug_id_or_name}' not found in MyChem.info"
            if output_json:
                return json.dumps({"error": message}, indent=2)
            return message
        # Serialize the model and enrich it with external database links.
        result = drug_info.model_dump(by_alias=False, exclude_none=True)
        _add_drug_links(drug_info, result)
        if output_json:
            return json.dumps(result, indent=2)
        # Format for text output
        _format_drug_output(drug_info, result)
        return result["_formatted"]
    except Exception as exc:
        logger.error(f"Error getting drug info: {exc}")
        message = f"Error retrieving drug information: {exc!s}"
        if output_json:
            return json.dumps({"error": message}, indent=2)
        return message
# MCP tool function
async def _drug_details(drug_id_or_name: str) -> str:
    """Get drug/chemical information from MyChem.info.

    This tool retrieves comprehensive drug information including:
    - Drug identifiers (DrugBank, ChEMBL, PubChem, etc.)
    - Chemical properties (formula, InChIKey)
    - Trade names and synonyms
    - Clinical indications
    - Mechanism of action
    - Links to external databases

    Args:
        drug_id_or_name: Drug name (e.g., "aspirin") or ID (e.g., "DB00945", "CHEMBL25")

    Returns:
        Formatted drug information with external database links
    """
    # Thin wrapper so the MCP layer always receives markdown text, never JSON.
    return await get_drug(drug_id_or_name, output_json=False)
```
--------------------------------------------------------------------------------
/docs/getting-started/01-quickstart-cli.md:
--------------------------------------------------------------------------------
```markdown
# Quickstart: BioMCP CLI
Get started with BioMCP in under 5 minutes! This guide walks you through installation and your first biomedical search.
## Prerequisites
- Python 3.10 or higher
- [uv](https://docs.astral.sh/uv/) package manager (recommended) or pip
## Installation
### Option 1: Using uv (Recommended)
```bash
# Install uv if you haven't already
curl -LsSf https://astral.sh/uv/install.sh | sh
# Install BioMCP
uv tool install biomcp-python
```
### Option 2: Using pip
```bash
pip install biomcp-python
```
## Your First Search
Let's search for recent articles about BRAF mutations in melanoma:
```bash
biomcp article search \
--gene BRAF --disease melanoma
```
This command:
- Searches PubMed/PubTator3 for articles
- Filters by BRAF gene and melanoma disease
- Returns the 5 most recent results
- Automatically includes cBioPortal cancer genomics data
- Includes preprints from bioRxiv/medRxiv by default
## Understanding the Output
The search returns:
1. **cBioPortal Summary** (if gene specified): Cancer genomics data showing mutation frequencies and hotspots
2. **Article Results**: Each result includes:
- Title and authors
- Journal and publication date
- PubMed ID and direct link
- Abstract snippet
- Annotated entities (genes, diseases, chemicals)
## Essential Commands
### Search Clinical Trials
Find active trials for lung cancer:
```bash
biomcp trial search \
--condition "lung cancer" \
--status open
```
### Get Gene Information
Retrieve details about the TP53 tumor suppressor:
```bash
biomcp variant search --gene TP53 # get variants for a gene
biomcp article search --gene TP53 # find articles about a gene
```
### Look Up Drug Information
Get details about imatinib (Gleevec):
```bash
biomcp intervention search imatinib
```
### Search for Genetic Variants
Find pathogenic variants in the BRCA1 gene:
```bash
biomcp variant search \
--gene BRCA1 --significance pathogenic
```
### Analyze a Clinically Actionable Variant
Get OncoKB clinical interpretations for known cancer variants. BioMCP uses a demo server for key genes like BRAF out-of-the-box, no setup required!
```bash
# Get clinical actionability for BRAF V600E
biomcp variant search --gene BRAF
```
This will automatically prepend an "OncoKB Gene Summary" table to the search results.
## Optional Setup and Advanced Usage
### Set Up API Keys (Optional but Recommended)
Some features require API keys for enhanced functionality:
```bash
# For NCI clinical trials database
export NCI_API_KEY="your-key-here"
# For AlphaGenome variant predictions
export ALPHAGENOME_API_KEY="your-key-here"
# For additional cBioPortal features
export CBIO_TOKEN="your-token-here"
```
See [Authentication and API Keys](03-authentication-and-api-keys.md) for detailed setup.
### Explore Advanced Features
- **Combine Multiple Filters**:
```bash
biomcp article search \
--gene EGFR --disease "lung cancer" \
--chemical erlotinib
```
- **Use OR Logic in Keywords**:
```bash
biomcp article search --gene BRAF --keyword "V600E|p.V600E|c.1799T>A"
```
- **Exclude Preprints**:
```bash
biomcp article search --gene TP53 --no-preprints
```
### Get Help
View all available commands:
```bash
biomcp --help
```
Get help for a specific command:
```bash
biomcp article search --help
```
## Common Use Cases
### 1. Research a Specific Mutation
```bash
# Find articles about EGFR T790M resistance mutation
biomcp article search --gene EGFR \
--keyword "T790M|p.T790M" \
--disease "lung cancer"
```
### 2. Find Trials for a Patient
```bash
# Active trials for HER2-positive breast cancer
biomcp trial search \
--condition "breast cancer" \
--keyword "HER2 positive" \
--status RECRUITING
```
### 3. Investigate Drug Mechanisms
```bash
# Get information about pembrolizumab
biomcp drug get pembrolizumab
# Find articles about its use in melanoma
biomcp article search --chemical pembrolizumab --disease melanoma
```
## Troubleshooting
### Command Not Found
If `biomcp` is not recognized:
- Ensure your PATH includes the installation directory
- Try running with full path: `~/.local/bin/biomcp`
- Restart your terminal after installation
### No Results Found
If searches return no results:
- Check spelling of gene names (use official symbols)
- Try broader search terms
- Remove filters one by one to identify the constraint
### API Rate Limits
If you encounter rate limit errors:
- Add delays between requests
- Consider setting up API keys for higher limits
- Use the `--limit` parameter to reduce result count
## Next Steps
Now that you've run your first searches, explore these resources:
1. **[Complete CLI Reference](../user-guides/01-command-line-interface.md)** - Comprehensive documentation for all commands and options
2. **[Claude Desktop Integration](02-claude-desktop-integration.md)** - Use BioMCP with AI assistants
3. **[Set up API Keys](03-authentication-and-api-keys.md)** - Enable advanced features with NCI, AlphaGenome, and cBioPortal
4. **[How-to Guides](../how-to-guides/01-find-articles-and-cbioportal-data.md)** - Step-by-step tutorials for complex research workflows
5. **[Deep Researcher Persona](../concepts/02-the-deep-researcher-persona.md)** - Learn about BioMCP's philosophy and methodology
Happy researching! 🧬🔬
```
--------------------------------------------------------------------------------
/docs/backend-services-reference/01-overview.md:
--------------------------------------------------------------------------------
```markdown
# Backend Services Reference Overview
BioMCP integrates with multiple biomedical databases and services to provide comprehensive research capabilities. This reference documents the underlying APIs and their capabilities.
## Service Categories
### Literature and Publications
- **[PubTator3](06-pubtator3.md)**: Biomedical literature with entity annotations
- **Europe PMC**: Preprints from bioRxiv and medRxiv
### Clinical Trials
- **[ClinicalTrials.gov](04-clinicaltrials-gov.md)**: U.S. and international clinical trials registry
- **[NCI CTS API](05-nci-cts-api.md)**: National Cancer Institute's enhanced trial search
### Biomedical Annotations
- **[BioThings Suite](02-biothings-suite.md)**:
- MyGene.info - Gene annotations
- MyVariant.info - Variant annotations
- MyDisease.info - Disease ontology
- MyChem.info - Drug/chemical data
### Cancer Genomics
- **[cBioPortal](03-cbioportal.md)**: Cancer genomics portal with mutation data
- **TCGA**: The Cancer Genome Atlas (via MyVariant.info)
### Variant Effect Prediction
- **[AlphaGenome](07-alphagenome.md)**: Google DeepMind's AI for regulatory predictions
## API Authentication
| Service | Authentication Required | Type | Rate Limits |
| ------------------ | ----------------------- | ------- | ------------------- |
| PubTator3 | No | Public | 3 requests/second |
| ClinicalTrials.gov | No | Public | 50,000 requests/day |
| NCI CTS API | Yes | API Key | 1,000 requests/day |
| BioThings APIs | No | Public | 1,000 requests/hour |
| cBioPortal | Optional | Token | Higher with token |
| AlphaGenome | Yes | API Key | Contact provider |
## Data Flow Architecture
```
User Query → BioMCP Tools → Backend APIs → Unified Response
Example Flow:
1. User: "Find articles about BRAF mutations"
2. BioMCP: article_searcher tool
3. APIs Called:
- PubTator3 (articles)
- cBioPortal (mutation data)
- Europe PMC (preprints)
4. Response: Integrated results with citations
```
## Service Reliability
### Primary Services
- **PubTator3**: 99.9% uptime, updated daily
- **ClinicalTrials.gov**: 99.5% uptime, updated daily
- **BioThings APIs**: 99.9% uptime, real-time data
### Fallback Strategies
- Cache frequently accessed data
- Implement exponential backoff
- Use alternative endpoints when available
## Common Integration Patterns
### 1. Entity Recognition Enhancement
```
PubTator3 → Extract entities → BioThings → Get detailed annotations
```
### 2. Variant to Trial Pipeline
```
MyVariant.info → Get gene → ClinicalTrials.gov → Find relevant trials
```
### 3. Comprehensive Gene Analysis
```
MyGene.info → Basic info
cBioPortal → Cancer mutations
PubTator3 → Literature
AlphaGenome → Predictions
```
## Performance Considerations
### Response Times (typical)
- PubTator3: 200-500ms
- ClinicalTrials.gov: 300-800ms
- BioThings APIs: 100-300ms
- cBioPortal: 200-600ms
- AlphaGenome: 1-3 seconds
### Optimization Strategies
1. **Batch requests** when APIs support it
2. **Cache static data** (gene names, ontologies)
3. **Parallelize independent** API calls
4. **Use pagination** for large result sets
## Error Handling
### Common Error Types
- **Rate Limiting**: 429 errors, implement backoff
- **Invalid Parameters**: 400 errors, validate inputs
- **Service Unavailable**: 503 errors, retry with delay
- **Authentication**: 401 errors, check API keys
### Error Response Format
```json
{
"error": {
"code": "RATE_LIMIT_EXCEEDED",
"message": "API rate limit exceeded",
"retry_after": 3600
}
}
```
## Data Formats
### Input Formats
- **Identifiers**: HGNC symbols, rsIDs, NCT numbers, PMIDs
- **Coordinates**: GRCh38 genomic positions
- **Terms**: MeSH, MONDO, HPO ontologies
### Output Formats
- **JSON**: Primary format for all APIs
- **XML**: Available for some services
- **TSV/CSV**: Export options for bulk data
## Update Frequencies
| Service | Update Frequency | Data Lag |
| ------------------ | ---------------- | ---------- |
| PubTator3 | Daily | 1-2 days |
| ClinicalTrials.gov | Daily | Real-time |
| NCI CTS | Daily | 1 day |
| BioThings | Real-time | Minutes |
| cBioPortal | Quarterly | 3-6 months |
## Best Practices
### 1. API Key Management
- Store keys securely
- Rotate keys periodically
- Monitor usage against limits
### 2. Error Recovery
- Implement retry logic
- Log failed requests
- Provide fallback data
### 3. Data Validation
- Verify gene symbols
- Validate genomic coordinates
- Check identifier formats
### 4. Performance
- Cache when appropriate
- Batch similar requests
- Use appropriate page sizes
## Getting Started
1. Review individual service documentation
2. Obtain necessary API keys
3. Test endpoints with sample data
4. Implement error handling
5. Monitor usage and performance
## Support Resources
- **PubTator3**: [Support Forum](https://www.ncbi.nlm.nih.gov/research/pubtator3/)
- **ClinicalTrials.gov**: [Help Desk](https://clinicaltrials.gov/help)
- **BioThings**: [Documentation](https://docs.biothings.io/)
- **cBioPortal**: [User Guide](https://docs.cbioportal.org/)
- **NCI**: [API Support](https://clinicaltrialsapi.cancer.gov/support)
```
--------------------------------------------------------------------------------
/tests/tdd/test_concurrent_requests.py:
--------------------------------------------------------------------------------
```python
"""Test concurrent request handling in the HTTP client."""
import asyncio
from unittest.mock import AsyncMock, patch
import pytest
from biomcp import http_client
class TestConcurrentRequests:
    """Tests for concurrent request handling in biomcp.http_client."""

    @pytest.mark.asyncio
    async def test_concurrent_requests_same_domain(self):
        """Ten concurrent, uncached requests all reach the transport."""
        # Patch the transport layer so no real network traffic occurs.
        with patch(
            "biomcp.http_client.call_http", new_callable=AsyncMock
        ) as mock_call:
            # Configure mock to return success
            mock_call.return_value = (200, '{"data": "response"}')
            # Use distinct URLs and cache_ttl=0 so every request bypasses
            # the cache and actually hits the mocked transport.
            tasks = [
                http_client.request_api(
                    url=f"https://api.example.com/resource/{i}",
                    request={},
                    domain="example",
                    cache_ttl=0,  # Disable caching
                )
                for i in range(10)
            ]
            results = await asyncio.gather(*tasks)
            # All requests should succeed
            assert len(results) == 10
            for data, error in results:
                assert error is None
                assert data == {"data": "response"}
            # Every request should have reached the (mocked) transport,
            # confirming none were served from cache.
            assert mock_call.call_count == 10

    @pytest.mark.asyncio
    async def test_concurrent_requests_different_domains(self):
        """Concurrent requests to distinct domains get their own responses."""
        with patch(
            "biomcp.http_client.call_http", new_callable=AsyncMock
        ) as mock_call:
            # Return a response that identifies which domain was hit.
            async def side_effect(method, url, *args, **kwargs):
                if "domain1" in url:
                    return (200, '{"source": "domain1"}')
                elif "domain2" in url:
                    return (200, '{"source": "domain2"}')
                else:
                    return (200, '{"source": "other"}')

            mock_call.side_effect = side_effect
            # Make requests to different domains
            tasks = [
                http_client.request_api(
                    "https://domain1.com/api", {}, domain="domain1"
                ),
                http_client.request_api(
                    "https://domain2.com/api", {}, domain="domain2"
                ),
                http_client.request_api(
                    "https://domain3.com/api", {}, domain="domain3"
                ),
            ]
            results = await asyncio.gather(*tasks)
            # Each result should reflect its own domain's response.
            assert results[0][0] == {"source": "domain1"}
            assert results[1][0] == {"source": "domain2"}
            assert results[2][0] == {"source": "other"}

    @pytest.mark.asyncio
    async def test_concurrent_cache_access(self):
        """Concurrent requests for a cached URL make no extra HTTP calls."""
        with patch(
            "biomcp.http_client.call_http", new_callable=AsyncMock
        ) as mock_call:
            mock_call.return_value = (200, '{"data": "cached"}')
            # First request to populate cache
            await http_client.request_api(
                url="https://api.example.com/data",
                request={},
                domain="example",
                cache_ttl=60,
            )
            # Record the baseline call count before the concurrent burst.
            initial_calls = mock_call.call_count
            # Make 5 concurrent requests to same URL
            tasks = [
                http_client.request_api(
                    url="https://api.example.com/data",
                    request={},
                    domain="example",
                    cache_ttl=60,
                )
                for _ in range(5)
            ]
            results = await asyncio.gather(*tasks)
            # All should get cached response
            assert len(results) == 5
            for data, _error in results:
                assert data == {"data": "cached"}
            # No additional HTTP calls should have been made
            assert mock_call.call_count == initial_calls

    @pytest.mark.asyncio
    async def test_concurrent_circuit_breaker(self):
        """Concurrent failures should trip the circuit breaker for a domain."""
        with patch(
            "biomcp.http_client.call_http", new_callable=AsyncMock
        ) as mock_call:
            # Simulate failures
            mock_call.return_value = (500, "Internal Server Error")
            # Make concurrent failing requests
            tasks = [
                http_client.request_api(
                    url=f"https://failing.com/api/{i}",
                    request={},
                    domain="failing",
                )
                for i in range(10)
            ]
            results = await asyncio.gather(*tasks, return_exceptions=True)
            # All should fail. Because gather() ran with
            # return_exceptions=True, each result is either a raised
            # exception or a (data, error) tuple; count both as failures
            # instead of unpacking blindly, which would raise TypeError
            # on an exception result.
            error_count = sum(
                1
                for result in results
                if isinstance(result, BaseException)
                or result[1] is not None
            )
            assert error_count == 10
            # Circuit should be open now
            # Additional requests should fail immediately
            _, error = await http_client.request_api(
                url="https://failing.com/api/test",
                request={},
                domain="failing",
            )
            assert error is not None
            # Check that circuit breaker is preventing calls
            # (exact behavior depends on implementation details)
```
--------------------------------------------------------------------------------
/tests/tdd/test_connection_pool.py:
--------------------------------------------------------------------------------
```python
"""Tests for connection pool management."""
import asyncio
import ssl
import weakref
from unittest.mock import patch
import httpx
import pytest
from biomcp.connection_pool import (
EventLoopConnectionPools,
close_all_pools,
get_connection_pool,
)
@pytest.fixture
def pool_manager():
    """Provide a fresh, isolated EventLoopConnectionPools per test.

    A new manager per test prevents pools created in one test from
    leaking into another.
    """
    return EventLoopConnectionPools()
@pytest.mark.asyncio
async def test_get_pool_creates_new_pool(pool_manager):
    """A fresh manager should lazily build an open httpx.AsyncClient."""
    created = await pool_manager.get_pool(
        verify=True, timeout=httpx.Timeout(30)
    )
    assert created is not None
    assert isinstance(created, httpx.AsyncClient)
    assert created.is_closed is False
@pytest.mark.asyncio
async def test_get_pool_reuses_existing_pool(pool_manager):
    """Two get_pool calls with identical settings share one client."""
    shared_timeout = httpx.Timeout(30)
    first = await pool_manager.get_pool(verify=True, timeout=shared_timeout)
    second = await pool_manager.get_pool(verify=True, timeout=shared_timeout)
    assert first is second
@pytest.mark.asyncio
async def test_get_pool_different_verify_settings(pool_manager):
    """verify=True and verify=False must map to distinct pools."""
    shared_timeout = httpx.Timeout(30)
    verified = await pool_manager.get_pool(
        verify=True, timeout=shared_timeout
    )
    unverified = await pool_manager.get_pool(
        verify=False, timeout=shared_timeout
    )
    assert verified is not unverified
@pytest.mark.asyncio
async def test_get_pool_ssl_context(pool_manager):
    """A custom ssl.SSLContext is accepted as the verify argument."""
    context = ssl.create_default_context()
    pool = await pool_manager.get_pool(
        verify=context, timeout=httpx.Timeout(30)
    )
    assert pool is not None
    assert isinstance(pool, httpx.AsyncClient)
@pytest.mark.asyncio
async def test_pool_cleanup_on_close_all(pool_manager):
    """close_all() must drop every pool the manager created."""
    shared_timeout = httpx.Timeout(30)
    for verify_setting in (True, False):
        await pool_manager.get_pool(
            verify=verify_setting, timeout=shared_timeout
        )
    await pool_manager.close_all()
    # The internal per-loop registry should be empty afterwards.
    assert len(pool_manager._loop_pools) == 0
@pytest.mark.asyncio
async def test_no_event_loop_returns_single_use_client(pool_manager):
    """Test behavior when no event loop is running."""
    # Force asyncio.get_running_loop() to fail so the manager takes its
    # no-loop fallback path and hands back a throwaway client.
    with patch("asyncio.get_running_loop", side_effect=RuntimeError):
        timeout = httpx.Timeout(30)
        pool = await pool_manager.get_pool(verify=True, timeout=timeout)
        assert pool is not None
        # Single-use client should have no keepalive
        # Note: httpx client internal structure may vary
@pytest.mark.asyncio
async def test_pool_recreation_after_close(pool_manager):
    """Closing a pool must cause the next get_pool call to rebuild it."""
    shared_timeout = httpx.Timeout(30)
    original = await pool_manager.get_pool(
        verify=True, timeout=shared_timeout
    )
    await original.aclose()
    replacement = await pool_manager.get_pool(
        verify=True, timeout=shared_timeout
    )
    assert replacement is not original
    assert original.is_closed
    assert not replacement.is_closed
@pytest.mark.asyncio
async def test_weak_reference_cleanup():
    """The manager must key its pools on weakly-referenced event loops."""
    manager = EventLoopConnectionPools()
    # Pools live in a WeakKeyDictionary so a dead loop does not pin
    # its pools in memory.
    assert isinstance(manager._loop_pools, weakref.WeakKeyDictionary)
    pool = await manager.get_pool(verify=True, timeout=httpx.Timeout(30))
    assert pool is not None
    # The running loop should now appear as a key in the registry.
    assert asyncio.get_running_loop() in manager._loop_pools
@pytest.mark.asyncio
async def test_global_get_connection_pool():
    """The module-level get_connection_pool helper returns a client."""
    # Pooling is toggled via this environment variable.
    with patch.dict("os.environ", {"BIOMCP_USE_CONNECTION_POOL": "true"}):
        pool = await get_connection_pool(
            verify=True, timeout=httpx.Timeout(30)
        )
        assert pool is not None
        assert isinstance(pool, httpx.AsyncClient)
@pytest.mark.asyncio
async def test_global_close_all_pools():
    """close_all_pools() must empty the shared module-level manager."""
    shared_timeout = httpx.Timeout(30)
    for verify_setting in (True, False):
        await get_connection_pool(
            verify=verify_setting, timeout=shared_timeout
        )
    await close_all_pools()
    # Inspect the module-level manager directly (implementation-specific).
    from biomcp.connection_pool import _pool_manager

    assert len(_pool_manager._loop_pools) == 0
@pytest.mark.asyncio
async def test_concurrent_pool_creation(pool_manager):
    """Concurrent get_pool calls must all resolve to one shared pool."""
    shared_timeout = httpx.Timeout(30)
    # Fire 10 simultaneous requests for the same pool configuration.
    pools = await asyncio.gather(*[
        pool_manager.get_pool(verify=True, timeout=shared_timeout)
        for _ in range(10)
    ])
    first = pools[0]
    assert all(candidate is first for candidate in pools)
@pytest.mark.asyncio
async def test_connection_pool_limits():
    """Pool creation succeeds; httpx keeps its limits internal."""
    manager = EventLoopConnectionPools()
    client = await manager.get_pool(
        verify=True, timeout=httpx.Timeout(30)
    )
    # The connection limits themselves are internal to httpx, so only
    # verify a usable client was produced.
    assert client is not None
    assert isinstance(client, httpx.AsyncClient)
```
--------------------------------------------------------------------------------
/tests/data/myvariant/variants_part_braf_v600_multiple.json:
--------------------------------------------------------------------------------
```json
[
{
"_id": "chr7:g.140453136A>G",
"_score": 19.419012,
"cadd": {
"_license": "http://bit.ly/2TIuab9",
"phred": 21.2
},
"chrom": "7",
"clinvar": {
"_license": "http://bit.ly/2SQdcI0",
"rcv": {
"clinical_significance": "Likely pathogenic"
},
"variant_id": 376288
},
"cosmic": {
"_license": "http://bit.ly/2VMkY7R",
"cosmic_id": "COSM18443"
},
"dbnsfp": {
"_license": "http://bit.ly/2VLnQBz",
"genename": ["BRAF", "BRAF", "BRAF", "BRAF"],
"hgvsc": ["c.620T>C", "c.1919T>C", "c.1799T>C"],
"hgvsp": ["p.V600A", "p.Val600Ala", "p.Val640Ala", "p.Val207Ala"],
"polyphen2": {
"hdiv": {
"pred": "B",
"score": 0.207
}
}
},
"dbsnp": {
"_license": "http://bit.ly/2AqoLOc",
"rsid": "rs113488022"
},
"vcf": {
"alt": "G",
"position": "140453136",
"ref": "A"
}
},
{
"_id": "chr7:g.140453136A>T",
"_score": 18.693962,
"cadd": {
"_license": "http://bit.ly/2TIuab9",
"phred": 32
},
"chrom": "7",
"civic": {
"_license": "http://bit.ly/2FqS871",
"id": 12,
"openCravatUrl": "https://run.opencravat.org/webapps/variantreport/index.html?alt_base=T&chrom=chr7&pos=140753336&ref_base=A"
},
"clinvar": {
"_license": "http://bit.ly/2SQdcI0",
"rcv": [
{
"clinical_significance": "Pathogenic"
},
{
"clinical_significance": "Pathogenic"
},
{
"clinical_significance": "Pathogenic"
},
{
"clinical_significance": "Pathogenic"
},
{
"clinical_significance": "Pathogenic"
},
{
"clinical_significance": "Pathogenic"
},
{
"clinical_significance": "Pathogenic"
},
{
"clinical_significance": "not provided"
},
{
"clinical_significance": "Likely pathogenic"
},
{
"clinical_significance": "Likely pathogenic"
},
{
"clinical_significance": "Likely pathogenic"
},
{
"clinical_significance": "Likely pathogenic"
},
{
"clinical_significance": "Likely pathogenic"
},
{
"clinical_significance": "Likely pathogenic"
},
{
"clinical_significance": "Likely pathogenic"
},
{
"clinical_significance": "Pathogenic"
},
{
"clinical_significance": "Pathogenic"
},
{
"clinical_significance": "Likely pathogenic"
},
{
"clinical_significance": "Pathogenic"
},
{
"clinical_significance": "Likely pathogenic"
},
{
"clinical_significance": "Likely pathogenic"
},
{
"clinical_significance": "Pathogenic"
},
{
"clinical_significance": "Pathogenic"
},
{
"clinical_significance": "Pathogenic"
},
{
"clinical_significance": "Pathogenic"
},
{
"clinical_significance": "Likely pathogenic"
},
{
"clinical_significance": "Pathogenic"
},
{
"clinical_significance": "Pathogenic"
},
{
"clinical_significance": "Likely pathogenic"
}
],
"variant_id": 13961
},
"cosmic": {
"_license": "http://bit.ly/2VMkY7R",
"cosmic_id": "COSM476"
},
"dbnsfp": {
"_license": "http://bit.ly/2VLnQBz",
"genename": ["BRAF", "BRAF", "BRAF", "BRAF"],
"hgvsc": ["c.620T>A", "c.1919T>A", "c.1799T>A"],
"hgvsp": ["p.Val640Glu", "p.Val207Glu", "p.Val600Glu", "p.V600E"],
"polyphen2": {
"hdiv": {
"pred": "D",
"score": 0.971
}
}
},
"dbsnp": {
"_license": "http://bit.ly/2AqoLOc",
"rsid": "rs113488022"
},
"exac": {
"_license": "http://bit.ly/2H9c4hg",
"af": 1.647e-5
},
"gnomad_exome": {
"_license": "http://bit.ly/2I1cl1I",
"af": {
"af": 3.97994e-6
}
},
"vcf": {
"alt": "T",
"position": "140453136",
"ref": "A"
}
},
{
"_id": "chr7:g.140453136A>C",
"_score": 18.476965,
"cadd": {
"_license": "http://bit.ly/2TIuab9",
"phred": 26.0
},
"chrom": "7",
"clinvar": {
"_license": "http://bit.ly/2SQdcI0",
"rcv": [
{
"clinical_significance": "not provided"
},
{
"clinical_significance": "Pathogenic"
},
{
"clinical_significance": "Pathogenic"
},
{
"clinical_significance": "Uncertain significance"
}
],
"variant_id": 40389
},
"cosmic": {
"_license": "http://bit.ly/2VMkY7R",
"cosmic_id": "COSM6137"
},
"dbnsfp": {
"_license": "http://bit.ly/2VLnQBz",
"genename": ["BRAF", "BRAF", "BRAF", "BRAF"],
"hgvsc": ["c.1919T>G", "c.1799T>G", "c.620T>G"],
"hgvsp": ["p.Val640Gly", "p.Val207Gly", "p.Val600Gly", "p.V600G"],
"polyphen2": {
"hdiv": {
"pred": "P",
"score": 0.822
}
}
},
"dbsnp": {
"_license": "http://bit.ly/2AqoLOc",
"rsid": "rs113488022"
},
"vcf": {
"alt": "C",
"position": "140453136",
"ref": "A"
}
}
]
```
--------------------------------------------------------------------------------
/src/biomcp/http_client_simple.py:
--------------------------------------------------------------------------------
```python
"""Helper functions for simpler HTTP client operations."""
import asyncio
import contextlib
import json
import os
import ssl
import httpx
# Global connection pools per SSL context.
# Keyed by a string derived from the `verify` setting (see
# get_connection_pool below); creation is serialized by _pool_lock.
_connection_pools: dict[str, httpx.AsyncClient] = {}
_pool_lock = asyncio.Lock()
# Strong references to in-flight pool-close tasks. A fire-and-forget
# task held only in a local variable can be garbage-collected before it
# runs; entries are discarded automatically once each task completes.
_pool_close_tasks: set[asyncio.Task] = set()


def close_all_pools() -> None:
    """Close all connection pools. Useful for cleanup in tests.

    Best-effort: when an event loop is running, each pool is closed via
    an asynchronous task; otherwise the underlying transport is closed
    synchronously as a fallback. The pool registry is cleared either way.
    """
    for pool in _connection_pools.values():
        if pool and not pool.is_closed:
            try:
                # Keep a strong reference until the task finishes so it
                # cannot be garbage-collected mid-flight.
                close_task = asyncio.create_task(pool.aclose())
                _pool_close_tasks.add(close_task)
                close_task.add_done_callback(_pool_close_tasks.discard)
            except RuntimeError:
                # No running event loop: close the transport directly.
                # NOTE(review): this relies on httpx's private
                # `_transport` attribute — confirm against the pinned
                # httpx version.
                pool._transport.close()
    _connection_pools.clear()
async def get_connection_pool(
    verify: ssl.SSLContext | str | bool,
    timeout: httpx.Timeout,
) -> httpx.AsyncClient:
    """Get or create a shared connection pool for the given SSL context.

    Pools are cached per `verify` setting; creation is serialized with
    a lock so concurrent callers end up sharing a single client.
    """
    # Derive the cache key: SSL contexts are keyed by object identity,
    # plain bool/str settings by their string form.
    if isinstance(verify, ssl.SSLContext):
        pool_key = f"ssl_{id(verify)}"
    else:
        pool_key = str(verify)

    async with _pool_lock:
        existing = _connection_pools.get(pool_key)
        if existing is not None and not existing.is_closed:
            return existing

        # No usable pool yet: build one with pooling-friendly settings.
        client = httpx.AsyncClient(
            verify=verify,
            http2=False,  # HTTP/2 can add overhead for simple requests
            timeout=timeout,
            limits=httpx.Limits(
                max_keepalive_connections=20,  # Reuse connections
                max_connections=100,  # Total connection limit
                keepalive_expiry=30,  # Keep connections alive for 30s
            ),
            # Enable connection pooling
            transport=httpx.AsyncHTTPTransport(
                retries=0,  # We handle retries at a higher level
            ),
        )
        _connection_pools[pool_key] = client
        return client
async def execute_http_request(  # noqa: C901
    method: str,
    url: str,
    params: dict,
    verify: ssl.SSLContext | str | bool,
    headers: dict[str, str] | None = None,
) -> tuple[int, str]:
    """Execute the actual HTTP request using connection pooling.

    Args:
        method: HTTP method (GET or POST)
        url: Target URL
        params: Request parameters. A special "_headers" key, if
            present, is popped (mutating the caller's dict) and
            JSON-decoded into extra request headers.
        verify: SSL verification settings
        headers: Optional custom headers (copied, never mutated)
    Returns:
        Tuple of (status_code, response_text)
    Raises:
        ConnectionError: For connection failures
        TimeoutError: For timeout errors
    """
    from .constants import HTTP_TIMEOUT_SECONDS

    try:
        # Copy the caller's headers so we never mutate their dict when
        # merging in "_headers" from params.
        custom_headers = dict(headers) if headers else {}
        if "_headers" in params:
            with contextlib.suppress(json.JSONDecodeError, TypeError):
                custom_headers.update(json.loads(params.pop("_headers")))

        # Use the configured timeout from constants
        timeout = httpx.Timeout(HTTP_TIMEOUT_SECONDS)

        # Connection pooling is on by default, opt-out via env var.
        use_pool = (
            os.getenv("BIOMCP_USE_CONNECTION_POOL", "true").lower() == "true"
        )
        if use_pool:
            try:
                # Use the shared connection pool manager from this
                # package. BUG FIX: this was "..connection_pool", which
                # points above the biomcp package, always raised
                # ImportError, and was silently swallowed below — so
                # pooling never actually engaged.
                from .connection_pool import get_connection_pool as get_pool

                client = await get_pool(verify, timeout)
                should_close = False
            except Exception:
                # Deliberate best-effort fallback: if the pool is
                # unavailable for any reason, use a throwaway client.
                client = httpx.AsyncClient(
                    verify=verify, http2=False, timeout=timeout
                )
                should_close = True
        else:
            # Pooling disabled: create a new client for each request.
            client = httpx.AsyncClient(
                verify=verify, http2=False, timeout=timeout
            )
            should_close = True

        try:
            # Make the request
            if method.upper() == "GET":
                resp = await client.get(
                    url, params=params, headers=custom_headers
                )
            elif method.upper() == "POST":
                resp = await client.post(
                    url, json=params, headers=custom_headers
                )
            else:
                from .constants import HTTP_ERROR_CODE_UNSUPPORTED_METHOD

                return (
                    HTTP_ERROR_CODE_UNSUPPORTED_METHOD,
                    f"Unsupported method {method}",
                )

            # Normalize an empty body to "{}" so callers can parse it
            # as JSON without special-casing.
            if not resp.text:
                return resp.status_code, "{}"
            return resp.status_code, resp.text
        finally:
            # Only close clients we created; pooled clients stay open.
            if should_close:
                await client.aclose()
    except httpx.ConnectError as exc:
        raise ConnectionError(f"Failed to connect to {url}: {exc}") from exc
    except httpx.TimeoutException as exc:
        raise TimeoutError(f"Request to {url} timed out: {exc}") from exc
    except httpx.HTTPError as exc:
        error_msg = str(exc) if str(exc) else "Network connectivity error"
        from .constants import HTTP_ERROR_CODE_NETWORK

        return HTTP_ERROR_CODE_NETWORK, error_msg
```