# Directory Structure ``` ├── ansible.py ├── app.py ├── claude_desktop_config.json ├── eda.py ├── llama_quick_start_guide.md ├── mcp_server │ ├── ansible.py │ ├── Containerfile │ └── deployment.yaml ├── README.md └── redhat_insights_mcp.py ``` # Files -------------------------------------------------------------------------------- /README.md: -------------------------------------------------------------------------------- ```markdown # Setup of the Environment for the AI Powered Ansible & OpenShift Automation with Model Context Protocols (MCP) Servers ## Overview This guide will walk you through setting up the MCP Servers + Claude Desktop portions of the demo that focused on using Claude Desktop to interact with your Ansible Automation Platform and OpenShift Cluster environments. ## Prerequisites Ensure you have the following installed. ### Required - An Ansible Automation Platform (AAP) environment - An OpenShift Cluster with OpenShift Virtualization - [Claude Desktop](https://claude.ai/download) installed on your laptop (Pro Plan required for best results) - Python 3.10 or higher installed on your laptop - Ensure you are authenticated with your OpenShift cluster (e.g. exporting kubeconfig) ## Step One: Setup your laptop environment Install `uv` and setup your Python project and environment. ``` curl -LsSf https://astral.sh/uv/install.sh | sh ``` Install [jbang](https://www.jbang.dev/download/) which will be used when using the Kubernetes MCP Server. (jbang needs to be installed globally, recommend using the homebrew install pattern. If you install it locally (the curl pattern), Claude won't be able to access it). Restart your terminal to ensure that the `uv` and `jbang` command are now available. ## Step Two: Create and Setup your Project ``` # Create a new directory for our project uv init ansible cd ansible # Create virtual environment and activate it uv venv source .venv/bin/activate # Install dependencies uv add "mcp[cli]" httpx # Create our server file touch ansible.py ``` ## Step 3 Building your Ansible Automation Controller MCP Server This is the MCP Server I used to interact with my automation controller. Feel free to copy/paste this into your `ansible.py` file. Note: to connect to self-signed SSL, use edit the async client to be https.AsyncClient(verify=False) ``` import os import httpx from mcp.server.fastmcp import FastMCP from typing import Any # Environment variables for authentication AAP_URL = os.getenv("AAP_URL") AAP_TOKEN = os.getenv("AAP_TOKEN") if not AAP_TOKEN: raise ValueError("AAP_TOKEN is required") # Headers for API authentication HEADERS = { "Authorization": f"Bearer {AAP_TOKEN}", "Content-Type": "application/json" } # Initialize FastMCP mcp = FastMCP("ansible") async def make_request(url: str, method: str = "GET", json: dict = None) -> Any: """Helper function to make authenticated API requests to AAP.""" async with httpx.AsyncClient() as client: response = await client.request(method, url, headers=HEADERS, json=json) if response.status_code not in [200, 201]: return f"Error {response.status_code}: {response.text}" return response.json() if "application/json" in response.headers.get("Content-Type", "") else response.text @mcp.tool() async def list_inventories() -> Any: """List all inventories in Ansible Automation Platform.""" return await make_request(f"{AAP_URL}/inventories/") @mcp.tool() async def get_inventory(inventory_id: str) -> Any: """Get details of a specific inventory by ID.""" return await make_request(f"{AAP_URL}/inventories/{inventory_id}/") @mcp.tool() async def run_job(template_id: int, extra_vars: dict = {}) -> Any: """Run a job template by ID, optionally with extra_vars.""" return await make_request(f"{AAP_URL}/job_templates/{template_id}/launch/", method="POST", json={"extra_vars": extra_vars}) @mcp.tool() async def job_status(job_id: int) -> Any: """Check the status of a job by ID.""" return await make_request(f"{AAP_URL}/jobs/{job_id}/") @mcp.tool() async def job_logs(job_id: int) -> str: """Retrieve logs for a job.""" return await make_request(f"{AAP_URL}/jobs/{job_id}/stdout/") @mcp.tool() async def create_project( name: str, organization_id: int, source_control_url: str, source_control_type: str = "git", description: str = "", execution_environment_id: int = None, content_signature_validation_credential_id: int = None, source_control_branch: str = "", source_control_refspec: str = "", source_control_credential_id: int = None, clean: bool = False, update_revision_on_launch: bool = False, delete: bool = False, allow_branch_override: bool = False, track_submodules: bool = False, ) -> Any: """Create a new project in Ansible Automation Platform.""" payload = { "name": name, "description": description, "organization": organization_id, "scm_type": source_control_type.lower(), # Git is default "scm_url": source_control_url, "scm_branch": source_control_branch, "scm_refspec": source_control_refspec, "scm_clean": clean, "scm_delete_on_update": delete, "scm_update_on_launch": update_revision_on_launch, "allow_override": allow_branch_override, "scm_track_submodules": track_submodules, } if execution_environment_id: payload["execution_environment"] = execution_environment_id if content_signature_validation_credential_id: payload["signature_validation_credential"] = content_signature_validation_credential_id if source_control_credential_id: payload["credential"] = source_control_credential_id return await make_request(f"{AAP_URL}/projects/", method="POST", json=payload) @mcp.tool() async def create_job_template( name: str, project_id: int, playbook: str, inventory_id: int, job_type: str = "run", description: str = "", credential_id: int = None, execution_environment_id: int = None, labels: list[str] = None, forks: int = 0, limit: str = "", verbosity: int = 0, timeout: int = 0, job_tags: list[str] = None, skip_tags: list[str] = None, extra_vars: dict = None, privilege_escalation: bool = False, concurrent_jobs: bool = False, provisioning_callback: bool = False, enable_webhook: bool = False, prevent_instance_group_fallback: bool = False, ) -> Any: """Create a new job template in Ansible Automation Platform.""" payload = { "name": name, "description": description, "job_type": job_type, "project": project_id, "playbook": playbook, "inventory": inventory_id, "forks": forks, "limit": limit, "verbosity": verbosity, "timeout": timeout, "ask_variables_on_launch": bool(extra_vars), "ask_tags_on_launch": bool(job_tags), "ask_skip_tags_on_launch": bool(skip_tags), "ask_credential_on_launch": credential_id is None, "ask_execution_environment_on_launch": execution_environment_id is None, "ask_labels_on_launch": labels is None, "ask_inventory_on_launch": False, # Inventory is required, so not prompting "ask_job_type_on_launch": False, # Job type is required, so not prompting "become_enabled": privilege_escalation, "allow_simultaneous": concurrent_jobs, "scm_branch": "", "webhook_service": "github" if enable_webhook else "", "prevent_instance_group_fallback": prevent_instance_group_fallback, } if credential_id: payload["credential"] = credential_id if execution_environment_id: payload["execution_environment"] = execution_environment_id if labels: payload["labels"] = labels if job_tags: payload["job_tags"] = job_tags if skip_tags: payload["skip_tags"] = skip_tags if extra_vars: payload["extra_vars"] = extra_vars return await make_request(f"{AAP_URL}/job_templates/", method="POST", json=payload) @mcp.tool() async def list_inventory_sources() -> Any: """List all inventory sources in Ansible Automation Platform.""" return await make_request(f"{AAP_URL}/inventory_sources/") @mcp.tool() async def get_inventory_source(inventory_source_id: int) -> Any: """Get details of a specific inventory source.""" return await make_request(f"{AAP_URL}/inventory_sources/{inventory_source_id}/") @mcp.tool() async def create_inventory_source( name: str, inventory_id: int, source: str, credential_id: int, source_vars: dict = None, update_on_launch: bool = True, timeout: int = 0, ) -> Any: """Create a dynamic inventory source. Claude will ask for the source type and credential before proceeding.""" valid_sources = [ "file", "constructed", "scm", "ec2", "gce", "azure_rm", "vmware", "satellite6", "openstack", "rhv", "controller", "insights", "terraform", "openshift_virtualization" ] if source not in valid_sources: return f"Error: Invalid source type '{source}'. Please select from: {', '.join(valid_sources)}" if not credential_id: return "Error: Credential is required to create an inventory source." payload = { "name": name, "inventory": inventory_id, "source": source, "credential": credential_id, "source_vars": source_vars, "update_on_launch": update_on_launch, "timeout": timeout, } return await make_request(f"{AAP_URL}/inventory_sources/", method="POST", json=payload) @mcp.tool() async def update_inventory_source(inventory_source_id: int, update_data: dict) -> Any: """Update an existing inventory source.""" return await make_request(f"{AAP_URL}/inventory_sources/{inventory_source_id}/", method="PATCH", json=update_data) @mcp.tool() async def delete_inventory_source(inventory_source_id: int) -> Any: """Delete an inventory source.""" return await make_request(f"{AAP_URL}/inventory_sources/{inventory_source_id}/", method="DELETE") @mcp.tool() async def sync_inventory_source(inventory_source_id: int) -> Any: """Manually trigger a sync for an inventory source.""" return await make_request(f"{AAP_URL}/inventory_sources/{inventory_source_id}/update/", method="POST") @mcp.tool() async def create_inventory( name: str, organization_id: int, description: str = "", kind: str = "", host_filter: str = "", variables: dict = None, prevent_instance_group_fallback: bool = False, ) -> Any: """Create an inventory in Ansible Automation Platform.""" payload = { "name": name, "organization": organization_id, "description": description, "kind": kind, "host_filter": host_filter, "variables": variables, "prevent_instance_group_fallback": prevent_instance_group_fallback, } return await make_request(f"{AAP_URL}/inventories/", method="POST", json=payload) @mcp.tool() async def delete_inventory(inventory_id: int) -> Any: """Delete an inventory from Ansible Automation Platform.""" return await make_request(f"{AAP_URL}/inventories/{inventory_id}/", method="DELETE") @mcp.tool() async def list_job_templates() -> Any: """List all job templates available in Ansible Automation Platform.""" return await make_request(f"{AAP_URL}/job_templates/") @mcp.tool() async def get_job_template(template_id: int) -> Any: """Retrieve details of a specific job template.""" return await make_request(f"{AAP_URL}/job_templates/{template_id}/") @mcp.tool() async def list_jobs() -> Any: """List all jobs available in Ansible Automation Platform.""" return await make_request(f"{AAP_URL}/jobs/") @mcp.tool() async def list_recent_jobs(hours: int = 24) -> Any: """List all jobs executed in the last specified hours (default 24 hours).""" from datetime import datetime, timedelta time_filter = (datetime.utcnow() - timedelta(hours=hours)).isoformat() + "Z" return await make_request(f"{AAP_URL}/jobs/?created__gte={time_filter}") if __name__ == "__main__": mcp.run(transport="stdio") ``` ## Step 4: Configuring Claude Desktop to use your MCP Servers In my particular case, I want to take advantage of two MCP Servers: the Ansible MCP Server above and the Kubernetes MCP Server that I found within the [quarkus-mcp-servers](https://github.com/quarkiverse/quarkus-mcp-servers/tree/main/kubernetes) Git repo Open the `claude_desktop_config.json` , which on MacOS is located at ``` ~/Library/Application\ Support/Claude/claude_desktop_config.json ``` ``` { "mcpServers": { "ansible": { "command": "/absolute/path/to/uv", "args": [ "--directory", "/absolute/path/to/ansible_mcp", "run", "ansible.py" ], "env": { "AAP_TOKEN": "<aap-token>", "AAP_URL": "https://<aap-url>/api/controller/v2" } }, "kubernetes": { "command": "jbang", "args": [ "--quiet", "https://github.com/quarkiverse/quarkus-mcp-servers/blob/main/kubernetes/src/main/java/io/quarkiverse/mcp/servers/kubernetes/MCPServerKubernetes.java" ] } } } ``` Save the file. WARNING: Absolute path to your `uv` binary is required. Do a `which uv` on your system to get the full path. NOTE: If you need to create the AAP_TOKEN, go to the AAP Dashboard, select Access Management -> Users -> <your_user> -> Tokens -> Create token -> Select the Scope dropdown and select 'Write' and click Create token. ## Step 5: Re-Launch Claude Desktop If you already had Claude Desktop open, relaunch it, otherwise make sure Claude Desktop is picking up the MCP servers. You can verify this by ensuring the hammer icon is launched.  NOTE: The number next to the hammer will vary based up on the amount of MCP tools available. Once you click on the hammer icon, you can see a list of tools. Below is an example.  ## Step 6: Test your Environment Now with everything setup, see if you can interact with your Ansible Automation Platform and OpenShift cluster. Feel free to ask it questions such as: * How many Job Templates are available? * How many VMs are on my OpenShift cluster? NOTE: It is very likely you will need to take advantage of the Claude Desktop Pro Plan in order to get the full functionality. ## References [Claude Desktop Quickstart for Server Developers](https://modelcontextprotocol.io/quickstart/server) ## BONUS: Adding Event Driven Ansible MCP Server If you have setup Event Driven Ansible, you can take advantage of the Event Driven Ansible MCP Server below. The instructions are similar to the above. * Create an `eda.py` and store it in your `/absolute/path/to/ansible_mcp` * Update your `claude_desktop_config.json` * Restart your Claude Desktop and verify the hammer has picked up your new MCP tools The two files are listed below for easy copy/paste. ### claude_desktop_config.json ``` { "mcpServers": { "ansible": { "command": "/absolute/path/to/uv", "args": [ "--directory", "/absolute/path/to/ansible_mcp", "run", "ansible.py" ], "env": { "AAP_TOKEN": "<aap-token>", "AAP_URL": "https://<aap-url>/api/controller/v2" } }, "kubernetes": { "command": "jbang", "args": [ "--quiet", "https://github.com/quarkiverse/quarkus-mcp-servers/blob/main/kubernetes/src/main/java/io/quarkiverse/mcp/servers/kubernetes/MCPServerKubernetes.java" ] }, "eda": { "command": "/absolute/path/to/uv", "args": [ "--directory", "/absolute/path/to/ansible_mcp", "run", "eda.py" ], "env": { "EDA_TOKEN": "<EDA_TOKEN>", "EDA_URL": "https://<aap-url>/api/eda/v1" } } } } ``` WARNING: Absolute path to your `uv` binary is required. Do a `which uv` on your system to get the full path. NOTE: An EDA Token can be generated from the AAP Dashboard. ### eda.py MCP Server ``` import os import httpx from mcp.server.fastmcp import FastMCP from typing import Any, Dict # Environment variables for authentication EDA_URL = os.getenv("EDA_URL") EDA_TOKEN = os.getenv("EDA_TOKEN") if not EDA_TOKEN: raise ValueError("EDA_TOKEN is required") # Headers for API authentication HEADERS = { "Authorization": f"Bearer {EDA_TOKEN}", "Content-Type": "application/json" } # Initialize FastMCP mcp = FastMCP("eda") async def make_request(url: str, method: str = "GET", json: Dict = None) -> Any: """Helper function to make authenticated API requests to EDA.""" async with httpx.AsyncClient() as client: response = await client.request(method, url, headers=HEADERS, json=json) if response.status_code not in [200, 201, 204]: return f"Error {response.status_code}: {response.text}" return response.json() if "application/json" in response.headers.get("Content-Type", "") else response.text @mcp.tool() async def list_activations() -> Any: """List all activations in Event-Driven Ansible.""" return await make_request(f"{EDA_URL}/activations/") @mcp.tool() async def get_activation(activation_id: int) -> Any: """Get details of a specific activation.""" return await make_request(f"{EDA_URL}/activations/{activation_id}/") @mcp.tool() async def create_activation(payload: Dict) -> Any: """Create a new activation.""" return await make_request(f"{EDA_URL}/activations/", method="POST", json=payload) @mcp.tool() async def disable_activation(activation_id: int) -> Any: """Disable an activation.""" return await make_request(f"{EDA_URL}/activations/{activation_id}/disable/", method="POST") @mcp.tool() async def enable_activation(activation_id: int) -> Any: """Enable an activation.""" return await make_request(f"{EDA_URL}/activations/{activation_id}/enable/", method="POST") @mcp.tool() async def restart_activation(activation_id: int) -> Any: """Restart an activation.""" return await make_request(f"{EDA_URL}/activations/{activation_id}/restart/", method="POST") @mcp.tool() async def delete_activation(activation_id: int) -> Any: """Delete an activation.""" return await make_request(f"{EDA_URL}/activations/{activation_id}/", method="DELETE") @mcp.tool() async def list_decision_environments() -> Any: """List all decision environments.""" return await make_request(f"{EDA_URL}/decision-environments/") @mcp.tool() async def create_decision_environment(payload: Dict) -> Any: """Create a new decision environment.""" return await make_request(f"{EDA_URL}/decision-environments/", method="POST", json=payload) @mcp.tool() async def list_rulebooks() -> Any: """List all rulebooks in EDA.""" return await make_request(f"{EDA_URL}/rulebooks/") @mcp.tool() async def get_rulebook(rulebook_id: int) -> Any: """Retrieve details of a specific rulebook.""" return await make_request(f"{EDA_URL}/rulebooks/{rulebook_id}/") @mcp.tool() async def list_event_streams() -> Any: """List all event streams.""" return await make_request(f"{EDA_URL}/event-streams/") if __name__ == "__main__": mcp.run(transport="stdio") ``` ``` -------------------------------------------------------------------------------- /claude_desktop_config.json: -------------------------------------------------------------------------------- ```json { "mcpServers": { "ansible": { "command": "/absolute/path/to/uv", "args": [ "--directory", "/absolute/path/to/ansible_mcp", "run", "ansible.py" ], "env": { "AAP_TOKEN": "<aap-token>", "AAP_URL": "https://<my-automation-controller>/api/controller/v2" } }, "kubernetes": { "command": "jbang", "args": [ "--quiet", "https://github.com/quarkiverse/quarkus-mcp-servers/blob/main/kubernetes/src/main/java/io/quarkiverse/mcp/servers/kubernetes/MCPServerKubernetes.java" ] } } } ``` -------------------------------------------------------------------------------- /mcp_server/deployment.yaml: -------------------------------------------------------------------------------- ```yaml kind: Deployment apiVersion: apps/v1 metadata: name: ansible-mcp-server spec: selector: matchLabels: app: ansible-mcp-server replicas: 1 template: metadata: labels: app: ansible-mcp-server spec: containers: - name: ansible-mcp-server image: quay.io/rcook/ansible-mcp:amd ports: - containerPort: 8000 protocol: TCP - containerPort: 8080 protocol: TCP env: - name: AAP_TOKEN valueFrom: secretKeyRef: name: aap key: token - name: AAP_URL valueFrom: secretKeyRef: name: aap key: url resources: {} ``` -------------------------------------------------------------------------------- /eda.py: -------------------------------------------------------------------------------- ```python import os import httpx from mcp.server.fastmcp import FastMCP from typing import Any, Dict # Environment variables for authentication EDA_URL = os.getenv("EDA_URL") EDA_TOKEN = os.getenv("EDA_TOKEN") if not EDA_TOKEN: raise ValueError("EDA_TOKEN environment variable is required") # Headers for API authentication HEADERS = { "Authorization": f"Bearer {EDA_TOKEN}", "Content-Type": "application/json" } # Initialize FastMCP mcp = FastMCP("eda") async def make_request(url: str, method: str = "GET", json: Dict = None) -> Any: """Helper function to make authenticated API requests to EDA.""" async with httpx.AsyncClient() as client: response = await client.request(method, url, headers=HEADERS, json=json) if response.status_code not in [200, 201, 204]: return f"Error {response.status_code}: {response.text}" return response.json() if "application/json" in response.headers.get("Content-Type", "") else response.text @mcp.tool() async def list_activations() -> Any: """List all activations in Event-Driven Ansible.""" return await make_request(f"{EDA_URL}/activations/") @mcp.tool() async def get_activation(activation_id: int) -> Any: """Get details of a specific activation.""" return await make_request(f"{EDA_URL}/activations/{activation_id}/") @mcp.tool() async def create_activation(payload: Dict) -> Any: """Create a new activation.""" return await make_request(f"{EDA_URL}/activations/", method="POST", json=payload) @mcp.tool() async def disable_activation(activation_id: int) -> Any: """Disable an activation.""" return await make_request(f"{EDA_URL}/activations/{activation_id}/disable/", method="POST") @mcp.tool() async def enable_activation(activation_id: int) -> Any: """Enable an activation.""" return await make_request(f"{EDA_URL}/activations/{activation_id}/enable/", method="POST") @mcp.tool() async def restart_activation(activation_id: int) -> Any: """Restart an activation.""" return await make_request(f"{EDA_URL}/activations/{activation_id}/restart/", method="POST") @mcp.tool() async def delete_activation(activation_id: int) -> Any: """Delete an activation.""" return await make_request(f"{EDA_URL}/activations/{activation_id}/", method="DELETE") @mcp.tool() async def list_decision_environments() -> Any: """List all decision environments.""" return await make_request(f"{EDA_URL}/decision-environments/") @mcp.tool() async def create_decision_environment(payload: Dict) -> Any: """Create a new decision environment.""" return await make_request(f"{EDA_URL}/decision-environments/", method="POST", json=payload) @mcp.tool() async def list_rulebooks() -> Any: """List all rulebooks in EDA.""" return await make_request(f"{EDA_URL}/rulebooks/") @mcp.tool() async def get_rulebook(rulebook_id: int) -> Any: """Retrieve details of a specific rulebook.""" return await make_request(f"{EDA_URL}/rulebooks/{rulebook_id}/") @mcp.tool() async def list_event_streams() -> Any: """List all event streams.""" return await make_request(f"{EDA_URL}/event-streams/") if __name__ == "__main__": mcp.run(transport="stdio") ``` -------------------------------------------------------------------------------- /ansible.py: -------------------------------------------------------------------------------- ```python import os import httpx from mcp.server.fastmcp import FastMCP from typing import Any # Environment variables for authentication AAP_URL = os.getenv("AAP_URL") AAP_TOKEN = os.getenv("AAP_TOKEN") if not AAP_TOKEN: raise ValueError("AAP_TOKEN is required") # Headers for API authentication HEADERS = { "Authorization": f"Bearer {AAP_TOKEN}", "Content-Type": "application/json" } # Initialize FastMCP mcp = FastMCP("ansible") async def make_request(url: str, method: str = "GET", json: dict = None) -> Any: """Helper function to make authenticated API requests to AAP.""" async with httpx.AsyncClient() as client: response = await client.request(method, url, headers=HEADERS, json=json) if response.status_code not in [200, 201]: return f"Error {response.status_code}: {response.text}" return response.json() if "application/json" in response.headers.get("Content-Type", "") else response.text @mcp.tool() async def list_inventories() -> Any: """List all inventories in Ansible Automation Platform.""" return await make_request(f"{AAP_URL}/inventories/") @mcp.tool() async def get_inventory(inventory_id: str) -> Any: """Get details of a specific inventory by ID.""" return await make_request(f"{AAP_URL}/inventories/{inventory_id}/") @mcp.tool() async def run_job(template_id: int, extra_vars: dict = {}) -> Any: """Run a job template by ID, optionally with extra_vars.""" return await make_request(f"{AAP_URL}/job_templates/{template_id}/launch/", method="POST", json={"extra_vars": extra_vars}) @mcp.tool() async def job_status(job_id: int) -> Any: """Check the status of a job by ID.""" return await make_request(f"{AAP_URL}/jobs/{job_id}/") @mcp.tool() async def job_logs(job_id: int) -> str: """Retrieve logs for a job.""" return await make_request(f"{AAP_URL}/jobs/{job_id}/stdout/") @mcp.tool() async def create_project( name: str, organization_id: int, source_control_url: str, source_control_type: str = "git", description: str = "", execution_environment_id: int = None, content_signature_validation_credential_id: int = None, source_control_branch: str = "", source_control_refspec: str = "", source_control_credential_id: int = None, clean: bool = False, update_revision_on_launch: bool = False, delete: bool = False, allow_branch_override: bool = False, track_submodules: bool = False, ) -> Any: """Create a new project in Ansible Automation Platform.""" payload = { "name": name, "description": description, "organization": organization_id, "scm_type": source_control_type.lower(), # Git is default "scm_url": source_control_url, "scm_branch": source_control_branch, "scm_refspec": source_control_refspec, "scm_clean": clean, "scm_delete_on_update": delete, "scm_update_on_launch": update_revision_on_launch, "allow_override": allow_branch_override, "scm_track_submodules": track_submodules, } if execution_environment_id: payload["execution_environment"] = execution_environment_id if content_signature_validation_credential_id: payload["signature_validation_credential"] = content_signature_validation_credential_id if source_control_credential_id: payload["credential"] = source_control_credential_id return await make_request(f"{AAP_URL}/projects/", method="POST", json=payload) @mcp.tool() async def create_job_template( name: str, project_id: int, playbook: str, inventory_id: int, job_type: str = "run", description: str = "", credential_id: int = None, execution_environment_id: int = None, labels: list[str] = None, forks: int = 0, limit: str = "", verbosity: int = 0, timeout: int = 0, job_tags: list[str] = None, skip_tags: list[str] = None, extra_vars: dict = None, privilege_escalation: bool = False, concurrent_jobs: bool = False, provisioning_callback: bool = False, enable_webhook: bool = False, prevent_instance_group_fallback: bool = False, ) -> Any: """Create a new job template in Ansible Automation Platform.""" payload = { "name": name, "description": description, "job_type": job_type, "project": project_id, "playbook": playbook, "inventory": inventory_id, "forks": forks, "limit": limit, "verbosity": verbosity, "timeout": timeout, "ask_variables_on_launch": bool(extra_vars), "ask_tags_on_launch": bool(job_tags), "ask_skip_tags_on_launch": bool(skip_tags), "ask_credential_on_launch": credential_id is None, "ask_execution_environment_on_launch": execution_environment_id is None, "ask_labels_on_launch": labels is None, "ask_inventory_on_launch": False, # Inventory is required, so not prompting "ask_job_type_on_launch": False, # Job type is required, so not prompting "become_enabled": privilege_escalation, "allow_simultaneous": concurrent_jobs, "scm_branch": "", "webhook_service": "github" if enable_webhook else "", "prevent_instance_group_fallback": prevent_instance_group_fallback, } if credential_id: payload["credential"] = credential_id if execution_environment_id: payload["execution_environment"] = execution_environment_id if labels: payload["labels"] = labels if job_tags: payload["job_tags"] = job_tags if skip_tags: payload["skip_tags"] = skip_tags if extra_vars: payload["extra_vars"] = extra_vars return await make_request(f"{AAP_URL}/job_templates/", method="POST", json=payload) @mcp.tool() async def list_inventory_sources() -> Any: """List all inventory sources in Ansible Automation Platform.""" return await make_request(f"{AAP_URL}/inventory_sources/") @mcp.tool() async def get_inventory_source(inventory_source_id: int) -> Any: """Get details of a specific inventory source.""" return await make_request(f"{AAP_URL}/inventory_sources/{inventory_source_id}/") @mcp.tool() async def create_inventory_source( name: str, inventory_id: int, source: str, credential_id: int, source_vars: dict = None, update_on_launch: bool = True, timeout: int = 0, ) -> Any: """Create a dynamic inventory source. Claude will ask for the source type and credential before proceeding.""" valid_sources = [ "file", "constructed", "scm", "ec2", "gce", "azure_rm", "vmware", "satellite6", "openstack", "rhv", "controller", "insights", "terraform", "openshift_virtualization" ] if source not in valid_sources: return f"Error: Invalid source type '{source}'. Please select from: {', '.join(valid_sources)}" if not credential_id: return "Error: Credential is required to create an inventory source." payload = { "name": name, "inventory": inventory_id, "source": source, "credential": credential_id, "source_vars": source_vars, "update_on_launch": update_on_launch, "timeout": timeout, } return await make_request(f"{AAP_URL}/inventory_sources/", method="POST", json=payload) @mcp.tool() async def update_inventory_source(inventory_source_id: int, update_data: dict) -> Any: """Update an existing inventory source.""" return await make_request(f"{AAP_URL}/inventory_sources/{inventory_source_id}/", method="PATCH", json=update_data) @mcp.tool() async def delete_inventory_source(inventory_source_id: int) -> Any: """Delete an inventory source.""" return await make_request(f"{AAP_URL}/inventory_sources/{inventory_source_id}/", method="DELETE") @mcp.tool() async def sync_inventory_source(inventory_source_id: int) -> Any: """Manually trigger a sync for an inventory source.""" return await make_request(f"{AAP_URL}/inventory_sources/{inventory_source_id}/update/", method="POST") @mcp.tool() async def create_inventory( name: str, organization_id: int, description: str = "", kind: str = "", host_filter: str = "", variables: dict = None, prevent_instance_group_fallback: bool = False, ) -> Any: """Create an inventory in Ansible Automation Platform.""" payload = { "name": name, "organization": organization_id, "description": description, "kind": kind, "host_filter": host_filter, "variables": variables, "prevent_instance_group_fallback": prevent_instance_group_fallback, } return await make_request(f"{AAP_URL}/inventories/", method="POST", json=payload) @mcp.tool() async def delete_inventory(inventory_id: int) -> Any: """Delete an inventory from Ansible Automation Platform.""" return await make_request(f"{AAP_URL}/inventories/{inventory_id}/", method="DELETE") @mcp.tool() async def list_job_templates() -> Any: """List all job templates available in Ansible Automation Platform.""" return await make_request(f"{AAP_URL}/job_templates/") @mcp.tool() async def get_job_template(template_id: int) -> Any: """Retrieve details of a specific job template.""" return await make_request(f"{AAP_URL}/job_templates/{template_id}/") @mcp.tool() async def list_jobs() -> Any: """List all jobs available in Ansible Automation Platform.""" return await make_request(f"{AAP_URL}/jobs/") @mcp.tool() async def list_recent_jobs(hours: int = 24) -> Any: """List all jobs executed in the last specified hours (default 24 hours).""" from datetime import datetime, timedelta time_filter = (datetime.utcnow() - timedelta(hours=hours)).isoformat() + "Z" return await make_request(f"{AAP_URL}/jobs/?created__gte={time_filter}") if __name__ == "__main__": mcp.run(transport="stdio") ``` -------------------------------------------------------------------------------- /mcp_server/ansible.py: -------------------------------------------------------------------------------- ```python import os import httpx from mcp.server.fastmcp import FastMCP from typing import Any # Environment variables for authentication AAP_URL = os.getenv("AAP_URL") AAP_TOKEN = os.getenv("AAP_TOKEN") if not AAP_TOKEN: raise ValueError("AAP_TOKEN is required") # Headers for API authentication HEADERS = { "Authorization": f"Bearer {AAP_TOKEN}", "Content-Type": "application/json" } # Initialize FastMCP mcp = FastMCP("ansible") async def make_request(url: str, method: str = "GET", json: dict = None) -> Any: """Helper function to make authenticated API requests to AAP.""" async with httpx.AsyncClient() as client: response = await client.request(method, url, headers=HEADERS, json=json) if response.status_code not in [200, 201]: return f"Error {response.status_code}: {response.text}" return response.json() if "application/json" in response.headers.get("Content-Type", "") else response.text @mcp.tool() async def list_inventories() -> Any: """List all inventories in Ansible Automation Platform.""" return await make_request(f"{AAP_URL}/inventories/") @mcp.tool() async def get_inventory(inventory_id: str) -> Any: """Get details of a specific inventory by ID.""" return await make_request(f"{AAP_URL}/inventories/{inventory_id}/") @mcp.tool() async def run_job(template_id: int, extra_vars: dict = {}) -> Any: """Run a job template by ID, optionally with extra_vars.""" return await make_request(f"{AAP_URL}/job_templates/{template_id}/launch/", method="POST", json={"extra_vars": extra_vars}) @mcp.tool() async def job_status(job_id: int) -> Any: """Check the status of a job by ID.""" return await make_request(f"{AAP_URL}/jobs/{job_id}/") @mcp.tool() async def job_logs(job_id: int) -> str: """Retrieve logs for a job.""" return await make_request(f"{AAP_URL}/jobs/{job_id}/stdout/") @mcp.tool() async def create_project( name: str, organization_id: int, source_control_url: str, source_control_type: str = "git", description: str = "", execution_environment_id: int = None, content_signature_validation_credential_id: int = None, source_control_branch: str = "", source_control_refspec: str = "", source_control_credential_id: int = None, clean: bool = False, update_revision_on_launch: bool = False, delete: bool = False, allow_branch_override: bool = False, track_submodules: bool = False, ) -> Any: """Create a new project in Ansible Automation Platform.""" payload = { "name": name, "description": description, "organization": organization_id, "scm_type": source_control_type.lower(), # Git is default "scm_url": source_control_url, "scm_branch": source_control_branch, "scm_refspec": source_control_refspec, "scm_clean": clean, "scm_delete_on_update": delete, "scm_update_on_launch": update_revision_on_launch, "allow_override": allow_branch_override, "scm_track_submodules": track_submodules, } if execution_environment_id: payload["execution_environment"] = execution_environment_id if content_signature_validation_credential_id: payload["signature_validation_credential"] = content_signature_validation_credential_id if source_control_credential_id: payload["credential"] = source_control_credential_id return await make_request(f"{AAP_URL}/projects/", method="POST", json=payload) @mcp.tool() async def create_job_template( name: str, project_id: int, playbook: str, inventory_id: int, job_type: str = "run", description: str = "", credential_id: int = None, execution_environment_id: int = None, labels: list[str] = None, forks: int = 0, limit: str = "", verbosity: int = 0, timeout: int = 0, job_tags: list[str] = None, skip_tags: list[str] = None, extra_vars: dict = None, privilege_escalation: bool = False, concurrent_jobs: bool = False, provisioning_callback: bool = False, enable_webhook: bool = False, prevent_instance_group_fallback: bool = False, ) -> Any: """Create a new job template in Ansible Automation Platform.""" payload = { "name": name, "description": description, "job_type": job_type, "project": project_id, "playbook": playbook, "inventory": inventory_id, "forks": forks, "limit": limit, "verbosity": verbosity, "timeout": timeout, "ask_variables_on_launch": bool(extra_vars), "ask_tags_on_launch": bool(job_tags), "ask_skip_tags_on_launch": bool(skip_tags), "ask_credential_on_launch": credential_id is None, "ask_execution_environment_on_launch": execution_environment_id is None, "ask_labels_on_launch": labels is None, "ask_inventory_on_launch": False, # Inventory is required, so not prompting "ask_job_type_on_launch": False, # Job type is required, so not prompting "become_enabled": privilege_escalation, "allow_simultaneous": concurrent_jobs, "scm_branch": "", "webhook_service": "github" if enable_webhook else "", "prevent_instance_group_fallback": prevent_instance_group_fallback, } if credential_id: payload["credential"] = credential_id if execution_environment_id: payload["execution_environment"] = execution_environment_id if labels: payload["labels"] = labels if job_tags: payload["job_tags"] = job_tags if skip_tags: payload["skip_tags"] = skip_tags if extra_vars: payload["extra_vars"] = extra_vars return await make_request(f"{AAP_URL}/job_templates/", method="POST", json=payload) @mcp.tool() async def list_inventory_sources() -> Any: """List all inventory sources in Ansible Automation Platform.""" return await make_request(f"{AAP_URL}/inventory_sources/") @mcp.tool() async def get_inventory_source(inventory_source_id: int) -> Any: """Get details of a specific inventory source.""" return await make_request(f"{AAP_URL}/inventory_sources/{inventory_source_id}/") @mcp.tool() async def create_inventory_source( name: str, inventory_id: int, source: str, credential_id: int, source_vars: dict = None, update_on_launch: bool = True, timeout: int = 0, ) -> Any: """Create a dynamic inventory source. Claude will ask for the source type and credential before proceeding.""" valid_sources = [ "file", "constructed", "scm", "ec2", "gce", "azure_rm", "vmware", "satellite6", "openstack", "rhv", "controller", "insights", "terraform", "openshift_virtualization" ] if source not in valid_sources: return f"Error: Invalid source type '{source}'. Please select from: {', '.join(valid_sources)}" if not credential_id: return "Error: Credential is required to create an inventory source." payload = { "name": name, "inventory": inventory_id, "source": source, "credential": credential_id, "source_vars": source_vars, "update_on_launch": update_on_launch, "timeout": timeout, } return await make_request(f"{AAP_URL}/inventory_sources/", method="POST", json=payload) @mcp.tool() async def update_inventory_source(inventory_source_id: int, update_data: dict) -> Any: """Update an existing inventory source.""" return await make_request(f"{AAP_URL}/inventory_sources/{inventory_source_id}/", method="PATCH", json=update_data) @mcp.tool() async def delete_inventory_source(inventory_source_id: int) -> Any: """Delete an inventory source.""" return await make_request(f"{AAP_URL}/inventory_sources/{inventory_source_id}/", method="DELETE") @mcp.tool() async def sync_inventory_source(inventory_source_id: int) -> Any: """Manually trigger a sync for an inventory source.""" return await make_request(f"{AAP_URL}/inventory_sources/{inventory_source_id}/update/", method="POST") @mcp.tool() async def create_inventory( name: str, organization_id: int, description: str = "", kind: str = "", host_filter: str = "", variables: dict = None, prevent_instance_group_fallback: bool = False, ) -> Any: """Create an inventory in Ansible Automation Platform.""" payload = { "name": name, "organization": organization_id, "description": description, "kind": kind, "host_filter": host_filter, "variables": variables, "prevent_instance_group_fallback": prevent_instance_group_fallback, } return await make_request(f"{AAP_URL}/inventories/", method="POST", json=payload) @mcp.tool() async def delete_inventory(inventory_id: int) -> Any: """Delete an inventory from Ansible Automation Platform.""" return await make_request(f"{AAP_URL}/inventories/{inventory_id}/", method="DELETE") @mcp.tool() async def list_job_templates() -> Any: """List all job templates available in Ansible Automation Platform.""" return await make_request(f"{AAP_URL}/job_templates/") @mcp.tool() async def get_job_template(template_id: int) -> Any: """Retrieve details of a specific job template.""" return await make_request(f"{AAP_URL}/job_templates/{template_id}/") @mcp.tool() async def list_jobs() -> Any: """List all jobs available in Ansible Automation Platform.""" return await make_request(f"{AAP_URL}/jobs/") @mcp.tool() async def list_recent_jobs(hours: int = 24) -> Any: """List all jobs executed in the last specified hours (default 24 hours).""" from datetime import datetime, timedelta time_filter = (datetime.utcnow() - timedelta(hours=hours)).isoformat() + "Z" return await make_request(f"{AAP_URL}/jobs/?created__gte={time_filter}") if __name__ == "__main__": # Change from stdio to sse transport mcp.run(transport="sse") ``` -------------------------------------------------------------------------------- /app.py: -------------------------------------------------------------------------------- ```python import streamlit as st from llama_stack_client import LlamaStackClient from llama_stack_client.lib.agents.agent import Agent from llama_stack_client.types.agent_create_params import AgentConfig import os import json from datetime import datetime # Initialize LlamaStack client base_url = os.getenv("BASE_URL", "http://localhost:8321") client = LlamaStackClient(base_url=base_url) # Page configuration st.set_page_config( page_title="Llama-stack Chat", page_icon="🦙", layout="wide", ) # Styling for code blocks st.markdown(""" <style> code { background-color: #f0f2f6; border-radius: 3px; padding: 0.2em 0.4em; } pre { background-color: #f0f2f6; border-radius: 5px; padding: 0.5em; } .chat-message { padding: 1.5rem; border-radius: 0.5rem; margin-bottom: 1rem; } .blinking-cursor { animation: blink 1s step-end infinite; } @keyframes blink { 50% { opacity: 0; } } </style> """, unsafe_allow_html=True) # Get providers with valid credentials (simplified approach) @st.cache_data(ttl=300) def get_configured_providers(): try: # This is a simplified approach that assumes Anthropic is configured # In a real implementation, you would check LLama Stack's configuration return ["anthropic"] except Exception as e: st.error(f"Error checking configured providers: {e}") return ["anthropic"] # Fallback to Anthropic # Get available models from configured providers @st.cache_data(ttl=300) def get_available_models(): try: models = client.models.list() configured_providers = get_configured_providers() # Filter for LLM models from configured providers available_models = [ model.identifier for model in models if model.model_type == "llm" and model.provider_id in configured_providers ] return available_models except Exception as e: st.error(f"Error fetching models: {e}") return ["anthropic/claude-3-7-sonnet-latest"] # Fallback default # Get all available toolgroups @st.cache_data(ttl=300) def get_all_toolgroups(): try: toolgroups = client.toolgroups.list() return [toolgroup.identifier for toolgroup in toolgroups] except Exception as e: st.error(f"Error fetching toolgroups: {e}") return ["mcp::ansible"] # Fallback default # Initialize session state if "messages" not in st.session_state: st.session_state.messages = [] if "system_instruction" not in st.session_state: st.session_state.system_instruction = "You are a helpful assistant that can access various tools to help the user." if "show_save_dialog" not in st.session_state: st.session_state.show_save_dialog = False if "saved_chats" not in st.session_state: st.session_state.saved_chats = "{}" if "chat_updated" not in st.session_state: st.session_state.chat_updated = False # Function to check if there are messages def has_messages(): return len(st.session_state.messages) > 0 # Streamlit UI st.title("Llama-stack Chat") st.markdown("Chat with LLama Stack and its toolgroups") # Sidebar configurations with st.sidebar: st.header("Configuration") # Model selection available_models = get_available_models() selected_model = st.selectbox("Select Model", available_models) # Get all toolgroups automatically and display as collapsible all_toolgroups = get_all_toolgroups() with st.expander(f"{len(all_toolgroups)} Toolgroups Loaded"): for toolgroup in all_toolgroups: st.caption(f"• {toolgroup}") # System instructions with st.expander("System Instructions", expanded=False): new_instruction = st.text_area( "Customize how the assistant behaves:", st.session_state.system_instruction, height=100 ) if new_instruction != st.session_state.system_instruction: st.session_state.system_instruction = new_instruction st.toast("System instructions updated") # Collapsible Query Context section with st.expander("Query Context", expanded=False): query_context = st.text_area("Add background information for this query:", "", height=150) st.caption("This information will be included with each of your queries but won't be visible in the chat.") # Temperature in collapsible with st.expander("Temperature", expanded=False): temperature = st.slider("Temperature", min_value=0.0, max_value=1.0, value=0.7, step=0.1) top_p = st.slider("Top P", min_value=0.0, max_value=1.0, value=0.9, step=0.1) # Chat history management st.header("Chat Management") # Using two separate buttons instead of columns for better visibility clear_col, save_col = st.columns(2) # Clear chat button with clear_col: if st.button("🗑️ Clear Chat", key="clear_chat"): st.session_state.messages = [] st.session_state.chat_updated = True st.rerun() # Save button - disabled when no messages with save_col: if st.button("💾 Save Chat", key="save_chat", disabled=not has_messages()): st.session_state.show_save_dialog = True # Save dialog - shown when save button is clicked if st.session_state.show_save_dialog: st.text_input("Conversation name:", key="save_name") save_confirm, cancel = st.columns(2) with save_confirm: if st.button("Confirm Save", key="confirm_save"): if st.session_state.save_name: saved_chats = json.loads(st.session_state.saved_chats) saved_chats[st.session_state.save_name] = st.session_state.messages st.session_state.saved_chats = json.dumps(saved_chats) st.session_state.show_save_dialog = False st.toast(f"Saved conversation: {st.session_state.save_name}") st.rerun() else: st.warning("Please enter a name for the conversation") with cancel: if st.button("Cancel", key="cancel_save"): st.session_state.show_save_dialog = False st.rerun() # Export chat button - disabled when no messages if st.button("📥 Export Chat", key="export_chat", disabled=not has_messages()): chat_export = "" for msg in st.session_state.messages: prefix = "🧑" if msg["role"] == "user" else "🤖" chat_export += f"{prefix} **{msg['role'].capitalize()}**: {msg['content']}\n\n" # Create download link timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"chat_export_{timestamp}.md" st.download_button( label="Download Chat", data=chat_export, file_name=filename, mime="text/markdown", key="download_chat" ) # Load saved conversations st.header("Saved Conversations") # Load saved conversation saved_chats = json.loads(st.session_state.saved_chats) if saved_chats: chat_names = list(saved_chats.keys()) selected_chat = st.selectbox("Select a saved conversation:", [""] + chat_names) if selected_chat and st.button("📂 Load Conversation"): st.session_state.messages = saved_chats[selected_chat] st.session_state.chat_updated = True st.toast(f"Loaded conversation: {selected_chat}") st.rerun() else: st.caption("No saved conversations yet") # Display chat history for message in st.session_state.messages: with st.chat_message(message["role"]): # Apply syntax highlighting for code blocks content = message["content"] st.markdown(content) # Input for new messages prompt = st.chat_input("Ask something...") if prompt: full_response = "" agent_config = AgentConfig( model=selected_model, instructions=st.session_state.system_instruction, sampling_params={ "strategy": {"type": "top_p", "temperature": temperature, "top_p": top_p}, }, toolgroups=all_toolgroups, # Use all toolgroups automatically tool_choice="auto", input_shields=[], output_shields=[], enable_session_persistence=True, ) try: agent = Agent(client, agent_config) session_id = agent.create_session("chat-session") # Add user input to chat history st.session_state.messages.append({"role": "user", "content": prompt}) with st.chat_message("user"): st.markdown(prompt) # Get response from LlamaStack API with st.chat_message("assistant"): message_placeholder = st.empty() # Add query context if provided user_message = prompt if query_context: user_message += f"\n\nContext: {query_context}" response = agent.create_turn( messages=[{"role": "user", "content": user_message}], session_id=session_id, ) for chunk in response: if hasattr(chunk, 'event') and hasattr(chunk.event, 'payload'): payload = chunk.event.payload if hasattr(payload, 'event_type') and payload.event_type == "step_progress": if hasattr(payload, 'delta') and hasattr(payload.delta, 'type') and payload.delta.type == "text": full_response += payload.delta.text message_placeholder.markdown(full_response + "▌") message_placeholder.markdown(full_response) st.session_state.messages.append({"role": "assistant", "content": full_response}) # Set flag to indicate chat has been updated st.session_state.chat_updated = True # Force a rerun to update the UI state (including button states) st.rerun() except Exception as e: st.error(f"Error: {str(e)}") # Reset the chat_updated flag after processing if st.session_state.chat_updated: st.session_state.chat_updated = False ``` -------------------------------------------------------------------------------- /redhat_insights_mcp.py: -------------------------------------------------------------------------------- ```python """ Red Hat Insights MCP Server This server requires a Red Hat service account with client credentials. Setup: 1. Create a service account in Red Hat Console (console.redhat.com) 2. Assign appropriate permissions via User Access → Groups 3. Set environment variables: export INSIGHTS_CLIENT_ID="your-client-id" export INSIGHTS_CLIENT_SECRET="your-client-secret" Optional: export INSIGHTS_BASE_URL="https://console.redhat.com/api" export SSO_URL="https://sso.redhat.com/auth/realms/redhat-external/protocol/openid-connect/token" """ import os import httpx from mcp.server.fastmcp import FastMCP from typing import Any, Optional from datetime import datetime, timedelta # Environment variables for authentication INSIGHTS_BASE_URL = os.getenv("INSIGHTS_BASE_URL", "https://console.redhat.com/api") INSIGHTS_CLIENT_ID = os.getenv("INSIGHTS_CLIENT_ID") INSIGHTS_CLIENT_SECRET = os.getenv("INSIGHTS_CLIENT_SECRET") SSO_URL = os.getenv("SSO_URL", "https://sso.redhat.com/auth/realms/redhat-external/protocol/openid-connect/token") if not INSIGHTS_CLIENT_ID or not INSIGHTS_CLIENT_SECRET: raise ValueError("INSIGHTS_CLIENT_ID and INSIGHTS_CLIENT_SECRET are required") # Global variable to store the access token _access_token = None _token_expires_at = None # Initialize FastMCP mcp = FastMCP("insights") async def get_access_token() -> str: """Get or refresh the access token using client credentials.""" global _access_token, _token_expires_at # Check if we have a valid token if _access_token and _token_expires_at and datetime.utcnow() < _token_expires_at: return _access_token # Request new token async with httpx.AsyncClient() as client: response = await client.post( SSO_URL, headers={"Content-Type": "application/x-www-form-urlencoded"}, data={ "grant_type": "client_credentials", "scope": "api.console", "client_id": INSIGHTS_CLIENT_ID, "client_secret": INSIGHTS_CLIENT_SECRET } ) if response.status_code != 200: raise Exception(f"Failed to get access token: {response.status_code} {response.text}") token_data = response.json() _access_token = token_data["access_token"] # Set expiration time with some buffer (subtract 60 seconds) expires_in = token_data.get("expires_in", 300) # Default to 5 minutes _token_expires_at = datetime.utcnow() + timedelta(seconds=expires_in - 60) return _access_token async def make_request(url: str, method: str = "GET", json: dict = None, params: dict = None) -> Any: """Helper function to make authenticated API requests to Red Hat Insights.""" token = await get_access_token() headers = { "Authorization": f"Bearer {token}", "Content-Type": "application/json" } async with httpx.AsyncClient() as client: response = await client.request(method, url, headers=headers, json=json, params=params) if response.status_code not in [200, 201, 204]: return f"Error {response.status_code}: {response.text}" return response.json() if "application/json" in response.headers.get("Content-Type", "") else response.text # Authentication Test @mcp.tool() async def test_authentication() -> Any: """Test authentication with Red Hat Insights using service account credentials.""" try: token = await get_access_token() # Test with a simple API call result = await make_request(f"{INSIGHTS_BASE_URL}/inventory/v1/hosts?limit=1") return {"status": "success", "message": "Authentication successful", "sample_data": result} except Exception as e: return {"status": "error", "message": f"Authentication failed: {str(e)}"} # Host Inventory Management Tools @mcp.tool() async def list_systems(limit: int = 50, offset: int = 0, display_name: str = None, staleness: str = None) -> Any: """List all hosts/systems registered with Red Hat Insights. Use staleness='fresh' or 'stale' to filter.""" params = {"limit": limit, "offset": offset} if display_name: params["display_name"] = display_name if staleness: params["staleness"] = staleness return await make_request(f"{INSIGHTS_BASE_URL}/inventory/v1/hosts", params=params) @mcp.tool() async def get_system(system_id: str) -> Any: """Get details of a specific system by UUID.""" return await make_request(f"{INSIGHTS_BASE_URL}/inventory/v1/hosts/{system_id}") @mcp.tool() async def get_system_profile(system_id: str, fields: list[str] = None) -> Any: """Get system profile/facts for a specific system. Specify fields to limit response.""" url = f"{INSIGHTS_BASE_URL}/inventory/v1/hosts/{system_id}/system_profile" params = {} if fields: for field in fields: params[f"fields[system_profile]"] = field return await make_request(url, params=params) @mcp.tool() async def get_system_tags(system_id: str) -> Any: """Get tags for a specific system.""" return await make_request(f"{INSIGHTS_BASE_URL}/inventory/v1/hosts/{system_id}/tags") @mcp.tool() async def delete_system(system_id: str) -> Any: """Remove a system from Red Hat Insights inventory.""" return await make_request(f"{INSIGHTS_BASE_URL}/inventory/v1/hosts/{system_id}", method="DELETE") # Vulnerability Management Tools @mcp.tool() async def list_vulnerabilities( limit: int = 50, offset: int = 0, affecting: bool = True, cvss_score_gte: float = None, cvss_score_lte: float = None ) -> Any: """List vulnerabilities affecting your systems. Set affecting=True to only show CVEs affecting systems.""" params = {"limit": limit, "offset": offset} if affecting: params["affecting"] = "true" if cvss_score_gte: params["cvss_score_gte"] = cvss_score_gte if cvss_score_lte: params["cvss_score_lte"] = cvss_score_lte return await make_request(f"{INSIGHTS_BASE_URL}/vulnerability/v1/vulnerabilities/cves", params=params) @mcp.tool() async def get_vulnerability_executive_report() -> Any: """Get executive vulnerability report with CVE summaries by severity.""" return await make_request(f"{INSIGHTS_BASE_URL}/vulnerability/v1/report/executive") # Patch Management Tools @mcp.tool() async def list_advisories( limit: int = 50, offset: int = 0, advisory_type: str = None, severity: str = None ) -> Any: """List available advisories (patches). Export format from patch/v3.""" params = {"limit": limit, "offset": offset} if advisory_type: params["advisory_type"] = advisory_type if severity: params["severity"] = severity return await make_request(f"{INSIGHTS_BASE_URL}/patch/v3/export/advisories", params=params) # Compliance Tools @mcp.tool() async def list_compliance_policies(limit: int = 50, offset: int = 0) -> Any: """List SCAP compliance policies.""" params = {"limit": limit, "offset": offset} return await make_request(f"{INSIGHTS_BASE_URL}/compliance/v2/policies", params=params) @mcp.tool() async def list_compliance_systems(assigned_or_scanned: bool = True) -> Any: """List systems associated with SCAP policies.""" params = {} if assigned_or_scanned: params["filter"] = "assigned_or_scanned=true" return await make_request(f"{INSIGHTS_BASE_URL}/compliance/v2/systems", params=params) @mcp.tool() async def associate_compliance_policy(policy_id: str, system_id: str) -> Any: """Associate a system with a SCAP compliance policy.""" return await make_request(f"{INSIGHTS_BASE_URL}/compliance/v2/policies/{policy_id}/systems/{system_id}", method="PATCH") @mcp.tool() async def list_compliance_reports(limit: int = 50, offset: int = 0) -> Any: """List all compliance reports.""" params = {"limit": limit, "offset": offset} return await make_request(f"{INSIGHTS_BASE_URL}/compliance/v2/reports", params=params) # Recommendations and Advisor Tools @mcp.tool() async def list_recommendations( category: str = None, impact: str = None, limit: int = 50, offset: int = 0 ) -> Any: """List available recommendation rules from Advisor.""" params = {"limit": limit, "offset": offset} if category: params["category"] = category if impact: params["impact"] = impact return await make_request(f"{INSIGHTS_BASE_URL}/insights/v1/rule", params=params) @mcp.tool() async def export_rule_hits(has_playbook: bool = None, format: str = "json") -> Any: """Export all rule hits (recommendations) for systems. Set has_playbook=True for Ansible playbooks.""" params = {} if has_playbook: params["has_playbook"] = "true" return await make_request(f"{INSIGHTS_BASE_URL}/insights/v1/export/hits", params=params) @mcp.tool() async def get_system_recommendations(system_id: str) -> Any: """Get recommendation summary for a specific system.""" return await make_request(f"{INSIGHTS_BASE_URL}/insights/v1/system/{system_id}") # Policy Management Tools @mcp.tool() async def list_policies(limit: int = 50, offset: int = 0) -> Any: """List all defined custom policies.""" params = {"limit": limit, "offset": offset} return await make_request(f"{INSIGHTS_BASE_URL}/policies/v1/policies", params=params) @mcp.tool() async def create_policy(name: str, description: str, conditions: str, actions: str = "notification", is_enabled: bool = True) -> Any: """Create a new custom policy. Example conditions: 'arch = \"x86_64\"'""" payload = { "name": name, "description": description, "conditions": conditions, "actions": actions, "isEnabled": is_enabled } return await make_request(f"{INSIGHTS_BASE_URL}/policies/v1/policies", method="POST", json=payload) @mcp.tool() async def get_policy_triggers(policy_id: str) -> Any: """Get systems that triggered a specific policy.""" return await make_request(f"{INSIGHTS_BASE_URL}/policies/v1/policies/{policy_id}/history/trigger") # Remediation Tools @mcp.tool() async def list_remediations(limit: int = 50, offset: int = 0) -> Any: """List all defined remediation plans.""" params = {"limit": limit, "offset": offset} return await make_request(f"{INSIGHTS_BASE_URL}/remediations/v1/remediations", params=params) @mcp.tool() async def create_remediation(name: str, issues: list[dict], auto_reboot: bool = False, archived: bool = False) -> Any: """Create a new remediation plan. Issues should be list of dicts with id, resolution, systems.""" payload = { "name": name, "auto_reboot": auto_reboot, "archived": archived, "add": { "issues": issues } } return await make_request(f"{INSIGHTS_BASE_URL}/remediations/v1/remediations", method="POST", json=payload) @mcp.tool() async def get_remediation_playbook(remediation_id: str) -> Any: """Get Ansible playbook for a remediation plan.""" return await make_request(f"{INSIGHTS_BASE_URL}/remediations/v1/remediations/{remediation_id}/playbook") @mcp.tool() async def execute_remediation(remediation_id: str) -> Any: """Execute a remediation plan.""" return await make_request(f"{INSIGHTS_BASE_URL}/remediations/v1/remediations/{remediation_id}/playbook_runs", method="POST") # Subscription Management @mcp.tool() async def list_rhel_subscriptions(product: str = "RHEL for x86", limit: int = 50, offset: int = 0) -> Any: """List systems with RHEL subscriptions. Product examples: 'RHEL for x86', 'RHEL for x86_64'""" from urllib.parse import quote encoded_product = quote(product) params = {"limit": limit, "offset": offset} return await make_request(f"{INSIGHTS_BASE_URL}/rhsm-subscriptions/v1/instances/products/{encoded_product}", params=params) # Export Tools @mcp.tool() async def create_export(name: str, format: str, application: str, resource: str) -> Any: """Create an export request. Common applications: 'urn:redhat:application:inventory', 'subscriptions'""" payload = { "name": name, "format": format, "sources": [{ "application": application, "resource": resource }] } return await make_request(f"{INSIGHTS_BASE_URL}/export/v1/exports", method="POST", json=payload) @mcp.tool() async def get_export_status(export_id: str) -> Any: """Get status of an export request.""" return await make_request(f"{INSIGHTS_BASE_URL}/export/v1/exports/{export_id}/status") @mcp.tool() async def download_export(export_id: str) -> Any: """Download completed export as ZIP file.""" return await make_request(f"{INSIGHTS_BASE_URL}/export/v1/exports/{export_id}") # Notifications and Integrations @mcp.tool() async def list_notification_events(start_date: str = None, end_date: str = None, limit: int = 20, offset: int = 0) -> Any: """Get notification event history. Dates in YYYY-MM-DD format.""" params = {"limit": limit, "offset": offset} if start_date: params["startDate"] = start_date if end_date: params["endDate"] = end_date return await make_request(f"{INSIGHTS_BASE_URL}/notifications/v1/notifications/events", params=params) @mcp.tool() async def list_integrations() -> Any: """List configured third-party integrations.""" return await make_request(f"{INSIGHTS_BASE_URL}/integrations/v1/endpoints") # Analytics and Statistics @mcp.tool() async def get_insights_overview() -> Any: """Get overview of systems and basic statistics by querying inventory.""" # Use inventory endpoint to get basic stats since there's no single stats endpoint result = await make_request(f"{INSIGHTS_BASE_URL}/inventory/v1/hosts?limit=1") return result # Content Sources and Templates @mcp.tool() async def list_repositories(limit: int = 50, offset: int = 0) -> Any: """List all existing content repositories.""" params = {"limit": limit, "offset": offset} return await make_request(f"{INSIGHTS_BASE_URL}/content-sources/v1.0/repositories", params=params) @mcp.tool() async def create_repository(name: str, url: str, distribution_arch: str = "x86_64", distribution_versions: list[str] = None) -> Any: """Create a new custom repository.""" payload = { "name": name, "url": url, "distribution_arch": distribution_arch, "distribution_versions": distribution_versions or ["9"], "metadata_verification": False, "module_hotfixes": False, "snapshot": False } return await make_request(f"{INSIGHTS_BASE_URL}/content-sources/v1.0/repositories", method="POST", json=payload) @mcp.tool() async def list_content_templates(limit: int = 50, offset: int = 0) -> Any: """List all content templates.""" params = {"limit": limit, "offset": offset} return await make_request(f"{INSIGHTS_BASE_URL}/content-sources/v1.0/templates", params=params) @mcp.tool() async def create_content_template(name: str, arch: str, version: str, repository_uuids: list[str], description: str = "") -> Any: """Create a new content template.""" payload = { "name": name, "arch": arch, "version": version, "description": description, "repository_uuids": repository_uuids, "use_latest": True } return await make_request(f"{INSIGHTS_BASE_URL}/content-sources/v1.0/templates", method="POST", json=payload) if __name__ == "__main__": mcp.run(transport="stdio") ```