#
tokens: 21378/50000 8/8 files
lines: on (toggle) GitHub
raw markdown copy reset
# Directory Structure

```
├── ansible.py
├── app.py
├── claude_desktop_config.json
├── eda.py
├── llama_quick_start_guide.md
├── mcp_server
│   ├── ansible.py
│   ├── Containerfile
│   └── deployment.yaml
├── README.md
└── redhat_insights_mcp.py
```

# Files

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
  1 | # Setup of the Environment for the AI Powered Ansible & OpenShift Automation with Model Context Protocols (MCP) Servers
  2 | 
  3 | ## Overview
  4 | 
  5 | This guide walks you through setting up the MCP Servers + Claude Desktop portion of the demo, which focuses on using Claude Desktop to interact with your Ansible Automation Platform and OpenShift cluster environments.
  6 | 
  7 | ## Prerequisites
  8 | 
  9 | Ensure you have the following installed. 
 10 | 
 11 | ### Required
 12 | - An Ansible Automation Platform (AAP) environment
 13 | - An OpenShift Cluster with OpenShift Virtualization
 14 | - [Claude Desktop](https://claude.ai/download) installed on your laptop (Pro Plan required for best results)
 15 | - Python 3.10 or higher installed on your laptop
 16 | - Ensure you are authenticated with your OpenShift cluster (e.g. exporting kubeconfig)
 17 | 
 18 | ## Step One: Set up your laptop environment
 19 | 
 20 | Install `uv` and set up your Python project and environment.
 21 | 
 22 | ```
 23 | curl -LsSf https://astral.sh/uv/install.sh | sh
 24 | ```
 25 | 
 26 | Install [jbang](https://www.jbang.dev/download/), which is used to run the Kubernetes MCP Server. jbang must be installed globally; the Homebrew install is recommended. If you install it locally (the curl pattern), Claude won't be able to access it.
 27 | 
 28 | Restart your terminal to ensure that the `uv` and `jbang` commands are now available.
 29 | 
 30 | ## Step Two: Create and Set Up your Project
 31 | 
 32 | ```
 33 | # Create a new directory for our project
 34 | uv init ansible
 35 | cd ansible
 36 | 
 37 | # Create virtual environment and activate it
 38 | uv venv
 39 | source .venv/bin/activate
 40 | 
 41 | # Install dependencies
 42 | uv add "mcp[cli]" httpx
 43 | 
 44 | # Create our server file
 45 | touch ansible.py
 46 | ```
 47 | 
 48 | ## Step 3: Building your Ansible Automation Controller MCP Server
 49 | 
 50 | This is the MCP Server I used to interact with my automation controller. Feel free to copy/paste this into your `ansible.py` file.
 51 | 
 52 | Note: to connect to an AAP instance that uses a self-signed SSL certificate, edit the async client to be `httpx.AsyncClient(verify=False)`.
 53 | 
 54 | ```
 55 | import os
 56 | import httpx
 57 | from mcp.server.fastmcp import FastMCP
 58 | from typing import Any
 59 | 
 60 | # Environment variables for authentication
 61 | AAP_URL = os.getenv("AAP_URL")
 62 | AAP_TOKEN = os.getenv("AAP_TOKEN")
 63 | 
 64 | if not AAP_TOKEN:
 65 |     raise ValueError("AAP_TOKEN is required")
 66 | 
 67 | # Headers for API authentication
 68 | HEADERS = {
 69 |     "Authorization": f"Bearer {AAP_TOKEN}",
 70 |     "Content-Type": "application/json"
 71 | }
 72 | 
 73 | # Initialize FastMCP
 74 | mcp = FastMCP("ansible")
 75 | 
 76 | async def make_request(url: str, method: str = "GET", json: dict = None) -> Any:
 77 |     """Helper function to make authenticated API requests to AAP."""
 78 |     async with httpx.AsyncClient() as client:
 79 |         response = await client.request(method, url, headers=HEADERS, json=json)
 80 |     if response.status_code not in [200, 201]:
 81 |         return f"Error {response.status_code}: {response.text}"
 82 |     return response.json() if "application/json" in response.headers.get("Content-Type", "") else response.text
 83 | 
 84 | @mcp.tool()
 85 | async def list_inventories() -> Any:
 86 |     """List all inventories in Ansible Automation Platform."""
 87 |     return await make_request(f"{AAP_URL}/inventories/")
 88 | 
 89 | @mcp.tool()
 90 | async def get_inventory(inventory_id: str) -> Any:
 91 |     """Get details of a specific inventory by ID."""
 92 |     return await make_request(f"{AAP_URL}/inventories/{inventory_id}/")
 93 | 
 94 | @mcp.tool()
 95 | async def run_job(template_id: int, extra_vars: dict = {}) -> Any:
 96 |     """Run a job template by ID, optionally with extra_vars."""
 97 |     return await make_request(f"{AAP_URL}/job_templates/{template_id}/launch/", method="POST", json={"extra_vars": extra_vars})
 98 | 
 99 | @mcp.tool()
100 | async def job_status(job_id: int) -> Any:
101 |     """Check the status of a job by ID."""
102 |     return await make_request(f"{AAP_URL}/jobs/{job_id}/")
103 | 
104 | @mcp.tool()
105 | async def job_logs(job_id: int) -> str:
106 |     """Retrieve logs for a job."""
107 |     return await make_request(f"{AAP_URL}/jobs/{job_id}/stdout/")
108 | 
109 | @mcp.tool()
110 | async def create_project(
111 |     name: str,
112 |     organization_id: int,
113 |     source_control_url: str,
114 |     source_control_type: str = "git",
115 |     description: str = "",
116 |     execution_environment_id: int = None,
117 |     content_signature_validation_credential_id: int = None,
118 |     source_control_branch: str = "",
119 |     source_control_refspec: str = "",
120 |     source_control_credential_id: int = None,
121 |     clean: bool = False,
122 |     update_revision_on_launch: bool = False,
123 |     delete: bool = False,
124 |     allow_branch_override: bool = False,
125 |     track_submodules: bool = False,
126 | ) -> Any:
127 |     """Create a new project in Ansible Automation Platform."""
128 | 
129 |     payload = {
130 |         "name": name,
131 |         "description": description,
132 |         "organization": organization_id,
133 |         "scm_type": source_control_type.lower(),  # Git is default
134 |         "scm_url": source_control_url,
135 |         "scm_branch": source_control_branch,
136 |         "scm_refspec": source_control_refspec,
137 |         "scm_clean": clean,
138 |         "scm_delete_on_update": delete,
139 |         "scm_update_on_launch": update_revision_on_launch,
140 |         "allow_override": allow_branch_override,
141 |         "scm_track_submodules": track_submodules,
142 |     }
143 | 
144 |     if execution_environment_id:
145 |         payload["execution_environment"] = execution_environment_id
146 |     if content_signature_validation_credential_id:
147 |         payload["signature_validation_credential"] = content_signature_validation_credential_id
148 |     if source_control_credential_id:
149 |         payload["credential"] = source_control_credential_id
150 | 
151 |     return await make_request(f"{AAP_URL}/projects/", method="POST", json=payload)
152 | 
153 | @mcp.tool()
154 | async def create_job_template(
155 |     name: str,
156 |     project_id: int,
157 |     playbook: str,
158 |     inventory_id: int,
159 |     job_type: str = "run",
160 |     description: str = "",
161 |     credential_id: int = None,
162 |     execution_environment_id: int = None,
163 |     labels: list[str] = None,
164 |     forks: int = 0,
165 |     limit: str = "",
166 |     verbosity: int = 0,
167 |     timeout: int = 0,
168 |     job_tags: list[str] = None,
169 |     skip_tags: list[str] = None,
170 |     extra_vars: dict = None,
171 |     privilege_escalation: bool = False,
172 |     concurrent_jobs: bool = False,
173 |     provisioning_callback: bool = False,
174 |     enable_webhook: bool = False,
175 |     prevent_instance_group_fallback: bool = False,
176 | ) -> Any:
177 |     """Create a new job template in Ansible Automation Platform."""
178 | 
179 |     payload = {
180 |         "name": name,
181 |         "description": description,
182 |         "job_type": job_type,
183 |         "project": project_id,
184 |         "playbook": playbook,
185 |         "inventory": inventory_id,
186 |         "forks": forks,
187 |         "limit": limit,
188 |         "verbosity": verbosity,
189 |         "timeout": timeout,
190 |         "ask_variables_on_launch": bool(extra_vars),
191 |         "ask_tags_on_launch": bool(job_tags),
192 |         "ask_skip_tags_on_launch": bool(skip_tags),
193 |         "ask_credential_on_launch": credential_id is None,
194 |         "ask_execution_environment_on_launch": execution_environment_id is None,
195 |         "ask_labels_on_launch": labels is None,
196 |         "ask_inventory_on_launch": False,  # Inventory is required, so not prompting
197 |         "ask_job_type_on_launch": False,  # Job type is required, so not prompting
198 |         "become_enabled": privilege_escalation,
199 |         "allow_simultaneous": concurrent_jobs,
200 |         "scm_branch": "",
201 |         "webhook_service": "github" if enable_webhook else "",
202 |         "prevent_instance_group_fallback": prevent_instance_group_fallback,
203 |     }
204 | 
205 |     if credential_id:
206 |         payload["credential"] = credential_id
207 |     if execution_environment_id:
208 |         payload["execution_environment"] = execution_environment_id
209 |     if labels:
210 |         payload["labels"] = labels
211 |     if job_tags:
212 |         payload["job_tags"] = job_tags
213 |     if skip_tags:
214 |         payload["skip_tags"] = skip_tags
215 |     if extra_vars:
216 |         payload["extra_vars"] = extra_vars
217 | 
218 |     return await make_request(f"{AAP_URL}/job_templates/", method="POST", json=payload)
219 | 
220 | @mcp.tool()
221 | async def list_inventory_sources() -> Any:
222 |     """List all inventory sources in Ansible Automation Platform."""
223 |     return await make_request(f"{AAP_URL}/inventory_sources/")
224 | 
225 | @mcp.tool()
226 | async def get_inventory_source(inventory_source_id: int) -> Any:
227 |     """Get details of a specific inventory source."""
228 |     return await make_request(f"{AAP_URL}/inventory_sources/{inventory_source_id}/")
229 | 
230 | @mcp.tool()
231 | async def create_inventory_source(
232 |     name: str,
233 |     inventory_id: int,
234 |     source: str,
235 |     credential_id: int,
236 |     source_vars: dict = None,
237 |     update_on_launch: bool = True,
238 |     timeout: int = 0,
239 | ) -> Any:
240 |     """Create a dynamic inventory source. Claude will ask for the source type and credential before proceeding."""
241 |     valid_sources = [
242 |         "file", "constructed", "scm", "ec2", "gce", "azure_rm", "vmware", "satellite6", "openstack", 
243 |         "rhv", "controller", "insights", "terraform", "openshift_virtualization"
244 |     ]
245 |     
246 |     if source not in valid_sources:
247 |         return f"Error: Invalid source type '{source}'. Please select from: {', '.join(valid_sources)}"
248 |     
249 |     if not credential_id:
250 |         return "Error: Credential is required to create an inventory source."
251 |     
252 |     payload = {
253 |         "name": name,
254 |         "inventory": inventory_id,
255 |         "source": source,
256 |         "credential": credential_id,
257 |         "source_vars": source_vars,
258 |         "update_on_launch": update_on_launch,
259 |         "timeout": timeout,
260 |     }
261 |     return await make_request(f"{AAP_URL}/inventory_sources/", method="POST", json=payload)
262 | 
263 | @mcp.tool()
264 | async def update_inventory_source(inventory_source_id: int, update_data: dict) -> Any:
265 |     """Update an existing inventory source."""
266 |     return await make_request(f"{AAP_URL}/inventory_sources/{inventory_source_id}/", method="PATCH", json=update_data)
267 | 
268 | @mcp.tool()
269 | async def delete_inventory_source(inventory_source_id: int) -> Any:
270 |     """Delete an inventory source."""
271 |     return await make_request(f"{AAP_URL}/inventory_sources/{inventory_source_id}/", method="DELETE")
272 | 
273 | @mcp.tool()
274 | async def sync_inventory_source(inventory_source_id: int) -> Any:
275 |     """Manually trigger a sync for an inventory source."""
276 |     return await make_request(f"{AAP_URL}/inventory_sources/{inventory_source_id}/update/", method="POST")
277 | 
278 | @mcp.tool()
279 | async def create_inventory(
280 |     name: str,
281 |     organization_id: int,
282 |     description: str = "",
283 |     kind: str = "",
284 |     host_filter: str = "",
285 |     variables: dict = None,
286 |     prevent_instance_group_fallback: bool = False,
287 | ) -> Any:
288 |     """Create an inventory in Ansible Automation Platform."""
289 |     payload = {
290 |         "name": name,
291 |         "organization": organization_id,
292 |         "description": description,
293 |         "kind": kind,
294 |         "host_filter": host_filter,
295 |         "variables": variables,
296 |         "prevent_instance_group_fallback": prevent_instance_group_fallback,
297 |     }
298 |     return await make_request(f"{AAP_URL}/inventories/", method="POST", json=payload)
299 | 
300 | @mcp.tool()
301 | async def delete_inventory(inventory_id: int) -> Any:
302 |     """Delete an inventory from Ansible Automation Platform."""
303 |     return await make_request(f"{AAP_URL}/inventories/{inventory_id}/", method="DELETE")
304 | 
305 | @mcp.tool()
306 | async def list_job_templates() -> Any:
307 |     """List all job templates available in Ansible Automation Platform."""
308 |     return await make_request(f"{AAP_URL}/job_templates/")
309 | 
310 | @mcp.tool()
311 | async def get_job_template(template_id: int) -> Any:
312 |     """Retrieve details of a specific job template."""
313 |     return await make_request(f"{AAP_URL}/job_templates/{template_id}/")
314 | 
315 | @mcp.tool()
316 | async def list_jobs() -> Any:
317 |     """List all jobs available in Ansible Automation Platform."""
318 |     return await make_request(f"{AAP_URL}/jobs/")
319 | 
320 | @mcp.tool()
321 | async def list_recent_jobs(hours: int = 24) -> Any:
322 |     """List all jobs executed in the last specified hours (default 24 hours)."""
323 |     from datetime import datetime, timedelta
324 |     
325 |     time_filter = (datetime.utcnow() - timedelta(hours=hours)).isoformat() + "Z"
326 |     return await make_request(f"{AAP_URL}/jobs/?created__gte={time_filter}")
327 | 
328 | if __name__ == "__main__":
329 |     mcp.run(transport="stdio")
330 | 
331 | ```
332 | 
333 | ## Step 4: Configuring Claude Desktop to use your MCP Servers
334 | 
335 | In my particular case, I want to take advantage of two MCP Servers: the Ansible MCP Server above and the Kubernetes MCP Server that I found within the [quarkus-mcp-servers](https://github.com/quarkiverse/quarkus-mcp-servers/tree/main/kubernetes) Git repo.
336 | 
337 | Open the `claude_desktop_config.json` file, which on macOS is located at:
338 | 
339 | ```
340 | ~/Library/Application\ Support/Claude/claude_desktop_config.json
341 | ```
342 | 
343 | ```
344 | {
345 |   "mcpServers": {
346 |     "ansible": {
347 |         "command": "/absolute/path/to/uv",
348 |         "args": [
349 |             "--directory",
350 |             "/absolute/path/to/ansible_mcp",
351 |             "run",
352 |             "ansible.py"
353 |         ],
354 |         "env": {
355 |             "AAP_TOKEN": "<aap-token>",
356 |             "AAP_URL": "https://<aap-url>/api/controller/v2"
357 |         }
358 |     },
359 |     "kubernetes": {
360 |       "command": "jbang",
361 |       "args": [
362 |         "--quiet",
363 |         "https://github.com/quarkiverse/quarkus-mcp-servers/blob/main/kubernetes/src/main/java/io/quarkiverse/mcp/servers/kubernetes/MCPServerKubernetes.java"
364 |       ]
365 |     }
366 |   }
367 | }
368 | ```
369 | Save the file.
370 | 
371 | WARNING: Absolute path to your `uv` binary is required. Do a `which uv` on your system to get the full path. 
372 | 
373 | NOTE: If you need to create the AAP_TOKEN, go to the AAP Dashboard, select Access Management -> Users -> `<your_user>` -> Tokens -> Create token, then select 'Write' in the Scope dropdown and click Create token.
374 | 
375 | 
376 | 
377 | ## Step 5: Re-Launch Claude Desktop 
378 | 
379 | If Claude Desktop was already open, relaunch it; otherwise, launch it and make sure it picks up the MCP servers. You can verify this by checking that the hammer icon is displayed.
380 | 
381 | ![Screenshot 2025-02-26 at 3 46 30 PM](https://github.com/user-attachments/assets/064e2edb-dfaa-4250-8a82-d8e59c21644f)
382 | 
383 | NOTE: The number next to the hammer will vary based upon the number of MCP tools available.
384 | 
385 | Once you click on the hammer icon, you can see a list of tools. Below is an example.
386 | 
387 | ![Screenshot 2025-02-26 at 3 50 23 PM](https://github.com/user-attachments/assets/78ae7be0-e1a6-4fbb-8520-4d57a6563bbe)
388 | 
389 | ## Step 6: Test your Environment
390 | 
391 | Now with everything set up, see if you can interact with your Ansible Automation Platform and OpenShift cluster.
392 | 
393 | Feel free to ask it questions such as:
394 | 
395 | * How many Job Templates are available?
396 | * How many VMs are on my OpenShift cluster?
397 | 
398 | NOTE: It is very likely you will need to take advantage of the Claude Desktop Pro Plan in order to get the full functionality. 
399 | 
400 | ## References
401 | 
402 | [Claude Desktop Quickstart for Server Developers](https://modelcontextprotocol.io/quickstart/server)
403 | 
404 | 
405 | ## BONUS: Adding Event Driven Ansible MCP Server
406 | 
407 | If you have set up Event Driven Ansible, you can take advantage of the Event Driven Ansible MCP Server below. The instructions are similar to the above.
408 | 
409 | * Create an `eda.py` and store it in your `/absolute/path/to/ansible_mcp`
410 | * Update your `claude_desktop_config.json`
411 | * Restart your Claude Desktop and verify the hammer has picked up your new MCP tools
412 | 
413 | The two files are listed below for easy copy/paste.
414 | 
415 | ### claude_desktop_config.json
416 | ```
417 | {
418 |   "mcpServers": {
419 |     "ansible": {
420 |         "command": "/absolute/path/to/uv",
421 |         "args": [
422 |             "--directory",
423 |             "/absolute/path/to/ansible_mcp",
424 |             "run",
425 |             "ansible.py"
426 |         ],
427 |         "env": {
428 |             "AAP_TOKEN": "<aap-token>",
429 |             "AAP_URL": "https://<aap-url>/api/controller/v2"
430 |         }
431 |     },
432 |     "kubernetes": {
433 |       "command": "jbang",
434 |       "args": [
435 |         "--quiet",
436 |         "https://github.com/quarkiverse/quarkus-mcp-servers/blob/main/kubernetes/src/main/java/io/quarkiverse/mcp/servers/kubernetes/MCPServerKubernetes.java"
437 |       ]
438 |     },
439 |     "eda": {
440 |         "command": "/absolute/path/to/uv",
441 |         "args": [
442 |             "--directory",
443 |             "/absolute/path/to/ansible_mcp",
444 |             "run",
445 |             "eda.py"
446 |         ],
447 |         "env": {
448 |             "EDA_TOKEN": "<EDA_TOKEN>",
449 |             "EDA_URL": "https://<aap-url>/api/eda/v1"
450 |         }
451 |     }
452 |   }
453 | }
454 | ```
455 | 
456 | WARNING: Absolute path to your `uv` binary is required. Do a `which uv` on your system to get the full path. 
457 | 
458 | NOTE: An EDA Token can be generated from the AAP Dashboard. 
459 | 
460 | ### eda.py MCP Server 
461 | 
462 | ```
463 | import os
464 | import httpx
465 | from mcp.server.fastmcp import FastMCP
466 | from typing import Any, Dict
467 | 
468 | # Environment variables for authentication
469 | EDA_URL = os.getenv("EDA_URL")
470 | EDA_TOKEN = os.getenv("EDA_TOKEN")
471 | 
472 | if not EDA_TOKEN:
473 |     raise ValueError("EDA_TOKEN is required")
474 | 
475 | # Headers for API authentication
476 | HEADERS = {
477 |     "Authorization": f"Bearer {EDA_TOKEN}",
478 |     "Content-Type": "application/json"
479 | }
480 | 
481 | # Initialize FastMCP
482 | mcp = FastMCP("eda")
483 | 
484 | async def make_request(url: str, method: str = "GET", json: Dict = None) -> Any:
485 |     """Helper function to make authenticated API requests to EDA."""
486 |     async with httpx.AsyncClient() as client:
487 |         response = await client.request(method, url, headers=HEADERS, json=json)
488 |     if response.status_code not in [200, 201, 204]:
489 |         return f"Error {response.status_code}: {response.text}"
490 |     return response.json() if "application/json" in response.headers.get("Content-Type", "") else response.text
491 | 
492 | @mcp.tool()
493 | async def list_activations() -> Any:
494 |     """List all activations in Event-Driven Ansible."""
495 |     return await make_request(f"{EDA_URL}/activations/")
496 | 
497 | @mcp.tool()
498 | async def get_activation(activation_id: int) -> Any:
499 |     """Get details of a specific activation."""
500 |     return await make_request(f"{EDA_URL}/activations/{activation_id}/")
501 | 
502 | @mcp.tool()
503 | async def create_activation(payload: Dict) -> Any:
504 |     """Create a new activation."""
505 |     return await make_request(f"{EDA_URL}/activations/", method="POST", json=payload)
506 | 
507 | @mcp.tool()
508 | async def disable_activation(activation_id: int) -> Any:
509 |     """Disable an activation."""
510 |     return await make_request(f"{EDA_URL}/activations/{activation_id}/disable/", method="POST")
511 | 
512 | @mcp.tool()
513 | async def enable_activation(activation_id: int) -> Any:
514 |     """Enable an activation."""
515 |     return await make_request(f"{EDA_URL}/activations/{activation_id}/enable/", method="POST")
516 | 
517 | @mcp.tool()
518 | async def restart_activation(activation_id: int) -> Any:
519 |     """Restart an activation."""
520 |     return await make_request(f"{EDA_URL}/activations/{activation_id}/restart/", method="POST")
521 | 
522 | @mcp.tool()
523 | async def delete_activation(activation_id: int) -> Any:
524 |     """Delete an activation."""
525 |     return await make_request(f"{EDA_URL}/activations/{activation_id}/", method="DELETE")
526 | 
527 | @mcp.tool()
528 | async def list_decision_environments() -> Any:
529 |     """List all decision environments."""
530 |     return await make_request(f"{EDA_URL}/decision-environments/")
531 | 
532 | @mcp.tool()
533 | async def create_decision_environment(payload: Dict) -> Any:
534 |     """Create a new decision environment."""
535 |     return await make_request(f"{EDA_URL}/decision-environments/", method="POST", json=payload)
536 | 
537 | @mcp.tool()
538 | async def list_rulebooks() -> Any:
539 |     """List all rulebooks in EDA."""
540 |     return await make_request(f"{EDA_URL}/rulebooks/")
541 | 
542 | @mcp.tool()
543 | async def get_rulebook(rulebook_id: int) -> Any:
544 |     """Retrieve details of a specific rulebook."""
545 |     return await make_request(f"{EDA_URL}/rulebooks/{rulebook_id}/")
546 | 
547 | @mcp.tool()
548 | async def list_event_streams() -> Any:
549 |     """List all event streams."""
550 |     return await make_request(f"{EDA_URL}/event-streams/")
551 | 
552 | if __name__ == "__main__":
553 |     mcp.run(transport="stdio")
554 | ```
555 | 
556 | 
```

--------------------------------------------------------------------------------
/claude_desktop_config.json:
--------------------------------------------------------------------------------

```json
 1 | {
 2 |   "mcpServers": {
 3 |     "ansible": {
 4 |         "command": "/absolute/path/to/uv",
 5 |         "args": [
 6 |             "--directory",
 7 |             "/absolute/path/to/ansible_mcp",
 8 |             "run",
 9 |             "ansible.py"
10 |         ],
11 |         "env": {
12 |             "AAP_TOKEN": "<aap-token>",
13 |             "AAP_URL": "https://<my-automation-controller>/api/controller/v2"
14 |         }
15 |     },
16 |     "kubernetes": {
17 |       "command": "jbang",
18 |       "args": [
19 |         "--quiet",
20 |         "https://github.com/quarkiverse/quarkus-mcp-servers/blob/main/kubernetes/src/main/java/io/quarkiverse/mcp/servers/kubernetes/MCPServerKubernetes.java"
21 |       ]
22 |     }
23 |   }
24 | }
25 | 
```

--------------------------------------------------------------------------------
/mcp_server/deployment.yaml:
--------------------------------------------------------------------------------

```yaml
 1 | kind: Deployment
 2 | apiVersion: apps/v1
 3 | metadata:
 4 |   name: ansible-mcp-server
 5 | spec:
 6 |   selector:
 7 |     matchLabels:
 8 |       app: ansible-mcp-server
 9 |   replicas: 1
10 |   template:
11 |     metadata:
12 |       labels:
13 |         app: ansible-mcp-server
14 |     spec:
15 |       containers:
16 |       - name: ansible-mcp-server
17 |         image: quay.io/rcook/ansible-mcp:amd
18 |         ports:
19 |           - containerPort: 8000
20 |             protocol: TCP
21 |           - containerPort: 8080
22 |             protocol: TCP
23 |         env:
24 |           - name: AAP_TOKEN
25 |             valueFrom:
26 |               secretKeyRef:
27 |                 name: aap
28 |                 key: token
29 |           - name: AAP_URL
30 |             valueFrom:
31 |               secretKeyRef:
32 |                 name: aap
33 |                 key: url
34 |         resources: {}
35 | 
```

--------------------------------------------------------------------------------
/eda.py:
--------------------------------------------------------------------------------

```python
 1 | import os
 2 | import httpx
 3 | from mcp.server.fastmcp import FastMCP
 4 | from typing import Any, Dict
 5 | 
 6 | # Environment variables for authentication
 7 | EDA_URL = os.getenv("EDA_URL")
 8 | EDA_TOKEN = os.getenv("EDA_TOKEN")
 9 | 
10 | if not EDA_TOKEN:
11 |     raise ValueError("EDA_TOKEN environment variable is required")
12 | 
13 | # Headers for API authentication
14 | HEADERS = {
15 |     "Authorization": f"Bearer {EDA_TOKEN}",
16 |     "Content-Type": "application/json"
17 | }
18 | 
19 | # Initialize FastMCP
20 | mcp = FastMCP("eda")
21 | 
22 | async def make_request(url: str, method: str = "GET", json: Dict = None) -> Any:
23 |     """Helper function to make authenticated API requests to EDA."""
24 |     async with httpx.AsyncClient() as client:
25 |         response = await client.request(method, url, headers=HEADERS, json=json)
26 |     if response.status_code not in [200, 201, 204]:
27 |         return f"Error {response.status_code}: {response.text}"
28 |     return response.json() if "application/json" in response.headers.get("Content-Type", "") else response.text
29 | 
30 | @mcp.tool()
31 | async def list_activations() -> Any:
32 |     """List all activations in Event-Driven Ansible."""
33 |     return await make_request(f"{EDA_URL}/activations/")
34 | 
35 | @mcp.tool()
36 | async def get_activation(activation_id: int) -> Any:
37 |     """Get details of a specific activation."""
38 |     return await make_request(f"{EDA_URL}/activations/{activation_id}/")
39 | 
40 | @mcp.tool()
41 | async def create_activation(payload: Dict) -> Any:
42 |     """Create a new activation."""
43 |     return await make_request(f"{EDA_URL}/activations/", method="POST", json=payload)
44 | 
45 | @mcp.tool()
46 | async def disable_activation(activation_id: int) -> Any:
47 |     """Disable an activation."""
48 |     return await make_request(f"{EDA_URL}/activations/{activation_id}/disable/", method="POST")
49 | 
50 | @mcp.tool()
51 | async def enable_activation(activation_id: int) -> Any:
52 |     """Enable an activation."""
53 |     return await make_request(f"{EDA_URL}/activations/{activation_id}/enable/", method="POST")
54 | 
55 | @mcp.tool()
56 | async def restart_activation(activation_id: int) -> Any:
57 |     """Restart an activation."""
58 |     return await make_request(f"{EDA_URL}/activations/{activation_id}/restart/", method="POST")
59 | 
60 | @mcp.tool()
61 | async def delete_activation(activation_id: int) -> Any:
62 |     """Delete an activation."""
63 |     return await make_request(f"{EDA_URL}/activations/{activation_id}/", method="DELETE")
64 | 
65 | @mcp.tool()
66 | async def list_decision_environments() -> Any:
67 |     """List all decision environments."""
68 |     return await make_request(f"{EDA_URL}/decision-environments/")
69 | 
70 | @mcp.tool()
71 | async def create_decision_environment(payload: Dict) -> Any:
72 |     """Create a new decision environment."""
73 |     return await make_request(f"{EDA_URL}/decision-environments/", method="POST", json=payload)
74 | 
75 | @mcp.tool()
76 | async def list_rulebooks() -> Any:
77 |     """List all rulebooks in EDA."""
78 |     return await make_request(f"{EDA_URL}/rulebooks/")
79 | 
80 | @mcp.tool()
81 | async def get_rulebook(rulebook_id: int) -> Any:
82 |     """Retrieve details of a specific rulebook."""
83 |     return await make_request(f"{EDA_URL}/rulebooks/{rulebook_id}/")
84 | 
85 | @mcp.tool()
86 | async def list_event_streams() -> Any:
87 |     """List all event streams."""
88 |     return await make_request(f"{EDA_URL}/event-streams/")
89 | 
90 | if __name__ == "__main__":
91 |     mcp.run(transport="stdio")
92 | 
93 | 
```

--------------------------------------------------------------------------------
/ansible.py:
--------------------------------------------------------------------------------

```python
  1 | import os
  2 | import httpx
  3 | from mcp.server.fastmcp import FastMCP
  4 | from typing import Any
  5 | 
  6 | # Environment variables for authentication
  7 | AAP_URL = os.getenv("AAP_URL")
  8 | AAP_TOKEN = os.getenv("AAP_TOKEN")
  9 | 
 10 | if not AAP_TOKEN:
 11 |     raise ValueError("AAP_TOKEN is required")
 12 | 
 13 | # Headers for API authentication
 14 | HEADERS = {
 15 |     "Authorization": f"Bearer {AAP_TOKEN}",
 16 |     "Content-Type": "application/json"
 17 | }
 18 | 
 19 | # Initialize FastMCP
 20 | mcp = FastMCP("ansible")
 21 | 
 22 | async def make_request(url: str, method: str = "GET", json: dict = None) -> Any:
 23 |     """Helper function to make authenticated API requests to AAP."""
 24 |     async with httpx.AsyncClient() as client:
 25 |         response = await client.request(method, url, headers=HEADERS, json=json)
 26 |     if response.status_code not in [200, 201]:
 27 |         return f"Error {response.status_code}: {response.text}"
 28 |     return response.json() if "application/json" in response.headers.get("Content-Type", "") else response.text
 29 | 
 30 | @mcp.tool()
 31 | async def list_inventories() -> Any:
 32 |     """List all inventories in Ansible Automation Platform."""
 33 |     return await make_request(f"{AAP_URL}/inventories/")
 34 | 
 35 | @mcp.tool()
 36 | async def get_inventory(inventory_id: str) -> Any:
 37 |     """Get details of a specific inventory by ID."""
 38 |     return await make_request(f"{AAP_URL}/inventories/{inventory_id}/")
 39 | 
 40 | @mcp.tool()
 41 | async def run_job(template_id: int, extra_vars: dict = {}) -> Any:
 42 |     """Run a job template by ID, optionally with extra_vars."""
 43 |     return await make_request(f"{AAP_URL}/job_templates/{template_id}/launch/", method="POST", json={"extra_vars": extra_vars})
 44 | 
 45 | @mcp.tool()
 46 | async def job_status(job_id: int) -> Any:
 47 |     """Check the status of a job by ID."""
 48 |     return await make_request(f"{AAP_URL}/jobs/{job_id}/")
 49 | 
 50 | @mcp.tool()
 51 | async def job_logs(job_id: int) -> str:
 52 |     """Retrieve logs for a job."""
 53 |     return await make_request(f"{AAP_URL}/jobs/{job_id}/stdout/")
 54 | 
 55 | @mcp.tool()
 56 | async def create_project(
 57 |     name: str,
 58 |     organization_id: int,
 59 |     source_control_url: str,
 60 |     source_control_type: str = "git",
 61 |     description: str = "",
 62 |     execution_environment_id: int = None,
 63 |     content_signature_validation_credential_id: int = None,
 64 |     source_control_branch: str = "",
 65 |     source_control_refspec: str = "",
 66 |     source_control_credential_id: int = None,
 67 |     clean: bool = False,
 68 |     update_revision_on_launch: bool = False,
 69 |     delete: bool = False,
 70 |     allow_branch_override: bool = False,
 71 |     track_submodules: bool = False,
 72 | ) -> Any:
 73 |     """Create a new project in Ansible Automation Platform."""
 74 | 
 75 |     payload = {
 76 |         "name": name,
 77 |         "description": description,
 78 |         "organization": organization_id,
 79 |         "scm_type": source_control_type.lower(),  # Git is default
 80 |         "scm_url": source_control_url,
 81 |         "scm_branch": source_control_branch,
 82 |         "scm_refspec": source_control_refspec,
 83 |         "scm_clean": clean,
 84 |         "scm_delete_on_update": delete,
 85 |         "scm_update_on_launch": update_revision_on_launch,
 86 |         "allow_override": allow_branch_override,
 87 |         "scm_track_submodules": track_submodules,
 88 |     }
 89 | 
 90 |     if execution_environment_id:
 91 |         payload["execution_environment"] = execution_environment_id
 92 |     if content_signature_validation_credential_id:
 93 |         payload["signature_validation_credential"] = content_signature_validation_credential_id
 94 |     if source_control_credential_id:
 95 |         payload["credential"] = source_control_credential_id
 96 | 
 97 |     return await make_request(f"{AAP_URL}/projects/", method="POST", json=payload)
 98 | 
 99 | @mcp.tool()
100 | async def create_job_template(
101 |     name: str,
102 |     project_id: int,
103 |     playbook: str,
104 |     inventory_id: int,
105 |     job_type: str = "run",
106 |     description: str = "",
107 |     credential_id: int = None,
108 |     execution_environment_id: int = None,
109 |     labels: list[str] = None,
110 |     forks: int = 0,
111 |     limit: str = "",
112 |     verbosity: int = 0,
113 |     timeout: int = 0,
114 |     job_tags: list[str] = None,
115 |     skip_tags: list[str] = None,
116 |     extra_vars: dict = None,
117 |     privilege_escalation: bool = False,
118 |     concurrent_jobs: bool = False,
119 |     provisioning_callback: bool = False,
120 |     enable_webhook: bool = False,
121 |     prevent_instance_group_fallback: bool = False,
122 | ) -> Any:
123 |     """Create a new job template in Ansible Automation Platform."""
124 | 
125 |     payload = {
126 |         "name": name,
127 |         "description": description,
128 |         "job_type": job_type,
129 |         "project": project_id,
130 |         "playbook": playbook,
131 |         "inventory": inventory_id,
132 |         "forks": forks,
133 |         "limit": limit,
134 |         "verbosity": verbosity,
135 |         "timeout": timeout,
136 |         "ask_variables_on_launch": bool(extra_vars),
137 |         "ask_tags_on_launch": bool(job_tags),
138 |         "ask_skip_tags_on_launch": bool(skip_tags),
139 |         "ask_credential_on_launch": credential_id is None,
140 |         "ask_execution_environment_on_launch": execution_environment_id is None,
141 |         "ask_labels_on_launch": labels is None,
142 |         "ask_inventory_on_launch": False,  # Inventory is required, so not prompting
143 |         "ask_job_type_on_launch": False,  # Job type is required, so not prompting
144 |         "become_enabled": privilege_escalation,
145 |         "allow_simultaneous": concurrent_jobs,
146 |         "scm_branch": "",
147 |         "webhook_service": "github" if enable_webhook else "",
148 |         "prevent_instance_group_fallback": prevent_instance_group_fallback,
149 |     }
150 | 
151 |     if credential_id:
152 |         payload["credential"] = credential_id
153 |     if execution_environment_id:
154 |         payload["execution_environment"] = execution_environment_id
155 |     if labels:
156 |         payload["labels"] = labels
157 |     if job_tags:
158 |         payload["job_tags"] = job_tags
159 |     if skip_tags:
160 |         payload["skip_tags"] = skip_tags
161 |     if extra_vars:
162 |         payload["extra_vars"] = extra_vars
163 | 
164 |     return await make_request(f"{AAP_URL}/job_templates/", method="POST", json=payload)
165 | 
166 | @mcp.tool()
167 | async def list_inventory_sources() -> Any:
168 |     """List all inventory sources in Ansible Automation Platform."""
169 |     return await make_request(f"{AAP_URL}/inventory_sources/")
170 | 
171 | @mcp.tool()
172 | async def get_inventory_source(inventory_source_id: int) -> Any:
173 |     """Get details of a specific inventory source."""
174 |     return await make_request(f"{AAP_URL}/inventory_sources/{inventory_source_id}/")
175 | 
176 | @mcp.tool()
177 | async def create_inventory_source(
178 |     name: str,
179 |     inventory_id: int,
180 |     source: str,
181 |     credential_id: int,
182 |     source_vars: dict = None,
183 |     update_on_launch: bool = True,
184 |     timeout: int = 0,
185 | ) -> Any:
186 |     """Create a dynamic inventory source. Claude will ask for the source type and credential before proceeding."""
187 |     valid_sources = [
188 |         "file", "constructed", "scm", "ec2", "gce", "azure_rm", "vmware", "satellite6", "openstack", 
189 |         "rhv", "controller", "insights", "terraform", "openshift_virtualization"
190 |     ]
191 |     
192 |     if source not in valid_sources:
193 |         return f"Error: Invalid source type '{source}'. Please select from: {', '.join(valid_sources)}"
194 |     
195 |     if not credential_id:
196 |         return "Error: Credential is required to create an inventory source."
197 |     
198 |     payload = {
199 |         "name": name,
200 |         "inventory": inventory_id,
201 |         "source": source,
202 |         "credential": credential_id,
203 |         "source_vars": source_vars,
204 |         "update_on_launch": update_on_launch,
205 |         "timeout": timeout,
206 |     }
207 |     return await make_request(f"{AAP_URL}/inventory_sources/", method="POST", json=payload)
208 | 
209 | @mcp.tool()
210 | async def update_inventory_source(inventory_source_id: int, update_data: dict) -> Any:
211 |     """Update an existing inventory source."""
212 |     return await make_request(f"{AAP_URL}/inventory_sources/{inventory_source_id}/", method="PATCH", json=update_data)
213 | 
214 | @mcp.tool()
215 | async def delete_inventory_source(inventory_source_id: int) -> Any:
216 |     """Delete an inventory source."""
217 |     return await make_request(f"{AAP_URL}/inventory_sources/{inventory_source_id}/", method="DELETE")
218 | 
219 | @mcp.tool()
220 | async def sync_inventory_source(inventory_source_id: int) -> Any:
221 |     """Manually trigger a sync for an inventory source."""
222 |     return await make_request(f"{AAP_URL}/inventory_sources/{inventory_source_id}/update/", method="POST")
223 | 
224 | @mcp.tool()
225 | async def create_inventory(
226 |     name: str,
227 |     organization_id: int,
228 |     description: str = "",
229 |     kind: str = "",
230 |     host_filter: str = "",
231 |     variables: dict = None,
232 |     prevent_instance_group_fallback: bool = False,
233 | ) -> Any:
234 |     """Create an inventory in Ansible Automation Platform."""
235 |     payload = {
236 |         "name": name,
237 |         "organization": organization_id,
238 |         "description": description,
239 |         "kind": kind,
240 |         "host_filter": host_filter,
241 |         "variables": variables,
242 |         "prevent_instance_group_fallback": prevent_instance_group_fallback,
243 |     }
244 |     return await make_request(f"{AAP_URL}/inventories/", method="POST", json=payload)
245 | 
246 | @mcp.tool()
247 | async def delete_inventory(inventory_id: int) -> Any:
248 |     """Delete an inventory from Ansible Automation Platform."""
249 |     return await make_request(f"{AAP_URL}/inventories/{inventory_id}/", method="DELETE")
250 | 
251 | @mcp.tool()
252 | async def list_job_templates() -> Any:
253 |     """List all job templates available in Ansible Automation Platform."""
254 |     return await make_request(f"{AAP_URL}/job_templates/")
255 | 
256 | @mcp.tool()
257 | async def get_job_template(template_id: int) -> Any:
258 |     """Retrieve details of a specific job template."""
259 |     return await make_request(f"{AAP_URL}/job_templates/{template_id}/")
260 | 
261 | @mcp.tool()
262 | async def list_jobs() -> Any:
263 |     """List all jobs available in Ansible Automation Platform."""
264 |     return await make_request(f"{AAP_URL}/jobs/")
265 | 
266 | @mcp.tool()
267 | async def list_recent_jobs(hours: int = 24) -> Any:
268 |     """List all jobs executed in the last specified hours (default 24 hours)."""
269 |     from datetime import datetime, timedelta
270 |     
271 |     time_filter = (datetime.utcnow() - timedelta(hours=hours)).isoformat() + "Z"
272 |     return await make_request(f"{AAP_URL}/jobs/?created__gte={time_filter}")
273 | 
 274 | if __name__ == "__main__":  # start the server only when this file is executed as a script
 275 |     mcp.run(transport="stdio")  # run the FastMCP instance with the stdio transport
276 | 
```

--------------------------------------------------------------------------------
/mcp_server/ansible.py:
--------------------------------------------------------------------------------

```python
  1 | import os
  2 | import httpx
  3 | from mcp.server.fastmcp import FastMCP
  4 | from typing import Any
  5 | 
  6 | # Environment variables for authentication
  7 | AAP_URL = os.getenv("AAP_URL")
  8 | AAP_TOKEN = os.getenv("AAP_TOKEN")
  9 | 
 10 | if not AAP_TOKEN:
 11 |     raise ValueError("AAP_TOKEN is required")
 12 | 
 13 | # Headers for API authentication
 14 | HEADERS = {
 15 |     "Authorization": f"Bearer {AAP_TOKEN}",
 16 |     "Content-Type": "application/json"
 17 | }
 18 | 
 19 | # Initialize FastMCP
 20 | mcp = FastMCP("ansible")
 21 | 
 22 | async def make_request(url: str, method: str = "GET", json: dict = None) -> Any:
 23 |     """Helper function to make authenticated API requests to AAP."""
 24 |     async with httpx.AsyncClient() as client:
 25 |         response = await client.request(method, url, headers=HEADERS, json=json)
 26 |     if response.status_code not in [200, 201]:
 27 |         return f"Error {response.status_code}: {response.text}"
 28 |     return response.json() if "application/json" in response.headers.get("Content-Type", "") else response.text
 29 | 
 30 | @mcp.tool()
 31 | async def list_inventories() -> Any:
 32 |     """List all inventories in Ansible Automation Platform."""
 33 |     return await make_request(f"{AAP_URL}/inventories/")
 34 | 
 35 | @mcp.tool()
 36 | async def get_inventory(inventory_id: str) -> Any:
 37 |     """Get details of a specific inventory by ID."""
 38 |     return await make_request(f"{AAP_URL}/inventories/{inventory_id}/")
 39 | 
 40 | @mcp.tool()
 41 | async def run_job(template_id: int, extra_vars: dict = {}) -> Any:
 42 |     """Run a job template by ID, optionally with extra_vars."""
 43 |     return await make_request(f"{AAP_URL}/job_templates/{template_id}/launch/", method="POST", json={"extra_vars": extra_vars})
 44 | 
 45 | @mcp.tool()
 46 | async def job_status(job_id: int) -> Any:
 47 |     """Check the status of a job by ID."""
 48 |     return await make_request(f"{AAP_URL}/jobs/{job_id}/")
 49 | 
 50 | @mcp.tool()
 51 | async def job_logs(job_id: int) -> str:
 52 |     """Retrieve logs for a job."""
 53 |     return await make_request(f"{AAP_URL}/jobs/{job_id}/stdout/")
 54 | 
 55 | @mcp.tool()
 56 | async def create_project(
 57 |     name: str,
 58 |     organization_id: int,
 59 |     source_control_url: str,
 60 |     source_control_type: str = "git",
 61 |     description: str = "",
 62 |     execution_environment_id: int = None,
 63 |     content_signature_validation_credential_id: int = None,
 64 |     source_control_branch: str = "",
 65 |     source_control_refspec: str = "",
 66 |     source_control_credential_id: int = None,
 67 |     clean: bool = False,
 68 |     update_revision_on_launch: bool = False,
 69 |     delete: bool = False,
 70 |     allow_branch_override: bool = False,
 71 |     track_submodules: bool = False,
 72 | ) -> Any:
 73 |     """Create a new project in Ansible Automation Platform."""
 74 | 
 75 |     payload = {
 76 |         "name": name,
 77 |         "description": description,
 78 |         "organization": organization_id,
 79 |         "scm_type": source_control_type.lower(),  # Git is default
 80 |         "scm_url": source_control_url,
 81 |         "scm_branch": source_control_branch,
 82 |         "scm_refspec": source_control_refspec,
 83 |         "scm_clean": clean,
 84 |         "scm_delete_on_update": delete,
 85 |         "scm_update_on_launch": update_revision_on_launch,
 86 |         "allow_override": allow_branch_override,
 87 |         "scm_track_submodules": track_submodules,
 88 |     }
 89 | 
 90 |     if execution_environment_id:
 91 |         payload["execution_environment"] = execution_environment_id
 92 |     if content_signature_validation_credential_id:
 93 |         payload["signature_validation_credential"] = content_signature_validation_credential_id
 94 |     if source_control_credential_id:
 95 |         payload["credential"] = source_control_credential_id
 96 | 
 97 |     return await make_request(f"{AAP_URL}/projects/", method="POST", json=payload)
 98 | 
 99 | @mcp.tool()
100 | async def create_job_template(
101 |     name: str,
102 |     project_id: int,
103 |     playbook: str,
104 |     inventory_id: int,
105 |     job_type: str = "run",
106 |     description: str = "",
107 |     credential_id: int = None,
108 |     execution_environment_id: int = None,
109 |     labels: list[str] = None,
110 |     forks: int = 0,
111 |     limit: str = "",
112 |     verbosity: int = 0,
113 |     timeout: int = 0,
114 |     job_tags: list[str] = None,
115 |     skip_tags: list[str] = None,
116 |     extra_vars: dict = None,
117 |     privilege_escalation: bool = False,
118 |     concurrent_jobs: bool = False,
119 |     provisioning_callback: bool = False,
120 |     enable_webhook: bool = False,
121 |     prevent_instance_group_fallback: bool = False,
122 | ) -> Any:
123 |     """Create a new job template in Ansible Automation Platform."""
124 | 
125 |     payload = {
126 |         "name": name,
127 |         "description": description,
128 |         "job_type": job_type,
129 |         "project": project_id,
130 |         "playbook": playbook,
131 |         "inventory": inventory_id,
132 |         "forks": forks,
133 |         "limit": limit,
134 |         "verbosity": verbosity,
135 |         "timeout": timeout,
136 |         "ask_variables_on_launch": bool(extra_vars),
137 |         "ask_tags_on_launch": bool(job_tags),
138 |         "ask_skip_tags_on_launch": bool(skip_tags),
139 |         "ask_credential_on_launch": credential_id is None,
140 |         "ask_execution_environment_on_launch": execution_environment_id is None,
141 |         "ask_labels_on_launch": labels is None,
142 |         "ask_inventory_on_launch": False,  # Inventory is required, so not prompting
143 |         "ask_job_type_on_launch": False,  # Job type is required, so not prompting
144 |         "become_enabled": privilege_escalation,
145 |         "allow_simultaneous": concurrent_jobs,
146 |         "scm_branch": "",
147 |         "webhook_service": "github" if enable_webhook else "",
148 |         "prevent_instance_group_fallback": prevent_instance_group_fallback,
149 |     }
150 | 
151 |     if credential_id:
152 |         payload["credential"] = credential_id
153 |     if execution_environment_id:
154 |         payload["execution_environment"] = execution_environment_id
155 |     if labels:
156 |         payload["labels"] = labels
157 |     if job_tags:
158 |         payload["job_tags"] = job_tags
159 |     if skip_tags:
160 |         payload["skip_tags"] = skip_tags
161 |     if extra_vars:
162 |         payload["extra_vars"] = extra_vars
163 | 
164 |     return await make_request(f"{AAP_URL}/job_templates/", method="POST", json=payload)
165 | 
166 | @mcp.tool()
167 | async def list_inventory_sources() -> Any:
168 |     """List all inventory sources in Ansible Automation Platform."""
169 |     return await make_request(f"{AAP_URL}/inventory_sources/")
170 | 
171 | @mcp.tool()
172 | async def get_inventory_source(inventory_source_id: int) -> Any:
173 |     """Get details of a specific inventory source."""
174 |     return await make_request(f"{AAP_URL}/inventory_sources/{inventory_source_id}/")
175 | 
176 | @mcp.tool()
177 | async def create_inventory_source(
178 |     name: str,
179 |     inventory_id: int,
180 |     source: str,
181 |     credential_id: int,
182 |     source_vars: dict = None,
183 |     update_on_launch: bool = True,
184 |     timeout: int = 0,
185 | ) -> Any:
186 |     """Create a dynamic inventory source. Claude will ask for the source type and credential before proceeding."""
187 |     valid_sources = [
188 |         "file", "constructed", "scm", "ec2", "gce", "azure_rm", "vmware", "satellite6", "openstack", 
189 |         "rhv", "controller", "insights", "terraform", "openshift_virtualization"
190 |     ]
191 |     
192 |     if source not in valid_sources:
193 |         return f"Error: Invalid source type '{source}'. Please select from: {', '.join(valid_sources)}"
194 |     
195 |     if not credential_id:
196 |         return "Error: Credential is required to create an inventory source."
197 |     
198 |     payload = {
199 |         "name": name,
200 |         "inventory": inventory_id,
201 |         "source": source,
202 |         "credential": credential_id,
203 |         "source_vars": source_vars,
204 |         "update_on_launch": update_on_launch,
205 |         "timeout": timeout,
206 |     }
207 |     return await make_request(f"{AAP_URL}/inventory_sources/", method="POST", json=payload)
208 | 
209 | @mcp.tool()
210 | async def update_inventory_source(inventory_source_id: int, update_data: dict) -> Any:
211 |     """Update an existing inventory source."""
212 |     return await make_request(f"{AAP_URL}/inventory_sources/{inventory_source_id}/", method="PATCH", json=update_data)
213 | 
214 | @mcp.tool()
215 | async def delete_inventory_source(inventory_source_id: int) -> Any:
216 |     """Delete an inventory source."""
217 |     return await make_request(f"{AAP_URL}/inventory_sources/{inventory_source_id}/", method="DELETE")
218 | 
219 | @mcp.tool()
220 | async def sync_inventory_source(inventory_source_id: int) -> Any:
221 |     """Manually trigger a sync for an inventory source."""
222 |     return await make_request(f"{AAP_URL}/inventory_sources/{inventory_source_id}/update/", method="POST")
223 | 
224 | @mcp.tool()
225 | async def create_inventory(
226 |     name: str,
227 |     organization_id: int,
228 |     description: str = "",
229 |     kind: str = "",
230 |     host_filter: str = "",
231 |     variables: dict = None,
232 |     prevent_instance_group_fallback: bool = False,
233 | ) -> Any:
234 |     """Create an inventory in Ansible Automation Platform."""
235 |     payload = {
236 |         "name": name,
237 |         "organization": organization_id,
238 |         "description": description,
239 |         "kind": kind,
240 |         "host_filter": host_filter,
241 |         "variables": variables,
242 |         "prevent_instance_group_fallback": prevent_instance_group_fallback,
243 |     }
244 |     return await make_request(f"{AAP_URL}/inventories/", method="POST", json=payload)
245 | 
246 | @mcp.tool()
247 | async def delete_inventory(inventory_id: int) -> Any:
248 |     """Delete an inventory from Ansible Automation Platform."""
249 |     return await make_request(f"{AAP_URL}/inventories/{inventory_id}/", method="DELETE")
250 | 
251 | @mcp.tool()
252 | async def list_job_templates() -> Any:
253 |     """List all job templates available in Ansible Automation Platform."""
254 |     return await make_request(f"{AAP_URL}/job_templates/")
255 | 
256 | @mcp.tool()
257 | async def get_job_template(template_id: int) -> Any:
258 |     """Retrieve details of a specific job template."""
259 |     return await make_request(f"{AAP_URL}/job_templates/{template_id}/")
260 | 
261 | @mcp.tool()
262 | async def list_jobs() -> Any:
263 |     """List all jobs available in Ansible Automation Platform."""
264 |     return await make_request(f"{AAP_URL}/jobs/")
265 | 
266 | @mcp.tool()
267 | async def list_recent_jobs(hours: int = 24) -> Any:
268 |     """List all jobs executed in the last specified hours (default 24 hours)."""
269 |     from datetime import datetime, timedelta
270 |     
271 |     time_filter = (datetime.utcnow() - timedelta(hours=hours)).isoformat() + "Z"
272 |     return await make_request(f"{AAP_URL}/jobs/?created__gte={time_filter}")
273 | 
 274 | if __name__ == "__main__":
 275 |     # Run the FastMCP instance with the SSE transport instead of stdio
 276 |     mcp.run(transport="sse")
277 | 
```

--------------------------------------------------------------------------------
/app.py:
--------------------------------------------------------------------------------

```python
  1 | import streamlit as st
  2 | from llama_stack_client import LlamaStackClient
  3 | from llama_stack_client.lib.agents.agent import Agent
  4 | from llama_stack_client.types.agent_create_params import AgentConfig
  5 | import os
  6 | import json
  7 | from datetime import datetime
  8 | 
  9 | # Initialize LlamaStack client
 10 | base_url = os.getenv("BASE_URL", "http://localhost:8321") 
 11 | client = LlamaStackClient(base_url=base_url)
 12 | 
 13 | # Page configuration
 14 | st.set_page_config(
 15 |     page_title="Llama-stack Chat",
 16 |     page_icon="🦙",
 17 |     layout="wide",
 18 | )
 19 | 
  20 | # Inject custom CSS: code/pre backgrounds, chat-message spacing and a blinking cursor
  21 | st.markdown("""
  22 | <style>
  23 |     code {
  24 |         background-color: #f0f2f6;
  25 |         border-radius: 3px;
  26 |         padding: 0.2em 0.4em;
  27 |     }
  28 |     pre {
  29 |         background-color: #f0f2f6;
  30 |         border-radius: 5px;
  31 |         padding: 0.5em;
  32 |     }
  33 |     .chat-message {
  34 |         padding: 1.5rem;
  35 |         border-radius: 0.5rem;
  36 |         margin-bottom: 1rem;
  37 |     }
  38 |     .blinking-cursor {
  39 |         animation: blink 1s step-end infinite;
  40 |     }
  41 |     @keyframes blink {
  42 |         50% { opacity: 0; }
  43 |     }
  44 | </style>
  45 | """, unsafe_allow_html=True)
 46 | 
 47 | # Get providers with valid credentials (simplified approach)
 48 | @st.cache_data(ttl=300)
 49 | def get_configured_providers():
 50 |     try:
 51 |         # This is a simplified approach that assumes Anthropic is configured
 52 |         # In a real implementation, you would check LLama Stack's configuration
 53 |         return ["anthropic"]
 54 |     except Exception as e:
 55 |         st.error(f"Error checking configured providers: {e}")
 56 |         return ["anthropic"]  # Fallback to Anthropic
 57 | 
 58 | # Get available models from configured providers
 59 | @st.cache_data(ttl=300)
 60 | def get_available_models():
 61 |     try:
 62 |         models = client.models.list()
 63 |         configured_providers = get_configured_providers()
 64 |         
 65 |         # Filter for LLM models from configured providers
 66 |         available_models = [
 67 |             model.identifier 
 68 |             for model in models 
 69 |             if model.model_type == "llm" and model.provider_id in configured_providers
 70 |         ]
 71 |         
 72 |         return available_models
 73 |     except Exception as e:
 74 |         st.error(f"Error fetching models: {e}")
 75 |         return ["anthropic/claude-3-7-sonnet-latest"]  # Fallback default
 76 | 
 77 | # Get all available toolgroups
 78 | @st.cache_data(ttl=300)
 79 | def get_all_toolgroups():
 80 |     try:
 81 |         toolgroups = client.toolgroups.list()
 82 |         return [toolgroup.identifier for toolgroup in toolgroups]
 83 |     except Exception as e:
 84 |         st.error(f"Error fetching toolgroups: {e}")
 85 |         return ["mcp::ansible"]  # Fallback default
 86 | 
 87 | # Initialize session state
 88 | if "messages" not in st.session_state:
 89 |     st.session_state.messages = []
 90 | if "system_instruction" not in st.session_state:
 91 |     st.session_state.system_instruction = "You are a helpful assistant that can access various tools to help the user."
 92 | if "show_save_dialog" not in st.session_state:
 93 |     st.session_state.show_save_dialog = False
 94 | if "saved_chats" not in st.session_state:
 95 |     st.session_state.saved_chats = "{}"
 96 | if "chat_updated" not in st.session_state:
 97 |     st.session_state.chat_updated = False
 98 | 
 99 | # Function to check if there are messages
100 | def has_messages():
101 |     return len(st.session_state.messages) > 0
102 | 
103 | # Streamlit UI
104 | st.title("Llama-stack Chat")
105 | st.markdown("Chat with LLama Stack and its toolgroups")
106 | 
107 | # Sidebar configurations
108 | with st.sidebar:
109 |     st.header("Configuration")
110 |     
111 |     # Model selection
112 |     available_models = get_available_models()
113 |     selected_model = st.selectbox("Select Model", available_models)
114 |     
115 |     # Get all toolgroups automatically and display as collapsible
116 |     all_toolgroups = get_all_toolgroups()
117 |     with st.expander(f"{len(all_toolgroups)} Toolgroups Loaded"):
118 |         for toolgroup in all_toolgroups:
119 |             st.caption(f"• {toolgroup}")
120 |     
121 |     # System instructions
122 |     with st.expander("System Instructions", expanded=False):
123 |         new_instruction = st.text_area(
124 |             "Customize how the assistant behaves:", 
125 |             st.session_state.system_instruction,
126 |             height=100
127 |         )
128 |         if new_instruction != st.session_state.system_instruction:
129 |             st.session_state.system_instruction = new_instruction
130 |             st.toast("System instructions updated")
131 |     
132 |     # Collapsible Query Context section
133 |     with st.expander("Query Context", expanded=False):
134 |         query_context = st.text_area("Add background information for this query:", "", height=150)
135 |         st.caption("This information will be included with each of your queries but won't be visible in the chat.")
136 |     
137 |     # Temperature in collapsible
138 |     with st.expander("Temperature", expanded=False):
139 |         temperature = st.slider("Temperature", min_value=0.0, max_value=1.0, value=0.7, step=0.1)
140 |         top_p = st.slider("Top P", min_value=0.0, max_value=1.0, value=0.9, step=0.1)
141 |     
142 |     # Chat history management
143 |     st.header("Chat Management")
144 |     
145 |     # Using two separate buttons instead of columns for better visibility
146 |     clear_col, save_col = st.columns(2)
147 |     
148 |     # Clear chat button
149 |     with clear_col:
150 |         if st.button("🗑️ Clear Chat", key="clear_chat"):
151 |             st.session_state.messages = []
152 |             st.session_state.chat_updated = True
153 |             st.rerun()
154 |     
155 |     # Save button - disabled when no messages
156 |     with save_col:
157 |         if st.button("💾 Save Chat", key="save_chat", disabled=not has_messages()):
158 |             st.session_state.show_save_dialog = True
159 |     
160 |     # Save dialog - shown when save button is clicked
161 |     if st.session_state.show_save_dialog:
162 |         st.text_input("Conversation name:", key="save_name")
163 |         save_confirm, cancel = st.columns(2)
164 |         
165 |         with save_confirm:
166 |             if st.button("Confirm Save", key="confirm_save"):
167 |                 if st.session_state.save_name:
168 |                     saved_chats = json.loads(st.session_state.saved_chats)
169 |                     saved_chats[st.session_state.save_name] = st.session_state.messages
170 |                     st.session_state.saved_chats = json.dumps(saved_chats)
171 |                     st.session_state.show_save_dialog = False
172 |                     st.toast(f"Saved conversation: {st.session_state.save_name}")
173 |                     st.rerun()
174 |                 else:
175 |                     st.warning("Please enter a name for the conversation")
176 |         
177 |         with cancel:
178 |             if st.button("Cancel", key="cancel_save"):
179 |                 st.session_state.show_save_dialog = False
180 |                 st.rerun()
181 |     
182 |     # Export chat button - disabled when no messages
183 |     if st.button("📥 Export Chat", key="export_chat", disabled=not has_messages()):
184 |         chat_export = ""
185 |         for msg in st.session_state.messages:
186 |             prefix = "🧑" if msg["role"] == "user" else "🤖"
187 |             chat_export += f"{prefix} **{msg['role'].capitalize()}**: {msg['content']}\n\n"
188 |         
189 |         # Create download link
190 |         timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
191 |         filename = f"chat_export_{timestamp}.md"
192 |         st.download_button(
193 |             label="Download Chat",
194 |             data=chat_export,
195 |             file_name=filename,
196 |             mime="text/markdown",
197 |             key="download_chat"
198 |         )
199 |     
200 |     # Load saved conversations
201 |     st.header("Saved Conversations")
202 |     
203 |     # Load saved conversation
204 |     saved_chats = json.loads(st.session_state.saved_chats)
205 |     if saved_chats:
206 |         chat_names = list(saved_chats.keys())
207 |         selected_chat = st.selectbox("Select a saved conversation:", [""] + chat_names)
208 |         if selected_chat and st.button("📂 Load Conversation"):
209 |             st.session_state.messages = saved_chats[selected_chat]
210 |             st.session_state.chat_updated = True
211 |             st.toast(f"Loaded conversation: {selected_chat}")
212 |             st.rerun()
213 |     else:
214 |         st.caption("No saved conversations yet")
215 | 
216 | # Display chat history
217 | for message in st.session_state.messages:
218 |     with st.chat_message(message["role"]):
219 |         # Apply syntax highlighting for code blocks
220 |         content = message["content"]
221 |         st.markdown(content)
222 | 
# Chat input: every submitted prompt is processed as a single agent turn.
prompt = st.chat_input("Ask something...")
if prompt:
    full_response = ""
    sampling_strategy = {"type": "top_p", "temperature": temperature, "top_p": top_p}
    agent_config = AgentConfig(
        model=selected_model,
        instructions=st.session_state.system_instruction,
        sampling_params={"strategy": sampling_strategy},
        toolgroups=all_toolgroups,  # every available toolgroup is enabled
        tool_choice="auto",
        input_shields=[],
        output_shields=[],
        enable_session_persistence=True,
    )

    try:
        # A new agent and session are created for each submitted prompt.
        agent = Agent(client, agent_config)
        session_id = agent.create_session("chat-session")

        # Record and echo the user's prompt.
        st.session_state.messages.append({"role": "user", "content": prompt})
        with st.chat_message("user"):
            st.markdown(prompt)

        with st.chat_message("assistant"):
            message_placeholder = st.empty()

            # The optional sidebar context is appended to what the agent sees,
            # but not to the prompt stored in the chat history.
            user_message = f"{prompt}\n\nContext: {query_context}" if query_context else prompt

            response = agent.create_turn(
                messages=[{"role": "user", "content": user_message}],
                session_id=session_id,
            )

            # Only text deltas of "step_progress" events contribute to the reply;
            # chunks lacking any of the expected attributes are ignored.
            for chunk in response:
                payload = getattr(getattr(chunk, "event", None), "payload", None)
                if getattr(payload, "event_type", None) == "step_progress":
                    delta = getattr(payload, "delta", None)
                    if getattr(delta, "type", None) == "text":
                        full_response += delta.text
                        # Trailing block character acts as a typing cursor while streaming.
                        message_placeholder.markdown(full_response + "▌")

            # Final render without the cursor, then persist the reply.
            message_placeholder.markdown(full_response)
            st.session_state.messages.append({"role": "assistant", "content": full_response})

        # Mark the chat as changed and rerun so dependent widgets pick up the new state.
        st.session_state.chat_updated = True
        st.rerun()

    except Exception as e:
        st.error(f"Error: {str(e)}")
282 | 
# The chat_updated marker is one-shot: once a run reaches this point with it set,
# clear it so later runs start with the flag off.
if st.session_state.chat_updated:
    st.session_state.chat_updated = False
286 | 
```

--------------------------------------------------------------------------------
/redhat_insights_mcp.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Red Hat Insights MCP Server
  3 | 
  4 | This server requires a Red Hat service account with client credentials.
  5 | 
  6 | Setup:
  7 | 1. Create a service account in Red Hat Console (console.redhat.com)
  8 | 2. Assign appropriate permissions via User Access → Groups
  9 | 3. Set environment variables:
 10 |    export INSIGHTS_CLIENT_ID="your-client-id"
 11 |    export INSIGHTS_CLIENT_SECRET="your-client-secret"
 12 |    
 13 | Optional:
 14 |    export INSIGHTS_BASE_URL="https://console.redhat.com/api"
 15 |    export SSO_URL="https://sso.redhat.com/auth/realms/redhat-external/protocol/openid-connect/token"
 16 | """
 17 | 
 18 | import os
 19 | import httpx
 20 | from mcp.server.fastmcp import FastMCP
 21 | from typing import Any, Optional
 22 | from datetime import datetime, timedelta
 23 | 
 24 | # Environment variables for authentication
 25 | INSIGHTS_BASE_URL = os.getenv("INSIGHTS_BASE_URL", "https://console.redhat.com/api")
 26 | INSIGHTS_CLIENT_ID = os.getenv("INSIGHTS_CLIENT_ID")
 27 | INSIGHTS_CLIENT_SECRET = os.getenv("INSIGHTS_CLIENT_SECRET")
 28 | SSO_URL = os.getenv("SSO_URL", "https://sso.redhat.com/auth/realms/redhat-external/protocol/openid-connect/token")
 29 | 
 30 | if not INSIGHTS_CLIENT_ID or not INSIGHTS_CLIENT_SECRET:
 31 |     raise ValueError("INSIGHTS_CLIENT_ID and INSIGHTS_CLIENT_SECRET are required")
 32 | 
 33 | # Global variable to store the access token
 34 | _access_token = None
 35 | _token_expires_at = None
 36 | 
 37 | # Initialize FastMCP
 38 | mcp = FastMCP("insights")
 39 | 
 40 | async def get_access_token() -> str:
 41 |     """Get or refresh the access token using client credentials."""
 42 |     global _access_token, _token_expires_at
 43 |     
 44 |     # Check if we have a valid token
 45 |     if _access_token and _token_expires_at and datetime.utcnow() < _token_expires_at:
 46 |         return _access_token
 47 |     
 48 |     # Request new token
 49 |     async with httpx.AsyncClient() as client:
 50 |         response = await client.post(
 51 |             SSO_URL,
 52 |             headers={"Content-Type": "application/x-www-form-urlencoded"},
 53 |             data={
 54 |                 "grant_type": "client_credentials",
 55 |                 "scope": "api.console",
 56 |                 "client_id": INSIGHTS_CLIENT_ID,
 57 |                 "client_secret": INSIGHTS_CLIENT_SECRET
 58 |             }
 59 |         )
 60 |     
 61 |     if response.status_code != 200:
 62 |         raise Exception(f"Failed to get access token: {response.status_code} {response.text}")
 63 |     
 64 |     token_data = response.json()
 65 |     _access_token = token_data["access_token"]
 66 |     # Set expiration time with some buffer (subtract 60 seconds)
 67 |     expires_in = token_data.get("expires_in", 300)  # Default to 5 minutes
 68 |     _token_expires_at = datetime.utcnow() + timedelta(seconds=expires_in - 60)
 69 |     
 70 |     return _access_token
 71 | 
 72 | async def make_request(url: str, method: str = "GET", json: dict = None, params: dict = None) -> Any:
 73 |     """Helper function to make authenticated API requests to Red Hat Insights."""
 74 |     token = await get_access_token()
 75 |     headers = {
 76 |         "Authorization": f"Bearer {token}",
 77 |         "Content-Type": "application/json"
 78 |     }
 79 |     
 80 |     async with httpx.AsyncClient() as client:
 81 |         response = await client.request(method, url, headers=headers, json=json, params=params)
 82 |     
 83 |     if response.status_code not in [200, 201, 204]:
 84 |         return f"Error {response.status_code}: {response.text}"
 85 |     
 86 |     return response.json() if "application/json" in response.headers.get("Content-Type", "") else response.text
 87 | 
 88 | # Authentication Test
 89 | @mcp.tool()
 90 | async def test_authentication() -> Any:
 91 |     """Test authentication with Red Hat Insights using service account credentials."""
 92 |     try:
 93 |         token = await get_access_token()
 94 |         # Test with a simple API call
 95 |         result = await make_request(f"{INSIGHTS_BASE_URL}/inventory/v1/hosts?limit=1")
 96 |         return {"status": "success", "message": "Authentication successful", "sample_data": result}
 97 |     except Exception as e:
 98 |         return {"status": "error", "message": f"Authentication failed: {str(e)}"}
 99 | 
100 | # Host Inventory Management Tools
@mcp.tool()
async def list_systems(limit: int = 50, offset: int = 0, display_name: str = None, staleness: str = None) -> Any:
    """List hosts/systems registered with Red Hat Insights.

    Args:
        limit: Page size.
        offset: Number of entries to skip.
        display_name: Optional display-name filter; omitted when empty.
        staleness: Optional staleness filter such as 'fresh' or 'stale'; omitted when empty.
    """
    optional_filters = {"display_name": display_name, "staleness": staleness}
    query = {"limit": limit, "offset": offset}
    # Forward only the filters the caller actually supplied.
    query.update({key: value for key, value in optional_filters.items() if value})
    return await make_request(f"{INSIGHTS_BASE_URL}/inventory/v1/hosts", params=query)
110 | 
@mcp.tool()
async def get_system(system_id: str) -> Any:
    """Fetch the inventory record of one system, identified by its UUID."""
    host_url = f"{INSIGHTS_BASE_URL}/inventory/v1/hosts/{system_id}"
    return await make_request(host_url)
115 | 
@mcp.tool()
async def get_system_profile(system_id: str, fields: list[str] = None) -> Any:
    """Get system profile/facts for a specific system.

    Args:
        system_id: ID of the system in the inventory.
        fields: Optional system-profile field names to limit the response to.
            All of them are sent as one comma-separated
            ``fields[system_profile]`` query parameter.
    """
    url = f"{INSIGHTS_BASE_URL}/inventory/v1/hosts/{system_id}/system_profile"
    params = {}
    if fields:
        # Send every requested field. Assigning the same key once per field
        # would keep only the last one.
        params["fields[system_profile]"] = ",".join(fields)
    return await make_request(url, params=params)
125 | 
@mcp.tool()
async def get_system_tags(system_id: str) -> Any:
    """Fetch the tags attached to one system."""
    tags_url = f"{INSIGHTS_BASE_URL}/inventory/v1/hosts/{system_id}/tags"
    return await make_request(tags_url)
130 | 
@mcp.tool()
async def delete_system(system_id: str) -> Any:
    """Delete a system from the Red Hat Insights inventory (issues an HTTP DELETE)."""
    host_url = f"{INSIGHTS_BASE_URL}/inventory/v1/hosts/{system_id}"
    return await make_request(host_url, method="DELETE")
135 | 
136 | # Vulnerability Management Tools
@mcp.tool()
async def list_vulnerabilities(
    limit: int = 50,
    offset: int = 0,
    affecting: bool = True,
    cvss_score_gte: float = None,
    cvss_score_lte: float = None
) -> Any:
    """List vulnerabilities (CVEs) known to Red Hat Insights.

    Args:
        limit: Page size.
        offset: Number of entries to skip.
        affecting: When true, ask only for CVEs affecting systems; when false
            the filter is not sent.
        cvss_score_gte: Optional lower CVSS bound; sent whenever it is not None.
        cvss_score_lte: Optional upper CVSS bound; sent whenever it is not None.
    """
    params = {"limit": limit, "offset": offset}
    if affecting:
        params["affecting"] = "true"
    # Compare against None so that a bound of 0.0 is still forwarded.
    if cvss_score_gte is not None:
        params["cvss_score_gte"] = cvss_score_gte
    if cvss_score_lte is not None:
        params["cvss_score_lte"] = cvss_score_lte
    return await make_request(f"{INSIGHTS_BASE_URL}/vulnerability/v1/vulnerabilities/cves", params=params)
154 | 
@mcp.tool()
async def get_vulnerability_executive_report() -> Any:
    """Fetch the executive vulnerability report (CVE summaries by severity)."""
    report_url = f"{INSIGHTS_BASE_URL}/vulnerability/v1/report/executive"
    return await make_request(report_url)
159 | 
160 | # Patch Management Tools
@mcp.tool()
async def list_advisories(
    limit: int = 50,
    offset: int = 0,
    advisory_type: str = None,
    severity: str = None
) -> Any:
    """List available advisories (patches) via the patch/v3 export endpoint.

    Args:
        limit: Page size.
        offset: Number of entries to skip.
        advisory_type: Optional advisory-type filter; omitted when empty.
        severity: Optional severity filter; omitted when empty.
    """
    query = dict(limit=limit, offset=offset)
    # Add each optional filter only when the caller provided a value.
    for key, value in (("advisory_type", advisory_type), ("severity", severity)):
        if value:
            query[key] = value
    return await make_request(f"{INSIGHTS_BASE_URL}/patch/v3/export/advisories", params=query)
175 | 
176 | # Compliance Tools
@mcp.tool()
async def list_compliance_policies(limit: int = 50, offset: int = 0) -> Any:
    """List SCAP compliance policies, paged by limit and offset."""
    return await make_request(
        f"{INSIGHTS_BASE_URL}/compliance/v2/policies",
        params=dict(limit=limit, offset=offset),
    )
182 | 
@mcp.tool()
async def list_compliance_systems(assigned_or_scanned: bool = True) -> Any:
    """List systems associated with SCAP policies.

    Args:
        assigned_or_scanned: When true, send the "assigned_or_scanned=true"
            filter; when false, send no filter at all.
    """
    query = {"filter": "assigned_or_scanned=true"} if assigned_or_scanned else {}
    return await make_request(f"{INSIGHTS_BASE_URL}/compliance/v2/systems", params=query)
190 | 
@mcp.tool()
async def associate_compliance_policy(policy_id: str, system_id: str) -> Any:
    """Associate a system with a SCAP compliance policy (issues an HTTP PATCH)."""
    assignment_url = f"{INSIGHTS_BASE_URL}/compliance/v2/policies/{policy_id}/systems/{system_id}"
    return await make_request(assignment_url, method="PATCH")
195 | 
@mcp.tool()
async def list_compliance_reports(limit: int = 50, offset: int = 0) -> Any:
    """List compliance reports, paged by limit and offset."""
    return await make_request(
        f"{INSIGHTS_BASE_URL}/compliance/v2/reports",
        params=dict(limit=limit, offset=offset),
    )
201 | 
202 | # Recommendations and Advisor Tools
@mcp.tool()
async def list_recommendations(
    category: str = None,
    impact: str = None,
    limit: int = 50,
    offset: int = 0
) -> Any:
    """List recommendation rules available from Advisor.

    Args:
        category: Optional category filter; omitted when empty.
        impact: Optional impact filter; omitted when empty.
        limit: Page size.
        offset: Number of entries to skip.
    """
    query = dict(limit=limit, offset=offset)
    # Add each optional filter only when the caller provided a value.
    for key, value in (("category", category), ("impact", impact)):
        if value:
            query[key] = value
    return await make_request(f"{INSIGHTS_BASE_URL}/insights/v1/rule", params=query)
217 | 
@mcp.tool()
async def export_rule_hits(has_playbook: bool = None, format: str = "json") -> Any:
    """Export all rule hits (recommendations) for systems.

    Args:
        has_playbook: Tri-state filter. True or False is forwarded as
            "true"/"false"; None (the default) sends no filter.
        format: Accepted for interface compatibility but not currently
            forwarded to the API.
    """
    params = {}
    # Compare against None so an explicit False is sent instead of being dropped.
    if has_playbook is not None:
        params["has_playbook"] = "true" if has_playbook else "false"
    return await make_request(f"{INSIGHTS_BASE_URL}/insights/v1/export/hits", params=params)
225 | 
@mcp.tool()
async def get_system_recommendations(system_id: str) -> Any:
    """Fetch the Advisor recommendation summary for one system."""
    system_url = f"{INSIGHTS_BASE_URL}/insights/v1/system/{system_id}"
    return await make_request(system_url)
230 | 
231 | # Policy Management Tools
@mcp.tool()
async def list_policies(limit: int = 50, offset: int = 0) -> Any:
    """List the defined custom policies, paged by limit and offset."""
    return await make_request(
        f"{INSIGHTS_BASE_URL}/policies/v1/policies",
        params=dict(limit=limit, offset=offset),
    )
237 | 
@mcp.tool()
async def create_policy(name: str, description: str, conditions: str, actions: str = "notification", is_enabled: bool = True) -> Any:
    """Create a new custom policy.

    Args:
        name: Policy name.
        description: Policy description.
        conditions: Condition expression, for example 'arch = "x86_64"'.
        actions: Action to take; defaults to "notification".
        is_enabled: Sent to the API as "isEnabled".
    """
    policy_body = dict(
        name=name,
        description=description,
        conditions=conditions,
        actions=actions,
        isEnabled=is_enabled,
    )
    return await make_request(f"{INSIGHTS_BASE_URL}/policies/v1/policies", method="POST", json=policy_body)
249 | 
@mcp.tool()
async def get_policy_triggers(policy_id: str) -> Any:
    """Fetch the trigger history (systems that triggered) for one policy."""
    history_url = f"{INSIGHTS_BASE_URL}/policies/v1/policies/{policy_id}/history/trigger"
    return await make_request(history_url)
254 | 
255 | # Remediation Tools
@mcp.tool()
async def list_remediations(limit: int = 50, offset: int = 0) -> Any:
    """List the defined remediation plans, paged by limit and offset."""
    return await make_request(
        f"{INSIGHTS_BASE_URL}/remediations/v1/remediations",
        params=dict(limit=limit, offset=offset),
    )
261 | 
@mcp.tool()
async def create_remediation(name: str, issues: list[dict], auto_reboot: bool = False, archived: bool = False) -> Any:
    """Create a new remediation plan.

    Args:
        name: Name of the plan.
        issues: List of dicts, each with id, resolution and systems; sent under "add.issues".
        auto_reboot: Forwarded as "auto_reboot".
        archived: Forwarded as "archived".
    """
    plan_body = dict(
        name=name,
        auto_reboot=auto_reboot,
        archived=archived,
        add={"issues": issues},
    )
    return await make_request(f"{INSIGHTS_BASE_URL}/remediations/v1/remediations", method="POST", json=plan_body)
274 | 
@mcp.tool()
async def get_remediation_playbook(remediation_id: str) -> Any:
    """Fetch the Ansible playbook generated for a remediation plan."""
    playbook_url = f"{INSIGHTS_BASE_URL}/remediations/v1/remediations/{remediation_id}/playbook"
    return await make_request(playbook_url)
279 | 
@mcp.tool()
async def execute_remediation(remediation_id: str) -> Any:
    """Execute a remediation plan by POSTing to its playbook_runs endpoint."""
    runs_url = f"{INSIGHTS_BASE_URL}/remediations/v1/remediations/{remediation_id}/playbook_runs"
    return await make_request(runs_url, method="POST")
284 | 
285 | # Subscription Management
@mcp.tool()
async def list_rhel_subscriptions(product: str = "RHEL for x86", limit: int = 50, offset: int = 0) -> Any:
    """List systems with RHEL subscriptions for one product.

    Args:
        product: Product name, e.g. 'RHEL for x86' or 'RHEL for x86_64'. It is
            percent-encoded before being placed in the URL path.
        limit: Page size.
        offset: Number of entries to skip.
    """
    from urllib.parse import quote

    # safe="" also escapes "/", so the product always stays a single path segment.
    encoded_product = quote(product, safe="")
    params = {"limit": limit, "offset": offset}
    return await make_request(f"{INSIGHTS_BASE_URL}/rhsm-subscriptions/v1/instances/products/{encoded_product}", params=params)
293 | 
294 | # Export Tools
@mcp.tool()
async def create_export(name: str, format: str, application: str, resource: str) -> Any:
    """Create an export request with a single source.

    Args:
        name: Name of the export.
        format: Export format, forwarded as given.
        application: Source application, e.g. 'urn:redhat:application:inventory' or 'subscriptions'.
        resource: Resource of that application to export.
    """
    source = {"application": application, "resource": resource}
    export_body = {"name": name, "format": format, "sources": [source]}
    return await make_request(f"{INSIGHTS_BASE_URL}/export/v1/exports", method="POST", json=export_body)
307 | 
@mcp.tool()
async def get_export_status(export_id: str) -> Any:
    """Fetch the status of an export request."""
    status_url = f"{INSIGHTS_BASE_URL}/export/v1/exports/{export_id}/status"
    return await make_request(status_url)
312 | 
@mcp.tool()
async def download_export(export_id: str) -> Any:
    """Download a completed export (ZIP file)."""
    # NOTE(review): make_request returns non-JSON bodies as decoded text, which may
    # not be a faithful representation of a binary ZIP - confirm before relying on it.
    export_url = f"{INSIGHTS_BASE_URL}/export/v1/exports/{export_id}"
    return await make_request(export_url)
317 | 
318 | # Notifications and Integrations
@mcp.tool()
async def list_notification_events(start_date: str = None, end_date: str = None, limit: int = 20, offset: int = 0) -> Any:
    """Get notification event history.

    Args:
        start_date: Optional start date (YYYY-MM-DD), sent as "startDate"; omitted when empty.
        end_date: Optional end date (YYYY-MM-DD), sent as "endDate"; omitted when empty.
        limit: Page size.
        offset: Number of entries to skip.
    """
    query = dict(limit=limit, offset=offset)
    # Add each date bound only when the caller provided a value.
    for key, value in (("startDate", start_date), ("endDate", end_date)):
        if value:
            query[key] = value
    return await make_request(f"{INSIGHTS_BASE_URL}/notifications/v1/notifications/events", params=query)
328 | 
@mcp.tool()
async def list_integrations() -> Any:
    """List the configured third-party integrations (endpoints)."""
    endpoints_url = f"{INSIGHTS_BASE_URL}/integrations/v1/endpoints"
    return await make_request(endpoints_url)
333 | 
334 | # Analytics and Statistics
@mcp.tool()
async def get_insights_overview() -> Any:
    """Get a basic overview by querying the inventory for a single host.

    This reuses the inventory hosts endpoint with limit=1 rather than a
    dedicated statistics endpoint.
    """
    return await make_request(f"{INSIGHTS_BASE_URL}/inventory/v1/hosts?limit=1")
341 | 
342 | # Content Sources and Templates
@mcp.tool()
async def list_repositories(limit: int = 50, offset: int = 0) -> Any:
    """List the existing content repositories, paged by limit and offset."""
    return await make_request(
        f"{INSIGHTS_BASE_URL}/content-sources/v1.0/repositories",
        params=dict(limit=limit, offset=offset),
    )
348 | 
@mcp.tool()
async def create_repository(name: str, url: str, distribution_arch: str = "x86_64", distribution_versions: list[str] = None) -> Any:
    """Create a new custom repository.

    Args:
        name: Repository name.
        url: Repository URL.
        distribution_arch: Target architecture; defaults to "x86_64".
        distribution_versions: Target versions; ["9"] is used when none are given.

    Metadata verification, module hotfixes and snapshots are always sent as disabled.
    """
    versions = distribution_versions if distribution_versions else ["9"]
    repository_body = dict(
        name=name,
        url=url,
        distribution_arch=distribution_arch,
        distribution_versions=versions,
        metadata_verification=False,
        module_hotfixes=False,
        snapshot=False,
    )
    return await make_request(f"{INSIGHTS_BASE_URL}/content-sources/v1.0/repositories", method="POST", json=repository_body)
362 | 
@mcp.tool()
async def list_content_templates(limit: int = 50, offset: int = 0) -> Any:
    """List the content templates, paged by limit and offset."""
    return await make_request(
        f"{INSIGHTS_BASE_URL}/content-sources/v1.0/templates",
        params=dict(limit=limit, offset=offset),
    )
368 | 
@mcp.tool()
async def create_content_template(name: str, arch: str, version: str, repository_uuids: list[str], description: str = "") -> Any:
    """Create a new content template.

    Args:
        name: Template name.
        arch: Architecture the template targets.
        version: Version the template targets.
        repository_uuids: UUIDs of the repositories to include.
        description: Optional description; defaults to an empty string.

    "use_latest" is always sent as true.
    """
    template_body = dict(
        name=name,
        arch=arch,
        version=version,
        description=description,
        repository_uuids=repository_uuids,
        use_latest=True,
    )
    return await make_request(f"{INSIGHTS_BASE_URL}/content-sources/v1.0/templates", method="POST", json=template_body)
381 | 
382 | if __name__ == "__main__":
383 |     mcp.run(transport="stdio")
384 | 
```