#
tokens: 49656/50000 32/303 files (page 3/15)
lines: off (toggle) GitHub
raw markdown copy
This is page 3 of 15. Use http://codebase.md/genomoncology/biomcp?page={x} to view the full context.

# Directory Structure

```
├── .github
│   ├── actions
│   │   └── setup-python-env
│   │       └── action.yml
│   ├── dependabot.yml
│   └── workflows
│       ├── ci.yml
│       ├── deploy-docs.yml
│       ├── main.yml.disabled
│       ├── on-release-main.yml
│       └── validate-codecov-config.yml
├── .gitignore
├── .pre-commit-config.yaml
├── BIOMCP_DATA_FLOW.md
├── CHANGELOG.md
├── CNAME
├── codecov.yaml
├── docker-compose.yml
├── Dockerfile
├── docs
│   ├── apis
│   │   ├── error-codes.md
│   │   ├── overview.md
│   │   └── python-sdk.md
│   ├── assets
│   │   ├── biomcp-cursor-locations.png
│   │   ├── favicon.ico
│   │   ├── icon.png
│   │   ├── logo.png
│   │   ├── mcp_architecture.txt
│   │   └── remote-connection
│   │       ├── 00_connectors.png
│   │       ├── 01_add_custom_connector.png
│   │       ├── 02_connector_enabled.png
│   │       ├── 03_connect_to_biomcp.png
│   │       ├── 04_select_google_oauth.png
│   │       └── 05_success_connect.png
│   ├── backend-services-reference
│   │   ├── 01-overview.md
│   │   ├── 02-biothings-suite.md
│   │   ├── 03-cbioportal.md
│   │   ├── 04-clinicaltrials-gov.md
│   │   ├── 05-nci-cts-api.md
│   │   ├── 06-pubtator3.md
│   │   └── 07-alphagenome.md
│   ├── blog
│   │   ├── ai-assisted-clinical-trial-search-analysis.md
│   │   ├── images
│   │   │   ├── deep-researcher-video.png
│   │   │   ├── researcher-announce.png
│   │   │   ├── researcher-drop-down.png
│   │   │   ├── researcher-prompt.png
│   │   │   ├── trial-search-assistant.png
│   │   │   └── what_is_biomcp_thumbnail.png
│   │   └── researcher-persona-resource.md
│   ├── changelog.md
│   ├── CNAME
│   ├── concepts
│   │   ├── 01-what-is-biomcp.md
│   │   ├── 02-the-deep-researcher-persona.md
│   │   └── 03-sequential-thinking-with-the-think-tool.md
│   ├── developer-guides
│   │   ├── 01-server-deployment.md
│   │   ├── 02-contributing-and-testing.md
│   │   ├── 03-third-party-endpoints.md
│   │   ├── 04-transport-protocol.md
│   │   ├── 05-error-handling.md
│   │   ├── 06-http-client-and-caching.md
│   │   ├── 07-performance-optimizations.md
│   │   └── generate_endpoints.py
│   ├── faq-condensed.md
│   ├── FDA_SECURITY.md
│   ├── genomoncology.md
│   ├── getting-started
│   │   ├── 01-quickstart-cli.md
│   │   ├── 02-claude-desktop-integration.md
│   │   └── 03-authentication-and-api-keys.md
│   ├── how-to-guides
│   │   ├── 01-find-articles-and-cbioportal-data.md
│   │   ├── 02-find-trials-with-nci-and-biothings.md
│   │   ├── 03-get-comprehensive-variant-annotations.md
│   │   ├── 04-predict-variant-effects-with-alphagenome.md
│   │   ├── 05-logging-and-monitoring-with-bigquery.md
│   │   └── 06-search-nci-organizations-and-interventions.md
│   ├── index.md
│   ├── policies.md
│   ├── reference
│   │   ├── architecture-diagrams.md
│   │   ├── quick-architecture.md
│   │   ├── quick-reference.md
│   │   └── visual-architecture.md
│   ├── robots.txt
│   ├── stylesheets
│   │   ├── announcement.css
│   │   └── extra.css
│   ├── troubleshooting.md
│   ├── tutorials
│   │   ├── biothings-prompts.md
│   │   ├── claude-code-biomcp-alphagenome.md
│   │   ├── nci-prompts.md
│   │   ├── openfda-integration.md
│   │   ├── openfda-prompts.md
│   │   ├── pydantic-ai-integration.md
│   │   └── remote-connection.md
│   ├── user-guides
│   │   ├── 01-command-line-interface.md
│   │   ├── 02-mcp-tools-reference.md
│   │   └── 03-integrating-with-ides-and-clients.md
│   └── workflows
│       └── all-workflows.md
├── example_scripts
│   ├── mcp_integration.py
│   └── python_sdk.py
├── glama.json
├── LICENSE
├── lzyank.toml
├── Makefile
├── mkdocs.yml
├── package-lock.json
├── package.json
├── pyproject.toml
├── README.md
├── scripts
│   ├── check_docs_in_mkdocs.py
│   ├── check_http_imports.py
│   └── generate_endpoints_doc.py
├── smithery.yaml
├── src
│   └── biomcp
│       ├── __init__.py
│       ├── __main__.py
│       ├── articles
│       │   ├── __init__.py
│       │   ├── autocomplete.py
│       │   ├── fetch.py
│       │   ├── preprints.py
│       │   ├── search_optimized.py
│       │   ├── search.py
│       │   └── unified.py
│       ├── biomarkers
│       │   ├── __init__.py
│       │   └── search.py
│       ├── cbioportal_helper.py
│       ├── circuit_breaker.py
│       ├── cli
│       │   ├── __init__.py
│       │   ├── articles.py
│       │   ├── biomarkers.py
│       │   ├── diseases.py
│       │   ├── health.py
│       │   ├── interventions.py
│       │   ├── main.py
│       │   ├── openfda.py
│       │   ├── organizations.py
│       │   ├── server.py
│       │   ├── trials.py
│       │   └── variants.py
│       ├── connection_pool.py
│       ├── constants.py
│       ├── core.py
│       ├── diseases
│       │   ├── __init__.py
│       │   ├── getter.py
│       │   └── search.py
│       ├── domain_handlers.py
│       ├── drugs
│       │   ├── __init__.py
│       │   └── getter.py
│       ├── exceptions.py
│       ├── genes
│       │   ├── __init__.py
│       │   └── getter.py
│       ├── http_client_simple.py
│       ├── http_client.py
│       ├── individual_tools.py
│       ├── integrations
│       │   ├── __init__.py
│       │   ├── biothings_client.py
│       │   └── cts_api.py
│       ├── interventions
│       │   ├── __init__.py
│       │   ├── getter.py
│       │   └── search.py
│       ├── logging_filter.py
│       ├── metrics_handler.py
│       ├── metrics.py
│       ├── openfda
│       │   ├── __init__.py
│       │   ├── adverse_events_helpers.py
│       │   ├── adverse_events.py
│       │   ├── cache.py
│       │   ├── constants.py
│       │   ├── device_events_helpers.py
│       │   ├── device_events.py
│       │   ├── drug_approvals.py
│       │   ├── drug_labels_helpers.py
│       │   ├── drug_labels.py
│       │   ├── drug_recalls_helpers.py
│       │   ├── drug_recalls.py
│       │   ├── drug_shortages_detail_helpers.py
│       │   ├── drug_shortages_helpers.py
│       │   ├── drug_shortages.py
│       │   ├── exceptions.py
│       │   ├── input_validation.py
│       │   ├── rate_limiter.py
│       │   ├── utils.py
│       │   └── validation.py
│       ├── organizations
│       │   ├── __init__.py
│       │   ├── getter.py
│       │   └── search.py
│       ├── parameter_parser.py
│       ├── prefetch.py
│       ├── query_parser.py
│       ├── query_router.py
│       ├── rate_limiter.py
│       ├── render.py
│       ├── request_batcher.py
│       ├── resources
│       │   ├── __init__.py
│       │   ├── getter.py
│       │   ├── instructions.md
│       │   └── researcher.md
│       ├── retry.py
│       ├── router_handlers.py
│       ├── router.py
│       ├── shared_context.py
│       ├── thinking
│       │   ├── __init__.py
│       │   ├── sequential.py
│       │   └── session.py
│       ├── thinking_tool.py
│       ├── thinking_tracker.py
│       ├── trials
│       │   ├── __init__.py
│       │   ├── getter.py
│       │   ├── nci_getter.py
│       │   ├── nci_search.py
│       │   └── search.py
│       ├── utils
│       │   ├── __init__.py
│       │   ├── cancer_types_api.py
│       │   ├── cbio_http_adapter.py
│       │   ├── endpoint_registry.py
│       │   ├── gene_validator.py
│       │   ├── metrics.py
│       │   ├── mutation_filter.py
│       │   ├── query_utils.py
│       │   ├── rate_limiter.py
│       │   └── request_cache.py
│       ├── variants
│       │   ├── __init__.py
│       │   ├── alphagenome.py
│       │   ├── cancer_types.py
│       │   ├── cbio_external_client.py
│       │   ├── cbioportal_mutations.py
│       │   ├── cbioportal_search_helpers.py
│       │   ├── cbioportal_search.py
│       │   ├── constants.py
│       │   ├── external.py
│       │   ├── filters.py
│       │   ├── getter.py
│       │   ├── links.py
│       │   └── search.py
│       └── workers
│           ├── __init__.py
│           ├── worker_entry_stytch.js
│           ├── worker_entry.js
│           └── worker.py
├── tests
│   ├── bdd
│   │   ├── cli_help
│   │   │   ├── help.feature
│   │   │   └── test_help.py
│   │   ├── conftest.py
│   │   ├── features
│   │   │   └── alphagenome_integration.feature
│   │   ├── fetch_articles
│   │   │   ├── fetch.feature
│   │   │   └── test_fetch.py
│   │   ├── get_trials
│   │   │   ├── get.feature
│   │   │   └── test_get.py
│   │   ├── get_variants
│   │   │   ├── get.feature
│   │   │   └── test_get.py
│   │   ├── search_articles
│   │   │   ├── autocomplete.feature
│   │   │   ├── search.feature
│   │   │   ├── test_autocomplete.py
│   │   │   └── test_search.py
│   │   ├── search_trials
│   │   │   ├── search.feature
│   │   │   └── test_search.py
│   │   ├── search_variants
│   │   │   ├── search.feature
│   │   │   └── test_search.py
│   │   └── steps
│   │       └── test_alphagenome_steps.py
│   ├── config
│   │   └── test_smithery_config.py
│   ├── conftest.py
│   ├── data
│   │   ├── ct_gov
│   │   │   ├── clinical_trials_api_v2.yaml
│   │   │   ├── trials_NCT04280705.json
│   │   │   └── trials_NCT04280705.txt
│   │   ├── myvariant
│   │   │   ├── myvariant_api.yaml
│   │   │   ├── myvariant_field_descriptions.csv
│   │   │   ├── variants_full_braf_v600e.json
│   │   │   ├── variants_full_braf_v600e.txt
│   │   │   └── variants_part_braf_v600_multiple.json
│   │   ├── openfda
│   │   │   ├── drugsfda_detail.json
│   │   │   ├── drugsfda_search.json
│   │   │   ├── enforcement_detail.json
│   │   │   └── enforcement_search.json
│   │   └── pubtator
│   │       ├── pubtator_autocomplete.json
│   │       └── pubtator3_paper.txt
│   ├── integration
│   │   ├── test_openfda_integration.py
│   │   ├── test_preprints_integration.py
│   │   ├── test_simple.py
│   │   └── test_variants_integration.py
│   ├── tdd
│   │   ├── articles
│   │   │   ├── test_autocomplete.py
│   │   │   ├── test_cbioportal_integration.py
│   │   │   ├── test_fetch.py
│   │   │   ├── test_preprints.py
│   │   │   ├── test_search.py
│   │   │   └── test_unified.py
│   │   ├── conftest.py
│   │   ├── drugs
│   │   │   ├── __init__.py
│   │   │   └── test_drug_getter.py
│   │   ├── openfda
│   │   │   ├── __init__.py
│   │   │   ├── test_adverse_events.py
│   │   │   ├── test_device_events.py
│   │   │   ├── test_drug_approvals.py
│   │   │   ├── test_drug_labels.py
│   │   │   ├── test_drug_recalls.py
│   │   │   ├── test_drug_shortages.py
│   │   │   └── test_security.py
│   │   ├── test_biothings_integration_real.py
│   │   ├── test_biothings_integration.py
│   │   ├── test_circuit_breaker.py
│   │   ├── test_concurrent_requests.py
│   │   ├── test_connection_pool.py
│   │   ├── test_domain_handlers.py
│   │   ├── test_drug_approvals.py
│   │   ├── test_drug_recalls.py
│   │   ├── test_drug_shortages.py
│   │   ├── test_endpoint_documentation.py
│   │   ├── test_error_scenarios.py
│   │   ├── test_europe_pmc_fetch.py
│   │   ├── test_mcp_integration.py
│   │   ├── test_mcp_tools.py
│   │   ├── test_metrics.py
│   │   ├── test_nci_integration.py
│   │   ├── test_nci_mcp_tools.py
│   │   ├── test_network_policies.py
│   │   ├── test_offline_mode.py
│   │   ├── test_openfda_unified.py
│   │   ├── test_pten_r173_search.py
│   │   ├── test_render.py
│   │   ├── test_request_batcher.py.disabled
│   │   ├── test_retry.py
│   │   ├── test_router.py
│   │   ├── test_shared_context.py.disabled
│   │   ├── test_unified_biothings.py
│   │   ├── thinking
│   │   │   ├── __init__.py
│   │   │   └── test_sequential.py
│   │   ├── trials
│   │   │   ├── test_backward_compatibility.py
│   │   │   ├── test_getter.py
│   │   │   └── test_search.py
│   │   ├── utils
│   │   │   ├── test_gene_validator.py
│   │   │   ├── test_mutation_filter.py
│   │   │   ├── test_rate_limiter.py
│   │   │   └── test_request_cache.py
│   │   ├── variants
│   │   │   ├── constants.py
│   │   │   ├── test_alphagenome_api_key.py
│   │   │   ├── test_alphagenome_comprehensive.py
│   │   │   ├── test_alphagenome.py
│   │   │   ├── test_cbioportal_mutations.py
│   │   │   ├── test_cbioportal_search.py
│   │   │   ├── test_external_integration.py
│   │   │   ├── test_external.py
│   │   │   ├── test_extract_gene_aa_change.py
│   │   │   ├── test_filters.py
│   │   │   ├── test_getter.py
│   │   │   ├── test_links.py
│   │   │   └── test_search.py
│   │   └── workers
│   │       └── test_worker_sanitization.js
│   └── test_pydantic_ai_integration.py
├── THIRD_PARTY_ENDPOINTS.md
├── tox.ini
├── uv.lock
└── wrangler.toml
```

# Files

--------------------------------------------------------------------------------
/src/biomcp/rate_limiter.py:
--------------------------------------------------------------------------------

```python
"""Rate limiting implementation for BioMCP API calls."""

import asyncio
import time
from collections import defaultdict
from contextlib import asynccontextmanager

from .constants import (
    DEFAULT_BURST_SIZE,
    DEFAULT_RATE_LIMIT_PER_SECOND,
)
from .exceptions import BioMCPError


class RateLimitExceeded(BioMCPError):
    """Error signalling that a caller has used up its request allowance.

    The formatted message and a details mapping with the keys ``domain``,
    ``limit`` and ``window`` are forwarded to ``BioMCPError``.
    """

    def __init__(self, domain: str, limit: int, window: int):
        """Build the error.

        Args:
            domain: Name of the limited entity (reported verbatim)
            limit: Number of requests permitted per window
            window: Window length in seconds
        """
        details = {"domain": domain, "limit": limit, "window": window}
        super().__init__(
            f"Rate limit exceeded for {domain}: "
            f"{limit} requests per {window} seconds",
            details,
        )


class RateLimiter:
    """Token bucket rate limiter implementation.

    The bucket holds at most ``burst_size`` tokens and is refilled
    continuously at ``requests_per_second`` tokens per second, measured on
    the monotonic clock. A caller that finds the bucket short of tokens is
    delayed until enough tokens have accrued; it is never rejected.
    """

    def __init__(
        self,
        requests_per_second: float = DEFAULT_RATE_LIMIT_PER_SECOND,
        burst_size: int = DEFAULT_BURST_SIZE,
    ):
        """Initialize rate limiter.

        Args:
            requests_per_second: Sustained request rate
            burst_size: Maximum burst capacity
        """
        self.rate = requests_per_second
        self.burst_size = burst_size
        # The bucket starts full so an initial burst is served immediately.
        self.tokens = float(burst_size)
        self.last_update = time.monotonic()
        self._lock = asyncio.Lock()

    def _refill(self) -> None:
        """Credit tokens accrued since the last refill, capped at burst size."""
        now = time.monotonic()
        elapsed = now - self.last_update
        self.last_update = now
        self.tokens = min(self.burst_size, self.tokens + elapsed * self.rate)

    async def acquire(self, tokens: int = 1) -> None:
        """Acquire tokens from the bucket, waiting if it is short.

        Args:
            tokens: Number of tokens to take

        The lock is held for the whole call, including the sleep, so
        concurrent callers are served one after another.
        """
        async with self._lock:
            self._refill()

            if self.tokens >= tokens:
                self.tokens -= tokens
                return

            # Sleep just long enough for the shortfall to accrue.
            wait_time = (tokens - self.tokens) / self.rate
            await asyncio.sleep(wait_time)

            # Refill again so the clock is advanced past the sleep. Leaving
            # last_update at its pre-sleep value would let the next caller
            # be credited for the same interval a second time.
            self._refill()
            # Clamp at zero: a request larger than burst_size can never be
            # fully covered by the (capped) bucket.
            self.tokens = max(0.0, self.tokens - tokens)

    @asynccontextmanager
    async def limit(self):
        """Context manager that takes one token before running its body."""
        await self.acquire()
        yield


class DomainRateLimiter:
    """Registry of token-bucket limiters, one per domain name.

    Limiters are created lazily on first use. Domains listed in
    ``domain_configs`` get their own rate and burst values; any other
    domain uses the defaults given to the constructor.
    """

    def __init__(self, default_rps: float = 10.0, default_burst: int = 20):
        """Initialize domain rate limiter.

        Args:
            default_rps: Requests per second for domains without an override
            default_burst: Burst size for domains without an override
        """
        self.default_rps = default_rps
        self.default_burst = default_burst
        # Limiters built so far, keyed by domain name.
        self.limiters: dict[str, RateLimiter] = {}
        # Per-domain overrides of the default rate and burst.
        self.domain_configs = {
            "article": {"rps": 20.0, "burst": 40},  # PubMed can handle more
            "trial": {"rps": 10.0, "burst": 20},  # ClinicalTrials.gov standard
            "thinking": {"rps": 50.0, "burst": 100},  # Local processing
            "mygene": {"rps": 10.0, "burst": 20},  # MyGene.info
            "mydisease": {"rps": 10.0, "burst": 20},  # MyDisease.info
            "mychem": {"rps": 10.0, "burst": 20},  # MyChem.info
            "myvariant": {"rps": 15.0, "burst": 30},  # MyVariant.info
        }

    def get_limiter(self, domain: str) -> RateLimiter:
        """Return the limiter for *domain*, building it on first request."""
        try:
            return self.limiters[domain]
        except KeyError:
            pass

        overrides = self.domain_configs.get(domain, {})
        limiter = RateLimiter(
            overrides.get("rps", self.default_rps),
            int(overrides.get("burst", self.default_burst)),
        )
        self.limiters[domain] = limiter
        return limiter

    @asynccontextmanager
    async def limit(self, domain: str):
        """Context manager that applies *domain*'s limiter around its body."""
        async with self.get_limiter(domain).limit():
            yield


class SlidingWindowRateLimiter:
    """Sliding window rate limiter for user/IP based limiting."""

    def __init__(self, requests: int = 100, window_seconds: int = 60):
        """Initialize sliding window rate limiter.

        Args:
            requests: Maximum requests per window
            window_seconds: Window size in seconds
        """
        self.max_requests = requests
        self.window_seconds = window_seconds
        self.requests: dict[str, list[float]] = defaultdict(list)
        self._lock = asyncio.Lock()

    async def check_limit(self, key: str) -> bool:
        """Check if request is allowed for key."""
        async with self._lock:
            now = time.time()
            cutoff = now - self.window_seconds

            # Remove old requests
            self.requests[key] = [
                req_time
                for req_time in self.requests[key]
                if req_time > cutoff
            ]

            # Check limit
            if len(self.requests[key]) >= self.max_requests:
                return False

            # Add current request
            self.requests[key].append(now)
            return True

    async def acquire(self, key: str) -> None:
        """Acquire permission to make request."""
        if not await self.check_limit(key):
            raise RateLimitExceeded(
                key, self.max_requests, self.window_seconds
            )


# Shared module-level limiters.
# Per-domain limiter registry, built with DomainRateLimiter's default settings.
domain_limiter = DomainRateLimiter()
# Per-user sliding window: at most 1000 requests per 3600 seconds (one hour).
user_limiter = SlidingWindowRateLimiter(requests=1000, window_seconds=3600)


async def rate_limit_domain(domain: str) -> None:
    """Pass through the shared domain limiter once for *domain*."""
    throttle = domain_limiter.limit(domain)
    async with throttle:
        # Entering the context is the whole point; the body has no work.
        pass


async def rate_limit_user(user_id: str | None = None) -> None:
    """Apply rate limiting for a user."""
    if user_id:
        await user_limiter.acquire(user_id)

```

--------------------------------------------------------------------------------
/src/biomcp/http_client_simple.py:
--------------------------------------------------------------------------------

```python
"""Helper functions for simpler HTTP client operations."""

import asyncio
import contextlib
import json
import os
import ssl

import httpx

# Global connection pools, keyed by a string derived from the SSL "verify"
# setting (get_connection_pool builds the key).
_connection_pools: dict[str, httpx.AsyncClient] = {}
# Guards lookup and creation of pools in get_connection_pool.
_pool_lock = asyncio.Lock()


# Strong references to in-flight close tasks. The event loop keeps only weak
# references to tasks, so an unreferenced task can be garbage-collected before
# it has finished running.
_pending_close_tasks: set[asyncio.Task] = set()


def _finish_close_task(task: asyncio.Task) -> None:
    """Drop a finished close task and consume its outcome."""
    _pending_close_tasks.discard(task)
    if not task.cancelled():
        # Retrieving the exception marks it as handled, so a failed close
        # does not surface as "Task exception was never retrieved".
        task.exception()


def close_all_pools():
    """Close all connection pools. Useful for cleanup in tests.

    The pool registry is always emptied. Each pool that is still open is
    then closed on a best-effort basis:

    - When called while an event loop is running, one close task per pool
      is scheduled on that loop and this function returns without waiting
      for the tasks to finish.
    - When no event loop is running, each pool is closed synchronously on a
      temporary event loop; a failure to close is logged at debug level and
      otherwise ignored.
    """
    import logging

    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        loop = None

    # Detach the pools first so the registry is cleared even if a close
    # attempt below fails.
    open_pools = [
        pool
        for pool in _connection_pools.values()
        if pool and not pool.is_closed
    ]
    _connection_pools.clear()

    for pool in open_pools:
        if loop is not None:
            # Only create the coroutine once we know it can be scheduled.
            close_task = loop.create_task(pool.aclose())
            _pending_close_tasks.add(close_task)
            close_task.add_done_callback(_finish_close_task)
            continue

        try:
            # The async client has no synchronous close, so drive aclose()
            # to completion on a short-lived loop.
            asyncio.run(pool.aclose())
        except Exception:
            logging.getLogger(__name__).debug(
                "Failed to close connection pool", exc_info=True
            )


async def get_connection_pool(
    verify: ssl.SSLContext | str | bool,
    timeout: httpx.Timeout,
) -> httpx.AsyncClient:
    """Get or create a shared connection pool for the given SSL context.

    Args:
        verify: SSL verification setting; it also selects which pool is
            returned. An ``ssl.SSLContext`` is keyed by object identity,
            any other value by its string form.
        timeout: Timeout applied when a new pool has to be created. An
            already existing pool is returned as-is, with the timeout it
            was created with.

    Returns:
        An open ``httpx.AsyncClient`` shared by all callers that use the
        same ``verify`` setting.
    """
    pool_key = (
        f"ssl_{id(verify)}"
        if isinstance(verify, ssl.SSLContext)
        else str(verify)
    )

    async with _pool_lock:
        existing = _connection_pools.get(pool_key)
        if existing is not None and not existing.is_closed:
            return existing

        limits = httpx.Limits(
            max_keepalive_connections=20,  # Reuse connections
            max_connections=100,  # Total connection limit
            keepalive_expiry=30,  # Keep connections alive for 30s
        )
        # An explicitly supplied transport is used by the client as-is, so
        # the SSL, HTTP version and pool-limit settings must be given to the
        # transport itself; passing them only to AsyncClient would leave the
        # transport on its defaults.
        transport = httpx.AsyncHTTPTransport(
            verify=verify,
            http2=False,  # HTTP/2 can add overhead for simple requests
            limits=limits,
            retries=0,  # We handle retries at a higher level
        )
        pool = httpx.AsyncClient(
            verify=verify,
            http2=False,
            timeout=timeout,
            limits=limits,
            transport=transport,
        )
        _connection_pools[pool_key] = pool
        return pool


async def _acquire_client(
    verify: ssl.SSLContext | str | bool,
    timeout: httpx.Timeout,
) -> tuple[httpx.AsyncClient, bool]:
    """Return ``(client, should_close)`` for a single request.

    A pooled client (``should_close`` False) is used unless the
    ``BIOMCP_USE_CONNECTION_POOL`` environment variable is set to something
    other than "true", or the pool cannot be obtained; in both cases a
    one-off client is created and the caller must close it.
    """
    use_pool = (
        os.getenv("BIOMCP_USE_CONNECTION_POOL", "true").lower() == "true"
    )

    if use_pool:
        try:
            # connection_pool is a sibling module inside this package.
            from .connection_pool import get_connection_pool as get_pool

            return await get_pool(verify, timeout), False
        except Exception:  # noqa: S110
            # Deliberate best-effort: any pool failure falls back to a
            # one-off client below instead of failing the request.
            pass

    client = httpx.AsyncClient(verify=verify, http2=False, timeout=timeout)
    return client, True


async def execute_http_request(  # noqa: C901
    method: str,
    url: str,
    params: dict,
    verify: ssl.SSLContext | str | bool,
    headers: dict[str, str] | None = None,
) -> tuple[int, str]:
    """Execute the actual HTTP request using connection pooling.

    Args:
        method: HTTP method (GET or POST, case-insensitive)
        url: Target URL
        params: Request parameters, sent as the query string for GET and as
            the JSON body for POST. A ``"_headers"`` entry, if present, is
            removed from this dict and, when it is a JSON object, merged
            into the request headers.
        verify: SSL verification settings
        headers: Optional custom headers (not modified)

    Returns:
        Tuple of (status_code, response_text). An empty response body is
        reported as ``"{}"``. An unsupported method and non-connect,
        non-timeout HTTP errors are reported through dedicated error codes
        instead of an exception.

    Raises:
        ConnectionError: For connection failures
        TimeoutError: For timeout errors
    """
    from .constants import (
        HTTP_ERROR_CODE_NETWORK,
        HTTP_ERROR_CODE_UNSUPPORTED_METHOD,
        HTTP_TIMEOUT_SECONDS,
    )

    # Copy so that merging "_headers" never mutates the caller's dict.
    custom_headers = dict(headers) if headers else {}
    if "_headers" in params:
        raw_headers = params.pop("_headers")
        with contextlib.suppress(json.JSONDecodeError, TypeError):
            extra_headers = json.loads(raw_headers)
            # Anything other than a JSON object cannot be a header mapping.
            if isinstance(extra_headers, dict):
                custom_headers.update(extra_headers)

    # Reject unsupported methods before a client is acquired.
    verb = method.upper()
    if verb not in ("GET", "POST"):
        return (
            HTTP_ERROR_CODE_UNSUPPORTED_METHOD,
            f"Unsupported method {method}",
        )

    # Use the configured timeout from constants
    timeout = httpx.Timeout(HTTP_TIMEOUT_SECONDS)

    try:
        client, should_close = await _acquire_client(verify, timeout)
        try:
            if verb == "GET":
                resp = await client.get(
                    url, params=params, headers=custom_headers
                )
            else:
                resp = await client.post(
                    url, json=params, headers=custom_headers
                )

            # An empty body is normalized to an empty JSON object.
            return resp.status_code, resp.text or "{}"
        finally:
            # Only close if we created a new client
            if should_close:
                await client.aclose()

    except httpx.ConnectError as exc:
        raise ConnectionError(f"Failed to connect to {url}: {exc}") from exc
    except httpx.TimeoutException as exc:
        raise TimeoutError(f"Request to {url} timed out: {exc}") from exc
    except httpx.HTTPError as exc:
        error_msg = str(exc) if str(exc) else "Network connectivity error"
        return HTTP_ERROR_CODE_NETWORK, error_msg

```

--------------------------------------------------------------------------------
/docs/developer-guides/06-http-client-and-caching.md:
--------------------------------------------------------------------------------

```markdown
# BioMCP HTTP Client Guide

## Overview

BioMCP uses a centralized HTTP client for all external API calls. This provides:

- Consistent error handling and retry logic
- Request/response caching
- Rate limiting per domain
- Circuit breaker for fault tolerance
- Offline mode support
- Comprehensive endpoint tracking

## Migration from Direct HTTP Libraries

### Before (Direct httpx usage):

```python
import httpx

async def fetch_gene(gene: str):
    async with httpx.AsyncClient() as client:
        response = await client.get(f"https://api.example.com/genes/{gene}")
        response.raise_for_status()
        return response.json()
```

### After (Centralized client):

```python
from biomcp import http_client

async def fetch_gene(gene: str):
    data, error = await http_client.request_api(
        url=f"https://api.example.com/genes/{gene}",
        request={},
        domain="example"
    )
    if error:
        # Handle error consistently
        return None
    return data
```

## Error Handling

The centralized client uses a consistent error handling pattern:

```python
result, error = await http_client.request_api(...)

if error:
    # error is a RequestError object with:
    # - error.code: HTTP status code or error type
    # - error.message: Human-readable error message
    # - error.details: Additional context
    logger.error(f"Request failed: {error.message}")
    return None  # or handle appropriately
```

### Error Handling Guidelines

1. **For optional data**: Return `None` when the data is not critical
2. **For required data**: Raise an exception or return an error to the caller
3. **For batch operations**: Collect errors and report at the end
4. **For user-facing operations**: Provide clear, actionable error messages

## Creating Domain-Specific Adapters

For complex APIs, create an adapter class:

```python
from biomcp import http_client
from biomcp.http_client import RequestError

class MyAPIAdapter:
    """Adapter for MyAPI using centralized HTTP client."""

    def __init__(self):
        self.base_url = "https://api.example.com"

    async def get_resource(self, resource_id: str) -> tuple[dict | None, RequestError | None]:
        """Fetch a resource by ID.

        Returns:
            Tuple of (data, error) where one is always None
        """
        return await http_client.request_api(
            url=f"{self.base_url}/resources/{resource_id}",
            request={},
            domain="example",
            endpoint_key="example_resources"
        )
```

## Configuration

### Cache TTL (Time To Live)

```python
# Cache for 1 hour (3600 seconds)
data, error = await http_client.request_api(
    url=url,
    request=request,
    cache_ttl=3600
)

# Disable caching for this request
data, error = await http_client.request_api(
    url=url,
    request=request,
    cache_ttl=0
)
```

### Rate Limiting

Rate limits are configured per domain in `http_client.py`:

```python
# Default rate limits
rate_limits = {
    "ncbi.nlm.nih.gov": 20,  # 20 requests/second
    "clinicaltrials.gov": 10,  # 10 requests/second
    "myvariant.info": 1000/3600,  # 1000 requests/hour
}
```

### Circuit Breaker

The circuit breaker prevents cascading failures:

- **Closed**: Normal operation
- **Open**: Failing fast after threshold exceeded
- **Half-Open**: Testing if service recovered

Configure thresholds:

```python
CIRCUIT_BREAKER_FAILURE_THRESHOLD = 5  # Open after 5 failures
CIRCUIT_BREAKER_RECOVERY_TIMEOUT = 60  # Try again after 60 seconds
```

## Offline Mode

Enable offline mode to only serve cached responses:

```bash
export BIOMCP_OFFLINE=true
biomcp run
```

In offline mode:

- Only cached responses are returned
- No external HTTP requests are made
- Missing cache entries return None with appropriate error

## Performance Tuning

### Connection Pooling

The HTTP client maintains shared connection pools, one per SSL verification setting:

```python
# Configure in http_client_simple.py
limits = httpx.Limits(
    max_keepalive_connections=20,
    max_connections=100,
    keepalive_expiry=30
)
```

### Concurrent Requests

For parallel requests to the same API:

```python
import asyncio

# Fetch multiple resources concurrently
tasks = [
    http_client.request_api(f"/resource/{i}", {}, domain="example")
    for i in range(10)
]
results = await asyncio.gather(*tasks)
```

## Monitoring and Debugging

### Request Metrics

The client tracks metrics per endpoint:

- Request count
- Error count
- Cache hit/miss ratio
- Average response time

Access metrics:

```python
from biomcp.http_client import get_metrics
metrics = get_metrics()
```

### Debug Logging

Enable debug logging to see all HTTP requests:

```python
import logging
logging.getLogger("biomcp.http_client").setLevel(logging.DEBUG)
```

## Best Practices

1. **Always use the centralized client** for external HTTP calls
2. **Register new endpoints** in the endpoint registry
3. **Set appropriate cache TTLs** based on data volatility
4. **Handle errors gracefully** with user-friendly messages
5. **Test with offline mode** to ensure cache coverage
6. **Monitor rate limits** to avoid API throttling
7. **Use domain-specific adapters** for complex APIs

## Endpoint Registration

Register new endpoints in `endpoint_registry.py`:

```python
registry.register(
    "my_api_endpoint",
    EndpointInfo(
        url="https://api.example.com/v1/data",
        category=EndpointCategory.BIOMEDICAL_LITERATURE,
        data_types=[DataType.RESEARCH_ARTICLES],
        description="My API for fetching data",
        compliance_notes="Public API, no PII",
        rate_limit="100 requests/minute"
    )
)
```

This ensures the endpoint is documented and tracked properly.

```

--------------------------------------------------------------------------------
/tests/tdd/articles/test_cbioportal_integration.py:
--------------------------------------------------------------------------------

```python
"""Test cBioPortal integration with article searches."""

import json

import pytest

from biomcp.articles.search import PubmedRequest
from biomcp.articles.unified import search_articles_unified


class TestArticleCBioPortalIntegration:
    """Integration tests for cBioPortal summaries in unified article search.

    Every test builds a ``PubmedRequest``, awaits ``search_articles_unified``
    and inspects the returned string (markdown, or JSON when
    ``output_json=True``). All tests are marked ``integration``.
    """

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_article_search_with_gene_includes_cbioportal(self):
        """A single-gene markdown search should start with a cBioPortal summary."""
        request = PubmedRequest(
            genes=["BRAF"],
            keywords=["melanoma"],
        )

        # Markdown output, PubMed only (preprints disabled)
        result = await search_articles_unified(
            request,
            include_pubmed=True,
            include_preprints=False,
            output_json=False,
        )

        # Should include cBioPortal summary
        assert "cBioPortal Summary for BRAF" in result
        assert "Mutation Frequency" in result
        # Top Hotspots is only included when mutations are found.
        # When the cBioPortal API returns empty data, it won't be present,
        # so the hotspot assertion is skipped if a "0.0%" appears anywhere.
        if "0.0%" not in result:  # If mutation frequency is not 0
            assert "Top Hotspots" in result
        assert "---" in result  # Separator between summary and articles

        # Should still include article results (any one marker is enough)
        assert "pmid" in result or "Title" in result or "Record" in result

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_article_search_json_with_gene(self):
        """JSON output should carry both a cBioPortal summary and articles."""
        request = PubmedRequest(
            genes=["TP53"],
            keywords=["cancer"],
        )

        result = await search_articles_unified(
            request,
            include_pubmed=True,
            include_preprints=False,
            output_json=True,
        )

        # The JSON result is a string; decode it before inspecting keys
        data = json.loads(result)

        # Should have both summary and articles
        assert "cbioportal_summary" in data
        assert "articles" in data
        assert "TP53" in data["cbioportal_summary"]
        assert isinstance(data["articles"], list)
        assert len(data["articles"]) > 0

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_article_search_without_gene_no_cbioportal(self):
        """A request with no genes should not produce a cBioPortal summary."""
        request = PubmedRequest(
            diseases=["hypertension"],
            keywords=["treatment"],
        )

        # Markdown output, PubMed only (preprints disabled)
        result = await search_articles_unified(
            request,
            include_pubmed=True,
            include_preprints=False,
            output_json=False,
        )

        # Should NOT include cBioPortal summary
        assert "cBioPortal Summary" not in result
        assert "Mutation Frequency" not in result

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_article_search_multiple_genes(self):
        """With several genes, the summary should be for the first one."""
        request = PubmedRequest(
            genes=["KRAS", "NRAS", "BRAF"],
            diseases=["colorectal cancer"],
        )

        result = await search_articles_unified(
            request,
            include_pubmed=True,
            include_preprints=False,
            output_json=False,
        )

        # Should include cBioPortal summary for KRAS (first gene)
        assert "cBioPortal Summary for KRAS" in result
        # Common KRAS hotspot; the "mutation" alternative keeps this check
        # loose when no hotspot is reported
        assert "G12" in result or "mutation" in result

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_article_search_with_invalid_gene(self):
        """Test that searches still return content when cBioPortal data may be missing.

        NOTE(review): despite the test name, both genes used below are valid
        symbols; no invalid gene name is ever submitted.
        """
        request = PubmedRequest(
            genes=["BRCA1"],  # Valid gene
            keywords=["cancer"],
        )

        # Baseline: a real gene that might have cBioPortal data
        result = await search_articles_unified(
            request,
            include_pubmed=True,
            include_preprints=False,
            output_json=False,
        )

        # Should have some content - either cBioPortal summary or articles
        assert len(result) > 50  # Some reasonable content

        # Now test with a gene that's valid for search but not in cBioPortal
        request2 = PubmedRequest(
            genes=["ACE2"],  # Real gene but might not be in cancer studies
            keywords=["COVID-19"],
        )

        result2 = await search_articles_unified(
            request2,
            include_pubmed=True,
            include_preprints=False,
            output_json=False,
        )

        # Should return results even if cBioPortal data is not available
        assert len(result2) > 50

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_article_search_with_preprints_and_cbioportal(self):
        """The cBioPortal summary should also appear when preprints are enabled."""
        request = PubmedRequest(
            genes=["EGFR"],
            keywords=["lung cancer", "osimertinib"],
        )

        result = await search_articles_unified(
            request,
            include_pubmed=True,
            include_preprints=True,
            output_json=False,
        )

        # Should include cBioPortal summary
        assert "cBioPortal Summary for EGFR" in result
        # Should include both peer-reviewed and preprint results: one article
        # marker plus at least one source/preprint marker
        assert ("pmid" in result or "Title" in result) and (
            "Preprint" in result
            or "bioRxiv" in result
            or "peer_reviewed" in result
        )

```

--------------------------------------------------------------------------------
/src/biomcp/diseases/getter.py:
--------------------------------------------------------------------------------

```python
"""Disease information retrieval from MyDisease.info."""

import json
import logging
from typing import Annotated

from pydantic import Field

from ..integrations import BioThingsClient
from ..render import to_markdown

logger = logging.getLogger(__name__)


def _add_disease_links(disease_info, result: dict) -> None:
    """Add helpful links to disease result."""
    links = {}

    # Add MONDO browser link if available
    if (
        disease_info.mondo
        and isinstance(disease_info.mondo, dict)
        and (mondo_id := disease_info.mondo.get("mondo"))
        and isinstance(mondo_id, str)
        and mondo_id.startswith("MONDO:")
    ):
        links["MONDO Browser"] = (
            f"https://www.ebi.ac.uk/ols/ontologies/mondo/terms?iri=http://purl.obolibrary.org/obo/{mondo_id.replace(':', '_')}"
        )

    # Add Disease Ontology link if available
    if (
        disease_info.xrefs
        and isinstance(disease_info.xrefs, dict)
        and (doid := disease_info.xrefs.get("doid"))
    ):
        if isinstance(doid, list) and doid:
            doid_id = doid[0] if isinstance(doid[0], str) else str(doid[0])
            links["Disease Ontology"] = (
                f"https://www.disease-ontology.org/?id={doid_id}"
            )
        elif isinstance(doid, str):
            links["Disease Ontology"] = (
                f"https://www.disease-ontology.org/?id={doid}"
            )

    # Add PubMed search link
    if disease_info.name:
        links["PubMed Search"] = (
            f"https://pubmed.ncbi.nlm.nih.gov/?term={disease_info.name.replace(' ', '+')}"
        )

    if links:
        result["_links"] = links


def _format_disease_output(disease_info, result: dict) -> None:
    """Format disease output for display."""
    # Format synonyms nicely
    if disease_info.synonyms:
        result["synonyms"] = ", ".join(
            disease_info.synonyms[:10]
        )  # Limit to first 10
        if len(disease_info.synonyms) > 10:
            result["synonyms"] += (
                f" (and {len(disease_info.synonyms) - 10} more)"
            )

    # Format phenotypes if present
    if disease_info.phenotypes:
        # Just show count and first few phenotypes
        phenotype_names = []
        for pheno in disease_info.phenotypes[:5]:
            if isinstance(pheno, dict) and "phenotype" in pheno:
                phenotype_names.append(pheno["phenotype"])
        if phenotype_names:
            result["associated_phenotypes"] = ", ".join(phenotype_names)
            if len(disease_info.phenotypes) > 5:
                result["associated_phenotypes"] += (
                    f" (and {len(disease_info.phenotypes) - 5} more)"
                )
        # Remove the raw phenotypes data for cleaner output
        result.pop("phenotypes", None)


async def get_disease(
    disease_id_or_name: str,
    output_json: bool = False,
) -> str:
    """
    Look up a disease through ``BioThingsClient`` and render the result.

    Args:
        disease_id_or_name: Disease ID (MONDO, DOID) or name (e.g., "melanoma", "MONDO:0016575")
        output_json: Return a JSON string instead of markdown

    Returns:
        The disease record, a "not found" message, or a failure message,
        each rendered as markdown or JSON according to ``output_json``.
        Exceptions raised during lookup or formatting are logged and reported
        in the returned payload rather than propagated.
    """

    def _render(payload: dict) -> str:
        # Single place deciding between the two output formats.
        if output_json:
            return json.dumps(payload, indent=2)
        return to_markdown([payload])

    biothings = BioThingsClient()

    try:
        info = await biothings.get_disease_info(disease_id_or_name)

        if not info:
            return _render({
                "error": f"Disease '{disease_id_or_name}' not found",
                "suggestion": "Please check the disease name or ID (MONDO:, DOID:, OMIM:, MESH:)",
            })

        # Work on a plain dict so the helpers can add and remove keys.
        payload = info.model_dump(exclude_none=True)
        _add_disease_links(info, payload)
        _format_disease_output(info, payload)
        return _render(payload)

    except Exception as e:
        logger.error(
            f"Error fetching disease info for {disease_id_or_name}: {e}"
        )
        failure = {
            "error": "Failed to retrieve disease information",
            "details": str(e),
        }

    return _render(failure)


async def _disease_details(
    call_benefit: Annotated[
        str,
        "Define and summarize why this function is being called and the intended benefit",
    ],
    disease_id_or_name: Annotated[
        str,
        Field(
            description="Disease name (e.g., melanoma, GIST) or ID (e.g., MONDO:0016575, DOID:1909)"
        ),
    ],
) -> str:
    """
    Retrieves detailed information for a disease from MyDisease.info.

    This tool provides real-time disease annotations including:
    - Official disease name and definition
    - Disease synonyms and alternative names
    - Ontology mappings (MONDO, DOID, OMIM, etc.)
    - Associated phenotypes
    - Links to disease databases

    Parameters:
    - call_benefit: Define why this function is being called
    - disease_id_or_name: Disease name or ontology ID

    Process: Queries MyDisease.info API for up-to-date disease information
    Output: Markdown formatted disease information with definition and metadata

    Note: For clinical trials about diseases, use trial_searcher. For articles about diseases, use article_searcher.
    """
    # `call_benefit` is not read in this body; presumably it exists only for
    # the tool-calling interface — TODO confirm against the tool registration.
    # Output is always markdown: output_json is fixed to False here.
    return await get_disease(disease_id_or_name, output_json=False)

```

--------------------------------------------------------------------------------
/src/biomcp/connection_pool.py:
--------------------------------------------------------------------------------

```python
"""Connection pool manager with proper event loop lifecycle management.

This module provides HTTP connection pooling that is properly integrated
with asyncio event loops. It ensures that connection pools are:
- Created per event loop to avoid cross-loop usage
- Automatically cleaned up when event loops are garbage collected
- Reused across requests for better performance

Key Features:
- Event loop isolation - each loop gets its own pools
- Weak references prevent memory leaks
- Automatic cleanup on loop destruction
- Lock-guarded pool creation (an asyncio lock: coordinates coroutines, not OS threads)

Example:
    ```python
    # Get a connection pool for the current event loop
    pool = await get_connection_pool(verify=True, timeout=httpx.Timeout(30))

    # Use the pool for multiple requests (no need to close)
    response = await pool.get("https://api.example.com/data")
    ```

Environment Variables:
    BIOMCP_USE_CONNECTION_POOL: Enable/disable pooling (default: "true")
"""

import asyncio
import ssl
import weakref

# NOTE: httpx import is allowed in this file for connection pooling infrastructure
import httpx


class EventLoopConnectionPools:
    """Manages HTTP connection pools per asyncio event loop.

    Each running event loop gets its own dict of ``httpx.AsyncClient``
    instances, keyed by the ``verify`` setting, so a client created on one
    loop is never handed out on another.

    Attributes:
        _loop_pools: Weak-key dictionary mapping an event loop to its
            ``{pool_key: httpx.AsyncClient}`` dict.
        _lock: ``asyncio.Lock`` serializing pool creation and shutdown
            between coroutines. It is not a thread lock.
    """

    def __init__(self):
        # Use weak references to avoid keeping event loops alive
        self._loop_pools: weakref.WeakKeyDictionary = (
            weakref.WeakKeyDictionary()
        )
        self._lock = asyncio.Lock()

    async def get_pool(
        self, verify: ssl.SSLContext | str | bool, timeout: httpx.Timeout
    ) -> httpx.AsyncClient:
        """Get or create a connection pool for the current event loop.

        Args:
            verify: TLS verification setting passed to ``httpx.AsyncClient``;
                it is also the only input to the pool key.
            timeout: Timeout passed to a newly created client. An already
                pooled client is returned as-is, so ``timeout`` only takes
                effect when the pool is first created.

        Returns:
            The pooled client for the running loop and ``verify`` setting, or
            a non-pooled client when no event loop is running.
        """
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No event loop running, return a single-use client
            return self._create_client(verify, timeout, pooled=False)

        async with self._lock:
            pools = self._loop_pools.get(loop)
            if pools is None:
                pools = {}
                self._loop_pools[loop] = pools
                # Register cleanup when loop is garbage collected
                self._register_loop_cleanup(loop)

            pool_key = self._get_pool_key(verify)

            # Reuse the existing client unless it has been closed
            existing = pools.get(pool_key)
            if existing is not None and not existing.is_closed:
                return existing

            client = self._create_client(verify, timeout, pooled=True)
            pools[pool_key] = client
            return client

    def _get_pool_key(self, verify: ssl.SSLContext | str | bool) -> str:
        """Generate the dictionary key identifying a pool.

        ``SSLContext`` objects are keyed by identity (``id``); every other
        value is keyed by its string form.
        """
        if isinstance(verify, ssl.SSLContext):
            return f"ssl_{id(verify)}"
        return str(verify)

    def _create_client(
        self,
        verify: ssl.SSLContext | str | bool,
        timeout: httpx.Timeout,
        pooled: bool = True,
    ) -> httpx.AsyncClient:
        """Create a new HTTP client.

        Args:
            verify: TLS verification setting for the client.
            timeout: Timeout configuration for the client.
            pooled: When True, allow keep-alive connections; when False,
                disable keep-alive for a single-use client.
        """
        if pooled:
            limits = httpx.Limits(
                max_keepalive_connections=20,
                max_connections=100,
                keepalive_expiry=30,
            )
        else:
            # Single-use client
            limits = httpx.Limits(max_keepalive_connections=0)

        return httpx.AsyncClient(
            verify=verify,
            http2=False,  # HTTP/2 can add overhead
            timeout=timeout,
            limits=limits,
        )

    @staticmethod
    def _close_pools_sync(pools: dict) -> None:
        """Best-effort synchronous close of every client in ``pools``.

        Used as a finalizer callback, so it must not need a running event
        loop and must not raise.
        """
        import contextlib

        for client in list(pools.values()):
            if client and not client.is_closed:
                # NOTE(review): relies on the private ``_transport`` attribute
                # exposing a synchronous ``close()``; any failure is
                # suppressed, so this may be a no-op — confirm against the
                # installed httpx version.
                with contextlib.suppress(Exception):
                    client._transport.close()
        pools.clear()

    def _register_loop_cleanup(self, loop: asyncio.AbstractEventLoop):
        """Register cleanup to run when ``loop`` is garbage collected.

        The finalizer receives the loop's pools dict directly. The callback
        and its arguments must not reference ``loop``: ``weakref.finalize``
        keeps them alive, so a reference to the loop would prevent it from
        ever being collected and the finalizer from ever running. By the time
        it runs, the loop's entry in ``_loop_pools`` is also gone, which is a
        second reason the dict has to be captured up front.
        """
        pools = self._loop_pools.setdefault(loop, {})
        # A staticmethod accessed through the instance is a plain function,
        # so the finalizer holds no reference to ``self`` either.
        weakref.finalize(loop, self._close_pools_sync, pools)

    async def close_all(self):
        """Close all connection pools and forget every tracked loop."""
        async with self._lock:
            all_clients = []
            for pools in list(self._loop_pools.values()):
                all_clients.extend(pools.values())

            close_tasks = [
                client.aclose()
                for client in all_clients
                if client and not client.is_closed
            ]

            if close_tasks:
                # return_exceptions=True: one failing close must not stop
                # the remaining clients from being closed.
                await asyncio.gather(*close_tasks, return_exceptions=True)

            self._loop_pools.clear()


# Global instance shared by get_connection_pool() and close_all_pools()
_pool_manager = EventLoopConnectionPools()


async def get_connection_pool(
    verify: ssl.SSLContext | str | bool,
    timeout: httpx.Timeout,
) -> httpx.AsyncClient:
    """Get a connection pool for the current event loop.

    Delegates to the module-level ``_pool_manager``.

    Args:
        verify: TLS verification setting forwarded to the pool manager.
        timeout: Timeout configuration forwarded to the pool manager.

    Returns:
        The ``httpx.AsyncClient`` returned by ``_pool_manager.get_pool``.
    """
    return await _pool_manager.get_pool(verify, timeout)


async def close_all_pools():
    """Close all connection pools.

    Delegates to ``_pool_manager.close_all()``.
    """
    await _pool_manager.close_all()

```

--------------------------------------------------------------------------------
/src/biomcp/parameter_parser.py:
--------------------------------------------------------------------------------

```python
"""Parameter parsing and validation for BioMCP."""

import json
import logging
from typing import Any

from biomcp.exceptions import InvalidParameterError

logger = logging.getLogger(__name__)


class ParameterParser:
    """Handles parameter parsing and validation for search requests."""

    @staticmethod
    def parse_list_param(
        param: str | list[str] | None, param_name: str
    ) -> list[str] | None:
        """Convert various input formats to lists.

        Handles:
        - JSON arrays: '["item1", "item2"]' -> ['item1', 'item2']
        - Comma-separated: 'item1, item2' -> ['item1', 'item2']
        - Single values: 'item' -> ['item']
        - None values: None -> None
        - Already parsed lists: ['item'] -> ['item']

        Args:
            param: The parameter to parse
            param_name: Name of the parameter for error messages

        Returns:
            Parsed list or None

        Raises:
            InvalidParameterError: If parameter cannot be parsed
        """
        if param is None:
            return None

        if isinstance(param, str):
            # First try to parse as JSON array
            if param.startswith("["):
                try:
                    parsed = json.loads(param)
                    if not isinstance(parsed, list):
                        raise InvalidParameterError(
                            param_name,
                            param,
                            "JSON array or comma-separated string",
                        )
                    return parsed
                except (json.JSONDecodeError, TypeError) as e:
                    logger.debug(f"Failed to parse {param_name} as JSON: {e}")

            # If it's a comma-separated string, split it
            if "," in param:
                return [item.strip() for item in param.split(",")]

            # Otherwise return as single-item list
            return [param]

        # If it's already a list, validate and return as-is
        if isinstance(param, list):
            # Validate all items are strings
            if not all(isinstance(item, str) for item in param):
                raise InvalidParameterError(
                    param_name, param, "list of strings"
                )
            return param

        # Invalid type
        raise InvalidParameterError(
            param_name, param, "string, list of strings, or None"
        )

    @staticmethod
    def normalize_phase(phase: str | None) -> str | None:
        """Normalize phase values for clinical trials.

        Converts various formats to standard enum values:
        - "Phase 3" -> "PHASE3"
        - "phase 3" -> "PHASE3"
        - "PHASE 3" -> "PHASE3"
        - "phase3" -> "PHASE3"

        Args:
            phase: Phase value to normalize

        Returns:
            Normalized phase value or None
        """
        if phase is None:
            return None

        # Convert to uppercase and remove spaces
        normalized = phase.upper().replace(" ", "")

        # Validate it matches expected pattern
        valid_phases = [
            "EARLYPHASE1",
            "PHASE1",
            "PHASE2",
            "PHASE3",
            "PHASE4",
            "NOTAPPLICABLE",
        ]
        if normalized not in valid_phases:
            # Try to be helpful with common mistakes
            if "EARLY" in normalized and "1" in normalized:
                return "EARLYPHASE1"
            if "NOT" in normalized and "APPLICABLE" in normalized:
                return "NOTAPPLICABLE"

            raise InvalidParameterError(
                "phase", phase, f"one of: {', '.join(valid_phases)}"
            )

        return normalized

    @staticmethod
    def validate_page_params(page: int, page_size: int) -> tuple[int, int]:
        """Validate pagination parameters.

        Args:
            page: Page number (minimum 1)
            page_size: Results per page (1-100)

        Returns:
            Validated (page, page_size) tuple

        Raises:
            InvalidParameterError: If parameters are invalid
        """
        if page < 1:
            raise InvalidParameterError("page", page, "integer >= 1")

        if page_size < 1 or page_size > 100:
            raise InvalidParameterError(
                "page_size", page_size, "integer between 1 and 100"
            )

        return page, page_size

    @staticmethod
    def parse_search_params(
        params: dict[str, Any], domain: str
    ) -> dict[str, Any]:
        """Parse and validate all search parameters for a domain.

        Args:
            params: Raw parameters dictionary
            domain: Domain being searched

        Returns:
            Validated parameters dictionary
        """
        parsed: dict[str, Any] = {}

        # Common list parameters
        list_params = [
            "genes",
            "diseases",
            "variants",
            "chemicals",
            "keywords",
            "conditions",
            "interventions",
        ]

        for param_name in list_params:
            if param_name in params and params[param_name] is not None:
                parsed[param_name] = ParameterParser.parse_list_param(
                    params[param_name], param_name
                )

        # Domain-specific parameters
        if (
            domain == "trial"
            and "phase" in params
            and params.get("phase") is not None
        ):
            parsed["phase"] = ParameterParser.normalize_phase(
                params.get("phase")
            )

        # Pass through other parameters
        for key, value in params.items():
            if key not in parsed and key not in list_params and key != "phase":
                parsed[key] = value

        return parsed

```

--------------------------------------------------------------------------------
/src/biomcp/openfda/drug_labels.py:
--------------------------------------------------------------------------------

```python
"""
OpenFDA Drug Labels (SPL) integration.
"""

import logging

from .constants import (
    OPENFDA_DEFAULT_LIMIT,
    OPENFDA_DISCLAIMER,
    OPENFDA_DRUG_LABELS_URL,
    OPENFDA_MAX_LIMIT,
)
from .drug_labels_helpers import (
    build_label_search_query,
    format_label_header,
    format_label_section,
    format_label_summary,
    get_default_sections,
    get_section_titles,
)
from .utils import clean_text, format_count, make_openfda_request

logger = logging.getLogger(__name__)


async def search_drug_labels(
    name: str | None = None,
    indication: str | None = None,
    boxed_warning: bool = False,
    section: str | None = None,
    limit: int = OPENFDA_DEFAULT_LIMIT,
    skip: int = 0,
    api_key: str | None = None,
) -> str:
    """
    Search FDA drug product labels (SPL) and format the hits as markdown.

    At least one of ``name``, ``indication``, ``section`` or
    ``boxed_warning`` must be given; otherwise a usage hint is returned
    without making a request.

    Args:
        name: Drug name to search for
        indication: Search for drugs indicated for this condition
        boxed_warning: Filter for drugs with boxed warnings
        section: Specific label section to search
        limit: Maximum number of results requested (capped at
            OPENFDA_MAX_LIMIT); at most five are rendered
        skip: Number of results to skip
        api_key: Optional OpenFDA API key (overrides OPENFDA_API_KEY env var)

    Returns:
        Formatted string with drug label information, or a warning /
        "no results" message
    """
    if not (name or indication or section or boxed_warning):
        return (
            "⚠️ Please specify a drug name, indication, or label section to search.\n\n"
            "Examples:\n"
            "- Search by name: --name 'pembrolizumab'\n"
            "- Search by indication: --indication 'melanoma'\n"
            "- Search by section: --section 'contraindications'"
        )

    # Build and execute search
    query = build_label_search_query(name, indication, boxed_warning, section)
    request_params = {
        "search": query,
        "limit": min(limit, OPENFDA_MAX_LIMIT),
        "skip": skip,
    }
    response, error = await make_openfda_request(
        OPENFDA_DRUG_LABELS_URL, request_params, "openfda_drug_labels", api_key
    )

    if error:
        return f"⚠️ Error searching drug labels: {error}"
    if not response or not response.get("results"):
        return _format_no_results(name, indication, section)

    labels = response["results"]
    # Fall back to the number of returned labels when no total is reported.
    meta_results = response.get("meta", {}).get("results", {})
    total = meta_results.get("total", len(labels))

    # Only the first five labels are rendered, whatever the requested limit.
    displayed = labels[:5]

    lines = ["## FDA Drug Labels\n"]
    lines += _format_search_summary(name, indication, section, total)
    lines.append(f"### Results (showing {len(displayed)} of {total}):\n")
    for position, label in enumerate(displayed, 1):
        lines += format_label_summary(label, position)

    # Add tip for getting full labels
    if total > 0 and labels and "set_id" in labels[0]:
        lines.append(
            "\n💡 **Tip**: Use `biomcp openfda label-get <label_id>` to retrieve "
            "the complete label for any drug."
        )

    lines.append(f"\n{OPENFDA_DISCLAIMER}")
    return "\n".join(lines)


def _format_no_results(
    name: str | None, indication: str | None, section: str | None
) -> str:
    """Format no results message."""
    search_desc = []
    if name:
        search_desc.append(f"drug '{name}'")
    if indication:
        search_desc.append(f"indication '{indication}'")
    if section:
        search_desc.append(f"section '{section}'")
    return f"No drug labels found for {' and '.join(search_desc)}."


def _format_search_summary(
    name: str | None, indication: str | None, section: str | None, total: int
) -> list[str]:
    """Build the summary lines placed above the search results.

    Returns a list holding an optional " | "-joined line naming the search
    criteria that were provided, followed by the total-count line.
    """
    # Each criterion is paired with its rendered text; only truthy ones are kept.
    candidates = (
        (name, f"**Drug**: {name}"),
        (indication, f"**Indication**: {indication}"),
        (section, f"**Section**: {section}"),
    )
    criteria = [text for value, text in candidates if value]

    summary = [" | ".join(criteria)] if criteria else []
    summary.append(f"**Total Labels Found**: {format_count(total, 'label')}\n")
    return summary


async def get_drug_label(
    set_id: str,
    sections: list[str] | None = None,
    api_key: str | None = None,
) -> str:
    """
    Fetch one drug label by set ID and format it as markdown.

    Args:
        set_id: Label set ID
        sections: Specific sections to retrieve; when empty or None the
            result of ``get_default_sections()`` is used
        api_key: Optional OpenFDA API key (overrides OPENFDA_API_KEY env var)

    Returns:
        Formatted string with detailed label information, or an error /
        "not found" message
    """
    query = {"search": f'set_id:"{set_id}"', "limit": 1}
    response, error = await make_openfda_request(
        OPENFDA_DRUG_LABELS_URL, query, "openfda_drug_label_detail", api_key
    )

    if error:
        return f"⚠️ Error retrieving drug label: {error}"
    if not response or not response.get("results"):
        return f"Drug label with ID '{set_id}' not found."

    label = response["results"][0]

    # Use default sections if not specified
    wanted_sections = sections if sections else get_default_sections()

    lines = format_label_header(label, set_id)

    # The boxed warning, when present, is rendered ahead of other sections.
    if "boxed_warning" in label:
        lines.extend(_format_boxed_warning(label["boxed_warning"]))

    titles = get_section_titles()
    for section_key in wanted_sections:
        lines.extend(format_label_section(label, section_key, titles))

    lines.append(f"\n{OPENFDA_DISCLAIMER}")
    return "\n".join(lines)


def _format_boxed_warning(boxed_warning: list) -> list[str]:
    """Render the boxed warning as a heading, the cleaned text, and a blank line.

    The items of ``boxed_warning`` are joined with single spaces and passed
    through ``clean_text``.
    """
    heading = "### ⚠️ BOXED WARNING\n"
    body = clean_text(" ".join(boxed_warning))
    return [heading, body, ""]

```

--------------------------------------------------------------------------------
/src/biomcp/cli/articles.py:
--------------------------------------------------------------------------------

```python
import asyncio
import json
from typing import Annotated

import typer

from ..articles import fetch
from ..articles.search import PubmedRequest, search_articles
from ..articles.unified import search_articles_unified

article_app = typer.Typer(help="Search and retrieve biomedical articles.")


async def get_article_details(
    identifier: str, output_json: bool = False
) -> str:
    """Fetch one article by DOI or PMID in the requested output format.

    DOIs are fetched with ``fetch_europe_pmc_article`` and PMIDs with
    ``fetch.fetch_articles``. Any other identifier yields an error payload,
    rendered as JSON or markdown according to ``output_json``.
    """
    if fetch.is_doi(identifier):
        from ..articles.preprints import fetch_europe_pmc_article

        return await fetch_europe_pmc_article(
            identifier, output_json=output_json
        )

    if fetch.is_pmid(identifier):
        pmids = [int(identifier)]
        return await fetch.fetch_articles(
            pmids, full=True, output_json=output_json
        )

    # Neither a DOI nor a PMID: report the problem in the requested format.
    message = f"Invalid identifier format: {identifier}. Expected either a PMID (numeric) or DOI (10.xxxx/xxxx format)."
    error_payload = [{"error": message}]

    if not output_json:
        from .. import render

        return render.to_markdown(error_payload)
    return json.dumps(error_payload, indent=2)


@article_app.command("search")
def search_article(
    genes: Annotated[
        list[str] | None,
        typer.Option(
            "--gene",
            "-g",
            help="Gene name to search for (can be specified multiple times)",
        ),
    ] = None,
    variants: Annotated[
        list[str] | None,
        typer.Option(
            "--variant",
            "-v",
            help="Genetic variant to search for (can be specified multiple times)",
        ),
    ] = None,
    diseases: Annotated[
        list[str] | None,
        typer.Option(
            "--disease",
            "-d",
            help="Disease name to search for (can be specified multiple times)",
        ),
    ] = None,
    chemicals: Annotated[
        list[str] | None,
        typer.Option(
            "--chemical",
            "-c",
            help="Chemical name to search for (can be specified multiple times)",
        ),
    ] = None,
    keywords: Annotated[
        list[str] | None,
        typer.Option(
            "--keyword",
            "-k",
            help="Keyword to search for (can be specified multiple times)",
        ),
    ] = None,
    page: Annotated[
        int,
        typer.Option(
            "--page",
            "-p",
            help="Page number for pagination (starts at 1)",
        ),
    ] = 1,
    output_json: Annotated[
        bool,
        typer.Option(
            "--json",
            "-j",
            help="Render in JSON format",
            case_sensitive=False,
        ),
    ] = False,
    include_preprints: Annotated[
        bool,
        typer.Option(
            "--include-preprints/--no-preprints",
            help="Include preprint articles from bioRxiv/medRxiv and Europe PMC",
        ),
    ] = True,
):
    """Search biomedical research articles"""
    # NOTE(review): `page` is accepted on the command line but is not passed
    # to either search call below -- confirm whether pagination should be
    # wired through.

    # Options that were not supplied arrive as None; the request is built
    # with empty lists in their place.
    request = PubmedRequest(
        genes=genes or [],
        variants=variants or [],
        diseases=diseases or [],
        chemicals=chemicals or [],
        keywords=keywords or [],
    )

    # Pick the coroutine first, then drive it with a single asyncio.run.
    if include_preprints:
        pending = search_articles_unified(
            request,
            include_pubmed=True,
            include_preprints=True,
            output_json=output_json,
        )
    else:
        pending = search_articles(request, output_json)

    typer.echo(asyncio.run(pending))


async def _collect_article_records(identifiers: list[str]) -> list:
    """Fetch each identifier as JSON, in order, and flatten the parsed records."""
    records: list = []
    for identifier in identifiers:
        raw = await get_article_details(identifier, output_json=True)
        try:
            parsed = json.loads(raw)
        except (json.JSONDecodeError, TypeError):
            # Unparseable (or non-string) output becomes an error record so
            # one bad identifier does not abort the whole request.
            records.append({
                "error": f"Failed to parse result for {identifier}"
            })
            continue

        # A lookup may yield a list of records or a single record.
        if isinstance(parsed, list):
            records.extend(parsed)
        else:
            records.append(parsed)
    return records


@article_app.command("get")
def get_article(
    identifiers: Annotated[
        list[str],
        typer.Argument(
            help="Article identifiers - PubMed IDs (e.g., 38768446) or DOIs (e.g., 10.1101/2024.01.20.23288905)",
        ),
    ],
    full: Annotated[
        bool,
        typer.Option(
            "--full",
            "-f",
            help="Whether to fetch full article text (PubMed only)",
        ),
    ] = False,
    output_json: Annotated[
        bool,
        typer.Option(
            "--json",
            "-j",
            help="Render in JSON format",
            case_sensitive=False,
        ),
    ] = False,
):
    """
    Retrieve articles by PubMed ID or DOI.

    Supports:
    - PubMed IDs for published articles (e.g., 38768446)
    - DOIs for Europe PMC preprints (e.g., 10.1101/2024.01.20.23288905)

    For multiple articles, results are returned as a list.
    """
    # NOTE(review): `full` is accepted but not used in this function --
    # confirm whether it should be forwarded to the lookup.
    if len(identifiers) == 1:
        # Single identifier: let the lookup render in the requested format.
        result = asyncio.run(
            get_article_details(identifiers[0], output_json=output_json)
        )
    else:
        # Identifiers may mix PMIDs and DOIs, so each is looked up on its own
        # as JSON. All lookups share one event loop instead of starting and
        # closing a new loop per identifier.
        results = asyncio.run(_collect_article_records(identifiers))

        if output_json:
            result = json.dumps(results, indent=2)
        else:
            from .. import render

            result = render.to_markdown(results)

    typer.echo(result)

```

--------------------------------------------------------------------------------
/tests/tdd/variants/test_extract_gene_aa_change.py:
--------------------------------------------------------------------------------

```python
"""Tests for _extract_gene_aa_change method in external.py."""

import pytest

from biomcp.variants.external import ExternalVariantAggregator


class TestExtractGeneAAChange:
    """Exercise ``_extract_gene_aa_change`` on a range of annotation payloads."""

    @pytest.fixture
    def aggregator(self):
        """Provide an ExternalVariantAggregator for each test."""
        return ExternalVariantAggregator()

    def test_extract_from_docm(self, aggregator):
        """A DOCM gene with a ``p.``-prefixed change yields "BRAF V600E"."""
        payload = {"docm": {"gene": "BRAF", "aa_change": "p.V600E"}}

        extracted = aggregator._extract_gene_aa_change(payload)

        assert extracted == "BRAF V600E"

    def test_extract_from_hgvsp_long_format(self, aggregator):
        """A three-letter hgvsp entry with a CADD gene name."""
        payload = {
            "cadd": {"gene": {"genename": "TP53"}},
            "hgvsp": ["p.Arg175His"],
        }

        extracted = aggregator._extract_gene_aa_change(payload)

        # Expected output keeps the three-letter residue names.
        assert extracted == "TP53 Arg175His"

    def test_extract_from_hgvsp_with_dbnsfp(self, aggregator):
        """An hgvsp entry with the gene name taken from dbnsfp."""
        payload = {
            "dbnsfp": {"genename": "EGFR"},
            "hgvsp": ["p.Leu858Arg"],
        }

        extracted = aggregator._extract_gene_aa_change(payload)

        # Expected output keeps Leu/Arg rather than L/R.
        assert extracted == "EGFR Leu858Arg"

    def test_extract_from_cadd_data(self, aggregator):
        """CADD oaa/naa plus protein position yields "KRAS G12D"."""
        payload = {
            "cadd": {
                "gene": {"genename": "KRAS", "prot": {"protpos": 12}},
                "oaa": "G",
                "naa": "D",
            }
        }

        extracted = aggregator._extract_gene_aa_change(payload)

        assert extracted == "KRAS G12D"

    def test_extract_from_docm_without_p_prefix(self, aggregator):
        """A DOCM change without the ``p.`` prefix is returned as given."""
        payload = {"docm": {"gene": "PIK3CA", "aa_change": "E545K"}}

        extracted = aggregator._extract_gene_aa_change(payload)

        assert extracted == "PIK3CA E545K"

    def test_extract_with_multiple_hgvsp(self, aggregator):
        """With several hgvsp entries, the first one is expected in the result."""
        payload = {
            "cadd": {"gene": {"genename": "BRCA1"}},
            "hgvsp": ["p.Gln1756Ter", "p.Gln1756*"],
        }

        extracted = aggregator._extract_gene_aa_change(payload)

        # First entry wins; Gln/Ter stay in three-letter form.
        assert extracted == "BRCA1 Gln1756Ter"

    def test_extract_with_special_characters(self, aggregator):
        """A frameshift-style hgvsp entry still produces a result naming the gene."""
        payload = {
            "cadd": {"gene": {"genename": "MLH1"}},
            "hgvsp": ["p.Lys618Alafs*9"],
        }

        extracted = aggregator._extract_gene_aa_change(payload)

        # Only the presence of a result and of the gene name is checked.
        assert extracted is not None
        assert "MLH1" in extracted

    def test_extract_no_gene_name(self, aggregator):
        """Without any gene name the result is None."""
        payload = {"hgvsp": ["p.Val600Glu"]}

        extracted = aggregator._extract_gene_aa_change(payload)

        assert extracted is None

    def test_extract_no_aa_change(self, aggregator):
        """With a gene name but no amino-acid change the result is None."""
        payload = {"cadd": {"gene": {"genename": "BRAF"}}}

        extracted = aggregator._extract_gene_aa_change(payload)

        assert extracted is None

    def test_extract_empty_variant_data(self, aggregator):
        """An empty payload gives None."""
        extracted = aggregator._extract_gene_aa_change({})

        assert extracted is None

    def test_extract_malformed_hgvsp(self, aggregator):
        """A ClinVar payload with an unparseable protein string gives None."""
        payload = {
            "clinvar": {
                "gene": {"symbol": "MYC"},
                "hgvs": {"protein": ["invalid_format"]},
            }
        }

        extracted = aggregator._extract_gene_aa_change(payload)

        assert extracted is None

    def test_extract_priority_order(self, aggregator):
        """When sources disagree, expect the CADD gene name and the DOCM change."""
        payload = {
            "docm": {"gene": "BRAF", "aa_change": "p.V600E"},
            "hgvsp": ["p.Val600Lys"],  # Deliberately a different change
            "cadd": {
                "gene": {"genename": "WRONG", "prot": {"protpos": 600}},
                "oaa": "V",
                "naa": "K",
            },
        }

        extracted = aggregator._extract_gene_aa_change(payload)

        # Gene name comes from CADD, amino-acid change from DOCM.
        assert extracted == "WRONG V600E"

    def test_extract_regex_with_val_ala(self, aggregator):
        """An hgvsp entry containing Val and Ala is returned in long form."""
        payload = {
            "cadd": {"gene": {"genename": "TEST1"}},
            "hgvsp": ["p.Val600Ala"],
        }

        extracted = aggregator._extract_gene_aa_change(payload)

        # "Val600Ala" is not in single-letter form (e.g. "V600A"), so the
        # expected output is the long form unchanged.
        assert extracted == "TEST1 Val600Ala"

    def test_extract_handles_exceptions_gracefully(self, aggregator):
        """A None hgvsp value results in None rather than a raised error."""
        payload = {
            "cadd": {"gene": {"genename": "GENE"}},
            "hgvsp": None,  # Not a list, unlike the other payloads
        }

        extracted = aggregator._extract_gene_aa_change(payload)

        assert extracted is None

```

--------------------------------------------------------------------------------
/tests/tdd/test_openfda_unified.py:
--------------------------------------------------------------------------------

```python
"""Tests for OpenFDA integration with unified search/fetch tools."""

import pytest


class TestOpenFDAUnifiedIntegration:
    """Test OpenFDA domain integration in unified tools."""

    @staticmethod
    def _assert_domain_is_annotated(tool, tool_name: str) -> None:
        """Assert that ``tool`` has a ``domain`` parameter carrying an annotation.

        ``inspect`` reports a missing annotation as ``inspect.Parameter.empty``
        (never ``None``), so the check compares against that sentinel.
        """
        import inspect

        domain_param = inspect.signature(tool).parameters.get("domain")
        assert (
            domain_param is not None
        ), f"domain parameter not found in {tool_name} function"

        assert (
            domain_param.annotation is not inspect.Parameter.empty
        ), "domain parameter has no type annotation"

    def test_openfda_domains_registered(self):
        """Test that OpenFDA domains are properly registered in constants."""
        from biomcp.constants import (
            DOMAIN_TO_PLURAL,
            PLURAL_TO_DOMAIN,
            VALID_DOMAINS,
            VALID_DOMAINS_PLURAL,
        )

        # Singular OpenFDA domain -> expected plural form.
        expected_plurals = {
            "fda_adverse": "fda_adverse_events",
            "fda_label": "fda_labels",
            "fda_device": "fda_device_events",
            "fda_approval": "fda_approvals",
            "fda_recall": "fda_recalls",
            "fda_shortage": "fda_shortages",
        }

        for domain, plural in expected_plurals.items():
            # Both forms must be registered...
            assert domain in VALID_DOMAINS, f"{domain} not in VALID_DOMAINS"
            assert (
                domain in DOMAIN_TO_PLURAL
            ), f"{domain} not in DOMAIN_TO_PLURAL"
            assert (
                plural in VALID_DOMAINS_PLURAL
            ), f"{plural} not in VALID_DOMAINS_PLURAL"
            assert (
                plural in PLURAL_TO_DOMAIN
            ), f"{plural} not in PLURAL_TO_DOMAIN"

            # ...and the two lookup tables must map to each other.
            assert DOMAIN_TO_PLURAL[domain] == plural
            assert PLURAL_TO_DOMAIN[plural] == domain

    def test_openfda_search_domain_type_hints(self):
        """Test that the search tool's domain parameter is annotated."""
        from biomcp.router import search

        self._assert_domain_is_annotated(search, "search")

    def test_openfda_fetch_domain_type_hints(self):
        """Test that the fetch tool's domain parameter is annotated."""
        from biomcp.router import fetch

        self._assert_domain_is_annotated(fetch, "fetch")

    @pytest.mark.asyncio
    async def test_openfda_search_basic_call(self):
        """Test that OpenFDA domain search doesn't raise errors with basic call."""
        from unittest.mock import AsyncMock, patch

        # Replace the OpenFDA search function with an async mock.
        with patch(
            "biomcp.openfda.adverse_events.search_adverse_events",
            new_callable=AsyncMock,
        ) as mock_search:
            mock_search.return_value = (
                "## FDA Adverse Event Reports\n\nTest results"
            )

            from biomcp.router import search

            # This should not raise an error
            result = await search(
                query=None,  # Required parameter
                domain="fda_adverse",
                chemicals=["test"],
                page_size=1,
            )

            # Basic check that result has expected structure
            assert isinstance(result, dict)
            assert "results" in result

    @pytest.mark.asyncio
    async def test_openfda_fetch_basic_call(self):
        """Test that OpenFDA domain fetch doesn't raise errors with basic call."""
        from unittest.mock import AsyncMock, patch

        # Replace the OpenFDA get function with an async mock.
        with patch(
            "biomcp.openfda.drug_approvals.get_drug_approval",
            new_callable=AsyncMock,
        ) as mock_get:
            mock_get.return_value = "## Drug Approval Details\n\nTest details"

            from biomcp.router import fetch

            # This should not raise an error
            result = await fetch(
                id="TEST123",
                domain="fda_approval",
            )

            # Basic check that result has expected structure
            assert isinstance(result, dict)
            assert "title" in result
            assert "text" in result
            assert "metadata" in result

```

--------------------------------------------------------------------------------
/tests/tdd/articles/test_preprints.py:
--------------------------------------------------------------------------------

```python
"""Tests for preprint search functionality."""

from unittest.mock import AsyncMock, patch

import pytest

from biomcp.articles.preprints import (
    BiorxivClient,
    BiorxivResponse,
    BiorxivResult,
    EuropePMCClient,
    EuropePMCResponse,
    PreprintSearcher,
)
from biomcp.articles.search import PubmedRequest, ResultItem
from biomcp.core import PublicationState


class TestBiorxivClient:
    """BiorxivClient.search behaviour with ``request_api`` patched out."""

    @pytest.mark.asyncio
    async def test_search_biorxiv_success(self):
        """One collection entry in the response becomes one preprint result."""
        client = BiorxivClient()

        # Canned API payload holding a single entry.
        entry = BiorxivResult(
            doi="10.1101/2024.01.01.123456",
            title="Test BRAF Mutation Study",
            authors="Smith, J.; Doe, J.",
            date="2024-01-01",
            abstract="Study about BRAF mutations in cancer.",
            server="biorxiv",
        )
        canned = BiorxivResponse(collection=[entry], total=1)

        with patch("biomcp.http_client.request_api") as request_api:
            # The patched call returns a (response, error) pair with no error.
            request_api.return_value = (canned, None)

            found = await client.search("BRAF")

            assert len(found) == 1
            first = found[0]
            assert first.doi == "10.1101/2024.01.01.123456"
            assert first.title == "Test BRAF Mutation Study"
            assert first.publication_state == PublicationState.PREPRINT
            assert "preprint" in first.journal.lower()

    @pytest.mark.asyncio
    async def test_search_biorxiv_no_results(self):
        """An error pair from the patched call yields an empty result list."""
        client = BiorxivClient()

        with patch("biomcp.http_client.request_api") as request_api:
            not_found = {"code": 404, "message": "Not found"}
            request_api.return_value = (None, not_found)

            found = await client.search("nonexistent")

            assert len(found) == 0


class TestEuropePMCClient:
    """EuropePMCClient.search behaviour with ``request_api`` patched out."""

    @pytest.mark.asyncio
    async def test_search_europe_pmc_success(self):
        """One entry in the response's result list becomes one preprint result."""
        client = EuropePMCClient()

        # Canned API payload holding a single record.
        record = {
            "id": "PPR123456",
            "doi": "10.1101/2024.01.02.654321",
            "title": "TP53 Mutation Analysis",
            "authorString": "Johnson, A., Williams, B.",
            "journalTitle": "bioRxiv",
            "firstPublicationDate": "2024-01-02",
            "abstractText": "Analysis of TP53 mutations.",
        }
        canned = EuropePMCResponse(
            hitCount=1,
            resultList={"result": [record]},
        )

        with patch("biomcp.http_client.request_api") as request_api:
            # The patched call returns a (response, error) pair with no error.
            request_api.return_value = (canned, None)

            found = await client.search("TP53")

            assert len(found) == 1
            first = found[0]
            assert first.doi == "10.1101/2024.01.02.654321"
            assert first.title == "TP53 Mutation Analysis"
            assert first.publication_state == PublicationState.PREPRINT


class TestPreprintSearcher:
    """PreprintSearcher.search with both underlying clients replaced by mocks."""

    @staticmethod
    def _preprint(doi, title, date):
        """Build a ResultItem marked as a preprint."""
        return ResultItem(
            doi=doi,
            title=title,
            date=date,
            publication_state=PublicationState.PREPRINT,
        )

    @pytest.mark.asyncio
    async def test_search_combined_sources(self):
        """Results from both clients are merged, newest date first."""
        searcher = PreprintSearcher()

        older = self._preprint(
            "10.1101/2024.01.01.111111", "BRAF Study 1", "2024-01-01"
        )
        newer = self._preprint(
            "10.1101/2024.01.02.222222", "BRAF Study 2", "2024-01-02"
        )

        # bioRxiv returns the older item, Europe PMC the newer one.
        searcher.biorxiv_client.search = AsyncMock(return_value=[older])
        searcher.europe_pmc_client.search = AsyncMock(return_value=[newer])

        response = await searcher.search(PubmedRequest(genes=["BRAF"]))

        assert response.count == 2
        assert len(response.results) == 2
        # Expected order: newest date first.
        ordered_dois = (response.results[0].doi, response.results[1].doi)
        assert ordered_dois[0] == "10.1101/2024.01.02.222222"
        assert ordered_dois[1] == "10.1101/2024.01.01.111111"

    @pytest.mark.asyncio
    async def test_search_duplicate_removal(self):
        """The same DOI returned by both clients appears once in the response."""
        searcher = PreprintSearcher()

        shared_doi = "10.1101/2024.01.01.999999"

        # Two separate but identical items, one per source.
        from_biorxiv = self._preprint(
            shared_doi, "Duplicate Study", "2024-01-01"
        )
        from_europe_pmc = self._preprint(
            shared_doi, "Duplicate Study", "2024-01-01"
        )

        searcher.biorxiv_client.search = AsyncMock(
            return_value=[from_biorxiv]
        )
        searcher.europe_pmc_client.search = AsyncMock(
            return_value=[from_europe_pmc]
        )

        response = await searcher.search(PubmedRequest(keywords=["test"]))

        assert response.count == 1
        assert len(response.results) == 1
        assert response.results[0].doi == shared_doi

```

--------------------------------------------------------------------------------
/tests/tdd/test_render.py:
--------------------------------------------------------------------------------

```python
from biomcp import render


def test_render_full_json(data_dir):
    """Markdown rendered from two stored JSON files matches the stored text files."""
    # Clinical trial fixture.
    trial_json = (data_dir / "ct_gov/trials_NCT04280705.json").read_text()
    trial_expected = (data_dir / "ct_gov/trials_NCT04280705.txt").read_text()
    assert render.to_markdown(trial_json) == trial_expected

    # Variant fixture; the rendered text is also printed between two rules.
    variant_json = (
        data_dir / "myvariant/variants_full_braf_v600e.json"
    ).read_text()
    variant_expected = (
        data_dir / "myvariant/variants_full_braf_v600e.txt"
    ).read_text()
    variant_markdown = render.to_markdown(variant_json)
    rule = "==" * 100
    print(rule)
    print(variant_markdown)
    print(rule)
    assert variant_markdown == variant_expected


def test_render_with_nones():
    """Rendering the module-level ``data`` dict yields exactly the text below.

    None of the keys whose value is None in ``data`` appear in the expected
    output.
    """
    expected = """# Studies

## Protocol Section

### Design Module
Study Type: interventional
Phases: phase2

### Identification Module
Brief Title:
  study of autologous tumor infiltrating lymphocytes in patients with
  solid tumors
Nct Id: nct03645928

### Status Module
Overall Status: recruiting

#### Completion Date Struct
Date: 2029-08-09

#### Start Date Struct
Date: 2019-05-07
"""
    assert render.to_markdown(data) == expected


# Input for test_render_with_nones: one study in which most fields are None
# and only a handful of protocol-section values are populated.
data = {
    "next_page_token": None,
    "studies": [
        {
            "derived_section": None,
            "document_section": None,
            "has_results": None,
            "protocol_section": {
                "arms_interventions_module": None,
                "conditions_module": None,
                "contacts_locations_module": None,
                "description_module": None,
                "design_module": {
                    "design_info": None,
                    "enrollment_info": None,
                    "phases": ["phase2"],
                    "study_type": "interventional",
                },
                "eligibility_module": None,
                "identification_module": {
                    "acronym": None,
                    "brief_title": "study "
                    "of "
                    "autologous "
                    "tumor "
                    "infiltrating "
                    "lymphocytes "
                    "in "
                    "patients "
                    "with "
                    "solid "
                    "tumors",
                    "nct_id": "nct03645928",
                    "official_title": None,
                    "org_study_id_info": None,
                    "organization": None,
                    "secondary_id_infos": None,
                },
                "outcomes_module": None,
                "oversight_module": None,
                "references_module": None,
                "sponsor_collaborators_module": None,
                "status_module": {
                    "completion_date_struct": {
                        "date": "2029-08-09",
                        "type": None,
                    },
                    "expanded_access_info": None,
                    "last_known_status": None,
                    "last_update_post_date_struct": None,
                    "last_update_submit_date": None,
                    "overall_status": "recruiting",
                    "primary_completion_date_struct": None,
                    "results_first_post_date_struct": None,
                    "results_first_submit_date": None,
                    "results_first_submit_qc_date": None,
                    "start_date_struct": {"date": "2019-05-07", "type": None},
                    "status_verified_date": None,
                    "study_first_post_date_struct": None,
                    "study_first_submit_date": None,
                    "study_first_submit_qc_date": None,
                    "why_stopped": None,
                },
            },
            "results_section": None,
        },
    ],
}


def test_transform_key_protocol_section():
    """``protocol_section`` is expected to become "Protocol Section"."""
    label = render.transform_key("protocol_section")
    assert label == "Protocol Section"


def test_transform_key_nct_number():
    """``nct_number`` is expected to become "Nct Number"."""
    label = render.transform_key("nct_number")
    assert label == "Nct Number"


def test_transform_key_study_url():
    """``study_url`` is expected to become "Study Url"."""
    label = render.transform_key("study_url")
    assert label == "Study Url"


def test_transform_key_allcaps():
    """The single word ``allcaps`` is expected to become "Allcaps"."""
    label = render.transform_key("allcaps")
    assert label == "Allcaps"


def test_transform_key_primary_purpose():
    """``primary_purpose`` is expected to become "Primary Purpose"."""
    label = render.transform_key("primary_purpose")
    assert label == "Primary Purpose"


def test_transform_key_underscores():
    """The three-part key ``some_key_name`` is expected to become "Some Key Name"."""
    label = render.transform_key("some_key_name")
    assert label == "Some Key Name"


def test_transform_key_lowercase():
    """The lowercase word ``somekey`` is expected to become "Somekey"."""
    label = render.transform_key("somekey")
    assert label == "Somekey"


def test_transform_key_nctid():
    """``nct_id`` is expected to become "Nct Id"."""
    label = render.transform_key("nct_id")
    assert label == "Nct Id"


def test_transform_key_4dct():
    """The digit-leading key ``4dct`` is expected to come back unchanged."""
    label = render.transform_key("4dct")
    assert label == "4dct"


def test_wrap_preserve_newlines_blank():
    """Wrapping an empty string at width 20 is expected to give an empty list."""
    wrapped = render.wrap_preserve_newlines("", 20)
    assert wrapped == []


def test_wrap_preserve_newlines_short_line():
    """Text shorter than the width is expected back as a single line."""
    wrapped = render.wrap_preserve_newlines("hello world", 20)
    assert wrapped == ["hello world"]


def test_wrap_preserve_newlines_long():
    """Text longer than the width is split, starting with "this line is"."""
    sentence = "this line is definitely longer than twenty characters"
    wrapped = render.wrap_preserve_newlines(sentence, 20)
    assert len(wrapped) > 1
    assert "this line is" in wrapped[0]


def test_process_scalar_list_fits():
    """A short list is expected on one "Key: a, b" line appended to the output."""
    collected = []
    values = ["condition1", "condition2"]
    render.process_scalar_list("conditions", collected, values)
    assert collected == ["Conditions: condition1, condition2"]


def test_process_scalar_list_too_long():
    """A long list is expected as a header line followed by "- " items."""
    collected = []
    oversized = ["test_value" * 10, "another" * 5]
    render.process_scalar_list("giant_field", collected, oversized)
    assert collected[0].startswith("Giant Field:")
    assert collected[1].startswith("- test_value")


def test_render_key_value_short():
    """A short value is expected on the same line as its label."""
    collected = []
    render.render_key_value(collected, "nct_number", "nct100")
    assert collected == ["Nct Number: nct100"]


def test_render_key_value_long():
    """A long value is expected under a bare label line, indented by two spaces."""
    collected = []
    long_value = "hello " * 15
    render.render_key_value(collected, "brief_summary", long_value)
    # The label stands alone on the first line.
    assert collected[0] == "Brief Summary:"
    # The value starts on the next line, indented.
    assert collected[1].startswith("  hello hello")

```

--------------------------------------------------------------------------------
/src/biomcp/articles/search_optimized.py:
--------------------------------------------------------------------------------

```python
"""Optimized article search with caching and parallel processing."""

import asyncio
import hashlib

from .. import ensure_list
from ..shared_context import get_search_context
from ..utils.request_cache import get_cache
from .search import PubmedRequest
from .unified import search_articles_unified

# Module-level cache for article search results, obtained from get_cache
# under the name "article_search" with a 300-second (5 minute) TTL.
_search_cache = get_cache("article_search", ttl_seconds=300)


def _get_search_cache_key(
    request: PubmedRequest, include_preprints: bool, include_cbioportal: bool
) -> str:
    """Derive a deterministic cache key for a search request.

    Each of the request's term lists is sorted before being folded into the
    key, so the order in which terms were supplied does not affect it.

    Args:
        request: Search request whose chemicals, diseases, genes, keywords
            and variants lists are read.
        include_preprints: Preprint flag, included in the key.
        include_cbioportal: cBioPortal flag, included in the key.

    Returns:
        Hex-encoded SHA-256 digest of the "|"-joined key segments.
    """
    term_fields = ("chemicals", "diseases", "genes", "keywords", "variants")
    segments = [
        f"{field}:{sorted(getattr(request, field))}" for field in term_fields
    ]
    segments.append(f"preprints:{include_preprints}")
    segments.append(f"cbioportal:{include_cbioportal}")

    digest = hashlib.sha256("|".join(segments).encode())
    return digest.hexdigest()


async def article_searcher_optimized(
    call_benefit: str,
    chemicals: list[str] | str | None = None,
    diseases: list[str] | str | None = None,
    genes: list[str] | str | None = None,
    keywords: list[str] | str | None = None,
    variants: list[str] | str | None = None,
    include_preprints: bool = True,
    include_cbioportal: bool = True,
) -> str:
    """Run a unified article search, answering repeated queries from the cache.

    Args:
        call_benefit: Not used in this function.
        chemicals, diseases, genes, keywords, variants: Search terms; each
            is normalised with ``ensure_list(..., split_strings=True)``.
        include_preprints: Forwarded to the unified search and part of the
            cache key.
        include_cbioportal: Forwarded to the unified search and part of the
            cache key.

    Returns:
        The cached result when one exists for the same key, otherwise the
        fresh result of ``search_articles_unified`` (stored with a
        300-second TTL before being returned).
    """
    # Normalise every term argument and build the request from them.
    raw_terms = {
        "chemicals": chemicals,
        "diseases": diseases,
        "genes": genes,
        "keywords": keywords,
        "variants": variants,
    }
    request = PubmedRequest(**{
        field: ensure_list(value, split_strings=True)
        for field, value in raw_terms.items()
    })

    # The key is computed before any gene filtering below.
    cache_key = _get_search_cache_key(
        request, include_preprints, include_cbioportal
    )
    hit = await _search_cache.get(cache_key)
    if hit is not None:
        return hit

    # Inside a search context, keep only the genes the context validates.
    context = get_search_context()
    if context and request.genes:
        request.genes = [
            gene
            for gene in request.genes
            if await context.validate_gene(gene)
        ]

        if include_cbioportal and request.genes:
            # Look up the summary for the first remaining gene only; the
            # returned value is not used any further here.
            context.get_gene_summary(request.genes[0])

    result = await search_articles_unified(
        request,
        include_pubmed=True,
        include_preprints=include_preprints,
        include_cbioportal=include_cbioportal,
    )

    # Store for 300 seconds (5 minutes).
    await _search_cache.set(cache_key, result, ttl=300)
    return result


# Additional optimization: Batch article searches
class ArticleSearchBatcher:
    """Batch multiple article searches to reduce overhead.

    Callers await :meth:`search`. Each request is queued together with a
    future, and a single background task drains the queue in groups of at
    most ``batch_size``, running every group concurrently with
    ``asyncio.gather`` and resolving the callers' futures with the outcome.
    """

    def __init__(self, batch_size: int = 5, timeout: float = 0.1):
        """Create a batcher.

        Args:
            batch_size: Maximum number of searches executed concurrently in
                one batch. Values below 1 are treated as 1 when draining.
            timeout: Seconds the background task waits before processing the
                first batch, so that more requests can accumulate.
        """
        self.batch_size = batch_size
        self.timeout = timeout
        self._pending_searches: list[tuple[PubmedRequest, asyncio.Future]] = []
        self._batch_task: asyncio.Task | None = None

    async def search(self, request: PubmedRequest) -> str:
        """Queue ``request`` and wait for its result.

        Args:
            request: The search to run as part of a batch.

        Returns:
            Whatever ``search_articles_unified`` returns for this request.

        Raises:
            BaseException: Re-raises whatever the underlying search raised.
        """
        # We are inside a coroutine, so a running loop is guaranteed;
        # get_running_loop() avoids the deprecated implicit-loop behavior of
        # get_event_loop().
        future = asyncio.get_running_loop().create_future()
        self._pending_searches.append((request, future))

        # Start batch processing if not already running
        if self._batch_task is None or self._batch_task.done():
            self._batch_task = asyncio.create_task(self._process_batch())

        return await future

    async def _process_batch(self):
        """Drain the pending queue, one batch at a time.

        The queue is drained completely: requests beyond ``batch_size`` and
        requests that arrive while a batch is in flight are picked up by the
        next loop iteration. Without this, such requests would never be
        processed, because ``search`` only starts a new task when the
        current one has finished.
        """
        await asyncio.sleep(self.timeout)  # Wait for more requests

        # Guard against a non-positive batch_size, which would otherwise take
        # empty batches forever without making progress.
        size = max(1, self.batch_size)

        while self._pending_searches:
            # Take up to ``size`` searches off the front of the queue
            batch = self._pending_searches[:size]
            self._pending_searches = self._pending_searches[size:]

            # Process searches in parallel
            results = await asyncio.gather(
                *(
                    search_articles_unified(request, include_pubmed=True)
                    for request, _ in batch
                ),
                return_exceptions=True,
            )

            # Set results on futures
            for (_, future), result in zip(batch, results, strict=False):
                if future.done():
                    # The caller was cancelled while waiting; setting a
                    # result now would raise InvalidStateError.
                    continue
                if isinstance(result, BaseException):
                    # Includes CancelledError, which gather() also returns as
                    # a value when return_exceptions=True.
                    future.set_exception(result)
                else:
                    future.set_result(result)


# Global batcher instance, created at import time with the default
# batch_size and timeout.
# NOTE(review): article_searcher_batched below does not reference this
# instance; confirm whether it is used elsewhere.
_article_batcher = ArticleSearchBatcher()


async def article_searcher_batched(
    call_benefit: str,
    chemicals: list[str] | str | None = None,
    diseases: list[str] | str | None = None,
    genes: list[str] | str | None = None,
    keywords: list[str] | str | None = None,
    variants: list[str] | str | None = None,
    include_preprints: bool = True,
    include_cbioportal: bool = True,
) -> str:
    """Batched entry point for article searches.

    Each entity argument is normalized with ``ensure_list`` and passed
    through a ``PubmedRequest``. The resulting fields are then forwarded,
    together with ``call_benefit`` and the two ``include_*`` flags, to
    ``article_searcher_optimized``, which performs the cached search.

    Returns:
        The string produced by ``article_searcher_optimized``.
    """
    raw_entities = {
        "chemicals": chemicals,
        "diseases": diseases,
        "genes": genes,
        "keywords": keywords,
        "variants": variants,
    }

    # Build the request once so every entity field is normalized the same way.
    request = PubmedRequest(
        **{
            field: ensure_list(value, split_strings=True)
            for field, value in raw_entities.items()
        }
    )
    normalized = {field: getattr(request, field) for field in raw_entities}

    # Use the optimized version with caching
    return await article_searcher_optimized(
        call_benefit=call_benefit,
        include_preprints=include_preprints,
        include_cbioportal=include_cbioportal,
        **normalized,
    )

```

--------------------------------------------------------------------------------
/tests/tdd/variants/test_cbioportal_mutations.py:
--------------------------------------------------------------------------------

```python
"""Tests for cBioPortal mutation-specific search functionality."""

import pytest

from biomcp.utils.mutation_filter import MutationFilter
from biomcp.variants.cbioportal_mutations import (
    CBioPortalMutationClient,
    MutationHit,
    StudyMutationSummary,
    format_mutation_search_result,
)


class TestCBioPortalMutationSearch:
    """Exercise mutation-level cBioPortal search, filtering and formatting."""

    @staticmethod
    def _sample_hits():
        """Return three missense hits: F57Y and F57C in study1, R88Q in study2."""
        specs = (
            ("study1", "study1_mutations", "F57Y"),
            ("study1", "study1_mutations", "F57C"),
            ("study2", "study2_mutations", "R88Q"),
        )
        return [
            MutationHit(
                study_id=study_id,
                molecular_profile_id=profile_id,
                protein_change=change,
                mutation_type="Missense",
            )
            for study_id, profile_id, change in specs
        ]

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_search_specific_mutation_srsf2_f57y(self):
        """An exact-mutation search for SRSF2 F57Y echoes the query fields."""
        outcome = await CBioPortalMutationClient().search_specific_mutation(
            gene="SRSF2", mutation="F57Y", max_studies=10
        )

        assert outcome is not None
        assert outcome.gene == "SRSF2"
        assert outcome.specific_mutation == "F57Y"
        assert outcome.studies_with_mutation >= 0

        # The per-study structure can only be checked when something matched.
        if not outcome.studies_with_mutation > 0:
            return

        assert len(outcome.top_studies) > 0
        leading = outcome.top_studies[0]
        assert isinstance(leading, StudyMutationSummary)
        assert leading.mutation_count > 0

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_search_mutation_pattern_srsf2_f57(self):
        """A wildcard search for SRSF2 F57* reports F57-prefixed mutations."""
        outcome = await CBioPortalMutationClient().search_specific_mutation(
            gene="SRSF2", pattern="F57*", max_studies=10
        )

        assert outcome is not None
        assert outcome.gene == "SRSF2"
        assert outcome.pattern == "F57*"

        # F57* should match F57Y, F57C, etc.
        if outcome.total_mutations > 0:
            assert outcome.mutation_types is not None
            # At least one reported mutation must carry the F57 prefix.
            prefixed = sum(
                1
                for change in outcome.mutation_types
                if change.startswith("F57")
            )
            assert prefixed > 0

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_search_braf_v600e(self):
        """BRAF V600E is common enough to appear in many studies."""
        outcome = await CBioPortalMutationClient().search_specific_mutation(
            gene="BRAF", mutation="V600E", max_studies=20
        )

        assert outcome is not None
        assert outcome.gene == "BRAF"
        assert outcome.specific_mutation == "V600E"
        # V600E is very common, should have many studies
        assert outcome.studies_with_mutation > 10
        assert len(outcome.top_studies) > 0

        # At least some melanoma studies should have V600E
        observed_types = [study.cancer_type for study in outcome.top_studies]
        assert any("melanoma" in label.lower() for label in observed_types)

    def test_filter_mutations_specific(self):
        """An exact-mutation filter keeps only the matching protein change."""
        hits = self._sample_hits()

        # Filter for F57Y
        kept = MutationFilter(specific_mutation="F57Y").filter_mutations(hits)

        assert len(kept) == 1
        assert kept[0].protein_change == "F57Y"

    def test_filter_mutations_pattern(self):
        """A wildcard filter keeps every protein change with the prefix."""
        hits = self._sample_hits()

        # Filter for F57*
        kept = MutationFilter(pattern="F57*").filter_mutations(hits)

        assert len(kept) == 2
        assert all(hit.protein_change.startswith("F57") for hit in kept)

    def test_format_mutation_search_result(self):
        """The formatted report contains gene, mutation, counts and study ids."""
        from biomcp.variants.cbioportal_mutations import MutationSearchResult

        clonal_study = StudyMutationSummary(
            study_id="msk_ch_2023",
            study_name="Cancer Therapy and Clonal Hematopoiesis",
            cancer_type="mixed",
            mutation_count=5,
            sample_count=100,
        )
        mds_study = StudyMutationSummary(
            study_id="mds_mskcc_2020",
            study_name="Myelodysplastic Syndrome Study",
            cancer_type="mds",
            mutation_count=2,
            sample_count=50,
        )
        search_result = MutationSearchResult(
            gene="SRSF2",
            specific_mutation="F57Y",
            total_studies=100,
            studies_with_mutation=3,
            total_mutations=5,
            top_studies=[clonal_study, mds_study],
            mutation_types={"F57Y": 5},
        )

        report = format_mutation_search_result(search_result)

        expected_fragments = (
            "SRSF2",
            "F57Y",
            "**Studies with Mutation**: 3",
            "msk_ch_2023",
            "|     5 |",  # mutation count
        )
        for fragment in expected_fragments:
            assert fragment in report

```

--------------------------------------------------------------------------------
/docs/backend-services-reference/06-pubtator3.md:
--------------------------------------------------------------------------------

```markdown
# PubTator3 API

This document describes the PubTator3 API used by BioMCP for searching biomedical literature and retrieving article details with annotations. Understanding this API provides context for how BioMCP's article commands function.

## Overview

The PubTator3 API provides a way to search for and retrieve biomedical articles
with entity annotations. This document outlines the API implementation details.
PubTator3 is a web-based tool that provides annotations of biomedical entities
in PubMed abstracts and PMC full-text articles. BioMCP uses the PubTator3 API
to search for and retrieve biomedical articles and their annotated entities
(genes, variants, diseases, chemicals, etc.).

> **CLI Documentation**: For information on using these APIs through the BioMCP
> command line interface, see
> the [Articles CLI Documentation](../user-guides/01-command-line-interface.md#article-commands).

## Usage Guide

For practical examples of searching articles with PubTator3, see [How to Find Articles and cBioPortal Data](../how-to-guides/01-find-articles-and-cbioportal-data.md).

## API Workflow

The PubTator3 integration follows a three-step workflow:

1. **Entity Autocomplete**: Get standardized entity identifiers
2. **Search**: Find articles using entity identifiers and keywords
3. **Fetch**: Retrieve full article details by PMID

## API Endpoints

### Entity Autocomplete API

**Endpoint:**
`https://www.ncbi.nlm.nih.gov/research/pubtator3-api/entity/autocomplete/`

This endpoint helps normalize entity names to their standard identifiers,
improving search precision.

#### Parameters

| Parameter | Description                 | Example                             |
| --------- | --------------------------- | ----------------------------------- |
| `query`   | Text to autocomplete        | `BRAF`                              |
| `concept` | Entity type                 | `GENE`, `CHEMICAL`, `DISEASE`, etc. |
| `limit`   | Number of results to return | `2`                                 |

#### Example Request and Response

```bash
curl "https://www.ncbi.nlm.nih.gov/research/pubtator3-api/entity/autocomplete/?query=BRAF&concept=GENE&limit=2"
```

Response:

```json
[
  {
    "_id": "@GENE_BRAF",
    "biotype": "gene",
    "name": "BRAF",
    "description": "All Species",
    "match": "Matched on name <m>BRAF</m>"
  },
  {
    "_id": "@GENE_BRAFP1",
    "biotype": "gene",
    "name": "BRAFP1",
    "description": "All Species",
    "match": "Matched on name <m>BRAFP1</m>"
  }
]
```

### Entity Search API

**Endpoint:** `https://www.ncbi.nlm.nih.gov/research/pubtator3-api/search/`

This endpoint allows searching for PMIDs (PubMed IDs) based on entity
identifiers and keywords.

#### Parameters

| Parameter | Description                     | Example                |
| --------- | ------------------------------- | ---------------------- |
| `text`    | Entity identifier or text query | `@CHEMICAL_remdesivir` |

#### Example Request and Response

```bash
curl "https://www.ncbi.nlm.nih.gov/research/pubtator3-api/search/?text=@CHEMICAL_remdesivir"
```

Response (truncated):

```json
{
  "results": [
    {
      "_id": "37711410",
      "pmid": 37711410,
      "title": "Remdesivir.",
      "journal": "Hosp Pharm",
      "authors": ["Levien TL", "Baker DE"],
      "date": "2023-10-01T00:00:00Z",
      "doi": "10.1177/0018578721999804",
      "meta_date_publication": "2023 Oct",
      "meta_volume": "58"
    }
    // More results...
  ]
}
```

### Article Fetch API

**Endpoint:**
`https://www.ncbi.nlm.nih.gov/research/pubtator3-api/publications/export/biocjson`

This endpoint retrieves detailed information about specific articles, including
annotations.

#### Parameters

| Parameter   | Description                                   | Example    |
| ----------- | --------------------------------------------- | ---------- |
| `pmids`     | List of PubMed IDs to retrieve                | `29355051` |
| `full`      | Whether to include full text (when available) | `true`     |

#### Example Request

```bash
curl "https://www.ncbi.nlm.nih.gov/research/pubtator3-api/publications/export/biocjson?pmids=29355051&full=true"
```

Response format (truncated):

```json
{
  "PubTator3": [
    {
      "_id": "29355051|PMC6142073",
      "id": "6142073",
      "infons": {},
      "passages": [
        {
          "infons": {
            "name_3": "surname:Hu;given-names:Minghua",
            "name_2": "surname:Luo;given-names:Xia",
            "name_1": "surname:Luo;given-names:Shuang",
            "article-id_pmid": "29355051"
            // More metadata...
          }
        }
        // More passages...
      ]
    }
  ]
}
```

## Entity Types

PubTator3 annotates several types of biomedical entities:

1. **Genes/Proteins**: Gene or protein names (e.g., BRAF, TP53)
2. **Genetic Variants**: Genetic variations (e.g., BRAF V600E)
3. **Diseases**: Disease names and conditions (e.g., Melanoma)
4. **Chemicals/Drugs**: Chemical substances or drugs (e.g., Vemurafenib)

## Integration Strategy for BioMCP

The recommended workflow for integrating with PubTator3 in BioMCP is:

1. **Entity Normalization**: Use the autocomplete API to convert user-provided
   entity names to standardized identifiers
2. **Literature Search**: Use the search API with these identifiers to find
   relevant PMIDs
3. **Data Retrieval**: Fetch detailed article data with annotations using the
   fetch API

This workflow ensures consistent entity handling and optimal search results.

## Authentication

The PubTator3 API is public and does not require authentication for basic
usage. However, there are rate limits in place to prevent abuse.

## Rate Limits and Best Practices

- **Request Limits**: Approximately 30 requests per minute
- **Batch Requests**: For article retrieval, batch multiple PMIDs in a single
  request
- **Caching**: Implement caching to minimize repeated requests
- **Specific Queries**: Use specific entity names rather than general terms for
  better results

## Error Handling

Common error responses:

- **400**: Invalid parameters
- **404**: Articles not found
- **429**: Rate limit exceeded
- **500**: Server error

## More Information

For complete API documentation, visit
the [PubTator3 API Documentation](https://www.ncbi.nlm.nih.gov/research/pubtator3/api).

```

--------------------------------------------------------------------------------
/docs/backend-services-reference/04-clinicaltrials-gov.md:
--------------------------------------------------------------------------------

```markdown
# ClinicalTrials.gov API

This document outlines the key aspects of the public ClinicalTrials.gov v2 API utilized by BioMCP. Understanding these details can be helpful for advanced users interpreting BioMCP results or for developers extending its capabilities. BioMCP's CLI commands often simplify or combine these parameters for ease of use; refer to the [Trials CLI Documentation](../user-guides/01-command-line-interface.md#trial-commands) for specific command options.

## Overview

The [ClinicalTrials.gov](https://clinicaltrials.gov/) API provides programmatic
access to clinical trial information. This document outlines the API
implementation details for searching and retrieving clinical trial data.

> **CLI Documentation**: For information on using these APIs through the BioMCP
> command line interface, see the [Trials CLI Documentation](../user-guides/01-command-line-interface.md#trial-commands).

## API Endpoints

### Search API

**Endpoint:** `https://clinicaltrials.gov/api/v2/studies`

This endpoint allows searching for clinical trials using various parameters.

#### Key Parameters

| Parameter              | Description                         | Example Value                                   |
| ---------------------- | ----------------------------------- | ----------------------------------------------- |
| `query.cond`           | "Conditions or disease" query       | `lung cancer`                                   |
| `query.term`           | "Other terms" query                 | `AREA[LastUpdatePostDate]RANGE[2023-01-15,MAX]` |
| `query.intr`           | "Intervention/treatment" query      | `Vemurafenib`                                   |
| `query.locn`           | "Location terms" query              | `New York`                                      |
| `query.titles`         | "Title/acronym" query               | `BRAF Melanoma`                                 |
| `query.outc`           | "Outcome measure" query             | `overall survival`                              |
| `query.spons`          | "Sponsor/collaborator" query        | `National Cancer Institute`                     |
| `query.lead`           | Searches in "LeadSponsorName" field | `MD Anderson`                                   |
| `query.id`             | "Study IDs" query (OR semantics)    | `NCT04267848`                                   |
| `filter.overallStatus` | Comma-separated list of statuses    | `NOT_YET_RECRUITING,RECRUITING`                 |
| `filter.geo`           | Geo-location filter                 | `distance(39.0035707,-77.1013313,50mi)`         |
| `filter.ids`           | Filter by NCT IDs (AND semantics)   | `NCT04852770,NCT01728545`                       |
| `filter.advanced`      | Advanced filter query               | `AREA[StartDate]2022`                           |
| `sort`                 | Sort order                          | `LastUpdatePostDate:desc`                       |
| `fields`               | Fields to return                    | `NCTId,BriefTitle,OverallStatus,HasResults`     |
| `countTotal`           | Count total number of studies       | `true` or `false`                               |

#### Example Request

```bash
curl -X GET "https://clinicaltrials.gov/api/v2/studies?query.cond=Melanoma&query.intr=BRAF"
```

### Study Details API

**Endpoint:** `https://clinicaltrials.gov/api/v2/studies/{NCT_ID}`

This endpoint retrieves detailed information about a specific clinical trial.

#### Example Request

```bash
curl -X GET "https://clinicaltrials.gov/api/v2/studies/NCT04267848"
```

#### Response Modules

The API response contains various modules of information:

- **protocolSection**: Basic study information, eligibility criteria, and
  design
- **resultsSection**: Study outcomes and results (when available)
- **documentSection**: Related documents
- **derivedSection**: Derived data elements
- **annotationsSection**: Additional annotations

## Implementation Details

### NCT ID Filtering Semantics

BioMCP uses intelligent filtering when NCT IDs are provided:

- **ID-only mode**: When NCT IDs are the only filter criteria, `query.id` is used for fast direct lookup
- **Intersection mode**: When NCT IDs are combined with other filters (conditions, interventions, etc.), `filter.ids` is used to ensure results match ALL criteria

This ensures that specifying NCT IDs restricts results rather than expanding them.

### Query Building

When constructing API queries, parameters must be properly formatted according to the API documentation.

For implementation details on query building in BioMCP, see the [HTTP Client Developer Guide](../developer-guides/06-http-client-and-caching.md).

### Response Parsing

The API returns data in JSON format (or CSV if specified). Key sections in the
response include:

- `protocolSection`: Contains study protocol details
  - `identificationModule`: Basic identifiers including NCT ID and title
  - `statusModule`: Current recruitment status and study dates
  - `sponsorCollaboratorsModule`: Information about sponsors and
    collaborators
  - `designModule`: Study design information including interventions
  - `eligibilityModule`: Inclusion/exclusion criteria and eligible population
  - `contactsLocationsModule`: Study sites and contact information
  - `referencesModule`: Related publications

### Error Handling

The API returns standard HTTP status codes. Common error scenarios include:

- **404**: Trial not found
- **429**: Rate limit exceeded
- **400**: Invalid query parameters

For implementation details on error handling in BioMCP, see the [Error Handling Developer Guide](../developer-guides/05-error-handling.md).

## Authentication

The ClinicalTrials.gov API is public and does not require authentication for
basic usage. However, there are rate limits in place.

## Rate Limits and Best Practices

- **Rate Limit**: Approximately 50 requests per minute per IP address
- **Caching**: Implement caching to minimize repeated requests
- **Pagination**: For large result sets, use the pagination functionality with
  the `pageSize` and `pageToken` parameters
- **Focused Queries**: Use specific search terms rather than broad queries to
  get more relevant results
- **Field Selection**: Use the fields parameter to request only the data you
  need

## More Information

For complete API documentation, visit
the [ClinicalTrials.gov API Documentation](https://clinicaltrials.gov/data-api/about-api).

```

--------------------------------------------------------------------------------
/docs/how-to-guides/05-logging-and-monitoring-with-bigquery.md:
--------------------------------------------------------------------------------

```markdown
# BigQuery Logging for BioMCP

This document outlines how BioMCP uses Google BigQuery for logging user interactions and API usage.

## Overview

BioMCP integrates with Google BigQuery to log user interactions, queries, and API usage. This logging provides valuable insights into how the system is being used, helps with debugging, and enables analytics for improving the service.

## Prerequisites

- A Google Cloud Platform (GCP) account
- A BigQuery dataset and table created in your GCP project
- A GCP service account with BigQuery permissions

## Setting Up BigQuery for BioMCP

1. **Create a BigQuery Dataset and Table**

   - In the Google Cloud Console, navigate to BigQuery
   - Create a new dataset (e.g., `biomcp_logs`)
   - Create a table within the dataset (e.g., `worker_logs`) with the following schema:
     ```
     timestamp: TIMESTAMP
     userEmail: STRING
     query: STRING
     ```
   - Adjust the schema as needed for your specific logging requirements

2. **Create a Service Account**

   - Navigate to "IAM & Admin" > "Service Accounts" in the Google Cloud Console
   - Create a new service account with a descriptive name (e.g., `biomcp-bigquery-logger`)
   - Assign the "BigQuery Data Editor" role to the service account
   - Create and download a JSON key for the service account

3. **Configure BioMCP with BigQuery Credentials**

   - Open `wrangler.toml` in the BioMCP project
   - Update the following variables with your BigQuery information:
     ```toml
     BQ_PROJECT_ID = "your-gcp-project-id"
     BQ_DATASET = "biomcp_logs"
     BQ_TABLE = "worker_logs"
     ```
   - For the service account key, use Cloudflare's secret management:
     ```bash
     npx wrangler secret put BQ_SA_KEY_JSON
     ```
     When prompted, paste the entire JSON content of your service account key file

## How BigQuery Logging Works

The BioMCP worker uses the following process to log data to BigQuery:

1. **Authentication**: The worker generates a JWT token using the service account credentials
2. **Token Exchange**: The JWT is exchanged for a Google OAuth access token
3. **Data Insertion**: The worker uses BigQuery's streaming insert API to log events

The implementation includes:

- Token caching to minimize authentication requests
- Error handling for failed logging attempts
- Automatic retry logic for transient failures

## Logged Information

By default, the following information is logged to BigQuery:

- **timestamp**: When the event occurred
- **userEmail**: The email address of the authenticated user (if available)
- **query**: The query or request that was made

You can extend the logging schema to include additional information as needed.

## Accessing and Analyzing Logs

To access and analyze the logs:

1. **Query the BigQuery Table**

   - Use the BigQuery console or SQL to query your logs
   - Example query to see recent logs:
     ```sql
     SELECT timestamp, userEmail, query
     FROM `your-project.biomcp_logs.worker_logs`
     ORDER BY timestamp DESC
     LIMIT 100
     ```

2. **Create Visualizations**

   - Use Looker Studio (formerly Google Data Studio) to create dashboards based on your BigQuery data
   - Connect Looker Studio to your BigQuery table and create visualizations

## Security Considerations

- The service account key is sensitive information and should be protected
- Use Cloudflare's secret management to store the key securely
- Consider implementing field-level encryption for sensitive data
- Implement data retention policies to comply with privacy regulations
- **IMPORTANT: Never include PHI (Protected Health Information) or PII (Personally Identifiable Information) in queries or logs**
  - Ensure all queries are sanitized to remove patient identifiers, medical record numbers, and other sensitive information
  - Consider implementing automatic redaction of potential PHI/PII from logs
  - Regularly audit logs to ensure compliance with HIPAA and other privacy regulations
  - Remember that BigQuery logs are not designed for storing protected health information

### Automatic Sanitization

BioMCP automatically sanitizes sensitive data before logging to BigQuery:

- **API Keys and Secrets**: Fields containing `api_key`, `apiKey`, `api-key`, `token`, `secret`, or `password` are automatically redacted
- **Nested Objects**: Sanitization works recursively through nested objects and arrays
- **Case-Insensitive**: Field name matching is case-insensitive to catch variations
- **Preserved Structure**: The original request structure is maintained with sensitive values replaced by `[REDACTED]`

Example of sanitization:

```javascript
// Original request
{
  "params": {
    "arguments": {
      "api_key": "AIzaSyB1234567890",
      "gene": "BRAF"
    }
  }
}

// Sanitized for BigQuery
{
  "params": {
    "arguments": {
      "api_key": "[REDACTED]",
      "gene": "BRAF"
    }
  }
}
```

### Excluded Queries

Certain types of queries are automatically excluded from BigQuery logging:

- **Think Tool Calls**: Any calls to the `think` tool are not logged
- **Thinking Domain**: Queries with `domain="thinking"` or `domain="think"` are excluded
- **Privacy-First Design**: This ensures that internal reasoning and analysis steps remain private

## Troubleshooting

- **Authentication Failures**: Verify that the service account key is correctly formatted and has the necessary permissions
- **Insertion Errors**: Check that the BigQuery table schema matches the data being inserted
- **Missing Logs**: Ensure that the worker has network access to the BigQuery API

## Example Code

The worker includes the following key functions for BigQuery logging:

- `getBQToken()`: Fetches and caches a BigQuery OAuth token
- `insertEvent()`: Inserts a single row into BigQuery via streaming insert
- `sanitizeObject()`: Recursively sanitizes sensitive fields from objects before logging

These functions handle the authentication and data insertion process automatically.

## Testing

BioMCP includes comprehensive tests for the BigQuery logging functionality:

### JavaScript Tests

The sanitization logic is tested using the Node.js built-in test runner:

```bash
# Run JavaScript worker tests
make test-js

# Or run directly
node --test tests/tdd/workers/test_worker_sanitization.js
```

Tests cover:

- API key redaction
- Nested sensitive field handling
- Array sanitization
- Case-insensitive field matching
- Think tool detection
- Domain-based filtering

```

--------------------------------------------------------------------------------
/src/biomcp/organizations/search.py:
--------------------------------------------------------------------------------

```python
"""Search functionality for organizations via NCI CTS API."""

import logging
from typing import Any

from ..constants import NCI_ORGANIZATIONS_URL
from ..integrations.cts_api import CTSAPIError, make_cts_request
from ..utils import parse_or_query

logger = logging.getLogger(__name__)


async def search_organizations(
    name: str | None = None,
    org_type: str | None = None,
    city: str | None = None,
    state: str | None = None,
    page_size: int = 20,
    page: int = 1,
    api_key: str | None = None,
) -> dict[str, Any]:
    """
    Query the NCI CTS organizations endpoint.

    Only filters with a truthy value are sent to the API. ``page`` is not
    sent at all; it is only echoed back in the returned dictionary.

    Args:
        name: Organization name filter, sent as ``name``
        org_type: Organization type filter, sent as ``type``
        city: City filter, sent as ``org_city``
        state: State filter, sent as ``org_state_or_province``
        page_size: Sent as ``size``; also echoed in the result
        page: Echoed in the result only
        api_key: Forwarded unchanged to ``make_cts_request``

    Returns:
        Dictionary with the keys:
        - organizations: ``data`` from the response, else ``organizations``,
          else an empty list
        - total: ``total`` from the response, else the number of records
        - page: The ``page`` argument
        - page_size: The ``page_size`` argument

    Raises:
        CTSAPIError: If the request fails, or if any other exception occurs
            while issuing the request or reading the response
    """
    # API parameter name -> caller-supplied value; falsy values are dropped.
    candidate_filters = {
        "name": name,
        "type": org_type,
        "org_city": city,
        "org_state_or_province": state,
    }

    # Note: The NCI API doesn't support offset/page pagination for organizations
    # It uses cursor-based pagination or returns all results up to size limit
    params: dict[str, Any] = {"size": page_size}
    params.update(
        (key, value) for key, value in candidate_filters.items() if value
    )

    try:
        response = await make_cts_request(
            url=NCI_ORGANIZATIONS_URL,
            params=params,
            api_key=api_key,
        )

        # Prefer the "data" key and fall back to "organizations"; the exact
        # response shape is an assumption based on typical REST APIs.
        fallback_records = response.get("organizations", [])
        organizations = response.get("data", fallback_records)
        record_count = len(organizations)

        return {
            "organizations": organizations,
            "total": response.get("total", record_count),
            "page": page,
            "page_size": page_size,
        }

    except CTSAPIError:
        # Already the documented error type; let it propagate untouched.
        raise
    except Exception as e:
        logger.error(f"Failed to search organizations: {e}")
        raise CTSAPIError(f"Organization search failed: {e!s}") from e


def format_organization_results(results: dict[str, Any]) -> str:
    """
    Format organization search results as markdown.

    Each organization is rendered as a level-3 heading followed by its ID,
    type and (when available) location. Field lookups tolerate alternative
    key names: ``id``/``org_id``, ``type``/``category``, ``city``/``org_city``
    and ``state``/``org_state_or_province``. Values that are ``None`` or an
    empty string are treated as missing.

    Args:
        results: Search results dictionary; reads the "organizations" list
            and the "total" count.

    Returns:
        Formatted markdown string, or a fixed message when there are no
        organizations to show.
    """
    organizations = results.get("organizations", [])
    total = results.get("total", 0)

    if not organizations:
        return "No organizations found matching the search criteria."

    def first_value(record: dict[str, Any], keys: tuple[str, ...], default: Any) -> Any:
        """Return the first non-empty value found under ``keys``."""
        for key in keys:
            value = record.get(key)
            if value is not None and value != "":
                return value
        return default

    # Build markdown output
    lines = [
        f"## Organization Search Results ({total} found)",
        "",
    ]

    for org in organizations:
        org_id = first_value(org, ("id", "org_id"), "Unknown")
        name = first_value(org, ("name",), "Unknown Organization")
        org_type = first_value(org, ("type", "category"), "Unknown")
        # The search filters use the org_-prefixed names, so accept those
        # spellings on the way back as well as the short ones.
        city = first_value(org, ("city", "org_city"), "")
        state = first_value(org, ("state", "org_state_or_province"), "")

        lines.append(f"### {name}")
        lines.append(f"- **ID**: {org_id}")
        lines.append(f"- **Type**: {org_type}")

        location_parts = [str(part) for part in (city, state) if part]
        if location_parts:
            lines.append(f"- **Location**: {', '.join(location_parts)}")

        lines.append("")

    return "\n".join(lines)


async def search_organizations_with_or(
    name_query: str,
    org_type: str | None = None,
    city: str | None = None,
    state: str | None = None,
    page_size: int = 20,
    page: int = 1,
    api_key: str | None = None,
) -> dict[str, Any]:
    """
    Search for organizations with OR query support.

    This function handles OR queries by making multiple API calls and combining results.
    For example: "MD Anderson OR Mayo Clinic" will search for each term.

    A failure while searching one term is logged as a warning and the
    remaining terms are still searched, so this function returns whatever
    could be collected rather than raising.

    Args:
        name_query: Name query that may contain OR operators
        org_type: Organization type filter, forwarded to search_organizations
        city: City filter, forwarded to search_organizations
        state: State filter, forwarded to search_organizations
        page_size: Results per page, used both for each per-term search and
            for slicing the combined list
        page: Page number, forwarded to each search and used to slice the
            combined list
        api_key: Optional API key, forwarded to search_organizations

    Returns:
        Dictionary with:
        - organizations: The requested page of the combined, de-duplicated,
          name-sorted results
        - total: Number of unique organizations collected
        - page / page_size: Echo of the request
        - search_terms: The individual terms that were searched
        - total_found_across_terms: Sum of per-term totals before deduplication
    """
    # Check if this is an OR query
    if " OR " in name_query or " or " in name_query:
        search_terms = parse_or_query(name_query)
        logger.info(f"Parsed OR query into terms: {search_terms}")
    else:
        # Single term search
        search_terms = [name_query]

    # Collect all unique organizations, keyed by their ID
    all_organizations: dict[Any, dict[str, Any]] = {}
    total_found = 0

    # Search for each term
    for term in search_terms:
        logger.info(f"Searching organizations for term: {term}")
        try:
            results = await search_organizations(
                name=term,
                org_type=org_type,
                city=city,
                state=state,
                page_size=page_size,
                page=page,
                api_key=api_key,
            )

            # Add unique organizations (deduplicate by ID).
            # NOTE(review): records with no "id"/"org_id" are dropped here.
            for org in results.get("organizations", []):
                org_id = org.get("id", org.get("org_id"))
                if org_id and org_id not in all_organizations:
                    all_organizations[org_id] = org

            total_found += results.get("total", 0)

        except Exception as e:
            logger.warning(f"Failed to search for term '{term}': {e}")
            # Continue with other terms

    # Convert back to list and apply pagination
    unique_organizations = list(all_organizations.values())

    # Sort by name for consistent results. A record may carry an explicit
    # None (or non-string) name, so coerce before lower-casing instead of
    # letting the sort raise.
    unique_organizations.sort(key=lambda x: str(x.get("name") or "").lower())

    # Apply pagination to combined results.
    # NOTE(review): each per-term search returns at most one page_size batch,
    # so for page > 1 this slice can start past the collected data — confirm
    # the intended paging semantics with callers.
    start_idx = (page - 1) * page_size
    end_idx = start_idx + page_size
    paginated_organizations = unique_organizations[start_idx:end_idx]

    return {
        "organizations": paginated_organizations,
        "total": len(unique_organizations),
        "page": page,
        "page_size": page_size,
        "search_terms": search_terms,  # Include what we searched for
        "total_found_across_terms": total_found,  # Total before deduplication
    }

```

--------------------------------------------------------------------------------
/smithery.yaml:
--------------------------------------------------------------------------------

```yaml
# Smithery configuration file: https://smithery.ai/docs/config#smitheryyaml

startCommand:
  type: stdio
  configSchema:
    # JSON Schema defining the configuration options for the MCP.
    type: object
    properties: {}
  commandFunction:
    # A JS function that produces the CLI command based on the given config to start the MCP on stdio.
    |-
    (config) => ({ command: 'biomcp', args: ['run'], env: {} })
  exampleConfig: {}

schemas:
  TrialQuery:
    type: object
    properties:
      conditions:
        type: array
        items:
          type: string
        description: "List of condition terms."
      terms:
        type: array
        items:
          type: string
        description: "General search terms that don't fit specific categories."
      interventions:
        type: array
        items:
          type: string
        description: "Intervention names."
      recruiting_status:
        type: string
        description: "Study recruitment status."
      study_type:
        type: string
        description: "Type of study."
      nct_ids:
        type: array
        items:
          type: string
        description: "Clinical trial NCT IDs"
      lat:
        type: number
        description: "Latitude for location search"
      long:
        type: number
        description: "Longitude for location search"
      distance:
        type: integer
        description: "Distance from lat/long in miles"
      min_date:
        type: string
        description: "Minimum date for filtering"
      max_date:
        type: string
        description: "Maximum date for filtering"
      date_field:
        type: string
        description: "Date field to filter on"
      phase:
        type: string
        description: "Trial phase filter"
      age_group:
        type: string
        description: "Age group filter"
      primary_purpose:
        type: string
        description: "Primary purpose of the trial"
      intervention_type:
        type: string
        description: "Type of intervention"
      sponsor_type:
        type: string
        description: "Type of sponsor"
      study_design:
        type: string
        description: "Study design"
      sort:
        type: string
        description: "Sort order for results"
      next_page_hash:
        type: string
        description: "Token to retrieve the next page of results"

  VariantQuery:
    type: object
    properties:
      gene:
        type: string
        description: "Gene symbol to search for (e.g. BRAF, TP53)"
      hgvsp:
        type: string
        description: "Protein change notation (e.g., p.V600E, p.Arg557His)"
      hgvsc:
        type: string
        description: "cDNA notation (e.g., c.1799T>A)"
      rsid:
        type: string
        description: "dbSNP rsID (e.g., rs113488022)"
      region:
        type: string
        description: "Genomic region as chr:start-end (e.g. chr1:12345-67890)"
      significance:
        type: string
        description: "ClinVar clinical significance"
      max_frequency:
        type: number
        description: "Maximum population allele frequency threshold"
      min_frequency:
        type: number
        description: "Minimum population allele frequency threshold"
      cadd:
        type: number
        description: "Minimum CADD phred score"
      polyphen:
        type: string
        description: "PolyPhen-2 prediction"
      sift:
        type: string
        description: "SIFT prediction"
      sources:
        type: array
        items:
          type: string
        description: "Include only specific data sources"
      size:
        type: integer
        description: "Number of results to return"
        default: 40
      offset:
        type: integer
        description: "Result offset for pagination"
        default: 0

  PubmedRequest:
    type: object
    properties:
      chemicals:
        type: array
        items:
          type: string
        description: "List of chemicals for filtering results."
      diseases:
        type: array
        items:
          type: string
        description: "Diseases such as Hypertension, Lung Adenocarcinoma, etc."
      genes:
        type: array
        items:
          type: string
        description: "List of genes for filtering results."
      keywords:
        type: array
        items:
          type: string
        description: "List of other keywords for filtering results."
      variants:
        type: array
        items:
          type: string
        description: "List of variants for filtering results."

tools:
  trial_searcher:
    input:
      schema:
        type: object
        properties:
          query:
            $ref: "#/schemas/TrialQuery"
        required: ["query"]

  variant_searcher:
    input:
      schema:
        type: object
        properties:
          query:
            $ref: "#/schemas/VariantQuery"
        required: ["query"]

  article_searcher:
    input:
      schema:
        type: object
        properties:
          query:
            $ref: "#/schemas/PubmedRequest"
        required: ["query"]

  # Simple string parameter functions
  trial_protocol:
    input:
      schema:
        type: object
        properties:
          nct_id:
            type: string
            description: "A single NCT ID (e.g., NCT04280705)"
        required: ["nct_id"]

  trial_locations:
    input:
      schema:
        type: object
        properties:
          nct_id:
            type: string
            description: "A single NCT ID (e.g., NCT04280705)"
        required: ["nct_id"]

  trial_outcomes:
    input:
      schema:
        type: object
        properties:
          nct_id:
            type: string
            description: "A single NCT ID (e.g., NCT04280705)"
        required: ["nct_id"]

  trial_references:
    input:
      schema:
        type: object
        properties:
          nct_id:
            type: string
            description: "A single NCT ID (e.g., NCT04280705)"
        required: ["nct_id"]

  article_details:
    input:
      schema:
        type: object
        properties:
          pmid:
            type: string
            description: "A single PubMed ID (e.g., 34397683)"
        required: ["pmid"]

  variant_details:
    input:
      schema:
        type: object
        properties:
          variant_id:
            type: string
            description: "A variant identifier (e.g., chr7:g.140453136A>T)"
        required: ["variant_id"]

```

--------------------------------------------------------------------------------
/tests/tdd/openfda/test_adverse_events.py:
--------------------------------------------------------------------------------

```python
"""
Unit tests for OpenFDA adverse events integration.
"""

from unittest.mock import patch

import pytest

from biomcp.openfda.adverse_events import (
    get_adverse_event,
    search_adverse_events,
)


@pytest.mark.asyncio
async def test_search_adverse_events_by_drug():
    """A drug-name search queries OpenFDA and summarizes the returned reports."""
    imatinib_entry = {
        "medicinalproduct": "IMATINIB",
        "openfda": {
            "brand_name": ["GLEEVEC"],
            "generic_name": ["IMATINIB MESYLATE"],
        },
    }
    report = {
        "patient": {
            "drug": [imatinib_entry],
            "reaction": [
                {"reactionmeddrapt": "NAUSEA"},
                {"reactionmeddrapt": "FATIGUE"},
            ],
            "patientonsetage": "45",
            "patientsex": 2,
        },
        "serious": "1",
        "seriousnesshospitalization": "1",
        "receivedate": "20240115",
    }
    payload = {"meta": {"results": {"total": 100}}, "results": [report]}

    target = "biomcp.openfda.adverse_events.make_openfda_request"
    with patch(target) as fake_request:
        fake_request.return_value = (payload, None)
        rendered = await search_adverse_events(drug="imatinib", limit=10)

    # The request must have been issued exactly once with the drug in the query
    fake_request.assert_called_once()
    positional = fake_request.call_args[0]
    assert "imatinib" in positional[1]["search"].lower()

    # The rendered text must carry the key facts from the payload
    assert "FDA Adverse Event Reports" in rendered
    assert "imatinib" in rendered.lower()
    for fragment in ("NAUSEA", "FATIGUE", "100 reports"):
        assert fragment in rendered


@pytest.mark.asyncio
async def test_search_adverse_events_by_reaction():
    """A reaction-only search puts the reaction term into the OpenFDA query."""
    aspirin_report = {
        "patient": {
            "drug": [{"medicinalproduct": "ASPIRIN"}],
            "reaction": [{"reactionmeddrapt": "HEADACHE"}],
        },
        "serious": "0",
        "receivedate": "20240201",
    }
    payload = {
        "meta": {"results": {"total": 50}},
        "results": [aspirin_report],
    }

    target = "biomcp.openfda.adverse_events.make_openfda_request"
    with patch(target) as fake_request:
        fake_request.return_value = (payload, None)
        rendered = await search_adverse_events(reaction="headache", limit=10)

    # Request side: one call, reaction term present in the search expression
    fake_request.assert_called_once()
    query_params = fake_request.call_args[0][1]
    assert "headache" in query_params["search"].lower()

    # Output side
    for fragment in ("HEADACHE", "50 reports"):
        assert fragment in rendered


@pytest.mark.asyncio
async def test_search_adverse_events_no_params():
    """Calling the search with no criteria yields a guidance message."""
    message = await search_adverse_events()

    expected_fragments = (
        "Please specify",
        "drug name or reaction",
        "Examples:",
    )
    for fragment in expected_fragments:
        assert fragment in message


@pytest.mark.asyncio
async def test_search_adverse_events_no_results():
    """An empty result list produces a 'nothing found' message naming the drug."""
    target = "biomcp.openfda.adverse_events.make_openfda_request"
    empty_payload = {"results": []}

    with patch(target) as fake_request:
        fake_request.return_value = (empty_payload, None)
        rendered = await search_adverse_events(drug="nonexistentdrug")

    assert "No adverse event reports found" in rendered
    assert "nonexistentdrug" in rendered


@pytest.mark.asyncio
async def test_search_adverse_events_error():
    """An error reported by the request layer is surfaced in the output text."""
    target = "biomcp.openfda.adverse_events.make_openfda_request"
    failure = (None, "API rate limit exceeded")

    with patch(target) as fake_request:
        fake_request.return_value = failure
        rendered = await search_adverse_events(drug="aspirin")

    assert "Error searching adverse events" in rendered
    assert "API rate limit exceeded" in rendered


@pytest.mark.asyncio
async def test_get_adverse_event_detail():
    """Fetching one report by ID renders patient, drug and reaction details."""
    report_id = "12345678"
    drug_entry = {
        "medicinalproduct": "DRUG A",
        "drugindication": "HYPERTENSION",
        "drugdosagetext": "100mg daily",
        "drugadministrationroute": "048",
        "actiondrug": 4,
    }
    reaction_entry = {"reactionmeddrapt": "DIZZINESS", "reactionoutcome": 1}
    report = {
        "safetyreportid": report_id,
        "patient": {
            "patientonsetage": "55",
            "patientsex": 1,
            "patientweight": "75",
            "drug": [drug_entry],
            "reaction": [reaction_entry],
        },
        "serious": "1",
        "seriousnesshospitalization": "1",
        "receivedate": "20240115",
        "reporttype": 1,
    }
    payload = {"results": [report]}

    target = "biomcp.openfda.adverse_events.make_openfda_request"
    with patch(target) as fake_request:
        fake_request.return_value = (payload, None)
        rendered = await get_adverse_event("12345678")

    # Request side: a single call whose search expression carries the ID
    fake_request.assert_called_once()
    query_params = fake_request.call_args[0][1]
    assert "12345678" in query_params["search"]

    # Output side: every detail from the payload shows up in the text
    expected_fragments = (
        "12345678",
        "Patient Information",
        "55 years",
        "Male",
        "75 kg",
        "DRUG A",
        "HYPERTENSION",
        "100mg daily",
        "DIZZINESS",
        "Recovered/Resolved",
    )
    for fragment in expected_fragments:
        assert fragment in rendered


@pytest.mark.asyncio
async def test_get_adverse_event_not_found():
    """Looking up an unknown report ID yields a 'not found' message with the ID."""
    target = "biomcp.openfda.adverse_events.make_openfda_request"
    empty_payload = {"results": []}

    with patch(target) as fake_request:
        fake_request.return_value = (empty_payload, None)
        rendered = await get_adverse_event("NOTFOUND123")

    for fragment in ("NOTFOUND123", "not found"):
        assert fragment in rendered

```

--------------------------------------------------------------------------------
/src/biomcp/openfda/adverse_events_helpers.py:
--------------------------------------------------------------------------------

```python
"""
Helper functions for OpenFDA adverse events to reduce complexity.
"""

from collections import Counter
from typing import Any

from .utils import (
    extract_drug_names,
    extract_reactions,
    format_count,
    format_drug_list,
)


def format_search_summary(
    drug: str | None, reaction: str | None, serious: bool | None, total: int
) -> list[str]:
    """Build the summary lines shown at the top of a search result.

    The first line (emitted only when at least one criterion was given)
    lists the active criteria separated by " | "; the last line reports the
    total number of reports, rendered through ``format_count``.
    """
    # Pair each candidate label with whether its criterion was supplied.
    candidates = (
        (f"**Drug**: {drug}", bool(drug)),
        (f"**Reaction**: {reaction}", bool(reaction)),
        (
            f"**Serious Events**: {'Yes' if serious else 'No'}",
            serious is not None,
        ),
    )
    criteria = [label for label, supplied in candidates if supplied]

    summary = [" | ".join(criteria)] if criteria else []
    summary.append(
        f"**Total Reports Found**: {format_count(total, 'report')}\n"
    )
    return summary


def format_top_reactions(results: list[dict[str, Any]]) -> list[str]:
    """Summarize the ten most frequently mentioned reactions.

    Reactions are pulled from every result via ``extract_reactions`` and
    tallied. Each output line shows the tally and that tally as a percentage
    of the number of results. Returns an empty list when no reactions were
    extracted.
    """
    tally = Counter(
        reaction
        for report in results
        for reaction in extract_reactions(report)
    )
    if not tally:
        return []

    section = ["### Top Reported Reactions:"]
    for name, hits in tally.most_common(10):
        share = (hits / len(results)) * 100
        section.append(f"- **{name}**: {hits} reports ({share:.1f}%)")
    section.append("")
    return section


def format_report_summary(
    result: dict[str, Any], report_num: int
) -> list[str]:
    """Format a single report summary.

    Args:
        result: One adverse event report record.
        report_num: Ordinal shown in the report heading.

    Returns:
        Markdown lines: heading, drugs, up to five reactions, then (when
        present) patient age/sex, serious outcomes and report date, followed
        by a blank line.
    """
    output = [f"#### Report {report_num}"]

    # Extract key information
    drugs = extract_drug_names(result)
    reactions = extract_reactions(result)

    # Patient info
    patient = result.get("patient") or {}
    age = patient.get("patientonsetage")
    sex_map = {0: "Unknown", 1: "Male", 2: "Female"}
    # The sex code may arrive as an int or as a numeric string ("1"/"2");
    # normalize to int so both forms resolve instead of falling to "Unknown".
    try:
        sex = sex_map.get(int(patient.get("patientsex")), "Unknown")
    except (TypeError, ValueError):
        sex = "Unknown"

    # Serious outcomes (flags compared as text so 1 and "1" both count)
    serious_flag = str(result.get("serious", "0"))
    outcomes = []
    for code in [
        "seriousnessdeath",
        "seriousnesslifethreatening",
        "seriousnesshospitalization",
        "seriousnessdisabling",
    ]:
        if str(result.get(code)) == "1":
            outcomes.append(code.replace("seriousness", "").title())

    # Format output
    output.append(f"- **Drugs**: {format_drug_list(drugs)}")
    output.append(f"- **Reactions**: {', '.join(reactions[:5])}")
    if age:
        # NOTE(review): the age is labelled as years without checking any
        # age-unit field on the record — confirm that is always correct.
        output.append(f"- **Patient**: {age} years, {sex}")
    if serious_flag == "1" and outcomes:
        output.append(f"- **Serious Outcome**: {', '.join(outcomes)}")

    # Dates: receivedate is sliced as YYYYMMDD into YYYY-MM-DD
    receive_date = result.get("receivedate", "")
    if receive_date:
        output.append(
            f"- **Report Date**: {receive_date[:4]}-{receive_date[4:6]}-{receive_date[6:]}"
        )

    output.append("")
    return output


def format_drug_details(drugs: list[dict[str, Any]]) -> list[str]:
    """Format drug information details.

    Args:
        drugs: Drug records from a single adverse event report.

    Returns:
        Markdown lines: a section heading, then for each drug a numbered
        sub-heading, any indication/dosage/route present on the record, and
        the action taken; a trailing blank line closes the section.
    """
    from .utils import clean_text

    # Drug action taken, keyed by numeric code (built once, not per drug)
    action_map = {
        1: "Drug withdrawn",
        2: "Dose reduced",
        3: "Dose increased",
        4: "Dose not changed",
        5: "Unknown",
        6: "Not applicable",
    }

    output = ["### Drug Information"]

    for i, drug in enumerate(drugs, 1):
        output.append(
            f"\n#### Drug {i}: {drug.get('medicinalproduct', 'Unknown')}"
        )

        if "drugindication" in drug:
            output.append(f"- **Indication**: {drug['drugindication']}")

        if "drugdosagetext" in drug:
            dosage = clean_text(drug["drugdosagetext"])
            output.append(f"- **Dosage**: {dosage}")

        if "drugadministrationroute" in drug:
            output.append(f"- **Route**: {drug['drugadministrationroute']}")

        # The code may be an int or a numeric string ("4"); normalize to int
        # so both resolve. Missing or non-numeric codes map to "Unknown".
        try:
            action = action_map.get(int(drug.get("actiondrug")), "Unknown")
        except (TypeError, ValueError):
            action = "Unknown"
        output.append(f"- **Action Taken**: {action}")

    output.append("")
    return output


def format_reaction_details(reactions: list[dict[str, Any]]) -> list[str]:
    """Format adverse reaction details.

    Args:
        reactions: Reaction records from a single adverse event report.

    Returns:
        Markdown lines: a section heading, one "- **name**: outcome" line per
        reaction, and a trailing blank line.
    """
    # Outcome labels keyed by numeric code (built once, not per reaction)
    outcome_map = {
        1: "Recovered/Resolved",
        2: "Recovering/Resolving",
        3: "Not recovered/Not resolved",
        4: "Recovered/Resolved with sequelae",
        5: "Fatal",
        6: "Unknown",
    }

    output = ["### Adverse Reactions"]

    for reaction in reactions:
        rxn_name = reaction.get("reactionmeddrapt", "Unknown")
        # The code may be an int or a numeric string ("1"); normalize to int
        # so both resolve. Missing or non-numeric codes map to "Unknown".
        try:
            outcome = outcome_map.get(
                int(reaction.get("reactionoutcome")), "Unknown"
            )
        except (TypeError, ValueError):
            outcome = "Unknown"
        output.append(f"- **{rxn_name}**: {outcome}")

    output.append("")
    return output


def format_report_metadata(result: dict[str, Any]) -> list[str]:
    """Format report metadata information.

    Args:
        result: One adverse event report record.

    Returns:
        Markdown lines: a section heading, the report date (when present),
        the report type, and — for reports flagged serious — the list of
        serious outcomes that are set.
    """
    output = ["### Report Information"]

    # receivedate is sliced as YYYYMMDD into YYYY-MM-DD
    receive_date = result.get("receivedate", "")
    if receive_date:
        formatted_date = (
            f"{receive_date[:4]}-{receive_date[4:6]}-{receive_date[6:]}"
        )
        output.append(f"- **Report Date**: {formatted_date}")

    report_type_map = {
        1: "Spontaneous",
        2: "Report from study",
        3: "Other",
        4: "Not available to sender",
    }
    # The code may be an int or a numeric string ("1"); normalize to int so
    # both resolve. Missing or non-numeric codes map to "Unknown".
    try:
        report_type = report_type_map.get(
            int(result.get("reporttype")), "Unknown"
        )
    except (TypeError, ValueError):
        report_type = "Unknown"
    output.append(f"- **Report Type**: {report_type}")

    # Seriousness: flags are compared as text so 1 and "1" both count
    if str(result.get("serious")) == "1":
        seriousness_labels = (
            ("seriousnessdeath", "Death"),
            ("seriousnesslifethreatening", "Life-threatening"),
            ("seriousnesshospitalization", "Hospitalization"),
            ("seriousnessdisabling", "Disability"),
            ("seriousnesscongenitalanomali", "Congenital anomaly"),
            ("seriousnessother", "Other serious"),
        )
        outcomes = [
            label
            for field, label in seriousness_labels
            if str(result.get(field)) == "1"
        ]

        if outcomes:
            output.append(f"- **Serious Outcomes**: {', '.join(outcomes)}")

    return output

```

--------------------------------------------------------------------------------
/docs/blog/researcher-persona-resource.md:
--------------------------------------------------------------------------------

```markdown
# BioMCP Deep Researcher Persona

With the release of BioMCP v0.1.2, users can now access a specialized
Researcher Persona that transforms Claude into a rigorous biomedical research
assistant using BioMCP's built-in sequential thinking capabilities.

This persona is designed to leverage BioMCP's suite of tools for accessing
PubMed articles, ClinicalTrials.gov data, and genomic variant information,
while incorporating Claude's web search capabilities to produce comprehensive,
thoroughly researched reports.

## How to Use the Researcher Persona

Getting started with the BioMCP Researcher Persona is straightforward:

1. Configure Claude Desktop by updating your configuration JSON with:

```json
{
  "mcpServers": {
    "biomcp": {
      "command": "uv",
      "args": ["run", "--with", "biomcp-python>=0.1.2", "biomcp", "run"]
    }
  }
}
```

2. Restart Claude Desktop (the `>=0.1.2` ensures the latest version is used, which includes the built-in think tool)

3. Select the "Researcher" persona from the dropdown menu
   ![Select Researcher Persona](./images/researcher-drop-down.png)

4. Ask your biomedical research question

The Researcher Persona will then work through its 10-step process, keeping you
updated on its progress and ultimately producing a comprehensive research
brief.

## Video Demonstration

Below is a video demonstrating the Researcher Persona in action:

[![▶️ Watch the video](./images/deep-researcher-video.png)](https://youtu.be/tBGG53O-7Hg)

## Sequential Thinking: A Rigorous 10-Step Research Process

What makes the Researcher Persona so powerful is its integration with BioMCP's
built-in 'think' tool, which guides the AI through a comprehensive
10-step research methodology:

1. **Topic Scoping & Domain Framework**: Creating a comprehensive structure to
   ensure complete coverage
2. **Initial Information Gathering**: Establishing baseline terminology and
   recent developments
3. **Focused & Frontier Retrieval**: Filling knowledge gaps and identifying
   cutting-edge developments
4. **Primary Trials Analysis**: Identifying and analyzing key clinical trials
5. **Primary Literature Analysis**: Identifying and analyzing pivotal
   publications
6. **Initial Evidence Synthesis**: Creating a preliminary framework of findings
7. **Integrated Gap-Filling**: Addressing identified knowledge gaps
8. **Comprehensive Evidence Synthesis**: Creating a final integrated framework
   with quality assessment
9. **Self-Critique and Verification**: Rigorously assessing the quality and
   comprehensiveness
10. **Research Brief Creation**: Producing the final deliverable with all
    required elements

[![View Researcher Persona](./images/researcher-prompt.png)](https://github.com/genomoncology/biomcp/blob/main/src/biomcp/resources/researcher.md)

This structured approach ensures that no important aspects of the research
question are overlooked and that the final output is comprehensive,
well-organized, and backed by current evidence.

## Put to the Test: Emerging Treatment Strategies for Head and Neck Cancer

To evaluate the effectiveness of the Researcher Persona, we conducted a
head-to-head comparison with other AI research approaches. We asked the same
question to five different systems: "What are the emerging treatment strategies
for head and neck cancer?"

The results were impressive. The BioMCP-powered Researcher Persona, combined
with Claude's web search capabilities and the built-in think tool,
produced the highest-rated research brief among all approaches tested.

[![Researcher Announcement](./images/researcher-announce.png)](https://github.com/genomoncology/biomcp-examples#researcher-announcement)

The research brief produced by the BioMCP Researcher Persona stood out for
several reasons:

1. **Comprehensive domain coverage**: The report covered all relevant treatment
   modalities (immunotherapy, targeted therapy, radiation techniques, surgery,
   combination approaches)
2. **Structured evidence categorization**: Findings were clearly organized by
   level of evidence (Established, Emerging, Experimental, Theoretical)
3. **Evidence quality assessment**: The brief included critical evaluation of
   source quality and evidence strength
4. **Thorough citation**: All claims were backed by specific references to
   scientific literature or clinical trials
5. **Self-critique**: The report included transparent limitations and
   identified areas requiring further research

## Explore the Example and Evaluations

We've documented this comparison in detail in
the [biomcp-examples repository](https://github.com/genomoncology/biomcp-examples),
where you can find:

- The full research briefs produced by each approach
- Independent evaluations by three different AI judges (Claude 3.7, Gemini 2.5
  Pro, and OpenAI o3)
- Detailed scoring against a rubric that prioritizes accuracy, clarity, and
  comprehensiveness
- Analysis of strengths and weaknesses of each approach

The consensus among the judges placed the BioMCP-powered brief at the top,
highlighting its exceptional structure, evidence-based approach, and
comprehensive coverage.

## Beyond the Example: Wide-Ranging Applications

While our example focused on head and neck cancer treatments, the BioMCP
Researcher Persona can tackle a wide range of biomedical research questions:

- **Therapeutic comparisons**: "Compare the efficacy and safety profiles of JAK
  inhibitors versus biologics for treating rheumatoid arthritis"
- **Disease mechanisms**: "What is the current understanding of gut microbiome
  dysbiosis in inflammatory bowel disease?"
- **Biomarker investigations**: "What emerging biomarkers show promise for
  early detection of pancreatic cancer?"
- **Treatment protocols**: "What are the latest guidelines for managing
  anticoagulation in patients with atrial fibrillation and chronic kidney
  disease?"

## Join the BioMCP Community

The Researcher Persona is just one example of how BioMCP is transforming
AI-assisted biomedical research. We invite you to:

1. Try the Researcher Persona with your own research questions
2. Contribute to
   the [biomcp-examples repository](https://github.com/genomoncology/biomcp-examples)
   with your experiments
3. Share your feedback and suggestions for future improvements

By combining specialized biomedical data access with structured research
methodologies, BioMCP is helping researchers produce more comprehensive,
accurate, and useful biomedical research briefs than ever before.

Have a complex biomedical research question? Give the BioMCP Researcher Persona
a try and experience the difference a structured, tool-powered approach can
make!

```

--------------------------------------------------------------------------------
/mkdocs.yml:
--------------------------------------------------------------------------------

```yaml
site_name: BioMCP
repo_url: https://github.com/genomoncology/biomcp
site_url: https://biomcp.org/
site_description: Biomedical Model Context Protocol Server
site_author: Ian Maurer
edit_uri: edit/main/docs/
repo_name: genomoncology/biomcp
copyright: Maintained by <a href="https://genomoncology.com">genomoncology</a>.

nav:
  - Home: index.md

  - Getting Started:
      - Quick Start: getting-started/01-quickstart-cli.md
      - Claude Desktop: getting-started/02-claude-desktop-integration.md
      - API Keys: getting-started/03-authentication-and-api-keys.md
      - FAQ: faq-condensed.md
      - Troubleshooting: troubleshooting.md

  - User Guide:
      - Overview: concepts/01-what-is-biomcp.md
      - Finding Articles: how-to-guides/01-find-articles-and-cbioportal-data.md
      - Finding Trials: how-to-guides/02-find-trials-with-nci-and-biothings.md
      - Analyzing Variants: how-to-guides/03-get-comprehensive-variant-annotations.md
      - Predicting Effects: how-to-guides/04-predict-variant-effects-with-alphagenome.md
      - Searching Organizations: how-to-guides/06-search-nci-organizations-and-interventions.md
      - Research Workflows: workflows/all-workflows.md
      - Examples:
          - Pydantic AI Integration: tutorials/pydantic-ai-integration.md
          - Remote Connection: tutorials/remote-connection.md
          - BioThings Examples: tutorials/biothings-prompts.md
          - NCI Examples: tutorials/nci-prompts.md
          - AlphaGenome Tutorial: tutorials/claude-code-biomcp-alphagenome.md
          - OpenFDA Examples: tutorials/openfda-prompts.md
      - Concepts:
          - Deep Researcher: concepts/02-the-deep-researcher-persona.md
          - Sequential Thinking: concepts/03-sequential-thinking-with-the-think-tool.md

  - Reference:
      - Quick Reference: reference/quick-reference.md
      - CLI Commands: user-guides/01-command-line-interface.md
      - MCP Tools: user-guides/02-mcp-tools-reference.md
      - API Documentation:
          - API Overview: apis/overview.md
          - Python SDK: apis/python-sdk.md
          - Error Codes: apis/error-codes.md
      - IDE Integration: user-guides/03-integrating-with-ides-and-clients.md

  - Developer:
      - Architecture:
          - Overview: reference/quick-architecture.md
          - Visual Diagrams: reference/visual-architecture.md
          - Detailed Diagrams: reference/architecture-diagrams.md
      - Data Sources:
          - Overview: backend-services-reference/01-overview.md
          - PubTator3/PubMed: backend-services-reference/06-pubtator3.md
          - ClinicalTrials.gov: backend-services-reference/04-clinicaltrials-gov.md
          - NCI CTS API: backend-services-reference/05-nci-cts-api.md
          - BioThings Suite: backend-services-reference/02-biothings-suite.md
          - cBioPortal: backend-services-reference/03-cbioportal.md
          - AlphaGenome: backend-services-reference/07-alphagenome.md
          - OpenFDA: tutorials/openfda-integration.md
      - Development:
          - Contributing: developer-guides/02-contributing-and-testing.md
          - Deployment: developer-guides/01-server-deployment.md
          - BigQuery Monitoring: how-to-guides/05-logging-and-monitoring-with-bigquery.md
      - Technical Details:
          - Transport Protocol: developer-guides/04-transport-protocol.md
          - Error Handling: developer-guides/05-error-handling.md
          - HTTP Client: developer-guides/06-http-client-and-caching.md
          - Performance: developer-guides/07-performance-optimizations.md
          - Third-Party APIs: developer-guides/03-third-party-endpoints.md
      - Security:
          - FDA Integration Security: FDA_SECURITY.md

  - About:
      - Blog:
          - Clinical Trial Search: blog/ai-assisted-clinical-trial-search-analysis.md
          - Researcher Persona: blog/researcher-persona-resource.md
      - Project:
          - Changelog: changelog.md
          - Policies: policies.md
          - GenomOncology: genomoncology.md

plugins:
  - search:
      lang: en
      separator: '[\s\-\.]+'
  - mkdocstrings:
      handlers:
        python:
          paths: ["src/biomcp"]
  # Note: sitemap plugin requires additional installation
  # Uncomment after installing: pip install mkdocs-sitemap
  # - sitemap:
  #     changefreq: weekly
  #     priority: 0.5
theme:
  name: material
  # custom_dir: overrides
  favicon: assets/favicon.ico
  logo: assets/icon.png
  features:
    - navigation.tabs
    - navigation.tabs.sticky
    - navigation.sections
    - navigation.instant
    - navigation.tracking
    - navigation.top
    - toc.follow
    - search.suggest
    - search.highlight
  palette:
    - media: "(prefers-color-scheme: light)"
      scheme: default
      primary: white
      accent: deep orange
      toggle:
        icon: material/brightness-7
        name: Switch to dark mode
    - media: "(prefers-color-scheme: dark)"
      scheme: slate
      primary: black
      accent: deep orange
      toggle:
        icon: material/brightness-4
        name: Switch to light mode
  icon:
    repo: fontawesome/brands/github

extra:
  social:
    - icon: fontawesome/brands/github
      link: https://github.com/genomoncology/biomcp
    - icon: fontawesome/brands/python
      link: https://pypi.org/project/biomcp-python
  meta:
    - property: og:type
      content: website
    - property: og:title
      content: BioMCP - Biomedical Model Context Protocol Server
    - property: og:description
      content: AI-powered biomedical research tool integrating PubMed, ClinicalTrials.gov, and genomic databases
    - property: og:image
      content: https://biomcp.org/assets/icon.png
    - property: og:url
      content: https://biomcp.org/
    - name: twitter:card
      content: summary
    - name: twitter:title
      content: BioMCP - Biomedical Model Context Protocol
    - name: twitter:description
      content: AI-powered biomedical research tool for PubMed, clinical trials, and genomic data
    - name: keywords
      content: biomedical, MCP, AI, PubMed, clinical trials, genomics, bioinformatics, Claude Desktop

extra_css:
  - stylesheets/extra.css
  - stylesheets/announcement.css

# extra_javascript: (removed - no third-party dependencies)
markdown_extensions:
  - toc:
      permalink: true
  - pymdownx.arithmatex:
      generic: true
  - admonition # Nice looking note/warning boxes
  - pymdownx.details # Collapsible sections
  - pymdownx.highlight: # Code highlighting
      anchor_linenums: true
  - pymdownx.inlinehilite
  - pymdownx.snippets # Include content from other files
  - pymdownx.superfences # Nested code blocks
  - pymdownx.tabbed: # Tabbed content
      alternate_style: true

```

--------------------------------------------------------------------------------
/tests/tdd/variants/test_getter.py:
--------------------------------------------------------------------------------

```python
"""Tests for variant getter module."""

from unittest.mock import AsyncMock, patch

import pytest

from biomcp.constants import DEFAULT_ASSEMBLY
from biomcp.variants import getter


class TestGetVariant:
    """Verify the request parameters that ``getter.get_variant`` sends out.

    Each test patches ``biomcp.http_client.request_api`` so nothing touches
    the network, then inspects the ``request`` keyword argument recorded on
    the mock.
    """

    @staticmethod
    def _sent_request(api_mock):
        """Return the ``request`` keyword argument of the last mocked call."""
        return api_mock.call_args.kwargs["request"]

    @pytest.mark.asyncio
    async def test_get_variant_default_assembly(self):
        """Without an explicit assembly the request carries ``hg19``."""
        payload = {"_id": "rs113488022", "dbsnp": {"rsid": "rs113488022"}}

        with patch("biomcp.http_client.request_api") as api_mock:
            api_mock.return_value = (payload, None)
            await getter.get_variant("rs113488022")

        # The default value must have been filled in for us.
        assert self._sent_request(api_mock)["assembly"] == "hg19"

    @pytest.mark.asyncio
    async def test_get_variant_hg38_assembly(self):
        """An explicit ``hg38`` assembly is forwarded unchanged."""
        payload = {"_id": "rs113488022", "dbsnp": {"rsid": "rs113488022"}}

        with patch("biomcp.http_client.request_api") as api_mock:
            api_mock.return_value = (payload, None)
            await getter.get_variant("rs113488022", assembly="hg38")

        assert self._sent_request(api_mock)["assembly"] == "hg38"

    @pytest.mark.asyncio
    async def test_get_variant_hg19_assembly(self):
        """An explicit ``hg19`` assembly is forwarded unchanged."""
        payload = {"_id": "rs113488022", "dbsnp": {"rsid": "rs113488022"}}

        with patch("biomcp.http_client.request_api") as api_mock:
            api_mock.return_value = (payload, None)
            await getter.get_variant("rs113488022", assembly="hg19")

        assert self._sent_request(api_mock)["assembly"] == "hg19"

    @pytest.mark.asyncio
    async def test_get_variant_includes_all_fields(self):
        """The request holds both ``fields`` and ``assembly`` entries."""
        payload = {"_id": "rs113488022"}

        with patch("biomcp.http_client.request_api") as api_mock:
            api_mock.return_value = (payload, None)
            await getter.get_variant("rs113488022", assembly="hg38")

        sent = self._sent_request(api_mock)
        assert "fields" in sent
        assert sent["fields"] == "all"
        assert "assembly" in sent
        assert sent["assembly"] == "hg38"

    @pytest.mark.asyncio
    async def test_get_variant_with_external_annotations(self):
        """The assembly still reaches the request when external data is on."""
        from biomcp.variants.external import EnhancedVariantAnnotation

        payload = {
            "_id": "rs113488022",
            "dbsnp": {"rsid": "rs113488022"},
            "dbnsfp": {"genename": "BRAF"},
        }

        # Stand-in result for the external aggregator: no sources, no errors.
        empty_annotation = EnhancedVariantAnnotation(
            variant_id="rs113488022",
            tcga=None,
            thousand_genomes=None,
            cbioportal=None,
            error_sources=[],
        )
        aggregator_instance = AsyncMock()
        aggregator_instance.get_enhanced_annotations = AsyncMock(
            return_value=empty_annotation
        )

        with (
            patch("biomcp.http_client.request_api") as api_mock,
            patch(
                "biomcp.variants.getter.ExternalVariantAggregator"
            ) as aggregator_cls,
        ):
            api_mock.return_value = (payload, None)
            aggregator_cls.return_value = aggregator_instance

            await getter.get_variant(
                "rs113488022",
                assembly="hg38",
                include_external=True,
            )

        assert self._sent_request(api_mock)["assembly"] == "hg38"


class TestVariantDetailsMCPTool:
    """Check how ``getter._variant_details`` delegates to ``get_variant``.

    ``get_variant`` is patched in every test, so only the arguments handed
    to it are examined.
    """

    @pytest.mark.asyncio
    async def test_variant_details_default_assembly(self):
        """Omitting ``assembly`` forwards ``DEFAULT_ASSEMBLY``."""
        with patch("biomcp.variants.getter.get_variant") as get_variant_mock:
            get_variant_mock.return_value = "Variant details"
            await getter._variant_details(
                call_benefit="Testing default assembly",
                variant_id="rs113488022",
            )

        expected_kwargs = {
            "output_json": False,
            "include_external": True,
            "assembly": DEFAULT_ASSEMBLY,
        }
        get_variant_mock.assert_called_once_with(
            "rs113488022", **expected_kwargs
        )

    @pytest.mark.asyncio
    async def test_variant_details_custom_assembly(self):
        """A caller-supplied ``hg38`` assembly is forwarded as given."""
        with patch("biomcp.variants.getter.get_variant") as get_variant_mock:
            get_variant_mock.return_value = "Variant details"
            await getter._variant_details(
                call_benefit="Testing hg38 assembly",
                variant_id="rs113488022",
                assembly="hg38",
            )

        expected_kwargs = {
            "output_json": False,
            "include_external": True,
            "assembly": "hg38",
        }
        get_variant_mock.assert_called_once_with(
            "rs113488022", **expected_kwargs
        )

    @pytest.mark.asyncio
    async def test_variant_details_with_all_params(self):
        """Every optional argument is handed through to ``get_variant``."""
        with patch("biomcp.variants.getter.get_variant") as get_variant_mock:
            get_variant_mock.return_value = "Variant details"
            await getter._variant_details(
                call_benefit="Testing all parameters",
                variant_id="chr7:g.140453136A>T",
                include_external=False,
                assembly="hg19",
            )

        expected_kwargs = {
            "output_json": False,
            "include_external": False,
            "assembly": "hg19",
        }
        get_variant_mock.assert_called_once_with(
            "chr7:g.140453136A>T", **expected_kwargs
        )

```

--------------------------------------------------------------------------------
/docs/developer-guides/04-transport-protocol.md:
--------------------------------------------------------------------------------

```markdown
# Transport Protocol Guide

This guide explains BioMCP's transport protocol options, with a focus on the new Streamable HTTP transport that provides better scalability and reliability for production deployments.

## Overview

BioMCP supports multiple transport protocols to accommodate different deployment scenarios:

| Transport           | Use Case                                     | Endpoint | Protocol Version |
| ------------------- | -------------------------------------------- | -------- | ---------------- |
| **STDIO**           | Local development, direct Claude integration | N/A      | All              |
| **Worker/SSE**      | Legacy cloud deployments                     | `/sse`   | Pre-2025         |
| **Streamable HTTP** | Modern cloud deployments                     | `/mcp`   | 2025-03-26+      |

## Streamable HTTP Transport

### What is Streamable HTTP?

Streamable HTTP is the latest MCP transport protocol (specification version 2025-03-26) that provides:

- **Single endpoint** (`/mcp`) for all operations
- **Dynamic response modes**: JSON for quick operations, SSE for long-running tasks
- **Session management** via `session_id` query parameter
- **Better scalability**: No permanent connections required
- **Automatic reconnection** and session recovery

### Architecture

The Streamable HTTP transport follows this flow:

1. **MCP Client** sends POST request to `/mcp` endpoint
2. **BioMCP Server** processes the request
3. **Response Type** determined by operation:
   - Quick operations return JSON response
   - Long operations return SSE stream
4. **Session Management** maintains state via the `session_id` parameter

### Implementation Details

BioMCP leverages FastMCP's native streamable HTTP support:

```python
# In core.py
mcp_app = FastMCP(
    name="BioMCP",
    stateless_http=True,  # Enables streamable HTTP
)
```

The transport is automatically handled by FastMCP 1.12.3+, providing:

- Request routing
- Session management
- Response type negotiation
- Error handling

## Migration Guide

### From SSE to Streamable HTTP

If you're currently using the legacy SSE transport, migrate to streamable HTTP:

#### 1. Update Server Configuration

**Before (SSE/Worker mode):**

```bash
biomcp run --mode worker
```

**After (Streamable HTTP):**

```bash
biomcp run --mode streamable_http
```

#### 2. Update Client Configuration

**MCP Inspector:**

```bash
npx @modelcontextprotocol/inspector uv run --with . biomcp run --mode streamable_http
```

**Claude Desktop Configuration:**

```json
{
  "mcpServers": {
    "biomcp": {
      "command": "docker",
      "args": [
        "run",
        "-p",
        "8000:8000",
        "biomcp:latest",
        "biomcp",
        "run",
        "--mode",
        "streamable_http"
      ]
    }
  }
}
```

#### 3. Update Cloudflare Worker

The worker now supports both GET (legacy SSE) and POST (streamable HTTP) on the `/mcp` endpoint:

```javascript
// Automatically routes based on method
.get("/mcp", async (c) => {
  // Legacy SSE transport
})
.post("/mcp", async (c) => {
  // Streamable HTTP transport
})
```

### Backward Compatibility

All legacy endpoints remain functional:

- `/sse` - Server-sent events transport
- `/health` - Health check endpoint
- `/events` - Event streaming endpoint

## Configuration Options

### Server Modes

```bash
# Local development (STDIO)
biomcp run

# Legacy SSE transport
biomcp run --mode worker

# Modern streamable HTTP
biomcp run --mode streamable_http --host 0.0.0.0 --port 8000
```

### Environment Variables

| Variable        | Description             | Default |
| --------------- | ----------------------- | ------- |
| `MCP_TRANSPORT` | Override transport mode | None    |
| `MCP_HOST`      | Server bind address     | 0.0.0.0 |
| `MCP_PORT`      | Server port             | 8000    |

## Session Management

Streamable HTTP uses session IDs to maintain state across requests:

```http
POST /mcp?session_id=abc123 HTTP/1.1
Content-Type: application/json

{
  "jsonrpc": "2.0",
  "method": "initialize",
  "params": {...}
}
```

Sessions are:

- Created automatically on first request
- Maintained in server memory
- Cleaned up after inactivity timeout
- Isolated between different clients

## Performance Considerations

### Response Mode Selection

The server automatically selects the optimal response mode:

| Operation Type    | Response Mode | Example                |
| ----------------- | ------------- | ---------------------- |
| Quick queries     | JSON          | `search(limit=10)`     |
| Large results     | SSE           | `search(limit=1000)`   |
| Real-time updates | SSE           | Thinking tool progress |

### Optimization Tips

1. **Use session IDs** for related requests to avoid re-initialization
2. **Batch operations** when possible to reduce round trips
3. **Set appropriate timeouts** for long-running operations
4. **Monitor response times** to identify bottlenecks

## Troubleshooting

### Common Issues

#### 1. Connection Refused

```
Error: connect ECONNREFUSED 127.0.0.1:8000
```

**Solution**: Ensure the server is running with `--host 0.0.0.0` for Docker deployments.

#### 2. Session Not Found

```
Error: Session 'xyz' not found
```

**Solution**: The session may have expired. Omit `session_id` to create a new session.

#### 3. Timeout on Large Results

```
Error: Request timeout after 30s
```

**Solution**: Increase client timeout or reduce result size with `limit` parameter.

### Debug Mode

Enable debug logging to troubleshoot transport issues:

```bash
LOG_LEVEL=debug biomcp run --mode streamable_http
```

## Security Considerations

### Authentication

BioMCP does not implement authentication at the transport layer. Secure your deployment using:

- **API Gateway**: AWS API Gateway, Kong, etc.
- **Reverse Proxy**: Nginx with auth modules
- **Cloud IAM**: Platform-specific access controls

### Rate Limiting

Implement rate limiting at the infrastructure layer:

```nginx
# Nginx example
limit_req_zone $binary_remote_addr zone=mcp:10m rate=10r/s;

location /mcp {
    limit_req zone=mcp burst=20;
    proxy_pass http://biomcp:8000;
}
```

### CORS Configuration

For browser-based clients, configure CORS headers:

```python
# Handled automatically by FastMCP when stateless_http=True
```

## Monitoring

### Health Checks

```bash
# Check server health
curl http://localhost:8000/health

# Response
{"status": "ok", "transport": "streamable_http"}
```

### Metrics

Monitor these key metrics:

- Request rate on `/mcp` endpoint
- Response time percentiles (p50, p95, p99)
- Session count and duration
- Error rate by error type

## Next Steps

- Review [MCP Specification](https://spec.modelcontextprotocol.io) for protocol details

For questions or issues, please visit our [GitHub repository](https://github.com/genomoncology/biomcp).

```

--------------------------------------------------------------------------------
/tests/tdd/test_europe_pmc_fetch.py:
--------------------------------------------------------------------------------

```python
"""Tests for Europe PMC article fetching via DOI."""

import json
from unittest.mock import Mock, patch

import pytest

from biomcp.articles.fetch import _article_details, is_doi, is_pmid
from biomcp.articles.preprints import fetch_europe_pmc_article


class TestDOIDetection:
    """Classification checks for the ``is_doi`` and ``is_pmid`` helpers."""

    def test_valid_dois(self):
        """Well-formed DOIs satisfy ``is_doi`` and are rejected by ``is_pmid``."""
        samples = (
            "10.1101/2024.01.20.23288905",
            "10.1038/nature12373",
            "10.1016/j.cell.2023.05.001",
            "10.1126/science.abc1234",
        )
        for doi in samples:
            doi_verdict = is_doi(doi)
            assert doi_verdict is True, f"Expected {doi} to be identified as DOI"
            pmid_verdict = is_pmid(doi)
            assert (
                pmid_verdict is False
            ), f"Expected {doi} NOT to be identified as PMID"

    def test_valid_pmids(self):
        """All-digit identifiers satisfy ``is_pmid`` and are rejected by ``is_doi``."""
        samples = ("35271234", "12345678", "1", "999999999")
        for pmid in samples:
            pmid_verdict = is_pmid(pmid)
            assert (
                pmid_verdict is True
            ), f"Expected {pmid} to be identified as PMID"
            doi_verdict = is_doi(pmid)
            assert (
                doi_verdict is False
            ), f"Expected {pmid} NOT to be identified as DOI"

    def test_invalid_identifiers(self):
        """Strings that are neither DOI nor PMID are rejected by both helpers."""
        samples = (
            "PMC11193658",  # PMC ID
            "abc123",  # Random string
            "10.1101",  # Incomplete DOI
            "nature12373",  # DOI without prefix
            "",  # Empty string
        )
        for identifier in samples:
            doi_verdict = is_doi(identifier)
            assert (
                doi_verdict is False
            ), f"Expected {identifier} NOT to be identified as DOI"
            pmid_verdict = is_pmid(identifier)
            assert (
                pmid_verdict is False
            ), f"Expected {identifier} NOT to be identified as PMID"


class TestEuropePMCFetch:
    """Exercise ``fetch_europe_pmc_article`` against a mocked HTTP client."""

    # Dotted path of the HTTP helper replaced in every test below.
    _REQUEST_API = "biomcp.articles.preprints.http_client.request_api"

    @pytest.mark.asyncio
    async def test_fetch_europe_pmc_article_success(self):
        """A single hit is converted into one article record."""
        hit = Mock(
            id="PPR790987",
            source="PPR",
            pmid=None,
            pmcid=None,
            doi="10.1101/2024.01.20.23288905",
            title="Test Article Title",
            authorString="Author A, Author B, Author C",
            journalTitle=None,
            pubYear="2024",
            firstPublicationDate="2024-01-23",
            abstractText="This is the abstract text.",
        )
        api_reply = Mock(hitCount=1, results=[hit])

        with patch(self._REQUEST_API) as api_mock:
            api_mock.return_value = (api_reply, None)
            raw = await fetch_europe_pmc_article(
                "10.1101/2024.01.20.23288905", output_json=True
            )

        records = json.loads(raw)
        assert len(records) == 1

        article = records[0]
        expected_fields = {
            "doi": "10.1101/2024.01.20.23288905",
            "title": "Test Article Title",
            "journal": "Preprint Server (preprint)",
            "date": "2024-01-23",
            "authors": ["Author A", "Author B", "Author C"],
            "abstract": "This is the abstract text.",
            "source": "Europe PMC",
        }
        for field, wanted in expected_fields.items():
            assert article[field] == wanted
        assert article["pmid"] is None
        assert "europepmc.org" in article["pmc_url"]

    @pytest.mark.asyncio
    async def test_fetch_europe_pmc_article_not_found(self):
        """Zero hits produce a single not-found error record."""
        api_reply = Mock(hitCount=0, results=[])

        with patch(self._REQUEST_API) as api_mock:
            api_mock.return_value = (api_reply, None)
            raw = await fetch_europe_pmc_article(
                "10.1101/invalid.doi", output_json=True
            )

        records = json.loads(raw)
        assert len(records) == 1
        assert records[0]["error"] == "Article not found in Europe PMC"

    @pytest.mark.asyncio
    async def test_fetch_europe_pmc_article_error(self):
        """An error from the HTTP client is reported as one error record."""
        api_error = Mock(code=500, message="Internal Server Error")

        with patch(self._REQUEST_API) as api_mock:
            api_mock.return_value = (None, api_error)
            raw = await fetch_europe_pmc_article(
                "10.1101/2024.01.20.23288905", output_json=True
            )

        records = json.loads(raw)
        assert len(records) == 1
        assert records[0]["error"] == "Error 500: Internal Server Error"


class TestArticleDetailsRouting:
    """Check which fetcher ``_article_details`` picks for each identifier type."""

    @pytest.mark.asyncio
    async def test_doi_routes_to_europe_pmc(self):
        """A DOI is handed to ``fetch_europe_pmc_article``."""
        doi = "10.1101/2024.01.20.23288905"

        with patch(
            "biomcp.articles.preprints.fetch_europe_pmc_article"
        ) as europe_pmc_mock:
            europe_pmc_mock.return_value = "Europe PMC result"
            outcome = await _article_details("Test", doi)

        europe_pmc_mock.assert_called_once_with(doi, output_json=True)
        assert outcome == "Europe PMC result"

    @pytest.mark.asyncio
    async def test_pmid_routes_to_pubtator(self):
        """A PMID is converted to an int and handed to ``fetch_articles``."""
        pmid = "35271234"

        with patch("biomcp.articles.fetch.fetch_articles") as fetch_mock:
            fetch_mock.return_value = "PubTator result"
            outcome = await _article_details("Test", pmid)

        fetch_mock.assert_called_once_with(
            [35271234], full=True, output_json=True
        )
        assert outcome == "PubTator result"

    @pytest.mark.asyncio
    async def test_invalid_identifier_returns_error(self):
        """An identifier that is neither DOI nor PMID yields a JSON error record."""
        bad_identifier = "PMC12345"

        raw = await _article_details("Test", bad_identifier)
        records = json.loads(raw)

        assert len(records) == 1
        message = records[0]["error"]
        assert "Invalid identifier format" in message
        assert "PMC12345" in message

```

--------------------------------------------------------------------------------
/src/biomcp/workers/worker_entry.js:
--------------------------------------------------------------------------------

```javascript
/**
 * BioMCP Worker – Auth‑less version (rev 1.8)
 *
 *  Fix: Added improved error handling and increased timeouts for list requests
 */

// Upstream MCP server base URL. The fetch handler overwrites this with
// env.REMOTE_MCP_SERVER_URL when that variable is set; the literal below is
// only the fallback.
let REMOTE_MCP_SERVER_URL = "http://localhost:8000"; // Default fallback
// Master switch for log(); hard-coded on, so every request is logged.
const DEBUG = true;

// Debug logger: prints with a "[DEBUG]" prefix when DEBUG is truthy.
const log = (m) => DEBUG && console.log("[DEBUG]", m);
// CORS headers attached to the preflight reply and to every JSON response
// built in this file.
const CORS = {
  "Access-Control-Allow-Origin": "*",
  "Access-Control-Allow-Methods": "GET, POST, OPTIONS",
  "Access-Control-Allow-Headers": "*",
  "Access-Control-Max-Age": "86400",
};
// Build a pretty-printed JSON Response with the CORS headers.
//   o - value to serialise
//   s - HTTP status code (defaults to 200)
const json = (o, s = 200) =>
  new Response(JSON.stringify(o, null, 2), {
    status: s,
    headers: { "Content-Type": "application/json", ...CORS },
  });

// Module-level routing state, written by serveSSE once the upstream announces
// its message endpoint and read by the fetch handler for POST proxying.
// NOTE(review): being module-level, these are presumably shared by all
// requests served by the same worker instance - confirm that is intended.
let forwardPath = "/messages"; // path used when proxying JSON-RPC POSTs (no query string)
let resourceEndpoint = null; // full endpoint string echoed back (/messages/?session_id=...)

// Intended to track active SSE connections to avoid duplicates.
// NOTE(review): not referenced in the visible part of this file - confirm it is still needed.
const activeConnections = new Map();

export default {
  /**
   * Worker entry point. Requests are dispatched in this order:
   *   1. OPTIONS (any path)   -> 204 with the CORS headers
   *   2. /status or /debug    -> JSON snapshot of the worker's routing state
   *   3. /sse or /events      -> SSE bridge to the upstream server
   *   4. POST (any other path)-> proxied upstream; requires ?session_id=
   *   5. anything else        -> 404
   *
   * @param {Request} req Incoming request.
   * @param {object}  env Worker bindings; REMOTE_MCP_SERVER_URL overrides the upstream URL.
   * @param {object}  ctx Execution context, passed through to serveSSE.
   * @returns {Response|Promise<Response>}
   */
  async fetch(req, env, ctx) {
    // Keep the current upstream URL unless the environment supplies one.
    if (env.REMOTE_MCP_SERVER_URL) {
      REMOTE_MCP_SERVER_URL = env.REMOTE_MCP_SERVER_URL;
    }

    const { method } = req;
    const { pathname, search, searchParams } = new URL(req.url);
    log(`${method} ${pathname}${search}`);

    // CORS preflight is answered before any path matching.
    if (method === "OPTIONS") {
      return new Response(null, { status: 204, headers: CORS });
    }

    switch (pathname) {
      case "/status":
      case "/debug":
        return json({
          worker: "BioMCP-authless",
          remote: REMOTE_MCP_SERVER_URL,
          forwardPath,
          resourceEndpoint,
        });
      case "/sse":
      case "/events":
        return serveSSE(req, ctx);
    }

    if (method !== "POST") {
      return new Response("Not found", { status: 404 });
    }

    const sessionId = searchParams.get("session_id");
    if (!sessionId) {
      return new Response("Missing session_id", { status: 400 });
    }
    return proxyPost(req, forwardPath, sessionId);
  },
};

/**
 * Forward a JSON-RPC POST to the upstream MCP server and relay its reply.
 *
 * The upstream fetch is bounded by a timer: 30 s for `tools/list` and
 * `resources/list`, 10 s for every other request. When a list request is
 * aborted by that timer, an empty list result is returned with status 200 so
 * the client is not left with an error; every other failure produces a 502
 * carrying the error message.
 *
 * @param {Request} req  Incoming client request; its body is forwarded verbatim.
 * @param {string}  path Upstream path to POST to.
 * @param {string}  sid  Session id, sent upstream as the `session_id` query parameter.
 * @returns {Promise<Response>} Upstream status and body, labelled as JSON and
 *   carrying the CORS headers.
 */
async function proxyPost(req, path, sid) {
  const body = await req.text();
  const target = `${REMOTE_MCP_SERVER_URL}${path}?session_id=${encodeURIComponent(
    sid,
  )}`;

  // Parse the body once so the timeout choice and the timeout fallback share
  // the result. Anything that is not a JSON object (invalid JSON, `null`, a
  // bare number or string) is treated as "no method" and forwarded untouched
  // rather than raising while reading `.method`.
  let rpc = {};
  try {
    const parsed = JSON.parse(body);
    if (parsed !== null && typeof parsed === "object") rpc = parsed;
  } catch (_) {
    // Not valid JSON: proceed with the default timeout.
  }

  // List requests tend to be slow upstream, so they get a longer budget.
  const isListRequest =
    rpc.method === "tools/list" || rpc.method === "resources/list";
  const timeout = isListRequest ? 30000 : 10000;

  // AbortController implements the timeout for the upstream fetch.
  const controller = new AbortController();
  const timeoutId = setTimeout(() => controller.abort(), timeout);

  try {
    log(`Proxying ${rpc.method || "request"} with timeout ${timeout}ms`);

    let resp;
    try {
      resp = await fetch(target, {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body,
        signal: controller.signal,
      });
    } finally {
      // Always release the timer, including when fetch rejects.
      clearTimeout(timeoutId);
    }

    // Nothing is cached here; list responses are only logged.
    if (isListRequest) {
      log(`Received response for ${rpc.method}`);
    }

    return new Response(await resp.text(), {
      status: resp.status,
      headers: { "Content-Type": "application/json", ...CORS },
    });
  } catch (error) {
    log(`POST error: ${error.message}`);

    // A timed-out list request is answered with an empty list, not an error.
    if (error.name === "AbortError" && isListRequest) {
      const listKey = rpc.method === "tools/list" ? "tools" : "resources";
      log(`Returning empty ${listKey} list due to timeout`);
      return new Response(
        JSON.stringify({
          jsonrpc: "2.0",
          id: rpc.id,
          result: { [listKey]: [] },
        }),
        {
          status: 200,
          headers: { "Content-Type": "application/json", ...CORS },
        },
      );
    }

    return new Response(JSON.stringify({ error: error.message }), {
      status: 502,
      headers: { "Content-Type": "application/json", ...CORS },
    });
  }
}

/**
 * Build a Server-Sent Events response that relays the upstream MCP server's
 * `/sse` stream to the client.
 *
 * The returned stream:
 *  - immediately emits a `ready` event;
 *  - forwards every upstream chunk unchanged;
 *  - the first time an upstream chunk contains a `/messages/?session_id=...`
 *    data line, records it in the outer-scope `resourceEndpoint` /
 *    `forwardPath` variables and emits an extra `resource` event;
 *  - reports non-abort upstream failures to the client as an `error` event;
 *  - emits `:keepalive` comments every 5 seconds once the upstream read loop
 *    has finished or failed.
 *
 * @param {Request} clientReq Incoming request; its abort signal tears the relay down.
 * @param {*} ctx Not referenced inside this function.
 * @returns {Response} Streaming `text/event-stream` response with the CORS headers applied.
 */
function serveSSE(clientReq, ctx) {
  const enc = new TextEncoder();
  // Assigned at the end of start(); declared here so the abort handler can clear it.
  let keepalive;
  // Lets a client disconnect cancel the upstream fetch.
  const upstreamCtl = new AbortController();

  const stream = new ReadableStream({
    async start(ctrl) {
      // Tell the client the stream is open before contacting upstream.
      ctrl.enqueue(enc.encode("event: ready\ndata: {}\n\n"));

      // Client went away: stop the keepalive timer, cancel upstream, close our stream.
      clientReq.signal.addEventListener("abort", () => {
        clearInterval(keepalive);
        upstreamCtl.abort();
        ctrl.close();
      });

      try {
        const u = await fetch(`${REMOTE_MCP_SERVER_URL}/sse`, {
          headers: { Accept: "text/event-stream" },
          signal: upstreamCtl.signal,
        });

        // A non-2xx status or a missing body is handled by the catch below.
        if (!u.ok || !u.body) throw new Error(`Upstream SSE ${u.status}`);
        const r = u.body.getReader();

        while (true) {
          const { value, done } = await r.read();
          if (done) break;
          if (value) {
            // NOTE(review): each chunk is decoded on its own, so an endpoint
            // line split across two chunks would not match the regex below.
            const text = new TextDecoder().decode(value);
            // capture first endpoint once
            if (!resourceEndpoint) {
              const m = text.match(
                /data:\s*(\/messages\/\?session_id=[A-Za-z0-9_-]+)/,
              );
              if (m) {
                resourceEndpoint = m[1];
                // Path portion only (everything before the query string).
                forwardPath = resourceEndpoint.split("?")[0];
                log(`Captured endpoint ${resourceEndpoint}`);
                ctrl.enqueue(
                  enc.encode(`event: resource\ndata: ${resourceEndpoint}\n\n`),
                );
              }
            }
            // Always pass the raw upstream bytes through untouched.
            ctrl.enqueue(value);
          }
        }
      } catch (e) {
        // AbortError is the expected result of the client disconnecting; stay quiet.
        if (e.name !== "AbortError") {
          log(`SSE error: ${e.message}`);
          ctrl.enqueue(enc.encode(`event: error\ndata: ${e.message}\n\n`));
        }
      }

      // Reduce keepalive interval to 5 seconds to prevent timeouts
      // NOTE(review): this timer is only created after the read loop above has
      // ended (or thrown), so no keepalives are sent while upstream is streaming.
      keepalive = setInterval(() => {
        try {
          ctrl.enqueue(enc.encode(":keepalive\n\n"));
        } catch (_) {
          // enqueue throws once the stream can no longer accept data; stop the timer.
          clearInterval(keepalive);
        }
      }, 5000);
    },
  });

  return new Response(stream, {
    headers: {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
      Connection: "keep-alive",
      ...CORS,
    },
  });
}

```

--------------------------------------------------------------------------------
/tests/tdd/test_drug_approvals.py:
--------------------------------------------------------------------------------

```python
"""Tests for FDA drug approvals module."""

import json
from pathlib import Path
from unittest.mock import AsyncMock, patch

import pytest

from biomcp.openfda.drug_approvals import (
    get_drug_approval,
    search_drug_approvals,
)

# Load the canned openFDA payloads once at import time; the tests reuse them.
# The fixtures are JSON, so decode them as UTF-8 explicitly instead of relying
# on the platform's default text encoding.
MOCK_DIR = Path(__file__).parent.parent / "data" / "openfda"
MOCK_APPROVALS_SEARCH = json.loads(
    (MOCK_DIR / "drugsfda_search.json").read_text(encoding="utf-8")
)
MOCK_APPROVAL_DETAIL = json.loads(
    (MOCK_DIR / "drugsfda_detail.json").read_text(encoding="utf-8")
)


class TestDrugApprovals:
    """Search and detail lookups for drug approvals, run against canned payloads."""

    @staticmethod
    def _stub_openfda(payload, error=None):
        """Return a patcher replacing the openFDA request helper with an AsyncMock.

        When awaited, the mock resolves to the ``(payload, error)`` pair.
        """
        return patch(
            "biomcp.openfda.drug_approvals.make_openfda_request",
            new_callable=AsyncMock,
            return_value=(payload, error),
        )

    @pytest.mark.asyncio
    async def test_search_drug_approvals_success(self):
        """A search by drug name reports the approval record from the fixture."""
        with self._stub_openfda(MOCK_APPROVALS_SEARCH) as fake_request:
            report = await search_drug_approvals(
                drug="pembrolizumab",
                limit=10,
            )

        assert "FDA Drug Approval Records" in report
        assert "pembrolizumab" in report.lower()
        assert "Application" in report
        assert "BLA125514" in report
        fake_request.assert_called_once()

    @pytest.mark.asyncio
    async def test_search_drug_approvals_with_filters(self):
        """Several filters can be combined and the API key is forwarded."""
        with self._stub_openfda(MOCK_APPROVALS_SEARCH) as fake_request:
            report = await search_drug_approvals(
                drug="keytruda",
                application_number="BLA125514",
                approval_year="2014",
                limit=5,
                api_key="test-key",
            )

        assert "FDA Drug Approval Records" in report
        # The API key travels as the 4th positional argument of the request.
        positional = fake_request.call_args.args
        assert positional[3] == "test-key"

    @pytest.mark.asyncio
    async def test_search_drug_approvals_no_results(self):
        """An empty result list yields the "no records" message."""
        with self._stub_openfda({"results": []}):
            report = await search_drug_approvals(drug="nonexistent-drug")

        assert "No drug approval records found" in report

    @pytest.mark.asyncio
    async def test_search_drug_approvals_api_error(self):
        """An error from the request helper is surfaced in the output."""
        with self._stub_openfda(None, "API rate limit exceeded"):
            report = await search_drug_approvals(drug="test")

        assert "Error searching drug approvals" in report
        assert "API rate limit exceeded" in report

    @pytest.mark.asyncio
    async def test_get_drug_approval_success(self):
        """Fetching one application number returns its detail sections."""
        with self._stub_openfda(MOCK_APPROVAL_DETAIL):
            report = await get_drug_approval("BLA125514")

        # Should have detailed approval info
        assert "BLA125514" in report or "Drug Approval Details" in report
        assert "BLA125514" in report
        assert "Products" in report
        assert "Submission" in report

    @pytest.mark.asyncio
    async def test_get_drug_approval_not_found(self):
        """An unknown application number produces a "not found" message."""
        with self._stub_openfda({"results": []}):
            report = await get_drug_approval("INVALID123")

        assert "No approval record found" in report
        assert "INVALID123" in report

    @pytest.mark.asyncio
    async def test_get_drug_approval_with_api_key(self):
        """The detail lookup forwards the API key to the request helper."""
        with self._stub_openfda(MOCK_APPROVAL_DETAIL) as fake_request:
            report = await get_drug_approval(
                "BLA125514",
                api_key="test-api-key",
            )

        # Should have detailed approval info
        assert "BLA125514" in report or "Drug Approval Details" in report
        # The API key travels as the 4th positional argument of the request.
        positional = fake_request.call_args.args
        assert positional[3] == "test-api-key"

    @pytest.mark.asyncio
    async def test_search_drug_approvals_pagination(self):
        """The total is reported and ``skip`` is sent as a string parameter."""
        paged_payload = {
            "meta": {"results": {"total": 100}},
            "results": MOCK_APPROVALS_SEARCH["results"],
        }
        with self._stub_openfda(paged_payload) as fake_request:
            report = await search_drug_approvals(
                drug="cancer",
                limit=10,
                skip=20,
            )

        # The output format is different - just check for the total
        assert "100" in report
        # params is the 2nd positional argument; its values are strings.
        sent_params = fake_request.call_args.args[1]
        assert sent_params["skip"] == "20"

    @pytest.mark.asyncio
    async def test_approval_year_validation(self):
        """An approval year becomes a full-year date range in the search query."""
        with self._stub_openfda(MOCK_APPROVALS_SEARCH) as fake_request:
            await search_drug_approvals(
                approval_year="2023",
            )

        # params is the 2nd positional argument of the request.
        sent_params = fake_request.call_args.args[1]
        assert "marketing_status_date" in sent_params["search"]
        assert "[2023-01-01 TO 2023-12-31]" in sent_params["search"]

```

--------------------------------------------------------------------------------
/src/biomcp/articles/fetch.py:
--------------------------------------------------------------------------------

```python
import json
import re
from ssl import TLSVersion
from typing import Annotated, Any

from pydantic import BaseModel, Field, computed_field

from .. import http_client, render
from ..constants import PUBTATOR3_FULLTEXT_URL
from ..http_client import RequestError


# Metadata attached to a passage. Both fields are optional; `passage_type`
# is populated from the input key "type".
class PassageInfo(BaseModel):
    section_type: str | None = Field(
        default=None,
        description="Type of the section.",
    )
    passage_type: str | None = Field(
        default=None,
        description="Type of the passage.",
        alias="type",
    )


# One passage of an article: optional metadata (input key "infons") plus
# optional text, with helpers that classify the passage by its section label.
class Passage(BaseModel):
    info: PassageInfo | None = Field(
        default=None,
        alias="infons",
    )
    text: str | None = None

    @property
    def section_type(self) -> str:
        """Upper-cased label: the section type, else the passage type, else "UNKNOWN"."""
        meta = self.info
        label = None if meta is None else (meta.section_type or meta.passage_type)
        return (label or "UNKNOWN").upper()

    @property
    def is_title(self) -> bool:
        """True when the section label is "TITLE"."""
        return "TITLE" == self.section_type

    @property
    def is_abstract(self) -> bool:
        """True when the section label is "ABSTRACT"."""
        return "ABSTRACT" == self.section_type

    @property
    def is_text(self) -> bool:
        """True when the section label is one of the body-text sections."""
        body_sections = {
            "INTRO",
            "RESULTS",
            "METHODS",
            "DISCUSS",
            "CONCL",
            "FIG",
            "TABLE",
        }
        return self.section_type in body_sections


# An article with optional bibliographic fields and a required list of
# passages. The passages are excluded from dumps; the computed fields below
# derive title, abstract, full text and URLs from them and from the IDs.
class Article(BaseModel):
    pmid: int | None = Field(
        default=None,
        description="PubMed ID of the reference article.",
    )
    pmcid: str | None = Field(
        default=None,
        description="PubMed Central ID of the reference article.",
    )
    date: str | None = Field(
        default=None,
        description="Date of the reference article's publication.",
    )
    journal: str | None = Field(
        default=None,
        description="Journal name.",
    )
    authors: list[str] | None = Field(
        default=None,
        description="List of authors.",
    )
    passages: list[Passage] = Field(
        ...,
        alias="passages",
        description="List of passages in the reference article.",
        exclude=True,
    )

    @computed_field
    def title(self) -> str:
        # Non-empty title passages joined by " ... "; falls back to a
        # placeholder built from the PMID when there are none.
        headline = " ... ".join(
            p.text for p in self.passages if p.is_title and p.text
        )
        return headline or f"Article: {self.pmid}"

    @computed_field
    def abstract(self) -> str:
        # Non-empty abstract passages separated by blank lines; same
        # PMID-based placeholder as the title when there are none.
        summary = "\n\n".join(
            p.text for p in self.passages if p.is_abstract and p.text
        )
        return summary or f"Article: {self.pmid}"

    @computed_field
    def full_text(self) -> str:
        # Non-empty body-text passages separated by blank lines; empty
        # string when there are none.
        body = "\n\n".join(
            p.text for p in self.passages if p.is_text and p.text
        )
        return body or ""

    @computed_field
    def pubmed_url(self) -> str | None:
        # Only available when a (truthy) PMID is present.
        if not self.pmid:
            return None
        return f"https://pubmed.ncbi.nlm.nih.gov/{self.pmid}/"

    @computed_field
    def pmc_url(self) -> str | None:
        """Generates the PMC URL if PMCID exists."""
        if not self.pmcid:
            return None
        return f"https://www.ncbi.nlm.nih.gov/pmc/articles/{self.pmcid}/"


# Response envelope: the list of articles is read from the "PubTator3" key.
class FetchArticlesResponse(BaseModel):
    articles: list[Article] = Field(
        ...,
        description="List of full texts Articles retrieved from PubTator3.",
        alias="PubTator3",
    )

    def get_abstract(self, pmid: int | None) -> str | None:
        """Return the abstract of the first article whose PMID equals ``pmid``.

        Returns None when ``pmid`` is falsy or no article matches.
        """
        if not pmid:
            return None
        matches = (str(a.abstract) for a in self.articles if a.pmid == pmid)
        return next(matches, None)


async def call_pubtator_api(
    pmids: list[int],
    full: bool,
) -> tuple[FetchArticlesResponse | None, RequestError | None]:
    """Request the text of the given PubMed IDs from the PubTator3 full-text URL.

    :param pmids: PubMed IDs; sent as a single comma-separated string.
    :param full: Sent in the ``full`` field as its lower-cased string form.
    :return: The ``(response, error)`` pair from ``http_client.request_api``.
    """
    pmid_csv = ",".join(map(str, pmids))
    full_flag = str(full).lower()

    parsed, failure = await http_client.request_api(
        url=PUBTATOR3_FULLTEXT_URL,
        request={"pmids": pmid_csv, "full": full_flag},
        response_model_type=FetchArticlesResponse,
        tls_version=TLSVersion.TLSv1_2,
        domain="pubmed",
    )
    return parsed, failure


async def fetch_articles(
    pmids: list[int],
    full: bool,
    output_json: bool = False,
) -> str:
    """Fetch articles for the given PubMed IDs and render them as text.

    :param pmids: PubMed IDs to look up.
    :param full: When false, the ``full_text`` field is left out of each record.
    :param output_json: When true, return indented JSON instead of Markdown.
    :return: Markdown when there is data and JSON was not requested,
        otherwise a JSON string. An API error becomes a single error record.
    """
    response, error = await call_pubtator_api(pmids, full)

    records: list[dict[str, Any]]
    if error:
        records = [{"error": f"Error {error.code}: {error.message}"}]
    else:
        # PubTator API returns full text even when full=False, so drop it here.
        hidden_fields = set() if full else {"full_text"}
        articles = response.articles if response else []
        records = [
            article.model_dump(
                mode="json",
                exclude_none=True,
                exclude=hidden_fields,
            )
            for article in articles
        ]

    # An empty result is always reported as JSON ("[]").
    if output_json or not records:
        return json.dumps(records, indent=2)
    return render.to_markdown(records)


def is_doi(identifier: str) -> bool:
    """Check whether the identifier looks like a DOI.

    The whole string must be ``10.`` + 4-9 digits + ``/`` + one or more
    characters from ``- . _ ; ( ) / :`` or word characters. The input is
    converted with ``str()`` first, so non-string values are accepted.

    :param identifier: Candidate identifier.
    :return: True when the entire identifier matches the DOI shape.
    """
    # fullmatch (rather than match with "$") so that a trailing newline is
    # not silently accepted as part of a valid DOI.
    doi_pattern = r"10\.\d{4,9}/[\-._;()/:\w]+"
    return re.fullmatch(doi_pattern, str(identifier)) is not None


def is_pmid(identifier: str) -> bool:
    """Check whether the identifier is a PubMed ID (a non-empty run of ASCII digits).

    The input is converted with ``str()`` first, so integers are accepted.

    :param identifier: Candidate identifier.
    :return: True when the identifier consists solely of the digits 0-9.
    """
    text = str(identifier)
    # str.isdigit() alone also accepts characters such as "²", which int()
    # cannot parse; requiring ASCII keeps a later int() conversion safe.
    return text.isascii() and text.isdigit()


async def _article_details(
    call_benefit: Annotated[
        str,
        "Define and summarize why this function is being called and the intended benefit",
    ],
    pmid,
) -> str:
    """
    Retrieves details for a single article given its identifier.

    Parameters:
    - call_benefit: Define and summarize why this function is being called and the intended benefit
    - pmid: An article identifier - either a PubMed ID (e.g., 34397683) or DOI (e.g., 10.1101/2024.01.20.23288905)

    Process:
    - For PMIDs: Calls the PubTator3 API to fetch the article's title, abstract, and full text (if available)
    - For DOIs: Calls Europe PMC API to fetch preprint details

    Output: A JSON formatted string containing the retrieved article content.
    """
    identifier = str(pmid)

    # DOI -> Europe PMC preprint lookup (helper imported only on this path).
    if is_doi(identifier):
        from .preprints import fetch_europe_pmc_article

        return await fetch_europe_pmc_article(identifier, output_json=True)

    # Numeric identifier -> PubMed article, always with full text, as JSON.
    if is_pmid(identifier):
        return await fetch_articles(
            [int(identifier)], full=True, output_json=True
        )

    # Anything else is reported as a one-element JSON error list.
    problem = {
        "error": f"Invalid identifier format: {identifier}. Expected either a PMID (numeric) or DOI (10.xxxx/xxxx format)."
    }
    return json.dumps([problem], indent=2)

```

--------------------------------------------------------------------------------
/docs/concepts/02-the-deep-researcher-persona.md:
--------------------------------------------------------------------------------

```markdown
# The Deep Researcher Persona

## Overview

The Deep Researcher Persona is a core philosophy of BioMCP that transforms AI assistants into systematic biomedical research partners. This persona embodies the methodical approach of a dedicated biomedical researcher, enabling AI agents to conduct thorough literature reviews, analyze complex datasets, and synthesize findings into actionable insights.

## Why the Deep Researcher Persona?

Traditional AI interactions often result in surface-level responses. The Deep Researcher Persona addresses this by:

- **Enforcing Systematic Thinking**: Requiring the use of the `think` tool before any research operation
- **Preventing Premature Conclusions**: Breaking complex queries into manageable research steps
- **Ensuring Comprehensive Analysis**: Following a proven 10-step methodology
- **Maintaining Research Rigor**: Documenting thought processes and decision rationale

## Core Traits and Personality

The Deep Researcher embodies these characteristics:

- **Curious and Methodical**: Always seeking deeper understanding through systematic investigation
- **Evidence-Based**: Grounding all conclusions in concrete data from multiple sources
- **Professional Voice**: Clear, concise scientific communication
- **Collaborative**: Working as a research partner, not just an information retriever
- **Objective**: Presenting balanced findings including contradictory evidence

## The 10-Step Sequential Thinking Process

This methodology ensures comprehensive research coverage:

### 1. Problem Definition and Scope

- Parse the research question to identify key concepts
- Define clear objectives and expected deliverables
- Establish research boundaries and constraints

### 2. Initial Knowledge Assessment

- Evaluate existing knowledge on the topic
- Identify knowledge gaps requiring investigation
- Form initial hypotheses to guide research

### 3. Search Strategy Development

- Design comprehensive search queries
- Select appropriate databases and tools
- Plan iterative search refinements

### 4. Data Collection and Retrieval

- Execute searches across multiple sources (PubTator3, ClinicalTrials.gov, variant databases)
- Collect relevant articles, trials, and annotations
- Document search parameters and results

### 5. Quality Assessment and Filtering

- Evaluate source credibility and relevance
- Apply inclusion/exclusion criteria
- Prioritize high-impact findings

### 6. Information Extraction

- Extract key findings, methodologies, and conclusions
- Identify patterns and relationships
- Note contradictions and uncertainties

### 7. Synthesis and Integration

- Combine findings from multiple sources
- Resolve contradictions when possible
- Build a coherent narrative from the evidence

### 8. Critical Analysis

- Evaluate strength of evidence
- Identify limitations and biases
- Consider alternative interpretations

### 9. Knowledge Synthesis

- Create structured summary of findings
- Highlight key insights and implications
- Prepare actionable recommendations

### 10. Communication and Reporting

- Format findings for target audience
- Include proper citations and references
- Provide clear next steps

## Mandatory Think Tool Usage

**CRITICAL**: The `think` tool must ALWAYS be used first, before any other BioMCP operation. This is not optional.

```python
# Correct pattern - ALWAYS start with think
think(thought="Breaking down the research question...", thoughtNumber=1)
# Then proceed with searches
article_searcher(genes=["BRAF"], diseases=["melanoma"])

# INCORRECT - Never skip the think step
article_searcher(genes=["BRAF"])  # ❌ Will produce suboptimal results
```

## Implementation in Practice

### Example Research Flow

1. **User Query**: "What are the treatment options for BRAF V600E melanoma?"

2. **Think Step 1**: Problem decomposition

   ```
   think(thought="Breaking down query: Need to find 1) BRAF V600E mutation significance, 2) current treatments, 3) clinical trials", thoughtNumber=1)
   ```

3. **Think Step 2**: Search strategy

   ```
   think(thought="Will search articles for BRAF inhibitors, then trials for V600E-specific treatments", thoughtNumber=2)
   ```

4. **Execute Searches**: Following the planned strategy
5. **Synthesize**: Combine findings into comprehensive brief

### Research Brief Format

Every research session concludes with a structured brief:

```markdown
## Research Brief: [Topic]

### Executive Summary

- 3-5 bullet points of key findings
- Clear, actionable insights

### Detailed Findings

1. **Literature Review** (X papers analyzed)

   - Key discoveries
   - Consensus findings
   - Contradictions noted

2. **Clinical Evidence** (Y trials reviewed)

   - Current treatment landscape
   - Emerging therapies
   - Trial enrollment opportunities

3. **Molecular Insights**
   - Variant annotations
   - Pathway implications
   - Biomarker relevance

### Recommendations

- Evidence-based suggestions
- Areas for further investigation
- Clinical considerations

### References

- Full citations for all sources
- Direct links to primary data
```

## Tool Inventory and Usage

The Deep Researcher has access to 24 specialized tools:

### Core Research Tools

- **think**: Sequential reasoning and planning
- **article_searcher**: PubMed/PubTator3 literature search
- **trial_searcher**: Clinical trials discovery
- **variant_searcher**: Genetic variant annotations

### Specialized Analysis Tools

- **gene_getter**: Gene function and pathway data
- **drug_getter**: Medication information
- **disease_getter**: Disease ontology and synonyms
- **alphagenome_predictor**: Variant effect prediction

### Integration Features

- **Automatic cBioPortal Integration**: Cancer genomics context for all gene searches
- **BioThings Suite Access**: Real-time biomedical annotations
- **NCI Database Integration**: Comprehensive cancer trial data

## Best Practices

1. **Always Think First**: Never skip the sequential thinking process
2. **Use Multiple Sources**: Cross-reference findings across databases
3. **Document Reasoning**: Explain why certain searches or filters were chosen
4. **Consider Context**: Account for disease stage, prior treatments, and patient factors
5. **Stay Current**: Leverage preprint integration for the latest findings

## Community Impact

The Deep Researcher Persona has transformed how researchers interact with biomedical data:

- **Reduced Research Time**: From days to minutes for comprehensive reviews
- **Improved Accuracy**: Systematic approach reduces missed connections
- **Enhanced Collaboration**: Consistent methodology enables team research
- **Democratized Access**: Complex research capabilities available to all

## Getting Started

To use the Deep Researcher Persona:

1. Ensure BioMCP is installed and configured
2. Load the persona resource when starting your AI session
3. Always begin research queries with the think tool
4. Follow the 10-step methodology for comprehensive results

Remember: The Deep Researcher Persona is not just a tool configuration—it's a systematic approach to biomedical research that ensures thorough, evidence-based insights every time.

```

--------------------------------------------------------------------------------
/src/biomcp/render.py:
--------------------------------------------------------------------------------

```python
import json
import re
import textwrap
from typing import Any

# Column limit used when deciding whether a value fits on one line and when
# wrapping long text.
MAX_WIDTH = 72

# Matches any run of whitespace (despite the name, not only newlines); used
# to collapse such runs to a single space before wrapping.
REMOVE_MULTI_LINES = re.compile(r"\s+")


def dedupe_list_keep_order(lst: list[Any]) -> list[Any]:
    """
    Return the items of ``lst`` with later duplicates dropped, order kept.

    Items are compared by their ``str()`` form so that unhashable values
    such as dicts can be handled; as a consequence, values with the same
    string form (e.g. ``1`` and ``"1"``) count as duplicates.
    """
    fingerprints: set[str] = set()
    unique: list[Any] = []
    for element in lst:
        marker = str(element)
        if marker in fingerprints:
            continue
        fingerprints.add(marker)
        unique.append(element)
    return unique


def to_markdown(data: str | list | dict) -> str:
    """Render JSON-like data as simple Markdown.

    A string is parsed as JSON first. A top-level list is rewritten so each
    item sits under its own "Record N" key (numbered from 1) before rendering.

    :param data: A JSON string, or an already-parsed list/dict.
    :return: The Markdown text, stripped and terminated by a single newline.
    """
    parsed = json.loads(data) if isinstance(data, str) else data

    if isinstance(parsed, list):
        parsed = [
            {f"Record {number}": entry}
            for number, entry in enumerate(parsed, start=1)
        ]

    rendered: list[str] = []
    process_any(parsed, [], rendered)
    body = "\n".join(rendered)
    return body.strip() + "\n"


def wrap_preserve_newlines(text: str, width: int) -> list[str]:
    """Wrap ``text`` line by line to at most ``width`` columns.

    Each input line is wrapped independently. Lines that are empty or
    whitespace-only come back as a single empty string, so paragraph breaks
    survive; runs of whitespace inside a line are collapsed to one space
    before wrapping.

    :param text: The multiline string to wrap.
    :param width: Maximum line width for wrapping.
    :return: The wrapped lines, in order.
    """
    output: list[str] = []
    for raw_line in text.splitlines():
        if raw_line.strip():
            # remove excessive spaces (pmid=38296628)
            collapsed = REMOVE_MULTI_LINES.sub(" ", raw_line)
            output.extend(textwrap.wrap(collapsed, width=width))
        else:
            output.append("")
    return output


def append_line(lines: list[str], line: str) -> None:
    """Append ``line`` to ``lines`` with its trailing whitespace removed.

    Blank lines are appended like any other line; nothing is collapsed.

    :param lines: The running list of lines to which we add.
    :param line: The line to append.
    """
    lines.append(line.rstrip())


def process_any(
    value: Any,
    path_keys: list[str],
    lines: list[str],
) -> None:
    """Route a JSON node to the renderer matching its type.

    Dicts and lists go to their dedicated handlers; any other non-None value
    is rendered as a "key: value" line under the last key of the path.
    ``None`` produces no output.

    :param value: The current JSON data node.
    :param path_keys: The list of keys leading to this node (for headings).
    :param lines: The running list of output Markdown lines.
    """
    if value is None:
        return
    if isinstance(value, dict):
        process_dict(value, path_keys, lines)
        return
    if isinstance(value, list):
        process_list(value, path_keys, lines)
        return
    render_key_value(lines, path_keys[-1], value)


def process_dict(dct: dict, path_keys: list[str], lines: list[str]) -> None:
    """Render a dictionary: optional heading, then its entries grouped by type.

    When the path is non-empty, a blank line and a heading for the last path
    key are emitted; the heading level is the path depth, capped at 5.
    Entries are then rendered in three passes - scalars (including None),
    nested dicts, lists - each pass keeping the dict's own key order.
    Values of any other type are skipped.

    :param dct: The dictionary to process.
    :param path_keys: The list of keys leading to this dict (for heading).
    :param lines: The running list of output Markdown lines.
    """
    if path_keys:
        depth = min(len(path_keys), 5)
        title = transform_key(path_keys[-1])
        append_line(lines, "")
        append_line(lines, f"{'#' * depth} {title}")

    scalar_types = (str, int, float, bool)
    scalars_first = [
        key
        for key, val in dct.items()
        if val is None or isinstance(val, scalar_types)
    ]
    dicts_second = [key for key, val in dct.items() if isinstance(val, dict)]
    lists_last = [key for key, val in dct.items() if isinstance(val, list)]

    for key in (*scalars_first, *dicts_second, *lists_last):
        process_any(dct[key], path_keys + [key], lines)


def process_list(lst: list, path_keys: list[str], lines: list[str]) -> None:
    """Render a list after removing duplicate items.

    If every item of the original list is a str/int/float/bool and the path
    is non-empty, the de-duplicated items are rendered together under the
    last path key. Otherwise each de-duplicated item is processed on its own
    with the same path.

    :param lst: The list of items to process.
    :param path_keys: The keys leading to this list.
    :param lines: The running list of Markdown lines.
    """
    only_scalars = all(
        isinstance(item, (str, int, float, bool)) for item in lst
    )
    unique_items = dedupe_list_keep_order(lst)

    if only_scalars and path_keys:
        process_scalar_list(path_keys[-1], lines, unique_items)
        return

    for item in unique_items:
        process_any(item, path_keys, lines)


def process_scalar_list(key: str, lines: list[str], lst: list) -> None:
    """Render a list of scalars under a label.

    The compact form "Label: a, b, c" is used when it is at most MAX_WIDTH
    characters long; otherwise the label goes on its own line followed by
    one "- item" bullet per element.

    :param key: The key name for this list of scalars.
    :param lines: The running list of Markdown lines.
    :param lst: The actual list of scalar items.
    """
    label = transform_key(key)
    joined = ", ".join(map(str, lst))
    compact = f"{label}: {joined}"

    if len(compact) <= MAX_WIDTH:
        append_line(lines, compact)
        return

    # Too wide for one line: fall back to a bullet list.
    append_line(lines, f"{label}:")
    for item in lst:
        append_line(lines, f"- {item}")


def render_key_value(lines: list[str], key: str, value: Any) -> None:
    """Render one "Label: value" entry.

    A string longer than MAX_WIDTH is placed below its label, wrapped to
    MAX_WIDTH and indented by two spaces; every other value shares the line
    with the label.

    :param lines: The running list of Markdown lines.
    :param key: The raw key name (untransformed).
    :param value: The value associated with this key.
    """
    label = transform_key(key)
    text = str(value)

    needs_wrapping = isinstance(value, str) and len(value) > MAX_WIDTH
    if not needs_wrapping:
        append_line(lines, f"{label}: {text}")
        return

    append_line(lines, f"{label}:")
    for piece in wrap_preserve_newlines(text, MAX_WIDTH):
        append_line(lines, "  " + piece)


def transform_key(s: str) -> str:
    """Turn a raw key into a human-readable label.

    Underscores become spaces, camelCase / acronym boundaries are split into
    separate words, and every word is capitalized (first letter upper-case,
    the rest lower-case).

    :param s: The raw key, e.g. ``"pubmed_url"`` or ``"camelCaseKey"``.
    :return: The label, e.g. ``"Pubmed Url"`` or ``"Camel Case Key"``.
    """
    spaced = s.replace("_", " ")
    # Split an acronym from a following capitalized word: "HTTPResponse" -> "HTTP Response".
    spaced = re.sub(r"(?<=[A-Z])(?=[A-Z][a-z])", " ", spaced)
    # Split where a lowercase letter or digit meets an uppercase letter.
    spaced = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", " ", spaced)
    return " ".join(word.capitalize() for word in spaced.split())

```
Page 3/15FirstPrevNextLast