#
tokens: 33213/50000 34/34 files
lines: off (toggle) GitHub
raw markdown copy
# Directory Structure

```
├── .dockerignore
├── .env.local
├── .gitignore
├── .python-version
├── dockerfile
├── LICENSE
├── makefile
├── pyproject.toml
├── README.md
└── src
    ├── __init__.py
    ├── api_service
    │   ├── __init__.py
    │   ├── os_api.py
    │   └── protocols.py
    ├── config_docs
    │   ├── ogcapi-features-1.yaml
    │   ├── ogcapi-features-2.yaml
    │   └── ogcapi-features-3.yaml
    ├── http_client_test.py
    ├── mcp_service
    │   ├── __init__.py
    │   ├── guardrails.py
    │   ├── os_service.py
    │   ├── prompts.py
    │   ├── protocols.py
    │   ├── resources.py
    │   └── routing_service.py
    ├── middleware
    │   ├── __init__.py
    │   ├── http_middleware.py
    │   └── stdio_middleware.py
    ├── models.py
    ├── prompt_templates
    │   ├── __init__.py
    │   └── prompt_templates.py
    ├── server.py
    ├── stdio_client_test.py
    ├── utils
    │   ├── __init__.py
    │   └── logging_config.py
    └── workflow_generator
        ├── __init__.py
        └── workflow_planner.py
```

# Files

--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------

```
3.11

```

--------------------------------------------------------------------------------
/.env.local:
--------------------------------------------------------------------------------

```
OS_API_KEY=xxxxxxx
STDIO_KEY=xxxxxxx
BEARER_TOKENS=xxxxxxx

```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info

# Virtual environments
.venv
.env
.DS_Store
.ruff_cache
uv.lock
pyrightconfig.json
*.log
.pytest_cache/
.mypy_cache/
```

--------------------------------------------------------------------------------
/.dockerignore:
--------------------------------------------------------------------------------

```
# Python artifacts
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg

# Virtual environments
.venv/
venv/
ENV/
env/
.env

# Testing and type checking
.pytest_cache/
.mypy_cache/
.coverage
htmlcov/
.tox/
.hypothesis/
*.cover
.coverage.*

# Development tools
.ruff_cache/
pyrightconfig.json
.vscode/
.idea/
*.swp
*.swo
*~

# OS files
.DS_Store
.DS_Store?
._*
.Spotlight-V100
.Trashes
ehthumbs.db
Thumbs.db

# Git
.git/
.gitignore
.gitattributes

# Documentation
*.md
docs/
LICENSE

# Build files
Makefile
makefile
dockerfile
Dockerfile

# Logs
*.log
logs/

# Package manager
uv.lock
poetry.lock
Pipfile.lock

# Test files
*_test.py
test_*.py
tests/
```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# Ordnance Survey MCP Server

A MCP server for accessing UK geospatial data through Ordnance Survey APIs.

## What it does

Provides LLM access to the Ordnance Survey's Data Hub APIs. 

Ask simple questions such as find me all cinemas in Leeds City Centre or use the prompts templates for more complex, speciifc use cases - relating to street works, planning, etc.

This MCP server enforces a 2 step workflow plan to ensure that the user gets the best results possible.

## Quick Start

### 1. Get an OS API Key

Register at [OS Data Hub](https://osdatahub.os.uk/) to get your free API key and set up a project.

### 2. Run with Docker and add to your Claude Desktop config (easiest)

```bash
Clone the repository:
git clone https://github.com/your-username/os-mcp-server.git
cd os-mcp-server
```

Then build the Docker image:

```bash
docker build -t os-mcp-server .
```

Add the following to your Claude Desktop config:

```json
{
  "mcpServers": {
    "os-mcp-server": {
      "command": "docker",
      "args": [
        "run",
        "--rm",
        "-i",
        "-e",
        "OS_API_KEY=your_api_key_here",
        "-e",
        "STDIO_KEY=any_value",
        "os-mcp-server"
      ]
    }
  }
}
```

Open Claude Desktop and you should now see all available tools, resources, and prompts.

## Requirements

- Python 3.11+
- OS API Key from [OS Data Hub](https://osdatahub.os.uk/)
- Set a STDIO_KEY env var which can be any value for now whilst auth is improved.

## License

MIT License. This project does not have the endorsement of Ordnance Survey. This is a personal project and not affiliated with Ordnance Survey. This is not a commercial product. It is actively being worked on so expect breaking changes and bugs.

```

--------------------------------------------------------------------------------
/src/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/src/api_service/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/src/mcp_service/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/src/middleware/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/src/prompt_templates/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/src/utils/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/src/workflow_generator/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[project]
name = "os-mcp"
version = "0.1.5"
description = "A Python MCP server that provides access to Ordnance Survey's DataHub APIs."
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
    "aiohttp>=3.11.18",
    "anthropic>=0.51.0",
    "fastapi>=0.116.1",
    "mcp>=1.12.0",
    "starlette>=0.47.1",
    "uvicorn>=0.35.0",
]

```

--------------------------------------------------------------------------------
/dockerfile:
--------------------------------------------------------------------------------

```dockerfile
FROM python:3.11-slim

WORKDIR /app

ENV PYTHONPATH=/app/src \
    PYTHONUNBUFFERED=1 \
    PIP_NO_CACHE_DIR=1 \
    PIP_DISABLE_PIP_VERSION_CHECK=1


RUN apt-get update && apt-get install -y \
    gcc \
    && rm -rf /var/lib/apt/lists/* \
    && apt-get clean

COPY pyproject.toml ./
RUN pip install --no-cache-dir .

COPY src/ ./src/

EXPOSE 8000

CMD ["python", "src/server.py", "--transport", "stdio"]

```

--------------------------------------------------------------------------------
/src/workflow_generator/workflow_planner.py:
--------------------------------------------------------------------------------

```python
from typing import Dict, Any, Optional, List
from models import OpenAPISpecification
from utils.logging_config import get_logger

logger = get_logger(__name__)


class WorkflowPlanner:
    """Context provider for LLM workflow planning"""

    def __init__(
        self,
        openapi_spec: Optional[OpenAPISpecification],
        basic_collections_info: Optional[Dict[str, Any]] = None,
    ):
        self.spec = openapi_spec
        self.basic_collections_info = basic_collections_info or {}
        self.detailed_collections_cache = {}

    def get_basic_context(self) -> Dict[str, Any]:
        """Get basic context for LLM to plan its workflow - no detailed queryables"""
        return {
            "available_collections": self.basic_collections_info,
            "openapi_spec": self.spec,
        }

    def get_detailed_context(self, collection_ids: List[str]) -> Dict[str, Any]:
        """Get detailed context for specific collections mentioned in the plan"""
        detailed_collections = {
            coll_id: self.detailed_collections_cache.get(coll_id)
            for coll_id in collection_ids
            if coll_id in self.detailed_collections_cache
        }

        return {
            "available_collections": detailed_collections,
            "openapi_spec": self.spec,
        }

```

--------------------------------------------------------------------------------
/src/api_service/protocols.py:
--------------------------------------------------------------------------------

```python
from typing import Protocol, Dict, List, Any, Optional, runtime_checkable
from models import (
    OpenAPISpecification,
    CollectionsCache,
    CollectionQueryables,
)


@runtime_checkable
class APIClient(Protocol):
    """Protocol for API clients"""

    async def initialise(self):
        """Initialise the aiohttp session if not already created"""
        ...

    async def close(self):
        """Close the aiohttp session"""
        ...

    async def get_api_key(self) -> str:
        """Get the API key"""
        ...

    async def make_request(
        self,
        endpoint: str,
        params: Optional[Dict[str, Any]] = None,
        path_params: Optional[List[str]] = None,
    ) -> Dict[str, Any]:
        """Make a request to an API endpoint"""
        ...

    async def make_request_no_auth(
        self,
        url: str,
        params: Optional[Dict[str, Any]] = None,
        max_retries: int = 2,
    ) -> str:
        """Make a request without authentication"""
        ...

    async def cache_openapi_spec(self) -> OpenAPISpecification:
        """Cache the OpenAPI spec"""
        ...

    async def cache_collections(self) -> CollectionsCache:
        """Cache the collections data"""
        ...

    async def fetch_collections_queryables(
        self, collection_ids: List[str]
    ) -> Dict[str, CollectionQueryables]:
        """Fetch detailed queryables for specific collections only"""
        ...

```

--------------------------------------------------------------------------------
/src/utils/logging_config.py:
--------------------------------------------------------------------------------

```python
import logging
import sys
import os
import re


class APIKeySanitisingFilter(logging.Filter):
    """Filter to sanitise API keys from log messages"""

    def __init__(self):
        super().__init__()
        self.patterns = [
            r"[?&]key=[^&\s]*",
            r"[?&]api_key=[^&\s]*",
            r"[?&]apikey=[^&\s]*",
            r"[?&]token=[^&\s]*",
        ]

    def filter(self, record: logging.LogRecord) -> bool:
        """Sanitise the log record message"""
        if hasattr(record, "msg") and isinstance(record.msg, str):
            record.msg = self._sanitise_text(record.msg)

        if hasattr(record, "args") and record.args:
            sanitised_args = []
            for arg in record.args:
                if isinstance(arg, str):
                    sanitised_args.append(self._sanitise_text(arg))
                else:
                    sanitised_args.append(arg)
            record.args = tuple(sanitised_args)

        return True

    def _sanitise_text(self, text: str) -> str:
        """Remove API keys from text"""
        sanitised = text
        for pattern in self.patterns:
            sanitised = re.sub(pattern, "", sanitised, flags=re.IGNORECASE)

        sanitised = re.sub(r"[?&]$", "", sanitised)
        sanitised = re.sub(r"&{2,}", "&", sanitised)
        sanitised = re.sub(r"\?&", "?", sanitised)

        return sanitised


def configure_logging(debug: bool = False) -> logging.Logger:
    """
    Configure logging for the entire application.

    Args:
        debug: Whether to enable debug logging or not
    """
    log_level = (
        logging.DEBUG if (debug or os.environ.get("DEBUG") == "1") else logging.INFO
    )

    formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    )

    root_logger = logging.getLogger()
    root_logger.setLevel(log_level)

    for handler in root_logger.handlers[:]:
        root_logger.removeHandler(handler)

    console_handler = logging.StreamHandler(sys.stderr)
    console_handler.setFormatter(formatter)
    console_handler.setLevel(log_level)

    api_key_filter = APIKeySanitisingFilter()
    console_handler.addFilter(api_key_filter)

    root_logger.addHandler(console_handler)

    logging.getLogger("uvicorn").propagate = False

    return root_logger


def get_logger(name: str) -> logging.Logger:
    """
    Get a logger for a module.

    Args:
        name: Module name (usually __name__)

    Returns:
        Logger instance
    """
    return logging.getLogger(name)

```

--------------------------------------------------------------------------------
/src/prompt_templates/prompt_templates.py:
--------------------------------------------------------------------------------

```python
PROMPT_TEMPLATES = {
    "usrn_breakdown": (
        "Break down USRN {usrn} into its component road links for routing analysis. "
        "Step 1: GET /collections/trn-ntwk-street-1/items?filter=usrn='{usrn}' to get street details. "
        "Step 2: Use Street geometry bbox to query Road Links: GET /collections/trn-ntwk-roadlink-4/items?bbox=[street_bbox] "
        "Step 3: Filter Road Links by Street reference using properties.street_ref matching street feature ID. "
        "Step 4: For each Road Link: GET /collections/trn-ntwk-roadnode-1/items?filter=roadlink_ref='roadlink_id' "
        "Step 5: Set crs=EPSG:27700 for British National Grid coordinates. "
        "Return: Complete breakdown of USRN into constituent Road Links with node connections."
    ),
    "restriction_matching_analysis": (
        "Perform comprehensive traffic restriction matching for road network in bbox {bbox} with SPECIFIC STREET IDENTIFICATION. "
        "Step 1: Build routing network: get_routing_data(bbox='{bbox}', limit={limit}, build_network=True) "
        "Step 2: Extract restriction data from build_status.restrictions array. "
        "Step 3: For each restriction, match to road links using restrictionnetworkreference: "
        "  - networkreferenceid = Road Link UUID from trn-ntwk-roadlink-4 "
        "  - roadlinkdirection = 'In Direction' (with geometry) or 'In Opposite Direction' (against geometry) "
        "  - roadlinksequence = order for multi-link restrictions (turns) "
        "Step 4: IMMEDIATELY lookup street names: Use get_bulk_features(collection_id='trn-ntwk-roadlink-4', identifiers=[road_link_uuids]) to resolve UUIDs to actual street names (name1_text, roadclassification, roadclassificationnumber) "
        "Step 5: Analyze restriction types by actual street names: "
        "  - One Way: Apply directional constraint to specific named road "
        "  - Turn Restriction: Block movement between named streets (from street A to street B) "
        "  - Vehicle Restrictions: Apply dimension/weight limits to specific named roads "
        "  - Access Restrictions: Apply vehicle type constraints to specific named streets "
        "Step 6: Check exemptions array (e.g., 'Pedal Cycles' exempt from one-way) for each named street "
        "Step 7: Group results by street names and road classifications (A Roads, B Roads, Local Roads) "
        "Return: Complete restriction mapping showing ACTUAL STREET NAMES with their specific restrictions, directions, exemptions, and road classifications. Present results as 'Street Name (Road Class)' rather than UUIDs."
    ),
}

```

--------------------------------------------------------------------------------
/src/mcp_service/resources.py:
--------------------------------------------------------------------------------

```python
"""MCP Resources for OS NGD documentation"""

import json
import time
from models import NGDAPIEndpoint
from utils.logging_config import get_logger

logger = get_logger(__name__)


# TODO: Do this for
class OSDocumentationResources:
    """Handles registration of OS NGD documentation resources"""

    def __init__(self, mcp_service, api_client):
        self.mcp = mcp_service
        self.api_client = api_client

    def register_all(self) -> None:
        """Register all documentation resources"""
        self._register_transport_network_resources()
        # Future: self._register_land_resources()
        # Future: self._register_building_resources()

    def _register_transport_network_resources(self) -> None:
        """Register transport network documentation resources"""

        @self.mcp.resource("os-docs://street")
        async def street_docs() -> str:
            return await self._fetch_doc_resource(
                "street", NGDAPIEndpoint.MARKDOWN_STREET.value
            )

        @self.mcp.resource("os-docs://road")
        async def road_docs() -> str:
            return await self._fetch_doc_resource(
                "road", NGDAPIEndpoint.MARKDOWN_ROAD.value
            )

        @self.mcp.resource("os-docs://tram-on-road")
        async def tram_on_road_docs() -> str:
            return await self._fetch_doc_resource(
                "tram-on-road", NGDAPIEndpoint.TRAM_ON_ROAD.value
            )

        @self.mcp.resource("os-docs://road-node")
        async def road_node_docs() -> str:
            return await self._fetch_doc_resource(
                "road-node", NGDAPIEndpoint.ROAD_NODE.value
            )

        @self.mcp.resource("os-docs://road-link")
        async def road_link_docs() -> str:
            return await self._fetch_doc_resource(
                "road-link", NGDAPIEndpoint.ROAD_LINK.value
            )

        @self.mcp.resource("os-docs://road-junction")
        async def road_junction_docs() -> str:
            return await self._fetch_doc_resource(
                "road-junction", NGDAPIEndpoint.ROAD_JUNCTION.value
            )

    async def _fetch_doc_resource(self, feature_type: str, url: str) -> str:
        """Generic method to fetch documentation resources"""
        try:
            content = await self.api_client.make_request_no_auth(url)

            return json.dumps(
                {
                    "feature_type": feature_type,
                    "content": content,
                    "content_type": "markdown",
                    "source_url": url,
                    "timestamp": time.time(),
                }
            )

        except Exception as e:
            logger.error(f"Error fetching {feature_type} documentation: {e}")
            return json.dumps({"error": str(e), "feature_type": feature_type})

```

--------------------------------------------------------------------------------
/src/mcp_service/prompts.py:
--------------------------------------------------------------------------------

```python
from typing import List
from mcp.types import PromptMessage, TextContent
from prompt_templates.prompt_templates import PROMPT_TEMPLATES
from utils.logging_config import get_logger

logger = get_logger(__name__)


class OSWorkflowPrompts:
    """Handles registration of OS NGD workflow prompts"""

    def __init__(self, mcp_service):
        self.mcp = mcp_service

    def register_all(self) -> None:
        """Register all workflow prompts"""
        self._register_analysis_prompts()
        self._register_general_prompts()

    def _register_analysis_prompts(self) -> None:
        """Register analysis workflow prompts"""

        @self.mcp.prompt()
        def usrn_breakdown_analysis(usrn: str) -> List[PromptMessage]:
            """Generate a step-by-step USRN breakdown workflow"""
            template = PROMPT_TEMPLATES["usrn_breakdown"].format(usrn=usrn)

            return [
                PromptMessage(
                    role="user",
                    content=TextContent(
                        type="text",
                        text=f"As an expert in OS NGD API workflows and transport network analysis, {template}",
                    ),
                )
            ]

    def _register_general_prompts(self) -> None:
        """Register general OS NGD guidance prompts"""

        @self.mcp.prompt()
        def collection_query_guidance(
            collection_id: str, query_type: str = "features"
        ) -> List[PromptMessage]:
            """Generate guidance for querying OS NGD collections"""
            return [
                PromptMessage(
                    role="user",
                    content=TextContent(
                        type="text",
                        text=f"As an OS NGD API expert, guide me through querying the '{collection_id}' collection for {query_type}. "
                        f"Include: 1) Available filters, 2) Best practices for bbox queries, "
                        f"3) CRS considerations, 4) Example queries with proper syntax.",
                    ),
                )
            ]

        @self.mcp.prompt()
        def workflow_planning(
            user_request: str, data_theme: str = "transport"
        ) -> List[PromptMessage]:
            """Generate a workflow plan for complex OS NGD queries"""
            return [
                PromptMessage(
                    role="user",
                    content=TextContent(
                        type="text",
                        text=f"As a geospatial workflow planner, create a detailed workflow plan for: '{user_request}'. "
                        f"Focus on {data_theme} theme data. Include: "
                        f"1) Collection selection rationale, "
                        f"2) Query sequence with dependencies, "
                        f"3) Filter strategies, "
                        f"4) Error handling considerations.",
                    ),
                )
            ]

```

--------------------------------------------------------------------------------
/src/config_docs/ogcapi-features-2.yaml:
--------------------------------------------------------------------------------

```yaml
openapi: 3.0.3
info:
  title: "Building Blocks specified in OGC API - Features - Part 2: Coordinate Reference Systems by Reference"
  description: |-
    Common components used in the
    [OGC standard "OGC API - Features - Part 2: Coordinate Reference Systems by Reference"](http://docs.opengeospatial.org/is/18-058/18-058.html).

    OGC API - Features - Part 2: Coordinate Reference Systems by Reference 1.0 is an OGC Standard.
    Copyright (c) 2020 Open Geospatial Consortium.
    To obtain additional rights of use, visit http://www.opengeospatial.org/legal/ .

    This document is also available on
    [OGC](http://schemas.opengis.net/ogcapi/features/part2/1.0/openapi/ogcapi-features-2.yaml).
  version: '1.0.0'
  contact:
    name: Clemens Portele
    email: [email protected]
  license:
    name: OGC License
    url: 'http://www.opengeospatial.org/legal/'
components:
  parameters:
    bbox-crs:
      name: bbox-crs
      in: query
      required: false
      schema:
        type: string
        format: uri
      style: form
      explode: false
    crs:
      name: crs
      in: query
      required: false
      schema:
        type: string
        format: uri
      style: form
      explode: false
  schemas:
    collection:
      allOf:
      - $ref: http://schemas.opengis.net/ogcapi/features/part1/1.0/openapi/schemas/collection.yaml
      - $ref: '#/components/schemas/collectionExtensionCrs'
    collectionExtensionCrs:
      type: object
      properties:
        storageCrs:
          description: the CRS identifier, from the list of supported CRS identifiers, that may be used to retrieve features from a collection without the need to apply a CRS transformation
          type: string
          format: uri
        storageCrsCoordinateEpoch:
          description: point in time at which coordinates in the spatial feature collection are referenced to the dynamic coordinate reference system in `storageCrs`, that may be used to retrieve features from a collection without the need to apply a change of coordinate epoch. It is expressed as a decimal year in the Gregorian calendar
          type: number
          example: '2017-03-25 in the Gregorian calendar is epoch 2017.23'
    collections:
      allOf:
      - $ref: http://schemas.opengis.net/ogcapi/features/part1/1.0/openapi/schemas/collections.yaml
      - $ref: '#/components/schemas/collectionsExtensionCrs'
    collectionsExtensionCrs:
      type: object
      properties:
        crs:
          description: a global list of CRS identifiers that are supported by spatial feature collections offered by the service
          type: array
          items:
            type: string
            format: uri
  headers:
    Content-Crs:
      description: a URI, in angular brackets, identifying the coordinate reference system used in the content / payload
      schema:
        type: string
      example: '<http://www.opengis.net/def/crs/EPSG/0/3395>'

```

--------------------------------------------------------------------------------
/src/mcp_service/protocols.py:
--------------------------------------------------------------------------------

```python
from typing import Protocol, Optional, Callable, List, runtime_checkable, Any


@runtime_checkable
class MCPService(Protocol):
    """Protocol for MCP services"""

    def tool(self) -> Callable[..., Any]:
        """Register a function as an MCP tool"""
        ...

    def resource(
        self,
        uri: str,
        *,
        name: str | None = None,
        title: str | None = None,
        description: str | None = None,
        mime_type: str | None = None,
    ) -> Callable[[Any], Any]:
        """Register a function as an MCP resource"""
        ...

    def run(self) -> None:
        """Run the MCP service"""
        ...


@runtime_checkable
class FeatureService(Protocol):
    """Protocol for OS NGD feature services"""

    def hello_world(self, name: str) -> str:
        """Test connection to the service"""
        ...

    def check_api_key(self) -> str:
        """Check if API key is available"""
        ...

    async def list_collections(self) -> str:
        """List all available feature collections"""
        ...

    async def get_single_collection(self, collection_id: str) -> str:
        """Get detailed information about a specific collection"""
        ...

    async def get_single_collection_queryables(self, collection_id: str) -> str:
        """Get queryable properties for a collection"""
        ...

    # TODO: Need to make sure the full list of parameters is supported
    # TODO: Supporting cql-text is clunky and need to figure out how to support this better
    async def search_features(
        self,
        collection_id: str,
        bbox: Optional[str] = None,
        crs: Optional[str] = None,
        limit: int = 10,
        offset: int = 0,
        filter: Optional[str] = None,
        filter_lang: Optional[str] = "cql-text",
        query_attr: Optional[str] = None,
        query_attr_value: Optional[str] = None,
    ) -> str:
        """Search for features in a collection with full CQL filter support"""
        ...

    async def get_feature(
        self, collection_id: str, feature_id: str, crs: Optional[str] = None
    ) -> str:
        """Get a specific feature by ID"""
        ...

    async def get_linked_identifiers(
        self, identifier_type: str, identifier: str, feature_type: Optional[str] = None
    ) -> str:
        """Get linked identifiers for a specified identifier"""
        ...

    async def get_bulk_features(
        self,
        collection_id: str,
        identifiers: List[str],
        query_by_attr: Optional[str] = None,
    ) -> str:
        """Get multiple features in a single call"""
        ...

    async def get_bulk_linked_features(
        self,
        identifier_type: str,
        identifiers: List[str],
        feature_type: Optional[str] = None,
    ) -> str:
        """Get linked features for multiple identifiers"""
        ...

    async def fetch_detailed_collections(self, collection_ids: List[str]) -> str:
        """Get detailed information about specific collections for workflow planning"""
        ...

```

--------------------------------------------------------------------------------
/src/middleware/stdio_middleware.py:
--------------------------------------------------------------------------------

```python
import json
import time
import asyncio
from collections import deque
from typing import Callable, TypeVar, Any, Union, cast
from functools import wraps
from utils.logging_config import get_logger

logger = get_logger(__name__)
F = TypeVar("F", bound=Callable[..., Any])


class StdioRateLimiter:
    """STDIO-specific rate limiting"""

    def __init__(self, requests_per_minute: int = 10, window_seconds: int = 60):
        self.requests_per_minute = requests_per_minute
        self.window_seconds = window_seconds
        self.request_timestamps = deque()

    def check_rate_limit(self) -> bool:
        """Check rate limit for STDIO client"""
        current_time = time.time()

        while (
            self.request_timestamps
            and current_time - self.request_timestamps[0] >= self.window_seconds
        ):
            self.request_timestamps.popleft()

        if len(self.request_timestamps) >= self.requests_per_minute:
            logger.warning("STDIO rate limit exceeded")
            return False

        self.request_timestamps.append(current_time)
        return True


class StdioMiddleware:
    """STDIO authentication and rate limiting"""

    def __init__(self, requests_per_minute: int = 20):
        self.authenticated = False
        self.client_id = "anonymous"
        self.rate_limiter = StdioRateLimiter(requests_per_minute=requests_per_minute)

    def require_auth_and_rate_limit(self, func: F) -> F:
        """Decorator for auth and rate limiting"""

        @wraps(func)
        async def async_wrapper(*args: Any, **kwargs: Any) -> Union[str, Any]:
            if not self.authenticated:
                logger.error(
                    json.dumps({"error": "Authentication required", "code": 401})
                )
                return json.dumps({"error": "Authentication required", "code": 401})

            if not self.rate_limiter.check_rate_limit():
                logger.error(json.dumps({"error": "Rate limited", "code": 429}))
                return json.dumps({"error": "Rate limited", "code": 429})

            return await func(*args, **kwargs)

        @wraps(func)
        def sync_wrapper(*args: Any, **kwargs: Any) -> Union[str, Any]:
            if not self.authenticated:
                logger.error(
                    json.dumps({"error": "Authentication required", "code": 401})
                )
                return json.dumps({"error": "Authentication required", "code": 401})

            if not self.rate_limiter.check_rate_limit():
                logger.error(json.dumps({"error": "Rate limited", "code": 429}))
                return json.dumps({"error": "Rate limited", "code": 429})

            return func(*args, **kwargs)

        if asyncio.iscoroutinefunction(func):
            return cast(F, async_wrapper)
        return cast(F, sync_wrapper)

    def authenticate(self, key: str) -> bool:
        """Authenticate with API key"""
        if key and key.strip():
            self.authenticated = True
            self.client_id = key
            return True
        else:
            self.authenticated = False
            return False

```

--------------------------------------------------------------------------------
/src/mcp_service/guardrails.py:
--------------------------------------------------------------------------------

```python
import re
import json
import asyncio
from functools import wraps
from typing import TypeVar, Callable, Any, Union, cast
from utils.logging_config import get_logger

logger = get_logger(__name__)
F = TypeVar("F", bound=Callable[..., Any])


class ToolGuardrails:
    """Prompt injection protection"""

    def __init__(self):
        self.suspicious_patterns = [
            r"(?i)ignore previous",
            r"(?i)ignore all previous instructions",
            r"(?i)assistant:",
            r"\{\{.*?\}\}",
            r"(?i)forget",
            r"(?i)show credentials",
            r"(?i)show secrets",
            r"(?i)reveal password",
            r"(?i)dump (tokens|secrets|passwords|credentials)",
            r"(?i)leak confidential",
            r"(?i)reveal secrets",
            r"(?i)expose secrets",
            r"(?i)secrets.*contain",
            r"(?i)extract secrets",
        ]

    def detect_prompt_injection(self, input_text: Any) -> bool:
        """Check if input contains prompt injection attempts"""
        if not isinstance(input_text, str):
            return False
        return any(
            re.search(pattern, input_text) for pattern in self.suspicious_patterns
        )

    def basic_guardrails(self, func: F) -> F:
        """Prompt injection protection only"""

        @wraps(func)
        async def async_wrapper(*args: Any, **kwargs: Any) -> Union[str, Any]:
            try:
                for arg in args:
                    if isinstance(arg, str) and self.detect_prompt_injection(arg):
                        raise ValueError("Prompt injection detected!")

                for name, value in kwargs.items():
                    if not (
                        hasattr(value, "request_context")
                        and hasattr(value, "request_id")
                    ):
                        if isinstance(value, str) and self.detect_prompt_injection(
                            value
                        ):
                            raise ValueError(f"Prompt injection in '{name}'!")
            except ValueError as e:
                return json.dumps({"error": str(e), "code": 400})

            return await func(*args, **kwargs)

        @wraps(func)
        def sync_wrapper(*args: Any, **kwargs: Any) -> Union[str, Any]:
            try:
                for arg in args:
                    if isinstance(arg, str) and self.detect_prompt_injection(arg):
                        raise ValueError("Prompt injection detected!")

                for name, value in kwargs.items():
                    if not (
                        hasattr(value, "request_context")
                        and hasattr(value, "request_id")
                    ):
                        if isinstance(value, str) and self.detect_prompt_injection(
                            value
                        ):
                            raise ValueError(f"Prompt injection in '{name}'!")
            except ValueError as e:
                return json.dumps({"error": str(e), "code": 400})

            return func(*args, **kwargs)

        if asyncio.iscoroutinefunction(func):
            return cast(F, async_wrapper)
        return cast(F, sync_wrapper)

```

--------------------------------------------------------------------------------
/src/models.py:
--------------------------------------------------------------------------------

```python
"""
Types for the OS NGD API MCP Server
"""

from enum import Enum
from pydantic import BaseModel
from typing import Any, List, Dict


class NGDAPIEndpoint(Enum):
    """
    Enum for the OS API Endpoints following OGC API Features standard
    """

    # NGD Features Endpoints
    NGD_FEATURES_BASE_PATH = "https://api.os.uk/features/ngd/ofa/v1/{}"
    COLLECTIONS = NGD_FEATURES_BASE_PATH.format("collections")
    COLLECTION_INFO = NGD_FEATURES_BASE_PATH.format("collections/{}")
    COLLECTION_SCHEMA = NGD_FEATURES_BASE_PATH.format("collections/{}/schema")
    COLLECTION_FEATURES = NGD_FEATURES_BASE_PATH.format("collections/{}/items")
    COLLECTION_FEATURE_BY_ID = NGD_FEATURES_BASE_PATH.format("collections/{}/items/{}")
    COLLECTION_QUERYABLES = NGD_FEATURES_BASE_PATH.format("collections/{}/queryables")

    # OpenAPI Specification Endpoint
    OPENAPI_SPEC = NGD_FEATURES_BASE_PATH.format("api")

    # Linked Identifiers Endpoints
    LINKED_IDENTIFIERS_BASE_PATH = "https://api.os.uk/search/links/v1/{}"
    LINKED_IDENTIFIERS = LINKED_IDENTIFIERS_BASE_PATH.format("identifierTypes/{}/{}")

    # Markdown Resources
    MARKDOWN_BASE_PATH = "https://docs.os.uk/osngd/data-structure/{}"
    MARKDOWN_STREET = MARKDOWN_BASE_PATH.format("transport/transport-network/street.md")
    MARKDOWN_ROAD = MARKDOWN_BASE_PATH.format("transport/transport-network/road.md")
    TRAM_ON_ROAD = MARKDOWN_BASE_PATH.format(
        "transport/transport-network/tram-on-road.md"
    )
    ROAD_NODE = MARKDOWN_BASE_PATH.format("transport/transport-network/road-node.md")
    ROAD_LINK = MARKDOWN_BASE_PATH.format("transport/transport-network/road-link.md")
    ROAD_JUNCTION = MARKDOWN_BASE_PATH.format(
        "transport/transport-network/road-junction.md"
    )

    # Places API Endpoints
    # TODO: Add these back in when I get access to the Places API from OS
    # PLACES_BASE_PATH = "https://api.os.uk/search/places/v1/{}"
    # PLACES_UPRN = PLACES_BASE_PATH.format("uprn")
    # POST_CODE = PLACES_BASE_PATH.format("postcode")


class OpenAPISpecification(BaseModel):
    """Parsed OpenAPI specification optimized for LLM context"""

    title: str
    version: str
    base_url: str
    endpoints: Dict[str, str]
    collection_ids: List[str]
    supported_crs: Dict[str, Any]
    crs_guide: Dict[str, str]


class WorkflowStep(BaseModel):
    """A single step in a workflow plan"""

    step_number: int
    description: str
    api_endpoint: str
    parameters: Dict[str, Any]
    dependencies: List[int] = []


class WorkflowPlan(BaseModel):
    """Generated workflow plan for user requests"""

    user_request: str
    steps: List[WorkflowStep]
    reasoning: str
    estimated_complexity: str = "simple"


class Collection(BaseModel):
    """Represents a feature collection from the OS NGD API"""

    id: str
    title: str
    description: str = ""
    links: List[Dict[str, Any]] = []
    extent: Dict[str, Any] = {}
    itemType: str = "feature"


class CollectionsCache(BaseModel):
    """Cached collections data with filtering applied"""

    collections: List[Collection]
    raw_response: Dict[str, Any]


class CollectionQueryables(BaseModel):
    """Queryables information for a collection"""

    id: str
    title: str
    description: str
    all_queryables: Dict[str, Any]
    enum_queryables: Dict[str, Any]
    has_enum_filters: bool
    total_queryables: int
    enum_count: int


class WorkflowContextCache(BaseModel):
    """Cached workflow context data"""

    collections_info: Dict[str, CollectionQueryables]
    openapi_spec: OpenAPISpecification
    cached_at: float

```

--------------------------------------------------------------------------------
/src/server.py:
--------------------------------------------------------------------------------

```python
import argparse
import os
import uvicorn
from typing import Any
from utils.logging_config import configure_logging

from api_service.os_api import OSAPIClient
from mcp_service.os_service import OSDataHubService
from mcp.server.fastmcp import FastMCP
from middleware.stdio_middleware import StdioMiddleware
from middleware.http_middleware import HTTPMiddleware
from starlette.middleware import Middleware
from starlette.middleware.cors import CORSMiddleware
from starlette.routing import Route
from starlette.responses import JSONResponse

logger = configure_logging()


def main():
    """Main entry point"""
    parser = argparse.ArgumentParser(description="OS DataHub API MCP Server")
    parser.add_argument(
        "--transport",
        choices=["stdio", "streamable-http"],
        default="stdio",
        help="Transport protocol to use (stdio or streamable-http)",
    )
    parser.add_argument(
        "--host", default="0.0.0.0", help="Host to bind to (default: 0.0.0.0)"
    )
    parser.add_argument(
        "--port", type=int, default=8000, help="Port to bind to (default: 8000)"
    )
    parser.add_argument("--debug", action="store_true", help="Enable debug mode")
    args = parser.parse_args()

    configure_logging(debug=args.debug)

    logger.info(
        f"OS DataHub API MCP Server starting with {args.transport} transport..."
    )

    api_client = OSAPIClient()

    match args.transport:
        case "stdio":
            logger.info("Starting with stdio transport")

            mcp = FastMCP(
                "os-ngd-api",
                debug=args.debug,
                log_level="DEBUG" if args.debug else "INFO",
            )

            stdio_auth = StdioMiddleware()

            service = OSDataHubService(api_client, mcp, stdio_middleware=stdio_auth)

            stdio_api_key = os.environ.get("STDIO_KEY")
            if not stdio_api_key or not stdio_auth.authenticate(stdio_api_key):
                logger.error("Authentication failed")
                return

            service.run()

        case "streamable-http":
            logger.info(f"Starting Streamable HTTP server on {args.host}:{args.port}")

            mcp = FastMCP(
                "os-ngd-api",
                host=args.host,
                port=args.port,
                debug=args.debug,
                json_response=True,
                stateless_http=False,
                log_level="DEBUG" if args.debug else "INFO",
            )

            OSDataHubService(api_client, mcp)

            async def auth_discovery(_: Any) -> JSONResponse:
                """Return authentication methods."""
                return JSONResponse(
                    content={"authMethods": [{"type": "http", "scheme": "bearer"}]}
                )

            app = mcp.streamable_http_app()

            app.routes.append(
                Route(
                    "/.well-known/mcp-auth",
                    endpoint=auth_discovery,
                    methods=["GET"],
                )
            )

            app.user_middleware.extend(
                [
                    Middleware(
                        CORSMiddleware,
                        allow_origins=["*"],
                        allow_credentials=True,
                        allow_methods=["GET", "POST", "OPTIONS"],
                        allow_headers=["*"],
                        expose_headers=["*"],
                    ),
                    Middleware(HTTPMiddleware),
                ]
            )

            uvicorn.run(
                app,
                host=args.host,
                port=args.port,
                log_level="debug" if args.debug else "info",
            )

        case _:
            logger.error(f"Unknown transport: {args.transport}")
            return


if __name__ == "__main__":
    main()

```

--------------------------------------------------------------------------------
/src/config_docs/ogcapi-features-3.yaml:
--------------------------------------------------------------------------------

```yaml
openapi: 3.0.3
info:
  title: "Building Blocks specified in OGC API - Features - Part 3: Filtering"
  description: |-
    Common components used in the
    [OGC standard "OGC API - Features - Part 3: Filtering"](https://docs.ogc.org/is/19-079r2/19-079r2.html).

    OGC API - Features - Part 3: Filtering 1.0 is an OGC Standard.
    Copyright (c) 2024 Open Geospatial Consortium.
    To obtain additional rights of use, visit https://www.ogc.org/legal/ .

    This document is also available in the
    [OGC Schema Repository](https://schemas.opengis.net/ogcapi/features/part3/1.0/openapi/ogcapi-features-3.yaml).
  version: '1.0.0'
  contact:
    name: Clemens Portele
    email: [email protected]
  license:
    name: OGC License
    url: 'https://www.ogc.org/legal/'
paths:
  /collections/{collectionId}/queryables:
    get:
      summary: Get the list of supported queryables for a collection
      description: |-
        This operation returns the list of supported queryables of a collection. Queryables are 
        the properties that may be used to construct a filter expression on items in the collection.
        The response is a JSON Schema of a object where each property is a queryable.
      operationId: getQueryables
      parameters:
        - $ref: 'https://schemas.opengis.net/ogcapi/features/part1/1.0/openapi/parameters/collectionId.yaml'
      responses:
        '200':
          description: The queryable properties of the collection.
          content:
            application/schema+json:
              schema:
                type: object
  /functions:
    get:
      summary: Get the list of supported functions
      description: |-
        This operation returns the list of custom functions supported in CQL2 expressions.
      operationId: getFunctions
      responses:
        '200':
          description: The list of custom functions supported in CQL2 expressions.
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/functions'
components:
  parameters:
    filter:
      name: filter
      in: query
      required: false
      schema:
        type: string
      style: form
      explode: false
    filter-lang:
      name: filter-lang
      in: query
      required: false
      schema:
        type: string
        enum:
          - 'cql2-text'
          - 'cql2-json'
        default: 'cql2-text'
      style: form
    filter-crs:
      name: filter-crs
      in: query
      required: false
      schema:
        type: string
        format: uri-reference
      style: form
      explode: false
  schemas:
    functions:
      type: object
      required:
      - functions
      properties:
        functions:
          type: array
          items:
            type: object
            required:
            - name
            - returns
            properties:
              name:
                type: string
              description:
                type: string
              metadataUrl:
                type: string
                format: uri-reference
              arguments:
                type: array
                items:
                  type: object
                  required:
                  - type
                  properties:
                    title:
                      type: string
                    description:
                      type: string
                    type:
                      type: array
                      items:
                        type: string
                        enum:
                        - string
                        - number
                        - integer
                        - datetime
                        - geometry
                        - boolean
              returns:
                type: array
                items:
                  type: string
                  enum:
                  - string
                  - number
                  - integer
                  - datetime
                  - geometry
                  - boolean

```

--------------------------------------------------------------------------------
/src/middleware/http_middleware.py:
--------------------------------------------------------------------------------

```python
import os
import time
from collections import defaultdict, deque
from typing import List, Callable, Awaitable
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import Response, JSONResponse
from utils.logging_config import get_logger

logger = get_logger(__name__)


class RateLimiter:
    """HTTP-layer rate limiting"""

    def __init__(self, requests_per_minute: int = 10, window_seconds: int = 60):
        self.requests_per_minute = requests_per_minute
        self.window_seconds = window_seconds
        self.request_timestamps = defaultdict(lambda: deque())

    def check_rate_limit(self, client_id: str) -> bool:
        """Check if client has exceeded rate limit"""
        current_time = time.time()
        timestamps = self.request_timestamps[client_id]

        while timestamps and current_time - timestamps[0] >= self.window_seconds:
            timestamps.popleft()

        if len(timestamps) >= self.requests_per_minute:
            logger.warning(f"HTTP rate limit exceeded for client {client_id}")
            return False

        timestamps.append(current_time)
        return True


def get_valid_bearer_tokens() -> List[str]:
    """Get valid bearer tokens from environment variable."""
    try:
        tokens = os.environ.get("BEARER_TOKENS", "").split(",")
        valid_tokens = [t.strip() for t in tokens if t.strip()]

        if not valid_tokens:
            logger.warning(
                "No BEARER_TOKENS configured, all authentication will be rejected"
            )
            return []

        return valid_tokens
    except Exception as e:
        logger.error(f"Error getting valid tokens: {e}")
        return []


async def verify_bearer_token(token: str) -> bool:
    """Verify bearer token is valid."""
    try:
        valid_tokens = get_valid_bearer_tokens()
        if not valid_tokens or not token:
            return False
        return token in valid_tokens
    except Exception as e:
        logger.error(f"Error validating token: {e}")
        return False


class HTTPMiddleware(BaseHTTPMiddleware):
    def __init__(self, app, requests_per_minute: int = 10):
        super().__init__(app)
        self.rate_limiter = RateLimiter(requests_per_minute=requests_per_minute)

    async def dispatch(
        self, request: Request, call_next: Callable[[Request], Awaitable[Response]]
    ) -> Response:
        if request.url.path == "/.well-known/mcp-auth" or request.method == "OPTIONS":
            return await call_next(request)

        session_id = request.headers.get("mcp-session-id")
        if not session_id:
            client_ip = request.client.host if request.client else "unknown"
            session_id = f"ip-{client_ip}"

        if not self.rate_limiter.check_rate_limit(session_id):
            return JSONResponse(
                status_code=429,
                content={"detail": "Too many requests. Please try again later."},
                headers={"Retry-After": "60"},
            )

        origin = request.headers.get("origin", "")
        if origin and not self._is_valid_origin(origin, request):
            client_ip = request.client.host if request.client else "unknown"
            logger.warning(
                f"Blocked request with suspicious origin from {client_ip}, Origin: {origin}"
            )
            return JSONResponse(status_code=403, content={"detail": "Invalid origin"})

        user_agent = request.headers.get("user-agent", "")
        if self._is_browser_plugin(user_agent, request):
            client_ip = request.client.host if request.client else "unknown"
            logger.warning(
                f"Blocked browser plugin access from {client_ip}, User-Agent: {user_agent}"
            )
            return JSONResponse(
                status_code=403,
                content={"detail": "Browser plugin access is not allowed"},
            )

        auth_header = request.headers.get("Authorization")
        if auth_header and auth_header.startswith("Bearer "):
            token = auth_header.replace("Bearer ", "")
            if await verify_bearer_token(token):
                request.state.token = token
                return await call_next(request)
            else:
                logger.warning(
                    f"Invalid bearer token attempt from {request.client.host if request.client else 'unknown'}"
                )
        else:
            logger.warning(
                f"Missing or invalid Authorization header from {request.client.host if request.client else 'unknown'}"
            )

        return JSONResponse(
            status_code=401,
            content={"detail": "Authentication required"},
            headers={"WWW-Authenticate": "Bearer"},
        )

    def _is_valid_origin(self, origin: str, request: Request) -> bool:
        """Validate Origin header to prevent DNS rebinding attacks."""
        valid_local_origins = ["http://localhost:", "http://127.0.0.1:"]
        valid_domains = os.environ.get("ALLOWED_ORIGINS", "").split(",")
        valid_domains = [d.strip() for d in valid_domains if d.strip()]

        for valid_origin in valid_local_origins:
            if origin.startswith(valid_origin):
                return True

        for domain in valid_domains:
            if (
                origin == domain
                or origin.startswith(f"https://{domain}")
                or origin.startswith(f"http://{domain}")
            ):
                return True

        return False

    def _is_browser_plugin(self, user_agent: str, request: Request) -> bool:
        """Check if request is from a browser plugin."""
        plugin_patterns = [
            "Chrome-Extension",
            "Mozilla/5.0 (compatible; Extension)",
            "Browser-Extension",
        ]

        for pattern in plugin_patterns:
            if pattern.lower() in user_agent.lower():
                return True

        origin = request.headers.get("origin", "")
        if origin and (
            origin.startswith("chrome-extension://")
            or origin.startswith("moz-extension://")
            or origin.startswith("safari-extension://")
        ):
            return True

        return False

```

--------------------------------------------------------------------------------
/src/stdio_client_test.py:
--------------------------------------------------------------------------------

```python
import asyncio
import subprocess
import sys
import os
import time
from mcp import ClientSession
from mcp.client.stdio import stdio_client
from mcp.client.stdio import StdioServerParameters
from mcp.types import TextContent


def extract_text_from_result(result) -> str:
    """Safely extract text from MCP tool result"""
    if not result.content:
        return "No content"

    for item in result.content:
        if isinstance(item, TextContent):
            return item.text

    content_types = [type(item).__name__ for item in result.content]
    return f"Non-text content: {', '.join(content_types)}"


async def test_stdio_rate_limiting():
    """Test STDIO rate limiting - should block after 1 request per minute"""
    env = os.environ.copy()
    env["STDIO_KEY"] = "test-stdio-key"
    env["OS_API_KEY"] = os.environ.get("OS_API_KEY", "dummy-key-for-testing")
    env["PYTHONPATH"] = "src"

    print("Starting STDIO server subprocess...")

    server_process = subprocess.Popen(
        [sys.executable, "src/server.py", "--transport", "stdio", "--debug"],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        env=env,
        text=True,
        bufsize=0,
    )

    try:
        print("Connecting to STDIO server...")

        server_params = StdioServerParameters(
            command=sys.executable,
            args=["src/server.py", "--transport", "stdio", "--debug"],
            env=env,
        )
        async with stdio_client(server_params) as (read_stream, write_stream):
            async with ClientSession(read_stream, write_stream) as session:
                print("Initializing session...")
                await session.initialize()

                print("Listing available tools...")
                tools = await session.list_tools()
                print(f"   Found {len(tools.tools)} tools")

                print("\nRAPID FIRE TEST (should hit rate limit)")
                print("-" * 50)

                results = []
                for i in range(3):
                    try:
                        start_time = time.time()
                        print(f"Request #{i + 1}: ", end="", flush=True)

                        result = await session.call_tool(
                            "hello_world", {"name": f"TestUser{i + 1}"}
                        )
                        elapsed = time.time() - start_time

                        result_text = extract_text_from_result(result)

                        if "rate limit" in result_text.lower() or "429" in result_text:
                            print(f"BLOCKED ({elapsed:.1f}s) - {result_text}")
                            results.append(("BLOCKED", elapsed, result_text))
                        else:
                            print(f"SUCCESS ({elapsed:.1f}s) - {result_text}")
                            results.append(("SUCCESS", elapsed, result_text))

                    except Exception as e:
                        elapsed = time.time() - start_time
                        print(f"ERROR ({elapsed:.1f}s) - {e}")
                        results.append(("ERROR", elapsed, str(e)))

                    if i < 2:
                        await asyncio.sleep(0.1)

                print("\nRESULTS ANALYSIS")
                print("-" * 50)

                success_count = sum(1 for r in results if r[0] == "SUCCESS")
                blocked_count = sum(1 for r in results if r[0] == "BLOCKED")
                error_count = sum(1 for r in results if r[0] == "ERROR")

                print(f"Successful requests: {success_count}")
                print(f"Rate limited requests: {blocked_count}")
                print(f"Error requests: {error_count}")

                print("\nVISUAL TIMELINE:")
                timeline = ""
                for i, (status, elapsed, _) in enumerate(results):
                    if status == "SUCCESS":
                        timeline += "OK"
                    elif status == "BLOCKED":
                        timeline += "XX"
                    else:
                        timeline += "ER"

                    if i < len(results) - 1:
                        timeline += "--"

                print(f"   {timeline}")
                print("   Request: 1   2   3")

                print("\nTEST VERDICT")
                print("-" * 50)

                if success_count == 1 and blocked_count >= 1:
                    print("PASS: Rate limiting works correctly!")
                    print("   First request succeeded")
                    print("   Subsequent requests blocked")
                elif success_count > 1:
                    print("FAIL: Rate limiting too permissive!")
                    print(f"   {success_count} requests succeeded (expected 1)")
                else:
                    print("FAIL: No requests succeeded!")
                    print("   Check authentication or server issues")

                print("\nWAITING FOR RATE LIMIT RESET...")
                print("   (Testing if limit resets after window)")
                print("   Waiting 10 seconds...")

                for countdown in range(10, 0, -1):
                    print(f"   {countdown}s remaining...", end="\r", flush=True)
                    await asyncio.sleep(1)

                print("\nTesting after rate limit window...")
                try:
                    result = await session.call_tool(
                        "hello_world", {"name": "AfterWait"}
                    )
                    result_text = extract_text_from_result(result)

                    if "rate limit" not in result_text.lower():
                        print("SUCCESS: Rate limit properly reset!")
                    else:
                        print("Rate limit still active (might need longer wait)")

                except Exception as e:
                    print(f"Error after wait: {e}")

    except Exception as e:
        print(f"Test failed with error: {e}")

    finally:
        print("\nCleaning up server process...")
        server_process.terminate()
        try:
            await asyncio.wait_for(
                asyncio.create_task(asyncio.to_thread(server_process.wait)), timeout=5.0
            )
            print("Server terminated gracefully")
        except asyncio.TimeoutError:
            print("Force killing server...")
            server_process.kill()
            server_process.wait()


if __name__ == "__main__":
    print("STDIO Rate Limit Test Suite")
    print("Testing if STDIO middleware properly blocks > 1 request/minute")
    print()

    if not os.environ.get("OS_API_KEY"):
        print("Warning: OS_API_KEY not set, using dummy key for testing")
        print("   (This is OK for rate limit testing)")
        print()

    asyncio.run(test_stdio_rate_limiting())

```

--------------------------------------------------------------------------------
/src/http_client_test.py:
--------------------------------------------------------------------------------

```python
import asyncio
import logging
from mcp.client.streamable_http import streamablehttp_client
from mcp import ClientSession
from mcp.types import TextContent

logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger(__name__)


def extract_text_from_result(result) -> str:
    """Safely extract text from MCP tool result"""
    if not result.content:
        return "No content"

    for item in result.content:
        if isinstance(item, TextContent):
            return item.text

    content_types = [type(item).__name__ for item in result.content]
    return f"Non-text content: {', '.join(content_types)}"


async def test_usrn_search(session_name: str, usrn_values: list):
    """Test USRN searches with different values"""
    headers = {"Authorization": "Bearer dev-token"}
    results = []
    session_id = None

    print(f"\n{session_name} - Testing USRN searches")
    print("-" * 40)

    try:
        async with streamablehttp_client(
            "http://127.0.0.1:8000/mcp", headers=headers
        ) as (
            read_stream,
            write_stream,
            get_session_id,
        ):
            async with ClientSession(read_stream, write_stream) as session:
                await session.initialize()
                session_id = get_session_id()
                print(f"{session_name} Session ID: {session_id}")

                for i, usrn in enumerate(usrn_values):
                    try:
                        result = await session.call_tool(
                            "search_features",
                            {
                                "collection_id": "trn-ntwk-street-1",
                                "query_attr": "usrn",
                                "query_attr_value": str(usrn),
                                "limit": 5,
                            },
                        )
                        result_text = extract_text_from_result(result)
                        print(f"  USRN {usrn}: SUCCESS - Found data")
                        print(result_text)
                        results.append(("SUCCESS", usrn, result_text[:100] + "..."))
                        await asyncio.sleep(0.1)

                    except Exception as e:
                        if "429" in str(e) or "Too Many Requests" in str(e):
                            print(f"  USRN {usrn}: BLOCKED - Rate limited")
                            results.append(("BLOCKED", usrn, str(e)))
                        else:
                            print(f"  USRN {usrn}: ERROR - {e}")
                            results.append(("ERROR", usrn, str(e)))

    except Exception as e:
        print(f"{session_name}: Connection error - {str(e)[:100]}")
        if len(results) == 0:
            results = [("ERROR", "connection", str(e))] * len(usrn_values)

    return results, session_id


async def test_usrn_calls():
    """Test calling USRN searches with different values"""
    print("USRN Search Test - Two Different USRNs")
    print("=" * 50)

    # Different USRN values to test
    usrn_values_1 = ["24501091", "24502114"]
    usrn_values_2 = ["24502114", "24501091"]

    results_a, session_id_a = await test_usrn_search("SESSION-A", usrn_values_1)
    await asyncio.sleep(0.5)
    results_b, session_id_b = await test_usrn_search("SESSION-B", usrn_values_2)

    # Analyze results
    print("\nRESULTS SUMMARY")
    print("=" * 30)

    success_a = len([r for r in results_a if r[0] == "SUCCESS"])
    blocked_a = len([r for r in results_a if r[0] == "BLOCKED"])
    error_a = len([r for r in results_a if r[0] == "ERROR"])

    success_b = len([r for r in results_b if r[0] == "SUCCESS"])
    blocked_b = len([r for r in results_b if r[0] == "BLOCKED"])
    error_b = len([r for r in results_b if r[0] == "ERROR"])

    print(f"SESSION-A: {success_a} success, {blocked_a} blocked, {error_a} errors")
    print(f"SESSION-B: {success_b} success, {blocked_b} blocked, {error_b} errors")
    print(f"Total: {success_a + success_b} success, {blocked_a + blocked_b} blocked")

    if session_id_a and session_id_b:
        print(f"Different session IDs: {session_id_a != session_id_b}")
    else:
        print("Could not compare session IDs")

    # Show detailed results
    print("\nDETAILED RESULTS:")
    print("SESSION-A:")
    for status, usrn, details in results_a:
        print(f"  USRN {usrn}: {status}")

    print("SESSION-B:")
    for status, usrn, details in results_b:
        print(f"  USRN {usrn}: {status}")


async def test_session_safe(session_name: str, requests: int = 3):
    """Test a single session with safer error handling"""
    headers = {"Authorization": "Bearer dev-token"}
    results = []
    session_id = None

    print(f"\n{session_name} - Testing {requests} requests")
    print("-" * 40)

    try:
        async with streamablehttp_client(
            "http://127.0.0.1:8000/mcp", headers=headers
        ) as (
            read_stream,
            write_stream,
            get_session_id,
        ):
            async with ClientSession(read_stream, write_stream) as session:
                await session.initialize()
                session_id = get_session_id()
                print(f"{session_name} Session ID: {session_id}")

                for i in range(requests):
                    try:
                        result = await session.call_tool(
                            "hello_world", {"name": f"{session_name}-User{i + 1}"}
                        )
                        result_text = extract_text_from_result(result)
                        print(f"  Request {i + 1}: SUCCESS - {result_text}")
                        results.append("SUCCESS")
                        await asyncio.sleep(0.1)

                    except Exception as e:
                        if "429" in str(e) or "Too Many Requests" in str(e):
                            print(f"  Request {i + 1}: BLOCKED - Rate limited")
                            results.append("BLOCKED")
                        else:
                            print(f"  Request {i + 1}: ERROR - {e}")
                            results.append("ERROR")

                        for j in range(i + 1, requests):
                            print(f"  Request {j + 1}: BLOCKED - Session rate limited")
                            results.append("BLOCKED")
                        break

    except Exception as e:
        print(f"{session_name}: Connection error - {str(e)[:100]}")
        if len(results) == 0:
            results = ["ERROR"] * requests
        elif len(results) < requests:
            remaining = requests - len(results)
            results.extend(["BLOCKED"] * remaining)

    return results, session_id


async def test_two_sessions():
    """Test rate limiting across two sessions"""
    print("HTTP Rate Limit Test - Two Sessions")
    print("=" * 50)

    results_a, session_id_a = await test_session_safe("SESSION-A", 20)
    await asyncio.sleep(0.5)
    results_b, session_id_b = await test_session_safe("SESSION-B", 20)

    # Analyze results
    print("\nRESULTS SUMMARY")
    print("=" * 30)

    success_a = results_a.count("SUCCESS")
    blocked_a = results_a.count("BLOCKED")
    error_a = results_a.count("ERROR")

    success_b = results_b.count("SUCCESS")
    blocked_b = results_b.count("BLOCKED")
    error_b = results_b.count("ERROR")

    print(f"SESSION-A: {success_a} success, {blocked_a} blocked, {error_a} errors")
    print(f"SESSION-B: {success_b} success, {blocked_b} blocked, {error_b} errors")
    print(f"Total: {success_a + success_b} success, {blocked_a + blocked_b} blocked")

    if session_id_a and session_id_b:
        print(f"Different session IDs: {session_id_a != session_id_b}")
    else:
        print("Could not compare session IDs")

    total_success = success_a + success_b
    total_blocked = blocked_a + blocked_b

    print("\nRATE LIMITING ASSESSMENT:")
    if total_success >= 2 and total_blocked >= 2:
        print("✅ PASS: Rate limiting is working")
        print(f"   - {total_success} requests succeeded")
        print(f"   - {total_blocked} requests were rate limited")
        print("   - Each session got limited after ~2 requests")
    elif total_success == 0:
        print("❌ FAIL: No requests succeeded (check server/auth)")
    else:
        print("⚠️  UNCLEAR: Unexpected pattern")
        print("   Check server logs for actual behavior")


if __name__ == "__main__":
    # Run the USRN test by default
    asyncio.run(test_usrn_calls())

    # Uncomment the line below to run the original test instead
    # asyncio.run(test_two_sessions())

```

--------------------------------------------------------------------------------
/src/mcp_service/routing_service.py:
--------------------------------------------------------------------------------

```python
from typing import Dict, List, Set, Optional, Any
from dataclasses import dataclass
from utils.logging_config import get_logger

logger = get_logger(__name__)


@dataclass
class RouteNode:
    """Represents a routing node (intersection)"""

    id: int
    node_identifier: str
    connected_edges: Set[int]


@dataclass
class RouteEdge:
    """Represents a routing edge (road segment)"""

    id: int
    road_id: str
    road_name: Optional[str]
    source_node_id: int
    target_node_id: int
    cost: float
    reverse_cost: float
    geometry: Optional[Dict[str, Any]]


class InMemoryRoutingNetwork:
    """In-memory routing network built from OS NGD data"""

    def __init__(self):
        self.nodes: Dict[int, RouteNode] = {}
        self.edges: Dict[int, RouteEdge] = {}
        self.node_lookup: Dict[str, int] = {}
        self.is_built = False

    def add_node(self, node_identifier: str) -> int:
        """Add a node and return its internal ID"""
        if node_identifier in self.node_lookup:
            return self.node_lookup[node_identifier]

        node_id = len(self.nodes) + 1
        self.nodes[node_id] = RouteNode(
            id=node_id,
            node_identifier=node_identifier,
            connected_edges=set(),
        )
        self.node_lookup[node_identifier] = node_id
        return node_id

    def add_edge(self, road_data: Dict[str, Any]) -> None:
        """Add a road link as an edge"""
        properties = road_data.get("properties", {})

        start_node = properties.get("startnode", "")
        end_node = properties.get("endnode", "")

        if not start_node or not end_node:
            logger.warning(f"Road link {properties.get('id')} missing node data")
            return

        source_id = self.add_node(start_node)
        target_id = self.add_node(end_node)

        roadlink_id = None
        road_track_refs = properties.get("roadtrackorpathreference", [])
        if road_track_refs and len(road_track_refs) > 0:
            roadlink_id = road_track_refs[0].get("roadlinkid", "")

        edge_id = len(self.edges) + 1
        cost = properties.get("geometry_length", 100.0)

        edge = RouteEdge(
            id=edge_id,
            road_id=roadlink_id or "NONE",
            road_name=properties.get("name1_text"),
            source_node_id=source_id,
            target_node_id=target_id,
            cost=cost,
            reverse_cost=cost,
            geometry=road_data.get("geometry"),
        )

        self.edges[edge_id] = edge

        self.nodes[source_id].connected_edges.add(edge_id)
        self.nodes[target_id].connected_edges.add(edge_id)

    def get_connected_edges(self, node_id: int) -> List[RouteEdge]:
        """Get all edges connected to a node"""
        if node_id not in self.nodes:
            return []

        return [self.edges[edge_id] for edge_id in self.nodes[node_id].connected_edges]

    def get_summary(self) -> Dict[str, Any]:
        """Get network summary statistics"""
        return {
            "total_nodes": len(self.nodes),
            "total_edges": len(self.edges),
            "is_built": self.is_built,
            "sample_nodes": [
                {
                    "id": node.id,
                    "identifier": node.node_identifier,
                    "connected_edges": len(node.connected_edges),
                }
                for node in list(self.nodes.values())[:5]
            ],
        }

    def get_all_nodes(self) -> List[Dict[str, Any]]:
        """Get all nodes as a flat list"""
        return [
            {
                "id": node.id,
                "node_identifier": node.node_identifier,
                "connected_edge_count": len(node.connected_edges),
                "connected_edge_ids": list(node.connected_edges),
            }
            for node in self.nodes.values()
        ]

    def get_all_edges(self) -> List[Dict[str, Any]]:
        """Get all edges as a flat list"""
        return [
            {
                "id": edge.id,
                "road_id": edge.road_id,
                "road_name": edge.road_name,
                "source_node_id": edge.source_node_id,
                "target_node_id": edge.target_node_id,
                "cost": edge.cost,
                "reverse_cost": edge.reverse_cost,
                "geometry": edge.geometry,
            }
            for edge in self.edges.values()
        ]


class OSRoutingService:
    """Service to build and query routing networks from OS NGD data"""

    def __init__(self, api_client):
        self.api_client = api_client
        self.network = InMemoryRoutingNetwork()
        self.raw_restrictions: List[Dict[str, Any]] = []

    async def _fetch_restriction_data(
        self, bbox: Optional[str] = None, limit: int = 100
    ) -> List[Dict[str, Any]]:
        """Fetch raw restriction data"""
        try:
            logger.debug("Fetching restriction data...")
            params = {
                "limit": min(limit, 100),
                "crs": "http://www.opengis.net/def/crs/EPSG/0/4326",
            }

            if bbox:
                params["bbox"] = bbox

            restriction_data = await self.api_client.make_request(
                "COLLECTION_FEATURES",
                params=params,
                path_params=["trn-rami-restriction-1"],
            )

            features = restriction_data.get("features", [])
            logger.debug(f"Fetched {len(features)} restriction features")

            return features

        except Exception as e:
            logger.error(f"Error fetching restriction data: {e}")
            return []

    async def build_routing_network(
        self,
        bbox: Optional[str] = None,
        limit: int = 1000,
        include_restrictions: bool = True,
    ) -> Dict[str, Any]:
        """Build the routing network from OS NGD road links with optional restriction data"""
        try:
            logger.debug("Building routing network from OS NGD data...")

            if include_restrictions:
                self.raw_restrictions = await self._fetch_restriction_data(bbox, limit)

            params = {
                "limit": min(limit, 100),
                "crs": "http://www.opengis.net/def/crs/EPSG/0/4326",
            }

            if bbox:
                params["bbox"] = bbox

            road_links_data = await self.api_client.make_request(
                "COLLECTION_FEATURES",
                params=params,
                path_params=["trn-ntwk-roadlink-4"],
            )

            features = road_links_data.get("features", [])
            logger.debug(f"Processing {len(features)} road links...")

            for feature in features:
                self.network.add_edge(feature)

            self.network.is_built = True

            summary = self.network.get_summary()
            logger.debug(
                f"Network built: {summary['total_nodes']} nodes, {summary['total_edges']} edges"
            )

            return {
                "status": "success",
                "message": f"Built routing network with {summary['total_nodes']} nodes and {summary['total_edges']} edges",
                "network_summary": summary,
                "restrictions": self.raw_restrictions if include_restrictions else [],
                "restriction_count": len(self.raw_restrictions)
                if include_restrictions
                else 0,
            }

        except Exception as e:
            logger.error(f"Error building routing network: {e}")
            return {"status": "error", "error": str(e)}

    def get_network_info(self) -> Dict[str, Any]:
        """Get current network information"""
        return {"status": "success", "network": self.network.get_summary()}

    def get_flat_nodes(self) -> Dict[str, Any]:
        """Get flat list of all nodes"""
        if not self.network.is_built:
            return {
                "status": "error",
                "error": "Routing network not built. Call build_routing_network first.",
            }

        return {"status": "success", "nodes": self.network.get_all_nodes()}

    def get_flat_edges(self) -> Dict[str, Any]:
        """Get flat list of all edges with connections"""
        if not self.network.is_built:
            return {
                "status": "error",
                "error": "Routing network not built. Call build_routing_network first.",
            }

        return {"status": "success", "edges": self.network.get_all_edges()}

    def get_routing_tables(self) -> Dict[str, Any]:
        """Get both nodes and edges as flat tables"""
        if not self.network.is_built:
            return {
                "status": "error",
                "error": "Routing network not built. Call build_routing_network first.",
            }

        return {
            "status": "success",
            "nodes": self.network.get_all_nodes(),
            "edges": self.network.get_all_edges(),
            "summary": self.network.get_summary(),
            "restrictions": self.raw_restrictions,
        }

```

--------------------------------------------------------------------------------
/src/api_service/os_api.py:
--------------------------------------------------------------------------------

```python
import os
import aiohttp
import asyncio
import re
import concurrent.futures
import threading

from typing import Dict, List, Any, Optional
from models import (
    NGDAPIEndpoint,
    OpenAPISpecification,
    Collection,
    CollectionsCache,
    CollectionQueryables,
)
from api_service.protocols import APIClient
from utils.logging_config import get_logger

logger = get_logger(__name__)


class OSAPIClient(APIClient):
    """Implementation an OS API client"""

    user_agent = "os-ngd-mcp-server/1.0"

    def __init__(self, api_key: Optional[str] = None):
        """
        Initialise the OS API client

        Args:
            api_key: Optional API key, if not provided will use OS_API_KEY env var
        """
        self.api_key = api_key
        self.session = None
        self.last_request_time = 0
        # TODO: This is because there seems to be some rate limiting in place - TBC if this is the case
        self.request_delay = 0.7
        self._cached_openapi_spec: Optional[OpenAPISpecification] = None
        self._cached_collections: Optional[CollectionsCache] = None

    # Private helper methods
    def _sanitise_api_key(self, text: Any) -> str:
        """Remove API keys from any text (URLs, error messages, etc.)"""
        if not isinstance(text, str):
            return text

        patterns = [
            r"[?&]key=[^&\s]*",
            r"[?&]api_key=[^&\s]*",
            r"[?&]apikey=[^&\s]*",
            r"[?&]token=[^&\s]*",
        ]

        sanitized = text
        for pattern in patterns:
            sanitized = re.sub(pattern, "", sanitized, flags=re.IGNORECASE)

        sanitized = re.sub(r"[?&]$", "", sanitized)
        sanitized = re.sub(r"&{2,}", "&", sanitized)
        sanitized = re.sub(r"\?&", "?", sanitized)

        return sanitized

    def _sanitise_response(self, data: Any) -> Any:
        """Remove API keys from response data recursively"""
        if isinstance(data, dict):
            sanitized_dict = {}
            for key, value in data.items():
                if isinstance(value, str) and any(
                    url_indicator in key.lower()
                    for url_indicator in ["href", "url", "link", "uri"]
                ):
                    sanitized_dict[key] = self._sanitise_api_key(value)
                elif isinstance(value, (dict, list)):
                    sanitized_dict[key] = self._sanitise_response(value)
                else:
                    sanitized_dict[key] = value
            return sanitized_dict
        elif isinstance(data, list):
            return [self._sanitise_response(item) for item in data]
        elif isinstance(data, str):
            if any(
                indicator in data
                for indicator in [
                    "http://",
                    "https://",
                    "key=",
                    "api_key=",
                    "apikey=",
                    "token=",
                ]
            ):
                return self._sanitise_api_key(data)

        return data

    def _filter_latest_collections(
        self, collections: List[Dict[str, Any]]
    ) -> List[Collection]:
        """
        Filter collections to keep only the latest version of each collection type.
        For collections with IDs like 'trn-ntwk-roadlink-1', 'trn-ntwk-roadlink-2', 'trn-ntwk-roadlink-3',
        only keep the one with the highest number.

        Args:
            collections: Raw collections from API

        Returns:
            Filtered list of Collection objects
        """
        latest_versions: Dict[str, Dict[str, Any]] = {}

        for col in collections:
            col_id = col.get("id", "")

            match = re.match(r"^(.+?)-(\d+)$", col_id)

            if match:
                base_name = match.group(1)
                version_num = int(match.group(2))

                if (
                    base_name not in latest_versions
                    or version_num > latest_versions[base_name]["version"]
                ):
                    latest_versions[base_name] = {"version": version_num, "data": col}
            else:
                latest_versions[col_id] = {"version": 0, "data": col}

        filtered_collections = []
        for item in latest_versions.values():
            col_data = item["data"]
            filtered_collections.append(
                Collection(
                    id=col_data.get("id", ""),
                    title=col_data.get("title", ""),
                    description=col_data.get("description", ""),
                    links=col_data.get("links", []),
                    extent=col_data.get("extent", {}),
                    itemType=col_data.get("itemType", "feature"),
                )
            )

        return filtered_collections

    def _parse_openapi_spec_for_llm(
        self, spec_data: dict, collection_ids: List[str]
    ) -> dict:
        """Parse OpenAPI spec to extract only essential information for LLM context"""
        supported_crs = {
            "input": [],
            "output": [],
            "default": "http://www.opengis.net/def/crs/OGC/1.3/CRS84",
        }

        parsed = {
            "title": spec_data.get("info", {}).get("title", ""),
            "version": spec_data.get("info", {}).get("version", ""),
            "base_url": spec_data.get("servers", [{}])[0].get("url", ""),
            "endpoints": {},
            "collection_ids": collection_ids,
            "supported_crs": supported_crs,
        }

        paths = spec_data.get("paths", {})
        for path, methods in paths.items():
            for method, details in methods.items():
                if method == "get" and "parameters" in details:
                    for param in details["parameters"]:
                        param_name = param.get("name", "")

                        if param_name == "collectionId" and "schema" in param:
                            enum_values = param["schema"].get("enum", [])
                            if enum_values:
                                parsed["collection_ids"] = enum_values

                        elif (
                            param_name in ["bbox-crs", "filter-crs"]
                            and "schema" in param
                        ):
                            crs_values = param["schema"].get("enum", [])
                            if crs_values and not supported_crs["input"]:
                                supported_crs["input"] = crs_values

                        elif param_name == "crs" and "schema" in param:
                            crs_values = param["schema"].get("enum", [])
                            if crs_values and not supported_crs["output"]:
                                supported_crs["output"] = crs_values

        endpoint_patterns = {
            "/collections": "List all collections",
            "/collections/{collectionId}": "Get collection info",
            "/collections/{collectionId}/schema": "Get collection schema",
            "/collections/{collectionId}/queryables": "Get collection queryables",
            "/collections/{collectionId}/items": "Search features in collection",
            "/collections/{collectionId}/items/{featureId}": "Get specific feature",
        }
        parsed["endpoints"] = endpoint_patterns
        parsed["crs_guide"] = {
            "WGS84": "http://www.opengis.net/def/crs/OGC/1.3/CRS84 (default, longitude/latitude)",
            "British_National_Grid": "http://www.opengis.net/def/crs/EPSG/0/27700 (UK Ordnance Survey)",
            "WGS84_latlon": "http://www.opengis.net/def/crs/EPSG/0/4326 (latitude/longitude)",
            "Web_Mercator": "http://www.opengis.net/def/crs/EPSG/0/3857 (Web mapping)",
        }

        return parsed

    # Private async methods
    async def _get_open_api_spec(self) -> OpenAPISpecification:
        """Get the OpenAPI spec for the OS NGD API"""
        try:
            response = await self.make_request("OPENAPI_SPEC", params={"f": "json"})

            # Sanitize the raw response before processing
            sanitized_response = self._sanitise_response(response)

            collections_cache = await self.cache_collections()
            filtered_collection_ids = [col.id for col in collections_cache.collections]

            parsed_spec = self._parse_openapi_spec_for_llm(
                sanitized_response, filtered_collection_ids
            )

            spec = OpenAPISpecification(
                title=parsed_spec["title"],
                version=parsed_spec["version"],
                base_url=parsed_spec["base_url"],
                endpoints=parsed_spec["endpoints"],
                collection_ids=filtered_collection_ids,
                supported_crs=parsed_spec["supported_crs"],
                crs_guide=parsed_spec["crs_guide"],
            )
            return spec
        except Exception as e:
            raise ValueError(f"Failed to get OpenAPI spec: {e}")

    async def cache_openapi_spec(self) -> OpenAPISpecification:
        """
        Cache the OpenAPI spec.

        Returns:
            The cached OpenAPI spec
        """
        if self._cached_openapi_spec is None:
            logger.debug("Caching OpenAPI spec for LLM context...")
            try:
                self._cached_openapi_spec = await self._get_open_api_spec()
                logger.debug("OpenAPI spec successfully cached")
            except Exception as e:
                raise ValueError(f"Failed to cache OpenAPI spec: {e}")
        return self._cached_openapi_spec

    async def _get_collections(self) -> CollectionsCache:
        """Get all collections from the OS NGD API"""
        try:
            response = await self.make_request("COLLECTIONS")
            collections_list = response.get("collections", [])
            filtered = self._filter_latest_collections(collections_list)
            logger.debug(f"Filtered collections: {len(filtered)} collections")
            return CollectionsCache(collections=filtered, raw_response=response)
        except Exception as e:
            sanitized_error = self._sanitise_api_key(str(e))
            logger.error(f"Error getting collections: {sanitized_error}")
            raise ValueError(f"Failed to get collections: {sanitized_error}")

    async def cache_collections(self) -> CollectionsCache:
        """
        Cache the collections data with filtering applied.

        Returns:
            The cached collections
        """
        if self._cached_collections is None:
            logger.debug("Caching collections for LLM context...")
            try:
                self._cached_collections = await self._get_collections()
                logger.debug(
                    f"Collections successfully cached - {len(self._cached_collections.collections)} collections after filtering"
                )
            except Exception as e:
                sanitized_error = self._sanitise_api_key(str(e))
                raise ValueError(f"Failed to cache collections: {sanitized_error}")
        return self._cached_collections

    async def fetch_collections_queryables(
        self, collection_ids: List[str]
    ) -> Dict[str, CollectionQueryables]:
        """Fetch detailed queryables for specific collections only"""
        if not collection_ids:
            return {}

        logger.debug(f"Fetching queryables for specific collections: {collection_ids}")

        collections_cache = await self.cache_collections()
        collections_map = {coll.id: coll for coll in collections_cache.collections}

        tasks = [
            self.make_request("COLLECTION_QUERYABLES", path_params=[collection_id])
            for collection_id in collection_ids
            if collection_id in collections_map
        ]

        if not tasks:
            return {}

        raw_queryables = await asyncio.gather(*tasks, return_exceptions=True)

        def process_single_collection_queryables(collection_id, queryables_data):
            collection = collections_map[collection_id]
            logger.debug(
                f"Processing collection {collection.id} in thread {threading.current_thread().name}"
            )

            if isinstance(queryables_data, Exception):
                logger.warning(
                    f"Failed to fetch queryables for {collection.id}: {queryables_data}"
                )
                return (
                    collection.id,
                    CollectionQueryables(
                        id=collection.id,
                        title=collection.title,
                        description=collection.description,
                        all_queryables={},
                        enum_queryables={},
                        has_enum_filters=False,
                        total_queryables=0,
                        enum_count=0,
                    ),
                )

            all_queryables = {}
            enum_queryables = {}
            properties = queryables_data.get("properties", {})

            for prop_name, prop_details in properties.items():
                prop_type = prop_details.get("type", ["string"])
                if isinstance(prop_type, list):
                    main_type = prop_type[0] if prop_type else "string"
                    is_nullable = "null" in prop_type
                else:
                    main_type = prop_type
                    is_nullable = False

                all_queryables[prop_name] = {
                    "type": main_type,
                    "nullable": is_nullable,
                    "max_length": prop_details.get("maxLength"),
                    "format": prop_details.get("format"),
                    "pattern": prop_details.get("pattern"),
                    "minimum": prop_details.get("minimum"),
                    "maximum": prop_details.get("maximum"),
                    "is_enum": prop_details.get("enumeration", False),
                }

                if prop_details.get("enumeration") and "enum" in prop_details:
                    enum_queryables[prop_name] = {
                        "values": prop_details["enum"],
                        "type": main_type,
                        "nullable": is_nullable,
                        "max_length": prop_details.get("maxLength"),
                    }
                    all_queryables[prop_name]["enum_values"] = prop_details["enum"]

                all_queryables[prop_name] = {
                    k: v for k, v in all_queryables[prop_name].items() if v is not None
                }

            return (
                collection.id,
                CollectionQueryables(
                    id=collection.id,
                    title=collection.title,
                    description=collection.description,
                    all_queryables=all_queryables,
                    enum_queryables=enum_queryables,
                    has_enum_filters=len(enum_queryables) > 0,
                    total_queryables=len(all_queryables),
                    enum_count=len(enum_queryables),
                ),
            )

        with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
            collection_data_pairs = list(zip(collection_ids, raw_queryables))
            processed = await asyncio.get_event_loop().run_in_executor(
                executor,
                lambda: [
                    process_single_collection_queryables(coll_id, data)
                    for coll_id, data in collection_data_pairs
                    if coll_id in collections_map
                ],
            )

        return {coll_id: queryables for coll_id, queryables in processed}

    # Public async methods
    async def initialise(self):
        """Initialise the aiohttp session if not already created"""
        if self.session is None:
            self.session = aiohttp.ClientSession(
                connector=aiohttp.TCPConnector(
                    force_close=True,
                    limit=1,  # TODO: Strict limit to only 1 connection - may need to revisit this
                )
            )

    async def close(self):
        """Close the session when done"""
        if self.session:
            await self.session.close()
            self.session = None
            self._cached_openapi_spec = None
            self._cached_collections = None

    async def get_api_key(self) -> str:
        """Get the OS API key from environment variable or init param."""
        if self.api_key:
            return self.api_key

        api_key = os.environ.get("OS_API_KEY")
        if not api_key:
            raise ValueError("OS_API_KEY environment variable is not set")
        return api_key

    async def make_request(
        self,
        endpoint: str,
        params: Optional[Dict[str, Any]] = None,
        path_params: Optional[List[str]] = None,
        max_retries: int = 2,
    ) -> Dict[str, Any]:
        """
        Make a request to the OS NGD API with proper error handling.

        Args:
            endpoint: Enum endpoint to use
            params: Additional query parameters
            path_params: Parameters to format into the URL path
            max_retries: Maximum number of retries for transient errors

        Returns:
            JSON response as dictionary
        """
        await self.initialise()

        if self.session is None:
            raise ValueError("Session not initialised")

        current_time = asyncio.get_event_loop().time()
        elapsed = current_time - self.last_request_time
        if elapsed < self.request_delay:
            await asyncio.sleep(self.request_delay - elapsed)

        try:
            endpoint_value = NGDAPIEndpoint[endpoint].value
        except KeyError:
            raise ValueError(f"Invalid endpoint: {endpoint}")

        if path_params:
            endpoint_value = endpoint_value.format(*path_params)

        api_key = await self.get_api_key()
        request_params = params or {}
        request_params["key"] = api_key

        headers = {"User-Agent": self.user_agent, "Accept": "application/json"}

        client_ip = getattr(self.session, "_source_address", None)
        client_info = f" from {client_ip}" if client_ip else ""

        sanitized_url = self._sanitise_api_key(endpoint_value)
        logger.info(f"Requesting URL: {sanitized_url}{client_info}")

        for attempt in range(1, max_retries + 1):
            try:
                self.last_request_time = asyncio.get_event_loop().time()

                timeout = aiohttp.ClientTimeout(total=30.0)
                async with self.session.get(
                    endpoint_value,
                    params=request_params,
                    headers=headers,
                    timeout=timeout,
                ) as response:
                    if response.status >= 400:
                        error_text = await response.text()
                        sanitized_error = self._sanitise_api_key(error_text)
                        error_message = (
                            f"HTTP Error: {response.status} - {sanitized_error}"
                        )
                        logger.error(f"Error: {error_message}")
                        raise ValueError(error_message)

                    response_data = await response.json()

                    return self._sanitise_response(response_data)
            except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                if attempt == max_retries:
                    sanitized_exception = self._sanitise_api_key(str(e))
                    error_message = f"Request failed after {max_retries} attempts: {sanitized_exception}"
                    logger.error(f"Error: {error_message}")
                    raise ValueError(error_message)
                else:
                    await asyncio.sleep(0.7)
            except Exception as e:
                sanitized_exception = self._sanitise_api_key(str(e))
                error_message = f"Request failed: {sanitized_exception}"
                logger.error(f"Error: {error_message}")
                raise ValueError(error_message)
        raise RuntimeError(
            "Unreachable: make_request exited retry loop without returning or raising"
        )

    async def make_request_no_auth(
        self,
        url: str,
        params: Optional[Dict[str, Any]] = None,
        max_retries: int = 2,
    ) -> str:
        """
        Make a request without authentication (for public endpoints like documentation).

        Args:
            url: Full URL to request
            params: Additional query parameters
            max_retries: Maximum number of retries for transient errors

        Returns:
            Response text (not JSON parsed)
        """
        await self.initialise()

        if self.session is None:
            raise ValueError("Session not initialised")

        current_time = asyncio.get_event_loop().time()
        elapsed = current_time - self.last_request_time
        if elapsed < self.request_delay:
            await asyncio.sleep(self.request_delay - elapsed)

        request_params = params or {}
        headers = {"User-Agent": self.user_agent}

        logger.info(f"Requesting URL (no auth): {url}")

        for attempt in range(1, max_retries + 1):
            try:
                self.last_request_time = asyncio.get_event_loop().time()

                timeout = aiohttp.ClientTimeout(total=30.0)
                async with self.session.get(
                    url,
                    params=request_params,
                    headers=headers,
                    timeout=timeout,
                ) as response:
                    if response.status >= 400:
                        error_text = await response.text()
                        error_message = f"HTTP Error: {response.status} - {error_text}"
                        logger.error(f"Error: {error_message}")
                        raise ValueError(error_message)

                    return await response.text()

            except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                if attempt == max_retries:
                    error_message = (
                        f"Request failed after {max_retries} attempts: {str(e)}"
                    )
                    logger.error(f"Error: {error_message}")
                    raise ValueError(error_message)
                else:
                    await asyncio.sleep(0.7)
            except Exception as e:
                error_message = f"Request failed: {str(e)}"
                logger.error(f"Error: {error_message}")
                raise ValueError(error_message)

        raise RuntimeError(
            "Unreachable: make_request_no_auth exited retry loop without returning or raising"
        )

```

--------------------------------------------------------------------------------
/src/mcp_service/os_service.py:
--------------------------------------------------------------------------------

```python
import json
import asyncio
import functools
import re

from typing import Optional, List, Dict, Any, Union, Callable
from api_service.protocols import APIClient
from prompt_templates.prompt_templates import PROMPT_TEMPLATES
from mcp_service.protocols import MCPService, FeatureService
from mcp_service.guardrails import ToolGuardrails
from workflow_generator.workflow_planner import WorkflowPlanner
from utils.logging_config import get_logger
from mcp_service.resources import OSDocumentationResources
from mcp_service.prompts import OSWorkflowPrompts
from mcp_service.routing_service import OSRoutingService

logger = get_logger(__name__)


class OSDataHubService(FeatureService):
    """Implementation of the OS NGD API service with MCP"""

    def __init__(
        self, api_client: APIClient, mcp_service: MCPService, stdio_middleware=None
    ):
        """
        Initialise the OS NGD service

        Args:
            api_client: API client implementation
            mcp_service: MCP service implementation
            stdio_middleware: Optional STDIO middleware for rate limiting
        """
        self.api_client = api_client
        self.mcp = mcp_service
        self.stdio_middleware = stdio_middleware
        self.workflow_planner: Optional[WorkflowPlanner] = None
        self.guardrails = ToolGuardrails()
        self.routing_service = OSRoutingService(api_client)
        self.register_tools()
        self.register_resources()
        self.register_prompts()

    # Register all the resources, tools, and prompts
    def register_resources(self) -> None:
        """Register all MCP resources"""
        doc_resources = OSDocumentationResources(self.mcp, self.api_client)
        doc_resources.register_all()

    def register_tools(self) -> None:
        """Register all MCP tools with guardrails and middleware"""

        def apply_middleware(func: Callable) -> Callable:
            wrapped = self.guardrails.basic_guardrails(func)
            wrapped = self._require_workflow_context(wrapped)
            if self.stdio_middleware:
                wrapped = self.stdio_middleware.require_auth_and_rate_limit(wrapped)
            return wrapped

        # Apply middleware to ALL tools
        self.get_workflow_context = self.mcp.tool()(
            apply_middleware(self.get_workflow_context)
        )
        self.hello_world = self.mcp.tool()(apply_middleware(self.hello_world))
        self.check_api_key = self.mcp.tool()(apply_middleware(self.check_api_key))
        self.list_collections = self.mcp.tool()(apply_middleware(self.list_collections))
        self.get_single_collection = self.mcp.tool()(
            apply_middleware(self.get_single_collection)
        )
        self.get_single_collection_queryables = self.mcp.tool()(
            apply_middleware(self.get_single_collection_queryables)
        )
        self.search_features = self.mcp.tool()(apply_middleware(self.search_features))
        self.get_feature = self.mcp.tool()(apply_middleware(self.get_feature))
        self.get_linked_identifiers = self.mcp.tool()(
            apply_middleware(self.get_linked_identifiers)
        )
        self.get_bulk_features = self.mcp.tool()(
            apply_middleware(self.get_bulk_features)
        )
        self.get_bulk_linked_features = self.mcp.tool()(
            apply_middleware(self.get_bulk_linked_features)
        )
        self.get_prompt_templates = self.mcp.tool()(
            apply_middleware(self.get_prompt_templates)
        )
        self.fetch_detailed_collections = self.mcp.tool()(
            apply_middleware(self.fetch_detailed_collections)
        )
        self.get_routing_data = self.mcp.tool()(apply_middleware(self.get_routing_data))

    def register_prompts(self) -> None:
        """Register all MCP prompts"""
        workflow_prompts = OSWorkflowPrompts(self.mcp)
        workflow_prompts.register_all()

    # Run the MCP service
    def run(self) -> None:
        """Run the MCP service"""
        try:
            self.mcp.run()
        finally:
            try:
                try:
                    loop = asyncio.get_running_loop()
                    loop.create_task(self._cleanup())
                except RuntimeError:
                    asyncio.run(self._cleanup())
            except Exception as e:
                logger.error(f"Error during cleanup: {e}")

    async def _cleanup(self):
        """Async cleanup method"""
        try:
            if hasattr(self, "api_client") and self.api_client:
                await self.api_client.close()
                logger.debug("API client closed successfully")
        except Exception as e:
            logger.error(f"Error closing API client: {e}")

    # Get the workflow context from the cached API client data
    # TODO: Lots of work to do here to reduce the size of the context and make it more readable for the LLM but not sacrificing the information
    async def get_workflow_context(self) -> str:
        """Get basic workflow context - no detailed queryables yet"""
        try:
            if self.workflow_planner is None:
                collections_cache = await self.api_client.cache_collections()
                basic_collections_info = {
                    coll.id: {
                        "id": coll.id,
                        "title": coll.title,
                        "description": coll.description,
                        # No queryables here - will be fetched on-demand
                    }
                    for coll in collections_cache.collections
                }

                self.workflow_planner = WorkflowPlanner(
                    await self.api_client.cache_openapi_spec(), basic_collections_info
                )

            context = self.workflow_planner.get_basic_context()
            return json.dumps(
                {
                    "CRITICAL_COLLECTION_LIST": sorted(
                        context["available_collections"].keys()
                    ),
                    "MANDATORY_PLANNING_REQUIREMENT": {
                        "CRITICAL": "You MUST follow the 2-step planning process:",
                        "step_1": "Explain your complete plan listing which specific collections you will use and why",
                        "step_2": "Call fetch_detailed_collections('collection-id-1,collection-id-2') to get queryables for those collections BEFORE making search calls",
                        "required_explanation": {
                            "1": "Which collections you will use and why",
                            "2": "What you expect to find in those collections",
                            "3": "What your search strategy will be",
                        },
                        "workflow_enforcement": "Do not proceed with search_features until you have fetched detailed queryables",
                        "example_planning": "I will use 'lus-fts-site-1' for finding cinemas. Let me fetch its detailed queryables first...",
                    },
                    "available_collections": context[
                        "available_collections"
                    ],  # Basic info only - no queryables yet - this is to reduce the size of the context for the LLM
                    "openapi_spec": context["openapi_spec"].model_dump()
                    if context["openapi_spec"]
                    else None,
                    "TWO_STEP_WORKFLOW": {
                        "step_1": "Plan with basic collection info (no detailed queryables available yet)",
                        "step_2": "Use fetch_detailed_collections() to get queryables for your chosen collections",
                        "step_3": "Execute search_features with proper filters using the fetched queryables",
                    },
                    "AVAILABLE_TOOLS": {
                        "fetch_detailed_collections": "Get detailed queryables for specific collections: fetch_detailed_collections('lus-fts-site-1,trn-ntwk-street-1')",
                        "search_features": "Search features (requires detailed queryables first)",
                    },
                    "QUICK_FILTERING_GUIDE": {
                        "primary_tool": "search_features",
                        "key_parameter": "filter",
                        "enum_fields": "Use exact values from collection's enum_queryables (fetch these first!)",
                        "simple_fields": "Use direct values (e.g., usrn = 12345678)",
                    },
                    "COMMON_EXAMPLES": {
                        "workflow_example": "1) Explain plan → 2) fetch_detailed_collections('lus-fts-site-1') → 3) search_features with proper filter",
                        "cinema_search": "After fetching queryables: search_features(collection_id='lus-fts-site-1', filter=\"oslandusetertiarygroup = 'Cinema'\")",
                    },
                    "CRITICAL_RULES": {
                        "1": "ALWAYS explain your plan first",
                        "2": "ALWAYS call fetch_detailed_collections() before search_features",
                        "3": "Use exact enum values from the fetched enum_queryables",
                        "4": "Quote string values in single quotes",
                    },
                }
            )

        except Exception as e:
            logger.error(f"Error getting workflow context: {e}")
            return json.dumps(
                {"error": str(e), "instruction": "Proceed with available tools"}
            )

    def _require_workflow_context(self, func: Callable) -> Callable:
        # Functions that don't need workflow context
        skip_functions = {"get_workflow_context", "hello_world", "check_api_key"}

        @functools.wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            if self.workflow_planner is None:
                if func.__name__ in skip_functions:
                    return await func(*args, **kwargs)
                else:
                    return json.dumps(
                        {
                            "error": "WORKFLOW CONTEXT REQUIRED",
                            "blocked_tool": func.__name__,
                            "required_action": "You must call 'get_workflow_context' first",
                            "message": "No tools are available until you get the workflow context. Please call get_workflow_context() now.",
                        }
                    )
            return await func(*args, **kwargs)

        return wrapper

    # TODO: This is a bit of a hack - we need to improve the error handling and retry logic
    # TODO: Could we actually spawn a seperate AI agent to handle the retry logic and return the result to the main agent?
    def _add_retry_context(self, response_data: dict, tool_name: str) -> dict:
        """Add retry guidance to tool responses"""
        if "error" in response_data:
            response_data["retry_guidance"] = {
                "tool": tool_name,
                "MANDATORY_INSTRUCTION 1": "Review the error message and try again with corrected parameters",
                "MANDATORY_INSTRUCTION 2": "YOU MUST call get_workflow_context() if you need to see available options again",
            }
        return response_data

    # All the tools
    async def hello_world(self, name: str) -> str:
        """Simple hello world tool for testing"""
        return f"Hello, {name}! 👋"

    async def check_api_key(self) -> str:
        """Check if the OS API key is available."""
        try:
            await self.api_client.get_api_key()
            return json.dumps({"status": "success", "message": "OS_API_KEY is set!"})
        except ValueError as e:
            return json.dumps({"status": "error", "message": str(e)})

    async def list_collections(
        self,
    ) -> str:
        """
        List all available feature collections in the OS NGD API.

        Returns:
            JSON string with collection info (id, title only)
        """
        try:
            data = await self.api_client.make_request("COLLECTIONS")

            if not data or "collections" not in data:
                return json.dumps({"error": "No collections found"})

            collections = [
                {"id": col.get("id"), "title": col.get("title")}
                for col in data.get("collections", [])
            ]

            return json.dumps({"collections": collections})
        except Exception as e:
            error_response = {"error": str(e)}
            return json.dumps(
                self._add_retry_context(error_response, "list_collections")
            )

    async def get_single_collection(
        self,
        collection_id: str,
    ) -> str:
        """
        Get detailed information about a specific collection.

        Args:
            collection_id: The collection ID

        Returns:
            JSON string with collection information
        """
        try:
            data = await self.api_client.make_request(
                "COLLECTION_INFO", path_params=[collection_id]
            )

            return json.dumps(data)
        except Exception as e:
            error_response = {"error": str(e)}
            return json.dumps(
                self._add_retry_context(error_response, "get_collection_info")
            )

    async def get_single_collection_queryables(
        self,
        collection_id: str,
    ) -> str:
        """
        Get the list of queryable properties for a collection.

        Args:
            collection_id: The collection ID

        Returns:
            JSON string with queryable properties
        """
        try:
            data = await self.api_client.make_request(
                "COLLECTION_QUERYABLES", path_params=[collection_id]
            )

            return json.dumps(data)
        except Exception as e:
            error_response = {"error": str(e)}
            return json.dumps(
                self._add_retry_context(error_response, "get_collection_queryables")
            )

    async def search_features(
        self,
        collection_id: str,
        bbox: Optional[str] = None,
        crs: Optional[str] = None,
        limit: int = 10,
        offset: int = 0,
        filter: Optional[str] = None,
        filter_lang: Optional[str] = "cql-text",
        query_attr: Optional[str] = None,
        query_attr_value: Optional[str] = None,
    ) -> str:
        """Search for features in a collection with full CQL2 filter support."""
        try:
            params: Dict[str, Union[str, int]] = {}

            if limit:
                params["limit"] = min(limit, 100)
            if offset:
                params["offset"] = max(0, offset)
            if bbox:
                params["bbox"] = bbox
            if crs:
                params["crs"] = crs
            if filter:
                if len(filter) > 1000:
                    raise ValueError("Filter too long")
                dangerous_patterns = [
                    r";\s*--",
                    r";\s*/\*",
                    r"\bUNION\b",
                    r"\bSELECT\b",
                    r"\bINSERT\b",
                    r"\bUPDATE\b",
                    r"\bDELETE\b",
                    r"\bDROP\b",
                    r"\bCREATE\b",
                    r"\bALTER\b",
                    r"\bTRUNCATE\b",
                    r"\bEXEC\b",
                    r"\bEXECUTE\b",
                    r"\bSP_\b",
                    r"\bXP_\b",
                    r"<script\b",
                    r"javascript:",
                    r"vbscript:",
                    r"onload\s*=",
                    r"onerror\s*=",
                    r"onclick\s*=",
                    r"\beval\s*\(",
                    r"document\.",
                    r"window\.",
                    r"location\.",
                    r"cookie",
                    r"innerHTML",
                    r"outerHTML",
                    r"alert\s*\(",
                    r"confirm\s*\(",
                    r"prompt\s*\(",
                    r"setTimeout\s*\(",
                    r"setInterval\s*\(",
                    r"Function\s*\(",
                    r"constructor",
                    r"prototype",
                    r"__proto__",
                    r"process\.",
                    r"require\s*\(",
                    r"import\s+",
                    r"from\s+.*import",
                    r"\.\./",
                    r"file://",
                    r"ftp://",
                    r"data:",
                    r"blob:",
                    r"\\x[0-9a-fA-F]{2}",
                    r"%[0-9a-fA-F]{2}",
                    r"&#x[0-9a-fA-F]+;",
                    r"&[a-zA-Z]+;",
                    r"\$\{",
                    r"#\{",
                    r"<%",
                    r"%>",
                    r"{{",
                    r"}}",
                    r"\\\w+",
                    r"\0",
                    r"\r\n",
                    r"\n\r",
                ]

                for pattern in dangerous_patterns:
                    if re.search(pattern, filter, re.IGNORECASE):
                        raise ValueError("Invalid filter content")

                if filter.count("'") % 2 != 0:
                    raise ValueError("Unmatched quotes in filter")

                params["filter"] = filter.strip()
                if filter_lang:
                    params["filter-lang"] = filter_lang

            elif query_attr and query_attr_value:
                if not re.match(r"^[a-zA-Z_][a-zA-Z0-9_]*$", query_attr):
                    raise ValueError("Invalid field name")

                escaped_value = str(query_attr_value).replace("'", "''")
                params["filter"] = f"{query_attr} = '{escaped_value}'"
                if filter_lang:
                    params["filter-lang"] = filter_lang

            if self.workflow_planner:
                valid_collections = set(
                    self.workflow_planner.basic_collections_info.keys()
                )
                if collection_id not in valid_collections:
                    return json.dumps(
                        {
                            "error": f"Invalid collection '{collection_id}'. Valid collections: {sorted(valid_collections)[:10]}...",
                            "suggestion": "Call get_workflow_context() to see all available collections",
                        }
                    )

            data = await self.api_client.make_request(
                "COLLECTION_FEATURES", params=params, path_params=[collection_id]
            )

            return json.dumps(data)
        except ValueError as ve:
            error_response = {"error": f"Invalid input: {str(ve)}"}
            return json.dumps(
                self._add_retry_context(error_response, "search_features")
            )
        except Exception as e:
            error_response = {"error": str(e)}
            return json.dumps(
                self._add_retry_context(error_response, "search_features")
            )

    async def get_feature(
        self,
        collection_id: str,
        feature_id: str,
        crs: Optional[str] = None,
    ) -> str:
        """
        Get a specific feature by ID.

        Args:
            collection_id: The collection ID
            feature_id: The feature ID
            crs: Coordinate reference system for the response

        Returns:
            JSON string with feature data
        """
        try:
            params: Dict[str, str] = {}
            if crs:
                params["crs"] = crs

            data = await self.api_client.make_request(
                "COLLECTION_FEATURE_BY_ID",
                params=params,
                path_params=[collection_id, feature_id],
            )

            return json.dumps(data)
        except Exception as e:
            error_response = {"error": f"Error getting feature: {str(e)}"}
            return json.dumps(self._add_retry_context(error_response, "get_feature"))

    async def get_linked_identifiers(
        self,
        identifier_type: str,
        identifier: str,
        feature_type: Optional[str] = None,
    ) -> str:
        """
        Get linked identifiers for a specified identifier.

        Args:
            identifier_type: The type of identifier (e.g., 'TOID', 'UPRN')
            identifier: The identifier value
            feature_type: Optional feature type to filter results

        Returns:
            JSON string with linked identifiers or filtered results
        """
        try:
            data = await self.api_client.make_request(
                "LINKED_IDENTIFIERS", path_params=[identifier_type, identifier]
            )

            if feature_type:
                # Filter results by feature type
                filtered_results = []
                for item in data.get("results", []):
                    if item.get("featureType") == feature_type:
                        filtered_results.append(item)
                return json.dumps({"results": filtered_results})

            return json.dumps(data)
        except Exception as e:
            error_response = {"error": str(e)}
            return json.dumps(
                self._add_retry_context(error_response, "get_linked_identifiers")
            )

    async def get_bulk_features(
        self,
        collection_id: str,
        identifiers: List[str],
        query_by_attr: Optional[str] = None,
    ) -> str:
        """
        Get multiple features in a single call.

        Args:
            collection_id: The collection ID
            identifiers: List of feature identifiers
            query_by_attr: Attribute to query by (if not provided, assumes feature IDs)

        Returns:
            JSON string with features data
        """
        try:
            tasks: List[Any] = []
            for identifier in identifiers:
                if query_by_attr:
                    task = self.search_features(
                        collection_id=collection_id,
                        query_attr=query_by_attr,
                        query_attr_value=identifier,
                        limit=1,
                    )
                else:
                    task = self.get_feature(collection_id, identifier)

                tasks.append(task)

            results = await asyncio.gather(*tasks)

            parsed_results = [json.loads(result) for result in results]

            return json.dumps({"results": parsed_results})
        except Exception as e:
            error_response = {"error": str(e)}
            return json.dumps(
                self._add_retry_context(error_response, "get_bulk_features")
            )

    async def get_bulk_linked_features(
        self,
        identifier_type: str,
        identifiers: List[str],
        feature_type: Optional[str] = None,
    ) -> str:
        """
        Get linked features for multiple identifiers in a single call.

        Args:
            identifier_type: The type of identifier (e.g., 'TOID', 'UPRN')
            identifiers: List of identifier values
            feature_type: Optional feature type to filter results

        Returns:
            JSON string with linked features data
        """
        try:
            tasks = [
                self.get_linked_identifiers(identifier_type, identifier, feature_type)
                for identifier in identifiers
            ]

            results = await asyncio.gather(*tasks)

            parsed_results = [json.loads(result) for result in results]

            return json.dumps({"results": parsed_results})
        except Exception as e:
            error_response = {"error": str(e)}
            return json.dumps(
                self._add_retry_context(error_response, "get_bulk_linked_features")
            )

    async def get_prompt_templates(
        self,
        category: Optional[str] = None,
    ) -> str:
        """
        Get standard prompt templates for interacting with this service.

        Args:
            category: Optional category of templates to return
                     (general, collections, features, linked_identifiers)

        Returns:
            JSON string containing prompt templates
        """
        if category and category in PROMPT_TEMPLATES:
            return json.dumps({category: PROMPT_TEMPLATES[category]})

        return json.dumps(PROMPT_TEMPLATES)

    async def fetch_detailed_collections(self, collection_ids: str) -> str:
        """
        Fetch detailed queryables for specific collections mentioned in LLM workflow plan.

        This is mainly to reduce the size of the context for the LLM.

        Only fetch what you really need.

        Args:
            collection_ids: Comma-separated list of collection IDs (e.g., "lus-fts-site-1,trn-ntwk-street-1")

        Returns:
            JSON string with detailed queryables for the specified collections
        """
        try:
            if not self.workflow_planner:
                return json.dumps(
                    {
                        "error": "Workflow planner not initialized. Call get_workflow_context() first."
                    }
                )

            requested_collections = [cid.strip() for cid in collection_ids.split(",")]

            valid_collections = set(self.workflow_planner.basic_collections_info.keys())
            invalid_collections = [
                cid for cid in requested_collections if cid not in valid_collections
            ]

            if invalid_collections:
                return json.dumps(
                    {
                        "error": f"Invalid collection IDs: {invalid_collections}",
                        "valid_collections": sorted(valid_collections),
                    }
                )

            cached_collections = [
                cid
                for cid in requested_collections
                if cid in self.workflow_planner.detailed_collections_cache
            ]

            collections_to_fetch = [
                cid
                for cid in requested_collections
                if cid not in self.workflow_planner.detailed_collections_cache
            ]

            if collections_to_fetch:
                logger.info(f"Fetching detailed queryables for: {collections_to_fetch}")
                detailed_queryables = (
                    await self.api_client.fetch_collections_queryables(
                        collections_to_fetch
                    )
                )

                for coll_id, queryables in detailed_queryables.items():
                    self.workflow_planner.detailed_collections_cache[coll_id] = {
                        "id": queryables.id,
                        "title": queryables.title,
                        "description": queryables.description,
                        "all_queryables": queryables.all_queryables,
                        "enum_queryables": queryables.enum_queryables,
                        "has_enum_filters": queryables.has_enum_filters,
                        "total_queryables": queryables.total_queryables,
                        "enum_count": queryables.enum_count,
                    }

            context = self.workflow_planner.get_detailed_context(requested_collections)

            return json.dumps(
                {
                    "success": True,
                    "collections_processed": requested_collections,
                    "collections_fetched_from_api": collections_to_fetch,
                    "collections_from_cache": cached_collections,
                    "detailed_collections": context["available_collections"],
                    "message": f"Detailed queryables now available for: {', '.join(requested_collections)}",
                }
            )

        except Exception as e:
            logger.error(f"Error fetching detailed collections: {e}")
            return json.dumps(
                {"error": str(e), "suggestion": "Check collection IDs and try again"}
            )

    async def get_routing_data(
        self,
        bbox: Optional[str] = None,
        limit: int = 100,
        include_nodes: bool = True,
        include_edges: bool = True,
        build_network: bool = True,
    ) -> str:
        """
        Get routing data - builds network and returns nodes/edges as flat tables.

        Args:
            bbox: Optional bounding box (format: "minx,miny,maxx,maxy")
            limit: Maximum number of road links to process (default: 1000)
            include_nodes: Whether to include nodes in response (default: True)
            include_edges: Whether to include edges in response (default: True)
            build_network: Whether to build network first (default: True)

        Returns:
            JSON string with routing network data
        """
        try:
            result = {}

            if build_network:
                build_result = await self.routing_service.build_routing_network(
                    bbox, limit
                )
                result["build_status"] = build_result

                if build_result.get("status") != "success":
                    return json.dumps(result)

            if include_nodes:
                nodes_result = self.routing_service.get_flat_nodes()
                result["nodes"] = nodes_result.get("nodes", [])

            if include_edges:
                edges_result = self.routing_service.get_flat_edges()
                result["edges"] = edges_result.get("edges", [])

            summary = self.routing_service.get_network_info()
            result["summary"] = summary.get("network", {})
            result["status"] = "success"

            return json.dumps(result)
        except Exception as e:
            return json.dumps({"error": str(e)})

```

--------------------------------------------------------------------------------
/src/config_docs/ogcapi-features-1.yaml:
--------------------------------------------------------------------------------

```yaml
openapi: 3.0.2
info:
  title: "Building Blocks specified in OGC API - Features - Part 1: Core"
  description: |-
    Common components used in the
    [OGC standard "OGC API - Features - Part 1: Core"](http://docs.opengeospatial.org/is/17-069r3/17-069r3.html).

    OGC API - Features - Part 1: Core 1.0 is an OGC Standard.
    Copyright (c) 2019 Open Geospatial Consortium.
    To obtain additional rights of use, visit http://www.opengeospatial.org/legal/ .

    This document is also available on
    [OGC](http://schemas.opengis.net/ogcapi/features/part1/1.0/openapi/ogcapi-features-1.yaml).
  version: '1.0.0'
  contact:
    name: Clemens Portele
    email: [email protected]
  license:
    name: OGC License
    url: 'http://www.opengeospatial.org/legal/'
components:
  parameters:
    bbox:
      name: bbox
      in: query
      description: |-
        Only features that have a geometry that intersects the bounding box are selected.
        The bounding box is provided as four or six numbers, depending on whether the
        coordinate reference system includes a vertical axis (height or depth):

        * Lower left corner, coordinate axis 1
        * Lower left corner, coordinate axis 2
        * Minimum value, coordinate axis 3 (optional)
        * Upper right corner, coordinate axis 1
        * Upper right corner, coordinate axis 2
        * Maximum value, coordinate axis 3 (optional)

        If the value consists of four numbers, the coordinate reference system is
        WGS 84 longitude/latitude (http://www.opengis.net/def/crs/OGC/1.3/CRS84)
        unless a different coordinate reference system is specified in the parameter `bbox-crs`.

        If the value consists of six numbers, the coordinate reference system is WGS 84
        longitude/latitude/ellipsoidal height (http://www.opengis.net/def/crs/OGC/0/CRS84h)
        unless a different coordinate reference system is specified in the parameter `bbox-crs`.

        The query parameter `bbox-crs` is specified in OGC API - Features - Part 2: Coordinate
        Reference Systems by Reference.

        For WGS 84 longitude/latitude the values are in most cases the sequence of
        minimum longitude, minimum latitude, maximum longitude and maximum latitude.
        However, in cases where the box spans the antimeridian the first value
        (west-most box edge) is larger than the third value (east-most box edge).

        If the vertical axis is included, the third and the sixth number are
        the bottom and the top of the 3-dimensional bounding box.

        If a feature has multiple spatial geometry properties, it is the decision of the
        server whether only a single spatial geometry property is used to determine
        the extent or all relevant geometries.
      required: false
      schema:
        type: array
        oneOf:
        - minItems: 4
          maxItems: 4
        - minItems: 6
          maxItems: 6
        items:
          type: number
      style: form
      explode: false
    collectionId:
      name: collectionId
      in: path
      description: local identifier of a collection
      required: true
      schema:
        type: string
    datetime:
      name: datetime
      in: query
      description: |-
        Either a date-time or an interval. Date and time expressions adhere to RFC 3339.
        Intervals may be bounded or half-bounded (double-dots at start or end).

        Examples:

        * A date-time: "2018-02-12T23:20:50Z"
        * A bounded interval: "2018-02-12T00:00:00Z/2018-03-18T12:31:12Z"
        * Half-bounded intervals: "2018-02-12T00:00:00Z/.." or "../2018-03-18T12:31:12Z"

        Only features that have a temporal property that intersects the value of
        `datetime` are selected.

        If a feature has multiple temporal properties, it is the decision of the
        server whether only a single temporal property is used to determine
        the extent or all relevant temporal properties.
      required: false
      schema:
        type: string
      style: form
      explode: false
    featureId:
      name: featureId
      in: path
      description: local identifier of a feature
      required: true
      schema:
        type: string
    limit:
      name: limit
      in: query
      description: |-
        The optional limit parameter limits the number of items that are presented in the response document.

        Only items are counted that are on the first level of the collection in the response document.
        Nested objects contained within the explicitly requested items shall not be counted.

        Minimum = 1. Maximum = 10000. Default = 10.
      required: false
      schema:
        type: integer
        minimum: 1
        maximum: 10000
        default: 10
      style: form
      explode: false
  schemas:
    collection:
      type: object
      required:
        - id
        - links
      properties:
        id:
          description: identifier of the collection used, for example, in URIs
          type: string
          example: address
        title:
          description: human readable title of the collection
          type: string
          example: address
        description:
          description: a description of the features in the collection
          type: string
          example: An address.
        links:
          type: array
          items:
            $ref: "#/components/schemas/link"
          example:
            - href: http://data.example.com/buildings
              rel: item
            - href: http://example.com/concepts/buildings.html
              rel: describedby
              type: text/html
        extent:
          $ref: "#/components/schemas/extent"
        itemType:
          description: indicator about the type of the items in the collection (the default value is 'feature').
          type: string
          default: feature
        crs:
          description: the list of coordinate reference systems supported by the service
          type: array
          items:
            type: string
          default:
            - http://www.opengis.net/def/crs/OGC/1.3/CRS84
          example:
            - http://www.opengis.net/def/crs/OGC/1.3/CRS84
            - http://www.opengis.net/def/crs/EPSG/0/4326
    collections:
      type: object
      required:
        - links
        - collections
      properties:
        links:
          type: array
          items:
            $ref: "#/components/schemas/link"
        collections:
          type: array
          items:
            $ref: "#/components/schemas/collection"
    confClasses:
      type: object
      required:
        - conformsTo
      properties:
        conformsTo:
          type: array
          items:
            type: string
    exception:
      type: object
      description: |-
        Information about the exception: an error code plus an optional description.
      required:
        - code
      properties:
        code:
          type: string
        description:
          type: string
    extent:
      type: object
      description: |-
        The extent of the features in the collection. In the Core only spatial and temporal
        extents are specified. Extensions may add additional members to represent other
        extents, for example, thermal or pressure ranges.
      properties:
        spatial:
          description: |-
            The spatial extent of the features in the collection.
          type: object
          properties:
            bbox:
              description: |-
                One or more bounding boxes that describe the spatial extent of the dataset.
                In the Core only a single bounding box is supported. Extensions may support
                additional areas. If multiple areas are provided, the union of the bounding
                boxes describes the spatial extent.
              type: array
              minItems: 1
              items:
                description: |-
                  Each bounding box is provided as four or six numbers, depending on
                  whether the coordinate reference system includes a vertical axis
                  (height or depth):

                  * Lower left corner, coordinate axis 1
                  * Lower left corner, coordinate axis 2
                  * Minimum value, coordinate axis 3 (optional)
                  * Upper right corner, coordinate axis 1
                  * Upper right corner, coordinate axis 2
                  * Maximum value, coordinate axis 3 (optional)

                  The coordinate reference system of the values is WGS 84 longitude/latitude
                  (http://www.opengis.net/def/crs/OGC/1.3/CRS84) unless a different coordinate
                  reference system is specified in `crs`.

                  For WGS 84 longitude/latitude the values are in most cases the sequence of
                  minimum longitude, minimum latitude, maximum longitude and maximum latitude.
                  However, in cases where the box spans the antimeridian the first value
                  (west-most box edge) is larger than the third value (east-most box edge).

                  If the vertical axis is included, the third and the sixth number are
                  the bottom and the top of the 3-dimensional bounding box.

                  If a feature has multiple spatial geometry properties, it is the decision of the
                  server whether only a single spatial geometry property is used to determine
                  the extent or all relevant geometries.
                type: array
                oneOf:
                - minItems: 4
                  maxItems: 4
                - minItems: 6
                  maxItems: 6
                items:
                  type: number
                example:
                  - -180
                  - -90
                  - 180
                  - 90
            crs:
              description: |-
                Coordinate reference system of the coordinates in the spatial extent
                (property `bbox`). The default reference system is WGS 84 longitude/latitude.
                In the Core this is the only supported coordinate reference system.
                Extensions may support additional coordinate reference systems and add
                additional enum values.
              type: string
              enum:
                - 'http://www.opengis.net/def/crs/OGC/1.3/CRS84'
              default: 'http://www.opengis.net/def/crs/OGC/1.3/CRS84'
        temporal:
          description: |-
            The temporal extent of the features in the collection.
          type: object
          properties:
            interval:
              description: |-
                One or more time intervals that describe the temporal extent of the dataset.
                The value `null` is supported and indicates an unbounded interval end.
                In the Core only a single time interval is supported. Extensions may support
                multiple intervals. If multiple intervals are provided, the union of the
                intervals describes the temporal extent.
              type: array
              minItems: 1
              items:
                description: |-
                  Begin and end times of the time interval. The timestamps are in the
                  temporal coordinate reference system specified in `trs`. By default
                  this is the Gregorian calendar.
                type: array
                minItems: 2
                maxItems: 2
                items:
                  type: string
                  format: date-time
                  nullable: true
                example:
                  - '2011-11-11T12:22:11Z'
                  - null
            trs:
              description: |-
                Coordinate reference system of the coordinates in the temporal extent
                (property `interval`). The default reference system is the Gregorian calendar.
                In the Core this is the only supported temporal coordinate reference system.
                Extensions may support additional temporal coordinate reference systems and add
                additional enum values.
              type: string
              enum:
                - 'http://www.opengis.net/def/uom/ISO-8601/0/Gregorian'
              default: 'http://www.opengis.net/def/uom/ISO-8601/0/Gregorian'
    featureCollectionGeoJSON:
      type: object
      required:
        - type
        - features
      properties:
        type:
          type: string
          enum:
            - FeatureCollection
        features:
          type: array
          items:
            $ref: "#/components/schemas/featureGeoJSON"
        links:
          type: array
          items:
            $ref: "#/components/schemas/link"
        timeStamp:
          $ref: "#/components/schemas/timeStamp"
        numberMatched:
          $ref: "#/components/schemas/numberMatched"
        numberReturned:
          $ref: "#/components/schemas/numberReturned"
    featureGeoJSON:
      type: object
      required:
        - type
        - geometry
        - properties
      properties:
        type:
          type: string
          enum:
            - Feature
        geometry:
          $ref: "#/components/schemas/geometryGeoJSON"
        properties:
          type: object
          nullable: true
        id:
          oneOf:
            - type: string
            - type: integer
        links:
          type: array
          items:
            $ref: "#/components/schemas/link"
    geometryGeoJSON:
      oneOf:
      - $ref: "#/components/schemas/pointGeoJSON"
      - $ref: "#/components/schemas/multipointGeoJSON"
      - $ref: "#/components/schemas/linestringGeoJSON"
      - $ref: "#/components/schemas/multilinestringGeoJSON"
      - $ref: "#/components/schemas/polygonGeoJSON"
      - $ref: "#/components/schemas/multipolygonGeoJSON"
      - $ref: "#/components/schemas/geometrycollectionGeoJSON"
    geometrycollectionGeoJSON:
      type: object
      required:
        - type
        - geometries
      properties:
        type:
          type: string
          enum:
            - GeometryCollection
        geometries:
          type: array
          items:
            $ref: "#/components/schemas/geometryGeoJSON"
    landingPage:
      type: object
      required:
        - links
      properties:
        title:
          type: string
          example: Buildings in Bonn
        description:
          type: string
          example: Access to data about buildings in the city of Bonn via a Web API that conforms to the OGC API Features specification.
        links:
          type: array
          items:
            $ref: "#/components/schemas/link"
    linestringGeoJSON:
      type: object
      required:
        - type
        - coordinates
      properties:
        type:
          type: string
          enum:
            - LineString
        coordinates:
          type: array
          minItems: 2
          items:
            type: array
            minItems: 2
            items:
              type: number
    link:
      type: object
      required:
        - href
      properties:
        href:
          type: string
          example: http://data.example.com/buildings/123
        rel:
          type: string
          example: alternate
        type:
          type: string
          example: application/geo+json
        hreflang:
          type: string
          example: en
        title:
          type: string
          example: Trierer Strasse 70, 53115 Bonn
        length:
          type: integer
    multilinestringGeoJSON:
      type: object
      required:
        - type
        - coordinates
      properties:
        type:
          type: string
          enum:
            - MultiLineString
        coordinates:
          type: array
          items:
            type: array
            minItems: 2
            items:
              type: array
              minItems: 2
              items:
                type: number
    multipointGeoJSON:
      type: object
      required:
        - type
        - coordinates
      properties:
        type:
          type: string
          enum:
            - MultiPoint
        coordinates:
          type: array
          items:
            type: array
            minItems: 2
            items:
              type: number
    multipolygonGeoJSON:
      type: object
      required:
        - type
        - coordinates
      properties:
        type:
          type: string
          enum:
            - MultiPolygon
        coordinates:
          type: array
          items:
            type: array
            items:
              type: array
              minItems: 4
              items:
                type: array
                minItems: 2
                items:
                  type: number
    numberMatched:
      description: |-
        The number of features of the feature type that match the selection
        parameters like `bbox`.
      type: integer
      minimum: 0
      example: 127
    numberReturned:
      description: |-
        The number of features in the feature collection.

        A server may omit this information in a response, if the information
        about the number of features is not known or difficult to compute.

        If the value is provided, the value shall be identical to the number
        of items in the "features" array.
      type: integer
      minimum: 0
      example: 10
    pointGeoJSON:
      type: object
      required:
        - type
        - coordinates
      properties:
        type:
          type: string
          enum:
            - Point
        coordinates:
          type: array
          minItems: 2
          items:
            type: number
    polygonGeoJSON:
      type: object
      required:
        - type
        - coordinates
      properties:
        type:
          type: string
          enum:
            - Polygon
        coordinates:
          type: array
          items:
            type: array
            minItems: 4
            items:
              type: array
              minItems: 2
              items:
                type: number
    timeStamp:
      description: This property indicates the time and date when the response was generated.
      type: string
      format: date-time
      example: '2017-08-17T08:05:32Z'
  responses:
    LandingPage:
      description: |-
        The landing page provides links to the API definition
        (link relations `service-desc` and `service-doc`),
        the Conformance declaration (path `/conformance`,
        link relation `conformance`), and the Feature
        Collections (path `/collections`, link relation
        `data`).
      content:
        application/json:
          schema:
            $ref: '#/components/schemas/landingPage'
          example:
            title: Buildings in Bonn
            description: Access to data about buildings in the city of Bonn via a Web API that conforms to the OGC API Features specification.
            links:
              - href: 'http://data.example.org/'
                rel: self
                type: application/json
                title: this document
              - href: 'http://data.example.org/api'
                rel: service-desc
                type: application/vnd.oai.openapi+json;version=3.0
                title: the API definition
              - href: 'http://data.example.org/api.html'
                rel: service-doc
                type: text/html
                title: the API documentation
              - href: 'http://data.example.org/conformance'
                rel: conformance
                type: application/json
                title: OGC API conformance classes implemented by this server
              - href: 'http://data.example.org/collections'
                rel: data
                type: application/json
                title: Information about the feature collections
        text/html:
          schema:
            type: string
    ConformanceDeclaration:
      description: |-
        The URIs of all conformance classes supported by the server.

        To support "generic" clients that want to access multiple
        OGC API Features implementations - and not "just" a specific
        API / server, the server declares the conformance
        classes it implements and conforms to.
      content:
        application/json:
          schema:
            $ref: '#/components/schemas/confClasses'
          example:
            conformsTo:
              - 'http://www.opengis.net/spec/ogcapi-features-1/1.0/conf/core'
              - 'http://www.opengis.net/spec/ogcapi-features-1/1.0/conf/oas30'
              - 'http://www.opengis.net/spec/ogcapi-features-1/1.0/conf/html'
              - 'http://www.opengis.net/spec/ogcapi-features-1/1.0/conf/geojson'
        text/html:
          schema:
            type: string
    Collections:
      description: |-
        The feature collections shared by this API.

        The dataset is organized as one or more feature collections. This resource
        provides information about and access to the collections.

        The response contains the list of collections. For each collection, a link
        to the items in the collection (path `/collections/{collectionId}/items`,
        link relation `items`) as well as key information about the collection.
        This information includes:

        * A local identifier for the collection that is unique for the dataset;
        * A list of coordinate reference systems (CRS) in which geometries may be returned by the server. The first CRS is the default coordinate reference system (the default is always WGS 84 with axis order longitude/latitude);
        * An optional title and description for the collection;
        * An optional extent that can be used to provide an indication of the spatial and temporal extent of the collection - typically derived from the data;
        * An optional indicator about the type of the items in the collection (the default value, if the indicator is not provided, is 'feature').
      content:
        application/json:
          schema:
            $ref: '#/components/schemas/collections'
          example:
            links:
              - href: 'http://data.example.org/collections.json'
                rel: self
                type: application/json
                title: this document
              - href: 'http://data.example.org/collections.html'
                rel: alternate
                type: text/html
                title: this document as HTML
              - href: 'http://schemas.example.org/1.0/buildings.xsd'
                rel: describedby
                type: application/xml
                title: GML application schema for Acme Corporation building data
              - href: 'http://download.example.org/buildings.gpkg'
                rel: enclosure
                type: application/geopackage+sqlite3
                title: Bulk download (GeoPackage)
                length: 472546
            collections:
              - id: buildings
                title: Buildings
                description: Buildings in the city of Bonn.
                extent:
                  spatial:
                    bbox:
                      - - 7.01
                        - 50.63
                        - 7.22
                        - 50.78
                  temporal:
                    interval:
                      - - '2010-02-15T12:34:56Z'
                        - null
                links:
                  - href: 'http://data.example.org/collections/buildings/items'
                    rel: items
                    type: application/geo+json
                    title: Buildings
                  - href: 'http://data.example.org/collections/buildings/items.html'
                    rel: items
                    type: text/html
                    title: Buildings
                  - href: 'https://creativecommons.org/publicdomain/zero/1.0/'
                    rel: license
                    type: text/html
                    title: CC0-1.0
                  - href: 'https://creativecommons.org/publicdomain/zero/1.0/rdf'
                    rel: license
                    type: application/rdf+xml
                    title: CC0-1.0
        text/html:
          schema:
            type: string
    Collection:
      description: |-
        Information about the feature collection with id `collectionId`.

        The response contains a link to the items in the collection
        (path `/collections/{collectionId}/items`, link relation `items`)
        as well as key information about the collection. This information
        includes:

        * A local identifier for the collection that is unique for the dataset;
        * A list of coordinate reference systems (CRS) in which geometries may be returned by the server. The first CRS is the default coordinate reference system (the default is always WGS 84 with axis order longitude/latitude);
        * An optional title and description for the collection;
        * An optional extent that can be used to provide an indication of the spatial and temporal extent of the collection - typically derived from the data;
        * An optional indicator about the type of the items in the collection (the default value, if the indicator is not provided, is 'feature').
      content:
        application/json:
          schema:
            $ref: '#/components/schemas/collection'
          example:
            id: buildings
            title: Buildings
            description: Buildings in the city of Bonn.
            extent:
              spatial:
                bbox:
                  - - 7.01
                    - 50.63
                    - 7.22
                    - 50.78
              temporal:
                interval:
                  - - '2010-02-15T12:34:56Z'
                    - null
            links:
              - href: 'http://data.example.org/collections/buildings/items'
                rel: items
                type: application/geo+json
                title: Buildings
              - href: 'http://data.example.org/collections/buildings/items.html'
                rel: items
                type: text/html
                title: Buildings
              - href: 'https://creativecommons.org/publicdomain/zero/1.0/'
                rel: license
                type: text/html
                title: CC0-1.0
              - href: 'https://creativecommons.org/publicdomain/zero/1.0/rdf'
                rel: license
                type: application/rdf+xml
                title: CC0-1.0
        text/html:
          schema:
            type: string
    Features:
      description: |-
        The response is a document consisting of features in the collection.
        The features included in the response are determined by the server
        based on the query parameters of the request. To support access to
        larger collections without overloading the client, the API supports
        paged access with links to the next page, if more features are selected
        that the page size.

        The `bbox` and `datetime` parameter can be used to select only a
        subset of the features in the collection (the features that are in the
        bounding box or time interval). The `bbox` parameter matches all features
        in the collection that are not associated with a location, too. The
        `datetime` parameter matches all features in the collection that are
        not associated with a time stamp or interval, too.

        The `limit` parameter may be used to control the subset of the
        selected features that should be returned in the response, the page size.
        Each page may include information about the number of selected and
        returned features (`numberMatched` and `numberReturned`) as well as
        links to support paging (link relation `next`).
      content:
        application/geo+json:
          schema:
            $ref: '#/components/schemas/featureCollectionGeoJSON'
          example:
            type: FeatureCollection
            links:
              - href: 'http://data.example.com/collections/buildings/items.json'
                rel: self
                type: application/geo+json
                title: this document
              - href: 'http://data.example.com/collections/buildings/items.html'
                rel: alternate
                type: text/html
                title: this document as HTML
              - href: 'http://data.example.com/collections/buildings/items.json&offset=10&limit=2'
                rel: next
                type: application/geo+json
                title: next page
            timeStamp: '2018-04-03T14:52:23Z'
            numberMatched: 123
            numberReturned: 2
            features:
              - type: Feature
                id: '123'
                geometry:
                  type: Polygon
                  coordinates:
                    - ...
                properties:
                  function: residential
                  floors: '2'
                  lastUpdate: '2015-08-01T12:34:56Z'
              - type: Feature
                id: '132'
                geometry:
                  type: Polygon
                  coordinates:
                    - ...
                properties:
                  function: public use
                  floors: '10'
                  lastUpdate: '2013-12-03T10:15:37Z'
        text/html:
          schema:
            type: string
    Feature:
      description: |-
        fetch the feature with id `featureId` in the feature collection
        with id `collectionId`
      content:
        application/geo+json:
          schema:
            $ref: '#/components/schemas/featureGeoJSON'
          example:
            type: Feature
            links:
              - href: 'http://data.example.com/id/building/123'
                rel: canonical
                title: canonical URI of the building
              - href: 'http://data.example.com/collections/buildings/items/123.json'
                rel: self
                type: application/geo+json
                title: this document
              - href: 'http://data.example.com/collections/buildings/items/123.html'
                rel: alternate
                type: text/html
                title: this document as HTML
              - href: 'http://data.example.com/collections/buildings'
                rel: collection
                type: application/geo+json
                title: the collection document
            id: '123'
            geometry:
              type: Polygon
              coordinates:
                - ...
            properties:
              function: residential
              floors: '2'
              lastUpdate: '2015-08-01T12:34:56Z'
        text/html:
          schema:
            type: string
    InvalidParameter:
      description: |-
        A query parameter has an invalid value.
      content:
        application/json:
          schema:
            $ref: '#/components/schemas/exception'
        text/html:
          schema:
            type: string
    NotFound:
      description: |-
        The requested resource does not exist on the server. For example, a path parameter had an incorrect value.
    NotAcceptable:
      description: |-
        Content negotiation failed. For example, the `Accept` header submitted in the request did not support any of the media types supported by the server for the requested resource.
    ServerError:
      description: |-
        A server error occurred.
      content:
        application/json:
          schema:
            $ref: '#/components/schemas/exception'
        text/html:
          schema:
            type: string

```