# Directory Structure
```
├── .dockerignore
├── .env.local
├── .gitignore
├── .python-version
├── dockerfile
├── LICENSE
├── makefile
├── pyproject.toml
├── README.md
├── src
│ ├── __init__.py
│ ├── api_service
│ │ ├── __init__.py
│ │ ├── os_api.py
│ │ └── protocols.py
│ ├── config_docs
│ │ ├── ogcapi-features-1.yaml
│ │ ├── ogcapi-features-2.yaml
│ │ └── ogcapi-features-3.yaml
│ ├── http_client_test.py
│ ├── mcp_service
│ │ ├── __init__.py
│ │ ├── guardrails.py
│ │ ├── os_service.py
│ │ ├── prompts.py
│ │ ├── protocols.py
│ │ ├── resources.py
│ │ └── routing_service.py
│ ├── middleware
│ │ ├── __init__.py
│ │ ├── http_middleware.py
│ │ └── stdio_middleware.py
│ ├── models.py
│ ├── prompt_templates
│ │ ├── __init__.py
│ │ └── prompt_templates.py
│ ├── server.py
│ ├── stdio_client_test.py
│ ├── utils
│ │ ├── __init__.py
│ │ └── logging_config.py
│ └── workflow_generator
│ ├── __init__.py
│ └── workflow_planner.py
└── TODO.md
```
# Files
--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------
```
3.11
```
--------------------------------------------------------------------------------
/.env.local:
--------------------------------------------------------------------------------
```
OS_API_KEY=xxxxxxx
STDIO_KEY=xxxxxxx
BEARER_TOKENS=xxxxxxx
```
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info
# Virtual environments
.venv
.env
.DS_Store
.ruff_cache
uv.lock
pyrightconfig.json
*.log
.pytest_cache/
.mypy_cache/
```
--------------------------------------------------------------------------------
/.dockerignore:
--------------------------------------------------------------------------------
```
# Python artifacts
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# Virtual environments
.venv/
venv/
ENV/
env/
.env
# Testing and type checking
.pytest_cache/
.mypy_cache/
.coverage
htmlcov/
.tox/
.hypothesis/
*.cover
.coverage.*
# Development tools
.ruff_cache/
pyrightconfig.json
.vscode/
.idea/
*.swp
*.swo
*~
# OS files
.DS_Store
.DS_Store?
._*
.Spotlight-V100
.Trashes
ehthumbs.db
Thumbs.db
# Git
.git/
.gitignore
.gitattributes
# Documentation
*.md
docs/
LICENSE
# Build files
Makefile
makefile
dockerfile
Dockerfile
# Logs
*.log
logs/
# Package manager
uv.lock
poetry.lock
Pipfile.lock
# Test files
*_test.py
test_*.py
tests/
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
# Ordnance Survey MCP Server
An MCP server for accessing UK geospatial data through Ordnance Survey APIs.
## What it does
Provides LLM access to the Ordnance Survey's Data Hub APIs.
Ask simple questions such as "find me all cinemas in Leeds City Centre" or use the prompt templates for more complex, specific use cases - relating to street works, planning, etc.
Use the prompt templates for more complex, specific use cases - relating to road networks, etc.
This MCP server enforces a two-step workflow plan to ensure the user gets the best results - it must always produce a plan before making searches.
## Quick Start
### 1. Get an OS API Key
Register at [OS Data Hub](https://osdatahub.os.uk/) to get your free API key and set up a project.
### 2. Run with Docker and add to your Claude Desktop config (easiest)
```bash
# Clone the repository:
git clone https://github.com/your-username/os-mcp-server.git
cd os-mcp-server
```
Then build the Docker image:
```bash
docker build -t os-mcp-server .
```
Add the following to your Claude Desktop config:
```json
{
"mcpServers": {
"os-mcp-server": {
"command": "docker",
"args": [
"run",
"--rm",
"-i",
"-e",
"OS_API_KEY=your_api_key_here",
"-e",
"STDIO_KEY=any_value",
"os-mcp-server"
]
}
}
}
```
Open Claude Desktop and you should see all available tools, resources, and prompts.
## License
MIT License. This project does not have the endorsement of Ordnance Survey. This is a personal project and not affiliated with Ordnance Survey. This is not a commercial product. It is actively being worked on so expect breaking changes and bugs.
```
--------------------------------------------------------------------------------
/src/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/src/api_service/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/src/mcp_service/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/src/middleware/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/src/prompt_templates/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/src/utils/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/src/workflow_generator/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/TODO.md:
--------------------------------------------------------------------------------
```markdown
# TODO
Add in better context window controls and explore the use of an in memory duckdb instance (as it has good json support) - store responses here first?
Move the docker build to using UV only
Need to sort out OS rate limits affecting the bulk feature fetch
```
--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------
```toml
[project]
name = "os-mcp"
version = "0.1.6"
description = "A Python MCP server that provides access to Ordnance Survey's DataHub APIs."
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
"aiohttp>=3.13.0",
"anthropic>=0.71.0",
"fastapi>=0.120.0",
"mcp>=1.19.0",
"starlette>=0.48.0",
"uvicorn>=0.38.0",
]
```
--------------------------------------------------------------------------------
/dockerfile:
--------------------------------------------------------------------------------
```dockerfile
FROM python:3.11.14-slim
WORKDIR /app
ENV PYTHONPATH=/app/src \
PYTHONUNBUFFERED=1 \
PIP_NO_CACHE_DIR=1 \
PIP_DISABLE_PIP_VERSION_CHECK=1
RUN apt-get update && apt-get install -y \
gcc \
&& rm -rf /var/lib/apt/lists/* \
&& apt-get clean \
&& pip install --upgrade pip
RUN groupadd -r appuser && \
useradd -r -g appuser -u 1000 appuser && \
chown -R appuser:appuser /app
COPY --chown=appuser:appuser pyproject.toml ./
RUN pip install --no-cache-dir .
COPY --chown=appuser:appuser src/ ./src/
USER appuser
EXPOSE 8000
CMD ["python", "src/server.py", "--transport", "stdio"]
```
--------------------------------------------------------------------------------
/src/workflow_generator/workflow_planner.py:
--------------------------------------------------------------------------------
```python
from typing import Dict, Any, Optional, List
from models import OpenAPISpecification
from utils.logging_config import get_logger
logger = get_logger(__name__)
class WorkflowPlanner:
    """Supplies API context (collection summaries + OpenAPI spec) for LLM workflow planning."""

    def __init__(
        self,
        openapi_spec: Optional[OpenAPISpecification],
        basic_collections_info: Optional[Dict[str, Any]] = None,
    ):
        # Parsed OpenAPI spec; may be None when upstream caching failed.
        self.spec = openapi_spec
        # Lightweight per-collection summaries used for first-pass planning.
        self.basic_collections_info = basic_collections_info or {}
        # Cache of detailed (queryable-level) collection info, filled elsewhere.
        self.detailed_collections_cache = {}

    def get_basic_context(self) -> Dict[str, Any]:
        """Return the coarse planning context: all collection summaries plus the spec."""
        context = {
            "available_collections": self.basic_collections_info,
            "openapi_spec": self.spec,
        }
        return context

    def get_detailed_context(self, collection_ids: List[str]) -> Dict[str, Any]:
        """Return detailed context for the requested collections.

        Only cache hits are included; collections that were never cached are
        silently omitted rather than fetched here.
        """
        hits: Dict[str, Any] = {}
        for requested_id in collection_ids:
            if requested_id in self.detailed_collections_cache:
                hits[requested_id] = self.detailed_collections_cache[requested_id]
        return {
            "available_collections": hits,
            "openapi_spec": self.spec,
        }
```
--------------------------------------------------------------------------------
/src/api_service/protocols.py:
--------------------------------------------------------------------------------
```python
from typing import Protocol, Dict, List, Any, Optional, runtime_checkable
from models import (
OpenAPISpecification,
CollectionsCache,
CollectionQueryables,
)
@runtime_checkable
class APIClient(Protocol):
    """Protocol for API clients.

    Structural interface for the async OS Data Hub HTTP client: session
    lifecycle, authenticated/unauthenticated requests, and cache priming
    for the OpenAPI spec and collection metadata.
    """

    async def initialise(self) -> None:
        """Initialise the aiohttp session if not already created"""
        ...

    async def close(self) -> None:
        """Close the aiohttp session"""
        ...

    async def get_api_key(self) -> str:
        """Get the API key"""
        ...

    async def make_request(
        self,
        endpoint: str,
        params: Optional[Dict[str, Any]] = None,
        path_params: Optional[List[str]] = None,
    ) -> Dict[str, Any]:
        """Make a request to an API endpoint.

        Args:
            endpoint: Endpoint template (see NGDAPIEndpoint values).
            params: Query-string parameters.
            path_params: Positional values substituted into the endpoint path.
        """
        ...

    async def make_request_no_auth(
        self,
        url: str,
        params: Optional[Dict[str, Any]] = None,
        max_retries: int = 2,
    ) -> str:
        """Make a request without authentication (e.g. public docs pages)."""
        ...

    async def cache_openapi_spec(self) -> OpenAPISpecification:
        """Cache the OpenAPI spec"""
        ...

    async def cache_collections(self) -> CollectionsCache:
        """Cache the collections data"""
        ...

    async def fetch_collections_queryables(
        self, collection_ids: List[str]
    ) -> Dict[str, CollectionQueryables]:
        """Fetch detailed queryables for specific collections only"""
        ...
```
--------------------------------------------------------------------------------
/src/utils/logging_config.py:
--------------------------------------------------------------------------------
```python
import logging
import sys
import os
import re
class APIKeySanitisingFilter(logging.Filter):
    """Logging filter that strips API keys/tokens from log output.

    Removes ``key``/``api_key``/``apikey``/``token`` query parameters (value
    included) from the log message and any string arguments, then tidies up
    the leftover ``?``/``&`` separators. The record itself is never dropped.
    """

    def __init__(self):
        super().__init__()
        # Human-readable patterns, kept as strings for introspection.
        self.patterns = [
            r"[?&]key=[^&\s]*",
            r"[?&]api_key=[^&\s]*",
            r"[?&]apikey=[^&\s]*",
            r"[?&]token=[^&\s]*",
        ]
        # Pre-compile once (case-insensitive) instead of on every record.
        self._compiled = [re.compile(p, re.IGNORECASE) for p in self.patterns]
        # Separator clean-up passes applied after key removal.
        self._trailing_sep = re.compile(r"[?&]$")
        self._multi_amp = re.compile(r"&{2,}")
        self._orphan_amp = re.compile(r"\?&")

    def filter(self, record: logging.LogRecord) -> bool:
        """Sanitise the record's message and args in place; always returns True."""
        if hasattr(record, "msg") and isinstance(record.msg, str):
            record.msg = self._sanitise_text(record.msg)
        # Only rewrite tuple args: logging also allows a single dict argument,
        # which the previous implementation would have mangled into a tuple of
        # its keys and broken %-formatting.
        if getattr(record, "args", None) and isinstance(record.args, tuple):
            record.args = tuple(
                self._sanitise_text(arg) if isinstance(arg, str) else arg
                for arg in record.args
            )
        return True

    def _sanitise_text(self, text: str) -> str:
        """Remove API keys from *text* and normalise leftover separators."""
        sanitised = text
        for pattern in self._compiled:
            sanitised = pattern.sub("", sanitised)
        sanitised = self._trailing_sep.sub("", sanitised)
        sanitised = self._multi_amp.sub("&", sanitised)
        sanitised = self._orphan_amp.sub("?", sanitised)
        return sanitised
def configure_logging(debug: bool = False) -> logging.Logger:
    """
    Set up application-wide logging on the root logger.

    Installs a single stderr handler (any pre-existing handlers are removed
    first, so repeated calls don't stack handlers) with an API-key sanitising
    filter attached.

    Args:
        debug: Force debug-level logging; setting DEBUG=1 in the environment
            has the same effect.

    Returns:
        The configured root logger.
    """
    debug_enabled = debug or os.environ.get("DEBUG") == "1"
    log_level = logging.DEBUG if debug_enabled else logging.INFO

    root_logger = logging.getLogger()
    root_logger.setLevel(log_level)

    # Start from a clean slate: drop whatever handlers are already attached.
    for existing in list(root_logger.handlers):
        root_logger.removeHandler(existing)

    console_handler = logging.StreamHandler(sys.stderr)
    console_handler.setLevel(log_level)
    console_handler.setFormatter(
        logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    )
    console_handler.addFilter(APIKeySanitisingFilter())
    root_logger.addHandler(console_handler)

    # uvicorn manages its own handlers; stop its records doubling up here.
    logging.getLogger("uvicorn").propagate = False
    return root_logger
def get_logger(name: str) -> logging.Logger:
    """Return the logger for *name* (usually the calling module's ``__name__``).

    Thin wrapper over :func:`logging.getLogger` so modules depend on this
    package's logging setup rather than the stdlib directly.
    """
    return logging.getLogger(name)
```
--------------------------------------------------------------------------------
/src/prompt_templates/prompt_templates.py:
--------------------------------------------------------------------------------
```python
PROMPT_TEMPLATES = {
"usrn_breakdown": (
"Break down USRN {usrn} into its component road links for routing analysis. "
"Step 1: GET /collections/trn-ntwk-street-1/items?filter=usrn='{usrn}' to get street details. "
"Step 2: Use Street geometry bbox to query Road Links: GET /collections/trn-ntwk-roadlink-4/items?bbox=[street_bbox] "
"Step 3: Filter Road Links by Street reference using properties.street_ref matching street feature ID. "
"Step 4: For each Road Link: GET /collections/trn-ntwk-roadnode-1/items?filter=roadlink_ref='roadlink_id' "
"Step 5: Set crs=EPSG:27700 for British National Grid coordinates. "
"Return: Complete breakdown of USRN into constituent Road Links with node connections."
),
"restriction_matching_analysis": (
"Perform comprehensive traffic restriction matching for road network in bbox {bbox} with SPECIFIC STREET IDENTIFICATION. "
"Step 1: Build routing network: get_routing_data(bbox='{bbox}', limit={limit}, build_network=True) "
"Step 2: Extract restriction data from build_status.restrictions array. "
"Step 3: For each restriction, match to road links using restrictionnetworkreference: "
" - networkreferenceid = Road Link UUID from trn-ntwk-roadlink-4 "
" - roadlinkdirection = 'In Direction' (with geometry) or 'In Opposite Direction' (against geometry) "
" - roadlinksequence = order for multi-link restrictions (turns) "
"Step 4: IMMEDIATELY lookup street names: Use get_bulk_features(collection_id='trn-ntwk-roadlink-4', identifiers=[road_link_uuids]) to resolve UUIDs to actual street names (name1_text, roadclassification, roadclassificationnumber) "
"Step 5: Analyze restriction types by actual street names: "
" - One Way: Apply directional constraint to specific named road "
" - Turn Restriction: Block movement between named streets (from street A to street B) "
" - Vehicle Restrictions: Apply dimension/weight limits to specific named roads "
" - Access Restrictions: Apply vehicle type constraints to specific named streets "
"Step 6: Check exemptions array (e.g., 'Pedal Cycles' exempt from one-way) for each named street "
"Step 7: Group results by street names and road classifications (A Roads, B Roads, Local Roads) "
"Return: Complete restriction mapping showing ACTUAL STREET NAMES with their specific restrictions, directions, exemptions, and road classifications. Present results as 'Street Name (Road Class)' rather than UUIDs."
),
}
```
--------------------------------------------------------------------------------
/src/mcp_service/resources.py:
--------------------------------------------------------------------------------
```python
"""MCP Resources for OS NGD documentation"""
import json
import time
from models import NGDAPIEndpoint
from utils.logging_config import get_logger
logger = get_logger(__name__)
# TODO: Do this for the other OS NGD data themes (e.g. land, buildings) as well
class OSDocumentationResources:
    """Handles registration of OS NGD documentation resources.

    Each registered resource fetches a docs.os.uk markdown page on demand and
    returns it wrapped in a JSON envelope.
    """

    # URI suffix -> documentation endpoint for the transport network theme.
    # Driving registration from this table replaces six copy-pasted handler
    # functions with one loop.
    _TRANSPORT_DOCS = {
        "street": NGDAPIEndpoint.MARKDOWN_STREET,
        "road": NGDAPIEndpoint.MARKDOWN_ROAD,
        "tram-on-road": NGDAPIEndpoint.TRAM_ON_ROAD,
        "road-node": NGDAPIEndpoint.ROAD_NODE,
        "road-link": NGDAPIEndpoint.ROAD_LINK,
        "road-junction": NGDAPIEndpoint.ROAD_JUNCTION,
    }

    def __init__(self, mcp_service, api_client):
        # MCP service providing the .resource() registration decorator.
        self.mcp = mcp_service
        # API client used for unauthenticated documentation fetches.
        self.api_client = api_client

    def register_all(self) -> None:
        """Register all documentation resources"""
        self._register_transport_network_resources()
        # Future: self._register_land_resources()
        # Future: self._register_building_resources()

    def _register_transport_network_resources(self) -> None:
        """Register one os-docs:// resource per transport network feature type."""
        for feature_type, endpoint in self._TRANSPORT_DOCS.items():
            self._register_doc_resource(feature_type, endpoint.value)

    def _register_doc_resource(self, feature_type: str, url: str) -> None:
        """Register a single documentation resource under os-docs://<feature_type>."""

        async def doc_resource() -> str:
            return await self._fetch_doc_resource(feature_type, url)

        # Keep the handler name the hand-written versions had (e.g.
        # "road_node_docs") so registration metadata is unchanged.
        doc_resource.__name__ = f"{feature_type.replace('-', '_')}_docs"
        self.mcp.resource(f"os-docs://{feature_type}")(doc_resource)

    async def _fetch_doc_resource(self, feature_type: str, url: str) -> str:
        """Fetch a markdown documentation page and wrap it in a JSON envelope.

        Failures are logged and returned as a JSON error payload rather than
        raised, so a broken docs page never crashes a resource read.
        """
        try:
            content = await self.api_client.make_request_no_auth(url)
            return json.dumps(
                {
                    "feature_type": feature_type,
                    "content": content,
                    "content_type": "markdown",
                    "source_url": url,
                    "timestamp": time.time(),
                }
            )
        except Exception as e:
            logger.error(f"Error fetching {feature_type} documentation: {e}")
            return json.dumps({"error": str(e), "feature_type": feature_type})
```
--------------------------------------------------------------------------------
/src/mcp_service/prompts.py:
--------------------------------------------------------------------------------
```python
from typing import List
from mcp.types import PromptMessage, TextContent
from prompt_templates.prompt_templates import PROMPT_TEMPLATES
from utils.logging_config import get_logger
logger = get_logger(__name__)
class OSWorkflowPrompts:
    """Handles registration of OS NGD workflow prompts.

    The nested handler names and docstrings are prompt metadata consumed by
    the MCP service, so they are kept exactly as registered.
    """

    def __init__(self, mcp_service):
        # MCP service providing the .prompt() registration decorator.
        self.mcp = mcp_service

    @staticmethod
    def _as_user_messages(text: str) -> List[PromptMessage]:
        """Wrap plain text in a single-element, user-role prompt message list."""
        message = PromptMessage(
            role="user",
            content=TextContent(type="text", text=text),
        )
        return [message]

    def register_all(self) -> None:
        """Register all workflow prompts"""
        self._register_analysis_prompts()
        self._register_general_prompts()

    def _register_analysis_prompts(self) -> None:
        """Register analysis workflow prompts"""

        @self.mcp.prompt()
        def usrn_breakdown_analysis(usrn: str) -> List[PromptMessage]:
            """Generate a step-by-step USRN breakdown workflow"""
            template = PROMPT_TEMPLATES["usrn_breakdown"].format(usrn=usrn)
            text = (
                "As an expert in OS NGD API workflows and transport network "
                f"analysis, {template}"
            )
            return self._as_user_messages(text)

    def _register_general_prompts(self) -> None:
        """Register general OS NGD guidance prompts"""

        @self.mcp.prompt()
        def collection_query_guidance(
            collection_id: str, query_type: str = "features"
        ) -> List[PromptMessage]:
            """Generate guidance for querying OS NGD collections"""
            text = (
                f"As an OS NGD API expert, guide me through querying the "
                f"'{collection_id}' collection for {query_type}. "
                f"Include: 1) Available filters, 2) Best practices for bbox queries, "
                f"3) CRS considerations, 4) Example queries with proper syntax."
            )
            return self._as_user_messages(text)

        @self.mcp.prompt()
        def workflow_planning(
            user_request: str, data_theme: str = "transport"
        ) -> List[PromptMessage]:
            """Generate a workflow plan for complex OS NGD queries"""
            text = (
                f"As a geospatial workflow planner, create a detailed workflow "
                f"plan for: '{user_request}'. "
                f"Focus on {data_theme} theme data. Include: "
                f"1) Collection selection rationale, "
                f"2) Query sequence with dependencies, "
                f"3) Filter strategies, "
                f"4) Error handling considerations."
            )
            return self._as_user_messages(text)
```
--------------------------------------------------------------------------------
/src/config_docs/ogcapi-features-2.yaml:
--------------------------------------------------------------------------------
```yaml
openapi: 3.0.3
info:
title: "Building Blocks specified in OGC API - Features - Part 2: Coordinate Reference Systems by Reference"
description: |-
Common components used in the
[OGC standard "OGC API - Features - Part 2: Coordinate Reference Systems by Reference"](http://docs.opengeospatial.org/is/18-058/18-058.html).
OGC API - Features - Part 2: Coordinate Reference Systems by Reference 1.0 is an OGC Standard.
Copyright (c) 2020 Open Geospatial Consortium.
To obtain additional rights of use, visit http://www.opengeospatial.org/legal/ .
This document is also available on
[OGC](http://schemas.opengis.net/ogcapi/features/part2/1.0/openapi/ogcapi-features-2.yaml).
version: '1.0.0'
contact:
name: Clemens Portele
email: [email protected]
license:
name: OGC License
url: 'http://www.opengeospatial.org/legal/'
components:
parameters:
bbox-crs:
name: bbox-crs
in: query
required: false
schema:
type: string
format: uri
style: form
explode: false
crs:
name: crs
in: query
required: false
schema:
type: string
format: uri
style: form
explode: false
schemas:
collection:
allOf:
- $ref: http://schemas.opengis.net/ogcapi/features/part1/1.0/openapi/schemas/collection.yaml
- $ref: '#/components/schemas/collectionExtensionCrs'
collectionExtensionCrs:
type: object
properties:
storageCrs:
description: the CRS identifier, from the list of supported CRS identifiers, that may be used to retrieve features from a collection without the need to apply a CRS transformation
type: string
format: uri
storageCrsCoordinateEpoch:
description: point in time at which coordinates in the spatial feature collection are referenced to the dynamic coordinate reference system in `storageCrs`, that may be used to retrieve features from a collection without the need to apply a change of coordinate epoch. It is expressed as a decimal year in the Gregorian calendar
type: number
example: '2017-03-25 in the Gregorian calendar is epoch 2017.23'
collections:
allOf:
- $ref: http://schemas.opengis.net/ogcapi/features/part1/1.0/openapi/schemas/collections.yaml
- $ref: '#/components/schemas/collectionsExtensionCrs'
collectionsExtensionCrs:
type: object
properties:
crs:
description: a global list of CRS identifiers that are supported by spatial feature collections offered by the service
type: array
items:
type: string
format: uri
headers:
Content-Crs:
description: a URI, in angular brackets, identifying the coordinate reference system used in the content / payload
schema:
type: string
example: '<http://www.opengis.net/def/crs/EPSG/0/3395>'
```
--------------------------------------------------------------------------------
/src/mcp_service/protocols.py:
--------------------------------------------------------------------------------
```python
from typing import Protocol, Optional, Callable, List, runtime_checkable, Any
@runtime_checkable
class MCPService(Protocol):
    """Protocol for MCP services.

    Structural interface satisfied by FastMCP: decorator factories for tool
    and resource registration, plus the blocking run loop.
    """

    def tool(self) -> Callable[..., Any]:
        """Register a function as an MCP tool.

        Returns a decorator; usage is ``@service.tool()``.
        """
        ...

    def resource(
        self,
        uri: str,
        *,
        name: str | None = None,
        title: str | None = None,
        description: str | None = None,
        mime_type: str | None = None,
    ) -> Callable[[Any], Any]:
        """Register a function as an MCP resource.

        Args:
            uri: Resource URI (e.g. "os-docs://street").
            name/title/description/mime_type: Optional registration metadata;
                defaults are derived from the decorated function.
        """
        ...

    def run(self) -> None:
        """Run the MCP service"""
        ...
@runtime_checkable
class FeatureService(Protocol):
    """Protocol for OS NGD feature services.

    Structural interface for the tool layer over the OS NGD OGC API -
    Features endpoints. All query methods return JSON-encoded strings for
    direct use as MCP tool results.
    """

    def hello_world(self, name: str) -> str:
        """Test connection to the service"""
        ...

    def check_api_key(self) -> str:
        """Check if API key is available"""
        ...

    async def list_collections(self) -> str:
        """List all available feature collections"""
        ...

    async def get_single_collection(self, collection_id: str) -> str:
        """Get detailed information about a specific collection"""
        ...

    async def get_single_collection_queryables(self, collection_id: str) -> str:
        """Get queryable properties for a collection"""
        ...

    # TODO: Need to make sure the full list of parameters is supported
    # TODO: Supporting cql-text is clunky and need to figure out how to support this better
    async def search_features(
        self,
        collection_id: str,
        bbox: Optional[str] = None,
        crs: Optional[str] = None,
        limit: int = 10,
        offset: int = 0,
        # "filter" shadows the builtin, but the name mirrors the OGC API
        # query parameter and is part of the tool interface — keep it.
        filter: Optional[str] = None,
        filter_lang: Optional[str] = "cql-text",
        query_attr: Optional[str] = None,
        query_attr_value: Optional[str] = None,
    ) -> str:
        """Search for features in a collection with full CQL filter support"""
        ...

    async def get_feature(
        self, collection_id: str, feature_id: str, crs: Optional[str] = None
    ) -> str:
        """Get a specific feature by ID"""
        ...

    async def get_linked_identifiers(
        self, identifier_type: str, identifier: str, feature_type: Optional[str] = None
    ) -> str:
        """Get linked identifiers for a specified identifier"""
        ...

    async def get_bulk_features(
        self,
        collection_id: str,
        identifiers: List[str],
        query_by_attr: Optional[str] = None,
    ) -> str:
        """Get multiple features in a single call"""
        ...

    async def get_bulk_linked_features(
        self,
        identifier_type: str,
        identifiers: List[str],
        feature_type: Optional[str] = None,
    ) -> str:
        """Get linked features for multiple identifiers"""
        ...

    async def fetch_detailed_collections(self, collection_ids: List[str]) -> str:
        """Get detailed information about specific collections for workflow planning"""
        ...
```
--------------------------------------------------------------------------------
/src/middleware/stdio_middleware.py:
--------------------------------------------------------------------------------
```python
import json
import time
import asyncio
from collections import deque
from typing import Callable, TypeVar, Any, Union, cast
from functools import wraps
from utils.logging_config import get_logger
logger = get_logger(__name__)
F = TypeVar("F", bound=Callable[..., Any])
class StdioRateLimiter:
    """Sliding-window rate limiter for the STDIO transport.

    Tracks timestamps of accepted requests and refuses new ones once the
    window holds ``requests_per_minute`` entries.
    """

    def __init__(self, requests_per_minute: int = 10, window_seconds: int = 60):
        # Maximum accepted requests per window.
        self.requests_per_minute = requests_per_minute
        # Window length in seconds.
        self.window_seconds = window_seconds
        # Timestamps of accepted requests, oldest first (O(1) eviction).
        self.request_timestamps = deque()

    def check_rate_limit(self) -> bool:
        """Record a request attempt; return False when the window is full."""
        now = time.time()
        timestamps = self.request_timestamps
        # Evict timestamps that have aged out of the window.
        while timestamps and now - timestamps[0] >= self.window_seconds:
            timestamps.popleft()
        if len(timestamps) >= self.requests_per_minute:
            logger.warning("STDIO rate limit exceeded")
            return False
        timestamps.append(now)
        return True
class StdioMiddleware:
    """Authentication and rate limiting for the STDIO transport.

    Auth state is process-wide: once authenticate() succeeds, every decorated
    tool call is allowed (subject to rate limiting).
    """

    def __init__(self, requests_per_minute: int = 20):
        # Set by authenticate(); decorated calls are rejected until then.
        self.authenticated = False
        # Identifier for the caller; the supplied key doubles as the id.
        self.client_id = "anonymous"
        self.rate_limiter = StdioRateLimiter(requests_per_minute=requests_per_minute)

    def _rejection(self) -> Union[str, None]:
        """Return a JSON error payload if the call must be rejected, else None.

        Shared by the sync and async wrappers so the auth/rate-limit policy
        lives in exactly one place instead of being duplicated.
        """
        if not self.authenticated:
            error = json.dumps({"error": "Authentication required", "code": 401})
            logger.error(error)
            return error
        if not self.rate_limiter.check_rate_limit():
            error = json.dumps({"error": "Rate limited", "code": 429})
            logger.error(error)
            return error
        return None

    def require_auth_and_rate_limit(self, func: F) -> F:
        """Decorator rejecting unauthenticated or rate-limited calls.

        Rejections are returned as JSON error strings (not raised) so the MCP
        tool caller receives a well-formed payload.
        """

        @wraps(func)
        async def async_wrapper(*args: Any, **kwargs: Any) -> Union[str, Any]:
            rejection = self._rejection()
            if rejection is not None:
                return rejection
            return await func(*args, **kwargs)

        @wraps(func)
        def sync_wrapper(*args: Any, **kwargs: Any) -> Union[str, Any]:
            rejection = self._rejection()
            if rejection is not None:
                return rejection
            return func(*args, **kwargs)

        # Preserve the wrapped function's sync/async nature.
        if asyncio.iscoroutinefunction(func):
            return cast(F, async_wrapper)
        return cast(F, sync_wrapper)

    def authenticate(self, key: str) -> bool:
        """Mark the session authenticated when *key* is non-blank.

        NOTE(review): any non-blank key is accepted here — the key's actual
        value is presumably validated by the caller (e.g. against STDIO_KEY);
        confirm before relying on this for access control.
        """
        if key and key.strip():
            self.authenticated = True
            self.client_id = key
            return True
        self.authenticated = False
        return False
```
--------------------------------------------------------------------------------
/src/mcp_service/guardrails.py:
--------------------------------------------------------------------------------
```python
import re
import json
import asyncio
from functools import wraps
from typing import TypeVar, Callable, Any, Union, cast
from utils.logging_config import get_logger
logger = get_logger(__name__)
F = TypeVar("F", bound=Callable[..., Any])


class ToolGuardrails:
    """Prompt injection protection for MCP tool inputs.

    Scans string arguments for known injection phrases and, on a match,
    returns a JSON error payload instead of executing the wrapped tool.
    """

    def __init__(self):
        # Human-readable patterns; inline (?i) keeps matching case-insensitive.
        self.suspicious_patterns = [
            r"(?i)ignore previous",
            r"(?i)ignore all previous instructions",
            r"(?i)assistant:",
            r"\{\{.*?\}\}",
            r"(?i)forget",
            r"(?i)show credentials",
            r"(?i)show secrets",
            r"(?i)reveal password",
            r"(?i)dump (tokens|secrets|passwords|credentials)",
            r"(?i)leak confidential",
            r"(?i)reveal secrets",
            r"(?i)expose secrets",
            r"(?i)secrets.*contain",
            r"(?i)extract secrets",
        ]
        # Compile once instead of recompiling on every argument scan.
        self._compiled_patterns = [re.compile(p) for p in self.suspicious_patterns]

    def detect_prompt_injection(self, input_text: Any) -> bool:
        """Return True when *input_text* is a string containing an injection attempt."""
        if not isinstance(input_text, str):
            return False
        return any(p.search(input_text) for p in self._compiled_patterns)

    def _scan_call(self, args: tuple, kwargs: dict) -> Union[str, None]:
        """Scan a call's arguments; return a JSON error payload or None if clean.

        Shared by the sync and async wrappers so the checks cannot drift
        apart; avoids the previous raise-then-catch control flow.
        """
        for arg in args:
            if isinstance(arg, str) and self.detect_prompt_injection(arg):
                return json.dumps({"error": "Prompt injection detected!", "code": 400})
        for name, value in kwargs.items():
            # Skip MCP Context-like objects (identified structurally by their
            # request_context/request_id attributes) — framework plumbing,
            # not user input.
            if hasattr(value, "request_context") and hasattr(value, "request_id"):
                continue
            if isinstance(value, str) and self.detect_prompt_injection(value):
                return json.dumps({"error": f"Prompt injection in '{name}'!", "code": 400})
        return None

    def basic_guardrails(self, func: F) -> F:
        """Decorator applying injection scanning to *func*'s string arguments."""

        @wraps(func)
        async def async_wrapper(*args: Any, **kwargs: Any) -> Union[str, Any]:
            error = self._scan_call(args, kwargs)
            if error is not None:
                return error
            return await func(*args, **kwargs)

        @wraps(func)
        def sync_wrapper(*args: Any, **kwargs: Any) -> Union[str, Any]:
            error = self._scan_call(args, kwargs)
            if error is not None:
                return error
            return func(*args, **kwargs)

        # Preserve the wrapped function's sync/async nature.
        if asyncio.iscoroutinefunction(func):
            return cast(F, async_wrapper)
        return cast(F, sync_wrapper)
```
--------------------------------------------------------------------------------
/src/models.py:
--------------------------------------------------------------------------------
```python
"""
Types for the OS NGD API MCP Server
"""
from enum import Enum
from pydantic import BaseModel
from typing import Any, List, Dict
class NGDAPIEndpoint(Enum):
    """
    Enum for the OS API Endpoints following OGC API Features standard.

    Members whose URL contains ``{}`` placeholders are templates: callers
    substitute path parameters (collection id, feature id, ...) via
    ``str.format`` on ``member.value``.
    """

    # NGD Features Endpoints
    NGD_FEATURES_BASE_PATH = "https://api.os.uk/features/ngd/ofa/v1/{}"
    COLLECTIONS = NGD_FEATURES_BASE_PATH.format("collections")
    COLLECTION_INFO = NGD_FEATURES_BASE_PATH.format("collections/{}")
    COLLECTION_SCHEMA = NGD_FEATURES_BASE_PATH.format("collections/{}/schema")
    COLLECTION_FEATURES = NGD_FEATURES_BASE_PATH.format("collections/{}/items")
    COLLECTION_FEATURE_BY_ID = NGD_FEATURES_BASE_PATH.format("collections/{}/items/{}")
    COLLECTION_QUERYABLES = NGD_FEATURES_BASE_PATH.format("collections/{}/queryables")

    # OpenAPI Specification Endpoint
    OPENAPI_SPEC = NGD_FEATURES_BASE_PATH.format("api")

    # Linked Identifiers Endpoints
    LINKED_IDENTIFIERS_BASE_PATH = "https://api.os.uk/search/links/v1/{}"
    LINKED_IDENTIFIERS = LINKED_IDENTIFIERS_BASE_PATH.format("identifierTypes/{}/{}")

    # Markdown Resources (public docs.os.uk pages, fetched without auth)
    MARKDOWN_BASE_PATH = "https://docs.os.uk/osngd/data-structure/{}"
    MARKDOWN_STREET = MARKDOWN_BASE_PATH.format("transport/transport-network/street.md")
    MARKDOWN_ROAD = MARKDOWN_BASE_PATH.format("transport/transport-network/road.md")
    TRAM_ON_ROAD = MARKDOWN_BASE_PATH.format(
        "transport/transport-network/tram-on-road.md"
    )
    ROAD_NODE = MARKDOWN_BASE_PATH.format("transport/transport-network/road-node.md")
    ROAD_LINK = MARKDOWN_BASE_PATH.format("transport/transport-network/road-link.md")
    ROAD_JUNCTION = MARKDOWN_BASE_PATH.format(
        "transport/transport-network/road-junction.md"
    )

    # Places API Endpoints
    # TODO: Add these back in when I get access to the Places API from OS
    # PLACES_BASE_PATH = "https://api.os.uk/search/places/v1/{}"
    # PLACES_UPRN = PLACES_BASE_PATH.format("uprn")
    # POST_CODE = PLACES_BASE_PATH.format("postcode")
class OpenAPISpecification(BaseModel):
    """Parsed OpenAPI specification optimized for LLM context"""
    title: str  # API title, presumably from the spec's info block — confirm
    version: str  # API version string
    base_url: str  # base URL of the API
    endpoints: Dict[str, str]  # endpoint map; exact key/value shape set by the parser
    collection_ids: List[str]  # ids of the feature collections advertised
    supported_crs: Dict[str, Any]  # supported CRS identifiers/details
    crs_guide: Dict[str, str]  # human-readable guidance per CRS
class WorkflowStep(BaseModel):
    """A single step in a workflow plan"""
    step_number: int  # position of this step within the plan
    description: str  # plain-language description of what the step does
    api_endpoint: str  # endpoint the step should call
    parameters: Dict[str, Any]  # parameters for that call
    # Step numbers this step depends on. The shared [] literal is safe here:
    # pydantic deep-copies field defaults per instance.
    dependencies: List[int] = []
class WorkflowPlan(BaseModel):
    """Generated workflow plan for user requests"""
    user_request: str  # the original user request being planned for
    steps: List[WorkflowStep]  # ordered steps to execute
    reasoning: str  # explanation of how/why the plan was generated
    estimated_complexity: str = "simple"  # free-form label; defaults to "simple"
class Collection(BaseModel):
    """Represents a feature collection from the OS NGD API"""
    id: str  # collection identifier used in collection-scoped API calls
    title: str  # display title
    description: str = ""  # optional description text
    links: List[Dict[str, Any]] = []  # link objects from the API response
    extent: Dict[str, Any] = {}  # extent object as returned by the API
    itemType: str = "feature"  # camelCase kept deliberately to mirror the API payload
class CollectionsCache(BaseModel):
    """Cached collections data with filtering applied"""
    collections: List[Collection]  # the filtered collection list
    raw_response: Dict[str, Any]  # unmodified API response the list was built from
class CollectionQueryables(BaseModel):
    """Queryables information for a collection"""
    id: str  # collection id
    title: str  # collection title
    description: str  # collection description
    all_queryables: Dict[str, Any]  # every queryable property for the collection
    enum_queryables: Dict[str, Any]  # subset whose schema enumerates allowed values
    has_enum_filters: bool  # presumably True when enum_queryables is non-empty — confirm
    total_queryables: int  # presumably len(all_queryables) — confirm with producer
    enum_count: int  # presumably len(enum_queryables) — confirm with producer
class WorkflowContextCache(BaseModel):
    """Cached workflow context data"""
    collections_info: Dict[str, CollectionQueryables]  # presumably keyed by collection id — confirm
    openapi_spec: OpenAPISpecification  # parsed spec snapshot
    cached_at: float  # cache timestamp (epoch seconds, presumably time.time())
```
--------------------------------------------------------------------------------
/src/server.py:
--------------------------------------------------------------------------------
```python
import argparse
import os
import uvicorn
from typing import Any
from utils.logging_config import configure_logging
from api_service.os_api import OSAPIClient
from mcp_service.os_service import OSDataHubService
from mcp.server.fastmcp import FastMCP
from middleware.stdio_middleware import StdioMiddleware
from middleware.http_middleware import HTTPMiddleware
from starlette.middleware import Middleware
from starlette.middleware.cors import CORSMiddleware
from starlette.routing import Route
from starlette.responses import JSONResponse
logger = configure_logging()
def main():
"""Main entry point"""
parser = argparse.ArgumentParser(description="OS DataHub API MCP Server")
parser.add_argument(
"--transport",
choices=["stdio", "streamable-http"],
default="stdio",
help="Transport protocol to use (stdio or streamable-http)",
)
parser.add_argument(
"--host", default="0.0.0.0", help="Host to bind to (default: 0.0.0.0)"
)
parser.add_argument(
"--port", type=int, default=8000, help="Port to bind to (default: 8000)"
)
parser.add_argument("--debug", action="store_true", help="Enable debug mode")
args = parser.parse_args()
configure_logging(debug=args.debug)
logger.info(
f"OS DataHub API MCP Server starting with {args.transport} transport..."
)
api_client = OSAPIClient()
match args.transport:
case "stdio":
logger.info("Starting with stdio transport")
mcp = FastMCP(
"os-ngd-api",
debug=args.debug,
log_level="DEBUG" if args.debug else "INFO",
)
stdio_auth = StdioMiddleware()
service = OSDataHubService(api_client, mcp, stdio_middleware=stdio_auth)
stdio_api_key = os.environ.get("STDIO_KEY")
if not stdio_api_key or not stdio_auth.authenticate(stdio_api_key):
logger.error("Authentication failed")
return
service.run()
case "streamable-http":
logger.info(f"Starting Streamable HTTP server on {args.host}:{args.port}")
mcp = FastMCP(
"os-ngd-api",
host=args.host,
port=args.port,
debug=args.debug,
json_response=True,
stateless_http=False,
log_level="DEBUG" if args.debug else "INFO",
)
OSDataHubService(api_client, mcp)
async def auth_discovery(_: Any) -> JSONResponse:
"""Return authentication methods."""
return JSONResponse(
content={"authMethods": [{"type": "http", "scheme": "bearer"}]}
)
app = mcp.streamable_http_app()
app.routes.append(
Route(
"/.well-known/mcp-auth",
endpoint=auth_discovery,
methods=["GET"],
)
)
app.user_middleware.extend(
[
Middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["GET", "POST", "OPTIONS"],
allow_headers=["*"],
expose_headers=["*"],
),
Middleware(HTTPMiddleware),
]
)
uvicorn.run(
app,
host=args.host,
port=args.port,
log_level="debug" if args.debug else "info",
)
case _:
logger.error(f"Unknown transport: {args.transport}")
return
if __name__ == "__main__":
main()
```
--------------------------------------------------------------------------------
/src/config_docs/ogcapi-features-3.yaml:
--------------------------------------------------------------------------------
```yaml
openapi: 3.0.3
info:
title: "Building Blocks specified in OGC API - Features - Part 3: Filtering"
description: |-
Common components used in the
[OGC standard "OGC API - Features - Part 3: Filtering"](https://docs.ogc.org/is/19-079r2/19-079r2.html).
OGC API - Features - Part 3: Filtering 1.0 is an OGC Standard.
Copyright (c) 2024 Open Geospatial Consortium.
To obtain additional rights of use, visit https://www.ogc.org/legal/ .
This document is also available in the
[OGC Schema Repository](https://schemas.opengis.net/ogcapi/features/part3/1.0/openapi/ogcapi-features-3.yaml).
version: '1.0.0'
contact:
name: Clemens Portele
email: [email protected]
license:
name: OGC License
url: 'https://www.ogc.org/legal/'
paths:
/collections/{collectionId}/queryables:
get:
summary: Get the list of supported queryables for a collection
description: |-
This operation returns the list of supported queryables of a collection. Queryables are
the properties that may be used to construct a filter expression on items in the collection.
        The response is a JSON Schema of an object where each property is a queryable.
operationId: getQueryables
parameters:
- $ref: 'https://schemas.opengis.net/ogcapi/features/part1/1.0/openapi/parameters/collectionId.yaml'
responses:
'200':
description: The queryable properties of the collection.
content:
application/schema+json:
schema:
type: object
/functions:
get:
summary: Get the list of supported functions
description: |-
This operation returns the list of custom functions supported in CQL2 expressions.
operationId: getFunctions
responses:
'200':
description: The list of custom functions supported in CQL2 expressions.
content:
application/json:
schema:
$ref: '#/components/schemas/functions'
components:
parameters:
filter:
name: filter
in: query
required: false
schema:
type: string
style: form
explode: false
filter-lang:
name: filter-lang
in: query
required: false
schema:
type: string
enum:
- 'cql2-text'
- 'cql2-json'
default: 'cql2-text'
style: form
filter-crs:
name: filter-crs
in: query
required: false
schema:
type: string
format: uri-reference
style: form
explode: false
schemas:
functions:
type: object
required:
- functions
properties:
functions:
type: array
items:
type: object
required:
- name
- returns
properties:
name:
type: string
description:
type: string
metadataUrl:
type: string
format: uri-reference
arguments:
type: array
items:
type: object
required:
- type
properties:
title:
type: string
description:
type: string
type:
type: array
items:
type: string
enum:
- string
- number
- integer
- datetime
- geometry
- boolean
returns:
type: array
items:
type: string
enum:
- string
- number
- integer
- datetime
- geometry
- boolean
```
--------------------------------------------------------------------------------
/src/middleware/http_middleware.py:
--------------------------------------------------------------------------------
```python
import os
import time
from collections import defaultdict, deque
from typing import List, Callable, Awaitable
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import Response, JSONResponse
from utils.logging_config import get_logger
logger = get_logger(__name__)
class RateLimiter:
    """HTTP-layer sliding-window rate limiting keyed by client id.

    NOTE(review): per-client deques are never removed, so memory grows with
    the number of distinct client ids seen — consider periodic pruning.
    """

    def __init__(self, requests_per_minute: int = 10, window_seconds: int = 60):
        self.requests_per_minute = requests_per_minute
        self.window_seconds = window_seconds
        # client_id -> deque of request timestamps still inside the window.
        # defaultdict(deque) replaces the redundant lambda wrapper.
        self.request_timestamps = defaultdict(deque)

    def check_rate_limit(self, client_id: str) -> bool:
        """Record a request for client_id; return False if it exceeds the limit."""
        current_time = time.time()
        timestamps = self.request_timestamps[client_id]
        # Evict timestamps that have aged out of the sliding window.
        while timestamps and current_time - timestamps[0] >= self.window_seconds:
            timestamps.popleft()
        if len(timestamps) >= self.requests_per_minute:
            logger.warning(f"HTTP rate limit exceeded for client {client_id}")
            return False
        timestamps.append(current_time)
        return True
def get_valid_bearer_tokens() -> List[str]:
    """Get valid bearer tokens from environment variable.

    Reads BEARER_TOKENS as a comma-separated list, trimming whitespace and
    dropping empty entries. Returns [] (and logs) when nothing is configured.
    """
    try:
        raw = os.environ.get("BEARER_TOKENS", "")
        valid_tokens = [token.strip() for token in raw.split(",") if token.strip()]
        if valid_tokens:
            return valid_tokens
        logger.warning(
            "No BEARER_TOKENS configured, all authentication will be rejected"
        )
        return []
    except Exception as e:
        logger.error(f"Error getting valid tokens: {e}")
        return []
async def verify_bearer_token(token: str) -> bool:
    """Verify bearer token is valid.

    Uses hmac.compare_digest so the comparison is constant-time, avoiding a
    timing side channel from the plain `in` membership test.
    """
    import hmac

    try:
        valid_tokens = get_valid_bearer_tokens()
        if not valid_tokens or not token:
            return False
        # Compare against every configured token in constant time per token.
        return any(hmac.compare_digest(token, valid) for valid in valid_tokens)
    except Exception as e:
        logger.error(f"Error validating token: {e}")
        return False
class HTTPMiddleware(BaseHTTPMiddleware):
    """Starlette middleware enforcing rate limits, origin checks and bearer auth.

    Pipeline (in order): discovery/CORS-preflight bypass, per-session rate
    limit, Origin validation, browser-plugin blocking, bearer-token auth.
    Unauthenticated requests end with 401.
    """

    def __init__(self, app, requests_per_minute: int = 10):
        super().__init__(app)
        self.rate_limiter = RateLimiter(requests_per_minute=requests_per_minute)

    async def dispatch(
        self, request: Request, call_next: Callable[[Request], Awaitable[Response]]
    ) -> Response:
        """Run security checks; only authenticated requests reach the app."""
        # Auth discovery and CORS preflight must stay reachable without auth.
        if request.url.path == "/.well-known/mcp-auth" or request.method == "OPTIONS":
            return await call_next(request)
        client_ip = request.client.host if request.client else "unknown"
        # Rate-limit per MCP session when available, else per client IP.
        session_id = request.headers.get("mcp-session-id")
        if not session_id:
            session_id = f"ip-{client_ip}"
        if not self.rate_limiter.check_rate_limit(session_id):
            return JSONResponse(
                status_code=429,
                content={"detail": "Too many requests. Please try again later."},
                headers={"Retry-After": "60"},
            )
        origin = request.headers.get("origin", "")
        if origin and not self._is_valid_origin(origin, request):
            logger.warning(
                f"Blocked request with suspicious origin from {client_ip}, Origin: {origin}"
            )
            return JSONResponse(status_code=403, content={"detail": "Invalid origin"})
        user_agent = request.headers.get("user-agent", "")
        if self._is_browser_plugin(user_agent, request):
            logger.warning(
                f"Blocked browser plugin access from {client_ip}, User-Agent: {user_agent}"
            )
            return JSONResponse(
                status_code=403,
                content={"detail": "Browser plugin access is not allowed"},
            )
        auth_header = request.headers.get("Authorization")
        if auth_header and auth_header.startswith("Bearer "):
            # BUGFIX: str.replace removed every "Bearer " substring anywhere in
            # the header value; removeprefix strips only the leading scheme.
            token = auth_header.removeprefix("Bearer ")
            if await verify_bearer_token(token):
                request.state.token = token
                return await call_next(request)
            logger.warning(f"Invalid bearer token attempt from {client_ip}")
        else:
            logger.warning(
                f"Missing or invalid Authorization header from {client_ip}"
            )
        # Fall through: no valid credentials were presented.
        return JSONResponse(
            status_code=401,
            content={"detail": "Authentication required"},
            headers={"WWW-Authenticate": "Bearer"},
        )

    def _is_valid_origin(self, origin: str, request: Request) -> bool:
        """Validate Origin header to prevent DNS rebinding attacks."""
        valid_local_origins = ["http://localhost:", "http://127.0.0.1:"]
        # ALLOWED_ORIGINS is a comma-separated allow-list from the environment.
        valid_domains = os.environ.get("ALLOWED_ORIGINS", "").split(",")
        valid_domains = [d.strip() for d in valid_domains if d.strip()]
        for valid_origin in valid_local_origins:
            if origin.startswith(valid_origin):
                return True
        for domain in valid_domains:
            # Accept exact matches and http(s)-prefixed forms of each domain.
            if (
                origin == domain
                or origin.startswith(f"https://{domain}")
                or origin.startswith(f"http://{domain}")
            ):
                return True
        return False

    def _is_browser_plugin(self, user_agent: str, request: Request) -> bool:
        """Check if request is from a browser plugin (UA patterns or extension origin)."""
        plugin_patterns = [
            "Chrome-Extension",
            "Mozilla/5.0 (compatible; Extension)",
            "Browser-Extension",
        ]
        for pattern in plugin_patterns:
            if pattern.lower() in user_agent.lower():
                return True
        origin = request.headers.get("origin", "")
        if origin and (
            origin.startswith("chrome-extension://")
            or origin.startswith("moz-extension://")
            or origin.startswith("safari-extension://")
        ):
            return True
        return False
```
--------------------------------------------------------------------------------
/src/stdio_client_test.py:
--------------------------------------------------------------------------------
```python
import asyncio
import subprocess
import sys
import os
import time
from mcp import ClientSession
from mcp.client.stdio import stdio_client
from mcp.client.stdio import StdioServerParameters
from mcp.types import TextContent
def extract_text_from_result(result) -> str:
    """Safely extract text from MCP tool result.

    Returns the first TextContent item's text; otherwise reports the content
    types found (or "No content" when the result is empty).
    """
    content = result.content
    if not content:
        return "No content"
    text_item = next((c for c in content if isinstance(c, TextContent)), None)
    if text_item is not None:
        return text_item.text
    names = ", ".join(type(c).__name__ for c in content)
    return f"Non-text content: {names}"
async def test_stdio_rate_limiting():
    """Test STDIO rate limiting - should block after 1 request per minute

    Fires three rapid hello_world calls over a stdio MCP session, classifies
    each as SUCCESS/BLOCKED/ERROR, prints a verdict, then waits 10s and probes
    once more to see whether the limit window resets.
    """
    env = os.environ.copy()
    env["STDIO_KEY"] = "test-stdio-key"
    env["OS_API_KEY"] = os.environ.get("OS_API_KEY", "dummy-key-for-testing")
    env["PYTHONPATH"] = "src"
    print("Starting STDIO server subprocess...")
    # NOTE(review): this Popen server is never communicated with — stdio_client
    # below launches its own server from server_params; confirm whether this
    # extra subprocess is intentional.
    server_process = subprocess.Popen(
        [sys.executable, "src/server.py", "--transport", "stdio", "--debug"],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        env=env,
        text=True,
        bufsize=0,
    )
    try:
        print("Connecting to STDIO server...")
        server_params = StdioServerParameters(
            command=sys.executable,
            args=["src/server.py", "--transport", "stdio", "--debug"],
            env=env,
        )
        async with stdio_client(server_params) as (read_stream, write_stream):
            async with ClientSession(read_stream, write_stream) as session:
                print("Initializing session...")
                await session.initialize()
                print("Listing available tools...")
                tools = await session.list_tools()
                print(f" Found {len(tools.tools)} tools")
                print("\nRAPID FIRE TEST (should hit rate limit)")
                print("-" * 50)
                results = []
                for i in range(3):
                    try:
                        start_time = time.time()
                        print(f"Request #{i + 1}: ", end="", flush=True)
                        result = await session.call_tool(
                            "hello_world", {"name": f"TestUser{i + 1}"}
                        )
                        elapsed = time.time() - start_time
                        result_text = extract_text_from_result(result)
                        # Rate-limited responses arrive as tool text, not as
                        # exceptions, so classify by message content.
                        if "rate limit" in result_text.lower() or "429" in result_text:
                            print(f"BLOCKED ({elapsed:.1f}s) - {result_text}")
                            results.append(("BLOCKED", elapsed, result_text))
                        else:
                            print(f"SUCCESS ({elapsed:.1f}s) - {result_text}")
                            results.append(("SUCCESS", elapsed, result_text))
                    except Exception as e:
                        elapsed = time.time() - start_time
                        print(f"ERROR ({elapsed:.1f}s) - {e}")
                        results.append(("ERROR", elapsed, str(e)))
                    if i < 2:
                        await asyncio.sleep(0.1)
                print("\nRESULTS ANALYSIS")
                print("-" * 50)
                success_count = sum(1 for r in results if r[0] == "SUCCESS")
                blocked_count = sum(1 for r in results if r[0] == "BLOCKED")
                error_count = sum(1 for r in results if r[0] == "ERROR")
                print(f"Successful requests: {success_count}")
                print(f"Rate limited requests: {blocked_count}")
                print(f"Error requests: {error_count}")
                print("\nVISUAL TIMELINE:")
                timeline = ""
                for i, (status, elapsed, _) in enumerate(results):
                    if status == "SUCCESS":
                        timeline += "OK"
                    elif status == "BLOCKED":
                        timeline += "XX"
                    else:
                        timeline += "ER"
                    if i < len(results) - 1:
                        timeline += "--"
                print(f" {timeline}")
                print(" Request: 1 2 3")
                print("\nTEST VERDICT")
                print("-" * 50)
                if success_count == 1 and blocked_count >= 1:
                    print("PASS: Rate limiting works correctly!")
                    print(" First request succeeded")
                    print(" Subsequent requests blocked")
                elif success_count > 1:
                    print("FAIL: Rate limiting too permissive!")
                    print(f" {success_count} requests succeeded (expected 1)")
                else:
                    print("FAIL: No requests succeeded!")
                    print(" Check authentication or server issues")
                print("\nWAITING FOR RATE LIMIT RESET...")
                print(" (Testing if limit resets after window)")
                print(" Waiting 10 seconds...")
                for countdown in range(10, 0, -1):
                    print(f" {countdown}s remaining...", end="\r", flush=True)
                    await asyncio.sleep(1)
                print("\nTesting after rate limit window...")
                try:
                    result = await session.call_tool(
                        "hello_world", {"name": "AfterWait"}
                    )
                    result_text = extract_text_from_result(result)
                    if "rate limit" not in result_text.lower():
                        print("SUCCESS: Rate limit properly reset!")
                    else:
                        print("Rate limit still active (might need longer wait)")
                except Exception as e:
                    print(f"Error after wait: {e}")
    except Exception as e:
        print(f"Test failed with error: {e}")
    finally:
        print("\nCleaning up server process...")
        server_process.terminate()
        try:
            # Wait for exit off the event loop; force-kill after 5 seconds.
            await asyncio.wait_for(
                asyncio.create_task(asyncio.to_thread(server_process.wait)), timeout=5.0
            )
            print("Server terminated gracefully")
        except asyncio.TimeoutError:
            print("Force killing server...")
            server_process.kill()
            server_process.wait()
if __name__ == "__main__":
    print("STDIO Rate Limit Test Suite")
    print("Testing if STDIO middleware properly blocks > 1 request/minute")
    print()
    # A dummy OS_API_KEY is substituted above when none is set; that only
    # matters for live data calls, not for the rate-limit behaviour under test.
    if not os.environ.get("OS_API_KEY"):
        print("Warning: OS_API_KEY not set, using dummy key for testing")
        print(" (This is OK for rate limit testing)")
        print()
    asyncio.run(test_stdio_rate_limiting())
```
--------------------------------------------------------------------------------
/src/http_client_test.py:
--------------------------------------------------------------------------------
```python
import asyncio
import logging
from mcp.client.streamable_http import streamablehttp_client
from mcp import ClientSession
from mcp.types import TextContent
# Keep client/library logging quiet during manual test runs.
logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger(__name__)
def extract_text_from_result(result) -> str:
    """Safely extract text from MCP tool result.

    Returns the first TextContent item's text, "No content" for an empty
    result, or a summary of the non-text content types found.
    """
    content = result.content
    if not content:
        return "No content"
    text_item = next((c for c in content if isinstance(c, TextContent)), None)
    if text_item is not None:
        return text_item.text
    names = ", ".join(type(c).__name__ for c in content)
    return f"Non-text content: {names}"
async def test_usrn_search(session_name: str, usrn_values: list):
    """Test USRN searches with different values

    Opens one MCP streamable-HTTP session and issues a search_features call
    per USRN, classifying each as SUCCESS / BLOCKED (429) / ERROR.

    Returns:
        (results, session_id): list of (status, usrn, detail) tuples plus the
        MCP session id (None if the connection failed).
    """
    headers = {"Authorization": "Bearer dev-token"}
    results = []
    session_id = None
    print(f"\n{session_name} - Testing USRN searches")
    print("-" * 40)
    try:
        async with streamablehttp_client(
            "http://127.0.0.1:8000/mcp", headers=headers
        ) as (
            read_stream,
            write_stream,
            get_session_id,
        ):
            async with ClientSession(read_stream, write_stream) as session:
                await session.initialize()
                session_id = get_session_id()
                print(f"{session_name} Session ID: {session_id}")
                for i, usrn in enumerate(usrn_values):
                    try:
                        result = await session.call_tool(
                            "search_features",
                            {
                                "collection_id": "trn-ntwk-street-1",
                                "query_attr": "usrn",
                                "query_attr_value": str(usrn),
                                "limit": 5,
                            },
                        )
                        result_text = extract_text_from_result(result)
                        print(f" USRN {usrn}: SUCCESS - Found data")
                        print(result_text)
                        results.append(("SUCCESS", usrn, result_text[:100] + "..."))
                        # Small pause between calls to avoid hammering the server.
                        await asyncio.sleep(0.1)
                    except Exception as e:
                        if "429" in str(e) or "Too Many Requests" in str(e):
                            print(f" USRN {usrn}: BLOCKED - Rate limited")
                            results.append(("BLOCKED", usrn, str(e)))
                        else:
                            print(f" USRN {usrn}: ERROR - {e}")
                            results.append(("ERROR", usrn, str(e)))
    except Exception as e:
        print(f"{session_name}: Connection error - {str(e)[:100]}")
        if len(results) == 0:
            # Connection never opened: mark every planned USRN as an error.
            results = [("ERROR", "connection", str(e))] * len(usrn_values)
    return results, session_id
async def test_usrn_calls():
    """Test calling USRN searches with different values

    Runs two sequential sessions querying the same pair of USRNs in opposite
    order, then prints per-session and combined result summaries.
    """
    print("USRN Search Test - Two Different USRNs")
    print("=" * 50)
    # Different USRN values to test
    usrn_values_1 = ["24501091", "24502114"]
    usrn_values_2 = ["24502114", "24501091"]
    results_a, session_id_a = await test_usrn_search("SESSION-A", usrn_values_1)
    await asyncio.sleep(0.5)
    results_b, session_id_b = await test_usrn_search("SESSION-B", usrn_values_2)
    # Analyze results
    print("\nRESULTS SUMMARY")
    print("=" * 30)
    success_a = len([r for r in results_a if r[0] == "SUCCESS"])
    blocked_a = len([r for r in results_a if r[0] == "BLOCKED"])
    error_a = len([r for r in results_a if r[0] == "ERROR"])
    success_b = len([r for r in results_b if r[0] == "SUCCESS"])
    blocked_b = len([r for r in results_b if r[0] == "BLOCKED"])
    error_b = len([r for r in results_b if r[0] == "ERROR"])
    print(f"SESSION-A: {success_a} success, {blocked_a} blocked, {error_a} errors")
    print(f"SESSION-B: {success_b} success, {blocked_b} blocked, {error_b} errors")
    print(f"Total: {success_a + success_b} success, {blocked_a + blocked_b} blocked")
    if session_id_a and session_id_b:
        print(f"Different session IDs: {session_id_a != session_id_b}")
    else:
        print("Could not compare session IDs")
    # Show detailed results
    print("\nDETAILED RESULTS:")
    print("SESSION-A:")
    for status, usrn, details in results_a:
        print(f" USRN {usrn}: {status}")
    print("SESSION-B:")
    for status, usrn, details in results_b:
        print(f" USRN {usrn}: {status}")
async def test_session_safe(session_name: str, requests: int = 3):
    """Test a single session with safer error handling

    Sends *requests* hello_world calls over one MCP session. After a hard
    (non-429) error, the remaining requests are recorded as BLOCKED without
    being sent.

    Returns:
        (results, session_id): per-request status list and the session id.
    """
    headers = {"Authorization": "Bearer dev-token"}
    results = []
    session_id = None
    print(f"\n{session_name} - Testing {requests} requests")
    print("-" * 40)
    try:
        async with streamablehttp_client(
            "http://127.0.0.1:8000/mcp", headers=headers
        ) as (
            read_stream,
            write_stream,
            get_session_id,
        ):
            async with ClientSession(read_stream, write_stream) as session:
                await session.initialize()
                session_id = get_session_id()
                print(f"{session_name} Session ID: {session_id}")
                for i in range(requests):
                    try:
                        result = await session.call_tool(
                            "hello_world", {"name": f"{session_name}-User{i + 1}"}
                        )
                        result_text = extract_text_from_result(result)
                        print(f" Request {i + 1}: SUCCESS - {result_text}")
                        results.append("SUCCESS")
                        await asyncio.sleep(0.1)
                    except Exception as e:
                        if "429" in str(e) or "Too Many Requests" in str(e):
                            print(f" Request {i + 1}: BLOCKED - Rate limited")
                            results.append("BLOCKED")
                        else:
                            print(f" Request {i + 1}: ERROR - {e}")
                            results.append("ERROR")
                            # A hard error likely ends the session; mark the
                            # rest as blocked and stop sending.
                            for j in range(i + 1, requests):
                                print(f" Request {j + 1}: BLOCKED - Session rate limited")
                                results.append("BLOCKED")
                            break
    except Exception as e:
        print(f"{session_name}: Connection error - {str(e)[:100]}")
        if len(results) == 0:
            results = ["ERROR"] * requests
        elif len(results) < requests:
            remaining = requests - len(results)
            results.extend(["BLOCKED"] * remaining)
    return results, session_id
async def test_two_sessions():
    """Test rate limiting across two sessions

    Runs two sequential 20-request sessions and checks that both see a mix of
    successes and rate-limited (429) responses.
    """
    print("HTTP Rate Limit Test - Two Sessions")
    print("=" * 50)
    results_a, session_id_a = await test_session_safe("SESSION-A", 20)
    await asyncio.sleep(0.5)
    results_b, session_id_b = await test_session_safe("SESSION-B", 20)
    # Analyze results
    print("\nRESULTS SUMMARY")
    print("=" * 30)
    success_a = results_a.count("SUCCESS")
    blocked_a = results_a.count("BLOCKED")
    error_a = results_a.count("ERROR")
    success_b = results_b.count("SUCCESS")
    blocked_b = results_b.count("BLOCKED")
    error_b = results_b.count("ERROR")
    print(f"SESSION-A: {success_a} success, {blocked_a} blocked, {error_a} errors")
    print(f"SESSION-B: {success_b} success, {blocked_b} blocked, {error_b} errors")
    print(f"Total: {success_a + success_b} success, {blocked_a + blocked_b} blocked")
    if session_id_a and session_id_b:
        print(f"Different session IDs: {session_id_a != session_id_b}")
    else:
        print("Could not compare session IDs")
    total_success = success_a + success_b
    total_blocked = blocked_a + blocked_b
    print("\nRATE LIMITING ASSESSMENT:")
    if total_success >= 2 and total_blocked >= 2:
        print("✅ PASS: Rate limiting is working")
        print(f" - {total_success} requests succeeded")
        print(f" - {total_blocked} requests were rate limited")
        print(" - Each session got limited after ~2 requests")
    elif total_success == 0:
        print("❌ FAIL: No requests succeeded (check server/auth)")
    else:
        print("⚠️ UNCLEAR: Unexpected pattern")
        print(" Check server logs for actual behavior")
if __name__ == "__main__":
    # Requires the MCP server running locally at http://127.0.0.1:8000/mcp.
    # Run the USRN test by default
    asyncio.run(test_usrn_calls())
    # Uncomment the line below to run the original test instead
    # asyncio.run(test_two_sessions())
```
--------------------------------------------------------------------------------
/src/mcp_service/routing_service.py:
--------------------------------------------------------------------------------
```python
from typing import Dict, List, Set, Optional, Any
from dataclasses import dataclass
from utils.logging_config import get_logger
logger = get_logger(__name__)
@dataclass
class RouteNode:
    """Represents a routing node (intersection)"""

    id: int  # internal sequential id, starting at 1
    node_identifier: str  # OS NGD node identifier string
    connected_edges: Set[int]  # internal ids of edges incident to this node


@dataclass
class RouteEdge:
    """Represents a routing edge (road segment)"""

    id: int  # internal sequential id, starting at 1
    road_id: str  # road link reference, or "NONE" when absent from the data
    road_name: Optional[str]  # display name (name1_text), if present
    source_node_id: int  # internal id of the start node
    target_node_id: int  # internal id of the end node
    cost: float  # traversal cost (geometry_length, default 100.0)
    reverse_cost: float  # cost in the reverse direction (same value as cost)
    geometry: Optional[Dict[str, Any]]  # feature geometry, if provided


class InMemoryRoutingNetwork:
    """In-memory routing network built from OS NGD data"""

    def __init__(self):
        self.nodes: Dict[int, RouteNode] = {}
        self.edges: Dict[int, RouteEdge] = {}
        # node_identifier -> internal node id, used to de-duplicate nodes
        self.node_lookup: Dict[str, int] = {}
        self.is_built = False

    def add_node(self, node_identifier: str) -> int:
        """Add a node and return its internal ID (idempotent per identifier)."""
        if node_identifier in self.node_lookup:
            return self.node_lookup[node_identifier]
        node_id = len(self.nodes) + 1
        self.nodes[node_id] = RouteNode(
            id=node_id,
            node_identifier=node_identifier,
            connected_edges=set(),
        )
        self.node_lookup[node_identifier] = node_id
        return node_id

    def add_edge(self, road_data: Dict[str, Any]) -> None:
        """Add a road link feature as an edge, creating its endpoint nodes.

        Features missing either startnode or endnode are skipped with a warning.
        """
        properties = road_data.get("properties", {})
        start_node = properties.get("startnode", "")
        end_node = properties.get("endnode", "")
        if not start_node or not end_node:
            logger.warning(f"Road link {properties.get('id')} missing node data")
            return
        source_id = self.add_node(start_node)
        target_id = self.add_node(end_node)
        # Use the first road/track/path reference as the road id, when present.
        roadlink_id = None
        road_track_refs = properties.get("roadtrackorpathreference", [])
        if road_track_refs:
            roadlink_id = road_track_refs[0].get("roadlinkid", "")
        edge_id = len(self.edges) + 1
        # Nominal fallback cost when the feature carries no geometry_length.
        cost = properties.get("geometry_length", 100.0)
        edge = RouteEdge(
            id=edge_id,
            road_id=roadlink_id or "NONE",
            road_name=properties.get("name1_text"),
            source_node_id=source_id,
            target_node_id=target_id,
            cost=cost,
            reverse_cost=cost,
            geometry=road_data.get("geometry"),
        )
        self.edges[edge_id] = edge
        self.nodes[source_id].connected_edges.add(edge_id)
        self.nodes[target_id].connected_edges.add(edge_id)

    def get_connected_edges(self, node_id: int) -> List[RouteEdge]:
        """Get all edges connected to a node (empty list for unknown ids)."""
        if node_id not in self.nodes:
            return []
        return [self.edges[edge_id] for edge_id in self.nodes[node_id].connected_edges]

    def get_summary(self) -> Dict[str, Any]:
        """Get network summary statistics"""
        return {
            "total_nodes": len(self.nodes),
            "total_edges": len(self.edges),
            "is_built": self.is_built,
            "sample_nodes": [
                {
                    "id": node.id,
                    "identifier": node.node_identifier,
                    "connected_edges": len(node.connected_edges),
                }
                for node in list(self.nodes.values())[:5]
            ],
        }

    def get_all_nodes(self) -> List[Dict[str, Any]]:
        """Get all nodes as a flat list"""
        return [
            {
                "id": node.id,
                "node_identifier": node.node_identifier,
                "connected_edge_count": len(node.connected_edges),
                "connected_edge_ids": list(node.connected_edges),
            }
            for node in self.nodes.values()
        ]

    def get_all_edges(self) -> List[Dict[str, Any]]:
        """Get all edges as a flat list"""
        return [
            {
                "id": edge.id,
                "road_id": edge.road_id,
                "road_name": edge.road_name,
                "source_node_id": edge.source_node_id,
                "target_node_id": edge.target_node_id,
                "cost": edge.cost,
                "reverse_cost": edge.reverse_cost,
                "geometry": edge.geometry,
            }
            for edge in self.edges.values()
        ]
class OSRoutingService:
    """Service to build and query routing networks from OS NGD data"""

    def __init__(self, api_client):
        self.api_client = api_client
        self.network = InMemoryRoutingNetwork()
        # Raw restriction features fetched by the last build (when requested).
        self.raw_restrictions: List[Dict[str, Any]] = []

    @staticmethod
    def _not_built_error() -> Dict[str, Any]:
        """Fresh error payload for queries made before the network is built."""
        return {
            "status": "error",
            "error": "Routing network not built. Call build_routing_network first.",
        }

    async def _fetch_restriction_data(
        self,
        bbox: Optional[str] = None,
        limit: int = 100,
    ) -> List[Dict[str, Any]]:
        """Fetch raw restriction data.

        Args:
            bbox: Optional bounding box filter, passed through to the API.
            limit: Requested feature count, capped at 100 per request.

        Returns:
            Restriction features, or [] on any error (best-effort).
        """
        try:
            logger.debug("Fetching restriction data...")
            params = {
                "limit": min(limit, 100),
                "crs": "http://www.opengis.net/def/crs/EPSG/0/4326",
            }
            if bbox:
                params["bbox"] = bbox
            restriction_data = await self.api_client.make_request(
                "COLLECTION_FEATURES",
                params=params,
                path_params=["trn-rami-restriction-1"],
            )
            features = restriction_data.get("features", [])
            logger.debug(f"Fetched {len(features)} restriction features")
            return features
        except Exception as e:
            logger.error(f"Error fetching restriction data: {e}")
            return []

    async def build_routing_network(
        self,
        bbox: Optional[str] = None,
        limit: int = 1000,
        include_restrictions: bool = True,
    ) -> Dict[str, Any]:
        """Build the routing network from OS NGD road links with optional restriction data.

        Args:
            bbox: Optional bounding box filter.
            limit: Requested feature count (each API call caps this at 100).
            include_restrictions: Also fetch trn-rami-restriction-1 features.

        Returns:
            A status dict with the network summary on success, or an error.
        """
        try:
            logger.debug("Building routing network from OS NGD data...")
            if include_restrictions:
                self.raw_restrictions = await self._fetch_restriction_data(bbox, limit)
            params = {
                "limit": min(limit, 100),
                "crs": "http://www.opengis.net/def/crs/EPSG/0/4326",
            }
            if bbox:
                params["bbox"] = bbox
            road_links_data = await self.api_client.make_request(
                "COLLECTION_FEATURES",
                params=params,
                path_params=["trn-ntwk-roadlink-4"],
            )
            features = road_links_data.get("features", [])
            logger.debug(f"Processing {len(features)} road links...")
            for feature in features:
                self.network.add_edge(feature)
            self.network.is_built = True
            summary = self.network.get_summary()
            logger.debug(
                f"Network built: {summary['total_nodes']} nodes, {summary['total_edges']} edges"
            )
            return {
                "status": "success",
                "message": f"Built routing network with {summary['total_nodes']} nodes and {summary['total_edges']} edges",
                "network_summary": summary,
                "restrictions": self.raw_restrictions if include_restrictions else [],
                "restriction_count": len(self.raw_restrictions)
                if include_restrictions
                else 0,
            }
        except Exception as e:
            logger.error(f"Error building routing network: {e}")
            return {"status": "error", "error": str(e)}

    def get_network_info(self) -> Dict[str, Any]:
        """Get current network information"""
        return {"status": "success", "network": self.network.get_summary()}

    def get_flat_nodes(self) -> Dict[str, Any]:
        """Get flat list of all nodes"""
        if not self.network.is_built:
            return self._not_built_error()
        return {"status": "success", "nodes": self.network.get_all_nodes()}

    def get_flat_edges(self) -> Dict[str, Any]:
        """Get flat list of all edges with connections"""
        if not self.network.is_built:
            return self._not_built_error()
        return {"status": "success", "edges": self.network.get_all_edges()}

    def get_routing_tables(self) -> Dict[str, Any]:
        """Get both nodes and edges as flat tables"""
        if not self.network.is_built:
            return self._not_built_error()
        return {
            "status": "success",
            "nodes": self.network.get_all_nodes(),
            "edges": self.network.get_all_edges(),
            "summary": self.network.get_summary(),
            "restrictions": self.raw_restrictions,
        }
```
--------------------------------------------------------------------------------
/src/api_service/os_api.py:
--------------------------------------------------------------------------------
```python
import os
import aiohttp
import asyncio
import re
import concurrent.futures
import threading
from typing import Dict, List, Any, Optional
from models import (
NGDAPIEndpoint,
OpenAPISpecification,
Collection,
CollectionsCache,
CollectionQueryables,
)
from api_service.protocols import APIClient
from utils.logging_config import get_logger
logger = get_logger(__name__)
class OSAPIClient(APIClient):
    """Implementation of an OS NGD API client.

    Wraps the Ordnance Survey NGD OGC API - Features service with:

    * bounded request concurrency (semaphore) plus configurable rate limiting,
    * retry-with-backoff on transient network errors,
    * API-key sanitisation of URLs, error text and response link fields,
    * in-memory caching of the OpenAPI spec and the (version-filtered)
      collections list.
    """

    # Sent on every request so the API operator can identify this client.
    user_agent = "os-ngd-mcp-server/1.0"

    def __init__(self, api_key: Optional[str] = None, max_concurrent_requests: int = 5):
        """
        Initialise the OS API client

        Args:
            api_key: Optional API key, if not provided will use OS_API_KEY env var
            max_concurrent_requests: Maximum number of concurrent requests (default: 5)
        """
        self.api_key = api_key
        self.session: Optional[aiohttp.ClientSession] = None
        # Monotonic loop-time of the most recent request; drives rate limiting.
        self.last_request_time = 0.0
        # Configurable rate limiting - reduced from 0.7s to 0.3s for better performance.
        # Can be adjusted via the OS_API_RATE_LIMIT env var based on API quota.
        self.request_delay = float(os.environ.get("OS_API_RATE_LIMIT", "0.3"))
        self._request_semaphore = asyncio.Semaphore(max_concurrent_requests)
        self._cached_openapi_spec: Optional[OpenAPISpecification] = None
        self._cached_collections: Optional[CollectionsCache] = None

    # Private helper methods
    def _sanitise_api_key(self, text: Any) -> str:
        """Remove API keys from any text (URLs, error messages, etc.).

        Non-string input is returned unchanged.
        """
        if not isinstance(text, str):
            return text
        # Query-string credential parameters to strip, in any casing.
        patterns = [
            r"[?&]key=[^&\s]*",
            r"[?&]api_key=[^&\s]*",
            r"[?&]apikey=[^&\s]*",
            r"[?&]token=[^&\s]*",
        ]
        sanitized = text
        for pattern in patterns:
            sanitized = re.sub(pattern, "", sanitized, flags=re.IGNORECASE)
        # Tidy up separators left behind by the removals above.
        sanitized = re.sub(r"[?&]$", "", sanitized)
        sanitized = re.sub(r"&{2,}", "&", sanitized)
        sanitized = re.sub(r"\?&", "?", sanitized)
        return sanitized

    def _sanitise_response(self, data: Any) -> Any:
        """Remove API keys from response data recursively.

        URL-bearing dict values (href/url/link/uri keys) and any string that
        looks like a URL or credential parameter are scrubbed; containers are
        walked depth-first. Everything else passes through unchanged.
        """
        if isinstance(data, dict):
            sanitized_dict = {}
            for key, value in data.items():
                if isinstance(value, str) and any(
                    url_indicator in key.lower()
                    for url_indicator in ["href", "url", "link", "uri"]
                ):
                    sanitized_dict[key] = self._sanitise_api_key(value)
                elif isinstance(value, (dict, list)):
                    sanitized_dict[key] = self._sanitise_response(value)
                else:
                    sanitized_dict[key] = value
            return sanitized_dict
        elif isinstance(data, list):
            return [self._sanitise_response(item) for item in data]
        elif isinstance(data, str):
            if any(
                indicator in data
                for indicator in [
                    "http://",
                    "https://",
                    "key=",
                    "api_key=",
                    "apikey=",
                    "token=",
                ]
            ):
                return self._sanitise_api_key(data)
        return data

    def _filter_latest_collections(
        self, collections: List[Dict[str, Any]]
    ) -> List[Collection]:
        """
        Filter collections to keep only the latest version of each collection type.
        For collections with IDs like 'trn-ntwk-roadlink-1', 'trn-ntwk-roadlink-2', 'trn-ntwk-roadlink-3',
        only keep the one with the highest number.

        Args:
            collections: Raw collections from API
        Returns:
            Filtered list of Collection objects
        """
        latest_versions: Dict[str, Dict[str, Any]] = {}
        for col in collections:
            col_id = col.get("id", "")
            # Split "<base>-<version>" ids; non-matching ids are kept as-is.
            match = re.match(r"^(.+?)-(\d+)$", col_id)
            if match:
                base_name = match.group(1)
                version_num = int(match.group(2))
                if (
                    base_name not in latest_versions
                    or version_num > latest_versions[base_name]["version"]
                ):
                    latest_versions[base_name] = {"version": version_num, "data": col}
            else:
                # NOTE(review): an unversioned id equal to a versioned base name
                # (e.g. "abc" alongside "abc-2") overwrites the versioned entry
                # here, order-dependently — confirm this cannot occur upstream.
                latest_versions[col_id] = {"version": 0, "data": col}
        filtered_collections = []
        for item in latest_versions.values():
            col_data = item["data"]
            filtered_collections.append(
                Collection(
                    id=col_data.get("id", ""),
                    title=col_data.get("title", ""),
                    description=col_data.get("description", ""),
                    links=col_data.get("links", []),
                    extent=col_data.get("extent", {}),
                    itemType=col_data.get("itemType", "feature"),
                )
            )
        return filtered_collections

    def _parse_openapi_spec_for_llm(
        self, spec_data: dict, collection_ids: List[str]
    ) -> dict:
        """Parse OpenAPI spec to extract only essential information for LLM context.

        Pulls out title/version/base URL, supported CRS values discovered on
        GET parameters, a fixed endpoint-pattern summary, and a short CRS guide.
        """
        supported_crs = {
            "input": [],
            "output": [],
            "default": "http://www.opengis.net/def/crs/OGC/1.3/CRS84",
        }
        parsed = {
            "title": spec_data.get("info", {}).get("title", ""),
            "version": spec_data.get("info", {}).get("version", ""),
            "base_url": spec_data.get("servers", [{}])[0].get("url", ""),
            "endpoints": {},
            "collection_ids": collection_ids,
            "supported_crs": supported_crs,
        }
        paths = spec_data.get("paths", {})
        for path, methods in paths.items():
            for method, details in methods.items():
                if method == "get" and "parameters" in details:
                    for param in details["parameters"]:
                        param_name = param.get("name", "")
                        if param_name == "collectionId" and "schema" in param:
                            enum_values = param["schema"].get("enum", [])
                            if enum_values:
                                parsed["collection_ids"] = enum_values
                        elif (
                            param_name in ["bbox-crs", "filter-crs"]
                            and "schema" in param
                        ):
                            crs_values = param["schema"].get("enum", [])
                            # First non-empty enum wins for input CRS.
                            if crs_values and not supported_crs["input"]:
                                supported_crs["input"] = crs_values
                        elif param_name == "crs" and "schema" in param:
                            crs_values = param["schema"].get("enum", [])
                            if crs_values and not supported_crs["output"]:
                                supported_crs["output"] = crs_values
        # Endpoint summary is intentionally fixed rather than derived from the spec.
        endpoint_patterns = {
            "/collections": "List all collections",
            "/collections/{collectionId}": "Get collection info",
            "/collections/{collectionId}/schema": "Get collection schema",
            "/collections/{collectionId}/queryables": "Get collection queryables",
            "/collections/{collectionId}/items": "Search features in collection",
            "/collections/{collectionId}/items/{featureId}": "Get specific feature",
        }
        parsed["endpoints"] = endpoint_patterns
        parsed["crs_guide"] = {
            "WGS84": "http://www.opengis.net/def/crs/OGC/1.3/CRS84 (default, longitude/latitude)",
            "British_National_Grid": "http://www.opengis.net/def/crs/EPSG/0/27700 (UK Ordnance Survey)",
            "WGS84_latlon": "http://www.opengis.net/def/crs/EPSG/0/4326 (latitude/longitude)",
            "Web_Mercator": "http://www.opengis.net/def/crs/EPSG/0/3857 (Web mapping)",
        }
        return parsed

    # Private async methods
    async def _get_open_api_spec(self) -> OpenAPISpecification:
        """Get the OpenAPI spec for the OS NGD API.

        Raises:
            ValueError: if the spec cannot be fetched or parsed.
        """
        try:
            response = await self.make_request("OPENAPI_SPEC", params={"f": "json"})
            # Sanitize the raw response before processing
            sanitized_response = self._sanitise_response(response)
            collections_cache = await self.cache_collections()
            filtered_collection_ids = [col.id for col in collections_cache.collections]
            parsed_spec = self._parse_openapi_spec_for_llm(
                sanitized_response, filtered_collection_ids
            )
            spec = OpenAPISpecification(
                title=parsed_spec["title"],
                version=parsed_spec["version"],
                base_url=parsed_spec["base_url"],
                endpoints=parsed_spec["endpoints"],
                collection_ids=filtered_collection_ids,
                supported_crs=parsed_spec["supported_crs"],
                crs_guide=parsed_spec["crs_guide"],
            )
            return spec
        except Exception as e:
            raise ValueError(f"Failed to get OpenAPI spec: {e}")

    async def cache_openapi_spec(self) -> OpenAPISpecification:
        """
        Cache the OpenAPI spec.

        Returns:
            The cached OpenAPI spec
        """
        if self._cached_openapi_spec is None:
            logger.debug("Caching OpenAPI spec for LLM context...")
            try:
                self._cached_openapi_spec = await self._get_open_api_spec()
                logger.debug("OpenAPI spec successfully cached")
            except Exception as e:
                raise ValueError(f"Failed to cache OpenAPI spec: {e}")
        return self._cached_openapi_spec

    async def _get_collections(self) -> CollectionsCache:
        """Get all collections from the OS NGD API, filtered to latest versions.

        Raises:
            ValueError: with a key-sanitised message on any failure.
        """
        try:
            response = await self.make_request("COLLECTIONS")
            collections_list = response.get("collections", [])
            filtered = self._filter_latest_collections(collections_list)
            logger.debug(f"Filtered collections: {len(filtered)} collections")
            return CollectionsCache(collections=filtered, raw_response=response)
        except Exception as e:
            sanitized_error = self._sanitise_api_key(str(e))
            logger.error(f"Error getting collections: {sanitized_error}")
            raise ValueError(f"Failed to get collections: {sanitized_error}")

    async def cache_collections(self) -> CollectionsCache:
        """
        Cache the collections data with filtering applied.

        Returns:
            The cached collections
        """
        if self._cached_collections is None:
            logger.debug("Caching collections for LLM context...")
            try:
                self._cached_collections = await self._get_collections()
                logger.debug(
                    f"Collections successfully cached - {len(self._cached_collections.collections)} collections after filtering"
                )
            except Exception as e:
                sanitized_error = self._sanitise_api_key(str(e))
                raise ValueError(f"Failed to cache collections: {sanitized_error}")
        return self._cached_collections

    async def fetch_collections_queryables(
        self, collection_ids: List[str]
    ) -> Dict[str, CollectionQueryables]:
        """Fetch detailed queryables for specific collections only.

        Unknown collection ids are skipped. Requests are issued concurrently
        and per-collection failures degrade to an empty queryables record
        rather than failing the whole call.
        """
        if not collection_ids:
            return {}
        logger.debug(f"Fetching queryables for specific collections: {collection_ids}")
        collections_cache = await self.cache_collections()
        collections_map = {coll.id: coll for coll in collections_cache.collections}
        # Filter ONCE and keep the order: the same list drives both the task
        # fan-out and the result pairing below. (Previously the unfiltered
        # collection_ids were zipped against the filtered results, which
        # misaligned pairs and silently dropped trailing collections whenever
        # an unknown id was requested.)
        valid_ids = [cid for cid in collection_ids if cid in collections_map]
        if not valid_ids:
            return {}
        tasks = [
            self.make_request("COLLECTION_QUERYABLES", path_params=[collection_id])
            for collection_id in valid_ids
        ]
        raw_queryables = await asyncio.gather(*tasks, return_exceptions=True)

        def process_single_collection_queryables(collection_id, queryables_data):
            # Runs on a worker thread; touches only local data and the logger.
            collection = collections_map[collection_id]
            logger.debug(
                f"Processing collection {collection.id} in thread {threading.current_thread().name}"
            )
            if isinstance(queryables_data, Exception):
                logger.warning(
                    f"Failed to fetch queryables for {collection.id}: {queryables_data}"
                )
                # Degrade gracefully: empty queryables record for this collection.
                return (
                    collection.id,
                    CollectionQueryables(
                        id=collection.id,
                        title=collection.title,
                        description=collection.description,
                        all_queryables={},
                        enum_queryables={},
                        has_enum_filters=False,
                        total_queryables=0,
                        enum_count=0,
                    ),
                )
            all_queryables = {}
            enum_queryables = {}
            properties = queryables_data.get("properties", {})
            for prop_name, prop_details in properties.items():
                # JSON Schema "type" may be a list (e.g. ["string", "null"]).
                prop_type = prop_details.get("type", ["string"])
                if isinstance(prop_type, list):
                    main_type = prop_type[0] if prop_type else "string"
                    is_nullable = "null" in prop_type
                else:
                    main_type = prop_type
                    is_nullable = False
                all_queryables[prop_name] = {
                    "type": main_type,
                    "nullable": is_nullable,
                    "max_length": prop_details.get("maxLength"),
                    "format": prop_details.get("format"),
                    "pattern": prop_details.get("pattern"),
                    "minimum": prop_details.get("minimum"),
                    "maximum": prop_details.get("maximum"),
                    "is_enum": prop_details.get("enumeration", False),
                }
                if prop_details.get("enumeration") and "enum" in prop_details:
                    enum_queryables[prop_name] = {
                        "values": prop_details["enum"],
                        "type": main_type,
                        "nullable": is_nullable,
                        "max_length": prop_details.get("maxLength"),
                    }
                    all_queryables[prop_name]["enum_values"] = prop_details["enum"]
                # Drop keys whose value is None to keep the record compact.
                all_queryables[prop_name] = {
                    k: v for k, v in all_queryables[prop_name].items() if v is not None
                }
            return (
                collection.id,
                CollectionQueryables(
                    id=collection.id,
                    title=collection.title,
                    description=collection.description,
                    all_queryables=all_queryables,
                    enum_queryables=enum_queryables,
                    has_enum_filters=len(enum_queryables) > 0,
                    total_queryables=len(all_queryables),
                    enum_count=len(enum_queryables),
                ),
            )

        # CPU-ish parsing is pushed off the event loop onto a thread pool.
        with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
            collection_data_pairs = list(zip(valid_ids, raw_queryables))
            processed = await asyncio.get_running_loop().run_in_executor(
                executor,
                lambda: [
                    process_single_collection_queryables(coll_id, data)
                    for coll_id, data in collection_data_pairs
                ],
            )
        return {coll_id: queryables for coll_id, queryables in processed}

    async def initialise(self):
        """Initialise the aiohttp session if not already created"""
        if self.session is None:
            self.session = aiohttp.ClientSession(
                connector=aiohttp.TCPConnector(
                    limit=10,
                    limit_per_host=5,
                    ttl_dns_cache=300,
                    force_close=False,
                    enable_cleanup_closed=True,
                )
            )

    async def close(self):
        """Close the session when done and drop cached API metadata."""
        if self.session:
            await self.session.close()
            self.session = None
        self._cached_openapi_spec = None
        self._cached_collections = None

    async def get_api_key(self) -> str:
        """Get the OS API key from environment variable or init param.

        Raises:
            ValueError: if no key was supplied and OS_API_KEY is unset.
        """
        if self.api_key:
            return self.api_key
        api_key = os.environ.get("OS_API_KEY")
        if not api_key:
            raise ValueError("OS_API_KEY environment variable is not set")
        return api_key

    async def make_request(
        self,
        endpoint: str,
        params: Optional[Dict[str, Any]] = None,
        path_params: Optional[List[str]] = None,
        max_retries: int = 2,
    ) -> Dict[str, Any]:
        """
        Make a request to the OS NGD API with proper error handling.

        Args:
            endpoint: Enum endpoint to use
            params: Additional query parameters
            path_params: Parameters to format into the URL path
            max_retries: Maximum number of retries for transient errors
        Returns:
            JSON response as dictionary
        Raises:
            ValueError: on invalid endpoint, HTTP error status, or exhausted retries
        """
        await self.initialise()
        if self.session is None:
            raise ValueError("Session not initialised")
        async with self._request_semaphore:
            # Simple client-side rate limiting between consecutive requests.
            current_time = asyncio.get_running_loop().time()
            elapsed = current_time - self.last_request_time
            if elapsed < self.request_delay:
                await asyncio.sleep(self.request_delay - elapsed)
            try:
                endpoint_value = NGDAPIEndpoint[endpoint].value
            except KeyError:
                raise ValueError(f"Invalid endpoint: {endpoint}")
            if path_params:
                endpoint_value = endpoint_value.format(*path_params)
            api_key = await self.get_api_key()
            # Copy the caller's params: injecting the API key must not mutate
            # the dictionary the caller passed in.
            request_params = dict(params) if params else {}
            request_params["key"] = api_key
            headers = {"User-Agent": self.user_agent, "Accept": "application/json"}
            client_ip = getattr(self.session, "_source_address", None)
            client_info = f" from {client_ip}" if client_ip else ""
            # Never log the key-bearing URL.
            sanitized_url = self._sanitise_api_key(endpoint_value)
            logger.info(f"Requesting URL: {sanitized_url}{client_info}")
            for attempt in range(1, max_retries + 1):
                try:
                    self.last_request_time = asyncio.get_running_loop().time()
                    timeout = aiohttp.ClientTimeout(total=30.0)
                    async with self.session.get(
                        endpoint_value,
                        params=request_params,
                        headers=headers,
                        timeout=timeout,
                    ) as response:
                        if response.status >= 400:
                            error_text = await response.text()
                            sanitized_error = self._sanitise_api_key(error_text)
                            error_message = (
                                f"HTTP Error: {response.status} - {sanitized_error}"
                            )
                            logger.error(f"Error: {error_message}")
                            raise ValueError(error_message)
                        response_data = await response.json()
                        return self._sanitise_response(response_data)
                except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                    # Transient network failure: back off linearly and retry.
                    if attempt == max_retries:
                        sanitized_exception = self._sanitise_api_key(str(e))
                        error_message = f"Request failed after {max_retries} attempts: {sanitized_exception}"
                        logger.error(f"Error: {error_message}")
                        raise ValueError(error_message)
                    else:
                        await asyncio.sleep(0.5 * attempt)
                except Exception as e:
                    # Non-transient failure (includes HTTP errors raised above):
                    # sanitise and surface immediately.
                    sanitized_exception = self._sanitise_api_key(str(e))
                    error_message = f"Request failed: {sanitized_exception}"
                    logger.error(f"Error: {error_message}")
                    raise ValueError(error_message)
            raise RuntimeError(
                "Unreachable: make_request exited retry loop without returning or raising"
            )

    async def make_request_no_auth(
        self,
        url: str,
        params: Optional[Dict[str, Any]] = None,
        max_retries: int = 2,
    ) -> str:
        """
        Make a request without authentication (for public endpoints like documentation).

        Args:
            url: Full URL to request
            params: Additional query parameters
            max_retries: Maximum number of retries for transient errors
        Returns:
            Response text (not JSON parsed)
        Raises:
            ValueError: on HTTP error status or exhausted retries
        """
        await self.initialise()
        if self.session is None:
            raise ValueError("Session not initialised")
        # Same rate limiting as authenticated requests (but no semaphore:
        # these public fetches are infrequent).
        current_time = asyncio.get_running_loop().time()
        elapsed = current_time - self.last_request_time
        if elapsed < self.request_delay:
            await asyncio.sleep(self.request_delay - elapsed)
        request_params = params or {}
        headers = {"User-Agent": self.user_agent}
        logger.info(f"Requesting URL (no auth): {url}")
        for attempt in range(1, max_retries + 1):
            try:
                self.last_request_time = asyncio.get_running_loop().time()
                timeout = aiohttp.ClientTimeout(total=30.0)
                async with self.session.get(
                    url,
                    params=request_params,
                    headers=headers,
                    timeout=timeout,
                ) as response:
                    if response.status >= 400:
                        error_text = await response.text()
                        error_message = f"HTTP Error: {response.status} - {error_text}"
                        logger.error(f"Error: {error_message}")
                        raise ValueError(error_message)
                    return await response.text()
            except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                if attempt == max_retries:
                    error_message = (
                        f"Request failed after {max_retries} attempts: {str(e)}"
                    )
                    logger.error(f"Error: {error_message}")
                    raise ValueError(error_message)
                else:
                    # Linear backoff, consistent with make_request.
                    await asyncio.sleep(0.5 * attempt)
            except Exception as e:
                error_message = f"Request failed: {str(e)}"
                logger.error(f"Error: {error_message}")
                raise ValueError(error_message)
        raise RuntimeError(
            "Unreachable: make_request_no_auth exited retry loop without returning or raising"
        )
```
--------------------------------------------------------------------------------
/src/config_docs/ogcapi-features-1.yaml:
--------------------------------------------------------------------------------
```yaml
openapi: 3.0.2
info:
title: "Building Blocks specified in OGC API - Features - Part 1: Core"
description: |-
Common components used in the
[OGC standard "OGC API - Features - Part 1: Core"](http://docs.opengeospatial.org/is/17-069r3/17-069r3.html).
OGC API - Features - Part 1: Core 1.0 is an OGC Standard.
Copyright (c) 2019 Open Geospatial Consortium.
To obtain additional rights of use, visit http://www.opengeospatial.org/legal/ .
This document is also available on
[OGC](http://schemas.opengis.net/ogcapi/features/part1/1.0/openapi/ogcapi-features-1.yaml).
version: '1.0.0'
contact:
name: Clemens Portele
email: [email protected]
license:
name: OGC License
url: 'http://www.opengeospatial.org/legal/'
components:
parameters:
bbox:
name: bbox
in: query
description: |-
Only features that have a geometry that intersects the bounding box are selected.
The bounding box is provided as four or six numbers, depending on whether the
coordinate reference system includes a vertical axis (height or depth):
* Lower left corner, coordinate axis 1
* Lower left corner, coordinate axis 2
* Minimum value, coordinate axis 3 (optional)
* Upper right corner, coordinate axis 1
* Upper right corner, coordinate axis 2
* Maximum value, coordinate axis 3 (optional)
If the value consists of four numbers, the coordinate reference system is
WGS 84 longitude/latitude (http://www.opengis.net/def/crs/OGC/1.3/CRS84)
unless a different coordinate reference system is specified in the parameter `bbox-crs`.
If the value consists of six numbers, the coordinate reference system is WGS 84
longitude/latitude/ellipsoidal height (http://www.opengis.net/def/crs/OGC/0/CRS84h)
unless a different coordinate reference system is specified in the parameter `bbox-crs`.
The query parameter `bbox-crs` is specified in OGC API - Features - Part 2: Coordinate
Reference Systems by Reference.
For WGS 84 longitude/latitude the values are in most cases the sequence of
minimum longitude, minimum latitude, maximum longitude and maximum latitude.
However, in cases where the box spans the antimeridian the first value
(west-most box edge) is larger than the third value (east-most box edge).
If the vertical axis is included, the third and the sixth number are
the bottom and the top of the 3-dimensional bounding box.
If a feature has multiple spatial geometry properties, it is the decision of the
server whether only a single spatial geometry property is used to determine
the extent or all relevant geometries.
required: false
schema:
type: array
oneOf:
- minItems: 4
maxItems: 4
- minItems: 6
maxItems: 6
items:
type: number
style: form
explode: false
collectionId:
name: collectionId
in: path
description: local identifier of a collection
required: true
schema:
type: string
datetime:
name: datetime
in: query
description: |-
Either a date-time or an interval. Date and time expressions adhere to RFC 3339.
Intervals may be bounded or half-bounded (double-dots at start or end).
Examples:
* A date-time: "2018-02-12T23:20:50Z"
* A bounded interval: "2018-02-12T00:00:00Z/2018-03-18T12:31:12Z"
* Half-bounded intervals: "2018-02-12T00:00:00Z/.." or "../2018-03-18T12:31:12Z"
Only features that have a temporal property that intersects the value of
`datetime` are selected.
If a feature has multiple temporal properties, it is the decision of the
server whether only a single temporal property is used to determine
the extent or all relevant temporal properties.
required: false
schema:
type: string
style: form
explode: false
featureId:
name: featureId
in: path
description: local identifier of a feature
required: true
schema:
type: string
limit:
name: limit
in: query
description: |-
The optional limit parameter limits the number of items that are presented in the response document.
Only items are counted that are on the first level of the collection in the response document.
Nested objects contained within the explicitly requested items shall not be counted.
Minimum = 1. Maximum = 10000. Default = 10.
required: false
schema:
type: integer
minimum: 1
maximum: 10000
default: 10
style: form
explode: false
schemas:
collection:
type: object
required:
- id
- links
properties:
id:
description: identifier of the collection used, for example, in URIs
type: string
example: address
title:
description: human readable title of the collection
type: string
example: address
description:
description: a description of the features in the collection
type: string
example: An address.
links:
type: array
items:
$ref: "#/components/schemas/link"
example:
- href: http://data.example.com/buildings
rel: item
- href: http://example.com/concepts/buildings.html
rel: describedby
type: text/html
extent:
$ref: "#/components/schemas/extent"
itemType:
description: indicator about the type of the items in the collection (the default value is 'feature').
type: string
default: feature
crs:
description: the list of coordinate reference systems supported by the service
type: array
items:
type: string
default:
- http://www.opengis.net/def/crs/OGC/1.3/CRS84
example:
- http://www.opengis.net/def/crs/OGC/1.3/CRS84
- http://www.opengis.net/def/crs/EPSG/0/4326
collections:
type: object
required:
- links
- collections
properties:
links:
type: array
items:
$ref: "#/components/schemas/link"
collections:
type: array
items:
$ref: "#/components/schemas/collection"
confClasses:
type: object
required:
- conformsTo
properties:
conformsTo:
type: array
items:
type: string
exception:
type: object
description: |-
Information about the exception: an error code plus an optional description.
required:
- code
properties:
code:
type: string
description:
type: string
extent:
type: object
description: |-
The extent of the features in the collection. In the Core only spatial and temporal
extents are specified. Extensions may add additional members to represent other
extents, for example, thermal or pressure ranges.
properties:
spatial:
description: |-
The spatial extent of the features in the collection.
type: object
properties:
bbox:
description: |-
One or more bounding boxes that describe the spatial extent of the dataset.
In the Core only a single bounding box is supported. Extensions may support
additional areas. If multiple areas are provided, the union of the bounding
boxes describes the spatial extent.
type: array
minItems: 1
items:
description: |-
Each bounding box is provided as four or six numbers, depending on
whether the coordinate reference system includes a vertical axis
(height or depth):
* Lower left corner, coordinate axis 1
* Lower left corner, coordinate axis 2
* Minimum value, coordinate axis 3 (optional)
* Upper right corner, coordinate axis 1
* Upper right corner, coordinate axis 2
* Maximum value, coordinate axis 3 (optional)
The coordinate reference system of the values is WGS 84 longitude/latitude
(http://www.opengis.net/def/crs/OGC/1.3/CRS84) unless a different coordinate
reference system is specified in `crs`.
For WGS 84 longitude/latitude the values are in most cases the sequence of
minimum longitude, minimum latitude, maximum longitude and maximum latitude.
However, in cases where the box spans the antimeridian the first value
(west-most box edge) is larger than the third value (east-most box edge).
If the vertical axis is included, the third and the sixth number are
the bottom and the top of the 3-dimensional bounding box.
If a feature has multiple spatial geometry properties, it is the decision of the
server whether only a single spatial geometry property is used to determine
the extent or all relevant geometries.
type: array
oneOf:
- minItems: 4
maxItems: 4
- minItems: 6
maxItems: 6
items:
type: number
example:
- -180
- -90
- 180
- 90
crs:
description: |-
Coordinate reference system of the coordinates in the spatial extent
(property `bbox`). The default reference system is WGS 84 longitude/latitude.
In the Core this is the only supported coordinate reference system.
Extensions may support additional coordinate reference systems and add
additional enum values.
type: string
enum:
- 'http://www.opengis.net/def/crs/OGC/1.3/CRS84'
default: 'http://www.opengis.net/def/crs/OGC/1.3/CRS84'
temporal:
description: |-
The temporal extent of the features in the collection.
type: object
properties:
interval:
description: |-
One or more time intervals that describe the temporal extent of the dataset.
The value `null` is supported and indicates an unbounded interval end.
In the Core only a single time interval is supported. Extensions may support
multiple intervals. If multiple intervals are provided, the union of the
intervals describes the temporal extent.
type: array
minItems: 1
items:
description: |-
Begin and end times of the time interval. The timestamps are in the
temporal coordinate reference system specified in `trs`. By default
this is the Gregorian calendar.
type: array
minItems: 2
maxItems: 2
items:
type: string
format: date-time
nullable: true
example:
- '2011-11-11T12:22:11Z'
- null
trs:
description: |-
Coordinate reference system of the coordinates in the temporal extent
(property `interval`). The default reference system is the Gregorian calendar.
In the Core this is the only supported temporal coordinate reference system.
Extensions may support additional temporal coordinate reference systems and add
additional enum values.
type: string
enum:
- 'http://www.opengis.net/def/uom/ISO-8601/0/Gregorian'
default: 'http://www.opengis.net/def/uom/ISO-8601/0/Gregorian'
featureCollectionGeoJSON:
type: object
required:
- type
- features
properties:
type:
type: string
enum:
- FeatureCollection
features:
type: array
items:
$ref: "#/components/schemas/featureGeoJSON"
links:
type: array
items:
$ref: "#/components/schemas/link"
timeStamp:
$ref: "#/components/schemas/timeStamp"
numberMatched:
$ref: "#/components/schemas/numberMatched"
numberReturned:
$ref: "#/components/schemas/numberReturned"
featureGeoJSON:
type: object
required:
- type
- geometry
- properties
properties:
type:
type: string
enum:
- Feature
geometry:
$ref: "#/components/schemas/geometryGeoJSON"
properties:
type: object
nullable: true
id:
oneOf:
- type: string
- type: integer
links:
type: array
items:
$ref: "#/components/schemas/link"
geometryGeoJSON:
oneOf:
- $ref: "#/components/schemas/pointGeoJSON"
- $ref: "#/components/schemas/multipointGeoJSON"
- $ref: "#/components/schemas/linestringGeoJSON"
- $ref: "#/components/schemas/multilinestringGeoJSON"
- $ref: "#/components/schemas/polygonGeoJSON"
- $ref: "#/components/schemas/multipolygonGeoJSON"
- $ref: "#/components/schemas/geometrycollectionGeoJSON"
geometrycollectionGeoJSON:
type: object
required:
- type
- geometries
properties:
type:
type: string
enum:
- GeometryCollection
geometries:
type: array
items:
$ref: "#/components/schemas/geometryGeoJSON"
landingPage:
type: object
required:
- links
properties:
title:
type: string
example: Buildings in Bonn
description:
type: string
example: Access to data about buildings in the city of Bonn via a Web API that conforms to the OGC API Features specification.
links:
type: array
items:
$ref: "#/components/schemas/link"
linestringGeoJSON:
type: object
required:
- type
- coordinates
properties:
type:
type: string
enum:
- LineString
coordinates:
type: array
minItems: 2
items:
type: array
minItems: 2
items:
type: number
link:
type: object
required:
- href
properties:
href:
type: string
example: http://data.example.com/buildings/123
rel:
type: string
example: alternate
type:
type: string
example: application/geo+json
hreflang:
type: string
example: en
title:
type: string
example: Trierer Strasse 70, 53115 Bonn
length:
type: integer
multilinestringGeoJSON:
type: object
required:
- type
- coordinates
properties:
type:
type: string
enum:
- MultiLineString
coordinates:
type: array
items:
type: array
minItems: 2
items:
type: array
minItems: 2
items:
type: number
multipointGeoJSON:
type: object
required:
- type
- coordinates
properties:
type:
type: string
enum:
- MultiPoint
coordinates:
type: array
items:
type: array
minItems: 2
items:
type: number
multipolygonGeoJSON:
type: object
required:
- type
- coordinates
properties:
type:
type: string
enum:
- MultiPolygon
coordinates:
type: array
items:
type: array
items:
type: array
minItems: 4
items:
type: array
minItems: 2
items:
type: number
numberMatched:
description: |-
The number of features of the feature type that match the selection
parameters like `bbox`.
type: integer
minimum: 0
example: 127
numberReturned:
description: |-
The number of features in the feature collection.
A server may omit this information in a response, if the information
about the number of features is not known or difficult to compute.
If the value is provided, the value shall be identical to the number
of items in the "features" array.
type: integer
minimum: 0
example: 10
pointGeoJSON:
type: object
required:
- type
- coordinates
properties:
type:
type: string
enum:
- Point
coordinates:
type: array
minItems: 2
items:
type: number
polygonGeoJSON:
type: object
required:
- type
- coordinates
properties:
type:
type: string
enum:
- Polygon
coordinates:
type: array
items:
type: array
minItems: 4
items:
type: array
minItems: 2
items:
type: number
timeStamp:
description: This property indicates the time and date when the response was generated.
type: string
format: date-time
example: '2017-08-17T08:05:32Z'
responses:
LandingPage:
description: |-
The landing page provides links to the API definition
(link relations `service-desc` and `service-doc`),
the Conformance declaration (path `/conformance`,
link relation `conformance`), and the Feature
Collections (path `/collections`, link relation
`data`).
content:
application/json:
schema:
$ref: '#/components/schemas/landingPage'
example:
title: Buildings in Bonn
description: Access to data about buildings in the city of Bonn via a Web API that conforms to the OGC API Features specification.
links:
- href: 'http://data.example.org/'
rel: self
type: application/json
title: this document
- href: 'http://data.example.org/api'
rel: service-desc
type: application/vnd.oai.openapi+json;version=3.0
title: the API definition
- href: 'http://data.example.org/api.html'
rel: service-doc
type: text/html
title: the API documentation
- href: 'http://data.example.org/conformance'
rel: conformance
type: application/json
title: OGC API conformance classes implemented by this server
- href: 'http://data.example.org/collections'
rel: data
type: application/json
title: Information about the feature collections
text/html:
schema:
type: string
ConformanceDeclaration:
description: |-
The URIs of all conformance classes supported by the server.
To support "generic" clients that want to access multiple
OGC API Features implementations - and not "just" a specific
API / server, the server declares the conformance
classes it implements and conforms to.
content:
application/json:
schema:
$ref: '#/components/schemas/confClasses'
example:
conformsTo:
- 'http://www.opengis.net/spec/ogcapi-features-1/1.0/conf/core'
- 'http://www.opengis.net/spec/ogcapi-features-1/1.0/conf/oas30'
- 'http://www.opengis.net/spec/ogcapi-features-1/1.0/conf/html'
- 'http://www.opengis.net/spec/ogcapi-features-1/1.0/conf/geojson'
text/html:
schema:
type: string
Collections:
description: |-
The feature collections shared by this API.
The dataset is organized as one or more feature collections. This resource
provides information about and access to the collections.
The response contains the list of collections. For each collection, a link
to the items in the collection (path `/collections/{collectionId}/items`,
link relation `items`) as well as key information about the collection.
This information includes:
* A local identifier for the collection that is unique for the dataset;
* A list of coordinate reference systems (CRS) in which geometries may be returned by the server. The first CRS is the default coordinate reference system (the default is always WGS 84 with axis order longitude/latitude);
* An optional title and description for the collection;
* An optional extent that can be used to provide an indication of the spatial and temporal extent of the collection - typically derived from the data;
* An optional indicator about the type of the items in the collection (the default value, if the indicator is not provided, is 'feature').
content:
application/json:
schema:
$ref: '#/components/schemas/collections'
example:
links:
- href: 'http://data.example.org/collections.json'
rel: self
type: application/json
title: this document
- href: 'http://data.example.org/collections.html'
rel: alternate
type: text/html
title: this document as HTML
- href: 'http://schemas.example.org/1.0/buildings.xsd'
rel: describedby
type: application/xml
title: GML application schema for Acme Corporation building data
- href: 'http://download.example.org/buildings.gpkg'
rel: enclosure
type: application/geopackage+sqlite3
title: Bulk download (GeoPackage)
length: 472546
collections:
- id: buildings
title: Buildings
description: Buildings in the city of Bonn.
extent:
spatial:
bbox:
- - 7.01
- 50.63
- 7.22
- 50.78
temporal:
interval:
- - '2010-02-15T12:34:56Z'
- null
links:
- href: 'http://data.example.org/collections/buildings/items'
rel: items
type: application/geo+json
title: Buildings
- href: 'http://data.example.org/collections/buildings/items.html'
rel: items
type: text/html
title: Buildings
- href: 'https://creativecommons.org/publicdomain/zero/1.0/'
rel: license
type: text/html
title: CC0-1.0
- href: 'https://creativecommons.org/publicdomain/zero/1.0/rdf'
rel: license
type: application/rdf+xml
title: CC0-1.0
text/html:
schema:
type: string
Collection:
description: |-
Information about the feature collection with id `collectionId`.
The response contains a link to the items in the collection
(path `/collections/{collectionId}/items`, link relation `items`)
as well as key information about the collection. This information
includes:
* A local identifier for the collection that is unique for the dataset;
* A list of coordinate reference systems (CRS) in which geometries may be returned by the server. The first CRS is the default coordinate reference system (the default is always WGS 84 with axis order longitude/latitude);
* An optional title and description for the collection;
* An optional extent that can be used to provide an indication of the spatial and temporal extent of the collection - typically derived from the data;
* An optional indicator about the type of the items in the collection (the default value, if the indicator is not provided, is 'feature').
content:
application/json:
schema:
$ref: '#/components/schemas/collection'
example:
id: buildings
title: Buildings
description: Buildings in the city of Bonn.
extent:
spatial:
bbox:
- - 7.01
- 50.63
- 7.22
- 50.78
temporal:
interval:
- - '2010-02-15T12:34:56Z'
- null
links:
- href: 'http://data.example.org/collections/buildings/items'
rel: items
type: application/geo+json
title: Buildings
- href: 'http://data.example.org/collections/buildings/items.html'
rel: items
type: text/html
title: Buildings
- href: 'https://creativecommons.org/publicdomain/zero/1.0/'
rel: license
type: text/html
title: CC0-1.0
- href: 'https://creativecommons.org/publicdomain/zero/1.0/rdf'
rel: license
type: application/rdf+xml
title: CC0-1.0
text/html:
schema:
type: string
Features:
description: |-
The response is a document consisting of features in the collection.
The features included in the response are determined by the server
based on the query parameters of the request. To support access to
larger collections without overloading the client, the API supports
paged access with links to the next page, if more features are selected
than the page size.
The `bbox` and `datetime` parameters can be used to select only a
subset of the features in the collection (the features that are in the
bounding box or time interval). The `bbox` parameter matches all features
in the collection that are not associated with a location, too. The
`datetime` parameter matches all features in the collection that are
not associated with a time stamp or interval, too.
The `limit` parameter may be used to control the subset of the
selected features that should be returned in the response, the page size.
Each page may include information about the number of selected and
returned features (`numberMatched` and `numberReturned`) as well as
links to support paging (link relation `next`).
content:
application/geo+json:
schema:
$ref: '#/components/schemas/featureCollectionGeoJSON'
example:
type: FeatureCollection
links:
- href: 'http://data.example.com/collections/buildings/items.json'
rel: self
type: application/geo+json
title: this document
- href: 'http://data.example.com/collections/buildings/items.html'
rel: alternate
type: text/html
title: this document as HTML
- href: 'http://data.example.com/collections/buildings/items.json&offset=10&limit=2'
rel: next
type: application/geo+json
title: next page
timeStamp: '2018-04-03T14:52:23Z'
numberMatched: 123
numberReturned: 2
features:
- type: Feature
id: '123'
geometry:
type: Polygon
coordinates:
- ...
properties:
function: residential
floors: '2'
lastUpdate: '2015-08-01T12:34:56Z'
- type: Feature
id: '132'
geometry:
type: Polygon
coordinates:
- ...
properties:
function: public use
floors: '10'
lastUpdate: '2013-12-03T10:15:37Z'
text/html:
schema:
type: string
Feature:
description: |-
fetch the feature with id `featureId` in the feature collection
with id `collectionId`
content:
application/geo+json:
schema:
$ref: '#/components/schemas/featureGeoJSON'
example:
type: Feature
links:
- href: 'http://data.example.com/id/building/123'
rel: canonical
title: canonical URI of the building
- href: 'http://data.example.com/collections/buildings/items/123.json'
rel: self
type: application/geo+json
title: this document
- href: 'http://data.example.com/collections/buildings/items/123.html'
rel: alternate
type: text/html
title: this document as HTML
- href: 'http://data.example.com/collections/buildings'
rel: collection
type: application/geo+json
title: the collection document
id: '123'
geometry:
type: Polygon
coordinates:
- ...
properties:
function: residential
floors: '2'
lastUpdate: '2015-08-01T12:34:56Z'
text/html:
schema:
type: string
InvalidParameter:
description: |-
A query parameter has an invalid value.
content:
application/json:
schema:
$ref: '#/components/schemas/exception'
text/html:
schema:
type: string
NotFound:
description: |-
The requested resource does not exist on the server. For example, a path parameter had an incorrect value.
NotAcceptable:
description: |-
Content negotiation failed. For example, the `Accept` header submitted in the request did not support any of the media types supported by the server for the requested resource.
ServerError:
description: |-
A server error occurred.
content:
application/json:
schema:
$ref: '#/components/schemas/exception'
text/html:
schema:
type: string
```
--------------------------------------------------------------------------------
/src/mcp_service/os_service.py:
--------------------------------------------------------------------------------
```python
import json
import asyncio
import functools
import re
from typing import Optional, List, Dict, Any, Union, Callable
from api_service.protocols import APIClient
from prompt_templates.prompt_templates import PROMPT_TEMPLATES
from mcp_service.protocols import MCPService, FeatureService
from mcp_service.guardrails import ToolGuardrails
from workflow_generator.workflow_planner import WorkflowPlanner
from utils.logging_config import get_logger
from mcp_service.resources import OSDocumentationResources
from mcp_service.prompts import OSWorkflowPrompts
from mcp_service.routing_service import OSRoutingService
logger = get_logger(__name__)
class OSDataHubService(FeatureService):
    """Implementation of the OS NGD API service with MCP.

    Owns the API client, wires guardrails/middleware around every tool, and
    registers all tools, resources, and prompts on the MCP service.
    """
    def __init__(
        self, api_client: APIClient, mcp_service: MCPService, stdio_middleware=None
    ):
        """
        Initialise the OS NGD service

        Args:
            api_client: API client implementation
            mcp_service: MCP service implementation
            stdio_middleware: Optional STDIO middleware for rate limiting
        """
        self.api_client = api_client
        self.mcp = mcp_service
        self.stdio_middleware = stdio_middleware
        # Created lazily by get_workflow_context(); while it is None, tools
        # gated by _require_workflow_context() return a "context required" error.
        self.workflow_planner: Optional[WorkflowPlanner] = None
        self.guardrails = ToolGuardrails()
        self.routing_service = OSRoutingService(api_client)
        # Registration order: tools first (they get middleware applied), then
        # resources and prompts (plain registrations, no middleware).
        self.register_tools()
        self.register_resources()
        self.register_prompts()
# Register all the resources, tools, and prompts
def register_resources(self) -> None:
"""Register all MCP resources"""
doc_resources = OSDocumentationResources(self.mcp, self.api_client)
doc_resources.register_all()
def register_tools(self) -> None:
"""Register all MCP tools with guardrails and middleware"""
def apply_middleware(func: Callable) -> Callable:
wrapped = self.guardrails.basic_guardrails(func)
wrapped = self._require_workflow_context(wrapped)
if self.stdio_middleware:
wrapped = self.stdio_middleware.require_auth_and_rate_limit(wrapped)
return wrapped
# Apply middleware to ALL tools
self.get_workflow_context = self.mcp.tool()(
apply_middleware(self.get_workflow_context)
)
self.hello_world = self.mcp.tool()(apply_middleware(self.hello_world))
self.check_api_key = self.mcp.tool()(apply_middleware(self.check_api_key))
self.list_collections = self.mcp.tool()(apply_middleware(self.list_collections))
self.get_single_collection = self.mcp.tool()(
apply_middleware(self.get_single_collection)
)
self.get_single_collection_queryables = self.mcp.tool()(
apply_middleware(self.get_single_collection_queryables)
)
self.search_features = self.mcp.tool()(apply_middleware(self.search_features))
self.get_feature = self.mcp.tool()(apply_middleware(self.get_feature))
self.get_linked_identifiers = self.mcp.tool()(
apply_middleware(self.get_linked_identifiers)
)
self.get_bulk_features = self.mcp.tool()(
apply_middleware(self.get_bulk_features)
)
self.get_bulk_linked_features = self.mcp.tool()(
apply_middleware(self.get_bulk_linked_features)
)
self.get_prompt_templates = self.mcp.tool()(
apply_middleware(self.get_prompt_templates)
)
self.fetch_detailed_collections = self.mcp.tool()(
apply_middleware(self.fetch_detailed_collections)
)
self.get_routing_data = self.mcp.tool()(apply_middleware(self.get_routing_data))
def register_prompts(self) -> None:
"""Register all MCP prompts"""
workflow_prompts = OSWorkflowPrompts(self.mcp)
workflow_prompts.register_all()
def _strip_verbose(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""
Remove geometry and other verbose fields from features to reduce response size.
Args:
data: GeoJSON FeatureCollection response
Returns:
Same data structure with verbose fields removed
"""
if "features" in data:
for feature in data["features"]:
if "geometry" in feature:
if feature["geometry"]:
feature["geometry"] = {
"type": feature["geometry"].get("type", "unknown"),
"_stripped": True,
"_note": "Geometry removed to reduce context size"
}
if "properties" in feature and isinstance(feature["properties"], dict):
if "sitetoaddressreference" in feature["properties"]:
ref_count = len(feature["properties"]["sitetoaddressreference"]) if isinstance(feature["properties"]["sitetoaddressreference"], list) else 1
feature["properties"]["sitetoaddressreference"] = {
"_stripped": True,
"_count": ref_count,
"_note": "Site to address references removed to reduce context size"
}
return data
def _summarise_features(self, data: Dict[str, Any], max_features: int = 5) -> Dict[str, Any]:
"""
Summarise feature collection to reduce context size.
Args:
data: GeoJSON FeatureCollection response
max_features: Maximum number of full features to include
Returns:
Summarised response with counts and sample features
"""
if "features" not in data:
return data
features = data["features"]
total_count = len(features)
if total_count <= max_features:
return data
summary = {
"type": data.get("type", "FeatureCollection"),
"summary": {
"total_features": total_count,
"showing_samples": max_features,
"note": f"Showing {max_features} of {total_count} features to reduce context size. Use offset/limit for pagination."
},
"features": features[:max_features],
"links": data.get("links", []),
"numberMatched": data.get("numberMatched"),
"numberReturned": max_features,
"_original_numberReturned": data.get("numberReturned"),
}
return summary
    def run(self) -> None:
        """Run the MCP service until it exits, then close the API client.

        Cleanup is best-effort: shutdown failures are logged, never raised, so
        they cannot mask the original exit path of ``mcp.run()``.
        """
        try:
            self.mcp.run()
        finally:
            try:
                try:
                    # If a loop is still running, schedule cleanup on it.
                    # NOTE(review): a task scheduled here may never execute if
                    # that loop stops immediately afterwards — confirm the
                    # transport keeps the loop alive long enough.
                    loop = asyncio.get_running_loop()
                    loop.create_task(self._cleanup())
                except RuntimeError:
                    # No running loop (the common case after a blocking run):
                    # spin up a fresh one just for cleanup.
                    asyncio.run(self._cleanup())
            except Exception as e:
                logger.error(f"Error during cleanup: {e}")
async def _cleanup(self):
"""Async cleanup method"""
try:
if hasattr(self, "api_client") and self.api_client:
await self.api_client.close()
logger.debug("API client closed successfully")
except Exception as e:
logger.error(f"Error closing API client: {e}")
# Get the workflow context from the cached API client data
# TODO: Lots of work to do here to reduce the size of the context and make it more readable for the LLM but not sacrificing the information
    async def get_workflow_context(self) -> str:
        """Get basic workflow context - no detailed queryables yet.

        Lazily builds the WorkflowPlanner from the cached collection list and
        OpenAPI spec on first call, then returns a JSON briefing for the LLM:
        which collections exist, and the mandatory two-step planning workflow
        (plan, then fetch_detailed_collections before search_features).
        Queryables are deliberately omitted here to keep the context small.

        Returns:
            JSON string with the planning briefing, or an error payload that
            tells the LLM to proceed with available tools.
        """
        try:
            if self.workflow_planner is None:
                collections_cache = await self.api_client.cache_collections()
                # Only id/title/description per collection — detailed
                # queryables are fetched on demand via fetch_detailed_collections().
                basic_collections_info = {
                    coll.id: {
                        "id": coll.id,
                        "title": coll.title,
                        "description": coll.description,
                        # No queryables here - will be fetched on-demand
                    }
                    for coll in collections_cache.collections
                }
                self.workflow_planner = WorkflowPlanner(
                    await self.api_client.cache_openapi_spec(), basic_collections_info
                )
            context = self.workflow_planner.get_basic_context()
            return json.dumps(
                {
                    "CRITICAL_COLLECTION_LIST": sorted(
                        context["available_collections"].keys()
                    ),
                    "MANDATORY_PLANNING_REQUIREMENT": {
                        "CRITICAL": "You MUST follow the 2-step planning process:",
                        "step_1": "Explain your complete plan listing which specific collections you will use and why",
                        "step_2": "Call fetch_detailed_collections('collection-id-1,collection-id-2') to get queryables for those collections BEFORE making search calls",
                        "required_explanation": {
                            "1": "Which collections you will use and why",
                            "2": "What you expect to find in those collections",
                            "3": "What your search strategy will be",
                        },
                        "workflow_enforcement": "Do not proceed with search_features until you have fetched detailed queryables",
                        "example_planning": "I will use 'lus-fts-site-1' for finding cinemas. Let me fetch its detailed queryables first...",
                    },
                    "available_collections": context[
                        "available_collections"
                    ],  # Basic info only - no queryables yet - this is to reduce the size of the context for the LLM
                    "openapi_spec": context["openapi_spec"].model_dump()
                    if context["openapi_spec"]
                    else None,
                    "TWO_STEP_WORKFLOW": {
                        "step_1": "Plan with basic collection info (no detailed queryables available yet)",
                        "step_2": "Use fetch_detailed_collections() to get queryables for your chosen collections",
                        "step_3": "Execute search_features with proper filters using the fetched queryables",
                    },
                    "AVAILABLE_TOOLS": {
                        "fetch_detailed_collections": "Get detailed queryables for specific collections: fetch_detailed_collections('lus-fts-site-1,trn-ntwk-street-1')",
                        "search_features": "Search features (requires detailed queryables first)",
                    },
                    "QUICK_FILTERING_GUIDE": {
                        "primary_tool": "search_features",
                        "key_parameter": "filter",
                        "enum_fields": "Use exact values from collection's enum_queryables (fetch these first!)",
                        "simple_fields": "Use direct values (e.g., usrn = 12345678)",
                    },
                    "COMMON_EXAMPLES": {
                        "workflow_example": "1) Explain plan → 2) fetch_detailed_collections('lus-fts-site-1') → 3) search_features with proper filter",
                        "cinema_search": "After fetching queryables: search_features(collection_id='lus-fts-site-1', filter=\"oslandusetertiarygroup = 'Cinema'\")",
                    },
                    "CRITICAL_RULES": {
                        "1": "ALWAYS explain your plan first",
                        "2": "ALWAYS call fetch_detailed_collections() before search_features",
                        "3": "Use exact enum values from the fetched enum_queryables",
                        "4": "Quote string values in single quotes",
                    },
                }
            )
        except Exception as e:
            logger.error(f"Error getting workflow context: {e}")
            # Fail open: report the error but let the LLM keep using tools.
            return json.dumps(
                {"error": str(e), "instruction": "Proceed with available tools"}
            )
def _require_workflow_context(self, func: Callable) -> Callable:
# Functions that don't need workflow context
skip_functions = {"get_workflow_context", "hello_world", "check_api_key"}
@functools.wraps(func)
async def wrapper(*args: Any, **kwargs: Any) -> Any:
if self.workflow_planner is None:
if func.__name__ in skip_functions:
return await func(*args, **kwargs)
else:
return json.dumps(
{
"error": "WORKFLOW CONTEXT REQUIRED",
"blocked_tool": func.__name__,
"required_action": "You must call 'get_workflow_context' first",
"message": "No tools are available until you get the workflow context. Please call get_workflow_context() now.",
}
)
return await func(*args, **kwargs)
return wrapper
# TODO: This is a bit of a hack - we need to improve the error handling and retry logic
    # TODO: Could we actually spawn a separate AI agent to handle the retry logic and return the result to the main agent?
def _add_retry_context(self, response_data: dict, tool_name: str) -> dict:
"""Add retry guidance to tool responses"""
if "error" in response_data:
response_data["retry_guidance"] = {
"tool": tool_name,
"MANDATORY_INSTRUCTION 1": "Review the error message and try again with corrected parameters",
"MANDATORY_INSTRUCTION 2": "YOU MUST call get_workflow_context() if you need to see available options again",
}
return response_data
# All the tools
async def hello_world(self, name: str) -> str:
"""Simple hello world tool for testing"""
return f"Hello, {name}! 👋"
async def check_api_key(self) -> str:
"""Check if the OS API key is available."""
try:
await self.api_client.get_api_key()
return json.dumps({"status": "success", "message": "OS_API_KEY is set!"})
except ValueError as e:
return json.dumps({"status": "error", "message": str(e)})
async def list_collections(
self,
) -> str:
"""
List all available feature collections in the OS NGD API.
Returns:
JSON string with collection info (id, title only)
"""
try:
data = await self.api_client.make_request("COLLECTIONS")
if not data or "collections" not in data:
return json.dumps({"error": "No collections found"})
collections = [
{"id": col.get("id"), "title": col.get("title")}
for col in data.get("collections", [])
]
return json.dumps({"collections": collections})
except Exception as e:
error_response = {"error": str(e)}
return json.dumps(
self._add_retry_context(error_response, "list_collections")
)
async def get_single_collection(
self,
collection_id: str,
) -> str:
"""
Get detailed information about a specific collection.
Args:
collection_id: The collection ID
Returns:
JSON string with collection information
"""
try:
data = await self.api_client.make_request(
"COLLECTION_INFO", path_params=[collection_id]
)
return json.dumps(data)
except Exception as e:
error_response = {"error": str(e)}
return json.dumps(
self._add_retry_context(error_response, "get_collection_info")
)
async def get_single_collection_queryables(
self,
collection_id: str,
) -> str:
"""
Get the list of queryable properties for a collection.
Args:
collection_id: The collection ID
Returns:
JSON string with queryable properties
"""
try:
data = await self.api_client.make_request(
"COLLECTION_QUERYABLES", path_params=[collection_id]
)
return json.dumps(data)
except Exception as e:
error_response = {"error": str(e)}
return json.dumps(
self._add_retry_context(error_response, "get_collection_queryables")
)
    async def search_features(
        self,
        collection_id: str,
        bbox: Optional[str] = None,
        crs: Optional[str] = None,
        limit: int = 10,
        offset: int = 0,
        filter: Optional[str] = None,
        filter_lang: Optional[str] = "cql-text",
        query_attr: Optional[str] = None,
        query_attr_value: Optional[str] = None,
        exclude_geometry: bool = True,
        summarize: bool = False,
        max_features_in_summary: int = 5,
    ) -> str:
        """
        Search for features in a collection with full CQL2 filter support.

        If both `filter` and `query_attr`/`query_attr_value` are supplied,
        `filter` takes precedence and the attribute pair is ignored.

        Args:
            collection_id: The collection to search
            bbox: Bounding box filter
            crs: Coordinate reference system
            limit: Max features to return from API (default: 10, max: 100)
            offset: Offset for pagination
            filter: CQL2 filter expression
            filter_lang: Filter language (default: cql-text)
            query_attr: Simple attribute query field name
            query_attr_value: Simple attribute query value
            exclude_geometry: Strip geometry to reduce context size (default: True)
            summarize: Return summary with sample features instead of all (default: False)
            max_features_in_summary: Max features to show if summarizing (default: 5)

        Returns:
            JSON string with features or summary; validation and API errors
            come back as an error payload with retry guidance.
        """
        try:
            params: Dict[str, Union[str, int]] = {}
            # NOTE(review): limit=0 / offset=0 are falsy and therefore omitted
            # from params, so the API's own defaults apply in those cases —
            # confirm that is the intended behaviour.
            if limit:
                params["limit"] = min(limit, 100)
            if offset:
                params["offset"] = max(0, offset)
            if bbox:
                params["bbox"] = bbox
            if crs:
                params["crs"] = crs
            if filter:
                # Defence-in-depth sanitisation of the raw CQL2 filter before
                # it is forwarded to the OS API.
                if len(filter) > 1000:
                    raise ValueError("Filter too long")
                # Deny-list of patterns associated with SQL injection, XSS,
                # template injection, path traversal and encoding tricks.
                # Deliberately conservative: legitimate CQL2 filters that
                # happen to contain these tokens are rejected too.
                dangerous_patterns = [
                    # SQL injection / statement smuggling
                    r";\s*--",
                    r";\s*/\*",
                    r"\bUNION\b",
                    r"\bSELECT\b",
                    r"\bINSERT\b",
                    r"\bUPDATE\b",
                    r"\bDELETE\b",
                    r"\bDROP\b",
                    r"\bCREATE\b",
                    r"\bALTER\b",
                    r"\bTRUNCATE\b",
                    r"\bEXEC\b",
                    r"\bEXECUTE\b",
                    r"\bSP_\b",
                    r"\bXP_\b",
                    # XSS / script injection
                    r"<script\b",
                    r"javascript:",
                    r"vbscript:",
                    r"onload\s*=",
                    r"onerror\s*=",
                    r"onclick\s*=",
                    r"\beval\s*\(",
                    r"document\.",
                    r"window\.",
                    r"location\.",
                    r"cookie",
                    r"innerHTML",
                    r"outerHTML",
                    r"alert\s*\(",
                    r"confirm\s*\(",
                    r"prompt\s*\(",
                    r"setTimeout\s*\(",
                    r"setInterval\s*\(",
                    r"Function\s*\(",
                    # Prototype pollution / code execution
                    r"constructor",
                    r"prototype",
                    r"__proto__",
                    r"process\.",
                    r"require\s*\(",
                    r"import\s+",
                    r"from\s+.*import",
                    # Path traversal / protocol smuggling
                    r"\.\./",
                    r"file://",
                    r"ftp://",
                    r"data:",
                    r"blob:",
                    # Encoding tricks (hex, percent, HTML entities)
                    r"\\x[0-9a-fA-F]{2}",
                    r"%[0-9a-fA-F]{2}",
                    r"&#x[0-9a-fA-F]+;",
                    r"&[a-zA-Z]+;",
                    # Template injection delimiters
                    r"\$\{",
                    r"#\{",
                    r"<%",
                    r"%>",
                    r"{{",
                    r"}}",
                    # Escapes / control characters
                    r"\\\w+",
                    r"\0",
                    r"\r\n",
                    r"\n\r",
                ]
                for pattern in dangerous_patterns:
                    if re.search(pattern, filter, re.IGNORECASE):
                        # Deliberately vague: do not echo which pattern hit.
                        raise ValueError("Invalid filter content")
                # Odd number of single quotes means an unterminated string.
                if filter.count("'") % 2 != 0:
                    raise ValueError("Unmatched quotes in filter")
                params["filter"] = filter.strip()
                if filter_lang:
                    params["filter-lang"] = filter_lang
            elif query_attr and query_attr_value:
                # Simple attribute-equality path: validate the field name and
                # escape embedded single quotes before building the filter.
                if not re.match(r"^[a-zA-Z_][a-zA-Z0-9_]*$", query_attr):
                    raise ValueError("Invalid field name")
                escaped_value = str(query_attr_value).replace("'", "''")
                params["filter"] = f"{query_attr} = '{escaped_value}'"
                if filter_lang:
                    params["filter-lang"] = filter_lang
            # Validate the collection id against the cached catalogue when the
            # planner is available (it may be None before get_workflow_context).
            if self.workflow_planner:
                valid_collections = set(
                    self.workflow_planner.basic_collections_info.keys()
                )
                if collection_id not in valid_collections:
                    return json.dumps(
                        {
                            "error": f"Invalid collection '{collection_id}'. Valid collections: {sorted(valid_collections)[:10]}...",
                            "suggestion": "Call get_workflow_context() to see all available collections",
                        }
                    )
            data = await self.api_client.make_request(
                "COLLECTION_FEATURES", params=params, path_params=[collection_id]
            )
            # Apply response optimizations to reduce context size
            if exclude_geometry:
                data = self._strip_verbose(data)
                logger.debug(f"Stripped geometry from {len(data.get('features', []))} features")
            if summarize:
                data = self._summarise_features(data, max_features_in_summary)
                logger.debug(f"Summarized response to {max_features_in_summary} features")
            return json.dumps(data)
        except ValueError as ve:
            # Validation failures (filter/field checks above).
            error_response = {"error": f"Invalid input: {str(ve)}"}
            return json.dumps(
                self._add_retry_context(error_response, "search_features")
            )
        except Exception as e:
            # API/transport failures.
            error_response = {"error": str(e)}
            return json.dumps(
                self._add_retry_context(error_response, "search_features")
            )
async def get_feature(
self,
collection_id: str,
feature_id: str,
crs: Optional[str] = None,
) -> str:
"""
Get a specific feature by ID.
Args:
collection_id: The collection ID
feature_id: The feature ID
crs: Coordinate reference system for the response
Returns:
JSON string with feature data
"""
try:
params: Dict[str, str] = {}
if crs:
params["crs"] = crs
data = await self.api_client.make_request(
"COLLECTION_FEATURE_BY_ID",
params=params,
path_params=[collection_id, feature_id],
)
return json.dumps(data)
except Exception as e:
error_response = {"error": f"Error getting feature: {str(e)}"}
return json.dumps(self._add_retry_context(error_response, "get_feature"))
async def get_linked_identifiers(
self,
identifier_type: str,
identifier: str,
feature_type: Optional[str] = None,
) -> str:
"""
Get linked identifiers for a specified identifier.
Args:
identifier_type: The type of identifier (e.g., 'TOID', 'UPRN')
identifier: The identifier value
feature_type: Optional feature type to filter results
Returns:
JSON string with linked identifiers or filtered results
"""
try:
data = await self.api_client.make_request(
"LINKED_IDENTIFIERS", path_params=[identifier_type, identifier]
)
if feature_type:
# Filter results by feature type
filtered_results = []
for item in data.get("results", []):
if item.get("featureType") == feature_type:
filtered_results.append(item)
return json.dumps({"results": filtered_results})
return json.dumps(data)
except Exception as e:
error_response = {"error": str(e)}
return json.dumps(
self._add_retry_context(error_response, "get_linked_identifiers")
)
async def get_bulk_features(
self,
collection_id: str,
identifiers: List[str],
query_by_attr: Optional[str] = None,
) -> str:
"""
Get multiple features in a single call.
Args:
collection_id: The collection ID
identifiers: List of feature identifiers
query_by_attr: Attribute to query by (if not provided, assumes feature IDs)
Returns:
JSON string with features data
"""
try:
tasks: List[Any] = []
for identifier in identifiers:
if query_by_attr:
task = self.search_features(
collection_id=collection_id,
query_attr=query_by_attr,
query_attr_value=identifier,
limit=1,
)
else:
task = self.get_feature(collection_id, identifier)
tasks.append(task)
results = await asyncio.gather(*tasks)
parsed_results = [json.loads(result) for result in results]
return json.dumps({"results": parsed_results})
except Exception as e:
error_response = {"error": str(e)}
return json.dumps(
self._add_retry_context(error_response, "get_bulk_features")
)
async def get_bulk_linked_features(
self,
identifier_type: str,
identifiers: List[str],
feature_type: Optional[str] = None,
) -> str:
"""
Get linked features for multiple identifiers in a single call.
Args:
identifier_type: The type of identifier (e.g., 'TOID', 'UPRN')
identifiers: List of identifier values
feature_type: Optional feature type to filter results
Returns:
JSON string with linked features data
"""
try:
tasks = [
self.get_linked_identifiers(identifier_type, identifier, feature_type)
for identifier in identifiers
]
results = await asyncio.gather(*tasks)
parsed_results = [json.loads(result) for result in results]
return json.dumps({"results": parsed_results})
except Exception as e:
error_response = {"error": str(e)}
return json.dumps(
self._add_retry_context(error_response, "get_bulk_linked_features")
)
async def get_prompt_templates(
self,
category: Optional[str] = None,
) -> str:
"""
Get standard prompt templates for interacting with this service.
Args:
category: Optional category of templates to return
(general, collections, features, linked_identifiers)
Returns:
JSON string containing prompt templates
"""
if category and category in PROMPT_TEMPLATES:
return json.dumps({category: PROMPT_TEMPLATES[category]})
return json.dumps(PROMPT_TEMPLATES)
async def fetch_detailed_collections(self, collection_ids: str) -> str:
"""
Fetch detailed queryables for specific collections mentioned in LLM workflow plan.
This is mainly to reduce the size of the context for the LLM.
Only fetch what you really need.
Args:
collection_ids: Comma-separated list of collection IDs (e.g., "lus-fts-site-1,trn-ntwk-street-1")
Returns:
JSON string with detailed queryables for the specified collections
"""
try:
if not self.workflow_planner:
return json.dumps(
{
"error": "Workflow planner not initialized. Call get_workflow_context() first."
}
)
requested_collections = [cid.strip() for cid in collection_ids.split(",")]
valid_collections = set(self.workflow_planner.basic_collections_info.keys())
invalid_collections = [
cid for cid in requested_collections if cid not in valid_collections
]
if invalid_collections:
return json.dumps(
{
"error": f"Invalid collection IDs: {invalid_collections}",
"valid_collections": sorted(valid_collections),
}
)
cached_collections = [
cid
for cid in requested_collections
if cid in self.workflow_planner.detailed_collections_cache
]
collections_to_fetch = [
cid
for cid in requested_collections
if cid not in self.workflow_planner.detailed_collections_cache
]
if collections_to_fetch:
logger.info(f"Fetching detailed queryables for: {collections_to_fetch}")
detailed_queryables = (
await self.api_client.fetch_collections_queryables(
collections_to_fetch
)
)
for coll_id, queryables in detailed_queryables.items():
self.workflow_planner.detailed_collections_cache[coll_id] = {
"id": queryables.id,
"title": queryables.title,
"description": queryables.description,
"all_queryables": queryables.all_queryables,
"enum_queryables": queryables.enum_queryables,
"has_enum_filters": queryables.has_enum_filters,
"total_queryables": queryables.total_queryables,
"enum_count": queryables.enum_count,
}
context = self.workflow_planner.get_detailed_context(requested_collections)
return json.dumps(
{
"success": True,
"collections_processed": requested_collections,
"collections_fetched_from_api": collections_to_fetch,
"collections_from_cache": cached_collections,
"detailed_collections": context["available_collections"],
"message": f"Detailed queryables now available for: {', '.join(requested_collections)}",
}
)
except Exception as e:
logger.error(f"Error fetching detailed collections: {e}")
return json.dumps(
{"error": str(e), "suggestion": "Check collection IDs and try again"}
)
async def get_routing_data(
self,
bbox: Optional[str] = None,
limit: int = 100,
include_nodes: bool = True,
include_edges: bool = True,
build_network: bool = True,
exclude_geometry: bool = True,
) -> str:
"""
Get routing data - builds network and returns nodes/edges as flat tables.
Args:
bbox: Optional bounding box (format: "minx,miny,maxx,maxy")
limit: Maximum number of road links to process (default: 100)
include_nodes: Whether to include nodes in response (default: True)
include_edges: Whether to include edges in response (default: True)
build_network: Whether to build network first (default: True)
exclude_geometry: Strip geometry to reduce context size (default: True)
Returns:
JSON string with routing network data
"""
try:
result = {}
if build_network:
build_result = await self.routing_service.build_routing_network(
bbox, limit
)
result["build_status"] = build_result
if build_result.get("status") != "success":
return json.dumps(result)
if include_nodes:
nodes_result = self.routing_service.get_flat_nodes()
result["nodes"] = nodes_result.get("nodes", [])
if include_edges:
edges_result = self.routing_service.get_flat_edges()
edges = edges_result.get("edges", [])
# Strip geometry from edges to reduce context size
if exclude_geometry and edges:
for edge in edges:
if "geometry" in edge and edge["geometry"]:
edge["geometry"] = {
"type": edge["geometry"].get("type", "unknown"),
"_stripped": True,
"_note": "Geometry removed to reduce context size"
}
logger.debug(f"Stripped geometry from {len(edges)} routing edges")
result["edges"] = edges
summary = self.routing_service.get_network_info()
result["summary"] = summary.get("network", {})
result["status"] = "success"
return json.dumps(result)
except Exception as e:
return json.dumps({"error": str(e)})
```