# Directory Structure ``` ├── ansible.py ├── app.py ├── claude_desktop_config.json ├── eda.py ├── llama_quick_start_guide.md ├── mcp_server │ ├── ansible.py │ ├── Containerfile │ └── deployment.yaml ├── README.md └── redhat_insights_mcp.py ``` # Files -------------------------------------------------------------------------------- /README.md: -------------------------------------------------------------------------------- ```markdown 1 | # Setup of the Environment for the AI Powered Ansible & OpenShift Automation with Model Context Protocol (MCP) Servers 2 | 3 | ## Overview 4 | 5 | This guide will walk you through setting up the MCP Servers + Claude Desktop portions of the demo, which focuses on using Claude Desktop to interact with your Ansible Automation Platform and OpenShift Cluster environments. 6 | 7 | ## Prerequisites 8 | 9 | Ensure you have the following installed. 10 | 11 | ### Required 12 | - An Ansible Automation Platform (AAP) environment 13 | - An OpenShift Cluster with OpenShift Virtualization 14 | - [Claude Desktop](https://claude.ai/download) installed on your laptop (Pro Plan required for best results) 15 | - Python 3.10 or higher installed on your laptop 16 | - Ensure you are authenticated with your OpenShift cluster (e.g. by exporting your kubeconfig) 17 | 18 | ## Step One: Set up your laptop environment 19 | 20 | Install `uv` and set up your Python project and environment. 21 | 22 | ``` 23 | curl -LsSf https://astral.sh/uv/install.sh | sh 24 | ``` 25 | 26 | Install [jbang](https://www.jbang.dev/download/), which is used to run the Kubernetes MCP Server. (jbang needs to be installed globally; the Homebrew install is recommended. If you install it locally (the curl pattern), Claude won't be able to access it.) 27 | 28 | Restart your terminal to ensure that the `uv` and `jbang` commands are now available. 
29 | 30 | ## Step Two: Create and Set Up your Project 31 | 32 | ``` 33 | # Create a new directory for our project 34 | uv init ansible 35 | cd ansible 36 | 37 | # Create virtual environment and activate it 38 | uv venv 39 | source .venv/bin/activate 40 | 41 | # Install dependencies 42 | uv add "mcp[cli]" httpx 43 | 44 | # Create our server file 45 | touch ansible.py 46 | ``` 47 | 48 | ## Step 3: Building your Ansible Automation Controller MCP Server 49 | 50 | This is the MCP Server I used to interact with my automation controller. Feel free to copy/paste this into your `ansible.py` file. 51 | 52 | Note: to connect to an AAP instance that uses a self-signed SSL certificate, edit the async client to be `httpx.AsyncClient(verify=False)`. 53 | 54 | ``` 55 | import os 56 | import httpx 57 | from mcp.server.fastmcp import FastMCP 58 | from typing import Any 59 | 60 | # Environment variables for authentication 61 | AAP_URL = os.getenv("AAP_URL") 62 | AAP_TOKEN = os.getenv("AAP_TOKEN") 63 | 64 | if not AAP_TOKEN: 65 | raise ValueError("AAP_TOKEN is required") 66 | 67 | # Headers for API authentication 68 | HEADERS = { 69 | "Authorization": f"Bearer {AAP_TOKEN}", 70 | "Content-Type": "application/json" 71 | } 72 | 73 | # Initialize FastMCP 74 | mcp = FastMCP("ansible") 75 | 76 | async def make_request(url: str, method: str = "GET", json: dict = None) -> Any: 77 | """Helper function to make authenticated API requests to AAP.""" 78 | async with httpx.AsyncClient() as client: 79 | response = await client.request(method, url, headers=HEADERS, json=json) 80 | if response.status_code not in [200, 201]: 81 | return f"Error {response.status_code}: {response.text}" 82 | return response.json() if "application/json" in response.headers.get("Content-Type", "") else response.text 83 | 84 | @mcp.tool() 85 | async def list_inventories() -> Any: 86 | """List all inventories in Ansible Automation Platform.""" 87 | return await make_request(f"{AAP_URL}/inventories/") 88 | 89 | @mcp.tool() 90 | async def get_inventory(inventory_id: 
str) -> Any: 91 | """Get details of a specific inventory by ID.""" 92 | return await make_request(f"{AAP_URL}/inventories/{inventory_id}/") 93 | 94 | @mcp.tool() 95 | async def run_job(template_id: int, extra_vars: dict = {}) -> Any: 96 | """Run a job template by ID, optionally with extra_vars.""" 97 | return await make_request(f"{AAP_URL}/job_templates/{template_id}/launch/", method="POST", json={"extra_vars": extra_vars}) 98 | 99 | @mcp.tool() 100 | async def job_status(job_id: int) -> Any: 101 | """Check the status of a job by ID.""" 102 | return await make_request(f"{AAP_URL}/jobs/{job_id}/") 103 | 104 | @mcp.tool() 105 | async def job_logs(job_id: int) -> str: 106 | """Retrieve logs for a job.""" 107 | return await make_request(f"{AAP_URL}/jobs/{job_id}/stdout/") 108 | 109 | @mcp.tool() 110 | async def create_project( 111 | name: str, 112 | organization_id: int, 113 | source_control_url: str, 114 | source_control_type: str = "git", 115 | description: str = "", 116 | execution_environment_id: int = None, 117 | content_signature_validation_credential_id: int = None, 118 | source_control_branch: str = "", 119 | source_control_refspec: str = "", 120 | source_control_credential_id: int = None, 121 | clean: bool = False, 122 | update_revision_on_launch: bool = False, 123 | delete: bool = False, 124 | allow_branch_override: bool = False, 125 | track_submodules: bool = False, 126 | ) -> Any: 127 | """Create a new project in Ansible Automation Platform.""" 128 | 129 | payload = { 130 | "name": name, 131 | "description": description, 132 | "organization": organization_id, 133 | "scm_type": source_control_type.lower(), # Git is default 134 | "scm_url": source_control_url, 135 | "scm_branch": source_control_branch, 136 | "scm_refspec": source_control_refspec, 137 | "scm_clean": clean, 138 | "scm_delete_on_update": delete, 139 | "scm_update_on_launch": update_revision_on_launch, 140 | "allow_override": allow_branch_override, 141 | "scm_track_submodules": track_submodules, 
142 | } 143 | 144 | if execution_environment_id: 145 | payload["execution_environment"] = execution_environment_id 146 | if content_signature_validation_credential_id: 147 | payload["signature_validation_credential"] = content_signature_validation_credential_id 148 | if source_control_credential_id: 149 | payload["credential"] = source_control_credential_id 150 | 151 | return await make_request(f"{AAP_URL}/projects/", method="POST", json=payload) 152 | 153 | @mcp.tool() 154 | async def create_job_template( 155 | name: str, 156 | project_id: int, 157 | playbook: str, 158 | inventory_id: int, 159 | job_type: str = "run", 160 | description: str = "", 161 | credential_id: int = None, 162 | execution_environment_id: int = None, 163 | labels: list[str] = None, 164 | forks: int = 0, 165 | limit: str = "", 166 | verbosity: int = 0, 167 | timeout: int = 0, 168 | job_tags: list[str] = None, 169 | skip_tags: list[str] = None, 170 | extra_vars: dict = None, 171 | privilege_escalation: bool = False, 172 | concurrent_jobs: bool = False, 173 | provisioning_callback: bool = False, 174 | enable_webhook: bool = False, 175 | prevent_instance_group_fallback: bool = False, 176 | ) -> Any: 177 | """Create a new job template in Ansible Automation Platform.""" 178 | 179 | payload = { 180 | "name": name, 181 | "description": description, 182 | "job_type": job_type, 183 | "project": project_id, 184 | "playbook": playbook, 185 | "inventory": inventory_id, 186 | "forks": forks, 187 | "limit": limit, 188 | "verbosity": verbosity, 189 | "timeout": timeout, 190 | "ask_variables_on_launch": bool(extra_vars), 191 | "ask_tags_on_launch": bool(job_tags), 192 | "ask_skip_tags_on_launch": bool(skip_tags), 193 | "ask_credential_on_launch": credential_id is None, 194 | "ask_execution_environment_on_launch": execution_environment_id is None, 195 | "ask_labels_on_launch": labels is None, 196 | "ask_inventory_on_launch": False, # Inventory is required, so not prompting 197 | "ask_job_type_on_launch": 
False, # Job type is required, so not prompting 198 | "become_enabled": privilege_escalation, 199 | "allow_simultaneous": concurrent_jobs, 200 | "scm_branch": "", 201 | "webhook_service": "github" if enable_webhook else "", 202 | "prevent_instance_group_fallback": prevent_instance_group_fallback, 203 | } 204 | 205 | if credential_id: 206 | payload["credential"] = credential_id 207 | if execution_environment_id: 208 | payload["execution_environment"] = execution_environment_id 209 | if labels: 210 | payload["labels"] = labels 211 | if job_tags: 212 | payload["job_tags"] = job_tags 213 | if skip_tags: 214 | payload["skip_tags"] = skip_tags 215 | if extra_vars: 216 | payload["extra_vars"] = extra_vars 217 | 218 | return await make_request(f"{AAP_URL}/job_templates/", method="POST", json=payload) 219 | 220 | @mcp.tool() 221 | async def list_inventory_sources() -> Any: 222 | """List all inventory sources in Ansible Automation Platform.""" 223 | return await make_request(f"{AAP_URL}/inventory_sources/") 224 | 225 | @mcp.tool() 226 | async def get_inventory_source(inventory_source_id: int) -> Any: 227 | """Get details of a specific inventory source.""" 228 | return await make_request(f"{AAP_URL}/inventory_sources/{inventory_source_id}/") 229 | 230 | @mcp.tool() 231 | async def create_inventory_source( 232 | name: str, 233 | inventory_id: int, 234 | source: str, 235 | credential_id: int, 236 | source_vars: dict = None, 237 | update_on_launch: bool = True, 238 | timeout: int = 0, 239 | ) -> Any: 240 | """Create a dynamic inventory source. Claude will ask for the source type and credential before proceeding.""" 241 | valid_sources = [ 242 | "file", "constructed", "scm", "ec2", "gce", "azure_rm", "vmware", "satellite6", "openstack", 243 | "rhv", "controller", "insights", "terraform", "openshift_virtualization" 244 | ] 245 | 246 | if source not in valid_sources: 247 | return f"Error: Invalid source type '{source}'. 
Please select from: {', '.join(valid_sources)}" 248 | 249 | if not credential_id: 250 | return "Error: Credential is required to create an inventory source." 251 | 252 | payload = { 253 | "name": name, 254 | "inventory": inventory_id, 255 | "source": source, 256 | "credential": credential_id, 257 | "source_vars": source_vars, 258 | "update_on_launch": update_on_launch, 259 | "timeout": timeout, 260 | } 261 | return await make_request(f"{AAP_URL}/inventory_sources/", method="POST", json=payload) 262 | 263 | @mcp.tool() 264 | async def update_inventory_source(inventory_source_id: int, update_data: dict) -> Any: 265 | """Update an existing inventory source.""" 266 | return await make_request(f"{AAP_URL}/inventory_sources/{inventory_source_id}/", method="PATCH", json=update_data) 267 | 268 | @mcp.tool() 269 | async def delete_inventory_source(inventory_source_id: int) -> Any: 270 | """Delete an inventory source.""" 271 | return await make_request(f"{AAP_URL}/inventory_sources/{inventory_source_id}/", method="DELETE") 272 | 273 | @mcp.tool() 274 | async def sync_inventory_source(inventory_source_id: int) -> Any: 275 | """Manually trigger a sync for an inventory source.""" 276 | return await make_request(f"{AAP_URL}/inventory_sources/{inventory_source_id}/update/", method="POST") 277 | 278 | @mcp.tool() 279 | async def create_inventory( 280 | name: str, 281 | organization_id: int, 282 | description: str = "", 283 | kind: str = "", 284 | host_filter: str = "", 285 | variables: dict = None, 286 | prevent_instance_group_fallback: bool = False, 287 | ) -> Any: 288 | """Create an inventory in Ansible Automation Platform.""" 289 | payload = { 290 | "name": name, 291 | "organization": organization_id, 292 | "description": description, 293 | "kind": kind, 294 | "host_filter": host_filter, 295 | "variables": variables, 296 | "prevent_instance_group_fallback": prevent_instance_group_fallback, 297 | } 298 | return await make_request(f"{AAP_URL}/inventories/", method="POST", 
json=payload) 299 | 300 | @mcp.tool() 301 | async def delete_inventory(inventory_id: int) -> Any: 302 | """Delete an inventory from Ansible Automation Platform.""" 303 | return await make_request(f"{AAP_URL}/inventories/{inventory_id}/", method="DELETE") 304 | 305 | @mcp.tool() 306 | async def list_job_templates() -> Any: 307 | """List all job templates available in Ansible Automation Platform.""" 308 | return await make_request(f"{AAP_URL}/job_templates/") 309 | 310 | @mcp.tool() 311 | async def get_job_template(template_id: int) -> Any: 312 | """Retrieve details of a specific job template.""" 313 | return await make_request(f"{AAP_URL}/job_templates/{template_id}/") 314 | 315 | @mcp.tool() 316 | async def list_jobs() -> Any: 317 | """List all jobs available in Ansible Automation Platform.""" 318 | return await make_request(f"{AAP_URL}/jobs/") 319 | 320 | @mcp.tool() 321 | async def list_recent_jobs(hours: int = 24) -> Any: 322 | """List all jobs executed in the last specified hours (default 24 hours).""" 323 | from datetime import datetime, timedelta 324 | 325 | time_filter = (datetime.utcnow() - timedelta(hours=hours)).isoformat() + "Z" 326 | return await make_request(f"{AAP_URL}/jobs/?created__gte={time_filter}") 327 | 328 | if __name__ == "__main__": 329 | mcp.run(transport="stdio") 330 | 331 | ``` 332 | 333 | ## Step 4: Configuring Claude Desktop to use your MCP Servers 334 | 335 | In my particular case, I want to take advantage of two MCP Servers: the Ansible MCP Server above and the Kubernetes MCP Server that I found within the [quarkus-mcp-servers](https://github.com/quarkiverse/quarkus-mcp-servers/tree/main/kubernetes) Git repo. 336 | 337 | Open the `claude_desktop_config.json` file, which on macOS is located at: 338 | 339 | ``` 340 | ~/Library/Application\ Support/Claude/claude_desktop_config.json 341 | ``` 342 | 343 | ``` 344 | { 345 | "mcpServers": { 346 | "ansible": { 347 | "command": "/absolute/path/to/uv", 348 | "args": [ 349 | "--directory", 350 | 
"/absolute/path/to/ansible_mcp", 351 | "run", 352 | "ansible.py" 353 | ], 354 | "env": { 355 | "AAP_TOKEN": "<aap-token>", 356 | "AAP_URL": "https://<aap-url>/api/controller/v2" 357 | } 358 | }, 359 | "kubernetes": { 360 | "command": "jbang", 361 | "args": [ 362 | "--quiet", 363 | "https://github.com/quarkiverse/quarkus-mcp-servers/blob/main/kubernetes/src/main/java/io/quarkiverse/mcp/servers/kubernetes/MCPServerKubernetes.java" 364 | ] 365 | } 366 | } 367 | } 368 | ``` 369 | Save the file. 370 | 371 | WARNING: The absolute path to your `uv` binary is required. Run `which uv` on your system to get the full path. 372 | 373 | NOTE: If you need to create the AAP_TOKEN, go to the AAP Dashboard and select Access Management -> Users -> <your_user> -> Tokens -> Create token. Then select 'Write' from the Scope dropdown and click Create token. 374 | 375 | 376 | 377 | ## Step 5: Re-Launch Claude Desktop 378 | 379 | If you already had Claude Desktop open, relaunch it; otherwise, launch it. Either way, make sure Claude Desktop is picking up the MCP servers. You can verify this by checking that the hammer icon is displayed. 380 | 381 |  382 | 383 | NOTE: The number next to the hammer will vary based on the number of MCP tools available. 384 | 385 | Once you click on the hammer icon, you can see a list of tools. Below is an example. 386 | 387 |  388 | 389 | ## Step 6: Test your Environment 390 | 391 | Now with everything set up, see if you can interact with your Ansible Automation Platform and OpenShift cluster. 392 | 393 | Feel free to ask it questions such as: 394 | 395 | * How many Job Templates are available? 396 | * How many VMs are on my OpenShift cluster? 397 | 398 | NOTE: It is very likely you will need to take advantage of the Claude Desktop Pro Plan in order to get the full functionality. 
399 | 400 | ## References 401 | 402 | [Claude Desktop Quickstart for Server Developers](https://modelcontextprotocol.io/quickstart/server) 403 | 404 | 405 | ## BONUS: Adding Event Driven Ansible MCP Server 406 | 407 | If you have set up Event Driven Ansible, you can take advantage of the Event Driven Ansible MCP Server below. The instructions are similar to the above. 408 | 409 | * Create an `eda.py` and store it in your `/absolute/path/to/ansible_mcp` directory 410 | * Update your `claude_desktop_config.json` 411 | * Restart Claude Desktop and verify the hammer has picked up your new MCP tools 412 | 413 | The two files are listed below for easy copy/paste. 414 | 415 | ### claude_desktop_config.json 416 | ``` 417 | { 418 | "mcpServers": { 419 | "ansible": { 420 | "command": "/absolute/path/to/uv", 421 | "args": [ 422 | "--directory", 423 | "/absolute/path/to/ansible_mcp", 424 | "run", 425 | "ansible.py" 426 | ], 427 | "env": { 428 | "AAP_TOKEN": "<aap-token>", 429 | "AAP_URL": "https://<aap-url>/api/controller/v2" 430 | } 431 | }, 432 | "kubernetes": { 433 | "command": "jbang", 434 | "args": [ 435 | "--quiet", 436 | "https://github.com/quarkiverse/quarkus-mcp-servers/blob/main/kubernetes/src/main/java/io/quarkiverse/mcp/servers/kubernetes/MCPServerKubernetes.java" 437 | ] 438 | }, 439 | "eda": { 440 | "command": "/absolute/path/to/uv", 441 | "args": [ 442 | "--directory", 443 | "/absolute/path/to/ansible_mcp", 444 | "run", 445 | "eda.py" 446 | ], 447 | "env": { 448 | "EDA_TOKEN": "<EDA_TOKEN>", 449 | "EDA_URL": "https://<aap-url>/api/eda/v1" 450 | } 451 | } 452 | } 453 | } 454 | ``` 455 | 456 | WARNING: The absolute path to your `uv` binary is required. Run `which uv` on your system to get the full path. 457 | 458 | NOTE: An EDA Token can be generated from the AAP Dashboard. 
459 | 460 | ### eda.py MCP Server 461 | 462 | ``` 463 | import os 464 | import httpx 465 | from mcp.server.fastmcp import FastMCP 466 | from typing import Any, Dict 467 | 468 | # Environment variables for authentication 469 | EDA_URL = os.getenv("EDA_URL") 470 | EDA_TOKEN = os.getenv("EDA_TOKEN") 471 | 472 | if not EDA_TOKEN: 473 | raise ValueError("EDA_TOKEN is required") 474 | 475 | # Headers for API authentication 476 | HEADERS = { 477 | "Authorization": f"Bearer {EDA_TOKEN}", 478 | "Content-Type": "application/json" 479 | } 480 | 481 | # Initialize FastMCP 482 | mcp = FastMCP("eda") 483 | 484 | async def make_request(url: str, method: str = "GET", json: Dict = None) -> Any: 485 | """Helper function to make authenticated API requests to EDA.""" 486 | async with httpx.AsyncClient() as client: 487 | response = await client.request(method, url, headers=HEADERS, json=json) 488 | if response.status_code not in [200, 201, 204]: 489 | return f"Error {response.status_code}: {response.text}" 490 | return response.json() if "application/json" in response.headers.get("Content-Type", "") else response.text 491 | 492 | @mcp.tool() 493 | async def list_activations() -> Any: 494 | """List all activations in Event-Driven Ansible.""" 495 | return await make_request(f"{EDA_URL}/activations/") 496 | 497 | @mcp.tool() 498 | async def get_activation(activation_id: int) -> Any: 499 | """Get details of a specific activation.""" 500 | return await make_request(f"{EDA_URL}/activations/{activation_id}/") 501 | 502 | @mcp.tool() 503 | async def create_activation(payload: Dict) -> Any: 504 | """Create a new activation.""" 505 | return await make_request(f"{EDA_URL}/activations/", method="POST", json=payload) 506 | 507 | @mcp.tool() 508 | async def disable_activation(activation_id: int) -> Any: 509 | """Disable an activation.""" 510 | return await make_request(f"{EDA_URL}/activations/{activation_id}/disable/", method="POST") 511 | 512 | @mcp.tool() 513 | async def 
enable_activation(activation_id: int) -> Any: 514 | """Enable an activation.""" 515 | return await make_request(f"{EDA_URL}/activations/{activation_id}/enable/", method="POST") 516 | 517 | @mcp.tool() 518 | async def restart_activation(activation_id: int) -> Any: 519 | """Restart an activation.""" 520 | return await make_request(f"{EDA_URL}/activations/{activation_id}/restart/", method="POST") 521 | 522 | @mcp.tool() 523 | async def delete_activation(activation_id: int) -> Any: 524 | """Delete an activation.""" 525 | return await make_request(f"{EDA_URL}/activations/{activation_id}/", method="DELETE") 526 | 527 | @mcp.tool() 528 | async def list_decision_environments() -> Any: 529 | """List all decision environments.""" 530 | return await make_request(f"{EDA_URL}/decision-environments/") 531 | 532 | @mcp.tool() 533 | async def create_decision_environment(payload: Dict) -> Any: 534 | """Create a new decision environment.""" 535 | return await make_request(f"{EDA_URL}/decision-environments/", method="POST", json=payload) 536 | 537 | @mcp.tool() 538 | async def list_rulebooks() -> Any: 539 | """List all rulebooks in EDA.""" 540 | return await make_request(f"{EDA_URL}/rulebooks/") 541 | 542 | @mcp.tool() 543 | async def get_rulebook(rulebook_id: int) -> Any: 544 | """Retrieve details of a specific rulebook.""" 545 | return await make_request(f"{EDA_URL}/rulebooks/{rulebook_id}/") 546 | 547 | @mcp.tool() 548 | async def list_event_streams() -> Any: 549 | """List all event streams.""" 550 | return await make_request(f"{EDA_URL}/event-streams/") 551 | 552 | if __name__ == "__main__": 553 | mcp.run(transport="stdio") 554 | ``` 555 | 556 | ``` -------------------------------------------------------------------------------- /claude_desktop_config.json: -------------------------------------------------------------------------------- ```json 1 | { 2 | "mcpServers": { 3 | "ansible": { 4 | "command": "/absolute/path/to/uv", 5 | "args": [ 6 | "--directory", 7 | 
"/absolute/path/to/ansible_mcp", 8 | "run", 9 | "ansible.py" 10 | ], 11 | "env": { 12 | "AAP_TOKEN": "<aap-token>", 13 | "AAP_URL": "https://<my-automation-controller>/api/controller/v2" 14 | } 15 | }, 16 | "kubernetes": { 17 | "command": "jbang", 18 | "args": [ 19 | "--quiet", 20 | "https://github.com/quarkiverse/quarkus-mcp-servers/blob/main/kubernetes/src/main/java/io/quarkiverse/mcp/servers/kubernetes/MCPServerKubernetes.java" 21 | ] 22 | } 23 | } 24 | } 25 | ``` -------------------------------------------------------------------------------- /mcp_server/deployment.yaml: -------------------------------------------------------------------------------- ```yaml 1 | kind: Deployment 2 | apiVersion: apps/v1 3 | metadata: 4 | name: ansible-mcp-server 5 | spec: 6 | selector: 7 | matchLabels: 8 | app: ansible-mcp-server 9 | replicas: 1 10 | template: 11 | metadata: 12 | labels: 13 | app: ansible-mcp-server 14 | spec: 15 | containers: 16 | - name: ansible-mcp-server 17 | image: quay.io/rcook/ansible-mcp:amd 18 | ports: 19 | - containerPort: 8000 20 | protocol: TCP 21 | - containerPort: 8080 22 | protocol: TCP 23 | env: 24 | - name: AAP_TOKEN 25 | valueFrom: 26 | secretKeyRef: 27 | name: aap 28 | key: token 29 | - name: AAP_URL 30 | valueFrom: 31 | secretKeyRef: 32 | name: aap 33 | key: url 34 | resources: {} 35 | ``` -------------------------------------------------------------------------------- /eda.py: -------------------------------------------------------------------------------- ```python 1 | import os 2 | import httpx 3 | from mcp.server.fastmcp import FastMCP 4 | from typing import Any, Dict 5 | 6 | # Environment variables for authentication 7 | EDA_URL = os.getenv("EDA_URL") 8 | EDA_TOKEN = os.getenv("EDA_TOKEN") 9 | 10 | if not EDA_TOKEN: 11 | raise ValueError("EDA_TOKEN environment variable is required") 12 | 13 | # Headers for API authentication 14 | HEADERS = { 15 | "Authorization": f"Bearer {EDA_TOKEN}", 16 | "Content-Type": "application/json" 17 | } 18 | 
19 | # Initialize FastMCP 20 | mcp = FastMCP("eda") 21 | 22 | async def make_request(url: str, method: str = "GET", json: Dict = None) -> Any: 23 | """Helper function to make authenticated API requests to EDA.""" 24 | async with httpx.AsyncClient() as client: 25 | response = await client.request(method, url, headers=HEADERS, json=json) 26 | if response.status_code not in [200, 201, 204]: 27 | return f"Error {response.status_code}: {response.text}" 28 | return response.json() if "application/json" in response.headers.get("Content-Type", "") else response.text 29 | 30 | @mcp.tool() 31 | async def list_activations() -> Any: 32 | """List all activations in Event-Driven Ansible.""" 33 | return await make_request(f"{EDA_URL}/activations/") 34 | 35 | @mcp.tool() 36 | async def get_activation(activation_id: int) -> Any: 37 | """Get details of a specific activation.""" 38 | return await make_request(f"{EDA_URL}/activations/{activation_id}/") 39 | 40 | @mcp.tool() 41 | async def create_activation(payload: Dict) -> Any: 42 | """Create a new activation.""" 43 | return await make_request(f"{EDA_URL}/activations/", method="POST", json=payload) 44 | 45 | @mcp.tool() 46 | async def disable_activation(activation_id: int) -> Any: 47 | """Disable an activation.""" 48 | return await make_request(f"{EDA_URL}/activations/{activation_id}/disable/", method="POST") 49 | 50 | @mcp.tool() 51 | async def enable_activation(activation_id: int) -> Any: 52 | """Enable an activation.""" 53 | return await make_request(f"{EDA_URL}/activations/{activation_id}/enable/", method="POST") 54 | 55 | @mcp.tool() 56 | async def restart_activation(activation_id: int) -> Any: 57 | """Restart an activation.""" 58 | return await make_request(f"{EDA_URL}/activations/{activation_id}/restart/", method="POST") 59 | 60 | @mcp.tool() 61 | async def delete_activation(activation_id: int) -> Any: 62 | """Delete an activation.""" 63 | return await make_request(f"{EDA_URL}/activations/{activation_id}/", method="DELETE") 64 
| 65 | @mcp.tool() 66 | async def list_decision_environments() -> Any: 67 | """List all decision environments.""" 68 | return await make_request(f"{EDA_URL}/decision-environments/") 69 | 70 | @mcp.tool() 71 | async def create_decision_environment(payload: Dict) -> Any: 72 | """Create a new decision environment.""" 73 | return await make_request(f"{EDA_URL}/decision-environments/", method="POST", json=payload) 74 | 75 | @mcp.tool() 76 | async def list_rulebooks() -> Any: 77 | """List all rulebooks in EDA.""" 78 | return await make_request(f"{EDA_URL}/rulebooks/") 79 | 80 | @mcp.tool() 81 | async def get_rulebook(rulebook_id: int) -> Any: 82 | """Retrieve details of a specific rulebook.""" 83 | return await make_request(f"{EDA_URL}/rulebooks/{rulebook_id}/") 84 | 85 | @mcp.tool() 86 | async def list_event_streams() -> Any: 87 | """List all event streams.""" 88 | return await make_request(f"{EDA_URL}/event-streams/") 89 | 90 | if __name__ == "__main__": 91 | mcp.run(transport="stdio") 92 | 93 | ``` -------------------------------------------------------------------------------- /ansible.py: -------------------------------------------------------------------------------- ```python 1 | import os 2 | import httpx 3 | from mcp.server.fastmcp import FastMCP 4 | from typing import Any 5 | 6 | # Environment variables for authentication 7 | AAP_URL = os.getenv("AAP_URL") 8 | AAP_TOKEN = os.getenv("AAP_TOKEN") 9 | 10 | if not AAP_TOKEN: 11 | raise ValueError("AAP_TOKEN is required") 12 | 13 | # Headers for API authentication 14 | HEADERS = { 15 | "Authorization": f"Bearer {AAP_TOKEN}", 16 | "Content-Type": "application/json" 17 | } 18 | 19 | # Initialize FastMCP 20 | mcp = FastMCP("ansible") 21 | 22 | async def make_request(url: str, method: str = "GET", json: dict = None) -> Any: 23 | """Helper function to make authenticated API requests to AAP.""" 24 | async with httpx.AsyncClient() as client: 25 | response = await client.request(method, url, headers=HEADERS, json=json) 26 | 
if response.status_code not in [200, 201]: 27 | return f"Error {response.status_code}: {response.text}" 28 | return response.json() if "application/json" in response.headers.get("Content-Type", "") else response.text 29 | 30 | @mcp.tool() 31 | async def list_inventories() -> Any: 32 | """List all inventories in Ansible Automation Platform.""" 33 | return await make_request(f"{AAP_URL}/inventories/") 34 | 35 | @mcp.tool() 36 | async def get_inventory(inventory_id: str) -> Any: 37 | """Get details of a specific inventory by ID.""" 38 | return await make_request(f"{AAP_URL}/inventories/{inventory_id}/") 39 | 40 | @mcp.tool() 41 | async def run_job(template_id: int, extra_vars: dict = {}) -> Any: 42 | """Run a job template by ID, optionally with extra_vars.""" 43 | return await make_request(f"{AAP_URL}/job_templates/{template_id}/launch/", method="POST", json={"extra_vars": extra_vars}) 44 | 45 | @mcp.tool() 46 | async def job_status(job_id: int) -> Any: 47 | """Check the status of a job by ID.""" 48 | return await make_request(f"{AAP_URL}/jobs/{job_id}/") 49 | 50 | @mcp.tool() 51 | async def job_logs(job_id: int) -> str: 52 | """Retrieve logs for a job.""" 53 | return await make_request(f"{AAP_URL}/jobs/{job_id}/stdout/") 54 | 55 | @mcp.tool() 56 | async def create_project( 57 | name: str, 58 | organization_id: int, 59 | source_control_url: str, 60 | source_control_type: str = "git", 61 | description: str = "", 62 | execution_environment_id: int = None, 63 | content_signature_validation_credential_id: int = None, 64 | source_control_branch: str = "", 65 | source_control_refspec: str = "", 66 | source_control_credential_id: int = None, 67 | clean: bool = False, 68 | update_revision_on_launch: bool = False, 69 | delete: bool = False, 70 | allow_branch_override: bool = False, 71 | track_submodules: bool = False, 72 | ) -> Any: 73 | """Create a new project in Ansible Automation Platform.""" 74 | 75 | payload = { 76 | "name": name, 77 | "description": description, 78 | 
"organization": organization_id, 79 | "scm_type": source_control_type.lower(), # Git is default 80 | "scm_url": source_control_url, 81 | "scm_branch": source_control_branch, 82 | "scm_refspec": source_control_refspec, 83 | "scm_clean": clean, 84 | "scm_delete_on_update": delete, 85 | "scm_update_on_launch": update_revision_on_launch, 86 | "allow_override": allow_branch_override, 87 | "scm_track_submodules": track_submodules, 88 | } 89 | 90 | if execution_environment_id: 91 | payload["execution_environment"] = execution_environment_id 92 | if content_signature_validation_credential_id: 93 | payload["signature_validation_credential"] = content_signature_validation_credential_id 94 | if source_control_credential_id: 95 | payload["credential"] = source_control_credential_id 96 | 97 | return await make_request(f"{AAP_URL}/projects/", method="POST", json=payload) 98 | 99 | @mcp.tool() 100 | async def create_job_template( 101 | name: str, 102 | project_id: int, 103 | playbook: str, 104 | inventory_id: int, 105 | job_type: str = "run", 106 | description: str = "", 107 | credential_id: int = None, 108 | execution_environment_id: int = None, 109 | labels: list[str] = None, 110 | forks: int = 0, 111 | limit: str = "", 112 | verbosity: int = 0, 113 | timeout: int = 0, 114 | job_tags: list[str] = None, 115 | skip_tags: list[str] = None, 116 | extra_vars: dict = None, 117 | privilege_escalation: bool = False, 118 | concurrent_jobs: bool = False, 119 | provisioning_callback: bool = False, 120 | enable_webhook: bool = False, 121 | prevent_instance_group_fallback: bool = False, 122 | ) -> Any: 123 | """Create a new job template in Ansible Automation Platform.""" 124 | 125 | payload = { 126 | "name": name, 127 | "description": description, 128 | "job_type": job_type, 129 | "project": project_id, 130 | "playbook": playbook, 131 | "inventory": inventory_id, 132 | "forks": forks, 133 | "limit": limit, 134 | "verbosity": verbosity, 135 | "timeout": timeout, 136 | 
"ask_variables_on_launch": bool(extra_vars), 137 | "ask_tags_on_launch": bool(job_tags), 138 | "ask_skip_tags_on_launch": bool(skip_tags), 139 | "ask_credential_on_launch": credential_id is None, 140 | "ask_execution_environment_on_launch": execution_environment_id is None, 141 | "ask_labels_on_launch": labels is None, 142 | "ask_inventory_on_launch": False, # Inventory is required, so not prompting 143 | "ask_job_type_on_launch": False, # Job type is required, so not prompting 144 | "become_enabled": privilege_escalation, 145 | "allow_simultaneous": concurrent_jobs, 146 | "scm_branch": "", 147 | "webhook_service": "github" if enable_webhook else "", 148 | "prevent_instance_group_fallback": prevent_instance_group_fallback, 149 | } 150 | 151 | if credential_id: 152 | payload["credential"] = credential_id 153 | if execution_environment_id: 154 | payload["execution_environment"] = execution_environment_id 155 | if labels: 156 | payload["labels"] = labels 157 | if job_tags: 158 | payload["job_tags"] = job_tags 159 | if skip_tags: 160 | payload["skip_tags"] = skip_tags 161 | if extra_vars: 162 | payload["extra_vars"] = extra_vars 163 | 164 | return await make_request(f"{AAP_URL}/job_templates/", method="POST", json=payload) 165 | 166 | @mcp.tool() 167 | async def list_inventory_sources() -> Any: 168 | """List all inventory sources in Ansible Automation Platform.""" 169 | return await make_request(f"{AAP_URL}/inventory_sources/") 170 | 171 | @mcp.tool() 172 | async def get_inventory_source(inventory_source_id: int) -> Any: 173 | """Get details of a specific inventory source.""" 174 | return await make_request(f"{AAP_URL}/inventory_sources/{inventory_source_id}/") 175 | 176 | @mcp.tool() 177 | async def create_inventory_source( 178 | name: str, 179 | inventory_id: int, 180 | source: str, 181 | credential_id: int, 182 | source_vars: dict = None, 183 | update_on_launch: bool = True, 184 | timeout: int = 0, 185 | ) -> Any: 186 | """Create a dynamic inventory source. 
Claude will ask for the source type and credential before proceeding.""" 187 | valid_sources = [ 188 | "file", "constructed", "scm", "ec2", "gce", "azure_rm", "vmware", "satellite6", "openstack", 189 | "rhv", "controller", "insights", "terraform", "openshift_virtualization" 190 | ] 191 | 192 | if source not in valid_sources: 193 | return f"Error: Invalid source type '{source}'. Please select from: {', '.join(valid_sources)}" 194 | 195 | if not credential_id: 196 | return "Error: Credential is required to create an inventory source." 197 | 198 | payload = { 199 | "name": name, 200 | "inventory": inventory_id, 201 | "source": source, 202 | "credential": credential_id, 203 | "source_vars": source_vars, 204 | "update_on_launch": update_on_launch, 205 | "timeout": timeout, 206 | } 207 | return await make_request(f"{AAP_URL}/inventory_sources/", method="POST", json=payload) 208 | 209 | @mcp.tool() 210 | async def update_inventory_source(inventory_source_id: int, update_data: dict) -> Any: 211 | """Update an existing inventory source.""" 212 | return await make_request(f"{AAP_URL}/inventory_sources/{inventory_source_id}/", method="PATCH", json=update_data) 213 | 214 | @mcp.tool() 215 | async def delete_inventory_source(inventory_source_id: int) -> Any: 216 | """Delete an inventory source.""" 217 | return await make_request(f"{AAP_URL}/inventory_sources/{inventory_source_id}/", method="DELETE") 218 | 219 | @mcp.tool() 220 | async def sync_inventory_source(inventory_source_id: int) -> Any: 221 | """Manually trigger a sync for an inventory source.""" 222 | return await make_request(f"{AAP_URL}/inventory_sources/{inventory_source_id}/update/", method="POST") 223 | 224 | @mcp.tool() 225 | async def create_inventory( 226 | name: str, 227 | organization_id: int, 228 | description: str = "", 229 | kind: str = "", 230 | host_filter: str = "", 231 | variables: dict = None, 232 | prevent_instance_group_fallback: bool = False, 233 | ) -> Any: 234 | """Create an inventory in Ansible 
Automation Platform.""" 235 | payload = { 236 | "name": name, 237 | "organization": organization_id, 238 | "description": description, 239 | "kind": kind, 240 | "host_filter": host_filter, 241 | "variables": variables, 242 | "prevent_instance_group_fallback": prevent_instance_group_fallback, 243 | } 244 | return await make_request(f"{AAP_URL}/inventories/", method="POST", json=payload) 245 | 246 | @mcp.tool() 247 | async def delete_inventory(inventory_id: int) -> Any: 248 | """Delete an inventory from Ansible Automation Platform.""" 249 | return await make_request(f"{AAP_URL}/inventories/{inventory_id}/", method="DELETE") 250 | 251 | @mcp.tool() 252 | async def list_job_templates() -> Any: 253 | """List all job templates available in Ansible Automation Platform.""" 254 | return await make_request(f"{AAP_URL}/job_templates/") 255 | 256 | @mcp.tool() 257 | async def get_job_template(template_id: int) -> Any: 258 | """Retrieve details of a specific job template.""" 259 | return await make_request(f"{AAP_URL}/job_templates/{template_id}/") 260 | 261 | @mcp.tool() 262 | async def list_jobs() -> Any: 263 | """List all jobs available in Ansible Automation Platform.""" 264 | return await make_request(f"{AAP_URL}/jobs/") 265 | 266 | @mcp.tool() 267 | async def list_recent_jobs(hours: int = 24) -> Any: 268 | """List all jobs executed in the last specified hours (default 24 hours).""" 269 | from datetime import datetime, timedelta 270 | 271 | time_filter = (datetime.utcnow() - timedelta(hours=hours)).isoformat() + "Z" 272 | return await make_request(f"{AAP_URL}/jobs/?created__gte={time_filter}") 273 | 274 | if __name__ == "__main__": 275 | mcp.run(transport="stdio") 276 | ``` -------------------------------------------------------------------------------- /mcp_server/ansible.py: -------------------------------------------------------------------------------- ```python 1 | import os 2 | import httpx 3 | from mcp.server.fastmcp import FastMCP 4 | from typing import Any 5 | 6 | 
# Environment variables for authentication
AAP_URL = os.getenv("AAP_URL")
AAP_TOKEN = os.getenv("AAP_TOKEN")

if not AAP_TOKEN:
    raise ValueError("AAP_TOKEN is required")

# Headers for API authentication
HEADERS = {
    "Authorization": f"Bearer {AAP_TOKEN}",
    "Content-Type": "application/json"
}

# Initialize FastMCP
mcp = FastMCP("ansible")

# NOTE: the docstrings of the @mcp.tool() functions below are kept exactly as
# they were; they double as the tool descriptions shown to the MCP client, so
# extra documentation is written as comments instead.


async def make_request(url: str, method: str = "GET", json: dict = None) -> Any:
    """Send an authenticated request to AAP and return the decoded response.

    Args:
        url: Absolute URL of the API endpoint.
        method: HTTP method name.
        json: Optional JSON-serializable request body.

    Returns:
        The parsed JSON body for JSON responses, the raw text for any other
        non-empty body, ``"Success <status>"`` for a successful response with
        no body, or ``"Error <status>: <body>"`` for a non-2xx response.
    """
    async with httpx.AsyncClient() as client:
        response = await client.request(method, url, headers=HEADERS, json=json)

    # Every 2xx status is a success. The previous allow-list of 200/201
    # reported successful DELETE (204 No Content) and accepted (202)
    # requests as "Error 204: " / "Error 202: ...".
    if not 200 <= response.status_code < 300:
        return f"Error {response.status_code}: {response.text}"

    # An empty body cannot be JSON-decoded, whatever Content-Type says.
    if not response.content:
        return f"Success {response.status_code}"

    if "application/json" in response.headers.get("Content-Type", ""):
        return response.json()
    return response.text


def _join_tags(tags) -> str:
    """Return tags as the comma-separated string the job template API stores."""
    # A plain string is passed through so "a,b" is not split per character.
    return tags if isinstance(tags, str) else ",".join(tags)


@mcp.tool()
async def list_inventories() -> Any:
    """List all inventories in Ansible Automation Platform."""
    return await make_request(f"{AAP_URL}/inventories/")


@mcp.tool()
async def get_inventory(inventory_id: str) -> Any:
    """Get details of a specific inventory by ID."""
    return await make_request(f"{AAP_URL}/inventories/{inventory_id}/")


@mcp.tool()
async def run_job(template_id: int, extra_vars: dict = None) -> Any:
    """Run a job template by ID, optionally with extra_vars."""
    # None replaces the shared mutable {} default; the request body sent for
    # an omitted argument is still {"extra_vars": {}}.
    return await make_request(
        f"{AAP_URL}/job_templates/{template_id}/launch/",
        method="POST",
        json={"extra_vars": extra_vars or {}},
    )


@mcp.tool()
async def job_status(job_id: int) -> Any:
    """Check the status of a job by ID."""
    return await make_request(f"{AAP_URL}/jobs/{job_id}/")


@mcp.tool()
async def job_logs(job_id: int) -> str:
    """Retrieve logs for a job."""
    return await make_request(f"{AAP_URL}/jobs/{job_id}/stdout/")


@mcp.tool()
async def create_project(
    name: str,
    organization_id: int,
    source_control_url: str,
    source_control_type: str = "git",
    description: str = "",
    execution_environment_id: int = None,
    content_signature_validation_credential_id: int = None,
    source_control_branch: str = "",
    source_control_refspec: str = "",
    source_control_credential_id: int = None,
    clean: bool = False,
    update_revision_on_launch: bool = False,
    delete: bool = False,
    allow_branch_override: bool = False,
    track_submodules: bool = False,
) -> Any:
    """Create a new project in Ansible Automation Platform."""
    # Friendly parameter names are mapped onto the API's scm_* field names.
    payload = {
        "name": name,
        "description": description,
        "organization": organization_id,
        "scm_type": source_control_type.lower(),  # Git is default
        "scm_url": source_control_url,
        "scm_branch": source_control_branch,
        "scm_refspec": source_control_refspec,
        "scm_clean": clean,
        "scm_delete_on_update": delete,
        "scm_update_on_launch": update_revision_on_launch,
        "allow_override": allow_branch_override,
        "scm_track_submodules": track_submodules,
    }

    # Optional references are only sent when the caller supplied them.
    if execution_environment_id:
        payload["execution_environment"] = execution_environment_id
    if content_signature_validation_credential_id:
        payload["signature_validation_credential"] = content_signature_validation_credential_id
    if source_control_credential_id:
        payload["credential"] = source_control_credential_id

    return await make_request(f"{AAP_URL}/projects/", method="POST", json=payload)


@mcp.tool()
async def create_job_template(
    name: str,
    project_id: int,
    playbook: str,
    inventory_id: int,
    job_type: str = "run",
    description: str = "",
    credential_id: int = None,
    execution_environment_id: int = None,
    labels: list[str] = None,
    forks: int = 0,
    limit: str = "",
    verbosity: int = 0,
    timeout: int = 0,
    job_tags: list[str] = None,
    skip_tags: list[str] = None,
    extra_vars: dict = None,
    privilege_escalation: bool = False,
    concurrent_jobs: bool = False,
    provisioning_callback: bool = False,
    enable_webhook: bool = False,
    prevent_instance_group_fallback: bool = False,
) -> Any:
    """Create a new job template in Ansible Automation Platform."""
    # NOTE: provisioning_callback is accepted for interface compatibility but
    # is not mapped to any payload field here.
    payload = {
        "name": name,
        "description": description,
        "job_type": job_type,
        "project": project_id,
        "playbook": playbook,
        "inventory": inventory_id,
        "forks": forks,
        "limit": limit,
        "verbosity": verbosity,
        "timeout": timeout,
        "ask_variables_on_launch": bool(extra_vars),
        "ask_tags_on_launch": bool(job_tags),
        "ask_skip_tags_on_launch": bool(skip_tags),
        "ask_credential_on_launch": credential_id is None,
        "ask_execution_environment_on_launch": execution_environment_id is None,
        "ask_labels_on_launch": labels is None,
        "ask_inventory_on_launch": False,  # Inventory is required, so not prompting
        "ask_job_type_on_launch": False,  # Job type is required, so not prompting
        "become_enabled": privilege_escalation,
        "allow_simultaneous": concurrent_jobs,
        "scm_branch": "",
        "webhook_service": "github" if enable_webhook else "",
        "prevent_instance_group_fallback": prevent_instance_group_fallback,
    }

    if credential_id:
        payload["credential"] = credential_id
    if execution_environment_id:
        payload["execution_environment"] = execution_environment_id
    if labels:
        payload["labels"] = labels
    # job_tags / skip_tags are string fields in the job template API, so the
    # lists are sent comma-joined rather than as JSON arrays.
    if job_tags:
        payload["job_tags"] = _join_tags(job_tags)
    if skip_tags:
        payload["skip_tags"] = _join_tags(skip_tags)
    if extra_vars:
        payload["extra_vars"] = extra_vars

    return await make_request(f"{AAP_URL}/job_templates/", method="POST", json=payload)


@mcp.tool()
async def list_inventory_sources() -> Any:
    """List all inventory sources in Ansible Automation Platform."""
    return await make_request(f"{AAP_URL}/inventory_sources/")


@mcp.tool()
async def get_inventory_source(inventory_source_id: int) -> Any:
    """Get details of a specific inventory source."""
    return await make_request(f"{AAP_URL}/inventory_sources/{inventory_source_id}/")


@mcp.tool()
async def create_inventory_source(
    name: str,
    inventory_id: int,
    source: str,
    credential_id: int,
    source_vars: dict = None,
    update_on_launch: bool = True,
    timeout: int = 0,
) -> Any:
    """Create a dynamic inventory source. Claude will ask for the source type and credential before proceeding."""
    valid_sources = [
        "file", "constructed", "scm", "ec2", "gce", "azure_rm", "vmware", "satellite6", "openstack",
        "rhv", "controller", "insights", "terraform", "openshift_virtualization"
    ]

    # Validation failures are returned as strings (not raised) so the MCP
    # client receives them as an ordinary tool result.
    if source not in valid_sources:
        return f"Error: Invalid source type '{source}'. Please select from: {', '.join(valid_sources)}"

    if not credential_id:
        return "Error: Credential is required to create an inventory source."

    payload = {
        "name": name,
        "inventory": inventory_id,
        "source": source,
        "credential": credential_id,
        "update_on_launch": update_on_launch,
        "timeout": timeout,
    }
    # Leave the field out instead of sending an explicit null when unset.
    if source_vars is not None:
        payload["source_vars"] = source_vars
    return await make_request(f"{AAP_URL}/inventory_sources/", method="POST", json=payload)


@mcp.tool()
async def update_inventory_source(inventory_source_id: int, update_data: dict) -> Any:
    """Update an existing inventory source."""
    return await make_request(f"{AAP_URL}/inventory_sources/{inventory_source_id}/", method="PATCH", json=update_data)


@mcp.tool()
async def delete_inventory_source(inventory_source_id: int) -> Any:
    """Delete an inventory source."""
    return await make_request(f"{AAP_URL}/inventory_sources/{inventory_source_id}/", method="DELETE")


@mcp.tool()
async def sync_inventory_source(inventory_source_id: int) -> Any:
    """Manually trigger a sync for an inventory source."""
    return await make_request(f"{AAP_URL}/inventory_sources/{inventory_source_id}/update/", method="POST")


@mcp.tool()
async def create_inventory(
    name: str,
    organization_id: int,
    description: str = "",
    kind: str = "",
    host_filter: str = "",
    variables: dict = None,
    prevent_instance_group_fallback: bool = False,
) -> Any:
    """Create an inventory in Ansible Automation Platform."""
    payload = {
        "name": name,
        "organization": organization_id,
        "description": description,
        "kind": kind,
        "host_filter": host_filter,
        "prevent_instance_group_fallback": prevent_instance_group_fallback,
    }
    # Leave the field out instead of sending an explicit null when unset.
    if variables is not None:
        payload["variables"] = variables
    return await make_request(f"{AAP_URL}/inventories/", method="POST", json=payload)


@mcp.tool()
async def delete_inventory(inventory_id: int) -> Any:
    """Delete an inventory from Ansible Automation Platform."""
249 | return await make_request(f"{AAP_URL}/inventories/{inventory_id}/", method="DELETE") 250 | 251 | @mcp.tool() 252 | async def list_job_templates() -> Any: 253 | """List all job templates available in Ansible Automation Platform.""" 254 | return await make_request(f"{AAP_URL}/job_templates/") 255 | 256 | @mcp.tool() 257 | async def get_job_template(template_id: int) -> Any: 258 | """Retrieve details of a specific job template.""" 259 | return await make_request(f"{AAP_URL}/job_templates/{template_id}/") 260 | 261 | @mcp.tool() 262 | async def list_jobs() -> Any: 263 | """List all jobs available in Ansible Automation Platform.""" 264 | return await make_request(f"{AAP_URL}/jobs/") 265 | 266 | @mcp.tool() 267 | async def list_recent_jobs(hours: int = 24) -> Any: 268 | """List all jobs executed in the last specified hours (default 24 hours).""" 269 | from datetime import datetime, timedelta 270 | 271 | time_filter = (datetime.utcnow() - timedelta(hours=hours)).isoformat() + "Z" 272 | return await make_request(f"{AAP_URL}/jobs/?created__gte={time_filter}") 273 | 274 | if __name__ == "__main__": 275 | # Change from stdio to sse transport 276 | mcp.run(transport="sse") 277 | ``` -------------------------------------------------------------------------------- /app.py: -------------------------------------------------------------------------------- ```python 1 | import streamlit as st 2 | from llama_stack_client import LlamaStackClient 3 | from llama_stack_client.lib.agents.agent import Agent 4 | from llama_stack_client.types.agent_create_params import AgentConfig 5 | import os 6 | import json 7 | from datetime import datetime 8 | 9 | # Initialize LlamaStack client 10 | base_url = os.getenv("BASE_URL", "http://localhost:8321") 11 | client = LlamaStackClient(base_url=base_url) 12 | 13 | # Page configuration 14 | st.set_page_config( 15 | page_title="Llama-stack Chat", 16 | page_icon="🦙", 17 | layout="wide", 18 | ) 19 | 20 | # Styling for code blocks 21 | st.markdown(""" 
22 | <style> 23 | code { 24 | background-color: #f0f2f6; 25 | border-radius: 3px; 26 | padding: 0.2em 0.4em; 27 | } 28 | pre { 29 | background-color: #f0f2f6; 30 | border-radius: 5px; 31 | padding: 0.5em; 32 | } 33 | .chat-message { 34 | padding: 1.5rem; 35 | border-radius: 0.5rem; 36 | margin-bottom: 1rem; 37 | } 38 | .blinking-cursor { 39 | animation: blink 1s step-end infinite; 40 | } 41 | @keyframes blink { 42 | 50% { opacity: 0; } 43 | } 44 | </style> 45 | """, unsafe_allow_html=True) 46 | 47 | # Get providers with valid credentials (simplified approach) 48 | @st.cache_data(ttl=300) 49 | def get_configured_providers(): 50 | try: 51 | # This is a simplified approach that assumes Anthropic is configured 52 | # In a real implementation, you would check LLama Stack's configuration 53 | return ["anthropic"] 54 | except Exception as e: 55 | st.error(f"Error checking configured providers: {e}") 56 | return ["anthropic"] # Fallback to Anthropic 57 | 58 | # Get available models from configured providers 59 | @st.cache_data(ttl=300) 60 | def get_available_models(): 61 | try: 62 | models = client.models.list() 63 | configured_providers = get_configured_providers() 64 | 65 | # Filter for LLM models from configured providers 66 | available_models = [ 67 | model.identifier 68 | for model in models 69 | if model.model_type == "llm" and model.provider_id in configured_providers 70 | ] 71 | 72 | return available_models 73 | except Exception as e: 74 | st.error(f"Error fetching models: {e}") 75 | return ["anthropic/claude-3-7-sonnet-latest"] # Fallback default 76 | 77 | # Get all available toolgroups 78 | @st.cache_data(ttl=300) 79 | def get_all_toolgroups(): 80 | try: 81 | toolgroups = client.toolgroups.list() 82 | return [toolgroup.identifier for toolgroup in toolgroups] 83 | except Exception as e: 84 | st.error(f"Error fetching toolgroups: {e}") 85 | return ["mcp::ansible"] # Fallback default 86 | 87 | # Initialize session state 88 | if "messages" not in st.session_state: 89 
| st.session_state.messages = [] 90 | if "system_instruction" not in st.session_state: 91 | st.session_state.system_instruction = "You are a helpful assistant that can access various tools to help the user." 92 | if "show_save_dialog" not in st.session_state: 93 | st.session_state.show_save_dialog = False 94 | if "saved_chats" not in st.session_state: 95 | st.session_state.saved_chats = "{}" 96 | if "chat_updated" not in st.session_state: 97 | st.session_state.chat_updated = False 98 | 99 | # Function to check if there are messages 100 | def has_messages(): 101 | return len(st.session_state.messages) > 0 102 | 103 | # Streamlit UI 104 | st.title("Llama-stack Chat") 105 | st.markdown("Chat with LLama Stack and its toolgroups") 106 | 107 | # Sidebar configurations 108 | with st.sidebar: 109 | st.header("Configuration") 110 | 111 | # Model selection 112 | available_models = get_available_models() 113 | selected_model = st.selectbox("Select Model", available_models) 114 | 115 | # Get all toolgroups automatically and display as collapsible 116 | all_toolgroups = get_all_toolgroups() 117 | with st.expander(f"{len(all_toolgroups)} Toolgroups Loaded"): 118 | for toolgroup in all_toolgroups: 119 | st.caption(f"• {toolgroup}") 120 | 121 | # System instructions 122 | with st.expander("System Instructions", expanded=False): 123 | new_instruction = st.text_area( 124 | "Customize how the assistant behaves:", 125 | st.session_state.system_instruction, 126 | height=100 127 | ) 128 | if new_instruction != st.session_state.system_instruction: 129 | st.session_state.system_instruction = new_instruction 130 | st.toast("System instructions updated") 131 | 132 | # Collapsible Query Context section 133 | with st.expander("Query Context", expanded=False): 134 | query_context = st.text_area("Add background information for this query:", "", height=150) 135 | st.caption("This information will be included with each of your queries but won't be visible in the chat.") 136 | 137 | # Temperature 
in collapsible 138 | with st.expander("Temperature", expanded=False): 139 | temperature = st.slider("Temperature", min_value=0.0, max_value=1.0, value=0.7, step=0.1) 140 | top_p = st.slider("Top P", min_value=0.0, max_value=1.0, value=0.9, step=0.1) 141 | 142 | # Chat history management 143 | st.header("Chat Management") 144 | 145 | # Using two separate buttons instead of columns for better visibility 146 | clear_col, save_col = st.columns(2) 147 | 148 | # Clear chat button 149 | with clear_col: 150 | if st.button("🗑️ Clear Chat", key="clear_chat"): 151 | st.session_state.messages = [] 152 | st.session_state.chat_updated = True 153 | st.rerun() 154 | 155 | # Save button - disabled when no messages 156 | with save_col: 157 | if st.button("💾 Save Chat", key="save_chat", disabled=not has_messages()): 158 | st.session_state.show_save_dialog = True 159 | 160 | # Save dialog - shown when save button is clicked 161 | if st.session_state.show_save_dialog: 162 | st.text_input("Conversation name:", key="save_name") 163 | save_confirm, cancel = st.columns(2) 164 | 165 | with save_confirm: 166 | if st.button("Confirm Save", key="confirm_save"): 167 | if st.session_state.save_name: 168 | saved_chats = json.loads(st.session_state.saved_chats) 169 | saved_chats[st.session_state.save_name] = st.session_state.messages 170 | st.session_state.saved_chats = json.dumps(saved_chats) 171 | st.session_state.show_save_dialog = False 172 | st.toast(f"Saved conversation: {st.session_state.save_name}") 173 | st.rerun() 174 | else: 175 | st.warning("Please enter a name for the conversation") 176 | 177 | with cancel: 178 | if st.button("Cancel", key="cancel_save"): 179 | st.session_state.show_save_dialog = False 180 | st.rerun() 181 | 182 | # Export chat button - disabled when no messages 183 | if st.button("📥 Export Chat", key="export_chat", disabled=not has_messages()): 184 | chat_export = "" 185 | for msg in st.session_state.messages: 186 | prefix = "🧑" if msg["role"] == "user" else "🤖" 187 
| chat_export += f"{prefix} **{msg['role'].capitalize()}**: {msg['content']}\n\n" 188 | 189 | # Create download link 190 | timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") 191 | filename = f"chat_export_{timestamp}.md" 192 | st.download_button( 193 | label="Download Chat", 194 | data=chat_export, 195 | file_name=filename, 196 | mime="text/markdown", 197 | key="download_chat" 198 | ) 199 | 200 | # Load saved conversations 201 | st.header("Saved Conversations") 202 | 203 | # Load saved conversation 204 | saved_chats = json.loads(st.session_state.saved_chats) 205 | if saved_chats: 206 | chat_names = list(saved_chats.keys()) 207 | selected_chat = st.selectbox("Select a saved conversation:", [""] + chat_names) 208 | if selected_chat and st.button("📂 Load Conversation"): 209 | st.session_state.messages = saved_chats[selected_chat] 210 | st.session_state.chat_updated = True 211 | st.toast(f"Loaded conversation: {selected_chat}") 212 | st.rerun() 213 | else: 214 | st.caption("No saved conversations yet") 215 | 216 | # Display chat history 217 | for message in st.session_state.messages: 218 | with st.chat_message(message["role"]): 219 | # Apply syntax highlighting for code blocks 220 | content = message["content"] 221 | st.markdown(content) 222 | 223 | # Input for new messages 224 | prompt = st.chat_input("Ask something...") 225 | if prompt: 226 | full_response = "" 227 | agent_config = AgentConfig( 228 | model=selected_model, 229 | instructions=st.session_state.system_instruction, 230 | sampling_params={ 231 | "strategy": {"type": "top_p", "temperature": temperature, "top_p": top_p}, 232 | }, 233 | toolgroups=all_toolgroups, # Use all toolgroups automatically 234 | tool_choice="auto", 235 | input_shields=[], 236 | output_shields=[], 237 | enable_session_persistence=True, 238 | ) 239 | 240 | try: 241 | agent = Agent(client, agent_config) 242 | session_id = agent.create_session("chat-session") 243 | 244 | # Add user input to chat history 245 | 
st.session_state.messages.append({"role": "user", "content": prompt}) 246 | with st.chat_message("user"): 247 | st.markdown(prompt) 248 | 249 | # Get response from LlamaStack API 250 | with st.chat_message("assistant"): 251 | message_placeholder = st.empty() 252 | 253 | # Add query context if provided 254 | user_message = prompt 255 | if query_context: 256 | user_message += f"\n\nContext: {query_context}" 257 | 258 | response = agent.create_turn( 259 | messages=[{"role": "user", "content": user_message}], 260 | session_id=session_id, 261 | ) 262 | 263 | for chunk in response: 264 | if hasattr(chunk, 'event') and hasattr(chunk.event, 'payload'): 265 | payload = chunk.event.payload 266 | if hasattr(payload, 'event_type') and payload.event_type == "step_progress": 267 | if hasattr(payload, 'delta') and hasattr(payload.delta, 'type') and payload.delta.type == "text": 268 | full_response += payload.delta.text 269 | message_placeholder.markdown(full_response + "▌") 270 | 271 | message_placeholder.markdown(full_response) 272 | st.session_state.messages.append({"role": "assistant", "content": full_response}) 273 | 274 | # Set flag to indicate chat has been updated 275 | st.session_state.chat_updated = True 276 | 277 | # Force a rerun to update the UI state (including button states) 278 | st.rerun() 279 | 280 | except Exception as e: 281 | st.error(f"Error: {str(e)}") 282 | 283 | # Reset the chat_updated flag after processing 284 | if st.session_state.chat_updated: 285 | st.session_state.chat_updated = False 286 | ``` -------------------------------------------------------------------------------- /redhat_insights_mcp.py: -------------------------------------------------------------------------------- ```python 1 | """ 2 | Red Hat Insights MCP Server 3 | 4 | This server requires a Red Hat service account with client credentials. 5 | 6 | Setup: 7 | 1. Create a service account in Red Hat Console (console.redhat.com) 8 | 2. 
Assign appropriate permissions via User Access → Groups 9 | 3. Set environment variables: 10 | export INSIGHTS_CLIENT_ID="your-client-id" 11 | export INSIGHTS_CLIENT_SECRET="your-client-secret" 12 | 13 | Optional: 14 | export INSIGHTS_BASE_URL="https://console.redhat.com/api" 15 | export SSO_URL="https://sso.redhat.com/auth/realms/redhat-external/protocol/openid-connect/token" 16 | """ 17 | 18 | import os 19 | import httpx 20 | from mcp.server.fastmcp import FastMCP 21 | from typing import Any, Optional 22 | from datetime import datetime, timedelta 23 | 24 | # Environment variables for authentication 25 | INSIGHTS_BASE_URL = os.getenv("INSIGHTS_BASE_URL", "https://console.redhat.com/api") 26 | INSIGHTS_CLIENT_ID = os.getenv("INSIGHTS_CLIENT_ID") 27 | INSIGHTS_CLIENT_SECRET = os.getenv("INSIGHTS_CLIENT_SECRET") 28 | SSO_URL = os.getenv("SSO_URL", "https://sso.redhat.com/auth/realms/redhat-external/protocol/openid-connect/token") 29 | 30 | if not INSIGHTS_CLIENT_ID or not INSIGHTS_CLIENT_SECRET: 31 | raise ValueError("INSIGHTS_CLIENT_ID and INSIGHTS_CLIENT_SECRET are required") 32 | 33 | # Global variable to store the access token 34 | _access_token = None 35 | _token_expires_at = None 36 | 37 | # Initialize FastMCP 38 | mcp = FastMCP("insights") 39 | 40 | async def get_access_token() -> str: 41 | """Get or refresh the access token using client credentials.""" 42 | global _access_token, _token_expires_at 43 | 44 | # Check if we have a valid token 45 | if _access_token and _token_expires_at and datetime.utcnow() < _token_expires_at: 46 | return _access_token 47 | 48 | # Request new token 49 | async with httpx.AsyncClient() as client: 50 | response = await client.post( 51 | SSO_URL, 52 | headers={"Content-Type": "application/x-www-form-urlencoded"}, 53 | data={ 54 | "grant_type": "client_credentials", 55 | "scope": "api.console", 56 | "client_id": INSIGHTS_CLIENT_ID, 57 | "client_secret": INSIGHTS_CLIENT_SECRET 58 | } 59 | ) 60 | 61 | if response.status_code != 200: 62 
| raise Exception(f"Failed to get access token: {response.status_code} {response.text}") 63 | 64 | token_data = response.json() 65 | _access_token = token_data["access_token"] 66 | # Set expiration time with some buffer (subtract 60 seconds) 67 | expires_in = token_data.get("expires_in", 300) # Default to 5 minutes 68 | _token_expires_at = datetime.utcnow() + timedelta(seconds=expires_in - 60) 69 | 70 | return _access_token 71 | 72 | async def make_request(url: str, method: str = "GET", json: dict = None, params: dict = None) -> Any: 73 | """Helper function to make authenticated API requests to Red Hat Insights.""" 74 | token = await get_access_token() 75 | headers = { 76 | "Authorization": f"Bearer {token}", 77 | "Content-Type": "application/json" 78 | } 79 | 80 | async with httpx.AsyncClient() as client: 81 | response = await client.request(method, url, headers=headers, json=json, params=params) 82 | 83 | if response.status_code not in [200, 201, 204]: 84 | return f"Error {response.status_code}: {response.text}" 85 | 86 | return response.json() if "application/json" in response.headers.get("Content-Type", "") else response.text 87 | 88 | # Authentication Test 89 | @mcp.tool() 90 | async def test_authentication() -> Any: 91 | """Test authentication with Red Hat Insights using service account credentials.""" 92 | try: 93 | token = await get_access_token() 94 | # Test with a simple API call 95 | result = await make_request(f"{INSIGHTS_BASE_URL}/inventory/v1/hosts?limit=1") 96 | return {"status": "success", "message": "Authentication successful", "sample_data": result} 97 | except Exception as e: 98 | return {"status": "error", "message": f"Authentication failed: {str(e)}"} 99 | 100 | # Host Inventory Management Tools 101 | @mcp.tool() 102 | async def list_systems(limit: int = 50, offset: int = 0, display_name: str = None, staleness: str = None) -> Any: 103 | """List all hosts/systems registered with Red Hat Insights. 
Use staleness='fresh' or 'stale' to filter.""" 104 | params = {"limit": limit, "offset": offset} 105 | if display_name: 106 | params["display_name"] = display_name 107 | if staleness: 108 | params["staleness"] = staleness 109 | return await make_request(f"{INSIGHTS_BASE_URL}/inventory/v1/hosts", params=params) 110 | 111 | @mcp.tool() 112 | async def get_system(system_id: str) -> Any: 113 | """Get details of a specific system by UUID.""" 114 | return await make_request(f"{INSIGHTS_BASE_URL}/inventory/v1/hosts/{system_id}") 115 | 116 | @mcp.tool() 117 | async def get_system_profile(system_id: str, fields: list[str] = None) -> Any: 118 | """Get system profile/facts for a specific system. Specify fields to limit response.""" 119 | url = f"{INSIGHTS_BASE_URL}/inventory/v1/hosts/{system_id}/system_profile" 120 | params = {} 121 | if fields: 122 | for field in fields: 123 | params[f"fields[system_profile]"] = field 124 | return await make_request(url, params=params) 125 | 126 | @mcp.tool() 127 | async def get_system_tags(system_id: str) -> Any: 128 | """Get tags for a specific system.""" 129 | return await make_request(f"{INSIGHTS_BASE_URL}/inventory/v1/hosts/{system_id}/tags") 130 | 131 | @mcp.tool() 132 | async def delete_system(system_id: str) -> Any: 133 | """Remove a system from Red Hat Insights inventory.""" 134 | return await make_request(f"{INSIGHTS_BASE_URL}/inventory/v1/hosts/{system_id}", method="DELETE") 135 | 136 | # Vulnerability Management Tools 137 | @mcp.tool() 138 | async def list_vulnerabilities( 139 | limit: int = 50, 140 | offset: int = 0, 141 | affecting: bool = True, 142 | cvss_score_gte: float = None, 143 | cvss_score_lte: float = None 144 | ) -> Any: 145 | """List vulnerabilities affecting your systems. 
Set affecting=True to only show CVEs affecting systems.""" 146 | params = {"limit": limit, "offset": offset} 147 | if affecting: 148 | params["affecting"] = "true" 149 | if cvss_score_gte: 150 | params["cvss_score_gte"] = cvss_score_gte 151 | if cvss_score_lte: 152 | params["cvss_score_lte"] = cvss_score_lte 153 | return await make_request(f"{INSIGHTS_BASE_URL}/vulnerability/v1/vulnerabilities/cves", params=params) 154 | 155 | @mcp.tool() 156 | async def get_vulnerability_executive_report() -> Any: 157 | """Get executive vulnerability report with CVE summaries by severity.""" 158 | return await make_request(f"{INSIGHTS_BASE_URL}/vulnerability/v1/report/executive") 159 | 160 | # Patch Management Tools 161 | @mcp.tool() 162 | async def list_advisories( 163 | limit: int = 50, 164 | offset: int = 0, 165 | advisory_type: str = None, 166 | severity: str = None 167 | ) -> Any: 168 | """List available advisories (patches). Export format from patch/v3.""" 169 | params = {"limit": limit, "offset": offset} 170 | if advisory_type: 171 | params["advisory_type"] = advisory_type 172 | if severity: 173 | params["severity"] = severity 174 | return await make_request(f"{INSIGHTS_BASE_URL}/patch/v3/export/advisories", params=params) 175 | 176 | # Compliance Tools 177 | @mcp.tool() 178 | async def list_compliance_policies(limit: int = 50, offset: int = 0) -> Any: 179 | """List SCAP compliance policies.""" 180 | params = {"limit": limit, "offset": offset} 181 | return await make_request(f"{INSIGHTS_BASE_URL}/compliance/v2/policies", params=params) 182 | 183 | @mcp.tool() 184 | async def list_compliance_systems(assigned_or_scanned: bool = True) -> Any: 185 | """List systems associated with SCAP policies.""" 186 | params = {} 187 | if assigned_or_scanned: 188 | params["filter"] = "assigned_or_scanned=true" 189 | return await make_request(f"{INSIGHTS_BASE_URL}/compliance/v2/systems", params=params) 190 | 191 | @mcp.tool() 192 | async def associate_compliance_policy(policy_id: str, 
@mcp.tool()
async def export_rule_hits(has_playbook: bool = None, format: str = "json") -> Any:
    """Export all rule hits (recommendations) for systems.

    Args:
        has_playbook: Tri-state filter. ``True`` sends ``has_playbook=true``
            (hits with Ansible playbooks), ``False`` sends
            ``has_playbook=false``, and ``None`` (default) sends no filter.
        format: Accepted for interface compatibility; this function does not
            forward it to the API.

    Returns:
        Whatever ``make_request`` returns for the export/hits endpoint.
    """
    params = {}
    # Compare against None rather than truthiness so an explicit False is
    # honoured instead of being silently treated as "no filter".
    if has_playbook is not None:
        params["has_playbook"] = "true" if has_playbook else "false"
    return await make_request(f"{INSIGHTS_BASE_URL}/insights/v1/export/hits", params=params)
@mcp.tool()
async def list_rhel_subscriptions(product: str = "RHEL for x86", limit: int = 50, offset: int = 0) -> Any:
    """List systems with RHEL subscriptions. Product examples: 'RHEL for x86', 'RHEL for x86_64'

    Args:
        product: Product name; percent-encoded and used as the final path
            segment of the request URL.
        limit: Page size forwarded as the ``limit`` query parameter.
        offset: Page offset forwarded as the ``offset`` query parameter.

    Returns:
        Whatever ``make_request`` returns for the instances/products endpoint.
    """
    from urllib.parse import quote

    # quote() leaves "/" unescaped by default; safe="" escapes it too so a
    # caller-supplied product can never add extra path segments to the URL.
    encoded_product = quote(product, safe="")
    params = {"limit": limit, "offset": offset}
    return await make_request(
        f"{INSIGHTS_BASE_URL}/rhsm-subscriptions/v1/instances/products/{encoded_product}",
        params=params,
    )
@mcp.tool()
async def get_export_status(export_id: str) -> Any:
    """Look up the status of an export request.

    Args:
        export_id: Identifier of the export, interpolated into the
            ``/export/v1/exports/{export_id}/status`` path.

    Returns:
        Whatever ``make_request`` returns for the status endpoint.
    """
    status_url = f"{INSIGHTS_BASE_URL}/export/v1/exports/{export_id}/status"
    export_status = await make_request(status_url)
    return export_status
@mcp.tool()
async def create_repository(name: str, url: str, distribution_arch: str = "x86_64", distribution_versions: list[str] = None) -> Any:
    """Create a new custom content repository.

    Args:
        name: Repository name sent in the request body.
        url: Repository URL sent in the request body.
        distribution_arch: Architecture sent as ``distribution_arch``.
        distribution_versions: Versions sent as ``distribution_versions``;
            a missing or empty value falls back to ``["9"]``.

    Returns:
        Whatever ``make_request`` returns for the POST to the repositories endpoint.
    """
    # Any falsy value (None or an empty list) selects the ["9"] fallback.
    versions = distribution_versions if distribution_versions else ["9"]
    repository = dict(
        name=name,
        url=url,
        distribution_arch=distribution_arch,
        distribution_versions=versions,
        metadata_verification=False,
        module_hotfixes=False,
        snapshot=False,
    )
    endpoint = f"{INSIGHTS_BASE_URL}/content-sources/v1.0/repositories"
    return await make_request(endpoint, method="POST", json=repository)
@mcp.tool()
async def create_content_template(name: str, arch: str, version: str, repository_uuids: list[str], description: str = "") -> Any:
    """Create a new content template.

    Args:
        name: Template name sent in the request body.
        arch: Architecture sent as ``arch``.
        version: Version sent as ``version``.
        repository_uuids: Repository identifiers sent as ``repository_uuids``.
        description: Optional description; defaults to an empty string.

    Returns:
        Whatever ``make_request`` returns for the POST to the templates endpoint.
    """
    template = dict(
        name=name,
        arch=arch,
        version=version,
        description=description,
        repository_uuids=repository_uuids,
        use_latest=True,  # always sent as True by this tool
    )
    endpoint = f"{INSIGHTS_BASE_URL}/content-sources/v1.0/templates"
    return await make_request(endpoint, method="POST", json=template)