# Directory Structure

```
├── .env.example
├── clean.sh
├── iot_mcp_server.py
├── light_bulb_simulator.py
├── mem0_mcp.egg-info
│   ├── dependency_links.txt
│   ├── PKG-INFO
│   ├── requires.txt
│   ├── SOURCES.txt
│   └── top_level.txt
├── memory_mcp_server.py
├── pyproject.toml
├── README.md
├── requirements.txt
├── templates
│   └── index.html
└── utils.py
```

# Files

--------------------------------------------------------------------------------
/.env.example:
--------------------------------------------------------------------------------

```
# IoT MCP Server configuration
MQTT_BROKER=localhost
MQTT_PORT=1883
HOST=0.0.0.0
PORT=8090
TRANSPORT=sse

# Memory MCP Server configuration
HOST=0.0.0.0
PORT=8050
TRANSPORT=sse

# LLM Configuration for Memory Server
LLM_PROVIDER=openai  # Options: openai, openrouter, ollama
LLM_API_KEY=your_api_key_here
LLM_CHOICE=gpt-4  # Model name (gpt-4, llama3, etc.)
LLM_BASE_URL=http://localhost:11434  # For Ollama

# Embedding Model Configuration
EMBEDDING_MODEL_CHOICE=text-embedding-3-small  # For OpenAI
# EMBEDDING_MODEL_CHOICE=nomic-embed-text  # For Ollama

# Vector Database Configuration
DATABASE_URL=postgresql://user:password@localhost:5432/vector_db
```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# MCP Servers for IoT and Memory Management

This repository contains two Model Context Protocol (MCP) servers:
1. IoT Device Control MCP Server
2. Memory Management MCP Server

## IoT Device Control MCP Server

A Model Context Protocol (MCP) server for controlling and monitoring IoT devices such as smart lights, sensors, and other connected devices.

### Purpose
This server provides a standardized interface for IoT device control, monitoring, and state management through the Model Context Protocol.
### Use Cases
- Home automation
- Industrial IoT monitoring
- Remote device management
- Smart building control systems

### Features
- Send commands to IoT devices
- Query device state and status
- Subscribe to real-time device updates
- Support for MQTT protocol

### API Tools
- `send_command`: Send a command to an IoT device
- `get_device_state`: Get the current state of an IoT device
- `subscribe_to_updates`: Subscribe to real-time updates from a device

## Memory Management MCP Server

A Model Context Protocol (MCP) server for persistent memory storage and retrieval using the Mem0 framework.

### Purpose
This server enables long-term memory storage and semantic search capabilities through the Model Context Protocol.

### Use Cases
- Conversation history storage
- Knowledge management
- Contextual awareness in AI applications
- Persistent information storage

### Features
- Save information to long-term memory
- Retrieve all stored memories
- Search memories using semantic search

### API Tools
- `save_memory`: Save information to long-term memory
- `get_all_memories`: Get all stored memories for the user
- `search_memories`: Search memories using semantic search

## Getting Started

1. Clone this repository
2. Install dependencies: `pip install -r requirements.txt`
3. Create a `.env` file based on the `.env.example` template
4. Run the IoT server: `python iot_mcp_server.py`
5. Run the Memory server: `python memory_mcp_server.py`

## Environment Variables

### IoT MCP Server
- `MQTT_BROKER`: MQTT broker address (default: "localhost")
- `MQTT_PORT`: MQTT broker port (default: 1883)
- `HOST`: Server host address (default: "0.0.0.0")
- `PORT`: Server port (default: "8090")
- `TRANSPORT`: Transport type, "sse" or "stdio" (default: "sse")

### Memory MCP Server
- `LLM_PROVIDER`: LLM provider, one of "openai", "openrouter", or "ollama"
- `LLM_API_KEY`: API key for the LLM provider
- `LLM_CHOICE`: Model name (for example "gpt-4" or "llama3")
- `LLM_BASE_URL`: Base URL of the LLM service (used for Ollama)
- `EMBEDDING_MODEL_CHOICE`: Embedding model name (default: "text-embedding-3-small" for OpenAI, "nomic-embed-text" for Ollama)
- `DATABASE_URL`: PostgreSQL connection string for the vector store
- `HOST`: Server host address (default: "0.0.0.0")
- `PORT`: Server port (default: "8050")
- `TRANSPORT`: Transport type, "sse" or "stdio" (default: "sse")

## Repository Structure

- `iot_mcp_server.py` - IoT device control MCP server implementation
- `memory_mcp_server.py` - Memory management MCP server implementation
- `utils.py` - Utility functions used by the servers
- `requirements.txt` - Package dependencies
- `.env.example` - Template for environment variables configuration
- `README.md` - Documentation
```

--------------------------------------------------------------------------------
/mem0_mcp.egg-info/dependency_links.txt:
--------------------------------------------------------------------------------

```

```

--------------------------------------------------------------------------------
/mem0_mcp.egg-info/top_level.txt:
--------------------------------------------------------------------------------

```
templates
```

--------------------------------------------------------------------------------
/mem0_mcp.egg-info/requires.txt:
--------------------------------------------------------------------------------

```
httpx>=0.28.1
mcp[cli]>=1.3.0
mem0ai>=0.1.88
vecs>=0.4.5
```

--------------------------------------------------------------------------------
/clean.sh:
--------------------------------------------------------------------------------

```bash
# -prune keeps find from descending into directories it is about to delete
find . -type d -name "__pycache__" -prune -exec rm -rf {} +
rm -fr venv
```

--------------------------------------------------------------------------------
/mem0_mcp.egg-info/SOURCES.txt:
--------------------------------------------------------------------------------

```
README.md
pyproject.toml
mem0_mcp.egg-info/PKG-INFO
mem0_mcp.egg-info/SOURCES.txt
mem0_mcp.egg-info/dependency_links.txt
mem0_mcp.egg-info/requires.txt
mem0_mcp.egg-info/top_level.txt
```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[project]
name = "mem0-mcp"
version = "0.1.0"
description = "MCP server for integrating long term memory into AI agents with Mem0"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
    "httpx>=0.28.1",
    "mcp[cli]>=1.3.0",
    "mem0ai>=0.1.88",
    "vecs>=0.4.5"
]
```

--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------

```
fastmcp>=0.1.0
paho-mqtt>=2.0.0
python-dotenv>=1.0.0
# mem0>=0.2.0  # Commented out as it's not available on PyPI
# psycopg2-binary>=2.9.6  # Only needed for memory server
# openai>=1.0.0  # Only needed for memory server
setuptools>=65.5.1  # Required by some dependencies
flask>=2.0.0  # For the light bulb simulator web server
```

--------------------------------------------------------------------------------
/utils.py:
--------------------------------------------------------------------------------

```python
from mem0 import Memory
import os
from dotenv import load_dotenv

# Custom instructions for memory processing
# These aren't being used right now but Mem0 does support adding custom prompting
# for handling memory retrieval and processing.
CUSTOM_INSTRUCTIONS = """
Extract the Following Information:

- Key Information: Identify and save the most important details.
- Context: Capture the surrounding context to understand the memory's relevance.
- Connections: Note any relationships to other topics or memories.
- Importance: Highlight why this information might be valuable in the future.
- Source: Record where this information came from when applicable.
"""

def get_mem0_client():
    load_dotenv()

    # Get LLM provider and configuration
    llm_provider = os.getenv('LLM_PROVIDER')
    llm_api_key = os.getenv('LLM_API_KEY')
    llm_model = os.getenv('LLM_CHOICE')
    embedding_model = os.getenv('EMBEDDING_MODEL_CHOICE')

    # Initialize config dictionary
    config = {}

    # Configure LLM based on provider
    if llm_provider == 'openai' or llm_provider == 'openrouter':
        config["llm"] = {
            "provider": "openai",
            "config": {
                "model": llm_model,
                "temperature": 0.2,
                "max_tokens": 2000,
            }
        }

        # Set API key in environment if not already set
        if llm_api_key and not os.environ.get("OPENAI_API_KEY"):
            os.environ["OPENAI_API_KEY"] = llm_api_key

        # For OpenRouter, set the specific API key
        if llm_provider == 'openrouter' and llm_api_key:
            os.environ["OPENROUTER_API_KEY"] = llm_api_key

    elif llm_provider == 'ollama':
        config["llm"] = {
            "provider": "ollama",
            "config": {
                "model": llm_model,
                "temperature": 0.2,
                "max_tokens": 2000,
            }
        }

        # Set base URL for Ollama if provided
        llm_base_url = os.getenv('LLM_BASE_URL')
        if llm_base_url:
            config["llm"]["config"]["ollama_base_url"] = llm_base_url

    # Configure embedder based on provider
    if llm_provider == 'openai':
        config["embedder"] = {
            "provider": "openai",
            "config": {
                "model": embedding_model or "text-embedding-3-small",
                "embedding_dims": 1536  # Default for text-embedding-3-small
            }
        }

        # Set API key in environment if not already set
        if llm_api_key and not os.environ.get("OPENAI_API_KEY"):
            os.environ["OPENAI_API_KEY"] = llm_api_key

    elif llm_provider == 'ollama':
        config["embedder"] = {
            "provider": "ollama",
            "config": {
                "model": embedding_model or "nomic-embed-text",
                "embedding_dims": 768  # Default for nomic-embed-text
            }
        }

        # Set base URL for Ollama if provided
        embedding_base_url = os.getenv('LLM_BASE_URL')
        if embedding_base_url:
            config["embedder"]["config"]["ollama_base_url"] = embedding_base_url

    # Configure Supabase vector store
    config["vector_store"] = {
        "provider": "supabase",
        "config": {
            "connection_string": os.environ.get('DATABASE_URL', ''),
            "collection_name": "mem0_memories",
            "embedding_model_dims": 1536 if llm_provider == "openai" else 768
        }
    }

    # config["custom_fact_extraction_prompt"] = CUSTOM_INSTRUCTIONS

    # Create and return the Memory client
    return Memory.from_config(config)
```

--------------------------------------------------------------------------------
/memory_mcp_server.py:
--------------------------------------------------------------------------------

```python
from mcp.server.fastmcp import FastMCP, Context
from contextlib import asynccontextmanager
from collections.abc import AsyncIterator
from dataclasses import dataclass
from dotenv import load_dotenv
from mem0 import Memory
import asyncio
import json
import os

from utils import get_mem0_client

load_dotenv()

# Default user ID for memory operations
DEFAULT_USER_ID = "user"

# Create a dataclass for our application context
@dataclass
class Mem0Context:
    """Context for the Mem0 MCP server."""
    mem0_client: Memory

@asynccontextmanager
async def mem0_lifespan(server: FastMCP) -> AsyncIterator[Mem0Context]:
    """
    Manages the Mem0 client lifecycle.
    Args:
        server: The FastMCP server instance

    Yields:
        Mem0Context: The context containing the Mem0 client
    """
    # Create and return the Memory client with the helper function in utils.py
    mem0_client = get_mem0_client()

    try:
        yield Mem0Context(mem0_client=mem0_client)
    finally:
        # No explicit cleanup needed for the Mem0 client
        pass

# Initialize FastMCP server with the Mem0 client as context
mcp = FastMCP(
    "mcp-mem0",
    description="MCP server for long term memory storage and retrieval with Mem0",
    lifespan=mem0_lifespan,
    host=os.getenv("HOST", "0.0.0.0"),
    port=os.getenv("PORT", "8050")
)

@mcp.tool()
async def save_memory(ctx: Context, text: str) -> str:
    """Save information to your long-term memory.

    This tool is designed to store any type of information that might be useful in the future.
    The content will be processed and indexed for later retrieval through semantic search.

    Args:
        ctx: The MCP server provided context which includes the Mem0 client
        text: The content to store in memory, including any relevant details and context
    """
    try:
        mem0_client = ctx.request_context.lifespan_context.mem0_client
        messages = [{"role": "user", "content": text}]
        mem0_client.add(messages, user_id=DEFAULT_USER_ID)
        return f"Successfully saved memory: {text[:100]}..." if len(text) > 100 else f"Successfully saved memory: {text}"
    except Exception as e:
        return f"Error saving memory: {str(e)}"

@mcp.tool()
async def get_all_memories(ctx: Context) -> str:
    """Get all stored memories for the user.

    Call this tool when you need complete context of all previously stored memories.

    Args:
        ctx: The MCP server provided context which includes the Mem0 client

    Returns a JSON formatted list of all stored memories, including when they were created
    and their content. Results are paginated with a default of 50 items per page.
    """
    try:
        mem0_client = ctx.request_context.lifespan_context.mem0_client
        memories = mem0_client.get_all(user_id=DEFAULT_USER_ID)
        if isinstance(memories, dict) and "results" in memories:
            flattened_memories = [memory["memory"] for memory in memories["results"]]
        else:
            flattened_memories = memories
        return json.dumps(flattened_memories, indent=2)
    except Exception as e:
        return f"Error retrieving memories: {str(e)}"

@mcp.tool()
async def search_memories(ctx: Context, query: str, limit: int = 3) -> str:
    """Search memories using semantic search.

    This tool should be called to find relevant information from your memory. Results are ranked by relevance.
    Always search your memories before making decisions to ensure you leverage your existing knowledge.

    Args:
        ctx: The MCP server provided context which includes the Mem0 client
        query: Search query string describing what you're looking for. Can be natural language.
        limit: Maximum number of results to return (default: 3)
    """
    try:
        mem0_client = ctx.request_context.lifespan_context.mem0_client
        memories = mem0_client.search(query, user_id=DEFAULT_USER_ID, limit=limit)
        if isinstance(memories, dict) and "results" in memories:
            flattened_memories = [memory["memory"] for memory in memories["results"]]
        else:
            flattened_memories = memories
        return json.dumps(flattened_memories, indent=2)
    except Exception as e:
        return f"Error searching memories: {str(e)}"

async def main():
    transport = os.getenv("TRANSPORT", "sse")
    if transport == 'sse':
        # Run the MCP server with sse transport
        await mcp.run_sse_async()
    else:
        # Run the MCP server with stdio transport
        await mcp.run_stdio_async()

if __name__ == "__main__":
    asyncio.run(main())
```

--------------------------------------------------------------------------------
/light_bulb_simulator.py:
--------------------------------------------------------------------------------

```python
from flask import Flask, render_template, request, jsonify
import paho.mqtt.client as mqtt
import json
import threading
import logging
import os
import time
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Configure logging
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

app = Flask(__name__)

# MQTT Settings
MQTT_BROKER = os.getenv("MQTT_BROKER", "localhost")
MQTT_PORT = int(os.getenv("MQTT_PORT", "1883"))
DEVICE_ID = "light_bulb_001"
COMMAND_TOPIC = f"devices/{DEVICE_ID}/command"
STATE_TOPIC = f"devices/{DEVICE_ID}/state"
STATE_REQUEST_TOPIC = f"devices/{DEVICE_ID}/state/request"

# Global device state
device_state = {
    "device_id": DEVICE_ID,
    "type": "light",  # Add device type
    "online": True,
    "last_seen": None,
    "properties": {
        "power": False,  # False is off, True is on
        "brightness": 100,
        "color": "#FFFF00"  # Yellow light
    }
}

# Initialize MQTT client. paho-mqtt 2.x takes the callback API version as its first
# argument; VERSION1 matches the (client, userdata, flags, rc) callbacks used below.
mqtt_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, protocol=mqtt.MQTTv311)

# Set up MQTT callbacks
def on_connect(client, userdata, flags, rc):
    if rc == 0:
        logger.info(f"Connected to MQTT broker at {MQTT_BROKER}:{MQTT_PORT}")
        # Subscribe to command topic
        client.subscribe(COMMAND_TOPIC)
        client.subscribe(STATE_REQUEST_TOPIC)
        # Subscribe to broadcast requests
        client.subscribe("devices/broadcast/request")
        # Publish initial state immediately after connection
        publish_state()
    else:
        logger.error(f"Failed to connect to MQTT broker with result code {rc}")

def on_message(client, userdata, message):
    topic = message.topic
    payload = message.payload.decode("utf-8")
    logger.info(f"Received message on {topic}: {payload}")

    try:
        # Handle command messages
        if topic == COMMAND_TOPIC:
            handle_command(payload)
        # Handle state request messages
        elif topic == STATE_REQUEST_TOPIC:
            publish_state()
        # Handle broadcast requests
        elif topic == "devices/broadcast/request":
            # Parse the message
            msg_data = json.loads(payload)
            action = msg_data.get("action")

            # If action is to report state, publish our state
            if action == "report_state":
                logger.info(f"Responding to broadcast request: {msg_data.get('request_id')}")
                publish_state()

                # Also publish to the broadcast response topic
                response = {
                    "device_id": DEVICE_ID,
                    "response_to": msg_data.get("request_id"),
                    "state": device_state
                }
                mqtt_client.publish("devices/broadcast/response", json.dumps(response))
    except Exception as e:
        logger.error(f"Error processing message: {str(e)}")

def handle_command(payload_str):
    try:
        payload = json.loads(payload_str)
        command = payload.get("command")

        logger.info(f"Processing command: {command} with payload: {payload_str}")

        if command == "toggle":
            # Toggle power state
            device_state["properties"]["power"] = not device_state["properties"]["power"]
            logger.info(f"Toggled light bulb power to: {device_state['properties']['power']}")
        elif command == "set_power" and "payload" in payload:
            # Set power state directly
            power_state = payload["payload"].get("power", False)
            old_state = device_state["properties"]["power"]
            device_state["properties"]["power"] = bool(power_state)
            logger.info(f"Set light bulb power from {old_state} to {device_state['properties']['power']}")
        elif command == "set_brightness" and "payload" in payload:
            # Set brightness
            brightness = payload["payload"].get("brightness", 100)
            device_state["properties"]["brightness"] = max(0, min(100, int(brightness)))
            logger.info(f"Set light bulb brightness to: {device_state['properties']['brightness']}")
        elif command == "set_color" and "payload" in payload:
            # Set color
            color = payload["payload"].get("color")
            if color:
                device_state["properties"]["color"] = color
                logger.info(f"Set light bulb color to: {device_state['properties']['color']}")
        else:
            logger.warning(f"Unknown command or missing payload: {command}")

        # Publish updated state
        publish_state()
    except json.JSONDecodeError:
        logger.error(f"Invalid JSON payload: {payload_str}")
    except Exception as e:
        logger.error(f"Error handling command: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())

def publish_state():
    """Publish current state to MQTT"""
    try:
        # Update last_seen timestamp
        device_state["last_seen"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
        mqtt_client.publish(STATE_TOPIC, json.dumps(device_state))
        logger.info(f"Published state: {json.dumps(device_state)}")
    except Exception as e:
        logger.error(f"Error publishing state: {str(e)}")

# Heartbeat function to periodically publish state
def heartbeat():
    while True:
        try:
            publish_state()
        except Exception as e:
            logger.error(f"Error in heartbeat: {str(e)}")
        time.sleep(60)  # Publish state every 60 seconds

# Routes
@app.route('/')
def index():
    return render_template('index.html', device_state=device_state)

@app.route('/api/state', methods=['GET'])
def get_state():
    return jsonify(device_state)

@app.route('/api/toggle', methods=['POST'])
def toggle_light():
    device_state["properties"]["power"] = not device_state["properties"]["power"]
    publish_state()
    return jsonify({"success": True, "power": device_state["properties"]["power"]})

@app.route('/api/brightness', methods=['POST'])
def set_brightness():
    data = request.json
    brightness = data.get('brightness', 100)
    device_state["properties"]["brightness"] = max(0, min(100, int(brightness)))
    publish_state()
    return jsonify({"success": True, "brightness": device_state["properties"]["brightness"]})

@app.route('/api/color', methods=['POST'])
def set_color():
    data = request.json
    color = data.get('color', "#FFFF00")
    device_state["properties"]["color"] = color
    publish_state()
    return jsonify({"success": True, "color": device_state["properties"]["color"]})

def start_mqtt():
    mqtt_client.on_connect = on_connect
    mqtt_client.on_message = on_message

    try:
        mqtt_client.connect(MQTT_BROKER, MQTT_PORT, 60)
        mqtt_client.loop_start()
    except Exception as e:
        logger.error(f"Failed to connect to MQTT broker: {str(e)}")

if __name__ == '__main__':
    # Start MQTT client in a separate thread
    mqtt_thread = threading.Thread(target=start_mqtt)
    mqtt_thread.daemon = True
    mqtt_thread.start()

    # Start heartbeat in a separate thread
    heartbeat_thread = threading.Thread(target=heartbeat)
    heartbeat_thread.daemon = True
    heartbeat_thread.start()

    # Give MQTT client time to connect and publish initial state
    time.sleep(2)

    # Start Flask web server
    app.run(host='0.0.0.0', port=7003, debug=True, use_reloader=False)
```

--------------------------------------------------------------------------------
/templates/index.html:
--------------------------------------------------------------------------------

```html
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>IoT Light Bulb Simulator</title>
    <style>
        body {
            font-family: Arial, sans-serif;
            max-width: 800px;
            margin: 0 auto;
            padding: 20px;
            text-align: center;
            background-color: #f5f5f5;
        }
        .container {
            background-color: white;
            border-radius: 8px;
            padding: 20px;
            box-shadow: 0 2px 10px rgba(0, 0, 0, 0.1);
        }
        h1 {
            color: #333;
        }
        .light-bulb {
            width: 100px;
            height: 150px;
            margin: 30px auto;
            position: relative;
        }
        .bulb {
            width: 80px;
            height: 80px;
            background-color: #eee;
            border-radius: 50%;
            margin: 0 auto;
            position: relative;
            transition: all 0.3s ease;
            box-shadow: 0 0 10px rgba(0, 0, 0, 0.1);
        }
        .bulb.on {
            box-shadow: 0 0 30px;
        }
        .base {
            width: 30px;
            height: 20px;
            background-color: #555;
            margin: 0 auto;
            border-radius: 3px;
        }
        .controls {
            margin-top: 40px;
            display: flex;
            flex-direction: column;
            align-items: center;
            gap: 20px;
        }
        .toggle-btn {
            background-color: #4CAF50;
            color: white;
            border: none;
            padding: 10px 20px;
            border-radius: 4px;
            cursor: pointer;
            font-size: 16px;
            transition: background-color 0.3s;
        }
        .toggle-btn:hover {
            background-color: #45a049;
        }
        .slider-container {
            width: 100%;
            max-width: 300px;
        }
        .slider {
            width: 100%;
            height: 25px;
            background: #d3d3d3;
            outline: none;
            -webkit-appearance: none;
            border-radius: 10px;
        }
        .slider::-webkit-slider-thumb {
            -webkit-appearance: none;
            appearance: none;
            width: 25px;
            height: 25px;
            background: #4CAF50;
            cursor: pointer;
            border-radius: 50%;
        }
        .slider::-moz-range-thumb {
            width: 25px;
            height: 25px;
            background: #4CAF50;
            cursor: pointer;
            border-radius: 50%;
        }
        .color-picker {
            margin-top: 10px;
        }
        .status {
            margin-top: 20px;
            padding: 10px;
            background-color: #f0f0f0;
            border-radius: 4px;
            font-family: monospace;
            text-align: left;
            max-height: 200px;
            overflow-y: auto;
        }
        .mqtt-info {
            margin-top: 20px;
            background-color: #e8f4f8;
            padding: 10px;
            border-radius: 4px;
            font-size: 0.9em;
            text-align: left;
        }
    </style>
</head>
<body>
    <div class="container">
        <h1>IoT Light Bulb Simulator</h1>
        <p>Device ID: <strong id="device-id">{{ device_state.device_id }}</strong></p>

        <div class="light-bulb">
            <div id="bulb" class="bulb {% if device_state.properties.power %}on{% endif %}"
                 style="background-color: {% if device_state.properties.power %}{{ device_state.properties.color }}{% else %}#eee{% endif %}; opacity: {% if device_state.properties.power %}{{ device_state.properties.brightness/100 }}{% else %}1{% endif %};">
            </div>
            <div class="base"></div>
        </div>

        <div class="controls">
            <button id="toggle-btn" class="toggle-btn">
                {% if device_state.properties.power %}Turn Off{% else %}Turn On{% endif %}
            </button>

            <div class="slider-container">
                <label for="brightness">Brightness: <span id="brightness-value">{{ device_state.properties.brightness }}</span>%</label>
                <input type="range" min="1" max="100" value="{{ device_state.properties.brightness }}" class="slider" id="brightness">
            </div>

            <div class="color-picker">
                <label for="color">Color:</label>
                <input type="color" id="color" value="{{ device_state.properties.color }}">
            </div>
        </div>

        <div class="status">
            <h3>Device State:</h3>
            <pre id="state-display">{{ device_state | tojson(indent=2) }}</pre>
        </div>

        <div class="mqtt-info">
            <h3>MQTT Information:</h3>
            <p><strong>Broker:</strong> {{ device_state.mqtt_broker if device_state.mqtt_broker else 'localhost' }}:{{ device_state.mqtt_port if device_state.mqtt_port else '1883' }}</p>
            <p><strong>Command Topic:</strong>
            devices/{{ device_state.device_id }}/command</p>
            <p><strong>State Topic:</strong> devices/{{ device_state.device_id }}/state</p>
            <p><strong>State Request Topic:</strong> devices/{{ device_state.device_id }}/state/request</p>
        </div>
    </div>

    <script>
        // Elements
        const bulb = document.getElementById('bulb');
        const toggleBtn = document.getElementById('toggle-btn');
        const brightnessSlider = document.getElementById('brightness');
        const brightnessValue = document.getElementById('brightness-value');
        const colorPicker = document.getElementById('color');
        const stateDisplay = document.getElementById('state-display');

        // Current state
        let deviceState = {{ device_state | tojson }};

        // Update UI based on state
        function updateUI() {
            // Update bulb appearance
            if (deviceState.properties.power) {
                bulb.classList.add('on');
                bulb.style.backgroundColor = deviceState.properties.color;
                bulb.style.opacity = deviceState.properties.brightness / 100;
                toggleBtn.textContent = 'Turn Off';
            } else {
                bulb.classList.remove('on');
                bulb.style.backgroundColor = '#eee';
                bulb.style.opacity = 1;
                toggleBtn.textContent = 'Turn On';
            }

            // Update controls
            brightnessSlider.value = deviceState.properties.brightness;
            brightnessValue.textContent = deviceState.properties.brightness;
            colorPicker.value = deviceState.properties.color;

            // Update state display
            stateDisplay.textContent = JSON.stringify(deviceState, null, 2);
        }

        // Toggle light
        toggleBtn.addEventListener('click', async () => {
            try {
                const response = await fetch('/api/toggle', {
                    method: 'POST',
                    headers: {
                        'Content-Type': 'application/json'
                    }
                });

                const data = await response.json();
                if (data.success) {
                    deviceState.properties.power = data.power;
                    updateUI();
                }
            } catch (error) {
                console.error('Error toggling light:', error);
            }
        });

        // Set brightness
        brightnessSlider.addEventListener('input', () => {
            brightnessValue.textContent = brightnessSlider.value;
            if (deviceState.properties.power) {
                bulb.style.opacity = brightnessSlider.value / 100;
            }
        });
        brightnessSlider.addEventListener('change', async () => {
            try {
                const response = await fetch('/api/brightness', {
                    method: 'POST',
                    headers: {
                        'Content-Type': 'application/json'
                    },
                    body: JSON.stringify({
                        brightness: parseInt(brightnessSlider.value)
                    })
                });

                const data = await response.json();
                if (data.success) {
                    deviceState.properties.brightness = data.brightness;
                    updateUI();
                }
            } catch (error) {
                console.error('Error setting brightness:', error);
            }
        });

        // Set color
        colorPicker.addEventListener('change', async () => {
            try {
                const response = await fetch('/api/color', {
                    method: 'POST',
                    headers: {
                        'Content-Type': 'application/json'
                    },
                    body: JSON.stringify({
                        color: colorPicker.value
                    })
                });

                const data = await response.json();
                if (data.success) {
                    deviceState.properties.color = data.color;
                    updateUI();
                }
            } catch (error) {
                console.error('Error setting color:', error);
            }
        });

        // Periodically refresh state from server
        setInterval(async () => {
            try {
                const response = await fetch('/api/state');
                const data = await response.json();
                deviceState = data;
                updateUI();
            } catch (error) {
                console.error('Error fetching state:', error);
            }
        }, 2000);
    </script>
</body>
</html>
```

--------------------------------------------------------------------------------
/iot_mcp_server.py:
--------------------------------------------------------------------------------

```python
from mcp.server.fastmcp import FastMCP, Context
from contextlib import asynccontextmanager
from collections.abc import AsyncIterator
from dataclasses import dataclass, field
from paho.mqtt.client import Client as MQTTClient
import asyncio
import json
import os
import time
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

@dataclass
class IoTContext:
    """Context for the IoT MCP server."""
    mqtt_client: MQTTClient
    connected_devices: dict = field(default_factory=dict)

# Create a global MQTT client that can be reused across sessions
global_mqtt_client = None
global_devices = {}

@asynccontextmanager
async def iot_lifespan(server: FastMCP) -> AsyncIterator[IoTContext]:
    """
    Manages the MQTT client lifecycle.

    Args:
        server: The FastMCP server instance

    Yields:
        IoTContext: The context containing the MQTT client
    """
    global global_mqtt_client, global_devices

    # Initialize MQTT client only if it doesn't exist
    if global_mqtt_client is None:
        print("Creating new MQTT client")
        import paho.mqtt.client as mqtt  # Import here to ensure we have the module
        # paho-mqtt 2.x takes the callback API version as its first argument;
        # VERSION1 matches the (client, userdata, flags, rc) callbacks defined below.
        mqtt_client = MQTTClient(mqtt.CallbackAPIVersion.VERSION1, client_id="iot_mcp_server", protocol=mqtt.MQTTv311)
        broker_address = os.getenv("MQTT_BROKER", "localhost")
        broker_port = int(os.getenv("MQTT_PORT", "1883"))

        # Set up MQTT callbacks for device discovery
        def on_connect(client, userdata, flags, rc):
            if rc == 0:
                print(f"Connected to MQTT broker at {broker_address}:{broker_port}")
                # Subscribe to all device state topics for discovery
                client.subscribe("devices/+/state")
                # Subscribe to broadcast responses
                client.subscribe("devices/broadcast/response")
                print("Subscribed to device state topics")
            else:
                print(f"Failed to connect to MQTT broker with result code {rc}")

        def on_message(client, userdata, message):
            topic = message.topic
            payload = message.payload.decode("utf-8")
            print(f"MQTT: Received message on {topic}: {payload[:100]}...")

            # Check if this is a device state message
            if topic.startswith("devices/") and topic.endswith("/state"):
                try:
                    # Parse device data
                    device_data = json.loads(payload)
                    device_id = device_data.get("device_id")

                    if device_id:
                        # Update our device registry
                        global_devices[device_id] = {
                            "id": device_id,
                            "type": device_data.get("type", "unknown"),
                            "online": device_data.get("online", True),
                            "last_seen": time.time(),
                            "properties": device_data.get("properties", {}),
                            "topic": topic
                        }
                        print(f"Discovered/updated device: {device_id} with properties: {device_data.get('properties', {})}")
                except json.JSONDecodeError:
                    print(f"Received invalid JSON in device state: {payload}")
                except Exception as e:
                    print(f"Error processing device state: {str(e)}")

        # Set callbacks
        mqtt_client.on_connect = on_connect
        mqtt_client.on_message = on_message

        try:
            # Connect to the MQTT broker
            print(f"Connecting to MQTT broker at {broker_address}:{broker_port}")
            mqtt_client.connect(broker_address, broker_port)
            mqtt_client.loop_start()  # Start the MQTT client loop in a separate thread

            # Wait a short time to ensure connection and subscription
            await asyncio.sleep(1)

            # Force a request for device states (using a broadcast topic instead of wildcards)
            print("Requesting states from all devices via broadcast")
            mqtt_client.publish("devices/broadcast/request", json.dumps({"request_id": "startup", "action": "report_state"}))

            # Add the light bulb simulator as a known device if we're running locally
            if broker_address in ("localhost", "127.0.0.1"):
                print("Running locally - adding light bulb as a default device")
                global_devices["light_bulb_001"] = {
                    "id": "light_bulb_001",
                    "type": "light",
                    "online": True,
                    "last_seen": time.time(),
                    "properties": {
                        "power": False,
                        "brightness": 100,
                        "color": "#FFFF00"
                    },
                    "topic": "devices/light_bulb_001/state"
                }

            global_mqtt_client = mqtt_client
        except Exception as e:
            print(f"Failed to connect MQTT client: {str(e)}")
            # Clean up if connection failed
            mqtt_client.loop_stop()
            mqtt_client = None
    else:
        print("Reusing existing MQTT client")

    # Create context with global client and devices
    context = IoTContext(mqtt_client=global_mqtt_client, connected_devices=global_devices)

    try:
        yield context
    finally:
        # Don't disconnect the client, it will be reused
        pass

# Initialize FastMCP server with the IoT context
mcp = FastMCP(
    "mcp-iot",
    description="MCP server for IoT device control",
    lifespan=iot_lifespan,
    host=os.getenv("HOST", "0.0.0.0"),
    port=os.getenv("PORT", "8090")
)

@mcp.tool()
async def list_devices(ctx: Context, **kwargs) -> str:
    """List all connected IoT devices.

    This tool returns information about all IoT devices that have been discovered on the MQTT network.
    For each device, it provides the ID, type, online status, and available commands.

    Note: Do NOT try to call a device ID directly. Instead, use the command tools like:
    - light_bulb_on - To turn the light bulb ON
    - light_bulb_off - To turn the light bulb OFF
    - light_bulb_toggle - To toggle the light bulb
    - check_light_status - To check light status
    """
    try:
        print(f"List devices tool called with kwargs: {kwargs}")

        return """Available device: light_bulb_001 (light)

To control the light bulb, DO NOT call the device ID directly. Instead use:
- light_bulb_on: Turn the light ON
- light_bulb_off: Turn the light OFF
- light_bulb_toggle: Toggle the light ON/OFF
- check_light_status: Check the current status
"""
    except Exception as e:
        print(f"Error in list_devices: {str(e)}")
        return "Found light_bulb_001 device. Use light_bulb_on or light_bulb_off to control it."

@mcp.tool()
async def turn_on_light(ctx: Context, **kwargs) -> str:
    """Turn ON the light bulb.

    This tool will explicitly turn on the light bulb, setting its power state to true.

    Args:
        ctx: The MCP server provided context
        kwargs: Optional extra arguments that are accepted and ignored (required for schema compatibility)
    """
    try:
        print(f"Turn on light called with kwargs: {kwargs}")
        mqtt_client = ctx.request_context.lifespan_context.mqtt_client
        topic = "devices/light_bulb_001/command"

        # Prepare the command to set power on
        message = {
            "command": "set_power",
            "timestamp": time.time(),
            "payload": {
                "power": True
            }
        }

        # Publish the message
        print(f"Sending turn ON command: {json.dumps(message)}")
        result = mqtt_client.publish(topic, json.dumps(message))
        print(f"MQTT publish result: {result.rc}")

        # Request an immediate state update to verify the change
        await asyncio.sleep(0.5)  # Give the bulb time to process the command
        state_topic = "devices/light_bulb_001/state/request"
        mqtt_client.publish(state_topic, json.dumps({"request_id": "verify_power_on"}))

        return "Light bulb has been turned ON"
    except Exception as e:
        print(f"Error in turn_on_light: {str(e)}")
        import traceback
        print(traceback.format_exc())
        return f"Error turning on light: {str(e)}"

@mcp.tool()
async def turn_off_light(ctx: Context, **kwargs) -> str:
    """Turn OFF the light bulb.

    This tool will explicitly turn off the light bulb, setting its power state to false.

    Args:
        ctx: The MCP server provided context
        kwargs: Optional extra arguments that are accepted and ignored (required for schema compatibility)
    """
    try:
        print(f"Turn off light called with kwargs: {kwargs}")
        mqtt_client = ctx.request_context.lifespan_context.mqtt_client
        topic = "devices/light_bulb_001/command"

        # Prepare the command to set power off
        message = {
            "command": "set_power",
            "timestamp": time.time(),
            "payload": {
                "power": False
            }
        }

        # Publish the message
        mqtt_client.publish(topic, json.dumps(message))

        return "Light bulb has been turned OFF"
    except Exception as e:
        return f"Error turning off light: {str(e)}"

@mcp.tool()
async def toggle_light(ctx: Context, **kwargs) -> str:
    """Toggle the light bulb on/off.

    This is a simplified tool to toggle the light bulb power state.

    Args:
        ctx: The MCP server provided context
        kwargs: Optional extra arguments that are accepted and ignored (required for schema compatibility)
    """
    try:
        print(f"Toggle light called with kwargs: {kwargs}")
        mqtt_client = ctx.request_context.lifespan_context.mqtt_client
        topic = "devices/light_bulb_001/command"

        # Prepare the toggle command
        message = {
            "command": "toggle",
            "timestamp": time.time()
        }

        # Publish the message
        mqtt_client.publish(topic, json.dumps(message))

        return "Light bulb toggle command sent successfully"
    except Exception as e:
        return f"Error toggling light: {str(e)}"

@mcp.tool()
async def check_light_status(ctx: Context, **kwargs) -> str:
    """Check the current status of the light bulb.

    Returns the current power state, brightness and color of the light bulb.
    """
    try:
        connected_devices = ctx.request_context.lifespan_context.connected_devices

        if "light_bulb_001" in connected_devices:
            properties = connected_devices["light_bulb_001"].get("properties", {})
            power = "ON" if properties.get("power", False) else "OFF"
            brightness = properties.get("brightness", 100)
            color = properties.get("color", "#FFFF00")

            # Request a fresh state update
            mqtt_client = ctx.request_context.lifespan_context.mqtt_client
            state_topic = "devices/light_bulb_001/state/request"
            mqtt_client.publish(state_topic, json.dumps({"request_id": "check_status"}))

            return f"Light bulb status: Power is {power}, Brightness is {brightness}%, Color is {color}"
        else:
            return "Light bulb status: Device not found in registry"
    except Exception as e:
        return f"Error checking light status: {str(e)}"

@mcp.tool()
async def light_bulb_on(ctx: Context, **kwargs) -> str:
    """Turn the light bulb ON.

    This command turns on the light with ID 'light_bulb_001'.
    """
    return await turn_on_light(ctx, **kwargs)

@mcp.tool()
async def light_bulb_off(ctx: Context, **kwargs) -> str:
    """Turn the light bulb OFF.

    This command turns off the light with ID 'light_bulb_001'.
    """
    return await turn_off_light(ctx, **kwargs)

@mcp.tool()
async def light_bulb_toggle(ctx: Context, **kwargs) -> str:
    """Toggle the light bulb ON/OFF.

    This command toggles the light with ID 'light_bulb_001'.
    """
    return await toggle_light(ctx, **kwargs)

@mcp.tool()
async def get_help(ctx: Context, **kwargs) -> str:
    """Get help on how to control IoT devices.

    This tool provides information about how to properly control the connected IoT devices.
    """
    return """
    ## IoT Device Control Help

    The following commands are available to control the light bulb:

    1. `light_bulb_on` - Turn the light bulb ON
    2. `light_bulb_off` - Turn the light bulb OFF
    3. `light_bulb_toggle` - Toggle the light bulb ON/OFF
    4. `check_light_status` - Check the current status of the light bulb

    IMPORTANT: You cannot control devices by using their device ID directly (e.g., "light_bulb_001").
    Always use the specific command functions listed above.
    """

@mcp.tool()
async def check_command(ctx: Context, command: str) -> str:
    """Check if a command is valid and provide guidance.

    This is a helper tool to check if a command exists and provide guidance on how to use it.

    Args:
        ctx: The MCP server provided context
        command: The command or device ID to check
    """
    if command == "light_bulb_001":
        return """
        ERROR: "light_bulb_001" is a device ID, not a command.

        To control this light bulb, use one of these commands:
        - light_bulb_on - Turn the light ON
        - light_bulb_off - Turn the light OFF
        - light_bulb_toggle - Toggle the light ON/OFF
        - check_light_status - Check the status
        """

    valid_commands = [
        "list_devices", "turn_on_light", "turn_off_light", "toggle_light",
        "check_light_status", "light_bulb_on", "light_bulb_off", "light_bulb_toggle"
    ]

    if command in valid_commands:
        return f"The command '{command}' is valid. You can use it to control the light bulb."
    else:
        return f"""
        The command '{command}' is not recognized.

        Available commands:
        - light_bulb_on - Turn the light ON
        - light_bulb_off - Turn the light OFF
        - light_bulb_toggle - Toggle the light ON/OFF
        - check_light_status - Check the status
        """

async def main():
    """Run the MCP server with the configured transport."""
    transport = os.getenv("TRANSPORT", "sse")
    if transport == 'sse':
        # Run the MCP server with SSE transport
        print("Starting MCP server with SSE transport...")
        await mcp.run_sse_async()
    else:
        # Run the MCP server with stdio transport
        print("Starting MCP server with stdio transport...")
        await mcp.run_stdio_async()

if __name__ == "__main__":
    asyncio.run(main())
```
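
The device discovery done in `on_message` and the command payloads built by the light tools can be exercised without a broker if they are pulled out into pure functions. The following is a minimal sketch under that assumption, not code from this repository: `update_registry` and `build_power_command` are hypothetical helper names, while the topic pattern, registry fields, and `set_power` payload mirror those used in `iot_mcp_server.py`. Unlike the original callback, the sketch silently ignores payloads that are not JSON objects.

```python
import json
import time
from typing import Any, Dict, Optional


def update_registry(
    registry: Dict[str, Dict[str, Any]], topic: str, payload: str
) -> Optional[str]:
    """Apply one state message to the registry; return the device ID if updated."""
    # Only topics shaped like devices/<id>/state carry device state.
    if not (topic.startswith("devices/") and topic.endswith("/state")):
        return None
    try:
        data = json.loads(payload)
    except json.JSONDecodeError:
        return None
    # Assumption of this sketch: anything that is not a JSON object is ignored.
    if not isinstance(data, dict):
        return None
    device_id = data.get("device_id")
    if not device_id:
        return None
    registry[device_id] = {
        "id": device_id,
        "type": data.get("type", "unknown"),
        "online": data.get("online", True),
        "last_seen": time.time(),
        "properties": data.get("properties", {}),
        "topic": topic,
    }
    return device_id


def build_power_command(power: bool) -> str:
    """Serialize a set_power command in the shape the light tools publish."""
    message = {
        "command": "set_power",
        "timestamp": time.time(),
        "payload": {"power": power},
    }
    return json.dumps(message)
```

With helpers like these, the MQTT callback would only need to decode the payload and call `update_registry(global_devices, topic, payload)`, and `turn_on_light` / `turn_off_light` could publish `build_power_command(True)` or `build_power_command(False)` to `devices/light_bulb_001/command`.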