This is page 1 of 3. Use http://codebase.md/twelvedata/mcp?lines=false&page={x} to view the full context.

# Directory Structure

```
├── .dockerignore
├── .env.template
├── .gitignore
├── .python-version
├── config
│   └── mcpConfigStdio.json
├── docker-compose.yml
├── Dockerfile
├── example.gif
├── extra
│   ├── commands.txt
│   ├── endpoints_spec_en.csv
│   ├── full_descriptions.json
│   ├── instructions.txt
│   └── openapi_clean.json
├── favicon.ico
├── LICENSE
├── pyproject.toml
├── README.md
├── scripts
│   ├── check_embedings.py
│   ├── generate_docs_embeddings.py
│   ├── generate_endpoints_embeddings.py
│   ├── generate_requests_models.py
│   ├── generate_response_models.py
│   ├── generate_tools.py
│   ├── generate.md
│   ├── patch_vector_in_embeddings.py
│   ├── select_embedding.py
│   ├── split_openapi.py
│   └── split_opnapi_by_groups.py
├── src
│   └── mcp_server_twelve_data
│       ├── __init__.py
│       ├── __main__.py
│       ├── common.py
│       ├── doc_tool_remote.py
│       ├── doc_tool_response.py
│       ├── doc_tool.py
│       ├── key_provider.py
│       ├── prompts.py
│       ├── request_models.py
│       ├── response_models.py
│       ├── server.py
│       ├── tools.py
│       ├── u_tool_remote.py
│       ├── u_tool_response.py
│       └── u_tool.py
├── test
│   ├── __init__.py
│   ├── common.py
│   ├── endpoint_pairs.py
│   ├── test_doc_tool.py
│   ├── test_mcp_main.py
│   ├── test_top_n_filter.py
│   └── test_user_plan.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------

```

```

--------------------------------------------------------------------------------
/.dockerignore:
--------------------------------------------------------------------------------

```
.git
data/
publish.md
```

--------------------------------------------------------------------------------
/.env.template:
--------------------------------------------------------------------------------

```
LANCE_DB_ENDPOINTS_PATH=
LANCE_DB_DOCS_PATH=
OPENAI_API_KEY=
TWELVE_DATA_API_KEY=
MCP_URL=
```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
dist/
publish.md
.idea
src/mcp_server_twelve_data/__pycache__/
src/mcp_server_twelve_data/requests/__pycache__/
src/mcp_server_twelve_data/responses/__pycache__/
scripts/__pycache__/
test/__pycache__/
/data/
/.env
debug.txt
src/resources/docs.lancedb/
src/resources/endpoints.lancedb/


```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown

# Twelve Data MCP Server

## Overview

The Twelve Data MCP Server provides a seamless integration with the Twelve Data API to access financial market data. It enables retrieval of historical time series, real-time quotes, and instrument metadata for stocks, forex pairs, and cryptocurrencies.

> Note: This server is currently in early-stage development; features and tools may evolve alongside updates to the Twelve Data API.

## Obtaining Your API Key

To use Twelve Data MCP Server, you must first obtain an API key from Twelve Data:

1. Visit [Twelve Data Sign Up](https://twelvedata.com/register?utm_source=github&utm_medium=repository&utm_campaign=mcp_repo).
2. Create an account or log in if you already have one.
3. Navigate to your Dashboard and copy your API key.

Important: Access to specific endpoints or markets may vary depending on your Twelve Data subscription plan.

## U-tool
u-tool is an AI-powered universal router for the Twelve Data API that transforms how you access financial data. Instead of navigating 100+ individual endpoints and complex documentation, simply describe what you need in plain English.

How it works:
- 🧠 Natural Language Processing: Understands your request in conversational English
- 🔍 Smart Routing: Uses vector search to find the most relevant endpoints from Twelve Data's entire API catalog
- 🎯 Intelligent Selection: Leverages OpenAI GPT-4o to choose the optimal method and generate correct parameters
- ⚡ Automatic Execution: Calls the appropriate endpoint and returns formatted results

What you can ask:
- 📈 "Show me Apple stock performance this week"
- 📊 "Calculate RSI for Bitcoin with 14-day period"
- 💰 "Get Tesla's financial ratios and balance sheet"
- 🌍 "Compare EUR/USD exchange rates over 6 months"
- 🏦 "Find top-performing tech ETFs"

Supported data categories:
- Market data & quotes
- Technical indicators (100+)
- Fundamental data & financials
- Currencies & crypto
- Mutual funds & ETFs
- Economic calendars & events

One tool, entire Twelve Data ecosystem. No API documentation required.

## Installation

### Using **UV** (recommended)

Run the server directly, without a local installation, using [`uvx`](https://docs.astral.sh/uv/guides/tools/):

```bash
uvx mcp-server-twelve-data --help
```

### Using **pip**

Install the server via pip:

```bash
pip install mcp-server-twelve-data
python -m mcp_server_twelve_data --help
```

## Configuration

### Claude Desktop integration

Add one of the following snippets to your `claude_desktop_config.json`:

(1) Local stdio server configured with u-tool
```json
{
  "mcpServers": {
    "twelvedata": {
      "command": "uvx",
      "args": ["mcp-server-twelve-data@latest", "-k", "YOUR_TWELVE_DATA_API_KEY", "-u", "YOUR_OPENAI_API_KEY"]
    }
  }
}
```

(2) Local stdio server with only the 10 most popular endpoints
```json
{
  "mcpServers": {
    "twelvedata": {
      "command": "uvx",
      "args": ["mcp-server-twelve-data@latest", "-k", "YOUR_TWELVE_DATA_API_KEY", "-n", "10"]
    }
  }
}
```

(3) Twelve Data remote MCP server

```json
{
  "mcpServers": {
    "twelvedata-remote": {
      "command": "npx",
      "args":    [
        "mcp-remote", "https://mcp.twelvedata.com/mcp",
        "--header",
        "Authorization:${AUTH_HEADER}",
        "--header",
        "X-OpenAPI-Key:${OPENAI_API_KEY}"
      ],
      "env": {
        "AUTH_HEADER": "apikey YOUR_TWELVE_DATA_API_KEY",
        "OPENAI_API_KEY": "YOUR_OPENAI_API_KEY"
      }
    }
  }
}
```

See how easy it is to connect Claude Desktop to Twelve Data MCP Server:

![Example usage with Claude Desktop](./example.gif)

### VS Code integration

#### Automatic setup (with UV)

[![Install with UV in VS Code](https://img.shields.io/badge/VS_Code-UV-0098FF?style=flat-square&logo=visualstudiocode&logoColor=white)](https://insiders.vscode.dev/redirect/mcp/install?name=twelvedata&config=%7B%22command%22%3A%22uvx%22%2C%22args%22%3A%5B%22mcp-server-twelve-data%22%2C%22-k%22%2C%22YOUR_TWELVE_DATA_API_KEY%22%2C%22-u%22%2C%22YOUR_OPENAI_API_KEY%22%5D%7D)

#### Manual setup

For manual configuration, add to your **User Settings (JSON)**:

```json
{
  "mcp": {
    "servers": {
      "twelvedata": {
          "command": "uvx",
          "args": [
            "mcp-server-twelve-data",
            "-k", "YOUR_TWELVE_DATA_API_KEY",
            "-u", "YOUR_OPENAI_API_KEY"
          ]
        }
    }
  }
}
```

## Debugging

Use the MCP Inspector for troubleshooting:

```bash
npx @modelcontextprotocol/inspector uvx mcp-server-twelve-data@latest -k YOUR_TWELVE_DATA_API_KEY
```

## Development guide

1. **Local testing:** Use the MCP Inspector as described in **Debugging**.
2. **Claude Desktop:** Update `claude_desktop_config.json` to reference local source paths.

## Docker usage

Build and run the server using Docker:

```bash
docker build -t mcp-server-twelve-data .

docker run --rm mcp-server-twelve-data \
  -k YOUR_TWELVE_DATA_API_KEY \
  -u YOUR_OPENAI_API_KEY \
  -t streamable-http
```

## License

This MCP server is licensed under the MIT License. See the [LICENSE](./LICENSE) file for details.

```

--------------------------------------------------------------------------------
/extra/instructions.txt:
--------------------------------------------------------------------------------

```

```

--------------------------------------------------------------------------------
/test/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/src/mcp_server_twelve_data/__main__.py:
--------------------------------------------------------------------------------

```python
from mcp_server_twelve_data import main

main()

```

--------------------------------------------------------------------------------
/config/mcpConfigStdio.json:
--------------------------------------------------------------------------------

```json
{
  "mcpServers": {
    "mcpServerDev": {
      "command": "uvx",
      "args": [
        "mcp-server-twelve-data"
      ],
      "workingDir": ".",
      "env": {

      }
    }
  }
}
```

--------------------------------------------------------------------------------
/docker-compose.yml:
--------------------------------------------------------------------------------

```yaml
version: '3.9'

services:
  mcp-server-twelve-data:
    build: .
    container_name: mcp-server-twelve-data
    restart: unless-stopped
    ports:
      - "8000:8000"

    command: ["-k", "demo", "-t", "streamable-http"]

networks:
  backend:
```

--------------------------------------------------------------------------------
/src/mcp_server_twelve_data/doc_tool_response.py:
--------------------------------------------------------------------------------

```python
from typing import Optional, List, Callable, Awaitable
from mcp.server.fastmcp import Context
from pydantic import BaseModel


class DocToolResponse(BaseModel):
    query: str
    top_candidates: Optional[List[str]] = None
    result: Optional[str] = None
    error: Optional[str] = None


doctool_func_type = Callable[[str, Context], Awaitable[DocToolResponse]]

```

--------------------------------------------------------------------------------
/extra/commands.txt:
--------------------------------------------------------------------------------

```
{"method":"initialize","params":{"protocolVersion":"2025-03-26","capabilities":{"sampling":{},"roots":{"listChanged":true}},"clientInfo":{"name":"mcp-inspector","version":"0.13.0"}},"jsonrpc":"2.0","id":0}
{"method":"notifications/initialized","jsonrpc":"2.0"}
{"method":"tools/list","params":{"_meta":{"progressToken":1}},"jsonrpc":"2.0","id":1}
{"method":"tools/call","params":{"name":"add","arguments":{"a":1,"b":5},"_meta":{"progressToken":2}},"jsonrpc":"2.0","id":2}

```

--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------

```dockerfile
# Use official Python 3.13 runtime
FROM python:3.13-slim

# Set working directory
WORKDIR /app

# Install pip and UV for dependency management
RUN pip install --upgrade pip uv

# Copy project metadata and README for build context
COPY pyproject.toml uv.lock* README.md LICENSE ./

# Copy source code in src directory
COPY src ./src

# Install project dependencies and build the package using UV
RUN uv pip install . --system

# Run the installed MCP server module
ENTRYPOINT ["python", "-m", "mcp_server_twelve_data"]
CMD ["-k", "demo", "-t", "streamable-http"]

```

--------------------------------------------------------------------------------
/scripts/generate_response_models.py:
--------------------------------------------------------------------------------

```python
import subprocess
from pathlib import Path

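# Resolve paths relative to this script so it can be run from any directory.
SCRIPT_DIR = Path(__file__).resolve().parent
openapi_path = SCRIPT_DIR.parent / 'extra' / 'openapi_clean.json'
output_path = SCRIPT_DIR.parent / 'data' / 'response_models.py'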

cmd = [
    'datamodel-codegen',
    '--input', str(openapi_path),
    '--input-file-type', 'openapi',
    '--output', str(output_path),
    '--output-model-type', 'pydantic_v2.BaseModel',
    '--reuse-model',
    '--use-title-as-name',
    '--disable-timestamp',
    '--field-constraints',
    '--use-double-quotes',
]

subprocess.run(cmd, check=True)

# Append aliases
alias_lines = [
    '',
    '# Aliases for response models',
    'GetMarketMovers200Response = MarketMoversResponseBody',
    'GetTimeSeriesPercent_B200Response = GetTimeSeriesPercentB200Response',
    ''
]

with open(output_path, 'a', encoding='utf-8') as f:
    f.write('\n'.join(alias_lines))

print(f"[SUCCESS] Models generated using CLI and aliases added: {output_path}")

```

--------------------------------------------------------------------------------
/scripts/select_embedding.py:
--------------------------------------------------------------------------------

```python
import os
from dotenv import load_dotenv
from lancedb import connect
from openai import OpenAI
import numpy as np
from numpy.linalg import norm

load_dotenv('../.env')
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

query = "Show me tax information for AAPL."
query_vector = np.array(
    client.embeddings.create(input=query, model="text-embedding-3-large").data[0].embedding
)

db = connect("../src/mcp_server_twelve_data/resources/endpoints.lancedb")
tbl = db.open_table("endpoints")
df = tbl.to_pandas()

tax_vector = np.array(df.query("id == 'GetTaxInfo'").iloc[0]["vector"])
balance_vector = np.array(df.query("id == 'GetBalanceSheetConsolidated'").iloc[0]["vector"])


def cosine_similarity(a, b):
    return np.dot(a, b) / (norm(a) * norm(b))


print(f"GetTaxInfo: {cosine_similarity(query_vector, tax_vector):.4f}")
print(f"GetBalanceSheetConsolidated: {cosine_similarity(query_vector, balance_vector):.4f}")

```
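
The comparison in `select_embedding.py` rests on cosine similarity, which measures the angle between two vectors and ignores their length. A dependency-free sketch of the same formula follows, using toy two-dimensional vectors instead of real embeddings. The candidate names are made up for illustration.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Return dot(a, b) / (|a| * |b|).

    Raises ValueError on a dimension mismatch or when either vector is zero.
    """
    if len(a) != len(b):
        raise ValueError("vectors must have the same dimension")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        raise ValueError("cosine similarity is undefined for a zero vector")
    return dot / (norm_a * norm_b)


query = [1.0, 0.0]
candidates = {
    "same_direction": [2.0, 0.0],  # longer, but points the same way
    "orthogonal": [0.0, 3.0],
    "opposite": [-1.0, 0.0],
}
scores = {name: cosine_similarity(query, vec) for name, vec in candidates.items()}
best = max(scores, key=scores.get)
print(best, scores)
```

Because the score depends only on direction, `same_direction` scores highest even though its vector is twice as long as the query.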

--------------------------------------------------------------------------------
/src/mcp_server_twelve_data/u_tool_response.py:
--------------------------------------------------------------------------------

```python
from typing import Optional, List, Any, Callable, Awaitable
from pydantic import BaseModel, Field

from mcp.server.fastmcp import Context


class UToolResponse(BaseModel):
    """Response object returned by the u-tool."""

    top_candidates: Optional[List[str]] = Field(
        default=None, description="List of tool operationIds considered by the vector search."
    )
    premium_only_candidates: Optional[List[str]] = Field(
        default=None, description="Relevant tool IDs available only in higher-tier plans"
    )
    selected_tool: Optional[str] = Field(
        default=None, description="Name (operationId) of the tool selected by the LLM."
    )
    param: Optional[dict] = Field(
        default=None, description="Parameters passed to the selected tool."
    )
    response: Optional[Any] = Field(
        default=None, description="Result returned by the selected tool."
    )
    error: Optional[str] = Field(
        default=None, description="Error message, if tool resolution or execution fails."
    )


utool_func_type = Callable[[str, Context, Optional[str], Optional[str]], Awaitable[UToolResponse]]

```

--------------------------------------------------------------------------------
/test/common.py:
--------------------------------------------------------------------------------

```python
import asyncio
import os
import signal
import sys

import pytest_asyncio
from dotenv import load_dotenv

sys.unraisablehook = lambda unraisable: None

dotenv_path = os.path.join(os.path.dirname(__file__), '..', '.env')
load_dotenv(dotenv_path)

SERVER_URL = os.environ["SERVER_URL"]
MCP_URL = SERVER_URL + '/mcp/'
TD_API_KEY = os.environ["TWELVE_DATA_API_KEY"]
OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]


@pytest_asyncio.fixture(scope="function")
async def run_server():
    proc = await asyncio.create_subprocess_exec(
        "python", "-m", "mcp_server_twelve_data",
        "-t", "streamable-http",
        "-k", TD_API_KEY,
        "-u", OPENAI_API_KEY,
        stdout=asyncio.subprocess.DEVNULL,
        stderr=asyncio.subprocess.DEVNULL,
    )

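    import httpx

    # Poll the health endpoint once per second, for up to 40 attempts.
    for _ in range(40):
        try:
            async with httpx.AsyncClient() as client:
                r = await client.get(f"{SERVER_URL}/health")
                if r.status_code == 200:
                    break
        except Exception:
            pass  # the server is not accepting connections yet
        # Wait before retrying, whether the request failed or returned non-200.
        await asyncio.sleep(1)
    else:
        proc.terminate()
        raise RuntimeError("Server did not start")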

    yield
    proc.send_signal(signal.SIGINT)
    await proc.wait()

```
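
The readiness polling in `run_server` can be factored into a reusable helper. The following is a minimal sketch under assumptions: `wait_until_ready`, `demo`, and `flaky_probe` are hypothetical names that do not exist in the repository, and the probe is simulated so that no server or HTTP client is needed. The helper sleeps after every unsuccessful attempt, whether the probe raised or returned `False`.

```python
import asyncio
from typing import Awaitable, Callable


async def wait_until_ready(
    probe: Callable[[], Awaitable[bool]],
    attempts: int = 40,
    delay: float = 1.0,
) -> bool:
    """Call `probe` until it returns True; treat exceptions as 'not ready yet'."""
    for _ in range(attempts):
        try:
            if await probe():
                return True
        except Exception:
            pass  # e.g. connection refused while the server is still starting
        await asyncio.sleep(delay)
    return False


async def demo() -> tuple[bool, int]:
    calls = 0

    async def flaky_probe() -> bool:
        # Simulate a server that starts listening on the third attempt.
        nonlocal calls
        calls += 1
        if calls < 3:
            raise ConnectionError("not listening yet")
        return True

    ready = await wait_until_ready(flaky_probe, attempts=5, delay=0.01)
    return ready, calls


ready, calls = asyncio.run(demo())
print(ready, calls)
```

Returning a boolean instead of raising leaves the decision about what to do on timeout, such as terminating the subprocess, to the caller.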

--------------------------------------------------------------------------------
/src/mcp_server_twelve_data/prompts.py:
--------------------------------------------------------------------------------

```python

utool_doc_string = """
A universal tool router for the MCP system, designed for the Twelve Data API.

This tool accepts a natural language query in English and performs the following:
1. Uses vector search to retrieve the top-N relevant Twelve Data endpoints.
2. Sends the query and tool descriptions to OpenAI's gpt-4o with function calling.
3. The model selects the most appropriate tool and generates the input parameters.
4. The selected endpoint (tool) is executed and its response is returned.

Supported endpoint categories (from Twelve Data docs):
- Market & Reference: price, quote, symbol_search, stocks, exchanges, market_state
- Time Series: time_series, eod, splits, dividends, etc.
- Technical Indicators: rsi, macd, ema, bbands, atr, vwap, and 100+ others
- Fundamentals & Reports: earnings, earnings_estimate, income_statement,
  balance_sheet, cash_flow, statistics, profile, ipo_calendar, analyst_ratings
- Currency & Crypto: currency_conversion, exchange_rate, price_target
- Mutual Funds / ETFs: funds, mutual_funds/type, mutual_funds/world
- Misc Utilities: logo, calendar endpoints, time_series_calendar, etc.
"""

doctool_doc_string = """
Search Twelve Data documentation and return a Markdown summary of the most relevant sections.
"""
```

--------------------------------------------------------------------------------
/test/test_doc_tool.py:
--------------------------------------------------------------------------------

```python
import json

import pytest
from mcp.client.streamable_http import streamablehttp_client
from mcp import ClientSession

from test.common import TD_API_KEY, OPENAI_API_KEY, MCP_URL, run_server


@pytest.mark.asyncio
@pytest.mark.parametrize("query, expected_title_keyword", [
    ("what does the macd indicator do?", "MACD"),
    ("how to fetch time series data?", "Time Series"),
    ("supported intervals for time_series?", "interval"),
])
async def test_doc_tool_async(query, expected_title_keyword, run_server):
    headers = {
        "Authorization": f"apikey {TD_API_KEY}",
        "x-openapi-key": OPENAI_API_KEY
    }

    async with streamablehttp_client(MCP_URL, headers=headers) as (read_stream, write_stream, _):
        async with ClientSession(read_stream, write_stream) as session:
            await session.initialize()
            call_result = await session.call_tool("doc-tool", arguments={"query": query})
        await read_stream.aclose()
        await write_stream.aclose()

    assert not call_result.isError, f"doc-tool error: {call_result.content}"
    raw = call_result.content[0].text
    payload = json.loads(raw)

    assert payload["error"] is None
    assert payload["result"] is not None
    assert expected_title_keyword.lower() in payload["result"].lower(), (
        f"Expected '{expected_title_keyword}' in result Markdown:\n{payload['result']}"
    )
    assert len(payload["top_candidates"]) > 0

```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[project]
name = "mcp-server-twelve-data"
version = "0.2.4"
description = "A Model Context Protocol server providing tools to access Twelve Data."
readme = "README.md"
requires-python = ">=3.13"
authors = [{ name = "Twelve Data, PBC." }]
maintainers = [{ name = "Kopyev Eugene", email = "[email protected]" }]
keywords = ["twelve", "data", "mcp", "llm", "automation"]
license = { text = "MIT" }
classifiers = [
    "Development Status :: 4 - Beta",
    "Intended Audience :: Developers",
    "License :: OSI Approved :: MIT License",
    "Programming Language :: Python :: 3",
    "Programming Language :: Python :: 3.13",
]
dependencies = [
    "click==8.2.1",
    "mcp[cli]>=1.9.4",
    "openai>=1.86.0",
    "pydantic==2.11.5",
    "pylint>=3.3.7",
    "pyyml>=0.0.2",
]

[project.scripts]
mcp-server-twelve-data = "mcp_server_twelve_data:main"

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.uv]
dev-dependencies = [
    "pyright>=1.1.389",
    "ruff>=0.7.3",
    "pytest>=8.0.0",
    "datamodel-code-generator>=0.31.2",
    "pytest-asyncio>=1.0.0",
    "bs4>=0.0.2",
]

[project.optional-dependencies]
db = [
    "lancedb>=0.23.0",
    "pandas>=2.3.1"
]

[tool.pytest.ini_options]
testpaths = ["test"]
python_files = "test_*.py"
python_classes = "Test*"
python_functions = "test_*"
asyncio_default_fixture_loop_scope = "function"
addopts = "-s"
log_cli = true
log_cli_level = "INFO"


[tool.hatch.build]
exclude = [
    "src/resources/*",
    "example.gif"
]

```

--------------------------------------------------------------------------------
/test/test_top_n_filter.py:
--------------------------------------------------------------------------------

```python
import json

import pytest
from mcp.client.streamable_http import streamablehttp_client
from mcp import ClientSession

from test.common import TD_API_KEY, MCP_URL, run_server
from test.endpoint_pairs import pairs


@pytest.mark.asyncio
@pytest.mark.parametrize("user_query,expected_op_id", pairs)
async def test_embedding_and_utool_async(user_query, expected_op_id, run_server):
    headers = {"Authorization": f"apikey {TD_API_KEY}"}

    async with streamablehttp_client(MCP_URL, headers=headers) as (read_stream, write_stream, _):
        async with ClientSession(read_stream, write_stream) as session:
            await session.initialize()
            call_result = await session.call_tool("u-tool", arguments={"query": user_query})
        await read_stream.aclose()
        await write_stream.aclose()

    assert not call_result.isError, f"u-tool error: {call_result.content}"
    raw = call_result.content[0].text
    payload = json.loads(raw)
    top_cands = payload.get("top_candidates", [])
    error = payload.get("error")
    selected_tool = payload.get("selected_tool")
    response = payload.get("response")
    assert expected_op_id in top_cands, f"{expected_op_id!r} not in {top_cands!r}"
    assert error is None, f"u-tool error: {error}"
    assert selected_tool == expected_op_id, (
        f"selected_tool {payload.get('selected_tool')!r} != expected {expected_op_id!r}"
    )
    assert response is not None
    if "GetTimeSeries" in selected_tool:
        values = response['values']
        assert values is not None and len(values) > 0

```

--------------------------------------------------------------------------------
/test/test_user_plan.py:
--------------------------------------------------------------------------------

```python
import json

import pytest
from mcp.client.streamable_http import streamablehttp_client
from mcp import ClientSession

from test.common import TD_API_KEY, MCP_URL, run_server


@pytest.mark.asyncio
@pytest.mark.parametrize("user_query, expected_operation_id", [
    ("Show me market movers", "GetMarketMovers"),
    ("Show me earnings estimates for AAPL", "GetEarningsEstimate"),
    ("Show me price targets for TSLA", "GetPriceTarget"),
])
async def test_utool_basic_plan_restrictions(user_query, expected_operation_id, run_server):
    """
    Users on the Basic plan must not be routed to endpoints that require a higher plan.
    The restricted operationId must be listed in `premium_only_candidates`,
    must not be the selected tool, and no error should be reported.
    """
    headers = {"Authorization": f"apikey {TD_API_KEY}"}
    user_plan = "Basic"

    async with streamablehttp_client(MCP_URL, headers=headers) as (read_stream, write_stream, _):
        async with ClientSession(read_stream, write_stream) as session:
            await session.initialize()
            result = await session.call_tool("u-tool", arguments={
                "query": user_query,
                "plan": user_plan
            })
        await read_stream.aclose()
        await write_stream.aclose()

    assert not result.isError, f"u-tool error: {result.content}"
    payload = json.loads(result.content[0].text)
    error = payload.get("error")
    selected_tool = payload.get("selected_tool")
    premium_only_candidates = payload.get("premium_only_candidates")
    assert expected_operation_id in premium_only_candidates
    assert selected_tool != expected_operation_id
    assert error is None

```
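
The behaviour this test checks, where restricted endpoints are reported in `premium_only_candidates` rather than selected, can be pictured as a partition of the candidate list by plan. The sketch below is hypothetical throughout: `PLAN_RANK`, `MIN_PLAN`, and `split_by_plan` are invented names, the plan names other than `Basic` are placeholders, and the per-endpoint minimum plans are made up rather than taken from Twelve Data.

```python
# Hypothetical plan ordering; only "Basic" appears in the test above.
PLAN_RANK = {"Basic": 0, "Grow": 1, "Pro": 2}

# Hypothetical minimum plan per operationId (illustrative values only).
MIN_PLAN = {
    "GetTimeSeries": "Basic",
    "GetEarningsEstimate": "Grow",
    "GetMarketMovers": "Pro",
}


def split_by_plan(candidates: list[str], plan: str) -> tuple[list[str], list[str]]:
    """Return (allowed, premium_only) for the given plan, preserving order."""
    rank = PLAN_RANK[plan]
    allowed: list[str] = []
    premium_only: list[str] = []
    for op_id in candidates:
        # Endpoints without an entry are assumed to be available on every plan.
        required = MIN_PLAN.get(op_id, "Basic")
        if PLAN_RANK[required] <= rank:
            allowed.append(op_id)
        else:
            premium_only.append(op_id)
    return allowed, premium_only


allowed, premium_only = split_by_plan(
    ["GetMarketMovers", "GetTimeSeries", "GetEarningsEstimate"], "Basic"
)
print(allowed, premium_only)
```

With a split like this, the selection step would only see `allowed`, which is consistent with the test's expectation that the restricted operationId is never the selected tool.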

--------------------------------------------------------------------------------
/scripts/patch_vector_in_embeddings.py:
--------------------------------------------------------------------------------

```python
import json
import lancedb
import openai
from pathlib import Path


def patch_one_vector(
    operation_id: str,
    db_path: str = "../src/mcp_server_twelve_data/resources/endpoints.lancedb",
    table_name: str = "endpoints",
    desc_path: str = "../extra/full_descriptions.json",
    verbose: bool = True
):
    desc_file = Path(desc_path)
    if not desc_file.exists():
        raise FileNotFoundError(f"{desc_path} not found")

    with desc_file.open("r", encoding="utf-8") as f:
        full_descriptions = json.load(f)

    if operation_id not in full_descriptions:
        raise ValueError(f"No description found for operation_id '{operation_id}'")

    new_description = full_descriptions[operation_id]

    embedding = openai.OpenAI().embeddings.create(
        model="text-embedding-3-small",
        input=[new_description]
    ).data[0].embedding

    db = lancedb.connect(db_path)
    table = db.open_table(table_name)

    matches = table.to_arrow().to_pylist()
    record = next((row for row in matches if row["id"] == operation_id), None)

    if not record:
        raise ValueError(f"operation_id '{operation_id}' not found in LanceDB")

    if verbose:
        print(f"Updating vector for operation_id: {operation_id}")
        print(f"Old description:\n{record['description']}\n")
        print(f"New description:\n{new_description}\n")

    table.update(
        where=f"id == '{operation_id}'",
        values={
            "description": new_description,
            "vector": embedding
        }
    )

    if verbose:
        print("Update complete.")


if __name__ == "__main__":
    patch_one_vector("GetETFsList")

```

--------------------------------------------------------------------------------
/src/mcp_server_twelve_data/doc_tool_remote.py:
--------------------------------------------------------------------------------

```python
from typing import Optional

import httpx
from mcp.server.fastmcp import FastMCP, Context

from mcp_server_twelve_data.common import mcp_server_base_url
from mcp_server_twelve_data.doc_tool_response import doctool_func_type, DocToolResponse
from mcp_server_twelve_data.key_provider import extract_open_ai_apikey, extract_twelve_data_apikey
from mcp_server_twelve_data.prompts import doctool_doc_string


def register_doc_tool_remote(
    server: FastMCP,
    transport: str,
    open_ai_api_key_from_args: Optional[str],
    twelve_data_apikey: Optional[str],
) -> doctool_func_type:

    @server.tool(name="doc-tool")
    async def doc_tool(
        query: str,
        ctx: Context,
    ) -> DocToolResponse:
        o_ai_api_key_to_use, error = extract_open_ai_apikey(
            transport=transport,
            open_ai_api_key=open_ai_api_key_from_args,
            ctx=ctx,
        )
        if error is not None:
            return DocToolResponse(query=query, error=error)

        td_key_to_use = extract_twelve_data_apikey(
            transport=transport,
            twelve_data_apikey=twelve_data_apikey,
            ctx=ctx,
        )

        async with httpx.AsyncClient(
            trust_env=False,
            headers={
                "accept": "application/json",
                "user-agent": "python-httpx/0.24.0",
                "x-openapi-key": o_ai_api_key_to_use,
                "Authorization": f'apikey {td_key_to_use}',
            },
            timeout=30,
        ) as client:
            resp = await client.get(
                f"{mcp_server_base_url}/doctool",
                params={
                    "query": query,
                }
            )
            resp.raise_for_status()
            resp_json = resp.json()
            return DocToolResponse.model_validate(resp_json)

    # Attach the doc-tool description as the function's docstring.
    doc_tool.__doc__ = doctool_doc_string
    return doc_tool

```

--------------------------------------------------------------------------------
/scripts/generate.md:
--------------------------------------------------------------------------------

```markdown
## Update utool

Follow these steps to update the `utool` tool and regenerate all dependent files and embeddings.

---

### 1. Copy the New OpenAPI Spec

Copy the updated OpenAPI specification to:

```
extra/openapi_clean.json
```

---

### 2. Update Endpoint Embeddings

If **new methods were added**, you must regenerate all embeddings:

```bash
python scripts/generate_endpoints_embeddings.py
```

This will generate two files:

- `full_descriptions.json`
- `endpoints.lancedb`

> ⚠️ This process is time-consuming.

If you only need to **update one endpoint**, modify or insert the updated description in:

```
extra/full_descriptions.json
```

Then run:

```bash
python scripts/patch_vector_in_embeddings.py
```

---

### 3. Generate Request Models

Run:

```bash
python scripts/generate_requests_models.py
```

This will create:

```
data/requests_models.py
```

---

### 4. Generate Response Models

Run:

```bash
python scripts/generate_response_models.py
```

This will create:

```
data/response_models.py
```

---

### 5. Generate Tools

Run:

```bash
python scripts/generate_tools.py
```

This will create:

```
data/tools.py
```

---

### 6. Update Existing Files

Replace the existing versions with the newly generated files:

- `requests_models.py`
- `response_models.py`
- `tools.py`

---

### 7. Run Tests

Run the following to verify vector search functionality:

```bash
python -m pytest test/test_top_n_filter.py
```

---

### 8. Fix and Extend Tests

- Fix any failing tests
- Add new test cases for newly added endpoints

---

## Update doctool

To update the `doctool` vector database:

---

### 1. Generate Documentation Embeddings

Run:

```bash
python scripts/generate_docs_embeddings.py
```

This will create:

```
docs.lancedb
```

---

### 2. Update Existing Files

Replace any relevant files with the updated ones produced by the script.

---

### 3. Run doctool Tests

Run the following to verify `doctool` embeddings:

```bash
python -m pytest test/test_doc_tool.py
```

---
```
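
The generation steps of the `utool` update procedure run in a fixed order, so they can be listed programmatically before being executed. The sketch below only builds and prints the commands and does not run anything. `UTOOL_SCRIPTS` and `plan_commands` are hypothetical names, the script file names are taken from the repository's `scripts/` directory listing, and the copy, replace, and test steps of the guide remain manual.

```python
from pathlib import PurePosixPath

# Generation scripts in the order given by the update guide.
UTOOL_SCRIPTS = [
    "generate_endpoints_embeddings.py",
    "generate_requests_models.py",
    "generate_response_models.py",
    "generate_tools.py",
]


def plan_commands(scripts_dir: str = "scripts", python: str = "python") -> list[list[str]]:
    """Return one argv list per generation step, without executing anything."""
    base = PurePosixPath(scripts_dir)
    return [[python, str(base / name)] for name in UTOOL_SCRIPTS]


for command in plan_commands():
    print(" ".join(command))
```

Each returned list could be passed to `subprocess.run(command, check=True)` to stop at the first failing step.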

--------------------------------------------------------------------------------
/src/mcp_server_twelve_data/u_tool_remote.py:
--------------------------------------------------------------------------------

```python
from typing import Optional

import httpx
from mcp.server.fastmcp import FastMCP, Context

from mcp_server_twelve_data.common import mcp_server_base_url
from mcp_server_twelve_data.key_provider import extract_open_ai_apikey, extract_twelve_data_apikey
from mcp_server_twelve_data.prompts import utool_doc_string
from mcp_server_twelve_data.u_tool_response import UToolResponse, utool_func_type


def register_u_tool_remote(
    server: FastMCP,
    transport: str,
    open_ai_api_key_from_args: Optional[str],
    twelve_data_apikey: Optional[str],
) -> utool_func_type:

    @server.tool(name="u-tool")
    async def u_tool(
        query: str,
        ctx: Context,
        format: Optional[str] = None,
        plan: Optional[str] = None,
    ) -> UToolResponse:
        o_ai_api_key_to_use, error = extract_open_ai_apikey(
            transport=transport,
            open_ai_api_key=open_ai_api_key_from_args,
            ctx=ctx,
        )
        if error is not None:
            return UToolResponse(error=error)

        td_key_to_use = extract_twelve_data_apikey(
            transport=transport,
            twelve_data_apikey=twelve_data_apikey,
            ctx=ctx,
        )

        async with httpx.AsyncClient(
            trust_env=False,
            headers={
                "accept": "application/json",
                "user-agent": "python-httpx/0.24.0",
                "x-openapi-key": o_ai_api_key_to_use,
                "Authorization": f'apikey {td_key_to_use}',
            },
            timeout=30,
        ) as client:
            resp = await client.get(
                f"{mcp_server_base_url}/utool",
                params={
                    "query": query,
                    "format": format,
                    "plan": plan,
                }
            )
            resp.raise_for_status()
            resp_json = resp.json()
            return UToolResponse.model_validate(resp_json)

    u_tool.__doc__ = utool_doc_string
    return u_tool

```
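
`format` and `plan` are optional, so they are often `None` when the request to `/utool` is built. A minimal sketch of filtering unset options out before the call, so that they are left off the query string rather than depending on how the HTTP client encodes `None`. `drop_none` is a hypothetical helper, not part of the package.

```python
from typing import Any, Mapping, Optional


def drop_none(params: Mapping[str, Optional[Any]]) -> dict[str, Any]:
    """Return a copy of `params` without the entries whose value is None."""
    return {key: value for key, value in params.items() if value is not None}


query_params = drop_none({"query": "show me RSI for AAPL", "format": None, "plan": "pro"})
print(query_params)
```

The result could then be passed as `params=query_params` to `client.get(...)`.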

--------------------------------------------------------------------------------
/scripts/generate_docs_embeddings.py:
--------------------------------------------------------------------------------

```python
import os
import uuid
import httpx
import openai
import pandas as pd
import lancedb
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from tqdm import tqdm

# === CONFIG ===
load_dotenv('../.env')

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
DB_PATH = os.getenv("LANCEDB_PATH", '../data/docs.lancedb')
OPENAI_MODEL = 'text-embedding-3-large'
DOCS_URL = 'https://twelvedata.com/docs'

client = openai.OpenAI(api_key=OPENAI_API_KEY)


def download_docs(url: str) -> str:
    print(f"Downloading documentation from: {url}")
    response = httpx.get(url, timeout=10)
    response.raise_for_status()
    print("HTML download complete.")
    return response.text


def parse_sections(html: str) -> list[dict]:
    soup = BeautifulSoup(html, "html.parser")
    sections = soup.select("section[id]")

    records = []
    for idx, section in enumerate(sections, start=1):
        section_id = section["id"]
        title_el = section.find("h2") or section.find("h3") or section.find("h1")
        title = title_el.get_text(strip=True) if title_el else section_id
        content = section.get_text(separator="\n", strip=True)
        print(f"[{idx}/{len(sections)}] Parsed section: {title}")
        records.append({
            "id": str(uuid.uuid4()),
            "section_id": section_id,
            "title": title,
            "content": content
        })
    return records


def generate_embedding(text: str) -> list[float]:
    response = client.embeddings.create(
        model=OPENAI_MODEL,
        input=[text]
    )
    return response.data[0].embedding


def build_lancedb(records: list[dict], db_path: str):
    df = pd.DataFrame(records)

    print(f"Generating embeddings for {len(df)} sections...")
    vectors = []
    for content in tqdm(df["content"], desc="Embedding"):
        vectors.append(generate_embedding(content))
    df["vector"] = vectors

    db = lancedb.connect(db_path)
    db.create_table("docs", data=df, mode="overwrite")

    print(f"Saved {len(df)} sections to LanceDB at: {db_path}")
    print("Section titles:")
    for title in df["title"]:
        print(f" - {title}")


def main():
    print("Step 1: Downloading HTML")
    html = download_docs(DOCS_URL)

    print("Step 2: Parsing sections")
    records = parse_sections(html)

    print("Step 3: Building LanceDB")
    build_lancedb(records, DB_PATH)


if __name__ == '__main__':
    main()

```
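
`build_lancedb` requests one embedding per section, passing a one-element list as `input`. Since `input` is a list, several sections could be sent per request. The sketch below covers only the chunking part, assuming a hypothetical `chunked` helper and an arbitrary batch size; the API call itself is left as a comment because it needs a real key.

```python
from typing import Iterator, Sequence, TypeVar

T = TypeVar("T")


def chunked(items: Sequence[T], size: int) -> Iterator[Sequence[T]]:
    """Yield consecutive slices of `items` with at most `size` elements each."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(items), size):
        yield items[start:start + size]


contents = ["section a", "section b", "section c", "section d", "section e"]
for batch in chunked(contents, size=2):
    # In the script, this is where client.embeddings.create(model=..., input=list(batch))
    # would be called, reading one embedding per input from response.data.
    print(len(batch), list(batch))
```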

--------------------------------------------------------------------------------
/src/mcp_server_twelve_data/__init__.py:
--------------------------------------------------------------------------------

```python
from typing import Literal, Optional

import click
import logging
import sys

from dotenv import load_dotenv

from .server import serve


@click.command()
@click.option("-v", "--verbose", count=True)
@click.option("-t", "--transport", default="stdio", help="stdio, streamable-http")
@click.option(
    "-k",
    "--twelve-data-apikey",
    default=None,
    help=(
        "This parameter is required for 'stdio' transport. "
        "For 'streamable-http', you have three options: "
        "1. Use the -k option to set a predefined API key. "
        "2. Use the -ua option to retrieve the API key from the Twelve Data server. "
        "3. Provide the API key in the 'Authorization' header as: 'apikey <your-apikey>'."
    )
)
@click.option(
    "-n", "--number-of-tools", default=35,
    help="Limit the number of tools to avoid problems with MCP clients; the maximum is 193 and the default is 35."
)
@click.option(
    "-u", "--u-tool-open-ai-api-key", default=None,
    help=(
        "If set, activates a unified 'u-tool' powered by OpenAI "
        "to select and call the appropriate Twelve Data endpoint."
    ),
)
@click.option(
    "-ua", "--u-tool-oauth2", default=False, is_flag=True,
    help=(
        "If set, activates the unified 'u-tool' powered by OpenAI, "
        "and fetches Twelve Data and OpenAI API keys directly from the Twelve Data server."
    )
)
def main(
    verbose: int,  # click passes a count because the option uses count=True
    transport: Literal["stdio", "sse", "streamable-http"] = "stdio",
    twelve_data_apikey: Optional[str] = None,
    number_of_tools: int = 35,
    u_tool_open_ai_api_key: Optional[str] = None,
    u_tool_oauth2: bool = False,
) -> None:
    load_dotenv()
    logging_level = logging.WARN
    if verbose == 1:
        logging_level = logging.INFO
    elif verbose >= 2:
        logging_level = logging.DEBUG

    logging.basicConfig(level=logging_level, stream=sys.stderr)

    # Reject inconsistent option combinations before starting the server.
    if u_tool_oauth2 and u_tool_open_ai_api_key is not None:
        raise RuntimeError("Set either u_tool_open_ai_api_key or u_tool_oauth2")
    if u_tool_oauth2 and transport != "streamable-http":
        raise RuntimeError("Set transport to streamable-http if you want to use -ua option")
    if transport == "stdio" and twelve_data_apikey is None:
        raise RuntimeError("Set -k to use stdio transport")

    serve(
        api_base="https://api.twelvedata.com",
        transport=transport,
        twelve_data_apikey=twelve_data_apikey,
        number_of_tools=number_of_tools,
        u_tool_open_ai_api_key=u_tool_open_ai_api_key,
        u_tool_oauth2=u_tool_oauth2,
    )


if __name__ == "__main__":
    main()

```
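
The option handling in `main` boils down to two small decisions: mapping the `-v` count to a logging level and checking that the transport and key options are consistent. A minimal sketch of both as pure functions, which makes them easy to test without starting a server. The names `logging_level_for` and `option_problems` are hypothetical, and the messages are paraphrased.

```python
import logging
from typing import Optional


def logging_level_for(verbose: int) -> int:
    """Map the -v count to a logging level: 0 -> WARNING, 1 -> INFO, 2+ -> DEBUG."""
    if verbose >= 2:
        return logging.DEBUG
    if verbose == 1:
        return logging.INFO
    return logging.WARNING


def option_problems(
    transport: str,
    twelve_data_apikey: Optional[str],
    open_ai_api_key: Optional[str],
    oauth2: bool,
) -> list[str]:
    """Return a message for every inconsistent option combination."""
    problems = []
    if oauth2 and open_ai_api_key is not None:
        problems.append("Set either -u or -ua, not both")
    if oauth2 and transport != "streamable-http":
        problems.append("-ua requires the streamable-http transport")
    if transport == "stdio" and twelve_data_apikey is None:
        problems.append("-k is required for the stdio transport")
    return problems


print(option_problems("stdio", None, None, oauth2=True))
```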

--------------------------------------------------------------------------------
/extra/endpoints_spec_en.csv:
--------------------------------------------------------------------------------

```
Path,Default,Number
/time_series,1,2939
/price,1,949
/quote,1,768
/rsi,1,355
/macd,1,193
/stocks,1,193
/bbands,1,182
/exchange_rate,1,175
/ema,1,173
/statistics,1,146
/profile,1,128
/market_state,1,116
/symbol_search,1,104
/eod,1,96
/sma,1,94
/forex_pairs,1,59
/atr,1,57
/dividends,1,54
/cryptocurrencies,1,48
/earnings,1,43
/currency_conversion,1,41
/exchanges,1,35
/splits,1,32
/earliest_timestamp,1,31
/etfs,1,22
/commodities,1,15
/funds,1,14
/ipo_calendar,1,8
/cryptocurrency_exchanges,1,5
/time_series/cross,1,4
/cross_listings,1,3
/technical_indicators,0,247
/api_usage,0,106
/logo,0,80
/adx,0,52
/vwap,0,42
/stoch,0,41
/income_statement,0,39
/cash_flow,0,34
/balance_sheet,0,32
/recommendations,0,29
/obv,0,25
/cci,0,23
/price_target,0,22
/mfi,0,21
/ma,0,18
/splits_calendar,0,17
/supertrend,0,17
/earnings_calendar,0,16
/willr,0,16
/dividends_calendar,0,15
/earnings_estimate,0,12
/heikinashicandles,0,11
/analyst_ratings/us_equities,0,10
/ichimoku,0,10
/percent_b,0,10
/sar,0,10
/analyst_ratings/light,0,9
/revenue_estimate,0,9
/insider_transactions,0,9
/plus_di,0,9
/institutional_holders,0,8
/market_cap,0,8
/minus_di,0,8
/etfs/list,0,8
/bonds,0,8
/ad,0,8
/growth_estimates,0,7
/mutual_funds/list,0,7
/key_executives,0,7
/stochrsi,0,7
/eps_trend,0,7
/income_statement/consolidated,0,6
/exchange_schedule,0,6
/eps_revisions,0,6
/pivot_points_hl,0,6
/etfs/world,0,6
/fund_holders,0,6
/avg,0,6
/mutual_funds/world,0,5
/etfs/world/summary,0,5
/etfs/family,0,5
/avgprice,0,5
/countries,0,5
/rvol,0,5
/adxr,0,5
/wma,0,5
/mom,0,5
/beta,0,5
/crsi,0,5
/adosc,0,5
/roc,0,5
/mutual_funds/world/performance,0,4
/balance_sheet/consolidated,0,4
/mutual_funds/world/ratings,0,4
/mutual_funds/world/summary,0,4
/etfs/type,0,4
/direct_holders,0,4
/linearreg,0,4
/tema,0,4
/keltner,0,4
/kst,0,4
/mutual_funds/world/composition,0,3
/mutual_funds/world/purchase_info,0,3
/etfs/world/performance,0,3
/edgar_filings/archive,0,3
/ht_trendmode,0,3
/midprice,0,3
/instrument_type,0,3
/trima,0,3
/dema,0,3
/bop,0,3
/mama,0,3
/ppo,0,3
/mutual_funds/world/sustainability,0,2
/etfs/world/composition,0,2
/mutual_funds/world/risk,0,2
/mutual_funds/type,0,2
/mutual_funds/family,0,2
/cash_flow/consolidated,0,2
/wclprice,0,2
/ht_dcperiod,0,2
/medprice,0,2
/typprice,0,2
/ht_trendline,0,2
/linearregangle,0,2
/minus_dm,0,2
/ht_dcphase,0,2
/midpoint,0,2
/ht_phasor,0,2
/ultosc,0,2
/kama,0,2
/trange,0,2
/apo,0,2
/aroon,0,2
/plus_dm,0,2
/dx,0,2
/stddev,0,2
/macdext,0,2
/natr,0,2
/cmo,0,2
/correl,0,2
/max,0,2
/stochf,0,2
/hlc3,0,2

```

--------------------------------------------------------------------------------
/src/mcp_server_twelve_data/key_provider.py:
--------------------------------------------------------------------------------

```python
from typing import Optional

from mcp.server.fastmcp import Context
from mcp.client.streamable_http import RequestContext


def extract_open_ai_apikey(
    transport: str,
    open_ai_api_key: Optional[str],
    ctx: Context,
) -> tuple[Optional[str], Optional[str]]:
    """Returns optional key and optional error"""
    if transport == 'stdio':
        if open_ai_api_key is not None:
            return (open_ai_api_key, None)
        else:
            # This case should not be reachable.
            error = (
                "Transport is stdio and u_tool_open_ai_api_key is None. "
                "Something went wrong. Please contact support."
            )
            return None, error
    elif transport == "streamable-http":
        if open_ai_api_key is not None:
            return open_ai_api_key, None
        else:
            rc: RequestContext = ctx.request_context
            token_from_rc = get_tokens_from_rc(rc=rc)
            if token_from_rc.error is not None:
                return None, token_from_rc.error
            elif token_from_rc.twelve_data_api_key and token_from_rc.open_ai_api_key:
                o_ai_api_key_to_use = token_from_rc.open_ai_api_key
                return o_ai_api_key_to_use, None
            else:
                return None, "Either the OpenAI API key or the Twelve Data API key was not provided."
    else:
        return None, "This transport is not supported"


def extract_twelve_data_apikey(
    transport: str,
    twelve_data_apikey: Optional[str],
    ctx: Context,
) -> Optional[str]:
    if transport in {'stdio', 'streamable-http'} and twelve_data_apikey:
        return twelve_data_apikey
    else:
        rc: RequestContext = ctx.request_context
        tokens = get_tokens_from_rc(rc=rc)
        return tokens.twelve_data_api_key


class ToolTokens:
    def __init__(
        self,
        twelve_data_api_key: Optional[str] = None,
        open_ai_api_key: Optional[str] = None,
        error: Optional[str] = None,
    ):
        self.twelve_data_api_key = twelve_data_api_key
        self.open_ai_api_key = open_ai_api_key
        self.error = error


def get_tokens_from_rc(rc: RequestContext) -> ToolTokens:
    if hasattr(rc, "headers"):
        headers = rc.headers
    elif hasattr(rc, "request"):
        headers = rc.request.headers
    else:
        return ToolTokens(error="Headers were not found in a request context.")
    auth_header = headers.get("authorization")
    split = auth_header.split(" ") if auth_header else []
    if len(split) == 2:
        access_token = split[1]
        openai_key = headers.get("x-openapi-key")
        return ToolTokens(
            twelve_data_api_key=access_token,
            open_ai_api_key=openai_key,
        )
    return ToolTokens(error=f"Bad or missing authorization header: {auth_header}")
```
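
`get_tokens_from_rc` expects an `Authorization` header of the form `apikey <your-apikey>` and takes the second token as the Twelve Data key. A minimal standalone sketch of that parsing, assuming header names have already been lower-cased. `parse_apikey_header` is a hypothetical helper, and unlike the function above it also checks the `apikey` scheme.

```python
from typing import Mapping, Optional


def parse_apikey_header(headers: Mapping[str, str]) -> Optional[str]:
    """Extract the key from an 'authorization: apikey <key>' header, or return None."""
    value = headers.get("authorization")
    if not value:
        return None
    parts = value.split(" ")
    if len(parts) != 2:
        return None
    scheme, key = parts
    # Extra strictness compared with the package code: require the 'apikey' scheme.
    if scheme.lower() != "apikey" or not key:
        return None
    return key


print(parse_apikey_header({"authorization": "apikey demo-key"}))
```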

--------------------------------------------------------------------------------
/scripts/check_embedings.py:
--------------------------------------------------------------------------------

```python
import os
import lancedb
import openai

# Constants
EMBEDDING_MODEL = "text-embedding-3-small"

def generate_embedding(text: str) -> list[float]:
    client = openai.OpenAI()
    response = client.embeddings.create(
        model=EMBEDDING_MODEL,
        input=[text]
    )
    return response.data[0].embedding

def evaluate_queries(query_target_pairs: list[tuple[str, str]]):
    db_path = os.getenv('LANCEDB_PATH', '../extra/endpoints.lancedb')
    db = lancedb.connect(db_path)
    table = db.open_table("endpoints")

    results_summary = []

    for user_query, target_endpoint in query_target_pairs:
        query_vec = generate_embedding(user_query)
        results = table.search(query_vec).metric("cosine").limit(30).to_list()

        top_endpoint = results[0]['path'] if results else None
        target_position = next((i for i, r in enumerate(results) if r['path'] == target_endpoint), None)

        print(f"Query: {user_query}")
        print(f"Target endpoint: {target_endpoint}")
        print(f"Top 1 endpoint: {top_endpoint}")
        print(f"Target position in top 30: {target_position}\n")

        results_summary.append((user_query, target_endpoint, top_endpoint, target_position))

    return results_summary


def main():
    query_target_pairs = [
        ("Show me intraday stock prices for Tesla (TSLA) with 1-minute intervals for the past 3 hours.", "/time_series"),
        ("What is the current exchange rate between USD and EUR?", "/price"),
        ("Get the RSI indicator for Apple (AAPL) over the last 14 days.", "/rsi"),
        ("When did Amazon last split its stock?", "/splits"),
        ("Give me daily closing prices for Bitcoin in the past 6 months.", "/time_series"),
        ("Show the MACD for Microsoft.", "/macd"),
        ("Get Google earnings reports for the last year.", "/earnings"),
        ("Fetch dividend history for Johnson & Johnson.", "/dividends"),
        ("Give me fundamentals for Netflix including P/E ratio.", "/fundamentals"),
        ("What is the latest stock quote for Nvidia?", "/quote"),
        ("Retrieve the Bollinger Bands for Apple.", "/bbands"),
        ("What is the VWAP for Tesla?", "/vwap"),
        ("Get ATR indicator for Amazon.", "/atr"),
        ("What is the stochastic oscillator for MSFT?", "/stoch"),
        ("Show me the EMA for S&P 500.", "/ema"),
        ("Retrieve the ADX indicator for crude oil.", "/adx"),
        ("Get the OBV for Bitcoin.", "/obv"),
        ("What is the highest stock price of Apple in the last 30 days?", "/max"),
        ("Give me the minimum price for TSLA in January 2024.", "/min"),
        ("Get the ROC indicator for Ethereum.", "/roc"),
    ]

    results = evaluate_queries(query_target_pairs)

    print("\nSummary:")
    for row in results:
        print(row)


if __name__ == "__main__":
    main()

```
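
`evaluate_queries` returns one `(query, target, top_endpoint, target_position)` tuple per query, where the position is a zero-based index or `None`. A minimal sketch of turning those rows into aggregate numbers; `summarize` and the sample rows are hypothetical and not produced by a real run.

```python
from typing import Optional

Row = tuple[str, str, Optional[str], Optional[int]]


def summarize(rows: list[Row]) -> dict[str, float]:
    """Compute top-1 accuracy, hit rate, and mean reciprocal rank for the rows."""
    total = len(rows)
    if total == 0:
        return {"top1_accuracy": 0.0, "hit_rate": 0.0, "mrr": 0.0}
    top1 = sum(1 for _, target, top, _ in rows if top == target)
    hits = sum(1 for *_, position in rows if position is not None)
    # Positions are zero-based, so rank = position + 1.
    reciprocal = sum(1.0 / (position + 1) for *_, position in rows if position is not None)
    return {
        "top1_accuracy": top1 / total,
        "hit_rate": hits / total,
        "mrr": reciprocal / total,
    }


sample_rows: list[Row] = [
    ("query one", "/rsi", "/rsi", 0),
    ("query two", "/price", "/quote", 1),
    ("query three", "/min", "/max", None),
]
print(summarize(sample_rows))
```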

--------------------------------------------------------------------------------
/scripts/generate_tools.py:
--------------------------------------------------------------------------------

```python
import json
import csv
from pathlib import Path

OPENAPI_PATH = "../extra/openapi_clean.json"
ENDPOINTS_PATH = "../extra/endpoints_spec_en.csv"
OUTPUT_PATH = "../data/tools_autogen.py"


def load_csv_paths(path):
    with open(path, newline='', encoding='utf-8') as f:
        return [row[0] for i, row in enumerate(csv.reader(f)) if i > 0 and row]


def load_openapi_spec(path):
    with open(path, encoding='utf-8') as f:
        return json.load(f)


def collect_operations(paths, spec):
    ops = []
    seen = set()
    for path in paths:
        path_item = spec.get("paths", {}).get(path)
        if not path_item:
            continue
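        for method, details in path_item.items():
            # Skip entries that are not operation objects, such as a path-level "parameters" list.
            if not isinstance(details, dict):
                continue
            op_id = details.get("operationId")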
            if not op_id or op_id in seen:
                continue
            seen.add(op_id)
            desc = details.get("description", "").strip().replace('"', '\\"').replace('\n', ' ')
            ops.append((op_id, desc, path.lstrip('/')))
    return ops


def generate_code(ops):
    def fix_case(name: str) -> str:
        return name[0].upper() + name[1:] if name.lower().startswith("advanced") else name

    lines = [
        'from mcp.server import FastMCP',
        'from mcp.server.fastmcp import Context',
        ''
    ]

    # Import request models
    for op, _, _ in ops:
        lines.append(f'from .request_models import {fix_case(op)}Request')
    lines.append('')

    # Import response models
    for op, _, _ in ops:
        lines.append(f'from .response_models import {fix_case(op)}200Response')
    lines.append('')

    # Register tools
    lines.append('def register_all_tools(server: FastMCP, _call_endpoint):')
    for op, desc, key in ops:
        fixed_op = fix_case(op)
        lines += [
            f'    @server.tool(name="{op}",',
            f'                 description="{desc}")',
            f'    async def {op}(params: {fixed_op}Request, ctx: Context) -> {fixed_op}200Response:',
            f'        return await _call_endpoint("{key}", params, {fixed_op}200Response, ctx)',
            ''
        ]
    return '\n'.join(lines)


def main():
    spec = load_openapi_spec(OPENAPI_PATH)
    csv_paths = load_csv_paths(ENDPOINTS_PATH)
    all_spec_paths = list(spec.get("paths", {}).keys())
    extra_paths = sorted(set(all_spec_paths) - set(csv_paths))
    final_paths = csv_paths + extra_paths

    ops = collect_operations(final_paths, spec)
    total = len(ops)
    from_csv = len([op for op in ops if '/' + op[2] in csv_paths])
    from_extra = total - from_csv

    print(f"[INFO] Loaded {len(csv_paths)} paths from CSV.")
    print(f"[INFO] Found {len(all_spec_paths)} paths in OpenAPI spec.")
    print(f"[INFO] Added {from_extra} additional paths not listed in CSV.")
    print(f"[INFO] Generated {total} tools in total.")

    code = '# AUTOGENERATED FILE - DO NOT EDIT MANUALLY\n\n' + generate_code(ops)
    Path(OUTPUT_PATH).write_text(code, encoding='utf-8')
    print(f"[SUCCESS] File written to: {OUTPUT_PATH}")


if __name__ == '__main__':
    main()

```
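
To see what `generate_code` emits for a single operation, the template can be reproduced in isolation. A reduced sketch, assuming an operation whose ID needs no case fix; `render_tool` is a hypothetical helper and the description string is illustrative.

```python
def render_tool(op_id: str, description: str, endpoint_key: str) -> str:
    """Render the registration stub for one operation, following the generator's template."""
    lines = [
        f'    @server.tool(name="{op_id}",',
        f'                 description="{description}")',
        f'    async def {op_id}(params: {op_id}Request, ctx: Context) -> {op_id}200Response:',
        f'        return await _call_endpoint("{endpoint_key}", params, {op_id}200Response, ctx)',
    ]
    return "\n".join(lines)


stub = render_tool("GetTimeSeries", "Illustrative description", "time_series")
print(stub)
```

The printed stub is indented because the generator places it inside `register_all_tools`.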

--------------------------------------------------------------------------------
/src/mcp_server_twelve_data/common.py:
--------------------------------------------------------------------------------

```python
import os
import importlib.util
from pathlib import Path
from typing import Optional, List, Tuple
from starlette.requests import Request
from mcp.client.streamable_http import RequestContext


mcp_server_base_url = "https://mcp.twelvedata.com"
spec = importlib.util.find_spec("mcp_server_twelve_data")
MODULE_PATH = Path(spec.origin).resolve()
PACKAGE_ROOT = MODULE_PATH.parent  # src/mcp_server_twelve_data

LANCE_DB_ENDPOINTS_PATH = os.environ.get(
    "LANCE_DB_ENDPOINTS_PATH",
    str(PACKAGE_ROOT / ".." / "resources" / "endpoints.lancedb")
)

LANCE_DB_DOCS_PATH = os.environ.get(
    "LANCE_DB_DOCS_PATH",
    str(PACKAGE_ROOT / ".." / "resources" / "docs.lancedb")
)


def vector_db_exists():
    return os.path.isdir(LANCE_DB_ENDPOINTS_PATH)


def create_dummy_request_context(request: Request) -> RequestContext:
    return RequestContext(
        client=object(),
        headers=dict(request.headers),
        session_id="generated-session-id",
        session_message=object(),
        metadata=object(),
        read_stream_writer=object(),
        sse_read_timeout=10.0,
    )


class ToolPlanMap:
    def __init__(self, df):
        self.df = df
        self.plan_to_int = {
            'basic': 0,
            'grow': 1,
            'pro': 2,
            'ultra': 3,
            'enterprise': 4,
        }

    def split(self, user_plan: Optional[str], tool_operation_ids: List[str]) -> Tuple[List[str], List[str]]:
        if user_plan is None:
            # if user plan param was not specified, then we have no restrictions for function calling
            return tool_operation_ids, []
        user_plan_key = user_plan.lower()
        user_plan_int = self.plan_to_int.get(user_plan_key)
        if user_plan_int is None:
            raise ValueError(f"Wrong user_plan: '{user_plan}'")

        tools_df = self.df[self.df["id"].isin(tool_operation_ids)]

        candidates = []
        premium_only_candidates = []

        for _, row in tools_df.iterrows():
            tool_id = row["id"]
            tool_plan_raw = row["x-starting-plan"]
            if tool_plan_raw is None:
                tool_plan_raw = 'basic'

            tool_plan_key = tool_plan_raw.lower()
            tool_plan_int = self.plan_to_int.get(tool_plan_key)
            if tool_plan_int is None:
                raise ValueError(f"Wrong tool_starting_plan: '{tool_plan_key}'")

            if user_plan_int >= tool_plan_int:
                candidates.append(tool_id)
            else:
                premium_only_candidates.append(tool_id)

        return candidates, premium_only_candidates


def build_openai_tools_subset(tool_list):
    def expand_parameters(params):
        if (
            "properties" in params and
            "params" in params["properties"] and
            "$ref" in params["properties"]["params"] and
            "$defs" in params
        ):
            ref_path = params["properties"]["params"]["$ref"]
            ref_name = ref_path.split("/")[-1]
            schema = params["$defs"].get(ref_name, {})
            return {
                "type": "object",
                "properties": {
                    "params": {
                        "type": "object",
                        "properties": schema.get("properties", {}),
                        "required": schema.get("required", []),
                        "description": schema.get("description", "")
                    }
                },
                "required": ["params"]
            }
        else:
            return params

    tools = []
    for tool in tool_list:
        expanded_parameters = expand_parameters(tool.parameters)
        tools.append({
            "type": "function",
            "function": {
                "name": tool.name,
                "description": tool.description or "No description provided.",
                "parameters": expanded_parameters
            }
        })
    # [t for t in tools if t["function"]["name"] in ["GetTimeSeriesAdd", "GetTimeSeriesAd"]]
    return tools

```
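
`ToolPlanMap.split` compares the user's plan tier with each tool's starting plan and separates the tools the user can call from the premium-only ones. A minimal sketch of the same comparison without a DataFrame; `split_by_plan` is a hypothetical helper, and the tool IDs and starting plans in the example are made up.

```python
from typing import Optional

PLAN_RANK = {"basic": 0, "grow": 1, "pro": 2, "ultra": 3, "enterprise": 4}


def split_by_plan(
    user_plan: Optional[str],
    tool_plans: dict[str, Optional[str]],
) -> tuple[list[str], list[str]]:
    """Return (available, premium_only) tool IDs for the given user plan."""
    if user_plan is None:
        # No plan given: nothing is restricted.
        return list(tool_plans), []
    user_rank = PLAN_RANK.get(user_plan.lower())
    if user_rank is None:
        raise ValueError(f"Wrong user_plan: '{user_plan}'")
    available, premium_only = [], []
    for tool_id, starting_plan in tool_plans.items():
        # A missing starting plan is treated as 'basic'.
        tool_rank = PLAN_RANK.get((starting_plan or "basic").lower())
        if tool_rank is None:
            raise ValueError(f"Wrong tool_starting_plan: '{starting_plan}'")
        (available if user_rank >= tool_rank else premium_only).append(tool_id)
    return available, premium_only


example_plans = {"ToolA": None, "ToolB": "basic", "ToolC": "pro", "ToolD": "ultra"}
print(split_by_plan("pro", example_plans))
```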

--------------------------------------------------------------------------------
/test/test_mcp_main.py:
--------------------------------------------------------------------------------

```python
import json
import os
import signal

import httpx
import pytest
import asyncio
import urllib.parse


import pytest_asyncio
from dotenv import load_dotenv
from mcp import stdio_client, ClientSession, StdioServerParameters

dotenv_path = os.path.join(os.path.dirname(__file__), '..', '.env')
load_dotenv(dotenv_path)
server_url = os.environ['SERVER_URL']
td_api_key = os.environ['TWELVE_DATA_API_KEY']
OPENAI_API_KEY = os.environ['OPENAI_API_KEY']


@pytest_asyncio.fixture
def run_server_factory():
    async def _start_server(*args):
        proc = await asyncio.create_subprocess_exec(
            "python", "-m", "mcp_server_twelve_data",
            *args,
            # stdout=asyncio.subprocess.DEVNULL,
            # stderr=asyncio.subprocess.DEVNULL,
        )

        # healthcheck
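        for _ in range(30):
            try:
                async with httpx.AsyncClient() as client:
                    r = await client.get(f"{server_url}/health")
                    if r.status_code == 200:
                        break
            except Exception:
                # The server may not be accepting connections yet.
                pass
            # Wait before the next attempt, whether the request failed or returned non-200.
            await asyncio.sleep(1)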
        else:
            proc.terminate()
            raise RuntimeError("Server did not start")

        async def stop():
            proc.send_signal(signal.SIGINT)
            await proc.wait()

        return stop

    return _start_server


@pytest.mark.asyncio
async def test_call_utool(run_server_factory):
    stop_server = await run_server_factory(
        "-t", "streamable-http",
        "-k", td_api_key,
        "-u", OPENAI_API_KEY,
    )
    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"{server_url}/utool?query={urllib.parse.quote('show me RSI for AAPL')}",
                timeout=30,
            )
        assert response.status_code == 200
        data = response.json()
        response = data.get("response")
        assert response
        assert "values" in response
        assert len(response["values"]) > 0
    finally:
        await stop_server()


@pytest.mark.asyncio
async def test_call_utool_both_keys_in_header(run_server_factory):
    stop_server = await run_server_factory(
        "-t", "streamable-http", "-ua"
    )

    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"{server_url}/utool?query={urllib.parse.quote('show me RSI for AAPL')}",
                timeout=30,
                headers={
                    'Authorization': f'apikey {td_api_key}',
                    'X-OpenAPI-Key': OPENAI_API_KEY,
                }
            )
        assert response.status_code == 200
        data = response.json()
        response = data.get("response")
        assert response
        assert "values" in response
        assert len(response["values"]) > 0
    finally:
        await stop_server()


@pytest.mark.asyncio
async def test_call_utool_stdio():
    server_params = StdioServerParameters(
        command="python",
        args=[
            "-m", "mcp_server_twelve_data",
            "-t", "stdio",
            "-k", td_api_key,
            "-u", OPENAI_API_KEY
        ],
    )

    async with stdio_client(server_params) as (reader, writer):
        async with ClientSession(reader, writer) as session:
            await session.initialize()
            result = await session.call_tool("u-tool", arguments={"query": "show me RSI for AAPL"})
            data = json.loads(result.content[0].text)
            response = data.get("response")
            assert response
            assert "values" in response
            assert len(response["values"]) > 0


@pytest.mark.asyncio
async def test_call_time_series_stdio():
    server_params = StdioServerParameters(
        command="python",
        args=[
            "-m", "mcp_server_twelve_data",
            "-t", "stdio",
            "-k", td_api_key,
        ],
    )

    async with stdio_client(server_params) as (reader, writer):
        async with ClientSession(reader, writer) as session:
            await session.initialize()
            arguments = {
                "params": {
                    "symbol": "AAPL",
                    "interval": "1day",
                }
            }

            result = await session.call_tool("GetTimeSeries", arguments=arguments)
            data = json.loads(result.content[0].text)

            assert "values" in data
            assert len(data["values"]) > 0

```

--------------------------------------------------------------------------------
/src/mcp_server_twelve_data/doc_tool.py:
--------------------------------------------------------------------------------

```python
from typing import Optional, Literal, cast

import openai
from openai.types.chat import ChatCompletionSystemMessageParam
from starlette.requests import Request
from starlette.responses import JSONResponse

from mcp.server.fastmcp import FastMCP, Context
from mcp_server_twelve_data.common import (
    create_dummy_request_context, LANCE_DB_DOCS_PATH,
)
from mcp_server_twelve_data.doc_tool_response import DocToolResponse, doctool_func_type
from mcp_server_twelve_data.key_provider import extract_open_ai_apikey
from mcp_server_twelve_data.prompts import doctool_doc_string


def register_doc_tool(
    server: FastMCP,
    open_ai_api_key_from_args: Optional[str],
    transport: Literal["stdio", "sse", "streamable-http"],
) -> doctool_func_type:
    embedding_model = "text-embedding-3-large"
    llm_model = "gpt-4.1-mini"
    # llm_model = "gpt-4o-mini"
    # llm_model = "gpt-4.1-nano"

    db_path = LANCE_DB_DOCS_PATH
    top_k = 15

    import lancedb
    db = lancedb.connect(db_path)
    table = db.open_table("docs")

    @server.tool(name="doc-tool")
    async def doc_tool(query: str, ctx: Context) -> DocToolResponse:
        openai_key, error = extract_open_ai_apikey(
            transport=transport,
            open_ai_api_key=open_ai_api_key_from_args,
            ctx=ctx,
        )
        if error is not None:
            return DocToolResponse(query=query, error=error)

        client = openai.OpenAI(api_key=openai_key)

        try:
            embedding = client.embeddings.create(
                model=embedding_model,
                input=[query],
            ).data[0].embedding

            results = table.search(embedding).metric("cosine").limit(top_k).to_list()
            matches = [r["title"] for r in results]
            combined_text = "\n\n---\n\n".join([r["content"] for r in results])

        except Exception as e:
            return DocToolResponse(query=query, top_candidates=[], error=f"Vector search failed: {e}")

        try:
            prompt = (
                "You are a documentation assistant. Given a user query and relevant documentation sections, "
                "generate a helpful, accurate, and Markdown-formatted answer.\n\n"
                "Use:\n"
                "- Headings\n"
                "- Bullet points\n"
                "- Short paragraphs\n"
                "- Code blocks if applicable\n\n"
                "Do not repeat the full documentation — summarize only what's relevant to the query.\n\n"
                "If the user asks how to perform an action "
                "(e.g., 'how to get', 'ways to retrieve', 'methods for', etc.), "
                "and there are multiple suitable API endpoints, provide "
                "a list of the most relevant ones with a brief description of each.\n"
                "Highlight when to use which endpoint and what kind of data they return."
            )

            llm_response = client.chat.completions.create(
                model=llm_model,
                messages=[
                    cast(ChatCompletionSystemMessageParam, {"role": "system", "content": prompt}),
                    cast(ChatCompletionSystemMessageParam, {"role": "user", "content": f"User query:\n{query}"}),
                    cast(ChatCompletionSystemMessageParam,
                         {"role": "user", "content": f"Documentation:\n{combined_text}"}),
                ],
                temperature=0.2,
            )

            # The message content can be None, so fall back to an empty string.
            markdown = (llm_response.choices[0].message.content or "").strip()
            return DocToolResponse(
                query=query,
                top_candidates=matches,
                result=markdown,
            )

        except Exception as e:
            return DocToolResponse(query=query, top_candidates=matches, error=f"LLM summarization failed: {e}")

    doc_tool.__doc__ = doctool_doc_string
    return doc_tool


def register_http_doctool(
    transport: str,
    server: FastMCP,
    doc_tool,
):
    if transport == "streamable-http":
        @server.custom_route("/doctool", ["GET"])
        async def doc_tool_http(request: Request):
            query = request.query_params.get("query")
            if not query:
                return JSONResponse({"error": "Missing 'query' query parameter"}, status_code=400)

            ctx = Context(request_context=create_dummy_request_context(request))
            result = await doc_tool(query=query, ctx=ctx)
            return JSONResponse(content=result.model_dump())

```

--------------------------------------------------------------------------------
/src/mcp_server_twelve_data/server.py:
--------------------------------------------------------------------------------

```python
from typing import Type, TypeVar, Literal, Optional
import httpx
from pydantic import BaseModel
from mcp.server.fastmcp import FastMCP, Context
from starlette.exceptions import HTTPException
from starlette.requests import Request
from starlette.responses import JSONResponse
import re

from .common import vector_db_exists
from .doc_tool import register_doc_tool, register_http_doctool
from .doc_tool_remote import register_doc_tool_remote
from .key_provider import extract_twelve_data_apikey
from .tools import register_all_tools
from .u_tool import register_u_tool, register_http_utool
from .u_tool_remote import register_u_tool_remote


def serve(
    api_base: str,
    transport: Literal["stdio", "sse", "streamable-http"],
    twelve_data_apikey: Optional[str],
    number_of_tools: int,
    u_tool_open_ai_api_key: Optional[str],
    u_tool_oauth2: bool
) -> None:
    server = FastMCP(
        "mcp-twelve-data",
        host="0.0.0.0",
        port=8000,
    )

    P = TypeVar('P', bound=BaseModel)
    R = TypeVar('R', bound=BaseModel)

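    def resolve_path_params(endpoint: str, params_dict: dict) -> str:
        # Substitutes {name} placeholders in the endpoint path. Each used key is popped
        # from params_dict, so it is not sent again as a query parameter.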
        def replacer(match):
            key = match.group(1)
            if key not in params_dict:
                raise ValueError(f"Missing path parameter: {key}")
            return str(params_dict.pop(key))
        return re.sub(r"{(\w+)}", replacer, endpoint)

    async def _call_endpoint(
        endpoint: str,
        params: P,
        response_model: Type[R],
        ctx: Context
    ) -> R:
        params.apikey = extract_twelve_data_apikey(
            twelve_data_apikey=twelve_data_apikey,
            transport=transport,
            ctx=ctx
        )

        params_dict = params.model_dump(exclude_none=True)
        resolved_endpoint = resolve_path_params(endpoint, params_dict)

        async with httpx.AsyncClient(
            trust_env=False,
            headers={
                "accept": "application/json",
                "user-agent": "python-httpx/0.24.0"
            },
        ) as client:
            resp = await client.get(
                f"{api_base}/{resolved_endpoint}",
                params=params_dict
            )
            resp.raise_for_status()
            resp_json = resp.json()

            if isinstance(resp_json, dict):
                status = resp_json.get("status")
                if status == "error":
                    code = resp_json.get('code')
                    raise HTTPException(
                        status_code=code,
                        detail=f"Failed to perform request,"
                               f" code = {code}, message = {resp_json.get('message')}"
                    )

            return response_model.model_validate(resp_json)

    if u_tool_oauth2 or u_tool_open_ai_api_key is not None:
        # The large vector DB is not published; without it the server runs in remote mode.
        if vector_db_exists():
            register_all_tools(server=server, _call_endpoint=_call_endpoint)
            u_tool = register_u_tool(
                server=server,
                open_ai_api_key_from_args=u_tool_open_ai_api_key,
                transport=transport
            )
            doc_tool = register_doc_tool(
                server=server,
                open_ai_api_key_from_args=u_tool_open_ai_api_key,
                transport=transport
            )
        else:
            u_tool = register_u_tool_remote(
                server=server,
                twelve_data_apikey=twelve_data_apikey,
                open_ai_api_key_from_args=u_tool_open_ai_api_key,
                transport=transport,
            )
            doc_tool = register_doc_tool_remote(
                server=server,
                twelve_data_apikey=twelve_data_apikey,
                open_ai_api_key_from_args=u_tool_open_ai_api_key,
                transport=transport,
            )
        register_http_utool(
            transport=transport,
            u_tool=u_tool,
            server=server,
        )
        register_http_doctool(
            transport=transport,
            server=server,
            doc_tool=doc_tool,
        )

    else:
        register_all_tools(server=server, _call_endpoint=_call_endpoint)
        all_tools = server._tool_manager._tools
        server._tool_manager._tools = dict(list(all_tools.items())[:number_of_tools])

    @server.custom_route("/health", ["GET"])
    async def health(_: Request):
        return JSONResponse({"status": "ok"})

    server.run(transport=transport)

```
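
The path-parameter substitution inside `serve` can be tried in isolation. The following is a minimal standalone sketch, assuming the same regex and pop-on-use behaviour as `resolve_path_params` above; the sample endpoint string and parameter values are illustrative only.

```python
import re


def resolve_path_params(endpoint: str, params: dict) -> str:
    """Fill {name} placeholders from params, removing each key that is used."""
    def replacer(match: re.Match) -> str:
        key = match.group(1)
        if key not in params:
            raise ValueError(f"Missing path parameter: {key}")
        # pop() keeps the value out of the query string built from params later
        return str(params.pop(key))

    return re.sub(r"{(\w+)}", replacer, endpoint)


# Illustrative values, not taken from a real request
params = {"market": "stocks", "outputsize": 5}
path = resolve_path_params("market_movers/{market}", params)
```

After the call, `params` holds only the keys that were not consumed by the path, which is what gets passed on as query parameters.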

--------------------------------------------------------------------------------
/scripts/generate_endpoints_embeddings.py:
--------------------------------------------------------------------------------

```python
import os
import json
from typing import cast

import yaml
import openai
import lancedb
from dotenv import load_dotenv
from openai.types.chat import ChatCompletionSystemMessageParam, ChatCompletionUserMessageParam


# === CONFIG ===
load_dotenv('../.env')

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
OPENAI_MODEL = "gpt-4o-mini"
OPENAI_MODEL_EMBEDDINGS = "text-embedding-3-large"
spec_path = os.getenv('OPENAPI_SPEC', '../extra/openapi_clean.json')
db_path = os.getenv('LANCEDB_PATH', '../data/endpoints.lancedb')
desc_path = os.getenv('DESC_JSON_PATH', '../extra/full_descriptions.json')


def load_spec(path: str) -> dict:
    with open(path, 'r', encoding='utf-8') as f:
        return yaml.safe_load(f) if path.lower().endswith(('.yaml', '.yml')) else json.load(f)


def extract_endpoints(spec: dict) -> list[dict]:
    paths = spec.get('paths', {})
    components = spec.get('components', {})

    def resolve_ref(obj):
        if isinstance(obj, dict):
            if '$ref' in obj:
                # Remove only the literal "#/" prefix; lstrip('#/') strips a character set instead.
                ref_path = obj['$ref'].removeprefix('#/').split('/')
                resolved = spec
                for part in ref_path:
                    resolved = resolved.get(part, {})
                return resolve_ref(resolved)
            else:
                return {k: resolve_ref(v) for k, v in obj.items()}
        elif isinstance(obj, list):
            return [resolve_ref(item) for item in obj]
        return obj

    endpoints = []
    for path, methods in paths.items():
        for method, op in methods.items():
            if not isinstance(op, dict):
                continue

            parameters = op.get('parameters', [])
            request_body = op.get('requestBody', {})
            responses = []

            for code, raw_resp in op.get('responses', {}).items():
                resolved_resp = resolve_ref(raw_resp)
                content = resolved_resp.get('content', {})
                resolved_content = {}

                for mime_type, mime_obj in content.items():
                    schema = mime_obj.get('schema', {})
                    resolved_schema = resolve_ref(schema)
                    resolved_content[mime_type] = {
                        'schema': resolved_schema
                    }

                responses.append({
                    'code': code,
                    'description': resolved_resp.get('description', ''),
                    'content': resolved_content
                })

            endpoints.append({
                'path': path,
                'method': method.upper(),
                'summary': op.get('summary', ''),
                'description': op.get('description', ''),
                'parameters': parameters,
                'requestBody': request_body,
                'responses': responses,
                'operationId': op.get('operationId', f'{method}_{path}'),
                'x-starting-plan': op.get('x-starting-plan', None),
            })

    return endpoints


def generate_llm_description(info: dict) -> str:
    prompt = (
        "You are an OpenAPI endpoint explainer. Your goal is to produce a clear, concise, and "
        "natural-language explanation of the given API endpoint based on its metadata. "
        "This description will be embedded into a vector space for solving a top-N retrieval task. "
        "Given a user query, the system will compare it semantically to these embeddings to find "
        "the most relevant endpoints. Therefore, the output must reflect both the purpose of the "
        "endpoint and its parameter semantics using natural language.\n\n"
        "Please summarize the endpoint's purpose, its key input parameters and their roles, and "
        "what the endpoint returns. You may include short usage context or constraints to help clarify its behavior. "
        "Do not echo raw JSON. Avoid listing all optional or less relevant fields unless necessary for understanding.\n"
        "Instead of showing URL-style query examples, include two or three natural-language questions "
        "a user might ask that this endpoint could satisfy. These examples will help optimize the embedding "
        "for semantic search over user queries."
    )
    client = openai.OpenAI()
    messages = [
        cast(ChatCompletionSystemMessageParam, {"role": "system", "content": prompt}),
        cast(ChatCompletionUserMessageParam, {"role": "user", "content": json.dumps(info, indent=2)})
    ]

    response = client.chat.completions.create(
        model=OPENAI_MODEL,
        messages=messages,
        temperature=0.3
    )

    return response.choices[0].message.content.strip()


def generate_embedding(text: str) -> list[float]:
    response = openai.OpenAI().embeddings.create(
        model=OPENAI_MODEL_EMBEDDINGS,
        input=[text]
    )
    return response.data[0].embedding


def load_existing_descriptions(path: str) -> dict:
    if os.path.exists(path):
        with open(path, 'r', encoding='utf-8') as f:
            return json.load(f)
    return {}


def save_descriptions(path: str, data: dict):
    with open(path, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=2, ensure_ascii=False)


def main():
    spec = load_spec(spec_path)
    endpoints = extract_endpoints(spec)

    full_descriptions = load_existing_descriptions(desc_path)
    records = []

    for info in endpoints:
        try:
            operation_id = info.get('operationId', f"{info['method']}_{info['path']}")
            if operation_id in full_descriptions:
                description = full_descriptions[operation_id]
            else:
                description = generate_llm_description(info)
                full_descriptions[operation_id] = description
                save_descriptions(desc_path, full_descriptions)  # Save on each iteration

            print(f"\n--- LLM Description for {info['method']} {info['path']} ---\n{description}\n")
            vector = generate_embedding(description)
            records.append({
                'id': operation_id,
                'vector': vector,
                'path': info['path'],
                'method': info['method'],
                'summary': info['summary'],
                'x-starting-plan': info.get('x-starting-plan', None),
            })
        except Exception as e:
            print(f"Error processing {info['method']} {info['path']}: {e}")

    db = lancedb.connect(db_path)
    db.create_table(name='endpoints', data=records, mode='overwrite')

    save_descriptions(desc_path, full_descriptions)
    print(f"Indexed {len(records)} endpoints into '{db_path}' and saved LLM descriptions to '{desc_path}'.")


if __name__ == '__main__':
    main()

```
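
The `$ref` inlining done by `resolve_ref` in `extract_endpoints` can be illustrated on a toy document. This is a sketch under assumptions: `spec` is passed explicitly instead of being captured by a closure, and the tiny spec below is hypothetical. As in the script, a self-referential schema would recurse without bound, so this only suits acyclic references.

```python
def resolve_ref(spec: dict, obj):
    """Recursively replace {'$ref': '#/a/b'} nodes with the object they point to."""
    if isinstance(obj, dict):
        if "$ref" in obj:
            target = spec
            # Walk the JSON-pointer-like path segment by segment
            for part in obj["$ref"].removeprefix("#/").split("/"):
                target = target.get(part, {})
            return resolve_ref(spec, target)
        return {key: resolve_ref(spec, value) for key, value in obj.items()}
    if isinstance(obj, list):
        return [resolve_ref(spec, item) for item in obj]
    return obj


# Hypothetical minimal spec, for illustration only
spec = {
    "components": {
        "schemas": {
            "Price": {"type": "object", "properties": {"price": {"type": "string"}}}
        }
    }
}
schema = {"type": "array", "items": {"$ref": "#/components/schemas/Price"}}
resolved = resolve_ref(spec, schema)
```

An unresolvable reference falls back to an empty dict rather than raising, which matches the `.get(part, {})` behaviour in the script.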

--------------------------------------------------------------------------------
/scripts/split_opnapi_by_groups.py:
--------------------------------------------------------------------------------

```python
import os
import yaml
import json
import re
from collections import defaultdict

input_path = "../data/openapi01.06.2025.json"
output_dir = "../data"

GROUPS = {
    "reference_data": [
        "/stocks",
        "/forex_pairs",
        "/cryptocurrencies",
        "/funds",
        "/bonds",
        "/etfs",
        "/commodities",
        "/cross_listings",
        "/exchanges",
        "/exchange_schedule",
        "/cryptocurrency_exchanges",
        "/market_state",
        "/instrument_type",
        "/countries",
        "/earliest_timestamp",
        "/symbol_search",
        "/intervals"
    ],
    "core_data": [
        "/time_series",
        "/time_series/cross",
        "/exchange_rate",
        "/currency_conversion",
        "/quote",
        "/price",
        "/eod",
        "/market_movers/{market}"
    ],
    "mutual_funds": [
        "/mutual_funds/list",
        "/mutual_funds/family",
        "/mutual_funds/type",
        "/mutual_funds/world",
        "/mutual_funds/world/summary",
        "/mutual_funds/world/performance",
        "/mutual_funds/world/risk",
        "/mutual_funds/world/ratings",
        "/mutual_funds/world/composition",
        "/mutual_funds/world/purchase_info",
        "/mutual_funds/world/sustainability"
    ],
    "etfs": [
        "/etfs/list",
        "/etfs/family",
        "/etfs/type",
        "/etfs/world",
        "/etfs/world/summary",
        "/etfs/world/performance",
        "/etfs/world/risk",
        "/etfs/world/composition"
    ],
    "fundamentals": [
        "/balance_sheet",
        "/balance_sheet/consolidated",
        "/cash_flow",
        "/cash_flow/consolidated",
        "/dividends",
        "/dividends_calendar",
        "/earnings",
        "/income_statement",
        "/income_statement/consolidated",
        "/ipo_calendar",
        "/key_executives",
        "/last_change/{endpoint}",
        "/logo",
        "/market_cap",
        "/profile",
        "/splits",
        "/splits_calendar",
        "/statistics"
    ],
    "analysis": [
        "/analyst_ratings/light",
        "/analyst_ratings/us_equities",
        "/earnings_estimate",
        "/revenue_estimate",
        "/eps_trend",
        "/eps_revisions",
        "/growth_estimates",
        "/price_target",
        "/recommendations",
        "/earnings_calendar"
    ],
    "regulatory": [
        "/tax_info",
        "/edgar_filings/archive",
        "/insider_transactions",
        "/direct_holders",
        "/fund_holders",
        "/institutional_holders",
        "/sanctions/{source}"
    ]
}

mirrors = [
    "https://api-reference-data.twelvedata.com",
    "https://api-time-series.twelvedata.com",
    "https://api-mutual-funds.twelvedata.com",
    "https://api-etfs.twelvedata.com",
    "https://api-fundamental.twelvedata.com",
    "https://api-analysis.twelvedata.com",
    "https://api-regulator.twelvedata.com",
]


def load_spec(path):
    with open(path, 'r', encoding='utf-8') as f:
        if path.lower().endswith(('.yaml', '.yml')):
            return yaml.safe_load(f)
        return json.load(f)


def dump_spec(spec, path):
    with open(path, 'w', encoding='utf-8') as f:
        if path.lower().endswith(('.yaml', '.yml')):
            yaml.safe_dump(spec, f, sort_keys=False, allow_unicode=True)
        else:
            json.dump(spec, f, ensure_ascii=False, indent=2)


def find_refs(obj):
    refs = set()
    if isinstance(obj, dict):
        for k, v in obj.items():
            if k == '$ref' and isinstance(v, str):
                refs.add(v)
            else:
                refs |= find_refs(v)
    elif isinstance(obj, list):
        for item in obj:
            refs |= find_refs(item)
    return refs


def prune_components(full_components, used_refs):
    pattern = re.compile(r'^#/components/([^/]+)/(.+)$')
    used = defaultdict(set)
    for ref in used_refs:
        m = pattern.match(ref)
        if m:
            comp_type, comp_name = m.group(1), m.group(2)
            used[comp_type].add(comp_name)
    changed = True
    while changed:
        changed = False
        for comp_type, names in list(used.items()):
            for name in list(names):
                definition = full_components.get(comp_type, {}).get(name)
                if definition:
                    for r in find_refs(definition):
                        m2 = pattern.match(r)
                        if m2:
                            ct, cn = m2.group(1), m2.group(2)
                            if cn not in used[ct]:
                                used[ct].add(cn)
                                changed = True
    pruned = {}
    for comp_type, defs in full_components.items():
        if comp_type in used:
            kept = {n: defs[n] for n in defs if n in used[comp_type]}
            if kept:
                pruned[comp_type] = kept
    return pruned


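def trim_fields(obj):
    """
    Recursively trims string fields:
    - 'description' to 300 characters
    - 'example' to 700 characters
    """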
    if isinstance(obj, dict):
        for k, v in obj.items():
            if k == "description" and isinstance(v, str):
                if len(v) > 300:
                    obj[k] = v[:300]
            elif k == "example" and isinstance(v, str):
                if len(v) > 700:
                    obj[k] = v[:700]
            else:
                trim_fields(v)
    elif isinstance(obj, list):
        for item in obj:
            trim_fields(item)


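def add_empty_properties(obj):
    """
    Recursively searches for all occurrences of the 'schema' key.
    If its value is a dict without 'properties', adds 'properties': {}.
    """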
    if isinstance(obj, dict):
        for k, v in obj.items():
            if k == "schema" and isinstance(v, dict):
                if "properties" not in v:
                    v["properties"] = {}
                add_empty_properties(v)
            else:
                add_empty_properties(v)
    elif isinstance(obj, list):
        for item in obj:
            add_empty_properties(item)


def filter_paths(all_paths, allowed_list):
    return [path for path in all_paths if path in allowed_list]


def main():
    os.makedirs(output_dir, exist_ok=True)
    spec = load_spec(input_path)
    all_paths = set(spec.get('paths', {}).keys())

    for idx, (group_name, group_paths) in enumerate(GROUPS.items()):
        group_allowed = filter_paths(all_paths, group_paths)
        if not group_allowed:
            continue

        new_spec = {
            'openapi': spec.get('openapi'),
            'info': spec.get('info'),
            'servers': [{'url': mirrors[idx]}] if idx < len(mirrors) else spec.get('servers', []),
            'paths': {k: spec['paths'][k] for k in group_allowed}
        }

        add_empty_properties(new_spec)
        trim_fields(new_spec)
        used_refs = find_refs(new_spec['paths'])
        pruned = prune_components(spec.get('components', {}), used_refs)
        if pruned:
            new_spec['components'] = pruned

        base_name, ext = os.path.splitext(os.path.basename(input_path))
        out_file = os.path.join(output_dir, f"{base_name}_{group_name}{ext}")
        dump_spec(new_spec, out_file)
        print(f"{group_name}: {len(new_spec['paths'])} paths -> {out_file}")


if __name__ == "__main__":
    main()

```
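
`prune_components` keeps every component that is reachable from the selected paths, including components referenced only by other components. The sketch below shows the same reachability idea with a worklist instead of the script's repeat-until-stable loop; that swap is deliberate and is not how the script is written. The function name `reachable_components` and the sample components are hypothetical.

```python
import re
from collections import defaultdict

REF_PATTERN = re.compile(r"^#/components/([^/]+)/(.+)$")


def find_refs(obj) -> set:
    """Collect every '$ref' string found anywhere inside obj."""
    refs = set()
    if isinstance(obj, dict):
        for key, value in obj.items():
            if key == "$ref" and isinstance(value, str):
                refs.add(value)
            else:
                refs |= find_refs(value)
    elif isinstance(obj, list):
        for item in obj:
            refs |= find_refs(item)
    return refs


def reachable_components(components: dict, start_refs: set) -> dict:
    """Return {component_type: sorted names} reachable from start_refs."""
    used = defaultdict(set)
    queue = list(start_refs)
    while queue:
        match = REF_PATTERN.match(queue.pop())
        if not match:
            continue
        comp_type, name = match.groups()
        if name in used[comp_type]:
            continue  # already visited, so reference cycles terminate
        used[comp_type].add(name)
        definition = components.get(comp_type, {}).get(name, {})
        queue.extend(find_refs(definition))
    return {comp_type: sorted(names) for comp_type, names in used.items()}


# Hypothetical components: A references B, C is unused
components = {
    "schemas": {
        "A": {"properties": {"b": {"$ref": "#/components/schemas/B"}}},
        "B": {"type": "string"},
        "C": {"type": "integer"},
    }
}
kept = reachable_components(components, {"#/components/schemas/A"})
```

Here `C` is dropped because nothing reachable from `A` points to it, which is the effect the pruning step aims for when it shrinks each per-group spec.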

--------------------------------------------------------------------------------
/src/mcp_server_twelve_data/u_tool.py:
--------------------------------------------------------------------------------

```python
from starlette.requests import Request

import openai
import json

from mcp.server.fastmcp import FastMCP, Context
from pydantic import BaseModel
from typing import Optional, List, cast, Literal
from openai.types.chat import ChatCompletionSystemMessageParam
from starlette.responses import JSONResponse

from mcp_server_twelve_data.common import create_dummy_request_context, ToolPlanMap, \
    build_openai_tools_subset, LANCE_DB_ENDPOINTS_PATH
from mcp_server_twelve_data.key_provider import extract_open_ai_apikey
from mcp_server_twelve_data.prompts import utool_doc_string
from mcp_server_twelve_data.u_tool_response import UToolResponse, utool_func_type


def get_md_response(
    client: openai.OpenAI,
    llm_model: str,
    query: str,
    result: BaseModel
) -> str:
    prompt = """
    You are a Markdown report generator.
    
    Your task is to generate a clear, well-structured and readable response in Markdown format based on:
    1. A user query
    2. A JSON object containing the data relevant to the query
    
    Instructions:
    - Do NOT include raw JSON.
    - Instead, extract relevant information and present it using Markdown structure: headings, bullet points, tables,
      bold/italic text, etc.
    - Be concise, accurate, and helpful.
    - If the data is insufficient to fully answer the query, say so clearly.
    
    Respond only with Markdown. Do not explain or include extra commentary outside of the Markdown response.
    """

    llm_response = client.chat.completions.create(
        model=llm_model,
        messages=[
            cast(ChatCompletionSystemMessageParam, {"role": "system", "content": prompt}),
            cast(ChatCompletionSystemMessageParam, {"role": "user", "content": f"User query:\n{query}"}),
            cast(ChatCompletionSystemMessageParam, {"role": "user", "content": f"Data:\n{result.model_dump_json()}"}),
        ],
        temperature=0,
    )

    return llm_response.choices[0].message.content.strip()


def register_u_tool(
    server: FastMCP,
    open_ai_api_key_from_args: Optional[str],
    transport: Literal["stdio", "sse", "streamable-http"],
) -> utool_func_type:
    # llm_model = "gpt-4o"         # Input $2.5,   Output $10
    # llm_model = "gpt-4-turbo"    # Input $10.00, Output $30
    llm_model = "gpt-4o-mini"    # Input $0.15,  Output $0.60
    # llm_model = "gpt-4.1-nano"     # Input $0.10,  Output $0.40

    embedding_model = "text-embedding-3-large"
    top_n = 30

    all_tools = server._tool_manager._tools
    server._tool_manager._tools = {}  # leave only u-tool

    import lancedb
    db = lancedb.connect(LANCE_DB_ENDPOINTS_PATH)
    table = db.open_table("endpoints")
    table_df = table.to_pandas()
    tool_plan_map = ToolPlanMap(table_df)

    @server.tool(name="u-tool")
    async def u_tool(
        query: str,
        ctx: Context,
        format: Optional[str] = None,
        plan: Optional[str] = None,
    ) -> UToolResponse:
        o_ai_api_key_to_use, error = extract_open_ai_apikey(
            transport=transport,
            open_ai_api_key=open_ai_api_key_from_args,
            ctx=ctx,
        )
        if error is not None:
            return UToolResponse(error=error)

        client = openai.OpenAI(api_key=o_ai_api_key_to_use)
        all_candidate_ids: List[str]

        try:
            embedding = client.embeddings.create(
                model=embedding_model,
                input=[query]
            ).data[0].embedding

            results = table.search(embedding).metric("cosine").limit(top_n).to_list()  # type: ignore[attr-defined]
            all_candidate_ids = [r["id"] for r in results]
            if "GetTimeSeries" not in all_candidate_ids:
                all_candidate_ids.append('GetTimeSeries')

            candidates, premium_only_candidates = tool_plan_map.split(
                user_plan=plan, tool_operation_ids=all_candidate_ids
            )

        except Exception as e:
            return UToolResponse(error=f"Embedding or vector search failed: {e}")

        filtered_tools = [tool for tool in all_tools.values() if tool.name in candidates]  # type: ignore
        openai_tools = build_openai_tools_subset(filtered_tools)

        prompt = (
            "You are a function-calling assistant. Based on the user query, "
            "you must select the most appropriate function from the provided tools and return "
            "a valid tool call with all required parameters. "
            "Before the function call, provide a brief plain-text explanation (1–2 sentences) of "
            "why you chose that function, based on the user's intent and tool descriptions."
        )

        try:
            llm_response = client.chat.completions.create(
                model=llm_model,
                messages=[
                    cast(ChatCompletionSystemMessageParam, {"role": "system", "content": prompt}),
                    cast(ChatCompletionSystemMessageParam, {"role": "user", "content": query}),
                ],
                tools=openai_tools,
                tool_choice="required",
                temperature=0,
            )

            call = llm_response.choices[0].message.tool_calls[0]
            name = call.function.name
            arguments = json.loads(call.function.arguments)
            # Every tool takes a single "params" argument with nested fields, but the LLM sometimes flattens it.
            if "params" not in arguments:
                arguments = {"params": arguments}

        except Exception as e:
            return UToolResponse(
                top_candidates=candidates,
                premium_only_candidates=premium_only_candidates,
                error=f"LLM did not return valid tool call: {e}",
            )

        tool = all_tools.get(name)
        if not tool:
            return UToolResponse(
                top_candidates=candidates,
                premium_only_candidates=premium_only_candidates,
                selected_tool=name,
                param=arguments,
                error=f"Tool '{name}' not found in MCP",
            )

        try:
            params_type = tool.fn_metadata.arg_model.model_fields["params"].annotation
            arguments['params'] = params_type(**arguments['params'])
            arguments['ctx'] = ctx

            result = await tool.fn(**arguments)

            if format == "md":
                result = get_md_response(
                    client=client,
                    llm_model=llm_model,
                    query=query,
                    result=result,
                )

            return UToolResponse(
                top_candidates=candidates,
                premium_only_candidates=premium_only_candidates,
                selected_tool=name,
                param=arguments,
                response=result,
            )
        except Exception as e:
            return UToolResponse(
                top_candidates=candidates,
                premium_only_candidates=premium_only_candidates,
                selected_tool=name,
                param=arguments,
                error=str(e),
            )
    u_tool.__doc__ = utool_doc_string
    return u_tool


def register_http_utool(
    transport: str,
    server: FastMCP,
    u_tool,
):
    if transport == "streamable-http":
        @server.custom_route("/utool", ["GET"])
        async def u_tool_http(request: Request):
            query = request.query_params.get("query")
            format_param = request.query_params.get("format", default="json").lower()
            user_plan_param = request.query_params.get("plan", None)
            if not query:
                return JSONResponse({"error": "Missing 'query' query parameter"}, status_code=400)

            request_context = create_dummy_request_context(request)
            ctx = Context(request_context=request_context)
            result = await u_tool(
                query=query, ctx=ctx,
                format=format_param,
                plan=user_plan_param
            )

            return JSONResponse(content=result.model_dump(mode="json"))

```
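
The argument handling in `u_tool` tolerates a model that returns the tool arguments either nested under `params` or flattened. A minimal sketch of that normalization step follows; the helper name `normalize_tool_arguments` and the sample payloads are assumptions made for illustration and do not appear in the module.

```python
import json


def normalize_tool_arguments(raw_arguments: str) -> dict:
    """Parse a tool call's JSON arguments and ensure they sit under 'params'."""
    arguments = json.loads(raw_arguments)
    if "params" not in arguments:
        # The model flattened the nested object, so wrap it back
        arguments = {"params": arguments}
    return arguments


# Illustrative payloads in both shapes
nested = normalize_tool_arguments('{"params": {"symbol": "AAPL", "interval": "1day"}}')
flat = normalize_tool_arguments('{"symbol": "AAPL", "interval": "1day"}')
```

Both shapes end up identical, so the later `params_type(**arguments['params'])` construction sees a consistent structure.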

--------------------------------------------------------------------------------
/scripts/split_openapi.py:
--------------------------------------------------------------------------------

```python
import os
import yaml
import json
import re
from collections import defaultdict

input_path = "../data/openapi01.06.2025.json"
output_dir = "../data"

mirrors = [
    "https://api-reference-data.twelvedata.com",
    "https://api-time-series.twelvedata.com",
    "https://api-mutual-funds.twelvedata.com",
    "https://api-etfs.twelvedata.com",
    "https://api-fundamental.twelvedata.com",
    "https://api-analysis.twelvedata.com",
    "https://api-regulator.twelvedata.com",
    "https://api-ti-overlap-studies.twelvedata.com",
    "https://api-ti-volume-indicators.twelvedata.com",
    "https://api-ti-price-transform.twelvedata.com",
    "https://api-ti-cycle-indicators.twelvedata.com",
    "https://api-ti-statistics-functions.twelvedata.com"
]

allowed_endpoints = [
    # Reference Data
    "/stocks",
    "/forex_pairs",
    "/cryptocurrencies",
    "/funds",
    "/bonds",
    "/etfs",
    "/commodities",
    "/cross_listings",
    "/exchanges",
    "/exchange_schedule",
    "/cryptocurrency_exchanges",
    "/market_state",
    "/instrument_type",
    "/countries",
    "/earliest_timestamp",
    "/symbol_search",
    "/intervals",

    # Core Data
    "/time_series",
    "/time_series/cross",
    "/exchange_rate",
    "/currency_conversion",
    "/quote",
    "/price",
    "/eod",
    "/market_movers/{market}",

    # Mutual Funds
    "/mutual_funds/list",
    "/mutual_funds/family",
    "/mutual_funds/type",
    "/mutual_funds/world",
    "/mutual_funds/world/summary",
    "/mutual_funds/world/performance",
    "/mutual_funds/world/risk",
    "/mutual_funds/world/ratings",
    "/mutual_funds/world/composition",
    "/mutual_funds/world/purchase_info",
    "/mutual_funds/world/sustainability",

    # ETFs
    "/etfs/list",
    "/etfs/family",
    "/etfs/type",
    "/etfs/world",
    "/etfs/world/summary",
    "/etfs/world/performance",
    "/etfs/world/risk",
    "/etfs/world/composition",

    # Fundamentals
    "/balance_sheet",
    "/balance_sheet/consolidated",
    "/cash_flow",
    "/cash_flow/consolidated",
    "/dividends",
    "/dividends_calendar",
    "/earnings",
    "/income_statement",
    "/income_statement/consolidated",
    "/ipo_calendar",
    "/key_executives",
    "/last_change/{endpoint}",
    "/logo",
    "/market_cap",
    "/profile",
    "/splits",
    "/splits_calendar",
    "/statistics",

    # Analysis
    "/analyst_ratings/light",
    "/analyst_ratings/us_equities",
    "/earnings_estimate",
    "/revenue_estimate",
    "/eps_trend",
    "/eps_revisions",
    "/growth_estimates",
    "/price_target",
    "/recommendations",
    "/earnings_calendar",

    # Regulatory
    "/tax_info",
    "/edgar_filings/archive",
    "/insider_transactions",
    "/direct_holders",
    "/fund_holders",
    "/institutional_holders",
    "/sanctions/{source}",
]

added_endpoints = []


def load_spec(path):
    with open(path, 'r', encoding='utf-8') as f:
        if path.lower().endswith(('.yaml', '.yml')):
            return yaml.safe_load(f)
        return json.load(f)


def dump_spec(spec, path):
    with open(path, 'w', encoding='utf-8') as f:
        if path.lower().endswith(('.yaml', '.yml')):
            yaml.safe_dump(spec, f, sort_keys=False, allow_unicode=True)
        else:
            json.dump(spec, f, ensure_ascii=False, indent=2)


def find_refs(obj):
    refs = set()
    if isinstance(obj, dict):
        for k, v in obj.items():
            if k == '$ref' and isinstance(v, str):
                refs.add(v)
            else:
                refs |= find_refs(v)
    elif isinstance(obj, list):
        for item in obj:
            refs |= find_refs(item)
    return refs


def split_paths(keys, chunk_size=25):
    for i in range(0, len(keys), chunk_size):
        yield keys[i:i + chunk_size]


def filter_paths(all_paths, allowed_list):
    """
    Returns a list of only those paths from all_paths that are present in allowed_list.
    Additionally, prints paths that are in allowed_list but not found in all_paths.
    """
    f_paths = [path for path in all_paths if path in allowed_list]
    missing_paths = [path for path in allowed_list if path not in f_paths]
    if missing_paths:
        print("Paths in allowed_list but not found in all_paths:", missing_paths)
    return f_paths


def prune_components(full_components, used_refs):
    pattern = re.compile(r'^#/components/([^/]+)/(.+)$')
    used = defaultdict(set)

    # Mark direct references
    for ref in used_refs:
        m = pattern.match(ref)
        if m:
            comp_type, comp_name = m.group(1), m.group(2)
            used[comp_type].add(comp_name)

    # Recursively include nested references
    changed = True
    while changed:
        changed = False
        for comp_type, names in list(used.items()):
            for name in list(names):
                definition = full_components.get(comp_type, {}).get(name)
                if definition:
                    for r in find_refs(definition):
                        m2 = pattern.match(r)
                        if m2:
                            ct, cn = m2.group(1), m2.group(2)
                            if cn not in used[ct]:
                                used[ct].add(cn)
                                changed = True

    # Assemble a limited set
    pruned = {}
    for comp_type, defs in full_components.items():
        if comp_type in used:
            kept = {n: defs[n] for n in defs if n in used[comp_type]}
            if kept:
                pruned[comp_type] = kept
    return pruned


def trim_fields(obj):
    """
    Recursively trims string fields:
    - 'description' to 300 characters
    - 'example' to 700 characters
    """
    if isinstance(obj, dict):
        for k, v in obj.items():
            if k == "description" and isinstance(v, str):
                if len(v) > 300:
                    obj[k] = v[:300]
            elif k == "example" and isinstance(v, str):
                if len(v) > 700:
                    obj[k] = v[:700]
            else:
                trim_fields(v)
    elif isinstance(obj, list):
        for item in obj:
            trim_fields(item)


def add_empty_properties(obj):
    """
    Recursively searches for all occurrences of the 'schema' key.
    If its value is a dict and this dict does not have 'properties',
    adds 'properties': {}.
    """
    if isinstance(obj, dict):
        for k, v in obj.items():
            if k == "schema" and isinstance(v, dict):
                if "properties" not in v:
                    v["properties"] = {}
                # Continue deep traversal within the found schema
                add_empty_properties(v)
            else:
                add_empty_properties(v)
    elif isinstance(obj, list):
        for item in obj:
            add_empty_properties(item)


def main():
    os.makedirs(output_dir, exist_ok=True)

    spec = load_spec(input_path)
    all_paths = list(spec.get('paths', {}).keys())
    f_paths = filter_paths(all_paths, allowed_endpoints)
    chunks = list(split_paths(f_paths))

    for idx, paths_chunk in enumerate(chunks, start=1):
        # Build a new part of the specification
        new_spec = {
            'openapi': spec.get('openapi'),
            'info': spec.get('info'),
            'servers': (
                [{'url': mirrors[idx - 1]}]
                if idx - 1 < len(mirrors)
                else spec.get('servers', [])
            ),
            'paths': {k: spec['paths'][k] for k in paths_chunk}
        }

        # Add missing 'properties', then trim long fields
        add_empty_properties(new_spec)
        trim_fields(new_spec)

        # Prune components
        used_refs = find_refs(new_spec['paths'])
        pruned = prune_components(spec.get('components', {}), used_refs)
        if pruned:
            new_spec['components'] = pruned

        # Calculate metrics and save the file
        new_paths_count = len(new_spec['paths'])
        new_components_count = sum(
            len(v) for v in new_spec.get('components', {}).values()
        )
        out_file = os.path.join(
            output_dir,
            f"{os.path.splitext(os.path.basename(input_path))[0]}"
            f"_part{idx}{os.path.splitext(input_path)[1]}"
        )
        dump_spec(new_spec, out_file)
        print(
            f"Part {idx}: {new_paths_count} paths, "
            f"{new_components_count} components -> {out_file}"
        )


if __name__ == "__main__":
    main()

```
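
The component pruning in the script above can be illustrated on a toy spec. The sketch below is not the script itself: it is a self-contained worklist variant of the same idea (collect `$ref` strings, then follow them transitively). The toy spec and the helper name `reachable_components` are hypothetical.

```python
import re
from collections import defaultdict


def find_refs(obj):
    """Collect every '$ref' string found anywhere inside obj."""
    refs = set()
    if isinstance(obj, dict):
        for k, v in obj.items():
            if k == "$ref" and isinstance(v, str):
                refs.add(v)
            else:
                refs |= find_refs(v)
    elif isinstance(obj, list):
        for item in obj:
            refs |= find_refs(item)
    return refs


def reachable_components(components, start_refs):
    """Return {component_type: {names}} reachable from start_refs (hypothetical helper)."""
    pattern = re.compile(r"^#/components/([^/]+)/(.+)$")
    used = defaultdict(set)
    queue = list(start_refs)
    while queue:
        m = pattern.match(queue.pop())
        if not m:
            continue
        comp_type, name = m.groups()
        if name in used[comp_type]:
            continue  # already visited, avoids infinite loops on cyclic refs
        used[comp_type].add(name)
        definition = components.get(comp_type, {}).get(name)
        if definition:
            queue.extend(find_refs(definition))
    return dict(used)


# Toy data: /price -> Price -> Meta; "Unused" is never referenced.
toy_components = {
    "schemas": {
        "Price": {"properties": {"meta": {"$ref": "#/components/schemas/Meta"}}},
        "Meta": {"type": "object"},
        "Unused": {"type": "object"},
    }
}
toy_paths = {
    "/price": {"get": {"responses": {"200": {"content": {"application/json": {
        "schema": {"$ref": "#/components/schemas/Price"}}}}}}}
}

kept = reachable_components(toy_components, find_refs(toy_paths))
print(kept)
```

Only `Price` and `Meta` survive. `Unused` is dropped because nothing reachable from the selected paths refers to it.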

--------------------------------------------------------------------------------
/scripts/generate_requests_models.py:
--------------------------------------------------------------------------------

```python
import json
from pathlib import Path
import keyword
from typing import Any, List, Optional

OPENAPI_PATH = "../extra/openapi_clean.json"
REQUESTS_FILE = "../data/request_models.py"

PRIMITIVES = {
    "string": "str",
    "integer": "int",
    "number": "float",
    "boolean": "bool",
    "object": "dict",
    "array": "list",
}


def canonical_class_name(opid: str, suffix: str) -> str:
    if not opid:
        return ""
    return opid[0].upper() + opid[1:] + suffix


def safe_field_name(name: str) -> str:
    # Append underscore if name is a Python keyword
    if keyword.iskeyword(name):
        return name + "_"
    return name


def python_type(schema: dict, components: dict) -> str:
    # Resolve $ref to the corresponding model class name
    if "$ref" in schema:
        ref_name = schema["$ref"].split("/")[-1]
        return canonical_class_name(ref_name, "")
    # Handle allOf by delegating to the first subschema
    if schema.get("allOf"):
        return python_type(schema["allOf"][0], components)
    t = schema.get("type", "string")
    if t == "array":
        # Construct type for lists recursively
        return f"list[{python_type(schema.get('items', {}), components)}]"
    return PRIMITIVES.get(t, "Any")


def resolve_schema(schema: dict, components: dict) -> dict:
    # Fully resolve $ref and allOf compositions into a merged schema
    if "$ref" in schema:
        ref = schema["$ref"].split("/")[-1]
        return resolve_schema(components.get(ref, {}), components)
    if "allOf" in schema:
        merged = {"properties": {}, "required": [], "description": ""}
        for subschema in schema["allOf"]:
            sub = resolve_schema(subschema, components)
            merged["properties"].update(sub.get("properties", {}))
            merged["required"].extend(sub.get("required", []))
            if sub.get("description"):
                merged["description"] += sub["description"] + "\n"
        merged["required"] = list(set(merged["required"]))
        merged["description"] = merged["description"].strip() or None
        return merged
    return schema


def collect_examples(param: dict, sch: dict) -> List[Any]:
    # Collect all examples from parameter, schema, and enums without deduplication
    examples: List[Any] = []
    if "example" in param:
        examples.append(param["example"])
    if "examples" in param:
        exs = param["examples"]
        if isinstance(exs, dict):
            for v in exs.values():
                examples.append(v["value"] if isinstance(v, dict) and "value" in v else v)
        elif isinstance(exs, list):
            examples.extend(exs)
    if "example" in sch:
        examples.append(sch["example"])
    if "examples" in sch:
        exs = sch["examples"]
        if isinstance(exs, dict):
            for v in exs.values():
                examples.append(v["value"] if isinstance(v, dict) and "value" in v else v)
        elif isinstance(exs, list):
            examples.extend(exs)
    # Include enum values as examples if present
    if "enum" in sch and isinstance(sch["enum"], list):
        examples.extend(sch["enum"])
    return [e for e in examples if e is not None]


def gen_field(name: str, typ: str, required: bool, desc: Optional[str],
              examples: List[Any], default: Any) -> str:
    name = safe_field_name(name)
    # Wrap in Optional[...] if default is None and field is not required
    if default is None and not required:
        typ = f"Optional[{typ}]"
    args: List[str] = []
    if required:
        args.append("...")
    else:
        args.append(f"default={repr(default)}")
    if desc:
        args.append(f"description={repr(desc)}")
    if examples:
        args.append(f"examples={repr(examples)}")
    return f"    {name}: {typ} = Field({', '.join(args)})"


def gen_class(name: str, props: dict, desc: Optional[str]) -> str:
    lines = [f"class {name}(BaseModel):"]
    if desc:
        # Add class docstring if description is present
        lines.append(f'    """{desc.replace(chr(34)*3, "")}"""')
    if not props:
        lines.append("    pass")
    else:
        for pname, fdict in props.items():
            lines.append(gen_field(
                pname,
                fdict["type"],
                fdict["required"],
                fdict["description"],
                fdict["examples"],
                fdict["default"]
            ))
    return "\n".join(lines)


def main():
    # Load the OpenAPI specification
    with open(OPENAPI_PATH, "r", encoding="utf-8") as f:
        spec = json.load(f)

    components = spec.get("components", {}).get("schemas", {})
    request_models: List[str] = []
    request_names: set = set()

    for path, methods in spec.get("paths", {}).items():
        for http_method, op in methods.items():
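            # Skip path-level keys (e.g. "parameters", "summary") that are not operations
            if not isinstance(op, dict):
                continue
            opid = op.get("operationId")
            if not opid:
                continue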
            class_name = canonical_class_name(opid, "Request")

            # Collect parameters from path, query, header, etc.
            props: dict = {}
            for param in op.get("parameters", []):
                name = param["name"]
                sch = param.get("schema", {"type": "string"})
                typ = python_type(sch, components)
                required = param.get("required", False)
                desc = param.get("description") or sch.get("description")
                examples = collect_examples(param, sch)
                default = sch.get("default", None)
                props[name] = {
                    "type": typ,
                    "required": required,
                    "description": desc,
                    "examples": examples,
                    "default": default,
                }

            # Collect JSON body properties
            body = op.get("requestBody", {}) \
                     .get("content", {}) \
                     .get("application/json", {}) \
                     .get("schema")
            if body:
                body_sch = resolve_schema(body, components)
                for name, sch in body_sch.get("properties", {}).items():
                    typ = python_type(sch, components)
                    required = name in body_sch.get("required", [])
                    desc = sch.get("description")
                    examples = collect_examples({}, sch)
                    default = sch.get("default", None)
                    props[name] = {
                        "type": typ,
                        "required": required,
                        "description": desc,
                        "examples": examples,
                        "default": default,
                    }

            if "outputsize" not in props:
                props["outputsize"] = {
                    "type": "int",
                    "required": False,
                    "description": (
                        "Number of data points to retrieve. Supports values in the range from `1` to `5000`. "
                        "Default `10` when no date parameters are set, otherwise set to maximum"
                    ),
                    "examples": [10],
                    "default": 10,
                }
            else:
                props["outputsize"]["default"] = 10
                # The description may be missing (None), so fall back to an empty string
                props["outputsize"]["description"] = (props["outputsize"]["description"] or "").replace(
                    'Default `30`', 'Default `10`'
                )
                props["outputsize"]["examples"] = [10]

            # Add apikey with default="demo"
            props["apikey"] = {
                "type": "str",
                "required": False,
                "description": "API key",
                "examples": ["demo"],
                "default": "demo",
            }

            if "interval" in props:
                props["interval"]["required"] = False
                props["interval"]["default"] = "1day"

            # Append plan availability to the description if x-starting-plan is present
            starting_plan = op.get("x-starting-plan")
            description = op.get("description", "")
            if starting_plan:
                addon = f" Available starting from the `{starting_plan}` plan."
                description = (description or "") + addon

            code = gen_class(class_name, props, description)

            if class_name not in request_names:
                request_models.append(code)
                request_names.add(class_name)

    # Write all generated models to the target file
    header = (
        "from pydantic import BaseModel, Field\n"
        "from typing import Any, List, Optional\n\n"
    )
    Path(REQUESTS_FILE).write_text(header + "\n\n".join(request_models), encoding="utf-8")
    print(f"Generated request models: {REQUESTS_FILE}")


if __name__ == "__main__":
    main()

```
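
To make the generator's output concrete, here is a minimal, stdlib-only sketch of the field-rendering rules used by `gen_field` above. The name `render_field` and the sample parameters are hypothetical; the emitted strings are lines of Python source intended for a Pydantic model body.

```python
import keyword
from typing import Any, List, Optional


def safe_field_name(name: str) -> str:
    # Same rule as the helper above: suffix Python keywords with "_".
    return name + "_" if keyword.iskeyword(name) else name


def render_field(name: str, typ: str, required: bool,
                 desc: Optional[str], examples: List[Any], default: Any) -> str:
    # Required fields use "...", optional ones carry an explicit default.
    name = safe_field_name(name)
    if default is None and not required:
        typ = f"Optional[{typ}]"
    args = ["..."] if required else [f"default={default!r}"]
    if desc:
        args.append(f"description={desc!r}")
    if examples:
        args.append(f"examples={examples!r}")
    return f"    {name}: {typ} = Field({', '.join(args)})"


required_line = render_field("symbol", "str", True, "Ticker symbol", ["AAPL"], None)
optional_line = render_field("from", "str", False, None, [], None)
default_line = render_field("outputsize", "int", False, None, [10], 10)

print(required_line)
print(optional_line)
print(default_line)
```

Note that a keyword such as `from` is emitted as `from_`, and the code above does not attach an alias for the original name, so callers of the generated model would need to use the renamed field.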

--------------------------------------------------------------------------------
/test/endpoint_pairs.py:
--------------------------------------------------------------------------------

```python
pairs_ = [
    # ('Show me batches for AAPL.', 'advanced'),  # skipped-error,
    ('Tell me the last update time for Apple’s income statement?', 'GetLastChanges'),
]

pairs = [
    ('Give me the accumulation/distribution indicator for AAPL.', 'GetTimeSeriesAd'),
    ('Show me add for AAPL.', 'GetTimeSeriesAdd'),
    ('Show me adosc for AAPL.', 'GetTimeSeriesAdOsc'),
    ('Show me adx for AAPL.', 'GetTimeSeriesAdx'),
    ("Show me the Average Directional Movement Index Rating (ADXR) time series for AAPL.", "GetTimeSeriesAdxr"),
    ('Show me analyst ratings - light for AAPL.', 'GetAnalystRatingsLight'),
    ('Show me analyst ratings - us equities for AAPL.', 'GetAnalystRatingsUsEquities'),
    ('How many API requests have I made in the last minute', 'GetApiUsage'),
    ('Show me apo for AAPL.', 'GetTimeSeriesApo'),
    ('Show me aroon for AAPL.', 'GetTimeSeriesAroon'),

    ('Show me aroonosc for AAPL.', 'GetTimeSeriesAroonOsc'),
    ('Show me atr for AAPL.', 'GetTimeSeriesAtr'),
    ('Show me avg for AAPL.', 'GetTimeSeriesAvg'),
    ('Show me avgprice for AAPL.', 'GetTimeSeriesAvgPrice'),
    ('Show me balance sheet for AAPL.', 'GetBalanceSheet'),
    ('Show me balance sheet consolidated for AAPL.', 'GetBalanceSheetConsolidated'),
    # ('Show me batches for AAPL.', 'advanced'),  # skipped-error,
    ('Show me bbands for AAPL.', 'GetTimeSeriesBBands'),
    ('Show me beta for AAPL.', 'GetTimeSeriesBeta'),
    ('Show me bonds list for AAPL.', 'GetBonds'),

    ('Show me bop for AAPL.', 'GetTimeSeriesBop'),
    ('Show me cash flow for AAPL.', 'GetCashFlow'),
    ('Show me cash flow consolidated for AAPL.', 'GetCashFlowConsolidated'),
    ('Show me cci for AAPL.', 'GetTimeSeriesCci'),
    ('Show me ceil for AAPL.', 'GetTimeSeriesCeil'),
    ('Show me cmo for AAPL.', 'GetTimeSeriesCmo'),
    ('Show me commodities list for AAPL.', 'GetCommodities'),
    ('Show me coppock for AAPL.', 'GetTimeSeriesCoppock'),
    ('Show me correl for AAPL.', 'GetTimeSeriesCorrel'),
    ('Show me countries list for AAPL.', 'GetCountries'),

    ('Show me cross listings for AAPL.', 'GetCrossListings'),
    ('Show me crsi for AAPL.', 'GetTimeSeriesCrsi'),
    ('Show me cryptocurrencies list for BTC/USD.', 'GetCryptocurrencies'),
    ('Show me cryptocurrency exchanges', 'GetCryptocurrencyExchanges'),
    ('Show me currency conversion for EUR/USD.', 'GetCurrencyConversion'),
    ('Show me dema for AAPL.', 'GetTimeSeriesDema'),
    ('Show me direct holders for AAPL.', 'GetDirectHolders'),
    ('Calculate DIV indicator for AAPL.', 'GetTimeSeriesDiv'),
    ('Show me dividends for AAPL.', 'GetDividends'),
    ('Show me dividends calendar for AAPL.', 'GetDividendsCalendar'),

    ('Show me dpo for AAPL.', 'GetTimeSeriesDpo'),
    ('Show me dx for AAPL.', 'GetTimeSeriesDx'),
    ('Show me earliest timestamp for AAPL.', 'GetEarliestTimestamp'),
    ('Show me earnings for AAPL.', 'GetEarnings'),
    ('Show me earnings calendar for China for 2024 year.', 'GetEarningsCalendar'),
    ('Show me earnings estimate for AAPL.', 'GetEarningsEstimate'),
    ('Show me edgar filings archive for AAPL.', 'GetEdgarFilingsArchive'),
    ('Show me ema for AAPL.', 'GetTimeSeriesEma'),
    ('Show me end of day price for AAPL.', 'GetEod'),
    ('Show me eps revisions for AAPL.', 'GetEpsRevisions'),

    ('Show me eps trend for AAPL.', 'GetEpsTrend'),
    ('Show me ETFs for SPY on NYSE.', 'GetEtf'),
    ('Show me ETFs in the same family as SPY.', 'GetETFsFamily'),
    ("Show me the full list of bond-type exchange-traded funds issued by BlackRock investment company.", "GetETFsList"),
    ('Show me ETF types available in the United States.', 'GetETFsType'),
    ("Give me a complete ETF analysis report for IVV, with all metrics like performance,"
     " summary, volatility, sector weights and country allocations.", "GetETFsWorld"),
    ("Show me the portfolio composition of ETF IVV.", "GetETFsWorldComposition"),
    ("Show me performance for the iShares Core S&P 500 ETF.", "GetETFsWorldPerformance"),
    ("Show me the risk metrics for the iShares Core S&P 500 ETF.", "GetETFsWorldRisk"),
    ('Show me a summary for the SPY ETF.', 'GetETFsWorldSummary'),

    ('Show me the exchange rate from USD to EUR.', 'GetExchangeRate'),
    ('Show me exchange schedule for AAPL.', 'GetExchangeSchedule'),
    ('Show me the list of available exchanges.', 'GetExchanges'),
    ('Show me exp for AAPL.', 'GetTimeSeriesExp'),
    ('Show me floor for AAPL.', 'GetTimeSeriesFloor'),
    ('Show me all available forex trading pairs.', 'GetForexPairs'),
    ('Show me fund holders for AAPL.', 'GetFundHolders'),
    ('Show me funds list for AAPL.', 'GetFunds'),
    ('Show me growth estimates for AAPL.', 'GetGrowthEstimates'),
    ('Show me heikinashicandles for AAPL.', 'GetTimeSeriesHeikinashiCandles'),

    ('Show me hlc3 for AAPL.', 'GetTimeSeriesHlc3'),
    ('Show me ht_dcperiod for AAPL.', 'GetTimeSeriesHtDcPeriod'),
    ('Show me ht_dcphase for AAPL.', 'GetTimeSeriesHtDcPhase'),
    ('Show me ht_phasor for AAPL.', 'GetTimeSeriesHtPhasor'),
    ('Show me ht_sine for AAPL.', 'GetTimeSeriesHtSine'),
    ('Show me ht_trendline for AAPL.', 'GetTimeSeriesHtTrendline'),
    ('Show me ht_trendmode for AAPL.', 'GetTimeSeriesHtTrendMode'),
    ('Show me ichimoku for AAPL.', 'GetTimeSeriesIchimoku'),
    ('Show me income statement for AAPL.', 'GetIncomeStatement'),
    ('Show me income statement consolidated for AAPL.', 'GetIncomeStatementConsolidated'),

    ('Show me insider transactions for AAPL.', 'GetInsiderTransactions'),
    ('Show me institutional holders for AAPL.', 'GetInstitutionalHolders'),
    ('What types of instruments are available through the API?', 'GetInstrumentType'),
    ('Show me the list of available time intervals.', 'GetIntervals'),
    ('Show me the IPO calendar for upcoming companies.', 'GetIpoCalendar'),
    ('Show me kama for AAPL.', 'GetTimeSeriesKama'),
    ('Show me keltner for AAPL.', 'GetTimeSeriesKeltner'),
    ('Show me key executives for AAPL.', 'GetKeyExecutives'),
    ('Show me kst for AAPL.', 'GetTimeSeriesKst'),
    ('Tell me the last update time for Apple’s income statement?', 'GetLastChanges'),

    ('Show me linearreg for AAPL.', 'GetTimeSeriesLinearReg'),
    ('Show me linearregangle for AAPL.', 'GetTimeSeriesLinearRegAngle'),
    ('Show me linearregintercept for AAPL.', 'GetTimeSeriesLinearRegIntercept'),
    ('Show me linearregslope for AAPL.', 'GetTimeSeriesLinearRegSlope'),
    ('Show me ln for AAPL.', 'GetTimeSeriesLn'),
    ('Show me log10 for AAPL.', 'GetTimeSeriesLog10'),
    ('Show me logo for AAPL.', 'GetLogo'),
    ('Show me ma for AAPL.', 'GetTimeSeriesMa'),
    ('Show me macd for AAPL.', 'GetTimeSeriesMacd'),
    ('Show me macd slope for AAPL.', 'GetTimeSeriesMacdSlope'),

    ('Show me macdext for AAPL.', 'GetTimeSeriesMacdExt'),
    ('Show me mama for AAPL.', 'GetTimeSeriesMama'),
    ('Show me market capitalization for AAPL.', 'GetMarketCap'),
    ("Show me the top market movers in the US stock market.", "GetMarketMovers"),
    ("Is the NASDAQ market currently open?", "GetMarketState"),
    ('Show me max for AAPL.', 'GetTimeSeriesMax'),
    ('Show me maxindex for AAPL.', 'GetTimeSeriesMaxIndex'),
    ('Show me mcginley_dynamic for AAPL.', 'GetTimeSeriesMcGinleyDynamic'),
    ('Show me medprice for AAPL.', 'GetTimeSeriesMedPrice'),

    ('Show me mfi for AAPL.', 'GetTimeSeriesMfi'),
    ("Show me the MIDPOINT indicator for AAPL", 'GetTimeSeriesMidPoint'),
    ('Show me midprice for AAPL.', 'GetTimeSeriesMidPrice'),
    ('Show me min for AAPL.', 'GetTimeSeriesMin'),
    ('Show me minimal price index for AAPL.', 'GetTimeSeriesMinIndex'),
    ('Show me minmax for AAPL.', 'GetTimeSeriesMinMax'),
    ('Show me minmaxindex for AAPL.', 'GetTimeSeriesMinMaxIndex'),
    ('Show me minus_di for AAPL.', 'GetTimeSeriesMinusDI'),
    ('Show me minus_dm for AAPL.', 'GetTimeSeriesMinusDM'),
    ('Show me mom for AAPL.', 'GetTimeSeriesMom'),

    ('Show me mult for AAPL.', 'GetTimeSeriesMult'),
    ('Show me mutual funds family list.', 'GetMutualFundsFamily'),
    ('Show me mutual funds list.', 'GetMutualFundsList'),
    ('Show me mutual funds type list.', 'GetMutualFundsType'),
    ('Show me all data for mutual fund VTSMX.', 'GetMutualFundsWorld'),
    ('Show me composition for mutual fund VTSMX.', 'GetMutualFundsWorldComposition'),
    ('Show me performance for mutual fund VTSMX.', 'GetMutualFundsWorldPerformance'),
    ('Show me purchase info for mutual fund VTSMX.', 'GetMutualFundsWorldPurchaseInfo'),
    ('Show me ratings for mutual fund VTSMX.', 'GetMutualFundsWorldRatings'),
    ('Show me risk for mutual fund VTSMX.', 'GetMutualFundsWorldRisk'),

    ('Show me summary for mutual fund VTSMX.', 'GetMutualFundsWorldSummary'),
    ('Show me sustainability for mutual fund VTSMX.', 'GetMutualFundsWorldSustainability'),
    ('Show me natr indicator for AAPL.', 'GetTimeSeriesNatr'),
    ('Show me obv indicator for AAPL.', 'GetTimeSeriesObv'),
    ('Show me percent B indicator for AAPL.', 'GetTimeSeriesPercent_B'),
    ('Show me pivot points HL for AAPL.', 'GetTimeSeriesPivotPointsHL'),
    ('Show me plus DI indicator for AAPL.', 'GetTimeSeriesPlusDI'),
    ('Show me plus DM indicator for AAPL.', 'GetTimeSeriesPlusDM'),
    ('Show me PPO indicator for AAPL.', 'GetTimeSeriesPpo'),
    ('Show me real-time price for AAPL.', 'GetPrice'),

    ('Show me price target for AAPL.', 'GetPriceTarget'),
    ('Show me company profile for AAPL.', 'GetProfile'),
    ('Show me real-time quote for AAPL.', 'GetQuote'),
    ('Show me analyst recommendations for AAPL.', 'GetRecommendations'),
    ('Show me revenue estimate for AAPL.', 'GetRevenueEstimate'),
    ('Show me ROC indicator for AAPL.', 'GetTimeSeriesRoc'),
    ('Show me ROCP indicator for AAPL.', 'GetTimeSeriesRocp'),
    ('Show me ROCR indicator for AAPL.', 'GetTimeSeriesRocr'),
    ('Show me ROCR100 indicator for AAPL.', 'GetTimeSeriesRocr100'),
    ('Show me RSI indicator for AAPL.', 'GetTimeSeriesRsi'),

    ('Show me RVOL indicator for AAPL.', 'GetTimeSeriesRvol'),
    ('List all entities sanctioned by OFAC.', 'GetSourceSanctionedEntities'),
    ('Show me SAR indicator for AAPL.', 'GetTimeSeriesSar'),
    ('Show me extended SAR indicator for AAPL.', 'GetTimeSeriesSarExt'),
    ('Show me SMA indicator for AAPL.', 'GetTimeSeriesSma'),
    ('Show me stock splits for AAPL.', 'GetSplits'),
    ('Show me splits calendar for AAPL.', 'GetSplitsCalendar'),
    ('Show me SQRT indicator for AAPL.', 'GetTimeSeriesSqrt'),
    ('Show me statistics for AAPL.', 'GetStatistics'),
    ('Show me standard deviation for AAPL.', 'GetTimeSeriesStdDev'),

    ('Show me stoch for AAPL.', 'GetTimeSeriesStoch'),
    ('Show me stochf for AAPL.', 'GetTimeSeriesStochF'),
    ('Show me stochrsi for AAPL.', 'GetTimeSeriesStochRsi'),
    ('Show me stocks list for AAPL.', 'GetStocks'),
    ('Show me sub for AAPL.', 'GetTimeSeriesSub'),
    ('Show me sum for AAPL.', 'GetTimeSeriesSum'),
    ('Show me supertrend for AAPL.', 'GetTimeSeriesSuperTrend'),
    ('Show me supertrend heikinashicandles for AAPL.', 'GetTimeSeriesSuperTrendHeikinAshiCandles'),
    ('Show me symbol search for AAPL.', 'GetSymbolSearch'),
    ('Show me t3ma for AAPL.', 'GetTimeSeriesT3ma'),

    ('Show me tax information for AAPL.', 'GetTaxInfo'),
    ('Show me technical indicators interface for AAPL.', 'GetTechnicalIndicators'),
    ('Show me tema for AAPL.', 'GetTimeSeriesTema'),
    ('Show me time series for AAPL.', 'GetTimeSeries'),
    ('Get cross rate time series for USD/BTC', 'GetTimeSeriesCross'),
    ('Show me trange for AAPL.', 'GetTimeSeriesTRange'),
    ('Show me trima for AAPL.', 'GetTimeSeriesTrima'),
    ('Show me tsf for AAPL.', 'GetTimeSeriesTsf'),
    ('Show me typprice for AAPL.', 'GetTimeSeriesTypPrice'),
    ('Show me ultosc for AAPL.', 'GetTimeSeriesUltOsc'),

    ('Show me var for AAPL.', 'GetTimeSeriesVar'),
    ('Show me vwap for AAPL.', 'GetTimeSeriesVwap'),
    ('Show me wclprice for AAPL.', 'GetTimeSeriesWclPrice'),
    ('Show me willr for AAPL.', 'GetTimeSeriesWillR'),
    ('Show me wma for AAPL.', 'GetTimeSeriesWma'),
]

```
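
A list of this size is easy to get wrong by hand, so a quick sanity check can help. The sketch below is hypothetical and not part of the repository's tests. It assumes only the `(prompt, expected_tool)` tuple shape used above and reports prompts or tool names that appear more than once.

```python
from collections import Counter

# Hypothetical excerpt in the same (prompt, expected_tool) shape as `pairs`.
sample_pairs = [
    ("Show me adx for AAPL.", "GetTimeSeriesAdx"),
    ("Show me atr for AAPL.", "GetTimeSeriesAtr"),
    ("Show me adx for AAPL.", "GetTimeSeriesAdx"),  # deliberate duplicate
]


def find_duplicates(pairs):
    """Return sorted prompts and tool names that occur more than once."""
    prompt_counts = Counter(prompt for prompt, _ in pairs)
    tool_counts = Counter(tool for _, tool in pairs)
    dup_prompts = sorted(p for p, n in prompt_counts.items() if n > 1)
    dup_tools = sorted(t for t, n in tool_counts.items() if n > 1)
    return dup_prompts, dup_tools


dup_prompts, dup_tools = find_duplicates(sample_pairs)
print(dup_prompts, dup_tools)
```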