# Directory Structure

```
├── .dockerignore
├── .env.local
├── .gitignore
├── .python-version
├── dockerfile
├── LICENSE
├── makefile
├── pyproject.toml
├── README.md
└── src
    ├── __init__.py
    ├── api_service
    │   ├── __init__.py
    │   ├── os_api.py
    │   └── protocols.py
    ├── config_docs
    │   ├── ogcapi-features-1.yaml
    │   ├── ogcapi-features-2.yaml
    │   └── ogcapi-features-3.yaml
    ├── http_client_test.py
    ├── mcp_service
    │   ├── __init__.py
    │   ├── guardrails.py
    │   ├── os_service.py
    │   ├── prompts.py
    │   ├── protocols.py
    │   ├── resources.py
    │   └── routing_service.py
    ├── middleware
    │   ├── __init__.py
    │   ├── http_middleware.py
    │   └── stdio_middleware.py
    ├── models.py
    ├── prompt_templates
    │   ├── __init__.py
    │   └── prompt_templates.py
    ├── server.py
    ├── stdio_client_test.py
    ├── utils
    │   ├── __init__.py
    │   └── logging_config.py
    └── workflow_generator
        ├── __init__.py
        └── workflow_planner.py
```

# Files

--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------

```
3.11
```

--------------------------------------------------------------------------------
/.env.local:
--------------------------------------------------------------------------------

```
OS_API_KEY=xxxxxxx
STDIO_KEY=xxxxxxx
BEARER_TOKENS=xxxxxxx
```
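These are the only secrets the server expects: `OS_API_KEY` authenticates against OS Data Hub, `STDIO_KEY` gates the stdio transport, and `BEARER_TOKENS` is the HTTP allow-list. A minimal sketch of supplying them from Python rather than `.env.local` — the values are placeholders:

```python
import os

# Placeholder values - supply real ones via .env.local or the shell.
os.environ.setdefault("OS_API_KEY", "your_api_key_here")
os.environ.setdefault("STDIO_KEY", "any_value")
os.environ.setdefault("BEARER_TOKENS", "dev-token")
```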
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info

# Virtual environments
.venv
.env
.DS_Store
.ruff_cache
uv.lock
pyrightconfig.json
*.log
.pytest_cache/
.mypy_cache/
```

--------------------------------------------------------------------------------
/.dockerignore:
--------------------------------------------------------------------------------

```
# Python artifacts
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg

# Virtual environments
.venv/
venv/
ENV/
env/
.env

# Testing and type checking
.pytest_cache/
.mypy_cache/
.coverage
htmlcov/
.tox/
.hypothesis/
*.cover
.coverage.*

# Development tools
.ruff_cache/
pyrightconfig.json
.vscode/
.idea/
*.swp
*.swo
*~

# OS files
.DS_Store
.DS_Store?
._*
.Spotlight-V100
.Trashes
ehthumbs.db
Thumbs.db

# Git
.git/
.gitignore
.gitattributes

# Documentation
*.md
docs/
LICENSE

# Build files
Makefile
makefile
dockerfile
Dockerfile

# Logs
*.log
logs/

# Package manager
uv.lock
poetry.lock
Pipfile.lock

# Test files
*_test.py
test_*.py
tests/
```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# Ordnance Survey MCP Server

An MCP server for accessing UK geospatial data through Ordnance Survey APIs.

## What it does

Provides LLM access to Ordnance Survey's Data Hub APIs. Ask simple questions such as "find me all cinemas in Leeds city centre", or use the prompt templates for more complex, specific use cases relating to street works, planning, and more. This MCP server enforces a two-step workflow plan to ensure that the user gets the best results possible.

## Quick Start

### 1. Get an OS API Key

Register at [OS Data Hub](https://osdatahub.os.uk/) to get your free API key and set up a project.

### 2. Run with Docker and add to your Claude Desktop config (easiest)

Clone the repository:

```bash
git clone https://github.com/your-username/os-mcp-server.git
cd os-mcp-server
```

Then build the Docker image:

```bash
docker build -t os-mcp-server .
```

Add the following to your Claude Desktop config:

```json
{
  "mcpServers": {
    "os-mcp-server": {
      "command": "docker",
      "args": [
        "run",
        "--rm",
        "-i",
        "-e",
        "OS_API_KEY=your_api_key_here",
        "-e",
        "STDIO_KEY=any_value",
        "os-mcp-server"
      ]
    }
  }
}
```

Open Claude Desktop and you should now see all available tools, resources, and prompts.

## Requirements

- Python 3.11+
- OS API Key from [OS Data Hub](https://osdatahub.os.uk/)
- A `STDIO_KEY` env var, which can be any value for now whilst auth is improved.

## License

MIT License. This project is not endorsed by or affiliated with Ordnance Survey; it is a personal project and not a commercial product. It is actively being worked on, so expect breaking changes and bugs.
```

--------------------------------------------------------------------------------
/src/__init__.py:
--------------------------------------------------------------------------------

```python
```

--------------------------------------------------------------------------------
/src/api_service/__init__.py:
--------------------------------------------------------------------------------

```python
```

--------------------------------------------------------------------------------
/src/mcp_service/__init__.py:
--------------------------------------------------------------------------------

```python
```

--------------------------------------------------------------------------------
/src/middleware/__init__.py:
--------------------------------------------------------------------------------

```python
```

--------------------------------------------------------------------------------
/src/prompt_templates/__init__.py:
--------------------------------------------------------------------------------

```python
```

--------------------------------------------------------------------------------
/src/utils/__init__.py:
--------------------------------------------------------------------------------

```python
```

--------------------------------------------------------------------------------
/src/workflow_generator/__init__.py:
--------------------------------------------------------------------------------

```python
```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[project]
name = "os-mcp"
version = "0.1.5"
description = "A Python MCP server that provides access to Ordnance Survey's DataHub APIs."
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
    "aiohttp>=3.11.18",
    "anthropic>=0.51.0",
    "fastapi>=0.116.1",
    "mcp>=1.12.0",
    "starlette>=0.47.1",
    "uvicorn>=0.35.0",
]
```

--------------------------------------------------------------------------------
/dockerfile:
--------------------------------------------------------------------------------

```dockerfile
FROM python:3.11-slim

WORKDIR /app

ENV PYTHONPATH=/app/src \
    PYTHONUNBUFFERED=1 \
    PIP_NO_CACHE_DIR=1 \
    PIP_DISABLE_PIP_VERSION_CHECK=1

RUN apt-get update && apt-get install -y \
    gcc \
    && rm -rf /var/lib/apt/lists/* \
    && apt-get clean

COPY pyproject.toml ./

RUN pip install --no-cache-dir .
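# pyproject.toml was copied on its own above so the dependency install sits in
# its own cached layer; only now copy the source, so code-only edits to src/
# do not force a full reinstall of the dependencies.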
COPY src/ ./src/

EXPOSE 8000

CMD ["python", "src/server.py", "--transport", "stdio"]
```

--------------------------------------------------------------------------------
/src/workflow_generator/workflow_planner.py:
--------------------------------------------------------------------------------

```python
from typing import Dict, Any, Optional, List
from models import OpenAPISpecification
from utils.logging_config import get_logger

logger = get_logger(__name__)


class WorkflowPlanner:
    """Context provider for LLM workflow planning"""

    def __init__(
        self,
        openapi_spec: Optional[OpenAPISpecification],
        basic_collections_info: Optional[Dict[str, Any]] = None,
    ):
        self.spec = openapi_spec
        self.basic_collections_info = basic_collections_info or {}
        self.detailed_collections_cache = {}

    def get_basic_context(self) -> Dict[str, Any]:
        """Get basic context for LLM to plan its workflow - no detailed queryables"""
        return {
            "available_collections": self.basic_collections_info,
            "openapi_spec": self.spec,
        }

    def get_detailed_context(self, collection_ids: List[str]) -> Dict[str, Any]:
        """Get detailed context for specific collections mentioned in the plan"""
        detailed_collections = {
            coll_id: self.detailed_collections_cache.get(coll_id)
            for coll_id in collection_ids
            if coll_id in self.detailed_collections_cache
        }
        return {
            "available_collections": detailed_collections,
            "openapi_spec": self.spec,
        }
```
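A minimal sketch of how the planner might be driven — the collection info shown is illustrative; in the running server it would come from `OSAPIClient`'s caches:

```python
from workflow_generator.workflow_planner import WorkflowPlanner

planner = WorkflowPlanner(
    openapi_spec=None,  # a cached OpenAPISpecification in the real flow
    basic_collections_info={"trn-ntwk-street-1": {"title": "Street"}},
)
basic = planner.get_basic_context()  # coarse context for the first planning pass

# Detailed context is served from detailed_collections_cache, so it stays empty
# until that cache has been populated with queryables for the named collections.
detailed = planner.get_detailed_context(["trn-ntwk-street-1"])
```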
--------------------------------------------------------------------------------
/src/api_service/protocols.py:
--------------------------------------------------------------------------------

```python
from typing import Protocol, Dict, List, Any, Optional, runtime_checkable
from models import (
    OpenAPISpecification,
    CollectionsCache,
    CollectionQueryables,
)


@runtime_checkable
class APIClient(Protocol):
    """Protocol for API clients"""

    async def initialise(self):
        """Initialise the aiohttp session if not already created"""
        ...

    async def close(self):
        """Close the aiohttp session"""
        ...

    async def get_api_key(self) -> str:
        """Get the API key"""
        ...

    async def make_request(
        self,
        endpoint: str,
        params: Optional[Dict[str, Any]] = None,
        path_params: Optional[List[str]] = None,
    ) -> Dict[str, Any]:
        """Make a request to an API endpoint"""
        ...

    async def make_request_no_auth(
        self,
        url: str,
        params: Optional[Dict[str, Any]] = None,
        max_retries: int = 2,
    ) -> str:
        """Make a request without authentication"""
        ...

    async def cache_openapi_spec(self) -> OpenAPISpecification:
        """Cache the OpenAPI spec"""
        ...

    async def cache_collections(self) -> CollectionsCache:
        """Cache the collections data"""
        ...

    async def fetch_collections_queryables(
        self, collection_ids: List[str]
    ) -> Dict[str, CollectionQueryables]:
        """Fetch detailed queryables for specific collections only"""
        ...
```

--------------------------------------------------------------------------------
/src/utils/logging_config.py:
--------------------------------------------------------------------------------

```python
import logging
import sys
import os
import re


class APIKeySanitisingFilter(logging.Filter):
    """Filter to sanitise API keys from log messages"""

    def __init__(self):
        super().__init__()
        self.patterns = [
            r"[?&]key=[^&\s]*",
            r"[?&]api_key=[^&\s]*",
            r"[?&]apikey=[^&\s]*",
            r"[?&]token=[^&\s]*",
        ]

    def filter(self, record: logging.LogRecord) -> bool:
        """Sanitise the log record message"""
        if hasattr(record, "msg") and isinstance(record.msg, str):
            record.msg = self._sanitise_text(record.msg)
        if hasattr(record, "args") and record.args:
            sanitised_args = []
            for arg in record.args:
                if isinstance(arg, str):
                    sanitised_args.append(self._sanitise_text(arg))
                else:
                    sanitised_args.append(arg)
            record.args = tuple(sanitised_args)
        return True

    def _sanitise_text(self, text: str) -> str:
        """Remove API keys from text"""
        sanitised = text
        for pattern in self.patterns:
            sanitised = re.sub(pattern, "", sanitised, flags=re.IGNORECASE)
        sanitised = re.sub(r"[?&]$", "", sanitised)
        sanitised = re.sub(r"&{2,}", "&", sanitised)
        sanitised = re.sub(r"\?&", "?", sanitised)
        return sanitised


def configure_logging(debug: bool = False) -> logging.Logger:
    """
    Configure logging for the entire application.

    Args:
        debug: Whether to enable debug logging or not
    """
    log_level = (
        logging.DEBUG if (debug or os.environ.get("DEBUG") == "1") else logging.INFO
    )
    formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    )
    root_logger = logging.getLogger()
    root_logger.setLevel(log_level)
    for handler in root_logger.handlers[:]:
        root_logger.removeHandler(handler)
    console_handler = logging.StreamHandler(sys.stderr)
    console_handler.setFormatter(formatter)
    console_handler.setLevel(log_level)
    api_key_filter = APIKeySanitisingFilter()
    console_handler.addFilter(api_key_filter)
    root_logger.addHandler(console_handler)
    logging.getLogger("uvicorn").propagate = False
    return root_logger


def get_logger(name: str) -> logging.Logger:
    """
    Get a logger for a module.

    Args:
        name: Module name (usually __name__)

    Returns:
        Logger instance
    """
    return logging.getLogger(name)
```
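A quick way to see the sanitising filter in action — a minimal sketch; the URL and key value are illustrative:

```python
from utils.logging_config import configure_logging, get_logger

configure_logging(debug=True)
logger = get_logger(__name__)

# The APIKeySanitisingFilter strips key=... query parameters before emit:
logger.info("GET https://api.os.uk/features/ngd/ofa/v1/collections?key=abc123")
# logged as: GET https://api.os.uk/features/ngd/ofa/v1/collections
```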
" "Step 3: For each restriction, match to road links using restrictionnetworkreference: " " - networkreferenceid = Road Link UUID from trn-ntwk-roadlink-4 " " - roadlinkdirection = 'In Direction' (with geometry) or 'In Opposite Direction' (against geometry) " " - roadlinksequence = order for multi-link restrictions (turns) " "Step 4: IMMEDIATELY lookup street names: Use get_bulk_features(collection_id='trn-ntwk-roadlink-4', identifiers=[road_link_uuids]) to resolve UUIDs to actual street names (name1_text, roadclassification, roadclassificationnumber) " "Step 5: Analyze restriction types by actual street names: " " - One Way: Apply directional constraint to specific named road " " - Turn Restriction: Block movement between named streets (from street A to street B) " " - Vehicle Restrictions: Apply dimension/weight limits to specific named roads " " - Access Restrictions: Apply vehicle type constraints to specific named streets " "Step 6: Check exemptions array (e.g., 'Pedal Cycles' exempt from one-way) for each named street " "Step 7: Group results by street names and road classifications (A Roads, B Roads, Local Roads) " "Return: Complete restriction mapping showing ACTUAL STREET NAMES with their specific restrictions, directions, exemptions, and road classifications. Present results as 'Street Name (Road Class)' rather than UUIDs." ), } ``` -------------------------------------------------------------------------------- /src/mcp_service/resources.py: -------------------------------------------------------------------------------- ```python """MCP Resources for OS NGD documentation""" import json import time from models import NGDAPIEndpoint from utils.logging_config import get_logger logger = get_logger(__name__) # TODO: Do this for class OSDocumentationResources: """Handles registration of OS NGD documentation resources""" def __init__(self, mcp_service, api_client): self.mcp = mcp_service self.api_client = api_client def register_all(self) -> None: """Register all documentation resources""" self._register_transport_network_resources() # Future: self._register_land_resources() # Future: self._register_building_resources() def _register_transport_network_resources(self) -> None: """Register transport network documentation resources""" @self.mcp.resource("os-docs://street") async def street_docs() -> str: return await self._fetch_doc_resource( "street", NGDAPIEndpoint.MARKDOWN_STREET.value ) @self.mcp.resource("os-docs://road") async def road_docs() -> str: return await self._fetch_doc_resource( "road", NGDAPIEndpoint.MARKDOWN_ROAD.value ) @self.mcp.resource("os-docs://tram-on-road") async def tram_on_road_docs() -> str: return await self._fetch_doc_resource( "tram-on-road", NGDAPIEndpoint.TRAM_ON_ROAD.value ) @self.mcp.resource("os-docs://road-node") async def road_node_docs() -> str: return await self._fetch_doc_resource( "road-node", NGDAPIEndpoint.ROAD_NODE.value ) @self.mcp.resource("os-docs://road-link") async def road_link_docs() -> str: return await self._fetch_doc_resource( "road-link", NGDAPIEndpoint.ROAD_LINK.value ) @self.mcp.resource("os-docs://road-junction") async def road_junction_docs() -> str: return await self._fetch_doc_resource( "road-junction", NGDAPIEndpoint.ROAD_JUNCTION.value ) async def _fetch_doc_resource(self, feature_type: str, url: str) -> str: """Generic method to fetch documentation resources""" try: content = await self.api_client.make_request_no_auth(url) return json.dumps( { "feature_type": feature_type, "content": content, "content_type": "markdown", 
"source_url": url, "timestamp": time.time(), } ) except Exception as e: logger.error(f"Error fetching {feature_type} documentation: {e}") return json.dumps({"error": str(e), "feature_type": feature_type}) ``` -------------------------------------------------------------------------------- /src/mcp_service/prompts.py: -------------------------------------------------------------------------------- ```python from typing import List from mcp.types import PromptMessage, TextContent from prompt_templates.prompt_templates import PROMPT_TEMPLATES from utils.logging_config import get_logger logger = get_logger(__name__) class OSWorkflowPrompts: """Handles registration of OS NGD workflow prompts""" def __init__(self, mcp_service): self.mcp = mcp_service def register_all(self) -> None: """Register all workflow prompts""" self._register_analysis_prompts() self._register_general_prompts() def _register_analysis_prompts(self) -> None: """Register analysis workflow prompts""" @self.mcp.prompt() def usrn_breakdown_analysis(usrn: str) -> List[PromptMessage]: """Generate a step-by-step USRN breakdown workflow""" template = PROMPT_TEMPLATES["usrn_breakdown"].format(usrn=usrn) return [ PromptMessage( role="user", content=TextContent( type="text", text=f"As an expert in OS NGD API workflows and transport network analysis, {template}", ), ) ] def _register_general_prompts(self) -> None: """Register general OS NGD guidance prompts""" @self.mcp.prompt() def collection_query_guidance( collection_id: str, query_type: str = "features" ) -> List[PromptMessage]: """Generate guidance for querying OS NGD collections""" return [ PromptMessage( role="user", content=TextContent( type="text", text=f"As an OS NGD API expert, guide me through querying the '{collection_id}' collection for {query_type}. " f"Include: 1) Available filters, 2) Best practices for bbox queries, " f"3) CRS considerations, 4) Example queries with proper syntax.", ), ) ] @self.mcp.prompt() def workflow_planning( user_request: str, data_theme: str = "transport" ) -> List[PromptMessage]: """Generate a workflow plan for complex OS NGD queries""" return [ PromptMessage( role="user", content=TextContent( type="text", text=f"As a geospatial workflow planner, create a detailed workflow plan for: '{user_request}'. " f"Focus on {data_theme} theme data. Include: " f"1) Collection selection rationale, " f"2) Query sequence with dependencies, " f"3) Filter strategies, " f"4) Error handling considerations.", ), ) ] ``` -------------------------------------------------------------------------------- /src/config_docs/ogcapi-features-2.yaml: -------------------------------------------------------------------------------- ```yaml openapi: 3.0.3 info: title: "Building Blocks specified in OGC API - Features - Part 2: Coordinate Reference Systems by Reference" description: |- Common components used in the [OGC standard "OGC API - Features - Part 2: Coordinate Reference Systems by Reference"](http://docs.opengeospatial.org/is/18-058/18-058.html). OGC API - Features - Part 2: Coordinate Reference Systems by Reference 1.0 is an OGC Standard. Copyright (c) 2020 Open Geospatial Consortium. To obtain additional rights of use, visit http://www.opengeospatial.org/legal/ . This document is also available on [OGC](http://schemas.opengis.net/ogcapi/features/part2/1.0/openapi/ogcapi-features-2.yaml). 
--------------------------------------------------------------------------------
/src/config_docs/ogcapi-features-2.yaml:
--------------------------------------------------------------------------------

```yaml
openapi: 3.0.3
info:
  title: "Building Blocks specified in OGC API - Features - Part 2: Coordinate Reference Systems by Reference"
  description: |-
    Common components used in the
    [OGC standard "OGC API - Features - Part 2: Coordinate Reference Systems by Reference"](http://docs.opengeospatial.org/is/18-058/18-058.html).

    OGC API - Features - Part 2: Coordinate Reference Systems by Reference 1.0 is an OGC Standard.
    Copyright (c) 2020 Open Geospatial Consortium.
    To obtain additional rights of use, visit http://www.opengeospatial.org/legal/ .

    This document is also available on
    [OGC](http://schemas.opengis.net/ogcapi/features/part2/1.0/openapi/ogcapi-features-2.yaml).
  version: '1.0.0'
  contact:
    name: Clemens Portele
    email: [email protected]
  license:
    name: OGC License
    url: 'http://www.opengeospatial.org/legal/'
components:
  parameters:
    bbox-crs:
      name: bbox-crs
      in: query
      required: false
      schema:
        type: string
        format: uri
      style: form
      explode: false
    crs:
      name: crs
      in: query
      required: false
      schema:
        type: string
        format: uri
      style: form
      explode: false
  schemas:
    collection:
      allOf:
        - $ref: http://schemas.opengis.net/ogcapi/features/part1/1.0/openapi/schemas/collection.yaml
        - $ref: '#/components/schemas/collectionExtensionCrs'
    collectionExtensionCrs:
      type: object
      properties:
        storageCrs:
          description: the CRS identifier, from the list of supported CRS identifiers, that may be used to retrieve features from a collection without the need to apply a CRS transformation
          type: string
          format: uri
        storageCrsCoordinateEpoch:
          description: point in time at which coordinates in the spatial feature collection are referenced to the dynamic coordinate reference system in `storageCrs`, that may be used to retrieve features from a collection without the need to apply a change of coordinate epoch. It is expressed as a decimal year in the Gregorian calendar
          type: number
          example: '2017-03-25 in the Gregorian calendar is epoch 2017.23'
    collections:
      allOf:
        - $ref: http://schemas.opengis.net/ogcapi/features/part1/1.0/openapi/schemas/collections.yaml
        - $ref: '#/components/schemas/collectionsExtensionCrs'
    collectionsExtensionCrs:
      type: object
      properties:
        crs:
          description: a global list of CRS identifiers that are supported by spatial feature collections offered by the service
          type: array
          items:
            type: string
            format: uri
  headers:
    Content-Crs:
      description: a URI, in angular brackets, identifying the coordinate reference system used in the content / payload
      schema:
        type: string
      example: '<http://www.opengis.net/def/crs/EPSG/0/3395>'
```
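These Part 2 building blocks are what the server's `crs` arguments map onto. A sketch of the query parameters involved — the bbox and CRS choices are illustrative:

```python
# Illustrative parameters for a CRS-aware features request.
params = {
    "bbox": "-1.55,53.79,-1.54,53.80",
    "bbox-crs": "http://www.opengis.net/def/crs/OGC/1.3/CRS84",
    "crs": "http://www.opengis.net/def/crs/EPSG/0/27700",  # British National Grid output
}
```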
--------------------------------------------------------------------------------
/src/mcp_service/protocols.py:
--------------------------------------------------------------------------------

```python
from typing import Protocol, Optional, Callable, List, runtime_checkable, Any


@runtime_checkable
class MCPService(Protocol):
    """Protocol for MCP services"""

    def tool(self) -> Callable[..., Any]:
        """Register a function as an MCP tool"""
        ...

    def resource(
        self,
        uri: str,
        *,
        name: str | None = None,
        title: str | None = None,
        description: str | None = None,
        mime_type: str | None = None,
    ) -> Callable[[Any], Any]:
        """Register a function as an MCP resource"""
        ...

    def run(self) -> None:
        """Run the MCP service"""
        ...


@runtime_checkable
class FeatureService(Protocol):
    """Protocol for OS NGD feature services"""

    def hello_world(self, name: str) -> str:
        """Test connection to the service"""
        ...

    def check_api_key(self) -> str:
        """Check if API key is available"""
        ...

    async def list_collections(self) -> str:
        """List all available feature collections"""
        ...

    async def get_single_collection(self, collection_id: str) -> str:
        """Get detailed information about a specific collection"""
        ...

    async def get_single_collection_queryables(self, collection_id: str) -> str:
        """Get queryable properties for a collection"""
        ...

    # TODO: Need to make sure the full list of parameters is supported
    # TODO: Supporting cql-text is clunky and need to figure out how to support this better
    async def search_features(
        self,
        collection_id: str,
        bbox: Optional[str] = None,
        crs: Optional[str] = None,
        limit: int = 10,
        offset: int = 0,
        filter: Optional[str] = None,
        filter_lang: Optional[str] = "cql-text",
        query_attr: Optional[str] = None,
        query_attr_value: Optional[str] = None,
    ) -> str:
        """Search for features in a collection with full CQL filter support"""
        ...

    async def get_feature(
        self, collection_id: str, feature_id: str, crs: Optional[str] = None
    ) -> str:
        """Get a specific feature by ID"""
        ...

    async def get_linked_identifiers(
        self, identifier_type: str, identifier: str, feature_type: Optional[str] = None
    ) -> str:
        """Get linked identifiers for a specified identifier"""
        ...

    async def get_bulk_features(
        self,
        collection_id: str,
        identifiers: List[str],
        query_by_attr: Optional[str] = None,
    ) -> str:
        """Get multiple features in a single call"""
        ...

    async def get_bulk_linked_features(
        self,
        identifier_type: str,
        identifiers: List[str],
        feature_type: Optional[str] = None,
    ) -> str:
        """Get linked features for multiple identifiers"""
        ...

    async def fetch_detailed_collections(self, collection_ids: List[str]) -> str:
        """Get detailed information about specific collections for workflow planning"""
        ...
```
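Any `FeatureService` implementation can be called through this interface; a minimal sketch using the attribute-query form (the collection and USRN mirror the values used by `http_client_test.py` below):

```python
from mcp_service.protocols import FeatureService

async def find_street(service: FeatureService) -> str:
    # Attribute query rather than a raw CQL filter string.
    return await service.search_features(
        collection_id="trn-ntwk-street-1",
        query_attr="usrn",
        query_attr_value="24501091",
        limit=5,
    )
```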
"Authentication required", "code": 401}) if not self.rate_limiter.check_rate_limit(): logger.error(json.dumps({"error": "Rate limited", "code": 429})) return json.dumps({"error": "Rate limited", "code": 429}) return func(*args, **kwargs) if asyncio.iscoroutinefunction(func): return cast(F, async_wrapper) return cast(F, sync_wrapper) def authenticate(self, key: str) -> bool: """Authenticate with API key""" if key and key.strip(): self.authenticated = True self.client_id = key return True else: self.authenticated = False return False ``` -------------------------------------------------------------------------------- /src/mcp_service/guardrails.py: -------------------------------------------------------------------------------- ```python import re import json import asyncio from functools import wraps from typing import TypeVar, Callable, Any, Union, cast from utils.logging_config import get_logger logger = get_logger(__name__) F = TypeVar("F", bound=Callable[..., Any]) class ToolGuardrails: """Prompt injection protection""" def __init__(self): self.suspicious_patterns = [ r"(?i)ignore previous", r"(?i)ignore all previous instructions", r"(?i)assistant:", r"\{\{.*?\}\}", r"(?i)forget", r"(?i)show credentials", r"(?i)show secrets", r"(?i)reveal password", r"(?i)dump (tokens|secrets|passwords|credentials)", r"(?i)leak confidential", r"(?i)reveal secrets", r"(?i)expose secrets", r"(?i)secrets.*contain", r"(?i)extract secrets", ] def detect_prompt_injection(self, input_text: Any) -> bool: """Check if input contains prompt injection attempts""" if not isinstance(input_text, str): return False return any( re.search(pattern, input_text) for pattern in self.suspicious_patterns ) def basic_guardrails(self, func: F) -> F: """Prompt injection protection only""" @wraps(func) async def async_wrapper(*args: Any, **kwargs: Any) -> Union[str, Any]: try: for arg in args: if isinstance(arg, str) and self.detect_prompt_injection(arg): raise ValueError("Prompt injection detected!") for name, value in kwargs.items(): if not ( hasattr(value, "request_context") and hasattr(value, "request_id") ): if isinstance(value, str) and self.detect_prompt_injection( value ): raise ValueError(f"Prompt injection in '{name}'!") except ValueError as e: return json.dumps({"error": str(e), "code": 400}) return await func(*args, **kwargs) @wraps(func) def sync_wrapper(*args: Any, **kwargs: Any) -> Union[str, Any]: try: for arg in args: if isinstance(arg, str) and self.detect_prompt_injection(arg): raise ValueError("Prompt injection detected!") for name, value in kwargs.items(): if not ( hasattr(value, "request_context") and hasattr(value, "request_id") ): if isinstance(value, str) and self.detect_prompt_injection( value ): raise ValueError(f"Prompt injection in '{name}'!") except ValueError as e: return json.dumps({"error": str(e), "code": 400}) return func(*args, **kwargs) if asyncio.iscoroutinefunction(func): return cast(F, async_wrapper) return cast(F, sync_wrapper) ``` -------------------------------------------------------------------------------- /src/models.py: -------------------------------------------------------------------------------- ```python """ Types for the OS NGD API MCP Server """ from enum import Enum from pydantic import BaseModel from typing import Any, List, Dict class NGDAPIEndpoint(Enum): """ Enum for the OS API Endpoints following OGC API Features standard """ # NGD Features Endpoints NGD_FEATURES_BASE_PATH = "https://api.os.uk/features/ngd/ofa/v1/{}" COLLECTIONS = 
NGD_FEATURES_BASE_PATH.format("collections") COLLECTION_INFO = NGD_FEATURES_BASE_PATH.format("collections/{}") COLLECTION_SCHEMA = NGD_FEATURES_BASE_PATH.format("collections/{}/schema") COLLECTION_FEATURES = NGD_FEATURES_BASE_PATH.format("collections/{}/items") COLLECTION_FEATURE_BY_ID = NGD_FEATURES_BASE_PATH.format("collections/{}/items/{}") COLLECTION_QUERYABLES = NGD_FEATURES_BASE_PATH.format("collections/{}/queryables") # OpenAPI Specification Endpoint OPENAPI_SPEC = NGD_FEATURES_BASE_PATH.format("api") # Linked Identifiers Endpoints LINKED_IDENTIFIERS_BASE_PATH = "https://api.os.uk/search/links/v1/{}" LINKED_IDENTIFIERS = LINKED_IDENTIFIERS_BASE_PATH.format("identifierTypes/{}/{}") # Markdown Resources MARKDOWN_BASE_PATH = "https://docs.os.uk/osngd/data-structure/{}" MARKDOWN_STREET = MARKDOWN_BASE_PATH.format("transport/transport-network/street.md") MARKDOWN_ROAD = MARKDOWN_BASE_PATH.format("transport/transport-network/road.md") TRAM_ON_ROAD = MARKDOWN_BASE_PATH.format( "transport/transport-network/tram-on-road.md" ) ROAD_NODE = MARKDOWN_BASE_PATH.format("transport/transport-network/road-node.md") ROAD_LINK = MARKDOWN_BASE_PATH.format("transport/transport-network/road-link.md") ROAD_JUNCTION = MARKDOWN_BASE_PATH.format( "transport/transport-network/road-junction.md" ) # Places API Endpoints # TODO: Add these back in when I get access to the Places API from OS # PLACES_BASE_PATH = "https://api.os.uk/search/places/v1/{}" # PLACES_UPRN = PLACES_BASE_PATH.format("uprn") # POST_CODE = PLACES_BASE_PATH.format("postcode") class OpenAPISpecification(BaseModel): """Parsed OpenAPI specification optimized for LLM context""" title: str version: str base_url: str endpoints: Dict[str, str] collection_ids: List[str] supported_crs: Dict[str, Any] crs_guide: Dict[str, str] class WorkflowStep(BaseModel): """A single step in a workflow plan""" step_number: int description: str api_endpoint: str parameters: Dict[str, Any] dependencies: List[int] = [] class WorkflowPlan(BaseModel): """Generated workflow plan for user requests""" user_request: str steps: List[WorkflowStep] reasoning: str estimated_complexity: str = "simple" class Collection(BaseModel): """Represents a feature collection from the OS NGD API""" id: str title: str description: str = "" links: List[Dict[str, Any]] = [] extent: Dict[str, Any] = {} itemType: str = "feature" class CollectionsCache(BaseModel): """Cached collections data with filtering applied""" collections: List[Collection] raw_response: Dict[str, Any] class CollectionQueryables(BaseModel): """Queryables information for a collection""" id: str title: str description: str all_queryables: Dict[str, Any] enum_queryables: Dict[str, Any] has_enum_filters: bool total_queryables: int enum_count: int class WorkflowContextCache(BaseModel): """Cached workflow context data""" collections_info: Dict[str, CollectionQueryables] openapi_spec: OpenAPISpecification cached_at: float ``` -------------------------------------------------------------------------------- /src/server.py: -------------------------------------------------------------------------------- ```python import argparse import os import uvicorn from typing import Any from utils.logging_config import configure_logging from api_service.os_api import OSAPIClient from mcp_service.os_service import OSDataHubService from mcp.server.fastmcp import FastMCP from middleware.stdio_middleware import StdioMiddleware from middleware.http_middleware import HTTPMiddleware from starlette.middleware import Middleware from 
--------------------------------------------------------------------------------
/src/server.py:
--------------------------------------------------------------------------------

```python
import argparse
import os
import uvicorn
from typing import Any
from utils.logging_config import configure_logging
from api_service.os_api import OSAPIClient
from mcp_service.os_service import OSDataHubService
from mcp.server.fastmcp import FastMCP
from middleware.stdio_middleware import StdioMiddleware
from middleware.http_middleware import HTTPMiddleware
from starlette.middleware import Middleware
from starlette.middleware.cors import CORSMiddleware
from starlette.routing import Route
from starlette.responses import JSONResponse

logger = configure_logging()


def main():
    """Main entry point"""
    parser = argparse.ArgumentParser(description="OS DataHub API MCP Server")
    parser.add_argument(
        "--transport",
        choices=["stdio", "streamable-http"],
        default="stdio",
        help="Transport protocol to use (stdio or streamable-http)",
    )
    parser.add_argument(
        "--host", default="0.0.0.0", help="Host to bind to (default: 0.0.0.0)"
    )
    parser.add_argument(
        "--port", type=int, default=8000, help="Port to bind to (default: 8000)"
    )
    parser.add_argument("--debug", action="store_true", help="Enable debug mode")
    args = parser.parse_args()

    configure_logging(debug=args.debug)
    logger.info(
        f"OS DataHub API MCP Server starting with {args.transport} transport..."
    )

    api_client = OSAPIClient()

    match args.transport:
        case "stdio":
            logger.info("Starting with stdio transport")
            mcp = FastMCP(
                "os-ngd-api",
                debug=args.debug,
                log_level="DEBUG" if args.debug else "INFO",
            )
            stdio_auth = StdioMiddleware()
            service = OSDataHubService(api_client, mcp, stdio_middleware=stdio_auth)
            stdio_api_key = os.environ.get("STDIO_KEY")
            if not stdio_api_key or not stdio_auth.authenticate(stdio_api_key):
                logger.error("Authentication failed")
                return
            service.run()
        case "streamable-http":
            logger.info(f"Starting Streamable HTTP server on {args.host}:{args.port}")
            mcp = FastMCP(
                "os-ngd-api",
                host=args.host,
                port=args.port,
                debug=args.debug,
                json_response=True,
                stateless_http=False,
                log_level="DEBUG" if args.debug else "INFO",
            )
            OSDataHubService(api_client, mcp)

            async def auth_discovery(_: Any) -> JSONResponse:
                """Return authentication methods."""
                return JSONResponse(
                    content={"authMethods": [{"type": "http", "scheme": "bearer"}]}
                )

            app = mcp.streamable_http_app()
            app.routes.append(
                Route(
                    "/.well-known/mcp-auth",
                    endpoint=auth_discovery,
                    methods=["GET"],
                )
            )
            app.user_middleware.extend(
                [
                    Middleware(
                        CORSMiddleware,
                        allow_origins=["*"],
                        allow_credentials=True,
                        allow_methods=["GET", "POST", "OPTIONS"],
                        allow_headers=["*"],
                        expose_headers=["*"],
                    ),
                    Middleware(HTTPMiddleware),
                ]
            )
            uvicorn.run(
                app,
                host=args.host,
                port=args.port,
                log_level="debug" if args.debug else "info",
            )
        case _:
            logger.error(f"Unknown transport: {args.transport}")
            return


if __name__ == "__main__":
    main()
```
--------------------------------------------------------------------------------
/src/config_docs/ogcapi-features-3.yaml:
--------------------------------------------------------------------------------

```yaml
openapi: 3.0.3
info:
  title: "Building Blocks specified in OGC API - Features - Part 3: Filtering"
  description: |-
    Common components used in the
    [OGC standard "OGC API - Features - Part 3: Filtering"](https://docs.ogc.org/is/19-079r2/19-079r2.html).

    OGC API - Features - Part 3: Filtering 1.0 is an OGC Standard.
    Copyright (c) 2024 Open Geospatial Consortium.
    To obtain additional rights of use, visit https://www.ogc.org/legal/ .

    This document is also available in the
    [OGC Schema Repository](https://schemas.opengis.net/ogcapi/features/part3/1.0/openapi/ogcapi-features-3.yaml).
  version: '1.0.0'
  contact:
    name: Clemens Portele
    email: [email protected]
  license:
    name: OGC License
    url: 'https://www.ogc.org/legal/'
paths:
  /collections/{collectionId}/queryables:
    get:
      summary: Get the list of supported queryables for a collection
      description: |-
        This operation returns the list of supported queryables of a collection.
        Queryables are the properties that may be used to construct a filter
        expression on items in the collection. The response is a JSON Schema of
        an object where each property is a queryable.
      operationId: getQueryables
      parameters:
        - $ref: 'https://schemas.opengis.net/ogcapi/features/part1/1.0/openapi/parameters/collectionId.yaml'
      responses:
        '200':
          description: The queryable properties of the collection.
          content:
            application/schema+json:
              schema:
                type: object
  /functions:
    get:
      summary: Get the list of supported functions
      description: |-
        This operation returns the list of custom functions supported in CQL2 expressions.
      operationId: getFunctions
      responses:
        '200':
          description: The list of custom functions supported in CQL2 expressions.
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/functions'
components:
  parameters:
    filter:
      name: filter
      in: query
      required: false
      schema:
        type: string
      style: form
      explode: false
    filter-lang:
      name: filter-lang
      in: query
      required: false
      schema:
        type: string
        enum:
          - 'cql2-text'
          - 'cql2-json'
        default: 'cql2-text'
      style: form
    filter-crs:
      name: filter-crs
      in: query
      required: false
      schema:
        type: string
        format: uri-reference
      style: form
      explode: false
  schemas:
    functions:
      type: object
      required:
        - functions
      properties:
        functions:
          type: array
          items:
            type: object
            required:
              - name
              - returns
            properties:
              name:
                type: string
              description:
                type: string
              metadataUrl:
                type: string
                format: uri-reference
              arguments:
                type: array
                items:
                  type: object
                  required:
                    - type
                  properties:
                    title:
                      type: string
                    description:
                      type: string
                    type:
                      type: array
                      items:
                        type: string
                        enum:
                          - string
                          - number
                          - integer
                          - datetime
                          - geometry
                          - boolean
              returns:
                type: array
                items:
                  type: string
                  enum:
                    - string
                    - number
                    - integer
                    - datetime
                    - geometry
                    - boolean
```
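These Part 3 filtering blocks surface in the server as the `filter`/`filter_lang` arguments of `search_features`. A minimal sketch — the USRN value is illustrative and mirrors the prompt templates earlier in this dump:

```python
from mcp_service.protocols import FeatureService

async def streets_by_usrn(service: FeatureService) -> str:
    # CQL text filter, as in the usrn_breakdown template above.
    return await service.search_features(
        collection_id="trn-ntwk-street-1",
        filter="usrn='24501091'",
        filter_lang="cql-text",
    )
```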
--------------------------------------------------------------------------------
/src/middleware/http_middleware.py:
--------------------------------------------------------------------------------

```python
import os
import time
from collections import defaultdict, deque
from typing import List, Callable, Awaitable
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import Response, JSONResponse
from utils.logging_config import get_logger

logger = get_logger(__name__)


class RateLimiter:
    """HTTP-layer rate limiting"""

    def __init__(self, requests_per_minute: int = 10, window_seconds: int = 60):
        self.requests_per_minute = requests_per_minute
        self.window_seconds = window_seconds
        self.request_timestamps = defaultdict(lambda: deque())

    def check_rate_limit(self, client_id: str) -> bool:
        """Check if client has exceeded rate limit"""
        current_time = time.time()
        timestamps = self.request_timestamps[client_id]
        while timestamps and current_time - timestamps[0] >= self.window_seconds:
            timestamps.popleft()
        if len(timestamps) >= self.requests_per_minute:
            logger.warning(f"HTTP rate limit exceeded for client {client_id}")
            return False
        timestamps.append(current_time)
        return True


def get_valid_bearer_tokens() -> List[str]:
    """Get valid bearer tokens from environment variable."""
    try:
        tokens = os.environ.get("BEARER_TOKENS", "").split(",")
        valid_tokens = [t.strip() for t in tokens if t.strip()]
        if not valid_tokens:
            logger.warning(
                "No BEARER_TOKENS configured, all authentication will be rejected"
            )
            return []
        return valid_tokens
    except Exception as e:
        logger.error(f"Error getting valid tokens: {e}")
        return []


async def verify_bearer_token(token: str) -> bool:
    """Verify bearer token is valid."""
    try:
        valid_tokens = get_valid_bearer_tokens()
        if not valid_tokens or not token:
            return False
        return token in valid_tokens
    except Exception as e:
        logger.error(f"Error validating token: {e}")
        return False


class HTTPMiddleware(BaseHTTPMiddleware):
    def __init__(self, app, requests_per_minute: int = 10):
        super().__init__(app)
        self.rate_limiter = RateLimiter(requests_per_minute=requests_per_minute)

    async def dispatch(
        self, request: Request, call_next: Callable[[Request], Awaitable[Response]]
    ) -> Response:
        if request.url.path == "/.well-known/mcp-auth" or request.method == "OPTIONS":
            return await call_next(request)

        session_id = request.headers.get("mcp-session-id")
        if not session_id:
            client_ip = request.client.host if request.client else "unknown"
            session_id = f"ip-{client_ip}"

        if not self.rate_limiter.check_rate_limit(session_id):
            return JSONResponse(
                status_code=429,
                content={"detail": "Too many requests. Please try again later."},
                headers={"Retry-After": "60"},
            )

        origin = request.headers.get("origin", "")
        if origin and not self._is_valid_origin(origin, request):
            client_ip = request.client.host if request.client else "unknown"
            logger.warning(
                f"Blocked request with suspicious origin from {client_ip}, Origin: {origin}"
            )
            return JSONResponse(status_code=403, content={"detail": "Invalid origin"})

        user_agent = request.headers.get("user-agent", "")
        if self._is_browser_plugin(user_agent, request):
            client_ip = request.client.host if request.client else "unknown"
            logger.warning(
                f"Blocked browser plugin access from {client_ip}, User-Agent: {user_agent}"
            )
            return JSONResponse(
                status_code=403,
                content={"detail": "Browser plugin access is not allowed"},
            )

        auth_header = request.headers.get("Authorization")
        if auth_header and auth_header.startswith("Bearer "):
            token = auth_header.replace("Bearer ", "")
            if await verify_bearer_token(token):
                request.state.token = token
                return await call_next(request)
            else:
                logger.warning(
                    f"Invalid bearer token attempt from {request.client.host if request.client else 'unknown'}"
                )
        else:
            logger.warning(
                f"Missing or invalid Authorization header from {request.client.host if request.client else 'unknown'}"
            )
        return JSONResponse(
            status_code=401,
            content={"detail": "Authentication required"},
            headers={"WWW-Authenticate": "Bearer"},
        )

    def _is_valid_origin(self, origin: str, request: Request) -> bool:
        """Validate Origin header to prevent DNS rebinding attacks."""
        valid_local_origins = ["http://localhost:", "http://127.0.0.1:"]
        valid_domains = os.environ.get("ALLOWED_ORIGINS", "").split(",")
        valid_domains = [d.strip() for d in valid_domains if d.strip()]
        for valid_origin in valid_local_origins:
            if origin.startswith(valid_origin):
                return True
        for domain in valid_domains:
            if (
                origin == domain
                or origin.startswith(f"https://{domain}")
                or origin.startswith(f"http://{domain}")
            ):
                return True
        return False

    def _is_browser_plugin(self, user_agent: str, request: Request) -> bool:
        """Check if request is from a browser plugin."""
        plugin_patterns = [
            "Chrome-Extension",
            "Mozilla/5.0 (compatible; Extension)",
            "Browser-Extension",
        ]
        for pattern in plugin_patterns:
            if pattern.lower() in user_agent.lower():
                return True
        origin = request.headers.get("origin", "")
        if origin and (
            origin.startswith("chrome-extension://")
            or origin.startswith("moz-extension://")
            or origin.startswith("safari-extension://")
        ):
            return True
        return False
```
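`BEARER_TOKENS` is a comma-separated allow-list, and a client must present one of those tokens on every request — exactly what `http_client_test.py` below does. A minimal sketch:

```python
import os

# Server side: configure the allow-list (normally via .env.local / the environment).
os.environ["BEARER_TOKENS"] = "dev-token,another-token"

# Client side: present one of the configured tokens.
headers = {"Authorization": "Bearer dev-token"}
```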
--------------------------------------------------------------------------------
/src/stdio_client_test.py:
--------------------------------------------------------------------------------

```python
import asyncio
import subprocess
import sys
import os
import time
from mcp import ClientSession
from mcp.client.stdio import stdio_client
from mcp.client.stdio import StdioServerParameters
from mcp.types import TextContent


def extract_text_from_result(result) -> str:
    """Safely extract text from MCP tool result"""
    if not result.content:
        return "No content"
    for item in result.content:
        if isinstance(item, TextContent):
            return item.text
    content_types = [type(item).__name__ for item in result.content]
    return f"Non-text content: {', '.join(content_types)}"


async def test_stdio_rate_limiting():
    """Test STDIO rate limiting - should block after 1 request per minute"""
    env = os.environ.copy()
    env["STDIO_KEY"] = "test-stdio-key"
    env["OS_API_KEY"] = os.environ.get("OS_API_KEY", "dummy-key-for-testing")
    env["PYTHONPATH"] = "src"

    print("Starting STDIO server subprocess...")
    server_process = subprocess.Popen(
        [sys.executable, "src/server.py", "--transport", "stdio", "--debug"],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        env=env,
        text=True,
        bufsize=0,
    )

    try:
        print("Connecting to STDIO server...")
        server_params = StdioServerParameters(
            command=sys.executable,
            args=["src/server.py", "--transport", "stdio", "--debug"],
            env=env,
        )
        async with stdio_client(server_params) as (read_stream, write_stream):
            async with ClientSession(read_stream, write_stream) as session:
                print("Initializing session...")
                await session.initialize()

                print("Listing available tools...")
                tools = await session.list_tools()
                print(f" Found {len(tools.tools)} tools")

                print("\nRAPID FIRE TEST (should hit rate limit)")
                print("-" * 50)
                results = []
                for i in range(3):
                    try:
                        start_time = time.time()
                        print(f"Request #{i + 1}: ", end="", flush=True)
                        result = await session.call_tool(
                            "hello_world", {"name": f"TestUser{i + 1}"}
                        )
                        elapsed = time.time() - start_time
                        result_text = extract_text_from_result(result)
                        if "rate limit" in result_text.lower() or "429" in result_text:
                            print(f"BLOCKED ({elapsed:.1f}s) - {result_text}")
                            results.append(("BLOCKED", elapsed, result_text))
                        else:
                            print(f"SUCCESS ({elapsed:.1f}s) - {result_text}")
                            results.append(("SUCCESS", elapsed, result_text))
                    except Exception as e:
                        elapsed = time.time() - start_time
                        print(f"ERROR ({elapsed:.1f}s) - {e}")
                        results.append(("ERROR", elapsed, str(e)))
                    if i < 2:
                        await asyncio.sleep(0.1)

                print("\nRESULTS ANALYSIS")
                print("-" * 50)
                success_count = sum(1 for r in results if r[0] == "SUCCESS")
                blocked_count = sum(1 for r in results if r[0] == "BLOCKED")
                error_count = sum(1 for r in results if r[0] == "ERROR")
                print(f"Successful requests: {success_count}")
                print(f"Rate limited requests: {blocked_count}")
                print(f"Error requests: {error_count}")

                print("\nVISUAL TIMELINE:")
                timeline = ""
                for i, (status, elapsed, _) in enumerate(results):
                    if status == "SUCCESS":
                        timeline += "OK"
                    elif status == "BLOCKED":
                        timeline += "XX"
                    else:
                        timeline += "ER"
                    if i < len(results) - 1:
                        timeline += "--"
                print(f"  {timeline}")
                print("  Request:  1   2   3")

                print("\nTEST VERDICT")
                print("-" * 50)
                if success_count == 1 and blocked_count >= 1:
                    print("PASS: Rate limiting works correctly!")
                    print("  First request succeeded")
                    print("  Subsequent requests blocked")
                elif success_count > 1:
                    print("FAIL: Rate limiting too permissive!")
                    print(f"  {success_count} requests succeeded (expected 1)")
                else:
                    print("FAIL: No requests succeeded!")
                    print("  Check authentication or server issues")

                print("\nWAITING FOR RATE LIMIT RESET...")
                print("  (Testing if limit resets after window)")
                print("  Waiting 10 seconds...")
                for countdown in range(10, 0, -1):
                    print(f"  {countdown}s remaining...", end="\r", flush=True)
                    await asyncio.sleep(1)

                print("\nTesting after rate limit window...")
                try:
                    result = await session.call_tool(
                        "hello_world", {"name": "AfterWait"}
                    )
                    result_text = extract_text_from_result(result)
                    if "rate limit" not in result_text.lower():
                        print("SUCCESS: Rate limit properly reset!")
                    else:
                        print("Rate limit still active (might need longer wait)")
                except Exception as e:
                    print(f"Error after wait: {e}")
    except Exception as e:
        print(f"Test failed with error: {e}")
    finally:
        print("\nCleaning up server process...")
        server_process.terminate()
        try:
            await asyncio.wait_for(
                asyncio.create_task(asyncio.to_thread(server_process.wait)), timeout=5.0
            )
            print("Server terminated gracefully")
        except asyncio.TimeoutError:
            print("Force killing server...")
            server_process.kill()
            server_process.wait()


if __name__ == "__main__":
    print("STDIO Rate Limit Test Suite")
    print("Testing if STDIO middleware properly blocks > 1 request/minute")
    print()
    if not os.environ.get("OS_API_KEY"):
        print("Warning: OS_API_KEY not set, using dummy key for testing")
        print("  (This is OK for rate limit testing)")
        print()
    asyncio.run(test_stdio_rate_limiting())
```
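The sliding-window logic this test exercises can also be sanity-checked directly, without a subprocess; a minimal sketch:

```python
from middleware.stdio_middleware import StdioRateLimiter

limiter = StdioRateLimiter(requests_per_minute=1, window_seconds=60)
assert limiter.check_rate_limit() is True   # first request passes
assert limiter.check_rate_limit() is False  # second within the window is blocked
```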
RESET...") print(" (Testing if limit resets after window)") print(" Waiting 10 seconds...") for countdown in range(10, 0, -1): print(f" {countdown}s remaining...", end="\r", flush=True) await asyncio.sleep(1) print("\nTesting after rate limit window...") try: result = await session.call_tool( "hello_world", {"name": "AfterWait"} ) result_text = extract_text_from_result(result) if "rate limit" not in result_text.lower(): print("SUCCESS: Rate limit properly reset!") else: print("Rate limit still active (might need longer wait)") except Exception as e: print(f"Error after wait: {e}") except Exception as e: print(f"Test failed with error: {e}") finally: print("\nCleaning up server process...") server_process.terminate() try: await asyncio.wait_for( asyncio.create_task(asyncio.to_thread(server_process.wait)), timeout=5.0 ) print("Server terminated gracefully") except asyncio.TimeoutError: print("Force killing server...") server_process.kill() server_process.wait() if __name__ == "__main__": print("STDIO Rate Limit Test Suite") print("Testing if STDIO middleware properly blocks > 1 request/minute") print() if not os.environ.get("OS_API_KEY"): print("Warning: OS_API_KEY not set, using dummy key for testing") print(" (This is OK for rate limit testing)") print() asyncio.run(test_stdio_rate_limiting()) ``` -------------------------------------------------------------------------------- /src/http_client_test.py: -------------------------------------------------------------------------------- ```python import asyncio import logging from mcp.client.streamable_http import streamablehttp_client from mcp import ClientSession from mcp.types import TextContent logging.basicConfig(level=logging.WARNING) logger = logging.getLogger(__name__) def extract_text_from_result(result) -> str: """Safely extract text from MCP tool result""" if not result.content: return "No content" for item in result.content: if isinstance(item, TextContent): return item.text content_types = [type(item).__name__ for item in result.content] return f"Non-text content: {', '.join(content_types)}" async def test_usrn_search(session_name: str, usrn_values: list): """Test USRN searches with different values""" headers = {"Authorization": "Bearer dev-token"} results = [] session_id = None print(f"\n{session_name} - Testing USRN searches") print("-" * 40) try: async with streamablehttp_client( "http://127.0.0.1:8000/mcp", headers=headers ) as ( read_stream, write_stream, get_session_id, ): async with ClientSession(read_stream, write_stream) as session: await session.initialize() session_id = get_session_id() print(f"{session_name} Session ID: {session_id}") for i, usrn in enumerate(usrn_values): try: result = await session.call_tool( "search_features", { "collection_id": "trn-ntwk-street-1", "query_attr": "usrn", "query_attr_value": str(usrn), "limit": 5, }, ) result_text = extract_text_from_result(result) print(f" USRN {usrn}: SUCCESS - Found data") print(result_text) results.append(("SUCCESS", usrn, result_text[:100] + "...")) await asyncio.sleep(0.1) except Exception as e: if "429" in str(e) or "Too Many Requests" in str(e): print(f" USRN {usrn}: BLOCKED - Rate limited") results.append(("BLOCKED", usrn, str(e))) else: print(f" USRN {usrn}: ERROR - {e}") results.append(("ERROR", usrn, str(e))) except Exception as e: print(f"{session_name}: Connection error - {str(e)[:100]}") if len(results) == 0: results = [("ERROR", "connection", str(e))] * len(usrn_values) return results, session_id async def test_usrn_calls(): """Test calling USRN 
searches with different values""" print("USRN Search Test - Two Different USRNs") print("=" * 50) # Different USRN values to test usrn_values_1 = ["24501091", "24502114"] usrn_values_2 = ["24502114", "24501091"] results_a, session_id_a = await test_usrn_search("SESSION-A", usrn_values_1) await asyncio.sleep(0.5) results_b, session_id_b = await test_usrn_search("SESSION-B", usrn_values_2) # Analyze results print("\nRESULTS SUMMARY") print("=" * 30) success_a = len([r for r in results_a if r[0] == "SUCCESS"]) blocked_a = len([r for r in results_a if r[0] == "BLOCKED"]) error_a = len([r for r in results_a if r[0] == "ERROR"]) success_b = len([r for r in results_b if r[0] == "SUCCESS"]) blocked_b = len([r for r in results_b if r[0] == "BLOCKED"]) error_b = len([r for r in results_b if r[0] == "ERROR"]) print(f"SESSION-A: {success_a} success, {blocked_a} blocked, {error_a} errors") print(f"SESSION-B: {success_b} success, {blocked_b} blocked, {error_b} errors") print(f"Total: {success_a + success_b} success, {blocked_a + blocked_b} blocked") if session_id_a and session_id_b: print(f"Different session IDs: {session_id_a != session_id_b}") else: print("Could not compare session IDs") # Show detailed results print("\nDETAILED RESULTS:") print("SESSION-A:") for status, usrn, details in results_a: print(f" USRN {usrn}: {status}") print("SESSION-B:") for status, usrn, details in results_b: print(f" USRN {usrn}: {status}") async def test_session_safe(session_name: str, requests: int = 3): """Test a single session with safer error handling""" headers = {"Authorization": "Bearer dev-token"} results = [] session_id = None print(f"\n{session_name} - Testing {requests} requests") print("-" * 40) try: async with streamablehttp_client( "http://127.0.0.1:8000/mcp", headers=headers ) as ( read_stream, write_stream, get_session_id, ): async with ClientSession(read_stream, write_stream) as session: await session.initialize() session_id = get_session_id() print(f"{session_name} Session ID: {session_id}") for i in range(requests): try: result = await session.call_tool( "hello_world", {"name": f"{session_name}-User{i + 1}"} ) result_text = extract_text_from_result(result) print(f" Request {i + 1}: SUCCESS - {result_text}") results.append("SUCCESS") await asyncio.sleep(0.1) except Exception as e: if "429" in str(e) or "Too Many Requests" in str(e): print(f" Request {i + 1}: BLOCKED - Rate limited") results.append("BLOCKED") else: print(f" Request {i + 1}: ERROR - {e}") results.append("ERROR") for j in range(i + 1, requests): print(f" Request {j + 1}: BLOCKED - Session rate limited") results.append("BLOCKED") break except Exception as e: print(f"{session_name}: Connection error - {str(e)[:100]}") if len(results) == 0: results = ["ERROR"] * requests elif len(results) < requests: remaining = requests - len(results) results.extend(["BLOCKED"] * remaining) return results, session_id async def test_two_sessions(): """Test rate limiting across two sessions""" print("HTTP Rate Limit Test - Two Sessions") print("=" * 50) results_a, session_id_a = await test_session_safe("SESSION-A", 20) await asyncio.sleep(0.5) results_b, session_id_b = await test_session_safe("SESSION-B", 20) # Analyze results print("\nRESULTS SUMMARY") print("=" * 30) success_a = results_a.count("SUCCESS") blocked_a = results_a.count("BLOCKED") error_a = results_a.count("ERROR") success_b = results_b.count("SUCCESS") blocked_b = results_b.count("BLOCKED") error_b = results_b.count("ERROR") print(f"SESSION-A: {success_a} success, {blocked_a} blocked, 
{error_a} errors") print(f"SESSION-B: {success_b} success, {blocked_b} blocked, {error_b} errors") print(f"Total: {success_a + success_b} success, {blocked_a + blocked_b} blocked") if session_id_a and session_id_b: print(f"Different session IDs: {session_id_a != session_id_b}") else: print("Could not compare session IDs") total_success = success_a + success_b total_blocked = blocked_a + blocked_b print("\nRATE LIMITING ASSESSMENT:") if total_success >= 2 and total_blocked >= 2: print("✅ PASS: Rate limiting is working") print(f" - {total_success} requests succeeded") print(f" - {total_blocked} requests were rate limited") print(" - Each session got limited after ~2 requests") elif total_success == 0: print("❌ FAIL: No requests succeeded (check server/auth)") else: print("⚠️ UNCLEAR: Unexpected pattern") print(" Check server logs for actual behavior") if __name__ == "__main__": # Run the USRN test by default asyncio.run(test_usrn_calls()) # Uncomment the line below to run the original test instead # asyncio.run(test_two_sessions()) ``` -------------------------------------------------------------------------------- /src/mcp_service/routing_service.py: -------------------------------------------------------------------------------- ```python from typing import Dict, List, Set, Optional, Any from dataclasses import dataclass from utils.logging_config import get_logger logger = get_logger(__name__) @dataclass class RouteNode: """Represents a routing node (intersection)""" id: int node_identifier: str connected_edges: Set[int] @dataclass class RouteEdge: """Represents a routing edge (road segment)""" id: int road_id: str road_name: Optional[str] source_node_id: int target_node_id: int cost: float reverse_cost: float geometry: Optional[Dict[str, Any]] class InMemoryRoutingNetwork: """In-memory routing network built from OS NGD data""" def __init__(self): self.nodes: Dict[int, RouteNode] = {} self.edges: Dict[int, RouteEdge] = {} self.node_lookup: Dict[str, int] = {} self.is_built = False def add_node(self, node_identifier: str) -> int: """Add a node and return its internal ID""" if node_identifier in self.node_lookup: return self.node_lookup[node_identifier] node_id = len(self.nodes) + 1 self.nodes[node_id] = RouteNode( id=node_id, node_identifier=node_identifier, connected_edges=set(), ) self.node_lookup[node_identifier] = node_id return node_id def add_edge(self, road_data: Dict[str, Any]) -> None: """Add a road link as an edge""" properties = road_data.get("properties", {}) start_node = properties.get("startnode", "") end_node = properties.get("endnode", "") if not start_node or not end_node: logger.warning(f"Road link {properties.get('id')} missing node data") return source_id = self.add_node(start_node) target_id = self.add_node(end_node) roadlink_id = None road_track_refs = properties.get("roadtrackorpathreference", []) if road_track_refs and len(road_track_refs) > 0: roadlink_id = road_track_refs[0].get("roadlinkid", "") edge_id = len(self.edges) + 1 cost = properties.get("geometry_length", 100.0) edge = RouteEdge( id=edge_id, road_id=roadlink_id or "NONE", road_name=properties.get("name1_text"), source_node_id=source_id, target_node_id=target_id, cost=cost, reverse_cost=cost, geometry=road_data.get("geometry"), ) self.edges[edge_id] = edge self.nodes[source_id].connected_edges.add(edge_id) self.nodes[target_id].connected_edges.add(edge_id) def get_connected_edges(self, node_id: int) -> List[RouteEdge]: """Get all edges connected to a node""" if node_id not in self.nodes: return [] return 
[self.edges[edge_id] for edge_id in self.nodes[node_id].connected_edges] def get_summary(self) -> Dict[str, Any]: """Get network summary statistics""" return { "total_nodes": len(self.nodes), "total_edges": len(self.edges), "is_built": self.is_built, "sample_nodes": [ { "id": node.id, "identifier": node.node_identifier, "connected_edges": len(node.connected_edges), } for node in list(self.nodes.values())[:5] ], } def get_all_nodes(self) -> List[Dict[str, Any]]: """Get all nodes as a flat list""" return [ { "id": node.id, "node_identifier": node.node_identifier, "connected_edge_count": len(node.connected_edges), "connected_edge_ids": list(node.connected_edges), } for node in self.nodes.values() ] def get_all_edges(self) -> List[Dict[str, Any]]: """Get all edges as a flat list""" return [ { "id": edge.id, "road_id": edge.road_id, "road_name": edge.road_name, "source_node_id": edge.source_node_id, "target_node_id": edge.target_node_id, "cost": edge.cost, "reverse_cost": edge.reverse_cost, "geometry": edge.geometry, } for edge in self.edges.values() ] class OSRoutingService: """Service to build and query routing networks from OS NGD data""" def __init__(self, api_client): self.api_client = api_client self.network = InMemoryRoutingNetwork() self.raw_restrictions: List[Dict[str, Any]] = [] async def _fetch_restriction_data( self, bbox: Optional[str] = None, limit: int = 100 ) -> List[Dict[str, Any]]: """Fetch raw restriction data""" try: logger.debug("Fetching restriction data...") params = { "limit": min(limit, 100), "crs": "http://www.opengis.net/def/crs/EPSG/0/4326", } if bbox: params["bbox"] = bbox restriction_data = await self.api_client.make_request( "COLLECTION_FEATURES", params=params, path_params=["trn-rami-restriction-1"], ) features = restriction_data.get("features", []) logger.debug(f"Fetched {len(features)} restriction features") return features except Exception as e: logger.error(f"Error fetching restriction data: {e}") return [] async def build_routing_network( self, bbox: Optional[str] = None, limit: int = 1000, include_restrictions: bool = True, ) -> Dict[str, Any]: """Build the routing network from OS NGD road links with optional restriction data""" try: logger.debug("Building routing network from OS NGD data...") if include_restrictions: self.raw_restrictions = await self._fetch_restriction_data(bbox, limit) params = { "limit": min(limit, 100), "crs": "http://www.opengis.net/def/crs/EPSG/0/4326", } if bbox: params["bbox"] = bbox road_links_data = await self.api_client.make_request( "COLLECTION_FEATURES", params=params, path_params=["trn-ntwk-roadlink-4"], ) features = road_links_data.get("features", []) logger.debug(f"Processing {len(features)} road links...") for feature in features: self.network.add_edge(feature) self.network.is_built = True summary = self.network.get_summary() logger.debug( f"Network built: {summary['total_nodes']} nodes, {summary['total_edges']} edges" ) return { "status": "success", "message": f"Built routing network with {summary['total_nodes']} nodes and {summary['total_edges']} edges", "network_summary": summary, "restrictions": self.raw_restrictions if include_restrictions else [], "restriction_count": len(self.raw_restrictions) if include_restrictions else 0, } except Exception as e: logger.error(f"Error building routing network: {e}") return {"status": "error", "error": str(e)} def get_network_info(self) -> Dict[str, Any]: """Get current network information""" return {"status": "success", "network": self.network.get_summary()} def 
get_flat_nodes(self) -> Dict[str, Any]: """Get flat list of all nodes""" if not self.network.is_built: return { "status": "error", "error": "Routing network not built. Call build_routing_network first.", } return {"status": "success", "nodes": self.network.get_all_nodes()} def get_flat_edges(self) -> Dict[str, Any]: """Get flat list of all edges with connections""" if not self.network.is_built: return { "status": "error", "error": "Routing network not built. Call build_routing_network first.", } return {"status": "success", "edges": self.network.get_all_edges()} def get_routing_tables(self) -> Dict[str, Any]: """Get both nodes and edges as flat tables""" if not self.network.is_built: return { "status": "error", "error": "Routing network not built. Call build_routing_network first.", } return { "status": "success", "nodes": self.network.get_all_nodes(), "edges": self.network.get_all_edges(), "summary": self.network.get_summary(), "restrictions": self.raw_restrictions, } ``` -------------------------------------------------------------------------------- /src/api_service/os_api.py: -------------------------------------------------------------------------------- ```python import os import aiohttp import asyncio import re import concurrent.futures import threading from typing import Dict, List, Any, Optional from models import ( NGDAPIEndpoint, OpenAPISpecification, Collection, CollectionsCache, CollectionQueryables, ) from api_service.protocols import APIClient from utils.logging_config import get_logger logger = get_logger(__name__) class OSAPIClient(APIClient): """Implementation of an OS API client""" user_agent = "os-ngd-mcp-server/1.0" def __init__(self, api_key: Optional[str] = None): """ Initialise the OS API client Args: api_key: Optional API key, if not provided will use OS_API_KEY env var """ self.api_key = api_key self.session = None self.last_request_time = 0 # TODO: This is because there seems to be some rate limiting in place - TBC if this is the case self.request_delay = 0.7 self._cached_openapi_spec: Optional[OpenAPISpecification] = None self._cached_collections: Optional[CollectionsCache] = None # Private helper methods def _sanitise_api_key(self, text: Any) -> str: """Remove API keys from any text (URLs, error messages, etc.)""" if not isinstance(text, str): return text patterns = [ r"[?&]key=[^&\s]*", r"[?&]api_key=[^&\s]*", r"[?&]apikey=[^&\s]*", r"[?&]token=[^&\s]*", ] sanitized = text for pattern in patterns: sanitized = re.sub(pattern, "", sanitized, flags=re.IGNORECASE) sanitized = re.sub(r"[?&]$", "", sanitized) sanitized = re.sub(r"&{2,}", "&", sanitized) sanitized = re.sub(r"\?&", "?", sanitized) return sanitized def _sanitise_response(self, data: Any) -> Any: """Remove API keys from response data recursively""" if isinstance(data, dict): sanitized_dict = {} for key, value in data.items(): if isinstance(value, str) and any( url_indicator in key.lower() for url_indicator in ["href", "url", "link", "uri"] ): sanitized_dict[key] = self._sanitise_api_key(value) elif isinstance(value, (dict, list)): sanitized_dict[key] = self._sanitise_response(value) else: sanitized_dict[key] = value return sanitized_dict elif isinstance(data, list): return [self._sanitise_response(item) for item in data] elif isinstance(data, str): if any( indicator in data for indicator in [ "http://", "https://", "key=", "api_key=", "apikey=", "token=", ] ): return self._sanitise_api_key(data) return data def _filter_latest_collections( self, collections: List[Dict[str, Any]] ) -> List[Collection]: """
Filter collections to keep only the latest version of each collection type. For collections with IDs like 'trn-ntwk-roadlink-1', 'trn-ntwk-roadlink-2', 'trn-ntwk-roadlink-3', only keep the one with the highest number. Args: collections: Raw collections from API Returns: Filtered list of Collection objects """ latest_versions: Dict[str, Dict[str, Any]] = {} for col in collections: col_id = col.get("id", "") match = re.match(r"^(.+?)-(\d+)$", col_id) if match: base_name = match.group(1) version_num = int(match.group(2)) if ( base_name not in latest_versions or version_num > latest_versions[base_name]["version"] ): latest_versions[base_name] = {"version": version_num, "data": col} else: latest_versions[col_id] = {"version": 0, "data": col} filtered_collections = [] for item in latest_versions.values(): col_data = item["data"] filtered_collections.append( Collection( id=col_data.get("id", ""), title=col_data.get("title", ""), description=col_data.get("description", ""), links=col_data.get("links", []), extent=col_data.get("extent", {}), itemType=col_data.get("itemType", "feature"), ) ) return filtered_collections def _parse_openapi_spec_for_llm( self, spec_data: dict, collection_ids: List[str] ) -> dict: """Parse OpenAPI spec to extract only essential information for LLM context""" supported_crs = { "input": [], "output": [], "default": "http://www.opengis.net/def/crs/OGC/1.3/CRS84", } parsed = { "title": spec_data.get("info", {}).get("title", ""), "version": spec_data.get("info", {}).get("version", ""), "base_url": spec_data.get("servers", [{}])[0].get("url", ""), "endpoints": {}, "collection_ids": collection_ids, "supported_crs": supported_crs, } paths = spec_data.get("paths", {}) for path, methods in paths.items(): for method, details in methods.items(): if method == "get" and "parameters" in details: for param in details["parameters"]: param_name = param.get("name", "") if param_name == "collectionId" and "schema" in param: enum_values = param["schema"].get("enum", []) if enum_values: parsed["collection_ids"] = enum_values elif ( param_name in ["bbox-crs", "filter-crs"] and "schema" in param ): crs_values = param["schema"].get("enum", []) if crs_values and not supported_crs["input"]: supported_crs["input"] = crs_values elif param_name == "crs" and "schema" in param: crs_values = param["schema"].get("enum", []) if crs_values and not supported_crs["output"]: supported_crs["output"] = crs_values endpoint_patterns = { "/collections": "List all collections", "/collections/{collectionId}": "Get collection info", "/collections/{collectionId}/schema": "Get collection schema", "/collections/{collectionId}/queryables": "Get collection queryables", "/collections/{collectionId}/items": "Search features in collection", "/collections/{collectionId}/items/{featureId}": "Get specific feature", } parsed["endpoints"] = endpoint_patterns parsed["crs_guide"] = { "WGS84": "http://www.opengis.net/def/crs/OGC/1.3/CRS84 (default, longitude/latitude)", "British_National_Grid": "http://www.opengis.net/def/crs/EPSG/0/27700 (UK Ordnance Survey)", "WGS84_latlon": "http://www.opengis.net/def/crs/EPSG/0/4326 (latitude/longitude)", "Web_Mercator": "http://www.opengis.net/def/crs/EPSG/0/3857 (Web mapping)", } return parsed # Private async methods async def _get_open_api_spec(self) -> OpenAPISpecification: """Get the OpenAPI spec for the OS NGD API""" try: response = await self.make_request("OPENAPI_SPEC", params={"f": "json"}) # Sanitize the raw response before processing sanitized_response = 
self._sanitise_response(response) collections_cache = await self.cache_collections() filtered_collection_ids = [col.id for col in collections_cache.collections] parsed_spec = self._parse_openapi_spec_for_llm( sanitized_response, filtered_collection_ids ) spec = OpenAPISpecification( title=parsed_spec["title"], version=parsed_spec["version"], base_url=parsed_spec["base_url"], endpoints=parsed_spec["endpoints"], collection_ids=filtered_collection_ids, supported_crs=parsed_spec["supported_crs"], crs_guide=parsed_spec["crs_guide"], ) return spec except Exception as e: raise ValueError(f"Failed to get OpenAPI spec: {e}") async def cache_openapi_spec(self) -> OpenAPISpecification: """ Cache the OpenAPI spec. Returns: The cached OpenAPI spec """ if self._cached_openapi_spec is None: logger.debug("Caching OpenAPI spec for LLM context...") try: self._cached_openapi_spec = await self._get_open_api_spec() logger.debug("OpenAPI spec successfully cached") except Exception as e: raise ValueError(f"Failed to cache OpenAPI spec: {e}") return self._cached_openapi_spec async def _get_collections(self) -> CollectionsCache: """Get all collections from the OS NGD API""" try: response = await self.make_request("COLLECTIONS") collections_list = response.get("collections", []) filtered = self._filter_latest_collections(collections_list) logger.debug(f"Filtered collections: {len(filtered)} collections") return CollectionsCache(collections=filtered, raw_response=response) except Exception as e: sanitized_error = self._sanitise_api_key(str(e)) logger.error(f"Error getting collections: {sanitized_error}") raise ValueError(f"Failed to get collections: {sanitized_error}") async def cache_collections(self) -> CollectionsCache: """ Cache the collections data with filtering applied. 
Returns: The cached collections """ if self._cached_collections is None: logger.debug("Caching collections for LLM context...") try: self._cached_collections = await self._get_collections() logger.debug( f"Collections successfully cached - {len(self._cached_collections.collections)} collections after filtering" ) except Exception as e: sanitized_error = self._sanitise_api_key(str(e)) raise ValueError(f"Failed to cache collections: {sanitized_error}") return self._cached_collections async def fetch_collections_queryables( self, collection_ids: List[str] ) -> Dict[str, CollectionQueryables]: """Fetch detailed queryables for specific collections only""" if not collection_ids: return {} logger.debug(f"Fetching queryables for specific collections: {collection_ids}") collections_cache = await self.cache_collections() collections_map = {coll.id: coll for coll in collections_cache.collections} tasks = [ self.make_request("COLLECTION_QUERYABLES", path_params=[collection_id]) for collection_id in collection_ids if collection_id in collections_map ] if not tasks: return {} raw_queryables = await asyncio.gather(*tasks, return_exceptions=True) def process_single_collection_queryables(collection_id, queryables_data): collection = collections_map[collection_id] logger.debug( f"Processing collection {collection.id} in thread {threading.current_thread().name}" ) if isinstance(queryables_data, Exception): logger.warning( f"Failed to fetch queryables for {collection.id}: {queryables_data}" ) return ( collection.id, CollectionQueryables( id=collection.id, title=collection.title, description=collection.description, all_queryables={}, enum_queryables={}, has_enum_filters=False, total_queryables=0, enum_count=0, ), ) all_queryables = {} enum_queryables = {} properties = queryables_data.get("properties", {}) for prop_name, prop_details in properties.items(): prop_type = prop_details.get("type", ["string"]) if isinstance(prop_type, list): main_type = prop_type[0] if prop_type else "string" is_nullable = "null" in prop_type else: main_type = prop_type is_nullable = False all_queryables[prop_name] = { "type": main_type, "nullable": is_nullable, "max_length": prop_details.get("maxLength"), "format": prop_details.get("format"), "pattern": prop_details.get("pattern"), "minimum": prop_details.get("minimum"), "maximum": prop_details.get("maximum"), "is_enum": prop_details.get("enumeration", False), } if prop_details.get("enumeration") and "enum" in prop_details: enum_queryables[prop_name] = { "values": prop_details["enum"], "type": main_type, "nullable": is_nullable, "max_length": prop_details.get("maxLength"), } all_queryables[prop_name]["enum_values"] = prop_details["enum"] all_queryables[prop_name] = { k: v for k, v in all_queryables[prop_name].items() if v is not None } return ( collection.id, CollectionQueryables( id=collection.id, title=collection.title, description=collection.description, all_queryables=all_queryables, enum_queryables=enum_queryables, has_enum_filters=len(enum_queryables) > 0, total_queryables=len(all_queryables), enum_count=len(enum_queryables), ), ) with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: collection_data_pairs = list(zip(collection_ids, raw_queryables)) processed = await asyncio.get_event_loop().run_in_executor( executor, lambda: [ process_single_collection_queryables(coll_id, data) for coll_id, data in collection_data_pairs if coll_id in collections_map ], ) return {coll_id: queryables for coll_id, queryables in processed} # Public async methods async def 
initialise(self): """Initialise the aiohttp session if not already created""" if self.session is None: self.session = aiohttp.ClientSession( connector=aiohttp.TCPConnector( force_close=True, limit=1, # TODO: Strict limit to only 1 connection - may need to revisit this ) ) async def close(self): """Close the session when done""" if self.session: await self.session.close() self.session = None self._cached_openapi_spec = None self._cached_collections = None async def get_api_key(self) -> str: """Get the OS API key from environment variable or init param.""" if self.api_key: return self.api_key api_key = os.environ.get("OS_API_KEY") if not api_key: raise ValueError("OS_API_KEY environment variable is not set") return api_key async def make_request( self, endpoint: str, params: Optional[Dict[str, Any]] = None, path_params: Optional[List[str]] = None, max_retries: int = 2, ) -> Dict[str, Any]: """ Make a request to the OS NGD API with proper error handling. Args: endpoint: Enum endpoint to use params: Additional query parameters path_params: Parameters to format into the URL path max_retries: Maximum number of retries for transient errors Returns: JSON response as dictionary """ await self.initialise() if self.session is None: raise ValueError("Session not initialised") current_time = asyncio.get_event_loop().time() elapsed = current_time - self.last_request_time if elapsed < self.request_delay: await asyncio.sleep(self.request_delay - elapsed) try: endpoint_value = NGDAPIEndpoint[endpoint].value except KeyError: raise ValueError(f"Invalid endpoint: {endpoint}") if path_params: endpoint_value = endpoint_value.format(*path_params) api_key = await self.get_api_key() request_params = params or {} request_params["key"] = api_key headers = {"User-Agent": self.user_agent, "Accept": "application/json"} client_ip = getattr(self.session, "_source_address", None) client_info = f" from {client_ip}" if client_ip else "" sanitized_url = self._sanitise_api_key(endpoint_value) logger.info(f"Requesting URL: {sanitized_url}{client_info}") for attempt in range(1, max_retries + 1): try: self.last_request_time = asyncio.get_event_loop().time() timeout = aiohttp.ClientTimeout(total=30.0) async with self.session.get( endpoint_value, params=request_params, headers=headers, timeout=timeout, ) as response: if response.status >= 400: error_text = await response.text() sanitized_error = self._sanitise_api_key(error_text) error_message = ( f"HTTP Error: {response.status} - {sanitized_error}" ) logger.error(f"Error: {error_message}") raise ValueError(error_message) response_data = await response.json() return self._sanitise_response(response_data) except (aiohttp.ClientError, asyncio.TimeoutError) as e: if attempt == max_retries: sanitized_exception = self._sanitise_api_key(str(e)) error_message = f"Request failed after {max_retries} attempts: {sanitized_exception}" logger.error(f"Error: {error_message}") raise ValueError(error_message) else: await asyncio.sleep(0.7) except Exception as e: sanitized_exception = self._sanitise_api_key(str(e)) error_message = f"Request failed: {sanitized_exception}" logger.error(f"Error: {error_message}") raise ValueError(error_message) raise RuntimeError( "Unreachable: make_request exited retry loop without returning or raising" ) async def make_request_no_auth( self, url: str, params: Optional[Dict[str, Any]] = None, max_retries: int = 2, ) -> str: """ Make a request without authentication (for public endpoints like documentation). 
Args: url: Full URL to request params: Additional query parameters max_retries: Maximum number of retries for transient errors Returns: Response text (not JSON parsed) """ await self.initialise() if self.session is None: raise ValueError("Session not initialised") current_time = asyncio.get_event_loop().time() elapsed = current_time - self.last_request_time if elapsed < self.request_delay: await asyncio.sleep(self.request_delay - elapsed) request_params = params or {} headers = {"User-Agent": self.user_agent} logger.info(f"Requesting URL (no auth): {url}") for attempt in range(1, max_retries + 1): try: self.last_request_time = asyncio.get_event_loop().time() timeout = aiohttp.ClientTimeout(total=30.0) async with self.session.get( url, params=request_params, headers=headers, timeout=timeout, ) as response: if response.status >= 400: error_text = await response.text() error_message = f"HTTP Error: {response.status} - {error_text}" logger.error(f"Error: {error_message}") raise ValueError(error_message) return await response.text() except (aiohttp.ClientError, asyncio.TimeoutError) as e: if attempt == max_retries: error_message = ( f"Request failed after {max_retries} attempts: {str(e)}" ) logger.error(f"Error: {error_message}") raise ValueError(error_message) else: await asyncio.sleep(0.7) except Exception as e: error_message = f"Request failed: {str(e)}" logger.error(f"Error: {error_message}") raise ValueError(error_message) raise RuntimeError( "Unreachable: make_request_no_auth exited retry loop without returning or raising" ) ``` -------------------------------------------------------------------------------- /src/mcp_service/os_service.py: -------------------------------------------------------------------------------- ```python import json import asyncio import functools import re from typing import Optional, List, Dict, Any, Union, Callable from api_service.protocols import APIClient from prompt_templates.prompt_templates import PROMPT_TEMPLATES from mcp_service.protocols import MCPService, FeatureService from mcp_service.guardrails import ToolGuardrails from workflow_generator.workflow_planner import WorkflowPlanner from utils.logging_config import get_logger from mcp_service.resources import OSDocumentationResources from mcp_service.prompts import OSWorkflowPrompts from mcp_service.routing_service import OSRoutingService logger = get_logger(__name__) class OSDataHubService(FeatureService): """Implementation of the OS NGD API service with MCP""" def __init__( self, api_client: APIClient, mcp_service: MCPService, stdio_middleware=None ): """ Initialise the OS NGD service Args: api_client: API client implementation mcp_service: MCP service implementation stdio_middleware: Optional STDIO middleware for rate limiting """ self.api_client = api_client self.mcp = mcp_service self.stdio_middleware = stdio_middleware self.workflow_planner: Optional[WorkflowPlanner] = None self.guardrails = ToolGuardrails() self.routing_service = OSRoutingService(api_client) self.register_tools() self.register_resources() self.register_prompts() # Register all the resources, tools, and prompts def register_resources(self) -> None: """Register all MCP resources""" doc_resources = OSDocumentationResources(self.mcp, self.api_client) doc_resources.register_all() def register_tools(self) -> None: """Register all MCP tools with guardrails and middleware""" def apply_middleware(func: Callable) -> Callable: wrapped = self.guardrails.basic_guardrails(func) wrapped = self._require_workflow_context(wrapped) if 
self.stdio_middleware: wrapped = self.stdio_middleware.require_auth_and_rate_limit(wrapped) return wrapped # Apply middleware to ALL tools self.get_workflow_context = self.mcp.tool()( apply_middleware(self.get_workflow_context) ) self.hello_world = self.mcp.tool()(apply_middleware(self.hello_world)) self.check_api_key = self.mcp.tool()(apply_middleware(self.check_api_key)) self.list_collections = self.mcp.tool()(apply_middleware(self.list_collections)) self.get_single_collection = self.mcp.tool()( apply_middleware(self.get_single_collection) ) self.get_single_collection_queryables = self.mcp.tool()( apply_middleware(self.get_single_collection_queryables) ) self.search_features = self.mcp.tool()(apply_middleware(self.search_features)) self.get_feature = self.mcp.tool()(apply_middleware(self.get_feature)) self.get_linked_identifiers = self.mcp.tool()( apply_middleware(self.get_linked_identifiers) ) self.get_bulk_features = self.mcp.tool()( apply_middleware(self.get_bulk_features) ) self.get_bulk_linked_features = self.mcp.tool()( apply_middleware(self.get_bulk_linked_features) ) self.get_prompt_templates = self.mcp.tool()( apply_middleware(self.get_prompt_templates) ) self.fetch_detailed_collections = self.mcp.tool()( apply_middleware(self.fetch_detailed_collections) ) self.get_routing_data = self.mcp.tool()(apply_middleware(self.get_routing_data)) def register_prompts(self) -> None: """Register all MCP prompts""" workflow_prompts = OSWorkflowPrompts(self.mcp) workflow_prompts.register_all() # Run the MCP service def run(self) -> None: """Run the MCP service""" try: self.mcp.run() finally: try: try: loop = asyncio.get_running_loop() loop.create_task(self._cleanup()) except RuntimeError: asyncio.run(self._cleanup()) except Exception as e: logger.error(f"Error during cleanup: {e}") async def _cleanup(self): """Async cleanup method""" try: if hasattr(self, "api_client") and self.api_client: await self.api_client.close() logger.debug("API client closed successfully") except Exception as e: logger.error(f"Error closing API client: {e}") # Get the workflow context from the cached API client data # TODO: Lots of work to do here to reduce the size of the context and make it more readable for the LLM but not sacrificing the information async def get_workflow_context(self) -> str: """Get basic workflow context - no detailed queryables yet""" try: if self.workflow_planner is None: collections_cache = await self.api_client.cache_collections() basic_collections_info = { coll.id: { "id": coll.id, "title": coll.title, "description": coll.description, # No queryables here - will be fetched on-demand } for coll in collections_cache.collections } self.workflow_planner = WorkflowPlanner( await self.api_client.cache_openapi_spec(), basic_collections_info ) context = self.workflow_planner.get_basic_context() return json.dumps( { "CRITICAL_COLLECTION_LIST": sorted( context["available_collections"].keys() ), "MANDATORY_PLANNING_REQUIREMENT": { "CRITICAL": "You MUST follow the 2-step planning process:", "step_1": "Explain your complete plan listing which specific collections you will use and why", "step_2": "Call fetch_detailed_collections('collection-id-1,collection-id-2') to get queryables for those collections BEFORE making search calls", "required_explanation": { "1": "Which collections you will use and why", "2": "What you expect to find in those collections", "3": "What your search strategy will be", }, "workflow_enforcement": "Do not proceed with search_features until you have fetched detailed queryables", 
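# Worked example of the plan wording the model should produce before any search calls: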
"example_planning": "I will use 'lus-fts-site-1' for finding cinemas. Let me fetch its detailed queryables first...", }, "available_collections": context[ "available_collections" ], # Basic info only - no queryables yet - this is to reduce the size of the context for the LLM "openapi_spec": context["openapi_spec"].model_dump() if context["openapi_spec"] else None, "TWO_STEP_WORKFLOW": { "step_1": "Plan with basic collection info (no detailed queryables available yet)", "step_2": "Use fetch_detailed_collections() to get queryables for your chosen collections", "step_3": "Execute search_features with proper filters using the fetched queryables", }, "AVAILABLE_TOOLS": { "fetch_detailed_collections": "Get detailed queryables for specific collections: fetch_detailed_collections('lus-fts-site-1,trn-ntwk-street-1')", "search_features": "Search features (requires detailed queryables first)", }, "QUICK_FILTERING_GUIDE": { "primary_tool": "search_features", "key_parameter": "filter", "enum_fields": "Use exact values from collection's enum_queryables (fetch these first!)", "simple_fields": "Use direct values (e.g., usrn = 12345678)", }, "COMMON_EXAMPLES": { "workflow_example": "1) Explain plan → 2) fetch_detailed_collections('lus-fts-site-1') → 3) search_features with proper filter", "cinema_search": "After fetching queryables: search_features(collection_id='lus-fts-site-1', filter=\"oslandusetertiarygroup = 'Cinema'\")", }, "CRITICAL_RULES": { "1": "ALWAYS explain your plan first", "2": "ALWAYS call fetch_detailed_collections() before search_features", "3": "Use exact enum values from the fetched enum_queryables", "4": "Quote string values in single quotes", }, } ) except Exception as e: logger.error(f"Error getting workflow context: {e}") return json.dumps( {"error": str(e), "instruction": "Proceed with available tools"} ) def _require_workflow_context(self, func: Callable) -> Callable: # Functions that don't need workflow context skip_functions = {"get_workflow_context", "hello_world", "check_api_key"} @functools.wraps(func) async def wrapper(*args: Any, **kwargs: Any) -> Any: if self.workflow_planner is None: if func.__name__ in skip_functions: return await func(*args, **kwargs) else: return json.dumps( { "error": "WORKFLOW CONTEXT REQUIRED", "blocked_tool": func.__name__, "required_action": "You must call 'get_workflow_context' first", "message": "No tools are available until you get the workflow context. Please call get_workflow_context() now.", } ) return await func(*args, **kwargs) return wrapper # TODO: This is a bit of a hack - we need to improve the error handling and retry logic # TODO: Could we actually spawn a seperate AI agent to handle the retry logic and return the result to the main agent? def _add_retry_context(self, response_data: dict, tool_name: str) -> dict: """Add retry guidance to tool responses""" if "error" in response_data: response_data["retry_guidance"] = { "tool": tool_name, "MANDATORY_INSTRUCTION 1": "Review the error message and try again with corrected parameters", "MANDATORY_INSTRUCTION 2": "YOU MUST call get_workflow_context() if you need to see available options again", } return response_data # All the tools async def hello_world(self, name: str) -> str: """Simple hello world tool for testing""" return f"Hello, {name}! 
👋" async def check_api_key(self) -> str: """Check if the OS API key is available.""" try: await self.api_client.get_api_key() return json.dumps({"status": "success", "message": "OS_API_KEY is set!"}) except ValueError as e: return json.dumps({"status": "error", "message": str(e)}) async def list_collections( self, ) -> str: """ List all available feature collections in the OS NGD API. Returns: JSON string with collection info (id, title only) """ try: data = await self.api_client.make_request("COLLECTIONS") if not data or "collections" not in data: return json.dumps({"error": "No collections found"}) collections = [ {"id": col.get("id"), "title": col.get("title")} for col in data.get("collections", []) ] return json.dumps({"collections": collections}) except Exception as e: error_response = {"error": str(e)} return json.dumps( self._add_retry_context(error_response, "list_collections") ) async def get_single_collection( self, collection_id: str, ) -> str: """ Get detailed information about a specific collection. Args: collection_id: The collection ID Returns: JSON string with collection information """ try: data = await self.api_client.make_request( "COLLECTION_INFO", path_params=[collection_id] ) return json.dumps(data) except Exception as e: error_response = {"error": str(e)} return json.dumps( self._add_retry_context(error_response, "get_single_collection") ) async def get_single_collection_queryables( self, collection_id: str, ) -> str: """ Get the list of queryable properties for a collection. Args: collection_id: The collection ID Returns: JSON string with queryable properties """ try: data = await self.api_client.make_request( "COLLECTION_QUERYABLES", path_params=[collection_id] ) return json.dumps(data) except Exception as e: error_response = {"error": str(e)} return json.dumps( self._add_retry_context(error_response, "get_single_collection_queryables") ) async def search_features( self, collection_id: str, bbox: Optional[str] = None, crs: Optional[str] = None, limit: int = 10, offset: int = 0, filter: Optional[str] = None, filter_lang: Optional[str] = "cql-text", query_attr: Optional[str] = None, query_attr_value: Optional[str] = None, ) -> str: """Search for features in a collection with full CQL2 filter support.""" try: params: Dict[str, Union[str, int]] = {} if limit: params["limit"] = min(limit, 100) if offset: params["offset"] = max(0, offset) if bbox: params["bbox"] = bbox if crs: params["crs"] = crs if filter: if len(filter) > 1000: raise ValueError("Filter too long") dangerous_patterns = [ r";\s*--", r";\s*/\*", r"\bUNION\b", r"\bSELECT\b", r"\bINSERT\b", r"\bUPDATE\b", r"\bDELETE\b", r"\bDROP\b", r"\bCREATE\b", r"\bALTER\b", r"\bTRUNCATE\b", r"\bEXEC\b", r"\bEXECUTE\b", r"\bSP_\b", r"\bXP_\b", r"<script\b", r"javascript:", r"vbscript:", r"onload\s*=", r"onerror\s*=", r"onclick\s*=", r"\beval\s*\(", r"document\.", r"window\.", r"location\.", r"cookie", r"innerHTML", r"outerHTML", r"alert\s*\(", r"confirm\s*\(", r"prompt\s*\(", r"setTimeout\s*\(", r"setInterval\s*\(", r"Function\s*\(", r"constructor", r"prototype", r"__proto__", r"process\.", r"require\s*\(", r"import\s+", r"from\s+.*import", r"\.\./", r"file://", r"ftp://", r"data:", r"blob:", r"\\x[0-9a-fA-F]{2}", r"%[0-9a-fA-F]{2}", r"&#x[0-9a-fA-F]+;", r"&[a-zA-Z]+;", r"\$\{", r"#\{", r"<%", r"%>", r"{{", r"}}", r"\\\w+", r"\0", r"\r\n", r"\n\r", ] for pattern in dangerous_patterns: if re.search(pattern, filter, re.IGNORECASE): raise ValueError("Invalid filter content") if filter.count("'") % 2 != 0: raise ValueError("Unmatched 
quotes in filter") params["filter"] = filter.strip() if filter_lang: params["filter-lang"] = filter_lang elif query_attr and query_attr_value: if not re.match(r"^[a-zA-Z_][a-zA-Z0-9_]*$", query_attr): raise ValueError("Invalid field name") escaped_value = str(query_attr_value).replace("'", "''") params["filter"] = f"{query_attr} = '{escaped_value}'" if filter_lang: params["filter-lang"] = filter_lang if self.workflow_planner: valid_collections = set( self.workflow_planner.basic_collections_info.keys() ) if collection_id not in valid_collections: return json.dumps( { "error": f"Invalid collection '{collection_id}'. Valid collections: {sorted(valid_collections)[:10]}...", "suggestion": "Call get_workflow_context() to see all available collections", } ) data = await self.api_client.make_request( "COLLECTION_FEATURES", params=params, path_params=[collection_id] ) return json.dumps(data) except ValueError as ve: error_response = {"error": f"Invalid input: {str(ve)}"} return json.dumps( self._add_retry_context(error_response, "search_features") ) except Exception as e: error_response = {"error": str(e)} return json.dumps( self._add_retry_context(error_response, "search_features") ) async def get_feature( self, collection_id: str, feature_id: str, crs: Optional[str] = None, ) -> str: """ Get a specific feature by ID. Args: collection_id: The collection ID feature_id: The feature ID crs: Coordinate reference system for the response Returns: JSON string with feature data """ try: params: Dict[str, str] = {} if crs: params["crs"] = crs data = await self.api_client.make_request( "COLLECTION_FEATURE_BY_ID", params=params, path_params=[collection_id, feature_id], ) return json.dumps(data) except Exception as e: error_response = {"error": f"Error getting feature: {str(e)}"} return json.dumps(self._add_retry_context(error_response, "get_feature")) async def get_linked_identifiers( self, identifier_type: str, identifier: str, feature_type: Optional[str] = None, ) -> str: """ Get linked identifiers for a specified identifier. Args: identifier_type: The type of identifier (e.g., 'TOID', 'UPRN') identifier: The identifier value feature_type: Optional feature type to filter results Returns: JSON string with linked identifiers or filtered results """ try: data = await self.api_client.make_request( "LINKED_IDENTIFIERS", path_params=[identifier_type, identifier] ) if feature_type: # Filter results by feature type filtered_results = [] for item in data.get("results", []): if item.get("featureType") == feature_type: filtered_results.append(item) return json.dumps({"results": filtered_results}) return json.dumps(data) except Exception as e: error_response = {"error": str(e)} return json.dumps( self._add_retry_context(error_response, "get_linked_identifiers") ) async def get_bulk_features( self, collection_id: str, identifiers: List[str], query_by_attr: Optional[str] = None, ) -> str: """ Get multiple features in a single call. 
Args: collection_id: The collection ID identifiers: List of feature identifiers query_by_attr: Attribute to query by (if not provided, assumes feature IDs) Returns: JSON string with features data """ try: tasks: List[Any] = [] for identifier in identifiers: if query_by_attr: task = self.search_features( collection_id=collection_id, query_attr=query_by_attr, query_attr_value=identifier, limit=1, ) else: task = self.get_feature(collection_id, identifier) tasks.append(task) results = await asyncio.gather(*tasks) parsed_results = [json.loads(result) for result in results] return json.dumps({"results": parsed_results}) except Exception as e: error_response = {"error": str(e)} return json.dumps( self._add_retry_context(error_response, "get_bulk_features") ) async def get_bulk_linked_features( self, identifier_type: str, identifiers: List[str], feature_type: Optional[str] = None, ) -> str: """ Get linked features for multiple identifiers in a single call. Args: identifier_type: The type of identifier (e.g., 'TOID', 'UPRN') identifiers: List of identifier values feature_type: Optional feature type to filter results Returns: JSON string with linked features data """ try: tasks = [ self.get_linked_identifiers(identifier_type, identifier, feature_type) for identifier in identifiers ] results = await asyncio.gather(*tasks) parsed_results = [json.loads(result) for result in results] return json.dumps({"results": parsed_results}) except Exception as e: error_response = {"error": str(e)} return json.dumps( self._add_retry_context(error_response, "get_bulk_linked_features") ) async def get_prompt_templates( self, category: Optional[str] = None, ) -> str: """ Get standard prompt templates for interacting with this service. Args: category: Optional category of templates to return (general, collections, features, linked_identifiers) Returns: JSON string containing prompt templates """ if category and category in PROMPT_TEMPLATES: return json.dumps({category: PROMPT_TEMPLATES[category]}) return json.dumps(PROMPT_TEMPLATES) async def fetch_detailed_collections(self, collection_ids: str) -> str: """ Fetch detailed queryables for specific collections mentioned in LLM workflow plan. This is mainly to reduce the size of the context for the LLM. Only fetch what you really need. Args: collection_ids: Comma-separated list of collection IDs (e.g., "lus-fts-site-1,trn-ntwk-street-1") Returns: JSON string with detailed queryables for the specified collections """ try: if not self.workflow_planner: return json.dumps( { "error": "Workflow planner not initialized. Call get_workflow_context() first." 
} ) requested_collections = [cid.strip() for cid in collection_ids.split(",")] valid_collections = set(self.workflow_planner.basic_collections_info.keys()) invalid_collections = [ cid for cid in requested_collections if cid not in valid_collections ] if invalid_collections: return json.dumps( { "error": f"Invalid collection IDs: {invalid_collections}", "valid_collections": sorted(valid_collections), } ) cached_collections = [ cid for cid in requested_collections if cid in self.workflow_planner.detailed_collections_cache ] collections_to_fetch = [ cid for cid in requested_collections if cid not in self.workflow_planner.detailed_collections_cache ] if collections_to_fetch: logger.info(f"Fetching detailed queryables for: {collections_to_fetch}") detailed_queryables = ( await self.api_client.fetch_collections_queryables( collections_to_fetch ) ) for coll_id, queryables in detailed_queryables.items(): self.workflow_planner.detailed_collections_cache[coll_id] = { "id": queryables.id, "title": queryables.title, "description": queryables.description, "all_queryables": queryables.all_queryables, "enum_queryables": queryables.enum_queryables, "has_enum_filters": queryables.has_enum_filters, "total_queryables": queryables.total_queryables, "enum_count": queryables.enum_count, } context = self.workflow_planner.get_detailed_context(requested_collections) return json.dumps( { "success": True, "collections_processed": requested_collections, "collections_fetched_from_api": collections_to_fetch, "collections_from_cache": cached_collections, "detailed_collections": context["available_collections"], "message": f"Detailed queryables now available for: {', '.join(requested_collections)}", } ) except Exception as e: logger.error(f"Error fetching detailed collections: {e}") return json.dumps( {"error": str(e), "suggestion": "Check collection IDs and try again"} ) async def get_routing_data( self, bbox: Optional[str] = None, limit: int = 100, include_nodes: bool = True, include_edges: bool = True, build_network: bool = True, ) -> str: """ Get routing data - builds network and returns nodes/edges as flat tables. 
Args: bbox: Optional bounding box (format: "minx,miny,maxx,maxy") limit: Maximum number of road links to process (default: 100) include_nodes: Whether to include nodes in response (default: True) include_edges: Whether to include edges in response (default: True) build_network: Whether to build network first (default: True) Returns: JSON string with routing network data """ try: result = {} if build_network: build_result = await self.routing_service.build_routing_network( bbox, limit ) result["build_status"] = build_result if build_result.get("status") != "success": return json.dumps(result) if include_nodes: nodes_result = self.routing_service.get_flat_nodes() result["nodes"] = nodes_result.get("nodes", []) if include_edges: edges_result = self.routing_service.get_flat_edges() result["edges"] = edges_result.get("edges", []) summary = self.routing_service.get_network_info() result["summary"] = summary.get("network", {}) result["status"] = "success" return json.dumps(result) except Exception as e: return json.dumps({"error": str(e)}) ``` -------------------------------------------------------------------------------- /src/config_docs/ogcapi-features-1.yaml: -------------------------------------------------------------------------------- ```yaml openapi: 3.0.2 info: title: "Building Blocks specified in OGC API - Features - Part 1: Core" description: |- Common components used in the [OGC standard "OGC API - Features - Part 1: Core"](http://docs.opengeospatial.org/is/17-069r3/17-069r3.html). OGC API - Features - Part 1: Core 1.0 is an OGC Standard. Copyright (c) 2019 Open Geospatial Consortium. To obtain additional rights of use, visit http://www.opengeospatial.org/legal/ . This document is also available on [OGC](http://schemas.opengis.net/ogcapi/features/part1/1.0/openapi/ogcapi-features-1.yaml). version: '1.0.0' contact: name: Clemens Portele email: [email protected] license: name: OGC License url: 'http://www.opengeospatial.org/legal/' components: parameters: bbox: name: bbox in: query description: |- Only features that have a geometry that intersects the bounding box are selected. The bounding box is provided as four or six numbers, depending on whether the coordinate reference system includes a vertical axis (height or depth): * Lower left corner, coordinate axis 1 * Lower left corner, coordinate axis 2 * Minimum value, coordinate axis 3 (optional) * Upper right corner, coordinate axis 1 * Upper right corner, coordinate axis 2 * Maximum value, coordinate axis 3 (optional) If the value consists of four numbers, the coordinate reference system is WGS 84 longitude/latitude (http://www.opengis.net/def/crs/OGC/1.3/CRS84) unless a different coordinate reference system is specified in the parameter `bbox-crs`. If the value consists of six numbers, the coordinate reference system is WGS 84 longitude/latitude/ellipsoidal height (http://www.opengis.net/def/crs/OGC/0/CRS84h) unless a different coordinate reference system is specified in the parameter `bbox-crs`. The query parameter `bbox-crs` is specified in OGC API - Features - Part 2: Coordinate Reference Systems by Reference. For WGS 84 longitude/latitude the values are in most cases the sequence of minimum longitude, minimum latitude, maximum longitude and maximum latitude. However, in cases where the box spans the antimeridian the first value (west-most box edge) is larger than the third value (east-most box edge). If the vertical axis is included, the third and the sixth number are the bottom and the top of the 3-dimensional bounding box. 
If a feature has multiple spatial geometry properties, it is the decision of the server whether only a single spatial geometry property is used to determine the extent or all relevant geometries. required: false schema: type: array oneOf: - minItems: 4 maxItems: 4 - minItems: 6 maxItems: 6 items: type: number style: form explode: false collectionId: name: collectionId in: path description: local identifier of a collection required: true schema: type: string datetime: name: datetime in: query description: |- Either a date-time or an interval. Date and time expressions adhere to RFC 3339. Intervals may be bounded or half-bounded (double-dots at start or end). Examples: * A date-time: "2018-02-12T23:20:50Z" * A bounded interval: "2018-02-12T00:00:00Z/2018-03-18T12:31:12Z" * Half-bounded intervals: "2018-02-12T00:00:00Z/.." or "../2018-03-18T12:31:12Z" Only features that have a temporal property that intersects the value of `datetime` are selected. If a feature has multiple temporal properties, it is the decision of the server whether only a single temporal property is used to determine the extent or all relevant temporal properties. required: false schema: type: string style: form explode: false featureId: name: featureId in: path description: local identifier of a feature required: true schema: type: string limit: name: limit in: query description: |- The optional limit parameter limits the number of items that are presented in the response document. Only items are counted that are on the first level of the collection in the response document. Nested objects contained within the explicitly requested items shall not be counted. Minimum = 1. Maximum = 10000. Default = 10. required: false schema: type: integer minimum: 1 maximum: 10000 default: 10 style: form explode: false schemas: collection: type: object required: - id - links properties: id: description: identifier of the collection used, for example, in URIs type: string example: address title: description: human readable title of the collection type: string example: address description: description: a description of the features in the collection type: string example: An address. links: type: array items: $ref: "#/components/schemas/link" example: - href: http://data.example.com/buildings rel: item - href: http://example.com/concepts/buildings.html rel: describedby type: text/html extent: $ref: "#/components/schemas/extent" itemType: description: indicator about the type of the items in the collection (the default value is 'feature'). type: string default: feature crs: description: the list of coordinate reference systems supported by the service type: array items: type: string default: - http://www.opengis.net/def/crs/OGC/1.3/CRS84 example: - http://www.opengis.net/def/crs/OGC/1.3/CRS84 - http://www.opengis.net/def/crs/EPSG/0/4326 collections: type: object required: - links - collections properties: links: type: array items: $ref: "#/components/schemas/link" collections: type: array items: $ref: "#/components/schemas/collection" confClasses: type: object required: - conformsTo properties: conformsTo: type: array items: type: string exception: type: object description: |- Information about the exception: an error code plus an optional description. required: - code properties: code: type: string description: type: string extent: type: object description: |- The extent of the features in the collection. In the Core only spatial and temporal extents are specified. 
Extensions may add additional members to represent other extents, for example, thermal or pressure ranges. properties: spatial: description: |- The spatial extent of the features in the collection. type: object properties: bbox: description: |- One or more bounding boxes that describe the spatial extent of the dataset. In the Core only a single bounding box is supported. Extensions may support additional areas. If multiple areas are provided, the union of the bounding boxes describes the spatial extent. type: array minItems: 1 items: description: |- Each bounding box is provided as four or six numbers, depending on whether the coordinate reference system includes a vertical axis (height or depth): * Lower left corner, coordinate axis 1 * Lower left corner, coordinate axis 2 * Minimum value, coordinate axis 3 (optional) * Upper right corner, coordinate axis 1 * Upper right corner, coordinate axis 2 * Maximum value, coordinate axis 3 (optional) The coordinate reference system of the values is WGS 84 longitude/latitude (http://www.opengis.net/def/crs/OGC/1.3/CRS84) unless a different coordinate reference system is specified in `crs`. For WGS 84 longitude/latitude the values are in most cases the sequence of minimum longitude, minimum latitude, maximum longitude and maximum latitude. However, in cases where the box spans the antimeridian the first value (west-most box edge) is larger than the third value (east-most box edge). If the vertical axis is included, the third and the sixth number are the bottom and the top of the 3-dimensional bounding box. If a feature has multiple spatial geometry properties, it is the decision of the server whether only a single spatial geometry property is used to determine the extent or all relevant geometries. type: array oneOf: - minItems: 4 maxItems: 4 - minItems: 6 maxItems: 6 items: type: number example: - -180 - -90 - 180 - 90 crs: description: |- Coordinate reference system of the coordinates in the spatial extent (property `bbox`). The default reference system is WGS 84 longitude/latitude. In the Core this is the only supported coordinate reference system. Extensions may support additional coordinate reference systems and add additional enum values. type: string enum: - 'http://www.opengis.net/def/crs/OGC/1.3/CRS84' default: 'http://www.opengis.net/def/crs/OGC/1.3/CRS84' temporal: description: |- The temporal extent of the features in the collection. type: object properties: interval: description: |- One or more time intervals that describe the temporal extent of the dataset. The value `null` is supported and indicates an unbounded interval end. In the Core only a single time interval is supported. Extensions may support multiple intervals. If multiple intervals are provided, the union of the intervals describes the temporal extent. type: array minItems: 1 items: description: |- Begin and end times of the time interval. The timestamps are in the temporal coordinate reference system specified in `trs`. By default this is the Gregorian calendar. type: array minItems: 2 maxItems: 2 items: type: string format: date-time nullable: true example: - '2011-11-11T12:22:11Z' - null trs: description: |- Coordinate reference system of the coordinates in the temporal extent (property `interval`). The default reference system is the Gregorian calendar. In the Core this is the only supported temporal coordinate reference system. Extensions may support additional temporal coordinate reference systems and add additional enum values. 
type: string enum: - 'http://www.opengis.net/def/uom/ISO-8601/0/Gregorian' default: 'http://www.opengis.net/def/uom/ISO-8601/0/Gregorian' featureCollectionGeoJSON: type: object required: - type - features properties: type: type: string enum: - FeatureCollection features: type: array items: $ref: "#/components/schemas/featureGeoJSON" links: type: array items: $ref: "#/components/schemas/link" timeStamp: $ref: "#/components/schemas/timeStamp" numberMatched: $ref: "#/components/schemas/numberMatched" numberReturned: $ref: "#/components/schemas/numberReturned" featureGeoJSON: type: object required: - type - geometry - properties properties: type: type: string enum: - Feature geometry: $ref: "#/components/schemas/geometryGeoJSON" properties: type: object nullable: true id: oneOf: - type: string - type: integer links: type: array items: $ref: "#/components/schemas/link" geometryGeoJSON: oneOf: - $ref: "#/components/schemas/pointGeoJSON" - $ref: "#/components/schemas/multipointGeoJSON" - $ref: "#/components/schemas/linestringGeoJSON" - $ref: "#/components/schemas/multilinestringGeoJSON" - $ref: "#/components/schemas/polygonGeoJSON" - $ref: "#/components/schemas/multipolygonGeoJSON" - $ref: "#/components/schemas/geometrycollectionGeoJSON" geometrycollectionGeoJSON: type: object required: - type - geometries properties: type: type: string enum: - GeometryCollection geometries: type: array items: $ref: "#/components/schemas/geometryGeoJSON" landingPage: type: object required: - links properties: title: type: string example: Buildings in Bonn description: type: string example: Access to data about buildings in the city of Bonn via a Web API that conforms to the OGC API Features specification. links: type: array items: $ref: "#/components/schemas/link" linestringGeoJSON: type: object required: - type - coordinates properties: type: type: string enum: - LineString coordinates: type: array minItems: 2 items: type: array minItems: 2 items: type: number link: type: object required: - href properties: href: type: string example: http://data.example.com/buildings/123 rel: type: string example: alternate type: type: string example: application/geo+json hreflang: type: string example: en title: type: string example: Trierer Strasse 70, 53115 Bonn length: type: integer multilinestringGeoJSON: type: object required: - type - coordinates properties: type: type: string enum: - MultiLineString coordinates: type: array items: type: array minItems: 2 items: type: array minItems: 2 items: type: number multipointGeoJSON: type: object required: - type - coordinates properties: type: type: string enum: - MultiPoint coordinates: type: array items: type: array minItems: 2 items: type: number multipolygonGeoJSON: type: object required: - type - coordinates properties: type: type: string enum: - MultiPolygon coordinates: type: array items: type: array items: type: array minItems: 4 items: type: array minItems: 2 items: type: number numberMatched: description: |- The number of features of the feature type that match the selection parameters like `bbox`. type: integer minimum: 0 example: 127 numberReturned: description: |- The number of features in the feature collection. A server may omit this information in a response, if the information about the number of features is not known or difficult to compute. If the value is provided, the value shall be identical to the number of items in the "features" array. 
type: integer minimum: 0 example: 10 pointGeoJSON: type: object required: - type - coordinates properties: type: type: string enum: - Point coordinates: type: array minItems: 2 items: type: number polygonGeoJSON: type: object required: - type - coordinates properties: type: type: string enum: - Polygon coordinates: type: array items: type: array minItems: 4 items: type: array minItems: 2 items: type: number timeStamp: description: This property indicates the time and date when the response was generated. type: string format: date-time example: '2017-08-17T08:05:32Z' responses: LandingPage: description: |- The landing page provides links to the API definition (link relations `service-desc` and `service-doc`), the Conformance declaration (path `/conformance`, link relation `conformance`), and the Feature Collections (path `/collections`, link relation `data`). content: application/json: schema: $ref: '#/components/schemas/landingPage' example: title: Buildings in Bonn description: Access to data about buildings in the city of Bonn via a Web API that conforms to the OGC API Features specification. links: - href: 'http://data.example.org/' rel: self type: application/json title: this document - href: 'http://data.example.org/api' rel: service-desc type: application/vnd.oai.openapi+json;version=3.0 title: the API definition - href: 'http://data.example.org/api.html' rel: service-doc type: text/html title: the API documentation - href: 'http://data.example.org/conformance' rel: conformance type: application/json title: OGC API conformance classes implemented by this server - href: 'http://data.example.org/collections' rel: data type: application/json title: Information about the feature collections text/html: schema: type: string ConformanceDeclaration: description: |- The URIs of all conformance classes supported by the server. To support "generic" clients that want to access multiple OGC API Features implementations - and not "just" a specific API / server, the server declares the conformance classes it implements and conforms to. content: application/json: schema: $ref: '#/components/schemas/confClasses' example: conformsTo: - 'http://www.opengis.net/spec/ogcapi-features-1/1.0/conf/core' - 'http://www.opengis.net/spec/ogcapi-features-1/1.0/conf/oas30' - 'http://www.opengis.net/spec/ogcapi-features-1/1.0/conf/html' - 'http://www.opengis.net/spec/ogcapi-features-1/1.0/conf/geojson' text/html: schema: type: string Collections: description: |- The feature collections shared by this API. The dataset is organized as one or more feature collections. This resource provides information about and access to the collections. The response contains the list of collections. For each collection, a link to the items in the collection (path `/collections/{collectionId}/items`, link relation `items`) as well as key information about the collection. This information includes: * A local identifier for the collection that is unique for the dataset; * A list of coordinate reference systems (CRS) in which geometries may be returned by the server. The first CRS is the default coordinate reference system (the default is always WGS 84 with axis order longitude/latitude); * An optional title and description for the collection; * An optional extent that can be used to provide an indication of the spatial and temporal extent of the collection - typically derived from the data; * An optional indicator about the type of the items in the collection (the default value, if the indicator is not provided, is 'feature'). 
      content:
        application/json:
          schema:
            $ref: '#/components/schemas/collections'
          example:
            links:
              - href: 'http://data.example.org/collections.json'
                rel: self
                type: application/json
                title: this document
              - href: 'http://data.example.org/collections.html'
                rel: alternate
                type: text/html
                title: this document as HTML
              - href: 'http://schemas.example.org/1.0/buildings.xsd'
                rel: describedby
                type: application/xml
                title: GML application schema for Acme Corporation building data
              - href: 'http://download.example.org/buildings.gpkg'
                rel: enclosure
                type: application/geopackage+sqlite3
                title: Bulk download (GeoPackage)
                length: 472546
            collections:
              - id: buildings
                title: Buildings
                description: Buildings in the city of Bonn.
                extent:
                  spatial:
                    bbox:
                      - - 7.01
                        - 50.63
                        - 7.22
                        - 50.78
                  temporal:
                    interval:
                      - - '2010-02-15T12:34:56Z'
                        - null
                links:
                  - href: 'http://data.example.org/collections/buildings/items'
                    rel: items
                    type: application/geo+json
                    title: Buildings
                  - href: 'http://data.example.org/collections/buildings/items.html'
                    rel: items
                    type: text/html
                    title: Buildings
                  - href: 'https://creativecommons.org/publicdomain/zero/1.0/'
                    rel: license
                    type: text/html
                    title: CC0-1.0
                  - href: 'https://creativecommons.org/publicdomain/zero/1.0/rdf'
                    rel: license
                    type: application/rdf+xml
                    title: CC0-1.0
        text/html:
          schema:
            type: string
    Collection:
      description: |-
        Information about the feature collection with id `collectionId`.

        The response contains a link to the items in the collection (path
        `/collections/{collectionId}/items`, link relation `items`) as well
        as key information about the collection. This information includes:

        * A local identifier for the collection that is unique for the dataset;
        * A list of coordinate reference systems (CRS) in which geometries may be returned by the server. The first CRS is the default coordinate reference system (the default is always WGS 84 with axis order longitude/latitude);
        * An optional title and description for the collection;
        * An optional extent that can be used to provide an indication of the spatial and temporal extent of the collection - typically derived from the data;
        * An optional indicator about the type of the items in the collection (the default value, if the indicator is not provided, is 'feature').
      content:
        application/json:
          schema:
            $ref: '#/components/schemas/collection'
          example:
            id: buildings
            title: Buildings
            description: Buildings in the city of Bonn.
            extent:
              spatial:
                bbox:
                  - - 7.01
                    - 50.63
                    - 7.22
                    - 50.78
              temporal:
                interval:
                  - - '2010-02-15T12:34:56Z'
                    - null
            links:
              - href: 'http://data.example.org/collections/buildings/items'
                rel: items
                type: application/geo+json
                title: Buildings
              - href: 'http://data.example.org/collections/buildings/items.html'
                rel: items
                type: text/html
                title: Buildings
              - href: 'https://creativecommons.org/publicdomain/zero/1.0/'
                rel: license
                type: text/html
                title: CC0-1.0
              - href: 'https://creativecommons.org/publicdomain/zero/1.0/rdf'
                rel: license
                type: application/rdf+xml
                title: CC0-1.0
        text/html:
          schema:
            type: string
    Features:
      description: |-
        The response is a document consisting of features in the collection.
        The features included in the response are determined by the server
        based on the query parameters of the request. To support access to
        larger collections without overloading the client, the API supports
        paged access with links to the next page, if more features are
        selected than the page size.

        The `bbox` and `datetime` parameters can be used to select only a
        subset of the features in the collection (the features that are in
        the bounding box or time interval). The `bbox` parameter matches all
        features in the collection that are not associated with a location,
        too. The `datetime` parameter matches all features in the collection
        that are not associated with a time stamp or interval, too.

        The `limit` parameter may be used to control the subset of the
        selected features that should be returned in the response, the page
        size. Each page may include information about the number of selected
        and returned features (`numberMatched` and `numberReturned`) as well
        as links to support paging (link relation `next`).
      content:
        application/geo+json:
          schema:
            $ref: '#/components/schemas/featureCollectionGeoJSON'
          example:
            type: FeatureCollection
            links:
              - href: 'http://data.example.com/collections/buildings/items.json'
                rel: self
                type: application/geo+json
                title: this document
              - href: 'http://data.example.com/collections/buildings/items.html'
                rel: alternate
                type: text/html
                title: this document as HTML
              - href: 'http://data.example.com/collections/buildings/items.json&offset=10&limit=2'
                rel: next
                type: application/geo+json
                title: next page
            timeStamp: '2018-04-03T14:52:23Z'
            numberMatched: 123
            numberReturned: 2
            features:
              - type: Feature
                id: '123'
                geometry:
                  type: Polygon
                  coordinates:
                    - ...
                properties:
                  function: residential
                  floors: '2'
                  lastUpdate: '2015-08-01T12:34:56Z'
              - type: Feature
                id: '132'
                geometry:
                  type: Polygon
                  coordinates:
                    - ...
                properties:
                  function: public use
                  floors: '10'
                  lastUpdate: '2013-12-03T10:15:37Z'
        text/html:
          schema:
            type: string
    Feature:
      description: |-
        Fetch the feature with id `featureId` in the feature collection
        with id `collectionId`.
      content:
        application/geo+json:
          schema:
            $ref: '#/components/schemas/featureGeoJSON'
          example:
            type: Feature
            links:
              - href: 'http://data.example.com/id/building/123'
                rel: canonical
                title: canonical URI of the building
              - href: 'http://data.example.com/collections/buildings/items/123.json'
                rel: self
                type: application/geo+json
                title: this document
              - href: 'http://data.example.com/collections/buildings/items/123.html'
                rel: alternate
                type: text/html
                title: this document as HTML
              - href: 'http://data.example.com/collections/buildings'
                rel: collection
                type: application/geo+json
                title: the collection document
            id: '123'
            geometry:
              type: Polygon
              coordinates:
                - ...
            properties:
              function: residential
              floors: '2'
              lastUpdate: '2015-08-01T12:34:56Z'
        text/html:
          schema:
            type: string
    InvalidParameter:
      description: |-
        A query parameter has an invalid value.
      content:
        application/json:
          schema:
            $ref: '#/components/schemas/exception'
        text/html:
          schema:
            type: string
    NotFound:
      description: |-
        The requested resource does not exist on the server. For example,
        a path parameter had an incorrect value.
    NotAcceptable:
      description: |-
        Content negotiation failed. For example, the `Accept` header submitted
        in the request did not support any of the media types supported by
        the server for the requested resource.
    ServerError:
      description: |-
        A server error occurred.
      content:
        application/json:
          schema:
            $ref: '#/components/schemas/exception'
        text/html:
          schema:
            type: string
```
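
For reference, below is a minimal client sketch (it is not part of the repository) showing how the paged `items` responses described in this spec can be consumed: request a page with `limit`, then follow the `next` link relation until none is returned. The base URL and collection id are hypothetical placeholders, and `aiohttp` (already a project dependency) is assumed as the HTTP client.

```python
import asyncio

import aiohttp

BASE_URL = "https://example.org"  # hypothetical OGC API Features endpoint
COLLECTION = "buildings"          # hypothetical collection id


async def fetch_all_features(limit: int = 100) -> list[dict]:
    """Collect every feature in a collection by following `next` links."""
    features: list[dict] = []
    url = f"{BASE_URL}/collections/{COLLECTION}/items?limit={limit}"
    async with aiohttp.ClientSession() as session:
        while url:
            async with session.get(
                url, headers={"Accept": "application/geo+json"}
            ) as resp:
                resp.raise_for_status()
                # content_type=None: servers reply with application/geo+json,
                # which aiohttp's default application/json check would reject.
                page = await resp.json(content_type=None)
            features.extend(page.get("features", []))
            # Each `link` object carries rel/href; rel=next points at the next page.
            url = next(
                (link["href"] for link in page.get("links", []) if link.get("rel") == "next"),
                None,
            )
    return features


if __name__ == "__main__":
    print(f"fetched {len(asyncio.run(fetch_all_features()))} features")
```

Stopping when no `next` link is present mirrors the spec's paging contract: the server, not the client, decides how many pages exist, so the client never has to compute offsets itself.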