# Directory Structure
```
├── .env.example
├── clean.sh
├── iot_mcp_server.py
├── light_bulb_simulator.py
├── mem0_mcp.egg-info
│ ├── dependency_links.txt
│ ├── PKG-INFO
│ ├── requires.txt
│ ├── SOURCES.txt
│ └── top_level.txt
├── memory_mcp_server.py
├── pyproject.toml
├── README.md
├── requirements.txt
├── templates
│ └── index.html
└── utils.py
```
# Files
--------------------------------------------------------------------------------
/.env.example:
--------------------------------------------------------------------------------
```
# IoT MCP Server configuration
MQTT_BROKER=localhost
MQTT_PORT=1883
HOST=0.0.0.0
PORT=8090
TRANSPORT=sse
# Memory MCP Server configuration
HOST=0.0.0.0
PORT=8050
TRANSPORT=sse
# LLM Configuration for Memory Server
LLM_PROVIDER=openai # Options: openai, openrouter, ollama
LLM_API_KEY=your_api_key_here
LLM_CHOICE=gpt-4 # Model name (gpt-4, llama3, etc.)
LLM_BASE_URL=http://localhost:11434 # For Ollama
# Embedding Model Configuration
EMBEDDING_MODEL_CHOICE=text-embedding-3-small # For OpenAI
# EMBEDDING_MODEL_CHOICE=nomic-embed-text # For Ollama
# Vector Database Configuration
DATABASE_URL=postgresql://user:password@localhost:5432/vector_db
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
# MCP Servers for IoT and Memory Management
This repository contains two Model Context Protocol (MCP) servers:
1. IoT Device Control MCP Server
2. Memory Management MCP Server
## IoT Device Control MCP Server
A Model Context Protocol (MCP) server for controlling and monitoring IoT devices such as smart lights, sensors, and other connected devices.
### Purpose
This server provides a standardized interface for IoT device control, monitoring, and state management through the Model Context Protocol.
### Use Cases
- Home automation
- Industrial IoT monitoring
- Remote device management
- Smart building control systems
### Features
- Send commands to IoT devices
- Query device state and status
- Subscribe to real-time device updates
- Support for MQTT protocol
### API Tools
- `send_command`: Send a command to an IoT device
- `get_device_state`: Get the current state of an IoT device
- `subscribe_to_updates`: Subscribe to real-time updates from a device
## Memory Management MCP Server
A Model Context Protocol (MCP) server for persistent memory storage and retrieval using the Mem0 framework.
### Purpose
This server enables long-term memory storage and semantic search capabilities through the Model Context Protocol.
### Use Cases
- Conversation history storage
- Knowledge management
- Contextual awareness in AI applications
- Persistent information storage
### Features
- Save information to long-term memory
- Retrieve all stored memories
- Search memories using semantic search
### API Tools
- `save_memory`: Save information to long-term memory
- `get_all_memories`: Get all stored memories for the user
- `search_memories`: Search memories using semantic search
## Getting Started
1. Clone this repository
2. Install dependencies: `pip install -r requirements.txt`
3. Create a `.env` file based on the `.env.example` template
4. Run the IoT server: `python iot_mcp_server.py`
5. Run the Memory server: `python memory_mcp_server.py`
## Environment Variables
### IoT MCP Server
- `MQTT_BROKER`: MQTT broker address (default: "localhost")
- `MQTT_PORT`: MQTT broker port (default: 1883)
- `HOST`: Server host address (default: "0.0.0.0")
- `PORT`: Server port (default: "8090")
- `TRANSPORT`: Transport type, "sse" or "stdio" (default: "sse")
### Memory MCP Server
- `MEM0_API_KEY`: API key for Mem0 service (optional)
- `MEM0_ENDPOINT`: Endpoint URL for Mem0 service (default: "https://api.mem0.ai")
- `HOST`: Server host address (default: "0.0.0.0")
- `PORT`: Server port (default: "8050")
- `TRANSPORT`: Transport type, "sse" or "stdio" (default: "sse")
## Repository Structure
- `iot_mcp_server.py` - IoT device control MCP server implementation
- `memory_mcp_server.py` - Memory management MCP server implementation
- `utils.py` - Utility functions used by the servers
- `requirements.txt` - Package dependencies
- `.env.example` - Template for environment variables configuration
- `README.md` - Documentation
```
--------------------------------------------------------------------------------
/mem0_mcp.egg-info/dependency_links.txt:
--------------------------------------------------------------------------------
```
```
--------------------------------------------------------------------------------
/mem0_mcp.egg-info/top_level.txt:
--------------------------------------------------------------------------------
```
templates
```
--------------------------------------------------------------------------------
/mem0_mcp.egg-info/requires.txt:
--------------------------------------------------------------------------------
```
httpx>=0.28.1
mcp[cli]>=1.3.0
mem0ai>=0.1.88
vecs>=0.4.5
```
--------------------------------------------------------------------------------
/clean.sh:
--------------------------------------------------------------------------------
```bash
find . -type d -name "__pycache__" -exec rm -rf {} \;
rm -fr venv
```
--------------------------------------------------------------------------------
/mem0_mcp.egg-info/SOURCES.txt:
--------------------------------------------------------------------------------
```
README.md
pyproject.toml
mem0_mcp.egg-info/PKG-INFO
mem0_mcp.egg-info/SOURCES.txt
mem0_mcp.egg-info/dependency_links.txt
mem0_mcp.egg-info/requires.txt
mem0_mcp.egg-info/top_level.txt
```
--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------
```toml
[project]
name = "mem0-mcp"
version = "0.1.0"
description = "MCP server for integrating long term memory into AI agents with Mem0"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"httpx>=0.28.1",
"mcp[cli]>=1.3.0",
"mem0ai>=0.1.88",
"vecs>=0.4.5"
]
```
--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------
```
fastmcp>=0.1.0
paho-mqtt>=2.0.0
python-dotenv>=1.0.0
# mem0>=0.2.0 # Commented out as it's not available on PyPI
# psycopg2-binary>=2.9.6 # Only needed for memory server
# openai>=1.0.0 # Only needed for memory server
setuptools>=65.5.1 # Required by some dependencies
flask>=2.0.0 # For the light bulb simulator web server
```
--------------------------------------------------------------------------------
/utils.py:
--------------------------------------------------------------------------------
```python
from mem0 import Memory
import os
from dotenv import load_dotenv
# Custom instructions for memory processing
# These aren't being used right now but Mem0 does support adding custom prompting
# for handling memory retrieval and processing.
CUSTOM_INSTRUCTIONS = """
Extract the Following Information:
- Key Information: Identify and save the most important details.
- Context: Capture the surrounding context to understand the memory's relevance.
- Connections: Note any relationships to other topics or memories.
- Importance: Highlight why this information might be valuable in the future.
- Source: Record where this information came from when applicable.
"""
def get_mem0_client():
load_dotenv()
# Get LLM provider and configuration
llm_provider = os.getenv('LLM_PROVIDER')
llm_api_key = os.getenv('LLM_API_KEY')
llm_model = os.getenv('LLM_CHOICE')
embedding_model = os.getenv('EMBEDDING_MODEL_CHOICE')
# Initialize config dictionary
config = {}
# Configure LLM based on provider
if llm_provider == 'openai' or llm_provider == 'openrouter':
config["llm"] = {
"provider": "openai",
"config": {
"model": llm_model,
"temperature": 0.2,
"max_tokens": 2000,
}
}
# Set API key in environment if not already set
if llm_api_key and not os.environ.get("OPENAI_API_KEY"):
os.environ["OPENAI_API_KEY"] = llm_api_key
# For OpenRouter, set the specific API key
if llm_provider == 'openrouter' and llm_api_key:
os.environ["OPENROUTER_API_KEY"] = llm_api_key
elif llm_provider == 'ollama':
config["llm"] = {
"provider": "ollama",
"config": {
"model": llm_model,
"temperature": 0.2,
"max_tokens": 2000,
}
}
# Set base URL for Ollama if provided
llm_base_url = os.getenv('LLM_BASE_URL')
if llm_base_url:
config["llm"]["config"]["ollama_base_url"] = llm_base_url
# Configure embedder based on provider
if llm_provider == 'openai':
config["embedder"] = {
"provider": "openai",
"config": {
"model": embedding_model or "text-embedding-3-small",
"embedding_dims": 1536 # Default for text-embedding-3-small
}
}
# Set API key in environment if not already set
if llm_api_key and not os.environ.get("OPENAI_API_KEY"):
os.environ["OPENAI_API_KEY"] = llm_api_key
elif llm_provider == 'ollama':
config["embedder"] = {
"provider": "ollama",
"config": {
"model": embedding_model or "nomic-embed-text",
"embedding_dims": 768 # Default for nomic-embed-text
}
}
# Set base URL for Ollama if provided
embedding_base_url = os.getenv('LLM_BASE_URL')
if embedding_base_url:
config["embedder"]["config"]["ollama_base_url"] = embedding_base_url
# Configure Supabase vector store
config["vector_store"] = {
"provider": "supabase",
"config": {
"connection_string": os.environ.get('DATABASE_URL', ''),
"collection_name": "mem0_memories",
"embedding_model_dims": 1536 if llm_provider == "openai" else 768
}
}
# config["custom_fact_extraction_prompt"] = CUSTOM_INSTRUCTIONS
# Create and return the Memory client
return Memory.from_config(config)
```
--------------------------------------------------------------------------------
/memory_mcp_server.py:
--------------------------------------------------------------------------------
```python
from mcp.server.fastmcp import FastMCP, Context
from contextlib import asynccontextmanager
from collections.abc import AsyncIterator
from dataclasses import dataclass
from dotenv import load_dotenv
from mem0 import Memory
import asyncio
import json
import os
from utils import get_mem0_client
load_dotenv()
# Default user ID for memory operations
DEFAULT_USER_ID = "user"
# Create a dataclass for our application context
@dataclass
class Mem0Context:
"""Context for the Mem0 MCP server."""
mem0_client: Memory
@asynccontextmanager
async def mem0_lifespan(server: FastMCP) -> AsyncIterator[Mem0Context]:
"""
Manages the Mem0 client lifecycle.
Args:
server: The FastMCP server instance
Yields:
Mem0Context: The context containing the Mem0 client
"""
# Create and return the Memory client with the helper function in utils.py
mem0_client = get_mem0_client()
try:
yield Mem0Context(mem0_client=mem0_client)
finally:
# No explicit cleanup needed for the Mem0 client
pass
# Initialize FastMCP server with the Mem0 client as context
mcp = FastMCP(
"mcp-mem0",
description="MCP server for long term memory storage and retrieval with Mem0",
lifespan=mem0_lifespan,
host=os.getenv("HOST", "0.0.0.0"),
port=os.getenv("PORT", "8050")
)
@mcp.tool()
async def save_memory(ctx: Context, text: str) -> str:
"""Save information to your long-term memory.
This tool is designed to store any type of information that might be useful in the future.
The content will be processed and indexed for later retrieval through semantic search.
Args:
ctx: The MCP server provided context which includes the Mem0 client
text: The content to store in memory, including any relevant details and context
"""
try:
mem0_client = ctx.request_context.lifespan_context.mem0_client
messages = [{"role": "user", "content": text}]
mem0_client.add(messages, user_id=DEFAULT_USER_ID)
return f"Successfully saved memory: {text[:100]}..." if len(text) > 100 else f"Successfully saved memory: {text}"
except Exception as e:
return f"Error saving memory: {str(e)}"
@mcp.tool()
async def get_all_memories(ctx: Context) -> str:
"""Get all stored memories for the user.
Call this tool when you need complete context of all previously memories.
Args:
ctx: The MCP server provided context which includes the Mem0 client
Returns a JSON formatted list of all stored memories, including when they were created
and their content. Results are paginated with a default of 50 items per page.
"""
try:
mem0_client = ctx.request_context.lifespan_context.mem0_client
memories = mem0_client.get_all(user_id=DEFAULT_USER_ID)
if isinstance(memories, dict) and "results" in memories:
flattened_memories = [memory["memory"] for memory in memories["results"]]
else:
flattened_memories = memories
return json.dumps(flattened_memories, indent=2)
except Exception as e:
return f"Error retrieving memories: {str(e)}"
@mcp.tool()
async def search_memories(ctx: Context, query: str, limit: int = 3) -> str:
"""Search memories using semantic search.
This tool should be called to find relevant information from your memory. Results are ranked by relevance.
Always search your memories before making decisions to ensure you leverage your existing knowledge.
Args:
ctx: The MCP server provided context which includes the Mem0 client
query: Search query string describing what you're looking for. Can be natural language.
limit: Maximum number of results to return (default: 3)
"""
try:
mem0_client = ctx.request_context.lifespan_context.mem0_client
memories = mem0_client.search(query, user_id=DEFAULT_USER_ID, limit=limit)
if isinstance(memories, dict) and "results" in memories:
flattened_memories = [memory["memory"] for memory in memories["results"]]
else:
flattened_memories = memories
return json.dumps(flattened_memories, indent=2)
except Exception as e:
return f"Error searching memories: {str(e)}"
async def main():
transport = os.getenv("TRANSPORT", "sse")
if transport == 'sse':
# Run the MCP server with sse transport
await mcp.run_sse_async()
else:
# Run the MCP server with stdio transport
await mcp.run_stdio_async()
if __name__ == "__main__":
asyncio.run(main())
```
--------------------------------------------------------------------------------
/light_bulb_simulator.py:
--------------------------------------------------------------------------------
```python
from flask import Flask, render_template, request, jsonify
import paho.mqtt.client as mqtt
import json
import threading
import logging
import os
import time
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# Configure logging
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
app = Flask(__name__)
# MQTT Settings
MQTT_BROKER = os.getenv("MQTT_BROKER", "localhost")
MQTT_PORT = int(os.getenv("MQTT_PORT", "1883"))
DEVICE_ID = "light_bulb_001"
COMMAND_TOPIC = f"devices/{DEVICE_ID}/command"
STATE_TOPIC = f"devices/{DEVICE_ID}/state"
STATE_REQUEST_TOPIC = f"devices/{DEVICE_ID}/state/request"
# Global device state
device_state = {
"device_id": DEVICE_ID,
"type": "light", # Add device type
"online": True,
"last_seen": None,
"properties": {
"power": False, # False is off, True is on
"brightness": 100,
"color": "#FFFF00" # Yellow light
}
}
# Initialize MQTT client with protocol version parameter to address deprecation warning
mqtt_client = mqtt.Client(protocol=mqtt.MQTTv311)
# Set up MQTT callbacks
def on_connect(client, userdata, flags, rc):
if rc == 0:
logger.info(f"Connected to MQTT broker at {MQTT_BROKER}:{MQTT_PORT}")
# Subscribe to command topic
client.subscribe(COMMAND_TOPIC)
client.subscribe(STATE_REQUEST_TOPIC)
# Subscribe to broadcast requests
client.subscribe("devices/broadcast/request")
# Publish initial state immediately after connection
publish_state()
else:
logger.error(f"Failed to connect to MQTT broker with result code {rc}")
def on_message(client, userdata, message):
topic = message.topic
payload = message.payload.decode("utf-8")
logger.info(f"Received message on {topic}: {payload}")
try:
# Handle command messages
if topic == COMMAND_TOPIC:
handle_command(payload)
# Handle state request messages
elif topic == STATE_REQUEST_TOPIC:
publish_state()
# Handle broadcast requests
elif topic == "devices/broadcast/request":
# Parse the message
msg_data = json.loads(payload)
action = msg_data.get("action")
# If action is to report state, publish our state
if action == "report_state":
logger.info(f"Responding to broadcast request: {msg_data.get('request_id')}")
publish_state()
# Also publish to the broadcast response topic
response = {
"device_id": DEVICE_ID,
"response_to": msg_data.get("request_id"),
"state": device_state
}
mqtt_client.publish("devices/broadcast/response", json.dumps(response))
except Exception as e:
logger.error(f"Error processing message: {str(e)}")
def handle_command(payload_str):
try:
payload = json.loads(payload_str)
command = payload.get("command")
logger.info(f"Processing command: {command} with payload: {payload_str}")
if command == "toggle":
# Toggle power state
device_state["properties"]["power"] = not device_state["properties"]["power"]
logger.info(f"Toggled light bulb power to: {device_state['properties']['power']}")
elif command == "set_power" and "payload" in payload:
# Set power state directly
power_state = payload["payload"].get("power", False)
old_state = device_state["properties"]["power"]
device_state["properties"]["power"] = bool(power_state)
logger.info(f"Set light bulb power from {old_state} to {device_state['properties']['power']}")
elif command == "set_brightness" and "payload" in payload:
# Set brightness
brightness = payload["payload"].get("brightness", 100)
device_state["properties"]["brightness"] = max(0, min(100, int(brightness)))
logger.info(f"Set light bulb brightness to: {device_state['properties']['brightness']}")
elif command == "set_color" and "payload" in payload:
# Set color
color = payload["payload"].get("color")
if color:
device_state["properties"]["color"] = color
logger.info(f"Set light bulb color to: {device_state['properties']['color']}")
else:
logger.warning(f"Unknown command or missing payload: {command}")
# Publish updated state
publish_state()
except json.JSONDecodeError:
logger.error(f"Invalid JSON payload: {payload_str}")
except Exception as e:
logger.error(f"Error handling command: {str(e)}")
import traceback
logger.error(traceback.format_exc())
def publish_state():
"""Publish current state to MQTT"""
try:
# Update last_seen timestamp
device_state["last_seen"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
mqtt_client.publish(STATE_TOPIC, json.dumps(device_state))
logger.info(f"Published state: {json.dumps(device_state)}")
except Exception as e:
logger.error(f"Error publishing state: {str(e)}")
# Heartbeat function to periodically publish state
def heartbeat():
while True:
try:
publish_state()
except Exception as e:
logger.error(f"Error in heartbeat: {str(e)}")
time.sleep(60) # Publish state every 60 seconds
# Routes
@app.route('/')
def index():
return render_template('index.html', device_state=device_state)
@app.route('/api/state', methods=['GET'])
def get_state():
return jsonify(device_state)
@app.route('/api/toggle', methods=['POST'])
def toggle_light():
device_state["properties"]["power"] = not device_state["properties"]["power"]
publish_state()
return jsonify({"success": True, "power": device_state["properties"]["power"]})
@app.route('/api/brightness', methods=['POST'])
def set_brightness():
data = request.json
brightness = data.get('brightness', 100)
device_state["properties"]["brightness"] = max(0, min(100, int(brightness)))
publish_state()
return jsonify({"success": True, "brightness": device_state["properties"]["brightness"]})
@app.route('/api/color', methods=['POST'])
def set_color():
data = request.json
color = data.get('color', "#FFFF00")
device_state["properties"]["color"] = color
publish_state()
return jsonify({"success": True, "color": device_state["properties"]["color"]})
def start_mqtt():
mqtt_client.on_connect = on_connect
mqtt_client.on_message = on_message
try:
mqtt_client.connect(MQTT_BROKER, MQTT_PORT, 60)
mqtt_client.loop_start()
except Exception as e:
logger.error(f"Failed to connect to MQTT broker: {str(e)}")
if __name__ == '__main__':
# Start MQTT client in a separate thread
mqtt_thread = threading.Thread(target=start_mqtt)
mqtt_thread.daemon = True
mqtt_thread.start()
# Start heartbeat in a separate thread
heartbeat_thread = threading.Thread(target=heartbeat)
heartbeat_thread.daemon = True
heartbeat_thread.start()
# Give MQTT client time to connect and publish initial state
time.sleep(2)
# Start Flask web server
app.run(host='0.0.0.0', port=7003, debug=True, use_reloader=False)
```
--------------------------------------------------------------------------------
/templates/index.html:
--------------------------------------------------------------------------------
```html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>IoT Light Bulb Simulator</title>
<style>
body {
font-family: Arial, sans-serif;
max-width: 800px;
margin: 0 auto;
padding: 20px;
text-align: center;
background-color: #f5f5f5;
}
.container {
background-color: white;
border-radius: 8px;
padding: 20px;
box-shadow: 0 2px 10px rgba(0, 0, 0, 0.1);
}
h1 {
color: #333;
}
.light-bulb {
width: 100px;
height: 150px;
margin: 30px auto;
position: relative;
}
.bulb {
width: 80px;
height: 80px;
background-color: #eee;
border-radius: 50%;
margin: 0 auto;
position: relative;
transition: all 0.3s ease;
box-shadow: 0 0 10px rgba(0, 0, 0, 0.1);
}
.bulb.on {
box-shadow: 0 0 30px;
}
.base {
width: 30px;
height: 20px;
background-color: #555;
margin: 0 auto;
border-radius: 3px;
}
.controls {
margin-top: 40px;
display: flex;
flex-direction: column;
align-items: center;
gap: 20px;
}
.toggle-btn {
background-color: #4CAF50;
color: white;
border: none;
padding: 10px 20px;
border-radius: 4px;
cursor: pointer;
font-size: 16px;
transition: background-color 0.3s;
}
.toggle-btn:hover {
background-color: #45a049;
}
.slider-container {
width: 100%;
max-width: 300px;
}
.slider {
width: 100%;
height: 25px;
background: #d3d3d3;
outline: none;
-webkit-appearance: none;
border-radius: 10px;
}
.slider::-webkit-slider-thumb {
-webkit-appearance: none;
appearance: none;
width: 25px;
height: 25px;
background: #4CAF50;
cursor: pointer;
border-radius: 50%;
}
.slider::-moz-range-thumb {
width: 25px;
height: 25px;
background: #4CAF50;
cursor: pointer;
border-radius: 50%;
}
.color-picker {
margin-top: 10px;
}
.status {
margin-top: 20px;
padding: 10px;
background-color: #f0f0f0;
border-radius: 4px;
font-family: monospace;
text-align: left;
max-height: 200px;
overflow-y: auto;
}
.mqtt-info {
margin-top: 20px;
background-color: #e8f4f8;
padding: 10px;
border-radius: 4px;
font-size: 0.9em;
text-align: left;
}
</style>
</head>
<body>
<div class="container">
<h1>IoT Light Bulb Simulator</h1>
<p>Device ID: <strong id="device-id">{{ device_state.device_id }}</strong></p>
<div class="light-bulb">
<div id="bulb" class="bulb {% if device_state.properties.power %}on{% endif %}"
style="background-color: {% if device_state.properties.power %}{{ device_state.properties.color }}{% else %}#eee{% endif %};
opacity: {% if device_state.properties.power %}{{ device_state.properties.brightness/100 }}{% else %}1{% endif %};">
</div>
<div class="base"></div>
</div>
<div class="controls">
<button id="toggle-btn" class="toggle-btn">
{% if device_state.properties.power %}Turn Off{% else %}Turn On{% endif %}
</button>
<div class="slider-container">
<label for="brightness">Brightness: <span id="brightness-value">{{ device_state.properties.brightness }}</span>%</label>
<input type="range" min="1" max="100" value="{{ device_state.properties.brightness }}" class="slider" id="brightness">
</div>
<div class="color-picker">
<label for="color">Color:</label>
<input type="color" id="color" value="{{ device_state.properties.color }}">
</div>
</div>
<div class="status">
<h3>Device State:</h3>
<pre id="state-display">{{ device_state | tojson(indent=2) }}</pre>
</div>
<div class="mqtt-info">
<h3>MQTT Information:</h3>
<p><strong>Broker:</strong> {{ device_state.mqtt_broker if device_state.mqtt_broker else 'localhost' }}:{{ device_state.mqtt_port if device_state.mqtt_port else '1883' }}</p>
<p><strong>Command Topic:</strong> devices/{{ device_state.device_id }}/command</p>
<p><strong>State Topic:</strong> devices/{{ device_state.device_id }}/state</p>
<p><strong>State Request Topic:</strong> devices/{{ device_state.device_id }}/state/request</p>
</div>
</div>
<script>
// Elements
const bulb = document.getElementById('bulb');
const toggleBtn = document.getElementById('toggle-btn');
const brightnessSlider = document.getElementById('brightness');
const brightnessValue = document.getElementById('brightness-value');
const colorPicker = document.getElementById('color');
const stateDisplay = document.getElementById('state-display');
// Current state
let deviceState = {{ device_state | tojson }};
// Update UI based on state
function updateUI() {
// Update bulb appearance
if (deviceState.properties.power) {
bulb.classList.add('on');
bulb.style.backgroundColor = deviceState.properties.color;
bulb.style.opacity = deviceState.properties.brightness / 100;
toggleBtn.textContent = 'Turn Off';
} else {
bulb.classList.remove('on');
bulb.style.backgroundColor = '#eee';
bulb.style.opacity = 1;
toggleBtn.textContent = 'Turn On';
}
// Update controls
brightnessSlider.value = deviceState.properties.brightness;
brightnessValue.textContent = deviceState.properties.brightness;
colorPicker.value = deviceState.properties.color;
// Update state display
stateDisplay.textContent = JSON.stringify(deviceState, null, 2);
}
// Toggle light
toggleBtn.addEventListener('click', async () => {
try {
const response = await fetch('/api/toggle', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
}
});
const data = await response.json();
if (data.success) {
deviceState.properties.power = data.power;
updateUI();
}
} catch (error) {
console.error('Error toggling light:', error);
}
});
// Set brightness
brightnessSlider.addEventListener('input', () => {
brightnessValue.textContent = brightnessSlider.value;
if (deviceState.properties.power) {
bulb.style.opacity = brightnessSlider.value / 100;
}
});
brightnessSlider.addEventListener('change', async () => {
try {
const response = await fetch('/api/brightness', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({
brightness: parseInt(brightnessSlider.value)
})
});
const data = await response.json();
if (data.success) {
deviceState.properties.brightness = data.brightness;
updateUI();
}
} catch (error) {
console.error('Error setting brightness:', error);
}
});
// Set color
colorPicker.addEventListener('change', async () => {
try {
const response = await fetch('/api/color', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({
color: colorPicker.value
})
});
const data = await response.json();
if (data.success) {
deviceState.properties.color = data.color;
updateUI();
}
} catch (error) {
console.error('Error setting color:', error);
}
});
// Periodically refresh state from server
setInterval(async () => {
try {
const response = await fetch('/api/state');
const data = await response.json();
deviceState = data;
updateUI();
} catch (error) {
console.error('Error fetching state:', error);
}
}, 2000);
</script>
</body>
</html>
```
--------------------------------------------------------------------------------
/iot_mcp_server.py:
--------------------------------------------------------------------------------
```python
from mcp.server.fastmcp import FastMCP, Context
from contextlib import asynccontextmanager
from collections.abc import AsyncIterator
from dataclasses import dataclass, field
from paho.mqtt.client import Client as MQTTClient
import asyncio
import json
import os
import time
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
@dataclass
class IoTContext:
"""Context for the IoT MCP server."""
mqtt_client: MQTTClient
connected_devices: dict = field(default_factory=dict)
# Create a global MQTT client that can be reused across sessions
global_mqtt_client = None
global_devices = {}
@asynccontextmanager
async def iot_lifespan(server: FastMCP) -> AsyncIterator[IoTContext]:
"""
Manages the MQTT client lifecycle.
Args:
server: The FastMCP server instance
Yields:
IoTContext: The context containing the MQTT client
"""
global global_mqtt_client, global_devices
# Initialize MQTT client only if it doesn't exist
if global_mqtt_client is None:
print("Creating new MQTT client")
# Use MQTTv311 protocol to address deprecation warning
import paho.mqtt.client as mqtt # Import here to ensure we have the module
mqtt_client = MQTTClient(client_id="iot_mcp_server", protocol=mqtt.MQTTv311)
broker_address = os.getenv("MQTT_BROKER", "localhost")
broker_port = int(os.getenv("MQTT_PORT", "1883"))
# Set up MQTT callbacks for device discovery
def on_connect(client, userdata, flags, rc):
if rc == 0:
print(f"Connected to MQTT broker at {broker_address}:{broker_port}")
# Subscribe to all device state topics for discovery
client.subscribe("devices/+/state")
# Subscribe to broadcast responses
client.subscribe("devices/broadcast/response")
print("Subscribed to device state topics")
else:
print(f"Failed to connect to MQTT broker with result code {rc}")
def on_message(client, userdata, message):
topic = message.topic
payload = message.payload.decode("utf-8")
print(f"MQTT: Received message on {topic}: {payload[:100]}...")
# Check if this is a device state message
if topic.startswith("devices/") and topic.endswith("/state"):
try:
# Parse device data
device_data = json.loads(payload)
device_id = device_data.get("device_id")
if device_id:
# Update our device registry
global_devices[device_id] = {
"id": device_id,
"type": device_data.get("type", "unknown"),
"online": device_data.get("online", True),
"last_seen": time.time(),
"properties": device_data.get("properties", {}),
"topic": topic
}
print(f"Discovered/updated device: {device_id} with properties: {device_data.get('properties', {})}")
except json.JSONDecodeError:
print(f"Received invalid JSON in device state: {payload}")
except Exception as e:
print(f"Error processing device state: {str(e)}")
# Set callbacks
mqtt_client.on_connect = on_connect
mqtt_client.on_message = on_message
try:
# Connect to the MQTT broker
print(f"Connecting to MQTT broker at {broker_address}:{broker_port}")
mqtt_client.connect(broker_address, broker_port)
mqtt_client.loop_start() # Start the MQTT client loop in a separate thread
# Wait a short time to ensure connection and subscription
await asyncio.sleep(1)
# Force a request for device states (using a broadcast topic instead of wildcards)
print("Requesting states from all devices via broadcast")
mqtt_client.publish("devices/broadcast/request", json.dumps({"request_id": "startup", "action": "report_state"}))
# Add the light bulb simulator as a known device if we're running locally
if broker_address in ("localhost", "127.0.0.1"):
print("Running locally - adding light bulb as a default device")
global_devices["light_bulb_001"] = {
"id": "light_bulb_001",
"type": "light",
"online": True,
"last_seen": time.time(),
"properties": {
"power": False,
"brightness": 100,
"color": "#FFFF00"
},
"topic": "devices/light_bulb_001/state"
}
global_mqtt_client = mqtt_client
except Exception as e:
print(f"Failed to connect MQTT client: {str(e)}")
# Clean up if connection failed
mqtt_client.loop_stop()
mqtt_client = None
else:
print("Reusing existing MQTT client")
# Create context with global client and devices
context = IoTContext(mqtt_client=global_mqtt_client, connected_devices=global_devices)
try:
yield context
finally:
# Don't disconnect the client, it will be reused
pass
# Initialize FastMCP server with the IoT context
mcp = FastMCP(
"mcp-iot",
description="MCP server for IoT device control",
lifespan=iot_lifespan,
host=os.getenv("HOST", "0.0.0.0"),
port=os.getenv("PORT", "8090")
)
@mcp.tool()
async def list_devices(ctx: Context, **kwargs) -> str:
"""List all connected IoT devices.
This tool returns information about all IoT devices that have been discovered
on the MQTT network. For each device, it provides the ID, type, online status,
and available commands.
Note: Do NOT try to call a device ID directly. Instead, use the command tools like:
- light_bulb_on - To turn the light bulb ON
- light_bulb_off - To turn the light bulb OFF
- light_bulb_toggle - To toggle the light bulb
- check_light_status - To check light status
"""
try:
print(f"List devices tool called with kwargs: {kwargs}")
return """Available device: light_bulb_001 (light)
To control the light bulb, DO NOT call the device ID directly. Instead use:
- light_bulb_on: Turn the light ON
- light_bulb_off: Turn the light OFF
- light_bulb_toggle: Toggle the light ON/OFF
- check_light_status: Check the current status
"""
except Exception as e:
print(f"Error in list_devices: {str(e)}")
return "Found light_bulb_001 device. Use light_bulb_on or light_bulb_off to control it."
@mcp.tool()
async def turn_on_light(ctx: Context, **kwargs) -> str:
"""Turn ON the light bulb.
This tool will explicitly turn on the light bulb, setting its power state to true.
Args:
ctx: The MCP server provided context
dummy: Optional parameter that does nothing (required for schema compatibility)
"""
try:
print(f"Turn on light called with kwargs: {kwargs}")
mqtt_client = ctx.request_context.lifespan_context.mqtt_client
topic = "devices/light_bulb_001/command"
# Prepare the command to set power on
message = {
"command": "set_power",
"timestamp": time.time(),
"payload": {
"power": True
}
}
# Publish the message
print(f"Sending turn ON command: {json.dumps(message)}")
result = mqtt_client.publish(topic, json.dumps(message))
print(f"MQTT publish result: {result.rc}")
# Request an immediate state update to verify the change
await asyncio.sleep(0.5) # Give the bulb time to process the command
state_topic = "devices/light_bulb_001/state/request"
mqtt_client.publish(state_topic, json.dumps({"request_id": "verify_power_on"}))
return "Light bulb has been turned ON"
except Exception as e:
print(f"Error in turn_on_light: {str(e)}")
import traceback
print(traceback.format_exc())
return f"Error turning on light: {str(e)}"
@mcp.tool()
async def turn_off_light(ctx: Context, **kwargs) -> str:
"""Turn OFF the light bulb.
This tool will explicitly turn off the light bulb, setting its power state to false.
Args:
ctx: The MCP server provided context
dummy: Optional parameter that does nothing (required for schema compatibility)
"""
try:
print(f"Turn off light called with kwargs: {kwargs}")
mqtt_client = ctx.request_context.lifespan_context.mqtt_client
topic = "devices/light_bulb_001/command"
# Prepare the command to set power off
message = {
"command": "set_power",
"timestamp": time.time(),
"payload": {
"power": False
}
}
# Publish the message
mqtt_client.publish(topic, json.dumps(message))
return "Light bulb has been turned OFF"
except Exception as e:
return f"Error turning off light: {str(e)}"
@mcp.tool()
async def toggle_light(ctx: Context, **kwargs) -> str:
"""Toggle the light bulb on/off.
This is a simplified tool to toggle the light bulb power state.
Args:
ctx: The MCP server provided context
dummy: Optional parameter that does nothing (required for schema compatibility)
"""
try:
print(f"Toggle light called with kwargs: {kwargs}")
mqtt_client = ctx.request_context.lifespan_context.mqtt_client
topic = "devices/light_bulb_001/command"
# Prepare the toggle command
message = {
"command": "toggle",
"timestamp": time.time()
}
# Publish the message
mqtt_client.publish(topic, json.dumps(message))
return "Light bulb toggle command sent successfully"
except Exception as e:
return f"Error toggling light: {str(e)}"
@mcp.tool()
async def check_light_status(ctx: Context, **kwargs) -> str:
"""Check the current status of the light bulb.
Returns the current power state, brightness and color of the light bulb.
"""
try:
connected_devices = ctx.request_context.lifespan_context.connected_devices
if "light_bulb_001" in connected_devices:
properties = connected_devices["light_bulb_001"].get("properties", {})
power = "ON" if properties.get("power", False) else "OFF"
brightness = properties.get("brightness", 100)
color = properties.get("color", "#FFFF00")
# Request a fresh state update
mqtt_client = ctx.request_context.lifespan_context.mqtt_client
state_topic = "devices/light_bulb_001/state/request"
mqtt_client.publish(state_topic, json.dumps({"request_id": "check_status"}))
return f"Light bulb status: Power is {power}, Brightness is {brightness}%, Color is {color}"
else:
return "Light bulb status: Device not found in registry"
except Exception as e:
return f"Error checking light status: {str(e)}"
@mcp.tool()
async def light_bulb_on(ctx: Context, **kwargs) -> str:
"""Turn the light bulb ON.
This command turns on the light with ID 'light_bulb_001'.
"""
return await turn_on_light(ctx, **kwargs)
@mcp.tool()
async def light_bulb_off(ctx: Context, **kwargs) -> str:
"""Turn the light bulb OFF.
This command turns off the light with ID 'light_bulb_001'.
"""
return await turn_off_light(ctx, **kwargs)
@mcp.tool()
async def light_bulb_toggle(ctx: Context, **kwargs) -> str:
"""Toggle the light bulb ON/OFF.
This command toggles the light with ID 'light_bulb_001'.
"""
return await toggle_light(ctx, **kwargs)
@mcp.tool()
async def get_help(ctx: Context, **kwargs) -> str:
"""Get help on how to control IoT devices.
This tool provides information about how to properly control the connected IoT devices.
"""
return """
## IoT Device Control Help
The following commands are available to control the light bulb:
1. `light_bulb_on` - Turn the light bulb ON
2. `light_bulb_off` - Turn the light bulb OFF
3. `light_bulb_toggle` - Toggle the light bulb ON/OFF
4. `check_light_status` - Check the current status of the light bulb
IMPORTANT: You cannot control devices by using their device ID directly (e.g., "light_bulb_001").
Always use the specific command functions listed above.
"""
@mcp.tool()
async def check_command(ctx: Context, command: str) -> str:
"""Check if a command is valid and provide guidance.
This is a helper tool to check if a command exists and provide guidance on how to use it.
Args:
ctx: The MCP server provided context
command: The command or device ID to check
"""
if command == "light_bulb_001":
return """
ERROR: "light_bulb_001" is a device ID, not a command.
To control this light bulb, use one of these commands:
- light_bulb_on - Turn the light ON
- light_bulb_off - Turn the light OFF
- light_bulb_toggle - Toggle the light ON/OFF
- check_light_status - Check the status
"""
valid_commands = [
"list_devices", "turn_on_light", "turn_off_light", "toggle_light",
"check_light_status", "light_bulb_on", "light_bulb_off", "light_bulb_toggle"
]
if command in valid_commands:
return f"The command '{command}' is valid. You can use it to control the light bulb."
else:
return f"""
The command '{command}' is not recognized.
Available commands:
- light_bulb_on - Turn the light ON
- light_bulb_off - Turn the light OFF
- light_bulb_toggle - Toggle the light ON/OFF
- check_light_status - Check the status
"""
async def main():
"""Run the MCP server with the configured transport."""
transport = os.getenv("TRANSPORT", "sse")
if transport == 'sse':
# Run the MCP server with SSE transport
print("Starting MCP server with SSE transport...")
await mcp.run_sse_async()
else:
# Run the MCP server with stdio transport
print("Starting MCP server with stdio transport...")
await mcp.run_stdio_async()
if __name__ == "__main__":
asyncio.run(main())
```