# Directory Structure ``` ├── .cursorrules ├── .gitignore ├── map_demo.py ├── mcp_osm │ ├── __init__.py │ ├── __main__.py │ ├── flask_server.py │ └── server.py ├── mcp.py ├── osm-mcp-s.webp ├── osm-mcp.webp ├── pyproject.toml ├── README.md ├── run.py ├── setup.py ├── templates │ └── index.html └── uv.lock ``` # Files -------------------------------------------------------------------------------- /.gitignore: -------------------------------------------------------------------------------- ``` # Python-generated files __pycache__/ *.py[oc] build/ dist/ wheels/ *.egg-info # Virtual environments .venv # Byte-compiled / optimized / DLL files __pycache__/ *.py[cod] *$py.class # C extensions *.so # Distribution / packaging .Python build/ develop-eggs/ dist/ downloads/ eggs/ .eggs/ lib/ lib64/ parts/ sdist/ var/ wheels/ *.egg-info/ .installed.cfg *.egg MANIFEST # PyInstaller # Usually these files are written by a python script from a template # before PyInstaller builds the exe, so as to inject date/other infos into it. *.manifest *.spec # Installer logs pip-log.txt pip-delete-this-directory.txt # Unit test / coverage reports htmlcov/ .tox/ .nox/ .coverage .coverage.* .cache nosetests.xml coverage.xml *.cover *.py,cover .hypothesis/ .pytest_cache/ # Translations *.mo *.pot # Django stuff: *.log local_settings.py db.sqlite3 db.sqlite3-journal # Flask stuff: instance/ .webassets-cache # Scrapy stuff: .scrapy # Sphinx documentation docs/_build/ # PyBuilder target/ # Jupyter Notebook .ipynb_checkpoints # IPython profile_default/ ipython_config.py # pyenv .python-version # pipenv # According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. # However, in case of collaboration, if having platform-specific dependencies or dependencies # having no cross-platform support, pipenv may install dependencies that don't work, or not # install all needed dependencies. #Pipfile.lock # PEP 582; used by e.g. github.com/David-OConnor/pyflow __pypackages__/ # Celery stuff celerybeat-schedule celerybeat.pid # SageMath parsed files *.sage.py # Environments .env .venv env/ venv/ ENV/ env.bak/ venv.bak/ # Spyder project settings .spyderproject .spyproject # Rope project settings .ropeproject # mkdocs documentation /site # mypy .mypy_cache/ .dmypy.json dmypy.json # Pyre type checker .pyre/ # VS Code .vscode/ # PyCharm .idea/ ``` -------------------------------------------------------------------------------- /.cursorrules: -------------------------------------------------------------------------------- ``` # New Rules to Address Overzealous Agentic Functions ## Pacing and Scope Control 1. **Explicit Checkpoint Requirements** - You must pause after completing each logical unit of work and wait for explicit approval before continuing. - Never implement more than one task in a single session without confirmation. 2. **Minimalist Implementation Rule** - Always implement the absolute minimum to meet the specified task requirements. - When in doubt about scope, choose the narrower interpretation. 3. **Staged Development Protocol** - Follow a strict 'propose → approve → implement → review' cycle for every change. - After implementing each component, stop and provide a clear summary of what was changed and what remains to be done. 4. **Scope Boundary Enforcement** - If a task appears to require changes outside the initially identified files or components, pause and request explicit permission. - Never perform 'while I'm at it' improvements without prior approval. ## Communications 1. **Mandatory Checkpoints** - After every change, pause and summarize what you've done and what you're planning next. - Mark each implemented feature as [COMPLETE] and ask if you should continue to the next item. 2. **Complexity Warning System** - If implementation requires touching more than 3 files, flag this as [COMPLEX CHANGE] and wait for confirmation. - Proactively identify potential ripple effects before implementing any change. 3. **Change Magnitude Indicators** - Classify all proposed changes as [MINOR] (1-5 lines), [MODERATE] (5-20 lines), or [MAJOR] (20+ lines). - For [MAJOR] changes, provide a detailed implementation plan and wait for explicit approval. 4. **Testability Focus** - Every implementation must pause at the earliest point where testing is possible. - Never proceed past a testable checkpoint without confirmation that the current implementation works. ``` -------------------------------------------------------------------------------- /README.md: -------------------------------------------------------------------------------- ```markdown # MCP-OSM: OpenStreetMap Integration for MCP This package provides OpenStreetMap integration for MCP, allowing users to query and visualize map data through an MCP interface. [](osm-mcp.webp) ## Features - Web-based map viewer using Leaflet and OpenStreetMap - Server-to-client communication via Server-Sent Events (SSE) - MCP tools for map control (adding markers, polygons, setting view, getting view) - PostgreSQL/PostGIS query interface for OpenStreetMap data ## Installation This is my `claude_desktop_config.json`: ```json { "mcpServers": { "OSM PostgreSQL Server": { "command": "/Users/wiseman/.local/bin/uv", "args": [ "run", "--env-file", ".env", "--with", "mcp[cli]", "--with", "psycopg2", "--with-editable", "/Users/wiseman/src/mcp-osm", "--directory", "/Users/wiseman/src/mcp-osm", "mcp", "run", "mcp.py" ] } } } ``` When the MCP server starts it also starts a web server at http://localhost:8889/ that has the map interface. ### Environment Variables The following environment variables can be used to configure the MCP: - `FLASK_HOST` - Host for the Flask server (default: 127.0.0.1) - `FLASK_PORT` - Port for the Flask server (default: 8889) - `PGHOST` - PostgreSQL host (default: localhost) - `PGPORT` - PostgreSQL port (default: 5432) - `PGDB` - PostgreSQL database name (default: osm) - `PGUSER` - PostgreSQL username (default: postgres) - `PGPASSWORD` - PostgreSQL password (default: postgres) ### MCP Tools The following MCP tools are available: - `get_map_view` - Get the current map view - `set_map_view` - Set the map view to specific coordinates or bounds - `set_map_title` - Set the title displayed at the bottom right of the map - `add_map_marker` - Add a marker at specific coordinates - `add_map_line` - Add a line defined by a set of coordinates - `add_map_polygon` - Add a polygon defined by a set of coordinates - `query_osm_postgres` - Execute a SQL query against the OpenStreetMap database ``` -------------------------------------------------------------------------------- /mcp.py: -------------------------------------------------------------------------------- ```python from mcp_osm import server mcp = server.mcp ``` -------------------------------------------------------------------------------- /mcp_osm/__init__.py: -------------------------------------------------------------------------------- ```python """ MCP-OSM: MCP server with OpenStreetMap integration """ __version__ = "0.1.0" ``` -------------------------------------------------------------------------------- /mcp_osm/__main__.py: -------------------------------------------------------------------------------- ```python """Main entry point for the MCP-OSM package.""" from mcp_osm.server import run_server if __name__ == "__main__": run_server() ``` -------------------------------------------------------------------------------- /run.py: -------------------------------------------------------------------------------- ```python #!/usr/bin/env python3 """ Run the MCP-OSM server """ from mcp_osm.server import run_server if __name__ == "__main__": run_server() ``` -------------------------------------------------------------------------------- /pyproject.toml: -------------------------------------------------------------------------------- ```toml [project] name = "mcp-osm" version = "0.1.0" description = "Add your description here" readme = "README.md" requires-python = ">=3.10" dependencies = [ "mcp[cli]>=1.3.0", "psycopg2>=2.9.10", "flask>=3.1.0", ] [project.optional-dependencies] dev = ["coverage>=6.0.0"] ``` -------------------------------------------------------------------------------- /setup.py: -------------------------------------------------------------------------------- ```python #!/usr/bin/env python3 from setuptools import setup, find_packages setup( name="mcp-osm", version="0.1.0", description="MCP server with OpenStreetMap integration", author="Your Name", author_email="[email protected]", # Explicitly define packages to include packages=find_packages(exclude=["static", "templates"]), # Include our templates and static files as package data package_data={ "": ["templates/*", "static/*", "static/*/*"], }, include_package_data=True, # Define dependencies install_requires=[ "flask>=3.1.0", "psycopg2>=2.9.10", "fastmcp", ], python_requires=">=3.7", ) ``` -------------------------------------------------------------------------------- /map_demo.py: -------------------------------------------------------------------------------- ```python #!/usr/bin/env python3 """ Map Control Demo - Shows how to use the Flask server to control the map display """ import sys import os import time # Set up the path to find our package sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from mcp_osm.flask_server import FlaskServer def demo_map_controls(): # Initialize and start the Flask server server = FlaskServer() server.start() print("Map server started. Open http://127.0.0.1:5000 in your browser.") print("This demo will show various map features after you open the page.") time.sleep(5) # Give some time for the browser to connect # Example 1: Set the map title print("\nSetting map title...") server.set_title("San Francisco Landmarks", {"color": "#0066cc"}) time.sleep(2) # Example 2: Set the map view print("\nSetting map view to San Francisco...") server.set_view(center=[37.7749, -122.4194], zoom=12) time.sleep(3) # Example 3: Show a marker print("\nAdding a marker at Coit Tower...") server.show_marker( coordinates=[37.8024, -122.4058], text="Coit Tower, San Francisco", options={"title": "Coit Tower"} ) time.sleep(3) # Example 4: Show a polygon around Golden Gate Park print("\nAdding a polygon around Golden Gate Park...") golden_gate_park = [ [37.7694, -122.5110], [37.7694, -122.4566], [37.7646, -122.4566], [37.7646, -122.5110] ] server.show_polygon( coordinates=golden_gate_park, options={"color": "green", "fillOpacity": 0.3} ) time.sleep(3) # Example 5: Show a line for Market Street print("\nAdding a line along Market Street...") market_street = [ [37.7944, -122.3953], # Ferry Building [37.7932, -122.3967], [37.7909, -122.4013], [37.7891, -122.4051], [37.7865, -122.4113], # Powell Street [37.7835, -122.4173], [37.7811, -122.4219] # Civic Center ] server.show_line( coordinates=market_street, options={"color": "red", "weight": 5, "dashArray": "10,15"} ) time.sleep(3) # Update the title with additional information print("\nUpdating map title with additional information...") server.set_title("San Francisco Landmarks - Golden Gate Park Area", {"color": "#006600", "backgroundColor": "rgba(255, 255, 255, 0.9)"}) time.sleep(2) # Example 6: Get the current view print("\nCurrent map view:") current_view = server.get_current_view() print(f" Center: {current_view['center']}") print(f" Zoom: {current_view['zoom']}") if current_view['bounds']: print(f" Bounds: {current_view['bounds']}") print("\nDemo complete! Keep the browser window open to interact with the map.") print("Press Ctrl+C to stop the server when you're done.") # Keep the script running try: while True: time.sleep(1) except KeyboardInterrupt: print("\nStopping server...") server.stop() print("Server stopped.") if __name__ == "__main__": demo_map_controls() ``` -------------------------------------------------------------------------------- /mcp_osm/flask_server.py: -------------------------------------------------------------------------------- ```python import json import logging import os import queue import socket import sys import threading import time import io import base64 from contextlib import redirect_stdout from unittest import mock _log = logging.getLogger('werkzeug') _log.setLevel(logging.WARNING) # Redirect all Flask/Werkzeug logging to stderr for handler in _log.handlers: handler.setStream(sys.stderr) from flask import ( Flask, Response, jsonify, render_template, request, send_from_directory, ) # Redirect all logging to stderr. logging.basicConfig(stream=sys.stderr) # Create a logger for this module logger = logging.getLogger(__name__) class FlaskServer: def __init__(self, host="127.0.0.1", port=5000): self.host = host self.port = port self.app = Flask(__name__, template_folder=os.path.join(os.path.dirname(os.path.dirname(__file__)), "templates"), static_folder=os.path.join(os.path.dirname(os.path.dirname(__file__)), "static")) # Capture and redirect Flask's initialization output to stderr with io.StringIO() as buf, redirect_stdout(buf): self.setup_routes() output = buf.getvalue() if output: logger.info(output.strip()) self.server_thread = None self.clients = {} # Maps client_id to message queue self.client_counter = 0 self.current_view = { "center": [0, 0], "zoom": 2, "bounds": [[-85, -180], [85, 180]] } self.sse_clients = {} # Changed from list to dict to store queues self.latest_screenshot = None # Store the latest screenshot # Add storage for geolocate requests and responses self.geolocate_requests = {} self.geolocate_responses = {} def setup_routes(self): @self.app.route("/") def index(): return render_template("index.html") @self.app.route("/static/<path:path>") def send_static(path): return send_from_directory("static", path) @self.app.route("/api/sse") def sse(): def event_stream(client_id): # Create a queue for this client client_queue = queue.Queue() self.sse_clients[client_id] = client_queue try: # Initial connection message yield 'data: {"type": "connected", "id": %d}\n\n' % client_id while True: try: # Try to get a message from the queue with a timeout message = client_queue.get(timeout=30) yield f"data: {message}\n\n" except queue.Empty: # No message received in timeout period, send a ping yield 'data: {"type": "ping"}\n\n' except GeneratorExit: # Client disconnected if client_id in self.sse_clients: del self.sse_clients[client_id] logger.info( f"Client {client_id} disconnected, {len(self.sse_clients)} clients remaining" ) # Generate a unique ID for this client client_id = int(time.time() * 1000) % 1000000 return Response(event_stream(client_id), mimetype="text/event-stream") @self.app.route("/api/viewChanged", methods=["POST"]) def view_changed(): data = request.json if data: if "center" in data: self.current_view["center"] = data["center"] if "zoom" in data: self.current_view["zoom"] = data["zoom"] if "bounds" in data: self.current_view["bounds"] = data["bounds"] return jsonify({"status": "success"}) @self.app.route("/api/screenshot", methods=["POST"]) def save_screenshot(): data = request.json if data and "image" in data: # Store the base64 image data self.latest_screenshot = data["image"] return jsonify({"status": "success"}) return jsonify({"status": "error", "message": "No image data provided"}), 400 @self.app.route("/api/geolocateResponse", methods=["POST"]) def geolocate_response(): data = request.json if data and "requestId" in data and "results" in data: request_id = data["requestId"] results = data["results"] # Store the response self.geolocate_responses[request_id] = results logger.info(f"Received geolocate response for request {request_id} with {len(results)} results") return jsonify({"status": "success"}) return jsonify({"status": "error", "message": "Invalid geolocate response data"}), 400 def is_port_in_use(self, port): """Check if a port is already in use""" with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: return s.connect_ex((self.host, port)) == 0 def start(self): """Start the Flask server in a separate thread""" # Try up to 10 ports, starting with self.port original_port = self.port max_attempts = 10 for attempt in range(max_attempts): if self.is_port_in_use(self.port): logger.info(f"Port {self.port} is already in use, trying port {self.port + 1}") self.port += 1 if attempt == max_attempts - 1: logger.error(f"Failed to find an available port after {max_attempts} attempts") # Reset port to original value self.port = original_port return False else: # Port is available, start the server def run_server(): # Redirect stdout to stderr while running Flask with redirect_stdout(sys.stderr): self.app.run( host=self.host, port=self.port, debug=False, use_reloader=False ) self.server_thread = threading.Thread(target=run_server) self.server_thread.daemon = True # Thread will exit when main thread exits self.server_thread.start() logger.info(f"Flask server started at http://{self.host}:{self.port}") return True return False def stop(self): """Stop the Flask server""" # Flask doesn't provide a clean way to stop the server from outside # In a production environment, you would use a more robust server like gunicorn # For this example, we'll rely on the daemon thread to exit when the main thread exits logger.info("Flask server stopping...") # Map control methods def send_map_command(self, command_type, data): """ Send a command to all connected SSE clients Args: command_type (str): Type of command (SHOW_POLYGON, SHOW_MARKER, SET_VIEW) data (dict): Data for the command """ command = {"type": command_type, "data": data} message = json.dumps(command) # Send the message to all connected clients clients_count = len(self.sse_clients) if clients_count == 0: logger.info("No connected clients to send message to") return logger.info(f"Sending {command_type} to {clients_count} clients") for client_id, client_queue in list(self.sse_clients.items()): try: client_queue.put(message) except Exception as e: logger.error(f"Error sending to client {client_id}: {e}") def show_polygon(self, coordinates, options=None): """ Display a polygon on the map Args: coordinates (list): List of [lat, lng] coordinates options (dict, optional): Styling options """ data = {"coordinates": coordinates, "options": options or {}} self.send_map_command("SHOW_POLYGON", data) def show_marker(self, coordinates, text=None, options=None): """ Display a marker on the map Args: coordinates (list): [lat, lng] coordinates text (str, optional): Popup text options (dict, optional): Styling options """ data = {"coordinates": coordinates, "text": text, "options": options or {}} self.send_map_command("SHOW_MARKER", data) def show_line(self, coordinates, options=None): """ Display a line (polyline) on the map Args: coordinates (list): List of [lat, lng] coordinates options (dict, optional): Styling options """ data = {"coordinates": coordinates, "options": options or {}} self.send_map_command("SHOW_LINE", data) def set_view(self, bounds=None, center=None, zoom=None): """ Set the map view Args: bounds (list, optional): [[south, west], [north, east]] center (list, optional): [lat, lng] center point zoom (int, optional): Zoom level """ data = {} if bounds: data["bounds"] = bounds if center: data["center"] = center if zoom: data["zoom"] = zoom self.send_map_command("SET_VIEW", data) def get_current_view(self): """ Get the current map view Returns: dict: Current view information """ return self.current_view def set_title(self, title, options=None): """ Set the map title displayed at the bottom right of the map Args: title (str): Title text to display options (dict, optional): Styling options like fontSize, color, etc. """ data = {"title": title, "options": options or {}} self.send_map_command("SET_TITLE", data) def capture_screenshot(self): """ Request a screenshot from the map and wait for it to be received Returns: str: Base64-encoded image data, or None if no screenshot is available """ # Send command to capture screenshot self.send_map_command("CAPTURE_SCREENSHOT", {}) # Wait for the screenshot to be received (with timeout) start_time = time.time() timeout = 5 # seconds while time.time() - start_time < timeout: if self.latest_screenshot: screenshot = self.latest_screenshot self.latest_screenshot = None # Clear after retrieving return screenshot time.sleep(0.1) logger.warning("Screenshot capture timed out") return None def geolocate(self, query): """ Send a geolocate request to the web client and wait for the response Args: query (str): The location name to search for Returns: list: Nominatim search results or None if the request times out """ # Generate a unique request ID request_id = str(int(time.time() * 1000)) # Send the geolocate command to the web client data = {"requestId": request_id, "query": query} self.send_map_command("GEOLOCATE", data) # Wait for the response (with timeout) start_time = time.time() timeout = 10 # seconds while time.time() - start_time < timeout: if request_id in self.geolocate_responses: results = self.geolocate_responses.pop(request_id) return results time.sleep(0.1) logger.warning(f"Geolocate request for '{query}' timed out") return None # For testing the Flask server directly if __name__ == "__main__": server = FlaskServer() server.start() # Keep the main thread running try: logger.info("Press Ctrl+C to stop the server") import time while True: time.sleep(1) except KeyboardInterrupt: server.stop() logger.info("Server stopped") ``` -------------------------------------------------------------------------------- /templates/index.html: -------------------------------------------------------------------------------- ```html <!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <title>OpenStreetMap Viewer</title> <!-- Leaflet CSS --> <link rel="stylesheet" href="https://unpkg.com/[email protected]/dist/leaflet.css" integrity="sha256-p4NxAoJBhIIN+hmNHrzRCf9tD/miZyoHS5obTRR9BMY=" crossorigin=""/> <!-- html2canvas for screenshot functionality --> <script src="https://html2canvas.hertzen.com/dist/html2canvas.min.js"></script> <style> body, html { margin: 0; padding: 0; height: 100%; width: 100%; } #map { height: 100vh; width: 100%; } .info-panel { padding: 10px; background: white; border-radius: 5px; box-shadow: 0 0 15px rgba(0,0,0,0.2); } .map-title { position: absolute; bottom: 20px; right: 20px; padding: 10px 15px; background-color: rgba(255, 255, 255, 0.8); border-radius: 5px; box-shadow: 0 0 15px rgba(0, 0, 0, 0.2); z-index: 1000; font-size: 24px; font-weight: bold; max-width: 50%; text-align: right; } .search-container { position: absolute; top: 20px; right: 20px; z-index: 1000; width: 300px; } .search-box { width: 100%; padding: 10px; border: none; border-radius: 5px; box-shadow: 0 0 15px rgba(0,0,0,0.2); font-size: 16px; } .search-results { margin-top: 5px; max-height: 300px; overflow-y: auto; background-color: white; border-radius: 5px; box-shadow: 0 0 15px rgba(0,0,0,0.2); display: none; } .search-result-item { padding: 10px; cursor: pointer; border-bottom: 1px solid #eee; } .search-result-item:hover { background-color: #f5f5f5; } .search-result-item:last-child { border-bottom: none; } </style> </head> <body> <div id="map"></div> <div id="mapTitle" class="map-title" style="display:none;"></div> <!-- Search Box --> <div class="search-container"> <input type="text" id="searchBox" class="search-box" placeholder="Search for a place..." autocomplete="off"> <div id="searchResults" class="search-results"></div> </div> <!-- Leaflet JavaScript --> <script src="https://unpkg.com/[email protected]/dist/leaflet.js" integrity="sha256-20nQCchB9co0qIjJZRGuk2/Z9VM+kNiyxNV1lvTlZBo=" crossorigin=""></script> <script> // Initialize the map const map = L.map('map').setView([0, 0], 2); // Add OpenStreetMap tile layer L.tileLayer('https://{s}.tile.openstreetmap.org/{z}/{x}/{y}.png', { attribution: '© <a href="https://www.openstreetmap.org/copyright">OpenStreetMap</a> contributors', maxZoom: 19 }).addTo(map); // Storage for created objects to allow removal/updates const mapObjects = { markers: {}, polygons: {}, lines: {} }; // Counter for generating unique IDs let objectCounter = 0; // Send initial view to server sendViewChanged(); // Map event handlers map.on('moveend', sendViewChanged); map.on('zoomend', sendViewChanged); // Function to send view changes to the server function sendViewChanged() { const center = map.getCenter(); const bounds = map.getBounds(); const zoom = map.getZoom(); fetch('/api/viewChanged', { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ center: [center.lat, center.lng], zoom: zoom, bounds: [ [bounds.getSouth(), bounds.getWest()], [bounds.getNorth(), bounds.getEast()] ] }) }) .catch(error => console.error('Error sending view change:', error)); } // Search functionality const searchBox = document.getElementById('searchBox'); const searchResults = document.getElementById('searchResults'); let searchTimeout = null; // Add event listener for search input searchBox.addEventListener('input', function() { // Clear previous timeout if (searchTimeout) { clearTimeout(searchTimeout); } const query = this.value.trim(); // Hide results if query is empty if (!query) { searchResults.style.display = 'none'; return; } // Set a timeout to avoid making too many requests while typing searchTimeout = setTimeout(() => { searchPlace(query); }, 500); }); // Function to search for a place using Nominatim function searchPlace(query) { // Nominatim API URL const url = `https://nominatim.openstreetmap.org/search?format=json&q=${encodeURIComponent(query)}&limit=5`; fetch(url) .then(response => response.json()) .then(data => { displaySearchResults(data); }) .catch(error => { console.error('Error searching for place:', error); }); } // Function to display search results function displaySearchResults(results) { // Clear previous results searchResults.innerHTML = ''; if (results.length === 0) { searchResults.innerHTML = '<div class="search-result-item">No results found</div>'; searchResults.style.display = 'block'; return; } // Create result items results.forEach(result => { const resultItem = document.createElement('div'); resultItem.className = 'search-result-item'; resultItem.textContent = result.display_name; // Add click event to go to the location resultItem.addEventListener('click', () => { goToLocation(result); searchResults.style.display = 'none'; searchBox.value = result.display_name; }); searchResults.appendChild(resultItem); }); // Show results searchResults.style.display = 'block'; } // Function to go to a location function goToLocation(location) { const lat = parseFloat(location.lat); const lon = parseFloat(location.lon); // Create a marker at the location const markerId = showMarker([lat, lon], location.display_name, { openPopup: true }); // Set the view to the location map.setView([lat, lon], 14); } // Close search results when clicking outside document.addEventListener('click', function(event) { if (!event.target.closest('.search-container')) { searchResults.style.display = 'none'; } }); // Function to capture and send a screenshot of the map function captureMapScreenshot() { console.log('Capturing map screenshot...'); // Use html2canvas to capture the map element html2canvas(document.getElementById('map')).then(canvas => { // Convert canvas to base64 image data const imageData = canvas.toDataURL('image/png'); // Send the image data to the server fetch('/api/screenshot', { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ image: imageData }) }) .then(response => response.json()) .then(data => { console.log('Screenshot sent to server:', data); }) .catch(error => { console.error('Error sending screenshot:', error); }); }); } // Function to perform a geolocate search and send results back to server function performGeolocateSearch(requestId, query) { console.log(`Performing geolocate search for "${query}" (request ID: ${requestId})`); // Nominatim API URL const url = `https://nominatim.openstreetmap.org/search?format=json&q=${encodeURIComponent(query)}&limit=10`; fetch(url) .then(response => response.json()) .then(data => { // Send the results back to the server fetch('/api/geolocateResponse', { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ requestId: requestId, results: data }) }) .then(response => response.json()) .then(data => { console.log('Geolocate response sent to server:', data); }) .catch(error => { console.error('Error sending geolocate response:', error); }); }) .catch(error => { console.error('Error performing geolocate search:', error); // Send empty results in case of error fetch('/api/geolocateResponse', { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ requestId: requestId, results: [] }) }); }); } // SSE connection to receive commands from the server function connectSSE() { const eventSource = new EventSource('/api/sse'); eventSource.onopen = function() { console.log('SSE connection established'); }; eventSource.onerror = function(error) { console.error('SSE connection error:', error); eventSource.close(); // Try to reconnect after a delay setTimeout(connectSSE, 5000); }; eventSource.onmessage = function(event) { try { const message = JSON.parse(event.data); handleServerCommand(message); } catch (error) { console.error('Error processing SSE message:', error); } }; } // Handle commands received from the server function handleServerCommand(message) { console.log('Received server command:', message); switch (message.type) { case 'SHOW_POLYGON': showPolygon(message.data.coordinates, message.data.options); break; case 'SHOW_MARKER': showMarker(message.data.coordinates, message.data.text, message.data.options); break; case 'SHOW_LINE': showLine(message.data.coordinates, message.data.options); break; case 'SET_VIEW': setView(message.data); break; case 'SET_TITLE': setTitle(message.data.title, message.data.options); break; case 'CAPTURE_SCREENSHOT': captureMapScreenshot(); break; case 'GEOLOCATE': performGeolocateSearch(message.data.requestId, message.data.query); break; case 'ping': // Just a keepalive, do nothing break; case 'connected': console.log('Connected to server with ID:', message.id); break; default: console.warn('Unknown command type:', message.type); } } // Function to show a polygon on the map function showPolygon(coordinates, options = {}) { const id = 'polygon_' + (objectCounter++); // Remove existing polygon with the same ID if it exists if (mapObjects.polygons[id]) { map.removeLayer(mapObjects.polygons[id]); } // Create and add the polygon const polygon = L.polygon(coordinates, options).addTo(map); mapObjects.polygons[id] = polygon; // Fit map to polygon bounds if requested if (options.fitBounds) { map.fitBounds(polygon.getBounds()); } return id; } // Function to show a marker on the map function showMarker(coordinates, text = null, options = {}) { const id = 'marker_' + (objectCounter++); // Remove existing marker with the same ID if it exists if (mapObjects.markers[id]) { map.removeLayer(mapObjects.markers[id]); } // Create and add the marker const marker = L.marker(coordinates, options).addTo(map); // Add popup if text is provided if (text) { marker.bindPopup(text); if (options.openPopup) { marker.openPopup(); } } mapObjects.markers[id] = marker; return id; } // Function to set the map view function setView(viewOptions) { if (viewOptions.bounds) { map.fitBounds(viewOptions.bounds); } else if (viewOptions.center && viewOptions.zoom) { map.setView(viewOptions.center, viewOptions.zoom); } else if (viewOptions.center) { map.panTo(viewOptions.center); } else if (viewOptions.zoom) { map.setZoom(viewOptions.zoom); } } // Function to set the map title function setTitle(title, options = {}) { const titleElement = document.getElementById('mapTitle'); if (!title) { titleElement.style.display = 'none'; return; } // Set title text titleElement.textContent = title; titleElement.style.display = 'block'; // Apply custom styling if provided if (options.fontSize) { titleElement.style.fontSize = options.fontSize; } if (options.color) { titleElement.style.color = options.color; } if (options.backgroundColor) { titleElement.style.backgroundColor = options.backgroundColor; } } // Function to show a line (polyline) on the map function showLine(coordinates, options = {}) { const id = 'line_' + (objectCounter++); // Remove existing line with the same ID if it exists if (mapObjects.lines[id]) { map.removeLayer(mapObjects.lines[id]); } // Create and add the line const line = L.polyline(coordinates, options).addTo(map); mapObjects.lines[id] = line; // Fit map to line bounds if requested if (options.fitBounds) { map.fitBounds(line.getBounds()); } return id; } // Connect to the SSE endpoint connectSSE(); </script> </body> </html> ``` -------------------------------------------------------------------------------- /mcp_osm/server.py: -------------------------------------------------------------------------------- ```python import logging import os import re import sys import time import json import base64 from dataclasses import dataclass from typing import Any, Dict, List, Optional, Tuple, AsyncIterator from contextlib import asynccontextmanager import psycopg2 import psycopg2.extras from mcp.server.fastmcp import Context, FastMCP from mcp_osm.flask_server import FlaskServer # Configure all logging to stderr logging.basicConfig( stream=sys.stderr, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) # Create a logger for this module logger = logging.getLogger(__name__) # Custom database connection class @dataclass class PostgresConnection: conn: Any async def execute_query( self, query: str, params: Optional[Dict[str, Any]] = None, max_rows: int = 1000 ) -> Tuple[List[Dict[str, Any]], int]: """Execute a query and return results as a list of dictionaries with total count.""" logger.info(f"Executing query: {query}, params: {params}") start_time = time.time() with self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: try: # Set statement timeout to 20 seconds cur.execute("SET statement_timeout = 20000") if params: cur.execute(query, params) else: cur.execute(query) end_time = time.time() logger.info(f"Query execution time: {end_time - start_time} seconds") total_rows = cur.rowcount results = cur.fetchmany(max_rows) logger.info(f"Got {total_rows} rows") # Log first 3 rows. for row in results[:3]: logger.info(f"Row: {row}") return results, total_rows except psycopg2.errors.QueryCanceled: self.conn.rollback() raise TimeoutError("Query execution timed out. Did you use a bounding box, and ::geography?") except Exception as e: self.conn.rollback() raise e async def get_tables(self) -> List[str]: """Get list of tables in the database.""" query = """ SELECT table_name FROM information_schema.tables WHERE table_schema = 'public' ORDER BY table_name; """ with self.conn.cursor() as cur: cur.execute(query) return [row[0] for row in cur.fetchall()] async def get_table_schema(self, table_name: str) -> List[Dict[str, Any]]: """Get schema information for a table.""" query = """ SELECT column_name, data_type, is_nullable FROM information_schema.columns WHERE table_name = %s ORDER BY ordinal_position; """ with self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute(query, (table_name,)) return cur.fetchall() async def get_table_info(self, table_name: str) -> Dict[str, Any]: """Get detailed information about a table including indexes.""" # Get table columns columns = await self.get_table_schema(table_name) # Get table indexes index_query = """ SELECT indexname, indexdef FROM pg_indexes WHERE tablename = %s; """ with self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute(index_query, (table_name,)) indexes = cur.fetchall() # Get table row count (approximate) count_query = f"SELECT count(*) FROM {table_name};" with self.conn.cursor() as cur: cur.execute(count_query) row_count = cur.fetchone()[0] return { "name": table_name, "columns": columns, "indexes": indexes, "approximate_row_count": row_count, } @dataclass class AppContext: db_conn: Optional[PostgresConnection] = None flask_server: Optional[FlaskServer] = None @asynccontextmanager async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]: """Manage application lifecycle with type-safe context""" app_ctx = AppContext() try: # Initialize database connection (optional) try: logger.info("Connecting to database...") conn = psycopg2.connect( host=os.environ.get("PGHOST", "localhost"), port=os.environ.get("PGPORT", "5432"), dbname=os.environ.get("PGDB", "osm"), user=os.environ.get("PGUSER", "postgres"), password=os.environ.get("PGPASSWORD", "postgres"), ) app_ctx.db_conn = PostgresConnection(conn) logger.info("Database connection established") except Exception as e: logger.warning(f"Warning: Could not connect to database: {e}") logger.warning("Continuing without database connection") # Initialize and start Flask server logger.info("Starting Flask server...") flask_server = FlaskServer( host=os.environ.get("FLASK_HOST", "127.0.0.1"), port=int(os.environ.get("FLASK_PORT","8888")) ) flask_server.start() app_ctx.flask_server = flask_server logger.info(f"Flask server started at http://{flask_server.host}:{flask_server.port}") yield app_ctx finally: # Cleanup on shutdown if app_ctx.flask_server: logger.info("Stopping Flask server...") app_ctx.flask_server.stop() if app_ctx.db_conn and app_ctx.db_conn.conn: logger.info("Closing database connection...") app_ctx.db_conn.conn.close() # Initialize the MCP server mcp = FastMCP("OSM MCP Server", dependencies=["psycopg2>=2.9.10", "flask>=3.1.0"], lifespan=app_lifespan) def is_read_only_query(query: str) -> bool: """Check if a query is read-only.""" # Normalize query by removing comments and extra whitespace query = re.sub(r"--.*$", "", query, flags=re.MULTILINE) query = re.sub(r"/\*.*?\*/", "", query, flags=re.DOTALL) query = query.strip().lower() # Check for write operations write_operations = [ r"^\s*insert\s+", r"^\s*update\s+", r"^\s*delete\s+", r"^\s*drop\s+", r"^\s*create\s+", r"^\s*alter\s+", r"^\s*truncate\s+", r"^\s*grant\s+", r"^\s*revoke\s+", r"^\s*set\s+", ] for pattern in write_operations: if re.search(pattern, query): return False return True # Database query tools @mcp.tool() async def query_osm_postgres(query: str, ctx: Context) -> str: """ Execute SQL query against the OSM PostgreSQL database. This database contains the complete OSM data in a postgres database, and is an excellent way to analyze or query geospatial/geographic data. Args: query: SQL query to execute Returns: Query results as formatted text Example query: Find points of interest near a location ```sql SELECT osm_id, name, amenity, tourism, shop, tags FROM planet_osm_point WHERE (amenity IS NOT NULL OR tourism IS NOT NULL OR shop IS NOT NULL) AND ST_DWithin( geography(way), geography(ST_SetSRID(ST_MakePoint(-73.99, 40.71), 4326)), 1000 -- 1000 meters ); ``` The database is in postgres using the postgis extension. It was created by the osm2pgsql tool. This database is a complete dump of the OSM data. In OpenStreetMap (OSM), data is structured using nodes (points), ways (lines/polygons), and relations. Nodes represent individual points with coordinates, while ways are ordered lists of nodes forming lines or closed shapes (polygons). Remember that name alone is not sufficient to disambiguate a feature. For any name you can think of, there are dozens of features around the world with that name, probably even of the same type (e.g. lots of cities named "Los Angeles"). If you know the general location, you can use a bounding box to disambiguate. YOU MUST DISAMBIGUATE FEATURES with bounding boxes!!!!!!!!!!!! Even if you have other WHERE clauses, you MUST use a bounding box to disambiguate features. Name and other tags alone are not sufficient. PostGIS has useful features like ST_Simplify which is especially helpful to reduce data to a reasonable size when doing visualizations. Always try to get and refer to OSM IDs when possible because they are unique and are the absolute fastest way to refer again to a feature. Users don't usually care what they are but they can help you speed up subsequent queries. YOU MUST DISAMBIGUATE FEATURES with bounding boxes!!!!!!!!!!!! Speaking of speed, there's a TON of data, so queries that don't use indexes will be too slow. It's usually best to use postgres and postgis functions, and advanced sql when possible. If you need to explore the data to get a sense of tags, etc., make sure to limit the number of rows you get back to a small number or use aggregation functions. Every query will either need to be filtered with WHERE clauses or be an aggregation query. YOU MUST DISAMBIGUATE FEATURES with bounding boxes!!!!!!!!!!!! IMPORTANT: All the spatial indexes are on the geography type, not the geometry type. This means if you do a spatial query, you need to use the geography function. For example: ``` SELECT b.osm_id AS building_id, b.name AS building_name, ST_AsText(b.way) AS building_geometry FROM planet_osm_polygon b JOIN planet_osm_polygon burbank ON burbank.osm_id = -3529574 JOIN planet_osm_polygon glendale ON glendale.osm_id = -2313082 WHERE ST_Intersects(b.way::geography, burbank.way::geography) AND ST_Intersects(b.way::geography, glendale.way::geography) AND b.building IS NOT NULL; ``` Here's a more detailed explanation of the data representation: • Nodes: [1, 2, 3] • Represent individual points on the map with latitude and longitude coordinates. [1, 2, 3] • Can be used to represent point features like shops, lamp posts, etc. [1] • Collections of nodes are also used to define the shape of ways. [1] • Ways: [1, 2] • Represent collections of nodes. [1, 2] • Do not store their own coordinates; instead, they store an ordered list of node identifiers. [1, 2] • Ways can be open (lines) or closed (polygons). [2, 5] • Used to represent various features like roads, railways, river centerlines, powerlines, and administrative borders. [1] • Relations: [4] • Are groups of nodes and/or ways, used to represent complex features like routes, areas, or relationships between map elements. [4] [1] https://algo.win.tue.nl/tutorials/openstreetmap/ [2] https://docs.geodesk.com/intro-to-osm [3] https://wiki.openstreetmap.org/wiki/Elements [4] https://racum.blog/articles/osm-to-geojson/ [5] https://wiki.openstreetmap.org/wiki/Way Tags are key-value pairs that describe the features in the map. They are used to store information about the features, such as their name, type, or other properties. Note that in the following tables, some tags have their own columns, but all other tags are stored in the tags column as a hstore type. List of tables: | Name | |--------------------| | planet_osm_line | | planet_osm_point | | planet_osm_polygon | | planet_osm_rels | | planet_osm_roads | | planet_osm_ways | | spatial_ref_sys | Table "public.planet_osm_line": | Column | Type | |--------------------+---------------------------| | osm_id | bigint | | access | text | | addr:housename | text | | addr:housenumber | text | | addr:interpolation | text | | admin_level | text | | aerialway | text | | aeroway | text | | amenity | text | | area | text | | barrier | text | | bicycle | text | | brand | text | | bridge | text | | boundary | text | | building | text | | construction | text | | covered | text | | culvert | text | | cutting | text | | denomination | text | | disused | text | | embankment | text | | foot | text | | generator:source | text | | harbour | text | | highway | text | | historic | text | | horse | text | | intermittent | text | | junction | text | | landuse | text | | layer | text | | leisure | text | | lock | text | | man_made | text | | military | text | | motorcar | text | | name | text | | natural | text | | office | text | | oneway | text | | operator | text | | place | text | | population | text | | power | text | | power_source | text | | public_transport | text | | railway | text | | ref | text | | religion | text | | route | text | | service | text | | shop | text | | sport | text | | surface | text | | toll | text | | tourism | text | | tower:type | text | | tracktype | text | | tunnel | text | | water | text | | waterway | text | | wetland | text | | width | text | | wood | text | | z_order | integer | | way_area | real | | tags | hstore | | way | geometry(LineString,4326) | Indexes: "planet_osm_line_osm_id_idx" btree (osm_id) "planet_osm_line_tags_idx" gin (tags) "planet_osm_line_way_geog_idx" gist (geography(way)) Table "public.planet_osm_point": | Column | Type | |--------------------+----------------------| | osm_id | bigint | | access | text | | addr:housename | text | | addr:housenumber | text | | addr:interpolation | text | | admin_level | text | | aerialway | text | | aeroway | text | | amenity | text | | area | text | | barrier | text | | bicycle | text | | brand | text | | bridge | text | | boundary | text | | building | text | | capital | text | | construction | text | | covered | text | | culvert | text | | cutting | text | | denomination | text | | disused | text | | ele | text | | embankment | text | | foot | text | | generator:source | text | | harbour | text | | highway | text | | historic | text | | horse | text | | intermittent | text | | junction | text | | landuse | text | | layer | text | | leisure | text | | lock | text | | man_made | text | | military | text | | motorcar | text | | name | text | | natural | text | | office | text | | oneway | text | | operator | text | | place | text | | population | text | | power | text | | power_source | text | | public_transport | text | | railway | text | | ref | text | | religion | text | | route | text | | service | text | | shop | text | | sport | text | | surface | text | | toll | text | | tourism | text | | tower:type | text | | tunnel | text | | water | text | | waterway | text | | wetland | text | | width | text | | wood | text | | z_order | integer | | tags | hstore | | way | geometry(Point,4326) | Indexes: "planet_osm_point_osm_id_idx" btree (osm_id) "planet_osm_point_tags_idx" gin (tags) "planet_osm_point_way_geog_idx" gist (geography(way)) Table "public.planet_osm_polygon": | Column | Type | |--------------------+-------------------------| | osm_id | bigint | | access | text | | addr:housename | text | | addr:housenumber | text | | addr:interpolation | text | | admin_level | text | | aerialway | text | | aeroway | text | | amenity | text | | area | text | | barrier | text | | bicycle | text | | brand | text | | bridge | text | | boundary | text | | building | text | | construction | text | | covered | text | | culvert | text | | cutting | text | | denomination | text | | disused | text | | embankment | text | | foot | text | | generator:source | text | | harbour | text | | highway | text | | historic | text | | horse | text | | intermittent | text | | junction | text | | landuse | text | | layer | text | | leisure | text | | lock | text | | man_made | text | | military | text | | motorcar | text | | name | text | | natural | text | | office | text | | oneway | text | | operator | text | | place | text | | population | text | | power | text | | power_source | text | | public_transport | text | | railway | text | | ref | text | | religion | text | | route | text | | service | text | | shop | text | | sport | text | | surface | text | | toll | text | | tourism | text | | tower:type | text | | tracktype | text | | tunnel | text | | water | text | | waterway | text | | wetland | text | | width | text | | wood | text | | z_order | integer | | way_area | real | | tags | hstore | | way | geometry(Geometry,4326) | Indexes: "planet_osm_polygon_osm_id_idx" btree (osm_id) "planet_osm_polygon_tags_idx" gin (tags) "planet_osm_polygon_way_geog_idx" gist (geography(way)) Table "public.planet_osm_rels": | Column | Type | |---------+----------| | id | bigint | | way_off | smallint | | rel_off | smallint | | parts | bigint[] | | members | text[] | | tags | text[] | Indexes: "planet_osm_rels_pkey" PRIMARY KEY, btree (id) "planet_osm_rels_parts_idx" gin (parts) WITH (fastupdate=off) """ # Check if database connection is available if not ctx.request_context.lifespan_context.db_conn: return "Database connection is not available. Please check your PostgreSQL server." enforce_read_only = True max_rows = 100 if enforce_read_only and not is_read_only_query(query): return "Error: Only read-only queries are allowed for security reasons." try: results, total_rows = await ctx.request_context.lifespan_context.db_conn.execute_query(query, max_rows=max_rows) if not results: return "Query executed successfully, but returned no results." # Format results as a table columns = list(results[0].keys()) rows = [[str(row.get(col, "")) for col in columns] for row in results] # Calculate column widths col_widths = [max(len(col), max([len(row[i]) for row in rows] + [0])) for i, col in enumerate(columns)] # Format header header = " | ".join(col.ljust(col_widths[i]) for i, col in enumerate(columns)) separator = "-+-".join("-" * width for width in col_widths) # Format rows formatted_rows = [ " | ".join(cell.ljust(col_widths[i]) for i, cell in enumerate(row)) for row in rows ] # Combine all parts table = f"{header}\n{separator}\n" + "\n".join(formatted_rows) # Add summary if total_rows > max_rows: table += f"\n\n(Showing {len(results)} of {total_rows} rows)" return table except Exception as e: return f"Error executing query: {str(e)}" # Map control tools @mcp.tool() async def set_map_view( ctx: Context, center: Optional[List[float]] = None, zoom: Optional[int] = None, bounds: Optional[List[List[float]]] = None ) -> str: """ Set the map view in the web interface. Args: center: [latitude, longitude] center point zoom: Zoom level (0-19) bounds: [[south, west], [north, east]] bounds to display Examples: - Set view to a specific location: `set_map_view(center=[37.7749, -122.4194], zoom=12)` - Set view to show a region: `set_map_view(bounds=[[37.7, -122.5], [37.8, -122.4]])` """ if not ctx.request_context.lifespan_context.flask_server: return "Map server is not available." # Validate parameters if center and (len(center) != 2 or not all(isinstance(c, (int, float)) for c in center)): return "Error: center must be a [latitude, longitude] pair of numbers." if zoom and (not isinstance(zoom, int) or zoom < 0 or zoom > 19): return "Error: zoom must be an integer between 0 and 19." if bounds: if (len(bounds) != 2 or len(bounds[0]) != 2 or len(bounds[1]) != 2 or not all(isinstance(c, (int, float)) for point in bounds for c in point)): return "Error: bounds must be [[south, west], [north, east]] coordinates." # At least one parameter must be provided if not center and zoom is None and not bounds: return "Error: at least one of center, zoom, or bounds must be provided." # Send the command to the map server = ctx.request_context.lifespan_context.flask_server server.set_view(bounds=bounds, center=center, zoom=zoom) # Generate success message message_parts = [] if bounds: message_parts.append(f"bounds={bounds}") if center: message_parts.append(f"center={center}") if zoom is not None: message_parts.append(f"zoom={zoom}") return f"Map view updated successfully: {', '.join(message_parts)}" @mcp.tool() async def set_map_title( ctx: Context, title: str, color: Optional[str] = None, font_size: Optional[str] = None, background_color: Optional[str] = None ) -> str: """ Set the title displayed at the bottom right of the map. Args: title: Text to display as the map title color: CSS color value for the text (e.g., "#0066cc", "red") font_size: CSS font size (e.g., "24px", "1.5em") background_color: CSS background color value (e.g., "rgba(255, 255, 255, 0.8)") Examples: - Set a basic title: `set_map_title("OpenStreetMap Viewer")` - Set a styled title: `set_map_title("San Francisco", color="#0066cc", font_size="28px")` """ if not ctx.request_context.lifespan_context.flask_server: return "Map server is not available." # Prepare options dictionary with only provided values options = {} if color: options["color"] = color if font_size: options["fontSize"] = font_size if background_color: options["backgroundColor"] = background_color # Send the command to the map server = ctx.request_context.lifespan_context.flask_server server.set_title(title, options) # Generate success message style_info = "" if options: style_parts = [] if color: style_parts.append(f"color: {color}") if font_size: style_parts.append(f"size: {font_size}") if background_color: style_parts.append(f"background: {background_color}") style_info = f" with {', '.join(style_parts)}" return f"Map title set to '{title}'{style_info}" @mcp.tool() async def add_map_marker( ctx: Context, coordinates: List[float], text: Optional[str] = None, title: Optional[str] = None, open_popup: bool = False ) -> str: """ Add a marker to the map at the specified coordinates. Args: coordinates: [latitude, longitude] location for the marker text: Text to display in a popup when the marker is clicked title: Tooltip text displayed on hover (optional) open_popup: Whether to automatically open the popup (default: False) Examples: - Add a simple marker: `add_map_marker([37.7749, -122.4194])` - Add a marker with popup: `add_map_marker([37.7749, -122.4194], text="San Francisco", open_popup=True)` """ if not ctx.request_context.lifespan_context.flask_server: return "Map server is not available." # Validate coordinates if len(coordinates) != 2 or not all(isinstance(c, (int, float)) for c in coordinates): return "Error: coordinates must be a [latitude, longitude] pair of numbers." # Prepare options options = {} if title: options["title"] = title options["openPopup"] = open_popup # Send the command to the map server = ctx.request_context.lifespan_context.flask_server server.show_marker(coordinates, text, options) # Generate success message details = [] if text: details.append(f"text: '{text}'") if title: details.append(f"title: '{title}'") details_str = f" with {', '.join(details)}" if details else "" return f"Marker added at coordinates [{coordinates[0]}, {coordinates[1]}]{details_str}" @mcp.tool() async def add_map_polygon( ctx: Context, coordinates: List[List[float]], color: Optional[str] = None, fill_color: Optional[str] = None, fill_opacity: Optional[float] = None, weight: Optional[int] = None, fit_bounds: bool = False ) -> str: """ Add a polygon to the map with the specified coordinates. If you're trying to add a polygon with more than 20 points, stop and use ST_Simplify to reduce the number of points. Args: coordinates: List of [latitude, longitude] points defining the polygon color: Border color (CSS color value) fill_color: Fill color (CSS color value) fill_opacity: Fill opacity (0.0 to 1.0) weight: Border width in pixels fit_bounds: Whether to zoom the map to show the entire polygon Examples: - Add a polygon: `add_map_polygon([[37.78, -122.41], [37.75, -122.41], [37.75, -122.45], [37.78, -122.45]])` - Add a styled polygon: `add_map_polygon([[37.78, -122.41], [37.75, -122.41], [37.75, -122.45]], color="red", fill_opacity=0.3)` """ if not ctx.request_context.lifespan_context.flask_server: return "Map server is not available." # Validate coordinates if not coordinates or not all(len(point) == 2 and all(isinstance(c, (int, float)) for c in point) for point in coordinates): return "Error: coordinates must be a list of [latitude, longitude] points." if len(coordinates) < 3: return "Error: a polygon requires at least 3 points." # Prepare options options = {} if color: options["color"] = color if fill_color: options["fillColor"] = fill_color if fill_opacity is not None: if not 0 <= fill_opacity <= 1: return "Error: fill_opacity must be between 0.0 and 1.0." options["fillOpacity"] = fill_opacity if weight is not None: if not isinstance(weight, int) or weight < 0: return "Error: weight must be a positive integer." options["weight"] = weight options["fitBounds"] = fit_bounds # Send the command to the map server = ctx.request_context.lifespan_context.flask_server server.show_polygon(coordinates, options) # Generate success message style_info = "" if any(key in options for key in ["color", "fillColor", "fillOpacity", "weight"]): style_parts = [] if color: style_parts.append(f"color: {color}") if fill_color: style_parts.append(f"fill: {fill_color}") if fill_opacity is not None: style_parts.append(f"opacity: {fill_opacity}") if weight is not None: style_parts.append(f"weight: {weight}") style_info = f" with {', '.join(style_parts)}" bounds_info = " (map zoomed to fit)" if fit_bounds else "" return f"Polygon added with {len(coordinates)} points{style_info}{bounds_info}" @mcp.tool() async def add_map_line( ctx: Context, coordinates: List[List[float]], color: Optional[str] = None, weight: Optional[int] = None, opacity: Optional[float] = None, dash_array: Optional[str] = None, fit_bounds: bool = False ) -> str: """ Add a line (polyline) to the map with the specified coordinates. If you're trying to add a line with more than 20 points, stop and use ST_Simplify to reduce the number of points. Args: coordinates: List of [latitude, longitude] points defining the line color: Line color (CSS color value) weight: Line width in pixels opacity: Line opacity (0.0 to 1.0) dash_array: SVG dash array pattern for creating dashed lines (e.g., "5,10") fit_bounds: Whether to zoom the map to show the entire line Examples: - Add a simple line: `add_map_line([[37.78, -122.41], [37.75, -122.41], [37.75, -122.45]])` - Add a styled line: `add_map_line([[37.78, -122.41], [37.75, -122.41]], color="blue", weight=3, dash_array="5,10")` """ if not ctx.request_context.lifespan_context.flask_server: return "Map server is not available." # Validate coordinates if not coordinates or not all(len(point) == 2 and all(isinstance(c, (int, float)) for c in point) for point in coordinates): return "Error: coordinates must be a list of [latitude, longitude] points." if len(coordinates) < 2: return "Error: a line requires at least 2 points." # Prepare options options = {} if color: options["color"] = color if weight is not None: if not isinstance(weight, int) or weight < 0: return "Error: weight must be a positive integer." options["weight"] = weight if opacity is not None: if not 0 <= opacity <= 1: return "Error: opacity must be between 0.0 and 1.0." options["opacity"] = opacity if dash_array: options["dashArray"] = dash_array options["fitBounds"] = fit_bounds # Send the command to the map server = ctx.request_context.lifespan_context.flask_server server.show_line(coordinates, options) # Generate success message style_info = "" if any(key in options for key in ["color", "weight", "opacity", "dashArray"]): style_parts = [] if color: style_parts.append(f"color: {color}") if weight is not None: style_parts.append(f"weight: {weight}") if opacity is not None: style_parts.append(f"opacity: {opacity}") if dash_array: style_parts.append(f"dash pattern: {dash_array}") style_info = f" with {', '.join(style_parts)}" bounds_info = " (map zoomed to fit)" if fit_bounds else "" return f"Line added with {len(coordinates)} points{style_info}{bounds_info}" @mcp.tool() async def get_map_view(ctx: Context) -> str: """ Get the current map view information including center coordinates, zoom level, and bounds. The user can pan and zoom the map at will, at any time, so if you ever need to know the current view, call this tool. Returns: JSON string containing the current map view information Examples: - Get current view: `get_map_view()` """ if not ctx.request_context.lifespan_context.flask_server: return "Map server is not available." # Get the current view from the map server server = ctx.request_context.lifespan_context.flask_server view_info = server.get_current_view() # Format the response response = { "center": view_info.get("center"), "zoom": view_info.get("zoom"), "bounds": view_info.get("bounds") } return json.dumps(response, indent=2) # @mcp.tool() # async def get_map_screenshot(ctx: Context) -> str: # """ # Capture a screenshot of the current map view and return it as a # base64-encoded image. # This function requests a screenshot from the map interface and returns it in # a format that can be displayed in the conversation. The screenshot shows the # exact current state of the map including all markers, polygons, lines, and # the current view. Don't use this tool to verify your actions, only use it if # the user asks for something like "What's this thing on the map?" # Returns: # A markdown string with the embedded image # Examples: # - Capture the current map view: `get_map_screenshot()` # """ # if not ctx.request_context.lifespan_context.flask_server: # return "Map server is not available." # # Get the Flask server instance # server = ctx.request_context.lifespan_context.flask_server # # Request a screenshot from the map # image_data = server.capture_screenshot() # if not image_data: # return "Failed to capture map screenshot. Make sure the map is visible in a browser." # # The image data already includes the data:image/png;base64, prefix # # Return as markdown image # return f"" @mcp.tool() async def geolocate(ctx: Context, name: str) -> str: """ Look up a location by name using the Nominatim geocoding service. This is the preferred way to look up a feature by name. Args: name: The name of the location to search for Returns: JSON string containing the Nominatim search results Examples: - Find a city: `geolocate("San Francisco")` - Find a landmark: `geolocate("Eiffel Tower")` - Find a country: `geolocate("New Zealand")` """ if not ctx.request_context.lifespan_context.flask_server: return "Map server is not available." # Get the Flask server instance server = ctx.request_context.lifespan_context.flask_server # Send the geolocate request to the web client via the Flask server results = server.geolocate(name) if results is None: return "Geolocate request timed out or failed. Make sure the map is visible in a browser." if not results: return f"No results found for '{name}'." # Format the results as JSON return json.dumps(results, indent=2) def run_server(): """Run the MCP server""" mcp.run() if __name__ == "__main__": run_server() ```