#
tokens: 16603/50000 8/8 files
lines: off (toggle) GitHub
raw markdown copy
# Directory Structure

```
├── ansible.py
├── app.py
├── claude_desktop_config.json
├── eda.py
├── llama_quick_start_guide.md
├── mcp_server
│   ├── ansible.py
│   ├── Containerfile
│   └── deployment.yaml
├── README.md
└── redhat_insights_mcp.py
```

# Files

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# Setup of the Environment for the AI Powered Ansible & OpenShift Automation with Model Context Protocols (MCP) Servers

## Overview

This guide will walk you through setting up the MCP Servers + Claude Desktop portions of the demo that focused on using Claude Desktop to interact with your Ansible Automation Platform and OpenShift Cluster environments. 

## Prerequisites

Ensure you have the following installed. 

### Required
- An Ansible Automation Platform (AAP) environment
- An OpenShift Cluster with OpenShift Virtualization
- [Claude Desktop](https://claude.ai/download) installed on your laptop (Pro Plan required for best results)
- Python 3.10 or higher installed on your laptop
- Ensure you are authenticated with your OpenShift cluster (e.g. exporting kubeconfig)

## Step One: Setup your laptop environment

Install `uv` and setup your Python project and environment.

```
curl -LsSf https://astral.sh/uv/install.sh | sh
```

Install [jbang](https://www.jbang.dev/download/) which will be used when using the Kubernetes MCP Server. (jbang needs to be installed globally, recommend using the homebrew install pattern. If you install it locally (the curl pattern), Claude won't be able to access it).

Restart your terminal to ensure that the `uv` and `jbang` command are now available.

## Step Two: Create and Setup your Project

```
# Create a new directory for our project
uv init ansible
cd ansible

# Create virtual environment and activate it
uv venv
source .venv/bin/activate

# Install dependencies
uv add "mcp[cli]" httpx

# Create our server file
touch ansible.py
```

## Step 3 Building your Ansible Automation Controller MCP Server

This is the MCP Server I used to interact with my automation controller. Feel free to copy/paste this into your `ansible.py` file.

Note: to connect to self-signed SSL, use edit the async client to be https.AsyncClient(verify=False)

```
import os
import httpx
from mcp.server.fastmcp import FastMCP
from typing import Any

# Environment variables for authentication
AAP_URL = os.getenv("AAP_URL")
AAP_TOKEN = os.getenv("AAP_TOKEN")

if not AAP_TOKEN:
    raise ValueError("AAP_TOKEN is required")

# Headers for API authentication
HEADERS = {
    "Authorization": f"Bearer {AAP_TOKEN}",
    "Content-Type": "application/json"
}

# Initialize FastMCP
mcp = FastMCP("ansible")

async def make_request(url: str, method: str = "GET", json: dict = None) -> Any:
    """Helper function to make authenticated API requests to AAP."""
    async with httpx.AsyncClient() as client:
        response = await client.request(method, url, headers=HEADERS, json=json)
    if response.status_code not in [200, 201]:
        return f"Error {response.status_code}: {response.text}"
    return response.json() if "application/json" in response.headers.get("Content-Type", "") else response.text

@mcp.tool()
async def list_inventories() -> Any:
    """List all inventories in Ansible Automation Platform."""
    return await make_request(f"{AAP_URL}/inventories/")

@mcp.tool()
async def get_inventory(inventory_id: str) -> Any:
    """Get details of a specific inventory by ID."""
    return await make_request(f"{AAP_URL}/inventories/{inventory_id}/")

@mcp.tool()
async def run_job(template_id: int, extra_vars: dict = {}) -> Any:
    """Run a job template by ID, optionally with extra_vars."""
    return await make_request(f"{AAP_URL}/job_templates/{template_id}/launch/", method="POST", json={"extra_vars": extra_vars})

@mcp.tool()
async def job_status(job_id: int) -> Any:
    """Check the status of a job by ID."""
    return await make_request(f"{AAP_URL}/jobs/{job_id}/")

@mcp.tool()
async def job_logs(job_id: int) -> str:
    """Retrieve logs for a job."""
    return await make_request(f"{AAP_URL}/jobs/{job_id}/stdout/")

@mcp.tool()
async def create_project(
    name: str,
    organization_id: int,
    source_control_url: str,
    source_control_type: str = "git",
    description: str = "",
    execution_environment_id: int = None,
    content_signature_validation_credential_id: int = None,
    source_control_branch: str = "",
    source_control_refspec: str = "",
    source_control_credential_id: int = None,
    clean: bool = False,
    update_revision_on_launch: bool = False,
    delete: bool = False,
    allow_branch_override: bool = False,
    track_submodules: bool = False,
) -> Any:
    """Create a new project in Ansible Automation Platform."""

    payload = {
        "name": name,
        "description": description,
        "organization": organization_id,
        "scm_type": source_control_type.lower(),  # Git is default
        "scm_url": source_control_url,
        "scm_branch": source_control_branch,
        "scm_refspec": source_control_refspec,
        "scm_clean": clean,
        "scm_delete_on_update": delete,
        "scm_update_on_launch": update_revision_on_launch,
        "allow_override": allow_branch_override,
        "scm_track_submodules": track_submodules,
    }

    if execution_environment_id:
        payload["execution_environment"] = execution_environment_id
    if content_signature_validation_credential_id:
        payload["signature_validation_credential"] = content_signature_validation_credential_id
    if source_control_credential_id:
        payload["credential"] = source_control_credential_id

    return await make_request(f"{AAP_URL}/projects/", method="POST", json=payload)

@mcp.tool()
async def create_job_template(
    name: str,
    project_id: int,
    playbook: str,
    inventory_id: int,
    job_type: str = "run",
    description: str = "",
    credential_id: int = None,
    execution_environment_id: int = None,
    labels: list[str] = None,
    forks: int = 0,
    limit: str = "",
    verbosity: int = 0,
    timeout: int = 0,
    job_tags: list[str] = None,
    skip_tags: list[str] = None,
    extra_vars: dict = None,
    privilege_escalation: bool = False,
    concurrent_jobs: bool = False,
    provisioning_callback: bool = False,
    enable_webhook: bool = False,
    prevent_instance_group_fallback: bool = False,
) -> Any:
    """Create a new job template in Ansible Automation Platform."""

    payload = {
        "name": name,
        "description": description,
        "job_type": job_type,
        "project": project_id,
        "playbook": playbook,
        "inventory": inventory_id,
        "forks": forks,
        "limit": limit,
        "verbosity": verbosity,
        "timeout": timeout,
        "ask_variables_on_launch": bool(extra_vars),
        "ask_tags_on_launch": bool(job_tags),
        "ask_skip_tags_on_launch": bool(skip_tags),
        "ask_credential_on_launch": credential_id is None,
        "ask_execution_environment_on_launch": execution_environment_id is None,
        "ask_labels_on_launch": labels is None,
        "ask_inventory_on_launch": False,  # Inventory is required, so not prompting
        "ask_job_type_on_launch": False,  # Job type is required, so not prompting
        "become_enabled": privilege_escalation,
        "allow_simultaneous": concurrent_jobs,
        "scm_branch": "",
        "webhook_service": "github" if enable_webhook else "",
        "prevent_instance_group_fallback": prevent_instance_group_fallback,
    }

    if credential_id:
        payload["credential"] = credential_id
    if execution_environment_id:
        payload["execution_environment"] = execution_environment_id
    if labels:
        payload["labels"] = labels
    if job_tags:
        payload["job_tags"] = job_tags
    if skip_tags:
        payload["skip_tags"] = skip_tags
    if extra_vars:
        payload["extra_vars"] = extra_vars

    return await make_request(f"{AAP_URL}/job_templates/", method="POST", json=payload)

@mcp.tool()
async def list_inventory_sources() -> Any:
    """List all inventory sources in Ansible Automation Platform."""
    return await make_request(f"{AAP_URL}/inventory_sources/")

@mcp.tool()
async def get_inventory_source(inventory_source_id: int) -> Any:
    """Get details of a specific inventory source."""
    return await make_request(f"{AAP_URL}/inventory_sources/{inventory_source_id}/")

@mcp.tool()
async def create_inventory_source(
    name: str,
    inventory_id: int,
    source: str,
    credential_id: int,
    source_vars: dict = None,
    update_on_launch: bool = True,
    timeout: int = 0,
) -> Any:
    """Create a dynamic inventory source. Claude will ask for the source type and credential before proceeding."""
    valid_sources = [
        "file", "constructed", "scm", "ec2", "gce", "azure_rm", "vmware", "satellite6", "openstack", 
        "rhv", "controller", "insights", "terraform", "openshift_virtualization"
    ]
    
    if source not in valid_sources:
        return f"Error: Invalid source type '{source}'. Please select from: {', '.join(valid_sources)}"
    
    if not credential_id:
        return "Error: Credential is required to create an inventory source."
    
    payload = {
        "name": name,
        "inventory": inventory_id,
        "source": source,
        "credential": credential_id,
        "source_vars": source_vars,
        "update_on_launch": update_on_launch,
        "timeout": timeout,
    }
    return await make_request(f"{AAP_URL}/inventory_sources/", method="POST", json=payload)

@mcp.tool()
async def update_inventory_source(inventory_source_id: int, update_data: dict) -> Any:
    """Update an existing inventory source."""
    return await make_request(f"{AAP_URL}/inventory_sources/{inventory_source_id}/", method="PATCH", json=update_data)

@mcp.tool()
async def delete_inventory_source(inventory_source_id: int) -> Any:
    """Delete an inventory source."""
    return await make_request(f"{AAP_URL}/inventory_sources/{inventory_source_id}/", method="DELETE")

@mcp.tool()
async def sync_inventory_source(inventory_source_id: int) -> Any:
    """Manually trigger a sync for an inventory source."""
    return await make_request(f"{AAP_URL}/inventory_sources/{inventory_source_id}/update/", method="POST")

@mcp.tool()
async def create_inventory(
    name: str,
    organization_id: int,
    description: str = "",
    kind: str = "",
    host_filter: str = "",
    variables: dict = None,
    prevent_instance_group_fallback: bool = False,
) -> Any:
    """Create an inventory in Ansible Automation Platform."""
    payload = {
        "name": name,
        "organization": organization_id,
        "description": description,
        "kind": kind,
        "host_filter": host_filter,
        "variables": variables,
        "prevent_instance_group_fallback": prevent_instance_group_fallback,
    }
    return await make_request(f"{AAP_URL}/inventories/", method="POST", json=payload)

@mcp.tool()
async def delete_inventory(inventory_id: int) -> Any:
    """Delete an inventory from Ansible Automation Platform."""
    return await make_request(f"{AAP_URL}/inventories/{inventory_id}/", method="DELETE")

@mcp.tool()
async def list_job_templates() -> Any:
    """List all job templates available in Ansible Automation Platform."""
    return await make_request(f"{AAP_URL}/job_templates/")

@mcp.tool()
async def get_job_template(template_id: int) -> Any:
    """Retrieve details of a specific job template."""
    return await make_request(f"{AAP_URL}/job_templates/{template_id}/")

@mcp.tool()
async def list_jobs() -> Any:
    """List all jobs available in Ansible Automation Platform."""
    return await make_request(f"{AAP_URL}/jobs/")

@mcp.tool()
async def list_recent_jobs(hours: int = 24) -> Any:
    """List all jobs executed in the last specified hours (default 24 hours)."""
    from datetime import datetime, timedelta
    
    time_filter = (datetime.utcnow() - timedelta(hours=hours)).isoformat() + "Z"
    return await make_request(f"{AAP_URL}/jobs/?created__gte={time_filter}")

if __name__ == "__main__":
    mcp.run(transport="stdio")

```

## Step 4: Configuring Claude Desktop to use your MCP Servers

In my particular case, I want to take advantage of two MCP Servers: the Ansible MCP Server above and the Kubernetes MCP Server that I found within the [quarkus-mcp-servers](https://github.com/quarkiverse/quarkus-mcp-servers/tree/main/kubernetes) Git repo

Open the `claude_desktop_config.json` , which on MacOS is located at

```
~/Library/Application\ Support/Claude/claude_desktop_config.json
```

```
{
  "mcpServers": {
    "ansible": {
        "command": "/absolute/path/to/uv",
        "args": [
            "--directory",
            "/absolute/path/to/ansible_mcp",
            "run",
            "ansible.py"
        ],
        "env": {
            "AAP_TOKEN": "<aap-token>",
            "AAP_URL": "https://<aap-url>/api/controller/v2"
        }
    },
    "kubernetes": {
      "command": "jbang",
      "args": [
        "--quiet",
        "https://github.com/quarkiverse/quarkus-mcp-servers/blob/main/kubernetes/src/main/java/io/quarkiverse/mcp/servers/kubernetes/MCPServerKubernetes.java"
      ]
    }
  }
}
```
Save the file.

WARNING: Absolute path to your `uv` binary is required. Do a `which uv` on your system to get the full path. 

NOTE: If you need to create the AAP_TOKEN, go to the AAP Dashboard, select Access Management -> Users -> <your_user> -> Tokens -> Create token -> Select the Scope dropdown and select 'Write' and click Create token.



## Step 5: Re-Launch Claude Desktop 

If you already had Claude Desktop open, relaunch it, otherwise make sure Claude Desktop is picking up the MCP servers. You can verify this by ensuring the hammer icon is launched.

![Screenshot 2025-02-26 at 3 46 30 PM](https://github.com/user-attachments/assets/064e2edb-dfaa-4250-8a82-d8e59c21644f)

NOTE: The number next to the hammer will vary based up on the amount of MCP tools available. 

Once you click on the hammer icon, you can see a list of tools. Below is an example.

![Screenshot 2025-02-26 at 3 50 23 PM](https://github.com/user-attachments/assets/78ae7be0-e1a6-4fbb-8520-4d57a6563bbe)

## Step 6: Test your Environment

Now with everything setup, see if you can interact with your Ansible Automation Platform and OpenShift cluster. 

Feel free to ask it questions such as:

* How many Job Templates are available?
* How many VMs are on my OpenShift cluster?

NOTE: It is very likely you will need to take advantage of the Claude Desktop Pro Plan in order to get the full functionality. 

## References

[Claude Desktop Quickstart for Server Developers](https://modelcontextprotocol.io/quickstart/server)


## BONUS: Adding Event Driven Ansible MCP Server

If you have setup Event Driven Ansible, you can take advantage of the Event Driven Ansible MCP Server below. The instructions are similar to the above. 

* Create an `eda.py` and store it in your `/absolute/path/to/ansible_mcp`
* Update your `claude_desktop_config.json`
* Restart your Claude Desktop and verify the hammer has picked up your new MCP tools

The two files are listed below for easy copy/paste.

### claude_desktop_config.json
```
{
  "mcpServers": {
    "ansible": {
        "command": "/absolute/path/to/uv",
        "args": [
            "--directory",
            "/absolute/path/to/ansible_mcp",
            "run",
            "ansible.py"
        ],
        "env": {
            "AAP_TOKEN": "<aap-token>",
            "AAP_URL": "https://<aap-url>/api/controller/v2"
        }
    },
    "kubernetes": {
      "command": "jbang",
      "args": [
        "--quiet",
        "https://github.com/quarkiverse/quarkus-mcp-servers/blob/main/kubernetes/src/main/java/io/quarkiverse/mcp/servers/kubernetes/MCPServerKubernetes.java"
      ]
    },
    "eda": {
        "command": "/absolute/path/to/uv",
        "args": [
            "--directory",
            "/absolute/path/to/ansible_mcp",
            "run",
            "eda.py"
        ],
        "env": {
            "EDA_TOKEN": "<EDA_TOKEN>",
            "EDA_URL": "https://<aap-url>/api/eda/v1"
        }
    }
  }
}
```

WARNING: Absolute path to your `uv` binary is required. Do a `which uv` on your system to get the full path. 

NOTE: An EDA Token can be generated from the AAP Dashboard. 

### eda.py MCP Server 

```
import os
import httpx
from mcp.server.fastmcp import FastMCP
from typing import Any, Dict

# Environment variables for authentication
EDA_URL = os.getenv("EDA_URL")
EDA_TOKEN = os.getenv("EDA_TOKEN")

if not EDA_TOKEN:
    raise ValueError("EDA_TOKEN is required")

# Headers for API authentication
HEADERS = {
    "Authorization": f"Bearer {EDA_TOKEN}",
    "Content-Type": "application/json"
}

# Initialize FastMCP
mcp = FastMCP("eda")

async def make_request(url: str, method: str = "GET", json: Dict = None) -> Any:
    """Helper function to make authenticated API requests to EDA."""
    async with httpx.AsyncClient() as client:
        response = await client.request(method, url, headers=HEADERS, json=json)
    if response.status_code not in [200, 201, 204]:
        return f"Error {response.status_code}: {response.text}"
    return response.json() if "application/json" in response.headers.get("Content-Type", "") else response.text

@mcp.tool()
async def list_activations() -> Any:
    """List all activations in Event-Driven Ansible."""
    return await make_request(f"{EDA_URL}/activations/")

@mcp.tool()
async def get_activation(activation_id: int) -> Any:
    """Get details of a specific activation."""
    return await make_request(f"{EDA_URL}/activations/{activation_id}/")

@mcp.tool()
async def create_activation(payload: Dict) -> Any:
    """Create a new activation."""
    return await make_request(f"{EDA_URL}/activations/", method="POST", json=payload)

@mcp.tool()
async def disable_activation(activation_id: int) -> Any:
    """Disable an activation."""
    return await make_request(f"{EDA_URL}/activations/{activation_id}/disable/", method="POST")

@mcp.tool()
async def enable_activation(activation_id: int) -> Any:
    """Enable an activation."""
    return await make_request(f"{EDA_URL}/activations/{activation_id}/enable/", method="POST")

@mcp.tool()
async def restart_activation(activation_id: int) -> Any:
    """Restart an activation."""
    return await make_request(f"{EDA_URL}/activations/{activation_id}/restart/", method="POST")

@mcp.tool()
async def delete_activation(activation_id: int) -> Any:
    """Delete an activation."""
    return await make_request(f"{EDA_URL}/activations/{activation_id}/", method="DELETE")

@mcp.tool()
async def list_decision_environments() -> Any:
    """List all decision environments."""
    return await make_request(f"{EDA_URL}/decision-environments/")

@mcp.tool()
async def create_decision_environment(payload: Dict) -> Any:
    """Create a new decision environment."""
    return await make_request(f"{EDA_URL}/decision-environments/", method="POST", json=payload)

@mcp.tool()
async def list_rulebooks() -> Any:
    """List all rulebooks in EDA."""
    return await make_request(f"{EDA_URL}/rulebooks/")

@mcp.tool()
async def get_rulebook(rulebook_id: int) -> Any:
    """Retrieve details of a specific rulebook."""
    return await make_request(f"{EDA_URL}/rulebooks/{rulebook_id}/")

@mcp.tool()
async def list_event_streams() -> Any:
    """List all event streams."""
    return await make_request(f"{EDA_URL}/event-streams/")

if __name__ == "__main__":
    mcp.run(transport="stdio")
```


```

--------------------------------------------------------------------------------
/claude_desktop_config.json:
--------------------------------------------------------------------------------

```json
{
  "mcpServers": {
    "ansible": {
        "command": "/absolute/path/to/uv",
        "args": [
            "--directory",
            "/absolute/path/to/ansible_mcp",
            "run",
            "ansible.py"
        ],
        "env": {
            "AAP_TOKEN": "<aap-token>",
            "AAP_URL": "https://<my-automation-controller>/api/controller/v2"
        }
    },
    "kubernetes": {
      "command": "jbang",
      "args": [
        "--quiet",
        "https://github.com/quarkiverse/quarkus-mcp-servers/blob/main/kubernetes/src/main/java/io/quarkiverse/mcp/servers/kubernetes/MCPServerKubernetes.java"
      ]
    }
  }
}

```

--------------------------------------------------------------------------------
/mcp_server/deployment.yaml:
--------------------------------------------------------------------------------

```yaml
kind: Deployment
apiVersion: apps/v1
metadata:
  name: ansible-mcp-server
spec:
  selector:
    matchLabels:
      app: ansible-mcp-server
  replicas: 1
  template:
    metadata:
      labels:
        app: ansible-mcp-server
    spec:
      containers:
      - name: ansible-mcp-server
        image: quay.io/rcook/ansible-mcp:amd
        ports:
          - containerPort: 8000
            protocol: TCP
          - containerPort: 8080
            protocol: TCP
        env:
          - name: AAP_TOKEN
            valueFrom:
              secretKeyRef:
                name: aap
                key: token
          - name: AAP_URL
            valueFrom:
              secretKeyRef:
                name: aap
                key: url
        resources: {}

```

--------------------------------------------------------------------------------
/eda.py:
--------------------------------------------------------------------------------

```python
import os
import httpx
from mcp.server.fastmcp import FastMCP
from typing import Any, Dict

# Environment variables for authentication
EDA_URL = os.getenv("EDA_URL")
EDA_TOKEN = os.getenv("EDA_TOKEN")

if not EDA_TOKEN:
    raise ValueError("EDA_TOKEN environment variable is required")

# Headers for API authentication
HEADERS = {
    "Authorization": f"Bearer {EDA_TOKEN}",
    "Content-Type": "application/json"
}

# Initialize FastMCP
mcp = FastMCP("eda")

async def make_request(url: str, method: str = "GET", json: Dict = None) -> Any:
    """Helper function to make authenticated API requests to EDA."""
    async with httpx.AsyncClient() as client:
        response = await client.request(method, url, headers=HEADERS, json=json)
    if response.status_code not in [200, 201, 204]:
        return f"Error {response.status_code}: {response.text}"
    return response.json() if "application/json" in response.headers.get("Content-Type", "") else response.text

@mcp.tool()
async def list_activations() -> Any:
    """List all activations in Event-Driven Ansible."""
    return await make_request(f"{EDA_URL}/activations/")

@mcp.tool()
async def get_activation(activation_id: int) -> Any:
    """Get details of a specific activation."""
    return await make_request(f"{EDA_URL}/activations/{activation_id}/")

@mcp.tool()
async def create_activation(payload: Dict) -> Any:
    """Create a new activation."""
    return await make_request(f"{EDA_URL}/activations/", method="POST", json=payload)

@mcp.tool()
async def disable_activation(activation_id: int) -> Any:
    """Disable an activation."""
    return await make_request(f"{EDA_URL}/activations/{activation_id}/disable/", method="POST")

@mcp.tool()
async def enable_activation(activation_id: int) -> Any:
    """Enable an activation."""
    return await make_request(f"{EDA_URL}/activations/{activation_id}/enable/", method="POST")

@mcp.tool()
async def restart_activation(activation_id: int) -> Any:
    """Restart an activation."""
    return await make_request(f"{EDA_URL}/activations/{activation_id}/restart/", method="POST")

@mcp.tool()
async def delete_activation(activation_id: int) -> Any:
    """Delete an activation."""
    return await make_request(f"{EDA_URL}/activations/{activation_id}/", method="DELETE")

@mcp.tool()
async def list_decision_environments() -> Any:
    """List all decision environments."""
    return await make_request(f"{EDA_URL}/decision-environments/")

@mcp.tool()
async def create_decision_environment(payload: Dict) -> Any:
    """Create a new decision environment."""
    return await make_request(f"{EDA_URL}/decision-environments/", method="POST", json=payload)

@mcp.tool()
async def list_rulebooks() -> Any:
    """List all rulebooks in EDA."""
    return await make_request(f"{EDA_URL}/rulebooks/")

@mcp.tool()
async def get_rulebook(rulebook_id: int) -> Any:
    """Retrieve details of a specific rulebook."""
    return await make_request(f"{EDA_URL}/rulebooks/{rulebook_id}/")

@mcp.tool()
async def list_event_streams() -> Any:
    """List all event streams."""
    return await make_request(f"{EDA_URL}/event-streams/")

if __name__ == "__main__":
    mcp.run(transport="stdio")


```

--------------------------------------------------------------------------------
/ansible.py:
--------------------------------------------------------------------------------

```python
import os
import httpx
from mcp.server.fastmcp import FastMCP
from typing import Any

# Environment variables for authentication
AAP_URL = os.getenv("AAP_URL")
AAP_TOKEN = os.getenv("AAP_TOKEN")

if not AAP_TOKEN:
    raise ValueError("AAP_TOKEN is required")

# Headers for API authentication
HEADERS = {
    "Authorization": f"Bearer {AAP_TOKEN}",
    "Content-Type": "application/json"
}

# Initialize FastMCP
mcp = FastMCP("ansible")

async def make_request(url: str, method: str = "GET", json: dict = None) -> Any:
    """Helper function to make authenticated API requests to AAP."""
    async with httpx.AsyncClient() as client:
        response = await client.request(method, url, headers=HEADERS, json=json)
    if response.status_code not in [200, 201]:
        return f"Error {response.status_code}: {response.text}"
    return response.json() if "application/json" in response.headers.get("Content-Type", "") else response.text

@mcp.tool()
async def list_inventories() -> Any:
    """List all inventories in Ansible Automation Platform."""
    return await make_request(f"{AAP_URL}/inventories/")

@mcp.tool()
async def get_inventory(inventory_id: str) -> Any:
    """Get details of a specific inventory by ID."""
    return await make_request(f"{AAP_URL}/inventories/{inventory_id}/")

@mcp.tool()
async def run_job(template_id: int, extra_vars: dict = {}) -> Any:
    """Run a job template by ID, optionally with extra_vars."""
    return await make_request(f"{AAP_URL}/job_templates/{template_id}/launch/", method="POST", json={"extra_vars": extra_vars})

@mcp.tool()
async def job_status(job_id: int) -> Any:
    """Check the status of a job by ID."""
    return await make_request(f"{AAP_URL}/jobs/{job_id}/")

@mcp.tool()
async def job_logs(job_id: int) -> str:
    """Retrieve logs for a job."""
    return await make_request(f"{AAP_URL}/jobs/{job_id}/stdout/")

@mcp.tool()
async def create_project(
    name: str,
    organization_id: int,
    source_control_url: str,
    source_control_type: str = "git",
    description: str = "",
    execution_environment_id: int = None,
    content_signature_validation_credential_id: int = None,
    source_control_branch: str = "",
    source_control_refspec: str = "",
    source_control_credential_id: int = None,
    clean: bool = False,
    update_revision_on_launch: bool = False,
    delete: bool = False,
    allow_branch_override: bool = False,
    track_submodules: bool = False,
) -> Any:
    """Create a new project in Ansible Automation Platform."""

    payload = {
        "name": name,
        "description": description,
        "organization": organization_id,
        "scm_type": source_control_type.lower(),  # Git is default
        "scm_url": source_control_url,
        "scm_branch": source_control_branch,
        "scm_refspec": source_control_refspec,
        "scm_clean": clean,
        "scm_delete_on_update": delete,
        "scm_update_on_launch": update_revision_on_launch,
        "allow_override": allow_branch_override,
        "scm_track_submodules": track_submodules,
    }

    if execution_environment_id:
        payload["execution_environment"] = execution_environment_id
    if content_signature_validation_credential_id:
        payload["signature_validation_credential"] = content_signature_validation_credential_id
    if source_control_credential_id:
        payload["credential"] = source_control_credential_id

    return await make_request(f"{AAP_URL}/projects/", method="POST", json=payload)

@mcp.tool()
async def create_job_template(
    name: str,
    project_id: int,
    playbook: str,
    inventory_id: int,
    job_type: str = "run",
    description: str = "",
    credential_id: int = None,
    execution_environment_id: int = None,
    labels: list[str] = None,
    forks: int = 0,
    limit: str = "",
    verbosity: int = 0,
    timeout: int = 0,
    job_tags: list[str] = None,
    skip_tags: list[str] = None,
    extra_vars: dict = None,
    privilege_escalation: bool = False,
    concurrent_jobs: bool = False,
    provisioning_callback: bool = False,
    enable_webhook: bool = False,
    prevent_instance_group_fallback: bool = False,
) -> Any:
    """Create a new job template in Ansible Automation Platform."""

    payload = {
        "name": name,
        "description": description,
        "job_type": job_type,
        "project": project_id,
        "playbook": playbook,
        "inventory": inventory_id,
        "forks": forks,
        "limit": limit,
        "verbosity": verbosity,
        "timeout": timeout,
        "ask_variables_on_launch": bool(extra_vars),
        "ask_tags_on_launch": bool(job_tags),
        "ask_skip_tags_on_launch": bool(skip_tags),
        "ask_credential_on_launch": credential_id is None,
        "ask_execution_environment_on_launch": execution_environment_id is None,
        "ask_labels_on_launch": labels is None,
        "ask_inventory_on_launch": False,  # Inventory is required, so not prompting
        "ask_job_type_on_launch": False,  # Job type is required, so not prompting
        "become_enabled": privilege_escalation,
        "allow_simultaneous": concurrent_jobs,
        "scm_branch": "",
        "webhook_service": "github" if enable_webhook else "",
        "prevent_instance_group_fallback": prevent_instance_group_fallback,
    }

    if credential_id:
        payload["credential"] = credential_id
    if execution_environment_id:
        payload["execution_environment"] = execution_environment_id
    if labels:
        payload["labels"] = labels
    if job_tags:
        payload["job_tags"] = job_tags
    if skip_tags:
        payload["skip_tags"] = skip_tags
    if extra_vars:
        payload["extra_vars"] = extra_vars

    return await make_request(f"{AAP_URL}/job_templates/", method="POST", json=payload)

@mcp.tool()
async def list_inventory_sources() -> Any:
    """List all inventory sources in Ansible Automation Platform."""
    return await make_request(f"{AAP_URL}/inventory_sources/")

@mcp.tool()
async def get_inventory_source(inventory_source_id: int) -> Any:
    """Get details of a specific inventory source."""
    return await make_request(f"{AAP_URL}/inventory_sources/{inventory_source_id}/")

@mcp.tool()
async def create_inventory_source(
    name: str,
    inventory_id: int,
    source: str,
    credential_id: int,
    source_vars: dict = None,
    update_on_launch: bool = True,
    timeout: int = 0,
) -> Any:
    """Create a dynamic inventory source. Claude will ask for the source type and credential before proceeding."""
    valid_sources = [
        "file", "constructed", "scm", "ec2", "gce", "azure_rm", "vmware", "satellite6", "openstack", 
        "rhv", "controller", "insights", "terraform", "openshift_virtualization"
    ]
    
    if source not in valid_sources:
        return f"Error: Invalid source type '{source}'. Please select from: {', '.join(valid_sources)}"
    
    if not credential_id:
        return "Error: Credential is required to create an inventory source."
    
    payload = {
        "name": name,
        "inventory": inventory_id,
        "source": source,
        "credential": credential_id,
        "source_vars": source_vars,
        "update_on_launch": update_on_launch,
        "timeout": timeout,
    }
    return await make_request(f"{AAP_URL}/inventory_sources/", method="POST", json=payload)

@mcp.tool()
async def update_inventory_source(inventory_source_id: int, update_data: dict) -> Any:
    """Update an existing inventory source."""
    return await make_request(f"{AAP_URL}/inventory_sources/{inventory_source_id}/", method="PATCH", json=update_data)

@mcp.tool()
async def delete_inventory_source(inventory_source_id: int) -> Any:
    """Delete an inventory source."""
    return await make_request(f"{AAP_URL}/inventory_sources/{inventory_source_id}/", method="DELETE")

@mcp.tool()
async def sync_inventory_source(inventory_source_id: int) -> Any:
    """Manually trigger a sync for an inventory source."""
    return await make_request(f"{AAP_URL}/inventory_sources/{inventory_source_id}/update/", method="POST")

@mcp.tool()
async def create_inventory(
    name: str,
    organization_id: int,
    description: str = "",
    kind: str = "",
    host_filter: str = "",
    variables: dict = None,
    prevent_instance_group_fallback: bool = False,
) -> Any:
    """Create an inventory in Ansible Automation Platform."""
    payload = {
        "name": name,
        "organization": organization_id,
        "description": description,
        "kind": kind,
        "host_filter": host_filter,
        "variables": variables,
        "prevent_instance_group_fallback": prevent_instance_group_fallback,
    }
    return await make_request(f"{AAP_URL}/inventories/", method="POST", json=payload)

@mcp.tool()
async def delete_inventory(inventory_id: int) -> Any:
    """Delete an inventory from Ansible Automation Platform."""
    return await make_request(f"{AAP_URL}/inventories/{inventory_id}/", method="DELETE")

@mcp.tool()
async def list_job_templates() -> Any:
    """List all job templates available in Ansible Automation Platform."""
    return await make_request(f"{AAP_URL}/job_templates/")

@mcp.tool()
async def get_job_template(template_id: int) -> Any:
    """Retrieve details of a specific job template."""
    return await make_request(f"{AAP_URL}/job_templates/{template_id}/")

@mcp.tool()
async def list_jobs() -> Any:
    """List all jobs available in Ansible Automation Platform."""
    return await make_request(f"{AAP_URL}/jobs/")

@mcp.tool()
async def list_recent_jobs(hours: int = 24) -> Any:
    """List all jobs executed in the last specified hours (default 24 hours)."""
    from datetime import datetime, timedelta
    
    time_filter = (datetime.utcnow() - timedelta(hours=hours)).isoformat() + "Z"
    return await make_request(f"{AAP_URL}/jobs/?created__gte={time_filter}")

if __name__ == "__main__":
    mcp.run(transport="stdio")

```

--------------------------------------------------------------------------------
/mcp_server/ansible.py:
--------------------------------------------------------------------------------

```python
import os
import httpx
from mcp.server.fastmcp import FastMCP
from typing import Any

# Environment variables for authentication
AAP_URL = os.getenv("AAP_URL")
AAP_TOKEN = os.getenv("AAP_TOKEN")

if not AAP_TOKEN:
    raise ValueError("AAP_TOKEN is required")

# Headers for API authentication
HEADERS = {
    "Authorization": f"Bearer {AAP_TOKEN}",
    "Content-Type": "application/json"
}

# Initialize FastMCP
mcp = FastMCP("ansible")

async def make_request(url: str, method: str = "GET", json: dict = None) -> Any:
    """Helper function to make authenticated API requests to AAP."""
    async with httpx.AsyncClient() as client:
        response = await client.request(method, url, headers=HEADERS, json=json)
    if response.status_code not in [200, 201]:
        return f"Error {response.status_code}: {response.text}"
    return response.json() if "application/json" in response.headers.get("Content-Type", "") else response.text

@mcp.tool()
async def list_inventories() -> Any:
    """List all inventories in Ansible Automation Platform."""
    return await make_request(f"{AAP_URL}/inventories/")

@mcp.tool()
async def get_inventory(inventory_id: str) -> Any:
    """Get details of a specific inventory by ID."""
    return await make_request(f"{AAP_URL}/inventories/{inventory_id}/")

@mcp.tool()
async def run_job(template_id: int, extra_vars: dict = {}) -> Any:
    """Run a job template by ID, optionally with extra_vars."""
    return await make_request(f"{AAP_URL}/job_templates/{template_id}/launch/", method="POST", json={"extra_vars": extra_vars})

@mcp.tool()
async def job_status(job_id: int) -> Any:
    """Check the status of a job by ID."""
    return await make_request(f"{AAP_URL}/jobs/{job_id}/")

@mcp.tool()
async def job_logs(job_id: int) -> str:
    """Retrieve logs for a job."""
    return await make_request(f"{AAP_URL}/jobs/{job_id}/stdout/")

@mcp.tool()
async def create_project(
    name: str,
    organization_id: int,
    source_control_url: str,
    source_control_type: str = "git",
    description: str = "",
    execution_environment_id: int = None,
    content_signature_validation_credential_id: int = None,
    source_control_branch: str = "",
    source_control_refspec: str = "",
    source_control_credential_id: int = None,
    clean: bool = False,
    update_revision_on_launch: bool = False,
    delete: bool = False,
    allow_branch_override: bool = False,
    track_submodules: bool = False,
) -> Any:
    """Create a new project in Ansible Automation Platform."""

    payload = {
        "name": name,
        "description": description,
        "organization": organization_id,
        "scm_type": source_control_type.lower(),  # Git is default
        "scm_url": source_control_url,
        "scm_branch": source_control_branch,
        "scm_refspec": source_control_refspec,
        "scm_clean": clean,
        "scm_delete_on_update": delete,
        "scm_update_on_launch": update_revision_on_launch,
        "allow_override": allow_branch_override,
        "scm_track_submodules": track_submodules,
    }

    if execution_environment_id:
        payload["execution_environment"] = execution_environment_id
    if content_signature_validation_credential_id:
        payload["signature_validation_credential"] = content_signature_validation_credential_id
    if source_control_credential_id:
        payload["credential"] = source_control_credential_id

    return await make_request(f"{AAP_URL}/projects/", method="POST", json=payload)

@mcp.tool()
async def create_job_template(
    name: str,
    project_id: int,
    playbook: str,
    inventory_id: int,
    job_type: str = "run",
    description: str = "",
    credential_id: int = None,
    execution_environment_id: int = None,
    labels: list[str] = None,
    forks: int = 0,
    limit: str = "",
    verbosity: int = 0,
    timeout: int = 0,
    job_tags: list[str] = None,
    skip_tags: list[str] = None,
    extra_vars: dict = None,
    privilege_escalation: bool = False,
    concurrent_jobs: bool = False,
    provisioning_callback: bool = False,
    enable_webhook: bool = False,
    prevent_instance_group_fallback: bool = False,
) -> Any:
    """Create a new job template in Ansible Automation Platform."""

    payload = {
        "name": name,
        "description": description,
        "job_type": job_type,
        "project": project_id,
        "playbook": playbook,
        "inventory": inventory_id,
        "forks": forks,
        "limit": limit,
        "verbosity": verbosity,
        "timeout": timeout,
        "ask_variables_on_launch": bool(extra_vars),
        "ask_tags_on_launch": bool(job_tags),
        "ask_skip_tags_on_launch": bool(skip_tags),
        "ask_credential_on_launch": credential_id is None,
        "ask_execution_environment_on_launch": execution_environment_id is None,
        "ask_labels_on_launch": labels is None,
        "ask_inventory_on_launch": False,  # Inventory is required, so not prompting
        "ask_job_type_on_launch": False,  # Job type is required, so not prompting
        "become_enabled": privilege_escalation,
        "allow_simultaneous": concurrent_jobs,
        "scm_branch": "",
        "webhook_service": "github" if enable_webhook else "",
        "prevent_instance_group_fallback": prevent_instance_group_fallback,
    }

    if credential_id:
        payload["credential"] = credential_id
    if execution_environment_id:
        payload["execution_environment"] = execution_environment_id
    if labels:
        payload["labels"] = labels
    if job_tags:
        payload["job_tags"] = job_tags
    if skip_tags:
        payload["skip_tags"] = skip_tags
    if extra_vars:
        payload["extra_vars"] = extra_vars

    return await make_request(f"{AAP_URL}/job_templates/", method="POST", json=payload)

@mcp.tool()
async def list_inventory_sources() -> Any:
    """List all inventory sources in Ansible Automation Platform."""
    return await make_request(f"{AAP_URL}/inventory_sources/")

@mcp.tool()
async def get_inventory_source(inventory_source_id: int) -> Any:
    """Get details of a specific inventory source."""
    return await make_request(f"{AAP_URL}/inventory_sources/{inventory_source_id}/")

@mcp.tool()
async def create_inventory_source(
    name: str,
    inventory_id: int,
    source: str,
    credential_id: int,
    source_vars: dict = None,
    update_on_launch: bool = True,
    timeout: int = 0,
) -> Any:
    """Create a dynamic inventory source. Claude will ask for the source type and credential before proceeding."""
    valid_sources = [
        "file", "constructed", "scm", "ec2", "gce", "azure_rm", "vmware", "satellite6", "openstack", 
        "rhv", "controller", "insights", "terraform", "openshift_virtualization"
    ]
    
    if source not in valid_sources:
        return f"Error: Invalid source type '{source}'. Please select from: {', '.join(valid_sources)}"
    
    if not credential_id:
        return "Error: Credential is required to create an inventory source."
    
    payload = {
        "name": name,
        "inventory": inventory_id,
        "source": source,
        "credential": credential_id,
        "source_vars": source_vars,
        "update_on_launch": update_on_launch,
        "timeout": timeout,
    }
    return await make_request(f"{AAP_URL}/inventory_sources/", method="POST", json=payload)

@mcp.tool()
async def update_inventory_source(inventory_source_id: int, update_data: dict) -> Any:
    """Update an existing inventory source."""
    return await make_request(f"{AAP_URL}/inventory_sources/{inventory_source_id}/", method="PATCH", json=update_data)

@mcp.tool()
async def delete_inventory_source(inventory_source_id: int) -> Any:
    """Delete an inventory source."""
    return await make_request(f"{AAP_URL}/inventory_sources/{inventory_source_id}/", method="DELETE")

@mcp.tool()
async def sync_inventory_source(inventory_source_id: int) -> Any:
    """Manually trigger a sync for an inventory source."""
    return await make_request(f"{AAP_URL}/inventory_sources/{inventory_source_id}/update/", method="POST")

@mcp.tool()
async def create_inventory(
    name: str,
    organization_id: int,
    description: str = "",
    kind: str = "",
    host_filter: str = "",
    variables: dict = None,
    prevent_instance_group_fallback: bool = False,
) -> Any:
    """Create an inventory in Ansible Automation Platform."""
    payload = {
        "name": name,
        "organization": organization_id,
        "description": description,
        "kind": kind,
        "host_filter": host_filter,
        "variables": variables,
        "prevent_instance_group_fallback": prevent_instance_group_fallback,
    }
    return await make_request(f"{AAP_URL}/inventories/", method="POST", json=payload)

@mcp.tool()
async def delete_inventory(inventory_id: int) -> Any:
    """Delete an inventory from Ansible Automation Platform."""
    return await make_request(f"{AAP_URL}/inventories/{inventory_id}/", method="DELETE")

@mcp.tool()
async def list_job_templates() -> Any:
    """List all job templates available in Ansible Automation Platform."""
    return await make_request(f"{AAP_URL}/job_templates/")

@mcp.tool()
async def get_job_template(template_id: int) -> Any:
    """Retrieve details of a specific job template."""
    return await make_request(f"{AAP_URL}/job_templates/{template_id}/")

@mcp.tool()
async def list_jobs() -> Any:
    """List all jobs available in Ansible Automation Platform."""
    return await make_request(f"{AAP_URL}/jobs/")

@mcp.tool()
async def list_recent_jobs(hours: int = 24) -> Any:
    """List all jobs executed in the last specified hours (default 24 hours)."""
    from datetime import datetime, timedelta
    
    time_filter = (datetime.utcnow() - timedelta(hours=hours)).isoformat() + "Z"
    return await make_request(f"{AAP_URL}/jobs/?created__gte={time_filter}")

if __name__ == "__main__":
    # Change from stdio to sse transport
    mcp.run(transport="sse")

```

--------------------------------------------------------------------------------
/app.py:
--------------------------------------------------------------------------------

```python
import streamlit as st
from llama_stack_client import LlamaStackClient
from llama_stack_client.lib.agents.agent import Agent
from llama_stack_client.types.agent_create_params import AgentConfig
import os
import json
from datetime import datetime

# Initialize LlamaStack client
base_url = os.getenv("BASE_URL", "http://localhost:8321") 
client = LlamaStackClient(base_url=base_url)

# Page configuration
st.set_page_config(
    page_title="Llama-stack Chat",
    page_icon="🦙",
    layout="wide",
)

# Styling for code blocks
st.markdown("""
<style>
    code {
        background-color: #f0f2f6;
        border-radius: 3px;
        padding: 0.2em 0.4em;
    }
    pre {
        background-color: #f0f2f6;
        border-radius: 5px;
        padding: 0.5em;
    }
    .chat-message {
        padding: 1.5rem;
        border-radius: 0.5rem;
        margin-bottom: 1rem;
    }
    .blinking-cursor {
        animation: blink 1s step-end infinite;
    }
    @keyframes blink {
        50% { opacity: 0; }
    }
</style>
""", unsafe_allow_html=True)

# Get providers with valid credentials (simplified approach)
@st.cache_data(ttl=300)
def get_configured_providers():
    try:
        # This is a simplified approach that assumes Anthropic is configured
        # In a real implementation, you would check LLama Stack's configuration
        return ["anthropic"]
    except Exception as e:
        st.error(f"Error checking configured providers: {e}")
        return ["anthropic"]  # Fallback to Anthropic

# Get available models from configured providers
@st.cache_data(ttl=300)
def get_available_models():
    try:
        models = client.models.list()
        configured_providers = get_configured_providers()
        
        # Filter for LLM models from configured providers
        available_models = [
            model.identifier 
            for model in models 
            if model.model_type == "llm" and model.provider_id in configured_providers
        ]
        
        return available_models
    except Exception as e:
        st.error(f"Error fetching models: {e}")
        return ["anthropic/claude-3-7-sonnet-latest"]  # Fallback default

# Get all available toolgroups
@st.cache_data(ttl=300)
def get_all_toolgroups():
    try:
        toolgroups = client.toolgroups.list()
        return [toolgroup.identifier for toolgroup in toolgroups]
    except Exception as e:
        st.error(f"Error fetching toolgroups: {e}")
        return ["mcp::ansible"]  # Fallback default

# Initialize session state
if "messages" not in st.session_state:
    st.session_state.messages = []
if "system_instruction" not in st.session_state:
    st.session_state.system_instruction = "You are a helpful assistant that can access various tools to help the user."
if "show_save_dialog" not in st.session_state:
    st.session_state.show_save_dialog = False
if "saved_chats" not in st.session_state:
    st.session_state.saved_chats = "{}"
if "chat_updated" not in st.session_state:
    st.session_state.chat_updated = False

# Function to check if there are messages
def has_messages():
    return len(st.session_state.messages) > 0

# Streamlit UI
st.title("Llama-stack Chat")
st.markdown("Chat with LLama Stack and its toolgroups")

# Sidebar configurations
with st.sidebar:
    st.header("Configuration")
    
    # Model selection
    available_models = get_available_models()
    selected_model = st.selectbox("Select Model", available_models)
    
    # Get all toolgroups automatically and display as collapsible
    all_toolgroups = get_all_toolgroups()
    with st.expander(f"{len(all_toolgroups)} Toolgroups Loaded"):
        for toolgroup in all_toolgroups:
            st.caption(f"• {toolgroup}")
    
    # System instructions
    with st.expander("System Instructions", expanded=False):
        new_instruction = st.text_area(
            "Customize how the assistant behaves:", 
            st.session_state.system_instruction,
            height=100
        )
        if new_instruction != st.session_state.system_instruction:
            st.session_state.system_instruction = new_instruction
            st.toast("System instructions updated")
    
    # Collapsible Query Context section
    with st.expander("Query Context", expanded=False):
        query_context = st.text_area("Add background information for this query:", "", height=150)
        st.caption("This information will be included with each of your queries but won't be visible in the chat.")
    
    # Temperature in collapsible
    with st.expander("Temperature", expanded=False):
        temperature = st.slider("Temperature", min_value=0.0, max_value=1.0, value=0.7, step=0.1)
        top_p = st.slider("Top P", min_value=0.0, max_value=1.0, value=0.9, step=0.1)
    
    # Chat history management
    st.header("Chat Management")
    
    # Using two separate buttons instead of columns for better visibility
    clear_col, save_col = st.columns(2)
    
    # Clear chat button
    with clear_col:
        if st.button("🗑️ Clear Chat", key="clear_chat"):
            st.session_state.messages = []
            st.session_state.chat_updated = True
            st.rerun()
    
    # Save button - disabled when no messages
    with save_col:
        if st.button("💾 Save Chat", key="save_chat", disabled=not has_messages()):
            st.session_state.show_save_dialog = True
    
    # Save dialog - shown when save button is clicked
    if st.session_state.show_save_dialog:
        st.text_input("Conversation name:", key="save_name")
        save_confirm, cancel = st.columns(2)
        
        with save_confirm:
            if st.button("Confirm Save", key="confirm_save"):
                if st.session_state.save_name:
                    saved_chats = json.loads(st.session_state.saved_chats)
                    saved_chats[st.session_state.save_name] = st.session_state.messages
                    st.session_state.saved_chats = json.dumps(saved_chats)
                    st.session_state.show_save_dialog = False
                    st.toast(f"Saved conversation: {st.session_state.save_name}")
                    st.rerun()
                else:
                    st.warning("Please enter a name for the conversation")
        
        with cancel:
            if st.button("Cancel", key="cancel_save"):
                st.session_state.show_save_dialog = False
                st.rerun()
    
    # Export chat button - disabled when no messages
    if st.button("📥 Export Chat", key="export_chat", disabled=not has_messages()):
        chat_export = ""
        for msg in st.session_state.messages:
            prefix = "🧑" if msg["role"] == "user" else "🤖"
            chat_export += f"{prefix} **{msg['role'].capitalize()}**: {msg['content']}\n\n"
        
        # Create download link
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"chat_export_{timestamp}.md"
        st.download_button(
            label="Download Chat",
            data=chat_export,
            file_name=filename,
            mime="text/markdown",
            key="download_chat"
        )
    
    # Load saved conversations
    st.header("Saved Conversations")
    
    # Load saved conversation
    saved_chats = json.loads(st.session_state.saved_chats)
    if saved_chats:
        chat_names = list(saved_chats.keys())
        selected_chat = st.selectbox("Select a saved conversation:", [""] + chat_names)
        if selected_chat and st.button("📂 Load Conversation"):
            st.session_state.messages = saved_chats[selected_chat]
            st.session_state.chat_updated = True
            st.toast(f"Loaded conversation: {selected_chat}")
            st.rerun()
    else:
        st.caption("No saved conversations yet")

# Display chat history
for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        # Apply syntax highlighting for code blocks
        content = message["content"]
        st.markdown(content)

# Input for new messages
prompt = st.chat_input("Ask something...")
if prompt:
    full_response = ""
    agent_config = AgentConfig(
        model=selected_model,
        instructions=st.session_state.system_instruction,
        sampling_params={
            "strategy": {"type": "top_p", "temperature": temperature, "top_p": top_p},
        },
        toolgroups=all_toolgroups,  # Use all toolgroups automatically
        tool_choice="auto",
        input_shields=[],
        output_shields=[],
        enable_session_persistence=True,
    )
    
    try:
        agent = Agent(client, agent_config)
        session_id = agent.create_session("chat-session")
        
        # Add user input to chat history
        st.session_state.messages.append({"role": "user", "content": prompt})
        with st.chat_message("user"):
            st.markdown(prompt)

        # Get response from LlamaStack API
        with st.chat_message("assistant"):
            message_placeholder = st.empty()
            
            # Add query context if provided
            user_message = prompt
            if query_context:
                user_message += f"\n\nContext: {query_context}"
            
            response = agent.create_turn(
                messages=[{"role": "user", "content": user_message}],
                session_id=session_id,
            )
         
            for chunk in response:
                if hasattr(chunk, 'event') and hasattr(chunk.event, 'payload'):
                    payload = chunk.event.payload
                    if hasattr(payload, 'event_type') and payload.event_type == "step_progress":
                        if hasattr(payload, 'delta') and hasattr(payload.delta, 'type') and payload.delta.type == "text":
                            full_response += payload.delta.text
                            message_placeholder.markdown(full_response + "▌")
            
            message_placeholder.markdown(full_response)
            st.session_state.messages.append({"role": "assistant", "content": full_response})
            
        # Set flag to indicate chat has been updated
        st.session_state.chat_updated = True
        
        # Force a rerun to update the UI state (including button states)
        st.rerun()
    
    except Exception as e:
        st.error(f"Error: {str(e)}")

# Reset the chat_updated flag after processing
if st.session_state.chat_updated:
    st.session_state.chat_updated = False

```

--------------------------------------------------------------------------------
/redhat_insights_mcp.py:
--------------------------------------------------------------------------------

```python
"""
Red Hat Insights MCP Server

This server requires a Red Hat service account with client credentials.

Setup:
1. Create a service account in Red Hat Console (console.redhat.com)
2. Assign appropriate permissions via User Access → Groups
3. Set environment variables:
   export INSIGHTS_CLIENT_ID="your-client-id"
   export INSIGHTS_CLIENT_SECRET="your-client-secret"
   
Optional:
   export INSIGHTS_BASE_URL="https://console.redhat.com/api"
   export SSO_URL="https://sso.redhat.com/auth/realms/redhat-external/protocol/openid-connect/token"
"""

import os
import httpx
from mcp.server.fastmcp import FastMCP
from typing import Any, Optional
from datetime import datetime, timedelta

# Environment variables for authentication
INSIGHTS_BASE_URL = os.getenv("INSIGHTS_BASE_URL", "https://console.redhat.com/api")
INSIGHTS_CLIENT_ID = os.getenv("INSIGHTS_CLIENT_ID")
INSIGHTS_CLIENT_SECRET = os.getenv("INSIGHTS_CLIENT_SECRET")
SSO_URL = os.getenv("SSO_URL", "https://sso.redhat.com/auth/realms/redhat-external/protocol/openid-connect/token")

if not INSIGHTS_CLIENT_ID or not INSIGHTS_CLIENT_SECRET:
    raise ValueError("INSIGHTS_CLIENT_ID and INSIGHTS_CLIENT_SECRET are required")

# Global variable to store the access token
_access_token = None
_token_expires_at = None

# Initialize FastMCP
mcp = FastMCP("insights")

async def get_access_token() -> str:
    """Get or refresh the access token using client credentials."""
    global _access_token, _token_expires_at
    
    # Check if we have a valid token
    if _access_token and _token_expires_at and datetime.utcnow() < _token_expires_at:
        return _access_token
    
    # Request new token
    async with httpx.AsyncClient() as client:
        response = await client.post(
            SSO_URL,
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            data={
                "grant_type": "client_credentials",
                "scope": "api.console",
                "client_id": INSIGHTS_CLIENT_ID,
                "client_secret": INSIGHTS_CLIENT_SECRET
            }
        )
    
    if response.status_code != 200:
        raise Exception(f"Failed to get access token: {response.status_code} {response.text}")
    
    token_data = response.json()
    _access_token = token_data["access_token"]
    # Set expiration time with some buffer (subtract 60 seconds)
    expires_in = token_data.get("expires_in", 300)  # Default to 5 minutes
    _token_expires_at = datetime.utcnow() + timedelta(seconds=expires_in - 60)
    
    return _access_token

async def make_request(url: str, method: str = "GET", json: dict = None, params: dict = None) -> Any:
    """Helper function to make authenticated API requests to Red Hat Insights."""
    token = await get_access_token()
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }
    
    async with httpx.AsyncClient() as client:
        response = await client.request(method, url, headers=headers, json=json, params=params)
    
    if response.status_code not in [200, 201, 204]:
        return f"Error {response.status_code}: {response.text}"
    
    return response.json() if "application/json" in response.headers.get("Content-Type", "") else response.text

# Authentication Test
@mcp.tool()
async def test_authentication() -> Any:
    """Test authentication with Red Hat Insights using service account credentials."""
    try:
        token = await get_access_token()
        # Test with a simple API call
        result = await make_request(f"{INSIGHTS_BASE_URL}/inventory/v1/hosts?limit=1")
        return {"status": "success", "message": "Authentication successful", "sample_data": result}
    except Exception as e:
        return {"status": "error", "message": f"Authentication failed: {str(e)}"}

# Host Inventory Management Tools
@mcp.tool()
async def list_systems(limit: int = 50, offset: int = 0, display_name: str = None, staleness: str = None) -> Any:
    """List all hosts/systems registered with Red Hat Insights. Use staleness='fresh' or 'stale' to filter."""
    params = {"limit": limit, "offset": offset}
    if display_name:
        params["display_name"] = display_name
    if staleness:
        params["staleness"] = staleness
    return await make_request(f"{INSIGHTS_BASE_URL}/inventory/v1/hosts", params=params)

@mcp.tool()
async def get_system(system_id: str) -> Any:
    """Get details of a specific system by UUID."""
    return await make_request(f"{INSIGHTS_BASE_URL}/inventory/v1/hosts/{system_id}")

@mcp.tool()
async def get_system_profile(system_id: str, fields: list[str] = None) -> Any:
    """Get system profile/facts for a specific system. Specify fields to limit response."""
    url = f"{INSIGHTS_BASE_URL}/inventory/v1/hosts/{system_id}/system_profile"
    params = {}
    if fields:
        for field in fields:
            params[f"fields[system_profile]"] = field
    return await make_request(url, params=params)

@mcp.tool()
async def get_system_tags(system_id: str) -> Any:
    """Get tags for a specific system."""
    return await make_request(f"{INSIGHTS_BASE_URL}/inventory/v1/hosts/{system_id}/tags")

@mcp.tool()
async def delete_system(system_id: str) -> Any:
    """Remove a system from Red Hat Insights inventory."""
    return await make_request(f"{INSIGHTS_BASE_URL}/inventory/v1/hosts/{system_id}", method="DELETE")

# Vulnerability Management Tools
@mcp.tool()
async def list_vulnerabilities(
    limit: int = 50, 
    offset: int = 0,
    affecting: bool = True,
    cvss_score_gte: float = None,
    cvss_score_lte: float = None
) -> Any:
    """List vulnerabilities affecting your systems. Set affecting=True to only show CVEs affecting systems."""
    params = {"limit": limit, "offset": offset}
    if affecting:
        params["affecting"] = "true"
    if cvss_score_gte:
        params["cvss_score_gte"] = cvss_score_gte
    if cvss_score_lte:
        params["cvss_score_lte"] = cvss_score_lte
    return await make_request(f"{INSIGHTS_BASE_URL}/vulnerability/v1/vulnerabilities/cves", params=params)

@mcp.tool()
async def get_vulnerability_executive_report() -> Any:
    """Get executive vulnerability report with CVE summaries by severity."""
    return await make_request(f"{INSIGHTS_BASE_URL}/vulnerability/v1/report/executive")

# Patch Management Tools
@mcp.tool()
async def list_advisories(
    limit: int = 50,
    offset: int = 0,
    advisory_type: str = None,
    severity: str = None
) -> Any:
    """List available advisories (patches). Export format from patch/v3."""
    params = {"limit": limit, "offset": offset}
    if advisory_type:
        params["advisory_type"] = advisory_type
    if severity:
        params["severity"] = severity
    return await make_request(f"{INSIGHTS_BASE_URL}/patch/v3/export/advisories", params=params)

# Compliance Tools
@mcp.tool()
async def list_compliance_policies(limit: int = 50, offset: int = 0) -> Any:
    """List SCAP compliance policies."""
    params = {"limit": limit, "offset": offset}
    return await make_request(f"{INSIGHTS_BASE_URL}/compliance/v2/policies", params=params)

@mcp.tool()
async def list_compliance_systems(assigned_or_scanned: bool = True) -> Any:
    """List systems associated with SCAP policies."""
    params = {}
    if assigned_or_scanned:
        params["filter"] = "assigned_or_scanned=true"
    return await make_request(f"{INSIGHTS_BASE_URL}/compliance/v2/systems", params=params)

@mcp.tool()
async def associate_compliance_policy(policy_id: str, system_id: str) -> Any:
    """Associate a system with a SCAP compliance policy."""
    return await make_request(f"{INSIGHTS_BASE_URL}/compliance/v2/policies/{policy_id}/systems/{system_id}", method="PATCH")

@mcp.tool()
async def list_compliance_reports(limit: int = 50, offset: int = 0) -> Any:
    """List all compliance reports."""
    params = {"limit": limit, "offset": offset}
    return await make_request(f"{INSIGHTS_BASE_URL}/compliance/v2/reports", params=params)

# Recommendations and Advisor Tools
@mcp.tool()
async def list_recommendations(
    category: str = None,
    impact: str = None,
    limit: int = 50,
    offset: int = 0
) -> Any:
    """List available recommendation rules from Advisor."""
    params = {"limit": limit, "offset": offset}
    if category:
        params["category"] = category
    if impact:
        params["impact"] = impact
    return await make_request(f"{INSIGHTS_BASE_URL}/insights/v1/rule", params=params)

@mcp.tool()
async def export_rule_hits(has_playbook: bool = None, format: str = "json") -> Any:
    """Export all rule hits (recommendations) for systems. Set has_playbook=True for Ansible playbooks."""
    params = {}
    if has_playbook:
        params["has_playbook"] = "true"
    return await make_request(f"{INSIGHTS_BASE_URL}/insights/v1/export/hits", params=params)

@mcp.tool()
async def get_system_recommendations(system_id: str) -> Any:
    """Get recommendation summary for a specific system."""
    return await make_request(f"{INSIGHTS_BASE_URL}/insights/v1/system/{system_id}")

# Policy Management Tools
@mcp.tool()
async def list_policies(limit: int = 50, offset: int = 0) -> Any:
    """List all defined custom policies."""
    params = {"limit": limit, "offset": offset}
    return await make_request(f"{INSIGHTS_BASE_URL}/policies/v1/policies", params=params)

@mcp.tool()
async def create_policy(name: str, description: str, conditions: str, actions: str = "notification", is_enabled: bool = True) -> Any:
    """Create a new custom policy. Example conditions: 'arch = \"x86_64\"'"""
    payload = {
        "name": name,
        "description": description,
        "conditions": conditions,
        "actions": actions,
        "isEnabled": is_enabled
    }
    return await make_request(f"{INSIGHTS_BASE_URL}/policies/v1/policies", method="POST", json=payload)

@mcp.tool()
async def get_policy_triggers(policy_id: str) -> Any:
    """Get systems that triggered a specific policy."""
    return await make_request(f"{INSIGHTS_BASE_URL}/policies/v1/policies/{policy_id}/history/trigger")

# Remediation Tools
@mcp.tool()
async def list_remediations(limit: int = 50, offset: int = 0) -> Any:
    """List all defined remediation plans."""
    params = {"limit": limit, "offset": offset}
    return await make_request(f"{INSIGHTS_BASE_URL}/remediations/v1/remediations", params=params)

@mcp.tool()
async def create_remediation(name: str, issues: list[dict], auto_reboot: bool = False, archived: bool = False) -> Any:
    """Create a new remediation plan. Issues should be list of dicts with id, resolution, systems."""
    payload = {
        "name": name,
        "auto_reboot": auto_reboot,
        "archived": archived,
        "add": {
            "issues": issues
        }
    }
    return await make_request(f"{INSIGHTS_BASE_URL}/remediations/v1/remediations", method="POST", json=payload)

@mcp.tool()
async def get_remediation_playbook(remediation_id: str) -> Any:
    """Get Ansible playbook for a remediation plan."""
    return await make_request(f"{INSIGHTS_BASE_URL}/remediations/v1/remediations/{remediation_id}/playbook")

@mcp.tool()
async def execute_remediation(remediation_id: str) -> Any:
    """Execute a remediation plan."""
    return await make_request(f"{INSIGHTS_BASE_URL}/remediations/v1/remediations/{remediation_id}/playbook_runs", method="POST")

# Subscription Management
@mcp.tool()
async def list_rhel_subscriptions(product: str = "RHEL for x86", limit: int = 50, offset: int = 0) -> Any:
    """List systems with RHEL subscriptions. Product examples: 'RHEL for x86', 'RHEL for x86_64'"""
    from urllib.parse import quote
    encoded_product = quote(product)
    params = {"limit": limit, "offset": offset}
    return await make_request(f"{INSIGHTS_BASE_URL}/rhsm-subscriptions/v1/instances/products/{encoded_product}", params=params)

# Export Tools
@mcp.tool()
async def create_export(name: str, format: str, application: str, resource: str) -> Any:
    """Create an export request. Common applications: 'urn:redhat:application:inventory', 'subscriptions'"""
    payload = {
        "name": name,
        "format": format,
        "sources": [{
            "application": application,
            "resource": resource
        }]
    }
    return await make_request(f"{INSIGHTS_BASE_URL}/export/v1/exports", method="POST", json=payload)

@mcp.tool()
async def get_export_status(export_id: str) -> Any:
    """Get status of an export request."""
    return await make_request(f"{INSIGHTS_BASE_URL}/export/v1/exports/{export_id}/status")

@mcp.tool()
async def download_export(export_id: str) -> Any:
    """Download completed export as ZIP file."""
    return await make_request(f"{INSIGHTS_BASE_URL}/export/v1/exports/{export_id}")

# Notifications and Integrations
@mcp.tool()
async def list_notification_events(start_date: str = None, end_date: str = None, limit: int = 20, offset: int = 0) -> Any:
    """Get notification event history. Dates in YYYY-MM-DD format."""
    params = {"limit": limit, "offset": offset}
    if start_date:
        params["startDate"] = start_date
    if end_date:
        params["endDate"] = end_date
    return await make_request(f"{INSIGHTS_BASE_URL}/notifications/v1/notifications/events", params=params)

@mcp.tool()
async def list_integrations() -> Any:
    """List configured third-party integrations."""
    return await make_request(f"{INSIGHTS_BASE_URL}/integrations/v1/endpoints")

# Analytics and Statistics
@mcp.tool()
async def get_insights_overview() -> Any:
    """Get overview of systems and basic statistics by querying inventory."""
    # Use inventory endpoint to get basic stats since there's no single stats endpoint
    result = await make_request(f"{INSIGHTS_BASE_URL}/inventory/v1/hosts?limit=1")
    return result

# Content Sources and Templates
@mcp.tool()
async def list_repositories(limit: int = 50, offset: int = 0) -> Any:
    """List all existing content repositories."""
    params = {"limit": limit, "offset": offset}
    return await make_request(f"{INSIGHTS_BASE_URL}/content-sources/v1.0/repositories", params=params)

@mcp.tool()
async def create_repository(name: str, url: str, distribution_arch: str = "x86_64", distribution_versions: list[str] = None) -> Any:
    """Create a new custom repository."""
    payload = {
        "name": name,
        "url": url,
        "distribution_arch": distribution_arch,
        "distribution_versions": distribution_versions or ["9"],
        "metadata_verification": False,
        "module_hotfixes": False,
        "snapshot": False
    }
    return await make_request(f"{INSIGHTS_BASE_URL}/content-sources/v1.0/repositories", method="POST", json=payload)

@mcp.tool()
async def list_content_templates(limit: int = 50, offset: int = 0) -> Any:
    """List all content templates."""
    params = {"limit": limit, "offset": offset}
    return await make_request(f"{INSIGHTS_BASE_URL}/content-sources/v1.0/templates", params=params)

@mcp.tool()
async def create_content_template(name: str, arch: str, version: str, repository_uuids: list[str], description: str = "") -> Any:
    """Create a new content template."""
    payload = {
        "name": name,
        "arch": arch,
        "version": version,
        "description": description,
        "repository_uuids": repository_uuids,
        "use_latest": True
    }
    return await make_request(f"{INSIGHTS_BASE_URL}/content-sources/v1.0/templates", method="POST", json=payload)

if __name__ == "__main__":
    mcp.run(transport="stdio")

```