This is page 2 of 2. Use http://codebase.md/alexei-led/aws-mcp-server?page={x} to view the full context.
# Directory Structure
```
├── .dockerignore
├── .github
│   └── workflows
│       ├── ci.yml
│       └── release.yml
├── .gitignore
├── CLAUDE.md
├── codecov.yml
├── deploy
│   └── docker
│       ├── docker-compose.yml
│       └── Dockerfile
├── docs
│   └── VERSION.md
├── LICENSE
├── Makefile
├── media
│   └── demo.mp4
├── pyproject.toml
├── README.md
├── security_config_example.yaml
├── smithery.yaml
├── spec.md
├── src
│   └── aws_mcp_server
│       ├── __init__.py
│       ├── __main__.py
│       ├── cli_executor.py
│       ├── config.py
│       ├── prompts.py
│       ├── resources.py
│       ├── security.py
│       ├── server.py
│       └── tools.py
├── tests
│   ├── __init__.py
│   ├── conftest.py
│   ├── integration
│   │   ├── __init__.py
│   │   ├── test_aws_live.py
│   │   ├── test_security_integration.py
│   │   └── test_server_integration.py
│   ├── test_aws_integration.py
│   ├── test_aws_setup.py
│   ├── test_bucket_creation.py
│   ├── test_run_integration.py
│   └── unit
│       ├── __init__.py
│       ├── test_cli_executor.py
│       ├── test_init.py
│       ├── test_main.py
│       ├── test_prompts.py
│       ├── test_resources.py
│       ├── test_security.py
│       ├── test_server.py
│       └── test_tools.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/spec.md:
--------------------------------------------------------------------------------
```markdown
# AWS Model Context Protocol (MCP) Server Specification
## Project Overview
The **AWS MCP Server** is a lightweight service that enables users to execute AWS CLI commands through an MCP (Model Context Protocol) interface. It integrates with MCP-aware AI assistants (e.g., Claude Desktop, Cursor, Windsurf) via the [Model Context Protocol](https://modelcontextprotocol.io/), which is based on JSON-RPC 2.0. The server facilitates AWS CLI command documentation and execution, returning human-readable output optimized for AI consumption.
### Key Objectives
- **Command Documentation**: Provide detailed help information for AWS CLI commands.
- **Command Execution**: Execute AWS CLI commands and return formatted results.
- **MCP Compliance**: Fully implement the standard MCP protocol.
- **Human-Readable Output**: Ensure command output is optimized for AI assistants.
- **AWS Resource Context**: Provide access to AWS resources like profiles and regions.
- **Easy Deployment**: Prioritize Docker-based deployment for environment consistency.
- **Open Source**: Release under MIT license with GitHub repository and CI/CD.
## Core Features
### 1. Command Documentation Tool
The `describe_command` tool retrieves and formats AWS CLI help information:
- Use `aws help` and `aws <service> help` to access documentation.
- Present results in a structured, readable format optimized for AI consumption.
- Support parameter exploration to help understand command options.
**Examples:**
```
describe_command({"service": "s3"})
// Returns high-level AWS S3 service documentation
describe_command({"service": "s3", "command": "ls"})
// Returns specific documentation for the S3 ls command
```
### 2. Command Execution Tool
The `execute_command` tool runs AWS CLI commands:
- Accept complete AWS CLI command strings.
- Execute commands using the OS's AWS CLI installation.
- Format output for readability.
- Support optional parameters (timeout).
- Support Unix pipes to filter or transform output.
**Examples:**
```
execute_command({"command": "aws s3 ls"})
// Lists all S3 buckets
execute_command({"command": "aws ec2 describe-instances --region us-west-2"})
// Lists EC2 instances in the Oregon region
execute_command({"command": "aws s3api list-buckets --query 'Buckets[*].Name' --output text | sort"})
// Lists bucket names sorted alphabetically
```
### 3. AWS Context Resources
The server exposes AWS resources through the MCP Resources protocol:
- **AWS Profiles** (`aws://config/profiles`): Available AWS CLI profiles from AWS config.
- **AWS Regions** (`aws://config/regions`): List of available AWS regions.
- **AWS Region Details** (`aws://config/regions/{region}`): Detailed information about a specific region, including availability zones, geographic location, and services.
- **AWS Environment Variables** (`aws://config/environment`): Current AWS-related environment variables and credential information.
- **AWS Account Information** (`aws://config/account`): Information about the current AWS account.
These resources provide context for executing AWS commands, allowing AI assistants to suggest region-specific commands, use the correct profile, and understand the current AWS environment.
### 4. Output Formatting
Transform raw AWS CLI output into human-readable formats:
- Default to AWS CLI's default output format.
- Format complex outputs for better readability.
- Handle JSON, YAML, and text output formats.
- Support truncation for very large outputs.
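The truncation behavior described above can be sketched as follows. This is a minimal illustration, not the project's actual implementation; the function name `format_output` is an assumption, and the 100,000-character cap matches the default stated elsewhere in this spec.

```python
MAX_OUTPUT_SIZE = 100_000  # character cap; default noted in this spec


def format_output(raw: str, max_size: int = MAX_OUTPUT_SIZE) -> str:
    """Pass output through unchanged unless it exceeds the size cap."""
    if len(raw) <= max_size:
        return raw
    # Keep the head of the output and signal that it was cut short
    return raw[:max_size] + "\n... (output truncated)"
```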
### 5. Authentication Management
- Leverage existing AWS CLI authentication on the host machine.
- Support AWS profiles through command parameters.
- Provide clear error messages for authentication issues.
- Expose available profiles as MCP Resources.
### 6. Prompt Templates
Provide a collection of useful prompt templates for common AWS use cases:
- Resource creation with best practices
- Security audits
- Cost optimization
- Resource inventory
- Service troubleshooting
- IAM policy generation
- Service monitoring
- Disaster recovery
- Compliance checking
- Resource cleanup
## MCP Protocol Implementation
The server implements the MCP protocol with the following components:
### 1. Initialization Workflow
**Client Request:**
```json
{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "initialize",
  "params": {
    "protocolVersion": "DRAFT-2025-v1",
    "capabilities": {
      "experimental": {},
      "resources": {}
    },
    "clientInfo": {
      "name": "Claude Desktop",
      "version": "1.0.0"
    }
  }
}
```
**Server Response:**
```json
{
  "jsonrpc": "2.0",
  "id": 1,
  "result": {
    "protocolVersion": "DRAFT-2025-v1",
    "capabilities": {
      "tools": {},
      "resources": {}
    },
    "serverInfo": {
      "name": "AWS MCP Server",
      "version": "1.0.0"
    },
    "instructions": "Use this server to retrieve AWS CLI documentation and execute AWS CLI commands."
  }
}
```
**Client Notification:**
```json
{
  "jsonrpc": "2.0",
  "method": "notifications/initialized"
}
```
### 2. Tool Definitions
The server defines two primary tools:
#### describe_command
**Request:**
```json
{
  "jsonrpc": "2.0",
  "id": 2,
  "method": "tools/describe_command",
  "params": {
    "service": "s3",
    "command": "ls" // Optional
  }
}
```
**Response:**
```json
{
  "jsonrpc": "2.0",
  "id": 2,
  "result": {
    "help_text": "Description: Lists all your buckets or all the objects in a bucket.\n\nUsage: aws s3 ls [bucket] [options]\n\nOptions:\n --bucket TEXT The bucket name\n --prefix TEXT Prefix to filter objects\n --delimiter TEXT Delimiter to use for grouping\n --max-items INTEGER Maximum number of items to return\n --page-size INTEGER Number of items to return per page\n --starting-token TEXT Starting token for pagination\n --request-payer TEXT Confirms that the requester knows they will be charged for the request\n\nExamples:\n aws s3 ls\n aws s3 ls my-bucket\n aws s3 ls my-bucket --prefix folder/\n"
  }
}
```
#### execute_command
**Request:**
```json
{
  "jsonrpc": "2.0",
  "id": 3,
  "method": "tools/execute_command",
  "params": {
    "command": "aws s3 ls --region us-west-2",
    "timeout": 60 // Optional
  }
}
```
**Response:**
```json
{
  "jsonrpc": "2.0",
  "id": 3,
  "result": {
    "output": "2023-10-15 14:30:45 my-bucket-1\n2023-11-20 09:15:32 my-bucket-2",
    "status": "success"
  }
}
```
### 3. Resource Definitions
The server provides access to AWS resources:
#### aws_profiles
**Request:**
```json
{
  "jsonrpc": "2.0",
  "id": 4,
  "method": "resources/aws_profiles"
}
```
**Response:**
```json
{
  "jsonrpc": "2.0",
  "id": 4,
  "result": {
    "profiles": [
      { "name": "default", "is_current": true },
      { "name": "dev" },
      { "name": "prod" }
    ]
  }
}
```
#### aws_regions
**Request:**
```json
{
  "jsonrpc": "2.0",
  "id": 5,
  "method": "resources/aws_regions"
}
```
**Response:**
```json
{
  "jsonrpc": "2.0",
  "id": 5,
  "result": {
    "regions": [
      { "name": "us-east-1", "description": "US East (N. Virginia)", "is_current": true },
      { "name": "us-east-2", "description": "US East (Ohio)" },
      { "name": "us-west-1", "description": "US West (N. California)" },
      { "name": "us-west-2", "description": "US West (Oregon)" }
    ]
  }
}
```
#### aws_region_details
**Request:**
```json
{
  "jsonrpc": "2.0",
  "id": 8,
  "method": "resources/aws_region_details",
  "params": {
    "region": "us-east-1"
  }
}
```
**Response:**
```json
{
  "jsonrpc": "2.0",
  "id": 8,
  "result": {
    "code": "us-east-1",
    "name": "US East (N. Virginia)",
    "geographic_location": {
      "continent": "North America",
      "country": "United States",
      "city": "Ashburn, Virginia"
    },
    "availability_zones": [
      {
        "name": "us-east-1a",
        "state": "available",
        "zone_id": "use1-az1",
        "zone_type": "availability-zone"
      },
      {
        "name": "us-east-1b",
        "state": "available",
        "zone_id": "use1-az2",
        "zone_type": "availability-zone"
      }
    ],
    "services": ["ec2", "s3", "lambda", "dynamodb", "rds"],
    "is_current": true
  }
}
```
#### aws_environment
**Request:**
```json
{
  "jsonrpc": "2.0",
  "id": 6,
  "method": "resources/aws_environment"
}
```
**Response:**
```json
{
  "jsonrpc": "2.0",
  "id": 6,
  "result": {
    "aws_profile": "default",
    "aws_region": "us-east-1",
    "aws_access_key_id": "AKI***********", // Masked for security
    "has_credentials": true,
    "credentials_source": "environment" // Can be "environment", "profile", "instance-profile", etc.
  }
}
```
#### aws_account
**Request:**
```json
{
  "jsonrpc": "2.0",
  "id": 7,
  "method": "resources/aws_account"
}
```
**Response:**
```json
{
  "jsonrpc": "2.0",
  "id": 7,
  "result": {
    "account_id": "123456789012",
    "account_alias": "my-org",
    "organization_id": "o-abc123"
  }
}
```
### 4. Error Handling
The server returns standardized JSON-RPC error responses:
```json
{
  "jsonrpc": "2.0",
  "id": 3,
  "error": {
    "code": -32603,
    "message": "Internal error",
    "data": "AWS CLI command failed: Unable to locate credentials"
  }
}
```
**Standard Error Codes:**
- `-32600`: Invalid Request
- `-32601`: Method Not Found
- `-32602`: Invalid Parameters
- `-32603`: Internal Error
## Architecture
### Component Architecture
```mermaid
graph TD
    Client[MCP Client\nClaude/Cursor] <--> MCP[MCP Interface\nJSON-RPC]
    MCP --> Tools[Tool Handler]
    MCP --> Resources[Resources Handler]
    MCP --> Prompts[Prompt Templates]
    Tools --> Executor[AWS CLI Executor]
    Resources --> AWS_Config[AWS Config Reader]
    Resources --> AWS_STS[AWS STS Client]

    style Client fill:#f9f,stroke:#333,stroke-width:2px
    style MCP fill:#bbf,stroke:#333,stroke-width:2px
    style Tools fill:#bfb,stroke:#333,stroke-width:2px
    style Resources fill:#fbf,stroke:#333,stroke-width:2px
    style Prompts fill:#bff,stroke:#333,stroke-width:2px
    style Executor fill:#fbb,stroke:#333,stroke-width:2px
    style AWS_Config fill:#ffd,stroke:#333,stroke-width:2px
    style AWS_STS fill:#dff,stroke:#333,stroke-width:2px
```
### Current Components
1. **MCP Interface**
- Implements JSON-RPC 2.0 protocol endpoints
- Handles MCP initialization and notifications
- Routes tool requests to appropriate handlers
- Implemented using FastMCP library
2. **Tool Handler**
- Processes `describe_command` requests
- Processes `execute_command` requests
- Validates parameters
- Handles command execution with timeout
3. **AWS CLI Executor**
- Executes AWS CLI commands via subprocess
- Captures standard output and error streams
- Handles command timing and timeout
- Supports piped commands with Unix utilities
4. **Prompt Templates**
- Provides pre-defined prompt templates for common AWS tasks
- Helps ensure best practices in AWS operations
- Supports various use cases like security, cost optimization, etc.
### New Components for Resources
5. **Resources Handler**
- Manages MCP Resources capabilities
- Provides access to AWS-specific resources
- Handles resource requests and responds with resource data
6. **AWS Config Reader**
- Reads AWS configuration files (~/.aws/config, ~/.aws/credentials)
- Provides information about available profiles
- Respects AWS credential precedence rules
7. **AWS STS Client**
- Obtains AWS account information
- Verifies credential validity
- Provides current identity information
## Implementation Details
### 1. Server Implementation
**Current Python Implementation:**
```python
from mcp.server.fastmcp import Context, FastMCP
from pydantic import Field

# Create the FastMCP server
mcp = FastMCP(
    "AWS MCP Server",
    instructions=INSTRUCTIONS,
    version=SERVER_INFO["version"],
)


# Register tools
@mcp.tool()
async def describe_command(
    service: str = Field(description="AWS service (e.g., s3, ec2)"),
    command: str | None = Field(description="Command within the service", default=None),
    ctx: Context | None = None,
) -> CommandHelpResult:
    """Get AWS CLI command documentation."""
    # Implementation...


@mcp.tool()
async def execute_command(
    command: str = Field(description="Complete AWS CLI command to execute"),
    timeout: int | None = Field(description="Timeout in seconds", default=None),
    ctx: Context | None = None,
) -> CommandResult:
    """Execute an AWS CLI command."""
    # Implementation...


# Register prompts
register_prompts(mcp)
```
**Resource Implementation:**
```python
# Register all MCP resources
def register_resources(mcp):
    """Register all resources with the MCP server instance."""
    logger.info("Registering AWS resources")

    @mcp.resource(uri="aws://config/profiles", mime_type="application/json")
    async def aws_profiles() -> dict:
        """Get available AWS profiles."""
        profiles = get_aws_profiles()
        current_profile = os.environ.get("AWS_PROFILE", "default")
        return {
            "profiles": [
                {"name": profile, "is_current": profile == current_profile}
                for profile in profiles
            ]
        }

    @mcp.resource(uri="aws://config/regions", mime_type="application/json")
    async def aws_regions() -> dict:
        """Get available AWS regions."""
        regions = get_aws_regions()
        current_region = os.environ.get("AWS_REGION", os.environ.get("AWS_DEFAULT_REGION", "us-east-1"))
        return {
            "regions": [
                {
                    "name": region["RegionName"],
                    "description": region["RegionDescription"],
                    "is_current": region["RegionName"] == current_region,
                }
                for region in regions
            ]
        }

    @mcp.resource(uri="aws://config/regions/{region}", mime_type="application/json")
    async def aws_region_details(region: str) -> dict:
        """Get detailed information about a specific AWS region."""
        return get_region_details(region)

    @mcp.resource(uri="aws://config/environment", mime_type="application/json")
    async def aws_environment() -> dict:
        """Get AWS environment information."""
        return get_aws_environment()

    @mcp.resource(uri="aws://config/account", mime_type="application/json")
    async def aws_account() -> dict:
        """Get AWS account information."""
        return get_aws_account_info()
```
### 2. Directory Structure
Current structure:
```
aws-mcp-server/
├── src/
│   └── aws_mcp_server/
│       ├── __init__.py
│       ├── __main__.py
│       ├── cli_executor.py
│       ├── config.py
│       ├── prompts.py
│       ├── server.py
│       └── tools.py
├── tests/
│   ├── unit/
│   │   └── ...
│   └── integration/
│       └── ...
├── deploy/
│   └── docker/
│       ├── Dockerfile
│       └── docker-compose.yml
├── docs/
│   └── VERSION.md
├── pyproject.toml
└── README.md
```
Extended structure with resources:
```
aws-mcp-server/
├── src/
│   └── aws_mcp_server/
│       ├── __init__.py
│       ├── __main__.py
│       ├── cli_executor.py
│       ├── config.py
│       ├── prompts.py
│       ├── resources.py       # New file for resource implementations
│       ├── server.py
│       └── tools.py
├── tests/
│   ├── unit/
│   │   ├── test_resources.py  # New tests for resources
│   │   └── ...
│   └── integration/
│       └── ...
├── deploy/
│   └── docker/
│       ├── Dockerfile
│       └── docker-compose.yml
├── docs/
│   └── VERSION.md
├── pyproject.toml
└── README.md
```
### 3. Error Handling Strategy
Implement comprehensive error handling for common scenarios:
- **AWS CLI Not Installed**: Check for AWS CLI presence at startup
- **Authentication Failures**: Return clear error messages with resolution steps
- **Permission Issues**: Clarify required AWS permissions
- **Invalid Commands**: Validate commands before execution
- **Timeout Handling**: Set reasonable command timeouts (default: 300 seconds)
- **Resource Access Failures**: Handle failures to access AWS resources gracefully
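The "AWS CLI Not Installed" check above can be as simple as a PATH lookup at startup. This is a minimal sketch under stated assumptions: the function name `check_aws_cli_installed` is illustrative, and the real server might instead run `aws --version` via subprocess to validate the binary.

```python
import shutil


def check_aws_cli_installed() -> bool:
    """Return True if an `aws` executable is found on PATH.

    Illustrative startup check; a production server could also invoke
    `aws --version` to confirm the binary actually runs.
    """
    return shutil.which("aws") is not None
```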
## Deployment Strategy
### 1. Docker Deployment (Primary Method)
**Dockerfile:**
```dockerfile
FROM python:3.13-slim

# Install AWS CLI v2
RUN apt-get update && apt-get install -y \
    unzip \
    curl \
    less \
    && curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip" \
    && unzip awscliv2.zip \
    && ./aws/install \
    && rm -rf awscliv2.zip aws \
    && apt-get clean \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /app

# Copy application files
COPY pyproject.toml .
COPY uv.lock .
RUN pip install uv && uv pip sync --system uv.lock
COPY src/ ./src/

# Command to run the MCP server
ENTRYPOINT ["python", "-m", "aws_mcp_server"]
```
**Docker Compose:**
```yaml
version: '3'
services:
  aws-mcp-server:
    build: .
    volumes:
      - ~/.aws:/root/.aws:ro  # Mount AWS credentials as read-only
    environment:
      - AWS_PROFILE=default   # Optional: specify AWS profile
      - AWS_REGION=us-east-1  # Optional: specify AWS region
```
### 2. Alternative: Python Virtual Environment
For users who prefer direct Python installation:
```bash
# Clone repository
git clone https://github.com/username/aws-mcp-server.git
cd aws-mcp-server

# Create and activate virtual environment
python -m venv .venv
source .venv/bin/activate  # On Windows: .venv\Scripts\activate

# Install dependencies with uv (into the active virtual environment)
pip install uv
uv pip sync uv.lock

# Run server
python -m aws_mcp_server
```
## Testing Strategy
### 1. Unit Tests
Test individual components in isolation:
- **CLI Executor Tests**: Mock subprocess calls to verify command construction
- **Resource Provider Tests**: Verify proper extraction of AWS profiles, regions, etc.
- **MCP Resource Tests**: Test resource endpoint implementations
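The CLI executor tests above can be sketched with `unittest.mock`. The `run_aws` helper below is a hypothetical stand-in for the project's executor, not its actual API; the point is mocking `subprocess.run` to verify command construction without touching AWS.

```python
import subprocess
from unittest import mock


def run_aws(command: str) -> str:
    """Hypothetical stand-in for the CLI executor under test."""
    result = subprocess.run(command.split(), capture_output=True, text=True, timeout=300)
    return result.stdout


def test_run_aws_invokes_subprocess():
    # Patch subprocess.run so no real AWS CLI call happens
    with mock.patch("subprocess.run") as mocked:
        mocked.return_value = mock.Mock(stdout="my-bucket\n")
        output = run_aws("aws s3 ls")
        # Verify command construction and captured output
        mocked.assert_called_once()
        assert mocked.call_args.args[0] == ["aws", "s3", "ls"]
        assert output == "my-bucket\n"
```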
### 2. Integration Tests
Test end-to-end functionality:
- **MCP Protocol Tests**: Verify proper protocol implementation
- **AWS CLI Integration**: Test with actual AWS CLI using mock credentials
- **Resource Access Tests**: Verify correct resource information retrieval
### 3. Test Automation
Implement CI/CD with GitHub Actions:
```yaml
name: Test and Build

on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.13'
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install uv
          uv pip sync --system uv.lock
      - name: Test with pytest
        run: |
          pytest --cov=src tests/
  build-docker:
    needs: test
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Build Docker image
        run: docker build -t aws-mcp-server .
      - name: Test Docker image
        run: |
          docker run --rm --entrypoint python aws-mcp-server -c "import aws_mcp_server; print('OK')"
```
## Security Considerations
### Authentication Handling
- Use AWS credentials on the host machine
- Support profile specification through environment variables
- Never store or log AWS credentials
- Mask sensitive credential information in resource outputs
### Command Validation
- Verify all commands begin with "aws" prefix
- Implement a simple allow/deny pattern for certain services or commands
- Rely on MCP host's approval mechanism for command execution
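The validation rules above can be sketched as a small gate that runs before execution. This is a minimal illustration; the deny list entries and error messages are assumptions, not the project's actual security policy (which lives in `security.py` and `security_config_example.yaml`).

```python
# Illustrative deny list; real patterns would come from configuration
DENIED_PREFIXES = (
    "aws iam create-access-key",
    "aws iam delete-user",
)


def validate_command(command: str) -> None:
    """Raise ValueError if the command fails basic validation."""
    stripped = command.strip()
    if not stripped.startswith("aws "):
        raise ValueError("Commands must begin with the 'aws' prefix")
    if any(stripped.startswith(prefix) for prefix in DENIED_PREFIXES):
        raise ValueError("Command is denied by security policy")
```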
### Resource Limitations
- Set reasonable timeouts for command execution (default: 300 seconds)
- Limit output size to prevent memory issues (default: 100,000 characters)
- Implement rate limiting for multiple rapid commands
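The timeout limit above can be enforced around the subprocess call. A minimal sketch, assuming asyncio-based execution; the function name and error message are illustrative, not the project's actual implementation.

```python
import asyncio


async def run_with_timeout(command: str, timeout: float = 300.0) -> str:
    """Run a shell command, killing it if it exceeds the timeout."""
    proc = await asyncio.create_subprocess_shell(
        command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=timeout)
    except asyncio.TimeoutError:
        proc.kill()  # reclaim the process before reporting the failure
        raise TimeoutError(f"Command timed out after {timeout} seconds")
    return stdout.decode()
```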
## Conclusion
This updated AWS MCP Server specification provides a clear approach for building a server that integrates with the Model Context Protocol to execute AWS CLI commands and provide AWS resource context through MCP Resources. The implementation leverages the FastMCP library and follows best practices for AWS tool development.
The updated specification enhances the original by adding MCP Resources support for AWS profiles, regions, environment, and account information. These resources provide valuable context for AI assistants to generate more accurate and relevant AWS CLI commands based on the user's AWS environment.
```
--------------------------------------------------------------------------------
/src/aws_mcp_server/resources.py:
--------------------------------------------------------------------------------
```python
"""AWS Resource definitions for the AWS MCP Server.
This module provides MCP Resources that expose AWS environment information
including available profiles, regions, and current configuration state.
"""
import configparser
import logging
import os
import re
from typing import Any, Dict, List, Optional
import boto3
from botocore.exceptions import BotoCoreError, ClientError
logger = logging.getLogger(__name__)
def get_aws_profiles() -> List[str]:
"""Get available AWS profiles from config and credentials files.
Reads the AWS config and credentials files to extract all available profiles.
Returns:
List of profile names
"""
profiles = ["default"] # default profile always exists
config_paths = [
os.path.expanduser("~/.aws/config"),
os.path.expanduser("~/.aws/credentials"),
]
try:
for config_path in config_paths:
if not os.path.exists(config_path):
continue
config = configparser.ConfigParser()
config.read(config_path)
for section in config.sections():
# In config file, profiles are named [profile xyz] except default
# In credentials file, profiles are named [xyz]
profile_match = re.match(r"profile\s+(.+)", section)
if profile_match:
# This is from config file
profile_name = profile_match.group(1)
if profile_name not in profiles:
profiles.append(profile_name)
elif section != "default" and section not in profiles:
# This is likely from credentials file
profiles.append(section)
except Exception as e:
logger.warning(f"Error reading AWS profiles: {e}")
return profiles
def get_aws_regions() -> List[Dict[str, str]]:
    """Get available AWS regions.

    Uses boto3 to retrieve the list of available AWS regions.
    Automatically uses credentials from environment variables if no config file is available.

    Returns:
        List of region dictionaries with name and description
    """
    try:
        # Create a session - boto3 will automatically use credentials from
        # environment variables if no config file is available
        session = boto3.session.Session(region_name=os.environ.get("AWS_REGION", os.environ.get("AWS_DEFAULT_REGION", "us-east-1")))
        ec2 = session.client("ec2")
        response = ec2.describe_regions()

        # Format the regions
        regions = []
        for region in response["Regions"]:
            region_name = region["RegionName"]
            # Create a friendly name based on the region code
            description = _get_region_description(region_name)
            regions.append({"RegionName": region_name, "RegionDescription": description})

        # Sort regions by name
        regions.sort(key=lambda r: r["RegionName"])
        return regions
    except (BotoCoreError, ClientError) as e:
        logger.warning(f"Error fetching AWS regions: {e}")
        # Fall back to a static list of common regions
        return [
            {"RegionName": "us-east-1", "RegionDescription": "US East (N. Virginia)"},
            {"RegionName": "us-east-2", "RegionDescription": "US East (Ohio)"},
            {"RegionName": "us-west-1", "RegionDescription": "US West (N. California)"},
            {"RegionName": "us-west-2", "RegionDescription": "US West (Oregon)"},
            {"RegionName": "eu-west-1", "RegionDescription": "EU West (Ireland)"},
            {"RegionName": "eu-west-2", "RegionDescription": "EU West (London)"},
            {"RegionName": "eu-central-1", "RegionDescription": "EU Central (Frankfurt)"},
            {"RegionName": "ap-northeast-1", "RegionDescription": "Asia Pacific (Tokyo)"},
            {"RegionName": "ap-northeast-2", "RegionDescription": "Asia Pacific (Seoul)"},
            {"RegionName": "ap-southeast-1", "RegionDescription": "Asia Pacific (Singapore)"},
            {"RegionName": "ap-southeast-2", "RegionDescription": "Asia Pacific (Sydney)"},
            {"RegionName": "sa-east-1", "RegionDescription": "South America (São Paulo)"},
        ]
    except Exception as e:
        logger.warning(f"Unexpected error fetching AWS regions: {e}")
        return []
def _get_region_description(region_code: str) -> str:
    """Convert region code to a human-readable description.

    Args:
        region_code: AWS region code (e.g., us-east-1)

    Returns:
        Human-readable region description
    """
    region_map = {
        "us-east-1": "US East (N. Virginia)",
        "us-east-2": "US East (Ohio)",
        "us-west-1": "US West (N. California)",
        "us-west-2": "US West (Oregon)",
        "af-south-1": "Africa (Cape Town)",
        "ap-east-1": "Asia Pacific (Hong Kong)",
        "ap-south-1": "Asia Pacific (Mumbai)",
        "ap-northeast-1": "Asia Pacific (Tokyo)",
        "ap-northeast-2": "Asia Pacific (Seoul)",
        "ap-northeast-3": "Asia Pacific (Osaka)",
        "ap-southeast-1": "Asia Pacific (Singapore)",
        "ap-southeast-2": "Asia Pacific (Sydney)",
        "ap-southeast-3": "Asia Pacific (Jakarta)",
        "ca-central-1": "Canada (Central)",
        "eu-central-1": "EU Central (Frankfurt)",
        "eu-west-1": "EU West (Ireland)",
        "eu-west-2": "EU West (London)",
        "eu-west-3": "EU West (Paris)",
        "eu-north-1": "EU North (Stockholm)",
        "eu-south-1": "EU South (Milan)",
        "me-south-1": "Middle East (Bahrain)",
        "sa-east-1": "South America (São Paulo)",
    }
    return region_map.get(region_code, f"AWS Region {region_code}")
def get_region_available_services(session: boto3.session.Session, region_code: str) -> List[Dict[str, str]]:
    """Get available AWS services for a specific region.

    Uses the Service Quotas API to get a comprehensive list of services available
    in the given region. Falls back to testing client creation for common services
    if the Service Quotas API fails.

    Args:
        session: Boto3 session to use for API calls
        region_code: AWS region code (e.g., us-east-1)

    Returns:
        List of dictionaries with service ID and name
    """
    available_services = []
    try:
        # Create a Service Quotas client
        quotas_client = session.client("service-quotas", region_name=region_code)

        # List all services available in the region
        next_token = None
        while True:
            if next_token:
                response = quotas_client.list_services(NextToken=next_token)
            else:
                response = quotas_client.list_services()

            # Extract service codes
            for service in response.get("Services", []):
                service_code = service.get("ServiceCode")
                if service_code:
                    # Convert Service Quotas service codes to boto3 service names
                    # by removing the "AWS." prefix if present
                    if service_code.startswith("AWS."):
                        boto3_service_id = service_code[4:].lower()
                    # Some other service codes need additional transformation
                    elif "." in service_code:
                        boto3_service_id = service_code.split(".")[-1].lower()
                    else:
                        boto3_service_id = service_code.lower()

                    available_services.append({"id": boto3_service_id, "name": service.get("ServiceName", service_code)})

            # Check if there are more services to fetch
            next_token = response.get("NextToken")
            if not next_token:
                break
    except Exception as e:
        logger.debug(f"Error fetching services with Service Quotas API for {region_code}: {e}")
        # Fall back to the client creation method for a subset of common services
        common_services = [
            "ec2",
            "s3",
            "lambda",
            "rds",
            "dynamodb",
            "cloudformation",
            "sqs",
            "sns",
            "iam",
            "cloudwatch",
            "kinesis",
            "apigateway",
            "ecs",
            "ecr",
            "eks",
            "route53",
            "secretsmanager",
            "ssm",
            "kms",
            "elasticbeanstalk",
            "elasticache",
            "elasticsearch",
        ]
        for service_name in common_services:
            try:
                # Try to create a client for the service in the region.
                # If it succeeds, the service is available.
                session.client(service_name, region_name=region_code)
                available_services.append(
                    {"id": service_name, "name": service_name.upper() if service_name in ["ec2", "s3"] else service_name.replace("-", " ").title()}
                )
            except Exception:
                # If client creation fails, the service might not be available in this region
                pass
    return available_services
def _get_region_geographic_location(region_code: str) -> Dict[str, str]:
    """Get geographic location information for a region.

    Args:
        region_code: AWS region code (e.g., us-east-1)

    Returns:
        Dictionary with geographic information
    """
    # Map of region codes to geographic information
    geo_map = {
        "us-east-1": {"continent": "North America", "country": "United States", "city": "Ashburn, Virginia"},
        "us-east-2": {"continent": "North America", "country": "United States", "city": "Columbus, Ohio"},
        "us-west-1": {"continent": "North America", "country": "United States", "city": "San Francisco, California"},
        "us-west-2": {"continent": "North America", "country": "United States", "city": "Portland, Oregon"},
        "af-south-1": {"continent": "Africa", "country": "South Africa", "city": "Cape Town"},
        "ap-east-1": {"continent": "Asia", "country": "China", "city": "Hong Kong"},
        "ap-south-1": {"continent": "Asia", "country": "India", "city": "Mumbai"},
        "ap-northeast-1": {"continent": "Asia", "country": "Japan", "city": "Tokyo"},
        "ap-northeast-2": {"continent": "Asia", "country": "South Korea", "city": "Seoul"},
        "ap-northeast-3": {"continent": "Asia", "country": "Japan", "city": "Osaka"},
        "ap-southeast-1": {"continent": "Asia", "country": "Singapore", "city": "Singapore"},
        "ap-southeast-2": {"continent": "Oceania", "country": "Australia", "city": "Sydney"},
        "ap-southeast-3": {"continent": "Asia", "country": "Indonesia", "city": "Jakarta"},
        "ca-central-1": {"continent": "North America", "country": "Canada", "city": "Montreal"},
        "eu-central-1": {"continent": "Europe", "country": "Germany", "city": "Frankfurt"},
        "eu-west-1": {"continent": "Europe", "country": "Ireland", "city": "Dublin"},
        "eu-west-2": {"continent": "Europe", "country": "United Kingdom", "city": "London"},
        "eu-west-3": {"continent": "Europe", "country": "France", "city": "Paris"},
        "eu-north-1": {"continent": "Europe", "country": "Sweden", "city": "Stockholm"},
        "eu-south-1": {"continent": "Europe", "country": "Italy", "city": "Milan"},
        "me-south-1": {"continent": "Middle East", "country": "Bahrain", "city": "Manama"},
        "sa-east-1": {"continent": "South America", "country": "Brazil", "city": "São Paulo"},
    }

    # Return default information if region not found
    default_geo = {"continent": "Unknown", "country": "Unknown", "city": "Unknown"}
    return geo_map.get(region_code, default_geo)
def get_region_details(region_code: str) -> Dict[str, Any]:
    """Get detailed information about a specific AWS region.

    Args:
        region_code: AWS region code (e.g., us-east-1)

    Returns:
        Dictionary with region details
    """
    region_info = {
        "code": region_code,
        "name": _get_region_description(region_code),
        "geographic_location": _get_region_geographic_location(region_code),
        "availability_zones": [],
        "services": [],
        "is_current": region_code == os.environ.get("AWS_REGION", os.environ.get("AWS_DEFAULT_REGION", "us-east-1")),
    }

    try:
        # Create a session with the specified region
        session = boto3.session.Session(region_name=region_code)

        # Get availability zones
        try:
            ec2 = session.client("ec2", region_name=region_code)
            response = ec2.describe_availability_zones(Filters=[{"Name": "region-name", "Values": [region_code]}])
            azs = []
            for az in response.get("AvailabilityZones", []):
                azs.append(
                    {
                        "name": az.get("ZoneName", ""),
                        "state": az.get("State", ""),
                        "zone_id": az.get("ZoneId", ""),
                        "zone_type": az.get("ZoneType", ""),
                    }
                )
            region_info["availability_zones"] = azs
        except Exception as e:
            logger.debug(f"Error fetching availability zones for {region_code}: {e}")

        # Get available services for the region
        region_info["services"] = get_region_available_services(session, region_code)
    except Exception as e:
        logger.warning(f"Error fetching region details for {region_code}: {e}")

    return region_info
def get_aws_environment() -> Dict[str, str]:
"""Get information about the current AWS environment.
Collects information about the active AWS environment,
including profile, region, and credential status.
Works with both config files and environment variables for credentials.
Returns:
Dictionary with AWS environment information
"""
env_info = {
"aws_profile": os.environ.get("AWS_PROFILE", "default"),
"aws_region": os.environ.get("AWS_REGION", os.environ.get("AWS_DEFAULT_REGION", "us-east-1")),
"has_credentials": False,
"credentials_source": "none",
}
try:
# Try to load credentials from the session (preferred method)
session = boto3.session.Session()
credentials = session.get_credentials()
if credentials:
env_info["has_credentials"] = True
source = "profile"
# Determine credential source if possible
if credentials.method == "shared-credentials-file":
source = "profile"
elif credentials.method in ("env", "environment"):  # botocore's EnvProvider reports "env"
source = "environment"
elif credentials.method == "iam-role":
source = "instance-profile"
elif credentials.method == "assume-role":
source = "assume-role"
elif credentials.method == "container-role":
source = "container-role"
env_info["credentials_source"] = source
except Exception as e:
logger.warning(f"Error checking credentials: {e}")
return env_info
def _mask_key(key: str) -> str:
"""Mask a sensitive key for security.
Args:
key: The key to mask
Returns:
Masked key with only the first few characters visible
"""
if not key:
return ""
# Show only first few characters
visible_len = min(3, len(key))
return key[:visible_len] + "*" * (len(key) - visible_len)
def get_aws_account_info() -> Dict[str, Optional[str]]:
"""Get information about the current AWS account.
Uses STS to retrieve account ID and alias information.
Automatically uses credentials from environment variables if no config file is available.
Returns:
Dictionary with AWS account information
"""
account_info = {
"account_id": None,
"account_alias": None,
"organization_id": None,
}
try:
# Create a session - boto3 will automatically use credentials from
# environment variables if no config file is available
session = boto3.session.Session(region_name=os.environ.get("AWS_REGION", os.environ.get("AWS_DEFAULT_REGION", "us-east-1")))
# Get account ID from STS
sts = session.client("sts")
account_id = sts.get_caller_identity().get("Account")
account_info["account_id"] = account_id
# Try to get account alias
if account_id:
try:
iam = session.client("iam")
aliases = iam.list_account_aliases().get("AccountAliases", [])
if aliases:
account_info["account_alias"] = aliases[0]
except Exception as e:
logger.debug(f"Error getting account alias: {e}")
# Try to get organization info
try:
org = session.client("organizations")
# First try to get organization info
try:
org_response = org.describe_organization()
if "OrganizationId" in org_response:
account_info["organization_id"] = org_response["OrganizationId"]
except Exception:
# Then try to get account-specific info if org-level call fails
account_response = org.describe_account(AccountId=account_id)
if "Account" in account_response and "Id" in account_response["Account"]:
# describe-account does not expose the organization ID; just confirm the account ID
account_info["account_id"] = account_response["Account"]["Id"]
except Exception as e:
# Organizations access is often restricted, so this is expected to fail in many cases
logger.debug(f"Error getting organization info: {e}")
except Exception as e:
logger.warning(f"Error getting AWS account info: {e}")
return account_info
def register_resources(mcp):
"""Register all resources with the MCP server instance.
Args:
mcp: The FastMCP server instance
"""
logger.info("Registering AWS resources")
@mcp.resource(name="aws_profiles", description="Get available AWS profiles", uri="aws://config/profiles", mime_type="application/json")
async def aws_profiles() -> dict:
"""Get available AWS profiles.
Retrieves a list of available AWS profile names from the
AWS configuration and credentials files.
Returns:
Dictionary with profile information
"""
profiles = get_aws_profiles()
current_profile = os.environ.get("AWS_PROFILE", "default")
return {"profiles": [{"name": profile, "is_current": profile == current_profile} for profile in profiles]}
@mcp.resource(name="aws_regions", description="Get available AWS regions", uri="aws://config/regions", mime_type="application/json")
async def aws_regions() -> dict:
"""Get available AWS regions.
Retrieves a list of available AWS regions with
their descriptive names.
Returns:
Dictionary with region information
"""
regions = get_aws_regions()
current_region = os.environ.get("AWS_REGION", os.environ.get("AWS_DEFAULT_REGION", "us-east-1"))
return {
"regions": [
{
"name": region["RegionName"],
"description": region["RegionDescription"],
"is_current": region["RegionName"] == current_region,
}
for region in regions
]
}
@mcp.resource(
name="aws_region_details",
description="Get detailed information about a specific AWS region",
uri="aws://config/regions/{region}",
mime_type="application/json",
)
async def aws_region_details(region: str) -> dict:
"""Get detailed information about a specific AWS region.
Retrieves detailed information about a specific AWS region,
including its name, code, availability zones, geographic location,
and available services.
Args:
region: AWS region code (e.g., us-east-1)
Returns:
Dictionary with detailed region information
"""
logger.info(f"Getting detailed information for region: {region}")
return get_region_details(region)
@mcp.resource(name="aws_environment", description="Get AWS environment information", uri="aws://config/environment", mime_type="application/json")
async def aws_environment() -> dict:
"""Get AWS environment information.
Retrieves information about the current AWS environment,
including profile, region, and credential status.
Returns:
Dictionary with environment information
"""
return get_aws_environment()
@mcp.resource(name="aws_account", description="Get AWS account information", uri="aws://config/account", mime_type="application/json")
async def aws_account() -> dict:
"""Get AWS account information.
Retrieves information about the current AWS account,
including account ID and alias.
Returns:
Dictionary with account information
"""
return get_aws_account_info()
logger.info("Successfully registered all AWS resources")
```
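The `aws_regions` resource above reshapes the boto3 region list into a JSON payload with an `is_current` flag. A standalone sketch of that payload shape, using hardcoded sample data instead of live AWS calls:

```python
# Hypothetical sample of the payload shape produced by the aws://config/regions resource
regions = [
    {"RegionName": "eu-central-1", "RegionDescription": "EU Central (Frankfurt)"},
    {"RegionName": "us-east-1", "RegionDescription": "US East (N. Virginia)"},
]
current_region = "us-east-1"  # would come from AWS_REGION / AWS_DEFAULT_REGION
payload = {
    "regions": [
        {
            "name": r["RegionName"],
            "description": r["RegionDescription"],
            "is_current": r["RegionName"] == current_region,
        }
        for r in regions
    ]
}
print(payload["regions"][1]["is_current"])  # True
```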
--------------------------------------------------------------------------------
/src/aws_mcp_server/security.py:
--------------------------------------------------------------------------------
```python
"""Security utilities for AWS MCP Server.
This module provides security validation for AWS CLI commands,
including validation of command structure, dangerous command detection,
and pipe command validation.
"""
import logging
import re
import shlex
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, List, Optional
import yaml
from aws_mcp_server.config import SECURITY_CONFIG_PATH, SECURITY_MODE
from aws_mcp_server.tools import (
is_pipe_command,
split_pipe_command,
validate_unix_command,
)
logger = logging.getLogger(__name__)
# Default dictionary of potentially dangerous commands by security category
# Focus on commands that could lead to security incidents, privilege escalation,
# credential theft, or account takeover
DEFAULT_DANGEROUS_COMMANDS: Dict[str, List[str]] = {
# Identity and Access Management - core of security
"iam": [
"aws iam create-user", # Creating new users (potential backdoor accounts)
"aws iam create-access-key", # Creating credentials (could lead to credential theft)
"aws iam attach-user-policy", # Attaching policies to users (privilege escalation)
"aws iam attach-role-policy", # Attaching policies to roles (privilege escalation)
"aws iam attach-group-policy", # Attaching policies to groups (privilege escalation)
"aws iam create-policy", # Creating new policies (potentially overprivileged)
"aws iam put-user-policy", # Inline policies for users (privilege escalation)
"aws iam put-role-policy", # Inline policies for roles (privilege escalation)
"aws iam put-group-policy", # Inline policies for groups (privilege escalation)
"aws iam create-login-profile", # Creating console passwords (potential backdoor)
"aws iam update-access-key", # Updating access key status (credential management)
"aws iam update-assume-role-policy", # Changing who can assume a role
"aws iam remove-role-from-instance-profile", # Removing roles (privilege escalation)
"aws iam update-role", # Modifying role (privilege escalation)
"aws iam create-virtual-mfa-device", # Creating MFA devices
"aws iam deactivate-mfa-device", # Disabling MFA (security circumvention)
"aws iam delete-", # Any IAM delete operations (potential denial of service)
],
# Security, Identity & Compliance services
"organizations": [
"aws organizations create-account", # Creating accounts
"aws organizations leave-organization", # Leaving an organization
"aws organizations remove-account-from-organization", # Removing accounts
"aws organizations disable-policy-type", # Disabling policy enforcement
"aws organizations create-policy", # Creating organization policies
"aws organizations attach-policy", # Attaching organization policies
],
"sts": [
"aws sts assume-role", # Assuming roles with potentially higher privileges
"aws sts get-session-token", # Getting session tokens
"aws sts get-federation-token", # Getting federated tokens
],
"secretsmanager": [
"aws secretsmanager put-secret-value", # Changing secrets
"aws secretsmanager update-secret", # Updating secrets
"aws secretsmanager delete-secret", # Deleting secrets
"aws secretsmanager restore-secret", # Restoring deleted secrets
],
"kms": [
"aws kms schedule-key-deletion", # Scheduling key deletion (potential data loss)
"aws kms disable-key", # Disabling keys (potential data loss)
"aws kms create-grant", # Creating grants (key access)
"aws kms revoke-grant", # Revoking grants (potential denial of service)
],
# Audit & Logging services - tampering with these is critical
"cloudtrail": [
"aws cloudtrail delete-trail", # Deleting audit trails
"aws cloudtrail stop-logging", # Stopping audit logging
"aws cloudtrail update-trail", # Modifying audit configurations
"aws cloudtrail put-event-selectors", # Changing what events are logged
],
"cloudwatch": [
"aws cloudwatch delete-alarms", # Deleting security alarms
"aws cloudwatch disable-alarm-actions", # Disabling alarm actions
"aws cloudwatch delete-dashboards", # Deleting monitoring dashboards
],
"config": [
"aws configservice delete-configuration-recorder", # Deleting config recording
"aws configservice stop-configuration-recorder", # Stopping config recording
"aws configservice delete-delivery-channel", # Deleting config delivery
"aws configservice delete-remediation-configuration", # Deleting auto-remediation
],
"guardduty": [
"aws guardduty delete-detector", # Deleting threat detection
"aws guardduty disable-organization-admin-account", # Disabling central security
"aws guardduty update-detector", # Modifying threat detection
],
# Network & Data security
"ec2": [
"aws ec2 authorize-security-group-ingress", # Opening inbound network access
"aws ec2 authorize-security-group-egress", # Opening outbound network access
"aws ec2 modify-instance-attribute", # Changing security attributes
],
"s3": [
"aws s3api put-bucket-policy", # Changing bucket permissions
"aws s3api put-bucket-acl", # Changing bucket ACLs
"aws s3api delete-bucket-policy", # Removing bucket policy protections
"aws s3api delete-bucket-encryption", # Removing encryption
"aws s3api put-public-access-block", # Changing public access settings
],
}
# Default dictionary of safe patterns that override dangerous commands
# These patterns explicitly allow read-only operations that are needed for normal use
DEFAULT_SAFE_PATTERNS: Dict[str, List[str]] = {
# Universal safe patterns for any AWS service
"general": [
"--help", # All help commands are safe
"help", # All help subcommands are safe
"--version", # Version information is safe
"--dry-run", # Dry run operations don't make changes
],
# Identity and Access Management
"iam": [
"aws iam get-", # Read-only IAM operations
"aws iam list-", # Listing IAM resources
"aws iam generate-credential-report", # Generate reports (no security impact)
"aws iam generate-service-last-accessed-details", # Generate access reports
"aws iam simulate-custom-policy", # Policy simulation (no changes)
"aws iam simulate-principal-policy", # Policy simulation (no changes)
],
# Security, Identity & Compliance services
"organizations": [
"aws organizations describe-", # Read-only Organizations operations
"aws organizations list-", # Listing Organization resources
],
"sts": [
"aws sts get-caller-identity", # Checking current identity (safe)
"aws sts decode-authorization-message", # Decoding error messages (safe)
],
"secretsmanager": [
"aws secretsmanager get-", # Reading secrets (note: still sensitive)
"aws secretsmanager list-", # Listing secrets
"aws secretsmanager describe-", # Reading metadata about secrets
],
"kms": [
"aws kms describe-", # Reading key metadata
"aws kms get-", # Getting key information
"aws kms list-", # Listing keys
],
# Audit & Logging services
"cloudtrail": [
"aws cloudtrail describe-", # Reading trail info
"aws cloudtrail get-", # Getting trail settings
"aws cloudtrail list-", # Listing trails
"aws cloudtrail lookup-events", # Searching events (read-only)
],
"cloudwatch": [
"aws cloudwatch describe-", # Reading alarm info
"aws cloudwatch get-", # Getting metric data
"aws cloudwatch list-", # Listing metrics and alarms
],
"config": [
"aws configservice describe-", # Reading configuration info
"aws configservice get-", # Getting config data
"aws configservice list-", # Listing config resources
"aws configservice select-resource-config", # Querying config (read-only)
],
"guardduty": [
"aws guardduty describe-", # Reading detector info
"aws guardduty get-", # Getting findings and settings
"aws guardduty list-", # Listing GuardDuty resources
],
# Network & Data security
"ec2": [
"aws ec2 describe-", # All EC2 describe operations
"aws ec2 get-", # All EC2 get operations
# Network security specific commands
"aws ec2 describe-security-groups", # Reading security group configurations
"aws ec2 describe-network-acls", # Reading network ACL configurations
],
"s3": [
"aws s3 ls", # Listing buckets or objects (read-only)
"aws s3api get-", # All S3 API get operations (read-only)
"aws s3api list-", # All S3 API list operations (read-only)
"aws s3api head-", # All S3 API head operations (read-only)
# Security-specific S3 operations
"aws s3api get-bucket-policy", # Reading bucket policies
"aws s3api get-bucket-encryption", # Reading encryption settings
"aws s3api get-public-access-block", # Reading public access settings
],
}
# Default regex patterns for more complex rules that cannot be easily captured
# with simple command prefix matching
DEFAULT_REGEX_RULES: Dict[str, List[Dict[str, str]]] = {
# Security patterns that apply to all AWS services
"general": [
# Identity and Authentication Risks
{
"pattern": r"aws .* --profile\s+(root|admin|administrator)",
"description": "Prevent use of sensitive profiles",
"error_message": "Using sensitive profiles (root, admin) is restricted for security reasons.",
},
# Protocol and Encryption Risks
{
"pattern": r"aws .* --no-verify-ssl",
"description": "Prevent disabling SSL verification",
"error_message": "Disabling SSL verification is not allowed for security reasons.",
},
{
"pattern": r"aws .* --output\s+text\s+.*--query\s+.*Password",
"description": "Prevent password exposure in text output",
"error_message": "Outputting sensitive data like passwords in text format is restricted.",
},
# Parameter security
{
"pattern": r"aws .* --debug",
"description": "Prevent debug mode which shows sensitive info",
"error_message": "Debug mode is restricted as it may expose sensitive information.",
},
],
# IAM-specific security patterns
"iam": [
# Privileged user creation
{
"pattern": r"aws iam create-user.*--user-name\s+(root|admin|administrator|backup|security|finance|billing)",
"description": "Prevent creation of privileged-sounding users",
"error_message": "Creating users with sensitive names is restricted for security reasons.",
},
# Privilege escalation via policies
{
"pattern": r"aws iam attach-user-policy.*--policy-arn\s+.*Administrator",
"description": "Prevent attaching Administrator policies",
"error_message": "Attaching Administrator policies is restricted for security reasons.",
},
{
"pattern": r"aws iam attach-user-policy.*--policy-arn\s+.*FullAccess",
"description": "Prevent attaching FullAccess policies to users",
"error_message": "Attaching FullAccess policies directly to users is restricted (use roles instead).",
},
{
"pattern": r"aws iam create-policy.*\"Effect\":\s*\"Allow\".*\"Action\":\s*\"\*\".*\"Resource\":\s*\"\*\"",
"description": "Prevent creation of policies with * permissions",
"error_message": "Creating policies with unrestricted (*) permissions is not allowed.",
},
# Password and access key controls
{
"pattern": r"aws iam create-login-profile.*--password-reset-required\s+false",
"description": "Enforce password reset for new profiles",
"error_message": "Creating login profiles without requiring password reset is restricted.",
},
{
"pattern": r"aws iam update-account-password-policy.*--require-uppercase-characters\s+false",
"description": "Prevent weakening password policies",
"error_message": "Weakening account password policies is restricted.",
},
],
# Data security patterns
"s3": [
# Public access risks
{
"pattern": r"aws s3api put-bucket-policy.*\"Effect\":\s*\"Allow\".*\"Principal\":\s*\"\*\"",
"description": "Prevent public bucket policies",
"error_message": "Creating public bucket policies is restricted for security reasons.",
},
{
"pattern": r"aws s3api put-public-access-block.*--public-access-block-configuration\s+.*\"BlockPublicAcls\":\s*false",
"description": "Prevent disabling public access blocks",
"error_message": "Disabling S3 public access blocks is restricted for security reasons.",
},
# Public bucket region restrictions
{
"pattern": r"aws s3api create-bucket.*--region\s+(?!eu|us-east-1).*--acl\s+public",
"description": "Prevent public buckets outside of allowed regions",
"error_message": "Creating public buckets outside allowed regions is restricted.",
},
],
# Network security patterns
"ec2": [
# Network exposure risks
{
"pattern": r"aws ec2 authorize-security-group-ingress.*--cidr\s+0\.0\.0\.0/0.*--port\s+(?!80\b|443\b)[0-9]+",
"description": "Prevent open security groups for non-web ports",
"error_message": "Opening non-web ports (other than 80/443) to the entire internet (0.0.0.0/0) is restricted.",
},
{
"pattern": r"aws ec2 run-instances.*--user-data\s+.*curl.*\|.*sh",
"description": "Detect potentially unsafe user-data scripts",
"error_message": "Running scripts from remote sources in user-data presents security risks.",
},
],
# Logging and monitoring integrity
"cloudtrail": [
{
"pattern": r"aws cloudtrail update-trail.*--no-include-global-service-events",
"description": "Prevent disabling global event logging",
"error_message": "Disabling CloudTrail logging for global service events is restricted.",
},
{
"pattern": r"aws cloudtrail update-trail.*--no-multi-region",
"description": "Prevent making trails single-region",
"error_message": "Changing CloudTrail trails from multi-region to single-region is restricted.",
},
],
}
@dataclass
class ValidationRule:
"""Represents a command validation rule."""
pattern: str
description: str
error_message: str
regex: bool = False
@dataclass
class SecurityConfig:
"""Security configuration for command validation."""
dangerous_commands: Dict[str, List[str]]
safe_patterns: Dict[str, List[str]]
regex_rules: Dict[str, List[ValidationRule]] = field(default_factory=dict)
def __post_init__(self):
"""Initialize default values."""
if not self.regex_rules:
self.regex_rules = {}
def load_security_config() -> SecurityConfig:
"""Load security configuration from YAML file or use defaults.
Returns:
SecurityConfig object with loaded configuration
"""
dangerous_commands = DEFAULT_DANGEROUS_COMMANDS.copy()
safe_patterns = DEFAULT_SAFE_PATTERNS.copy()
regex_rules = {}
# Load default regex rules
for category, rules in DEFAULT_REGEX_RULES.items():
regex_rules[category] = []
for rule in rules:
regex_rules[category].append(
ValidationRule(
pattern=rule["pattern"],
description=rule["description"],
error_message=rule["error_message"],
regex=True,
)
)
# Load custom configuration if provided
if SECURITY_CONFIG_PATH:
config_path = Path(SECURITY_CONFIG_PATH)
if config_path.exists():
try:
with open(config_path) as f:
config_data = yaml.safe_load(f) or {}  # an empty YAML file yields None
# Update dangerous commands
if "dangerous_commands" in config_data:
for service, commands in config_data["dangerous_commands"].items():
dangerous_commands[service] = commands
# Update safe patterns
if "safe_patterns" in config_data:
for service, patterns in config_data["safe_patterns"].items():
safe_patterns[service] = patterns
# Load custom regex rules
if "regex_rules" in config_data:
for category, rules in config_data["regex_rules"].items():
if category not in regex_rules:
regex_rules[category] = []
for rule in rules:
regex_rules[category].append(
ValidationRule(
pattern=rule["pattern"],
description=rule["description"],
error_message=rule.get("error_message", f"Command matches restricted pattern: {rule['pattern']}"),
regex=True,
)
)
logger.info(f"Loaded security configuration from {config_path}")
except Exception as e:
logger.error(f"Error loading security configuration: {str(e)}")
logger.warning("Using default security configuration")
return SecurityConfig(dangerous_commands=dangerous_commands, safe_patterns=safe_patterns, regex_rules=regex_rules)
# Initialize security configuration
SECURITY_CONFIG = load_security_config()
def is_service_command_safe(command: str, service: str) -> bool:
"""Check if a command for a specific service is safe.
This checks if a command that might match a dangerous pattern
also matches a safe pattern, which would override the dangerous check.
The function checks both:
1. Service-specific safe patterns (e.g., "aws iam list-")
2. General safe patterns that apply to any service (e.g., "--help")
Args:
command: The command to check
service: The AWS service being used
Returns:
True if the command is safe, False otherwise
"""
# First check service-specific safe patterns
if service in SECURITY_CONFIG.safe_patterns:
# Check if the command matches any safe pattern for this service
for safe_pattern in SECURITY_CONFIG.safe_patterns[service]:
if command.startswith(safe_pattern):
logger.debug(f"Command matches service-specific safe pattern: {safe_pattern}")
return True
# Then check general safe patterns that apply to all services
if "general" in SECURITY_CONFIG.safe_patterns:
cmd_tokens = shlex.split(command)
for safe_pattern in SECURITY_CONFIG.safe_patterns["general"]:
# Match whole tokens so "help" doesn't match substrings like "helpdesk"
if safe_pattern in cmd_tokens:
logger.debug(f"Command matches general safe pattern: {safe_pattern}")
return True
return False
def check_regex_rules(command: str, service: Optional[str] = None) -> Optional[str]:
"""Check command against regex rules.
Args:
command: The command to check
service: The AWS service being used, if known
Returns:
Error message if command matches a regex rule, None otherwise
"""
# Check general rules that apply to all commands
if "general" in SECURITY_CONFIG.regex_rules:
for rule in SECURITY_CONFIG.regex_rules["general"]:
pattern = re.compile(rule.pattern)
if pattern.search(command):
logger.warning(f"Command matches regex rule: {rule.description}")
return rule.error_message
# Check service-specific rules if service is provided
if service and service in SECURITY_CONFIG.regex_rules:
for rule in SECURITY_CONFIG.regex_rules[service]:
pattern = re.compile(rule.pattern)
if pattern.search(command):
logger.warning(f"Command matches service-specific regex rule for {service}: {rule.description}")
return rule.error_message
return None
def validate_aws_command(command: str) -> None:
"""Validate that the command is a proper AWS CLI command.
Args:
command: The AWS CLI command to validate
Raises:
ValueError: If the command is invalid
"""
logger.debug(f"Validating AWS command: {command}")
# Skip validation in permissive mode
if SECURITY_MODE.lower() == "permissive":
logger.warning(f"Running in permissive security mode, skipping validation for: {command}")
return
# Basic validation
cmd_parts = shlex.split(command)
if not cmd_parts or cmd_parts[0].lower() != "aws":
raise ValueError("Commands must start with 'aws'")
if len(cmd_parts) < 2:
raise ValueError("Command must include an AWS service (e.g., aws s3)")
# Get the service from the command
service = cmd_parts[1].lower()
# Check regex rules first (these apply regardless of service)
error_message = check_regex_rules(command, service)
if error_message:
raise ValueError(error_message)
# Check against dangerous commands for this service
if service in SECURITY_CONFIG.dangerous_commands:
# Check each dangerous command pattern
for dangerous_cmd in SECURITY_CONFIG.dangerous_commands[service]:
if command.startswith(dangerous_cmd):
# If it's a dangerous command, check if it's also in safe patterns
if is_service_command_safe(command, service):
return # Command is safe despite matching dangerous pattern
# Command is dangerous, raise an error
raise ValueError(
f"This command ({dangerous_cmd}) is restricted for security reasons. "
f"Please use a more specific, read-only command or add 'help' or '--help' to see available options."
)
logger.debug(f"Command validation successful: {command}")
def validate_pipe_command(pipe_command: str) -> None:
"""Validate a command that contains pipes.
This checks both AWS CLI commands and Unix commands within a pipe chain.
Args:
pipe_command: The piped command to validate
Raises:
ValueError: If any command in the pipe is invalid
"""
logger.debug(f"Validating pipe command: {pipe_command}")
# Skip validation in permissive mode
if SECURITY_MODE.lower() == "permissive":
logger.warning(f"Running in permissive security mode, skipping validation for: {pipe_command}")
return
commands = split_pipe_command(pipe_command)
if not commands:
raise ValueError("Empty command")
# First command must be an AWS CLI command
validate_aws_command(commands[0])
# Subsequent commands should be valid Unix commands
for i, cmd in enumerate(commands[1:], 1):
cmd_parts = shlex.split(cmd)
if not cmd_parts:
raise ValueError(f"Empty command at position {i} in pipe")
if not validate_unix_command(cmd):
raise ValueError(f"Command '{cmd_parts[0]}' at position {i} in pipe is not allowed. Only AWS commands and basic Unix utilities are permitted.")
logger.debug(f"Pipe command validation successful: {pipe_command}")
def reload_security_config() -> None:
"""Reload security configuration from file.
This allows for dynamic reloading of security rules without restarting the server.
"""
global SECURITY_CONFIG
SECURITY_CONFIG = load_security_config()
logger.info("Security configuration reloaded")
def validate_command(command: str) -> None:
"""Centralized validation for all commands.
This is the main entry point for command validation. The validation follows a multi-step process:
1. Check if security validation should be skipped (permissive mode)
2. Determine command type (piped or regular AWS command)
3. For regular AWS commands:
a. Verify basic structure (starts with 'aws' and has a service)
b. Check against regex rules for complex patterns
c. Check if it matches any dangerous command patterns
d. If dangerous, check if it also matches any safe patterns
4. For piped commands:
a. Validate the AWS portion as above
b. Validate that pipe targets are allowed Unix commands
Args:
command: The command to validate
Raises:
ValueError: If the command is invalid with a descriptive error message
"""
logger.debug(f"Validating command: {command}")
# Step 1: Skip validation in permissive mode
if SECURITY_MODE.lower() == "permissive":
logger.warning(f"Running in permissive security mode, skipping validation for: {command}")
return
# Step 2: Determine command type and validate accordingly
if is_pipe_command(command):
validate_pipe_command(command)
else:
validate_aws_command(command)
logger.debug(f"Command validation successful: {command}")
```
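`load_security_config` merges overrides from the YAML file at `SECURITY_CONFIG_PATH`. A hypothetical override file (the filename, service, and patterns are illustrative), using the exact keys the loader reads — `dangerous_commands`, `safe_patterns`, and `regex_rules` with `pattern`/`description`/`error_message` entries:

```yaml
# security_config.yaml (hypothetical example)
dangerous_commands:
  dynamodb:                          # replaces any built-in list for this service
    - "aws dynamodb delete-table"
safe_patterns:
  dynamodb:
    - "aws dynamodb describe-"
    - "aws dynamodb list-"
regex_rules:
  general:
    - pattern: "aws .* --endpoint-url\\s+http://"
      description: "Require HTTPS endpoint URLs"
      error_message: "Plain-HTTP endpoint URLs are not allowed."
```

Note the asymmetry in how overrides apply: per-service `dangerous_commands` and `safe_patterns` lists replace the defaults wholesale (`dangerous_commands[service] = commands`), while custom `regex_rules` are appended to the default rules for their category.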
--------------------------------------------------------------------------------
/tests/unit/test_resources.py:
--------------------------------------------------------------------------------
```python
"""Unit tests for AWS MCP Server resources module.
These tests verify that the resources functionality in the resources module
works correctly, with appropriate mocking to avoid actual AWS API calls.
"""
import os
from unittest.mock import MagicMock, patch
import pytest
from botocore.exceptions import ClientError
from aws_mcp_server.resources import (
_get_region_description,
_get_region_geographic_location,
_mask_key,
get_aws_account_info,
get_aws_environment,
get_aws_profiles,
get_aws_regions,
get_region_available_services,
get_region_details,
register_resources,
)
@pytest.fixture
def mock_config_files(monkeypatch, tmp_path):
"""Create mock AWS config and credentials files for testing."""
config_dir = tmp_path / ".aws"
config_dir.mkdir()
# Create mock config file
config_file = config_dir / "config"
config_file.write_text("[default]\nregion = us-west-2\n\n[profile dev]\nregion = us-east-1\n\n[profile prod]\nregion = eu-west-1\n")
# Create mock credentials file
creds_file = config_dir / "credentials"
creds_file.write_text(
"[default]\n"
"aws_access_key_id = AKIADEFAULT000000000\n"
"aws_secret_access_key = 1234567890abcdef1234567890abcdef\n"
"\n"
"[dev]\n"
"aws_access_key_id = AKIADEV0000000000000\n"
"aws_secret_access_key = abcdef1234567890abcdef1234567890\n"
"\n"
"[test]\n" # Profile in credentials but not in config
"aws_access_key_id = AKIATEST000000000000\n"
"aws_secret_access_key = test1234567890abcdef1234567890ab\n"
)
# Set environment to use these files
monkeypatch.setenv("HOME", str(tmp_path))
return tmp_path
def test_get_aws_profiles(mock_config_files):
"""Test retrieving AWS profiles from config files."""
profiles = get_aws_profiles()
# Should find all profiles from both files, with no duplicates
assert set(profiles) == {"default", "dev", "prod", "test"}
@patch("boto3.session.Session")
def test_get_aws_regions(mock_session):
"""Test retrieving AWS regions with mocked boto3."""
# Mock boto3 session and client response
mock_ec2 = MagicMock()
mock_session.return_value.client.return_value = mock_ec2
mock_ec2.describe_regions.return_value = {
"Regions": [
{"RegionName": "us-east-1"},
{"RegionName": "us-west-2"},
{"RegionName": "eu-central-1"},
]
}
regions = get_aws_regions()
# Check regions are properly formatted
assert len(regions) == 3
assert regions[0]["RegionName"] == "eu-central-1" # Sorted alphabetically
assert regions[0]["RegionDescription"] == "EU Central (Frankfurt)"
assert regions[1]["RegionName"] == "us-east-1"
assert regions[2]["RegionName"] == "us-west-2"
# Verify correct session and client creation
mock_session.assert_called_once()
mock_session.return_value.client.assert_called_once_with("ec2")
@patch("boto3.session.Session")
def test_get_aws_regions_fallback(mock_session):
"""Test fallback behavior when region retrieval fails."""
# Mock boto3 to raise an exception
mock_session.return_value.client.side_effect = ClientError({"Error": {"Code": "AccessDenied", "Message": "Access denied"}}, "DescribeRegions")
regions = get_aws_regions()
# Should fall back to static region list
assert len(regions) >= 12 # Should include at least the major regions
assert any(r["RegionName"] == "us-east-1" for r in regions)
assert any(r["RegionName"] == "eu-west-1" for r in regions)
@patch("boto3.session.Session")
def test_get_aws_environment(mock_session, monkeypatch):
"""Test retrieving AWS environment information."""
# Set up environment variables
monkeypatch.setenv("AWS_PROFILE", "test-profile")
monkeypatch.setenv("AWS_REGION", "us-west-2")
# Mock boto3 credentials
mock_credentials = MagicMock()
mock_credentials.method = "shared-credentials-file"
mock_session.return_value.get_credentials.return_value = mock_credentials
env_info = get_aws_environment()
# Check environment information
assert env_info["aws_profile"] == "test-profile"
assert env_info["aws_region"] == "us-west-2"
assert env_info["has_credentials"] is True
assert env_info["credentials_source"] == "profile"
@patch("boto3.session.Session")
def test_get_aws_environment_no_credentials(mock_session, monkeypatch):
"""Test environment info with no credentials."""
# Clear relevant environment variables
for var in ["AWS_PROFILE", "AWS_REGION", "AWS_DEFAULT_REGION", "AWS_ACCESS_KEY_ID"]:
if var in os.environ:
monkeypatch.delenv(var)
# No credentials available
mock_session.return_value.get_credentials.return_value = None
env_info = get_aws_environment()
# Check environment information with defaults
assert env_info["aws_profile"] == "default"
assert env_info["aws_region"] == "us-east-1"
assert env_info["has_credentials"] is False
assert env_info["credentials_source"] == "none"
@patch("boto3.session.Session")
def test_get_aws_account_info(mock_session):
"""Test retrieving AWS account information."""
# Mock boto3 clients
mock_sts = MagicMock()
mock_iam = MagicMock()
mock_org = MagicMock()
mock_session.return_value.client.side_effect = lambda service: {"sts": mock_sts, "iam": mock_iam, "organizations": mock_org}[service]
# Mock API responses
mock_sts.get_caller_identity.return_value = {"Account": "123456789012"}
mock_iam.list_account_aliases.return_value = {"AccountAliases": ["my-account"]}
mock_org.describe_organization.return_value = {"OrganizationId": "o-abcdef1234"}
account_info = get_aws_account_info()
# Check account information
assert account_info["account_id"] == "123456789012"
assert account_info["account_alias"] == "my-account"
assert account_info["organization_id"] == "o-abcdef1234"
@patch("boto3.session.Session")
def test_get_aws_account_info_minimal(mock_session):
"""Test account info with minimal permissions."""
# Mock boto3 sts client, but iam/org calls fail
mock_sts = MagicMock()
def mock_client(service):
if service == "sts":
return mock_sts
elif service == "iam":
raise ClientError({"Error": {"Code": "AccessDenied"}}, "ListAccountAliases")
else:
raise ClientError({"Error": {"Code": "AccessDenied"}}, "DescribeAccount")
mock_session.return_value.client.side_effect = mock_client
# Mock API response
mock_sts.get_caller_identity.return_value = {"Account": "123456789012"}
account_info = get_aws_account_info()
# Should have account ID but not alias or org ID
assert account_info["account_id"] == "123456789012"
assert account_info["account_alias"] is None
assert account_info["organization_id"] is None
def test_register_resources():
"""Test registering MCP resources."""
# Mock MCP instance
mock_mcp = MagicMock()
# Register resources
register_resources(mock_mcp)
# Verify resource registration
assert mock_mcp.resource.call_count == 5 # Should register 5 resources
# Check that resource was called with the correct URIs, names and descriptions
expected_resources = [
{"uri": "aws://config/profiles", "name": "aws_profiles", "description": "Get available AWS profiles"},
{"uri": "aws://config/regions", "name": "aws_regions", "description": "Get available AWS regions"},
{"uri": "aws://config/regions/{region}", "name": "aws_region_details", "description": "Get detailed information about a specific AWS region"},
{"uri": "aws://config/environment", "name": "aws_environment", "description": "Get AWS environment information"},
{"uri": "aws://config/account", "name": "aws_account", "description": "Get AWS account information"},
]
# Extract parameters from each call
for call in mock_mcp.resource.call_args_list:
found = False
for resource in expected_resources:
if resource["uri"] == call.kwargs.get("uri"):
# Check name and description too
assert call.kwargs.get("name") == resource["name"]
assert call.kwargs.get("description") == resource["description"]
found = True
break
assert found, f"URI {call.kwargs.get('uri')} not found in expected resources"
def test_get_region_description():
"""Test the region description utility function."""
# Test known regions
assert _get_region_description("us-east-1") == "US East (N. Virginia)"
assert _get_region_description("eu-west-2") == "EU West (London)"
assert _get_region_description("ap-southeast-1") == "Asia Pacific (Singapore)"
# Test unknown regions
assert _get_region_description("unknown-region-1") == "AWS Region unknown-region-1"
assert _get_region_description("test-region-2") == "AWS Region test-region-2"
def test_mask_key():
"""Test the key masking utility function."""
# Test empty input
assert _mask_key("") == ""
# Test short keys (less than 3 chars)
assert _mask_key("a") == "a"
assert _mask_key("ab") == "ab"
# Test longer keys
assert _mask_key("abc") == "abc"
assert _mask_key("abcd") == "abc*"
assert _mask_key("abcdef") == "abc***"
assert _mask_key("AKIAIOSFODNN7EXAMPLE") == "AKI*****************"
@patch("configparser.ConfigParser")
@patch("os.path.exists")
def test_get_aws_profiles_exception(mock_exists, mock_config_parser):
"""Test exception handling in get_aws_profiles."""
# Setup mocks
mock_exists.return_value = True
mock_parser_instance = MagicMock()
mock_config_parser.return_value = mock_parser_instance
# Simulate an exception when reading the config
mock_parser_instance.read.side_effect = Exception("Config file error")
# Call function
profiles = get_aws_profiles()
# Verify profiles contains only the default profile
assert profiles == ["default"]
assert mock_parser_instance.read.called
@patch("boto3.session.Session")
def test_get_aws_regions_generic_exception(mock_session):
"""Test general exception handling in get_aws_regions."""
# Mock boto3 to raise a generic exception (not ClientError)
mock_session.return_value.client.side_effect = Exception("Generic error")
# Call function
regions = get_aws_regions()
# Should return empty list for general exceptions
assert len(regions) == 0
assert isinstance(regions, list)
@patch("boto3.session.Session")
def test_get_aws_environment_credential_methods(mock_session):
"""Test different credential methods in get_aws_environment."""
# Set up different credential methods to test
test_cases = [
("environment", "environment"),
("iam-role", "instance-profile"),
("assume-role", "assume-role"),
("container-role", "container-role"),
("unknown-method", "profile"), # Should fall back to "profile" for unknown methods
]
for method, expected_source in test_cases:
# Reset mock
mock_session.reset_mock()
# Set up mock credentials
mock_credentials = MagicMock()
mock_credentials.method = method
mock_session.return_value.get_credentials.return_value = mock_credentials
# Call function
env_info = get_aws_environment()
# Verify credential source
assert env_info["has_credentials"] is True
assert env_info["credentials_source"] == expected_source
@patch("boto3.session.Session")
def test_get_aws_environment_exception(mock_session):
"""Test exception handling in get_aws_environment."""
# Mock boto3 to raise an exception
mock_session.return_value.get_credentials.side_effect = Exception("Credential error")
# Call function
env_info = get_aws_environment()
# Should still return valid env info with default values
assert env_info["aws_profile"] == "default"
assert env_info["aws_region"] == "us-east-1"
assert env_info["has_credentials"] is False
assert env_info["credentials_source"] == "none"
@patch("boto3.session.Session")
def test_get_aws_account_info_with_org(mock_session):
"""Test AWS account info with organization access."""
# Mock boto3 clients
mock_sts = MagicMock()
mock_iam = MagicMock()
mock_org = MagicMock()
mock_session.return_value.client.side_effect = lambda service: {"sts": mock_sts, "iam": mock_iam, "organizations": mock_org}[service]
# Mock API responses
mock_sts.get_caller_identity.return_value = {"Account": "123456789012"}
mock_iam.list_account_aliases.return_value = {"AccountAliases": ["my-account"]}
# Mock org response for describe_organization
mock_org.describe_organization.return_value = {"OrganizationId": None}
# Call function
account_info = get_aws_account_info()
# Verify account info (organization_id should be None)
assert account_info["account_id"] == "123456789012"
assert account_info["account_alias"] == "my-account"
assert account_info["organization_id"] is None
@patch("boto3.session.Session")
def test_get_aws_account_info_general_exception(mock_session):
"""Test general exception handling in get_aws_account_info."""
# Mock boto3 to raise a generic exception
mock_session.return_value.client.side_effect = Exception("Generic error")
# Call function
account_info = get_aws_account_info()
# All fields should be None
assert account_info["account_id"] is None
assert account_info["account_alias"] is None
assert account_info["organization_id"] is None
@patch("aws_mcp_server.resources.get_aws_profiles")
@patch("os.environ.get")
def test_resource_aws_profiles(mock_environ_get, mock_get_aws_profiles):
"""Test the aws_profiles resource function implementation."""
# Set up environment mocks
mock_environ_get.return_value = "test-profile"
# Set up profiles mock
mock_get_aws_profiles.return_value = ["default", "test-profile", "dev"]
# Create a mock function that simulates the decorated function
# Note: We need to call the mocked functions, not the original ones
async def mock_resource_function():
profiles = mock_get_aws_profiles.return_value
current_profile = mock_environ_get.return_value
return {"profiles": [{"name": profile, "is_current": profile == current_profile} for profile in profiles]}
# Call the function
import asyncio
result = asyncio.run(mock_resource_function())
# Verify the result
assert "profiles" in result
assert len(result["profiles"]) == 3
# Check that current profile is marked
current_profile = None
for profile in result["profiles"]:
if profile["is_current"]:
current_profile = profile["name"]
assert current_profile == "test-profile"
@patch("aws_mcp_server.resources.get_aws_regions")
@patch("os.environ.get")
def test_resource_aws_regions(mock_environ_get, mock_get_aws_regions):
"""Test the aws_regions resource function implementation."""
# Set up environment mocks to return us-west-2 for either AWS_REGION or AWS_DEFAULT_REGION
mock_environ_get.side_effect = lambda key, default=None: "us-west-2" if key in ("AWS_REGION", "AWS_DEFAULT_REGION") else default
# Set up regions mock
mock_get_aws_regions.return_value = [
{"RegionName": "us-east-1", "RegionDescription": "US East (N. Virginia)"},
{"RegionName": "us-west-2", "RegionDescription": "US West (Oregon)"},
]
# Create a mock function that simulates the decorated function
# Note: We need to call the mocked functions, not the original ones
async def mock_resource_function():
regions = mock_get_aws_regions.return_value
current_region = "us-west-2" # From the mock_environ_get.side_effect
return {
"regions": [
{
"name": region["RegionName"],
"description": region["RegionDescription"],
"is_current": region["RegionName"] == current_region,
}
for region in regions
]
}
# Call the function
import asyncio
result = asyncio.run(mock_resource_function())
# Verify the result
assert "regions" in result
assert len(result["regions"]) == 2
# Check that current region is marked
current_region = None
for region in result["regions"]:
if region["is_current"]:
current_region = region["name"]
assert current_region == "us-west-2"
@patch("aws_mcp_server.resources.get_aws_environment")
def test_resource_aws_environment(mock_get_aws_environment):
"""Test the aws_environment resource function implementation."""
# Set up environment mock
mock_env = {
"aws_profile": "test-profile",
"aws_region": "us-west-2",
"has_credentials": True,
"credentials_source": "profile",
}
mock_get_aws_environment.return_value = mock_env
# Create a mock function that simulates the decorated function
# Note: We need to call the mocked function, not the original one
async def mock_resource_function():
return mock_get_aws_environment.return_value
# Call the function
import asyncio
result = asyncio.run(mock_resource_function())
# Verify the result is the same as the mock env
assert result == mock_env
@patch("aws_mcp_server.resources.get_aws_account_info")
def test_resource_aws_account(mock_get_aws_account_info):
"""Test the aws_account resource function implementation."""
# Set up account info mock
mock_account_info = {
"account_id": "123456789012",
"account_alias": "test-account",
"organization_id": "o-abcdef123456",
}
mock_get_aws_account_info.return_value = mock_account_info
# Create a mock function that simulates the decorated function
# Note: We need to call the mocked function, not the original one
async def mock_resource_function():
return mock_get_aws_account_info.return_value
# Call the function
import asyncio
result = asyncio.run(mock_resource_function())
# Verify the result is the same as the mock account info
assert result == mock_account_info
def test_get_region_geographic_location():
"""Test the region geographic location utility function."""
# Test known regions
us_east_1 = _get_region_geographic_location("us-east-1")
assert us_east_1["continent"] == "North America"
assert us_east_1["country"] == "United States"
assert us_east_1["city"] == "Ashburn, Virginia"
eu_west_2 = _get_region_geographic_location("eu-west-2")
assert eu_west_2["continent"] == "Europe"
assert eu_west_2["country"] == "United Kingdom"
assert eu_west_2["city"] == "London"
# Test unknown region
unknown = _get_region_geographic_location("unknown-region")
assert unknown["continent"] == "Unknown"
assert unknown["country"] == "Unknown"
assert unknown["city"] == "Unknown"
@patch("boto3.session.Session")
def test_get_region_available_services(mock_session):
"""Test retrieving available AWS services for a region using Service Quotas API."""
# Mock the Service Quotas client
mock_quotas_client = MagicMock()
# Set up the mock session to return our mock clients
def mock_client(service_name, **kwargs):
if service_name == "service-quotas":
return mock_quotas_client
return MagicMock()
mock_session.return_value.client.side_effect = mock_client
# Mock the Service Quotas API response
mock_quotas_client.list_services.return_value = {
"Services": [
{"ServiceCode": "AWS.EC2", "ServiceName": "Amazon Elastic Compute Cloud"},
{"ServiceCode": "AWS.S3", "ServiceName": "Amazon Simple Storage Service"},
{"ServiceCode": "Lambda", "ServiceName": "AWS Lambda"},
{"ServiceCode": "Organizations", "ServiceName": "AWS Organizations"},
{"ServiceCode": "AWS.CloudFormation", "ServiceName": "AWS CloudFormation"},
],
"NextToken": None,
}
# Call the function
services = get_region_available_services(mock_session.return_value, "us-east-1")
# Verify the results
assert len(services) == 5
# Verify service code transformations
assert {"id": "ec2", "name": "Amazon Elastic Compute Cloud"} in services
assert {"id": "s3", "name": "Amazon Simple Storage Service"} in services
assert {"id": "lambda", "name": "AWS Lambda"} in services
assert {"id": "organizations", "name": "AWS Organizations"} in services
assert {"id": "cloudformation", "name": "AWS CloudFormation"} in services
# Verify the API was called correctly
mock_quotas_client.list_services.assert_called_once()
@patch("boto3.session.Session")
def test_get_region_available_services_pagination(mock_session):
"""Test pagination handling in Service Quotas API."""
# Mock the Service Quotas client
mock_quotas_client = MagicMock()
# Set up the mock session to return our mock client
mock_session.return_value.client.return_value = mock_quotas_client
# Mock paginated responses
mock_quotas_client.list_services.side_effect = [
{
"Services": [
{"ServiceCode": "AWS.EC2", "ServiceName": "Amazon Elastic Compute Cloud"},
{"ServiceCode": "AWS.S3", "ServiceName": "Amazon Simple Storage Service"},
],
"NextToken": "next-token-1",
},
{
"Services": [{"ServiceCode": "Lambda", "ServiceName": "AWS Lambda"}, {"ServiceCode": "AWS.DynamoDB", "ServiceName": "Amazon DynamoDB"}],
"NextToken": None,
},
]
# Call the function
services = get_region_available_services(mock_session.return_value, "us-east-1")
# Verify the results
assert len(services) == 4
# Verify the pagination was handled correctly
assert mock_quotas_client.list_services.call_count == 2
# First call should have no NextToken
mock_quotas_client.list_services.assert_any_call()
# Second call should include the NextToken
mock_quotas_client.list_services.assert_any_call(NextToken="next-token-1")
@patch("boto3.session.Session")
def test_get_region_available_services_fallback(mock_session):
"""Test fallback to client creation when Service Quotas API fails."""
# Mock the session to raise an exception for Service Quotas
def mock_client(service_name, **kwargs):
if service_name == "service-quotas":
raise ClientError({"Error": {"Code": "AccessDenied"}}, "ListServices")
# For other services, return a mock to simulate success
return MagicMock()
mock_session.return_value.client.side_effect = mock_client
# Call the function
services = get_region_available_services(mock_session.return_value, "us-east-1")
# Verify we got results from fallback method
assert len(services) > 0
# At least these common services should be in the result
common_service_ids = [service["id"] for service in services]
for service_id in ["ec2", "s3", "lambda"]:
assert service_id in common_service_ids
# Verify services have the correct structure
for service in services:
assert "id" in service
assert "name" in service
@patch("aws_mcp_server.resources.get_region_available_services")
@patch("boto3.session.Session")
def test_get_region_details(mock_session, mock_get_region_available_services):
"""Test retrieving detailed AWS region information."""
# Mock the boto3 session and clients
mock_ec2 = MagicMock()
# Handle different boto3 client calls
def mock_client(service_name, **kwargs):
if service_name == "ec2":
return mock_ec2
# Return a mock for other services
return MagicMock()
mock_session.return_value.client.side_effect = mock_client
# Mock EC2 availability zones response
mock_ec2.describe_availability_zones.return_value = {
"AvailabilityZones": [
{"ZoneName": "us-east-1a", "State": "available", "ZoneId": "use1-az1", "ZoneType": "availability-zone"},
{"ZoneName": "us-east-1b", "State": "available", "ZoneId": "use1-az2", "ZoneType": "availability-zone"},
]
}
# Mock the services list
mock_services = [{"id": "ec2", "name": "EC2"}, {"id": "s3", "name": "S3"}, {"id": "lambda", "name": "Lambda"}]
mock_get_region_available_services.return_value = mock_services
# Call the function being tested
region_details = get_region_details("us-east-1")
# Verify basic region information
assert region_details["code"] == "us-east-1"
assert region_details["name"] == "US East (N. Virginia)"
# Verify geographic location information
geo_location = region_details["geographic_location"]
assert geo_location["continent"] == "North America"
assert geo_location["country"] == "United States"
assert geo_location["city"] == "Ashburn, Virginia"
# Verify availability zones
assert len(region_details["availability_zones"]) == 2
assert region_details["availability_zones"][0]["name"] == "us-east-1a"
assert region_details["availability_zones"][1]["name"] == "us-east-1b"
# Verify services
assert region_details["services"] == mock_services
mock_get_region_available_services.assert_called_once_with(mock_session.return_value, "us-east-1")
@patch("aws_mcp_server.resources.get_region_available_services")
@patch("boto3.session.Session")
def test_get_region_details_with_error(mock_session, mock_get_region_available_services):
"""Test region details with API errors."""
# Mock boto3 to raise an exception
mock_session.return_value.client.side_effect = ClientError({"Error": {"Code": "AccessDenied", "Message": "Access denied"}}, "DescribeAvailabilityZones")
# Mock the get_region_available_services function to return an empty list
mock_get_region_available_services.return_value = []
# Call the function being tested
region_details = get_region_details("us-east-1")
# Should still return basic information even if AWS APIs fail
assert region_details["code"] == "us-east-1"
assert region_details["name"] == "US East (N. Virginia)"
assert "geographic_location" in region_details
assert len(region_details["availability_zones"]) == 0
assert region_details["services"] == []
mock_get_region_available_services.assert_called_once_with(mock_session.return_value, "us-east-1")
@patch("aws_mcp_server.resources.get_region_details")
def test_resource_aws_region_details(mock_get_region_details):
"""Test the aws_region_details resource function implementation."""
# Set up region details mock
mock_region_details = {
"code": "us-east-1",
"name": "US East (N. Virginia)",
"geographic_location": {"continent": "North America", "country": "United States", "city": "Ashburn, Virginia"},
"availability_zones": [
{"name": "us-east-1a", "state": "available", "zone_id": "use1-az1", "zone_type": "availability-zone"},
{"name": "us-east-1b", "state": "available", "zone_id": "use1-az2", "zone_type": "availability-zone"},
],
"services": [{"id": "ec2", "name": "EC2"}, {"id": "s3", "name": "S3"}, {"id": "lambda", "name": "Lambda"}],
"is_current": True,
}
mock_get_region_details.return_value = mock_region_details
# Create a mock function that simulates the decorated function
async def mock_resource_function(region: str):
return mock_get_region_details(region)
# Call the function
import asyncio
result = asyncio.run(mock_resource_function("us-east-1"))
# Verify the function was called with the correct region code
mock_get_region_details.assert_called_once_with("us-east-1")
# Verify the result is the same as the mock details
assert result == mock_region_details
```
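The registration tests above rely on how `MagicMock` records decorator-style calls (`mcp.resource(...)` returns a child mock that is then applied to the function). The same pattern can be exercised standalone; the registrar below is hypothetical (two resources only), but the assertion style mirrors `test_register_resources`:

```python
from unittest.mock import MagicMock

def register_example_resources(mcp):
    """Hypothetical registrar mirroring the decorator pattern under test."""

    @mcp.resource(uri="aws://config/profiles", name="aws_profiles", description="Get available AWS profiles")
    async def profiles():
        return {"profiles": []}

    @mcp.resource(uri="aws://config/regions", name="aws_regions", description="Get available AWS regions")
    async def regions():
        return {"regions": []}

mock_mcp = MagicMock()
register_example_resources(mock_mcp)

# MagicMock records each decorator invocation, so the kwargs can be inspected.
assert mock_mcp.resource.call_count == 2
uris = [call.kwargs["uri"] for call in mock_mcp.resource.call_args_list]
assert uris == ["aws://config/profiles", "aws://config/regions"]
```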
--------------------------------------------------------------------------------
/src/aws_mcp_server/prompts.py:
--------------------------------------------------------------------------------
```python
"""AWS CLI prompt definitions for the AWS MCP Server.
This module provides a collection of useful prompt templates for common AWS use cases.
These prompts help ensure consistent best practices and efficient AWS resource management.
"""
import logging
logger = logging.getLogger(__name__)
def register_prompts(mcp):
"""Register all prompts with the MCP server instance.
Args:
mcp: The FastMCP server instance
"""
logger.info("Registering AWS prompt templates")
@mcp.prompt(name="create_resource", description="Generate AWS CLI commands to create common AWS resources with best practices")
def create_resource(resource_type: str, resource_name: str) -> str:
"""Generate AWS CLI commands to create common AWS resources with best practices.
Args:
resource_type: Type of AWS resource to create (e.g., s3-bucket, ec2-instance, lambda)
resource_name: Name for the new resource
Returns:
Formatted prompt string for resource creation
"""
return f"""Generate the AWS CLI commands to create a new {resource_type} named {resource_name}
following AWS Well-Architected Framework best practices.
Please include:
1. The primary creation command with appropriate security settings
2. Any supporting resources needed (roles, policies, etc.)
3. Required tagging commands (Name, Environment, Purpose, Owner, Cost-Center)
4. Security hardening commands to enforce principle of least privilege
5. Encryption and data protection configuration
6. Verification commands to confirm successful creation
Ensure the solution includes:
- Proper encryption at rest and in transit
- Secure access control mechanisms
- Resource policies with appropriate permissions
- Monitoring and logging setup with CloudWatch
- Cost optimization considerations
For IAM roles and policies, follow the principle of least privilege and explain any important
security considerations specific to this resource type."""
@mcp.prompt(name="security_audit", description="Generate AWS CLI commands for performing a security audit on a service")
def security_audit(service: str) -> str:
"""Generate AWS CLI commands for performing a security audit on a service.
Args:
service: AWS service to audit (e.g., s3, ec2, iam, rds)
Returns:
Formatted prompt string for security auditing
"""
return f"""Generate AWS CLI commands to perform a comprehensive security audit
of {service} resources in my AWS account according to AWS Security Hub and Well-Architected Framework.
Include commands to:
1. Identify resources with public access, excessive permissions, or security group vulnerabilities
2. Detect weak or unused security configurations and access controls
3. Check for unencrypted data (both at rest and in transit)
4. Verify enabled logging and monitoring capabilities
5. Assess IAM roles and policies attached to resources for overly permissive settings
6. Check for resource compliance with CIS AWS Foundations Benchmark
7. Identify potential security misconfigurations based on AWS security best practices
8. Detect unused credentials, access keys, and permissions
Also provide:
- Security findings categorized by severity (High, Medium, Low)
- A prioritized list of remediation steps with corresponding CLI commands
- Recommendations to implement automated security checks using AWS Config Rules"""
@mcp.prompt(name="cost_optimization", description="Generate AWS CLI commands for cost optimization recommendations")
def cost_optimization(service: str) -> str:
"""Generate AWS CLI commands for cost optimization recommendations.
Args:
service: AWS service to optimize costs for
Returns:
Formatted prompt string for cost optimization
"""
return f"""Generate AWS CLI commands to identify cost optimization opportunities
for {service} in my AWS account using AWS Cost Explorer and other cost management tools.
Include commands to:
1. Find unused, idle, or underutilized resources with detailed utilization metrics
2. Identify resources that could be rightsized, downsized, or use a different pricing model
3. Detect patterns of usage that could benefit from Reserved Instances, Savings Plans, or Spot instances
4. Analyze resources without proper cost allocation tags and suggest tagging strategies
5. Generate a detailed cost breakdown by resource for the past 30 days
6. Identify optimal instance families based on workload patterns
7. Find opportunities to utilize AWS Graviton processors for better price-performance ratio
8. Check for resources that can leverage multi-region strategies for cost efficiency
Also provide:
- Cost-saving estimates for each recommendation
- Commands to implement automated cost management using AWS Budgets
- Scripts to schedule automated start/stop for dev/test environments
- Best practices for implementing FinOps for {service}"""
@mcp.prompt(name="resource_inventory", description="Generate AWS CLI commands to inventory resources for a service")
def resource_inventory(service: str, region: str = "all") -> str:
"""Generate AWS CLI commands to inventory resources for a service.
Args:
service: AWS service to inventory (e.g., s3, ec2, rds)
region: AWS region or "all" for multi-region inventory
Returns:
Formatted prompt string for resource inventory
"""
region_text = f"in the {region} region" if region != "all" else "across all regions"
return f"""Generate AWS CLI commands to create a comprehensive inventory
of all {service} resources {region_text}.
Include commands to:
1. List all resources with their key properties, metadata, and creation dates
2. Show resource relationships, dependencies, and associated infrastructure
3. Display resource tags, ownership information, and cost allocation
4. Identify untagged, potentially abandoned, or non-compliant resources
5. Export the inventory in structured formats (JSON, CSV) for further analysis
6. Group resources by type, status, size, and configuration
7. Include usage metrics and performance data where applicable
8. List attached IAM roles, policies, and security configurations
Structure the commands to output to easily parsable formats that can be programmatically processed.
Include jq filters to transform complex JSON output into useful summaries."""
@mcp.prompt(name="troubleshoot_service", description="Generate AWS CLI commands for troubleshooting service issues")
def troubleshoot_service(service: str, resource_id: str) -> str:
"""Generate AWS CLI commands for troubleshooting service issues.
Args:
service: AWS service to troubleshoot (e.g., ec2, rds, lambda)
resource_id: ID of the specific resource having issues
Returns:
Formatted prompt string for troubleshooting
"""
return f"""Generate AWS CLI commands to troubleshoot issues with {service}
resource {resource_id} using a systematic diagnostic approach.
Include commands to:
1. Check resource status, health, configuration, and performance metrics
2. Review recent changes, modifications, deployments, or infrastructure updates
3. Examine detailed logs, metrics, alarm history, and error patterns from CloudWatch
4. Verify network connectivity, security groups, NACLs, and routing settings
5. Diagnose potential service limits, throttling, or quota issues
6. Check for dependent services and connectivity between resources
7. Analyze IAM permissions and resource policies that might affect access
8. Validate configuration against AWS best practices and common failure patterns
Structure the troubleshooting as a systematic process from:
- Basic health and status verification
- Configuration and recent changes analysis
- Performance and resource utilization assessment
- Network and connectivity validation
- IAM and security verification
- Dependent services analysis
- Logging and monitoring data collection
Include commands to collect all relevant diagnostic information into a single report that can be shared with AWS Support if needed."""
@mcp.prompt(name="iam_policy_generator", description="Generate least-privilege IAM policies for specific services and actions")
def iam_policy_generator(service: str, actions: str, resource_pattern: str = "*") -> str:
"""Generate least-privilege IAM policies for specific services and actions.
Args:
service: AWS service for the policy (e.g., s3, dynamodb)
actions: Comma-separated list of actions (e.g., "GetObject,PutObject")
resource_pattern: Resource ARN pattern (e.g., "arn:aws:s3:::my-bucket/*")
Returns:
Formatted prompt string for IAM policy generation
"""
return f"""Generate a least-privilege IAM policy that allows only the required permissions
for {service} with these specific actions: {actions}.
Resource pattern: {resource_pattern}
The policy should:
1. Follow AWS IAM security best practices and use the latest policy structure
2. Include only the minimum permissions needed for the stated actions
3. Use proper condition keys to restrict access by source IP, VPC, time, MFA, etc.
4. Implement appropriate resource-level permissions where supported
5. Include explanatory comments for each permission block
6. Use AWS managed policies where appropriate to reduce maintenance overhead
7. Be ready to use with the AWS CLI for policy creation
Also provide:
- The AWS CLI command to apply this policy to a role or user
- Best practice recommendations for using policy boundaries
- Explanation of potential security impact if permissions are too broad
- Alternative permissions strategies if applicable (e.g., attribute-based access control)"""
@mcp.prompt(name="service_monitoring", description="Generate AWS CLI commands to set up monitoring for a service")
def service_monitoring(service: str, metric_type: str = "performance") -> str:
"""Generate AWS CLI commands to set up monitoring for a service.
Args:
service: AWS service to monitor (e.g., ec2, rds, lambda)
metric_type: Type of metrics to monitor (e.g., performance, cost, security)
Returns:
Formatted prompt string for monitoring setup
"""
return f"""Generate AWS CLI commands to set up comprehensive {metric_type} monitoring
for {service} resources using CloudWatch, X-Ray, and other observability tools.
Include commands to:
1. Create CloudWatch dashboards with relevant metrics and service-specific KPIs
2. Set up appropriate CloudWatch alarms with actionable thresholds and anomaly detection
3. Configure detailed logging with Log Insights queries for common analysis patterns
4. Enable AWS X-Ray tracing for distributed systems analysis where applicable
5. Create SNS topics and subscriptions for multi-channel notifications (email, Slack, PagerDuty)
6. Set up metric filters to extract critical information from log patterns
7. Configure composite alarms for complex monitoring scenarios
8. Enable AWS Service Health Dashboard notifications for service issues
The monitoring solution should include:
- Resource-specific metrics that indicate health and performance
- Operational thresholds based on industry best practices
- Multi-tier alerting with different severity levels
- Automated remediation actions where appropriate
- Integration with incident management workflows
Ensure the commands follow operational excellence best practices from the Well-Architected Framework."""
@mcp.prompt(name="disaster_recovery", description="Generate AWS CLI commands to implement disaster recovery for a service")
def disaster_recovery(service: str, recovery_point_objective: str = "1 hour") -> str:
"""Generate AWS CLI commands to implement disaster recovery for a service.
Args:
service: AWS service to protect (e.g., ec2, rds, dynamodb)
recovery_point_objective: Target RPO (e.g., "1 hour", "15 minutes")
Returns:
Formatted prompt string for DR setup
"""
return f"""Generate AWS CLI commands to implement a disaster recovery solution
for {service} with a Recovery Point Objective (RPO) of {recovery_point_objective} and minimal Recovery Time Objective (RTO).
Include commands to:
1. Configure appropriate backup mechanisms (snapshots, replication, AWS Backup)
2. Set up cross-region or cross-account redundancy with proper data synchronization
3. Create automation for recovery processes using AWS Systems Manager documents
4. Implement comprehensive monitoring and alerting for backup failures
5. Define validation procedures to verify recovery readiness and integrity
6. Set up regular DR testing through automation
7. Configure failover mechanisms and DNS routing strategies using Route 53
8. Implement data integrity checks for backups and replicas
The solution should:
- Balance cost effectiveness with meeting the specified RPO
- Follow AWS Well-Architected Framework best practices for reliability
- Include automated recovery procedures that minimize manual intervention
- Provide appropriate IAM roles and permissions for DR operations
- Consider regional service availability differences
- Include both data and configuration recovery"""
@mcp.prompt(name="compliance_check", description="Generate AWS CLI commands to check compliance with standards")
def compliance_check(compliance_standard: str, service: str = "all") -> str:
"""Generate AWS CLI commands to check compliance with standards.
Args:
compliance_standard: Compliance standard to check (e.g., "HIPAA", "PCI", "GDPR")
service: Specific AWS service or "all" for account-wide checks
Returns:
Formatted prompt string for compliance checking
"""
service_scope = f"for {service}" if service != "all" else "across all relevant services"
return f"""Generate AWS CLI commands to assess {compliance_standard} compliance {service_scope}
using AWS Config, AWS Security Hub, and AWS Audit Manager.
Include commands to:
1. Identify resources that may not meet {compliance_standard} compliance requirements
2. Check encryption settings, key management, and data protection measures
3. Audit access controls, authentication mechanisms, and privilege management
4. Verify logging, monitoring configurations, and audit trail completeness
5. Assess network security, isolation, and boundary protection
6. Evaluate resource configurations against specific {compliance_standard} controls
7. Check for compliant tagging and resource documentation
8. Analyze retention policies for backups, logs, and archived data
Also provide:
- Remediation commands for common compliance gaps with {compliance_standard}
- Explanation of specific {compliance_standard} requirements being checked
- Commands to generate compliance reports using AWS Audit Manager
- Instructions to set up continuous compliance monitoring
- Best practices for maintaining ongoing compliance"""
@mcp.prompt(name="resource_cleanup", description="Generate AWS CLI commands to identify and cleanup unused resources")
def resource_cleanup(service: str, criteria: str = "unused") -> str:
"""Generate AWS CLI commands to identify and cleanup unused resources.
Args:
service: AWS service to cleanup (e.g., ec2, ebs, rds)
criteria: Criteria for cleanup (e.g., "unused", "old", "untagged")
Returns:
Formatted prompt string for resource cleanup
"""
return f"""Generate AWS CLI commands to identify and safely clean up {criteria} {service} resources
to reduce costs and improve account hygiene.
Include commands to:
1. Identify resources matching the {criteria} criteria with appropriate filters and metrics
2. Generate a detailed report of resources before deletion for review and approval
3. Create backups, snapshots, or exports where appropriate before removal
4. Safely delete or terminate the identified resources with proper validation
5. Verify successful cleanup and calculate actual cost savings
6. Check for orphaned dependent resources (volumes, snapshots, ENIs)
7. Identify resources that could be scheduled for regular cleanup
8. Capture resource metadata before deletion for audit purposes
The commands should include:
- Appropriate safeguards to prevent accidental deletion of critical resources
- Dry-run options to preview changes before execution
- Validation checks to ensure resources are truly unused
- Tag-based identification of approved resources to preserve
- Staged approach that isolates resources before deletion
- Estimate of cost savings from cleanup activities
Follow AWS operational best practices and include error handling."""
@mcp.prompt(name="serverless_deployment", description="Generate AWS CLI commands to deploy a serverless application")
def serverless_deployment(application_name: str, runtime: str = "python3.13") -> str:
"""Generate AWS CLI commands to deploy a serverless application.
Args:
application_name: Name for the serverless application
runtime: Runtime environment (e.g., "python3.13", "nodejs20.x", "java17")
Returns:
Formatted prompt string for serverless deployment
"""
return f"""Generate AWS CLI commands to deploy a serverless application named {application_name}
using AWS SAM, Lambda, API Gateway, and DynamoDB with {runtime} runtime.
Include commands to:
1. Initialize a new SAM application with best practices structure
2. Create necessary Lambda functions with appropriate IAM roles
3. Set up API Gateway endpoints with proper authorization
4. Deploy DynamoDB tables with optimal capacity and indexing
5. Configure CloudWatch Logs and X-Ray tracing
6. Set up CI/CD pipeline using AWS CodePipeline
7. Implement proper versioning and deployment strategies (canary, linear)
8. Create CloudFormation custom resources if needed
The deployment should follow serverless best practices:
- Appropriate function timeouts and memory allocation
- Least privilege IAM permissions for each component
- Parameter Store or Secrets Manager for configuration
- Proper error handling and dead-letter queues
- Efficient cold start optimization
- Secure API authorization (JWT, IAM, Cognito)
- Cost-effective resource utilization
Include commands to verify the deployment and test the application endpoints."""
@mcp.prompt(name="container_orchestration", description="Generate AWS CLI commands to set up container orchestration")
def container_orchestration(cluster_name: str, service_type: str = "fargate") -> str:
"""Generate AWS CLI commands to set up container orchestration.
Args:
cluster_name: Name for the ECS/EKS cluster
service_type: Type of service (e.g., "fargate", "ec2", "eks")
Returns:
Formatted prompt string for container deployment
"""
return f"""Generate AWS CLI commands to set up a container orchestration environment
with a {service_type} cluster named {cluster_name} following AWS best practices.
Include commands to:
1. Create the {service_type} cluster with appropriate networking and security settings
2. Set up necessary IAM roles, task execution roles, and service roles
3. Configure task definitions with optimal resource allocation
4. Deploy services with appropriate scaling policies and load balancing
5. Implement service discovery and container insights monitoring
6. Set up logging and metric collection for containers
7. Configure secrets management for sensitive configuration
8. Implement proper security controls (ECR scanning, networking)
The commands should address:
- Proper networking design with security groups and VPC settings
- Auto-scaling based on CPU, memory, and custom metrics
- CI/CD pipeline integration for container deployment
- Health checks and graceful deployment strategies
- Container image security scanning and validation
- Efficient resource utilization and cost management
- High availability across multiple availability zones
- Secrets and environment variable management
Include validation commands to verify successful deployment and access."""
@mcp.prompt(name="vpc_network_design", description="Generate AWS CLI commands to design and deploy a secure VPC")
def vpc_network_design(vpc_name: str, cidr_block: str = "10.0.0.0/16") -> str:
"""Generate AWS CLI commands to design and deploy a secure VPC.
Args:
vpc_name: Name for the VPC
cidr_block: CIDR block for the VPC (e.g., "10.0.0.0/16")
Returns:
Formatted prompt string for VPC design
"""
return f"""Generate AWS CLI commands to design and deploy a secure, well-architected VPC
named {vpc_name} with CIDR block {cidr_block} following AWS networking best practices.
Include commands to:
1. Create the VPC with appropriate DNS and tenancy settings
2. Set up public and private subnets across multiple availability zones
3. Configure Internet Gateway, NAT Gateways, and route tables
4. Implement Network ACLs and security groups with least-privilege rules
5. Set up VPC endpoints for AWS services to improve security
6. Configure VPC Flow Logs for network traffic monitoring
7. Implement Transit Gateway or VPC Peering if needed
8. Set up DNS management with Route 53
The VPC design should include:
- High availability across at least 3 availability zones
- Secure subnet segmentation (public, private, data)
- Proper CIDR block allocation for future expansion
- Security controls at multiple layers (NACLs, security groups)
- Efficient routing and traffic flow optimization
- Private connectivity to AWS services using endpoints
- Network traffic monitoring and logging
- Disaster recovery considerations
Include validation commands to verify the network connectivity and security."""
@mcp.prompt(name="infrastructure_automation", description="Generate AWS CLI commands for infrastructure automation")
def infrastructure_automation(resource_type: str, automation_scope: str = "deployment") -> str:
"""Generate AWS CLI commands for infrastructure automation.
Args:
resource_type: Type of AWS resource to automate (e.g., ec2, rds, lambda)
automation_scope: Type of automation (e.g., "deployment", "scaling", "patching")
Returns:
Formatted prompt string for infrastructure automation
"""
return f"""Generate AWS CLI commands to implement {automation_scope} automation
for {resource_type} resources using AWS Systems Manager, CloudFormation, and EventBridge.
Include commands to:
1. Create automation documents or CloudFormation templates for consistent {automation_scope}
2. Set up EventBridge rules to trigger automation on schedule or event patterns
3. Configure necessary IAM roles and permissions with least privilege
4. Implement parameter validation and error handling in automation scripts
5. Set up notification and reporting for automation results
6. Create maintenance windows and safe deployment practices
7. Implement automated rollback mechanisms for failures
8. Configure cross-account or cross-region automation if needed
The automation solution should:
- Minimize manual intervention while maintaining appropriate approvals
- Include proper logging and audit trails for all activities
- Handle edge cases and failure scenarios gracefully
- Scale to manage multiple resources efficiently
- Follow infrastructure as code best practices
- Include proper testing and validation steps
- Respect maintenance windows and business hours
- Provide detailed reporting and status tracking
Include commands to validate the automation and test it in a controlled environment."""
@mcp.prompt(name="security_posture_assessment", description="Generate AWS CLI commands for comprehensive security posture assessment")
def security_posture_assessment() -> str:
"""Generate AWS CLI commands for comprehensive security posture assessment.
Returns:
Formatted prompt string for security assessment
"""
return """Generate AWS CLI commands to perform a comprehensive security posture assessment
across your AWS environment using Security Hub, IAM Access Analyzer, and GuardDuty.
Include commands to:
1. Enable and configure AWS Security Hub with appropriate standards
2. Set up AWS Config for resource configuration monitoring
3. Enable GuardDuty for threat detection across all regions
4. Configure IAM Access Analyzer to identify external access
5. Review CloudTrail for complete activity logging coverage
6. Assess S3 bucket policies and access controls
7. Analyze password policies and MFA implementation
8. Evaluate network security groups and NACLs
The assessment should check for:
- Identity and access management best practices
- Data protection mechanisms and encryption
- Infrastructure security configurations
- Detective controls and logging completeness
- Compliance with industry standards (CIS, NIST, PCI)
- Privileged access management
- Potential lateral movement paths
- Security monitoring and incident response readiness
Include commands to generate comprehensive reports of findings organized by severity,
and provide remediation steps for common security issues."""
@mcp.prompt(name="performance_tuning", description="Generate AWS CLI commands for performance tuning of AWS resources")
def performance_tuning(service: str, resource_id: str) -> str:
"""Generate AWS CLI commands for performance tuning of AWS resources.
Args:
service: AWS service to optimize (e.g., rds, ec2, lambda)
resource_id: ID of the specific resource to tune
Returns:
Formatted prompt string for performance tuning
"""
return f"""Generate AWS CLI commands to analyze and tune the performance of {service}
resource {resource_id} based on metrics, benchmarks, and AWS best practices.
Include commands to:
1. Gather detailed performance metrics using CloudWatch over various time periods
2. Analyze resource configuration and compare to recommended settings
3. Identify performance bottlenecks and resource constraints
4. Modify configuration parameters for optimal performance
5. Implement caching strategies if applicable
6. Adjust scaling policies and resource allocation
7. Configure enhanced monitoring for detailed insights
8. Benchmark performance before and after changes
The performance tuning approach should:
- Establish baseline performance metrics before changes
- Target specific performance issues with measured approaches
- Consider workload patterns and usage characteristics
- Balance performance improvements with cost implications
- Implement changes in staged approach with validation
- Document performance gains and configuration changes
- Address both immediate bottlenecks and long-term scaling
Include commands to verify performance improvements and monitor for regressions."""
@mcp.prompt(name="multi_account_governance", description="Generate AWS CLI commands to implement multi-account governance")
def multi_account_governance(account_type: str = "organization") -> str:
"""Generate AWS CLI commands to implement multi-account governance.
Args:
account_type: Type of account structure (e.g., "organization", "control tower")
Returns:
Formatted prompt string for multi-account governance
"""
return f"""Generate AWS CLI commands to implement robust multi-account governance
using AWS Organizations, Control Tower, and {account_type} best practices.
Include commands to:
1. Set up organizational units (OUs) with logical account grouping
2. Implement service control policies (SCPs) for security guardrails
3. Configure centralized logging with CloudTrail and CloudWatch Logs
4. Set up cross-account IAM roles with least privilege
5. Implement tag policies and resource tagging strategies
6. Configure AWS Config for multi-account compliance monitoring
7. Set up centralized security monitoring with Security Hub
8. Implement account baselining and standardization
The governance framework should address:
- Preventative guardrails using SCPs and permission boundaries
- Detective controls with centralized logging and monitoring
- Cost management and billing consolidation
- Standardized network architecture across accounts
- Identity federation and cross-account access
- Centralized audit and compliance reporting
- Automated account provisioning and baseline configuration
- Resource sharing and cross-account service usage
Include guidance on implementing a secure landing zone and account structure."""
logger.info("Successfully registered all AWS prompt templates")
```
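These templates are plain functions that return formatted strings; the `@mcp.prompt` decorator only registers them with the server. As a quick sanity check, the argument-dependent scoping can be exercised directly. The sketch below re-implements the core of the `compliance_check` template without the decorator (which requires a live FastMCP instance), keeping only the first line of the returned prompt:

```python
def compliance_check(compliance_standard: str, service: str = "all") -> str:
    """Standalone copy of the compliance_check template logic (no MCP registration)."""
    # The scope phrase depends on whether a specific service was requested
    service_scope = f"for {service}" if service != "all" else "across all relevant services"
    return (
        f"Generate AWS CLI commands to assess {compliance_standard} "
        f"compliance {service_scope} using AWS Config, AWS Security Hub, "
        "and AWS Audit Manager."
    )

# A specific service narrows the scope phrase; the default broadens it
print(compliance_check("HIPAA", "s3"))
print(compliance_check("PCI"))
```

Because the registered functions contain no I/O or AWS calls, every template can be unit-tested the same way: call it with representative arguments and assert on the returned prompt text.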