This is page 3 of 15. Use http://codebase.md/genomoncology/biomcp?page={x} to view the full context.

# Directory Structure

```
├── .github
│   ├── actions
│   │   └── setup-python-env
│   │       └── action.yml
│   ├── dependabot.yml
│   └── workflows
│       ├── ci.yml
│       ├── deploy-docs.yml
│       ├── main.yml.disabled
│       ├── on-release-main.yml
│       └── validate-codecov-config.yml
├── .gitignore
├── .pre-commit-config.yaml
├── BIOMCP_DATA_FLOW.md
├── CHANGELOG.md
├── CNAME
├── codecov.yaml
├── docker-compose.yml
├── Dockerfile
├── docs
│   ├── apis
│   │   ├── error-codes.md
│   │   ├── overview.md
│   │   └── python-sdk.md
│   ├── assets
│   │   ├── biomcp-cursor-locations.png
│   │   ├── favicon.ico
│   │   ├── icon.png
│   │   ├── logo.png
│   │   ├── mcp_architecture.txt
│   │   └── remote-connection
│   │       ├── 00_connectors.png
│   │       ├── 01_add_custom_connector.png
│   │       ├── 02_connector_enabled.png
│   │       ├── 03_connect_to_biomcp.png
│   │       ├── 04_select_google_oauth.png
│   │       └── 05_success_connect.png
│   ├── backend-services-reference
│   │   ├── 01-overview.md
│   │   ├── 02-biothings-suite.md
│   │   ├── 03-cbioportal.md
│   │   ├── 04-clinicaltrials-gov.md
│   │   ├── 05-nci-cts-api.md
│   │   ├── 06-pubtator3.md
│   │   └── 07-alphagenome.md
│   ├── blog
│   │   ├── ai-assisted-clinical-trial-search-analysis.md
│   │   ├── images
│   │   │   ├── deep-researcher-video.png
│   │   │   ├── researcher-announce.png
│   │   │   ├── researcher-drop-down.png
│   │   │   ├── researcher-prompt.png
│   │   │   ├── trial-search-assistant.png
│   │   │   └── what_is_biomcp_thumbnail.png
│   │   └── researcher-persona-resource.md
│   ├── changelog.md
│   ├── CNAME
│   ├── concepts
│   │   ├── 01-what-is-biomcp.md
│   │   ├── 02-the-deep-researcher-persona.md
│   │   └── 03-sequential-thinking-with-the-think-tool.md
│   ├── developer-guides
│   │   ├── 01-server-deployment.md
│   │   ├── 02-contributing-and-testing.md
│   │   ├── 03-third-party-endpoints.md
│   │   ├── 04-transport-protocol.md
│   │   ├── 05-error-handling.md
│   │   ├── 06-http-client-and-caching.md
│   │   ├── 07-performance-optimizations.md
│   │   └── generate_endpoints.py
│   ├── faq-condensed.md
│   ├── FDA_SECURITY.md
│   ├── genomoncology.md
│   ├── getting-started
│   │   ├── 01-quickstart-cli.md
│   │   ├── 02-claude-desktop-integration.md
│   │   └── 03-authentication-and-api-keys.md
│   ├── how-to-guides
│   │   ├── 01-find-articles-and-cbioportal-data.md
│   │   ├── 02-find-trials-with-nci-and-biothings.md
│   │   ├── 03-get-comprehensive-variant-annotations.md
│   │   ├── 04-predict-variant-effects-with-alphagenome.md
│   │   ├── 05-logging-and-monitoring-with-bigquery.md
│   │   └── 06-search-nci-organizations-and-interventions.md
│   ├── index.md
│   ├── policies.md
│   ├── reference
│   │   ├── architecture-diagrams.md
│   │   ├── quick-architecture.md
│   │   ├── quick-reference.md
│   │   └── visual-architecture.md
│   ├── robots.txt
│   ├── stylesheets
│   │   ├── announcement.css
│   │   └── extra.css
│   ├── troubleshooting.md
│   ├── tutorials
│   │   ├── biothings-prompts.md
│   │   ├── claude-code-biomcp-alphagenome.md
│   │   ├── nci-prompts.md
│   │   ├── openfda-integration.md
│   │   ├── openfda-prompts.md
│   │   ├── pydantic-ai-integration.md
│   │   └── remote-connection.md
│   ├── user-guides
│   │   ├── 01-command-line-interface.md
│   │   ├── 02-mcp-tools-reference.md
│   │   └── 03-integrating-with-ides-and-clients.md
│   └── workflows
│       └── all-workflows.md
├── example_scripts
│   ├── mcp_integration.py
│   └── python_sdk.py
├── glama.json
├── LICENSE
├── lzyank.toml
├── Makefile
├── mkdocs.yml
├── package-lock.json
├── package.json
├── pyproject.toml
├── README.md
├── scripts
│   ├── check_docs_in_mkdocs.py
│   ├── check_http_imports.py
│   └── generate_endpoints_doc.py
├── smithery.yaml
├── src
│   └── biomcp
│       ├── __init__.py
│       ├── __main__.py
│       ├── articles
│       │   ├── __init__.py
│       │   ├── autocomplete.py
│       │   ├── fetch.py
│       │   ├── preprints.py
│       │   ├── search_optimized.py
│       │   ├── search.py
│       │   └── unified.py
│       ├── biomarkers
│       │   ├── __init__.py
│       │   └── search.py
│       ├── cbioportal_helper.py
│       ├── circuit_breaker.py
│       ├── cli
│       │   ├── __init__.py
│       │   ├── articles.py
│       │   ├── biomarkers.py
│       │   ├── diseases.py
│       │   ├── health.py
│       │   ├── interventions.py
│       │   ├── main.py
│       │   ├── openfda.py
│       │   ├── organizations.py
│       │   ├── server.py
│       │   ├── trials.py
│       │   └── variants.py
│       ├── connection_pool.py
│       ├── constants.py
│       ├── core.py
│       ├── diseases
│       │   ├── __init__.py
│       │   ├── getter.py
│       │   └── search.py
│       ├── domain_handlers.py
│       ├── drugs
│       │   ├── __init__.py
│       │   └── getter.py
│       ├── exceptions.py
│       ├── genes
│       │   ├── __init__.py
│       │   └── getter.py
│       ├── http_client_simple.py
│       ├── http_client.py
│       ├── individual_tools.py
│       ├── integrations
│       │   ├── __init__.py
│       │   ├── biothings_client.py
│       │   └── cts_api.py
│       ├── interventions
│       │   ├── __init__.py
│       │   ├── getter.py
│       │   └── search.py
│       ├── logging_filter.py
│       ├── metrics_handler.py
│       ├── metrics.py
│       ├── openfda
│       │   ├── __init__.py
│       │   ├── adverse_events_helpers.py
│       │   ├── adverse_events.py
│       │   ├── cache.py
│       │   ├── constants.py
│       │   ├── device_events_helpers.py
│       │   ├── device_events.py
│       │   ├── drug_approvals.py
│       │   ├── drug_labels_helpers.py
│       │   ├── drug_labels.py
│       │   ├── drug_recalls_helpers.py
│       │   ├── drug_recalls.py
│       │   ├── drug_shortages_detail_helpers.py
│       │   ├── drug_shortages_helpers.py
│       │   ├── drug_shortages.py
│       │   ├── exceptions.py
│       │   ├── input_validation.py
│       │   ├── rate_limiter.py
│       │   ├── utils.py
│       │   └── validation.py
│       ├── organizations
│       │   ├── __init__.py
│       │   ├── getter.py
│       │   └── search.py
│       ├── parameter_parser.py
│       ├── prefetch.py
│       ├── query_parser.py
│       ├── query_router.py
│       ├── rate_limiter.py
│       ├── render.py
│       ├── request_batcher.py
│       ├── resources
│       │   ├── __init__.py
│       │   ├── getter.py
│       │   ├── instructions.md
│       │   └── researcher.md
│       ├── retry.py
│       ├── router_handlers.py
│       ├── router.py
│       ├── shared_context.py
│       ├── thinking
│       │   ├── __init__.py
│       │   ├── sequential.py
│       │   └── session.py
│       ├── thinking_tool.py
│       ├── thinking_tracker.py
│       ├── trials
│       │   ├── __init__.py
│       │   ├── getter.py
│       │   ├── nci_getter.py
│       │   ├── nci_search.py
│       │   └── search.py
│       ├── utils
│       │   ├── __init__.py
│       │   ├── cancer_types_api.py
│       │   ├── cbio_http_adapter.py
│       │   ├── endpoint_registry.py
│       │   ├── gene_validator.py
│       │   ├── metrics.py
│       │   ├── mutation_filter.py
│       │   ├── query_utils.py
│       │   ├── rate_limiter.py
│       │   └── request_cache.py
│       ├── variants
│       │   ├── __init__.py
│       │   ├── alphagenome.py
│       │   ├── cancer_types.py
│       │   ├── cbio_external_client.py
│       │   ├── cbioportal_mutations.py
│       │   ├── cbioportal_search_helpers.py
│       │   ├── cbioportal_search.py
│       │   ├── constants.py
│       │   ├── external.py
│       │   ├── filters.py
│       │   ├── getter.py
│       │   ├── links.py
│       │   └── search.py
│       └── workers
│           ├── __init__.py
│           ├── worker_entry_stytch.js
│           ├── worker_entry.js
│           └── worker.py
├── tests
│   ├── bdd
│   │   ├── cli_help
│   │   │   ├── help.feature
│   │   │   └── test_help.py
│   │   ├── conftest.py
│   │   ├── features
│   │   │   └── alphagenome_integration.feature
│   │   ├── fetch_articles
│   │   │   ├── fetch.feature
│   │   │   └── test_fetch.py
│   │   ├── get_trials
│   │   │   ├── get.feature
│   │   │   └── test_get.py
│   │   ├── get_variants
│   │   │   ├── get.feature
│   │   │   └── test_get.py
│   │   ├── search_articles
│   │   │   ├── autocomplete.feature
│   │   │   ├── search.feature
│   │   │   ├── test_autocomplete.py
│   │   │   └── test_search.py
│   │   ├── search_trials
│   │   │   ├── search.feature
│   │   │   └── test_search.py
│   │   ├── search_variants
│   │   │   ├── search.feature
│   │   │   └── test_search.py
│   │   └── steps
│   │       └── test_alphagenome_steps.py
│   ├── config
│   │   └── test_smithery_config.py
│   ├── conftest.py
│   ├── data
│   │   ├── ct_gov
│   │   │   ├── clinical_trials_api_v2.yaml
│   │   │   ├── trials_NCT04280705.json
│   │   │   └── trials_NCT04280705.txt
│   │   ├── myvariant
│   │   │   ├── myvariant_api.yaml
│   │   │   ├── myvariant_field_descriptions.csv
│   │   │   ├── variants_full_braf_v600e.json
│   │   │   ├── variants_full_braf_v600e.txt
│   │   │   └── variants_part_braf_v600_multiple.json
│   │   ├── openfda
│   │   │   ├── drugsfda_detail.json
│   │   │   ├── drugsfda_search.json
│   │   │   ├── enforcement_detail.json
│   │   │   └── enforcement_search.json
│   │   └── pubtator
│   │       ├── pubtator_autocomplete.json
│   │       └── pubtator3_paper.txt
│   ├── integration
│   │   ├── test_openfda_integration.py
│   │   ├── test_preprints_integration.py
│   │   ├── test_simple.py
│   │   └── test_variants_integration.py
│   ├── tdd
│   │   ├── articles
│   │   │   ├── test_autocomplete.py
│   │   │   ├── test_cbioportal_integration.py
│   │   │   ├── test_fetch.py
│   │   │   ├── test_preprints.py
│   │   │   ├── test_search.py
│   │   │   └── test_unified.py
│   │   ├── conftest.py
│   │   ├── drugs
│   │   │   ├── __init__.py
│   │   │   └── test_drug_getter.py
│   │   ├── openfda
│   │   │   ├── __init__.py
│   │   │   ├── test_adverse_events.py
│   │   │   ├── test_device_events.py
│   │   │   ├── test_drug_approvals.py
│   │   │   ├── test_drug_labels.py
│   │   │   ├── test_drug_recalls.py
│   │   │   ├── test_drug_shortages.py
│   │   │   └── test_security.py
│   │   ├── test_biothings_integration_real.py
│   │   ├── test_biothings_integration.py
│   │   ├── test_circuit_breaker.py
│   │   ├── test_concurrent_requests.py
│   │   ├── test_connection_pool.py
│   │   ├── test_domain_handlers.py
│   │   ├── test_drug_approvals.py
│   │   ├── test_drug_recalls.py
│   │   ├── test_drug_shortages.py
│   │   ├── test_endpoint_documentation.py
│   │   ├── test_error_scenarios.py
│   │   ├── test_europe_pmc_fetch.py
│   │   ├── test_mcp_integration.py
│   │   ├── test_mcp_tools.py
│   │   ├── test_metrics.py
│   │   ├── test_nci_integration.py
│   │   ├── test_nci_mcp_tools.py
│   │   ├── test_network_policies.py
│   │   ├── test_offline_mode.py
│   │   ├── test_openfda_unified.py
│   │   ├── test_pten_r173_search.py
│   │   ├── test_render.py
│   │   ├── test_request_batcher.py.disabled
│   │   ├── test_retry.py
│   │   ├── test_router.py
│   │   ├── test_shared_context.py.disabled
│   │   ├── test_unified_biothings.py
│   │   ├── thinking
│   │   │   ├── __init__.py
│   │   │   └── test_sequential.py
│   │   ├── trials
│   │   │   ├── test_backward_compatibility.py
│   │   │   ├── test_getter.py
│   │   │   └── test_search.py
│   │   ├── utils
│   │   │   ├── test_gene_validator.py
│   │   │   ├── test_mutation_filter.py
│   │   │   ├── test_rate_limiter.py
│   │   │   └── test_request_cache.py
│   │   ├── variants
│   │   │   ├── constants.py
│   │   │   ├── test_alphagenome_api_key.py
│   │   │   ├── test_alphagenome_comprehensive.py
│   │   │   ├── test_alphagenome.py
│   │   │   ├── test_cbioportal_mutations.py
│   │   │   ├── test_cbioportal_search.py
│   │   │   ├── test_external_integration.py
│   │   │   ├── test_external.py
│   │   │   ├── test_extract_gene_aa_change.py
│   │   │   ├── test_filters.py
│   │   │   ├── test_getter.py
│   │   │   ├── test_links.py
│   │   │   └── test_search.py
│   │   └── workers
│   │       └── test_worker_sanitization.js
│   └── test_pydantic_ai_integration.py
├── THIRD_PARTY_ENDPOINTS.md
├── tox.ini
├── uv.lock
└── wrangler.toml
```

# Files

--------------------------------------------------------------------------------
/src/biomcp/rate_limiter.py:
--------------------------------------------------------------------------------

```python
"""Rate limiting implementation for BioMCP API calls."""

import asyncio
import time
from collections import defaultdict
from contextlib import asynccontextmanager

from .constants import (
    DEFAULT_BURST_SIZE,
    DEFAULT_RATE_LIMIT_PER_SECOND,
)
from .exceptions import BioMCPError


class RateLimitExceeded(BioMCPError):
    """Raised when rate limit is exceeded."""

    def __init__(self, domain: str, limit: int, window: int):
        message = f"Rate limit exceeded for {domain}: {limit} requests per {window} seconds"
        super().__init__(
            message, {"domain": domain, "limit": limit, "window": window}
        )

class RateLimiter:
    """Token bucket rate limiter implementation."""

    def __init__(
        self,
        requests_per_second: float = DEFAULT_RATE_LIMIT_PER_SECOND,
        burst_size: int = DEFAULT_BURST_SIZE,
    ):
        """Initialize rate limiter.

        Args:
            requests_per_second: Sustained request rate
            burst_size: Maximum burst capacity
        """
        self.rate = requests_per_second
        self.burst_size = burst_size
        self.tokens = float(burst_size)
        self.last_update = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self, tokens: int = 1) -> None:
        """Acquire tokens from the bucket."""
        async with self._lock:
            now = time.monotonic()
            elapsed = now - self.last_update
            self.last_update = now

            # Add tokens based on elapsed time
            self.tokens = min(
                self.burst_size, self.tokens + elapsed * self.rate
            )

            if self.tokens < tokens:
                # Calculate wait time
                wait_time = (tokens - self.tokens) / self.rate
                await asyncio.sleep(wait_time)
                self.tokens = 0
            else:
                self.tokens -= tokens

    @asynccontextmanager
    async def limit(self):
        """Context manager for rate limiting."""
        await self.acquire()
        yield


class DomainRateLimiter:
    """Rate limiter with per-domain limits."""

    def __init__(self, default_rps: float = 10.0, default_burst: int = 20):
        """Initialize domain rate limiter.

        Args:
            default_rps: Default requests per second
            default_burst: Default burst size
        """
        self.default_rps = default_rps
        self.default_burst = default_burst
        self.limiters: dict[str, RateLimiter] = {}
        self.domain_configs = {
            "article": {"rps": 20.0, "burst": 40},  # PubMed can handle more
            "trial": {"rps": 10.0, "burst": 20},  # ClinicalTrials.gov standard
            "thinking": {"rps": 50.0, "burst": 100},  # Local processing
            "mygene": {"rps": 10.0, "burst": 20},  # MyGene.info
            "mydisease": {"rps": 10.0, "burst": 20},  # MyDisease.info
            "mychem": {"rps": 10.0, "burst": 20},  # MyChem.info
            "myvariant": {"rps": 15.0, "burst": 30},  # MyVariant.info
        }

    def get_limiter(self, domain: str) -> RateLimiter:
        """Get or create rate limiter for domain."""
        if domain not in self.limiters:
            config = self.domain_configs.get(domain, {})
            rps = config.get("rps", self.default_rps)
            burst = config.get("burst", self.default_burst)
            self.limiters[domain] = RateLimiter(rps, int(burst))
        return self.limiters[domain]

    @asynccontextmanager
    async def limit(self, domain: str):
        """Rate limit context manager for a domain."""
        limiter = self.get_limiter(domain)
        async with limiter.limit():
            yield


class SlidingWindowRateLimiter:
    """Sliding window rate limiter for user/IP based limiting."""

    def __init__(self, requests: int = 100, window_seconds: int = 60):
        """Initialize sliding window rate limiter.

        Args:
            requests: Maximum requests per window
            window_seconds: Window size in seconds
        """
        self.max_requests = requests
        self.window_seconds = window_seconds
        self.requests: dict[str, list[float]] = defaultdict(list)
        self._lock = asyncio.Lock()

    async def check_limit(self, key: str) -> bool:
        """Check if request is allowed for key."""
        async with self._lock:
            now = time.time()
            cutoff = now - self.window_seconds

            # Remove old requests
            self.requests[key] = [
                req_time
                for req_time in self.requests[key]
                if req_time > cutoff
            ]

            # Check limit
            if len(self.requests[key]) >= self.max_requests:
                return False

            # Add current request
            self.requests[key].append(now)
            return True

    async def acquire(self, key: str) -> None:
        """Acquire permission to make request."""
        if not await self.check_limit(key):
            raise RateLimitExceeded(
                key, self.max_requests, self.window_seconds
            )


# Global instances
domain_limiter = DomainRateLimiter()
user_limiter = SlidingWindowRateLimiter(
    requests=1000, window_seconds=3600
)  # 1000 req/hour


async def rate_limit_domain(domain: str) -> None:
    """Apply rate limiting for a domain."""
    async with domain_limiter.limit(domain):
        pass


async def rate_limit_user(user_id: str | None = None) -> None:
    """Apply rate limiting for a user."""
    if user_id:
        await user_limiter.acquire(user_id)
```
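A usage sketch for the module above (assuming it is importable as `biomcp.rate_limiter`): the token bucket waits when drained, while the sliding window fails fast by raising `RateLimitExceeded`.

```python
import asyncio

from biomcp.rate_limiter import (
    DomainRateLimiter,
    RateLimitExceeded,
    SlidingWindowRateLimiter,
)


async def main() -> None:
    # Token bucket per domain: over-budget calls sleep until refilled
    limiter = DomainRateLimiter()
    async with limiter.limit("article"):
        ...  # perform the PubMed call here

    # Sliding window per user key: over-budget calls raise
    user_limiter = SlidingWindowRateLimiter(requests=2, window_seconds=60)
    await user_limiter.acquire("user-1")
    await user_limiter.acquire("user-1")
    try:
        await user_limiter.acquire("user-1")  # third request in the window
    except RateLimitExceeded as exc:
        print(exc)


asyncio.run(main())
```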
--------------------------------------------------------------------------------
/src/biomcp/http_client_simple.py:
--------------------------------------------------------------------------------

```python
"""Helper functions for simpler HTTP client operations."""

import asyncio
import contextlib
import json
import os
import ssl

import httpx

# Global connection pools per SSL context
_connection_pools: dict[str, httpx.AsyncClient] = {}
_pool_lock = asyncio.Lock()


def close_all_pools():
    """Close all connection pools. Useful for cleanup in tests."""
    global _connection_pools
    for pool in _connection_pools.values():
        if pool and not pool.is_closed:
            # Schedule the close in a safe way
            try:
                # Store task reference to avoid garbage collection
                close_task = asyncio.create_task(pool.aclose())
                # Optionally add a callback to handle completion
                close_task.add_done_callback(lambda t: None)
            except RuntimeError:
                # If no event loop is running, close synchronously
                pool._transport.close()
    _connection_pools.clear()


async def get_connection_pool(
    verify: ssl.SSLContext | str | bool,
    timeout: httpx.Timeout,
) -> httpx.AsyncClient:
    """Get or create a shared connection pool for the given SSL context."""
    global _connection_pools

    # Create a key for the pool based on verify setting
    if isinstance(verify, ssl.SSLContext):
        pool_key = f"ssl_{id(verify)}"
    else:
        pool_key = str(verify)

    async with _pool_lock:
        pool = _connection_pools.get(pool_key)
        if pool is None or pool.is_closed:
            # Create a new connection pool with optimized settings
            pool = httpx.AsyncClient(
                verify=verify,
                http2=False,  # HTTP/2 can add overhead for simple requests
                timeout=timeout,
                limits=httpx.Limits(
                    max_keepalive_connections=20,  # Reuse connections
                    max_connections=100,  # Total connection limit
                    keepalive_expiry=30,  # Keep connections alive for 30s
                ),
                # Enable connection pooling
                transport=httpx.AsyncHTTPTransport(
                    retries=0,  # We handle retries at a higher level
                ),
            )
            _connection_pools[pool_key] = pool
        return pool


async def execute_http_request(  # noqa: C901
    method: str,
    url: str,
    params: dict,
    verify: ssl.SSLContext | str | bool,
    headers: dict[str, str] | None = None,
) -> tuple[int, str]:
    """Execute the actual HTTP request using connection pooling.

    Args:
        method: HTTP method (GET or POST)
        url: Target URL
        params: Request parameters
        verify: SSL verification settings
        headers: Optional custom headers

    Returns:
        Tuple of (status_code, response_text)

    Raises:
        ConnectionError: For connection failures
        TimeoutError: For timeout errors
    """
    from .constants import HTTP_TIMEOUT_SECONDS

    try:
        # Extract custom headers from params if present
        custom_headers = headers or {}
        if "_headers" in params:
            with contextlib.suppress(json.JSONDecodeError, TypeError):
                custom_headers.update(json.loads(params.pop("_headers")))

        # Use the configured timeout from constants
        timeout = httpx.Timeout(HTTP_TIMEOUT_SECONDS)

        # Use connection pooling with proper error handling
        use_pool = (
            os.getenv("BIOMCP_USE_CONNECTION_POOL", "true").lower() == "true"
        )

        if use_pool:
            try:
                # Use the new connection pool manager (same package, so a
                # single-dot relative import matches the other imports here)
                from .connection_pool import get_connection_pool as get_pool

                client = await get_pool(verify, timeout)
                should_close = False
            except Exception:
                # Fallback to creating a new client
                client = httpx.AsyncClient(
                    verify=verify, http2=False, timeout=timeout
                )
                should_close = True
        else:
            # Create a new client for each request
            client = httpx.AsyncClient(
                verify=verify, http2=False, timeout=timeout
            )
            should_close = True

        try:
            # Make the request
            if method.upper() == "GET":
                resp = await client.get(
                    url, params=params, headers=custom_headers
                )
            elif method.upper() == "POST":
                resp = await client.post(
                    url, json=params, headers=custom_headers
                )
            else:
                from .constants import HTTP_ERROR_CODE_UNSUPPORTED_METHOD

                return (
                    HTTP_ERROR_CODE_UNSUPPORTED_METHOD,
                    f"Unsupported method {method}",
                )

            # Check for empty response
            if not resp.text:
                return resp.status_code, "{}"

            return resp.status_code, resp.text
        finally:
            # Only close if we created a new client
            if should_close:
                await client.aclose()

    except httpx.ConnectError as exc:
        raise ConnectionError(f"Failed to connect to {url}: {exc}") from exc
    except httpx.TimeoutException as exc:
        raise TimeoutError(f"Request to {url} timed out: {exc}") from exc
    except httpx.HTTPError as exc:
        error_msg = str(exc) if str(exc) else "Network connectivity error"
        from .constants import HTTP_ERROR_CODE_NETWORK

        return HTTP_ERROR_CODE_NETWORK, error_msg
```

--------------------------------------------------------------------------------
/docs/developer-guides/06-http-client-and-caching.md:
--------------------------------------------------------------------------------

```markdown
# BioMCP HTTP Client Guide

## Overview

BioMCP uses a centralized HTTP client for all external API calls. This provides:

- Consistent error handling and retry logic
- Request/response caching
- Rate limiting per domain
- Circuit breaker for fault tolerance
- Offline mode support
- Comprehensive endpoint tracking

## Migration from Direct HTTP Libraries

### Before (Direct httpx usage):

```python
import httpx

async def fetch_gene(gene: str):
    async with httpx.AsyncClient() as client:
        response = await client.get(f"https://api.example.com/genes/{gene}")
        response.raise_for_status()
        return response.json()
```

### After (Centralized client):

```python
from biomcp import http_client

async def fetch_gene(gene: str):
    data, error = await http_client.request_api(
        url=f"https://api.example.com/genes/{gene}",
        request={},
        domain="example"
    )
    if error:
        # Handle error consistently
        return None
    return data
```

## Error Handling

The centralized client uses a consistent error handling pattern:

```python
result, error = await http_client.request_api(...)

if error:
    # error is a RequestError object with:
    # - error.code: HTTP status code or error type
    # - error.message: Human-readable error message
    # - error.details: Additional context
    logger.error(f"Request failed: {error.message}")
    return None  # or handle appropriately
```

### Error Handling Guidelines

1. **For optional data**: Return `None` when the data is not critical
2. **For required data**: Raise an exception or return an error to the caller
3. **For batch operations**: Collect errors and report at the end (see the sketch below)
4. **For user-facing operations**: Provide clear, actionable error messages
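For example, a batch fetch can follow guideline 3 by collecting per-item errors instead of aborting on the first failure (a sketch; the URL and domain reuse the placeholder values from the examples above):

```python
async def fetch_genes(genes: list[str]) -> tuple[dict, list[str]]:
    results, errors = {}, []
    for gene in genes:
        data, error = await http_client.request_api(
            url=f"https://api.example.com/genes/{gene}",
            request={},
            domain="example",
        )
        if error:
            errors.append(f"{gene}: {error.message}")
        else:
            results[gene] = data
    return results, errors  # report collected errors at the end
```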

## Creating Domain-Specific Adapters

For complex APIs, create an adapter class:

```python
from biomcp import http_client
from biomcp.http_client import RequestError

class MyAPIAdapter:
    """Adapter for MyAPI using centralized HTTP client."""

    def __init__(self):
        self.base_url = "https://api.example.com"

    async def get_resource(self, resource_id: str) -> tuple[dict | None, RequestError | None]:
        """Fetch a resource by ID.

        Returns:
            Tuple of (data, error) where one is always None
        """
        return await http_client.request_api(
            url=f"{self.base_url}/resources/{resource_id}",
            request={},
            domain="example",
            endpoint_key="example_resources"
        )
```

## Configuration

### Cache TTL (Time To Live)

```python
# Cache for 1 hour (3600 seconds)
data, error = await http_client.request_api(
    url=url,
    request=request,
    cache_ttl=3600
)

# Disable caching for this request
data, error = await http_client.request_api(
    url=url,
    request=request,
    cache_ttl=0
)
```

### Rate Limiting

Rate limits are configured per domain in `http_client.py`:

```python
# Default rate limits
rate_limits = {
    "ncbi.nlm.nih.gov": 20,  # 20 requests/second
    "clinicaltrials.gov": 10,  # 10 requests/second
    "myvariant.info": 1000/3600,  # 1000 requests/hour
}
```

### Circuit Breaker

The circuit breaker prevents cascading failures:

- **Closed**: Normal operation
- **Open**: Failing fast after the failure threshold is exceeded
- **Half-Open**: Testing if the service has recovered

Configure thresholds:

```python
CIRCUIT_BREAKER_FAILURE_THRESHOLD = 5  # Open after 5 failures
CIRCUIT_BREAKER_RECOVERY_TIMEOUT = 60  # Try again after 60 seconds
```

## Offline Mode

Enable offline mode to serve only cached responses:

```bash
export BIOMCP_OFFLINE=true
biomcp run
```

In offline mode:

- Only cached responses are returned
- No external HTTP requests are made
- Missing cache entries return None with an appropriate error
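Callers need no special handling for offline mode, since a cache miss surfaces through the same `(data, error)` pattern, but code can report it explicitly (a sketch; the environment variable is the one documented above, and `logger` follows the earlier examples):

```python
import os

async def fetch_with_offline_notice(url: str):
    data, error = await http_client.request_api(url=url, request={}, domain="example")
    if error and os.getenv("BIOMCP_OFFLINE", "false").lower() == "true":
        # In offline mode a miss means the response was never cached
        logger.warning(f"Offline mode: no cached response for {url}")
    return data
```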

## Performance Tuning

### Connection Pooling

The HTTP client maintains connection pools per domain:

```python
# Configure in http_client_simple.py
limits = httpx.Limits(
    max_keepalive_connections=20,
    max_connections=100,
    keepalive_expiry=30
)
```

### Concurrent Requests

For parallel requests to the same API:

```python
import asyncio

# Fetch multiple resources concurrently
tasks = [
    http_client.request_api(f"/resource/{i}", {}, domain="example")
    for i in range(10)
]
results = await asyncio.gather(*tasks)
```

## Monitoring and Debugging

### Request Metrics

The client tracks metrics per endpoint:

- Request count
- Error count
- Cache hit/miss ratio
- Average response time

Access metrics:

```python
from biomcp.http_client import get_metrics

metrics = get_metrics()
```

### Debug Logging

Enable debug logging to see all HTTP requests:

```python
import logging

logging.getLogger("biomcp.http_client").setLevel(logging.DEBUG)
```

## Best Practices

1. **Always use the centralized client** for external HTTP calls
2. **Register new endpoints** in the endpoint registry
3. **Set appropriate cache TTLs** based on data volatility
4. **Handle errors gracefully** with user-friendly messages
5. **Test with offline mode** to ensure cache coverage
6. **Monitor rate limits** to avoid API throttling
7. **Use domain-specific adapters** for complex APIs

## Endpoint Registration

Register new endpoints in `endpoint_registry.py`:

```python
registry.register(
    "my_api_endpoint",
    EndpointInfo(
        url="https://api.example.com/v1/data",
        category=EndpointCategory.BIOMEDICAL_LITERATURE,
        data_types=[DataType.RESEARCH_ARTICLES],
        description="My API for fetching data",
        compliance_notes="Public API, no PII",
        rate_limit="100 requests/minute"
    )
)
```

This ensures the endpoint is documented and tracked properly.
```
--------------------------------------------------------------------------------
/tests/tdd/articles/test_cbioportal_integration.py:
--------------------------------------------------------------------------------

```python
"""Test cBioPortal integration with article searches."""

import json

import pytest

from biomcp.articles.search import PubmedRequest
from biomcp.articles.unified import search_articles_unified


class TestArticleCBioPortalIntegration:
    """Test that cBioPortal summaries appear in article searches."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_article_search_with_gene_includes_cbioportal(self):
        """Test that searching articles for a gene includes cBioPortal summary."""
        request = PubmedRequest(
            genes=["BRAF"],
            keywords=["melanoma"],
        )

        # Test markdown output
        result = await search_articles_unified(
            request,
            include_pubmed=True,
            include_preprints=False,
            output_json=False,
        )

        # Should include cBioPortal summary
        assert "cBioPortal Summary for BRAF" in result
        assert "Mutation Frequency" in result
        # Top Hotspots is only included when mutations are found
        # When cBioPortal API returns empty data, it won't be present
        if "0.0%" not in result:  # If mutation frequency is not 0
            assert "Top Hotspots" in result
        assert "---" in result  # Separator between summary and articles

        # Should still include article results
        assert "pmid" in result or "Title" in result or "Record" in result

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_article_search_json_with_gene(self):
        """Test JSON output includes cBioPortal summary."""
        request = PubmedRequest(
            genes=["TP53"],
            keywords=["cancer"],
        )

        result = await search_articles_unified(
            request,
            include_pubmed=True,
            include_preprints=False,
            output_json=True,
        )

        # Parse JSON
        data = json.loads(result)

        # Should have both summary and articles
        assert "cbioportal_summary" in data
        assert "articles" in data
        assert "TP53" in data["cbioportal_summary"]
        assert isinstance(data["articles"], list)
        assert len(data["articles"]) > 0

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_article_search_without_gene_no_cbioportal(self):
        """Test that searches without genes don't include cBioPortal summary."""
        request = PubmedRequest(
            diseases=["hypertension"],
            keywords=["treatment"],
        )

        # Test markdown output
        result = await search_articles_unified(
            request,
            include_pubmed=True,
            include_preprints=False,
            output_json=False,
        )

        # Should NOT include cBioPortal summary
        assert "cBioPortal Summary" not in result
        assert "Mutation Frequency" not in result

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_article_search_multiple_genes(self):
        """Test that searching with multiple genes uses the first one."""
        request = PubmedRequest(
            genes=["KRAS", "NRAS", "BRAF"],
            diseases=["colorectal cancer"],
        )

        result = await search_articles_unified(
            request,
            include_pubmed=True,
            include_preprints=False,
            output_json=False,
        )

        # Should include cBioPortal summary for KRAS (first gene)
        assert "cBioPortal Summary for KRAS" in result
        # Common KRAS hotspot
        assert "G12" in result or "mutation" in result

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_article_search_with_invalid_gene(self):
        """Test graceful handling of invalid gene names."""
        request = PubmedRequest(
            genes=["BRCA1"],  # Valid gene
            keywords=["cancer"],
        )

        # First check that we handle invalid genes gracefully
        # by using a real gene that might have cBioPortal data
        result = await search_articles_unified(
            request,
            include_pubmed=True,
            include_preprints=False,
            output_json=False,
        )

        # Should have some content - either cBioPortal summary or articles
        assert len(result) > 50  # Some reasonable content

        # Now test with a gene that's valid for search but not in cBioPortal
        request2 = PubmedRequest(
            genes=["ACE2"],  # Real gene but might not be in cancer studies
            keywords=["COVID-19"],
        )

        result2 = await search_articles_unified(
            request2,
            include_pubmed=True,
            include_preprints=False,
            output_json=False,
        )

        # Should return results even if cBioPortal data is not available
        assert len(result2) > 50

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_article_search_with_preprints_and_cbioportal(self):
        """Test that cBioPortal summary works with preprint searches too."""
        request = PubmedRequest(
            genes=["EGFR"],
            keywords=["lung cancer", "osimertinib"],
        )

        result = await search_articles_unified(
            request,
            include_pubmed=True,
            include_preprints=True,
            output_json=False,
        )

        # Should include cBioPortal summary
        assert "cBioPortal Summary for EGFR" in result

        # Should include both peer-reviewed and preprint results
        assert ("pmid" in result or "Title" in result) and (
            "Preprint" in result
            or "bioRxiv" in result
            or "peer_reviewed" in result
        )
```
--------------------------------------------------------------------------------
/src/biomcp/diseases/getter.py:
--------------------------------------------------------------------------------

```python
"""Disease information retrieval from MyDisease.info."""

import json
import logging
from typing import Annotated

from pydantic import Field

from ..integrations import BioThingsClient
from ..render import to_markdown

logger = logging.getLogger(__name__)


def _add_disease_links(disease_info, result: dict) -> None:
    """Add helpful links to disease result."""
    links = {}

    # Add MONDO browser link if available
    if (
        disease_info.mondo
        and isinstance(disease_info.mondo, dict)
        and (mondo_id := disease_info.mondo.get("mondo"))
        and isinstance(mondo_id, str)
        and mondo_id.startswith("MONDO:")
    ):
        links["MONDO Browser"] = (
            f"https://www.ebi.ac.uk/ols/ontologies/mondo/terms?iri=http://purl.obolibrary.org/obo/{mondo_id.replace(':', '_')}"
        )

    # Add Disease Ontology link if available
    if (
        disease_info.xrefs
        and isinstance(disease_info.xrefs, dict)
        and (doid := disease_info.xrefs.get("doid"))
    ):
        if isinstance(doid, list) and doid:
            doid_id = doid[0] if isinstance(doid[0], str) else str(doid[0])
            links["Disease Ontology"] = (
                f"https://www.disease-ontology.org/?id={doid_id}"
            )
        elif isinstance(doid, str):
            links["Disease Ontology"] = (
                f"https://www.disease-ontology.org/?id={doid}"
            )

    # Add PubMed search link
    if disease_info.name:
        links["PubMed Search"] = (
            f"https://pubmed.ncbi.nlm.nih.gov/?term={disease_info.name.replace(' ', '+')}"
        )

    if links:
        result["_links"] = links


def _format_disease_output(disease_info, result: dict) -> None:
    """Format disease output for display."""
    # Format synonyms nicely
    if disease_info.synonyms:
        result["synonyms"] = ", ".join(
            disease_info.synonyms[:10]
        )  # Limit to first 10
        if len(disease_info.synonyms) > 10:
            result["synonyms"] += (
                f" (and {len(disease_info.synonyms) - 10} more)"
            )

    # Format phenotypes if present
    if disease_info.phenotypes:
        # Just show count and first few phenotypes
        phenotype_names = []
        for pheno in disease_info.phenotypes[:5]:
            if isinstance(pheno, dict) and "phenotype" in pheno:
                phenotype_names.append(pheno["phenotype"])
        if phenotype_names:
            result["associated_phenotypes"] = ", ".join(phenotype_names)
            if len(disease_info.phenotypes) > 5:
                result["associated_phenotypes"] += (
                    f" (and {len(disease_info.phenotypes) - 5} more)"
                )
        # Remove the raw phenotypes data for cleaner output
        result.pop("phenotypes", None)


async def get_disease(
    disease_id_or_name: str,
    output_json: bool = False,
) -> str:
    """
    Get disease information from MyDisease.info.

    Args:
        disease_id_or_name: Disease ID (MONDO, DOID) or name (e.g., "melanoma", "MONDO:0016575")
        output_json: Return as JSON instead of markdown

    Returns:
        Disease information as markdown or JSON string
    """
    client = BioThingsClient()

    try:
        disease_info = await client.get_disease_info(disease_id_or_name)

        if not disease_info:
            error_data = {
                "error": f"Disease '{disease_id_or_name}' not found",
                "suggestion": "Please check the disease name or ID (MONDO:, DOID:, OMIM:, MESH:)",
            }
            return (
                json.dumps(error_data, indent=2)
                if output_json
                else to_markdown([error_data])
            )

        # Convert to dict for rendering
        result = disease_info.model_dump(exclude_none=True)

        # Add helpful links
        _add_disease_links(disease_info, result)

        # Format output for display
        _format_disease_output(disease_info, result)

        if output_json:
            return json.dumps(result, indent=2)
        else:
            return to_markdown([result])

    except Exception as e:
        logger.error(
            f"Error fetching disease info for {disease_id_or_name}: {e}"
        )
        error_data = {
            "error": "Failed to retrieve disease information",
            "details": str(e),
        }
        return (
            json.dumps(error_data, indent=2)
            if output_json
            else to_markdown([error_data])
        )


async def _disease_details(
    call_benefit: Annotated[
        str,
        "Define and summarize why this function is being called and the intended benefit",
    ],
    disease_id_or_name: Annotated[
        str,
        Field(
            description="Disease name (e.g., melanoma, GIST) or ID (e.g., MONDO:0016575, DOID:1909)"
        ),
    ],
) -> str:
    """
    Retrieves detailed information for a disease from MyDisease.info.

    This tool provides real-time disease annotations including:
    - Official disease name and definition
    - Disease synonyms and alternative names
    - Ontology mappings (MONDO, DOID, OMIM, etc.)
    - Associated phenotypes
    - Links to disease databases

    Parameters:
    - call_benefit: Define why this function is being called
    - disease_id_or_name: Disease name or ontology ID

    Process: Queries MyDisease.info API for up-to-date disease information
    Output: Markdown formatted disease information with definition and metadata

    Note: For clinical trials about diseases, use trial_searcher.
    For articles about diseases, use article_searcher.
    """
    return await get_disease(disease_id_or_name, output_json=False)
```
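A quick usage sketch (assuming the package is installed and network access to MyDisease.info; the IDs are the examples from the docstring):

```python
import asyncio

from biomcp.diseases.getter import get_disease

# Lookup by common name, rendered as markdown
print(asyncio.run(get_disease("melanoma")))

# Lookup by MONDO ID, rendered as JSON
print(asyncio.run(get_disease("MONDO:0016575", output_json=True)))
```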
--------------------------------------------------------------------------------
/src/biomcp/connection_pool.py:
--------------------------------------------------------------------------------

```python
"""Connection pool manager with proper event loop lifecycle management.

This module provides HTTP connection pooling that is properly integrated
with asyncio event loops. It ensures that connection pools are:

- Created per event loop to avoid cross-loop usage
- Automatically cleaned up when event loops are garbage collected
- Reused across requests for better performance

Key Features:
- Event loop isolation - each loop gets its own pools
- Weak references prevent memory leaks
- Automatic cleanup on loop destruction
- Thread-safe pool management

Example:
    ```python
    # Get a connection pool for the current event loop
    pool = await get_connection_pool(verify=True, timeout=httpx.Timeout(30))

    # Use the pool for multiple requests (no need to close)
    response = await pool.get("https://api.example.com/data")
    ```

Environment Variables:
    BIOMCP_USE_CONNECTION_POOL: Enable/disable pooling (default: "true")
"""

import asyncio
import ssl
import weakref

# NOTE: httpx import is allowed in this file for connection pooling infrastructure
import httpx


class EventLoopConnectionPools:
    """Manages connection pools per event loop.

    This class ensures that each asyncio event loop has its own set of
    connection pools, preventing cross-loop contamination and ensuring
    proper cleanup when event loops are destroyed.

    Attributes:
        _loop_pools: Weak key dictionary mapping event loops to their pools
        _lock: Asyncio lock for thread-safe pool creation
    """

    def __init__(self):
        # Use weak references to avoid keeping event loops alive
        self._loop_pools: weakref.WeakKeyDictionary = (
            weakref.WeakKeyDictionary()
        )
        self._lock = asyncio.Lock()

    async def get_pool(
        self, verify: ssl.SSLContext | str | bool, timeout: httpx.Timeout
    ) -> httpx.AsyncClient:
        """Get or create a connection pool for the current event loop."""
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No event loop running, return a single-use client
            return self._create_client(verify, timeout, pooled=False)

        # Get or create pools dict for this event loop
        async with self._lock:
            if loop not in self._loop_pools:
                self._loop_pools[loop] = {}
                # Register cleanup when loop is garbage collected
                self._register_loop_cleanup(loop)

            pools = self._loop_pools[loop]
            pool_key = self._get_pool_key(verify)

            # Check if we have a valid pool
            if pool_key in pools and not pools[pool_key].is_closed:
                return pools[pool_key]

            # Create new pool
            client = self._create_client(verify, timeout, pooled=True)
            pools[pool_key] = client
            return client

    def _get_pool_key(self, verify: ssl.SSLContext | str | bool) -> str:
        """Generate a key for the connection pool."""
        if isinstance(verify, ssl.SSLContext):
            return f"ssl_{id(verify)}"
        return str(verify)

    def _create_client(
        self,
        verify: ssl.SSLContext | str | bool,
        timeout: httpx.Timeout,
        pooled: bool = True,
    ) -> httpx.AsyncClient:
        """Create a new HTTP client."""
        if pooled:
            limits = httpx.Limits(
                max_keepalive_connections=20,
                max_connections=100,
                keepalive_expiry=30,
            )
        else:
            # Single-use client
            limits = httpx.Limits(max_keepalive_connections=0)

        return httpx.AsyncClient(
            verify=verify,
            http2=False,  # HTTP/2 can add overhead
            timeout=timeout,
            limits=limits,
        )

    def _register_loop_cleanup(self, loop: asyncio.AbstractEventLoop):
        """Register cleanup when event loop is garbage collected."""
        # Store pools to close when loop is garbage collected
        # Note: We can't create weak references to dicts, so we'll
        # clean up pools when the loop itself is garbage collected

        def cleanup():
            # Get pools for this loop if they still exist
            pools = self._loop_pools.get(loop, {})
            if pools:
                # Try to close all clients gracefully
                for client in list(pools.values()):
                    if client and not client.is_closed:
                        # Close synchronously since loop might be gone
                        import contextlib

                        with contextlib.suppress(Exception):
                            client._transport.close()

        # Register finalizer on the loop itself
        weakref.finalize(loop, cleanup)

    async def close_all(self):
        """Close all connection pools."""
        async with self._lock:
            all_clients = []
            for pools in self._loop_pools.values():
                all_clients.extend(pools.values())

            # Close all clients
            close_tasks = []
            for client in all_clients:
                if client and not client.is_closed:
                    close_tasks.append(client.aclose())

            if close_tasks:
                await asyncio.gather(*close_tasks, return_exceptions=True)

            self._loop_pools.clear()


# Global instance
_pool_manager = EventLoopConnectionPools()


async def get_connection_pool(
    verify: ssl.SSLContext | str | bool,
    timeout: httpx.Timeout,
) -> httpx.AsyncClient:
    """Get a connection pool for the current event loop."""
    return await _pool_manager.get_pool(verify, timeout)


async def close_all_pools():
    """Close all connection pools."""
    await _pool_manager.close_all()
```
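Because pools are tied to the event loop that created them, test suites that spin up a loop per test can close them explicitly. A hedged sketch (assumes pytest-asyncio; the fixture name is illustrative):

```python
import pytest_asyncio

from biomcp.connection_pool import close_all_pools


@pytest_asyncio.fixture(autouse=True)
async def _clean_pools():
    # Run the test, then close any pools created on this event loop
    yield
    await close_all_pools()
```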
--------------------------------------------------------------------------------
/src/biomcp/parameter_parser.py:
--------------------------------------------------------------------------------

```python
"""Parameter parsing and validation for BioMCP."""

import json
import logging
from typing import Any

from biomcp.exceptions import InvalidParameterError

logger = logging.getLogger(__name__)


class ParameterParser:
    """Handles parameter parsing and validation for search requests."""

    @staticmethod
    def parse_list_param(
        param: str | list[str] | None, param_name: str
    ) -> list[str] | None:
        """Convert various input formats to lists.

        Handles:
        - JSON arrays: '["item1", "item2"]' -> ['item1', 'item2']
        - Comma-separated: 'item1, item2' -> ['item1', 'item2']
        - Single values: 'item' -> ['item']
        - None values: None -> None
        - Already parsed lists: ['item'] -> ['item']

        Args:
            param: The parameter to parse
            param_name: Name of the parameter for error messages

        Returns:
            Parsed list or None

        Raises:
            InvalidParameterError: If parameter cannot be parsed
        """
        if param is None:
            return None

        if isinstance(param, str):
            # First try to parse as JSON array
            if param.startswith("["):
                try:
                    parsed = json.loads(param)
                    if not isinstance(parsed, list):
                        raise InvalidParameterError(
                            param_name,
                            param,
                            "JSON array or comma-separated string",
                        )
                    return parsed
                except (json.JSONDecodeError, TypeError) as e:
                    logger.debug(f"Failed to parse {param_name} as JSON: {e}")

            # If it's a comma-separated string, split it
            if "," in param:
                return [item.strip() for item in param.split(",")]

            # Otherwise return as single-item list
            return [param]

        # If it's already a list, validate and return as-is
        if isinstance(param, list):
            # Validate all items are strings
            if not all(isinstance(item, str) for item in param):
                raise InvalidParameterError(
                    param_name, param, "list of strings"
                )
            return param

        # Invalid type
        raise InvalidParameterError(
            param_name, param, "string, list of strings, or None"
        )

    @staticmethod
    def normalize_phase(phase: str | None) -> str | None:
        """Normalize phase values for clinical trials.

        Converts various formats to standard enum values:
        - "Phase 3" -> "PHASE3"
        - "phase 3" -> "PHASE3"
        - "PHASE 3" -> "PHASE3"
        - "phase3" -> "PHASE3"

        Args:
            phase: Phase value to normalize

        Returns:
            Normalized phase value or None
        """
        if phase is None:
            return None

        # Convert to uppercase and remove spaces
        normalized = phase.upper().replace(" ", "")

        # Validate it matches expected pattern
        valid_phases = [
            "EARLYPHASE1",
            "PHASE1",
            "PHASE2",
            "PHASE3",
            "PHASE4",
            "NOTAPPLICABLE",
        ]

        if normalized not in valid_phases:
            # Try to be helpful with common mistakes
            if "EARLY" in normalized and "1" in normalized:
                return "EARLYPHASE1"
            if "NOT" in normalized and "APPLICABLE" in normalized:
                return "NOTAPPLICABLE"

            raise InvalidParameterError(
                "phase", phase, f"one of: {', '.join(valid_phases)}"
            )

        return normalized

    @staticmethod
    def validate_page_params(page: int, page_size: int) -> tuple[int, int]:
        """Validate pagination parameters.

        Args:
            page: Page number (minimum 1)
            page_size: Results per page (1-100)

        Returns:
            Validated (page, page_size) tuple

        Raises:
            InvalidParameterError: If parameters are invalid
        """
        if page < 1:
            raise InvalidParameterError("page", page, "integer >= 1")

        if page_size < 1 or page_size > 100:
            raise InvalidParameterError(
                "page_size", page_size, "integer between 1 and 100"
            )

        return page, page_size

    @staticmethod
    def parse_search_params(
        params: dict[str, Any], domain: str
    ) -> dict[str, Any]:
        """Parse and validate all search parameters for a domain.

        Args:
            params: Raw parameters dictionary
            domain: Domain being searched

        Returns:
            Validated parameters dictionary
        """
        parsed: dict[str, Any] = {}

        # Common list parameters
        list_params = [
            "genes",
            "diseases",
            "variants",
            "chemicals",
            "keywords",
            "conditions",
            "interventions",
        ]

        for param_name in list_params:
            if param_name in params and params[param_name] is not None:
                parsed[param_name] = ParameterParser.parse_list_param(
                    params[param_name], param_name
                )

        # Domain-specific parameters
        if (
            domain == "trial"
            and "phase" in params
            and params.get("phase") is not None
        ):
            parsed["phase"] = ParameterParser.normalize_phase(
                params.get("phase")
            )

        # Pass through other parameters
        for key, value in params.items():
            if key not in parsed and key not in list_params and key != "phase":
                parsed[key] = value

        return parsed
```
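A few illustrative calls against the parser above (a sketch; the inputs mirror the formats documented in the docstrings):

```python
from biomcp.parameter_parser import ParameterParser

# All of these normalize to a list of strings
assert ParameterParser.parse_list_param('["BRAF", "KRAS"]', "genes") == ["BRAF", "KRAS"]
assert ParameterParser.parse_list_param("BRAF, KRAS", "genes") == ["BRAF", "KRAS"]
assert ParameterParser.parse_list_param("BRAF", "genes") == ["BRAF"]

# Phase strings are normalized to the ClinicalTrials.gov enum form
assert ParameterParser.normalize_phase("Phase 3") == "PHASE3"
assert ParameterParser.normalize_phase("early phase 1") == "EARLYPHASE1"
```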
--------------------------------------------------------------------------------
/src/biomcp/openfda/drug_labels.py:
--------------------------------------------------------------------------------

```python
"""
OpenFDA Drug Labels (SPL) integration.
"""

import logging

from .constants import (
    OPENFDA_DEFAULT_LIMIT,
    OPENFDA_DISCLAIMER,
    OPENFDA_DRUG_LABELS_URL,
    OPENFDA_MAX_LIMIT,
)
from .drug_labels_helpers import (
    build_label_search_query,
    format_label_header,
    format_label_section,
    format_label_summary,
    get_default_sections,
    get_section_titles,
)
from .utils import clean_text, format_count, make_openfda_request

logger = logging.getLogger(__name__)


async def search_drug_labels(
    name: str | None = None,
    indication: str | None = None,
    boxed_warning: bool = False,
    section: str | None = None,
    limit: int = OPENFDA_DEFAULT_LIMIT,
    skip: int = 0,
    api_key: str | None = None,
) -> str:
    """
    Search FDA drug product labels (SPL).

    Args:
        name: Drug name to search for
        indication: Search for drugs indicated for this condition
        boxed_warning: Filter for drugs with boxed warnings
        section: Specific label section to search
        limit: Maximum number of results
        skip: Number of results to skip
        api_key: Optional OpenFDA API key (overrides OPENFDA_API_KEY env var)

    Returns:
        Formatted string with drug label information
    """
    if not name and not indication and not section and not boxed_warning:
        return (
            "⚠️ Please specify a drug name, indication, or label section to search.\n\n"
            "Examples:\n"
            "- Search by name: --name 'pembrolizumab'\n"
            "- Search by indication: --indication 'melanoma'\n"
            "- Search by section: --section 'contraindications'"
        )

    # Build and execute search
    search_query = build_label_search_query(
        name, indication, boxed_warning, section
    )

    params = {
        "search": search_query,
        "limit": min(limit, OPENFDA_MAX_LIMIT),
        "skip": skip,
    }

    response, error = await make_openfda_request(
        OPENFDA_DRUG_LABELS_URL, params, "openfda_drug_labels", api_key
    )

    if error:
        return f"⚠️ Error searching drug labels: {error}"

    if not response or not response.get("results"):
        return _format_no_results(name, indication, section)

    results = response["results"]
    total = (
        response.get("meta", {}).get("results", {}).get("total", len(results))
    )

    # Build output
    output = ["## FDA Drug Labels\n"]
    output.extend(_format_search_summary(name, indication, section, total))

    # Display results
    output.append(
        f"### Results (showing {min(len(results), 5)} of {total}):\n"
    )

    for i, result in enumerate(results[:5], 1):
        output.extend(format_label_summary(result, i))

    # Add tip for getting full labels
    if total > 0 and results and "set_id" in results[0]:
        output.append(
            "\n💡 **Tip**: Use `biomcp openfda label-get <label_id>` to retrieve "
            "the complete label for any drug."
        )

    output.append(f"\n{OPENFDA_DISCLAIMER}")

    return "\n".join(output)


def _format_no_results(
    name: str | None, indication: str | None, section: str | None
) -> str:
    """Format no results message."""
    search_desc = []
    if name:
        search_desc.append(f"drug '{name}'")
    if indication:
        search_desc.append(f"indication '{indication}'")
    if section:
        search_desc.append(f"section '{section}'")

    return f"No drug labels found for {' and '.join(search_desc)}."


def _format_search_summary(
    name: str | None, indication: str | None, section: str | None, total: int
) -> list[str]:
    """Format the search summary."""
    output = []
    search_desc = []
    if name:
        search_desc.append(f"**Drug**: {name}")
    if indication:
        search_desc.append(f"**Indication**: {indication}")
    if section:
        search_desc.append(f"**Section**: {section}")

    if search_desc:
        output.append(" | ".join(search_desc))

    output.append(f"**Total Labels Found**: {format_count(total, 'label')}\n")
    return output


async def get_drug_label(
    set_id: str,
    sections: list[str] | None = None,
    api_key: str | None = None,
) -> str:
    """
    Get detailed drug label information by set ID.

    Args:
        set_id: Label set ID
        sections: Specific sections to retrieve (default: key sections)
        api_key: Optional OpenFDA API key (overrides OPENFDA_API_KEY env var)

    Returns:
        Formatted string with detailed label information
    """
    params = {
        "search": f'set_id:"{set_id}"',
        "limit": 1,
    }

    response, error = await make_openfda_request(
        OPENFDA_DRUG_LABELS_URL, params, "openfda_drug_label_detail", api_key
    )

    if error:
        return f"⚠️ Error retrieving drug label: {error}"

    if not response or not response.get("results"):
        return f"Drug label with ID '{set_id}' not found."

    result = response["results"][0]

    # Use default sections if not specified
    if not sections:
        sections = get_default_sections()

    # Build output
    output = format_label_header(result, set_id)

    # Boxed warning (if exists)
    if "boxed_warning" in result:
        output.extend(_format_boxed_warning(result["boxed_warning"]))

    # Display requested sections
    section_titles = get_section_titles()
    for section in sections:
        output.extend(format_label_section(result, section, section_titles))

    output.append(f"\n{OPENFDA_DISCLAIMER}")

    return "\n".join(output)


def _format_boxed_warning(boxed_warning: list) -> list[str]:
    """Format boxed warning section."""
    output = ["### ⚠️ BOXED WARNING\n"]
    warning_text = clean_text(" ".join(boxed_warning))
    output.append(warning_text)
    output.append("")
    return output
```
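A minimal invocation sketch (assumes network access to openFDA; the set ID below is a hypothetical placeholder — real IDs come back from the search results):

```python
import asyncio

from biomcp.openfda.drug_labels import get_drug_label, search_drug_labels


async def main() -> None:
    # Search labels by indication, then fetch one in full by its set ID
    print(await search_drug_labels(indication="melanoma", limit=5))
    print(await get_drug_label("<set-id-from-search>"))  # placeholder ID


asyncio.run(main())
```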
--------------------------------------------------------------------------------
/src/biomcp/cli/articles.py:
--------------------------------------------------------------------------------

```python
import asyncio
import json
from typing import Annotated

import typer

from ..articles import fetch
from ..articles.search import PubmedRequest, search_articles
from ..articles.unified import search_articles_unified

article_app = typer.Typer(help="Search and retrieve biomedical articles.")


async def get_article_details(
    identifier: str, output_json: bool = False
) -> str:
    """Get article details handling both PMIDs and DOIs with proper output format."""
    # Use the fetch module functions directly to control output format
    if fetch.is_doi(identifier):
        from ..articles.preprints import fetch_europe_pmc_article

        return await fetch_europe_pmc_article(
            identifier, output_json=output_json
        )
    elif fetch.is_pmid(identifier):
        return await fetch.fetch_articles(
            [int(identifier)], full=True, output_json=output_json
        )
    else:
        # Unknown identifier format
        error_data = [
            {
                "error": f"Invalid identifier format: {identifier}. Expected either a PMID (numeric) or DOI (10.xxxx/xxxx format)."
            }
        ]
        if output_json:
            return json.dumps(error_data, indent=2)
        else:
            from .. import render

            return render.to_markdown(error_data)


@article_app.command("search")
def search_article(
    genes: Annotated[
        list[str] | None,
        typer.Option(
            "--gene",
            "-g",
            help="Gene name to search for (can be specified multiple times)",
        ),
    ] = None,
    variants: Annotated[
        list[str] | None,
        typer.Option(
            "--variant",
            "-v",
            help="Genetic variant to search for (can be specified multiple times)",
        ),
    ] = None,
    diseases: Annotated[
        list[str] | None,
        typer.Option(
            "--disease",
            "-d",
            help="Disease name to search for (can be specified multiple times)",
        ),
    ] = None,
    chemicals: Annotated[
        list[str] | None,
        typer.Option(
            "--chemical",
            "-c",
            help="Chemical name to search for (can be specified multiple times)",
        ),
    ] = None,
    keywords: Annotated[
        list[str] | None,
        typer.Option(
            "--keyword",
            "-k",
            help="Keyword to search for (can be specified multiple times)",
        ),
    ] = None,
    page: Annotated[
        int,
        typer.Option(
            "--page",
            "-p",
            help="Page number for pagination (starts at 1)",
        ),
    ] = 1,
    output_json: Annotated[
        bool,
        typer.Option(
            "--json",
            "-j",
            help="Render in JSON format",
            case_sensitive=False,
        ),
    ] = False,
    include_preprints: Annotated[
        bool,
        typer.Option(
            "--include-preprints/--no-preprints",
            help="Include preprint articles from bioRxiv/medRxiv and Europe PMC",
        ),
    ] = True,
):
    """Search biomedical research articles"""
    request = PubmedRequest(
        genes=genes or [],
        variants=variants or [],
        diseases=diseases or [],
        chemicals=chemicals or [],
        keywords=keywords or [],
    )

    if include_preprints:
        result = asyncio.run(
            search_articles_unified(
                request,
                include_pubmed=True,
                include_preprints=True,
                output_json=output_json,
            )
        )
    else:
        result = asyncio.run(search_articles(request, output_json))

    typer.echo(result)


@article_app.command("get")
def get_article(
    identifiers: Annotated[
        list[str],
        typer.Argument(
            help="Article identifiers - PubMed IDs (e.g., 38768446) or DOIs (e.g., 10.1101/2024.01.20.23288905)",
        ),
    ],
    full: Annotated[
        bool,
        typer.Option(
            "--full",
            "-f",
            help="Whether to fetch full article text (PubMed only)",
        ),
    ] = False,
    output_json: Annotated[
        bool,
        typer.Option(
            "--json",
            "-j",
            help="Render in JSON format",
            case_sensitive=False,
        ),
    ] = False,
):
    """
    Retrieve articles by PubMed ID or DOI.

    Supports:
    - PubMed IDs for published articles (e.g., 38768446)
    - DOIs for Europe PMC preprints (e.g., 10.1101/2024.01.20.23288905)

    For multiple articles, results are returned as a list.
    """
    # Handle single identifier
    if len(identifiers) == 1:
        result = asyncio.run(
            get_article_details(identifiers[0], output_json=output_json)
        )
    else:
        # For multiple identifiers, we need to handle them individually
        # since they might be a mix of PMIDs and DOIs
        results = []
        for identifier in identifiers:
            article_result = asyncio.run(
                get_article_details(identifier, output_json=True)
            )
            # Parse the result and add to list
            try:
                article_data = json.loads(article_result)
                if isinstance(article_data, list):
                    results.extend(article_data)
                else:
                    results.append(article_data)
            except json.JSONDecodeError:
                # This shouldn't happen with our new function
                results.append({
                    "error": f"Failed to parse result for {identifier}"
                })

        if output_json:
            result = json.dumps(results, indent=2)
        else:
            from .. import render

            result = render.to_markdown(results)

    typer.echo(result)
```
--------------------------------------------------------------------------------
/tests/tdd/variants/test_extract_gene_aa_change.py:
--------------------------------------------------------------------------------

```python
"""Tests for _extract_gene_aa_change method in external.py."""

import pytest

from biomcp.variants.external import ExternalVariantAggregator


class TestExtractGeneAAChange:
    """Test the _extract_gene_aa_change method."""

    @pytest.fixture
    def aggregator(self):
        """Create an ExternalVariantAggregator instance."""
        return ExternalVariantAggregator()

    def test_extract_from_docm(self, aggregator):
        """Test extraction from DOCM data."""
        variant_data = {"docm": {"gene": "BRAF", "aa_change": "p.V600E"}}

        result = aggregator._extract_gene_aa_change(variant_data)
        assert result == "BRAF V600E"

    def test_extract_from_hgvsp_long_format(self, aggregator):
        """Test extraction from hgvsp with long amino acid names."""
        variant_data = {
            "cadd": {"gene": {"genename": "TP53"}},
            "hgvsp": ["p.Arg175His"],
        }

        result = aggregator._extract_gene_aa_change(variant_data)
        # The code doesn't convert all long forms, just checks for Val/Ala
        assert result == "TP53 Arg175His"

    def test_extract_from_hgvsp_with_dbnsfp(self, aggregator):
        """Test extraction from hgvsp with dbnsfp gene name."""
        variant_data = {
            "dbnsfp": {"genename": "EGFR"},
            "hgvsp": ["p.Leu858Arg"],
        }

        result = aggregator._extract_gene_aa_change(variant_data)
        # The code doesn't convert Leu/Arg to L/R
        assert result == "EGFR Leu858Arg"

    def test_extract_from_cadd_data(self, aggregator):
        """Test extraction from CADD annotations."""
        variant_data = {
            "cadd": {
                "gene": {"genename": "KRAS", "prot": {"protpos": 12}},
                "oaa": "G",
                "naa": "D",
            }
        }

        result = aggregator._extract_gene_aa_change(variant_data)
        assert result == "KRAS G12D"

    def test_extract_from_docm_without_p_prefix(self, aggregator):
        """Test extraction from DOCM without p. prefix."""
        variant_data = {"docm": {"gene": "PIK3CA", "aa_change": "E545K"}}

        result = aggregator._extract_gene_aa_change(variant_data)
        assert result == "PIK3CA E545K"

    def test_extract_with_multiple_hgvsp(self, aggregator):
        """Test handling of multiple hgvsp entries - should take first."""
        variant_data = {
            "cadd": {"gene": {"genename": "BRCA1"}},
            "hgvsp": ["p.Gln1756Ter", "p.Gln1756*"],
        }

        result = aggregator._extract_gene_aa_change(variant_data)
        # Takes the first one, doesn't convert Gln/Ter
        assert result == "BRCA1 Gln1756Ter"

    def test_extract_with_special_characters(self, aggregator):
        """Test extraction with special characters in protein change."""
        variant_data = {
            "cadd": {"gene": {"genename": "MLH1"}},
            "hgvsp": ["p.Lys618Alafs*9"],
        }

        result = aggregator._extract_gene_aa_change(variant_data)
        # Should extract the basic AA change pattern
        assert result is not None
        assert "MLH1" in result

    def test_extract_no_gene_name(self, aggregator):
        """Test when gene name is missing."""
        variant_data = {"hgvsp": ["p.Val600Glu"]}

        result = aggregator._extract_gene_aa_change(variant_data)
        assert result is None

    def test_extract_no_aa_change(self, aggregator):
        """Test when AA change is missing."""
        variant_data = {"cadd": {"gene": {"genename": "BRAF"}}}

        result = aggregator._extract_gene_aa_change(variant_data)
        assert result is None

    def test_extract_empty_variant_data(self, aggregator):
        """Test with empty variant data."""
        result = aggregator._extract_gene_aa_change({})
        assert result is None

    def test_extract_malformed_hgvsp(self, aggregator):
        """Test with malformed HGVS protein notation."""
        variant_data = {
            "clinvar": {
                "gene": {"symbol": "MYC"},
                "hgvs": {"protein": ["invalid_format"]},
            }
        }

        result = aggregator._extract_gene_aa_change(variant_data)
        assert result is None

    def test_extract_priority_order(self, aggregator):
        """Test that DOCM is prioritized for AA change, CADD for gene name."""
        variant_data = {
            "docm": {"gene": "BRAF", "aa_change": "p.V600E"},
            "hgvsp": ["p.Val600Lys"],  # Different change
            "cadd": {
                "gene": {"genename": "WRONG", "prot": {"protpos": 600}},
                "oaa": "V",
                "naa": "K",
            },
        }

        result = aggregator._extract_gene_aa_change(variant_data)
        # CADD is prioritized for gene name, DOCM for AA change
        assert result == "WRONG V600E"

    def test_extract_regex_with_val_ala(self, aggregator):
        """Test regex extraction when Val/Ala are present."""
        # The code specifically looks for Val or Ala to trigger regex
        variant_data = {
            "cadd": {"gene": {"genename": "TEST1"}},
            "hgvsp": ["p.Val600Ala"],
        }

        result = aggregator._extract_gene_aa_change(variant_data)
        # The regex doesn't find a match in "Val600Ala" because it's looking
        # for [A-Z]\d+[A-Z], which would match "V600A" but not "Val600Ala"
        assert result == "TEST1 Val600Ala"

    def test_extract_handles_exceptions_gracefully(self, aggregator):
        """Test that exceptions are handled gracefully."""
        # This should trigger an exception internally but return None
        variant_data = {
            "cadd": {"gene": {"genename": "GENE"}},
            "hgvsp": None,  # This will cause issues
        }

        result = aggregator._extract_gene_aa_change(variant_data)
        assert result is None
```
are properly registered in constants.""" from biomcp.constants import ( DOMAIN_TO_PLURAL, PLURAL_TO_DOMAIN, VALID_DOMAINS, VALID_DOMAINS_PLURAL, ) # List of OpenFDA domains openfda_domains = [ "fda_adverse", "fda_label", "fda_device", "fda_approval", "fda_recall", "fda_shortage", ] openfda_plurals = [ "fda_adverse_events", "fda_labels", "fda_device_events", "fda_approvals", "fda_recalls", "fda_shortages", ] # Check that all OpenFDA domains are registered for domain in openfda_domains: assert domain in VALID_DOMAINS, f"{domain} not in VALID_DOMAINS" assert ( domain in DOMAIN_TO_PLURAL ), f"{domain} not in DOMAIN_TO_PLURAL" # Check plural forms for plural in openfda_plurals: assert ( plural in VALID_DOMAINS_PLURAL ), f"{plural} not in VALID_DOMAINS_PLURAL" assert ( plural in PLURAL_TO_DOMAIN ), f"{plural} not in PLURAL_TO_DOMAIN" # Check mappings are correct assert DOMAIN_TO_PLURAL["fda_adverse"] == "fda_adverse_events" assert DOMAIN_TO_PLURAL["fda_label"] == "fda_labels" assert DOMAIN_TO_PLURAL["fda_device"] == "fda_device_events" assert DOMAIN_TO_PLURAL["fda_approval"] == "fda_approvals" assert DOMAIN_TO_PLURAL["fda_recall"] == "fda_recalls" assert DOMAIN_TO_PLURAL["fda_shortage"] == "fda_shortages" assert PLURAL_TO_DOMAIN["fda_adverse_events"] == "fda_adverse" assert PLURAL_TO_DOMAIN["fda_labels"] == "fda_label" assert PLURAL_TO_DOMAIN["fda_device_events"] == "fda_device" assert PLURAL_TO_DOMAIN["fda_approvals"] == "fda_approval" assert PLURAL_TO_DOMAIN["fda_recalls"] == "fda_recall" assert PLURAL_TO_DOMAIN["fda_shortages"] == "fda_shortage" def test_openfda_search_domain_type_hints(self): """Test that OpenFDA domains are in search tool type hints.""" import inspect from biomcp.router import search # Get the function signature sig = inspect.signature(search) domain_param = sig.parameters.get("domain") # Check if domain parameter exists assert ( domain_param is not None ), "domain parameter not found in search function" # Get the annotation annotation = domain_param.annotation # The annotation should be a Literal type that includes OpenFDA domains # We can't directly check the Literal values due to how Python handles it, # but we can verify that it's properly annotated assert ( annotation is not None ), "domain parameter has no type annotation" def test_openfda_fetch_domain_type_hints(self): """Test that OpenFDA domains are in fetch tool type hints.""" import inspect from biomcp.router import fetch # Get the function signature sig = inspect.signature(fetch) domain_param = sig.parameters.get("domain") # Check if domain parameter exists assert ( domain_param is not None ), "domain parameter not found in fetch function" # Get the annotation annotation = domain_param.annotation # The annotation should be a Literal type that includes OpenFDA domains assert ( annotation is not None ), "domain parameter has no type annotation" @pytest.mark.asyncio async def test_openfda_search_basic_call(self): """Test that OpenFDA domain search doesn't raise errors with basic call.""" from unittest.mock import AsyncMock, patch # Mock the OpenFDA search function that will be imported with patch( "biomcp.openfda.adverse_events.search_adverse_events", new_callable=AsyncMock, ) as mock_search: mock_search.return_value = ( "## FDA Adverse Event Reports\n\nTest results" ) from biomcp.router import search # This should not raise an error result = await search( query=None, # Required parameter domain="fda_adverse", chemicals=["test"], page_size=1, ) # Basic check that result has expected structure assert 
isinstance(result, dict) assert "results" in result @pytest.mark.asyncio async def test_openfda_fetch_basic_call(self): """Test that OpenFDA domain fetch doesn't raise errors with basic call.""" from unittest.mock import AsyncMock, patch # Mock the OpenFDA get function that will be imported with patch( "biomcp.openfda.drug_approvals.get_drug_approval", new_callable=AsyncMock, ) as mock_get: mock_get.return_value = "## Drug Approval Details\n\nTest details" from biomcp.router import fetch # This should not raise an error result = await fetch( id="TEST123", domain="fda_approval", ) # Basic check that result has expected structure assert isinstance(result, dict) assert "title" in result assert "text" in result assert "metadata" in result ``` -------------------------------------------------------------------------------- /tests/tdd/articles/test_preprints.py: -------------------------------------------------------------------------------- ```python """Tests for preprint search functionality.""" from unittest.mock import AsyncMock, patch import pytest from biomcp.articles.preprints import ( BiorxivClient, BiorxivResponse, BiorxivResult, EuropePMCClient, EuropePMCResponse, PreprintSearcher, ) from biomcp.articles.search import PubmedRequest, ResultItem from biomcp.core import PublicationState class TestBiorxivClient: """Tests for BiorxivClient.""" @pytest.mark.asyncio async def test_search_biorxiv_success(self): """Test successful bioRxiv search.""" client = BiorxivClient() # Mock response mock_response = BiorxivResponse( collection=[ BiorxivResult( doi="10.1101/2024.01.01.123456", title="Test BRAF Mutation Study", authors="Smith, J.; Doe, J.", date="2024-01-01", abstract="Study about BRAF mutations in cancer.", server="biorxiv", ) ], total=1, ) with patch("biomcp.http_client.request_api") as mock_request: mock_request.return_value = (mock_response, None) results = await client.search("BRAF") assert len(results) == 1 assert results[0].doi == "10.1101/2024.01.01.123456" assert results[0].title == "Test BRAF Mutation Study" assert results[0].publication_state == PublicationState.PREPRINT assert "preprint" in results[0].journal.lower() @pytest.mark.asyncio async def test_search_biorxiv_no_results(self): """Test bioRxiv search with no results.""" client = BiorxivClient() with patch("biomcp.http_client.request_api") as mock_request: mock_request.return_value = ( None, {"code": 404, "message": "Not found"}, ) results = await client.search("nonexistent") assert len(results) == 0 class TestEuropePMCClient: """Tests for EuropePMCClient.""" @pytest.mark.asyncio async def test_search_europe_pmc_success(self): """Test successful Europe PMC search.""" client = EuropePMCClient() # Mock response mock_response = EuropePMCResponse( hitCount=1, resultList={ "result": [ { "id": "PPR123456", "doi": "10.1101/2024.01.02.654321", "title": "TP53 Mutation Analysis", "authorString": "Johnson, A., Williams, B.", "journalTitle": "bioRxiv", "firstPublicationDate": "2024-01-02", "abstractText": "Analysis of TP53 mutations.", } ] }, ) with patch("biomcp.http_client.request_api") as mock_request: mock_request.return_value = (mock_response, None) results = await client.search("TP53") assert len(results) == 1 assert results[0].doi == "10.1101/2024.01.02.654321" assert results[0].title == "TP53 Mutation Analysis" assert results[0].publication_state == PublicationState.PREPRINT class TestPreprintSearcher: """Tests for PreprintSearcher.""" @pytest.mark.asyncio async def test_search_combined_sources(self): """Test searching across 
multiple preprint sources.""" searcher = PreprintSearcher() # Mock both clients mock_biorxiv_results = [ ResultItem( doi="10.1101/2024.01.01.111111", title="BRAF Study 1", date="2024-01-01", publication_state=PublicationState.PREPRINT, ) ] mock_europe_results = [ ResultItem( doi="10.1101/2024.01.02.222222", title="BRAF Study 2", date="2024-01-02", publication_state=PublicationState.PREPRINT, ) ] searcher.biorxiv_client.search = AsyncMock( return_value=mock_biorxiv_results ) searcher.europe_pmc_client.search = AsyncMock( return_value=mock_europe_results ) request = PubmedRequest(genes=["BRAF"]) response = await searcher.search(request) assert response.count == 2 assert len(response.results) == 2 # Results should be sorted by date (newest first) assert response.results[0].doi == "10.1101/2024.01.02.222222" assert response.results[1].doi == "10.1101/2024.01.01.111111" @pytest.mark.asyncio async def test_search_duplicate_removal(self): """Test that duplicate DOIs are removed.""" searcher = PreprintSearcher() # Create duplicate results with same DOI duplicate_doi = "10.1101/2024.01.01.999999" mock_biorxiv_results = [ ResultItem( doi=duplicate_doi, title="Duplicate Study", date="2024-01-01", publication_state=PublicationState.PREPRINT, ) ] mock_europe_results = [ ResultItem( doi=duplicate_doi, title="Duplicate Study", date="2024-01-01", publication_state=PublicationState.PREPRINT, ) ] searcher.biorxiv_client.search = AsyncMock( return_value=mock_biorxiv_results ) searcher.europe_pmc_client.search = AsyncMock( return_value=mock_europe_results ) request = PubmedRequest(keywords=["test"]) response = await searcher.search(request) assert response.count == 1 assert len(response.results) == 1 assert response.results[0].doi == duplicate_doi ``` -------------------------------------------------------------------------------- /tests/tdd/test_render.py: -------------------------------------------------------------------------------- ```python from biomcp import render def test_render_full_json(data_dir): input_data = (data_dir / "ct_gov/trials_NCT04280705.json").read_text() expect_markdown = (data_dir / "ct_gov/trials_NCT04280705.txt").read_text() markdown = render.to_markdown(input_data) assert markdown == expect_markdown input_data = ( data_dir / "myvariant/variants_full_braf_v600e.json" ).read_text() expect_markdown = ( data_dir / "myvariant/variants_full_braf_v600e.txt" ).read_text() markdown = render.to_markdown(input_data) print("==" * 100) print(markdown) print("==" * 100) assert markdown == expect_markdown def test_render_with_nones(): markdown = render.to_markdown(data) assert ( markdown == """# Studies ## Protocol Section ### Design Module Study Type: interventional Phases: phase2 ### Identification Module Brief Title: study of autologous tumor infiltrating lymphocytes in patients with solid tumors Nct Id: nct03645928 ### Status Module Overall Status: recruiting #### Completion Date Struct Date: 2029-08-09 #### Start Date Struct Date: 2019-05-07 """ ) data = { "next_page_token": None, "studies": [ { "derived_section": None, "document_section": None, "has_results": None, "protocol_section": { "arms_interventions_module": None, "conditions_module": None, "contacts_locations_module": None, "description_module": None, "design_module": { "design_info": None, "enrollment_info": None, "phases": ["phase2"], "study_type": "interventional", }, "eligibility_module": None, "identification_module": { "acronym": None, "brief_title": "study " "of " "autologous " "tumor " "infiltrating " "lymphocytes " "in " 
"patients " "with " "solid " "tumors", "nct_id": "nct03645928", "official_title": None, "org_study_id_info": None, "organization": None, "secondary_id_infos": None, }, "outcomes_module": None, "oversight_module": None, "references_module": None, "sponsor_collaborators_module": None, "status_module": { "completion_date_struct": { "date": "2029-08-09", "type": None, }, "expanded_access_info": None, "last_known_status": None, "last_update_post_date_struct": None, "last_update_submit_date": None, "overall_status": "recruiting", "primary_completion_date_struct": None, "results_first_post_date_struct": None, "results_first_submit_date": None, "results_first_submit_qc_date": None, "start_date_struct": {"date": "2019-05-07", "type": None}, "status_verified_date": None, "study_first_post_date_struct": None, "study_first_submit_date": None, "study_first_submit_qc_date": None, "why_stopped": None, }, }, "results_section": None, }, ], } def test_transform_key_protocol_section(): assert render.transform_key("protocol_section") == "Protocol Section" def test_transform_key_nct_number(): assert render.transform_key("nct_number") == "Nct Number" def test_transform_key_study_url(): assert render.transform_key("study_url") == "Study Url" def test_transform_key_allcaps(): assert render.transform_key("allcaps") == "Allcaps" def test_transform_key_primary_purpose(): assert render.transform_key("primary_purpose") == "Primary Purpose" def test_transform_key_underscores(): assert render.transform_key("some_key_name") == "Some Key Name" def test_transform_key_lowercase(): assert render.transform_key("somekey") == "Somekey" def test_transform_key_nctid(): assert render.transform_key("nct_id") == "Nct Id" def test_transform_key_4dct(): assert render.transform_key("4dct") == "4dct" def test_wrap_preserve_newlines_blank(): assert render.wrap_preserve_newlines("", 20) == [] def test_wrap_preserve_newlines_short_line(): text = "hello world" assert render.wrap_preserve_newlines(text, 20) == ["hello world"] def test_wrap_preserve_newlines_long(): text = "this line is definitely longer than twenty characters" lines = render.wrap_preserve_newlines(text, 20) assert len(lines) > 1 assert "this line is" in lines[0] def test_process_scalar_list_fits(): lines = [] render.process_scalar_list( "conditions", lines, ["condition1", "condition2"], ) assert lines == ["Conditions: condition1, condition2"] def test_process_scalar_list_too_long(): lines = [] big_list = ["test_value" * 10, "another" * 5] render.process_scalar_list("giant_field", lines, big_list) assert lines[0].startswith("Giant Field:") assert lines[1].startswith("- test_value") def test_render_key_value_short(): lines = [] render.render_key_value(lines, "nct_number", "nct100") assert lines == ["Nct Number: nct100"] def test_render_key_value_long(): lines = [] render.render_key_value(lines, "brief_summary", "hello " * 15) # first line "brief summary:" assert lines[0] == "Brief Summary:" assert lines[1].startswith(" hello hello") ``` -------------------------------------------------------------------------------- /src/biomcp/articles/search_optimized.py: -------------------------------------------------------------------------------- ```python """Optimized article search with caching and parallel processing.""" import asyncio import hashlib from .. 
import ensure_list from ..shared_context import get_search_context from ..utils.request_cache import get_cache from .search import PubmedRequest from .unified import search_articles_unified # Cache for article search results (5 minute TTL) _search_cache = get_cache("article_search", ttl_seconds=300) def _get_search_cache_key( request: PubmedRequest, include_preprints: bool, include_cbioportal: bool ) -> str: """Generate a cache key for search requests.""" # Create a deterministic key from search parameters key_parts = [ f"chemicals:{sorted(request.chemicals)}", f"diseases:{sorted(request.diseases)}", f"genes:{sorted(request.genes)}", f"keywords:{sorted(request.keywords)}", f"variants:{sorted(request.variants)}", f"preprints:{include_preprints}", f"cbioportal:{include_cbioportal}", ] key_string = "|".join(key_parts) return hashlib.sha256(key_string.encode()).hexdigest() async def article_searcher_optimized( call_benefit: str, chemicals: list[str] | str | None = None, diseases: list[str] | str | None = None, genes: list[str] | str | None = None, keywords: list[str] | str | None = None, variants: list[str] | str | None = None, include_preprints: bool = True, include_cbioportal: bool = True, ) -> str: """Optimized version of article_searcher with caching and context reuse.""" # Convert parameters to PubmedRequest request = PubmedRequest( chemicals=ensure_list(chemicals, split_strings=True), diseases=ensure_list(diseases, split_strings=True), genes=ensure_list(genes, split_strings=True), keywords=ensure_list(keywords, split_strings=True), variants=ensure_list(variants, split_strings=True), ) # Check cache first cache_key = _get_search_cache_key( request, include_preprints, include_cbioportal ) cached_result = await _search_cache.get(cache_key) if cached_result is not None: return cached_result # Check if we're in a search context (for reusing validated entities) context = get_search_context() if context and request.genes: # Pre-validate genes using cached results valid_genes = [] for gene in request.genes: if await context.validate_gene(gene): valid_genes.append(gene) request.genes = valid_genes # Check if we have cached cBioPortal summaries if include_cbioportal and request.genes: for gene in request.genes[:1]: # Just first gene summary = context.get_gene_summary(gene) if summary: # We have a cached summary, can skip that part pass # Perform the search result = await search_articles_unified( request, include_pubmed=True, include_preprints=include_preprints, include_cbioportal=include_cbioportal, ) # Cache the result (5 minute TTL) await _search_cache.set(cache_key, result, ttl=300) return result # Additional optimization: Batch article searches class ArticleSearchBatcher: """Batch multiple article searches to reduce overhead.""" def __init__(self, batch_size: int = 5, timeout: float = 0.1): self.batch_size = batch_size self.timeout = timeout self._pending_searches: list[tuple[PubmedRequest, asyncio.Future]] = [] self._batch_task: asyncio.Task | None = None async def search(self, request: PubmedRequest) -> str: """Add a search to the batch.""" future = asyncio.get_event_loop().create_future() self._pending_searches.append((request, future)) # Start batch processing if not already running if self._batch_task is None or self._batch_task.done(): self._batch_task = asyncio.create_task(self._process_batch()) return await future async def _process_batch(self): """Process pending searches in batch.""" await asyncio.sleep(self.timeout) # Wait for more requests if not self._pending_searches: return # 
Take up to batch_size searches batch = self._pending_searches[: self.batch_size] self._pending_searches = self._pending_searches[self.batch_size :] # Process searches in parallel search_tasks = [] for request, _ in batch: task = search_articles_unified(request, include_pubmed=True) search_tasks.append(task) results = await asyncio.gather(*search_tasks, return_exceptions=True) # Set results on futures for (_, future), result in zip(batch, results, strict=False): if isinstance(result, Exception): future.set_exception(result) else: future.set_result(result) # Global batcher instance _article_batcher = ArticleSearchBatcher() async def article_searcher_batched( call_benefit: str, chemicals: list[str] | str | None = None, diseases: list[str] | str | None = None, genes: list[str] | str | None = None, keywords: list[str] | str | None = None, variants: list[str] | str | None = None, include_preprints: bool = True, include_cbioportal: bool = True, ) -> str: """Batched version of article_searcher for multiple concurrent searches.""" request = PubmedRequest( chemicals=ensure_list(chemicals, split_strings=True), diseases=ensure_list(diseases, split_strings=True), genes=ensure_list(genes, split_strings=True), keywords=ensure_list(keywords, split_strings=True), variants=ensure_list(variants, split_strings=True), ) # Use the optimized version with caching return await article_searcher_optimized( call_benefit=call_benefit, chemicals=request.chemicals, diseases=request.diseases, genes=request.genes, keywords=request.keywords, variants=request.variants, include_preprints=include_preprints, include_cbioportal=include_cbioportal, ) ``` -------------------------------------------------------------------------------- /tests/tdd/variants/test_cbioportal_mutations.py: -------------------------------------------------------------------------------- ```python """Tests for cBioPortal mutation-specific search functionality.""" import pytest from biomcp.utils.mutation_filter import MutationFilter from biomcp.variants.cbioportal_mutations import ( CBioPortalMutationClient, MutationHit, StudyMutationSummary, format_mutation_search_result, ) class TestCBioPortalMutationSearch: """Test mutation-specific search functionality.""" @pytest.mark.asyncio @pytest.mark.integration async def test_search_specific_mutation_srsf2_f57y(self): """Test searching for SRSF2 F57Y mutation.""" client = CBioPortalMutationClient() result = await client.search_specific_mutation( gene="SRSF2", mutation="F57Y", max_studies=10 ) assert result is not None assert result.gene == "SRSF2" assert result.specific_mutation == "F57Y" assert result.studies_with_mutation >= 0 # If mutations found, check structure if result.studies_with_mutation > 0: assert len(result.top_studies) > 0 top_study = result.top_studies[0] assert isinstance(top_study, StudyMutationSummary) assert top_study.mutation_count > 0 @pytest.mark.asyncio @pytest.mark.integration async def test_search_mutation_pattern_srsf2_f57(self): """Test searching for SRSF2 F57* mutations.""" client = CBioPortalMutationClient() result = await client.search_specific_mutation( gene="SRSF2", pattern="F57*", max_studies=10 ) assert result is not None assert result.gene == "SRSF2" assert result.pattern == "F57*" # F57* should match F57Y, F57C, etc. 
if result.total_mutations > 0: assert result.mutation_types is not None # Check that we found some F57 mutations f57_mutations = [ mut for mut in result.mutation_types if mut.startswith("F57") ] assert len(f57_mutations) > 0 @pytest.mark.asyncio @pytest.mark.integration async def test_search_braf_v600e(self): """Test searching for BRAF V600E - a very common mutation.""" client = CBioPortalMutationClient() result = await client.search_specific_mutation( gene="BRAF", mutation="V600E", max_studies=20 ) assert result is not None assert result.gene == "BRAF" assert result.specific_mutation == "V600E" # V600E is very common, should have many studies assert result.studies_with_mutation > 10 assert len(result.top_studies) > 0 # Check melanoma is in top cancer types cancer_types = [s.cancer_type for s in result.top_studies] # At least some melanoma studies should have V600E assert any("melanoma" in ct.lower() for ct in cancer_types) def test_filter_mutations_specific(self): """Test filtering for specific mutations.""" mutations = [ MutationHit( study_id="study1", molecular_profile_id="study1_mutations", protein_change="F57Y", mutation_type="Missense", ), MutationHit( study_id="study1", molecular_profile_id="study1_mutations", protein_change="F57C", mutation_type="Missense", ), MutationHit( study_id="study2", molecular_profile_id="study2_mutations", protein_change="R88Q", mutation_type="Missense", ), ] # Filter for F57Y mutation_filter = MutationFilter(specific_mutation="F57Y") filtered = mutation_filter.filter_mutations(mutations) assert len(filtered) == 1 assert filtered[0].protein_change == "F57Y" def test_filter_mutations_pattern(self): """Test filtering with wildcard patterns.""" mutations = [ MutationHit( study_id="study1", molecular_profile_id="study1_mutations", protein_change="F57Y", mutation_type="Missense", ), MutationHit( study_id="study1", molecular_profile_id="study1_mutations", protein_change="F57C", mutation_type="Missense", ), MutationHit( study_id="study2", molecular_profile_id="study2_mutations", protein_change="R88Q", mutation_type="Missense", ), ] # Filter for F57* mutation_filter = MutationFilter(pattern="F57*") filtered = mutation_filter.filter_mutations(mutations) assert len(filtered) == 2 assert all(m.protein_change.startswith("F57") for m in filtered) def test_format_mutation_search_result(self): """Test formatting of mutation search results.""" from biomcp.variants.cbioportal_mutations import MutationSearchResult result = MutationSearchResult( gene="SRSF2", specific_mutation="F57Y", total_studies=100, studies_with_mutation=3, total_mutations=5, top_studies=[ StudyMutationSummary( study_id="msk_ch_2023", study_name="Cancer Therapy and Clonal Hematopoiesis", cancer_type="mixed", mutation_count=5, sample_count=100, ), StudyMutationSummary( study_id="mds_mskcc_2020", study_name="Myelodysplastic Syndrome Study", cancer_type="mds", mutation_count=2, sample_count=50, ), ], mutation_types={"F57Y": 5}, ) formatted = format_mutation_search_result(result) assert "SRSF2" in formatted assert "F57Y" in formatted assert "**Studies with Mutation**: 3" in formatted assert "msk_ch_2023" in formatted assert "| 5 |" in formatted # mutation count ``` -------------------------------------------------------------------------------- /docs/backend-services-reference/06-pubtator3.md: -------------------------------------------------------------------------------- ```markdown # PubTator3 API This document describes the PubTator3 API used by BioMCP for searching biomedical literature and retrieving 
article details with annotations. Understanding this API provides context for how BioMCP's article commands function.

## Overview

PubTator3 is a web-based tool that provides annotations of biomedical entities in PubMed abstracts and PMC full-text articles. BioMCP uses the PubTator3 API to search for and retrieve biomedical articles and their annotated entities (genes, variants, diseases, chemicals, etc.). This page outlines the implementation details of that API.

> **CLI Documentation**: For information on using these APIs through the BioMCP
> command line interface, see
> the [Articles CLI Documentation](../user-guides/01-command-line-interface.md#article-commands).

## Usage Guide

For practical examples of searching articles with PubTator3, see [How to Find Articles and cBioPortal Data](../how-to-guides/01-find-articles-and-cbioportal-data.md).

## API Workflow

The PubTator3 integration follows a three-step workflow:

1. **Entity Autocomplete**: Get standardized entity identifiers
2. **Search**: Find articles using entity identifiers and keywords
3. **Fetch**: Retrieve full article details by PMID

## API Endpoints

### Entity Autocomplete API

**Endpoint:** `https://www.ncbi.nlm.nih.gov/research/pubtator3-api/entity/autocomplete/`

This endpoint helps normalize entity names to their standard identifiers, improving search precision.

#### Parameters

| Parameter | Description                 | Example                             |
| --------- | --------------------------- | ----------------------------------- |
| `query`   | Text to autocomplete        | `BRAF`                              |
| `concept` | Entity type                 | `GENE`, `CHEMICAL`, `DISEASE`, etc. |
| `limit`   | Number of results to return | `2`                                 |

#### Example Request and Response

```bash
curl "https://www.ncbi.nlm.nih.gov/research/pubtator3-api/entity/autocomplete/?query=BRAF&concept=GENE&limit=2"
```

Response:

```json
[
  {
    "_id": "@GENE_BRAF",
    "biotype": "gene",
    "name": "BRAF",
    "description": "All Species",
    "match": "Matched on name <m>BRAF</m>"
  },
  {
    "_id": "@GENE_BRAFP1",
    "biotype": "gene",
    "name": "BRAFP1",
    "description": "All Species",
    "match": "Matched on name <m>BRAFP1</m>"
  }
]
```

### Entity Search API

**Endpoint:** `https://www.ncbi.nlm.nih.gov/research/pubtator3-api/search/`

This endpoint allows searching for PMIDs (PubMed IDs) based on entity identifiers and keywords.

#### Parameters

| Parameter | Description                     | Example                |
| --------- | ------------------------------- | ---------------------- |
| `text`    | Entity identifier or text query | `@CHEMICAL_remdesivir` |

#### Example Request and Response

```bash
curl "https://www.ncbi.nlm.nih.gov/research/pubtator3-api/search/?text=@CHEMICAL_remdesivir"
```

Response (truncated):

```json
{
  "results": [
    {
      "_id": "37711410",
      "pmid": 37711410,
      "title": "Remdesivir.",
      "journal": "Hosp Pharm",
      "authors": ["Levien TL", "Baker DE"],
      "date": "2023-10-01T00:00:00Z",
      "doi": "10.1177/0018578721999804",
      "meta_date_publication": "2023 Oct",
      "meta_volume": "58"
    }
    // More results...
  ]
}
```

### Article Fetch API

**Endpoint:** `https://www.ncbi.nlm.nih.gov/research/pubtator3-api/publications/export/biocjson`

This endpoint retrieves detailed information about specific articles, including annotations.

#### Parameters

| Parameter | Description                                   | Example    |
| --------- | --------------------------------------------- | ---------- |
| `pmids`   | List of PubMed IDs to retrieve                | `29355051` |
| `full`    | Whether to include full text (when available) | `true`     |

#### Example Request

```bash
curl "https://www.ncbi.nlm.nih.gov/research/pubtator3-api/publications/export/biocjson?pmids=29355051&full=true"
```

Response format (truncated):

```json
{
  "PubTator3": [
    {
      "_id": "29355051|PMC6142073",
      "id": "6142073",
      "infons": {},
      "passages": [
        {
          "infons": {
            "name_3": "surname:Hu;given-names:Minghua",
            "name_2": "surname:Luo;given-names:Xia",
            "name_1": "surname:Luo;given-names:Shuang",
            "article-id_pmid": "29355051"
            // More metadata...
          }
        }
        // More passages...
      ]
    }
  ]
}
```

## Entity Types

PubTator3 annotates several types of biomedical entities:

1. **Genes/Proteins**: Gene or protein names (e.g., BRAF, TP53)
2. **Genetic Variants**: Genetic variations (e.g., BRAF V600E)
3. **Diseases**: Disease names and conditions (e.g., Melanoma)
4. **Chemicals/Drugs**: Chemical substances or drugs (e.g., Vemurafenib)

## Integration Strategy for BioMCP

The recommended workflow for integrating with PubTator3 in BioMCP is:

1. **Entity Normalization**: Use the autocomplete API to convert user-provided entity names to standardized identifiers
2. **Literature Search**: Use the search API with these identifiers to find relevant PMIDs
3. **Data Retrieval**: Fetch detailed article data with annotations using the fetch API

This workflow ensures consistent entity handling and optimal search results.
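
For illustration only, the three steps compose into a short pipeline. This sketch uses the third-party `requests` library (an assumption — BioMCP itself uses its own HTTP client); endpoint URLs, parameters, and response fields are taken from the examples above:

```python
import requests

BASE = "https://www.ncbi.nlm.nih.gov/research/pubtator3-api"

# Step 1: Entity Autocomplete - normalize "BRAF" to a standard identifier
entities = requests.get(
    f"{BASE}/entity/autocomplete/",
    params={"query": "BRAF", "concept": "GENE", "limit": 1},
).json()
entity_id = entities[0]["_id"]  # e.g., "@GENE_BRAF"

# Step 2: Search - find PMIDs annotated with that entity
hits = requests.get(f"{BASE}/search/", params={"text": entity_id}).json()
pmids = [str(r["pmid"]) for r in hits["results"][:5]]

# Step 3: Fetch - retrieve article details with annotations in BioC JSON
articles = requests.get(
    f"{BASE}/publications/export/biocjson",
    params={"pmids": ",".join(pmids), "full": "true"},
).json()
```
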
## Authentication

The PubTator3 API is public and does not require authentication for basic usage. However, there are rate limits in place to prevent abuse.

## Rate Limits and Best Practices

- **Request Limits**: Approximately 30 requests per minute
- **Batch Requests**: For article retrieval, batch multiple PMIDs in a single request
- **Caching**: Implement caching to minimize repeated requests
- **Specific Queries**: Use specific entity names rather than general terms for better results

## Error Handling

Common error responses:

- **400**: Invalid parameters
- **404**: Articles not found
- **429**: Rate limit exceeded
- **500**: Server error

## More Information

For complete API documentation, visit the [PubTator3 API Documentation](https://www.ncbi.nlm.nih.gov/research/pubtator3/api).
```

--------------------------------------------------------------------------------
/docs/backend-services-reference/04-clinicaltrials-gov.md:
--------------------------------------------------------------------------------

```markdown
# ClinicalTrials.gov API

This document outlines the key aspects of the public ClinicalTrials.gov v2 API utilized by BioMCP. Understanding these details can be helpful for advanced users interpreting BioMCP results or for developers extending its capabilities. BioMCP's CLI commands often simplify or combine these parameters for ease of use; refer to the [Trials CLI Documentation](../user-guides/01-command-line-interface.md#trial-commands) for specific command options.

## Overview

The [ClinicalTrials.gov](https://clinicaltrials.gov/) API provides programmatic access to clinical trial information, covering both search and retrieval of clinical trial data.

> **CLI Documentation**: For information on using these APIs through the BioMCP
> command line interface, see the [Trials CLI Documentation](../user-guides/01-command-line-interface.md#trial-commands).

## API Endpoints

### Search API

**Endpoint:** `https://clinicaltrials.gov/api/v2/studies`

This endpoint allows searching for clinical trials using various parameters.

#### Key Parameters

| Parameter              | Description                         | Example Value                                   |
| ---------------------- | ----------------------------------- | ----------------------------------------------- |
| `query.cond`           | "Conditions or disease" query       | `lung cancer`                                   |
| `query.term`           | "Other terms" query                 | `AREA[LastUpdatePostDate]RANGE[2023-01-15,MAX]` |
| `query.intr`           | "Intervention/treatment" query      | `Vemurafenib`                                   |
| `query.locn`           | "Location terms" query              | `New York`                                      |
| `query.titles`         | "Title/acronym" query               | `BRAF Melanoma`                                 |
| `query.outc`           | "Outcome measure" query             | `overall survival`                              |
| `query.spons`          | "Sponsor/collaborator" query        | `National Cancer Institute`                     |
| `query.lead`           | Searches in "LeadSponsorName" field | `MD Anderson`                                   |
| `query.id`             | "Study IDs" query (OR semantics)    | `NCT04267848`                                   |
| `filter.overallStatus` | Comma-separated list of statuses    | `NOT_YET_RECRUITING,RECRUITING`                 |
| `filter.geo`           | Geo-location filter                 | `distance(39.0035707,-77.1013313,50mi)`         |
| `filter.ids`           | Filter by NCT IDs (AND semantics)   | `NCT04852770,NCT01728545`                       |
| `filter.advanced`      | Advanced filter query               | `AREA[StartDate]2022`                           |
| `sort`                 | Sort order                          | `LastUpdatePostDate:desc`                       |
| `fields`               | Fields to return                    | `NCTId,BriefTitle,OverallStatus,HasResults`     |
| `countTotal`           | Count total number of studies       | `true` or `false`                               |

#### Example Request

```bash
curl -X GET "https://clinicaltrials.gov/api/v2/studies?query.cond=Melanoma&query.intr=BRAF"
```
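
The same search can be issued from any HTTP client. For example, a minimal Python sketch (illustrative only — it uses the third-party `requests` library rather than BioMCP's internal client, and assumes the v2 API's camelCase response fields such as `totalCount` and `protocolSection`):

```python
import requests

response = requests.get(
    "https://clinicaltrials.gov/api/v2/studies",
    params={
        "query.cond": "Melanoma",
        "query.intr": "BRAF",
        "filter.overallStatus": "RECRUITING",
        "fields": "NCTId,BriefTitle,OverallStatus,HasResults",
        "countTotal": "true",
    },
)
response.raise_for_status()
data = response.json()

print(f"Total studies: {data.get('totalCount')}")
for study in data.get("studies", []):
    ident = study["protocolSection"]["identificationModule"]
    print(ident["nctId"], "-", ident.get("briefTitle"))
```
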
### Study Details API

**Endpoint:** `https://clinicaltrials.gov/api/v2/studies/{NCT_ID}`

This endpoint retrieves detailed information about a specific clinical trial.

#### Example Request

```bash
curl -X GET "https://clinicaltrials.gov/api/v2/studies/NCT04267848"
```

#### Response Modules

The API response contains various modules of information:

- **protocolSection**: Basic study information, eligibility criteria, and design
- **resultsSection**: Study outcomes and results (when available)
- **documentSection**: Related documents
- **derivedSection**: Derived data elements
- **annotationsSection**: Additional annotations

## Implementation Details

### NCT ID Filtering Semantics

BioMCP uses intelligent filtering when NCT IDs are provided:

- **ID-only mode**: When NCT IDs are the only filter criteria, `query.id` is used for fast direct lookup
- **Intersection mode**: When NCT IDs are combined with other filters (conditions, interventions, etc.), `filter.ids` is used to ensure results match ALL criteria

This ensures that specifying NCT IDs restricts results rather than expanding them.

### Query Building

When constructing API queries, parameters must be properly formatted according to the API documentation. For implementation details on query building in BioMCP, see the [HTTP Client Developer Guide](../developer-guides/06-http-client-and-caching.md).

### Response Parsing

The API returns data in JSON format (or CSV if specified). Key sections in the response include:

- `protocolSection`: Contains study protocol details
  - `identificationModule`: Basic identifiers including NCT ID and title
  - `statusModule`: Current recruitment status and study dates
  - `sponsorCollaboratorsModule`: Information about sponsors and collaborators
  - `designModule`: Study design information including interventions
  - `eligibilityModule`: Inclusion/exclusion criteria and eligible population
  - `contactsLocationsModule`: Study sites and contact information
  - `referencesModule`: Related publications

### Error Handling

The API returns standard HTTP status codes. Common error scenarios include:

- **404**: Trial not found
- **429**: Rate limit exceeded
- **400**: Invalid query parameters

For implementation details on error handling in BioMCP, see the [Error Handling Developer Guide](../developer-guides/05-error-handling.md).

## Authentication

The ClinicalTrials.gov API is public and does not require authentication for basic usage. However, there are rate limits in place.

## Rate Limits and Best Practices

- **Rate Limit**: Approximately 50 requests per minute per IP address
- **Caching**: Implement caching to minimize repeated requests
- **Pagination**: For large result sets, use the pagination functionality (the `pageToken` parameter and the `nextPageToken` field returned by the API)
- **Focused Queries**: Use specific search terms rather than broad queries to get more relevant results
- **Field Selection**: Use the fields parameter to request only the data you need

## More Information

For complete API documentation, visit the [ClinicalTrials.gov API Documentation](https://clinicaltrials.gov/data-api/about-api).
```

--------------------------------------------------------------------------------
/docs/how-to-guides/05-logging-and-monitoring-with-bigquery.md:
--------------------------------------------------------------------------------

```markdown
# BigQuery Logging for BioMCP

This document outlines how BioMCP uses Google BigQuery for logging user interactions and API usage.

## Overview

BioMCP integrates with Google BigQuery to log user interactions, queries, and API usage. This logging provides valuable insights into how the system is being used, helps with debugging, and enables analytics for improving the service.

## Prerequisites

- A Google Cloud Platform (GCP) account
- A BigQuery dataset and table created in your GCP project
- A GCP service account with BigQuery permissions

## Setting Up BigQuery for BioMCP

1. **Create a BigQuery Dataset and Table**

   - In the Google Cloud Console, navigate to BigQuery
   - Create a new dataset (e.g., `biomcp_logs`)
   - Create a table within the dataset (e.g., `worker_logs`) with the following schema:
     ```
     timestamp: TIMESTAMP
     userEmail: STRING
     query: STRING
     ```
   - Adjust the schema as needed for your specific logging requirements

2. **Create a Service Account**

   - Navigate to "IAM & Admin" > "Service Accounts" in the Google Cloud Console
   - Create a new service account with a descriptive name (e.g., `biomcp-bigquery-logger`)
   - Assign the "BigQuery Data Editor" role to the service account
   - Create and download a JSON key for the service account

3. **Configure BioMCP with BigQuery Credentials**

   - Open `wrangler.toml` in the BioMCP project
   - Update the following variables with your BigQuery information:
     ```toml
     BQ_PROJECT_ID = "your-gcp-project-id"
     BQ_DATASET = "biomcp_logs"
     BQ_TABLE = "worker_logs"
     ```
   - For the service account key, use Cloudflare's secret management:
     ```bash
     npx wrangler secret put BQ_SA_KEY_JSON
     ```
     When prompted, paste the entire JSON content of your service account key file.
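
If you prefer to script the setup, the dataset and table can also be created programmatically. A sketch using the official `google-cloud-bigquery` Python client (an assumption — any BigQuery client works; substitute your own project ID):

```python
from google.cloud import bigquery

client = bigquery.Client(project="your-gcp-project-id")

# Create the dataset and table described above (no-op if they already exist)
client.create_dataset("biomcp_logs", exists_ok=True)
table = bigquery.Table(
    "your-gcp-project-id.biomcp_logs.worker_logs",
    schema=[
        bigquery.SchemaField("timestamp", "TIMESTAMP"),
        bigquery.SchemaField("userEmail", "STRING"),
        bigquery.SchemaField("query", "STRING"),
    ],
)
client.create_table(table, exists_ok=True)
```
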
## How BigQuery Logging Works

The BioMCP worker uses the following process to log data to BigQuery:

1. **Authentication**: The worker generates a JWT token using the service account credentials
2. **Token Exchange**: The JWT is exchanged for a Google OAuth access token
3. **Data Insertion**: The worker uses BigQuery's streaming insert API to log events

The implementation includes:

- Token caching to minimize authentication requests
- Error handling for failed logging attempts
- Automatic retry logic for transient failures

## Logged Information

By default, the following information is logged to BigQuery:

- **timestamp**: When the event occurred
- **userEmail**: The email address of the authenticated user (if available)
- **query**: The query or request that was made

You can extend the logging schema to include additional information as needed.

## Accessing and Analyzing Logs

To access and analyze the logs:

1. **Query the BigQuery Table**

   - Use the BigQuery console or SQL to query your logs
   - Example query to see recent logs:
     ```sql
     SELECT timestamp, userEmail, query
     FROM `your-project.biomcp_logs.worker_logs`
     ORDER BY timestamp DESC
     LIMIT 100
     ```

2. **Create Visualizations**

   - Use Google Data Studio to create dashboards based on your BigQuery data
   - Connect Data Studio to your BigQuery table and create visualizations

## Security Considerations

- The service account key is sensitive information and should be protected
- Use Cloudflare's secret management to store the key securely
- Consider implementing field-level encryption for sensitive data
- Implement data retention policies to comply with privacy regulations
- **IMPORTANT: Never include PHI (Protected Health Information) or PII (Personally Identifiable Information) in queries or logs**
- Ensure all queries are sanitized to remove patient identifiers, medical record numbers, and other sensitive information
- Consider implementing automatic redaction of potential PHI/PII from logs
- Regularly audit logs to ensure compliance with HIPAA and other privacy regulations
- Remember that BigQuery logs are not designed for storing protected health information

### Automatic Sanitization

BioMCP automatically sanitizes sensitive data before logging to BigQuery:

- **API Keys and Secrets**: Fields containing `api_key`, `apiKey`, `api-key`, `token`, `secret`, or `password` are automatically redacted
- **Nested Objects**: Sanitization works recursively through nested objects and arrays
- **Case-Insensitive**: Field name matching is case-insensitive to catch variations
- **Preserved Structure**: The original request structure is maintained with sensitive values replaced by `[REDACTED]`

Example of sanitization:

```javascript
// Original request
{
  "params": {
    "arguments": {
      "api_key": "AIzaSyB1234567890",
      "gene": "BRAF"
    }
  }
}

// Sanitized for BigQuery
{
  "params": {
    "arguments": {
      "api_key": "[REDACTED]",
      "gene": "BRAF"
    }
  }
}
```
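
The actual implementation lives in the Cloudflare Worker's JavaScript (`sanitizeObject()`, listed under Example Code below), but the behavior is roughly equivalent to this Python sketch (illustrative only, not the shipped code):

```python
SENSITIVE_MARKERS = ("api_key", "apikey", "api-key", "token", "secret", "password")


def _is_sensitive(field_name: str) -> bool:
    """Case-insensitive substring match against known sensitive markers."""
    name = field_name.lower()
    return any(marker in name for marker in SENSITIVE_MARKERS)


def sanitize(value):
    """Recursively redact sensitive fields while preserving structure."""
    if isinstance(value, dict):
        return {
            key: "[REDACTED]" if _is_sensitive(key) else sanitize(val)
            for key, val in value.items()
        }
    if isinstance(value, list):
        return [sanitize(item) for item in value]
    return value
```
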
### Excluded Queries

Certain types of queries are automatically excluded from BigQuery logging:

- **Think Tool Calls**: Any calls to the `think` tool are not logged
- **Thinking Domain**: Queries with `domain="thinking"` or `domain="think"` are excluded
- **Privacy-First Design**: This ensures that internal reasoning and analysis steps remain private

## Troubleshooting

- **Authentication Failures**: Verify that the service account key is correctly formatted and has the necessary permissions
- **Insertion Errors**: Check that the BigQuery table schema matches the data being inserted
- **Missing Logs**: Ensure that the worker has network access to the BigQuery API

## Example Code

The worker includes the following key functions for BigQuery logging:

- `getBQToken()`: Fetches and caches a BigQuery OAuth token
- `insertEvent()`: Inserts a single row into BigQuery via streaming insert
- `sanitizeObject()`: Recursively sanitizes sensitive fields from objects before logging

These functions handle the authentication and data insertion process automatically.

## Testing

BioMCP includes comprehensive tests for the BigQuery logging functionality:

### JavaScript Tests

The sanitization logic is tested using Node.js built-in test framework:

```bash
# Run JavaScript worker tests
make test-js

# Or run directly
node --test tests/tdd/workers/test_worker_sanitization.js
```

Tests cover:

- API key redaction
- Nested sensitive field handling
- Array sanitization
- Case-insensitive field matching
- Think tool detection
- Domain-based filtering
```

--------------------------------------------------------------------------------
/src/biomcp/organizations/search.py:
--------------------------------------------------------------------------------

```python
"""Search functionality for organizations via NCI CTS API."""

import logging
from typing import Any

from ..constants import NCI_ORGANIZATIONS_URL
from ..integrations.cts_api import CTSAPIError, make_cts_request
from ..utils import parse_or_query

logger = logging.getLogger(__name__)


async def search_organizations(
    name: str | None = None,
    org_type: str | None = None,
    city: str | None = None,
    state: str | None = None,
    page_size: int = 20,
    page: int = 1,
    api_key: str | None = None,
) -> dict[str, Any]:
    """
    Search for organizations in the NCI CTS database.
Args: name: Organization name to search for (partial match) org_type: Type of organization (e.g., "industry", "academic") city: City location state: State location (2-letter code) page_size: Number of results per page page: Page number api_key: Optional API key (if not provided, uses NCI_API_KEY env var) Returns: Dictionary with search results containing: - organizations: List of organization records - total: Total number of results - page: Current page - page_size: Results per page Raises: CTSAPIError: If the API request fails """ # Build query parameters params: dict[str, Any] = { "size": page_size, } # Note: The NCI API doesn't support offset/page pagination for organizations # It uses cursor-based pagination or returns all results up to size limit # Add search filters with correct API parameter names if name: params["name"] = name if org_type: params["type"] = org_type if city: params["org_city"] = city if state: params["org_state_or_province"] = state try: # Make API request response = await make_cts_request( url=NCI_ORGANIZATIONS_URL, params=params, api_key=api_key, ) # Process response - adapt to actual API format # This is a reasonable structure based on typical REST APIs organizations = response.get("data", response.get("organizations", [])) total = response.get("total", len(organizations)) return { "organizations": organizations, "total": total, "page": page, "page_size": page_size, } except CTSAPIError: raise except Exception as e: logger.error(f"Failed to search organizations: {e}") raise CTSAPIError(f"Organization search failed: {e!s}") from e def format_organization_results(results: dict[str, Any]) -> str: """ Format organization search results as markdown. Args: results: Search results dictionary Returns: Formatted markdown string """ organizations = results.get("organizations", []) total = results.get("total", 0) if not organizations: return "No organizations found matching the search criteria." # Build markdown output lines = [ f"## Organization Search Results ({total} found)", "", ] for org in organizations: org_id = org.get("id", org.get("org_id", "Unknown")) name = org.get("name", "Unknown Organization") org_type = org.get("type", org.get("category", "Unknown")) city = org.get("city", "") state = org.get("state", "") lines.append(f"### {name}") lines.append(f"- **ID**: {org_id}") lines.append(f"- **Type**: {org_type}") if city or state: location_parts = [p for p in [city, state] if p] lines.append(f"- **Location**: {', '.join(location_parts)}") lines.append("") return "\n".join(lines) async def search_organizations_with_or( name_query: str, org_type: str | None = None, city: str | None = None, state: str | None = None, page_size: int = 20, page: int = 1, api_key: str | None = None, ) -> dict[str, Any]: """ Search for organizations with OR query support. This function handles OR queries by making multiple API calls and combining results. For example: "MD Anderson OR Mayo Clinic" will search for each term. 
Args: name_query: Name query that may contain OR operators Other args same as search_organizations Returns: Combined results from all searches with duplicates removed """ # Check if this is an OR query if " OR " in name_query or " or " in name_query: search_terms = parse_or_query(name_query) logger.info(f"Parsed OR query into terms: {search_terms}") else: # Single term search search_terms = [name_query] # Collect all unique organizations all_organizations = {} total_found = 0 # Search for each term for term in search_terms: logger.info(f"Searching organizations for term: {term}") try: results = await search_organizations( name=term, org_type=org_type, city=city, state=state, page_size=page_size, page=page, api_key=api_key, ) # Add unique organizations (deduplicate by ID) for org in results.get("organizations", []): org_id = org.get("id", org.get("org_id")) if org_id and org_id not in all_organizations: all_organizations[org_id] = org total_found += results.get("total", 0) except Exception as e: logger.warning(f"Failed to search for term '{term}': {e}") # Continue with other terms # Convert back to list and apply pagination unique_organizations = list(all_organizations.values()) # Sort by name for consistent results unique_organizations.sort(key=lambda x: x.get("name", "").lower()) # Apply pagination to combined results start_idx = (page - 1) * page_size end_idx = start_idx + page_size paginated_organizations = unique_organizations[start_idx:end_idx] return { "organizations": paginated_organizations, "total": len(unique_organizations), "page": page, "page_size": page_size, "search_terms": search_terms, # Include what we searched for "total_found_across_terms": total_found, # Total before deduplication } ``` -------------------------------------------------------------------------------- /smithery.yaml: -------------------------------------------------------------------------------- ```yaml # Smithery configuration file: https://smithery.ai/docs/config#smitheryyaml startCommand: type: stdio configSchema: # JSON Schema defining the configuration options for the MCP. type: object properties: {} commandFunction: # A JS function that produces the CLI command based on the given config to start the MCP on stdio. |- (config) => ({ command: 'biomcp', args: ['run'], env: {} }) exampleConfig: {} schemas: TrialQuery: type: object properties: conditions: type: array items: type: string description: "List of condition terms." terms: type: array items: type: string description: "General search terms that don't fit specific categories." interventions: type: array items: type: string description: "Intervention names." recruiting_status: type: string description: "Study recruitment status." study_type: type: string description: "Type of study." 
nct_ids: type: array items: type: string description: "Clinical trial NCT IDs" lat: type: number description: "Latitude for location search" long: type: number description: "Longitude for location search" distance: type: integer description: "Distance from lat/long in miles" min_date: type: string description: "Minimum date for filtering" max_date: type: string description: "Maximum date for filtering" date_field: type: string description: "Date field to filter on" phase: type: string description: "Trial phase filter" age_group: type: string description: "Age group filter" primary_purpose: type: string description: "Primary purpose of the trial" intervention_type: type: string description: "Type of intervention" sponsor_type: type: string description: "Type of sponsor" study_design: type: string description: "Study design" sort: type: string description: "Sort order for results" next_page_hash: type: string description: "Token to retrieve the next page of results" VariantQuery: type: object properties: gene: type: string description: "Gene symbol to search for (e.g. BRAF, TP53)" hgvsp: type: string description: "Protein change notation (e.g., p.V600E, p.Arg557His)" hgvsc: type: string description: "cDNA notation (e.g., c.1799T>A)" rsid: type: string description: "dbSNP rsID (e.g., rs113488022)" region: type: string description: "Genomic region as chr:start-end (e.g. chr1:12345-67890)" significance: type: string description: "ClinVar clinical significance" max_frequency: type: number description: "Maximum population allele frequency threshold" min_frequency: type: number description: "Minimum population allele frequency threshold" cadd: type: number description: "Minimum CADD phred score" polyphen: type: string description: "PolyPhen-2 prediction" sift: type: string description: "SIFT prediction" sources: type: array items: type: string description: "Include only specific data sources" size: type: integer description: "Number of results to return" default: 40 offset: type: integer description: "Result offset for pagination" default: 0 PubmedRequest: type: object properties: chemicals: type: array items: type: string description: "List of chemicals for filtering results." diseases: type: array items: type: string description: "Diseases such as Hypertension, Lung Adenocarcinoma, etc." genes: type: array items: type: string description: "List of genes for filtering results." keywords: type: array items: type: string description: "List of other keywords for filtering results." variants: type: array items: type: string description: "List of variants for filtering results." 
tools:
  trial_searcher:
    input:
      schema:
        type: object
        properties:
          query:
            $ref: "#/schemas/TrialQuery"
        required: ["query"]
  variant_searcher:
    input:
      schema:
        type: object
        properties:
          query:
            $ref: "#/schemas/VariantQuery"
        required: ["query"]
  article_searcher:
    input:
      schema:
        type: object
        properties:
          query:
            $ref: "#/schemas/PubmedRequest"
        required: ["query"]

  # Simple string parameter functions
  trial_protocol:
    input:
      schema:
        type: object
        properties:
          nct_id:
            type: string
            description: "A single NCT ID (e.g., NCT04280705)"
        required: ["nct_id"]
  trial_locations:
    input:
      schema:
        type: object
        properties:
          nct_id:
            type: string
            description: "A single NCT ID (e.g., NCT04280705)"
        required: ["nct_id"]
  trial_outcomes:
    input:
      schema:
        type: object
        properties:
          nct_id:
            type: string
            description: "A single NCT ID (e.g., NCT04280705)"
        required: ["nct_id"]
  trial_references:
    input:
      schema:
        type: object
        properties:
          nct_id:
            type: string
            description: "A single NCT ID (e.g., NCT04280705)"
        required: ["nct_id"]
  article_details:
    input:
      schema:
        type: object
        properties:
          pmid:
            type: string
            description: "A single PubMed ID (e.g., 34397683)"
        required: ["pmid"]
  variant_details:
    input:
      schema:
        type: object
        properties:
          variant_id:
            type: string
            description: "A variant identifier (e.g., chr7:g.140453136A>T)"
        required: ["variant_id"]
```

--------------------------------------------------------------------------------
/tests/tdd/openfda/test_adverse_events.py:
--------------------------------------------------------------------------------

```python
"""
Unit tests for OpenFDA adverse events integration.
"""

from unittest.mock import patch

import pytest

from biomcp.openfda.adverse_events import (
    get_adverse_event,
    search_adverse_events,
)


@pytest.mark.asyncio
async def test_search_adverse_events_by_drug():
    """Test searching adverse events by drug name."""
    mock_response = {
        "meta": {"results": {"total": 100}},
        "results": [
            {
                "patient": {
                    "drug": [
                        {
                            "medicinalproduct": "IMATINIB",
                            "openfda": {
                                "brand_name": ["GLEEVEC"],
                                "generic_name": ["IMATINIB MESYLATE"],
                            },
                        }
                    ],
                    "reaction": [
                        {"reactionmeddrapt": "NAUSEA"},
                        {"reactionmeddrapt": "FATIGUE"},
                    ],
                    "patientonsetage": "45",
                    "patientsex": 2,
                },
                "serious": "1",
                "seriousnesshospitalization": "1",
                "receivedate": "20240115",
            }
        ],
    }

    with patch(
        "biomcp.openfda.adverse_events.make_openfda_request"
    ) as mock_request:
        mock_request.return_value = (mock_response, None)

        result = await search_adverse_events(drug="imatinib", limit=10)

        # Verify the request was made correctly
        mock_request.assert_called_once()
        call_args = mock_request.call_args
        assert "imatinib" in call_args[0][1]["search"].lower()

        # Check the output contains expected information
        assert "FDA Adverse Event Reports" in result
        assert "imatinib" in result.lower()
        assert "NAUSEA" in result
        assert "FATIGUE" in result
        assert "100 reports" in result


@pytest.mark.asyncio
async def test_search_adverse_events_by_reaction():
    """Test searching adverse events by reaction."""
    mock_response = {
        "meta": {"results": {"total": 50}},
        "results": [
            {
                "patient": {
                    "drug": [{"medicinalproduct": "ASPIRIN"}],
                    "reaction": [{"reactionmeddrapt": "HEADACHE"}],
                },
                "serious": "0",
                "receivedate": "20240201",
            }
        ],
    }

    with patch(
        "biomcp.openfda.adverse_events.make_openfda_request"
    ) as mock_request:
        mock_request.return_value = (mock_response, None)

        result = await search_adverse_events(reaction="headache", limit=10)

        # Verify the request
        mock_request.assert_called_once()
        call_args = mock_request.call_args
        assert "headache" in call_args[0][1]["search"].lower()

        # Check output
        assert "HEADACHE" in result
        assert "50 reports" in result


@pytest.mark.asyncio
async def test_search_adverse_events_no_params():
    """Test that searching without parameters returns helpful message."""
    result = await search_adverse_events()

    assert "Please specify" in result
    assert "drug name or reaction" in result
    assert "Examples:" in result


@pytest.mark.asyncio
async def test_search_adverse_events_no_results():
    """Test handling when no results are found."""
    with patch(
        "biomcp.openfda.adverse_events.make_openfda_request"
    ) as mock_request:
        mock_request.return_value = ({"results": []}, None)

        result = await search_adverse_events(drug="nonexistentdrug")

        assert "No adverse event reports found" in result
        assert "nonexistentdrug" in result


@pytest.mark.asyncio
async def test_search_adverse_events_error():
    """Test error handling in adverse event search."""
    with patch(
        "biomcp.openfda.adverse_events.make_openfda_request"
    ) as mock_request:
        mock_request.return_value = (None, "API rate limit exceeded")

        result = await search_adverse_events(drug="aspirin")

        assert "Error searching adverse events" in result
        assert "API rate limit exceeded" in result


@pytest.mark.asyncio
async def test_get_adverse_event_detail():
    """Test getting detailed adverse event report."""
    mock_response = {
        "results": [
            {
                "safetyreportid": "12345678",
                "patient": {
                    "patientonsetage": "55",
                    "patientsex": 1,
                    "patientweight": "75",
                    "drug": [
                        {
                            "medicinalproduct": "DRUG A",
                            "drugindication": "HYPERTENSION",
                            "drugdosagetext": "100mg daily",
                            "drugadministrationroute": "048",
                            "actiondrug": 4,
                        }
                    ],
                    "reaction": [
                        {"reactionmeddrapt": "DIZZINESS", "reactionoutcome": 1}
                    ],
                },
                "serious": "1",
                "seriousnesshospitalization": "1",
                "receivedate": "20240115",
                "reporttype": 1,
            }
        ]
    }

    with patch(
        "biomcp.openfda.adverse_events.make_openfda_request"
    ) as mock_request:
        mock_request.return_value = (mock_response, None)

        result = await get_adverse_event("12345678")

        # Verify request
        mock_request.assert_called_once()
        call_args = mock_request.call_args
        assert "12345678" in call_args[0][1]["search"]

        # Check detailed output
        assert "12345678" in result
        assert "Patient Information" in result
        assert "55 years" in result
        assert "Male" in result
        assert "75 kg" in result
        assert "DRUG A" in result
        assert "HYPERTENSION" in result
        assert "100mg daily" in result
        assert "DIZZINESS" in result
        assert "Recovered/Resolved" in result


@pytest.mark.asyncio
async def test_get_adverse_event_not_found():
    """Test handling when adverse event report is not found."""
    with patch(
        "biomcp.openfda.adverse_events.make_openfda_request"
    ) as mock_request:
        mock_request.return_value = ({"results": []}, None)

        result = await get_adverse_event("NOTFOUND123")

        assert "NOTFOUND123" in result
        assert "not found" in result
```

--------------------------------------------------------------------------------
/src/biomcp/openfda/adverse_events_helpers.py:
--------------------------------------------------------------------------------

```python
"""
Helper functions for OpenFDA adverse events to reduce complexity.
"""

from collections import Counter
from typing import Any

from .utils import (
    extract_drug_names,
    extract_reactions,
    format_count,
    format_drug_list,
)


def format_search_summary(
    drug: str | None, reaction: str | None, serious: bool | None, total: int
) -> list[str]:
    """Format the search summary section."""
    output = []

    # Add search criteria
    search_desc = []
    if drug:
        search_desc.append(f"**Drug**: {drug}")
    if reaction:
        search_desc.append(f"**Reaction**: {reaction}")
    if serious is not None:
        search_desc.append(f"**Serious Events**: {'Yes' if serious else 'No'}")

    if search_desc:
        output.append(" | ".join(search_desc))

    output.append(
        f"**Total Reports Found**: {format_count(total, 'report')}\n"
    )

    return output


def format_top_reactions(results: list[dict[str, Any]]) -> list[str]:
    """Format top reported reactions from search results."""
    output = []
    all_reactions = []

    for result in results:
        all_reactions.extend(extract_reactions(result))

    if all_reactions:
        reaction_counts = Counter(all_reactions)
        top_reactions = reaction_counts.most_common(10)

        output.append("### Top Reported Reactions:")
        for rxn, count in top_reactions:
            percentage = (count / len(results)) * 100
            output.append(f"- **{rxn}**: {count} reports ({percentage:.1f}%)")
        output.append("")

    return output


def format_report_summary(
    result: dict[str, Any], report_num: int
) -> list[str]:
    """Format a single report summary."""
    output = [f"#### Report {report_num}"]

    # Extract key information
    drugs = extract_drug_names(result)
    reactions = extract_reactions(result)

    # Patient info
    patient = result.get("patient", {})
    age = patient.get("patientonsetage")
    sex_map = {0: "Unknown", 1: "Male", 2: "Female"}
    sex = sex_map.get(patient.get("patientsex"), "Unknown")

    # Serious outcomes
    serious_flag = result.get("serious", "0")
    outcomes = []
    for code in [
        "seriousnessdeath",
        "seriousnesslifethreatening",
        "seriousnesshospitalization",
        "seriousnessdisabling",
    ]:
        if result.get(code) == "1":
            outcomes.append(code.replace("seriousness", "").title())

    # Format output
    output.append(f"- **Drugs**: {format_drug_list(drugs)}")
    output.append(f"- **Reactions**: {', '.join(reactions[:5])}")

    if age:
        output.append(f"- **Patient**: {age} years, {sex}")

    if serious_flag == "1" and outcomes:
        output.append(f"- **Serious Outcome**: {', '.join(outcomes)}")

    # Dates
    receive_date = result.get("receivedate", "")
    if receive_date:
        output.append(
            f"- **Report Date**: {receive_date[:4]}-{receive_date[4:6]}-{receive_date[6:]}"
        )

    output.append("")
    return output


def format_drug_details(drugs: list[dict[str, Any]]) -> list[str]:
    """Format drug information details."""
    from .utils import clean_text

    output = ["### Drug Information"]

    for i, drug in enumerate(drugs, 1):
        output.append(
            f"\n#### Drug {i}: {drug.get('medicinalproduct', 'Unknown')}"
        )

        if "drugindication" in drug:
            output.append(f"- **Indication**: {drug['drugindication']}")

        if "drugdosagetext" in drug:
            dosage = clean_text(drug["drugdosagetext"])
            output.append(f"- **Dosage**: {dosage}")

        if "drugadministrationroute" in drug:
            output.append(f"- **Route**: {drug['drugadministrationroute']}")

        # Drug action taken
        action_map = {
            1: "Drug withdrawn",
            2: "Dose reduced",
            3: "Dose increased",
            4: "Dose not changed",
            5: "Unknown",
            6: "Not applicable",
        }
        action_code = drug.get("actiondrug")
        action = (
            action_map.get(action_code, "Unknown")
            if action_code is not None
            else "Unknown"
        )
        output.append(f"- **Action Taken**: {action}")

    output.append("")
    return output


def format_reaction_details(reactions: list[dict[str, Any]]) -> list[str]:
    """Format adverse reaction details."""
    output = ["### Adverse Reactions"]

    for reaction in reactions:
        rxn_name = reaction.get("reactionmeddrapt", "Unknown")

        outcome_map = {
            1: "Recovered/Resolved",
            2: "Recovering/Resolving",
            3: "Not recovered/Not resolved",
            4: "Recovered/Resolved with sequelae",
            5: "Fatal",
            6: "Unknown",
        }
        outcome_code = reaction.get("reactionoutcome")
        outcome = (
            outcome_map.get(outcome_code, "Unknown")
            if outcome_code is not None
            else "Unknown"
        )

        output.append(f"- **{rxn_name}**: {outcome}")

    output.append("")
    return output


def format_report_metadata(result: dict[str, Any]) -> list[str]:
    """Format report metadata information."""
    output = ["### Report Information"]

    receive_date = result.get("receivedate", "")
    if receive_date:
        formatted_date = (
            f"{receive_date[:4]}-{receive_date[4:6]}-{receive_date[6:]}"
        )
        output.append(f"- **Report Date**: {formatted_date}")

    report_type_map = {
        1: "Spontaneous",
        2: "Report from study",
        3: "Other",
        4: "Not available to sender",
    }
    report_type_code = result.get("reporttype")
    report_type = (
        report_type_map.get(report_type_code, "Unknown")
        if report_type_code is not None
        else "Unknown"
    )
    output.append(f"- **Report Type**: {report_type}")

    # Seriousness
    if result.get("serious") == "1":
        outcomes = []
        if result.get("seriousnessdeath") == "1":
            outcomes.append("Death")
        if result.get("seriousnesslifethreatening") == "1":
            outcomes.append("Life-threatening")
        if result.get("seriousnesshospitalization") == "1":
            outcomes.append("Hospitalization")
        if result.get("seriousnessdisabling") == "1":
            outcomes.append("Disability")
        if result.get("seriousnesscongenitalanomali") == "1":
            outcomes.append("Congenital anomaly")
        if result.get("seriousnessother") == "1":
            outcomes.append("Other serious")

        if outcomes:
            output.append(f"- **Serious Outcomes**: {', '.join(outcomes)}")

    return output
```

--------------------------------------------------------------------------------
/docs/blog/researcher-persona-resource.md:
--------------------------------------------------------------------------------

```markdown
# BioMCP Deep Researcher Persona

With the release of BioMCP v0.1.2, users can now access a specialized Researcher Persona that transforms Claude into a rigorous biomedical research assistant using BioMCP's built-in sequential thinking capabilities. This persona is designed to leverage BioMCP's suite of tools for accessing PubMed articles, ClinicalTrials.gov data, and genomic variant information, while incorporating Claude's web search capabilities to produce comprehensive, thoroughly-researched reports.

## How to Use the Researcher Persona

Getting started with the BioMCP Researcher Persona is straightforward:

1. Configure Claude Desktop by updating your configuration JSON with:

```json
{
  "mcpServers": {
    "biomcp": {
      "command": "uv",
      "args": ["run", "--with", "biomcp-python>=0.1.2", "biomcp", "run"]
    }
  }
}
```

2. Restart Claude Desktop (the `>=0.1.2` ensures the latest version is used, which includes the built-in think tool)
3. Select the "Researcher" persona from the dropdown menu
4. Ask your biomedical research question

The Researcher Persona will then work through its 10-step process, keeping you updated on its progress and ultimately producing a comprehensive research brief.
## Video Demonstration

Below is a video demonstrating the Researcher Persona in action:

[](https://youtu.be/tBGG53O-7Hg)

## Sequential Thinking: A Rigorous 10-Step Research Process

What makes the Researcher Persona so powerful is its integration with BioMCP's built-in 'think' tool, which guides the AI through a comprehensive 10-step research methodology:

1. **Topic Scoping & Domain Framework**: Creating a comprehensive structure to ensure complete coverage
2. **Initial Information Gathering**: Establishing baseline terminology and recent developments
3. **Focused & Frontier Retrieval**: Filling knowledge gaps and identifying cutting-edge developments
4. **Primary Trials Analysis**: Identifying and analyzing key clinical trials
5. **Primary Literature Analysis**: Identifying and analyzing pivotal publications
6. **Initial Evidence Synthesis**: Creating a preliminary framework of findings
7. **Integrated Gap-Filling**: Addressing identified knowledge gaps
8. **Comprehensive Evidence Synthesis**: Creating a final integrated framework with quality assessment
9. **Self-Critique and Verification**: Rigorously assessing the quality and comprehensiveness
10. **Research Brief Creation**: Producing the final deliverable with all required elements

[](https://github.com/genomoncology/biomcp/blob/main/src/biomcp/resources/researcher.md)

This structured approach ensures that no important aspects of the research question are overlooked and that the final output is comprehensive, well-organized, and backed by current evidence.

## Put to the Test: Emerging Treatment Strategies for Head and Neck Cancer

To evaluate the effectiveness of the Researcher Persona, we conducted a head-to-head comparison with other AI research approaches. We asked the same question to five different systems: "What are the emerging treatment strategies for head and neck cancer?"

The results were impressive. The BioMCP-powered Researcher Persona, combined with Claude's web search capabilities and the built-in think tool, produced the highest-rated research brief among all approaches tested.

[](https://github.com/genomoncology/biomcp-examples#researcher-announcement)

The research brief produced by the BioMCP Researcher Persona stood out for several reasons:

1. **Comprehensive domain coverage**: The report covered all relevant treatment modalities (immunotherapy, targeted therapy, radiation techniques, surgery, combination approaches)
2. **Structured evidence categorization**: Findings were clearly organized by level of evidence (Established, Emerging, Experimental, Theoretical)
3. **Evidence quality assessment**: The brief included critical evaluation of source quality and evidence strength
4. **Thorough citation**: All claims were backed by specific references to scientific literature or clinical trials
5. **Self-critique**: The report included transparent limitations and identified areas requiring further research

## Explore the Example and Evaluations

We've documented this comparison in detail in the [biomcp-examples repository](https://github.com/genomoncology/biomcp-examples), where you can find:

- The full research briefs produced by each approach
- Independent evaluations by three different AI judges (Claude 3.7, Gemini 2.5 Pro, and OpenAI o3)
- Detailed scoring against a rubric that prioritizes accuracy, clarity, and comprehensiveness
- Analysis of strengths and weaknesses of each approach

The consensus among the judges placed the BioMCP-powered brief at the top, highlighting its exceptional structure, evidence-based approach, and comprehensive coverage.

## Beyond the Example: Wide-Ranging Applications

While our example focused on head and neck cancer treatments, the BioMCP Researcher Persona can tackle a wide range of biomedical research questions:

- **Therapeutic comparisons**: "Compare the efficacy and safety profiles of JAK inhibitors versus biologics for treating rheumatoid arthritis"
- **Disease mechanisms**: "What is the current understanding of gut microbiome dysbiosis in inflammatory bowel disease?"
- **Biomarker investigations**: "What emerging biomarkers show promise for early detection of pancreatic cancer?"
- **Treatment protocols**: "What are the latest guidelines for managing anticoagulation in patients with atrial fibrillation and chronic kidney disease?"

## Join the BioMCP Community

The Researcher Persona is just one example of how BioMCP is transforming AI-assisted biomedical research. We invite you to:

1. Try the Researcher Persona with your own research questions
2. Contribute to the [biomcp-examples repository](https://github.com/genomoncology/biomcp-examples) with your experiments
3. Share your feedback and suggestions for future improvements

By combining specialized biomedical data access with structured research methodologies, BioMCP is helping researchers produce more comprehensive, accurate, and useful biomedical research briefs than ever before.

Have a complex biomedical research question? Give the BioMCP Researcher Persona a try and experience the difference a structured, tool-powered approach can make!
```

--------------------------------------------------------------------------------
/mkdocs.yml:
--------------------------------------------------------------------------------

```yaml
site_name: BioMCP
repo_url: https://github.com/genomoncology/biomcp
site_url: https://biomcp.org/
site_description: Biomedical Model Context Protocol Server
site_author: Ian Maurer
edit_uri: edit/main/docs/
repo_name: genomoncology/biomcp
copyright: Maintained by <a href="https://genomoncology.com">genomoncology</a>.

nav:
  - Home: index.md
  - Getting Started:
      - Quick Start: getting-started/01-quickstart-cli.md
      - Claude Desktop: getting-started/02-claude-desktop-integration.md
      - API Keys: getting-started/03-authentication-and-api-keys.md
      - FAQ: faq-condensed.md
      - Troubleshooting: troubleshooting.md
  - User Guide:
      - Overview: concepts/01-what-is-biomcp.md
      - Finding Articles: how-to-guides/01-find-articles-and-cbioportal-data.md
      - Finding Trials: how-to-guides/02-find-trials-with-nci-and-biothings.md
      - Analyzing Variants: how-to-guides/03-get-comprehensive-variant-annotations.md
      - Predicting Effects: how-to-guides/04-predict-variant-effects-with-alphagenome.md
      - Searching Organizations: how-to-guides/06-search-nci-organizations-and-interventions.md
      - Research Workflows: workflows/all-workflows.md
  - Examples:
      - Pydantic AI Integration: tutorials/pydantic-ai-integration.md
      - Remote Connection: tutorials/remote-connection.md
      - BioThings Examples: tutorials/biothings-prompts.md
      - NCI Examples: tutorials/nci-prompts.md
      - AlphaGenome Tutorial: tutorials/claude-code-biomcp-alphagenome.md
      - OpenFDA Examples: tutorials/openfda-prompts.md
  - Concepts:
      - Deep Researcher: concepts/02-the-deep-researcher-persona.md
      - Sequential Thinking: concepts/03-sequential-thinking-with-the-think-tool.md
  - Reference:
      - Quick Reference: reference/quick-reference.md
      - CLI Commands: user-guides/01-command-line-interface.md
      - MCP Tools: user-guides/02-mcp-tools-reference.md
      - API Documentation:
          - API Overview: apis/overview.md
          - Python SDK: apis/python-sdk.md
          - Error Codes: apis/error-codes.md
      - IDE Integration: user-guides/03-integrating-with-ides-and-clients.md
  - Developer:
      - Architecture:
          - Overview: reference/quick-architecture.md
          - Visual Diagrams: reference/visual-architecture.md
          - Detailed Diagrams: reference/architecture-diagrams.md
      - Data Sources:
          - Overview: backend-services-reference/01-overview.md
          - PubTator3/PubMed: backend-services-reference/06-pubtator3.md
          - ClinicalTrials.gov: backend-services-reference/04-clinicaltrials-gov.md
          - NCI CTS API: backend-services-reference/05-nci-cts-api.md
          - BioThings Suite: backend-services-reference/02-biothings-suite.md
          - cBioPortal: backend-services-reference/03-cbioportal.md
          - AlphaGenome: backend-services-reference/07-alphagenome.md
          - OpenFDA: tutorials/openfda-integration.md
      - Development:
          - Contributing: developer-guides/02-contributing-and-testing.md
          - Deployment: developer-guides/01-server-deployment.md
          - BigQuery Monitoring: how-to-guides/05-logging-and-monitoring-with-bigquery.md
      - Technical Details:
          - Transport Protocol: developer-guides/04-transport-protocol.md
          - Error Handling: developer-guides/05-error-handling.md
          - HTTP Client: developer-guides/06-http-client-and-caching.md
          - Performance: developer-guides/07-performance-optimizations.md
          - Third-Party APIs: developer-guides/03-third-party-endpoints.md
      - Security:
          - FDA Integration Security: FDA_SECURITY.md
  - About:
      - Blog:
          - Clinical Trial Search: blog/ai-assisted-clinical-trial-search-analysis.md
          - Researcher Persona: blog/researcher-persona-resource.md
      - Project:
          - Changelog: changelog.md
          - Policies: policies.md
          - GenomOncology: genomoncology.md

plugins:
  - search:
      lang: en
      separator: '[\s\-\.]+'
  - mkdocstrings:
      handlers:
        python:
          paths: ["src/biomcp"]
  # Note: sitemap plugin requires additional installation
  # Uncomment after installing: pip install mkdocs-sitemap
  # - sitemap:
  #     changefreq: weekly
  #     priority: 0.5

theme:
  name: material
  # custom_dir: overrides
  favicon: assets/favicon.ico
  logo: assets/icon.png
  features:
    - navigation.tabs
    - navigation.tabs.sticky
    - navigation.sections
    - navigation.instant
    - navigation.tracking
    - navigation.top
    - toc.follow
    - search.suggest
    - search.highlight
  palette:
    - media: "(prefers-color-scheme: light)"
      scheme: default
      primary: white
      accent: deep orange
      toggle:
        icon: material/brightness-7
        name: Switch to dark mode
    - media: "(prefers-color-scheme: dark)"
      scheme: slate
      primary: black
      accent: deep orange
      toggle:
        icon: material/brightness-4
        name: Switch to light mode
  icon:
    repo: fontawesome/brands/github

extra:
  social:
    - icon: fontawesome/brands/github
      link: https://github.com/genomoncology/biomcp
    - icon: fontawesome/brands/python
      link: https://pypi.org/project/biomcp-python
  meta:
    - property: og:type
      content: website
    - property: og:title
      content: BioMCP - Biomedical Model Context Protocol Server
    - property: og:description
      content: AI-powered biomedical research tool integrating PubMed, ClinicalTrials.gov, and genomic databases
    - property: og:image
      content: https://biomcp.org/assets/icon.png
    - property: og:url
      content: https://biomcp.org/
    - name: twitter:card
      content: summary
    - name: twitter:title
      content: BioMCP - Biomedical Model Context Protocol
    - name: twitter:description
      content: AI-powered biomedical research tool for PubMed, clinical trials, and genomic data
    - name: keywords
      content: biomedical, MCP, AI, PubMed, clinical trials, genomics, bioinformatics, Claude Desktop

extra_css:
  - stylesheets/extra.css
  - stylesheets/announcement.css

# extra_javascript: (removed - no third-party dependencies)

markdown_extensions:
  - toc:
      permalink: true
  - pymdownx.arithmatex:
      generic: true
  - admonition # Nice looking note/warning boxes
  - pymdownx.details # Collapsible sections
  - pymdownx.highlight: # Code highlighting
      anchor_linenums: true
  - pymdownx.inlinehilite
  - pymdownx.snippets # Include content from other files
  - pymdownx.superfences # Nested code blocks
  - pymdownx.tabbed: # Tabbed content
      alternate_style: true
```

--------------------------------------------------------------------------------
/tests/tdd/variants/test_getter.py:
--------------------------------------------------------------------------------

```python
"""Tests for variant getter module."""

from unittest.mock import AsyncMock, patch

import pytest

from biomcp.constants import DEFAULT_ASSEMBLY
from biomcp.variants import getter


class TestGetVariant:
    """Test the get_variant function."""

    @pytest.mark.asyncio
    async def test_get_variant_default_assembly(self):
        """Test that get_variant defaults to hg19 assembly."""
        mock_response = {
            "_id": "rs113488022",
            "dbsnp": {"rsid": "rs113488022"},
        }

        with patch("biomcp.http_client.request_api") as mock_request:
            mock_request.return_value = (mock_response, None)

            await getter.get_variant("rs113488022")

            # Verify assembly parameter was passed with default value
            call_args = mock_request.call_args
            assert call_args[1]["request"]["assembly"] == "hg19"

    @pytest.mark.asyncio
    async def test_get_variant_hg38_assembly(self):
        """Test that get_variant accepts hg38 assembly parameter."""
        mock_response = {
            "_id": "rs113488022",
            "dbsnp": {"rsid": "rs113488022"},
        }

        with patch("biomcp.http_client.request_api") as mock_request:
            mock_request.return_value = (mock_response, None)

            await getter.get_variant("rs113488022", assembly="hg38")

            # Verify assembly parameter was passed correctly
            call_args = mock_request.call_args
            assert call_args[1]["request"]["assembly"] == "hg38"

    @pytest.mark.asyncio
    async def test_get_variant_hg19_assembly(self):
        """Test that get_variant accepts hg19 assembly parameter explicitly."""
        mock_response = {
            "_id": "rs113488022",
            "dbsnp": {"rsid": "rs113488022"},
        }

        with patch("biomcp.http_client.request_api") as mock_request:
            mock_request.return_value = (mock_response, None)

            await getter.get_variant("rs113488022", assembly="hg19")

            # Verify assembly parameter was passed correctly
            call_args = mock_request.call_args
            assert call_args[1]["request"]["assembly"] == "hg19"

    @pytest.mark.asyncio
    async def test_get_variant_includes_all_fields(self):
        """Test that request includes all required fields."""
        mock_response = {"_id": "rs113488022"}

        with patch("biomcp.http_client.request_api") as mock_request:
            mock_request.return_value = (mock_response, None)

            await getter.get_variant("rs113488022", assembly="hg38")

            # Verify both fields and assembly are in request
            call_args = mock_request.call_args
            request_params = call_args[1]["request"]
            assert "fields" in request_params
            assert request_params["fields"] == "all"
            assert "assembly" in request_params
            assert request_params["assembly"] == "hg38"

    @pytest.mark.asyncio
    async def test_get_variant_with_external_annotations(self):
        """Test that assembly parameter works with external annotations."""
        from biomcp.variants.external import EnhancedVariantAnnotation

        mock_response = {
            "_id": "rs113488022",
            "dbsnp": {"rsid": "rs113488022"},
            "dbnsfp": {"genename": "BRAF"},
        }

        with (
            patch("biomcp.http_client.request_api") as mock_request,
            patch(
                "biomcp.variants.getter.ExternalVariantAggregator"
            ) as mock_aggregator,
        ):
            mock_request.return_value = (mock_response, None)

            # Mock the aggregator with proper EnhancedVariantAnnotation
            mock_enhanced = EnhancedVariantAnnotation(
                variant_id="rs113488022",
                tcga=None,
                thousand_genomes=None,
                cbioportal=None,
                error_sources=[],
            )
            mock_agg_instance = AsyncMock()
            mock_agg_instance.get_enhanced_annotations = AsyncMock(
                return_value=mock_enhanced
            )
            mock_aggregator.return_value = mock_agg_instance

            await getter.get_variant(
                "rs113488022",
                assembly="hg38",
                include_external=True,
            )

            # Verify assembly was still passed correctly
            call_args = mock_request.call_args
            assert call_args[1]["request"]["assembly"] == "hg38"


class TestVariantDetailsMCPTool:
    """Test the _variant_details MCP tool."""

    @pytest.mark.asyncio
    async def test_variant_details_default_assembly(self):
        """Test that _variant_details defaults to hg19 assembly."""
        with patch("biomcp.variants.getter.get_variant") as mock_get:
            mock_get.return_value = "Variant details"

            await getter._variant_details(
                call_benefit="Testing default assembly",
                variant_id="rs113488022",
            )

            # Verify get_variant was called with default assembly
            mock_get.assert_called_once_with(
                "rs113488022",
                output_json=False,
                include_external=True,
                assembly=DEFAULT_ASSEMBLY,
            )

    @pytest.mark.asyncio
    async def test_variant_details_custom_assembly(self):
        """Test that _variant_details accepts custom assembly parameter."""
        with patch("biomcp.variants.getter.get_variant") as mock_get:
            mock_get.return_value = "Variant details"

            await getter._variant_details(
                call_benefit="Testing hg38 assembly",
                variant_id="rs113488022",
                assembly="hg38",
            )

            # Verify get_variant was called with hg38
            mock_get.assert_called_once_with(
                "rs113488022",
                output_json=False,
                include_external=True,
                assembly="hg38",
            )

    @pytest.mark.asyncio
    async def test_variant_details_with_all_params(self):
        """Test that all parameters are passed through correctly."""
        with patch("biomcp.variants.getter.get_variant") as mock_get:
            mock_get.return_value = "Variant details"

            await getter._variant_details(
                call_benefit="Testing all parameters",
                variant_id="chr7:g.140453136A>T",
                include_external=False,
                assembly="hg19",
            )

            # Verify all params were passed
            mock_get.assert_called_once_with(
                "chr7:g.140453136A>T",
                output_json=False,
                include_external=False,
                assembly="hg19",
            )
```

--------------------------------------------------------------------------------
/docs/developer-guides/04-transport-protocol.md:
--------------------------------------------------------------------------------

```markdown
# Transport Protocol Guide

This guide explains BioMCP's transport protocol options, with a focus on the new Streamable HTTP transport that provides better scalability and reliability for production deployments.

## Overview

BioMCP supports multiple transport protocols to accommodate different deployment scenarios:

| Transport           | Use Case                                     | Endpoint | Protocol Version |
| ------------------- | -------------------------------------------- | -------- | ---------------- |
| **STDIO**           | Local development, direct Claude integration | N/A      | All              |
| **Worker/SSE**      | Legacy cloud deployments                     | `/sse`   | Pre-2025         |
| **Streamable HTTP** | Modern cloud deployments                     | `/mcp`   | 2025-03-26+      |

## Streamable HTTP Transport

### What is Streamable HTTP?

Streamable HTTP is the latest MCP transport protocol (specification version 2025-03-26) that provides:

- **Single endpoint** (`/mcp`) for all operations
- **Dynamic response modes**: JSON for quick operations, SSE for long-running tasks
- **Session management** via `session_id` query parameter
- **Better scalability**: No permanent connections required
- **Automatic reconnection** and session recovery

### Architecture

The Streamable HTTP transport follows this flow:

1. **MCP Client** sends POST request to `/mcp` endpoint
2. **BioMCP Server** processes the request
3. **Response Type** determined by operation:
   - Quick operations return JSON response
   - Long operations return SSE stream
4. **Session Management** maintains state via session_id parameter

### Implementation Details

BioMCP leverages FastMCP's native streamable HTTP support:

```python
# In core.py
mcp_app = FastMCP(
    name="BioMCP",
    stateless_http=True,  # Enables streamable HTTP
)
```

The transport is automatically handled by FastMCP 1.12.3+, providing:

- Request routing
- Session management
- Response type negotiation
- Error handling

## Migration Guide

### From SSE to Streamable HTTP

If you're currently using the legacy SSE transport, migrate to streamable HTTP:

#### 1. Update Server Configuration

**Before (SSE/Worker mode):**

```bash
biomcp run --mode worker
```

**After (Streamable HTTP):**

```bash
biomcp run --mode streamable_http
```

#### 2. Update Client Configuration

**MCP Inspector:**

```bash
npx @modelcontextprotocol/inspector uv run --with . biomcp run --mode streamable_http
```

**Claude Desktop Configuration:**

```json
{
  "mcpServers": {
    "biomcp": {
      "command": "docker",
      "args": [
        "run",
        "-p",
        "8000:8000",
        "biomcp:latest",
        "biomcp",
        "run",
        "--mode",
        "streamable_http"
      ]
    }
  }
}
```

#### 3. Update Cloudflare Worker

The worker now supports both GET (legacy SSE) and POST (streamable HTTP) on the `/mcp` endpoint:

```javascript
// Automatically routes based on method
.get("/mcp", async (c) => {
  // Legacy SSE transport
})
.post("/mcp", async (c) => {
  // Streamable HTTP transport
})
```

### Backward Compatibility

All legacy endpoints remain functional:

- `/sse` - Server-sent events transport
- `/health` - Health check endpoint
- `/events` - Event streaming endpoint

## Configuration Options

### Server Modes

```bash
# Local development (STDIO)
biomcp run

# Legacy SSE transport
biomcp run --mode worker

# Modern streamable HTTP
biomcp run --mode streamable_http --host 0.0.0.0 --port 8000
```

### Environment Variables

| Variable        | Description             | Default |
| --------------- | ----------------------- | ------- |
| `MCP_TRANSPORT` | Override transport mode | None    |
| `MCP_HOST`      | Server bind address     | 0.0.0.0 |
| `MCP_PORT`      | Server port             | 8000    |

## Session Management

Streamable HTTP uses session IDs to maintain state across requests:

```http
POST /mcp?session_id=abc123 HTTP/1.1
Content-Type: application/json

{
  "jsonrpc": "2.0",
  "method": "initialize",
  "params": {...}
}
```

Sessions are:

- Created automatically on first request
- Maintained in server memory
- Cleaned up after inactivity timeout
- Isolated between different clients
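
The same exchange can be scripted from Python. The sketch below is illustrative only: it assumes the `httpx` package, a local server started with `biomcp run --mode streamable_http`, and a placeholder `abc123` session ID; the payload follows the JSON-RPC shape shown above.

```python
# Minimal sketch of a session-scoped request against the /mcp endpoint,
# assuming a server on localhost:8000 (see Server Modes above).
import httpx

MCP_URL = "http://localhost:8000/mcp"  # single streamable HTTP endpoint

with httpx.Client(timeout=30.0) as client:
    response = client.post(
        f"{MCP_URL}?session_id=abc123",  # placeholder session ID
        json={"jsonrpc": "2.0", "id": 1, "method": "initialize", "params": {}},
        # The server may answer with JSON or an SSE stream, so accept both
        headers={"Accept": "application/json, text/event-stream"},
    )
    content_type = response.headers.get("content-type", "")
    if "text/event-stream" in content_type:
        print(response.text)  # long-running operations stream SSE frames
    else:
        print(response.json())  # quick operations return plain JSON
```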
## Performance Considerations

### Response Mode Selection

The server automatically selects the optimal response mode:

| Operation Type    | Response Mode | Example                |
| ----------------- | ------------- | ---------------------- |
| Quick queries     | JSON          | `search(limit=10)`     |
| Large results     | SSE           | `search(limit=1000)`   |
| Real-time updates | SSE           | Thinking tool progress |

### Optimization Tips

1. **Use session IDs** for related requests to avoid re-initialization
2. **Batch operations** when possible to reduce round trips
3. **Set appropriate timeouts** for long-running operations
4. **Monitor response times** to identify bottlenecks

## Troubleshooting

### Common Issues

#### 1. Connection Refused

```
Error: connect ECONNREFUSED 127.0.0.1:8000
```

**Solution**: Ensure server is running with `--host 0.0.0.0` for Docker deployments.

#### 2. Session Not Found

```
Error: Session 'xyz' not found
```

**Solution**: Session may have expired. Omit session_id to create new session.

#### 3. Timeout on Large Results

```
Error: Request timeout after 30s
```

**Solution**: Increase client timeout or reduce result size with `limit` parameter.

### Debug Mode

Enable debug logging to troubleshoot transport issues:

```bash
LOG_LEVEL=debug biomcp run --mode streamable_http
```

## Security Considerations

### Authentication

BioMCP does not implement authentication at the transport layer. Secure your deployment using:

- **API Gateway**: AWS API Gateway, Kong, etc.
- **Reverse Proxy**: Nginx with auth modules
- **Cloud IAM**: Platform-specific access controls

### Rate Limiting

Implement rate limiting at the infrastructure layer:

```nginx
# Nginx example
limit_req_zone $binary_remote_addr zone=mcp:10m rate=10r/s;

location /mcp {
    limit_req zone=mcp burst=20;
    proxy_pass http://biomcp:8000;
}
```

### CORS Configuration

For browser-based clients, configure CORS headers:

```python
# Handled automatically by FastMCP when stateless_http=True
```

## Monitoring

### Health Checks

```bash
# Check server health
curl http://localhost:8000/health

# Response
{"status": "ok", "transport": "streamable_http"}
```

### Metrics

Monitor these key metrics:

- Request rate on `/mcp` endpoint
- Response time percentiles (p50, p95, p99)
- Session count and duration
- Error rate by error type

## Next Steps

- Review [MCP Specification](https://spec.modelcontextprotocol.io) for protocol details

For questions or issues, please visit our [GitHub repository](https://github.com/genomoncology/biomcp).
```

--------------------------------------------------------------------------------
/tests/tdd/test_europe_pmc_fetch.py:
--------------------------------------------------------------------------------

```python
"""Tests for Europe PMC article fetching via DOI."""

import json
from unittest.mock import Mock, patch

import pytest

from biomcp.articles.fetch import _article_details, is_doi, is_pmid
from biomcp.articles.preprints import fetch_europe_pmc_article


class TestDOIDetection:
    """Test DOI and PMID detection functions."""

    def test_valid_dois(self):
        """Test that valid DOIs are correctly identified."""
        valid_dois = [
            "10.1101/2024.01.20.23288905",
            "10.1038/nature12373",
            "10.1016/j.cell.2023.05.001",
            "10.1126/science.abc1234",
        ]
        for doi in valid_dois:
            assert (
                is_doi(doi) is True
            ), f"Expected {doi} to be identified as DOI"
            assert (
                is_pmid(doi) is False
            ), f"Expected {doi} NOT to be identified as PMID"

    def test_valid_pmids(self):
        """Test that valid PMIDs are correctly identified."""
        valid_pmids = [
            "35271234",
            "12345678",
            "1",
            "999999999",
        ]
        for pmid in valid_pmids:
            assert (
                is_pmid(pmid) is True
            ), f"Expected {pmid} to be identified as PMID"
            assert (
                is_doi(pmid) is False
            ), f"Expected {pmid} NOT to be identified as DOI"

    def test_invalid_identifiers(self):
        """Test that invalid identifiers are rejected by both functions."""
        invalid_ids = [
            "PMC11193658",  # PMC ID
            "abc123",  # Random string
            "10.1101",  # Incomplete DOI
            "nature12373",  # DOI without prefix
            "",  # Empty string
        ]
        for identifier in invalid_ids:
            assert (
                is_doi(identifier) is False
            ), f"Expected {identifier} NOT to be identified as DOI"
            assert (
                is_pmid(identifier) is False
            ), f"Expected {identifier} NOT to be identified as PMID"


class TestEuropePMCFetch:
    """Test Europe PMC article fetching."""

    @pytest.mark.asyncio
    async def test_fetch_europe_pmc_article_success(self):
        """Test successful fetch from Europe PMC."""
        # Mock the response
        mock_response = Mock()
        mock_response.hitCount = 1
        mock_response.results = [
            Mock(
                id="PPR790987",
                source="PPR",
                pmid=None,
                pmcid=None,
                doi="10.1101/2024.01.20.23288905",
                title="Test Article Title",
                authorString="Author A, Author B, Author C",
                journalTitle=None,
                pubYear="2024",
                firstPublicationDate="2024-01-23",
                abstractText="This is the abstract text.",
            )
        ]

        with patch(
            "biomcp.articles.preprints.http_client.request_api"
        ) as mock_request:
            mock_request.return_value = (mock_response, None)

            result = await fetch_europe_pmc_article(
                "10.1101/2024.01.20.23288905", output_json=True
            )

            data = json.loads(result)
            assert len(data) == 1

            article = data[0]
            assert article["doi"] == "10.1101/2024.01.20.23288905"
            assert article["title"] == "Test Article Title"
            assert article["journal"] == "Preprint Server (preprint)"
            assert article["date"] == "2024-01-23"
            assert article["authors"] == ["Author A", "Author B", "Author C"]
            assert article["abstract"] == "This is the abstract text."
            assert article["source"] == "Europe PMC"
            assert article["pmid"] is None
            assert "europepmc.org" in article["pmc_url"]

    @pytest.mark.asyncio
    async def test_fetch_europe_pmc_article_not_found(self):
        """Test fetch when article is not found in Europe PMC."""
        mock_response = Mock()
        mock_response.hitCount = 0
        mock_response.results = []

        with patch(
            "biomcp.articles.preprints.http_client.request_api"
        ) as mock_request:
            mock_request.return_value = (mock_response, None)

            result = await fetch_europe_pmc_article(
                "10.1101/invalid.doi", output_json=True
            )

            data = json.loads(result)
            assert len(data) == 1
            assert data[0]["error"] == "Article not found in Europe PMC"

    @pytest.mark.asyncio
    async def test_fetch_europe_pmc_article_error(self):
        """Test fetch when Europe PMC API returns an error."""
        mock_error = Mock()
        mock_error.code = 500
        mock_error.message = "Internal Server Error"

        with patch(
            "biomcp.articles.preprints.http_client.request_api"
        ) as mock_request:
            mock_request.return_value = (None, mock_error)

            result = await fetch_europe_pmc_article(
                "10.1101/2024.01.20.23288905", output_json=True
            )

            data = json.loads(result)
            assert len(data) == 1
            assert data[0]["error"] == "Error 500: Internal Server Error"


class TestArticleDetailsRouting:
    """Test that _article_details correctly routes DOIs to Europe PMC."""

    @pytest.mark.asyncio
    async def test_doi_routes_to_europe_pmc(self):
        """Test that DOIs are routed to fetch_europe_pmc_article."""
        test_doi = "10.1101/2024.01.20.23288905"

        with patch(
            "biomcp.articles.preprints.fetch_europe_pmc_article"
        ) as mock_europe_pmc:
            mock_europe_pmc.return_value = "Europe PMC result"

            result = await _article_details("Test", test_doi)

            mock_europe_pmc.assert_called_once_with(test_doi, output_json=True)
            assert result == "Europe PMC result"

    @pytest.mark.asyncio
    async def test_pmid_routes_to_pubtator(self):
        """Test that PMIDs are routed to fetch_articles."""
        test_pmid = "35271234"

        with patch(
            "biomcp.articles.fetch.fetch_articles"
        ) as mock_fetch_articles:
            mock_fetch_articles.return_value = "PubTator result"

            result = await _article_details("Test", test_pmid)

            mock_fetch_articles.assert_called_once_with(
                [35271234], full=True, output_json=True
            )
            assert result == "PubTator result"

    @pytest.mark.asyncio
    async def test_invalid_identifier_returns_error(self):
        """Test that invalid identifiers return an error."""
        invalid_id = "PMC12345"

        result = await _article_details("Test", invalid_id)

        data = json.loads(result)
        assert len(data) == 1
        assert "Invalid identifier format" in data[0]["error"]
        assert "PMC12345" in data[0]["error"]
```

--------------------------------------------------------------------------------
/src/biomcp/workers/worker_entry.js:
--------------------------------------------------------------------------------

```javascript
/**
 * BioMCP Worker – Auth‑less version (rev 1.8)
 *
 * Fix: Added improved error handling and increased timeouts for list requests
 */

// Server URL will be configured from environment variables
let REMOTE_MCP_SERVER_URL = "http://localhost:8000"; // Default fallback

const DEBUG = true;
const log = (m) => DEBUG && console.log("[DEBUG]", m);

const CORS = {
  "Access-Control-Allow-Origin": "*",
  "Access-Control-Allow-Methods": "GET, POST, OPTIONS",
  "Access-Control-Allow-Headers": "*",
  "Access-Control-Max-Age": "86400",
};

const json = (o, s = 200) =>
  new Response(JSON.stringify(o, null, 2), {
    status: s,
    headers: { "Content-Type": "application/json", ...CORS },
  });

let forwardPath = "/messages"; // for proxying JSON‑RPC POSTS (no query)
let resourceEndpoint = null; // full string we echo back (/messages/?sid=…)

// Track active SSE connections to avoid duplicate connections
const activeConnections = new Map();

export default {
  async fetch(req, env, ctx) {
    // Use environment variable if available, otherwise use the default
    REMOTE_MCP_SERVER_URL = env.REMOTE_MCP_SERVER_URL || REMOTE_MCP_SERVER_URL;

    const url = new URL(req.url);
    log(`${req.method} ${url.pathname}${url.search}`);

    if (req.method === "OPTIONS")
      return new Response(null, { status: 204, headers: CORS });

    if (url.pathname === "/status" || url.pathname === "/debug")
      return json({
        worker: "BioMCP-authless",
        remote: REMOTE_MCP_SERVER_URL,
        forwardPath,
        resourceEndpoint,
      });

    if (url.pathname === "/sse" || url.pathname === "/events")
      return serveSSE(req, ctx);

    if (req.method === "POST") {
      const sid = url.searchParams.get("session_id");
      if (!sid) return new Response("Missing session_id", { status: 400 });
      return proxyPost(req, forwardPath, sid);
    }

    return new Response("Not found", { status: 404 });
  },
};

async function proxyPost(req, path, sid) {
  const body = await req.text();
  const target = `${REMOTE_MCP_SERVER_URL}${path}?session_id=${encodeURIComponent(
    sid,
  )}`;

  try {
    // Parse the request to check if it's a list request that might need a longer timeout
    let jsonBody;
    try {
      jsonBody = JSON.parse(body);
    } catch (e) {
      // Not valid JSON, proceed with normal request
      jsonBody = {};
    }

    // Set a longer timeout for list requests that tend to time out
    const timeout =
      jsonBody.method &&
      (jsonBody.method === "tools/list" ||
        jsonBody.method === "resources/list")
        ? 30000
        : 10000;

    // Use AbortController to implement timeout
    const controller = new AbortController();
    const timeoutId = setTimeout(() => controller.abort(), timeout);

    log(`Proxying ${jsonBody.method || "request"} with timeout ${timeout}ms`);

    const resp = await fetch(target, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body,
      signal: controller.signal,
    });

    clearTimeout(timeoutId);

    // If it's a list request, cache the response for future use
    if (
      jsonBody.method &&
      (jsonBody.method === "tools/list" ||
        jsonBody.method === "resources/list")
    ) {
      log(`Received response for ${jsonBody.method}`);
    }

    return new Response(await resp.text(), {
      status: resp.status,
      headers: { "Content-Type": "application/json", ...CORS },
    });
  } catch (error) {
    log(`POST error: ${error.message}`);

    // For timeout errors, provide a default empty response for list requests
    if (error.name === "AbortError") {
      try {
        const jsonBody = JSON.parse(body);
        if (jsonBody.method === "tools/list") {
          log("Returning empty tools list due to timeout");
          return new Response(
            JSON.stringify({
              jsonrpc: "2.0",
              id: jsonBody.id,
              result: { tools: [] },
            }),
            {
              status: 200,
              headers: { "Content-Type": "application/json", ...CORS },
            },
          );
        } else if (jsonBody.method === "resources/list") {
          log("Returning empty resources list due to timeout");
          return new Response(
            JSON.stringify({
              jsonrpc: "2.0",
              id: jsonBody.id,
              result: { resources: [] },
            }),
            {
              status: 200,
              headers: { "Content-Type": "application/json", ...CORS },
            },
          );
        }
      } catch (e) {
        // If parsing fails, fall through to default error response
      }
    }

    return new Response(JSON.stringify({ error: error.message }), {
      status: 502,
      headers: { "Content-Type": "application/json", ...CORS },
    });
  }
}

function serveSSE(clientReq, ctx) {
  const enc = new TextEncoder();
  let keepalive;
  const upstreamCtl = new AbortController();

  const stream = new ReadableStream({
    async start(ctrl) {
      ctrl.enqueue(enc.encode("event: ready\ndata: {}\n\n"));

      clientReq.signal.addEventListener("abort", () => {
        clearInterval(keepalive);
        upstreamCtl.abort();
        ctrl.close();
      });

      try {
        const u = await fetch(`${REMOTE_MCP_SERVER_URL}/sse`, {
          headers: { Accept: "text/event-stream" },
          signal: upstreamCtl.signal,
        });
        if (!u.ok || !u.body) throw new Error(`Upstream SSE ${u.status}`);

        const r = u.body.getReader();
        while (true) {
          const { value, done } = await r.read();
          if (done) break;
          if (value) {
            const text = new TextDecoder().decode(value);

            // capture first endpoint once
            if (!resourceEndpoint) {
              const m = text.match(
                /data:\s*(\/messages\/\?session_id=[A-Za-z0-9_-]+)/,
              );
              if (m) {
                resourceEndpoint = m[1];
                forwardPath = resourceEndpoint.split("?")[0];
                log(`Captured endpoint ${resourceEndpoint}`);
                ctrl.enqueue(
                  enc.encode(`event: resource\ndata: ${resourceEndpoint}\n\n`),
                );
              }
            }
            ctrl.enqueue(value);
          }
        }
      } catch (e) {
        if (e.name !== "AbortError") {
          log(`SSE error: ${e.message}`);
          ctrl.enqueue(enc.encode(`event: error\ndata: ${e.message}\n\n`));
        }
      }

      // Reduce keepalive interval to 5 seconds to prevent timeouts
      keepalive = setInterval(() => {
        try {
          ctrl.enqueue(enc.encode(":keepalive\n\n"));
        } catch (_) {
          clearInterval(keepalive);
        }
      }, 5000);
    },
  });

  return new Response(stream, {
    headers: {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
      Connection: "keep-alive",
      ...CORS,
    },
  });
}
```

--------------------------------------------------------------------------------
/tests/tdd/test_drug_approvals.py:
--------------------------------------------------------------------------------

```python
module.""" import json from pathlib import Path from unittest.mock import AsyncMock, patch import pytest from biomcp.openfda.drug_approvals import ( get_drug_approval, search_drug_approvals, ) # Load mock data MOCK_DIR = Path(__file__).parent.parent / "data" / "openfda" MOCK_APPROVALS_SEARCH = json.loads( (MOCK_DIR / "drugsfda_search.json").read_text() ) MOCK_APPROVAL_DETAIL = json.loads( (MOCK_DIR / "drugsfda_detail.json").read_text() ) class TestDrugApprovals: """Test drug approvals functionality.""" @pytest.mark.asyncio async def test_search_drug_approvals_success(self): """Test successful drug approval search.""" with patch( "biomcp.openfda.drug_approvals.make_openfda_request", new_callable=AsyncMock, ) as mock_request: mock_request.return_value = (MOCK_APPROVALS_SEARCH, None) result = await search_drug_approvals( drug="pembrolizumab", limit=10, ) assert "FDA Drug Approval Records" in result assert "pembrolizumab" in result.lower() assert "Application" in result assert "BLA125514" in result mock_request.assert_called_once() @pytest.mark.asyncio async def test_search_drug_approvals_with_filters(self): """Test drug approval search with multiple filters.""" with patch( "biomcp.openfda.drug_approvals.make_openfda_request", new_callable=AsyncMock, ) as mock_request: mock_request.return_value = (MOCK_APPROVALS_SEARCH, None) result = await search_drug_approvals( drug="keytruda", application_number="BLA125514", approval_year="2014", limit=5, api_key="test-key", ) assert "FDA Drug Approval Records" in result # Verify API key was passed as the 4th positional argument call_args = mock_request.call_args assert ( call_args[0][3] == "test-key" ) # api_key is 4th positional arg @pytest.mark.asyncio async def test_search_drug_approvals_no_results(self): """Test drug approval search with no results.""" with patch( "biomcp.openfda.drug_approvals.make_openfda_request", new_callable=AsyncMock, ) as mock_request: mock_request.return_value = ({"results": []}, None) result = await search_drug_approvals(drug="nonexistent-drug") assert "No drug approval records found" in result @pytest.mark.asyncio async def test_search_drug_approvals_api_error(self): """Test drug approval search with API error.""" with patch( "biomcp.openfda.drug_approvals.make_openfda_request", new_callable=AsyncMock, ) as mock_request: mock_request.return_value = (None, "API rate limit exceeded") result = await search_drug_approvals(drug="test") assert "Error searching drug approvals" in result assert "API rate limit exceeded" in result @pytest.mark.asyncio async def test_get_drug_approval_success(self): """Test getting specific drug approval details.""" with patch( "biomcp.openfda.drug_approvals.make_openfda_request", new_callable=AsyncMock, ) as mock_request: mock_request.return_value = (MOCK_APPROVAL_DETAIL, None) result = await get_drug_approval("BLA125514") # Should have detailed approval info assert "BLA125514" in result or "Drug Approval Details" in result assert "BLA125514" in result assert "Products" in result assert "Submission" in result @pytest.mark.asyncio async def test_get_drug_approval_not_found(self): """Test getting drug approval that doesn't exist.""" with patch( "biomcp.openfda.drug_approvals.make_openfda_request", new_callable=AsyncMock, ) as mock_request: mock_request.return_value = ({"results": []}, None) result = await get_drug_approval("INVALID123") assert "No approval record found" in result assert "INVALID123" in result @pytest.mark.asyncio async def test_get_drug_approval_with_api_key(self): """Test getting drug 
approval with API key.""" with patch( "biomcp.openfda.drug_approvals.make_openfda_request", new_callable=AsyncMock, ) as mock_request: mock_request.return_value = (MOCK_APPROVAL_DETAIL, None) result = await get_drug_approval( "BLA125514", api_key="test-api-key", ) # Should have detailed approval info assert "BLA125514" in result or "Drug Approval Details" in result # Verify API key was passed as the 4th positional argument call_args = mock_request.call_args assert ( call_args[0][3] == "test-api-key" ) # api_key is 4th positional arg @pytest.mark.asyncio async def test_search_drug_approvals_pagination(self): """Test drug approval search pagination.""" with patch( "biomcp.openfda.drug_approvals.make_openfda_request", new_callable=AsyncMock, ) as mock_request: mock_response = { "meta": {"results": {"total": 100}}, "results": MOCK_APPROVALS_SEARCH["results"], } mock_request.return_value = (mock_response, None) result = await search_drug_approvals( drug="cancer", limit=10, skip=20, ) # The output format is different - just check for the total assert "100" in result # Verify skip parameter was passed (2nd positional arg) call_args = mock_request.call_args assert ( call_args[0][1]["skip"] == "20" ) # params is 2nd positional arg, value is string @pytest.mark.asyncio async def test_approval_year_validation(self): """Test that approval year is properly formatted.""" with patch( "biomcp.openfda.drug_approvals.make_openfda_request", new_callable=AsyncMock, ) as mock_request: mock_request.return_value = (MOCK_APPROVALS_SEARCH, None) await search_drug_approvals( approval_year="2023", ) # Check that year was properly formatted in query call_args = mock_request.call_args params = call_args[0][1] # params is 2nd positional arg assert "marketing_status_date" in params["search"] assert "[2023-01-01 TO 2023-12-31]" in params["search"] ``` -------------------------------------------------------------------------------- /src/biomcp/articles/fetch.py: -------------------------------------------------------------------------------- ```python import json import re from ssl import TLSVersion from typing import Annotated, Any from pydantic import BaseModel, Field, computed_field from .. 
import http_client, render from ..constants import PUBTATOR3_FULLTEXT_URL from ..http_client import RequestError class PassageInfo(BaseModel): section_type: str | None = Field( None, description="Type of the section.", ) passage_type: str | None = Field( None, alias="type", description="Type of the passage.", ) class Passage(BaseModel): info: PassageInfo | None = Field( None, alias="infons", ) text: str | None = None @property def section_type(self) -> str: section_type = None if self.info is not None: section_type = self.info.section_type or self.info.passage_type section_type = section_type or "UNKNOWN" return section_type.upper() @property def is_title(self) -> bool: return self.section_type == "TITLE" @property def is_abstract(self) -> bool: return self.section_type == "ABSTRACT" @property def is_text(self) -> bool: return self.section_type in { "INTRO", "RESULTS", "METHODS", "DISCUSS", "CONCL", "FIG", "TABLE", } class Article(BaseModel): pmid: int | None = Field( None, description="PubMed ID of the reference article.", ) pmcid: str | None = Field( None, description="PubMed Central ID of the reference article.", ) date: str | None = Field( None, description="Date of the reference article's publication.", ) journal: str | None = Field( None, description="Journal name.", ) authors: list[str] | None = Field( None, description="List of authors.", ) passages: list[Passage] = Field( ..., alias="passages", description="List of passages in the reference article.", exclude=True, ) @computed_field def title(self) -> str: lines = [] for passage in filter(lambda p: p.is_title, self.passages): if passage.text: lines.append(passage.text) return " ... ".join(lines) or f"Article: {self.pmid}" @computed_field def abstract(self) -> str: lines = [] for passage in filter(lambda p: p.is_abstract, self.passages): if passage.text: lines.append(passage.text) return "\n\n".join(lines) or f"Article: {self.pmid}" @computed_field def full_text(self) -> str: lines = [] for passage in filter(lambda p: p.is_text, self.passages): if passage.text: lines.append(passage.text) return "\n\n".join(lines) or "" @computed_field def pubmed_url(self) -> str | None: url = None if self.pmid: url = f"https://pubmed.ncbi.nlm.nih.gov/{self.pmid}/" return url @computed_field def pmc_url(self) -> str | None: """Generates the PMC URL if PMCID exists.""" url = None if self.pmcid: url = f"https://www.ncbi.nlm.nih.gov/pmc/articles/{self.pmcid}/" return url class FetchArticlesResponse(BaseModel): articles: list[Article] = Field( ..., alias="PubTator3", description="List of full texts Articles retrieved from PubTator3.", ) def get_abstract(self, pmid: int | None) -> str | None: for article in self.articles: if pmid and article.pmid == pmid: return str(article.abstract) return None async def call_pubtator_api( pmids: list[int], full: bool, ) -> tuple[FetchArticlesResponse | None, RequestError | None]: """Fetch the text of a list of PubMed IDs.""" request = { "pmids": ",".join(str(pmid) for pmid in pmids), "full": str(full).lower(), } response, error = await http_client.request_api( url=PUBTATOR3_FULLTEXT_URL, request=request, response_model_type=FetchArticlesResponse, tls_version=TLSVersion.TLSv1_2, domain="pubmed", ) return response, error async def fetch_articles( pmids: list[int], full: bool, output_json: bool = False, ) -> str: """Fetch the text of a list of PubMed IDs.""" response, error = await call_pubtator_api(pmids, full) # PubTator API returns full text even when full=False exclude_fields = {"full_text"} if not full else set() # 
noinspection DuplicatedCode if error: data: list[dict[str, Any]] = [ {"error": f"Error {error.code}: {error.message}"} ] else: data = [ article.model_dump( mode="json", exclude_none=True, exclude=exclude_fields, ) for article in (response.articles if response else []) ] if data and not output_json: return render.to_markdown(data) else: return json.dumps(data, indent=2) def is_doi(identifier: str) -> bool: """Check if the identifier is a DOI.""" # DOI pattern: starts with 10. followed by numbers/slash/alphanumeric doi_pattern = r"^10\.\d{4,9}/[\-._;()/:\w]+$" return bool(re.match(doi_pattern, str(identifier))) def is_pmid(identifier: str) -> bool: """Check if the identifier is a PubMed ID.""" # PMID is a numeric string return str(identifier).isdigit() async def _article_details( call_benefit: Annotated[ str, "Define and summarize why this function is being called and the intended benefit", ], pmid, ) -> str: """ Retrieves details for a single article given its identifier. Parameters: - call_benefit: Define and summarize why this function is being called and the intended benefit - pmid: An article identifier - either a PubMed ID (e.g., 34397683) or DOI (e.g., 10.1101/2024.01.20.23288905) Process: - For PMIDs: Calls the PubTator3 API to fetch the article's title, abstract, and full text (if available) - For DOIs: Calls Europe PMC API to fetch preprint details Output: A JSON formatted string containing the retrieved article content. """ identifier = str(pmid) # Check if it's a DOI (Europe PMC preprint) if is_doi(identifier): from .preprints import fetch_europe_pmc_article return await fetch_europe_pmc_article(identifier, output_json=True) # Check if it's a PMID (PubMed article) elif is_pmid(identifier): return await fetch_articles( [int(identifier)], full=True, output_json=True ) else: # Unknown identifier format return json.dumps( [ { "error": f"Invalid identifier format: {identifier}. Expected either a PMID (numeric) or DOI (10.xxxx/xxxx format)." } ], indent=2, ) ``` -------------------------------------------------------------------------------- /docs/concepts/02-the-deep-researcher-persona.md: -------------------------------------------------------------------------------- ```markdown # The Deep Researcher Persona ## Overview The Deep Researcher Persona is a core philosophy of BioMCP that transforms AI assistants into systematic biomedical research partners. This persona embodies the methodical approach of a dedicated biomedical researcher, enabling AI agents to conduct thorough literature reviews, analyze complex datasets, and synthesize findings into actionable insights. ## Why the Deep Researcher Persona? Traditional AI interactions often result in surface-level responses. 
The Deep Researcher Persona addresses this by: - **Enforcing Systematic Thinking**: Requiring the use of the `think` tool before any research operation - **Preventing Premature Conclusions**: Breaking complex queries into manageable research steps - **Ensuring Comprehensive Analysis**: Following a proven 10-step methodology - **Maintaining Research Rigor**: Documenting thought processes and decision rationale ## Core Traits and Personality The Deep Researcher embodies these characteristics: - **Curious and Methodical**: Always seeking deeper understanding through systematic investigation - **Evidence-Based**: Grounding all conclusions in concrete data from multiple sources - **Professional Voice**: Clear, concise scientific communication - **Collaborative**: Working as a research partner, not just an information retriever - **Objective**: Presenting balanced findings including contradictory evidence ## The 10-Step Sequential Thinking Process This methodology ensures comprehensive research coverage: ### 1. Problem Definition and Scope - Parse the research question to identify key concepts - Define clear objectives and expected deliverables - Establish research boundaries and constraints ### 2. Initial Knowledge Assessment - Evaluate existing knowledge on the topic - Identify knowledge gaps requiring investigation - Form initial hypotheses to guide research ### 3. Search Strategy Development - Design comprehensive search queries - Select appropriate databases and tools - Plan iterative search refinements ### 4. Data Collection and Retrieval - Execute searches across multiple sources (PubTator3, ClinicalTrials.gov, variant databases) - Collect relevant articles, trials, and annotations - Document search parameters and results ### 5. Quality Assessment and Filtering - Evaluate source credibility and relevance - Apply inclusion/exclusion criteria - Prioritize high-impact findings ### 6. Information Extraction - Extract key findings, methodologies, and conclusions - Identify patterns and relationships - Note contradictions and uncertainties ### 7. Synthesis and Integration - Combine findings from multiple sources - Resolve contradictions when possible - Build coherent narrative from evidence ### 8. Critical Analysis - Evaluate strength of evidence - Identify limitations and biases - Consider alternative interpretations ### 9. Knowledge Synthesis - Create structured summary of findings - Highlight key insights and implications - Prepare actionable recommendations ### 10. Communication and Reporting - Format findings for target audience - Include proper citations and references - Provide clear next steps ## Mandatory Think Tool Usage **CRITICAL**: The `think` tool must ALWAYS be used first before any BioMCP operation. This is not optional. ```python # Correct pattern - ALWAYS start with think think(thought="Breaking down the research question...", thoughtNumber=1) # Then proceed with searches article_searcher(genes=["BRAF"], diseases=["melanoma"]) # INCORRECT - Never skip the think step article_searcher(genes=["BRAF"]) # ❌ Will produce suboptimal results ``` ## Implementation in Practice ### Example Research Flow 1. **User Query**: "What are the treatment options for BRAF V600E melanoma?" 2. **Think Step 1**: Problem decomposition ``` think(thought="Breaking down query: Need to find 1) BRAF V600E mutation significance, 2) current treatments, 3) clinical trials", thoughtNumber=1) ``` 3. 
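
Put together, the flow looks roughly like the sketch below. This is a minimal illustration rather than BioMCP's actual SDK surface: it assumes the MCP tools (`think`, `article_searcher`, `trial_searcher`) are exposed as awaitable Python callables, and the `conditions`/`terms` parameters on `trial_searcher` are hypothetical.

```python
# Minimal sketch of the flow above; assumes the MCP tools are exposed
# as awaitable Python callables, and trial_searcher's parameter names
# (conditions, terms) are hypothetical.
async def research_braf_v600e_melanoma():
    # Steps 2-3: think before searching (mandatory)
    await think(
        thought="Breaking down query: 1) BRAF V600E significance, "
        "2) current treatments, 3) clinical trials",
        thoughtNumber=1,
    )
    await think(
        thought="Plan: search literature for BRAF inhibitors, then trials "
        "restricted to V600E-positive melanoma",
        thoughtNumber=2,
    )

    # Step 4: execute the planned searches
    articles = await article_searcher(genes=["BRAF"], diseases=["melanoma"])
    trials = await trial_searcher(conditions=["melanoma"], terms=["BRAF V600E"])

    # Step 5: hand both result sets to the synthesis step, which produces
    # the research brief format shown below
    return articles, trials
```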
**Think Step 2**: Search strategy ``` think(thought="Will search articles for BRAF inhibitors, then trials for V600E-specific treatments", thoughtNumber=2) ``` 4. **Execute Searches**: Following the planned strategy 5. **Synthesize**: Combine findings into comprehensive brief ### Research Brief Format Every research session concludes with a structured brief: ```markdown ## Research Brief: [Topic] ### Executive Summary - 3-5 bullet points of key findings - Clear, actionable insights ### Detailed Findings 1. **Literature Review** (X papers analyzed) - Key discoveries - Consensus findings - Contradictions noted 2. **Clinical Evidence** (Y trials reviewed) - Current treatment landscape - Emerging therapies - Trial enrollment opportunities 3. **Molecular Insights** - Variant annotations - Pathway implications - Biomarker relevance ### Recommendations - Evidence-based suggestions - Areas for further investigation - Clinical considerations ### References - Full citations for all sources - Direct links to primary data ``` ## Tool Inventory and Usage The Deep Researcher has access to 24 specialized tools: ### Core Research Tools - **think**: Sequential reasoning and planning - **article_searcher**: PubMed/PubTator3 literature search - **trial_searcher**: Clinical trials discovery - **variant_searcher**: Genetic variant annotations ### Specialized Analysis Tools - **gene_getter**: Gene function and pathway data - **drug_getter**: Medication information - **disease_getter**: Disease ontology and synonyms - **alphagenome_predictor**: Variant effect prediction ### Integration Features - **Automatic cBioPortal Integration**: Cancer genomics context for all gene searches - **BioThings Suite Access**: Real-time biomedical annotations - **NCI Database Integration**: Comprehensive cancer trial data ## Best Practices 1. **Always Think First**: Never skip the sequential thinking process 2. **Use Multiple Sources**: Cross-reference findings across databases 3. **Document Reasoning**: Explain why certain searches or filters were chosen 4. **Consider Context**: Account for disease stage, prior treatments, and patient factors 5. **Stay Current**: Leverage preprint integration for latest findings ## Community Impact The Deep Researcher Persona has transformed how researchers interact with biomedical data: - **Reduced Research Time**: From days to minutes for comprehensive reviews - **Improved Accuracy**: Systematic approach reduces missed connections - **Enhanced Collaboration**: Consistent methodology enables team research - **Democratized Access**: Complex research capabilities available to all ## Getting Started To use the Deep Researcher Persona: 1. Ensure BioMCP is installed and configured 2. Load the persona resource when starting your AI session 3. Always begin research queries with the think tool 4. Follow the 10-step methodology for comprehensive results Remember: The Deep Researcher Persona is not just a tool configuration—it's a systematic approach to biomedical research that ensures thorough, evidence-based insights every time. ``` -------------------------------------------------------------------------------- /src/biomcp/render.py: -------------------------------------------------------------------------------- ```python import json import re import textwrap from typing import Any MAX_WIDTH = 72 REMOVE_MULTI_LINES = re.compile(r"\s+") def dedupe_list_keep_order(lst: list[Any]) -> list[Any]: """ Remove duplicates from a list while preserving order. 
## Best Practices

1. **Always Think First**: Never skip the sequential thinking process
2. **Use Multiple Sources**: Cross-reference findings across databases
3. **Document Reasoning**: Explain why certain searches or filters were chosen
4. **Consider Context**: Account for disease stage, prior treatments, and patient factors
5. **Stay Current**: Leverage preprint integration for latest findings

## Community Impact

The Deep Researcher Persona has transformed how researchers interact with biomedical data:

- **Reduced Research Time**: From days to minutes for comprehensive reviews
- **Improved Accuracy**: Systematic approach reduces missed connections
- **Enhanced Collaboration**: Consistent methodology enables team research
- **Democratized Access**: Complex research capabilities available to all

## Getting Started

To use the Deep Researcher Persona:

1. Ensure BioMCP is installed and configured
2. Load the persona resource when starting your AI session
3. Always begin research queries with the think tool
4. Follow the 10-step methodology for comprehensive results

Remember: The Deep Researcher Persona is not just a tool configuration—it's a systematic approach to biomedical research that ensures thorough, evidence-based insights every time.
```

--------------------------------------------------------------------------------
/src/biomcp/render.py:
--------------------------------------------------------------------------------

```python
import json
import re
import textwrap
from typing import Any

MAX_WIDTH = 72

REMOVE_MULTI_LINES = re.compile(r"\s+")


def dedupe_list_keep_order(lst: list[Any]) -> list[Any]:
    """
    Remove duplicates from a list while preserving order.

    Uses string to handle elements like dicts that are not hashable.
    """
    seen = set()
    data = []
    for x in lst:
        if str(x) not in seen:
            data.append(x)
            seen.add(str(x))
    return data


def to_markdown(data: str | list | dict) -> str:
    """Convert a JSON string or already-parsed data (dict or list) into
    a simple Markdown representation.

    :param data: The input data, either as a JSON string, or a parsed list/dict.
    :return: A string containing the generated Markdown output.
    """
    if isinstance(data, str):
        data = json.loads(data)

    if isinstance(data, list):
        new_data = []
        for index, item in enumerate(data, start=1):
            new_data.append({f"Record {index}": item})
        data = new_data

    lines: list[str] = []
    process_any(data, [], lines)
    return ("\n".join(lines)).strip() + "\n"
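
# Illustrative example (not part of the source file): given
#     to_markdown({"gene": {"symbol": "BRAF", "aliases": ["B-RAF1", "BRAF1"]}})
# the helpers below walk the dict and produce, roughly:
#     # Gene
#     Symbol: BRAF
#     Aliases: B-RAF1, BRAF1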
""" if path_keys: level = min(len(path_keys), 5) heading_hash = "#" * level heading_text = transform_key(path_keys[-1]) # Blank line, then heading append_line(lines, "") append_line(lines, f"{heading_hash} {heading_text}") # Group keys by value type scalar_keys = [] dict_keys = [] list_keys = [] for key, val in dct.items(): if isinstance(val, str | int | float | bool) or val is None: scalar_keys.append(key) elif isinstance(val, dict): dict_keys.append(key) elif isinstance(val, list): list_keys.append(key) # Process scalars first for key in scalar_keys: next_path = path_keys + [key] process_any(dct[key], next_path, lines) # Process dicts second for key in dict_keys: next_path = path_keys + [key] process_any(dct[key], next_path, lines) # Process lists last for key in list_keys: next_path = path_keys + [key] process_any(dct[key], next_path, lines) def process_list(lst: list, path_keys: list[str], lines: list[str]) -> None: """If all items in the list are scalar, attempt to render them on one line if it fits, otherwise use bullet points. Otherwise, we recursively process each item. :param lst: The list of items to process. :param path_keys: The keys leading to this list. :param lines: The running list of Markdown lines. """ all_scalars = all(isinstance(i, str | int | float | bool) for i in lst) lst = dedupe_list_keep_order(lst) if path_keys and all_scalars: key = path_keys[-1] process_scalar_list(key, lines, lst) else: for item in lst: process_any(item, path_keys, lines) def process_scalar_list(key: str, lines: list[str], lst: list) -> None: """Print a list of scalars either on one line as "Key: item1, item2, ..." if it fits within MAX_WIDTH, otherwise print a bullet list. :param key: The key name for this list of scalars. :param lines: The running list of Markdown lines. :param lst: The actual list of scalar items. """ label = transform_key(key) items_str = ", ".join(str(item) for item in lst) single_line = f"{label}: {items_str}" if len(single_line) <= MAX_WIDTH: append_line(lines, single_line) else: # bullet list append_line(lines, f"{label}:") for item in lst: bullet = f"- {item}" append_line(lines, bullet) def render_key_value(lines: list[str], key: str, value: Any) -> None: """Render a single "key: value" pair. If the value is a long string, we do multiline wrapping with an indentation for clarity. Otherwise, it appears on the same line. :param lines: The running list of Markdown lines. :param key: The raw key name (untransformed). :param value: The value associated with this key. """ label = transform_key(key) val_str = str(value) # If the value is a fairly long string, do multiline if isinstance(value, str) and len(value) > MAX_WIDTH: append_line(lines, f"{label}:") for wrapped in wrap_preserve_newlines(val_str, MAX_WIDTH): append_line(lines, " " + wrapped) else: append_line(lines, f"{label}: {val_str}") def transform_key(s: str) -> str: # Replace underscores with spaces. s = s.replace("_", " ") # Insert a space between an uppercase letter followed by an uppercase letter then a lowercase letter. s = re.sub(r"(?<=[A-Z])(?=[A-Z][a-z])", " ", s) # Insert a space between a lowercase letter or digit and an uppercase letter. s = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", " ", s) words = s.split() transformed_words = [] for word in words: transformed_words.append(word.capitalize()) return " ".join(transformed_words) ```