This is page 3 of 16. Use http://codebase.md/genomoncology/biomcp?page={x} to view the full context.
# Directory Structure
```
├── .github
│ ├── actions
│ │ └── setup-python-env
│ │ └── action.yml
│ ├── dependabot.yml
│ └── workflows
│ ├── ci.yml
│ ├── deploy-docs.yml
│ ├── main.yml.disabled
│ ├── on-release-main.yml
│ └── validate-codecov-config.yml
├── .gitignore
├── .pre-commit-config.yaml
├── BIOMCP_DATA_FLOW.md
├── CHANGELOG.md
├── CNAME
├── codecov.yaml
├── docker-compose.yml
├── Dockerfile
├── docs
│ ├── apis
│ │ ├── error-codes.md
│ │ ├── overview.md
│ │ └── python-sdk.md
│ ├── assets
│ │ ├── biomcp-cursor-locations.png
│ │ ├── favicon.ico
│ │ ├── icon.png
│ │ ├── logo.png
│ │ ├── mcp_architecture.txt
│ │ └── remote-connection
│ │ ├── 00_connectors.png
│ │ ├── 01_add_custom_connector.png
│ │ ├── 02_connector_enabled.png
│ │ ├── 03_connect_to_biomcp.png
│ │ ├── 04_select_google_oauth.png
│ │ └── 05_success_connect.png
│ ├── backend-services-reference
│ │ ├── 01-overview.md
│ │ ├── 02-biothings-suite.md
│ │ ├── 03-cbioportal.md
│ │ ├── 04-clinicaltrials-gov.md
│ │ ├── 05-nci-cts-api.md
│ │ ├── 06-pubtator3.md
│ │ └── 07-alphagenome.md
│ ├── blog
│ │ ├── ai-assisted-clinical-trial-search-analysis.md
│ │ ├── images
│ │ │ ├── deep-researcher-video.png
│ │ │ ├── researcher-announce.png
│ │ │ ├── researcher-drop-down.png
│ │ │ ├── researcher-prompt.png
│ │ │ ├── trial-search-assistant.png
│ │ │ └── what_is_biomcp_thumbnail.png
│ │ └── researcher-persona-resource.md
│ ├── changelog.md
│ ├── CNAME
│ ├── concepts
│ │ ├── 01-what-is-biomcp.md
│ │ ├── 02-the-deep-researcher-persona.md
│ │ └── 03-sequential-thinking-with-the-think-tool.md
│ ├── developer-guides
│ │ ├── 01-server-deployment.md
│ │ ├── 02-contributing-and-testing.md
│ │ ├── 03-third-party-endpoints.md
│ │ ├── 04-transport-protocol.md
│ │ ├── 05-error-handling.md
│ │ ├── 06-http-client-and-caching.md
│ │ ├── 07-performance-optimizations.md
│ │ └── generate_endpoints.py
│ ├── faq-condensed.md
│ ├── FDA_SECURITY.md
│ ├── genomoncology.md
│ ├── getting-started
│ │ ├── 01-quickstart-cli.md
│ │ ├── 02-claude-desktop-integration.md
│ │ └── 03-authentication-and-api-keys.md
│ ├── how-to-guides
│ │ ├── 01-find-articles-and-cbioportal-data.md
│ │ ├── 02-find-trials-with-nci-and-biothings.md
│ │ ├── 03-get-comprehensive-variant-annotations.md
│ │ ├── 04-predict-variant-effects-with-alphagenome.md
│ │ ├── 05-logging-and-monitoring-with-bigquery.md
│ │ └── 06-search-nci-organizations-and-interventions.md
│ ├── index.md
│ ├── policies.md
│ ├── reference
│ │ ├── architecture-diagrams.md
│ │ ├── quick-architecture.md
│ │ ├── quick-reference.md
│ │ └── visual-architecture.md
│ ├── robots.txt
│ ├── stylesheets
│ │ ├── announcement.css
│ │ └── extra.css
│ ├── troubleshooting.md
│ ├── tutorials
│ │ ├── biothings-prompts.md
│ │ ├── claude-code-biomcp-alphagenome.md
│ │ ├── nci-prompts.md
│ │ ├── openfda-integration.md
│ │ ├── openfda-prompts.md
│ │ ├── pydantic-ai-integration.md
│ │ └── remote-connection.md
│ ├── user-guides
│ │ ├── 01-command-line-interface.md
│ │ ├── 02-mcp-tools-reference.md
│ │ └── 03-integrating-with-ides-and-clients.md
│ └── workflows
│ └── all-workflows.md
├── example_scripts
│ ├── mcp_integration.py
│ └── python_sdk.py
├── glama.json
├── LICENSE
├── lzyank.toml
├── Makefile
├── mkdocs.yml
├── package-lock.json
├── package.json
├── pyproject.toml
├── README.md
├── scripts
│ ├── check_docs_in_mkdocs.py
│ ├── check_http_imports.py
│ └── generate_endpoints_doc.py
├── smithery.yaml
├── src
│ └── biomcp
│ ├── __init__.py
│ ├── __main__.py
│ ├── articles
│ │ ├── __init__.py
│ │ ├── autocomplete.py
│ │ ├── fetch.py
│ │ ├── preprints.py
│ │ ├── search_optimized.py
│ │ ├── search.py
│ │ └── unified.py
│ ├── biomarkers
│ │ ├── __init__.py
│ │ └── search.py
│ ├── cbioportal_helper.py
│ ├── circuit_breaker.py
│ ├── cli
│ │ ├── __init__.py
│ │ ├── articles.py
│ │ ├── biomarkers.py
│ │ ├── diseases.py
│ │ ├── health.py
│ │ ├── interventions.py
│ │ ├── main.py
│ │ ├── openfda.py
│ │ ├── organizations.py
│ │ ├── server.py
│ │ ├── trials.py
│ │ └── variants.py
│ ├── connection_pool.py
│ ├── constants.py
│ ├── core.py
│ ├── diseases
│ │ ├── __init__.py
│ │ ├── getter.py
│ │ └── search.py
│ ├── domain_handlers.py
│ ├── drugs
│ │ ├── __init__.py
│ │ └── getter.py
│ ├── exceptions.py
│ ├── genes
│ │ ├── __init__.py
│ │ └── getter.py
│ ├── http_client_simple.py
│ ├── http_client.py
│ ├── individual_tools.py
│ ├── integrations
│ │ ├── __init__.py
│ │ ├── biothings_client.py
│ │ └── cts_api.py
│ ├── interventions
│ │ ├── __init__.py
│ │ ├── getter.py
│ │ └── search.py
│ ├── logging_filter.py
│ ├── metrics_handler.py
│ ├── metrics.py
│ ├── oncokb_helper.py
│ ├── openfda
│ │ ├── __init__.py
│ │ ├── adverse_events_helpers.py
│ │ ├── adverse_events.py
│ │ ├── cache.py
│ │ ├── constants.py
│ │ ├── device_events_helpers.py
│ │ ├── device_events.py
│ │ ├── drug_approvals.py
│ │ ├── drug_labels_helpers.py
│ │ ├── drug_labels.py
│ │ ├── drug_recalls_helpers.py
│ │ ├── drug_recalls.py
│ │ ├── drug_shortages_detail_helpers.py
│ │ ├── drug_shortages_helpers.py
│ │ ├── drug_shortages.py
│ │ ├── exceptions.py
│ │ ├── input_validation.py
│ │ ├── rate_limiter.py
│ │ ├── utils.py
│ │ └── validation.py
│ ├── organizations
│ │ ├── __init__.py
│ │ ├── getter.py
│ │ └── search.py
│ ├── parameter_parser.py
│ ├── query_parser.py
│ ├── query_router.py
│ ├── rate_limiter.py
│ ├── render.py
│ ├── request_batcher.py
│ ├── resources
│ │ ├── __init__.py
│ │ ├── getter.py
│ │ ├── instructions.md
│ │ └── researcher.md
│ ├── retry.py
│ ├── router_handlers.py
│ ├── router.py
│ ├── shared_context.py
│ ├── thinking
│ │ ├── __init__.py
│ │ ├── sequential.py
│ │ └── session.py
│ ├── thinking_tool.py
│ ├── thinking_tracker.py
│ ├── trials
│ │ ├── __init__.py
│ │ ├── getter.py
│ │ ├── nci_getter.py
│ │ ├── nci_search.py
│ │ └── search.py
│ ├── utils
│ │ ├── __init__.py
│ │ ├── cancer_types_api.py
│ │ ├── cbio_http_adapter.py
│ │ ├── endpoint_registry.py
│ │ ├── gene_validator.py
│ │ ├── metrics.py
│ │ ├── mutation_filter.py
│ │ ├── query_utils.py
│ │ ├── rate_limiter.py
│ │ └── request_cache.py
│ ├── variants
│ │ ├── __init__.py
│ │ ├── alphagenome.py
│ │ ├── cancer_types.py
│ │ ├── cbio_external_client.py
│ │ ├── cbioportal_mutations.py
│ │ ├── cbioportal_search_helpers.py
│ │ ├── cbioportal_search.py
│ │ ├── constants.py
│ │ ├── external.py
│ │ ├── filters.py
│ │ ├── getter.py
│ │ ├── links.py
│ │ ├── oncokb_client.py
│ │ ├── oncokb_models.py
│ │ └── search.py
│ └── workers
│ ├── __init__.py
│ ├── worker_entry_stytch.js
│ ├── worker_entry.js
│ └── worker.py
├── tests
│ ├── bdd
│ │ ├── cli_help
│ │ │ ├── help.feature
│ │ │ └── test_help.py
│ │ ├── conftest.py
│ │ ├── features
│ │ │ └── alphagenome_integration.feature
│ │ ├── fetch_articles
│ │ │ ├── fetch.feature
│ │ │ └── test_fetch.py
│ │ ├── get_trials
│ │ │ ├── get.feature
│ │ │ └── test_get.py
│ │ ├── get_variants
│ │ │ ├── get.feature
│ │ │ └── test_get.py
│ │ ├── search_articles
│ │ │ ├── autocomplete.feature
│ │ │ ├── search.feature
│ │ │ ├── test_autocomplete.py
│ │ │ └── test_search.py
│ │ ├── search_trials
│ │ │ ├── search.feature
│ │ │ └── test_search.py
│ │ ├── search_variants
│ │ │ ├── search.feature
│ │ │ └── test_search.py
│ │ └── steps
│ │ └── test_alphagenome_steps.py
│ ├── config
│ │ └── test_smithery_config.py
│ ├── conftest.py
│ ├── data
│ │ ├── ct_gov
│ │ │ ├── clinical_trials_api_v2.yaml
│ │ │ ├── trials_NCT04280705.json
│ │ │ └── trials_NCT04280705.txt
│ │ ├── myvariant
│ │ │ ├── myvariant_api.yaml
│ │ │ ├── myvariant_field_descriptions.csv
│ │ │ ├── variants_full_braf_v600e.json
│ │ │ ├── variants_full_braf_v600e.txt
│ │ │ └── variants_part_braf_v600_multiple.json
│ │ ├── oncokb_mock_responses.json
│ │ ├── openfda
│ │ │ ├── drugsfda_detail.json
│ │ │ ├── drugsfda_search.json
│ │ │ ├── enforcement_detail.json
│ │ │ └── enforcement_search.json
│ │ └── pubtator
│ │ ├── pubtator_autocomplete.json
│ │ └── pubtator3_paper.txt
│ ├── integration
│ │ ├── test_oncokb_integration.py
│ │ ├── test_openfda_integration.py
│ │ ├── test_preprints_integration.py
│ │ ├── test_simple.py
│ │ └── test_variants_integration.py
│ ├── tdd
│ │ ├── articles
│ │ │ ├── test_autocomplete.py
│ │ │ ├── test_cbioportal_integration.py
│ │ │ ├── test_fetch.py
│ │ │ ├── test_preprints.py
│ │ │ ├── test_search.py
│ │ │ └── test_unified.py
│ │ ├── conftest.py
│ │ ├── drugs
│ │ │ ├── __init__.py
│ │ │ └── test_drug_getter.py
│ │ ├── openfda
│ │ │ ├── __init__.py
│ │ │ ├── test_adverse_events.py
│ │ │ ├── test_device_events.py
│ │ │ ├── test_drug_approvals.py
│ │ │ ├── test_drug_labels.py
│ │ │ ├── test_drug_recalls.py
│ │ │ ├── test_drug_shortages.py
│ │ │ └── test_security.py
│ │ ├── test_biothings_integration_real.py
│ │ ├── test_biothings_integration.py
│ │ ├── test_circuit_breaker.py
│ │ ├── test_concurrent_requests.py
│ │ ├── test_connection_pool.py
│ │ ├── test_domain_handlers.py
│ │ ├── test_drug_approvals.py
│ │ ├── test_drug_recalls.py
│ │ ├── test_drug_shortages.py
│ │ ├── test_endpoint_documentation.py
│ │ ├── test_error_scenarios.py
│ │ ├── test_europe_pmc_fetch.py
│ │ ├── test_mcp_integration.py
│ │ ├── test_mcp_tools.py
│ │ ├── test_metrics.py
│ │ ├── test_nci_integration.py
│ │ ├── test_nci_mcp_tools.py
│ │ ├── test_network_policies.py
│ │ ├── test_offline_mode.py
│ │ ├── test_openfda_unified.py
│ │ ├── test_pten_r173_search.py
│ │ ├── test_render.py
│ │ ├── test_request_batcher.py.disabled
│ │ ├── test_retry.py
│ │ ├── test_router.py
│ │ ├── test_shared_context.py.disabled
│ │ ├── test_unified_biothings.py
│ │ ├── thinking
│ │ │ ├── __init__.py
│ │ │ └── test_sequential.py
│ │ ├── trials
│ │ │ ├── test_backward_compatibility.py
│ │ │ ├── test_getter.py
│ │ │ └── test_search.py
│ │ ├── utils
│ │ │ ├── test_gene_validator.py
│ │ │ ├── test_mutation_filter.py
│ │ │ ├── test_rate_limiter.py
│ │ │ └── test_request_cache.py
│ │ ├── variants
│ │ │ ├── constants.py
│ │ │ ├── test_alphagenome_api_key.py
│ │ │ ├── test_alphagenome_comprehensive.py
│ │ │ ├── test_alphagenome.py
│ │ │ ├── test_cbioportal_mutations.py
│ │ │ ├── test_cbioportal_search.py
│ │ │ ├── test_external_integration.py
│ │ │ ├── test_external.py
│ │ │ ├── test_extract_gene_aa_change.py
│ │ │ ├── test_filters.py
│ │ │ ├── test_getter.py
│ │ │ ├── test_links.py
│ │ │ ├── test_oncokb_client.py
│ │ │ ├── test_oncokb_helper.py
│ │ │ └── test_search.py
│ │ └── workers
│ │ └── test_worker_sanitization.js
│ └── test_pydantic_ai_integration.py
├── THIRD_PARTY_ENDPOINTS.md
├── tox.ini
├── uv.lock
└── wrangler.toml
```
# Files
--------------------------------------------------------------------------------
/src/biomcp/rate_limiter.py:
--------------------------------------------------------------------------------
```python
"""Rate limiting implementation for BioMCP API calls."""
import asyncio
import time
from collections import defaultdict
from contextlib import asynccontextmanager
from .constants import (
DEFAULT_BURST_SIZE,
DEFAULT_RATE_LIMIT_PER_SECOND,
)
from .exceptions import BioMCPError
class RateLimitExceeded(BioMCPError):
"""Raised when rate limit is exceeded."""
def __init__(self, domain: str, limit: int, window: int):
message = f"Rate limit exceeded for {domain}: {limit} requests per {window} seconds"
super().__init__(
message, {"domain": domain, "limit": limit, "window": window}
)
class RateLimiter:
"""Token bucket rate limiter implementation."""
def __init__(
self,
requests_per_second: float = DEFAULT_RATE_LIMIT_PER_SECOND,
burst_size: int = DEFAULT_BURST_SIZE,
):
"""Initialize rate limiter.
Args:
requests_per_second: Sustained request rate
burst_size: Maximum burst capacity
"""
self.rate = requests_per_second
self.burst_size = burst_size
self.tokens = float(burst_size)
self.last_update = time.monotonic()
self._lock = asyncio.Lock()
async def acquire(self, tokens: int = 1) -> None:
"""Acquire tokens from the bucket."""
async with self._lock:
now = time.monotonic()
elapsed = now - self.last_update
self.last_update = now
# Add tokens based on elapsed time
self.tokens = min(
self.burst_size, self.tokens + elapsed * self.rate
)
if self.tokens < tokens:
# Calculate wait time
wait_time = (tokens - self.tokens) / self.rate
await asyncio.sleep(wait_time)
self.tokens = 0
else:
self.tokens -= tokens
@asynccontextmanager
async def limit(self):
"""Context manager for rate limiting."""
await self.acquire()
yield
class DomainRateLimiter:
"""Rate limiter with per-domain limits."""
def __init__(self, default_rps: float = 10.0, default_burst: int = 20):
"""Initialize domain rate limiter.
Args:
default_rps: Default requests per second
default_burst: Default burst size
"""
self.default_rps = default_rps
self.default_burst = default_burst
self.limiters: dict[str, RateLimiter] = {}
self.domain_configs = {
"article": {"rps": 20.0, "burst": 40}, # PubMed can handle more
"trial": {"rps": 10.0, "burst": 20}, # ClinicalTrials.gov standard
"thinking": {"rps": 50.0, "burst": 100}, # Local processing
"mygene": {"rps": 10.0, "burst": 20}, # MyGene.info
"mydisease": {"rps": 10.0, "burst": 20}, # MyDisease.info
"mychem": {"rps": 10.0, "burst": 20}, # MyChem.info
"myvariant": {"rps": 15.0, "burst": 30}, # MyVariant.info
"oncokb": {"rps": 5.0, "burst": 10}, # OncoKB conservative limits
}
def get_limiter(self, domain: str) -> RateLimiter:
"""Get or create rate limiter for domain."""
if domain not in self.limiters:
config = self.domain_configs.get(domain, {})
rps = config.get("rps", self.default_rps)
burst = config.get("burst", self.default_burst)
self.limiters[domain] = RateLimiter(rps, int(burst))
return self.limiters[domain]
@asynccontextmanager
async def limit(self, domain: str):
"""Rate limit context manager for a domain."""
limiter = self.get_limiter(domain)
async with limiter.limit():
yield
class SlidingWindowRateLimiter:
"""Sliding window rate limiter for user/IP based limiting."""
def __init__(self, requests: int = 100, window_seconds: int = 60):
"""Initialize sliding window rate limiter.
Args:
requests: Maximum requests per window
window_seconds: Window size in seconds
"""
self.max_requests = requests
self.window_seconds = window_seconds
self.requests: dict[str, list[float]] = defaultdict(list)
self._lock = asyncio.Lock()
async def check_limit(self, key: str) -> bool:
"""Check if request is allowed for key."""
async with self._lock:
now = time.time()
cutoff = now - self.window_seconds
# Remove old requests
self.requests[key] = [
req_time
for req_time in self.requests[key]
if req_time > cutoff
]
# Check limit
if len(self.requests[key]) >= self.max_requests:
return False
# Add current request
self.requests[key].append(now)
return True
async def acquire(self, key: str) -> None:
"""Acquire permission to make request."""
if not await self.check_limit(key):
raise RateLimitExceeded(
key, self.max_requests, self.window_seconds
)
# Global instances
domain_limiter = DomainRateLimiter()
user_limiter = SlidingWindowRateLimiter(
requests=1000, window_seconds=3600
) # 1000 req/hour
async def rate_limit_domain(domain: str) -> None:
"""Apply rate limiting for a domain."""
async with domain_limiter.limit(domain):
pass
async def rate_limit_user(user_id: str | None = None) -> None:
"""Apply rate limiting for a user."""
if user_id:
await user_limiter.acquire(user_id)
```
--------------------------------------------------------------------------------
/docs/developer-guides/06-http-client-and-caching.md:
--------------------------------------------------------------------------------
```markdown
# BioMCP HTTP Client Guide
## Overview
BioMCP uses a centralized HTTP client for all external API calls. This provides:
- Consistent error handling and retry logic
- Request/response caching
- Rate limiting per domain
- Circuit breaker for fault tolerance
- Offline mode support
- Comprehensive endpoint tracking
## Migration from Direct HTTP Libraries
### Before (Direct httpx usage):
```python
import httpx
async def fetch_gene(gene: str):
async with httpx.AsyncClient() as client:
response = await client.get(f"https://api.example.com/genes/{gene}")
response.raise_for_status()
return response.json()
```
### After (Centralized client):
```python
from biomcp import http_client
async def fetch_gene(gene: str):
data, error = await http_client.request_api(
url=f"https://api.example.com/genes/{gene}",
request={},
domain="example"
)
if error:
# Handle error consistently
return None
return data
```
## Error Handling
The centralized client uses a consistent error handling pattern:
```python
result, error = await http_client.request_api(...)
if error:
# error is a RequestError object with:
# - error.code: HTTP status code or error type
# - error.message: Human-readable error message
# - error.details: Additional context
logger.error(f"Request failed: {error.message}")
return None # or handle appropriately
```
### Error Handling Guidelines
1. **For optional data**: Return `None` when the data is not critical
2. **For required data**: Raise an exception or return an error to the caller
3. **For batch operations**: Collect errors and report at the end
4. **For user-facing operations**: Provide clear, actionable error messages
## Creating Domain-Specific Adapters
For complex APIs, create an adapter class:
```python
from biomcp import http_client
from biomcp.http_client import RequestError
class MyAPIAdapter:
"""Adapter for MyAPI using centralized HTTP client."""
def __init__(self):
self.base_url = "https://api.example.com"
async def get_resource(self, resource_id: str) -> tuple[dict | None, RequestError | None]:
"""Fetch a resource by ID.
Returns:
Tuple of (data, error) where one is always None
"""
return await http_client.request_api(
url=f"{self.base_url}/resources/{resource_id}",
request={},
domain="example",
endpoint_key="example_resources"
)
```
## Configuration
### Cache TTL (Time To Live)
```python
# Cache for 1 hour (3600 seconds)
data, error = await http_client.request_api(
url=url,
request=request,
cache_ttl=3600
)
# Disable caching for this request
data, error = await http_client.request_api(
url=url,
request=request,
cache_ttl=0
)
```
### Rate Limiting
Rate limits are configured per domain in `http_client.py`:
```python
# Default rate limits
rate_limits = {
"ncbi.nlm.nih.gov": 20, # 20 requests/second
"clinicaltrials.gov": 10, # 10 requests/second
"myvariant.info": 1000/3600, # 1000 requests/hour
}
```
### Circuit Breaker
The circuit breaker prevents cascading failures:
- **Closed**: Normal operation
- **Open**: Failing fast after threshold exceeded
- **Half-Open**: Testing if service recovered
Configure thresholds:
```python
CIRCUIT_BREAKER_FAILURE_THRESHOLD = 5 # Open after 5 failures
CIRCUIT_BREAKER_RECOVERY_TIMEOUT = 60 # Try again after 60 seconds
```
## Offline Mode
Enable offline mode to only serve cached responses:
```bash
export BIOMCP_OFFLINE=true
biomcp run
```
In offline mode:
- Only cached responses are returned
- No external HTTP requests are made
- Missing cache entries return None with appropriate error
## Performance Tuning
### Connection Pooling
The HTTP client maintains connection pools per domain:
```python
# Configure in http_client_simple.py
limits = httpx.Limits(
max_keepalive_connections=20,
max_connections=100,
keepalive_expiry=30
)
```
### Concurrent Requests
For parallel requests to the same API:
```python
import asyncio
# Fetch multiple resources concurrently
tasks = [
http_client.request_api(f"/resource/{i}", {}, domain="example")
for i in range(10)
]
results = await asyncio.gather(*tasks)
```
## Monitoring and Debugging
### Request Metrics
The client tracks metrics per endpoint:
- Request count
- Error count
- Cache hit/miss ratio
- Average response time
Access metrics:
```python
from biomcp.http_client import get_metrics
metrics = get_metrics()
```
### Debug Logging
Enable debug logging to see all HTTP requests:
```python
import logging
logging.getLogger("biomcp.http_client").setLevel(logging.DEBUG)
```
## Best Practices
1. **Always use the centralized client** for external HTTP calls
2. **Register new endpoints** in the endpoint registry
3. **Set appropriate cache TTLs** based on data volatility
4. **Handle errors gracefully** with user-friendly messages
5. **Test with offline mode** to ensure cache coverage
6. **Monitor rate limits** to avoid API throttling
7. **Use domain-specific adapters** for complex APIs
## Endpoint Registration
Register new endpoints in `endpoint_registry.py`:
```python
registry.register(
"my_api_endpoint",
EndpointInfo(
url="https://api.example.com/v1/data",
category=EndpointCategory.BIOMEDICAL_LITERATURE,
data_types=[DataType.RESEARCH_ARTICLES],
description="My API for fetching data",
compliance_notes="Public API, no PII",
rate_limit="100 requests/minute"
)
)
```
This ensures the endpoint is documented and tracked properly.
```
--------------------------------------------------------------------------------
/tests/tdd/articles/test_cbioportal_integration.py:
--------------------------------------------------------------------------------
```python
"""Test cBioPortal integration with article searches."""
import json
import pytest
from biomcp.articles.search import PubmedRequest
from biomcp.articles.unified import search_articles_unified
class TestArticleCBioPortalIntegration:
"""Test that cBioPortal summaries appear in article searches."""
@pytest.mark.asyncio
@pytest.mark.integration
async def test_article_search_with_gene_includes_cbioportal(self):
"""Test that searching articles for a gene includes cBioPortal summary."""
request = PubmedRequest(
genes=["BRAF"],
keywords=["melanoma"],
)
# Test markdown output
result = await search_articles_unified(
request,
include_pubmed=True,
include_preprints=False,
output_json=False,
)
# Should include cBioPortal summary
assert "cBioPortal Summary for BRAF" in result
assert "Mutation Frequency" in result
# Top Hotspots is only included when mutations are found
# When cBioPortal API returns empty data, it won't be present
if "0.0%" not in result: # If mutation frequency is not 0
assert "Top Hotspots" in result
assert "---" in result # Separator between summary and articles
# Should still include article results
assert "pmid" in result or "Title" in result or "Record" in result
@pytest.mark.asyncio
@pytest.mark.integration
async def test_article_search_json_with_gene(self):
"""Test JSON output includes cBioPortal summary."""
request = PubmedRequest(
genes=["TP53"],
keywords=["cancer"],
)
result = await search_articles_unified(
request,
include_pubmed=True,
include_preprints=False,
output_json=True,
)
# Parse JSON
data = json.loads(result)
# Should have both summary and articles
assert "cbioportal_summary" in data
assert "articles" in data
assert "TP53" in data["cbioportal_summary"]
assert isinstance(data["articles"], list)
assert len(data["articles"]) > 0
@pytest.mark.asyncio
@pytest.mark.integration
async def test_article_search_without_gene_no_cbioportal(self):
"""Test that searches without genes don't include cBioPortal summary."""
request = PubmedRequest(
diseases=["hypertension"],
keywords=["treatment"],
)
# Test markdown output
result = await search_articles_unified(
request,
include_pubmed=True,
include_preprints=False,
output_json=False,
)
# Should NOT include cBioPortal summary
assert "cBioPortal Summary" not in result
assert "Mutation Frequency" not in result
@pytest.mark.asyncio
@pytest.mark.integration
async def test_article_search_multiple_genes(self):
"""Test that searching with multiple genes uses the first one."""
request = PubmedRequest(
genes=["KRAS", "NRAS", "BRAF"],
diseases=["colorectal cancer"],
)
result = await search_articles_unified(
request,
include_pubmed=True,
include_preprints=False,
output_json=False,
)
# Should include cBioPortal summary for KRAS (first gene)
assert "cBioPortal Summary for KRAS" in result
# Common KRAS hotspot
assert "G12" in result or "mutation" in result
@pytest.mark.asyncio
@pytest.mark.integration
async def test_article_search_with_invalid_gene(self):
"""Test graceful handling of invalid gene names."""
request = PubmedRequest(
genes=["BRCA1"], # Valid gene
keywords=["cancer"],
)
# First check that we handle invalid genes gracefully
# by using a real gene that might have cBioPortal data
result = await search_articles_unified(
request,
include_pubmed=True,
include_preprints=False,
output_json=False,
)
# Should have some content - either cBioPortal summary or articles
assert len(result) > 50 # Some reasonable content
# Now test with a gene that's valid for search but not in cBioPortal
request2 = PubmedRequest(
genes=["ACE2"], # Real gene but might not be in cancer studies
keywords=["COVID-19"],
)
result2 = await search_articles_unified(
request2,
include_pubmed=True,
include_preprints=False,
output_json=False,
)
# Should return results even if cBioPortal data is not available
assert len(result2) > 50
@pytest.mark.asyncio
@pytest.mark.integration
async def test_article_search_with_preprints_and_cbioportal(self):
"""Test that cBioPortal summary works with preprint searches too."""
request = PubmedRequest(
genes=["EGFR"],
keywords=["lung cancer", "osimertinib"],
)
result = await search_articles_unified(
request,
include_pubmed=True,
include_preprints=True,
output_json=False,
)
# Should include cBioPortal summary
assert "cBioPortal Summary for EGFR" in result
# Should include both peer-reviewed and preprint results
assert ("pmid" in result or "Title" in result) and (
"Preprint" in result
or "bioRxiv" in result
or "peer_reviewed" in result
)
```
--------------------------------------------------------------------------------
/src/biomcp/diseases/getter.py:
--------------------------------------------------------------------------------
```python
"""Disease information retrieval from MyDisease.info."""
import json
import logging
from typing import Annotated
from pydantic import Field
from ..integrations import BioThingsClient
from ..render import to_markdown
logger = logging.getLogger(__name__)
def _add_disease_links(disease_info, result: dict) -> None:
"""Add helpful links to disease result."""
links = {}
# Add MONDO browser link if available
if (
disease_info.mondo
and isinstance(disease_info.mondo, dict)
and (mondo_id := disease_info.mondo.get("mondo"))
and isinstance(mondo_id, str)
and mondo_id.startswith("MONDO:")
):
links["MONDO Browser"] = (
f"https://www.ebi.ac.uk/ols/ontologies/mondo/terms?iri=http://purl.obolibrary.org/obo/{mondo_id.replace(':', '_')}"
)
# Add Disease Ontology link if available
if (
disease_info.xrefs
and isinstance(disease_info.xrefs, dict)
and (doid := disease_info.xrefs.get("doid"))
):
if isinstance(doid, list) and doid:
doid_id = doid[0] if isinstance(doid[0], str) else str(doid[0])
links["Disease Ontology"] = (
f"https://www.disease-ontology.org/?id={doid_id}"
)
elif isinstance(doid, str):
links["Disease Ontology"] = (
f"https://www.disease-ontology.org/?id={doid}"
)
# Add PubMed search link
if disease_info.name:
links["PubMed Search"] = (
f"https://pubmed.ncbi.nlm.nih.gov/?term={disease_info.name.replace(' ', '+')}"
)
if links:
result["_links"] = links
def _format_disease_output(disease_info, result: dict) -> None:
"""Format disease output for display."""
# Format synonyms nicely
if disease_info.synonyms:
result["synonyms"] = ", ".join(
disease_info.synonyms[:10]
) # Limit to first 10
if len(disease_info.synonyms) > 10:
result["synonyms"] += (
f" (and {len(disease_info.synonyms) - 10} more)"
)
# Format phenotypes if present
if disease_info.phenotypes:
# Just show count and first few phenotypes
phenotype_names = []
for pheno in disease_info.phenotypes[:5]:
if isinstance(pheno, dict) and "phenotype" in pheno:
phenotype_names.append(pheno["phenotype"])
if phenotype_names:
result["associated_phenotypes"] = ", ".join(phenotype_names)
if len(disease_info.phenotypes) > 5:
result["associated_phenotypes"] += (
f" (and {len(disease_info.phenotypes) - 5} more)"
)
# Remove the raw phenotypes data for cleaner output
result.pop("phenotypes", None)
async def get_disease(
disease_id_or_name: str,
output_json: bool = False,
) -> str:
"""
Get disease information from MyDisease.info.
Args:
disease_id_or_name: Disease ID (MONDO, DOID) or name (e.g., "melanoma", "MONDO:0016575")
output_json: Return as JSON instead of markdown
Returns:
Disease information as markdown or JSON string
"""
client = BioThingsClient()
try:
disease_info = await client.get_disease_info(disease_id_or_name)
if not disease_info:
error_data = {
"error": f"Disease '{disease_id_or_name}' not found",
"suggestion": "Please check the disease name or ID (MONDO:, DOID:, OMIM:, MESH:)",
}
return (
json.dumps(error_data, indent=2)
if output_json
else to_markdown([error_data])
)
# Convert to dict for rendering
result = disease_info.model_dump(exclude_none=True)
# Add helpful links
_add_disease_links(disease_info, result)
# Format output for display
_format_disease_output(disease_info, result)
if output_json:
return json.dumps(result, indent=2)
else:
return to_markdown([result])
except Exception as e:
logger.error(
f"Error fetching disease info for {disease_id_or_name}: {e}"
)
error_data = {
"error": "Failed to retrieve disease information",
"details": str(e),
}
return (
json.dumps(error_data, indent=2)
if output_json
else to_markdown([error_data])
)
async def _disease_details(
call_benefit: Annotated[
str,
"Define and summarize why this function is being called and the intended benefit",
],
disease_id_or_name: Annotated[
str,
Field(
description="Disease name (e.g., melanoma, GIST) or ID (e.g., MONDO:0016575, DOID:1909)"
),
],
) -> str:
"""
Retrieves detailed information for a disease from MyDisease.info.
This tool provides real-time disease annotations including:
- Official disease name and definition
- Disease synonyms and alternative names
- Ontology mappings (MONDO, DOID, OMIM, etc.)
- Associated phenotypes
- Links to disease databases
Parameters:
- call_benefit: Define why this function is being called
- disease_id_or_name: Disease name or ontology ID
Process: Queries MyDisease.info API for up-to-date disease information
Output: Markdown formatted disease information with definition and metadata
Note: For clinical trials about diseases, use trial_searcher. For articles about diseases, use article_searcher.
"""
return await get_disease(disease_id_or_name, output_json=False)
```
--------------------------------------------------------------------------------
/src/biomcp/connection_pool.py:
--------------------------------------------------------------------------------
```python
"""Connection pool manager with proper event loop lifecycle management.
This module provides HTTP connection pooling that is properly integrated
with asyncio event loops. It ensures that connection pools are:
- Created per event loop to avoid cross-loop usage
- Automatically cleaned up when event loops are garbage collected
- Reused across requests for better performance
Key Features:
- Event loop isolation - each loop gets its own pools
- Weak references prevent memory leaks
- Automatic cleanup on loop destruction
- Thread-safe pool management
Example:
```python
# Get a connection pool for the current event loop
pool = await get_connection_pool(verify=True, timeout=httpx.Timeout(30))
# Use the pool for multiple requests (no need to close)
response = await pool.get("https://api.example.com/data")
```
Environment Variables:
BIOMCP_USE_CONNECTION_POOL: Enable/disable pooling (default: "true")
"""
import asyncio
import ssl
import weakref
# NOTE: httpx import is allowed in this file for connection pooling infrastructure
import httpx
class EventLoopConnectionPools:
"""Manages connection pools per event loop.
This class ensures that each asyncio event loop has its own set of
connection pools, preventing cross-loop contamination and ensuring
proper cleanup when event loops are destroyed.
Attributes:
_loop_pools: Weak key dictionary mapping event loops to their pools
_lock: Asyncio lock for thread-safe pool creation
"""
def __init__(self):
# Use weak references to avoid keeping event loops alive
self._loop_pools: weakref.WeakKeyDictionary = (
weakref.WeakKeyDictionary()
)
self._lock = asyncio.Lock()
async def get_pool(
self, verify: ssl.SSLContext | str | bool, timeout: httpx.Timeout
) -> httpx.AsyncClient:
"""Get or create a connection pool for the current event loop."""
try:
loop = asyncio.get_running_loop()
except RuntimeError:
# No event loop running, return a single-use client
return self._create_client(verify, timeout, pooled=False)
# Get or create pools dict for this event loop
async with self._lock:
if loop not in self._loop_pools:
self._loop_pools[loop] = {}
# Register cleanup when loop is garbage collected
self._register_loop_cleanup(loop)
pools = self._loop_pools[loop]
pool_key = self._get_pool_key(verify)
# Check if we have a valid pool
if pool_key in pools and not pools[pool_key].is_closed:
return pools[pool_key]
# Create new pool
client = self._create_client(verify, timeout, pooled=True)
pools[pool_key] = client
return client
def _get_pool_key(self, verify: ssl.SSLContext | str | bool) -> str:
"""Generate a key for the connection pool."""
if isinstance(verify, ssl.SSLContext):
return f"ssl_{id(verify)}"
return str(verify)
def _create_client(
self,
verify: ssl.SSLContext | str | bool,
timeout: httpx.Timeout,
pooled: bool = True,
) -> httpx.AsyncClient:
"""Create a new HTTP client."""
if pooled:
limits = httpx.Limits(
max_keepalive_connections=20,
max_connections=100,
keepalive_expiry=30,
)
else:
# Single-use client
limits = httpx.Limits(max_keepalive_connections=0)
return httpx.AsyncClient(
verify=verify,
http2=False, # HTTP/2 can add overhead
timeout=timeout,
limits=limits,
)
def _register_loop_cleanup(self, loop: asyncio.AbstractEventLoop):
"""Register cleanup when event loop is garbage collected."""
# Store pools to close when loop is garbage collected
# Note: We can't create weak references to dicts, so we'll
# clean up pools when the loop itself is garbage collected
def cleanup():
# Get pools for this loop if they still exist
pools = self._loop_pools.get(loop, {})
if pools:
# Try to close all clients gracefully
for client in list(pools.values()):
if client and not client.is_closed:
# Close synchronously since loop might be gone
import contextlib
with contextlib.suppress(Exception):
client._transport.close()
# Register finalizer on the loop itself
weakref.finalize(loop, cleanup)
async def close_all(self):
"""Close all connection pools."""
async with self._lock:
all_clients = []
for pools in self._loop_pools.values():
all_clients.extend(pools.values())
# Close all clients
close_tasks = []
for client in all_clients:
if client and not client.is_closed:
close_tasks.append(client.aclose())
if close_tasks:
await asyncio.gather(*close_tasks, return_exceptions=True)
self._loop_pools.clear()
# Global instance
_pool_manager = EventLoopConnectionPools()
async def get_connection_pool(
verify: ssl.SSLContext | str | bool,
timeout: httpx.Timeout,
) -> httpx.AsyncClient:
"""Get a connection pool for the current event loop."""
return await _pool_manager.get_pool(verify, timeout)
async def close_all_pools():
"""Close all connection pools."""
await _pool_manager.close_all()
```
--------------------------------------------------------------------------------
/src/biomcp/parameter_parser.py:
--------------------------------------------------------------------------------
```python
"""Parameter parsing and validation for BioMCP."""
import json
import logging
from typing import Any
from biomcp.exceptions import InvalidParameterError
logger = logging.getLogger(__name__)
class ParameterParser:
"""Handles parameter parsing and validation for search requests."""
@staticmethod
def parse_list_param(
param: str | list[str] | None, param_name: str
) -> list[str] | None:
"""Convert various input formats to lists.
Handles:
- JSON arrays: '["item1", "item2"]' -> ['item1', 'item2']
- Comma-separated: 'item1, item2' -> ['item1', 'item2']
- Single values: 'item' -> ['item']
- None values: None -> None
- Already parsed lists: ['item'] -> ['item']
Args:
param: The parameter to parse
param_name: Name of the parameter for error messages
Returns:
Parsed list or None
Raises:
InvalidParameterError: If parameter cannot be parsed
"""
if param is None:
return None
if isinstance(param, str):
# First try to parse as JSON array
if param.startswith("["):
try:
parsed = json.loads(param)
if not isinstance(parsed, list):
raise InvalidParameterError(
param_name,
param,
"JSON array or comma-separated string",
)
return parsed
except (json.JSONDecodeError, TypeError) as e:
logger.debug(f"Failed to parse {param_name} as JSON: {e}")
# If it's a comma-separated string, split it
if "," in param:
return [item.strip() for item in param.split(",")]
# Otherwise return as single-item list
return [param]
# If it's already a list, validate and return as-is
if isinstance(param, list):
# Validate all items are strings
if not all(isinstance(item, str) for item in param):
raise InvalidParameterError(
param_name, param, "list of strings"
)
return param
# Invalid type
raise InvalidParameterError(
param_name, param, "string, list of strings, or None"
)
@staticmethod
def normalize_phase(phase: str | None) -> str | None:
"""Normalize phase values for clinical trials.
Converts various formats to standard enum values:
- "Phase 3" -> "PHASE3"
- "phase 3" -> "PHASE3"
- "PHASE 3" -> "PHASE3"
- "phase3" -> "PHASE3"
Args:
phase: Phase value to normalize
Returns:
Normalized phase value or None
"""
if phase is None:
return None
# Convert to uppercase and remove spaces
normalized = phase.upper().replace(" ", "")
# Validate it matches expected pattern
valid_phases = [
"EARLYPHASE1",
"PHASE1",
"PHASE2",
"PHASE3",
"PHASE4",
"NOTAPPLICABLE",
]
if normalized not in valid_phases:
# Try to be helpful with common mistakes
if "EARLY" in normalized and "1" in normalized:
return "EARLYPHASE1"
if "NOT" in normalized and "APPLICABLE" in normalized:
return "NOTAPPLICABLE"
raise InvalidParameterError(
"phase", phase, f"one of: {', '.join(valid_phases)}"
)
return normalized
@staticmethod
def validate_page_params(page: int, page_size: int) -> tuple[int, int]:
"""Validate pagination parameters.
Args:
page: Page number (minimum 1)
page_size: Results per page (1-100)
Returns:
Validated (page, page_size) tuple
Raises:
InvalidParameterError: If parameters are invalid
"""
if page < 1:
raise InvalidParameterError("page", page, "integer >= 1")
if page_size < 1 or page_size > 100:
raise InvalidParameterError(
"page_size", page_size, "integer between 1 and 100"
)
return page, page_size
@staticmethod
def parse_search_params(
params: dict[str, Any], domain: str
) -> dict[str, Any]:
"""Parse and validate all search parameters for a domain.
Args:
params: Raw parameters dictionary
domain: Domain being searched
Returns:
Validated parameters dictionary
"""
parsed: dict[str, Any] = {}
# Common list parameters
list_params = [
"genes",
"diseases",
"variants",
"chemicals",
"keywords",
"conditions",
"interventions",
]
for param_name in list_params:
if param_name in params and params[param_name] is not None:
parsed[param_name] = ParameterParser.parse_list_param(
params[param_name], param_name
)
# Domain-specific parameters
if (
domain == "trial"
and "phase" in params
and params.get("phase") is not None
):
parsed["phase"] = ParameterParser.normalize_phase(
params.get("phase")
)
# Pass through other parameters
for key, value in params.items():
if key not in parsed and key not in list_params and key != "phase":
parsed[key] = value
return parsed
```
--------------------------------------------------------------------------------
/src/biomcp/openfda/drug_labels.py:
--------------------------------------------------------------------------------
```python
"""
OpenFDA Drug Labels (SPL) integration.
"""
import logging
from .constants import (
OPENFDA_DEFAULT_LIMIT,
OPENFDA_DISCLAIMER,
OPENFDA_DRUG_LABELS_URL,
OPENFDA_MAX_LIMIT,
)
from .drug_labels_helpers import (
build_label_search_query,
format_label_header,
format_label_section,
format_label_summary,
get_default_sections,
get_section_titles,
)
from .utils import clean_text, format_count, make_openfda_request
logger = logging.getLogger(__name__)
async def search_drug_labels(
name: str | None = None,
indication: str | None = None,
boxed_warning: bool = False,
section: str | None = None,
limit: int = OPENFDA_DEFAULT_LIMIT,
skip: int = 0,
api_key: str | None = None,
) -> str:
"""
Search FDA drug product labels (SPL).
Args:
name: Drug name to search for
indication: Search for drugs indicated for this condition
boxed_warning: Filter for drugs with boxed warnings
section: Specific label section to search
limit: Maximum number of results
skip: Number of results to skip
api_key: Optional OpenFDA API key (overrides OPENFDA_API_KEY env var)
Returns:
Formatted string with drug label information
"""
if not name and not indication and not section and not boxed_warning:
return (
"⚠️ Please specify a drug name, indication, or label section to search.\n\n"
"Examples:\n"
"- Search by name: --name 'pembrolizumab'\n"
"- Search by indication: --indication 'melanoma'\n"
"- Search by section: --section 'contraindications'"
)
# Build and execute search
search_query = build_label_search_query(
name, indication, boxed_warning, section
)
params = {
"search": search_query,
"limit": min(limit, OPENFDA_MAX_LIMIT),
"skip": skip,
}
response, error = await make_openfda_request(
OPENFDA_DRUG_LABELS_URL, params, "openfda_drug_labels", api_key
)
if error:
return f"⚠️ Error searching drug labels: {error}"
if not response or not response.get("results"):
return _format_no_results(name, indication, section)
results = response["results"]
total = (
response.get("meta", {}).get("results", {}).get("total", len(results))
)
# Build output
output = ["## FDA Drug Labels\n"]
output.extend(_format_search_summary(name, indication, section, total))
# Display results
output.append(
f"### Results (showing {min(len(results), 5)} of {total}):\n"
)
for i, result in enumerate(results[:5], 1):
output.extend(format_label_summary(result, i))
# Add tip for getting full labels
if total > 0 and results and "set_id" in results[0]:
output.append(
"\n💡 **Tip**: Use `biomcp openfda label-get <label_id>` to retrieve "
"the complete label for any drug."
)
output.append(f"\n{OPENFDA_DISCLAIMER}")
return "\n".join(output)
def _format_no_results(
name: str | None, indication: str | None, section: str | None
) -> str:
"""Format no results message."""
search_desc = []
if name:
search_desc.append(f"drug '{name}'")
if indication:
search_desc.append(f"indication '{indication}'")
if section:
search_desc.append(f"section '{section}'")
return f"No drug labels found for {' and '.join(search_desc)}."
def _format_search_summary(
name: str | None, indication: str | None, section: str | None, total: int
) -> list[str]:
"""Format the search summary."""
output = []
search_desc = []
if name:
search_desc.append(f"**Drug**: {name}")
if indication:
search_desc.append(f"**Indication**: {indication}")
if section:
search_desc.append(f"**Section**: {section}")
if search_desc:
output.append(" | ".join(search_desc))
output.append(f"**Total Labels Found**: {format_count(total, 'label')}\n")
return output
async def get_drug_label(
set_id: str,
sections: list[str] | None = None,
api_key: str | None = None,
) -> str:
"""
Get detailed drug label information by set ID.
Args:
set_id: Label set ID
sections: Specific sections to retrieve (default: key sections)
api_key: Optional OpenFDA API key (overrides OPENFDA_API_KEY env var)
Returns:
Formatted string with detailed label information
"""
params = {
"search": f'set_id:"{set_id}"',
"limit": 1,
}
response, error = await make_openfda_request(
OPENFDA_DRUG_LABELS_URL, params, "openfda_drug_label_detail", api_key
)
if error:
return f"⚠️ Error retrieving drug label: {error}"
if not response or not response.get("results"):
return f"Drug label with ID '{set_id}' not found."
result = response["results"][0]
# Use default sections if not specified
if not sections:
sections = get_default_sections()
# Build output
output = format_label_header(result, set_id)
# Boxed warning (if exists)
if "boxed_warning" in result:
output.extend(_format_boxed_warning(result["boxed_warning"]))
# Display requested sections
section_titles = get_section_titles()
for section in sections:
output.extend(format_label_section(result, section, section_titles))
output.append(f"\n{OPENFDA_DISCLAIMER}")
return "\n".join(output)
def _format_boxed_warning(boxed_warning: list) -> list[str]:
"""Format boxed warning section."""
output = ["### ⚠️ BOXED WARNING\n"]
warning_text = clean_text(" ".join(boxed_warning))
output.append(warning_text)
output.append("")
return output
```
--------------------------------------------------------------------------------
/tests/tdd/variants/test_extract_gene_aa_change.py:
--------------------------------------------------------------------------------
```python
"""Tests for _extract_gene_aa_change method in external.py."""
import pytest
from biomcp.variants.external import ExternalVariantAggregator
class TestExtractGeneAAChange:
"""Test the _extract_gene_aa_change method."""
@pytest.fixture
def aggregator(self):
"""Create an ExternalVariantAggregator instance."""
return ExternalVariantAggregator()
def test_extract_from_docm(self, aggregator):
"""Test extraction from DOCM data."""
variant_data = {"docm": {"gene": "BRAF", "aa_change": "p.V600E"}}
result = aggregator._extract_gene_aa_change(variant_data)
assert result == "BRAF V600E"
def test_extract_from_hgvsp_long_format(self, aggregator):
"""Test extraction from hgvsp with long amino acid names."""
variant_data = {
"cadd": {"gene": {"genename": "TP53"}},
"hgvsp": ["p.Arg175His"],
}
result = aggregator._extract_gene_aa_change(variant_data)
# The code doesn't convert all long forms, just checks for Val/Ala
assert result == "TP53 Arg175His"
def test_extract_from_hgvsp_with_dbnsfp(self, aggregator):
"""Test extraction from hgvsp with dbnsfp gene name."""
variant_data = {
"dbnsfp": {"genename": "EGFR"},
"hgvsp": ["p.Leu858Arg"],
}
result = aggregator._extract_gene_aa_change(variant_data)
# The code doesn't convert Leu/Arg to L/R
assert result == "EGFR Leu858Arg"
def test_extract_from_cadd_data(self, aggregator):
"""Test extraction from CADD annotations."""
variant_data = {
"cadd": {
"gene": {"genename": "KRAS", "prot": {"protpos": 12}},
"oaa": "G",
"naa": "D",
}
}
result = aggregator._extract_gene_aa_change(variant_data)
assert result == "KRAS G12D"
def test_extract_from_docm_without_p_prefix(self, aggregator):
"""Test extraction from DOCM without p. prefix."""
variant_data = {"docm": {"gene": "PIK3CA", "aa_change": "E545K"}}
result = aggregator._extract_gene_aa_change(variant_data)
assert result == "PIK3CA E545K"
def test_extract_with_multiple_hgvsp(self, aggregator):
"""Test handling of multiple hgvsp entries - should take first."""
variant_data = {
"cadd": {"gene": {"genename": "BRCA1"}},
"hgvsp": ["p.Gln1756Ter", "p.Gln1756*"],
}
result = aggregator._extract_gene_aa_change(variant_data)
# Takes the first one, doesn't convert Gln/Ter
assert result == "BRCA1 Gln1756Ter"
def test_extract_with_special_characters(self, aggregator):
"""Test extraction with special characters in protein change."""
variant_data = {
"cadd": {"gene": {"genename": "MLH1"}},
"hgvsp": ["p.Lys618Alafs*9"],
}
result = aggregator._extract_gene_aa_change(variant_data)
# Should extract the basic AA change pattern
assert result is not None
assert "MLH1" in result
def test_extract_no_gene_name(self, aggregator):
"""Test when gene name is missing."""
variant_data = {"hgvsp": ["p.Val600Glu"]}
result = aggregator._extract_gene_aa_change(variant_data)
assert result is None
def test_extract_no_aa_change(self, aggregator):
"""Test when AA change is missing."""
variant_data = {"cadd": {"gene": {"genename": "BRAF"}}}
result = aggregator._extract_gene_aa_change(variant_data)
assert result is None
def test_extract_empty_variant_data(self, aggregator):
"""Test with empty variant data."""
result = aggregator._extract_gene_aa_change({})
assert result is None
def test_extract_malformed_hgvsp(self, aggregator):
"""Test with malformed HGVS protein notation."""
variant_data = {
"clinvar": {
"gene": {"symbol": "MYC"},
"hgvs": {"protein": ["invalid_format"]},
}
}
result = aggregator._extract_gene_aa_change(variant_data)
assert result is None
def test_extract_priority_order(self, aggregator):
"""Test that DOCM is prioritized for AA change, CADD for gene name."""
variant_data = {
"docm": {"gene": "BRAF", "aa_change": "p.V600E"},
"hgvsp": ["p.Val600Lys"], # Different change
"cadd": {
"gene": {"genename": "WRONG", "prot": {"protpos": 600}},
"oaa": "V",
"naa": "K",
},
}
result = aggregator._extract_gene_aa_change(variant_data)
# CADD is prioritized for gene name, DOCM for AA change
assert result == "WRONG V600E"
def test_extract_regex_with_val_ala(self, aggregator):
"""Test regex extraction when Val/Ala are present."""
# The code specifically looks for Val or Ala to trigger regex
variant_data = {
"cadd": {"gene": {"genename": "TEST1"}},
"hgvsp": ["p.Val600Ala"],
}
result = aggregator._extract_gene_aa_change(variant_data)
# The regex doesn't find a match in "Val600Ala" because it's looking for [A-Z]\d+[A-Z]
# which would match "V600A" but not "Val600Ala"
assert result == "TEST1 Val600Ala"
def test_extract_handles_exceptions_gracefully(self, aggregator):
"""Test that exceptions are handled gracefully."""
# This should trigger an exception internally but return None
variant_data = {
"cadd": {"gene": {"genename": "GENE"}},
"hgvsp": None, # This will cause issues
}
result = aggregator._extract_gene_aa_change(variant_data)
assert result is None
```
--------------------------------------------------------------------------------
/tests/tdd/test_openfda_unified.py:
--------------------------------------------------------------------------------
```python
"""Tests for OpenFDA integration with unified search/fetch tools."""
import pytest
class TestOpenFDAUnifiedIntegration:
"""Test OpenFDA domain integration in unified tools."""
def test_openfda_domains_registered(self):
"""Test that OpenFDA domains are properly registered in constants."""
from biomcp.constants import (
DOMAIN_TO_PLURAL,
PLURAL_TO_DOMAIN,
VALID_DOMAINS,
VALID_DOMAINS_PLURAL,
)
# List of OpenFDA domains
openfda_domains = [
"fda_adverse",
"fda_label",
"fda_device",
"fda_approval",
"fda_recall",
"fda_shortage",
]
openfda_plurals = [
"fda_adverse_events",
"fda_labels",
"fda_device_events",
"fda_approvals",
"fda_recalls",
"fda_shortages",
]
# Check that all OpenFDA domains are registered
for domain in openfda_domains:
assert domain in VALID_DOMAINS, f"{domain} not in VALID_DOMAINS"
assert (
domain in DOMAIN_TO_PLURAL
), f"{domain} not in DOMAIN_TO_PLURAL"
# Check plural forms
for plural in openfda_plurals:
assert (
plural in VALID_DOMAINS_PLURAL
), f"{plural} not in VALID_DOMAINS_PLURAL"
assert (
plural in PLURAL_TO_DOMAIN
), f"{plural} not in PLURAL_TO_DOMAIN"
# Check mappings are correct
assert DOMAIN_TO_PLURAL["fda_adverse"] == "fda_adverse_events"
assert DOMAIN_TO_PLURAL["fda_label"] == "fda_labels"
assert DOMAIN_TO_PLURAL["fda_device"] == "fda_device_events"
assert DOMAIN_TO_PLURAL["fda_approval"] == "fda_approvals"
assert DOMAIN_TO_PLURAL["fda_recall"] == "fda_recalls"
assert DOMAIN_TO_PLURAL["fda_shortage"] == "fda_shortages"
assert PLURAL_TO_DOMAIN["fda_adverse_events"] == "fda_adverse"
assert PLURAL_TO_DOMAIN["fda_labels"] == "fda_label"
assert PLURAL_TO_DOMAIN["fda_device_events"] == "fda_device"
assert PLURAL_TO_DOMAIN["fda_approvals"] == "fda_approval"
assert PLURAL_TO_DOMAIN["fda_recalls"] == "fda_recall"
assert PLURAL_TO_DOMAIN["fda_shortages"] == "fda_shortage"
def test_openfda_search_domain_type_hints(self):
"""Test that OpenFDA domains are in search tool type hints."""
import inspect
from biomcp.router import search
# Get the function signature
sig = inspect.signature(search)
domain_param = sig.parameters.get("domain")
# Check if domain parameter exists
assert (
domain_param is not None
), "domain parameter not found in search function"
# Get the annotation
annotation = domain_param.annotation
# The annotation should be a Literal type that includes OpenFDA domains
# We can't directly check the Literal values due to how Python handles it,
# but we can verify that it's properly annotated
assert (
annotation is not None
), "domain parameter has no type annotation"
def test_openfda_fetch_domain_type_hints(self):
"""Test that OpenFDA domains are in fetch tool type hints."""
import inspect
from biomcp.router import fetch
# Get the function signature
sig = inspect.signature(fetch)
domain_param = sig.parameters.get("domain")
# Check if domain parameter exists
assert (
domain_param is not None
), "domain parameter not found in fetch function"
# Get the annotation
annotation = domain_param.annotation
# The annotation should be a Literal type that includes OpenFDA domains
assert (
annotation is not None
), "domain parameter has no type annotation"
@pytest.mark.asyncio
async def test_openfda_search_basic_call(self):
"""Test that OpenFDA domain search doesn't raise errors with basic call."""
from unittest.mock import AsyncMock, patch
# Mock the OpenFDA search function that will be imported
with patch(
"biomcp.openfda.adverse_events.search_adverse_events",
new_callable=AsyncMock,
) as mock_search:
mock_search.return_value = (
"## FDA Adverse Event Reports\n\nTest results"
)
from biomcp.router import search
# This should not raise an error
result = await search(
query=None, # Required parameter
domain="fda_adverse",
chemicals=["test"],
page_size=1,
)
# Basic check that result has expected structure
assert isinstance(result, dict)
assert "results" in result
@pytest.mark.asyncio
async def test_openfda_fetch_basic_call(self):
"""Test that OpenFDA domain fetch doesn't raise errors with basic call."""
from unittest.mock import AsyncMock, patch
# Mock the OpenFDA get function that will be imported
with patch(
"biomcp.openfda.drug_approvals.get_drug_approval",
new_callable=AsyncMock,
) as mock_get:
mock_get.return_value = "## Drug Approval Details\n\nTest details"
from biomcp.router import fetch
# This should not raise an error
result = await fetch(
id="TEST123",
domain="fda_approval",
)
# Basic check that result has expected structure
assert isinstance(result, dict)
assert "title" in result
assert "text" in result
assert "metadata" in result
```
--------------------------------------------------------------------------------
/tests/tdd/articles/test_preprints.py:
--------------------------------------------------------------------------------
```python
"""Tests for preprint search functionality."""
from unittest.mock import AsyncMock, patch
import pytest
from biomcp.articles.preprints import (
BiorxivClient,
BiorxivResponse,
BiorxivResult,
EuropePMCClient,
EuropePMCResponse,
PreprintSearcher,
)
from biomcp.articles.search import PubmedRequest, ResultItem
from biomcp.core import PublicationState
class TestBiorxivClient:
"""Tests for BiorxivClient."""
@pytest.mark.asyncio
async def test_search_biorxiv_success(self):
"""Test successful bioRxiv search."""
client = BiorxivClient()
# Mock response
mock_response = BiorxivResponse(
collection=[
BiorxivResult(
doi="10.1101/2024.01.01.123456",
title="Test BRAF Mutation Study",
authors="Smith, J.; Doe, J.",
date="2024-01-01",
abstract="Study about BRAF mutations in cancer.",
server="biorxiv",
)
],
total=1,
)
with patch("biomcp.http_client.request_api") as mock_request:
mock_request.return_value = (mock_response, None)
results = await client.search("BRAF")
assert len(results) == 1
assert results[0].doi == "10.1101/2024.01.01.123456"
assert results[0].title == "Test BRAF Mutation Study"
assert results[0].publication_state == PublicationState.PREPRINT
assert "preprint" in results[0].journal.lower()
@pytest.mark.asyncio
async def test_search_biorxiv_no_results(self):
"""Test bioRxiv search with no results."""
client = BiorxivClient()
with patch("biomcp.http_client.request_api") as mock_request:
mock_request.return_value = (
None,
{"code": 404, "message": "Not found"},
)
results = await client.search("nonexistent")
assert len(results) == 0
class TestEuropePMCClient:
"""Tests for EuropePMCClient."""
@pytest.mark.asyncio
async def test_search_europe_pmc_success(self):
"""Test successful Europe PMC search."""
client = EuropePMCClient()
# Mock response
mock_response = EuropePMCResponse(
hitCount=1,
resultList={
"result": [
{
"id": "PPR123456",
"doi": "10.1101/2024.01.02.654321",
"title": "TP53 Mutation Analysis",
"authorString": "Johnson, A., Williams, B.",
"journalTitle": "bioRxiv",
"firstPublicationDate": "2024-01-02",
"abstractText": "Analysis of TP53 mutations.",
}
]
},
)
with patch("biomcp.http_client.request_api") as mock_request:
mock_request.return_value = (mock_response, None)
results = await client.search("TP53")
assert len(results) == 1
assert results[0].doi == "10.1101/2024.01.02.654321"
assert results[0].title == "TP53 Mutation Analysis"
assert results[0].publication_state == PublicationState.PREPRINT
class TestPreprintSearcher:
"""Tests for PreprintSearcher."""
@pytest.mark.asyncio
async def test_search_combined_sources(self):
"""Test searching across multiple preprint sources."""
searcher = PreprintSearcher()
# Mock both clients
mock_biorxiv_results = [
ResultItem(
doi="10.1101/2024.01.01.111111",
title="BRAF Study 1",
date="2024-01-01",
publication_state=PublicationState.PREPRINT,
)
]
mock_europe_results = [
ResultItem(
doi="10.1101/2024.01.02.222222",
title="BRAF Study 2",
date="2024-01-02",
publication_state=PublicationState.PREPRINT,
)
]
searcher.biorxiv_client.search = AsyncMock(
return_value=mock_biorxiv_results
)
searcher.europe_pmc_client.search = AsyncMock(
return_value=mock_europe_results
)
request = PubmedRequest(genes=["BRAF"])
response = await searcher.search(request)
assert response.count == 2
assert len(response.results) == 2
# Results should be sorted by date (newest first)
assert response.results[0].doi == "10.1101/2024.01.02.222222"
assert response.results[1].doi == "10.1101/2024.01.01.111111"
@pytest.mark.asyncio
async def test_search_duplicate_removal(self):
"""Test that duplicate DOIs are removed."""
searcher = PreprintSearcher()
# Create duplicate results with same DOI
duplicate_doi = "10.1101/2024.01.01.999999"
mock_biorxiv_results = [
ResultItem(
doi=duplicate_doi,
title="Duplicate Study",
date="2024-01-01",
publication_state=PublicationState.PREPRINT,
)
]
mock_europe_results = [
ResultItem(
doi=duplicate_doi,
title="Duplicate Study",
date="2024-01-01",
publication_state=PublicationState.PREPRINT,
)
]
searcher.biorxiv_client.search = AsyncMock(
return_value=mock_biorxiv_results
)
searcher.europe_pmc_client.search = AsyncMock(
return_value=mock_europe_results
)
request = PubmedRequest(keywords=["test"])
response = await searcher.search(request)
assert response.count == 1
assert len(response.results) == 1
assert response.results[0].doi == duplicate_doi
```
--------------------------------------------------------------------------------
/tests/tdd/test_render.py:
--------------------------------------------------------------------------------
```python
from biomcp import render
def test_render_full_json(data_dir):
input_data = (data_dir / "ct_gov/trials_NCT04280705.json").read_text()
expect_markdown = (data_dir / "ct_gov/trials_NCT04280705.txt").read_text()
markdown = render.to_markdown(input_data)
assert markdown == expect_markdown
input_data = (
data_dir / "myvariant/variants_full_braf_v600e.json"
).read_text()
expect_markdown = (
data_dir / "myvariant/variants_full_braf_v600e.txt"
).read_text()
markdown = render.to_markdown(input_data)
print("==" * 100)
print(markdown)
print("==" * 100)
assert markdown == expect_markdown
def test_render_with_nones():
markdown = render.to_markdown(data)
assert (
markdown
== """# Studies
## Protocol Section
### Design Module
Study Type: interventional
Phases: phase2
### Identification Module
Brief Title:
study of autologous tumor infiltrating lymphocytes in patients with
solid tumors
Nct Id: nct03645928
### Status Module
Overall Status: recruiting
#### Completion Date Struct
Date: 2029-08-09
#### Start Date Struct
Date: 2019-05-07
"""
)
data = {
"next_page_token": None,
"studies": [
{
"derived_section": None,
"document_section": None,
"has_results": None,
"protocol_section": {
"arms_interventions_module": None,
"conditions_module": None,
"contacts_locations_module": None,
"description_module": None,
"design_module": {
"design_info": None,
"enrollment_info": None,
"phases": ["phase2"],
"study_type": "interventional",
},
"eligibility_module": None,
"identification_module": {
"acronym": None,
"brief_title": "study "
"of "
"autologous "
"tumor "
"infiltrating "
"lymphocytes "
"in "
"patients "
"with "
"solid "
"tumors",
"nct_id": "nct03645928",
"official_title": None,
"org_study_id_info": None,
"organization": None,
"secondary_id_infos": None,
},
"outcomes_module": None,
"oversight_module": None,
"references_module": None,
"sponsor_collaborators_module": None,
"status_module": {
"completion_date_struct": {
"date": "2029-08-09",
"type": None,
},
"expanded_access_info": None,
"last_known_status": None,
"last_update_post_date_struct": None,
"last_update_submit_date": None,
"overall_status": "recruiting",
"primary_completion_date_struct": None,
"results_first_post_date_struct": None,
"results_first_submit_date": None,
"results_first_submit_qc_date": None,
"start_date_struct": {"date": "2019-05-07", "type": None},
"status_verified_date": None,
"study_first_post_date_struct": None,
"study_first_submit_date": None,
"study_first_submit_qc_date": None,
"why_stopped": None,
},
},
"results_section": None,
},
],
}
def test_transform_key_protocol_section():
assert render.transform_key("protocol_section") == "Protocol Section"
def test_transform_key_nct_number():
assert render.transform_key("nct_number") == "Nct Number"
def test_transform_key_study_url():
assert render.transform_key("study_url") == "Study Url"
def test_transform_key_allcaps():
assert render.transform_key("allcaps") == "Allcaps"
def test_transform_key_primary_purpose():
assert render.transform_key("primary_purpose") == "Primary Purpose"
def test_transform_key_underscores():
assert render.transform_key("some_key_name") == "Some Key Name"
def test_transform_key_lowercase():
assert render.transform_key("somekey") == "Somekey"
def test_transform_key_nctid():
assert render.transform_key("nct_id") == "Nct Id"
def test_transform_key_4dct():
assert render.transform_key("4dct") == "4dct"
def test_wrap_preserve_newlines_blank():
assert render.wrap_preserve_newlines("", 20) == []
def test_wrap_preserve_newlines_short_line():
text = "hello world"
assert render.wrap_preserve_newlines(text, 20) == ["hello world"]
def test_wrap_preserve_newlines_long():
text = "this line is definitely longer than twenty characters"
lines = render.wrap_preserve_newlines(text, 20)
assert len(lines) > 1
assert "this line is" in lines[0]
def test_process_scalar_list_fits():
lines = []
render.process_scalar_list(
"conditions",
lines,
["condition1", "condition2"],
)
assert lines == ["Conditions: condition1, condition2"]
def test_process_scalar_list_too_long():
lines = []
big_list = ["test_value" * 10, "another" * 5]
render.process_scalar_list("giant_field", lines, big_list)
assert lines[0].startswith("Giant Field:")
assert lines[1].startswith("- test_value")
def test_render_key_value_short():
lines = []
render.render_key_value(lines, "nct_number", "nct100")
assert lines == ["Nct Number: nct100"]
def test_render_key_value_long():
lines = []
render.render_key_value(lines, "brief_summary", "hello " * 15)
# first line "brief summary:"
assert lines[0] == "Brief Summary:"
assert lines[1].startswith(" hello hello")
```
--------------------------------------------------------------------------------
/src/biomcp/cli/articles.py:
--------------------------------------------------------------------------------
```python
import asyncio
import json
from typing import Annotated
import typer
from ..articles import fetch
from ..articles.search import PubmedRequest, search_articles
from ..articles.unified import search_articles_unified
article_app = typer.Typer(help="Search and retrieve biomedical articles.")
async def get_article_details(
identifier: str, output_json: bool = False
) -> str:
"""Get article details handling both PMIDs and DOIs with proper output format."""
# Use the fetch module functions directly to control output format
if fetch.is_doi(identifier):
from ..articles.preprints import fetch_europe_pmc_article
return await fetch_europe_pmc_article(
identifier, output_json=output_json
)
elif fetch.is_pmid(identifier):
return await fetch.fetch_articles(
[int(identifier)], full=True, output_json=output_json
)
else:
# Unknown identifier format
error_data = [
{
"error": f"Invalid identifier format: {identifier}. Expected either a PMID (numeric) or DOI (10.xxxx/xxxx format)."
}
]
if output_json:
return json.dumps(error_data, indent=2)
else:
from .. import render
return render.to_markdown(error_data)
@article_app.command("search")
def search_article(
genes: Annotated[
list[str] | None,
typer.Option(
"--gene",
"-g",
help="Gene name to search for (can be specified multiple times)",
),
] = None,
variants: Annotated[
list[str] | None,
typer.Option(
"--variant",
"-v",
help="Genetic variant to search for (can be specified multiple times)",
),
] = None,
diseases: Annotated[
list[str] | None,
typer.Option(
"--disease",
"-d",
help="Disease name to search for (can be specified multiple times)",
),
] = None,
chemicals: Annotated[
list[str] | None,
typer.Option(
"--chemical",
"-c",
help="Chemical name to search for (can be specified multiple times)",
),
] = None,
keywords: Annotated[
list[str] | None,
typer.Option(
"--keyword",
"-k",
help="Keyword to search for (can be specified multiple times)",
),
] = None,
limit: Annotated[
int,
typer.Option(
"--limit",
"-l",
help="Maximum number of results per page (1-100)",
min=1,
max=100,
),
] = 10,
page: Annotated[
int,
typer.Option(
"--page",
"-p",
help="Page number for pagination (starts at 1)",
min=1,
),
] = 1,
output_json: Annotated[
bool,
typer.Option(
"--json",
"-j",
help="Render in JSON format",
case_sensitive=False,
),
] = False,
include_preprints: Annotated[
bool,
typer.Option(
"--include-preprints/--no-preprints",
help="Include preprint articles from bioRxiv/medRxiv and Europe PMC",
),
] = True,
):
"""Search biomedical research articles"""
request = PubmedRequest(
genes=genes or [],
variants=variants or [],
diseases=diseases or [],
chemicals=chemicals or [],
keywords=keywords or [],
)
if include_preprints:
result = asyncio.run(
search_articles_unified(
request,
include_pubmed=True,
include_preprints=True,
output_json=output_json,
limit=limit,
page=page,
)
)
else:
result = asyncio.run(
search_articles(request, output_json, limit=limit, page=page)
)
typer.echo(result)
@article_app.command("get")
def get_article(
identifiers: Annotated[
list[str],
typer.Argument(
help="Article identifiers - PubMed IDs (e.g., 38768446) or DOIs (e.g., 10.1101/2024.01.20.23288905)",
),
],
full: Annotated[
bool,
typer.Option(
"--full",
"-f",
help="Whether to fetch full article text (PubMed only)",
),
] = False,
output_json: Annotated[
bool,
typer.Option(
"--json",
"-j",
help="Render in JSON format",
case_sensitive=False,
),
] = False,
):
"""
Retrieve articles by PubMed ID or DOI.
Supports:
- PubMed IDs for published articles (e.g., 38768446)
- DOIs for Europe PMC preprints (e.g., 10.1101/2024.01.20.23288905)
For multiple articles, results are returned as a list.
"""
# Handle single identifier
if len(identifiers) == 1:
result = asyncio.run(
get_article_details(identifiers[0], output_json=output_json)
)
else:
# For multiple identifiers, we need to handle them individually
# since they might be a mix of PMIDs and DOIs
results = []
for identifier in identifiers:
article_result = asyncio.run(
get_article_details(identifier, output_json=True)
)
# Parse the result and add to list
try:
article_data = json.loads(article_result)
if isinstance(article_data, list):
results.extend(article_data)
else:
results.append(article_data)
except json.JSONDecodeError:
# This shouldn't happen with our new function
results.append({
"error": f"Failed to parse result for {identifier}"
})
if output_json:
result = json.dumps(results, indent=2)
else:
from .. import render
result = render.to_markdown(results)
typer.echo(result)
```
--------------------------------------------------------------------------------
/src/biomcp/articles/search_optimized.py:
--------------------------------------------------------------------------------
```python
"""Optimized article search with caching and parallel processing."""
import asyncio
import hashlib
from .. import ensure_list
from ..shared_context import get_search_context
from ..utils.request_cache import get_cache
from .search import PubmedRequest
from .unified import search_articles_unified
# Cache for article search results (5 minute TTL)
_search_cache = get_cache("article_search", ttl_seconds=300)
def _get_search_cache_key(
request: PubmedRequest, include_preprints: bool, include_cbioportal: bool
) -> str:
"""Generate a cache key for search requests."""
# Create a deterministic key from search parameters
key_parts = [
f"chemicals:{sorted(request.chemicals)}",
f"diseases:{sorted(request.diseases)}",
f"genes:{sorted(request.genes)}",
f"keywords:{sorted(request.keywords)}",
f"variants:{sorted(request.variants)}",
f"preprints:{include_preprints}",
f"cbioportal:{include_cbioportal}",
]
key_string = "|".join(key_parts)
return hashlib.sha256(key_string.encode()).hexdigest()
async def article_searcher_optimized(
call_benefit: str,
chemicals: list[str] | str | None = None,
diseases: list[str] | str | None = None,
genes: list[str] | str | None = None,
keywords: list[str] | str | None = None,
variants: list[str] | str | None = None,
include_preprints: bool = True,
include_cbioportal: bool = True,
) -> str:
"""Optimized version of article_searcher with caching and context reuse."""
# Convert parameters to PubmedRequest
request = PubmedRequest(
chemicals=ensure_list(chemicals, split_strings=True),
diseases=ensure_list(diseases, split_strings=True),
genes=ensure_list(genes, split_strings=True),
keywords=ensure_list(keywords, split_strings=True),
variants=ensure_list(variants, split_strings=True),
)
# Check cache first
cache_key = _get_search_cache_key(
request, include_preprints, include_cbioportal
)
cached_result = await _search_cache.get(cache_key)
if cached_result is not None:
return cached_result
# Check if we're in a search context (for reusing validated entities)
context = get_search_context()
if context and request.genes:
# Pre-validate genes using cached results
valid_genes = []
for gene in request.genes:
if await context.validate_gene(gene):
valid_genes.append(gene)
request.genes = valid_genes
# Check if we have cached cBioPortal summaries
if include_cbioportal and request.genes:
for gene in request.genes[:1]: # Just first gene
summary = context.get_gene_summary(gene)
if summary:
# We have a cached summary, can skip that part
pass
# Perform the search
result = await search_articles_unified(
request,
include_pubmed=True,
include_preprints=include_preprints,
include_cbioportal=include_cbioportal,
)
# Cache the result (5 minute TTL)
await _search_cache.set(cache_key, result, ttl=300)
return result
# Additional optimization: Batch article searches
class ArticleSearchBatcher:
"""Batch multiple article searches to reduce overhead."""
def __init__(self, batch_size: int = 5, timeout: float = 0.1):
self.batch_size = batch_size
self.timeout = timeout
self._pending_searches: list[tuple[PubmedRequest, asyncio.Future]] = []
self._batch_task: asyncio.Task | None = None
async def search(self, request: PubmedRequest) -> str:
"""Add a search to the batch."""
future = asyncio.get_event_loop().create_future()
self._pending_searches.append((request, future))
# Start batch processing if not already running
if self._batch_task is None or self._batch_task.done():
self._batch_task = asyncio.create_task(self._process_batch())
return await future
async def _process_batch(self):
"""Process pending searches in batch."""
await asyncio.sleep(self.timeout) # Wait for more requests
if not self._pending_searches:
return
# Take up to batch_size searches
batch = self._pending_searches[: self.batch_size]
self._pending_searches = self._pending_searches[self.batch_size :]
# Process searches in parallel
search_tasks = []
for request, _ in batch:
task = search_articles_unified(request, include_pubmed=True)
search_tasks.append(task)
results = await asyncio.gather(*search_tasks, return_exceptions=True)
# Set results on futures
for (_, future), result in zip(batch, results, strict=False):
if isinstance(result, Exception):
future.set_exception(result)
else:
future.set_result(result)
# Global batcher instance
_article_batcher = ArticleSearchBatcher()
async def article_searcher_batched(
call_benefit: str,
chemicals: list[str] | str | None = None,
diseases: list[str] | str | None = None,
genes: list[str] | str | None = None,
keywords: list[str] | str | None = None,
variants: list[str] | str | None = None,
include_preprints: bool = True,
include_cbioportal: bool = True,
) -> str:
"""Batched version of article_searcher for multiple concurrent searches."""
request = PubmedRequest(
chemicals=ensure_list(chemicals, split_strings=True),
diseases=ensure_list(diseases, split_strings=True),
genes=ensure_list(genes, split_strings=True),
keywords=ensure_list(keywords, split_strings=True),
variants=ensure_list(variants, split_strings=True),
)
# Use the optimized version with caching
return await article_searcher_optimized(
call_benefit=call_benefit,
chemicals=request.chemicals,
diseases=request.diseases,
genes=request.genes,
keywords=request.keywords,
variants=request.variants,
include_preprints=include_preprints,
include_cbioportal=include_cbioportal,
)
```
--------------------------------------------------------------------------------
/tests/tdd/variants/test_cbioportal_mutations.py:
--------------------------------------------------------------------------------
```python
"""Tests for cBioPortal mutation-specific search functionality."""
import pytest
from biomcp.utils.mutation_filter import MutationFilter
from biomcp.variants.cbioportal_mutations import (
CBioPortalMutationClient,
MutationHit,
StudyMutationSummary,
format_mutation_search_result,
)
class TestCBioPortalMutationSearch:
"""Test mutation-specific search functionality."""
@pytest.mark.asyncio
@pytest.mark.integration
async def test_search_specific_mutation_srsf2_f57y(self):
"""Test searching for SRSF2 F57Y mutation."""
client = CBioPortalMutationClient()
result = await client.search_specific_mutation(
gene="SRSF2", mutation="F57Y", max_studies=10
)
assert result is not None
assert result.gene == "SRSF2"
assert result.specific_mutation == "F57Y"
assert result.studies_with_mutation >= 0
# If mutations found, check structure
if result.studies_with_mutation > 0:
assert len(result.top_studies) > 0
top_study = result.top_studies[0]
assert isinstance(top_study, StudyMutationSummary)
assert top_study.mutation_count > 0
@pytest.mark.asyncio
@pytest.mark.integration
async def test_search_mutation_pattern_srsf2_f57(self):
"""Test searching for SRSF2 F57* mutations."""
client = CBioPortalMutationClient()
result = await client.search_specific_mutation(
gene="SRSF2", pattern="F57*", max_studies=10
)
assert result is not None
assert result.gene == "SRSF2"
assert result.pattern == "F57*"
# F57* should match F57Y, F57C, etc.
if result.total_mutations > 0:
assert result.mutation_types is not None
# Check that we found some F57 mutations
f57_mutations = [
mut for mut in result.mutation_types if mut.startswith("F57")
]
assert len(f57_mutations) > 0
@pytest.mark.asyncio
@pytest.mark.integration
async def test_search_braf_v600e(self):
"""Test searching for BRAF V600E - a very common mutation."""
client = CBioPortalMutationClient()
result = await client.search_specific_mutation(
gene="BRAF", mutation="V600E", max_studies=20
)
assert result is not None
assert result.gene == "BRAF"
assert result.specific_mutation == "V600E"
# V600E is very common, should have many studies
assert result.studies_with_mutation > 10
assert len(result.top_studies) > 0
# Check melanoma is in top cancer types
cancer_types = [s.cancer_type for s in result.top_studies]
# At least some melanoma studies should have V600E
assert any("melanoma" in ct.lower() for ct in cancer_types)
def test_filter_mutations_specific(self):
"""Test filtering for specific mutations."""
mutations = [
MutationHit(
study_id="study1",
molecular_profile_id="study1_mutations",
protein_change="F57Y",
mutation_type="Missense",
),
MutationHit(
study_id="study1",
molecular_profile_id="study1_mutations",
protein_change="F57C",
mutation_type="Missense",
),
MutationHit(
study_id="study2",
molecular_profile_id="study2_mutations",
protein_change="R88Q",
mutation_type="Missense",
),
]
# Filter for F57Y
mutation_filter = MutationFilter(specific_mutation="F57Y")
filtered = mutation_filter.filter_mutations(mutations)
assert len(filtered) == 1
assert filtered[0].protein_change == "F57Y"
def test_filter_mutations_pattern(self):
"""Test filtering with wildcard patterns."""
mutations = [
MutationHit(
study_id="study1",
molecular_profile_id="study1_mutations",
protein_change="F57Y",
mutation_type="Missense",
),
MutationHit(
study_id="study1",
molecular_profile_id="study1_mutations",
protein_change="F57C",
mutation_type="Missense",
),
MutationHit(
study_id="study2",
molecular_profile_id="study2_mutations",
protein_change="R88Q",
mutation_type="Missense",
),
]
# Filter for F57*
mutation_filter = MutationFilter(pattern="F57*")
filtered = mutation_filter.filter_mutations(mutations)
assert len(filtered) == 2
assert all(m.protein_change.startswith("F57") for m in filtered)
def test_format_mutation_search_result(self):
"""Test formatting of mutation search results."""
from biomcp.variants.cbioportal_mutations import MutationSearchResult
result = MutationSearchResult(
gene="SRSF2",
specific_mutation="F57Y",
total_studies=100,
studies_with_mutation=3,
total_mutations=5,
top_studies=[
StudyMutationSummary(
study_id="msk_ch_2023",
study_name="Cancer Therapy and Clonal Hematopoiesis",
cancer_type="mixed",
mutation_count=5,
sample_count=100,
),
StudyMutationSummary(
study_id="mds_mskcc_2020",
study_name="Myelodysplastic Syndrome Study",
cancer_type="mds",
mutation_count=2,
sample_count=50,
),
],
mutation_types={"F57Y": 5},
)
formatted = format_mutation_search_result(result)
assert "SRSF2" in formatted
assert "F57Y" in formatted
assert "**Studies with Mutation**: 3" in formatted
assert "msk_ch_2023" in formatted
assert "| 5 |" in formatted # mutation count
```
--------------------------------------------------------------------------------
/docs/backend-services-reference/06-pubtator3.md:
--------------------------------------------------------------------------------
```markdown
# PubTator3 API
This document describes the PubTator3 API used by BioMCP for searching biomedical literature and retrieving article details with annotations. Understanding this API provides context for how BioMCP's article commands function.
## Overview
The PubTator3 API provides a way to search for and retrieve biomedical articles
with entity annotations. This document outlines the API implementation details.
PubTator3 is a web-based tool that provides annotations of biomedical entities
in PubMed abstracts and PMC full-text articles. BioMCP uses the PubTator3 API
to search for and retrieve biomedical articles and their annotated entities (
genes, variants, diseases, chemicals, etc.).
> **CLI Documentation**: For information on using these APIs through the BioMCP
> command line interface, see
> the [Articles CLI Documentation](../user-guides/01-command-line-interface.md#article-commands).
## Usage Guide
For practical examples of searching articles with PubTator3, see [How to Find Articles and cBioPortal Data](../how-to-guides/01-find-articles-and-cbioportal-data.md).
## API Workflow
The PubTator3 integration follows a three-step workflow:
1. **Entity Autocomplete**: Get standardized entity identifiers
2. **Search**: Find articles using entity identifiers and keywords
3. **Fetch**: Retrieve full article details by PMID
## API Endpoints
### Entity Autocomplete API
**Endpoint:**
`https://www.ncbi.nlm.nih.gov/research/pubtator3-api/entity/autocomplete/`
This endpoint helps normalize entity names to their standard identifiers,
improving search precision.
#### Parameters
| Parameter | Description | Example |
| --------- | --------------------------- | ----------------------------------- |
| `query` | Text to autocomplete | `BRAF` |
| `concept` | Entity type | `GENE`, `CHEMICAL`, `DISEASE`, etc. |
| `limit` | Number of results to return | `2` |
#### Example Request and Response
```bash
curl "https://www.ncbi.nlm.nih.gov/research/pubtator3-api/entity/autocomplete/?query=BRAF&concept=GENE&limit=2"
```
Response:
```json
[
{
"_id": "@GENE_BRAF",
"biotype": "gene",
"name": "BRAF",
"description": "All Species",
"match": "Matched on name <m>BRAF</m>"
},
{
"_id": "@GENE_BRAFP1",
"biotype": "gene",
"name": "BRAFP1",
"description": "All Species",
"match": "Matched on name <m>BRAFP1</m>"
}
]
```
### Entity Search API
**Endpoint:** `https://www.ncbi.nlm.nih.gov/research/pubtator3-api/search/`
This endpoint allows searching for PMIDs (PubMed IDs) based on entity
identifiers and keywords.
#### Parameters
| Parameter | Description | Example |
| --------- | ------------------------------- | ---------------------- |
| `text` | Entity identifier or text query | `@CHEMICAL_remdesivir` |
#### Example Request and Response
```bash
curl "https://www.ncbi.nlm.nih.gov/research/pubtator3-api/search/?text=@CHEMICAL_remdesivir"
```
Response (truncated):
```json
{
"results": [
{
"_id": "37711410",
"pmid": 37711410,
"title": "Remdesivir.",
"journal": "Hosp Pharm",
"authors": ["Levien TL", "Baker DE"],
"date": "2023-10-01T00:00:00Z",
"doi": "10.1177/0018578721999804",
"meta_date_publication": "2023 Oct",
"meta_volume": "58"
}
// More results...
]
}
```
### Article Fetch API
**Endpoint:**
`https://www.ncbi.nlm.nih.gov/research/pubtator3-api/publications/export/biocjson`
This endpoint retrieves detailed information about specific articles, including
annotations.
#### Parameters
| Parameter | Description | Example |
| ----------- | --------------------------------------------- | ---------- |
| `pmids` | List of PubMed IDs to retrieve | `29355051` |
| `full_text` | Whether to include full text (when available) | `true` |
#### Example Request
```bash
curl "https://www.ncbi.nlm.nih.gov/research/pubtator3-api/publications/export/biocjson?pmids=29355051&full=true"
```
Response format (truncated):
```json
{
"PubTator3": [
{
"_id": "29355051|PMC6142073",
"id": "6142073",
"infons": {},
"passages": [
{
"infons": {
"name_3": "surname:Hu;given-names:Minghua",
"name_2": "surname:Luo;given-names:Xia",
"name_1": "surname:Luo;given-names:Shuang",
"article-id_pmid": "29355051"
// More metadata...
}
}
// More passages...
]
}
]
}
```
## Entity Types
PubTator3 annotates several types of biomedical entities:
1. **Genes/Proteins**: Gene or protein names (e.g., BRAF, TP53)
2. **Genetic Variants**: Genetic variations (e.g., BRAF V600E)
3. **Diseases**: Disease names and conditions (e.g., Melanoma)
4. **Chemicals/Drugs**: Chemical substances or drugs (e.g., Vemurafenib)
## Integration Strategy for BioMCP
The recommended workflow for integrating with PubTator3 in BioMCP is:
1. **Entity Normalization**: Use the autocomplete API to convert user-provided
entity names to standardized identifiers
2. **Literature Search**: Use the search API with these identifiers to find
relevant PMIDs
3. **Data Retrieval**: Fetch detailed article data with annotations using the
fetch API
This workflow ensures consistent entity handling and optimal search results.
## Authentication
The PubTator3 API is public and does not require authentication for basic
usage. However, there are rate limits in place to prevent abuse.
## Rate Limits and Best Practices
- **Request Limits**: Approximately 30 requests per minute
- **Batch Requests**: For article retrieval, batch multiple PMIDs in a single
request
- **Caching**: Implement caching to minimize repeated requests
- **Specific Queries**: Use specific entity names rather than general terms for
better results
## Error Handling
Common error responses:
- **400**: Invalid parameters
- **404**: Articles not found
- **429**: Rate limit exceeded
- **500**: Server error
## More Information
For complete API documentation, visit
the [PubTator3 API Documentation](https://www.ncbi.nlm.nih.gov/research/pubtator3/api).
```
--------------------------------------------------------------------------------
/docs/backend-services-reference/04-clinicaltrials-gov.md:
--------------------------------------------------------------------------------
```markdown
# ClinicalTrials.gov API
This document outlines the key aspects of the public ClinicalTrials.gov v2 API utilized by BioMCP. Understanding these details can be helpful for advanced users interpreting BioMCP results or for developers extending its capabilities. BioMCP's CLI commands often simplify or combine these parameters for ease of use; refer to the [Trials CLI Documentation](../user-guides/01-command-line-interface.md#trial-commands) for specific command options.
## Overview
The [ClinicalTrials.gov](https://clinicaltrials.gov/) API provides programmatic
access to clinical trial information. This document outlines the API
implementation details for searching and retrieving clinical trial data.
> **CLI Documentation**: For information on using these APIs through the BioMCP
> command line interface, see the [Trials CLI Documentation](../user-guides/01-command-line-interface.md#trial-commands).
## API Endpoints
### Search API
**Endpoint:** `https://clinicaltrials.gov/api/v2/studies`
This endpoint allows searching for clinical trials using various parameters.
#### Key Parameters
| Parameter | Description | Example Value |
| ---------------------- | ----------------------------------- | ----------------------------------------------- |
| `query.cond` | "Conditions or disease" query | `lung cancer` |
| `query.term` | "Other terms" query | `AREA[LastUpdatePostDate]RANGE[2023-01-15,MAX]` |
| `query.intr` | "Intervention/treatment" query | `Vemurafenib` |
| `query.locn` | "Location terms" query | `New York` |
| `query.titles` | "Title/acronym" query | `BRAF Melanoma` |
| `query.outc` | "Outcome measure" query | `overall survival` |
| `query.spons` | "Sponsor/collaborator" query | `National Cancer Institute` |
| `query.lead` | Searches in "LeadSponsorName" field | `MD Anderson` |
| `query.id` | "Study IDs" query (OR semantics) | `NCT04267848` |
| `filter.overallStatus` | Comma-separated list of statuses | `NOT_YET_RECRUITING,RECRUITING` |
| `filter.geo` | Geo-location filter | `distance(39.0035707,-77.1013313,50mi)` |
| `filter.ids` | Filter by NCT IDs (AND semantics) | `NCT04852770,NCT01728545` |
| `filter.advanced` | Advanced filter query | `AREA[StartDate]2022` |
| `sort` | Sort order | `LastUpdatePostDate:desc` |
| `fields` | Fields to return | `NCTId,BriefTitle,OverallStatus,HasResults` |
| `countTotal` | Count total number of studies | `true` or `false` |
#### Example Request
```bash
curl -X GET "https://clinicaltrials.gov/api/v2/studies?query.cond=Melanoma&query.intr=BRAF"
```
### Study Details API
**Endpoint:** `https://clinicaltrials.gov/api/v2/studies/{NCT_ID}`
This endpoint retrieves detailed information about a specific clinical trial.
#### Example Request
```bash
curl -X GET "https://clinicaltrials.gov/api/v2/studies/NCT04267848"
```
#### Response Modules
The API response contains various modules of information:
- **protocolSection**: Basic study information, eligibility criteria, and
design
- **resultsSection**: Study outcomes and results (when available)
- **documentSection**: Related documents
- **derivedSection**: Derived data elements
- **annotationsSection**: Additional annotations
## Implementation Details
### NCT ID Filtering Semantics
BioMCP uses intelligent filtering when NCT IDs are provided:
- **ID-only mode**: When NCT IDs are the only filter criteria, `query.id` is used for fast direct lookup
- **Intersection mode**: When NCT IDs are combined with other filters (conditions, interventions, etc.), `filter.ids` is used to ensure results match ALL criteria
This ensures that specifying NCT IDs restricts results rather than expanding them.
### Query Building
When constructing API queries, parameters must be properly formatted according to the API documentation.
For implementation details on query building in BioMCP, see the [HTTP Client Developer Guide](../developer-guides/06-http-client-and-caching.md).
### Response Parsing
The API returns data in JSON format (or CSV if specified). Key sections in the
response include:
- `protocolSection`: Contains study protocol details
- `identificationModule`: Basic identifiers including NCT ID and title
- `statusModule`: Current recruitment status and study dates
- `sponsorCollaboratorsModule`: Information about sponsors and
collaborators
- `designModule`: Study design information including interventions
- `eligibilityModule`: Inclusion/exclusion criteria and eligible population
- `contactsLocationsModule`: Study sites and contact information
- `referencesModule`: Related publications
### Error Handling
The API returns standard HTTP status codes. Common error scenarios include:
- **404**: Trial not found
- **429**: Rate limit exceeded
- **400**: Invalid query parameters
For implementation details on error handling in BioMCP, see the [Error Handling Developer Guide](../developer-guides/05-error-handling.md).
## Authentication
The ClinicalTrials.gov API is public and does not require authentication for
basic usage. However, there are rate limits in place.
## Rate Limits and Best Practices
- **Rate Limit**: Approximately 50 requests per minute per IP address
- **Caching**: Implement caching to minimize repeated requests
- **Pagination**: For large result sets, use the pagination functionality with
- **Focused Queries**: Use specific search terms rather than broad queries to
get more relevant results
- **Field Selection**: Use the fields parameter to request only the data you
need
## More Information
For complete API documentation, visit
the [ClinicalTrials.gov API Documentation](https://clinicaltrials.gov/data-api/about-api)
```
--------------------------------------------------------------------------------
/docs/how-to-guides/05-logging-and-monitoring-with-bigquery.md:
--------------------------------------------------------------------------------
```markdown
# BigQuery Logging for BioMCP
This document outlines how BioMCP uses Google BigQuery for logging user interactions and API usage.
## Overview
BioMCP integrates with Google BigQuery to log user interactions, queries, and API usage. This logging provides valuable insights into how the system is being used, helps with debugging, and enables analytics for improving the service.
## Prerequisites
- A Google Cloud Platform (GCP) account
- A BigQuery dataset and table created in your GCP project
- A GCP service account with BigQuery permissions
## Setting Up BigQuery for BioMCP
1. **Create a BigQuery Dataset and Table**
- In the Google Cloud Console, navigate to BigQuery
- Create a new dataset (e.g., `biomcp_logs`)
- Create a table within the dataset (e.g., `worker_logs`) with the following schema:
```
timestamp: TIMESTAMP
userEmail: STRING
query: STRING
```
- Adjust the schema as needed for your specific logging requirements
2. **Create a Service Account**
- Navigate to "IAM & Admin" > "Service Accounts" in the Google Cloud Console
- Create a new service account with a descriptive name (e.g., `biomcp-bigquery-logger`)
- Assign the "BigQuery Data Editor" role to the service account
- Create and download a JSON key for the service account
3. **Configure BioMCP with BigQuery Credentials**
- Open `wrangler.toml` in the BioMCP project
- Update the following variables with your BigQuery information:
```toml
BQ_PROJECT_ID = "your-gcp-project-id"
BQ_DATASET = "biomcp_logs"
BQ_TABLE = "worker_logs"
```
- For the service account key, use Cloudflare's secret management:
```bash
npx wrangler secret put BQ_SA_KEY_JSON
```
When prompted, paste the entire JSON content of your service account key file
## How BigQuery Logging Works
The BioMCP worker uses the following process to log data to BigQuery:
1. **Authentication**: The worker generates a JWT token using the service account credentials
2. **Token Exchange**: The JWT is exchanged for a Google OAuth access token
3. **Data Insertion**: The worker uses BigQuery's streaming insert API to log events
The implementation includes:
- Token caching to minimize authentication requests
- Error handling for failed logging attempts
- Automatic retry logic for transient failures
## Logged Information
By default, the following information is logged to BigQuery:
- **timestamp**: When the event occurred
- **userEmail**: The email address of the authenticated user (if available)
- **query**: The query or request that was made
You can extend the logging schema to include additional information as needed.
## Accessing and Analyzing Logs
To access and analyze the logs:
1. **Query the BigQuery Table**
- Use the BigQuery console or SQL to query your logs
- Example query to see recent logs:
```sql
SELECT timestamp, userEmail, query
FROM `your-project.biomcp_logs.worker_logs`
ORDER BY timestamp DESC
LIMIT 100
```
2. **Create Visualizations**
- Use Google Data Studio to create dashboards based on your BigQuery data
- Connect Data Studio to your BigQuery table and create visualizations
## Security Considerations
- The service account key is sensitive information and should be protected
- Use Cloudflare's secret management to store the key securely
- Consider implementing field-level encryption for sensitive data
- Implement data retention policies to comply with privacy regulations
- **IMPORTANT: Never include PHI (Protected Health Information) or PII (Personally Identifiable Information) in queries or logs**
- Ensure all queries are sanitized to remove patient identifiers, medical record numbers, and other sensitive information
- Consider implementing automatic redaction of potential PHI/PII from logs
- Regularly audit logs to ensure compliance with HIPAA and other privacy regulations
- Remember that BigQuery logs are not designed for storing protected health information
### Automatic Sanitization
BioMCP automatically sanitizes sensitive data before logging to BigQuery:
- **API Keys and Secrets**: Fields containing `api_key`, `apiKey`, `api-key`, `token`, `secret`, or `password` are automatically redacted
- **Nested Objects**: Sanitization works recursively through nested objects and arrays
- **Case-Insensitive**: Field name matching is case-insensitive to catch variations
- **Preserved Structure**: The original request structure is maintained with sensitive values replaced by `[REDACTED]`
Example of sanitization:
```javascript
// Original request
{
"params": {
"arguments": {
"api_key": "AIzaSyB1234567890",
"gene": "BRAF"
}
}
}
// Sanitized for BigQuery
{
"params": {
"arguments": {
"api_key": "[REDACTED]",
"gene": "BRAF"
}
}
}
```
### Excluded Queries
Certain types of queries are automatically excluded from BigQuery logging:
- **Think Tool Calls**: Any calls to the `think` tool are not logged
- **Thinking Domain**: Queries with `domain="thinking"` or `domain="think"` are excluded
- **Privacy-First Design**: This ensures that internal reasoning and analysis steps remain private
## Troubleshooting
- **Authentication Failures**: Verify that the service account key is correctly formatted and has the necessary permissions
- **Insertion Errors**: Check that the BigQuery table schema matches the data being inserted
- **Missing Logs**: Ensure that the worker has network access to the BigQuery API
## Example Code
The worker includes the following key functions for BigQuery logging:
- `getBQToken()`: Fetches and caches a BigQuery OAuth token
- `insertEvent()`: Inserts a single row into BigQuery via streaming insert
- `sanitizeObject()`: Recursively sanitizes sensitive fields from objects before logging
These functions handle the authentication and data insertion process automatically.
## Testing
BioMCP includes comprehensive tests for the BigQuery logging functionality:
### JavaScript Tests
The sanitization logic is tested using Node.js built-in test framework:
```bash
# Run JavaScript worker tests
make test-js
# Or run directly
node --test tests/tdd/workers/test_worker_sanitization.js
```
Tests cover:
- API key redaction
- Nested sensitive field handling
- Array sanitization
- Case-insensitive field matching
- Think tool detection
- Domain-based filtering
```
--------------------------------------------------------------------------------
/src/biomcp/organizations/search.py:
--------------------------------------------------------------------------------
```python
"""Search functionality for organizations via NCI CTS API."""
import logging
from typing import Any
from ..constants import NCI_ORGANIZATIONS_URL
from ..integrations.cts_api import CTSAPIError, make_cts_request
from ..utils import parse_or_query
logger = logging.getLogger(__name__)
async def search_organizations(
name: str | None = None,
org_type: str | None = None,
city: str | None = None,
state: str | None = None,
page_size: int = 20,
page: int = 1,
api_key: str | None = None,
) -> dict[str, Any]:
"""
Search for organizations in the NCI CTS database.
Args:
name: Organization name to search for (partial match)
org_type: Type of organization (e.g., "industry", "academic")
city: City location
state: State location (2-letter code)
page_size: Number of results per page
page: Page number
api_key: Optional API key (if not provided, uses NCI_API_KEY env var)
Returns:
Dictionary with search results containing:
- organizations: List of organization records
- total: Total number of results
- page: Current page
- page_size: Results per page
Raises:
CTSAPIError: If the API request fails
"""
# Build query parameters
params: dict[str, Any] = {
"size": page_size,
}
# Note: The NCI API doesn't support offset/page pagination for organizations
# It uses cursor-based pagination or returns all results up to size limit
# Add search filters with correct API parameter names
if name:
params["name"] = name
if org_type:
params["type"] = org_type
if city:
params["org_city"] = city
if state:
params["org_state_or_province"] = state
try:
# Make API request
response = await make_cts_request(
url=NCI_ORGANIZATIONS_URL,
params=params,
api_key=api_key,
)
# Process response - adapt to actual API format
# This is a reasonable structure based on typical REST APIs
organizations = response.get("data", response.get("organizations", []))
total = response.get("total", len(organizations))
return {
"organizations": organizations,
"total": total,
"page": page,
"page_size": page_size,
}
except CTSAPIError:
raise
except Exception as e:
logger.error(f"Failed to search organizations: {e}")
raise CTSAPIError(f"Organization search failed: {e!s}") from e
def format_organization_results(results: dict[str, Any]) -> str:
"""
Format organization search results as markdown.
Args:
results: Search results dictionary
Returns:
Formatted markdown string
"""
organizations = results.get("organizations", [])
total = results.get("total", 0)
if not organizations:
return "No organizations found matching the search criteria."
# Build markdown output
lines = [
f"## Organization Search Results ({total} found)",
"",
]
for org in organizations:
org_id = org.get("id", org.get("org_id", "Unknown"))
name = org.get("name", "Unknown Organization")
org_type = org.get("type", org.get("category", "Unknown"))
city = org.get("city", "")
state = org.get("state", "")
lines.append(f"### {name}")
lines.append(f"- **ID**: {org_id}")
lines.append(f"- **Type**: {org_type}")
if city or state:
location_parts = [p for p in [city, state] if p]
lines.append(f"- **Location**: {', '.join(location_parts)}")
lines.append("")
return "\n".join(lines)
async def search_organizations_with_or(
name_query: str,
org_type: str | None = None,
city: str | None = None,
state: str | None = None,
page_size: int = 20,
page: int = 1,
api_key: str | None = None,
) -> dict[str, Any]:
"""
Search for organizations with OR query support.
This function handles OR queries by making multiple API calls and combining results.
For example: "MD Anderson OR Mayo Clinic" will search for each term.
Args:
name_query: Name query that may contain OR operators
Other args same as search_organizations
Returns:
Combined results from all searches with duplicates removed
"""
# Check if this is an OR query
if " OR " in name_query or " or " in name_query:
search_terms = parse_or_query(name_query)
logger.info(f"Parsed OR query into terms: {search_terms}")
else:
# Single term search
search_terms = [name_query]
# Collect all unique organizations
all_organizations = {}
total_found = 0
# Search for each term
for term in search_terms:
logger.info(f"Searching organizations for term: {term}")
try:
results = await search_organizations(
name=term,
org_type=org_type,
city=city,
state=state,
page_size=page_size,
page=page,
api_key=api_key,
)
# Add unique organizations (deduplicate by ID)
for org in results.get("organizations", []):
org_id = org.get("id", org.get("org_id"))
if org_id and org_id not in all_organizations:
all_organizations[org_id] = org
total_found += results.get("total", 0)
except Exception as e:
logger.warning(f"Failed to search for term '{term}': {e}")
# Continue with other terms
# Convert back to list and apply pagination
unique_organizations = list(all_organizations.values())
# Sort by name for consistent results
unique_organizations.sort(key=lambda x: x.get("name", "").lower())
# Apply pagination to combined results
start_idx = (page - 1) * page_size
end_idx = start_idx + page_size
paginated_organizations = unique_organizations[start_idx:end_idx]
return {
"organizations": paginated_organizations,
"total": len(unique_organizations),
"page": page,
"page_size": page_size,
"search_terms": search_terms, # Include what we searched for
"total_found_across_terms": total_found, # Total before deduplication
}
```
--------------------------------------------------------------------------------
/smithery.yaml:
--------------------------------------------------------------------------------
```yaml
# Smithery configuration file: https://smithery.ai/docs/config#smitheryyaml
startCommand:
type: stdio
configSchema:
# JSON Schema defining the configuration options for the MCP.
type: object
properties: {}
commandFunction:
# A JS function that produces the CLI command based on the given config to start the MCP on stdio.
|-
(config) => ({ command: 'biomcp', args: ['run'], env: {} })
exampleConfig: {}
schemas:
TrialQuery:
type: object
properties:
conditions:
type: array
items:
type: string
description: "List of condition terms."
terms:
type: array
items:
type: string
description: "General search terms that don't fit specific categories."
interventions:
type: array
items:
type: string
description: "Intervention names."
recruiting_status:
type: string
description: "Study recruitment status."
study_type:
type: string
description: "Type of study."
nct_ids:
type: array
items:
type: string
description: "Clinical trial NCT IDs"
lat:
type: number
description: "Latitude for location search"
long:
type: number
description: "Longitude for location search"
distance:
type: integer
description: "Distance from lat/long in miles"
min_date:
type: string
description: "Minimum date for filtering"
max_date:
type: string
description: "Maximum date for filtering"
date_field:
type: string
description: "Date field to filter on"
phase:
type: string
description: "Trial phase filter"
age_group:
type: string
description: "Age group filter"
primary_purpose:
type: string
description: "Primary purpose of the trial"
intervention_type:
type: string
description: "Type of intervention"
sponsor_type:
type: string
description: "Type of sponsor"
study_design:
type: string
description: "Study design"
sort:
type: string
description: "Sort order for results"
next_page_hash:
type: string
description: "Token to retrieve the next page of results"
VariantQuery:
type: object
properties:
gene:
type: string
description: "Gene symbol to search for (e.g. BRAF, TP53)"
hgvsp:
type: string
description: "Protein change notation (e.g., p.V600E, p.Arg557His)"
hgvsc:
type: string
description: "cDNA notation (e.g., c.1799T>A)"
rsid:
type: string
description: "dbSNP rsID (e.g., rs113488022)"
region:
type: string
description: "Genomic region as chr:start-end (e.g. chr1:12345-67890)"
significance:
type: string
description: "ClinVar clinical significance"
max_frequency:
type: number
description: "Maximum population allele frequency threshold"
min_frequency:
type: number
description: "Minimum population allele frequency threshold"
cadd:
type: number
description: "Minimum CADD phred score"
polyphen:
type: string
description: "PolyPhen-2 prediction"
sift:
type: string
description: "SIFT prediction"
sources:
type: array
items:
type: string
description: "Include only specific data sources"
size:
type: integer
description: "Number of results to return"
default: 40
offset:
type: integer
description: "Result offset for pagination"
default: 0
PubmedRequest:
type: object
properties:
chemicals:
type: array
items:
type: string
description: "List of chemicals for filtering results."
diseases:
type: array
items:
type: string
description: "Diseases such as Hypertension, Lung Adenocarcinoma, etc."
genes:
type: array
items:
type: string
description: "List of genes for filtering results."
keywords:
type: array
items:
type: string
description: "List of other keywords for filtering results."
variants:
type: array
items:
type: string
description: "List of variants for filtering results."
tools:
trial_searcher:
input:
schema:
type: object
properties:
query:
$ref: "#/schemas/TrialQuery"
required: ["query"]
variant_searcher:
input:
schema:
type: object
properties:
query:
$ref: "#/schemas/VariantQuery"
required: ["query"]
article_searcher:
input:
schema:
type: object
properties:
query:
$ref: "#/schemas/PubmedRequest"
required: ["query"]
# Simple string parameter functions
trial_protocol:
input:
schema:
type: object
properties:
nct_id:
type: string
description: "A single NCT ID (e.g., NCT04280705)"
required: ["nct_id"]
trial_locations:
input:
schema:
type: object
properties:
nct_id:
type: string
description: "A single NCT ID (e.g., NCT04280705)"
required: ["nct_id"]
trial_outcomes:
input:
schema:
type: object
properties:
nct_id:
type: string
description: "A single NCT ID (e.g., NCT04280705)"
required: ["nct_id"]
trial_references:
input:
schema:
type: object
properties:
nct_id:
type: string
description: "A single NCT ID (e.g., NCT04280705)"
required: ["nct_id"]
article_details:
input:
schema:
type: object
properties:
pmid:
type: string
description: "A single PubMed ID (e.g., 34397683)"
required: ["pmid"]
variant_details:
input:
schema:
type: object
properties:
variant_id:
type: string
description: "A variant identifier (e.g., chr7:g.140453136A>T)"
required: ["variant_id"]
```
--------------------------------------------------------------------------------
/tests/tdd/openfda/test_adverse_events.py:
--------------------------------------------------------------------------------
```python
"""
Unit tests for OpenFDA adverse events integration.
"""
from unittest.mock import patch
import pytest
from biomcp.openfda.adverse_events import (
get_adverse_event,
search_adverse_events,
)
@pytest.mark.asyncio
async def test_search_adverse_events_by_drug():
"""Test searching adverse events by drug name."""
mock_response = {
"meta": {"results": {"total": 100}},
"results": [
{
"patient": {
"drug": [
{
"medicinalproduct": "IMATINIB",
"openfda": {
"brand_name": ["GLEEVEC"],
"generic_name": ["IMATINIB MESYLATE"],
},
}
],
"reaction": [
{"reactionmeddrapt": "NAUSEA"},
{"reactionmeddrapt": "FATIGUE"},
],
"patientonsetage": "45",
"patientsex": 2,
},
"serious": "1",
"seriousnesshospitalization": "1",
"receivedate": "20240115",
}
],
}
with patch(
"biomcp.openfda.adverse_events.make_openfda_request"
) as mock_request:
mock_request.return_value = (mock_response, None)
result = await search_adverse_events(drug="imatinib", limit=10)
# Verify the request was made correctly
mock_request.assert_called_once()
call_args = mock_request.call_args
assert "imatinib" in call_args[0][1]["search"].lower()
# Check the output contains expected information
assert "FDA Adverse Event Reports" in result
assert "imatinib" in result.lower()
assert "NAUSEA" in result
assert "FATIGUE" in result
assert "100 reports" in result
@pytest.mark.asyncio
async def test_search_adverse_events_by_reaction():
"""Test searching adverse events by reaction."""
mock_response = {
"meta": {"results": {"total": 50}},
"results": [
{
"patient": {
"drug": [{"medicinalproduct": "ASPIRIN"}],
"reaction": [{"reactionmeddrapt": "HEADACHE"}],
},
"serious": "0",
"receivedate": "20240201",
}
],
}
with patch(
"biomcp.openfda.adverse_events.make_openfda_request"
) as mock_request:
mock_request.return_value = (mock_response, None)
result = await search_adverse_events(reaction="headache", limit=10)
# Verify the request
mock_request.assert_called_once()
call_args = mock_request.call_args
assert "headache" in call_args[0][1]["search"].lower()
# Check output
assert "HEADACHE" in result
assert "50 reports" in result
@pytest.mark.asyncio
async def test_search_adverse_events_no_params():
"""Test that searching without parameters returns helpful message."""
result = await search_adverse_events()
assert "Please specify" in result
assert "drug name or reaction" in result
assert "Examples:" in result
@pytest.mark.asyncio
async def test_search_adverse_events_no_results():
"""Test handling when no results are found."""
with patch(
"biomcp.openfda.adverse_events.make_openfda_request"
) as mock_request:
mock_request.return_value = ({"results": []}, None)
result = await search_adverse_events(drug="nonexistentdrug")
assert "No adverse event reports found" in result
assert "nonexistentdrug" in result
@pytest.mark.asyncio
async def test_search_adverse_events_error():
"""Test error handling in adverse event search."""
with patch(
"biomcp.openfda.adverse_events.make_openfda_request"
) as mock_request:
mock_request.return_value = (None, "API rate limit exceeded")
result = await search_adverse_events(drug="aspirin")
assert "Error searching adverse events" in result
assert "API rate limit exceeded" in result
@pytest.mark.asyncio
async def test_get_adverse_event_detail():
"""Test getting detailed adverse event report."""
mock_response = {
"results": [
{
"safetyreportid": "12345678",
"patient": {
"patientonsetage": "55",
"patientsex": 1,
"patientweight": "75",
"drug": [
{
"medicinalproduct": "DRUG A",
"drugindication": "HYPERTENSION",
"drugdosagetext": "100mg daily",
"drugadministrationroute": "048",
"actiondrug": 4,
}
],
"reaction": [
{"reactionmeddrapt": "DIZZINESS", "reactionoutcome": 1}
],
},
"serious": "1",
"seriousnesshospitalization": "1",
"receivedate": "20240115",
"reporttype": 1,
}
]
}
with patch(
"biomcp.openfda.adverse_events.make_openfda_request"
) as mock_request:
mock_request.return_value = (mock_response, None)
result = await get_adverse_event("12345678")
# Verify request
mock_request.assert_called_once()
call_args = mock_request.call_args
assert "12345678" in call_args[0][1]["search"]
# Check detailed output
assert "12345678" in result
assert "Patient Information" in result
assert "55 years" in result
assert "Male" in result
assert "75 kg" in result
assert "DRUG A" in result
assert "HYPERTENSION" in result
assert "100mg daily" in result
assert "DIZZINESS" in result
assert "Recovered/Resolved" in result
@pytest.mark.asyncio
async def test_get_adverse_event_not_found():
"""Test handling when adverse event report is not found."""
with patch(
"biomcp.openfda.adverse_events.make_openfda_request"
) as mock_request:
mock_request.return_value = ({"results": []}, None)
result = await get_adverse_event("NOTFOUND123")
assert "NOTFOUND123" in result
assert "not found" in result
```
--------------------------------------------------------------------------------
/src/biomcp/openfda/adverse_events_helpers.py:
--------------------------------------------------------------------------------
```python
"""
Helper functions for OpenFDA adverse events to reduce complexity.
"""
from collections import Counter
from typing import Any
from .utils import (
extract_drug_names,
extract_reactions,
format_count,
format_drug_list,
)
def format_search_summary(
drug: str | None, reaction: str | None, serious: bool | None, total: int
) -> list[str]:
"""Format the search summary section."""
output = []
# Add search criteria
search_desc = []
if drug:
search_desc.append(f"**Drug**: {drug}")
if reaction:
search_desc.append(f"**Reaction**: {reaction}")
if serious is not None:
search_desc.append(f"**Serious Events**: {'Yes' if serious else 'No'}")
if search_desc:
output.append(" | ".join(search_desc))
output.append(
f"**Total Reports Found**: {format_count(total, 'report')}\n"
)
return output
def format_top_reactions(results: list[dict[str, Any]]) -> list[str]:
"""Format top reported reactions from search results."""
output = []
all_reactions = []
for result in results:
all_reactions.extend(extract_reactions(result))
if all_reactions:
reaction_counts = Counter(all_reactions)
top_reactions = reaction_counts.most_common(10)
output.append("### Top Reported Reactions:")
for rxn, count in top_reactions:
percentage = (count / len(results)) * 100
output.append(f"- **{rxn}**: {count} reports ({percentage:.1f}%)")
output.append("")
return output
def format_report_summary(
result: dict[str, Any], report_num: int
) -> list[str]:
"""Format a single report summary."""
output = [f"#### Report {report_num}"]
# Extract key information
drugs = extract_drug_names(result)
reactions = extract_reactions(result)
# Patient info
patient = result.get("patient", {})
age = patient.get("patientonsetage")
sex_map = {0: "Unknown", 1: "Male", 2: "Female"}
sex = sex_map.get(patient.get("patientsex"), "Unknown")
# Serious outcomes
serious_flag = result.get("serious", "0")
outcomes = []
for code in [
"seriousnessdeath",
"seriousnesslifethreatening",
"seriousnesshospitalization",
"seriousnessdisabling",
]:
if result.get(code) == "1":
outcomes.append(code.replace("seriousness", "").title())
# Format output
output.append(f"- **Drugs**: {format_drug_list(drugs)}")
output.append(f"- **Reactions**: {', '.join(reactions[:5])}")
if age:
output.append(f"- **Patient**: {age} years, {sex}")
if serious_flag == "1" and outcomes:
output.append(f"- **Serious Outcome**: {', '.join(outcomes)}")
# Dates
receive_date = result.get("receivedate", "")
if receive_date:
output.append(
f"- **Report Date**: {receive_date[:4]}-{receive_date[4:6]}-{receive_date[6:]}"
)
output.append("")
return output
def format_drug_details(drugs: list[dict[str, Any]]) -> list[str]:
"""Format drug information details."""
from .utils import clean_text
output = ["### Drug Information"]
for i, drug in enumerate(drugs, 1):
output.append(
f"\n#### Drug {i}: {drug.get('medicinalproduct', 'Unknown')}"
)
if "drugindication" in drug:
output.append(f"- **Indication**: {drug['drugindication']}")
if "drugdosagetext" in drug:
dosage = clean_text(drug["drugdosagetext"])
output.append(f"- **Dosage**: {dosage}")
if "drugadministrationroute" in drug:
output.append(f"- **Route**: {drug['drugadministrationroute']}")
# Drug action taken
action_map = {
1: "Drug withdrawn",
2: "Dose reduced",
3: "Dose increased",
4: "Dose not changed",
5: "Unknown",
6: "Not applicable",
}
action_code = drug.get("actiondrug")
action = (
action_map.get(action_code, "Unknown")
if action_code is not None
else "Unknown"
)
output.append(f"- **Action Taken**: {action}")
output.append("")
return output
def format_reaction_details(reactions: list[dict[str, Any]]) -> list[str]:
"""Format adverse reaction details."""
output = ["### Adverse Reactions"]
for reaction in reactions:
rxn_name = reaction.get("reactionmeddrapt", "Unknown")
outcome_map = {
1: "Recovered/Resolved",
2: "Recovering/Resolving",
3: "Not recovered/Not resolved",
4: "Recovered/Resolved with sequelae",
5: "Fatal",
6: "Unknown",
}
outcome_code = reaction.get("reactionoutcome")
outcome = (
outcome_map.get(outcome_code, "Unknown")
if outcome_code is not None
else "Unknown"
)
output.append(f"- **{rxn_name}**: {outcome}")
output.append("")
return output
def format_report_metadata(result: dict[str, Any]) -> list[str]:
"""Format report metadata information."""
output = ["### Report Information"]
receive_date = result.get("receivedate", "")
if receive_date:
formatted_date = (
f"{receive_date[:4]}-{receive_date[4:6]}-{receive_date[6:]}"
)
output.append(f"- **Report Date**: {formatted_date}")
report_type_map = {
1: "Spontaneous",
2: "Report from study",
3: "Other",
4: "Not available to sender",
}
report_type_code = result.get("reporttype")
report_type = (
report_type_map.get(report_type_code, "Unknown")
if report_type_code is not None
else "Unknown"
)
output.append(f"- **Report Type**: {report_type}")
# Seriousness
if result.get("serious") == "1":
outcomes = []
if result.get("seriousnessdeath") == "1":
outcomes.append("Death")
if result.get("seriousnesslifethreatening") == "1":
outcomes.append("Life-threatening")
if result.get("seriousnesshospitalization") == "1":
outcomes.append("Hospitalization")
if result.get("seriousnessdisabling") == "1":
outcomes.append("Disability")
if result.get("seriousnesscongenitalanomali") == "1":
outcomes.append("Congenital anomaly")
if result.get("seriousnessother") == "1":
outcomes.append("Other serious")
if outcomes:
output.append(f"- **Serious Outcomes**: {', '.join(outcomes)}")
return output
```
--------------------------------------------------------------------------------
/docs/blog/researcher-persona-resource.md:
--------------------------------------------------------------------------------
```markdown
# BioMCP Deep Researcher Persona
With the release of BioMCP v0.1.2, users can now access a specialized
Researcher Persona that transforms Claude into a rigorous biomedical research
assistant using BioMCP's built-in sequential thinking capabilities.
This persona is designed to leverage BioMCP's suite of tools for accessing
PubMed articles, ClinicalTrials.gov data, and genomic variant information,
while incorporating Claude's web search capabilities to produce comprehensive,
thoroughly-researched reports.
## How to Use the Researcher Persona
Getting started with the BioMCP Researcher Persona is straightforward:
1. Configure Claude Desktop by updating your configuration JSON with:
```json
{
"mcpServers": {
"biomcp": {
"command": "uv",
"args": ["run", "--with", "biomcp-python>=0.1.2", "biomcp", "run"]
}
}
}
```
2. Restart Claude Desktop (the `>=0.1.2` ensures the latest version is used, which includes the built-in think tool)
3. Select the "Researcher" persona from the dropdown menu

4. Ask your biomedical research question
The Researcher Persona will then work through its 10-step process, keeping you
updated on its progress and ultimately producing a comprehensive research
brief.
## Video Demonstration
Below is a video demonstrating the Researcher Persona in action:
[](https://youtu.be/tBGG53O-7Hg)
## Sequential Thinking: A Rigorous 10-Step Research Process
What makes the Researcher Persona so powerful is its integration with BioMCP's
built-in 'think' tool, which guides the AI through a comprehensive
10-step research methodology:
1. **Topic Scoping & Domain Framework**: Creating a comprehensive structure to
ensure complete coverage
2. **Initial Information Gathering**: Establishing baseline terminology and
recent developments
3. **Focused & Frontier Retrieval**: Filling knowledge gaps and identifying
cutting-edge developments
4. **Primary Trials Analysis**: Identifying and analyzing key clinical trials
5. **Primary Literature Analysis**: Identifying and analyzing pivotal
publications
6. **Initial Evidence Synthesis**: Creating a preliminary framework of findings
7. **Integrated Gap-Filling**: Addressing identified knowledge gaps
8. **Comprehensive Evidence Synthesis**: Creating a final integrated framework
with quality assessment
9. **Self-Critique and Verification**: Rigorously assessing the quality and
comprehensiveness
10. **Research Brief Creation**: Producing the final deliverable with all
required elements
[](https://github.com/genomoncology/biomcp/blob/main/src/biomcp/resources/researcher.md)
This structured approach ensures that no important aspects of the research
question are overlooked and that the final output is comprehensive,
well-organized, and backed by current evidence.
## Put to the Test: Emerging Treatment Strategies for Head and Neck Cancer
To evaluate the effectiveness of the Researcher Persona, we conducted a
head-to-head comparison with other AI research approaches. We asked the same
question to five different systems: "What are the emerging treatment strategies
for head and neck cancer?"
The results were impressive. The BioMCP-powered Researcher Persona, combined
with Claude's web search capabilities and the built-in think tool,
produced the highest-rated research brief among all approaches tested.
[](https://github.com/genomoncology/biomcp-examples#researcher-announcement)
The research brief produced by the BioMCP Researcher Persona stood out for
several reasons:
1. **Comprehensive domain coverage**: The report covered all relevant treatment
modalities (immunotherapy, targeted therapy, radiation techniques, surgery,
combination approaches)
2. **Structured evidence categorization**: Findings were clearly organized by
level of evidence (Established, Emerging, Experimental, Theoretical)
3. **Evidence quality assessment**: The brief included critical evaluation of
source quality and evidence strength
4. **Thorough citation**: All claims were backed by specific references to
scientific literature or clinical trials
5. **Self-critique**: The report included transparent limitations and
identified areas requiring further research
## Explore the Example and Evaluations
We've documented this comparison in detail in
the [biomcp-examples repository](https://github.com/genomoncology/biomcp-examples),
where you can find:
- The full research briefs produced by each approach
- Independent evaluations by three different AI judges (Claude 3.7, Gemini 2.5
Pro, and OpenAI o3)
- Detailed scoring against a rubric that prioritizes accuracy, clarity, and
comprehensiveness
- Analysis of strengths and weaknesses of each approach
The consensus among the judges placed the BioMCP-powered brief at the top,
highlighting its exceptional structure, evidence-based approach, and
comprehensive coverage.
## Beyond the Example: Wide-Ranging Applications
While our example focused on head and neck cancer treatments, the BioMCP
Researcher Persona can tackle a wide range of biomedical research questions:
- **Therapeutic comparisons**: "Compare the efficacy and safety profiles of JAK
inhibitors versus biologics for treating rheumatoid arthritis"
- **Disease mechanisms**: "What is the current understanding of gut microbiome
dysbiosis in inflammatory bowel disease?"
- **Biomarker investigations**: "What emerging biomarkers show promise for
early detection of pancreatic cancer?"
- **Treatment protocols**: "What are the latest guidelines for managing
anticoagulation in patients with atrial fibrillation and chronic kidney
disease?"
## Join the BioMCP Community
The Researcher Persona is just one example of how BioMCP is transforming
AI-assisted biomedical research. We invite you to:
1. Try the Researcher Persona with your own research questions
2. Contribute to
the [biomcp-examples repository](https://github.com/genomoncology/biomcp-examples)
with your experiments
3. Share your feedback and suggestions for future improvements
By combining specialized biomedical data access with structured research
methodologies, BioMCP is helping researchers produce more comprehensive,
accurate, and useful biomedical research briefs than ever before.
Have a complex biomedical research question? Give the BioMCP Researcher Persona
a try and experience the difference a structured, tool-powered approach can
make!
```
--------------------------------------------------------------------------------
/src/biomcp/variants/getter.py:
--------------------------------------------------------------------------------
```python
"""Getter module for retrieving variant details."""
import inspect
import json
import logging
from typing import Annotated
from .. import ensure_list, http_client, render
from ..constants import DEFAULT_ASSEMBLY, MYVARIANT_GET_URL
from ..oncokb_helper import get_oncokb_annotation_for_variant
from .external import ExternalVariantAggregator, format_enhanced_annotations
from .filters import filter_variants
from .links import inject_links
logger = logging.getLogger(__name__)
def _format_error_response(
error: http_client.RequestError, variant_id: str
) -> list[dict[str, str]]:
"""Format error response for consistent rendering.
Args:
error: The error object from the API request
variant_id: The variant identifier that was queried
Returns:
List containing error dict for markdown/json rendering
"""
if error.code == 404:
error_msg = (
f"Variant '{variant_id}' not found in MyVariant.info database. "
"Please verify the variant identifier (e.g., rs113488022 or "
"chr7:g.140453136A>T)."
)
else:
error_msg = f"Error {error.code}: {error.message}"
return [{"error": error_msg}]
async def get_variant( # noqa: C901
variant_id: str,
output_json: bool = False,
include_external: bool = False,
assembly: str = DEFAULT_ASSEMBLY,
) -> str:
"""
Get variant details from MyVariant.info using the variant identifier.
The identifier can be a full HGVS-style string (e.g. "chr7:g.140453136A>T")
or an rsID (e.g. "rs113488022"). The API response is expected to include a
"hits" array; this function extracts the first hit.
Args:
variant_id: Variant identifier (HGVS or rsID)
output_json: Return JSON format if True, else Markdown
include_external: Include external annotations (TCGA, 1000 Genomes, cBioPortal)
assembly: Genome assembly (hg19 or hg38), defaults to hg19
Returns:
Formatted variant data as JSON or Markdown string
If output_json is True, the result is returned as a formatted JSON string;
otherwise, it is rendered as Markdown.
"""
response, error = await http_client.request_api(
url=f"{MYVARIANT_GET_URL}/{variant_id}",
request={"fields": "all", "assembly": assembly},
method="GET",
domain="myvariant",
)
# Handle errors gracefully with user-friendly messages
if error:
data_to_return = _format_error_response(error, variant_id)
# Skip all processing for error responses
if output_json:
return json.dumps(data_to_return, indent=2)
else:
return render.to_markdown(data_to_return)
data_to_return = ensure_list(response)
# Inject database links into the variant data
data_to_return = inject_links(data_to_return)
data_to_return = filter_variants(data_to_return)
# Collect OncoKB annotations separately for markdown appendage
oncokb_annotations: list[str] = []
# Add external annotations if requested
if include_external and data_to_return:
logger.info(
f"Adding external annotations for {len(data_to_return)} variants"
)
aggregator = ExternalVariantAggregator()
for _i, variant_data in enumerate(data_to_return):
logger.info(
f"Processing variant {_i}: keys={list(variant_data.keys())}"
)
# Get enhanced annotations
enhanced = await aggregator.get_enhanced_annotations(
variant_id,
include_tcga=True,
include_1000g=True,
include_cbioportal=True,
variant_data=variant_data,
)
# Add formatted annotations to the variant data
formatted = format_enhanced_annotations(enhanced)
logger.info(
f"Formatted external annotations: {formatted['external_annotations'].keys()}"
)
variant_data.update(formatted["external_annotations"])
# Get formatted OncoKB annotation separately
gene_aa = aggregator._extract_gene_aa_change(variant_data)
# Handle case where method might be mocked as async in tests
if inspect.iscoroutine(gene_aa) or inspect.isawaitable(gene_aa):
gene_aa = await gene_aa
if gene_aa:
parts = gene_aa.split(" ", 1)
if len(parts) == 2:
gene, variant = parts
oncokb_formatted = await get_oncokb_annotation_for_variant(
gene, variant
)
if oncokb_formatted:
oncokb_annotations.append(oncokb_formatted)
if output_json:
return json.dumps(data_to_return, indent=2)
else:
# Render base markdown and append OncoKB annotations
base_markdown = render.to_markdown(data_to_return)
if oncokb_annotations:
# Append OncoKB annotations as separate markdown sections
return base_markdown + "\n" + "\n".join(oncokb_annotations)
return base_markdown
async def _variant_details(
call_benefit: Annotated[
str,
"Define and summarize why this function is being called and the intended benefit",
],
variant_id: str,
include_external: Annotated[
bool,
"Include annotations from external sources (TCGA, 1000 Genomes, cBioPortal)",
] = True,
assembly: Annotated[
str,
"Genome assembly (hg19 or hg38). Default: hg19",
] = DEFAULT_ASSEMBLY,
) -> str:
"""
Retrieves detailed information for a *single* genetic variant.
Parameters:
- call_benefit: Define and summarize why this function is being called and the intended benefit
- variant_id: A variant identifier ("chr7:g.140453136A>T")
- include_external: Include annotations from TCGA, 1000 Genomes, cBioPortal, and Mastermind
- assembly: Genome assembly (hg19 or hg38). Default: hg19
Process: Queries the MyVariant.info GET endpoint, optionally fetching
additional annotations from external databases
Output: A Markdown formatted string containing comprehensive
variant annotations (genomic context, frequencies,
predictions, clinical data, external annotations). Returns error if invalid.
Note: Use the variant_searcher to find the variant id first.
"""
return await get_variant(
variant_id,
output_json=False,
include_external=include_external,
assembly=assembly,
)
```
--------------------------------------------------------------------------------
/mkdocs.yml:
--------------------------------------------------------------------------------
```yaml
site_name: BioMCP
repo_url: https://github.com/genomoncology/biomcp
site_url: https://biomcp.org/
site_description: Biomedical Model Context Protocol Server
site_author: Ian Maurer
edit_uri: edit/main/docs/
repo_name: genomoncology/biomcp
copyright: Maintained by <a href="https://genomoncology.com">genomoncology</a>.
nav:
- Home: index.md
- Getting Started:
- Quick Start: getting-started/01-quickstart-cli.md
- Claude Desktop: getting-started/02-claude-desktop-integration.md
- API Keys: getting-started/03-authentication-and-api-keys.md
- FAQ: faq-condensed.md
- Troubleshooting: troubleshooting.md
- User Guide:
- Overview: concepts/01-what-is-biomcp.md
- Finding Articles: how-to-guides/01-find-articles-and-cbioportal-data.md
- Finding Trials: how-to-guides/02-find-trials-with-nci-and-biothings.md
- Analyzing Variants: how-to-guides/03-get-comprehensive-variant-annotations.md
- Predicting Effects: how-to-guides/04-predict-variant-effects-with-alphagenome.md
- Searching Organizations: how-to-guides/06-search-nci-organizations-and-interventions.md
- Research Workflows: workflows/all-workflows.md
- Examples:
- Pydantic AI Integration: tutorials/pydantic-ai-integration.md
- Remote Connection: tutorials/remote-connection.md
- BioThings Examples: tutorials/biothings-prompts.md
- NCI Examples: tutorials/nci-prompts.md
- AlphaGenome Tutorial: tutorials/claude-code-biomcp-alphagenome.md
- OpenFDA Examples: tutorials/openfda-prompts.md
- Concepts:
- Deep Researcher: concepts/02-the-deep-researcher-persona.md
- Sequential Thinking: concepts/03-sequential-thinking-with-the-think-tool.md
- Reference:
- Quick Reference: reference/quick-reference.md
- CLI Commands: user-guides/01-command-line-interface.md
- MCP Tools: user-guides/02-mcp-tools-reference.md
- API Documentation:
- API Overview: apis/overview.md
- Python SDK: apis/python-sdk.md
- Error Codes: apis/error-codes.md
- IDE Integration: user-guides/03-integrating-with-ides-and-clients.md
- Developer:
- Architecture:
- Overview: reference/quick-architecture.md
- Visual Diagrams: reference/visual-architecture.md
- Detailed Diagrams: reference/architecture-diagrams.md
- Data Sources:
- Overview: backend-services-reference/01-overview.md
- PubTator3/PubMed: backend-services-reference/06-pubtator3.md
- ClinicalTrials.gov: backend-services-reference/04-clinicaltrials-gov.md
- NCI CTS API: backend-services-reference/05-nci-cts-api.md
- BioThings Suite: backend-services-reference/02-biothings-suite.md
- cBioPortal: backend-services-reference/03-cbioportal.md
- AlphaGenome: backend-services-reference/07-alphagenome.md
- OpenFDA: tutorials/openfda-integration.md
- Development:
- Contributing: developer-guides/02-contributing-and-testing.md
- Deployment: developer-guides/01-server-deployment.md
- BigQuery Monitoring: how-to-guides/05-logging-and-monitoring-with-bigquery.md
- Technical Details:
- Transport Protocol: developer-guides/04-transport-protocol.md
- Error Handling: developer-guides/05-error-handling.md
- HTTP Client: developer-guides/06-http-client-and-caching.md
- Performance: developer-guides/07-performance-optimizations.md
- Third-Party APIs: developer-guides/03-third-party-endpoints.md
- Security:
- FDA Integration Security: FDA_SECURITY.md
- About:
- Blog:
- Clinical Trial Search: blog/ai-assisted-clinical-trial-search-analysis.md
- Researcher Persona: blog/researcher-persona-resource.md
- Project:
- Changelog: changelog.md
- Policies: policies.md
- GenomOncology: genomoncology.md
plugins:
- search:
lang: en
separator: '[\s\-\.]+'
- mkdocstrings:
handlers:
python:
paths: ["src/biomcp"]
# Note: sitemap plugin requires additional installation
# Uncomment after installing: pip install mkdocs-sitemap
# - sitemap:
# changefreq: weekly
# priority: 0.5
theme:
name: material
# custom_dir: overrides
favicon: assets/favicon.ico
logo: assets/icon.png
features:
- navigation.tabs
- navigation.tabs.sticky
- navigation.sections
- navigation.instant
- navigation.tracking
- navigation.top
- toc.follow
- search.suggest
- search.highlight
palette:
- media: "(prefers-color-scheme: light)"
scheme: default
primary: white
accent: deep orange
toggle:
icon: material/brightness-7
name: Switch to dark mode
- media: "(prefers-color-scheme: dark)"
scheme: slate
primary: black
accent: deep orange
toggle:
icon: material/brightness-4
name: Switch to light mode
icon:
repo: fontawesome/brands/github
extra:
social:
- icon: fontawesome/brands/github
link: https://github.com/genomoncology/biomcp
- icon: fontawesome/brands/python
link: https://pypi.org/project/biomcp-python
meta:
- property: og:type
content: website
- property: og:title
content: BioMCP - Biomedical Model Context Protocol Server
- property: og:description
content: AI-powered biomedical research tool integrating PubMed, ClinicalTrials.gov, and genomic databases
- property: og:image
content: https://biomcp.org/assets/icon.png
- property: og:url
content: https://biomcp.org/
- name: twitter:card
content: summary
- name: twitter:title
content: BioMCP - Biomedical Model Context Protocol
- name: twitter:description
content: AI-powered biomedical research tool for PubMed, clinical trials, and genomic data
- name: keywords
content: biomedical, MCP, AI, PubMed, clinical trials, genomics, bioinformatics, Claude Desktop
extra_css:
- stylesheets/extra.css
- stylesheets/announcement.css
# extra_javascript: (removed - no third-party dependencies)
markdown_extensions:
- toc:
permalink: true
- pymdownx.arithmatex:
generic: true
- admonition # Nice looking note/warning boxes
- pymdownx.details # Collapsible sections
- pymdownx.highlight: # Code highlighting
anchor_linenums: true
- pymdownx.inlinehilite
- pymdownx.snippets # Include content from other files
- pymdownx.superfences # Nested code blocks
- pymdownx.tabbed: # Tabbed content
alternate_style: true
```
--------------------------------------------------------------------------------
/docs/developer-guides/04-transport-protocol.md:
--------------------------------------------------------------------------------
```markdown
# Transport Protocol Guide
This guide explains BioMCP's transport protocol options, with a focus on the new Streamable HTTP transport that provides better scalability and reliability for production deployments.
## Overview
BioMCP supports multiple transport protocols to accommodate different deployment scenarios:
| Transport | Use Case | Endpoint | Protocol Version |
| ------------------- | -------------------------------------------- | -------- | ---------------- |
| **STDIO** | Local development, direct Claude integration | N/A | All |
| **Worker/SSE** | Legacy cloud deployments | `/sse` | Pre-2025 |
| **Streamable HTTP** | Modern cloud deployments | `/mcp` | 2025-03-26+ |
## Streamable HTTP Transport
### What is Streamable HTTP?
Streamable HTTP is the latest MCP transport protocol (specification version 2025-03-26) that provides:
- **Single endpoint** (`/mcp`) for all operations
- **Dynamic response modes**: JSON for quick operations, SSE for long-running tasks
- **Session management** via `session_id` query parameter
- **Better scalability**: No permanent connections required
- **Automatic reconnection** and session recovery
### Architecture
The Streamable HTTP transport follows this flow:
1. **MCP Client** sends POST request to `/mcp` endpoint
2. **BioMCP Server** processes the request
3. **Response Type** determined by operation:
- Quick operations return JSON response
- Long operations return SSE stream
4. **Session Management** maintains state via session_id parameter
### Implementation Details
BioMCP leverages FastMCP's native streamable HTTP support:
```python
# In core.py
mcp_app = FastMCP(
name="BioMCP",
stateless_http=True, # Enables streamable HTTP
)
```
The transport is automatically handled by FastMCP 1.12.3+, providing:
- Request routing
- Session management
- Response type negotiation
- Error handling
## Migration Guide
### From SSE to Streamable HTTP
If you're currently using the legacy SSE transport, migrate to streamable HTTP:
#### 1. Update Server Configuration
**Before (SSE/Worker mode):**
```bash
biomcp run --mode worker
```
**After (Streamable HTTP):**
```bash
biomcp run --mode streamable_http
```
#### 2. Update Client Configuration
**MCP Inspector:**
```bash
npx @modelcontextprotocol/inspector uv run --with . biomcp run --mode streamable_http
```
**Claude Desktop Configuration:**
```json
{
"mcpServers": {
"biomcp": {
"command": "docker",
"args": [
"run",
"-p",
"8000:8000",
"biomcp:latest",
"biomcp",
"run",
"--mode",
"streamable_http"
]
}
}
}
```
#### 3. Update Cloudflare Worker
The worker now supports both GET (legacy SSE) and POST (streamable HTTP) on the `/mcp` endpoint:
```javascript
// Automatically routes based on method
.get("/mcp", async (c) => {
// Legacy SSE transport
})
.post("/mcp", async (c) => {
// Streamable HTTP transport
})
```
### Backward Compatibility
All legacy endpoints remain functional:
- `/sse` - Server-sent events transport
- `/health` - Health check endpoint
- `/events` - Event streaming endpoint
## Configuration Options
### Server Modes
```bash
# Local development (STDIO)
biomcp run
# Legacy SSE transport
biomcp run --mode worker
# Modern streamable HTTP
biomcp run --mode streamable_http --host 0.0.0.0 --port 8000
```
### Environment Variables
| Variable | Description | Default |
| --------------- | ----------------------- | ------- |
| `MCP_TRANSPORT` | Override transport mode | None |
| `MCP_HOST` | Server bind address | 0.0.0.0 |
| `MCP_PORT` | Server port | 8000 |
## Session Management
Streamable HTTP uses session IDs to maintain state across requests:
```http
POST /mcp?session_id=abc123 HTTP/1.1
Content-Type: application/json
{
"jsonrpc": "2.0",
"method": "initialize",
"params": {...}
}
```
Sessions are:
- Created automatically on first request
- Maintained in server memory
- Cleaned up after inactivity timeout
- Isolated between different clients
## Performance Considerations
### Response Mode Selection
The server automatically selects the optimal response mode:
| Operation Type | Response Mode | Example |
| ----------------- | ------------- | ---------------------- |
| Quick queries | JSON | `search(limit=10)` |
| Large results | SSE | `search(limit=1000)` |
| Real-time updates | SSE | Thinking tool progress |
### Optimization Tips
1. **Use session IDs** for related requests to avoid re-initialization
2. **Batch operations** when possible to reduce round trips
3. **Set appropriate timeouts** for long-running operations
4. **Monitor response times** to identify bottlenecks
## Troubleshooting
### Common Issues
#### 1. Connection Refused
```
Error: connect ECONNREFUSED 127.0.0.1:8000
```
**Solution**: Ensure server is running with `--host 0.0.0.0` for Docker deployments.
#### 2. Session Not Found
```
Error: Session 'xyz' not found
```
**Solution**: Session may have expired. Omit session_id to create new session.
#### 3. Timeout on Large Results
```
Error: Request timeout after 30s
```
**Solution**: Increase client timeout or reduce result size with `limit` parameter.
### Debug Mode
Enable debug logging to troubleshoot transport issues:
```bash
LOG_LEVEL=debug biomcp run --mode streamable_http
```
## Security Considerations
### Authentication
BioMCP does not implement authentication at the transport layer. Secure your deployment using:
- **API Gateway**: AWS API Gateway, Kong, etc.
- **Reverse Proxy**: Nginx with auth modules
- **Cloud IAM**: Platform-specific access controls
### Rate Limiting
Implement rate limiting at the infrastructure layer:
```nginx
# Nginx example
limit_req_zone $binary_remote_addr zone=mcp:10m rate=10r/s;
location /mcp {
limit_req zone=mcp burst=20;
proxy_pass http://biomcp:8000;
}
```
### CORS Configuration
For browser-based clients, configure CORS headers:
```python
# Handled automatically by FastMCP when stateless_http=True
```
## Monitoring
### Health Checks
```bash
# Check server health
curl http://localhost:8000/health
# Response
{"status": "ok", "transport": "streamable_http"}
```
### Metrics
Monitor these key metrics:
- Request rate on `/mcp` endpoint
- Response time percentiles (p50, p95, p99)
- Session count and duration
- Error rate by error type
## Next Steps
- Review [MCP Specification](https://spec.modelcontextprotocol.io) for protocol details
For questions or issues, please visit our [GitHub repository](https://github.com/genomoncology/biomcp).
```
--------------------------------------------------------------------------------
/tests/tdd/test_europe_pmc_fetch.py:
--------------------------------------------------------------------------------
```python
"""Tests for Europe PMC article fetching via DOI."""
import json
from unittest.mock import Mock, patch
import pytest
from biomcp.articles.fetch import _article_details, is_doi, is_pmid
from biomcp.articles.preprints import fetch_europe_pmc_article
class TestDOIDetection:
"""Test DOI and PMID detection functions."""
def test_valid_dois(self):
"""Test that valid DOIs are correctly identified."""
valid_dois = [
"10.1101/2024.01.20.23288905",
"10.1038/nature12373",
"10.1016/j.cell.2023.05.001",
"10.1126/science.abc1234",
]
for doi in valid_dois:
assert (
is_doi(doi) is True
), f"Expected {doi} to be identified as DOI"
assert (
is_pmid(doi) is False
), f"Expected {doi} NOT to be identified as PMID"
def test_valid_pmids(self):
"""Test that valid PMIDs are correctly identified."""
valid_pmids = [
"35271234",
"12345678",
"1",
"999999999",
]
for pmid in valid_pmids:
assert (
is_pmid(pmid) is True
), f"Expected {pmid} to be identified as PMID"
assert (
is_doi(pmid) is False
), f"Expected {pmid} NOT to be identified as DOI"
def test_invalid_identifiers(self):
"""Test that invalid identifiers are rejected by both functions."""
invalid_ids = [
"PMC11193658", # PMC ID
"abc123", # Random string
"10.1101", # Incomplete DOI
"nature12373", # DOI without prefix
"", # Empty string
]
for identifier in invalid_ids:
assert (
is_doi(identifier) is False
), f"Expected {identifier} NOT to be identified as DOI"
assert (
is_pmid(identifier) is False
), f"Expected {identifier} NOT to be identified as PMID"
class TestEuropePMCFetch:
"""Test Europe PMC article fetching."""
@pytest.mark.asyncio
async def test_fetch_europe_pmc_article_success(self):
"""Test successful fetch from Europe PMC."""
# Mock the response
mock_response = Mock()
mock_response.hitCount = 1
mock_response.results = [
Mock(
id="PPR790987",
source="PPR",
pmid=None,
pmcid=None,
doi="10.1101/2024.01.20.23288905",
title="Test Article Title",
authorString="Author A, Author B, Author C",
journalTitle=None,
pubYear="2024",
firstPublicationDate="2024-01-23",
abstractText="This is the abstract text.",
)
]
with patch(
"biomcp.articles.preprints.http_client.request_api"
) as mock_request:
mock_request.return_value = (mock_response, None)
result = await fetch_europe_pmc_article(
"10.1101/2024.01.20.23288905", output_json=True
)
data = json.loads(result)
assert len(data) == 1
article = data[0]
assert article["doi"] == "10.1101/2024.01.20.23288905"
assert article["title"] == "Test Article Title"
assert article["journal"] == "Preprint Server (preprint)"
assert article["date"] == "2024-01-23"
assert article["authors"] == ["Author A", "Author B", "Author C"]
assert article["abstract"] == "This is the abstract text."
assert article["source"] == "Europe PMC"
assert article["pmid"] is None
assert "europepmc.org" in article["pmc_url"]
@pytest.mark.asyncio
async def test_fetch_europe_pmc_article_not_found(self):
"""Test fetch when article is not found in Europe PMC."""
mock_response = Mock()
mock_response.hitCount = 0
mock_response.results = []
with patch(
"biomcp.articles.preprints.http_client.request_api"
) as mock_request:
mock_request.return_value = (mock_response, None)
result = await fetch_europe_pmc_article(
"10.1101/invalid.doi", output_json=True
)
data = json.loads(result)
assert len(data) == 1
assert data[0]["error"] == "Article not found in Europe PMC"
@pytest.mark.asyncio
async def test_fetch_europe_pmc_article_error(self):
"""Test fetch when Europe PMC API returns an error."""
mock_error = Mock()
mock_error.code = 500
mock_error.message = "Internal Server Error"
with patch(
"biomcp.articles.preprints.http_client.request_api"
) as mock_request:
mock_request.return_value = (None, mock_error)
result = await fetch_europe_pmc_article(
"10.1101/2024.01.20.23288905", output_json=True
)
data = json.loads(result)
assert len(data) == 1
assert data[0]["error"] == "Error 500: Internal Server Error"
class TestArticleDetailsRouting:
"""Test that _article_details correctly routes DOIs to Europe PMC."""
@pytest.mark.asyncio
async def test_doi_routes_to_europe_pmc(self):
"""Test that DOIs are routed to fetch_europe_pmc_article."""
test_doi = "10.1101/2024.01.20.23288905"
with patch(
"biomcp.articles.preprints.fetch_europe_pmc_article"
) as mock_europe_pmc:
mock_europe_pmc.return_value = "Europe PMC result"
result = await _article_details("Test", test_doi)
mock_europe_pmc.assert_called_once_with(test_doi, output_json=True)
assert result == "Europe PMC result"
@pytest.mark.asyncio
async def test_pmid_routes_to_pubtator(self):
"""Test that PMIDs are routed to fetch_articles."""
test_pmid = "35271234"
with patch(
"biomcp.articles.fetch.fetch_articles"
) as mock_fetch_articles:
mock_fetch_articles.return_value = "PubTator result"
result = await _article_details("Test", test_pmid)
mock_fetch_articles.assert_called_once_with(
[35271234], full=True, output_json=True
)
assert result == "PubTator result"
@pytest.mark.asyncio
async def test_invalid_identifier_returns_error(self):
"""Test that invalid identifiers return an error."""
invalid_id = "PMC12345"
result = await _article_details("Test", invalid_id)
data = json.loads(result)
assert len(data) == 1
assert "Invalid identifier format" in data[0]["error"]
assert "PMC12345" in data[0]["error"]
```
--------------------------------------------------------------------------------
/tests/tdd/variants/test_getter.py:
--------------------------------------------------------------------------------
```python
"""Tests for variant getter module."""
from unittest.mock import AsyncMock, Mock, patch
import pytest
from biomcp.constants import DEFAULT_ASSEMBLY
from biomcp.variants import getter
class TestGetVariant:
"""Test the get_variant function."""
@pytest.mark.asyncio
async def test_get_variant_default_assembly(self):
"""Test that get_variant defaults to hg19 assembly."""
mock_response = {
"_id": "rs113488022",
"dbsnp": {"rsid": "rs113488022"},
}
with patch("biomcp.http_client.request_api") as mock_request:
mock_request.return_value = (mock_response, None)
await getter.get_variant("rs113488022")
# Verify assembly parameter was passed with default value
call_args = mock_request.call_args
assert call_args[1]["request"]["assembly"] == "hg19"
@pytest.mark.asyncio
async def test_get_variant_hg38_assembly(self):
"""Test that get_variant accepts hg38 assembly parameter."""
mock_response = {
"_id": "rs113488022",
"dbsnp": {"rsid": "rs113488022"},
}
with patch("biomcp.http_client.request_api") as mock_request:
mock_request.return_value = (mock_response, None)
await getter.get_variant("rs113488022", assembly="hg38")
# Verify assembly parameter was passed correctly
call_args = mock_request.call_args
assert call_args[1]["request"]["assembly"] == "hg38"
@pytest.mark.asyncio
async def test_get_variant_hg19_assembly(self):
"""Test that get_variant accepts hg19 assembly parameter explicitly."""
mock_response = {
"_id": "rs113488022",
"dbsnp": {"rsid": "rs113488022"},
}
with patch("biomcp.http_client.request_api") as mock_request:
mock_request.return_value = (mock_response, None)
await getter.get_variant("rs113488022", assembly="hg19")
# Verify assembly parameter was passed correctly
call_args = mock_request.call_args
assert call_args[1]["request"]["assembly"] == "hg19"
@pytest.mark.asyncio
async def test_get_variant_includes_all_fields(self):
"""Test that request includes all required fields."""
mock_response = {"_id": "rs113488022"}
with patch("biomcp.http_client.request_api") as mock_request:
mock_request.return_value = (mock_response, None)
await getter.get_variant("rs113488022", assembly="hg38")
# Verify both fields and assembly are in request
call_args = mock_request.call_args
request_params = call_args[1]["request"]
assert "fields" in request_params
assert request_params["fields"] == "all"
assert "assembly" in request_params
assert request_params["assembly"] == "hg38"
@pytest.mark.asyncio
async def test_get_variant_with_external_annotations(self):
"""Test that assembly parameter works with external annotations."""
from biomcp.variants.external import EnhancedVariantAnnotation
mock_response = {
"_id": "rs113488022",
"dbsnp": {"rsid": "rs113488022"},
"dbnsfp": {"genename": "BRAF"},
}
with (
patch("biomcp.http_client.request_api") as mock_request,
patch(
"biomcp.variants.getter.ExternalVariantAggregator"
) as mock_aggregator,
):
mock_request.return_value = (mock_response, None)
# Mock the aggregator with proper EnhancedVariantAnnotation
mock_enhanced = EnhancedVariantAnnotation(
variant_id="rs113488022",
tcga=None,
thousand_genomes=None,
cbioportal=None,
error_sources=[],
)
mock_agg_instance = AsyncMock()
mock_agg_instance.get_enhanced_annotations = AsyncMock(
return_value=mock_enhanced
)
# _extract_gene_aa_change is a regular (non-async) method
mock_agg_instance._extract_gene_aa_change = Mock(
return_value="BRAF V600E"
)
mock_aggregator.return_value = mock_agg_instance
await getter.get_variant(
"rs113488022",
assembly="hg38",
include_external=True,
)
# Verify assembly was still passed correctly
call_args = mock_request.call_args
assert call_args[1]["request"]["assembly"] == "hg38"
class TestVariantDetailsMCPTool:
"""Test the _variant_details MCP tool."""
@pytest.mark.asyncio
async def test_variant_details_default_assembly(self):
"""Test that _variant_details defaults to hg19 assembly."""
with patch("biomcp.variants.getter.get_variant") as mock_get:
mock_get.return_value = "Variant details"
await getter._variant_details(
call_benefit="Testing default assembly",
variant_id="rs113488022",
)
# Verify get_variant was called with default assembly
mock_get.assert_called_once_with(
"rs113488022",
output_json=False,
include_external=True,
assembly=DEFAULT_ASSEMBLY,
)
@pytest.mark.asyncio
async def test_variant_details_custom_assembly(self):
"""Test that _variant_details accepts custom assembly parameter."""
with patch("biomcp.variants.getter.get_variant") as mock_get:
mock_get.return_value = "Variant details"
await getter._variant_details(
call_benefit="Testing hg38 assembly",
variant_id="rs113488022",
assembly="hg38",
)
# Verify get_variant was called with hg38
mock_get.assert_called_once_with(
"rs113488022",
output_json=False,
include_external=True,
assembly="hg38",
)
@pytest.mark.asyncio
async def test_variant_details_with_all_params(self):
"""Test that all parameters are passed through correctly."""
with patch("biomcp.variants.getter.get_variant") as mock_get:
mock_get.return_value = "Variant details"
await getter._variant_details(
call_benefit="Testing all parameters",
variant_id="chr7:g.140453136A>T",
include_external=False,
assembly="hg19",
)
# Verify all params were passed
mock_get.assert_called_once_with(
"chr7:g.140453136A>T",
output_json=False,
include_external=False,
assembly="hg19",
)
```
--------------------------------------------------------------------------------
/src/biomcp/workers/worker_entry.js:
--------------------------------------------------------------------------------
```javascript
/**
* BioMCP Worker – Auth‑less version (rev 1.8)
*
* Fix: Added improved error handling and increased timeouts for list requests
*/
// Server URL will be configured from environment variables
let REMOTE_MCP_SERVER_URL = "http://localhost:8000"; // Default fallback
const DEBUG = true;
const log = (m) => DEBUG && console.log("[DEBUG]", m);
const CORS = {
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "GET, POST, OPTIONS",
"Access-Control-Allow-Headers": "*",
"Access-Control-Max-Age": "86400",
};
const json = (o, s = 200) =>
new Response(JSON.stringify(o, null, 2), {
status: s,
headers: { "Content-Type": "application/json", ...CORS },
});
let forwardPath = "/messages"; // for proxying JSON‑RPC POSTS (no query)
let resourceEndpoint = null; // full string we echo back (/messages/?sid=…)
// Track active SSE connections to avoid duplicate connections
const activeConnections = new Map();
export default {
async fetch(req, env, ctx) {
// Use environment variable if available, otherwise use the default
REMOTE_MCP_SERVER_URL = env.REMOTE_MCP_SERVER_URL || REMOTE_MCP_SERVER_URL;
const url = new URL(req.url);
log(`${req.method} ${url.pathname}${url.search}`);
if (req.method === "OPTIONS")
return new Response(null, { status: 204, headers: CORS });
if (url.pathname === "/status" || url.pathname === "/debug")
return json({
worker: "BioMCP-authless",
remote: REMOTE_MCP_SERVER_URL,
forwardPath,
resourceEndpoint,
});
if (url.pathname === "/sse" || url.pathname === "/events")
return serveSSE(req, ctx);
if (req.method === "POST") {
const sid = url.searchParams.get("session_id");
if (!sid) return new Response("Missing session_id", { status: 400 });
return proxyPost(req, forwardPath, sid);
}
return new Response("Not found", { status: 404 });
},
};
async function proxyPost(req, path, sid) {
const body = await req.text();
const target = `${REMOTE_MCP_SERVER_URL}${path}?session_id=${encodeURIComponent(
sid,
)}`;
try {
// Parse the request to check if it's a list request that might need a longer timeout
let jsonBody;
try {
jsonBody = JSON.parse(body);
} catch (e) {
// Not valid JSON, proceed with normal request
jsonBody = {};
}
// Set a longer timeout for list requests that tend to time out
const timeout =
jsonBody.method &&
(jsonBody.method === "tools/list" || jsonBody.method === "resources/list")
? 30000
: 10000;
// Use AbortController to implement timeout
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), timeout);
log(`Proxying ${jsonBody.method || "request"} with timeout ${timeout}ms`);
const resp = await fetch(target, {
method: "POST",
headers: { "Content-Type": "application/json" },
body,
signal: controller.signal,
});
clearTimeout(timeoutId);
// If it's a list request, cache the response for future use
if (
jsonBody.method &&
(jsonBody.method === "tools/list" || jsonBody.method === "resources/list")
) {
log(`Received response for ${jsonBody.method}`);
}
return new Response(await resp.text(), {
status: resp.status,
headers: { "Content-Type": "application/json", ...CORS },
});
} catch (error) {
log(`POST error: ${error.message}`);
// For timeout errors, provide a default empty response for list requests
if (error.name === "AbortError") {
try {
const jsonBody = JSON.parse(body);
if (jsonBody.method === "tools/list") {
log("Returning empty tools list due to timeout");
return new Response(
JSON.stringify({
jsonrpc: "2.0",
id: jsonBody.id,
result: { tools: [] },
}),
{
status: 200,
headers: { "Content-Type": "application/json", ...CORS },
},
);
} else if (jsonBody.method === "resources/list") {
log("Returning empty resources list due to timeout");
return new Response(
JSON.stringify({
jsonrpc: "2.0",
id: jsonBody.id,
result: { resources: [] },
}),
{
status: 200,
headers: { "Content-Type": "application/json", ...CORS },
},
);
}
} catch (e) {
// If parsing fails, fall through to default error response
}
}
return new Response(JSON.stringify({ error: error.message }), {
status: 502,
headers: { "Content-Type": "application/json", ...CORS },
});
}
}
function serveSSE(clientReq, ctx) {
const enc = new TextEncoder();
let keepalive;
const upstreamCtl = new AbortController();
const stream = new ReadableStream({
async start(ctrl) {
ctrl.enqueue(enc.encode("event: ready\ndata: {}\n\n"));
clientReq.signal.addEventListener("abort", () => {
clearInterval(keepalive);
upstreamCtl.abort();
ctrl.close();
});
try {
const u = await fetch(`${REMOTE_MCP_SERVER_URL}/sse`, {
headers: { Accept: "text/event-stream" },
signal: upstreamCtl.signal,
});
if (!u.ok || !u.body) throw new Error(`Upstream SSE ${u.status}`);
const r = u.body.getReader();
while (true) {
const { value, done } = await r.read();
if (done) break;
if (value) {
const text = new TextDecoder().decode(value);
// capture first endpoint once
if (!resourceEndpoint) {
const m = text.match(
/data:\s*(\/messages\/\?session_id=[A-Za-z0-9_-]+)/,
);
if (m) {
resourceEndpoint = m[1];
forwardPath = resourceEndpoint.split("?")[0];
log(`Captured endpoint ${resourceEndpoint}`);
ctrl.enqueue(
enc.encode(`event: resource\ndata: ${resourceEndpoint}\n\n`),
);
}
}
ctrl.enqueue(value);
}
}
} catch (e) {
if (e.name !== "AbortError") {
log(`SSE error: ${e.message}`);
ctrl.enqueue(enc.encode(`event: error\ndata: ${e.message}\n\n`));
}
}
// Reduce keepalive interval to 5 seconds to prevent timeouts
keepalive = setInterval(() => {
try {
ctrl.enqueue(enc.encode(":keepalive\n\n"));
} catch (_) {
clearInterval(keepalive);
}
}, 5000);
},
});
return new Response(stream, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
...CORS,
},
});
}
```
--------------------------------------------------------------------------------
/tests/tdd/test_drug_approvals.py:
--------------------------------------------------------------------------------
```python
"""Tests for FDA drug approvals module."""
import json
from pathlib import Path
from unittest.mock import AsyncMock, patch
import pytest
from biomcp.openfda.drug_approvals import (
get_drug_approval,
search_drug_approvals,
)
# Load mock data
MOCK_DIR = Path(__file__).parent.parent / "data" / "openfda"
MOCK_APPROVALS_SEARCH = json.loads(
(MOCK_DIR / "drugsfda_search.json").read_text()
)
MOCK_APPROVAL_DETAIL = json.loads(
(MOCK_DIR / "drugsfda_detail.json").read_text()
)
class TestDrugApprovals:
"""Test drug approvals functionality."""
@pytest.mark.asyncio
async def test_search_drug_approvals_success(self):
"""Test successful drug approval search."""
with patch(
"biomcp.openfda.drug_approvals.make_openfda_request",
new_callable=AsyncMock,
) as mock_request:
mock_request.return_value = (MOCK_APPROVALS_SEARCH, None)
result = await search_drug_approvals(
drug="pembrolizumab",
limit=10,
)
assert "FDA Drug Approval Records" in result
assert "pembrolizumab" in result.lower()
assert "Application" in result
assert "BLA125514" in result
mock_request.assert_called_once()
@pytest.mark.asyncio
async def test_search_drug_approvals_with_filters(self):
"""Test drug approval search with multiple filters."""
with patch(
"biomcp.openfda.drug_approvals.make_openfda_request",
new_callable=AsyncMock,
) as mock_request:
mock_request.return_value = (MOCK_APPROVALS_SEARCH, None)
result = await search_drug_approvals(
drug="keytruda",
application_number="BLA125514",
approval_year="2014",
limit=5,
api_key="test-key",
)
assert "FDA Drug Approval Records" in result
# Verify API key was passed as the 4th positional argument
call_args = mock_request.call_args
assert (
call_args[0][3] == "test-key"
) # api_key is 4th positional arg
@pytest.mark.asyncio
async def test_search_drug_approvals_no_results(self):
"""Test drug approval search with no results."""
with patch(
"biomcp.openfda.drug_approvals.make_openfda_request",
new_callable=AsyncMock,
) as mock_request:
mock_request.return_value = ({"results": []}, None)
result = await search_drug_approvals(drug="nonexistent-drug")
assert "No drug approval records found" in result
@pytest.mark.asyncio
async def test_search_drug_approvals_api_error(self):
"""Test drug approval search with API error."""
with patch(
"biomcp.openfda.drug_approvals.make_openfda_request",
new_callable=AsyncMock,
) as mock_request:
mock_request.return_value = (None, "API rate limit exceeded")
result = await search_drug_approvals(drug="test")
assert "Error searching drug approvals" in result
assert "API rate limit exceeded" in result
@pytest.mark.asyncio
async def test_get_drug_approval_success(self):
"""Test getting specific drug approval details."""
with patch(
"biomcp.openfda.drug_approvals.make_openfda_request",
new_callable=AsyncMock,
) as mock_request:
mock_request.return_value = (MOCK_APPROVAL_DETAIL, None)
result = await get_drug_approval("BLA125514")
# Should have detailed approval info
assert "BLA125514" in result or "Drug Approval Details" in result
assert "BLA125514" in result
assert "Products" in result
assert "Submission" in result
@pytest.mark.asyncio
async def test_get_drug_approval_not_found(self):
"""Test getting drug approval that doesn't exist."""
with patch(
"biomcp.openfda.drug_approvals.make_openfda_request",
new_callable=AsyncMock,
) as mock_request:
mock_request.return_value = ({"results": []}, None)
result = await get_drug_approval("INVALID123")
assert "No approval record found" in result
assert "INVALID123" in result
@pytest.mark.asyncio
async def test_get_drug_approval_with_api_key(self):
"""Test getting drug approval with API key."""
with patch(
"biomcp.openfda.drug_approvals.make_openfda_request",
new_callable=AsyncMock,
) as mock_request:
mock_request.return_value = (MOCK_APPROVAL_DETAIL, None)
result = await get_drug_approval(
"BLA125514",
api_key="test-api-key",
)
# Should have detailed approval info
assert "BLA125514" in result or "Drug Approval Details" in result
# Verify API key was passed as the 4th positional argument
call_args = mock_request.call_args
assert (
call_args[0][3] == "test-api-key"
) # api_key is 4th positional arg
@pytest.mark.asyncio
async def test_search_drug_approvals_pagination(self):
"""Test drug approval search pagination."""
with patch(
"biomcp.openfda.drug_approvals.make_openfda_request",
new_callable=AsyncMock,
) as mock_request:
mock_response = {
"meta": {"results": {"total": 100}},
"results": MOCK_APPROVALS_SEARCH["results"],
}
mock_request.return_value = (mock_response, None)
result = await search_drug_approvals(
drug="cancer",
limit=10,
skip=20,
)
# The output format is different - just check for the total
assert "100" in result
# Verify skip parameter was passed (2nd positional arg)
call_args = mock_request.call_args
assert (
call_args[0][1]["skip"] == "20"
) # params is 2nd positional arg, value is string
@pytest.mark.asyncio
async def test_approval_year_validation(self):
"""Test that approval year is properly formatted."""
with patch(
"biomcp.openfda.drug_approvals.make_openfda_request",
new_callable=AsyncMock,
) as mock_request:
mock_request.return_value = (MOCK_APPROVALS_SEARCH, None)
await search_drug_approvals(
approval_year="2023",
)
# Check that year was properly formatted in query
call_args = mock_request.call_args
params = call_args[0][1] # params is 2nd positional arg
assert "marketing_status_date" in params["search"]
assert "[2023-01-01 TO 2023-12-31]" in params["search"]
```
--------------------------------------------------------------------------------
/src/biomcp/articles/fetch.py:
--------------------------------------------------------------------------------
```python
import json
import re
from ssl import TLSVersion
from typing import Annotated, Any
from pydantic import BaseModel, Field, computed_field
from .. import http_client, render
from ..constants import PUBTATOR3_FULLTEXT_URL
from ..http_client import RequestError
class PassageInfo(BaseModel):
section_type: str | None = Field(
None,
description="Type of the section.",
)
passage_type: str | None = Field(
None,
alias="type",
description="Type of the passage.",
)
class Passage(BaseModel):
info: PassageInfo | None = Field(
None,
alias="infons",
)
text: str | None = None
@property
def section_type(self) -> str:
section_type = None
if self.info is not None:
section_type = self.info.section_type or self.info.passage_type
section_type = section_type or "UNKNOWN"
return section_type.upper()
@property
def is_title(self) -> bool:
return self.section_type == "TITLE"
@property
def is_abstract(self) -> bool:
return self.section_type == "ABSTRACT"
@property
def is_text(self) -> bool:
return self.section_type in {
"INTRO",
"RESULTS",
"METHODS",
"DISCUSS",
"CONCL",
"FIG",
"TABLE",
}
class Article(BaseModel):
pmid: int | None = Field(
None,
description="PubMed ID of the reference article.",
)
pmcid: str | None = Field(
None,
description="PubMed Central ID of the reference article.",
)
date: str | None = Field(
None,
description="Date of the reference article's publication.",
)
journal: str | None = Field(
None,
description="Journal name.",
)
authors: list[str] | None = Field(
None,
description="List of authors.",
)
passages: list[Passage] = Field(
...,
alias="passages",
description="List of passages in the reference article.",
exclude=True,
)
@computed_field
def title(self) -> str:
lines = []
for passage in filter(lambda p: p.is_title, self.passages):
if passage.text:
lines.append(passage.text)
return " ... ".join(lines) or f"Article: {self.pmid}"
@computed_field
def abstract(self) -> str:
lines = []
for passage in filter(lambda p: p.is_abstract, self.passages):
if passage.text:
lines.append(passage.text)
return "\n\n".join(lines) or f"Article: {self.pmid}"
@computed_field
def full_text(self) -> str:
lines = []
for passage in filter(lambda p: p.is_text, self.passages):
if passage.text:
lines.append(passage.text)
return "\n\n".join(lines) or ""
@computed_field
def pubmed_url(self) -> str | None:
url = None
if self.pmid:
url = f"https://pubmed.ncbi.nlm.nih.gov/{self.pmid}/"
return url
@computed_field
def pmc_url(self) -> str | None:
"""Generates the PMC URL if PMCID exists."""
url = None
if self.pmcid:
url = f"https://www.ncbi.nlm.nih.gov/pmc/articles/{self.pmcid}/"
return url
class FetchArticlesResponse(BaseModel):
articles: list[Article] = Field(
...,
alias="PubTator3",
description="List of full texts Articles retrieved from PubTator3.",
)
def get_abstract(self, pmid: int | None) -> str | None:
for article in self.articles:
if pmid and article.pmid == pmid:
return str(article.abstract)
return None
async def call_pubtator_api(
pmids: list[int],
full: bool,
) -> tuple[FetchArticlesResponse | None, RequestError | None]:
"""Fetch the text of a list of PubMed IDs."""
request = {
"pmids": ",".join(str(pmid) for pmid in pmids),
"full": str(full).lower(),
}
response, error = await http_client.request_api(
url=PUBTATOR3_FULLTEXT_URL,
request=request,
response_model_type=FetchArticlesResponse,
tls_version=TLSVersion.TLSv1_2,
domain="pubmed",
)
return response, error
async def fetch_articles(
pmids: list[int],
full: bool,
output_json: bool = False,
) -> str:
"""Fetch the text of a list of PubMed IDs."""
response, error = await call_pubtator_api(pmids, full)
# PubTator API returns full text even when full=False
exclude_fields = {"full_text"} if not full else set()
# noinspection DuplicatedCode
if error:
data: list[dict[str, Any]] = [
{"error": f"Error {error.code}: {error.message}"}
]
else:
data = [
article.model_dump(
mode="json",
exclude_none=True,
exclude=exclude_fields,
)
for article in (response.articles if response else [])
]
if data and not output_json:
return render.to_markdown(data)
else:
return json.dumps(data, indent=2)
def is_doi(identifier: str) -> bool:
"""Check if the identifier is a DOI."""
# DOI pattern: starts with 10. followed by numbers/slash/alphanumeric
doi_pattern = r"^10\.\d{4,9}/[\-._;()/:\w]+$"
return bool(re.match(doi_pattern, str(identifier)))
def is_pmid(identifier: str) -> bool:
"""Check if the identifier is a PubMed ID."""
# PMID is a numeric string
return str(identifier).isdigit()
async def _article_details(
call_benefit: Annotated[
str,
"Define and summarize why this function is being called and the intended benefit",
],
pmid,
) -> str:
"""
Retrieves details for a single article given its identifier.
Parameters:
- call_benefit: Define and summarize why this function is being called and the intended benefit
- pmid: An article identifier - either a PubMed ID (e.g., 34397683) or DOI (e.g., 10.1101/2024.01.20.23288905)
Process:
- For PMIDs: Calls the PubTator3 API to fetch the article's title, abstract, and full text (if available)
- For DOIs: Calls Europe PMC API to fetch preprint details
Output: A JSON formatted string containing the retrieved article content.
"""
identifier = str(pmid)
# Check if it's a DOI (Europe PMC preprint)
if is_doi(identifier):
from .preprints import fetch_europe_pmc_article
return await fetch_europe_pmc_article(identifier, output_json=True)
# Check if it's a PMID (PubMed article)
elif is_pmid(identifier):
return await fetch_articles(
[int(identifier)], full=True, output_json=True
)
else:
# Unknown identifier format
return json.dumps(
[
{
"error": f"Invalid identifier format: {identifier}. Expected either a PMID (numeric) or DOI (10.xxxx/xxxx format)."
}
],
indent=2,
)
```
--------------------------------------------------------------------------------
/docs/concepts/02-the-deep-researcher-persona.md:
--------------------------------------------------------------------------------
```markdown
# The Deep Researcher Persona
## Overview
The Deep Researcher Persona is a core philosophy of BioMCP that transforms AI assistants into systematic biomedical research partners. This persona embodies the methodical approach of a dedicated biomedical researcher, enabling AI agents to conduct thorough literature reviews, analyze complex datasets, and synthesize findings into actionable insights.
## Why the Deep Researcher Persona?
Traditional AI interactions often result in surface-level responses. The Deep Researcher Persona addresses this by:
- **Enforcing Systematic Thinking**: Requiring the use of the `think` tool before any research operation
- **Preventing Premature Conclusions**: Breaking complex queries into manageable research steps
- **Ensuring Comprehensive Analysis**: Following a proven 10-step methodology
- **Maintaining Research Rigor**: Documenting thought processes and decision rationale
## Core Traits and Personality
The Deep Researcher embodies these characteristics:
- **Curious and Methodical**: Always seeking deeper understanding through systematic investigation
- **Evidence-Based**: Grounding all conclusions in concrete data from multiple sources
- **Professional Voice**: Clear, concise scientific communication
- **Collaborative**: Working as a research partner, not just an information retriever
- **Objective**: Presenting balanced findings including contradictory evidence
## The 10-Step Sequential Thinking Process
This methodology ensures comprehensive research coverage:
### 1. Problem Definition and Scope
- Parse the research question to identify key concepts
- Define clear objectives and expected deliverables
- Establish research boundaries and constraints
### 2. Initial Knowledge Assessment
- Evaluate existing knowledge on the topic
- Identify knowledge gaps requiring investigation
- Form initial hypotheses to guide research
### 3. Search Strategy Development
- Design comprehensive search queries
- Select appropriate databases and tools
- Plan iterative search refinements
### 4. Data Collection and Retrieval
- Execute searches across multiple sources (PubTator3, ClinicalTrials.gov, variant databases)
- Collect relevant articles, trials, and annotations
- Document search parameters and results
### 5. Quality Assessment and Filtering
- Evaluate source credibility and relevance
- Apply inclusion/exclusion criteria
- Prioritize high-impact findings
### 6. Information Extraction
- Extract key findings, methodologies, and conclusions
- Identify patterns and relationships
- Note contradictions and uncertainties
### 7. Synthesis and Integration
- Combine findings from multiple sources
- Resolve contradictions when possible
- Build coherent narrative from evidence
### 8. Critical Analysis
- Evaluate strength of evidence
- Identify limitations and biases
- Consider alternative interpretations
### 9. Knowledge Synthesis
- Create structured summary of findings
- Highlight key insights and implications
- Prepare actionable recommendations
### 10. Communication and Reporting
- Format findings for target audience
- Include proper citations and references
- Provide clear next steps
## Mandatory Think Tool Usage
**CRITICAL**: The `think` tool must ALWAYS be used first before any BioMCP operation. This is not optional.
```python
# Correct pattern - ALWAYS start with think
think(thought="Breaking down the research question...", thoughtNumber=1)
# Then proceed with searches
article_searcher(genes=["BRAF"], diseases=["melanoma"])
# INCORRECT - Never skip the think step
article_searcher(genes=["BRAF"]) # ❌ Will produce suboptimal results
```
## Implementation in Practice
### Example Research Flow
1. **User Query**: "What are the treatment options for BRAF V600E melanoma?"
2. **Think Step 1**: Problem decomposition
```
think(thought="Breaking down query: Need to find 1) BRAF V600E mutation significance, 2) current treatments, 3) clinical trials", thoughtNumber=1)
```
3. **Think Step 2**: Search strategy
```
think(thought="Will search articles for BRAF inhibitors, then trials for V600E-specific treatments", thoughtNumber=2)
```
4. **Execute Searches**: Following the planned strategy
5. **Synthesize**: Combine findings into comprehensive brief
### Research Brief Format
Every research session concludes with a structured brief:
```markdown
## Research Brief: [Topic]
### Executive Summary
- 3-5 bullet points of key findings
- Clear, actionable insights
### Detailed Findings
1. **Literature Review** (X papers analyzed)
- Key discoveries
- Consensus findings
- Contradictions noted
2. **Clinical Evidence** (Y trials reviewed)
- Current treatment landscape
- Emerging therapies
- Trial enrollment opportunities
3. **Molecular Insights**
- Variant annotations
- Pathway implications
- Biomarker relevance
### Recommendations
- Evidence-based suggestions
- Areas for further investigation
- Clinical considerations
### References
- Full citations for all sources
- Direct links to primary data
```
## Tool Inventory and Usage
The Deep Researcher has access to 24 specialized tools:
### Core Research Tools
- **think**: Sequential reasoning and planning
- **article_searcher**: PubMed/PubTator3 literature search
- **trial_searcher**: Clinical trials discovery
- **variant_searcher**: Genetic variant annotations
### Specialized Analysis Tools
- **gene_getter**: Gene function and pathway data
- **drug_getter**: Medication information
- **disease_getter**: Disease ontology and synonyms
- **alphagenome_predictor**: Variant effect prediction
### Integration Features
- **Automatic cBioPortal Integration**: Cancer genomics context for all gene searches
- **BioThings Suite Access**: Real-time biomedical annotations
- **NCI Database Integration**: Comprehensive cancer trial data
## Best Practices
1. **Always Think First**: Never skip the sequential thinking process
2. **Use Multiple Sources**: Cross-reference findings across databases
3. **Document Reasoning**: Explain why certain searches or filters were chosen
4. **Consider Context**: Account for disease stage, prior treatments, and patient factors
5. **Stay Current**: Leverage preprint integration for latest findings
## Community Impact
The Deep Researcher Persona has transformed how researchers interact with biomedical data:
- **Reduced Research Time**: From days to minutes for comprehensive reviews
- **Improved Accuracy**: Systematic approach reduces missed connections
- **Enhanced Collaboration**: Consistent methodology enables team research
- **Democratized Access**: Complex research capabilities available to all
## Getting Started
To use the Deep Researcher Persona:
1. Ensure BioMCP is installed and configured
2. Load the persona resource when starting your AI session
3. Always begin research queries with the think tool
4. Follow the 10-step methodology for comprehensive results
Remember: The Deep Researcher Persona is not just a tool configuration—it's a systematic approach to biomedical research that ensures thorough, evidence-based insights every time.
```