#
tokens: 49633/50000 43/303 files (page 2/15)
lines: off (toggle) GitHub
raw markdown copy
This is page 2 of 15. Use http://codebase.md/genomoncology/biomcp?page={x} to view the full context.

# Directory Structure

```
├── .github
│   ├── actions
│   │   └── setup-python-env
│   │       └── action.yml
│   ├── dependabot.yml
│   └── workflows
│       ├── ci.yml
│       ├── deploy-docs.yml
│       ├── main.yml.disabled
│       ├── on-release-main.yml
│       └── validate-codecov-config.yml
├── .gitignore
├── .pre-commit-config.yaml
├── BIOMCP_DATA_FLOW.md
├── CHANGELOG.md
├── CNAME
├── codecov.yaml
├── docker-compose.yml
├── Dockerfile
├── docs
│   ├── apis
│   │   ├── error-codes.md
│   │   ├── overview.md
│   │   └── python-sdk.md
│   ├── assets
│   │   ├── biomcp-cursor-locations.png
│   │   ├── favicon.ico
│   │   ├── icon.png
│   │   ├── logo.png
│   │   ├── mcp_architecture.txt
│   │   └── remote-connection
│   │       ├── 00_connectors.png
│   │       ├── 01_add_custom_connector.png
│   │       ├── 02_connector_enabled.png
│   │       ├── 03_connect_to_biomcp.png
│   │       ├── 04_select_google_oauth.png
│   │       └── 05_success_connect.png
│   ├── backend-services-reference
│   │   ├── 01-overview.md
│   │   ├── 02-biothings-suite.md
│   │   ├── 03-cbioportal.md
│   │   ├── 04-clinicaltrials-gov.md
│   │   ├── 05-nci-cts-api.md
│   │   ├── 06-pubtator3.md
│   │   └── 07-alphagenome.md
│   ├── blog
│   │   ├── ai-assisted-clinical-trial-search-analysis.md
│   │   ├── images
│   │   │   ├── deep-researcher-video.png
│   │   │   ├── researcher-announce.png
│   │   │   ├── researcher-drop-down.png
│   │   │   ├── researcher-prompt.png
│   │   │   ├── trial-search-assistant.png
│   │   │   └── what_is_biomcp_thumbnail.png
│   │   └── researcher-persona-resource.md
│   ├── changelog.md
│   ├── CNAME
│   ├── concepts
│   │   ├── 01-what-is-biomcp.md
│   │   ├── 02-the-deep-researcher-persona.md
│   │   └── 03-sequential-thinking-with-the-think-tool.md
│   ├── developer-guides
│   │   ├── 01-server-deployment.md
│   │   ├── 02-contributing-and-testing.md
│   │   ├── 03-third-party-endpoints.md
│   │   ├── 04-transport-protocol.md
│   │   ├── 05-error-handling.md
│   │   ├── 06-http-client-and-caching.md
│   │   ├── 07-performance-optimizations.md
│   │   └── generate_endpoints.py
│   ├── faq-condensed.md
│   ├── FDA_SECURITY.md
│   ├── genomoncology.md
│   ├── getting-started
│   │   ├── 01-quickstart-cli.md
│   │   ├── 02-claude-desktop-integration.md
│   │   └── 03-authentication-and-api-keys.md
│   ├── how-to-guides
│   │   ├── 01-find-articles-and-cbioportal-data.md
│   │   ├── 02-find-trials-with-nci-and-biothings.md
│   │   ├── 03-get-comprehensive-variant-annotations.md
│   │   ├── 04-predict-variant-effects-with-alphagenome.md
│   │   ├── 05-logging-and-monitoring-with-bigquery.md
│   │   └── 06-search-nci-organizations-and-interventions.md
│   ├── index.md
│   ├── policies.md
│   ├── reference
│   │   ├── architecture-diagrams.md
│   │   ├── quick-architecture.md
│   │   ├── quick-reference.md
│   │   └── visual-architecture.md
│   ├── robots.txt
│   ├── stylesheets
│   │   ├── announcement.css
│   │   └── extra.css
│   ├── troubleshooting.md
│   ├── tutorials
│   │   ├── biothings-prompts.md
│   │   ├── claude-code-biomcp-alphagenome.md
│   │   ├── nci-prompts.md
│   │   ├── openfda-integration.md
│   │   ├── openfda-prompts.md
│   │   ├── pydantic-ai-integration.md
│   │   └── remote-connection.md
│   ├── user-guides
│   │   ├── 01-command-line-interface.md
│   │   ├── 02-mcp-tools-reference.md
│   │   └── 03-integrating-with-ides-and-clients.md
│   └── workflows
│       └── all-workflows.md
├── example_scripts
│   ├── mcp_integration.py
│   └── python_sdk.py
├── glama.json
├── LICENSE
├── lzyank.toml
├── Makefile
├── mkdocs.yml
├── package-lock.json
├── package.json
├── pyproject.toml
├── README.md
├── scripts
│   ├── check_docs_in_mkdocs.py
│   ├── check_http_imports.py
│   └── generate_endpoints_doc.py
├── smithery.yaml
├── src
│   └── biomcp
│       ├── __init__.py
│       ├── __main__.py
│       ├── articles
│       │   ├── __init__.py
│       │   ├── autocomplete.py
│       │   ├── fetch.py
│       │   ├── preprints.py
│       │   ├── search_optimized.py
│       │   ├── search.py
│       │   └── unified.py
│       ├── biomarkers
│       │   ├── __init__.py
│       │   └── search.py
│       ├── cbioportal_helper.py
│       ├── circuit_breaker.py
│       ├── cli
│       │   ├── __init__.py
│       │   ├── articles.py
│       │   ├── biomarkers.py
│       │   ├── diseases.py
│       │   ├── health.py
│       │   ├── interventions.py
│       │   ├── main.py
│       │   ├── openfda.py
│       │   ├── organizations.py
│       │   ├── server.py
│       │   ├── trials.py
│       │   └── variants.py
│       ├── connection_pool.py
│       ├── constants.py
│       ├── core.py
│       ├── diseases
│       │   ├── __init__.py
│       │   ├── getter.py
│       │   └── search.py
│       ├── domain_handlers.py
│       ├── drugs
│       │   ├── __init__.py
│       │   └── getter.py
│       ├── exceptions.py
│       ├── genes
│       │   ├── __init__.py
│       │   └── getter.py
│       ├── http_client_simple.py
│       ├── http_client.py
│       ├── individual_tools.py
│       ├── integrations
│       │   ├── __init__.py
│       │   ├── biothings_client.py
│       │   └── cts_api.py
│       ├── interventions
│       │   ├── __init__.py
│       │   ├── getter.py
│       │   └── search.py
│       ├── logging_filter.py
│       ├── metrics_handler.py
│       ├── metrics.py
│       ├── openfda
│       │   ├── __init__.py
│       │   ├── adverse_events_helpers.py
│       │   ├── adverse_events.py
│       │   ├── cache.py
│       │   ├── constants.py
│       │   ├── device_events_helpers.py
│       │   ├── device_events.py
│       │   ├── drug_approvals.py
│       │   ├── drug_labels_helpers.py
│       │   ├── drug_labels.py
│       │   ├── drug_recalls_helpers.py
│       │   ├── drug_recalls.py
│       │   ├── drug_shortages_detail_helpers.py
│       │   ├── drug_shortages_helpers.py
│       │   ├── drug_shortages.py
│       │   ├── exceptions.py
│       │   ├── input_validation.py
│       │   ├── rate_limiter.py
│       │   ├── utils.py
│       │   └── validation.py
│       ├── organizations
│       │   ├── __init__.py
│       │   ├── getter.py
│       │   └── search.py
│       ├── parameter_parser.py
│       ├── prefetch.py
│       ├── query_parser.py
│       ├── query_router.py
│       ├── rate_limiter.py
│       ├── render.py
│       ├── request_batcher.py
│       ├── resources
│       │   ├── __init__.py
│       │   ├── getter.py
│       │   ├── instructions.md
│       │   └── researcher.md
│       ├── retry.py
│       ├── router_handlers.py
│       ├── router.py
│       ├── shared_context.py
│       ├── thinking
│       │   ├── __init__.py
│       │   ├── sequential.py
│       │   └── session.py
│       ├── thinking_tool.py
│       ├── thinking_tracker.py
│       ├── trials
│       │   ├── __init__.py
│       │   ├── getter.py
│       │   ├── nci_getter.py
│       │   ├── nci_search.py
│       │   └── search.py
│       ├── utils
│       │   ├── __init__.py
│       │   ├── cancer_types_api.py
│       │   ├── cbio_http_adapter.py
│       │   ├── endpoint_registry.py
│       │   ├── gene_validator.py
│       │   ├── metrics.py
│       │   ├── mutation_filter.py
│       │   ├── query_utils.py
│       │   ├── rate_limiter.py
│       │   └── request_cache.py
│       ├── variants
│       │   ├── __init__.py
│       │   ├── alphagenome.py
│       │   ├── cancer_types.py
│       │   ├── cbio_external_client.py
│       │   ├── cbioportal_mutations.py
│       │   ├── cbioportal_search_helpers.py
│       │   ├── cbioportal_search.py
│       │   ├── constants.py
│       │   ├── external.py
│       │   ├── filters.py
│       │   ├── getter.py
│       │   ├── links.py
│       │   └── search.py
│       └── workers
│           ├── __init__.py
│           ├── worker_entry_stytch.js
│           ├── worker_entry.js
│           └── worker.py
├── tests
│   ├── bdd
│   │   ├── cli_help
│   │   │   ├── help.feature
│   │   │   └── test_help.py
│   │   ├── conftest.py
│   │   ├── features
│   │   │   └── alphagenome_integration.feature
│   │   ├── fetch_articles
│   │   │   ├── fetch.feature
│   │   │   └── test_fetch.py
│   │   ├── get_trials
│   │   │   ├── get.feature
│   │   │   └── test_get.py
│   │   ├── get_variants
│   │   │   ├── get.feature
│   │   │   └── test_get.py
│   │   ├── search_articles
│   │   │   ├── autocomplete.feature
│   │   │   ├── search.feature
│   │   │   ├── test_autocomplete.py
│   │   │   └── test_search.py
│   │   ├── search_trials
│   │   │   ├── search.feature
│   │   │   └── test_search.py
│   │   ├── search_variants
│   │   │   ├── search.feature
│   │   │   └── test_search.py
│   │   └── steps
│   │       └── test_alphagenome_steps.py
│   ├── config
│   │   └── test_smithery_config.py
│   ├── conftest.py
│   ├── data
│   │   ├── ct_gov
│   │   │   ├── clinical_trials_api_v2.yaml
│   │   │   ├── trials_NCT04280705.json
│   │   │   └── trials_NCT04280705.txt
│   │   ├── myvariant
│   │   │   ├── myvariant_api.yaml
│   │   │   ├── myvariant_field_descriptions.csv
│   │   │   ├── variants_full_braf_v600e.json
│   │   │   ├── variants_full_braf_v600e.txt
│   │   │   └── variants_part_braf_v600_multiple.json
│   │   ├── openfda
│   │   │   ├── drugsfda_detail.json
│   │   │   ├── drugsfda_search.json
│   │   │   ├── enforcement_detail.json
│   │   │   └── enforcement_search.json
│   │   └── pubtator
│   │       ├── pubtator_autocomplete.json
│   │       └── pubtator3_paper.txt
│   ├── integration
│   │   ├── test_openfda_integration.py
│   │   ├── test_preprints_integration.py
│   │   ├── test_simple.py
│   │   └── test_variants_integration.py
│   ├── tdd
│   │   ├── articles
│   │   │   ├── test_autocomplete.py
│   │   │   ├── test_cbioportal_integration.py
│   │   │   ├── test_fetch.py
│   │   │   ├── test_preprints.py
│   │   │   ├── test_search.py
│   │   │   └── test_unified.py
│   │   ├── conftest.py
│   │   ├── drugs
│   │   │   ├── __init__.py
│   │   │   └── test_drug_getter.py
│   │   ├── openfda
│   │   │   ├── __init__.py
│   │   │   ├── test_adverse_events.py
│   │   │   ├── test_device_events.py
│   │   │   ├── test_drug_approvals.py
│   │   │   ├── test_drug_labels.py
│   │   │   ├── test_drug_recalls.py
│   │   │   ├── test_drug_shortages.py
│   │   │   └── test_security.py
│   │   ├── test_biothings_integration_real.py
│   │   ├── test_biothings_integration.py
│   │   ├── test_circuit_breaker.py
│   │   ├── test_concurrent_requests.py
│   │   ├── test_connection_pool.py
│   │   ├── test_domain_handlers.py
│   │   ├── test_drug_approvals.py
│   │   ├── test_drug_recalls.py
│   │   ├── test_drug_shortages.py
│   │   ├── test_endpoint_documentation.py
│   │   ├── test_error_scenarios.py
│   │   ├── test_europe_pmc_fetch.py
│   │   ├── test_mcp_integration.py
│   │   ├── test_mcp_tools.py
│   │   ├── test_metrics.py
│   │   ├── test_nci_integration.py
│   │   ├── test_nci_mcp_tools.py
│   │   ├── test_network_policies.py
│   │   ├── test_offline_mode.py
│   │   ├── test_openfda_unified.py
│   │   ├── test_pten_r173_search.py
│   │   ├── test_render.py
│   │   ├── test_request_batcher.py.disabled
│   │   ├── test_retry.py
│   │   ├── test_router.py
│   │   ├── test_shared_context.py.disabled
│   │   ├── test_unified_biothings.py
│   │   ├── thinking
│   │   │   ├── __init__.py
│   │   │   └── test_sequential.py
│   │   ├── trials
│   │   │   ├── test_backward_compatibility.py
│   │   │   ├── test_getter.py
│   │   │   └── test_search.py
│   │   ├── utils
│   │   │   ├── test_gene_validator.py
│   │   │   ├── test_mutation_filter.py
│   │   │   ├── test_rate_limiter.py
│   │   │   └── test_request_cache.py
│   │   ├── variants
│   │   │   ├── constants.py
│   │   │   ├── test_alphagenome_api_key.py
│   │   │   ├── test_alphagenome_comprehensive.py
│   │   │   ├── test_alphagenome.py
│   │   │   ├── test_cbioportal_mutations.py
│   │   │   ├── test_cbioportal_search.py
│   │   │   ├── test_external_integration.py
│   │   │   ├── test_external.py
│   │   │   ├── test_extract_gene_aa_change.py
│   │   │   ├── test_filters.py
│   │   │   ├── test_getter.py
│   │   │   ├── test_links.py
│   │   │   └── test_search.py
│   │   └── workers
│   │       └── test_worker_sanitization.js
│   └── test_pydantic_ai_integration.py
├── THIRD_PARTY_ENDPOINTS.md
├── tox.ini
├── uv.lock
└── wrangler.toml
```

# Files

--------------------------------------------------------------------------------
/src/biomcp/thinking_tool.py:
--------------------------------------------------------------------------------

```python
"""Sequential thinking tool for structured problem-solving.

This module provides a dedicated MCP tool for sequential thinking,
separate from the main search functionality.
"""

from typing import Annotated

from pydantic import Field

from biomcp.core import mcp_app
from biomcp.metrics import track_performance
from biomcp.thinking.sequential import _sequential_thinking
from biomcp.thinking_tracker import mark_thinking_used


@mcp_app.tool()
@track_performance("biomcp.think")
async def think(
    thought: Annotated[
        str,
        Field(description="Current thinking step for analysis"),
    ],
    thoughtNumber: Annotated[
        int,
        Field(
            description="Current thought number, starting at 1",
            ge=1,
        ),
    ],
    totalThoughts: Annotated[
        int,
        Field(
            description="Estimated total thoughts needed for complete analysis",
            ge=1,
        ),
    ],
    nextThoughtNeeded: Annotated[
        bool,
        Field(
            description="Whether more thinking steps are needed after this one",
        ),
    ] = True,
) -> dict:
    """REQUIRED FIRST STEP: Perform structured sequential thinking for ANY biomedical research task.

    🚨 IMPORTANT: You MUST use this tool BEFORE any search or fetch operations when:
    - Researching ANY biomedical topic (genes, diseases, variants, trials)
    - Planning to use multiple BioMCP tools
    - Answering questions that require analysis or synthesis
    - Comparing information from different sources
    - Making recommendations or drawing conclusions

    ⚠️ FAILURE TO USE THIS TOOL FIRST will result in:
    - Incomplete or poorly structured analysis
    - Missing important connections between data
    - Suboptimal search strategies
    - Overlooked critical information

    Sequential thinking ensures you:
    1. Fully understand the research question
    2. Plan an optimal search strategy
    3. Identify all relevant data sources
    4. Structure your analysis properly
    5. Deliver comprehensive, well-reasoned results

    ## Usage Pattern:
    1. Start with thoughtNumber=1 to initiate analysis
    2. Progress through numbered thoughts sequentially
    3. Adjust totalThoughts estimate as understanding develops
    4. Set nextThoughtNeeded=False only when analysis is complete

    ## Example:
    ```python
    # Initial analysis
    await think(
        thought="Breaking down the relationship between BRAF mutations and melanoma treatment resistance...",
        thoughtNumber=1,
        totalThoughts=5,
        nextThoughtNeeded=True
    )

    # Continue analysis
    await think(
        thought="Examining specific BRAF V600E mutation mechanisms...",
        thoughtNumber=2,
        totalThoughts=5,
        nextThoughtNeeded=True
    )

    # Final thought
    await think(
        thought="Synthesizing findings and proposing research directions...",
        thoughtNumber=5,
        totalThoughts=5,
        nextThoughtNeeded=False
    )
    ```

    ## Important Notes:
    - Each thought builds on previous ones within a session
    - State is maintained throughout the MCP session
    - Use thoughtful, detailed analysis in each step
    - Revisions and branching are supported through the underlying implementation
    """
    # Mark that thinking has been used; recorded via biomcp.thinking_tracker,
    # presumably so other tools can check that thinking preceded search — see tracker module.
    mark_thinking_used()

    # Delegate the actual step processing to the shared sequential-thinking
    # implementation (biomcp.thinking.sequential), passing arguments through unchanged.
    result = await _sequential_thinking(
        thought=thought,
        thoughtNumber=thoughtNumber,
        totalThoughts=totalThoughts,
        nextThoughtNeeded=nextThoughtNeeded,
    )

    # Wrap the engine's result in a response envelope that echoes the
    # sequencing fields so callers can track progress without re-parsing.
    return {
        "domain": "thinking",
        "result": result,
        "thoughtNumber": thoughtNumber,
        "nextThoughtNeeded": nextThoughtNeeded,
    }

```

--------------------------------------------------------------------------------
/tests/tdd/variants/test_search.py:
--------------------------------------------------------------------------------

```python
import pytest

from biomcp.variants.search import (
    ClinicalSignificance,
    PolyPhenPrediction,
    SiftPrediction,
    VariantQuery,
    build_query_string,
    search_variants,
)


@pytest.fixture
def basic_query():
    """Provide a minimal, gene-only variant query."""
    query = VariantQuery(gene="BRAF")
    return query


@pytest.fixture
def complex_query():
    """Provide a query combining gene, significance, and frequency bounds."""
    params = {
        "gene": "BRCA1",
        "significance": ClinicalSignificance.PATHOGENIC,
        "min_frequency": 0.0001,
        "max_frequency": 0.01,
    }
    return VariantQuery(**params)


def test_query_validation():
    """Validate VariantQuery construction rules and enum round-tripping."""
    # A gene symbol alone is a sufficient search parameter.
    gene_query = VariantQuery(gene="BRAF")
    assert gene_query.gene == "BRAF"

    # An rsID alone is also sufficient.
    rsid_query = VariantQuery(rsid="rs113488022")
    assert rsid_query.rsid == "rs113488022"

    # Constructing with no search parameter at all must fail validation.
    with pytest.raises(ValueError):
        VariantQuery()

    # A clinical-significance enum is accepted alongside a search parameter.
    sig_query = VariantQuery(
        gene="BRCA1", significance=ClinicalSignificance.PATHOGENIC
    )
    assert sig_query.significance == ClinicalSignificance.PATHOGENIC

    # Prediction-score enums are stored unchanged.
    pred_query = VariantQuery(
        gene="TP53",
        polyphen=PolyPhenPrediction.PROBABLY_DAMAGING,
        sift=SiftPrediction.DELETERIOUS,
    )
    assert pred_query.polyphen == PolyPhenPrediction.PROBABLY_DAMAGING
    assert pred_query.sift == SiftPrediction.DELETERIOUS


def test_build_query_string():
    """Check that build_query_string renders each supported filter type."""
    # A single gene field is quoted under the dbNSFP gene name key.
    gene_only = build_query_string(VariantQuery(gene="BRAF"))
    assert 'dbnsfp.genename:"BRAF"' in gene_only

    # Gene and rsID both appear when combined in one query.
    combined = build_query_string(
        VariantQuery(gene="BRAF", rsid="rs113488022")
    )
    assert 'dbnsfp.genename:"BRAF"' in combined
    assert "rs113488022" in combined

    # A genomic region passes through verbatim.
    region = build_query_string(
        VariantQuery(region="chr7:140753300-140753400")
    )
    assert "chr7:140753300-140753400" in region

    # Clinical significance uses its human-readable quoted value.
    significance = build_query_string(
        VariantQuery(significance=ClinicalSignificance.LIKELY_BENIGN)
    )
    assert 'clinvar.rcv.clinical_significance:"likely benign"' in significance

    # Frequency bounds become range operators on the gnomAD exome AF field.
    frequency = build_query_string(
        VariantQuery(min_frequency=0.0001, max_frequency=0.01)
    )
    assert "gnomad_exome.af.af:>=0.0001" in frequency
    assert "gnomad_exome.af.af:<=0.01" in frequency


async def test_search_variants_basic(basic_query, anyio_backend):
    """Run a simple gene query against the live API and sanity-check output."""
    rendered = await search_variants(basic_query)

    # The gene symbol should appear in the results and no error string
    # should have been produced.
    assert "BRAF" in rendered
    assert not rendered.startswith("Error")


async def test_search_variants_complex(complex_query, anyio_backend):
    """Exercise search_variants with a query expected to return results."""
    # A plain TP53 query is common enough to reliably return data.
    response = await search_variants(VariantQuery(gene="TP53"))

    # The rendered output must not be an error message.
    assert not response.startswith("Error")


async def test_search_variants_no_results(anyio_backend):
    """A nonsense gene symbol should yield an empty JSON array."""
    no_hit_query = VariantQuery(gene="UNKNOWN_XYZ")
    assert await search_variants(no_hit_query, output_json=True) == "[]"


async def test_search_variants_with_limit(anyio_backend):
    """A size-limited query should still succeed without errors."""
    limited_query = VariantQuery(gene="TP53", size=3)
    response = await search_variants(limited_query)

    # Limiting the result count must not break rendering.
    assert not response.startswith("Error")

```

--------------------------------------------------------------------------------
/tests/tdd/test_offline_mode.py:
--------------------------------------------------------------------------------

```python
"""Tests for offline mode functionality."""

import os
from unittest.mock import patch

import pytest

from biomcp.http_client import RequestError, request_api


@pytest.mark.asyncio
async def test_offline_mode_blocks_requests():
    """Offline mode must short-circuit HTTP requests with a 503 error."""
    with patch.dict(os.environ, {"BIOMCP_OFFLINE": "true"}):
        result, error = await request_api(
            url="https://api.example.com/test",
            request={"test": "data"},
            cache_ttl=0,  # disable caching so the offline path is exercised
        )

        # No payload comes back; instead we get a structured RequestError.
        assert result is None
        assert error is not None
        assert isinstance(error, RequestError)
        assert error.code == 503
        assert "Offline mode enabled" in error.message


@pytest.mark.asyncio
async def test_offline_mode_allows_cached_responses():
    """Cached responses remain available after offline mode is switched on."""
    request_kwargs = {
        "url": "https://api.example.com/cached",
        "request": {"test": "data"},
        "cache_ttl": 3600,  # cache for one hour
    }

    # Prime the cache while online, with the HTTP transport mocked out.
    with (
        patch.dict(os.environ, {"BIOMCP_OFFLINE": "false"}),
        patch("biomcp.http_client.call_http") as mock_call,
    ):
        mock_call.return_value = (200, '{"data": "cached"}')
        result, error = await request_api(**request_kwargs)
        assert result == {"data": "cached"}
        assert error is None

    # The identical request served offline should come from the cache.
    with patch.dict(os.environ, {"BIOMCP_OFFLINE": "true"}):
        result, error = await request_api(**request_kwargs)
        assert result == {"data": "cached"}
        assert error is None


@pytest.mark.asyncio
async def test_offline_mode_case_insensitive():
    """Any common truthy spelling of BIOMCP_OFFLINE activates offline mode."""
    for flag in ("TRUE", "True", "1", "yes", "YES", "Yes"):
        with patch.dict(os.environ, {"BIOMCP_OFFLINE": flag}):
            result, error = await request_api(
                url="https://api.example.com/test",
                request={"test": "data"},
                cache_ttl=0,
            )

            # Every spelling must produce the same 503 offline error.
            assert result is None
            assert error is not None
            assert error.code == 503
            assert "Offline mode enabled" in error.message


@pytest.mark.asyncio
async def test_offline_mode_disabled_by_default():
    """Without BIOMCP_OFFLINE set, requests pass through to the transport."""
    with (
        patch.dict(os.environ, {}, clear=True),
        patch("biomcp.http_client.call_http") as mock_call,
    ):
        mock_call.return_value = (200, '{"data": "response"}')

        result, error = await request_api(
            url="https://api.example.com/test",
            request={"test": "data"},
            cache_ttl=0,
        )

        # The mocked transport was invoked exactly once and its payload
        # was parsed and returned without error.
        mock_call.assert_called_once()
        assert result == {"data": "response"}
        assert error is None


@pytest.mark.asyncio
async def test_offline_mode_with_endpoint_tracking():
    """Offline errors still surface when an endpoint_key is supplied."""
    with patch.dict(os.environ, {"BIOMCP_OFFLINE": "true"}):
        result, error = await request_api(
            url="https://www.ncbi.nlm.nih.gov/research/pubtator3-api/search/",
            request={"text": "BRAF"},
            endpoint_key="pubtator3_search",
            cache_ttl=0,
        )

        # The error message should reference the blocked endpoint's URL.
        assert result is None
        assert error is not None
        assert error.code == 503
        assert "pubtator3-api/search/" in error.message

```

--------------------------------------------------------------------------------
/src/biomcp/variants/links.py:
--------------------------------------------------------------------------------

```python
"""Functions for adding database links to variant data."""

from typing import Any


def _calculate_vcf_end(variant: dict[str, Any]) -> int:
    """Calculate the end position for UCSC Genome Browser link."""
    if "vcf" not in variant:
        return 0

    vcf = variant["vcf"]
    pos = int(vcf.get("position", 0))
    ref = vcf.get("ref", "")
    alt = vcf.get("alt", "")

    # For insertions/deletions, handle special cases
    if not ref and alt:  # insertion
        return pos + 1
    elif ref and not alt:  # deletion
        return pos + len(ref)
    else:  # substitution
        return pos + max(0, ((len(alt) + 1) - len(ref)))


def _get_first_value(data: Any) -> Any:
    """Get the first value from a list or return the value itself."""
    if isinstance(data, list) and data:
        return data[0]
    return data


def _ensure_url_section(variant: dict[str, Any]) -> None:
    """Ensure the URL section exists in the variant."""
    if "url" not in variant:
        variant["url"] = {}


def _add_dbsnp_links(variant: dict[str, Any]) -> None:
    """Attach dbSNP and Ensembl URLs when the variant carries an rsID."""
    if "dbsnp" not in variant:
        return
    rsid = variant["dbsnp"].get("rsid")
    if not rsid:
        return

    # dbSNP link lives inside the dbsnp section itself.
    variant["dbsnp"]["url"] = f"https://www.ncbi.nlm.nih.gov/snp/{rsid}"

    # The Ensembl link goes in the shared top-level url section.
    _ensure_url_section(variant)
    variant["url"]["ensembl"] = (
        f"https://ensembl.org/Homo_sapiens/Variation/Explore?v={rsid}"
    )


def _add_clinvar_link(variant: dict[str, Any]) -> None:
    """Add ClinVar link if variant_id is present."""
    if "clinvar" in variant and variant["clinvar"].get("variant_id"):
        variant["clinvar"]["url"] = (
            f"https://www.ncbi.nlm.nih.gov/clinvar/variation/{variant['clinvar']['variant_id']}/"
        )


def _add_cosmic_link(variant: dict[str, Any]) -> None:
    """Add COSMIC link if cosmic_id is present."""
    if "cosmic" in variant and variant["cosmic"].get("cosmic_id"):
        variant["cosmic"]["url"] = (
            f"https://cancer.sanger.ac.uk/cosmic/mutation/overview?id={variant['cosmic']['cosmic_id']}"
        )


def _add_civic_link(variant: dict[str, Any]) -> None:
    """Add CIViC link if id is present."""
    if "civic" in variant and variant["civic"].get("id"):
        variant["civic"]["url"] = (
            f"https://civicdb.org/variants/{variant['civic']['id']}/summary"
        )


def _add_ucsc_link(variant: dict[str, Any]) -> None:
    """Attach a UCSC Genome Browser (hg19) URL when chrom and VCF position exist."""
    has_coordinates = (
        "chrom" in variant
        and "vcf" in variant
        and variant["vcf"].get("position")
    )
    if not has_coordinates:
        return

    start = variant["vcf"]["position"]
    end = _calculate_vcf_end(variant)
    _ensure_url_section(variant)
    variant["url"]["ucsc_genome_browser"] = (
        f"https://genome.ucsc.edu/cgi-bin/hgTracks?db=hg19&"
        f"position=chr{variant['chrom']}:{start}-{end}"
    )


def _add_hgnc_link(variant: dict[str, Any]) -> None:
    """Attach an HGNC gene-symbol-report URL when a gene name is available."""
    if "dbnsfp" not in variant:
        return
    # MyVariant may deliver genename as a scalar or a list; take the first.
    gene = _get_first_value(variant["dbnsfp"].get("genename"))
    if not gene:
        return
    _ensure_url_section(variant)
    variant["url"]["hgnc"] = (
        f"https://www.genenames.org/data/gene-symbol-report/#!/symbol/{gene}"
    )


def inject_links(variants: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """
    Add external database URLs to each variant record, in place.

    Args:
        variants: Variant dictionaries as returned by the MyVariant.info API.

    Returns:
        The same list, with per-database URL fields populated where possible.
    """
    link_builders = (
        _add_dbsnp_links,
        _add_clinvar_link,
        _add_cosmic_link,
        _add_civic_link,
        _add_ucsc_link,
        _add_hgnc_link,
    )
    for variant in variants:
        for add_links in link_builders:
            add_links(variant)
    return variants

```

--------------------------------------------------------------------------------
/src/biomcp/organizations/getter.py:
--------------------------------------------------------------------------------

```python
"""Get specific organization details via NCI CTS API."""

import logging
from typing import Any

from ..constants import NCI_ORGANIZATIONS_URL
from ..integrations.cts_api import CTSAPIError, make_cts_request

logger = logging.getLogger(__name__)


async def get_organization(
    org_id: str,
    api_key: str | None = None,
) -> dict[str, Any]:
    """
    Fetch details for a single organization from the NCI CTS API.

    Args:
        org_id: Organization ID
        api_key: Optional API key (falls back to the NCI_API_KEY env var)

    Returns:
        Dictionary with organization details

    Raises:
        CTSAPIError: If the API request fails or the organization is not found
    """
    try:
        url = f"{NCI_ORGANIZATIONS_URL}/{org_id}"
        response = await make_cts_request(url=url, api_key=api_key)

        # The API wraps the payload inconsistently across versions;
        # unwrap known envelope keys, else return the response as-is.
        for envelope_key in ("data", "organization"):
            if envelope_key in response:
                return response[envelope_key]
        return response

    except CTSAPIError:
        # Already a well-formed API error; propagate untouched.
        raise
    except Exception as e:
        logger.error(f"Failed to get organization {org_id}: {e}")
        raise CTSAPIError(f"Failed to retrieve organization: {e!s}") from e


def _format_address_fields(org: dict[str, Any]) -> list[str]:
    """Extract and format address fields from organization data."""
    address_fields = []

    if org.get("address"):
        addr = org["address"]
        if isinstance(addr, dict):
            fields = [
                addr.get("street", ""),
                addr.get("city", ""),
                addr.get("state", ""),
                addr.get("zip", ""),
            ]
            address_fields = [f for f in fields if f]

            country = addr.get("country", "")
            if country and country != "United States":
                address_fields.append(country)
    else:
        # Try individual fields
        city = org.get("city", "")
        state = org.get("state", "")
        address_fields = [p for p in [city, state] if p]

    return address_fields


def _format_contact_info(org: dict[str, Any]) -> list[str]:
    """Format contact information lines."""
    lines = []
    if org.get("phone"):
        lines.append(f"- **Phone**: {org['phone']}")
    if org.get("email"):
        lines.append(f"- **Email**: {org['email']}")
    if org.get("website"):
        lines.append(f"- **Website**: {org['website']}")
    return lines


def format_organization_details(org: dict[str, Any]) -> str:
    """
    Format organization details as markdown.

    Args:
        org: Organization data dictionary

    Returns:
        Formatted markdown string
    """
    # Pull core fields, tolerating alternate key names from the API.
    name = org.get("name", "Unknown Organization")
    org_id = org.get("id", org.get("org_id", "Unknown"))
    org_type = org.get("type", org.get("category", "Unknown"))

    md = [
        f"## Organization: {name}",
        "",
        "### Basic Information",
        f"- **ID**: {org_id}",
        f"- **Type**: {org_type}",
    ]

    # Location line only when there is something to show.
    location = _format_address_fields(org)
    if location:
        md.append(f"- **Location**: {', '.join(location)}")

    # Contact bullets (phone/email/website).
    md += _format_contact_info(org)

    description = org.get("description")
    if description:
        md += ["", "### Description", description]

    parent = org.get("parent_org")
    if parent:
        md += [
            "",
            "### Parent Organization",
            f"- **Name**: {parent.get('name', 'Unknown')}",
            f"- **ID**: {parent.get('id', 'Unknown')}",
        ]

    return "\n".join(md)

```

--------------------------------------------------------------------------------
/tests/tdd/utils/test_request_cache.py:
--------------------------------------------------------------------------------

```python
"""Tests for request caching utilities."""

import asyncio

import pytest

from biomcp.utils.request_cache import (
    clear_cache,
    get_cached,
    request_cache,
    set_cached,
)


class TestRequestCache:
    """Test request caching functionality."""

    @pytest.fixture(autouse=True)
    async def clear_cache_before_test(self):
        """Ensure every test starts and ends with an empty cache."""
        await clear_cache()
        yield
        await clear_cache()

    @pytest.mark.asyncio
    async def test_basic_caching(self):
        """Test basic cache get/set operations."""
        # A key that was never set yields None.
        assert await get_cached("test_key") is None

        # Store a value, then read it back.
        await set_cached("test_key", "test_value", ttl=10)
        assert await get_cached("test_key") == "test_value"

    @pytest.mark.asyncio
    async def test_cache_expiry(self):
        """Test that cached values expire."""
        await set_cached("test_key", "test_value", ttl=0.1)

        # Immediately readable...
        assert await get_cached("test_key") == "test_value"

        # ...but gone once the TTL has elapsed.
        await asyncio.sleep(0.2)
        assert await get_cached("test_key") is None

    @pytest.mark.asyncio
    async def test_request_cache_decorator(self):
        """Test the @request_cache decorator."""
        invocations = 0

        @request_cache(ttl=10)
        async def expensive_function(arg1, arg2):
            nonlocal invocations
            invocations += 1
            return f"{arg1}-{arg2}-{invocations}"

        # First call executes the wrapped function.
        assert await expensive_function("a", "b") == "a-b-1"
        assert invocations == 1

        # Repeating the call with identical args hits the cache.
        assert await expensive_function("a", "b") == "a-b-1"
        assert invocations == 1

        # New arguments trigger a fresh execution.
        assert await expensive_function("c", "d") == "c-d-2"
        assert invocations == 2

    @pytest.mark.asyncio
    async def test_skip_cache_option(self):
        """Test that skip_cache bypasses caching."""
        invocations = 0

        @request_cache(ttl=10)
        async def cached_function():
            nonlocal invocations
            invocations += 1
            return invocations

        # First result is cached.
        assert await cached_function() == 1

        # skip_cache forces a real call.
        assert await cached_function(skip_cache=True) == 2

        # The originally cached value is still served afterwards.
        assert await cached_function() == 1

    @pytest.mark.asyncio
    async def test_none_values_not_cached(self):
        """Test that None return values are not cached."""
        invocations = 0

        @request_cache(ttl=10)
        async def sometimes_none_function(return_none=False):
            nonlocal invocations
            invocations += 1
            return None if return_none else invocations

        # A None result is not stored...
        assert await sometimes_none_function(return_none=True) is None
        assert invocations == 1

        # ...so the next call runs the function again.
        assert await sometimes_none_function(return_none=True) is None
        assert invocations == 2

        # A real value is cached as usual.
        assert await sometimes_none_function(return_none=False) == 3
        assert invocations == 3

        # Cached value is returned without another execution.
        assert await sometimes_none_function(return_none=False) == 3
        assert invocations == 3

```

--------------------------------------------------------------------------------
/docs/blog/ai-assisted-clinical-trial-search-analysis.md:
--------------------------------------------------------------------------------

```markdown
# AI-Assisted Clinical Trial Search: How BioMCP Transforms Research

Finding the right clinical trial for a research project has traditionally been
a complex process requiring specialized knowledge of database syntax and
medical terminology. BioMCP is changing this landscape by making clinical trial
data accessible through natural language conversation.

Video Link:
[![▶️ Watch the video](./images/trial-search-assistant.png)](https://www.youtube.com/watch?v=jqGXXnVesjg&list=PLu1amIF_MEfPWhhEsXSuBi90S_xtmVJIW&index=2)

## Breaking Down the Barriers to Clinical Trial Information

BioMCP serves as a specialized Model Context Protocol (MCP) server that
empowers AI assistants and agents with tools to interact with critical
biomedical resources. For clinical trials specifically, BioMCP connects to the
ClinicalTrials.gov API, allowing researchers and clinicians to search and
retrieve trial information through simple conversational queries.

The power of this approach becomes apparent when we look at how it transforms a
complex search requirement. Imagine needing to find active clinical trials for
pembrolizumab (a cancer immunotherapy drug) specifically for non-small cell
lung carcinoma near Cleveland, Ohio. Traditionally, this would require:

1. Navigating to ClinicalTrials.gov
2. Understanding the proper search fields and syntax
3. Creating multiple filters for intervention (pembrolizumab), condition (
   non-small cell lung carcinoma), status (recruiting), and location (Cleveland
   area)
4. Interpreting the results

## From Natural Language to Precise Database Queries

With BioMCP, this entire process is streamlined into a simple natural language
request. The underlying large language model (LLM) interprets the query,
identifies the key entities (drug name, cancer type, location), and translates
these into the precise parameters needed for the ClinicalTrials.gov API.

The system returns relevant trials that match all criteria, presenting them in
an easy-to-understand format. But the interaction doesn't end there—BioMCP
maintains context throughout the conversation, enabling follow-up questions
like:

- Where exactly are these trials located and how far are they from downtown
  Cleveland?
- What biomarker eligibility criteria do these trials require?
- Are there exclusion criteria I should be aware of?

For each of these questions, BioMCP calls the appropriate tool (trial
locations, trial protocols) and processes the information to provide meaningful
answers without requiring the user to navigate different interfaces or learn
new query languages.

## Beyond Basic Search: Understanding Trial Details

What truly sets BioMCP apart is its ability to go beyond simple listings. When
asked about biomarker eligibility criteria, the system can extract this
information from the full trial protocol, synthesize it, and present a clear
summary of requirements. This capability transforms what would typically be
hours of reading dense clinical documentation into a conversational exchange
that delivers precisely what the researcher needs.

## Transforming Clinical Research Workflows

The implications for clinical research are significant. By lowering the
technical barriers to accessing trial information, BioMCP can help:

- Researchers understand the landscape of current research in their field
- Research teams identify promising studies more efficiently
- Clinical research organizations track competing or complementary trials
- Research coordinators identify potential recruitment sites based on location

As part of the broader BioMCP ecosystem—which also includes access to genomic
variant information and PubMed literature—this clinical trial search capability
represents a fundamental shift in how we interact with biomedical information.
By bringing the power of natural language processing to specialized databases,
BioMCP is helping to democratize access to critical health information and
accelerate the research process.

```

--------------------------------------------------------------------------------
/src/biomcp/utils/query_utils.py:
--------------------------------------------------------------------------------

```python
"""Utilities for query parsing and manipulation."""

import re
from typing import Any


def parse_or_query(query: str) -> list[str]:
    """Parse OR query into individual search terms.

    Handles formats like:
    - "term1 OR term2"
    - 'term1 OR term2 OR "term with spaces"'
    - "TERM1 or term2 or term3" (case insensitive)

    Args:
        query: Query string that may contain OR operators

    Returns:
        List of individual search terms with quotes and whitespace cleaned

    Examples:
        >>> parse_or_query("PD-L1 OR CD274")
        ['PD-L1', 'CD274']

        >>> parse_or_query('BRAF OR "v-raf murine" OR ARAF')
        ['BRAF', 'v-raf murine', 'ARAF']
    """
    cleaned_terms: list[str] = []

    # Split on OR regardless of case, then strip whitespace and any
    # surrounding quote characters from each piece.
    for raw_term in re.split(r"\s+OR\s+", query, flags=re.IGNORECASE):
        term = raw_term.strip().strip('"').strip("'").strip()
        if term:
            cleaned_terms.append(term)

    return cleaned_terms


def contains_or_operator(query: str) -> bool:
    """Check if a query contains OR operators.

    Uses the same case-insensitive, whitespace-tolerant pattern as
    :func:`parse_or_query`, so any query this function flags will actually
    be split into multiple terms. (The previous substring check missed
    mixed-case forms like "a Or b" and separators wider than one space.)

    Args:
        query: Query string to check

    Returns:
        True if the query contains a standalone OR operator (any case),
        False otherwise
    """
    return re.search(r"\s+OR\s+", query, flags=re.IGNORECASE) is not None


async def search_with_or_support(
    query: str,
    search_func: Any,
    search_params: dict[str, Any],
    id_field: str = "id",
    fallback_id_field: str | None = None,
) -> dict[str, Any]:
    """Generic OR query search handler.

    This function handles OR queries by making one API call per term and
    combining the results, deduplicating by ID. Failures for individual
    terms are logged and skipped (best-effort).

    Args:
        query: Query string that may contain OR operators
        search_func: Async search function to call for each term; invoked
            as ``search_func(**search_params, name=term)``
        search_params: Base parameters to pass to search function (excluding the query term)
        id_field: Primary field name for deduplication (default: "id")
        fallback_id_field: Alternative field name if primary is missing

    Returns:
        Combined results from all searches with duplicates removed:
        ``{"items": [...], "total": int, "search_terms": [...],
        "total_found_across_terms": int}``
    """
    # Create the logger once, not inside the per-term loop.
    import logging

    logger = logging.getLogger(__name__)

    # Split an OR query into its terms; otherwise search the query as-is.
    if contains_or_operator(query):
        search_terms = parse_or_query(query)
    else:
        search_terms = [query]

    # Keyed by item ID to deduplicate across terms.
    all_results: dict[Any, dict[str, Any]] = {}
    total_found = 0

    for term in search_terms:
        try:
            results = await search_func(**{**search_params, "name": term})

            # Responses vary by endpoint; find the key holding the item list.
            items_key = next(
                (
                    key
                    for key in (
                        "biomarkers",
                        "organizations",
                        "interventions",
                        "diseases",
                        "data",
                        "items",
                    )
                    if key in results
                ),
                None,
            )
            if not items_key:
                continue

            # Add unique items (deduplicate by ID).
            for item in results.get(items_key, []):
                item_id = item.get(id_field)
                if not item_id and fallback_id_field:
                    item_id = item.get(fallback_id_field)

                if item_id and item_id not in all_results:
                    all_results[item_id] = item

            total_found += results.get("total", 0)

        except Exception as e:
            # Best-effort: log and continue with the remaining terms.
            logger.warning(f"Failed to search for term '{term}': {e}")
            continue

    unique_items = list(all_results.values())

    # Return in standard format
    return {
        "items": unique_items,
        "total": len(unique_items),
        "search_terms": search_terms,
        "total_found_across_terms": total_found,
    }

```

--------------------------------------------------------------------------------
/tests/tdd/test_endpoint_documentation.py:
--------------------------------------------------------------------------------

```python
"""Test that endpoint documentation is kept up to date."""

import subprocess
import sys
from pathlib import Path


class TestEndpointDocumentation:
    """Test the endpoint documentation generation."""

    @staticmethod
    def _repo_root() -> Path:
        """Repository root: three levels up from this test file."""
        return Path(__file__).parent.parent.parent

    def test_third_party_endpoints_file_exists(self):
        """Test that THIRD_PARTY_ENDPOINTS.md exists."""
        doc_path = self._repo_root() / "THIRD_PARTY_ENDPOINTS.md"
        assert doc_path.exists(), "THIRD_PARTY_ENDPOINTS.md must exist"

    def test_endpoints_documentation_is_current(self):
        """Test that the endpoints documentation can be generated without errors."""
        # Run the generation script in a subprocess.
        script = self._repo_root() / "scripts" / "generate_endpoints_doc.py"
        result = subprocess.run(  # noqa: S603
            [sys.executable, str(script)],
            capture_output=True,
            text=True,
            check=False,
        )

        assert result.returncode == 0, f"Script failed: {result.stderr}"

        # The script should report that it generated the file
        assert (
            "Generated" in result.stdout or result.stdout == ""
        ), f"Unexpected output: {result.stdout}"

    def test_all_endpoints_documented(self):
        """Test that all endpoints in the registry are documented."""
        from biomcp.utils.endpoint_registry import get_registry

        content = (self._repo_root() / "THIRD_PARTY_ENDPOINTS.md").read_text()

        # Every registered endpoint key and URL must appear in the doc.
        for key, info in get_registry().get_all_endpoints().items():
            assert key in content, f"Endpoint {key} not found in documentation"
            assert (
                info.url in content
            ), f"URL {info.url} not found in documentation"

    def test_documentation_contains_required_sections(self):
        """Test that documentation contains all required sections."""
        content = (self._repo_root() / "THIRD_PARTY_ENDPOINTS.md").read_text()

        required_sections = (
            "# Third-Party Endpoints Used by BioMCP",
            "## Overview",
            "## Endpoints by Category",
            "### Biomedical Literature",
            "### Clinical Trials",
            "### Variant Databases",
            "### Cancer Genomics",
            "## Domain Summary",
            "## Compliance and Privacy",
            "## Network Control",
            "BIOMCP_OFFLINE",
        )

        for section in required_sections:
            assert (
                section in content
            ), f"Required section '{section}' not found in documentation"

    def test_endpoint_counts_accurate(self):
        """Test that endpoint counts in the overview are accurate."""
        import re

        from biomcp.utils.endpoint_registry import get_registry

        registry = get_registry()
        endpoints = registry.get_all_endpoints()
        domains = registry.get_unique_domains()

        content = (self._repo_root() / "THIRD_PARTY_ENDPOINTS.md").read_text()

        # Parse the "X external domains across Y endpoints" overview line.
        match = re.search(
            r"BioMCP connects to (\d+) external domains across (\d+) endpoints",
            content,
        )
        assert match, "Could not find endpoint counts in overview"

        doc_domains = int(match.group(1))
        doc_endpoints = int(match.group(2))

        assert (
            doc_domains == len(domains)
        ), f"Document says {doc_domains} domains but registry has {len(domains)}"
        assert (
            doc_endpoints == len(endpoints)
        ), f"Document says {doc_endpoints} endpoints but registry has {len(endpoints)}"
```

--------------------------------------------------------------------------------
/src/biomcp/cli/organizations.py:
--------------------------------------------------------------------------------

```python
"""CLI commands for organization search and lookup."""

import asyncio
from typing import Annotated

import typer

from ..integrations.cts_api import CTSAPIError, get_api_key_instructions
from ..organizations import get_organization, search_organizations
from ..organizations.getter import format_organization_details
from ..organizations.search import format_organization_results

# Typer sub-application exposing the `organization` command group.
organization_app = typer.Typer(
    no_args_is_help=True,
    help="Search and retrieve organization information from NCI CTS API",
)


@organization_app.command("search")
def search_organizations_cli(
    name: Annotated[
        str | None,
        typer.Argument(
            help="Organization name to search for (partial match supported)"
        ),
    ] = None,
    org_type: Annotated[
        str | None,
        typer.Option(
            "--type",
            help="Type of organization (e.g., industry, academic)",
        ),
    ] = None,
    city: Annotated[
        str | None,
        typer.Option(
            "--city",
            help="City location",
        ),
    ] = None,
    state: Annotated[
        str | None,
        typer.Option(
            "--state",
            help="State location (2-letter code)",
        ),
    ] = None,
    page_size: Annotated[
        int,
        typer.Option(
            "--page-size",
            help="Number of results per page",
            min=1,
            max=100,
        ),
    ] = 20,
    page: Annotated[
        int,
        typer.Option(
            "--page",
            help="Page number",
            min=1,
        ),
    ] = 1,
    api_key: Annotated[
        str | None,
        typer.Option(
            "--api-key",
            help="NCI API key (overrides NCI_API_KEY env var)",
            envvar="NCI_API_KEY",
        ),
    ] = None,
) -> None:
    """
    Search for organizations in the NCI Clinical Trials database.

    Examples:
        # Search by name
        biomcp organization search "MD Anderson"

        # Search by type
        biomcp organization search --type academic

        # Search by location
        biomcp organization search --city Boston --state MA

        # Combine filters
        biomcp organization search Cancer --type industry --state CA
    """
    try:
        # Bridge the async search coroutine into this synchronous CLI command.
        results = asyncio.run(
            search_organizations(
                name=name,
                org_type=org_type,
                city=city,
                state=state,
                page_size=page_size,
                page=page,
                api_key=api_key,
            )
        )

        # Render results as markdown and print to stdout.
        output = format_organization_results(results)
        typer.echo(output)

    except CTSAPIError as e:
        # A missing API key gets full setup instructions instead of a raw error.
        if "API key required" in str(e):
            typer.echo(get_api_key_instructions())
        else:
            typer.echo(f"Error: {e}", err=True)
        raise typer.Exit(1) from e
    except Exception as e:
        typer.echo(f"Unexpected error: {e}", err=True)
        raise typer.Exit(1) from e


@organization_app.command("get")
def get_organization_cli(
    org_id: Annotated[
        str,
        typer.Argument(help="Organization ID"),
    ],
    api_key: Annotated[
        str | None,
        typer.Option(
            "--api-key",
            help="NCI API key (overrides NCI_API_KEY env var)",
            envvar="NCI_API_KEY",
        ),
    ] = None,
) -> None:
    """
    Get detailed information about a specific organization.

    Example:
        biomcp organization get ORG123456
    """
    try:
        # Bridge the async getter into this synchronous CLI command.
        org_data = asyncio.run(
            get_organization(
                org_id=org_id,
                api_key=api_key,
            )
        )

        # Render the organization record as markdown and print to stdout.
        output = format_organization_details(org_data)
        typer.echo(output)

    except CTSAPIError as e:
        # A missing API key gets full setup instructions instead of a raw error.
        if "API key required" in str(e):
            typer.echo(get_api_key_instructions())
        else:
            typer.echo(f"Error: {e}", err=True)
        raise typer.Exit(1) from e
    except Exception as e:
        typer.echo(f"Unexpected error: {e}", err=True)
        raise typer.Exit(1) from e

```

--------------------------------------------------------------------------------
/tests/bdd/search_variants/test_search.py:
--------------------------------------------------------------------------------

```python
import json
import shlex
from typing import Any

from assertpy import assert_that
from pytest_bdd import parsers, scenarios, then, when
from typer.testing import CliRunner

from biomcp.cli import app

# Bind all scenarios from the feature file to this module's step definitions.
scenarios("search.feature")

# Shared CLI runner used by the @when step to invoke the biomcp CLI in-process.
runner = CliRunner()

# Field mapping - Updated chromosome key
# Maps friendly feature-file field names to their nested key paths within
# the variant hit documents returned by the CLI's JSON output.
FIELD_MAP = {
    "chromosome": ["chrom"],
    "frequency": ["gnomad_exome", "af", "af"],
    "gene": ["dbnsfp", "genename"],
    "hgvsc": ["dbnsfp", "hgvsc"],
    "hgvsp": ["dbnsfp", "hgvsp"],
    "cadd": ["cadd", "phred"],
    "polyphen": ["dbnsfp", "polyphen2", "hdiv", "pred"],
    "position": ["vcf", "position"],
    "rsid": ["dbsnp", "rsid"],
    "sift": ["dbnsfp", "sift", "pred"],
    "significance": ["clinvar", "rcv", "clinical_significance"],
    "uniprot_id": ["mutdb", "uniprot_id"],
}


def get_value(data: dict, key: str) -> Any | None:
    """Extract value from nested dictionary using field mapping.

    Follows the key path from FIELD_MAP (falling back to the key itself),
    starting at data["hits"]. Lists encountered along the way are traversed
    via their first element; a list at the end yields its first item.

    Args:
        data: Parsed CLI JSON response containing a "hits" entry.
        key: Friendly field name (FIELD_MAP key) or a literal dict key.

    Returns:
        The resolved value, or None/empty when the path cannot be followed.
    """
    key_path = FIELD_MAP.get(key, [key])
    current_value = data.get("hits")
    # Use a distinct loop variable: the original shadowed the `key` parameter.
    for path_key in key_path:
        if isinstance(current_value, dict):
            current_value = current_value.get(path_key)
        elif isinstance(current_value, list) and current_value:
            # Guard against empty lists (previously raised IndexError).
            current_value = current_value[0].get(path_key)
    if current_value and isinstance(current_value, list):
        return current_value[0]
    return current_value


# --- @when Step ---
@when(
    # Matches steps like: I run "biomcp variant search ..." (trailing # comments allowed)
    parsers.re(r'I run "(?P<command>.*?)"(?: #.*)?$'),
    target_fixture="variants_data",
)
def variants_data(command) -> dict:
    """Run variant search command with --json and return parsed results."""
    args = shlex.split(command)[1:]  # trim 'biomcp'
    args += ["--json"]
    # Cap result size for speed unless the scenario specified its own --size.
    if "--size" not in args:
        args.extend(["--size", "10"])

    # Invoke the CLI in-process; propagate exceptions so failures surface.
    result = runner.invoke(app, args, catch_exceptions=False)
    assert result.exit_code == 0, "CLI command failed"
    data = json.loads(result.stdout)
    return data


def normalize(v):
    """Coerce a string to float (then int) when possible; otherwise lowercase it."""
    for cast in (float, int):
        try:
            return cast(v)
        except ValueError:
            continue
    return v.lower()


@then(
    parsers.re(
        r"each variant should have (?P<field>\w+) that (?P<operator>(?:is|equal|to|contains|greater|less|than|or|\s)+)\s+(?P<expected>.+)$"
    )
)
def check_variant_field(it, variants_data, field, operator, expected):
    """
    For each variant, apply an assertpy operator against a given field.
    Supports operator names with spaces (e.g. "is equal to") or underscores (e.g. "is_equal_to").
    """
    # Normalize operator: lower case and replace spaces with underscores.
    operator = operator.strip().lower().replace(" ", "_")
    successes = set()
    failures = set()
    # `it` is a fixture defined elsewhere; it appears to yield
    # (variant_number, value) pairs for the requested field — TODO confirm.
    for v_num, value in it(FIELD_MAP, variants_data, field):
        value = normalize(value)
        expected = normalize(expected)
        # Resolve the assertpy matcher method (e.g. is_equal_to) dynamically.
        f = getattr(assert_that(value), operator)
        try:
            f(expected)
            successes.add(v_num)
        except AssertionError:
            failures.add(v_num)

    # A variant passes if ANY of its values satisfied the matcher; only
    # variants that never succeeded remain as failures.
    failures -= successes
    assert len(failures) == 0, f"Failure: {field} {operator} {expected}"


@then(
    parsers.re(
        r"the number of variants (?P<operator>(?:is|equal|to|contains|greater|less|than|or|\s)+)\s+(?P<expected>\d+)$"
    )
)
def number_of_variants_check(variants_data, operator, expected):
    """Check the number of variants returned."""
    is_error_payload = (
        isinstance(variants_data, list)
        and len(variants_data) == 1
        and "error" in variants_data[0]
    )
    if is_error_payload:
        # An error response counts as zero variants.
        count = 0
    elif isinstance(variants_data, dict) and "variants" in variants_data:
        # New format with cBioPortal summary.
        count = len(variants_data["variants"])
    elif isinstance(variants_data, dict) and "hits" in variants_data:
        # myvariant.info response format.
        count = len(variants_data["hits"])
    elif isinstance(variants_data, list):
        count = len(variants_data)
    else:
        count = 0

    # Resolve the assertpy matcher named by the step text and apply it.
    matcher = operator.strip().lower().replace(" ", "_")
    getattr(assert_that(count), matcher)(int(expected))
```

--------------------------------------------------------------------------------
/src/biomcp/cli/diseases.py:
--------------------------------------------------------------------------------

```python
"""CLI commands for disease information and search."""

import asyncio
from typing import Annotated

import typer

from ..diseases import get_disease
from ..diseases.search import format_disease_results, search_diseases
from ..integrations.cts_api import CTSAPIError, get_api_key_instructions

# Typer sub-application exposing the `disease` command group.
disease_app = typer.Typer(
    no_args_is_help=True,
    help="Search and retrieve disease information",
)


@disease_app.command("get")
def get_disease_cli(
    disease_name: Annotated[
        str,
        typer.Argument(help="Disease name or identifier"),
    ],
) -> None:
    """
    Get disease information from MyDisease.info.

    This returns detailed information including synonyms, definitions,
    and database cross-references.

    Examples:
        biomcp disease get melanoma
        biomcp disease get "lung cancer"
        biomcp disease get GIST
    """
    # Bridge the async MyDisease.info getter into this synchronous command.
    result = asyncio.run(get_disease(disease_name))
    typer.echo(result)


@disease_app.command("search")
def search_diseases_cli(
    name: Annotated[
        str | None,
        typer.Argument(
            help="Disease name to search for (partial match supported)"
        ),
    ] = None,
    include_synonyms: Annotated[
        bool,
        typer.Option(
            "--synonyms/--no-synonyms",
            help="[Deprecated] This option is ignored - API always searches synonyms",
        ),
    ] = True,
    category: Annotated[
        str | None,
        typer.Option(
            "--category",
            help="Disease category/type filter",
        ),
    ] = None,
    page_size: Annotated[
        int,
        typer.Option(
            "--page-size",
            help="Number of results per page",
            min=1,
            max=100,
        ),
    ] = 20,
    page: Annotated[
        int,
        typer.Option(
            "--page",
            help="Page number",
            min=1,
        ),
    ] = 1,
    api_key: Annotated[
        str | None,
        typer.Option(
            "--api-key",
            help="NCI API key (overrides NCI_API_KEY env var)",
            envvar="NCI_API_KEY",
        ),
    ] = None,
    source: Annotated[
        str,
        typer.Option(
            "--source",
            help="Data source: 'mydisease' (default) or 'nci'",
            show_choices=True,
        ),
    ] = "mydisease",
) -> None:
    """
    Search for diseases in MyDisease.info or NCI CTS database.

    The NCI source provides controlled vocabulary of cancer conditions
    used in clinical trials, with official terms and synonyms.

    Examples:
        # Search MyDisease.info (default)
        biomcp disease search melanoma

        # Search NCI cancer terms
        biomcp disease search melanoma --source nci

        # Search without synonyms
        biomcp disease search "breast cancer" --no-synonyms --source nci

        # Filter by category
        biomcp disease search --category neoplasm --source nci
    """
    # NOTE(review): --synonyms/--no-synonyms is documented as deprecated and
    # ignored, yet the value is still forwarded to search_diseases below,
    # and the "--no-synonyms" example above implies it works — TODO confirm.
    if source == "nci":
        # Use NCI CTS API
        try:
            # Bridge the async NCI search into this synchronous command.
            results = asyncio.run(
                search_diseases(
                    name=name,
                    include_synonyms=include_synonyms,
                    category=category,
                    page_size=page_size,
                    page=page,
                    api_key=api_key,
                )
            )

            output = format_disease_results(results)
            typer.echo(output)

        except CTSAPIError as e:
            # A missing API key gets full setup instructions, not a raw error.
            if "API key required" in str(e):
                typer.echo(get_api_key_instructions())
            else:
                typer.echo(f"Error: {e}", err=True)
            raise typer.Exit(1) from e
        except Exception as e:
            typer.echo(f"Unexpected error: {e}", err=True)
            raise typer.Exit(1) from e
    else:
        # Default to MyDisease.info
        # For now, just search by name
        if name:
            result = asyncio.run(get_disease(name))
            typer.echo(result)
        else:
            typer.echo("Please provide a disease name to search for.")
            raise typer.Exit(1)

```

--------------------------------------------------------------------------------
/tests/tdd/test_mcp_tools.py:
--------------------------------------------------------------------------------

```python
"""Tests for MCP tool wrappers."""

import json
from unittest.mock import patch

import pytest

from biomcp.articles.search import _article_searcher


class TestArticleSearcherMCPTool:
    """Unit tests for the _article_searcher MCP tool wrapper.

    Each test patches the underlying optimized searcher and verifies that
    the wrapper forwards its keyword arguments unchanged.
    """

    @pytest.mark.asyncio
    async def test_article_searcher_with_all_params(self):
        """All supported parameters are forwarded to the optimized searcher."""
        fake_results = [{"title": "Test Article", "pmid": 12345}]

        with patch(
            "biomcp.articles.search_optimized.article_searcher_optimized"
        ) as patched:
            patched.return_value = json.dumps(fake_results)

            await _article_searcher(
                call_benefit="Testing search functionality",
                chemicals="aspirin,ibuprofen",
                diseases="cancer,diabetes",
                genes="BRAF,TP53",
                keywords="mutation,therapy",
                variants="V600E,R175H",
                include_preprints=True,
            )

            # The wrapper must delegate exactly once.
            patched.assert_called_once()

            # Each keyword argument is passed through verbatim.
            forwarded = patched.call_args[1]
            expected = {
                "call_benefit": "Testing search functionality",
                "chemicals": "aspirin,ibuprofen",
                "diseases": "cancer,diabetes",
                "genes": "BRAF,TP53",
                "keywords": "mutation,therapy",
                "variants": "V600E,R175H",
            }
            for key, value in expected.items():
                assert forwarded[key] == value
            assert forwarded["include_preprints"] is True
            # cBioPortal integration defaults to enabled.
            assert forwarded.get("include_cbioportal", True) is True

    @pytest.mark.asyncio
    async def test_article_searcher_with_lists(self):
        """List-valued parameters are passed through unchanged."""
        with patch(
            "biomcp.articles.search_optimized.article_searcher_optimized"
        ) as patched:
            patched.return_value = "## Results"

            await _article_searcher(
                call_benefit="Testing with lists",
                chemicals=["drug1", "drug2"],
                diseases=["disease1"],
                genes=["GENE1"],
                include_preprints=False,
            )

            forwarded = patched.call_args[1]
            assert forwarded["call_benefit"] == "Testing with lists"
            assert forwarded["chemicals"] == ["drug1", "drug2"]
            assert forwarded["diseases"] == ["disease1"]
            assert forwarded["genes"] == ["GENE1"]
            assert forwarded["include_preprints"] is False

    @pytest.mark.asyncio
    async def test_article_searcher_minimal_params(self):
        """The wrapper still works when only call_benefit is supplied."""
        with patch(
            "biomcp.articles.search_optimized.article_searcher_optimized"
        ) as patched:
            patched.return_value = "## No results"

            await _article_searcher(call_benefit="Minimal test")

            forwarded = patched.call_args[1]
            assert forwarded["call_benefit"] == "Minimal test"
            # Omitted search filters default to None.
            for field in (
                "chemicals",
                "diseases",
                "genes",
                "keywords",
                "variants",
            ):
                assert forwarded.get(field) is None

    @pytest.mark.asyncio
    async def test_article_searcher_empty_strings(self):
        """Empty-string filters are forwarded verbatim, not dropped."""
        with patch(
            "biomcp.articles.search_optimized.article_searcher_optimized"
        ) as patched:
            patched.return_value = "## Results"

            await _article_searcher(
                call_benefit="Empty string test",
                chemicals="",
                diseases="",
                genes="",
            )

            forwarded = patched.call_args[1]
            assert forwarded["call_benefit"] == "Empty string test"
            for field in ("chemicals", "diseases", "genes"):
                assert forwarded[field] == ""
```

--------------------------------------------------------------------------------
/docs/developer-guides/07-performance-optimizations.md:
--------------------------------------------------------------------------------

```markdown
# Performance Optimizations

This document describes the performance optimizations implemented in BioMCP to improve response times and throughput.

## Overview

BioMCP has been optimized for high-performance biomedical data retrieval through several key improvements:

- **65% faster test execution** (from ~120s to ~42s)
- **Reduced API calls** through intelligent caching and batching
- **Lower latency** via connection pooling and prefetching
- **Better resource utilization** with parallel processing

## Key Optimizations

### 1. Connection Pooling

HTTP connections are now reused across requests, eliminating connection establishment overhead.

**Configuration:**

- `BIOMCP_USE_CONNECTION_POOL` - Enable/disable pooling (default: "true")
- Automatically manages pools per event loop
- Graceful cleanup on shutdown

**Impact:** ~30% reduction in request latency for sequential operations

### 2. Parallel Test Execution

Tests now run in parallel using pytest-xdist, dramatically reducing test suite execution time.

**Usage:**

```bash
make test  # Automatically uses parallel execution
```

**Impact:** ~65% faster test execution (from ~120s to ~42s)

### 3. Request Batching

Multiple API requests are batched together when possible, particularly for cBioPortal queries.

**Features:**

- Automatic batching based on size/time thresholds
- Configurable batch size (default: 5 for cBioPortal)
- Error isolation per request

**Impact:** Up to 80% reduction in API calls for bulk operations

### 4. Smart Caching

Multiple caching layers optimize repeated queries:

- **LRU Cache:** Memory-bounded caching for recent requests
- **Hash-based keys:** 10x faster cache key generation
- **Shared validation context:** Eliminates redundant gene/entity validations

**Configuration:**

- Cache size: 1000 entries (configurable)
- TTL: 5-30 minutes depending on data type

### 5. Prefetching

Common entities are prefetched on startup to warm caches:

- Top genes: BRAF, EGFR, TP53, KRAS, etc.
- Common diseases: lung cancer, breast cancer, etc.
- Frequent chemicals: osimertinib, pembrolizumab, etc.

**Impact:** First queries for common entities are instant

### 6. Pagination Support

Europe PMC searches now use pagination for large result sets:

- Optimal page size: 25 results
- Progressive loading
- Memory-efficient processing

### 7. Conditional Metrics

Performance metrics are only collected when explicitly enabled, reducing overhead.

**Configuration:**

- `BIOMCP_METRICS_ENABLED` - Enable metrics (default: "false")

## Performance Benchmarks

### API Response Times

| Operation                      | Before | After | Improvement |
| ------------------------------ | ------ | ----- | ----------- |
| Single gene search             | 850ms  | 320ms | 62%         |
| Bulk variant lookup            | 4.2s   | 1.1s  | 74%         |
| Article search with cBioPortal | 2.1s   | 780ms | 63%         |

### Resource Usage

| Metric        | Before | After | Change |
| ------------- | ------ | ----- | ------ |
| Memory (idle) | 145MB  | 152MB | +5%    |
| Memory (peak) | 512MB  | 385MB | -25%   |
| CPU (avg)     | 35%    | 28%   | -20%   |

## Best Practices

1. **Keep connection pooling enabled** unless experiencing issues
2. **Use the unified search** methods to benefit from parallel execution
3. **Batch operations** when performing multiple lookups
4. **Monitor cache hit rates** in production environments

## Troubleshooting

### Connection Pool Issues

If experiencing connection errors:

1. Disable pooling: `export BIOMCP_USE_CONNECTION_POOL=false`
2. Check for firewall/proxy issues
3. Verify SSL certificates

### Memory Usage

If memory usage is high:

1. Reduce cache size in `request_cache.py`
2. Lower connection pool limits
3. Disable prefetching by removing the lifespan hook

### Performance Regression

To identify performance issues:

1. Enable metrics: `export BIOMCP_METRICS_ENABLED=true`
2. Check slow operations in logs
3. Profile with `py-spy` or similar tools

## Future Optimizations

Planned improvements include:

- GraphQL batching for complex queries
- Redis integration for distributed caching
- WebSocket support for real-time updates
- GPU acceleration for variant analysis

```

--------------------------------------------------------------------------------
/docs/tutorials/remote-connection.md:
--------------------------------------------------------------------------------

```markdown
# Connecting to Remote BioMCP

This guide walks you through connecting Claude to the remote BioMCP server, providing instant access to biomedical research tools without any local installation.

## Overview

The remote BioMCP server (https://remote.biomcp.org/mcp) provides cloud-hosted access to all BioMCP tools. This eliminates the need for local installation while maintaining full functionality.

!!! success "Benefits of Remote Connection"

    - **No Installation Required**: Start using BioMCP immediately
    - **Always Up-to-Date**: Automatically receive the latest features and improvements
    - **Cloud-Powered**: Leverage server-side resources for faster searches
    - **Secure Authentication**: Uses Google OAuth for secure access

!!! info "Privacy Notice"

    We log user emails and queries to improve the service. All data is handled according to our privacy policy.

## Step-by-Step Setup

### Step 1: Access Custom Connectors

Navigate to the **Custom Connectors** section in your Claude interface. This is where you'll configure the connection to BioMCP.

![Navigate to Custom Connectors](../assets/remote-connection/00_connectors.png)

### Step 2: Add Custom Connector

Click the **Add Custom Connector** button and enter the following details:

- **Name**: BioMCP
- **URL**: `https://remote.biomcp.org/mcp`

![Add Custom Connector Dialog](../assets/remote-connection/01_add_custom_connector.png)

### Step 3: Verify Connector is Enabled

After adding, you should see BioMCP listed with an "Enabled" status. This confirms the connector was added successfully.

![Connector Enabled Status](../assets/remote-connection/02_connector_enabled.png)

### Step 4: Connect to BioMCP

Return to the main Connectors section where you'll now see BioMCP available for connection. Click the **Connect** button.

![Connect to BioMCP](../assets/remote-connection/03_connect_to_biomcp.png)

### Step 5: Authenticate with Google

You'll be redirected to Google OAuth for authentication. Sign in with any valid Google account. This step ensures secure access to the service.

![Google OAuth Authentication](../assets/remote-connection/04_select_google_oauth.png)

!!! note "Authentication"

    - Any valid Google account works
    - Your email is logged for service improvement
    - Authentication is handled securely through Google OAuth

### Step 6: Connection Success

Once authenticated, you'll see a successful connection message displaying the available tool count. As of January 2025, there are 23 tools available (this number may increase as new features are added).

![Successful Connection](../assets/remote-connection/05_success_connect.png)

## Verifying Your Connection

After successful connection, you can verify BioMCP is working by asking Claude:

```
What tools do you have available from BioMCP?
```

Claude should list the available tools including:

- Article search and retrieval (PubMed/PubTator3)
- Clinical trials search (ClinicalTrials.gov and NCI)
- Variant analysis (MyVariant.info)
- Gene, drug, and disease information
- Sequential thinking for complex research

## Troubleshooting

### Connection Failed

- Ensure you entered the URL exactly as shown: `https://remote.biomcp.org/mcp`
- Check your internet connection
- Try disconnecting and reconnecting

### Authentication Issues

- Make sure you're using a valid Google account
- Clear your browser cache if authentication hangs
- Try using a different browser if issues persist

### Tools Not Available

- Disconnect and reconnect to BioMCP
- Refresh your Claude session
- Contact support if tools remain unavailable

## Next Steps

Now that you're connected to BioMCP, you can:

1. **Search biomedical literature**: "Find recent papers on BRAF mutations in melanoma"
2. **Analyze clinical trials**: "What trials are recruiting for lung cancer with EGFR mutations?"
3. **Interpret variants**: "What is the clinical significance of TP53 p.R273H?"
4. **Explore drug information**: "Tell me about pembrolizumab's mechanism and indications"

## Support

For issues or questions about the remote BioMCP connection:

- GitHub Issues: [https://github.com/genomoncology/biomcp/issues](https://github.com/genomoncology/biomcp/issues)
- Documentation: [https://biomcp.org](https://biomcp.org)

```

--------------------------------------------------------------------------------
/tests/config/test_smithery_config.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python
"""
Test script to validate Smithery configuration against actual function implementations.
This script checks that the schema definitions in smithery.yaml match the expected
function parameters in your codebase.
"""

import os
from pathlib import Path
from typing import Any

import pytest
import yaml
from pydantic import BaseModel

from biomcp.articles.search import PubmedRequest

# Import the functions we want to test
from biomcp.trials.search import TrialQuery
from biomcp.variants.search import VariantQuery


@pytest.fixture
def smithery_config():
    """Load and parse the project's smithery.yaml configuration.

    Returns:
        The parsed YAML document (typically a dict of tool definitions).
    """
    # smithery.yaml lives at the project root; this file sits two levels
    # below it (tests/config/ -> tests/ -> project root).
    config_path = Path(__file__).resolve().parents[2] / "smithery.yaml"

    with config_path.open() as f:
        return yaml.safe_load(f)


def test_smithery_config(smithery_config):
    """Every tool schema in smithery.yaml matches its function parameters."""
    # Tools that accept a Pydantic query model under a single 'query' param.
    model_tools = {
        "trial_searcher": TrialQuery,
        "variant_searcher": VariantQuery,
        "article_searcher": PubmedRequest,
    }
    # Tools that accept a single string identifier.
    string_tools = {
        "trial_protocol": "nct_id",
        "trial_locations": "nct_id",
        "trial_outcomes": "nct_id",
        "trial_references": "nct_id",
        "article_details": "pmid",
        "variant_details": "variant_id",
    }

    for tool_name, model in model_tools.items():
        validate_tool_schema(
            smithery_config,
            tool_name,
            {"param_name": "query", "expected_type": model},
        )

    for tool_name, param in string_tools.items():
        validate_tool_schema(
            smithery_config,
            tool_name,
            {"param_name": param, "expected_type": str},
        )


def validate_tool_schema(
    smithery_config, tool_name: str, param_info: dict[str, Any]
):
    """Validate that the tool schema in smithery.yaml matches the expected function parameter.

    Args:
        smithery_config: Parsed smithery.yaml contents.
        tool_name: Name of the tool entry to validate.
        param_info: Dict with "param_name" (str) and "expected_type" (type).

    Raises:
        AssertionError: If the tool, its input schema, its required
            parameters, or its schema reference do not match expectations.
    """
    param_name = param_info["param_name"]
    expected_type = param_info["expected_type"]

    # Pydantic models are exposed through a single required "query" parameter;
    # simple types are exposed under their own parameter name. Computing this
    # once removes the duplicated assertion branches.
    is_model = issubclass(expected_type, BaseModel)
    required_param = "query" if is_model else param_name

    # Check if the tool is defined in the smithery.yaml
    assert tool_name in smithery_config.get(
        "tools", {}
    ), f"Tool '{tool_name}' is not defined in smithery.yaml"

    tool_config = smithery_config["tools"][tool_name]

    # Check if the tool has an input schema
    assert (
        "input" in tool_config
    ), f"Tool '{tool_name}' does not have an input schema defined"

    input_schema = tool_config["input"].get("schema", {})

    # Check if the parameter is marked as required
    assert (
        "required" in input_schema
    ), f"Tool '{tool_name}' does not have required parameters specified"
    assert required_param in input_schema.get("required", []), (
        f"Parameter '{required_param}' for tool '{tool_name}' "
        "is not marked as required"
    )

    # For complex types (Pydantic models), check if the schema references the correct type
    if is_model:
        properties = input_schema.get("properties", {})
        assert (
            "query" in properties
        ), f"Tool '{tool_name}' does not have a 'query' property defined"

        query_prop = properties["query"]
        assert (
            "$ref" in query_prop
        ), f"Tool '{tool_name}' query property does not reference a schema"

        schema_ref = query_prop["$ref"]
        expected_schema_name = expected_type.__name__
        assert schema_ref.endswith(
            expected_schema_name
        ), f"Tool '{tool_name}' references incorrect schema: {schema_ref}, expected: {expected_schema_name}"
```

--------------------------------------------------------------------------------
/scripts/check_http_imports.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""Check for direct HTTP library imports outside of allowed files."""

import ast
import sys
from pathlib import Path

# HTTP libraries to check for
# HTTP libraries to check for (top-level module names; submodule imports
# such as `httpx.foo` also match via the split on ".")
HTTP_LIBRARIES: set[str] = {
    "httpx",
    "aiohttp",
    "requests",
    "urllib3",
}  # Note: urllib is allowed for URL parsing

# Files allowed to import HTTP libraries
ALLOWED_FILES: set[str] = {
    "http_client.py",
    "http_client_simple.py",
    "http_client_test.py",
    "test_http_client.py",
    "connection_pool.py",  # Connection pooling infrastructure
}

# Additional allowed patterns (for version checks, etc.)
# Maps (file name, library name) -> human-readable justification.
ALLOWED_PATTERNS: dict[tuple[str, str], str] = {
    # Allow httpx import just for version check
    ("health.py", "httpx"): "version check only",
}


def _check_import_node(
    node: ast.Import, file_name: str
) -> set[tuple[str, int]]:
    """Collect (library, line-number) violations from an ``import x`` node."""
    found: set[tuple[str, int]] = set()
    for imported in node.names:
        # Only the top-level package name matters (e.g. "httpx.foo" -> "httpx").
        top_level = imported.name.partition(".")[0]
        if top_level not in HTTP_LIBRARIES:
            continue
        # Skip explicitly whitelisted (file, library) pairs.
        if (file_name, top_level) in ALLOWED_PATTERNS:
            continue
        found.add((top_level, node.lineno))
    return found


def _check_import_from_node(
    node: ast.ImportFrom, file_name: str
) -> set[tuple[str, int]]:
    """Collect (library, line-number) violations from a ``from x import y`` node."""
    # Relative imports ("from . import x") have node.module set to None.
    if not node.module:
        return set()

    top_level = node.module.partition(".")[0]
    if top_level not in HTTP_LIBRARIES:
        return set()
    # Skip explicitly whitelisted (file, library) pairs.
    if (file_name, top_level) in ALLOWED_PATTERNS:
        return set()
    return {(top_level, node.lineno)}


def check_imports(file_path: Path) -> set[tuple[str, int]]:
    """Scan one Python file for direct HTTP library imports.

    Args:
        file_path: Path to the Python source file to inspect.

    Returns:
        Set of (library, line_number) tuples for each disallowed import;
        empty for allow-listed files. Parse/read errors are reported to
        stderr and any violations found so far are returned.
    """
    found: set[tuple[str, int]] = set()

    # Files on the allow-list are exempt from the check.
    if file_path.name in ALLOWED_FILES:
        return found

    try:
        source = file_path.read_text(encoding="utf-8")
        for node in ast.walk(ast.parse(source)):
            if isinstance(node, ast.Import):
                found |= _check_import_node(node, file_path.name)
            elif isinstance(node, ast.ImportFrom):
                found |= _check_import_from_node(node, file_path.name)
    except Exception as e:
        # Best-effort: report unreadable/unparseable files, keep scanning others.
        print(f"Error parsing {file_path}: {e}", file=sys.stderr)

    return found


def find_python_files(root_dir: Path) -> list[Path]:
    """Recursively collect Python files under ``root_dir``.

    Paths containing any hidden component (leading ".") or a common
    environment/cache directory (__pycache__, venv, env, .tox) are excluded.
    """
    skip_names = {"__pycache__", "venv", "env", ".tox"}

    def _excluded(candidate: Path) -> bool:
        # True if any path component is hidden or an environment/cache dir.
        return any(
            part.startswith(".") or part in skip_names
            for part in candidate.parts
        )

    return [path for path in root_dir.rglob("*.py") if not _excluded(path)]


def main():
    """Scan src/ for direct HTTP library imports and report violations.

    Returns:
        0 when the tree is clean, 1 when any violation is found.
    """
    # Project root is the parent of the scripts/ directory holding this file.
    project_root = Path(__file__).parent.parent
    src_dir = project_root / "src"

    flagged: list[tuple[Path, str, int]] = []
    for candidate in find_python_files(src_dir):
        for library, lineno in check_imports(candidate):
            flagged.append((candidate, library, lineno))

    if not flagged:
        print("✅ No direct HTTP library imports found outside allowed files.")
        return 0

    print("❌ Found direct HTTP library imports:\n")
    for path, library, lineno in sorted(flagged):
        rel_path = path.relative_to(project_root)
        print(f"  {rel_path}:{lineno} - imports '{library}'")

    print(f"\n❌ Total violations: {len(flagged)}")
    print(
        "\nPlease use the centralized HTTP client (biomcp.http_client) instead."
    )
    print(
        "If you need to add an exception, update ALLOWED_FILES or ALLOWED_PATTERNS in this script."
    )
    return 1


if __name__ == "__main__":
    sys.exit(main())

```

--------------------------------------------------------------------------------
/src/biomcp/variants/cbioportal_search_helpers.py:
--------------------------------------------------------------------------------

```python
"""Helper functions for cBioPortal search to reduce complexity."""

import logging
import re
from typing import Any

from .cbioportal_search import GeneHotspot

logger = logging.getLogger(__name__)


async def process_mutation_results(
    mutation_results: list[tuple[Any, str]],
    cancer_types_lookup: dict[str, dict[str, Any]],
    client: Any,
) -> dict[str, Any]:
    """Aggregate per-study mutation results into summary statistics.

    Args:
        mutation_results: List of (result, study_id) tuples; a result may be
            an Exception (e.g. from a gather with return_exceptions=True) or
            a dict with "mutations" and "sample_count" keys.
        cancer_types_lookup: Cancer type lookup dictionary.
        client: Client instance used to resolve each study's cancer type.

    Returns:
        Dictionary with keys: total_mutations (count of unique mutated
        samples across studies), total_samples, studies_with_data,
        hotspot_counts, and cancer_distribution.
    """
    total_mutations = 0
    total_samples = 0
    hotspot_counts: dict[str, dict[str, Any]] = {}
    cancer_distribution: dict[str, int] = {}
    studies_with_data = 0

    for result, study_id in mutation_results:
        # Per-study failures are tolerated; log and continue with the rest.
        if isinstance(result, Exception):
            logger.debug(f"Failed to get mutations for {study_id}: {result}")
            continue

        if not result or "mutations" not in result:
            continue

        mutations = result["mutations"]
        if not mutations:
            continue

        # Robustness fix: use .get() so a study payload missing
        # "sample_count" does not raise KeyError.
        sample_count = result.get("sample_count", 0)

        studies_with_data += 1
        # Count unique samples carrying at least one mutation.
        unique_samples = {
            m.get("sampleId") for m in mutations if m.get("sampleId")
        }
        total_mutations += len(unique_samples)
        total_samples += sample_count

        # Attribute hotspots and sample counts to the study's cancer type.
        study_cancer_type = await client._get_study_cancer_type(
            study_id, cancer_types_lookup
        )
        _update_hotspot_counts(mutations, hotspot_counts, study_cancer_type)
        _update_cancer_distribution(
            mutations, cancer_distribution, study_cancer_type
        )

    return {
        "total_mutations": total_mutations,
        "total_samples": total_samples,
        "studies_with_data": studies_with_data,
        "hotspot_counts": hotspot_counts,
        "cancer_distribution": cancer_distribution,
    }


def _update_hotspot_counts(
    mutations: list[dict[str, Any]],
    hotspot_counts: dict[str, dict[str, Any]],
    cancer_type: str,
) -> None:
    """Update hotspot counts from mutations."""
    for mut in mutations:
        protein_change = mut.get("proteinChange", "")
        if protein_change:
            if protein_change not in hotspot_counts:
                hotspot_counts[protein_change] = {
                    "count": 0,
                    "cancer_types": set(),
                }
            hotspot_counts[protein_change]["count"] += 1
            hotspot_counts[protein_change]["cancer_types"].add(cancer_type)


def _update_cancer_distribution(
    mutations: list[dict[str, Any]],
    cancer_distribution: dict[str, int],
    cancer_type: str,
) -> None:
    """Update cancer type distribution."""
    cancer_distribution[cancer_type] = cancer_distribution.get(
        cancer_type, 0
    ) + len({m.get("sampleId") for m in mutations if m.get("sampleId")})


def format_hotspots(
    hotspot_counts: dict[str, dict[str, Any]], total_mutations: int
) -> list[GeneHotspot]:
    """Build GeneHotspot objects for the five most frequent protein changes.

    Args:
        hotspot_counts: Mapping of protein change -> {"count", "cancer_types"}.
        total_mutations: Denominator used to compute each hotspot's frequency.

    Returns:
        Up to five GeneHotspot objects, ordered by descending count.
    """
    ranked = sorted(
        hotspot_counts.items(), key=lambda item: item[1]["count"], reverse=True
    )

    results: list[GeneHotspot] = []
    for protein_change, info in ranked[:5]:
        # Best-effort: take the first run of digits as the residue position.
        residue = 0
        try:
            digits = re.search(r"(\d+)", protein_change)
            if digits:
                residue = int(digits.group(1))
        except Exception:
            logger.debug("Failed to extract position from protein change")

        frequency = (
            info["count"] / total_mutations if total_mutations > 0 else 0.0
        )
        results.append(
            GeneHotspot(
                position=residue,
                amino_acid_change=protein_change,
                count=info["count"],
                frequency=frequency,
                cancer_types=list(info["cancer_types"]),
            )
        )

    return results

```

--------------------------------------------------------------------------------
/tests/tdd/workers/test_worker_sanitization.js:
--------------------------------------------------------------------------------

```javascript
/**
 * Tests for worker_entry_stytch.js sanitization functionality
 */

const { test } = require("node:test");
const assert = require("node:assert");

// Mock the sanitizeObject function for testing
// Field names (matched case-insensitively as substrings) whose values must
// never appear in logs or downstream output.
const SENSITIVE_FIELDS = [
  "api_key",
  "apiKey",
  "api-key",
  "token",
  "secret",
  "password",
];

// Recursively walk a value and replace every sensitive field's value with
// "[REDACTED]". Primitives pass through unchanged; arrays are mapped
// element-wise; nested objects are sanitized recursively.
const sanitizeObject = (obj) => {
  // Primitives, null, and undefined are returned as-is.
  if (!obj || typeof obj !== "object") return obj;

  if (Array.isArray(obj)) {
    return obj.map((element) => sanitizeObject(element));
  }

  const result = {};
  for (const [key, value] of Object.entries(obj)) {
    const normalizedKey = key.toLowerCase();
    const isSensitive = SENSITIVE_FIELDS.some((field) =>
      normalizedKey.includes(field.toLowerCase()),
    );
    if (isSensitive) {
      // Sensitive keys are redacted regardless of their value.
      result[key] = "[REDACTED]";
    } else if (typeof value === "object" && value !== null) {
      result[key] = sanitizeObject(value);
    } else {
      result[key] = value;
    }
  }
  return result;
};

// Test cases
// --- Redaction of top-level sensitive keys ---
test("should redact api_key field", () => {
  const input = {
    params: {
      arguments: {
        api_key: "AIzaSyB1234567890",
        gene: "BRAF",
        position: 140753336,
      },
    },
  };

  const result = sanitizeObject(input);
  assert.strictEqual(result.params.arguments.api_key, "[REDACTED]");
  assert.strictEqual(result.params.arguments.gene, "BRAF");
  assert.strictEqual(result.params.arguments.position, 140753336);
});

// --- Redaction must recurse into nested objects ---
test("should handle nested sensitive fields", () => {
  const input = {
    outer: {
      token: "secret-token",
      inner: {
        password: "my-password",
        apiKey: "another-key",
        safe_field: "visible",
      },
    },
  };

  const result = sanitizeObject(input);
  assert.strictEqual(result.outer.token, "[REDACTED]");
  assert.strictEqual(result.outer.inner.password, "[REDACTED]");
  assert.strictEqual(result.outer.inner.apiKey, "[REDACTED]");
  assert.strictEqual(result.outer.inner.safe_field, "visible");
});

// --- Arrays are sanitized element-wise ---
test("should handle arrays with sensitive data", () => {
  const input = {
    requests: [
      { api_key: "key1", data: "safe" },
      { api_key: "key2", data: "also safe" },
    ],
  };

  const result = sanitizeObject(input);
  assert.strictEqual(result.requests[0].api_key, "[REDACTED]");
  assert.strictEqual(result.requests[1].api_key, "[REDACTED]");
  assert.strictEqual(result.requests[0].data, "safe");
  assert.strictEqual(result.requests[1].data, "also safe");
});

// --- Key matching ignores case and separator style ---
test("should be case-insensitive for field names", () => {
  const input = {
    API_KEY: "uppercase",
    Api_Key: "mixed",
    "api-key": "hyphenated",
  };

  const result = sanitizeObject(input);
  assert.strictEqual(result.API_KEY, "[REDACTED]");
  assert.strictEqual(result.Api_Key, "[REDACTED]");
  assert.strictEqual(result["api-key"], "[REDACTED]");
});

// --- Non-sensitive payloads pass through untouched ---
test("should not modify non-sensitive fields", () => {
  const input = {
    gene: "TP53",
    chromosome: "chr17",
    position: 7577121,
    reference: "C",
    alternate: "T",
  };

  const result = sanitizeObject(input);
  assert.deepStrictEqual(result, input);
});

// --- Sensitive keys are redacted even when their value is null/undefined ---
test("should handle null and undefined values", () => {
  const input = {
    api_key: null,
    token: undefined,
    valid: "data",
  };

  const result = sanitizeObject(input);
  assert.strictEqual(result.api_key, "[REDACTED]");
  assert.strictEqual(result.token, "[REDACTED]");
  assert.strictEqual(result.valid, "data");
});

// --- Request routing: the "think" tool is identified by params.name ---
test("should handle think tool detection", () => {
  const thinkRequest = {
    params: {
      name: "think",
      arguments: {
        thought: "Analyzing the problem...",
        thoughtNumber: 1,
      },
    },
  };

  const toolName = thinkRequest.params?.name;
  assert.strictEqual(toolName, "think");
});

// --- Request routing: both "thinking" and "think" domains are accepted ---
test("should handle domain-based filtering", () => {
  const searchRequest1 = {
    params: {
      name: "search",
      arguments: {
        domain: "thinking",
        query: "some query",
      },
    },
  };

  const searchRequest2 = {
    params: {
      name: "search",
      arguments: {
        domain: "think",
        query: "some query",
      },
    },
  };

  const domain1 = searchRequest1.params?.arguments?.domain;
  const domain2 = searchRequest2.params?.arguments?.domain;

  assert.ok(domain1 === "thinking" || domain1 === "think");
  assert.ok(domain2 === "thinking" || domain2 === "think");
});

```

--------------------------------------------------------------------------------
/src/biomcp/cli/interventions.py:
--------------------------------------------------------------------------------

```python
"""CLI commands for intervention search and lookup."""

import asyncio
from typing import Annotated

import typer

from ..integrations.cts_api import CTSAPIError, get_api_key_instructions
from ..interventions import get_intervention, search_interventions
from ..interventions.getter import format_intervention_details
from ..interventions.search import (
    INTERVENTION_TYPES,
    format_intervention_results,
)

# Typer sub-application hosting the `biomcp intervention ...` commands.
intervention_app = typer.Typer(
    no_args_is_help=True,
    help="Search and retrieve intervention information from NCI CTS API",
)


@intervention_app.command("search")
def search_interventions_cli(
    name: Annotated[
        str | None,
        typer.Argument(
            help="Intervention name to search for (partial match supported)"
        ),
    ] = None,
    intervention_type: Annotated[
        str | None,
        typer.Option(
            "--type",
            help=f"Type of intervention. Options: {', '.join(INTERVENTION_TYPES)}",
            show_choices=True,
        ),
    ] = None,
    synonyms: Annotated[
        bool,
        typer.Option(
            "--synonyms/--no-synonyms",
            help="Include synonym matches in search",
        ),
    ] = True,
    page_size: Annotated[
        int,
        typer.Option(
            "--page-size",
            help="Number of results per page",
            min=1,
            max=100,
        ),
    ] = 20,
    page: Annotated[
        int,
        typer.Option(
            "--page",
            help="Page number",
            min=1,
        ),
    ] = 1,
    api_key: Annotated[
        str | None,
        typer.Option(
            "--api-key",
            help="NCI API key (overrides NCI_API_KEY env var)",
            envvar="NCI_API_KEY",
        ),
    ] = None,
) -> None:
    """
    Search for interventions (drugs, devices, procedures) in the NCI database.

    Examples:
        # Search by drug name
        biomcp intervention search pembrolizumab

        # Search by type
        biomcp intervention search --type Drug

        # Search for biologicals
        biomcp intervention search "CAR T" --type Biological

        # Search without synonyms
        biomcp intervention search imatinib --no-synonyms
    """
    try:
        # Run the async search to completion and render it as markdown.
        search_coro = search_interventions(
            name=name,
            intervention_type=intervention_type,
            synonyms=synonyms,
            page_size=page_size,
            page=page,
            api_key=api_key,
        )
        results = asyncio.run(search_coro)
        typer.echo(format_intervention_results(results))
    except CTSAPIError as exc:
        # A missing key gets setup instructions instead of a bare error.
        if "API key required" in str(exc):
            typer.echo(get_api_key_instructions())
        else:
            typer.echo(f"Error: {exc}", err=True)
        raise typer.Exit(1) from exc
    except Exception as exc:
        typer.echo(f"Unexpected error: {exc}", err=True)
        raise typer.Exit(1) from exc

@intervention_app.command("get")
def get_intervention_cli(
    intervention_id: Annotated[
        str,
        typer.Argument(help="Intervention ID"),
    ],
    api_key: Annotated[
        str | None,
        typer.Option(
            "--api-key",
            help="NCI API key (overrides NCI_API_KEY env var)",
            envvar="NCI_API_KEY",
        ),
    ] = None,
) -> None:
    """
    Get detailed information about a specific intervention.

    Example:
        biomcp intervention get INT123456
    """
    try:
        # Fetch the record synchronously and print it as formatted markdown.
        record = asyncio.run(
            get_intervention(
                intervention_id=intervention_id,
                api_key=api_key,
            )
        )
        typer.echo(format_intervention_details(record))
    except CTSAPIError as exc:
        # A missing key gets setup instructions instead of a bare error.
        if "API key required" in str(exc):
            typer.echo(get_api_key_instructions())
        else:
            typer.echo(f"Error: {exc}", err=True)
        raise typer.Exit(1) from exc
    except Exception as exc:
        typer.echo(f"Unexpected error: {exc}", err=True)
        raise typer.Exit(1) from exc


@intervention_app.command("types")
def list_intervention_types() -> None:
    """
    List all available intervention types.
    """
    # Assemble the whole listing first, then emit it in a single echo;
    # the joined text matches the previous line-by-line output exactly.
    sections = ["## Available Intervention Types\n"]
    sections.extend(f"- {int_type}" for int_type in INTERVENTION_TYPES)
    sections.append("\nUse these values with the --type option when searching.")
    typer.echo("\n".join(sections))

```

--------------------------------------------------------------------------------
/tests/tdd/test_pten_r173_search.py:
--------------------------------------------------------------------------------

```python
"""Test case demonstrating PTEN R173 search limitations."""

import asyncio
import json

import pytest

from biomcp.articles.search import PubmedRequest, search_articles


@pytest.mark.asyncio
async def test_pten_r173_search_limitations():
    """Demonstrate that current AND logic is too restrictive for finding PTEN R173 papers.

    Runs three live searches of decreasing restrictiveness and asserts the
    AND-combined query returns no more results than its relaxed variant.
    """

    # Test 1: Current approach with multiple keywords (all must match)
    request_restrictive = PubmedRequest(
        genes=["PTEN"], keywords=["R173", "Arg173"]
    )
    result_restrictive = await search_articles(
        request_restrictive, output_json=True
    )
    data_restrictive = json.loads(result_restrictive)

    # Test 2: Less restrictive approach (single keyword)
    request_less_restrictive = PubmedRequest(genes=["PTEN"], keywords=["R173"])
    result_less_restrictive = await search_articles(
        request_less_restrictive, output_json=True
    )
    data_less_restrictive = json.loads(result_less_restrictive)

    # Test 3: Alternative variant notations
    request_notation = PubmedRequest(genes=["PTEN"], keywords=["p.R173C"])
    result_notation = await search_articles(request_notation, output_json=True)
    data_notation = json.loads(result_notation)

    print("\nPTEN R173 Search Results:")
    print(
        f"1. PTEN + R173 + Arg173 (AND logic): {len(data_restrictive)} articles"
    )
    print(f"2. PTEN + R173 only: {len(data_less_restrictive)} articles")
    print(f"3. PTEN + p.R173C: {len(data_notation)} articles")

    # The restrictive search should find fewer results
    assert len(data_restrictive) <= len(data_less_restrictive)

    # Show some example articles found
    if data_less_restrictive:
        print("\nExample articles found with 'PTEN + R173':")
        for i, article in enumerate(data_less_restrictive[:5]):
            title = article.get("title", "No title")
            pmid = article.get("pmid", "N/A")
            year = article.get("pub_year", article.get("date", "N/A"))
            # pub_year may come back as an int; coerce to str before
            # slicing, otherwise `year[:4]` raises TypeError.
            print(f"{i + 1}. {title[:80]}... (PMID: {pmid}, Year: {str(year)[:4]})")


@pytest.mark.asyncio
async def test_specific_pten_papers_not_found():
    """Test that specific PTEN R173 papers mentioned by user are not found."""

    # Papers mentioned by user that should be found
    expected_papers = [
        "Mester et al 2018 Human Mutation",
        "Mighell et al 2020 AJHG",
        "Smith et al 2016 Proteins",
        "Smith et al 2019 AJHG",
        "Smith et al 2023 JPCB",
    ]

    # Search for Smith IN papers on PTEN
    request = PubmedRequest(keywords=["Smith IN", "PTEN"])
    result = await search_articles(request, output_json=True)
    data = json.loads(result)

    print(f"\nSmith IN + PTEN search found {len(data)} articles")

    # Keep articles whose title or abstract mentions the variant in either
    # one-letter (R173) or three-letter (Arg173) notation.
    r173_papers = [
        article
        for article in data
        if any(
            marker in article.get(field, "")
            for field in ("title", "abstract")
            for marker in ("R173", "Arg173")
        )
    ]

    print(f"Papers mentioning R173/Arg173: {len(r173_papers)}")

    # The issue: R173 might only be in full text, not abstract
    assert len(r173_papers) < len(
        expected_papers
    ), "Not all expected R173 papers are found"


def test_and_logic_explanation():
    """Document why AND logic causes issues for variant searches."""

    # This test exists purely to record the rationale in the test log.
    rationale = """
    Current search behavior:
    - Query: genes=['PTEN'], keywords=['R173', 'Arg173']
    - Translates to: "@GENE_PTEN AND R173 AND Arg173"
    - This requires ALL terms to be present

    Issues:
    1. Papers may use either "R173" OR "Arg173", not both
    2. Variant notations vary: "R173C", "p.R173C", "c.517C>T", etc.
    3. Specific mutation details may only be in full text, not abstract
    4. AND logic is too restrictive for synonym/variant searches

    Potential solutions:
    1. Implement OR logic within variant/keyword groups
    2. Add variant notation normalization
    3. Support multiple search strategies (AND vs OR)
    4. Consider full-text search capabilities
    """

    print(rationale)
    assert True  # Documentation-only test: always passes


if __name__ == "__main__":
    # Allow running this demonstration directly without pytest, in the
    # same order the tests appear above.
    for coro_fn in (
        test_pten_r173_search_limitations,
        test_specific_pten_papers_not_found,
    ):
        asyncio.run(coro_fn())
    test_and_logic_explanation()

```

--------------------------------------------------------------------------------
/src/biomcp/interventions/getter.py:
--------------------------------------------------------------------------------

```python
"""Get specific intervention details via NCI CTS API."""

import logging
from typing import Any

from ..constants import NCI_INTERVENTIONS_URL
from ..integrations.cts_api import CTSAPIError, make_cts_request

logger = logging.getLogger(__name__)


async def get_intervention(
    intervention_id: str,
    api_key: str | None = None,
) -> dict[str, Any]:
    """
    Get detailed information about a specific intervention.

    Args:
        intervention_id: Intervention ID
        api_key: Optional API key (if not provided, uses NCI_API_KEY env var)

    Returns:
        Dictionary with intervention details

    Raises:
        CTSAPIError: If the API request fails or intervention not found
    """
    try:
        response = await make_cts_request(
            url=f"{NCI_INTERVENTIONS_URL}/{intervention_id}",
            api_key=api_key,
        )

        # The API may nest the payload under "data" or "intervention";
        # otherwise return the response as-is.
        for envelope_key in ("data", "intervention"):
            if envelope_key in response:
                return response[envelope_key]
        return response

    except CTSAPIError:
        # Already the caller-facing error type; propagate untouched.
        raise
    except Exception as exc:
        logger.error(f"Failed to get intervention {intervention_id}: {exc}")
        raise CTSAPIError(f"Failed to retrieve intervention: {exc!s}") from exc


def _format_intervention_header(intervention: dict[str, Any]) -> list[str]:
    """Format intervention header and basic info."""
    int_id = intervention.get(
        "id", intervention.get("intervention_id", "Unknown")
    )
    name = intervention.get("name", "Unknown Intervention")
    int_type = intervention.get(
        "type", intervention.get("category", "Unknown")
    )

    return [
        f"## Intervention: {name}",
        "",
        "### Basic Information",
        f"- **ID**: {int_id}",
        f"- **Type**: {int_type}",
    ]


def _format_intervention_synonyms(synonyms: Any) -> list[str]:
    """Format intervention synonyms section."""
    if not synonyms:
        return []

    lines = ["", "### Synonyms"]
    if isinstance(synonyms, list):
        for syn in synonyms:
            lines.append(f"- {syn}")
    else:
        lines.append(f"- {synonyms}")

    return lines


def _format_intervention_regulatory(intervention: dict[str, Any]) -> list[str]:
    """Format regulatory information section."""
    if not intervention.get("fda_approved"):
        return []

    lines = [
        "",
        "### Regulatory Status",
        f"- **FDA Approved**: {'Yes' if intervention['fda_approved'] else 'No'}",
    ]

    if intervention.get("approval_date"):
        lines.append(f"- **Approval Date**: {intervention['approval_date']}")

    return lines


def _format_intervention_indications(indications: Any) -> list[str]:
    """Format clinical indications section."""
    if not indications:
        return []

    lines = ["", "### Clinical Indications"]
    if isinstance(indications, list):
        for indication in indications:
            lines.append(f"- {indication}")
    else:
        lines.append(f"- {indications}")

    return lines


def format_intervention_details(intervention: dict[str, Any]) -> str:
    """
    Format intervention details as markdown.

    Args:
        intervention: Intervention data dictionary

    Returns:
        Formatted markdown string
    """
    lines = _format_intervention_header(intervention)
    lines += _format_intervention_synonyms(intervention.get("synonyms", []))

    # Free-text sections, included only when populated.
    for key, heading in (
        ("description", "### Description"),
        ("mechanism_of_action", "### Mechanism of Action"),
    ):
        if intervention.get(key):
            lines += ["", heading, intervention[key]]

    lines += _format_intervention_regulatory(intervention)
    lines += _format_intervention_indications(intervention.get("indications"))

    # Related-trials count, when the API supplies one.
    if intervention.get("trial_count"):
        lines += [
            "",
            "### Clinical Trial Activity",
            f"- **Number of Trials**: {intervention['trial_count']}",
        ]

    return "\n".join(lines)

```

--------------------------------------------------------------------------------
/src/biomcp/thinking/session.py:
--------------------------------------------------------------------------------

```python
"""Session management for sequential thinking."""

import uuid
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any


@dataclass
class ThoughtEntry:
    """Represents a single thought in the thinking process."""

    thought: str  # Free-text content of the thought
    thought_number: int  # Position of this thought in the sequence
    total_thoughts: int  # Expected total number of thoughts in the sequence
    next_thought_needed: bool  # Whether another thought should follow
    timestamp: datetime = field(default_factory=datetime.now)  # Creation time
    is_revision: bool = False  # True when this entry replaces an earlier thought
    revises_thought: int | None = None  # thought_number being revised, if any
    branch_from_thought: int | None = None  # presumably the thought a branch forked from — not read in this module
    branch_id: str | None = None  # Branch this entry is routed to, if any
    metadata: dict[str, Any] = field(default_factory=dict)  # Arbitrary extra data


@dataclass
class ThinkingSession:
    """Manages state for a thinking session."""

    session_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    created_at: datetime = field(default_factory=datetime.now)
    thought_history: list[ThoughtEntry] = field(default_factory=list)
    thought_branches: dict[str, list[ThoughtEntry]] = field(
        default_factory=lambda: defaultdict(list)
    )
    metadata: dict[str, Any] = field(default_factory=dict)

    def add_thought(self, entry: ThoughtEntry) -> None:
        """Record a thought, replacing the original when it is a revision."""
        if entry.is_revision and entry.revises_thought:
            # A revision overwrites the revised thought in place; only the
            # main history is searched for the target.
            for index, existing in enumerate(self.thought_history):
                if existing.thought_number == entry.revises_thought:
                    self.thought_history[index] = entry
                    return

        # Branch thoughts are kept apart from the main history.
        target = (
            self.thought_branches[entry.branch_id]
            if entry.branch_id
            else self.thought_history
        )
        target.append(entry)

    def get_thought(self, thought_number: int) -> ThoughtEntry | None:
        """Return the main-history thought with the given number, if any."""
        return next(
            (
                entry
                for entry in self.thought_history
                if entry.thought_number == thought_number
            ),
            None,
        )

    def get_branch_thoughts(self, branch_id: str) -> list[ThoughtEntry]:
        """Get all thoughts in a specific branch (empty list if unknown)."""
        return self.thought_branches.get(branch_id, [])

    def get_all_thoughts(self) -> list[ThoughtEntry]:
        """Return every thought (main history plus branches) by timestamp."""
        combined = list(self.thought_history)
        for branch_entries in self.thought_branches.values():
            combined += branch_entries
        combined.sort(key=lambda entry: entry.timestamp)
        return combined


class SessionManager:
    """Manages multiple thinking sessions."""

    def __init__(self):
        # All sessions keyed by their session_id.
        self.sessions: dict[str, ThinkingSession] = {}
        # The most recently created or activated session, if any.
        self._current_session_id: str | None = None

    def create_session(self) -> ThinkingSession:
        """Create, register, and activate a brand-new session."""
        session = ThinkingSession()
        self.sessions[session.session_id] = session
        self._current_session_id = session.session_id
        return session

    def get_session(
        self, session_id: str | None = None
    ) -> ThinkingSession | None:
        """Look up a session by ID, falling back to the current session."""
        lookup_id = session_id or self._current_session_id
        return self.sessions.get(lookup_id) if lookup_id else None

    def get_or_create_session(
        self, session_id: str | None = None
    ) -> ThinkingSession:
        """Return the requested/current session, creating one if needed."""
        if session_id and session_id in self.sessions:
            # Activating an existing session makes it the current one.
            self._current_session_id = session_id
            return self.sessions[session_id]

        return self.get_session() or self.create_session()

    def clear_session(self, session_id: str | None = None) -> None:
        """Remove a specific session (or the current one) if it exists."""
        target = session_id or self._current_session_id
        if target is None:
            return
        self.sessions.pop(target, None)
        if self._current_session_id == target:
            self._current_session_id = None

    def clear_all_sessions(self) -> None:
        """Drop every session and reset the current-session pointer."""
        self.sessions.clear()
        self._current_session_id = None


# Global session manager instance; module-level singleton shared by importers.
_session_manager = SessionManager()

```

--------------------------------------------------------------------------------
/.github/workflows/ci.yml:
--------------------------------------------------------------------------------

```yaml
name: CI

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]
  workflow_dispatch:

env:
  PYTHON_VERSION: "3.12"
  UV_VERSION: "0.4.29"

jobs:
  # Quality check from main.yml - uses make check
  quality:
    runs-on: ubuntu-latest
    name: Quality
    steps:
      - name: Check out
        uses: actions/checkout@v5

      - uses: actions/cache@v4
        with:
          path: ~/.cache/pre-commit
          key: pre-commit-${{ hashFiles('.pre-commit-config.yaml') }}

      - name: Set up Python
        uses: actions/setup-python@v6
        with:
          python-version: ${{ env.PYTHON_VERSION }}

      - name: Install uv
        uses: astral-sh/setup-uv@v6
        with:
          version: ${{ env.UV_VERSION }}

      - name: Install dependencies
        run: |
          uv sync --group dev

      - name: Run checks
        run: make check

  # Tests and type check specifically on Python 3.11
  tests-and-type-check:
    runs-on: ubuntu-latest
    name: Tests and Type Check (Python 3.11)
    steps:
      - name: Check out
        uses: actions/checkout@v5

      - name: Set up Python
        uses: actions/setup-python@v6
        with:
          python-version: "3.11"

      - name: Install uv
        uses: astral-sh/setup-uv@v6
        with:
          version: ${{ env.UV_VERSION }}

      - name: Install dependencies
        run: |
          uv sync --group dev

      - name: Run tests
        run: uv run python -m pytest tests -m "not integration" --cov --cov-config=pyproject.toml --cov-report=xml

      - name: Check typing
        run: uv run mypy

      - name: Upload coverage reports to Codecov with GitHub Action on Python 3.11
        uses: codecov/codecov-action@v5

  # Documentation check from main.yml
  check-docs:
    runs-on: ubuntu-latest
    name: Check Docs
    steps:
      - name: Check out
        uses: actions/checkout@v5

      - name: Set up Python
        uses: actions/setup-python@v6
        with:
          python-version: ${{ env.PYTHON_VERSION }}

      - name: Install uv
        uses: astral-sh/setup-uv@v6
        with:
          version: ${{ env.UV_VERSION }}

      - name: Install dependencies
        run: |
          uv sync --group dev

      - name: Check if documentation can be built
        run: uv run mkdocs build -s

  # Build package check
  build-package:
    runs-on: ubuntu-latest
    name: Build Package
    steps:
      - uses: actions/checkout@v5

      - name: Set up Python
        uses: actions/setup-python@v6
        with:
          python-version: ${{ env.PYTHON_VERSION }}

      - name: Install uv
        uses: astral-sh/setup-uv@v6
        with:
          version: ${{ env.UV_VERSION }}

      - name: Build package
        run: |
          uvx --from build pyproject-build --installer uv

      - name: Check package
        run: |
          uvx twine check dist/*

      - name: Upload artifacts
        uses: actions/upload-artifact@v4
        with:
          name: dist
          path: dist/

  # MCP integration test - quick check
  test-mcp:
    runs-on: ubuntu-latest
    name: Test MCP Integration
    steps:
      - uses: actions/checkout@v5

      - name: Set up Python
        uses: actions/setup-python@v6
        with:
          python-version: ${{ env.PYTHON_VERSION }}

      - name: Install uv
        uses: astral-sh/setup-uv@v6
        with:
          version: ${{ env.UV_VERSION }}

      - name: Test MCP server startup
        run: |
          # Exit code 124 means `timeout` killed a healthy long-running
          # server; 0 means it exited cleanly on its own. Anything else is
          # a genuine failure. `code` must be initialized: the previous
          # one-liner left it unset on success and only passed because
          # bash's [[ ]] coerces an empty operand to 0.
          code=0
          timeout 10s uv run biomcp run || code=$?
          if [[ $code -ne 124 && $code -ne 0 ]]; then exit $code; fi

      - name: Run MCP integration tests
        run: |
          uv run python -m pytest tests/tdd/test_mcp_integration.py -v

  # Run integration tests separately - allowed to fail
  integration-tests:
    runs-on: ubuntu-latest
    name: Integration Tests (Optional)
    continue-on-error: true
    steps:
      - name: Check out
        uses: actions/checkout@v5

      - name: Set up Python
        uses: actions/setup-python@v6
        with:
          python-version: "3.11"

      - name: Install uv
        uses: astral-sh/setup-uv@v6
        with:
          version: ${{ env.UV_VERSION }}

      - name: Install dependencies
        run: |
          uv sync --group dev

      - name: Run integration tests
        run: |
          uv run python -m pytest tests -m "integration" -v --tb=short
        continue-on-error: true
```

--------------------------------------------------------------------------------
/docs/backend-services-reference/03-cbioportal.md:
--------------------------------------------------------------------------------

```markdown
# cBioPortal Integration

BioMCP integrates with [cBioPortal](https://www.cbioportal.org/), a comprehensive cancer genomics portal that provides visualization and analysis tools for large-scale cancer genomics datasets.

## Overview

The cBioPortal integration enhances article searches by automatically including relevant cancer genomics data when searching for genes. This integration provides:

1. **Gene-level summaries** - Mutation frequency and distribution across cancer studies
2. **Mutation-specific searches** - Find studies containing specific mutations (e.g., BRAF V600E)
3. **Cancer type resolution** - Accurate cancer type categorization using cBioPortal's API

## How It Works

### Automatic Integration

When you search for articles with a gene parameter, BioMCP automatically queries cBioPortal to provide additional context:

```python
# Basic gene search includes cBioPortal summary
search(domain="article", genes=["BRAF"], diseases=["melanoma"])
```

This returns:

- Standard PubMed/PubTator3 article results
- cBioPortal summary showing mutation frequency across cancer studies
- Top cancer types where the gene is mutated

### Mutation-Specific Searches

To search for specific mutations, include the mutation notation in keywords:

```python
# Search for BRAF V600E mutation
search(domain="article", genes=["BRAF"], keywords=["V600E"])

# Search for SRSF2 F57Y mutation
search(domain="article", genes=["SRSF2"], keywords=["F57Y"])

# Use wildcards for mutation patterns (e.g., any amino acid at position 57)
search(domain="article", genes=["SRSF2"], keywords=["F57*"])
```

Mutation-specific searches return:

- Total number of studies in cBioPortal
- Number of studies containing the mutation
- Top studies ranked by mutation count
- Cancer type distribution

## Example Output

### Gene-Level Summary

```
### cBioPortal Summary for BRAF
- **Mutation Frequency**: 76.7% (368 mutations in 480 samples)
- **Top Cancer Types**: Melanoma (45%), Thyroid (23%), Colorectal (18%)
- **Top Mutations**: V600E (89%), V600K (7%), G469A (2%)
```

### Mutation-Specific Results

```
### cBioPortal Mutation Search: BRAF
**Specific Mutation**: V600E
- **Total Studies**: 2340
- **Studies with Mutation**: 170
- **Total Mutations Found**: 5780

**Top Studies by Mutation Count:**
| Count | Study ID | Cancer Type | Study Name |
|-------|----------|-------------|------------|
|   804 | msk_met_2021 | Mixed Cancer Types | MSK MetTropism (MSK, Cell 2021) |
|   555 | msk_chord_2024 | Mixed Cancer Types | MSK-CHORD (MSK, Nature 2024) |
|   295 | msk_impact_2017 | Mixed Cancer Types | MSK-IMPACT Clinical Sequencing Cohort |
```

## Supported Mutation Notations

The integration recognizes standard protein change notation:

- **Specific mutations**: `V600E`, `F57Y`, `T790M`
- **Wildcard patterns**: `F57*` (matches F57Y, F57L, etc.)
- **Multiple mutations**: Include multiple keywords for OR search

## API Details

### Endpoints Used

1. **Gene Information**: `/api/genes/{gene}`
2. **Cancer Types**: `/api/cancer-types`
3. **Mutation Data**: `/api/mutations/fetch`
4. **Study Information**: `/api/studies`

### Rate Limiting

- Conservative rate limit of 5 requests/second
- Results cached for 15-30 minutes (mutations) or 24 hours (cancer types)

### Authentication

Optional authentication via environment variable:

```bash
export CBIO_TOKEN="your-api-token"
```

The public cBioPortal instance works without authentication, but unauthenticated requests may be subject to stricter rate limits.

## CLI Usage

For detailed command-line options for searching articles with cBioPortal integration, see the [CLI User Guide](../user-guides/01-command-line-interface.md#article-commands).

## Performance Considerations

1. **Caching**: Results are cached to minimize API calls

   - Gene summaries: 15 minutes
   - Mutation searches: 30 minutes
   - Cancer types: 24 hours

2. **Graceful Degradation**: If cBioPortal is unavailable, searches continue without the additional data

3. **Parallel Processing**: API calls are made in parallel with article searches for optimal performance

## Limitations

1. Only works with valid HUGO gene symbols
2. Mutation searches require exact protein change notation
3. Limited to mutations in cBioPortal's curated studies
4. Rate limits may apply for high-volume usage

## Error Handling

The integration handles various error scenarios:

- Invalid gene symbols are validated before API calls
- Network timeouts fall back to article-only results
- API errors are logged but don't block search results

```

--------------------------------------------------------------------------------
/src/biomcp/utils/cancer_types_api.py:
--------------------------------------------------------------------------------

```python
"""Cancer type utilities using cBioPortal API."""

import logging

from ..utils.cbio_http_adapter import CBioHTTPAdapter
from ..utils.request_cache import request_cache

logger = logging.getLogger(__name__)


class CancerTypeAPIClient:
    """Client for fetching cancer types from cBioPortal API.

    NOTE(review): results are cached at three layers — the ``request_cache``
    decorator, the per-instance ``_cancer_types_cache``, and the HTTP
    adapter's ``cache_ttl`` — confirm which layer is authoritative before
    changing any TTL.
    """

    def __init__(self):
        """Initialize the cancer type API client."""
        self.http_adapter = CBioHTTPAdapter()
        # Cache for cancer types (lower-cased id -> display name);
        # populated lazily on first successful fetch.
        self._cancer_types_cache: dict[str, str] | None = None

    @request_cache(ttl=86400)  # Cache for 24 hours
    async def get_all_cancer_types(self) -> dict[str, str]:
        """Fetch all cancer types from cBioPortal API.

        Returns:
            Dictionary mapping cancer type IDs to display names.
            Keys are lower-cased IDs and short names; an empty dict is
            returned on any API error.
        """
        if self._cancer_types_cache is not None:
            return self._cancer_types_cache

        try:
            cancer_types, error = await self.http_adapter.get(
                "/cancer-types",
                endpoint_key="cbioportal_cancer_types",
                cache_ttl=86400,  # 24 hours
            )

            if error:
                logger.error(f"Failed to fetch cancer types: {error.message}")
                return {}

            if cancer_types:
                # Build mapping from ID to name
                result = {}
                for ct in cancer_types:
                    cancer_type_id = ct.get("cancerTypeId", "")
                    name = ct.get("name", "")

                    if cancer_type_id and name:
                        result[cancer_type_id.lower()] = name

                        # Also add common abbreviations
                        short_name = ct.get("shortName", "")
                        if short_name and short_name != cancer_type_id:
                            result[short_name.lower()] = name

                self._cancer_types_cache = result
                logger.info(f"Loaded {len(result)} cancer types from API")
                return result

            # Successful response but no payload: treat as no known types.
            return {}

        except Exception as e:
            # Best-effort lookup: log and degrade to an empty mapping
            # rather than propagating the failure to callers.
            logger.error(f"Error fetching cancer types: {e}")
            return {}

    async def get_cancer_type_name(self, cancer_type_id: str) -> str:
        """Get the display name for a cancer type ID.

        Args:
            cancer_type_id: The cancer type identifier

        Returns:
            Display name or the original ID if not found
        """
        if not cancer_type_id:
            return "Unknown"

        cancer_types = await self.get_all_cancer_types()

        # Try exact match (case-insensitive)
        normalized_id = cancer_type_id.lower()
        if normalized_id in cancer_types:
            return cancer_types[normalized_id]

        # If not found, return the original ID with title case
        # (only title-case IDs that were entirely lower-case; mixed-case
        # inputs are returned untouched).
        if cancer_type_id == cancer_type_id.lower():
            return cancer_type_id.title()
        return cancer_type_id

    @request_cache(ttl=3600)  # Cache for 1 hour
    async def get_study_cancer_type(self, study_id: str) -> str:
        """Get cancer type for a specific study.

        Args:
            study_id: The study identifier

        Returns:
            Cancer type name or "Unknown"
        """
        try:
            study_data, error = await self.http_adapter.get(
                f"/studies/{study_id}",
                endpoint_key="cbioportal_studies",
                cache_ttl=3600,  # 1 hour
            )

            if error or not study_data:
                logger.debug(f"Study {study_id} not found")
                return "Unknown"

            # Prefer resolving the ID through the cancer-type mapping.
            cancer_type_id = study_data.get("cancerType", {}).get(
                "cancerTypeId", ""
            )

            if cancer_type_id and cancer_type_id != "unknown":
                return await self.get_cancer_type_name(cancer_type_id)

            # Fallback to the cancer type name directly
            cancer_type_name = study_data.get("cancerType", {}).get("name", "")
            if cancer_type_name:
                return cancer_type_name

            return "Unknown"

        except Exception as e:
            logger.debug(f"Error fetching study {study_id}: {e}")
            return "Unknown"


# Global instance for reuse; created lazily by get_cancer_type_client().
_cancer_type_client: CancerTypeAPIClient | None = None


def get_cancer_type_client() -> CancerTypeAPIClient:
    """Get or create the global cancer type client.

    Lazy module-level singleton so all callers share one HTTP adapter
    and its caches. Not guarded by a lock; safe under a single event
    loop, but concurrent first calls from threads could race — NOTE(review).
    """
    global _cancer_type_client
    if _cancer_type_client is None:
        _cancer_type_client = CancerTypeAPIClient()
    return _cancer_type_client

```

--------------------------------------------------------------------------------
/tests/tdd/utils/test_mutation_filter.py:
--------------------------------------------------------------------------------

```python
"""Tests for mutation filter utility."""

from biomcp.utils.mutation_filter import MutationFilter


class MockMutation:
    """Minimal stand-in for a mutation record used by these tests."""

    def __init__(self, protein_change: str):
        # Only the single attribute the filter inspects is modelled.
        self.protein_change = protein_change


class TestMutationFilter:
    """Unit tests for MutationFilter matching and list filtering."""

    def test_specific_mutation_filter(self):
        """A specific mutation matches only its exact notation."""
        flt = MutationFilter(specific_mutation="V600E")

        assert flt.matches("V600E")
        assert not flt.matches("V600K")
        assert not flt.matches("V600")
        assert not flt.matches("")

    def test_wildcard_pattern_filter(self):
        """A trailing-* pattern matches any suffix at that position."""
        flt = MutationFilter(pattern="V600*")

        for hit in ("V600E", "V600K", "V600D"):
            assert flt.matches(hit)
        for miss in ("V601E", "K600E"):
            assert not flt.matches(miss)

    def test_pattern_without_wildcard(self):
        """Without *, a pattern is an exact regex match (no prefixing)."""
        flt = MutationFilter(pattern="F57")

        # Exact match works.
        assert flt.matches("F57")
        # Prefix forms are rejected when no wildcard is present.
        assert not flt.matches("F57Y")
        assert not flt.matches("F57L")
        assert not flt.matches("F58Y")

    def test_no_filter(self):
        """With no criteria configured, any non-empty change matches."""
        flt = MutationFilter()

        assert flt.matches("V600E")
        assert flt.matches("anything")
        # An empty protein change never matches, even unfiltered.
        assert not flt.matches("")

    def test_filter_mutations_list(self):
        """filter_mutations keeps only entries whose protein_change matches."""
        mutations = [
            MockMutation(pc)
            for pc in ("V600E", "V600K", "V600D", "T790M", "L858R")
        ]

        # A specific mutation keeps exactly one entry.
        specific = MutationFilter(specific_mutation="V600E")
        kept = specific.filter_mutations(mutations)
        assert len(kept) == 1
        assert kept[0].protein_change == "V600E"

        # A wildcard pattern keeps the three V600 variants.
        wildcard = MutationFilter(pattern="V600*")
        kept = wildcard.filter_mutations(mutations)
        assert len(kept) == 3
        assert all(m.protein_change.startswith("V600") for m in kept)

        # No filter passes everything through.
        passthrough = MutationFilter()
        assert len(passthrough.filter_mutations(mutations)) == 5

    def test_string_representations(self):
        """str()/repr() reflect the active filter configuration."""
        specific = MutationFilter(specific_mutation="V600E")
        assert str(specific) == "MutationFilter(specific=V600E)"
        assert (
            repr(specific)
            == "MutationFilter(specific_mutation='V600E', pattern=None)"
        )

        assert (
            str(MutationFilter(pattern="V600*"))
            == "MutationFilter(pattern=V600*)"
        )
        assert str(MutationFilter()) == "MutationFilter(no_filter)"

    def test_edge_cases(self):
        """Unusual inputs must be handled without raising."""
        # Empty / missing protein change.
        flt = MutationFilter(specific_mutation="V600E")
        assert not flt.matches("")
        assert not flt.matches(None)

        # Regex metacharacters in the pattern are escaped, so a character
        # class does not behave as one -- but it must not crash either.
        bracket = MutationFilter(pattern="[VL]600*")
        assert not bracket.matches("V600E")

    def test_filter_mutations_preserves_type(self):
        """The filtered result is always a plain list."""
        mutations = [MockMutation("V600E"), MockMutation("V600K")]

        result = MutationFilter(pattern="V600*").filter_mutations(mutations)

        assert isinstance(result, list)
        assert len(result) == 2

```

--------------------------------------------------------------------------------
/src/biomcp/variants/getter.py:
--------------------------------------------------------------------------------

```python
"""Getter module for retrieving variant details."""

import json
import logging
from typing import Annotated

from .. import ensure_list, http_client, render
from ..constants import DEFAULT_ASSEMBLY, MYVARIANT_GET_URL
from .external import ExternalVariantAggregator, format_enhanced_annotations
from .filters import filter_variants
from .links import inject_links

logger = logging.getLogger(__name__)


async def get_variant(
    variant_id: str,
    output_json: bool = False,
    include_external: bool = False,
    assembly: str = DEFAULT_ASSEMBLY,
) -> str:
    """
    Get variant details from MyVariant.info using the variant identifier.

    The identifier can be a full HGVS-style string (e.g. "chr7:g.140453136A>T")
    or an rsID (e.g. "rs113488022").

    Args:
        variant_id: Variant identifier (HGVS or rsID)
        output_json: Return JSON format if True, else Markdown
        include_external: Include external annotations (TCGA, 1000 Genomes, cBioPortal)
        assembly: Genome assembly (hg19 or hg38), defaults to hg19

    Returns:
        Formatted variant data as JSON or Markdown string. On API failure the
        result contains a single entry describing the error instead.
    """
    response, error = await http_client.request_api(
        url=f"{MYVARIANT_GET_URL}/{variant_id}",
        request={"fields": "all", "assembly": assembly},
        method="GET",
        domain="myvariant",
    )

    if error:
        # Short-circuit on API errors; no point post-processing the response.
        data_to_return: list = [
            {"error": f"Error {error.code}: {error.message}"}
        ]
    else:
        data_to_return = ensure_list(response)

        # Inject database links into the variant data, then filter fields.
        data_to_return = inject_links(data_to_return)
        data_to_return = filter_variants(data_to_return)

        # Add external annotations if requested
        if include_external and data_to_return:
            logger.info(
                f"Adding external annotations for {len(data_to_return)} variants"
            )
            aggregator = ExternalVariantAggregator()

            for i, variant_data in enumerate(data_to_return):
                logger.info(
                    f"Processing variant {i}: keys={list(variant_data.keys())}"
                )
                # Get enhanced annotations from the external sources.
                enhanced = await aggregator.get_enhanced_annotations(
                    variant_id,
                    include_tcga=True,
                    include_1000g=True,
                    include_cbioportal=True,
                    variant_data=variant_data,
                )

                # Merge the formatted annotations into the variant record.
                formatted = format_enhanced_annotations(enhanced)
                logger.info(
                    f"Formatted external annotations: {formatted['external_annotations'].keys()}"
                )
                variant_data.update(formatted["external_annotations"])

    if output_json:
        return json.dumps(data_to_return, indent=2)
    return render.to_markdown(data_to_return)


async def _variant_details(
    call_benefit: Annotated[
        str,
        "Define and summarize why this function is being called and the intended benefit",
    ],
    variant_id: str,
    include_external: Annotated[
        bool,
        "Include annotations from external sources (TCGA, 1000 Genomes, cBioPortal)",
    ] = True,
    assembly: Annotated[
        str,
        "Genome assembly (hg19 or hg38). Default: hg19",
    ] = DEFAULT_ASSEMBLY,
) -> str:
    """
    Retrieves detailed information for a *single* genetic variant.

    Parameters:
    - call_benefit: Define and summarize why this function is being called and the intended benefit
    - variant_id: A variant identifier ("chr7:g.140453136A>T")
    - include_external: Include annotations from TCGA, 1000 Genomes, cBioPortal, and Mastermind
    - assembly: Genome assembly (hg19 or hg38). Default: hg19

    Process: Queries the MyVariant.info GET endpoint, optionally fetching
            additional annotations from external databases
    Output: A Markdown formatted string containing comprehensive
            variant annotations (genomic context, frequencies,
            predictions, clinical data, external annotations). Returns error if invalid.
    Note: Use the variant_searcher to find the variant id first.
    """
    # Delegate to get_variant, forcing Markdown output for the MCP tool.
    markdown = await get_variant(
        variant_id,
        output_json=False,
        include_external=include_external,
        assembly=assembly,
    )
    return markdown

```

--------------------------------------------------------------------------------
/src/biomcp/integrations/cts_api.py:
--------------------------------------------------------------------------------

```python
"""NCI Clinical Trials Search API integration helper."""

import json
import logging
import os
from typing import Any, Literal

from ..constants import NCI_API_KEY_ENV
from ..http_client import request_api

logger = logging.getLogger(__name__)


class CTSAPIError(Exception):
    """Raised when a CTS API call cannot be completed successfully."""


def _validate_api_key(api_key: str | None) -> str:
    """Return a usable API key, falling back to the environment variable.

    Raises:
        CTSAPIError: If no key was supplied and none is set in the env.
    """
    resolved = api_key or os.getenv(NCI_API_KEY_ENV)
    if resolved:
        return resolved

    raise CTSAPIError(
        f"NCI API key required. Please set {NCI_API_KEY_ENV} environment "
        "variable or provide api_key parameter.\n"
        "Get a free API key at: https://www.cancer.gov/research/participate/"
        "clinical-trials-search/developers"
    )


def _prepare_request_data(
    method: str,
    params: dict[str, Any] | None,
    json_data: dict[str, Any] | None,
    headers: dict[str, str],
) -> dict[str, Any]:
    """Build the request payload for a CTS API call.

    Args:
        method: HTTP method ("GET" uses params, anything else uses json_data)
        params: Query parameters for GET requests
        json_data: Body data for POST requests
        headers: HTTP headers to attach to the request

    Returns:
        A new dict containing the request data plus a serialized
        "_headers" entry when headers are provided.
    """
    # Shallow-copy the caller's dict: the original code inserted "_headers"
    # directly into params/json_data, mutating the caller's arguments.
    if method == "GET":
        request_data = dict(params) if params else {}
        logger.debug(f"CTS API GET request with params: {params}")
    else:
        request_data = dict(json_data) if json_data else {}
        if method == "POST":
            logger.debug(f"CTS API POST request with data: {json_data}")

    # Tunnel headers through the payload; presumably extracted downstream
    # by request_api before the HTTP call -- confirm against http_client.
    if headers:
        request_data["_headers"] = json.dumps(headers)

    return request_data


def _handle_api_error(error: Any) -> None:
    """Translate an HTTP error object into a descriptive CTSAPIError.

    Always raises; 401 and 403 get key-specific guidance, everything
    else a generic message carrying the upstream error text.
    """
    code = error.code
    if code == 401:
        raise CTSAPIError(
            f"Invalid API key. Please check your {NCI_API_KEY_ENV} "
            "environment variable or api_key parameter."
        )
    if code == 403:
        raise CTSAPIError(
            "Access forbidden. Your API key may not have permission "
            "to access this resource."
        )
    raise CTSAPIError(f"CTS API error: {error.message}")


async def make_cts_request(
    url: str,
    method: Literal["GET", "POST"] = "GET",
    params: dict[str, Any] | None = None,
    json_data: dict[str, Any] | None = None,
    api_key: str | None = None,
) -> dict[str, Any]:
    """
    Make a request to the NCI CTS API with proper authentication.

    Args:
        url: Full URL to the CTS API endpoint
        method: HTTP method (GET or POST)
        params: Query parameters
        json_data: JSON data for POST requests
        api_key: Optional API key (if not provided, uses NCI_API_KEY env var)

    Returns:
        JSON response from the API

    Raises:
        CTSAPIError: If the request fails or API key is missing
    """
    # Resolve and validate credentials before anything else.
    api_key = _validate_api_key(api_key)
    headers = {"x-api-key": api_key, "Accept": "application/json"}

    try:
        payload = _prepare_request_data(method, params, json_data, headers)

        response, error = await request_api(
            url=url,
            request=payload,
            method=method,
            cache_ttl=0,  # Disable caching for NCI API to ensure fresh results
        )

        if error:
            _handle_api_error(error)

        if response is None:
            raise CTSAPIError("No response received from NCI CTS API")

        return response

    except CTSAPIError:
        # Domain errors carry their own message; pass them through untouched.
        raise
    except Exception as e:
        # Anything else (network, serialization, ...) is wrapped.
        logger.error(f"CTS API request failed: {e}")
        raise CTSAPIError(f"Failed to connect to NCI CTS API: {e!s}") from e


def get_api_key_instructions() -> str:
    """
    Get user-friendly instructions for obtaining and setting the API key.

    Returns:
        Formatted string with instructions
    """
    # Assembled from segments so each markdown section stays readable.
    sections = [
        "## NCI Clinical Trials API Key Required\n\n",
        "To use NCI's Clinical Trials Search API, you need an API key.\n\n",
        "**Option 1: Set environment variable (recommended)**\n",
        "```bash\n",
        f"export {NCI_API_KEY_ENV}='your-api-key'\n",
        "```\n\n",
        "**Option 2: Provide via CLI**\n",
        "```bash\n",
        "biomcp trial search --api-key YOUR_KEY --condition melanoma\n",
        "```\n\n",
        "**Get your free API key:**\n",
        "Visit https://www.cancer.gov/research/participate/clinical-trials-search/developers\n\n",
        "The API key provides access to NCI's comprehensive cancer clinical trials ",
        "database with advanced search capabilities.",
    ]
    return "".join(sections)

```

--------------------------------------------------------------------------------
/tests/tdd/variants/test_alphagenome_api_key.py:
--------------------------------------------------------------------------------

```python
"""Test AlphaGenome per-request API key functionality."""

import os
from unittest.mock import MagicMock, patch

import pandas as pd
import pytest

from biomcp.variants.alphagenome import predict_variant_effects


@pytest.mark.asyncio
async def test_api_key_parameter_overrides_env_var():
    """Test that api_key parameter takes precedence over environment variable."""
    # Set up environment variable
    with patch.dict("os.environ", {"ALPHAGENOME_API_KEY": "env-key"}):
        # Mock AlphaGenome modules
        mock_genome = MagicMock()
        mock_client = MagicMock()
        mock_scorers = MagicMock()

        # Mock successful prediction
        test_scores_df = pd.DataFrame({
            "output_type": ["RNA_SEQ"],
            "raw_score": [1.5],
            "gene_name": ["BRAF"],
            "track_name": [None],
        })

        # Track which API key was used
        api_keys_used = []

        def track_create(api_key):
            # Record every key handed to dna_client.create() so the test
            # can assert which credential source won.
            api_keys_used.append(api_key)
            mock_model = MagicMock()
            mock_model.score_variant.return_value = test_scores_df
            return mock_model

        mock_client.create.side_effect = track_create

        mock_scorers.tidy_scores.return_value = test_scores_df
        mock_scorers.get_recommended_scorers.return_value = []

        # Create a mock module with the correct attributes
        mock_models = MagicMock()
        mock_models.dna_client = mock_client
        mock_models.variant_scorers = mock_scorers

        mock_data = MagicMock()
        mock_data.genome = mock_genome

        # NOTE(review): patching sys.modules presumably makes the
        # alphagenome imports inside predict_variant_effects resolve to
        # these mocks -- confirm against the implementation.
        with patch.dict(
            "sys.modules",
            {
                "alphagenome.data": mock_data,
                "alphagenome.models": mock_models,
            },
        ):
            # Test with parameter API key
            result = await predict_variant_effects(
                "chr7", 140753336, "A", "T", api_key="param-key"
            )

            # Verify the parameter key was used, not the env var
            assert len(api_keys_used) == 1
            assert api_keys_used[0] == "param-key"
            assert "BRAF" in result


@pytest.mark.asyncio
async def test_no_api_key_shows_instructions():
    """Missing API key should yield setup instructions, not a crash."""
    # Run with a fully cleared environment so no key can be picked up.
    with patch.dict("os.environ", {}, clear=True):
        # Belt-and-braces: drop the key if it somehow survived.
        os.environ.pop("ALPHAGENOME_API_KEY", None)

        output = await predict_variant_effects(
            "chr7", 140753336, "A", "T", skip_cache=True
        )

        # The returned text should walk the user through obtaining a key.
        for expected in (
            "AlphaGenome API key required",
            "My AlphaGenome API key is",
            "ACTION REQUIRED",
            "https://deepmind.google.com/science/alphagenome",
        ):
            assert expected in output


@pytest.mark.asyncio
async def test_env_var_used_when_no_parameter():
    """Test that environment variable is used when no parameter is provided."""
    # Set up environment variable
    with patch.dict("os.environ", {"ALPHAGENOME_API_KEY": "env-key"}):
        # Mock AlphaGenome modules
        mock_genome = MagicMock()
        mock_client = MagicMock()
        mock_scorers = MagicMock()

        # Mock successful prediction
        test_scores_df = pd.DataFrame({
            "output_type": ["RNA_SEQ"],
            "raw_score": [1.5],
            "gene_name": ["BRAF"],
            "track_name": [None],
        })

        # Track which API key was used
        api_keys_used = []

        def track_create(api_key):
            # Record the key passed to dna_client.create() for later asserts.
            api_keys_used.append(api_key)
            mock_model = MagicMock()
            mock_model.score_variant.return_value = test_scores_df
            return mock_model

        mock_client.create.side_effect = track_create

        mock_scorers.tidy_scores.return_value = test_scores_df
        mock_scorers.get_recommended_scorers.return_value = []

        # Create a mock module with the correct attributes
        mock_models = MagicMock()
        mock_models.dna_client = mock_client
        mock_models.variant_scorers = mock_scorers

        mock_data = MagicMock()
        mock_data.genome = mock_genome

        # NOTE(review): patching sys.modules presumably makes the
        # alphagenome imports inside predict_variant_effects resolve to
        # these mocks -- confirm against the implementation.
        with patch.dict(
            "sys.modules",
            {
                "alphagenome.data": mock_data,
                "alphagenome.models": mock_models,
            },
        ):
            # Test without parameter API key
            result = await predict_variant_effects("chr7", 140753336, "A", "T")

            # Verify the env var key was used
            assert len(api_keys_used) == 1
            assert api_keys_used[0] == "env-key"
            assert "BRAF" in result
```

--------------------------------------------------------------------------------
/src/biomcp/request_batcher.py:
--------------------------------------------------------------------------------

```python
"""Request batching utility for combining multiple small requests.

This module provides a request batcher that accumulates multiple requests
and processes them together in batches, reducing the number of API calls
and improving performance for bulk operations.

Key Features:
- Automatic batching based on size or time threshold
- Configurable batch size and timeout
- Thread-safe request accumulation
- Error propagation to individual requests

Example:
    ```python
    async def batch_api_call(params_list):
        # Make a single API call with multiple parameters
        return await api.bulk_request(params_list)

    batcher = RequestBatcher(
        batch_func=batch_api_call,
        batch_size=10,
        batch_timeout=0.1
    )

    # Individual requests are automatically batched
    result1 = await batcher.request({"id": 1})
    result2 = await batcher.request({"id": 2})
    ```
"""

import asyncio
from collections.abc import Callable, Coroutine
from typing import Any, TypeVar

T = TypeVar("T")


class RequestBatcher:
    """Batches multiple requests together to reduce overhead.

    This is particularly useful for APIs that support batch operations
    or when network latency dominates over processing time.

    The batcher accumulates requests until either:
    1. The batch size threshold is reached
    2. The batch timeout expires

    At which point all accumulated requests are processed together.
    """

    def __init__(
        self,
        batch_func: Callable[[list[Any]], Coroutine[Any, Any, list[Any]]],
        batch_size: int = 10,
        batch_timeout: float = 0.05,  # 50ms
    ):
        """Initialize the batcher.

        Args:
            batch_func: Async function that processes a batch of requests
            batch_size: Maximum number of requests to batch together
            batch_timeout: Maximum time to wait for batch to fill (seconds)
        """
        self.batch_func = batch_func
        self.batch_size = batch_size
        self.batch_timeout = batch_timeout
        # Pairs of (params, future); futures resolve when the batch runs.
        self.pending_requests: list[tuple[Any, asyncio.Future]] = []
        # The currently scheduled timeout-flush task, if any.
        self.batch_task: asyncio.Task | None = None
        self._lock = asyncio.Lock()

    async def request(self, params: Any) -> Any:
        """Add a request to the batch and wait for its individual result.

        Raises whatever exception the batch function raised, or a generic
        Exception if the batch returned fewer results than requests.
        """
        future: asyncio.Future[Any] = asyncio.Future()

        async with self._lock:
            self.pending_requests.append((params, future))

            # Flush immediately once the batch is full; otherwise make sure
            # a timer is running so the batch flushes after the timeout.
            if len(self.pending_requests) >= self.batch_size:
                await self._flush_batch()
            elif not self.batch_task or self.batch_task.done():
                self.batch_task = asyncio.create_task(self._batch_timer())

        return await future

    async def _batch_timer(self):
        """Flush the pending batch after the configured timeout."""
        await asyncio.sleep(self.batch_timeout)
        async with self._lock:
            await self._flush_batch()

    async def _flush_batch(self):
        """Process all pending requests as a batch.

        Caller must hold self._lock, so the batch runs while the lock is
        held (new requests queue up behind it).
        """
        if not self.pending_requests:
            return

        # Extract current batch
        batch = self.pending_requests.copy()
        self.pending_requests.clear()

        # Cancel the pending timer -- but never the task we are currently
        # running inside. Cancelling the current task (the timer-triggered
        # flush path) would raise CancelledError at the next await, which
        # `except Exception` does not catch, leaving every pending future
        # unresolved and all callers hung.
        if (
            self.batch_task
            and not self.batch_task.done()
            and self.batch_task is not asyncio.current_task()
        ):
            self.batch_task.cancel()

        # Process batch
        try:
            params_list = [params for params, _ in batch]
            results = await self.batch_func(params_list)

            # Distribute results to futures, position by position.
            for i, (_, future) in enumerate(batch):
                if not future.done():
                    if i < len(results):
                        future.set_result(results[i])
                    else:
                        future.set_exception(
                            Exception(f"No result for request at index {i}")
                        )
        except Exception as e:
            # Propagate the failure to every caller in this batch.
            for _, future in batch:
                if not future.done():
                    future.set_exception(e)


# Example usage for autocomplete batching
async def batch_autocomplete_requests(requests: list[dict]) -> list[Any]:
    """Process multiple autocomplete requests in parallel.

    This is an example implementation that could be used to batch
    autocomplete requests more efficiently.
    """
    from .articles.autocomplete import EntityRequest, autocomplete

    tasks = []
    for req in requests:
        entity_req = EntityRequest(**req)
        tasks.append(autocomplete(entity_req))

    return await asyncio.gather(*tasks)

```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[project]
name = "biomcp-python"
version = "0.4.6"
description = "Biomedical Model Context Protocol Server"
authors = [{ name = "Ian Maurer", email = "[email protected]" }]
readme = "README.md"
keywords = ['python']
requires-python = ">=3.10,<4.0"
classifiers = [
    "Intended Audience :: Developers",
    "Programming Language :: Python",
    "Programming Language :: Python :: 3",
    "Programming Language :: Python :: 3.10",
    "Programming Language :: Python :: 3.11",
    "Programming Language :: Python :: 3.12",
    "Programming Language :: Python :: 3.13",
    "Topic :: Software Development :: Libraries :: Python Modules",
]
dependencies = [
    "certifi>=2025.1.31",
    "diskcache>=5.6.3",
    "httpx>=0.28.1",
    "mcp[cli]>=1.12.3,<2.0.0",
    "platformdirs>=4.3.6",
    "psutil>=7.0.0",
    "pydantic>=2.10.6",
    "python-dotenv>=1.0.0",
    "rich>=14.0.0",
    "typer>=0.15.2",
    "uvicorn>=0.34.2",
    "alphagenome>=0.1.0",
]

[project.urls]
Homepage = "https://genomoncology.com/biomcp/"
Repository = "https://github.com/genomoncology/biomcp"
Documentation = "https://genomoncology.com/biomcp/"

[dependency-groups]
dev = [
    "pytest>=7.2.0",
    "pytest-xdist>=3.5.0",
    "pre-commit>=2.20.0",
    "tox-uv>=1.11.3",
    "deptry>=0.22.0",
    "mypy>=0.991",
    "pytest-cov>=4.0.0",
    "pytest-asyncio>=0.24.0",
    "ruff>=0.9.2",
    "mkdocs>=1.4.2",
    "mkdocs-material>=8.5.10",
    "mkdocstrings[python]>=0.26.1",
    "anyio>=4.8.0",
    # "ipython>=9.0.2",
    "pytest-bdd>=8.1.0",
    "tomlkit>=0.13.2",
    "assertpy>=1.1",
    "twine>=4.0.0",
    "pandas>=2.0.0",  # Used for mocking AlphaGenome responses in tests
    "PyYAML>=6.0.0",  # Used for mkdocs.yml parsing in scripts
    "pydantic-ai>=0.0.14",  # For testing Pydantic AI integration
]

[project.optional-dependencies]
api = [
]

worker = [
    "fastapi>=0.110.0",
    "starlette>=0.36.0",
    "uvicorn>=0.28.0",
]

[build-system]
requires = ["setuptools >= 61.0"]
build-backend = "setuptools.build_meta"

[tool.setuptools.package-data]
biomcp = ["resources/*.md"]

[project.scripts]
biomcp = "biomcp.__main__:main"

[tool.mypy]
files = ["src"]
ignore_missing_imports = true
disallow_untyped_defs = false
disallow_any_unimported = false
no_implicit_optional = true
check_untyped_defs = false
warn_return_any = false
warn_unused_ignores = true
show_error_codes = true
plugins = [
    "pydantic.mypy"
]
disable_error_code = [
    "union-attr",
    "prop-decorator",
]

[tool.pytest.ini_options]
testpaths = ["tests"]
addopts = "--import-mode=importlib"
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "function"
markers = [
    "integration: marks tests as integration tests (deselect with '-m \"not integration\"')",
]
filterwarnings = [
    # Ignore protobuf version warnings from AlphaGenome
    "ignore:Protobuf gencode version.*is exactly one major version older.*:UserWarning",
    # Ignore false positive warning from pytest-xdist about coroutines
    # This occurs during parallel test execution when mock objects are cleaned up
    "ignore:coroutine 'search_trials_unified' was never awaited:RuntimeWarning",
]

[tool.ruff]
target-version = "py310"
line-length = 79
fix = true
unsafe-fixes = true

[tool.ruff.lint]
select = [
    # flake8-2020
    "YTT",
    # flake8-bandit
    "S",
    # flake8-bugbear
    "B",
    # flake8-builtins
    "A",
    # flake8-comprehensions
    "C4",
    # flake8-debugger
    "T10",
    # flake8-simplify
    "SIM",
    # isort
    "I",
    # mccabe
    "C90",
    # pycodestyle
    "E", "W",
    # pyflakes
    "F",
    # pygrep-hooks
    "PGH",
    # pyupgrade
    "UP",
    # ruff
    "RUF",
]
ignore = [
    # LineTooLong
    "E501",
    # DoNotAssignLambda
    "E731",
    # Consider unpacking
    "RUF005",
    # Union for type annotations
    "UP007",
    # Asserts are ok when I say they are ok.
    "S101",
]

[tool.ruff.lint.per-file-ignores]
"tests/*" = ["S101"]
"__init__.py" = ["I001"]
"src/biomcp/variants/external.py" = ["C901"]  # Complex API interactions are acceptable

[tool.ruff.format]
preview = true

[tool.ruff.lint.flake8-bugbear]
extend-immutable-calls = [
    "fastapi.Depends",
    "fastapi.Query",
    "typer.Argument",
    "typer.Option",
]

[tool.coverage.report]
skip_empty = true

[tool.coverage.run]
branch = true
source = ["src"]
omit = [
    "src/*/__main__.py",
    "src/*/server.py",
    "src/*/http_client.py",
]

[tool.deptry]
exclude = [
  "example_scripts/python_sdk.py",
  "venv",
  ".venv",
  ".direnv",
  "tests",
  ".git",
  "build",
  "dist",
  "scripts",
  "spike",
]

[tool.deptry.per_rule_ignores]
DEP001 = ["alphagenome"]  # Optional dependency, must be installed manually
DEP002 = ["uvicorn"]
DEP003 = ["biomcp", "alphagenome"]

```

--------------------------------------------------------------------------------
/docs/getting-started/01-quickstart-cli.md:
--------------------------------------------------------------------------------

```markdown
# Quickstart: BioMCP CLI

Get started with BioMCP in under 5 minutes! This guide walks you through installation and your first biomedical search.

## Prerequisites

- Python 3.10 or higher
- [uv](https://docs.astral.sh/uv/) package manager (recommended) or pip

## Installation

### Option 1: Using uv (Recommended)

```bash
# Install uv if you haven't already
curl -LsSf https://astral.sh/uv/install.sh | sh

# Install BioMCP
uv tool install biomcp
```

### Option 2: Using pip

```bash
pip install biomcp
```

## Your First Search

Let's search for recent articles about BRAF mutations in melanoma:

```bash
biomcp article search \
  --gene BRAF --disease melanoma --limit 5
```

This command:

- Searches PubMed/PubTator3 for articles
- Filters by BRAF gene and melanoma disease
- Returns the 5 most recent results
- Automatically includes cBioPortal cancer genomics data
- Includes preprints from bioRxiv/medRxiv by default

## Understanding the Output

The search returns:

1. **cBioPortal Summary** (if gene specified): Cancer genomics data showing mutation frequencies and hotspots
2. **Article Results**: Each result includes:
   - Title and authors
   - Journal and publication date
   - PubMed ID and direct link
   - Abstract snippet
   - Annotated entities (genes, diseases, chemicals)

## Essential Commands

### Search Clinical Trials

Find active trials for lung cancer:

```bash
biomcp trial search \
  --condition "lung cancer" \
  --status RECRUITING --limit 5
```

### Get Gene Information

Retrieve details about the TP53 tumor suppressor:

```bash
biomcp gene get TP53
```

### Look Up Drug Information

Get details about imatinib (Gleevec):

```bash
biomcp drug get imatinib
```

### Search for Genetic Variants

Find pathogenic variants in the BRCA1 gene:

```bash
biomcp variant search \
  --gene BRCA1 --significance pathogenic \
  --limit 5
```

## Next Steps

### Set Up API Keys (Optional but Recommended)

Some features require API keys for enhanced functionality:

```bash
# For NCI clinical trials database
export NCI_API_KEY="your-key-here"

# For AlphaGenome variant predictions
export ALPHAGENOME_API_KEY="your-key-here"

# For additional cBioPortal features
export CBIO_TOKEN="your-token-here"
```

See [Authentication and API Keys](03-authentication-and-api-keys.md) for detailed setup.

### Explore Advanced Features

- **Combine Multiple Filters**:

  ```bash
  biomcp article search \
    --gene EGFR --disease "lung cancer" \
    --chemical erlotinib
  ```

- **Use OR Logic in Keywords**:

  ```bash
  biomcp article search --gene BRAF --keyword "V600E|p.V600E|c.1799T>A"
  ```

- **Exclude Preprints**:
  ```bash
  biomcp article search --gene TP53 --no-preprints
  ```

### Get Help

View all available commands:

```bash
biomcp --help
```

Get help for a specific command:

```bash
biomcp article search --help
```

## Common Use Cases

### 1. Research a Specific Mutation

```bash
# Find articles about EGFR T790M resistance mutation
biomcp article search --gene EGFR \
  --keyword "T790M|p.T790M" \
  --disease "lung cancer"
```

### 2. Find Trials for a Patient

```bash
# Active trials for HER2-positive breast cancer
biomcp trial search \
  --condition "breast cancer" \
  --keyword "HER2 positive" \
  --status RECRUITING
```

### 3. Investigate Drug Mechanisms

```bash
# Get information about pembrolizumab
biomcp drug get pembrolizumab

# Find articles about its use in melanoma
biomcp article search --chemical pembrolizumab --disease melanoma
```

## Troubleshooting

### Command Not Found

If `biomcp` is not recognized:

- Ensure your PATH includes the installation directory
- Try running with full path: `~/.local/bin/biomcp`
- Restart your terminal after installation

### No Results Found

If searches return no results:

- Check spelling of gene names (use official symbols)
- Try broader search terms
- Remove filters one by one to identify the constraint

### API Rate Limits

If you encounter rate limit errors:

- Add delays between requests
- Consider setting up API keys for higher limits
- Use the `--limit` parameter to reduce result count

## Next Steps

Now that you've run your first searches, explore these resources:

1. **[Complete CLI Reference](../user-guides/01-command-line-interface.md)** - Comprehensive documentation for all commands and options
2. **[Claude Desktop Integration](02-claude-desktop-integration.md)** - Use BioMCP with AI assistants
3. **[Set up API Keys](03-authentication-and-api-keys.md)** - Enable advanced features with NCI, AlphaGenome, and cBioPortal
4. **[How-to Guides](../how-to-guides/01-find-articles-and-cbioportal-data.md)** - Step-by-step tutorials for complex research workflows
5. **[Deep Researcher Persona](../concepts/02-the-deep-researcher-persona.md)** - Learn about BioMCP's philosophy and methodology

Happy researching! 🧬🔬

```

--------------------------------------------------------------------------------
/tests/integration/test_preprints_integration.py:
--------------------------------------------------------------------------------

```python
"""Integration tests for preprint search functionality."""

import asyncio

import pytest

from biomcp.articles.preprints import (
    BiorxivClient,
    EuropePMCClient,
    PreprintSearcher,
)
from biomcp.articles.search import PubmedRequest
from biomcp.core import PublicationState


class TestBiorxivIntegration:
    """Integration tests for bioRxiv API."""

    @pytest.mark.asyncio
    async def test_biorxiv_real_search(self):
        """Test real bioRxiv API search."""
        client = BiorxivClient()

        # Probe a series of broad terms: the recent-articles window can
        # legitimately be empty for any single one of them.
        candidate_terms = ["cancer", "gene", "cell", "protein", "RNA", "DNA"]
        hits: list = []
        matched_term = None

        for candidate in candidate_terms:
            hits = await client.search(candidate)
            if hits:
                matched_term = candidate
                break

        # No hits for every term means the API is unreachable or empty —
        # skip rather than fail on an external-service condition.
        if not hits:
            pytest.skip(
                "No results found with any search term - API may be down or have no matching recent articles"
            )

        # Sanity-check the shape of the first returned record.
        top = hits[0]
        assert top.doi is not None
        assert top.title is not None
        assert top.publication_state == PublicationState.PREPRINT
        assert "preprint" in top.journal.lower()

        print(
            f"Found {len(hits)} bioRxiv results for term '{matched_term}'"
        )
        print(f"First result: {top.title}")


class TestEuropePMCIntegration:
    """Integration tests for Europe PMC API."""

    @pytest.mark.asyncio
    async def test_europe_pmc_real_search(self):
        """Test real Europe PMC API search for preprints."""
        client = EuropePMCClient()

        # Broad terms tried in order until one yields preprint hits.
        candidate_terms = [
            "cancer",
            "gene",
            "cell",
            "protein",
            "SARS-CoV-2",
            "COVID",
        ]
        hits: list = []
        matched_term = None

        for candidate in candidate_terms:
            hits = await client.search(candidate)
            if hits:
                matched_term = candidate
                break

        # Treat a universally empty result set as a service outage.
        if not hits:
            pytest.skip(
                "No results found with any search term - Europe PMC API may be down"
            )

        # Verify the first record carries the expected preprint metadata.
        top = hits[0]
        assert top.title is not None
        assert top.publication_state == PublicationState.PREPRINT

        print(
            f"Found {len(hits)} Europe PMC preprint results for term '{matched_term}'"
        )
        print(f"First result: {top.title}")
        if top.doi:
            print(f"DOI: {top.doi}")


class TestPreprintSearcherIntegration:
    """Integration tests for combined preprint search."""

    @pytest.mark.asyncio
    async def test_combined_search_real(self):
        """Test searching across both preprint sources."""
        searcher = PreprintSearcher()

        # Several query shapes; stop at the first that returns results.
        attempts = [
            {"genes": ["TP53"], "diseases": ["cancer"]},
            {"keywords": ["protein", "structure"]},
            {"genes": ["BRAF"], "diseases": ["melanoma"]},
            {"keywords": ["gene", "expression"]},
        ]

        response = None
        winning = None

        for params in attempts:
            response = await searcher.search(PubmedRequest(**params))
            if response.count > 0:
                winning = params
                break

        print(f"Total results: {response.count if response else 0}")

        if response and response.count > 0:
            # Spot-check the first result's structure.
            first = response.results[0]
            assert first.title is not None
            assert first.publication_state == PublicationState.PREPRINT

            print(f"Successful search config: {winning}")
            print(f"First result: {first.title}")
            print(f"Date: {first.date}")
            print(f"Journal: {first.journal}")
        else:
            pytest.skip(
                "No results found with any search configuration - APIs may be down"
            )


if __name__ == "__main__":
    # Allow running the integration checks directly, outside pytest.
    divider = "\n" + "=" * 50 + "\n"
    asyncio.run(TestBiorxivIntegration().test_biorxiv_real_search())
    print(divider)
    asyncio.run(TestEuropePMCIntegration().test_europe_pmc_real_search())
    print(divider)
    asyncio.run(TestPreprintSearcherIntegration().test_combined_search_real())

```

--------------------------------------------------------------------------------
/docs/developer-guides/05-error-handling.md:
--------------------------------------------------------------------------------

```markdown
# Error Handling Guide

## Overview

BioMCP uses a consistent error handling pattern across all HTTP operations. This guide explains the error types, when they occur, and how to handle them.

## Error Structure

All HTTP operations return a tuple: `(data, error)` where one is always `None`.

```python
data, error = await http_client.request_api(...)
if error:
    # Handle error
    logger.error(f"Request failed: {error.code} - {error.message}")
else:
    # Process data
    process_result(data)
```

## Error Types

### Network Errors

- **When**: Connection timeout, DNS resolution failure, network unreachable
- **Error Code**: Various HTTP client exceptions
- **Handling**: Retry with exponential backoff or fail gracefully

### HTTP Status Errors

- **When**: Server returns 4xx or 5xx status codes
- **Error Codes**:
  - `400-499`: Client errors (bad request, unauthorized, not found)
  - `500-599`: Server errors (internal error, service unavailable)
- **Handling**:
  - 4xx: Fix request parameters or authentication
  - 5xx: Retry with backoff or use cached data

### Circuit Breaker Errors

- **When**: Too many consecutive failures to a domain
- **Error**: Circuit breaker opens to prevent cascading failures
- **Handling**: Wait for recovery timeout or use alternative data source

### Offline Mode Errors

- **When**: `BIOMCP_OFFLINE=true` and no cached data available
- **Error**: Request blocked in offline mode
- **Handling**: Use cached data only or inform user about offline status

### Parse Errors

- **When**: Response is not valid JSON or doesn't match expected schema
- **Error**: JSON decode error or validation error
- **Handling**: Log error and treat as service issue

## Best Practices

### 1. Always Check Errors

```python
# ❌ Bad - ignoring error
data, _ = await http_client.request_api(...)
process(data)  # data might be None!

# ✅ Good - checking error
data, error = await http_client.request_api(...)
if error:
    logger.warning(f"Failed to fetch data: {error}")
    return None
process(data)
```

### 2. Provide Context in Error Messages

```python
# ❌ Bad - generic error
if error:
    logger.error("Request failed")

# ✅ Good - contextual error
if error:
    logger.error(f"Failed to fetch gene {gene_id} from cBioPortal: {error.message}")
```

### 3. Graceful Degradation

```python
async def get_variant_with_fallback(variant_id: str):
    # Try primary source
    data, error = await primary_source.get_variant(variant_id)
    if not error:
        return data

    logger.warning(f"Primary source failed: {error}, trying secondary")

    # Try secondary source
    data, error = await secondary_source.get_variant(variant_id)
    if not error:
        return data

    # Use cached data as last resort
    return get_cached_variant(variant_id)
```

### 4. User-Friendly Error Messages

```python
def format_error_for_user(error: RequestError) -> str:
    if error.code >= 500:
        return "The service is temporarily unavailable. Please try again later."
    elif error.code == 404:
        return "The requested data was not found."
    elif error.code == 401:
        return "Authentication required. Please check your credentials."
    elif "OFFLINE" in str(error):
        return "You are in offline mode. Only cached data is available."
    else:
        return "An error occurred while fetching data. Please try again."
```

## Testing Error Conditions

### 1. Simulate Network Errors

```python
with patch("biomcp.http_client.call_http") as mock:
    mock.side_effect = Exception("Network error")
    data, error = await client.fetch_data()
    assert error is not None
    assert data is None
```

### 2. Test Circuit Breaker

```python
# Simulate multiple failures
for _ in range(5):
    with patch("biomcp.http_client.call_http") as mock:
        mock.return_value = (500, "Server Error")
        await client.fetch_data()

# Circuit should be open
data, error = await client.fetch_data()
assert error is not None
assert "circuit" in error.message.lower()
```

### 3. Test Offline Mode

```python
with patch.dict(os.environ, {"BIOMCP_OFFLINE": "true"}):
    data, error = await client.fetch_data()
    # Should only return cached data or error
```

## Common Patterns

### Retry with Backoff

The centralized HTTP client automatically retries with exponential backoff for:

- Network errors
- 5xx server errors
- Rate limit errors (429)

### Caching

Failed requests don't overwrite cached data, ensuring availability during outages.

### Rate Limiting

Requests are automatically rate-limited per domain to prevent overwhelming services.

## Debugging

Enable debug logging to see all HTTP requests and errors:

```python
import logging
logging.getLogger("biomcp.http_client").setLevel(logging.DEBUG)
```

This will show:

- All HTTP requests with URLs and methods
- Response status codes and times
- Error details and retry attempts
- Circuit breaker state changes

```

--------------------------------------------------------------------------------
/src/biomcp/openfda/cache.py:
--------------------------------------------------------------------------------

```python
"""
Simple in-memory caching for OpenFDA API responses.

This module provides a time-based cache to reduce API calls and improve performance.
Cache entries expire after a configurable TTL (time-to-live).
"""

import hashlib
import json
import logging
import os
from datetime import datetime, timedelta
from typing import Any

logger = logging.getLogger(__name__)

# Cache configuration, overridable via environment variables.
# A TTL <= 0 disables caching entirely (see is_cacheable_request).
CACHE_TTL_MINUTES = int(os.environ.get("BIOMCP_FDA_CACHE_TTL", "15"))
# Maximum number of entries; oldest entries are evicted first on overflow.
MAX_CACHE_SIZE = int(os.environ.get("BIOMCP_FDA_MAX_CACHE_SIZE", "100"))
MAX_RESPONSE_SIZE = int(
    os.environ.get("BIOMCP_FDA_MAX_RESPONSE_SIZE", str(1024 * 1024))
)  # 1MB default; larger responses are never cached

# Global cache dictionary: cache key -> (response payload, insertion time)
_cache: dict[str, tuple[Any, datetime]] = {}


def _generate_cache_key(endpoint: str, params: dict[str, Any]) -> str:
    """
    Generate a unique cache key for an API request.

    Args:
        endpoint: The API endpoint URL
        params: Query parameters

    Returns:
        A unique hash key for the request
    """
    # Remove sensitive parameters before hashing
    safe_params = {
        k: v
        for k, v in params.items()
        if k.lower() not in ["api_key", "apikey", "key", "token", "secret"]
    }

    # Sort params for consistent hashing
    sorted_params = json.dumps(safe_params, sort_keys=True)
    combined = f"{endpoint}:{sorted_params}"

    # Use SHA256 for cache key
    return hashlib.sha256(combined.encode()).hexdigest()


def get_cached_response(
    endpoint: str, params: dict[str, Any]
) -> dict[str, Any] | None:
    """
    Retrieve a cached response if available and not expired.

    Args:
        endpoint: The API endpoint URL
        params: Query parameters

    Returns:
        Cached response data or None if not found/expired
    """
    key = _generate_cache_key(endpoint, params)
    entry = _cache.get(key)
    if entry is None:
        return None

    payload, stored_at = entry
    elapsed = datetime.now() - stored_at

    if elapsed >= timedelta(minutes=CACHE_TTL_MINUTES):
        # Drop the stale entry so it no longer counts toward the size cap.
        del _cache[key]
        logger.debug(f"Cache expired for {endpoint}")
        return None

    logger.debug(
        f"Cache hit for {endpoint} (age: {elapsed.total_seconds():.1f}s)"
    )
    return payload


def set_cached_response(
    endpoint: str, params: dict[str, Any], response: dict[str, Any]
) -> None:
    """
    Store a response in the cache.

    Responses larger than MAX_RESPONSE_SIZE are skipped. When inserting a
    new key would exceed MAX_CACHE_SIZE, the oldest entries (by insertion
    time) are evicted first; overwriting an existing key never evicts.

    Args:
        endpoint: The API endpoint URL
        params: Query parameters
        response: Response data to cache
    """
    import sys

    # Estimate size via JSON serialization (matches what the API returns);
    # fall back to sys.getsizeof for non-serializable payloads.
    try:
        response_size = len(json.dumps(response).encode("utf-8"))
    except (TypeError, ValueError):
        response_size = sys.getsizeof(response)

    if response_size > MAX_RESPONSE_SIZE:
        logger.warning(
            f"Response too large to cache: {response_size} bytes > {MAX_RESPONSE_SIZE} bytes"
        )
        return

    cache_key = _generate_cache_key(endpoint, params)

    # Evict only when a brand-new key would push us past the cap;
    # replacing an existing key does not grow the cache.
    if cache_key not in _cache and len(_cache) >= MAX_CACHE_SIZE:
        # Oldest-first eviction keyed on each entry's insertion timestamp.
        oldest_keys = sorted(_cache.keys(), key=lambda k: _cache[k][1])[
            : len(_cache) - MAX_CACHE_SIZE + 1
        ]

        for key in oldest_keys:
            del _cache[key]

        logger.debug(
            f"Cache size limit reached, removed {len(oldest_keys)} entries"
        )

    _cache[cache_key] = (response, datetime.now())

    logger.debug(f"Cached response for {endpoint} (cache size: {len(_cache)})")


def clear_cache() -> None:
    """Drop every cached FDA response and log how many were removed."""
    global _cache
    removed = len(_cache)
    _cache = {}
    logger.info(f"Cleared FDA cache ({removed} entries)")


def get_cache_stats() -> dict[str, Any]:
    """
    Get cache statistics.

    Returns:
        Dictionary with cache statistics
    """
    now = datetime.now()
    ttl_seconds = CACHE_TTL_MINUTES * 60

    # Ages (in seconds) of the entries that are still within the TTL.
    fresh_ages = [
        age
        for _payload, stored_at in _cache.values()
        if (age := (now - stored_at).total_seconds()) < ttl_seconds
    ]

    return {
        "total_entries": len(_cache),
        "valid_entries": len(fresh_ages),
        "expired_entries": len(_cache) - len(fresh_ages),
        "average_age_seconds": (
            sum(fresh_ages) / len(fresh_ages) if fresh_ages else 0
        ),
        "ttl_minutes": CACHE_TTL_MINUTES,
        "max_size": MAX_CACHE_SIZE,
    }


def is_cacheable_request(endpoint: str, params: dict[str, Any]) -> bool:
    """
    Determine if a request should be cached.

    Caching is skipped when the TTL is non-positive (caching disabled)
    or when the request asks for more than 100 records.

    Args:
        endpoint: The API endpoint URL
        params: Query parameters

    Returns:
        True if the request should be cached
    """
    return CACHE_TTL_MINUTES > 0 and params.get("limit", 0) <= 100

```

--------------------------------------------------------------------------------
/tests/tdd/drugs/test_drug_getter.py:
--------------------------------------------------------------------------------

```python
"""Unit tests for drug information retrieval."""

import json

import pytest

from biomcp.drugs.getter import get_drug


class TestDrugGetter:
    """Test drug information retrieval."""

    @pytest.fixture
    def mock_drug_response(self):
        """Mock drug response from MyChem.info."""
        return {
            "_id": "CHEMBL941",
            "name": "Imatinib",
            "drugbank": {
                "id": "DB00619",
                "name": "Imatinib",
                "description": "Imatinib is a tyrosine kinase inhibitor...",
                "indication": "Treatment of chronic myeloid leukemia...",
                "mechanism_of_action": "Inhibits BCR-ABL tyrosine kinase...",
                "products": {"name": ["Gleevec", "Glivec"]},
            },
            "chembl": {
                "molecule_chembl_id": "CHEMBL941",
                "pref_name": "IMATINIB",
            },
            "pubchem": {"cid": 5291},
            "chebi": {"id": "CHEBI:45783", "name": "imatinib"},
            "inchikey": "KTUFNOKKBVMGRW-UHFFFAOYSA-N",
            "formula": "C29H31N7O",
        }

    @staticmethod
    def _patch_api(monkeypatch, handler):
        """Route biomcp.http_client.request_api to the given async handler."""
        monkeypatch.setattr("biomcp.http_client.request_api", handler)

    @pytest.mark.asyncio
    async def test_get_drug_by_name(self, monkeypatch, mock_drug_response):
        """Test getting drug by name."""
        # A name lookup issues two calls: the id query, then the record fetch.
        queued = iter([
            ({"hits": [{"_id": "CHEMBL941"}]}, None),
            (mock_drug_response, None),
        ])

        async def fake_api(url, request, method, domain):
            return next(queued)

        self._patch_api(monkeypatch, fake_api)

        result = await get_drug("imatinib")

        for fragment in (
            "## Drug: Imatinib",
            "DrugBank ID**: DB00619",
            "ChEMBL ID**: CHEMBL941",
            "Formula**: C29H31N7O",
            "Trade Names**: Gleevec, Glivec",
            "External Links",
            "DrugBank](https://www.drugbank.ca/drugs/DB00619)",
        ):
            assert fragment in result

    @pytest.mark.asyncio
    async def test_get_drug_by_id(self, monkeypatch, mock_drug_response):
        """Test getting drug by DrugBank ID."""

        async def fake_api(url, request, method, domain):
            return (mock_drug_response, None)

        self._patch_api(monkeypatch, fake_api)

        result = await get_drug("DB00619")

        assert "## Drug: Imatinib" in result
        assert "DrugBank ID**: DB00619" in result

    @pytest.mark.asyncio
    async def test_get_drug_json_output(self, monkeypatch, mock_drug_response):
        """Test getting drug with JSON output."""

        async def fake_api(url, request, method, domain):
            return (mock_drug_response, None)

        self._patch_api(monkeypatch, fake_api)

        data = json.loads(await get_drug("DB00619", output_json=True))

        assert data["drug_id"] == "CHEMBL941"
        assert data["name"] == "Imatinib"
        assert data["drugbank_id"] == "DB00619"
        assert (
            data["_links"]["DrugBank"]
            == "https://www.drugbank.ca/drugs/DB00619"
        )

    @pytest.mark.asyncio
    async def test_drug_not_found(self, monkeypatch):
        """Test drug not found."""

        async def fake_api(url, request, method, domain):
            return ({"hits": []}, None)

        self._patch_api(monkeypatch, fake_api)

        result = await get_drug("INVALID_DRUG_XYZ")

        assert "Drug 'INVALID_DRUG_XYZ' not found" in result

    @pytest.mark.asyncio
    async def test_drug_with_description_truncation(self, monkeypatch):
        """Test drug with long description gets truncated."""
        payload = {
            "_id": "TEST001",
            "name": "TestDrug",
            "drugbank": {"id": "DB99999", "description": "A" * 600},
        }

        async def fake_api(url, request, method, domain):
            return (payload, None)

        self._patch_api(monkeypatch, fake_api)

        result = await get_drug("DB99999")

        assert "Description" in result
        assert "A" * 500 in result
        assert "..." in result  # Truncation indicator

    @pytest.mark.asyncio
    async def test_drug_error_handling(self, monkeypatch):
        """Test error handling."""

        async def fake_api(url, request, method, domain):
            raise Exception("API error")

        self._patch_api(monkeypatch, fake_api)

        result = await get_drug("imatinib")

        # The exception is caught upstream and surfaces as a not-found message.
        assert "Drug 'imatinib' not found in MyChem.info" in result

```

--------------------------------------------------------------------------------
/src/biomcp/drugs/getter.py:
--------------------------------------------------------------------------------

```python
"""Drug information retrieval from MyChem.info."""

import json
import logging

from ..integrations import BioThingsClient

logger = logging.getLogger(__name__)


def _add_drug_links(drug_info, result: dict) -> None:
    """Add external database links for the drug."""
    links = {}

    if drug_info.drugbank_id:
        links["DrugBank"] = (
            f"https://www.drugbank.ca/drugs/{drug_info.drugbank_id}"
        )

    if drug_info.chembl_id:
        links["ChEMBL"] = (
            f"https://www.ebi.ac.uk/chembl/compound_report_card/{drug_info.chembl_id}/"
        )

    if drug_info.pubchem_cid:
        links["PubChem"] = (
            f"https://pubchem.ncbi.nlm.nih.gov/compound/{drug_info.pubchem_cid}"
        )

    if drug_info.chebi_id:
        chebi_id = drug_info.chebi_id.replace("CHEBI:", "")
        links["ChEBI"] = (
            f"https://www.ebi.ac.uk/chebi/searchId.do?chebiId=CHEBI:{chebi_id}"
        )

    if links:
        result["_links"] = links


def _format_basic_info(drug_info, output_lines: list[str]) -> None:
    """Format basic drug information."""
    if drug_info.formula:
        output_lines.append(f"- **Formula**: {drug_info.formula}")

    if drug_info.drugbank_id:
        output_lines.append(f"- **DrugBank ID**: {drug_info.drugbank_id}")

    if drug_info.chembl_id:
        output_lines.append(f"- **ChEMBL ID**: {drug_info.chembl_id}")

    if drug_info.pubchem_cid:
        output_lines.append(f"- **PubChem CID**: {drug_info.pubchem_cid}")

    if drug_info.chebi_id:
        output_lines.append(f"- **ChEBI ID**: {drug_info.chebi_id}")

    if drug_info.inchikey:
        output_lines.append(f"- **InChIKey**: {drug_info.inchikey}")


def _format_clinical_info(drug_info, output_lines: list[str]) -> None:
    """Format clinical drug information."""
    if drug_info.tradename:
        names = drug_info.tradename[:5]  # Limit to first 5
        output_lines.append(f"- **Trade Names**: {', '.join(names)}")
        if len(drug_info.tradename) > 5:
            output_lines.append(f"  (and {len(drug_info.tradename) - 5} more)")

    if drug_info.description:
        desc = drug_info.description[:500]
        if len(drug_info.description) > 500:
            desc += "..."
        output_lines.append(f"\n### Description\n{desc}")

    if drug_info.indication:
        ind = drug_info.indication[:500]
        if len(drug_info.indication) > 500:
            ind += "..."
        output_lines.append(f"\n### Indication\n{ind}")

    if drug_info.mechanism_of_action:
        moa = drug_info.mechanism_of_action[:500]
        if len(drug_info.mechanism_of_action) > 500:
            moa += "..."
        output_lines.append(f"\n### Mechanism of Action\n{moa}")


def _format_drug_output(drug_info, result: dict) -> None:
    """Render the markdown view of the drug into result['_formatted']."""
    lines = [f"## Drug: {drug_info.name or 'Unknown'}"]

    _format_basic_info(drug_info, lines)
    _format_clinical_info(drug_info, lines)

    links = result.get("_links")
    if links:
        lines.append("\n### External Links")
        lines.extend(f"- [{name}]({url})" for name, url in links.items())

    result["_formatted"] = "\n".join(lines)


async def get_drug(drug_id_or_name: str, output_json: bool = False) -> str:
    """Get drug information from MyChem.info.

    Args:
        drug_id_or_name: Drug ID (DrugBank, ChEMBL, etc.) or name
        output_json: Return JSON instead of formatted text

    Returns:
        Formatted drug information or JSON string
    """
    try:
        drug_info = await BioThingsClient().get_drug_info(drug_id_or_name)

        if not drug_info:
            missing = f"Drug '{drug_id_or_name}' not found in MyChem.info"
            if output_json:
                return json.dumps({"error": missing}, indent=2)
            return missing

        # Serialize the model (dropping unset fields), then decorate it
        # with external database links.
        payload = drug_info.model_dump(by_alias=False, exclude_none=True)
        _add_drug_links(drug_info, payload)

        if output_json:
            return json.dumps(payload, indent=2)

        # Render the human-readable markdown view.
        _format_drug_output(drug_info, payload)
        return payload["_formatted"]

    except Exception as e:
        logger.error(f"Error getting drug info: {e}")
        failure = f"Error retrieving drug information: {e!s}"
        if output_json:
            return json.dumps({"error": failure}, indent=2)
        return failure


# MCP tool function
async def _drug_details(drug_id_or_name: str) -> str:
    """Get drug/chemical information from MyChem.info.

    Retrieves a comprehensive drug profile covering:
    - Identifiers (DrugBank, ChEMBL, PubChem, ChEBI)
    - Chemical properties (formula, InChIKey)
    - Trade names
    - Clinical indication and mechanism of action
    - Links to the corresponding external database entries

    Args:
        drug_id_or_name: Drug name (e.g., "aspirin") or ID (e.g., "DB00945", "CHEMBL25")

    Returns:
        Formatted drug information with external database links
    """
    # Delegate to the shared getter in text (non-JSON) mode.
    return await get_drug(drug_id_or_name, output_json=False)

```

--------------------------------------------------------------------------------
/src/biomcp/prefetch.py:
--------------------------------------------------------------------------------

```python
"""Prefetching system for common queries to improve performance.

This module implements a prefetching mechanism that warms up caches with
commonly searched biomedical entities during startup. This significantly
improves response times for frequent queries.

Key Features:
- Prefetches common genes, diseases, and chemicals on startup
- Runs asynchronously to avoid blocking server initialization
- Includes timeout to prevent startup delays
- Graceful error handling if prefetching fails

The prefetching runs automatically when the MCP server starts via the
lifespan hook in core.py.

Configuration:
    The lists of entities to prefetch can be customized by modifying
    the COMMON_GENES, COMMON_DISEASES, and COMMON_CHEMICALS constants.
"""

import asyncio
import logging

from .constants import (
    PREFETCH_TIMEOUT,
    PREFETCH_TOP_CHEMICALS,
    PREFETCH_TOP_DISEASES,
    PREFETCH_TOP_GENES,
)

logger = logging.getLogger(__name__)

# Common genes that are frequently searched.
# Only the first PREFETCH_TOP_GENES entries are warmed on startup.
COMMON_GENES = [
    "BRAF",
    "EGFR",
    "TP53",
    "KRAS",
    "ALK",
    "ROS1",
    "MET",
    "RET",
    "NTRK1",
    "NTRK2",
    "NTRK3",
]

# Common cancer types; truncated to PREFETCH_TOP_DISEASES when prefetching.
COMMON_DISEASES = [
    "lung cancer",
    "breast cancer",
    "colorectal cancer",
    "melanoma",
    "non-small cell lung cancer",
    "small cell lung cancer",
]

# Common drug names; truncated to PREFETCH_TOP_CHEMICALS when prefetching.
COMMON_CHEMICALS = [
    "osimertinib",
    "pembrolizumab",
    "nivolumab",
    "dabrafenib",
    "trametinib",
    "crizotinib",
    "alectinib",
]


class PrefetchManager:
    """Coordinates background cache warm-up for common queries."""

    def __init__(self):
        # Handle to the background warm-up task, if one was started.
        self._prefetch_task: asyncio.Task | None = None
        # True while the warm-up task is in flight.
        self._is_prefetching = False
        # True once a warm-up attempt has finished (success or failure).
        self._prefetch_complete = False

    async def start_prefetching(self):
        """Kick off cache warm-up in the background (idempotent)."""
        # Never start a second warm-up, and never re-run a finished one.
        if self._is_prefetching or self._prefetch_complete:
            return

        self._is_prefetching = True
        try:
            self._prefetch_task = asyncio.create_task(
                self._prefetch_common_queries()
            )
        except Exception as e:
            logger.warning(f"Failed to start prefetching: {e}")
            self._is_prefetching = False

    async def _prefetch_common_queries(self):
        """Warm caches for frequently used genes, diseases, and drugs."""
        try:
            # Imported lazily to avoid circular imports at module load time.
            from .articles.autocomplete import EntityRequest, autocomplete
            from .variants.cbioportal_search import CBioPortalSearchClient

            # One autocomplete coroutine per (concept, term) pair, with each
            # category capped to avoid overloading upstream services.
            concept_terms = [
                ("gene", COMMON_GENES[:PREFETCH_TOP_GENES]),
                ("disease", COMMON_DISEASES[:PREFETCH_TOP_DISEASES]),
                ("chemical", COMMON_CHEMICALS[:PREFETCH_TOP_CHEMICALS]),
            ]
            tasks = [
                autocomplete(
                    EntityRequest(concept=concept, query=term, limit=1)
                )
                for concept, terms in concept_terms
                for term in terms
            ]

            # Fire all autocomplete prefetches together; individual failures
            # are tolerated (return_exceptions=True).
            if tasks:
                await asyncio.gather(*tasks, return_exceptions=True)

            # Also warm cBioPortal gene summaries for the top genes.
            cbio_client = CBioPortalSearchClient()
            cbio_tasks = [
                cbio_client.get_gene_search_summary(gene, max_studies=5)
                for gene in COMMON_GENES[:PREFETCH_TOP_GENES]
            ]
            if cbio_tasks:
                await asyncio.gather(*cbio_tasks, return_exceptions=True)

            logger.info("Prefetching completed successfully")

        except Exception as e:
            logger.warning(f"Error during prefetching: {e}")
        finally:
            # Mark done even on failure so callers never retry or block.
            self._is_prefetching = False
            self._prefetch_complete = True

    async def wait_for_prefetch(self, timeout: float = PREFETCH_TIMEOUT):
        """Wait for the warm-up task to finish, bounded by ``timeout``."""
        task = self._prefetch_task
        if not task:
            return

        try:
            await asyncio.wait_for(task, timeout=timeout)
        except asyncio.TimeoutError:
            # Prefetch taking too long, continue without waiting
            logger.debug("Prefetch timeout - continuing without waiting")
        except Exception as e:
            # Ignore prefetch errors
            logger.debug(f"Prefetch error ignored: {e}")


# Module-level singleton used by the convenience functions below.
_prefetch_manager = PrefetchManager()


async def start_prefetching():
    """Begin warming caches for common queries via the shared manager."""
    await _prefetch_manager.start_prefetching()


async def wait_for_prefetch(timeout: float = PREFETCH_TIMEOUT):
    """Wait (up to ``timeout`` seconds) for the shared manager's prefetch."""
    await _prefetch_manager.wait_for_prefetch(timeout=timeout)

```

--------------------------------------------------------------------------------
/docs/backend-services-reference/01-overview.md:
--------------------------------------------------------------------------------

```markdown
# Backend Services Reference Overview

BioMCP integrates with multiple biomedical databases and services to provide comprehensive research capabilities. This reference documents the underlying APIs and their capabilities.

## Service Categories

### Literature and Publications

- **[PubTator3](06-pubtator3.md)**: Biomedical literature with entity annotations
- **Europe PMC**: Preprints from bioRxiv and medRxiv

### Clinical Trials

- **[ClinicalTrials.gov](04-clinicaltrials-gov.md)**: U.S. and international clinical trials registry
- **[NCI CTS API](05-nci-cts-api.md)**: National Cancer Institute's enhanced trial search

### Biomedical Annotations

- **[BioThings Suite](02-biothings-suite.md)**:
  - MyGene.info - Gene annotations
  - MyVariant.info - Variant annotations
  - MyDisease.info - Disease ontology
  - MyChem.info - Drug/chemical data

### Cancer Genomics

- **[cBioPortal](03-cbioportal.md)**: Cancer genomics portal with mutation data
- **TCGA**: The Cancer Genome Atlas (via MyVariant.info)

### Variant Effect Prediction

- **[AlphaGenome](07-alphagenome.md)**: Google DeepMind's AI for regulatory predictions

## API Authentication

| Service            | Authentication Required | Type    | Rate Limits         |
| ------------------ | ----------------------- | ------- | ------------------- |
| PubTator3          | No                      | Public  | 3 requests/second   |
| ClinicalTrials.gov | No                      | Public  | 50,000 requests/day |
| NCI CTS API        | Yes                     | API Key | 1,000 requests/day  |
| BioThings APIs     | No                      | Public  | 1,000 requests/hour |
| cBioPortal         | Optional                | Token   | Higher with token   |
| AlphaGenome        | Yes                     | API Key | Contact provider    |

## Data Flow Architecture

```
User Query → BioMCP Tools → Backend APIs → Unified Response

Example Flow:
1. User: "Find articles about BRAF mutations"
2. BioMCP: article_searcher tool
3. APIs Called:
   - PubTator3 (articles)
   - cBioPortal (mutation data)
   - Europe PMC (preprints)
4. Response: Integrated results with citations
```

## Service Reliability

### Primary Services

- **PubTator3**: 99.9% uptime, updated daily
- **ClinicalTrials.gov**: 99.5% uptime, updated daily
- **BioThings APIs**: 99.9% uptime, real-time data

### Fallback Strategies

- Cache frequently accessed data
- Implement exponential backoff
- Use alternative endpoints when available

## Common Integration Patterns

### 1. Entity Recognition Enhancement

```
PubTator3 → Extract entities → BioThings → Get detailed annotations
```

### 2. Variant to Trial Pipeline

```
MyVariant.info → Get gene → ClinicalTrials.gov → Find relevant trials
```

### 3. Comprehensive Gene Analysis

```
MyGene.info → Basic info
cBioPortal → Cancer mutations
PubTator3 → Literature
AlphaGenome → Predictions
```

## Performance Considerations

### Response Times (typical)

- PubTator3: 200-500ms
- ClinicalTrials.gov: 300-800ms
- BioThings APIs: 100-300ms
- cBioPortal: 200-600ms
- AlphaGenome: 1-3 seconds

### Optimization Strategies

1. **Batch requests** when APIs support it
2. **Cache static data** (gene names, ontologies)
3. **Parallelize independent** API calls
4. **Use pagination** for large result sets

## Error Handling

### Common Error Types

- **Rate Limiting**: 429 errors, implement backoff
- **Invalid Parameters**: 400 errors, validate inputs
- **Service Unavailable**: 503 errors, retry with delay
- **Authentication**: 401 errors, check API keys

### Error Response Format

```json
{
  "error": {
    "code": "RATE_LIMIT_EXCEEDED",
    "message": "API rate limit exceeded",
    "retry_after": 3600
  }
}
```

## Data Formats

### Input Formats

- **Identifiers**: HGNC symbols, rsIDs, NCT numbers, PMIDs
- **Coordinates**: GRCh38 genomic positions
- **Terms**: MeSH, MONDO, HPO ontologies

### Output Formats

- **JSON**: Primary format for all APIs
- **XML**: Available for some services
- **TSV/CSV**: Export options for bulk data

## Update Frequencies

| Service            | Update Frequency | Data Lag   |
| ------------------ | ---------------- | ---------- |
| PubTator3          | Daily            | 1-2 days   |
| ClinicalTrials.gov | Daily            | Real-time  |
| NCI CTS            | Daily            | 1 day      |
| BioThings          | Real-time        | Minutes    |
| cBioPortal         | Quarterly        | 3-6 months |

## Best Practices

### 1. API Key Management

- Store keys securely
- Rotate keys periodically
- Monitor usage against limits

### 2. Error Recovery

- Implement retry logic
- Log failed requests
- Provide fallback data

### 3. Data Validation

- Verify gene symbols
- Validate genomic coordinates
- Check identifier formats

### 4. Performance

- Cache when appropriate
- Batch similar requests
- Use appropriate page sizes

## Getting Started

1. Review individual service documentation
2. Obtain necessary API keys
3. Test endpoints with sample data
4. Implement error handling
5. Monitor usage and performance

## Support Resources

- **PubTator3**: [Support Forum](https://www.ncbi.nlm.nih.gov/research/pubtator3/)
- **ClinicalTrials.gov**: [Help Desk](https://clinicaltrials.gov/help)
- **BioThings**: [Documentation](https://docs.biothings.io/)
- **cBioPortal**: [User Guide](https://docs.cbioportal.org/)
- **NCI**: [API Support](https://api.cancer.gov/support)

```

--------------------------------------------------------------------------------
/tests/tdd/test_concurrent_requests.py:
--------------------------------------------------------------------------------

```python
"""Test concurrent request handling in the HTTP client."""

import asyncio
from unittest.mock import AsyncMock, patch

import pytest

from biomcp import http_client


class TestConcurrentRequests:
    """Test concurrent request handling."""

    @pytest.mark.asyncio
    async def test_concurrent_requests_same_domain(self):
        """Test multiple concurrent requests to the same domain."""
        # Patch the low-level HTTP call so no network traffic occurs.
        with patch(
            "biomcp.http_client.call_http", new_callable=AsyncMock
        ) as mock_http:
            # Every call succeeds with the same JSON payload.
            mock_http.return_value = (200, '{"data": "response"}')

            # Distinct URLs plus cache_ttl=0 guarantee no cache hits.
            coros = []
            for i in range(10):
                coros.append(
                    http_client.request_api(
                        url=f"https://api.example.com/resource/{i}",
                        request={},
                        domain="example",
                        cache_ttl=0,  # Disable caching
                    )
                )

            responses = await asyncio.gather(*coros)

            # Every request should succeed with the mocked payload.
            assert len(responses) == 10
            for data, error in responses:
                assert error is None
                assert data == {"data": "response"}

            # Each request should have reached the mocked transport once.
            assert mock_http.call_count == 10

    @pytest.mark.asyncio
    async def test_concurrent_requests_different_domains(self):
        """Test concurrent requests to different domains."""
        with patch(
            "biomcp.http_client.call_http", new_callable=AsyncMock
        ) as mock_http:
            # Route the mocked response based on the requested URL.
            async def fake_http(method, url, *args, **kwargs):
                if "domain1" in url:
                    return (200, '{"source": "domain1"}')
                if "domain2" in url:
                    return (200, '{"source": "domain2"}')
                return (200, '{"source": "other"}')

            mock_http.side_effect = fake_http

            responses = await asyncio.gather(
                http_client.request_api(
                    "https://domain1.com/api", {}, domain="domain1"
                ),
                http_client.request_api(
                    "https://domain2.com/api", {}, domain="domain2"
                ),
                http_client.request_api(
                    "https://domain3.com/api", {}, domain="domain3"
                ),
            )

            # Each domain should receive its own routed payload.
            assert responses[0][0] == {"source": "domain1"}
            assert responses[1][0] == {"source": "domain2"}
            assert responses[2][0] == {"source": "other"}

    @pytest.mark.asyncio
    async def test_concurrent_cache_access(self):
        """Test that concurrent requests properly use cache."""
        with patch(
            "biomcp.http_client.call_http", new_callable=AsyncMock
        ) as mock_http:
            mock_http.return_value = (200, '{"data": "cached"}')

            # Warm the cache with a single initial request.
            await http_client.request_api(
                url="https://api.example.com/data",
                request={},
                domain="example",
                cache_ttl=60,
            )

            calls_after_warmup = mock_http.call_count

            # Fire 5 concurrent requests against the cached URL.
            coros = [
                http_client.request_api(
                    url="https://api.example.com/data",
                    request={},
                    domain="example",
                    cache_ttl=60,
                )
                for _ in range(5)
            ]
            responses = await asyncio.gather(*coros)

            # All should be served from the cache.
            assert len(responses) == 5
            for data, _error in responses:
                assert data == {"data": "cached"}

            # No additional HTTP calls should have been made.
            assert mock_http.call_count == calls_after_warmup

    @pytest.mark.asyncio
    async def test_concurrent_circuit_breaker(self):
        """Test circuit breaker behavior with concurrent failures."""
        with patch(
            "biomcp.http_client.call_http", new_callable=AsyncMock
        ) as mock_http:
            # Every call fails with a server error.
            mock_http.return_value = (500, "Internal Server Error")

            coros = [
                http_client.request_api(
                    url=f"https://failing.com/api/{i}",
                    request={},
                    domain="failing",
                )
                for i in range(10)
            ]
            responses = await asyncio.gather(*coros, return_exceptions=True)

            # Every concurrent request should report an error.
            failures = sum(
                1 for _, error in responses if error is not None
            )
            assert failures == 10

            # Circuit should be open now
            # Additional requests should fail immediately
            _, error = await http_client.request_api(
                url="https://failing.com/api/test",
                request={},
                domain="failing",
            )

            assert error is not None
            # Check that circuit breaker is preventing calls
            # (exact behavior depends on implementation details)

```

--------------------------------------------------------------------------------
/tests/tdd/test_connection_pool.py:
--------------------------------------------------------------------------------

```python
"""Tests for connection pool management."""

import asyncio
import ssl
import weakref
from unittest.mock import patch

import httpx
import pytest

from biomcp.connection_pool import (
    EventLoopConnectionPools,
    close_all_pools,
    get_connection_pool,
)


@pytest.fixture
def pool_manager():
    """Provide an isolated EventLoopConnectionPools instance per test."""
    manager = EventLoopConnectionPools()
    return manager


@pytest.mark.asyncio
async def test_get_pool_creates_new_pool(pool_manager):
    """A first call to get_pool builds a live AsyncClient pool."""
    pool = await pool_manager.get_pool(
        verify=True, timeout=httpx.Timeout(30)
    )

    assert pool is not None
    assert isinstance(pool, httpx.AsyncClient)
    assert not pool.is_closed


@pytest.mark.asyncio
async def test_get_pool_reuses_existing_pool(pool_manager):
    """A second call with identical settings returns the same pool."""
    timeout = httpx.Timeout(30)

    first = await pool_manager.get_pool(verify=True, timeout=timeout)
    second = await pool_manager.get_pool(verify=True, timeout=timeout)

    assert first is second


@pytest.mark.asyncio
async def test_get_pool_different_verify_settings(pool_manager):
    """Pools are keyed by TLS verification mode, so settings differ."""
    timeout = httpx.Timeout(30)

    verified = await pool_manager.get_pool(verify=True, timeout=timeout)
    unverified = await pool_manager.get_pool(verify=False, timeout=timeout)

    assert verified is not unverified


@pytest.mark.asyncio
async def test_get_pool_ssl_context(pool_manager):
    """A custom ssl.SSLContext is accepted as the verify argument."""
    context = ssl.create_default_context()

    pool = await pool_manager.get_pool(
        verify=context, timeout=httpx.Timeout(30)
    )

    assert pool is not None
    assert isinstance(pool, httpx.AsyncClient)


@pytest.mark.asyncio
async def test_pool_cleanup_on_close_all(pool_manager):
    """close_all drops every pool the manager was tracking."""
    timeout = httpx.Timeout(30)
    for verify in (True, False):
        await pool_manager.get_pool(verify=verify, timeout=timeout)

    await pool_manager.close_all()

    # After close_all, pools should be cleared
    assert len(pool_manager._loop_pools) == 0


@pytest.mark.asyncio
async def test_no_event_loop_returns_single_use_client(pool_manager):
    """Without a running loop, a throwaway client is returned instead."""
    with patch("asyncio.get_running_loop", side_effect=RuntimeError):
        pool = await pool_manager.get_pool(
            verify=True, timeout=httpx.Timeout(30)
        )

        assert pool is not None
        # Single-use client should have no keepalive
        # Note: httpx client internal structure may vary


@pytest.mark.asyncio
async def test_pool_recreation_after_close(pool_manager):
    """Closing a pool forces the manager to build a replacement."""
    timeout = httpx.Timeout(30)

    old_pool = await pool_manager.get_pool(verify=True, timeout=timeout)
    await old_pool.aclose()

    new_pool = await pool_manager.get_pool(verify=True, timeout=timeout)

    assert old_pool is not new_pool
    assert old_pool.is_closed
    assert not new_pool.is_closed


@pytest.mark.asyncio
async def test_weak_reference_cleanup():
    """Event loops are tracked via weak references, not strong ones."""
    manager = EventLoopConnectionPools()

    # The mapping type itself guarantees loops can be garbage collected.
    assert isinstance(manager._loop_pools, weakref.WeakKeyDictionary)

    pool = await manager.get_pool(verify=True, timeout=httpx.Timeout(30))
    assert pool is not None

    # The running loop must appear as a key in the weak mapping.
    assert asyncio.get_running_loop() in manager._loop_pools


@pytest.mark.asyncio
async def test_global_get_connection_pool():
    """The module-level helper returns a usable pooled client."""
    with patch.dict("os.environ", {"BIOMCP_USE_CONNECTION_POOL": "true"}):
        pool = await get_connection_pool(
            verify=True, timeout=httpx.Timeout(30)
        )

        assert pool is not None
        assert isinstance(pool, httpx.AsyncClient)


@pytest.mark.asyncio
async def test_global_close_all_pools():
    """close_all_pools empties the shared module-level manager."""
    timeout = httpx.Timeout(30)
    for verify in (True, False):
        await get_connection_pool(verify=verify, timeout=timeout)

    await close_all_pools()

    # Verify cleanup (this is implementation-specific)
    from biomcp.connection_pool import _pool_manager

    assert len(_pool_manager._loop_pools) == 0


@pytest.mark.asyncio
async def test_concurrent_pool_creation(pool_manager):
    """Concurrent callers racing for the same key share one pool."""
    timeout = httpx.Timeout(30)

    coros = [
        pool_manager.get_pool(verify=True, timeout=timeout)
        for _ in range(10)
    ]
    pools = await asyncio.gather(*coros)

    # Every racer must receive the identical pool instance.
    first = pools[0]
    assert all(p is first for p in pools)


@pytest.mark.asyncio
async def test_connection_pool_limits():
    """A freshly built pool is a live httpx.AsyncClient."""
    manager = EventLoopConnectionPools()

    pool = await manager.get_pool(verify=True, timeout=httpx.Timeout(30))

    # Verify pool was created (actual limits are internal to httpx)
    assert pool is not None
    assert isinstance(pool, httpx.AsyncClient)

```

--------------------------------------------------------------------------------
/tests/data/myvariant/variants_part_braf_v600_multiple.json:
--------------------------------------------------------------------------------

```json
[
  {
    "_id": "chr7:g.140453136A>G",
    "_score": 19.419012,
    "cadd": {
      "_license": "http://bit.ly/2TIuab9",
      "phred": 21.2
    },
    "chrom": "7",
    "clinvar": {
      "_license": "http://bit.ly/2SQdcI0",
      "rcv": {
        "clinical_significance": "Likely pathogenic"
      },
      "variant_id": 376288
    },
    "cosmic": {
      "_license": "http://bit.ly/2VMkY7R",
      "cosmic_id": "COSM18443"
    },
    "dbnsfp": {
      "_license": "http://bit.ly/2VLnQBz",
      "genename": ["BRAF", "BRAF", "BRAF", "BRAF"],
      "hgvsc": ["c.620T>C", "c.1919T>C", "c.1799T>C"],
      "hgvsp": ["p.V600A", "p.Val600Ala", "p.Val640Ala", "p.Val207Ala"],
      "polyphen2": {
        "hdiv": {
          "pred": "B",
          "score": 0.207
        }
      }
    },
    "dbsnp": {
      "_license": "http://bit.ly/2AqoLOc",
      "rsid": "rs113488022"
    },
    "vcf": {
      "alt": "G",
      "position": "140453136",
      "ref": "A"
    }
  },
  {
    "_id": "chr7:g.140453136A>T",
    "_score": 18.693962,
    "cadd": {
      "_license": "http://bit.ly/2TIuab9",
      "phred": 32
    },
    "chrom": "7",
    "civic": {
      "_license": "http://bit.ly/2FqS871",
      "id": 12,
      "openCravatUrl": "https://run.opencravat.org/webapps/variantreport/index.html?alt_base=T&chrom=chr7&pos=140753336&ref_base=A"
    },
    "clinvar": {
      "_license": "http://bit.ly/2SQdcI0",
      "rcv": [
        {
          "clinical_significance": "Pathogenic"
        },
        {
          "clinical_significance": "Pathogenic"
        },
        {
          "clinical_significance": "Pathogenic"
        },
        {
          "clinical_significance": "Pathogenic"
        },
        {
          "clinical_significance": "Pathogenic"
        },
        {
          "clinical_significance": "Pathogenic"
        },
        {
          "clinical_significance": "Pathogenic"
        },
        {
          "clinical_significance": "not provided"
        },
        {
          "clinical_significance": "Likely pathogenic"
        },
        {
          "clinical_significance": "Likely pathogenic"
        },
        {
          "clinical_significance": "Likely pathogenic"
        },
        {
          "clinical_significance": "Likely pathogenic"
        },
        {
          "clinical_significance": "Likely pathogenic"
        },
        {
          "clinical_significance": "Likely pathogenic"
        },
        {
          "clinical_significance": "Likely pathogenic"
        },
        {
          "clinical_significance": "Pathogenic"
        },
        {
          "clinical_significance": "Pathogenic"
        },
        {
          "clinical_significance": "Likely pathogenic"
        },
        {
          "clinical_significance": "Pathogenic"
        },
        {
          "clinical_significance": "Likely pathogenic"
        },
        {
          "clinical_significance": "Likely pathogenic"
        },
        {
          "clinical_significance": "Pathogenic"
        },
        {
          "clinical_significance": "Pathogenic"
        },
        {
          "clinical_significance": "Pathogenic"
        },
        {
          "clinical_significance": "Pathogenic"
        },
        {
          "clinical_significance": "Likely pathogenic"
        },
        {
          "clinical_significance": "Pathogenic"
        },
        {
          "clinical_significance": "Pathogenic"
        },
        {
          "clinical_significance": "Likely pathogenic"
        }
      ],
      "variant_id": 13961
    },
    "cosmic": {
      "_license": "http://bit.ly/2VMkY7R",
      "cosmic_id": "COSM476"
    },
    "dbnsfp": {
      "_license": "http://bit.ly/2VLnQBz",
      "genename": ["BRAF", "BRAF", "BRAF", "BRAF"],
      "hgvsc": ["c.620T>A", "c.1919T>A", "c.1799T>A"],
      "hgvsp": ["p.Val640Glu", "p.Val207Glu", "p.Val600Glu", "p.V600E"],
      "polyphen2": {
        "hdiv": {
          "pred": "D",
          "score": 0.971
        }
      }
    },
    "dbsnp": {
      "_license": "http://bit.ly/2AqoLOc",
      "rsid": "rs113488022"
    },
    "exac": {
      "_license": "http://bit.ly/2H9c4hg",
      "af": 1.647e-5
    },
    "gnomad_exome": {
      "_license": "http://bit.ly/2I1cl1I",
      "af": {
        "af": 3.97994e-6
      }
    },
    "vcf": {
      "alt": "T",
      "position": "140453136",
      "ref": "A"
    }
  },
  {
    "_id": "chr7:g.140453136A>C",
    "_score": 18.476965,
    "cadd": {
      "_license": "http://bit.ly/2TIuab9",
      "phred": 26.0
    },
    "chrom": "7",
    "clinvar": {
      "_license": "http://bit.ly/2SQdcI0",
      "rcv": [
        {
          "clinical_significance": "not provided"
        },
        {
          "clinical_significance": "Pathogenic"
        },
        {
          "clinical_significance": "Pathogenic"
        },
        {
          "clinical_significance": "Uncertain significance"
        }
      ],
      "variant_id": 40389
    },
    "cosmic": {
      "_license": "http://bit.ly/2VMkY7R",
      "cosmic_id": "COSM6137"
    },
    "dbnsfp": {
      "_license": "http://bit.ly/2VLnQBz",
      "genename": ["BRAF", "BRAF", "BRAF", "BRAF"],
      "hgvsc": ["c.1919T>G", "c.1799T>G", "c.620T>G"],
      "hgvsp": ["p.Val640Gly", "p.Val207Gly", "p.Val600Gly", "p.V600G"],
      "polyphen2": {
        "hdiv": {
          "pred": "P",
          "score": 0.822
        }
      }
    },
    "dbsnp": {
      "_license": "http://bit.ly/2AqoLOc",
      "rsid": "rs113488022"
    },
    "vcf": {
      "alt": "C",
      "position": "140453136",
      "ref": "A"
    }
  }
]

```
Page 2/15FirstPrevNextLast