This is page 1 of 2. Use http://codebase.md/alexei-led/aws-mcp-server?page={x} to view the full context.

# Directory Structure

```
├── .dockerignore
├── .github
│   └── workflows
│       ├── ci.yml
│       └── release.yml
├── .gitignore
├── CLAUDE.md
├── codecov.yml
├── deploy
│   └── docker
│       ├── docker-compose.yml
│       └── Dockerfile
├── docs
│   └── VERSION.md
├── LICENSE
├── Makefile
├── media
│   └── demo.mp4
├── pyproject.toml
├── README.md
├── security_config_example.yaml
├── smithery.yaml
├── spec.md
├── src
│   └── aws_mcp_server
│       ├── __init__.py
│       ├── __main__.py
│       ├── cli_executor.py
│       ├── config.py
│       ├── prompts.py
│       ├── resources.py
│       ├── security.py
│       ├── server.py
│       └── tools.py
├── tests
│   ├── __init__.py
│   ├── conftest.py
│   ├── integration
│   │   ├── __init__.py
│   │   ├── test_aws_live.py
│   │   ├── test_security_integration.py
│   │   └── test_server_integration.py
│   ├── test_aws_integration.py
│   ├── test_aws_setup.py
│   ├── test_bucket_creation.py
│   ├── test_run_integration.py
│   └── unit
│       ├── __init__.py
│       ├── test_cli_executor.py
│       ├── test_init.py
│       ├── test_main.py
│       ├── test_prompts.py
│       ├── test_resources.py
│       ├── test_security.py
│       ├── test_server.py
│       └── test_tools.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.dockerignore:
--------------------------------------------------------------------------------

```
# Version Control
.git/
.github/
.gitignore
.gitattributes

# Docker
.dockerignore
deploy/
docker-compose*.yml
Dockerfile*

# Documentation
docs/

# Markdown files except README.md
*.md
!README.md

# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
*.egg-info/
*.egg
.installed.cfg
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/

# Virtual Environments
.env
.venv/
env/
ENV/
venv/

# Testing and Coverage
.coverage
.pytest_cache/
.tox/
.nox/
htmlcov/
tests/

# Development and IDE
.idea/
.vscode/
.ruff_cache/
.mypy_cache/
.aider*
*.swp
*.swo

# OS Generated
.DS_Store
Thumbs.db

# Logs
logs/
*.log
```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# Testing and Coverage
.coverage
.coverage.*
.pytest_cache/
.tox/
.nox/
htmlcov/
.hypothesis/
coverage.xml
*.cover
nosetests.xml

# Virtual Environments
.env
.venv/
env/
venv/
ENV/
env.bak/
venv.bak/

# Development and IDE
.idea/
.vscode/
.ruff_cache/
.mypy_cache/
.dmypy.json
dmypy.json
.pytype/
.spyderproject
.spyproject
.ropeproject
.aider*
*.swp
*.swo
*~
.*.sw[op]

# Jupyter
.ipynb_checkpoints

# Logs
logs/
*.log
pip-log.txt
pip-delete-this-directory.txt

# OS Generated
.DS_Store
Thumbs.db
Icon?
ehthumbs.db
Desktop.ini

# Secrets and Credentials
*.pem
*.key
secrets/
config.local.yaml
credentials.json
aws_credentials

# Local Development
.direnv/
.envrc
*.local.yml
*.local.yaml
local_settings.py

# Distribution
*.tar.gz
*.tgz
*.zip
*.gz
```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# AWS Model Context Protocol (MCP) Server

[![CI](https://github.com/alexei-led/aws-mcp-server/actions/workflows/ci.yml/badge.svg)](https://github.com/alexei-led/aws-mcp-server/actions/workflows/ci.yml)
[![Code Coverage](https://codecov.io/gh/alexei-led/aws-mcp-server/branch/main/graph/badge.svg?token=K8vdP3zyuy)](https://codecov.io/gh/alexei-led/aws-mcp-server)
[![Linter: Ruff](https://img.shields.io/badge/Linter-Ruff-brightgreen?style=flat-square)](https://github.com/alexei-led/aws-mcp-server)
[![Image Tags](https://ghcr-badge.egpl.dev/alexei-led/aws-mcp-server/tags?color=%2344cc11&ignore=latest&n=4&label=image+tags&trim=)](https://github.com/alexei-led/aws-mcp-server/pkgs/container/aws-mcp-server/versions)
[![Image Size](https://ghcr-badge.egpl.dev/alexei-led/aws-mcp-server/size?color=%2344cc11&tag=latest&label=image+size&trim=)](https://github.com/alexei-led/aws-mcp-server/pkgs/container/aws-mcp-server)

A lightweight service that enables AI assistants to execute AWS CLI commands through the Model Context Protocol (MCP).

## Overview

The AWS MCP Server provides a bridge between MCP-aware AI assistants (like Claude Desktop, Cursor, Windsurf) and the AWS CLI. It enables these assistants to:

1. **Retrieve AWS CLI documentation** (`aws_cli_help`) - Get detailed help on AWS services and commands
2. **Execute AWS CLI commands** (`aws_cli_pipeline`) - Run commands with Unix pipes and receive formatted results optimized for AI consumption

```mermaid
flowchart LR
    AI[AI Assistant] <-->|MCP Protocol| Server[AWS MCP Server]
    Server <-->|Subprocess| AWS[AWS CLI]
    AWS <-->|API| Cloud[AWS Cloud]
```
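
For example, an MCP client can launch the server over stdio and invoke these tools directly. Below is a minimal sketch using the official `mcp` Python SDK; it assumes the package is installed locally and that your AWS credentials are configured:

```python
import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def main() -> None:
    # Launch the server as a subprocess speaking MCP over stdio
    params = StdioServerParameters(command="python", args=["-m", "aws_mcp_server"])
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            # Invoke the aws_cli_pipeline tool with a simple read-only command
            result = await session.call_tool("aws_cli_pipeline", {"command": "aws s3 ls"})
            print(result.content)


asyncio.run(main())
```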

## Demo

[Demo](https://private-user-images.githubusercontent.com/1898375/424996801-b51ddc8e-5df5-40c4-8509-84c1a7800d62.mp4?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3NDI0NzY5OTUsIm5iZiI6MTc0MjQ3NjY5NSwicGF0aCI6Ii8xODk4Mzc1LzQyNDk5NjgwMS1iNTFkZGM4ZS01ZGY1LTQwYzQtODUwOS04NGMxYTc4MDBkNjIubXA0P1gtQW16LUFsZ29yaXRobT1BV1M0LUhNQUMtU0hBMjU2JlgtQW16LUNyZWRlbnRpYWw9QUtJQVZDT0RZTFNBNTNQUUs0WkElMkYyMDI1MDMyMCUyRnVzLWVhc3QtMSUyRnMzJTJGYXdzNF9yZXF1ZXN0JlgtQW16LURhdGU9MjAyNTAzMjBUMTMxODE1WiZYLUFtei1FeHBpcmVzPTMwMCZYLUFtei1TaWduYXR1cmU9NjgwNTM4MDVjN2U4YjQzN2Y2N2Y5MGVkMThiZTgxYWEyNzBhZTlhMTRjZDY3ZDJmMzJkNmViM2U4M2U4MTEzNSZYLUFtei1TaWduZWRIZWFkZXJzPWhvc3QifQ.tIb7uSkDpSaspIluzCliHS8ATmlzkvEnF3CiClD-UGQ)

The video demonstrates using Claude Desktop with AWS MCP Server to create a new AWS EC2 instance with AWS SSM agent installed.

## Features

- **Command Documentation** - Detailed help information for AWS CLI commands
- **Command Execution** - Execute AWS CLI commands and return human-readable results
- **Unix Pipe Support** - Filter and transform AWS CLI output using standard Unix pipes and utilities
- **AWS Resources Context** - Access to AWS profiles, regions, account information, and environment details via MCP Resources
- **Prompt Templates** - Pre-defined prompt templates for common AWS tasks following best practices
- **Docker Integration** - Simple deployment through containerization with multi-architecture support (AMD64/x86_64 and ARM64)
- **AWS Authentication** - Leverages existing AWS credentials on the host machine

## Requirements

- Docker (default) or Python 3.13+ (and AWS CLI installed locally)
- AWS credentials configured

## Getting Started

**Note:** For security and reliability, running the server inside a Docker container is the **strongly recommended** method. Please review the [Security Considerations](#security-considerations) section before getting started.

### Run Server Option 1: Using Docker (Recommended)

```bash
# Clone repository
git clone https://github.com/alexei-led/aws-mcp-server.git
cd aws-mcp-server

# Build and run Docker container
docker compose -f deploy/docker/docker-compose.yml up -d
```

The Docker image supports both AMD64/x86_64 (Intel/AMD) and ARM64 (Apple Silicon M1-M4, AWS Graviton) architectures.

> **Note**: The official image from GitHub Packages is multi-architecture and will automatically use the appropriate version for your system.
>
> ```bash
> # Use the latest stable version
> docker pull ghcr.io/alexei-led/aws-mcp-server:latest
> 
> # Or pin to a specific version (recommended for production)
> docker pull ghcr.io/alexei-led/aws-mcp-server:1.0.0
> ```
>
> **Docker Image Tags**:
>
> - `latest`: Latest stable release
> - `x.y.z` (e.g., `1.0.0`): Specific version
> - `sha-<commit-sha>`: Development builds, tagged with Git commit SHA (e.g., `sha-gb697684`)

### Run Server Option 2: Using Python

**Use with Caution:** Running natively requires careful environment setup and carries higher security risks compared to the recommended Docker deployment. Ensure you understand the implications outlined in the [Security Considerations](#security-considerations) section.

```bash
# Clone repository
git clone https://github.com/alexei-led/aws-mcp-server.git
cd aws-mcp-server

# Set up virtual environment
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate

# Install in development mode
pip install -e .

# Run the server
python -m aws_mcp_server
```

## Configuration

The AWS MCP Server can be configured using environment variables:

| Environment Variable      | Description                                  | Default   |
|--------------------------|----------------------------------------------|-----------|
| `AWS_MCP_TIMEOUT`        | Command execution timeout in seconds         | 300       |
| `AWS_MCP_MAX_OUTPUT`     | Maximum output size in characters            | 100000    |
| `AWS_MCP_TRANSPORT`      | Transport protocol to use ("stdio" or "sse") | stdio     |
| `AWS_PROFILE`            | AWS profile to use                           | default   |
| `AWS_REGION`             | AWS region to use                            | us-east-1 |
| `AWS_MCP_SECURITY_MODE`  | Security mode ("strict" or "permissive")     | strict    |
| `AWS_MCP_SECURITY_CONFIG`| Path to custom security configuration file   | ""        |

**Important:** Securely manage the AWS credentials provided to the server, whether via mounted `~/.aws` files or environment variables. Ensure the credentials follow the principle of least privilege as detailed in the [Security Considerations](#security-considerations) section. When running via Docker, ensure these variables are passed correctly to the container environment (e.g., using `docker run -e VAR=value ...`).
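
These variables are read once at startup. As a rough sketch of how such lookups work (the names below are assumptions for illustration; the real definitions live in `src/aws_mcp_server/config.py`):

```python
import os

# Assumed names for illustration; see src/aws_mcp_server/config.py for the real ones
TIMEOUT = int(os.environ.get("AWS_MCP_TIMEOUT", "300"))
MAX_OUTPUT = int(os.environ.get("AWS_MCP_MAX_OUTPUT", "100000"))
TRANSPORT = os.environ.get("AWS_MCP_TRANSPORT", "stdio")
SECURITY_MODE = os.environ.get("AWS_MCP_SECURITY_MODE", "strict")
SECURITY_CONFIG = os.environ.get("AWS_MCP_SECURITY_CONFIG", "")
AWS_REGION = os.environ.get("AWS_REGION", "us-east-1")
```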

## Security Considerations

Security is paramount when executing commands against your AWS environment. While AWS MCP Server provides functionality, **you are responsible** for configuring and running it securely. Please adhere strictly to the following:

**1. Recommended Deployment: Docker Container**

*   **Isolation:** Running the server inside a Docker container is the **strongly recommended and default** deployment method. Containerization provides crucial filesystem and process isolation. Potentially destructive Unix commands (like `rm`, `mv`) executed via pipes, even if misused, will be contained within the ephemeral Docker environment and will **not** affect your host machine's filesystem. The container can be easily stopped and recreated.
*   **Controlled Environment:** Docker ensures a consistent environment with necessary dependencies, reducing unexpected behavior.

**2. AWS Credentials and IAM Least Privilege (Critical)**

*   **User Responsibility:** You provide the AWS credentials to the server (via mounted `~/.aws` or environment variables).
*   **Least Privilege is Essential:** The server executes AWS CLI commands *using the credentials you provide*. It is **absolutely critical** that these credentials belong to an IAM principal (User or Role) configured with the **minimum necessary permissions** (least privilege) for *only* the AWS actions you intend to perform through this tool.
    *   **Do Not Use Root Credentials:** Never use AWS account root user credentials.
    *   **Regularly Review Permissions:** Periodically audit the IAM permissions associated with the credentials.
*   **Impact Limitation:** Properly configured IAM permissions are the **primary mechanism** for limiting the potential impact of *any* command executed via the server, whether intended or unintended. Even if a command were manipulated, it could only perform actions allowed by the specific IAM policy.

**3. Trusted User Model**

*   The server assumes the end-user interacting with the MCP client (e.g., Claude Desktop, Cursor) is the **same trusted individual** who configured the server and provided the least-privilege AWS credentials. Do not expose the server or connected client to untrusted users.

**4. Understanding Execution Risks (Current Implementation)**

*   **Command Execution:** The current implementation uses shell features (`shell=True` in subprocess calls) to execute AWS commands and handle Unix pipes. While convenient, this approach carries inherent risks if the input command string were manipulated (command injection).
*   **Mitigation via Operational Controls:** In the context of the **trusted user model** and **Docker deployment**, these risks are mitigated operationally:
    *   The trusted user is assumed not to provide intentionally malicious commands against their own environment.
    *   Docker contains filesystem side-effects.
    *   **Crucially, IAM least privilege limits the scope of *any* AWS action that could be executed.**
*   **Credential Exfiltration Risk:** Despite containerization and IAM, a sophisticated command injection could potentially attempt to read the mounted credentials (`~/.aws`) or environment variables within the container and exfiltrate them (e.g., via `curl`). **Strict IAM policies remain the most vital defense** to limit the value of potentially exfiltrated credentials.

**5. Network Exposure (SSE Transport)**

*   If using the `sse` transport (which implies a network listener), ensure you bind the server only to trusted network interfaces (e.g., `localhost`) or implement appropriate network security controls (firewalls, authentication proxies) if exposing it more broadly. The default `stdio` transport does not open network ports.

**6. Shared Responsibility Summary**

*   **AWS MCP Server provides the tool.**
*   **You, the user, are responsible for:**
    *   Running it within the recommended secure Docker environment.
    *   Providing and securely managing **least-privilege** AWS credentials.
    *   Ensuring only trusted users interact with the server/client.
    *   Securing the network environment if applicable.

By strictly adhering to Docker deployment and meticulous IAM least-privilege configuration, you establish the necessary operational controls for using the AWS MCP Server securely with its current implementation.

## Integrating with Claude Desktop

### Configuration

To manually integrate AWS MCP Server with Claude Desktop:

1. **Locate the Claude Desktop configuration file**:
   - macOS: `~/Library/Application Support/Claude/claude_desktop_config.json`
   - Windows: `%APPDATA%\Claude\claude_desktop_config.json`

2. **Edit the configuration file** to include the AWS MCP Server:
   ```json
   {
     "mcpServers": {
       "aws-mcp-server": {
         "command": "docker",
         "args": [
           "run",
           "-i",
           "--rm",
           "-v",
           "/Users/YOUR_USER_NAME/.aws:/home/appuser/.aws:ro",
           "ghcr.io/alexei-led/aws-mcp-server:latest"
         ]
       }
     }
   }
   ```
   
3. **Restart Claude Desktop** to apply the changes
   - After restarting, you should see a hammer 🔨 icon in the bottom right corner of the input box
   - This indicates that the AWS MCP Server is available for use

```mermaid
flowchart TD
    subgraph "User Device"
        config[Edit claude_desktop_config.json]
        claude[Claude Desktop]
        docker[Docker Container]
        aws_creds[AWS Credentials]
    end
    
    subgraph "AWS Cloud"
        services[AWS Services]
    end
    
    subgraph "MCP Server Components"
        tools[AWS CLI Tools]
        resources[AWS Resources]
        templates[Prompt Templates]
    end
    
    config -->|Add MCP Server Config| claude
    claude -->|Docker Run Command| docker
    docker --- tools
    docker --- resources
    docker --- templates
    aws_creds -->|Mount Read-only| docker
    resources -.->|Read| aws_creds
    tools -->|API Calls| services
```

### Example Interactions

**Getting AWS CLI Documentation**:
```
User: What are the available options for the AWS S3 cp command?

Claude: Let me get that information for you.

The AWS S3 cp command has the following options:

NAME
    cp - Copies a local file or S3 object to another location locally or in S3.

SYNOPSIS
    cp <LocalPath> <S3Uri> or <S3Uri> <LocalPath> or <S3Uri> <S3Uri> [--acl <value>] [--cache-control <value>] ...
```

**Executing AWS CLI Commands**:
```
User: List my S3 buckets

Claude: I'll list your S3 buckets for you.

2023-10-15 14:30:45 my-bucket-1
2023-11-20 09:15:32 my-bucket-2
2024-01-05 11:22:18 my-backup-bucket
```

**Using Command Pipes**:
```
User: List my S3 buckets that have "backup" in their name and show them in alphabetical order

Claude: Let me find that for you.

2024-01-05 11:22:18 my-backup-bucket
2024-03-01 09:44:12 weekly-backup-bucket
2024-03-15 13:10:57 database-backup-bucket
```

**Accessing AWS Resources**:
```
User: What AWS regions are available for me to use?

Claude: Let me check what AWS regions are available for you.

Available regions:
- us-east-1 (US East, N. Virginia) - Currently selected
- us-east-2 (US East, Ohio)
- us-west-1 (US West, N. California)
- us-west-2 (US West, Oregon)
- eu-west-1 (EU West, Ireland)
- eu-central-1 (EU Central, Frankfurt)
- ap-northeast-1 (Asia Pacific, Tokyo)
- ap-southeast-1 (Asia Pacific, Singapore)
- ap-southeast-2 (Asia Pacific, Sydney)

You're currently using the us-east-1 region.
```

**Using Prompt Templates**:
```
User: I need to perform a security audit of my S3 buckets

Claude: I'll help you audit the security of your S3 buckets.

Here are AWS CLI commands to perform a comprehensive security audit:

# List all buckets with public access settings
aws s3api list-buckets --query 'Buckets[*].Name' --output text | xargs -I {} aws s3api get-public-access-block --bucket {}

# Check bucket policies for public access
aws s3api list-buckets --query 'Buckets[*].Name' --output text | xargs -I {} aws s3api get-bucket-policy --bucket {} 2>/dev/null

# Identify buckets without encryption
aws s3api list-buckets --query 'Buckets[*].Name' --output text | xargs -I {} aws s3api get-bucket-encryption --bucket {} 2>/dev/null

# Check for buckets without logging enabled
aws s3api list-buckets --query 'Buckets[*].Name' --output text | xargs -I {} aws s3api get-bucket-logging --bucket {} 2>/dev/null

# Verify bucket versioning status
aws s3api list-buckets --query 'Buckets[*].Name' --output text | xargs -I {} aws s3api get-bucket-versioning --bucket {}
```

## Available Prompt Templates

The AWS MCP Server includes the following pre-defined prompt templates (a registration sketch follows the tables):

### Core Operations

| Prompt                 | Description                                                   | Parameters                                          |
|------------------------|---------------------------------------------------------------|-----------------------------------------------------|
| `create_resource`      | Generate commands to create AWS resources with best practices | `resource_type`, `resource_name`                    |
| `resource_inventory`   | Create comprehensive inventory of resources                   | `service`, `region` (optional)                      |
| `troubleshoot_service` | Generate commands to troubleshoot service issues              | `service`, `resource_id`                            |
| `resource_cleanup`     | Identify and safely clean up resources                        | `service`, `criteria` (optional)                    |

### Security & Compliance

| Prompt                     | Description                                                | Parameters                                          |
|----------------------------|------------------------------------------------------------|-----------------------------------------------------|
| `security_audit`           | Audit security settings for a specific AWS service         | `service`                                           |
| `security_posture_assessment` | Comprehensive security assessment across your AWS environment | None                                          |
| `iam_policy_generator`     | Create least-privilege IAM policies                        | `service`, `actions`, `resource_pattern` (optional) |
| `compliance_check`         | Check compliance with standards                            | `compliance_standard`, `service` (optional)         |

### Cost & Performance

| Prompt               | Description                                             | Parameters                                         |
|----------------------|---------------------------------------------------------|----------------------------------------------------|
| `cost_optimization`  | Find cost optimization opportunities for a service      | `service`                                          |
| `performance_tuning` | Optimize and tune performance of AWS resources          | `service`, `resource_id`                           |

### Infrastructure & Architecture

| Prompt                      | Description                                              | Parameters                                           |
|-----------------------------|----------------------------------------------------------|------------------------------------------------------|
| `serverless_deployment`     | Deploy serverless applications with best practices       | `application_name`, `runtime` (optional)             |
| `container_orchestration`   | Set up container environments (ECS/EKS)                  | `cluster_name`, `service_type` (optional)            |
| `vpc_network_design`        | Design and implement secure VPC networking               | `vpc_name`, `cidr_block` (optional)                  |
| `infrastructure_automation` | Automate infrastructure management                       | `resource_type`, `automation_scope` (optional)       |
| `multi_account_governance`  | Implement secure multi-account strategies                | `account_type` (optional)                            |

### Reliability & Monitoring

| Prompt               | Description                                           | Parameters                                          |
|----------------------|-------------------------------------------------------|-----------------------------------------------------|
| `service_monitoring` | Set up comprehensive monitoring                       | `service`, `metric_type` (optional)                 |
| `disaster_recovery`  | Implement enterprise-grade DR solutions               | `service`, `recovery_point_objective` (optional)    |
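
Prompt templates are registered via FastMCP's decorator API. The following is a hedged sketch of what a definition might look like; the `@mcp.prompt()` decorator is real FastMCP API, but the function body here is illustrative (see `src/aws_mcp_server/prompts.py` for the actual templates):

```python
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("AWS MCP Server")


@mcp.prompt()
def security_audit(service: str) -> str:
    """Audit security settings for a specific AWS service."""
    return (
        f"Generate AWS CLI commands to audit the security configuration of {service}, "
        "covering IAM policies, encryption settings, and public exposure."
    )
```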

## Security

The AWS MCP Server implements a comprehensive multi-layered approach to command validation and security:

### Command Validation System

The server validates all AWS CLI commands through a three-layer system (a simplified sketch follows the list):

1. **Basic Command Structure**: 
   - Verifies commands start with 'aws' prefix and contain a valid service
   - Ensures proper command syntax

2. **Security-Focused Command Filtering**:
   - **Dangerous Commands**: Blocks commands that could compromise security
   - **Safe Patterns**: Explicitly allows read-only operations needed for normal use
   - **Regex Pattern Matching**: Prevents complex security risks with pattern matching

3. **Pipe Command Security**:
   - Validates Unix commands used in pipes
   - Restricts commands to a safe allowlist
   - Prevents filesystem manipulation and arbitrary command execution
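
A simplified sketch of the three layers as a hypothetical helper (the real logic lives in `src/aws_mcp_server/security.py` and handles many more cases):

```python
import re
import shlex

# Illustrative subset; the real allowlist is larger
ALLOWED_PIPE_COMMANDS = {"grep", "jq", "sort", "uniq", "head", "tail", "wc"}


def validate_pipeline(command: str) -> None:
    """Validate an 'aws ... | utility ...' pipeline (sketch, not the real implementation)."""
    stages = [stage.strip() for stage in command.split("|")]

    # Layer 1: basic structure - must start with 'aws' and name a service
    first = shlex.split(stages[0])
    if len(first) < 2 or first[0] != "aws":
        raise ValueError("Command must start with 'aws <service> ...'")

    # Layer 2: security-focused filtering - dangerous commands, safe patterns, regex rules
    if re.match(r"aws\s+iam\s+create-user\b", stages[0]):
        raise ValueError("This command (aws iam create-user) is restricted for security reasons")

    # Layer 3: pipe command security - every piped utility must be on the allowlist
    for stage in stages[1:]:
        parts = shlex.split(stage)
        if not parts or parts[0] not in ALLOWED_PIPE_COMMANDS:
            raise ValueError(f"Pipe command '{stage}' is not allowed")
```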

### Default Security Configuration

The default security configuration focuses on preventing the following attack vectors:

#### 1. Identity and Access Management (IAM) Risks

| Blocked Command | Security Risk |
|-----------------|---------------|
| `aws iam create-user` | Creates potential backdoor accounts with persistent access |
| `aws iam create-access-key` | Creates long-term credentials that can be stolen or misused |
| `aws iam attach-*-policy` | Potential privilege escalation via policy attachments |
| `aws iam put-user-policy` | Inline policies can grant excessive permissions |
| `aws iam create-policy` | Creating new policies with potentially dangerous permissions |
| `aws iam create-login-profile` | Creates console passwords for existing users |
| `aws iam deactivate-mfa-device` | Disables multi-factor authentication, weakening security |
| `aws iam update-assume-role-policy` | Modifies trust relationships, enabling privilege escalation |

#### 2. Audit and Logging Tampering

| Blocked Command | Security Risk |
|-----------------|---------------|
| `aws cloudtrail delete-trail` | Removes audit trail of AWS activity |
| `aws cloudtrail stop-logging` | Stops collecting activity logs, creating blind spots |
| `aws cloudtrail update-trail` | Can redirect or modify logging configuration |
| `aws config delete-configuration-recorder` | Disables AWS Config recording of resource changes |
| `aws guardduty delete-detector` | Disables threat detection capabilities |

#### 3. Sensitive Data Access and Protection

| Blocked Command | Security Risk |
|-----------------|---------------|
| `aws secretsmanager put-secret-value` | Modifies sensitive credentials |
| `aws secretsmanager delete-secret` | Removes sensitive credentials |
| `aws kms schedule-key-deletion` | Schedules deletion of encryption keys, risking data loss |
| `aws kms disable-key` | Disables encryption keys, potentially exposing data |
| `aws s3api put-bucket-policy` | Can create public S3 buckets, exposing data |
| `aws s3api delete-bucket-policy` | Removes protective policies from buckets |

#### 4. Network Security Risks

| Blocked Command | Security Risk |
|-----------------|---------------|
| `aws ec2 authorize-security-group-ingress` | Opens inbound network access, potential exposure |
| `aws ec2 authorize-security-group-egress` | Opens outbound network access, potential data exfiltration |
| `aws ec2 modify-instance-attribute` | Can alter security properties of instances |

Many read-only operations in these same services are explicitly allowed via safe patterns:

- All `get-`, `list-`, and `describe-` commands
- All help commands (`--help`, `help`)
- Simulation and testing commands (e.g., `aws iam simulate-custom-policy`)

### Configuration Options

- **Security Modes**:
  - `strict` (default): Enforces all security validations
  - `permissive`: Logs warnings but allows execution (use with caution)

- **Custom Configuration**:
  - Override default security rules via YAML configuration file
  - Configure service-specific dangerous commands
  - Define custom safe patterns and regex rules
  - Environment variable: `AWS_MCP_SECURITY_CONFIG`

- **Execution Controls**:
  - Timeouts prevent long-running commands (default: 300 seconds)
  - Output size limits prevent memory issues
  - Environment variables: `AWS_MCP_TIMEOUT`, `AWS_MCP_MAX_OUTPUT` (see the sketch below)
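
A sketch of how such execution controls are commonly implemented with `asyncio` (assumed behavior; the actual executor is `src/aws_mcp_server/cli_executor.py`):

```python
import asyncio


async def execute_with_limits(command: str, timeout: int = 300, max_output: int = 100_000) -> str:
    """Run a shell command with a timeout and an output-size cap (illustrative sketch)."""
    process = await asyncio.create_subprocess_shell(
        command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.STDOUT,
    )
    try:
        stdout, _ = await asyncio.wait_for(process.communicate(), timeout=timeout)
    except asyncio.TimeoutError:
        process.kill()
        raise TimeoutError(f"Command timed out after {timeout} seconds") from None
    output = stdout.decode()
    if len(output) > max_output:
        output = output[:max_output] + "\n... (output truncated)"
    return output
```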

### Custom Security Rules Example

You can create custom security rules by defining a YAML configuration file:

```yaml
# Example custom security configuration
# Save to a file and set AWS_MCP_SECURITY_CONFIG environment variable

# Dangerous commands to block
dangerous_commands:
  iam:
    # Only block specific IAM operations for your environment
    - "aws iam create-user"
    - "aws iam attach-user-policy"
  
  # Custom service restrictions for your organization
  lambda:
    - "aws lambda delete-function"
    - "aws lambda remove-permission"
  
  # Prevent accidental DynamoDB table deletion
  dynamodb:
    - "aws dynamodb delete-table"

# Safe patterns to explicitly allow
safe_patterns:
  # Global safe patterns
  general:
    - "--help"
    - "--dry-run"
  
  # Allow read operations on IAM
  iam:
    - "aws iam get-"
    - "aws iam list-"
  
  # Allow specific Lambda operations
  lambda:
    - "aws lambda list-functions"
    - "aws lambda get-function"

# Complex regex rules for security validation
regex_rules:
  general:
    # Prevent use of root credentials
    - pattern: "aws .* --profile\\s+root"
      description: "Prevent use of root profile"
      error_message: "Using the root profile is not allowed for security reasons"
  
  iam:
    # Block creation of admin users
    - pattern: "aws iam create-user.*--user-name\\s+.*admin.*"
      description: "Prevent creation of admin users"
      error_message: "Creating users with 'admin' in the name is restricted"
    
    # Prevent wildcards in IAM policies
    - pattern: "aws iam create-policy.*\"Effect\":\\s*\"Allow\".*\"Action\":\\s*\"\\*\".*\"Resource\":\\s*\"\\*\""
      description: "Prevent wildcards in policies"
      error_message: "Creating policies with '*' wildcards for both Action and Resource is not allowed"
  
  s3:
    # Prevent public bucket policies
    - pattern: "aws s3api put-bucket-policy.*\"Effect\":\\s*\"Allow\".*\"Principal\":\\s*\"\\*\""
      description: "Prevent public bucket policies"
      error_message: "Creating bucket policies with public access is restricted"
```
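
Loading such a file is straightforward with PyYAML (a declared dependency). A minimal sketch, assuming the loader falls back to built-in defaults when the variable is unset:

```python
import os

import yaml  # PyYAML is a declared runtime dependency


def load_security_config() -> dict:
    """Load custom security rules if configured (sketch; the real loader lives in security.py)."""
    path = os.environ.get("AWS_MCP_SECURITY_CONFIG", "")
    if not path:
        return {}  # no override: built-in defaults apply
    with open(path, encoding="utf-8") as f:
        return yaml.safe_load(f) or {}
```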

### Security Examples

The system follows IAM best practices, focusing on preventing privilege escalation:

```bash
# This command would be blocked (creates user)
aws iam create-user --user-name new-user
> Error: This command (aws iam create-user) is restricted for security reasons.

# This command would be blocked (attaches admin policy)
aws iam attach-user-policy --user-name any-user --policy-arn arn:aws:iam::aws:policy/AdministratorAccess
> Error: Attaching Administrator policies is restricted for security reasons.

# This command would be blocked (opens SSH port globally)
aws ec2 authorize-security-group-ingress --group-id sg-12345 --protocol tcp --port 22 --cidr 0.0.0.0/0
> Error: Opening non-web ports to the entire internet (0.0.0.0/0) is restricted.

# These commands are allowed (read-only operations)
aws iam list-users
aws s3 ls
aws ec2 describe-instances
```

### Security Best Practices

- Always use the default `strict` security mode in production
- Follow the deployment recommendations in [Security Considerations](#security-considerations)
- Run with least-privilege AWS credentials
- For custom configurations, focus on your security requirements

## Development

### Setting Up the Development Environment

```bash
# Install only runtime dependencies using pip
pip install -e .

# Install all development dependencies using pip
pip install -e ".[dev]"

# Or use uv for faster dependency management
make uv-install       # Install runtime dependencies
make uv-dev-install   # Install development dependencies
```

### Makefile Commands

The project includes a Makefile with various targets for common tasks:

```bash
# Test commands
make test             # Run tests excluding integration tests
make test-unit        # Run unit tests only (all tests except integration tests)
make test-integration # Run integration tests only (requires AWS credentials)
make test-all         # Run all tests including integration tests

# Test coverage commands
make test-coverage    # Run tests with coverage report (excluding integration tests)
make test-coverage-all # Run all tests with coverage report (including integration tests)

# Linting and formatting
make lint             # Run linters (ruff check and format --check)
make lint-fix         # Run linters and auto-fix issues where possible
make format           # Format code with ruff
```

For a complete list of available commands, run `make help`.

### Code Coverage

The project includes configuration for [Codecov](https://codecov.io) to track code coverage metrics. The configuration is in the `codecov.yml` file, which:

- Sets a target coverage threshold of 80%
- Excludes test files, setup files, and documentation from coverage reports
- Configures PR comments and status checks

Coverage reports are automatically generated during CI/CD runs and uploaded to Codecov.

### Integration Testing

Integration tests verify AWS MCP Server works correctly with actual AWS resources. To run them:

1. **Set up AWS resources**:
   - Create an S3 bucket for testing
   - Set the environment variable: `export AWS_TEST_BUCKET=your-test-bucket-name`
   - Ensure your AWS credentials are configured

2. **Run integration tests**:
   ```bash
   # Run all tests including integration tests
   make test-all
   
   # Run only integration tests
   make test-integration
   ```

Or you can run the pytest commands directly:
```bash
# Run all tests including integration tests
pytest --run-integration

# Run only integration tests
pytest --run-integration -m integration
```
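
The `--run-integration` flag is a standard pytest pattern: a custom option that skips `integration`-marked tests unless explicitly enabled. A sketch of the usual wiring (assumed implementation; see `tests/conftest.py` for the real one):

```python
import pytest


def pytest_addoption(parser):
    parser.addoption("--run-integration", action="store_true", default=False, help="run tests marked as integration")


def pytest_collection_modifyitems(config, items):
    if config.getoption("--run-integration"):
        return  # integration tests were requested; run everything
    skip_integration = pytest.mark.skip(reason="need --run-integration option to run")
    for item in items:
        if "integration" in item.keywords:
            item.add_marker(skip_integration)
```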

## Troubleshooting

- **Authentication Issues**: Ensure your AWS credentials are properly configured
- **Connection Errors**: Verify the server is running and AI assistant connection settings are correct
- **Permission Errors**: Check that your AWS credentials have the necessary permissions
- **Timeout Errors**: For long-running commands, increase the `AWS_MCP_TIMEOUT` environment variable

## Why Deploy with Docker

Deploying AWS MCP Server via Docker is the recommended approach, offering significant security and reliability advantages that form the core of the tool's secure usage pattern:

### Security Benefits

- **Isolation (Primary Mitigation):** The Docker container provides essential filesystem and process isolation. AWS CLI commands and piped Unix utilities run in a contained environment. Accidental or misused commands affecting the filesystem are limited to the container, **protecting your host machine**.
- **Controlled Credential Access:** When mounting credentials, using the `:ro` (read-only) flag limits the container's ability to modify your AWS configuration files.
- **No Local Installation:** Avoids installing the AWS CLI and its dependencies directly on your host system.
- **Clean Environment:** Each container run starts with a known, clean state.

### Reliability Advantages

- **Consistent Configuration**: All required tools (AWS CLI, SSM plugin, jq) are pre-installed and properly configured
- **Dependency Management**: Avoid version conflicts between tools and dependencies
- **Cross-Platform Consistency**: Works the same way across different operating systems
- **Complete Environment**: Includes all necessary tools for command pipes, filtering, and formatting

### Other Benefits

- **Multi-Architecture Support**: Runs on both Intel/AMD (x86_64) and ARM (Apple Silicon, AWS Graviton) processors
- **Simple Updates**: Update to new versions with a single pull command
- **No Python Environment Conflicts**: Avoids potential conflicts with other Python applications on your system
- **Version Pinning**: Easily pin to specific versions for stability in production environments

## Versioning

This project uses [setuptools_scm](https://github.com/pypa/setuptools_scm) to automatically determine versions based on Git tags:

- **Release versions**: When a Git tag exists (e.g., `1.2.3`), the version will be exactly that tag
- **Development versions**: For commits without tags, a development version is generated in the format: 
  `<last-tag>.post<commits-since-tag>+g<commit-hash>.d<date>` (e.g., `1.2.3.post10+gb697684.d20250406`)

The version is automatically included in:
- Package version information
- Docker image labels
- Continuous integration builds

### Creating Releases

To create a new release version:

```bash
# Create and push a new tag
git tag -a 1.2.3 -m "Release version 1.2.3"
git push origin 1.2.3
```

The CI/CD pipeline will automatically build and publish Docker images with appropriate version tags.

For more detailed information about the version management system, see [VERSION.md](docs/VERSION.md).

## License

This project is licensed under the MIT License - see the LICENSE file for details.

```

--------------------------------------------------------------------------------
/CLAUDE.md:
--------------------------------------------------------------------------------

```markdown
# AWS MCP Server Development Guide

## Build & Test Commands

### Using uv (recommended)
- Install dependencies: `uv pip install --system -e .`
- Install dev dependencies: `uv pip install --system -e ".[dev]"`
- Update lock file: `uv pip compile --system pyproject.toml -o uv.lock`
- Install from lock file: `uv pip sync --system uv.lock`

### Using pip (alternative)
- Install dependencies: `pip install -e .`
- Install dev dependencies: `pip install -e ".[dev]"`

### Running the server
- Run server: `python -m aws_mcp_server`
- Run server with SSE transport: `AWS_MCP_TRANSPORT=sse python -m aws_mcp_server`
- Run with MCP CLI: `mcp run src/aws_mcp_server/server.py`

### Testing and linting
- Run tests: `pytest`
- Run single test: `pytest tests/path/to/test_file.py::test_function_name -v`
- Run tests with coverage: `python -m pytest --cov=src/aws_mcp_server tests/`
- Run linter: `ruff check src/ tests/`
- Format code: `ruff format src/ tests/`

## Technical Stack

- **Python version**: Python 3.13+
- **Project config**: `pyproject.toml` for configuration and dependency management
- **Environment**: Use virtual environment in `.venv` for dependency isolation
- **Package management**: Use `uv` for faster, more reliable dependency management with lock file
- **Dependencies**: Separate production and dev dependencies in `pyproject.toml`
- **Version management**: Use `setuptools_scm` for automatic versioning from Git tags
- **Linting**: `ruff` for style and error checking
- **Type checking**: Use VS Code with Pylance for static type checking
- **Project layout**: Organize code with `src/` layout

## Code Style Guidelines

- **Formatting**: Black-compatible formatting via `ruff format`
- **Imports**: Sort imports with `ruff` (stdlib, third-party, local)
- **Type hints**: Use native Python type hints (e.g., `list[str]` not `List[str]`)
- **Documentation**: Google-style docstrings for all modules, classes, functions
- **Naming**: snake_case for variables/functions, PascalCase for classes
- **Function length**: Keep functions short (< 30 lines) and single-purpose
- **PEP 8**: Follow PEP 8 style guide (enforced via `ruff`)

## Python Best Practices

- **File handling**: Prefer `pathlib.Path` over `os.path`
- **Debugging**: Use `logging` module instead of `print`
- **Error handling**: Use specific exceptions with context messages and proper logging
- **Data structures**: Use list/dict comprehensions for concise, readable code
- **Function arguments**: Avoid mutable default arguments
- **Data containers**: Leverage `dataclasses` to reduce boilerplate
- **Configuration**: Use environment variables (via `python-dotenv`) for configuration
- **AWS CLI**: Validate all commands before execution (must start with "aws")
- **Security**: Never store/log AWS credentials, set command timeouts

## Development Patterns & Best Practices

- **Favor simplicity**: Choose the simplest solution that meets requirements
- **DRY principle**: Avoid code duplication; reuse existing functionality
- **Configuration management**: Use environment variables for different environments
- **Focused changes**: Only implement explicitly requested or fully understood changes
- **Preserve patterns**: Follow existing code patterns when fixing bugs
- **File size**: Keep files under 300 lines; refactor when exceeding this limit
- **Test coverage**: Write comprehensive unit and integration tests with `pytest`; include fixtures
- **Test structure**: Use table-driven tests with parameterization for similar test cases (see the sketch after this list)
- **Mocking**: Use unittest.mock for external dependencies; don't test implementation details
- **Modular design**: Create reusable, modular components
- **Logging**: Implement appropriate logging levels (debug, info, error)
- **Error handling**: Implement robust error handling for production reliability
- **Security best practices**: Follow input validation and data protection practices
- **Performance**: Optimize critical code sections when necessary
- **Dependency management**: Add libraries only when essential
  - When adding/updating dependencies, update `pyproject.toml` first
  - Regenerate the lock file with `uv pip compile --system pyproject.toml -o uv.lock`
  - Install the new dependencies with `uv pip sync --system uv.lock`
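
For example, a table-driven test with `pytest.mark.parametrize` might look like the sketch below (the validation logic shown is a stand-in, not the project's real validator):

```python
import pytest


@pytest.mark.parametrize(
    "command,expected_valid",
    [
        ("aws s3 ls", True),
        ("aws ec2 describe-instances", True),
        ("rm -rf /", False),  # not an AWS CLI command
        ("aws", False),  # missing a service
    ],
)
def test_command_validation(command, expected_valid):
    # Stand-in check: commands must start with "aws" and name a service
    is_valid = command.startswith("aws ") and len(command.split()) >= 2
    assert is_valid == expected_valid
```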

## Development Workflow

- **Version control**: Commit frequently with clear messages
- **Versioning**: Use Git tags for versioning (e.g., `git tag -a 1.2.3 -m "Release 1.2.3"`)
  - For releases, create and push a tag
  - For development, let `setuptools_scm` automatically determine versions
- **Impact assessment**: Evaluate how changes affect other codebase areas
- **Documentation**: Keep documentation up-to-date for complex logic and features
- **Dependencies**: When adding dependencies, always update the `uv.lock` file
- **CI/CD**: All changes should pass CI checks (tests, linting, etc.) before merging

```

--------------------------------------------------------------------------------
/tests/unit/__init__.py:
--------------------------------------------------------------------------------

```python
"""Unit tests for AWS MCP Server."""

```

--------------------------------------------------------------------------------
/tests/__init__.py:
--------------------------------------------------------------------------------

```python
"""Test package for AWS MCP Server."""

```

--------------------------------------------------------------------------------
/tests/integration/__init__.py:
--------------------------------------------------------------------------------

```python
"""Integration tests for AWS MCP Server."""

```

--------------------------------------------------------------------------------
/tests/test_run_integration.py:
--------------------------------------------------------------------------------

```python
"""Simple test to verify integration test setup."""

import pytest


@pytest.mark.integration
def test_integration_marker_works():
    """Test that tests with integration marker run."""
    print("Integration test is running!")
    assert True

```

--------------------------------------------------------------------------------
/src/aws_mcp_server/__init__.py:
--------------------------------------------------------------------------------

```python
"""AWS Model Context Protocol (MCP) Server.

A lightweight service that enables AI assistants to execute AWS CLI commands through the Model Context Protocol (MCP).
"""

from importlib.metadata import PackageNotFoundError, version

try:
    __version__ = version("aws-mcp-server")
except PackageNotFoundError:
    # package is not installed
    pass

```

--------------------------------------------------------------------------------
/tests/test_aws_integration.py:
--------------------------------------------------------------------------------

```python
"""Simple test to verify AWS integration setup."""

import pytest


@pytest.mark.integration
def test_aws_credentials(ensure_aws_credentials):
    """Test that AWS credentials fixture works."""
    print("AWS credentials test is running!")
    assert True


@pytest.mark.integration
@pytest.mark.asyncio
async def test_aws_bucket(aws_s3_bucket):
    """Test that AWS bucket fixture works."""
    # We need to manually extract the bucket name from the async generator
    bucket_name = None
    async for name in aws_s3_bucket:
        bucket_name = name
        break

    print(f"AWS bucket fixture returned: {bucket_name}")
    assert bucket_name is not None
    assert isinstance(bucket_name, str)
    assert len(bucket_name) > 0

```

--------------------------------------------------------------------------------
/codecov.yml:
--------------------------------------------------------------------------------

```yaml
codecov:
  require_ci_to_pass: yes
  notify:
    wait_for_ci: yes

coverage:
  precision: 2
  round: down
  range: "70...90"
  status:
    project:
      default:
        # Target minimum coverage percentage
        target: 80%
        # Allow a small decrease in coverage without failing
        threshold: 5%
        if_ci_failed: error
    patch:
      default:
        # Target coverage for new code or changes
        target: 80%
        threshold: 5%

ignore:
  # Deployment and configuration files
  - "deploy/**/*"
  - "scripts/**/*"
  # Test files should not count toward coverage
  - "tests/**/*"
  # Setup and initialization files
  - "setup.py"
  - "aws_mcp_server/__main__.py"
  - "aws_mcp_server/__init__.py"
  # Documentation files
  - "docs/**/*"
  - "*.md"
  # Version generated file
  - "aws_mcp_server/_version.py"

comment:
  layout: "reach, diff, flags, files"
  behavior: default
  require_changes: false
  require_base: no
  require_head: yes
```

--------------------------------------------------------------------------------
/deploy/docker/docker-compose.yml:
--------------------------------------------------------------------------------

```yaml
services:
  aws-mcp-server:
    # Use either local build or official image from GitHub Packages
    build:
      context: ../../
      dockerfile: ./deploy/docker/Dockerfile
    # Alternatively, use the pre-built multi-arch image
    # image: ghcr.io/alexei-led/aws-mcp-server:latest
    ports:
      - "8000:8000"
    volumes:
      - ~/.aws:/home/appuser/.aws:ro # Mount AWS credentials as read-only
    environment:
      - AWS_PROFILE=default # Specify default AWS profile
      - AWS_MCP_TIMEOUT=300 # Default timeout in seconds (5 minutes)
      - AWS_MCP_TRANSPORT=stdio # Transport protocol ("stdio" or "sse")
      # - AWS_MCP_MAX_OUTPUT=100000  # Uncomment to set max output size
    restart: unless-stopped
# To build multi-architecture images:
# 1. Set up Docker buildx: docker buildx create --name mybuilder --use
# 2. Build and push the multi-arch image:
#    docker buildx build --platform linux/amd64,linux/arm64 -t yourrepo/aws-mcp-server:latest --push .

```

--------------------------------------------------------------------------------
/src/aws_mcp_server/__main__.py:
--------------------------------------------------------------------------------

```python
"""Main entry point for the AWS MCP Server.

This module provides the entry point for running the AWS MCP Server.
FastMCP handles the command-line arguments and server configuration.
"""

import logging
import signal
import sys

from aws_mcp_server.server import logger, mcp

# Configure root logger
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", handlers=[logging.StreamHandler(sys.stderr)])


def handle_interrupt(signum, frame):
    """Handle keyboard interrupt (Ctrl+C) gracefully."""
    logger.info(f"Received signal {signum}, shutting down gracefully...")
    sys.exit(0)


# Using FastMCP's built-in CLI handling
if __name__ == "__main__":
    # Set up signal handler for graceful shutdown
    signal.signal(signal.SIGINT, handle_interrupt)
    signal.signal(signal.SIGTERM, handle_interrupt)

    try:
        # Use configured transport protocol
        from aws_mcp_server.config import TRANSPORT

        # Validate transport protocol
        if TRANSPORT not in ("stdio", "sse"):
            logger.error(f"Invalid transport protocol: {TRANSPORT}. Must be 'stdio' or 'sse'")
            sys.exit(1)

        # Run with the specified transport protocol
        logger.info(f"Starting server with transport protocol: {TRANSPORT}")
        mcp.run(transport=TRANSPORT)
    except KeyboardInterrupt:
        logger.info("Keyboard interrupt received. Shutting down gracefully...")
        sys.exit(0)

```

--------------------------------------------------------------------------------
/smithery.yaml:
--------------------------------------------------------------------------------

```yaml
# Smithery configuration file: https://smithery.ai/docs/config#smitheryyaml

startCommand:
  type: stdio
  configSchema:
    # JSON Schema defining the configuration options for the MCP.
    type: object
    properties:
      awsMcpTimeout:
        type: number
        default: 300
        description: Command execution timeout in seconds.
      awsMcpMaxOutput:
        type: number
        default: 100000
        description: Maximum output size in characters.
      awsMcpTransport:
        type: string
        default: stdio
        description: Transport protocol to use ('stdio' or 'sse').
      awsProfile:
        type: string
        default: default
        description: AWS profile to use.
      awsRegion:
        type: string
        default: us-east-1
        description: AWS region to use.
  commandFunction:
    # A JS function that produces the CLI command based on the given config to start the MCP on stdio.
    |-
    (config) => ({
      command: 'python',
      args: ['-m', 'aws_mcp_server'],
      env: {
        AWS_MCP_TIMEOUT: String(config.awsMcpTimeout || 300),
        AWS_MCP_MAX_OUTPUT: String(config.awsMcpMaxOutput || 100000),
        AWS_MCP_TRANSPORT: config.awsMcpTransport || 'stdio',
        AWS_PROFILE: config.awsProfile || 'default',
        AWS_REGION: config.awsRegion || 'us-east-1'
      }
    })
  exampleConfig:
    awsMcpTimeout: 300
    awsMcpMaxOutput: 100000
    awsMcpTransport: stdio
    awsProfile: default
    awsRegion: us-east-1

build:
  dockerfile: deploy/docker/Dockerfile
  dockerBuildPath: .
```

--------------------------------------------------------------------------------
/docs/VERSION.md:
--------------------------------------------------------------------------------

```markdown
# Version Management with setuptools_scm

This project uses [setuptools_scm](https://setuptools-scm.readthedocs.io/) to automatically determine version numbers from Git tags.

## How it works

1. The version is automatically determined from your git tags
2. In development environments, the version is dynamically determined
3. For Docker builds and CI, the version is passed as a build argument

## Version Format

- Release: When on a tag (e.g., `1.2.3`), the version is exactly that tag
- Development: When between tags, the version is `<last-tag>.post<n>+g<commit-hash>`
  - Example: `1.2.3.post10+gb697684`

## Local Development

The version is automatically determined whenever you:

```bash
# Install the package
pip install -e .

# Run the version-file generator
make version-file

# Check the current version
python -m setuptools_scm
```

## Importing Version in Code

```python
# Preferred method - via Python metadata
from importlib.metadata import version
__version__ = version("aws-mcp-server")

# Alternative - if using version file
from aws_mcp_server._version import version, __version__
```

## Docker and CI

For Docker builds, the version is:

1. Determined by setuptools_scm
2. Passed to Docker as a build argument
3. Used in the image's labels and metadata

## Creating Releases

To create a new release:

1. Create and push a tag that follows semantic versioning:
   ```bash
   git tag -a 1.2.3 -m "Release 1.2.3"
   git push origin 1.2.3
   ```

2. The CI pipeline will:
   - Use setuptools_scm to get the version
   - Build the Docker image with proper tags
   - Push the release to registries

## Usage Notes

- The `_version.py` file is automatically generated and ignored by git
- Always include the patch version in tags (e.g., use `1.2.3` instead of `1.2`)
- For the Docker image, the `+` in versions is replaced with `-` for compatibility (one-line sketch below)
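
For example, a minimal sketch of that sanitization step:

```python
# Sketch: make a setuptools_scm version usable as a Docker tag ('+' is not allowed in tags)
version = "1.2.3.post10+gb697684"
docker_tag = version.replace("+", "-")
print(docker_tag)  # 1.2.3.post10-gb697684
```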
```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[build-system]
requires = ["setuptools>=61.0", "setuptools_scm>=8.0.0"]
build-backend = "setuptools.build_meta"

[project]
name = "aws-mcp-server"
dynamic = ["version"]
description = "AWS Model Context Protocol Server"
readme = "README.md"
requires-python = ">=3.13"
license = { text = "MIT" }
authors = [{ name = "Alexei Ledenev" }]
dependencies = [
    "fastmcp>=0.4.1",
    "mcp>=1.0.0",
    "boto3>=1.34.0",
    "pyyaml>=6.0.0"
]

[project.optional-dependencies]
dev = [
    "pytest>=7.0.0",
    "pytest-cov>=4.0.0",
    "pytest-asyncio>=0.23.0",
    "ruff>=0.2.0",
    "moto>=4.0.0",
    "setuptools_scm>=7.0.0",
]
# Production dependencies, optimized for Docker
prod = [
    "fastmcp>=0.4.1",
    "mcp>=1.0.0",
    "boto3>=1.34.0",
    "pyyaml>=6.0.0",
]

[tool.setuptools]
packages = ["aws_mcp_server"]
package-dir = { "" = "src" }

[tool.ruff]
line-length = 160
target-version = "py313"
exclude = ["src/aws_mcp_server/_version.py"]

[tool.ruff.lint]
select = ["E", "F", "I", "B"]

[tool.ruff.format]
quote-style = "double"
indent-style = "space"
line-ending = "auto"

[tool.ruff.lint.isort]
known-first-party = ["aws_mcp_server"]

# Using VSCode + Pylance static typing instead of mypy

[tool.pytest.ini_options]
testpaths = ["tests"]
python_files = "test_*.py"
markers = [
    "integration: marks tests that require AWS CLI and AWS credentials",
    "asyncio: mark test as requiring asyncio",
]
asyncio_mode = "strict"
asyncio_default_fixture_loop_scope = "function"
filterwarnings = [
    "ignore::RuntimeWarning:unittest.mock:",
    "ignore::RuntimeWarning:weakref:"
]

[tool.coverage.run]
source = ["src/aws_mcp_server"]
omit = [
    "*/tests/*",
    "*/setup.py",
    "*/conftest.py",
    "src/aws_mcp_server/__main__.py",
]

[tool.coverage.report]
exclude_lines = [
    "pragma: no cover",
    "def __repr__",
    "if self.debug",
    "raise NotImplementedError",
    "if __name__ == .__main__.:",
    "pass",
    "raise ImportError",
]

[tool.setuptools_scm]
fallback_version="0.0.0-dev0"
```

--------------------------------------------------------------------------------
/tests/unit/test_init.py:
--------------------------------------------------------------------------------

```python
"""Tests for the package initialization module."""

import unittest
from importlib import reload
from unittest.mock import patch


class TestInitModule(unittest.TestCase):
    """Tests for the __init__ module."""

    def test_version_from_package(self):
        """Test __version__ is set from package metadata."""
        with patch("importlib.metadata.version", return_value="1.2.3"):
            # Import the module fresh to apply the patch
            import aws_mcp_server

            # Reload to apply our patch
            reload(aws_mcp_server)

            # Check that __version__ is set correctly
            self.assertEqual(aws_mcp_server.__version__, "1.2.3")

    def test_version_fallback_on_package_not_found(self):
        """Test handling of PackageNotFoundError."""
        from importlib.metadata import PackageNotFoundError

        # Looking at the actual implementation, when PackageNotFoundError is raised,
        # it just uses 'pass', so the attribute __version__ may or may not be set.
        # If it was previously set (which is likely), it will retain its previous value.
        with patch("importlib.metadata.version", side_effect=PackageNotFoundError):
            # Create a fresh module
            import sys

            if "aws_mcp_server" in sys.modules:
                del sys.modules["aws_mcp_server"]

            # Import the module fresh with our patch
            import aws_mcp_server

            # In this case, the __version__ may not even be set
            # We're just testing that the code doesn't crash with PackageNotFoundError
            # Our test should pass regardless of whether __version__ is set
            # The important part is that the exception is handled
            try:
                # This could raise AttributeError
                _ = aws_mcp_server.__version__
                # If we get here, it's set to something - hard to assert exactly what
                # Just ensure no exception was thrown
                self.assertTrue(True)
            except AttributeError:
                # If AttributeError is raised, that's also fine - the attribute doesn't exist
                self.assertTrue(True)


if __name__ == "__main__":
    unittest.main()

```

--------------------------------------------------------------------------------
/tests/test_bucket_creation.py:
--------------------------------------------------------------------------------

```python
"""Test for creating and managing S3 buckets directly."""

import asyncio
import os
import time
import uuid

import pytest

from aws_mcp_server.config import AWS_REGION
from aws_mcp_server.server import aws_cli_pipeline


@pytest.mark.integration
@pytest.mark.asyncio
async def test_create_and_delete_s3_bucket():
    """Test creating and deleting an S3 bucket using AWS MCP server."""
    # Get region from environment or use default
    region = os.environ.get("AWS_TEST_REGION", AWS_REGION)
    print(f"Using AWS region: {region}")

    # Generate a unique bucket name
    timestamp = int(time.time())
    random_id = str(uuid.uuid4())[:8]
    bucket_name = f"aws-mcp-test-{timestamp}-{random_id}"

    try:
        # Create the bucket
        create_cmd = f"aws s3 mb s3://{bucket_name} --region {region}"
        result = await aws_cli_pipeline(command=create_cmd, timeout=None, ctx=None)

        # Check if bucket was created successfully
        assert result["status"] == "success", f"Failed to create bucket: {result['output']}"

        # Wait for bucket to be fully available
        await asyncio.sleep(3)

        # List buckets to verify it exists
        list_result = await aws_cli_pipeline(command="aws s3 ls", timeout=None, ctx=None)
        assert bucket_name in list_result["output"], "Bucket not found in bucket list"

        # Try to create a test file
        test_content = "Test content"
        with open("test_file.txt", "w") as f:
            f.write(test_content)

        # Upload the file
        upload_result = await aws_cli_pipeline(command=f"aws s3 cp test_file.txt s3://{bucket_name}/test_file.txt --region {region}", timeout=None, ctx=None)
        assert upload_result["status"] == "success", f"Failed to upload file: {upload_result['output']}"

        # List bucket contents
        list_files_result = await aws_cli_pipeline(command=f"aws s3 ls s3://{bucket_name}/ --region {region}", timeout=None, ctx=None)
        assert "test_file.txt" in list_files_result["output"], "Uploaded file not found in bucket"

    finally:
        # Clean up
        # Remove test file
        if os.path.exists("test_file.txt"):
            os.remove("test_file.txt")

        # Delete all objects in the bucket
        await aws_cli_pipeline(command=f"aws s3 rm s3://{bucket_name} --recursive --region {region}", timeout=None, ctx=None)

        # Delete the bucket
        delete_result = await aws_cli_pipeline(command=f"aws s3 rb s3://{bucket_name} --region {region}", timeout=None, ctx=None)
        assert delete_result["status"] == "success", f"Failed to delete bucket: {delete_result['output']}"

```

--------------------------------------------------------------------------------
/tests/unit/test_main.py:
--------------------------------------------------------------------------------

```python
"""Tests for the main entry point of the AWS MCP Server."""

from unittest.mock import MagicMock, patch

import pytest

# Import handle_interrupt function for direct testing
from aws_mcp_server.__main__ import handle_interrupt


def test_handle_interrupt():
    """Test the handle_interrupt function."""
    with patch("sys.exit") as mock_exit:
        # Call the function with mock signal and frame
        handle_interrupt(MagicMock(), MagicMock())
        # Verify sys.exit was called with 0
        mock_exit.assert_called_once_with(0)


@pytest.mark.skip(reason="Cannot reload main module during testing")
def test_main_with_valid_transport():
    """Test main module with valid transport setting."""
    with patch("aws_mcp_server.__main__.TRANSPORT", "stdio"):
        with patch("aws_mcp_server.__main__.mcp.run") as mock_run:
            # We can't easily test the full __main__ module execution
            from aws_mcp_server.__main__ import mcp

            # Instead, we'll test the specific function we modified
            with patch("aws_mcp_server.__main__.logger") as mock_logger:
                # Import the function to ensure proper validation
                from aws_mcp_server.__main__ import TRANSPORT

                # Call the relevant function directly
                mcp.run(transport=TRANSPORT)

                # Check that mcp.run was called with the correct transport
                mock_run.assert_called_once_with(transport="stdio")
                # Verify logger was called
                mock_logger.info.assert_any_call("Starting server with transport protocol: stdio")


def test_main_transport_validation():
    """Test transport protocol validation."""
    with patch("aws_mcp_server.config.TRANSPORT", "invalid"):
        from aws_mcp_server.config import TRANSPORT

        # Test the main function's validation logic
        with patch("aws_mcp_server.server.mcp.run") as mock_run:
            with patch("sys.exit") as mock_exit:
                with patch("aws_mcp_server.__main__.logger") as mock_logger:
                    # Execute the validation logic directly
                    if TRANSPORT not in ("stdio", "sse"):
                        mock_logger.error(f"Invalid transport protocol: {TRANSPORT}. Must be 'stdio' or 'sse'")
                        mock_exit(1)
                    else:
                        mock_run(transport=TRANSPORT)

                    # Check that error was logged with invalid transport
                    mock_logger.error.assert_called_once_with("Invalid transport protocol: invalid. Must be 'stdio' or 'sse'")
                    # Check that exit was called
                    mock_exit.assert_called_once_with(1)
                    # Check that mcp.run was not called
                    mock_run.assert_not_called()

```
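
The validation logic these tests re-create with mocks lives in `__main__.py`; a hedged sketch of the shape the patches above imply (module names taken from the patch targets, details assumed):

```python
import logging
import signal
import sys

from aws_mcp_server.config import TRANSPORT
from aws_mcp_server.server import mcp

logger = logging.getLogger(__name__)


def handle_interrupt(signum, frame):
    """Exit cleanly on SIGINT."""
    sys.exit(0)


if __name__ == "__main__":
    signal.signal(signal.SIGINT, handle_interrupt)
    if TRANSPORT not in ("stdio", "sse"):
        logger.error(f"Invalid transport protocol: {TRANSPORT}. Must be 'stdio' or 'sse'")
        sys.exit(1)
    logger.info(f"Starting server with transport protocol: {TRANSPORT}")
    mcp.run(transport=TRANSPORT)
```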

--------------------------------------------------------------------------------
/.github/workflows/ci.yml:
--------------------------------------------------------------------------------

```yaml
name: PR Validation

on:
  pull_request:
    paths-ignore:
      - 'deploy/**'
      - '*.md'

jobs:
  test:
    runs-on: ubuntu-latest
    if: "!contains(github.event.head_commit.message, '[ci skip]') && !contains(github.event.head_commit.message, '[skip ci]')"
    strategy:
      matrix:
        python-version: ["3.13"]

    steps:
      - uses: actions/checkout@v4

      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}
          cache: "pip"

      - name: Install uv
        run: |
          # Install uv using the official installation method
          curl -LsSf https://astral.sh/uv/install.sh | sh

          # Add uv to PATH
          echo "$HOME/.cargo/bin" >> $GITHUB_PATH

      - name: Install dependencies using uv
        run: |
          # Install dependencies using uv with the lock file and the --system flag
          uv pip install --system -e ".[dev]"

      - name: Lint
        run: make lint
        continue-on-error: true  # Display errors but don't fail build for lint warnings

      - name: Test
        run: make test

      - name: Upload coverage to Codecov
        uses: codecov/codecov-action@v4
        with:
          token: ${{ secrets.CODECOV_TOKEN }}
          file: ./coverage.xml
          fail_ci_if_error: false
          verbose: true

  build:
    runs-on: ubuntu-latest
    needs: test
    steps:
      - uses: actions/checkout@v4

      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v3

      - name: Get current date
        id: date
        run: echo "date=$(date -u +'%Y-%m-%dT%H:%M:%SZ')" >> $GITHUB_OUTPUT

      - name: Install setuptools_scm
        run: pip install setuptools_scm
        
      - name: Get version info
        id: version
        run: |
          # Get the raw version from setuptools_scm
          VERSION=$(python -m setuptools_scm)
          
          # Make version Docker-compatible (replace + with -)
          DOCKER_VERSION=$(echo "$VERSION" | tr '+' '-')
          
          # Update the version in pyproject.toml
          sed -i "s|fallback_version=\"0.0.0-dev0\"|fallback_version=\"${VERSION}\"|g" pyproject.toml
          
          echo "version=$DOCKER_VERSION" >> $GITHUB_OUTPUT

      - name: Build Docker image
        uses: docker/build-push-action@v5
        with:
          context: .
          file: ./deploy/docker/Dockerfile
          push: false
          tags: aws-mcp-server:${{ steps.version.outputs.version }}
          platforms: linux/amd64
          build-args: |
            BUILD_DATE=${{ steps.date.outputs.date }}
            VERSION=${{ steps.version.outputs.version }}
          cache-from: type=gha
          cache-to: type=gha,mode=max

```
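
The `tr '+' '-'` step exists because setuptools_scm's local-version suffix uses `+`, which is not a legal character in a Docker tag. An illustrative run (the version string is made up):

```bash
VERSION="0.2.1.dev3+g1a2b3c4"              # sample setuptools_scm output
DOCKER_VERSION=$(echo "$VERSION" | tr '+' '-')
echo "$DOCKER_VERSION"                      # 0.2.1.dev3-g1a2b3c4
```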

--------------------------------------------------------------------------------
/deploy/docker/Dockerfile:
--------------------------------------------------------------------------------

```dockerfile
# Multi-stage build with platform-specific configuration
ARG PYTHON_VERSION=3.13-slim
ARG VERSION

# =========== BUILDER STAGE ===========
FROM --platform=${TARGETPLATFORM} python:${PYTHON_VERSION} AS builder

# Install build dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential \
    && apt-get clean \
    && rm -rf /var/lib/apt/lists/*

# Set up working directory
WORKDIR /build

# Copy package definition files
COPY pyproject.toml README.md LICENSE ./
COPY src/ ./src/

RUN cat pyproject.toml

# Install package and dependencies with pip wheel
RUN pip install --no-cache-dir wheel && \
    pip wheel --no-cache-dir --wheel-dir=/wheels -e .

# =========== FINAL STAGE ===========
FROM --platform=${TARGETPLATFORM} python:${PYTHON_VERSION}

# Set target architecture argument
ARG TARGETPLATFORM
ARG TARGETARCH
ARG BUILD_DATE
ARG VERSION

# Add metadata
LABEL maintainer="alexei-led" \
      description="AWS Multi-Command Proxy Server" \
      org.opencontainers.image.source="https://github.com/alexei-led/aws-mcp-server" \
      org.opencontainers.image.version="${VERSION}" \
      org.opencontainers.image.created="${BUILD_DATE}"

# Step 1: Install system packages - keeping all original packages
RUN apt-get update && apt-get install -y --no-install-recommends \
    unzip \
    curl \
    wget \
    less \
    groff \
    jq \
    gnupg \
    tar \
    gzip \
    zip \
    vim \
    net-tools \
    dnsutils \
    openssh-client \
    grep \
    sed \
    gawk \
    findutils \
    && apt-get clean \
    && rm -rf /var/lib/apt/lists/*

# Step 2: Install AWS CLI based on architecture
RUN if [ "${TARGETARCH}" = "arm64" ]; then \
        curl -sSL "https://awscli.amazonaws.com/awscli-exe-linux-aarch64.zip" -o "awscliv2.zip"; \
    else \
        curl -sSL "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"; \
    fi \
    && unzip -q awscliv2.zip \
    && ./aws/install \
    && rm -rf awscliv2.zip aws

# Step 3: Install Session Manager plugin (only for x86_64 due to compatibility issues on ARM)
RUN if [ "${TARGETARCH}" = "amd64" ]; then \
        curl -sSL "https://s3.amazonaws.com/session-manager-downloads/plugin/latest/ubuntu_64bit/session-manager-plugin.deb" -o "session-manager-plugin.deb" \
        && dpkg -i session-manager-plugin.deb 2>/dev/null || apt-get -f install -y \
        && rm session-manager-plugin.deb; \
    else \
        echo "Skipping Session Manager plugin installation for ${TARGETARCH} architecture"; \
    fi

# Set up application directory, user, and permissions
RUN useradd -m -s /bin/bash -u 10001 appuser \
    && mkdir -p /app/logs && chmod 777 /app/logs \
    && mkdir -p /home/appuser/.aws && chmod 700 /home/appuser/.aws

WORKDIR /app

# Copy application code
COPY pyproject.toml README.md LICENSE ./
COPY src/ ./src/

# Copy wheels from builder and install
COPY --from=builder /wheels /wheels
RUN pip install --no-cache-dir --no-index --find-links=/wheels aws-mcp-server && \
    rm -rf /wheels

# Set ownership after all files have been copied - avoid .aws directory
RUN chown -R appuser:appuser /app

# Switch to non-root user
USER appuser

# Set all environment variables in one layer
ENV HOME="/home/appuser" \
    PATH="/usr/local/bin:/usr/local/aws/v2/bin:${PATH}" \
    PYTHONUNBUFFERED=1 \
    AWS_MCP_TRANSPORT=stdio

# Expose the service port
EXPOSE 8000

# Set command to run the server
ENTRYPOINT ["python", "-m", "aws_mcp_server"]
```
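
For local experiments, something like the following mirrors the multi-arch build the workflows perform (tag and build-arg values here are illustrative):

```bash
docker buildx build \
  --platform linux/amd64,linux/arm64 \
  --file deploy/docker/Dockerfile \
  --build-arg BUILD_DATE="$(date -u +'%Y-%m-%dT%H:%M:%SZ')" \
  --build-arg VERSION="0.0.0-dev0" \
  --tag aws-mcp-server:local \
  .
```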

--------------------------------------------------------------------------------
/tests/test_aws_setup.py:
--------------------------------------------------------------------------------

```python
"""Test file to verify AWS integration setup works correctly."""

import asyncio
import os
import subprocess
import time
import uuid
from unittest.mock import AsyncMock, patch

import pytest

from aws_mcp_server.server import aws_cli_pipeline


def test_aws_cli_installed():
    """Test that AWS CLI is installed."""
    result = subprocess.run(["aws", "--version"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=False)
    assert result.returncode == 0, "AWS CLI is not installed or not in PATH"


@pytest.mark.integration
def test_aws_credentials_exist():
    """Test that AWS credentials exist.

    This test is marked as integration because it requires AWS credentials.
    """
    result = subprocess.run(["aws", "sts", "get-caller-identity"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=False)
    assert result.returncode == 0, f"AWS credentials check failed: {result.stderr.decode('utf-8')}"


@pytest.mark.asyncio
@pytest.mark.integration
async def test_aws_execute_command():
    """Test that we can execute a basic AWS command.

    This test is marked as integration because it requires AWS credentials.
    """
    # Test a simple S3 bucket listing command
    result = await aws_cli_pipeline(command="aws s3 ls", timeout=None, ctx=None)

    # Verify the result
    assert isinstance(result, dict)
    assert "status" in result
    assert result["status"] == "success", f"Command failed: {result.get('output', '')}"


@pytest.mark.asyncio
@pytest.mark.integration
async def test_aws_bucket_creation():
    """Test that we can create and delete a bucket.

    This test is marked as integration because it requires AWS credentials.
    """
    # Generate a bucket name
    timestamp = int(time.time())
    random_id = str(uuid.uuid4())[:8]
    bucket_name = f"aws-mcp-test-{timestamp}-{random_id}"

    # Get region from environment or use default
    region = os.environ.get("AWS_TEST_REGION", os.environ.get("AWS_REGION", "us-east-1"))

    try:
        # Create bucket with region specification
        create_result = await aws_cli_pipeline(command=f"aws s3 mb s3://{bucket_name} --region {region}", timeout=None, ctx=None)
        assert create_result["status"] == "success", f"Failed to create bucket: {create_result['output']}"

        # Verify bucket exists
        await asyncio.sleep(3)  # Wait for bucket to be fully available
        list_result = await aws_cli_pipeline(command="aws s3 ls", timeout=None, ctx=None)
        assert bucket_name in list_result["output"], "Bucket was not found in bucket list"

    finally:
        # Clean up - delete the bucket
        await aws_cli_pipeline(command=f"aws s3 rb s3://{bucket_name} --region {region}", timeout=None, ctx=None)


@pytest.mark.asyncio
async def test_aws_command_mocked():
    """Test executing an AWS command with mocked execution.

    This test is mocked so it doesn't require AWS credentials, suitable for CI.
    """
    # We need to patch the correct module path
    with patch("aws_mcp_server.server.execute_aws_command", new_callable=AsyncMock) as mock_execute:
        # Set up mock return value
        mock_execute.return_value = {"status": "success", "output": "Mock bucket list output"}

        # Execute the command
        result = await aws_cli_pipeline(command="aws s3 ls", timeout=None, ctx=None)

        # Verify the mock was called correctly
        mock_execute.assert_called_once()

        # Check the results
        assert result["status"] == "success"
        assert "Mock bucket list output" in result["output"]

```

--------------------------------------------------------------------------------
/src/aws_mcp_server/config.py:
--------------------------------------------------------------------------------

```python
"""Configuration settings for the AWS MCP Server.

This module contains configuration settings for the AWS MCP Server.

Environment variables:
- AWS_MCP_TIMEOUT: Custom timeout in seconds (default: 300)
- AWS_MCP_MAX_OUTPUT: Maximum output size in characters (default: 100000)
- AWS_MCP_TRANSPORT: Transport protocol to use ("stdio" or "sse", default: "stdio")
- AWS_PROFILE: AWS profile to use (default: "default")
- AWS_REGION: AWS region to use (default: "us-east-1")
- AWS_DEFAULT_REGION: Alternative to AWS_REGION (used if AWS_REGION not set)
- AWS_MCP_SECURITY_MODE: Security mode for command validation (strict or permissive, default: strict)
- AWS_MCP_SECURITY_CONFIG: Path to custom security configuration file
"""

import os
from pathlib import Path

# Command execution settings
DEFAULT_TIMEOUT = int(os.environ.get("AWS_MCP_TIMEOUT", "300"))
MAX_OUTPUT_SIZE = int(os.environ.get("AWS_MCP_MAX_OUTPUT", "100000"))

# Transport protocol
TRANSPORT = os.environ.get("AWS_MCP_TRANSPORT", "stdio")

# AWS CLI settings
AWS_PROFILE = os.environ.get("AWS_PROFILE", "default")
AWS_REGION = os.environ.get("AWS_REGION", os.environ.get("AWS_DEFAULT_REGION", "us-east-1"))

# Security settings
SECURITY_MODE = os.environ.get("AWS_MCP_SECURITY_MODE", "strict")
SECURITY_CONFIG_PATH = os.environ.get("AWS_MCP_SECURITY_CONFIG", "")

# Instructions displayed to client during initialization
INSTRUCTIONS = """
AWS MCP Server provides a comprehensive interface to the AWS CLI with best practices guidance.
- Use the aws_cli_help tool to get AWS CLI documentation
- Use the aws_cli_pipeline tool to run AWS CLI commands
- The aws_cli_pipeline tool supports Unix pipes (|) to filter or transform AWS CLI output:
  Example: aws s3api list-buckets --query 'Buckets[*].Name' --output text | sort
- Access AWS environment resources for context:
  - aws://config/profiles: List available AWS profiles and active profile
  - aws://config/regions: List available AWS regions and active region
  - aws://config/regions/{region}: Get detailed information about a specific region 
    including name, code, availability zones, geographic location, and available services
  - aws://config/environment: Get current AWS environment details (profile, region, credentials)
  - aws://config/account: Get current AWS account information (ID, alias, organization)
- Use the built-in prompt templates for common AWS tasks following AWS Well-Architected Framework best practices:

  Essential Operations:
  - create_resource: Create AWS resources with comprehensive security settings
  - resource_inventory: Create detailed resource inventories across regions
  - troubleshoot_service: Perform systematic service issue diagnostics

  Security & Compliance:
  - security_audit: Perform comprehensive service security audits
  - security_posture_assessment: Evaluate overall AWS security posture
  - iam_policy_generator: Generate least-privilege IAM policies
  - compliance_check: Verify compliance with regulatory standards

  Cost & Performance:
  - cost_optimization: Find and implement cost optimization opportunities
  - resource_cleanup: Safely clean up unused resources
  - performance_tuning: Optimize performance for specific resources

  Infrastructure & Architecture:
  - serverless_deployment: Deploy serverless applications with best practices
  - container_orchestration: Set up container environments (ECS/EKS)
  - vpc_network_design: Design and deploy secure VPC networking
  - infrastructure_automation: Automate infrastructure management
  - multi_account_governance: Implement secure multi-account strategies

  Reliability & Monitoring:
  - service_monitoring: Configure comprehensive service monitoring
  - disaster_recovery: Implement enterprise-grade DR solutions
"""

# Application paths
BASE_DIR = Path(__file__).parent.parent.parent

```
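
Since every setting is environment-driven, configuring the server is just a matter of exporting variables before launch; for example (values illustrative):

```bash
export AWS_MCP_TIMEOUT=600        # allow slower commands than the 300s default
export AWS_MCP_TRANSPORT=sse      # serve over SSE instead of stdio
export AWS_PROFILE=dev
export AWS_REGION=eu-west-1
python -m aws_mcp_server
```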

--------------------------------------------------------------------------------
/.github/workflows/release.yml:
--------------------------------------------------------------------------------

```yaml
name: Release

on:
  push:
    branches:
      - master
      - main
    tags:
      - '[0-9]+.[0-9]+.[0-9]+'
      - 'v[0-9]+.[0-9]+.[0-9]+'
    paths-ignore:
      - 'tests/**'
      - '*.md'

jobs:
  build-and-push:
    runs-on: ubuntu-latest
    if: "!contains(github.event.head_commit.message, '[ci skip]') && !contains(github.event.head_commit.message, '[skip ci]')"

    permissions:
      contents: read
      packages: write

    steps:
      - uses: actions/checkout@v4

      - name: Set up Python 3.13
        uses: actions/setup-python@v5
        with:
          python-version: "3.13"
          cache: "pip"

      - name: Install dependencies and run tests
        run: |
          python -m pip install -e ".[dev]"
          # Run linting and tests to verify before release
          make lint
          make test
          
      - name: Upload coverage to Codecov
        uses: codecov/codecov-action@v4
        with:
          token: ${{ secrets.CODECOV_TOKEN }}
          file: ./coverage.xml
          fail_ci_if_error: false
          verbose: true

      - name: Log in to GitHub Container Registry
        uses: docker/login-action@v3
        with:
          registry: ghcr.io
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}

      - name: Install setuptools_scm
        run: pip install setuptools_scm

      - name: Get version information
        id: version
        run: |
          # Get the base version from setuptools_scm
          VERSION=$(python -m setuptools_scm)
          
          # Check if we're on a tag
          if [[ "${{ github.ref_type }}" == "tag" ]]; then
            echo "is_tag=true" >> $GITHUB_OUTPUT
            
            # Parse semver components for tagging
            VERSION_NO_V=$(echo "${{ github.ref_name }}" | sed 's/^v//')
            # overwrite VERSION with the tag name
            VERSION=${VERSION_NO_V}
            MAJOR=$(echo "${VERSION_NO_V}" | cut -d. -f1)
            MINOR=$(echo "${VERSION_NO_V}" | cut -d. -f2)
            PATCH=$(echo "${VERSION_NO_V}" | cut -d. -f3)
            
            echo "major=${MAJOR}" >> $GITHUB_OUTPUT
            echo "major_minor=${MAJOR}.${MINOR}" >> $GITHUB_OUTPUT
            echo "major_minor_patch=${VERSION_NO_V}" >> $GITHUB_OUTPUT
            echo "version=${VERSION_NO_V}" >> $GITHUB_OUTPUT
          else
            # For non-tag builds, use setuptools_scm
            VERSION=$(python -m setuptools_scm)
            # Make version Docker-compatible (replace + with -)
            DOCKER_VERSION=$(echo "$VERSION" | tr '+' '-')
            echo "is_tag=false" >> $GITHUB_OUTPUT
            echo "version=${DOCKER_VERSION}" >> $GITHUB_OUTPUT
          fi
          echo "build_date=$(date -u +'%Y-%m-%dT%H:%M:%SZ')" >> $GITHUB_OUTPUT
          
          # Update the version in pyproject.toml
          sed -i "s|fallback_version=\"0.0.0-dev0\"|fallback_version=\"${VERSION}\"|g" pyproject.toml

      - name: Extract metadata for Docker
        id: meta
        uses: docker/metadata-action@v5
        with:
          images: ghcr.io/${{ github.repository }}
          tags: |
            # For tags: exact semver from the tag name
            type=raw,value=${{ steps.version.outputs.major_minor_patch }},enable=${{ steps.version.outputs.is_tag == 'true' }}
            type=raw,value=${{ steps.version.outputs.major_minor }},enable=${{ steps.version.outputs.is_tag == 'true' }}
            type=raw,value=${{ steps.version.outputs.major }},enable=${{ steps.version.outputs.is_tag == 'true' }}
            type=raw,value=latest,enable=${{ steps.version.outputs.is_tag == 'true' }}
            # Git SHA for both tag and non-tag builds
            type=sha,format=short
            # For main branch: dev tag
            type=raw,value=dev,enable=${{ github.ref == format('refs/heads/{0}', 'main') }}

      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v3

      - name: Build and push multi-architecture Docker image
        uses: docker/build-push-action@v6
        with:
          context: .
          file: ./deploy/docker/Dockerfile
          push: true
          platforms: linux/amd64,linux/arm64
          tags: ${{ steps.meta.outputs.tags }}
          labels: ${{ steps.meta.outputs.labels }}
          build-args: |
            BUILD_DATE=${{ steps.version.outputs.build_date }}
            VERSION=${{ steps.version.outputs.version }}
          cache-from: type=gha
          cache-to: type=gha,mode=max

```
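
For a pushed tag such as `v1.4.2`, the metadata step above yields this tag set (image path follows `ghcr.io/${{ github.repository }}`; the short SHA is illustrative):

```bash
docker pull ghcr.io/alexei-led/aws-mcp-server:1.4.2      # exact semver
docker pull ghcr.io/alexei-led/aws-mcp-server:1.4        # floating minor
docker pull ghcr.io/alexei-led/aws-mcp-server:1          # floating major
docker pull ghcr.io/alexei-led/aws-mcp-server:latest
docker pull ghcr.io/alexei-led/aws-mcp-server:sha-1a2b3c4
```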

--------------------------------------------------------------------------------
/tests/unit/test_prompts.py:
--------------------------------------------------------------------------------

```python
"""Unit tests for AWS MCP Server prompts.

Tests the prompt templates functionality in the AWS MCP Server.
"""

from unittest.mock import MagicMock

import pytest

from aws_mcp_server.prompts import register_prompts


@pytest.fixture
def prompt_functions():
    """Fixture that returns a dictionary of prompt functions.

    This fixture captures all prompt functions registered with the MCP instance.
    """
    captured_functions = {}

    # Create a special mock decorator that captures the functions
    def mock_prompt_decorator(*args, **kwargs):
        def decorator(func):
            captured_functions[func.__name__] = func
            return func

        return decorator

    mock_mcp = MagicMock()
    mock_mcp.prompt = mock_prompt_decorator

    # Register prompts with our special mock
    register_prompts(mock_mcp)

    return captured_functions


def test_prompt_registration(prompt_functions):
    """Test that prompts are registered correctly."""
    # All expected prompt names
    expected_prompt_names = [
        "create_resource",
        "security_audit",
        "cost_optimization",
        "resource_inventory",
        "troubleshoot_service",
        "iam_policy_generator",
        "service_monitoring",
        "disaster_recovery",
        "compliance_check",
        "resource_cleanup",
        "serverless_deployment",
        "container_orchestration",
        "vpc_network_design",
        "infrastructure_automation",
        "security_posture_assessment",
        "performance_tuning",
        "multi_account_governance",
    ]

    # Check that we captured the expected number of functions
    assert len(prompt_functions) == len(expected_prompt_names), f"Expected {len(expected_prompt_names)} prompts, got {len(prompt_functions)}"

    # Check that all expected prompts are registered
    for prompt_name in expected_prompt_names:
        assert prompt_name in prompt_functions, f"Expected prompt '{prompt_name}' not found"


@pytest.mark.parametrize(
    "prompt_name,args,expected_content",
    [
        # Original prompts
        ("create_resource", {"resource_type": "s3-bucket", "resource_name": "my-test-bucket"}, ["s3-bucket", "my-test-bucket", "security", "best practices"]),
        ("security_audit", {"service": "s3"}, ["s3", "security audit", "public access"]),
        ("cost_optimization", {"service": "ec2"}, ["ec2", "cost optimization", "unused"]),
        ("resource_inventory", {"service": "ec2", "region": "us-west-2"}, ["ec2", "in the us-west-2 region", "inventory"]),
        ("resource_inventory", {"service": "s3"}, ["s3", "across all regions", "inventory"]),
        ("troubleshoot_service", {"service": "lambda", "resource_id": "my-function"}, ["lambda", "my-function", "troubleshoot"]),
        (
            "iam_policy_generator",
            {"service": "s3", "actions": "GetObject,PutObject", "resource_pattern": "arn:aws:s3:::my-bucket/*"},
            ["s3", "GetObject,PutObject", "arn:aws:s3:::my-bucket/*", "least-privilege"],
        ),
        ("service_monitoring", {"service": "rds", "metric_type": "performance"}, ["rds", "performance", "monitoring", "CloudWatch"]),
        ("disaster_recovery", {"service": "dynamodb", "recovery_point_objective": "15 minutes"}, ["dynamodb", "15 minutes", "disaster recovery"]),
        ("compliance_check", {"compliance_standard": "HIPAA", "service": "s3"}, ["HIPAA", "for s3", "compliance"]),
        ("resource_cleanup", {"service": "ec2", "criteria": "old"}, ["ec2", "old", "cleanup"]),
        # New prompts
        ("serverless_deployment", {"application_name": "test-app", "runtime": "python3.13"}, ["test-app", "python3.13", "serverless", "AWS SAM"]),
        ("container_orchestration", {"cluster_name": "test-cluster", "service_type": "fargate"}, ["test-cluster", "fargate", "container"]),
        ("vpc_network_design", {"vpc_name": "test-vpc", "cidr_block": "10.0.0.0/16"}, ["test-vpc", "10.0.0.0/16", "VPC", "security"]),
        ("infrastructure_automation", {"resource_type": "ec2", "automation_scope": "deployment"}, ["ec2", "deployment", "automation"]),
        ("security_posture_assessment", {}, ["Security Hub", "GuardDuty", "posture", "assessment"]),
        ("performance_tuning", {"service": "rds", "resource_id": "test-db"}, ["rds", "test-db", "performance", "metrics"]),
        ("multi_account_governance", {"account_type": "organization"}, ["organization", "multi-account", "governance"]),
    ],
)
def test_prompt_templates(prompt_functions, prompt_name, args, expected_content):
    """Test all prompt templates with various inputs using parametrized tests."""
    # Get the captured function
    prompt_func = prompt_functions.get(prompt_name)
    assert prompt_func is not None, f"{prompt_name} prompt not found"

    # Test prompt output with the specified arguments
    prompt_text = prompt_func(**args)

    # Check for expected content
    for content in expected_content:
        assert content.lower() in prompt_text.lower(), f"Expected '{content}' in {prompt_name} output"

```

--------------------------------------------------------------------------------
/src/aws_mcp_server/server.py:
--------------------------------------------------------------------------------

```python
"""Main server implementation for AWS MCP Server.

This module defines the MCP server instance and tool functions for AWS CLI interaction,
providing a standardized interface for AWS CLI command execution and documentation.
It also provides MCP Resources for AWS profiles, regions, and configuration.
"""

import asyncio
import logging
import sys

from mcp.server.fastmcp import Context, FastMCP
from pydantic import Field

from aws_mcp_server import __version__
from aws_mcp_server.cli_executor import (
    CommandExecutionError,
    CommandHelpResult,
    CommandResult,
    CommandValidationError,
    check_aws_cli_installed,
    execute_aws_command,
    get_command_help,
)
from aws_mcp_server.config import INSTRUCTIONS
from aws_mcp_server.prompts import register_prompts
from aws_mcp_server.resources import register_resources

# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", handlers=[logging.StreamHandler(sys.stderr)])
logger = logging.getLogger("aws-mcp-server")


# Run startup checks in synchronous context
def run_startup_checks():
    """Run startup checks to ensure AWS CLI is installed."""
    logger.info("Running startup checks...")
    if not asyncio.run(check_aws_cli_installed()):
        logger.error("AWS CLI is not installed or not in PATH. Please install AWS CLI.")
        sys.exit(1)
    logger.info("AWS CLI is installed and available")


# Call the checks
run_startup_checks()

# Create the FastMCP server following FastMCP best practices
mcp = FastMCP(
    "AWS MCP Server",
    instructions=INSTRUCTIONS,
    version=__version__,
    capabilities={"resources": {}},  # Enable resources capability
)

# Register prompt templates
register_prompts(mcp)

# Register AWS resources
register_resources(mcp)


@mcp.tool()
async def aws_cli_help(
    service: str = Field(description="AWS service (e.g., s3, ec2)"),
    command: str | None = Field(description="Command within the service", default=None),
    ctx: Context | None = None,
) -> CommandHelpResult:
    """Get AWS CLI command documentation.

    Retrieves the help documentation for a specified AWS service or command
    by executing the 'aws <service> [command] help' command.

    Returns:
        CommandHelpResult containing the help text
    """
    logger.info(f"Getting documentation for service: {service}, command: {command or 'None'}")

    try:
        if ctx:
            await ctx.info(f"Fetching help for AWS {service} {command or ''}")

        # Reuse the get_command_help function from cli_executor
        result = await get_command_help(service, command)
        return result
    except Exception as e:
        logger.error(f"Error in aws_cli_help: {e}")
        return CommandHelpResult(help_text=f"Error retrieving help: {str(e)}")


@mcp.tool()
async def aws_cli_pipeline(
    command: str = Field(description="Complete AWS CLI command to execute (can include pipes with Unix commands)"),
    timeout: int | None = Field(description="Timeout in seconds (defaults to AWS_MCP_TIMEOUT)", default=None),
    ctx: Context | None = None,
) -> CommandResult:
    """Execute an AWS CLI command, optionally with Unix command pipes.

    Validates, executes, and processes the results of an AWS CLI command,
    handling errors and formatting the output for better readability.

    The command can include Unix pipes (|) to filter or transform the output,
    similar to a regular shell. The first command must be an AWS CLI command,
    and subsequent piped commands must be basic Unix utilities.

    Supported Unix commands in pipes:
    - File operations: ls, cat, cd, pwd, cp, mv, rm, mkdir, touch, chmod, chown
    - Text processing: grep, sed, awk, cut, sort, uniq, wc, head, tail, tr, find
    - System tools: ps, top, df, du, uname, whoami, date, which, echo
    - Network tools: ping, ifconfig, netstat, curl, wget, dig, nslookup, ssh, scp
    - Other utilities: man, less, tar, gzip, zip, xargs, jq, tee

    Examples:
    - aws s3api list-buckets --query 'Buckets[*].Name' --output text
    - aws s3api list-buckets --query 'Buckets[*].Name' --output text | sort
    - aws ec2 describe-instances | grep InstanceId | wc -l

    Returns:
        CommandResult containing output and status
    """
    logger.info(f"Executing command: {command}" + (f" with timeout: {timeout}" if timeout else ""))

    if ctx:
        is_pipe = "|" in command
        message = "Executing" + (" piped" if is_pipe else "") + " AWS CLI command"
        await ctx.info(message + (f" with timeout: {timeout}s" if timeout else ""))

    try:
        result = await execute_aws_command(command, timeout)

        # Format the output for better readability
        if result["status"] == "success":
            if ctx:
                await ctx.info("Command executed successfully")
        else:
            if ctx:
                await ctx.warning("Command failed")

        return CommandResult(status=result["status"], output=result["output"])
    except CommandValidationError as e:
        logger.warning(f"Command validation error: {e}")
        return CommandResult(status="error", output=f"Command validation error: {str(e)}")
    except CommandExecutionError as e:
        logger.warning(f"Command execution error: {e}")
        return CommandResult(status="error", output=f"Command execution error: {str(e)}")
    except Exception as e:
        logger.error(f"Error in aws_cli_pipeline: {e}")
        return CommandResult(status="error", output=f"Unexpected error: {str(e)}")

```
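
Because `@mcp.tool()` leaves the decorated function callable, both tools can also be awaited directly, which is how the test suite drives them; a minimal sketch (assuming AWS CLI and credentials are present, since importing the server runs the startup checks):

```python
import asyncio

from aws_mcp_server.server import aws_cli_help, aws_cli_pipeline


async def main() -> None:
    # Fetch documentation for "aws s3 ls"
    help_result = await aws_cli_help(service="s3", command="ls", ctx=None)
    print(help_result["help_text"][:200])

    # Run a piped command with an explicit 60-second timeout
    result = await aws_cli_pipeline(command="aws s3 ls | sort", timeout=60, ctx=None)
    print(result["status"], result["output"])


asyncio.run(main())
```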

--------------------------------------------------------------------------------
/tests/integration/test_security_integration.py:
--------------------------------------------------------------------------------

```python
"""Integration tests for security rules in AWS MCP Server.

These tests verify that security rules properly prevent dangerous commands
while allowing safe operations.
"""

import pytest

from aws_mcp_server.server import aws_cli_pipeline


class TestSecurityIntegration:
    """Integration tests for security system.

    These tests validate that:
    1. Safe operations are allowed
    2. Dangerous operations are blocked
    3. Pipe commands are properly validated
    """

    @pytest.mark.asyncio
    @pytest.mark.integration
    @pytest.mark.parametrize(
        "command,should_succeed,expected_message",
        [
            # Safe operations that should succeed
            ("aws s3 ls", True, None),
            ("aws ec2 describe-instances", True, None),
            ("aws iam list-users", True, None),
            # Dangerous IAM operations that should be blocked
            (
                "aws iam create-user --user-name test-user-12345",
                False,
                "restricted for security reasons",
            ),
            (
                "aws iam create-access-key --user-name admin",
                False,
                "restricted for security reasons",
            ),
            # Dangerous CloudTrail operations (good for testing as they're security-related but not destructive)
            (
                "aws cloudtrail delete-trail --name test-trail",
                False,
                "restricted for security reasons",
            ),
            # Complex regex pattern tests
            (
                "aws iam create-user --user-name admin-user12345",
                False,
                "Creating users with sensitive names",
            ),
            (
                "aws ec2 authorize-security-group-ingress --group-id sg-12345 --protocol tcp --port 22 --cidr 0.0.0.0/0",
                False,
                "restricted for security reasons",
            ),
            # Commands with safe overrides
            (
                "aws iam create-user --help",
                True,
                None,
            ),
            (
                "aws ec2 describe-security-groups",
                True,
                None,
            ),
        ],
    )
    async def test_security_rules(self, ensure_aws_credentials, command, should_succeed, expected_message):
        """Test that security rules block dangerous commands and allow safe operations.

        This test verifies each command against security rules without actually executing them
        against AWS services.
        """
        # Execute the command
        result = await aws_cli_pipeline(command=command, timeout=None, ctx=None)

        if should_succeed:
            if result["status"] != "success":
                # If command would succeed but API returns error (e.g., invalid resource),
                # we still want to verify it wasn't blocked by security rules
                assert "restricted for security reasons" not in result["output"], f"Command should pass security validation but was blocked: {result['output']}"
                assert "Command validation error" not in result["output"], f"Command should pass security validation but failed validation: {result['output']}"
            else:
                assert result["status"] == "success", f"Command should succeed but failed: {result['output']}"
        else:
            assert result["status"] == "error", f"Command should fail but succeeded: {result['output']}"
            assert expected_message in result["output"], f"Expected error message '{expected_message}' not found in: {result['output']}"

    @pytest.mark.asyncio
    @pytest.mark.integration
    @pytest.mark.parametrize(
        "command,should_succeed,expected_message",
        [
            # Safe pipe commands
            (
                "aws ec2 describe-regions --output text | grep us-east",
                True,
                None,
            ),
            (
                "aws s3 ls | grep bucket | wc -l",
                True,
                None,
            ),
            # Dangerous first command
            (
                "aws iam create-user --user-name test-user-12345 | grep test",
                False,
                "restricted for security reasons",
            ),
            # Unsafe pipe command
            (
                "aws s3 ls | sudo",  # sudo shouldn't be allowed in the allowed unix command list
                False,
                "not allowed",
            ),
            # Complex pipe chain
            (
                "aws ec2 describe-regions --output json | grep RegionName | head -5 | sort",
                True,
                None,
            ),
        ],
    )
    async def test_piped_command_security(self, ensure_aws_credentials, command, should_succeed, expected_message):
        """Test that security rules properly validate piped commands."""
        result = await aws_cli_pipeline(command=command, timeout=None, ctx=None)

        if should_succeed:
            if result["status"] != "success":
                # If command should be allowed but failed for other reasons,
                # verify it wasn't blocked by security rules
                assert "restricted for security reasons" not in result["output"], f"Command should pass security validation but was blocked: {result['output']}"
                assert "not allowed" not in result["output"], f"Command should pass security validation but was blocked: {result['output']}"
        else:
            assert result["status"] == "error", f"Command should fail but succeeded: {result['output']}"
            assert expected_message in result["output"], f"Expected error message '{expected_message}' not found in: {result['output']}"

```

--------------------------------------------------------------------------------
/tests/conftest.py:
--------------------------------------------------------------------------------

```python
"""Configuration for pytest."""

import os

import pytest


def pytest_addoption(parser):
    """Add command-line options to pytest."""
    parser.addoption(
        "--run-integration",
        action="store_true",
        default=False,
        help="Run integration tests that require AWS CLI and AWS account",
    )


def pytest_configure(config):
    """Register custom markers."""
    config.addinivalue_line("markers", "integration: mark test as requiring AWS CLI and AWS account")


def pytest_collection_modifyitems(config, items):
    """Skip integration tests unless --run-integration is specified."""
    print(f"Run integration flag: {config.getoption('--run-integration')}")

    if config.getoption("--run-integration"):
        # Run all tests
        print("Integration tests will be run")
        return

    skip_integration = pytest.mark.skip(reason="Integration tests need --run-integration option")
    print(f"Will check {len(items)} items for integration markers")

    for item in items:
        print(f"Test: {item.name}, keywords: {list(item.keywords)}")
        if "integration" in item.keywords:
            print(f"Skipping integration test: {item.name}")
            item.add_marker(skip_integration)


@pytest.fixture(scope="function")
async def aws_s3_bucket(ensure_aws_credentials):
    """Create or use an S3 bucket for integration tests.

    Uses AWS_TEST_BUCKET if specified, otherwise creates a temporary bucket
    and cleans it up after tests complete.
    """
    import asyncio
    import time
    import uuid

    from aws_mcp_server.server import aws_cli_pipeline

    print("AWS S3 bucket fixture called")

    # Use specified bucket or create a dynamically named one
    bucket_name = os.environ.get("AWS_TEST_BUCKET")
    bucket_created = False

    # Get region from environment or use configured default
    region = os.environ.get("AWS_TEST_REGION", os.environ.get("AWS_REGION", "us-east-1"))
    print(f"Using AWS region: {region}")

    print(f"Using bucket name: {bucket_name or 'Will create dynamic bucket'}")

    if not bucket_name:
        # Generate a unique bucket name with timestamp and random id
        timestamp = int(time.time())
        random_id = str(uuid.uuid4())[:8]
        bucket_name = f"aws-mcp-test-{timestamp}-{random_id}"
        print(f"Generated bucket name: {bucket_name}")

        # Create the bucket with region specified
        create_cmd = f"aws s3 mb s3://{bucket_name} --region {region}"
        print(f"Creating bucket with command: {create_cmd}")
        result = await aws_cli_pipeline(command=create_cmd, timeout=None, ctx=None)
        if result["status"] != "success":
            print(f"Failed to create bucket: {result['output']}")
            pytest.skip(f"Failed to create test bucket: {result['output']}")
        bucket_created = True
        print("Bucket created successfully")
        # Wait a moment for bucket to be fully available
        await asyncio.sleep(3)

    # Yield the bucket name for tests to use
    print(f"Yielding bucket name: {bucket_name}")
    yield bucket_name

    # Clean up the bucket if we created it
    if bucket_created:
        print(f"Cleaning up bucket: {bucket_name}")
        try:
            # First remove all objects
            print("Removing objects from bucket")
            await aws_cli_pipeline(command=f"aws s3 rm s3://{bucket_name} --recursive --region {region}", timeout=None, ctx=None)
            # Then delete the bucket
            print("Deleting bucket")
            await aws_cli_pipeline(command=f"aws s3 rb s3://{bucket_name} --region {region}", timeout=None, ctx=None)
            print("Bucket cleanup complete")
        except Exception as e:
            print(f"Warning: Error cleaning up test bucket: {e}")


@pytest.fixture
def ensure_aws_credentials():
    """Ensure AWS credentials are configured and AWS CLI is installed."""
    import subprocess

    print("Checking AWS credentials and CLI")

    # Check for AWS CLI installation
    try:
        result = subprocess.run(["aws", "--version"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=False)
        print(f"AWS CLI check: {result.returncode == 0}")
        if result.returncode != 0:
            print(f"AWS CLI not found: {result.stderr.decode('utf-8')}")
            pytest.skip("AWS CLI not installed or not in PATH")
    except (subprocess.SubprocessError, FileNotFoundError) as e:
        print(f"AWS CLI check error: {str(e)}")
        pytest.skip("AWS CLI not installed or not in PATH")

    # Check for AWS credentials - simplified check
    home_dir = os.path.expanduser("~")
    creds_file = os.path.join(home_dir, ".aws", "credentials")
    config_file = os.path.join(home_dir, ".aws", "config")

    has_creds = os.path.exists(creds_file)
    has_config = os.path.exists(config_file)
    print(f"AWS files: credentials={has_creds}, config={has_config}")
    # Don't skip based on file presence - let the get-caller-identity check decide

    # Verify AWS credentials work by making a simple call
    try:
        result = subprocess.run(["aws", "sts", "get-caller-identity"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=5, check=False)
        print(f"AWS auth check: {result.returncode == 0}")
        if result.returncode != 0:
            error_msg = result.stderr.decode("utf-8")
            print(f"AWS auth failed: {error_msg}")
            pytest.skip(f"AWS credentials not valid: {error_msg}")
        else:
            print(f"AWS identity: {result.stdout.decode('utf-8')}")
    except subprocess.SubprocessError as e:
        print(f"AWS auth check error: {str(e)}")
        pytest.skip("Failed to verify AWS credentials")

    # All checks passed - AWS CLI and credentials are working
    print("AWS credentials verification successful")
    return True

```
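
With this configuration, integration tests are strictly opt-in:

```bash
# Unit tests only -- anything marked "integration" is skipped automatically
pytest

# Also run the tests that need AWS CLI and a real account
pytest --run-integration
```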

--------------------------------------------------------------------------------
/tests/integration/test_server_integration.py:
--------------------------------------------------------------------------------

```python
"""Mocked integration tests for AWS MCP Server functionality.

These tests use mocks rather than actual AWS CLI calls, so they can
run without AWS credentials or AWS CLI installed.
"""

import json
import logging
import os
from unittest.mock import patch

import pytest

from aws_mcp_server.server import aws_cli_help, aws_cli_pipeline, mcp

# Enable debug logging for tests
logging.basicConfig(level=logging.DEBUG)


@pytest.fixture
def mock_aws_environment():
    """Set up mock AWS environment variables for testing."""
    original_env = os.environ.copy()
    os.environ["AWS_PROFILE"] = "test-profile"
    os.environ["AWS_REGION"] = "us-west-2"
    yield
    # Restore original environment
    os.environ.clear()
    os.environ.update(original_env)


@pytest.fixture
def mcp_client():
    """Return a FastMCP client for testing."""
    return mcp


class TestServerIntegration:
    """Integration tests for the AWS MCP Server using mocks.

    These tests use mocks and don't actually call AWS, but they test
    more of the system together than unit tests. They don't require the
    integration marker since they can run without AWS CLI or credentials."""

    @pytest.mark.asyncio
    @pytest.mark.parametrize(
        "service,command,mock_response,expected_content",
        [
            # Basic service help
            ("s3", None, {"help_text": "AWS S3 HELP\nCommands:\ncp\nls\nmv\nrm\nsync"}, ["AWS S3 HELP", "Commands", "ls", "sync"]),
            # Command-specific help
            (
                "ec2",
                "describe-instances",
                {"help_text": "DESCRIPTION\n  Describes the specified instances.\n\nSYNOPSIS\n  describe-instances\n  [--instance-ids <value>]"},
                ["DESCRIPTION", "SYNOPSIS", "instance-ids"],
            ),
            # Help for a different service
            ("lambda", "list-functions", {"help_text": "LAMBDA LIST-FUNCTIONS\nLists your Lambda functions"}, ["LAMBDA", "LIST-FUNCTIONS", "Lists"]),
        ],
    )
    @patch("aws_mcp_server.server.get_command_help")
    async def test_aws_cli_help_integration(self, mock_get_help, mock_aws_environment, service, command, mock_response, expected_content):
        """Test the aws_cli_help functionality with table-driven tests."""
        # Configure the mock response
        mock_get_help.return_value = mock_response

        # Call the aws_cli_help function
        result = await aws_cli_help(service=service, command=command, ctx=None)

        # Verify the results
        assert "help_text" in result
        for content in expected_content:
            assert content in result["help_text"], f"Expected '{content}' in help text"

        # Verify the mock was called correctly
        mock_get_help.assert_called_once_with(service, command)

    @pytest.mark.asyncio
    @pytest.mark.parametrize(
        "command,mock_response,expected_result,timeout",
        [
            # JSON output test
            (
                "aws s3 ls --output json",
                {"status": "success", "output": json.dumps({"Buckets": [{"Name": "test-bucket", "CreationDate": "2023-01-01T00:00:00Z"}]})},
                {"status": "success", "contains": ["Buckets", "test-bucket"]},
                None,
            ),
            # Text output test
            (
                "aws ec2 describe-instances --query 'Reservations[*]' --output text",
                {"status": "success", "output": "i-12345\trunning\tt2.micro"},
                {"status": "success", "contains": ["i-12345", "running"]},
                None,
            ),
            # Test with custom timeout
            ("aws rds describe-db-instances", {"status": "success", "output": "DB instances list"}, {"status": "success", "contains": ["DB instances"]}, 60),
            # Error case
            (
                "aws s3 ls --invalid-flag",
                {"status": "error", "output": "Unknown options: --invalid-flag"},
                {"status": "error", "contains": ["--invalid-flag"]},
                None,
            ),
            # Piped command
            (
                "aws s3api list-buckets --query 'Buckets[*].Name' --output text | sort",
                {"status": "success", "output": "bucket1\nbucket2\nbucket3"},
                {"status": "success", "contains": ["bucket1", "bucket3"]},
                None,
            ),
        ],
    )
    @patch("aws_mcp_server.server.execute_aws_command")
    async def test_aws_cli_pipeline_scenarios(self, mock_execute, mock_aws_environment, command, mock_response, expected_result, timeout):
        """Test aws_cli_pipeline with various scenarios using table-driven tests."""
        # Configure the mock response
        mock_execute.return_value = mock_response

        # Call the aws_cli_pipeline function
        result = await aws_cli_pipeline(command=command, timeout=timeout, ctx=None)

        # Verify status
        assert result["status"] == expected_result["status"]

        # Verify expected content is present
        for content in expected_result["contains"]:
            assert content in result["output"], f"Expected '{content}' in output"

        # Verify the mock was called correctly
        mock_execute.assert_called_once_with(command, timeout)

    @pytest.mark.asyncio
    @patch("aws_mcp_server.resources.get_aws_profiles")
    @patch("aws_mcp_server.resources.get_aws_regions")
    @patch("aws_mcp_server.resources.get_aws_environment")
    @patch("aws_mcp_server.resources.get_aws_account_info")
    async def test_mcp_resources_access(
        self, mock_get_aws_account_info, mock_get_aws_environment, mock_get_aws_regions, mock_get_aws_profiles, mock_aws_environment, mcp_client
    ):
        """Test that MCP resources are properly registered and accessible to clients."""
        # Set up mock return values
        mock_get_aws_profiles.return_value = ["default", "test-profile", "dev"]
        mock_get_aws_regions.return_value = [
            {"RegionName": "us-east-1", "RegionDescription": "US East (N. Virginia)"},
            {"RegionName": "us-west-2", "RegionDescription": "US West (Oregon)"},
        ]
        mock_get_aws_environment.return_value = {
            "aws_profile": "test-profile",
            "aws_region": "us-west-2",
            "has_credentials": True,
            "credentials_source": "profile",
        }
        mock_get_aws_account_info.return_value = {
            "account_id": "123456789012",
            "account_alias": "test-account",
            "organization_id": "o-abcdef123456",
        }

        # Define the expected resource URIs
        expected_resources = ["aws://config/profiles", "aws://config/regions", "aws://config/environment", "aws://config/account"]

        # Test that resources are accessible through MCP client
        resources = await mcp_client.list_resources()

        # Verify all expected resources are present
        resource_uris = [str(r.uri) for r in resources]
        for uri in expected_resources:
            assert uri in resource_uris, f"Resource {uri} not found in resources list"

        # Test accessing each resource by URI
        for uri in expected_resources:
            resource = await mcp_client.read_resource(uri=uri)
            assert resource is not None, f"Failed to read resource {uri}"

            # Resource is a list with one item whose content attribute holds a
            # JSON string that needs to be parsed (json is imported at module level)
            content = json.loads(resource[0].content)

            # Verify specific resource content
            if uri == "aws://config/profiles":
                assert "profiles" in content
                assert len(content["profiles"]) == 3
                assert any(p["name"] == "test-profile" and p["is_current"] for p in content["profiles"])

            elif uri == "aws://config/regions":
                assert "regions" in content
                assert len(content["regions"]) == 2
                assert any(r["name"] == "us-west-2" and r["is_current"] for r in content["regions"])

            elif uri == "aws://config/environment":
                assert content["aws_profile"] == "test-profile"
                assert content["aws_region"] == "us-west-2"
                assert content["has_credentials"] is True

            elif uri == "aws://config/account":
                assert content["account_id"] == "123456789012"
                assert content["account_alias"] == "test-account"

```

--------------------------------------------------------------------------------
/src/aws_mcp_server/tools.py:
--------------------------------------------------------------------------------

```python
"""Command execution utilities for AWS MCP Server.

This module provides utilities for validating and executing commands, including:
- AWS CLI commands
- Basic Unix commands
- Command pipes (piping output from one command to another)
"""

import asyncio
import logging
import shlex
from typing import List, TypedDict

from aws_mcp_server.config import DEFAULT_TIMEOUT, MAX_OUTPUT_SIZE

# Configure module logger
logger = logging.getLogger(__name__)

# List of allowed Unix commands that can be used in a pipe
ALLOWED_UNIX_COMMANDS = [
    # File operations
    "cat",
    "ls",
    "cd",
    "pwd",
    "cp",
    "mv",
    "rm",
    "mkdir",
    "touch",
    "chmod",
    "chown",
    # Text processing
    "grep",
    "sed",
    "awk",
    "cut",
    "sort",
    "uniq",
    "wc",
    "head",
    "tail",
    "tr",
    "find",
    # System information
    "ps",
    "top",
    "df",
    "du",
    "uname",
    "whoami",
    "date",
    "which",
    "echo",
    # Networking
    "ping",
    "ifconfig",
    "netstat",
    "curl",
    "wget",
    "dig",
    "nslookup",
    "ssh",
    "scp",
    # Other utilities
    "man",
    "less",
    "tar",
    "gzip",
    "gunzip",
    "zip",
    "unzip",
    "xargs",
    "jq",
    "tee",
]


class CommandResult(TypedDict):
    """Type definition for command execution results."""

    status: str  # either "success" or "error"
    output: str  # stdout on success; stderr or an error message on failure


def validate_unix_command(command: str) -> bool:
    """Validate that a command is an allowed Unix command.

    Args:
        command: The Unix command to validate

    Returns:
        True if the command is valid, False otherwise
    """
    cmd_parts = shlex.split(command)
    if not cmd_parts:
        return False

    # Check if the command is in the allowed list
    return cmd_parts[0] in ALLOWED_UNIX_COMMANDS


def is_pipe_command(command: str) -> bool:
    """Check if a command contains a pipe operator.

    Args:
        command: The command to check

    Returns:
        True if the command contains a pipe operator, False otherwise
    """
    # Check for pipe operator that's not inside quotes
    in_single_quote = False
    in_double_quote = False
    escaped = False

    for char in command:
        # Handle escape sequences
        if char == "\\" and not escaped:
            escaped = True
            continue

        if not escaped:
            if char == "'" and not in_double_quote:
                in_single_quote = not in_single_quote
            elif char == '"' and not in_single_quote:
                in_double_quote = not in_double_quote
            elif char == "|" and not in_single_quote and not in_double_quote:
                return True

        escaped = False

    return False


def split_pipe_command(pipe_command: str) -> List[str]:
    """Split a piped command into individual commands.

    Args:
        pipe_command: The piped command string

    Returns:
        List of individual command strings
    """
    commands = []
    current_command = ""
    in_single_quote = False
    in_double_quote = False
    escaped = False

    for char in pipe_command:
        # Handle escape sequences
        if char == "\\" and not escaped:
            escaped = True
            current_command += char
            continue

        if not escaped:
            if char == "'" and not in_double_quote:
                in_single_quote = not in_single_quote
                current_command += char
            elif char == '"' and not in_single_quote:
                in_double_quote = not in_double_quote
                current_command += char
            elif char == "|" and not in_single_quote and not in_double_quote:
                commands.append(current_command.strip())
                current_command = ""
            else:
                current_command += char
        else:
            # Add the escaped character
            current_command += char
            escaped = False

    if current_command.strip():
        commands.append(current_command.strip())

    return commands


async def execute_piped_command(pipe_command: str, timeout: int | None = None) -> CommandResult:
    """Execute a command that contains pipes.

    Args:
        pipe_command: The piped command to execute
        timeout: Optional timeout in seconds (defaults to DEFAULT_TIMEOUT)

    Returns:
        CommandResult containing output and status
    """
    # Set timeout
    if timeout is None:
        timeout = DEFAULT_TIMEOUT

    logger.debug(f"Executing piped command: {pipe_command}")

    try:
        # Split the pipe_command into individual commands
        commands = split_pipe_command(pipe_command)
        if not commands:
            return CommandResult(status="error", output="Empty command")

        # For each command, split it into command parts for subprocess_exec
        command_parts_list = [shlex.split(cmd) for cmd in commands]

        # Execute the first command
        first_cmd = command_parts_list[0]
        first_process = await asyncio.create_subprocess_exec(*first_cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)

        current_process = first_process
        current_stdout = None
        current_stderr = None

        # For each additional command in the pipe, execute it with the previous command's output
        for cmd_parts in command_parts_list[1:]:
            try:
                # Collect the previous command's output if it hasn't been gathered yet.
                # communicate() can only be awaited once per process; for pipes with
                # three or more stages the output was already captured on the prior iteration.
                if current_stdout is None:
                    current_stdout, current_stderr = await asyncio.wait_for(current_process.communicate(), timeout)

                if current_process.returncode != 0:
                    # If previous command failed, stop the pipe execution
                    stderr_str = current_stderr.decode("utf-8", errors="replace")
                    logger.warning(f"Piped command failed with return code {current_process.returncode}: {pipe_command}")
                    logger.debug(f"Command error output: {stderr_str}")
                    return CommandResult(status="error", output=stderr_str or "Command failed with no error output")

                # Create the next process with the previous output as input
                next_process = await asyncio.create_subprocess_exec(
                    *cmd_parts, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
                )

                # Pass the output of the previous command to the input of the next command
                stdout, stderr = await asyncio.wait_for(next_process.communicate(input=current_stdout), timeout)

                current_process = next_process
                current_stdout = stdout
                current_stderr = stderr

            except asyncio.TimeoutError:
                logger.warning(f"Piped command timed out after {timeout} seconds: {pipe_command}")
                try:
                    # process.kill() is synchronous, not a coroutine
                    current_process.kill()
                except Exception as e:
                    logger.error(f"Error killing process: {e}")
                return CommandResult(status="error", output=f"Command timed out after {timeout} seconds")

        # Wait for the final command to complete if it hasn't already
        if current_stdout is None:
            try:
                current_stdout, current_stderr = await asyncio.wait_for(current_process.communicate(), timeout)
            except asyncio.TimeoutError:
                logger.warning(f"Piped command timed out after {timeout} seconds: {pipe_command}")
                try:
                    current_process.kill()
                except Exception as e:
                    logger.error(f"Error killing process: {e}")
                return CommandResult(status="error", output=f"Command timed out after {timeout} seconds")

        # Process output
        stdout_str = current_stdout.decode("utf-8", errors="replace")
        stderr_str = current_stderr.decode("utf-8", errors="replace")

        # Truncate output if necessary
        if len(stdout_str) > MAX_OUTPUT_SIZE:
            logger.info(f"Output truncated from {len(stdout_str)} to {MAX_OUTPUT_SIZE} characters")
            stdout_str = stdout_str[:MAX_OUTPUT_SIZE] + "\n... (output truncated)"

        if current_process.returncode != 0:
            logger.warning(f"Piped command failed with return code {current_process.returncode}: {pipe_command}")
            logger.debug(f"Command error output: {stderr_str}")
            return CommandResult(status="error", output=stderr_str or "Command failed with no error output")

        return CommandResult(status="success", output=stdout_str)
    except Exception as e:
        logger.error(f"Failed to execute piped command: {str(e)}")
        return CommandResult(status="error", output=f"Failed to execute command: {str(e)}")

```
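
A minimal usage sketch of the pipe helpers above (assumes the package is importable and the AWS CLI is available; the command is illustrative):

```python
import asyncio

from aws_mcp_server.tools import execute_piped_command, is_pipe_command, split_pipe_command


async def main() -> None:
    command = "aws s3 ls | grep bucket"  # illustrative command
    if is_pipe_command(command):
        # Quote-aware split: pipes inside quotes are not treated as separators
        print(split_pipe_command(command))  # ['aws s3 ls', 'grep bucket']
        result = await execute_piped_command(command, timeout=30)
        print(result["status"], result["output"][:200])


asyncio.run(main())
```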

--------------------------------------------------------------------------------
/tests/unit/test_server.py:
--------------------------------------------------------------------------------

```python
"""Tests for the FastMCP server implementation."""

from unittest.mock import ANY, AsyncMock, patch

import pytest

from aws_mcp_server.cli_executor import CommandExecutionError, CommandValidationError
from aws_mcp_server.server import aws_cli_help, aws_cli_pipeline, mcp, run_startup_checks


def test_run_startup_checks():
    """Test the run_startup_checks function."""
    # Mock both check_aws_cli_installed and asyncio.run to avoid the coroutine
    # warning, so the test doesn't rely on any actual coroutine behavior

    # Test when AWS CLI is installed
    with patch("aws_mcp_server.server.check_aws_cli_installed") as mock_check:
        # Don't use the actual coroutine
        mock_check.return_value = None  # Not used when mocking asyncio.run

        with patch("aws_mcp_server.server.asyncio.run", return_value=True):
            with patch("sys.exit") as mock_exit:
                run_startup_checks()
                mock_exit.assert_not_called()

    # Test when AWS CLI is not installed
    with patch("aws_mcp_server.server.check_aws_cli_installed") as mock_check:
        # Don't use the actual coroutine
        mock_check.return_value = None  # Not used when mocking asyncio.run

        with patch("aws_mcp_server.server.asyncio.run", return_value=False):
            with patch("sys.exit") as mock_exit:
                run_startup_checks()
                mock_exit.assert_called_once_with(1)


@pytest.mark.asyncio
@pytest.mark.parametrize(
    "service,command,expected_result",
    [
        ("s3", None, {"help_text": "Test help text"}),
        ("s3", "ls", {"help_text": "Test help text"}),
        ("ec2", "describe-instances", {"help_text": "Test help text"}),
    ],
)
async def test_aws_cli_help(service, command, expected_result):
    """Test the aws_cli_help tool with various inputs."""
    # Mock the get_command_help function instead of execute_aws_command
    with patch("aws_mcp_server.server.get_command_help", new_callable=AsyncMock) as mock_get_help:
        mock_get_help.return_value = expected_result

        # Call the tool with specified service and command
        result = await aws_cli_help(service=service, command=command)

        # Verify the result
        assert result == expected_result

        # Verify the correct arguments were passed to the mocked function
        mock_get_help.assert_called_with(service, command)


@pytest.mark.asyncio
async def test_aws_cli_help_with_context():
    """Test the aws_cli_help tool with context."""
    mock_ctx = AsyncMock()

    with patch("aws_mcp_server.server.get_command_help", new_callable=AsyncMock) as mock_get_help:
        mock_get_help.return_value = {"help_text": "Test help text"}

        result = await aws_cli_help(service="s3", command="ls", ctx=mock_ctx)

        assert result == {"help_text": "Test help text"}
        mock_ctx.info.assert_called_once()
        assert "Fetching help for AWS s3 ls" in mock_ctx.info.call_args[0][0]


@pytest.mark.asyncio
async def test_aws_cli_help_exception_handling():
    """Test exception handling in aws_cli_help."""
    with patch("aws_mcp_server.server.get_command_help", side_effect=Exception("Test exception")):
        result = await aws_cli_help(service="s3")

        assert "help_text" in result
        assert "Error retrieving help" in result["help_text"]
        assert "Test exception" in result["help_text"]


@pytest.mark.asyncio
@pytest.mark.parametrize(
    "command,timeout,expected_result",
    [
        # Basic success case
        ("aws s3 ls", None, {"status": "success", "output": "Test output"}),
        # Success with custom timeout
        ("aws s3 ls", 60, {"status": "success", "output": "Test output"}),
        # Complex command success
        ("aws ec2 describe-instances --filters Name=instance-state-name,Values=running", None, {"status": "success", "output": "Running instances"}),
    ],
)
async def test_aws_cli_pipeline_success(command, timeout, expected_result):
    """Test the aws_cli_pipeline tool with successful execution."""
    # Need to patch check_aws_cli_installed to avoid the coroutine warning
    with patch("aws_mcp_server.server.check_aws_cli_installed", return_value=None):
        # Mock the execute_aws_command function
        with patch("aws_mcp_server.server.execute_aws_command", new_callable=AsyncMock) as mock_execute:
            mock_execute.return_value = expected_result

            # Call the aws_cli_pipeline function
            result = await aws_cli_pipeline(command=command, timeout=timeout)

            # Verify the result
            assert result["status"] == expected_result["status"]
            assert result["output"] == expected_result["output"]

            # Verify the correct arguments were passed to the mocked function
            mock_execute.assert_called_with(command, timeout if timeout else ANY)


@pytest.mark.asyncio
async def test_aws_cli_pipeline_with_context():
    """Test the aws_cli_pipeline tool with context."""
    mock_ctx = AsyncMock()

    # Need to patch check_aws_cli_installed to avoid the coroutine warning
    with patch("aws_mcp_server.server.check_aws_cli_installed", return_value=None):
        # Test successful command with context
        with patch("aws_mcp_server.server.execute_aws_command", new_callable=AsyncMock) as mock_execute:
            mock_execute.return_value = {"status": "success", "output": "Test output"}

            result = await aws_cli_pipeline(command="aws s3 ls", ctx=mock_ctx)

            assert result["status"] == "success"
            assert result["output"] == "Test output"

            # Verify context was used correctly
            assert mock_ctx.info.call_count == 2
            assert "Executing AWS CLI command" in mock_ctx.info.call_args_list[0][0][0]
            assert "Command executed successfully" in mock_ctx.info.call_args_list[1][0][0]

        # Test failed command with context
        mock_ctx.reset_mock()
        with patch("aws_mcp_server.server.execute_aws_command", new_callable=AsyncMock) as mock_execute:
            mock_execute.return_value = {"status": "error", "output": "Error output"}

            result = await aws_cli_pipeline(command="aws s3 ls", ctx=mock_ctx)

            assert result["status"] == "error"
            assert result["output"] == "Error output"

            # Verify context was used correctly
            assert mock_ctx.info.call_count == 1
            assert mock_ctx.warning.call_count == 1
            assert "Command failed" in mock_ctx.warning.call_args[0][0]


@pytest.mark.asyncio
async def test_aws_cli_pipeline_with_context_and_timeout():
    """Test the aws_cli_pipeline tool with context and timeout."""
    mock_ctx = AsyncMock()

    # Need to patch check_aws_cli_installed to avoid the coroutine warning
    with patch("aws_mcp_server.server.check_aws_cli_installed", return_value=None):
        with patch("aws_mcp_server.server.execute_aws_command", new_callable=AsyncMock) as mock_execute:
            mock_execute.return_value = {"status": "success", "output": "Test output"}

            await aws_cli_pipeline(command="aws s3 ls", timeout=60, ctx=mock_ctx)

            # Verify timeout was mentioned in the context message
            message = mock_ctx.info.call_args_list[0][0][0]
            assert "with timeout: 60s" in message


@pytest.mark.asyncio
@pytest.mark.parametrize(
    "command,exception,expected_error_type,expected_message",
    [
        # Validation error
        ("not aws", CommandValidationError("Invalid command"), "Command validation error", "Invalid command"),
        # Execution error
        ("aws s3 ls", CommandExecutionError("Execution failed"), "Command execution error", "Execution failed"),
        # Timeout error
        ("aws ec2 describe-instances", CommandExecutionError("Command timed out"), "Command execution error", "Command timed out"),
        # Generic/unexpected error
        ("aws dynamodb scan", Exception("Unexpected error"), "Unexpected error", "Unexpected error"),
    ],
)
async def test_aws_cli_pipeline_errors(command, exception, expected_error_type, expected_message):
    """Test the aws_cli_pipeline tool with various error scenarios."""
    # Need to patch check_aws_cli_installed to avoid the coroutine warning
    with patch("aws_mcp_server.server.check_aws_cli_installed", return_value=None):
        # Mock the execute_aws_command function to raise the specified exception
        with patch("aws_mcp_server.server.execute_aws_command", side_effect=exception) as mock_execute:
            # Call the tool
            result = await aws_cli_pipeline(command=command)

            # Verify error status and message
            assert result["status"] == "error"
            assert expected_error_type in result["output"]
            assert expected_message in result["output"]

            # Verify the command was called correctly
            mock_execute.assert_called_with(command, ANY)


@pytest.mark.asyncio
async def test_mcp_server_initialization():
    """Test that the MCP server initializes correctly."""
    # Verify server was created with correct name
    assert mcp.name == "AWS MCP Server"

    # Verify tools are registered by calling them
    # This ensures the tools exist without depending on FastMCP's internal structure
    assert callable(aws_cli_help)
    assert callable(aws_cli_pipeline)

```

--------------------------------------------------------------------------------
/src/aws_mcp_server/cli_executor.py:
--------------------------------------------------------------------------------

```python
"""Utility for executing AWS CLI commands.

This module provides functions to validate and execute AWS CLI commands
with proper error handling, timeouts, and output processing.
"""

import asyncio
import logging
import shlex
from typing import TypedDict

from aws_mcp_server.config import DEFAULT_TIMEOUT, MAX_OUTPUT_SIZE
from aws_mcp_server.security import validate_aws_command, validate_pipe_command
from aws_mcp_server.tools import (
    CommandResult,
    execute_piped_command,
    is_pipe_command,
    split_pipe_command,
)

# Configure module logger
logger = logging.getLogger(__name__)


class CommandHelpResult(TypedDict):
    """Type definition for command help results."""

    help_text: str


class CommandValidationError(Exception):
    """Exception raised when a command fails validation.

    This exception is raised when a command doesn't meet the
    validation requirements, such as starting with 'aws'.
    """

    pass


class CommandExecutionError(Exception):
    """Exception raised when a command fails to execute.

    This exception is raised when there's an error during command
    execution, such as timeouts or subprocess failures.
    """

    pass


def is_auth_error(error_output: str) -> bool:
    """Detect if an error is related to authentication.

    Args:
        error_output: The error output from AWS CLI

    Returns:
        True if the error is related to authentication, False otherwise
    """
    auth_error_patterns = [
        "Unable to locate credentials",
        "ExpiredToken",
        "AccessDenied",
        "AuthFailure",
        "The security token included in the request is invalid",
        "The config profile could not be found",
        "UnrecognizedClientException",
        "InvalidClientTokenId",
        "InvalidAccessKeyId",
        "SignatureDoesNotMatch",
        "Your credential profile is not properly configured",
        "credentials could not be refreshed",
        "NoCredentialProviders",
    ]
    return any(pattern in error_output for pattern in auth_error_patterns)


async def check_aws_cli_installed() -> bool:
    """Check if AWS CLI is installed and accessible.

    Returns:
        True if AWS CLI is installed, False otherwise
    """
    try:
        # Split command safely for exec
        cmd_parts = ["aws", "--version"]

        # Create subprocess using exec (safer than shell=True)
        process = await asyncio.create_subprocess_exec(*cmd_parts, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
        await process.communicate()
        return process.returncode == 0
    except Exception:
        return False


# Command validation functions are now imported from aws_mcp_server.security


async def execute_aws_command(command: str, timeout: int | None = None) -> CommandResult:
    """Execute an AWS CLI command and return the result.

    Validates, executes, and processes the results of an AWS CLI command,
    handling timeouts and output size limits.

    Args:
        command: The AWS CLI command to execute (must start with 'aws')
        timeout: Optional timeout in seconds (defaults to DEFAULT_TIMEOUT)

    Returns:
        CommandResult containing output and status

    Raises:
        CommandValidationError: If the command is invalid
        CommandExecutionError: If the command fails to execute
    """
    # Check if this is a piped command
    if is_pipe_command(command):
        return await execute_pipe_command(command, timeout)

    # Validate the command
    try:
        validate_aws_command(command)
    except ValueError as e:
        raise CommandValidationError(str(e)) from e

    # Set timeout
    if timeout is None:
        timeout = DEFAULT_TIMEOUT

    # Check if the command needs a region and doesn't have one specified
    from aws_mcp_server.config import AWS_REGION

    # Split by spaces and check for EC2 service specifically
    cmd_parts = shlex.split(command)
    is_ec2_command = len(cmd_parts) >= 2 and cmd_parts[0] == "aws" and cmd_parts[1] == "ec2"
    has_region = "--region" in cmd_parts

    # If it's an EC2 command and doesn't have --region
    if is_ec2_command and not has_region:
        # Add the region parameter
        command = f"{command} --region {AWS_REGION}"
        logger.debug(f"Added region to command: {command}")

    logger.debug(f"Executing AWS command: {command}")

    try:
        # Split command safely for exec
        cmd_parts = shlex.split(command)

        # Create subprocess using exec (safer than shell=True)
        process = await asyncio.create_subprocess_exec(*cmd_parts, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)

        # Wait for the process to complete with timeout
        try:
            stdout, stderr = await asyncio.wait_for(process.communicate(), timeout)
            logger.debug(f"Command completed with return code: {process.returncode}")
        except asyncio.TimeoutError as timeout_error:
            logger.warning(f"Command timed out after {timeout} seconds: {command}")
            try:
                # process.kill() is synchronous, not a coroutine
                process.kill()
            except Exception as e:
                logger.error(f"Error killing process: {e}")
            raise CommandExecutionError(f"Command timed out after {timeout} seconds") from timeout_error

        # Process output
        stdout_str = stdout.decode("utf-8", errors="replace")
        stderr_str = stderr.decode("utf-8", errors="replace")

        # Truncate output if necessary
        if len(stdout_str) > MAX_OUTPUT_SIZE:
            logger.info(f"Output truncated from {len(stdout_str)} to {MAX_OUTPUT_SIZE} characters")
            stdout_str = stdout_str[:MAX_OUTPUT_SIZE] + "\n... (output truncated)"

        if process.returncode != 0:
            logger.warning(f"Command failed with return code {process.returncode}: {command}")
            logger.debug(f"Command error output: {stderr_str}")

            if is_auth_error(stderr_str):
                return CommandResult(status="error", output=f"Authentication error: {stderr_str}\nPlease check your AWS credentials.")

            return CommandResult(status="error", output=stderr_str or "Command failed with no error output")

        return CommandResult(status="success", output=stdout_str)
    except asyncio.CancelledError:
        raise
    except Exception as e:
        raise CommandExecutionError(f"Failed to execute command: {str(e)}") from e


async def execute_pipe_command(pipe_command: str, timeout: int | None = None) -> CommandResult:
    """Execute a command that contains pipes.

    Validates and executes a piped command where output is fed into subsequent commands.
    The first command must be an AWS CLI command, and subsequent commands must be
    allowed Unix utilities.

    Args:
        pipe_command: The piped command to execute
        timeout: Optional timeout in seconds (defaults to DEFAULT_TIMEOUT)

    Returns:
        CommandResult containing output and status

    Raises:
        CommandValidationError: If any command in the pipe is invalid
        CommandExecutionError: If the command fails to execute
    """
    # Validate the pipe command
    try:
        validate_pipe_command(pipe_command)
    except ValueError as e:
        raise CommandValidationError(f"Invalid pipe command: {str(e)}") from e
    except CommandValidationError as e:
        raise CommandValidationError(f"Invalid pipe command: {str(e)}") from e

    # Check if the first command in the pipe is an EC2 command and needs a region
    from aws_mcp_server.config import AWS_REGION

    commands = split_pipe_command(pipe_command)
    if commands:
        # Split first command by spaces to check for EC2 service specifically
        first_cmd_parts = shlex.split(commands[0])
        is_ec2_command = len(first_cmd_parts) >= 2 and first_cmd_parts[0] == "aws" and first_cmd_parts[1] == "ec2"
        has_region = "--region" in first_cmd_parts

        if is_ec2_command and not has_region:
            # Add the region parameter to the first command
            commands[0] = f"{commands[0]} --region {AWS_REGION}"
            # Rebuild the pipe command
            pipe_command = " | ".join(commands)
            logger.debug(f"Added region to piped command: {pipe_command}")

    logger.debug(f"Executing piped command: {pipe_command}")

    try:
        # Execute the piped command using our tools module
        return await execute_piped_command(pipe_command, timeout)
    except Exception as e:
        raise CommandExecutionError(f"Failed to execute piped command: {str(e)}") from e


async def get_command_help(service: str, command: str | None = None) -> CommandHelpResult:
    """Get help documentation for an AWS CLI service or command.

    Retrieves the help documentation for a specified AWS service or command
    by executing the appropriate AWS CLI help command.

    Args:
        service: The AWS service (e.g., s3, ec2)
        command: Optional command within the service

    Returns:
        CommandHelpResult containing the help text

    Raises:
        CommandExecutionError: If the help command fails
    """
    # Build the help command
    cmd_parts: list[str] = ["aws", service]
    if command:
        cmd_parts.append(command)
    cmd_parts.append("help")

    cmd_str = " ".join(cmd_parts)

    try:
        logger.debug(f"Getting command help for: {cmd_str}")
        result = await execute_aws_command(cmd_str)

        help_text = result["output"] if result["status"] == "success" else f"Error: {result['output']}"

        return CommandHelpResult(help_text=help_text)
    except CommandValidationError as e:
        logger.warning(f"Command validation error while getting help: {e}")
        return CommandHelpResult(help_text=f"Command validation error: {str(e)}")
    except CommandExecutionError as e:
        logger.warning(f"Command execution error while getting help: {e}")
        return CommandHelpResult(help_text=f"Error retrieving help: {str(e)}")
    except Exception as e:
        logger.error(f"Unexpected error while getting command help: {e}", exc_info=True)
        return CommandHelpResult(help_text=f"Error retrieving help: {str(e)}")

```
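
A short sketch of how the executor above might be driven, including its error surface (assumes configured AWS credentials; the command is illustrative):

```python
import asyncio

from aws_mcp_server.cli_executor import (
    CommandExecutionError,
    CommandValidationError,
    execute_aws_command,
    get_command_help,
)


async def main() -> None:
    try:
        result = await execute_aws_command("aws s3 ls", timeout=60)
        print(result["status"], result["output"][:200])
    except CommandValidationError as e:
        print(f"Rejected by validation: {e}")
    except CommandExecutionError as e:
        print(f"Execution failed: {e}")

    # Help lookups never raise; errors are folded into help_text
    help_result = await get_command_help("s3", "ls")
    print(help_result["help_text"][:200])


asyncio.run(main())
```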

--------------------------------------------------------------------------------
/tests/unit/test_tools.py:
--------------------------------------------------------------------------------

```python
"""Unit tests for the tools module."""

import asyncio
from unittest.mock import AsyncMock, MagicMock, patch

import pytest

from aws_mcp_server.tools import (
    ALLOWED_UNIX_COMMANDS,
    execute_piped_command,
    is_pipe_command,
    split_pipe_command,
    validate_unix_command,
)


def test_allowed_unix_commands():
    """Test that ALLOWED_UNIX_COMMANDS contains expected commands."""
    # Verify that common Unix utilities are in the allowed list
    common_commands = ["grep", "xargs", "cat", "ls", "wc", "sort", "uniq", "jq"]
    for cmd in common_commands:
        assert cmd in ALLOWED_UNIX_COMMANDS


def test_validate_unix_command():
    """Test the validate_unix_command function."""
    # Test valid commands
    for cmd in ["grep pattern", "ls -la", "wc -l", "cat file.txt"]:
        assert validate_unix_command(cmd), f"Command should be valid: {cmd}"

    # Test invalid commands
    for cmd in ["invalid_cmd", "sudo ls", ""]:
        assert not validate_unix_command(cmd), f"Command should be invalid: {cmd}"


def test_is_pipe_command():
    """Test the is_pipe_command function."""
    # Test commands with pipes
    assert is_pipe_command("aws s3 ls | grep bucket")
    assert is_pipe_command("aws s3api list-buckets | jq '.Buckets[].Name' | sort")

    # Test commands without pipes
    assert not is_pipe_command("aws s3 ls")
    assert not is_pipe_command("aws ec2 describe-instances")

    # Test commands with pipes in quotes (should not be detected as pipe commands)
    assert not is_pipe_command("aws s3 ls 's3://my-bucket/file|other'")
    assert not is_pipe_command('aws ec2 run-instances --user-data "echo hello | grep world"')

    # Test commands with escaped quotes - these should not confuse the parser
    assert is_pipe_command('aws s3 ls --query "Name=\\"value\\"" | grep bucket')
    assert not is_pipe_command('aws s3 ls "s3://my-bucket/file\\"|other"')


def test_split_pipe_command():
    """Test the split_pipe_command function."""
    # Test simple pipe command
    cmd = "aws s3 ls | grep bucket"
    result = split_pipe_command(cmd)
    assert result == ["aws s3 ls", "grep bucket"]

    # Test multi-pipe command
    cmd = "aws s3api list-buckets | jq '.Buckets[].Name' | sort"
    result = split_pipe_command(cmd)
    assert result == ["aws s3api list-buckets", "jq '.Buckets[].Name'", "sort"]

    # Test with quoted pipe symbols (should not split inside quotes)
    cmd = "aws s3 ls 's3://bucket/file|name' | grep 'pattern|other'"
    result = split_pipe_command(cmd)
    assert result == ["aws s3 ls 's3://bucket/file|name'", "grep 'pattern|other'"]

    # Test with double quotes
    cmd = 'aws s3 ls "s3://bucket/file|name" | grep "pattern|other"'
    result = split_pipe_command(cmd)
    assert result == ['aws s3 ls "s3://bucket/file|name"', 'grep "pattern|other"']

    # Test with escaped quotes
    cmd = 'aws s3 ls --query "Name=\\"value\\"" | grep bucket'
    result = split_pipe_command(cmd)
    assert result == ['aws s3 ls --query "Name=\\"value\\""', "grep bucket"]

    # Test with escaped pipe symbol in quotes
    cmd = 'aws s3 ls "s3://bucket/file\\"|name" | grep pattern'
    result = split_pipe_command(cmd)
    assert result == ['aws s3 ls "s3://bucket/file\\"|name"', "grep pattern"]


@pytest.mark.asyncio
async def test_execute_piped_command_success():
    """Test successful execution of a piped command."""
    with patch("asyncio.create_subprocess_exec", new_callable=AsyncMock) as mock_subprocess:
        # Mock the first process in the pipe
        first_process_mock = AsyncMock()
        first_process_mock.returncode = 0
        first_process_mock.communicate.return_value = (b"S3 output", b"")

        # Mock the second process in the pipe
        second_process_mock = AsyncMock()
        second_process_mock.returncode = 0
        second_process_mock.communicate.return_value = (b"Filtered output", b"")

        # Set up the mock to return different values on subsequent calls
        mock_subprocess.side_effect = [first_process_mock, second_process_mock]

        result = await execute_piped_command("aws s3 ls | grep bucket")

        assert result["status"] == "success"
        assert result["output"] == "Filtered output"

        # Verify first command was called with correct args
        mock_subprocess.assert_any_call("aws", "s3", "ls", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)

        # Verify second command was called with correct args
        mock_subprocess.assert_any_call("grep", "bucket", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)


@pytest.mark.asyncio
async def test_execute_piped_command_error_first_command():
    """Test error handling in execute_piped_command when first command fails."""
    with patch("asyncio.create_subprocess_exec", new_callable=AsyncMock) as mock_subprocess:
        # Mock a failed first process
        process_mock = AsyncMock()
        process_mock.returncode = 1
        process_mock.communicate.return_value = (b"", b"Command failed: aws")
        mock_subprocess.return_value = process_mock

        result = await execute_piped_command("aws s3 ls | grep bucket")

        assert result["status"] == "error"
        assert "Command failed: aws" in result["output"]


@pytest.mark.asyncio
async def test_execute_piped_command_error_second_command():
    """Test error handling in execute_piped_command when second command fails."""
    with patch("asyncio.create_subprocess_exec", new_callable=AsyncMock) as mock_subprocess:
        # Mock the first process in the pipe (success)
        first_process_mock = AsyncMock()
        first_process_mock.returncode = 0
        first_process_mock.communicate.return_value = (b"S3 output", b"")

        # Mock the second process in the pipe (failure)
        second_process_mock = AsyncMock()
        second_process_mock.returncode = 1
        second_process_mock.communicate.return_value = (b"", b"Command not found: xyz")

        # Set up the mock to return different values on subsequent calls
        mock_subprocess.side_effect = [first_process_mock, second_process_mock]

        result = await execute_piped_command("aws s3 ls | xyz")

        assert result["status"] == "error"
        assert "Command not found: xyz" in result["output"]


@pytest.mark.asyncio
async def test_execute_piped_command_timeout():
    """Test timeout handling in execute_piped_command."""
    with patch("asyncio.create_subprocess_exec", new_callable=AsyncMock) as mock_subprocess:
        # Mock a process that times out
        process_mock = AsyncMock()
        # Use a properly awaitable mock that raises TimeoutError
        communicate_mock = AsyncMock(side_effect=asyncio.TimeoutError())
        process_mock.communicate = communicate_mock
        # Use regular MagicMock since kill() is not an async method
        process_mock.kill = MagicMock()
        mock_subprocess.return_value = process_mock

        result = await execute_piped_command("aws s3 ls | grep bucket", timeout=1)

        assert result["status"] == "error"
        assert "Command timed out after 1 seconds" in result["output"]
        process_mock.kill.assert_called_once()


@pytest.mark.asyncio
async def test_execute_piped_command_exception():
    """Test general exception handling in execute_piped_command."""
    with patch("asyncio.create_subprocess_exec", side_effect=Exception("Test exception")):
        result = await execute_piped_command("aws s3 ls | grep bucket")

        assert result["status"] == "error"
        assert "Failed to execute command" in result["output"]
        assert "Test exception" in result["output"]


@pytest.mark.asyncio
async def test_execute_piped_command_empty_command():
    """Test handling of empty commands."""
    result = await execute_piped_command("")

    assert result["status"] == "error"
    assert "Empty command" in result["output"]


@pytest.mark.asyncio
async def test_execute_piped_command_timeout_during_final_wait():
    """Test timeout handling during wait for the final command in a pipe."""
    # This test directly exercises the branch where a timeout occurs while awaiting the final command
    with patch("asyncio.wait_for", side_effect=asyncio.TimeoutError()):
        with patch("aws_mcp_server.tools.split_pipe_command") as mock_split:
            mock_split.return_value = ["aws s3 ls", "grep bucket"]

            # We don't need to mock the subprocess - it won't reach that point
            # because wait_for will raise a TimeoutError first
            result = await execute_piped_command("aws s3 ls | grep bucket", timeout=5)

            assert result["status"] == "error"
            assert "Command timed out after 5 seconds" in result["output"]


@pytest.mark.asyncio
async def test_execute_piped_command_kill_error_during_timeout():
    """Test error handling when killing a process after timeout fails."""
    with patch("asyncio.create_subprocess_exec", new_callable=AsyncMock) as mock_subprocess:
        # Mock a process that times out
        process_mock = AsyncMock()
        process_mock.communicate.side_effect = asyncio.TimeoutError()
        process_mock.kill = MagicMock(side_effect=Exception("Failed to kill process"))
        mock_subprocess.return_value = process_mock

        result = await execute_piped_command("aws s3 ls", timeout=1)

        assert result["status"] == "error"
        assert "Command timed out after 1 seconds" in result["output"]
        process_mock.kill.assert_called_once()


@pytest.mark.asyncio
async def test_execute_piped_command_large_output():
    """Test output truncation in execute_piped_command."""
    from aws_mcp_server.config import MAX_OUTPUT_SIZE

    with patch("asyncio.create_subprocess_exec", new_callable=AsyncMock) as mock_subprocess:
        # Mock a process with large output
        process_mock = AsyncMock()
        process_mock.returncode = 0

        # Generate output larger than MAX_OUTPUT_SIZE
        large_output = "x" * (MAX_OUTPUT_SIZE + 1000)
        process_mock.communicate.return_value = (large_output.encode("utf-8"), b"")
        mock_subprocess.return_value = process_mock

        result = await execute_piped_command("aws s3 ls")

        assert result["status"] == "success"
        assert len(result["output"]) <= MAX_OUTPUT_SIZE + 100  # Allow for truncation message
        assert "output truncated" in result["output"]

```

--------------------------------------------------------------------------------
/tests/integration/test_aws_live.py:
--------------------------------------------------------------------------------

```python
"""Live AWS integration tests for the AWS MCP Server.

These tests connect to real AWS resources and require:
1. AWS CLI installed locally
2. AWS credentials configured with access to test resources
3. The --run-integration flag when running pytest

Note: The tests that require an S3 bucket will create a temporary bucket
if AWS_TEST_BUCKET environment variable is not set.
"""

import asyncio
import json
import logging
import os
import time
import uuid

import pytest

from aws_mcp_server.server import aws_cli_help, aws_cli_pipeline

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class TestAWSLiveIntegration:
    """Integration tests that interact with real AWS services.

    These tests require AWS credentials and actual AWS resources.
    They verify the AWS MCP Server can properly interact with AWS.
    """

    # Apply the integration marker to each test method instead of the class

    @pytest.mark.asyncio
    @pytest.mark.integration
    @pytest.mark.parametrize(
        "service,command,expected_content",
        [
            ("s3", None, ["description", "ls", "cp", "mv"]),
            ("ec2", None, ["description", "run-instances", "describe-instances"]),
            # The AWS CLI outputs help with control characters that complicate exact matching
            # We need to use content that will be in the help text even with the escape characters
            ("s3", "ls", ["list s3 objects", "options", "examples"]),
        ],
    )
    async def test_aws_cli_help(self, ensure_aws_credentials, service, command, expected_content):
        """Test getting help for various AWS commands."""
        result = await aws_cli_help(service=service, command=command, ctx=None)

        # Verify we got a valid response
        assert isinstance(result, dict)
        assert "help_text" in result

        # Check for expected content in the help text (case-insensitive)
        help_text = result["help_text"].lower()
        for content in expected_content:
            assert content.lower() in help_text, f"Expected '{content}' in {service} {command} help text"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_list_s3_buckets(self, ensure_aws_credentials):
        """Test listing S3 buckets."""
        result = await aws_cli_pipeline(command="aws s3 ls", timeout=None, ctx=None)

        # Verify the result format
        assert isinstance(result, dict)
        assert "status" in result
        assert "output" in result
        assert result["status"] == "success"

        # Output should be a string containing the bucket listing (or empty if no buckets)
        assert isinstance(result["output"], str)

        logger.info(f"S3 bucket list result: {result['output']}")

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_s3_operations_with_test_bucket(self, ensure_aws_credentials):
        """Test S3 operations using a test bucket.

        This test:
        1. Creates a temporary bucket
        2. Creates a test file
        3. Uploads it to S3
        4. Lists the bucket contents
        5. Downloads the file with a different name
        6. Verifies the downloaded content
        7. Cleans up all test files and the bucket
        """
        # Get region from environment or use default
        region = os.environ.get("AWS_TEST_REGION", os.environ.get("AWS_REGION", "us-east-1"))
        print(f"Using AWS region: {region}")

        # Generate a unique bucket name
        timestamp = int(time.time())
        random_id = str(uuid.uuid4())[:8]
        bucket_name = f"aws-mcp-test-{timestamp}-{random_id}"

        test_file_name = "test_file.txt"
        test_file_content = "This is a test file for AWS MCP Server integration tests"
        downloaded_file_name = "test_file_downloaded.txt"

        try:
            # Create the bucket
            create_cmd = f"aws s3 mb s3://{bucket_name} --region {region}"
            result = await aws_cli_pipeline(command=create_cmd, timeout=None, ctx=None)
            assert result["status"] == "success", f"Failed to create bucket: {result['output']}"

            # Wait for bucket to be fully available
            await asyncio.sleep(3)

            # Create a local test file
            with open(test_file_name, "w") as f:
                f.write(test_file_content)

            # Upload the file to S3
            upload_result = await aws_cli_pipeline(
                command=f"aws s3 cp {test_file_name} s3://{bucket_name}/{test_file_name} --region {region}", timeout=None, ctx=None
            )
            assert upload_result["status"] == "success"

            # List the bucket contents
            list_result = await aws_cli_pipeline(command=f"aws s3 ls s3://{bucket_name}/ --region {region}", timeout=None, ctx=None)
            assert list_result["status"] == "success"
            assert test_file_name in list_result["output"]

            # Download the file with a different name
            download_result = await aws_cli_pipeline(
                command=f"aws s3 cp s3://{bucket_name}/{test_file_name} {downloaded_file_name} --region {region}", timeout=None, ctx=None
            )
            assert download_result["status"] == "success"

            # Verify the downloaded file content
            with open(downloaded_file_name, "r") as f:
                downloaded_content = f.read()
            assert downloaded_content == test_file_content

        finally:
            # Clean up local files
            for file_name in [test_file_name, downloaded_file_name]:
                if os.path.exists(file_name):
                    os.remove(file_name)

            # Clean up: Remove files from S3
            await aws_cli_pipeline(command=f"aws s3 rm s3://{bucket_name} --recursive --region {region}", timeout=None, ctx=None)

            # Delete the bucket
            await aws_cli_pipeline(command=f"aws s3 rb s3://{bucket_name} --region {region}", timeout=None, ctx=None)

    @pytest.mark.asyncio
    @pytest.mark.integration
    @pytest.mark.parametrize(
        "command,expected_attributes,description",
        [
            # Test JSON formatting with EC2 regions
            ("aws ec2 describe-regions --output json", {"json_key": "Regions", "expected_type": list}, "JSON output with EC2 regions"),
            # Test JSON formatting with S3 buckets (may be empty but should be valid JSON)
            ("aws s3api list-buckets --output json", {"json_key": "Buckets", "expected_type": list}, "JSON output with S3 buckets"),
        ],
    )
    async def test_aws_json_output_formatting(self, ensure_aws_credentials, command, expected_attributes, description):
        """Test JSON output formatting from various AWS commands."""
        result = await aws_cli_pipeline(command=command, timeout=None, ctx=None)

        assert result["status"] == "success", f"Command failed: {result.get('output', '')}"

        # The output should be valid JSON
        try:
            json_data = json.loads(result["output"])

            # Verify expected JSON structure
            json_key = expected_attributes["json_key"]
            expected_type = expected_attributes["expected_type"]

            assert json_key in json_data, f"Expected key '{json_key}' not found in JSON response"
            assert isinstance(json_data[json_key], expected_type), f"Expected {json_key} to be of type {expected_type.__name__}"

            # Log some info about the response
            logger.info(f"Successfully parsed JSON response for {description} with {len(json_data[json_key])} items")

        except json.JSONDecodeError:
            pytest.fail(f"Output is not valid JSON: {result['output'][:100]}...")

    @pytest.mark.asyncio
    @pytest.mark.integration
    @pytest.mark.parametrize(
        "command,validation_func,description",
        [
            # Test simple pipe with count
            ("aws ec2 describe-regions --query 'Regions[*].RegionName' --output text | wc -l", lambda output: int(output.strip()) > 0, "Count of AWS regions"),
            # Test pipe with grep and sort
            (
                "aws ec2 describe-regions --query 'Regions[*].RegionName' --output text | grep east | sort",
                lambda output: all("east" in r.lower() for r in output.strip().split("\n") if r),
                "Filtered and sorted east regions",
            ),
            # Test more complex pipe with multiple operations
            (
                "aws ec2 describe-regions --output json | grep RegionName | head -3 | wc -l",
                lambda output: int(output.strip()) <= 3,
                "Limited region output with multiple pipes",
            ),
            # Test pipe with JSON grep
            (
                "aws iam list-roles --output json | grep RoleName",
                lambda output: "RoleName" in output or output.strip() == "",
                "Lists IAM roles or returns empty if none exist",
            ),
            # Very simple pipe command that should work anywhere
            (
                "aws --version | grep aws",
                lambda output: "aws" in output.lower(),  # Just check for the word "aws" in output
                "AWS version with grep",
            ),
        ],
    )
    async def test_piped_commands(self, ensure_aws_credentials, command, validation_func, description):
        """Test execution of various piped commands with AWS CLI and Unix utilities."""
        result = await aws_cli_pipeline(command=command, timeout=None, ctx=None)

        assert result["status"] == "success", f"Command failed: {result.get('output', '')}"

        # Validate the output using the provided validation function
        assert validation_func(result["output"]), f"Output validation failed for {description}"

        # Log success
        logger.info(f"Successfully executed piped command for {description}: {result['output'][:50]}...")

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_aws_account_resource(self, ensure_aws_credentials):
        """Test that the AWS account resource returns non-null account information."""
        # Import resources module
        from aws_mcp_server.resources import get_aws_account_info

        # Get account info directly using the function
        account_info = get_aws_account_info()

        # Verify account info is not empty
        assert account_info is not None, "AWS account info is None"

        # Verify the account_id field is not null
        # We don't check specific values, just that they are not null when credentials are present
        assert account_info["account_id"] is not None, "AWS account_id is null"

        # Log success with masked account ID for verification (show first 4 chars)
        account_id = account_info["account_id"]
        masked_id = f"{account_id[:4]}{'*' * (len(account_id) - 4)}" if account_id else "None"
        logger.info(f"Successfully accessed AWS account info with account_id: {masked_id}")

        # Log organization_id status - this might be null depending on permissions
        has_org_id = account_info["organization_id"] is not None
        logger.info(f"Organization ID available: {has_org_id}")

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_us_east_1_region_services(self, ensure_aws_credentials):
        """Test that the us-east-1 region resource returns expected services.

        This test verifies that:
        1. The region details endpoint for us-east-1 works
        2. The core AWS services we expect are listed as available
        3. The service information is correctly formatted
        """
        # Import resources module and server
        from aws_mcp_server.resources import get_region_details
        from aws_mcp_server.server import mcp

        # Get region details directly using the function
        region_code = "us-east-1"
        region_details = get_region_details(region_code)

        # Verify region details is not empty
        assert region_details is not None, "Region details is None"
        assert region_details["code"] == region_code, "Region code does not match expected value"
        assert region_details["name"] == "US East (N. Virginia)", "Region name does not match expected value"

        # Verify services is a list and not empty
        assert "services" in region_details, "Services not found in region details"
        assert isinstance(region_details["services"], list), "Services is not a list"
        assert len(region_details["services"]) > 0, "Services list is empty"

        # Verify each service has id and name fields
        for service in region_details["services"]:
            assert "id" in service, "Service missing 'id' field"
            assert "name" in service, "Service missing 'name' field"

        # Check for core AWS services that should be available in us-east-1
        required_services = ["ec2", "s3", "lambda", "dynamodb", "rds", "cloudformation", "iam"]

        service_ids = [service["id"] for service in region_details["services"]]

        for required_service in required_services:
            assert required_service in service_ids, f"Required service '{required_service}' not found in us-east-1 services"

        # Log the number of services found
        logger.info(f"Found {len(region_details['services'])} services in us-east-1")

        # Test access through the MCP resource URI
        try:
            resource = await mcp.resources_read(uri=f"aws://config/regions/{region_code}")
            assert resource is not None, "Failed to read region resource through MCP"
            assert resource.content["code"] == region_code, "Resource region code does not match"
            assert resource.content["name"] == "US East (N. Virginia)", "Resource region name does not match"
            assert "services" in resource.content, "Services not found in MCP resource content"

            # Verify at least the same core services are present in the resource response
            mcp_service_ids = [service["id"] for service in resource.content["services"]]
            for required_service in required_services:
                assert required_service in mcp_service_ids, f"Required service '{required_service}' not found in MCP resource services"

            logger.info("Successfully accessed us-east-1 region details through MCP resource")
        except Exception as e:
            logger.warning(f"Could not test MCP resource access: {e}")
            # Don't fail the test if this part doesn't work - focus on the direct API test

```

--------------------------------------------------------------------------------
/tests/unit/test_security.py:
--------------------------------------------------------------------------------

```python
"""Unit tests for the security module."""

from unittest.mock import mock_open, patch

import pytest
import yaml

from aws_mcp_server.security import (
    DEFAULT_DANGEROUS_COMMANDS,
    DEFAULT_SAFE_PATTERNS,
    SecurityConfig,
    ValidationRule,
    check_regex_rules,
    is_service_command_safe,
    load_security_config,
    reload_security_config,
    validate_aws_command,
    validate_command,
    validate_pipe_command,
)


def test_is_service_command_safe():
    """Test the is_service_command_safe function."""
    # Test with known safe pattern
    assert is_service_command_safe("aws s3 ls", "s3") is True

    # Test with known dangerous pattern that has safe override
    assert is_service_command_safe("aws s3 ls --profile test", "s3") is True

    # Test with known dangerous pattern with no safe override
    assert is_service_command_safe("aws s3 rb s3://my-bucket", "s3") is False

    # Test with unknown service
    assert is_service_command_safe("aws unknown-service command", "unknown-service") is False


def test_check_regex_rules():
    """Test the check_regex_rules function."""
    # Test with a pattern that should match
    with patch("aws_mcp_server.security.SECURITY_CONFIG") as mock_config:
        mock_config.regex_rules = {
            "general": [
                ValidationRule(
                    pattern=r"aws .* --profile\s+(root|admin|administrator)",
                    description="Prevent use of sensitive profiles",
                    error_message="Using sensitive profiles (root, admin) is restricted",
                    regex=True,
                )
            ]
        }

        # Should match the rule
        error = check_regex_rules("aws s3 ls --profile root")
        assert error is not None
        assert "Using sensitive profiles" in error

        # Should not match
        assert check_regex_rules("aws s3 ls --profile user") is None


@patch("aws_mcp_server.security.SECURITY_MODE", "strict")
def test_validate_aws_command_basic():
    """Test basic validation of AWS commands."""
    # Valid command should not raise
    validate_aws_command("aws s3 ls")

    # Invalid commands should raise ValueError
    with pytest.raises(ValueError, match="Commands must start with 'aws'"):
        validate_aws_command("s3 ls")

    with pytest.raises(ValueError, match="must include an AWS service"):
        validate_aws_command("aws")


@patch("aws_mcp_server.security.SECURITY_MODE", "strict")
def test_validate_aws_command_dangerous():
    """Test validation of dangerous AWS commands."""
    # Use a test config
    with patch("aws_mcp_server.security.SECURITY_CONFIG") as mock_config:
        mock_config.dangerous_commands = {
            "iam": ["aws iam create-user", "aws iam create-access-key"],
            "ec2": ["aws ec2 terminate-instances"],
        }
        mock_config.safe_patterns = {
            "iam": ["aws iam create-user --help"],
            "ec2": [],
        }
        mock_config.regex_rules = {}

        # Dangerous command should raise ValueError
        with pytest.raises(ValueError, match="restricted for security reasons"):
            validate_aws_command("aws iam create-user --user-name test-user")

        # Help on dangerous command should be allowed
        validate_aws_command("aws iam create-user --help")

        # Dangerous command with no safe override should raise
        with pytest.raises(ValueError, match="restricted for security reasons"):
            validate_aws_command("aws ec2 terminate-instances --instance-id i-12345")


@patch("aws_mcp_server.security.SECURITY_MODE", "strict")
def test_validate_aws_command_regex():
    """Test validation of AWS commands with regex rules."""
    # Set up commands for testing
    profile_command = "aws s3 ls --profile root"
    policy_command = """aws s3api put-bucket-policy --bucket my-bucket --policy "{\\"Version\\":\\"2012-10-17\\",\
\\"Statement\\":[{\\"Effect\\":\\"Allow\\",\\"Principal\\":\\"*\\",\\"Action\\":\\"s3:GetObject\\",\
\\"Resource\\":\\"arn:aws:s3:::my-bucket/*\\"}]}" """

    # We need to patch both the check_regex_rules function and the config
    with patch("aws_mcp_server.security.SECURITY_CONFIG") as mock_config:
        mock_config.dangerous_commands = {}
        mock_config.safe_patterns = {}

        # Test for the root profile check
        with patch("aws_mcp_server.security.check_regex_rules") as mock_check:
            mock_check.return_value = "Using sensitive profiles is restricted"

            with pytest.raises(ValueError, match="Using sensitive profiles is restricted"):
                validate_aws_command(profile_command)

            # Verify check_regex_rules was called
            mock_check.assert_called_once()

        # Test for the bucket policy check
        with patch("aws_mcp_server.security.check_regex_rules") as mock_check:
            # Have the mock return error for the policy command
            mock_check.return_value = "Creating public bucket policies is restricted"

            with pytest.raises(ValueError, match="Creating public bucket policies is restricted"):
                validate_aws_command(policy_command)

            # Verify check_regex_rules was called
            mock_check.assert_called_once()


@patch("aws_mcp_server.security.SECURITY_MODE", "permissive")
def test_validate_aws_command_permissive():
    """Test validation of AWS commands in permissive mode."""
    # In permissive mode, dangerous commands should be allowed
    with patch("aws_mcp_server.security.logger.warning") as mock_warning:
        validate_aws_command("aws iam create-user --user-name test-user")
        mock_warning.assert_called_once()


@patch("aws_mcp_server.security.SECURITY_MODE", "strict")
def test_validate_pipe_command():
    """Test validation of piped commands."""
    # Mock the validate_aws_command and validate_unix_command functions
    with patch("aws_mcp_server.security.validate_aws_command") as mock_aws_validate:
        with patch("aws_mcp_server.security.validate_unix_command") as mock_unix_validate:
            # Set up return values
            mock_unix_validate.return_value = True

            # Test valid piped command
            validate_pipe_command("aws s3 ls | grep bucket")
            mock_aws_validate.assert_called_once_with("aws s3 ls")

            # Reset mocks
            mock_aws_validate.reset_mock()
            mock_unix_validate.reset_mock()

            # Test command with unrecognized Unix command
            mock_unix_validate.return_value = False
            with pytest.raises(ValueError, match="not allowed"):
                validate_pipe_command("aws s3 ls | unknown_command")

            # Empty command should raise
            with pytest.raises(ValueError, match="Empty command"):
                validate_pipe_command("")

            # Empty second command: configure split_pipe_command to return a
            # list whose second element is empty
            with patch("aws_mcp_server.security.split_pipe_command") as mock_split_pipe:
                mock_split_pipe.return_value = ["aws s3 ls", ""]
                with pytest.raises(ValueError, match="Empty command at position"):
                    validate_pipe_command("aws s3 ls | ")


@patch("aws_mcp_server.security.SECURITY_MODE", "strict")
def test_validate_command():
    """Test the centralized validate_command function."""
    # Simple AWS command
    validate_command("aws s3 ls")

    # Piped command
    validate_command("aws s3 ls | grep bucket")

    # Invalid command
    with pytest.raises(ValueError):
        validate_command("s3 ls")


def test_load_security_config_default():
    """Test loading security configuration with defaults."""
    with patch("aws_mcp_server.security.SECURITY_CONFIG_PATH", ""):
        config = load_security_config()

        # Should have loaded default values
        assert config.dangerous_commands == DEFAULT_DANGEROUS_COMMANDS
        assert config.safe_patterns == DEFAULT_SAFE_PATTERNS

        # Should have regex rules converted from DEFAULT_REGEX_RULES
        assert "general" in config.regex_rules
        assert len(config.regex_rules["general"]) > 0
        assert isinstance(config.regex_rules["general"][0], ValidationRule)


def test_load_security_config_custom():
    """Test loading security configuration from a custom file."""
    # Mock YAML file contents
    test_config = {
        "dangerous_commands": {"test_service": ["aws test_service dangerous_command"]},
        "safe_patterns": {"test_service": ["aws test_service safe_pattern"]},
        "regex_rules": {"test_service": [{"pattern": "test_pattern", "description": "Test description", "error_message": "Test error message"}]},
    }

    # Mock the open function to return our test config
    with patch("builtins.open", mock_open(read_data=yaml.dump(test_config))):
        with patch("aws_mcp_server.security.SECURITY_CONFIG_PATH", "/fake/path.yaml"):
            with patch("pathlib.Path.exists", return_value=True):
                config = load_security_config()

                # Should have our custom values
                assert "test_service" in config.dangerous_commands
                assert "test_service" in config.safe_patterns
                assert "test_service" in config.regex_rules
                assert config.regex_rules["test_service"][0].pattern == "test_pattern"


def test_load_security_config_error():
    """Test error handling when loading security configuration."""
    with patch("builtins.open", side_effect=Exception("Test error")):
        with patch("aws_mcp_server.security.SECURITY_CONFIG_PATH", "/fake/path.yaml"):
            with patch("pathlib.Path.exists", return_value=True):
                with patch("aws_mcp_server.security.logger.error") as mock_error:
                    with patch("aws_mcp_server.security.logger.warning") as mock_warning:
                        config = load_security_config()

                        # Should log error and warning
                        mock_error.assert_called_once()
                        mock_warning.assert_called_once()

                        # Should still have default values
                        assert config.dangerous_commands == DEFAULT_DANGEROUS_COMMANDS


def test_reload_security_config():
    """Test reloading security configuration."""
    with patch("aws_mcp_server.security.load_security_config") as mock_load:
        mock_load.return_value = SecurityConfig(dangerous_commands={"test": ["test"]}, safe_patterns={"test": ["test"]})

        reload_security_config()

        # Should have called load_security_config
        mock_load.assert_called_once()


# Integration-like tests for specific dangerous commands
@patch("aws_mcp_server.security.SECURITY_MODE", "strict")
def test_specific_dangerous_commands():
    """Test validation of specific dangerous commands."""
    # Configure the SECURITY_CONFIG with some dangerous commands
    with patch("aws_mcp_server.security.SECURITY_CONFIG") as mock_config:
        mock_config.dangerous_commands = {
            "iam": ["aws iam create-user", "aws iam create-access-key", "aws iam attach-user-policy"],
            "ec2": ["aws ec2 terminate-instances"],
            "s3": ["aws s3 rb"],
            "rds": ["aws rds delete-db-instance"],
        }
        mock_config.safe_patterns = {
            "iam": ["aws iam get-", "aws iam list-"],
            "ec2": ["aws ec2 describe-"],
            "s3": ["aws s3 ls"],
            "rds": ["aws rds describe-"],
        }
        mock_config.regex_rules = {}

        # IAM dangerous commands
        with pytest.raises(ValueError, match="restricted for security reasons"):
            validate_aws_command("aws iam create-user --user-name test-user")

        with pytest.raises(ValueError, match="restricted for security reasons"):
            validate_aws_command("aws iam create-access-key --user-name test-user")

        with pytest.raises(ValueError, match="restricted for security reasons"):
            validate_aws_command("aws iam attach-user-policy --user-name test-user --policy-arn arn:aws:iam::aws:policy/AdministratorAccess")

        # EC2 dangerous commands
        with pytest.raises(ValueError, match="restricted for security reasons"):
            validate_aws_command("aws ec2 terminate-instances --instance-ids i-12345")

        # S3 dangerous commands
        with pytest.raises(ValueError, match="restricted for security reasons"):
            validate_aws_command("aws s3 rb s3://my-bucket --force")

        # RDS dangerous commands
        with pytest.raises(ValueError, match="restricted for security reasons"):
            validate_aws_command("aws rds delete-db-instance --db-instance-identifier my-db --skip-final-snapshot")


# Tests for safe patterns overriding dangerous commands
@patch("aws_mcp_server.security.SECURITY_MODE", "strict")
def test_safe_overrides():
    """Test safe patterns that override dangerous commands."""
    # IAM help commands should be allowed even if potentially dangerous
    validate_aws_command("aws iam --help")
    validate_aws_command("aws iam help")
    validate_aws_command("aws iam get-user --user-name test-user")
    validate_aws_command("aws iam list-users")

    # EC2 describe commands should be allowed
    validate_aws_command("aws ec2 describe-instances")

    # S3 list commands should be allowed
    validate_aws_command("aws s3 ls")
    validate_aws_command("aws s3api list-buckets")


# Tests for complex regex patterns
@patch("aws_mcp_server.security.SECURITY_MODE", "strict")
def test_complex_regex_patterns():
    """Test more complex regex patterns."""
    # Instead of testing the regex directly, test the behavior we expect
    dangerous_sg_command = "aws ec2 authorize-security-group-ingress --group-id sg-12345 --protocol tcp --port 22 --cidr 0.0.0.0/0"
    safe_sg_command_80 = "aws ec2 authorize-security-group-ingress --group-id sg-12345 --protocol tcp --port 80 --cidr 0.0.0.0/0"

    # Document the kind of rule being simulated; the actual check is mocked
    # below, so this ValidationRule instance is intentionally unused
    ValidationRule(
        pattern=r"aws ec2 authorize-security-group-ingress.*--cidr\s+0\.0\.0\.0/0.*--port\s+(?!80|443)\d+",
        description="Prevent open security groups for non-web ports",
        error_message="Security group error",
        regex=True,
    )

    # Test with mocked check_regex_rules
    with patch("aws_mcp_server.security.SECURITY_CONFIG") as mock_config:
        mock_config.dangerous_commands = {}
        mock_config.safe_patterns = {}

        with patch("aws_mcp_server.security.check_regex_rules") as mock_check:
            # Set up mock to return error for the dangerous command
            mock_check.side_effect = lambda cmd, svc=None: "Security group error" if "--port 22" in cmd else None

            # Test dangerous command raises error
            with pytest.raises(ValueError, match="Security group error"):
                validate_aws_command(dangerous_sg_command)

            # Test safe command doesn't raise
            mock_check.reset_mock()
            mock_check.side_effect = None  # Clear the lambda so return_value takes effect
            mock_check.return_value = None  # Explicit safe return
            validate_aws_command(safe_sg_command_80)  # Should not raise

```

--------------------------------------------------------------------------------
/security_config_example.yaml:
--------------------------------------------------------------------------------

```yaml
# AWS MCP Server Security Configuration Example
# Place this file at a location specified by AWS_MCP_SECURITY_CONFIG environment variable

# ---------------------------------------------------------------------------------
# 🔒 Security Rules Overview 🔒
# ---------------------------------------------------------------------------------
# The AWS MCP Server security system uses three layers of protection:
#
# 1. DANGEROUS_COMMANDS: Block specific commands that could compromise security
#    or lead to account takeover, privilege escalation, or audit tampering
#
# 2. SAFE_PATTERNS: Allow read-only and explicitly safe operations that
#    match dangerous patterns but are needed for normal operation
#
# 3. REGEX_RULES: Complex pattern matching for security risks that can't
#    be captured by simple command patterns
#
# How the layers work together:
# - First, the system checks if a command matches any dangerous pattern
# - If it does, the system then checks if it matches any safe pattern
# - If it matches a safe pattern, it's allowed despite being dangerous
# - Finally, the command is checked against all regex rules
# - Any match with a regex rule will block the command, regardless of other checks
#
# Security Mode:
# - Set AWS_MCP_SECURITY_MODE=strict (default) to enforce all rules
# - Set AWS_MCP_SECURITY_MODE=permissive to log warnings but allow execution
# ---------------------------------------------------------------------------------
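
# ---------------------------------------------------------------------------------
# Worked example (illustrative walkthrough of the layering described above,
# using rules that are defined later in this file):
#
#   aws iam create-user --help
#     1. Matches the dangerous prefix "aws iam create-user"  -> flagged
#     2. Matches the general safe pattern "--help"           -> allowed anyway
#     3. Matches no regex rule                               -> result: allowed
#
#   aws iam create-user --user-name alice
#     1. Matches the dangerous prefix "aws iam create-user"  -> flagged
#     2. Matches no safe pattern                             -> blocked in strict mode
#        (in permissive mode a warning is logged and the command still runs)
# ---------------------------------------------------------------------------------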

# ---------------------------------------------------------------------------------
# 🔑 Identity and Access Control Security Rules
# ---------------------------------------------------------------------------------
# These rules focus on preventing identity-based attacks such as:
# - Account takeover via creation of unauthorized users/credentials
# - Privilege escalation by attaching permissive policies
# - Credential exposure through access key creation
# - Console password creation and MFA device manipulation
# ---------------------------------------------------------------------------------

# Commands considered dangerous by security category
# Keys are AWS service names, values are lists of command prefixes to block
dangerous_commands:
  # Identity and Access Management - core of security
  iam:
    # User management (potential backdoor accounts)
    - "aws iam create-user"              # Creates new IAM users that could persist after compromise
    - "aws iam update-user"              # Updates existing user properties
    
    # Credential management (theft risk)
    - "aws iam create-access-key"        # Creates long-term credentials that can be exfiltrated
    - "aws iam update-access-key"        # Changes status of access keys (enabling/disabling)
    - "aws iam create-login-profile"     # Creates console passwords for existing users
    - "aws iam update-login-profile"     # Updates console passwords
    
    # Authentication controls
    - "aws iam create-virtual-mfa-device" # Creates new MFA devices
    - "aws iam deactivate-mfa-device"    # Removes MFA protection from accounts
    - "aws iam delete-virtual-mfa-device" # Deletes MFA devices
    - "aws iam enable-mfa-device"        # Enables/associates MFA devices
    
    # Privilege escalation via policy manipulation
    - "aws iam attach-user-policy"       # Attaches managed policies to users
    - "aws iam attach-role-policy"       # Attaches managed policies to roles
    - "aws iam attach-group-policy"      # Attaches managed policies to groups
    - "aws iam create-policy"            # Creates new managed policies
    - "aws iam create-policy-version"    # Creates new versions of managed policies
    - "aws iam set-default-policy-version" # Changes active policy version
    
    # Inline policy manipulation (harder to detect)
    - "aws iam put-user-policy"          # Creates/updates inline policies for users
    - "aws iam put-role-policy"          # Creates/updates inline policies for roles
    - "aws iam put-group-policy"         # Creates/updates inline policies for groups
    
    # Trust relationship manipulation
    - "aws iam update-assume-role-policy" # Changes who can assume a role
    - "aws iam update-role"              # Updates role properties
  
  # Security Token Service - temporary credentials
  sts:
    - "aws sts assume-role"              # Assumes roles with potentially higher privileges
    - "aws sts get-federation-token"     # Gets federated access tokens
  
  # AWS Organizations - multi-account management
  organizations:
    - "aws organizations create-account"  # Creates new AWS accounts
    - "aws organizations invite-account-to-organization" # Brings accounts under org control
    - "aws organizations leave-organization" # Removes accounts from organization
    - "aws organizations remove-account-from-organization" # Removes accounts from organization
    - "aws organizations disable-policy-type" # Disables policy enforcement
    - "aws organizations create-policy"   # Creates organization policies
    - "aws organizations attach-policy"   # Attaches organization policies

  # ---------------------------------------------------------------------------------
  # 🔍 Audit and Logging Security Rules
  # ---------------------------------------------------------------------------------
  # These rules prevent attackers from covering their tracks by:
  # - Disabling or deleting audit logs (CloudTrail)
  # - Turning off compliance monitoring (Config)
  # - Disabling threat detection (GuardDuty)
  # - Removing alarm systems (CloudWatch)
  # ---------------------------------------------------------------------------------
  
  # CloudTrail - AWS activity logging
  cloudtrail:
    - "aws cloudtrail delete-trail"       # Removes audit trail completely
    - "aws cloudtrail stop-logging"       # Stops collecting audit logs
    - "aws cloudtrail update-trail"       # Modifies logging settings (e.g., disabling logging)
    - "aws cloudtrail put-event-selectors" # Changes what events are logged
    - "aws cloudtrail delete-event-data-store" # Deletes storage for CloudTrail events
  
  # AWS Config - configuration monitoring
  config:
    - "aws configservice delete-configuration-recorder" # Removes configuration tracking
    - "aws configservice stop-configuration-recorder"   # Stops recording configuration changes
    - "aws configservice delete-delivery-channel"       # Stops delivering configuration snapshots
    - "aws configservice delete-remediation-configuration" # Removes auto-remediation
  
  # GuardDuty - threat detection
  guardduty:
    - "aws guardduty delete-detector"     # Disables threat detection completely
    - "aws guardduty disable-organization-admin-account" # Disables central security
    - "aws guardduty update-detector"     # Modifies threat detection settings
  
  # CloudWatch - monitoring and alerting
  cloudwatch:
    - "aws cloudwatch delete-alarms"     # Removes security alarm configurations
    - "aws cloudwatch disable-alarm-actions" # Disables alarm action triggers
    - "aws cloudwatch delete-dashboards" # Removes monitoring dashboards

  # ---------------------------------------------------------------------------------
  # 🔐 Data Security Rules
  # ---------------------------------------------------------------------------------
  # These rules prevent data exposure through:
  # - Secret and encryption key management
  # - Storage bucket permission controls
  # - Encryption settings management
  # ---------------------------------------------------------------------------------
  
  # Secrets Manager - sensitive credential storage
  secretsmanager:
    - "aws secretsmanager put-secret-value"   # Changes stored secrets
    - "aws secretsmanager update-secret"      # Updates secret properties
    - "aws secretsmanager restore-secret"     # Restores deleted secrets
    - "aws secretsmanager delete-secret"      # Removes sensitive secrets
  
  # KMS - encryption key management
  kms:
    - "aws kms disable-key"              # Disables encryption keys
    - "aws kms delete-alias"             # Removes key aliases
    - "aws kms schedule-key-deletion"    # Schedules deletion of encryption keys
    - "aws kms cancel-key-deletion"      # Cancels pending key deletion
    - "aws kms revoke-grant"             # Revokes permissions to use keys
  
  # S3 - object storage security
  s3:
    - "aws s3api put-bucket-policy"      # Changes bucket permissions
    - "aws s3api put-bucket-acl"         # Changes bucket access controls
    - "aws s3api delete-bucket-policy"   # Removes bucket protection policies
    - "aws s3api delete-bucket-encryption" # Removes encryption settings
    - "aws s3api put-public-access-block" # Changes public access settings

  # ---------------------------------------------------------------------------------
  # 🌐 Network Security Rules
  # ---------------------------------------------------------------------------------
  # These rules prevent network-based attacks through:
  # - Security group modification (firewall rules)
  # - Network ACL changes
  # - VPC endpoint manipulation
  # ---------------------------------------------------------------------------------
  
  # EC2 network security
  ec2:
    - "aws ec2 authorize-security-group-ingress" # Opens inbound network access
    - "aws ec2 authorize-security-group-egress"  # Opens outbound network access
    - "aws ec2 revoke-security-group-ingress"    # Removes inbound security rules
    - "aws ec2 revoke-security-group-egress"     # Removes outbound security rules
    - "aws ec2 modify-vpc-endpoint"              # Changes VPC endpoint settings
    - "aws ec2 create-flow-logs"                 # Creates network flow logs
    - "aws ec2 delete-flow-logs"                 # Removes network flow logs
    - "aws ec2 modify-instance-attribute"        # Changes security attributes of instances

# ---------------------------------------------------------------------------------
# ✓ Safe Patterns
# ---------------------------------------------------------------------------------
# These patterns explicitly allow read-only operations that don't modify resources
# and pose minimal or no security risk, even if they match dangerous patterns.
# ---------------------------------------------------------------------------------
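
# Example (illustrative): "aws iam get-user --user-name alice" is allowed even
# though several "aws iam" commands are listed as dangerous above, because it
# matches the safe pattern "aws iam get-" defined below.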

# Safe patterns that override dangerous commands
safe_patterns:
  # Universal safe patterns for any service
  general:
    - "--help"             # Getting command help documentation
    - "help"               # Getting command help documentation
    - "--version"          # Checking AWS CLI version
    - "--dry-run"          # Testing without making changes
    - "--generate-cli-skeleton" # Generating skeleton templates
  
  # Read-only IAM operations
  iam:
    - "aws iam get-"           # All get operations (reading resources)
    - "aws iam list-"          # All list operations (listing resources)
    - "aws iam generate-"      # Report generation
    - "aws iam simulate-"      # Policy simulation (no changes)
    - "aws iam tag-"           # Adding organizational tags is generally safe
  
  # Read-only STS operations
  sts:
    - "aws sts get-caller-identity" # Checking current identity
    - "aws sts decode-authorization-message" # Decoding error messages
  
  # Read-only Organizations operations
  organizations:
    - "aws organizations describe-" # Reading organization details
    - "aws organizations list-"     # Listing organization resources
  
  # Read-only CloudTrail operations
  cloudtrail:
    - "aws cloudtrail describe-"     # Reading trail configurations
    - "aws cloudtrail get-"          # Getting trail settings
    - "aws cloudtrail list-"         # Listing trails/events
    - "aws cloudtrail lookup-events" # Searching audit events
  
  # Read-only AWS Config operations
  config:
    - "aws configservice describe-"  # Reading configuration details
    - "aws configservice get-"       # Getting configuration settings
    - "aws configservice list-"      # Listing configuration resources
    - "aws configservice select-resource-config" # Querying resources
  
  # Read-only GuardDuty operations
  guardduty:
    - "aws guardduty describe-"    # Reading detector configurations
    - "aws guardduty get-"         # Getting detector settings/findings
    - "aws guardduty list-"        # Listing detectors/findings
  
  # Read-only CloudWatch operations
  cloudwatch:
    - "aws cloudwatch describe-"   # Reading alarm configurations 
    - "aws cloudwatch get-"        # Getting metric data
    - "aws cloudwatch list-"       # Listing metrics/alarms
  
  # Read-only Secrets Manager operations
  secretsmanager:
    - "aws secretsmanager list-"     # Listing secrets (metadata only)
    - "aws secretsmanager describe-" # Reading metadata about secrets
  
  # Read-only KMS operations
  kms:
    - "aws kms describe-"   # Reading key details
    - "aws kms get-"        # Getting key settings
    - "aws kms list-"       # Listing keys and aliases
  
  # Read-only S3 operations
  s3:
    - "aws s3 ls"            # Listing buckets/objects
    - "aws s3api get-"       # Getting bucket settings/objects
    - "aws s3api list-"      # Listing buckets/objects
    - "aws s3api head-"      # Getting object metadata
  
  # Read-only EC2 network operations
  ec2:
    - "aws ec2 describe-"    # Reading network configurations
    - "aws ec2 get-"         # Getting network settings

# ---------------------------------------------------------------------------------
# 🔎 Regex Pattern Rules
# ---------------------------------------------------------------------------------
# These complex patterns detect security risks that can't be caught with simple
# command prefix matching. They use regular expressions to identify risky
# command patterns that could compromise security.
# ---------------------------------------------------------------------------------
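
# Example (illustrative): the IAM rule below blocks
#   aws iam attach-user-policy --user-name alice --policy-arn arn:aws:iam::aws:policy/AdministratorAccess
# because the policy ARN matches "aws iam attach-user-policy.*--policy-arn\s+.*Administrator";
# a simple command-prefix rule could not make that distinction.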

# Complex pattern matching using regular expressions
regex_rules:
  # Global security patterns (apply to all services)
  general:
    # Identity and authentication risks
    - pattern: "aws .* --profile\\s+(root|admin|administrator)"
      description: "Prevent use of sensitive profiles"
      error_message: "Using sensitive profiles (root, admin) is restricted for security reasons."
    
    # Protocol security risks
    - pattern: "aws .* --no-verify-ssl"
      description: "Prevent disabling SSL verification"
      error_message: "Disabling SSL verification is not allowed for security reasons."
    
    # Data exposure risks
    - pattern: "aws .* --output\\s+text\\s+.*--query\\s+.*Password"
      description: "Prevent password exposure in text output"
      error_message: "Outputting sensitive data like passwords in text format is restricted."
    
    # Debug mode risks
    - pattern: "aws .* --debug"
      description: "Prevent debug mode which shows sensitive info"
      error_message: "Debug mode is restricted as it may expose sensitive information."

  # IAM-specific security patterns
  iam:
    # Privileged user creation
    - pattern: "aws iam create-user.*--user-name\\s+(root|admin|administrator|backup|security|finance|billing)"
      description: "Prevent creation of privileged-sounding users"
      error_message: "Creating users with sensitive names is restricted for security reasons."
    
    # Privilege escalation via policies
    - pattern: "aws iam attach-user-policy.*--policy-arn\\s+.*Administrator"
      description: "Prevent attaching Administrator policies"
      error_message: "Attaching Administrator policies is restricted for security reasons."
    
    - pattern: "aws iam attach-user-policy.*--policy-arn\\s+.*FullAccess"
      description: "Prevent attaching FullAccess policies to users"
      error_message: "Attaching FullAccess policies directly to users is restricted (use roles instead)."
    
    # Unrestricted permissions in policies
    - pattern: "aws iam create-policy.*\"Effect\":\\s*\"Allow\".*\"Action\":\\s*\"\*\".*\"Resource\":\\s*\"\*\""
      description: "Prevent creation of policies with * permissions"
      error_message: "Creating policies with unrestricted (*) permissions is not allowed."
    
    # Password policy weakening
    - pattern: "aws iam create-login-profile.*--password-reset-required\\s+false"
      description: "Enforce password reset for new profiles"
      error_message: "Creating login profiles without requiring password reset is restricted."
    
    - pattern: "aws iam update-account-password-policy.*--require-uppercase-characters\\s+false"
      description: "Prevent weakening password policies"
      error_message: "Weakening account password policies is restricted."

  # S3 security patterns
  s3:
    # Public bucket exposure
    - pattern: "aws s3api put-bucket-policy.*\"Effect\":\\s*\"Allow\".*\"Principal\":\\s*\"\*\""
      description: "Prevent public bucket policies"
      error_message: "Creating public bucket policies is restricted for security reasons."
    
    # Disabling public access blocks
    - pattern: "aws s3api put-public-access-block.*--public-access-block-configuration\\s+.*\"BlockPublicAcls\":\\s*false"
      description: "Prevent disabling public access blocks"
      error_message: "Disabling S3 public access blocks is restricted for security reasons."
    
    # Public bucket creation outside approved regions
    - pattern: "aws s3api create-bucket.*--region\\s+(?!eu|us-east-1).*--acl\\s+public"
      description: "Prevent public buckets outside of allowed regions"
      error_message: "Creating public buckets outside allowed regions is restricted."

  # EC2 network security patterns
  ec2:
    # Open security groups for sensitive ports
    - pattern: "aws ec2 authorize-security-group-ingress.*--cidr\\s+0\\.0\\.0\\.0/0.*--port\\s+(?!80|443)[0-9]+"
      description: "Prevent open security groups for non-web ports"
      error_message: "Opening non-web ports to the entire internet (0.0.0.0/0) is restricted."
    
    # Unsafe user-data scripts
    - pattern: "aws ec2 run-instances.*--user-data\\s+.*curl.*\\|.*sh"
      description: "Detect potentially unsafe user-data scripts"
      error_message: "Running scripts from remote sources in user-data presents security risks."

  # CloudTrail integrity patterns
  cloudtrail:
    # Disabling global event logging
    - pattern: "aws cloudtrail update-trail.*--no-include-global-service-events"
      description: "Prevent disabling global event logging"
      error_message: "Disabling CloudTrail logging for global service events is restricted."
    
    # Making trails single-region
    - pattern: "aws cloudtrail update-trail.*--no-multi-region"
      description: "Prevent making trails single-region"
      error_message: "Changing CloudTrail trails from multi-region to single-region is restricted."
```

--------------------------------------------------------------------------------
/tests/unit/test_cli_executor.py:
--------------------------------------------------------------------------------

```python
"""Tests for the CLI executor module."""

import asyncio
from unittest.mock import AsyncMock, MagicMock, patch

import pytest

from aws_mcp_server.cli_executor import (
    CommandExecutionError,
    CommandValidationError,
    check_aws_cli_installed,
    execute_aws_command,
    execute_pipe_command,
    get_command_help,
    is_auth_error,
)
from aws_mcp_server.config import DEFAULT_TIMEOUT, MAX_OUTPUT_SIZE


@pytest.mark.asyncio
async def test_execute_aws_command_success():
    """Test successful command execution."""
    with patch("asyncio.create_subprocess_exec", new_callable=AsyncMock) as mock_subprocess:
        # Mock a successful process
        process_mock = AsyncMock()
        process_mock.returncode = 0
        process_mock.communicate.return_value = (b"Success output", b"")
        mock_subprocess.return_value = process_mock

        result = await execute_aws_command("aws s3 ls")

        assert result["status"] == "success"
        assert result["output"] == "Success output"
        mock_subprocess.assert_called_once_with("aws", "s3", "ls", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)


@pytest.mark.asyncio
async def test_execute_aws_command_ec2_with_region_added():
    """Test that region is automatically added to EC2 commands."""
    with patch("asyncio.create_subprocess_exec", new_callable=AsyncMock) as mock_subprocess:
        # Mock a successful process
        process_mock = AsyncMock()
        process_mock.returncode = 0
        process_mock.communicate.return_value = (b"EC2 instances", b"")
        mock_subprocess.return_value = process_mock

        # Import here to ensure the test uses the actual value
        from aws_mcp_server.config import AWS_REGION

        # Execute an EC2 command without region
        result = await execute_aws_command("aws ec2 describe-instances")

        assert result["status"] == "success"
        assert result["output"] == "EC2 instances"

        # Verify region was added to the command
        mock_subprocess.assert_called_once()
        call_args = mock_subprocess.call_args[0]
        assert call_args[0] == "aws"
        assert call_args[1] == "ec2"
        assert call_args[2] == "describe-instances"
        assert "--region" in call_args
        assert AWS_REGION in call_args


@pytest.mark.asyncio
async def test_execute_aws_command_with_custom_timeout():
    """Test command execution with custom timeout."""
    with patch("asyncio.create_subprocess_exec", new_callable=AsyncMock) as mock_subprocess:
        process_mock = AsyncMock()
        process_mock.returncode = 0
        process_mock.communicate.return_value = (b"Success output", b"")
        mock_subprocess.return_value = process_mock

        # Use a custom timeout
        custom_timeout = 120
        with patch("asyncio.wait_for") as mock_wait_for:
            mock_wait_for.return_value = (b"Success output", b"")
            await execute_aws_command("aws s3 ls", timeout=custom_timeout)

            # Check that wait_for was called with the custom timeout
            mock_wait_for.assert_called_once()
            args, kwargs = mock_wait_for.call_args
            assert kwargs.get("timeout") == custom_timeout or args[1] == custom_timeout


@pytest.mark.asyncio
async def test_execute_aws_command_error():
    """Test command execution error."""
    with patch("asyncio.create_subprocess_exec", new_callable=AsyncMock) as mock_subprocess:
        # Mock a failed process
        process_mock = AsyncMock()
        process_mock.returncode = 1
        # Set up an awaitable communicate method
        communicate_mock = AsyncMock()
        communicate_mock.return_value = (b"", b"Error message")
        process_mock.communicate = communicate_mock
        mock_subprocess.return_value = process_mock

        result = await execute_aws_command("aws s3 ls")

        assert result["status"] == "error"
        assert result["output"] == "Error message"
        # Verify communicate was called
        communicate_mock.assert_called_once()


@pytest.mark.asyncio
async def test_execute_aws_command_auth_error():
    """Test command execution with authentication error."""
    with patch("asyncio.create_subprocess_exec", new_callable=AsyncMock) as mock_subprocess:
        # Mock a process that returns auth error
        process_mock = AsyncMock()
        process_mock.returncode = 1
        process_mock.communicate.return_value = (b"", b"Unable to locate credentials")
        mock_subprocess.return_value = process_mock

        result = await execute_aws_command("aws s3 ls")

        assert result["status"] == "error"
        assert "Authentication error" in result["output"]
        assert "Unable to locate credentials" in result["output"]
        assert "Please check your AWS credentials" in result["output"]


@pytest.mark.asyncio
async def test_execute_aws_command_timeout():
    """Test command timeout."""
    with patch("asyncio.create_subprocess_exec", new_callable=AsyncMock) as mock_subprocess:
        # Mock a process that times out
        process_mock = AsyncMock()
        # Use a properly awaitable mock that raises TimeoutError
        communicate_mock = AsyncMock(side_effect=asyncio.TimeoutError())
        process_mock.communicate = communicate_mock
        mock_subprocess.return_value = process_mock

        # Mock a regular function instead of an async one for process.kill
        process_mock.kill = MagicMock()

        with pytest.raises(CommandExecutionError) as excinfo:
            await execute_aws_command("aws s3 ls", timeout=1)

        # Check error message
        assert "Command timed out after 1 seconds" in str(excinfo.value)

        # Verify process was killed
        process_mock.kill.assert_called_once()


@pytest.mark.asyncio
async def test_execute_aws_command_kill_failure():
    """Test failure to kill process after timeout."""
    with patch("asyncio.create_subprocess_exec", new_callable=AsyncMock) as mock_subprocess:
        # Mock a process that times out
        process_mock = AsyncMock()
        # Use a properly awaitable mock that raises TimeoutError
        communicate_mock = AsyncMock(side_effect=asyncio.TimeoutError())
        process_mock.communicate = communicate_mock
        # Use regular MagicMock since kill() is not an async method
        process_mock.kill = MagicMock(side_effect=Exception("Failed to kill process"))
        mock_subprocess.return_value = process_mock

        with pytest.raises(CommandExecutionError) as excinfo:
            await execute_aws_command("aws s3 ls", timeout=1)

        # The main exception should still be about the timeout
        assert "Command timed out after 1 seconds" in str(excinfo.value)


@pytest.mark.asyncio
async def test_execute_aws_command_general_exception():
    """Test handling of general exceptions during command execution."""
    with patch("asyncio.create_subprocess_exec", side_effect=Exception("Test exception")):
        with pytest.raises(CommandExecutionError) as excinfo:
            await execute_aws_command("aws s3 ls")

        assert "Failed to execute command" in str(excinfo.value)
        assert "Test exception" in str(excinfo.value)


@pytest.mark.asyncio
async def test_execute_aws_command_truncate_output():
    """Test truncation of large outputs."""
    with patch("asyncio.create_subprocess_exec", new_callable=AsyncMock) as mock_subprocess:
        # Mock a successful process with large output
        process_mock = AsyncMock()
        process_mock.returncode = 0

        # Generate a large output that exceeds MAX_OUTPUT_SIZE
        large_output = "x" * (MAX_OUTPUT_SIZE + 1000)
        process_mock.communicate.return_value = (large_output.encode("utf-8"), b"")
        mock_subprocess.return_value = process_mock

        result = await execute_aws_command("aws s3 ls")

        assert result["status"] == "success"
        assert len(result["output"]) <= MAX_OUTPUT_SIZE + 100  # Allow for the truncation message
        assert "output truncated" in result["output"]


@pytest.mark.parametrize(
    "error_message,expected_result",
    [
        # Positive cases
        ("Unable to locate credentials", True),
        ("Some text before ExpiredToken and after", True),
        ("Error: AccessDenied when attempting to perform operation", True),
        ("AuthFailure: credentials could not be verified", True),
        ("The security token included in the request is invalid", True),
        ("The config profile could not be found", True),
        # Negative cases
        ("S3 bucket not found", False),
        ("Resource not found: myresource", False),
        ("Invalid parameter value", False),
    ],
)
def test_is_auth_error(error_message, expected_result):
    """Test the is_auth_error function with various error messages."""
    assert is_auth_error(error_message) == expected_result


@pytest.mark.asyncio
@pytest.mark.parametrize(
    "returncode,stdout,stderr,exception,expected_result",
    [
        # CLI installed
        (0, b"aws-cli/2.15.0", b"", None, True),
        # CLI not installed - command not found
        (127, b"", b"command not found", None, False),
        # CLI error case
        (1, b"", b"some error", None, False),
        # Exception during command execution
        (None, None, None, Exception("Test exception"), False),
    ],
)
async def test_check_aws_cli_installed(returncode, stdout, stderr, exception, expected_result):
    """Test check_aws_cli_installed function with various scenarios."""
    if exception:
        with patch("asyncio.create_subprocess_exec", side_effect=exception):
            result = await check_aws_cli_installed()
            assert result is expected_result
    else:
        with patch("asyncio.create_subprocess_exec", new_callable=AsyncMock) as mock_subprocess:
            process_mock = AsyncMock()
            process_mock.returncode = returncode
            process_mock.communicate.return_value = (stdout, stderr)
            mock_subprocess.return_value = process_mock

            result = await check_aws_cli_installed()
            assert result is expected_result

            if returncode == 0:  # Only verify call args for success case to avoid redundancy
                mock_subprocess.assert_called_once_with("aws", "--version", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)


@pytest.mark.asyncio
@pytest.mark.parametrize(
    "service,command,mock_type,mock_value,expected_text,expected_call",
    [
        # Successful help retrieval with service and command
        ("s3", "ls", "return_value", {"status": "success", "output": "Help text"}, "Help text", "aws s3 ls help"),
        # Successful help retrieval with service only
        ("s3", None, "return_value", {"status": "success", "output": "Help text for service"}, "Help text for service", "aws s3 help"),
        # Error scenarios
        ("s3", "ls", "side_effect", CommandValidationError("Test validation error"), "Command validation error: Test validation error", None),
        ("s3", "ls", "side_effect", CommandExecutionError("Test execution error"), "Error retrieving help: Test execution error", None),
        ("s3", "ls", "side_effect", Exception("Test exception"), "Error retrieving help: Test exception", None),
        # Error result from AWS command
        ("s3", "ls", "return_value", {"status": "error", "output": "Command failed"}, "Error: Command failed", "aws s3 ls help"),
    ],
)
async def test_get_command_help(service, command, mock_type, mock_value, expected_text, expected_call):
    """Test get_command_help function with various scenarios."""
    with patch("aws_mcp_server.cli_executor.execute_aws_command", new_callable=AsyncMock) as mock_execute:
        # Configure the mock based on the test case
        if mock_type == "return_value":
            mock_execute.return_value = mock_value
        else:  # side_effect
            mock_execute.side_effect = mock_value

        # Call the function
        result = await get_command_help(service, command)

        # Verify the result
        assert expected_text in result["help_text"]

        # Verify the mock was called correctly if expected_call is provided
        if expected_call:
            mock_execute.assert_called_once_with(expected_call)


@pytest.mark.asyncio
async def test_execute_aws_command_with_pipe():
    """Test execute_aws_command with a piped command."""
    # Test that execute_aws_command calls execute_pipe_command for piped commands
    with patch("aws_mcp_server.cli_executor.is_pipe_command", return_value=True):
        with patch("aws_mcp_server.cli_executor.execute_pipe_command", new_callable=AsyncMock) as mock_pipe_exec:
            mock_pipe_exec.return_value = {"status": "success", "output": "Piped result"}

            result = await execute_aws_command("aws s3 ls | grep bucket")

            assert result["status"] == "success"
            assert result["output"] == "Piped result"
            mock_pipe_exec.assert_called_once_with("aws s3 ls | grep bucket", None)


@pytest.mark.asyncio
async def test_execute_pipe_command_success():
    """Test successful execution of a pipe command."""
    with patch("aws_mcp_server.cli_executor.validate_pipe_command") as mock_validate:
        with patch("aws_mcp_server.cli_executor.execute_piped_command", new_callable=AsyncMock) as mock_pipe_exec:
            mock_pipe_exec.return_value = {"status": "success", "output": "Filtered results"}

            result = await execute_pipe_command("aws s3 ls | grep bucket")

            assert result["status"] == "success"
            assert result["output"] == "Filtered results"
            mock_validate.assert_called_once_with("aws s3 ls | grep bucket")
            mock_pipe_exec.assert_called_once_with("aws s3 ls | grep bucket", None)


@pytest.mark.asyncio
async def test_execute_pipe_command_ec2_with_region_added():
    """Test that region is automatically added to EC2 commands in a pipe."""
    with patch("aws_mcp_server.cli_executor.validate_pipe_command"):
        with patch("aws_mcp_server.cli_executor.execute_piped_command", new_callable=AsyncMock) as mock_pipe_exec:
            mock_pipe_exec.return_value = {"status": "success", "output": "Filtered EC2 instances"}

            # Mock split_pipe_command to simulate pipe command splitting
            with patch("aws_mcp_server.cli_executor.split_pipe_command") as mock_split:
                mock_split.return_value = ["aws ec2 describe-instances", "grep instance-id"]

                # Import here to ensure the test uses the actual value
                from aws_mcp_server.config import AWS_REGION

                # Execute a piped EC2 command without region
                result = await execute_pipe_command("aws ec2 describe-instances | grep instance-id")

                assert result["status"] == "success"
                assert result["output"] == "Filtered EC2 instances"

                # Verify the command was modified to include region
                expected_cmd = f"aws ec2 describe-instances --region {AWS_REGION} | grep instance-id"
                mock_pipe_exec.assert_called_once_with(expected_cmd, None)


@pytest.mark.asyncio
async def test_execute_pipe_command_validation_error():
    """Test execute_pipe_command with validation error."""
    with patch("aws_mcp_server.cli_executor.validate_pipe_command", side_effect=CommandValidationError("Invalid pipe command")):
        with pytest.raises(CommandValidationError) as excinfo:
            await execute_pipe_command("invalid | pipe | command")

        assert "Invalid pipe command" in str(excinfo.value)


@pytest.mark.asyncio
async def test_execute_pipe_command_execution_error():
    """Test execute_pipe_command with execution error."""
    with patch("aws_mcp_server.cli_executor.validate_pipe_command"):
        with patch("aws_mcp_server.cli_executor.execute_piped_command", side_effect=Exception("Execution error")):
            with pytest.raises(CommandExecutionError) as excinfo:
                await execute_pipe_command("aws s3 ls | grep bucket")

            assert "Failed to execute piped command" in str(excinfo.value)
            assert "Execution error" in str(excinfo.value)


# New test cases to improve coverage


@pytest.mark.asyncio
async def test_execute_pipe_command_timeout():
    """Test timeout handling in piped commands."""
    with patch("aws_mcp_server.cli_executor.validate_pipe_command"):
        with patch("aws_mcp_server.cli_executor.execute_piped_command", new_callable=AsyncMock) as mock_exec:
            # Simulate timeout in the executed command
            mock_exec.return_value = {"status": "error", "output": f"Command timed out after {DEFAULT_TIMEOUT} seconds"}

            result = await execute_pipe_command("aws s3 ls | grep bucket")

            assert result["status"] == "error"
            assert f"Command timed out after {DEFAULT_TIMEOUT} seconds" in result["output"]
            mock_exec.assert_called_once()


@pytest.mark.asyncio
async def test_execute_pipe_command_with_custom_timeout():
    """Test piped command execution with custom timeout."""
    with patch("aws_mcp_server.cli_executor.validate_pipe_command"):
        with patch("aws_mcp_server.cli_executor.execute_piped_command", new_callable=AsyncMock) as mock_exec:
            mock_exec.return_value = {"status": "success", "output": "Piped output"}

            custom_timeout = 120
            await execute_pipe_command("aws s3 ls | grep bucket", timeout=custom_timeout)

            # Verify the custom timeout was passed to the execute_piped_command
            mock_exec.assert_called_once_with("aws s3 ls | grep bucket", custom_timeout)


@pytest.mark.asyncio
async def test_execute_pipe_command_large_output():
    """Test handling of large output in piped commands."""
    with patch("aws_mcp_server.cli_executor.validate_pipe_command"):
        with patch("aws_mcp_server.cli_executor.execute_piped_command", new_callable=AsyncMock) as mock_exec:
            # Generate large output that would be truncated
            large_output = "x" * (MAX_OUTPUT_SIZE + 1000)
            mock_exec.return_value = {"status": "success", "output": large_output}

            result = await execute_pipe_command("aws s3 ls | grep bucket")

            assert result["status"] == "success"
            assert len(result["output"]) == len(large_output)  # Length should be preserved here as truncation happens in tools module


@pytest.mark.parametrize(
    "exit_code,stderr,expected_status,expected_msg",
    [
        (0, b"", "success", ""),  # Success case
        (1, b"Error: bucket not found", "error", "Error: bucket not found"),  # Standard error
        (1, b"AccessDenied", "error", "Authentication error"),  # Auth error
        (0, b"Warning: deprecated feature", "success", ""),  # Warning on stderr but success exit code
    ],
)
@pytest.mark.asyncio
async def test_execute_aws_command_exit_codes(exit_code, stderr, expected_status, expected_msg):
    """Test handling of different process exit codes and stderr output."""
    with patch("asyncio.create_subprocess_exec", new_callable=AsyncMock) as mock_subprocess:
        process_mock = AsyncMock()
        process_mock.returncode = exit_code
        stdout = b"Command output" if exit_code == 0 else b""
        process_mock.communicate.return_value = (stdout, stderr)
        mock_subprocess.return_value = process_mock

        result = await execute_aws_command("aws s3 ls")

        assert result["status"] == expected_status
        if expected_status == "success":
            assert result["output"] == "Command output"
        else:
            assert expected_msg in result["output"]

```