# Directory Structure

```
├── .env.example
├── .github
│   └── workflows
│       └── publish-to-pypi.yml
├── .gitignore
├── CHANGELOG.md
├── CLAUDE.md
├── CONTRIBUTING.md
├── Dockerfile
├── LICENSE
├── pyproject.toml
├── pytest.ini
├── README.md
├── requirements.txt
├── run-server.sh
├── SECURITY.md
├── setup.py
├── tests
│   ├── __init__.py
│   ├── test_api_key_management.py
│   ├── test_connection_manager.py
│   ├── test_health_checks.py
│   ├── test_integration.py
│   └── test_server.py
├── uv.lock
└── vectara_mcp
    ├── __init__.py
    ├── __main__.py
    ├── auth.py
    ├── connection_manager.py
    ├── health_checks.py
    └── server.py
```

# Files

--------------------------------------------------------------------------------
/.env.example:
--------------------------------------------------------------------------------

```
# Vectara API Configuration
# Copy this file to .env and fill in your actual values

# Your Vectara API key (required for all tools)
VECTARA_API_KEY=your_vectara_api_key_here

# Comma-separated list of corpus keys for testing
# Example: VECTARA_CORPUS_KEYS=corpus1,corpus2,corpus3
VECTARA_CORPUS_KEYS=your_corpus_key1,your_corpus_key2

# Optional: Test text for hallucination correction and factual consistency
TEST_TEXT=Your sample generated text here for testing hallucination correction and factual consistency evaluation.

# Optional: Test source documents (pipe-separated for multiple docs)
TEST_SOURCE_DOCS=First source document text here.|Second source document text here.
```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
#  Usually these files are written by a python script from a template
#  before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
.pybuilder/
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
#   For a library or package, you might want to ignore these files since the code is
#   intended to run in multiple environments; otherwise, check them in:
# .python-version

# pipenv
#   According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
#   However, in case of collaboration, if having platform-specific dependencies or dependencies
#   having no cross-platform support, pipenv may install dependencies that don't work, or not
#   install all needed dependencies.
#Pipfile.lock

# UV
#   Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
#   This is especially recommended for binary packages to ensure reproducibility, and is more
#   commonly ignored for libraries.
#uv.lock

# poetry
#   Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
#   This is especially recommended for binary packages to ensure reproducibility, and is more
#   commonly ignored for libraries.
#   https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock

# pdm
#   Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
#   pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
#   in version control.
#   https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/

# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# pytype static type analyzer
.pytype/

# Cython debug symbols
cython_debug/

# PyCharm
#  JetBrains specific template is maintained in a separate JetBrains.gitignore that can
#  be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
#  and can be added to the global gitignore or merged into this file.  For a more nuclear
#  option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

# Ruff stuff:
.ruff_cache/

# PyPI configuration file
.pypirc

```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# Vectara MCP Server

![GitHub Repo stars](https://img.shields.io/github/stars/Vectara/Vectara-mcp?style=social)
![PyPI version](https://img.shields.io/pypi/v/vectara-mcp.svg)
![License](https://img.shields.io/pypi/l/vectara-mcp.svg)
![Security](https://img.shields.io/badge/security-first-brightgreen)

> 🔌 **Compatible with [Claude Desktop](https://claude.ai/desktop) and any other MCP client!**

The Model Context Protocol (MCP) is an open standard that enables AI systems to interact seamlessly with various data sources and tools, facilitating secure, two-way connections.

Vectara-MCP provides any agentic application with access to fast, reliable RAG with reduced hallucination, powered by Vectara's Trusted RAG platform, through the MCP protocol.

## Installation

You can install the package directly from PyPI:

```bash
pip install vectara-mcp
```

## Quick Start

### Secure by Default (HTTP/SSE with Authentication)

```bash
# Start server with secure HTTP transport (DEFAULT)
python -m vectara_mcp
# Server running at http://127.0.0.1:8000 with authentication enabled
```

### Local Development Mode (STDIO)

```bash
# For Claude Desktop or local development (less secure)
python -m vectara_mcp --stdio
# ⚠️ Warning: STDIO transport is less secure. Use only for local development.
```

### Configuration Options

```bash
# Custom host and port
python -m vectara_mcp --host 0.0.0.0 --port 8080

# SSE transport mode
python -m vectara_mcp --transport sse --path /sse

# Disable authentication (DANGEROUS - dev only)
python -m vectara_mcp --no-auth
```

## Transport Modes

### HTTP Transport (Default - Recommended)
- **Security:** Built-in authentication via bearer tokens
- **Encryption:** HTTPS ready
- **Rate Limiting:** 100 requests/minute by default
- **CORS Protection:** Configurable origin validation
- **Use Case:** Production deployments, cloud environments

### SSE Transport
- **Streaming:** Server-Sent Events for real-time updates
- **Authentication:** Bearer token support
- **Compatibility:** Works with legacy MCP clients
- **Use Case:** Real-time streaming applications

### STDIO Transport
- **⚠️ Security Warning:** No transport-layer security
- **Performance:** Low latency for local communication
- **Use Case:** Local development, Claude Desktop
- **Requirement:** Must be explicitly enabled with `--stdio` flag

## Environment Variables

```bash
# Required
export VECTARA_API_KEY="your-api-key"

# Optional
export VECTARA_AUTHORIZED_TOKENS="token1,token2"  # Additional auth tokens
export VECTARA_ALLOWED_ORIGINS="http://localhost:*,https://app.example.com"
export VECTARA_TRANSPORT="http"  # Default transport mode
export VECTARA_AUTH_REQUIRED="true"  # Enforce authentication
```

## Authentication

### HTTP/SSE Transport
When using HTTP or SSE transport, authentication is required by default:

```bash
# Using curl with bearer token
curl -H "Authorization: Bearer $VECTARA_API_KEY" \
     -H "Content-Type: application/json" \
     -X POST http://localhost:8000/call/ask_vectara \
     -d '{"query": "What is Vectara?", "corpus_keys": ["my-corpus"]}'

# Using X-API-Key header (alternative)
curl -H "X-API-Key: $VECTARA_API_KEY" \
     http://localhost:8000/sse
```

### Disabling Authentication (Development Only)
```bash
# ⚠️ NEVER use in production
python -m vectara_mcp --no-auth
```

## Available Tools

### API Key Management
- **setup_vectara_api_key:**
  Configure and validate your Vectara API key for the session (one-time setup).

  Args:
  - api_key: str, Your Vectara API key - required.

  Returns:
  - Success confirmation with masked API key or validation error.


- **clear_vectara_api_key:**
  Clear the stored API key from server memory.

  Returns:
  - Confirmation message.

### Query Tools
- **ask_vectara:**
  Run a RAG query using Vectara, returning search results with a generated response.

  Args:
  - query: str, The user query to run - required.
  - corpus_keys: list[str], List of Vectara corpus keys to use for the search - required.
  - n_sentences_before: int, Number of sentences before the answer to include in the context - optional, default is 2.
  - n_sentences_after: int, Number of sentences after the answer to include in the context - optional, default is 2.
  - lexical_interpolation: float, The amount of lexical interpolation to use - optional, default is 0.005.
  - max_used_search_results: int, The maximum number of search results to use - optional, default is 10.
  - generation_preset_name: str, The name of the generation preset to use - optional, default is "vectara-summary-table-md-query-ext-jan-2025-gpt-4o".
  - response_language: str, The language of the response - optional, default is "eng".

  Returns:
  - The response from Vectara, including the generated answer and the search results.

- **search_vectara:**
  Run a semantic search query using Vectara, without generation.

  Args:
  - query: str, The user query to run - required.
  - corpus_keys: list[str], List of Vectara corpus keys to use for the search - required.
  - n_sentences_before: int, Number of sentences before the answer to include in the context - optional, default is 2.
  - n_sentences_after: int, Number of sentences after the answer to include in the context - optional, default is 2.
  - lexical_interpolation: float, The amount of lexical interpolation to use - optional, default is 0.005.

  Returns:
  - The response from Vectara, including the matching search results.

### Analysis Tools
- **correct_hallucinations:**
  Identify and correct hallucinations in generated text using Vectara's VHC (Vectara Hallucination Correction) API.

  Args:
  - generated_text: str, The generated text to analyze for hallucinations - required.
  - documents: list[str], List of source documents to compare against - required.
  - query: str, The original user query that led to the generated text - optional.

  Returns:
  - JSON-formatted string containing corrected text and detailed correction information.

- **eval_factual_consistency:**
  Evaluate the factual consistency of generated text against source documents using Vectara's dedicated factual consistency evaluation API.

  Args:
  - generated_text: str, The generated text to evaluate for factual consistency - required.
  - documents: list[str], List of source documents to compare against - required.
  - query: str, The original user query that led to the generated text - optional.

  Returns:
  - JSON-formatted string containing factual consistency evaluation results and scoring.

**Note:** API key must be configured first using `setup_vectara_api_key` tool or `VECTARA_API_KEY` environment variable.


## Configuration with Claude Desktop

To use with Claude Desktop, update your configuration to use STDIO transport:

```json
{
  "mcpServers": {
    "Vectara": {
      "command": "python",
      "args": ["-m", "vectara_mcp", "--stdio"],
      "env": {
        "VECTARA_API_KEY": "your-api-key"
      }
    }
  }
}
```

Or using uv:

```json
{
  "mcpServers": {
    "Vectara": {
      "command": "uv",
      "args": ["tool", "run", "vectara-mcp", "--stdio"]
    }
  }
}
```

**Note:** Claude Desktop requires STDIO transport. While less secure than HTTP, it's acceptable for local desktop use.

## Usage in Claude Desktop App

Once installation is complete and the Claude Desktop app is configured, completely close and re-open the app to load the Vectara-mcp server. You should see a hammer icon in the bottom left of the app, indicating available MCP tools. Click the hammer icon to see more detail on the Vectara tools.

Claude will now have access to the Vectara-mcp server, including all six Vectara tools.

## Secure Setup Workflow

**First-time setup (one-time per session):**
1. Configure your API key securely:
```
setup-vectara-api-key
API key: [your-vectara-api-key]
```


**After setup, use any tools without exposing your API key:**

### Vectara Tool Examples

1. **RAG Query with Generation**:
```
ask-vectara
Query: Who is Amr Awadallah?
Corpus keys: ["your-corpus-key"]
```

2. **Semantic Search Only**:
```
search-vectara
Query: events in NYC?
Corpus keys: ["your-corpus-key"]
```

3. **Hallucination Detection & Correction**:
```
correct-hallucinations
Generated text: [text to check]
Documents: ["source1", "source2"]
```

4. **Factual Consistency Evaluation**:
```
eval-factual-consistency
Generated text: [text to evaluate]
Documents: ["reference1", "reference2"]
```

## Security Best Practices

1. **Always use HTTP transport for production** - Never expose STDIO transport to the network
2. **Keep authentication enabled** - Only disable with `--no-auth` for local testing
3. **Use HTTPS in production** - Deploy behind a reverse proxy with TLS termination
4. **Configure CORS properly** - Set `VECTARA_ALLOWED_ORIGINS` to restrict access
5. **Rotate API keys regularly** - Update `VECTARA_API_KEY` and `VECTARA_AUTHORIZED_TOKENS`
6. **Monitor rate limits** - Default 100 req/min, adjust based on your needs

See [SECURITY.md](SECURITY.md) for detailed security guidelines.

## Support

For issues, questions, or contributions, please visit:
https://github.com/vectara/vectara-mcp
```

--------------------------------------------------------------------------------
/CONTRIBUTING.md:
--------------------------------------------------------------------------------

```markdown
# Contributing to vectara-mcp

Thank you for your interest in Vectara-mcp and considering contributing to our project! 
Whether it's a bug, a new tool, updates to the README, or anything else, we truly appreciate your time and effort.

This document provides guidelines and best practices to help you contribute effectively.

## Getting Started

1. Fork the repository and clone your fork.
2. Create a new branch for your changes (e.g. `bug-fix-1234`).
3. Make your changes in the new branch and test.
4. Commit and push your changes to your fork. Add useful comments to describe your changes.
5. Create a pull request following the guidelines in the [Submitting Pull Requests](#submitting-pull-requests) section.

## How to Contribute

### Reporting Bugs

If you find a bug in the project, please create an issue on GitHub with the following information:

- A clear, descriptive title for the issue.
- A description of the problem, including steps to reproduce the issue.
- Any relevant logs, screenshots, or other supporting information.

### Suggesting Enhancements

If you have an idea for a new feature or improvement, please create an issue on GitHub with the following information:

- A clear, descriptive title for the issue.
- A detailed description of the proposed enhancement, including any benefits and potential drawbacks.
- Any relevant examples, mockups, or supporting information.

### Submitting Pull Requests

When submitting a pull request, please ensure that your changes meet the following criteria:

- Your pull request should be atomic and focus on a single change.
- You should have thoroughly tested your changes with multiple different scenarios.
- You should have considered potential risks and mitigations for your changes.
- You should have documented your changes clearly and comprehensively.
- Please do not include any unrelated or "extra" small tweaks or changes.

```

--------------------------------------------------------------------------------
/CLAUDE.md:
--------------------------------------------------------------------------------

```markdown
# CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## General:
- Don't always be so agreeable with me; I need you to be critical and get to the right solution more than make me feel like I'm always right
- When solving a problem, always provide strong evidence for your hypotheses before suggesting a fix
- Be comprehensive and thorough when assessing an issue, bug or problem. Base your conclusions on evidence.
- Think carefully and only action the specific task I have given you with the most concise and elegant solution that changes as little code as possible.
- Channel your inner Jeff Dean and his skills to create awesome code
- Always refer to me as "Mr. Ofer"

## Coding rules:
- Code must always be efficient and your testing thorough, just like Jeff Dean would work
- KISS (Keep It Simple, Straightforward). When two designs solve the same problem, choose the one with fewer moving parts.
- Avoid premature optimization. First make it work, then measure, then make it fast if the numbers say you must.
- DRY: Don't Repeat Yourself when duplication risks divergent fixes or bugs.
- Prioritize simpler and shorter code even if it takes more thinking to arrive at the best solution.
- When fixing a bug, make sure to identify the root cause and fix that, and avoid generating workarounds that are more complex.
- For python code, follow formatting best practices to ensure pylint passes

## Testing
- Before implementing a new feature or functionality, always add a unit test or regression test first, and confirm with me that it clearly defines the new features. That will help us work together and align on specification.
- Verify the code you generate does not introduce any security issues or vulnerabilities

## Development Commands

### Testing
- Run all tests: `python -m pytest tests/ -v`
- Run integration tests: `python -m pytest tests/test_integration.py -v -s`
- Run unit tests: `python -m pytest tests/test_server.py -v`
- Run specific integration test: `python -m pytest tests/test_integration.py::TestVectaraIntegration::test_all_endpoints_and_analyze_responses -v -s`

### Running the Server
- Start MCP server: `python -m vectara_mcp`
- Alternative: `python vectara_mcp/__main__.py`

### Environment Setup
- Install dependencies: `pip install -e .`
- Install test dependencies: `pip install -e .[test]`
- Create `.env` file with `VECTARA_API_KEY` and `VECTARA_CORPUS_KEYS` for integration tests

## Architecture Overview

This is a Model Context Protocol (MCP) server that provides RAG capabilities using Vectara's API. The architecture consists of:

### Core Components
- **vectara_mcp/server.py**: Main MCP server implementation using FastMCP framework
- **vectara_mcp/__main__.py**: Entry point that starts the server
- **tests/**: Integration and unit tests for all MCP tools

### MCP Tools Architecture
The server exposes 4 main MCP tools:

1. **ask_vectara**: Full RAG with generation - queries Vectara and returns AI-generated summary with citations
2. **search_vectara**: Semantic search only - returns ranked search results without generation
3. **correct_hallucinations**: Uses Vectara's VHC API to identify and correct hallucinations in generated text
4. **eval_factual_consistency**: Evaluates factual consistency using VHC API for evaluation metrics

### Key Design Patterns
- All tools use async/await pattern with FastMCP Context for progress reporting
- Shared utility functions for parameter validation, error handling, and API calls
- Consistent error message formatting across all tools using `_format_error()`
- Tools validate required parameters using shared validation functions
- Unified HTTP error handling with context-specific messages

### Vectara Integration
- Uses direct HTTP calls to Vectara API endpoints for all operations (no SDK dependency)
- Multi-corpus support via the `/v2/query` API endpoint
- Configurable search parameters: lexical interpolation, context sentences, reranking
- Direct HTTP calls to VHC API endpoints for hallucination correction/evaluation

### Testing Strategy
- Integration tests require real API credentials via `.env` file
- Tests are skipped automatically if credentials are missing
- Mock contexts used to test MCP-specific functionality
- Tests validate both successful responses and error handling
```

--------------------------------------------------------------------------------
/SECURITY.md:
--------------------------------------------------------------------------------

```markdown
# Security Policy and Guidelines for Vectara MCP Server

The Vectara trust and security center, including our security policy, can be found at
[https://vectara.com/legal/security-at-vectara/](https://vectara.com/legal/security-at-vectara/).

## Reporting a Vulnerability

Please send security vulnerability reports to [email protected].

---

## MCP Server Security Guidelines

### Overview

The Vectara MCP Server prioritizes security with a "secure by default" approach. This document outlines security best practices, transport layer considerations, and deployment guidelines.

### Transport Security Comparison

#### HTTP/SSE Transport (Default - Recommended)
✅ **Advantages:**
- Transport-layer encryption (HTTPS)
- Bearer token authentication
- Rate limiting protection
- CORS origin validation
- Session management with cryptographic IDs
- Audit logging capabilities

⚠️ **Considerations:**
- Requires proper TLS certificate configuration
- Network-exposed endpoints need firewall rules
- Token management overhead

#### STDIO Transport (Local Development Only)
⚠️ **Security Risks:**
- No transport-layer authentication
- API keys visible in process memory
- Credentials may leak to shell history
- No encryption between processes
- Vulnerable to local privilege escalation

✅ **Acceptable Use Cases:**
- Local development environments
- Claude Desktop (isolated desktop application)
- CI/CD testing pipelines (isolated containers)

### Authentication

#### Bearer Token Authentication (HTTP/SSE)

The server validates bearer tokens from multiple sources:

1. **Authorization Header** (Recommended)
```bash
Authorization: Bearer <token>
```

2. **X-API-Key Header** (Alternative)
```bash
X-API-Key: <token>
```

#### Token Management

```bash
# Primary API key (used for both Vectara API and MCP auth)
export VECTARA_API_KEY="vaa_xxxxxxxxxxxxx"

# Additional authorized tokens (comma-separated)
export VECTARA_AUTHORIZED_TOKENS="token1,token2,token3"
```

#### Disabling Authentication

⚠️ **WARNING**: Never disable authentication in production!

```bash
# Development only - creates security vulnerability
python -m vectara_mcp --no-auth
```

### CORS Configuration

#### Default Configuration
```bash
# Restricts to localhost by default
VECTARA_ALLOWED_ORIGINS="http://localhost:*"
```

#### Production Configuration
```bash
# Whitelist specific domains
VECTARA_ALLOWED_ORIGINS="https://app.example.com,https://admin.example.com"
```

#### Security Headers

The server automatically adds these security headers:
- `X-Content-Type-Options: nosniff`
- `X-Frame-Options: DENY`
- `X-XSS-Protection: 1; mode=block`
- `Strict-Transport-Security: max-age=31536000`
- `Content-Security-Policy: default-src 'self'`

### Rate Limiting

Default: 100 requests per minute per client

The built-in rate limiter prevents:
- DoS attacks
- Resource exhaustion
- API abuse

### Production Deployment Checklist

#### 1. Use HTTPS (Required)

Deploy behind a reverse proxy with TLS:

```nginx
server {
    listen 443 ssl http2;
    server_name api.example.com;

    ssl_certificate /path/to/cert.pem;
    ssl_certificate_key /path/to/key.pem;

    location / {
        proxy_pass http://127.0.0.1:8000;
        proxy_set_header Authorization $http_authorization;
        proxy_set_header X-Real-IP $remote_addr;
    }
}
```

#### 2. Environment Variables

```bash
# Production environment file (.env.production)
VECTARA_API_KEY="vaa_production_key"
VECTARA_AUTHORIZED_TOKENS="client1_token,client2_token"
VECTARA_ALLOWED_ORIGINS="https://app.example.com"
VECTARA_AUTH_REQUIRED="true"
```

#### 3. Network Security

```bash
# Firewall rules (iptables example)
# Allow HTTPS only
iptables -A INPUT -p tcp --dport 443 -j ACCEPT
# Block direct access to MCP port
iptables -A INPUT -p tcp --dport 8000 -s 127.0.0.1 -j ACCEPT
iptables -A INPUT -p tcp --dport 8000 -j DROP
```

#### 4. Container Security (Docker)

```dockerfile
FROM python:3.11-slim

# Run as non-root user
RUN useradd -m -u 1000 mcp-user
USER mcp-user

# Copy application
WORKDIR /app
COPY --chown=mcp-user:mcp-user . .

# Install dependencies
RUN pip install --user vectara-mcp

# Use secrets at runtime (not build time)
CMD ["python", "-m", "vectara_mcp"]
```

```yaml
# docker-compose.yml
version: '3.8'
services:
  vectara-mcp:
    image: vectara-mcp:latest
    environment:
      - VECTARA_API_KEY=${VECTARA_API_KEY}
    secrets:
      - vectara_tokens
    ports:
      - "127.0.0.1:8000:8000"
    restart: unless-stopped

secrets:
  vectara_tokens:
    external: true
```

### Common Security Mistakes to Avoid

#### ❌ DON'T: Expose STDIO to Network
```bash
# NEVER DO THIS
socat TCP-LISTEN:8080,fork EXEC:"python -m vectara_mcp --stdio"
```

#### ❌ DON'T: Disable Auth in Production
```bash
# NEVER DO THIS IN PRODUCTION
python -m vectara_mcp --no-auth --host 0.0.0.0
```

#### ❌ DON'T: Store Keys in Code
```python
# NEVER DO THIS
API_KEY = "vaa_hardcoded_key_12345"
```

#### ❌ DON'T: Use Wildcard CORS
```bash
# AVOID THIS
VECTARA_ALLOWED_ORIGINS="*"
```

### Security Incident Response

If you suspect a security breach:

1. **Immediately rotate all API keys**
```bash
# Revoke compromised keys in Vectara Console
# Generate new keys
# Update VECTARA_API_KEY and VECTARA_AUTHORIZED_TOKENS
```

2. **Review audit logs**
```bash
grep "401\|403" /var/log/vectara-mcp/audit.log
```

3. **Check for unauthorized access**
```bash
# Review unique IPs
awk '{print $1}' access.log | sort -u
```

4. **Update and patch**
```bash
pip install --upgrade vectara-mcp
```

### Compliance Considerations

#### Data Privacy
- No user queries or responses are stored by default
- API keys are only held in memory (not persisted)
- No telemetry or analytics collection

#### Regulatory Compliance
- Supports audit logging for compliance requirements
- Compatible with SOC2, HIPAA deployment patterns
- Allows data residency configuration via corpus selection

### Regular Security Maintenance

#### Weekly
- Review authentication logs
- Check for unusual access patterns
- Verify rate limiting is functioning

#### Monthly
- Rotate API tokens
- Update dependencies: `pip list --outdated`
- Review CORS and firewall rules

#### Quarterly
- Security audit of deployment
- Penetration testing (if applicable)
- Update TLS certificates before expiry

### Additional Resources

- [OWASP Security Guidelines](https://owasp.org/)
- [MCP Security Best Practices](https://modelcontextprotocol.io/docs/concepts/security)
- [Vectara Security Documentation](https://docs.vectara.com/docs/learn/security)

---

Remember: Security is not a one-time configuration but an ongoing process. Stay vigilant and keep your systems updated.
```
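
SECURITY.md states a default limit of 100 requests per minute per client but does not say which algorithm enforces it. A sliding-window counter is one common way to implement such a limit. The sketch below is an illustration under that assumption, not the server's actual limiter; the class and parameter names are hypothetical.

```python
import time
from collections import defaultdict, deque
from typing import Callable


class SlidingWindowRateLimiter:
    """Allow at most `limit` requests per client within `window_seconds`."""

    def __init__(
        self,
        limit: int = 100,
        window_seconds: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.limit = limit
        self.window = window_seconds
        self.clock = clock  # injectable so tests can control time
        self._hits: dict[str, deque] = defaultdict(deque)

    def allow(self, client_id: str) -> bool:
        """Record a request and report whether it is within the limit."""
        now = self.clock()
        hits = self._hits[client_id]
        # Discard timestamps that have aged out of the window
        while hits and now - hits[0] >= self.window:
            hits.popleft()
        if len(hits) >= self.limit:
            return False
        hits.append(now)
        return True
```

Here `client_id` could be a token or a remote address; which one the real server uses is not stated in the source. A rejected request would typically be answered with HTTP 429.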

--------------------------------------------------------------------------------
/tests/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/run-server.sh:
--------------------------------------------------------------------------------

```bash
docker build -t vectara-mcp .
docker run -d -p 8000:8000 vectara-mcp
```

--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------

```
fastmcp>=0.4.1
fastapi>=0.100.0
uvicorn>=0.30.0
aiohttp>=3.9.0
pytest>=8.0.0
pytest-asyncio>=0.23.0
python-dotenv>=1.0.0
tenacity>=8.5.0
```

--------------------------------------------------------------------------------
/pytest.ini:
--------------------------------------------------------------------------------

```
[pytest]
testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*
addopts = -v --tb=short
asyncio_mode = auto
```

--------------------------------------------------------------------------------
/vectara_mcp/__main__.py:
--------------------------------------------------------------------------------

```python
# __main__.py
"""
Vectara MCP Server entry point.

Supports multiple transports:
- HTTP (default, secure)
- SSE (Server-Sent Events)
- STDIO (for local development)
"""

from vectara_mcp.server import main

if __name__ == "__main__":
    main()
```

--------------------------------------------------------------------------------
/vectara_mcp/__init__.py:
--------------------------------------------------------------------------------

```python
"""
Vectara MCP Server

A Model Context Protocol server for Vectara Trusted Generative AI.
"""

__version__ = "0.2.0"

# Import main function for compatibility
from .server import main, mcp

# Define what gets imported with "from vectara_mcp import *"
__all__ = ["mcp", "main"]

```

--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------

```dockerfile
FROM python:3.12-slim

WORKDIR /app

# Copy requirements file
COPY requirements.txt .

# Install dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application code
COPY . .

# Expose the port the app runs on
EXPOSE 8000

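# Start the application as a module (server.py lives inside the vectara_mcp package).
# Bind to 0.0.0.0 so the published port is reachable from outside the container.
CMD ["python", "-m", "vectara_mcp", "--host", "0.0.0.0", "--port", "8000"]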
```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[project]
name = "vectara-mcp"
version = "0.2.0"
description = "A Model Context Protocol (MCP) server that provides tools from Vectara"
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
    "mcp>=1.6.0",
    "fastmcp>=0.4.1",
    "fastapi>=0.95.0",
    "uvicorn>=0.34.0",
    "aiohttp>=3.8.0",
    "tenacity>=8.0.0",
    "python-dotenv>=1.0.0",
]

[project.optional-dependencies]
test = [
    "pytest>=7.0.0",
    "pytest-asyncio>=0.21.0",
    "python-dotenv>=1.0.0",
]

[build-system]
requires = ["setuptools==67.8.0"]
build-backend = "setuptools.build_meta"
```

--------------------------------------------------------------------------------
/.github/workflows/publish-to-pypi.yml:
--------------------------------------------------------------------------------

```yaml
name: Publish to PyPI

on:
  push:
    tags:
      - 'v*'

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v4

    - name: Set up Python
      uses: actions/setup-python@v4
      with:
        python-version: '3.11'

    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install -e .[test]

    - name: Run tests
      run: |
        python -m pytest tests/ -v

  publish:
    needs: test
    runs-on: ubuntu-latest
    environment: release
    permissions:
      id-token: write

    steps:
    - uses: actions/checkout@v4

    - name: Set up Python
      uses: actions/setup-python@v4
      with:
        python-version: '3.11'

    - name: Install build dependencies
      run: |
        python -m pip install --upgrade pip
        pip install build

    - name: Build package
      run: python -m build

    - name: Publish to PyPI
      uses: pypa/gh-action-pypi-publish@release/v1
```

--------------------------------------------------------------------------------
/setup.py:
--------------------------------------------------------------------------------

```python
from setuptools import setup, find_packages

setup(
    name="vectara-mcp",
    version="0.2.0",
    description="Open source MCP server for Vectara",
    long_description=open("README.md", encoding="utf-8").read(),
    long_description_content_type="text/markdown",
    author="Ofer Mendelevitch",
    author_email="[email protected]",
    url="https://github.com/vectara/vectara-mcp",
    packages=find_packages(),  # Automatically find all packages
    install_requires=[
        "mcp>=1.6.0",
        "fastmcp>=0.4.1",
        "fastapi>=0.95.0",
        "uvicorn>=0.34.0",
        "aiohttp>=3.8.0",
        "tenacity>=8.0.0",
        "python-dotenv>=1.0.0",
    ],
    classifiers=[
        "Development Status :: 4 - Beta",
        "Intended Audience :: Developers",
        "License :: OSI Approved :: Apache Software License",
        "Operating System :: OS Independent",
        "Programming Language :: Python :: 3.11",
        "Topic :: Software Development :: Libraries :: Python Modules",
        "Topic :: Text Processing :: Linguistic",
    ],
    python_requires=">=3.11",  # Specify the minimum Python version
    entry_points={
        "console_scripts": [
            "vectara-mcp=vectara_mcp.server:main",
        ],
    },
    keywords="vectara, mcp, rag, ai, search, semantic-search",
    project_urls={
        "Bug Reports": "https://github.com/vectara/vectara-mcp/issues",
        "Source": "https://github.com/vectara/vectara-mcp",
    },
)

```

--------------------------------------------------------------------------------
/CHANGELOG.md:
--------------------------------------------------------------------------------

```markdown
# Changelog

All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.2.0] - 2025-01-07

### Added
- Added tenacity library for robust retry handling with exponential backoff
- Added centralized constants for timeouts and configuration values
- Added version consistency across all package files

### Changed
- Updated all hardcoded version strings to use `__version__` from `__init__.py`
- Centralized magic numbers (timeouts, rate limits) into named constants
- Improved DRY compliance by eliminating duplicate API key error messages
- Made `correct_hallucinations` and `eval_factual_consistency` tools fully self-contained

### Improved
- **Code Quality**: Reduced code duplication and improved maintainability
- **Reliability**: Replaced custom retry logic with the `tenacity` library
- **Consistency**: Unified error handling and validation patterns across all MCP tools
- **Configuration**: Centralized timeout and limit configurations for easier tuning

### Fixed
- Fixed version inconsistencies between `__init__.py`, `pyproject.toml`, `setup.py`, and hardcoded strings
- Synchronized missing dependencies between `requirements.txt`, `pyproject.toml`, and `setup.py`
- Resolved "Event loop is closed" issues in async retry contexts

### Technical Details
- Retry logic now uses `AsyncRetrying` with configurable stop conditions and wait strategies
- All timeout values now reference named constants (e.g., `DEFAULT_TOTAL_TIMEOUT = 30`)
- Rate limiting parameters now use constants (`DEFAULT_MAX_REQUESTS = 100`)
- Version information now sourced from single source of truth (`__version__`)

## [0.1.5] - Previous Release
- Basic MCP server functionality
- Vectara RAG integration
```
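
The changelog's technical details mention retries with configurable stop conditions and exponential wait. As a rough illustration of that idea only, the sketch below uses nothing but the standard library; it is not `tenacity` and not the package's actual retry code. The names `retry_async`, `make_flaky`, `max_attempts`, and `base_delay` are hypothetical.

```python
import asyncio


async def retry_async(func, *, max_attempts=3, base_delay=0.01, retry_on=(ConnectionError,)):
    """Call an async function, retrying selected exceptions with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await func()
        except retry_on:
            if attempt == max_attempts:
                raise  # stop condition reached: surface the last error
            # Wait base_delay, then 2x, then 4x, ... before the next attempt
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))


def make_flaky(failures):
    """Build a coroutine function that fails `failures` times, then succeeds."""
    state = {"calls": 0}

    async def flaky():
        state["calls"] += 1
        if state["calls"] <= failures:
            raise ConnectionError("transient failure")
        return state["calls"]

    return flaky
```

Calling `asyncio.run(retry_async(make_flaky(2)))` succeeds on the third attempt, while a function that keeps failing re-raises its last exception once `max_attempts` is exhausted.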

--------------------------------------------------------------------------------
/tests/test_connection_manager.py:
--------------------------------------------------------------------------------

```python
"""
Tests for connection manager and circuit breaker functionality.
"""

import pytest
import asyncio
import aiohttp
from unittest.mock import AsyncMock, MagicMock, patch
from vectara_mcp.connection_manager import (
    ConnectionManager,
    CircuitBreaker,
    CircuitState,
    get_connection_manager,
    cleanup_connections
)


class TestCircuitBreaker:
    """Test circuit breaker functionality."""

    @pytest.mark.asyncio
    async def test_circuit_breaker_success(self):
        """Test circuit breaker with successful calls."""
        circuit = CircuitBreaker(failure_threshold=3, recovery_timeout=1)

        async def successful_func():
            return "success"

        result = await circuit.call(successful_func)
        assert result == "success"
        assert circuit.state == CircuitState.CLOSED
        assert circuit.failure_count == 0

    @pytest.mark.asyncio
    async def test_circuit_breaker_failure_threshold(self):
        """Test circuit breaker opening after failure threshold."""
        circuit = CircuitBreaker(failure_threshold=2, recovery_timeout=1)

        async def failing_func():
            raise aiohttp.ClientError("Test error")

        # First failure
        with pytest.raises(aiohttp.ClientError):
            await circuit.call(failing_func)
        assert circuit.state == CircuitState.CLOSED
        assert circuit.failure_count == 1

        # Second failure - should open circuit
        with pytest.raises(aiohttp.ClientError):
            await circuit.call(failing_func)
        assert circuit.state == CircuitState.OPEN
        assert circuit.failure_count == 2

        # Third call should fail fast
        with pytest.raises(Exception, match="Circuit breaker OPEN"):
            await circuit.call(failing_func)

    @pytest.mark.asyncio
    async def test_circuit_breaker_recovery(self):
        """Test circuit breaker recovery after timeout."""
        circuit = CircuitBreaker(failure_threshold=1, recovery_timeout=0.1)

        async def failing_func():
            raise aiohttp.ClientError("Test error")

        async def successful_func():
            return "success"

        # Trigger circuit opening
        with pytest.raises(aiohttp.ClientError):
            await circuit.call(failing_func)
        assert circuit.state == CircuitState.OPEN

        # Wait for recovery timeout
        await asyncio.sleep(0.2)

        # Should transition to half-open and then closed on success
        result = await circuit.call(successful_func)
        assert result == "success"
        assert circuit.state == CircuitState.CLOSED
        assert circuit.failure_count == 0

    def test_circuit_breaker_state_info(self):
        """Test circuit breaker state information."""
        circuit = CircuitBreaker(failure_threshold=5, recovery_timeout=60)

        state = circuit.get_state()
        assert state["state"] == "closed"
        assert state["failure_count"] == 0
        assert state["failure_threshold"] == 5
        assert state["recovery_timeout"] == 60
        assert state["last_failure_time"] is None
```
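
The tests above cover the closed, open, and recovered paths, and the recovery test relies on a real `asyncio.sleep`. A minimal sketch of the same state machine with an injectable clock shows how the half-open probe could be exercised deterministically, including a failed probe that re-opens the circuit. `MiniBreaker`, `State`, and the `clock` parameter are hypothetical and synchronous for brevity; they are not the package's `CircuitBreaker`.

```python
from enum import Enum


class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class MiniBreaker:
    """Simplified, synchronous circuit breaker driven by an injected clock."""

    def __init__(self, failure_threshold, recovery_timeout, clock):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock  # callable returning the current time in seconds
        self.failure_count = 0
        self.last_failure_time = None
        self.state = State.CLOSED

    def call(self, func):
        if self.state is State.OPEN:
            if self.clock() - self.last_failure_time >= self.recovery_timeout:
                self.state = State.HALF_OPEN  # allow a single probe call
            else:
                raise RuntimeError("circuit open")
        try:
            result = func()
        except ConnectionError:
            self.failure_count += 1
            self.last_failure_time = self.clock()
            if self.failure_count >= self.failure_threshold:
                self.state = State.OPEN
            raise
        # Any success closes the circuit and clears the failure counter
        self.state = State.CLOSED
        self.failure_count = 0
        return result
```

Passing something like `clock=lambda: now[0]` lets a test advance time by assigning to `now[0]` instead of sleeping.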

--------------------------------------------------------------------------------
/tests/test_api_key_management.py:
--------------------------------------------------------------------------------

```python
import pytest
import json
from unittest.mock import AsyncMock, patch, MagicMock
import os
import aiohttp
from mcp.server.fastmcp import Context

from vectara_mcp.server import (
    setup_vectara_api_key,
    clear_vectara_api_key
)


class TestApiKeyManagement:
    """Test suite for API key management tools"""

    @pytest.fixture
    def mock_context(self):
        """Create a mock context for testing"""
        context = AsyncMock(spec=Context)
        context.info = MagicMock()
        context.report_progress = AsyncMock()
        return context

    @pytest.fixture(autouse=True)
    def clear_stored_api_key(self):
        """Clear stored API key before each test"""
        import vectara_mcp.server
        vectara_mcp.server._stored_api_key = None
        yield
        vectara_mcp.server._stored_api_key = None

    @pytest.mark.asyncio
    async def test_setup_vectara_api_key_missing_key(self, mock_context):
        """Test setup_vectara_api_key with missing API key"""
        result = await setup_vectara_api_key(
            api_key="",
            ctx=mock_context
        )
        assert result == "API key is required."

    @pytest.mark.asyncio
    @patch('vectara_mcp.server._make_api_request')
    async def test_setup_vectara_api_key_invalid_key(self, mock_api_request, mock_context):
        """Test setup_vectara_api_key with invalid API key (401 response)"""
        mock_api_request.side_effect = Exception("API error 401: API key error")

        result = await setup_vectara_api_key(
            api_key="invalid-key",
            ctx=mock_context
        )

        assert result == "Invalid API key. Please check your Vectara API key and try again."

    @pytest.mark.asyncio
    @patch('vectara_mcp.server._make_api_request')
    async def test_setup_vectara_api_key_success(self, mock_api_request, mock_context):
        """Test successful setup_vectara_api_key call"""
        mock_api_request.side_effect = Exception("Corpus not found")  # Valid API key but corpus doesn't exist

        result = await setup_vectara_api_key(
            api_key="valid-api-key-12345",
            ctx=mock_context
        )

        assert "API key configured successfully: vali***2345" in result
        mock_context.info.assert_called_once()

    @pytest.mark.asyncio
    @patch('vectara_mcp.server._make_api_request')
    async def test_setup_vectara_api_key_network_error(self, mock_api_request, mock_context):
        """Test setup_vectara_api_key with network error"""
        mock_api_request.side_effect = Exception("Network error")

        result = await setup_vectara_api_key(
            api_key="test-key",
            ctx=mock_context
        )

        assert result == "API validation failed: Network error"

    @pytest.mark.asyncio
    async def test_clear_vectara_api_key(self, mock_context):
        """Test clear_vectara_api_key"""
        # First set an API key
        import vectara_mcp.server
        vectara_mcp.server._stored_api_key = "test-key"

        result = await clear_vectara_api_key(ctx=mock_context)

        assert result == "API key cleared from server memory."
        assert vectara_mcp.server._stored_api_key is None
        mock_context.info.assert_called_once_with("Clearing stored Vectara API key")


```

--------------------------------------------------------------------------------
/vectara_mcp/auth.py:
--------------------------------------------------------------------------------

```python
"""
Authentication middleware for Vectara MCP Server.

Provides bearer token validation for HTTP/SSE transports.
"""

import os
import logging
from typing import Optional, Callable
from functools import wraps

logger = logging.getLogger(__name__)


class AuthMiddleware:
    """Authentication middleware for HTTP transport."""

    def __init__(self, auth_required: bool = True):
        """Initialize authentication middleware.

        Args:
            auth_required: Whether authentication is required (default: True)
        """
        self.auth_required = auth_required
        self.valid_tokens = self._load_valid_tokens()

    def _load_valid_tokens(self) -> set:
        """Load valid API tokens from environment.

        Returns:
            Set of valid bearer tokens
        """
        tokens = set()

        # Add main API key if configured
        api_key = os.getenv("VECTARA_API_KEY")
        if api_key:
            tokens.add(api_key)

        # Add additional authorized tokens (comma-separated)
        additional_tokens = os.getenv("VECTARA_AUTHORIZED_TOKENS", "")
        if additional_tokens:
            tokens.update(token.strip() for token in additional_tokens.split(",") if token.strip())

        return tokens

    def validate_token(self, token: Optional[str]) -> bool:
        """Validate a bearer token.

        Args:
            token: Bearer token to validate

        Returns:
            True if token is valid, False otherwise
        """
        if not self.auth_required:
            return True

        if not token:
            logger.warning("No authentication token provided")
            return False

        # Remove "Bearer " prefix if present
        if token.startswith("Bearer "):
            token = token[7:]

        if token in self.valid_tokens:
            return True

        logger.warning("Invalid authentication token")
        return False

    def extract_token_from_headers(self, headers: dict) -> Optional[str]:
        """Extract bearer token from request headers.

        Args:
            headers: Request headers dictionary

        Returns:
            Bearer token if found, None otherwise
        """
        # Check Authorization header
        auth_header = headers.get("Authorization") or headers.get("authorization")
        if auth_header:
            return auth_header

        # Check X-API-Key header (alternative)
        api_key_header = headers.get("X-API-Key") or headers.get("x-api-key")
        if api_key_header:
            return f"Bearer {api_key_header}"

        return None


def require_auth(auth_middleware: AuthMiddleware):
    """Decorator to require authentication for FastMCP tools.

    Args:
        auth_middleware: AuthMiddleware instance to use for validation

    Returns:
        Decorated function that checks authentication
    """
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Extract context from args/kwargs
            ctx = kwargs.get('ctx')
            if not ctx:
                # Try to find context in args
                for arg in args:
                    if arg.__class__.__name__ == 'Context':
                        ctx = arg
                        break

            if ctx and hasattr(ctx, 'request_headers'):
                # For HTTP transport, check authentication
                headers = getattr(ctx, 'request_headers', {})
                token = auth_middleware.extract_token_from_headers(headers)

                if not auth_middleware.validate_token(token):
                    return {"error": "Authentication required. Please provide a valid bearer token."}

            # For STDIO transport or if auth not required, proceed normally
            return await func(*args, **kwargs)

        return wrapper
    return decorator


# Security headers for HTTP responses
SECURITY_HEADERS = {
    "X-Content-Type-Options": "nosniff",
    "X-Frame-Options": "DENY",
    "X-XSS-Protection": "1; mode=block",
    "Strict-Transport-Security": "max-age=31536000; includeSubDomains",
    "Content-Security-Policy": "default-src 'self'",
}


def add_security_headers(headers: dict) -> dict:
    """Add security headers to HTTP response.

    Args:
        headers: Existing headers dictionary

    Returns:
        Updated headers with security headers added
    """
    headers.update(SECURITY_HEADERS)
    return headers


class RateLimiter:
    """Simple in-memory rate limiter for API endpoints."""

    def __init__(self, max_requests: int = 100, window_seconds: int = 60):
        """Initialize rate limiter.

        Args:
            max_requests: Maximum requests per window (default: 100)
            window_seconds: Time window in seconds (default: 60)
        """
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests = {}

    def is_allowed(self, client_id: str) -> bool:
        """Check if client is allowed to make a request.

        Args:
            client_id: Client identifier (IP address or token)

        Returns:
            True if request is allowed, False if rate limited
        """
        import time
        current_time = time.time()

        if client_id not in self.requests:
            self.requests[client_id] = []

        # Remove old requests outside the window
        self.requests[client_id] = [
            timestamp for timestamp in self.requests[client_id]
            if current_time - timestamp < self.window_seconds
        ]

        # Check if limit exceeded
        if len(self.requests[client_id]) >= self.max_requests:
            logger.warning(f"Rate limit exceeded for client: {client_id}")
            return False

        # Add current request
        self.requests[client_id].append(current_time)
        return True


# CORS configuration for HTTP transport
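CORS_CONFIG = {
    # Strip whitespace so values like "http://a.example, http://b.example" match
    "allowed_origins": [
        origin.strip()
        for origin in os.getenv("VECTARA_ALLOWED_ORIGINS", "http://localhost:*").split(",")
        if origin.strip()
    ],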
    "allowed_methods": ["GET", "POST", "OPTIONS"],
    "allowed_headers": ["Authorization", "Content-Type", "X-API-Key"],
    "max_age": 3600,
}


def validate_origin(origin: Optional[str]) -> bool:
    """Validate request origin against allowed origins.

    Args:
        origin: Origin header from request

    Returns:
        True if origin is allowed, False otherwise
    """
    if not origin:
        return True  # No origin header (not a browser request)

    for allowed_origin in CORS_CONFIG["allowed_origins"]:
        if allowed_origin == "*":
            return True

        if allowed_origin.endswith("*"):
            # Wildcard matching
            prefix = allowed_origin[:-1]
            if origin.startswith(prefix):
                return True
        elif origin == allowed_origin:
            return True

    logger.warning(f"Rejected request from unauthorized origin: {origin}")
    return False
```
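
`validate_token` in the module above checks membership with `token in self.valid_tokens`, which compares strings with ordinary equality. One possible hardening, shown here only as a sketch, is to compare candidates with `hmac.compare_digest` from the standard library so the comparison time does not depend on where the strings first differ. The helper name `token_is_valid` is hypothetical and not part of `vectara_mcp.auth`.

```python
import hmac


def token_is_valid(candidate, valid_tokens):
    """Return True if candidate matches any known token, using constant-time comparison."""
    if not candidate:
        return False
    # Accept either a raw token or a full "Bearer <token>" header value
    if candidate.startswith("Bearer "):
        candidate = candidate[len("Bearer "):]
    candidate_bytes = candidate.encode("utf-8")
    matched = False
    for token in valid_tokens:
        # Check every token rather than returning early on the first match
        if hmac.compare_digest(candidate_bytes, token.encode("utf-8")):
            matched = True
    return matched
```

For example, `token_is_valid("Bearer secret-1", {"secret-1", "secret-2"})` returns `True`, and a missing or empty token returns `False`.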

--------------------------------------------------------------------------------
/tests/test_integration.py:
--------------------------------------------------------------------------------

```python
import pytest
import pytest_asyncio
import os
import json
from dotenv import load_dotenv
from mcp.server.fastmcp import Context
from unittest.mock import AsyncMock, MagicMock

from vectara_mcp.server import (
    ask_vectara,
    search_vectara,
    correct_hallucinations,
    eval_factual_consistency
)

# Load environment variables
load_dotenv()

# Test configuration
API_KEY = os.getenv("VECTARA_API_KEY")
CORPUS_KEYS = os.getenv("VECTARA_CORPUS_KEYS", "").split(",") if os.getenv("VECTARA_CORPUS_KEYS") else []
TEST_TEXT = os.getenv("TEST_TEXT", "The capital of France is Berlin. The Eiffel Tower is located in London.")
TEST_SOURCE_DOCS = os.getenv("TEST_SOURCE_DOCS", "Paris is the capital of France. The Eiffel Tower is located in Paris, France.|London is the capital of the United Kingdom.").split("|")

# Skip integration tests if no API key provided
pytestmark = pytest.mark.skipif(
    not API_KEY or not CORPUS_KEYS or CORPUS_KEYS == [""],
    reason="Integration tests require VECTARA_API_KEY and VECTARA_CORPUS_KEYS in .env file"
)


class TestVectaraIntegration:
    """Integration tests for Vectara MCP tools using real API endpoints"""

    @pytest_asyncio.fixture(autouse=True)
    async def cleanup_connection_manager(self):
        """Cleanup connection manager before and after each test"""
        from vectara_mcp.connection_manager import ConnectionManager, connection_manager
        # Clean up before test
        await connection_manager.close()
        ConnectionManager.reset_instance()
        yield
        # Clean up after test
        await connection_manager.close()
        ConnectionManager.reset_instance()

    @pytest.fixture
    def mock_context(self):
        """Create a mock context for testing"""
        context = AsyncMock(spec=Context)
        context.info = MagicMock()  # Non-async mock to avoid coroutine warnings
        context.report_progress = AsyncMock()  # Keep async since this is actually async
        return context

    @pytest.mark.asyncio
    async def test_ask_vectara_integration(self, mock_context):
        """Test ask_vectara with real API to determine response format"""
        if not API_KEY or not CORPUS_KEYS:
            pytest.skip("Missing API credentials")

        query = "What is the main topic of this corpus?"

        # Store the API key on the server module; integration tests need it
        import vectara_mcp.server
        vectara_mcp.server._stored_api_key = API_KEY

        result = await ask_vectara(
            query=query,
            ctx=mock_context,
            corpus_keys=CORPUS_KEYS,
            max_used_search_results=5
        )

        # Print result for analysis
        print(f"\n=== ask_vectara result type: {type(result)} ===")
        print(f"Result: {result}")
        print("=" * 50)

        # Basic validation
        assert isinstance(result, dict)
        assert "summary" in result
        assert "citations" in result
        assert "error" not in result

        return result

    @pytest.mark.asyncio
    async def test_search_vectara_integration(self, mock_context):
        """Test search_vectara with real API to determine response format"""
        if not API_KEY or not CORPUS_KEYS:
            pytest.skip("Missing API credentials")

        query = "main topics"

        # Store the API key on the server module; integration tests need it
        import vectara_mcp.server
        vectara_mcp.server._stored_api_key = API_KEY

        result = await search_vectara(
            query=query,
            ctx=mock_context,
            corpus_keys=CORPUS_KEYS
        )

        # Print result for analysis
        print(f"\n=== search_vectara result type: {type(result)} ===")
        print(f"Result: {result}")
        print("=" * 50)

        # Basic validation
        assert isinstance(result, dict)
        assert "search_results" in result
        assert "error" not in result

        return result

    @pytest.mark.asyncio
    async def test_correct_hallucinations_integration(self, mock_context):
        """Test correct_hallucinations with real API to determine response format"""
        if not API_KEY:
            pytest.skip("Missing API key")

        # Store the API key on the server module; integration tests need it
        import vectara_mcp.server
        vectara_mcp.server._stored_api_key = API_KEY

        result = await correct_hallucinations(
            generated_text=TEST_TEXT,
            documents=TEST_SOURCE_DOCS,
            ctx=mock_context
        )

        # Print result for analysis
        print(f"\n=== correct_hallucinations result type: {type(result)} ===")
        print(f"Result: {result}")
        print("=" * 50)

        # Basic validation - VHC functions now return dict
        assert isinstance(result, dict)
        assert "error" not in result

        print(f"Result structure: {json.dumps(result, indent=2)}")

        return result

    @pytest.mark.asyncio
    async def test_eval_factual_consistency_integration(self, mock_context):
        """Test eval_factual_consistency with real API to determine response format"""
        if not API_KEY:
            pytest.skip("Missing API key")

        # Store the API key on the server module; integration tests need it
        import vectara_mcp.server
        vectara_mcp.server._stored_api_key = API_KEY

        result = await eval_factual_consistency(
            generated_text=TEST_TEXT,
            documents=TEST_SOURCE_DOCS,
            ctx=mock_context
        )

        # Print result for analysis
        print(f"\n=== eval_factual_consistency result type: {type(result)} ===")
        print(f"Result: {result}")
        print("=" * 50)

        # Basic validation - VHC functions now return dict
        assert isinstance(result, dict)
        assert "error" not in result

        print(f"Result structure: {json.dumps(result, indent=2)}")

        return result

    @pytest.mark.asyncio
    async def test_all_endpoints_and_analyze_responses(self, mock_context):
        """Run all endpoints and analyze response formats for docstring updates"""
        if not API_KEY:
            pytest.skip("Missing API key")

        print("\n" + "="*80)
        print("COMPREHENSIVE RESPONSE FORMAT ANALYSIS")
        print("="*80)

        # Test all endpoints
        results = {}

        if CORPUS_KEYS and CORPUS_KEYS != [""]:
            print("\n--- Testing ask_vectara ---")
            results['ask_vectara'] = await self.test_ask_vectara_integration(mock_context)

            print("\n--- Testing search_vectara ---")
            results['search_vectara'] = await self.test_search_vectara_integration(mock_context)
        else:
            print("\nSkipping corpus-based tests (no corpus keys)")

        print("\n--- Testing correct_hallucinations ---")
        results['correct_hallucinations'] = await self.test_correct_hallucinations_integration(mock_context)

        print("\n--- Testing eval_factual_consistency ---")
        results['eval_factual_consistency'] = await self.test_eval_factual_consistency_integration(mock_context)

        # Generate docstring recommendations
        print("\n" + "="*80)
        print("DOCSTRING UPDATE RECOMMENDATIONS")
        print("="*80)

        for tool_name, result in results.items():
            print(f"\n--- {tool_name} ---")
            if isinstance(result, dict):
                if "error" in result:
                    print(f"❌ API Error: {result['error']}")
                else:
                    print("✅ Returns dict with structure:")
                    print(f"   Keys: {list(result.keys())}")
                    for key, value in result.items():
                        print(f"   - {key}: {type(value).__name__}")
            else:
                print(f"⚠️  Unexpected return type: {type(result).__name__}")
                print(f"   Value: {result}")

        print("\n" + "="*80)
```

--------------------------------------------------------------------------------
/vectara_mcp/connection_manager.py:
--------------------------------------------------------------------------------

```python
"""
Connection management and resilience patterns for Vectara MCP Server.

Provides persistent connection pooling and circuit breaker pattern
for reliable communication with Vectara API.
"""

import asyncio
import logging
import time
from enum import Enum
from typing import Optional, Dict, Any
import aiohttp
import ssl
from tenacity import AsyncRetrying, stop_after_attempt, wait_exponential, retry_if_exception_type

logger = logging.getLogger(__name__)

# Connection timeout constants
DEFAULT_TOTAL_TIMEOUT = 30  # Total request timeout
DEFAULT_CONNECT_TIMEOUT = 10  # Connection timeout
DEFAULT_SOCK_READ_TIMEOUT = 20  # Socket read timeout
DEFAULT_HEALTH_CHECK_TIMEOUT = 5  # Health check timeout

# Circuit breaker constants
DEFAULT_CIRCUIT_FAILURE_THRESHOLD = 5
DEFAULT_CIRCUIT_RECOVERY_TIMEOUT = 60


class CircuitState(Enum):
    """Circuit breaker states."""
    CLOSED = "closed"      # Normal operation
    OPEN = "open"          # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing recovery


class CircuitBreaker:
    """Circuit breaker pattern implementation for API resilience."""

    def __init__(
        self,
        failure_threshold: int = DEFAULT_CIRCUIT_FAILURE_THRESHOLD,
        recovery_timeout: float = DEFAULT_CIRCUIT_RECOVERY_TIMEOUT,
        expected_exception: tuple = (aiohttp.ClientError, asyncio.TimeoutError)
    ):
        """Initialize circuit breaker.

        Args:
            failure_threshold: Number of failures before opening circuit
            recovery_timeout: Seconds to wait before attempting recovery
            expected_exception: Exception types that trigger circuit opening
        """
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception

        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED
        self._lock = asyncio.Lock()

    async def call(self, func, *args, **kwargs):
        """Execute function with circuit breaker protection.

        Args:
            func: Async function to execute
            *args: Function arguments
            **kwargs: Function keyword arguments

        Returns:
            Function result

        Raises:
            Exception: If circuit is open or function fails
        """
        async with self._lock:
            if self.state == CircuitState.OPEN:
                if self._should_attempt_reset():
                    self.state = CircuitState.HALF_OPEN
                    logger.info("Circuit breaker transitioning to HALF_OPEN")
                else:
                    raise Exception(f"Circuit breaker OPEN. Last failure: {self.last_failure_time}")

        try:
            result = await func(*args, **kwargs)
            await self._on_success()
            return result
        except self.expected_exception:
            await self._on_failure()
            raise
        except Exception as e:
            # Unexpected exceptions don't trigger circuit breaker
            logger.warning(f"Unexpected exception in circuit breaker: {e}")
            raise

    def _should_attempt_reset(self) -> bool:
        """Check if enough time has passed to attempt reset."""
        if self.last_failure_time is None:
            return True
        return time.time() - self.last_failure_time >= self.recovery_timeout

    async def _on_success(self):
        """Handle successful execution."""
        async with self._lock:
            if self.state == CircuitState.HALF_OPEN:
                self.state = CircuitState.CLOSED
                logger.info("Circuit breaker reset to CLOSED")
            self.failure_count = 0

    async def _on_failure(self):
        """Handle failed execution."""
        async with self._lock:
            self.failure_count += 1
            self.last_failure_time = time.time()

            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN
                logger.warning(f"Circuit breaker OPEN after {self.failure_count} failures")

    def get_state(self) -> Dict[str, Any]:
        """Get current circuit breaker state for monitoring."""
        return {
            "state": self.state.value,
            "failure_count": self.failure_count,
            "last_failure_time": self.last_failure_time,
            "failure_threshold": self.failure_threshold,
            "recovery_timeout": self.recovery_timeout
        }


class ConnectionManager:
    """Manages persistent HTTP connections for Vectara API."""

    _instance: Optional['ConnectionManager'] = None
    _lock = asyncio.Lock()

    def __new__(cls):
        """Singleton pattern implementation."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        """Initialize connection manager."""
        if hasattr(self, '_initialized'):
            return

        self._session: Optional[aiohttp.ClientSession] = None
        self._circuit_breaker = CircuitBreaker()
        self._session_loop: Optional[asyncio.AbstractEventLoop] = None
        self._initialized = True

        # Connection pool configuration
        self._connector_config = {
            'limit': 100,  # Total connection limit
            'limit_per_host': 30,  # Connections per host
            'ttl_dns_cache': 300,  # DNS cache TTL
            'use_dns_cache': True,
            'keepalive_timeout': 30,
            'enable_cleanup_closed': True
        }

        # Request timeout configuration
        self._timeout_config = aiohttp.ClientTimeout(
            total=DEFAULT_TOTAL_TIMEOUT,
            connect=DEFAULT_CONNECT_TIMEOUT,
            sock_read=DEFAULT_SOCK_READ_TIMEOUT,
        )

    async def initialize(self):
        """Initialize the HTTP session."""
        current_loop = asyncio.get_running_loop()

        # Check if session exists and is bound to a different/closed event loop
        if self._session is not None:
            if self._session_loop != current_loop or self._session.closed:
                logger.info("Session bound to different/closed event loop, reinitializing")
                await self._close_session()
            else:
                return

        async with self._lock:
            # Double-check after acquiring lock
            if self._session is not None and self._session_loop == current_loop and not self._session.closed:
                return

            # Close existing session if it exists
            if self._session is not None:
                await self._close_session()

            # Create SSL context with verification
            ssl_context = ssl.create_default_context()
            ssl_context.check_hostname = True
            ssl_context.verify_mode = ssl.CERT_REQUIRED

            # Create TCP connector with configuration
            connector = aiohttp.TCPConnector(
                ssl=ssl_context,
                **self._connector_config
            )

            # Create session with connector and timeout
            self._session = aiohttp.ClientSession(
                connector=connector,
                timeout=self._timeout_config,
                headers={
                    'User-Agent': 'Vectara-MCP-Server/2.0',
                    'Accept': 'application/json',
                    'Accept-Encoding': 'gzip, deflate'
                }
            )
            self._session_loop = current_loop

            logger.info("Connection manager initialized with persistent session")

    async def _close_session(self):
        """Helper method to safely close the current session."""
        if self._session is not None:
            try:
                await self._session.close()
            except RuntimeError as e:
                if "Event loop is closed" not in str(e):
                    raise
                # Silently handle event loop closure during cleanup
            finally:
                self._session = None
                self._session_loop = None

    async def close(self):
        """Close the HTTP session and cleanup resources."""
        await self._close_session()
        logger.info("Connection manager closed")

    @classmethod
    def reset_instance(cls):
        """Reset the singleton instance. Use with caution - mainly for testing."""
        cls._instance = None


    async def request(
        self,
        method: str,
        url: str,
        headers: Optional[Dict[str, str]] = None,
        json_data: Optional[Dict[str, Any]] = None,
        **kwargs
    ) -> aiohttp.ClientResponse:
        """Make HTTP request with circuit breaker protection and retry logic.

        Args:
            method: HTTP method (GET, POST, etc.)
            url: Request URL
            headers: Request headers
            json_data: JSON payload
            **kwargs: Additional aiohttp parameters

        Returns:
            aiohttp.ClientResponse: HTTP response

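        Raises:
            tenacity.RetryError: If a retryable error (aiohttp.ClientError or
                asyncio.TimeoutError) persists after 3 attempts
            Exception: Any other error, including circuit breaker rejections
                not covered by the retry policy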
        """
        await self.initialize()

        if self._session is None:
            raise RuntimeError("Session not initialized")

        if self._session.closed:
            raise RuntimeError("Session has been closed")

        async def _make_request_with_circuit_breaker():
            """Make request through circuit breaker."""
            async def _make_request():
                response = await self._session.request(
                    method=method,
                    url=url,
                    headers=headers,
                    json=json_data,
                    **kwargs
                )

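                # Check for HTTP errors that should trigger circuit breaker
                if response.status >= 500:
                    # Release the connection back to the pool before raising,
                    # otherwise the unread response keeps it checked out
                    response.release()
                    raise aiohttp.ClientResponseError(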
                        request_info=response.request_info,
                        history=response.history,
                        status=response.status,
                        message=f"HTTP {response.status}"
                    )

                return response

            return await self._circuit_breaker.call(_make_request)

        # Apply retry logic with circuit breaker using tenacity
        async for attempt in AsyncRetrying(
            stop=stop_after_attempt(3),
            wait=wait_exponential(multiplier=1, min=1, max=10),
            retry=retry_if_exception_type((aiohttp.ClientError, asyncio.TimeoutError))
        ):
            with attempt:
                return await _make_request_with_circuit_breaker()

    def get_stats(self) -> Dict[str, Any]:
        """Get connection and circuit breaker statistics."""
        stats = {
            "session_initialized": self._session is not None,
            "circuit_breaker": self._circuit_breaker.get_state(),
            "connector_config": self._connector_config,
        }

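        if self._session and hasattr(self._session.connector, '_conns'):
            # Best-effort pool stats. `_conns` is a private aiohttp attribute that maps
            # each connection key to its idle connections, so "total_connections" counts
            # keys with idle connections and "available_connections" counts idle connections.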
            try:
                connector = self._session.connector
                stats["connection_pool"] = {
                    "total_connections": len(connector._conns),
                    "available_connections": sum(
                        len(conns) for conns in connector._conns.values()
                    )
                }
            except (AttributeError, TypeError):
                # Connector stats not available in this aiohttp version
                pass

        return stats

    async def health_check(self, url: str = "https://api.vectara.io/v2") -> Dict[str, Any]:
        """Perform health check on Vectara API.

        Args:
            url: Base URL to check

        Returns:
            Dict with health check results
        """
        start_time = time.time()

        try:
            response = await self.request('GET', f"{url}/health", timeout=DEFAULT_HEALTH_CHECK_TIMEOUT)
            duration = time.time() - start_time
            status_code = response.status
            # Only the status is needed; release the connection back to the pool
            response.release()

            return {
                "status": "healthy",
                "response_time_ms": round(duration * 1000, 2),
                "status_code": status_code,
                "circuit_breaker_state": self._circuit_breaker.state.value
            }
        except Exception as e:
            duration = time.time() - start_time
            return {
                "status": "unhealthy",
                "error": str(e),
                "response_time_ms": round(duration * 1000, 2),
                "circuit_breaker_state": self._circuit_breaker.state.value
            }


# Global connection manager instance
connection_manager = ConnectionManager()


async def get_connection_manager() -> ConnectionManager:
    """Get the global connection manager instance.

    Returns:
        ConnectionManager: Singleton instance
    """
    await connection_manager.initialize()
    return connection_manager


async def cleanup_connections():
    """Cleanup function for graceful shutdown."""
    await connection_manager.close()
```
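The state transitions handled by `_on_success` and `_on_failure` above can be sketched in isolation. The following is a minimal, synchronous illustration using only the standard library; `MiniBreaker`, `State`, `allow_request`, and the injectable `clock` parameter are hypothetical names introduced here, not part of `vectara_mcp`, and the real `CircuitBreaker.call` may gate requests differently.

```python
import time
from enum import Enum
from typing import Callable


class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class MiniBreaker:
    """Hypothetical stand-in that mirrors the transitions shown above."""

    def __init__(
        self,
        failure_threshold: int = 3,
        recovery_timeout: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock  # injectable so the example needs no real waiting
        self.state = State.CLOSED
        self.failure_count = 0
        self.last_failure_time = 0.0

    def allow_request(self) -> bool:
        """Return True if a call may proceed; OPEN becomes HALF_OPEN after the timeout."""
        if self.state is State.OPEN:
            if self.clock() - self.last_failure_time >= self.recovery_timeout:
                self.state = State.HALF_OPEN
                return True
            return False
        return True

    def on_success(self) -> None:
        """A success in HALF_OPEN closes the circuit; any success clears the count."""
        if self.state is State.HALF_OPEN:
            self.state = State.CLOSED
        self.failure_count = 0

    def on_failure(self) -> None:
        """Count the failure and open the circuit once the threshold is reached."""
        self.failure_count += 1
        self.last_failure_time = self.clock()
        if self.failure_count >= self.failure_threshold:
            self.state = State.OPEN


# Drive the breaker with a fake clock instead of sleeping.
fake_now = [0.0]
breaker = MiniBreaker(failure_threshold=2, recovery_timeout=10.0, clock=lambda: fake_now[0])
breaker.on_failure()
breaker.on_failure()       # threshold reached: circuit opens
fake_now[0] = 10.0         # recovery timeout elapsed
breaker.allow_request()    # circuit moves to HALF_OPEN and allows one probe
breaker.on_success()       # probe succeeded: circuit closes again
```

Injecting the clock keeps the transitions deterministic in tests; the module above reads `time.time()` directly.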

--------------------------------------------------------------------------------
/vectara_mcp/health_checks.py:
--------------------------------------------------------------------------------

```python
"""
Health check endpoints for Vectara MCP Server.

Provides liveness, readiness, and detailed health status endpoints
for production deployment with load balancers and orchestration platforms.
"""

import asyncio
import logging
import os
import time
from typing import Dict, Any, Optional, List
from dataclasses import dataclass
from enum import Enum

from .connection_manager import get_connection_manager
from . import __version__

logger = logging.getLogger(__name__)


class HealthStatus(Enum):
    """Health check status values."""
    HEALTHY = "healthy"
    UNHEALTHY = "unhealthy"
    DEGRADED = "degraded"
    UNKNOWN = "unknown"


@dataclass
class HealthCheck:
    """Individual health check result."""
    name: str
    status: HealthStatus
    message: str
    response_time_ms: Optional[float] = None
    details: Optional[Dict[str, Any]] = None


class HealthChecker:
    """Manages health checks for the MCP server."""

    def __init__(self):
        """Initialize health checker."""
        self.server_start_time = time.time()
        self.last_check_cache = {}
        self.cache_ttl = 5  # Cache health checks for 5 seconds

    async def liveness_check(self) -> Dict[str, Any]:
        """Basic liveness check - is the server process running and responding?

        This should be fast and only check if the process is alive.
        Used by orchestration platforms to decide whether the process needs a restart.

        Returns:
            Dict: Liveness status
        """
        return {
            "status": HealthStatus.HEALTHY.value,
            "timestamp": time.time(),
            "uptime_seconds": round(time.time() - self.server_start_time, 2),
            "version": __version__,
            "service": "vectara-mcp-server"
        }

    async def readiness_check(self) -> Dict[str, Any]:
        """Readiness check - can the server handle traffic?

        Checks critical dependencies that must be working for the server
        to properly handle requests. Used by load balancers and orchestration
        platforms to decide whether traffic should be routed here.

        Returns:
            Dict: Readiness status with dependency checks
        """
        checks = []
        overall_status = HealthStatus.HEALTHY
        start_time = time.time()

        # Check connection manager
        try:
            connection_check = await self._check_connection_manager()
            checks.append(connection_check)
            if connection_check.status != HealthStatus.HEALTHY:
                overall_status = HealthStatus.UNHEALTHY
        except Exception as e:
            checks.append(HealthCheck(
                name="connection_manager",
                status=HealthStatus.UNHEALTHY,
                message=f"Connection manager check failed: {str(e)}"
            ))
            overall_status = HealthStatus.UNHEALTHY

        # Check Vectara API connectivity
        try:
            vectara_check = await self._check_vectara_connectivity()
            checks.append(vectara_check)
            if vectara_check.status == HealthStatus.UNHEALTHY:
                overall_status = HealthStatus.UNHEALTHY
            elif vectara_check.status == HealthStatus.DEGRADED and overall_status == HealthStatus.HEALTHY:
                overall_status = HealthStatus.DEGRADED
        except Exception as e:
            checks.append(HealthCheck(
                name="vectara_api",
                status=HealthStatus.UNHEALTHY,
                message=f"Vectara API check failed: {str(e)}"
            ))
            overall_status = HealthStatus.UNHEALTHY

        total_time = round((time.time() - start_time) * 1000, 2)

        return {
            "status": overall_status.value,
            "timestamp": time.time(),
            "response_time_ms": total_time,
            "checks": [
                {
                    "name": check.name,
                    "status": check.status.value,
                    "message": check.message,
                    "response_time_ms": check.response_time_ms,
                    "details": check.details
                }
                for check in checks
            ]
        }

    async def detailed_health_check(self) -> Dict[str, Any]:
        """Comprehensive health check with all system components.

        Provides detailed information about all system components,
        metrics, and configuration. Used for monitoring and debugging.

        Returns:
            Dict: Detailed health status
        """
        checks = []
        metrics = {}
        overall_status = HealthStatus.HEALTHY
        start_time = time.time()

        # Basic server info
        server_info = {
            "uptime_seconds": round(time.time() - self.server_start_time, 2),
            "version": __version__,
            "service": "vectara-mcp-server",
            "pid": os.getpid()
        }

        # Connection manager health
        try:
            connection_check = await self._check_connection_manager_detailed()
            checks.append(connection_check)
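            # Propagate the severity reported by the check instead of capping it at DEGRADED
            if connection_check.status == HealthStatus.UNHEALTHY:
                overall_status = HealthStatus.UNHEALTHY
            elif connection_check.status != HealthStatus.HEALTHY:
                overall_status = HealthStatus.DEGRADED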
        except Exception as e:
            checks.append(HealthCheck(
                name="connection_manager_detailed",
                status=HealthStatus.UNHEALTHY,
                message=f"Detailed connection check failed: {str(e)}"
            ))
            overall_status = HealthStatus.UNHEALTHY

        # Vectara API connectivity
        try:
            vectara_check = await self._check_vectara_connectivity()
            checks.append(vectara_check)
            if vectara_check.status == HealthStatus.UNHEALTHY:
                overall_status = HealthStatus.UNHEALTHY
            elif vectara_check.status == HealthStatus.DEGRADED and overall_status == HealthStatus.HEALTHY:
                overall_status = HealthStatus.DEGRADED
        except Exception as e:
            checks.append(HealthCheck(
                name="vectara_api_detailed",
                status=HealthStatus.UNHEALTHY,
                message=f"Vectara API detailed check failed: {str(e)}"
            ))
            overall_status = HealthStatus.UNHEALTHY


        # Memory usage (if available)
        try:
            import psutil
            process = psutil.Process()
            metrics["memory"] = {
                "rss_mb": round(process.memory_info().rss / 1024 / 1024, 2),
                "vms_mb": round(process.memory_info().vms / 1024 / 1024, 2),
                "percent": round(process.memory_percent(), 2)
            }
        except ImportError:
            metrics["memory"] = {"error": "psutil not available"}
        except Exception as e:
            metrics["memory"] = {"error": str(e)}

        total_time = round((time.time() - start_time) * 1000, 2)

        return {
            "status": overall_status.value,
            "timestamp": time.time(),
            "response_time_ms": total_time,
            "server": server_info,
            "checks": [
                {
                    "name": check.name,
                    "status": check.status.value,
                    "message": check.message,
                    "response_time_ms": check.response_time_ms,
                    "details": check.details
                }
                for check in checks
            ],
            "metrics": metrics
        }

    async def _check_connection_manager(self) -> HealthCheck:
        """Check connection manager basic health."""
        start_time = time.time()

        try:
            manager = await get_connection_manager()
            stats = manager.get_stats()

            response_time = round((time.time() - start_time) * 1000, 2)

            if stats["session_initialized"]:
                return HealthCheck(
                    name="connection_manager",
                    status=HealthStatus.HEALTHY,
                    message="Connection manager initialized and ready",
                    response_time_ms=response_time,
                    details={"circuit_breaker_state": stats["circuit_breaker"]["state"]}
                )
            else:
                return HealthCheck(
                    name="connection_manager",
                    status=HealthStatus.UNHEALTHY,
                    message="Connection manager not initialized",
                    response_time_ms=response_time
                )

        except Exception as e:
            response_time = round((time.time() - start_time) * 1000, 2)
            return HealthCheck(
                name="connection_manager",
                status=HealthStatus.UNHEALTHY,
                message=f"Connection manager error: {str(e)}",
                response_time_ms=response_time
            )

    async def _check_connection_manager_detailed(self) -> HealthCheck:
        """Check connection manager detailed health."""
        start_time = time.time()

        try:
            manager = await get_connection_manager()
            stats = manager.get_stats()

            response_time = round((time.time() - start_time) * 1000, 2)

            circuit_state = stats["circuit_breaker"]["state"]
            failure_count = stats["circuit_breaker"]["failure_count"]

            if stats["session_initialized"]:
                if circuit_state == "open":
                    status = HealthStatus.UNHEALTHY
                    message = f"Circuit breaker OPEN with {failure_count} failures"
                elif circuit_state == "half_open":
                    status = HealthStatus.DEGRADED
                    message = "Circuit breaker testing recovery"
                elif failure_count > 0:
                    status = HealthStatus.DEGRADED
                    message = f"Recent failures: {failure_count}"
                else:
                    status = HealthStatus.HEALTHY
                    message = "Connection manager healthy"

                return HealthCheck(
                    name="connection_manager_detailed",
                    status=status,
                    message=message,
                    response_time_ms=response_time,
                    details=stats
                )
            else:
                return HealthCheck(
                    name="connection_manager_detailed",
                    status=HealthStatus.UNHEALTHY,
                    message="Connection manager not initialized",
                    response_time_ms=response_time
                )

        except Exception as e:
            response_time = round((time.time() - start_time) * 1000, 2)
            return HealthCheck(
                name="connection_manager_detailed",
                status=HealthStatus.UNHEALTHY,
                message=f"Connection manager error: {str(e)}",
                response_time_ms=response_time
            )

    async def _check_vectara_connectivity(self) -> HealthCheck:
        """Check Vectara API connectivity."""
        cache_key = "vectara_connectivity"

        # Check cache first
        if cache_key in self.last_check_cache:
            cached_result, cache_time = self.last_check_cache[cache_key]
            if time.time() - cache_time < self.cache_ttl:
                return cached_result

        start_time = time.time()

        try:
            manager = await get_connection_manager()
            health_result = await manager.health_check("https://api.vectara.io")

            response_time = round((time.time() - start_time) * 1000, 2)

            if health_result["status"] == "healthy":
                status = HealthStatus.HEALTHY
                message = f"Vectara API accessible ({health_result['response_time_ms']}ms)"
            else:
                status = HealthStatus.DEGRADED
                message = f"Vectara API issues: {health_result.get('error', 'Unknown error')}"

            result = HealthCheck(
                name="vectara_api",
                status=status,
                message=message,
                response_time_ms=response_time,
                details={
                    "api_response_time_ms": health_result.get("response_time_ms"),
                    "circuit_breaker_state": health_result.get("circuit_breaker_state")
                }
            )

            # Cache the result
            self.last_check_cache[cache_key] = (result, time.time())
            return result

        except Exception as e:
            response_time = round((time.time() - start_time) * 1000, 2)
            result = HealthCheck(
                name="vectara_api",
                status=HealthStatus.UNHEALTHY,
                message=f"Vectara API connectivity failed: {str(e)}",
                response_time_ms=response_time
            )

            # Cache the result
            self.last_check_cache[cache_key] = (result, time.time())
            return result


# Global health checker instance
health_checker = HealthChecker()


# Convenience functions for FastMCP integration
async def get_liveness() -> Dict[str, Any]:
    """Get liveness status."""
    return await health_checker.liveness_check()


async def get_readiness() -> Dict[str, Any]:
    """Get readiness status."""
    return await health_checker.readiness_check()


async def get_detailed_health() -> Dict[str, Any]:
    """Get detailed health status."""
    return await health_checker.detailed_health_check()


# os is a standard-library module; it is imported at module load,
# before detailed_health_check() can reference it
import os
```
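The time-based caching in `_check_vectara_connectivity` can be illustrated on its own. The sketch below uses only the standard library; `TTLCache`, `get_or_compute`, `probe`, and `demo` are hypothetical names introduced for illustration and do not exist in `vectara_mcp`. It assumes the same rule as the module: a cached entry is reused while its age is strictly less than the TTL.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, Dict, Tuple


class TTLCache:
    """Hypothetical helper: reuse an async probe's result for `ttl` seconds."""

    def __init__(self, ttl: float = 5.0, clock: Callable[[], float] = time.monotonic) -> None:
        self.ttl = ttl
        self.clock = clock  # injectable so the example needs no real waiting
        self._entries: Dict[str, Tuple[Any, float]] = {}

    async def get_or_compute(self, key: str, probe: Callable[[], Awaitable[Any]]) -> Any:
        entry = self._entries.get(key)
        if entry is not None:
            value, stored_at = entry
            if self.clock() - stored_at < self.ttl:
                return value  # still fresh: skip the probe
        value = await probe()
        self._entries[key] = (value, self.clock())
        return value


async def demo() -> int:
    """Return how many times the probe actually ran across three lookups."""
    fake_now = [0.0]
    calls = 0

    async def probe() -> str:
        nonlocal calls
        calls += 1
        return "healthy"

    cache = TTLCache(ttl=5.0, clock=lambda: fake_now[0])
    await cache.get_or_compute("vectara_connectivity", probe)  # miss: runs the probe
    await cache.get_or_compute("vectara_connectivity", probe)  # hit: served from cache
    fake_now[0] = 5.0                                          # entry is now expired
    await cache.get_or_compute("vectara_connectivity", probe)  # runs the probe again
    return calls


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

As in the module above, failures would be cached the same way as successes if the probe returned an error result, so a short TTL limits how long a stale failure is reported.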

--------------------------------------------------------------------------------
/tests/test_health_checks.py:
--------------------------------------------------------------------------------

```python
"""
Tests for health check functionality.
"""

import pytest
import asyncio
import time
from unittest.mock import AsyncMock, MagicMock, patch

from vectara_mcp.health_checks import (
    HealthChecker,
    HealthStatus,
    HealthCheck,
    get_liveness,
    get_readiness,
    get_detailed_health
)


class TestHealthChecker:
    """Test health checker functionality."""

    @pytest.fixture
    def health_checker(self):
        """Create a health checker instance for testing."""
        return HealthChecker()

    @pytest.mark.asyncio
    async def test_liveness_check(self, health_checker):
        """Test basic liveness check."""
        result = await health_checker.liveness_check()

        assert result["status"] == HealthStatus.HEALTHY.value
        assert "timestamp" in result
        assert "uptime_seconds" in result
        assert result["version"] == "0.2.0"
        assert result["service"] == "vectara-mcp-server"
        assert result["uptime_seconds"] >= 0

    @pytest.mark.asyncio
    async def test_readiness_check_healthy(self, health_checker):
        """Test readiness check with healthy dependencies."""
        with patch.object(health_checker, '_check_connection_manager') as mock_conn:
            with patch.object(health_checker, '_check_vectara_connectivity') as mock_vectara:
                # Mock healthy responses
                mock_conn.return_value = HealthCheck(
                    name="connection_manager",
                    status=HealthStatus.HEALTHY,
                    message="Connection manager healthy",
                    response_time_ms=10.0
                )
                mock_vectara.return_value = HealthCheck(
                    name="vectara_api",
                    status=HealthStatus.HEALTHY,
                    message="Vectara API accessible",
                    response_time_ms=20.0
                )

                result = await health_checker.readiness_check()

                assert result["status"] == HealthStatus.HEALTHY.value
                assert "timestamp" in result
                assert "response_time_ms" in result
                assert len(result["checks"]) == 2

                # Check individual components
                check_names = [check["name"] for check in result["checks"]]
                assert "connection_manager" in check_names
                assert "vectara_api" in check_names

    @pytest.mark.asyncio
    async def test_readiness_check_unhealthy(self, health_checker):
        """Test readiness check with unhealthy dependencies."""
        with patch.object(health_checker, '_check_connection_manager') as mock_conn:
            with patch.object(health_checker, '_check_vectara_connectivity') as mock_vectara:
                # Mock unhealthy connection manager
                mock_conn.return_value = HealthCheck(
                    name="connection_manager",
                    status=HealthStatus.UNHEALTHY,
                    message="Connection manager error",
                    response_time_ms=5.0
                )
                mock_vectara.return_value = HealthCheck(
                    name="vectara_api",
                    status=HealthStatus.HEALTHY,
                    message="Vectara API accessible",
                    response_time_ms=20.0
                )

                result = await health_checker.readiness_check()

                assert result["status"] == HealthStatus.UNHEALTHY.value
                assert len(result["checks"]) == 2

    @pytest.mark.asyncio
    async def test_readiness_check_degraded(self, health_checker):
        """Test readiness check with degraded dependencies."""
        with patch.object(health_checker, '_check_connection_manager') as mock_conn:
            with patch.object(health_checker, '_check_vectara_connectivity') as mock_vectara:
                # Mock healthy connection but degraded API
                mock_conn.return_value = HealthCheck(
                    name="connection_manager",
                    status=HealthStatus.HEALTHY,
                    message="Connection manager healthy",
                    response_time_ms=10.0
                )
                mock_vectara.return_value = HealthCheck(
                    name="vectara_api",
                    status=HealthStatus.DEGRADED,
                    message="Vectara API slow response",
                    response_time_ms=5000.0
                )

                result = await health_checker.readiness_check()

                assert result["status"] == HealthStatus.DEGRADED.value

    @pytest.mark.asyncio
    async def test_detailed_health_check(self, health_checker):
        """Test detailed health check."""
        with patch.object(health_checker, '_check_connection_manager_detailed') as mock_conn:
            with patch.object(health_checker, '_check_vectara_connectivity') as mock_vectara:
                # Mock detailed responses
                mock_conn.return_value = HealthCheck(
                    name="connection_manager_detailed",
                    status=HealthStatus.HEALTHY,
                    message="Connection manager healthy",
                    response_time_ms=15.0,
                    details={"circuit_breaker_state": "closed"}
                )
                mock_vectara.return_value = HealthCheck(
                    name="vectara_api",
                    status=HealthStatus.HEALTHY,
                    message="Vectara API accessible",
                    response_time_ms=25.0
                )

                result = await health_checker.detailed_health_check()

                assert result["status"] == HealthStatus.HEALTHY.value
                assert "server" in result
                assert "metrics" in result
                assert "checks" in result
                assert result["server"]["service"] == "vectara-mcp-server"

    @pytest.mark.asyncio
    async def test_connection_manager_check_healthy(self, health_checker):
        """Test connection manager health check when healthy."""
        with patch('vectara_mcp.health_checks.get_connection_manager') as mock_get_manager:
            mock_manager = MagicMock()
            mock_manager.get_stats.return_value = {
                "session_initialized": True,
                "circuit_breaker": {"state": "closed"}
            }
            mock_get_manager.return_value = mock_manager

            result = await health_checker._check_connection_manager()

            assert result.name == "connection_manager"
            assert result.status == HealthStatus.HEALTHY
            assert "initialized and ready" in result.message
            assert result.response_time_ms is not None

    @pytest.mark.asyncio
    async def test_connection_manager_check_unhealthy(self, health_checker):
        """Test connection manager health check when unhealthy."""
        with patch('vectara_mcp.health_checks.get_connection_manager') as mock_get_manager:
            mock_manager = MagicMock()
            mock_manager.get_stats.return_value = {
                "session_initialized": False,
                "circuit_breaker": {"state": "closed"}
            }
            mock_get_manager.return_value = mock_manager

            result = await health_checker._check_connection_manager()

            assert result.name == "connection_manager"
            assert result.status == HealthStatus.UNHEALTHY
            assert "not initialized" in result.message

    @pytest.mark.asyncio
    async def test_connection_manager_check_exception(self, health_checker):
        """Test connection manager health check with exception."""
        with patch('vectara_mcp.health_checks.get_connection_manager') as mock_get_manager:
            mock_get_manager.side_effect = Exception("Connection failed")

            result = await health_checker._check_connection_manager()

            assert result.name == "connection_manager"
            assert result.status == HealthStatus.UNHEALTHY
            assert "Connection failed" in result.message

    @pytest.mark.asyncio
    async def test_vectara_connectivity_check_healthy(self, health_checker):
        """Test Vectara API connectivity check when healthy."""
        with patch('vectara_mcp.health_checks.get_connection_manager') as mock_get_manager:
            mock_manager = AsyncMock()
            mock_manager.health_check.return_value = {
                "status": "healthy",
                "response_time_ms": 150.0,
                "circuit_breaker_state": "closed"
            }
            mock_get_manager.return_value = mock_manager

            result = await health_checker._check_vectara_connectivity()

            assert result.name == "vectara_api"
            assert result.status == HealthStatus.HEALTHY
            assert "accessible" in result.message

    @pytest.mark.asyncio
    async def test_vectara_connectivity_check_degraded(self, health_checker):
        """Test that an unhealthy API health result is reported as DEGRADED."""
        with patch('vectara_mcp.health_checks.get_connection_manager') as mock_get_manager:
            mock_manager = AsyncMock()
            mock_manager.health_check.return_value = {
                "status": "unhealthy",
                "error": "Timeout",
                "circuit_breaker_state": "open"
            }
            mock_get_manager.return_value = mock_manager

            result = await health_checker._check_vectara_connectivity()

            assert result.name == "vectara_api"
            assert result.status == HealthStatus.DEGRADED
            assert "issues" in result.message

    @pytest.mark.asyncio
    async def test_vectara_connectivity_check_exception(self, health_checker):
        """Test Vectara API connectivity check with exception."""
        with patch('vectara_mcp.health_checks.get_connection_manager') as mock_get_manager:
            mock_get_manager.side_effect = Exception("Network error")

            result = await health_checker._check_vectara_connectivity()

            assert result.name == "vectara_api"
            assert result.status == HealthStatus.UNHEALTHY
            assert "Network error" in result.message

    @pytest.mark.asyncio
    async def test_detailed_connection_manager_check_circuit_open(self, health_checker):
        """Test detailed connection manager check with open circuit."""
        with patch('vectara_mcp.health_checks.get_connection_manager') as mock_get_manager:
            mock_manager = MagicMock()
            mock_manager.get_stats.return_value = {
                "session_initialized": True,
                "circuit_breaker": {
                    "state": "open",
                    "failure_count": 5
                }
            }
            mock_get_manager.return_value = mock_manager

            result = await health_checker._check_connection_manager_detailed()

            assert result.name == "connection_manager_detailed"
            assert result.status == HealthStatus.UNHEALTHY
            assert "OPEN" in result.message

    @pytest.mark.asyncio
    async def test_detailed_connection_manager_check_circuit_half_open(self, health_checker):
        """Test detailed connection manager check with half-open circuit."""
        with patch('vectara_mcp.health_checks.get_connection_manager') as mock_get_manager:
            mock_manager = MagicMock()
            mock_manager.get_stats.return_value = {
                "session_initialized": True,
                "circuit_breaker": {
                    "state": "half_open",
                    "failure_count": 3
                }
            }
            mock_get_manager.return_value = mock_manager

            result = await health_checker._check_connection_manager_detailed()

            assert result.name == "connection_manager_detailed"
            assert result.status == HealthStatus.DEGRADED
            assert "testing recovery" in result.message

    @pytest.mark.asyncio
    async def test_cache_functionality(self, health_checker):
        """Test that connectivity check results are cached."""
        with patch('vectara_mcp.health_checks.get_connection_manager') as mock_get_manager:
            mock_manager = AsyncMock()
            mock_manager.health_check.return_value = {
                "status": "healthy",
                "response_time_ms": 100.0,
                "circuit_breaker_state": "closed"
            }
            mock_get_manager.return_value = mock_manager

            # First call
            result1 = await health_checker._check_vectara_connectivity()

            # Second call (should use cache)
            result2 = await health_checker._check_vectara_connectivity()

            # Should only call the manager once due to caching
            assert mock_manager.health_check.call_count == 1
            assert result1.status == result2.status

    def test_health_status_enum(self):
        """Test HealthStatus enum values."""
        assert HealthStatus.HEALTHY.value == "healthy"
        assert HealthStatus.UNHEALTHY.value == "unhealthy"
        assert HealthStatus.DEGRADED.value == "degraded"
        assert HealthStatus.UNKNOWN.value == "unknown"

    def test_health_check_dataclass(self):
        """Test HealthCheck dataclass."""
        check = HealthCheck(
            name="test_check",
            status=HealthStatus.HEALTHY,
            message="Test message",
            response_time_ms=100.0,
            details={"key": "value"}
        )

        assert check.name == "test_check"
        assert check.status == HealthStatus.HEALTHY
        assert check.message == "Test message"
        assert check.response_time_ms == 100.0
        assert check.details == {"key": "value"}


class TestHealthCheckEndpoints:
    """Test health check endpoint functions."""

    @pytest.mark.asyncio
    async def test_get_liveness(self):
        """Test get_liveness function."""
        result = await get_liveness()

        assert result["status"] == HealthStatus.HEALTHY.value
        assert "uptime_seconds" in result
        assert result["service"] == "vectara-mcp-server"

    @pytest.mark.asyncio
    async def test_get_readiness(self):
        """Test get_readiness function."""
        with patch('vectara_mcp.health_checks.health_checker') as mock_checker:
            async def mock_readiness():
                return {
                    "status": HealthStatus.HEALTHY.value,
                    "checks": []
                }
            mock_checker.readiness_check = mock_readiness

            result = await get_readiness()

            assert result["status"] == HealthStatus.HEALTHY.value

    @pytest.mark.asyncio
    async def test_get_detailed_health(self):
        """Test get_detailed_health function."""
        with patch('vectara_mcp.health_checks.health_checker') as mock_checker:
            async def mock_detailed():
                return {
                    "status": HealthStatus.HEALTHY.value,
                    "checks": [],
                    "metrics": {}
                }
            mock_checker.detailed_health_check = mock_detailed

            result = await get_detailed_health()

            assert result["status"] == HealthStatus.HEALTHY.value
```
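
The caching behavior exercised by `test_cache_functionality` above can be illustrated with a small time-based cache. This is only a sketch under stated assumptions: `TTLCache`, `ttl_seconds`, and `fake_probe` are hypothetical names chosen for illustration and are not taken from `vectara_mcp/health_checks.py`, whose actual caching logic may differ.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, Optional


class TTLCache:
    """Hypothetical helper: cache one async result for a fixed number of seconds."""

    def __init__(self, ttl_seconds: float = 30.0) -> None:
        self.ttl_seconds = ttl_seconds
        self._value: Any = None
        self._stored_at: Optional[float] = None

    async def get(self, producer: Callable[[], Awaitable[Any]]) -> Any:
        """Return the cached value, calling `producer` only when it is stale."""
        now = time.monotonic()
        if self._stored_at is not None and now - self._stored_at < self.ttl_seconds:
            return self._value
        self._value = await producer()
        self._stored_at = now
        return self._value


async def demo() -> int:
    """Call the cache twice and report how often the probe actually ran."""
    calls = 0

    async def fake_probe() -> dict:
        # Stand-in for an expensive connectivity check (assumption).
        nonlocal calls
        calls += 1
        return {"status": "healthy"}

    cache = TTLCache(ttl_seconds=30.0)
    first = await cache.get(fake_probe)
    second = await cache.get(fake_probe)
    assert first == second
    return calls


probe_calls = asyncio.run(demo())
print(probe_calls)  # 1: the second call was served from the cache
```

Using `time.monotonic()` rather than wall-clock time keeps the expiry check unaffected by system clock adjustments.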

--------------------------------------------------------------------------------
/tests/test_server.py:
--------------------------------------------------------------------------------

```python
import pytest
import json
import os
import sys
from unittest.mock import AsyncMock, patch, MagicMock
import aiohttp
from mcp.server.fastmcp import Context

from vectara_mcp.server import (
    ask_vectara,
    search_vectara,
    correct_hallucinations,
    eval_factual_consistency,
    main
)
from vectara_mcp.auth import AuthMiddleware


class TestVectaraTools:
    """Test suite for Vectara MCP tools with new API key management"""

    @pytest.fixture
    def mock_context(self):
        """Create a mock context for testing"""
        context = AsyncMock(spec=Context)
        context.info = MagicMock()
        context.report_progress = AsyncMock()
        return context

    @pytest.fixture(autouse=True)
    def clear_stored_api_key(self):
        """Clear stored API key before each test"""
        import vectara_mcp.server
        vectara_mcp.server._stored_api_key = None
        vectara_mcp.server._auth_required = True
        yield
        vectara_mcp.server._stored_api_key = None
        vectara_mcp.server._auth_required = True

    @pytest.fixture
    def mock_api_key(self):
        """Mock API key storage for tests that need it"""
        import vectara_mcp.server
        vectara_mcp.server._stored_api_key = "test-api-key"
        return "test-api-key"

    # ASK_VECTARA TESTS
    @pytest.mark.asyncio
    async def test_ask_vectara_missing_query(self, mock_context, mock_api_key):
        """Test ask_vectara with missing query"""
        result = await ask_vectara(
            query="",
            ctx=mock_context,
            corpus_keys=["test-corpus"]
        )
        assert result == {"error": "Query is required."}

    @pytest.mark.asyncio
    async def test_ask_vectara_missing_corpus_keys(self, mock_context, mock_api_key):
        """Test ask_vectara with missing corpus keys"""
        result = await ask_vectara(
            query="test query",
            ctx=mock_context,
            corpus_keys=[]
        )
        assert result == {"error": "Corpus keys are required. Please ask the user to provide one or more corpus keys."}

    @pytest.mark.asyncio
    @patch.dict('os.environ', {}, clear=True)
    async def test_ask_vectara_missing_api_key(self, mock_context):
        """Test ask_vectara with missing API key"""
        result = await ask_vectara(
            query="test query",
            ctx=mock_context,
            corpus_keys=["test-corpus"]
        )
        assert result == {"error": "API key not configured. Please use 'setup_vectara_api_key' tool first or set VECTARA_API_KEY environment variable."}

    @pytest.mark.asyncio
    @patch('vectara_mcp.server._call_vectara_query')
    async def test_ask_vectara_success(self, mock_api_call, mock_context, mock_api_key):
        """Test successful ask_vectara call"""
        mock_api_call.return_value = {
            "summary": "Test response summary",
            "search_results": [
                {
                    "score": 0.95,
                    "text": "Test citation text",
                    "document_metadata": {"title": "Test Source"}
                }
            ]
        }

        result = await ask_vectara(
            query="test query",
            ctx=mock_context,
            corpus_keys=["test-corpus"]
        )

        # Check the structured response format
        assert result["summary"] == "Test response summary"
        assert "citations" in result
        assert len(result["citations"]) == 1

        # Check citation details
        citation = result["citations"][0]
        assert citation["id"] == 1
        assert citation["score"] == 0.95
        assert citation["text"] == "Test citation text"
        assert citation["document_metadata"] == {"title": "Test Source"}
        mock_context.info.assert_called_once_with("Running Vectara RAG query: test query")
        mock_api_call.assert_called_once()

    @pytest.mark.asyncio
    @patch('vectara_mcp.server._call_vectara_query')
    async def test_ask_vectara_exception(self, mock_api_call, mock_context, mock_api_key):
        """Test ask_vectara with exception"""
        mock_api_call.side_effect = Exception("API Error")

        result = await ask_vectara(
            query="test query",
            ctx=mock_context,
            corpus_keys=["test-corpus"]
        )

        assert result == {"error": "Error with Vectara RAG query: API Error"}

    # SEARCH_VECTARA TESTS
    @pytest.mark.asyncio
    async def test_search_vectara_missing_query(self, mock_context, mock_api_key):
        """Test search_vectara with missing query"""
        result = await search_vectara(
            query="",
            ctx=mock_context,
            corpus_keys=["test-corpus"]
        )
        assert result == {"error": "Query is required."}

    @pytest.mark.asyncio
    @patch('vectara_mcp.server._call_vectara_query')
    async def test_search_vectara_success(self, mock_api_call, mock_context, mock_api_key):
        """Test successful search_vectara call"""
        mock_api_call.return_value = {
            "search_results": [
                {
                    "score": 0.95,
                    "text": "Test search result text",
                    "document_metadata": {"title": "Test Document"}
                }
            ]
        }

        result = await search_vectara(
            query="test query",
            ctx=mock_context,
            corpus_keys=["test-corpus"]
        )

        assert isinstance(result, dict)
        assert "search_results" in result
        assert len(result["search_results"]) == 1
        assert result["search_results"][0]["score"] == 0.95
        assert result["search_results"][0]["text"] == "Test search result text"
        assert result["search_results"][0]["document_metadata"]["title"] == "Test Document"
        mock_context.info.assert_called_once_with("Running Vectara semantic search query: test query")
        mock_api_call.assert_called_once()

    # TRANSPORT AND AUTH TESTS
    def test_auth_middleware_validation(self):
        """Test authentication middleware validation"""
        auth = AuthMiddleware(auth_required=True)

        # patch.dict restores the environment even if an assertion below fails
        with patch.dict(os.environ, {"VECTARA_API_KEY": "test-key"}):
            # Valid token
            auth.valid_tokens = {"test-key"}
            assert auth.validate_token("test-key") is True
            assert auth.validate_token("Bearer test-key") is True

            # Invalid token
            assert auth.validate_token("invalid-key") is False
            assert auth.validate_token(None) is False

            # Auth disabled
            auth_disabled = AuthMiddleware(auth_required=False)
            assert auth_disabled.validate_token(None) is True

    def test_token_extraction_from_headers(self):
        """Test token extraction from different header formats"""
        auth = AuthMiddleware()

        # Authorization header
        headers = {"Authorization": "Bearer test-token"}
        assert auth.extract_token_from_headers(headers) == "Bearer test-token"

        # X-API-Key header
        headers = {"X-API-Key": "test-token"}
        assert auth.extract_token_from_headers(headers) == "Bearer test-token"

        # Case insensitive
        headers = {"authorization": "Bearer test-token"}
        assert auth.extract_token_from_headers(headers) == "Bearer test-token"

        # No token
        headers = {}
        assert auth.extract_token_from_headers(headers) is None

    @patch('sys.argv', ['test', '--stdio'])
    def test_main_stdio_transport(self, capsys):
        """Test main function with STDIO transport"""
        with patch('vectara_mcp.server.mcp.run') as mock_run:
            with pytest.raises(SystemExit):
                main()

            mock_run.assert_called_once_with()
            captured = capsys.readouterr()
            assert "STDIO transport is less secure" in captured.err

    @patch('sys.argv', ['test', '--transport', 'http'])
    def test_main_http_transport(self, capsys):
        """Test main function with HTTP transport"""
        with patch('vectara_mcp.server.mcp.run') as mock_run:
            with pytest.raises(SystemExit):
                main()

            mock_run.assert_called_once_with(transport='http', host='127.0.0.1', port=8000)
            captured = capsys.readouterr()
            assert "HTTP mode" in captured.err
            assert "Authentication: enabled" in captured.err

    @patch('sys.argv', ['test', '--no-auth'])
    def test_main_no_auth_warning(self, capsys):
        """Test main function shows warning when auth is disabled"""
        with patch('vectara_mcp.server.mcp.run') as mock_run:
            with pytest.raises(SystemExit):
                main()

            captured = capsys.readouterr()
            assert "Authentication disabled" in captured.err
            assert "NEVER use in production" in captured.err

    # ENVIRONMENT VARIABLES TESTS
    @patch.dict('os.environ', {'VECTARA_TRANSPORT': 'sse', 'VECTARA_AUTH_REQUIRED': 'false'}, clear=False)
    def test_environment_variables(self):
        """Test that the transport and auth environment variables can be set"""
        # This only verifies that patch.dict exposes the variables via os.getenv;
        # it does not exercise the argument parsing in main().
        assert os.getenv('VECTARA_TRANSPORT') == 'sse'
        assert os.getenv('VECTARA_AUTH_REQUIRED') == 'false'

    # CORRECT_HALLUCINATIONS TESTS
    @pytest.mark.asyncio
    async def test_correct_hallucinations_missing_text(self, mock_context, mock_api_key):
        """Test correct_hallucinations with missing text"""
        result = await correct_hallucinations(
            generated_text="",
            documents=["doc1"],
            ctx=mock_context
        )
        assert result == {"error": "Generated text is required."}

    @pytest.mark.asyncio
    async def test_correct_hallucinations_missing_source_documents(self, mock_context, mock_api_key):
        """Test correct_hallucinations with missing source documents"""
        result = await correct_hallucinations(
            generated_text="test text",
            documents=[],
            ctx=mock_context
        )
        assert result == {"error": "Documents are required."}

    @pytest.mark.asyncio
    @patch.dict('os.environ', {}, clear=True)
    async def test_correct_hallucinations_missing_api_key(self, mock_context):
        """Test correct_hallucinations with missing API key"""
        result = await correct_hallucinations(
            generated_text="test text",
            documents=["doc1"],
            ctx=mock_context
        )
        assert result == {"error": "API key not configured. Please use 'setup_vectara_api_key' tool first or set VECTARA_API_KEY environment variable."}

    @pytest.mark.asyncio
    @patch('vectara_mcp.server._make_api_request')
    async def test_correct_hallucinations_success(self, mock_api_request, mock_context, mock_api_key):
        """Test successful correct_hallucinations call"""
        mock_api_request.return_value = {"corrected_text": "Corrected version", "hallucinations": []}

        result = await correct_hallucinations(
            generated_text="test text with potential hallucination",
            documents=["Source document content"],
            ctx=mock_context
        )

        expected_result = {"corrected_text": "Corrected version", "hallucinations": []}
        assert result == expected_result
        mock_context.info.assert_called_once()

    @pytest.mark.asyncio
    @patch('vectara_mcp.server._make_api_request')
    async def test_correct_hallucinations_403_error(self, mock_api_request, mock_context, mock_api_key):
        """Test correct_hallucinations with 403 permission error"""
        mock_api_request.side_effect = Exception("Permissions do not allow hallucination correction.")

        result = await correct_hallucinations(
            generated_text="test text",
            documents=["doc1"],
            ctx=mock_context
        )

        assert result == {"error": "Error with hallucination correction: Permissions do not allow hallucination correction."}

    @pytest.mark.asyncio
    @patch('vectara_mcp.server._make_api_request')
    async def test_correct_hallucinations_400_error(self, mock_api_request, mock_context, mock_api_key):
        """Test correct_hallucinations with 400 bad request error"""
        mock_api_request.side_effect = Exception("Bad request: Invalid request format")

        result = await correct_hallucinations(
            generated_text="test text",
            documents=["doc1"],
            ctx=mock_context
        )

        assert result == {"error": "Error with hallucination correction: Bad request: Invalid request format"}

    # EVAL_FACTUAL_CONSISTENCY TESTS
    @pytest.mark.asyncio
    async def test_eval_factual_consistency_missing_text(self, mock_context, mock_api_key):
        """Test eval_factual_consistency with missing text"""
        result = await eval_factual_consistency(
            generated_text="",
            documents=["doc1"],
            ctx=mock_context
        )
        assert result == {"error": "Generated text is required."}

    @pytest.mark.asyncio
    async def test_eval_factual_consistency_missing_source_documents(self, mock_context, mock_api_key):
        """Test eval_factual_consistency with missing source documents"""
        result = await eval_factual_consistency(
            generated_text="test text",
            documents=[],
            ctx=mock_context
        )
        assert result == {"error": "Documents are required."}

    @pytest.mark.asyncio
    @patch.dict('os.environ', {}, clear=True)
    async def test_eval_factual_consistency_missing_api_key(self, mock_context):
        """Test eval_factual_consistency with missing API key"""
        result = await eval_factual_consistency(
            generated_text="test text",
            documents=["doc1"],
            ctx=mock_context
        )
        assert result == {"error": "API key not configured. Please use 'setup_vectara_api_key' tool first or set VECTARA_API_KEY environment variable."}

    @pytest.mark.asyncio
    @patch('vectara_mcp.server._make_api_request')
    async def test_eval_factual_consistency_success(self, mock_api_request, mock_context, mock_api_key):
        """Test successful eval_factual_consistency call"""
        mock_api_request.return_value = {"consistency_score": 0.85, "inconsistencies": []}

        result = await eval_factual_consistency(
            generated_text="test text for consistency check",
            documents=["Source document content"],
            ctx=mock_context
        )

        expected_result = {"consistency_score": 0.85, "inconsistencies": []}
        assert result == expected_result
        mock_context.info.assert_called_once()

    @pytest.mark.asyncio
    @patch('vectara_mcp.server._make_api_request')
    async def test_eval_factual_consistency_422_error(self, mock_api_request, mock_context, mock_api_key):
        """Test eval_factual_consistency with 422 language not supported error"""
        mock_api_request.side_effect = Exception("Language not supported by service.")

        result = await eval_factual_consistency(
            generated_text="test text",
            documents=["doc1"],
            ctx=mock_context
        )

        assert result == {"error": "Error with factual consistency evaluation: Language not supported by service."}

    @pytest.mark.asyncio
    @patch('vectara_mcp.server._make_api_request')
    async def test_eval_factual_consistency_exception(self, mock_api_request, mock_context, mock_api_key):
        """Test eval_factual_consistency with exception"""
        mock_api_request.side_effect = Exception("Network error")

        result = await eval_factual_consistency(
            generated_text="test text",
            documents=["doc1"],
            ctx=mock_context
        )

        assert result == {"error": "Error with factual consistency evaluation: Network error"}

    @pytest.mark.asyncio
    @patch('vectara_mcp.server._make_api_request')
    async def test_correct_hallucinations_exception(self, mock_api_request, mock_context, mock_api_key):
        """Test correct_hallucinations with exception"""
        mock_api_request.side_effect = Exception("Network error")

        result = await correct_hallucinations(
            generated_text="test text",
            documents=["doc1"],
            ctx=mock_context
        )

        assert result == {"error": "Error with hallucination correction: Network error"}
```
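
The header and token rules asserted by `test_token_extraction_from_headers` and `test_auth_middleware_validation` above can be sketched as follows. This is a hypothetical stand-in written only from what those tests check; `SketchAuth` is an invented name, and the real `AuthMiddleware` in `vectara_mcp/auth.py` may be implemented differently. The constant-time comparison is an added assumption, not something the tests require.

```python
import hmac
from typing import Iterable, Mapping, Optional


class SketchAuth:
    """Hypothetical stand-in mirroring the behavior the tests above assert."""

    def __init__(self, auth_required: bool = True,
                 valid_tokens: Optional[Iterable[str]] = None) -> None:
        self.auth_required = auth_required
        self.valid_tokens = set(valid_tokens or ())

    def extract_token_from_headers(self, headers: Mapping[str, str]) -> Optional[str]:
        """Return the token in 'Bearer <token>' form, or None if absent."""
        # HTTP header names are case-insensitive, so normalize before lookup.
        lowered = {name.lower(): value for name, value in headers.items()}
        if "authorization" in lowered:
            return lowered["authorization"]
        if "x-api-key" in lowered:
            return f"Bearer {lowered['x-api-key']}"
        return None

    def validate_token(self, token: Optional[str]) -> bool:
        """Accept a bare token or a 'Bearer '-prefixed one."""
        if not self.auth_required:
            return True
        if not token:
            return False
        if token.startswith("Bearer "):
            token = token[len("Bearer "):]
        # compare_digest avoids leaking the mismatch position through timing.
        return any(
            hmac.compare_digest(token.encode(), valid.encode())
            for valid in self.valid_tokens
        )


auth = SketchAuth(valid_tokens={"test-key"})
token = auth.extract_token_from_headers({"x-api-key": "test-key"})
print(token, auth.validate_token(token))
```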

--------------------------------------------------------------------------------
/vectara_mcp/server.py:
--------------------------------------------------------------------------------

```python
import logging
import json
import aiohttp
import os
import argparse
import sys
import atexit
import signal
import asyncio

logger = logging.getLogger(__name__)

from mcp.server.fastmcp import FastMCP, Context
from vectara_mcp.auth import (
    AuthMiddleware,
    require_auth,
    add_security_headers,
    RateLimiter,
    validate_origin
)
from vectara_mcp.connection_manager import (
    get_connection_manager,
    cleanup_connections
)
from vectara_mcp.health_checks import (
    get_liveness,
    get_readiness,
    get_detailed_health
)
from vectara_mcp import __version__

logging.basicConfig(level=logging.INFO)

# Constants
VECTARA_BASE_URL = "https://api.vectara.io/v2"
VHC_MODEL_NAME = "vhc-large-1.0"
DEFAULT_LANGUAGE = "en"
API_KEY_ERROR_MESSAGE = "API key not configured. Please use 'setup_vectara_api_key' tool first or set VECTARA_API_KEY environment variable."

# Rate limiting configuration
DEFAULT_MAX_REQUESTS = 100
DEFAULT_WINDOW_SECONDS = 60

# Create the Vectara MCP server
mcp = FastMCP("vectara")

# Initialize authentication and security components
auth_middleware = None
rate_limiter = RateLimiter(max_requests=DEFAULT_MAX_REQUESTS, window_seconds=DEFAULT_WINDOW_SECONDS)

# Global API key storage (session-scoped)
_stored_api_key: str | None = None
# Global authentication requirement flag
_auth_required: bool = True

def initialize_auth(auth_required: bool):
    """Initialize authentication middleware.

    Args:
        auth_required: Whether authentication is required
    """
    global auth_middleware
    auth_middleware = AuthMiddleware(auth_required=auth_required)

def _mask_api_key(api_key: str) -> str:
    """Mask API key for safe logging/display.

    Args:
        api_key: The API key to mask

    Returns:
        str: Masked API key showing only first 4 and last 4 characters
    """
    # A key of 8 characters or fewer would be fully revealed by the 4+4 slices below.
    if not api_key or len(api_key) <= 8:
        return "***"
    return f"{api_key[:4]}***{api_key[-4:]}"

def _get_api_key() -> str | None:
    """Get API key with fallback priority: stored > environment > None.

    Returns:
        str | None: API key if available, None otherwise
    """

    # Priority 1: Stored API key
    if _stored_api_key:
        return _stored_api_key

    # Priority 2: Environment variable; good for local deployments
    env_key = os.getenv("VECTARA_API_KEY")
    if env_key:
        return env_key

    # Priority 3: None (will trigger error in validation)
    return None

def _validate_common_parameters(query: str = "", corpus_keys: list[str] | None = None) -> str | None:
    """Validate common parameters used across Vectara tools.

    Returns:
        str | None: Error message if validation fails, None if valid
    """
    if not query:
        return "Query is required."
    if not corpus_keys:
        return "Corpus keys are required. Please ask the user to provide one or more corpus keys."

    # Check API key availability
    api_key = _get_api_key()
    if not api_key:
        return API_KEY_ERROR_MESSAGE

    return None


def _validate_api_key(api_key_override: str | None = None) -> str:
    """Validate and return API key, raise exception if not found.

    Args:
        api_key_override: Optional API key to use instead of the stored or
            environment key (for example, when validating a newly supplied key)

    Returns:
        str: Valid API key

    Raises:
        Exception: If no API key is configured
    """
    api_key = api_key_override or _get_api_key()
    if not api_key:
        raise Exception("API key not configured. Please use 'setup_vectara_api_key' tool first.")
    return api_key

def _build_headers(api_key: str) -> dict:
    """Build standard HTTP headers for Vectara API calls.

    Args:
        api_key: The API key to include in headers

    Returns:
        dict: Standard headers for Vectara API requests
    """
    return {
        "x-api-key": api_key,
        "Content-Type": "application/json",
        "Accept": "application/json"
    }

async def _handle_http_response(response: aiohttp.ClientResponse, error_context: str = "API") -> dict:
    """Handle HTTP response with unified error handling.

    Args:
        response: The aiohttp response object
        error_context: Context string for error messages (e.g., "query", "hallucination correction")

    Returns:
        dict: Response JSON data

    Raises:
        Exception: With descriptive error message based on status code
    """
    if response.status == 200:
        return await response.json()
    elif response.status == 400:
        error_text = await response.text()
        raise Exception(f"Bad request: {error_text}")
    elif response.status == 403:
        if "hallucination" in error_context.lower():
            raise Exception(f"Permissions do not allow {error_context}.")
        else:
            raise Exception("Permission denied. Check your API key and corpus access.")
    elif response.status == 404:
        raise Exception("Corpus not found. Check your corpus keys.")
    elif response.status == 422:
        raise Exception("Language not supported by service.")
    else:
        error_text = await response.text()
        raise Exception(f"API error {response.status}: {error_text}")

async def _make_api_request(
    url: str,
    payload: dict,
    ctx: Context | None = None,
    api_key_override: str | None = None,
    error_context: str = "API"
) -> dict:
    """Generic HTTP POST request with progress reporting and error handling.

    Uses persistent connection pooling and circuit breaker pattern.

    Args:
        url: The API endpoint URL
        payload: Request payload
        ctx: Optional MCP context for progress reporting
        api_key_override: Optional API key to use instead of the stored or
            environment key (for example, when validating a newly supplied key)
        error_context: Context for error messages

    Returns:
        dict: API response data

    Raises:
        Exception: With descriptive error message
    """
    api_key = _validate_api_key(api_key_override)
    headers = _build_headers(api_key)

    # Get connection manager with persistent session
    conn_manager = await get_connection_manager()

    if ctx:
        await ctx.report_progress(0, 1)

    try:
        # Use persistent session with circuit breaker protection
        response = await conn_manager.request(
            method='POST',
            url=url,
            headers=headers,
            json_data=payload
        )

        if ctx:
            await ctx.report_progress(1, 1)

        # Map the HTTP status to parsed JSON or a descriptive exception
        async with response:
            return await _handle_http_response(response, error_context)

    except Exception as e:
        # Log the error with context
        logger.error("API request failed: %s - %s", error_context, e)
        raise

def _build_query_payload(
    query: str,
    corpus_keys: list[str],
    n_sentences_before: int = 2,
    n_sentences_after: int = 2,
    lexical_interpolation: float = 0.005,
    max_used_search_results: int = 10,
    generation_preset_name: str = "vectara-summary-table-md-query-ext-jan-2025-gpt-4o",
    response_language: str = "eng",
    enable_generation: bool = True
) -> dict:
    """Build the query payload for Vectara API"""
    payload = {
        "query": query,
        "search": {
            "limit": 100,
            "corpora": [
                {
                    "corpus_key": corpus_key,
                    "lexical_interpolation": lexical_interpolation
                } for corpus_key in corpus_keys
            ],
            "context_configuration": {
                "sentences_before": n_sentences_before,
                "sentences_after": n_sentences_after
            },
            "reranker": {
                "type": "customer_reranker",
                "reranker_name": "Rerank_Multilingual_v1",
                "limit": 100,
                "cutoff": 0.2
            }
        },
        "save_history": True,
    }

    if enable_generation:
        payload["generation"] = {
            "generation_preset_name": generation_preset_name,
            "max_used_search_results": max_used_search_results,
            "response_language": response_language,
            "citations": {
                "style": "markdown",
                "url_pattern": "{doc.url}",
                "text_pattern": "{doc.title}"
            },
            "enable_factual_consistency_score": True
        }

    return payload

async def _call_vectara_query(
    payload: dict,
    ctx: Context = None,
    api_key_override: str = None
) -> dict:
    """Make API call to Vectara query endpoint"""
    return await _make_api_request(
        f"{VECTARA_BASE_URL}/query",
        payload,
        ctx,
        api_key_override,
        "query"
    )


def _format_error(tool_name: str, error: Exception) -> str:
    """Format error messages consistently across tools.

    Args:
        tool_name: Name of the tool (e.g., "Vectara RAG query")
        error: The exception that occurred

    Returns:
        str: Formatted error message
    """
    return f"Error with {tool_name}: {str(error)}"

# API Key Management Tools
@mcp.tool()
async def setup_vectara_api_key(
    api_key: str,
    ctx: Context
) -> str:
    """
    Configure and validate the Vectara API key for the session.

    Args:
        api_key: str, The Vectara API key to configure - required.

    Returns:
        str: Success message with masked API key or error message.
    """
    global _stored_api_key

    if not api_key:
        return "API key is required."

    if ctx:
        ctx.info(f"Setting up Vectara API key: {_mask_api_key(api_key)}")

    try:
        # Test the API key with a minimal query to validate it
        test_payload = _build_query_payload(
            query="test",
            corpus_keys=["test"],  # This will likely fail but we just want to test API key auth
            enable_generation=False
        )

        # Use our existing query function with the test API key
        await _call_vectara_query(test_payload, ctx, api_key_override=api_key)

        # If we get here without exception, API key is valid
        _stored_api_key = api_key
        masked_key = _mask_api_key(api_key)
        return f"API key configured successfully: {masked_key}"

    except Exception as e:
        error_msg = str(e)
        if any(marker in error_msg for marker in ["403", "401", "Permission denied", "API key error"]):
            return "Invalid API key. Please check your Vectara API key and try again."
        elif any(status in error_msg for status in ["400", "404", "Bad request", "Corpus not found"]):
            # These errors indicate API key is valid but request failed for other reasons
            _stored_api_key = api_key
            masked_key = _mask_api_key(api_key)
            return f"API key configured successfully: {masked_key}"
        else:
            return f"API validation failed: {error_msg}"

@mcp.tool()
async def clear_vectara_api_key(ctx: Context) -> str:
    """
    Clear the stored Vectara API key from server memory.

    Returns:
        str: Confirmation message.
    """
    global _stored_api_key

    if ctx:
        ctx.info("Clearing stored Vectara API key")

    _stored_api_key = None
    return "API key cleared from server memory."


# Health Check Tools
@mcp.tool()
async def health_check(
    ctx: Context
) -> dict:
    """
    Get server liveness status (basic health check).

    This endpoint checks if the server process is running and responding.
    Used by load balancers to determine if traffic should be routed here.

    Returns:
        dict: Server liveness status with uptime and version info.
    """
    if ctx:
        ctx.info("Performing health check")

    try:
        return await get_liveness()
    except Exception as e:
        return {"error": _format_error("health check", e)}


@mcp.tool()
async def readiness_check(
    ctx: Context
) -> dict:
    """
    Get server readiness status (dependency health check).

    This endpoint checks if the server can handle traffic by validating
    critical dependencies like connection manager and Vectara API connectivity.
    Used by orchestration platforms to determine deployment readiness.

    Returns:
        dict: Server readiness status with dependency check results.
    """
    if ctx:
        ctx.info("Performing readiness check")

    try:
        return await get_readiness()
    except Exception as e:
        return {"error": _format_error("readiness check", e)}


@mcp.tool()
async def detailed_health_check(
    ctx: Context
) -> dict:
    """
    Get comprehensive server health status with detailed metrics.

    This endpoint provides detailed information about all system components,
    performance metrics, connection pool status, and configuration.
    Used for monitoring, debugging, and operational visibility.

    Returns:
        dict: Comprehensive health status with detailed metrics and component states.
    """
    if ctx:
        ctx.info("Performing detailed health check")

    try:
        return await get_detailed_health()
    except Exception as e:
        return {"error": _format_error("detailed health check", e)}


@mcp.tool()
async def get_server_stats(
    ctx: Context
) -> dict:
    """
    Get server statistics and metrics for monitoring.

    Provides runtime statistics including connection pool status,
    circuit breaker state, retry metrics, and performance data.

    Returns:
        dict: Server statistics and operational metrics.
    """
    if ctx:
        ctx.info("Getting server statistics")

    try:
        from vectara_mcp.connection_manager import connection_manager

        stats = {
            "connection_manager": connection_manager.get_stats(),
            "server_info": {
                "version": __version__,
                "transport": "http",  # Could be made dynamic
                "auth_enabled": bool(_auth_required)
            }
        }

        return stats
    except Exception as e:
        return {"error": _format_error("server stats", e)}


# Query tool
@mcp.tool()
async def ask_vectara(
    query: str,
    ctx: Context,
    corpus_keys: list[str],
    n_sentences_before: int = 2,
    n_sentences_after: int = 2,
    lexical_interpolation: float = 0.005,
    max_used_search_results: int = 10,
    generation_preset_name: str = "vectara-summary-table-md-query-ext-jan-2025-gpt-4o",
    response_language: str = "eng",
) -> dict:
    """
    Run a RAG query using Vectara, returning search results with a generated response.

    Args:
        query: str, The user query to run - required.
        corpus_keys: list[str], List of Vectara corpus keys to use for the search - required. Please ask the user to provide one or more corpus keys.
        n_sentences_before: int, Number of sentences before the answer to include in the context - optional, default is 2.
        n_sentences_after: int, Number of sentences after the answer to include in the context - optional, default is 2.
        lexical_interpolation: float, The amount of lexical interpolation to use - optional, default is 0.005.
        max_used_search_results: int, The maximum number of search results to use - optional, default is 10.
        generation_preset_name: str, The name of the generation preset to use - optional, default is "vectara-summary-table-md-query-ext-jan-2025-gpt-4o".
        response_language: str, The language of the response - optional, default is "eng".

    Note: API key must be configured first using 'setup_vectara_api_key' tool

    Returns:
        dict: Structured response containing:
            - "summary": Generated AI summary with markdown citations
            - "citations": List of citation objects with score, text, and metadata
            - "factual_consistency_score": Score indicating factual consistency (if available)
        On error, returns dict with "error" key containing error message.
    """
    # Validate parameters
    validation_error = _validate_common_parameters(query, corpus_keys)
    if validation_error:
        return {"error": validation_error}

    if ctx:
        ctx.info(f"Running Vectara RAG query: {query}")

    try:
        payload = _build_query_payload(
            query=query,
            corpus_keys=corpus_keys,
            n_sentences_before=n_sentences_before,
            n_sentences_after=n_sentences_after,
            lexical_interpolation=lexical_interpolation,
            max_used_search_results=max_used_search_results,
            generation_preset_name=generation_preset_name,
            response_language=response_language,
            enable_generation=True
        )

        result = await _call_vectara_query(payload, ctx)

        # Extract the generated summary from the response
        summary_text = ""
        if "summary" in result:
            summary_text = result["summary"]
        elif "answer" in result:
            summary_text = result["answer"]
        else:
            return {"error": f"Unexpected response format: {json.dumps(result, indent=2)}"}

        # Build citations list
        citations = []
        if "search_results" in result and result["search_results"]:
            for i, search_result in enumerate(result["search_results"], 1):
                citation = {
                    "id": i,
                    "score": search_result.get("score", 0.0),
                    "text": search_result.get("text", ""),
                    "document_metadata": search_result.get("document_metadata", {})
                }
                citations.append(citation)

        # Build response dict
        response = {
            "summary": summary_text,
            "citations": citations
        }

        # Add factual consistency score if available
        if "factual_consistency_score" in result:
            response["factual_consistency_score"] = result["factual_consistency_score"]

        return response

    except Exception as e:
        return {"error": _format_error("Vectara RAG query", e)}


# Search tool (retrieval only, no generation)
@mcp.tool()
async def search_vectara(
    query: str,
    ctx: Context,
    corpus_keys: list[str],
    n_sentences_before: int = 2,
    n_sentences_after: int = 2,
    lexical_interpolation: float = 0.005
) -> dict:
    """
    Run a semantic search query using Vectara, without generation.

    Args:
        query: str, The user query to run - required.
        corpus_keys: list[str], List of Vectara corpus keys to use for the search - required. Please ask the user to provide one or more corpus keys.
        n_sentences_before: int, Number of sentences before the answer to include in the context - optional, default is 2.
        n_sentences_after: int, Number of sentences after the answer to include in the context - optional, default is 2.
        lexical_interpolation: float, The amount of lexical interpolation to use - optional, default is 0.005.

    Note: API key must be configured first using 'setup_vectara_api_key' tool

    Returns:
        dict: Raw search results from Vectara API containing:
            - "search_results": List of search result objects with scores, text, and metadata
            - Additional response metadata from the API
        On error, returns dict with "error" key containing error message.
    """
    # Validate parameters
    validation_error = _validate_common_parameters(query, corpus_keys)
    if validation_error:
        return {"error": validation_error}

    if ctx:
        ctx.info(f"Running Vectara semantic search query: {query}")

    try:
        payload = _build_query_payload(
            query=query,
            corpus_keys=corpus_keys,
            n_sentences_before=n_sentences_before,
            n_sentences_after=n_sentences_after,
            lexical_interpolation=lexical_interpolation,
            enable_generation=False
        )

        result = await _call_vectara_query(payload, ctx)
        return result

    except Exception as e:
        return {"error": _format_error("Vectara semantic search query", e)}


@mcp.tool()
async def correct_hallucinations(
    generated_text: str,
    documents: list[str],
    ctx: Context,
    query: str = "",
) -> dict:
    """
    Identify and correct hallucinations in generated text using Vectara's hallucination correction API.

    Args:
        generated_text: str, The generated text to analyze for hallucinations - required.
        documents: list[str], List of source documents to compare against - required.
        query: str, The original user query that led to the generated text - optional.

    Note: API key must be configured first using 'setup_vectara_api_key' tool

    Returns:
        dict: Structured response containing:
            - "corrected_text": Text with hallucinations corrected
            - "corrections": Array of correction objects with:
                * "original_text": The hallucinated content
                * "corrected_text": The factually accurate replacement
                * "explanation": Detailed reason for the correction
        On error, returns dict with "error" key containing error message.
    """
    # Validate parameters
    if not generated_text:
        return {"error": "Generated text is required."}
    if not documents:
        return {"error": "Documents are required."}

    # Validate API key early
    api_key = _get_api_key()
    if not api_key:
        return {"error": API_KEY_ERROR_MESSAGE}

    if ctx:
        ctx.info(f"Analyzing text for hallucinations: {generated_text[:100]}...")

    try:
        # Build payload for VHC hallucination correction endpoint
        payload = {
            "generated_text": generated_text,
            "documents": [{"text": doc} for doc in documents],
            "model_name": VHC_MODEL_NAME
        }
        if query:
            payload["query"] = query

        return await _make_api_request(
            f"{VECTARA_BASE_URL}/hallucination_correctors/correct_hallucinations",
            payload,
            ctx,
            None,
            "hallucination correction"
        )

    except Exception as e:
        return {"error": _format_error("hallucination correction", e)}


@mcp.tool()
async def eval_factual_consistency(
    generated_text: str,
    documents: list[str],
    ctx: Context,
) -> dict:
    """
    Evaluate the factual consistency of generated text against source documents using Vectara's dedicated factual consistency API.

    Args:
        generated_text: str, The generated text to evaluate for factual consistency - required.
        documents: list[str], List of source documents to compare against - required.

    Note: API key must be configured first using 'setup_vectara_api_key' tool

    Returns:
        dict: Structured response containing factual consistency evaluation score.
        On error, returns dict with "error" key containing error message.
    """
    # Validate parameters
    if not generated_text:
        return {"error": "Generated text is required."}
    if not documents:
        return {"error": "Documents are required."}

    # Validate API key early
    api_key = _get_api_key()
    if not api_key:
        return {"error": API_KEY_ERROR_MESSAGE}

    if ctx:
        ctx.info(f"Evaluating factual consistency for text: {generated_text[:100]}...")

    try:
        # Build payload for dedicated factual consistency evaluation endpoint
        payload = {
            "generated_text": generated_text,
            "source_texts": documents,
        }

        return await _make_api_request(
            f"{VECTARA_BASE_URL}/evaluate_factual_consistency",
            payload,
            ctx,
            None,
            "factual consistency evaluation"
        )

    except Exception as e:
        return {"error": _format_error("factual consistency evaluation", e)}


def _setup_signal_handlers():
    """Setup signal handlers for graceful shutdown."""
    def signal_handler(signum, frame):
        logger.info(f"Received signal {signum}, initiating graceful shutdown...")
        # sys.exit() raises SystemExit immediately, so a task scheduled on the
        # running loop here would not get a chance to run. Connection cleanup
        # is left to the atexit hook registered in _setup_cleanup().
        sys.exit(0)

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)


def _setup_cleanup():
    """Setup cleanup for process exit."""
    atexit.register(lambda: asyncio.run(cleanup_connections()))


def main():
    """Command-line interface for starting the Vectara MCP Server."""
    parser = argparse.ArgumentParser(description="Vectara MCP Server")
    parser.add_argument(
        '--transport',
        default='http',
        choices=['http', 'sse', 'stdio'],
        help='Transport protocol (default: http for security)'
    )
    parser.add_argument(
        '--host',
        default='127.0.0.1',
        help='Host address for HTTP/SSE transport (default: 127.0.0.1)'
    )
    parser.add_argument(
        '--port',
        type=int,
        default=8000,
        help='Port for HTTP/SSE transport (default: 8000)'
    )
    parser.add_argument(
        '--stdio',
        action='store_true',
        help='Use STDIO transport (less secure, for local development only)'
    )
    parser.add_argument(
        '--no-auth',
        action='store_true',
        help='Disable authentication (DANGEROUS: development only)'
    )
    parser.add_argument(
        '--path',
        default='/sse',
        help='Path for SSE endpoint (default: /sse)'
    )

    args = parser.parse_args()

    # Override transport if --stdio flag is used
    if args.stdio:
        args.transport = 'stdio'

    # Configure authentication based on transport and flags
    auth_enabled = args.transport != 'stdio' and not args.no_auth

    # Display startup information
    if args.transport == 'stdio':
        print("⚠️  Warning: STDIO transport is less secure. Use only for local development.", file=sys.stderr)
        print("Starting Vectara MCP Server (STDIO mode)...", file=sys.stderr)
        mcp.run()
        sys.exit(0)
    else:
        if args.no_auth:
            print("⚠️  WARNING: Authentication disabled. NEVER use in production!", file=sys.stderr)

        transport_name = "HTTP" if args.transport == 'http' else "SSE"
        auth_status = "enabled" if auth_enabled else "DISABLED"

        print(f"Starting Vectara MCP Server ({transport_name} mode)", file=sys.stderr)
        print(f"Server: http://{args.host}:{args.port}{args.path if args.transport == 'sse' else ''}", file=sys.stderr)
        print(f"Authentication: {auth_status}", file=sys.stderr)

        # Initialize authentication middleware
        initialize_auth(auth_enabled)

        # Setup signal handlers and cleanup
        _setup_signal_handlers()
        _setup_cleanup()

        if args.transport == 'http':
            mcp.run(transport='http', host=args.host, port=args.port)
        else:  # sse
            mcp.run(transport='sse', host=args.host, port=args.port, path=args.path)

        sys.exit(0)

if __name__ == "__main__":
    main()

```
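
The `except` branch of `setup_vectara_api_key` in `server.py` decides whether a key is usable by looking for status markers in the error message: an auth marker means the key is rejected, while a 400/404-style marker means the key authenticated and only the placeholder query failed. Below is a minimal standalone sketch of that decision, assuming the same marker strings and the same precedence. `classify_key_probe` and its return labels are hypothetical names used for illustration only and are not part of `vectara_mcp`.

```python
from typing import Optional

# Marker strings copied from the checks in setup_vectara_api_key.
AUTH_FAILURE_MARKERS = ("403", "401", "Permission denied", "API key error")
REQUEST_FAILURE_MARKERS = ("400", "404", "Bad request", "Corpus not found")


def classify_key_probe(error_msg: Optional[str]) -> str:
    """Classify the outcome of the probe query (hypothetical helper).

    error_msg is None when the probe raised no exception, otherwise str(exception).
    Returns "valid", "invalid", or "unknown".
    """
    if error_msg is None:
        # No exception at all: the key was accepted.
        return "valid"
    if any(marker in error_msg for marker in AUTH_FAILURE_MARKERS):
        # Auth markers are checked first, as in the original branch order.
        return "invalid"
    if any(marker in error_msg for marker in REQUEST_FAILURE_MARKERS):
        # The request failed for a non-auth reason, so the key itself was accepted.
        return "valid"
    return "unknown"


for message in (None, "API key error: 401", "Corpus not found: test", "Connection timed out"):
    print(repr(message), "->", classify_key_probe(message))
```

Because the check is a plain substring match, an unrelated message such as `"timeout after 4000 ms"` also contains `"400"` and would be classified as a valid key. Matching on a structured status code, if the underlying request helper exposes one, would avoid that ambiguity.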