# Directory Structure

```
├── .env-example
├── .gitignore
├── .python-version
├── LICENSE
├── meraki-mcp.py
├── pyproject.toml
├── README.md
└── requirements.txt
```

# Files

--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------

```
3.13
```

--------------------------------------------------------------------------------
/.env-example:
--------------------------------------------------------------------------------

```
MERAKI_API_KEY="Meraki API Key here"
MERAKI_ORG_ID="Meraki Org ID here"
```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
uv.lock

# Virtual Environment
venv/
ENV/
env/
.env
.venv/

# IDE
.idea/
.vscode/
*.swp
*.swo

# OS
.DS_Store
Thumbs.db

# Documentation
ADDITIONAL_TOOLS_ROADMAP.md
```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# Meraki Magic MCP

Meraki Magic is a Python-based MCP (Model Context Protocol) server for Cisco's Meraki Dashboard. Meraki Magic provides tools for querying the Meraki Dashboard API to discover, monitor, and manage your Meraki environment.

## Features

- **Comprehensive Network Management**: Full network discovery, monitoring, and management
- **Advanced Device Management**: Device provisioning, monitoring, and live tools
- **Wireless Management**: Complete wireless SSID and RF profile management
- **Switch Management**: Port management, VLAN configuration, and QoS rules
- **Appliance Management**: VPN, firewall, content filtering, and security management
- **Camera Management**: Analytics, snapshots, and sense configuration
- **Network Automation**: Action batches and bulk operations
- **Live Device Tools**: Ping, cable testing, LED control, and wake-on-LAN
- **Advanced Monitoring**: Events, alerts, and performance analytics

## Installation

1. Clone the repository and change into the directory that `git clone` creates:
```bash
git clone https://github.com/mkutka/meraki-magic.git
cd meraki-magic
```

2. Create a virtual environment and activate it:
```bash
python -m venv .venv
source .venv/bin/activate  # On Windows: .venv\Scripts\activate
```

3. Install dependencies:
```bash
pip install -r requirements.txt
```

## Configuration

1. Copy the example environment file:
```bash
cp .env-example .env
```

2. Update the `.env` file with your Meraki API Key and Organization ID:
```env
MERAKI_API_KEY="Meraki API Key here"
MERAKI_ORG_ID="Meraki Org ID here"
```

## Usage With Claude Desktop Client

1. Configure Claude Desktop to use this MCP server:
   - Open Claude Desktop
   - Go to Settings > Developer > Edit Config
   - Add the following configuration to `claude_desktop_config.json`:

```json
{
  "mcpServers": {
    "Meraki_Magic_MCP": {
      "command": "/Users/mkutka/meraki-magic-mcp/.venv/bin/fastmcp",
      "args": [
        "run",
        "/Users/mkutka/meraki-magic-mcp/meraki-mcp.py"
      ]
    }
  }
}
```

   - Replace the paths above with the location of the virtual environment and `meraki-mcp.py` on your machine.

2. Restart Claude Desktop
3. Interact with Claude Desktop

## Network Tools Guide

This guide provides a comprehensive overview of all the network tools available in the Meraki Magic MCP, organized by category and functionality.

### Table of Contents

1. [Organization Management Tools](#organization-management-tools)
2. [Network Management Tools](#network-management-tools)
3. [Device Management Tools](#device-management-tools)
4. [Wireless Management Tools](#wireless-management-tools)
5. [Switch Management Tools](#switch-management-tools)
6. [Appliance Management Tools](#appliance-management-tools)
7. [Camera Management Tools](#camera-management-tools)
8. [Network Automation Tools](#network-automation-tools)
9. [Advanced Monitoring Tools](#advanced-monitoring-tools)
10. [Live Device Tools](#live-device-tools)

---

## Organization Management Tools

### Basic Organization Operations
- **`get_organizations()`** - Get a list of organizations the user has access to
- **`get_organization_details(org_id)`** - Get details for a specific organization
- **`get_organization_status(org_id)`** - Get the status and health of an organization
- **`get_organization_inventory(org_id)`** - Get the inventory for an organization
- **`get_organization_license(org_id)`** - Get the license state for an organization
- **`get_organization_conf_change(org_id)`** - Get the configuration change log for an organization

### Advanced Organization Management
- **`get_organization_admins(org_id)`** - Get a list of organization admins
- **`create_organization_admin(org_id, email, name, org_access, tags, networks)`** - Create a new organization admin
- **`get_organization_api_requests(org_id, timespan)`** - Get organization API request history
- **`get_organization_webhook_logs(org_id, timespan)`** - Get organization webhook logs

### Network Management
- **`get_networks(org_id)`** - Get a list of networks from Meraki
- **`create_network(name, tags, productTypes, org_id, copyFromNetworkId)`** - Create a new network
- **`delete_network(network_id)`** - Delete a network in Meraki
- **`get_network_details(network_id)`** - Get details for a specific network
- **`update_network(network_id, update_data)`** - Update a network's properties

---

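As a rough illustration of how the organization-scoped tools above behave, the sketch below mirrors the pattern used in `meraki-mcp.py`: fall back to the configured `MERAKI_ORG_ID` when `org_id` is omitted, and return the result as a JSON string. `fake_get_networks` and its sample data are hypothetical stand-ins for the Meraki SDK call so that the snippet runs without an API key, and the explicit `ValueError` is an added assumption rather than the server's current behavior.

```python
import json
import os
from typing import Optional


def fake_get_networks(organization_id: str) -> list[dict]:
    """Hypothetical stand-in for dashboard.organizations.getOrganizationNetworks."""
    return [{"id": "N_1", "organizationId": organization_id, "name": "Branch"}]


def get_networks(org_id: Optional[str] = None) -> str:
    """Return the networks as a JSON string, defaulting to MERAKI_ORG_ID."""
    organization_id = org_id or os.getenv("MERAKI_ORG_ID")
    if not organization_id:
        # Assumption: fail early instead of sending an empty ID to the API.
        raise ValueError("No org_id given and MERAKI_ORG_ID is not set")
    return json.dumps(fake_get_networks(organization_id), indent=2)


# An explicit org_id takes precedence over the environment variable.
networks = json.loads(get_networks("123456"))
print(networks[0]["organizationId"])
```

Because every tool returns a JSON string rather than a Python object, a caller outside an MCP client would pass the result through `json.loads` before inspecting it, as the last two lines do.
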
## Network Management Tools

### Network Monitoring
- **`get_network_events(network_id, timespan, per_page)`** - Get network events history
- **`get_network_event_types(network_id)`** - Get available network event types
- **`get_network_alerts_history(network_id, timespan)`** - Get network alerts history
- **`get_network_alerts_settings(network_id)`** - Get network alerts settings
- **`update_network_alerts_settings(network_id, defaultDestinations, alerts)`** - Update network alerts settings

### Client Management
- **`get_clients(network_id, timespan)`** - Get a list of clients from a network
- **`get_client_details(network_id, client_id)`** - Get details for a specific client
- **`get_client_usage(network_id, client_id)`** - Get the usage history for a client
- **`get_client_policy(network_id, client_id)`** - Get the policy for a specific client
- **`update_client_policy(network_id, client_id, device_policy, group_policy_id)`** - Update policy for a client

### Network Traffic & Analysis
- **`get_network_traffic(network_id, timespan)`** - Get traffic analysis data for a network

---

## Device Management Tools

### Device Information
- **`get_devices(org_id)`** - Get a list of devices from Meraki
- **`get_network_devices(network_id)`** - Get a list of devices in a specific network
- **`get_device_details(serial)`** - Get details for a specific device by serial number
- **`get_device_status(serial)`** - Get the current status of a device
- **`get_device_uplink(serial)`** - Get the uplink status of a device

### Device Operations
- **`update_device(serial, device_settings)`** - Update a device in the Meraki organization
- **`claim_devices(network_id, serials)`** - Claim one or more devices into a Meraki network
- **`remove_device(serial)`** - Remove a device from its network
- **`reboot_device(serial)`** - Reboot a device

### Device Monitoring
- **`get_device_clients(serial, timespan)`** - Get clients connected to a specific device

---

## Live Device Tools

### Network Diagnostics
- **`ping_device(serial, target_ip, count)`** - Ping a target IP address from a device
- **`get_device_ping_results(serial, ping_id)`** - Get results from a device ping test
- **`cable_test_device(serial, ports)`** - Run cable test on device ports
- **`get_device_cable_test_results(serial, cable_test_id)`** - Get results from a device cable test

### Device Control
- **`blink_device_leds(serial, duration)`** - Blink device LEDs for identification
- **`wake_on_lan_device(serial, mac)`** - Send wake-on-LAN packet to a device

---

## Wireless Management Tools

### Basic Wireless Operations
- **`get_wireless_ssids(network_id)`** - Get wireless SSIDs for a network
- **`update_wireless_ssid(network_id, ssid_number, ssid_settings)`** - Update a wireless SSID
- **`get_wireless_settings(network_id)`** - Get wireless settings for a network

### Advanced Wireless Management
- **`get_wireless_rf_profiles(network_id)`** - Get wireless RF profiles for a network
- **`create_wireless_rf_profile(network_id, name, band_selection_type, **kwargs)`** - Create a wireless RF profile
- **`get_wireless_channel_utilization(network_id, timespan)`** - Get wireless channel utilization history
- **`get_wireless_signal_quality(network_id, timespan)`** - Get wireless signal quality history
- **`get_wireless_connection_stats(network_id, timespan)`** - Get wireless connection statistics
- **`get_wireless_client_connectivity_events(network_id, client_id, timespan)`** - Get wireless client connectivity events

---

## Switch Management Tools

### Basic Switch Operations
- **`get_switch_ports(serial)`** - Get ports for a switch
- **`update_switch_port(serial, port_id, name, tags, enabled, vlan)`** - Update a switch port
- **`get_switch_vlans(network_id)`** - Get VLANs for a network
- **`create_switch_vlan(network_id, vlan_id, name, subnet, appliance_ip)`** - Create a switch VLAN

### Advanced Switch Management
- **`get_switch_port_statuses(serial)`** - Get switch port statuses
- **`cycle_switch_ports(serial, ports)`** - Cycle (restart) switch ports
- **`get_switch_access_control_lists(network_id)`** - Get switch access control lists
- **`update_switch_access_control_lists(network_id, rules)`** - Update switch access control lists
- **`get_switch_qos_rules(network_id)`** - Get switch QoS rules
- **`create_switch_qos_rule(network_id, vlan, protocol, src_port, **kwargs)`** - Create a switch QoS rule

---

## Appliance Management Tools

### Basic Appliance Operations
- **`get_security_center(network_id)`** - Get security information for a network
- **`get_vpn_status(network_id)`** - Get VPN status for a network
- **`get_firewall_rules(network_id)`** - Get firewall rules for a network
- **`update_firewall_rules(network_id, rules)`** - Update firewall rules for a network

### Advanced Appliance Management
- **`get_appliance_vpn_site_to_site(network_id)`** - Get appliance VPN site-to-site configuration
- **`update_appliance_vpn_site_to_site(network_id, mode, hubs, subnets)`** - Update appliance VPN site-to-site configuration
- **`get_appliance_content_filtering(network_id)`** - Get appliance content filtering settings
- **`update_appliance_content_filtering(network_id, **kwargs)`** - Update appliance content filtering settings
- **`get_appliance_security_events(network_id, timespan)`** - Get appliance security events
- **`get_appliance_traffic_shaping(network_id)`** - Get appliance traffic shaping settings
- **`update_appliance_traffic_shaping(network_id, global_bandwidth_limits)`** - Update appliance traffic shaping settings

---

## Camera Management Tools

### Basic Camera Operations
- **`get_camera_video_settings(network_id, serial)`** - Get video settings for a camera
- **`get_camera_quality_settings(network_id)`** - Get quality and retention settings for cameras

### Advanced Camera Management
- **`get_camera_analytics_live(serial)`** - Get live camera analytics
- **`get_camera_analytics_overview(serial, timespan)`** - Get camera analytics overview
- **`get_camera_analytics_zones(serial)`** - Get camera analytics zones
- **`generate_camera_snapshot(serial, timestamp)`** - Generate a camera snapshot
- **`get_camera_sense(serial)`** - Get camera sense configuration
- **`update_camera_sense(serial, sense_enabled, mqtt_broker_id, audio_detection)`** - Update camera sense configuration

---

## Network Automation Tools

### Action Batches
- **`create_action_batch(org_id, actions, confirmed, synchronous)`** - Create an action batch for bulk operations
- **`get_action_batch_status(org_id, batch_id)`** - Get action batch status
- **`get_action_batches(org_id)`** - Get all action batches for an organization

---

## Advanced Monitoring Tools

### Network Events & Alerts
- **`get_network_events(network_id, timespan, per_page)`** - Get network events history
- **`get_network_event_types(network_id)`** - Get available network event types
- **`get_network_alerts_history(network_id, timespan)`** - Get network alerts history
- **`get_network_alerts_settings(network_id)`** - Get network alerts settings
- **`update_network_alerts_settings(network_id, defaultDestinations, alerts)`** - Update network alerts settings

### Organization Monitoring
- **`get_organization_api_requests(org_id, timespan)`** - Get organization API request history
- **`get_organization_webhook_logs(org_id, timespan)`** - Get organization webhook logs

---

## Schema Definitions

The MCP includes comprehensive Pydantic schemas for data validation:

- `SsidUpdateSchema` - Wireless SSID configuration
- `FirewallRule` - Firewall rule configuration
- `DeviceUpdateSchema` - Device update parameters
- `NetworkUpdateSchema` - Network update parameters
- `AdminCreationSchema` - Admin creation parameters
- `ActionBatchSchema` - Action batch configuration
- `VpnSiteToSiteSchema` - VPN site-to-site configuration
- `ContentFilteringSchema` - Content filtering settings
- `TrafficShapingSchema` - Traffic shaping configuration
- `CameraSenseSchema` - Camera sense settings
- `SwitchQosRuleSchema` - Switch QoS rule configuration

---

## Best Practices

1. **Error Handling**: Always check API responses for errors
2. **Rate Limiting**: The Meraki API has rate limits; use appropriate delays
3. **Batch Operations**: Use action batches for bulk operations
4. **Validation**: Use the provided schemas for data validation
5. **Monitoring**: Regularly check network events and alerts
6. **Security**: Keep API keys secure and rotate them regularly

---

## Troubleshooting

### Common Issues

1. **Authentication Errors**: Verify your API key is correct and has appropriate permissions
2. **Rate Limiting**: If you encounter rate limiting, implement delays between requests
3. **Network Not Found**: Ensure the network ID is correct and accessible
4. **Device Not Found**: Verify the device serial number is correct and the device is online

### Debug Information

Enable debug logging by setting the appropriate log level in your environment.

---

## Additional Resources

- [Meraki API Documentation](https://developer.cisco.com/meraki/api-v1/)
- [MCP Protocol Documentation](https://modelcontextprotocol.io/)
- [FastMCP Documentation](https://github.com/jlowin/fastmcp)

For more detailed information about additional tools and future enhancements, see the [Additional Tools Roadmap](ADDITIONAL_TOOLS_ROADMAP.md).

---

## ⚠️ Disclaimer

**IMPORTANT: PRODUCTION USE DISCLAIMER**

This software is provided "AS IS" without warranty of any kind, either express or implied. The authors and contributors make no representations or warranties regarding the suitability, reliability, availability, accuracy, or completeness of this software for any purpose.

**USE AT YOUR OWN RISK**: This MCP server is designed for development, testing, and educational purposes. Running this software in production environments is done entirely at your own risk. The authors and contributors are not responsible for any damages, data loss, service interruptions, or other issues that may arise from the use of this software in production environments.

**SECURITY CONSIDERATIONS**: This software requires access to your Meraki API credentials. Ensure that:
- API keys are stored securely and not committed to version control
- API keys have appropriate permissions and are rotated regularly
- Network access is properly secured
- Regular security audits are performed

**NO WARRANTY**: The authors disclaim all warranties, including but not limited to warranties of merchantability, fitness for a particular purpose, and non-infringement. In no event shall the authors be liable for any claim, damages, or other liability arising from the use of this software.

**SUPPORT**: This is an open-source project. For production use, consider implementing additional testing, monitoring, and support mechanisms appropriate for your environment.
```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[project]
name = "meraki-magic-mcp"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
    "mcp[cli]>=1.8.0",
    "meraki>=2.0.2",
]
```

--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------

```
aiohappyeyeballs==2.6.1
aiohttp==3.11.18
aiosignal==1.3.2
annotated-types==0.7.0
anyio==4.9.0
attrs==25.3.0
build==1.2.2.post1
certifi==2025.4.26
charset-normalizer==3.4.2
click==8.1.8
exceptiongroup==1.2.2
fastmcp==2.2.10
frozenlist==1.6.0
h11==0.16.0
httpcore==1.0.9
httpx==0.28.1
httpx-sse==0.4.0
idna==3.10
iniconfig==2.1.0
Jinja2==3.1.6
markdown-it-py==3.0.0
MarkupSafe==3.0.2
mcp==1.7.1
mdurl==0.1.2
meraki==2.0.2
multidict==6.4.3
openapi-pydantic==0.5.1 packaging==25.0 pluggy==1.5.0 propcache==0.3.1 pydantic==2.11.4 pydantic-settings==2.9.1 pydantic_core==2.33.2 Pygments==2.19.1 pyproject_hooks==1.2.0 pytest==7.4.4 python-dotenv==1.1.0 python-multipart==0.0.20 requests==2.32.3 rich==14.0.0 setuptools==70.3.0 shellingham==1.5.4 sniffio==1.3.1 sse-starlette==2.3.4 starlette==0.46.2 typer==0.15.3 typing-inspection==0.4.0 typing_extensions==4.13.2 urllib3==2.4.0 uvicorn==0.34.2 websockets==15.0.1 wheel==0.45.1 yarl==1.20.0 ``` -------------------------------------------------------------------------------- /meraki-mcp.py: -------------------------------------------------------------------------------- ```python import os import json import meraki import asyncio import functools from typing import Dict, List, Optional, Any, TypedDict, Union, Callable from pydantic import BaseModel, Field from mcp.server.fastmcp import FastMCP from dotenv import load_dotenv # Load environment variables from .env file load_dotenv() # Create an MCP server mcp = FastMCP("Meraki Magic MCP") # Configuration MERAKI_API_KEY = os.getenv("MERAKI_API_KEY") MERAKI_ORG_ID = os.getenv("MERAKI_ORG_ID") # Initialize Meraki API client using Meraki SDK dashboard = meraki.DashboardAPI(api_key=MERAKI_API_KEY, suppress_logging=True) ################### # ASYNC UTILITIES ################### def to_async(func: Callable) -> Callable: """ Convert a synchronous function to an asynchronous function Args: func: The synchronous function to convert Returns: An asynchronous version of the function """ @functools.wraps(func) async def wrapper(*args, **kwargs): loop = asyncio.get_event_loop() return await loop.run_in_executor( None, lambda: func(*args, **kwargs) ) return wrapper # Create async versions of commonly used Meraki API methods async_get_organizations = to_async(dashboard.organizations.getOrganizations) async_get_organization = to_async(dashboard.organizations.getOrganization) async_get_organization_networks = 
to_async(dashboard.organizations.getOrganizationNetworks) async_get_organization_devices = to_async(dashboard.organizations.getOrganizationDevices) async_get_network = to_async(dashboard.networks.getNetwork) async_get_network_devices = to_async(dashboard.networks.getNetworkDevices) async_get_network_clients = to_async(dashboard.networks.getNetworkClients) async_get_device = to_async(dashboard.devices.getDevice) async_update_device = to_async(dashboard.devices.updateDevice) async_get_wireless_ssids = to_async(dashboard.wireless.getNetworkWirelessSsids) async_update_wireless_ssid = to_async(dashboard.wireless.updateNetworkWirelessSsid) ################### # SCHEMA DEFINITIONS ################### # Wireless SSID Schema class Dot11wSettings(BaseModel): enabled: bool = Field(False, description="Whether 802.11w is enabled or not") required: bool = Field(False, description="Whether 802.11w is required or not") class Dot11rSettings(BaseModel): enabled: bool = Field(False, description="Whether 802.11r is enabled or not") adaptive: bool = Field(False, description="Whether 802.11r is adaptive or not") class RadiusServer(BaseModel): host: str = Field(..., description="IP address of the RADIUS server") port: int = Field(..., description="Port of the RADIUS server") secret: str = Field(..., description="Secret for the RADIUS server") radsecEnabled: Optional[bool] = Field(None, description="Whether RADSEC is enabled or not") openRoamingCertificateId: Optional[int] = Field(None, description="OpenRoaming certificate ID") caCertificate: Optional[str] = Field(None, description="CA certificate for RADSEC") class SsidUpdateSchema(BaseModel): name: Optional[str] = Field(None, description="The name of the SSID") enabled: Optional[bool] = Field(None, description="Whether the SSID is enabled or not") authMode: Optional[str] = Field(None, description="The auth mode for the SSID (e.g., 'open', 'psk', '8021x-radius')") enterpriseAdminAccess: Optional[str] = Field(None, description="Enterprise 
admin access setting") encryptionMode: Optional[str] = Field(None, description="The encryption mode for the SSID") psk: Optional[str] = Field(None, description="The pre-shared key for the SSID when using PSK auth mode") wpaEncryptionMode: Optional[str] = Field(None, description="WPA encryption mode (e.g., 'WPA1 and WPA2', 'WPA2 only')") dot11w: Optional[Dot11wSettings] = Field(None, description="802.11w settings") dot11r: Optional[Dot11rSettings] = Field(None, description="802.11r settings") splashPage: Optional[str] = Field(None, description="The type of splash page for the SSID") radiusServers: Optional[List[RadiusServer]] = Field(None, description="List of RADIUS servers") visible: Optional[bool] = Field(None, description="Whether the SSID is visible or not") availableOnAllAps: Optional[bool] = Field(None, description="Whether the SSID is available on all APs") bandSelection: Optional[str] = Field(None, description="Band selection for SSID (e.g., '5 GHz band only', 'Dual band operation')") # Firewall Rule Schema class FirewallRule(BaseModel): comment: str = Field(..., description="Description of the firewall rule") policy: str = Field(..., description="'allow' or 'deny'") protocol: str = Field(..., description="The protocol (e.g., 'tcp', 'udp', 'any')") srcPort: Optional[str] = Field("Any", description="Source port (e.g., '80', '443-8080', 'Any')") srcCidr: str = Field("Any", description="Source CIDR (e.g., '192.168.1.0/24', 'Any')") destPort: Optional[str] = Field("Any", description="Destination port (e.g., '80', '443-8080', 'Any')") destCidr: str = Field("Any", description="Destination CIDR (e.g., '192.168.1.0/24', 'Any')") syslogEnabled: Optional[bool] = Field(False, description="Whether syslog is enabled for this rule") # Device Update Schema class DeviceUpdateSchema(BaseModel): name: Optional[str] = Field(None, description="The name of the device") tags: Optional[List[str]] = Field(None, description="List of tags for the device") lat: Optional[float] = 
Field(None, description="Latitude of the device") lng: Optional[float] = Field(None, description="Longitude of the device") address: Optional[str] = Field(None, description="Physical address of the device") notes: Optional[str] = Field(None, description="Notes for the device") moveMapMarker: Optional[bool] = Field(None, description="Whether to move the map marker or not") switchProfileId: Optional[str] = Field(None, description="Switch profile ID") floorPlanId: Optional[str] = Field(None, description="Floor plan ID") # Network Update Schema class NetworkUpdateSchema(BaseModel): name: Optional[str] = Field(None, description="The name of the network") timeZone: Optional[str] = Field(None, description="The timezone of the network") tags: Optional[List[str]] = Field(None, description="List of tags for the network") enrollmentString: Optional[str] = Field(None, description="Enrollment string for the network") notes: Optional[str] = Field(None, description="Notes for the network") # Admin Creation Schema class AdminCreationSchema(BaseModel): email: str = Field(..., description="Email address of the admin") name: str = Field(..., description="Name of the admin") orgAccess: str = Field(..., description="Access level for the organization") tags: Optional[List[str]] = Field(None, description="List of tags for the admin") networks: Optional[List[dict]] = Field(None, description="Network access for the admin") # Action Batch Schema class ActionBatchSchema(BaseModel): actions: List[dict] = Field(..., description="List of actions to perform") confirmed: bool = Field(True, description="Whether the batch is confirmed") synchronous: bool = Field(False, description="Whether the batch is synchronous") # VPN Configuration Schema class VpnSiteToSiteSchema(BaseModel): mode: str = Field(..., description="VPN mode (none, full, or hub-and-spoke)") hubs: Optional[List[dict]] = Field(None, description="List of hub configurations") subnets: Optional[List[dict]] = Field(None, description="List 
of subnet configurations") # Content Filtering Schema class ContentFilteringSchema(BaseModel): allowedUrls: Optional[List[str]] = Field(None, description="List of allowed URLs") blockedUrls: Optional[List[str]] = Field(None, description="List of blocked URLs") blockedUrlPatterns: Optional[List[str]] = Field(None, description="List of blocked URL patterns") youtubeRestrictedForTeenagers: Optional[bool] = Field(None, description="Restrict YouTube for teenagers") youtubeRestrictedForMature: Optional[bool] = Field(None, description="Restrict YouTube for mature content") # Traffic Shaping Schema class TrafficShapingSchema(BaseModel): globalBandwidthLimits: Optional[dict] = Field(None, description="Global bandwidth limits") rules: Optional[List[dict]] = Field(None, description="Traffic shaping rules") # Camera Sense Schema class CameraSenseSchema(BaseModel): senseEnabled: Optional[bool] = Field(None, description="Whether camera sense is enabled") mqttBrokerId: Optional[str] = Field(None, description="MQTT broker ID") audioDetection: Optional[dict] = Field(None, description="Audio detection settings") # Switch QoS Rule Schema class SwitchQosRuleSchema(BaseModel): vlan: int = Field(..., description="VLAN ID") protocol: str = Field(..., description="Protocol (tcp, udp, any)") srcPort: int = Field(..., description="Source port") srcPortRange: Optional[str] = Field(None, description="Source port range") dstPort: Optional[int] = Field(None, description="Destination port") dstPortRange: Optional[str] = Field(None, description="Destination port range") dscp: Optional[int] = Field(None, description="DSCP value") ####################### # ORGANIZATION TOOLS # ####################### # Get organizations @mcp.tool() async def get_organizations() -> str: """Get a list of organizations the user has access to""" organizations = await async_get_organizations() return json.dumps(organizations, indent=2) # Get organization details @mcp.tool() async def get_organization_details(org_id: str 
= None) -> str: """Get details for a specific organization, defaults to the configured organization""" organization_id = org_id or MERAKI_ORG_ID org_details = await async_get_organization(organization_id) return json.dumps(org_details, indent=2) # Get networks from Meraki @mcp.tool() async def get_networks(org_id: str = None) -> str: """Get a list of networks from Meraki""" organization_id = org_id or MERAKI_ORG_ID networks = await async_get_organization_networks(organization_id) return json.dumps(networks, indent=2) # Get devices from Meraki @mcp.tool() async def get_devices(org_id: str = None) -> str: """Get a list of devices from Meraki""" organization_id = org_id or MERAKI_ORG_ID devices = await async_get_organization_devices(organization_id) return json.dumps(devices, indent=2) # Create network in Meraki @mcp.tool() def create_network(name: str, tags: list[str], productTypes: list[str], org_id: str = None, copyFromNetworkId: str = None) -> str: """Create a new network in Meraki, optionally copying from another network.""" organization_id = org_id or MERAKI_ORG_ID kwargs = {} if copyFromNetworkId: kwargs['copyFromNetworkId'] = copyFromNetworkId network = dashboard.organizations.createOrganizationNetwork(organization_id, name, productTypes, tags=tags, **kwargs) return json.dumps(network, indent=2) # Delete network in Meraki @mcp.tool() def delete_network(network_id: str) -> str: """Delete a network in Meraki""" dashboard.networks.deleteNetwork(network_id) return f"Network {network_id} deleted" # Get organization status @mcp.tool() def get_organization_status(org_id: str = None) -> str: """Get the status and health of an organization""" organization_id = org_id or MERAKI_ORG_ID status = dashboard.organizations.getOrganizationStatus(organization_id) return json.dumps(status, indent=2) # Get organization inventory @mcp.tool() def get_organization_inventory(org_id: str = None) -> str: """Get the inventory for an organization""" organization_id = org_id or 
MERAKI_ORG_ID inventory = dashboard.organizations.getOrganizationInventoryDevices(organization_id) return json.dumps(inventory, indent=2) # Get organization license state @mcp.tool() def get_organization_license(org_id: str = None) -> str: """Get the license state for an organization""" organization_id = org_id or MERAKI_ORG_ID license_state = dashboard.organizations.getOrganizationLicensesOverview(organization_id) return json.dumps(license_state, indent=2) # Get organization configuration changes @mcp.tool() def get_organization_conf_change(org_id: str = None) -> str: """Get the org change state for an organization""" organization_id = org_id or MERAKI_ORG_ID org_config_changes = dashboard.organizations.getOrganizationConfigurationChanges(organization_id) return json.dumps(org_config_changes, indent=2) ####################### # NETWORK TOOLS # ####################### # Get network details @mcp.tool() def get_network_details(network_id: str) -> str: """Get details for a specific network""" network = dashboard.networks.getNetwork(network_id) return json.dumps(network, indent=2) # Get network devices @mcp.tool() def get_network_devices(network_id: str) -> str: """Get a list of devices in a specific network""" devices = dashboard.networks.getNetworkDevices(network_id) return json.dumps(devices, indent=2) # Update network @mcp.tool() def update_network(network_id: str, update_data: NetworkUpdateSchema) -> str: """ Update a network's properties using a schema-validated model Args: network_id: The ID of the network to update update_data: Network properties to update (name, timeZone, tags, enrollmentString, notes) """ # Convert the Pydantic model to a dictionary and filter out None values update_dict = {k: v for k, v in update_data.dict().items() if v is not None} result = dashboard.networks.updateNetwork(network_id, **update_dict) return json.dumps(result, indent=2) # Get clients from Meraki @mcp.tool() def get_clients(network_id: str, timespan: int = 86400) -> str: """ 
    Get a list of clients from a specific Meraki network.

    Args:
        network_id (str): The ID of the Meraki network.
        timespan (int): The timespan in seconds to get clients (default: 24 hours)

    Returns:
        str: JSON-formatted list of clients.
    """
    clients = dashboard.networks.getNetworkClients(network_id, timespan=timespan)
    return json.dumps(clients, indent=2)

# Get client details
@mcp.tool()
def get_client_details(network_id: str, client_id: str) -> str:
    """Get details for a specific client in a network"""
    client = dashboard.networks.getNetworkClient(network_id, client_id)
    return json.dumps(client, indent=2)

# Get client usage history
@mcp.tool()
def get_client_usage(network_id: str, client_id: str) -> str:
    """Get the usage history for a client"""
    usage = dashboard.networks.getNetworkClientUsageHistory(network_id, client_id)
    return json.dumps(usage, indent=2)

# Get client policy from Meraki
@mcp.tool()
async def get_client_policy(network_id: str, client_id: str) -> str:
    """
    Get the policy for a specific client in a specific Meraki network.

    Args:
        network_id (str): The ID of the Meraki network.
        client_id (str): The ID (MAC address or client ID) of the client.

    Returns:
        str: JSON-formatted client policy.
    """
    # Run the blocking SDK call in the default executor so the event loop stays responsive
    loop = asyncio.get_running_loop()
    policy = await loop.run_in_executor(
        None,
        lambda: dashboard.networks.getNetworkClientPolicy(network_id, client_id)
    )
    return json.dumps(policy, indent=2)

# Update client policy
@mcp.tool()
def update_client_policy(network_id: str, client_id: str, device_policy: str, group_policy_id: str = None) -> str:
    """Update policy for a client"""
    kwargs = {'devicePolicy': device_policy}
    if group_policy_id:
        kwargs['groupPolicyId'] = group_policy_id

    result = dashboard.networks.updateNetworkClientPolicy(network_id, client_id, **kwargs)
    return json.dumps(result, indent=2)

# Get network traffic analysis
@mcp.tool()
def get_network_traffic(network_id: str, timespan: int = 86400) -> str:
    """Get traffic analysis data for a network"""
    traffic = dashboard.networks.getNetworkTraffic(network_id, timespan=timespan)
    return json.dumps(traffic, indent=2)

#######################
# DEVICE TOOLS #
#######################

# Get device details
@mcp.tool()
async def get_device_details(serial: str) -> str:
    """Get details for a specific device by serial number"""
    device = await async_get_device(serial)
    return json.dumps(device, indent=2)

# Update device
@mcp.tool()
async def update_device(serial: str, device_settings: DeviceUpdateSchema) -> str:
    """
    Update a device in the Meraki organization using a schema-validated model

    Args:
        serial: The serial number of the device to update
        device_settings: Device properties to update (name, tags, lat, lng, address, notes, etc.)

    Returns:
        Confirmation of the update with the new settings
    """
    # Convert the Pydantic model to a dictionary and filter out None values
    update_dict = {k: v for k, v in device_settings.dict().items() if v is not None}

    await async_update_device(serial, **update_dict)

    # Get the updated device details to return
    updated_device = await async_get_device(serial)

    return json.dumps({
        "status": "success",
        "message": f"Device {serial} updated",
        "updated_settings": update_dict,
        "current_device": updated_device
    }, indent=2)

# Claim devices into the Meraki organization
@mcp.tool()
def claim_devices(network_id: str, serials: list[str]) -> str:
    """Claim one or more devices into a Meraki network"""
    dashboard.networks.claimNetworkDevices(network_id, serials)
    return f"Devices {serials} claimed into network {network_id}"

# Remove device from network
@mcp.tool()
def remove_device(network_id: str, serial: str) -> str:
    """Remove a device from its network"""
    # removeNetworkDevices takes the network ID as well as the device serial
    dashboard.networks.removeNetworkDevices(network_id, serial)
    return f"Device {serial} removed from network {network_id}"

# Reboot device
@mcp.tool()
def reboot_device(serial: str) -> str:
    """Reboot a device"""
    result = dashboard.devices.rebootDevice(serial)
    return json.dumps(result, indent=2)

# Get device clients
@mcp.tool()
def get_device_clients(serial: str, timespan: int = 86400) -> str:
    """Get clients connected to a specific device"""
    clients = dashboard.devices.getDeviceClients(serial, timespan=timespan)
    return json.dumps(clients, indent=2)

# Get device status
@mcp.tool()
def get_device_status(serial: str) -> str:
    """Get the current status of a device"""
    status = dashboard.devices.getDeviceStatuses(serial)
    return json.dumps(status, indent=2)

# Get device uplink status
@mcp.tool()
def get_device_uplink(serial: str) -> str:
    """Get the uplink status of a device"""
    uplink = dashboard.devices.getDeviceUplink(serial)
    return json.dumps(uplink, indent=2)

#######################
# WIRELESS TOOLS #
#######################

# Get wireless SSIDs
@mcp.tool()
async def get_wireless_ssids(network_id: str) -> str:
    """Get wireless SSIDs for a network"""
    ssids = await async_get_wireless_ssids(network_id)
    return json.dumps(ssids, indent=2)

# Update wireless SSID
@mcp.tool()
async def update_wireless_ssid(network_id: str, ssid_number: str, ssid_settings: SsidUpdateSchema) -> str:
    """
    Update a wireless SSID with comprehensive schema validation

    Args:
        network_id: The ID of the network containing the SSID
        ssid_number: The number of the SSID to update
        ssid_settings: Comprehensive SSID settings following the Meraki schema

    Returns:
        The updated SSID configuration
    """
    # Convert the Pydantic model to a dictionary and filter out None values
    update_dict = {k: v for k, v in ssid_settings.dict().items() if v is not None}

    result = await async_update_wireless_ssid(network_id, ssid_number, **update_dict)
    return json.dumps(result, indent=2)

# Get wireless settings
@mcp.tool()
def get_wireless_settings(network_id: str) -> str:
    """Get wireless settings for a network"""
    settings = dashboard.wireless.getNetworkWirelessSettings(network_id)
    return json.dumps(settings, indent=2)

#######################
# SWITCH TOOLS #
#######################

# Get switch ports
@mcp.tool()
def get_switch_ports(serial: str) -> str:
    """Get ports for a switch"""
    ports = dashboard.switch.getDeviceSwitchPorts(serial)
    return json.dumps(ports, indent=2)

# Update switch port
@mcp.tool()
def update_switch_port(serial: str, port_id: str, name: str = None, tags: list[str] = None, enabled: bool = None, vlan: int = None) -> str:
    """Update a switch port"""
    kwargs = {}
    if name:
        kwargs['name'] = name
    if tags:
        kwargs['tags'] = tags
    if enabled is not None:
        kwargs['enabled'] = enabled
    if vlan:
        kwargs['vlan'] = vlan

    result = dashboard.switch.updateDeviceSwitchPort(serial, port_id, **kwargs)
    return json.dumps(result, indent=2)

# Get switch VLAN settings
@mcp.tool()
def get_switch_vlans(network_id: str) -> str:
    """Get VLANs for a network"""
    vlans = dashboard.switch.getNetworkSwitchVlans(network_id)
    return json.dumps(vlans, indent=2)

# Create switch VLAN
@mcp.tool()
def create_switch_vlan(network_id: str, vlan_id: int, name: str, subnet: str = None, appliance_ip: str = None) -> str:
    """Create a switch VLAN"""
    kwargs = {}
    if subnet:
        kwargs['subnet'] = subnet
    if appliance_ip:
        kwargs['applianceIp'] = appliance_ip

    result = dashboard.switch.createNetworkSwitchVlan(network_id, vlan_id, name, **kwargs)
    return json.dumps(result, indent=2)

#######################
# APPLIANCE TOOLS #
#######################

# Get security center
@mcp.tool()
def get_security_center(network_id: str) -> str:
    """Get security information for a network"""
    security = dashboard.appliance.getNetworkApplianceSecurityCenter(network_id)
    return json.dumps(security, indent=2)

# Get VPN status
@mcp.tool()
def get_vpn_status(network_id: str) -> str:
    """Get VPN status for a network"""
    vpn_status = dashboard.appliance.getNetworkApplianceVpnSiteToSiteVpn(network_id)
    return json.dumps(vpn_status, indent=2)

# Get firewall rules
@mcp.tool()
def get_firewall_rules(network_id: str) -> str:
    """Get firewall rules for a network"""
    rules = dashboard.appliance.getNetworkApplianceFirewallL3FirewallRules(network_id)
    return json.dumps(rules, indent=2)

# Update firewall rules
@mcp.tool()
def update_firewall_rules(network_id: str, rules: List[FirewallRule]) -> str:
    """
    Update firewall rules for a network using schema-validated models

    Args:
        network_id: The ID of the network
        rules: List of firewall rules following the Meraki schema

    Returns:
        The updated firewall rules configuration
    """
    # Convert the list of Pydantic models to a list of dictionaries
    rules_dict = [rule.dict(exclude_none=True) for rule in rules]

    result = dashboard.appliance.updateNetworkApplianceFirewallL3FirewallRules(network_id, rules=rules_dict)
    return json.dumps(result, indent=2)

#######################
# CAMERA TOOLS #
#######################

# Get camera video settings
@mcp.tool()
def get_camera_video_settings(network_id: str, serial: str) -> str:
    """Get video settings for a camera"""
    settings = dashboard.camera.getDeviceCameraVideoSettings(serial)
    return json.dumps(settings, indent=2)

# Get camera quality and retention settings
@mcp.tool()
def get_camera_quality_settings(network_id: str) -> str:
    """Get quality and retention settings for cameras in a network"""
    settings = dashboard.camera.getNetworkCameraQualityRetentionProfiles(network_id)
    return json.dumps(settings, indent=2)

#######################
# ADVANCED ORGANIZATION TOOLS
#######################

# Get organization admins
@mcp.tool()
def get_organization_admins(org_id: str = None) -> str:
    """Get a list of organization admins"""
    organization_id = org_id or MERAKI_ORG_ID
    admins = dashboard.organizations.getOrganizationAdmins(organization_id)
    return json.dumps(admins, indent=2)

# Create organization admin
@mcp.tool()
def create_organization_admin(org_id: str, email: str, name: str, org_access: str, tags: list[str] = None, networks: list[dict] = None) -> str:
    """Create a new organization admin"""
    organization_id = org_id or MERAKI_ORG_ID
    kwargs = {
        'email': email,
        'name': name,
        'orgAccess': org_access
    }
    if tags:
        kwargs['tags'] = tags
    if networks:
        kwargs['networks'] = networks

    result = dashboard.organizations.createOrganizationAdmin(organization_id, **kwargs)
    return json.dumps(result, indent=2)

# Get organization API requests
@mcp.tool()
def get_organization_api_requests(org_id: str = None, timespan: int = 86400) -> str:
    """Get organization API request history"""
    organization_id = org_id or MERAKI_ORG_ID
    requests = dashboard.organizations.getOrganizationApiRequests(organization_id, timespan=timespan)
    return json.dumps(requests, indent=2)

# Get organization webhook logs
@mcp.tool()
def get_organization_webhook_logs(org_id: str = None, timespan: int = 86400) -> str:
    """Get organization webhook logs"""
    organization_id = org_id or MERAKI_ORG_ID
    logs = dashboard.organizations.getOrganizationWebhooksLogs(organization_id, timespan=timespan)
    return json.dumps(logs, indent=2)

#######################
# ENHANCED NETWORK MONITORING
#######################

# Get network events
@mcp.tool()
def get_network_events(network_id: str, timespan: int = 86400, per_page: int = 100) -> str:
    """Get network events history"""
    events = dashboard.networks.getNetworkEvents(network_id, timespan=timespan, perPage=per_page)
    return json.dumps(events, indent=2)

# Get network event types
@mcp.tool()
def get_network_event_types(network_id: str) -> str:
    """Get available network event types"""
    event_types = dashboard.networks.getNetworkEventsEventTypes(network_id)
    return json.dumps(event_types, indent=2)

# Get network alerts history
@mcp.tool()
def get_network_alerts_history(network_id: str, timespan: int = 86400) -> str:
    """Get network alerts history"""
    alerts = dashboard.networks.getNetworkAlertsHistory(network_id, timespan=timespan)
    return json.dumps(alerts, indent=2)

# Get network alerts settings
@mcp.tool()
def get_network_alerts_settings(network_id: str) -> str:
    """Get network alerts settings"""
    settings = dashboard.networks.getNetworkAlertsSettings(network_id)
    return json.dumps(settings, indent=2)

# Update network alerts settings
@mcp.tool()
def update_network_alerts_settings(network_id: str, defaultDestinations: dict = None, alerts: list[dict] = None) -> str:
    """Update network alerts settings"""
    kwargs = {}
    if defaultDestinations:
        kwargs['defaultDestinations'] = defaultDestinations
    if alerts:
        kwargs['alerts'] = alerts

    result = dashboard.networks.updateNetworkAlertsSettings(network_id, **kwargs)
    return json.dumps(result, indent=2)

#######################
# LIVE DEVICE TOOLS
#######################

# Ping device
@mcp.tool()
def ping_device(serial: str, target_ip: str, count: int = 5) -> str:
    """Ping a device from another device"""
    result = dashboard.devices.createDeviceLiveToolsPing(serial, target_ip, count=count)
    return json.dumps(result, indent=2)

# Get ping results
@mcp.tool()
def get_device_ping_results(serial: str, ping_id: str) -> str:
    """Get results from a device ping test"""
    result = dashboard.devices.getDeviceLiveToolsPing(serial, ping_id)
    return json.dumps(result, indent=2)

# Cable test device
@mcp.tool()
def cable_test_device(serial: str, ports: list[str]) -> str:
    """Run cable test on device ports"""
    result = dashboard.devices.createDeviceLiveToolsCableTest(serial, ports)
    return json.dumps(result, indent=2)

# Get cable test results
@mcp.tool()
def get_device_cable_test_results(serial: str, cable_test_id: str) -> str:
    """Get results from a device cable test"""
    result = dashboard.devices.getDeviceLiveToolsCableTest(serial, cable_test_id)
    return json.dumps(result, indent=2)

# Blink device LEDs
@mcp.tool()
def blink_device_leds(serial: str, duration: int = 5) -> str:
    """Blink device LEDs for identification"""
    result = dashboard.devices.blinkDeviceLeds(serial, duration=duration)
    return json.dumps(result, indent=2)

# Wake on LAN
@mcp.tool()
def wake_on_lan_device(serial: str, mac: str) -> str:
    """Send wake-on-LAN packet to a device"""
    result = dashboard.devices.createDeviceLiveToolsWakeOnLan(serial, mac)
    return json.dumps(result, indent=2)

#######################
# ADVANCED WIRELESS TOOLS
#######################

# Get wireless RF profiles
@mcp.tool()
def get_wireless_rf_profiles(network_id: str) -> str:
    """Get wireless RF profiles for a network"""
    profiles = dashboard.wireless.getNetworkWirelessRfProfiles(network_id)
    return json.dumps(profiles, indent=2)

# Create wireless RF profile
@mcp.tool()
def create_wireless_rf_profile(network_id: str, name: str, band_selection_type: str, **kwargs) -> str:
    """Create a wireless RF profile"""
    result = dashboard.wireless.createNetworkWirelessRfProfile(network_id, name, bandSelectionType=band_selection_type, **kwargs)
    return json.dumps(result, indent=2)

# Get wireless channel utilization
@mcp.tool()
def get_wireless_channel_utilization(network_id: str, timespan: int = 86400) -> str:
    """Get wireless channel utilization history"""
    utilization = dashboard.wireless.getNetworkWirelessChannelUtilizationHistory(network_id, timespan=timespan)
    return json.dumps(utilization, indent=2)

# Get wireless signal quality
@mcp.tool()
def get_wireless_signal_quality(network_id: str, timespan: int = 86400) -> str:
    """Get wireless signal quality history"""
    quality = dashboard.wireless.getNetworkWirelessSignalQualityHistory(network_id, timespan=timespan)
    return json.dumps(quality, indent=2)

# Get wireless connection stats
@mcp.tool()
def get_wireless_connection_stats(network_id: str, timespan: int = 86400) -> str:
    """Get wireless connection statistics"""
    stats = dashboard.wireless.getNetworkWirelessConnectionStats(network_id, timespan=timespan)
    return json.dumps(stats, indent=2)

# Get wireless client connectivity events
@mcp.tool()
def get_wireless_client_connectivity_events(network_id: str, client_id: str, timespan: int = 86400) -> str:
    """Get wireless client connectivity events"""
    events = dashboard.wireless.getNetworkWirelessClientConnectivityEvents(network_id, client_id, timespan=timespan)
    return json.dumps(events, indent=2)

#######################
# ADVANCED SWITCH TOOLS
#######################

# Get switch port statuses
@mcp.tool()
def get_switch_port_statuses(serial: str) -> str:
    """Get switch port statuses"""
    statuses = dashboard.switch.getDeviceSwitchPortsStatuses(serial)
    return json.dumps(statuses, indent=2)

# Cycle switch ports
@mcp.tool()
def cycle_switch_ports(serial: str, ports: list[str]) -> str:
    """Cycle (restart) switch ports"""
    result = dashboard.switch.cycleDeviceSwitchPorts(serial, ports)
    return json.dumps(result, indent=2)

# Get switch access control lists
@mcp.tool()
def get_switch_access_control_lists(network_id: str) -> str:
    """Get switch access control lists"""
    acls = dashboard.switch.getNetworkSwitchAccessControlLists(network_id)
    return json.dumps(acls, indent=2)

# Update switch access control lists
@mcp.tool()
def update_switch_access_control_lists(network_id: str, rules: list[dict]) -> str:
    """Update switch access control lists"""
    result = dashboard.switch.updateNetworkSwitchAccessControlLists(network_id, rules)
    return json.dumps(result, indent=2)

# Get switch QoS rules
@mcp.tool()
def get_switch_qos_rules(network_id: str) -> str:
    """Get switch QoS rules"""
    rules = dashboard.switch.getNetworkSwitchQosRules(network_id)
    return json.dumps(rules, indent=2)

# Create switch QoS rule
@mcp.tool()
def create_switch_qos_rule(network_id: str, vlan: int, protocol: str, src_port: int, src_port_range: str = None, dst_port: int = None, dst_port_range: str = None, dscp: int = None) -> str:
    """Create a switch QoS rule"""
    kwargs = {
        'vlan': vlan,
        'protocol': protocol,
        'srcPort': src_port
    }
    if src_port_range:
        kwargs['srcPortRange'] = src_port_range
    if dst_port:
        kwargs['dstPort'] = dst_port
    if dst_port_range:
        kwargs['dstPortRange'] = dst_port_range
    # Compare against None so that a DSCP value of 0 is still sent
    if dscp is not None:
        kwargs['dscp'] = dscp

    result = dashboard.switch.createNetworkSwitchQosRule(network_id, **kwargs)
    return json.dumps(result, indent=2)

#######################
# ADVANCED APPLIANCE TOOLS
#######################

# Get appliance VPN site-to-site configuration
@mcp.tool()
def get_appliance_vpn_site_to_site(network_id: str) -> str:
    """Get appliance VPN site-to-site configuration"""
    vpn = dashboard.appliance.getNetworkApplianceVpnSiteToSiteVpn(network_id)
    return json.dumps(vpn, indent=2)

# Update appliance VPN site-to-site
@mcp.tool()
def update_appliance_vpn_site_to_site(network_id: str, mode: str, hubs: list[dict] = None, subnets: list[dict] = None) -> str:
    """Update appliance VPN site-to-site configuration"""
    kwargs = {'mode': mode}
    if hubs:
        kwargs['hubs'] = hubs
    if subnets:
        kwargs['subnets'] = subnets

    result = dashboard.appliance.updateNetworkApplianceVpnSiteToSiteVpn(network_id, **kwargs)
    return json.dumps(result, indent=2)

# Get appliance content filtering
@mcp.tool()
def get_appliance_content_filtering(network_id: str) -> str:
    """Get appliance content filtering settings"""
    filtering = dashboard.appliance.getNetworkApplianceContentFiltering(network_id)
    return json.dumps(filtering, indent=2)

# Update appliance content filtering
@mcp.tool()
def update_appliance_content_filtering(network_id: str, allowed_urls: list[str] = None, blocked_urls: list[str] = None, blocked_url_patterns: list[str] = None, youtube_restricted_for_teenagers: bool = None, youtube_restricted_for_mature: bool = None) -> str:
    """Update appliance content filtering settings"""
    kwargs = {}
    if allowed_urls:
        kwargs['allowedUrls'] = allowed_urls
    if blocked_urls:
        kwargs['blockedUrls'] = blocked_urls
    if blocked_url_patterns:
        kwargs['blockedUrlPatterns'] = blocked_url_patterns
    if youtube_restricted_for_teenagers is not None:
        kwargs['youtubeRestrictedForTeenagers'] = youtube_restricted_for_teenagers
    if youtube_restricted_for_mature is not None:
        kwargs['youtubeRestrictedForMature'] = youtube_restricted_for_mature

    result = dashboard.appliance.updateNetworkApplianceContentFiltering(network_id, **kwargs)
    return json.dumps(result, indent=2)

# Get appliance security events
@mcp.tool()
def get_appliance_security_events(network_id: str, timespan: int = 86400) -> str:
    """Get appliance security events"""
    events = dashboard.appliance.getNetworkApplianceSecurityEvents(network_id, timespan=timespan)
    return json.dumps(events, indent=2)

# Get appliance traffic shaping
@mcp.tool()
def get_appliance_traffic_shaping(network_id: str) -> str:
    """Get appliance traffic shaping settings"""
    shaping = dashboard.appliance.getNetworkApplianceTrafficShaping(network_id)
    return json.dumps(shaping, indent=2)

# Update appliance traffic shaping
@mcp.tool()
def update_appliance_traffic_shaping(network_id: str, global_bandwidth_limits: dict = None) -> str:
    """Update appliance traffic shaping settings"""
    kwargs = {}
    if global_bandwidth_limits:
        kwargs['globalBandwidthLimits'] = global_bandwidth_limits

    result = dashboard.appliance.updateNetworkApplianceTrafficShaping(network_id, **kwargs)
    return json.dumps(result, indent=2)

#######################
# CAMERA TOOLS
#######################

# Get camera analytics live
@mcp.tool()
def get_camera_analytics_live(serial: str) -> str:
    """Get live camera analytics"""
    analytics = dashboard.camera.getDeviceCameraAnalyticsLive(serial)
    return json.dumps(analytics, indent=2)

# Get camera analytics overview
@mcp.tool()
def get_camera_analytics_overview(serial: str, timespan: int = 86400) -> str:
    """Get camera analytics overview"""
    overview = dashboard.camera.getDeviceCameraAnalyticsOverview(serial, timespan=timespan)
    return json.dumps(overview, indent=2)

# Get camera analytics zones
@mcp.tool()
def get_camera_analytics_zones(serial: str) -> str:
    """Get camera analytics zones"""
    zones = dashboard.camera.getDeviceCameraAnalyticsZones(serial)
    return json.dumps(zones, indent=2)

# Generate camera snapshot
@mcp.tool()
def generate_camera_snapshot(serial: str, timestamp: str = None) -> str:
    """Generate a camera snapshot"""
    kwargs = {}
    if timestamp:
        kwargs['timestamp'] = timestamp

    result = dashboard.camera.generateDeviceCameraSnapshot(serial, **kwargs)
    return json.dumps(result, indent=2)

# Get camera sense
@mcp.tool()
def get_camera_sense(serial: str) -> str:
    """Get camera sense configuration"""
    sense = dashboard.camera.getDeviceCameraSense(serial)
    return json.dumps(sense, indent=2)

# Update camera sense
@mcp.tool()
def update_camera_sense(serial: str, sense_enabled: bool = None, mqtt_broker_id: str = None, audio_detection: dict = None) -> str:
    """Update camera sense configuration"""
    kwargs = {}
    if sense_enabled is not None:
        kwargs['senseEnabled'] = sense_enabled
    if mqtt_broker_id:
        kwargs['mqttBrokerId'] = mqtt_broker_id
    if audio_detection:
        kwargs['audioDetection'] = audio_detection

    result = dashboard.camera.updateDeviceCameraSense(serial, **kwargs)
    return json.dumps(result, indent=2)

#######################
# NETWORK AUTOMATION TOOLS
#######################

# Create action batch
@mcp.tool()
def create_action_batch(org_id: str, actions: list[dict], confirmed: bool = True, synchronous: bool = False) -> str:
    """Create an action batch for bulk operations"""
    organization_id = org_id or MERAKI_ORG_ID
    result = dashboard.organizations.createOrganizationActionBatch(organization_id, actions, confirmed=confirmed, synchronous=synchronous)
    return json.dumps(result, indent=2)

# Get action batch status
@mcp.tool()
def get_action_batch_status(org_id: str, batch_id: str) -> str:
    """Get action batch status"""
    organization_id = org_id or MERAKI_ORG_ID
    status = dashboard.organizations.getOrganizationActionBatch(organization_id, batch_id)
    return json.dumps(status, indent=2)

# Get action batches
@mcp.tool()
def get_action_batches(org_id: str = None) -> str:
    """Get all action batches for an organization"""
    organization_id = org_id or MERAKI_ORG_ID
    batches = dashboard.organizations.getOrganizationActionBatches(organization_id)
    return json.dumps(batches, indent=2)

# Define resources
# Add a dynamic greeting resource
@mcp.resource("greeting://{name}")
def greeting(name: str) -> str:
    """Greet a user by name"""
    return f"Hello {name}!"

# Execute and return the stdio output
if __name__ == "__main__":
    mcp.run()
```
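
Many of the tools in `meraki-mcp.py` build their `kwargs` dictionary by hand, and a few wrap blocking SDK calls for use from `async` tools. The sketch below shows one way both patterns could be factored out. The helper names `snake_to_camel`, `build_kwargs`, and `call_blocking` are hypothetical and are not part of the repository. The sketch also assumes that only `None` means "not provided", which differs from the truthiness checks used in several tools.

```python
import asyncio
from typing import Any, Callable


def snake_to_camel(name: str) -> str:
    """Convert a snake_case name such as 'dst_port_range' to 'dstPortRange'."""
    head, *rest = name.split("_")
    return head + "".join(part.capitalize() for part in rest)


def build_kwargs(**options: Any) -> dict[str, Any]:
    """Drop options that are None and rename the rest to camelCase.

    Only None is treated as "not provided", so falsy values such as
    0, False, or an empty list are kept.
    """
    return {snake_to_camel(key): value
            for key, value in options.items()
            if value is not None}


async def call_blocking(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
    """Run a blocking callable in a worker thread and await its result."""
    return await asyncio.to_thread(func, *args, **kwargs)
```

Under these assumptions, a tool body could call `build_kwargs(src_port_range=src_port_range, dscp=dscp)` and pass the result to the SDK with `**`, and an async tool could write `await call_blocking(some_sdk_method, network_id)` in place of a manual `run_in_executor` call.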